C++簡單實現(xiàn)RPC網(wǎng)絡通訊-創(chuàng)新互聯(lián)

? RPC是遠程調(diào)用系統(tǒng)簡稱,它允許程序調(diào)用運行在另一臺計算機上的過程,就像調(diào)用本地的過程一樣。RPC 實現(xiàn)了網(wǎng)絡編程的“過程調(diào)用”模型,讓程序員可以像調(diào)用本地函數(shù)一樣調(diào)用遠程函數(shù)。最近在做的也是遠程調(diào)用過程,所以通過重新梳理RPC來整理總結一下。

創(chuàng)新互聯(lián)公司的客戶來自各行各業(yè),為了共同目標,我們在工作上密切配合,從創(chuàng)業(yè)型小企業(yè)到企事業(yè)單位,感謝他們對我們的要求,感謝他們從不同領域給我們帶來的挑戰(zhàn),讓我們激情的團隊有機會用頭腦與智慧不斷的給客戶帶來驚喜。專業(yè)領域包括成都做網(wǎng)站、網(wǎng)站建設、外貿(mào)營銷網(wǎng)站建設、電商網(wǎng)站開發(fā)、微信營銷、系統(tǒng)平臺開發(fā)。

? 項目來源:

GitHub - qicosmos/rest_rpc: modern C++(C++11), simple, easy to use rpc framework

目錄

一、RPC簡介

1.1 簡介

1.2 本地調(diào)用和遠程調(diào)用的區(qū)別

1.3 RPC運行的流程

1.4 小結

二、RPC簡單實現(xiàn)

2.1 客戶端實現(xiàn)代碼

2.2 服務端代碼

三、加強版RPC(以“RPC簡單實現(xiàn)”為基礎)

3.1 加入錯誤處理

3.2 加入網(wǎng)絡連接(socket)

3.3 加強并發(fā)性

3.4 加入容錯機制(修改客戶端部分)

3.5 負載均衡

四、總結


一、RPC簡介 1.1 簡介

? RPC指的是計算機A的進程調(diào)用另外一臺計算機B的進程,A上的進程被掛起,B上被調(diào)用的進程開始執(zhí)行,當B執(zhí)行完畢后將執(zhí)行結果返回給A,A的進程繼續(xù)執(zhí)行。調(diào)用方可以通過使用參數(shù)將信息傳送給被調(diào)用方,然后通過傳回的結果得到信息。這些傳遞的信息都是被加密過或者其他方式處理。這個過程對開發(fā)人員是透明的,因此RPC可以看作是本地過程調(diào)用的一種擴展,使被調(diào)用過程不必與調(diào)用過程位于同一物理機中。

? RPC可以用于構建基于B/S模式的分布式應用程序:請求服務是一個客戶端、而服務提供程序是一臺服務器。和常規(guī)和本地的調(diào)用過程一樣,遠程過程調(diào)用是同步操作,在結果返回之前,需要暫時中止請求程序。

RPC的優(yōu)點:

  1. 支持面向過程和面向線程的模型;
  2. 內(nèi)部消息傳遞機制對用戶隱藏;
  3. 基于 RPC 模式的開發(fā)可以減少代碼重寫;
  4. 可以在本地環(huán)境和分布式環(huán)境中運行;
1.2 本地調(diào)用和遠程調(diào)用的區(qū)別

? 以ARM環(huán)境為例,我們拆解本地調(diào)用的過程,以下面代碼為例:

int selfIncrement(int a)
{
    return a + 1;
}

int a = 10;
selfIncrement(a);

? 當執(zhí)行到selfIncrement(a)時,首先把a存入寄存器R0,之后轉到函數(shù)地址selfIncrement,執(zhí)行函數(shù)內(nèi)的指令?ADD R0,#1。跳轉到函數(shù)的地址偏移量在編譯時確定。

? 但是如果這是一個遠程調(diào)用,selfIncrement函數(shù)存在于其他機器,為了實現(xiàn)遠程調(diào)用,請求方和服務方需要提供需要解決以下問題:

1. 網(wǎng)絡傳輸。

