广告
返回顶部
首页 > 资讯 > 后端开发 > 其他教程 >C++中的Reactor原理与实现
  • 525
分享到

C++中的Reactor原理与实现

2024-04-02 19:04:59 525人浏览 薄情痞子
摘要

目录一、Reactor介绍二、代码实现一、Reactor介绍 reactor设计模式是event-driven architecture的一种实现方式,处理多个客户端并发的向服务端请

一、Reactor介绍

reactor设计模式是event-driven architecture的一种实现方式,处理多个客户端并发的向服务端请求服务的场景。每种服务在服务端可能由多个方法组成。reactor会解耦并发请求的服务并分发给对应的事件处理器来处理。

中心思想是将所有要处理的I/o事件注册到一个中心I/o多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/o事件到来或是准备就绪(文件描述符或Socket可读、写),多路复用器返回并将事先注册的相应l/o事件分发到对应的处理器中。

处理机制为:主程序将事件以及对应事件处理的方法在Reactor上进行注册, 如果相应的事件发生,Reactor将会主动调用事件注册的接口,即 回调函数.

二、代码实现

前提准备:1单例模式:单例模式(Singleton Pattern,也称为单件模式),使用最广泛的设计模式之一。其意图是保证一个类(结构体)仅有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享。
2.回调函数:把一段可执行的代码像参数传递那样传给其他代码,而这段代码会在某个时刻被调用执行,这就叫做回调。

对epoll反应堆中结构体定义


struct nitem { // fd

	int fd;		//要监听的文件描述符

	int status;	//是否在监听:1->在红黑树上(监听),0->不在(不监听)
	int events;	//对应的监听事件,	EPOLLIN和EPOLLOUT(不同的事件,走不同的回调函数)
	void *arg;	//指向自己结构体指针
#if 0
	NCALLBACK callback;
#else
	NCALLBACK *readcb;   // epollin
	NCALLBACK *writecb;  // epollout
	NCALLBACK *acceptcb; // epollin
#endif
	unsigned char sbuffer[BUFFER_LENGTH]; //
	int slength;

	unsigned char rbuffer[BUFFER_LENGTH];
	int rlength;
	
};


struct itemblock {

	struct itemblock *next;
	struct nitem *items;

};

struct reactor {

	int epfd;
	struct itemblock *head; 

};

单例模式,创建reactor的一个实例


struct reactor *instance = NULL;
int init_reactor(struct reactor *r) {

	if (r == NULL) return -1;

	int epfd = epoll_create(1); //int size
	r->epfd = epfd;

	// fd --> item
	r->head = (struct itemblock*)malloc(sizeof(struct itemblock));
	if (r->head == NULL) {
		close(epfd);
		return -2;
	} 
	memset(r->head, 0, sizeof(struct itemblock));

	r->head->items = (struct nitem *)malloc(MAX_EPOLL_EVENT * sizeof(struct nitem));
	if (r->head->items == NULL) {
		free(r->head);
		close(epfd);
		return -2;
	}
	memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem)));
	
	r->head->next = NULL;
	
	return 0;
}
struct reactor *getInstance(void) { //singleton

	if (instance == NULL) {

		instance = (struct reactor *)malloc(sizeof(struct reactor));
		if (instance == NULL) return NULL;
		memset(instance, 0, sizeof(struct reactor));

		if (0 > init_reactor(instance)) {
			free(instance);
			return NULL;
		}

	}

	return instance;
}

事件注册





int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg) {

	struct reactor *r = getInstance();
	
	struct epoll_event ev = {0};
	//1
	if (event == READ_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].readcb = cb;
		r->head->items[fd].arg = arg;

		ev.events = EPOLLIN;
		
	}
	//2
	else if (event == WRITE_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].writecb = cb;
		r->head->items[fd].arg = arg;

		ev.events = EPOLLOUT;
	} 
	//3
	else if (event == ACCEPT_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].acceptcb = cb;	//回调函数
		r->head->items[fd].arg = arg;

		ev.events = EPOLLIN;
	}

	ev.data.ptr = &r->head->items[fd];

	
	if (r->head->items[fd].events == NOSET_CB) {
		if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
			printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
			return -1;
		}
		r->head->items[fd].events = event;
	} else if (r->head->items[fd].events != event) {

		if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
			printf("epoll_ctl EPOLL_CTL_MOD failed\n");
			return -1;
		}
		r->head->items[fd].events = event;
	}
	
	return 0;
}

