1. Epoll

1.1 TCP通信大致流程

Sever端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// server.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>

int main()
{
// 1. 创建监听的套接字
int lfd = socket(AF_INET, SOCK_STREAM, 0);
if(lfd == -1)
{
perror("socket");
exit(0);
}

// 2. 将socket()返回值和本地的IP端口绑定到一起
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(10000); // 大端端口
// INADDR_ANY代表本机的所有IP, 假设有三个网卡就有三个IP地址
// 这个宏可以代表任意一个IP地址
// 这个宏一般用于本地的绑定操作
addr.sin_addr.s_addr = INADDR_ANY; // 这个宏的值为0 == 0.0.0.0
//inet_pton(AF_INET, "192.168.237.131", &addr.sin_addr.s_addr);
int ret = bind(lfd, (struct sockaddr*)&addr, sizeof(addr));
if(ret == -1)
{
perror("bind");
exit(0);
}

// 3. 设置监听
ret = listen(lfd, 128);
if(ret == -1)
{
perror("listen");
exit(0);
}

// 4. 阻塞等待并接受客户端连接
struct sockaddr_in cliaddr;
int clilen = sizeof(cliaddr);
int cfd = accept(lfd, (struct sockaddr*)&cliaddr, &clilen);
if(cfd == -1)
{
perror("accept");
exit(0);
}
// 打印客户端的地址信息
char ip[24] = {0};
printf("客户端的IP地址: %s, 端口: %d\n",
inet_ntop(AF_INET, &cliaddr.sin_addr.s_addr, ip, sizeof(ip)),
ntohs(cliaddr.sin_port));

// 5. 和客户端通信
while(1)
{
// 接收数据
char buf[1024];
memset(buf, 0, sizeof(buf));
int len = read(cfd, buf, sizeof(buf));
if(len > 0)
{
printf("客户端say: %s\n", buf);
write(cfd, buf, len);
}
else if(len == 0)
{
printf("客户端断开了连接...\n");
break;
}
else
{
perror("read");
break;
}
}

close(cfd);
close(lfd);

return 0;
}

Client端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// client.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>

int main()
{
// 1. 创建通信的套接字
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd == -1)
{
perror("socket");
exit(0);
}

// 2. 连接服务器
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(10000); // 大端端口
inet_pton(AF_INET, "192.168.237.131", &addr.sin_addr.s_addr);

int ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr));
if(ret == -1)
{
perror("connect");
exit(0);
}

// 3. 和服务器端通信
int number = 0;
while(1)
{
// 发送数据
char buf[1024];
sprintf(buf, "你好, 服务器...%d\n", number++);
write(fd, buf, strlen(buf)+1);

// 接收数据
memset(buf, 0, sizeof(buf));
int len = read(fd, buf, sizeof(buf));
if(len > 0)
{
printf("服务器say: %s\n", buf);
}
else if(len == 0)
{
printf("服务器断开了连接...\n");
break;
}
else
{
perror("read");
break;
}
sleep(1); // 每隔1s发送一条数据
}

close(fd);

return 0;
}

1.2 Select

以下模型都使用这个Client 进行交互

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#include <iostream>
#include <string>
#include <WinSock2.h>
#include <WS2tcpip.h>
#include <time.h>
#pragma comment(lib, "ws2_32.lib")

// __FILE__ 获取源文件的相对路径和名字
// __LINE__ 获取该行代码在文件中的行号
// __func__ 或 __FUNCTION__ 获取函数名
#define LOGI(format, ...) fprintf(stderr,"[INFO] [%s:%d]:%s() " format "\n", __FILE__,__LINE__,__func__,##__VA_ARGS__)
#define LOGE(format, ...) fprintf(stderr,"[ERROR] [%s:%d]:%s() " format "\n", __FILE__,__LINE__,__func__,##__VA_ARGS__)

int main(int argc, char** argv)
{
const char * serverIp = "127.0.0.1";
const int serverPort = 8080;
//const int clientPort = 8011;// 可以指定客户端端口,也可以不指定
LOGI("TcpClient start,serverIp=%s,serverPort=%d", serverIp,serverPort);

WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
{
LOGE("WSAStartup error");
return -1;
}

int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == -1)
{
LOGE("create socket error");
WSACleanup();
return -1;
}
int on = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on));

