看solr源代码的笔记,主要是代码简单解析

2019-03-27 01:18|来源: 网路

配置

solr 对一个搜索请求的的流程

在solrconfig.xml会配置一个handler。配置了前置处理组件preParams,还有后置处理组件filterResult,当然还有默认的组件


 <requestHandler name="standard" class="solr.SearchHandler" default="true">

      <arr name="first-components">
         <str>preParams</str>
      </arr>
         <lst name="defaults">
           <str name="echoParams">explicit</str>
           <int name="rows">10</int>
           <int name="start">0</int>
          <str name="q">*:*</str>
         </lst>    

      <arr name="last-components">
         <str>filterResult</str>
      </arr>     

    </requestHandler>



http请求控制器

当一个查询请求过来的时候,先到类SolrDispatchFilter,由这个分发器寻找对应的handler来处理。
 

                String qt = solrReq.getParams().get( CommonParams.QT );
                handler = core.getRequestHandler( qt );


---------------------------------------------------------------------------------------------------

                this.execute( req, handler, solrReq, solrRsp );
                HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);


-----------------------------------------------------------------------------------------------

从上面的代码里看出是由solrCore留下的接口来处理请求。从代码框架上,从此刻开始进入solr的核心代码。


  protected void execute( HttpServletRequest req, SolrRequestHandler handler, SolrQueryRequest sreq, SolrQueryResponse rsp) {
    sreq.getContext().put( "webapp", req.getContextPath() );
    sreq.getCore().execute( handler, sreq, rsp );
  }



看一下solrCore代码execute的方法 的主要代码

public void execute(SolrRequestHandler handler, SolrQueryRequest req, SolrQueryResponse rsp) {
。。。。。
    handler.handleRequest(req,rsp);
    setResponseHeaderValues(handler,req,rsp);
 。。。。。。。
  }


主要实现对请求的处理,并将请求结果的状态信息写到响应的头部


SolrRequestHandler 处理器


再看一下对请求的处理。。先看定义该请求处理器的接口,可以更好理解。只有两个方法,一个是初始化信息,主要是配置时的默认参数,另一个就是处理请求的接口。

public interface SolrRequestHandler extends SolrInfoMBean {
  public void init(NamedList args);
  public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp);
}


先看一下实现该接口的类RequestHandlerBase


public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {
    numRequests++;
    try {
      SolrPluginUtils.setDefaults(req,defaults,appends,invariants);
      rsp.setHttpCaching(httpCaching);
      handleRequestBody( req, rsp );
      // count timeouts
      NamedList header = rsp.getResponseHeader();
      if(header != null) {
        Object partialResults = header.get("partialResults");
        boolean timedOut = partialResults == null ? false : (Boolean)partialResults;
        if( timedOut ) {
          numTimeouts++;
          rsp.setHttpCaching(false);
        }
      }
    } catch (Exception e) {
      SolrException.log(SolrCore.log,e);
      if (e instanceof ParseException) {
        e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
      }
      rsp.setException(e);
      numErrors++;
    }
    totalTime += rsp.getEndTime() - req.getStartTime();
  }


主要记录该请求处理的状态与处理时间记录。真正的实现方法交由各个子类      handleRequestBody( req, rsp );

现在看一下SearchHandler对于搜索处理的实现方法


