dubbo的超時機制原理

本篇內(nèi)容主要講解“dubbo的超時機制原理”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“dubbo的超時機制原理”吧!

站在用戶的角度思考問題,與客戶深入溝通,找到許昌網(wǎng)站設(shè)計與許昌網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗,讓設(shè)計與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個性化、用戶體驗好的作品,建站類型包括:網(wǎng)站設(shè)計、做網(wǎng)站、企業(yè)官網(wǎng)、英文網(wǎng)站、手機端網(wǎng)站、網(wǎng)站推廣、主機域名、雅安服務(wù)器托管、企業(yè)郵箱。業(yè)務(wù)覆蓋許昌地區(qū)。

在工作中碰到一個業(yè)務(wù)接口時間比較長,需要修改超時時間,不知道原理,在網(wǎng)上搜索,看到有人說如果你覺得自己了解了dubbo的超時機制,那么問問自己以下問題:

  • 超時是針對消費端還是服務(wù)端?

  • 超時在哪設(shè)置?

  • 超時設(shè)置的優(yōu)先級是什么?

  • 超時的實現(xiàn)原理是什么?

  • 超時解決的是什么問題 ?

如果連這些都回答不上了,那只能說明還沒有完全掌握 dubbo的超時機制。

于是索性就自己本地搭了個環(huán)境,研究了一下源碼。 先來說一說結(jié)論:

  1. 超時是針對消費端的,消費端會拋出TimeoutException 而服務(wù)器端僅僅是一個 warn日志

  2. 超時在消費端、服務(wù)器端設(shè)置,dubbo會合并這兩個設(shè)置

  3. consumer方法級別 > provider 方法級別 > consumer 接口級別 > provider 接口級別 > consumer 全局級別 > provider 全局級別。如果都沒配置,那么就是dubbo默認的1秒

  4. 見下面分析

  5. 最主要是寶貴的線程,客戶端的用戶線程不能因為服務(wù)端超時而一直類似wait, 導(dǎo)致無法正常響應(yīng)其他業(yè)務(wù)。

一、超時時間設(shè)置
全局超時配置
<dubbo:consumer timeout="5000" />
指定接口以及特定方法超時配置
    <dubbo:service interface="me.kimi.samples.dubbo.facade.QuestionFacade" ref="questionFacade" timeout="6000">
        <dubbo:method name="getQuestionById" timeout="7000"/>
    </dubbo:service>

觀察控制臺打印的注冊URL:

consumer://172.16.71.30/me.kimi.samples.dubbo.facade.QuestionFacade?application=demo-consumer&category=providers,configurators,routers&check=false&default.proxy=jdk&default.timeout=5000&dubbo=2.6.2&getQuestionById.timeout=7000&interface=me.kimi.samples.dubbo.facade.QuestionFacade&logger=log4j&methods=getQuestionById&pid=13884&side=consumer&timeout=6000&timestamp=1536630294523

可以看到:

  • default.timeout=5000

  • timeout=6000

  • getQuestionById.timeout=7000

分別對應(yīng)了全局、類級別、方法級別的超時設(shè)置。

省略一部分調(diào)用鏈,最終會來到這里 DubboInvoker,讀取超時時間:

com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        ExchangeClient currentClient;        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }        try {            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);            // 讀取超時時間,這里dubbo已經(jīng)把服務(wù)端的timeout參數(shù)和消費端的timeout參數(shù)合并
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);            if (isOneway) {                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);                // 返回 DefaultFuture
                // get()在沒返回值之前會 阻塞 await
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

看一下參數(shù)獲取的方法:

public int getMethodParameter(String method, String key, int defaultValue) {    // 首先查 getQuestionById.timeout
    String methodKey = method + "." + key;    // 從數(shù)字緩存中先獲取,不需要每次都 parseInt
    Number n = getNumbers().get(methodKey);    if (n != null) {        return n.intValue();
    }    // 沒得話,去取字符串值
    String value = getMethodParameter(method, key);    if (value == null || value.length() == 0) {        // 三個地方都沒配置,返回默認值,默認是1秒
        return defaultValue;
    }    // 放入緩存中
    int i = Integer.parseInt(value);
    getNumbers().put(methodKey, i);    return i;
}
public String getMethodParameter(String method, String key) {    // 首先查 getQuestionById.timeout
    String value = parameters.get(method + "." + key);    if (value == null || value.length() == 0) {        // 沒有設(shè)定方法級別的,去查接口級別或全局的
        return getParameter(key);
    }    return value;
}
public String getParameter(String key) {    // 接口級別去查 timeout 
    String value = parameters.get(key);    if (value == null || value.length() == 0) {        // 沒的話查詢?nèi)旨墑e default.timeout
        value = parameters.get(Constants.DEFAULT_KEY_PREFIX + key);
    }    return value;
}