回调函数书写

int write_callback(int fd, int event, void *arg) {
	struct reactor *R = getInstance();
	
	unsigned char *sbuffer = R->head->items[fd].sbuffer;
	int length = R->head->items[fd].slength;
	int ret = send(fd, sbuffer, length, 0);
	if (ret < length) {
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	} else {
		nreactor_set_event(fd, read_callback, READ_CB, NULL);
	}
	return 0;
}
// 5k qps
int read_callback(int fd, int event, void *arg) {
	struct reactor *R = getInstance();
	unsigned char *buffer = R->head->items[fd].rbuffer;
	
#if 0 //ET
	int idx = 0, ret = 0;
	while (idx < BUFFER_LENGTH) {
		ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0);
		if (ret == -1) { 
			break;
		} else if (ret > 0) {
			idx += ret;
		} else {// == 0
			break;
		}
	}
	if (idx == BUFFER_LENGTH && ret != -1) {
		nreactor_set_event(fd, read_callback, READ_CB, NULL);
	} else if (ret == 0) {
		nreactor_set_event
		//close(fd);
	} else {
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	}
	
#else //LT
	int ret = recv(fd, buffer, BUFFER_LENGTH, 0);
	if (ret == 0) { // fin
		
		nreactor_del_event(fd, NULL, 0, NULL);
		close(fd);
		
	} else if (ret > 0) {
		unsigned char *sbuffer = R->head->items[fd].sbuffer;
		memcpy(sbuffer, buffer, ret);
		R->head->items[fd].slength = ret;
		printf("readcb: %s\n", sbuffer);
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	}
		
#endif
	
}
// WEB server 
// ET / LT
int accept_callback(int fd, int event, void *arg) {
	int connfd;
	struct sockaddr_in client;
    socklen_t len = sizeof(client);
    if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) {
        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
	nreactor_set_event(connfd, read_callback, READ_CB, NULL);
}

监听描述符变化

// accept --> EPOLL

int reactor_loop(int listenfd) {

	struct reactor *R = getInstance();	
	
	struct epoll_event events[POLL_SIZE] = {0};
	while (1) {
		int nready = epoll_wait(R->epfd, events, POLL_SIZE, -1);
		if (nready == -1) {
			continue;
		}

		int i = 0;
		for (i = 0;i < nready;i ++) {
			
			struct nitem *item = (struct nitem *)events[i].data.ptr;
			int connfd = item->fd;

			if (connfd == listenfd) { //
				item->acceptcb(listenfd, 0, NULL);
			} else {
			
				if (events[i].events & EPOLLIN) { //
					item->readcb(connfd, 0, NULL);
				
				} 
				if (events[i].events & EPOLLOUT) {
					item->writecb(connfd, 0, NULL);
		
				}
			}
		}

	}
	return 0;
}

完整代码实现

#define MAXLNE  4096
#define POLL_SIZE	1024
#define BUFFER_LENGTH		1024
#define MAX_EPOLL_EVENT		1024
#define NOSET_CB	0
#define READ_CB		1
#define WRITE_CB	2
#define ACCEPT_CB	3

typedef int NCALLBACK(int fd, int event, void *arg);

struct nitem { // fd
	int fd;		//要监听的文件描述符
	int status;	//是否在监听:1->在红黑树上(监听),0->不在(不监听)
	int events;	//对应的监听事件,	EPOLLIN和EPOLLOUT(不同的事件,走不同的回调函数)
	void *arg;	//指向自己结构体指针
#if 0
	NCALLBACK callback;
#else
	NCALLBACK *readcb;   // epollin
	NCALLBACK *writecb;  // epollout
	NCALLBACK *acceptcb; // epollin
#endif
	unsigned char sbuffer[BUFFER_LENGTH]; //
	int slength;
	unsigned char rbuffer[BUFFER_LENGTH];
	int rlength;
	
};

struct itemblock {
	struct itemblock *next;
	struct nitem *items;
};

