网络数据的接收





数据的接收

数据的接收

过程概述

说明: 本文使用6.5内核, 涉及网卡时使用intel igb。

  • 数据包从外部网络进入网卡
  • 网卡(通过DMA)将包拷贝到内核内存中的ring buffer
  • 产生硬件中断,通知系统收到了一个包
  • 驱动调用NAPI,如果轮询(poll)还没有开始,就开始轮询
  • ksoftirqd软中断调用NAPI的poll函数从ring buffer收包(poll函数是网卡驱动在初始化阶段注册的;每个cpu上都运行着一个ksoftirqd进程,在系统启动期间就注册了)
  • ring buffer里面对应的内存区域解除映射(unmapped)
  • 如果packet steering功能打开,或者网卡有多队列,网卡收到的数据包会被分发到多个cpu
  • 数据包从队列进入协议层
  • 协议层处理数据包
  • 数据包从协议层进入相应socket的接收队列: poll函数将收到的包送到协议栈注册的ip_rcv函数中,ip_rcv函数再将包送到tcp_v4_rcv函数或udp_rcv函数中
  • 在另外一个方向上,使用系统调用(read/recv/recvfrom/recvmsg等)从接收队列中获取数据: 穿过套接口层后调用tcp_recvmsg函数或udp_recvmsg函数,复制数据到用户空间

硬中断处理

当数据帧从网线或其它方式到达网卡上的时候,网卡会在分配给自己的ring buffer中寻找可以使用的内存位置,找到后DMA会把数据DMA到⽹卡之前关联的内存里,此时CPU是没有感知的。当DMA操作完成以后,网卡会向CPU发起⼀个硬中断,通知CPU有数据到达。

注意,这个过程中,如果ring buffer满了,新来的数据包将会被丢弃(ifconfig命令查看网卡,可以看到overruns信息项,表示因为环形队列满被丢弃的包,此时可以通过ethtool命令来加大队列的长度)。

对于intel igb网卡,当有数据包到达网卡时,DMA把数据映射到内存,通知CPU硬中断,执行注册的硬中断处理函数igb_msix_ring():

igb_msix_ring() - drivers/net/ethernet/intel/igb/igb_main.c
    __napi_schedule() - net/core/dev.c
        ____napi_schedule() - net/core/dev.c
            __raise_softirq_irqoff(NET_RX_SOFTIRQ) - net/core/dev.c

__raise_softirq_irqoff(NET_RX_SOFTIRQ)发起NET_RX_SOFTIRQ软中断。

软中断处理

ksoftirqd为软中断处理进程,ksoftirqd收到NET_RX_SOFTIRQ软中断后,执行软中断处理函数net_rx_action(),调用网卡驱动poll()函数(对于igb网卡为igb_poll函数)收包。

run_ksoftirqd() - kernel/softirqd.c
    __do_softirq() - kernel/softirqd.c
        h->action(h) - kernel/softirqd.c
            net_rx_action() - net/core/dev.c
                napi_poll() - net/core/dev.c
                    __napi_poll - net/core/dev.c
                        work = n->poll(n, weight) - net/core/dev.c
                            igb_poll() - drivers/net/ethernet/intel/igb/igb_main.c
                                igb_clean_rx_irq() - drivers/net/ethernet/intel/igb/igb_main.c

igb_poll()调用igb_clean_rx_irq():

链路层

igb_clean_rx_irq()

