solr导入数据的高效方法

2019-03-27 00:22|来源: 网路


   Solr提供了丰富的数据导入接口,可以导入数据库表、xmljsoncsv各种格式的数据信息。


Solr的数据导入接口可以分为两类:DIH接口和HTTP接口 。关于DIH接口的用法可以参看链接:http://blog.chenlb.com/2010/03/solr-data-import-quick-start.html,我就不重复造轮子了。


关于HTTP接口有EmbeddedSolrServerConcurrentUpdateSolrServerHttpSolrServer


其中EmbeddedSolrServer是不用走HTTP通道的,所以性能比其它两个SolrServer要高1.5倍。但是本质上讲,这三个SolrServer在导入数据的处理方式上是一致的。其处理的过程如下:


1、把数据信息封装成Collection<SolrInputDocument>


2、Collection<SolrInputDocument>转化成xml或者序列化成javabin


3、形成SolrRequest,把转化后的信息封装成SolrRequest的参数。


4、如果是EmbeddedSolrServer,则直接对应到UpdateHandler。如果是HttpSolrServer,则走HTTP通道,需要用HttpClient把参数传递到服务器。


5、xml或者javabin还原成SolrInputDocument


6、SolrInputDocument转化成luceneDocument


7、luceneIndexWriterDocument写入到索引中。



这里面最让人费解的是在solrj中已经形成了SolrInputDocument,但是还得把SolrInputDocument转化成xml或者javabin,然后再还原回来。而且需要用Collection来保存。


而且,如果用EmbeddedSolrServer,无需走HTTP通道,也只能这样做。让人感到恼火。其实是有其它办法的。如果我们拿到了SolrCore,通过SolrCore就可以拿到UpdateProcessorChain,通过ProcessorChain就可以绕过前面的5步,直接到第6步了。



处理的代码如下:

