父子进程监听套接字实验

实验程序

首先看下面的demo程序,它由《recv唤醒时机》一文中的程序稍作修改而来。本篇博客主要探究一个问题:当父子进程同时监听同一个套接字server_sockfd时,客户端请求到达后,操作系统会把请求分配给哪个进程?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// server.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <errno.h>

enum {
    SERVER_PORT = 8000,       /* TCP port the echo server listens on */
    LISTEN_BACKLOG = 1000,    /* large backlog so the client burst is not refused */
    ECHO_BUF_SIZE = 1 << 20   /* size of the per-process echo buffer */
};

/*
 * Write exactly `len` bytes from `data` to the connected socket `fd`.
 *
 * send() may accept fewer bytes than requested, so keep sending the
 * remainder. MSG_NOSIGNAL turns a write to a disconnected peer into an
 * EPIPE error instead of a process-killing SIGPIPE.
 *
 * Returns 0 on success, -1 on error (errno is set by send()).
 */
static int send_all(int fd, const char *data, size_t len)
{
    while (len > 0) {
        ssize_t n = send(fd, data, len, MSG_NOSIGNAL);
        if (n < 0) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        data += n;
        len -= (size_t)n;
    }
    return 0;
}

/*
 * Accept connections on `listen_fd` forever and echo back everything each
 * client sends, one client at a time.
 *
 * `who` ("parent" or "child") is only used to label the log line, so the
 * experiment can show which process picked up each connection.
 *
 * Only returns when accept() fails with a non-transient error; the return
 * value (1) is meant to be used as the process exit status.
 */
static int echo_accept_loop(int listen_fd, const char *who)
{
    /* static: keeps the 1 MiB buffer off the stack; after fork() each
     * process has its own copy. */
    static char buf[ECHO_BUF_SIZE];
    int cnt = 0;

    for (;;) {
        struct sockaddr_in remote_addr;
        /* addrlen is a value-result argument: reset it before every call. */
        socklen_t addr_len = sizeof(remote_addr);
        int client_fd = accept(listen_fd, (struct sockaddr *)&remote_addr, &addr_len);

        if (client_fd < 0) {
            /* Interrupted call or a connection aborted before accept: retry. */
            if (errno == EINTR || errno == ECONNABORTED)
                continue;
            perror("accept");
            return 1;
        }

        cnt++;
        printf("%s accept client %s %d times\n", who, inet_ntoa(remote_addr.sin_addr), cnt);

        ssize_t recv_cnt;
        while ((recv_cnt = recv(client_fd, buf, sizeof(buf), 0)) > 0) {
            if (send_all(client_fd, buf, (size_t)recv_cnt) < 0) {
                perror("write failed");
                break;
            }
        }
        if (recv_cnt < 0)
            perror("recv");
        close(client_fd);
    }
}

/*
 * Experiment driver: create one listening TCP socket on port 8000, fork(),
 * and let parent and child both call accept() on the same inherited socket.
 * Each process logs how many connections it has accepted.
 *
 * Returns 1 on any setup failure or when the accept loop ends with an error.
 */
