A self-packaged Solr Data Import Request Handler Scheduler

2019-03-27 01:11 | Source: the web

After nearly a full day of effort, I finally got Solr's Data Import Request Handler Scheduler working.

The Scheduler solves two problems:

1. Updating the index incrementally on a schedule.

2. Rebuilding the index on a schedule.

Testing shows that the Scheduler accomplishes both purely through configuration, with no extra development work and no manual intervention, provided it is used together with Solr's Data Import Request Handler.

To make it easy to reuse later, I have published the code on http://code.google.com at: http://code.google.com/p/solr-dataimport-scheduler/

The main code is reproduced below for reference:

SolrDataImportProperties.java, which reads the configuration file:

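The original listing for this class was collapsed behind a "View Code" widget and did not survive in this copy. As a placeholder, here is a minimal sketch inferred from how BaseTimerTask calls it; the constant values follow the key names in the dataimport.properties section later in this post, and the file location under solr.home/conf is an assumption based on the usage notes, not the author's original code.

```java
// package org.apache.solr.handler.dataimport.scheduler;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;

public class SolrDataImportProperties {
    // key names match the entries in dataimport.properties shown later
    public static final String SYNC_ENABLED = "syncEnabled";
    public static final String SYNC_CORES = "syncCores";
    public static final String SERVER = "server";
    public static final String PORT = "port";
    public static final String WEBAPP = "webapp";
    public static final String PARAMS = "params";
    public static final String INTERVAL = "interval";
    public static final String REBUILDINDEXPARAMS = "reBuildIndexParams";
    public static final String REBUILDINDEXBEGINTIME = "reBuildIndexBeginTime";
    public static final String REBUILDINDEXINTERVAL = "reBuildIndexInterval";

    private Properties properties;

    // (re)load dataimport.properties from solr.home/conf; the lookup via the
    // solr.solr.home system property is an assumption for this sketch
    public void loadProperties(boolean force) {
        if (force || properties == null) {
            properties = new Properties();
            String solrHome = System.getProperty("solr.solr.home", "solr");
            try (FileInputStream in = new FileInputStream(
                    solrHome + "/conf/dataimport.properties")) {
                properties.load(in);
            } catch (IOException e) {
                // fall back to an empty property set if the file cannot be read
            }
        }
    }

    public String getProperty(String key) {
        return properties == null ? null : properties.getProperty(key);
    }
}
```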

BaseTimerTask.java, the TimerTask base class, which encapsulates basic property loading and the HTTP request method:

package org.apache.solr.handler.dataimport.scheduler;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseTimerTask extends TimerTask {
    protected String syncEnabled;
    protected String[] syncCores;
    protected String server;
    protected String port;
    protected String webapp;
    protected String params;
    protected String interval;
    protected String cores;
    protected SolrDataImportProperties p;
    protected boolean singleCore;

    protected String reBuildIndexParams;
    protected String reBuildIndexBeginTime;
    protected String reBuildIndexInterval;

    protected static final Logger logger = LoggerFactory
            .getLogger(BaseTimerTask.class);

    public BaseTimerTask(String webAppName, Timer t) throws Exception {
        // load properties from the global dataimport.properties
        p = new SolrDataImportProperties();
        reloadParams();
        fixParams(webAppName);

        if (!"1".equals(syncEnabled))
            throw new Exception("Schedule disabled");

        if (syncCores == null
                || (syncCores.length == 1 && syncCores[0].isEmpty())) {
            singleCore = true;
            logger.info("<index update process> Single core identified in dataimport.properties");
        } else {
            singleCore = false;
            logger.info("<index update process> Multiple cores identified in dataimport.properties. Sync active for: "
                    + cores);
        }
    }

    protected void reloadParams() {
        p.loadProperties(true);
        syncEnabled = p.getProperty(SolrDataImportProperties.SYNC_ENABLED);
        cores = p.getProperty(SolrDataImportProperties.SYNC_CORES);
        server = p.getProperty(SolrDataImportProperties.SERVER);
        port = p.getProperty(SolrDataImportProperties.PORT);
        webapp = p.getProperty(SolrDataImportProperties.WEBAPP);
        params = p.getProperty(SolrDataImportProperties.PARAMS);
        interval = p.getProperty(SolrDataImportProperties.INTERVAL);
        syncCores = cores != null ? cores.split(",") : null;

        reBuildIndexParams = p
                .getProperty(SolrDataImportProperties.REBUILDINDEXPARAMS);
        reBuildIndexBeginTime = p
                .getProperty(SolrDataImportProperties.REBUILDINDEXBEGINTIME);
        reBuildIndexInterval = p
                .getProperty(SolrDataImportProperties.REBUILDINDEXINTERVAL);
    }

    protected void fixParams(String webAppName) {
        if (server == null || server.isEmpty())
            server = "localhost";
        if (port == null || port.isEmpty())
            port = "8080";
        if (webapp == null || webapp.isEmpty())
            webapp = webAppName;
        if (interval == null || interval.isEmpty() || getIntervalInt() <= 0)
            interval = "30";
        if (reBuildIndexBeginTime == null || reBuildIndexBeginTime.isEmpty())
            reBuildIndexBeginTime = "00:00:00";
        if (reBuildIndexInterval == null || reBuildIndexInterval.isEmpty()
                || getReBuildIndexIntervalInt() <= 0)
            reBuildIndexInterval = "0";
    }

    protected void prepUrlSendHttpPost(String params) {
        String coreUrl = "http://" + server + ":" + port + "/" + webapp
                + params;
        sendHttpPost(coreUrl, null);
    }

    protected void prepUrlSendHttpPost(String coreName, String params) {
        String coreUrl = "http://" + server + ":" + port + "/" + webapp + "/"
                + coreName + params;
        sendHttpPost(coreUrl, coreName);
    }

    protected void sendHttpPost(String completeUrl, String coreName) {
        DateFormat df = new SimpleDateFormat("dd.MM.yyyy HH:mm:ss SSS");
        Date startTime = new Date();

        // prepare the core label used in the log lines
        String core = coreName == null ? "" : "[" + coreName + "] ";

        logger.info(core
                + "<index update process> Process started at .............. "
                + df.format(startTime));

        try {
            URL url = new URL(completeUrl);
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();

            conn.setRequestMethod("POST");
            conn.setRequestProperty("type", "submit");
            conn.setDoOutput(true);

            // send the HTTP POST
            conn.connect();

            logger.info(core + "<index update process> Full URL\t\t\t\t"
                    + conn.getURL());
            logger.info(core + "<index update process> Response message\t\t\t"
                    + conn.getResponseMessage());
            logger.info(core + "<index update process> Response code\t\t\t"
                    + conn.getResponseCode());

            // reload the properties file if an error occurs
            if (conn.getResponseCode() != 200) {
                reloadParams();
            }

            conn.disconnect();
            logger.info(core
                    + "<index update process> Disconnected from server\t\t"
                    + server);
            Date endTime = new Date();
            logger.info(core
                    + "<index update process> Process ended at ................ "
                    + df.format(endTime));
        } catch (MalformedURLException mue) {
            logger.error("Failed to assemble URL for HTTP POST", mue);
        } catch (IOException ioe) {
            logger.error(
                    "Failed to connect to the specified URL while trying to send HTTP POST",
                    ioe);
        } catch (Exception e) {
            logger.error("Failed to send HTTP POST", e);
        }
    }

    public int getIntervalInt() {
        try {
            return Integer.parseInt(interval);
        } catch (NumberFormatException e) {
            logger.warn(
                    "Unable to convert 'interval' to a number. Using default value (30) instead",
                    e);
            return 30; // default in case of error
        }
    }

    public int getReBuildIndexIntervalInt() {
        try {
            return Integer.parseInt(reBuildIndexInterval);
        } catch (NumberFormatException e) {
            logger.info(
                    "Unable to convert 'reBuildIndexInterval' to a number. The index will not be rebuilt.",
                    e);
            return 0; // default in case of error
        }
    }

    public Date getReBuildIndexBeginTime() {
        Date beginDate = null;
        try {
            SimpleDateFormat sdfDate = new SimpleDateFormat("yyyy-MM-dd");
            String dateStr = sdfDate.format(new Date());
            beginDate = sdfDate.parse(dateStr);
            if (reBuildIndexBeginTime == null
                    || reBuildIndexBeginTime.isEmpty()) {
                return beginDate;
            }
            if (reBuildIndexBeginTime.matches("\\d{2}:\\d{2}:\\d{2}")) {
                SimpleDateFormat sdf = new SimpleDateFormat(
                        "yyyy-MM-dd HH:mm:ss");
                beginDate = sdf.parse(dateStr + " " + reBuildIndexBeginTime);
            } else if (reBuildIndexBeginTime
                    .matches("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}")) {
                SimpleDateFormat sdf = new SimpleDateFormat(
                        "yyyy-MM-dd HH:mm:ss");
                beginDate = sdf.parse(reBuildIndexBeginTime);
            }
            return beginDate;
        } catch (ParseException e) {
            logger.warn(
                    "Unable to convert 'reBuildIndexBeginTime' to a date. Using the current time instead.",
                    e);
            return beginDate;
        }
    }
}

DeltaImportHTTPPostScheduler.java, the scheduled delta (incremental) import task:

package org.apache.solr.handler.dataimport.scheduler;

import java.util.Timer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Task that performs a scheduled delta (incremental) index update.
 * @author zhangliang
 */
public class DeltaImportHTTPPostScheduler extends BaseTimerTask {

    private static final Logger logger = LoggerFactory
            .getLogger(DeltaImportHTTPPostScheduler.class);

    public DeltaImportHTTPPostScheduler(String webAppName, Timer t)
            throws Exception {
        super(webAppName, t);
        logger.info("<index update process> DeltaImportHTTPPostScheduler init");
    }

    public void run() {
        try {
            // check mandatory params
            if (server.isEmpty() || webapp.isEmpty() || params == null
                    || params.isEmpty()) {
                logger.warn("<index update process> Insufficient info provided for data import");
                logger.info("<index update process> Reloading global dataimport.properties");
                reloadParams();

            // single core
            } else if (singleCore) {
                prepUrlSendHttpPost(params);

            // multiple cores
            } else if (syncCores.length == 0
                    || (syncCores.length == 1 && syncCores[0].isEmpty())) {
                logger.warn("<index update process> No cores scheduled for data import");
                logger.info("<index update process> Reloading global dataimport.properties");
                reloadParams();

            } else {
                for (String core : syncCores) {
                    prepUrlSendHttpPost(core, params);
                }
            }
        } catch (Exception e) {
            logger.error("Failed to prepare for sendHttpPost", e);
            reloadParams();
        }
    }
}

FullImportHTTPPostScheduler.java, the scheduled full index rebuild task:

package org.apache.solr.handler.dataimport.scheduler;

import java.util.Timer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Task that rebuilds the index from scratch (full import).
 * @author zhangliang
 */
public class FullImportHTTPPostScheduler extends BaseTimerTask {

    private static final Logger logger = LoggerFactory
            .getLogger(FullImportHTTPPostScheduler.class);

    public FullImportHTTPPostScheduler(String webAppName, Timer t)
            throws Exception {
        super(webAppName, t);
        logger.info("<index update process> FullImportHTTPPostScheduler init");
    }

    public void run() {
        try {
            // check mandatory params
            if (server.isEmpty() || webapp.isEmpty()
                    || reBuildIndexParams == null
                    || reBuildIndexParams.isEmpty()) {
                logger.warn("<index update process> Insufficient info provided for data import, reBuildIndexParams is null");
                logger.info("<index update process> Reloading global dataimport.properties");
                reloadParams();

            // single core
            } else if (singleCore) {
                prepUrlSendHttpPost(reBuildIndexParams);

            // multiple cores
            } else if (syncCores.length == 0
                    || (syncCores.length == 1 && syncCores[0].isEmpty())) {
                logger.warn("<index update process> No cores scheduled for data import");
                logger.info("<index update process> Reloading global dataimport.properties");
                reloadParams();

            } else {
                for (String core : syncCores) {
                    prepUrlSendHttpPost(core, reBuildIndexParams);
                }
            }
        } catch (Exception e) {
            logger.error("Failed to prepare for sendHttpPost", e);
            reloadParams();
        }
    }
}

ApplicationListener.java, the listener that sets up the scheduled tasks:

package org.apache.solr.handler.dataimport.scheduler;

import java.util.Calendar;
import java.util.Date;
import java.util.Timer;

import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApplicationListener implements ServletContextListener {

    private static final Logger logger = LoggerFactory
            .getLogger(ApplicationListener.class);

    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) {
        ServletContext servletContext = servletContextEvent.getServletContext();

        // get our timers from the context
        Timer timer = (Timer) servletContext.getAttribute("timer");
        Timer fullImportTimer = (Timer) servletContext
                .getAttribute("fullImportTimer");

        // cancel all active tasks in the timer queues
        if (timer != null)
            timer.cancel();
        if (fullImportTimer != null)
            fullImportTimer.cancel();

        // remove the timers from the context
        servletContext.removeAttribute("timer");
        servletContext.removeAttribute("fullImportTimer");
    }

    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        ServletContext servletContext = servletContextEvent.getServletContext();
        try {
            // delta-import schedule:
            // create the timer and timer task objects
            Timer timer = new Timer();
            DeltaImportHTTPPostScheduler task = new DeltaImportHTTPPostScheduler(
                    servletContext.getServletContextName(), timer);

            // get our interval from the scheduler task
            int interval = task.getIntervalInt();

            // get a calendar to set the start time (first run)
            Calendar calendar = Calendar.getInstance();

            // set the first run to now + interval (to avoid firing while the
            // app/server is starting)
            calendar.add(Calendar.MINUTE, interval);
            Date startTime = calendar.getTime();

            // schedule the task
            timer.scheduleAtFixedRate(task, startTime, 1000 * 60 * interval);

            // save the timer in the context
            servletContext.setAttribute("timer", timer);

            // full-import (index rebuild) schedule
            Timer fullImportTimer = new Timer();
            FullImportHTTPPostScheduler fullImportTask = new FullImportHTTPPostScheduler(
                    servletContext.getServletContextName(), fullImportTimer);

            int reBuildIndexInterval = fullImportTask
                    .getReBuildIndexIntervalInt();
            if (reBuildIndexInterval <= 0) {
                logger.warn("Full Import Schedule disabled");
                return;
            }

            Calendar fullImportCalendar = Calendar.getInstance();
            Date beginDate = fullImportTask.getReBuildIndexBeginTime();
            fullImportCalendar.setTime(beginDate);
            fullImportCalendar.add(Calendar.MINUTE, reBuildIndexInterval);
            Date fullImportStartTime = fullImportCalendar.getTime();

            // schedule the task
            fullImportTimer.scheduleAtFixedRate(fullImportTask,
                    fullImportStartTime, 1000 * 60 * reBuildIndexInterval);

            // save the timer in the context
            servletContext.setAttribute("fullImportTimer", fullImportTimer);

        } catch (Exception e) {
            if (e.getMessage() != null && e.getMessage().endsWith("disabled")) {
                logger.warn("Schedule disabled");
            } else {
                logger.error("Problem initializing the scheduled task: ", e);
            }
        }
    }
}

 

Usage

1. Package the compiled classes above into apache-solr-dataimportscheduler-1.0.jar, then place it, together with Solr's own apache-solr-dataimporthandler-*.jar and apache-solr-dataimporthandler-extras-*.jar, into the lib directory of solr.war.

2. Edit WEB-INF/web.xml inside solr.war and add the following before the servlet element:

<listener>
    <listener-class>
        org.apache.solr.handler.dataimport.scheduler.ApplicationListener
    </listener-class>
</listener>

3. Extract dataimport.properties from apache-solr-dataimportscheduler-1.0.jar, adjust it for your environment, and place it under solr.home/conf (not solr.home/core/conf).

4. Restart Tomcat or JBoss.

dataimport.properties configuration reference

#################################################
#                                               #
#        dataimport scheduler properties        #
#                                               #
#################################################

# to sync or not to sync
# 1 - active; anything else - inactive
syncEnabled=1

# which cores to schedule
# in a multi-core environment you can decide which cores you want synchronized
# leave empty or comment it out if using single-core deployment
syncCores=core1,core2

# solr server name or IP address
# [defaults to localhost if empty]
server=localhost

# solr server port
# [defaults to 8080 if empty]
port=8080

# application name/context
# [defaults to current ServletContextListener's context (app) name]
webapp=solr

# URL params [mandatory]
# remainder of URL
params=/dataimport?command=delta-import&clean=false&commit=true

# schedule interval
# number of minutes between two runs
# [defaults to 30 if empty]
interval=1

# interval between index rebuilds, in minutes; default 7200 (i.e. 5 days)
# empty, 0, or commented out: never rebuild the index
reBuildIndexInterval=7200

# parameters for rebuilding the index
reBuildIndexParams=/dataimport?command=full-import&clean=true&commit=true

# start time for the rebuild interval timer; the first actual run happens at
# reBuildIndexBeginTime + reBuildIndexInterval*60*1000
# two formats are accepted: 2012-04-11 03:10:00 or 03:10:00; with the latter,
# the date part is filled in with the date the service was started
reBuildIndexBeginTime=03:10:00
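As a sanity check on the arithmetic above, here is a small hypothetical helper (not part of the scheduler jar) that mirrors the Calendar math ApplicationListener uses to compute the first rebuild run; the class and method names are mine:

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;

public class RebuildStartTimeDemo {
    // first run = reBuildIndexBeginTime + reBuildIndexInterval (minutes),
    // the same Calendar arithmetic used in ApplicationListener
    static String firstRun(String beginTime, int intervalMinutes) throws Exception {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Calendar c = Calendar.getInstance();
        c.setTime(sdf.parse(beginTime));
        c.add(Calendar.MINUTE, intervalMinutes);
        return sdf.format(c.getTime());
    }

    public static void main(String[] args) throws Exception {
        // with the default 7200-minute interval (120 hours, i.e. 5 days),
        // a begin time of 2012-04-11 03:10:00 puts the first rebuild
        // at 2012-04-16 03:10:00
        System.out.println(firstRun("2012-04-11 03:10:00", 7200));
    }
}
```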

Reposted from: http://www.cnblogs.com/ezhangliang/archive/2012/04/11/2441945