?????本地調(diào)用的參數(shù)存放在寄存器或棧中,在同一塊內(nèi)存中,可以直接訪問到。遠程過程調(diào)用需要借助網(wǎng)絡來傳遞參數(shù)和需要調(diào)用的函數(shù) ID。

? 2. 編解碼

?????請求方需要將參數(shù)轉化為字節(jié)流,服務提供方需要將字節(jié)流轉化為參數(shù)。

? 3. 函數(shù)映射表

?????服務提供方的函數(shù)需要有唯一的 ID 標識,請求方通過 ID 標識告知服務提供方需要調(diào)用哪個函數(shù)。

以上三個功能即為 RPC 的基本框架所必須包含的功能。

1.3 RPC運行的流程

一次 RPC 調(diào)用的運行流程大致分為如下七步,具體如下圖所示。

  1. 客戶端調(diào)用客戶端存根程序,將參數(shù)傳入;
  2. 客戶端存根程序將參數(shù)轉化為標準格式,并編組進消息;
  3. 客戶端存根程序將消息發(fā)送到傳輸層,傳輸層將消息傳送至遠程服務器;
  4. 服務器的傳輸層將消息傳遞到服務器存根程序,存根程序對闡述進行解包,并使用本地調(diào)用的機制調(diào)用所需的函數(shù);
  5. 運算完成之后,將結果返回給服務器存根,存根將結果編組為消息,之后發(fā)送給傳輸層;
  6. 服務器傳輸層將結果消息發(fā)送給客戶端傳輸層;
  7. 客戶端存根對返回消息解包,并返回給調(diào)用方。

服務端存根和客戶端存根可以看做是被封裝起來的細節(jié),這些細節(jié)對于開發(fā)人員來說是透明的,但是在客戶端層面看到的是 “本地” 調(diào)用了?selfIncrement()方法,在服務端層面,則需要封裝、網(wǎng)絡傳輸、解封裝等等操作。因此 RPC 可以看作是傳統(tǒng)本地過程調(diào)用的一種擴展,其使得被調(diào)用過程不必與調(diào)用過程位于同一物理機中。

1.4 小結

RPC 的目標是做到在遠程機器上調(diào)用函數(shù)與本地調(diào)用函數(shù)一樣的體驗。 為了達到這個目的,需要實現(xiàn)網(wǎng)絡傳輸、序列化與反序列化、函數(shù)映射表等功能,其中網(wǎng)絡傳輸可以使用socket或其他,序列化和反序列化可以使用protobuf,函數(shù)映射表可以使用std::function。

? lambda與std::function內(nèi)容可以看:

C++11 匿名函數(shù)lambda的使用_Thomas_Lbw的博客-博客

C++11 std::function 基礎用法_Thomas_Lbw的博客-博客

lambda表達式和?std::function的功能是類似的,lambda表達式可以轉換為?std::function,一般情況下,更多使用?lambda表達式,只有在需要回調(diào)函數(shù)的情況下才會使用?std::function。

二、RPC簡單實現(xiàn) 2.1 客戶端實現(xiàn)代碼
#include#include#include#include#includeclass RPCClient
{
public:
    using RPCCallback = std::function;
    RPCClient(const std::string& server_address) : server_address_(server_address) {}
    ~RPCClient() {}

    void Call(const std::string& method, const std::string& request, RPCCallback callback)
    {
        // 序列化請求數(shù)據(jù)
        std::string data = Serialize(method, request);
        // 發(fā)送請求
        SendRequest(data);
        // 開啟線程接收響應
        std::thread t([this, callback]() {
            std::string response = RecvResponse();
            // 反序列化響應數(shù)據(jù)
            std::string result = Deserialize(response);
            callback(result);
        });
        t.detach();
    }

private:
    std::string Serialize(const std::string& method, const std::string& request)
    {
        // 省略序列化實現(xiàn)
    }

    void SendRequest(const std::string& data)
    {
        // 省略網(wǎng)絡發(fā)送實現(xiàn)
    }

    std::string RecvResponse()
    {
        // 省略網(wǎng)絡接收實現(xiàn)
    }

    std::string Deserialize(const std::string& response)
    {
        // 省略反序列化實現(xiàn)
    }

private:
    std::string server_address_;
};