static int igb_clean_rx_irq(struct igb_q_vector *q_vector, const int budget)
{
	struct igb_adapter *adapter = q_vector->adapter;
	struct igb_ring *rx_ring = q_vector->rx.ring;
	struct sk_buff *skb = rx_ring->skb;
	unsigned int total_bytes = 0, total_packets = 0;
	u16 cleaned_count = igb_desc_unused(rx_ring);
	unsigned int xdp_xmit = 0;
	struct xdp_buff xdp;
	u32 frame_sz = 0;
	int rx_buf_pgcnt;

	/* Frame size depend on rx_ring setup when PAGE_SIZE=4K */
#if (PAGE_SIZE < 8192)
	frame_sz = igb_rx_frame_truesize(rx_ring, 0);
#endif
	xdp_init_buff(&xdp, frame_sz, &rx_ring->xdp_rxq);

	while (likely(total_packets < budget)) {
		union e1000_adv_rx_desc *rx_desc;
		struct igb_rx_buffer *rx_buffer;
		ktime_t timestamp = 0;
		int pkt_offset = 0;
		unsigned int size;
		void *pktbuf;

		/* return some buffers to hardware, one at a time is too slow */
		if (cleaned_count >= IGB_RX_BUFFER_WRITE) {
			igb_alloc_rx_buffers(rx_ring, cleaned_count);
			cleaned_count = 0;
		}

		rx_desc = IGB_RX_DESC(rx_ring, rx_ring->next_to_clean);
		size = le16_to_cpu(rx_desc->wb.upper.length);
		if (!size)
			break;

		/* This memory barrier is needed to keep us from reading
		 * any other fields out of the rx_desc until we know the
		 * descriptor has been written back
		 */
		dma_rmb();

		rx_buffer = igb_get_rx_buffer(rx_ring, size, &rx_buf_pgcnt);
		pktbuf = page_address(rx_buffer->page) + rx_buffer->page_offset;

		/* pull rx packet timestamp if available and valid */
		if (igb_test_staterr(rx_desc, E1000_RXDADV_STAT_TSIP)) {
			int ts_hdr_len;

			ts_hdr_len = igb_ptp_rx_pktstamp(rx_ring->q_vector,
							 pktbuf, &timestamp);

			pkt_offset += ts_hdr_len;
			size -= ts_hdr_len;
		}

		/* retrieve a buffer from the ring */
		if (!skb) {
			unsigned char *hard_start = pktbuf - igb_rx_offset(rx_ring);
			unsigned int offset = pkt_offset + igb_rx_offset(rx_ring);

			xdp_prepare_buff(&xdp, hard_start, offset, size, true);
			xdp_buff_clear_frags_flag(&xdp);
#if (PAGE_SIZE > 4096)
			/* At larger PAGE_SIZE, frame_sz depend on len size */
			xdp.frame_sz = igb_rx_frame_truesize(rx_ring, size);
#endif
			skb = igb_run_xdp(adapter, rx_ring, &xdp);
		}

		if (IS_ERR(skb)) {
			unsigned int xdp_res = -PTR_ERR(skb);

			if (xdp_res & (IGB_XDP_TX | IGB_XDP_REDIR)) {
				xdp_xmit |= xdp_res;
				igb_rx_buffer_flip(rx_ring, rx_buffer, size);
			} else {
				rx_buffer->pagecnt_bias++;
			}
			total_packets++;
			total_bytes += size;
		} else if (skb)
			igb_add_rx_frag(rx_ring, rx_buffer, skb, size);
		else if (ring_uses_build_skb(rx_ring))
			skb = igb_build_skb(rx_ring, rx_buffer, &xdp,
					    timestamp);
		else
			skb = igb_construct_skb(rx_ring, rx_buffer,
						&xdp, timestamp);

		/* exit if we failed to retrieve a buffer */
		if (!skb) {
			rx_ring->rx_stats.alloc_failed++;
			rx_buffer->pagecnt_bias++;
			break;
		}

		igb_put_rx_buffer(rx_ring, rx_buffer, rx_buf_pgcnt);
		cleaned_count++;

		/* fetch next buffer in frame if non-eop */
		if (igb_is_non_eop(rx_ring, rx_desc))
			continue;

		/* verify the packet layout is correct */
		if (igb_cleanup_headers(rx_ring, rx_desc, skb)) {
			skb = NULL;
			continue;
		}

		/* probably a little skewed due to removing CRC */
		total_bytes += skb->len;

		/* populate checksum, timestamp, VLAN, and protocol */
		igb_process_skb_fields(rx_ring, rx_desc, skb);

		napi_gro_receive(&q_vector->napi, skb);

		/* reset skb pointer */
		skb = NULL;

		/* update budget accounting */
		total_packets++;
	}

	/* place incomplete frames back on ring for completion */
	rx_ring->skb = skb;

	if (xdp_xmit & IGB_XDP_REDIR)
		xdp_do_flush();

	if (xdp_xmit & IGB_XDP_TX) {
		struct igb_ring *tx_ring = igb_xdp_tx_queue_mapping(adapter);

		igb_xdp_ring_update_tail(tx_ring);
	}

	u64_stats_update_begin(&rx_ring->rx_syncp);
	rx_ring->rx_stats.packets += total_packets;
	rx_ring->rx_stats.bytes += total_bytes;
	u64_stats_update_end(&rx_ring->rx_syncp);
	q_vector->rx.total_packets += total_packets;
	q_vector->rx.total_bytes += total_bytes;

	if (cleaned_count)
		igb_alloc_rx_buffers(rx_ring, cleaned_count);

	return total_packets;
}