/*
sockaddr_in client_addr;
client_addr.sin_family = AF_INET;
client_addr.sin_addr.S_un.S_addr = INADDR_ANY;
//client_addr.sin_port = htons(clientPort);
if (bind(fd, (LPSOCKADDR)&client_addr, sizeof(SOCKADDR)) == -1)
{
LOGE("socket bind error");
WSACleanup();
return -1;
}
*/

//设置 server_addr
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(serverPort);
//server_addr.sin_addr.s_addr = inet_addr(serverIp);
inet_pton(AF_INET, serverIp, &server_addr.sin_addr);

if (connect(fd, (struct sockaddr*)&server_addr, sizeof(sockaddr_in)) == -1)
{
LOGE("socket connect error");
return -1;
}
LOGI("fd=%d connect success",fd);

char buf[10000];
int size;
uint64_t totalSize = 0;
time_t t1 = time(NULL);

while (true)
{
size = recv(fd, buf, sizeof(buf), 0);
if (size <= 0)
break;

totalSize += size;

if (totalSize > 6291456)/* 62914560=60*1024*1024=60mb*/
{
time_t t2 = time(NULL);
if (t2 - t1 > 0)
{
uint64_t speed = totalSize / 1024 / 1024 / (t2 - t1);
printf("fd=%d,size=%d,totalSize=%llu,speed=%lluMbps\n", fd,size, totalSize, speed);
totalSize = 0;
t1 = time(NULL);
}
}
}

LOGE("fd=%d disconnect", fd);

closesocket(fd);
WSACleanup();
return 0;
}

阻塞型一对一Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#include <stdint.h>
#include <WinSock2.h>
#include <WS2tcpip.h>
#include <iostream>
#pragma comment(lib, "ws2_32.lib")

// __FILE__ 获取源文件的相对路径和名字
// __LINE__ 获取该行代码在文件中的行号
// __func__ 或 __FUNCTION__ 获取函数名
#define LOGI(format, ...) fprintf(stderr,"[INFO] [%s:%d]:%s() " format "\n", __FILE__,__LINE__,__func__,##__VA_ARGS__)
#define LOGE(format, ...) fprintf(stderr,"[ERROR] [%s:%d]:%s() " format "\n", __FILE__,__LINE__,__func__,##__VA_ARGS__)


int main()
{
const char* ip = "127.0.0.1";
uint16_t port = 8080;
LOGI("TcpServer1 tcp://%s:%d", ip, port);

SOCKET server_fd = -1;
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
{
LOGI("WSAStartup error");
return -1;
}
SOCKADDR_IN server_addr;

server_addr.sin_family = AF_INET;
server_addr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
//server_addr.sin_addr.s_addr = inet_addr("192.168.2.61");
server_addr.sin_port = htons(port);

server_fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (bind(server_fd, (SOCKADDR*)&server_addr, sizeof(SOCKADDR)) == SOCKET_ERROR)
{
LOGI("socket bind error");
return -1;
}

if (listen(server_fd, SOMAXCONN) < 0)
{
LOGI("socket listen error");
return -1;
}

while (true)
{
LOGI("阻塞监听新连接...");
// 阻塞接收请求 start
int len = sizeof(SOCKADDR);
SOCKADDR_IN accept_addr;
int clientFd = accept(server_fd, (SOCKADDR*)&accept_addr, &len);
//const char* clientIp = inet_ntoa(accept_addr.sin_addr);

if (clientFd == SOCKET_ERROR)
{
LOGI("accept connection error");
break;
}
// 阻塞接收请求 end
LOGI("发现新连接:clientFd=%d", clientFd);

int size;
uint64_t totalSize = 0;
time_t t1 = time(NULL);

while (true)
{
char buf[1024];
memset(buf, 1, sizeof(buf));
size = ::send(clientFd, buf, sizeof(buf), 0);
if (size < 0)
{
printf("clientFd=%d,send error,错误码:%d\n", clientFd, WSAGetLastError());
break;
}
totalSize += size;

if (totalSize > 62914560)/* 62914560=60*1024*1024=60mb*/
{
time_t t2 = time(NULL);
if (t2 - t1 > 0)
{
uint64_t speed = totalSize / 1024 / 1024 / (t2 - t1);
printf("clientFd=%d,size=%d,totalSize=%llu,speed=%lluMbps\n", clientFd, size, totalSize, speed);

totalSize = 0;
t1 = time(NULL);
}
}
}

LOGI("关闭连接 clientFd=%d", clientFd);
}

return 0;
}

