浏览代码

加入redis,支持分布式多节点部署

master
燕鹏 2 年前
父节点
当前提交
264b9b072d
  1. 2
      build.gradle
  2. 23
      src/main/java/com/aiprose/im/MsgVo.java
  3. 73
      src/main/java/com/aiprose/im/config/RedisConfig.java
  4. 58
      src/main/java/com/aiprose/im/config/RedisReceiver.java
  5. 37
      src/main/java/com/aiprose/im/socket/MsgVo.java
  6. 110
      src/main/java/com/aiprose/im/socket/WebSocket.java
  7. 49
      src/main/java/com/aiprose/im/socket/WebSocketPool.java
  8. 273
      src/main/java/com/aiprose/im/utils/RedisUtil.java
  9. 5
      src/main/resources/application.yml
  10. 21
      src/main/resources/static/lisi.html
  11. 3
      src/main/resources/static/wangwu.html
  12. 20
      src/main/resources/static/zhangsan.html

2
build.gradle

@ -22,7 +22,7 @@ repositories {
dependencies {
// implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
// implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'
implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.springframework.boot:spring-boot-starter-websocket'
implementation 'com.alibaba:fastjson:1.2.79'

23
src/main/java/com/aiprose/im/MsgVo.java

@ -1,23 +0,0 @@
package com.aiprose.im;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* @author yanpeng
* @version 1.0
* @desc TODO
* @company 北京中经网软件有限公司
* @date 2022/2/25 15:12
*/
@Data
public class MsgVo implements Serializable {
private String sendUserId;
private String receiveUserId;
private String message;
// 1在线用户列表 2点对点消息
private Integer msgType;
}

73
src/main/java/com/aiprose/im/config/RedisConfig.java

@ -0,0 +1,73 @@
package com.aiprose.im.config;
import com.aiprose.im.socket.WebSocketPool;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
/**
* @author yanpeng
* @version 1.0
* @desc redis序列化配置去掉key前面的前缀
* @company 北京中经网软件有限公司
* @date 2022/3/4 9:52
*/
@Slf4j
@Configuration
public class RedisConfig {
/**
* redis消息监听器容器
* 可以添加多个监听不同话题的redis监听器只需要把消息监听器和相应的消息订阅处理器绑定该消息监听器
* 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
* @param connectionFactory
* @return
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 监听msgToAll
container.addMessageListener(listenerAdapter, new PatternTopic(WebSocketPool.CHANNEL));
log.info("Subscribed Redis channel: " + WebSocketPool.CHANNEL);
return container;
}
@Bean
public MessageListenerAdapter messageListenerAdapter(RedisReceiver redisReceiver){
return new MessageListenerAdapter(redisReceiver,"receiveMessage");
}
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
// 使用Jackson2JsonRedisSerialize 替换默认序列化
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
// 设置value的序列化规则和 key的序列化规则
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setHashKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}

58
src/main/java/com/aiprose/im/config/RedisReceiver.java

@ -0,0 +1,58 @@
package com.aiprose.im.config;
import com.aiprose.im.socket.MsgVo;
import com.aiprose.im.socket.WebSocketPool;
import com.aiprose.im.utils.RedisUtil;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author yanpeng
* @version 1.0
* @desc TODO
* @company 北京中经网软件有限公司
* @date 2022/3/3 13:49
*/
@Component
public class RedisReceiver {
@Autowired
private WebSocketPool socketPool;
@Autowired
private RedisUtil redisUtil;
public void receiveMessage(String message){
MsgVo msgVo = JSON.parseObject(message, MsgVo.class);
try {
// 点对点发送消息
if (StringUtils.isNotBlank(msgVo.getReceiveUserId())) { // p2p
if(redisUtil.hasKeyHash(WebSocketPool.HASHKEY,msgVo.getReceiveUserId())){
String sessionid = redisUtil.getHashIndex(WebSocketPool.HASHKEY, msgVo.getReceiveUserId()).toString();
if(socketPool.has(sessionid)){
socketPool.get(sessionid).getAsyncRemote().sendText(message);
}
}
} else { // 群发
List<Object> sessions = redisUtil.getHash(WebSocketPool.HASHKEY);
for (Object item : sessions) {
//如果某个session不存在,分布式环境下可能session在其他的节点,继续给其他人广播
try {
if(socketPool.has(item.toString())){
socketPool.get(item.toString()).getAsyncRemote().sendText(message);
}
}catch (Exception e1){
e1.printStackTrace();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}

37
src/main/java/com/aiprose/im/socket/MsgVo.java

@ -0,0 +1,37 @@
package com.aiprose.im.socket;
import lombok.Data;
import java.io.Serializable;
/**
* @author yanpeng
* @version 1.0
* @desc TODO
* @company 北京中经网软件有限公司
* @date 2022/2/25 15:12
*/
@Data
public class MsgVo implements Serializable {
/**发送用户ID*/
private String sendUserId;
/**目标用户ID*/
private String receiveUserId;
/**消息内容*/
private String message;
/**
* 1在线用户列表 2点对点聊天消息 3返回消息发送状态
*/
private Integer msgType;
/** 0 成功 1失败 */
private Integer msgStatus = 0;
/** 单条消息标识,返回消息发送状态 */
private String uuid;
/**发送时间*/
private Long sendDate;
private String appId;
}

110
src/main/java/com/aiprose/im/socket/WebSocket.java

@ -1,19 +1,19 @@
package com.aiprose.im.socket;
import com.aiprose.im.MsgVo;
import com.aiprose.im.utils.RedisUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PathVariable;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author yanpeng
@ -27,55 +27,99 @@ import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/ws/imserver/{user}")
public class WebSocket {
// TODO 分布式环境放到redis中
private static Map<String,Session> sessions = new ConcurrentHashMap<>();
private static WebSocketPool socketPool;
private static StringRedisTemplate redisTemplate;
private static RedisUtil redisUtil;
@Autowired
public void setSocketPool(WebSocketPool socketPool) {
WebSocket.socketPool = socketPool;
}
@Autowired
public void setRedisTemplate(StringRedisTemplate redisTemplate) {
WebSocket.redisTemplate = redisTemplate;
}
@Autowired
public void setRedisUtil(RedisUtil redisUtil) {
WebSocket.redisUtil = redisUtil;
}
@OnOpen
public void open(@PathParam("user") String user, Session session){
sessions.put(user,session);
log.info(user+"连接成功");
public void open(@PathParam("user") String user, Session session) {
socketPool.open(session);
redisUtil.setHash(WebSocketPool.HASHKEY, user, session.getId());
log.info(user + "连接成功");
MsgVo msgVo = new MsgVo();
msgVo.setMsgType(1);
Set<String> strings = sessions.keySet();
String onlineUsers = JSON.toJSONString(strings);
msgVo.setMessage(onlineUsers);
Set<String> onlineUsers = redisUtil.hashKeys(WebSocketPool.HASHKEY);
System.out.println(onlineUsers);
String onlineUserStr = JSON.toJSONString(onlineUsers);
msgVo.setMessage(onlineUserStr);
sendMessage(msgVo);
}
@OnMessage
public void message(@PathParam("user") String user, String message){
log.info("收到消息{}",message);
MsgVo msgVo = JSONObject.parseObject(message,MsgVo.class);
if(StringUtils.isBlank(msgVo.getReceiveUserId())){
public void message(@PathParam("user") String user, String message) {
long timeMillis = System.currentTimeMillis();
log.info("收到消息{}", message);
MsgVo msgVo = JSONObject.parseObject(message, MsgVo.class);
if (StringUtils.isBlank(msgVo.getReceiveUserId())) {
return;
}
if(StringUtils.isBlank(msgVo.getMessage())){
if (StringUtils.isBlank(msgVo.getMessage())) {
return;
}
msgVo.setMsgType(2);
sendMessage(msgVo);
}
if (StringUtils.isBlank(msgVo.getUuid())) {
return;
}
MsgVo backMsg = new MsgVo();
backMsg.setSendDate(timeMillis);
try {
// TODO 保存聊天记录到数据库中
// 返回消息发送状态
backMsg.setMsgType(3);
backMsg.setReceiveUserId(user);
backMsg.setUuid(msgVo.getUuid());
backMsg.setMsgStatus(0);
sendMessage(backMsg);
} catch (Exception e) {
e.printStackTrace();
backMsg.setMsgStatus(1);
sendMessage(backMsg);
}
try {
// 发送消息给目标用户
msgVo.setMsgType(2);
msgVo.setSendDate(timeMillis);
sendMessage(msgVo);
}catch (Exception e){
e.printStackTrace();
}
}
@OnClose
public void close(@PathParam("user") String user){
sessions.remove(user);
log.info(user+"关闭会话");
public void close(@PathParam("user") String user) {
if (redisUtil.hasKeyHash(WebSocketPool.HASHKEY, user)) {
String key = redisUtil.getHashIndex(WebSocketPool.HASHKEY, user).toString();
socketPool.close(key);
redisUtil.deleteHash(WebSocketPool.HASHKEY, user);
}
log.info(user + "关闭会话");
}
@OnError
public void close(Session session,Throwable error){
public void close(Session session, Throwable error) {
socketPool.close(session.getId());
log.error(error.getMessage());
}
private void sendMessage(MsgVo msgVo){
// TODO 保存小心
if(StringUtils.isNotBlank(msgVo.getReceiveUserId())){ // p2p
sessions.get(msgVo.getReceiveUserId()).getAsyncRemote().sendText(JSON.toJSONString(msgVo));
}else{ // 群发
for(Session item : sessions.values()){
item.getAsyncRemote().sendText(JSON.toJSONString(msgVo));
}
}
private void sendMessage(MsgVo msgVo) {
redisTemplate.convertAndSend(WebSocketPool.CHANNEL, JSONObject.toJSONString(msgVo));
}
}

49
src/main/java/com/aiprose/im/socket/WebSocketPool.java

@ -0,0 +1,49 @@
package com.aiprose.im.socket;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.Session;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author yanpeng
* @version 1.0
* @desc TODO
* @company 北京中经网软件有限公司
* @date 2022/3/3 14:04
*/
@Component
public class WebSocketPool {
public static final String HASHKEY = "imonline";
public static final String CHANNEL = "channel:ceisim";
private static Map<String, Session> sessions = null;
@PostConstruct
public void initMap(){
if(sessions == null){
sessions = new ConcurrentHashMap<>();
}
}
public void open(Session session){
if(sessions == null){
sessions = new ConcurrentHashMap<>();
}
sessions.put(session.getId(),session);
}
public void close(String key){
sessions.remove(key);
}
public Session get(String key){
return sessions.get(key);
}
public Boolean has(String key){
return sessions.containsKey(key);
}
}

273
src/main/java/com/aiprose/im/utils/RedisUtil.java

@ -0,0 +1,273 @@
package com.aiprose.im.utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.support.atomic.RedisAtomicLong;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* @author yanpeng
* @version 1.0
* @desc TODO
* @company 北京中经网软件有限公司
* @date 2022/3/3 10:35
*/
@Component
public class RedisUtil {
@Autowired
private RedisTemplate redisTemplate;
@Resource(name="redisTemplate")
private ValueOperations<String, Object> valueOperations;
@Resource(name="redisTemplate")
private HashOperations<String, String, Object> hashOperations;
@Resource(name="redisTemplate")
private ListOperations<Object, Object> listOperations;
/**
* @param key
* @param value
* @param expireTime
* @Title: set
* @Description: set cache.
*/
public void set(String key, int value, Date expireTime) {
RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
counter.set(value);
counter.expireAt(expireTime);
}
/**
* @param key
* @param value
* @param timeout
* @param unit
* @Title: set
* @Description: set cache.
*/
public void set(String key, int value, long timeout, TimeUnit unit) {
RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
counter.set(value);
counter.expire(timeout, unit);
}
/**
* @param key
* @return
* @Title: generate
* @Description: Atomically increments by one the current value.
*/
public long generate(String key) {
RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
return counter.incrementAndGet();
}
/**
* @param key
* @return
* @Title: generate
* @Description: Atomically increments by one the current value.
*/
public long generate(String key, Date expireTime) {
RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
counter.expireAt(expireTime);
return counter.incrementAndGet();
}
/**
* @param key
* @param increment
* @return
* @Title: generate
* @Description: Atomically adds the given value to the current value.
*/
public long generate(String key, int increment) {
RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
return counter.addAndGet(increment);
}
/**
* @param key
* @param increment
* @param expireTime
* @return
* @Title: generate
* @Description: Atomically adds the given value to the current value.
*/
public long generate(String key, int increment, Date expireTime) {
RedisAtomicLong counter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
counter.expireAt(expireTime);
return counter.addAndGet(increment);
}
/**
* 保存对象
* @param key
* @param value
*/
public void setObject(String key,Object value) {
valueOperations.set(key, value);
}
/**
* 获取对象
* @param key
* @return
*/
public Object getObect(String key) {
return valueOperations.get(key);
}
/**
* 删除对象
* @param key
*/
public void deleteObject(String key) {
redisTemplate.delete(key);
}
/**
* 插入HaspMap
* @param key
* @param hashKey
* @param value
*/
public void setHash(String key, String hashKey, Object value) {
hashOperations.put(key, hashKey, value);
}
/**
* 获取知道下标HaspMap
* @param key
* @param hashKey
* @return
*/
public Object getHashIndex(String key, String hashKey) {
return hashOperations.get(key, hashKey);
}
/**
* 获取HaspMap
* @param key
* @return
*/
public List<Object> getHash(String key) {
return hashOperations.values(key);
}
/**
* 判断是否存在某个key
* @param key
* @param hashKey
* @return
*/
public Boolean hasKeyHash(String key, String hashKey) {
return hashOperations.hasKey(key, hashKey);
}
/**
* 获取Map
* @param key
* @return
*/
public Map<String, Object> entriesHasp(String key){
return hashOperations.entries(key);
}
/**
* 获取Map
* @param key
* @return
*/
public Set<String> hashKeys(String key){
return hashOperations.keys(key);
}
/**
* 大小
* @param key
* @return
*/
public long sizeHash(String key) {
return hashOperations.size(key);
}
/**
* 删除HaspMap
* @param key
* @param value
*/
public void deleteHash(String key, Object value) {
hashOperations.delete(key, value);
}
/**
* 数组里面添加元素
* @param key
* @param value
*/
public void setLeftList(String key,Object value)
{
listOperations.leftPush(key, value);
}
/**
* 全部添加
* @param key
* @param list
*/
public void setleftAllList(String key, List list) {
listOperations.leftPushAll(key, list);
}
/**
* 对指定下标的数组元素进行替换
* @param key
* @param index
* @param value
*/
public void setList(String key,long index,Object value) {
listOperations.set(key, index, value);
}
/**
* 数组大小
* @param key
*/
public long sizeList(String key) {
return listOperations.size(key);
}
/**
* 获取指定下标元素
* @param key
* @param index
* @return
*/
public Object getListIndex(String key,long index) {
return listOperations.index(key, index);
}
/**
* 获取list 指定开始-结束
* @param key
* @param start 开始
* @param end 结束
* @return
*/
public Object getList(String key,long start, long end) {
return listOperations.range(key, start, end);
}
}

5
src/main/resources/application.yml

@ -1,2 +1,7 @@
server:
port: 8999
spring:
redis:
database: 5
host: 192.168.0.120
port: 6379

21
src/main/resources/static/lisi.html

@ -5,26 +5,31 @@
<title>Title</title>
</head>
<body>
<button onclick="sendHandler('zhangsan')">给zhangsan发送消息</button>
<button onclick="sendHandler('wangwu')">给wangwu发送消息</button>
<button onclick="sendHandler('zhangsan')">给zhangsan发送消息</button>
<button onclick="sendHandler('wangwu')">给wangwu发送消息</button>
<script>
// const token = ""
// var ws = new WebSocket("ws://192.168.0.70:8801/ws/imserver?token=" + token)
var ws = new WebSocket("ws://192.168.0.70:8999/ws/imserver/lisi")
ws.open = function (msg){
console.log("open:"+msg)
ws.open = function (msg) {
console.log("open:" + msg)
}
ws.onmessage = function (msg){
console.log("message:"+msg)
ws.onmessage = function (msg) {
console.log("message:" + msg)
}
function sendHandler(userid){
function sendHandler(userid) {
var param = {}
param.sendUserId = 'lisi'
param.receiveUserId = userid
param.message = '来自 lisi 消息'
param.msgType = 1
param.msgType = 2
param.uuid = new Date() * 1
ws.send(JSON.stringify(param))
}
</script>
</body>
</body>
</html>

3
src/main/resources/static/wangwu.html

@ -22,7 +22,8 @@
param.sendUserId = 'wangwu'
param.receiveUserId = userid
param.message = '来自 wangwu 消息'
param.msgType = 1
param.msgType = 2
param.uuid = new Date() * 1
ws.send(JSON.stringify(param))
}
</script>

20
src/main/resources/static/zhangsan.html

@ -5,24 +5,28 @@
<title>Title</title>
</head>
<body>
<button onclick="sendHandler('lisi')">给lisi发送消息</button>
<button onclick="sendHandler('wangwu')">给wangwu发送消息</button>
<button onclick="sendHandler('lisi')">给lisi发送消息</button>
<button onclick="sendHandler('wangwu')">给wangwu发送消息</button>
<script>
// const token= ""
// var ws = new WebSocket("ws://192.168.0.70:8801/ws/imserver?token="+token)
var ws = new WebSocket("ws://192.168.0.70:8999/ws/imserver/zhangsan")
ws.open = function (msg){
console.log("open:"+msg)
ws.open = function (msg) {
console.log("open:" + msg)
}
ws.onmessage = function (msg){
console.log("message:"+msg)
ws.onmessage = function (msg) {
console.log("message:" + msg)
}
function sendHandler(userid){
function sendHandler(userid) {
var param = {}
param.sendUserId = 'zhangsan'
param.receiveUserId = userid
param.message = '来自 zhangsan 消息'
param.msgType = 1
param.msgType = 2
param.uuid = new Date() * 1
ws.send(JSON.stringify(param))
}
</script>

正在加载...
取消
保存