igb_clean_rx_irq()主体是一个while循环:
• igb_alloc_rx_buffers函数分配一批rx_buffer(单个申请效率低)
• IGB_RX_DESC从ring buffer中取出下一个可读位置(next_to_clean)的rx_desc,之后从rx_desc获取到接收的数据 buffer大小
• igb_get_rx_buffer函数从ring buffer中取出下一个可读位置(next_to_clean)的rx_buffer
• pull rx packet timestamp if available and valid
• retrieve a buffer from the ring, 通过xdp对数据进行处理,根据不同情况,通过ixgbe_build_skb或者ixgbe_construct_skb创建skb
• igb_put_rx_buffer(): 如果rx_buffer可以重用,调用igb_reuse_rx_page();否则解除DMA映射并回收内存;最后通过设置rx_buffer→page = NULL来清空rx_buffer的内容。
• 调用igb_is_non_eop(): If the buffer is an EOP buffer this function exits returning false, otherwise it will place the sk_buff in the next buffer to be chained and return true indicating that this is in fact a non-EOP buffer(EOP: End of packet)
• 通过igb_cleanup_headers()检查包头是否正确;
• 将skb的长度累计到total_bytes,用于统计数据
• 调用igb_process_skb_fields()设置skb的checksum、timestamp、VLAN和protocol等信息(Populate skb header fields from Rx descriptor)
• 调用napi_gro_receive()将构建好的skb传递给网络协议栈
• 更新统计数据
• 如果没数据或者budget不够就退出循环,否则回到开始继续循环

下面介绍GRO与napi_gro_receive函数:

GRO(Generic Receive Offloading)

主要思想:
通过合并”足够类似”的包来减少传送给网络栈的包数,减少CPU的使用量。例如,大文件传输的场景,包的数量非常多,大部分包都是一段文件数据。相比于每次都将小包送到网络栈,可以将收到的小包合并成一个很大的包再送到网络栈。GRO使协议层只需处理一个header,就可以将包含大量数据的大包送到用户程序。
GRO给协议栈提供了一次将包交给网络协议栈之前,对其检查校验和、修改协议头和发送应答包(ACK packets)的机会。

处理规则:
如果GRO的buffer相比于包太小了,则可能会什么都不做;
如果当前包属于某个更大包的一个分片,调用enqueue_backlog()将这个分片放到某个CPU的包队列;当包重组完成后,会交给协议栈继续处理;
如果当前包不是分片包,会交给协议栈继续处理。

优缺点:
信息丢失:包的option或者flag信息在合并时会丢失。

TIPS:
使用tcpdump抓包时,如果看收到了非常大、看似不太正常的包,这很可能是系统开启了GRO(tcpdump的抓包在GRO处理之后)。

命令:
查看GSO是否开启: ethtool -k ens33 | grep generic-receive-offload
通常都是开启的,即: generic-receive-offload: on

napi_gro_receive()

gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
	//...
	ret = napi_skb_finish(napi, skb, dev_gro_receive(napi, skb));
	//...
}

napi_gro_receive()调用dev_gro_receive()完成多个数据包的合并, 然后调用napi_skb_finish()。

napi_skb_finish()

static gro_result_t napi_skb_finish(struct napi_struct *napi,
				    struct sk_buff *skb,
				    gro_result_t ret)
{
	//...
	gro_normal_one(napi, skb, 1);
	//..
}

gro_normal_one()

gro_normal_list() - include/net/gro.h
    netif_receive_skb_list_internal() - net/core/dev.c
        __netif_receive_skb_list() - net/core/dev.c
            __netif_receive_skb_list_core() - net/core/dev.c
                __netif_receive_skb_core() - net/core/dev.c

__netif_receive_skb_core()