從代碼中可以看出超時時間的設(shè)置:方法級別 > 接口級別 > 全局級別。

這里要特殊提一點,就是dubbo會合并服務(wù)端客戶端的設(shè)置。

修改客戶端配置, 只留下全局設(shè)置:

<dubbo:consumer timeout="2000" proxy="jdk"/>
    <dubbo:service interface="me.kimi.samples.dubbo.facade.QuestionFacade" ref="questionFacade"/>

服務(wù)端配置如下:

    <dubbo:provider timeout="10000" accepts="500"/>
    <!-- service implementation, as same as regular local bean -->
    <bean id="questionFacade" class="me.kimi.samples.dubbo.provider.service.QuestionFacadeImpl"/>
    <!-- declare the service interface to be exported -->
    <dubbo:service interface="me.kimi.samples.dubbo.facade.QuestionFacade" ref="questionFacade" timeout="9000"/>

最后在客戶端調(diào)用的時候,發(fā)現(xiàn)timeout是9000ms, debug發(fā)現(xiàn)客戶端合并了url, 合并結(jié)果如下:

dubbo://172.16.71.30:20880/me.kimi.samples.dubbo.facade.QuestionFacade?anyhost=true&application=demo-provider&default.accepts=500&default.timeout=10000&dubbo=2.6.2&generic=false&interface=me.kimi.samples.dubbo.facade.QuestionFacade&logger=log4j&methods=getQuestionById&pid=17508&side=provider&timeout=9000&timestamp=1536660132286

查看源碼 com.alibaba.dubbo.registry.integration.RegistryDirectory#mergeUrl:

private URL mergeUrl(URL providerUrl) {
    providerUrl = ClusterUtils.mergeUrl(providerUrl, queryMap); // Merge the consumer side parameters
    List<Configurator> localConfigurators = this.configurators; // local reference
    if (localConfigurators != null && !localConfigurators.isEmpty()) {        for (Configurator configurator : localConfigurators) {
            providerUrl = configurator.configure(providerUrl);
        }
    }
    providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker!
    // 這里就是合并服務(wù)器端的參數(shù),所以除了timeout參數(shù),其他很多參數(shù)也是這樣的
    // 即已客戶端優(yōu)先
    this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); 
    if ((providerUrl.getPath() == null || providerUrl.getPath().length() == 0)
            && "dubbo".equals(providerUrl.getProtocol())) { // Compatible version 1.0
        //fix by tony.chenl DUBBO-44
        String path = directoryUrl.getParameter(Constants.INTERFACE_KEY);        if (path != null) {            int i = path.indexOf('/');            if (i >= 0) {
                path = path.substring(i + 1);
            }
            i = path.lastIndexOf(':');            if (i >= 0) {
                path = path.substring(0, i);
            }
            providerUrl = providerUrl.setPath(path);
        }
    }    return providerUrl;
}

所以綜合,超時時間的優(yōu)先級為:

consumer方法級別 > provider 方法級別 > consumer 接口級別 > provider 接口級別 > consumer 全局級別 > provider 全局級別。

二、超時實現(xiàn)

有了超時時間,那么dubbo是怎么實現(xiàn)超時的呢?

再看上面的DubboInvoker,對于一般的有返回值的調(diào)用,最終調(diào)用:

return (Result) currentClient.request(inv, timeout).get();

先看一下request方法,來到 com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel的Request方法:

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {        if (closed) {            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
       
        DefaultFuture future = new DefaultFuture(channel, req, timeout);        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();            throw e;
        }        return future;
    }

重點是 DefaultFuture:

static {
    Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");
    th.setDaemon(true);
    th.start();
}

類加載的時候會啟動一個超時掃描線程:

public DefaultFuture(Channel channel, Request request, int timeout) {    this.channel = channel;    this.request = request;    this.id = request.getId();    this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);    // 每個 DefaultFuture 都有一個 id, 對應(yīng)當(dāng)前請求id, 然后被放到 靜態(tài)Map中。
    FUTURES.put(id, this);    // id 對應(yīng)的 Channel 也存起來,后續(xù)超時需要處理
    CHANNELS.put(id, channel);
}

再看下get方法:

@Overridepublic Object get() throws RemotingException {    return get(timeout);
}@Overridepublic Object get(int timeout) throws RemotingException {    if (timeout <= 0) {
        timeout = Constants.DEFAULT_TIMEOUT;
    }    if (!isDone()) {        long start = System.currentTimeMillis();
        lock.lock();        try {            while (!isDone()) {                // 這里可以看到在調(diào)用的時候需要等待
                done.await(timeout, TimeUnit.MILLISECONDS);                if (isDone() || System.currentTimeMillis() - start > timeout) {                    break;
                }
            }
        } catch (InterruptedException e) {            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }        
        if (!isDone()) {            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
        }
    }    // 處理返回值
    // 線程掃描超時,正常返回都在這里
    return returnFromResponse();
}

