iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > 其他教程 >C++简单实现RPC网络通讯的示例详解
  • 116
分享到

C++简单实现RPC网络通讯的示例详解

C++实现RPC网络通讯C++ RPC网络通讯C++ 网络通讯 2023-05-18 05:05:50 116人浏览 独家记忆
摘要

目录一、rpc简介1.1 简介1.2 本地调用和远程调用的区别1.3 RPC运行的流程1.4 小结二、RPC简单实现2.1 客户端实现代码2.2 服务端代码三、加强版RPC(以&ld

RPC是远程调用系统简称,它允许程序调用运行在另一台计算机上的过程,就像调用本地的过程一样。RPC 实现了网络编程的“过程调用”模型,让程序员可以像调用本地函数一样调用远程函数。最近在做的也是远程调用过程,所以通过重新梳理RPC来整理总结一下。

项目来源:

GitHub - qicosmos/rest_rpc: modern c++(C++11), simple, easy to use rpc framework

一、RPC简介

1.1 简介

RPC指的是计算机A的进程调用另外一台计算机B的进程,A上的进程被挂起,B上被调用的进程开始执行,当B执行完毕后将执行结果返回给A,A的进程继续执行。调用方可以通过使用参数将信息传送给被调用方,然后通过传回的结果得到信息。这些传递的信息都是被加密过或者其他方式处理。这个过程对开发人员是透明的,因此RPC可以看作是本地过程调用的一种扩展,使被调用过程不必与调用过程位于同一物理机中。

RPC可以用于构建基于B/S模式的分布式应用程序:请求服务是一个客户端、而服务提供程序是一台服务器。和常规和本地的调用过程一样,远程过程调用是同步操作,在结果返回之前,需要暂时中止请求程序。

RPC的优点:

  • 支持面向过程和面向线程的模型;
  • 内部消息传递机制对用户隐藏;
  • 基于 RPC 模式的开发可以减少代码重写;
  • 可以在本地环境和分布式环境中运行;

1.2 本地调用和远程调用的区别

以ARM环境为例,我们拆解本地调用的过程,以下面代码为例:

int selfIncrement(int a)
{
    return a + 1;
}
int a = 10;

当执行到selfIncrement(a)时,首先把a存入寄存器R0,之后转到函数地址selfIncrement,执行函数内的指令 ADD R0,#1。跳转到函数的地址偏移量在编译时确定。

但是如果这是一个远程调用,selfIncrement函数存在于其他机器,为了实现远程调用,请求方和服务方需要提供需要解决以下问题:

1. 网络传输。

本地调用的参数存放在寄存器或栈中,在同一块内存中,可以直接访问到。远程过程调用需要借助网络来传递参数和需要调用的函数 ID。

2. 编解码

请求方需要将参数转化为字节流,服务提供方需要将字节流转化为参数。

3. 函数映射表

服务提供方的函数需要有唯一的 ID 标识,请求方通过 ID 标识告知服务提供方需要调用哪个函数。

以上三个功能即为 RPC 的基本框架所必须包含的功能。

1.3 RPC运行的流程

一次 RPC 调用的运行流程大致分为如下七步,具体如下图所示。

1.客户端调用客户端存根程序,将参数传入;

2.客户端存根程序将参数转化为标准格式,并编组进消息;

3.客户端存根程序将消息发送到传输层,传输层将消息传送至远程服务器;

4.服务器的传输层将消息传递到服务器存根程序,存根程序对阐述进行解包,并使用本地调用的机制调用所需的函数;

5.运算完成之后,将结果返回给服务器存根,存根将结果编组为消息,之后发送给传输层;

6.服务器传输层将结果消息发送给客户端传输层;

7.客户端存根对返回消息解包,并返回给调用方。

服务端存根和客户端存根可以看做是被封装起来的细节,这些细节对于开发人员来说是透明的,但是在客户端层面看到的是 “本地” 调用了 selfIncrement() 方法,在服务端层面,则需要封装、网络传输、解封装等等操作。因此 RPC 可以看作是传统本地过程调用的一种扩展,其使得被调用过程不必与调用过程位于同一物理机中。

1.4 小结

RPC 的目标是做到在远程机器上调用函数与本地调用函数一样的体验。 为了达到这个目的,需要实现网络传输、序列化与反序列化、函数映射表等功能,其中网络传输可以使用socket或其他,序列化和反序列化可以使用protobuf,函数映射表可以使用std::function。

lambda与std::function内容可以看:

C++11 匿名函数lambda的使用

C++11 std::function 基础用法

lambda 表达式和 std::function 的功能是类似的,lambda 表达式可以转换为 std::function,一般情况下,更多使用 lambda 表达式,只有在需要回调函数的情况下才会使用 std::function。

二、RPC简单实现

2.1 客户端实现代码

#include <iOStream>
#include <memory>
#include <thread>
#include <functional>
#include <cstring>
 
class RPCClient
{
public:
    using RPCCallback = std::function<void(const std::string&)>;
    RPCClient(const std::string& server_address) : server_address_(server_address) {}
    ~RPCClient() {}
 
    void Call(const std::string& method, const std::string& request, RPCCallback callback)
    {
        // 序列化请求数据
        std::string data = Serialize(method, request);
        // 发送请求
        SendRequest(data);
        // 开启线程接收响应
        std::thread t([this, callback]() {
            std::string response = RecvResponse();
            // 反序列化响应数据
            std::string result = Deserialize(response);
            callback(result);
        });
        t.detach();
    }
 
private:
    std::string Serialize(const std::string& method, const std::string& request)
    {
        // 省略序列化实现
    }
 
    void SendRequest(const std::string& data)
    {
        // 省略网络发送实现
    }
 
    std::string RecvResponse()
    {
        // 省略网络接收实现
    }
 
    std::string Deserialize(const std::string& response)
    {
        // 省略反序列化实现
    }
 
private:
    std::string server_address_;
};
 
int main()
{
    std::shared_ptr<RPCClient> client(new RPCClient("127.0.0.1:8000"));
    client->Call("Add", "1,2", [](const std::string& result) {
        std::cout << "Result: " << result << std::endl;
    });
    return 0;
}

这段代码定义了RPCClient类来处理客户端的请求任务,用到了lambda和std::function来处理函数调用,在Call中使用多线程技术。main中使用智能指针管理Rpcclient类,并调用了客户端的Add函数。 

127.0.0.1为本地地址,对开发来说需要使用本地地址自测,端口号为8000,需要选择一个空闲端口来通信。

2.2 服务端代码

下面是服务端的实现

#include <iostream>
#include <map>
#include <functional>
#include <memory>
#include <thread>
#include <mutex>
// 使用第三方库实现序列化和反序列化
#include <boost/serialization/serialization.hpp>
#include <boost/serialization/map.hpp>
using namespace std;
// 定义RPC函数类型
using RPCCallback = std::function<std::string(const std::string&)>;
class RPCHandler {
public:
    void reGISterCallback(const std::string& name, RPCCallback callback) {
        std::unique_lock<std::mutex> lock(mtx_);
        callbacks_[name] = callback;
    }
    std::string handleRequest(const std::string& request) {
        // 反序列化请求
        std::map<std::string, std::string> requestMap;
        std::istringstream is(request);
        boost::arcHive::text_iarchive ia(is);
        ia >> requestMap;
        // 查找并调用对应的回调函数
        std::string name = requestMap["name"];
        std::string args = requestMap["args"];
        std::unique_lock<std::mutex> lock(mtx_);
        auto it = callbacks_.find(name);
        if (it == callbacks_.end()) {
            return "Error: Unknown function";
        }
        RPCCallback callback = it->second;
        return callback(args);
    }
private:
    std::map<std::string, RPCCallback> callbacks_;
    std::mutex mtx_;
};
int main() {
    RPCHandler rpcHandler;
    // 注册回调函数
    rpcHandler.registerCallback("add", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >> a >> b;
        int result = a + b;
        std::ostringstream os;
        os << result;
        return os.str();
    });
    rpcHandler.registerCallback("sub", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >> a >> b;
        int result = a - b;
        std::ostringstream os;
        os << result;
        return os.str
    });
    // 创建处理请求的线程
    std::thread requestThread([&]() {
        while (true) {
            std::string request;
            std::cin >> request;
            std::string response = rpcHandler.handleRequest(request);
            std::cout << response << std::endl;
        }
    });
    requestThread.join();
    return 0;
}