__netif_receive_skb_core()的一些功能:
• 一些准备工作,例如处理skb时间戳,重置网络头,重置传输头,重置MAC长度等
• Generic XDP(eXpress Data Path)软件执行XDP程序: XDP是硬件功能,本来应该由硬件网卡来执行,当硬件网卡不支持offload模式的XDP时,可以选择Generic模式的XDP
• 处理VLAN header
• TAP处理:例如tcpdump抓包、流量过滤
• TC(Traffic Control): TC规则或TC BPF程序
• Netfilter: 处理iptables规则等,这里通过nf_ingress函数进入

其最核心的功能是deliver_skb(),继续在协议栈上传递skb:

deliver_skb()

static inline int deliver_skb(struct sk_buff *skb,
			      struct packet_type *pt_prev,
			      struct net_device *orig_dev)
{
	if (unlikely(skb_orphan_frags_rx(skb, GFP_ATOMIC)))
		return -ENOMEM;
	refcount_inc(&skb->users);
	return pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
}

核心函数为pt_prev→func:

/*
 *	The list of packet types we will receive (as opposed to discard)
 *	and the routines to invoke.
 *
 *	Why 16. Because with 16 the only overlap we get on a hash of the
 *	low nibble of the protocol value is RARP/SNAP/X.25.
 *
 *		0800	IP
 *		0001	802.3
 *		0002	AX.25
 *		0004	802.2
 *		8035	RARP
 *		0005	SNAP
 *		0805	X.25
 *		0806	ARP
 *		8137	IPX
 *		0009	Localtalk
 *		86DD	IPv6
 */
#define PTYPE_HASH_SIZE	(16)
#define PTYPE_HASH_MASK	(PTYPE_HASH_SIZE - 1)

extern struct list_head ptype_all __read_mostly;
extern struct list_head ptype_base[PTYPE_HASH_SIZE] __read_mostly;

根据协议类型packet_type.type在ptype_base哈希表中找到对应函数即packet_type.func,通过deliver_skb函数调用packet_type.func把skb交到对应的函数处理。
对于IP协议,这个packet_type就是ip_packet_type结构体变量,这个func就是ip_rcv函数(在下文中的网络层会详细介绍)。

网络层

ip_rcv()

注册:

static struct packet_type ip_packet_type __read_mostly = {
	.type = cpu_to_be16(ETH_P_IP),
	.func = ip_rcv,
	.list_func = ip_list_rcv,
};

IP层在函数inet_init中将自身注册到ptype_base哈希表,即:
dev_add_pack(&ip_packet_type): https://elixir.bootlin.com/linux/latest/source/net/ipv4/af_inet.c

流程:

ip_rcv() - net/ipv4/ip_input.c
    ip_rcv_finish() - net/ipv4/ip_input.c
        dst_input() - include/net/dst.h
            ip_local_deliver() - net/ipv4/ip_input.c
                ip_local_deliver_finish() - net/ipv4/ip_input.c
                    ip_protocol_deliver_rcu() - net/ipv4/ip_input.c
                        ret = INDIRECT_CALL_2(ipprot->handler, tcp_v4_rcv, udp_rcv, skb)

实现:

int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt,
	   struct net_device *orig_dev)
{
	struct net *net = dev_net(dev);

	skb = ip_rcv_core(skb, net);
	if (skb == NULL)
		return NET_RX_DROP;

	return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING,
		       net, NULL, skb, dev, NULL,
		       ip_rcv_finish);
}

如果netfilter未通过,skb将不会继续被处理,直接返回;
否则netfilter允许继续处理该数据包,将skb传入ip_rcv_finish函数继续执行:

ip_rcv_finish()

static int ip_rcv_finish(struct net *net, struct sock *sk, struct sk_buff *skb)
{
	struct net_device *dev = skb->dev;
	int ret;

	/* if ingress device is enslaved to an L3 master device pass the
	 * skb to its handler for processing
	 */
	skb = l3mdev_ip_rcv(skb);
	if (!skb)
		return NET_RX_SUCCESS;

	ret = ip_rcv_finish_core(net, sk, skb, dev, NULL);
	if (ret != NET_RX_DROP)
		ret = dst_input(skb);
	return ret;
}

首先调用ip_rcv_finish_core函数对skb做一些初始化的工作,然后调用dst_input函数来根据skb的路由对数据报进行进一步的处理:

dst_input()

/* Input packet from network to transport.  */
static inline int dst_input(struct sk_buff *skb)
{
	return INDIRECT_CALL_INET(skb_dst(skb)->input,
				  ip6_input, ip_local_deliver, skb);
}

