Spring Websocket实时统计在线用户数

GitHub源码地址

概述

最近有个需求,需要实时统计在线人数。由于该项目并没有用到实时通信,也只有这一处需要实时统计在线人数,没必要再搭建一套实时通信服务,所以直接整合了Spring Websocket。下面的demo是项目中的简化版,使用Spring Boot搭建环境。效果如图所示:

网上对Websocket的讲解已经有很多了,这里就不再赘述。简单粗暴,直接开干。

代码实现

pom文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
<dependencies>
    <!-- Server-side HTML templates. Declared once; a second identical declaration was removed. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-thymeleaf</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring WebSocket support used by the connection handlers. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.webjars</groupId>
        <artifactId>jquery</artifactId>
        <version>3.1.0</version>
    </dependency>

    <!--
        JSON (de)serialization of the WebSocket messages.
        NOTE(review): fastjson 1.2.x releases older than 1.2.83 have published
        autoType deserialization advisories; this dependency parses client-supplied
        payloads, so consider upgrading. Confirm against the project's security policy.
    -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.56</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Websocket工具类

先写一个工具类,用来处理在线人数的统计以及WebSocketSession的管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/**
* @Author: Xiuming Lee
* @Describe: 统计人数相关工具类
*/
@Slf4j
public class WebSocketCountUtil {
/**
* 静态变量,用来记录当前在线连接数。即返回前台的人数。
*/
private static Long onlineCount = 0L;
/**
* 用来存放普通用户ID。
*/
private static CopyOnWriteArraySet<String> userIdSet = new CopyOnWriteArraySet<>();
/**
* 用来存放普通用户Session和id。
*/
private static CopyOnWriteArraySet<SessionEntity> usersSessionEntitySet = new CopyOnWriteArraySet<>();
/**
* 用来存放管理员Session和id。
*/
private static CopyOnWriteArraySet<SessionEntity> adminSessionEntitySet = new CopyOnWriteArraySet<>();

/**
* 在线人数增加
*/
public static void onlineCountAdd(WebSocketSession session, String userId) {
userIdSet.add(userId);
SessionEntity sessionEntity = new SessionEntity(userId, session);
usersSessionEntitySet.add(sessionEntity);
onlineCountChangeIf();
}

/**
* 在线人数减少
*/
public static void onlineCountReduce(WebSocketSession session) {
usersSessionEntitySet.forEach(sessionEntity -> {
if (sessionEntity.getSession().getId().equals(session.getId())) {
usersSessionEntitySet.remove(sessionEntity);
userIdSet.remove(sessionEntity.getUserId());
onlineCountChangeIf();
}
});
}

/**
* admin用户增加
*/
public static void adminSessionAdd(WebSocketSession session, String adminUserId) {
SessionEntity sessionEntity = new SessionEntity(adminUserId, session);
adminSessionEntitySet.add(sessionEntity);
}

/**
* admin用户减少
*/
public static void adminSessionReduce(WebSocketSession session) {
log.info("admin用户减少");
log.info(adminSessionEntitySet.toString());
adminSessionEntitySet.forEach(sessionEntity -> {
if (sessionEntity.getSession().getId().equals(session.getId())) {
adminSessionEntitySet.remove(sessionEntity);
log.info(adminSessionEntitySet.toString());
}
});
}

/**
* 向admin推送消息
*/
public static void setMessageToAdmin() {
adminSessionEntitySet.forEach(sessionEntity -> {
MessageEntity messageEntity = new MessageEntity("2", String.valueOf(getOnlineCount()));
String messageString = JSONObject.toJSONString(messageEntity);
try {
sessionEntity.getSession().sendMessage(new TextMessage(messageString));
} catch (IOException e) {
e.printStackTrace();
log.error("发送信息失败-->{}", e.getMessage());
}
});

}

public static Long getOnlineCount() {
return onlineCount;
}

/**
* 在线人数是否改变
*/
private static void onlineCountChangeIf() {
Long size = Long.valueOf(userIdSet.size());
if (onlineCount.equals(size)) {
// 在线人数没有变
return;
}
// 在线人数变了
onlineCount = size;
// 向admin发送消息
setMessageToAdmin();
}
}

WebSocketHandler编写