上面的代码实现了一个简单的C++ RPC服务端。主要实现了以下功能:

1.定义了RPC函数类型 RPCCallback,使用std::function<std::string(const std::string&)>表示。

2.RPCHandler类实现了注册函数和处理请求的功能。

3.在main函数中创建了一个RPCHandler对象,并注册了两个函数"add" 和 "sub"。这些函数通过lambda表达式实现,并在被调用时通过std::istringstream读取参数并返回结果。

4.创建了一个新线程requestThread来处理请求。在这个线程中,通过std::cin读取请求,然后调用RPCHandler的handleRequest函数并使用std::cout输出响应。

注意,这套代码是最简单的RPC机制,只能调用本地的资源,他还存在以下缺点:

1.代码并没有处理错误处理,如果请求格式不正确或函数不存在,服务端将会返回“Error: Unknown function”。

2.没有使用网络库进行通信,所以只能在本机上使用。

3.没有提供高效的并发性能,所有请求都在单独的线程中处理。

4.没有考虑RPC服务的可用性和高可用性,如果服务端崩溃或不可用,客户端将无法继续使用服务。

5.没有考虑RPC服务的可扩展性,如果有大量请求需要处理,可能会导致性能问题。

6.使用了第三方库Boost.Serialization来实现序列化和反序列化,如果不想使用第三方库,可能需要自己实现序列化的功能。

