iis服务器助手广告
返回顶部
首页 > 资讯 > 精选 >elasticsearch节点间通信的transport启动过程是什么
  • 909
分享到

elasticsearch节点间通信的transport启动过程是什么

2023-06-30 08:06:06 909人浏览 安东尼
摘要

这篇文章主要介绍“elasticsearch节点间通信的transport启动过程是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“elasticsearch节点间通信的transport启动过程

这篇文章主要介绍“elasticsearch节点间通信的transport启动过程是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“elasticsearch节点间通信的transport启动过程是什么”文章能帮助大家解决问题。

transport

transport顾名思义是集群通信的基本通道,无论是集群状态信息,还是搜索索引请求信息,都是通过transport传送。elasticsearch定义了tansport,tansportmessage,tansportchannel,tansportrequest,tansportresponse等所需的所有的基础接口。这里将以transport为主,分析过程中会附带介绍其它接口。首先看一下transport节点的定义,如下图所示:

elasticsearch节点间通信的transport启动过程是什么

NettyTransport实现了该接口。分析NettyTransport前简单说一下Netty的用法,Netty的使用需要三个模块ServerBootStrap,ClientBootStrap(v3.x)及MessageHandler。ServerBootStrap启动服务器,ClientBootStrap启动客户端并连接服务器,MessageHandler是message处理逻辑所在,也就是业务逻辑。其它详细使用请参考Netty官方文档。

启动serverBootStrap

NettyTransport每个在doStart()方法中启动serverBootStrap,和ClientBootStrap,并绑定ip,代码如下所示:

