一、Select模型介绍

套接字Select模型的中心思想是利用select函数实现对I/O的管理。利用select函数来判断套接字(一个或多个)上是否存在数据,或者能否向套接字写入数据。它也是同步的,也会阻塞。但和阻塞模型不同的是,Select模型可以同时管理多个Socket。

select函数原型:

1
2
3
4
5
6
7
int select (
int nfds,
fd_set FAR * readfds,
fd_set FAR * writefds,
fd_set FAR * exceptfds,
const struct timeval FAR * timeout
);

第一个参数nfds会被忽略。之所以仍然要提供这个参数,只是为了保持与早期的Berkeley套接字应用程序的兼容。

三个fd_set参数:一个用于检查可读性(readfds),一个用于检查可写性(writefds),另一个用于例外数据( excepfds)。

从根本上说,fdset数据类型代表着一系列特定套接字的集合。
其中,readfds集合包括符合下述任何一个条件的套接字:

  • 有数据可以读入。
  • 连接已经关闭、重设或中止。(可以用来判断客户端或服务端程序是否退出了)
  • 假如已调用了listen,而且一个连接正在建立,那么accept函数调用会成功。

writefds集合包括符合下述任何一个条件的套接字:

  • 有数据可以发出。
  • 如果已完成了对一个非锁定连接调用的处理,连接就会成功。

最后,exceptfds集合包括符合下述任何一个条件的套接字:

  • 假如已完成了对一个非锁定连接调用的处理,连接尝试就会失败。
  • 有带外(out-of-band,OOB)数据可供读取。

例如,假定我们想测试一个套接字是否“可读”,必须将自己的套接字增添到readfds集合,再等待select函数完成。select完成之后,必须判断自己的套接字是否仍为readfds集合的一部分。若答案是肯定的,便表明该套接字“可读”,可立即着手从它上面读取数据。在三个参数中(readfds、writedfss和exceptfds),任何两个都可以是空值(NULL);但是,至少有一个不能为空值!在任何不为空的集合中,必须包含至少一个套接字句柄;否则,select函数便没有任何东西可以等待。

最后一个参数timeout对应的是一个指针,它指向一个timeval结构,用于决定select最多等待I/O操作完成多久的时间。如timeout是一个空指针,那么select调用会无限期地“锁定”或停顿下去,直到至少有一个描述符符合指定的条件后结束。对timeval结构的定义如下:

1
2
3
4
struct timeval {
long tv_sec;
long tv_usec;
} ;

若将超时值设置为(0,0),表明select会立即返回,允许应用程序对select操作进行“轮询”。出于对性能方面的考虑,应避免这样的设置。
select成功完成后,会在fd_set结构中,返回刚好有未完成的I/O操作的所有套接字句柄的总量。
若超过timeval设定的时间,便会返回0。
不管由于什么原因,假如select调用失败,都会返回SOCKET_ERROR。
用select对套接字进行监视之前,在自己的应用程序中,必须将套接字句柄分配给一个集合,设置好一个或全部读、写以及例外fd_set结构。将一个套接字分配给任何一个集合后,再来调用select,便可知道一个套接字上是否正在发生上述的I/O活动。

Winsock提供了下列宏操作,对fd_set进行处理与检查:

1
2
3
4
FD_CLR(s, *set);      从set中删除套接字s。
FD_ISSET(s, *set); 检查s是否set集合的一名成员;返回TRUE或FALSE。
FD_SET(s, *set); 将套接字s加入集合set
FD_ZERO(*set); 将set初始化成空集合。

二、示例

2.1 服务端

服务端代码比之前介绍的“套接字I/O 阻塞模型”中的示例稍微复杂一点。可以管理多个客户端的连接,对每个新的客户端发送“Hello,I’m server”问候信息,在客户端程序退出时自动关闭连接等功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#include <winsock2.h>
#include <iostream>
#include <assert.h>
#include <vector>

using namespace std;

#pragma comment(lib, "Ws2_32.lib")

const u_short kPort = 10001;
const std::string kHelloServer = "hello, I'm server.";