struct reactor {
	int epfd;
	struct itemblock *head; 
};

int init_reactor(struct reactor *r);
int read_callback(int fd, int event, void *arg);
int write_callback(int fd, int event, void *arg);
int accept_callback(int fd, int event, void *arg);

struct reactor *instance = NULL;
struct reactor *getInstance(void) { //singleton
	if (instance == NULL) {
		instance = (struct reactor *)malloc(sizeof(struct reactor));
		if (instance == NULL) return NULL;
		memset(instance, 0, sizeof(struct reactor));
		if (0 > init_reactor(instance)) {
			free(instance);
			return NULL;
		}
	}
	return instance;
}




int nreactor_set_event(int fd, NCALLBACK cb, int event, void *arg) {
	struct reactor *r = getInstance();
	
	struct epoll_event ev = {0};
	//1
	if (event == READ_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].readcb = cb;
		r->head->items[fd].arg = arg;
		ev.events = EPOLLIN;
		
	}
	//2
	else if (event == WRITE_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].writecb = cb;
		r->head->items[fd].arg = arg;
		ev.events = EPOLLOUT;
	} 
	//3
	else if (event == ACCEPT_CB) {
		r->head->items[fd].fd = fd;
		r->head->items[fd].acceptcb = cb;	//回调函数
		r->head->items[fd].arg = arg;
		ev.events = EPOLLIN;
	}
	ev.data.ptr = &r->head->items[fd];
	
	if (r->head->items[fd].events == NOSET_CB) {
		if (epoll_ctl(r->epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
			printf("epoll_ctl EPOLL_CTL_ADD failed, %d\n", errno);
			return -1;
		}
		r->head->items[fd].events = event;
	} else if (r->head->items[fd].events != event) {
		if (epoll_ctl(r->epfd, EPOLL_CTL_MOD, fd, &ev) < 0) {
			printf("epoll_ctl EPOLL_CTL_MOD failed\n");
			return -1;
		}
		r->head->items[fd].events = event;
	}
	
	return 0;
}