從上面代碼上可以看到,get方法,會使當(dāng)前線程掛起等待。那么什么時候會被恢復(fù)呢,可以想到兩類情況:

  1. 超時

  2. 服務(wù)端正常返回

那么回過頭來看看超時掃描線程,看一下掃描線程做了什么事情:

private static class RemotingInvocationTimeoutScan implements Runnable {        @Override
        public void run() {            while (true) {                try {                    // 就是去掃描DefaultFuture列表
                    for (DefaultFuture future : FUTURES.values()) {                        if (future == null || future.isDone()) {                            continue;
                        }                        // 如果future未完成,且超時
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {                            // 創(chuàng)建一個異常的Response
                            Response timeoutResponse = new Response(future.getId());                            // set timeout status.
                            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
                            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));                            // 處理異常
                            DefaultFuture.received(future.getChannel(), timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                    logger.error("Exception when scan the timeout invocation of remoting.", e);
                }
            }
        }
    }

看下 received方法

public static void received(Channel channel, Response response) {    try {
        DefaultFuture future = FUTURES.remove(response.getId());        if (future != null) {
            future.doReceived(response);
        } else {
            logger.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response
                    + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                    + " -> " + channel.getRemoteAddress()));
        }
    } finally {
        CHANNELS.remove(response.getId());
    }
}
private void doReceived(Response res) {
    lock.lock();    try {        // 設(shè)置響應(yīng)
        // 這樣isDone就是true了
        response = res;        if (done != null) {            // 恢復(fù)掛起的線程 
            done.signal();
        }
    } finally {
        lock.unlock();
    }    if (callback != null) {
        invokeCallback(callback);
    }
}

顯然這里掃描線程把用戶請求線程恢復(fù)了。 恢復(fù)以后,順著剛才的 DefaultFuture 的get方法,來到 returnFromResponse方法:

private Object returnFromResponse() throws RemotingException {
    Response res = response;    if (res == null) {        throw new IllegalStateException("response cannot be null");
    }    // 正常返回,返回 Result 對象
    if (res.getStatus() == Response.OK) {        return res.getResult();
    }    // 超時處理
    if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {        // 重新拋出異常
        throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
    }    throw new RemotingException(channel, res.getErrorMessage());
}

超時掃描線程,構(gòu)建了一個 超時 Response, 在這里拋出 超時異常。

超時拋異常是看見了,那么正常返回是怎么處理的呢,因為 done還 await在那里。 這里暫時不細說dubbo其他組件的原理,只要知道在網(wǎng)絡(luò)事件完成(即服務(wù)器端在規(guī)定時間內(nèi)正常返回)的時候,會有個回調(diào),在整個回調(diào)過程中,最終會回調(diào)到 com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler 的 received 方法,看下代碼:

@Overridepublic void received(Channel channel, Object message) throws RemotingException {
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);    try {        if (message instanceof Request) {            // handle request.
            Request request = (Request) message;            if (request.isEvent()) {
                handlerEvent(channel, request);
            } else {                if (request.isTwoWay()) {
                    Response response = handleRequest(exchangeChannel, request);
                    channel.send(response);
                } else {
                    handler.received(exchangeChannel, request.getData());
                }
            }
        } else if (message instanceof Response) {            // 請求會回調(diào)到這里
            handleResponse(channel, (Response) message);
        } else if (message instanceof String) {            if (isClientSide(channel)) {
                Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                logger.error(e.getMessage(), e);
            } else {
                String echo = handler.telnet(channel, (String) message);                if (echo != null && echo.length() > 0) {
                    channel.send(echo);
                }
            }
        } else {
            handler.received(exchangeChannel, message);
        }
    } finally {
        HeaderExchangeChannel.removeChannelIfDisconnected(channel);
    }
}

處理響應(yīng):

static void handleResponse(Channel channel, Response response) throws RemotingException {    // 不是心跳包,是正常的業(yè)務(wù)返回
    if (response != null && !response.isHeartbeat()) {
        DefaultFuture.received(channel, response);
    }
}

到此,相信大家對“dubbo的超時機制原理”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

名稱欄目:dubbo的超時機制原理
文章路徑:http://bm7419.com/article32/igsepc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供電子商務(wù)、定制網(wǎng)站、搜索引擎優(yōu)化、企業(yè)網(wǎng)站制作、域名注冊、網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

綿陽服務(wù)器托管