WebSocketHandler是消息和生命周期事件的处理程序。下面编写了两个WebSocketHandler分别处理普通用户的连接和管理员的连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* @Author: Xiuming Lee
* @Describe: 普通用户登录成功后,连接websocket处理的Handler
*/
@Slf4j
public class UserConnectionHandler implements WebSocketHandler {
/**
* 建立连接后触发的回调
*/
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
log.warn("用户连接成功->{}",session);
}

/**
* 收到消息时触发的回调
*/
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
TextMessage textMessage = (TextMessage)message;
log.warn("收到消息->{}",textMessage);
MessageEntity messageEntity = JSONObject.parseObject(textMessage.getPayload(), MessageEntity.class);
WebSocketCountUtil.onlineCountAdd(session,messageEntity.getContent());
}

/**
* 传输消息出错时触发的回调
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
System.out.println("传输消息出错"+"afterConnectionClosed");
System.out.println(session);
}

/**
* 断开连接后触发的回调
*/
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
log.warn("断开连接->{}",session);
WebSocketCountUtil.onlineCountReduce(session);
}

/**
* 是否处理分片消息
*/
@Override
public boolean supportsPartialMessages() {
return false;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* @Author: Xiuming Lee
* @Describe: 管理员连接处理的Handler,管理员查看实时用户情况
*/
@Slf4j
public class AdminConnectionHandler implements WebSocketHandler {
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
log.warn("管理员连接成功->{}",session);
}

@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
TextMessage textMessage = (TextMessage)message;
log.warn("收到消息->{}",textMessage);
System.out.println(session);
MessageEntity messageEntity = JSONObject.parseObject(textMessage.getPayload(), MessageEntity.class);
WebSocketCountUtil.adminSessionAdd(session,messageEntity.getContent());
}

@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
System.out.println("传输消息出错"+"afterConnectionClosed");
System.out.println(session);

}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
log.warn("管理员断开连接->{}",session);
WebSocketCountUtil.adminSessionReduce(session);
}

@Override
public boolean supportsPartialMessages() {
return false;
}
}

WebSocket配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
 * WebSocket configuration: maps the user and administrator handlers to their endpoint
 * paths and tunes the servlet WebSocket container.
 *
 * @author Xiuming Lee
 */
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    /** Endpoint that ordinary users connect to. */
    private static final String USER_COUNT_PATH = "/ws/userCount";
    /** Endpoint that administrators connect to. */
    private static final String ADMIN_PATH = "/ws/admin";
    /**
     * Origin pattern accepted during the handshake.
     * NOTE(review): "*" accepts every origin, including on the admin endpoint —
     * confirm this is intended outside of a demo.
     */
    private static final String ALLOWED_ORIGIN = "*";
    /** Buffer size applied to both text and binary messages. */
    private static final int MESSAGE_BUFFER_SIZE = 8192;
    /** Idle timeout handed to the container — presumably milliseconds; confirm. */
    private static final long SESSION_IDLE_TIMEOUT = 300000L;

    /**
     * Registers one handler per endpoint; each registration gets its own
     * HTTP-session handshake interceptor.
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        UserConnectionHandler userHandler = new UserConnectionHandler();
        registry.addHandler(userHandler, USER_COUNT_PATH)
                .addInterceptors(new HttpSessionHandshakeInterceptor())
                .setAllowedOrigins(ALLOWED_ORIGIN);

        AdminConnectionHandler adminHandler = new AdminConnectionHandler();
        registry.addHandler(adminHandler, ADMIN_PATH)
                .addInterceptors(new HttpSessionHandshakeInterceptor())
                .setAllowedOrigins(ALLOWED_ORIGIN);
    }

    /**
     * Configures WebSocket container properties: the text and binary message buffer
     * sizes and the session idle timeout.
     *
     * @return the factory bean carrying the container settings
     */
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean factoryBean = new ServletServerContainerFactoryBean();
        factoryBean.setMaxSessionIdleTimeout(SESSION_IDLE_TIMEOUT);
        factoryBean.setMaxBinaryMessageBufferSize(MESSAGE_BUFFER_SIZE);
        factoryBean.setMaxTextMessageBufferSize(MESSAGE_BUFFER_SIZE);
        return factoryBean;
    }

}

结语

以上核心代码已贴出,完整代码参考GitHub源码

Xiuming Lee wechat
欢迎您扫一扫上面的微信公众号订阅,更多惊喜等着您哦!
-------------本文结束感谢您的阅读-------------