| % Regression tests for Scapy |
| |
| # More information at http://www.secdev.org/projects/UTscapy/ |
| |
| ############ |
| ############ |
| + Information on Scapy |
| |
| = Setup |
| # Helper for tests that expect a failure: calls c() and reports whether it |
| # raised an exception matching e. Returns True if c() raised e and False if |
| # c() returned normally; exceptions that do not match e propagate. |
| def expect_exception(e, c): |
| try: |
| c() |
| return False |
| except e: |
| return True |
| |
| = Get conf |
| ~ conf command |
| * Dump the current configuration |
| conf |
| |
| IP().src |
| conf.loopback_name |
| |
| = Test module version detection |
| ~ conf |
| |
| # Minimal stand-ins for imported modules: each one only carries the |
| # __version__ string that is handed to _version_checker below. |
| class FakeModule(object): |
| __version__ = "v1.12" |
| |
| class FakeModule2(object): |
| __version__ = "5.143.3.12" |
| |
| class FakeModule3(object): |
| __version__ = "v2.4.2.dev42" |
| |
| from scapy.config import _version_checker |
| |
| # "v1.12" is expected to satisfy a minimum of 1.11.5 but not 1.13. |
| assert _version_checker(FakeModule, (1,11,5)) |
| assert not _version_checker(FakeModule, (1,13)) |
| |
| # "5.143.3.12" is expected to satisfy 5.1 but not 5.143.4. |
| assert _version_checker(FakeModule2, (5, 1)) |
| assert not _version_checker(FakeModule2, (5, 143, 4)) |
| |
| # A ".dev42" suffix is expected not to get in the way of satisfying 2.4.2. |
| assert _version_checker(FakeModule3, (2, 4, 2)) |
| |
| = Check Scapy version |
| |
| from unittest import mock |
| |
| import scapy |
| from scapy import _parse_tag, _version_from_git_describe |
| from scapy.config import _version_checker |
| |
| # Fake Popen result: exit code 0 and a communicate() that returns a |
| # "git describe"-style string as its first element. |
| b = Bunch(returncode=0, communicate=lambda *args, **kargs: (b"v2.4.5rc1-261-g44b98e14", None)) |
| with mock.patch('scapy.subprocess.Popen', return_value=b): |
| with mock.patch('scapy.os.path.isdir', return_value=True): |
| class GitModuleScapy(object): |
| __version__ = _version_from_git_describe() |
| |
| # GH3847: with scapy.os.path.isdir patched to return False, |
| # _version_from_git_describe() is expected to raise ValueError. |
| with mock.patch('scapy.subprocess.Popen', return_value=b): |
| with mock.patch('scapy.os.path.isdir', return_value=False): |
| try: |
| _version_from_git_describe() |
| assert False |
| except ValueError: |
| pass |
| |
| # The faked describe output must be turned into '2.4.5rc1.dev261', which in |
| # turn has to satisfy a minimum version of 2.4.5. |
| assert GitModuleScapy.__version__ == '2.4.5rc1.dev261' |
| assert _version_checker(GitModuleScapy, (2, 4, 5)) |
| |
| = List layers |
| ~ conf command |
| ls() |
| |
| = List layers - advanced |
| ~ conf command |
| |
| with ContextManagerCaptureOutput() as cmco: |
| ls("IP", case_sensitive=True) |
| result_ls = cmco.get_output().split("\n") |
| |
| assert all("IP" in x for x in result_ls if x.strip()) |
| assert len(result_ls) >= 3 |
| |
| = List packet fields - ls |
| ~ command |
| |
| with ContextManagerCaptureOutput() as cmco: |
| ls(ARP(hwsrc="aa:aa:aa:aa:aa:aa", psrc="1.1.1.1")) |
| result_ls = cmco.get_output().split("\n") |
| |
| result_ls |
| assert result_ls[5] == "hwsrc : MultipleTypeField (SourceMACField, StrFixedLenField) = 'aa:aa:aa:aa:aa:aa' ('None')" |
| assert result_ls[6] == "psrc : MultipleTypeField (SourceIPField, SourceIP6Field, StrFixedLenField) = '1.1.1.1' ('None')" |
| |
| = List commands |
| ~ conf command |
| lsc() |
| |
| = List contribs |
| ~ command |
| # Captures what list_contrib() prints and checks that the http2 entry is |
| # present and that the listing has more than 40 lines. |
| def test_list_contrib(): |
| with ContextManagerCaptureOutput() as cmco: |
| list_contrib() |
| result_list_contrib = cmco.get_output() |
| assert "http2 : HTTP/2 (RFC 7540, RFC 7541) status=loads" in result_list_contrib |
| assert len(result_list_contrib.split('\n')) > 40 |
| |
| test_list_contrib() |
| |
| = Test packet show() on LatexTheme |
| % with LatexTheme |
| |
| # Packet with a single ByteField, just enough for show() to render. |
| class SmallPacket(Packet): |
| fields_desc = [ByteField("a", 0)] |
| |
| # Switch to LatexTheme for this check; the previous theme is saved here and |
| # put back on the last line of the test. |
| conf_color_theme = conf.color_theme |
| conf.color_theme = LatexTheme() |
| pkt = SmallPacket() |
| with ContextManagerCaptureOutput() as cmco: |
| pkt.show() |
| result = cmco.get_output().strip() |
| |
| # The captured show() output must be the LaTeX-escaped rendering below. |
| assert result == '\\#\\#\\#[ \\textcolor{red}{\\bf SmallPacket} ]\\#\\#\\#\n \\textcolor{blue}{a} = \\textcolor{purple}{0}' |
| conf.color_theme = conf_color_theme |
| |
| |
| = Test rfc() |
| ~ command |
| |
| # The first two lines of the figure are the bit-position rulers. |
| dat = rfc(IP, ret=True).split("\n") |
| assert dat[0].replace(" ", "").strip() == "0123" |
| assert "0123456789" in dat[1].replace(" ", "") |
| # NOTE(review): the pattern in the loop below also matches the empty string |
| # and re.match() is not anchored at the end, so that assertion holds for any |
| # line and cannot fail. |
| for l in dat: |
| # only upper case and +- |
| assert re.match(r"[A-Z+-]*", l) |
| |
| # Add a space before each + to avoid conflicts with UTscapy ! |
| # They will be stripped below |
| result = """ |
| 0 1 2 3 |
| 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| |VERSION| IHL | TOS | LEN | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | ID |FLAGS| FRAG | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | TTL | PROTO | CHKSUM | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | SRC | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | DST | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | OPTIONS | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| |
| Fig. IP |
| """.strip() |
| result = [x.strip() for x in result.split("\n")] |
| output = [x.strip() for x in rfc(IP, ret=True).strip().split("\n")] |
| assert result == output |
| |
| result = """ |
| 0 1 2 3 |
| 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | CODE | ID | LEN | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | TYPE |L|M|S|RES|VERSI| MESSAGE LEN | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | | DATA | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| |
| Fig. EAP_TTLS |
| """.strip() |
| result = [x.strip() for x in result.split("\n")] |
| output = [x.strip() for x in rfc(EAP_TTLS, ret=True).strip().split("\n")] |
| assert result == output |
| |
| |
| result = """ |
| 0 1 2 3 |
| 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| |VERSION| TC | FL | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | PLEN | NH | HLIM | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | SRC | |
| + + |
| | | |
| + + |
| | | |
| + + |
| | | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | DST | |
| + + |
| | | |
| + + |
| | | |
| + + |
| | | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| |
| Fig. IPv6 |
| """.strip() |
| result = [x.strip() for x in result.split("\n")] |
| output = [x.strip() for x in rfc(IPv6, ret=True).strip().split("\n")] |
| assert result == output |
| |
| |
| # Packet used to check how rfc() draws padding: two plain ShortFields, then |
| # a ByteField wrapped in PadField(..., 8) and a ShortField wrapped in |
| # PadField(..., 4). |
| class TestPad(Packet): |
| fields_desc = [ShortField("f0", 0), |
| ShortField("f1", 0), |
| PadField(ByteField("f2", 1), 8), |
| PadField(ShortField("f3", 0), 4)] |
| |
| |
| result = """ |
| 0 1 2 3 |
| 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | F0 | F1 | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | F2 | padding | |
| +-+-+-+-+-+-+-+-+ + |
| | | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| | F3 | padding | |
| +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |
| |
| Fig. TestPad |
| """.strip() |
| result = [x.strip() for x in result.split("\n")] |
| output = [x.strip() for x in rfc(TestPad, ret=True).strip().split("\n")] |
| assert result == output |
| |
| = Check that all contrib modules are well-configured |
| ~ command |
| list_contrib(_debug=True) |
| |
| = Configuration |
| ~ conf |
| conf.debug_dissector = True |
| |
| = Configuration conf.use_* LINUX |
| ~ linux |
| |
| try: |
| conf.use_bpf = True |
| assert False |
| except: |
| True |
| |
| assert not conf.use_bpf |
| |
| = Configuration conf.use_* WINDOWS |
| ~ windows |
| |
| try: |
| conf.use_bpf = True |
| assert False |
| except: |
| True |
| |
| assert not conf.use_bpf |
| |
| = Configuration conf.use_pcap |
| ~ linux libpcap |
| |
| # Only runs when pcap is currently off: switches conf.use_pcap on and back |
| # off, checking after each switch whether the provider of conf.iface reports |
| # libpcap. While it is on, every interface must either be libpcap-backed or |
| # report is_valid() == False. |
| if not conf.use_pcap: |
| assert not conf.iface.provider.libpcap |
| conf.use_pcap = True |
| assert conf.iface.provider.libpcap |
| for iface in conf.ifaces.values(): |
| assert iface.provider.libpcap or iface.is_valid() == False |
| conf.use_pcap = False |
| assert not conf.iface.provider.libpcap |
| |
| = Test layer filtering |
| ~ filter |
| |
| # Three-layer packet that is rebuilt from its bytes in the checks below. |
| pkt = NetflowHeader()/NetflowHeaderV5()/NetflowRecordV5() |
| |
| # With the filter limited to the two header layers, the record layer must |
| # not show up in the dissected packet. |
| conf.layers.filter([NetflowHeader, NetflowHeaderV5]) |
| assert NetflowRecordV5 not in NetflowHeader(bytes(pkt)) |
| |
| # Once the filter is removed the record layer must be dissected again. |
| conf.layers.unfilter() |
| assert NetflowRecordV5 in NetflowHeader(bytes(pkt)) |
| |
| |
| ########### |
| ########### |
| = UTscapy route check |
| * Check that UTscapy has correctly replaced the routes. Many tests won't work otherwise |
| |
| p = IP().src |
| p |
| assert p == "127.0.0.1" |
| |
| ############ |
| ############ |
| + Scapy functions tests |
| |
| = Interface related functions |
| |
| from unittest import mock |
| |
| conf.iface |
| |
| get_if_addr(conf.iface) |
| get_if_hwaddr(conf.iface) |
| |
| bytes_hex(get_if_raw_addr(conf.iface)) |
| |
| # Adds a fake interface through conf.ifaces._add_fake_iface so that the |
| # address helpers can be called on an interface name known to conf.ifaces. |
| def get_dummy_interface(): |
| """Register the fake interface dummy0 in conf.ifaces and return its name.""" |
| conf.ifaces._add_fake_iface("dummy0") |
| return "dummy0" |
| |
| get_if_raw_addr(get_dummy_interface()) |
| |
| get_if_list() |
| |
| get_working_if() |
| |
| get_if_raw_addr6(conf.iface) |
| |
| = More Interfaces related functions |
| |
| # Test name resolution |
| old = conf.iface |
| conf.iface = conf.iface.name |
| assert conf.iface == old |
| |
| assert isinstance(conf.iface, NetworkInterface) |
| assert conf.iface.is_valid() |
| |
| from unittest import mock |
| # Runs get_working_if() with conf.route.routes patched to an empty list and |
| # conf.ifaces.values patched to return no interfaces (rou is the mock for |
| # the latter); in that situation it must return None. |
| @mock.patch("scapy.interfaces.conf.route.routes", []) |
| @mock.patch("scapy.interfaces.conf.ifaces.values") |
| def _test_get_working_if(rou): |
| rou.side_effect = lambda: [] |
| assert get_working_if() is None |
| |
| assert conf.iface + "a" # left + |
| assert "hey! are you, ready to go ? %s" % conf.iface # format |
| assert "cuz you know the way to go" + conf.iface # right + |
| |
| _test_get_working_if() |
| |
| = Test conf.ifaces |
| |
| conf.iface |
| conf.ifaces |
| |
| assert conf.iface in conf.ifaces.values() |
| assert conf.ifaces.dev_from_index(conf.iface.index) == conf.iface |
| assert conf.ifaces.dev_from_networkname(conf.iface.network_name) == conf.iface |
| |
| conf.ifaces.data = {'a': NetworkInterface(InterfaceProvider(), {"name": 'a', "network_name": 'a', "description": 'a', "ips": ["127.0.0.1", "::1", "::2", "127.0.0.2"], "mac": 'aa:aa:aa:aa:aa:aa'})} |
| |
| # Capture the table printed by conf.ifaces.show() for the single fake |
| # interface installed above. |
| with ContextManagerCaptureOutput() as cmco: |
| conf.ifaces.show() |
| output = cmco.get_output() |
| |
| data = """ |
| Source Index Name MAC IPv4 IPv6 |
| Unknown 0 a aa:aa:aa:aa:aa:aa 127.0.0.1 ::1 |
| 127.0.0.2 ::2 |
| """.strip() |
| |
| # Compare line by line with surrounding whitespace stripped from each line. |
| output = [x.strip() for x in output.strip().split("\n")] |
| data = [x.strip() for x in data.strip().split("\n")] |
| |
| assert output == data |
| |
| conf.ifaces.reload() |
| |
| = Test extcap detection in conf.ifaces |
| ~ linux extcap |
| |
| import os |
| from scapy.libs.extcap import load_extcap |
| |
| _bkp_extcap = conf.prog.extcap_folders |
| _bkp_providers = conf.ifaces.providers.copy() |
| |
| conf.ifaces.providers.clear() |
| |
| # Create some sort of extcap parody program |
| extcapfld = get_temp_dir() |
| extcapprog = os.path.join(extcapfld, "runner.sh") |
| data = """#!/usr/bin/env python3 |
| |
| import struct |
| import argparse |
| parser = argparse.ArgumentParser() |
| parser.add_argument('--extcap-interfaces', action='store_true') |
| parser.add_argument('--capture', action='store_true') |
| parser.add_argument('--extcap-config', action='store_true') |
| parser.add_argument('--scan-follow-rsp', action='store_true') |
| parser.add_argument('--scan-follow-aux', action='store_true') |
| parser.add_argument('--extcap-interface', type=str) |
| parser.add_argument('--fifo', type=str) |
| |
| args = parser.parse_args() |
| if args.extcap_interfaces: |
| # List interfaces |
| print(bytes.fromhex("0a657874636170207b76657273696f6e3d342e312e317d7b646973706c61793d6e524620536e696666657220666f7220426c7565746f6f7468204c457d7b68656c703d68747470733a2f2f7777772e6e6f7264696373656d692e636f6d2f536f6674776172652d616e642d546f6f6c732f446576656c6f706d656e742d546f6f6c732f6e52462d536e69666665722d666f722d426c7565746f6f74682d4c457d0a696e74657266616365207b76616c75653d2f6465762f747479555342352d4e6f6e657d7b646973706c61793d6e524620536e696666657220666f7220426c7565746f6f7468204c457d0a636f6e74726f6c207b6e756d6265723d307d7b747970653d73656c6563746f727d7b646973706c61793d4465766963657d7b746f6f6c7469703d446576696365206c6973747d0a636f6e74726f6c207b6e756d6265723d317d7b747970653d73656c6563746f727d7b646973706c61793d4b65797d7b746f6f6c7469703d7d0a636f6e74726f6c207b6e756d6265723d327d7b747970653d737472696e677d7b646973706c61793d56616c75657d7b746f6f6c7469703d3620646967697420706173736b6579206f72203136206f7220333220627974657320656e6372797074696f6e206b657920696e2068657861646563696d616c207374617274696e67207769746820273078272c2062696720656e6469616e20666f726d61742e49662074686520656e7465726564206b65792069732073686f72746572207468616e203136206f722033322062797465732c2069742077696c6c206265207a65726f2d70616464656420696e2066726f6e74277d7b76616c69646174696f6e3d5c625e28285b302d395d7b367d297c2830785b302d39612d66412d465d7b312c36347d297c285b302d39412d46612d665d7b327d5b3a2d5d297b357d285b302d39412d46612d665d7b327d2920287075626c69637c72616e646f6d2929245c627d0a636f6e74726f6c207b6e756d6265723d337d7b747970653d737472696e677d7b646973706c61793d41647620486f707d7b64656661756c743d33372c33382c33397d7b746f6f6c7469703d4164766572746973696e67206368616e6e656c20686f702073657175656e63652e204368616e676520746865206f7264657220696e2077686963682074686520736e6966666572207377697463686573206164766572746973696e67206368616e6e656c732e2056616c6964206368616e6e656c73206172652033372c20333820616e642033392073657061726174656420627920636f6d6d612e7d7b76616c69646174696f6e3d5e5c732a282833377c33387c3339295c732a2c5c732a297b302c327d2833377
c33387c3339297b317d5c732a247d7b72657175697265643d747275657d0a636f6e74726f6c207b6e756d6265723d377d7b747970653d627574746f6e7d7b646973706c61793d436c6561727d7b746f6f6c746f703d436c656172206f722072656d6f7665206465766963652066726f6d20446576696365206c6973747d0a636f6e74726f6c207b6e756d6265723d347d7b747970653d627574746f6e7d7b726f6c653d68656c707d7b646973706c61793d48656c707d7b746f6f6c7469703d416363657373207573657220677569646520286c61756e636865732062726f77736572297d0a636f6e74726f6c207b6e756d6265723d357d7b747970653d627574746f6e7d7b726f6c653d726573746f72657d7b646973706c61793d44656661756c74737d7b746f6f6c7469703d52657365747320746865207573657220696e7465726661636520616e6420636c6561727320746865206c6f672066696c657d0a636f6e74726f6c207b6e756d6265723d367d7b747970653d627574746f6e7d7b726f6c653d6c6f676765727d7b646973706c61793d4c6f677d7b746f6f6c7469703d4c6f672070657220696e746572666163657d0a76616c7565207b636f6e74726f6c3d307d7b76616c75653d207d7b646973706c61793d416c6c206164766572746973696e6720646576696365737d7b64656661756c743d747275657d0a76616c7565207b636f6e74726f6c3d307d7b76616c75653d5b30302c30302c30302c30302c30302c30302c305d7d7b646973706c61793d466f6c6c6f772049524b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d307d7b646973706c61793d4c656761637920506173736b65797d7b64656661756c743d747275657d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d317d7b646973706c61793d4c6567616379204f4f4220646174617d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d327d7b646973706c61793d4c6567616379204c544b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d337d7b646973706c61793d5343204c544b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d347d7b646973706c61793d53432050726976617465204b65797d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d357d7b646973706c61793d49524b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d367d7b646973706c61793d416464204c4520616464726573737d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d377d7b646973706c61793d466f6c6c6f77204c4520616464726573737d").decode()) |
| elif args.extcap_interface and args.extcap_config: |
| # List config |
| print(bytes.fromhex("617267207b6e756d6265723d307d7b63616c6c3d2d2d6f6e6c792d6164766572746973696e677d7b646973706c61793d4f6e6c79206164766572746973696e67207061636b6574737d7b746f6f6c7469703d54686520736e69666665722077696c6c206f6e6c792063617074757265206164766572746973696e67207061636b6574732066726f6d207468652073656c6563746564206465766963657d7b747970653d626f6f6c666c61677d7b736176653d747275657d0a617267207b6e756d6265723d317d7b63616c6c3d2d2d6f6e6c792d6c65676163792d6164766572746973696e677d7b646973706c61793d4f6e6c79206c6567616379206164766572746973696e67207061636b6574737d7b746f6f6c7469703d54686520736e69666665722077696c6c206f6e6c792063617074757265206c6567616379206164766572746973696e67207061636b6574732066726f6d207468652073656c6563746564206465766963657d7b747970653d626f6f6c666c61677d7b736176653d747275657d0a617267207b6e756d6265723d327d7b63616c6c3d2d2d7363616e2d666f6c6c6f772d7273707d7b646973706c61793d46696e64207363616e20726573706f6e736520646174617d7b746f6f6c7469703d54686520736e69666665722077696c6c20666f6c6c6f77207363616e20726571756573747320616e64207363616e20726573706f6e73657320696e207363616e206d6f64657d7b747970653d626f6f6c666c61677d7b64656661756c743d747275657d7b736176653d747275657d0a617267207b6e756d6265723d337d7b63616c6c3d2d2d7363616e2d666f6c6c6f772d6175787d7b646973706c61793d46696e6420617578696c6961727920706f696e74657220646174617d7b746f6f6c7469703d54686520736e69666665722077696c6c20666f6c6c6f772061757820706f696e7465727320696e207363616e206d6f64657d7b747970653d626f6f6c666c61677d7b64656661756c743d747275657d7b736176653d747275657d0a617267207b6e756d6265723d337d7b63616c6c3d2d2d636f6465647d7b646973706c61793d5363616e20616e6420666f6c6c6f772064657669636573206f6e204c4520436f646564205048597d7b746f6f6c7469703d5363616e20666f72206465766963657320616e6420666f6c6c6f772061647665727469736572206f6e204c4520436f646564205048597d7b747970653d626f6f6c666c61677d7b64656661756c743d66616c73657d7b736176653d747275657d").decode()) |
| elif args.capture and args.extcap_interface and args.fifo: |
| # Capture |
| pkts = [ |
| bytes.fromhex("ffffffffffff00000000000008004500001c0001000040117cce7f0000017f0000010035003500080172") |
| ] |
| with open(args.fifo, "wb", 0) as fd: |
| # header |
| fd.write( |
| struct.pack( |
| "IHHIIII", |
| 0xa1b2c3d4, |
| 2, 4, 0, 0, 65535, 1 |
| ) |
| ) |
| for pkt in pkts: |
| fd.write(struct.pack("IIII", 0, 0, len(pkt), len(pkt))) |
| fd.write(bytes(pkt)) |
| else: |
| raise ValueError("Bad arguments") |
| """.strip() |
| with open(extcapprog, "w") as fd: |
| fd.write(data) |
| |
| print(data) |
| |
| # Make the generated script executable so it can be run as an extcap program. |
| os.chmod(extcapprog, 0o777) |
| |
| # Inject and load provider |
| conf.prog.extcap_folders = [extcapfld] |
| load_extcap() |
| print(conf.ifaces.providers) |
| conf.ifaces.reload() |
| |
| # Now do the tests |
| # The interface advertised by the script must be found by its network name, |
| # and sniffing on it must return the UDP packet the script writes to the fifo. |
| iface = conf.ifaces.dev_from_networkname('/dev/ttyUSB5-None') |
| assert iface.name == "nRF Sniffer for Bluetooth LE" |
| sock = iface.l2listen()(iface=iface) |
| pkts = sock.sniff(timeout=2) |
| sock.close() |
| assert UDP in pkts[0] |
| |
| # The "arg" entries of the extcap config must match the tuples below. |
| config = iface.get_extcap_config() |
| assert config["arg"] == [ |
| ('0', '--only-advertising', 'Only advertising packets', '', ''), |
| ('1', '--only-legacy-advertising', 'Only legacy advertising packets', '', ''), |
| ('2', '--scan-follow-rsp', 'Find scan response data', 'true', ''), |
| ('3', '--scan-follow-aux', 'Find auxiliary pointer data', 'true', ''), |
| ('3', '--coded', 'Scan and follow devices on LE Coded PHY', 'false', '') |
| ] |
| |
| # Restore |
| # Put back the extcap folders and providers saved at the start of the test. |
| conf.prog.extcap_folders = _bkp_extcap |
| conf.ifaces.providers = _bkp_providers |
| conf.ifaces.reload() |
| |
| = Test read_routes6() - default output |
| |
| # routes6 (and iflist, when routes were found) are reused by the later |
| # "check mandatory routes" test. |
| routes6 = read_routes6() |
| if WINDOWS: |
| from scapy.arch.windows import _route_add_loopback |
| _route_add_loopback(routes6, True) |
| |
| routes6 |
| |
| # Expected results: |
| # - one route if there is only the loopback interface |
| # - one route if IPv6 is supported but disabled on network interfaces |
| # - three routes if there is a network interface |
| # - on OpenBSD, only two routes on lo0 are expected |
| # NOTE(review): the comparisons in the branches below are bare expressions, |
| # not assert statements; presumably the runner uses the value of the last |
| # evaluated expression as the test result - confirm against UTscapy. |
| |
| if routes6: |
| iflist = get_if_list() |
| if WINDOWS: |
| from scapy.arch.windows import _route_add_loopback |
| _route_add_loopback(ipv6=True, iflist=iflist) |
| if OPENBSD: |
| len(routes6) >= 2 |
| elif iflist == [conf.loopback_name]: |
| len(routes6) == 1 |
| elif len(iflist) >= 2: |
| len(routes6) >= 1 |
| else: |
| False |
| else: |
| # IPv6 seems disabled. Force a route to ::1 |
| conf.route6.routes.append(("::1", 128, "::", conf.loopback_name, ["::1"], 1)) |
| conf.route6.ipv6_ifaces = set([conf.loopback_name]) |
| True |
| |
| = Build HBHOptUnknown for IPv6ExtHdrHopByHop with disabled autopad |
| ~ ipv6 hbh opt |
| * Build the HBHOptUnknown of IPv6ExtHdrHopByHop with autopad=0 |
| v6Opt = HBHOptUnknown(otype=3, optlen=7, optdata="Beijing") |
| pkt = Ether()/IPv6()/IPv6ExtHdrHopByHop(autopad=0, options=[v6Opt, ]) |
| pkt.build() |
| |
| = Build HBHOptUnknown for IPv6ExtHdrDestOpt with disabled autopad |
| ~ ipv6 hbh opt |
| * Build the HBHOptUnknown of IPv6ExtHdrDestOpt with autopad=0 |
| v6Opt = HBHOptUnknown(otype=3, optlen=6, optdata="Haikou") |
| pkt = Ether()/IPv6()/IPv6ExtHdrDestOpt(autopad=0, options=[v6Opt, ]) |
| pkt.build() |
| |
| |
| = Test read_routes6() - check mandatory routes |
| |
| # Relies on routes6 and iflist assigned by the earlier read_routes6() test. |
| # The checks only run when more than two routes were read and not on Windows: |
| # a route to ::1 via ::1 must exist and, with at least two interfaces, a /64 |
| # route matching ll_route. If no /128 link-local route is found, the handler |
| # falls back to checking the loopback routing instead. |
| import re |
| ll_route = re.compile(r"fe80:\d{0,2}:") |
| # match fe80::, fe80:5:, etc. (if scoped) |
| |
| conf.route6 |
| |
| if len(routes6) > 2 and not WINDOWS: |
| # Identify routes to fe80::/64 |
| assert sum(1 for r in routes6 if r[0] == "::1" and r[4] == ["::1"]) >= 1 |
| if len(iflist) >= 2: |
| assert sum(1 for r in routes6 if ll_route.match(r[0]) and r[1] == 64) >= 1 |
| try: |
| # Identify a route to a node IPv6 link-local address |
| assert sum(1 for r in routes6 if in6_islladdr(r[0]) and r[1] == 128) >= 1 |
| except: |
| # IPv6 is not available, but we still check the loopback |
| assert conf.route6.route("::/0") == (conf.loopback_name, "::", "::") |
| assert sum(1 for r in routes6 if r[1] == 128 and r[4] == ["::1"]) >= 1 |
| else: |
| True |
| |
| = Test ifchange() |
| conf.route6.ifchange(conf.loopback_name, "::1/128") |
| if WINDOWS: |
| conf.netcache.in6_neighbor["::1"] = "ff:ff:ff:ff:ff:ff" # Restore fake cache |
| |
| True |
| |
| = Packet.route() |
| assert (Ether() / ARP()).route()[0] is not None |
| assert (Ether() / ARP()).payload.route()[0] is not None |
| assert (ARP(ptype=0, pdst="hello. this isn't a valid IP")).route()[0] is None |
| |
| = utils/in4_is* |
| |
| assert in4_ismaddr("224.0.0.1") |
| assert not in4_ismaddr("192.168.0.1") |
| assert in4_ismaddr("239.0.0.255") |
| |
| assert in4_ismlladdr("224.0.0.1") |
| assert in4_ismlladdr("224.0.0.255") |
| assert not in4_ismlladdr("224.0.1.255") |
| |
| assert in4_ismgladdr("235.0.0.1") |
| assert not in4_ismgladdr("224.0.0.1") |
| assert not in4_ismgladdr("239.0.0.1") |
| |
| assert in4_ismlsaddr("239.0.0.1") |
| assert not in4_ismlsaddr("224.0.0.1") |
| |
| assert in4_isaddrllallnodes("224.0.0.1") |
| assert not in4_isaddrllallnodes("224.0.0.3") |
| |
| assert in4_getnsmac(b'\xe0\x00\x00\x01') == '01:00:5e:00:00:01' |
| assert getmacbyip("224.0.0.1") == '01:00:5e:00:00:01' |
| |
| = plain_str test |
| |
| data = b"\xffsweet\xef celestia\xab" |
| assert plain_str(data) == "\\xffsweet\\xef celestia\\xab" |
| |
| ############ |
| ############ |
| + compat.py |
| |
| = test bytes_hex/hex_bytes |
| |
| monty_data = b"Stop! Who approaches the Bridge of Death must answer me these questions three, 'ere the other side he see." |
| hex_data = bytes_hex(monty_data) |
| assert hex_data == b'53746f70212057686f20617070726f61636865732074686520427269646765206f66204465617468206d75737420616e73776572206d65207468657365207175657374696f6e732074687265652c202765726520746865206f746865722073696465206865207365652e' |
| assert hex_bytes(hex_data) == monty_data |
| |
| = orb/chb |
| |
| assert orb(b"\x01"[0]) == 1 |
| assert chb(1) == b"\x01" |
| |
| ############ |
| ############ |
| + Main.py tests |
| |
| = Pickle and unpickle a packet |
| |
| import pickle |
| |
| a = IP(dst="192.168.0.1")/UDP() |
| |
| b = pickle.dumps(a) |
| c = pickle.loads(b) |
| |
| assert c[IP].dst == "192.168.0.1" |
| assert raw(c) == raw(a) |
| |
| = Usage test |
| |
| from scapy.main import _usage |
| try: |
| _usage() |
| assert False |
| except SystemExit: |
| assert True |
| |
| = Session test |
| |
| import builtins |
| |
| # This is automatic when using the console |
| # The three helpers below read, write and delete entries of the session |
| # dictionary stored under builtins.__dict__["scapy_session"]; get_var and |
| # del_var raise KeyError when the entry does not exist. |
| def get_var(var): |
| return builtins.__dict__["scapy_session"][var] |
| |
| def set_var(var, value): |
| builtins.__dict__["scapy_session"][var] = value |
| |
| def del_var(var): |
| del builtins.__dict__["scapy_session"][var] |
| |
| init_session(None, {"init_value": 123}) |
| set_var("test_value", "8.8.8.8") # test_value = "8.8.8.8" |
| save_session() |
| del_var("test_value") |
| load_session() |
| update_session() |
| assert get_var("test_value") == "8.8.8.8" #test_value == "8.8.8.8" |
| assert get_var("init_value") == 123 |
| |
| = Session test with fname |
| |
| session_name = tempfile.mktemp() |
| init_session(session_name) |
| set_var("test_value", IP(dst="192.168.0.1")) # test_value = IP(dst="192.168.0.1") |
| save_session(fname="%s.dat" % session_name) |
| del_var("test_value") |
| |
| set_var("z", True) #z = True |
| load_session(fname="%s.dat" % session_name) |
| try: |
| get_var("z") |
| assert False |
| except: |
| pass |
| |
| set_var("z", False) #z = False |
| update_session(fname="%s.dat" % session_name) |
| assert get_var("test_value").dst == "192.168.0.1" #test_value.dst == "192.168.0.1" |
| assert not get_var("z") |
| |
| = Clear session files |
| |
| os.remove("%s.dat" % session_name) |
| |
| = Test temporary file creation |
| ~ ci_only |
| |
| scapy_delete_temp_files() |
| |
| tmpfile = get_temp_file(autoext=".ut") |
| tmpfile |
| if WINDOWS: |
| assert "scapy" in tmpfile and tmpfile.lower().startswith('c:\\users\\appveyor\\appdata\\local\\temp') |
| else: |
| import platform |
| BYPASS_TMP = platform.python_implementation().lower() == "pypy" or DARWIN |
| assert "scapy" in tmpfile and (BYPASS_TMP == True or "/tmp/" in tmpfile) |
| |
| assert conf.temp_files[0].endswith(".ut") |
| scapy_delete_temp_files() |
| assert len(conf.temp_files) == 0 |
| |
| = Emulate interact() |
| ~ interact |
| |
| import sys |
| from unittest import mock |
| from scapy.main import interact |
| |
| from scapy.main import DEFAULT_PRESTART_FILE, DEFAULT_PRESTART, _read_config_file |
| _read_config_file(DEFAULT_PRESTART_FILE, _locals=globals(), default=DEFAULT_PRESTART) |
| # By now .config/scapy/startup.py should have been created |
| with open(DEFAULT_PRESTART_FILE, "r") as fd: |
| OLD_DEFAULT_PRESTART = fd.read() |
| |
| with open(DEFAULT_PRESTART_FILE, "w+") as fd: |
| fd.write("conf.interactive_shell = 'ipython'") |
| |
| # Detect IPython |
| try: |
| import IPython |
| except: |
| code_interact_import = "scapy.main.code.interact" |
| else: |
| code_interact_import = "IPython.embed" |
| |
| @mock.patch(code_interact_import) |
| def interact_emulator(code_int, extra_args=[]): |
| try: |
| code_int.side_effect = lambda *args, **kwargs: lambda *args, **kwargs: None |
| interact(argv=["-s scapy1"] + extra_args, mybanner="What a test") |
| finally: |
| sys.ps1 = ">>> " |
| |
| interact_emulator() # Default |
| |
| try: |
| interact_emulator(extra_args=["-?"]) # Failing |
| assert False |
| except: |
| pass |
| |
| interact_emulator(extra_args=["-d"]) # Extended |
| |
| = Emulate interact() and test startup.py with ptpython |
| ~ interact |
| |
| import sys |
| from unittest import mock |
| |
| from scapy.main import DEFAULT_PRESTART_FILE, DEFAULT_PRESTART, _read_config_file |
| _read_config_file(DEFAULT_PRESTART_FILE, _locals=globals(), default=DEFAULT_PRESTART) |
| # By now .config/scapy/startup.py should have been created |
| with open(DEFAULT_PRESTART_FILE, "w+") as fd: |
| fd.write("conf.interactive_shell = 'ptpython'") |
| |
| called = [] |
| # Stand-in for ptpython.repl.embed: requires the "locals" and |
| # "history_filename" keyword arguments, checks that locals provides IP and |
| # that the history file is conf.histfile, then records the call in called. |
| def checker(*args, **kwargs): |
| locals = kwargs.pop("locals") |
| assert locals["IP"] |
| history_filename = kwargs.pop("history_filename") |
| assert history_filename == conf.histfile |
| called.append(True) |
| |
| ptpython_mocked_module = Bunch( |
| repl=Bunch( |
| embed=checker |
| ) |
| ) |
| |
| modules_patched = { |
| "ptpython": ptpython_mocked_module, |
| "ptpython.repl": ptpython_mocked_module.repl, |
| "ptpython.repl.embed": ptpython_mocked_module.repl.embed, |
| } |
| |
| with mock.patch.dict("sys.modules", modules_patched): |
| try: |
| interact() |
| finally: |
| sys.ps1 = ">>> " |
| |
| # Restore |
| with open(DEFAULT_PRESTART_FILE, "w") as fd: |
| print(OLD_DEFAULT_PRESTART) |
| r = fd.write(OLD_DEFAULT_PRESTART) |
| |
| assert called |
| |
| = Test explore() with GUI mode |
| ~ command |
| |
| from unittest import mock |
| |
| # Calls explore() with a fake prompt_toolkit injected through sys.modules: |
| # the button dialog answers "layers" when is_layer is true and "contribs" |
| # otherwise, and the radio-list dialog answers with layer. Returns the text |
| # that explore() printed. |
| def test_explore_gui(is_layer, layer): |
| prompt_toolkit_mocked_module = Bunch( |
| shortcuts=Bunch( |
| dialogs=Bunch( |
| radiolist_dialog=(lambda *args, **kargs: layer), |
| button_dialog=(lambda *args, **kargs: "layers" if is_layer else "contribs") |
| ) |
| ), |
| formatted_text=Bunch(HTML=lambda x: x), |
| __version__="2.0.0" |
| ) |
| # a mock.patch isn't enough to mock a module. Let's roll sys.modules |
| modules_patched = { |
| "prompt_toolkit": prompt_toolkit_mocked_module, |
| "prompt_toolkit.shortcuts": prompt_toolkit_mocked_module.shortcuts, |
| "prompt_toolkit.shortcuts.dialogs": prompt_toolkit_mocked_module.shortcuts.dialogs, |
| "prompt_toolkit.formatted_text": prompt_toolkit_mocked_module.formatted_text, |
| } |
| with mock.patch.dict("sys.modules", modules_patched): |
| with ContextManagerCaptureOutput() as cmco: |
| explore() |
| result_explore = cmco.get_output() |
| return result_explore |
| |
| conf.interactive = True |
| explore_dns = test_explore_gui(True, "scapy.layers.dns") |
| assert "DNS" in explore_dns |
| assert "DNS Question Record" in explore_dns |
| assert "DNSRRNSEC3" in explore_dns |
| assert "DNS TSIG Resource Record" in explore_dns |
| |
| explore_avs = test_explore_gui(False, "avs") |
| assert "AVSWLANHeader" in explore_avs |
| assert "AVS WLAN Monitor Header" in explore_avs |
| |
| = Test explore() with non-GUI mode |
| ~ command |
| |
| # Calls explore(layer) directly with the module name and returns the text |
| # it printed. |
| def test_explore_non_gui(layer): |
| with ContextManagerCaptureOutput() as cmco: |
| explore(layer) |
| result_explore = cmco.get_output() |
| return result_explore |
| |
| explore_dns = test_explore_non_gui("scapy.layers.dns") |
| assert "DNS" in explore_dns |
| assert "DNS Question Record" in explore_dns |
| assert "DNSRRNSEC3" in explore_dns |
| assert "DNS TSIG Resource Record" in explore_dns |
| |
| explore_avs = test_explore_non_gui("avs") |
| assert "AVSWLANHeader" in explore_avs |
| assert "AVS WLAN Monitor Header" in explore_avs |
| |
| assert test_explore_non_gui("scapy.layers.dns") == test_explore_non_gui("dns") |
| assert test_explore_non_gui("scapy.contrib.avs") == test_explore_non_gui("avs") |
| |
| try: |
| explore("unknown_module") |
| assert False # The previous should have raised an exception |
| except Scapy_Exception: |
| pass |
| |
| = Test load_contrib overwrite |
| load_contrib("gtp") |
| assert GTPHeader.__module__ == "scapy.contrib.gtp" |
| |
| load_contrib("gtp_v2") |
| assert GTPHeader.__module__ == "scapy.contrib.gtp_v2" |
| |
| load_contrib("gtp") |
| assert GTPHeader.__module__ == "scapy.contrib.gtp" |
| |
| = Test load_contrib failure |
| # NOTE(review): the bare except below also catches the AssertionError raised |
| # by "assert False", so this test passes whether or not load_contrib() |
| # raises for the unknown module. Confirm the intended behaviour first. |
| try: |
| load_contrib("doesnotexist") |
| assert False |
| except: |
| pass |
| |
| = Test sane function |
| sane("A\x00\xFFB") == "A..B" |
| |
| = Test lhex function |
| assert lhex(42) == "0x2a" |
| assert lhex((28,7)) == "(0x1c, 0x7)" |
| assert lhex([28,7]) == "[0x1c, 0x7]" |
| |
| = Test restart function |
| from unittest import mock |
| conf.interactive = True |
| |
| # Calls restart() with os.execv, subprocess.call and os._exit mocked. In |
| # _test, e is the os._exit mock, m the subprocess.call mock and m2 the |
| # os.execv mock. check() joins its arguments into one list and requires the |
| # first two entries to be existing files. conf.interactive is set back to |
| # False in the finally clause. |
| try: |
| from scapy.utils import restart |
| import os |
| @mock.patch("os.execv") |
| @mock.patch("subprocess.call") |
| @mock.patch("os._exit") |
| def _test(e, m, m2): |
| def check(x, y=[]): |
| z = [x] + y if not isinstance(x, list) else x + y |
| assert os.path.isfile(z[0]) |
| assert os.path.isfile(z[1]) |
| return 0 |
| m2.side_effect = check |
| m.side_effect = check |
| e.side_effect = lambda x: None |
| restart() |
| _test() |
| finally: |
| conf.interactive = False |
| |
| = Test linehexdump function |
| conf_color_theme = conf.color_theme |
| conf.color_theme = BlackAndWhite() |
| assert linehexdump(Ether(src="00:01:02:03:04:05"), dump=True) == 'FF FF FF FF FF FF 00 01 02 03 04 05 90 00 ..............' |
| conf.color_theme = conf_color_theme |
| |
| = Test chexdump function |
| chexdump(Ether(src="00:01:02:02:04:05"), dump=True) == "0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x02, 0x02, 0x04, 0x05, 0x90, 0x00" |
| |
| = Test repr_hex function |
| repr_hex("scapy") == "7363617079" |
| |
| = Test hexstr function |
| hexstr(b"A\x00\xFFB") == "41 00 FF 42 A..B" |
| |
| = Test fletcher16 functions |
| assert fletcher16_checksum(b"\x28\x07") == 22319 |
| assert fletcher16_checkbytes(b"\x28\x07", 1) == b"\xaf(" |
| |
| = Test hexdiff function |
| ~ not_pypy |
| def test_hexdiff(a, b, algo=None, autojunk=False): |
| conf_color_theme = conf.color_theme |
| conf.color_theme = BlackAndWhite() |
| with ContextManagerCaptureOutput() as cmco: |
| hexdiff(a, b, algo=algo, autojunk=autojunk) |
| result_hexdiff = cmco.get_output() |
| conf.interactive = True |
| conf.color_theme = conf_color_theme |
| return result_hexdiff |
| |
| # Basic string test |
| |
| result_hexdiff = test_hexdiff("abcde", "abCde") |
| expected = "0000 61 62 63 64 65 abcde\n" |
| expected += " 0000 61 62 43 64 65 abCde\n" |
| assert result_hexdiff == expected |
| |
| # More advanced string test |
| |
| result_hexdiff = test_hexdiff("add_common_", "_common_removed") |
| expected = "0000 61 64 64 5F 63 6F 6D 6D 6F 6E 5F add_common_ \n" |
| expected += " -003 5F 63 6F 6D 6D 6F 6E 5F 72 65 6D 6F 76 _common_remov\n" |
| expected += " 000d 65 64 ed\n" |
| assert result_hexdiff == expected |
| |
| # Compare packets |
| |
| result_hexdiff = test_hexdiff(IP(dst="127.0.0.1", src="127.0.0.1"), IP(dst="127.0.0.2", src="127.0.0.1")) |
| expected = "0000 45 00 00 14 00 01 00 00 40 00 7C E7 7F 00 00 01 E.......@.|.....\n" |
| expected += " 0000 45 00 00 14 00 01 00 00 40 00 7C E6 7F 00 00 01 E.......@.|.....\n" |
| expected += "0010 7F 00 00 01 ....\n" |
| expected += " 0010 7F 00 00 02 ....\n" |
| assert result_hexdiff == expected |
| |
| # Compare using difflib |
| |
| a = "A" * 1000 + "findme" + "B" * 1000 |
| b = "A" * 1000 + "B" * 1000 |
| ret1 = test_hexdiff(a, b, algo="difflib") |
| ret2 = test_hexdiff(a, b, algo="difflib", autojunk=True) |
| |
| expected_ret1 = """ |
| 03d0 03d0 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 AAAAAAAAAAAAAAAA |
| 03e0 41 41 41 41 41 41 41 41 66 69 6E 64 6D 65 42 42 AAAAAAAAfindmeBB |
| 03e0 41 41 41 41 41 41 41 41 42 42 AAAAAAAA BB |
| 03ea 03ea 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 BBBBBBBBBBBBBBBB |
| """ |
| expected_ret2 = """ |
| 03d0 03d0 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 AAAAAAAAAAAAAAAA |
| 03e0 41 41 41 41 41 41 41 41 66 69 6E 64 6D 65 42 42 AAAAAAAAfindmeBB |
| 03e0 41 41 41 41 41 41 41 41 42 42 42 42 42 42 42 42 AAAAAAAABBBBBBBB |
| 03f0 03f0 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 BBBBBBBBBBBBBBBB |
| """ |
| |
| assert ret1 != ret2 |
| assert expected_ret1 in ret1 |
| assert expected_ret2 in ret2 |
| |
| # Test corner cases that should not crash |
| |
| hexdiff(b"abc", IP() / TCP()) |
| hexdiff(IP() / TCP(), b"abc") |
| |
| = Test mysummary functions - Ether |
| |
| p = Ether(dst="ff:ff:ff:ff:ff:ff", src="ff:ff:ff:ff:ff:ff", type=0x9000) |
| p |
| assert p.mysummary() in ['ff:ff:ff:ff:ff:ff > ff:ff:ff:ff:ff:ff (%s)' % loop |
| for loop in ['0x9000', 'LOOP']] |
| |
| = Test zerofree_randstring function |
| random.seed(0x2807) |
| zerofree_randstring(4) in [b"\xd2\x12\xe4\x5b", b'\xd3\x8b\x13\x12'] |
| |
| = Test strand function |
| assert strand(b"AC", b"BC") == b'@C' |
| |
| = Test export_object and import_object functions |
| from unittest import mock |
| def test_export_import_object(): |
| with ContextManagerCaptureOutput() as cmco: |
| export_object(2807) |
| result_export_object = cmco.get_output(eval_bytes=True) |
| assert result_export_object.startswith("eNprYPL9zqUHAAdrAf8=") |
| assert import_object(result_export_object) == 2807 |
| |
| test_export_import_object() |
| |
| = Test tex_escape function |
| tex_escape("$#_") == "\\$\\#\\_" |
| |
| = Test colgen function |
| f = colgen(range(3)) |
| assert len([next(f) for i in range(2)]) == 2 |
| |
| = Test incremental_label function |
| f = incremental_label() |
| assert [next(f) for i in range(2)] == ["tag00000", "tag00001"] |
| |
| = Test corrupt_* functions |
| import random |
| random.seed(0x2807) |
| assert corrupt_bytes("ABCDE") in [b"ABCDW", b"ABCDX"] |
| assert sane(corrupt_bytes("ABCDE", n=3)) in ["A.8D4", ".2.DE"] |
| |
| assert corrupt_bits("ABCDE") in [b"EBCDE", b"ABCDG"] |
| assert sane(corrupt_bits("ABCDE", n=3)) in ["AF.EE", "QB.TE"] |
| |
| = Test save_object and load_object functions |
| import tempfile |
| fd, fname = tempfile.mkstemp() |
| save_object(fname, 2807) |
| assert load_object(fname) == 2807 |
| |
| = Test whois function |
| ~ netaccess |
| |
| if not WINDOWS: |
| result = whois("193.0.6.139") |
| assert b"inetnum" in result and b"Amsterdam" in result |
| |
| = Test manuf DB methods |
| ~ manufdb |
| assert conf.manufdb._resolve_MAC("00:00:0F:01:02:03") == "Next:01:02:03" |
| assert conf.manufdb._get_short_manuf("00:00:0F:01:02:03") == "Next" |
| assert in6_addrtovendor("fe80::0200:0fff:fe01:0203").lower().startswith("next") |
| |
| assert conf.manufdb.lookup("00:00:0F:01:02:03") == ('Next', 'Next, Inc.') |
| assert "00:00:0F" in conf.manufdb.reverse_lookup("Next") |
| |
| = Test multiple wireshark's manuf formats |
| ~ manufdb |
| |
| new_format = """ |
| # comment |
| 00:00:00 JokyIsland Joky Insland Corp SA |
| 00:01:12 SecdevCorp Secdev Corporation SA LLC |
| EE:05:01 Scapy Scapy CO LTD & CIE |
| FF:00:11 NoName |
| """ |
| old_format = """ |
| # comment |
| 00:00:00 JokyIsland # Joky Insland Corp SA |
| 00:01:12 SecdevCorp # Secdev Corporation SA LLC |
| EE:05:01 Scapy # Scapy CO LTD & CIE |
| FF:00:11 NoName |
| """ |
| |
| manuf1 = get_temp_file() |
| manuf2 = get_temp_file() |
| |
| with open(manuf1, "w") as w: |
| w.write(old_format) |
| |
| with open(manuf2, "w") as w: |
| w.write(new_format) |
| |
| a = load_manuf(manuf1) |
| b = load_manuf(manuf2) |
| |
| assert a.lookup("00:00:00") == ('JokyIsland', 'Joky Insland Corp SA') |
| assert a.lookup("FF:00:11:00:00:00") == ('NoName', 'NoName') |
| assert a.reverse_lookup("Scapy") == {'EE:05:01': ('Scapy', 'Scapy CO LTD & CIE')} |
| assert a.reverse_lookup("Secdevcorp") == {'00:01:12': ('SecdevCorp', 'Secdev Corporation SA LLC')} |
| |
| |
| assert b.lookup("00:00:00") == ('JokyIsland', 'Joky Insland Corp SA') |
| assert b.lookup("FF:00:11:00:00:00") == ('NoName', 'NoName') |
| assert b.reverse_lookup("Scapy") == {'EE:05:01': ('Scapy', 'Scapy CO LTD & CIE')} |
| assert b.reverse_lookup("Secdevcorp") == {'00:01:12': ('SecdevCorp', 'Secdev Corporation SA LLC')} |
| |
| scapy_delete_temp_files() |
| |
| = Test load_services |
| |
| data_services = """ |
| itu-bicc-stc 3097/sctp |
| cvsup 5999/udp # CVSup |
| x11 6000-6063/tcp # X Window System |
| x11 6000-6063/udp # X Window System |
| ndl-ahp-svc 6064/tcp # NDL-AHP-SVC |
| """ |
| |
| services = get_temp_file() |
| with open(services, "w") as w: |
| w.write(data_services) |
| |
| tcp, udp, sctp = load_services(services) |
| assert tcp[6002] == "x11" |
| assert tcp.ndl_ahp_svc == 6064 |
| assert tcp.x11 in range(6000, 6093) |
| assert udp[6002] == "x11" |
| assert udp.x11 in range(6000, 6093) |
| assert udp.cvsup == 5999 |
| assert sctp[3097] == "itu_bicc_stc" |
| assert sctp.itu_bicc_stc == 3097 |
| |
| scapy_delete_temp_files() |
| |
| = Test utility functions - network related |
| ~ netaccess |
| |
| assert atol("1.1.1.1") == 0x1010101 |
| assert atol("192.168.0.1") == 0xc0a80001 |
| |
| = Test autorun functions |
| ~ autorun |
| |
| ret = autorun_get_text_interactive_session("IP().src") |
| ret |
| assert ret == (">>> IP().src\n'127.0.0.1'\n", '127.0.0.1') |
| |
| ret = autorun_get_html_interactive_session("IP().src") |
| ret |
| assert ret == ("<span class=prompt>>>> </span>IP().src\n'127.0.0.1'\n", '127.0.0.1') |
| |
| ret = autorun_get_latex_interactive_session("IP().src") |
| ret |
| assert ret == ("\\textcolor{blue}{{\\tt\\char62}{\\tt\\char62}{\\tt\\char62} }IP().src\n'127.0.0.1'\n", '127.0.0.1') |
| |
| ret = autorun_get_text_interactive_session("scapy_undefined") |
| assert "NameError" in ret[0] |
| |
| = Test autorun with logging |
| |
| cmds = """log_runtime.info(hex_bytes("446166742050756e6b"))\n""" |
| ret = autorun_get_text_interactive_session(cmds) |
| ret |
| assert "Daft Punk" in ret[0] |
| |
| = Test utility TEX functions |
| |
| assert tex_escape("{scapy}\\^$~#_&%|><") == "{\\tt\\char123}scapy{\\tt\\char125}{\\tt\\char92}\\^{}\\${\\tt\\char126}\\#\\_\\&\\%{\\tt\\char124}{\\tt\\char62}{\\tt\\char60}" |
| |
| a = colgen(1, 2, 3) |
| assert next(a) == (1, 2, 2) |
| assert next(a) == (1, 3, 3) |
| assert next(a) == (2, 2, 1) |
| assert next(a) == (2, 3, 2) |
| assert next(a) == (2, 1, 3) |
| assert next(a) == (3, 3, 1) |
| assert next(a) == (3, 1, 2) |
| assert next(a) == (3, 2, 3) |
| |
| = Test config file functions |
| |
| saved_conf_verb = conf.verb |
| fd, fname = tempfile.mkstemp() |
| os.write(fd, b"conf.verb = 42\n") |
| os.close(fd) |
| from scapy.main import _read_config_file |
| _read_config_file(fname, globals(), locals()) |
| assert conf.verb == 42 |
| conf.verb = saved_conf_verb |
| |
| = Test config file functions failures |
| |
| from scapy.main import _read_config_file, _probe_config_folder |
| assert _read_config_file(_probe_config_folder("filethatdoesnotexistnorwillever.tsppajfsrdrr")) is None |
| |
| = Test CacheInstance repr |
| |
| conf.netcache |
| |
| = Test pyx detection functions |
| |
| from unittest.mock import patch |
| |
| def _r(*args, **kwargs): |
| raise OSError |
| |
| with patch("scapy.libs.test_pyx.subprocess.check_call", _r): |
| from scapy.libs.test_pyx import _test_pyx |
| assert _test_pyx() == False |
| |
| = Test matplotlib detection functions |
| |
| from unittest.mock import MagicMock, patch |
| |
| bck_scapy_libs_matplot = sys.modules.get("scapy.libs.matplot", None) |
| if bck_scapy_libs_matplot: |
| del sys.modules["scapy.libs.matplot"] |
| |
| mock_matplotlib = MagicMock() |
| mock_matplotlib.get_backend.return_value = "inline" |
| mock_matplotlib.pyplot = MagicMock() |
| mock_matplotlib.pyplot.plt = None |
| with patch.dict("sys.modules", **{ "matplotlib": mock_matplotlib, "matplotlib.lines": mock_matplotlib}): |
| from scapy.libs.matplot import MATPLOTLIB, MATPLOTLIB_INLINED, MATPLOTLIB_DEFAULT_PLOT_KARGS, Line2D |
| assert MATPLOTLIB == 1 |
| assert MATPLOTLIB_INLINED == 1 |
| assert "marker" in MATPLOTLIB_DEFAULT_PLOT_KARGS |
| |
| mock_matplotlib.get_backend.return_value = "ko" |
| with patch.dict("sys.modules", **{ "matplotlib": mock_matplotlib, "matplotlib.lines": mock_matplotlib}): |
| from scapy.libs.matplot import MATPLOTLIB, MATPLOTLIB_INLINED, MATPLOTLIB_DEFAULT_PLOT_KARGS |
| assert MATPLOTLIB == 1 |
| assert MATPLOTLIB_INLINED == 0 |
| assert "marker" in MATPLOTLIB_DEFAULT_PLOT_KARGS |
| |
| if bck_scapy_libs_matplot: |
| sys.modules["scapy.libs.matplot"] = bck_scapy_libs_matplot |
| |
| |
| ############ |
| ############ |
| + Basic tests |
| |
| * These tests are here mainly to check nothing has been broken |
| * and to catch Exceptions |
| |
| = Packet class methods |
| p = IP()/ICMP() |
| ret = p.do_build_ps() |
| assert ret[0] == b"@\x00\x00\x00\x00\x01\x00\x00@\x01\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\x00\x00\x00\x00\x00\x00" |
| assert len(ret[1]) == 2 |
| |
| assert p[ICMP].firstlayer() == p |
| |
| assert p.command() == "IP()/ICMP()" |
| |
| p.decode_payload_as(UDP) |
| assert p.sport == 2048 and p.dport == 63487 |
| |
| = hide_defaults |
| conf_color_theme = conf.color_theme |
| conf.color_theme = BlackAndWhite() |
| p = IP(ttl=64)/ICMP() |
| assert repr(p) in ["<IP frag=0 ttl=64 proto=icmp |<ICMP |>>", "<IP frag=0 ttl=64 proto=1 |<ICMP |>>"] |
| p.hide_defaults() |
| assert repr(p) in ["<IP frag=0 proto=icmp |<ICMP |>>", "<IP frag=0 proto=1 |<ICMP |>>"] |
| conf.color_theme = conf_color_theme |
| |
| = split_layers |
| p = IP()/ICMP() |
| s = raw(p) |
| split_layers(IP, ICMP, proto=1) |
| assert Raw in IP(s) |
| bind_layers(IP, ICMP, frag=0, proto=1) |
| |
| = fuzz |
| |
| r = fuzz(IP(tos=2)/ICMP()) |
| assert r.tos == 2 |
| z = r.ttl |
| assert r.ttl != z |
| assert r.ttl != z |
| |
| |
| = fuzz a Packet with MultipleTypeField |
| |
| fuzz(ARP(pdst="127.0.0.1")) |
| fuzz(IP()/ARP(pdst='10.0.0.254')) |
| |
| = fuzz on packets with advanced RandNum |
| |
| x = IP(dst="8.8.8.8")/fuzz(UDP()/NTP(version=4)) |
| x.show2() |
| x = IP(raw(x)) |
| assert NTP in x |
| |
| = fuzz on packets with FlagsField |
| assert isinstance(fuzz(TCP()).flags, VolatileValue) |
| |
| = Building some packets |
| ~ basic IP TCP UDP NTP LLC SNAP Dot11 |
| IP()/TCP() |
| Ether()/IP()/UDP()/NTP() |
| Dot11()/LLC()/SNAP()/IP()/TCP()/"XXX" |
| IP(ttl=25)/TCP(sport=12, dport=42) |
| IP().summary() |
| |
| = Manipulating some packets |
| ~ basic IP TCP |
| a=IP(ttl=4)/TCP() |
| a.ttl |
| a.ttl=10 |
| del a.ttl |
| a.ttl |
| TCP in a |
| a[TCP] |
| a[TCP].dport=[80,443] |
| a |
| assert a.copy().time == a.time |
| a=3 |
| |
| = Bind string array as payload |
| ~ basic |
| assert bytes(Raw("sca")/"py") == b"scapy" |
| assert bytes(Raw("sca")/b"py") == b"scapy" |
| assert bytes(Raw("sca")/bytearray(b"py")) == b"scapy" |
| assert bytes("sca"/Raw("py")) == b"scapy" |
| assert bytes(b"sca"/Raw("py")) == b"scapy" |
| assert bytes(bytearray(b"sca")/Raw("py")) == b"scapy" |
| a=Raw("sca") |
| a.add_payload("py") |
| assert bytes(a) == b"scapy" |
| a=Raw("sca") |
| a.add_payload(b"py") |
| assert bytes(a) == b"scapy" |
| a=Raw("sca") |
| a.add_payload(bytearray(b"py")) |
| assert bytes(a) == b"scapy" |
| |
| = Checking overloads |
| ~ basic IP TCP Ether |
| a=Ether()/IP()/TCP() |
| r = a.proto |
| r |
| r == 6 |
| |
| |
| = sprintf() function |
| ~ basic sprintf Ether IP UDP NTP |
| a=Ether()/IP()/IP(ttl=4)/UDP()/NTP() |
| r = a.sprintf("%type% %IP.ttl% %#05xr,UDP.sport% %IP:2.ttl%") |
| r |
| r in ['0x800 64 0x07b 4', 'IPv4 64 0x07b 4'] |
| |
| |
| = sprintf() function |
| ~ basic sprintf IP TCP SNAP LLC Dot11 |
| * This test is on the conditional substring feature of <tt>sprintf()</tt> |
| a=Dot11()/LLC()/SNAP()/IP()/TCP() |
| r = a.sprintf("{IP:{TCP:flags=%TCP.flags%}{UDP:port=%UDP.ports%} %IP.src%}") |
| r |
| r == 'flags=S 127.0.0.1' |
| |
| |
| = haslayer function |
| ~ basic haslayer IP TCP ICMP ISAKMP |
| x=IP(id=1)/ISAKMP_payload_SA(prop=ISAKMP_payload_SA(prop=IP()/ICMP()))/TCP() |
| r = (TCP in x, ICMP in x, IP in x, UDP in x) |
| r |
| r == (True,True,True,False) |
| |
| = getlayer function |
| ~ basic getlayer IP ISAKMP UDP |
| x=IP(id=1)/ISAKMP_payload_SA(prop=IP(id=2)/UDP(dport=1))/IP(id=3)/UDP(dport=2) |
| x[IP] |
| x[IP:2] |
| x[IP:3] |
| x.getlayer(IP,3) |
| x.getlayer(IP,4) |
| x[UDP] |
| x[UDP:1] |
| x[UDP:2] |
| assert(x[IP].id == 1 and x[IP:2].id == 2 and x[IP:3].id == 3 and |
| x.getlayer(IP).id == 1 and x.getlayer(IP,3).id == 3 and |
| x.getlayer(IP,4) == None and |
| x[UDP].dport == 1 and x[UDP:2].dport == 2) |
| try: |
| x[IP:4] |
| except IndexError: |
| True |
| else: |
| False |
| |
| = getlayer / haslayer with name |
| ~ basic getlayer IP ICMP IPerror TCPerror |
| x = IP() / ICMP() / IPerror() |
| assert x.getlayer(ICMP) is not None |
| assert x.getlayer(IPerror) is not None |
| assert x.getlayer("IP in ICMP") is not None |
| assert x.getlayer(TCPerror) is None |
| assert x.getlayer("TCP in ICMP") is None |
| assert x.haslayer(ICMP) |
| assert x.haslayer(IPerror) |
| assert x.haslayer("IP in ICMP") |
| assert not x.haslayer(TCPerror) |
| assert not x.haslayer("TCP in ICMP") |
| |
| = getlayer with a filter |
| ~ getlayer IP |
| pkt = IP() / IP(ttl=3) / IP() |
| assert pkt[IP::{"ttl":3}].ttl == 3 |
| assert pkt.getlayer(IP, ttl=3).ttl == 3 |
| assert IPv6ExtHdrHopByHop(options=[HBHOptUnknown()]).getlayer(HBHOptUnknown, otype=42) is None |
| |
| = specific haslayer and getlayer implementations for EAP |
| ~ haslayer getlayer EAP |
| pkt = Ether() / EAPOL() / EAP_MD5() |
| assert EAP in pkt |
| assert pkt.haslayer(EAP) |
| assert isinstance(pkt[EAP], EAP_MD5) |
| assert isinstance(pkt.getlayer(EAP), EAP_MD5) |
| |
| = specific haslayer and getlayer implementations for RadiusAttribute |
| ~ haslayer getlayer RadiusAttribute |
| pkt = RadiusAttr_EAP_Message() |
| assert RadiusAttribute in pkt |
| assert pkt.haslayer(RadiusAttribute) |
| assert isinstance(pkt[RadiusAttribute], RadiusAttr_EAP_Message) |
| assert isinstance(pkt.getlayer(RadiusAttribute), RadiusAttr_EAP_Message) |
| |
| |
| = equality |
| ~ basic |
| w=Ether()/IP()/UDP(dport=53) |
| x=Ether()/IP(version=4)/UDP() |
| y=Ether()/IP()/UDP(dport=4) |
| z=Ether()/IP()/UDP()/NTP() |
| t=Ether()/IP()/TCP() |
| assert x != y and x != z and x != t and y != z and y != t and z != t and w == x |
| |
| = answers |
| ~ basic |
| a1, a2 = "1.2.3.4", "5.6.7.8" |
| p1 = IP(src=a1, dst=a2)/ICMP(type=8) |
| p2 = IP(src=a2, dst=a1)/ICMP(type=0) |
| assert p1.hashret() == p2.hashret() |
| assert not p1.answers(p2) |
| assert p2.answers(p1) |
| assert p1 > p2 |
| assert p2 < p1 |
| assert p1 == p1 |
| conf_back = conf.checkIPinIP |
| conf.checkIPinIP = True |
| px = [IP()/p1, IPv6()/p1] |
| assert not any(p.hashret() == p2.hashret() for p in px) |
| assert not any(p.answers(p2) for p in px) |
| assert not any(p2.answers(p) for p in px) |
| conf.checkIPinIP = False |
| assert all(p.hashret() == p2.hashret() for p in px) |
| assert not any(p.answers(p2) for p in px) |
| assert all(p2.answers(p) for p in px) |
| conf.checkIPinIP = conf_back |
| |
| = answers - Net |
| ~ netaccess |
| |
| a1, a2 = Net("www.google.com"), Net("www.secdev.org") |
| prt1, prt2 = 12345, 54321 |
| s1, s2 = 2767216324, 3845532842 |
| p1 = IP(src=a1, dst=a2)/TCP(flags='SA', seq=s1, ack=s2, sport=prt1, dport=prt2) |
| p2 = IP(src=a2, dst=a1)/TCP(flags='R', seq=s2, ack=0, sport=prt2, dport=prt1) |
| assert p2.answers(p1) |
| assert not p1.answers(p2) |
| # Not available yet because of IPv6 |
| # a1, a2 = Net6("www.google.com"), Net6("www.secdev.org") |
| p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2) |
| p2 = IP(src=a2, dst=a1)/TCP(flags='RA', seq=0, ack=s1+1, sport=prt2, dport=prt1) |
| assert p2.answers(p1) |
| assert not p1.answers(p2) |
| p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2) |
| p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+1, sport=prt2, dport=prt1) |
| assert p2.answers(p1) |
| assert not p1.answers(p2) |
| p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2) |
| assert not p2.answers(p1) |
| assert p1.answers(p2) |
| p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2) |
| p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+10, sport=prt2, dport=prt1) |
| assert not p2.answers(p1) |
| assert not p1.answers(p2) |
| p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2) |
| assert not p2.answers(p1) |
| assert not p1.answers(p2) |
| p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1+9, ack=s2+10, sport=prt1, dport=prt2) |
| assert not p2.answers(p1) |
| assert not p1.answers(p2) |
| |
| = conf.checkIPsrc |
| |
| conf_checkIPsrc = conf.checkIPsrc |
| conf.checkIPsrc = 0 |
| query = IP(id=42676, src='10.128.0.7', dst='192.168.0.1')/ICMP(id=26) |
| answer = IP(src='192.168.48.19', dst='10.128.0.7')/ICMP(type=11)/IPerror(id=42676, src='192.168.51.23', dst='192.168.0.1')/ICMPerror(id=26) |
| assert answer.answers(query) |
| conf.checkIPsrc = conf_checkIPsrc |
| |
| |
| ############ |
| ############ |
| + command() / json() tests |
| ~ command |
| |
| = Test command() with normal packet |
| |
| pkt = IP(dst="127.0.0.1", src="127.0.0.1") / UDP(dport=12345, sport=654) |
| assert pkt.command() == "IP(src='127.0.0.1', dst='127.0.0.1')/UDP(sport=654, dport=12345)" |
| |
| = Test json() with normal packet |
| |
| assert pkt.json() == '{"version": 4, "ihl": null, "tos": 0, "len": null, "id": 1, "flags": 0, "frag": 0, "ttl": 64, "proto": 17, "chksum": null, "src": "127.0.0.1", "dst": "127.0.0.1", "payload": {"sport": 654, "dport": 12345, "len": null, "chksum": null}}' |
| |
| = Test command() with nested packet |
| |
| pkt = DNS(qd=[DNSQR(qtype="A", qname="google.com")]) |
| assert pkt.command() == "DNS(qd=[DNSQR(qname=b'google.com.', qtype=1)])" |
| |
| = Test json() with nested packet |
| |
| assert pkt.json() == '{"length": null, "id": 0, "qr": 0, "opcode": 0, "aa": 0, "tc": 0, "rd": 1, "ra": 0, "z": 0, "ad": 0, "cd": 0, "rcode": 0, "qdcount": null, "ancount": null, "nscount": null, "arcount": null, "qd": [{"qname": "google.com.", "qtype": 1, "unicastresponse": 0, "qclass": 1}]}' |
| |
| = Test command() with ASN.1 packet |
| |
| pkt = KRB_AP_REP(bytes(KRB_AP_REP(encPart=EncryptedData()))) |
| assert pkt.command() == "KRB_AP_REP(pvno=ASN1_INTEGER(5), msgType=ASN1_INTEGER(15), encPart=EncryptedData(etype=ASN1_INTEGER(23), kvno=None, cipher=ASN1_STRING(b'')))" |
| |
| = Test json() with ASN.1 packet |
| |
| assert pkt.json() == '{"pvno": {"type": "ASN1_INTEGER", "value": "5"}, "msgType": {"type": "ASN1_INTEGER", "value": "15"}, "encPart": {"etype": {"type": "ASN1_INTEGER", "value": "23"}, "kvno": null, "cipher": {"type": "ASN1_STRING", "value": ""}}}' |
| |
| = Test command() with meaningless payload |
| |
| pkt = PPTPStartControlConnectionReply() / IP(dst="127.0.0.1", src="127.0.0.1") |
| assert pkt.command() == "PPTPStartControlConnectionReply()/IP(src='127.0.0.1', dst='127.0.0.1')" |
| |
| = Test json() with meaningless payload |
| |
| assert pkt.json() == '{"len": 156, "type": 1, "magic_cookie": 439041101, "ctrl_msg_type": 2, "reserved_0": 0, "protocol_version": 256, "result_code": 1, "error_code": 0, "framing_capabilities": 0, "bearer_capabilities": 0, "maximum_channels": 65535, "firmware_revision": 256, "host_name": "linux", "vendor_string": "", "payload": {"version": 4, "ihl": null, "tos": 0, "len": null, "id": 1, "flags": 0, "frag": 0, "ttl": 64, "proto": 0, "chksum": null, "src": "127.0.0.1", "dst": "127.0.0.1"}}' |
| |
| |
| ############ |
| ############ |
| + Tests on padding |
| |
| = Padding assembly |
| r = raw(Padding("abc")) |
| r |
| assert r == b"abc" |
| r = raw(Padding("abc")/Padding("def")) |
| r |
| assert r == b"abcdef" |
| r = raw(Raw("ABC")/Padding("abc")/Padding("def")) |
| r |
| assert r == b"ABCabcdef" |
| r = raw(Raw("ABC")/Padding("abc")/Raw("DEF")/Padding("def")) |
| r |
| assert r == b"ABCDEFabcdef" |
| |
| = Padding and length computation |
| p = IP(raw(IP()/Padding("abc"))) |
| p |
| assert p.len == 20 and len(p) == 23 |
| p = IP(raw(IP()/Raw("ABC")/Padding("abc"))) |
| p |
| assert p.len == 23 and len(p) == 26 |
| p = IP(raw(IP()/Raw("ABC")/Padding("abc")/Padding("def"))) |
| p |
| assert p.len == 23 and len(p) == 29 |
| |
| = PadField test |
| ~ PadField padding |
| |
| class TestPad(Packet): |
| fields_desc = [ PadField(StrNullField("st", b""), 6, padwith=b"\xff"), StrField("id", b"")] |
| |
| assert TestPad() == TestPad(raw(TestPad())) |
| assert raw(TestPad(st=b"st", id=b"id")) == b'st\x00\xff\xff\xffid' |
| |
| = ReversePadField |
| ~ PadField padding |
| |
| class TestReversePad(Packet): |
| fields_desc = [ ByteField("a", 0), |
| ReversePadField(IntField("b", 0), 4)] |
| |
| assert raw(TestReversePad(a=1, b=0xffffffff)) == b'\x01\x00\x00\x00\xff\xff\xff\xff' |
| assert TestReversePad(raw(TestReversePad(a=1, b=0xffffffff))).b == 0xffffffff |
| |
| ############ |
| ############ |
| + Tests on default value changes mechanism |
| |
| = Creation of an IPv3 class from IP class with different default values |
| class IPv3(IP): |
| version = 3 |
| ttl = 32 |
| |
| = Test of IPv3 class |
| a = IPv3() |
| v,t = a.version, a.ttl |
| v,t |
| assert (v,t) == (3,32) |
| r = raw(a) |
| r |
| assert r == b'5\x00\x00\x14\x00\x01\x00\x00 \x00\xac\xe7\x7f\x00\x00\x01\x7f\x00\x00\x01' |
| |
| ############ |
| ############ |
| + ASN.1 tests |
| |
| = ASN1 - ASN1_Object |
| assert ASN1_Object(1) == ASN1_Object(1) |
| assert ASN1_Object(1) > ASN1_Object(0) |
| assert ASN1_Object(1) >= ASN1_Object(1) |
| assert ASN1_Object(0) < ASN1_Object(1) |
| assert ASN1_Object(1) <= ASN1_Object(2) |
| assert ASN1_Object(1) != ASN1_Object(2) |
| ASN1_Object(2).show() |
| |
| = ASN1 - RandASN1Object |
| a = RandASN1Object() |
| random.seed(0x2807) |
| o = bytes(a) |
| o |
| assert o in [ |
| b'\x1e\x023V', # PyPy 2.7 |
| b'A\x02\x07q', # Python 2.7 |
| b'F\x02\xfe\x92', # python 3.7-3.9 |
| ] |
| |
| = ASN1 - ASN1_BIT_STRING |
| a = ASN1_BIT_STRING("test", readable=True) |
| a |
| assert a.val == '01110100011001010111001101110100' |
| assert raw(a) == b'\x03\x05\x00test' |
| |
| a = ASN1_BIT_STRING(b"\xff"*16, readable=True) |
| a |
| assert a.val == "1" * 128 |
| assert raw(a) == b'\x03\x11\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff' |
| |
| = ASN1 - ASN1_SEQUENCE |
| a = ASN1_SEQUENCE([ASN1_Object(1), ASN1_Object(0)]) |
| assert a.strshow() == '# ASN1_SEQUENCE:\n <ASN1_Object[1]>\n <ASN1_Object[0]>\n' |
| |
| = ASN1 - ASN1_DECODING_ERROR |
| a = ASN1_DECODING_ERROR("error", exc=OSError(1)) |
| assert repr(a) == "<ASN1_DECODING_ERROR['error']{{1}}>" |
| b = ASN1_DECODING_ERROR("error", exc=OSError(ASN1_BIT_STRING("0"))) |
| assert repr(b) in ["<ASN1_DECODING_ERROR['error']{{<ASN1_BIT_STRING[0]=b'\\x00' (7 unused bits)>}}>", |
| "<ASN1_DECODING_ERROR['error']{{<ASN1_BIT_STRING[0]='\\x00' (7 unused bits)>}}>"] |
| |
| = ASN1 - ASN1_INTEGER |
| a = ASN1_INTEGER(int("1"*23)) |
| assert repr(a) in ["0x25a55a46e5da99c71c7 <ASN1_INTEGER[1111111111...1111111111]>", |
| "0x25a55a46e5da99c71c7 <ASN1_INTEGER[1111111111...111111111L]>"] |
| |
| = ASN1 - ASN1_OID |
| assert raw(ASN1_OID("")) == b"\x06\x00" |
| |
| = RandASN1Object(), specific crashes |
| |
| import random |
| |
| # ASN1F_NUMERIC_STRING |
| random.seed(1514315682) |
| raw(RandASN1Object()) |
| |
| # ASN1F_VIDEOTEX_STRING |
| random.seed(1240186058) |
| raw(RandASN1Object()) |
| |
| # ASN1F_UTC_TIME & ASN1F_GENERALIZED_TIME |
| random.seed(1873503288) |
| raw(RandASN1Object()) |
| |
| = SSID is parsed properly even with the presence of RSN Information |
| ~ SSID RSN Information |
| # A regression test for https://github.com/secdev/scapy/pull/2685. |
| # https://github.com/secdev/scapy/issues/2683 describes a packet with |
| # RSN Information that isn't parsed properly, |
| # causing the SSID to be overridden. |
| # This test checks the SSID is parsed properly. |
| filename = scapy_path("/test/pcaps/bad_rsn_parsing_overrides_ssid.pcap") |
| frame = rdpcap(filename)[0] |
| beacon = frame.getlayer(5) |
| ssid = beacon.network_stats()['ssid'] |
| assert ssid == "ROUTE-821E295" |
| |
| = SSID is parsed properly even when the Country Information Tag Element has an odd length (not complying with the standard) and a missing pad byte |
| ~ Missing Pad Byte in Country Info |
| # A regression test for https://github.com/secdev/scapy/pull/2685. |
| # https://github.com/secdev/scapy/issues/4132 describes a packet with |
| # a Country Information element tag that has an odd length, even though it's against the standard. |
| # The transmitter should have added a padding byte to make the length even, but it didn't. |
| # The effect on scapy used to be improper parsing of the next tag elements, causing the SSID to be overridden. |
| # This test checks the SSID is parsed properly. |
| from io import BytesIO |
| pcapfile = BytesIO(b'\n\r\r\n\x80\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x03\x00\x10\x00Linux 6.1.21-v8+\x04\x00E\x00Dumpcap (Wireshark) 3.4.10 (Git v3.4.10 packaged as 3.4.10-0+deb11u1)\x00\x00\x00\x00\x00\x00\x00\x80\x00\x00\x00\x01\x00\x00\x00@\x00\x00\x00\x7f\x00\x00\x00\x00\x04\x00\x00\x02\x00\x05\x00wifi2\x00\x00\x00\t\x00\x01\x00\t\x00\x00\x00\x0c\x00\x10\x00Linux 6.1.21-v8+\x00\x00\x00\x00@\x00\x00\x00\x06\x00\x00\x00\xb0\x01\x00\x00\x00\x00\x00\x00c\xd3\x87\x17\xe3c5\x82\x90\x01\x00\x00\x90\x01\x00\x00\x00\x00 \x00\xae@\x00\xa0 \x08\x00\xa0 \x08\x00\x00\x10\x0cd\x14@\x01\xa9\x00\x0c\x00\x00\x00\xa6\x00\xa8\x01\x80\x00\x00\x00\xff\xff\xff\xff\xff\xff\x02\xbf\xaf\x9f\xf8\x07\x02\xbf\xaf\x9f\xf8\x070\x96[p\xdcM\x06\x00\x00\x00d\x00\x11\x00\x00\x00\x01\x08\x8c\x12\x98$\xb0H`l\x03\x01,\x05\x04\x00\x01\x00\x00\x07QUS \x01\r\x80$\x01\x80(\x01\x80,\x01\x800\x01\x804\x01\x808\x01\x80<\x01\x80@\x01\x80d\x01\x80h\x01\x80l\x01\x80p\x01\x80t\x01\x80x\x01\x80|\x01\x80\x80\x01\x80\x84\x01\x80\x88\x01\x80\x8c\x01\x80\x90\x01\x80\x95\x01\x80\x99\x01\x80\x9d\x01\x80\xa1\x01\x80\xa5\x01\x800\x14\x01\x00\x00\x0f\xac\x04\x01\x00\x00\x0f\xac\x04\x01\x00\x00\x0f\xac\x02\x0c\x00;\x02s\x00-\x1a,\t\x13\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00,\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00=\x16,\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xdd\x18\x00P\xf2\x02\x01\x01\x81\x00\x03\xa4\x00\x00\'\xa4\x00\x00BC]\x00a\x11.\x00\xdd;\x00P\xf2\x04\x10J\x00\x01\x10\x10D\x00\x01\x02\x10I\x00\x06\x007*\x00\x01 \x10\x11\x00\x1358" Hisense Roku TV\x10T\x00\x08\x00\x07\x00P\xf2\x04\x00\x01\xdd\x16\xc8:k\x01\x01\x1048<@dhlptx|\x80\x84\x88\x8c\x90\xdd\x12Po\x9a\t\x02\x02\x00!\x0b\x03\x06\x00\x02\xbf\xaf\x9f\xf8\x07\xdd\rPo\x9a\n\x00\x00\x06\x01\x11\x1cD\x002\xf5N\xfbh\xb0\x01\x00\x00') |
| pktpcap = rdpcap(pcapfile) |
| frame = pktpcap[0] |
| beacon = frame.getlayer(4) |
| stats = beacon.network_stats() |
| ssid = stats['ssid'] |
| assert ssid == "" |
| country = stats['country'] |
| assert country == 'US' |
| |
| ############ |
| ############ |
| + Network tests |
| |
| * Those tests need network access |
| |
| = Sending and receiving an ICMP |
| ~ netaccess needs_root IP ICMP icmp_firewall |
| def _test(): |
| with no_debug_dissector(): |
| x = sr1(IP(dst="www.google.com")/ICMP(),timeout=3) |
| assert x is not None |
| x |
| assert x[IP].ottl() in [32, 64, 128, 255] |
| assert 0 <= x[IP].hops() <= 126 |
| assert ICMP in x and x[ICMP].type == 0 |
| |
| retry_test(_test) |
| |
| = Sending a TCP syn message at layer 2 and layer 3 |
| ~ netaccess needs_root IP |
| def _test(): |
| tmp = send(IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), return_packets=True, realtime=True) |
| assert len(tmp) == 1 |
| |
| tmp = sendp(Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), return_packets=True, realtime=True) |
| assert len(tmp) == 1 |
| |
| p = Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S") |
| from decimal import Decimal |
| p.time = Decimal(p.time) |
| tmp = sendp(p, return_packets=True, realtime=True) |
| assert len(tmp) == 1 |
| |
| retry_test(_test) |
| |
| = Latency check: localhost ICMP |
| ~ netaccess needs_root linux latency |
| |
| # Note: still needs to enforce L3RawSocket as this won't work otherwise with libpcap |
| sock = conf.L3socket |
| conf.L3socket = L3RawSocket |
| |
| def _test(): |
| req = IP(dst="127.0.0.1")/ICMP() |
| ans = sr1(req, timeout=3) |
| assert (ans.time - req.sent_time) >= 0 |
| assert (ans.time - req.sent_time) <= 1e-3 |
| |
| try: |
| retry_test(_test) |
| finally: |
| conf.L3socket = sock |
| |
| = Test sniffing on multiple sockets |
| ~ netaccess needs_root sniff |
| |
| # This test sniffs on the same interface twice at the same time, to |
| # simulate sniffing on multiple interfaces. |
| |
| def _test(): |
| iface = conf.route.route(str(Net("www.google.com")))[0] |
| port = int(RandShort()) |
| pkt = IP(dst="www.google.com")/TCP(sport=port, dport=80, flags="S") |
| def cb(): |
| sr1(pkt, timeout=3) |
| sniffer = AsyncSniffer(started_callback=cb, |
| iface=[iface, iface], |
| lfilter=lambda x: TCP in x and x[TCP].dport == port, |
| prn=lambda x: x.summary(), |
| count=2) |
| sniffer.start() |
| sniffer.join(timeout=3) |
| assert len(sniffer.results) == 2 |
| for pkt in sniffer.results: |
| assert pkt.sniffed_on == iface |
| |
| retry_test(_test) |
| |
| = Test AsyncSniffer failure on a nonexistent interface |
| |
| try: |
| sniffer = AsyncSniffer(iface="this_interface_does_not_exists") |
| sniffer.start() |
| sniffer.join() |
| assert False, "Should have errored by now" |
| except ValueError: |
| assert True |
| |
| = Sending a TCP syn 'forever' at layer 2 and layer 3 |
| ~ netaccess needs_root IP |
| def _test(): |
| tmp = srloop(IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), count=1, timeout=3) |
| assert type(tmp) == tuple and len(tmp[0]) == 1 |
| |
| tmp = srploop(Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), count=1, timeout=3) |
| assert type(tmp) == tuple and len(tmp[0]) == 1 |
| |
| retry_test(_test) |
| |
| = Sending and receiving an TCP syn with flooding methods |
| ~ netaccess needs_root IP flood |
| from functools import partial |
| # flooding methods do not support timeout. Packing the test for security |
| def _test_flood(ip, flood_function, add_ether=False): |
| with no_debug_dissector(): |
| p = IP(dst=ip)/TCP(sport=RandShort(), dport=80, flags="S") |
| if add_ether: |
| p = Ether()/p |
| p.show2() |
| x = flood_function(p, timeout=0.5, maxretries=10) |
| if type(x) == tuple: |
| x = x[0][0][1] |
| x |
| assert x[IP].ottl() in [32, 64, 128, 255] |
| assert 0 <= x[IP].hops() <= 126 |
| |
| _test_srflood = partial(_test_flood, "www.google.com", srflood) |
| retry_test(_test_srflood) |
| |
| _test_sr1flood = partial(_test_flood, "www.google.fr", sr1flood) |
| retry_test(_test_sr1flood) |
| |
| _test_srpflood = partial(_test_flood, "www.google.net", srpflood, True) |
| retry_test(_test_srpflood) |
| |
| _test_srp1flood = partial(_test_flood, "www.google.co.uk", srp1flood, True) |
| retry_test(_test_srp1flood) |
| |
| = test chainEX |
| ~ netaccess |
| |
| import socket |
| sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| ssck = StreamSocket(sck) |
| |
| try: |
| r = ssck.sr1(ICMP(type='echo-request'), timeout=0.1, chainEX=True, threaded=False) |
| assert False |
| except Exception: |
| assert True |
| finally: |
| sck.close() |
| |
| = Sending and receiving an ICMPv6EchoRequest |
| ~ netaccess ipv6 |
| def _test(): |
| with no_debug_dissector(): |
| x = sr1(IPv6(dst="www.google.com")/ICMPv6EchoRequest(),timeout=3) |
| x |
| assert x[IPv6].ottl() in [32, 64, 128, 255] |
| assert 0 <= x[IPv6].hops() <= 126 |
| x is not None and ICMPv6EchoReply in x and x[ICMPv6EchoReply].type == 129 |
| |
| retry_test(_test) |
| |
| = Whois request |
| ~ netaccess IP as_resolvers |
| * This test retries on failure because it often fails |
| def _test(): |
| IP(src="8.8.8.8").whois() |
| |
| retry_test(_test) |
| |
| = AS resolvers |
| ~ netaccess IP as_resolvers |
| * This test retries on failure because it often fails |
| |
| def _test(): |
| ret = conf.AS_resolver.resolve("8.8.8.8", "8.8.4.4") |
| assert (len(ret) == 2) |
| assert any(x[1] == "AS15169" for x in ret) |
| |
| retry_test(_test) |
| |
| riswhois_data = b"route: 8.8.8.0/24\ndescr: Google\norigin: AS15169\nnotify: [email protected]\nmnt-by: MAINT-AS15169\nchanged: [email protected] 20150728\nsource: RADB\n\nroute: 8.0.0.0/9\ndescr: Proxy-registered route object\norigin: AS3356\nremarks: auto-generated route object\nremarks: this next line gives the robot something to recognize\nremarks: L'enfer, c'est les autres\nremarks: \nremarks: This route object is for a Level 3 customer route\nremarks: which is being exported under this origin AS.\nremarks: \nremarks: This route object was created because no existing\nremarks: route object with the same origin was found, and\nremarks: since some Level 3 peers filter based on these objects\nremarks: this route may be rejected if this object is not created.\nremarks: \nremarks: Please contact [email protected] if you have any\nremarks: questions regarding this object.\nmnt-by: LEVEL3-MNT\nchanged: [email protected] 20060203\nsource: LEVEL3\n\n\n" |
| |
| ret = AS_resolver_riswhois()._parse_whois(riswhois_data) |
| assert ret == ('AS15169', 'Google') |
| |
| retry_test(_test) |
| |
| # This test is too buggy, and is simulated below |
| #def _test(): |
| # ret = AS_resolver_cymru().resolve("8.8.8.8") |
| # assert (len(ret) == 1) |
| # all(x[1] == "AS15169" for x in ret) |
| # |
| #retry_test(_test) |
| |
| cymru_bulk_data = """ |
| Bulk mode; whois.cymru.com [2017-10-03 08:38:08 +0000] |
| 24776 | 217.25.178.5 | INFOCLIP-AS, FR |
| 36459 | 192.30.253.112 | GITHUB - GitHub, Inc., US |
| 26496 | 68.178.213.61 | AS-26496-GO-DADDY-COM-LLC - GoDaddy.com, LLC, US |
| """ |
| tmp = AS_resolver_cymru().parse(cymru_bulk_data) |
| assert len(tmp) == 3 |
| assert [l[1] for l in tmp] == ['AS24776', 'AS36459', 'AS26496'] |
| |
| = AS resolver - IPv6 |
| ~ netaccess IP as_resolvers |
| * This test retries on failure because it often fails |
| |
| def _test(): |
| as_resolver6 = AS_resolver6() |
| ret = as_resolver6.resolve("2001:4860:4860::8888", "2001:4860:4860::4444") |
| assert (len(ret) == 2) |
| assert any(x[1] == 15169 for x in ret) |
| |
| retry_test(_test) |
| |
| = AS resolver - socket error |
| ~ IP |
| * This test checks that a failing resolver will not crash a script |
| |
| class MockAS_resolver(object): |
| def resolve(self, *ips): |
| raise socket.error |
| |
| asrm = AS_resolver_multi(MockAS_resolver()) |
| assert len(asrm.resolve(["8.8.8.8", "8.8.4.4"])) == 0 |
| |
| = sendpfast |
| ~ tcpreplay |
| |
| old_interactive = conf.interactive |
| conf.interactive = False |
| try: |
| sendpfast([]) |
| assert False |
| except Exception: |
| assert True |
| |
| conf.interactive = old_interactive |
| assert True |
| |
| ############ |
| ############ |
| + Generator tests |
| |
| = Implicit logic 1 |
| ~ IP TCP |
| a = Ether() / IP(ttl=(5, 10)) / TCP(dport=[80, 443]) |
| ls(a) |
| ls(a, verbose=True) |
| l = [p for p in a] |
| len(l) == 12 |
| |
| = Implicit logic 2 |
| ~ IP |
| a = IP(ttl=[1,2,(5,9)]) |
| ls(a) |
| ls(a, verbose=True) |
| l = [p for p in a] |
| len(l) == 7 |
| |
| = Implicit logic 3 |
| # In case there's a single option: __iter__ should not return self |
| a = Ether()/IP(src="127.0.0.1", dst="127.0.0.1")/ICMP() |
| for i in a: |
| i.sent_time = 1 |
| |
| assert a.sent_time is None |
| |
| # In case they are several, self should never be returned |
| a = Ether()/IP(src="127.0.0.1", dst="127.0.0.1")/ICMP(seq=(0, 5)) |
| for i in a: |
| i.sent_time = 1 |
| |
| assert a.sent_time is None |
| |
| |
| ############ |
| ############ |
| + Real usages |
| |
| = Port scan |
| ~ netaccess needs_root IP TCP |
| def _test(): |
| with no_debug_dissector(): |
| ans,unans=sr(IP(dst="www.google.com/30")/TCP(dport=[80,443]), timeout=2) |
| |
| # New format: all Python versions |
| ans.make_table(lambda s, r: (s.dst, s.dport, r.sprintf("{TCP:%TCP.flags%}{ICMP:%ICMP.code%}"))) |
| |
| retry_test(_test) |
| |
| = Send & receive with debug_match |
| ~ netaccess needs_root IP ICMP |
| def _test(): |
| old_debug_match = conf.debug_match |
| conf.debug_match = True |
| with no_debug_dissector(): |
| ans, unans = sr(IP(dst="www.google.fr") / TCP(sport=RandShort(), dport=80, flags="S"), timeout=2) |
| assert ans[0].query == ans[0][0] |
| assert ans[0].answer == ans[0][1] |
| conf.debug_match = old_debug_match |
| assert ans and not unans |
| |
| retry_test(_test) |
| |
| = Send & receive with retry |
| ~ netaccess needs_root IP ICMP |
| def _test(): |
| with no_debug_dissector(): |
| ans, unans = sr(IP(dst=["8.8.8.8", "1.2.3.4"]) / TCP(sport=RandShort(), dport=53, flags="S"), timeout=2, retry=1) |
| assert len(ans) == 1 and len(unans) == 1 |
| |
| retry_test(_test) |
| |
| = Send & receive with multi |
| ~ netaccess needs_root IP ICMP |
| def _test(): |
| with no_debug_dissector(): |
| ans, unans = sr(IP(dst=["8.8.8.8", "1.2.3.4"]) / TCP(sport=RandShort(), dport=53, flags="S"), timeout=2, multi=1) |
| assert len(ans) >= 1 and len(unans) == 1 |
| |
| retry_test(_test) |
| |
| = Traceroute function |
| ~ netaccess needs_root tcpdump |
| * Let's test traceroute |
| def _test(): |
| with no_debug_dissector(): |
| ans, unans = traceroute("www.slashdot.org") |
| ans.nsummary() |
| s,r=ans[0] |
| s.show() |
| s.show(2) |
| |
| retry_test(_test) |
| |
| = send() and sniff() |
| ~ netaccess needs_root |
| |
| def _test(): |
| sendp(Ether()/IP(src="9.0.0.0")/UDP(), count=3, iface=conf.iface) |
| |
| r = sniff(timeout=3, count=1, |
| lfilter=lambda x: IP in x and x[IP].src == "9.0.0.0", |
| iface=conf.iface, |
| started_callback=_test) |
| |
| assert r |
| |
| = sniff() with socket failure |
| * GH issue 3631 |
| |
| REFPACKET = Ether()/IP()/UDP() |
| |
| # A socket that fails after 10 packets |
| # ObjectPipe subclass that keeps feeding itself REFPACKET and breaks on the |
| # 11th call to recv(). |
| class OOPipe(ObjectPipe): |
| def recv(self, x=MTU): |
| # Per-instance call counter, created lazily on the first call. |
| self.i = getattr(self, "i", 0) + 1 |
| if self.i == 11: |
| # 11th call: close the pipe and fail. |
| self.close() |
| raise OSError("Giant failure") |
| pkt = super(OOPipe, self).recv(x) |
| # Queue another packet so that the next recv() has something to return. |
| self.send(REFPACKET) |
| return pkt |
| |
| o = OOPipe() |
| # Prime the pipe with a first packet. |
| o.send(REFPACKET) |
| |
| # sniff() must return the 10 packets received before the failure instead of crashing. |
| pkts = sniff(opened_socket=[o], timeout=3) |
| assert len(pkts) == 10 |
| |
| = GH issue 3306 |
| ~ netaccess needs_root |
| |
| send(fuzz(ARP())) |
| |
| = Test SuperSocket.select |
| ~ select |
| |
| from unittest import mock |
| |
| @mock.patch("scapy.supersocket.select") |
| def _test_select(select): |
| def f(a, b, c, d): |
| raise IOError(0) |
| select.side_effect = f |
| try: |
| SuperSocket.select([]) |
| return False |
| except: |
| return True |
| |
| assert _test_select() |
| |
| = Test L2ListenTcpdump socket |
| ~ netaccess |
| |
| # Needs to be fixed. Fails randomly |
| #import time |
| #for i in range(10): |
| # read_s = L2ListenTcpdump(iface=conf.iface) |
| # out_s = conf.L2socket(iface=conf.iface) |
| # time.sleep(5) # wait for read_s to be ready |
| # icmp_r = Ether()/IP(dst="secdev.org")/ICMP() |
| # res = sndrcv(out_s, icmp_r, timeout=5, rcv_pks=read_s)[0] |
| # read_s.close() |
| # out_s.close() |
| # time.sleep(5) |
| # if res: |
| # break |
| # |
| #response = res[0][1] |
| #assert response[ICMP].type == 0 |
| |
| True |
| |
| = Test set of sent_time by sr |
| ~ netaccess needs_root IP ICMP |
| def _test(): |
| packet = IP(dst="8.8.8.8")/ICMP() |
| r = sr(packet, timeout=2) |
| assert packet.sent_time is not None |
| |
| retry_test(_test) |
| |
| = Test set of sent_time by sr (multiple packets) |
| ~ netaccess needs_root IP ICMP |
| def _test(): |
| packet1 = IP(dst="8.8.8.8")/ICMP() |
| packet2 = IP(dst="8.8.4.4")/ICMP() |
| r = sr([packet1, packet2], timeout=2) |
| assert packet1.sent_time is not None |
| assert packet2.sent_time is not None |
| |
| retry_test(_test) |
| |
| = Test set of sent_time by srflood |
| ~ netaccess needs_root IP ICMP |
| def _test(): |
| packet = IP(dst="8.8.8.8")/ICMP() |
| r = srflood(packet, timeout=0.5) |
| assert packet.sent_time is not None |
| |
| retry_test(_test) |
| |
| = Test set of sent_time by srflood (multiple packets) |
| ~ netaccess needs_root IP ICMP |
| def _test(): |
| packet1 = IP(dst="8.8.8.8")/ICMP() |
| packet2 = IP(dst="8.8.4.4")/ICMP() |
| r = srflood([packet1, packet2], timeout=0.5) |
| assert packet1.sent_time is not None |
| assert packet2.sent_time is not None |
| |
| retry_test(_test) |
| |
| ############ |
| ############ |
| + ManuFDB tests |
| |
| = __repr__ |
| |
| if conf.manufdb: |
| len(conf.manufdb) |
| else: |
| True |
| |
| = check _resolve_MAC |
| |
| if conf.manufdb: |
| assert conf.manufdb._resolve_MAC("00:00:17") == "Oracle" |
| else: |
| True |
| |
| |
| ############ |
| ############ |
| + Ether tests with IPv6 |
| |
| = Ether IPv6 checking for dst |
| ~ netaccess ipv6 |
| |
| p = Ether()/IPv6(dst="www.google.com")/TCP() |
| assert p.dst != p[IPv6].dst |
| p.show() |
| |
| |
| ############ |
| ############ |
| + pcap / pcapng format support |
| ~ pcap |
| |
| = Variable creations |
| from io import BytesIO |
| pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacVo*\n\x00(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV_-\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\xf90\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00') |
| pcapngfile = BytesIO(b'\n\r\r\n\\\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00,\x00File created by merging: \nFile1: test.pcap \n\x04\x00\x08\x00mergecap\x00\x00\x00\x00\\\x00\x00\x00\x01\x00\x00\x00\\\x00\x00\x00e\x00\x00\x00\xff\xff\x00\x00\x02\x006\x00Unknown/not available in original file format(libpcap)\x00\x00\t\x00\x01\x00\x06\x00\x00\x00\x00\x00\x00\x00\\\x00\x00\x00\x06\x00\x00\x00H\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00/\xfc[\xcd(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00H\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00\x1f\xff[\xcd\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r<\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00\xb9\x02\\\xcd\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00<\x00\x00\x00') |
| pcapnanofile = BytesIO(b"M<\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacV\xc9\xc1\xb5'(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV-;\xc1'\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\x9aL\xcf'\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00") |
| pcapwirelenfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00}\x87pZ.\xa2\x08\x00\x0f\x00\x00\x00\x10\x00\x00\x00\xff\xff\xff\xff\xff\xff GG\xee\xdd\xa8\x90\x00a') |
| pcapngdefaults = BytesIO(base64.b64decode(b'Cg0NChwAAABNPCsaAQAAAP//////////HAAAAAEAAAAgAAAAEgEAAP//AAAJAAEACUeZiQAAAAAgAAAAAQAAACAAAAASAQAA//8AAAkAAQAJAAAAAAAAACAAAAABAAAAIAAAABIBAAD//wAACQABAAkAAAAAAAAAIAAAAAEAAAAgAAAAEgEAAP//AAAJAAEACQAAAAAAAAAgAAAABgAAAIQBAAADAAAApO/bFdgJaeBiAQAAYgEAAFVVVVVVVVXV////////IMbr4D7PCABFAAFIlQkAAEAR5JwAAAAA/////wBEAEMBNJDsAQEGAFSpVwIACoAAAAAAAAAAAAAAAAAAAAAAACDG6+A+zwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABjglNjNQEB/wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAsOs+bAAAhAEAAAYAAACAAQAAAwAAAKTv2xXIDYznYAEAAGABAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABRgGPAAAEEal3qf5wqO////rhbgdsATJi0U5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGDQpOVFM6IHNzZHA6YWxpdmUNClNFUlZFUjogRnJlZUJTRC84LjAgVVBuUC8xLjAgUGFuYXNvbmljLU1JTC1ETE5BLVNWLzEuMA0KVVNOOiB1dWlkOjRENDU0OTMwLTAyMDAtMTAwMC04MDAxLTIwQzZFQkUwM0VDRg0KDQpcQcvWgAEAAAYAAAC4AQAAAwAAAKTv2xV4Ao3nlQEAAJUBAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABewGQAAAEEalBqf5wqO////rhbgdsAWfu+k5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHVybjpwYW5hc29uaWMtY29tOmRldmljZTpwMDBSZW1vdGVDb250cm9sbGVyOjENCk5UUzogc3NkcDphbGl2ZQ0KU0VSVkVSOiBGcmVlQlNELzguMCBVUG5QLzEuMCBQYW5hc29uaWMtTUlMLURMTkEtU1YvMS4wDQpVU046IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGOjp1cm46cGFuYXNvbmljLWNvbTpkZXZpY2U6cDAwUmVtb3RlQ29udHJvbGxlcjoxDQoNCrLVKmoAAAC4AQAABgAAAHgBAAADAAAApO/bFVjbjedXAQAAVwEAAFVVVVVVVVXVAQBef//6IMbr4D7PCABFAAE9AZEAAAQRqX6p/nCo7///+uFuB2wBKaZATk9USUZZICogSFRUUC8xLjENCk
hPU1Q6IDIzOS4yNTUuMjU1LjI1MDoxOTAwDQpDQUNIRS1DT05UUk9MOiBtYXgtYWdlPTE4MDANCkxPQ0FUSU9OOiBodHRwOi8vMTY5LjI1NC4xMTIuMTY4OjU1MDAwL25yYy9kZGQueG1sDQpOVDogdXBucDpyb290ZGV2aWNlDQpOVFM6IHNzZHA6YWxpdmUNClNFUlZFUjogRnJlZUJTRC84LjAgVVBuUC8xLjAgUGFuYXNvbmljLU1JTC1ETE5BLVNWLzEuMA0KVVNOOiB1dWlkOjRENDU0OTMwLTAyMDAtMTAwMC04MDAxLTIwQzZFQkUwM0VDRjo6dXBucDpyb290ZGV2aWNlDQoNCjagXoUAeAEAAAYAAAC0AQAAAwAAAKTv2xXYw47nkwEAAJMBAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABeQGSAAAEEalBqf5wqO////rhbgdsAWWV4E5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHVybjpwYW5hc29uaWMtY29tOnNlcnZpY2U6cDAwTmV0d29ya0NvbnRyb2w6MQ0KTlRTOiBzc2RwOmFsaXZlDQpTRVJWRVI6IEZyZWVCU0QvOC4wIFVQblAvMS4wIFBhbmFzb25pYy1NSUwtRExOQS1TVi8xLjANClVTTjogdXVpZDo0RDQ1NDkzMC0wMjAwLTEwMDAtODAwMS0yMEM2RUJFMDNFQ0Y6OnVybjpwYW5hc29uaWMtY29tOnNlcnZpY2U6cDAwTmV0d29ya0NvbnRyb2w6MQ0KDQovXKFrALQBAAAGAAAAqAEAAAMAAACk79sVuJKP54cBAACHAQAAVVVVVVVVVdUBAF5///ogxuvgPs8IAEUAAW0BkwAABBGpTKn+cKjv///64W4HbAFZRNJOT1RJRlkgKiBIVFRQLzEuMQ0KSE9TVDogMjM5LjI1NS4yNTUuMjUwOjE5MDANCkNBQ0hFLUNPTlRST0w6IG1heC1hZ2U9MTgwMA0KTE9DQVRJT046IGh0dHA6Ly8xNjkuMjU0LjExMi4xNjg6NTUwMDAvbnJjL2RkZC54bWwNCk5UOiB1cm46ZGlhbC1tdWx0aXNjcmVlbi1vcmc6c2VydmljZTpkaWFsOjENCk5UUzogc3NkcDphbGl2ZQ0KU0VSVkVSOiBGcmVlQlNELzguMCBVUG5QLzEuMCBQYW5hc29uaWMtTUlMLURMTkEtU1YvMS4wDQpVU046IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGOjp1cm46ZGlhbC1tdWx0aXNjcmVlbi1vcmc6c2VydmljZTpkaWFsOjENCg0KLn5A6QCoAQAA')) |
| |
| = Read a pcap file |
| pktpcap = rdpcap(pcapfile) |
| |
| = Read a pcapng file |
| pktpcapng = rdpcap(pcapngfile) |
| assert pktpcapng[0].time == 1454163407.666223 |
| |
| = Read a pcap file with nanosecond precision |
| pktpcapnano = rdpcap(pcapnanofile) |
| assert pktpcapnano[0].time == 1454163407.666223049 |
| |
| = Read a pcapng file with nanosecond precision and default tsresol |
| pktpcapngdefaults = rdpcap(pcapngdefaults) |
| assert pktpcapngdefaults[0].time == 1575115986.114775512 |
| assert Ether in pktpcapngdefaults[0] |
| |
| = Read a pcapng with little-endian SHB |
| pktcapng = sniff(offline=scapy_path("/test/pcaps/macos.pcapng.gz")) |
| assert len(pktcapng) != 0 |
| |
| = Write a pcapng |
| |
| tmpfile = get_temp_file(autoext=".pcapng") |
| r = RawPcapNgWriter(tmpfile) |
| r._write_block_shb() |
| r._write_block_idb(linktype=DLT_EN10MB) |
| ts = 1632568366.384185 |
| r._write_block_epb(raw(Ether()/"Hello Scapy!!!"), ifid=0, timestamp=ts) |
| r.f.close() |
| |
| assert os.stat(tmpfile).st_size == 108 |
| |
| l = rdpcap(tmpfile) |
| assert b"Scapy" in l[0][Raw].load |
| assert l[0].time == ts |
| |
| = Check wrpcapng() |
| |
| tmpfile = get_temp_file(autoext=".pcapng") |
| p = Ether()/"Hello Scapy!!!" |
| p.time = 1632568366.384185 |
| wrpcapng(tmpfile, p) |
| |
| assert os.stat(tmpfile).st_size == 108 |
| |
| l = rdpcap(tmpfile) |
| assert b"Scapy" in l[0][Raw].load |
| assert l[0].time == ts |
| |
| p = Ether() / IPv6() / TCP() |
| p.comment = b"Hello Scapy!" |
| wrpcapng(tmpfile, p) |
| l = rdpcap(tmpfile) |
| assert l[0].comment == p.comment |
| |
| = rdpcap on fifo |
| ~ linux |
| # Replace the temporary file by a FIFO created at the same path. |
| f = get_temp_file() |
| os.unlink(f) |
| os.mkfifo(f) |
| # Build then re-dissect the packet so that it compares equal to what is read back. |
| p = Ether(bytes(Ether(dst="ff:ff:ff:ff:ff:ff")/"Hello Scapy!!!")) |
| # The offline sniffer is started before anything is written to the FIFO |
| # (presumably so that wrpcap() below has a reader on the other end -- confirm). |
| s = AsyncSniffer(offline=f) |
| s.start() |
| wrpcap(f, p) |
| s.join(timeout=1) |
| assert s.results[0] == p |
| |
| = Check multiple packets with different combination of linktype,comment,direction,sniffed_on fields. test both wrpcap() and wrpcapng() |
| import random,string |
| random.seed(0x2807) |
| plist = [] |
| ptypes = [] |
| ptypes.append(Ether((Ether() / IPv6() / TCP()).build())) |
| ptypes.append(IP((IP() / IPv6() / TCP()).build())) |
| ifaces=[None,'','i','int0',''.join(random.choices(string.printable,k=20))] |
| comments=[None,'','a','abcd',''.join(random.choices(string.printable,k=20))] |
| directions=[None,0,1,2,3] |
| |
| for iface in ifaces: |
| for comment in comments: |
| if comment is not None: |
| comment=comment.encode('utf-8') |
| for direction in directions: |
| for p in ptypes: |
| if iface is not None and type(ptypes[ifaces.index(iface) % len(ptypes)]) != type(p): |
| continue |
| pnew = p.copy() |
| pnew.time = 1632568366.384185 |
| pnew.sniffed_on = iface |
| pnew.direction = direction |
| pnew.comment = comment |
| plist.append(pnew) |
| |
| random.shuffle(plist) |
| tmpfile = get_temp_file(autoext=".pcapng") |
| wrpcapng(tmpfile, plist) |
| plist_check = rdpcap(tmpfile) |
| assert len(plist_check) == len(plist) |
| for i in range(len(plist)): |
| assert plist_check[i].comment == plist[i].comment |
| assert plist_check[i].direction == plist[i].direction |
| assert plist_check[i].sniffed_on == plist[i].sniffed_on |
| assert plist_check[i].time == plist[i].time |
| #if interface is unknown, verify pkt bytes integrity and that linktype was set to first packet |
| if plist[i].sniffed_on is None: |
| assert bytes(plist_check[i]) == bytes(plist[i]) |
| assert type(plist_check[i]) == type(plist[0]) |
| else: |
| assert plist_check[i] == plist[i] |
| |
| tmpfile = get_temp_file(autoext=".pcap") |
| wrpcap(tmpfile, plist) |
| plist_check = rdpcap(tmpfile) |
| for i in range(len(plist)): |
| assert plist_check[i].time == plist[i].time |
| assert type(plist_check[i]) == type(plist[0]) |
| assert bytes(plist_check[i]) == bytes(plist[i]) |
| |
| = PcapNg - Process Information Block |
| |
| pib_pcapng_file = BytesIO(b'\n\r\r\n\xbc\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x02\x00\x05\x00arm64\x00\x00\x00\x03\x00f\x00Darwin Kernel Version 23.3.0: Thu Dec 21 02:29:41 PST 2023; root:xnu-10002.81.5~11/RELEASE_ARM64_T8122\x00\x00\x04\x00 \x00tcpdump (libpcap version 1.10.1)\x00\x00\x00\x00\xbc\x00\x00\x00\x01\x00\x00\x00 \x00\x00\x00\x01\x00\x00\x00\x00\x00\x08\x00\x02\x00\x03\x00en0\x00\x00\x00\x00\x00 \x00\x00\x00\x01\x00\x00\x80 \x00\x00\x00$\'\x00\x00\x02\x00\x06\x00trustd\x00\x00\x00\x00\x00\x00 \x00\x00\x00\x01\x00\x00\x80$\x00\x00\x00")\x00\x00\x02\x00\x0c\x00mobileassetd\x00\x00\x00\x00$\x00\x00\x00\x06\x00\x00\x00\x90\x00\x00\x00\x00\x00\x00\x00\xfb\x18\x06\x00EcqdB\x00\x00\x00B\x00\x00\x00\xe8\x9f\x80\xfa\x8c\xc6P\xa6\xd8\xd5\x83v\x08\x00E\x00\x004\x00\x00@\x00@\x06\x90T\nh\x01\xc3\xc0\xe5\xdd_\xf4\xb8\x00P\x95\xc3\xcb\x01\xcb\xeb\x11\xe8\x80\x11\x08\x00\x0c\xe6\x00\x00\x01\x01\x08\n\xbe\xb8\xd4\xb3\xbb\x9b4\xbc\x00\x00\x01\x80\x04\x00\x00\x00\x00\x00\x03\x80\x04\x00\x01\x00\x00\x00\x02\x00\x04\x00\x02\x00\x00\x00\x02\x80\x04\x00\x00\x00\x00\x00\x04\x80\x04\x00\x10\x00\x00\x00\x00\x00\x00\x00\x90\x00\x00\x00') |
| |
| l = rdpcap(pib_pcapng_file) |
| assert(len(l) == 1) |
| assert(TCP in l[0]) |
| assert(len(l[0].process_information) == 2) |
| assert(l[0].process_information["proc"]["name"] == "trustd") |
| |
| = OSS-Fuzz Findings |
| |
| from io import BytesIO |
| # Issue 68352 |
| file = BytesIO(b"\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x1c\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00\xe4\x00\x00\x00\x00\x00\x04\x00\x14\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\xe4\x00\x00\x00\x00\x00\x04\x00\x02\x00\t\x00b'ens16\xb0'\x00\x00\x00\x00\x00\x00\x00(\x00\x00\x00\x06\x00\x00\x004\x00\x00\x00\x01\x00\x00\x00}\x17\x06\x00\xb5t\x1d\x85\x14\x00\x00\x00\x14\x00\x00\x00E\x00\x00\x14\x00\x01\x00\x00@\x00|\xe7\x7f\x00\x00\x01\x7f\x00\x00\x014\x00\x00\x00") |
| rdpcap(file) |
| |
| # Issue 68354 |
| file = BytesIO(b'\n\r\r\n\xff\xfe\xfe\xffM<+\x1a') |
| try: |
| rdpcap(file) |
| except Scapy_Exception: |
| pass |
| |
| # Issue #70115 |
| file = BytesIO(b"\n\r\r\n\x00\x00\x008\x1a+<M\x00\x01\x00\x00\xef\xff\xff\x81\x01\x00\x00\x01\x00\x00\x00\x0c\x00\x00\x00\x00\x00\x00\x00\x009\x00\x00\x00\x01\x00\x00\x000\x00e\x00\x00\x00\x00\x00\x00\x00\x008\x00\x00\x00\x06\x00\x00\x00d\x00\x00\x00\x00'\x00\x00\x00\x01\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x000\x00\x00\x00\x01\x00\x00\x008\x00\x00\n\x0c\x00A\x05\xc0\x01\x00\x00\x00\x00\x00d\x00\x00\x00\x00\x00\x00\x00\x00\x000\x00\x00\x00\x01\x00\x00\x008\x00\x00\n\x0c\x00A\x05\xc0\x83\x00\x00\x043\x01\x00\x00\x00\x00\x00d\x00\x00\x00\x00d") |
| l = rdpcap(file) |
| |
| # Issue #69628 |
| file = BytesIO(b"\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00\x04{\xdcf\xc2\xa5\x07\x008\x00\x00\x008\x00\x00\x00A]+\xdb]\x04\x8e(6\n\x99\xcb\x08\x00E\x00\x00*\x00\x01\x00\x00@\x06\xe3V\x07\x87\xa5m\x17\x15\xd3m\x01\x85\x01\x85\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\xc5_\x00\x000\x00") |
| l = rdpcap(file) |
| assert l[0][LDAP].summary() == "LDAP" |
| |
| # Issue #69628 - 32-bit alternative |
| file = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00%\xa8\xddfK\x1b\x05\x00\xca\xca\xca\xca*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x86"\x11&\xab3\x08\x06\x00\x01\x08\x00\x06\x04\x00\x01]\x80\x0f\x13*r\n\x00\x02\x0f\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') |
| l = rdpcap(file) |
| assert len(l) == 0 or ARP in l[0] |
| |
| = Read a pcap file with wirelen != captured len |
| pktpcapwirelen = rdpcap(pcapwirelenfile) |
| |
| = Check all packet lists are the same |
| assert list(pktpcap) == list(pktpcapng) == list(pktpcapnano) |
| assert [float(p.time) for p in pktpcap] == [float(p.time) for p in pktpcapng] == [float(p.time) for p in pktpcapnano] |
| |
| = Check packets from pcap file |
| assert all(IP in pkt for pkt in pktpcap) |
| assert all(any(proto in pkt for pkt in pktpcap) for proto in [ICMP, UDP, TCP]) |
| |
| = Check wirelen value from pcap file |
| assert len(pktpcapwirelen) == 1 |
| assert pktpcapwirelen[0].wirelen is not None |
| assert len(pktpcapwirelen[0]) < pktpcapwirelen[0].wirelen |
| |
| = Check wrpcap() then rdpcap() with wirelen |
| import os, tempfile |
| fdesc, filename = tempfile.mkstemp() |
| fdesc = os.fdopen(fdesc, "wb") |
| wrpcap(fdesc, pktpcapwirelen) |
| fdesc.close() |
| newpktpcapwirelen = rdpcap(filename) |
| assert len(newpktpcapwirelen) == 1 |
| assert newpktpcapwirelen[0].wirelen is not None |
| assert len(newpktpcapwirelen[0]) < newpktpcapwirelen[0].wirelen |
| assert newpktpcapwirelen[0].wirelen == pktpcapwirelen[0].wirelen |
| |
| = Check wrpcap() then rdpcap() with sent_time on SndRcvList |
| f = get_temp_file() |
| s = Ether()/IP() |
| r = Ether()/IP() |
| s.sent_time = 1 |
| r.time = 2 |
| wrpcap(f, SndRcvList([(s, r)])) |
| pcap = rdpcap(f) |
| assert pcap[0].time == 1 |
| assert pcap[1].time == 2 |
| |
| = Check wrpcap() |
| fdesc, filename = tempfile.mkstemp() |
| fdesc = os.fdopen(fdesc, "wb") |
| wrpcap(fdesc, pktpcap) |
| fdesc.close() |
| |
| = Check offline sniff() (by PacketList) |
| l=sniff(offline=PacketList([IP()/TCP(),IP()/TCP()])) |
| assert len(l) == 2 |
| assert all(TCP in p for p in l) |
| |
| = Check offline sniff() (by filename) |
| assert list(pktpcap) == list(sniff(offline=filename)) |
| |
| = Check offline sniff() (by file object) |
| fdesc = open(filename, "rb") |
| assert list(pktpcap) == list(sniff(offline=fdesc)) |
| fdesc.close() |
| |
| = Check offline sniff() with a filter (by filename) |
| ~ tcpdump libpcap |
| pktpcap_flt = [(proto, sniff(offline=filename, filter=proto.__name__.lower())) |
| for proto in [ICMP, UDP, TCP]] |
| assert all(list(pktpcap[proto]) == list(packets) for proto, packets in pktpcap_flt) |
| |
| = Check offline sniff() with a filter (by file object) |
| ~ tcpdump libpcap |
| fdesc = open(filename, "rb") |
| pktpcap_tcp = sniff(offline=fdesc, filter="tcp") |
| fdesc.close() |
| assert list(pktpcap[TCP]) == list(pktpcap_tcp) |
| os.unlink(filename) |
| |
| = Check offline sniff() with a PcapNg file and a filter (by file object) |
| ~ tcpdump libpcap |
| |
| pcapng_data = b'\n\r\r\n`\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x04\x009\x00TShark (Wireshark) 3.2.3 (Git v3.2.3 packaged as 3.2.3-1)\x00\x00\x00\x00\x00\x00\x00`\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00\xe4\x00\x00\x00\xff\xff\x00\x00\x14\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x98\xcd\x05\x00\x19\x83\xf7\x9e\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r<\x00\x00\x00' |
| |
| if OPENBSD: |
| # Note: OpenBSD tcpdump does not support PcapNg |
| assert True |
| else: |
| fdesc, filename = tempfile.mkstemp() |
| os.close(fdesc) |
| fd = open(filename, "wb") |
| fd.write(pcapng_data) |
| fd.close() |
| packets = sniff(offline=filename, filter="udp") |
| os.unlink(filename) |
| assert UDP in packets[0] |
| |
| = Check offline sniff() with Packets and tcpdump with a filter |
| ~ tcpdump libpcap |
| |
| l = sniff(offline=IP()/UDP(sport=(10000, 10001)), filter="udp") |
| assert len(l) == 2 |
| assert all(UDP in p for p in l) |
| |
| l = sniff(offline=[p for p in IP()/UDP(sport=(10000, 10001))], filter="udp") |
| assert len(l) == 2 |
| assert all(UDP in p for p in l) |
| |
| l = sniff(offline=IP()/UDP(sport=(10000, 10001)), filter="tcp") |
| assert len(l) == 0 |
| |
| = Check offline sniff() with Packets, tcpdump and a bad filter |
| ~ tcpdump libpcap |
| |
| try: |
| sniff(offline=IP()/UDP(), filter="bad filter") |
| except Scapy_Exception: |
| pass |
| else: |
| assert False |
| |
| = Check online sniff() with a bad filter |
| ~ needs_root tcpdump libpcap |
| try: |
| sniff(timeout=0, filter="bad filter") |
| except Scapy_Exception: |
| pass |
| else: |
| assert False |
| |
| = Check offline sniff with lfilter |
| assert len(sniff(offline=[IP()/UDP(), IP()/TCP()], lfilter=lambda x: TCP in x)) == 1 |
| |
| = Check offline sniff() without a tcpdump binary |
| ~ tcpdump |
| from unittest import mock |
| |
| conf_prog_tcpdump = conf.prog.tcpdump |
| conf.prog.tcpdump = "tcpdump_fake" |
| |
| def _test_sniff_notcpdump(): |
| try: |
| sniff(offline="fake.pcap", filter="tcp") |
| assert False |
| except: |
| assert True |
| |
| _test_sniff_notcpdump() |
| conf.prog.tcpdump = conf_prog_tcpdump |
| |
| = Check wrpcap(nano=True) |
| fdesc, filename = tempfile.mkstemp() |
| fdesc = os.fdopen(fdesc, "wb") |
| pktpcapnano[0].time += Decimal('1E-9') |
| wrpcap(fdesc, pktpcapnano, nano=True) |
| fdesc.close() |
| pktpcapnanoread = rdpcap(filename) |
| assert pktpcapnanoread[0].time == pktpcapnano[0].time |
| os.unlink(filename) |
| |
| = Check PcapNg with nanosecond precision using obsolete packet block |
| * first packet from capture file icmp2.ntar -- https://wiki.wireshark.org/Development/PcapNg?action=AttachFile&do=view&target=icmp2.ntar |
| pcapngfile = BytesIO(b'\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xa8\x03\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\xff\xff\x00\x00\r\x00\x01\x00\x04\x04K\x00\t\x00\x01\x00\tK=N\x00\x00\x00\x00(\x00\x00\x00\x02\x00\x00\x00n\x00\x00\x00\x00\x00\x00\x00e\x14\x00\x00)4\'ON\x00\x00\x00N\x00\x00\x00\x00\x12\xf0\x11h\xd6\x00\x13r\t{\xea\x08\x00E\x00\x00<\x90\xa1\x00\x00\x80\x01\x8e\xad\xc0\xa8M\x07\xc0\xa8M\x1a\x08\x00r[\x03\x00\xd8\x00abcdefghijklmnopqrstuvwabcdefghi\xeay$\xf6\x00\x00n\x00\x00\x00') |
| pktpcapng = rdpcap(pcapngfile) |
| assert len(pktpcapng) == 1 |
| pkt = pktpcapng[0] |
| # weird, but wireshark agrees |
| assert pkt.time == 22425.352221737 |
| assert isinstance(pkt, Ether) |
| pkt = pkt.payload |
| assert isinstance(pkt, IP) |
| pkt = pkt.payload |
| assert isinstance(pkt, ICMP) |
| pkt = pkt.payload |
| assert isinstance(pkt, Raw) and pkt.load == b'abcdefghijklmnopqrstuvwabcdefghi' |
| pkt = pkt.payload |
| assert isinstance(pkt, Padding) and pkt.load == b'\xeay$\xf6' |
| pkt = pkt.payload |
| assert isinstance(pkt, NoPayload) |
| |
| = Check PcapNg using Simple Packet Block |
| * previous file with the (obsolete) packet block replaced by a Simple Packet Block |
| pcapngfile = BytesIO(b'\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xa8\x03\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\xff\xff\x00\x00\r\x00\x01\x00\x04\x04K\x00\t\x00\x01\x00\tK=N\x00\x00\x00\x00(\x00\x00\x00\x03\x00\x00\x00`\x00\x00\x00N\x00\x00\x00\x00\x12\xf0\x11h\xd6\x00\x13r\t{\xea\x08\x00E\x00\x00<\x90\xa1\x00\x00\x80\x01\x8e\xad\xc0\xa8M\x07\xc0\xa8M\x1a\x08\x00r[\x03\x00\xd8\x00abcdefghijklmnopqrstuvwabcdefghi\xeay$\xf6\x00\x00`\x00\x00\x00') |
| pktpcapng = rdpcap(pcapngfile) |
| assert len(pktpcapng) == 1 |
| pkt = pktpcapng[0] |
| assert isinstance(pkt, Ether) |
| pkt = pkt.payload |
| assert isinstance(pkt, IP) |
| pkt = pkt.payload |
| assert isinstance(pkt, ICMP) |
| pkt = pkt.payload |
| assert isinstance(pkt, Raw) and pkt.load == b'abcdefghijklmnopqrstuvwabcdefghi' |
| pkt = pkt.payload |
| assert isinstance(pkt, Padding) and pkt.load == b'\xeay$\xf6' |
| pkt = pkt.payload |
| assert isinstance(pkt, NoPayload) |
| |
| = Invalid pcapng files |
| |
| from io import BytesIO |
| |
| # Invalid PCAPNG format -> Raise |
| try: |
| invalid_pcapngfile_1 = BytesIO(b'\n\r\r\n\r\x00\x00\x00M<+\x1a\xb2<\xb2\xa1\x01\x00\x00\x00\r\x00\x00\x00M<+\x1a\x80\xaa\xb2\x02') |
| rdpcap(invalid_pcapngfile_1) |
| assert False |
| except Scapy_Exception: |
| pass |
| |
| # Invalid Packet in PCAPNG -> return |
| invalid_pcapngfile_2 = BytesIO(b'\n\r\r\n\x00\x00\x00\x1c\x1a+<M\x00\x01\x00\x00\x00\x00\x00\x01\x00\x00\x00\x10\x00\x00\x00\x1c\x00\x00\x00\x01\x00\x00\x00\x10\x12\x12\x12\x12\x00\x00\x00\x10') |
| assert len(rdpcap(invalid_pcapngfile_2)) == 0 |
| |
| # Invalid interface ID in PCAPNG -> rdpcap() must raise Scapy_Exception |
| try: |
| invalid_pcapngfile_3 = BytesIO(b'\n\n\n\x14\x00\x00\x00M<+\x1a \x14\x00\x00\x00\x03\x00\x00\x00\x14\x00\x00\x00 \x14\x00\x00\x00') |
| rdpcap(invalid_pcapngfile_3) |
| assert False |
| except Scapy_Exception: |
| pass |
| |
| # Invalid SPB in PCAPNG -> rdpcap() must raise Scapy_Exception |
| try: |
| invalid_pcapngfile_4 = BytesIO(b'\n\n\n\x14\x00\x00\x00M<+\x1a \x14\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00 \x14\x00\x00\x00\x03\x00\x00\x00\x0c\x00\x00\x00\x0c\x00\x00\x00') |
| rdpcap(invalid_pcapngfile_4) |
| assert False |
| except Scapy_Exception: |
| pass |
| |
| = Check PcapWriter on null write |
| |
| f = BytesIO() |
| w = PcapWriter(f) |
| w.write([]) |
| assert len(f.getvalue()) == 0 |
| |
| # Keep the BytesIO from really being closed, so that what close() wrote can still be inspected |
| with mock.patch.object(f, 'close') as cf: |
| w.close() |
| |
| cf.assert_called_once_with() |
| assert len(f.getvalue()) != 0 |
| |
| = Check PcapWriter sets correct linktype after null write |
| |
| f = BytesIO() |
| w = PcapWriter(f) |
| w.write([]) |
| assert len(f.getvalue()) == 0 |
| w.write(Ether()/IP()/ICMP()) |
| assert len(f.getvalue()) != 0 |
| |
| # Keep the BytesIO from really being closed, so that what close() wrote can still be inspected |
| with mock.patch.object(f, 'close') as cf: |
| w.close() |
| |
| cf.assert_called_once_with() |
| f.seek(0) or None |
| assert len(f.getvalue()) != 0 |
| |
| r = PcapReader(f) |
| f.seek(0) or None |
| assert r.LLcls is Ether |
| assert r.linktype == DLT_EN10MB |
| |
| l = [ p for p in RawPcapReader(f) ] |
| assert len(l) == 1 |
| |
| = Check RawPcapReader on pcap |
| ~ pcap |
| |
| fd = get_temp_file() |
| wrpcap(fd, [Ether()/IP()/ICMP()]) |
| assert len([p for p in RawPcapReader(fd)]) == 1 |
| |
| for (x, y) in RawPcapReader(fd): |
| pass |
| |
| = Check RawPcapReader with a Context Manager |
| ~ pcap |
| |
| filename = get_temp_file(fd=False) |
| wrpcap(filename, [IP()/TCP(), IP()/UDP()]) |
| |
| try: |
| with RawPcapReader(filename) as reader: |
| packet = next(reader, None) |
| assert True |
| except TypeError: |
| assert False |
| |
| = Check RawPcapWriter |
| ~ pcap |
| |
| # GH3256 |
| fd = get_temp_file() |
| with RawPcapWriter(fd, linktype=1) as w: |
| w.write(b"test") |
| |
| fd = get_temp_file() |
| with RawPcapWriter(fd) as w: |
| w.write(b"test") |
| assert w.linktype == 1 |
| |
| = Check tcpdump() |
| ~ tcpdump |
| from io import BytesIO |
| * No very specific tests because we do not want to depend on tcpdump output |
| pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x000}$]\xff\\\t\x006\x00\x00\x006\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x000}$]\x87i\t\x00*\x00\x00\x00*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r0}$]\xfbp\t\x00*\x00\x00\x00*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00') |
| |
| data = tcpdump(pcapfile, dump=True, args=['-nn']).split(b'\n') |
| print(data) |
| assert b'127.0.0.1.20 > 127.0.0.1.80:' in data[0] |
| assert b'127.0.0.1.53 > 127.0.0.1.53:' in data[1] |
| assert b'127.0.0.1 > 127.0.0.1:' in data[2] |
| |
| * Non existing tcpdump binary |
| |
| from unittest import mock |
| |
| conf_prog_tcpdump = conf.prog.tcpdump |
| conf.prog.tcpdump = "tcpdump_fake" |
| |
def _test_tcpdump_notcpdump():
    """Check that tcpdump() raises when conf.prog.tcpdump is a missing binary."""
    try:
        tcpdump(IP()/TCP())
    except Exception:
        # Expected outcome: the fake binary could not be executed
        pass
    else:
        # Kept outside the try body: a bare "except:" around "assert False"
        # would swallow the AssertionError and the test could never fail.
        assert False, "tcpdump() did not raise with a missing binary"