int main()
{
WSADATA wsaData;
WORD wVersionRequested = MAKEWORD(2, 2);
WSAStartup(wVersionRequested, &wsaData);

SOCKET socket_ = INVALID_SOCKET;
std::vector<SOCKET> clients;

do
{
// (1)
socket_ = ::socket(AF_INET, SOCK_STREAM, 0);
if (socket_ == INVALID_SOCKET) {
std::cout << "create socket failed, GLE: " << WSAGetLastError() << std::endl;
break;
}

// (2)
struct sockaddr_in addr = { 0 };
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_port = htons(kPort);
if (bind(socket_, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == SOCKET_ERROR) {
std::cout << "bind failed, GLE: " << WSAGetLastError() << std::endl;
break;
}

// (3)
if (listen(socket_, 5) == SOCKET_ERROR) {
std::cout << "listen failed, GLE: " << WSAGetLastError() << std::endl;
break;
}
std::cout << "listen on port: " << kPort << std::endl;


fd_set fd_read;
while (true) // TODO 未处理何时退出的问题
{
// (4)
FD_ZERO(&fd_read);

FD_SET(socket_, &fd_read);

for (std::vector<SOCKET>::iterator it = clients.begin(); it != clients.end(); ++it)
FD_SET(*it, &fd_read);

// (5)
timeval timeout = { 3, 0 };
int ret = select(0, &fd_read, NULL, NULL, &timeout);
if (ret == SOCKET_ERROR) {
std::cout << "select failed, GLE: " << WSAGetLastError() << std::endl;
break;
}

// (6.1)
// 检查服务端的监听socket,是否有新的连接被搁置
if (FD_ISSET(socket_, &fd_read)) {

// (7.1)
struct sockaddr_in addr_c = { 0 };
int addr_len = sizeof(addr_c);
SOCKET s = accept(socket_, reinterpret_cast<sockaddr*>(&addr_c), &addr_len);
if (s == SOCKET_ERROR) {
std::cout << "accept failed, GLE: " << WSAGetLastError() << std::endl;
}
else {
clients.push_back(s);
std::cout << "new connection" << std::endl;

int left = kHelloServer.length();
int idx = 0;
while (left > 0) {
int err = send(s, (const char*)(kHelloServer.c_str() + idx), left, 0);
if (err == SOCKET_ERROR) {
std::cout << "send failed, GLE: " << WSAGetLastError() << std::endl;
break;
}

left -= err;
idx += err;

std::cout << "bytes sent: " << err << std::endl;
}
}
}

// (6.2)
// 检查与客户端连接的socket,是否有数据可以读入
for (std::vector<SOCKET>::iterator it = clients.begin(); it != clients.end(); ) {
if (FD_ISSET(*it, &fd_read)) {

// (7.2)
char buf[100] = { 0 };
int err = recv(*it, buf, 100, 0);
if (err > 0) {
std::cout << "recv: " << buf << std::endl;

++it;
}
else if (err == 0) {
std::cout << "connection closed." << std::endl;
closesocket(*it);

it = clients.erase(it);
}
else {
std::cout << "recv failed, GLE: " << WSAGetLastError() << std::endl;
closesocket(*it);

it = clients.erase(it);
}
}
else {
++it;
}
}
} // while
} while (false);

// (8)
for (std::vector<SOCKET>::iterator it = clients.begin(); it != clients.end(); ++it) {
closesocket(*it);
}

// (9)
closesocket(socket_);

WSACleanup();
return 0;
}

2.2 客户端

客户端在连接上服务端之后,便可以不间断的接收服务端的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#include <winsock2.h>
#include <iostream>
#include <assert.h>
#include <vector>

using namespace std;

#pragma comment(lib, "Ws2_32.lib")

const std::string kIP = "127.0.0.1";
const u_short kPort = 10001;
const std::string kHelloClient = "hello, I'm client.";

int main()
{
WSADATA wsaData;
WORD wVersionRequested = MAKEWORD(2, 2);
WSAStartup(wVersionRequested, &wsaData);

SOCKET socket_ = INVALID_SOCKET;

do
{
// (1)
socket_ = ::socket(AF_INET, SOCK_STREAM, 0);
if (socket_ == INVALID_SOCKET) {
std::cout << "create socket failed, GLE: " << WSAGetLastError() << std::endl;
break;
}


// (2)
struct sockaddr_in addr = { 0 };
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(kIP.c_str());
addr.sin_port = htons(kPort);
if (connect(socket_, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == SOCKET_ERROR) {
std::cout << "connect failed, GLE: " << WSAGetLastError() << std::endl;
break;
}


fd_set fd_read;
while (true) // TODO 未处理何时退出的问题
{
// (3)
FD_ZERO(&fd_read);
FD_SET(socket_, &fd_read);

// (4)
timeval timeout = { 3, 0 };
int ret = select(0, &fd_read, NULL, NULL, &timeout);
if (ret == SOCKET_ERROR) {
std::cout << "select failed, GLE: " << WSAGetLastError() << std::endl;
break;
}

// (5)
if (FD_ISSET(socket_, &fd_read)) {
char buf[100] = { 0 };
int err = recv(socket_, buf, 100, 0);
if (err > 0) {
std::cout << "recv: " << buf << std::endl;
}
else if (err == 0) {
std::cout << "connection closed." << std::endl;
break;
}
else {
std::cout << "recv failed, GLE: " << WSAGetLastError() << std::endl;
break;
}
}
} // while
} while (false);


// (6)
closesocket(socket_);

WSACleanup();
return 0;
}