int main()
{
    std::shared_ptrclient(new RPCClient("127.0.0.1:8000"));
    client->Call("Add", "1,2", [](const std::string& result) {
        std::cout<< "Result: "<< result<< std::endl;
    });
    return 0;
}

? 這段代碼定義了RPCClient類來處理客戶端的請求任務,用到了lambda和std::function來處理函數(shù)調(diào)用,在Call中使用多線程技術。main中使用智能指針管理Rpcclient類,并調(diào)用了客戶端的Add函數(shù)。?

? 127.0.0.1為本地地址,對開發(fā)來說需要使用本地地址自測,端口號為8000,需要選擇一個空閑端口來通信。

2.2 服務端代碼

? 下面是服務端的實現(xiàn)

#include#include#include#include#include#include// 使用第三方庫實現(xiàn)序列化和反序列化
#include#includeusing namespace std;

// 定義RPC函數(shù)類型
using RPCCallback = std::function;

class RPCHandler {
public:
    void registerCallback(const std::string& name, RPCCallback callback) {
        std::unique_locklock(mtx_);
        callbacks_[name] = callback;
    }

    std::string handleRequest(const std::string& request) {
        // 反序列化請求
        std::maprequestMap;
        std::istringstream is(request);
        boost::archive::text_iarchive ia(is);
        ia >>requestMap;

        // 查找并調(diào)用對應的回調(diào)函數(shù)
        std::string name = requestMap["name"];
        std::string args = requestMap["args"];
        std::unique_locklock(mtx_);
        auto it = callbacks_.find(name);
        if (it == callbacks_.end()) {
            return "Error: Unknown function";
        }
        RPCCallback callback = it->second;
        return callback(args);
    }

private:
    std::mapcallbacks_;
    std::mutex mtx_;
};

int main() {
    RPCHandler rpcHandler;

    // 注冊回調(diào)函數(shù)
    rpcHandler.registerCallback("add", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >>a >>b;
        int result = a + b;
        std::ostringstream os;
        os<< result;
        return os.str();
    });

    rpcHandler.registerCallback("sub", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >>a >>b;
        int result = a - b;
        std::ostringstream os;
        os<< result;
        return os.str
    });

    // 創(chuàng)建處理請求的線程
    std::thread requestThread([&]() {
        while (true) {
            std::string request;
            std::cin >>request;
            std::string response = rpcHandler.handleRequest(request);
            std::cout<< response<< std::endl;
        }
    });

    requestThread.join();
    return 0;
}

上面的代碼實現(xiàn)了一個簡單的C++ RPC服務端。主要實現(xiàn)了以下功能:

  1. 定義了RPC函數(shù)類型 RPCCallback,使用std::function表示。
  2. RPCHandler類實現(xiàn)了注冊函數(shù)和處理請求的功能。
  3. 在main函數(shù)中創(chuàng)建了一個RPCHandler對象,并注冊了兩個函數(shù)"add" 和 "sub"。這些函數(shù)通過lambda表達式實現(xiàn),并在被調(diào)用時通過std::istringstream讀取參數(shù)并返回結果。
  4. 創(chuàng)建了一個新線程requestThread來處理請求。在這個線程中,通過std::cin讀取請求,然后調(diào)用RPCHandler的handleRequest函數(shù)并使用std::cout輸出響應。

注意,這套代碼是最簡單的RPC機制,只能調(diào)用本地的資源,他還存在以下缺點:

  1. 代碼并沒有處理錯誤處理,如果請求格式不正確或函數(shù)不存在,服務端將會返回“Error: Unknown function”。
  2. 沒有使用網(wǎng)絡庫進行通信,所以只能在本機上使用。
  3. 沒有提供高效的并發(fā)性能,所有請求都在單獨的線程中處理。
  4. 沒有考慮RPC服務的可用性和高可用性,如果服務端崩潰或不可用,客戶端將無法繼續(xù)使用服務。
  5. 沒有考慮RPC服務的可擴展性,如果有大量請求需要處理,可能會導致性能問題。
  6. 使用了第三方庫Boost.Serialization來實現(xiàn)序列化和反序列化,如果不想使用第三方庫,可能需要自己實現(xiàn)序列化的功能。

下面我們一步一步完善它。