阻塞型一对多Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
#include <stdint.h>
#include <WinSock2.h>
#include <WS2tcpip.h>
#include <iostream>
#include <map>
#include <mutex>
#include <thread>
#pragma comment(lib, "ws2_32.lib")

// __FILE__ 获取源文件的相对路径和名字
// __LINE__ 获取该行代码在文件中的行号
// __func__ 或 __FUNCTION__ 获取函数名
#define LOGI(format, ...) fprintf(stderr,"[INFO] [%s:%d]:%s() " format "\n", __FILE__,__LINE__,__func__,##__VA_ARGS__)
#define LOGE(format, ...) fprintf(stderr,"[ERROR] [%s:%d]:%s() " format "\n", __FILE__,__LINE__,__func__,##__VA_ARGS__)

/*
无论是linux还是windows,一个进程可用的内存空间都有上限,比如windows内存空间上限有2G,
而默认情况下,一个线程的栈要预留1M的内存空间,所以理论一个进程中最多可以开2048个线程
但实际上远远达不到。

虽然能实现一对n的服务,但很显然不适合高并发场景。
*/

class Server;
class Connection
{
public:
Connection(Server* server, int clientFd)
:m_server(server), m_clientFd(clientFd)
{
LOGI("");
}

~Connection()
{
LOGI("");
closesocket(m_clientFd);
if (th)
{
th->join();
delete th;
th = nullptr;
}
}

public:
typedef void (*DisconnectionCallback)(void*, int); //(server, sockFd)
void setDisconnectionCallback(DisconnectionCallback cb, void* arg)
{
m_disconnectionCallback = cb;
m_arg = arg;
}

int start()
{
th = new std::thread([](Connection* conn)
{
int size;
uint64_t totalSize = 0;
time_t t1 = time(NULL);

while (true)
{
char buf[1024];
memset(buf, 1, sizeof(buf));
size = ::send(conn->m_clientFd, buf, sizeof(buf), 0);
if (size < 0)
{
printf("clientFd=%d,send error,错误码:%d\n", conn->m_clientFd, WSAGetLastError());
conn->m_disconnectionCallback(conn->m_arg, conn->m_clientFd);
break;
}
totalSize += size;

if (totalSize > 62914560)/* 62914560=60*1024*1024=60mb*/
{
time_t t2 = time(NULL);
if (t2 - t1 > 0)
{
uint64_t speed = totalSize / 1024 / 1024 / (t2 - t1);

printf("clientFd=%d,size=%d,totalSize=%llu,speed=%lluMbps\n", conn->m_clientFd, size, totalSize, speed);

totalSize = 0;
t1 = time(NULL);
}
}
}

}, this);
return 0;
}
int getClientFd()
{
return m_clientFd;
}

private:
Server* m_server;
int m_clientFd;
std::thread* th = nullptr;;

DisconnectionCallback m_disconnectionCallback = nullptr;
void* m_arg = nullptr;// server *
};

class Server
{
public:
Server(const char *ip,uint16_t port);
~Server();

public:
int start()
{
LOGI("TcpServer2 tcp://%s:%d", m_ip, m_port);
//创建描述符
m_sockFd = socket(AF_INET, SOCK_STREAM, 0);
if (m_sockFd < 0)
{
LOGI("create socket error");
return -1;
}
int on = 1;
setsockopt(m_sockFd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on));

// bind
SOCKADDR_IN server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
//server_addr.sin_addr.s_addr = inet_addr("192.168.2.61");
server_addr.sin_port = htons(m_port);

if (bind(m_sockFd, (SOCKADDR*)&server_addr, sizeof(SOCKADDR)) == SOCKET_ERROR)
{
LOGI("socket bind error");
return -1;
}
// listen
if (listen(m_sockFd, 10) < 0)
{
LOGE("socket listen error");
return -1;
}

while (true)
{
LOGI("阻塞监听新连接...");
// 阻塞接收请求 start
int clientFd;
char clientIp[40] = { 0 };
uint16_t clientPort;

socklen_t len = 0;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
len = sizeof(addr);

clientFd = accept(m_sockFd, (struct sockaddr*)&addr, &len);
if (clientFd < 0)
{
LOGE("socket accept error");
return -1;
}
strcpy(clientIp, inet_ntoa(addr.sin_addr));
clientPort = ntohs(addr.sin_port);
// 阻塞接收请求 end
LOGI("发现新连接:clientIp=%s,clientPort=%d,clientFd=%d", clientIp, clientPort, clientFd);

Connection* conn = new Connection(this, clientFd);
conn->setDisconnectionCallback(Server::cbDisconnection, this);
this->addConnection(conn);
conn->start();//非阻塞在子线程中启动

}
return 0;
}

