知识点
相关文章
更多最近更新
更多深入剖析SolrCloud(四)
2019-03-27 01:05|来源: 网路
在上一篇中介绍了连接Zookeeper集群的方法,这一篇将围绕一个有趣的话题---来展开,这就是Replication(索引复制),关于Solr Replication的详细介绍,可以参考http://wiki.apache.org/solr/SolrReplication。
在开始这个话题之前,先从我最近在应用中引入solr的master/slave架构时,遇到的一个让我困扰的实际问题。
应用场景简单描述如下:
1)首先master节点下载索引分片,然后创建配置文件,加入master节点的replication配置片段,再对索引分片进行合并(关于mergeIndex,可以参考http://wiki.apache.org/solr/MergingSolrIndexes),然后利用上述配置文件和索引数据去创建一个solr核。
2)slave节点创建配置文件,加入slave节点的replication配置片段,创建一个空的solr核,等待从master节点进行索引数据同步
出现的问题:slave节点没有从master节点同步到数据。
问题分析:
1)首先检查master节点,获取最新的可复制索引的版本号,
http://master_host:port/solr/replication?command=indexversion
发现返回的索引版本号是0,这说明mater节点根本没有触发replication动作,
2)为了确认上述判断,在slave节点上进一步查看replication的详细信息
http://slave_host:port/solr/replication?command=details
发现确实如此,尽管master节点的索引版本号和slave节点的索引版本号不一致,但索引却没有同步过来,再分别查看master节点和slave节点的日志,发现索引复制动作确实没有开始。
综上所述,确实是master节点没有触发索引复制动作,那究竟是为何呢?先将原因摆出来,后面会通过源码的分析来加以说明。
原因:solr合并索引时,不管你是通过mergeindexes的http命令,还是调用底层lucene的IndexWriter,记得最后一定要提交一个commit,否则,不仅索引不仅不会对查询可见,更是对于master/slave架构的solr集群来说,master节点的replication动作不会触发,因为indexversion没有感知到变化。
好了,下面开始对Solr的Replication的分析。
Solr容器在加载solr核的时候,会对已经注册的各个实现SolrCoreAware接口的Handler进行回调,调用其inform方法。
对于ReplicationHandler来说,就是在这里对自己是属于master节点还是slave节点进行判断,若是slave节点,则创建一个SnapPuller对象,定时负责从master节点主动拉索引数据下来;若是master节点,则只设置相应的参数。
this.core = core;
registerFileStreamResponseWriter();
registerCloseHook();
NamedList slave = (NamedList) initArgs.get("slave");
boolean enableSlave = isEnabled( slave );
if (enableSlave) {
tempSnapPuller = snapPuller = new SnapPuller(slave, this, core);
isSlave = true;
}
NamedList master = (NamedList) initArgs.get("master");
boolean enableMaster = isEnabled( master );
if (!enableSlave && !enableMaster) {
enableMaster = true;
master = new NamedList<Object>();
}
if (enableMaster) {
includeConfFiles = (String) master.get(CONF_FILES);
if (includeConfFiles != null && includeConfFiles.trim().length() > 0) {
List<String> files = Arrays.asList(includeConfFiles.split(","));
for (String file : files) {
if (file.trim().length() == 0) continue;
String[] strs = file.split(":");
// if there is an alias add it or it is null
confFileNameAlias.add(strs[0], strs.length > 1 ? strs[1] : null);
}
LOG.info("Replication enabled for following config files: " + includeConfFiles);
}
List backup = master.getAll("backupAfter");
boolean backupOnCommit = backup.contains("commit");
boolean backupOnOptimize = !backupOnCommit && backup.contains("optimize");
List replicateAfter = master.getAll(REPLICATE_AFTER);
replicateOnCommit = replicateAfter.contains("commit");
replicateOnOptimize = !replicateOnCommit && replicateAfter.contains("optimize");
if (!replicateOnCommit && ! replicateOnOptimize) {
replicateOnCommit = true;
}
// if we only want to replicate on optimize, we need the deletion policy to
// save the last optimized commit point.
if (replicateOnOptimize) {
IndexDeletionPolicyWrapper wrapper = core.getDeletionPolicy();
IndexDeletionPolicy policy = wrapper == null ? null : wrapper.getWrappedDeletionPolicy();
if (policy instanceof SolrDeletionPolicy) {
SolrDeletionPolicy solrPolicy = (SolrDeletionPolicy)policy;
if (solrPolicy.getMaxOptimizedCommitsToKeep() < 1) {
solrPolicy.setMaxOptimizedCommitsToKeep(1);
}
} else {
LOG.warn("Replication can't call setMaxOptimizedCommitsToKeep on " + policy);
}
}
if (replicateOnOptimize || backupOnOptimize) {
core.getUpdateHandler().registerOptimizeCallback(getEventListener(backupOnOptimize, replicateOnOptimize));
}
if (replicateOnCommit || backupOnCommit) {
replicateOnCommit = true;
core.getUpdateHandler().registerCommitCallback(getEventListener(backupOnCommit, replicateOnCommit));
}
if (replicateAfter.contains("startup")) {
replicateOnStart = true;
RefCounted<SolrIndexSearcher> s = core.getNewestSearcher( false);
try {
DirectoryReader reader = s== null ? null : s.get().getIndexReader();
if (reader!= null && reader.getIndexCommit() != null && reader.getIndexCommit().getGeneration() != 1L) {
try {
if(replicateOnOptimize){
Collection<IndexCommit> commits = DirectoryReader.listCommits(reader.directory());
for (IndexCommit ic : commits) {
if(ic.getSegmentCount() == 1){
if(indexCommitPoint == null || indexCommitPoint.getGeneration() < ic.getGeneration()) indexCommitPoint = ic;
}
}
} else{
indexCommitPoint = reader.getIndexCommit();
}
} finally {
// We don't need to save commit points for replication, the SolrDeletionPolicy
// always saves the last commit point (and the last optimized commit point, if needed)
/** *
if(indexCommitPoint != null){
core.getDeletionPolicy().saveCommitPoint(indexCommitPoint.getGeneration());
}
** */
}
}
// reboot the writer on the new index
core.getUpdateHandler().newIndexWriter();
} catch (IOException e) {
LOG.warn("Unable to get IndexCommit on startup", e);
} finally {
if (s!= null) s.decref();
}
}
String reserve = (String) master.get(RESERVE);
if (reserve != null && !reserve.trim().equals("")) {
reserveCommitDuration = SnapPuller.readInterval(reserve);
}
LOG.info("Commits will be reserved for " + reserveCommitDuration);
isMaster = true;
}
}
ReplicationHandler可以响应多种命令:
1) indexversion。
这里需要了解的第一个概念是索引提交点(IndexCommit),这是底层lucene的东西,可以自行查阅资料。首先获取最新的索引提交点,然后从其中获取索引版本号和索引所属代。
if (commitPoint != null && replicationEnabled.get()) {
core.getDeletionPolicy().setReserveDuration(commitPoint.getVersion(), reserveCommitDuration);
rsp.add(CMD_INDEX_VERSION, commitPoint.getVersion());
rsp.add(GENERATION, commitPoint.getGeneration());
2)backup。这个命令用来对索引做快照。首先获取最新的索引提交点,然后创建做一个SnapShooter,具体的快照动作由这个对象完成,
try {
int numberToKeep = params.getInt(NUMBER_BACKUPS_TO_KEEP, Integer.MAX_VALUE);
IndexDeletionPolicyWrapper delPolicy = core.getDeletionPolicy();
IndexCommit indexCommit = delPolicy.getLatestCommit();
if(indexCommit == null) {
indexCommit = req.getSearcher().getReader().getIndexCommit();
}
// small race here before the commit point is saved
new SnapShooter(core, params.get("location")).createSnapAsync(indexCommit, numberToKeep, this);
} catch (Exception e) {
LOG.warn("Exception during creating a snapshot", e);
rsp.add("exception", e);
}
}
快照对象会启动一个线程去异步地做一个索引备份。
void createSnapAsync(final IndexCommit indexCommit, final int numberToKeep, final ReplicationHandler replicationHandler) {
replicationHandler.core.getDeletionPolicy().saveCommitPoint(indexCommit.getVersion());
new Thread() {
@Override
public void run() {
createSnapshot(indexCommit, numberToKeep, replicationHandler);
}
}.start();
}
void createSnapshot(final IndexCommit indexCommit, int numberToKeep, ReplicationHandler replicationHandler) {
NamedList details = new NamedList();
details.add("startTime", new Date().toString());
File snapShotDir = null;
String directoryName = null;
Lock lock = null;
try {
if(numberToKeep<Integer.MAX_VALUE) {
deleteOldBackups(numberToKeep);
}
SimpleDateFormat fmt = new SimpleDateFormat(DATE_FMT, Locale.US);
directoryName = "snapshot." + fmt.format(new Date());
lock = lockFactory.makeLock(directoryName + ".lock");
if (lock.isLocked()) return;
snapShotDir = new File(snapDir, directoryName);
if (!snapShotDir.mkdir()) {
LOG.warn("Unable to create snapshot directory: " + snapShotDir.getAbsolutePath());
return;
}
Collection<String> files = indexCommit.getFileNames();
FileCopier fileCopier = new FileCopier(solrCore.getDeletionPolicy(), indexCommit);
fileCopier.copyFiles(files, snapShotDir);
details.add("fileCount", files.size());
details.add("status", "success");
details.add("snapshotCompletedAt", new Date().toString());
} catch (Exception e) {
SnapPuller.delTree(snapShotDir);
LOG.error("Exception while creating snapshot", e);
details.add("snapShootException", e.getMessage());
} finally {
replicationHandler.core.getDeletionPolicy().releaseCommitPoint(indexCommit.getVersion());
replicationHandler.snapShootDetails = details;
if (lock != null) {
try {
lock.release();
} catch (IOException e) {
LOG.error("Unable to release snapshoot lock: " + directoryName + ".lock");
}
}
}
}
3)fetchindex。响应来自slave节点的取索引文件的请求,会启动一个线程来实现索引文件的获取。
String masterUrl = solrParams.get(MASTER_URL);
if (!isSlave && masterUrl == null) {
rsp.add(STATUS,ERR_STATUS);
rsp.add("message","No slave configured or no 'masterUrl' Specified");
return;
}
final SolrParams paramsCopy = new ModifiableSolrParams(solrParams);
new Thread() {
@Override
public void run() {
doFetch(paramsCopy);
}
}.start();
rsp.add(STATUS, OK_STATUS);
具体的获取动作是通过SnapPuller对象来实现的,首先尝试获取pull对象锁,如果请求锁失败,则说明还有取索引数据动作未结束,如果请求锁成功,就调用SnapPuller对象的fetchLatestIndex方法来取最新的索引数据。
void doFetch(SolrParams solrParams) {
String masterUrl = solrParams == null ? null : solrParams.get(MASTER_URL);
if (!snapPullLock.tryLock())
return;
try {
tempSnapPuller = snapPuller;
if (masterUrl != null) {
NamedList<Object> nl = solrParams.toNamedList();
nl.remove(SnapPuller.POLL_INTERVAL);
tempSnapPuller = new SnapPuller(nl, this, core);
}
tempSnapPuller.fetchLatestIndex(core);
} catch (Exception e) {
LOG.error("SnapPull failed ", e);
} finally {
tempSnapPuller = snapPuller;
snapPullLock.unlock();
}
}
最后真正的取索引数据过程,首先,若mastet节点的indexversion为0,则说明master节点根本没有提供可供复制的索引数据,若master节点和slave节点的indexversion相同,则说明slave节点目前与master节点索引数据状态保持一致,无需同步。若两者的indexversion不同,则开始索引复制过程,首先从master节点上下载指定索引版本号的索引文件列表,然后创建一个索引文件同步服务线程来完成同并工作。
这里需要区分的是,如果master节点的年代比slave节点要老,那就说明两者已经不相容,此时slave节点需要新建一个索引目录,再从master节点做一次全量索引复制。还需要注意的一点是,索引同步也是可以同步配置文件的,若配置文件发生变化,则需要对solr核进行一次reload操作。最对了,还有,和文章开头一样, slave节点同步完数据后,别忘了做一次commit操作,以便刷新自己的索引提交点到最新的状态。最后,关闭并等待同步服务线程结束。此外,具体的取索引文件是通过FileFetcher对象来完成。
boolean fetchLatestIndex(SolrCore core) throws IOException {
replicationStartTime = System.currentTimeMillis();
try {
//get the current 'replicateable' index version in the master
NamedList response = null;
try {
response = getLatestVersion();
} catch (Exception e) {
LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());
return false;
}
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long latestGeneration = (Long) response.get(GENERATION);
if (latestVersion == 0L) {
//there is nothing to be replicated
return false;
}
IndexCommit commit;
RefCounted<SolrIndexSearcher> searcherRefCounted = null;
try {
searcherRefCounted = core.getNewestSearcher(false);
commit = searcherRefCounted.get().getReader().getIndexCommit();
} finally {
if (searcherRefCounted != null)
searcherRefCounted.decref();
}
if (commit.getVersion() == latestVersion && commit.getGeneration() == latestGeneration) {
//master and slave are alsready in sync just return
LOG.info("Slave in sync with master.");
return false;
}
LOG.info("Master's version: " + latestVersion + ", generation: " + latestGeneration);
LOG.info("Slave's version: " + commit.getVersion() + ", generation: " + commit.getGeneration());
LOG.info("Starting replication process");
// get the list of files first
fetchFileList(latestVersion);
// this can happen if the commit point is deleted before we fetch the file list.
if(filesToDownload.isEmpty()) return false;
LOG.info("Number of files in latest index in master: " + filesToDownload.size());
// Create the sync service
fsyncService = Executors.newSingleThreadExecutor();
// use a synchronized list because the list is read by other threads (to show details)
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
// if the generateion of master is older than that of the slave , it means they are not compatible to be copied
// then a new index direcory to be created and all the files need to be copied
boolean isFullCopyNeeded = commit.getGeneration() >= latestGeneration;
File tmpIndexDir = createTempindexDir(core);
if (isIndexStale())
isFullCopyNeeded = true;
successfulInstall = false;
boolean deleteTmpIdxDir = true;
File indexDir = null ;
try {
indexDir = new File(core.getIndexDir());
downloadIndexFiles(isFullCopyNeeded, tmpIndexDir, latestVersion);
LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");
Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
if (!modifiedConfFiles.isEmpty()) {
downloadConfFiles(confFilesToDownload, latestVersion);
if (isFullCopyNeeded) {
successfulInstall = modifyIndexProps(tmpIndexDir.getName());
deleteTmpIdxDir = false;
} else {
successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);
}
if (successfulInstall) {
LOG.info("Configuration files are modified, core will be reloaded");
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);//write to a file time of replication and conf files.
reloadCore();
}
} else {
terminateAndWaitFsyncService();
if (isFullCopyNeeded) {
successfulInstall = modifyIndexProps(tmpIndexDir.getName());
deleteTmpIdxDir = false;
} else {
successfulInstall = copyIndexFiles(tmpIndexDir, indexDir);
}
if (successfulInstall) {
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
doCommit();
}
}
replicationStartTime = 0;
return successfulInstall;
} catch (ReplicationHandlerException e) {
LOG.error("User aborted Replication");
} catch (SolrException e) {
throw e;
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);
} finally {
if (deleteTmpIdxDir) delTree(tmpIndexDir);
else delTree(indexDir);
}
return successfulInstall;
} finally {
if (!successfulInstall) {
logReplicationTimeAndConfFiles(null, successfulInstall);
}
filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
replicationStartTime = 0;
fileFetcher = null;
if (fsyncService != null && !fsyncService.isShutdown()) fsyncService.shutdownNow();
fsyncService = null;
stop = false;
fsyncException = null;
}
}
转自:http://www.cnblogs.com/phinecos/archive/2012/02/29/2372682
相关问答
更多-
为什么要使用solrcloud[2023-01-30]
现在我们来剖析下这样一个简单的集群构建的基本流程: 先从第一台solr服务器说起: 1) 它首先启动一个嵌入式的Zookeeper服务器,作为集群状态信息的管理者, 2) 将自己这个节点注册到/node_states/目录下 3) 同时将自己注册到/live_nodes/目录下 4)创建/overseer_elect/leader,为后续Overseer节点的选举做准备,新建一个Overseer, 5) 更新/clusterstate.json目录下json格式的集群状态信息 6) 本机从Zookeeper ... -
javaweb怎么用solrcloud[2022-06-01]
1.搭建tomcat单机版solr 见我的上一篇博文solr安装-tomcat单机版 2.安装配置zookeeper 具体下载,安装,配置不详细说了,很简单。只列出我的关键配置项,如下: clientPort=4181 server.2=hadoop.datanode5.com:2888:9888 server.3=hadoop.datanode2.com:2888:9888 server.1=hadoop.datanode3.com:2888:9888 3. 配置solrCloud (1)先配置主服务器t ... -
Solrcloud多核配置(Solrcloud multicore configuration)[2022-08-22]
在SolrCloud中,您的每个Core都将成为Collection 。 每个Collection都有自己的一组配置文件和数据。 您可能会发现这个有用的移动多核SOLR实例到云 Solr 5.0(以后)对如何使用分片创建SolrCloud设置以及如何添加集合等进行了一些更改。 下面列出的所有内容都是我对Solr参考指南的理解。 我强烈建议彻底完成它。 https://cwiki.apache.org/confluence/display/solr/Apache+Solr+Reference+Guide 我在 ... -
Solr(主要)用于存储和搜索,Hadoop(主要)用于分布式处理。 他们解决不同的问题。 最常见的是使用Solr和HDFS来存储/加载其索引文件 ,以便使用HDFS集群中的现有功能,或者允许通过Solr搜索已处理的Hadoop结果 。 如果您在Google上进行一些搜索,您会发现很多用例,演示文稿和库,例如LucidWorks的Hadoop集成 , Solr + Hadoop或Hortonworks的索引以及在Apache Solr中搜索数据 。 Solr is (mainly) for storage ...
-
如何使用Spring Data Solr解决SolrCloud的问题?(How to Get Clusterstate of SolrCloud with Spring Data Solr?)[2023-12-30]
我用过: CloudSolrServer solrServer; ... solrServer.getZkStateReader().getClusterState(); ... I used that: CloudSolrServer solrServer; ... solrServer.getZkStateReader().getClusterState(); ... -
负载平衡器为solrcloud(Load balancer for solrcloud)[2021-10-14]
任何负载平衡器都可以。 我们使用nginx没有麻烦。 如果你需要类似于SolrJ的东西,没有什么能阻止你写一些.Net代码来查询zookeeper关于不同集合/分片的状态等。你把它放在一个库中,并且可以透明地从你的.Net代码中使用它。 作为奖励,您可以减少Solr节点之间的往返次数,因为您可以找出哪些是文档需要的分片,并始终将更新发送给分片领导。 Any load balancer would do. We are using nginx without trouble. If you need some ... -
SolrCloud - 内存不足(SolrCloud - Out of Memory)[2023-02-25]
最好的方法应该是将solr云升级到版本6.2.1。 如果节点是32位拱,它还取决于节点架构。 那么如果节点是64位arch,那么堆大小超过2gb就不会工作,你可以分配更多的堆大小但是会产生gc开销错误。 所以最好更新solr并添加更多分片和副本以避免错误。 The best approach should be to upgrade the solr cloud to version 6.2.1. it also depends on the node architecture if the node is ... -
此补丁是在旧的SwitchOnRebuild使用Solr Core切换API时创建的,现在已在版本5 *及更高版本中弃用。 由于Zookeeper存在问题,因此不建议在SolrCloud模式下运行。 此代码使用Solr'集合'API( /solr/admin/collections?action=LIST ),您需要检查此API是否可用于Solr 4.10(我认为它是但我不是100%确定) 然后,您只需要忽略有关schemaFactory的部分,因为它是Solr 5. *特定的。 请注意,此修补程序依赖于要 ...
-
我将尝试回答您的部分问题:SolrCloud确实在所有节点上编制索引,因此它会对副本产生性能影响。 这是由于“热复制”模型而不是您习惯的“冷复制”而完成的。 它解决了数据完整性问题以及群集上的实时搜索。 作为性能影响的代价,您可以获得一致的数据和更快的数据可用性。 实际上,您始终可以将数据拆分为分片(以额外硬件的价格),并具有相当的性能。 在任何一种情况下,由您决定SolrCloud是否适合您的需求。 您可以在没有云模型的情况下使用Solr 4,并像以前一样自行管理。 I'll try to answer ...
-
Solr 4.x和SolrCloud(Solr 4.x and SolrCloud)[2023-11-06]
云功能是Solr的一部分。 对于生产,您需要将ZooKeeper作为单独的应用程序(它也可以作为Solr的一部分运行,但不建议在生产中执行此操作) https://cwiki.apache.org/confluence/display/solr/SolrCloud Cloud features are a part of Solr. For production you would need to have ZooKeeper as separate application (it's can be run ...