三、加強版RPC(以“RPC簡單實現(xiàn)”為基礎) 3.1 加入錯誤處理

下面是 RPCHandler 類中加入錯誤處理的代碼示例:

class RPCHandler {
public:
    // 其他代碼...

    std::string handleRequest(const std::string& request) {
        // 反序列化請求
        std::maprequestMap;
        std::istringstream is(request);
        boost::archive::text_iarchive ia(is);
        ia >>requestMap;

        // 查找并調(diào)用對應的回調(diào)函數(shù)
        std::string name = requestMap["name"];
        std::string args = requestMap["args"];
        std::unique_locklock(mtx_);
        auto it = callbacks_.find(name);
        if (it == callbacks_.end()) {
            return "Error: Unknown function";
        }
        RPCCallback callback = it->second;
        try {
            return callback(args);
        } catch (const std::exception& e) {
            return "Error: Exception occurred: " + std::string(e.what());
        } catch (...) {
            return "Error: Unknown exception occurred";
        }
    }
};

上面的代碼在 RPCHandler 類的 handleRequest 函數(shù)中加入了錯誤處理的代碼,它使用了 try-catch 語句來捕獲可能發(fā)生的異常。如果找不到對應的函數(shù)或發(fā)生了異常,會返回錯誤信息。這樣,如果請求格式不正確或函數(shù)不存在,服務端將會返回相應的錯誤信息。

3.2 加入網(wǎng)絡連接(socket)

? 加入網(wǎng)絡連接不需要動服務端的實現(xiàn),只需要在main里創(chuàng)造套接字去鏈接就好:

int main() 
{
    io_context ioc;
    ip::tcp::acceptor acceptor(ioc, ip::tcp::endpoint(ip::tcp::v4(), 8080));
    RPCHandler rpcHandler;
    // 注冊函數(shù)
    rpcHandler.registerCallback("add", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >>a >>b;
        int result = a + b;
        std::ostringstream os;
        os<< result;
        return os.str();
    });

    rpcHandler.registerCallback("sub", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >>a >>b;
        int result = a - b;
        std::ostringstream os;
        os<< result;
        return os.str();
    });

    // 等待連接
    while (true) {
        ip::tcp::socket socket(ioc);
        acceptor.accept(socket);

        // 創(chuàng)建線程處理請求
        std::thread requestThread([&](ip::tcp::socket socket) {
            while (true) {
                // 讀取請求
                boost::asio::streambuf buf;
                read_until(socket, buf, '\n');
                std::string request = boost::asio::buffer_cast(buf.data());
                request.pop_back();

                // 處理請求
                std::string response = rpcHandler.handleRequest(request);

                // 發(fā)送響應
                write(socket, buffer(response + '\n'));
            }
        }, std::move(socket));

        requestThread.detach();
    }

    return 0;
}

這是一個使用Boost.Asio庫實現(xiàn)的RPC服務端代碼示例。它使用了TCP協(xié)議監(jiān)聽8080端口,等待客戶端的連接。當有客戶端連接時,創(chuàng)建一個新線程來處理請求。請求和響應通過網(wǎng)絡傳輸。

3.3 加強并發(fā)性

?使用并發(fā)和異步機制,忽略重復代碼,實現(xiàn)如下:

class RPCHandler {
public:
    // ...
    void handleConnection(ip::tcp::socket socket) {
        while (true) {
            // 讀取請求
            boost::asio::streambuf buf;
            read_until(socket, buf, '\n');
            std::string request = boost::asio::buffer_cast(buf.data());
            request.pop_back();

            // 使用并行執(zhí)行處理請求
            std::vector>futures;
            for (int i = 0; i< request.size(); i++) {
                futures.emplace_back(std::async(std::launch::async, &RPCHandler::handleRequest, this, request[i]));
            }

            // 等待所有請求處理完成并發(fā)送響應
            for (auto& f : futures) {
                std::string response = f.get();
                write(socket, buffer(response + '\n'));
            }
        }
    }
};

這樣,請求會被分成多個部分并行處理,可以利用多核 CPU 的優(yōu)勢提高服務端的并發(fā)性能。

main():