protected void doStart() throws ElasticsearchException {       clientBootstrap = createClientBootstrap();//根据配置启动客户端       ……//省略了无关分代码    createServerBootstrap(name, mergedSettings);//启动server端       bindServerBootstrap(name, mergedSettings);//绑定ip        }

每一个节点都需要发送和接收,因此两者都需要启动,client和server的启动分别在相应的方法中,启动过程就是netty的启动过程,有兴趣可以去看相应方法。bindServerBootstrap(name, mergedSettings)将本地ip和断开绑定到netty同时设定好export host(export host的具体作业我也看明白也没有看到相关的绑定,需要进一步研究)。

启动client及server的过程中将messagehandler注入到channelpipeline中。至此启动过程完成,但是client并未连接任何server,连接过程是在节点启动后,才连接到其它节点的。

如何连接到node

方法代码如下所示:

public void connectToNode(DiscoveryNode node, boolean light) {     //transport的模块必须要启动        if (!lifecycle.started()) {            throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");        }     //获取读,每个节点可以和多个节点建立连接,因此这里用读锁        globalLock.readLock().lock();        try {        //以node.id为基础获取一个锁,这保证对于每个node只能建立一次连接            connectionLock.acquire(node.id());            try {                if (!lifecycle.started()) {                    throw new ElasticsearchIllegalStateException("can't add nodes to a stopped transport");                }                NodeChannels nodeChannels = connectedNodes.get(node);                if (nodeChannels != null) {                    return;                }                try {                    if (light) {//这里的light,就是对该节点只获取一个channel,所有类型(5种连接类型下面会说到)都使用者一个channel                        nodeChannels = connectToChannelsLight(node);                    } else {                        nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]);                        try {                            connectToChannels(nodeChannels, node);                        } catch (Throwable e) {                            logger.trace("failed to connect to [{}], cleaning dangling connections", e, node);                            nodeChannels.close();                            throw e;                        }                    }                    // we acquire a connection lock, so no way there is an existing connection                    connectedNodes.put(node, nodeChannels);                    if (logger.isDebugEnabled()) {                        logger.debug("connected to node [{}]", node);                    }                    transportServiceAdapter.raiseNodeConnected(node);                } catch (ConnectTransportException e) {                    throw e;                } catch (Exception e) {                    throw new ConnectTransportException(node, "general node connection failure", e);                }            } finally {                connectionLock.release(node.id());            }        } finally {            globalLock.readLock().unlock();        }    }

如果不是轻连接,每个server和clien之间都有5中连接,着5中连接承担着不同的任务

连接方法的代码

protected void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {    //五种连接方式,不同的连接方式对应不同的集群操作        ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length];        ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length];        ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length];        ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length];        ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];        InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();    //尝试建立连接        for (int i = 0; i < connectRecovery.length; i++) {            connectRecovery[i] = clientBootstrap.connect(address);        }        for (int i = 0; i < connectBulk.length; i++) {            connectBulk[i] = clientBootstrap.connect(address);        }        for (int i = 0; i < connectReg.length; i++) {            connectReg[i] = clientBootstrap.connect(address);        }        for (int i = 0; i < connectState.length; i++) {            connectState[i] = clientBootstrap.connect(address);        }        for (int i = 0; i < connectPing.length; i++) {            connectPing[i] = clientBootstrap.connect(address);        }    //获取每个连接的channel存入到相应的channels中便于后面使用。        try {            for (int i = 0; i < connectRecovery.length; i++) {                connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));                if (!connectRecovery[i].isSuccess()) {                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause());                }                nodeChannels.recovery[i] = connectRecovery[i].getChannel();                nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node));            }            for (int i = 0; i < connectBulk.length; i++) {                connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));                if (!connectBulk[i].isSuccess()) {                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause());                }                nodeChannels.bulk[i] = connectBulk[i].getChannel();                nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node));            }            for (int i = 0; i < connectReg.length; i++) {                connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));                if (!connectReg[i].isSuccess()) {                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause());                }                nodeChannels.reg[i] = connectReg[i].getChannel();                nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node));            }            for (int i = 0; i < connectState.length; i++) {                connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));                if (!connectState[i].isSuccess()) {                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause());                }                nodeChannels.state[i] = connectState[i].getChannel();                nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node));            }            for (int i = 0; i < connectPing.length; i++) {                connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));                if (!connectPing[i].isSuccess()) {                    throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());                }                nodeChannels.ping[i] = connectPing[i].getChannel();                nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));            }            if (nodeChannels.recovery.length == 0) {                if (nodeChannels.bulk.length > 0) {                    nodeChannels.recovery = nodeChannels.bulk;                } else {                    nodeChannels.recovery = nodeChannels.reg;                }            }            if (nodeChannels.bulk.length == 0) {                nodeChannels.bulk = nodeChannels.reg;            }        } catch (RuntimeException e) {            // clean the futures            for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) {                future.cancel();                if (future.getChannel() != null && future.getChannel().isOpen()) {                    try {                        future.getChannel().close();                    } catch (Exception e1) {                        // ignore                    }                }            }            throw e;        }    }

关于“elasticsearch节点间通信的transport启动过程是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识,可以关注编程网精选频道,小编每天都会为大家更新不同的知识点。

--结束END--

本文标题: elasticsearch节点间通信的transport启动过程是什么

本文链接: https://www.lsjlt.com/news/327745.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • elasticsearch节点间通信的transport启动过程是什么
    这篇文章主要介绍“elasticsearch节点间通信的transport启动过程是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“elasticsearch节点间通信的transport启动过程...
    99+
    2023-06-30
  • elasticsearch节点间通信的基础transport启动过程
    目录前言transport启动serverBootStrap如何连接到node连接方法的代码总结前言 在前一篇中我们分析了cluster的一些元素。接下来的章节会对cluster的运...
    99+
    2024-04-02
  • ssl通信的过程是什么
    SSL(Secure Socket Layer)是一种用于保护互联网通信安全的协议。SSL通信的过程大致可以分为以下几个步骤:1. ...
    99+
    2023-08-25
    ssl
  • ssl通信过程是什么
    SSL(Secure Sockets Layer)是一种常用的加密通信协议,用于在网络上实现安全的通信。SSL通信过程包括以下几个步...
    99+
    2023-08-24
    ssl
  • 什么是进程间通信
    这篇文章主要介绍“什么是进程间通信”,在日常操作中,相信很多人在什么是进程间通信问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”什么是进程间通信”的疑惑有所帮助!接下来,请跟着...
    99+
    2024-04-02
  • Python进程间的通信方式是什么
    这篇文章主要介绍“Python进程间的通信方式是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Python进程间的通信方式是什么”文章能帮助大家解决问题。什么是进程的通信这里举一个例子接介绍通信...
    99+
    2023-06-29
  • java线程间通信的方法是什么
    Java线程间通信的方法有以下几种: 使用共享变量:多个线程共享同一个变量,通过对变量的读写操作来实现线程间的通信。例如,一个线程...
    99+
    2023-10-28
    java
  • python进程间的通信机制是什么
    本文小编为大家详细介绍“python进程间的通信机制是什么”,内容详细,步骤清晰,细节处理妥当,希望这篇“python进程间的通信机制是什么”文章能帮助大家解决疑惑,下面跟着小编的思路慢慢深入,一起来学习新知识吧。进程间通信表示进程之间的数...
    99+
    2023-07-05
  • Linux进程间通信的方式是什么
    本篇内容主要讲解“Linux进程间通信的方式是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Linux进程间通信的方式是什么”吧!·进程间通信:操作系统为系统提供的用于实现进程间通信的方式进...
    99+
    2023-06-29
  • golang进程间通信的方法是什么
    Golang中可以使用多种方式进行进程间通信,包括:1. Channel(通道):Golang的Channel是一种用于在协程之间进...
    99+
    2023-08-23
    golang
  • Android中APP的启动过程是什么
    在Android中,APP的启动过程主要包括以下几个步骤:1. 用户点击APP图标或通过其他方式触发APP启动的事件。2. 系统根据...
    99+
    2023-08-08
    Android
  • android应用启动过程是什么
    Android应用的启动过程可以分为以下几个步骤:1. 用户点击应用图标:用户在设备上点击应用图标,触发应用的启动。2. 系统启动应...
    99+
    2023-09-09
    android
  • spring容器启动过程是什么
    Spring容器的启动过程如下:1. 加载配置文件:Spring容器需要加载一个或多个配置文件,配置文件可以是XML文件、Java注...
    99+
    2023-09-14
    spring
  • Android组件Activity的启动过程是什么
    这篇文章主要介绍了Android组件Activity的启动过程是什么的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Android组件Activity的启动过程是什么文章都会有所收获,下面我们一起来看看吧。在分析...
    99+
    2023-07-06
  • Android中的ActivityThread和APP启动过程是什么
    ActivityThread是Android中负责管理所有Activity的线程,它负责处理Activity的生命周期、事件分发、消...
    99+
    2024-03-08
    Android
  • angular模块间通信的方法是什么
    在Angular中,模块间通信的方法有多种,下面是一些常用的方法:1. 通过共享服务进行通信:创建一个共享服务,可以在多个模块中注入...
    99+
    2023-10-11
    angular
  • docker容器间通信的方法是什么
    Docker容器间通信的方法有以下几种: 使用容器名称或ID进行通信:每个Docker容器都有一个唯一的名称或ID,可以使用这个...
    99+
    2023-10-25
    docker
  • 微信小程序的页面间通讯策略是什么
    微信小程序的页面间通讯策略主要有以下几种方式: 页面传参:通过页面跳转时传入参数,在目标页面的onLoad生命周期函数中获取传入...
    99+
    2024-04-09
    微信小程序
  • VB.NET监视启动过程的具体步骤是什么
    VB.NET监视启动过程的具体步骤是什么,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。大家可能会对VB.NET启动过程的实现不是很陌生,因为这一操作技巧是比较基础的,初学者在学...
    99+
    2023-06-17
  • redis动态增加节点的方法是什么
    Redis动态增加节点的方法有两种,分别是使用Redis Sentinel和使用Redis Cluster。1. 使用Redis S...
    99+
    2023-08-24
    redis
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作