本篇文章小编给大家分享一下Java实战之用springboot+netty实现简单的一对一聊天代码示例,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。
一、引入pom
4.0.0 com.chat.info chat-server 1.0-SNAPSHOT org.springframework.boot spring-boot-starter-parent 2.1.4.RELEASE UTF-8 1.8 org.springframework.boot spring-boot-starter-web io.netty netty-all 4.1.33.Final org.projectlombok lombok com.alibaba fastjson 1.2.56 org.springframework.boot spring-boot-starter-thymeleaf org.springframework.boot spring-boot-maven-plugin
二、创建netty服务端
package com.chat.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @Component @Slf4j public class ChatServer { private EventLoopGroup bossGroup; private EventLoopGroup workGroup; private void run() throws Exception { log.info("开始启动聊天服务器"); bossGroup = new NioEventLoopGroup(1); workGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChatServerInitializer()); //启动服务器 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); log.info("开始启动聊天服务器结束"); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } /** * 初始化服务器 */ @PostConstruct() public void init() { new Thread(() -> { try { run(); } catch (Exception e) { e.printStackTrace(); } }).start(); } @PreDestroy public void destroy() throws InterruptedException { if (bossGroup != null) { bossGroup.shutdownGracefully().sync(); } if (workGroup != null) { workGroup.shutdownGracefully().sync(); } } }
package com.chat.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; public class ChatServerInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //使用http的编码器和解码器 pipeline.addLast(new HttpServerCodec()); //添加块处理器 pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(8192)); pipeline.addLast(new WebSocketServerProtocolHandler("/chat")); //自定义handler,处理业务逻辑 pipeline.addLast(new ChatServerHandler()); } }
package com.chat.server; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.chat.config.ChatConfig; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.AttributeKey; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; @Slf4j public class ChatServerHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { //传过来的是json字符串 String text = textWebSocketFrame.text(); JSONObject jsonObject = JSON.parseObject(text); //获取到发送人的用户id Object msg = jsonObject.get("msg"); String userId = (String) jsonObject.get("userId"); Channel channel = channelHandlerContext.channel(); if (msg == null) { //说明是第一次登录上来连接,还没有开始进行聊天,将uid加到map里面 register(userId, channel); } else { //有消息了,开始聊天了 sendMsg(msg, userId); } } /** * 第一次登录进来 * * @param userId * @param channel */ private void register(String userId, Channel channel) { if (!ChatConfig.concurrentHashMap.containsKey(userId)) { //没有指定的userId ChatConfig.concurrentHashMap.put(userId, channel); // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID AttributeKey key = AttributeKey.valueOf("userId"); channel.attr(key).setIfAbsent(userId); } } /** * 开发发送消息,进行聊天 * * @param msg * @param userId */ private void sendMsg(Object msg, String userId) { Channel channel1 = ChatConfig.concurrentHashMap.get(userId); if (channel1 != null) { channel1.writeAndFlush(new TextWebSocketFrame("服务器时间" + LocalDateTime.now() + " " + msg)); } } /** * 一旦客户端连接上来,该方法被执行 * * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { log.info("handlerAdded 被调用" + ctx.channel().id().asLongText()); } /** * 断开连接,需要移除用户 * * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { removeUserId(ctx); } /** * 移除用户 * * @param ctx */ private void removeUserId(ChannelHandlerContext ctx) { Channel channel = ctx.channel(); AttributeKey key = AttributeKey.valueOf("userId"); String userId = channel.attr(key).get(); ChatConfig.concurrentHashMap.remove(userId); log.info("用户下线,userId:{}", userId); } /** * 处理移除,关闭通道 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
三、存储用户channel的map
package com.chat.config; import io.netty.channel.Channel; import java.util.concurrent.ConcurrentHashMap; public class ChatConfig { public static ConcurrentHashMapconcurrentHashMap = new ConcurrentHashMap(); }
四、客户端html
Title
五、controller模拟用户登录以及要发送信息给谁
package com.chat.controller; import com.chat.config.ChatConfig; import io.netty.channel.Channel; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.springframework.stereotype.Controller; import org.springframework.ui.Model; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; @Controller public class ChatController { @GetMapping("login") public String login(Model model, @RequestParam("userId") String userId, @RequestParam("sendId") String sendId) { model.addAttribute("userId", userId); model.addAttribute("sendId", sendId); return "chat"; } @GetMapping("sendMsg") public String login(@RequestParam("sendId") String sendId) throws InterruptedException { while (true) { Channel channel = ChatConfig.concurrentHashMap.get(sendId); if (channel != null) { channel.writeAndFlush(new TextWebSocketFrame("test")); Thread.sleep(1000); } } } }
六、测试
登录成功要发消息给bbb
登录成功要发消息给aaa