| |
| _test_tcpdump_notcpdump() |
| conf.prog.tcpdump = conf_prog_tcpdump |
| |
| # Also check with use_tempfile=True (for non-OSX platforms) |
| pcapfile.seek(0) or None |
| tempfile_count = len(conf.temp_files) |
| data = tcpdump(pcapfile, dump=True, args=['-nn'], use_tempfile=True).split(b'\n') |
| print(data) |
| assert b'127.0.0.1.20 > 127.0.0.1.80:' in data[0] |
| assert b'127.0.0.1.53 > 127.0.0.1.53:' in data[1] |
| assert b'127.0.0.1 > 127.0.0.1:' in data[2] |
| # We should have another tempfile tracked. |
| assert len(conf.temp_files) > tempfile_count |
| |
| # Check with a simple packet |
| data = tcpdump([Ether()/IP()/ICMP()], dump=True, args=['-nn']).split(b'\n') |
| print(data) |
| assert b'127.0.0.1 > 127.0.0.1: ICMP' in data[0].upper() |
| |
| = Check tcpdump() command with linktype |
| ~ tcpdump libpcap |
| |
| f = BytesIO() |
| pkt = Ether()/IP()/ICMP() |
| |
| with mock.patch('subprocess.Popen', return_value=Bunch( |
| stdin=f, wait=lambda: None)) as popen: |
| # Prevent closing the BytesIO |
| with mock.patch.object(f, 'close'): |
| tcpdump([pkt], linktype="DLT_EN10MB", use_tempfile=False) |
| |
| expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-U', '-r', '-'] |
| if OPENBSD: |
| expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-r', '-'] |
| |
| popen.assert_called_once_with( |
| expected_command, |
| stdin=subprocess.PIPE, stdout=None, stderr=None) |
| |
| print(bytes_hex(f.getvalue())) |
| assert raw(pkt) in f.getvalue() |
| f.close() |
| del f, pkt |
| |
| = Check tcpdump() command with linktype and args |
| ~ tcpdump libpcap |
| |
| f = BytesIO() |
| pkt = Ether()/IP()/ICMP() |
| |
| with mock.patch('subprocess.Popen', return_value=Bunch( |
| stdin=f, wait=lambda: None)) as popen: |
| # Prevent closing the BytesIO |
| with mock.patch.object(f, 'close'): |
| tcpdump([pkt], linktype=scapy.data.DLT_EN10MB, use_tempfile=False) |
| |
| expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-U', '-r', '-'] |
| if OPENBSD: |
| expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-r', '-'] |
| |
| popen.assert_called_once_with( |
| expected_command, |
| stdin=subprocess.PIPE, stdout=None, stderr=None) |
| |
| print(bytes_hex(f.getvalue())) |
| assert raw(pkt) in f.getvalue() |
| f.close() |
| del f, pkt |
| |
| = Check sniff() offline with linktype & 802.11 filter |
| ~ tcpdump linux |
| |
| fd = get_temp_file() |
| wrpcap(fd, [RadioTap()/Dot11()/Dot11ProbeReq(), RadioTap()/Dot11()]) |
| lst = sniff(offline=fd, filter="subtype probe-req") |
| assert len(lst) == 1 |
| |
| = Check tcpdump() command rejects non-string input for prog |
| |
pkt = Ether()/IP()/ICMP()

