iis服务器助手广告
返回顶部
首页 > 资讯 > 前端开发 > node.js >如何解决Socket粘包问题
  • 783
分享到

如何解决Socket粘包问题

2024-04-02 19:04:59 783人浏览 薄情痞子
摘要

本篇内容介绍了“如何解决Socket粘包问题”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!问题一:TCP存

本篇内容介绍了“如何解决Socket粘包问题”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

问题一:TCP存在粘包问题吗?

先说答案:tcp 本身并没有粘包和半包一说,因为 TCP 本质上只是一个传输控制协议(Transmission Control  Protocol,TCP),它是一种面向连接的、可靠的、基于字节流的传输层通信协议,由 IETF 的 RFC 793 定义。

所谓的协议本质上是一个约定,就好比 Java  编程约定使用驼峰命名法一样,约定的意义是为了让通讯双方,能够正常的进行消息互换的,那粘包和半包问题又是如何产生的呢?

这是因为在 TCP  的交互中,数据是以字节流的形式进行传输的,而“流”的传输是没有边界的,因为没有边界所以就不能区分消息的归属,从而就会产生粘包和半包问题(粘包和半包的定义,详见上一篇)。所以说  TCP 协议本身并不存在粘包和半包问题,只是在使用中如果不能有效的确定流的边界就会产生粘包和半包问题。

问题二:分隔符是最优解决方案?

坦白的说,经过评论区大家的耐心“开导”,我也意识到了以结束符作为最终的解决方案存在一定的局限性,比如当一条消息中间如果出现了结束符就会造成半包的问题,所以如果是复杂的字符串要对内容进行编码和解码处理,这样才能保证结束符的正确性。

问题三:Socket 高效吗?

这个问题的答案是否定的,其实上文在开头已经描述了应用场景:「传统的 Socket  编程」,学习它的意义就在于理解更早期更底层的一些知识,当然作为补充本文会提供更加高效的消息通讯方案——Netty 通讯。

聊完了以上问题,接下来咱们先来补充一下上篇文章中提到的,将消息分为消息头和消息体的代码实现。

一、封装消息头和消息体

在开始写服务器端和客户端之前,咱们先来编写一个消息的封装类,使用它可以将消息封装成消息头和消息体,如下图所示:

如何解决Socket粘包问题

消息头中存储消息体的长度,从而确定了消息的边界,便解决粘包和半包问题。

1.消息封装类

