Netty实践(三):实际场景下的数据通信

Netty实践(三):实际场景下的数据通信,第1张

Netty实践(三):实际场景下的数据通信

数据通信场景:长连接还是短连接

我们如何在真实场景中使用Netty进行交流?大致有三种方式:

首先,长连接通道用于通信,即服务器和客户端之间的通道始终是开放的。如果服务器性能足够好,而且我们的客户端数量比较少,是一个适合长连接的通道。

第二,采用短连接方式,一次性批量提交数据,即我们会将数据保存在本地的临时缓冲区或临时表中。当达到数量时,进行批量提交;或者通过计划任务轮询提交。这种情况有弊端,就是无法实现实时传输。如果应用不要求实时性能,可以考虑使用。

第三,采用特殊的长连接。它有什么特别之处?如果在指定时间内服务器和客户端之间没有通信,连接将被断开。如果客户端在断开连接后需要向服务器发送请求,将会重新建立连接。这里有点像缓存线程池。

本博客将使用Netty实现第三种数据通信方式。接下来就来看看吧~



Netty数据通信代码示例


请求消息对象

package day3; import java.io.Serializable; public class Request implements Serializable{    private static final long  SerialVersionUID = 1L;        private String id ;    private String name ;    private String requestMessage ;        public String getId() {       return id;    }    public void setId(String id) {       this.id = id;    }    public String getName() {       return name;    }    public void setName(String name) {       this.name = name;    }    public String getRequestMessage() {       return requestMessage;    }    public void setRequestMessage(String requestMessage) {       this.requestMessage = requestMessage;    } }


响应消息对象

package day3; import java.io.Serializable; public class Response implements Serializable{        private static final long serialVersionUID = 1L;        private String id;    private String name;    private String responseMessage;        public String getId() {       return id;    }    public void setId(String id) {       this.id = id;    }    public String getName() {       return name;    }    public void setName(String name) {       this.name = name;    }    public String getResponseMessage() {       return responseMessage;    }    public void setResponseMessage(String responseMessage) {       this.responseMessage = responseMessage;    }     }


编解码器处理器