public class SolrRecordHandler implements Runnable{
//生产者-消费者 solr doc
    private ArrayBlockingQueue<SolrInputDocument> docs=new ArrayBlockingQueue<SolrInputDocument>(5000);
        public void wrap(ResultSet rs){
SolrInputDocument doc=new SolrInputDocument();
        try {
             ResultSetMetaData rsm = rs.getMetaData();
             int numColumns = rsm.getColumnCount();
             for (int i = 1; i < (numColumns + 1); i++)
             {
                    doc.addField(rsm.getColumnName(i), rs.getObject(i));
             }
                                                                                                                                
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
        docs.add(doc);
        }
@Override
    public void run() {
        logger.info("solr 写线程启动开始。。。。");
        SolrCore core = cores.getCore("review");
        UpdateRequestProcessorChain chain=  core.getUpdateProcessingChain(null);
        SolrParams param=new ModifiableSolrParams();
        SolrQueryRequestBase req=new SolrQueryRequestBase(core,param){};
        SolrQueryResponse rsp=new SolrQueryResponse();
        UpdateRequestProcessor processor=chain.createProcessor(req, rsp);
        //不停地从队列中读取元素,直到任务结束
        SolrInputDocument doc;
        AddUpdateCommand acmd=new AddUpdateCommand(req);
        while(true){
            try {
                doc=docs.take();
                //读取到一个空的doc,则表明任务结束
                if(doc.isEmpty()){
                    break;
                }
                acmd.solrDoc=doc;
                processor.processAdd(acmd);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        logger.info("solr index thread finished!");
        //任务完成,则提交
        try {
            CommitUpdateCommand cmd=new CommitUpdateCommand(req, false);
            processor.processCommit(cmd);
        } catch (IOException e) {
            e.printStackTrace();
        }finally{
            try {
                processor.finish();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        isfinished.set(true);
    }
}


上面的代码是多线程的,一个线程负责把数据库的封装成SolrInputDocument,然后放到阻塞队列中,另外一个线程负责从阻塞队列中取出SolrInputDocument,然后添加到索引中。

最后,感谢@李雨前 的帮助。




本文出自 “每天进步一点点” 博客,请务必保留此出处http://sbp810050504.blog.51cto.com/2799422/1403656


转自:http://sbp810050504.blog.51cto.com/2799422/1403656

相关问答

更多
  • solr数据导入,经过这几天的查资料,我觉得solr数据导入可以有三种方式: 1、编写数据xml文件,通过post.jar导入; 2、通过DIH导入; 3、利用solrj导入数据; 现针对第三种方式进行研究,在第一步中写了一段小的测试代码,可以参考:http://wiki.apache.org/solr/Solrj#Streaming_documents_for_an_update 具体的代码解释如下: String url = "http://localhost:8080/solr"; HttpSolrS ...
  • 首先,先查看数据是否导入 然后,点击界面右上角的original UI 再次查询就可以查到了。
  • 调研了一下,发现索引文件的数据结构相当复杂,这个好像是每提交一次建索引,就会将以前已生成的索引重新组织,而且还会生成新文件,所以如果采用在HDFS中追加写索引文件,那工作量将相当大,必须清楚了解索引文件数据结构及索引文件关联,下面有三篇对lucene索引结构的分析,我是没怎么看懂,有兴趣的可以看一下
  • 在solr与tomcat整合文章中,我用的索引库是mycore,现在就以这个为例。 首先要准备jar包:solr-dataimporthandler-4.8.1.jar、solr-dataimporthandler-extras-4.8.1.jar和mysql-connector-java-5.0.7-bin.jar这三个包到solr的tomcat的webapps\solr\WEB-INF\lib下 在这个文件夹的conf下配置两个文件,添加一个文件。先配置solrconfig.xml。 在该文件下添加一个 ...
  • 你必须在你的dataConfig中使用想要的sql语句中的deletedPkQuery属性,例如 deletedPkQuery="select ID from table where state = 'deleted'" 对你起作用吗? 来源http://wiki.apache.org/solr/DataImportHandler这是一个很好的教程: http : //solr.pl/en/2011/01/03/data-import-handler-%E2%80%93的拆卸, -数据-从索引/ You ...
  • JAVA_HOME=/usr/java/default JAVA_OPTIONS="-Dsolr.solr.home=/opt/solr/solr $JAVA_OPTIONS" JETTY_HOME=/opt/solr JETTY_USER=solr JETTY_LOGS=/opt/solr/logs 所有这些设置都很重要。 特别是,不设置JETTY_LOGS会导致jetty尝试(并且失败)将请求日志放在/ home / solr / logs中。 通过链接 https://wiki.apache.org ...
  • 如果您想进行部分导入,请查看增量导入 。 If you want to do a partial import, take a look at delta imports.
  • 如果DIH处理程序忙于运行请求,则会忽略发送给它的任何其他请求。 所以你必须回顾一下你如何/何时调用DIH的方法,一些想法: 正如你所说你有很高的编辑频率,通过id重新编排似乎并不是最好的,基于时间的某些时间似乎更具可扩展性。 你可以添加一个'lastUpdated'列(当这个行被创建/更新时通过触发器填充),然后每X(1分钟,5分钟......任何你能负担得起)调用reindex。 如果忽略一个请求,则不会丢失数据,应该重新索引的行将在下一个运行的增量上重新索引。 如果你想保持你的基于ID的方法,你需要: ...
  • 您可以将所有数据导入默认字段 。 那么您不需要明确提及字段名称。 (虽然你仍然可以,如果你想) 使用Solr的默认schema.xml已包含使用此类“catchall”字段的示例: 首先,必须像任何其他字段一样声明该字段: 然后,必须将此新字段声明为默认字段。 每当搜索不到特定字段时,将搜索以下字段:
  • 您遇到的错误与Solr无关,而与您访问数据库的方式有关。 如果查看异常: java.sql.SQLException: Operation not allowed after ResultSet closed 。 我建议将batchSize参数更改为其他值,例如1000 。 batchSize选项用于批量检索数据库表的行,以减少内存使用(通常用于防止在运行数据导入处理程序时内存不足)。 虽然批量较小的可能较慢,但该选项并不打算影响导入过程的速度。 The error you're facing does n ...