下面我们一步一步完善它。

三、加强版RPC(以“RPC简单实现”为基础)

3.1 加入错误处理

下面是 RPCHandler 类中加入错误处理的代码示例:

class RPCHandler {
public:
    // 其他代码...
    std::string handleRequest(const std::string& request) {
        // 反序列化请求
        std::map<std::string, std::string> requestMap;
        std::istringstream is(request);
        boost::archive::text_iarchive ia(is);
        ia >> requestMap;
        // 查找并调用对应的回调函数
        std::string name = requestMap["name"];
        std::string args = requestMap["args"];
        std::unique_lock<std::mutex> lock(mtx_);
        auto it = callbacks_.find(name);
        if (it == callbacks_.end()) {
            return "Error: Unknown function";
        }
        RPCCallback callback = it->second;
        try {
            return callback(args);
        } catch (const std::exception& e) {
            return "Error: Exception occurred: " + std::string(e.what());
        } catch (...) {
            return "Error: Unknown exception occurred";
        }
    }
};

上面的代码在 RPCHandler 类的 handleRequest 函数中加入了错误处理的代码,它使用了 try-catch 语句来捕获可能发生的异常。如果找不到对应的函数或发生了异常,会返回错误信息。这样,如果请求格式不正确或函数不存在,服务端将会返回相应的错误信息。

3.2 加入网络连接(socket)

加入网络连接不需要动服务端的实现,只需要在main里创造套接字去链接就好:

int main() 
{
    io_context ioc;
    ip::tcp::acceptor acceptor(ioc, ip::tcp::endpoint(ip::tcp::v4(), 8080));
    RPCHandler rpcHandler;
    // 注册函数
    rpcHandler.registerCallback("add", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >> a >> b;
        int result = a + b;
        std::ostringstream os;
        os << result;
        return os.str();
    });
    rpcHandler.registerCallback("sub", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >> a >> b;
        int result = a - b;
        std::ostringstream os;
        os << result;
        return os.str();
    });
    // 等待连接
    while (true) {
        ip::tcp::socket socket(ioc);
        acceptor.accept(socket);
        // 创建线程处理请求
        std::thread requestThread([&](ip::tcp::socket socket) {
            while (true) {
                // 读取请求
                boost::asio::streambuf buf;
                read_until(socket, buf, '\n');
                std::string request = boost::asio::buffer_cast<const char*>(buf.data());
                request.pop_back();
                // 处理请求
                std::string response = rpcHandler.handleRequest(request);
                // 发送响应
                write(socket, buffer(response + '\n'));
            }
        }, std::move(socket));
        requestThread.detach();
    }
    return 0;
}

这是一个使用Boost.Asio库实现的RPC服务端代码示例。它使用了TCP协议监听8080端口,等待客户端的连接。当有客户端连接时,创建一个新线程来处理请求。请求和响应通过网络传输。

3.3 加强并发性

使用并发和异步机制,忽略重复代码,实现如下:

class RPCHandler {
public:
    // ...
    void handleConnection(ip::tcp::socket socket) {
        while (true) {
            // 读取请求
            boost::asio::streambuf buf;
            read_until(socket, buf, '\n');
            std::string request = boost::asio::buffer_cast<const char*>(buf.data());
            request.pop_back();
            // 使用并行执行处理请求
            std::vector<std::future<std::string>> futures;
            for (int i = 0; i < request.size(); i++) {
                futures.emplace_back(std::async(std::launch::async, &RPCHandler::handleRequest, this, request[i]));
            }
            // 等待所有请求处理完成并发送响应
            for (auto& f : futures) {
                std::string response = f.get();
                write(socket, buffer(response + '\n'));
            }
        }
    }
};