void handleDisconnection(int clientFd)
{
LOGI("clientFd=%d", clientFd);
this->removeConnection(clientFd);
}

static void cbDisconnection(void* arg, int clientFd)
{
LOGI("clientFd=%d", clientFd);
Server* server = (Server*)arg;
server->handleDisconnection(clientFd);
}


private:
const char* m_ip;
uint16_t m_port;
int m_sockFd;

std::map<int, Connection*> m_connMap;// <sockFd,conn> 维护所有被创建的连接
std::mutex m_connMap_mtx;
bool addConnection(Connection* conn)
{
m_connMap_mtx.lock();
if (m_connMap.find(conn->getClientFd()) != m_connMap.end())
{
m_connMap_mtx.unlock();
return false;
}
else
{
m_connMap.insert(std::make_pair(conn->getClientFd(), conn));
m_connMap_mtx.unlock();
return true;
}

}

Connection* getConnection(int clientFd)
{
m_connMap_mtx.lock();
std::map<int, Connection*>::iterator it = m_connMap.find(clientFd);
if ( it != m_connMap.end())
{
m_connMap_mtx.unlock();
return it->second;
}
else
{
m_connMap_mtx.unlock();
return nullptr;
}
}

bool removeConnection(int clientFd)
{
m_connMap_mtx.lock();
std::map<int, Connection*>::iterator it = m_connMap.find(clientFd);
if (it != m_connMap.end())
{
m_connMap.erase(it);
m_connMap_mtx.unlock();
return true;
}
else
{
m_connMap_mtx.unlock();
return false;
}
}

};

Server::Server(const char* ip, uint16_t port):m_ip(ip),m_port(port),m_sockFd(-1)
{
WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
{
LOGE("WSAStartup Error");
return;
}

}
Server::~Server()
{
if (m_sockFd > -1)
{
closesocket(m_sockFd);
m_sockFd = -1;
}
WSACleanup();
}


int main()
{
const char* ip = "127.0.0.1";
uint16_t port = 8080;
Server server(ip,port);
server.start();
return 0;
}

一对多非阻塞型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
#include <iostream>
#include <time.h>
#include <string>
#include <WinSock2.h>
#include <WS2tcpip.h>
#pragma comment(lib, "ws2_32.lib")

static std::string getTime() {
const char* time_fmt = "%Y-%m-%d %H:%M:%S";
time_t t = time(nullptr);
char time_str[64];
strftime(time_str, sizeof(time_str), time_fmt, localtime(&t));

return time_str;
}
// __FILE__ 获取源文件的相对路径和名字
// __LINE__ 获取该行代码在文件中的行号
// __func__ 或 __FUNCTION__ 获取函数名

#define LOGI(format, ...) fprintf(stderr,"[INFO]%s [%s:%d %s()] " format "\n", getTime().data(),__FILE__,__LINE__,__func__ ,##__VA_ARGS__)
#define LOGE(format, ...) fprintf(stderr,"[ERROR]%s [%s:%d %s()] " format "\n",getTime().data(),__FILE__,__LINE__,__func__ ,##__VA_ARGS__)