int main(int argc, const char *argv[])
{
    (void)argc;
    (void)argv;

    struct sockaddr_in my_add; /* local address the server binds to */
    memset(&my_add, 0, sizeof(my_add));
    my_add.sin_family = AF_INET;
    my_add.sin_addr.s_addr = htonl(INADDR_ANY); /* accept on every local address */
    my_add.sin_port = htons(SERVER_PORT);

    /* IPv4, connection-oriented (TCP) socket. */
    int server_sockfd = socket(PF_INET, SOCK_STREAM, 0);
    if (server_sockfd < 0) {
        perror("socket");
        return 1;
    }

    if (bind(server_sockfd, (struct sockaddr *)&my_add, sizeof(my_add)) < 0) {
        perror("bind");
        close(server_sockfd);
        return 1;
    }

    if (listen(server_sockfd, LISTEN_BACKLOG) < 0) {
        perror("listen");
        close(server_sockfd);
        return 1;
    }

    /* fork() after listen(): both processes share the same listening socket. */
    int rc;
    pid_t pid = fork();
    if (pid == 0) {
        printf("Child process started, PID: %d\n", (int)getpid());
        rc = echo_accept_loop(server_sockfd, "child");
    } else if (pid > 0) {
        rc = echo_accept_loop(server_sockfd, "parent");
    } else {
        printf("Fork() failed.\n");
        rc = 1;
    }

    close(server_sockfd);
    return rc;
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// client.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <thread>
#include <vector>
#include <iostream>

using namespace std;

int failed_num = 0;
// Count one failed client run in the shared failed_num counter.
// client_func runs on many threads at once, so the increment must be atomic;
// the compiler builtin keeps failed_num a plain int for its other users.
static void record_failure() {
    __atomic_fetch_add(&failed_num, 1, __ATOMIC_RELAXED);
}

// Send exactly len bytes from data; send() may accept fewer than requested.
// Returns false (after printing the error) if the socket fails.
static bool send_fully(int fd, const char *data, int len) {
    int done = 0;
    while (done < len) {
        ssize_t n = send(fd, data + done, len - done, 0);
        if (n < 0) {
            perror("send failed");
            return false;
        }
        done += (int)n;
    }
    return true;
}

// Receive exactly len bytes into data. Returns false on a socket error or if
// the peer closes the connection early (recv() == 0), which would otherwise
// make the caller wait forever for bytes that never arrive.
static bool recv_fully(int fd, char *data, int len) {
    int done = 0;
    while (done < len) {
        ssize_t n = recv(fd, data + done, len - done, 0);
        if (n < 0) {
            perror("recv failed");
            return false;
        }
        if (n == 0) {
            fprintf(stderr, "recv failed: connection closed by peer\n");
            return false;
        }
        done += (int)n;
    }
    return true;
}

// One client run: connect to the echo server at 127.0.0.1:8000, then
// test_times times send a test_size-byte payload and read the full echo back.
//
// Any failure (socket, connect, send, recv, early close) stops the run and is
// counted exactly once in failed_num.
// Returns 0 on success, 1 on failure.
int client_func() {
    struct sockaddr_in remota_addr;                // server address
    memset(&remota_addr, 0, sizeof(remota_addr));
    remota_addr.sin_family = AF_INET;              // IPv4
    remota_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    remota_addr.sin_port = htons(8000);            // server port

    // IPv4, connection-oriented (TCP) socket; socket() returns the new
    // descriptor on success and -1 on failure.
    int client_sockfd = socket(PF_INET, SOCK_STREAM, 0);
    if (client_sockfd < 0) {
        perror("socket");
        record_failure();
        return 1;
    }

    // Connect the socket to the server address.
    if (connect(client_sockfd, (struct sockaddr *)&remota_addr, sizeof(remota_addr)) < 0) {
        perror("connect");
        record_failure();
        close(client_sockfd);
        return 1;
    }

    printf("connect to server\n");

    const int test_size = (1 << 19);
    const int test_times = 10;
    // Heap buffers: keeps the thread stacks small, and the payload is
    // zero-filled instead of being uninitialized memory.
    std::vector<char> send_buf(test_size, 0);
    std::vector<char> recv_buf(test_size);

    int rc = 0;
    for (int i = 0; i < test_times; i++) {
        if (!send_fully(client_sockfd, send_buf.data(), test_size) ||
            !recv_fully(client_sockfd, recv_buf.data(), test_size)) {
            record_failure();
            rc = 1;
            break;
        }
    }

    close(client_sockfd);
    return rc;
}

int main(int argc, const char *argv[]) {
vector<thread> threads;
int thread_num = 1000;
for (int i = 0; i < thread_num; i++) {
threads.emplace_back(client_func);
}
for (int i = 0; i < thread_num; i++) {
threads[i].join();
}
printf("client test complete, failed num:%d\n", failed_num);
return 0;
}

执行server程序,查看当前监听8000端口的进程

1
2
3
4
5
6
7
8
root@ubuntu ~/c/fork_tcp_test [SIGINT]# ./server
Child process started, PID: 4239


root@ubuntu ~/c/fork_tcp_test# lsof -i:8000
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
server 4238 root 3u IPv4 71161 0t0 TCP *:8000 (LISTEN)
server 4239 root 3u IPv4 71161 0t0 TCP *:8000 (LISTEN)

证明父子进程是可以同时监听一个套接字的

运行client程序,向server发送数据报文

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# client输出
connect to server
connect to server
connect to server
connect to server
connect to server
connect to server
client test complete, failed num:0

# server输出
parent accept client 127.0.0.1 466 times
child accept client 127.0.0.1 522 times
child accept client 127.0.0.1 523 times
child accept client 127.0.0.1 524 times
child accept client 127.0.0.1 525 times
child accept client 127.0.0.1 526 times
child accept client 127.0.0.1 527 times
parent accept client 127.0.0.1 467 times
child accept client 127.0.0.1 528 times
child accept client 127.0.0.1 529 times
child accept client 127.0.0.1 530 times
child accept client 127.0.0.1 531 times
parent accept client 127.0.0.1 468 times
parent accept client 127.0.0.1 469 times

父进程接收了469次客户端请求,子进程接收了531次客户端请求,两者大致相当。由此初步猜测:操作系统是以轮询或随机的方式分配请求,使两个进程大致均等地处理到达的客户端请求。(后文的源码分析表明这个猜测并不准确,内核其实并没有"分配"这一步,详见"答案"一节。)

探究经历

刚开始我的思路是先从accept系统调用入手,找到请求队列,再找到对这个队列入队操作的相关代码,找到请求分配机制。首先阅读《Linux内核源代码情景分析》的函数sys_accept()——接受连接请求一节,大致了解accept系统调用流程,然后阅读内核源码,找到可能的请求队列。

socket.c - net/socket.c - Linux source code v5.10.223 - Bootlin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*
 * accept(2) system call entry point: forwards the descriptor and the two
 * user-space address pointers unchanged to __sys_accept4(), with flags
 * fixed to 0.
 */
SYSCALL_DEFINE3(accept, int, fd, struct sockaddr __user *, upeer_sockaddr,
int __user *, upeer_addrlen)
{
return __sys_accept4(fd, upeer_sockaddr, upeer_addrlen, 0);
}
__sys_accept4
__sys_accept4_file
do_accept
sock->ops->accept
inet_accept // 通过搜索proto_ops成员找到
READ_ONCE(sk1->sk_prot)->accept
inet_csk_accept // 通过相关博客找到
// 猜测icsk_accept_queue就是要找的请求队列
struct inet_connection_sock *icsk = inet_csk(sk);
struct request_sock_queue *queue = &icsk->icsk_accept_queue;

观察函数调用时调用参数的转变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// fd 监听套接字对应的文件描述符  upeer_sockaddr 客户端地址
int __sys_accept4(int fd, struct sockaddr __user *upeer_sockaddr,
int __user *upeer_addrlen, int flags)

// file fd对应的file结构体 upeer_sockaddr 客户端地址
int __sys_accept4_file(struct file *file, unsigned file_flags,
struct sockaddr __user *upeer_sockaddr,
int __user *upeer_addrlen, int flags,
unsigned long nofile)

// file fd对应的file结构体 upeer_sockaddr 客户端地址
struct file *do_accept(struct file *file, unsigned file_flags,
struct sockaddr __user *upeer_sockaddr,
int __user *upeer_addrlen, int flags)

// sock: file对应的监听套接字,通过sock_from_file得来
// newsock: 创建的套接字,之后可通过它获取upeer_sockaddr
int inet_accept(struct socket *sock, struct socket *newsock, int flags,
bool kern)

// sk: sock的sk成员 newsock可通过返回值struct sock *得来
struct sock *inet_csk_accept(struct sock *sk, int flags, int *err, bool kern)

struct inet_connection_sock *icsk = inet_csk(sk);
struct request_sock_queue *queue = &icsk->icsk_accept_queue;

// 因此icsk_accept_queue的成员链路为:
struct socket - general BSD socket
struct sock - network layer representation of sockets
struct inet_connection_sock - INET connection oriented sock
struct request_sock_queue - queue of request_socks
struct request_sock - mini sock to represent a connection request
fd---->file---->sock---->sk---->icsk---->icsk_accept_queue

查找icsk_accept_queue相关的入队函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/*
 * Append request `req` (carrying the new socket `child`) to the tail of the
 * accept queue that belongs to listening socket `sk`.
 *
 * Returns `child` when it was queued, or NULL when `sk` is no longer in
 * TCP_LISTEN state (the child is then handed to inet_child_forget()).
 */
struct sock *inet_csk_reqsk_queue_add(struct sock *sk,
struct request_sock *req,
struct sock *child)
{
/* The queue is a member of the listening sock itself. */
struct request_sock_queue *queue = &inet_csk(sk)->icsk_accept_queue;

/* All list manipulation below happens under the queue's spinlock. */
spin_lock(&queue->rskq_lock);
if (unlikely(sk->sk_state != TCP_LISTEN)) {
inet_child_forget(sk, req, child);
child = NULL;
} else {
req->sk = child;
req->dl_next = NULL;
/* Singly linked list: an empty queue gets a new head, otherwise link after the tail. */
if (queue->rskq_accept_head == NULL)
WRITE_ONCE(queue->rskq_accept_head, req);
else
queue->rskq_accept_tail->dl_next = req;
queue->rskq_accept_tail = req;
sk_acceptq_added(sk);
}
spin_unlock(&queue->rskq_lock);
return child;
}

反推猜测它的调用栈,当一个客户端请求到达时,应该插入哪个fd的icsk_accept_queue

1
2
3
4
5
6
7
8
9
10
11
12
struct sock *inet_csk_complete_hashdance(struct sock *sk, struct sock *child,
struct request_sock *req, bool own_req)
inet_csk_reqsk_queue_add(sk, req, child)


struct sock *tcp_check_req(struct sock *sk, struct sk_buff *skb,
struct request_sock *req,
bool fastopen, bool *req_stolen)
return inet_csk_complete_hashdance(sk, child, req, own_req);

int tcp_v4_rcv(struct sk_buff *skb)
nsk = tcp_check_req(sk, skb, req, false, &req_stolen);

研究tcp_v4_rcv中出现的sk变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
th = (const struct tcphdr *)skb->data;
sk = __inet_lookup_skb(&tcp_hashinfo, skb, __tcp_hdrlen(th), th->source,
th->dest, sdif, &refcounted);
// __inet_lookup_skb调用栈
static inline struct sock *__inet_lookup_skb(struct inet_hashinfo *hashinfo,
struct sk_buff *skb,
int doff,
const __be16 sport,
const __be16 dport,
const int sdif,
bool *refcounted)
const struct iphdr *iph = ip_hdr(skb);
return __inet_lookup(dev_net(skb_dst(skb)->dev), hashinfo, skb,
doff, iph->saddr, sport,
iph->daddr, dport, inet_iif(skb), sdif,
refcounted);


/*
 * Find the sock for an incoming packet's address/port tuple.
 *
 * First tries the established-connection lookup; if that finds nothing,
 * falls back to the listener lookup. *refcounted is set to true when the
 * result comes from the established lookup and false when it comes from the
 * listener lookup.
 */
static inline struct sock *__inet_lookup(struct net *net,
struct inet_hashinfo *hashinfo,
struct sk_buff *skb, int doff,
const __be32 saddr, const __be16 sport,
const __be32 daddr, const __be16 dport,
const int dif, const int sdif,
bool *refcounted)
{
/* Destination port converted to host byte order for the lookups. */
u16 hnum = ntohs(dport);
struct sock *sk;

sk = __inet_lookup_established(net, hashinfo, saddr, sport,
daddr, hnum, dif, sdif);
*refcounted = true;
if (sk)
return sk;
*refcounted = false;
return __inet_lookup_listener(net, hashinfo, skb, doff, saddr,
sport, daddr, hnum, dif, sdif);
}

/*
 * Find a listening sock for destination (daddr, hnum).
 *
 * Two passes over the listener hash: first the bucket hashed from the exact
 * destination address, then the bucket hashed from INADDR_ANY. An error
 * pointer from the lookup is reported to the caller as NULL.
 */
struct sock *__inet_lookup_listener(struct net *net,
struct inet_hashinfo *hashinfo,
struct sk_buff *skb, int doff,
const __be32 saddr, __be16 sport,
const __be32 daddr, const unsigned short hnum,
const int dif, const int sdif)
{
struct inet_listen_hashbucket *ilb2;
struct sock *result = NULL;
unsigned int hash2;

hash2 = ipv4_portaddr_hash(net, daddr, hnum); // look up sockets listening on the specific local IP
ilb2 = inet_lhash2_bucket(hashinfo, hash2);

result = inet_lhash2_lookup(net, ilb2, skb, doff,
saddr, sport, daddr, hnum,
dif, sdif);
if (result)
goto done;

/* Lookup lhash2 with INADDR_ANY */
hash2 = ipv4_portaddr_hash(net, htonl(INADDR_ANY), hnum); // look up sockets listening on INADDR_ANY
ilb2 = inet_lhash2_bucket(hashinfo, hash2);

result = inet_lhash2_lookup(net, ilb2, skb, doff,
saddr, sport, htonl(INADDR_ANY), hnum,
dif, sdif);
done:
if (IS_ERR(result))
return NULL;
return result;
}

研究四元组(源IP,源端口,目的IP,目的端口)到sock的映射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/*
* Here are some nice properties to exploit here. The BSD API
* does not allow a listening sock to specify the remote port nor the
* remote address for the connection. So always assume those are both
* wildcarded during the search since they can never be otherwise.
*/

/* called with rcu_read_lock() : No refcount taken on the socket */
/*
 * Scan one listener hash bucket and return the best-matching listening sock.
 *
 * Every sock in the bucket is scored with compute_score(). Whenever a sock
 * beats the best score so far, the reuseport selection is tried first and
 * its result, if any, is returned immediately; otherwise the sock becomes
 * the current best. Returns NULL when nothing scores above 0.
 */
static struct sock *inet_lhash2_lookup(struct net *net,
struct inet_listen_hashbucket *ilb2,
struct sk_buff *skb, int doff,
const __be32 saddr, __be16 sport,
const __be32 daddr, const unsigned short hnum,
const int dif, const int sdif)
{
struct inet_connection_sock *icsk;
struct sock *sk, *result = NULL;
int score, hiscore = 0;

inet_lhash2_for_each_icsk_rcu(icsk, &ilb2->head) {
sk = (struct sock *)icsk;
score = compute_score(sk, net, hnum, daddr, dif, sdif);
if (score > hiscore) {
/* A non-NULL reuseport pick short-circuits the rest of the scan. */
result = inet_lookup_reuseport(net, sk, skb, doff,
saddr, sport, daddr, hnum, inet_ehashfn);
if (result)
return result;

result = sk;
hiscore = score;
}
}

return result;
}
// reuseport 相关调用栈
/*
 * If `sk` has sk_reuseport set, hash the packet's address/port tuple with
 * `ehashfn` and let reuseport_select_sock() choose a socket for it.
 * Returns NULL when `sk` does not have sk_reuseport set (or when the
 * selection itself returns NULL).
 */
struct sock *inet_lookup_reuseport(struct net *net, struct sock *sk,
struct sk_buff *skb, int doff,
__be32 saddr, __be16 sport,
__be32 daddr, unsigned short hnum,
inet_ehashfn_t *ehashfn)
{
struct sock *reuse_sk = NULL;
u32 phash;

if (sk->sk_reuseport) {
phash = INDIRECT_CALL_2(ehashfn, udp_ehashfn, inet_ehashfn,
net, daddr, hnum, saddr, sport);
reuse_sk = reuseport_select_sock(sk, phash, skb, doff);
}
return reuse_sk;
}

/**
* reuseport_select_sock - Select a socket from an SO_REUSEPORT group.
* @sk: First socket in the group.
* @hash: When no BPF filter is available, use this hash to select.
* @skb: skb to run through BPF filter.
* @hdr_len: BPF filter expects skb data pointer at payload data. If
* the skb does not yet point at the payload, this parameter represents
* how far the pointer needs to advance to reach the payload.
* Returns a socket that should receive the packet (or NULL on error).
*/
struct sock *reuseport_select_sock(struct sock *sk,
u32 hash,
struct sk_buff *skb,
int hdr_len)
{
struct sock_reuseport *reuse;
struct bpf_prog *prog;
struct sock *sk2 = NULL;
u16 socks;

rcu_read_lock();
reuse = rcu_dereference(sk->sk_reuseport_cb);

/* if memory allocation failed or add call is not yet complete */
if (!reuse)
goto out;

prog = rcu_dereference(reuse->prog);
socks = READ_ONCE(reuse->num_socks);
if (likely(socks)) {
/* paired with smp_wmb() in reuseport_add_sock() */
smp_rmb();

/* Without a BPF program (or without an skb) go straight to hash selection. */
if (!prog || !skb)
goto select_by_hash;

if (prog->type == BPF_PROG_TYPE_SK_REUSEPORT)
sk2 = bpf_run_sk_reuseport(reuse, sk, prog, skb, hash);
else
sk2 = run_bpf_filter(reuse, socks, prog, skb, hdr_len);

select_by_hash:
/* no bpf or invalid bpf result: fall back to hash usage */
if (!sk2) {
int i, j;

/*
 * Start at the slot the hash maps to and probe forward (wrapping
 * around), skipping sockets in TCP_ESTABLISHED state; if the probe
 * comes back to the starting slot, give up and return NULL.
 */
i = j = reciprocal_scale(hash, socks);
while (reuse->socks[i]->sk_state == TCP_ESTABLISHED) {
i++;
if (i >= socks)
i = 0;
if (i == j)
goto out;
}
sk2 = reuse->socks[i];
}
}

out:
rcu_read_unlock();
return sk2;
}

看到reuseport,我以为父子进程是通过这种方式共同监听一个套接字

【译】使用 SO_REUSEPORT 套接字选项提升服务性能

从以上博客中发现并不是,而且博客里恰好介绍了父子进程同时监听同一个套接字的实现方式

答案

1
2
// icsk_accept_queue的成员链路为:
fd---->file---->sock---->sk---->icsk---->icsk_accept_queue

因此操作系统并不需要考虑将客户端请求分配给父进程还是子进程,因为它们的打开描述符对应同一个套接字,也就对应了同一个请求队列,所以哪个进程先获取套接字锁就先获取连接请求。

相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// fork系统调用中打开文件表的拷贝
/*
 * Set up the open-file table of the new task `tsk` during fork: the current
 * task's files_struct is duplicated with dup_fd() and attached to tsk->files.
 *
 * NOTE(review): as quoted here the function looks abridged — `error` is not
 * declared, nothing is returned, and control falls through to dup_fd() even
 * when CLONE_FILES is set. Confirm against the kernel source before relying
 * on the details.
 */
static int copy_files(unsigned long clone_flags, struct task_struct *tsk)
{
struct files_struct *oldf, *newf;
oldf = current->files;
/* With CLONE_FILES the existing table gets one more user. */
if (clone_flags & CLONE_FILES) {
atomic_inc(&oldf->count);
}
newf = dup_fd(oldf, NR_OPEN_MAX, &error);
tsk->files = newf;
}
/*
* Allocate a new files structure and copy contents from the
* passed in files structure.
* errorp will be valid only when the returned files_struct is NULL.
*/
/*
 * NOTE(review): this quotation appears abridged — open_files, old_fds and
 * new_fds are used without the code that initializes them. Confirm against
 * the kernel source.
 */
struct files_struct *dup_fd(struct files_struct *oldf, unsigned int max_fds, int *errorp)
{
struct files_struct *newf;
struct file **old_fds, **new_fds;
unsigned int open_files, i;
struct fdtable *old_fdt, *new_fdt;
newf = kmem_cache_alloc(files_cachep, GFP_KERNEL);
/*
 * Walk the descriptor slots: each non-NULL struct file gets an extra
 * reference via get_file(), and the same pointer is stored in the new table.
 */
for (i = open_files; i != 0; i--) {
struct file *f = *old_fds++;
if (f) {
get_file(f);
}
rcu_assign_pointer(*new_fds++, f); // only the pointer is copied
}
return newf;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// struct file *file与struct socket *

/**
* sock_alloc_file - Bind a &socket to a &file
* @sock: socket
* @flags: file status flags
* @dname: protocol name
*
* Returns the &file bound with @sock, implicitly storing it
* in sock->file. If dname is %NULL, sets to "".
* On failure the return is a ERR pointer (see linux/err.h).
* This function uses GFP_KERNEL internally.
*/

struct file *sock_alloc_file(struct socket *sock, int flags, const char *dname)
{
struct file *file;

/* Default name: the protocol creator's name if the socket has a sock, else "". */
if (!dname)
dname = sock->sk ? sock->sk->sk_prot_creator->name : "";

file = alloc_file_pseudo(SOCK_INODE(sock), sock_mnt, dname,
O_RDWR | (flags & O_NONBLOCK),
&socket_file_ops);
/* On allocation failure the socket is released and the error pointer is returned. */
if (IS_ERR(file)) {
sock_release(sock);
return file;
}

/* Link both directions: socket -> file and file -> socket (via private_data). */
sock->file = file;
file->private_data = sock;
stream_open(SOCK_INODE(sock), file);
return file;
}

/**
* sock_from_file - Return the &socket bounded to @file.
* @file: file
* @err: pointer to an error code return
*
* On failure returns %NULL and assigns -ENOTSOCK to @err.
*/

struct socket *sock_from_file(struct file *file, int *err)
{
/* A file is a socket exactly when its f_op table is socket_file_ops. */
if (file->f_op == &socket_file_ops)
return file->private_data; /* set in sock_map_fd */

*err = -ENOTSOCK;
return NULL;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 加锁解锁操作
/*
 * Take one pending connection off the accept queue of listening socket `sk`.
 * The state check, the optional wait and the dequeue all happen between
 * lock_sock(sk) and release_sock(sk).
 *
 * NOTE(review): this quotation appears abridged — the out_err label and the
 * return statement are not shown. Confirm against the kernel source.
 */
struct sock *inet_csk_accept(struct sock *sk, int flags, int *err, bool kern)
{
struct inet_connection_sock *icsk = inet_csk(sk);
struct request_sock_queue *queue = &icsk->icsk_accept_queue;
struct request_sock *req;
struct sock *newsk;
int error;

lock_sock(sk);

/* We need to make sure that this socket is listening,
* and that it has something pending.
*/
error = -EINVAL;
if (sk->sk_state != TCP_LISTEN)
goto out_err;

/* Find already established connection */
if (reqsk_queue_empty(queue)) {
long timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);

/* If this is a non blocking socket don't sleep */
error = -EAGAIN;
if (!timeo)
goto out_err;

error = inet_csk_wait_for_connect(sk, timeo);
if (error)
goto out_err;
}
/* Dequeue the request; the new connection's sock is carried in req->sk. */
req = reqsk_queue_remove(queue, sk);
newsk = req->sk;
release_sock(sk);
}

相关博客:

网络编程——sockaddr 与 sockaddr_in

linux内核协议栈 accept 系统调用

深入理解Linux网络笔记(一):内核是如何接收网络包的

【译】使用 SO_REUSEPORT 套接字选项提升服务性能