可见,对于IPv4,调用ip_local_deliver():

ip_local_deliver()

ip_local_deliver() - net/ipv4/ip_input.c
    ip_local_deliver_finish() - net/ipv4/ip_input.c
        ip_protocol_deliver_rcu() - net/ipv4/ip_input.c

ip_protocol_deliver_rcu()

void ip_protocol_deliver_rcu(struct net *net, struct sk_buff *skb, int protocol)
{
	//...
	ret = INDIRECT_CALL_2(ipprot->handler, tcp_v4_rcv, udp_rcv, skb);
	//...
}

可见:
对于tcp, 最终调用tcp_v4_rcv()。
对于udp, 最终调用udp_rcv()。

传输层

概述

传输层在两个方向上处理数据:
从硬件层到传输层的方向上,需要将数据包添加到socket的接收队列;
从系统调用到传输层的方向上,用户空间获取数据。
前者对应tcp_v4_rcv()与udp_rcv();后者对应tcp_recvmsg()与udp_recvmsg()。

tcp_v4_rcv()

tcp_v4_do_rcv() - net/ipv4/tcp_ipv4.c
    tcp_rcv_established() - net/ipv4/tcp_input.c
        tcp_data_queue() - net/ipv4/tcp_input.c
tcp_add_backlog() - net/ipv4/tcp_ipv4.c
int tcp_v4_rcv(struct sk_buff *skb)
{
	//...
	bh_lock_sock_nested(sk);
	tcp_segs_in(tcp_sk(sk), skb);
	ret = 0;
	if (!sock_owned_by_user(sk)) {
		ret = tcp_v4_do_rcv(sk, skb);
	} else {
		if (tcp_add_backlog(sk, skb, &drop_reason))
			goto discard_and_relse;
	}
	bh_unlock_sock(sk);
	//...
}

如果是进程上下文,那么接收到的skb则只能通过tcp_add_backlog()先放置到后备队列中。
如果是软中断上下文则放置到接收队列中。

udp_rcv()

__udp4_lib_rcv() - net/ipv4/udp.c
    udp_unicast_rcv_skb() - net/ipv4/udp.c
        udp_queue_rcv_skb() - net/ipv4/udp.c
            udp_queue_rcv_one_skb() - net/ipv4/udp.c
                __udp_queue_rcv_skb() - net/ipv4/udp.c
                    __udp_enqueue_schedule_skb() - net/ipv4/udp.c
                        __skb_queue_tail() - net/ipv4/udp.c

tcp_recvmsg()

tcp_recvmsg_locked() - net/ipv4/tcp.c
    skb_copy_datagram_msg() - net/ipv4/tcp.c

tcp_recvmsg()从接收队列中复制数据到用户空间。

核心函数tcp_recvmsg_locked():
函数带上locked后缀,表示这个函数的上下文中已经加锁与解锁。
其功能比较单一,但整个流程比较复杂,需要考虑的细节比较多。