int main()
{
const char* ip = "127.0.0.1";
uint16_t port = 8080;

LOGI("TcpServer_select tcp://%s:%d", ip, port);

WSADATA wsaData;
if (WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
{
LOGE("WSAStartup Error");
return -1;
}
//创建描述符
int serverFd = socket(AF_INET, SOCK_STREAM, 0);
if (serverFd < 0) {
LOGI("create socket error");
return -1;
}
int ret;
//unsigned long ul = 1;
//ret = ioctlsocket(serverFd, FIONBIO, (unsigned long*)&ul);
//if (ret == SOCKET_ERROR) {
// LOGE("设置非阻塞失败");
// return -1;
//}

int on = 1;
setsockopt(serverFd, SOL_SOCKET, SO_REUSEADDR, (const char*)&on, sizeof(on));

// bind
SOCKADDR_IN server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
//server_addr.sin_addr.s_addr = inet_addr("192.168.2.61");
server_addr.sin_port = htons(port);

if (bind(serverFd, (SOCKADDR*)&server_addr, sizeof(SOCKADDR)) == SOCKET_ERROR) {
LOGI("socket bind error");
return -1;
}
// listen
if (listen(serverFd, 10) < 0)
{
LOGE("socket listen error");
return -1;
}

char recvBuf[1000] = { 0 };
int recvBufLen = 1000;
int recvLen = 0;

int max_fd = 0;
fd_set readfds;
FD_ZERO(&readfds);

//将sockFd添加进入集合内,并跟新最大文件描述符
FD_SET(serverFd, &readfds);
max_fd = max_fd > serverFd ? max_fd : serverFd;

struct timeval timeout;
timeout.tv_sec = 0;// 秒
timeout.tv_usec = 0;//微秒

char sendBuf[10000];
int sendBufLen = 10000;
memset(sendBuf, 0, sendBufLen);

while (true)
{
//printf("loop...\n");
fd_set readfds_temp;
FD_ZERO(&readfds_temp);

readfds_temp = readfds;
ret = select(max_fd + 1, &readfds_temp, nullptr, nullptr, &timeout);
if (ret < 0)
{
//LOGE("未检测到活跃fd");
}
else
{
// 每个进程默认打开3个文件描述符
// 0,1,2 其中0代表标准输入流,1代表标准输出流,2代表标准错误流

for (int fd = 3; fd < max_fd + 1; fd++)
{
if (FD_ISSET(fd, &readfds_temp))
{
LOGI("fd=%d,触发可读事件", fd);

if (fd == serverFd) //如果套接字就绪了则等待客户端连接
{
int clientFd;
if ((clientFd = accept(serverFd, NULL, NULL)) == -1)
{
LOGE("accept error");
}
LOGI("发现新连接:clientFd=%d", clientFd);

//如果有客户端连接将产生的新的文件描述符添加到集合中,并更新最大文件描述符
FD_SET(clientFd, &readfds);
max_fd = max_fd > clientFd ? max_fd : clientFd;
}
else //客户端发来消息了
{
//memset(recvBuf, 0, recvBufLen);
recvLen = recv(fd, recvBuf, recvBufLen, 0);

if (recvLen <= 0)
{
LOGE("fd=%d,recvLen=%d error", fd,recvLen);
closesocket(fd);
FD_CLR(fd, &readfds); //从可读集合中删除
continue;
}
else
{
LOGI("fd=%d,recvLen=%d success", fd, recvLen);
}
}
}
}
}

for (int i = 0; i < readfds.fd_count; i++)
{
int fd = readfds.fd_array[i];
if (fd != serverFd)
{
// 客户端fd
int size = send(fd, sendBuf, sendBufLen, 0);
if (size < 0)
{
LOGE("fd=%d,send error,错误码:%d", fd, WSAGetLastError());
continue;
}

}
}
}

if (serverFd > -1)
{
closesocket(serverFd);
serverFd = -1;
}
WSACleanup();
return 0;
}

select在检测到可读事件后, 赋值于 readfds_temp, 这也是创建临时变量的原因(将赋值容器和存储容器分离开)
select函数要监听小于第一个参数参数的值, 因此要取出最大的, +1

1.3 Poll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <poll.h>

int main()
{
// 1.创建套接字
int serverFd = socket(AF_INET, SOCK_STREAM, 0);
if (serverFd == -1)
{
perror("socket");
exit(0);
}
// 2. 绑定 ip, port
struct sockaddr_in addr;
addr.sin_port = htons(8080);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(serverFd, (struct sockaddr*)&addr, sizeof(addr));
if (ret == -1)
{
perror("bind");
exit(0);
}
// 3. 监听
ret = listen(serverFd, 100);
if (ret == -1)
{
perror("listen");
exit(0);
}

// 4. 等待连接 -> 循环
// 检测 -> 读缓冲区, 委托内核去处理
// 数据初始化, 创建自定义的文件描述符集
struct pollfd fds[1024];
// 初始化
for (int i = 0; i < 1024; ++i)
{
fds[i].fd = -1;
fds[i].events = POLLIN;
}
fds[0].fd = serverFd;

char sendBuf[10000];
int sendBufLen = 10000;
memset(sendBuf, 0, sendBufLen);

int maxfd = 0;
while (true)
{
printf("maxfd=%d\n", maxfd);

//printf("step1\n");
// 委托内核检测
ret = poll(fds, maxfd + 1, 1); //fds传递到内核, 每一次都有一个复制的过程, 而epoll没有

//printf("step2\n");
if (ret == -1)
{
perror("select\n");
break;
}

// 检测的度缓冲区有变化
// 有新连接
if (fds[0].revents & POLLIN)
{
struct sockaddr_in conn_addr;
socklen_t conn_addr_len = sizeof(addr);
// 这个accept是不会阻塞的
int connfd = accept(serverFd, (struct sockaddr*)&conn_addr, &conn_addr_len);
// 委托内核检测connfd的读缓冲区
int i;
for (i = 0; i < 1024; ++i)
{
if (fds[i].fd == -1)
{
fds[i].fd = connfd;
break;
}
}
maxfd = i > maxfd ? i : maxfd;
}

//printf("step3\n");
// 通信, 有客户端发送数据过来
for (int i = 1; i <= maxfd; ++i)
{
// 如果在集合中, 说明读缓冲区有数据
if (fds[i].revents & POLLIN)
{
char buf[128];
int count = read(fds[i].fd, buf, sizeof(buf));
if (count <= 0)
{
printf("client disconnect ...\n");
close(fds[i].fd);
fds[i].fd = -1;
continue;
}
else
{
printf("client say: %s\n", buf);
//write(fds[i].fd, buf, strlen(buf)+1);
}

}

if (fds[i].fd > -1)
{
printf("fds[%d].fd=%d \n", i, fds[i].fd);
int size = send(fds[i].fd, sendBuf, sendBufLen, 0);
if (size < 0)
{
printf("fds[%d].fd=%d,send error \n", i, fds[i].fd);
close(fds[i].fd);
fds[i].fd = -1;
continue;
}

}
}
}
close(serverFd);
return 0;
}

1.4 Epoll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

int main()
{
// 1.创建套接字
int serverFd = socket(AF_INET, SOCK_STREAM, 0);
if (serverFd == -1)
{
perror("socket");
exit(0);
}
// 2. 绑定 ip, port
struct sockaddr_in addr;
addr.sin_port = htons(8080);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(serverFd, (struct sockaddr*)&addr, sizeof(addr));
if (ret == -1)
{
perror("bind");
exit(0);
}
// 3. 监听
ret = listen(serverFd, 100);
if (ret == -1)
{
perror("listen");
exit(0);
}

// 创建epoll树
int epfd = epoll_create(1000);//有关size参数介绍:https://blog.csdn.net/zhoumuyu_yu/article/details/112472419
if (epfd == -1)
{
perror("epoll_create");
exit(0);
}

// 将监听lfd添加到树上
struct epoll_event serverFdEvt;
// 检测事件的初始化
serverFdEvt.events = EPOLLIN;
serverFdEvt.data.fd = serverFd;
epoll_ctl(epfd, EPOLL_CTL_ADD, serverFd, &serverFdEvt);

struct epoll_event events[1024];

int s1 = sizeof(events);
int s2 = sizeof(events[0]);

char sendBuf[10000];
int sendBufLen = 10000;
memset(sendBuf, 0, sendBufLen);

// 开始检测
while (true)
{
int nums = epoll_wait(epfd, events, s1 / s2, 1);//milliseconds
printf("serverFd=%d,nums = %d\n", serverFd, nums);

// 遍历状态变化的文件描述符集合
for (int i = 0; i < nums; ++i)
{
int curfd = events[i].data.fd;
printf("curfd=%d \n", curfd);
// 有新连接
if (curfd == serverFd)
{
struct sockaddr_in conn_addr;
socklen_t conn_addr_len = sizeof(addr);
int connfd = accept(serverFd, (struct sockaddr*)&conn_addr, &conn_addr_len);

printf("connfd=%d\n", connfd);
if (connfd == -1)
{
perror("accept\n");
//exit(0);
break;
}
// 将通信的fd挂到树上
serverFdEvt.events = EPOLLIN | EPOLLOUT;
//serverFdEvt.events = EPOLLIN;
serverFdEvt.data.fd = connfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &serverFdEvt);
}
// 通信
else
{
// 读事件触发, 写事件触发
/*
if(events[i].events & EPOLLOUT)
{
continue;
}
*/
if (events[i].events & EPOLLIN)
{
char buf[128];
int count = read(curfd, buf, sizeof(buf));
if (count <= 0)
{
printf("client disconnect ...\n");
close(curfd);
// 从树上删除该节点
epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);
continue;
}
else
{
// 正常情况
printf("client say: %s\n", buf);
}
}
if (curfd > -1)
{

int size = send(curfd, sendBuf, sendBufLen, 0);
if (size < 0) {
printf("curfd=%d,send error \n", curfd);
close(curfd);
// 从树上删除该节点
epoll_ctl(epfd, EPOLL_CTL_DEL, curfd, NULL);
continue;
}
}
}
}
}
close(serverFd);
return 0;
}