int nreactor_del_event(int fd, NCALLBACK cb, int event, void *arg) {
	struct reactor *r = getInstance();
	
	struct epoll_event ev = {0};
	ev.data.ptr = arg;
	epoll_ctl(r->epfd, EPOLL_CTL_DEL, fd, &ev);
	r->head->items[fd].events = 0;
	return 0;
}
int write_callback(int fd, int event, void *arg) {
	struct reactor *R = getInstance();
	
	unsigned char *sbuffer = R->head->items[fd].sbuffer;
	int length = R->head->items[fd].slength;
	int ret = send(fd, sbuffer, length, 0);
	if (ret < length) {
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	} else {
		nreactor_set_event(fd, read_callback, READ_CB, NULL);
	}
	return 0;
}
// 5k qps
int read_callback(int fd, int event, void *arg) {
	struct reactor *R = getInstance();
	unsigned char *buffer = R->head->items[fd].rbuffer;
	
#if 0 //ET
	int idx = 0, ret = 0;
	while (idx < BUFFER_LENGTH) {
		ret = recv(fd, buffer+idx, BUFFER_LENGTH-idx, 0);
		if (ret == -1) { 
			break;
		} else if (ret > 0) {
			idx += ret;
		} else {// == 0
			break;
		}
	}
	if (idx == BUFFER_LENGTH && ret != -1) {
		nreactor_set_event(fd, read_callback, READ_CB, NULL);
	} else if (ret == 0) {
		nreactor_set_event
		//close(fd);
	} else {
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	}
	
#else //LT
	int ret = recv(fd, buffer, BUFFER_LENGTH, 0);
	if (ret == 0) { // fin
		
		nreactor_del_event(fd, NULL, 0, NULL);
		close(fd);
		
	} else if (ret > 0) {
		unsigned char *sbuffer = R->head->items[fd].sbuffer;
		memcpy(sbuffer, buffer, ret);
		R->head->items[fd].slength = ret;
		printf("readcb: %s\n", sbuffer);
		nreactor_set_event(fd, write_callback, WRITE_CB, NULL);
	}
		
#endif
	
}
// web server 
// ET / LT
int accept_callback(int fd, int event, void *arg) {
	int connfd;
	struct sockaddr_in client;
    socklen_t len = sizeof(client);
    if ((connfd = accept(fd, (struct sockaddr *)&client, &len)) == -1) {
        printf("accept socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
	nreactor_set_event(connfd, read_callback, READ_CB, NULL);
}
int init_server(int port) {
	int listenfd;
    struct sockaddr_in servaddr;
    char buff[MAXLNE];
 
    if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        printf("create socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
 
    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);
 
    if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
        printf("bind socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
 
    if (listen(listenfd, 10) == -1) {
        printf("listen socket error: %s(errno: %d)\n", strerror(errno), errno);
        return 0;
    }
	return listenfd;
}
int init_reactor(struct reactor *r) {
	if (r == NULL) return -1;
	int epfd = epoll_create(1); //int size
	r->epfd = epfd;
	// fd --> item
	r->head = (struct itemblock*)malloc(sizeof(struct itemblock));
	if (r->head == NULL) {
		close(epfd);
		return -2;
	} 
	memset(r->head, 0, sizeof(struct itemblock));
	r->head->items = (struct nitem *)malloc(MAX_EPOLL_EVENT * sizeof(struct nitem));
	if (r->head->items == NULL) {
		free(r->head);
		close(epfd);
		return -2;
	}
	memset(r->head->items, 0, (MAX_EPOLL_EVENT * sizeof(struct nitem)));
	
	r->head->next = NULL;
	
	return 0;
}
// accept --> EPOLL

int reactor_loop(int listenfd) {
	struct reactor *R = getInstance();	
	
	struct epoll_event events[POLL_SIZE] = {0};
	while (1) {
		int nready = epoll_wait(R->epfd, events, POLL_SIZE, -1);
		if (nready == -1) {
			continue;
		}
		int i = 0;
		for (i = 0;i < nready;i ++) {
			
			struct nitem *item = (struct nitem *)events[i].data.ptr;
			int connfd = item->fd;
			if (connfd == listenfd) { //
				item->acceptcb(listenfd, 0, NULL);
			} else {
			
				if (events[i].events & EPOLLIN) { //
					item->readcb(connfd, 0, NULL);
				
				} 
				if (events[i].events & EPOLLOUT) {
					item->writecb(connfd, 0, NULL);
		
				}
			}
		}
	}
	return 0;
}
int main(int arGC, char **argv) 
{
    
 	int  connfd, n;
	int listenfd = init_server(9999);
	nreactor_set_event(listenfd, accept_callback, ACCEPT_CB, NULL);
	//nreactor_set_event(listenfd, accept_callback, read_callback, write_callback);
	
	reactor_loop(listenfd);
	 
    return 0;
}

到此这篇关于Reactor原理与实现的文章就介绍到这了,更多相关Reactor原理内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

--结束END--

本文标题: C++中的Reactor原理与实现

本文链接: https://www.lsjlt.com/news/153187.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • C++中的Reactor原理与实现
    目录一、Reactor介绍二、代码实现一、Reactor介绍 reactor设计模式是event-driven architecture的一种实现方式,处理多个客户端并发的向服务端请...
    99+
    2022-11-13
  • C++中Reactor怎么实现
    这篇文章主要介绍“C++中Reactor怎么实现”,在日常操作中,相信很多人在C++中Reactor怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”C++中Reactor怎么实现”的疑惑有所帮助!接下来...
    99+
    2023-07-02
  • C++基于reactor的服务器百万并发实现与讲解
    目录一、服务器的代码实现与讲解二、环境设置reactor实现的原理请参考:https://www.jb51.net/article/253794.htm本次百万并发的代码实现也是基于...
    99+
    2022-11-13
  • C++位图的实现原理与方法
    概念 位图就是bitmap的缩写,所谓bitmap,就是用每一位来存放某种状态,适用于大规模数据,该数据都是不重复的简单数据。通常是用来判断某个数据存不存在的 例如:给40亿个不重...
    99+
    2022-11-12
  • C/C++并查集的查询与合并实现原理
    目录一、并查集的概念二、并查集的实现1.并查集不同集合(树)的形成2.find()函数找一个元素集合的编号3.合并两个不同集合(合并两棵不同的树)4.查询两个元素是否在一个集合5.并...
    99+
    2023-02-13
    C++并查集的查询与合并 C语言并查集的合并与查询
  • c++与python实现二分查找的原理及实现
    目录1、时间复杂度与优缺点2、python实现3、C++实现在计算机中,数据的查找方式与其存储方式关系密切。试想一下,如果图书馆中书籍杂乱无章的存放,那么要想找到心仪的书籍将会非常困...
    99+
    2022-11-13
  • C++中线程的原理与实现方法是什么
    这篇文章主要介绍“C++中线程的原理与实现方法是什么”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“C++中线程的原理与实现方法是什么”文章能帮助大家解决问题。在C++中有多种实现线程的方式C++11...
    99+
    2023-07-05
  • C++与Lua实现交互的原理是什么
    本篇文章给大家分享的是有关C++与Lua实现交互的原理是什么,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。具体步骤:1,找到cocos自带的绑定工具脚本文件genbinding...
    99+
    2023-06-06
  • C++中function的实现原理详解
    目录前言自己实现function前言 类模版std::function是一种通用、多态的函数封装。std::function的实例可以对任何可以调用的目标实体进行存储、复制、和调用操...
    99+
    2022-12-09
    C++ function实现原理 C++ function原理 C++ function
  • C++ 超详细分析多态的原理与实现
    目录多态的定义及实现多态的构成条件虚函数重写C++11的override和final抽象类多态的原理虚函数表动态绑定与静态绑定单继承和多继承关系的虚函数表单继承中的虚函数表多继承中的...
    99+
    2022-11-13
  • C++ 线段树原理与实现示例详解
    目录一、问题引入二、线段树的构建三、线段树的单点修改与查询1、修改2、查询四、线段树的区间修改与查询1、修改2、查询一、问题引入 对于一般的区间问题,比如RMQ(区间的最值)、区间的...
    99+
    2022-11-13
  • C++多态的实现与原理及抽象类实例分析
    这篇文章主要讲解了“C++多态的实现与原理及抽象类实例分析”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“C++多态的实现与原理及抽象类实例分析”吧!多态的概念多态: 从字面意思来看,就是事物...
    99+
    2023-06-29
  • Java中多线程Reactor模式的实现
    目录1、 主服务器2、IO请求handler+线程池3、客户端多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计...
    99+
    2022-11-12
  • C++基于reactor的服务器百万并发如何实现
    这篇文章主要介绍“C++基于reactor的服务器百万并发如何实现”,在日常操作中,相信很多人在C++基于reactor的服务器百万并发如何实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”C++基于reac...
    99+
    2023-07-02
  • C# 中get与post的原理是什么
    这期内容当中小编将会给大家带来有关C# 中get与post的原理是什么,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。C# get post中post和get的不同之处get与post的区别在于:(对于CG...
    99+
    2023-06-17
  • 分析C# Dictionary的实现原理
    目录一、理论知识1.1、Hash算法1.2、Hash桶算法1.3、解决冲突算法二、Dictionary实现2.1、Entry结构体2.2、其它关键私有变量2.3、Dictionary...
    99+
    2022-11-12
  • 详解C# ConcurrentBag的实现原理
    目录一、ConcurrentBag类二、 ConcurrentBag线程安全实现原理2.1、ConcurrentBag的私有字段2.2、用于数据存储的ThreadLocalList类...
    99+
    2022-11-12
  • JavaScript中async与await实现原理与细节
    目录一、回调地狱二、Promise三、生成器(generator)四、使用生成器同步化promise五、async、await异步代码究极解决方案一、回调地狱 在es6兴起之后许多人...
    99+
    2022-11-13
  • PHP Session ID的实现原理与实例
    目录Session作用 session 的工作机制:PHPSESSIONID的生产算法原理:php.ini配置如下:PHP Session工作原理PHPcli模式通过session_...
    99+
    2022-11-12
  • Android沙盘原理与实现
       一、前言   据网秦发布的《2012年上半年全球手机安全报告》,2012年上半年Android病毒感染量增长迅猛,尤以5、6月为突出,上半年感染手机12...
    99+
    2022-06-06
    Android
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作