static int tcp_recvmsg_locked(struct sock *sk, struct msghdr *msg, size_t len,
			      int flags, struct scm_timestamping_internal *tss,
			      int *cmsg_flags)
{
	struct tcp_sock *tp = tcp_sk(sk);
	int copied = 0;
	u32 peek_seq;
	u32 *seq;
	unsigned long used;
	int err;
	int target;		/* Read at least this many bytes */
	long timeo;
	struct sk_buff *skb, *last;
	u32 urg_hole = 0;

	err = -ENOTCONN;
	if (sk->sk_state == TCP_LISTEN)
		goto out;

	if (tp->recvmsg_inq) {
		*cmsg_flags = TCP_CMSG_INQ;
		msg->msg_get_inq = 1;
	}
	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);

	/* Urgent data needs to be handled specially. */
	if (flags & MSG_OOB)
		goto recv_urg;

	if (unlikely(tp->repair)) {
		err = -EPERM;
		if (!(flags & MSG_PEEK))
			goto out;

		if (tp->repair_queue == TCP_SEND_QUEUE)
			goto recv_sndq;

		err = -EINVAL;
		if (tp->repair_queue == TCP_NO_QUEUE)
			goto out;

		/* 'common' recv queue MSG_PEEK-ing */
	}

	seq = &tp->copied_seq;
	if (flags & MSG_PEEK) {
		peek_seq = tp->copied_seq;
		seq = &peek_seq;
	}

	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);

	do {
		u32 offset;

		/* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */
		if (unlikely(tp->urg_data) && tp->urg_seq == *seq) {
			if (copied)
				break;
			if (signal_pending(current)) {
				copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
				break;
			}
		}

		/* Next get a buffer. */

		last = skb_peek_tail(&sk->sk_receive_queue);
		skb_queue_walk(&sk->sk_receive_queue, skb) {
			last = skb;
			/* Now that we have two receive queues this
			 * shouldn't happen.
			 */
			if (WARN(before(*seq, TCP_SKB_CB(skb)->seq),
				 "TCP recvmsg seq # bug: copied %X, seq %X, rcvnxt %X, fl %X\n",
				 *seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt,
				 flags))
				break;

			offset = *seq - TCP_SKB_CB(skb)->seq;
			if (unlikely(TCP_SKB_CB(skb)->tcp_flags & TCPHDR_SYN)) {
				pr_err_once("%s: found a SYN, please report !\n", __func__);
				offset--;
			}
			if (offset < skb->len)
				goto found_ok_skb;
			if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
				goto found_fin_ok;
			WARN(!(flags & MSG_PEEK),
			     "TCP recvmsg seq # bug 2: copied %X, seq %X, rcvnxt %X, fl %X\n",
			     *seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt, flags);
		}

		/* Well, if we have backlog, try to process it now yet. */

		if (copied >= target && !READ_ONCE(sk->sk_backlog.tail))
			break;

		if (copied) {
			if (!timeo ||
			    sk->sk_err ||
			    sk->sk_state == TCP_CLOSE ||
			    (sk->sk_shutdown & RCV_SHUTDOWN) ||
			    signal_pending(current))
				break;
		} else {
			if (sock_flag(sk, SOCK_DONE))
				break;

			if (sk->sk_err) {
				copied = sock_error(sk);
				break;
			}

			if (sk->sk_shutdown & RCV_SHUTDOWN)
				break;

			if (sk->sk_state == TCP_CLOSE) {
				/* This occurs when user tries to read
				 * from never connected socket.
				 */
				copied = -ENOTCONN;
				break;
			}

			if (!timeo) {
				copied = -EAGAIN;
				break;
			}

			if (signal_pending(current)) {
				copied = sock_intr_errno(timeo);
				break;
			}
		}

		if (copied >= target) {
			/* Do not sleep, just process backlog. */
			__sk_flush_backlog(sk);
		} else {
			tcp_cleanup_rbuf(sk, copied);
			sk_wait_data(sk, &timeo, last);
		}

		if ((flags & MSG_PEEK) &&
		    (peek_seq - copied - urg_hole != tp->copied_seq)) {
			net_dbg_ratelimited("TCP(%s:%d): Application bug, race in MSG_PEEK\n",
					    current->comm,
					    task_pid_nr(current));
			peek_seq = tp->copied_seq;
		}
		continue;

found_ok_skb:
		/* Ok so how much can we use? */
		used = skb->len - offset;
		if (len < used)
			used = len;

		/* Do we have urgent data here? */
		if (unlikely(tp->urg_data)) {
			u32 urg_offset = tp->urg_seq - *seq;
			if (urg_offset < used) {
				if (!urg_offset) {
					if (!sock_flag(sk, SOCK_URGINLINE)) {
						WRITE_ONCE(*seq, *seq + 1);
						urg_hole++;
						offset++;
						used--;
						if (!used)
							goto skip_copy;
					}
				} else
					used = urg_offset;
			}
		}

		if (!(flags & MSG_TRUNC)) {
			err = skb_copy_datagram_msg(skb, offset, msg, used);
			if (err) {
				/* Exception. Bailout! */
				if (!copied)
					copied = -EFAULT;
				break;
			}
		}

		WRITE_ONCE(*seq, *seq + used);
		copied += used;
		len -= used;

		tcp_rcv_space_adjust(sk);

skip_copy:
		if (unlikely(tp->urg_data) && after(tp->copied_seq, tp->urg_seq)) {
			WRITE_ONCE(tp->urg_data, 0);
			tcp_fast_path_check(sk);
		}

		if (TCP_SKB_CB(skb)->has_rxtstamp) {
			tcp_update_recv_tstamps(skb, tss);
			*cmsg_flags |= TCP_CMSG_TS;
		}

		if (used + offset < skb->len)
			continue;

		if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
			goto found_fin_ok;
		if (!(flags & MSG_PEEK))
			tcp_eat_recv_skb(sk, skb);
		continue;

found_fin_ok:
		/* Process the FIN. */
		WRITE_ONCE(*seq, *seq + 1);
		if (!(flags & MSG_PEEK))
			tcp_eat_recv_skb(sk, skb);
		break;
	} while (len > 0);

	/* According to UNIX98, msg_name/msg_namelen are ignored
	 * on connected socket. I was just happy when found this 8) --ANK
	 */

	/* Clean up data we have read: This will do ACK frames. */
	tcp_cleanup_rbuf(sk, copied);
	return copied;