关于Epoll的相关函数
在epoll中一共提供是三个API函数,分别处理不同的操作,函数原型如下:

1
2
3
4
5
6
7
#include <sys/epoll.h>
// 创建epoll实例,通过一棵红黑树管理待检测集合
int epoll_create(int size);
// 管理红黑树上的文件描述符(添加、修改、删除)
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
// 检测epoll树中是否有就绪的文件描述符
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

select/poll低效的原因之一是将“添加/维护待检测任务”和“阻塞进程/线程”两个步骤合二为一。每次调用select都需要这两步操作,然而大多数应用场景中,需要监视的socket个数相对固定,并不需要每次都修改。epoll将这两个操作分开,先用epoll_ctl()维护等待队列,再调用epoll_wait()阻塞进程(解耦)。通过下图的对比显而易见,epoll的效率得到了提升。
在这里插入图片描述

epoll_create()函数的作用是创建一个红黑树模型的实例,用于管理待检测的文件描述符的集合。

1
int epoll_create(int size);
  • 函数参数 size:在Linux内核2.6.8版本以后,这个参数是被忽略的,只需要指定一个大于0的数值就可以了。
  • 函数返回值:
    • 失败:返回-1
    • 成功:返回一个有效的文件描述符,通过这个文件描述符就可以访问创建的epoll实例了

