示例下载: test-websocket.tar
<!-- Maven dependency on the Spring Boot WebSocket starter (version pinned to 2.7.1) used by the example below. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.7.1</version>
</dependency>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
# Choose the Connection header sent upstream based on the client's Upgrade header:
# "upgrade" when the client sent an Upgrade header, "close" when it is empty.
map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      close;
}

server {
    # Exact-match location for the WebSocket endpoint exposed under the gateway prefix.
    location = /prod-api/robot-work/endpoint {
        proxy_pass         http://192.168.140.1:8080/robot-work/endpoint;

        # The WebSocket handshake requires HTTP/1.1 and forwarding of the Upgrade/Connection headers.
        proxy_http_version 1.1;
        proxy_set_header   Host       $host;
        proxy_set_header   Upgrade    $http_upgrade;
        proxy_set_header   Connection $connection_upgrade;
    }
}
前后端分离的项目使用的是 token，而不是传统的 session 模式，所以需要重写 HttpServletRequest#getUserPrincipal，让 WebSocket 在建立连接（握手）时能够获取到用户凭证。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
@Component @Slf4j public class RequestWrapperFilter implements Filter { @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { EbotServletRequestWrapper _request = new EbotServletRequestWrapper((HttpServletRequest) request); chain.doFilter(_request, response); } } @Slf4j public class MyServletRequestWrapper extends HttpServletRequestWrapper { public MyServletRequestWrapper(HttpServletRequest request) { super(request); } @Override public Principal getUserPrincipal() { HttpServletRequest request = (HttpServletRequest)getRequest(); String token = request.getHeader("token"); if(token != null && !token.isEmpty()) { Principal principal = new User(token); log.info(principal.getName()); return principal; } return null; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
@Configuration @Slf4j @EnableWebSocketMessageBroker public class Config implements WebSocketMessageBrokerConfigurer { @Autowired private ApplicationContext context; @Override public void configureMessageBroker(MessageBrokerRegistry config) { config.enableSimpleBroker("/topic", "/queue") //心跳设置 .setHeartbeatValue(new long[]{0, 20000}) .setTaskScheduler(new DefaultManagedTaskScheduler()); config.setApplicationDestinationPrefixes("/app"); } @Override public void configureWebSocketTransport(WebSocketTransportRegistration registry) { registry.addDecoratorFactory( handler -> { //自定义的装饰器,用于跟踪那些用户建立了websocket session WebSocketHandler decorator = new TrackingUserSessionDecorator(handler); ConfigurableListableBeanFactory beanFactory = ((ConfigurableApplicationContext) context).getBeanFactory(); beanFactory.registerSingleton("trackingUserSessionDecorator", decorator); return decorator; }); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { //网关代理一会被当做跨域处理 registry.addEndpoint("/endpoint") .setAllowedOrigins("*"); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
@Slf4j public class TrackingUserSessionDecorator extends WebSocketHandlerDecorator { /** * 统计有活动session 的用户 */ private Map<String, AtomicInteger> userSessionCountMap = new HashMap<String, AtomicInteger>(); public TrackingUserSessionDecorator(WebSocketHandler delegate) { super(delegate); } @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { if(session.getPrincipal() != null) { String username = session.getPrincipal().getName(); userSessionCountMap.putIfAbsent(username, new AtomicInteger(0)); userSessionCountMap.get(username).incrementAndGet(); } super.afterConnectionEstablished(session); } @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { if(session.getPrincipal() != null) { String username = session.getPrincipal().getName(); int refCount = userSessionCountMap.getOrDefault(username, new AtomicInteger(0)).decrementAndGet(); } super.afterConnectionClosed(session, closeStatus); } } |
websocket.adoc (有可能是github的路径映射改变了,blob路径不可用)
Posted in: spring practise
Comments are closed.