消息的封装类中提供了两个方法:一个是将消息转换成消息头 + 消息体的方法,另一个是读取消息头的方法,具体实现代码如下:

 class SocketPacket {     // 消息头存储的长度(占 8 字节)     static final int HEAD_SIZE = 8;           public byte[] toBytes(String context) {         // 协议体 byte 数组         byte[] bodyByte = context.getBytes();         int bodyByteLength = bodyByte.length;         // 最终封装对象         byte[] result = new byte[HEAD_SIZE + bodyByteLength];         // 借助 NumberFORMat 将 int 转换为 byte[]         NumberFormat numberFormat = NumberFormat.getNumberInstance();         numberFormat.setMinimumIntegerDigits(HEAD_SIZE);         numberFormat.setGroupingUsed(false);         // 协议头 byte 数组         byte[] headByte = numberFormat.format(bodyByteLength).getBytes();         // 封装协议头         System.arraycopy(headByte, 0, result, 0, HEAD_SIZE);         // 封装协议体         System.arraycopy(bodyByte, 0, result, HEAD_SIZE, bodyByteLength);         return result;     }           public int getHeader(InputStream inputStream) throws IOException {         int result = 0;         byte[] bytes = new byte[HEAD_SIZE];         inputStream.read(bytes, 0, HEAD_SIZE);         // 得到消息体的字节长度         result = Integer.valueOf(new String(bytes));         return result;     } }

2.编写客户端

接下来我们来定义客户端,在客户端中我们添加一组待发送的消息,随机给服务器端发送一个消息,实现代码如下:

 class MySocketClient {     public static void main(String[] args) throws IOException {         // 启动 Socket 并尝试连接服务器         Socket socket = new Socket("127.0.0.1", 9093);         // 发送消息合集(随机发送一条消息)         final String[] message = {"Hi,Java.", "Hi,sql~", "关注公众号|Java中文社群."};         // 创建协议封装对象         SocketPacket socketPacket = new SocketPacket();         try (OutputStream outputStream = socket.getOutputStream()) {             // 给服务器端发送 10 次消息             for (int i = 0; i < 10; i++) {                 // 随机发送一条消息                 String msg = message[new Random().nextInt(message.length)];                 // 将内容封装为:协议头+协议体                 byte[] bytes = socketPacket.toBytes(msg);                 // 发送消息                 outputStream.write(bytes, 0, bytes.length);                 outputStream.flush();             }         }     } }

3.编写服务器端

服务器端我们使用线程池来处理每个客户端的业务请求,实现代码如下:

 class MySocketServer {     public static void main(String[] args) throws IOException {         // 创建 Socket 服务器端         ServerSocket serverSocket = new ServerSocket(9093);         // 获取客户端连接         Socket clientSocket = serverSocket.accept();         // 使用线程池处理更多的客户端         ThreadPoolExecutor threadPool = new ThreadPoolExecutor(100, 150, 100,                 TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));         threadPool.submit(() -> {             // 客户端消息处理             proceSSMessage(clientSocket);         });     }          private static void processMessage(Socket clientSocket) {         // Socket 封装对象         SocketPacket socketPacket = new SocketPacket();         // 获取客户端发送的消息对象         try (InputStream inputStream = clientSocket.getInputStream()) {             while (true) {                 // 获取消息头(也就是消息体的长度)                 int bodyLength = socketPacket.getHeader(inputStream);                 // 消息体 byte 数组                 byte[] bodyByte = new byte[bodyLength];                 // 每次实际读取字节数                 int readCount = 0;                 // 消息体赋值下标                 int bodyIndex = 0;                 // 循环接收消息头中定义的长度                 while (bodyIndex <= (bodyLength - 1) &&                         (readCount = inputStream.read(bodyByte, bodyIndex, bodyLength)) != -1) {                     bodyIndex += readCount;                 }                 bodyIndex = 0;                 // 成功接收到客户端的消息并打印                 System.out.println("接收到客户端的信息:" + new String(bodyByte));             }         } catch (IOException ioException) {             System.out.println(ioException.getMessage());         }     } }

以上程序的执行结果如下:

如何解决Socket粘包问题

从上述结果可以看出,消息通讯正常,客户端和服务器端的交互中并没有出现粘包和半包的问题。

二、使用 Netty 实现高效通讯

以上的内容都是针对传统 Socket 编程的,但要实现更加高效的通讯和连接对象的复用就要使用 NIO(Non-Blocking IO,非阻塞 IO)或者  AIO(Asynchronous IO,异步非阻塞 IO)了。

传统的 Socket 编程是 BIO(Blocking IO,同步阻塞 IO),它和 NIO 和 AIO 的区别如下:

  • BIO 来自传统的 java.io  包,它是基于流模型实现的,交互的方式是同步、阻塞方式,也就是说在读入输入流或者输出流时,在读写动作完成之前,线程会一直阻塞在那里,它们之间的调用是可靠的线性顺序。它的优点就是代码比较简单、直观;缺点就是  IO 的效率和扩展性很低,容易成为应用性能瓶颈。

  • NIO 是 Java 1.4 引入的 java.nio 包,提供了 Channel、Selector、Buffer  等新的抽象,可以构建多路复用的、同步非阻塞 IO 程序,同时提供了更接近操作系统底层高性能的数据操作方式。

  • AIO 是 Java 1.7 之后引入的包,是 NIO 的升级版本,提供了异步非堵塞的 IO 操作方式,因此人们叫它 AIO(Asynchronous  IO),异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。

PS:AIO 可以看作是 NIO 的升级,它也叫 NIO 2。

传统 Socket 的通讯流程:

如何解决Socket粘包问题

NIO 的通讯流程:

如何解决Socket粘包问题

使用 Netty 替代传统 NIO 编程

NIO 的设计思路虽然很好,但它的代码编写比较麻烦,比如 Buffer 的使用和 Selector  的编写等。并且在面对断线重连、包丢失和粘包等复杂问题时手动处理的成本都很大,因此我们通常会使用 Netty 框架来替代传统的 NIO。

Netty 是什么?

Netty 是一个异步、事件驱动的用来做高性能、高可靠性的网络应用框架,使用它可以快速轻松地开发网络应用程序,极大的简化了网络编程的复杂度。

Netty 主要优点有以下几个:

  1. 鸿蒙官方战略合作共建——HarmonyOS技术社区

  2. 框架设计优雅,底层模型随意切换适应不同的网络协议要求;

  3. 提供很多标准的协议、安全、编码解码的支持;

  4. 简化了 NIO 使用中的诸多不便;

  5. 社区非常活跃,很多开源框架中都使用了 Netty 框架,如 dubboRocketMQspark 等。

Netty 主要包含以下 3 个部分,如下图所示:

如何解决Socket粘包问题

图片这 3 个部分的功能介绍如下。

1. Core 核心层

Core 核心层是 Netty 最精华的内容,它提供了底层网络通信的通用抽象和实现,包括可扩展的事件模型、通用的通信 api、支持零拷贝的 ByteBuf  等。

2. Protocol Support 协议支持层

协议支持层基本上覆盖了主流协议的编解码实现,如 Http、SSL、Protobuf、压缩、大文件传输、websocket、文本、二进制等主流协议,此外  Netty 还支持自定义应用层协议。Netty 丰富的协议支持降低了用户的开发成本,基于 Netty 我们可以快速开发 HTTP、WEBSocket  等服务。

3. Transport Service 传输服务层

传输服务层提供了网络传输能力的定义和实现方法。它支持 Socket、HTTP 隧道、虚拟机管道等传输方式。Netty 对 TCP、UDP  等数据传输做了抽象和封装,用户可以更聚焦在业务逻辑实现上,而不必关系底层数据传输的细节。

Netty 使用

对 Netty 有了大概的认识之后,接下来我们用 Netty  来编写一个基础的通讯服务器,它包含两个端:服务器端和客户端,客户端负责发送消息,服务器端负责接收并打印消息,具体的实现步骤如下。

1.添加 Netty 框架

首先我们需要先添加 Netty 框架的支持,如果是 Maven 项目添加如下配置即可:

<!-- 添加 Netty 框架 --> <!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency>     <groupId>io.netty</groupId>     <artifactId>netty-all</artifactId>     <version>4.1.56.Final</version> </dependency>

Netty 版本说明

Netty 的 3.x 和 4.x 为主流的稳定版本,而最新的 5.x 已经是放弃的测试版了,因此推荐使用 Netty 4.x 的最新稳定版。

2. 服务器端实现代码

按照官方的推荐,这里将服务器端的代码分为以下 3 个部分:

  • MyNettyServer:服务器端的核心业务代码;

  • ServerInitializer:服务器端通道(Channel)初始化;

  • ServerHandler:服务器端接收到信息之后的处理逻辑。

PS:Channel 字面意思为“通道”,它是网络通信的载体。Channel 提供了基本的 API 用于网络 I/O 操作,如  reGISter、bind、connect、read、write、flush 等。Netty 自己实现的 Channel 是以 jdk NIO Channel  为基础的,相比较于 JDK NIO,Netty 的 Channel 提供了更高层次的抽象,同时屏蔽了底层 Socket 的复杂性,赋予了 Channel  更加强大的功能,你在使用 Netty 时基本不需要再与 Java Socket 类直接打交道。

服务器端的实现代码如下:

// 定义服务器的端口号 static final int PORT = 8007;   static class MyNettyServer {     public static void main(String[] args) {         // 创建一个线程组,用来负责接收客户端连接         EventLoopGroup bossGroup = new NioEventLoopGroup();         // 创建另一个线程组,用来负责 I/O 的读写         EventLoopGroup workerGroup = new NioEventLoopGroup();         try {             // 创建一个 Server 实例(可理解为 Netty 的入门类)             ServerBootstrap b = new ServerBootstrap();             // 将两个线程池设置到 Server 实例             b.group(bossGroup, workerGroup)                     // 设置 Netty 通道的类型为 NiOServerSocket(非阻塞 I/O Socket 服务器)                     .channel(NioServerSocketChannel.class)                     // 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类)                     .childHandler(new ServerInitializer());             // 绑定端口并且进行同步             ChannelFuture future = b.bind(PORT).sync();             // 对关闭通道进行监听             future.channel().closeFuture().sync();         } catch (InterruptedException e) {             e.printStackTrace();         } finally {             // 资源关闭             bossGroup.shutdownGracefully();             workerGroup.shutdownGracefully();         }     } }   static class ServerInitializer extends ChannelInitializer<SocketChannel> {     // 字符串编码器和解码器     private static final StringDecoder DECODER = new StringDecoder();     private static final StringEncoder ENCODER = new StringEncoder();     // 服务器端连接之后的执行器(自定义的类)     private static final ServerHandler SERVER_HANDLER = new ServerHandler();           @Override     public void initChannel(SocketChannel ch) {         // 通道 Channel 设置         ChannelPipeline pipeline = ch.pipeline();         // 设置(字符串)编码器和解码器         pipeline.addLast(DECODER);         pipeline.addLast(ENCODER);         // 服务器端连接之后的执行器,接收到消息之后的业务处理         pipeline.addLast(SERVER_HANDLER);     } }   static class ServerHandler extends SimpleChannelInboundHandler<String> {           @Override     public void channelRead0(ChannelHandlerContext ctx, String request) {         if (!request.isEmpty()) {             System.out.println("接到客户端的消息:" + request);         }     }           @Override     public void channelReadComplete(ChannelHandlerContext ctx) {         ctx.flush();     }           @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {         cause.printStackTrace();         ctx.close();     } }

3.客户端实现代码

客户端的代码实现也是分为以下 3 个部分:

  • MyNettyClient:客户端核心业务代码;

  • ClientInitializer:客户端通道初始化;

  • ClientHandler:接收到消息之后的处理逻辑。

客户端的实现代码如下:

 static class MyNettyClient {     public static void main(String[] args) {         // 创建事件循环线程组(客户端的线程组只有一个)         EventLoopGroup group = new NioEventLoopGroup();         try {             // Netty 客户端启动对象             Bootstrap b = new Bootstrap();             // 设置启动参数             b.group(group)                     // 设置通道类型                     .channel(NioSocketChannel.class)                     // 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类)                     .handler(new ClientInitializer());              // 连接服务器端并同步通道             Channel ch = b.connect("127.0.0.1", 8007).sync().channel();              // 发送消息             ChannelFuture lastWriteFuture = null;             // 给服务器端发送 10 条消息             for (int i = 0; i < 10; i++) {                 // 发送给服务器消息                 lastWriteFuture = ch.writeAndFlush("Hi,Java.");             }             // 在关闭通道之前,同步刷新所有的消息             if (lastWriteFuture != null) {                 lastWriteFuture.sync();             }         } catch (InterruptedException e) {             e.printStackTrace();         } finally {             // 释放资源             group.shutdownGracefully();         }     } }   static class ClientInitializer extends ChannelInitializer<SocketChannel> {     // 字符串编码器和解码器     private static final StringDecoder DECODER = new StringDecoder();     private static final StringEncoder ENCODER = new StringEncoder();     // 客户端连接成功之后业务处理     private static final ClientHandler CLIENT_HANDLER = new ClientHandler();           @Override     public void initChannel(SocketChannel ch) {         ChannelPipeline pipeline = ch.pipeline();         // 设置(字符串)编码器和解码器         pipeline.addLast(DECODER);         pipeline.addLast(ENCODER);         // 客户端连接成功之后的业务处理         pipeline.addLast(CLIENT_HANDLER);     } }   static class ClientHandler extends SimpleChannelInboundHandler<String> {          @Override     protected void channelRead0(ChannelHandlerContext ctx, String msg) {         System.err.println("接到服务器的消息:" + msg);     }           @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {         cause.printStackTrace();         ctx.close();     } }

从以上代码可以看出,我们代码实现的功能是,客户端给服务器端发送 10 条消息。

编写完上述代码之后,我们就可以启动服务器端和客户端了,启动之后,它们的执行结果如下:

如何解决Socket粘包问题

从上述结果中可以看出,虽然客户端和服务器端实现了通信,但在  Netty 的使用中依然存在粘包的问题,服务器端一次收到了 10 条消息,而不是每次只收到一条消息,因此接下来我们要解决掉 Netty 中的粘包问题。

三、解决 Netty 粘包问题

在 Netty 中,解决粘包问题的常用方案有以下 3 种:

  1. 鸿蒙官方战略合作共建——HarmonyOS技术社区

  2. 设置固定大小的消息长度,如果长度不足则使用空字符弥补,它的缺点比较明显,比较消耗网络流量,因此不建议使用;

  3. 使用分隔符来确定消息的边界,从而避免粘包和半包问题的产生;

  4. 将消息分为消息头和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息。

接下来我们分别来看后两种推荐的解决方案。

1.使用分隔符解决粘包问题

在 Netty 中提供了 DelimiterBasedFrameDecoder 类用来以特殊符号作为消息的结束符,从而解决粘包和半包的问题。

它的核心实现代码是在初始化通道(Channel)时,通过设置 DelimiterBasedFrameDecoder  来分隔消息,需要在客户端和服务器端都进行设置,具体实现代码如下。

服务器端核心实现代码如下:

 static class ServerInitializer extends ChannelInitializer<SocketChannel> {     // 字符串编码器和解码器     private static final StringDecoder DECODER = new StringDecoder();     private static final StringEncoder ENCODER = new StringEncoder();     // 服务器端连接之后的执行器(自定义的类)     private static final ServerHandler SERVER_HANDLER = new ServerHandler();           @Override     public void initChannel(SocketChannel ch) {         // 通道 Channel 设置         ChannelPipeline pipeline = ch.pipeline();         // 19 行:设置结尾分隔符【核心代码】(参数1:为消息的最大长度,可自定义;参数2:分隔符[此处以换行符为分隔符])         pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));         // 设置(字符串)编码器和解码器         pipeline.addLast(DECODER);         pipeline.addLast(ENCODER);         // 服务器端连接之后的执行器,接收到消息之后的业务处理         pipeline.addLast(SERVER_HANDLER);     } }

核心代码为第 19 行,代码中已经备注了方法的含义,这里就不再赘述。

客户端的核心实现代码如下:

 static class ClientInitializer extends ChannelInitializer<SocketChannel> {     // 字符串编码器和解码器     private static final StringDecoder DECODER = new StringDecoder();     private static final StringEncoder ENCODER = new StringEncoder();     // 客户端连接成功之后业务处理     private static final ClientHandler CLIENT_HANDLER = new ClientHandler();           @Override     public void initChannel(SocketChannel ch) {         ChannelPipeline pipeline = ch.pipeline();         // 17 行:设置结尾分隔符【核心代码】(参数1:为消息的最大长度,可自定义;参数2:分隔符[此处以换行符为分隔符])         pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));         // 设置(字符串)编码器和解码器         pipeline.addLast(DECODER);         pipeline.addLast(ENCODER);         // 客户端连接成功之后的业务处理         pipeline.addLast(CLIENT_HANDLER);     } }

完整的服务器端和客户端的实现代码如下:

import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder;  public class NettyExample {     // 定义服务器的端口号     static final int PORT = 8007;           static class MyNettyServer {         public static void main(String[] args) {             // 创建一个线程组,用来负责接收客户端连接             EventLoopGroup bossGroup = new NioEventLoopGroup();             // 创建另一个线程组,用来负责 I/O 的读写             EventLoopGroup workerGroup = new NioEventLoopGroup();             try {                 // 创建一个 Server 实例(可理解为 Netty 的入门类)                 ServerBootstrap b = new ServerBootstrap();                 // 将两个线程池设置到 Server 实例                 b.group(bossGroup, workerGroup)                         // 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器)                         .channel(NioServerSocketChannel.class)                         // 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类)                         .childHandler(new ServerInitializer());                 // 绑定端口并且进行同步                 ChannelFuture future = b.bind(PORT).sync();                 // 对关闭通道进行监听                 future.channel().closeFuture().sync();             } catch (InterruptedException e) {                 e.printStackTrace();             } finally {                 // 资源关闭                 bossGroup.shutdownGracefully();                 workerGroup.shutdownGracefully();             }         }     }           static class ServerInitializer extends ChannelInitializer<SocketChannel> {         // 字符串编码器和解码器         private static final StringDecoder DECODER = new StringDecoder();         private static final StringEncoder ENCODER = new StringEncoder();         // 服务器端连接之后的执行器(自定义的类)         private static final ServerHandler SERVER_HANDLER = new ServerHandler();                   @Override         public void initChannel(SocketChannel ch) {             // 通道 Channel 设置             ChannelPipeline pipeline = ch.pipeline();             // 设置结尾分隔符             pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));             // 设置(字符串)编码器和解码器             pipeline.addLast(DECODER);             pipeline.addLast(ENCODER);             // 服务器端连接之后的执行器,接收到消息之后的业务处理             pipeline.addLast(SERVER_HANDLER);         }     }           static class ServerHandler extends SimpleChannelInboundHandler<String> {                   @Override         public void channelRead0(ChannelHandlerContext ctx, String request) {             if (!request.isEmpty()) {                 System.out.println("接到客户端的消息:" + request);             }         }                   @Override         public void channelReadComplete(ChannelHandlerContext ctx) {             ctx.flush();         }                   @Override         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {             cause.printStackTrace();             ctx.close();         }     }           static class MyNettyClient {         public static void main(String[] args) {             // 创建事件循环线程组(客户端的线程组只有一个)             EventLoopGroup group = new NioEventLoopGroup();             try {                 // Netty 客户端启动对象                 Bootstrap b = new Bootstrap();                 // 设置启动参数                 b.group(group)                         // 设置通道类型                         .channel(NioSocketChannel.class)                         // 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类)                         .handler(new ClientInitializer());                  // 连接服务器端并同步通道                 Channel ch = b.connect("127.0.0.1", PORT).sync().channel();                  // 发送消息                 ChannelFuture lastWriteFuture = null;                 // 给服务器端发送 10 条消息                 for (int i = 0; i < 10; i++) {                     // 发送给服务器消息                     lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");                 }                 // 在关闭通道之前,同步刷新所有的消息                 if (lastWriteFuture != null) {                     lastWriteFuture.sync();                 }             } catch (InterruptedException e) {                 e.printStackTrace();             } finally {                 // 释放资源                 group.shutdownGracefully();             }         }     }           static class ClientInitializer extends ChannelInitializer<SocketChannel> {         // 字符串编码器和解码器         private static final StringDecoder DECODER = new StringDecoder();         private static final StringEncoder ENCODER = new StringEncoder();         // 客户端连接成功之后业务处理         private static final ClientHandler CLIENT_HANDLER = new ClientHandler();                   @Override         public void initChannel(SocketChannel ch) {             ChannelPipeline pipeline = ch.pipeline();             // 设置结尾分隔符             pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter()));             // 设置(字符串)编码器和解码器             pipeline.addLast(DECODER);             pipeline.addLast(ENCODER);             // 客户端连接成功之后的业务处理             pipeline.addLast(CLIENT_HANDLER);         }     }           static class ClientHandler extends SimpleChannelInboundHandler<String> {                   @Override         protected void channelRead0(ChannelHandlerContext ctx, String msg) {             System.err.println("接到服务器的消息:" + msg);         }                   @Override         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {             cause.printStackTrace();             ctx.close();         }     } }

最终的执行结果如下图所示:

如何解决Socket粘包问题

从上述结果中可以看出,Netty 可以正常使用了,它已经不存在粘包和半包问题了。

2.封装消息解决粘包问题

此解决方案的核心是将消息分为消息头 +  消息体,在消息头中保存消息体的长度,从而确定一条消息的边界,这样就避免了粘包和半包问题了,它的实现过程如下图所示:

如何解决Socket粘包问题

在 Netty 中可以通过  LengthFieldPrepender(编码)和  LengthFieldBasedFrameDecoder(解码)两个类实现消息的封装。和上一个解决方案类似,我们需要分别在服务器端和客户端通过设置通道(Channel)来解决粘包问题。

服务器端的核心代码如下:

 static class ServerInitializer extends ChannelInitializer<SocketChannel> {     // 字符串编码器和解码器     private static final StringDecoder DECODER = new StringDecoder();     private static final StringEncoder ENCODER = new StringEncoder();     // 服务器端连接之后的执行器(自定义的类)     private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();           @Override     public void initChannel(SocketChannel ch) {         // 通道 Channel 设置         ChannelPipeline pipeline = ch.pipeline();         // 18 行:消息解码:读取消息头和消息体         pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));         // 20 行:消息编码:将消息封装为消息头和消息体,在消息前添加消息体的长度         pipeline.addLast(new LengthFieldPrepender(4));         // 设置(字符串)编码器和解码器         pipeline.addLast(DECODER);         pipeline.addLast(ENCODER);         // 服务器端连接之后的执行器,接收到消息之后的业务处理         pipeline.addLast(SERVER_HANDLER);     } }

其中核心代码是 18 行和 20 行,通过 LengthFieldPrepender 实现编码(将消息打包成消息头 + 消息体),通过  LengthFieldBasedFrameDecoder 实现解码(从封装的消息中取出消息的内容)。

LengthFieldBasedFrameDecoder 的参数说明如下:

  1. 鸿蒙官方战略合作共建——HarmonyOS技术社区

  2. 参数 1:maxFrameLength - 发送的数据包最大长度;

  3. 参数 2:lengthFieldOffset - 长度域偏移量,指的是长度域位于整个数据包字节数组中的下标;

  4. 参数 3:lengthFieldLength - 长度域自己的字节数长度;

  5. 参数 4:lengthAdjustment &ndash;  长度域的偏移量矫正。如果长度域的值,除了包含有效数据域的长度外,还包含了其他域(如长度域自身)长度,那么,就需要进行矫正。矫正的值为:包长 - 长度域的值 &ndash;  长度域偏移 &ndash; 长度域长;

  6. 参数 5:initialBytesToStrip &ndash; 丢弃的起始字节数。丢弃处于有效数据前面的字节数量。比如前面有 4 个节点的长度域,则它的值为  4。

LengthFieldBasedFrameDecoder(1024,0,4,0,4) 的意思是:数据包最大长度为  1024,长度域占首部的四个字节,在读数据的时候去掉首部四个字节(即长度域)。

客户端的核心实现代码如下:

 static class ClientInitializer extends ChannelInitializer<SocketChannel> {     // 字符串编码器和解码器     private static final StringDecoder DECODER = new StringDecoder();     private static final StringEncoder ENCODER = new StringEncoder();     // 客户端连接成功之后业务处理     private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();           @Override     public void initChannel(SocketChannel ch) {         ChannelPipeline pipeline = ch.pipeline();         // 消息解码:读取消息头和消息体         pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));         // 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度         pipeline.addLast(new LengthFieldPrepender(4));         // 设置(字符串)编码器和解码器         pipeline.addLast(DECODER);         pipeline.addLast(ENCODER);         // 客户端连接成功之后的业务处理         pipeline.addLast(CLIENT_HANDLER);     } }

完整的服务器端和客户端的实现代码如下:

import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder;   public class NettyExample {     // 定义服务器的端口号     static final int PORT = 8007;           static class MyNettyServer {         public static void main(String[] args) {             // 创建一个线程组,用来负责接收客户端连接             EventLoopGroup bossGroup = new NioEventLoopGroup();             // 创建另一个线程组,用来负责 I/O 的读写             EventLoopGroup workerGroup = new NioEventLoopGroup();             try {                 // 创建一个 Server 实例(可理解为 Netty 的入门类)                 ServerBootstrap b = new ServerBootstrap();                 // 将两个线程池设置到 Server 实例                 b.group(bossGroup, workerGroup)                         // 设置 Netty 通道的类型为 NioServerSocket(非阻塞 I/O Socket 服务器)                         .channel(NioServerSocketChannel.class)                         // 设置建立连接之后的执行器(ServerInitializer 是我创建的一个自定义类)                         .childHandler(new NettyExample.ServerInitializer());                 // 绑定端口并且进行同步                 ChannelFuture future = b.bind(PORT).sync();                 // 对关闭通道进行监听                 future.channel().closeFuture().sync();             } catch (InterruptedException e) {                 e.printStackTrace();             } finally {                 // 资源关闭                 bossGroup.shutdownGracefully();                 workerGroup.shutdownGracefully();             }         }     }           static class ServerInitializer extends ChannelInitializer<SocketChannel> {         // 字符串编码器和解码器         private static final StringDecoder DECODER = new StringDecoder();         private static final StringEncoder ENCODER = new StringEncoder();         // 服务器端连接之后的执行器(自定义的类)         private static final NettyExample.ServerHandler SERVER_HANDLER = new NettyExample.ServerHandler();                   @Override         public void initChannel(SocketChannel ch) {             // 通道 Channel 设置             ChannelPipeline pipeline = ch.pipeline();             // 消息解码:读取消息头和消息体             pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));             // 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度             pipeline.addLast(new LengthFieldPrepender(4));             // 设置(字符串)编码器和解码器             pipeline.addLast(DECODER);             pipeline.addLast(ENCODER);             // 服务器端连接之后的执行器,接收到消息之后的业务处理             pipeline.addLast(SERVER_HANDLER);         }     }           static class ServerHandler extends SimpleChannelInboundHandler<String> {                   @Override         public void channelRead0(ChannelHandlerContext ctx, String request) {             if (!request.isEmpty()) {                 System.out.println("接到客户端的消息:" + request);             }         }                   @Override         public void channelReadComplete(ChannelHandlerContext ctx) {             ctx.flush();         }                   @Override         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {             cause.printStackTrace();             ctx.close();         }     }           static class MyNettyClient {         public static void main(String[] args) {             // 创建事件循环线程组(客户端的线程组只有一个)             EventLoopGroup group = new NioEventLoopGroup();             try {                 // Netty 客户端启动对象                 Bootstrap b = new Bootstrap();                 // 设置启动参数                 b.group(group)                         // 设置通道类型                         .channel(NioSocketChannel.class)                         // 设置启动执行器(负责启动事件的业务执行,ClientInitializer 为自定义的类)                         .handler(new NettyExample.ClientInitializer());                  // 连接服务器端并同步通道                 Channel ch = b.connect("127.0.0.1", PORT).sync().channel();                  // 发送消息                 ChannelFuture lastWriteFuture = null;                 // 给服务器端发送 10 条消息                 for (int i = 0; i < 10; i++) {                     // 发送给服务器消息                     lastWriteFuture = ch.writeAndFlush("Hi,Java.\n");                 }                 // 在关闭通道之前,同步刷新所有的消息                 if (lastWriteFuture != null) {                     lastWriteFuture.sync();                 }             } catch (InterruptedException e) {                 e.printStackTrace();             } finally {                 // 释放资源                 group.shutdownGracefully();             }         }     }           static class ClientInitializer extends ChannelInitializer<SocketChannel> {         // 字符串编码器和解码器         private static final StringDecoder DECODER = new StringDecoder();         private static final StringEncoder ENCODER = new StringEncoder();         // 客户端连接成功之后业务处理         private static final NettyExample.ClientHandler CLIENT_HANDLER = new NettyExample.ClientHandler();                   @Override         public void initChannel(SocketChannel ch) {             ChannelPipeline pipeline = ch.pipeline();             // 消息解码:读取消息头和消息体             pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));             // 消息编码:将消息封装为消息头和消息体,在响应字节数据前面添加消息体长度             pipeline.addLast(new LengthFieldPrepender(4));             // 设置(字符串)编码器和解码器             pipeline.addLast(DECODER);             pipeline.addLast(ENCODER);             // 客户端连接成功之后的业务处理             pipeline.addLast(CLIENT_HANDLER);         }     }           static class ClientHandler extends SimpleChannelInboundHandler<String> {                   @Override         protected void channelRead0(ChannelHandlerContext ctx, String msg) {             System.err.println("接到服务器的消息:" + msg);         }                   @Override         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {             cause.printStackTrace();             ctx.close();         }     } }

以上程序的执行结果为:

如何解决Socket粘包问题

“如何解决Socket粘包问题”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

--结束END--

本文标题: 如何解决Socket粘包问题

本文链接: https://www.lsjlt.com/news/84019.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 如何解决Socket粘包问题
    本篇内容介绍了“如何解决Socket粘包问题”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!问题一:TCP存...
    99+
    2024-04-02
  • python socket粘包问题怎么解决
    今天小编给大家分享一下python socket粘包问题怎么解决的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我...
    99+
    2024-04-02
  • Socket粘包问题的解决方法有哪些
    这篇文章主要讲解了“Socket粘包问题的解决方法有哪些”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Socket粘包问题的解决方法有哪些”吧!什么是 TC...
    99+
    2024-04-02
  • 解决TCP粘包/拆包问题的方法及示例
    TCP粘包和拆包是网络编程中常见的问题,特别是在数据传输的过程中,可能会发生将多个数据包粘在一起或将一个数据包拆成多个数据包的情况,这可能会导致应用程序无法正确解析数据,从而造成数据错误或系统故障。本文将介绍TCP粘包和拆包的原因、解决方案...
    99+
    2023-09-25
    网络 服务器 tcp/ip c# 网络协议
  • Netty如何解决TCP 粘包拆包
    小编给大家分享一下Netty如何解决TCP 粘包拆包,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!什么是粘包/拆包    ...
    99+
    2023-06-20
  • workerman怎么自定义协议解决粘包拆包问题
    这篇“workerman怎么自定义协议解决粘包拆包问题”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“workerman怎么自...
    99+
    2023-07-04
  • 如何解决TCP socket的阻塞问题
    目录解决TCP socket的阻塞问题在异常处理程序当中退出socket连接TCP连接阻塞的监控和处理我们整理出符合该类异常的特征如下如何查看一个连接的创建时间解决TCP socke...
    99+
    2024-04-02
  • Golang通过包长协议处理TCP粘包的问题解决
    目录tcp粘包现象代码重现tcp粘包问题处理方法tcp粘包产生的原因这里就不说了,因为大家能搜索TCP粘包的处理方法,想必大概对TCP粘包有了一定了解,所以我们直接从处理思路开始讲起...
    99+
    2024-04-02
  • 如何解决xp不能复制粘贴问题
    小编给大家分享一下如何解决xp不能复制粘贴问题,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!无法使用复粘贴功能,可以用注册表修复,注册表是系统的核心部分,所以操作时为谨慎。修复方法,先进入注册表,从开始菜单中打开“运行”程...
    99+
    2023-06-28
  • 如何解决C# Socket发送数据大小问题
    今天就跟大家聊聊有关如何解决C# Socket发送数据大小问题,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。TCP/IP是可靠性传输协议,它能保证数据能按顺序的方式到达目的地.看到以...
    99+
    2023-06-17
  • workerman 自定义的协议如何解决粘包拆包
    前言:         由于最近在使用 workerman 实现 Unity3D 联机游戏的服务端,虽然也可以通过 TCP 协议直接通信,但是在实际测试的过程中发现了一些小问题。         比如双方的数据包都是字符串的方式吗,还有就因...
    99+
    2023-09-07
    TCP粘包拆包 workerman unity3d PHP网游服务
  • Python Socket通信黏包问题分
    参考:http://www.cnblogs.com/Eva-J/articles/8244551.html#_label5 1.黏包的表现(以客户端远程操作服务端命令为例) 注:只有在TCP协议通信的情况下,才会产生黏包问题 基于TCP协...
    99+
    2023-01-30
    通信 Python Socket
  • 如何解决webpack4 css打包压缩问题
    这篇文章将为大家详细讲解有关如何解决webpack4 css打包压缩问题,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。这两天一直在练习这个webpack4, 发现有好多问...
    99+
    2024-04-02
  • 如何解决webpack dll打包重复问题
    这篇文章主要介绍了如何解决webpack dll打包重复问题,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。关于webpack dll的使用,...
    99+
    2024-04-02
  • 如何分析Socket TIME_WAIT 问题
    本篇文章给大家分享的是有关如何分析Socket TIME_WAIT 问题,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。Socket TIME_WAIT 问题tcp/ip详解的卷...
    99+
    2023-06-04
  • pycharm无法导入包问题如何解决
    如果PyCharm无法导入包,可以尝试以下几种解决方法:1. 确保你已经正确安装了需要导入的包。可以通过在终端中运行`pip ins...
    99+
    2023-09-22
    pycharm
  • 如何解决IDEA包转模块的问题
    这篇文章给大家分享的是有关如何解决IDEA包转模块的问题的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。在IDEA拉取代码,但是拉取下来之后,发现之前创建的模块变成包类型的了(直接模块也可能造成的这个问题,同样可以...
    99+
    2023-06-15
  • 怎么解决TCP socket的阻塞问题
    小编给大家分享一下怎么解决TCP socket的阻塞问题,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!解决TCP socket的阻塞问题大家知道,tcp...
    99+
    2023-06-22
  • Netty解决 TCP 粘包拆包的方法
    什么是粘包/拆包        一般所谓的TCP粘包是在一次接收数据不能完全地体现一个完整的消息数据。TCP通讯为何存在粘...
    99+
    2024-04-02
  • 如何解决golang在import包报错的问题
    这篇文章主要介绍了如何解决golang在import包报错的问题,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。什么是golanggolang 是Google开发的一种静态强类...
    99+
    2023-06-14
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作