epoll_ctl()函数的作用是管理红黑树实例上的节点,可以进行添加、删除、修改操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 联合体, 多个变量共用同一块内存        
typedef union epoll_data {
void *ptr;
int fd; // 通常情况下使用这个成员, 和epoll_ctl的第三个参数相同即可
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • 函数参数:
    • epfd:epoll_create() 函数的返回值,通过这个参数找到epoll实例
    • op:这是一个枚举值,控制通过该函数执行什么操作
      • EPOLL_CTL_ADD:往epoll模型中添加新的节点
      • EPOLL_CTL_MOD:修改epoll模型中已经存在的节点
      • EPOLL_CTL_DEL:删除epoll模型中的指定的节点
    • fd:文件描述符,即要添加/修改/删除的文件描述符
    • event:epoll事件,用来修饰第三个参数对应的文件描述符的,指定检测这个文件描述符的什么事件
      • events:委托epoll检测的事件
        
        • EPOLLIN:读事件, 接收数据, 检测读缓冲区,如果有数据该文件描述符就绪
        • EPOLLOUT:写事件, 发送数据, 检测写缓冲区,如果可写该文件描述符就绪
        • EPOLLERR:异常事件
      • data:用户数据变量,这是一个联合体类型,通常情况下使用里边的`fd`成员,用于存储待检测的文件描述符的值,在调用`epoll_wait()`函数的时候这个值会被传出。
        
  • 函数返回值:
    • 失败:返回-1
    • 成功:返回0
      epoll_wait()函数的作用是检测创建的epoll实例中有没有就绪的文件描述符。
1
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
  • 函数参数:
    • epfd:epoll_create() 函数的返回值, 通过这个参数找到epoll实例
    • events:传出参数, 这是一个结构体数组的地址, 里边存储了已就绪的文件描述符的信息
    • maxevents:修饰第二个参数, 结构体数组的容量(元素个数)
    • timeout:如果检测的epoll实例中没有已就绪的文件描述符,该函数阻塞的时长, 单位ms 毫秒
      • 0:函数不阻塞,不管epoll实例中有没有就绪的文件描述符,函数被调用后都直接返回
      • 大于0:如果epoll实例中没有已就绪的文件描述符,函数阻塞对应的毫秒数再返回
      • -1:函数一直阻塞,直到epoll实例中有已就绪的文件描述符之后才解除阻塞
  • 函数返回值:
    • 成功:
      • 等于0:函数是阻塞被强制解除了, 没有检测到满足条件的文件描述符
      • 大于0:检测到的已就绪的文件描述符的总个数
    • 失败:返回-1

2. 定时器

简单示例
定时器每5秒到期一次,read 操作将读取定时器的到期次数,并输出到标准输出。
如果在非阻塞模式下没有数据可读,程序会继续循环等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <iostream>
#include <sys/timerfd.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <time.h>

int main() {
// 创建一个定时器文件描述符
int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
if (tfd == -1) {
std::cerr << "timerfd_create failed: " << strerror(errno) << std::endl;
return 1;
}

// 设置定时器时间间隔
struct itimerspec new_value;
new_value.it_value.tv_sec = 5; // 初始到期时间为5秒
new_value.it_value.tv_nsec = 0;
new_value.it_interval.tv_sec = 5; // 重复间隔时间为5秒
new_value.it_interval.tv_nsec = 0;

if (timerfd_settime(tfd, 0, &new_value, NULL) == -1) {
std::cerr << "timerfd_settime failed: " << strerror(errno) << std::endl;
close(tfd);
return 1;
}

// 等待定时器事件
uint64_t expirations;
ssize_t s;
while (true) {
s = read(tfd, &expirations, sizeof(expirations));
if (s != sizeof(expirations)) {
if (errno == EAGAIN) {
// 非阻塞模式下,没有数据可读
continue;
} else {
std::cerr << "read failed: " << strerror(errno) << std::endl;
close(tfd);
return 1;
}
}
std::cout << "Timer expired " << expirations << " times" << std::endl;
}

close(tfd);
return 0;
}
1
2
3
4
timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); 