out:
	return err;

recv_urg:
	err = tcp_recv_urg(sk, msg, len, flags);
	goto out;

recv_sndq:
	err = tcp_peek_sndq(sk, msg, len);
	goto out;
}
  • 如果TCP状态为LISTEN, 不允许读数据,需直接返回。
  • tp→recvmsg_inq与cmsg_flags处理CMSG相关,参考https://lore.kernel.org/netdev/650c22ca-cffc-0255-9a05-2413a1e20826@kernel.dk/
  • sock_rcvtimeo()获取阻塞读取时间,如果非阻塞,其值为0。
  • 如果是带外数据(MSG_OOB),则跳转到recv_urg。
  • 如果tcp_sock带有repair标记(主要是为了容器迁移,参考https://lwn.net/Articles/495304/),则进行相应处理。
  • 如果是查看数据(MSG_PEEK),保存一份peek_seq。
  • target = sock_rcvlowat(sk, flags & MSG_WAITALL, len): sock_rcvlowat()根据是否设置MSG_WAITALL标志来获取需要接收数据的长度。如果设置了MSG_WAITALL标志,则读取长度为输入参数len;否则为min_t(int, READ_ONCE(sk→sk_rcvlowat), len),而sk→sk_rcvlowat的默认值为1,因此在没有设置MSG_WAITALL的情况下,函数返回长度1(这也意味着只要有数据就可以返回了)。
  • urg_data与urg_seq检测是否读取到带外数据,如果读到带外数据之前已经读取了部分数据,或者用户进程有信号待处理,则终止本次正常数据的读取。
  • 获取下一个带读取的段。
  • offset = *seq – TCP_SKB_CB(skb)→seq: 已经获取到下一个待读取的段,计算该段开始读取数据的偏移位置。
  • TCPHDR_SYN: SYN标志占用了一个序号,如果存在SYN标志,需要调整offset偏移。
  • if (offset < skb→len): 如果偏移在该段的数据长度范围内,说明获取待读的段才是有效的,之后跳转到found_ok_skb读取数据。
  • TCPHDR_FIN: 段中存在FIN标志,跳转到found_fin_ok处理。
  • if (copied >= target && !READ_ONCE(sk→sk_backlog.tail)) 只有在读取完数据后,才能在后备队列不为空的情况下,去处理接收到后备队列中的TCP段,否则结束本次读取。
    关于后备队列:
    当用户进程锁定传输控制块时,才会将SKB放入后备队列中(在tcp_v4_rcv一节
    中已有说明);一旦用户进程释放传输控制块就应当立即处理后备队列。处理后备队列直接在release_sock()中实现,以确保在任何时候解锁传输控制块时能够立即处理后备队列。
  • 接收队列中可读的段已经读完,在处理后备队列之前需要检测是否有导致返回的事件、状态等,这些情况下需要结束本次读取。检测的条件包括: 即将终结,有错误发生,shutdown后不允许接收数据,TCP_CLOSE状态,非阻塞读,收到信号。
  • __sk_flush_backlog(sk): 处理本次的backlog
  • sk_wait_data(sk, &timeo, last): 如果数据未读取且为阻塞读取,则进入睡眠等待接收数据。
  • tcp_cleanup_rbuf(sk, copied): 清理已读数据,如有必要还会发送ACK。

参考:
《Linux内核源码剖析-TCP/IP实现》第31章TCP的输入 31.10.2节

udp_recvmsg()

/*
 * 	This should be easy, if there is something there we
 * 	return it, otherwise we block.
 */

int udp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int flags,
		int *addr_len)
{
	//...
	skb = __skb_recv_udp(sk, flags, &off, &err);
	if (!skb)
		return err;
	//...
	if (checksum_valid || udp_skb_csum_unnecessary(skb)) {
		if (udp_skb_is_linear(skb))
			err = copy_linear_skb(skb, copied, off, &msg->msg_iter);
		else
			err = skb_copy_datagram_msg(skb, off, msg, copied);
	} else {
		err = skb_copy_and_csum_datagram_msg(skb, off, msg);
		//...
	}
	//...
}

