iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >elasticsearch索引创建create index集群matedata更新的方法
  • 364
分享到

elasticsearch索引创建create index集群matedata更新的方法

2023-06-30 08:06:34 364人浏览 薄情痞子
摘要

本文小编为大家详细介绍“elasticsearch索引创建create index集群matedata更新的方法”,内容详细,步骤清晰,细节处理妥当,希望这篇“elasticsearch索引创建create index集

本文小编为大家详细介绍“elasticsearch索引创建create index集群matedata更新的方法”,内容详细,步骤清晰,细节处理妥当,希望这篇“elasticsearch索引创建create index集群matedata更新的方法”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。

    更新集群index matedata

    创建索引需要创建索引并且更新集群index matedata,这一过程在MetaDataCreateIndexService的createIndex方法中完成。这里会提交一个高优先级,AckedClusterStateUpdateTask类型的task。索引创建需要即时得到反馈,异常这个task需要返回,会超时,而且这个任务的优先级也非常高。

    下面具体看一下它的execute方法,这个方法会在master执行任务时调用,这个方法非常长,主要完成以下三个功能:更新合并request,template中的mapping和setting,调用indiceService创建索引,对创建后的索引添加mapping。这一系列功能完成后,合并完成后生成新的matedata,并更新集群状态,完成了索引的创建。

    首先创建index的create方法

    代码如下所示:

    @Override            public ClusterState execute(ClusterState currentState) throws Exception {                boolean indexCreated = false;                String removalReason = null;                try {            //检查request的合法性,1.5版本主要检查index名字是否合法,如不能含有某些字符,另外就是集群节点版本                    validate(request, currentState);                    for (Alias alias : request.aliases()) {//检查别称是否合法                        aliasValidator.validateAlias(alias, request.index(), currentState.metaData());                    }                    // 查找索引模板                    List<IndexTemplateMetaData> templates = findTemplates(request, currentState, indexTemplateFilter);                    Map<String, Custom> customs = Maps.newHashMap();                    // add the request mapping                    Map<String, Map<String, Object>> mappings = Maps.newHashMap();                    Map<String, AliasMetaData> templatesAliases = Maps.newHashMap();                    List<String> templateNames = Lists.newArrayList();            //取出request中的mapping配置,虽然mapping可以后面添加,多数情况创建索引的时候还是会附带着mapping,在request中mapping是一个map                    for (Map.Entry<String, String> entry : request.mappings().entrySet()) {                        mappings.put(entry.geTKEy(), parseMapping(entry.getValue()));                    }            //一些预设如warm等                    for (Map.Entry<String, Custom> entry : request.customs().entrySet()) {                        customs.put(entry.getKey(), entry.getValue());                    }                    // 将找到的template和request中的mapping合并                    for (IndexTemplateMetaData template : templates) {                        templateNames.add(template.getName());                        for (ObjectObjectCursor<String, CompressedString> cursor : template.mappings()) {                            if (mappings.containsKey(cursor.key)) {                                XContentHelper.mergeDefaults(mappings.get(cursor.key), parseMapping(cursor.value.string()));                            } else {                                mappings.put(cursor.key, parseMapping(cursor.value.string()));                            }                        }                        // 合并custom                        for (ObjectObjectCursor<String, Custom> cursor : template.customs()) {                            String type = cursor.key;                            IndexMetaData.Custom custom = cursor.value;                            IndexMetaData.Custom existing = customs.get(type);                            if (existing == null) {                                customs.put(type, custom);                            } else {                                IndexMetaData.Custom merged = IndexMetaData.lookupFactorySafe(type).merge(existing, custom);                                customs.put(type, merged);                            }                        }                        //处理合并别名                        for (ObjectObjectCursor<String, AliasMetaData> cursor : template.aliases()) {                            AliasMetaData aliasMetaData = cursor.value;                            //if an alias with same name came with the create index request itself,                            // ignore this one taken from the index template                            if (request.aliases().contains(new Alias(aliasMetaData.alias()))) {                                continue;                            }                            //if an alias with same name was already processed, ignore this one                            if (templatesAliases.containsKey(cursor.key)) {                                continue;                            }                            //Allow templatesAliases to be templated by replacing a token with the name of the index that we are applying it to                            if (aliasMetaData.alias().contains("{index}")) {                                String templatedAlias = aliasMetaData.alias().replace("{index}", request.index());                                aliasMetaData = AliasMetaData.newAliasMetaData(aliasMetaData, templatedAlias);                            }                            aliasValidator.validateAliasMetaData(aliasMetaData, request.index(), currentState.metaData());                            templatesAliases.put(aliasMetaData.alias(), aliasMetaData);                        }                    }                    // 合并完template和request,现在开始处理配置基本的mapping,合并逻辑跟之前相同,只是mapping来源不同                    File mappingsDir = new File(environment.configFile(), "mappings");                    if (mappingsDir.isDirectory()) {                        // first index level                        File indexMappingsDir = new File(mappingsDir, request.index());                        if (indexMappingsDir.isDirectory()) {                            addMappings(mappings, indexMappingsDir);                        }                        // second is the _default mapping                        File defaultMappingsDir = new File(mappingsDir, "_default");                        if (defaultMappingsDir.isDirectory()) {                            addMappings(mappings, defaultMappingsDir);                        }                    }            //处理index的配置(setting)                    ImmutableSettings.Builder indexSettingsBuilder = settingsBuilder();                    //加入模板中的setting                    for (int i = templates.size() - 1; i >= 0; i--) {                        indexSettingsBuilder.put(templates.get(i).settings());                    }                    // 加入request中的mapping,request中设置会覆盖模板中的设置                    indexSettingsBuilder.put(request.settings());            //处理shard,shard数量不能小于1,因此这里需要特殊处理,如果没有则要使用默认值                    if (request.index().equals(ScriptService.SCRIPT_INDEX)) {                        indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));                    } else {                        if (indexSettingsBuilder.get(SETTING_NUMBER_OF_SHARDS) == null) {                            if (request.index().equals(riverIndexName)) {                                indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 1));                            } else {                                indexSettingsBuilder.put(SETTING_NUMBER_OF_SHARDS, settings.getAsInt(SETTING_NUMBER_OF_SHARDS, 5));                            }                        }                    }                    if (request.index().equals(ScriptService.SCRIPT_INDEX)) {                        indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 0));                        indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, "0-all");                    }                    else {                        if (indexSettingsBuilder.get(SETTING_NUMBER_OF_REPLICAS) == null) {                            if (request.index().equals(riverIndexName)) {                                indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));                            } else {                                indexSettingsBuilder.put(SETTING_NUMBER_OF_REPLICAS, settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, 1));                            }                        }                    }            //处理副本                    if (settings.get(SETTING_AUTO_EXPAND_REPLICAS) != null && indexSettingsBuilder.get(SETTING_AUTO_EXPAND_REPLICAS) == null) {                        indexSettingsBuilder.put(SETTING_AUTO_EXPAND_REPLICAS, settings.get(SETTING_AUTO_EXPAND_REPLICAS));                    }                    if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) {                        Discoverynodes nodes = currentState.nodes();                        final Version createdVersion = Version.smallest(version, nodes.smallestNonClientNodeVersion());                        indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);                    }                    if (indexSettingsBuilder.get(SETTING_CREATION_DATE) == null) {                        indexSettingsBuilder.put(SETTING_CREATION_DATE, System.currentTimeMillis());                    }                    indexSettingsBuilder.put(SETTING_UUID, Strings.randomBase64UUID());            //创建setting                    Settings actualIndexSettings = indexSettingsBuilder.build();            // 通过indiceservice创建索引                    indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id());                    indexCreated = true;                    //如果创建成功这里就可以获取到对应的indexservice,否则会抛出异常                    IndexService indexService = indicesService.indexServiceSafe(request.index());            //获取mappingService试图放置mapping                    MapperService mapperService = indexService.mapperService();                    // 为索引添加mapping,首先是默认mapping                    if (mappings.containsKey(MapperService.DEFAULT_MAPPING)) {                        try {                            mapperService.merge(MapperService.DEFAULT_MAPPING, new CompressedString(XContentFactory.JSONBuilder().map(mappings.get(MapperService.DEFAULT_MAPPING)).string()), false);                        } catch (Exception e) {                            removalReason = "failed on parsing default mapping on index creation";                            throw new MapperParsingException("mapping [" + MapperService.DEFAULT_MAPPING + "]", e);                        }                    }                    for (Map.Entry<String, Map<String, Object>> entry : mappings.entrySet()) {                        if (entry.getKey().equals(MapperService.DEFAULT_MAPPING)) {                            continue;                        }                        try {                            // apply the default here, its the first time we parse it                            mapperService.merge(entry.getKey(), new CompressedString(XContentFactory.jsonBuilder().map(entry.getValue()).string()), true);                        } catch (Exception e) {                            removalReason = "failed on parsing mappings on index creation";                            throw new MapperParsingException("mapping [" + entry.getKey() + "]", e);                        }                    }            //添加request中的别称                    IndexQueryParserService indexQueryParserService = indexService.queryParserService();                    for (Alias alias : request.aliases()) {                        if (Strings.hasLength(alias.filter())) {                            aliasValidator.validateAliasFilter(alias.name(), alias.filter(), indexQueryParserService);                        }                    }                    for (AliasMetaData aliasMetaData : templatesAliases.values()) {                        if (aliasMetaData.filter() != null) {                            aliasValidator.validateAliasFilter(aliasMetaData.alias(), aliasMetaData.filter().uncompressed(), indexQueryParserService);                        }                    }                    // 以下更新Index的matedata,                    Map<String, MappingMetaData> mappingsMetaData = Maps.newHashMap();                    for (DocumentMapper mapper : mapperService.docMappers(true)) {                        MappingMetaData mappingMd = new MappingMetaData(mapper);                        mappingsMetaData.put(mapper.type(), mappingMd);                    }                    final IndexMetaData.Builder indexMetaDataBuilder = IndexMetaData.builder(request.index()).settings(actualIndexSettings);                    for (MappingMetaData mappingMd : mappingsMetaData.values()) {                        indexMetaDataBuilder.putMapping(mappingMd);                    }                    for (AliasMetaData aliasMetaData : templatesAliases.values()) {                        indexMetaDataBuilder.putAlias(aliasMetaData);                    }                    for (Alias alias : request.aliases()) {                        AliasMetaData aliasMetaData = AliasMetaData.builder(alias.name()).filter(alias.filter())                                .indexRouting(alias.indexRouting()).searchRouting(alias.searchRouting()).build();                        indexMetaDataBuilder.putAlias(aliasMetaData);                    }                    for (Map.Entry<String, Custom> customEntry : customs.entrySet()) {                        indexMetaDataBuilder.putCustom(customEntry.getKey(), customEntry.getValue());                    }                    indexMetaDataBuilder.state(request.state());            //matedata更新完毕,build新的matedata                    final IndexMetaData indexMetaData;                    try {                        indexMetaData = indexMetaDataBuilder.build();                    } catch (Exception e) {                        removalReason = "failed to build index metadata";                        throw e;                    }                    indexService.indicesLifecycle().beforeIndexAddedToCluster(new Index(request.index()),                            indexMetaData.settings());            //更新集群的matedata,将新build的indexmatadata加入到metadata中                    MetaData newMetaData = MetaData.builder(currentState.metaData())                            .put(indexMetaData, false)                            .build();                    logger.info("[{}] creating index, cause [{}], templates {}, shards [{}]/[{}], mappings {}", request.index(), request.cause(), templateNames, indexMetaData.numberOfShards(), indexMetaData.numberOfReplicas(), mappings.keySet());            //阻塞集群,更新matadata                    ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());                    if (!request.blocks().isEmpty()) {                        for (ClusterBlock block : request.blocks()) {                            blocks.addIndexBlock(request.index(), block);                        }                    }                    if (request.state() == State.CLOSE) {                        blocks.addIndexBlock(request.index(), MetaDataIndexStateService.INDEX_CLOSED_BLOCK);                    }                    ClusterState updatedState = ClusterState.builder(currentState).blocks(blocks).metaData(newMetaData).build();                    if (request.state() == State.OPEN) {                        RoutingTable.Builder routingTableBuilder = RoutingTable.builder(updatedState.routingTable())                                .addAsNew(updatedState.metaData().index(request.index()));                        RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(updatedState).routingTable(routingTableBuilder).build());                        updatedState = ClusterState.builder(updatedState).routingResult(routingResult).build();                    }                    removalReason = "cleaning up after validating index on master";                    return updatedState;                } finally {                    if (indexCreated) {                        // Index was already partially created - need to clean up                        indicesService.removeIndex(request.index(), removalReason != null ? removalReason : "failed to create index");                    }                }            }        });    }

     以上就是创建index的create方法,方法中主要进行了两个动作:合并更新index的matadata和创建index。更新合并matadata的过程都在上面的代码中体现了。

    从indice中获取对应的IndexService

    创建索引是调用indiceSerivice构建一个guice的injector,这个injector包含了Index的所有功能(如分词,相似度等)。同时会将其存储到indiceService中,以一个map的格式存储Map<String, Tuple<IndexService, Injector>> indices。运行中的集群每次对某个索引的操作都首先从indice中获取对应的IndexService。

    这一部分代码如下所示:

    public synchronized IndexService createIndex(String sIndexName, @IndexSettings Settings settings, String localNodeId) throws ElasticsearchException {        if (!lifecycle.started()) {            throw new ElasticsearchIllegalStateException("Can't create an index [" + sIndexName + "], node is closed");        }        Index index = new Index(sIndexName);    //检测index是否已经存在        if (indices.containsKey(index.name())) {            throw new IndexAlreadyExistsException(index);        }        indicesLifecycle.beforeIndexCreated(index, settings);        logger.debug("creating Index [{}], shards [{}]/[{}]", sIndexName, settings.get(SETTING_NUMBER_OF_SHARDS), settings.get(SETTING_NUMBER_OF_REPLICAS));        Settings indexSettings = settingsBuilder()                .put(this.settings)                .put(settings)                .classLoader(settings.getClassLoader())                .build();    //构建index对应的injector        ModulesBuilder modules = new ModulesBuilder();        modules.add(new IndexNameModule(index));        modules.add(new LocalNodeIdModule(localNodeId));        modules.add(new IndexSettingsModule(index, indexSettings));        modules.add(new IndexPluginsModule(indexSettings, pluginsService));        modules.add(new IndexStoreModule(indexSettings));        modules.add(new IndexEngineModule(indexSettings));        modules.add(new AnalysisModule(indexSettings, indicesAnalysisService));        modules.add(new SimilarityModule(indexSettings));        modules.add(new IndexCacheModule(indexSettings));        modules.add(new IndexFieldDataModule(indexSettings));        modules.add(new CodecModule(indexSettings));        modules.add(new MapperServiceModule());        modules.add(new IndexQueryParserModule(indexSettings));        modules.add(new IndexAliasesServiceModule());        modules.add(new IndexGatewayModule(indexSettings, injector.getInstance(Gateway.class)));        modules.add(new IndexModule(indexSettings));        Injector indexInjector;        try {            indexInjector = modules.createChildInjector(injector);        } catch (CreationException e) {            throw new IndexCreationException(index, Injectors.getFirstErrorFailure(e));        } catch (Throwable e) {            throw new IndexCreationException(index, e);        }        IndexService indexService = indexInjector.getInstance(IndexService.class);        indicesLifecycle.afterIndexCreated(indexService);     //将Indexservice和IndexInjector加入到indice map中        indices = newMapBuilder(indices).put(index.name(), new Tuple&lt;&gt;(indexService, indexInjector)).immutableMap();        return indexService;    }

    以上方法就是具体创建索引的过程,它是在master上操作的,同时它是同步方法。这样才能保证集群的Index创建一致性,因此这也会导致之前所说的大量创建创建索引时候的速度瓶颈。但是创建大量索引的动作是不常见的,需要尽量避免。创建一个索引对于一个集群来说就是开启对于该索引的各种操作,因此这里通过guice将索引的各个功能模块注入,并获得index操作的接口类Indexservice。如果这个方法执行成功,则可以合并template及request中的mapping,并且向刚创建的索引添加合并后的mapping,最后构建新的matadata,并将集群新的matadata发送给各个节点完成索引创建。

    读到这里,这篇“elasticsearch索引创建create index集群matedata更新的方法”文章已经介绍完毕,想要掌握这篇文章的知识点还需要大家自己动手实践使用过才能领会,如果想了解更多相关内容的文章,欢迎关注编程网精选频道。

    --结束END--

    本文标题: elasticsearch索引创建create index集群matedata更新的方法

    本文链接: https://www.lsjlt.com/news/327796.html(转载时请注明来源链接)

    有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

    本篇文章演示代码以及资料文档资料下载

    下载Word文档到电脑,方便收藏和打印~

    下载Word文档
    猜你喜欢
    软考高级职称资格查询
    编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
    • 官方手机版

    • 微信公众号

    • 商务合作