From 264b9b072d1fb490dd9866b8c8b956f34a9c3918 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=87=95=E9=B9=8F?= Date: Fri, 4 Mar 2022 11:46:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5redis=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E5=88=86=E5=B8=83=E5=BC=8F=E5=A4=9A=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E9=83=A8=E7=BD=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 2 +- src/main/java/com/aiprose/im/MsgVo.java | 23 -- .../com/aiprose/im/config/RedisConfig.java | 73 +++++ .../com/aiprose/im/config/RedisReceiver.java | 58 ++++ .../java/com/aiprose/im/socket/MsgVo.java | 37 +++ .../java/com/aiprose/im/socket/WebSocket.java | 110 ++++--- .../com/aiprose/im/socket/WebSocketPool.java | 49 ++++ .../java/com/aiprose/im/utils/RedisUtil.java | 273 ++++++++++++++++++ src/main/resources/application.yml | 5 + src/main/resources/static/lisi.html | 21 +- src/main/resources/static/wangwu.html | 3 +- src/main/resources/static/zhangsan.html | 20 +- 12 files changed, 600 insertions(+), 74 deletions(-) delete mode 100644 src/main/java/com/aiprose/im/MsgVo.java create mode 100644 src/main/java/com/aiprose/im/config/RedisConfig.java create mode 100644 src/main/java/com/aiprose/im/config/RedisReceiver.java create mode 100644 src/main/java/com/aiprose/im/socket/MsgVo.java create mode 100644 src/main/java/com/aiprose/im/socket/WebSocketPool.java create mode 100644 src/main/java/com/aiprose/im/utils/RedisUtil.java diff --git a/build.gradle b/build.gradle index 31b8c9c..505bed2 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,7 @@ repositories { dependencies { // implementation 'org.springframework.boot:spring-boot-starter-data-jpa' -// implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive' + implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive' implementation 'org.springframework.boot:spring-boot-starter-webflux' implementation 'org.springframework.boot:spring-boot-starter-websocket' implementation 'com.alibaba:fastjson:1.2.79' diff --git a/src/main/java/com/aiprose/im/MsgVo.java b/src/main/java/com/aiprose/im/MsgVo.java deleted file mode 100644 index 76f0b07..0000000 --- a/src/main/java/com/aiprose/im/MsgVo.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.aiprose.im; - -import lombok.Builder; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.io.Serializable; - -/** - * @author yanpeng - * @version 1.0 - * @desc TODO - * @company 北京中经网软件有限公司 - * @date 2022/2/25 15:12 - */ -@Data -public class MsgVo implements Serializable { - private String sendUserId; - private String receiveUserId; - private String message; - // 1在线用户列表 2点对点消息 - private Integer msgType; -} diff --git a/src/main/java/com/aiprose/im/config/RedisConfig.java b/src/main/java/com/aiprose/im/config/RedisConfig.java new file mode 100644 index 0000000..1320633 --- /dev/null +++ b/src/main/java/com/aiprose/im/config/RedisConfig.java @@ -0,0 +1,73 @@ +package com.aiprose.im.config; + +import com.aiprose.im.socket.WebSocketPool; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; +import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +/** + * @author yanpeng + * @version 1.0 + * @desc redis序列化配置,去掉key前面的前缀 + * @company 北京中经网软件有限公司 + * @date 2022/3/4 9:52 + */ +@Slf4j +@Configuration +public class RedisConfig { + /** + * redis消息监听器容器 + * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器 + * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理 + * @param connectionFactory + * @return + */ + @Bean + public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + // 监听msgToAll + container.addMessageListener(listenerAdapter, new PatternTopic(WebSocketPool.CHANNEL)); + log.info("Subscribed Redis channel: " + WebSocketPool.CHANNEL); + return container; + } + + @Bean + public MessageListenerAdapter messageListenerAdapter(RedisReceiver redisReceiver){ + return new MessageListenerAdapter(redisReceiver,"receiveMessage"); + } + + @Bean + public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { + RedisTemplate redisTemplate = new RedisTemplate<>(); + redisTemplate.setConnectionFactory(redisConnectionFactory); + + // 使用Jackson2JsonRedisSerialize 替换默认序列化 + Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); + + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); + objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); + + jackson2JsonRedisSerializer.setObjectMapper(objectMapper); + + // 设置value的序列化规则和 key的序列化规则 + redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); + redisTemplate.setHashKeySerializer(new StringRedisSerializer()); + + redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); + redisTemplate.setKeySerializer(new StringRedisSerializer()); + redisTemplate.afterPropertiesSet(); + return redisTemplate; + } +} diff --git a/src/main/java/com/aiprose/im/config/RedisReceiver.java b/src/main/java/com/aiprose/im/config/RedisReceiver.java new file mode 100644 index 0000000..3634904 --- /dev/null +++ b/src/main/java/com/aiprose/im/config/RedisReceiver.java @@ -0,0 +1,58 @@ +package com.aiprose.im.config; + +import com.aiprose.im.socket.MsgVo; +import com.aiprose.im.socket.WebSocketPool; +import com.aiprose.im.utils.RedisUtil; +import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * @author yanpeng + * @version 1.0 + * @desc TODO + * @company 北京中经网软件有限公司 + * @date 2022/3/3 13:49 + */ +@Component +public class RedisReceiver { + + @Autowired + private WebSocketPool socketPool; + + @Autowired + private RedisUtil redisUtil; + + public void receiveMessage(String message){ + MsgVo msgVo = JSON.parseObject(message, MsgVo.class); + try { + // 点对点发送消息 + if (StringUtils.isNotBlank(msgVo.getReceiveUserId())) { // p2p + if(redisUtil.hasKeyHash(WebSocketPool.HASHKEY,msgVo.getReceiveUserId())){ + String sessionid = redisUtil.getHashIndex(WebSocketPool.HASHKEY, msgVo.getReceiveUserId()).toString(); + if(socketPool.has(sessionid)){ + socketPool.get(sessionid).getAsyncRemote().sendText(message); + } + } + } else { // 群发 + List sessions = redisUtil.getHash(WebSocketPool.HASHKEY); + for (Object item : sessions) { + //如果某个session不存在,分布式环境下可能session在其他的节点,继续给其他人广播 + try { + if(socketPool.has(item.toString())){ + socketPool.get(item.toString()).getAsyncRemote().sendText(message); + } + }catch (Exception e1){ + e1.printStackTrace(); + } + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/src/main/java/com/aiprose/im/socket/MsgVo.java b/src/main/java/com/aiprose/im/socket/MsgVo.java new file mode 100644 index 0000000..abc5a40 --- /dev/null +++ b/src/main/java/com/aiprose/im/socket/MsgVo.java @@ -0,0 +1,37 @@ +package com.aiprose.im.socket; + +import lombok.Data; + +import java.io.Serializable; + +/** + * @author yanpeng + * @version 1.0 + * @desc TODO + * @company 北京中经网软件有限公司 + * @date 2022/2/25 15:12 + */ +@Data +public class MsgVo implements Serializable { + /**发送用户ID*/ + private String sendUserId; + /**目标用户ID*/ + private String receiveUserId; + /**消息内容*/ + private String message; + /** + * 1在线用户列表 2点对点聊天消息 3返回消息发送状态 + */ + private Integer msgType; + + /** 0 成功 1失败 */ + private Integer msgStatus = 0; + + /** 单条消息标识,返回消息发送状态 */ + private String uuid; + + /**发送时间*/ + private Long sendDate; + + private String appId; +} diff --git a/src/main/java/com/aiprose/im/socket/WebSocket.java b/src/main/java/com/aiprose/im/socket/WebSocket.java index 715e9f9..8915218 100644 --- a/src/main/java/com/aiprose/im/socket/WebSocket.java +++ b/src/main/java/com/aiprose/im/socket/WebSocket.java @@ -1,19 +1,19 @@ package com.aiprose.im.socket; -import com.aiprose.im.MsgVo; +import com.aiprose.im.utils.RedisUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; -import org.springframework.web.bind.annotation.PathVariable; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; -import java.util.Map; +import java.util.Date; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; /** * @author yanpeng @@ -27,55 +27,99 @@ import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint("/ws/imserver/{user}") public class WebSocket { - // TODO 分布式环境放到redis中 - private static Map sessions = new ConcurrentHashMap<>(); + private static WebSocketPool socketPool; + + private static StringRedisTemplate redisTemplate; + + private static RedisUtil redisUtil; + + @Autowired + public void setSocketPool(WebSocketPool socketPool) { + WebSocket.socketPool = socketPool; + } + + @Autowired + public void setRedisTemplate(StringRedisTemplate redisTemplate) { + WebSocket.redisTemplate = redisTemplate; + } + + @Autowired + public void setRedisUtil(RedisUtil redisUtil) { + WebSocket.redisUtil = redisUtil; + } @OnOpen - public void open(@PathParam("user") String user, Session session){ - sessions.put(user,session); - log.info(user+"连接成功"); + public void open(@PathParam("user") String user, Session session) { + socketPool.open(session); + redisUtil.setHash(WebSocketPool.HASHKEY, user, session.getId()); + log.info(user + "连接成功"); MsgVo msgVo = new MsgVo(); msgVo.setMsgType(1); - Set strings = sessions.keySet(); - String onlineUsers = JSON.toJSONString(strings); - msgVo.setMessage(onlineUsers); + Set onlineUsers = redisUtil.hashKeys(WebSocketPool.HASHKEY); + System.out.println(onlineUsers); + String onlineUserStr = JSON.toJSONString(onlineUsers); + msgVo.setMessage(onlineUserStr); sendMessage(msgVo); } @OnMessage - public void message(@PathParam("user") String user, String message){ - log.info("收到消息{}",message); - MsgVo msgVo = JSONObject.parseObject(message,MsgVo.class); - if(StringUtils.isBlank(msgVo.getReceiveUserId())){ + public void message(@PathParam("user") String user, String message) { + long timeMillis = System.currentTimeMillis(); + log.info("收到消息{}", message); + MsgVo msgVo = JSONObject.parseObject(message, MsgVo.class); + if (StringUtils.isBlank(msgVo.getReceiveUserId())) { return; } - if(StringUtils.isBlank(msgVo.getMessage())){ + if (StringUtils.isBlank(msgVo.getMessage())) { return; } - msgVo.setMsgType(2); - sendMessage(msgVo); - } + if (StringUtils.isBlank(msgVo.getUuid())) { + return; + } + MsgVo backMsg = new MsgVo(); + backMsg.setSendDate(timeMillis); + try { + // TODO 保存聊天记录到数据库中 + // 返回消息发送状态 + backMsg.setMsgType(3); + backMsg.setReceiveUserId(user); + backMsg.setUuid(msgVo.getUuid()); + backMsg.setMsgStatus(0); + sendMessage(backMsg); + } catch (Exception e) { + e.printStackTrace(); + backMsg.setMsgStatus(1); + sendMessage(backMsg); + } + + try { + // 发送消息给目标用户 + msgVo.setMsgType(2); + msgVo.setSendDate(timeMillis); + sendMessage(msgVo); + }catch (Exception e){ + e.printStackTrace(); + } + } @OnClose - public void close(@PathParam("user") String user){ - sessions.remove(user); - log.info(user+"关闭会话"); + public void close(@PathParam("user") String user) { + if (redisUtil.hasKeyHash(WebSocketPool.HASHKEY, user)) { + String key = redisUtil.getHashIndex(WebSocketPool.HASHKEY, user).toString(); + socketPool.close(key); + redisUtil.deleteHash(WebSocketPool.HASHKEY, user); + } + log.info(user + "关闭会话"); } @OnError - public void close(Session session,Throwable error){ + public void close(Session session, Throwable error) { + socketPool.close(session.getId()); log.error(error.getMessage()); } - private void sendMessage(MsgVo msgVo){ - // TODO 保存小心 - if(StringUtils.isNotBlank(msgVo.getReceiveUserId())){ // p2p - sessions.get(msgVo.getReceiveUserId()).getAsyncRemote().sendText(JSON.toJSONString(msgVo)); - }else{ // 群发 - for(Session item : sessions.values()){ - item.getAsyncRemote().sendText(JSON.toJSONString(msgVo)); - } - } + private void sendMessage(MsgVo msgVo) { + redisTemplate.convertAndSend(WebSocketPool.CHANNEL, JSONObject.toJSONString(msgVo)); } } diff --git a/src/main/java/com/aiprose/im/socket/WebSocketPool.java b/src/main/java/com/aiprose/im/socket/WebSocketPool.java new file mode 100644 index 0000000..80e4b51 --- /dev/null +++ b/src/main/java/com/aiprose/im/socket/WebSocketPool.java @@ -0,0 +1,49 @@ +package com.aiprose.im.socket; + +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.websocket.Session; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author yanpeng + * @version 1.0 + * @desc TODO + * @company 北京中经网软件有限公司 + * @date 2022/3/3 14:04 + */ +@Component +public class WebSocketPool { + public static final String HASHKEY = "imonline"; + public static final String CHANNEL = "channel:ceisim"; + + private static Map sessions = null; + + @PostConstruct + public void initMap(){ + if(sessions == null){ + sessions = new ConcurrentHashMap<>(); + } + } + + public void open(Session session){ + if(sessions == null){ + sessions = new ConcurrentHashMap<>(); + } + sessions.put(session.getId(),session); + } + + public void close(String key){ + sessions.remove(key); + } + + public Session get(String key){ + return sessions.get(key); + } + + public Boolean has(String key){ + return sessions.containsKey(key); + } +} diff --git a/src/main/java/com/aiprose/im/utils/RedisUtil.java b/src/main/java/com/aiprose/im/utils/RedisUtil.java new file mode 100644 index 0000000..340f82d --- /dev/null +++ b/src/main/java/com/aiprose/im/utils/RedisUtil.java @@ -0,0 +1,273 @@ +package com.aiprose.im.utils; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.HashOperations; +import org.springframework.data.redis.core.ListOperations; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.ValueOperations; +import org.springframework.data.redis.support.atomic.RedisAtomicLong; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +/** + * @author yanpeng + * @version 1.0 + * @desc TODO + * @company 北京中经网软件有限公司 + * @date 2022/3/3 10:35 + */ +@Component +public class RedisUtil { + + @Autowired + private RedisTemplate redisTemplate; + + @Resource(name="redisTemplate") + private ValueOperations valueOperations; + @Resource(name="redisTemplate") + private HashOperations hashOperations; + @Resource(name="redisTemplate") + private ListOperations listOperations; + + /** + * @param key + * @param value + * @param expireTime + * @Title: set + * @Description: set cache. + */ + public void set(String key, int value, Date expireTime) { + RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory()); + counter.set(value); + counter.expireAt(expireTime); + } + + /** + * @param key + * @param value + * @param timeout + * @param unit + * @Title: set + * @Description: set cache. + */ + public void set(String key, int value, long timeout, TimeUnit unit) { + RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory()); + counter.set(value); + counter.expire(timeout, unit); + } + + /** + * @param key + * @return + * @Title: generate + * @Description: Atomically increments by one the current value. + */ + public long generate(String key) { + RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory()); + return counter.incrementAndGet(); + } + + /** + * @param key + * @return + * @Title: generate + * @Description: Atomically increments by one the current value. + */ + public long generate(String key, Date expireTime) { + RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory()); + counter.expireAt(expireTime); + return counter.incrementAndGet(); + } + + /** + * @param key + * @param increment + * @return + * @Title: generate + * @Description: Atomically adds the given value to the current value. + */ + public long generate(String key, int increment) { + RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory()); + return counter.addAndGet(increment); + } + + /** + * @param key + * @param increment + * @param expireTime + * @return + * @Title: generate + * @Description: Atomically adds the given value to the current value. + */ + public long generate(String key, int increment, Date expireTime) { + RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory()); + counter.expireAt(expireTime); + return counter.addAndGet(increment); + } + + /** + * 保存对象 + * @param key + * @param value + */ + public void setObject(String key,Object value) { + valueOperations.set(key, value); + } + + /** + * 获取对象 + * @param key + * @return + */ + public Object getObect(String key) { + return valueOperations.get(key); + } + + /** + * 删除对象 + * @param key + */ + public void deleteObject(String key) { + redisTemplate.delete(key); + } + + /** + * 插入HaspMap + * @param key + * @param hashKey + * @param value + */ + public void setHash(String key, String hashKey, Object value) { + hashOperations.put(key, hashKey, value); + } + + /** + * 获取知道下标HaspMap + * @param key + * @param hashKey + * @return + */ + public Object getHashIndex(String key, String hashKey) { + return hashOperations.get(key, hashKey); + } + + /** + * 获取HaspMap + * @param key + * @return + */ + public List getHash(String key) { + return hashOperations.values(key); + } + + /** + * 判断是否存在某个key + * @param key + * @param hashKey + * @return + */ + public Boolean hasKeyHash(String key, String hashKey) { + return hashOperations.hasKey(key, hashKey); + } + + /** + * 获取Map + * @param key + * @return + */ + public Map entriesHasp(String key){ + return hashOperations.entries(key); + } + + /** + * 获取Map + * @param key + * @return + */ + public Set hashKeys(String key){ + return hashOperations.keys(key); + } + + /** + * 大小 + * @param key + * @return + */ + public long sizeHash(String key) { + return hashOperations.size(key); + } + + /** + * 删除HaspMap + * @param key + * @param value + */ + public void deleteHash(String key, Object value) { + hashOperations.delete(key, value); + } + + /** + * 数组里面添加元素 + * @param key + * @param value + */ + public void setLeftList(String key,Object value) + { + listOperations.leftPush(key, value); + } + + /** + * 全部添加 + * @param key + * @param list + */ + public void setleftAllList(String key, List list) { + listOperations.leftPushAll(key, list); + } + + /** + * 对指定下标的数组元素进行替换 + * @param key + * @param index + * @param value + */ + public void setList(String key,long index,Object value) { + listOperations.set(key, index, value); + } + + /** + * 数组大小 + * @param key + */ + public long sizeList(String key) { + return listOperations.size(key); + } + + /** + * 获取指定下标元素 + * @param key + * @param index + * @return + */ + public Object getListIndex(String key,long index) { + return listOperations.index(key, index); + } + + /** + * 获取list 指定开始-结束 + * @param key + * @param start 开始 + * @param end 结束 + * @return + */ + public Object getList(String key,long start, long end) { + return listOperations.range(key, start, end); + } + +} \ No newline at end of file diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a443b5e..52f723d 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,2 +1,7 @@ server: port: 8999 +spring: + redis: + database: 5 + host: 192.168.0.120 + port: 6379 \ No newline at end of file diff --git a/src/main/resources/static/lisi.html b/src/main/resources/static/lisi.html index 490d287..fdbe01c 100644 --- a/src/main/resources/static/lisi.html +++ b/src/main/resources/static/lisi.html @@ -5,26 +5,31 @@ Title - - + + + \ No newline at end of file diff --git a/src/main/resources/static/wangwu.html b/src/main/resources/static/wangwu.html index b51fd7b..a765508 100644 --- a/src/main/resources/static/wangwu.html +++ b/src/main/resources/static/wangwu.html @@ -22,7 +22,8 @@ param.sendUserId = 'wangwu' param.receiveUserId = userid param.message = '来自 wangwu 消息' - param.msgType = 1 + param.msgType = 2 + param.uuid = new Date() * 1 ws.send(JSON.stringify(param)) } diff --git a/src/main/resources/static/zhangsan.html b/src/main/resources/static/zhangsan.html index 192e9e9..3e78d99 100644 --- a/src/main/resources/static/zhangsan.html +++ b/src/main/resources/static/zhangsan.html @@ -5,24 +5,28 @@ Title - - + +