//函数原型
int timerfd_create(int clockid, int flags);

clockid:
CLOCK_MONOTONIC: 使用单调时钟(monotonic clock)该时钟从某个不明确的点开始计时,只会单调递增,不会受到系统时间调整的影响,这适合用于测量时间间隔

flags:
TFD_NONBLOCK: 以非阻塞模式创建文件描述符, 读操作将立即返回, 而不是阻塞等待
TFD_CLOEXEC: 在执行 execve 时关闭文件描述符(close-on-exec 标志)这是为了安全性,确保子进程不继承这个文件描述符。

1
2
3
4
timerfd_settime(tfd, 0, &new_value, NULL);

//函数原型
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);

fd:
由 timerfd_create 返回的定时器文件描述符。

flags:
可选标志位。常见的值是 0 或 TFD_TIMER_ABSTIME。
0: 相对时间,表示从当前时间开始计时。
TFD_TIMER_ABSTIME: 绝对时间,表示从系统启动时开始计时。

new_value:
指向 itimerspec 结构体的指针,用于设置新的定时器值。
itimerspec 结构体定义了初始到期时间和重复间隔时间。

old_value:
指向 itimerspec 结构体的指针,用于存储先前的定时器值。如果不需要,可以传递 NULL。

如果函数执行成功,则返回值为 0。
如果函数执行失败,则返回值为 -1,并且会设置全局变量 errno,以指示错误的具体原因。

1
2
3
4
struct itimerspec {
struct timespec it_interval; // 重复间隔时间
struct timespec it_value; // 初始到期时间
};

it_interval:
定时器到期后的重复间隔。如果为0,则定时器仅到期一次。
it_value:
定时器的初始到期时间。当设置为0时,定时器被禁用。