標籤:

基於Netty的IM簡單實現原理

最近在開發MobIM,實現了消息傳輸和群等功能的IM功能。SDK功能包小,而功能全面。可以與原來的系統進行無縫整合。

自己抽空也實現了一套IM Server和IMClient的業務通信模式。沒有實現複雜的UI界面,實現簡單的登錄註冊,發消息,收消息。伺服器端與客戶端都使用Netty通信。

Netty基於非阻塞(nio),事件驅動的網路應用程序框架和工具。

通過Netty面對大規模的並發請求可以處理的得心用手。用來替代原來的bio網路應用請求框架。

BIO通信即平時使用的基於Socket,ServerSocket的InputStream和OutStream。

Netty神奇的地方在於是否是阻塞的。

while(true){//主線程死循環等待新連接到來 Socket socket = serverSocket.accept();//為新的連接創建新的線程,客戶端與伺服器上的線程數1:1 executor.submit(new ConnectIOnHandler(socket));

在BIO模型中,伺服器通過ServerSocket來開啟監聽,每當有請求的時候開啟一個線程來接受處理和維持狀態。這種思想在低並發,小吞吐的應用還可以應付,一旦遇到大並發,大吞吐的請求,必然歇菜。線程和客戶端保持著1:1的對應關係,維持著線程。維持那麼的多的線程,JVM必然不堪重負,伺服器必然崩潰,宕機。

而在非阻塞的Netty中,卻可以應付自如。從容應對。Tomcat就是基於BIO的網路通信模式(Tomcat可以通過一定配置,改成非阻塞模式),而JBoss卻是基於非阻塞的NIO實現。

NIO的網路通信模式很強勁,但是上手卻一點都不容易。其中解決和牽扯到好多網路問題。如:網路延時,TCP的粘包/拆包,網路故障等一堆一堆的問題。而Netty呢,針對nio複雜的編程難題而進行一系列的封裝實現,提供給廣大開發者一套開源簡單,方便使用的API類庫,甚至青出於藍而勝於藍,甚至幾乎完美的解決CPU突然飆升到100%的bug :bugs.sun.com/bugdatabas (其實也沒有真正的解決,只是把復現的概率降到了最低而已)。

用Netty來實現IM實在太合適了。可以在最短的時間裡整出一套思路清晰,架構簡明的IM通信底層模型。提下需求,底層用JSON 字元串String進行通信,對象通過JSON序列化成JSON String。收到JSON數據後再反序列化成對象。

首先,我們先看伺服器是怎麼實現的。

private static final StringDecoder DECODER = new StringDecoder(); private static final StringEncoder ENCODER = new StringEncoder();... //boss線程監聽埠,worker線程負責數據讀寫 bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); //輔助啟動類 ServerBootstrap bootstrap = new ServerBootstrap(); try { //設置線程池 bootstrap.group(bossGroup, workerGroup); //設置socket工廠 bootstrap.channel(NioServerSocketChannel.class); bootstrap.handler(new LoggingHandler(LogLevel.INFO)); //設置管道工廠 bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //獲取管道 ChannelPipeline pipe = socketChannel.pipeline(); // Add the text line codec combination first, pipe.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); // the encoder and decoder are static as these are sharable //字元串編碼器 pipe.addLast(DECODER); //字元串解碼器 pipe.addLast(ENCODER); //業務處理類 pipe.addLast(new IMServerHandle()); } }); //綁定埠 // Bind and start to accept incoming connections. ChannelFuture f = bootstrap.bind(port).sync(); if (f.isSuccess()) { Log.debug("server start success... port: " + port + ", main work thread: " + Thread.currentThread().getId()); } ////等待服務端監聽埠關閉 // Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { //優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }

以上是Netty伺服器啟動的代碼。其中需要注意childHandler方法。需要把我們要添加的業務處理handler來添加到這裡。通過ChannelPipeline 添加ChannelHandler。而處理字元串的就在IMServerHandle里實現。IMServerHandle繼承了SimpleChannelInboundHandler<T>類。其中泛型T就是要轉換成的對象。客戶端與伺服器端通信是本質上通過位元組碼byte[]通信的,而通過StringDecoder 和StringEncoder工具類對byte[]進行轉換,在IMServerHandle中獲取到String進行處理即可。

看下IMServerHandle的實現方式。

/*** * 面向IM通信操作的業務類 * @author xhj * */public class IMServerHandle extends SimpleChannelInboundHandler<String> { /** * user操作業務類 */ private UserBiz userBiz = new UserBiz(); /*** * 消息操作的業務類 */ private IMMessageBiz immessagebiz = new IMMessageBiz(); /*** * 處理接受到的String類型的JSON數據 */ @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(" get msg >> "+msg); //把JSON數據進行反序列化 Request req = JSON.parseObject(msg, Request.class); Response respon = new Response(); respon.setSendTime(System.currentTimeMillis()); //判斷是否是合法的請求 if(req != null ) { System.out.println("the req method >> "+req.getMethod()); //獲取操作類型 if(req.getMethod() == IMProtocol.LOGIN) { //獲取要操作的對象 User user = JSON.parseObject(req.getBody(),User.class); //設置返回數據的操作類型 respon.setMethod(IMProtocol.LOGIN); //執行業務操作 boolean bl = userBiz.login(user); if(bl) {//檢驗用戶有效 //設置響應數據 respon.setBody("login ok"); //設置狀態 respon.setStatus(0); //登錄成功將連接channel保存到Groups里 ChannelGroups.add(ctx.channel()); //將用戶的uname和channelId進行綁定,伺服器向指定用戶發送消息的時候需要用到uname和channelId ChannelGroups.putUser(user.getUname(), ctx.channel().id()); //發送廣播通知某人登錄成功了 userBiz.freshUserLoginStatus(user); } else {//用戶密碼錯誤 //設置錯誤描述 respon.setErrorStr("pwd-error"); //設置狀態描述碼 respon.setStatus(-1); } //將Response序列化為json字元串 msg = JSON.toJSONString(respon); //發送josn字元串數據,注意後面一定要加"
" ctx.writeAndFlush(msg+"
"); } else if(req.getMethod() == IMProtocol.SEND) { IMMessage immsg = JSON.parseObject(req.getBody(), IMMessage.class); immsg.setSendTime(System.currentTimeMillis()); c

通過IMServerHandle可以十分方便的處理獲取到的String字元串。處理完後,可以直接通過ChannelHandlerContext的writeAndFlush方法發送數據。

再看下Netty客戶端如何實現。

private BlockingQueue<Request> requests = new LinkedBlockingQueue<>(); /** * String字元串解碼器 */private static final StringDecoder DECODER = new StringDecoder(); /*** * String字元串編碼器 */private static final StringEncoder ENCODER = new StringEncoder(); /** * 客戶端業務處理Handler */ private IMClientHandler clientHandler ; /** * 添加發送請求Request * @param request */ public void addRequest(Request request) { try { requests.put(request); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 是否繼續進行運行 */ private boolean run = true; public void run() { //遠程IP String host = "172.20.10.7"; //埠號 int port = 10000; //工作線程 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //輔助啟動類 Bootstrap b = new Bootstrap(); // (1) //設置線程池 b.group(workerGroup); // (2) //設置socket工廠 不是ServerSocket而是Socket b.channel(NioSocketChannel.class); // (3) b.handler(new LoggingHandler(LogLevel.INFO)); //設置管道工廠 b.handler(new ChannelInitializer<SocketChannel>() { public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipe = ch.pipeline(); // Add the text line codec combination first, pipe.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); // the encoder and decoder are static as these are sharable //字元串解碼器 pipe.addLast(DECODER); //字元串編碼器 pipe.addLast(ENCODER); clientHandler = new IMClientHandler(); //IM業務處理類 pipe.addLast(clientHandler); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (5) Channel ch = f.channel(); ChannelFuture lastWriteFuture = null; while(run) { //將要發送的Request轉化為JSON String類型 String line = JSON.toJSONString(requests.take()); if(line != null && line.length() > 0) {//判斷非空 // Sends the received line to the server. //發送數據到伺服器 lastWriteFuture = ch.writeAndFlush(line + "
"); } } // Wait until all messages are flushed before closing the channel. //關閉寫的埠 if (lastWriteFuture != null) { lastWriteFuture.sync(); } } catch(Exception ex){ ex.printStackTrace(); } finally { //優雅的關閉工作線程 workerGroup.shutdownGracefully(); } } /** * 增加消息監聽接受介面 * @param messgeReceivedListener */ public void addMessgeReceivedListener(MessageSender.MessgeReceivedListener messgeReceivedListener) { clientHandler.addMessgeReceivedListener(messgeReceivedListener); } /*** * 移除消息監聽介面 * @param messgeReceivedListener */ public void remove(MessageSender.MessgeReceivedListener messgeReceivedListener) { clientHandler.remove(messgeReceivedListener); }

Netty的client端實現和Server實現方式大同小異。比Server端要簡要些了。少一個NIOEventLoop。在Bootstrap 的handle方法中增加ChannelInitializer初始化監聽器,並增加了IMClientHandler的監聽操作。其中IMClientHandler具體處理伺服器返回的通信信息。

通過ChannelFuture 獲取Channel,通過Channel在一個循環里發送請求。如果消息隊列BlockingQueue非空的時候,獲取Request並發送。以上發送,如何接受數據呢?接受到的json被反序列化直接變成了對象Response,對Response進行處理即可。

定義了一個消息接受到的監聽介面。

public static interface MessgeReceivedListener { public void onMessageReceived(Response msg); public void onMessageDisconnect(); public void onMessageConnect();}

在介面onMessageReceived方法里直接對獲取成功的響應進行處理。

而伺服器端對某個客戶端進行發送操作,把Channel添加到ChannelGroup里,將uname和channelid對應起來。需要對某個用戶發送消息的時候通過uname獲取channelid,通過channelid從ChannelGroup里獲取channel,通過channel發送即可。

具體操作如下:

public void transformMessage(IMMessage message) { Channel channel = ChannelGroups.getChannel(ChannelGroups.getChannelId(message.getTo())); if(channel != null && channel.isActive()) { Response response = new Response(); response.setBody(JSON.toJSONString(message)); response.setStatus(0); response.setMethod(IMProtocol.REV); response.setSendTime(System.currentTimeMillis()); channel.writeAndFlush(JSON.toJSON(response)+"
"); } }ChannelGroups的代碼實現:public class ChannelGroups { private static final Map<String,ChannelId> userList = new ConcurrentHashMap(); private static final ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup("ChannelGroups", GlobalEventExecutor.INSTANCE); public static void putUser(String uname,ChannelId id) { userList.put(uname,id); }

通過以上代碼解析應該對IM的通信模式有了比較全面的認識。具體實現過程可以下載源代碼進行查看。歡迎大家反饋提出問題。

github.com/sinxiao/Nett

運行效果圖。


推薦閱讀:

聽阿里雲工程師談談如何開發一個優秀的SDK
與 Xcode 相比,用 Adobe AIR/Flex做 iOS 開發有哪些優勢和局限?
無人機 SDK 平台開放後,有哪些 App 場景?

TAG:SDK |