int main() {
    io_context ioc;
    ip::tcp::acceptor acceptor(ioc, ip::tcp::endpoint(ip::tcp::v4(), 8080));
    RPCHandler rpcHandler;

    // 注冊函數(shù)
    rpcHandler.registerCallback("add", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >>a >>b;
        int result = a + b;
        std::ostringstream os;
        os<< result;
        return os.str();
    });

    rpcHandler.registerCallback("sub", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >>a >>b;
        int result = a - b;
        std::ostringstream os;
        os<< result;
        return os.str();
    });

    // 創(chuàng)建線程池
    boost::thread_pool::executor pool(10);

    // 等待連接
    while (true) {
        ip::tcp::socket socket(ioc);
        acceptor.accept(socket);

        // 將請求添加到線程池中處理
        pool.submit(boost::bind(&RPCHandler::handleConnection, &rpcHandler, std::move(socket)));
    }

    return 0;
}

在 main 函數(shù)中可以使用 boost::thread_pool::executor 來管理線程池,在線程池中提交任務來處理請求。這里的線程池大小設置為10,可以根據(jù)實際情況調(diào)整。

3.4 加入容錯機制(修改客戶端部分)

在其中使用了重試機制來保證客戶端能夠重新連接服務端:

class RPCClient {
public:
    RPCClient(const std::string& address, int port) : address_(address), port_(port), socket_(io_context_) {
        connect();
    }

    std::string call(const std::string& name, const std::string& args) {
        // 序列化請求
        std::ostringstream os;
        boost::archive::text_oarchive oa(os);
        std::maprequest;
        request["name"] = name;
        request["args"] = args;
        oa<< request;
        std::string requestStr = os.str();

        // 發(fā)送請求
        write(socket_, buffer(requestStr + '\n'));

        // 讀取響應
        boost::asio::streambuf buf;
        read_until(socket_, buf, '\n');
        std::string response = boost::asio::buffer_cast(buf.data());
        response.pop_back();

        return response;
    }

private:
    void connect() {
        bool connected = false;
        while (!connected) {
            try {
                socket_.connect(ip::tcp::endpoint(ip::address::from_string(address_), port_));
                connected = true;
            } catch (const std::exception& e) {
                std::cerr<< "Error connecting to server: "<< e.what()<< std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
            }
        }
    }

    std::string address_;
    int port_;
    io_context io_context_;
    ip::tcp::socket socket_;
};

在這個示例中,當連接服務端失敗時,客戶端會在一定的時間間隔后重試連接,直到成功連接上服務端為止。

3.5 負載均衡

? 服務端需要處理大量的請求,這部分的實現(xiàn)是可以獨立拎出來長篇大論的,在此貼出其他大神的帖子吧。

服務器負載均衡_負載均衡服務器_我是一條胖咸魚的博客-博客

什么是負載均衡,看完文章秒懂_愛銘網(wǎng)絡的博客-博客_負載均衡

四、總結

? 至此,我們逐步完善了RPC,在最簡單的RPC基礎上加入了網(wǎng)絡連接、加入錯誤處理、增強了并發(fā)訪問的功能、并加入了容錯機制,但是對于一個可以讓客戶正常使用的RPC來說,這還遠遠不夠,我本人也是實力有限,僅僅能讀懂或者解析部分RPC的設計動機及原理,要詳細介紹RPC光寫這些是遠遠不夠的。工作中一套RPC附加其他功能需要一個團隊忙活差不多兩個月,我僅僅在其中負責測試工具開發(fā)和代碼生成,所以有不妥的地方請讀者諒解,有錯的地方請指出必將改正。好夢!?。?/p>

你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

標題名稱:C++簡單實現(xiàn)RPC網(wǎng)絡通訊-創(chuàng)新互聯(lián)
轉載源于:http://bm7419.com/article24/dpohce.html

成都網(wǎng)站建設公司_創(chuàng)新互聯(lián),為您提供移動網(wǎng)站建設、外貿(mào)網(wǎng)站建設、面包屑導航、電子商務網(wǎng)站維護、定制開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉載內(nèi)容為主,如果涉及侵權請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉載,或轉載時需注明來源: 創(chuàng)新互聯(lián)

成都做網(wǎng)站