这样,请求会被分成多个部分并行处理,可以利用多核 CPU 的优势提高服务端的并发性能。

main():

int main() {
    io_context ioc;
    ip::tcp::acceptor acceptor(ioc, ip::tcp::endpoint(ip::tcp::v4(), 8080));
    RPCHandler rpcHandler;
    // 注册函数
    rpcHandler.registerCallback("add", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >> a >> b;
        int result = a + b;
        std::ostringstream os;
        os << result;
        return os.str();
    });
    rpcHandler.registerCallback("sub", [](const std::string& args) {
        std::istringstream is(args);
        int a, b;
        is >> a >> b;
        int result = a - b;
        std::ostringstream os;
        os << result;
        return os.str();
    });
    // 创建线程池
    boost::thread_pool::executor pool(10);
    // 等待连接
    while (true) {
        ip::tcp::socket socket(ioc);
        acceptor.accept(socket);
        // 将请求添加到线程池中处理
        pool.submit(boost::bind(&RPCHandler::handleConnection, &rpcHandler, std::move(socket)));
    }
    return 0;
}

在 main 函数中可以使用 boost::thread_pool::executor 来管理线程池,在线程池中提交任务来处理请求。这里的线程池大小设置为10,可以根据实际情况调整。

3.4 加入容错机制(修改客户端部分)

在其中使用了重试机制来保证客户端能够重新连接服务端:

class RPCClient {
public:
    RPCClient(const std::string& address, int port) : address_(address), port_(port), socket_(io_context_) {
        connect();
    }
    std::string call(const std::string& name, const std::string& args) {
        // 序列化请求
        std::ostringstream os;
        boost::archive::text_oarchive oa(os);
        std::map<std::string, std::string> request;
        request["name"] = name;
        request["args"] = args;
        oa << request;
        std::string requestStr = os.str();
        // 发送请求
        write(socket_, buffer(requestStr + '\n'));
        // 读取响应
        boost::asio::streambuf buf;
        read_until(socket_, buf, '\n');
        std::string response = boost::asio::buffer_cast<const char*>(buf.data());
        response.pop_back();
        return response;
    }
private:
    void connect() {
        bool connected = false;
        while (!connected) {
            try {
                socket_.connect(ip::tcp::endpoint(ip::address::from_string(address_), port_));
                connected = true;
            } catch (const std::exception& e) {
                std::cerr << "Error connecting to server: " << e.what() << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(1));
            }
        }
    }
    std::string address_;
    int port_;
    io_context io_context_;
    ip::tcp::socket socket_;
};

在这个示例中,当连接服务端失败时,客户端会在一定的时间间隔后重试连接,直到成功连接上服务端为止。

以上就是C++简单实现RPC网络通讯的示例详解的详细内容,更多关于C++实现RPC网络通讯的资料请关注编程网其它相关文章!

--结束END--

本文标题: C++简单实现RPC网络通讯的示例详解