首先是将solrconfig.xml上配置的各个处理组件按一定顺序组装起来,先是first-Component,默认的component,last-component.这些处理组件会按照它们的顺序来执行,以下是searchHandler的实现主体。方法handleRequestBody


 

  @Override
  public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception, ParseException, InstantiationException, IllegalAccessException
  {
    // int sleep = req.getParams().getInt("sleep",0);
    // if (sleep > 0) {log.error("SLEEPING for " + sleep);  Thread.sleep(sleep);}
    ResponseBuilder rb = new ResponseBuilder();
    rb.req = req;
    rb.rsp = rsp;
    rb.components = components;
    rb.setDebug(req.getParams().getBool(CommonParams.DEBUG_QUERY, false));

    final RTimer timer = rb.isDebug() ? new RTimer() : null;

    if (timer == null) {
      // non-debugging prepare phase
      for( SearchComponent c : components ) {
        c.prepare(rb);
      }
    } else {
      // debugging prepare phase
      RTimer subt = timer.sub( "prepare" );
      for( SearchComponent c : components ) {
        rb.setTimer( subt.sub( c.getName() ) );
        c.prepare(rb);
        rb.getTimer().stop();
      }
      subt.stop();
    }
     //单机版
    if (rb.shards == null) {
      // a normal non-distributed request

      // The semantics of debugging vs not debugging are different enough that
      // it makes sense to have two control loops
      if(!rb.isDebug()) {
        // Process
        for( SearchComponent c : components ) {
          c.process(rb);
        }
      }
      else {
        // Process
        RTimer subt = timer.sub( "process" );
        for( SearchComponent c : components ) {
          rb.setTimer( subt.sub( c.getName() ) );
          c.process(rb);
          rb.getTimer().stop();
        }
        subt.stop();
        timer.stop();

        // add the timing info
        if( rb.getDebugInfo() == null ) {
          rb.setDebugInfo( new SimpleOrderedMap<Object>() );
        }
        rb.getDebugInfo().add( "timing", timer.asNamedList() );
      }

    } else {//分布式请求
      // a distributed request

      HttpCommComponent comm = new HttpCommComponent();

      if (rb.outgoing == null) {
        rb.outgoing = new LinkedList<ShardRequest>();
      }
      rb.finished = new ArrayList<ShardRequest>();
 
      //起始状态为0,结束状态为整数的最大值
      int nextStage = 0;
      do {
        rb.stage = nextStage;
        nextStage = ResponseBuilder.STAGE_DONE;

        // call all components
        for( SearchComponent c : components ) {
          //得到所有组件运行后返回的下一个状态,并取最小值
          nextStage = Math.min(nextStage, c.distributedProcess(rb));
        }


        // 如果有需要向子机发送请求
        while (rb.outgoing.size() > 0) {

          // submit all current request tasks at once
          while (rb.outgoing.size() > 0) {
            ShardRequest sreq = rb.outgoing.remove(0);
            sreq.actualShards = sreq.shards;
            if (sreq.actualShards==ShardRequest.ALL_SHARDS) {
              sreq.actualShards = rb.shards;
            }
            sreq.responses = new ArrayList<ShardResponse>();

            // 向各个子机发送请求
            for (String shard : sreq.actualShards) {
              ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
              params.remove(ShardParams.SHARDS);      // not a top-level request
              params.remove("indent");
              params.remove(CommonParams.HEADER_ECHO_PARAMS);
              params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request
              String shardHandler = req.getParams().get(ShardParams.SHARDS_QT);
              if (shardHandler == null) {
                params.remove(CommonParams.QT);
              } else {
                params.set(CommonParams.QT, shardHandler);
              }
            //提交子请求
             comm.submit(sreq, shard, params);
            }
          }


          // now wait for replies, but if anyone puts more requests on
          // the outgoing queue, send them out immediately (by exiting
          // this loop)
          while (rb.outgoing.size() == 0) {
            ShardResponse srsp = comm.takeCompletedOrError();
            if (srsp == null) break;  // no more requests to wait for

            // Was there an exception?  If so, abort everything and
            // rethrow
            if (srsp.getException() != null) {
              comm.cancelAll();
              if (srsp.getException() instanceof SolrException) {
                throw (SolrException)srsp.getException();
              } else {
                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());
              }
            }
 
            rb.finished.add(srsp.getShardRequest());

            //每个组件都对于返回的数据处理
            for(SearchComponent c : components) {
              c.handleResponses(rb, srsp.getShardRequest());
            }
          }
        }//请求队列结束

        //再对该轮请求进行收尾工作
        for(SearchComponent c : components) {
            c.finishStage(rb);
         }

        //如果状态未到结束,则继续循环
      } while (nextStage != Integer.MAX_VALUE);
    }
  }

首先运行的是各个组件的方法prepare

 

      for( SearchComponent c : components ) {
        c.prepare(rb);
      }


再则如果不是分布式搜索,则比较简单的运行

  for( SearchComponent c : components ) {
          c.process(rb);
        }


