数据通信场景:长连接还是短连接
我们如何在真实场景中使用Netty进行交流?大致有三种方式:
首先,长连接通道用于通信,即服务器和客户端之间的通道始终是开放的。如果服务器性能足够好,而且我们的客户端数量比较少,是一个适合长连接的通道。
第二,采用短连接方式,一次性批量提交数据,即我们会将数据保存在本地的临时缓冲区或临时表中。当达到数量时,进行批量提交;或者通过计划任务轮询提交。这种情况有弊端,就是无法实现实时传输。如果应用不要求实时性能,可以考虑使用。
第三,采用特殊的长连接。它有什么特别之处?如果在指定时间内服务器和客户端之间没有通信,连接将被断开。如果客户端在断开连接后需要向服务器发送请求,将会重新建立连接。这里有点像缓存线程池。
本博客将使用Netty实现第三种数据通信方式。接下来就来看看吧~
Netty数据通信代码示例
请求消息对象
package day3; import java.io.Serializable; public class Request implements Serializable{ private static final long SerialVersionUID = 1L; private String id ; private String name ; private String requestMessage ; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getRequestMessage() { return requestMessage; } public void setRequestMessage(String requestMessage) { this.requestMessage = requestMessage; } }响应消息对象
package day3; import java.io.Serializable; public class Response implements Serializable{ private static final long serialVersionUID = 1L; private String id; private String name; private String responseMessage; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getResponseMessage() { return responseMessage; } public void setResponseMessage(String responseMessage) { this.responseMessage = responseMessage; } }编解码器处理器
注意,在上一篇博客Netty练习(二):TCP解包粘贴中,我们继承了ByteToMessageDecoder和MessageToByteEncoder,实现了ByteBuff和message对象之间的转换。事实上,这在实践中,我们可以利用相关的序列化框架(JBossMarshlling/protobuf/Kryo/MessagePack)来帮助我们快速完成编码和解码。这里我用的是JBoss编组(JBoss-marshaling-1.3.0.cr9.jar+JBoss-marshaling-Serial-1.3.0.cr9.jar)。具体来说,客户端与服务器交互的消息对象只需要实现JDK默认的序列化接口,同时使用JBoss编组为后续的客户端/服务器生成编码器和XXX。
客户端处理程序
在这里,我们可以清楚地看到,我们直接将对象转换成了自定义的消息响应对象,这说明JBoss编组与Netty结合后,编解码是如此简单。
客户
package day3; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import java.util.concurrent.TimeUnit; /** * */ public class Client { private static class SingletonHolder { static final Client instance = new Client(); } public static Client getInstance(){ return SingletonHolder.instance; } private EventLoopGroup group; private Bootstrap b; //通过ChannelFuture实现读写 *** 作 private ChannelFuture cf ; private Client(){ group = new NioEventLoopGroup(); b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭相应的通道,主要为减小服务端资源占用) sc.pipeline().addLast(new ReadTimeoutHandler(3)); sc.pipeline().addLast(new ClientHandler()); } }); } public void connect(){ try { this.cf = b.connect("127.0.0.1", 8765).sync(); System.out.println("远程服务器已经连接, 可以进行数据交换.."); } catch (Exception e) { e.printStackTrace(); } } //这里是通道关闭,再次建立连接的核心代码 public ChannelFuture getChannelFuture(){ if(this.cf == null){ this.connect(); } if(!this.cf.channel().isActive()){ this.connect(); } return this.cf; } public static void main(String[] args) throws Exception{ final Client c = Client.getInstance(); //注意client好像没有调用connect()方法进行连接,但是实际上在下面的代码中做了 ChannelFuture cf = c.getChannelFuture(); for(int i = 1; i <= 3; i++ ){ Request request = new Request(); request.setId("" + i); request.setName("pro" + i); request.setRequestMessage("数据信息" + i); cf.channel().writeAndFlush(request); TimeUnit.SECONDS.sleep(4); } cf.channel().closeFuture().sync(); //通道关闭后,通过另一个线程模拟客户端再次建立连接发送请求 new Thread(new Runnable() { @Override public void run() { try { System.out.println("进入子线程..."); ChannelFuture cf = c.getChannelFuture(); System.out.println(cf.channel().isActive()); System.out.println(cf.channel().isOpen()); //再次发送数据 Request request = new Request(); request.setId("" + 4); request.setName("pro" + 4); request.setRequestMessage("数据信息" + 4); cf.channel().writeAndFlush(request); cf.channel().closeFuture().sync(); System.out.println("子线程结束..."); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); System.out.println("断开连接,主线程结束.."); } }这里对客户端进行了初步封装,singleton通过静态内部类实现。
客户端的Handler不仅仅是Marshall编译的,还有Netty自带的ReadTimeoutHandler,是客户端和服务器一段时间不通信断开连接的基础。从这里也可以看出Netty的强大。通过提供一些预定义的处理程序,您可以使您的代码简单,只需关注业务实现。客户端超时后断开通道后,如何建立连接再次通信?你通过getChannelFuture()就知道了。
客户端代码模拟一个线程通信超时,通道关闭后,另一个线程再次与服务器通信。
服务器处理程序
计算机网络服务器
package day3; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //设置日志 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); sc.pipeline().addLast(new ReadTimeoutHandler(3)); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }运行结果
注意:首先,客户端向服务器发送三条消息,但是每条消息的发送间隔是4S。因为超时设置为3S,所以在发送第一条消息后,通道被断开。接下来,客户端启动另一个线程再次与服务器通信。
就这样,这个博客结束了。对你有用吗?
下周我们来看看Netty在心跳检测中的应用。_
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)