数据的接收
过程概述
说明: 本文使用6.5内核, 涉及网卡时使用intel igb。
- 数据包从外部网络进入网卡
- 网卡(通过DMA)将包拷贝到内核内存中的ring buffer
- 产生硬件中断,通知系统收到了一个包
- 驱动调用NAPI,如果轮询(poll)还没有开始,就开始轮询
- ksoftirqd软中断调用NAPI的poll函数从ring buffer收包(poll函数是网卡驱动在初始化阶段注册的;每个cpu上都运行着一个ksoftirqd进程,在系统启动期间就注册了)
- ring buffer里面对应的内存区域解除映射(unmapped)
- 如果packet steering功能打开,或者网卡有多队列,网卡收到的数据包会被分发到多个cpu
- 数据包从队列进入协议层
- 协议层处理数据包
- 数据包从协议层进入相应socket的接收队列: poll函数将收到的包送到协议栈注册的ip_rcv函数中,ip_rcv函数再将包送到tcp_v4_rcv函数或udp_rcv函数中
- 在另外一个方向上,使用系统调用(read/recv/recvfrom/recvmsg等)从接收队列中获取数据: 穿过套接口层后调用tcp_recvmsg函数或udp_recvmsg函数,复制数据到用户空间
硬中断处理
当数据帧从网线或其它方式到达网卡上的时候,网卡会在分配给自己的ring buffer中寻找可以使用的内存位置,找到后DMA会把数据DMA到⽹卡之前关联的内存里,此时CPU是没有感知的。当DMA操作完成以后,网卡会向CPU发起⼀个硬中断,通知CPU有数据到达。
注意,这个过程中,如果ring buffer满了,新来的数据包将会被丢弃(ifconfig命令查看网卡,可以看到overruns信息项,表示因为环形队列满被丢弃的包,此时可以通过ethtool命令来加大队列的长度)。
对于intel igb网卡,当有数据包到达网卡时,DMA把数据映射到内存,通知CPU硬中断,执行注册的硬中断处理函数igb_msix_ring():
igb_msix_ring(): https://elixir.bootlin.com/linux/latest/source/drivers/net/ethernet/intel/igb/igb_main.c
igb_msix_ring() - drivers/net/ethernet/intel/igb/igb_main.c __napi_schedule() - net/core/dev.c ____napi_schedule() - net/core/dev.c __raise_softirq_irqoff(NET_RX_SOFTIRQ) - net/core/dev.c
__raise_softirq_irqoff(NET_RX_SOFTIRQ)发起NET_RX_SOFTIRQ软中断。
软中断处理
ksoftirqd为软中断处理进程,ksoftirqd收到NET_RX_SOFTIRQ软中断后,执行软中断处理函数net_rx_action(),调用网卡驱动poll()函数(对于igb网卡为igb_poll函数)收包。
run_ksoftirqd() - kernel/softirqd.c __do_softirq() - kernel/softirqd.c h->action(h) - kernel/softirqd.c net_rx_action() - net/core/dev.c napi_poll() - net/core/dev.c __napi_poll - net/core/dev.c work = n->poll(n, weight) - net/core/dev.c igb_poll() - drivers/net/ethernet/intel/igb/igb_main.c igb_clean_rx_irq() - drivers/net/ethernet/intel/igb/igb_main.c
igb_poll()调用igb_clean_rx_irq():
链路层
igb_clean_rx_irq()
static int igb_clean_rx_irq(struct igb_q_vector *q_vector, const int budget)
{
struct igb_adapter *adapter = q_vector->adapter;
struct igb_ring *rx_ring = q_vector->rx.ring;
struct sk_buff *skb = rx_ring->skb;
unsigned int total_bytes = 0, total_packets = 0;
u16 cleaned_count = igb_desc_unused(rx_ring);
unsigned int xdp_xmit = 0;
struct xdp_buff xdp;
u32 frame_sz = 0;
int rx_buf_pgcnt;
/* Frame size depend on rx_ring setup when PAGE_SIZE=4K */
#if (PAGE_SIZE < 8192)
frame_sz = igb_rx_frame_truesize(rx_ring, 0);
#endif
xdp_init_buff(&xdp, frame_sz, &rx_ring->xdp_rxq);
while (likely(total_packets < budget)) {
union e1000_adv_rx_desc *rx_desc;
struct igb_rx_buffer *rx_buffer;
ktime_t timestamp = 0;
int pkt_offset = 0;
unsigned int size;
void *pktbuf;
/* return some buffers to hardware, one at a time is too slow */
if (cleaned_count >= IGB_RX_BUFFER_WRITE) {
igb_alloc_rx_buffers(rx_ring, cleaned_count);
cleaned_count = 0;
}
rx_desc = IGB_RX_DESC(rx_ring, rx_ring->next_to_clean);
size = le16_to_cpu(rx_desc->wb.upper.length);
if (!size)
break;
/* This memory barrier is needed to keep us from reading
* any other fields out of the rx_desc until we know the
* descriptor has been written back
*/
dma_rmb();
rx_buffer = igb_get_rx_buffer(rx_ring, size, &rx_buf_pgcnt);
pktbuf = page_address(rx_buffer->page) + rx_buffer->page_offset;
/* pull rx packet timestamp if available and valid */
if (igb_test_staterr(rx_desc, E1000_RXDADV_STAT_TSIP)) {
int ts_hdr_len;
ts_hdr_len = igb_ptp_rx_pktstamp(rx_ring->q_vector,
pktbuf, ×tamp);
pkt_offset += ts_hdr_len;
size -= ts_hdr_len;
}
/* retrieve a buffer from the ring */
if (!skb) {
unsigned char *hard_start = pktbuf - igb_rx_offset(rx_ring);
unsigned int offset = pkt_offset + igb_rx_offset(rx_ring);
xdp_prepare_buff(&xdp, hard_start, offset, size, true);
xdp_buff_clear_frags_flag(&xdp);
#if (PAGE_SIZE > 4096)
/* At larger PAGE_SIZE, frame_sz depend on len size */
xdp.frame_sz = igb_rx_frame_truesize(rx_ring, size);
#endif
skb = igb_run_xdp(adapter, rx_ring, &xdp);
}
if (IS_ERR(skb)) {
unsigned int xdp_res = -PTR_ERR(skb);
if (xdp_res & (IGB_XDP_TX | IGB_XDP_REDIR)) {
xdp_xmit |= xdp_res;
igb_rx_buffer_flip(rx_ring, rx_buffer, size);
} else {
rx_buffer->pagecnt_bias++;
}
total_packets++;
total_bytes += size;
} else if (skb)
igb_add_rx_frag(rx_ring, rx_buffer, skb, size);
else if (ring_uses_build_skb(rx_ring))
skb = igb_build_skb(rx_ring, rx_buffer, &xdp,
timestamp);
else
skb = igb_construct_skb(rx_ring, rx_buffer,
&xdp, timestamp);
/* exit if we failed to retrieve a buffer */
if (!skb) {
rx_ring->rx_stats.alloc_failed++;
rx_buffer->pagecnt_bias++;
break;
}
igb_put_rx_buffer(rx_ring, rx_buffer, rx_buf_pgcnt);
cleaned_count++;
/* fetch next buffer in frame if non-eop */
if (igb_is_non_eop(rx_ring, rx_desc))
continue;
/* verify the packet layout is correct */
if (igb_cleanup_headers(rx_ring, rx_desc, skb)) {
skb = NULL;
continue;
}
/* probably a little skewed due to removing CRC */
total_bytes += skb->len;
/* populate checksum, timestamp, VLAN, and protocol */
igb_process_skb_fields(rx_ring, rx_desc, skb);
napi_gro_receive(&q_vector->napi, skb);
/* reset skb pointer */
skb = NULL;
/* update budget accounting */
total_packets++;
}
/* place incomplete frames back on ring for completion */
rx_ring->skb = skb;
if (xdp_xmit & IGB_XDP_REDIR)
xdp_do_flush();
if (xdp_xmit & IGB_XDP_TX) {
struct igb_ring *tx_ring = igb_xdp_tx_queue_mapping(adapter);
igb_xdp_ring_update_tail(tx_ring);
}
u64_stats_update_begin(&rx_ring->rx_syncp);
rx_ring->rx_stats.packets += total_packets;
rx_ring->rx_stats.bytes += total_bytes;
u64_stats_update_end(&rx_ring->rx_syncp);
q_vector->rx.total_packets += total_packets;
q_vector->rx.total_bytes += total_bytes;
if (cleaned_count)
igb_alloc_rx_buffers(rx_ring, cleaned_count);
return total_packets;
}
igb_clean_rx_irq()主体是一个while循环:
• igb_alloc_rx_buffers函数分配一批rx_buffer(单个申请效率低)
• IGB_RX_DESC从ring buffer中取出下一个可读位置(next_to_clean)的rx_desc,之后从rx_desc获取到接收的数据 buffer大小
• igb_get_rx_buffer函数从ring buffer中取出下一个可读位置(next_to_clean)的rx_buffer
• pull rx packet timestamp if available and valid
• retrieve a buffer from the ring, 通过xdp对数据进行处理,根据不同情况,通过ixgbe_build_skb或者ixgbe_construct_skb创建skb
• igb_put_rx_buffer(): 如果rx_buffer可以重用,调用igb_reuse_rx_page();否则解除DMA映射并回收内存;最后通过设置rx_buffer→page = NULL来清空rx_buffer的内容。
• 调用igb_is_non_eop(): If the buffer is an EOP buffer this function exits returning false, otherwise it will place the sk_buff in the next buffer to be chained and return true indicating that this is in fact a non-EOP buffer(EOP: End of packet)
• 通过igb_cleanup_headers()检查包头是否正确;
• 将skb的长度累计到total_bytes,用于统计数据
• 调用igb_process_skb_fields()设置skb的checksum、timestamp、VLAN和protocol等信息(Populate skb header fields from Rx descriptor)
• 调用napi_gro_receive()将构建好的skb传递给网络协议栈
• 更新统计数据
• 如果没数据或者budget不够就退出循环,否则回到开始继续循环
下面介绍GRO与napi_gro_receive函数:
GRO(Generic Receive Offloading)
主要思想:
通过合并”足够类似”的包来减少传送给网络栈的包数,减少CPU的使用量。例如,大文件传输的场景,包的数量非常多,大部分包都是一段文件数据。相比于每次都将小包送到网络栈,可以将收到的小包合并成一个很大的包再送到网络栈。GRO使协议层只需处理一个header,就可以将包含大量数据的大包送到用户程序。
GRO给协议栈提供了一次将包交给网络协议栈之前,对其检查校验和、修改协议头和发送应答包(ACK packets)的机会。
处理规则:
如果GRO的buffer相比于包太小了,则可能会什么都不做;
如果当前包属于某个更大包的一个分片,调用enqueue_backlog()将这个分片放到某个CPU的包队列;当包重组完成后,会交给协议栈继续处理;
如果当前包不是分片包,会交给协议栈继续处理。
优缺点:
信息丢失:包的option或者flag信息在合并时会丢失。
TIPS:
使用tcpdump抓包时,如果看收到了非常大、看似不太正常的包,这很可能是系统开启了GRO(tcpdump的抓包在GRO处理之后)。
命令:
查看GSO是否开启: ethtool -k ens33 | grep generic-receive-offload
通常都是开启的,即: generic-receive-offload: on
napi_gro_receive()
gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{
//...
ret = napi_skb_finish(napi, skb, dev_gro_receive(napi, skb));
//...
}
napi_gro_receive()调用dev_gro_receive()完成多个数据包的合并, 然后调用napi_skb_finish()。
napi_skb_finish()
static gro_result_t napi_skb_finish(struct napi_struct *napi,
struct sk_buff *skb,
gro_result_t ret)
{
//...
gro_normal_one(napi, skb, 1);
//..
}
gro_normal_one()
gro_normal_one(): https://elixir.bootlin.com/linux/latest/source/include/net/gro.h
gro_normal_list() - include/net/gro.h netif_receive_skb_list_internal() - net/core/dev.c __netif_receive_skb_list() - net/core/dev.c __netif_receive_skb_list_core() - net/core/dev.c __netif_receive_skb_core() - net/core/dev.c
__netif_receive_skb_core()
__netif_receive_skb_core(): https://elixir.bootlin.com/linux/latest/source/net/core/dev.c
__netif_receive_skb_core()的一些功能:
• 一些准备工作,例如处理skb时间戳,重置网络头,重置传输头,重置MAC长度等
• Generic XDP(eXpress Data Path)软件执行XDP程序: XDP是硬件功能,本来应该由硬件网卡来执行,当硬件网卡不支持offload模式的XDP时,可以选择Generic模式的XDP
• 处理VLAN header
• TAP处理:例如tcpdump抓包、流量过滤
• TC(Traffic Control): TC规则或TC BPF程序
• Netfilter: 处理iptables规则等,这里通过nf_ingress函数进入
其最核心的功能是deliver_skb(),继续在协议栈上传递skb:
deliver_skb()
static inline int deliver_skb(struct sk_buff *skb,
struct packet_type *pt_prev,
struct net_device *orig_dev)
{
if (unlikely(skb_orphan_frags_rx(skb, GFP_ATOMIC)))
return -ENOMEM;
refcount_inc(&skb->users);
return pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
}
核心函数为pt_prev→func:
/*
* The list of packet types we will receive (as opposed to discard)
* and the routines to invoke.
*
* Why 16. Because with 16 the only overlap we get on a hash of the
* low nibble of the protocol value is RARP/SNAP/X.25.
*
* 0800 IP
* 0001 802.3
* 0002 AX.25
* 0004 802.2
* 8035 RARP
* 0005 SNAP
* 0805 X.25
* 0806 ARP
* 8137 IPX
* 0009 Localtalk
* 86DD IPv6
*/
#define PTYPE_HASH_SIZE (16)
#define PTYPE_HASH_MASK (PTYPE_HASH_SIZE - 1)
extern struct list_head ptype_all __read_mostly;
extern struct list_head ptype_base[PTYPE_HASH_SIZE] __read_mostly;
根据协议类型packet_type.type在ptype_base哈希表中找到对应函数即packet_type.func,通过deliver_skb函数调用packet_type.func把skb交到对应的函数处理。
对于IP协议,这个packet_type就是ip_packet_type结构体变量,这个func就是ip_rcv函数(在下文中的网络层会详细介绍)。
网络层
ip_rcv()
注册:
static struct packet_type ip_packet_type __read_mostly = {
.type = cpu_to_be16(ETH_P_IP),
.func = ip_rcv,
.list_func = ip_list_rcv,
};
IP层在函数inet_init中将自身注册到ptype_base哈希表,即:
dev_add_pack(&ip_packet_type): https://elixir.bootlin.com/linux/latest/source/net/ipv4/af_inet.c
流程:
ip_rcv() - net/ipv4/ip_input.c ip_rcv_finish() - net/ipv4/ip_input.c dst_input() - include/net/dst.h ip_local_deliver() - net/ipv4/ip_input.c ip_local_deliver_finish() - net/ipv4/ip_input.c ip_protocol_deliver_rcu() - net/ipv4/ip_input.c ret = INDIRECT_CALL_2(ipprot->handler, tcp_v4_rcv, udp_rcv, skb)
实现:
int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt,
struct net_device *orig_dev)
{
struct net *net = dev_net(dev);
skb = ip_rcv_core(skb, net);
if (skb == NULL)
return NET_RX_DROP;
return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING,
net, NULL, skb, dev, NULL,
ip_rcv_finish);
}
如果netfilter未通过,skb将不会继续被处理,直接返回;
否则netfilter允许继续处理该数据包,将skb传入ip_rcv_finish函数继续执行:
ip_rcv_finish()
static int ip_rcv_finish(struct net *net, struct sock *sk, struct sk_buff *skb)
{
struct net_device *dev = skb->dev;
int ret;
/* if ingress device is enslaved to an L3 master device pass the
* skb to its handler for processing
*/
skb = l3mdev_ip_rcv(skb);
if (!skb)
return NET_RX_SUCCESS;
ret = ip_rcv_finish_core(net, sk, skb, dev, NULL);
if (ret != NET_RX_DROP)
ret = dst_input(skb);
return ret;
}
首先调用ip_rcv_finish_core函数对skb做一些初始化的工作,然后调用dst_input函数来根据skb的路由对数据报进行进一步的处理:
dst_input()
/* Input packet from network to transport. */
static inline int dst_input(struct sk_buff *skb)
{
return INDIRECT_CALL_INET(skb_dst(skb)->input,
ip6_input, ip_local_deliver, skb);
}
可见,对于IPv4,调用ip_local_deliver():
ip_local_deliver()
ip_local_deliver() - net/ipv4/ip_input.c ip_local_deliver_finish() - net/ipv4/ip_input.c ip_protocol_deliver_rcu() - net/ipv4/ip_input.c
ip_protocol_deliver_rcu()
void ip_protocol_deliver_rcu(struct net *net, struct sk_buff *skb, int protocol)
{
//...
ret = INDIRECT_CALL_2(ipprot->handler, tcp_v4_rcv, udp_rcv, skb);
//...
}
可见:
对于tcp, 最终调用tcp_v4_rcv()。
对于udp, 最终调用udp_rcv()。
传输层
概述
传输层在两个方向上处理数据:
从硬件层到传输层的方向上,需要将数据包添加到socket的接收队列;
从系统调用到传输层的方向上,用户空间获取数据。
前者对应tcp_v4_rcv()与udp_rcv();后者对应tcp_recvmsg()与udp_recvmsg()。
tcp_v4_rcv()
tcp_v4_do_rcv() - net/ipv4/tcp_ipv4.c tcp_rcv_established() - net/ipv4/tcp_input.c tcp_data_queue() - net/ipv4/tcp_input.c tcp_add_backlog() - net/ipv4/tcp_ipv4.c
int tcp_v4_rcv(struct sk_buff *skb)
{
//...
bh_lock_sock_nested(sk);
tcp_segs_in(tcp_sk(sk), skb);
ret = 0;
if (!sock_owned_by_user(sk)) {
ret = tcp_v4_do_rcv(sk, skb);
} else {
if (tcp_add_backlog(sk, skb, &drop_reason))
goto discard_and_relse;
}
bh_unlock_sock(sk);
//...
}
如果是进程上下文,那么接收到的skb则只能通过tcp_add_backlog()先放置到后备队列中。
如果是软中断上下文则放置到接收队列中。
udp_rcv()
__udp4_lib_rcv() - net/ipv4/udp.c udp_unicast_rcv_skb() - net/ipv4/udp.c udp_queue_rcv_skb() - net/ipv4/udp.c udp_queue_rcv_one_skb() - net/ipv4/udp.c __udp_queue_rcv_skb() - net/ipv4/udp.c __udp_enqueue_schedule_skb() - net/ipv4/udp.c __skb_queue_tail() - net/ipv4/udp.c
tcp_recvmsg()
tcp_recvmsg_locked() - net/ipv4/tcp.c skb_copy_datagram_msg() - net/ipv4/tcp.c
tcp_recvmsg()从接收队列中复制数据到用户空间。
核心函数tcp_recvmsg_locked():
函数带上locked后缀,表示这个函数的上下文中已经加锁与解锁。
其功能比较单一,但整个流程比较复杂,需要考虑的细节比较多。
static int tcp_recvmsg_locked(struct sock *sk, struct msghdr *msg, size_t len,
int flags, struct scm_timestamping_internal *tss,
int *cmsg_flags)
{
struct tcp_sock *tp = tcp_sk(sk);
int copied = 0;
u32 peek_seq;
u32 *seq;
unsigned long used;
int err;
int target; /* Read at least this many bytes */
long timeo;
struct sk_buff *skb, *last;
u32 urg_hole = 0;
err = -ENOTCONN;
if (sk->sk_state == TCP_LISTEN)
goto out;
if (tp->recvmsg_inq) {
*cmsg_flags = TCP_CMSG_INQ;
msg->msg_get_inq = 1;
}
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
/* Urgent data needs to be handled specially. */
if (flags & MSG_OOB)
goto recv_urg;
if (unlikely(tp->repair)) {
err = -EPERM;
if (!(flags & MSG_PEEK))
goto out;
if (tp->repair_queue == TCP_SEND_QUEUE)
goto recv_sndq;
err = -EINVAL;
if (tp->repair_queue == TCP_NO_QUEUE)
goto out;
/* 'common' recv queue MSG_PEEK-ing */
}
seq = &tp->copied_seq;
if (flags & MSG_PEEK) {
peek_seq = tp->copied_seq;
seq = &peek_seq;
}
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
do {
u32 offset;
/* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */
if (unlikely(tp->urg_data) && tp->urg_seq == *seq) {
if (copied)
break;
if (signal_pending(current)) {
copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
break;
}
}
/* Next get a buffer. */
last = skb_peek_tail(&sk->sk_receive_queue);
skb_queue_walk(&sk->sk_receive_queue, skb) {
last = skb;
/* Now that we have two receive queues this
* shouldn't happen.
*/
if (WARN(before(*seq, TCP_SKB_CB(skb)->seq),
"TCP recvmsg seq # bug: copied %X, seq %X, rcvnxt %X, fl %X\n",
*seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt,
flags))
break;
offset = *seq - TCP_SKB_CB(skb)->seq;
if (unlikely(TCP_SKB_CB(skb)->tcp_flags & TCPHDR_SYN)) {
pr_err_once("%s: found a SYN, please report !\n", __func__);
offset--;
}
if (offset < skb->len)
goto found_ok_skb;
if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
goto found_fin_ok;
WARN(!(flags & MSG_PEEK),
"TCP recvmsg seq # bug 2: copied %X, seq %X, rcvnxt %X, fl %X\n",
*seq, TCP_SKB_CB(skb)->seq, tp->rcv_nxt, flags);
}
/* Well, if we have backlog, try to process it now yet. */
if (copied >= target && !READ_ONCE(sk->sk_backlog.tail))
break;
if (copied) {
if (!timeo ||
sk->sk_err ||
sk->sk_state == TCP_CLOSE ||
(sk->sk_shutdown & RCV_SHUTDOWN) ||
signal_pending(current))
break;
} else {
if (sock_flag(sk, SOCK_DONE))
break;
if (sk->sk_err) {
copied = sock_error(sk);
break;
}
if (sk->sk_shutdown & RCV_SHUTDOWN)
break;
if (sk->sk_state == TCP_CLOSE) {
/* This occurs when user tries to read
* from never connected socket.
*/
copied = -ENOTCONN;
break;
}
if (!timeo) {
copied = -EAGAIN;
break;
}
if (signal_pending(current)) {
copied = sock_intr_errno(timeo);
break;
}
}
if (copied >= target) {
/* Do not sleep, just process backlog. */
__sk_flush_backlog(sk);
} else {
tcp_cleanup_rbuf(sk, copied);
sk_wait_data(sk, &timeo, last);
}
if ((flags & MSG_PEEK) &&
(peek_seq - copied - urg_hole != tp->copied_seq)) {
net_dbg_ratelimited("TCP(%s:%d): Application bug, race in MSG_PEEK\n",
current->comm,
task_pid_nr(current));
peek_seq = tp->copied_seq;
}
continue;
found_ok_skb:
/* Ok so how much can we use? */
used = skb->len - offset;
if (len < used)
used = len;
/* Do we have urgent data here? */
if (unlikely(tp->urg_data)) {
u32 urg_offset = tp->urg_seq - *seq;
if (urg_offset < used) {
if (!urg_offset) {
if (!sock_flag(sk, SOCK_URGINLINE)) {
WRITE_ONCE(*seq, *seq + 1);
urg_hole++;
offset++;
used--;
if (!used)
goto skip_copy;
}
} else
used = urg_offset;
}
}
if (!(flags & MSG_TRUNC)) {
err = skb_copy_datagram_msg(skb, offset, msg, used);
if (err) {
/* Exception. Bailout! */
if (!copied)
copied = -EFAULT;
break;
}
}
WRITE_ONCE(*seq, *seq + used);
copied += used;
len -= used;
tcp_rcv_space_adjust(sk);
skip_copy:
if (unlikely(tp->urg_data) && after(tp->copied_seq, tp->urg_seq)) {
WRITE_ONCE(tp->urg_data, 0);
tcp_fast_path_check(sk);
}
if (TCP_SKB_CB(skb)->has_rxtstamp) {
tcp_update_recv_tstamps(skb, tss);
*cmsg_flags |= TCP_CMSG_TS;
}
if (used + offset < skb->len)
continue;
if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
goto found_fin_ok;
if (!(flags & MSG_PEEK))
tcp_eat_recv_skb(sk, skb);
continue;
found_fin_ok:
/* Process the FIN. */
WRITE_ONCE(*seq, *seq + 1);
if (!(flags & MSG_PEEK))
tcp_eat_recv_skb(sk, skb);
break;
} while (len > 0);
/* According to UNIX98, msg_name/msg_namelen are ignored
* on connected socket. I was just happy when found this 8) --ANK
*/
/* Clean up data we have read: This will do ACK frames. */
tcp_cleanup_rbuf(sk, copied);
return copied;
out:
return err;
recv_urg:
err = tcp_recv_urg(sk, msg, len, flags);
goto out;
recv_sndq:
err = tcp_peek_sndq(sk, msg, len);
goto out;
}
- 如果TCP状态为LISTEN, 不允许读数据,需直接返回。
- tp→recvmsg_inq与cmsg_flags处理CMSG相关,参考https://lore.kernel.org/netdev/650c22ca-cffc-0255-9a05-2413a1e20826@kernel.dk/
- sock_rcvtimeo()获取阻塞读取时间,如果非阻塞,其值为0。
- 如果是带外数据(MSG_OOB),则跳转到recv_urg。
- 如果tcp_sock带有repair标记(主要是为了容器迁移,参考https://lwn.net/Articles/495304/),则进行相应处理。
- 如果是查看数据(MSG_PEEK),保存一份peek_seq。
- target = sock_rcvlowat(sk, flags & MSG_WAITALL, len): sock_rcvlowat()根据是否设置MSG_WAITALL标志来获取需要接收数据的长度。如果设置了MSG_WAITALL标志,则读取长度为输入参数len;否则为min_t(int, READ_ONCE(sk→sk_rcvlowat), len),而sk→sk_rcvlowat的默认值为1,因此在没有设置MSG_WAITALL的情况下,函数返回长度1(这也意味着只要有数据就可以返回了)。
- urg_data与urg_seq检测是否读取到带外数据,如果读到带外数据之前已经读取了部分数据,或者用户进程有信号待处理,则终止本次正常数据的读取。
- 获取下一个带读取的段。
- offset = *seq – TCP_SKB_CB(skb)→seq: 已经获取到下一个待读取的段,计算该段开始读取数据的偏移位置。
- TCPHDR_SYN: SYN标志占用了一个序号,如果存在SYN标志,需要调整offset偏移。
- if (offset < skb→len): 如果偏移在该段的数据长度范围内,说明获取待读的段才是有效的,之后跳转到found_ok_skb读取数据。
- TCPHDR_FIN: 段中存在FIN标志,跳转到found_fin_ok处理。
- if (copied >= target && !READ_ONCE(sk→sk_backlog.tail)) 只有在读取完数据后,才能在后备队列不为空的情况下,去处理接收到后备队列中的TCP段,否则结束本次读取。
关于后备队列:
当用户进程锁定传输控制块时,才会将SKB放入后备队列中(在tcp_v4_rcv一节
中已有说明);一旦用户进程释放传输控制块就应当立即处理后备队列。处理后备队列直接在release_sock()中实现,以确保在任何时候解锁传输控制块时能够立即处理后备队列。 - 接收队列中可读的段已经读完,在处理后备队列之前需要检测是否有导致返回的事件、状态等,这些情况下需要结束本次读取。检测的条件包括: 即将终结,有错误发生,shutdown后不允许接收数据,TCP_CLOSE状态,非阻塞读,收到信号。
- __sk_flush_backlog(sk): 处理本次的backlog
- sk_wait_data(sk, &timeo, last): 如果数据未读取且为阻塞读取,则进入睡眠等待接收数据。
- tcp_cleanup_rbuf(sk, copied): 清理已读数据,如有必要还会发送ACK。
参考:
《Linux内核源码剖析-TCP/IP实现》第31章TCP的输入 31.10.2节
udp_recvmsg()
/*
* This should be easy, if there is something there we
* return it, otherwise we block.
*/
int udp_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int flags,
int *addr_len)
{
//...
skb = __skb_recv_udp(sk, flags, &off, &err);
if (!skb)
return err;
//...
if (checksum_valid || udp_skb_csum_unnecessary(skb)) {
if (udp_skb_is_linear(skb))
err = copy_linear_skb(skb, copied, off, &msg->msg_iter);
else
err = skb_copy_datagram_msg(skb, off, msg, copied);
} else {
err = skb_copy_and_csum_datagram_msg(skb, off, msg);
//...
}
//...
}
核心函数__skb_recv_udp()从sk_receive_queue接收队列中获取数据报:
如果没有获取到数据报,则udp_recvmsg()返回;
否则,udp_recvmsg()将根据情况调用相关的copy函数复制数据到用户空间。
注意,__skb_recv_udp()获取数据报时,接收队列中可能还没有数据,如果是阻塞,则会睡眠等待,直到超时或者队列中有数据而被唤醒。
系统调用
无论是socket fd的read系统调用,还是recvfrom/recv/recvmsg系统调用,最终都会调用sock_recvmsg()或者sock_recvmsg_nosec(), 而sock_recvmsg()最终也是调用或者sock_recvmsg_nosec()。
read()
static ssize_t sock_read_iter(struct kiocb *iocb, struct iov_iter *to)
{
//...
res = sock_recvmsg(sock, &msg, msg.msg_flags);
//...
}
recv()与recvfrom()
SYSCALL_DEFINE4(recv, int, fd, void __user *, ubuf, size_t, size,
unsigned int, flags)
{
return __sys_recvfrom(fd, ubuf, size, flags, NULL, NULL);
}
SYSCALL_DEFINE6(recvfrom, int, fd, void __user *, ubuf, size_t, size,
unsigned int, flags, struct sockaddr __user *, addr,
int __user *, addr_len)
{
return __sys_recvfrom(fd, ubuf, size, flags, addr, addr_len);
}
__sys_recvfrom()调用sock_recvmsg()。
recvmsg()
SYSCALL_DEFINE3(recvmsg, int, fd, struct user_msghdr __user *, msg,
unsigned int, flags)
{
return __sys_recvmsg(fd, msg, flags, true);
}
__sys_recvmsg:
____sys_recvmsg sock_recvmsg_nosec(sock, msg_sys, flags); sock_recvmsg(sock, msg_sys, flags);
sock_recvmsg()调用sock_recvmsg_nosec()。
套接口层
sock_recvmsg_nosec()
如上文所述,所有接收相关的系统调用,最终都会进入sock_recvmsg_nosec函数:
static inline int sock_recvmsg_nosec(struct socket *sock, struct msghdr *msg,
int flags)
{
//...
int ret = INDIRECT_CALL_INET(sock->ops->recvmsg, inet6_recvmsg,
inet_recvmsg, sock, msg,
msg_data_left(msg), flags);
//...
}
可见,sock_recvmsg_nosec()最终调用inet6_recvmsg()(IPv6)或inet_recvmsg():
INDIRECT_CALL_INET宏使用INDIRECT_CALL_2,如果sock→ops→recvmsg指向inet6_recvmsg或inet_recvmsg, 则直接调用这两个函数,以减少retpoline技术带来的开销。
inet_recvmsg()
inet_recvmsg():
int inet_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
int flags)
{
//...
err = INDIRECT_CALL_2(sk->sk_prot->recvmsg, tcp_recvmsg, udp_recvmsg,
sk, msg, size, flags, &addr_len);
//...
}
inet6_recvmsg():
int inet6_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
int flags)
{
//...
err = INDIRECT_CALL_2(prot->recvmsg, tcp_recvmsg, udpv6_recvmsg,
sk, msg, size, flags, &addr_len);
//...
}
可见:
对于tcp, 最终调用tcp_recvmsg()。
对于udp, 最终调用udp_recvmsg()/udpv6_recvmsg()。
tcp_recvmsg()与udp_recvmsg()在上文中的传输层已经作了介绍。