就结束!

如果是分布式搜索,过程会比较复杂些,对于每个组件处理都会返回一个状态,对于以下几个方法循环执行,直到状态结束 。  

在类ResponseBuilder定义了几个状态。

  

  public static int STAGE_START           = 0;
  public static int STAGE_PARSE_QUERY     = 1000;
  public static int STAGE_EXECUTE_QUERY   = 2000;
  public static int STAGE_GET_FIELDS      = 3000;
  public static int STAGE_DONE            = Integer.MAX_VALUE;

从STAGE_START---->STAGE_PARSE_QUERY------>STAGE_EXECUTE_QUERY--------------->STAGE_GET_FIELDS------------>STAGE_DONE

从这些状态名称可以猜得出整个对应的过程。

每个组件先调用方法distributeProcess,并返回下一个状态

     for( SearchComponent c : components ) {
          // the next stage is the minimum of what all components report
          nextStage = Math.min(nextStage, c.distributedProcess(rb));
        }


而方法handleResponse主要处理返回来的数据

     

      for(SearchComponent c : components) {
              c.handleResponses(rb, srsp.getShardRequest());
            }

然后交由finishStage方法来对每一个状态的过程作结束动作。

------------------------------

  for(SearchComponent c : components) {
            c.finishStage(rb);
         }


-----------------------------

了解这个流程有助于扩展solr。比如有个业务是要我对搜索的自然结果排序进行干预,而这个干预只针对前几页结果,所以我不得不做个组件来对其中结果进行处理。

所以我想可以添加一个组件放在最后-------------》

1)如果是分布式搜索:

       这个组件可以在重写finsihStage做处理。算是对最终结果的排序处理即可。

2)如果只是单机:

      这个组件可以在重写process做处理



组件

现在看一下其中一个主要的组件QueryComponent

prepare

对于QueryComponent主要解析用户传送的语法解析参数defType,以及过滤查询fq,返回字段集fl.排序字段Sort


单机处理

process


   分布式搜索过程中的某一步,这里应该是主机要合并文档,取出对应的文档的过程,

主机发出指定的solr主键ids来取文档集,首先取出对应的lucene的内部id集。如果某些文档已不在则弃掉。


String ids = params.get(ShardParams.IDS);
    if (ids != null) {//将传过来的ids,放进结果集中,并在后面取出对应的结果文档
     SchemaField idField = req.getSchema().getUniqueKeyField();
      List<String> idArr = StrUtils.splitSmart(ids, ",", true);
      int[] luceneIds = new int[idArr.size()];
      int docs = 0;
      for (int i=0; i<idArr.size(); i++) {
      //solr主键id对应的文档lucene内部的id
       int id = req.getSearcher().getFirstMatch(
                new Term(idField.getName(), idField.getType().toInternal(idArr.get(i))));
        if (id >= 0)
          luceneIds[docs++] = id;
      }
     
      DocListAndSet res = new DocListAndSet();

      //这里并没有传入scores[]

  res.docList = new DocSlice(0, docs, luceneIds, null, docs, 0);
//需要另一种doc集合处理。
 if (rb.isNeedDocSet()) {
 List<Query> queries = new ArrayList<Query>();
  queries.add(rb.getQuery());
List<Query> filters = rb.getFilters(); 
if (filters != null)
 queries.addAll(filters);
  res.docSet = searcher.getDocSet(queries);
 } 
rb.setResults(res);
 rsp.add("response",rb.getResults().docList);
 return; 
}

  //封装搜索值对象与封装结果值对象 
   SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();
   //设置超时最大值
    cmd.setTimeAllowed(timeAllowed);
    SolrIndexSearcher.QueryResult result = new SolrIndexSearcher.QueryResult();
    //搜索
    searcher.search(result,cmd);
    //设置搜索结果
    rb.setResult( result );
    rsp.add("response",rb.getResults().docList);
    rsp.getToLog().add("hits", rb.getResults().docList.matches());
    //对含有字段排序处理
    doFieldSortValues(rb, searcher);
   //非分布查询过程,且搜索结果数小于50,进行缓存
    doPrefetch(rb);


目前看到真实获取文档内容的是在