# An integer is not a valid program name: tcpdump() must reject it with a
# ValueError whose message mentions the offending "prog" argument.
try:
    tcpdump([pkt], prog=17607067425, args=['-nn'])
except ValueError as e:
    # Exceptions always expose .args on Python 3, so the old .message
    # fallback was unreachable and has been dropped.
    assert 'prog' in e.args[0]
else:
    assert False, 'expected exception'
| |
| = Check tcpdump() command with tshark |
| ~ tshark |
| pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacVo*\n\x00(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV_-\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\xf90\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00') |
| # tshark doesn't need workarounds on OSX |
| tempfile_count = len(conf.temp_files) |
| values = [tuple(int(val) for val in line[:-1].split(b'\t')) for line in tcpdump(pcapfile, prog=conf.prog.tshark, getfd=True, args=['-T', 'fields', '-e', 'ip.ttl', '-e', 'ip.proto'])] |
| assert values == [(64, 6), (64, 17), (64, 1)] |
| assert len(conf.temp_files) == tempfile_count |
| |
| = Check tdecode command directly for tshark |
| ~ tshark |
| |
| pkts = [ |
| Ether()/IP(src='192.0.2.1', dst='192.0.2.2')/ICMP(type='echo-request')/Raw(b'X'*100), |
| Ether()/IP(src='192.0.2.2', dst='192.0.2.1')/ICMP(type='echo-reply')/Raw(b'X'*100), |
| ] |
| |
| # tshark doesn't need workarounds on OSX |
| tempfile_count = len(conf.temp_files) |
| |
| r = tdecode(pkts, dump=True) |
| r |
| assert b'Src: 192.0.2.1' in r |
| assert b'Src: 192.0.2.2' in r |
| assert b'Dst: 192.0.2.2' in r |
| assert b'Dst: 192.0.2.1' in r |
| assert b'Echo (ping) request' in r |
| assert b'Echo (ping) reply' in r |
| assert b'ICMP' in r |
| assert len(conf.temp_files) == tempfile_count |
| |
| = Check tdecode with linktype |
| ~ tshark |
| |
| # These are the same as the ping packets above |
| pkts = [ |
| b'\xff\xff\xff\xff\xff\xff\xac"\x0b\xc5j\xdb\x08\x00E\x00\x00\x80\x00\x01\x00\x00@\x01\xf6x\xc0\x00\x02\x01\xc0\x00\x02\x02\x08\x00\xb6\xbe\x00\x00\x00\x00XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', |
| b'\xff\xff\xff\xff\xff\xff\xac"\x0b\xc5j\xdb\x08\x00E\x00\x00\x80\x00\x01\x00\x00@\x01\xf6x\xc0\x00\x02\x02\xc0\x00\x02\x01\x00\x00\xbe\xbe\x00\x00\x00\x00XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', |
| ] |
| |
| # tshark doesn't need workarounds on OSX |
| tempfile_count = len(conf.temp_files) |
| |
| r = tdecode(pkts, dump=True, linktype=DLT_EN10MB) |
| assert b'Src: 192.0.2.1' in r |
| assert b'Src: 192.0.2.2' in r |
| assert b'Dst: 192.0.2.2' in r |
| assert b'Dst: 192.0.2.1' in r |
| assert b'Echo (ping) request' in r |
| assert b'Echo (ping) reply' in r |
| assert b'ICMP' in r |
| assert len(conf.temp_files) == tempfile_count |
| |
| |
| = Run scapy's tshark command |
| ~ needs_root |
| tshark(count=1, timeout=3) |
| |
| = Check wireshark() |
| ~ wireshark |
| |
| f = BytesIO() |
| pkt = Ether()/IP()/ICMP() |
| |
| with mock.patch('subprocess.Popen', return_value=Bunch(stdin=f)) as popen: |
| # Prevent closing the BytesIO |
| with mock.patch.object(f, 'close'): |
| wireshark([pkt]) |
| |
| popen.assert_called_once_with( |
| [conf.prog.wireshark, '-ki', '-'], |
| stdin=subprocess.PIPE, stdout=None, stderr=None) |
| |
| print(bytes_hex(f.getvalue())) |
| assert raw(pkt) in f.getvalue() |
| f.close() |
| del f, pkt |
| |
| = Check Raw IP pcap files |
| |
import os
import tempfile

# mkstemp() creates the file atomically (mktemp() is race-prone and
# deprecated); only the path is needed, so the descriptor is closed at once.
fdesc, filename = tempfile.mkstemp(suffix=".pcap")
os.close(fdesc)
try:
    wrpcap(filename, [IP()/UDP(), IPv6()/UDP()], linktype=DLT_RAW)
    packets = rdpcap(filename)
    assert isinstance(packets[0], IP) and isinstance(packets[1], IPv6)
finally:
    # Remove the capture even when an assertion above fails
    os.remove(filename)
| |
| = Check wrpcap() with no packet |
| |
import os
import tempfile

# mkstemp() creates the file atomically (mktemp() is race-prone and
# deprecated); only the path is needed, so the descriptor is closed at once.
fdesc, filename = tempfile.mkstemp(suffix=".pcap")
os.close(fdesc)
try:
    wrpcap(filename, [])
    fstat = os.stat(filename)
    # The file starts out empty, so a non-zero size proves wrpcap() wrote
    # something even though no packet was given.
    assert fstat.st_size != 0
finally:
    # Remove the capture even when the assertion above fails
    os.remove(filename)
| |
| = Check wrpcap() with SndRcvList |
| |
import os
import tempfile

# mkstemp() creates the file atomically (mktemp() is race-prone and
# deprecated); only the path is needed, so the descriptor is closed at once.
fdesc, filename = tempfile.mkstemp(suffix=".pcap")
os.close(fdesc)
try:
    wrpcap(filename, SndRcvList(res=[(Ether()/IP(), Ether()/IP())]))
    # One (sent, received) pair is expected to be read back as two packets
    assert len(rdpcap(filename)) == 2
finally:
    # Remove the capture even when the assertion above fails
    os.remove(filename)
| |
| = Check wrpcap() with different packets types |
| |
from unittest import mock
import os
import tempfile

# mkstemp() creates the file atomically (mktemp() is race-prone and
# deprecated); only the path is needed, so the descriptor is closed at once.
fdesc, filename = tempfile.mkstemp()
os.close(fdesc)
try:
    with mock.patch("scapy.utils.warning") as warning:
        wrpcap(filename, [IP(), Ether(), IP(), IP()])
finally:
    # Remove the capture even if wrpcap() raises
    os.remove(filename)

# The mock keeps its call record after the patch is undone
assert any("Inconsistent" in arg for arg in warning.call_args[0])
| |
| = Check wrpcap() with the Loopback layer |
| ~ tshark |
| |
import os
import tempfile

# Each loopback flavour is written to its own capture and decoded by tshark.
for cls in [Loopback, LoopbackOpenBSD]:
    # mkstemp() instead of the race-prone mktemp(); only the path is needed
    fdesc, filename = tempfile.mkstemp(suffix=".pcap")
    os.close(fdesc)
    try:
        wrpcap(filename, [cls()/IP()/ICMP()])
        # getfd=True yields tshark's output line by line; join it directly
        return_value = b"".join(tcpdump(filename, prog=conf.prog.tshark, getfd=True))
        assert b"Echo (ping) request" in return_value
    finally:
        # Do not leave one temporary capture behind per iteration
        os.remove(filename)
| |
| ############ |
| ############ |
| + ERF Ethernet format support |
| |
| = Variable creations |
| erffile = BytesIO(b'3;!E_9\x92_\x02\x04\x00p\x00\x00\x00P\x00\x00\x00\x0fS?\xca\xc0\x1cjz\x18\x90\xed\x81\x00\x01:\x08\x00E\x00\x00(\xdf\xab@\x00;\x06\xb3s\n\x01]\xdb\n\xfb9\xda\xc3v\x84\xecD\x16\xb9\xab\xda\xa1b\xf9P\x10f\x98\x18\xcb\x00\x00\x00\x00\x90\x9e\xd7\xd2_\x929_\x0f\x9e\xcd\x1f\x01\x88\xb9\x15[/s<\x01\x88\xb9\x15[/\xcd\x1f\x01\x88\xb9\x15[/0\xcd"E_9\x92_\x02\x04\x00p\x00\x00\x00P\x00\x00\x1cjz\x18\x90\xed\x00\x0fS?\xca\xc0\x08\x00E\x00\x00(\xa2\xdd@\x00@\x06\xebA\n\xfb9\xda\n\x01]\xdb\x84\xec\xc3v\xda\xa1b\xf9D\x16\xb9\xacP\x10\x9a\xf0\xe4q\x00\x00\x00\x00\x00\x00\x00\x00o\xbc\xe2{_\x929_\x0f\x9f+3\x01\x88\xb9\x15u\x1e(^\x01\x88\xb9\x15u\x1e+3\x01\x88\xb9\x15u\x1e') |
| erffilewithheader = BytesIO(b'4;!E_9\x92_\x82\x00\x00x\x00\x00\x00P\x00\x00\x1a+<M^o\x00\x00\x00\x0fS?\xca\xc0\x1cjz\x18\x90\xed\x81\x00\x01:\x08\x00E\x00\x00(\xdf\xab@\x00;\x06\xb3s\n\x01]\xdb\n\xfb9\xda\xc3v\x84\xecD\x16\xb9\xab\xda\xa1b\xf9P\x10f\x98\x18\xcb\x00\x00\x00\x00\x90\x9e\xd7\xd2_\x929_\x0f\x9e\xcd\x1f\x01\x88\xb9\x15[/s<\x01\x88\xb9\x15[/\xcd\x1f\x01\x88\xb9\x15[/0\xcd"E_9\x92_\x82\x00\x00x\x00\x00\x00P\x00\x00\x1a+<M^o\x00\x00\x1cjz\x18\x90\xed\x00\x0fS?\xca\xc0\x08\x00E\x00\x00(\xa2\xdd@\x00@\x06\xebA\n\xfb9\xda\n\x01]\xdb\x84\xec\xc3v\xda\xa1b\xf9D\x16\xb9\xacP\x10\x9a\xf0\xe4q\x00\x00\x00\x00\x00\x00\x00\x00o\xbc\xe2{_\x929_\x0f\x9f+3\x01\x88\xb9\x15u\x1e(^\x01\x88\xb9\x15u\x1e+3\x01\x88\xb9\x15u\x1e') |
| |
| = Check reading of ERF Ethernet file |
| pkterf = rderf(erffile) |
| assert pkterf[0].time == 1603418463.270038318 |
| assert pkterf[0][IP].src == "10.1.93.219" |
| assert pkterf[0][IP].dst == "10.251.57.218" |
| assert pkterf[0][Ether].src == "1c:6a:7a:18:90:ed" |
| assert pkterf[0][Ether].dst == "00:0f:53:3f:ca:c0" |
| |
| = Check writing of ERF Ethernet file |
| import os, tempfile |
| fdesc, filename = tempfile.mkstemp() |
| fdesc = os.fdopen(fdesc, "wb") |
| wrerf(fdesc, pkterf) |
| fdesc.close() |
| newpkterf = rderf(filename) |
| |
| assert pkterf[1][Ether].src == newpkterf[1][Ether].src |
| |
| assert len(pkterf) == len(newpkterf) |
| assert newpkterf[0].time is not None |
| assert newpkterf[0].wirelen is not None |
| assert newpkterf[0].time == pkterf[0].time |
| assert newpkterf[0].wirelen == pkterf[0].wirelen |
| assert newpkterf[1].time is not None |
| assert newpkterf[1].wirelen is not None |
| assert newpkterf[1].time == pkterf[1].time |
| assert newpkterf[1].wirelen == pkterf[1].wirelen |
| |
fdesc, filename = tempfile.mkstemp()
# mkstemp() returns an open OS-level descriptor: close it instead of
# discarding it, since wrerf() reopens the file by name.
os.close(fdesc)
# Appending the same packets twice must yield twice as many records
wrerf(filename, pkterf, append=True)
wrerf(filename, pkterf, append=True)
newdoublepkterf = rderf(filename)

assert len(newpkterf) * 2 == len(newdoublepkterf)
os.remove(filename)
| |
| = Check rderf |
| pkterf = rderf(erffilewithheader) |
| assert pkterf[1].time == 1603418463.270062279 |
| assert pkterf[1][Ether].src == "00:0f:53:3f:ca:c0" |
| |
| ############ |
| ############ |
| + Mocked read_routes() and read_routes6() calls |
| |
| = Create patcher util |
| ~ mock_read_routes_bsd little_endian_only |
| |
| # Mock the socket helpers and OS constants to get consistent results |
| from unittest import mock |
| |
| from scapy.pton_ntop import inet_pton |
| import scapy.arch.bpf.pfroute |
| |
| og_afinet6 = socket.AF_INET6 |
| og_inet_pton = socket.inet_pton |
def mock_inet_pton(af, data):
    """inet_pton() stand-in that maps the BSD AF_INET6 numbers to the host's."""
    # 24, 28 and 30 are the AF_INET6 values the BSD route tests inject
    family = og_afinet6 if af in [24, 28, 30] else af
    return og_inet_pton(family, data)
| |
| |
| og_inet_ntop = socket.inet_ntop |
def mock_inet_ntop(af, data):
    """inet_ntop() stand-in that maps the BSD AF_INET6 numbers to the host's."""
    # 24, 28 and 30 are the AF_INET6 values the BSD route tests inject
    family = og_afinet6 if af in [24, 28, 30] else af
    return og_inet_ntop(family, data)
| |
| |
class BSDLoader:
    """Context manager that reloads scapy.arch.bpf.pfroute under mocked OS constants and returns the reloaded module."""
    def __init__(self, OPENBSD=False, FREEBSD=False, NETBSD=False, DARWIN=False, sysctldata=None, ifaces=None, AF_INET6=socket.AF_INET6):
        # sysctldata: raw bytes handed to pfroute.pfmsghdrs() in place of the real sysctl answer
        self.sysctldata = sysctldata
        # ifaces: value returned by the mocked pfroute._get_if_list(); a fresh
        # dict per instance rather than a shared mutable default
        self.ifaces = {} if ifaces is None else ifaces
        # NOTE(review): set globally and never restored; presumably pfroute
        # needs socket.AF_LINK on hosts that lack it -- confirm.
        socket.AF_LINK = 18
        # Patches currently active, in activation order
        self._started = []
        # Post-load patches, built in __enter__ once the module is reloaded
        self.patches = []
        self.loadpatches = [
            mock.patch('socket.AF_INET6', AF_INET6),
            mock.patch('socket.inet_pton', side_effect=mock_inet_pton),
            mock.patch('socket.inet_ntop', side_effect=mock_inet_ntop),
            mock.patch('scapy.consts.OPENBSD', OPENBSD),
            # NOTE(review): FREEBSD is accepted but not patched -- confirm intended
            # mock.patch('scapy.consts.FREEBSD', FREEBSD),
            mock.patch('scapy.consts.NETBSD', NETBSD),
            mock.patch('scapy.consts.DARWIN', DARWIN),
        ]
    def __enter__(self):
        try:
            # Apply patches that only occur when loading
            for p in self.loadpatches:
                p.start()
                self._started.append(p)
            # Reload module
            pfroute = importlib.reload(scapy.arch.bpf.pfroute)
            # Now apply post-load patches
            self.patches = [
                mock.patch.object(
                    pfroute,
                    '_sr1_bsdsysctl',
                    return_value=pfroute.pfmsghdrs(self.sysctldata)
                ),
                mock.patch.object(
                    pfroute,
                    '_get_if_list',
                    return_value=self.ifaces,
                ),
            ]
            for p in self.patches:
                p.start()
                self._started.append(p)
        except BaseException:
            # __exit__ is not invoked when __enter__ raises: unwind here so
            # no patch stays active after a failed setup
            self.__exit__()
            raise
        return pfroute
    def __exit__(self, *args, **kwargs):
        # Stop only what was actually started, most recent first
        while self._started:
            self._started.pop().stop()
| |
| |
| = OpenBSD 7.5 amd64 - read_routes() |
| ~ mock_read_routes_bsd little_endian_only |
| |
| import zlib |
| |
| _PFROUTE_DATA = zlib.decompress(bytes.fromhex('789c7bc1c0ca92c0c0c8c0c0c0c160cec2c0e0ccc10006de0f1850003f0376c08a431c06049830f96bc40f30e2925710626460636163c828cb3360108d6548e5c4aabf0bc6576248c9482ec8494d2c4e4dc1e78e03607f323380fd09243d71f853109be6067c2623dcf5008d5fcfc080e2cf0f48f20a42cc0c1240e7e4e41be0340f593fbafbbd71b81f2b20d2fdf504dcff9f2aee6704bbdf151873d8dc8f961ce0ee67c4264ec0bd18ee0702cadc0fe2b280ddefc8d880d5fdb80031ee07a66b747e1732ffff7f440a22359f5c80bb9f1912fe2ccc58ddcf03a5cd675f494316c71a2f98f6c1bd09761f1002dd26f4b900bb7ad4f820d73fd0f4c4a280d53f5c04dc4dc03f70fb88711f25fe3980ee1f0607acfe61a7c83fe7ffa3f2d1d317f9ee1f05a360148c8251300a46c128180543030000bd836967')) |
| |
| |
| with BSDLoader(OPENBSD=True, sysctldata=_PFROUTE_DATA, AF_INET6=24) as pfroute: |
| routes = pfroute.read_routes() |
| |
| |
| assert routes == [ |
| (0, 0, '172.23.192.1', 'hvn0', '172.23.192.138', 1), |
| (3758096384, 4026531840, '127.0.0.1', 'lo0', '127.0.0.1', 1), |
| (2130706432, 4278190080, '127.0.0.1', 'lo0', '127.0.0.1', 1), |
| (2130706433, 4294967295, '127.0.0.1', 'lo0', '127.0.0.1', 1), |
| (2887237632, 4294963200, '172.23.192.138', 'hvn0', '172.23.192.138', 1), |
| (2887237633, 4294967295, '0.0.0.0', 'hvn0', '172.23.192.138', 1), |
| (2887237770, 4294967295, '0.0.0.0', 'hvn0', '172.23.192.138', 1), |
| (2887241727, 4294967295, '172.23.192.138', 'hvn0', '172.23.192.138', 1) |
| ] |
| |
| |
| = OpenBSD 7.5 amd64 - read_routes6() |
| ~ mock_read_routes_bsd little_endian_only |
| |
| import zlib |
| |
| _PFROUTE_DATA = zlib.decompress(bytes.fromhex('789ced96bb0dc2301086cf38481125a248411131011d3505a2600906406204325a4661040a6a4264f130d6c5b19c87e2f07f458adcd9be2f7fe190984647924414d3a67c1e6252dcf7544f56dfb24cbceac2ac171a7a633a979494e39fce6baffde9e32f94ff8e56eab5e9bfe076c98866ecf6eee7fbf8ebdfa03dffbef2ffcd6f38f977eb9f4eecf5aaf9347fb6311cff8bd77ce3f1bf7acdf7f5bfb18de1f8f3f9fd4bfe8f8a5e67ff9c5f1f8c7f6eaf57cdd79ffffbfe4fd5eb0ef297dcf9aef5a6f77fd5fe7de55f087bdd80f1e7d7b7977fa4fcb7af7bdaf467c7cff899b8f34b7f69abbbe4cfad0f26ffc6ff3ffcfa60f29f0c347f00000000000000000000306a9e0a72ae83')) |
| |
| |
| with BSDLoader(OPENBSD=True, sysctldata=_PFROUTE_DATA, AF_INET6=24) as pfroute: |
| routes = pfroute.read_routes6() |
| |
| |
| assert routes == [ |
| ('::', 96, '::1', 'lo0', ['::1'], 1), |
| ('::1', 128, '::1', 'lo0', ['::1'], 1), |
| ('::ffff:0.0.0.0', 96, '::1', 'lo0', ['::1'], 1), |
| ('2002::', 24, '::1', 'lo0', ['::1'], 1), |
| ('2002:7f00::', 24, '::1', 'lo0', ['::1'], 1), |
| ('2002:e000::', 20, '::1', 'lo0', ['::1'], 1), |
| ('2002:ff00::', 24, '::1', 'lo0', ['::1'], 1), |
| ('fe80::', 10, '::1', 'lo0', ['::1'], 1), |
| ('fec0::', 10, '::1', 'lo0', ['::1'], 1), |
| ('fe80:3::1', 128, 'fe80:3::1', 'lo0', ['fe80:3::1'], 1), |
| ('ff01::', 16, '::1', 'lo0', ['::1'], 1), |
| ('ff01:3::', 32, 'fe80:3::1', 'lo0', ['fe80:3::1'], 1), |
| ('ff02::', 16, '::1', 'lo0', ['::1'], 1), |
| ('ff02:3::', 32, 'fe80:3::1', 'lo0', ['fe80:3::1'], 1) |
| ] |
| |
| = FreeBSD 14.1 amd64 - read_routes() |
| ~ mock_read_routes_bsd little_endian_only |
| |
| import zlib |
| |
| from scapy.arch.bpf.pfroute import _bsd_iff_flags |
| _FREEBSD_IFACES = {1: {'name': 'lo0', 'index': 1, 'flags': FlagValue(32841, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 28, 'index': 1, 'address': '::1', 'scope': 16}, {'af_family': 28, 'index': 1, 'address': 'fe80::1', 'scope': 32}, {'af_family': 2, 'index': 1, 'address': '127.0.0.1'}]}, 2: {'name': 'hn0', 'index': 2, 'flags': FlagValue(34883, _bsd_iff_flags), 'mac': '00:15:5d:00:65:07', 'ips': [{'af_family': 28, 'index': 2, 'address': 'fe80::215:5dff:fe00:6507', 'scope': 32}, {'af_family': 2, 'index': 2, 'address': '172.23.198.182'}]}} |
| |
| _PFROUTE_DATA = zlib.decompress(bytes.fromhex('789c136064656162606060e660603067200ceeb012a18808c008a55970c80b3061f2d7881f60c4256f21c4c4c0c6ccc6909167c0201acb90ca4ea43b20e61edb8670182b0bc8125606010663620c7020d2220280118d46072077d623490b0831324820c95b80f8cc0c0c39f90624d98b612e343d3002fd3f10e98109873c34fe117c507ca3c9ffffff01cea77a7ae01898f4c08c431edd9de8e141adf40000e8611aa8')) |
| |
| |
| with BSDLoader(FREEBSD=True, sysctldata=_PFROUTE_DATA, ifaces=_FREEBSD_IFACES, AF_INET6=28) as pfroute: |
| routes = pfroute.read_routes() |
| |
| assert routes == [ |
| (0, 0, '172.23.192.1', 'hn0', '172.23.198.182', 1), |
| (2130706433, 4294967295, '0.0.0.0', 'lo0', '127.0.0.1', 1), |
| (2887237632, 4294963200, '0.0.0.0', 'hn0', '172.23.198.182', 1), |
| (2887239350, 4294967295, '0.0.0.0', 'lo0', '127.0.0.1', 1), |
| (3758096384, 4026531840, '0.0.0.0', 'lo0', '127.0.0.1', 250), |
| (3758096384, 4026531840, '0.0.0.0', 'hn0', '172.23.198.182', 250) |
| ] |
| |
| = FreeBSD 14.1 amd64 - read_routes6() |
| ~ mock_read_routes_bsd little_endian_only |
| |
| _PFROUTE_DATA = zlib.decompress(bytes.fromhex('789ce5553b0e8240109d593e62b72121b1a0e00824165a72042e40696261f40a1ecd837816101236c28461872801e36bb678f3797979bb9ba3e722006c0380030890498aecc0f6f4193e8ec7fb191e295f75d02d3c86083b07e0724b457aa57b93d64f2fd0b0970ccc26ad6781e4a4b0e9d68d1f1d622e7ff29fc95b3f2f6bcddbdafd2cefe33c33f6ede763b87f2e3fb3d64f04bd889f0ec3737e9a3e7a7f691ee9bc4ffd233ad0e858fafd530c6fd3fdedf78fdbd3e44b813c5f4f6fd27a16663f378ecb97f15387aa77d7edf9aaeb1d1fced714a2024e1ba14eaa43454555d6ed46c7d2f97219dea69bfaf7afff41c55c50f9ff3adc3f979f2f44725d78')) |
| |
| |
| with BSDLoader(FREEBSD=True, sysctldata=_PFROUTE_DATA, ifaces=_FREEBSD_IFACES, AF_INET6=28) as pfroute: |
| routes = pfroute.read_routes6() |
| |
| assert routes == [ |
| ('::', 96, '::1', 'lo0', ['::1'], 1), |
| ('::1', 128, '::', 'lo0', ['::1'], 1), |
| ('::ffff:0.0.0.0', 96, '::1', 'lo0', ['::1'], 1), |
| ('fe80::', 10, '::1', 'lo0', ['::1'], 1), |
| ('fe80::', 64, '::', 'lo0', ['fe80::1'], 1), |
| ('fe80::1', 128, '::', 'lo0', ['fe80::1'], 1), |
| ('fe80::', 64, '::', 'hn0', ['fe80::215:5dff:fe00:6507'], 1), |
| ('fe80::215:5dff:fe00:6507', 128, '::', 'lo0', ['::1'], 1), |
| ('ff02::', 16, '::1', 'lo0', ['::1'], 1), |
| ('ff00::', 8, '::', 'lo0', ['::1', 'fe80::1'], 250), |
| ('ff00::', 8, '::', 'hn0', ['fe80::215:5dff:fe00:6507'], 250) |
| ] |
| |
| = NetBSD 10.0 amd64 - read_routes() |
| ~ mock_read_routes_bsd little_endian_only |
| |
| import zlib |
| |
| from scapy.arch.bpf.pfroute import _bsd_iff_flags |
| _NETBSD_IFACES = {1: {'name': 'hvn0', 'index': 1, 'flags': FlagValue(34883, _bsd_iff_flags), 'mac': '00:15:5d:00:65:0a', 'ips': [{'af_family': 24, 'index': 1, 'address': 'fe80:1::7184:2b50:9fbe:e337', 'scope': 32}, {'af_family': 2, 'index': 1, 'address': '172.23.207.191'}]}, 2: {'name': 'lo0', 'index': 2, 'flags': FlagValue(32841, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 2, 'index': 2, 'address': '127.0.0.1'}, {'af_family': 24, 'index': 2, 'address': '::1', 'scope': 16}, {'af_family': 24, 'index': 2, 'address': 'fe80:2::1', 'scope': 32}]}} |
| |
| _PFROUTE_DATA = zlib.decompress(bytes.fromhex('789c3bc1c0c2c2c8c0c0c00cc4e60ca8c08a91816640800993bf46fc00868d42428c0c6c2c6c0c196579060ca2b10ca95cc8eacfef87a93b00f407c8486e0e4c7f600311cd94b81ed5ddf5987cb83f58ff0301c85d424c0c12c040cec937c0aa6e07d4fdac0c2c0cc6f4773fdc1de8ee24e4ee0bd0f4c3c88819ee2c2cd471232e7703d30b9c0f4e2758d4b183c2ffff0792d311b1f1402d80ee0e5cfec1161fc8fa6640e383550592a769058cef5d4943e6a3e75f3eb0fb813e108d15fa5c404387e000001e173214')) |
| |
| |
| with BSDLoader(NETBSD=True, sysctldata=_PFROUTE_DATA, ifaces=_NETBSD_IFACES, AF_INET6=24) as pfroute: |
| routes = pfroute.read_routes() |
| |
| assert routes == [ |
| (0, 0, '172.23.192.1', 'hvn0', '172.23.207.191', 1), |
| (2130706432, 4294967040, '127.0.0.1', 'lo0', '127.0.0.1', 1), |
| (2130706433, 4294967295, '0.0.0.0', 'lo0', '127.0.0.1', 1), |
| (2887237632, 4294967295, '0.0.0.0', 'hvn0', '172.23.207.191', 1), |
| (2887241663, 4294967295, '0.0.0.0', 'lo0', '172.23.207.191', 1), |
| (2887237633, 4294967295, '0.0.0.0', 'hvn0', '', 1), |
| (3758096384, 4026531840, '0.0.0.0', 'hvn0', '172.23.207.191', 250), |
| (3758096384, 4026531840, '0.0.0.0', 'lo0', '127.0.0.1', 250) |
| ] |
| |
| = NetBSD 10.0 amd64 - read_routes6() |
| ~ mock_read_routes_bsd little_endian_only |
| |
| _PFROUTE_DATA = zlib.decompress(bytes.fromhex('789ced97b14ec3301445af4da8aa964a5544a50e1dd8592a31f01b8c2c1d2b31205017d65682ffe86f30213e8331123fd09109d3368a8127fbd9898c2c87de29ce4dac77749f1d0722cb24807e17b8845bd78f1e0f7968326ee48bea62a40cdadeefe712e323e0f67eea350f12e53f35e3d7e67f43c97f8c0c171e75ff31bfae8b72a49cebd2e1a3e57d5d387c38f837489b5f397cb43a7fa5787fafe0fbda07e2f29f89c133e713e9ba4f52e796bc4ff4bddfffc64e907bc9fa442de22e589fc8c0bd29c7c9712bd6276a4dde9f2bde27d275f72aead772dc847b3710c28f3b947e700b939fe7021dc3fd21f986ed9fcb3ab879b89b6234c3bc679e7ff1747eb57e79d78845cdf37928b9eab271db72b5cd53f5f3ce8c94abf18b65f1750fd07c196ee3fb75ffbb42c95597ef7f97edfdd8eb54897aeb949eb79aae53ddc79edca1f7e52d37dbc74441cf9b51f396ff346f1927ef830efa028ffb7c47')) |
| |
| |
| with BSDLoader(NETBSD=True, sysctldata=_PFROUTE_DATA, ifaces=_NETBSD_IFACES, AF_INET6=24) as pfroute: |
| routes = pfroute.read_routes6() |
| |
| assert routes == [ |
| ('::', 104, '::1', 'lo0', ['::1'], 1), |
| ('::', 96, '::1', 'lo0', ['::1'], 1), |
| ('::1', 128, '::', 'lo0', ['::1'], 1), |
| ('::127.0.0.0', 104, '::1', 'lo0', ['::1'], 1), |
| ('::224.0.0.0', 100, '::1', 'lo0', ['::1'], 1), |
| ('::255.0.0.0', 104, '::1', 'lo0', ['::1'], 1), |
| ('::ffff:0.0.0.0', 96, '::1', 'lo0', ['::1'], 1), |
| ('2001:db8::', 32, '::1', 'lo0', ['::1'], 1), |
| ('2002::', 24, '::1', 'lo0', ['::1'], 1), |
| ('2002:7f00::', 24, '::1', 'lo0', ['::1'], 1), |
| ('2002:e000::', 20, '::1', 'lo0', ['::1'], 1), |
| ('2002:ff00::', 24, '::1', 'lo0', ['::1'], 1), |
| ('fe80::', 10, '::1', 'lo0', ['::1'], 1), |
| ('fe80:1::', 64, '::', 'hvn0', ['fe80:1::7184:2b50:9fbe:e337'], 1), |
| ('fe80:1::7184:2b50:9fbe:e337', |
| 128, |
| '::', |
| 'lo0', |
| ['fe80:1::7184:2b50:9fbe:e337'], |
| 1), |
| ('fe80:2::', 64, 'fe80:2::1', 'lo0', ['fe80:2::1'], 1), |
| ('fe80:2::1', 128, '::', 'lo0', ['fe80:2::1'], 1), |
| ('ff01:1::', 32, '::', 'hvn0', ['fe80:1::7184:2b50:9fbe:e337'], 1), |
| ('ff01:2::', 32, '::1', 'lo0', ['::1'], 1), |
| ('ff02:1::', 32, '::', 'hvn0', ['fe80:1::7184:2b50:9fbe:e337'], 1), |
| ('ff02:2::', 32, '::1', 'lo0', ['::1'], 1), |
| ('ff00::', 8, '::', 'hvn0', ['fe80:1::7184:2b50:9fbe:e337'], 250), |
| ('ff00::', 8, '::', 'lo0', ['::1', 'fe80:2::1'], 250) |
| ] |
| |
| = Darwin 23.6 (MacOS 14.5) x86_64 - read_routes() |
| ~ mock_read_routes_bsd little_endian_only |
| |
| import zlib |
| |
| from scapy.arch.bpf.pfroute import _bsd_iff_flags |
| _DARWIN_IFACES = {1: {'name': 'lo0', 'index': 1, 'flags': FlagValue(32841, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 2, 'index': 1, 'address': '127.0.0.1'}, {'af_family': 30, 'index': 1, 'address': '::1', 'scope': 16}, {'af_family': 30, 'index': 1, 'address': 'fe80:1::1', 'scope': 32}]}, 2: {'name': 'gif0', 'index': 2, 'flags': FlagValue(32784, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': []}, 3: {'name': 'stf0', 'index': 3, 'flags': FlagValue(0, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': []}, 4: {'name': 'XHC2', 'index': 4, 'flags': FlagValue(0, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': []}, 5: {'name': 'en0', 'index': 5, 'flags': FlagValue(34915, _bsd_iff_flags), 'mac': '52:54:00:09:49:17', 'ips': [{'af_family': 30, 'index': 5, 'address': 'fe80:5::409:eec9:f06c:50ab', 'scope': 32}, {'af_family': 30, 'index': 5, 'address': 'fec0::89e:daf7:5cb1:f1f0', 'scope': 64}, {'af_family': 30, 'index': 5, 'address': 'fec0::c0c4:1f0b:61ba:ea8', 'scope': 64}, {'af_family': 2, 'index': 5, 'address': '10.0.2.15'}]}, 6: {'name': 'utun0', 'index': 6, 'flags': FlagValue(32849, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 30, 'index': 6, 'address': 'fe80:6::d36e:82de:94dc:84fc', 'scope': 32}]}, 7: {'name': 'utun1', 'index': 7, 'flags': FlagValue(32849, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 30, 'index': 7, 'address': 'fe80:7::7ce2:1f7b:2c29:a5ee', 'scope': 32}]}, 8: {'name': 'utun2', 'index': 8, 'flags': FlagValue(32849, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 30, 'index': 8, 'address': 'fe80:8::e4e0:bef:bf56:2605', 'scope': 32}]}, 9: {'name': 'utun3', 'index': 9, 'flags': FlagValue(32849, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 30, 'index': 9, 'address': 'fe80:9::ce81:b1c:bd2c:69e', 'scope': 32}]}} |
| |
| _PFROUTE_DATA = zlib.decompress(bytes.fromhex('789ccdd94d6813411400e0d9ddcc36e6608d4472a950a8e0d183e8a1a7da5a503008012948b11eaa78f0e6498b180a45c1802d08164f7ab3f4208817b1e84a0ff647ea0fd2832815a5e021018f518931dbce6c5e76df6cb2e36c3a0325bce936fbf5bdd96176669ad00c2584584963e060fd7382e0ed3315fc22a4ed3183718a98260df675f3b8c83c5dc41ceeab7f1acca6ca6366922f451ebf6596598c5d84b8b9f1fd3b01cb8bc58f17a35852e01b337b29b17dd774d5b65a4b97a1dee5c1305772db55f3bba6998b26cc7d6eedf2cc2872ed5ffe5f974df2671abd211eea7a4ce0d9403c59816703e963f7b2508f857b3a5037ef5e72751b34fa581fcf9305fe9ebb4a029785f4b17bd59a5d3661431bb5fbe700b76e2ae780eef2d4619f4f3807c43d1fa5b3e753da587ad7e7b4b11c583aad8d65fe50561bcbc29de7fa589e7c93b5a87ea6d30bcf6eca5a12c082cd77a2269aefd295d280ac45798d2aa541598b052c70edd3ca82ad93986548d612435e8ecb5a605e145986652deaf3f23ba19185c2b83d8b7d8caf61c2c6eedbf7f81a463876ab3d7fa25b62ca4bf5c81b8d2c6b1a59dee96339b9aa8f25b72c6b513ed755732bd12dc1671abe3b71cb9ae099c6deb3b62d97af47b7c455a3196dde03b2fd4e8f4696c72a2cd8781135d178a95bd655586093cfcbab823616e7fb2f590b7c0f505223a72cfd1e10836556d6a2be46e5872a2c2af27234f76053850536d9bc8c54c61fe96219bdf6e9be2e96b1f1a913ba582e5d397bbb5dcbddbac5bd3fdf631536a873d84f1b961bc1d81be6946d69fafb8bcc44492fe1f38cc7680ac24d4dd70a0cade2b035d529f0bdbc56b73ee06b2af7daa7be7abaf72ae6af5a30dec93da17b17266c184739eb1135d9bdf9b9bf8d18db9bb7d97e78a773e46c7e1983f14e3ee7af7fcc27dbb534ea5588e52ce52b88b17ab9cffa4fc4c573441393e02ca520744569cce5ed43ec6667290639b7d5dbe931dd38c18976def40fb15043e2')) |
| |
| |
| with BSDLoader(DARWIN=True, sysctldata=_PFROUTE_DATA, ifaces=_DARWIN_IFACES, AF_INET6=30) as pfroute: |
| routes = pfroute.read_routes() |
| |
| assert routes == [ |
| (0, 0, '10.0.2.2', 'en0', '10.0.2.15', 1), |
| (167772672, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1), |
| (167772674, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1), |
| (167772674, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1), |
| (167772675, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1), |
| (167772687, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1), |
| (167772927, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1), |
| (2130706432, 4294967040, '127.0.0.1', 'lo0', '127.0.0.1', 1), |
| (2130706433, 4294967295, '127.0.0.1', 'lo0', '127.0.0.1', 1), |
| (2851995648, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1), |
| (3758096384, 4043308800, '0.0.0.0', 'en0', '10.0.2.15', 1), |
| (3758096635, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1), |
| (4294967295, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1) |
| ] |
| |
| = Darwin 23.6 (MacOS 14.5) x86_64 - read_routes6() |
| ~ mock_read_routes_bsd little_endian_only |
| |
| _PFROUTE_DATA = zlib.decompress(bytes.fromhex('789cd5dd5f8c13451800f0d96dbbdb5e8fbbdaebfd114eaf08148fdc830124045f0a1581981062f48c8698c32022313c1925c2c381f8a0a28290887f728644e0d0aa60ce80e60897887fe08120f060cc051030218af74713088a60b73b5bda6e77f73a33fbcd7dfbc27667d2fefaddb7dfcc6cb7a58f84122142488028e9e9b97f8f90cadb60c8a1c1656bbddbbbed6637297e6635e4d01e8c0c1d1b797ed9a7c67e5fceac99e6f9d35d5edf47b3567c5c73683fbd76d3d91d839b6f58667d0ce695fe99f5e2e3ba43fb860b6deb3bda770f59e6f018cc277597463e73b8f878d8a1fdd2f9e8f091ce54c83247c660be1cf0cd1c293e1e71683fb131da7ab843eb31f6f7e7cc4aeedf503049a6b801d2c2cc0a4fdb7e5a3374a2cdb7bc46fd28ef6c9d7f43a7ceacaad69b5416772d56c94c7a784e715ba59ae1562faaf5fec9ed55d6417a39e23b9b1ec6125fea858d2f87770e32ef5c19de1106efd4e06a920e8669894f0620bd2cf14d91ad64f2dbf308894df8f9a9f4f65d38bcab9079d722f36e42e67d1f99770099f72832ef6554ded4d3130999747c70188db70399772632ef1a64deadc8bcfdc8bcc79179cf48f18eb278833db96585d26e8a6a4aae31f8edbdc4e2d5ee25eaac7f546dfd475757e8e159905e96f57c4a5b5438b63a9eb880cbdb08eabdc2ea8d516f22f0072e6f10d4cb9c0f96b729b810973704ea1de6f64eedc2e59d06ea651a8f4bbcb33b7179ef17e455171a5ec5c35bcd56d93bef075cde07047961c68b6ca6a1458c1726bed94ce345315e98f1229b69fe1dd2cb5b1fb299d41a482fef7891cdcc3826c6eb73fe26cdfdd513b5cd62bcfe7dde52ead541bdccf95bf086f7e1f24640bdcce75bc15bb71797b71ed4cb7bbe65338bbe87f4f2c6379b79380de9e53ddf72de51482ff3fc61b2b9ffe633eb6a9079a390deeb2c5efdef652430ed3962ce226a21bd4ce75ba977022e6f12597c93a0f13df5138337a92c21d763f4110922f14e43e64d21f35ab7c0a2f0aae180e5ba0b99f76e245eeb56752cf1b5ee2c9f24c66baee7831ede6ab6626fcd68e1700c85f7aa68af8f9f1f27952bd17385c30b20bdd718bc334862595a0fd36748aa905e96f90ef5d2af582441c70b96f531f5d6522fe8fd041cf1ad93e165591f9779c3905e8ef8d22bc0b0e71b4b3da3de06195e8ef85a5e0dd2cb91bf2dd45b87c43b917ab1d433cb0b9abf2cd7a3cabca0d7cf583e8f2df382e6ef632c5ee560ba462d4c8041f3e1257e2fe87c6711bf17743c6e64f56a9657d929c6ebdfe7b1796fb8105fd0f1ed04c3fa38ef6db29e5201bd7f5280f72f48efbfacf9db34629140c76301de00a49769be53eabd538
cd7e7fad06279c9f85fbf957a41f3e11cb357cefc813dbe72e60fccf537791f3d2aaafefa7cbea5909d6fb7bd3aa497793e999233dfb9c9ea9d5fd76d8a60f3418017b49eb5f27b41c70b01f105adbf02bc2164def17fbd2fe76de832eb426e038daf002fb6f8828e6f02bca0d74b047823c8bca0d753057841ef4714e005bd5f4e8017f47e39015ed0ebebcff27beb21bd02e20b7a7f8900ef1d905ea6fb4bd4d45c73964e9f1ed0cbf47961a917b49e317d7ea1a646b46e53bcfce53def427a99ee8729f57e8bccfb1ba497e97e8d9c771bad0b5db1eba0eb0b015ed0f585002fe8fa428017747d21c00bbabe6862f72e34f69b17c37ae3fcde6648af80f80ababf1a2cbe6d90de667e6f1299773232ef3dc8bc539079a7427a1f62f686bb2909f4f7f038bcafcbf03eceed6d7e02d2cb11df2d9484251fb6c9f072e4c376f3119a7cf89a9240e39b64f6d6bf623e8a7f07e97d92dd4bebc383a0df2fe4f06e91e1e5c887f7cc47b0f9c03e5f6fd8489f612912ef6b32bcacd727735e29f3878becde3764787f65f7be23c3cb910fdb917977caf0b2cf771a3e301fc1ce7738ceb70f2909cbf9d623c3cb91bfbb647839f2618f0c2fc7f946bdb0e71b47feeea5242cf9db2bc3cb11df7d32bc1cf3c94fe83380ce27c9695e6f3be8fd041cde2c32ef67c8bc9fcbf0728c6f5f5012687de0882ff5a2c9870332bc1ce3c5979484251ffacc4768f2e12b64de8332bc1cf5ec102561c95fea858def7fccf16da4d74b26fc88c4db23c3cb743fad9aba463e26247b7e45a8bc6d9c7bf5f2b671eead41e6ad45e6ad47e68d61f12a4d86f72d34f940bd75d0de192cdee0d205e6afbc1a9bfe22a497e9f7b90cef1aeb68f055486f1bb7577f81c73b90f31a5f3c5188a24c27a4f02514db667b0763f7e65ed7f6b40e6df9fdd8add2cdad6f2ff5878249650a8c3fbf9f882ba4a58afe87685e28b9401b71561d5e93e7772bcafef6c47486cc2ff8166d2ef1b5e5472f7587826aa311dfe3f43df8e8566fbb35f248272904cbcb599c078e5b9adf59fcba05e7a324b2a4d9bbbf71be197f0feb7cf3290fcaffe4b6b6d36b379ddd31b8f986b1ef920fb6be4071b6bd6e22aed992ceadbf11676332ed15e7957c71d6bdda365c685bdfd1be7bc8d87789b3ad2f509c6daf9b88eb6e71b6f537e26c7c01c52bce276d91aaca19f66abb743e3a7ca43395ff6bbac4d9d61728ceb6d74dc4c36e71b6f537e26c7c11c52bce97035cce8857db898dd1d6c31d5afe5a804b9c6d7d81e26c7bdd443ce216675bffa2719af8569f07ec6d02c7e9427c15fac68bdf8397bbd2fb7570f38ed3c4b73ca0ce70cf2fd7961f181d2971561aa72bf487740e1c6d8baef8a6ae77accee2fefdd6fc5de956acff7045b4f
3964b5bd996cfb88895b01efdfa0ae79abb9de75cab64af74ae553257cadf7e6bfe066c769beb38d86dfdfaad3991879d674ee461b7cd1f1cecb67efdd63cc3c3ce33cff0b0dbc66407bbad5fbf35767bd879c66e0fbb6d9c73b0dbfa81d4970aeb49b7ba515b614cacd40fa4be28635b7357324bab2f4a75eb4307bb9cfaa254b7e672b0cba92f4a75eb1807bb9cfaa254b73670b0cba92f2ae2faa222ac2f2ae2faa222ae2f2ae2faa2fa535ffe079dfe8806')) |
| |
| |
| with BSDLoader(DARWIN=True, sysctldata=_PFROUTE_DATA, ifaces=_DARWIN_IFACES, AF_INET6=30) as pfroute: |
| routes = pfroute.read_routes6() |
| |
| assert routes == [ |
| ('::', 0, 'fe80:5::2', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1), |
| ('::', 0, 'fe80:6::', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1), |
| ('::', 0, 'fe80:7::', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1), |
| ('::', 0, 'fe80:8::', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1), |
| ('::', 0, 'fe80:9::', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1), |
| ('::1', 128, '::1', 'lo0', ['::1'], 1), |
| ('fe80:1::', 64, 'fe80:1::1', 'lo0', ['fe80:1::1'], 1), |
| ('fe80:1::1', 128, '::', 'lo0', ['fe80:1::1'], 1), |
| ('fe80:5::', 64, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1), |
| ('fe80:5::2', 128, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1), |
| ('fe80:5::409:eec9:f06c:50ab', 128, '::', 'lo0', ['fe80:5::409:eec9:f06c:50ab'], 1), |
| ('fe80:6::', 64, 'fe80:6::d36e:82de:94dc:84fc', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1), |
| ('fe80:6::d36e:82de:94dc:84fc', 128, '::', 'lo0', ['fe80:6::d36e:82de:94dc:84fc'], 1), |
| ('fe80:7::', 64, 'fe80:7::7ce2:1f7b:2c29:a5ee', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1), |
| ('fe80:7::7ce2:1f7b:2c29:a5ee', 128, '::', 'lo0', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1), |
| ('fe80:8::', 64, 'fe80:8::e4e0:bef:bf56:2605', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1), |
| ('fe80:8::e4e0:bef:bf56:2605', 128, '::', 'lo0', ['fe80:8::e4e0:bef:bf56:2605'], 1), |
| ('fe80:9::', 64, 'fe80:9::ce81:b1c:bd2c:69e', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1), |
| ('fe80:9::ce81:b1c:bd2c:69e', 128, '::', 'lo0', ['fe80:9::ce81:b1c:bd2c:69e'], 1), |
| ('fec0::', 64, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1), |
| ('fec0::2', 128, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1), |
| ('fec0::89e:daf7:5cb1:f1f0', 128, '::', 'lo0', ['fec0::89e:daf7:5cb1:f1f0'], 1), |
| ('fec0::c0c4:1f0b:61ba:ea8', 128, '::', 'lo0', ['fec0::c0c4:1f0b:61ba:ea8'], 1), |
| ('ff00::', 8, '::1', 'lo0', ['::1'], 1), |
| ('ff00::', 8, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1), |
| ('ff00::', 8, 'fe80:6::d36e:82de:94dc:84fc', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1), |
| ('ff00::', 8, 'fe80:7::7ce2:1f7b:2c29:a5ee', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1), |
| ('ff00::', 8, 'fe80:8::e4e0:bef:bf56:2605', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1), |
| ('ff00::', 8, 'fe80:9::ce81:b1c:bd2c:69e', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1), |
| ('ff01:1::', 32, '::1', 'lo0', ['::1'], 1), |
| ('ff01:5::', 32, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1), |
| ('ff01:6::', 32, 'fe80:6::d36e:82de:94dc:84fc', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1), |
| ('ff01:7::', 32, 'fe80:7::7ce2:1f7b:2c29:a5ee', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1), |
| ('ff01:8::', 32, 'fe80:8::e4e0:bef:bf56:2605', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1), |
| ('ff01:9::', 32, 'fe80:9::ce81:b1c:bd2c:69e', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1), |
| ('ff02:1::', 32, '::1', 'lo0', ['::1'], 1), |
| ('ff02:5::', 32, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1), |
| ('ff02:6::', 32, 'fe80:6::d36e:82de:94dc:84fc', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1), |
| ('ff02:7::', 32, 'fe80:7::7ce2:1f7b:2c29:a5ee', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1), |
| ('ff02:8::', 32, 'fe80:8::e4e0:bef:bf56:2605', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1), |
| ('ff02:9::', 32, 'fe80:9::ce81:b1c:bd2c:69e', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1) |
| ] |
| |
| ############ |
| ############ |
| + Mocked _parse_tcpreplay_result(stdout, stderr, argv, results_dict) |
| ~ mock_parse_tcpreplay_result |
| |
| = Test mocked _parse_tcpreplay_result |
| |
| from scapy.sendrecv import _parse_tcpreplay_result |
| |
| stdout = """Actual: 1024 packets (198929 bytes) sent in 67.88 seconds. |
| Rated: 2930.6 bps, 0.02 Mbps, 15.09 pps |
| Statistics for network device: mon0 |
| Attempted packets: 1024 |
| Successful packets: 1024 |
| Failed packets: 0 |
| Retried packets (ENOBUFS): 0 |
| Retried packets (EAGAIN): 0""" |
| |
| stderr = """Warning in sendpacket.c:sendpacket_open_pf() line 669: |
| Unsupported physical layer type 0x0323 on mon0. Maybe it works, maybe it won't. See tickets #123/318 |
| sending out mon0 |
| processing file: replay-example.pcap""" |
| |
| argv = ['tcpreplay', '--intf1=mon0', '--multiplier=1.00', '--timer=nano', 'replay-example.pcap'] |
| results_dict = _parse_tcpreplay_result(stdout, stderr, argv) |
| |
| results_dict |
| |
| assert results_dict["packets"] == 1024 |
| assert results_dict["bytes"] == 198929 |
| assert results_dict["time"] == 67.88 |
| assert results_dict["bps"] == 2930.6 |
| assert results_dict["mbps"] == 0.02 |
| assert results_dict["pps"] == 15.09 |
| assert results_dict["attempted"] == 1024 |
| assert results_dict["successful"] == 1024 |
| assert results_dict["failed"] == 0 |
| assert results_dict["retried_enobufs"] == 0 |
| assert results_dict["retried_eagain"] == 0 |
| assert results_dict["command"] == " ".join(argv) |
| assert len(results_dict["warnings"]) == 3 |
| |
| = Test more recent version with flows |
| |
| data = """Actual: 1 packets (42 bytes) sent in 0.000278 seconds |
| Rated: 151079.1 Bps, 1.20 Mbps, 3597.12 pps |
| Flows: 1 flows, 3597.12 fps, 1 flow packets, 0 non-flow |
| Statistics for network device: enp0s3 |
| Successful packets: 1 |
| Failed packets: 0 |
| Truncated packets: 0 |
| Retried packets (ENOBUFS): 0 |
| Retried packets (EAGAIN): 0 |
| """ |
| |
| results_dict = _parse_tcpreplay_result(data, "", []) |
| results_dict |
| |
| expected = { |
| 'bps': 151079.1, |
| 'bytes': 42, |
| 'command': '', |
| 'failed': 0, |
| 'flow_packets': 1, |
| 'flows': 1, |
| 'fps': 3597.12, |
| 'mbps': 1.2, |
| 'non_flow': 0, |
| 'packets': 1, |
| 'pps': 3597.12, |
| 'retried_eagain': 0, |
| 'retried_enobufs': 0, |
| 'successful': 1, |
| 'time': 0.000278, |
| 'truncated': 0, |
| 'warnings': [] |
| } |
| |
| assert results_dict == expected |
| |
| ############ |
| ############ |
| + Mocked route() calls |
| |
| = Mocked IPv4 routes calls |
| |
| import scapy |
| |
| # Mocked IPv4 routing test: swap in a hand-written routing table and check |
| # what conf.route.route() answers for a few destinations. |
| # The interface names used by the mocked routes are declared first |
| # (presumably so that lookups on them succeed -- TODO confirm). |
| conf.ifaces._add_fake_iface("enp3s0") |
| conf.ifaces._add_fake_iface("lo") |
| |
| # Remember the global settings overridden below so they can be put back. |
| old_iface = conf.iface |
| old_loopback = conf.loopback_name |
| try: |
| conf.iface = 'enp3s0' |
| conf.loopback_name = 'lo' |
| # Drop any cached lookups before replacing the table. |
| conf.route.invalidate_cache() |
| conf.route.routes = [ |
| (4294967295, 4294967295, '0.0.0.0', 'wlan0', '', 281), |
| (4294967295, 4294967295, '0.0.0.0', 'lo', '', 291), |
| (4294967295, 4294967295, '0.0.0.0', 'enp3s0', '192.168.0.119', 281), |
| (3758096384, 4026531840, '0.0.0.0', 'lo', '', 291), |
| (3758096384, 4026531840, '0.0.0.0', 'wlan0', '', 281), |
| (3758096384, 4026531840, '0.0.0.0', 'enp3s0', '1.1.1.1', 281), |
| (3232235775, 4294967295, '0.0.0.0', 'enp3s0', '2.2.2.2', 281), |
| (3232235639, 4294967295, '0.0.0.0', 'enp3s0', '3.3.3.3', 281), |
| (3232235520, 4294967040, '0.0.0.0', 'enp3s0', '4.4.4.4', 281), |
| (0, 0, '192.168.0.254', 'enp3s0', '192.168.0.119', 25) |
| ] |
| # Expected answers read as (interface, source address, gateway) -- layout |
| # inferred from the tuples below; TODO confirm against Route.route(). |
| assert conf.route.route("192.168.0.0-10") == ('enp3s0', '4.4.4.4', '0.0.0.0') |
| # The address configured on enp3s0 itself is expected to resolve to 'lo'. |
| assert conf.route.route("192.168.0.119") == ('lo', '192.168.0.119', '0.0.0.0') |
| assert conf.route.route("224.0.0.0") == ('enp3s0', '1.1.1.1', '0.0.0.0') |
| assert conf.route.route("255.255.255.255") == ('enp3s0', '192.168.0.119', '0.0.0.0') |
| # "*" is expected to fall through to the default (0, 0) route. |
| assert conf.route.route("*") == ('enp3s0', '192.168.0.119', '192.168.0.254') |
| finally: |
| # Always restore the saved settings, then call resync()/reload() |
| # (presumably re-reading the real routes and interfaces -- TODO confirm). |
| conf.loopback_name = old_loopback |
| conf.iface = old_iface |
| conf.route.resync() |
| conf.ifaces.reload() |
| |
| |
| = Mocked IPv6 routes calls |
| |
| conf.ifaces._add_fake_iface("enp3s0") |
| conf.ifaces._add_fake_iface("lo") |
| |
| old_iface = conf.iface |
| old_loopback = conf.loopback_name |
| try: |
| conf.route6.ipv6_ifaces = set(['enp3s0', 'wlan0', 'lo']) |
| conf.iface = 'enp3s0' |
| conf.loopback_name = 'lo' |
| conf.route6.invalidate_cache() |
| conf.route6.routes = [ |
| ('fe80::dd17:1fa6:a123:ab4', 128, '::', 'lo', ['fe80::dd17:1fa6:a123:ab4'], 291), |
| ('fe80::7101:5678:1234:da65', 128, '::', 'enp3s0', ['fe80::7101:5678:1234:da65'], 281), |
| ('fe80::1f:ae12:4d2c:abff', 128, '::', 'wlan0', ['fe80::1f:ae12:4d2c:abff'], 281), |
| ('fe80::', 64, '::', 'wlan0', ['fe80::1f:ae12:4d2c:abff'], 281), |
| ('fe80::', 64, '::', 'lo', ['fe80::dd17:1fa6:a123:ab4'], 291), |
| ('fe80::', 64, '::', 'enp3s0', ['fe80::7101:5678:1234:da65'], 281), |
| ('2a01:e35:1e06:ab56:7010:6548:9646:fa77', 128, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281), |
| ('2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8', 128, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281), |
| ('2a01:e35:1e06:ab56::', 64, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281), |
| ('::', 0, 'fe80::160c:64aa:ef6f:fe14', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281) |
| ] |
| assert conf.route6.route("2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8") == ('enp3s0', '2a01:e35:1e06:ab56:7010:6548:9646:fa77', '::') |
| assert conf.route6.route("::1") == ('enp3s0', '2a01:e35:1e06:ab56:7010:6548:9646:fa77', 'fe80::160c:64aa:ef6f:fe14') |
| assert conf.route6.route("ff02::1") == ('enp3s0', 'fe80::7101:5678:1234:da65', '::') |
| assert conf.route6.route("fe80::1") == ('enp3s0', 'fe80::7101:5678:1234:da65', '::') |
| assert conf.route6.route("fe80::1", dev='lo') == ('lo', 'fe80::dd17:1fa6:a123:ab4', '::') |
| finally: |
| conf.loopback_name = old_loopback |
| conf.iface = old_iface |
| conf.route6.resync() |
| conf.ifaces.reload() |
| |
| = Windows: reset routes properly |
| |
| if WINDOWS: |
| from scapy.arch.windows import _route_add_loopback |
| _route_add_loopback() |
| |
| |
| ############ |
| ############ |
| ############ |
| + Tests of StreamSocket |
| |
| = Test with DNS over TCP |
| ~ netaccess |
| |
| import socket |
| sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| sck.connect(("8.8.8.8", 53)) |
| |
| class DNSTCP(Packet): |
| name = "DNS over TCP" |
| fields_desc = [ FieldLenField("len", None, fmt="!H", length_of="dns"), |
| PacketLenField("dns", 0, DNS, length_from=lambda p: p.len)] |
| |
| ssck = StreamSocket(sck, DNSTCP) |
| |
| r = ssck.sr1(DNSTCP(dns=DNS(rd=1, qd=DNSQR(qname="www.example.com"))), timeout=3) |
| sck.close() |
| assert DNSTCP in r and len(r.dns.an) |
| |
| ############ |
| + Tests of SSLStreamSocket |
| |
| = Test with recv() calls that return exact packet-length strings |
| ~ sslraweamsocket |
| |
| import socket |
| class MockSocket(object): |
| def __init__(self): |
| self.l = [ b'\x00\x00\x00\x01', b'\x00\x00\x00\x02', b'\x00\x00\x00\x03' ] |
| def recv(self, x): |
| if len(self.l) == 0: |
| return b"" |
| return self.l.pop(0) |
| def fileno(self): |
| return -1 |
| def close(self): |
| return |
| |
| |
| class TestPacket(Packet): |
| name = 'TestPacket' |
| fields_desc = [ |
| IntField('data', 0) |
| ] |
| def guess_payload_class(self, p): |
| return conf.padding_layer |
| |
| s = MockSocket() |
| ss = SSLStreamSocket(s, basecls=TestPacket) |
| |
| p = ss.recv() |
| assert p.data == 1 |
| p = ss.recv() |
| assert p.data == 2 |
| p = ss.recv() |
| assert p.data == 3 |
| try: |
| ss.recv() |
| ret = False |
| except EOFError: |
| ret = True |
| |
| assert ret |
| |
| = Test with recv() calls that return twice as much data as the exact packet-length |
| ~ sslraweamsocket |
| |
| import socket |
| class MockSocket(object): |
| def __init__(self): |
| self.l = [ b'\x00\x00\x00\x01\x00\x00\x00\x02', b'\x00\x00\x00\x03\x00\x00\x00\x04' ] |
| def recv(self, x): |
| if len(self.l) == 0: |
| return b"" |
| return self.l.pop(0) |
| def fileno(self): |
| return -1 |
| def close(self): |
| return |
| |
| |
| class TestPacket(Packet): |
| name = 'TestPacket' |
| fields_desc = [ |
| IntField('data', 0) |
| ] |
| def guess_payload_class(self, p): |
| return conf.padding_layer |
| |
| s = MockSocket() |
| ss = SSLStreamSocket(s, basecls=TestPacket) |
| |
| p = ss.recv() |
| assert p.data == 1 |
| p = ss.recv() |
| assert p.data == 2 |
| p = ss.recv() |
| assert p.data == 3 |
| p = ss.recv() |
| assert p.data == 4 |
| try: |
| ss.recv() |
| ret = False |
| except EOFError: |
| ret = True |
| |
| assert ret |
| |
| = Test with recv() calls that return not enough data |
| ~ sslraweamsocket |
| |
| import socket |
| class MockSocket(object): |
| def __init__(self): |
| self.l = [ b'\x00\x00', b'\x00\x01', b'\x00\x00\x00', b'\x02', b'\x00\x00', b'\x00', b'\x03' ] |
| def recv(self, x): |
| if len(self.l) == 0: |
| return b"" |
| return self.l.pop(0) |
| def fileno(self): |
| return -1 |
| def close(self): |
| return |
| |
| |
| class TestPacket(Packet): |
| name = 'TestPacket' |
| fields_desc = [ |
| IntField('data', 0) |
| ] |
| def guess_payload_class(self, p): |
| return conf.padding_layer |
| |
| s = MockSocket() |
| ss = SSLStreamSocket(s, basecls=TestPacket) |
| |
| p = ss.recv() |
| assert p.data == 1 |
| |
| p = ss.recv() |
| assert p.data == 2 |
| |
| p = ss.recv() |
| assert p.data == 3 |
| |
| try: |
| ss.recv() |
| ret = False |
| except EOFError: |
| ret = True |
| |
| assert ret |
| |
| |
| ############ |
| ############ |
| + Test correct conversion from binary to string of IPv6 addresses |
| |
| = IPv6 bin to string conversion |
| from scapy.pton_ntop import _inet6_ntop, inet_ntop |
| import socket |
| for binfrm, address in [ |
| (b'\x00' * 16, '::'), |
| (b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88', |
| '1111:2222:3333:4444:5555:6666:7777:8888'), |
| (b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x00\x00\x00\x00\x00\x00', |
| '1111:2222:3333:4444:5555::'), |
| (b'\x00\x00\x00\x00\x00\x00\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88', |
| '::4444:5555:6666:7777:8888'), |
| (b'\x00\x00\x00\x00\x33\x33\x44\x44\x00\x00\x00\x00\x00\x00\x88\x88', |
| '0:0:3333:4444::8888'), |
| (b'\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00', |
| '1::'), |
| (b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01', |
| '::1'), |
| (b'\x11\x11\x00\x00\x00\x00\x44\x44\x00\x00\x00\x00\x77\x77\x88\x88', |
| '1111::4444:0:0:7777:8888'), |
| (b'\x10\x00\x02\x00\x00\x30\x00\x04\x00\x05\x00\x60\x07\x00\x80\x00', |
| '1000:200:30:4:5:60:700:8000'), |
| ]: |
| addr1 = inet_ntop(socket.AF_INET6, binfrm) |
| addr2 = _inet6_ntop(binfrm) |
| assert address == addr1 == addr2 |
| |
| = IPv6 bin to string conversion - Zero-block of length 1 |
| binfrm = b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x00\x00\x88\x88' |
| addr1, addr2 = inet_ntop(socket.AF_INET6, binfrm), _inet6_ntop(binfrm) |
| # On Mac OS socket.inet_ntop is not fully compliant with RFC 5952 and |
| # shortens the single zero block to '::'. This is a valid IPv6 address |
| # representation anyway. |
| assert(addr1 in ['1111:2222:3333:4444:5555:6666:0:8888', |
| '1111:2222:3333:4444:5555:6666::8888']) |
| assert addr2 == '1111:2222:3333:4444:5555:6666:0:8888' |
| |
| = IPv6 bin to string conversion - Illegal sizes |
| for binfrm in ["\x00" * 15, b"\x00" * 17]: |
| rc = False |
| try: |
| inet_ntop(socket.AF_INET6, binfrm) |
| except Exception as exc1: |
| _exc1 = exc1 |
| rc = True |
| assert rc |
| try: |
| _inet6_ntop(binfrm) |
| except Exception as exc2: |
| rc = isinstance(exc2, type(_exc1)) |
| assert rc |
| |
| |
| ############ |
| ############ |
| + Addresses generators |
| |
| = Net |
| |
| assert list(Net("192.168.0.0/31")) == ["192.168.0.0", "192.168.0.1"] |
| |
| assert "1.2.3.4" in Net("0.0.0.0/0") |
| |
| assert "192.168.0.0/25" in Net("192.168.0.0/24") |
| |
| assert "192.168.0.0/23" not in Net("192.168.0.0/24") |
| |
| assert "0.0.0.0/1" in Net("0.0.0.0/0") |
| |
| assert "0.0.0.0/0" not in Net("0.0.0.0/1") |
| |
| assert Net("1.2.3.0/24") == Net("1.2.3.0", "1.2.3.255") |
| |
| assert hash(Net("1.2.3.0/24")) == hash(Net("1.2.3.0", "1.2.3.255")) |
| |
| = Net using name |
| ~ netaccess |
| |
| ip = IP(dst="www.google.com") |
| n1 = ip.dst |
| assert isinstance(n1, Net) |
| ip.show() |
| |
| = Net using implicit format in IP |
| |
| assert len(list(IP(dst=("192.168.0.100", "192.168.0.199")))) == 100 |
| |
| = Multiple IP addresses test |
| ~ netaccess |
| |
| ip = IP(dst=['192.168.0.1', 'www.google.fr'],ihl=(1,5)) |
| assert ip.dst[0] == '192.168.0.1' |
| assert isinstance(ip.dst[1], Net) |
| src = ip.src |
| assert src |
| assert isinstance(src, str) |
| |
| = OID |
| |
| oid = OID("1.2.3.4.5.6-8") |
| sum(1 for o in oid) == 3 |
| assert oid.__iterlen__() == 3 |
| |
| = Net6 |
| |
| n1 = Net6("2001:db8::/127") |
| assert len(list(n1)) == 2 |
| assert len(n1) == 2 |
| |
| n2 = Net6("fec0::/110") |
| assert len(n2) == 262144 |
| |
| assert "ffff::ffff" in Net6("::/0") |
| |
| assert "::/1" in Net6("::/0") |
| |
| assert "::/0" not in Net6("::/1") |
| |
| assert Net6("::/120") == Net6("::", "::ff") |
| |
| assert hash(Net6("::/120")) == hash(Net6("::", "::ff")) |
| |
| assert Net6("::1.2.3.0/120") == Net6("::1.2.3.0", "::1.2.3.255") |
| |
| assert hash(Net6("::1.2.3.0/120")) == hash(Net6("::1.2.3.0", "::1.2.3.255")) |
| |
| assert Net6("::1.2.3.0/120") != Net("1.2.3.0/24") |
| |
| assert hash(Net6("::1.2.3.0/120")) != hash(Net("1.2.3.0/24")) |
| |
| = Net6 using web address |
| ~ netaccess ipv6 |
| |
| ip = IPv6(dst="www.google.com") |
| n1 = ip.dst |
| assert isinstance(n1, Net6) |
| assert "www.google.com" in repr(n1) |
| ip.show() |
| |
| ip = IPv6(dst="www.yahoo.com") |
| assert IPv6(raw(ip)).dst == [p.dst for p in ip][0] |
| |
| = Net6 using implicit format in IPv6 |
| |
| assert len(list(IPv6(dst=("fe80::1", "fe80::1f")))) == 31 |
| |
| = Multiple IPv6 addresses test |
| ~ netaccess ipv6 |
| |
| ip = IPv6(dst=['2001:db8::1', 'www.google.fr'],hlim=(1,5)) |
| assert ip.dst[0] == '2001:db8::1' |
| assert isinstance(ip.dst[1], Net6) |
| src = ip.src |
| assert src |
| assert isinstance(src, str) |
| |
| = Test repr on Net |
| ~ netaccess |
| |
| conf.color_theme = BlackAndWhite() |
| output = repr(IP(src="www.google.com")) |
| assert 'Net("www.google.com/32")' in output |
| |
| = Test repr on Net6 |
| ~ netaccess ipv6 |
| |
| conf.color_theme = BlackAndWhite() |
| assert 'Net6("www.google.com/128")' in repr(IPv6(src="www.google.com")) |
| |
| ############ |
| ############ |
| + IPv6 helpers |
| |
| = in6_getLocalUniquePrefix() |
| |
| p = in6_getLocalUniquePrefix() |
| len(inet_pton(socket.AF_INET6, p)) == 16 and p.startswith("fd") |
| |
| = Misc addresses manipulation functions |
| |
| teredoAddrExtractInfo("2001:0:0a0b:0c0d:0028:f508:f508:08f5") == ("10.11.12.13", 40, "10.247.247.10", 2807) |
| |
| ip6 = IP6Field("test", None) |
| ip6.i2repr("", "2001:0:0a0b:0c0d:0028:f508:f508:08f5") == "2001:0:0a0b:0c0d:0028:f508:f508:08f5 [Teredo srv: 10.11.12.13 cli: 10.247.247.10:2807]" |
| ip6.i2repr("", "2002:0102:0304::1") == "2002:0102:0304::1 [6to4 GW: 1.2.3.4]" |
| |
| in6_iseui64("fe80::bae8:58ff:fed4:e5f6") == True |
| |
| in6_isanycast("2001:db8::fdff:ffff:ffff:ff80") == True |
| |
| a = inet_pton(socket.AF_INET6, "2001:db8::2807") |
| in6_xor(a, a) == b"\x00" * 16 |
| |
| a = inet_pton(socket.AF_INET6, "fe80::bae8:58ff:fed4:e5f6") |
| r = inet_ntop(socket.AF_INET6, in6_getnsma(a)) |
| r == "ff02::1:ffd4:e5f6" |
| |
| in6_isllsnmaddr(r) == True |
| |
| in6_isdocaddr("2001:db8::2807") == True |
| |
| in6_isaddrllallnodes("ff02::1") == True |
| |
| in6_isaddrllallservers("ff02::2") == True |
| |
| = in6_getscope() |
| |
| assert in6_getscope("2001:db8::2807") == IPV6_ADDR_GLOBAL |
| assert in6_getscope("fec0::2807") == IPV6_ADDR_SITELOCAL |
| assert in6_getscope("fe80::2807") == IPV6_ADDR_LINKLOCAL |
| assert in6_getscope("ff02::2807") == IPV6_ADDR_LINKLOCAL |
| assert in6_getscope("ff0e::2807") == IPV6_ADDR_GLOBAL |
| assert in6_getscope("ff05::2807") == IPV6_ADDR_SITELOCAL |
| assert in6_getscope("ff01::2807") == IPV6_ADDR_LOOPBACK |
| assert in6_getscope("::1") == IPV6_ADDR_LOOPBACK |
| |
| = construct_source_candidate_set() |
| |
| dev_addresses = [('fe80::', IPV6_ADDR_LINKLOCAL, "linklocal"),('fec0::', IPV6_ADDR_SITELOCAL, "sitelocal"),('ff0e::', IPV6_ADDR_GLOBAL, "global")] |
| |
| assert construct_source_candidate_set("2001:db8::2807", 0, dev_addresses) == ["ff0e::"] |
| assert construct_source_candidate_set("fec0::2807", 0, dev_addresses) == ["fec0::"] |
| assert construct_source_candidate_set("fe80::2807", 0, dev_addresses) == ["fe80::"] |
| assert construct_source_candidate_set("ff02::2807", 0, dev_addresses) == ["fe80::"] |
| assert construct_source_candidate_set("ff0e::2807", 0, dev_addresses) == ["ff0e::"] |
| assert construct_source_candidate_set("ff05::2807", 0, dev_addresses) == ["fec0::"] |
| assert construct_source_candidate_set("ff01::2807", 0, dev_addresses) == ["::1"] |
| assert construct_source_candidate_set("::", 0, dev_addresses) == ["ff0e::"] |
| |
| = inet_pton() |
| |
| from scapy.pton_ntop import _inet6_pton, inet_pton |
| import socket |
| |
| ip6_bad_addrs = ["fe80::2e67:ef2d:7eca::ed8a", |
| "fe80:1234:abcd::192.168.40.12:abcd", |
| "fe80:1234:abcd::192.168.40", |
| "fe80:1234:abcd::192.168.400.12", |
| "1234:5678:9abc:def0:1234:5678:9abc:def0:", |
| "1234:5678:9abc:def0:1234:5678:9abc:def0:1234"] |
| for ip6 in ip6_bad_addrs: |
| rc = False |
| exc1 = None |
| try: |
| res1 = inet_pton(socket.AF_INET6, ip6) |
| except Exception as e: |
| rc = True |
| exc1 = e |
| assert rc |
| rc = False |
| try: |
| res2 = _inet6_pton(ip6) |
| except Exception as exc2: |
| rc = isinstance(exc2, type(exc1)) |
| assert rc |
| |
| ip6_good_addrs = [("fe80:1234:abcd::192.168.40.12", |
| b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\xc0\xa8(\x0c'), |
| ("fe80:1234:abcd::fe06", |
| b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\x00\x00\xfe\x06'), |
| ("fe80::2e67:ef2d:7ece:ed8a", |
| b'\xfe\x80\x00\x00\x00\x00\x00\x00.g\xef-~\xce\xed\x8a'), |
| ("::ffff", |
| b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff'), |
| ("ffff::", |
| b'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'), |
| ('::', b'\x00' * 16)] |
| for ip6, res in ip6_good_addrs: |
| res1 = inet_pton(socket.AF_INET6, ip6) |
| res2 = _inet6_pton(ip6) |
| assert res == res1 == res2 |
| |
| |
| ############ |
| ############ |
| + Test Route class |
| |
| = make_route() |
| |
| r4 = Route() |
| tmp_route = r4.make_route(host="10.12.13.14") |
| (tmp_route[0], tmp_route[1], tmp_route[2]) == (168561934, 4294967295, '0.0.0.0') |
| |
| tmp_route = r4.make_route(net="10.12.13.0/24") |
| (tmp_route[0], tmp_route[1], tmp_route[2]) == (168561920, 4294967040, '0.0.0.0') |
| |
| = add() & delt() |
| |
| r4 = Route() |
| len_r4 = len(r4.routes) |
| r4.add(net="192.168.1.0/24", gw="1.2.3.4") |
| len(r4.routes) == len_r4 + 1 |
| r4.delt(net="192.168.1.0/24", gw="1.2.3.4") |
| len(r4.routes) == len_r4 |
| |
| = ifchange() |
| |
| r4.add(net="192.168.1.0/24", gw="1.2.3.4", dev=get_dummy_interface()) |
| r4.ifchange(get_dummy_interface(), "5.6.7.8") |
| r4.routes[-1][4] == "5.6.7.8" |
| |
| = ifdel() |
| |
| r4.ifdel(get_dummy_interface()) |
| len(r4.routes) == len_r4 |
| |
| = ifadd() & get_if_bcast() |
| |
| r4 = Route() |
| len_r4 = len(r4.routes) |
| |
| r4.ifadd(get_dummy_interface(), "1.2.3.4/24") |
| len(r4.routes) == len_r4 +1 |
| |
| r4.get_if_bcast(get_dummy_interface()) == "1.2.3.255" |
| |
| r4.ifdel(get_dummy_interface()) |
| len(r4.routes) == len_r4 |
| |
| dummy_interface = get_dummy_interface() |
| |
| bck_conf_route_routes = conf.route.routes |
| conf.route.routes = [ |
| (0, 0, '172.21.230.1', dummy_interface, '172.21.230.10', 1), # 0.0.0.0 / 0.0.0.0 == 255.255.255.255 |
| (2851995648, 4294901760, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 169.254.0.0 / 255.255.0.0 == 169.254.255.255 |
| (2887116288, 4294967040, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 172.21.230.0 / 255.255.255.0 == 172.21.230.255 |
| (2887116289, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 172.21.230.1 / 255.255.255.255 == 172.21.230.1 |
| (3758096384, 4026531840, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 224.0.0.0 / 240.0.0.0 == 239.255.255.255 |
| (3758096635, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 224.0.0.251 / 255.255.255.255 == 224.0.0.251 |
| (4294967295, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 255.255.255.255 / 255.255.255.255 == 255.255.255.255 |
| ] |
| |
| assert sorted(conf.route.get_if_bcast(dummy_interface)) == sorted(['169.254.255.255', '172.21.230.255', '239.255.255.255']) |
| conf.route.routes = bck_conf_route_routes |
| |
| = Remove dummy interface |
| |
| conf.ifaces.reload() |
| |
| ############ |
| ############ |
| + Flags |
| |
| = IP flags |
| ~ IP |
| |
| pkt = IP(flags="MF") |
| assert pkt.flags.MF |
| assert not pkt.flags.DF |
| assert not pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 1 (MF)>' |
| pkt.flags.MF = 0 |
| pkt.flags.DF = 1 |
| assert not pkt.flags.MF |
| assert pkt.flags.DF |
| assert not pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 2 (DF)>' |
| pkt.flags |= 'evil+MF' |
| pkt.flags &= 'DF+MF' |
| assert pkt.flags.MF |
| assert pkt.flags.DF |
| assert not pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 3 (MF+DF)>' |
| |
| pkt = IP(flags=3) |
| assert pkt.flags.MF |
| assert pkt.flags.DF |
| assert not pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 3 (MF+DF)>' |
| pkt.flags = 6 |
| assert not pkt.flags.MF |
| assert pkt.flags.DF |
| assert pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 6 (DF+evil)>' |
| |
| assert len({IP().flags, IP().flags}) == 1 |
| |
| pkt = IP() |
| pkt.flags = "" |
| assert pkt.flags == 0 |
| |
| = TCP flags |
| ~ TCP |
| |
| pkt = TCP(flags="SA") |
| assert pkt.flags == 18 |
| assert pkt.flags.S |
| assert pkt.flags.A |
| assert pkt.flags.SA |
| assert not any(getattr(pkt.flags, f) for f in 'FRPUECN') |
| assert repr(pkt.flags) == '<Flag 18 (SA)>' |
| pkt.flags.U = True |
| pkt.flags.S = False |
| assert pkt.flags.A |
| assert pkt.flags.U |
| assert pkt.flags.AU |
| assert not any(getattr(pkt.flags, f) for f in 'FSRPECN') |
| assert repr(pkt.flags) == '<Flag 48 (AU)>' |
| pkt.flags &= 'SFA' |
| pkt.flags |= 'P' |
| assert pkt.flags.P |
| assert pkt.flags.A |
| assert pkt.flags.PA |
| assert not any(getattr(pkt.flags, f) for f in 'FSRUECN') |
| |
| pkt = TCP(flags=56) |
| assert all(getattr(pkt.flags, f) for f in 'PAU') |
| assert pkt.flags.PAU |
| assert not any(getattr(pkt.flags, f) for f in 'FSRECN') |
| assert repr(pkt.flags) == '<Flag 56 (PAU)>' |
| pkt.flags = 50 |
| assert all(getattr(pkt.flags, f) for f in 'SAU') |
| assert pkt.flags.SAU |
| assert not any(getattr(pkt.flags, f) for f in 'FRPECN') |
| assert repr(pkt.flags) == '<Flag 50 (SAU)>' |
| |
| = Flag values mutation with .raw_packet_cache |
| ~ IP TCP |
| |
| pkt = IP(raw(IP(flags="MF")/TCP(flags="SA"))) |
| assert pkt.raw_packet_cache is not None |
| assert pkt[TCP].raw_packet_cache is not None |
| assert pkt.flags.MF |
| assert not pkt.flags.DF |
| assert not pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 1 (MF)>' |
| assert pkt[TCP].flags.S |
| assert pkt[TCP].flags.A |
| assert pkt[TCP].flags.SA |
| assert not any(getattr(pkt[TCP].flags, f) for f in 'FRPUECN') |
| assert repr(pkt[TCP].flags) == '<Flag 18 (SA)>' |
| pkt.flags.MF = 0 |
| pkt.flags.DF = 1 |
| pkt[TCP].flags.U = True |
| pkt[TCP].flags.S = False |
| pkt = IP(raw(pkt)) |
| assert not pkt.flags.MF |
| assert pkt.flags.DF |
| assert not pkt.flags.evil |
| assert repr(pkt.flags) == '<Flag 2 (DF)>' |
| assert pkt[TCP].flags.A |
| assert pkt[TCP].flags.U |
| assert pkt[TCP].flags.AU |
| assert not any(getattr(pkt[TCP].flags, f) for f in 'FSRPECN') |
| assert repr(pkt[TCP].flags) == '<Flag 48 (AU)>' |
| |
| = Operations on flag values |
| ~ TCP |
| |
| p1, p2 = TCP(flags="SU"), TCP(flags="AU") |
| assert (p1.flags & p2.flags).U |
| assert not any(getattr(p1.flags & p2.flags, f) for f in 'FSRPAECN') |
| assert all(getattr(p1.flags | p2.flags, f) for f in 'SAU') |
| assert (p1.flags | p2.flags).SAU |
| assert not any(getattr(p1.flags | p2.flags, f) for f in 'FRPECN') |
| |
| assert TCP(flags="SA").flags & TCP(flags="S").flags == TCP(flags="S").flags |
| assert TCP(flags="SA").flags | TCP(flags="S").flags == TCP(flags="SA").flags |
| |
| |
| ############ |
| ############ |
| + 802.3 |
| |
| = Test detection |
| |
| assert isinstance(Dot3(raw(Ether())),Ether) |
| assert isinstance(Ether(raw(Dot3())),Dot3) |
| |
| a = Ether(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00') |
| assert isinstance(a,Dot3) |
| assert a.dst == 'ff:ff:ff:ff:ff:ff' |
| assert a.src == '00:00:00:00:00:00' |
| |
| a = Dot3(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x90\x00') |
| assert isinstance(a,Ether) |
| assert a.dst == 'ff:ff:ff:ff:ff:ff' |
| assert a.src == '00:00:00:00:00:00' |
| |
| |
| ############ |
| ############ |
| + ASN.1 |
| |
| = MIB |
| ~ mib |
| |
| import tempfile |
| fd, fname = tempfile.mkstemp() |
| os.write(fd, b"-- MIB test\nscapy OBJECT IDENTIFIER ::= {test 2807}\n") |
| os.close(fd) |
| |
| load_mib(fname) |
| assert sum(1 for k in conf.mib.d.values() if "scapy" in k) == 1 |
| |
| assert sum(1 for oid in conf.mib) > 100 |
| |
| = MIB - graph |
| ~ mib |
| |
| from unittest import mock |
| |
| @mock.patch("scapy.asn1.mib.do_graph") |
| def get_mib_graph(do_graph): |
| def store_graph(graph, **kargs): |
| assert graph.startswith("""digraph "mib" {""") |
| assert """"test.2807" [ label="scapy" ];""" in graph |
| do_graph.side_effect = store_graph |
| conf.mib._make_graph() |
| |
| get_mib_graph() |
| |
| = MIB - test aliases |
| ~ mib |
| |
| # https://github.com/secdev/scapy/issues/2542 |
| assert conf.mib._oidname("2.5.29.19") == "basicConstraints" |
| |
| = DADict tests |
| |
| a = DADict("test") |
| a[0] = "test_value1" |
| a["scapy"] = "test_value2" |
| |
| assert a.test_value1 == 0 |
| assert a.test_value2 == "scapy" |
| |
| with ContextManagerCaptureOutput() as cmco: |
| a._show() |
| outp = cmco.get_output() |
| |
| assert "scapy = 'test_value2'" in outp |
| assert "0 = 'test_value1'" in outp |
| |
| = Test ETHER_TYPES |
| |
| assert ETHER_TYPES.IPv4 == 2048 |
| try: |
| import warnings |
| |
| with warnings.catch_warnings(record=True) as w: |
| warnings.simplefilter("always") |
| ETHER_TYPES["BAOBAB"] = 0xffff |
| assert ETHER_TYPES.BAOBAB == 0xffff |
| assert issubclass(w[-1].category, DeprecationWarning) |
| except DeprecationWarning: |
| # -Werror is used |
| pass |
| |
| = MIB - Check that MIB OIDs are not duplicated |
| ~ mib |
| |
| from scapy.asn1.mib import x509_oids_sets |
| |
| _dct = {} |
| for d in x509_oids_sets: |
| for elt in d: |
| if elt in _dct: |
| raise ValueError("OID %s already exists" % elt) |
| _dct.update(d) |
| |
| = BER tests |
| |
| BER_id_enc(42) == '*' |
| BER_id_enc(2807) == b'\xbfw' |
| |
| b = BERcodec_IPADDRESS() |
| r1 = b.enc("8.8.8.8") |
| r1 == b'@\x04\x08\x08\x08\x08' |
| |
| r2 = b.dec(r1)[0] |
| r2.val == '8.8.8.8' |
| |
| a = b'\x1f\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\x01\x01\x00C\x02\x01U0\x0f0\r\x06\x08+\x06\x01\x02\x01\x02\x01\x00\x02\x01!' |
| ret = False |
| try: |
| BERcodec_Object.check_type(a) |
| except BER_BadTag_Decoding_Error: |
| ret = True |
| else: |
| ret = False |
| |
| assert ret |
| |
| = BER trigger failures |
| |
| try: |
| BERcodec_INTEGER.do_dec(b"\x02\x01") |
| assert False |
| except BER_Decoding_Error: |
| pass |
| |
| |
| ############ |
| ############ |
| + Fields |
| |
| = FieldLenField with BitField |
| class Test(Packet): |
| name = "Test" |
| fields_desc = [ |
| FieldLenField("BitCount", None, fmt="H", count_of="Values"), |
| FieldLenField("ByteCount", None, fmt="B", length_of="Values"), |
| FieldListField("Values", [], BitField("data", 0x0, size=1), |
| count_from=lambda pkt: pkt.BitCount), |
| ] |
| |
| pkt = Test(raw(Test(Values=[0, 0, 0, 0, 1, 1, 1, 1]))) |
| assert pkt.BitCount == 8 |
| assert pkt.ByteCount == 1 |
| |
| = PacketListField |
| |
| class TestPacket(Packet): |
| name = 'TestPacket' |
| fields_desc = [ PacketListField('list', [], 0) ] |
| |
| a = TestPacket() |
| a.list.append(1) |
| assert len(a.list) == 1 |
| |
| b = TestPacket() |
| assert len(b.list) == 0 |
| |
| = Test PacketListField deepcopy |
| class SubPacket(Packet): |
| name = "SubPacket" |
| fields_desc = [ |
| ByteField("mem", 1), |
| ] |
| |
| class TestPacket(Packet): |
| name = "TestPacket" |
| fields_desc = [ |
| PacketListField("packlist", SubPacket(), SubPacket), |
| ] |
| |
| a = TestPacket() |
| b = a.copy() |
| fuzz(b) |
| assert a.packlist[0].mem == 1 |
| |
| = PacketField |
| |
| class InnerPacket(Packet): |
| fields_desc = [ StrField("f_name", "test") ] |
| |
| class TestPacket(Packet): |
| fields_desc = [ PacketField("inner", InnerPacket(), InnerPacket) ] |
| |
| p = TestPacket() |
| print(p.inner.f_name) |
| assert p.inner.f_name == b"test" |
| |
| p = TestPacket() |
| p.inner.f_name = b"scapy" |
| assert p.inner.f_name == b"scapy" |
| |
| p = TestPacket() |
| assert p.inner.f_name == b"test" |
| |
| + UUIDField |
| |
| = Parsing a human-readable UUID |
| f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef') |
| f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef') |
| |
| = Parsing a machine-encoded UUID |
| f = UUIDField('f', bytearray.fromhex('0123456789abcdef0123456789abcdef')) |
| f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef') |
| |
| = Parsing a tuple of values |
| f = UUIDField('f', (0x01234567, 0x89ab, 0xcdef, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef)) |
| f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef') |
| |
| = Handle None values |
| f = UUIDField('f', None) |
| f.addfield(None, b'', f.default) == hex_bytes('00000000000000000000000000000000') |
| |
| = Get a UUID for dissection |
| from uuid import UUID |
| f = UUIDField('f', None) |
| f.getfield(None, bytearray.fromhex('0123456789abcdef0123456789abcdef01')) == (b'\x01', UUID('01234567-89ab-cdef-0123-456789abcdef')) |
| |
| = Verify little endian UUIDField |
| * The endianness of a UUIDField should be applied per block, to each parenthesized block in '(01234567)-(89ab)-(cdef)-(01)(23)-(45)(67)(89)(ab)(cd)(ef)' |
| f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef', uuid_fmt=UUIDField.FORMAT_LE) |
| f.addfield(None, b'', f.default) == hex_bytes('67452301ab89efcd0123456789abcdef') |
| |
| = Verify reversed UUIDField |
| * This should reverse the entire value as 128-bits |
| f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef', uuid_fmt=UUIDField.FORMAT_REV) |
| f.addfield(None, b'', f.default) == hex_bytes('efcdab8967452301efcdab8967452301') |
| |
| + RandUUID |
| |
| = RandUUID setup |
| |
| RANDUUID_TEMPLATE = '01234567-89ab-*-01*-*****ef' |
| RANDUUID_FIXED = uuid.uuid4() |
| |
| = RandUUID default behaviour |
| |
| ru = RandUUID() |
| assert ru._fix().version == 4 |
| assert ru.command() == "RandUUID()" |
| |
| = RandUUID incorrect implicit args |
| |
| assert expect_exception(ValueError, lambda: RandUUID(node=0x1234, name="scapy")) |
| assert expect_exception(ValueError, lambda: RandUUID(node=0x1234, namespace=uuid.uuid4())) |
| assert expect_exception(ValueError, lambda: RandUUID(clock_seq=0x1234, name="scapy")) |
| assert expect_exception(ValueError, lambda: RandUUID(clock_seq=0x1234, namespace=uuid.uuid4())) |
| assert expect_exception(ValueError, lambda: RandUUID(name="scapy")) |
| assert expect_exception(ValueError, lambda: RandUUID(namespace=uuid.uuid4())) |
| |
| = RandUUID v4 UUID (correct args) |
| |
| u = RandUUID(version=4)._fix() |
| assert u.version == 4 |
| |
| u2 = RandUUID(version=4)._fix() |
| assert u2.version == 4 |
| |
| assert str(u) != str(u2) |
| |
| = RandUUID v4 UUID (incorrect args) |
| |
| assert expect_exception(ValueError, lambda: RandUUID(version=4, template=RANDUUID_TEMPLATE)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=4, node=0x1234)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=4, clock_seq=0x1234)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=4, namespace=uuid.uuid4())) |
| assert expect_exception(ValueError, lambda: RandUUID(version=4, name="scapy")) |
| |
| = RandUUID v1 UUID |
| |
| u = RandUUID(version=1)._fix() |
| assert u.version in [1, 4] |
| |
| u = RandUUID(version=1, node=0x1234)._fix() |
| assert u.version == 1 |
| assert u.node == 0x1234 |
| |
| u = RandUUID(version=1, clock_seq=0x1234)._fix() |
| assert u.version == 1 |
| assert u.clock_seq == 0x1234 |
| |
| ru = RandUUID(version=1, node=0x1234, clock_seq=0x1bcd) |
| assert ru.command() == "RandUUID(node=4660, clock_seq=7117, version=1)" |
| u = ru._fix() |
| assert u.version == 1 |
| assert u.node == 0x1234 |
| assert u.clock_seq == 0x1bcd |
| |
| = RandUUID v1 UUID (implicit version) |
| |
| u = RandUUID(node=0x1234)._fix() |
| assert u.version == 1 |
| assert u.node == 0x1234 |
| |
| u = RandUUID(clock_seq=0x1234)._fix() |
| assert u.version == 1 |
| assert u.clock_seq == 0x1234 |
| |
| u = RandUUID(node=0x1234, clock_seq=0x1bcd)._fix() |
| assert u.version == 1 |
| assert u.node == 0x1234 |
| assert u.clock_seq == 0x1bcd |
| |
| = RandUUID v1 UUID (incorrect args) |
| |
| assert expect_exception(ValueError, lambda: RandUUID(version=1, template=RANDUUID_TEMPLATE)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=1, namespace=uuid.uuid4())) |
| assert expect_exception(ValueError, lambda: RandUUID(version=1, name="scapy")) |
| |
| = RandUUID v5 UUID |
| |
| ru = RandUUID(version=5, namespace=RANDUUID_FIXED, name="scapy") |
| u = ru._fix() |
| assert u.version == 5 |
| assert ru.command() == "RandUUID(namespace=%r, name='scapy', version=5)" % RANDUUID_FIXED |
| |
| u2 = RandUUID(version=5, namespace=RANDUUID_FIXED, name="scapy")._fix() |
| assert u2.version == 5 |
| assert u.bytes == u2.bytes |
| |
| # implicit v5 |
| u2 = RandUUID(namespace=RANDUUID_FIXED, name="scapy")._fix() |
| assert u.bytes == u2.bytes |
| |
| = RandUUID v5 UUID (incorrect args) |
| |
| assert expect_exception(ValueError, lambda: RandUUID(version=5, template=RANDUUID_TEMPLATE)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=5, node=0x1234)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=5, clock_seq=0x1234)) |
| |
| = RandUUID v3 UUID |
| |
| u = RandUUID(version=3, namespace=RANDUUID_FIXED, name="scapy")._fix() |
| assert u.version == 3 |
| |
| u2 = RandUUID(version=3, namespace=RANDUUID_FIXED, name="scapy")._fix() |
| assert u2.version == 3 |
| assert u.bytes == u2.bytes |
| |
| # implicit v5 |
| u2 = RandUUID(namespace=RANDUUID_FIXED, name="scapy")._fix() |
| assert u.bytes != u2.bytes |
| |
| = RandUUID v3 UUID (incorrect args) |
| |
| assert expect_exception(ValueError, lambda: RandUUID(version=5, template=RANDUUID_TEMPLATE)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=5, node=0x1234)) |
| assert expect_exception(ValueError, lambda: RandUUID(version=5, clock_seq=0x1234)) |
| |
| = RandUUID looks like a UUID with str |
| assert re.match(r'[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}', str(RandUUID()), re.I) is not None |
| |
| = RandUUID with a static part |
| * RandUUID template can contain a static part such as 01234567-89ab-*-01*-*****ef |
| ru = RandUUID('01234567-89ab-*-01*-*****ef') |
| assert re.match(r'01234567-89ab-[0-9a-f]{4}-01[0-9a-f]{2}-[0-9a-f]{10}ef', str(ru), re.I) is not None |
| assert ru.command() == "RandUUID(template='01234567-89ab-*-01*-*****ef')" |
| |
| = RandUUID with a range part |
| * RandUUID template can contain a part with a range of values such as 01234567-89ab-*-01*-****c0:c9ef |
| assert re.match(r'01234567-89ab-[0-9a-f]{4}-01[0-9a-f]{2}-[0-9a-f]{8}c[0-9]ef', str(RandUUID('01234567-89ab-*-01*-****c0:c9ef')), re.I) is not None |
| |
| ############ |
| ############ |
| + MPLS tests |
| |
| = MPLS - build/dissection |
| from scapy.contrib.mpls import EoMCW, MPLS |
| p1 = MPLS()/IP()/UDP() |
| assert p1[MPLS].s == 1 |
| p2 = MPLS()/MPLS()/IP()/UDP() |
| assert p2[MPLS].s == 0 |
| |
| p1[MPLS] |
| p1[IP] |
| p2[MPLS] |
| p2[MPLS:1] |
| p2[IP] |
| |
| = MPLS encapsulated Ethernet with CW - build/dissection |
| p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22") |
| p /= MPLS(label=1)/EoMCW(seq=1234) |
| p /= Ether(dst="33:33:33:33:33:33", src="44:44:44:44:44:44")/IP() |
| p = Ether(raw(p)) |
| assert p[EoMCW].zero == 0 |
| assert p[EoMCW].reserved == 0 |
| assert p[EoMCW].seq == 1234 |
| |
| = MPLS encapsulated Ethernet without CW - build/dissection |
| p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22") |
| p /= MPLS(label=2)/MPLS(label=1) |
| p /= Ether(dst="33:33:33:33:33:33", src="44:44:44:44:44:44")/IP() |
| p = Ether(raw(p)) |
| assert p[Ether:2].type == 0x0800 |
| |
| try: |
| p[EoMCW] |
| except IndexError: |
| ret = True |
| else: |
| ret = False |
| |
| assert ret |
| assert p[Ether:2].type == 0x0800 |
| |
| = MPLS encapsulated IP - build/dissection |
| p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22") |
| p /= MPLS(label=1)/IP() |
| p = Ether(raw(p)) |
| |
| try: |
| p[EoMCW] |
| except IndexError: |
| ret = True |
| else: |
| ret = False |
| |
| assert ret |
| |
| try: |
| p[Ether:2] |
| except IndexError: |
| ret = True |
| else: |
| ret = False |
| |
| assert ret |
| |
| p[IP] |
| |
| |
| ############ |
| ############ |
| + PacketList methods |
| |
| = sr() |
| |
| class Req(Packet): |
| fields_desc = [ |
| ByteField("raw", 0) |
| ] |
| def answers(self, other): |
| return False |
| |
| class Res(Packet): |
| fields_desc = [ |
| ByteField("raw", 0) |
| ] |
| def answers(self, other): |
| return other.__class__ == Req and other.raw == self.raw |
| |
| pl = PacketList([Req(b"1"), Res(b"1"), Req(b"2"), Req(b"3"), Req(b"4"), Res(b"3"), Res(b"1"), Res(b"1"), Res(b"4")]) |
| |
| srl, rl = pl.sr() |
| assert len(srl) == 3 |
| assert len(rl) == 3 |
| |
| srl, rl = pl.sr(lookahead=1) |
| assert len(srl) == 1 |
| assert len(rl) == 7 |
| |
| srl, rl = pl.sr(lookahead=2) |
| assert len(srl) == 2 |
| assert len(rl) == 5 |
| |
| srl, rl = pl.sr(lookahead=3) |
| assert len(srl) == 3 |
| assert len(rl) == 3 |
| |
| pl = PacketList([Req(b"\x05"), Res(b"1"), Res(b"2"), Res(b"3"), Res(b"4"), Res(b"3"), Res(b"1"), Res(b"1"), Res(b"\x05")]) |
| |
| srl, rl = pl.sr(lookahead=3) |
| assert len(srl) == 0 |
| assert len(rl) == 9 |
| |
| srl, rl = pl.sr(lookahead=7) |
| assert len(srl) == 0 |
| assert len(rl) == 9 |
| |
| srl, rl = pl.sr(lookahead=8) |
| assert len(srl) == 1 |
| assert len(rl) == 7 |
| |
| srl, rl = pl.sr(lookahead=0) |
| assert len(srl) == 1 |
| assert len(rl) == 7 |
| |
| srl, rl = pl.sr(lookahead=None) |
| assert len(srl) == 1 |
| assert len(rl) == 7 |
| |
| = pickle test |
| import pickle |
| import io |
| |
| srl, rl = PacketList([Raw(b"1"), Raw(b"1"), Raw(b"2"), Raw(b"3"), Raw(b"4"), Raw(b"3"), Raw(b"1"), Raw(b"1"), Raw(b"4")]).sr() |
| assert len(srl) == 4 |
| |
| f = io.BytesIO() |
| |
| pickle.dump(srl, f) |
| |
| unp = pickle.loads(f.getvalue()) |
| |
| assert len(unp) == len(srl) |
| assert all(bytes(a[0]) == bytes(b[0]) for a, b in zip(unp, srl)) |
| |
| = plot() |
| |
| from unittest import mock |
| import scapy.libs.matplot |
| |
| @mock.patch("scapy.libs.matplot.plt") |
| def test_plot(mock_plt): |
| def fake_plot(data, **kwargs): |
| return data |
| mock_plt.plot = fake_plot |
| plist = PacketList([IP(id=i)/TCP() for i in range(10)]) |
| lines = plist.plot(lambda p: (p.time, p.id)) |
| assert len(lines) == 10 |
| |
| test_plot() |
| |
| = diffplot() |
| |
| from unittest import mock |
| import scapy.libs.matplot |
| |
| @mock.patch("scapy.libs.matplot.plt") |
| def test_diffplot(mock_plt): |
| def fake_plot(data, **kwargs): |
| return data |
| mock_plt.plot = fake_plot |
| plist = PacketList([IP(id=i)/TCP() for i in range(10)]) |
| lines = plist.diffplot(lambda x,y: (x.time, y.id-x.id)) |
| assert len(lines) == 9 |
| |
| test_diffplot() |
| |
| = multiplot() |
| |
| from unittest import mock |
| import scapy.libs.matplot |
| |
| @mock.patch("scapy.libs.matplot.plt") |
| def test_multiplot(mock_plt): |
| def fake_plot(data, **kwargs): |
| return data |
| mock_plt.plot = fake_plot |
| tmp = [IP(id=i)/TCP() for i in range(10)] |
| plist = PacketList([tuple(tmp[i-2:i]) for i in range(2, 10, 2)]) |
| lines = plist.multiplot(lambda x, y: (y[IP].src, (y.time, y[IP].id))) |
| assert len(lines) == 1 |
| assert len(lines[0]) == 4 |
| |
| test_multiplot() |
| |
| = rawhexdump() |
| |
| def test_rawhexdump(): |
| with ContextManagerCaptureOutput() as cmco: |
| p = PacketList([IP()/TCP() for i in range(2)]) |
| p.rawhexdump() |
| result_pl_rawhexdump = cmco.get_output() |
| assert len(result_pl_rawhexdump.split('\n')) == 7 |
| assert result_pl_rawhexdump.startswith("0000 45 00 00 28") |
| |
| test_rawhexdump() |
| |
| = hexraw() |
| |
| def test_hexraw(): |
| with ContextManagerCaptureOutput() as cmco: |
| p = PacketList([IP()/Raw(str(i)) for i in range(2)]) |
| p.hexraw() |
| result_pl_hexraw = cmco.get_output() |
| assert len(result_pl_hexraw.split('\n')) == 5 |
| assert "0000 30" in result_pl_hexraw |
| |
| test_hexraw() |
| |
| = hexdump() |
| |
| def test_hexdump(): |
| with ContextManagerCaptureOutput() as cmco: |
| p = PacketList([IP()/Raw(str(i)) for i in range(2)]) |
| p.hexdump() |
| result_pl_hexdump = cmco.get_output() |
| assert len(result_pl_hexdump.split('\n')) == 7 |
| assert "0010 7F 00 00 01 31" in result_pl_hexdump |
| |
| test_hexdump() |
| |
| = import_hexcap() |
| |
| @mock.patch("scapy.utils.input") |
| def test_import_hexcap(mock_input): |
| data = """ |
| 0000 FF FF FF FF FF FF AA AA AA AA AA AA 08 00 45 00 ..............E. |
| 0010 00 1C 00 01 00 00 40 01 7C DE 7F 00 00 01 7F 00 ......@.|....... |
| 0020 00 01 08 00 F7 FF 00 00 00 00 .......... |
| """[1:].split("\n") |
| lines = iter(data) |
| mock_input.side_effect = lambda: next(lines) |
| return import_hexcap() |
| |
| pkt = test_import_hexcap() |
| pkt = Ether(pkt) |
| assert pkt[Ether].dst == "ff:ff:ff:ff:ff:ff" |
| assert pkt[IP].dst == "127.0.0.1" |
| assert ICMP in pkt |
| |
| = import_hexcap(input_string) |
| data = """ |
| 0000 FF FF FF FF FF FF AA AA AA AA AA AA 08 00 45 00 ..............E. |
| 0010 00 1C 00 01 00 00 40 01 7C DE 7F 00 00 01 7F 00 ......@.|....... |
| 0020 00 01 08 00 F7 FF 00 00 00 00 .......... |
| """[1:] |
| pkt = import_hexcap(data) |
| pkt = Ether(pkt) |
| assert pkt[Ether].dst == "ff:ff:ff:ff:ff:ff" |
| assert pkt[IP].dst == "127.0.0.1" |
| assert ICMP in pkt |
| |
| = padding() |
| |
| def test_padding(): |
| with ContextManagerCaptureOutput() as cmco: |
| p = PacketList([IP()/conf.padding_layer(str(i)) for i in range(2)]) |
| p.padding() |
| result_pl_padding = cmco.get_output() |
| assert len(result_pl_padding.split('\n')) == 5 |
| assert "0000 30" in result_pl_padding |
| |
| test_padding() |
| |
| = nzpadding() |
| |
| def test_nzpadding(): |
| with ContextManagerCaptureOutput() as cmco: |
| p = PacketList([IP()/conf.padding_layer("AB"), IP()/conf.padding_layer("\x00\x00")]) |
| p.nzpadding() |
| result_pl_nzpadding = cmco.get_output() |
| assert len(result_pl_nzpadding.split('\n')) == 3 |
| assert "0000 41 42" in result_pl_nzpadding |
| |
| test_nzpadding() |
| |
| = conversations() |
| |
| from unittest import mock |
| @mock.patch("scapy.plist.do_graph") |
| def test_conversations(mock_do_graph): |
| def fake_do_graph(graph, **kwargs): |
| return graph |
| mock_do_graph.side_effect = fake_do_graph |
| plist = PacketList([IP(dst="127.0.0.2")/TCP(dport=i) for i in range(2)]) |
| plist.extend([IP(src="127.0.0.2")/TCP(sport=i) for i in range(2)]) |
| plist.extend([IPv6(dst="::2", src="::1")/TCP(sport=i) for i in range(2)]) |
| plist.extend([IPv6(src="::2", dst="::1")/TCP(sport=i) for i in range(2)]) |
| plist.extend([Ether()/ARP(pdst="127.0.0.1")]) |
| result_conversations = plist.conversations() |
| assert len(result_conversations.split('\n')) == 8 |
| assert result_conversations.startswith('digraph "conv" {') |
| assert "127.0.0.1" in result_conversations |
| assert "::1" in result_conversations |
| |
| test_conversations() |
| |
| = sessions() |
| |
| pl = PacketList([Ether()/IPv6()/ICMPv6EchoRequest(), Ether()/IPv6()/IPv6()]) |
| pl.extend([Ether()/IP()/IP(), Ether()/ARP()]) |
| pl.extend([Ether()/Ether()/IP()]) |
| assert len(pl.sessions().keys()) == 5 |
| |
| = afterglow() |
| |
| from unittest import mock |
| @mock.patch("scapy.plist.do_graph") |
| def test_afterglow(mock_do_graph): |
| def fake_do_graph(graph, **kwargs): |
| return graph |
| mock_do_graph.side_effect = fake_do_graph |
| plist = PacketList([IP(dst="127.0.0.2")/TCP(dport=i) for i in range(2)]) |
| plist.extend([IP(src="127.0.0.2")/TCP(sport=i) for i in range(2)]) |
| result_afterglow = plist.afterglow() |
| assert len(result_afterglow.split('\n')) == 19 |
| assert result_afterglow.startswith('digraph "afterglow" {') |
| |
| test_afterglow() |
| |
| = psdump() |
| |
| # The dump is only exercised when PYX is truthy; otherwise the test just |
| # prints the flag. |
| print("PYX: %d" % PYX) |
| if PYX: |
| import tempfile |
| import os |
| # NOTE(review): tempfile.mktemp() is deprecated and race-prone; it only |
| # returns a name, so the file must be created by psdump() itself. |
| filename = tempfile.mktemp(suffix=".eps") |
| plist = PacketList([IP()/TCP()]) |
| plist.psdump(filename) |
| assert os.path.exists(filename) |
| # NOTE(review): not reached if the assertion above fails. |
| os.unlink(filename) |
| |
| = pdfdump() |
| |
| # The dump is only exercised when PYX is truthy; otherwise the test just |
| # prints the flag. |
| print("PYX: %d" % PYX) |
| if PYX: |
| import tempfile |
| import os |
| # NOTE(review): tempfile.mktemp() is deprecated and race-prone; it only |
| # returns a name, so the file must be created by pdfdump() itself. |
| filename = tempfile.mktemp(suffix=".pdf") |
| plist = PacketList([IP()/TCP()]) |
| plist.pdfdump(filename) |
| assert os.path.exists(filename) |
| # NOTE(review): not reached if the assertion above fails. |
| os.unlink(filename) |
| |
| = svgdump() |
| |
| # The dump is only exercised when PYX is truthy; otherwise the test just |
| # prints the flag. |
| print("PYX: %d" % PYX) |
| if PYX: |
| import tempfile |
| import os |
| # NOTE(review): tempfile.mktemp() is deprecated and race-prone; it only |
| # returns a name, so the file must be created by svgdump() itself. |
| filename = tempfile.mktemp(suffix=".svg") |
| plist = PacketList([IP()/TCP()]) |
| plist.svgdump(filename) |
| assert os.path.exists(filename) |
| # NOTE(review): not reached if the assertion above fails. |
| os.unlink(filename) |
| |
| = __getstate__ / __setstate__ (used by pickle) |
| |
| import pickle |
| |
| frm = Ether(src='00:11:22:33:44:55', dst='00:22:33:44:55:66')/Raw() |
| frm.time = EDecimal(123.45) |
| frm.sniffed_on = "iface" |
| frm.wirelen = 1 |
| pl = PacketList(res=[frm, frm], name='WhatAGreatName') |
| pickled = pickle.dumps(pl) |
| pl = pickle.loads(pickled) |
| assert pl.listname == "WhatAGreatName" |
| assert len(pl) == 2 |
| assert pl[0].time == 123.45 |
| assert pl[0].sniffed_on == "iface" |
| assert pl[0].wirelen == 1 |
| assert pl[0][Ether].src == '00:11:22:33:44:55' |
| assert pl[1][Ether].dst == '00:22:33:44:55:66' |
| |
| = EDecimal |
| |
| # GH4488 |
| p1, p2 = EDecimal('1722417787.778435252'), EDecimal('1722417787.778435216') |
| assert p1 != p2 |
| assert p1 > p2 |
| assert not (p1 < p2) |
| assert p1 == 1722417787.778435252 # float test |
| assert p2 == 1722417787.778435216 |
| assert (p1, 0) > (p2, 1) |
| |
| ############ |
| ############ |
| + Scapy version |
| |
| = _version() |
| |
| import os |
| from datetime import datetime, timezone |
| |
| version_filename = os.path.join(scapy._SCAPY_PKG_DIR, "VERSION") |
| mtime = datetime.fromtimestamp(os.path.getmtime(scapy.__file__), timezone.utc) |
| version = "2.0.0" |
| with open(version_filename, "w") as fd: |
| fd.write(version) |
| |
| os.environ["SCAPY_VERSION"] = "9.9.9" |
| assert scapy._version() == "9.9.9" |
| del os.environ["SCAPY_VERSION"] |
| |
| assert scapy._version() == version |
| os.unlink(version_filename) |
| |
| from unittest import mock |
| with mock.patch("scapy._version_from_git_archive") as archive: |
| archive.return_value = "4.4.4" |
| assert scapy._version() == "4.4.4" |
| archive.side_effect = ValueError() |
| with mock.patch("scapy._version_from_git_describe") as git: |
| git.return_value = "3.3.3" |
| assert scapy._version() == "3.3.3" |
| git.side_effect = Exception() |
| assert scapy._version() == mtime.strftime("%Y.%m.%d") |
| with mock.patch("os.path.getmtime") as getmtime: |
| getmtime.side_effect = Exception() |
| assert scapy._version() == "0.0.0" |
| |
| |
| = UTscapy HTML output |
| |
| import tempfile, os |
| from scapy.tools.UTscapy import TestCampaign, pack_html_campaigns |
| test_campaign = TestCampaign("test") |
| test_campaign.output_file = tempfile.mktemp() |
| html = pack_html_campaigns([test_campaign], None, local=True) |
| dirname = os.path.dirname(test_campaign.output_file) |
| filename_js = "%s/UTscapy.js" % dirname |
| filename_css = "%s/UTscapy.css" % dirname |
| assert os.path.isfile(filename_js) |
| assert os.path.isfile(filename_css) |
| os.remove(filename_js) |
| os.remove(filename_css) |
| |
| = test get_temp_dir |
| |
| dname = get_temp_dir() |
| assert os.path.isdir(dname) |
| |
| = test fragleak functions |
| ~ netaccess linux fragleak |
| |
| from unittest import mock |
| |
| @mock.patch("scapy.layers.inet.conf.L3socket") |
| @mock.patch("scapy.layers.inet.select.select") |
| @mock.patch("scapy.layers.inet.sr1") |
| def _test_fragleak(func, sr1, select, L3socket): |
| packets = [IP(src="4.4.4.4")/ICMP()/IPerror(dst="8.8.8.8")/conf.padding_layer(load=b"greatdata")] |
| iterator = iter(packets) |
| ne = lambda *args, **kwargs: next(iterator) |
| L3socket.side_effect = lambda: Bunch(recv=ne, send=lambda x: None) |
| sr1.side_effect = ne |
| select.side_effect = lambda a, b, c, d: a+b+c |
| with ContextManagerCaptureOutput() as cmco: |
| func("8.8.8.8", count=1) |
| out = cmco.get_output() |
| return "greatdata" in out |
| |
| assert _test_fragleak(fragleak) |
| assert _test_fragleak(fragleak2) |