本文链接: https://www.lsjlt.com/news/211680.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • C++简单实现RPC网络通讯的示例详解
    目录一、RPC简介1.1 简介1.2 本地调用和远程调用的区别1.3 RPC运行的流程1.4 小结二、RPC简单实现2.1 客户端实现代码2.2 服务端代码三、加强版RPC(以&ld...
    99+
    2023-05-18
    C++实现RPC网络通讯 C++ RPC网络通讯 C++ 网络通讯
  • C#实现简单串口通讯实例
    本文实例为大家分享了C#实现简单串口通讯的具体代码,供大家参考,具体内容如下 参数设置界面代码: using System; using System.Collections.Gen...
    99+
    2024-04-02
  • c# 实现简单的串口通讯
    目录开发环境:第一步第二步第三步本文提供一个用C#实现串口通讯实例,亲自编写,亲测可用! 开发环境: VS2008+.net FrameWork3.5(实际上2.0应该也可以) 第一...
    99+
    2024-04-02
  • C++实现简单通讯录系统
    本文实例为大家分享了C++实现简单通讯录系统的具体代码,供大家参考,具体内容如下 需求分析: 1.通讯录可以添加联系人。 2.通讯录可以显示所有联系人。 3.通讯录可以查找联系人。 ...
    99+
    2024-04-02
  • C++实现一个简单消息队列的示例详解
    目录前言一、如何实现1、接口定义2、用到的对象3、基本流程二、完整代码三、使用示例线程通信总结前言 消息队列在多线程的场景有时会用到,尤其是线程通信跨线程调用的时候,就可以使用消息队...
    99+
    2022-12-15
    C++实现消息队列 C++消息队列
  • node.js实现简单爬虫示例详解
    目录node.js实现简单爬虫第一步第二步爬虫结果小结:node.js实现简单爬虫 工具:cheerio cheerio 是 nodejs 特别为服务端定制的,能够快速灵活的对 JQ...
    99+
    2023-05-17
    node.js简单爬虫 node.js爬虫
  • C#怎么实现简单串口通讯
    本篇内容主要讲解“C#怎么实现简单串口通讯”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“C#怎么实现简单串口通讯”吧!参数设置界面代码:using System;using ...
    99+
    2023-06-29
  • java编程实现简单的网络爬虫示例过程
    本项目中需要用到两个第三方jar包,分别为 jsoup 和 commons-io。 jsoup的作用是为了解析网页, commons-io 是为了把数据保存到本地。 1.爬取贴吧 第...
    99+
    2024-04-02
  • java实现一个简单的网络爬虫代码示例
    目前市面上流行的爬虫以python居多,简单了解之后,觉得简单的一些页面的爬虫,主要就是去解析目标页面(html)。那么就在想,java有没有用户方便解析html页面呢?找到了一个jsoup包,一个非常方便解析html的工具呢。使用方式也非...
    99+
    2023-05-30
    网络爬虫 java jsoup
  • C++实现简单的通讯录管理系统
    本文实例为大家分享了C++实现简单的通讯录管理系统的具体代码,供大家参考,具体内容如下 案例描述: 通讯录是一个可以记录亲人、好友信息的工具。本教程主要利用C++来实现一个通讯录管理...
    99+
    2024-04-02
  • C语言实现简单通讯录功能
    本文实例为大家分享了C语言实现简单通讯录功能的具体代码,供大家参考,具体内容如下 1.存放联系人信息 2.信息:名字+年龄+性别+电话+住址 3.增加联系人 4.删除联系人 5.查找...
    99+
    2024-04-02
  • JavaScript模拟实现简单的MVC的示例详解
    目录场景核心思想initControllerViewModelMVC是一种常见的软件架构模式,将一个应用程序分为三个核心的部分:模型(Model)、视图(View)和控制器(Cont...
    99+
    2023-05-15
    JavaScript实现MVC JavaScript MVC
  • C语言实现简易通讯录实例
    目录一、问题描述二、功能介绍二、实现流程1.创建通讯录2.创建源文件test.c3.创建源文件contact.c4.删除通讯录联系人信息 (DelContact函数实现)5...
    99+
    2024-04-02
  • C语言实现简单通讯录系统
    本文实例为大家分享了C语言通讯录系统(增删改查),供大家参考,具体内容如下 全部代码如下所示: #include <iostream> #include <s...
    99+
    2024-04-02
  • c#多进程通讯的实现示例
    目录引言共享内存Windows的MSMQ命名管道匿名管道Channel     IPC     Http  &n...
    99+
    2024-04-02
  • C++实现简单通讯录管理系统
    本文实例为大家分享了C++实现简单的通讯录管理系统的具体代码,供大家参考,具体内容如下 一、代码 #include <iostream> #include <str...
    99+
    2024-04-02
  • JS实现简单的下雪特效示例详解
    目录前言主要实现代码HTML代码JS代码前言 很多南方的小伙伴可能没怎么见过或者从来没见过下雪,今天我给大家带来一个小Demo,模拟了下雪场景,首先让我们看一下运行效果 可以点击看看...
    99+
    2024-04-02
  • Golang实现简单http服务器的示例详解
    目录一、基本描述二 、具体方法2.1 连接的建立2.2 http请求解析2.3 http请求处理2.4 http请求响应三、完整示例一、基本描述 完成一个http请求的处理和响应,主...
    99+
    2023-03-20
    Golang实现http服务器 Golang http服务器 Golang 服务器
  • C#实现TCP和UDP通信的示例详解
    目录UDP发送UDP接收TCP发送TCP接收C#在命名空间System.Net.Sockets中对伯克利套接字提供了良好的封装,提供了完善的TCP和UDP通信功能。 从编程的角度出发...
    99+
    2023-03-01
    C# TCP UDP通信 C# TCP UDP C# TCP C# UDP
  • C++简易通讯录系统实现流程详解
    目录实现功能定义通讯录和通讯录人员结构体实现通讯录输入菜单1.定义菜单函数:2.主函数循环体中循环展示菜单信息实现增删改查和清空通讯录功能运行截图展示实现功能 提示:这里可以添加本文...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作