WebFlux定點推送以及全推送靈活websocket運(yùn)用是什么

本篇文章為大家展示了WebFlux定點推送以及全推送靈活websocket運(yùn)用是什么,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。

成都創(chuàng)新互聯(lián)是專業(yè)的廣宗網(wǎng)站建設(shè)公司,廣宗接單;提供網(wǎng)站制作、網(wǎng)站設(shè)計,網(wǎng)頁設(shè)計,網(wǎng)站設(shè)計,建網(wǎng)站,PHP網(wǎng)站建設(shè)等專業(yè)做網(wǎng)站服務(wù);采用PHP框架,可快速的進(jìn)行廣宗網(wǎng)站開發(fā)網(wǎng)頁制作和功能擴(kuò)展;專業(yè)做搜索引擎喜愛的網(wǎng)站,專業(yè)的做網(wǎng)站團(tuán)隊,希望更多企業(yè)前來合作!

前言

        WebFlux 本身提供了對 WebSocket 協(xié)議的支持,處理 WebSocket 請求需要對應(yīng)的 handler 實現(xiàn) WebSocketHandler 接口,每一個 WebSocket 都有一個關(guān)聯(lián)的 WebSocketSession,包含了建立請求時的握手信息 HandshakeInfo,以及其它相關(guān)的信息??梢酝ㄟ^ session 的 receive() 方法來接收客戶端的數(shù)據(jù),通過 session 的 send() 方法向客戶端發(fā)送數(shù)據(jù)。

示例

下面是一個簡單的 WebSocketHandler 示例:

@Component
public class EchoHandler implements WebSocketHandler {
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(
                session.receive().map(
                        msg -> session.textMessage("ECHO -> " + msg.getPayloadAsText())));
    }
}

        有了 handler 之后,還需要讓 WebFlux 知道哪些請求需要交給這個 handler 進(jìn)行處理,因此要創(chuàng)建相應(yīng)的 HandlerMapping。

        在處理 HTTP 請求時,我們經(jīng)常使用 WebFlux 中最簡單的 handler 定義方式,即通過注解 @RequestMapping 將某個方法定義為處理特定路徑請求的 handler。 但是這個注解是用于處理 HTTP 請求的,對于 WebSocket 請求而言,收到請求后還需要協(xié)議升級的過程,之后才是 handler 的執(zhí)行,所以我們不能直接通過該注解定義請求映射,不過可以使用 SimpleUrlHandlerMapping 來添加映射。

@Configuration
public class WebSocketConfiguration {
    @Bean
    public HandlerMapping webSocketMapping(EchoHandler echoHandler) {
        final Map<String, WebSocketHandler> map = new HashMap<>(1);
        map.put("/echo", echoHandler);

        final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
        mapping.setUrlMap(map);
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

這樣就能夠?qū)l(fā)往 /echo 的 WebSocket 請求交給 EchoHandler 處理。

我們還要為 WebSocket 類型的 handler 創(chuàng)建對應(yīng)的 WebSocketHandlerAdapter,以便讓 DispatcherHandler 能夠調(diào)用我們的 WebSocketHandler。

完成這三個步驟后,當(dāng)一個 WebSocket 請求到達(dá) WebFlux 時,首先由 DispatcherHandler 進(jìn)行處理,它會根據(jù)已有的 HandlerMapping 找到這個 WebSocket 請求對應(yīng)的 handler,接著發(fā)現(xiàn)該 handler 實現(xiàn)了 WebSocketHandler 接口,于是會通過 WebSocketHandlerAdapter 來完成該 handler 的調(diào)用。

疑惑

        從上面的例子不難看出,沒接收一個請求后,就得在里面里面返回消息,后面就不能再給他發(fā)消息了。其次是我每次新添加或者刪除一個消息的處理類Handler,就得每次去修改配置文件中的SimpleUrlHandlerMapping的UrlMap的內(nèi)容,感覺不是很友好。于是針對這2點進(jìn)行修改和調(diào)整如下:

 1. 用自定義注解注冊 Handler

我們能否像注冊 HTTP 請求的 Handler 那樣,也通過類似 RequestMapping 的注解來注冊 Handler 呢?

雖然官方?jīng)]有相關(guān)實現(xiàn),但我們可以自己實現(xiàn)一個類似的注解,不妨叫作 WebSocketMapping

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface WebSocketMapping {
    String value() default "";
}

@Retention(RetentionPolicy.RUNTIME) 表明該注解工作在運(yùn)行期間,@Target(ElementType.TYPE) 表明該注解作用在類上。