核心函数__skb_recv_udp()从sk_receive_queue接收队列中获取数据报:
如果没有获取到数据报,则udp_recvmsg()返回;
否则,udp_recvmsg()将根据情况调用相关的copy函数复制数据到用户空间。

注意,__skb_recv_udp()获取数据报时,接收队列中可能还没有数据,如果是阻塞,则会睡眠等待,直到超时或者队列中有数据而被唤醒。

系统调用

无论是socket fd的read系统调用,还是recvfrom/recv/recvmsg系统调用,最终都会调用sock_recvmsg()或者sock_recvmsg_nosec(), 而sock_recvmsg()最终也是调用或者sock_recvmsg_nosec()。

read()

static ssize_t sock_read_iter(struct kiocb *iocb, struct iov_iter *to)
{
	//...
	res = sock_recvmsg(sock, &msg, msg.msg_flags);
	//...
}

recv()与recvfrom()

SYSCALL_DEFINE4(recv, int, fd, void __user *, ubuf, size_t, size,
		unsigned int, flags)
{
	return __sys_recvfrom(fd, ubuf, size, flags, NULL, NULL);
}
SYSCALL_DEFINE6(recvfrom, int, fd, void __user *, ubuf, size_t, size,
		unsigned int, flags, struct sockaddr __user *, addr,
		int __user *, addr_len)
{
	return __sys_recvfrom(fd, ubuf, size, flags, addr, addr_len);
}

__sys_recvfrom()调用sock_recvmsg()。

recvmsg()

SYSCALL_DEFINE3(recvmsg, int, fd, struct user_msghdr __user *, msg,
		unsigned int, flags)
{
	return __sys_recvmsg(fd, msg, flags, true);
}

__sys_recvmsg:

____sys_recvmsg
        sock_recvmsg_nosec(sock, msg_sys, flags);
        sock_recvmsg(sock, msg_sys, flags);

sock_recvmsg()调用sock_recvmsg_nosec()。

套接口层

sock_recvmsg_nosec()

如上文所述,所有接收相关的系统调用,最终都会进入sock_recvmsg_nosec函数:

static inline int sock_recvmsg_nosec(struct socket *sock, struct msghdr *msg,
				     int flags)
{
	//...
	int ret = INDIRECT_CALL_INET(sock->ops->recvmsg, inet6_recvmsg,
				     inet_recvmsg, sock, msg,
				     msg_data_left(msg), flags);
	//...
}

可见,sock_recvmsg_nosec()最终调用inet6_recvmsg()(IPv6)或inet_recvmsg():
INDIRECT_CALL_INET宏使用INDIRECT_CALL_2,如果sock→ops→recvmsg指向inet6_recvmsg或inet_recvmsg, 则直接调用这两个函数,以减少retpoline技术带来的开销。

inet_recvmsg()

inet_recvmsg():

int inet_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
		 int flags)
{
	//...
	err = INDIRECT_CALL_2(sk->sk_prot->recvmsg, tcp_recvmsg, udp_recvmsg,
			      sk, msg, size, flags, &addr_len);
	//...
}

inet6_recvmsg():

int inet6_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
		  int flags)
{
	//...
	err = INDIRECT_CALL_2(prot->recvmsg, tcp_recvmsg, udpv6_recvmsg,
			      sk, msg, size, flags, &addr_len);
	//...
}

可见:
对于tcp, 最终调用tcp_recvmsg()。
对于udp, 最终调用udp_recvmsg()/udpv6_recvmsg()。
tcp_recvmsg()与udp_recvmsg()在上文中的传输层已经作了介绍。


Leave a Reply

Your email address will not be published. Required fields are marked *