package day3; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /**  * Marshalling工厂  */ public final class MarshallingCodeCFactory {     /**      * 创建Jboss Marshalling×××MarshallingDecoder      * @return MarshallingDecoder      */     public static MarshallingDecoder buildMarshallingDecoder() {        //首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。       final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");       //创建了MarshallingConfiguration对象,配置了版本号为5        final MarshallingConfiguration configuration = new MarshallingConfiguration();       configuration.setVersion(5);       //根据marshallerFactory和configuration创建provider       UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);       //构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度       MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);       return decoder;     }     /**      * 创建Jboss Marshalling编码器MarshallingEncoder      * @return MarshallingEncoder      */     public static MarshallingEncoder buildMarshallingEncoder() {       final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");       final MarshallingConfiguration configuration = new MarshallingConfiguration();       configuration.setVersion(5);       MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);       //构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组       MarshallingEncoder encoder = new MarshallingEncoder(provider);       return encoder;     } }

注意,在上一篇博客Netty练习(二):TCP解包粘贴中,我们继承了ByteToMessageDecoder和MessageToByteEncoder,实现了ByteBuff和message对象之间的转换。事实上,这在实践中,我们可以利用相关的序列化框架(JBossMarshlling/protobuf/Kryo/MessagePack)来帮助我们快速完成编码和解码。这里我用的是JBoss编组(JBoss-marshaling-1.3.0.cr9.jar+JBoss-marshaling-Serial-1.3.0.cr9.jar)。具体来说,客户端与服务器交互的消息对象只需要实现JDK默认的序列化接口,同时使用JBoss编组为后续的客户端/服务器生成编码器和XXX。


客户端处理程序

package day3; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter{        @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {       try {          Response resp = (Response)msg;          System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());              } finally {          ReferenceCountUtil.release(msg);       }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {       ctx.close();    }     }

在这里,我们可以清楚地看到,我们直接将对象转换成了自定义的消息响应对象,这说明JBoss编组与Netty结合后,编解码是如此简单。



客户

package day3; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import java.util.concurrent.TimeUnit; /**  *  */ public class Client {        private static class SingletonHolder {       static final Client instance = new Client();    }        public static Client getInstance(){       return SingletonHolder.instance;    }        private EventLoopGroup group;    private Bootstrap b;    //通过ChannelFuture实现读写 *** 作    private ChannelFuture cf ;        private Client(){          group = new NioEventLoopGroup();          b = new Bootstrap();          b.group(group)           .channel(NioSocketChannel.class)           .handler(new LoggingHandler(LogLevel.INFO))           .handler(new ChannelInitializer<SocketChannel>() {                @Override                protected void initChannel(SocketChannel sc) throws Exception {                   sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());                   sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());                   //超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭相应的通道,主要为减小服务端资源占用)                   sc.pipeline().addLast(new ReadTimeoutHandler(3));                    sc.pipeline().addLast(new ClientHandler());                }           });    }        public void connect(){       try {          this.cf = b.connect("127.0.0.1", 8765).sync();          System.out.println("远程服务器已经连接, 可以进行数据交换..");                   } catch (Exception e) {          e.printStackTrace();       }    }    //这里是通道关闭,再次建立连接的核心代码    public ChannelFuture getChannelFuture(){              if(this.cf == null){          this.connect();       }       if(!this.cf.channel().isActive()){          this.connect();       }              return this.cf;    }        public static void main(String[] args) throws Exception{       final Client c = Client.getInstance();       //注意client好像没有调用connect()方法进行连接,但是实际上在下面的代码中做了       ChannelFuture cf = c.getChannelFuture();       for(int i = 1; i <= 3; i++ ){          Request request = new Request();          request.setId("" + i);          request.setName("pro" + i);          request.setRequestMessage("数据信息" + i);          cf.channel().writeAndFlush(request);          TimeUnit.SECONDS.sleep(4);       }       cf.channel().closeFuture().sync();              //通道关闭后,通过另一个线程模拟客户端再次建立连接发送请求       new Thread(new Runnable() {          @Override          public void run() {             try {                System.out.println("进入子线程...");                ChannelFuture cf = c.getChannelFuture();                System.out.println(cf.channel().isActive());                System.out.println(cf.channel().isOpen());                                //再次发送数据                Request request = new Request();                request.setId("" + 4);                request.setName("pro" + 4);                request.setRequestMessage("数据信息" + 4);                cf.channel().writeAndFlush(request);                               cf.channel().closeFuture().sync();                System.out.println("子线程结束...");             } catch (InterruptedException e) {                e.printStackTrace();             }          }       }).start();              System.out.println("断开连接,主线程结束..");           }             }

这里对客户端进行了初步封装,singleton通过静态内部类实现。

客户端的Handler不仅仅是Marshall编译的,还有Netty自带的ReadTimeoutHandler,是客户端和服务器一段时间不通信断开连接的基础。从这里也可以看出Netty的强大。通过提供一些预定义的处理程序,您可以使您的代码简单,只需关注业务实现。客户端超时后断开通道后,如何建立连接再次通信?你通过getChannelFuture()就知道了。

客户端代码模拟一个线程通信超时,通道关闭后,另一个线程再次与服务器通信。



服务器处理程序

package day3; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; public class ServerHandler extends ChannelHandlerAdapter{    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {       Request request = (Request)msg;       System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());       Response response = new Response();       response.setId(request.getId());       response.setName("response" + request.getId());       response.setResponseMessage("响应内容" + request.getId());       ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {       ctx.close();    }         }


计算机网络服务器

package day3; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; public class Server {    public static void main(String[] args) throws Exception{              EventLoopGroup pGroup = new NioEventLoopGroup();       EventLoopGroup cGroup = new NioEventLoopGroup();              ServerBootstrap b = new ServerBootstrap();       b.group(pGroup, cGroup)        .channel(NioServerSocketChannel.class)        .option(ChannelOption.SO_BACKLOG, 1024)        //设置日志        .handler(new LoggingHandler(LogLevel.INFO))        .childHandler(new ChannelInitializer<SocketChannel>() {          protected void initChannel(SocketChannel sc) throws Exception {             sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());             sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());             sc.pipeline().addLast(new ReadTimeoutHandler(3));             sc.pipeline().addLast(new ServerHandler());          }       });              ChannelFuture cf = b.bind(8765).sync();              cf.channel().closeFuture().sync();       pGroup.shutdownGracefully();       cGroup.shutdownGracefully();           } }



运行结果


注意:首先,客户端向服务器发送三条消息,但是每条消息的发送间隔是4S。因为超时设置为3S,所以在发送第一条消息后,通道被断开。接下来,客户端启动另一个线程再次与服务器通信。



就这样,这个博客结束了。对你有用吗?

下周我们来看看Netty在心跳检测中的应用。_






欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/zz/777911.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-03
下一篇 2022-05-03

发表评论

登录后才能评论

评论列表(0条)

保存