QueryResponseWriter

例如xml的输出格式类XMLWriter




分布式处理

1)distributedProcess

  @Override  
  public int distributedProcess(ResponseBuilder rb) throws IOException {
    if (rb.stage < ResponseBuilder.STAGE_PARSE_QUERY)
      return ResponseBuilder.STAGE_PARSE_QUERY;
    if (rb.stage == ResponseBuilder.STAGE_PARSE_QUERY) {
      createDistributedIdf(rb);
      return ResponseBuilder.STAGE_EXECUTE_QUERY;
    }
    if (rb.stage < ResponseBuilder.STAGE_EXECUTE_QUERY) return ResponseBuilder.STAGE_EXECUTE_QUERY;
    if (rb.stage == ResponseBuilder.STAGE_EXECUTE_QUERY) {
//分布式查询
     createMainQuery(rb);
      return ResponseBuilder.STAGE_GET_FIELDS;
    }
    if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS) return ResponseBuilder.STAGE_GET_FIELDS;
    if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
 
    //这里就会去对应的主机拿取需要的字段,封装请求字段的参数,放进请求队列里,可以由外部的searchHandler提交该请求,最后结果放在ShardResponse类里。
     createRetrieveDocs(rb);
      return ResponseBuilder.STAGE_DONE;
    }
    return ResponseBuilder.STAGE_DONE;
  }


  


 2) handleResponses

 public void handleResponses(ResponseBuilder rb, ShardRequest sreq) {  

         if ((sreq.purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0) {

                      //合并ids 

               mergeIds(rb, sreq);

              //合并groupCount   

            mergeGroupCounts(rb, sreq); 

           }    

       if ((sreq.purpose & ShardRequest.PURPOSE_GET_FIELDS) != 0) {

               //获取文档的字段,并将结题组装起来放到最终结果列表对应的位置里    

             returnFields(rb, sreq);    

            return;  

       }

  }


   3)  finishStage


 

 @Override
  public void finishStage(ResponseBuilder rb) {
   //这里说是==获取文档内容的值,在
   if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
       //有些文档可能已不存在了,则忽略掉
      for (Iterator<SolrDocument> iter = rb._responseDocs.iterator(); iter.hasNext();) {
        if (iter.next() == null) {
          iter.remove();
          rb._responseDocs.setNumFound(rb._responseDocs.getNumFound()-1);
        }        
      }

      rb.rsp.add("response", rb._responseDocs);
    }
  }


同样最后的结果是保存在

ResponseBuilder

     ResponseBuilder
         NamedList values = new SimpleOrderedMap();

这个字段里,以键为"response",单机存储的是lucene 的内部id列表
如果是分布式,则存储的是SolrDocumentList,不用再去索引拿出对应的存储字段,
这个在QueryResponseWriter里有对应的处理










转自:http://blog.csdn.net/duck_genuine/article/details/6962624

相关问答

更多
  • java源代码是用来关联jar中的编译代码的。 应用场景:通常在打开eclipse中的jar的时候,发现class文件不能被打开,此时出现下面的界面,此时就需要通过“Attach Source”,之后找到对应的java源代码来实现代码关联,即可正常的显示class类中的内容。 备注:如果此处ava源代码指的是源代码文件(“.java”),是用来进行代码维护和二次开发的必备东西。
  • spring的源代码下载地址: http://www.springframework.org/ spring的源代码查看请去这个网站搜索: http://www.open-open.com/index.htm
  • 多实践,多操作。网上有许多软件。许多程序编写的教程。你可以先跟着做,做熟悉了可以自己创意。加上一些教程里没有的东西。举一反三。
  • //最简单的helloword  import android.app.Activity;  import android.os.Bundle;  import android.widget.TextView; public class HelloWorld extends Activity { @Override public void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); TextView tv ...
  • 要多简单。这样简单行么。
  • 根据LUCENE-4288 ,Solr只有在默认情况下从SVN签出时才会正确打包。 但是,如果将package-src-tgz更改为package-local-src-tgz ,它将正确打包。 在solr/build.xml找到以下行: 并将package-src-tgz更改为package-local-src-tgz 。