用Python开发ping命令

经过前面学习,我们知道 ping 命令内部通过 ICMP 协议探测目标 IP ,并计算 往返时间 。 本文使用 Python 开发一个简版 ping 命令, 演示如何通过 套接字 发送和接收 ICMP 协议报文。

其他语言版本:

报文封装

ICMP 报文同样分为头部和数据,其中头部的结构非常简单:

注意到, ICMP 头部只有三个固定字段,其余部分因消息类型而异。固定字段如下:

  • type类型
  • code代码
  • checksum校验和

ICMP 报文有很多不同的类型,由 typecode 字段区分。 而 ping 命令使用其中两种:

如上图,机器 A 通过 回显请求 ( echo request ) 询问机器 B ; 机器 B 收到报文后通过 回显应答 ( echo reply ) 响应机器 A 。 这两种报文的典型结构如下:

对应的 type 以及 code 字段值列举如下:

名称 类型 代码
回显请求 8 0
回显应答 0 0

按照惯例,回显报文除了固定字段,其余部分组织成 3 个字段:

  • 标识符 ( identifier ),一般填写进程 PID 以区分不同的 ping 进程;
  • 报文序号 ( sequence number ),用于为报文编号;
  • 数据 ( data ),可以是任意数据;

按 ICMP 协议规定, 回显应答 报文必须原封不动地回传这些字段。如果将 发送时间 封装在 数据负载payload )中, 应答收到后将其取出,即可计算 往返时间 ( round trip time )。

Python 标准库 struct 模块提供了用于 封装网络报文 的工具,可以这样封装数据负载:

1
2
3
4
import struct

sending_ts = time.time()
payload = struct.pack('!d', sending_ts)

这段代码将当前时间戳封装起来,其中冒号 ! 表示 网络 字节序d 表示双精度浮点。

封装报文头部也是类似的:

1
2
header = struct.pack('!BBHHH', _type, code, checksum, ident, seq)
icmp = header + payload

其中, B 表示长度为一个字节的无符号整数, H 表示长度为两个字节的无符号整数。

校验和

ICMP 报文校验和字段需要自行计算,计算步骤如下:

  1. 0 为校验和封装一个用于计算的 伪报文
  2. 将报文分成两个字节一组,如果总字节数为奇数,则在末尾追加一个零字节;
  3. 对所有 双字节 进行按位求和;
  4. 将高于 16 位的进位取出相加,直到没有进位;
  5. 将校验和按位取反;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
