日本黄色一级经典视频|伊人久久精品视频|亚洲黄色色周成人视频九九九|av免费网址黄色小短片|黄色Av无码亚洲成年人|亚洲1区2区3区无码|真人黄片免费观看|无码一级小说欧美日免费三级|日韩中文字幕91在线看|精品久久久无码中文字幕边打电话

當前位置:首頁 > 單片機 > 架構師社區(qū)
[導讀]-???問題起因??-最近做項目時遇到了需要多用戶之間通信的問題,涉及到了WebSocket握手請求,以及集群中WebSocketSession共享的問題。期間我經(jīng)過了幾天的研究,總結出了幾個實現(xiàn)分布式WebSocket集群的辦法,從zuul到springcloudgateway...



-? ? ?問題起因? ? -

最近做項目時遇到了需要多用戶之間通信的問題,涉及到了WebSocket握手請求,以及集群中WebSocket Session共享的問題。

期間我經(jīng)過了幾天的研究,總結出了幾個實現(xiàn)分布式WebSocket集群的辦法,從zuul到spring cloud gateway的不同嘗試,總結出了這篇文章,希望能幫助到某些人,并且能一起分享這方面的想法與研究。

以下是我的場景描述

  • 資源:4臺服務器。其中只有一臺服務器具備ssl認證域名,一臺redis mysql服務器,兩臺應用服務器(集群)
  • 應用發(fā)布限制條件:由于場景需要,應用場所需要ssl認證的域名才能發(fā)布。因此ssl認證的域名服務器用來當api網(wǎng)關,負責https請求與wss(安全認證的ws)連接。俗稱https卸載,用戶請求https域名服務器(eg:https://oiscircle.com/xxx),但真實訪問到的是http ip地址的形式。只要網(wǎng)關配置高,能handle多個應用
  • 需求:用戶登錄應用,需要與服務器建立wss連接,不同角色之間可以單發(fā)消息,也可以群發(fā)消息
  • 集群中的應用服務類型:每個集群實例都負責http無狀態(tài)請求服務與ws長連接服務



-? ? ?系統(tǒng)架構圖? ? -


在我的實現(xiàn)里,每個應用服務器都負責http and ws請求,其實也可以將ws請求建立的聊天模型單獨成立為一個模塊。從分布式的角度來看,這兩種實現(xiàn)類型差不多,但從實現(xiàn)方便性來說,一個應用服務http ws請求的方式更為方便。下文會有解釋。

本文涉及的技術棧

  • Eureka 服務發(fā)現(xiàn)與注冊
  • Redis Session共享
  • Redis 消息訂閱
  • Spring Boot
  • Zuul 網(wǎng)關
  • Spring Cloud Gateway 網(wǎng)關
  • Spring WebSocket 處理長連接
  • Ribbon 負載均衡
  • Netty 多協(xié)議NIO網(wǎng)絡通信框架
  • Consistent Hash 一致性哈希算法
相信能走到這一步的人都了解過我上面列舉的技術棧了,如果還沒有,可以先去網(wǎng)上找找入門教程了解一下。下面的內(nèi)容都與上述技術相關,題主默認大家都了解過了...


-? ? ?技術可行性分析? ? -

下面我將描述session特性,以及根據(jù)這些特性列舉出n個解決分布式架構中處理ws請求的集群方案

WebSocketSession與HttpSession

在Spring所集成的WebSocket里面,每個ws連接都有一個對應的session:WebSocketSession,在Spring WebSocket中,我們建立ws連接之后可以通過類似這樣的方式進行與客戶端的通信:

protected?void?handleTextMessage(WebSocketSession?session,?TextMessage?message)?{
???System.out.println("服務器接收到的消息:?" ?message?);
???//send?message?to?client
???session.sendMessage(new?TextMessage("message"));
}
那么問題來了:ws的session無法序列化到redis,因此在集群中,我們無法將所有WebSocketSession都緩存到redis進行session共享。每臺服務器都有各自的session。于此相反的是HttpSession,redis可以支持httpsession共享,但是目前沒有websocket session共享的方案,因此走redis websocket session共享這條路是行不通的。

有的人可能會想:我可不可以將sessin關鍵信息緩存到redis,集群中的服務器從redis拿取session關鍵信息然后重新構建websocket session...我只想說這種方法如果有人能試出來,請告訴我一聲...

以上便是websocket session與http session共享的區(qū)別,總的來說就是http session共享已經(jīng)有解決方案了,而且很簡單,只要引入相關依賴:spring-session-data-redisspring-boot-starter-redis,大家可以從網(wǎng)上找個demo玩一下就知道怎么做了。而websocket session共享的方案由于websocket底層實現(xiàn)的方式,我們無法做到真正的websocket session共享。


-? ? ?解決方案的演變??? -

Netty與Spring WebSocket

剛開始的時候,我嘗試著用netty實現(xiàn)了websocket服務端的搭建。在netty里面,并沒有websocket session這樣的概念,與其類似的是channel,每一個客戶端連接都代表一個channel。前端的ws請求通過netty監(jiān)聽的端口,走websocket協(xié)議進行ws握手連接之后,通過一些列的handler(責鏈模式)進行消息處理。與websocket session類似地,服務端在連接建立后有一個channel,我們可以通過channel進行與客戶端的通信。

???/**
????*?TODO?根據(jù)服務器傳進來的id,分配到不同的group
????*/

???private?static?final?ChannelGroup?GROUP?=?new?DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
?
???@Override
???protected?void?channelRead0(ChannelHandlerContext?ctx,?TextWebSocketFrame?msg)?throws?Exception?{
???????//retain增加引用計數(shù),防止接下來的調(diào)用引用失效
???????System.out.println("服務器接收到來自?"? ?ctx.channel().id()? ?"?的消息:?"? ?msg.text());
???????//將消息發(fā)送給group里面的所有channel,也就是發(fā)送消息給客戶端
???????GROUP.writeAndFlush(msg.retain());
???}
那么,服務端用netty還是用spring websocket?以下我將從幾個方面列舉這兩種實現(xiàn)方式的優(yōu)缺點。


-? ? ?使用 netty 實現(xiàn) websocket??? -


玩過netty的人都知道netty是的線程模型是nio模型,并發(fā)量非常高,spring5之前的網(wǎng)絡線程模型是servlet實現(xiàn)的,而servlet不是nio模型,所以在spring5之后,spring的底層網(wǎng)絡實現(xiàn)采用了netty。如果我們單獨使用netty來開發(fā)websocket服務端,速度快是絕對的,但是可能會遇到下列問題:

  1. 與系統(tǒng)的其他應用集成不方便,在rpc調(diào)用的時候,無法享受springcloud里feign服務調(diào)用的便利性
  2. 業(yè)務邏輯可能要重復實現(xiàn)
  3. 使用netty可能需要重復造輪子
  4. 怎么連接上服務注冊中心,也是一件麻煩的事情
  5. restful服務與ws服務需要分開實現(xiàn),如果在netty上實現(xiàn)restful服務,有多麻煩可想而知,用spring一站式restful開發(fā)相信很多人都習慣了。

-? ? ?使用 spring websocket 實現(xiàn) ws 服務? ? -

spring websocket已經(jīng)被springboot很好地集成了,所以在springboot上開發(fā)ws服務非常方便,做法非常簡單

第一步:添加依賴

<dependency>
???<groupId>org.springframework.bootgroupId>
???<artifactId>spring-boot-starter-websocketartifactId>
dependency>
第二步:添加配置類

@Configuration
public?class?WebSocketConfig?implements?WebSocketConfigurer?{
@Override
public?void?registerWebSocketHandlers(WebSocketHandlerRegistry?registry)?{
????registry.addHandler(myHandler(),?"/")
????????.setAllowedOrigins("*");
}
?
@Bean
?public?WebSocketHandler?myHandler()?{
?????return?new?MessageHandler();
?}
}
第三步:實現(xiàn)消息監(jiān)聽類

@Component
@SuppressWarnings("unchecked")
public?class?MessageHandler?extends?TextWebSocketHandler?{
???private?List?clients?=?new?ArrayList<>();
?
???@Override
???public?void?afterConnectionEstablished(WebSocketSession?session)?{
???????clients.add(session);
???????System.out.println("uri?:"? ?session.getUri());
???????System.out.println("連接建立:?"? ?session.getId());
???????System.out.println("current?seesion:?"? ?clients.size());
???}
?
???@Override
???public?void?afterConnectionClosed(WebSocketSession?session,?CloseStatus?status)?{
???????clients.remove(session);
???????System.out.println("斷開連接:?"? ?session.getId());
???}
?
???@Override
???protected?void?handleTextMessage(WebSocketSession?session,?TextMessage?message)?{
???????String?payload?=?message.getPayload();
???????Map?map?=?JSONObject.parseObject(payload,?HashMap.class);
???????System.out.println("接受到的數(shù)據(jù)"? ?map);
???????clients.forEach(s?->?{
???????????try?{
???????????????System.out.println("發(fā)送消息給:?"? ?session.getId());
???????????????s.sendMessage(new?TextMessage("服務器返回收到的信息,"? ?payload));
???????????}?catch?(Exception?e)?{
???????????????e.printStackTrace();
???????????}
???????});
???}
}
從這個demo中,使用spring websocket實現(xiàn)ws服務的便利性大家可想而知了。為了能更好地向spring cloud大家族看齊,我最終采用了spring websocket實現(xiàn)ws服務。

因此我的應用服務架構是這樣子的:一個應用既負責restful服務,也負責ws服務。沒有將ws服務模塊拆分是因為拆分出去要使用feign來進行服務調(diào)用。第一本人比較懶惰,第二拆分與不拆分相差在多了一層服務間的io調(diào)用,所以就沒有這么做了。


-? ? ?從zuul開始技術轉型? ? -

要實現(xiàn)websocket集群,我們必不可免地得從zuul轉型到spring cloud gateway。原因如下:

zuul1.0版本不支持websocket轉發(fā),zuul 2.0開始支持websocket,zuul2.0幾個月前開源了,但是2.0版本沒有被spring boot集成,而且文檔不健全。因此轉型是必須的,同時轉型也很容易實現(xiàn)。

在gateway中,為了實現(xiàn)ssl認證和動態(tài)路由負載均衡,yml文件中以下的某些配置是必須的,在這里提前避免大家采坑

server:
??port:?443
??ssl:
????enabled:?true
????key-store:?classpath:xxx.jks
????key-store-password:?xxxx
????key-store-type:?JKS
????key-alias:?alias
spring:
??application:
????name:?api-gateway
??cloud:
????gateway:
??????httpclient:
????????ssl:
??????????handshake-timeout-millis:?10000
??????????close-notify-flush-timeout-millis:?3000
??????????close-notify-read-timeout-millis:?0
??????????useInsecureTrustManager:?true
??????discovery:
????????locator:
??????????enabled:?true
??????????lower-case-service-id:?true
??????routes:
??????-?id:?dc
????????uri:?lb://dc
????????predicates:
????????-?Path=/dc/**
??????-?id:?wecheck
????????uri:?lb://wecheck
????????predicates:
????????-?Path=/wecheck/**
如果要愉快地玩https卸載,我們還需要配置一個filter,否則請求網(wǎng)關時會出現(xiàn)錯誤not an SSL/TLS record

@Component
public?class?HttpsToHttpFilter?implements?GlobalFilter,?Ordered?{
??private?static?final?int?HTTPS_TO_HTTP_FILTER_ORDER?=?10099;
??@Override
??public?Mono?filter(ServerWebExchange?exchange,?GatewayFilterChain?chain)?{
??????URI?originalUri?=?exchange.getRequest().getURI();
??????ServerHttpRequest?request?=?exchange.getRequest();
??????ServerHttpRequest.Builder?mutate?=?request.mutate();
??????String?forwardedUri?=?request.getURI().toString();
??????if?(forwardedUri?!=?null?
本站聲明: 本文章由作者或相關機構授權發(fā)布,目的在于傳遞更多信息,并不代表本站贊同其觀點,本站亦不保證或承諾內(nèi)容真實性等。需要轉載請聯(lián)系該專欄作者,如若文章內(nèi)容侵犯您的權益,請及時聯(lián)系本站刪除( 郵箱:macysun@21ic.com )。
換一批
延伸閱讀
關閉