| % send, sniff, sr* tests for Scapy |
| |
| ~ needs_root |
| |
| ############ |
| ############ |
| + Test bridge_and_sniff() using tap sockets |
| |
| ~ tap |
| |
| = Create two tap interfaces |
| |
| import subprocess |
| from threading import Thread |
| |
| tap0, tap1 = [TunTapInterface("tap%d" % i) for i in range(2)] |
| |
| chk_kwargs = {"timeout": 3} |
| |
| if LINUX: |
| for i in range(2): |
| assert subprocess.check_call(["ip", "link", "set", "tap%d" % i, "up"], **chk_kwargs) == 0 |
| else: |
| for i in range(2): |
| assert subprocess.check_call(["ifconfig", "tap%d" % i, "up"], **chk_kwargs) == 0 |
| |
| = Run a sniff thread on the tap1 **interface** |
| * It will terminate when 5 IP packets from 192.0.2.1 have been sniffed |
| started = threading.Event() |
| t_sniff = Thread( |
| target=sniff, |
| kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary, |
| "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1", |
| "started_callback": started.set}, |
| name="tests sniff 1") |
| t_sniff.start() |
| started.wait(timeout=5) |
| |
| = Run a bridge_and_sniff thread between the taps **sockets** |
| * It will terminate when 5 IP packets from 192.0.2.1 have been forwarded |
| started = threading.Event() |
| t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1), |
| kwargs={"store": False, "count": 5, 'prn': Packet.summary, |
| "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1", |
| "started_callback": started.set}, |
| name="tests bridge_and_sniff 1") |
| t_bridge.start() |
| started.wait(timeout=5) |
| |
| = Send five IP packets from 192.0.2.1 to the tap0 **interface** |
| sendp([Ether(dst=ETHER_BROADCAST) / IP(src="192.0.2.1") / ICMP()], iface="tap0", |
| count=5) |
| |
| = Wait for the threads |
| t_bridge.join(5) |
| t_sniff.join(5) |
| assert not t_bridge.is_alive() |
| assert not t_sniff.is_alive() |
| |
| = Run a sniff thread on the tap1 **interface** |
| * It will terminate when 5 IP packets from 198.51.100.1 have been sniffed |
| started = threading.Event() |
| t_sniff = Thread( |
| target=sniff, |
| kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary, |
| "lfilter": lambda p: IP in p and p[IP].src == "198.51.100.1", |
| "started_callback": started.set}, |
| name="tests sniff 2") |
| t_sniff.start() |
| started.wait(timeout=5) |
| |
| = Run a bridge_and_sniff thread between the taps **sockets** |
| * It will "NAT" packets from 192.0.2.1 to 198.51.100.1 and will terminate when 5 IP packets have been forwarded |
| def nat_1_2(pkt): |
| if IP in pkt and pkt[IP].src == "192.0.2.1": |
| pkt[IP].src = "198.51.100.1" |
| del pkt[IP].chksum |
| return pkt |
| return False |
| |
| started = threading.Event() |
| t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1), |
| kwargs={"store": False, "count": 5, 'prn': Packet.summary, |
| "xfrm12": nat_1_2, |
| "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1", |
| "started_callback": started.set}, |
| name="tests bridge_and_sniff 2") |
| t_bridge.start() |
| started.wait(timeout=5) |
| |
| = Send five IP packets from 192.0.2.1 to the tap0 **interface** |
| sendp([Ether(dst=ETHER_BROADCAST) / IP(src="192.0.2.1") / ICMP()], iface="tap0", |
| count=5) |
| |
| = Wait for the threads |
| t_bridge.join(5) |
| t_sniff.join(5) |
| assert not t_bridge.is_alive() |
| assert not t_sniff.is_alive() |
| |
| = Delete the tap interfaces |
| if conf.use_pypy: |
| # See https://pypy.readthedocs.io/en/latest/cpython_differences.html |
| tap0.close() |
| tap1.close() |
| else: |
| del tap0, tap1 |
| |
| |
| ############ |
| ############ |
| + Test bridge_and_sniff() using tun sockets |
| |
| ~ tun not_libpcap |
| |
| = Create two tun interfaces |
| |
| import subprocess |
| from threading import Thread |
| |
| tun0, tun1 = [TunTapInterface("tun%d" % i) for i in range(2)] |
| |
| chk_kwargs = {"timeout": 3} |
| |
| if LINUX: |
| for i in range(2): |
| assert subprocess.check_call(["ip", "link", "set", "tun%d" % i, "up"], **chk_kwargs) == 0 |
| assert subprocess.check_call([ |
| "ip", "addr", "change", |
| "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"], **chk_kwargs) == 0 |
| else: |
| for i in range(2): |
| assert subprocess.check_call(["ifconfig", "tun%d" % i, "up"], **chk_kwargs) == 0 |
| assert subprocess.check_call(["ifconfig", "tun0", "192.0.2.1", "192.0.2.2"], **chk_kwargs) == 0 |
| |
| = Run a sniff thread on the tun1 **interface** |
| * It will terminate when 5 IP packets from 192.0.2.1 have been sniffed |
| started = threading.Event() |
| t_sniff = Thread(target=sniff, |
| kwargs={"iface": "tun1", "count": 5, |
| "prn": Packet.summary, |
| "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1", |
| "started_callback": started.set}, |
| name="tests sniff 3") |
| |
| t_sniff.start() |
| started.wait(timeout=5) |
| |
| = Run a bridge_and_sniff thread between the tuns **sockets** |
| * It will terminate when 5 IP packets from 192.0.2.1 have been forwarded. |
| started = threading.Event() |
| t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1), |
| kwargs={"store": False, "count": 5, 'prn': Packet.summary, |
| "xfrm12": lambda pkt: pkt, |
| "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1", |
| "started_callback": started.set}, |
| name="tests bridge_and_sniff 3") |
| t_bridge.start() |
| started.wait(timeout=5) |
| |
| = Send five IP packets from 192.0.2.1 to the tun0 **interface** |
| conf.route.add(net="192.0.2.2/32", dev="tun0") |
| send([IP(src="192.0.2.1", dst="192.0.2.2") / ICMP()], count=5, iface="tun0") |
| conf.route.delt(net="192.0.2.2/32", dev="tun0") |
| |
| = Wait for the threads |
| t_bridge.join(5) |
| t_sniff.join(5) |
| assert not t_bridge.is_alive() |
| assert not t_sniff.is_alive() |
| |
| = Run a sniff thread on the tun1 **interface** |
| * It will terminate when 5 IP packets from 198.51.100.1 have been sniffed |
| started = threading.Event() |
| t_sniff = Thread(target=sniff, |
| kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary, |
| "lfilter": lambda p: IP in p and p[IP].src == "198.51.100.1", |
| "started_callback": started.set}, |
| name="tests sniff 4") |
| |
| t_sniff.start() |
| started.wait(timeout=5) |
| |
| = Run a bridge_and_sniff thread between the tuns **sockets** |
| * It will "NAT" packets from 192.0.2.1 to 198.51.100.1 and will terminate when 5 IP packets have been forwarded |
| def nat_1_2(pkt): |
| if IP in pkt and pkt[IP].src == "192.0.2.1": |
| pkt[IP].src = "198.51.100.1" |
| del pkt[IP].chksum |
| return pkt |
| return False |
| |
| started = threading.Event() |
| t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1), |
| kwargs={"store": False, "count": 5, 'prn': Packet.summary, |
| "xfrm12": nat_1_2, |
| "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1", |
| "started_callback": started.set}, |
| name="tests bridge_and_sniff 4") |
| t_bridge.start() |
| started.wait(timeout=5) |
| |
| = Send five IP packets from 192.0.2.1 to the tun0 **interface** |
| conf.route.add(net="192.0.2.2/32", dev="tun0") |
| send([IP(src="192.0.2.1", dst="192.0.2.2") / ICMP()], count=5, iface="tun0") |
| conf.route.delt(net="192.0.2.2/32", dev="tun0") |
| |
| = Wait for the threads |
| t_bridge.join(5) |
| t_sniff.join(5) |
| assert not t_bridge.is_alive() |
| assert not t_sniff.is_alive() |
| |
| = Delete the tun interfaces |
| if conf.use_pypy: |
| # See https://pypy.readthedocs.io/en/latest/cpython_differences.html |
| tun0.close() |
| tun1.close() |
| else: |
| del tun0, tun1 |
| |
| |
| ############ |
| ############ |
| + Test bridge_and_sniff() using veth pairs |
| ~ linux needs_root veth |
| |
| = Ensure bridge_and_sniff does not close sockets if data is sent within xfrm on ingress interface |
| |
| from scapy.arch.linux import VEthPair |
| |
| with VEthPair('a_0', 'a_1') as veth_0: |
| with VEthPair('b_0', 'b_1') as veth_1: |
| xfrm_count = { |
| 'a_0':0, |
| 'b_0': 0 |
| } |
| def xfrm_x(pkt): |
| pkt_tx = pkt.copy() |
| ether_lyr = pkt_tx[Ether] |
| ether_lyr.type = 0x1234 # we send to peer interface - avoid loop |
| # send on receiving interface - triggers return None on recv() in L2Socket |
| sendp(pkt_tx, iface=pkt.sniffed_on) |
| global xfrm_count |
| xfrm_count[pkt.sniffed_on] = xfrm_count[pkt.sniffed_on] + 1 |
| return True |
| started = threading.Event() |
| t_bridge = Thread(target=bridge_and_sniff, |
| args=('a_0', 'b_0'), |
| kwargs={ |
| 'xfrm12': xfrm_x, |
| 'xfrm21': xfrm_x, |
| 'store': False, |
| 'count': 4, |
| 'lfilter': lambda p: Ether in p and p[Ether].type == 0xbeef, |
| "started_callback": started.set}, |
| name="tests bridge_and_sniff VEthPair") |
| t_bridge.start() |
| started.wait(timeout=5) |
| # send frames in both directions |
| for if_name in ['a_1', 'b_1', 'a_1', 'b_1']: |
| sendp([Ether(type=0xbeef) / |
| Raw(b'On a scale from one to ten what is your favourite colour of the alphabet?')], |
| iface=if_name) |
| t_bridge.join(1) |
| # now test if the socket used in bridge_and_sniff() stayed alive the whole time |
| assert (xfrm_count['a_0'] == 2) |
| assert (xfrm_count['b_0'] == 2) |
| |
| |
| ############ |
| ############ |
| + Test arpleak() using a tap socket |
| |
| ~ tap tcpdump |
| |
| = Create a tap interface |
| |
| from unittest import mock |
| import struct |
| import subprocess |
| from threading import Thread |
| import time |
| |
| tap0 = TunTapInterface("tap0") |
| |
| chk_kwargs = {"timeout": 3} |
| |
| if LINUX: |
| assert subprocess.check_call(["ip", "link", "set", "tap0", "up"], **chk_kwargs) == 0 |
| else: |
| assert subprocess.check_call(["ifconfig", "tap0", "up"], **chk_kwargs) == 0 |
| |
| = Check for arpleak |
| |
| def answer_arp_leak(pkt): |
| mymac = b"\x00\x01\x02\x03\x04\x06" |
| myip = b"\xc0\x00\x02\x02" # 192.0.2.2 |
| if not ARP in pkt: |
| return |
| e_src = pkt.src |
| pkt = raw(pkt[ARP]) |
| if pkt[:4] != b'\x00\x01\x08\x00': |
| print("Invalid ARP") |
| return |
| hwlen, plen, op = struct.unpack('>BBH', pkt[4:8]) |
| if op != 1: |
| print("Invalid ARP op") |
| return |
| fmt = ('%ds%ds' % (hwlen, plen)) * 2 |
| hwsrc, psrc, hwdst, pdst = struct.unpack(fmt, |
| pkt[8:8 + (plen + hwlen) * 2]) |
| if pdst[:4] != myip[:plen]: |
| print("Invalid ARP pdst %r" % pdst) |
| return |
| ans = Ether(dst=e_src, src=mymac, type=0x0806) |
| ans /= (b'\x00\x01\x08\x00' + |
| struct.pack('>BBH' + fmt, |
| hwlen, plen, 2, mymac, myip, hwsrc, psrc)) |
| tap0.send(ans) |
| print('Answered!') |
| |
| started = threading.Event() |
| t_answer = Thread( |
| target=sniff, |
| kwargs={"prn": answer_arp_leak, "timeout": 10, "store": False, |
| "opened_socket": tap0, |
| "started_callback": started.set}, |
| name="tests answer_arp_leak") |
| |
| t_answer.start() |
| started.wait(timeout=5) |
| |
| @mock.patch("scapy.layers.l2.get_if_addr") |
| @mock.patch("scapy.layers.l2.get_if_hwaddr") |
| def test_arpleak(mock_get_if_hwaddr, mock_get_if_addr, hwlen=255, plen=255): |
| conf.route.ifadd("tap0", "192.0.2.0/24") |
| mock_get_if_addr.side_effect = lambda _: "192.0.2.1" |
| mock_get_if_hwaddr.side_effect = lambda _: "00:01:02:03:04:05" |
| return arpleak("192.0.2.2/31", timeout=2, hwlen=hwlen, plen=plen) |
| |
| ans, unans = test_arpleak() |
| assert len(ans) == 1 |
| assert len(unans) == 1 |
| ans, unans = test_arpleak(hwlen=6) |
| assert len(ans) == 1 |
| assert len(unans) == 1 |
| ans, unans = test_arpleak(plen=4) |
| assert len(ans) == 1 |
| assert len(unans) == 1 |
| |
| t_answer.join(15) |
| |
| if t_answer.is_alive(): |
| raise Exception("Test timed out") |
| |
| if conf.use_pypy: |
| # See https://pypy.readthedocs.io/en/latest/cpython_differences.html |
| tap0.close() |
| else: |
| del tap0 |
| |
| ##### |
| ##### |
| + Test sr() on multiple interfaces |
| |
| = Setup multiple linux interfaces and ranges |
| ~ linux needs_root dbg |
| |
| import os |
| exit_status = os.system("ip netns add blob0") |
| exit_status |= os.system("ip netns add blob1") |
| exit_status |= os.system("ip link add name scapy0.0 type veth peer name scapy0.1") |
| exit_status |= os.system("ip link add name scapy1.0 type veth peer name scapy1.1") |
| exit_status |= os.system("ip link set scapy0.1 netns blob0 up") |
| exit_status |= os.system("ip link set scapy1.1 netns blob1 up") |
| exit_status |= os.system("ip addr add 100.64.2.1/24 dev scapy0.0") |
| exit_status |= os.system("ip addr add 100.64.3.1/24 dev scapy1.0") |
| exit_status |= os.system("ip --netns blob0 addr add 100.64.2.2/24 dev scapy0.1") |
| exit_status |= os.system("ip --netns blob1 addr add 100.64.3.2/24 dev scapy1.1") |
| exit_status |= os.system("ip link set scapy0.0 up") |
| exit_status |= os.system("ip link set scapy1.0 up") |
| assert exit_status == 0 |
| |
| conf.ifaces.reload() |
| conf.route.resync() |
| |
| try: |
| pkts = sr(IP(dst=["100.64.2.2", "100.64.3.2"])/ICMP(), timeout=1)[0] |
| assert len(pkts) == 2 |
| assert pkts[0].answer.src in ["100.64.2.2", "100.64.3.2"] |
| assert pkts[1].answer.src in ["100.64.2.2", "100.64.3.2"] |
| finally: |
| e = os.system("ip netns del blob0") |
| e = os.system("ip netns del blob1") |
| conf.ifaces.reload() |
| conf.route.resync() |
| |
| |
| = sr() performance test |
| ~ linux needs_root veth not_pypy |
| |
| import subprocess |
| import shlex |
| |
| try: |
| # Create a dedicated network name space to simulate remote host |
| subprocess.check_call(shlex.split("sudo ip netns add scapy")) |
| # Create a virtual Ethernet pair to connect default and new NS |
| subprocess.check_call(shlex.split("sudo ip link add type veth")) |
| # Move veth1 to the new NS |
| subprocess.check_call(shlex.split("sudo ip link set veth1 netns scapy")) |
| # Setup vNIC in the default NS |
| subprocess.check_call(shlex.split("sudo ip link set veth0 up")) |
| subprocess.check_call(shlex.split("sudo ip addr add 192.168.168.1/24 dev veth0")) |
| # Setup vNIC in the dedicated NS |
| subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set lo up")) |
| subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set veth1 up")) |
| subprocess.check_call(shlex.split("sudo ip netns exec scapy ip addr add 192.168.168.2/24 dev veth1")) |
| # Perform test |
| conf.route.resync() |
| res, unansw = sr(IP(dst='192.168.168.2') / ICMP(seq=(1, 1000)), timeout=1, verbose=False) |
| finally: |
| try: |
| # Bring down the interfaces |
| subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set veth1 down")) |
| subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set lo down")) |
| # Delete the namespace |
| subprocess.check_call(shlex.split("sudo ip netns delete scapy")) |
| # Remove the virtual Ethernet pair |
| subprocess.check_call(shlex.split("sudo ip link delete veth0")) |
| except subprocess.CalledProcessError as e: |
| print(f"Error during cleanup: {e}") |
| |
| len(res) == 1000 |
| |