我們先看下該注解最終的使用方式。下面是一個 TimeHandler 的示例,它會每秒鐘會向客戶端發(fā)送一次時間。我們通過注解 @WebSocketMapping("/time") 完成了 TimeHandler 的注冊,告訴 WebFlux 當(dāng)有 WebSocket 請求發(fā)往 /echo 路徑時,就交給 EchoHandler 處理:

@Component
@WebSocketMapping("/echo")
public class EchoHandler implements WebSocketHandler {
    @Override
    public Mono<Void> handle(final WebSocketSession session) {
        return session.send(
                session.receive()
                        .map(msg -> session.textMessage(
                                "服務(wù)端返回:小明, -> " + msg.getPayloadAsText())));
    }
}

是不是和 RequestMapping 一樣方便?

到目前為止,這個注解還沒有實際的功能,還不能自動注冊 handler。回顧我們上面注冊路由的方式,我們創(chuàng)建了一個 SimpleUrlHandlerMapping,并手動添加了 EchoHandler 的映射規(guī)則,然后將其作為 HandlerMapping 的 Bean 返回。

現(xiàn)在我們要創(chuàng)建一個專門的 HandlerMapping 類來處理 WebSocketMapping 注解,自動完成 handler 的注冊:

public class WebSocketMappingHandlerMapping extends SimpleUrlHandlerMapping{
	
	private Map<String, WebSocketHandler> handlerMap = new LinkedHashMap<>();
	/**
     * Register WebSocket handlers annotated by @WebSocketMapping
     * @throws BeansException
     */
    @Override
    public void initApplicationContext() throws BeansException {
        Map<String, Object> beanMap = obtainApplicationContext()
                .getBeansWithAnnotation(WebSocketMapping.class);
        beanMap.values().forEach(bean -> {
            if (!(bean instanceof WebSocketHandler)) {
                throw new RuntimeException(
                        String.format("Controller [%s] doesn't implement WebSocketHandler interface.",
                                bean.getClass().getName()));
            }
            WebSocketMapping annotation = AnnotationUtils.getAnnotation(
                    bean.getClass(), WebSocketMapping.class);
            //webSocketMapping 映射到管理中
            handlerMap.put(Objects.requireNonNull(annotation).value(),(WebSocketHandler) bean);
        });
        super.setOrder(Ordered.HIGHEST_PRECEDENCE);
        super.setUrlMap(handlerMap);
        super.initApplicationContext();
    }
}

我們的 WebSocketMappingHandlerMapping 類,實際上就是 SimpleUrlHandlerMapping,只不過增加了一些初始化的操作。

initApplicationContext() 方法是 Spring 中 ApplicationObjectSupport 類的方法,用于自定義類的初始化行為,在我們的 WebSocketMappingHandlerMapping 中,初始化工作主要是收集使用了 @WebSocketMapping 注解并且實現(xiàn)來 WebSocketHandler 接口的 Component,然后將它們注冊到內(nèi)部的 SimpleUrlHandlerMapping 中。之后的路由工作都是由父類 SimpleUrlHandlerMapping 已實現(xiàn)的功能來完成。

現(xiàn)在,我們只需要返回 WebSocketMappingHandlerMapping 的 Bean,就能自動處理 @WebSocketMapping 注解了:

@Configuration
public class WebSocketConfiguration {

	@Bean
	public HandlerMapping webSocketMapping() {
		return new WebSocketMappingHandlerMapping();
	}

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}

2. WebSocket 請求處理過程剖析

我們來看下基于 Reactor Netty 的 WebFlux 具體是如何處理 WebSocket 請求的。

前面說過,WebSocket 請求進(jìn)入 WebFlux 后,首先會從 HandlerMapping 中找到對應(yīng)的 WebSocketHandler,再由 WebSocketHandlerAdapter 進(jìn)行實際的調(diào)用。這就不再多做闡述,有興趣的朋友可以去看看WebSocketHandler,WebSocketHandlerAdapter。

3. 分離數(shù)據(jù)的接收與發(fā)送操作

我們知道 HTTP 協(xié)議是半雙工通信,雖然客戶端和服務(wù)器都能給對方發(fā)數(shù)據(jù),但是同一時間內(nèi)只會由一方向另一方發(fā)送數(shù)據(jù),并且在順序上是客戶端先發(fā)送請求,然后才由服務(wù)器返回響應(yīng)數(shù)據(jù)。所以服務(wù)器處理 HTTP 的邏輯很簡單,就是每接收到一個客戶端請求,就返回一個響應(yīng)。

而 WebSocket 是全雙工通信,客戶端和服務(wù)器可以隨時向另一方發(fā)送數(shù)據(jù),所以不再是"發(fā)送請求、返回響應(yīng)"的通信方式了。我們上面的 EchoHandler 示例用的仍舊是這一方式,即收到數(shù)據(jù)后再針對性地返回一條數(shù)據(jù),我們下面就來看看如何充分利用 WebSocket 的雙向通信。

WebSocket 的處理,主要是通過 session 完成對兩個數(shù)據(jù)流的操作,一個是客戶端發(fā)給服務(wù)器的數(shù)據(jù)流,一個是服務(wù)器發(fā)給客戶端的數(shù)據(jù)流:

WebSocketSession 方法描述
Flux<WebSocketMessage> receive()接收來自客戶端的數(shù)據(jù)流,當(dāng)連接關(guān)閉時數(shù)據(jù)流結(jié)束。
Mono<Void> send(Publisher<WebSocketMessage>)向客戶端發(fā)送數(shù)據(jù)流,當(dāng)數(shù)據(jù)流結(jié)束時,往客戶端的寫操作也會隨之結(jié)束,此時返回的 Mono<Void> 會發(fā)出一個完成信號。

在 WebSocketHandler 中,最后應(yīng)該將兩個數(shù)據(jù)流的處理結(jié)果整合成一個信號流,并返回一個 Mono<Void> 用于表明處理是否結(jié)束。

我們分別為兩個流定義處理的邏輯:

  • 對于輸出流:服務(wù)器每秒向客戶端發(fā)送一個數(shù)字;

  • 對于輸入流:每當(dāng)收到客戶端消息時,就打印到標(biāo)準(zhǔn)輸出

Mono<Void> input = session.receive()
                   .map(WebSocketMessage::getPayloadAsText)
                   .map(msg -> id + ": " + msg)
				   .doOnNext(System.out::println).then();

Mono<Void> output = session.send(Flux.create(sink -> 
                    senderMap.put(id, new WebSocketSender(session, sink))));

 這兩個處理邏輯互相獨立,它們之間沒有先后關(guān)系,操作執(zhí)行完之后都是返回一個 Mono<Void>,但是如何將這兩個操作的結(jié)果整合成一個信號流返回給 WebFlux 呢?我們可以使用 WebFlux 中的 Mono.zip() 方法:

@Component
@WebSocketMapping("/echo")
public class EchoHandler implements WebSocketHandler {

	@Autowired
	private ConcurrentHashMap<String, WebSocketSender> senderMap;

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()
                .map(WebSocketMessage::getPayloadAsText).map(msg -> id + ": " + msg)
				.doOnNext(System.out::println).then();

		Mono<Void> output = session.send(Flux.create(sink -> 
                senderMap.put(id, new WebSocketSender(session, sink))));
		/**
		 * Mono.zip() 會將多個 Mono 合并為一個新的 Mono,
         * 任何一個 Mono 產(chǎn)生 error 或 complete 都會導(dǎo)致合并后的 Mono
		 * 也隨之產(chǎn)生 error 或 complete,此時其它的 Mono 則會被執(zhí)行取消操作。
		 */
		return Mono.zip(input, output).then();
	}
}

4. 從 Handler 外部發(fā)送數(shù)據(jù)

這里所說的從外部發(fā)送數(shù)據(jù),指的是需要在 WebSocketHandler 的代碼范圍之外,在其它地方通過代碼調(diào)用的方式向 WebSocket 連接發(fā)送數(shù)據(jù)。

思路:在定義 session 的 send() 操作時,通過編程的方式創(chuàng)建 Flux,即使用 Flux.create() 方法創(chuàng)建,將發(fā)布 Flux 數(shù)據(jù)的 FluxSink 暴露出來,并進(jìn)行保存,然后在需要發(fā)送數(shù)據(jù)的地方,調(diào)用 FluxSink<T> 的 next(T data) 方法,向 Flux 的訂閱者發(fā)布數(shù)據(jù)。

create 方法是以編程方式創(chuàng)建 Flux 的高級形式,它允許每次產(chǎn)生多個數(shù)據(jù),并且可以由多個線程產(chǎn)生。

create 方法將內(nèi)部的 FluxSink 暴露出來,F(xiàn)luxSink 提供了 next、error、complete 方法。通過 create 方法,可以將響應(yīng)式堆棧中的 API 與其它 API 進(jìn)行連接。

考慮這么一個場景:服務(wù)器與客戶端 A 建立 WebSocket 連接后,允許客戶端 B 通過 HTTP 向客戶端 A 發(fā)送數(shù)據(jù)。

不考慮安全性、魯棒性等問題,我們給出一個簡單的示例。