def calculate_checksum(icmp):
    if len(icmp) % 2:
        icmp += b'\00'

    checksum = 0
    for i in range(len(icmp)//2):
        word, = struct.unpack('!H', icmp[2*i:2*i+2])
        checksum += word

    while True:
        carry = checksum >> 16
        if carry:
            checksum = (checksum & 0xffff) + carry
        else:
            break

    checksum = ~checksum & 0xffff

    return struct.pack('!H', checksum)

套接字

编程发起网络通信,离不开套接字,收发 ICMP 报文当然也不例外。我们需要创建一个原始套接字,协议类型为 IPPROTO_ICMP :

1
2
3
from socket import socket, AF_INET, SOCK_RAW, IPPROTO_ICMP

s = socket(AF_INET, SOCK_RAW, IPPROTO_ICMP)

调用 sendto 系统调用发送 ICMP 报文:

1
s.sendto(icmp, 0, ('xxx.xxx.xxx.xxx', 0))

其中,第一个参数为封装好的 ICMP 报文; 第二个参数为发送标志位,无特殊要求一般填 0 ; 第三个参数为目的 IP 地址-端口对,端口这里填 0

类似地,调用 recvfrom 系统调用即可接收 ICMP 报文:

1
2
ip, (src_ip, _) = s.recvfrom(1500)
icmp = ip[20:]

参数为接收缓冲区大小,这里用 1500 刚好是一个典型的 MTU 大小。 注意到, recvfrom 系统调用返回 IP 报文,去掉前 20 字节的 IP 头部便得到 ICMP 报文。

注意,创建 原始套接字 ( SOCK_RAW )需要超级用户权限。

代码实现

掌握基本原理后,便可着手编写代码了。

首先,实现 pack_icmp_echo_request 函数,用于封装 ICMP 回显请求 报文:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def pack_icmp_echo_request(ident, seq, payload):
    pseudo = struct.pack(
        '!BBHHH',
        8,
        0,
        0,
        ident,
        seq,
    ) + payload
    checksum = calculate_checksum(pseudo)
    return pseudo[:2] + checksum + pseudo[4:]
  1. 2-9 行封装用于计算校验和的 伪报文
    • 类型 字段为 8
    • 代码 字段为 0
    • 校验和 字段为 0
    • 标识符序号 以及 数据负载 字段由参数指定;
  2. 10 行调用 calculate_checksum 函数计算 校验和
  3. 11 行替换伪报文中的校验和并返回。

对应地,实现 unpack_icmp_echo_reply 用于解析 ICMP 回显答复 报文:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def unpack_icmp_echo_reply(icmp):
    _type, code, _, ident, seq, = struct.unpack(
        '!BBHHH',
        icmp[:8]
    )
    if _type != 0:
        return
    if code != 0:
        return

    payload = icmp[8:]

    return ident, seq, payload

接着,实现 send_routine 用于循环发送 ICMP 回显请求 报文:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
def send_routine(sock, addr, ident, magic, stop):
    # first sequence no
    seq = 1

    while not stop:
        # currrent time
        sending_ts = time.time()

        # packet current time to payload
        # in order to calculate round trip time from reply
        payload = struct.pack('!d', sending_ts) + magic

        # pack icmp packet
        icmp = pack_icmp_echo_request(ident, seq, payload)

        # send it
        sock.sendto(icmp, 0, (addr, 0))

        seq += 1
        time.sleep(1)

该函数需要 5 个参数,分别如下:

  • sock ,用于发送报文的 套接字
  • addr ,目标 IP 地址
  • ident标识符
  • magic ,打包在数据负载中的魔性字符串;
  • stop ,停止发送标识;
  1. 3 行定义 报文序号 ,从 1 开始递增; 接着是发送循环,不停发包,每次相隔一秒;
  2. 7 行获取 发送时间戳 ; 第 11 行将时间戳以及魔性字符串打包成 数据负载
  3. 14 行调用 pack_icmp_echo_request 封装 回显请求 报文;
  4. 17 行调用 sendto 系统调用 发送报文 ; 第 19-20 行自增发送序号并等待一秒;

同样,实现 recv_routine 函数用于循环接收 ICMP 回显答复 报文:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
def recv_routine(sock, ident, magic):
    while True:
        # wait for another icmp packet
        ip, (src_addr, _) = sock.recvfrom(1500)

        # unpack it
        result = unpack_icmp_echo_reply(ip[20:])
        if not result:
            continue

        # print info
        _ident, seq, payload = result
        if _ident != ident:
            continue

        sending_ts, = struct.unpack('!d', payload[:8])
        print('%s seq=%d %5.2fms' % (
            src_addr,
            seq,
            (time.time()-sending_ts) * 1000,
        ))
  1. 4 行调用 recvfrom 系统调用接收 ICMP 报文;
  2. 7 行调用 unpack_icmp_echo_reply 解析报文
  3. 8-9 行忽略非回显答复报文;
  4. 13-14 行检查标识符并忽略非法报文(可能是响应其他进程的);
  5. 16 行从 数据负载 中取出 发送时间戳
  6. 17-21 行,计算 往返时间 并输出提示;

报文 发送接收 均实现完毕,如何让程序同时干两件事情呢? 可以选用 线程 方案:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def ping(addr):
    # create socket for sending and receiving icmp packet
    sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)

    # id field
    ident = os.getpid()
    # magic string to pad
    magic = b'1234567890'

    # sender thread stop flag
    # append anything to stop
    sender_stop = []

    # start sender thread
    # call send_routine function to send icmp forever
    args = (sock, addr, ident, magic, sender_stop,)
    sender = threading.Thread(target=send_routine, args=args)
    sender.start()

    try:
        # receive icmp reply forever
        recv_routine(sock, ident, magic)
    except KeyboardInterrupt:
        pass

    # tell sender thread to stop
    sender_stop.append(True)

    # clean sender thread
    sender.join()

    print()
  1. 3 行创建用于发送、接收报文的 套接字
  2. 6 行获取进程 PID 作为 标识符
  3. 16-18 行启动一个 子线程 执行 报文发送 函数;
  4. 20-24主线程 执行 报文接收 函数直至用户按下 ctrl-C
  5. 27 行程序退出前,通知发送线程退出并回收线程资源( join );

将以上所有代码片段组装在一起,便得到 ping.py 命令。 迫不及待想运行一下:

1
2
3
4
5
$ gcc -o ping ping.c
$ sudo ./ping 8.8.8.8
8.8.8.8 seq=1 25.70ms
8.8.8.8 seq=2 25.28ms
8.8.8.8 seq=3 25.26ms

It works!

完整源码

扩展阅读

小菜学网络】系列文章首发于公众号【小菜学编程】,敬请关注:

【小菜学网络】系列文章首发于公众号【小菜学编程】,敬请关注: