| % Regression tests for Linux only |
| |
| # More information at http://www.secdev.org/projects/UTscapy/ |
| |
| |
| ############ |
| ############ |
| |
| + Linux only test |
| |
| = L3RawSocket |
| ~ netaccess IP TCP linux needs_root |
| |
| # Send one TCP SYN to www.google.com port 80 inside the no_debug_dissector() context and sanity-check the IP layer of the answer. |
| # NOTE(review): if nothing answers within the 3 second timeout, x is presumably None and the asserts below error out instead of failing cleanly - confirm. |
| with no_debug_dissector(): |
| x = sr1(IP(dst="www.google.com")/TCP(sport=RandShort(), dport=80, flags="S"),timeout=3) |
| |
| x |
| # ottl() must be one of the listed values and hops() must lie in the range 0..126. |
| assert x[IP].ottl() in [32, 64, 128, 255] |
| assert 0 <= x[IP].hops() <= 126 |
| |
| # TODO: fix and re-enable the test below (it randomly gets stuck) |
| # example of a stuck run: https://travis-ci.org/secdev/scapy/jobs/247473497 |
| |
| #= Supersocket _flush_fd |
| #~ needs_root linux |
| # |
| #import select |
| # |
| #from scapy.arch.linux import _flush_fd |
| #socket = conf.L2listen() |
| #select.select([socket],[],[],2) |
| #_flush_fd(socket.ins) |
| |
| = Interface aliases & sub-interfaces |
| ~ linux needs_root |
| |
| # Create a dummy interface (scapy0), an alias on it (scapy0:0) and a VLAN sub-interface (scapy0.42), then check what conf.route reports after each resync. |
| import os |
| exit_status = os.system("ip link add name scapy0 type dummy") |
| exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0") |
| exit_status = os.system("ip link set scapy0 up") |
| # exit_status is overwritten by every call, so only the result of the ifconfig alias command below decides whether the checks run. |
| exit_status = os.system("ifconfig scapy0:0 inet 198.51.100.1/24 up") |
| if exit_status == 0: |
| exit_status = os.system("ip addr show scapy0") |
| print(get_if_list()) |
| conf.route.resync() |
| print(conf.route.routes) |
| # The alias address must be used as source for its /24, and the route must be reported on scapy0. |
| assert conf.route.route("198.51.100.254") == ("scapy0", "198.51.100.1", "0.0.0.0") |
| # 3325256704 and 4294967040 are 198.51.100.0 and 255.255.255.0 written as integers. |
| route_alias = (3325256704, 4294967040, "0.0.0.0", "scapy0", "198.51.100.1", 0) |
| assert route_alias in conf.route.routes |
| # Add VLAN 42 on top of scapy0 plus a host route through a gateway in the VLAN's subnet. |
| exit_status = os.system("ip link add link scapy0 name scapy0.42 type vlan id 42") |
| exit_status = os.system("ip addr add 203.0.113.42/24 dev scapy0.42") |
| exit_status = os.system("ip link set scapy0.42 up") |
| exit_status = os.system("ip route add 192.0.2.43/32 via 203.0.113.41") |
| print(get_if_list()) |
| conf.route.resync() |
| print(conf.route.routes) |
| assert conf.route.route("192.0.2.43") == ("scapy0.42", "203.0.113.42", "203.0.113.41") |
| # 3221226027 and 4294967295 are 192.0.2.43 and 255.255.255.255 written as integers. |
| route_specific = (3221226027, 4294967295, "203.0.113.41", "scapy0.42", "0.0.0.0", 0) |
| assert route_specific in conf.route.routes |
| assert conf.route.route("203.0.113.42") == ('scapy0.42', '203.0.113.42', '0.0.0.0') |
| assert conf.route.route("203.0.113.43") == ('scapy0.42', '203.0.113.42', '0.0.0.0') |
| # Cleanup. NOTE(review): there is no try/finally, so this line is skipped whenever an assert above fails; presumably it belongs to the `if` body (indentation is not visible here). |
| exit_status = os.system("ip link del name dev scapy0") |
| else: |
| # The alias could not be created: nothing is checked. |
| assert True |
| |
| |
| = Test scoped interface addresses |
| ~ linux needs_root |
| |
| # Check that a "%iface" scope suffix on a destination address selects the source MAC and source IP of that interface, for IPv4 and IPv6. |
| import os |
| # NOTE(review): the next line assigns with "=" again, so the exit status of the first "ip link add" is discarded; only the later "|=" calls accumulate. |
| exit_status = os.system("ip link add name scapy0 type dummy") |
| exit_status = os.system("ip link add name scapy1 type dummy") |
| exit_status |= os.system("ip addr add 192.0.2.1/24 dev scapy0") |
| exit_status |= os.system("ip addr add 192.0.3.1/24 dev scapy1") |
| exit_status |= os.system("ip link set scapy0 address 00:01:02:03:04:05 multicast on up") |
| exit_status |= os.system("ip link set scapy1 address 06:07:08:09:10:11 multicast on up") |
| assert exit_status == 0 |
| |
| # Make Scapy pick up the two new interfaces and their routes. |
| conf.ifaces.reload() |
| conf.route.resync() |
| conf.route6.resync() |
| |
| conf.route6 |
| |
| try: |
| # IPv4: plain scoped address on each interface, then a scoped address that also carries a /24 suffix |
| a = Ether()/IP(dst="224.0.0.1%scapy0") |
| assert a[Ether].src == "00:01:02:03:04:05" |
| assert a[IP].src == "192.0.2.1" |
| b = Ether()/IP(dst="224.0.0.1%scapy1") |
| assert b[Ether].src == "06:07:08:09:10:11" |
| assert b[IP].src == "192.0.3.1" |
| c = Ether()/IP(dst="224.0.0.1/24%scapy1") |
| assert c[Ether].src == "06:07:08:09:10:11" |
| assert c[IP].src == "192.0.3.1" |
| # IPv6: same three cases; the expected sources are the EUI-64 link-local addresses derived from the MAC addresses set above |
| a = Ether()/IPv6(dst="ff02::fb%scapy0") |
| assert a[Ether].src == "00:01:02:03:04:05" |
| assert a[IPv6].src == "fe80::201:2ff:fe03:405" |
| b = Ether()/IPv6(dst="ff02::fb%scapy1") |
| assert b[Ether].src == "06:07:08:09:10:11" |
| assert b[IPv6].src == "fe80::407:8ff:fe09:1011" |
| c = Ether()/IPv6(dst="ff02::fb/30%scapy1") |
| assert c[Ether].src == "06:07:08:09:10:11" |
| assert c[IPv6].src == "fe80::407:8ff:fe09:1011" |
| finally: |
| # Always remove both dummy interfaces and refresh Scapy's view; the exit statuses of the deletions are not checked. |
| exit_status = os.system("ip link del scapy0") |
| exit_status = os.system("ip link del scapy1") |
| conf.ifaces.reload() |
| conf.route.resync() |
| conf.route6.resync() |
| |
| = catch loopback device missing |
| ~ linux needs_root |
| |
| from unittest.mock import patch |
| |
| # can't remove the lo device (or its address) without causing trouble - use some pseudo dummy instead |
| # 'scapy_lo_x' is not created anywhere in this campaign; the test passes as long as read_routes() returns without raising. |
| |
| with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo_x'): |
| routes = read_routes() |
| |
| = catch loopback device no address assigned |
| ~ linux needs_root |
| |
| # Call read_routes() with conf.loopback_name patched to a dummy interface, first without and then with an IPv4 address assigned to it. |
| import os, socket |
| from unittest.mock import patch |
| |
| try: |
| exit_status = os.system("ip link add name scapy_lo type dummy") |
| assert exit_status == 0 |
| exit_status = os.system("ip link set dev scapy_lo up") |
| assert exit_status == 0 |
| |
| # First pass: scapy_lo is up but has no address yet; read_routes() only has to return without raising. |
| with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo'): |
| routes = read_routes() |
| |
| exit_status = os.system("ip addr add dev scapy_lo 10.10.0.1/24") |
| assert exit_status == 0 |
| |
| # Second pass: same call now that 10.10.0.1/24 is assigned. |
| with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo'): |
| routes = read_routes() |
| |
| # Keep only the scapy_lo routes, with destination and mask passed through ltoa(), sorted by destination. |
| lo_routes = [ |
| (ltoa(dst_int), ltoa(msk_int), gw_str, if_name, if_addr, metric) |
| for dst_int, msk_int, gw_str, if_name, if_addr, metric in routes |
| if if_name == "scapy_lo" |
| ] |
| lo_routes.sort(key=lambda x: x[0]) |
| |
| # 168427520, 168427521 and 168427775 are 10.10.0.0, 10.10.0.1 and 10.10.0.255 written as integers. |
| # NOTE(review): lo_routes holds ltoa() results while expected_routes holds raw integers, and the two lists are only printed, never compared - confirm whether an assert is missing. |
| expected_routes = [ |
| (168427520, 4294967040, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0), |
| (168427521, 4294967295, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0), |
| (168427775, 4294967295, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0), |
| ] |
| print(lo_routes) |
| print(expected_routes) |
| finally: |
| # Always remove the dummy interface; a failed removal fails the test. |
| exit_status = os.system("ip link del dev scapy_lo") |
| assert exit_status == 0 |
| |
| = IPv6 link-local address selection |
| |
| conf.ifaces._add_fake_iface("scapy0", 'e2:39:91:79:19:10') |
| |
| from unittest.mock import patch |
| conf.route6.routes = [('fe80::', 64, '::', 'scapy0', ['fe80::e039:91ff:fe79:1910'], 256)] |
| conf.route6.ipv6_ifaces = set(['scapy0']) |
| bck_conf_iface = conf.iface |
| conf.iface = "scapy0" |
| |
| p = Ether()/IPv6(dst="ff02::1")/ICMPv6NIQueryName(data="ff02::1") |
| print(p.sprintf("%Ether.src% > %Ether.dst%\n%IPv6.src% > %IPv6.dst%")) |
| ip6_ll_address = 'fe80::e039:91ff:fe79:1910' |
| print(p[IPv6].src, ip6_ll_address) |
| assert p[IPv6].src == ip6_ll_address |
| mac_address = 'e2:39:91:79:19:10' |
| print(p[Ether].src, mac_address) |
| assert p[Ether].src == mac_address |
| |
| conf.iface = bck_conf_iface |
| conf.route6.resync() |
| |
| = IPv6 - check OS routes |
| ~ linux ipv6 |
| |
| # When in6_getifaddr() reports at least one address, every reported address must pass in6_isvalid() and every interface in conf.route6.ipv6_ifaces must appear among the reported interfaces. |
| # Nothing is checked when in6_getifaddr() returns an empty result. |
| addrs = in6_getifaddr() |
| if addrs: |
| # Element 0 of each entry is validated as the address; element 2 is used as the interface below. |
| assert all(in6_isvalid(addr[0]) for addr in in6_getifaddr()), 'invalid ipv6 address' |
| ifaces6 = [addr[2] for addr in in6_getifaddr()] |
| assert all(iface in ifaces6 for iface in conf.route6.ipv6_ifaces), 'ipv6 interface has route but no real' |
| |
| |
| = veth interface error handling |
| ~ linux needs_root veth |
| |
| from scapy.arch.linux import VEthPair |
| |
| # Creating a veth pair with an over-long interface name is expected to raise subprocess.CalledProcessError; any other outcome fails the test. |
| # NOTE(review): `subprocess` is not imported in this test - presumably it is already available in the campaign namespace; confirm. |
| try: |
| veth = VEthPair('this_IF_name_is_to_long_and_will_cause_an_error', 'veth_scapy_1') |
| veth.setup() |
| # Reaching this line means no error was raised; the AssertionError is then caught by the generic handler below, which fails the test as well. |
| assert False |
| except subprocess.CalledProcessError: |
| pass |
| except Exception: |
| assert False |
| |
| = veth interface usage - ctx manager |
| ~ linux needs_root veth |
| |
| # Create a veth pair through the VEthPair context manager, send two frames with EtherType 0xbeef into one end while a thread sniffs on the other end, then check that both interfaces are gone afterwards. |
| from threading import Condition |
| |
| cond_started = Condition() |
| |
| # Passed to sniff() as started_callback: notifies the main thread that is waiting on cond_started. |
| def _sniffer_started(): |
| |
| global cond_started |
| cond_started.acquire() |
| cond_started.notify() |
| cond_started.release() |
| |
| # Take the condition's lock before the sniffer thread exists: _sniffer_started() then blocks in acquire() until wait() below releases the lock, so its notify() cannot be missed. |
| cond_started.acquire() |
| |
| try: |
| with VEthPair('veth_scapy_0', 'veth_scapy_1') as veth: |
| if_list = get_if_list() |
| assert ('veth_scapy_0' in if_list) |
| assert ('veth_scapy_1' in if_list) |
| frm_count = 0 |
| def _sniffer(): |
| sniffed = sniff(iface='veth_scapy_1', |
| store=True, |
| count=2, |
| lfilter=lambda p: Ether in p and p[Ether].type == 0xbeef, |
| started_callback=_sniffer_started, |
| timeout=3) |
| # NOTE(review): frm_count is set to the constant 2 once sniff() returns, not to len(sniffed); the assert on frm_count therefore only shows that sniff() returned before join(1) gave up. |
| global frm_count |
| frm_count = 2 |
| # NOTE(review): `Thread` is not imported in this test - presumably it is already available in the campaign namespace; confirm. |
| t_sniffer = Thread(target=_sniffer, name="linux.uts sniff veth_scapy_1") |
| t_sniffer.start() |
| cond_started.wait() |
| sendp(Ether(type=0xbeef)/Raw(b'0123456789'), |
| iface='veth_scapy_0', |
| count=2) |
| # Wait at most one second for the sniffer thread to finish. |
| t_sniffer.join(1) |
| assert frm_count == 2 |
| |
| # Both interfaces are expected to be gone here - presumably this runs after the `with` block has exited (indentation is not visible). |
| if_list = get_if_list() |
| assert ('veth_scapy_0' not in if_list) |
| assert ('veth_scapy_1' not in if_list) |
| # Both handlers turn any exception raised above, including a failed assert, into a bare assertion failure. |
| except subprocess.CalledProcessError: |
| assert False |
| except Exception: |
| assert False |
| |
| = veth interface usage - manual interface handling |
| ~ linux needs_root veth |
| |
| # Same traffic check as the context-manager variant, but the veth pair is driven by explicit setup() / up() / down() / destroy() calls. |
| from threading import Condition |
| |
| cond_started = Condition() |
| |
| # Passed to sniff() as started_callback: notifies the main thread that is waiting on cond_started. |
| def _sniffer_started(): |
| |
| global cond_started |
| cond_started.acquire() |
| cond_started.notify() |
| cond_started.release() |
| |
| # Take the condition's lock before the sniffer thread exists, so the notify() in _sniffer_started() cannot happen before wait() is reached. |
| cond_started.acquire() |
| |
| |
| veth = VEthPair('veth_scapy_0', 'veth_scapy_1') |
| # Any exception while creating the pair or bringing it up fails the test. |
| try: |
| veth.setup() |
| veth.up() |
| except subprocess.CalledProcessError: |
| assert False |
| except Exception: |
| assert False |
| |
| # Refresh Scapy's interface list before looking for the two new interfaces. |
| conf.ifaces.reload() |
| if_list = get_if_list() |
| assert ('veth_scapy_0' in if_list) |
| assert ('veth_scapy_1' in if_list) |
| |
| frm_count = 0 |
| def _sniffer(): |
| sniffed = sniff(iface='veth_scapy_1', |
| store=True, |
| count=2, |
| lfilter=lambda p: Ether in p and p[Ether].type == 0xbeef, |
| started_callback=_sniffer_started, |
| timeout=3) |
| # NOTE(review): frm_count is set to the constant 2 once sniff() returns, not to len(sniffed); the assert on frm_count therefore only shows that sniff() returned before join(1) gave up. |
| global frm_count |
| frm_count = 2 |
| |
| # NOTE(review): `Thread` is not imported in this test - presumably it is already available in the campaign namespace; confirm. |
| t_sniffer = Thread(target=_sniffer, name="linux.uts sniff veth_scapy_1 2") |
| t_sniffer.start() |
| cond_started.wait() |
| sendp(Ether(type=0xbeef)/Raw(b'0123456789'), |
| iface='veth_scapy_0', |
| count=2) |
| # Wait at most one second for the sniffer thread to finish. |
| t_sniffer.join(1) |
| assert frm_count == 2 |
| |
| # Tear the pair down and verify that both interfaces have disappeared; any exception fails the test. |
| # NOTE(review): this teardown is not in a finally clause, so it is skipped when an assert above fails. |
| try: |
| veth.down() |
| veth.destroy() |
| |
| conf.ifaces.reload() |
| if_list = get_if_list() |
| assert ('veth_scapy_0' not in if_list) |
| assert ('veth_scapy_1' not in if_list) |
| except subprocess.CalledProcessError: |
| assert False |
| except Exception: |
| assert False |
| |
| |
| = Routing table, interface with no names |
| ~ linux |
| |
| from unittest.mock import patch |
| |
| # Run read_routes() with scapy.arch.linux.ioctl replaced by a mock; the test passes when read_routes() returns without raising. |
| @patch("scapy.arch.linux.ioctl") |
| def test_read_routes(mock_ioctl): |
| # Stand-in for ioctl(): request 0x8912 (presumably SIOCGIFCONF - confirm) hands back its third argument unchanged, every other request raises IOError. |
| def raise_ioerror(*args, **kwargs): |
| if args[1] == 0x8912: |
| return args[2] |
| raise IOError |
| mock_ioctl.side_effect = raise_ioerror |
| read_routes() |
| |
| test_read_routes() |
| |
| |
| = L3PacketSocket sendto exception |
| ~ linux needs_root |
| |
| from scapy.arch.linux import L3PacketSocket |
| |
| import socket |
| from unittest import mock |
| |
| # Make every sendto() call raise OSError(22, 2807) and check that L3PacketSocket.send() does not let the error propagate: the function only returns True when send() comes back normally. |
| # NOTE(review): the socket created here is never closed explicitly - confirm whether that matters for later tests. |
| @mock.patch("scapy.arch.linux.socket.socket.sendto") |
| def test_L3PacketSocket_sendto_python3(mock_sendto): |
| mock_sendto.side_effect = OSError(22, 2807) |
| l3ps = L3PacketSocket() |
| l3ps.send(IP(dst="8.8.8.8")/ICMP()) |
| return True |
| |
| assert test_L3PacketSocket_sendto_python3() |
| |
| = Test _interface_selection |
| ~ netaccess linux needs_root |
| |
| # Check the (interface, flag) pair returned by _interface_selection(): first for a public IPv4 destination, then for IPv4 and IPv6 destinations that fall inside prefixes configured on a dummy interface. |
| import os |
| from scapy.sendrecv import _interface_selection |
| assert _interface_selection(IP(dst="8.8.8.8")/UDP()) == (conf.iface, False) |
| # The exit statuses of the setup commands are stored but never checked. |
| exit_status = os.system("ip link add name scapy0 type dummy") |
| exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0") |
| exit_status = os.system("ip addr add fc00::/24 dev scapy0") |
| exit_status = os.system("ip link set scapy0 up") |
| conf.ifaces.reload() |
| conf.route.resync() |
| conf.route6.resync() |
| # The second tuple element is False for the IPv4 packet and True for the IPv6 packet. |
| assert _interface_selection(IP(dst="192.0.2.42")/UDP()) == ("scapy0", False) |
| assert _interface_selection(IPv6(dst="fc00::ae0d")/UDP()) == ("scapy0", True) |
| # Cleanup. NOTE(review): there is no try/finally, so scapy0 is left behind when one of the asserts above fails. |
| exit_status = os.system("ip link del name dev scapy0") |
| conf.ifaces.reload() |
| conf.route.resync() |
| conf.route6.resync() |
| |
| = Test 802.1Q sniffing |
| ~ linux needs_root veth |
| |
| # Put a VLAN 42 sub-interface on each end of a veth pair, send two ICMP packets through the left VLAN interface and check that a sniffer on the raw right end captures two packets carrying a Dot1Q layer. |
| from scapy.arch.linux import VEthPair |
| from threading import Thread, Condition |
| |
| # Passed to AsyncSniffer as started_callback, so the packets are sent from that callback. |
| def _send(): |
| sendp(Ether()/IP(dst="198.51.100.2")/ICMP(), iface='vlanleft0', count=2) |
| |
| |
| with VEthPair("left0", "right0") as veth: |
| # NOTE(review): `os` is not imported in this test - presumably an earlier test's import is still in the campaign namespace; the exit statuses below are never checked. |
| exit_status = os.system("ip link add link right0 name vlanright0 type vlan id 42") |
| exit_status = os.system("ip link add link left0 name vlanleft0 type vlan id 42") |
| exit_status = os.system("ip link set vlanright0 up") |
| exit_status = os.system("ip link set vlanleft0 up") |
| exit_status = os.system("ip addr add 198.51.100.1/24 dev vlanleft0") |
| exit_status = os.system("ip addr add 198.51.100.2/24 dev vlanright0") |
| sniffer = AsyncSniffer( |
| iface="right0", |
| lfilter=lambda p: Dot1Q in p, |
| count=2, |
| timeout=5, |
| started_callback=_send, |
| ) |
| sniffer.start() |
| # Wait at most one second; a sniffer that is still running afterwards is stopped and reported as an error. |
| sniffer.join(1) |
| if sniffer.running: |
| sniffer.stop() |
| raise Scapy_Exception("Sniffer did not stop !") |
| else: |
| results = sniffer.results |
| |
| |
| # `results` is only assigned when the sniffer stopped by itself. |
| assert len(results) == 2 |
| assert all(Dot1Q in x for x in results) |
| |
| |
| = Reload interfaces & routes |
| |
| # Final step of the campaign: reload the interface list and resync both routing tables after the tests above created and removed interfaces. |
| conf.ifaces.reload() |
| conf.route.resync() |
| conf.route6.resync() |