首先是 WebSocketHandler 的實現(xiàn),客戶端發(fā)送 WebSocket 建立請求時,需要在 query 參數(shù)中為當(dāng)前連接指定一個 id,服務(wù)器會以該 id 為鍵,以對應(yīng)的 WebSocketSender 為值存放到 senderMap 中:

@Component
@WebSocketMapping("/echo")
public class EchoHandler implements WebSocketHandler {

	@Autowired
	private ConcurrentHashMap<String, WebSocketSender> senderMap;

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// TODO Auto-generated method stub
		HandshakeInfo handshakeInfo = session.getHandshakeInfo();
		Map<String, String> queryMap = getQueryMap(handshakeInfo.getUri().getQuery());
		String id = queryMap.getOrDefault("id", "defaultId");
		Mono<Void> input = session.receive().map(WebSocketMessage::getPayloadAsText).map(msg -> id + ": " + msg)
				.doOnNext(System.out::println).then();

		Mono<Void> output = session.send(Flux.create(sink -> senderMap.put(id, new WebSocketSender(session, sink))));
		/**
		 * Mono.zip() 會將多個 Mono 合并為一個新的 Mono,任何一個 Mono 產(chǎn)生 error 或 complete 都會導(dǎo)致合并后的 Mono
		 * 也隨之產(chǎn)生 error 或 complete,此時其它的 Mono 則會被執(zhí)行取消操作。
		 */
		return Mono.zip(input, output).then();
	}

	//用于獲取url參數(shù)
	 private Map<String, String> getQueryMap(String queryStr) {
        Map<String, String> queryMap = new HashMap<>();
        if (!StringUtils.isEmpty(queryStr)) {
            String[] queryParam = queryStr.split("&");
            Arrays.stream(queryParam).forEach(s -> {
                String[] kv = s.split("=", 2);
                String value = kv.length == 2 ? kv[1] : "";
                queryMap.put(kv[0], value);
            });
        }
        return queryMap;
    }
}

其中,senderMap 是我們自己定義的 Bean,在配置文件中定義:

@Configuration
public class WebSocketConfiguration {

	@Bean
	public HandlerMapping webSocketMapping() {
		return new WebSocketMappingHandlerMapping();
	}

	@Bean
	public ConcurrentHashMap<String, WebSocketSender> senderMap() {
		return new ConcurrentHashMap<String, WebSocketSender>();
	}

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}

WebSocketSender 是我們自己創(chuàng)建的類,目的是保存 WebSocket 連接的 session 以及對應(yīng)的 FluxSink,以便在 WebSocketHandler 代碼范圍外發(fā)送數(shù)據(jù):

public class WebSocketSender {
	private WebSocketSession session;
    private FluxSink<WebSocketMessage> sink;

    public WebSocketSender(WebSocketSession session, FluxSink<WebSocketMessage> sink) {
        this.session = session;
        this.sink = sink;
    }

    public void sendData(String data) {
        sink.next(session.textMessage(data));
    }
}

接著我們來實現(xiàn) HTTP Controller,用戶在發(fā)起 HTTP 請求時,通過 query 參數(shù)指定要通信的 WebSocket 連接 id,以及要發(fā)送的數(shù)據(jù),然后從 senderMap 中取出對應(yīng)的 WebSocketSender,調(diào)用其 send() 方法向客戶端發(fā)送數(shù)據(jù):

@RestController
@RequestMapping("/msg")
public class MsgController {

	@Autowired
	private ConcurrentHashMap<String, WebSocketSender> senderMap;

	@RequestMapping("/send")
	public String sendMessage(@RequestParam String id, @RequestParam String data) {
		WebSocketSender sender = senderMap.get(id);
		if (sender != null) {
			sender.sendData(data);
			return String.format("Message '%s' sent to connection: %s.", data, id);
		} else {
			return String.format("Connection of id '%s' doesn't exist", id);
		}
	}
}

5. 測試

我這就不再寫頁面了,直接就用https://www.websocket.org/echo.html進(jìn)行測試了,結(jié)果如下:

WebFlux定點推送以及全推送靈活websocket運(yùn)用是什么

這樣就算完成了定點推送了,全推送,和部分推送就不再寫了,只要從ConcurrentHashMap中取出來去發(fā)送就是了。

上述內(nèi)容就是WebFlux定點推送以及全推送靈活websocket運(yùn)用是什么,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

分享題目:WebFlux定點推送以及全推送靈活websocket運(yùn)用是什么
網(wǎng)址分享:http://bm7419.com/article16/pcejdg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信公眾號、企業(yè)網(wǎng)站制作、品牌網(wǎng)站制作、Google企業(yè)建站、標(biāo)簽優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都app開發(fā)公司