% Regression tests for Scapy

# More information at http://www.secdev.org/projects/UTscapy/

############
############
+ Information on Scapy

= Setup
def expect_exception(e, c):
    try:
        c()
        return False
    except e:
        return True

= Get conf
~ conf command
* Dump the current configuration
conf

IP().src
conf.loopback_name

= Test module version detection
~ conf

class FakeModule(object):
    __version__ = "v1.12"

class FakeModule2(object):
    __version__ = "5.143.3.12"

class FakeModule3(object):
    __version__ = "v2.4.2.dev42"

from scapy.config import _version_checker

assert _version_checker(FakeModule, (1,11,5))
assert not _version_checker(FakeModule, (1,13))

assert _version_checker(FakeModule2, (5, 1))
assert not _version_checker(FakeModule2, (5, 143, 4))

assert _version_checker(FakeModule3, (2, 4, 2))

= Check Scapy version

from unittest import mock

import scapy
from scapy import _parse_tag, _version_from_git_describe
from scapy.config import _version_checker

b = Bunch(returncode=0, communicate=lambda *args, **kargs: (b"v2.4.5rc1-261-g44b98e14", None))
with mock.patch('scapy.subprocess.Popen', return_value=b):
    with mock.patch('scapy.os.path.isdir', return_value=True):
        class GitModuleScapy(object):
            __version__ = _version_from_git_describe()

# GH3847
with mock.patch('scapy.subprocess.Popen', return_value=b):
    with mock.patch('scapy.os.path.isdir', return_value=False):
        try:
            _version_from_git_describe()
            assert False
        except ValueError:
            pass

assert GitModuleScapy.__version__ == '2.4.5rc1.dev261'
assert _version_checker(GitModuleScapy, (2, 4, 5))

= List layers
~ conf command
ls()

= List layers - advanced
~ conf command

with ContextManagerCaptureOutput() as cmco:
    ls("IP", case_sensitive=True)
    result_ls = cmco.get_output().split("\n")

assert all("IP" in x for x in result_ls if x.strip())
assert len(result_ls) >= 3

= List packet fields - ls
~ command

with ContextManagerCaptureOutput() as cmco:
    ls(ARP(hwsrc="aa:aa:aa:aa:aa:aa", psrc="1.1.1.1"))
    result_ls = cmco.get_output().split("\n")

result_ls
assert result_ls[5] == "hwsrc      : MultipleTypeField (SourceMACField, StrFixedLenField) = 'aa:aa:aa:aa:aa:aa' ('None')"
assert result_ls[6] == "psrc       : MultipleTypeField (SourceIPField, SourceIP6Field, StrFixedLenField) = '1.1.1.1'       ('None')"

= List commands
~ conf command
lsc()

= List contribs
~ command
def test_list_contrib():
    with ContextManagerCaptureOutput() as cmco:
        list_contrib()
        result_list_contrib = cmco.get_output()
    assert "http2               : HTTP/2 (RFC 7540, RFC 7541)              status=loads" in result_list_contrib
    assert len(result_list_contrib.split('\n')) > 40

test_list_contrib()

= Test packet show() on LatexTheme
% with LatexTheme

class SmallPacket(Packet):
    fields_desc = [ByteField("a", 0)]

conf_color_theme = conf.color_theme
conf.color_theme = LatexTheme()
pkt = SmallPacket()
with ContextManagerCaptureOutput() as cmco:
    pkt.show()
    result = cmco.get_output().strip()

assert result == '\\#\\#\\#[ \\textcolor{red}{\\bf SmallPacket} ]\\#\\#\\#\n  \\textcolor{blue}{a}         = \\textcolor{purple}{0}'
conf.color_theme = conf_color_theme


= Test rfc()
~ command

dat = rfc(IP, ret=True).split("\n")
assert dat[0].replace(" ", "").strip() == "0123"
assert "0123456789" in dat[1].replace(" ", "")
for l in dat:
    # only upper case and +-
    assert re.match(r"[A-Z+-]*", l)

# Add a space before each + to avoid conflicts with UTscapy !
# They will be stripped below
result = """
 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |VERSION|  IHL  |      TOS      |              LEN              |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |               ID              |FLAGS|           FRAG          |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |      TTL      |     PROTO     |             CHKSUM            |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                              SRC                              |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                              DST                              |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |            OPTIONS            |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

                             Fig. IP
""".strip()
result = [x.strip() for x in result.split("\n")]
output = [x.strip() for x in rfc(IP, ret=True).strip().split("\n")]
assert result == output

result = """
  0                   1                   2                   3
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |      CODE     |       ID      |              LEN              |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |      TYPE     |L|M|S|RES|VERSI|          MESSAGE LEN          |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                               |              DATA             |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

                           Fig. EAP_TTLS
""".strip()
result = [x.strip() for x in result.split("\n")]
output = [x.strip() for x in rfc(EAP_TTLS, ret=True).strip().split("\n")]
assert result == output


result = """
  0                   1                   2                   3
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |VERSION|       TC      |                   FL                  |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |              PLEN             |       NH      |      HLIM     |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                              SRC                              |
 +                                                               +
 |                                                               |
 +                                                               +
 |                                                               |
 +                                                               +
 |                                                               |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |                              DST                              |
 +                                                               +
 |                                                               |
 +                                                               +
 |                                                               |
 +                                                               +
 |                                                               |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

                             Fig. IPv6
""".strip()
result = [x.strip() for x in result.split("\n")]
output = [x.strip() for x in rfc(IPv6, ret=True).strip().split("\n")]
assert result == output


class TestPad(Packet):
   fields_desc = [ShortField("f0", 0),
                  ShortField("f1", 0),
                  PadField(ByteField("f2", 1), 8),
                  PadField(ShortField("f3", 0), 4)]


result = """
  0                   1                   2                   3
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |               F0              |               F1              |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |       F2      |                    padding                    |
 +-+-+-+-+-+-+-+-+                                               +
 |                                                               |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 |               F3              |            padding            |
 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

                            Fig. TestPad
""".strip()
result = [x.strip() for x in result.split("\n")]
output = [x.strip() for x in rfc(TestPad, ret=True).strip().split("\n")]
assert result == output

= Check that all contrib modules are well-configured
~ command
list_contrib(_debug=True)

= Configuration
~ conf
conf.debug_dissector = True

= Configuration conf.use_* LINUX
~ linux

try:
    conf.use_bpf = True
    assert False
except:
    True

assert not conf.use_bpf

= Configuration conf.use_* WINDOWS
~ windows

try:
    conf.use_bpf = True
    assert False
except:
    True

assert not conf.use_bpf

= Configuration conf.use_pcap
~ linux libpcap

if not conf.use_pcap:
    assert not conf.iface.provider.libpcap
    conf.use_pcap = True
    assert conf.iface.provider.libpcap
    for iface in conf.ifaces.values():
        assert iface.provider.libpcap or iface.is_valid() == False
    conf.use_pcap = False
    assert not conf.iface.provider.libpcap

= Test layer filtering
~ filter

pkt = NetflowHeader()/NetflowHeaderV5()/NetflowRecordV5()

conf.layers.filter([NetflowHeader, NetflowHeaderV5])
assert NetflowRecordV5 not in NetflowHeader(bytes(pkt))

conf.layers.unfilter()
assert NetflowRecordV5 in NetflowHeader(bytes(pkt))


###########
###########
= UTscapy route check
* Check that UTscapy has correctly replaced the routes. Many tests won't work otherwise

p = IP().src
p
assert p == "127.0.0.1"

############
############
+ Scapy functions tests

= Interface related functions

from unittest import mock

conf.iface

get_if_addr(conf.iface)
get_if_hwaddr(conf.iface)

bytes_hex(get_if_raw_addr(conf.iface))

def get_dummy_interface():
    """Returns a dummy network interface"""
    conf.ifaces._add_fake_iface("dummy0")
    return "dummy0"

get_if_raw_addr(get_dummy_interface())

get_if_list()

get_working_if()

get_if_raw_addr6(conf.iface)

= More Interfaces related functions

# Test name resolution
old = conf.iface
conf.iface = conf.iface.name
assert conf.iface == old

assert isinstance(conf.iface, NetworkInterface)
assert conf.iface.is_valid()

from unittest import mock
@mock.patch("scapy.interfaces.conf.route.routes", [])
@mock.patch("scapy.interfaces.conf.ifaces.values")
def _test_get_working_if(rou):
    rou.side_effect = lambda: []
    assert get_working_if() is None

assert conf.iface + "a"  # left +
assert "hey! are you, ready to go ? %s" % conf.iface  # format
assert "cuz you know the way to go" + conf.iface  # right +

_test_get_working_if()

= Test conf.ifaces

conf.iface
conf.ifaces

assert conf.iface in conf.ifaces.values()
assert conf.ifaces.dev_from_index(conf.iface.index) == conf.iface
assert conf.ifaces.dev_from_networkname(conf.iface.network_name) == conf.iface

conf.ifaces.data = {'a': NetworkInterface(InterfaceProvider(), {"name": 'a', "network_name": 'a', "description": 'a', "ips": ["127.0.0.1", "::1", "::2", "127.0.0.2"], "mac": 'aa:aa:aa:aa:aa:aa'})}

with ContextManagerCaptureOutput() as cmco:
    conf.ifaces.show()
    output = cmco.get_output()

data = """
Source   Index  Name  MAC                IPv4       IPv6
Unknown  0      a     aa:aa:aa:aa:aa:aa  127.0.0.1  ::1
                                         127.0.0.2  ::2
""".strip()

output = [x.strip() for x in output.strip().split("\n")]
data = [x.strip() for x in data.strip().split("\n")]

assert output == data

conf.ifaces.reload()

= Test extcap detection in conf.ifaces
~ linux extcap

import os
from scapy.libs.extcap import load_extcap

_bkp_extcap = conf.prog.extcap_folders
_bkp_providers = conf.ifaces.providers.copy()

conf.ifaces.providers.clear()

# Create some sort of extcap parody program
extcapfld = get_temp_dir()
extcapprog = os.path.join(extcapfld, "runner.sh")
data = """#!/usr/bin/env python3

import struct
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--extcap-interfaces', action='store_true')
parser.add_argument('--capture', action='store_true')
parser.add_argument('--extcap-config', action='store_true')
parser.add_argument('--scan-follow-rsp', action='store_true')
parser.add_argument('--scan-follow-aux', action='store_true')
parser.add_argument('--extcap-interface', type=str)
parser.add_argument('--fifo', type=str)

args = parser.parse_args()
if args.extcap_interfaces:
    # List interfaces
    print(bytes.fromhex("0a657874636170207b76657273696f6e3d342e312e317d7b646973706c61793d6e524620536e696666657220666f7220426c7565746f6f7468204c457d7b68656c703d68747470733a2f2f7777772e6e6f7264696373656d692e636f6d2f536f6674776172652d616e642d546f6f6c732f446576656c6f706d656e742d546f6f6c732f6e52462d536e69666665722d666f722d426c7565746f6f74682d4c457d0a696e74657266616365207b76616c75653d2f6465762f747479555342352d4e6f6e657d7b646973706c61793d6e524620536e696666657220666f7220426c7565746f6f7468204c457d0a636f6e74726f6c207b6e756d6265723d307d7b747970653d73656c6563746f727d7b646973706c61793d4465766963657d7b746f6f6c7469703d446576696365206c6973747d0a636f6e74726f6c207b6e756d6265723d317d7b747970653d73656c6563746f727d7b646973706c61793d4b65797d7b746f6f6c7469703d7d0a636f6e74726f6c207b6e756d6265723d327d7b747970653d737472696e677d7b646973706c61793d56616c75657d7b746f6f6c7469703d3620646967697420706173736b6579206f72203136206f7220333220627974657320656e6372797074696f6e206b657920696e2068657861646563696d616c207374617274696e67207769746820273078272c2062696720656e6469616e20666f726d61742e49662074686520656e7465726564206b65792069732073686f72746572207468616e203136206f722033322062797465732c2069742077696c6c206265207a65726f2d70616464656420696e2066726f6e74277d7b76616c69646174696f6e3d5c625e28285b302d395d7b367d297c2830785b302d39612d66412d465d7b312c36347d297c285b302d39412d46612d665d7b327d5b3a2d5d297b357d285b302d39412d46612d665d7b327d2920287075626c69637c72616e646f6d2929245c627d0a636f6e74726f6c207b6e756d6265723d337d7b747970653d737472696e677d7b646973706c61793d41647620486f707d7b64656661756c743d33372c33382c33397d7b746f6f6c7469703d4164766572746973696e67206368616e6e656c20686f702073657175656e63652e204368616e676520746865206f7264657220696e2077686963682074686520736e6966666572207377697463686573206164766572746973696e67206368616e6e656c732e2056616c6964206368616e6e656c73206172652033372c20333820616e642033392073657061726174656420627920636f6d6d612e7d7b76616c69646174696f6e3d5e5c732a282833377c33387c3339295c732a2c5c732a297b302c327d2833377c33387c3339297b317d5c732a247d7b72657175697265643d747275657d0a636f6e74726f6c207b6e756d6265723d377d7b747970653d627574746f6e7d7b646973706c61793d436c6561727d7b746f6f6c746f703d436c656172206f722072656d6f7665206465766963652066726f6d20446576696365206c6973747d0a636f6e74726f6c207b6e756d6265723d347d7b747970653d627574746f6e7d7b726f6c653d68656c707d7b646973706c61793d48656c707d7b746f6f6c7469703d416363657373207573657220677569646520286c61756e636865732062726f77736572297d0a636f6e74726f6c207b6e756d6265723d357d7b747970653d627574746f6e7d7b726f6c653d726573746f72657d7b646973706c61793d44656661756c74737d7b746f6f6c7469703d52657365747320746865207573657220696e7465726661636520616e6420636c6561727320746865206c6f672066696c657d0a636f6e74726f6c207b6e756d6265723d367d7b747970653d627574746f6e7d7b726f6c653d6c6f676765727d7b646973706c61793d4c6f677d7b746f6f6c7469703d4c6f672070657220696e746572666163657d0a76616c7565207b636f6e74726f6c3d307d7b76616c75653d207d7b646973706c61793d416c6c206164766572746973696e6720646576696365737d7b64656661756c743d747275657d0a76616c7565207b636f6e74726f6c3d307d7b76616c75653d5b30302c30302c30302c30302c30302c30302c305d7d7b646973706c61793d466f6c6c6f772049524b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d307d7b646973706c61793d4c656761637920506173736b65797d7b64656661756c743d747275657d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d317d7b646973706c61793d4c6567616379204f4f4220646174617d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d327d7b646973706c61793d4c6567616379204c544b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d337d7b646973706c61793d5343204c544b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d347d7b646973706c61793d53432050726976617465204b65797d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d357d7b646973706c61793d49524b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d367d7b646973706c61793d416464204c4520616464726573737d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d377d7b646973706c61793d466f6c6c6f77204c4520616464726573737d").decode())
elif args.extcap_interface and args.extcap_config:
    # List config
    print(bytes.fromhex("617267207b6e756d6265723d307d7b63616c6c3d2d2d6f6e6c792d6164766572746973696e677d7b646973706c61793d4f6e6c79206164766572746973696e67207061636b6574737d7b746f6f6c7469703d54686520736e69666665722077696c6c206f6e6c792063617074757265206164766572746973696e67207061636b6574732066726f6d207468652073656c6563746564206465766963657d7b747970653d626f6f6c666c61677d7b736176653d747275657d0a617267207b6e756d6265723d317d7b63616c6c3d2d2d6f6e6c792d6c65676163792d6164766572746973696e677d7b646973706c61793d4f6e6c79206c6567616379206164766572746973696e67207061636b6574737d7b746f6f6c7469703d54686520736e69666665722077696c6c206f6e6c792063617074757265206c6567616379206164766572746973696e67207061636b6574732066726f6d207468652073656c6563746564206465766963657d7b747970653d626f6f6c666c61677d7b736176653d747275657d0a617267207b6e756d6265723d327d7b63616c6c3d2d2d7363616e2d666f6c6c6f772d7273707d7b646973706c61793d46696e64207363616e20726573706f6e736520646174617d7b746f6f6c7469703d54686520736e69666665722077696c6c20666f6c6c6f77207363616e20726571756573747320616e64207363616e20726573706f6e73657320696e207363616e206d6f64657d7b747970653d626f6f6c666c61677d7b64656661756c743d747275657d7b736176653d747275657d0a617267207b6e756d6265723d337d7b63616c6c3d2d2d7363616e2d666f6c6c6f772d6175787d7b646973706c61793d46696e6420617578696c6961727920706f696e74657220646174617d7b746f6f6c7469703d54686520736e69666665722077696c6c20666f6c6c6f772061757820706f696e7465727320696e207363616e206d6f64657d7b747970653d626f6f6c666c61677d7b64656661756c743d747275657d7b736176653d747275657d0a617267207b6e756d6265723d337d7b63616c6c3d2d2d636f6465647d7b646973706c61793d5363616e20616e6420666f6c6c6f772064657669636573206f6e204c4520436f646564205048597d7b746f6f6c7469703d5363616e20666f72206465766963657320616e6420666f6c6c6f772061647665727469736572206f6e204c4520436f646564205048597d7b747970653d626f6f6c666c61677d7b64656661756c743d66616c73657d7b736176653d747275657d").decode())
elif args.capture and args.extcap_interface and args.fifo:
    # Capture
    pkts = [
        bytes.fromhex("ffffffffffff00000000000008004500001c0001000040117cce7f0000017f0000010035003500080172")
    ]
    with open(args.fifo, "wb", 0) as fd:
        # header
        fd.write(
            struct.pack(
                "IHHIIII",
                0xa1b2c3d4,
                2, 4, 0, 0, 65535, 1
            )
        )
        for pkt in pkts:
            fd.write(struct.pack("IIII", 0, 0, len(pkt), len(pkt)))
            fd.write(bytes(pkt))
else:
    raise ValueError("Bad arguments")
""".strip()
with open(extcapprog, "w") as fd:
    fd.write(data)

print(data)

os.chmod(extcapprog, 0o777)

# Inject and load provider
conf.prog.extcap_folders = [extcapfld]
load_extcap()
print(conf.ifaces.providers)
conf.ifaces.reload()

# Now do the tests
iface = conf.ifaces.dev_from_networkname('/dev/ttyUSB5-None')
assert iface.name == "nRF Sniffer for Bluetooth LE"
sock = iface.l2listen()(iface=iface)
pkts = sock.sniff(timeout=2)
sock.close()
assert UDP in pkts[0]

config = iface.get_extcap_config()
assert config["arg"] == [
    ('0', '--only-advertising', 'Only advertising packets', '', ''),
    ('1', '--only-legacy-advertising', 'Only legacy advertising packets', '', ''),
    ('2', '--scan-follow-rsp', 'Find scan response data', 'true', ''),
    ('3', '--scan-follow-aux', 'Find auxiliary pointer data', 'true', ''),
    ('3', '--coded', 'Scan and follow devices on LE Coded PHY', 'false', '')
]

# Restore
conf.prog.extcap_folders = _bkp_extcap
conf.ifaces.providers = _bkp_providers
conf.ifaces.reload()

= Test read_routes6() - default output

routes6 = read_routes6()
if WINDOWS:
    from scapy.arch.windows import _route_add_loopback
    _route_add_loopback(routes6, True)

routes6

# Expected results:
# - one route if there is only the loopback interface
# - one route if IPv6 is supported but disabled on network interfaces
# - three routes if there is a network interface
# - on OpenBSD, only two routes on lo0 are expected

if routes6:
    iflist = get_if_list()
    if WINDOWS:
        from scapy.arch.windows import _route_add_loopback
        _route_add_loopback(ipv6=True, iflist=iflist)
    if OPENBSD:
        len(routes6) >= 2
    elif iflist == [conf.loopback_name]:
        len(routes6) == 1
    elif len(iflist) >= 2:
        len(routes6) >= 1
    else:
        False
else:
    # IPv6 seems disabled. Force a route to ::1
    conf.route6.routes.append(("::1", 128, "::", conf.loopback_name, ["::1"], 1))
    conf.route6.ipv6_ifaces = set([conf.loopback_name])
    True

= Build HBHOptUnknown for IPv6ExtHdrHopByHop with disabled autopad
~ ipv6 hbh opt
* Build the HBHOptUnknown of IPv6ExtHdrHopByHop with autopad=0
v6Opt = HBHOptUnknown(otype=3, optlen=7, optdata="Beijing")
pkt = Ether()/IPv6()/IPv6ExtHdrHopByHop(autopad=0, options=[v6Opt, ])
pkt.build()

= Build HBHOptUnknown for IPv6ExtHdrDestOpt with disabled autopad
~ ipv6 hbh opt
* Build the HBHOptUnknown of IPv6ExtHdrDestOpt with autopad=0
v6Opt = HBHOptUnknown(otype=3, optlen=6, optdata="Haikou")
pkt = Ether()/IPv6()/IPv6ExtHdrDestOpt(autopad=0, options=[v6Opt, ])
pkt.build()


= Test read_routes6() - check mandatory routes

import re
ll_route = re.compile(r"fe80:\d{0,2}:")
# match fe80::, fe80:5:, etc. (if scoped)

conf.route6

if len(routes6) > 2 and not WINDOWS:
    # Identify routes to fe80::/64
    assert sum(1 for r in routes6 if r[0] == "::1" and r[4] == ["::1"]) >= 1
    if len(iflist) >= 2:
        assert sum(1 for r in routes6 if ll_route.match(r[0]) and r[1] == 64) >= 1
        try:
            # Identify a route to a node IPv6 link-local address
            assert sum(1 for r in routes6 if in6_islladdr(r[0]) and r[1] == 128) >= 1
        except:
            # IPv6 is not available, but we still check the loopback
            assert conf.route6.route("::/0") == (conf.loopback_name, "::", "::")
            assert sum(1 for r in routes6 if r[1] == 128 and r[4] == ["::1"]) >= 1
else:
    True

= Test ifchange()
conf.route6.ifchange(conf.loopback_name, "::1/128")
if WINDOWS:
    conf.netcache.in6_neighbor["::1"] = "ff:ff:ff:ff:ff:ff"  # Restore fake cache

True

= Packet.route()
assert (Ether() / ARP()).route()[0] is not None
assert (Ether() / ARP()).payload.route()[0] is not None
assert (ARP(ptype=0, pdst="hello. this isn't a valid IP")).route()[0] is None

= utils/in4_is*

assert in4_ismaddr("224.0.0.1")
assert not in4_ismaddr("192.168.0.1")
assert in4_ismaddr("239.0.0.255")

assert in4_ismlladdr("224.0.0.1")
assert in4_ismlladdr("224.0.0.255")
assert not in4_ismlladdr("224.0.1.255")

assert in4_ismgladdr("235.0.0.1")
assert not in4_ismgladdr("224.0.0.1")
assert not in4_ismgladdr("239.0.0.1")

assert in4_ismlsaddr("239.0.0.1")
assert not in4_ismlsaddr("224.0.0.1")

assert in4_isaddrllallnodes("224.0.0.1")
assert not in4_isaddrllallnodes("224.0.0.3")

assert in4_getnsmac(b'\xe0\x00\x00\x01') == '01:00:5e:00:00:01'
assert getmacbyip("224.0.0.1") == '01:00:5e:00:00:01'

= plain_str test

data = b"\xffsweet\xef celestia\xab"
assert plain_str(data) == "\\xffsweet\\xef celestia\\xab"

############
############
+ compat.py

= test bytes_hex/hex_bytes

monty_data = b"Stop! Who approaches the Bridge of Death must answer me these questions three, 'ere the other side he see."
hex_data = bytes_hex(monty_data)
assert hex_data == b'53746f70212057686f20617070726f61636865732074686520427269646765206f66204465617468206d75737420616e73776572206d65207468657365207175657374696f6e732074687265652c202765726520746865206f746865722073696465206865207365652e'
assert hex_bytes(hex_data) == monty_data

= orb/chb

assert orb(b"\x01"[0]) == 1
assert chb(1) == b"\x01"

############
############
+ Main.py tests

= Pickle and unpickle a packet

import pickle

a = IP(dst="192.168.0.1")/UDP()

b = pickle.dumps(a)
c = pickle.loads(b)

assert c[IP].dst == "192.168.0.1"
assert raw(c) == raw(a)

= Usage test

from scapy.main import _usage
try:
    _usage()
    assert False
except SystemExit:
    assert True

= Session test

import builtins

# This is automatic when using the console
def get_var(var):
    return builtins.__dict__["scapy_session"][var]

def set_var(var, value):
    builtins.__dict__["scapy_session"][var] = value

def del_var(var):
    del builtins.__dict__["scapy_session"][var]

init_session(None, {"init_value": 123})
set_var("test_value", "8.8.8.8") # test_value = "8.8.8.8"
save_session()
del_var("test_value")
load_session()
update_session()
assert get_var("test_value") == "8.8.8.8" #test_value == "8.8.8.8"
assert get_var("init_value") == 123
 
= Session test with fname

session_name = tempfile.mktemp()
init_session(session_name)
set_var("test_value", IP(dst="192.168.0.1")) # test_value = IP(dst="192.168.0.1")
save_session(fname="%s.dat" % session_name)
del_var("test_value")

set_var("z", True) #z = True
load_session(fname="%s.dat" % session_name)
try:
    get_var("z")
    assert False
except:
    pass

set_var("z", False) #z = False
update_session(fname="%s.dat" % session_name)
assert get_var("test_value").dst == "192.168.0.1" #test_value.dst == "192.168.0.1"
assert not get_var("z")

= Clear session files

os.remove("%s.dat" % session_name)

= Test temporary file creation
~ ci_only

scapy_delete_temp_files()

tmpfile = get_temp_file(autoext=".ut")
tmpfile
if WINDOWS:
    assert "scapy" in tmpfile and tmpfile.lower().startswith('c:\\users\\appveyor\\appdata\\local\\temp')
else:
    import platform
    BYPASS_TMP = platform.python_implementation().lower() == "pypy" or DARWIN
    assert "scapy" in tmpfile and (BYPASS_TMP == True or "/tmp/" in tmpfile)

assert conf.temp_files[0].endswith(".ut")
scapy_delete_temp_files()
assert len(conf.temp_files) == 0

= Emulate interact()
~ interact

import sys
from unittest import mock
from scapy.main import interact

from scapy.main import DEFAULT_PRESTART_FILE, DEFAULT_PRESTART, _read_config_file
_read_config_file(DEFAULT_PRESTART_FILE, _locals=globals(), default=DEFAULT_PRESTART)
# By now .config/scapy/startup.py should have been created
with open(DEFAULT_PRESTART_FILE, "r") as fd:
    OLD_DEFAULT_PRESTART = fd.read()

with open(DEFAULT_PRESTART_FILE, "w+") as fd:
    fd.write("conf.interactive_shell = 'ipython'")

# Detect IPython
try:
    import IPython
except:
    code_interact_import = "scapy.main.code.interact"
else:
    code_interact_import = "IPython.embed"

@mock.patch(code_interact_import)
def interact_emulator(code_int, extra_args=[]):
    try:
        code_int.side_effect = lambda *args, **kwargs: lambda *args, **kwargs: None
        interact(argv=["-s scapy1"] + extra_args, mybanner="What a test")
    finally:
        sys.ps1 = ">>> "

interact_emulator()  # Default

try:
    interact_emulator(extra_args=["-?"])  # Failing
    assert False
except:
    pass

interact_emulator(extra_args=["-d"])  # Extended

= Emulate interact() and test startup.py with ptpython
~ interact

import sys
from unittest import mock

from scapy.main import DEFAULT_PRESTART_FILE, DEFAULT_PRESTART, _read_config_file
_read_config_file(DEFAULT_PRESTART_FILE, _locals=globals(), default=DEFAULT_PRESTART)
# By now .config/scapy/startup.py should have been created
with open(DEFAULT_PRESTART_FILE, "w+") as fd:
    fd.write("conf.interactive_shell = 'ptpython'")

called = []
def checker(*args, **kwargs):
    locals = kwargs.pop("locals")
    assert locals["IP"]
    history_filename = kwargs.pop("history_filename")
    assert history_filename == conf.histfile
    called.append(True)

ptpython_mocked_module = Bunch(
    repl=Bunch(
        embed=checker
    )
)

modules_patched = {
    "ptpython": ptpython_mocked_module,
    "ptpython.repl": ptpython_mocked_module.repl,
    "ptpython.repl.embed": ptpython_mocked_module.repl.embed,
}

with mock.patch.dict("sys.modules", modules_patched):
    try:
        interact()
    finally:
        sys.ps1 = ">>> "

# Restore
with open(DEFAULT_PRESTART_FILE, "w") as fd:
    print(OLD_DEFAULT_PRESTART)
    r = fd.write(OLD_DEFAULT_PRESTART)

assert called

= Test explore() with GUI mode
~ command

from unittest import mock

def test_explore_gui(is_layer, layer):
    prompt_toolkit_mocked_module = Bunch(
                                       shortcuts=Bunch(
                                           dialogs=Bunch(
                                               radiolist_dialog=(lambda *args, **kargs: layer),
                                               button_dialog=(lambda *args, **kargs: "layers" if is_layer else "contribs")
                                           )
                                       ),
                                       formatted_text=Bunch(HTML=lambda x: x),
                                       __version__="2.0.0"
                                   )
    # a mock.patch isn't enough to mock a module. Let's roll sys.modules
    modules_patched = {
        "prompt_toolkit": prompt_toolkit_mocked_module,
        "prompt_toolkit.shortcuts": prompt_toolkit_mocked_module.shortcuts,
        "prompt_toolkit.shortcuts.dialogs": prompt_toolkit_mocked_module.shortcuts.dialogs,
        "prompt_toolkit.formatted_text": prompt_toolkit_mocked_module.formatted_text,
    }
    with mock.patch.dict("sys.modules", modules_patched):
        with ContextManagerCaptureOutput() as cmco:
            explore()
            result_explore = cmco.get_output()
        return result_explore

conf.interactive = True
explore_dns = test_explore_gui(True, "scapy.layers.dns")
assert "DNS" in explore_dns
assert "DNS Question Record" in explore_dns
assert "DNSRRNSEC3" in explore_dns
assert "DNS TSIG Resource Record" in explore_dns

explore_avs = test_explore_gui(False, "avs")
assert "AVSWLANHeader" in explore_avs
assert "AVS WLAN Monitor Header" in explore_avs

= Test explore() with non-GUI mode
~ command

def test_explore_non_gui(layer):
    with ContextManagerCaptureOutput() as cmco:
        explore(layer)
        result_explore = cmco.get_output()
    return result_explore

explore_dns = test_explore_non_gui("scapy.layers.dns")
assert "DNS" in explore_dns
assert "DNS Question Record" in explore_dns
assert "DNSRRNSEC3" in explore_dns
assert "DNS TSIG Resource Record" in explore_dns

explore_avs = test_explore_non_gui("avs")
assert "AVSWLANHeader" in explore_avs
assert "AVS WLAN Monitor Header" in explore_avs

assert test_explore_non_gui("scapy.layers.dns") == test_explore_non_gui("dns")
assert test_explore_non_gui("scapy.contrib.avs") == test_explore_non_gui("avs")

try:
    explore("unknown_module")
    assert False  # The previous should have raised an exception
except Scapy_Exception:
    pass

= Test load_contrib overwrite
load_contrib("gtp")
assert GTPHeader.__module__ == "scapy.contrib.gtp"

load_contrib("gtp_v2")
assert GTPHeader.__module__ == "scapy.contrib.gtp_v2"

load_contrib("gtp")
assert GTPHeader.__module__ == "scapy.contrib.gtp"

= Test load_contrib failure
try:
    load_contrib("doesnotexist")
    assert False
except:
    pass

= Test sane function
sane("A\x00\xFFB") == "A..B"

= Test lhex function
assert lhex(42) == "0x2a"
assert lhex((28,7)) == "(0x1c, 0x7)"
assert lhex([28,7]) == "[0x1c, 0x7]"

= Test restart function
from unittest import mock
conf.interactive = True

try:
    from scapy.utils import restart
    import os
    @mock.patch("os.execv")
    @mock.patch("subprocess.call")
    @mock.patch("os._exit")
    def _test(e, m, m2):
        def check(x, y=[]):
            z = [x] + y if not isinstance(x, list) else x + y
            assert os.path.isfile(z[0])
            assert os.path.isfile(z[1])
            return 0
        m2.side_effect = check
        m.side_effect = check
        e.side_effect = lambda x: None
        restart()
    _test()
finally:
    conf.interactive = False

= Test linehexdump function
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
assert linehexdump(Ether(src="00:01:02:03:04:05"), dump=True) == 'FF FF FF FF FF FF 00 01 02 03 04 05 90 00  ..............'
conf.color_theme = conf_color_theme

= Test chexdump function
chexdump(Ether(src="00:01:02:02:04:05"), dump=True) == "0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x02, 0x02, 0x04, 0x05, 0x90, 0x00"

= Test repr_hex function
repr_hex("scapy") == "7363617079"

= Test hexstr function
hexstr(b"A\x00\xFFB") == "41 00 FF 42  A..B"

= Test fletcher16 functions
assert fletcher16_checksum(b"\x28\x07") == 22319
assert fletcher16_checkbytes(b"\x28\x07", 1) == b"\xaf("

= Test hexdiff function
~ not_pypy
def test_hexdiff(a, b, algo=None, autojunk=False):
    conf_color_theme = conf.color_theme
    conf.color_theme = BlackAndWhite()
    with ContextManagerCaptureOutput() as cmco:
        hexdiff(a, b, algo=algo, autojunk=autojunk)
        result_hexdiff = cmco.get_output()
    conf.interactive = True
    conf.color_theme = conf_color_theme
    return result_hexdiff

# Basic string test

result_hexdiff = test_hexdiff("abcde", "abCde")
expected  = "0000        61 62 63 64 65                                     abcde\n"
expected += "     0000   61 62 43 64 65                                     abCde\n"
assert result_hexdiff == expected

# More advanced string test

result_hexdiff = test_hexdiff("add_common_", "_common_removed")
expected  = "0000        61 64 64 5F 63 6F 6D 6D  6F 6E 5F                  add_common_     \n"
expected += "     -003            5F 63 6F 6D 6D  6F 6E 5F 72 65 6D 6F 76      _common_remov\n"
expected += "     000d   65 64                                              ed\n"
assert result_hexdiff == expected

# Compare packets

result_hexdiff = test_hexdiff(IP(dst="127.0.0.1", src="127.0.0.1"), IP(dst="127.0.0.2", src="127.0.0.1"))
expected  = "0000        45 00 00 14 00 01 00 00  40 00 7C E7 7F 00 00 01   E.......@.|.....\n"
expected += "     0000   45 00 00 14 00 01 00 00  40 00 7C E6 7F 00 00 01   E.......@.|.....\n"
expected += "0010        7F 00 00 01                                        ....\n"
expected += "     0010   7F 00 00 02                                        ....\n"
assert result_hexdiff == expected

# Compare using difflib

a = "A" * 1000 + "findme" + "B" * 1000
b = "A" * 1000 + "B" * 1000
ret1 = test_hexdiff(a, b, algo="difflib")
ret2 = test_hexdiff(a, b, algo="difflib", autojunk=True)

expected_ret1 = """
03d0 03d0   41 41 41 41 41 41 41 41  41 41 41 41 41 41 41 41   AAAAAAAAAAAAAAAA
03e0        41 41 41 41 41 41 41 41  66 69 6E 64 6D 65 42 42   AAAAAAAAfindmeBB
     03e0   41 41 41 41 41 41 41 41                    42 42   AAAAAAAA      BB
03ea 03ea   42 42 42 42 42 42 42 42  42 42 42 42 42 42 42 42   BBBBBBBBBBBBBBBB
"""
expected_ret2 = """
03d0 03d0   41 41 41 41 41 41 41 41  41 41 41 41 41 41 41 41   AAAAAAAAAAAAAAAA
03e0        41 41 41 41 41 41 41 41  66 69 6E 64 6D 65 42 42   AAAAAAAAfindmeBB
     03e0   41 41 41 41 41 41 41 41  42 42 42 42 42 42 42 42   AAAAAAAABBBBBBBB
03f0 03f0   42 42 42 42 42 42 42 42  42 42 42 42 42 42 42 42   BBBBBBBBBBBBBBBB
"""

assert ret1 != ret2
assert expected_ret1 in ret1
assert expected_ret2 in ret2

# Test corner cases that should not crash

hexdiff(b"abc", IP() / TCP())
hexdiff(IP() / TCP(), b"abc")

= Test mysummary functions - Ether

p = Ether(dst="ff:ff:ff:ff:ff:ff", src="ff:ff:ff:ff:ff:ff", type=0x9000)
p
assert p.mysummary() in ['ff:ff:ff:ff:ff:ff > ff:ff:ff:ff:ff:ff (%s)' % loop
                         for loop in ['0x9000', 'LOOP']]

= Test zerofree_randstring function
random.seed(0x2807)
zerofree_randstring(4) in [b"\xd2\x12\xe4\x5b", b'\xd3\x8b\x13\x12']

= Test strand function
assert strand(b"AC", b"BC") == b'@C'

= Test export_object and import_object functions
from unittest import mock
def test_export_import_object():
    with ContextManagerCaptureOutput() as cmco:
        export_object(2807)
        result_export_object = cmco.get_output(eval_bytes=True)
    assert result_export_object.startswith("eNprYPL9zqUHAAdrAf8=")
    assert import_object(result_export_object) == 2807

test_export_import_object()

= Test tex_escape function
tex_escape("$#_") == "\\$\\#\\_"

= Test colgen function
f = colgen(range(3))
assert len([next(f) for i in range(2)]) == 2

= Test incremental_label function
f = incremental_label()
assert [next(f) for i in range(2)] == ["tag00000", "tag00001"]

= Test corrupt_* functions
import random
random.seed(0x2807)
assert corrupt_bytes("ABCDE") in [b"ABCDW", b"ABCDX"]
assert sane(corrupt_bytes("ABCDE", n=3)) in ["A.8D4", ".2.DE"]

assert corrupt_bits("ABCDE") in [b"EBCDE", b"ABCDG"]
assert sane(corrupt_bits("ABCDE", n=3)) in ["AF.EE", "QB.TE"]

= Test save_object and load_object functions
import tempfile
fd, fname = tempfile.mkstemp()
save_object(fname, 2807)
assert load_object(fname) == 2807

= Test whois function
~ netaccess

if not WINDOWS:
    result = whois("193.0.6.139")
    assert b"inetnum" in result and b"Amsterdam" in result

= Test manuf DB methods
~ manufdb
assert conf.manufdb._resolve_MAC("00:00:0F:01:02:03") == "Next:01:02:03"
assert conf.manufdb._get_short_manuf("00:00:0F:01:02:03") == "Next"
assert in6_addrtovendor("fe80::0200:0fff:fe01:0203").lower().startswith("next")

assert conf.manufdb.lookup("00:00:0F:01:02:03") == ('Next', 'Next, Inc.')
assert "00:00:0F" in conf.manufdb.reverse_lookup("Next")

= Test multiple wireshark's manuf formats
~ manufdb

new_format = """
# comment
00:00:00    JokyIsland    Joky Insland Corp SA
00:01:12    SecdevCorp    Secdev Corporation SA LLC
EE:05:01    Scapy         Scapy CO LTD & CIE
FF:00:11    NoName
"""
old_format = """
# comment
00:00:00    JokyIsland  #  Joky Insland Corp SA
00:01:12    SecdevCorp  #  Secdev Corporation SA LLC
EE:05:01    Scapy       #  Scapy CO LTD & CIE
FF:00:11    NoName
"""

manuf1 = get_temp_file()
manuf2 = get_temp_file()

with open(manuf1, "w") as w:
    w.write(old_format)

with open(manuf2, "w") as w:
    w.write(new_format)

a = load_manuf(manuf1)
b = load_manuf(manuf2)

assert a.lookup("00:00:00") == ('JokyIsland', 'Joky Insland Corp SA')
assert a.lookup("FF:00:11:00:00:00") == ('NoName', 'NoName')
assert a.reverse_lookup("Scapy") == {'EE:05:01': ('Scapy', 'Scapy CO LTD & CIE')}
assert a.reverse_lookup("Secdevcorp") == {'00:01:12': ('SecdevCorp', 'Secdev Corporation SA LLC')}


assert b.lookup("00:00:00") == ('JokyIsland', 'Joky Insland Corp SA')
assert b.lookup("FF:00:11:00:00:00") == ('NoName', 'NoName')
assert b.reverse_lookup("Scapy") == {'EE:05:01': ('Scapy', 'Scapy CO LTD & CIE')}
assert b.reverse_lookup("Secdevcorp") == {'00:01:12': ('SecdevCorp', 'Secdev Corporation SA LLC')}

scapy_delete_temp_files()

= Test load_services

data_services = """
itu-bicc-stc	3097/sctp
cvsup		5999/udp			# CVSup
x11		6000-6063/tcp			# X Window System
x11		6000-6063/udp			# X Window System
ndl-ahp-svc	6064/tcp			# NDL-AHP-SVC
"""

services = get_temp_file()
with open(services, "w") as w:
    w.write(data_services)

tcp, udp, sctp = load_services(services)
assert tcp[6002] == "x11"
assert tcp.ndl_ahp_svc == 6064
assert tcp.x11 in range(6000, 6093)
assert udp[6002] == "x11"
assert udp.x11 in range(6000, 6093)
assert udp.cvsup == 5999
assert sctp[3097] == "itu_bicc_stc"
assert sctp.itu_bicc_stc == 3097

scapy_delete_temp_files()

= Test utility functions - network related
~ netaccess

assert atol("1.1.1.1") == 0x1010101
assert atol("192.168.0.1") == 0xc0a80001

= Test autorun functions
~ autorun

ret = autorun_get_text_interactive_session("IP().src")
ret
assert ret == (">>> IP().src\n'127.0.0.1'\n", '127.0.0.1')

ret = autorun_get_html_interactive_session("IP().src")
ret
assert ret == ("<span class=prompt>&gt;&gt;&gt; </span>IP().src\n'127.0.0.1'\n", '127.0.0.1')

ret = autorun_get_latex_interactive_session("IP().src")
ret
assert ret == ("\\textcolor{blue}{{\\tt\\char62}{\\tt\\char62}{\\tt\\char62} }IP().src\n'127.0.0.1'\n", '127.0.0.1')

ret = autorun_get_text_interactive_session("scapy_undefined")
assert "NameError" in ret[0]

= Test autorun with logging

cmds = """log_runtime.info(hex_bytes("446166742050756e6b"))\n"""
ret = autorun_get_text_interactive_session(cmds)
ret
assert "Daft Punk" in ret[0]

= Test utility TEX functions

assert tex_escape("{scapy}\\^$~#_&%|><") == "{\\tt\\char123}scapy{\\tt\\char125}{\\tt\\char92}\\^{}\\${\\tt\\char126}\\#\\_\\&\\%{\\tt\\char124}{\\tt\\char62}{\\tt\\char60}"

a = colgen(1, 2, 3)
assert next(a) == (1, 2, 2)
assert next(a) == (1, 3, 3)
assert next(a) == (2, 2, 1)
assert next(a) == (2, 3, 2)
assert next(a) == (2, 1, 3)
assert next(a) == (3, 3, 1)
assert next(a) == (3, 1, 2)
assert next(a) == (3, 2, 3)

= Test config file functions

saved_conf_verb = conf.verb
fd, fname = tempfile.mkstemp()
os.write(fd, b"conf.verb = 42\n")
os.close(fd)
from scapy.main import _read_config_file
_read_config_file(fname, globals(), locals())
assert conf.verb == 42
conf.verb = saved_conf_verb

= Test config file functions failures

from scapy.main import _read_config_file, _probe_config_folder
assert _read_config_file(_probe_config_folder("filethatdoesnotexistnorwillever.tsppajfsrdrr")) is None

= Test CacheInstance repr

conf.netcache

= Test pyx detection functions

from unittest.mock import patch

def _r(*args, **kwargs):
    raise OSError

with patch("scapy.libs.test_pyx.subprocess.check_call", _r):
    from scapy.libs.test_pyx import _test_pyx
    assert _test_pyx() == False

= Test matplotlib detection functions

from unittest.mock import MagicMock, patch

bck_scapy_libs_matplot = sys.modules.get("scapy.libs.matplot", None)
if bck_scapy_libs_matplot:
    del sys.modules["scapy.libs.matplot"]

mock_matplotlib = MagicMock()
mock_matplotlib.get_backend.return_value = "inline"
mock_matplotlib.pyplot = MagicMock()
mock_matplotlib.pyplot.plt = None
with patch.dict("sys.modules", **{ "matplotlib": mock_matplotlib, "matplotlib.lines": mock_matplotlib}):
    from scapy.libs.matplot import MATPLOTLIB, MATPLOTLIB_INLINED, MATPLOTLIB_DEFAULT_PLOT_KARGS, Line2D
    assert MATPLOTLIB == 1
    assert MATPLOTLIB_INLINED == 1
    assert "marker" in MATPLOTLIB_DEFAULT_PLOT_KARGS

mock_matplotlib.get_backend.return_value = "ko"
with patch.dict("sys.modules", **{ "matplotlib": mock_matplotlib, "matplotlib.lines": mock_matplotlib}):
    from scapy.libs.matplot import MATPLOTLIB, MATPLOTLIB_INLINED, MATPLOTLIB_DEFAULT_PLOT_KARGS
    assert MATPLOTLIB == 1
    assert MATPLOTLIB_INLINED == 0
    assert "marker" in MATPLOTLIB_DEFAULT_PLOT_KARGS

if bck_scapy_libs_matplot:
    sys.modules["scapy.libs.matplot"] = bck_scapy_libs_matplot


############
############
+ Basic tests

* Those test are here mainly to check nothing has been broken
* and to catch Exceptions

= Packet class methods
p = IP()/ICMP()
ret = p.do_build_ps()                                                                                                                             
assert ret[0] == b"@\x00\x00\x00\x00\x01\x00\x00@\x01\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\x00\x00\x00\x00\x00\x00"
assert len(ret[1]) == 2

assert p[ICMP].firstlayer() == p

assert p.command() == "IP()/ICMP()"

p.decode_payload_as(UDP)
assert p.sport == 2048 and p.dport == 63487

= hide_defaults
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
p = IP(ttl=64)/ICMP()
assert repr(p) in ["<IP  frag=0 ttl=64 proto=icmp |<ICMP  |>>", "<IP  frag=0 ttl=64 proto=1 |<ICMP  |>>"]
p.hide_defaults()
assert repr(p) in ["<IP  frag=0 proto=icmp |<ICMP  |>>", "<IP  frag=0 proto=1 |<ICMP  |>>"]
conf.color_theme = conf_color_theme

= split_layers
p = IP()/ICMP()
s = raw(p)
split_layers(IP, ICMP, proto=1)
assert Raw in IP(s)
bind_layers(IP, ICMP, frag=0, proto=1)

= fuzz

r = fuzz(IP(tos=2)/ICMP())
assert r.tos == 2
z = r.ttl
assert r.ttl != z
assert r.ttl != z


= fuzz a Packet with MultipleTypeField

fuzz(ARP(pdst="127.0.0.1"))
fuzz(IP()/ARP(pdst='10.0.0.254'))

= fuzz on packets with advanced RandNum

x = IP(dst="8.8.8.8")/fuzz(UDP()/NTP(version=4))
x.show2()
x = IP(raw(x))
assert NTP in x

= fuzz on packets with FlagsField
assert isinstance(fuzz(TCP()).flags, VolatileValue)

= Building some packets
~ basic IP TCP UDP NTP LLC SNAP Dot11
IP()/TCP()
Ether()/IP()/UDP()/NTP()
Dot11()/LLC()/SNAP()/IP()/TCP()/"XXX"
IP(ttl=25)/TCP(sport=12, dport=42)
IP().summary()

= Manipulating some packets
~ basic IP TCP
a=IP(ttl=4)/TCP()
a.ttl
a.ttl=10
del a.ttl
a.ttl
TCP in a
a[TCP]
a[TCP].dport=[80,443]
a
assert a.copy().time == a.time
a=3

= Bind string array as payload
~ basic
assert bytes(Raw("sca")/"py") == b"scapy"
assert bytes(Raw("sca")/b"py") == b"scapy"
assert bytes(Raw("sca")/bytearray(b"py")) == b"scapy"
assert bytes("sca"/Raw("py")) == b"scapy"
assert bytes(b"sca"/Raw("py")) == b"scapy"
assert bytes(bytearray(b"sca")/Raw("py")) == b"scapy"
a=Raw("sca")
a.add_payload("py")
assert bytes(a) == b"scapy"
a=Raw("sca")
a.add_payload(b"py")
assert bytes(a) == b"scapy"
a=Raw("sca")
a.add_payload(bytearray(b"py"))
assert bytes(a) == b"scapy"

= Checking overloads
~ basic IP TCP Ether
a=Ether()/IP()/TCP()
r = a.proto
r
r == 6


= sprintf() function
~ basic sprintf Ether IP UDP NTP
a=Ether()/IP()/IP(ttl=4)/UDP()/NTP()
r = a.sprintf("%type% %IP.ttl% %#05xr,UDP.sport% %IP:2.ttl%")
r
r in ['0x800 64 0x07b 4', 'IPv4 64 0x07b 4']


= sprintf() function 
~ basic sprintf IP TCP SNAP LLC Dot11
* This test is on the conditional substring feature of <tt>sprintf()</tt>
a=Dot11()/LLC()/SNAP()/IP()/TCP()
r = a.sprintf("{IP:{TCP:flags=%TCP.flags%}{UDP:port=%UDP.ports%} %IP.src%}")
r
r == 'flags=S 127.0.0.1'


= haslayer function
~ basic haslayer IP TCP ICMP ISAKMP
x=IP(id=1)/ISAKMP_payload_SA(prop=ISAKMP_payload_SA(prop=IP()/ICMP()))/TCP()
r = (TCP in x, ICMP in x, IP in x, UDP in x)
r
r == (True,True,True,False)

= getlayer function
~ basic getlayer IP ISAKMP UDP
x=IP(id=1)/ISAKMP_payload_SA(prop=IP(id=2)/UDP(dport=1))/IP(id=3)/UDP(dport=2)
x[IP]
x[IP:2]
x[IP:3]
x.getlayer(IP,3)
x.getlayer(IP,4)
x[UDP]
x[UDP:1]
x[UDP:2]
assert(x[IP].id == 1 and x[IP:2].id == 2 and x[IP:3].id == 3 and 
       x.getlayer(IP).id == 1 and x.getlayer(IP,3).id == 3 and
       x.getlayer(IP,4) == None and
       x[UDP].dport == 1 and x[UDP:2].dport == 2)
try:
    x[IP:4]
except IndexError:
    True
else:
    False

= getlayer / haslayer with name
~ basic getlayer IP ICMP IPerror TCPerror
x = IP() / ICMP() / IPerror()
assert x.getlayer(ICMP) is not None
assert x.getlayer(IPerror) is not None
assert x.getlayer("IP in ICMP") is not None
assert x.getlayer(TCPerror) is None
assert x.getlayer("TCP in ICMP") is None
assert x.haslayer(ICMP)
assert x.haslayer(IPerror)
assert x.haslayer("IP in ICMP")
assert not x.haslayer(TCPerror)
assert not x.haslayer("TCP in ICMP")

= getlayer with a filter
~ getlayer IP
pkt = IP() / IP(ttl=3) / IP()
assert pkt[IP::{"ttl":3}].ttl == 3
assert pkt.getlayer(IP, ttl=3).ttl == 3
assert IPv6ExtHdrHopByHop(options=[HBHOptUnknown()]).getlayer(HBHOptUnknown, otype=42) is None

= specific haslayer and getlayer implementations for EAP
~ haslayer getlayer EAP
pkt = Ether() / EAPOL() / EAP_MD5()
assert EAP in pkt
assert pkt.haslayer(EAP)
assert isinstance(pkt[EAP], EAP_MD5)
assert isinstance(pkt.getlayer(EAP), EAP_MD5)

= specific haslayer and getlayer implementations for RadiusAttribute
~ haslayer getlayer RadiusAttribute
pkt = RadiusAttr_EAP_Message()
assert RadiusAttribute in pkt
assert pkt.haslayer(RadiusAttribute)
assert isinstance(pkt[RadiusAttribute], RadiusAttr_EAP_Message)
assert isinstance(pkt.getlayer(RadiusAttribute), RadiusAttr_EAP_Message)


= equality
~ basic
w=Ether()/IP()/UDP(dport=53)
x=Ether()/IP(version=4)/UDP()
y=Ether()/IP()/UDP(dport=4)
z=Ether()/IP()/UDP()/NTP()
t=Ether()/IP()/TCP()
assert x != y and x != z and x != t and y != z and y != t and z != t and w == x

= answers
~ basic
a1, a2 = "1.2.3.4", "5.6.7.8"
p1 = IP(src=a1, dst=a2)/ICMP(type=8)
p2 = IP(src=a2, dst=a1)/ICMP(type=0)
assert p1.hashret() == p2.hashret()
assert not p1.answers(p2)
assert p2.answers(p1)
assert p1 > p2
assert p2 < p1
assert p1 == p1
conf_back = conf.checkIPinIP
conf.checkIPinIP = True
px = [IP()/p1, IPv6()/p1]
assert not any(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert not any(p2.answers(p) for p in px)
conf.checkIPinIP = False
assert all(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert all(p2.answers(p) for p in px)
conf.checkIPinIP = conf_back

= answers - Net
~ netaccess

a1, a2 = Net("www.google.com"), Net("www.secdev.org")
prt1, prt2 = 12345, 54321
s1, s2 = 2767216324, 3845532842
p1 = IP(src=a1, dst=a2)/TCP(flags='SA', seq=s1, ack=s2, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='R', seq=s2, ack=0, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
# Not available yet because of IPv6
# a1, a2 = Net6("www.google.com"), Net6("www.secdev.org")
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='RA', seq=0, ack=s1+1, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+1, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+10, sport=prt2, dport=prt1)
assert not p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1+9, ack=s2+10, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert not p1.answers(p2)

= conf.checkIPsrc

conf_checkIPsrc = conf.checkIPsrc
conf.checkIPsrc = 0
query = IP(id=42676, src='10.128.0.7', dst='192.168.0.1')/ICMP(id=26)
answer = IP(src='192.168.48.19', dst='10.128.0.7')/ICMP(type=11)/IPerror(id=42676, src='192.168.51.23', dst='192.168.0.1')/ICMPerror(id=26)
assert answer.answers(query)
conf.checkIPsrc = conf_checkIPsrc


############
############
+ command() / json() tests
~ command

= Test command() with normal packet

pkt = IP(dst="127.0.0.1", src="127.0.0.1") / UDP(dport=12345, sport=654)
assert pkt.command() == "IP(src='127.0.0.1', dst='127.0.0.1')/UDP(sport=654, dport=12345)"

= Test json() with normal packet

assert pkt.json() == '{"version": 4, "ihl": null, "tos": 0, "len": null, "id": 1, "flags": 0, "frag": 0, "ttl": 64, "proto": 17, "chksum": null, "src": "127.0.0.1", "dst": "127.0.0.1", "payload": {"sport": 654, "dport": 12345, "len": null, "chksum": null}}'

= Test command() with nested packet

pkt = DNS(qd=[DNSQR(qtype="A", qname="google.com")])
assert pkt.command() == "DNS(qd=[DNSQR(qname=b'google.com.', qtype=1)])"

= Test json() with nested packet

assert pkt.json() == '{"length": null, "id": 0, "qr": 0, "opcode": 0, "aa": 0, "tc": 0, "rd": 1, "ra": 0, "z": 0, "ad": 0, "cd": 0, "rcode": 0, "qdcount": null, "ancount": null, "nscount": null, "arcount": null, "qd": [{"qname": "google.com.", "qtype": 1, "unicastresponse": 0, "qclass": 1}]}'

= Test command() with ASN.1 packet

pkt = KRB_AP_REP(bytes(KRB_AP_REP(encPart=EncryptedData())))
assert pkt.command() == "KRB_AP_REP(pvno=ASN1_INTEGER(5), msgType=ASN1_INTEGER(15), encPart=EncryptedData(etype=ASN1_INTEGER(23), kvno=None, cipher=ASN1_STRING(b'')))"

= Test json(à with ASN.1 packet

assert pkt.json() == '{"pvno": {"type": "ASN1_INTEGER", "value": "5"}, "msgType": {"type": "ASN1_INTEGER", "value": "15"}, "encPart": {"etype": {"type": "ASN1_INTEGER", "value": "23"}, "kvno": null, "cipher": {"type": "ASN1_STRING", "value": ""}}}'

= Test command() with meaningless payload

pkt = PPTPStartControlConnectionReply() / IP(dst="127.0.0.1", src="127.0.0.1")
assert pkt.command() == "PPTPStartControlConnectionReply()/IP(src='127.0.0.1', dst='127.0.0.1')"

= Test json() with meaningless payload

assert pkt.json() == '{"len": 156, "type": 1, "magic_cookie": 439041101, "ctrl_msg_type": 2, "reserved_0": 0, "protocol_version": 256, "result_code": 1, "error_code": 0, "framing_capabilities": 0, "bearer_capabilities": 0, "maximum_channels": 65535, "firmware_revision": 256, "host_name": "linux", "vendor_string": "", "payload": {"version": 4, "ihl": null, "tos": 0, "len": null, "id": 1, "flags": 0, "frag": 0, "ttl": 64, "proto": 0, "chksum": null, "src": "127.0.0.1", "dst": "127.0.0.1"}}'


############
############
+ Tests on padding

= Padding assembly
r = raw(Padding("abc"))
r
assert r == b"abc" 
r = raw(Padding("abc")/Padding("def"))
r
assert r == b"abcdef" 
r = raw(Raw("ABC")/Padding("abc")/Padding("def"))
r
assert r == b"ABCabcdef" 
r = raw(Raw("ABC")/Padding("abc")/Raw("DEF")/Padding("def"))
r
assert r == b"ABCDEFabcdef"

= Padding and length computation
p = IP(raw(IP()/Padding("abc")))
p
assert p.len == 20 and len(p) == 23
p = IP(raw(IP()/Raw("ABC")/Padding("abc")))
p
assert p.len == 23 and len(p) == 26
p = IP(raw(IP()/Raw("ABC")/Padding("abc")/Padding("def")))
p
assert p.len == 23 and len(p) == 29

= PadField test
~ PadField padding

class TestPad(Packet):
    fields_desc = [ PadField(StrNullField("st", b""), 6, padwith=b"\xff"), StrField("id", b"")]

assert TestPad() == TestPad(raw(TestPad()))
assert raw(TestPad(st=b"st", id=b"id")) == b'st\x00\xff\xff\xffid'

= ReversePadField
~ PadField padding

class TestReversePad(Packet):
    fields_desc = [ ByteField("a", 0),
                    ReversePadField(IntField("b", 0), 4)]

assert raw(TestReversePad(a=1, b=0xffffffff)) == b'\x01\x00\x00\x00\xff\xff\xff\xff'
assert TestReversePad(raw(TestReversePad(a=1, b=0xffffffff))).b == 0xffffffff

############
############
+ Tests on default value changes mechanism

= Creation of an IPv3 class from IP class with different default values
class IPv3(IP):
    version = 3
    ttl = 32

= Test of IPv3 class
a = IPv3()
v,t = a.version, a.ttl
v,t
assert (v,t) == (3,32)
r = raw(a)
r
assert r == b'5\x00\x00\x14\x00\x01\x00\x00 \x00\xac\xe7\x7f\x00\x00\x01\x7f\x00\x00\x01'

############
############
+ ASN.1 tests

= ASN1 - ASN1_Object
assert ASN1_Object(1) == ASN1_Object(1)
assert ASN1_Object(1) > ASN1_Object(0)
assert ASN1_Object(1) >= ASN1_Object(1)
assert ASN1_Object(0) < ASN1_Object(1)
assert ASN1_Object(1) <= ASN1_Object(2)
assert ASN1_Object(1) != ASN1_Object(2)
ASN1_Object(2).show()

= ASN1 - RandASN1Object
a = RandASN1Object()
random.seed(0x2807)
o = bytes(a)
o
assert o in [
    b'\x1e\x023V',  # PyPy 2.7
    b'A\x02\x07q',  # Python 2.7
    b'F\x02\xfe\x92',   # python 3.7-3.9
]

= ASN1 - ASN1_BIT_STRING
a = ASN1_BIT_STRING("test", readable=True)
a
assert a.val == '01110100011001010111001101110100'
assert raw(a) == b'\x03\x05\x00test'

a = ASN1_BIT_STRING(b"\xff"*16, readable=True)
a
assert a.val == "1" * 128
assert raw(a) == b'\x03\x11\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'

= ASN1 - ASN1_SEQUENCE
a = ASN1_SEQUENCE([ASN1_Object(1), ASN1_Object(0)])
assert a.strshow() == '# ASN1_SEQUENCE:\n  <ASN1_Object[1]>\n  <ASN1_Object[0]>\n'

= ASN1 - ASN1_DECODING_ERROR
a = ASN1_DECODING_ERROR("error", exc=OSError(1))
assert repr(a) == "<ASN1_DECODING_ERROR['error']{{1}}>"
b = ASN1_DECODING_ERROR("error", exc=OSError(ASN1_BIT_STRING("0")))
assert repr(b) in ["<ASN1_DECODING_ERROR['error']{{<ASN1_BIT_STRING[0]=b'\\x00' (7 unused bits)>}}>",
                   "<ASN1_DECODING_ERROR['error']{{<ASN1_BIT_STRING[0]='\\x00' (7 unused bits)>}}>"]

= ASN1 - ASN1_INTEGER
a = ASN1_INTEGER(int("1"*23))
assert repr(a) in ["0x25a55a46e5da99c71c7 <ASN1_INTEGER[1111111111...1111111111]>",
                   "0x25a55a46e5da99c71c7 <ASN1_INTEGER[1111111111...111111111L]>"]

= ASN1 - ASN1_OID
assert raw(ASN1_OID("")) == b"\x06\x00"

= RandASN1Object(), specific crashes

import random

# ASN1F_NUMERIC_STRING
random.seed(1514315682)
raw(RandASN1Object())

# ASN1F_VIDEOTEX_STRING
random.seed(1240186058)
raw(RandASN1Object())

# ASN1F_UTC_TIME & ASN1F_GENERALIZED_TIME
random.seed(1873503288)
raw(RandASN1Object())

= SSID is parsed properly even with the presence of RSN Information
~ SSID RSN Information
# A regression test for https://github.com/secdev/scapy/pull/2685.
# https://github.com/secdev/scapy/issues/2683 describes a packet with
# RSN Information that isn't parsed properly,
# causing the SSID to be overridden.
# This test checks the SSID is parsed properly.
filename = scapy_path("/test/pcaps/bad_rsn_parsing_overrides_ssid.pcap")
frame = rdpcap(filename)[0]
beacon = frame.getlayer(5)
ssid = beacon.network_stats()['ssid']
assert ssid == "ROUTE-821E295"

= SSID is parsed properly even when the Country Information Tag Element has an odd length (not complying with the standard) and a missing pad byte
~ Missing Pad Byte in Country Info
# A regression test for https://github.com/secdev/scapy/pull/2685.
# https://github.com/secdev/scapy/issues/4132 describes a packet with
# a Country Information element tag that has an odd length, even though it's against the standard.
# The transmitter should have added a padding byte to make the length even, but it didn't.
# The effect on scapy used to be improper parsing of the next tag elements, causing the SSID to be overridden.
# This test checks the SSID is parsed properly.
from io import BytesIO
pcapfile = BytesIO(b'\n\r\r\n\x80\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x03\x00\x10\x00Linux 6.1.21-v8+\x04\x00E\x00Dumpcap (Wireshark) 3.4.10 (Git v3.4.10 packaged as 3.4.10-0+deb11u1)\x00\x00\x00\x00\x00\x00\x00\x80\x00\x00\x00\x01\x00\x00\x00@\x00\x00\x00\x7f\x00\x00\x00\x00\x04\x00\x00\x02\x00\x05\x00wifi2\x00\x00\x00\t\x00\x01\x00\t\x00\x00\x00\x0c\x00\x10\x00Linux 6.1.21-v8+\x00\x00\x00\x00@\x00\x00\x00\x06\x00\x00\x00\xb0\x01\x00\x00\x00\x00\x00\x00c\xd3\x87\x17\xe3c5\x82\x90\x01\x00\x00\x90\x01\x00\x00\x00\x00 \x00\xae@\x00\xa0 \x08\x00\xa0 \x08\x00\x00\x10\x0cd\x14@\x01\xa9\x00\x0c\x00\x00\x00\xa6\x00\xa8\x01\x80\x00\x00\x00\xff\xff\xff\xff\xff\xff\x02\xbf\xaf\x9f\xf8\x07\x02\xbf\xaf\x9f\xf8\x070\x96[p\xdcM\x06\x00\x00\x00d\x00\x11\x00\x00\x00\x01\x08\x8c\x12\x98$\xb0H`l\x03\x01,\x05\x04\x00\x01\x00\x00\x07QUS \x01\r\x80$\x01\x80(\x01\x80,\x01\x800\x01\x804\x01\x808\x01\x80<\x01\x80@\x01\x80d\x01\x80h\x01\x80l\x01\x80p\x01\x80t\x01\x80x\x01\x80|\x01\x80\x80\x01\x80\x84\x01\x80\x88\x01\x80\x8c\x01\x80\x90\x01\x80\x95\x01\x80\x99\x01\x80\x9d\x01\x80\xa1\x01\x80\xa5\x01\x800\x14\x01\x00\x00\x0f\xac\x04\x01\x00\x00\x0f\xac\x04\x01\x00\x00\x0f\xac\x02\x0c\x00;\x02s\x00-\x1a,\t\x13\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00,\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00=\x16,\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xdd\x18\x00P\xf2\x02\x01\x01\x81\x00\x03\xa4\x00\x00\'\xa4\x00\x00BC]\x00a\x11.\x00\xdd;\x00P\xf2\x04\x10J\x00\x01\x10\x10D\x00\x01\x02\x10I\x00\x06\x007*\x00\x01 \x10\x11\x00\x1358" Hisense Roku TV\x10T\x00\x08\x00\x07\x00P\xf2\x04\x00\x01\xdd\x16\xc8:k\x01\x01\x1048<@dhlptx|\x80\x84\x88\x8c\x90\xdd\x12Po\x9a\t\x02\x02\x00!\x0b\x03\x06\x00\x02\xbf\xaf\x9f\xf8\x07\xdd\rPo\x9a\n\x00\x00\x06\x01\x11\x1cD\x002\xf5N\xfbh\xb0\x01\x00\x00')
pktpcap = rdpcap(pcapfile)
frame = pktpcap[0]
beacon = frame.getlayer(4)
stats = beacon.network_stats()
ssid = stats['ssid']
assert ssid == ""
country = stats['country']
assert country == 'US'

############
############
+ Network tests

* Those tests need network access

= Sending and receiving an ICMP
~ netaccess needs_root IP ICMP icmp_firewall
def _test():
    with no_debug_dissector():
    	x = sr1(IP(dst="www.google.com")/ICMP(),timeout=3)
    assert x is not None
    x
    assert x[IP].ottl() in [32, 64, 128, 255]
    assert 0 <= x[IP].hops() <= 126
    assert ICMP in x and x[ICMP].type == 0

retry_test(_test)

= Sending a TCP syn message at layer 2 and layer 3
~ netaccess needs_root IP
def _test():
    tmp = send(IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), return_packets=True, realtime=True)
    assert len(tmp) == 1
    
    tmp = sendp(Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), return_packets=True, realtime=True)
    assert len(tmp) == 1
    
    p = Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S")
    from decimal import Decimal
    p.time = Decimal(p.time)
    tmp = sendp(p, return_packets=True, realtime=True)
    assert len(tmp) == 1

retry_test(_test)

= Latency check: localhost ICMP
~ netaccess needs_root linux latency

# Note: still needs to enforce L3RawSocket as this won't work otherwise with libpcap
sock = conf.L3socket
conf.L3socket = L3RawSocket

def _test():
    req = IP(dst="127.0.0.1")/ICMP()
    ans = sr1(req, timeout=3)
    assert (ans.time - req.sent_time) >= 0
    assert (ans.time - req.sent_time) <= 1e-3

try:
    retry_test(_test)
finally:
    conf.L3socket = sock

= Test sniffing on multiple sockets
~ netaccess needs_root sniff

# This test sniffs on the same interface twice at the same time, to
# simulate sniffing on multiple interfaces.

def _test():
	iface = conf.route.route(str(Net("www.google.com")))[0]
	port = int(RandShort())
	pkt = IP(dst="www.google.com")/TCP(sport=port, dport=80, flags="S")
	def cb():
	    sr1(pkt, timeout=3)
	sniffer = AsyncSniffer(started_callback=cb,
			       iface=[iface, iface],
			       lfilter=lambda x: TCP in x and x[TCP].dport == port,
			       prn=lambda x: x.summary(),
			       count=2)
	sniffer.start()
	sniffer.join(timeout=3)
	assert len(sniffer.results) == 2
	for pkt in sniffer.results:
	    assert pkt.sniffed_on == iface

retry_test(_test)

= Test sniffing with AsyncSniffer on failed

try:
    sniffer = AsyncSniffer(iface="this_interface_does_not_exists")
    sniffer.start()
    sniffer.join()
    assert False, "Should have errored by now"
except ValueError:
    assert True

= Sending a TCP syn 'forever' at layer 2 and layer 3
~ netaccess needs_root IP
def _test():
    tmp = srloop(IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), count=1, timeout=3)
    assert type(tmp) == tuple and len(tmp[0]) == 1
    
    tmp = srploop(Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), count=1, timeout=3)
    assert type(tmp) == tuple and len(tmp[0]) == 1

retry_test(_test)

= Sending and receiving an TCP syn with flooding methods
~ netaccess needs_root IP flood
from functools import partial
# flooding methods do not support timeout. Packing the test for security
def _test_flood(ip, flood_function, add_ether=False):
    with no_debug_dissector():
        p = IP(dst=ip)/TCP(sport=RandShort(), dport=80, flags="S")
        if add_ether:
            p = Ether()/p
        p.show2()
        x = flood_function(p, timeout=0.5, maxretries=10)
    if type(x) == tuple:
        x = x[0][0][1]
    x
    assert x[IP].ottl() in [32, 64, 128, 255]
    assert 0 <= x[IP].hops() <= 126

_test_srflood = partial(_test_flood, "www.google.com", srflood)
retry_test(_test_srflood)

_test_sr1flood = partial(_test_flood, "www.google.fr", sr1flood)
retry_test(_test_sr1flood)

_test_srpflood = partial(_test_flood, "www.google.net", srpflood, True)
retry_test(_test_srpflood)

_test_srp1flood = partial(_test_flood, "www.google.co.uk", srp1flood, True)
retry_test(_test_srp1flood)

= test chainEX
~ netaccess

import socket
sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ssck = StreamSocket(sck)

try:
    r = ssck.sr1(ICMP(type='echo-request'), timeout=0.1, chainEX=True, threaded=False)
    assert False
except Exception:
    assert True
finally:
    sck.close()

= Sending and receiving an ICMPv6EchoRequest
~ netaccess ipv6
def _test():
    with no_debug_dissector():
        x = sr1(IPv6(dst="www.google.com")/ICMPv6EchoRequest(),timeout=3)
    x
    assert x[IPv6].ottl() in [32, 64, 128, 255]
    assert 0 <= x[IPv6].hops() <= 126
    x is not None and ICMPv6EchoReply in x and x[ICMPv6EchoReply].type == 129

retry_test(_test)

= Whois request
~ netaccess IP as_resolvers
* This test retries on failure because it often fails
def _test():
    IP(src="8.8.8.8").whois()

retry_test(_test)

= AS resolvers
~ netaccess IP as_resolvers
* This test retries on failure because it often fails

def _test():
    ret = conf.AS_resolver.resolve("8.8.8.8", "8.8.4.4")
    assert (len(ret) == 2)
    assert any(x[1] == "AS15169" for x in ret)

retry_test(_test)

riswhois_data = b"route:      8.8.8.0/24\ndescr:      Google\norigin:     AS15169\nnotify:     radb-contact@google.com\nmnt-by:     MAINT-AS15169\nchanged:    radb-contact@google.com 20150728\nsource:     RADB\n\nroute:         8.0.0.0/9\ndescr:         Proxy-registered route object\norigin:        AS3356\nremarks:       auto-generated route object\nremarks:       this next line gives the robot something to recognize\nremarks:       L'enfer, c'est les autres\nremarks:       \nremarks:       This route object is for a Level 3 customer route\nremarks:       which is being exported under this origin AS.\nremarks:       \nremarks:       This route object was created because no existing\nremarks:       route object with the same origin was found, and\nremarks:       since some Level 3 peers filter based on these objects\nremarks:       this route may be rejected if this object is not created.\nremarks:       \nremarks:       Please contact routing@Level3.net if you have any\nremarks:       questions regarding this object.\nmnt-by:        LEVEL3-MNT\nchanged:       roy@Level3.net 20060203\nsource:        LEVEL3\n\n\n"

ret = AS_resolver_riswhois()._parse_whois(riswhois_data)
assert ret == ('AS15169', 'Google')

retry_test(_test)

# This test is too buggy, and is simulated below
#def _test():
#    ret = AS_resolver_cymru().resolve("8.8.8.8")
#    assert (len(ret) == 1)
#    all(x[1] == "AS15169" for x in ret)
#
#retry_test(_test)

cymru_bulk_data = """
Bulk mode; whois.cymru.com [2017-10-03 08:38:08 +0000]
24776   | 217.25.178.5     | INFOCLIP-AS, FR
36459   | 192.30.253.112   | GITHUB - GitHub, Inc., US
26496   | 68.178.213.61    | AS-26496-GO-DADDY-COM-LLC - GoDaddy.com, LLC, US
"""
tmp = AS_resolver_cymru().parse(cymru_bulk_data)
assert len(tmp) == 3
assert [l[1] for l in tmp] == ['AS24776', 'AS36459', 'AS26496']

= AS resolver - IPv6
~ netaccess IP as_resolvers
* This test retries on failure because it often fails

def _test():
    as_resolver6 = AS_resolver6()
    ret = as_resolver6.resolve("2001:4860:4860::8888", "2001:4860:4860::4444")
    assert (len(ret) == 2)
    assert any(x[1] == 15169 for x in ret)

retry_test(_test)

= AS resolver - socket error
~ IP
* This test checks that a failing resolver will not crash a script

class MockAS_resolver(object):
  def resolve(self, *ips):
    raise socket.error

asrm = AS_resolver_multi(MockAS_resolver())
assert len(asrm.resolve(["8.8.8.8", "8.8.4.4"])) == 0

= sendpfast
~ tcpreplay

old_interactive = conf.interactive
conf.interactive = False
try:
    sendpfast([])
    assert False
except Exception:
    assert True

conf.interactive = old_interactive
assert True

############
############
+ Generator tests

= Implicit logic 1
~ IP TCP
a = Ether() / IP(ttl=(5, 10)) / TCP(dport=[80, 443])
ls(a)
ls(a, verbose=True)
l = [p for p in a]
len(l) == 12

= Implicit logic 2
~ IP
a = IP(ttl=[1,2,(5,9)])
ls(a)
ls(a, verbose=True)
l = [p for p in a]
len(l) == 7

= Implicit logic 3
# In case there's a single option: __iter__ should not return self
a = Ether()/IP(src="127.0.0.1", dst="127.0.0.1")/ICMP()
for i in a:
    i.sent_time = 1

assert a.sent_time is None

# In case they are several, self should never be returned
a = Ether()/IP(src="127.0.0.1", dst="127.0.0.1")/ICMP(seq=(0, 5))
for i in a:
    i.sent_time = 1

assert a.sent_time is None


############
############
+ Real usages

= Port scan
~ netaccess needs_root IP TCP
def _test():
    with no_debug_dissector():
        ans,unans=sr(IP(dst="www.google.com/30")/TCP(dport=[80,443]), timeout=2)
    
    # New format: all Python versions
    ans.make_table(lambda s, r: (s.dst, s.dport, r.sprintf("{TCP:%TCP.flags%}{ICMP:%ICMP.code%}")))

retry_test(_test)

= Send & receive with debug_match
~ netaccess needs_root IP ICMP
def _test():
    old_debug_match = conf.debug_match
    conf.debug_match = True
    with no_debug_dissector():
        ans, unans = sr(IP(dst="www.google.fr") / TCP(sport=RandShort(), dport=80, flags="S"), timeout=2)
        assert ans[0].query == ans[0][0]
        assert ans[0].answer == ans[0][1]
    conf.debug_match = old_debug_match
    assert ans and not unans

retry_test(_test)

= Send & receive with retry
~ netaccess needs_root IP ICMP
def _test():
    with no_debug_dissector():
        ans, unans = sr(IP(dst=["8.8.8.8", "1.2.3.4"]) / TCP(sport=RandShort(), dport=53, flags="S"), timeout=2, retry=1)
    assert len(ans) == 1 and len(unans) == 1

retry_test(_test)

= Send & receive with multi
~ netaccess needs_root IP ICMP
def _test():
    with no_debug_dissector():
        ans, unans = sr(IP(dst=["8.8.8.8", "1.2.3.4"]) / TCP(sport=RandShort(), dport=53, flags="S"), timeout=2, multi=1)
    assert len(ans) >= 1 and len(unans) == 1

retry_test(_test)

= Traceroute function
~ netaccess needs_root tcpdump
* Let's test traceroute
def _test():
    with no_debug_dissector():
        ans, unans = traceroute("www.slashdot.org")
    ans.nsummary()
    s,r=ans[0]
    s.show()
    s.show(2)

retry_test(_test)

= send() and sniff()
~ netaccess needs_root

def _test():
    sendp(Ether()/IP(src="9.0.0.0")/UDP(), count=3, iface=conf.iface)

r = sniff(timeout=3, count=1,
          lfilter=lambda x: IP in x and x[IP].src == "9.0.0.0",
          iface=conf.iface,
          started_callback=_test)

assert r

= sniff() with socket failure
* GH issue 3631

REFPACKET = Ether()/IP()/UDP()

# A socket that fails after 10 packets
class OOPipe(ObjectPipe):
     def recv(self, x=MTU):
         self.i = getattr(self, "i", 0) + 1
         if self.i == 11:
             self.close()
             raise OSError("Giant failure")
         pkt = super(OOPipe, self).recv(x)
         self.send(REFPACKET)
         return pkt

o = OOPipe()
o.send(REFPACKET)

pkts = sniff(opened_socket=[o], timeout=3)
assert len(pkts) == 10

= GH issue 3306
~ netaccess needs_root

send(fuzz(ARP()))

= Test SuperSocket.select
~ select

from unittest import mock

@mock.patch("scapy.supersocket.select")
def _test_select(select):
    def f(a, b, c, d):
        raise IOError(0)
    select.side_effect = f
    try:
        SuperSocket.select([])
        return False
    except:
        return True

assert _test_select()

= Test L2ListenTcpdump socket
~ netaccess

# Needs to be fixed. Fails randomly
#import time
#for i in range(10):
#    read_s = L2ListenTcpdump(iface=conf.iface)
#    out_s = conf.L2socket(iface=conf.iface)
#    time.sleep(5)  # wait for read_s to be ready
#    icmp_r = Ether()/IP(dst="secdev.org")/ICMP()
#    res = sndrcv(out_s, icmp_r, timeout=5, rcv_pks=read_s)[0]
#    read_s.close()
#    out_s.close()
#    time.sleep(5)
#    if res:
#        break
#
#response = res[0][1]
#assert response[ICMP].type == 0

True

= Test set of sent_time by sr
~ netaccess needs_root IP ICMP
def _test():
    packet = IP(dst="8.8.8.8")/ICMP()
    r = sr(packet, timeout=2)
    assert packet.sent_time is not None

retry_test(_test)

= Test set of sent_time by sr (multiple packets)
~ netaccess needs_root IP ICMP
def _test():
    packet1 = IP(dst="8.8.8.8")/ICMP()
    packet2 = IP(dst="8.8.4.4")/ICMP()
    r = sr([packet1, packet2], timeout=2)
    assert packet1.sent_time is not None
    assert packet2.sent_time is not None

retry_test(_test)

= Test set of sent_time by srflood
~ netaccess needs_root IP ICMP
def _test():
    packet = IP(dst="8.8.8.8")/ICMP()
    r = srflood(packet, timeout=0.5)
    assert packet.sent_time is not None

retry_test(_test)

= Test set of sent_time by srflood (multiple packets)
~ netaccess needs_root IP ICMP
def _test():
    packet1 = IP(dst="8.8.8.8")/ICMP()
    packet2 = IP(dst="8.8.4.4")/ICMP()
    r = srflood([packet1, packet2], timeout=0.5)
    assert packet1.sent_time is not None
    assert packet2.sent_time is not None

retry_test(_test)

############
############
+ ManuFDB tests

= __repr__

if conf.manufdb:
    len(conf.manufdb)
else:
    True

= check _resolve_MAC

if conf.manufdb:
    assert conf.manufdb._resolve_MAC("00:00:17") == "Oracle"
else:
    True


############
############
+ Ether tests with IPv6

= Ether IPv6 checking for dst
~ netaccess ipv6

p = Ether()/IPv6(dst="www.google.com")/TCP()
assert p.dst != p[IPv6].dst
p.show()


############
############
+ pcap / pcapng format support
~ pcap

= Variable creations
from io import BytesIO
pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacVo*\n\x00(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV_-\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\xf90\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00')
pcapngfile = BytesIO(b'\n\r\r\n\\\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00,\x00File created by merging: \nFile1: test.pcap \n\x04\x00\x08\x00mergecap\x00\x00\x00\x00\\\x00\x00\x00\x01\x00\x00\x00\\\x00\x00\x00e\x00\x00\x00\xff\xff\x00\x00\x02\x006\x00Unknown/not available in original file format(libpcap)\x00\x00\t\x00\x01\x00\x06\x00\x00\x00\x00\x00\x00\x00\\\x00\x00\x00\x06\x00\x00\x00H\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00/\xfc[\xcd(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00H\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00\x1f\xff[\xcd\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r<\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00\xb9\x02\\\xcd\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00<\x00\x00\x00')
pcapnanofile = BytesIO(b"M<\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacV\xc9\xc1\xb5'(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV-;\xc1'\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\x9aL\xcf'\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00")
pcapwirelenfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00}\x87pZ.\xa2\x08\x00\x0f\x00\x00\x00\x10\x00\x00\x00\xff\xff\xff\xff\xff\xff GG\xee\xdd\xa8\x90\x00a')
pcapngdefaults = BytesIO(base64.b64decode(b'Cg0NChwAAABNPCsaAQAAAP//////////HAAAAAEAAAAgAAAAEgEAAP//AAAJAAEACUeZiQAAAAAgAAAAAQAAACAAAAASAQAA//8AAAkAAQAJAAAAAAAAACAAAAABAAAAIAAAABIBAAD//wAACQABAAkAAAAAAAAAIAAAAAEAAAAgAAAAEgEAAP//AAAJAAEACQAAAAAAAAAgAAAABgAAAIQBAAADAAAApO/bFdgJaeBiAQAAYgEAAFVVVVVVVVXV////////IMbr4D7PCABFAAFIlQkAAEAR5JwAAAAA/////wBEAEMBNJDsAQEGAFSpVwIACoAAAAAAAAAAAAAAAAAAAAAAACDG6+A+zwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABjglNjNQEB/wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAsOs+bAAAhAEAAAYAAACAAQAAAwAAAKTv2xXIDYznYAEAAGABAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABRgGPAAAEEal3qf5wqO////rhbgdsATJi0U5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGDQpOVFM6IHNzZHA6YWxpdmUNClNFUlZFUjogRnJlZUJTRC84LjAgVVBuUC8xLjAgUGFuYXNvbmljLU1JTC1ETE5BLVNWLzEuMA0KVVNOOiB1dWlkOjRENDU0OTMwLTAyMDAtMTAwMC04MDAxLTIwQzZFQkUwM0VDRg0KDQpcQcvWgAEAAAYAAAC4AQAAAwAAAKTv2xV4Ao3nlQEAAJUBAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABewGQAAAEEalBqf5wqO////rhbgdsAWfu+k5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHVybjpwYW5hc29uaWMtY29tOmRldmljZTpwMDBSZW1vdGVDb250cm9sbGVyOjENCk5UUzogc3NkcDphbGl2ZQ0KU0VSVkVSOiBGcmVlQlNELzguMCBVUG5QLzEuMCBQYW5hc29uaWMtTUlMLURMTkEtU1YvMS4wDQpVU046IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGOjp1cm46cGFuYXNvbmljLWNvbTpkZXZpY2U6cDAwUmVtb3RlQ29udHJvbGxlcjoxDQoNCrLVKmoAAAC4AQAABgAAAHgBAAADAAAApO/bFVjbjedXAQAAVwEAAFVVVVVVVVXVAQBef//6IMbr4D7PCABFAAE9AZEAAAQRqX6p/nCo7///+uFuB2wBKaZATk9USUZZICogSFRUUC8xLjENCkhPU1Q6IDIzOS4yNTUuMjU1LjI1MDoxOTAwDQpDQUNIRS1DT05UUk9MOiBtYXgtYWdlPTE4MDANCkxPQ0FUSU9OOiBodHRwOi8vMTY5LjI1NC4xMTIuMTY4OjU1MDAwL25yYy9kZGQueG1sDQpOVDogdXBucDpyb290ZGV2aWNlDQpOVFM6IHNzZHA6YWxpdmUNClNFUlZFUjogRnJlZUJTRC84LjAgVVBuUC8xLjAgUGFuYXNvbmljLU1JTC1ETE5BLVNWLzEuMA0KVVNOOiB1dWlkOjRENDU0OTMwLTAyMDAtMTAwMC04MDAxLTIwQzZFQkUwM0VDRjo6dXBucDpyb290ZGV2aWNlDQoNCjagXoUAeAEAAAYAAAC0AQAAAwAAAKTv2xXYw47nkwEAAJMBAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABeQGSAAAEEalBqf5wqO////rhbgdsAWWV4E5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHVybjpwYW5hc29uaWMtY29tOnNlcnZpY2U6cDAwTmV0d29ya0NvbnRyb2w6MQ0KTlRTOiBzc2RwOmFsaXZlDQpTRVJWRVI6IEZyZWVCU0QvOC4wIFVQblAvMS4wIFBhbmFzb25pYy1NSUwtRExOQS1TVi8xLjANClVTTjogdXVpZDo0RDQ1NDkzMC0wMjAwLTEwMDAtODAwMS0yMEM2RUJFMDNFQ0Y6OnVybjpwYW5hc29uaWMtY29tOnNlcnZpY2U6cDAwTmV0d29ya0NvbnRyb2w6MQ0KDQovXKFrALQBAAAGAAAAqAEAAAMAAACk79sVuJKP54cBAACHAQAAVVVVVVVVVdUBAF5///ogxuvgPs8IAEUAAW0BkwAABBGpTKn+cKjv///64W4HbAFZRNJOT1RJRlkgKiBIVFRQLzEuMQ0KSE9TVDogMjM5LjI1NS4yNTUuMjUwOjE5MDANCkNBQ0hFLUNPTlRST0w6IG1heC1hZ2U9MTgwMA0KTE9DQVRJT046IGh0dHA6Ly8xNjkuMjU0LjExMi4xNjg6NTUwMDAvbnJjL2RkZC54bWwNCk5UOiB1cm46ZGlhbC1tdWx0aXNjcmVlbi1vcmc6c2VydmljZTpkaWFsOjENCk5UUzogc3NkcDphbGl2ZQ0KU0VSVkVSOiBGcmVlQlNELzguMCBVUG5QLzEuMCBQYW5hc29uaWMtTUlMLURMTkEtU1YvMS4wDQpVU046IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGOjp1cm46ZGlhbC1tdWx0aXNjcmVlbi1vcmc6c2VydmljZTpkaWFsOjENCg0KLn5A6QCoAQAA'))

= Read a pcap file
pktpcap = rdpcap(pcapfile)

= Read a pcapng file
pktpcapng = rdpcap(pcapngfile)
assert pktpcapng[0].time == 1454163407.666223

= Read a pcap file with nanosecond precision
pktpcapnano = rdpcap(pcapnanofile)
assert pktpcapnano[0].time == 1454163407.666223049

= Read a pcapng file with nanosecond precision and default tsresol
pktpcapngdefaults = rdpcap(pcapngdefaults)
assert pktpcapngdefaults[0].time == 1575115986.114775512
assert Ether in pktpcapngdefaults[0]

= Read a pcapng with little-endian SHB
pktcapng = sniff(offline=scapy_path("/test/pcaps/macos.pcapng.gz"))
assert len(pktcapng) != 0

= Write a pcapng

tmpfile = get_temp_file(autoext=".pcapng")
r = RawPcapNgWriter(tmpfile)
r._write_block_shb()
r._write_block_idb(linktype=DLT_EN10MB)
ts = 1632568366.384185
r._write_block_epb(raw(Ether()/"Hello Scapy!!!"), ifid=0, timestamp=ts)
r.f.close()

assert os.stat(tmpfile).st_size == 108

l = rdpcap(tmpfile)
assert b"Scapy" in l[0][Raw].load
assert l[0].time == ts

= Check wrpcapng()

tmpfile = get_temp_file(autoext=".pcapng")
p = Ether()/"Hello Scapy!!!"
p.time = 1632568366.384185
wrpcapng(tmpfile, p)

assert os.stat(tmpfile).st_size == 108

l = rdpcap(tmpfile)
assert b"Scapy" in l[0][Raw].load
assert l[0].time == ts

p = Ether() / IPv6() / TCP()
p.comment = b"Hello Scapy!"
wrpcapng(tmpfile, p)
l = rdpcap(tmpfile)
assert l[0].comment == p.comment

= rdpcap on fifo
~ linux
f = get_temp_file()
os.unlink(f)
os.mkfifo(f)
p = Ether(bytes(Ether(dst="ff:ff:ff:ff:ff:ff")/"Hello Scapy!!!"))
s = AsyncSniffer(offline=f)
s.start()
wrpcap(f, p)
s.join(timeout=1)
assert s.results[0] == p

= Check multiple packets with different combination of linktype,comment,direction,sniffed_on fields. test both wrpcap() and wrpcapng()
import random,string
random.seed(0x2807)
plist = []
ptypes = []
ptypes.append(Ether((Ether() / IPv6() / TCP()).build())) 
ptypes.append(IP((IP() / IPv6() / TCP()).build()))
ifaces=[None,'','i','int0',''.join(random.choices(string.printable,k=20))]
comments=[None,'','a','abcd',''.join(random.choices(string.printable,k=20))]
directions=[None,0,1,2,3]

for iface in ifaces:
    for comment in comments:
        if comment is not None:
            comment=comment.encode('utf-8')
        for direction in directions:
            for p in ptypes:
                if iface is not None and type(ptypes[ifaces.index(iface) % len(ptypes)]) != type(p):
                    continue
                pnew = p.copy()
                pnew.time = 1632568366.384185
                pnew.sniffed_on = iface
                pnew.direction = direction
                pnew.comment = comment
                plist.append(pnew)

random.shuffle(plist)
tmpfile = get_temp_file(autoext=".pcapng")
wrpcapng(tmpfile, plist)
plist_check = rdpcap(tmpfile)
assert len(plist_check) == len(plist)
for i in range(len(plist)):
    assert plist_check[i].comment == plist[i].comment
    assert plist_check[i].direction == plist[i].direction
    assert plist_check[i].sniffed_on == plist[i].sniffed_on
    assert plist_check[i].time == plist[i].time
    #if interface is unknown, verify pkt bytes integrity and that linktype was set to first packet 
    if plist[i].sniffed_on is None:
        assert bytes(plist_check[i]) == bytes(plist[i])
        assert type(plist_check[i]) == type(plist[0])
    else:
        assert plist_check[i] == plist[i]

tmpfile = get_temp_file(autoext=".pcap")
wrpcap(tmpfile, plist)
plist_check = rdpcap(tmpfile)
for i in range(len(plist)):
    assert plist_check[i].time == plist[i].time
    assert type(plist_check[i]) == type(plist[0])
    assert bytes(plist_check[i]) == bytes(plist[i])

= PcapNg - Process Information Block

pib_pcapng_file = BytesIO(b'\n\r\r\n\xbc\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x02\x00\x05\x00arm64\x00\x00\x00\x03\x00f\x00Darwin Kernel Version 23.3.0: Thu Dec 21 02:29:41 PST 2023; root:xnu-10002.81.5~11/RELEASE_ARM64_T8122\x00\x00\x04\x00 \x00tcpdump (libpcap version 1.10.1)\x00\x00\x00\x00\xbc\x00\x00\x00\x01\x00\x00\x00 \x00\x00\x00\x01\x00\x00\x00\x00\x00\x08\x00\x02\x00\x03\x00en0\x00\x00\x00\x00\x00 \x00\x00\x00\x01\x00\x00\x80 \x00\x00\x00$\'\x00\x00\x02\x00\x06\x00trustd\x00\x00\x00\x00\x00\x00 \x00\x00\x00\x01\x00\x00\x80$\x00\x00\x00")\x00\x00\x02\x00\x0c\x00mobileassetd\x00\x00\x00\x00$\x00\x00\x00\x06\x00\x00\x00\x90\x00\x00\x00\x00\x00\x00\x00\xfb\x18\x06\x00EcqdB\x00\x00\x00B\x00\x00\x00\xe8\x9f\x80\xfa\x8c\xc6P\xa6\xd8\xd5\x83v\x08\x00E\x00\x004\x00\x00@\x00@\x06\x90T\nh\x01\xc3\xc0\xe5\xdd_\xf4\xb8\x00P\x95\xc3\xcb\x01\xcb\xeb\x11\xe8\x80\x11\x08\x00\x0c\xe6\x00\x00\x01\x01\x08\n\xbe\xb8\xd4\xb3\xbb\x9b4\xbc\x00\x00\x01\x80\x04\x00\x00\x00\x00\x00\x03\x80\x04\x00\x01\x00\x00\x00\x02\x00\x04\x00\x02\x00\x00\x00\x02\x80\x04\x00\x00\x00\x00\x00\x04\x80\x04\x00\x10\x00\x00\x00\x00\x00\x00\x00\x90\x00\x00\x00')

l = rdpcap(pib_pcapng_file)
assert(len(l) == 1)
assert(TCP in l[0])
assert(len(l[0].process_information) == 2)
assert(l[0].process_information["proc"]["name"] == "trustd")

= OSS-Fuzz Findings

from io import BytesIO
# Issue 68352
file = BytesIO(b"\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x1c\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00\xe4\x00\x00\x00\x00\x00\x04\x00\x14\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\xe4\x00\x00\x00\x00\x00\x04\x00\x02\x00\t\x00b'ens16\xb0'\x00\x00\x00\x00\x00\x00\x00(\x00\x00\x00\x06\x00\x00\x004\x00\x00\x00\x01\x00\x00\x00}\x17\x06\x00\xb5t\x1d\x85\x14\x00\x00\x00\x14\x00\x00\x00E\x00\x00\x14\x00\x01\x00\x00@\x00|\xe7\x7f\x00\x00\x01\x7f\x00\x00\x014\x00\x00\x00")
rdpcap(file)

# Issue 68354
file = BytesIO(b'\n\r\r\n\xff\xfe\xfe\xffM<+\x1a')
try:
    rdpcap(file)
except Scapy_Exception:
    pass

# Issue #70115
file = BytesIO(b"\n\r\r\n\x00\x00\x008\x1a+<M\x00\x01\x00\x00\xef\xff\xff\x81\x01\x00\x00\x01\x00\x00\x00\x0c\x00\x00\x00\x00\x00\x00\x00\x009\x00\x00\x00\x01\x00\x00\x000\x00e\x00\x00\x00\x00\x00\x00\x00\x008\x00\x00\x00\x06\x00\x00\x00d\x00\x00\x00\x00'\x00\x00\x00\x01\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x000\x00\x00\x00\x01\x00\x00\x008\x00\x00\n\x0c\x00A\x05\xc0\x01\x00\x00\x00\x00\x00d\x00\x00\x00\x00\x00\x00\x00\x00\x000\x00\x00\x00\x01\x00\x00\x008\x00\x00\n\x0c\x00A\x05\xc0\x83\x00\x00\x043\x01\x00\x00\x00\x00\x00d\x00\x00\x00\x00d")
l = rdpcap(file)

# Issue #69628
file = BytesIO(b"\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00\x04{\xdcf\xc2\xa5\x07\x008\x00\x00\x008\x00\x00\x00A]+\xdb]\x04\x8e(6\n\x99\xcb\x08\x00E\x00\x00*\x00\x01\x00\x00@\x06\xe3V\x07\x87\xa5m\x17\x15\xd3m\x01\x85\x01\x85\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\xc5_\x00\x000\x00")
l = rdpcap(file)
assert l[0][LDAP].summary() == "LDAP"

# Issue #69628 - 32-bit alternative
file = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00%\xa8\xddfK\x1b\x05\x00\xca\xca\xca\xca*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x86"\x11&\xab3\x08\x06\x00\x01\x08\x00\x06\x04\x00\x01]\x80\x0f\x13*r\n\x00\x02\x0f\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
l = rdpcap(file)
assert len(l) == 0 or ARP in l[0]

= Read a pcap file with wirelen != captured len
pktpcapwirelen = rdpcap(pcapwirelenfile)

= Check all packet lists are the same
assert list(pktpcap) == list(pktpcapng) == list(pktpcapnano)
assert [float(p.time) for p in pktpcap] == [float(p.time) for p in pktpcapng] == [float(p.time) for p in pktpcapnano]

= Check packets from pcap file
assert all(IP in pkt for pkt in pktpcap)
assert all(any(proto in pkt for pkt in pktpcap) for proto in [ICMP, UDP, TCP])

= Check wirelen value from pcap file
assert len(pktpcapwirelen) == 1
assert pktpcapwirelen[0].wirelen is not None
assert len(pktpcapwirelen[0]) < pktpcapwirelen[0].wirelen

= Check wrpcap() then rdpcap() with wirelen
import os, tempfile
fdesc, filename = tempfile.mkstemp()
fdesc = os.fdopen(fdesc, "wb")
wrpcap(fdesc, pktpcapwirelen)
fdesc.close()
newpktpcapwirelen = rdpcap(filename)
assert len(newpktpcapwirelen) == 1
assert newpktpcapwirelen[0].wirelen is not None
assert len(newpktpcapwirelen[0]) < newpktpcapwirelen[0].wirelen
assert newpktpcapwirelen[0].wirelen == pktpcapwirelen[0].wirelen

= Check wrpcap() then rdpcap() with sent_time on SndRcvList
f = get_temp_file()
s = Ether()/IP()
r = Ether()/IP()
s.sent_time = 1
r.time = 2
wrpcap(f, SndRcvList([(s, r)]))
pcap = rdpcap(f)
assert pcap[0].time == 1
assert pcap[1].time == 2

= Check wrpcap()
fdesc, filename = tempfile.mkstemp()
fdesc = os.fdopen(fdesc, "wb")
wrpcap(fdesc, pktpcap)
fdesc.close()

= Check offline sniff() (by PacketList)
l=sniff(offline=PacketList([IP()/TCP(),IP()/TCP()]))
assert len(l) == 2
assert all(TCP in p for p in l)

= Check offline sniff() (by filename)
assert list(pktpcap) == list(sniff(offline=filename))

= Check offline sniff() (by file object)
fdesc = open(filename, "rb")
assert list(pktpcap) == list(sniff(offline=fdesc))
fdesc.close()

= Check offline sniff() with a filter (by filename)
~ tcpdump libpcap
pktpcap_flt = [(proto, sniff(offline=filename, filter=proto.__name__.lower()))
               for proto in [ICMP, UDP, TCP]]
assert all(list(pktpcap[proto]) == list(packets) for proto, packets in pktpcap_flt)

= Check offline sniff() with a filter (by file object)
~ tcpdump libpcap
fdesc = open(filename, "rb")
pktpcap_tcp = sniff(offline=fdesc, filter="tcp")
fdesc.close()
assert list(pktpcap[TCP]) == list(pktpcap_tcp)
os.unlink(filename)

= Check offline sniff() with a PcapNg file and a filter (by file object)
~ tcpdump libpcap

pcapng_data = b'\n\r\r\n`\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x04\x009\x00TShark (Wireshark) 3.2.3 (Git v3.2.3 packaged as 3.2.3-1)\x00\x00\x00\x00\x00\x00\x00`\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00\xe4\x00\x00\x00\xff\xff\x00\x00\x14\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x98\xcd\x05\x00\x19\x83\xf7\x9e\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r<\x00\x00\x00'

if OPENBSD:
    # Note: OpenBSD tcpdump does not support PcapNg
    assert True
else:
    fdesc, filename = tempfile.mkstemp()
    os.close(fdesc)
    fd = open(filename, "wb")
    fd.write(pcapng_data)
    fd.close()
    packets = sniff(offline=filename, filter="udp")
    os.unlink(filename)
    assert UDP in packets[0]

= Check offline sniff() with Packets and tcpdump with a filter
~ tcpdump libpcap

l = sniff(offline=IP()/UDP(sport=(10000, 10001)), filter="udp")
assert len(l) == 2
assert all(UDP in p for p in l)

l = sniff(offline=[p for p in IP()/UDP(sport=(10000, 10001))], filter="udp")
assert len(l) == 2
assert all(UDP in p for p in l)

l = sniff(offline=IP()/UDP(sport=(10000, 10001)), filter="tcp")
assert len(l) == 0

= Check offline sniff() with Packets, tcpdump and a bad filter
~ tcpdump libpcap 

try:
    sniff(offline=IP()/UDP(), filter="bad filter")
except Scapy_Exception:
    pass
else:
    assert False

= Check online sniff() with a bad filter
~ needs_root tcpdump libpcap
try:
    sniff(timeout=0, filter="bad filter")
except Scapy_Exception:
    pass
else:
    assert False

= Check offline sniff with lfilter
assert len(sniff(offline=[IP()/UDP(), IP()/TCP()], lfilter=lambda x: TCP in x)) == 1

= Check offline sniff() without a tcpdump binary
~ tcpdump
from unittest import mock

conf_prog_tcpdump = conf.prog.tcpdump
conf.prog.tcpdump = "tcpdump_fake"

def _test_sniff_notcpdump():
    try:
        sniff(offline="fake.pcap", filter="tcp")
        assert False
    except:
        assert True

_test_sniff_notcpdump()
conf.prog.tcpdump = conf_prog_tcpdump

= Check wrpcap(nano=True)
fdesc, filename = tempfile.mkstemp()
fdesc = os.fdopen(fdesc, "wb")
pktpcapnano[0].time += Decimal('1E-9')
wrpcap(fdesc, pktpcapnano, nano=True)
fdesc.close()
pktpcapnanoread = rdpcap(filename)
assert pktpcapnanoread[0].time == pktpcapnano[0].time
os.unlink(filename)

= Check PcapNg with nanosecond precision using obsolete packet block
* first packet from capture file icmp2.ntar -- https://wiki.wireshark.org/Development/PcapNg?action=AttachFile&do=view&target=icmp2.ntar
pcapngfile = BytesIO(b'\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xa8\x03\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\xff\xff\x00\x00\r\x00\x01\x00\x04\x04K\x00\t\x00\x01\x00\tK=N\x00\x00\x00\x00(\x00\x00\x00\x02\x00\x00\x00n\x00\x00\x00\x00\x00\x00\x00e\x14\x00\x00)4\'ON\x00\x00\x00N\x00\x00\x00\x00\x12\xf0\x11h\xd6\x00\x13r\t{\xea\x08\x00E\x00\x00<\x90\xa1\x00\x00\x80\x01\x8e\xad\xc0\xa8M\x07\xc0\xa8M\x1a\x08\x00r[\x03\x00\xd8\x00abcdefghijklmnopqrstuvwabcdefghi\xeay$\xf6\x00\x00n\x00\x00\x00')
pktpcapng = rdpcap(pcapngfile)
assert len(pktpcapng) == 1
pkt = pktpcapng[0]
# weird, but wireshark agrees
assert pkt.time == 22425.352221737
assert isinstance(pkt, Ether)
pkt = pkt.payload
assert isinstance(pkt, IP)
pkt = pkt.payload
assert isinstance(pkt, ICMP)
pkt = pkt.payload
assert isinstance(pkt, Raw) and pkt.load == b'abcdefghijklmnopqrstuvwabcdefghi'
pkt = pkt.payload
assert isinstance(pkt, Padding) and pkt.load == b'\xeay$\xf6'
pkt = pkt.payload
assert isinstance(pkt, NoPayload)

= Check PcapNg using Simple Packet Block
* previous file with the (obsolete) packet block replaced by a Simple Packet Block
pcapngfile = BytesIO(b'\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xa8\x03\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\xff\xff\x00\x00\r\x00\x01\x00\x04\x04K\x00\t\x00\x01\x00\tK=N\x00\x00\x00\x00(\x00\x00\x00\x03\x00\x00\x00`\x00\x00\x00N\x00\x00\x00\x00\x12\xf0\x11h\xd6\x00\x13r\t{\xea\x08\x00E\x00\x00<\x90\xa1\x00\x00\x80\x01\x8e\xad\xc0\xa8M\x07\xc0\xa8M\x1a\x08\x00r[\x03\x00\xd8\x00abcdefghijklmnopqrstuvwabcdefghi\xeay$\xf6\x00\x00`\x00\x00\x00')
pktpcapng = rdpcap(pcapngfile)
assert len(pktpcapng) == 1
pkt = pktpcapng[0]
assert isinstance(pkt, Ether)
pkt = pkt.payload
assert isinstance(pkt, IP)
pkt = pkt.payload
assert isinstance(pkt, ICMP)
pkt = pkt.payload
assert isinstance(pkt, Raw) and pkt.load == b'abcdefghijklmnopqrstuvwabcdefghi'
pkt = pkt.payload
assert isinstance(pkt, Padding) and pkt.load == b'\xeay$\xf6'
pkt = pkt.payload
assert isinstance(pkt, NoPayload)

= Invalid pcapng files

from io import BytesIO

# Invalid PCAPNG format -> Raise
try:
    invalid_pcapngfile_1 = BytesIO(b'\n\r\r\n\r\x00\x00\x00M<+\x1a\xb2<\xb2\xa1\x01\x00\x00\x00\r\x00\x00\x00M<+\x1a\x80\xaa\xb2\x02')
    rdpcap(invalid_pcapngfile_1)
    assert False
except Scapy_Exception:
    pass

# Invalid Packet in PCAPNG -> return
invalid_pcapngfile_2 = BytesIO(b'\n\r\r\n\x00\x00\x00\x1c\x1a+<M\x00\x01\x00\x00\x00\x00\x00\x01\x00\x00\x00\x10\x00\x00\x00\x1c\x00\x00\x00\x01\x00\x00\x00\x10\x12\x12\x12\x12\x00\x00\x00\x10')
assert len(rdpcap(invalid_pcapngfile_2)) == 0

# Invalid interface ID in PCAPNG -> raise EOFError
try:
    invalid_pcapngfile_3 = BytesIO(b'\n\n\n\x14\x00\x00\x00M<+\x1a    \x14\x00\x00\x00\x03\x00\x00\x00\x14\x00\x00\x00        \x14\x00\x00\x00')
    rdpcap(invalid_pcapngfile_3)
    assert False
except Scapy_Exception:
    pass

# Invalid SPB in PCAPNG -> raise EOFError
try:
    invalid_pcapngfile_4 = BytesIO(b'\n\n\n\x14\x00\x00\x00M<+\x1a    \x14\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00        \x14\x00\x00\x00\x03\x00\x00\x00\x0c\x00\x00\x00\x0c\x00\x00\x00')
    rdpcap(invalid_pcapngfile_4)
    assert False
except Scapy_Exception:
    pass

= Check PcapWriter on null write

f = BytesIO()
w = PcapWriter(f)
w.write([])
assert len(f.getvalue()) == 0

# Stop being closed for reals, but we still want to have the header written
with mock.patch.object(f, 'close') as cf:
    w.close()

cf.assert_called_once_with()
assert len(f.getvalue()) != 0

= Check PcapWriter sets correct linktype after null write

f = BytesIO()
w = PcapWriter(f)
w.write([])
assert len(f.getvalue()) == 0
w.write(Ether()/IP()/ICMP())
assert len(f.getvalue()) != 0

# Stop being closed for reals, but we still want to have the header written
with mock.patch.object(f, 'close') as cf:
    w.close()

cf.assert_called_once_with()
f.seek(0) or None
assert len(f.getvalue()) != 0

r = PcapReader(f)
f.seek(0) or None
assert r.LLcls is Ether
assert r.linktype == DLT_EN10MB

l = [ p for p in RawPcapReader(f) ]
assert len(l) == 1

= Check RawPcapReader on pcap
~ pcap

fd = get_temp_file()
wrpcap(fd, [Ether()/IP()/ICMP()])
assert len([p for p in RawPcapReader(fd)]) == 1

for (x, y) in RawPcapReader(fd):
    pass

= Check RawPcapReader with a Context Manager
~ pcap

filename = get_temp_file(fd=False)
wrpcap(filename, [IP()/TCP(), IP()/UDP()])

try:
    with RawPcapReader(filename) as reader:
        packet = next(reader, None)
    assert True
except TypeError:
    assert False

= Check RawPcapWriter
~ pcap

# GH3256
fd = get_temp_file()
with RawPcapWriter(fd, linktype=1) as w:
    w.write(b"test")

fd = get_temp_file()
with RawPcapWriter(fd) as w:
    w.write(b"test")
    assert w.linktype == 1

= Check tcpdump()
~ tcpdump
from io import BytesIO
* No very specific tests because we do not want to depend on tcpdump output
pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x000}$]\xff\\\t\x006\x00\x00\x006\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x000}$]\x87i\t\x00*\x00\x00\x00*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r0}$]\xfbp\t\x00*\x00\x00\x00*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00')

data = tcpdump(pcapfile, dump=True, args=['-nn']).split(b'\n')
print(data)
assert b'127.0.0.1.20 > 127.0.0.1.80:' in data[0]
assert b'127.0.0.1.53 > 127.0.0.1.53:' in data[1]
assert b'127.0.0.1 > 127.0.0.1:' in data[2]

* Non existing tcpdump binary

from unittest import mock

conf_prog_tcpdump = conf.prog.tcpdump
conf.prog.tcpdump = "tcpdump_fake"

def _test_tcpdump_notcpdump():
    try:
        tcpdump(IP()/TCP())
        assert False
    except:
        assert True

_test_tcpdump_notcpdump()
conf.prog.tcpdump = conf_prog_tcpdump

# Also check with use_tempfile=True (for non-OSX platforms)
pcapfile.seek(0) or None
tempfile_count = len(conf.temp_files)
data = tcpdump(pcapfile, dump=True, args=['-nn'], use_tempfile=True).split(b'\n')
print(data)
assert b'127.0.0.1.20 > 127.0.0.1.80:' in data[0]
assert b'127.0.0.1.53 > 127.0.0.1.53:' in data[1]
assert b'127.0.0.1 > 127.0.0.1:' in data[2]
# We should have another tempfile tracked.
assert len(conf.temp_files) > tempfile_count

# Check with a simple packet
data = tcpdump([Ether()/IP()/ICMP()], dump=True, args=['-nn']).split(b'\n')
print(data)
assert b'127.0.0.1 > 127.0.0.1: ICMP' in data[0].upper()

= Check tcpdump() command with linktype 
~ tcpdump libpcap

f = BytesIO()
pkt = Ether()/IP()/ICMP()

with mock.patch('subprocess.Popen', return_value=Bunch(
        stdin=f, wait=lambda: None)) as popen:
    # Prevent closing the BytesIO
    with mock.patch.object(f, 'close'):
        tcpdump([pkt], linktype="DLT_EN10MB", use_tempfile=False)

expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-U', '-r', '-']
if OPENBSD:
    expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-r', '-']

popen.assert_called_once_with(
    expected_command,
    stdin=subprocess.PIPE, stdout=None, stderr=None)

print(bytes_hex(f.getvalue()))
assert raw(pkt) in f.getvalue()
f.close()
del f, pkt

= Check tcpdump() command with linktype and args
~ tcpdump libpcap

f = BytesIO()
pkt = Ether()/IP()/ICMP()

with mock.patch('subprocess.Popen', return_value=Bunch(
        stdin=f, wait=lambda: None)) as popen:
    # Prevent closing the BytesIO
    with mock.patch.object(f, 'close'):
        tcpdump([pkt], linktype=scapy.data.DLT_EN10MB, use_tempfile=False)

expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-U', '-r', '-']
if OPENBSD:
    expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-r', '-']

popen.assert_called_once_with(
    expected_command,
    stdin=subprocess.PIPE, stdout=None, stderr=None)

print(bytes_hex(f.getvalue()))
assert raw(pkt) in f.getvalue()
f.close()
del f, pkt

= Check sniff() offline with linktype & 802.11 filter
~ tcpdump linux

fd = get_temp_file()
wrpcap(fd, [RadioTap()/Dot11()/Dot11ProbeReq(), RadioTap()/Dot11()])
lst = sniff(offline=fd, filter="subtype probe-req")
assert len(lst) == 1

= Check tcpdump() command rejects non-string input for prog

pkt = Ether()/IP()/ICMP()

try:
    tcpdump([pkt], prog=+17607067425, args=['-nn'])
except ValueError as e:
    if hasattr(e, 'args'):
        assert 'prog' in e.args[0]
    else:
        assert 'prog' in e.message
else:
    assert False, 'expected exception'

= Check tcpdump() command with tshark
~ tshark
pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacVo*\n\x00(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV_-\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\xf90\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00')
# tshark doesn't need workarounds on OSX
tempfile_count = len(conf.temp_files)
values = [tuple(int(val) for val in line[:-1].split(b'\t')) for line in tcpdump(pcapfile, prog=conf.prog.tshark, getfd=True, args=['-T', 'fields', '-e', 'ip.ttl', '-e', 'ip.proto'])]
assert values == [(64, 6), (64, 17), (64, 1)]
assert len(conf.temp_files) == tempfile_count

= Check tdecode command directly for tshark
~ tshark

pkts = [
    Ether()/IP(src='192.0.2.1', dst='192.0.2.2')/ICMP(type='echo-request')/Raw(b'X'*100),
    Ether()/IP(src='192.0.2.2', dst='192.0.2.1')/ICMP(type='echo-reply')/Raw(b'X'*100),
]

# tshark doesn't need workarounds on OSX
tempfile_count = len(conf.temp_files)

r = tdecode(pkts, dump=True)
r
assert b'Src: 192.0.2.1' in r
assert b'Src: 192.0.2.2' in r
assert b'Dst: 192.0.2.2' in r
assert b'Dst: 192.0.2.1' in r
assert b'Echo (ping) request' in r
assert b'Echo (ping) reply' in r
assert b'ICMP' in r
assert len(conf.temp_files) == tempfile_count

= Check tdecode with linktype
~ tshark

# These are the same as the ping packets above
pkts = [
  b'\xff\xff\xff\xff\xff\xff\xac"\x0b\xc5j\xdb\x08\x00E\x00\x00\x80\x00\x01\x00\x00@\x01\xf6x\xc0\x00\x02\x01\xc0\x00\x02\x02\x08\x00\xb6\xbe\x00\x00\x00\x00XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
  b'\xff\xff\xff\xff\xff\xff\xac"\x0b\xc5j\xdb\x08\x00E\x00\x00\x80\x00\x01\x00\x00@\x01\xf6x\xc0\x00\x02\x02\xc0\x00\x02\x01\x00\x00\xbe\xbe\x00\x00\x00\x00XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
]

# tshark doesn't need workarounds on OSX
tempfile_count = len(conf.temp_files)

r = tdecode(pkts, dump=True, linktype=DLT_EN10MB)
assert b'Src: 192.0.2.1' in r
assert b'Src: 192.0.2.2' in r
assert b'Dst: 192.0.2.2' in r
assert b'Dst: 192.0.2.1' in r
assert b'Echo (ping) request' in r
assert b'Echo (ping) reply' in r
assert b'ICMP' in r
assert len(conf.temp_files) == tempfile_count


= Run scapy's tshark command
~ needs_root
tshark(count=1, timeout=3)

= Check wireshark()
~ wireshark

f = BytesIO()
pkt = Ether()/IP()/ICMP()

with mock.patch('subprocess.Popen', return_value=Bunch(stdin=f)) as popen:
    # Prevent closing the BytesIO
    with mock.patch.object(f, 'close'):
        wireshark([pkt])

popen.assert_called_once_with(
    [conf.prog.wireshark, '-ki', '-'],
    stdin=subprocess.PIPE, stdout=None, stderr=None)

print(bytes_hex(f.getvalue()))
assert raw(pkt) in f.getvalue()
f.close()
del f, pkt

= Check Raw IP pcap files

import tempfile
filename = tempfile.mktemp(suffix=".pcap")
wrpcap(filename, [IP()/UDP(), IPv6()/UDP()], linktype=DLT_RAW)
packets = rdpcap(filename)
assert isinstance(packets[0], IP) and isinstance(packets[1], IPv6)

= Check wrpcap() with no packet

import tempfile
filename = tempfile.mktemp(suffix=".pcap")
wrpcap(filename, [])
fstat = os.stat(filename)
assert fstat.st_size != 0
os.remove(filename)

= Check wrpcap() with SndRcvList

import tempfile
filename = tempfile.mktemp(suffix=".pcap")
wrpcap(filename, SndRcvList(res=[(Ether()/IP(), Ether()/IP())]))
assert len(rdpcap(filename)) == 2
os.remove(filename)

= Check wrpcap() with different packets types

from unittest import mock
import os
import tempfile

with mock.patch("scapy.utils.warning") as warning:
    filename = tempfile.mktemp()
    wrpcap(filename, [IP(), Ether(), IP(), IP()])
    os.remove(filename)
    assert any("Inconsistent" in arg for arg in warning.call_args[0])

= Check wrpcap() with the Loopback layer
~ tshark

for cls in [Loopback, LoopbackOpenBSD]:
    filename = tempfile.mktemp(suffix=".pcap")
    wrpcap(filename, [cls()/IP()/ICMP()])
    return_value = b"".join(line for line in tcpdump(filename, prog=conf.prog.tshark, getfd=True))
    assert b"Echo (ping) request" in return_value

############
############
+ ERF Ethernet format support 

= Variable creations
erffile = BytesIO(b'3;!E_9\x92_\x02\x04\x00p\x00\x00\x00P\x00\x00\x00\x0fS?\xca\xc0\x1cjz\x18\x90\xed\x81\x00\x01:\x08\x00E\x00\x00(\xdf\xab@\x00;\x06\xb3s\n\x01]\xdb\n\xfb9\xda\xc3v\x84\xecD\x16\xb9\xab\xda\xa1b\xf9P\x10f\x98\x18\xcb\x00\x00\x00\x00\x90\x9e\xd7\xd2_\x929_\x0f\x9e\xcd\x1f\x01\x88\xb9\x15[/s<\x01\x88\xb9\x15[/\xcd\x1f\x01\x88\xb9\x15[/0\xcd"E_9\x92_\x02\x04\x00p\x00\x00\x00P\x00\x00\x1cjz\x18\x90\xed\x00\x0fS?\xca\xc0\x08\x00E\x00\x00(\xa2\xdd@\x00@\x06\xebA\n\xfb9\xda\n\x01]\xdb\x84\xec\xc3v\xda\xa1b\xf9D\x16\xb9\xacP\x10\x9a\xf0\xe4q\x00\x00\x00\x00\x00\x00\x00\x00o\xbc\xe2{_\x929_\x0f\x9f+3\x01\x88\xb9\x15u\x1e(^\x01\x88\xb9\x15u\x1e+3\x01\x88\xb9\x15u\x1e')
erffilewithheader = BytesIO(b'4;!E_9\x92_\x82\x00\x00x\x00\x00\x00P\x00\x00\x1a+<M^o\x00\x00\x00\x0fS?\xca\xc0\x1cjz\x18\x90\xed\x81\x00\x01:\x08\x00E\x00\x00(\xdf\xab@\x00;\x06\xb3s\n\x01]\xdb\n\xfb9\xda\xc3v\x84\xecD\x16\xb9\xab\xda\xa1b\xf9P\x10f\x98\x18\xcb\x00\x00\x00\x00\x90\x9e\xd7\xd2_\x929_\x0f\x9e\xcd\x1f\x01\x88\xb9\x15[/s<\x01\x88\xb9\x15[/\xcd\x1f\x01\x88\xb9\x15[/0\xcd"E_9\x92_\x82\x00\x00x\x00\x00\x00P\x00\x00\x1a+<M^o\x00\x00\x1cjz\x18\x90\xed\x00\x0fS?\xca\xc0\x08\x00E\x00\x00(\xa2\xdd@\x00@\x06\xebA\n\xfb9\xda\n\x01]\xdb\x84\xec\xc3v\xda\xa1b\xf9D\x16\xb9\xacP\x10\x9a\xf0\xe4q\x00\x00\x00\x00\x00\x00\x00\x00o\xbc\xe2{_\x929_\x0f\x9f+3\x01\x88\xb9\x15u\x1e(^\x01\x88\xb9\x15u\x1e+3\x01\x88\xb9\x15u\x1e')

= Check reading of ERF Ethernet file
pkterf = rderf(erffile)
assert pkterf[0].time == 1603418463.270038318
assert pkterf[0][IP].src == "10.1.93.219"
assert pkterf[0][IP].dst == "10.251.57.218"
assert pkterf[0][Ether].src == "1c:6a:7a:18:90:ed"
assert pkterf[0][Ether].dst == "00:0f:53:3f:ca:c0"

= Check writing of ERF Ethernet file
import os, tempfile
fdesc, filename = tempfile.mkstemp()
fdesc = os.fdopen(fdesc, "wb")
wrerf(fdesc, pkterf)
fdesc.close()
newpkterf = rderf(filename)

assert pkterf[1][Ether].src == newpkterf[1][Ether].src

assert len(pkterf) == len(newpkterf)
assert newpkterf[0].time is not None
assert newpkterf[0].wirelen is not None
assert newpkterf[0].time == pkterf[0].time
assert newpkterf[0].wirelen == pkterf[0].wirelen
assert newpkterf[1].time is not None
assert newpkterf[1].wirelen is not None
assert newpkterf[1].time == pkterf[1].time
assert newpkterf[1].wirelen == pkterf[1].wirelen

_, filename = tempfile.mkstemp()
wrerf(filename, pkterf, append=True)
wrerf(filename, pkterf, append=True)
newdoublepkterf = rderf(filename)

assert len(newpkterf) * 2 == len(newdoublepkterf)

= Check rderf
pkterf = rderf(erffilewithheader)
assert pkterf[1].time == 1603418463.270062279
assert pkterf[1][Ether].src == "00:0f:53:3f:ca:c0"

############
############
+ Mocked read_routes() and read_routes6() calls

= Create patcher util
~ mock_read_routes_bsd little_endian_only

# mock the random to get consistency
from unittest import mock

from scapy.pton_ntop import inet_pton
import scapy.arch.bpf.pfroute

og_afinet6 = socket.AF_INET6
og_inet_pton = socket.inet_pton
def mock_inet_pton(af, data):
    if af in [24, 28, 30]:
        return og_inet_pton(og_afinet6, data)
    return og_inet_pton(af, data)


og_inet_ntop = socket.inet_ntop
def mock_inet_ntop(af, data):
    if af in [24, 28, 30]:
        return og_inet_ntop(og_afinet6, data)
    return og_inet_ntop(af, data)


class BSDLoader:
    def __init__(self, OPENBSD=False, FREEBSD=False, NETBSD=False, DARWIN=False, sysctldata=None, ifaces={}, AF_INET6=socket.AF_INET6):
        self.sysctldata = sysctldata
        self.ifaces = ifaces
        socket.AF_LINK = 18
        self.loadpatches = [
            mock.patch('socket.AF_INET6', AF_INET6),
            mock.patch('socket.inet_pton', side_effect=mock_inet_pton),
            mock.patch('socket.inet_ntop', side_effect=mock_inet_ntop),
            mock.patch('scapy.consts.OPENBSD', OPENBSD),
            # mock.patch('scapy.consts.FREEBSD', FREEBSD),
            mock.patch('scapy.consts.NETBSD', NETBSD),
            mock.patch('scapy.consts.DARWIN', DARWIN),
        ]
    def __enter__(self):
        # Apply patches that only occur when loading
        for p in self.loadpatches:
            p.start()
        # Reload module
        pfroute = importlib.reload(scapy.arch.bpf.pfroute)
        # Now apply post-load patches
        self.patches = [
            mock.patch.object(
                pfroute,
                '_sr1_bsdsysctl',
                return_value=pfroute.pfmsghdrs(self.sysctldata)
            ),
            mock.patch.object(
                pfroute,
                '_get_if_list',
                return_value=self.ifaces,
            ),
        ]
        for p in self.patches:
            p.start()        
        return pfroute
    def __exit__(self, *args, **kwargs):
        for p in self.loadpatches:
            p.stop()
        for p in self.patches:
            p.stop()


= OpenBSD 7.5 amd64 - read_routes()
~ mock_read_routes_bsd little_endian_only

import zlib

_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789c7bc1c0ca92c0c0c8c0c0c0c160cec2c0e0ccc10006de0f1850003f0376c08a431c06049830f96bc40f30e2925710626460636163c828cb3360108d6548e5c4aabf0bc6576248c9482ec8494d2c4e4dc1e78e03607f323380fd09243d71f853109be6067c2623dcf5008d5fcfc080e2cf0f48f20a42cc0c1240e7e4e41be0340f593fbafbbd71b81f2b20d2fdf504dcff9f2aee6704bbdf151873d8dc8f961ce0ee67c4264ec0bd18ee0702cadc0fe2b280ddefc8d880d5fdb80031ee07a66b747e1732ffff7f440a22359f5c80bb9f1912fe2ccc58ddcf03a5cd675f494316c71a2f98f6c1bd09761f1002dd26f4b900bb7ad4f820d73fd0f4c4a280d53f5c04dc4dc03f70fb88711f25fe3980ee1f0607acfe61a7c83fe7ffa3f2d1d317f9ee1f05a360148c8251300a46c128180543030000bd836967'))


with BSDLoader(OPENBSD=True, sysctldata=_PFROUTE_DATA, AF_INET6=24) as pfroute:
    routes = pfroute.read_routes()


assert routes == [
    (0, 0, '172.23.192.1', 'hvn0', '172.23.192.138', 1),
    (3758096384, 4026531840, '127.0.0.1', 'lo0', '127.0.0.1', 1),
    (2130706432, 4278190080, '127.0.0.1', 'lo0', '127.0.0.1', 1),
    (2130706433, 4294967295, '127.0.0.1', 'lo0', '127.0.0.1', 1),
    (2887237632, 4294963200, '172.23.192.138', 'hvn0', '172.23.192.138', 1),
    (2887237633, 4294967295, '0.0.0.0', 'hvn0', '172.23.192.138', 1),
    (2887237770, 4294967295, '0.0.0.0', 'hvn0', '172.23.192.138', 1),
    (2887241727, 4294967295, '172.23.192.138', 'hvn0', '172.23.192.138', 1)
]


= OpenBSD 7.5 amd64 - read_routes6()
~ mock_read_routes_bsd little_endian_only

import zlib

_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789ced96bb0dc2301086cf38481125a248411131011d3505a2600906406204325a4661040a6a4264f130d6c5b19c87e2f07f458adcd9be2f7fe190984647924414d3a67c1e6252dcf7544f56dfb24cbceac2ac171a7a633a979494e39fce6baffde9e32f94ff8e56eab5e9bfe076c98866ecf6eee7fbf8ebdfa03dffbef2ffcd6f38f977eb9f4eecf5aaf9347fb6311cff8bd77ce3f1bf7acdf7f5bfb18de1f8f3f9fd4bfe8f8a5e67ff9c5f1f8c7f6eaf57cdd79ffffbfe4fd5eb0ef297dcf9aef5a6f77fd5fe7de55f087bdd80f1e7d7b7977fa4fcb7af7bdaf467c7cff899b8f34b7f69abbbe4cfad0f26ffc6ff3ffcfa60f29f0c347f00000000000000000000306a9e0a72ae83'))


with BSDLoader(OPENBSD=True, sysctldata=_PFROUTE_DATA, AF_INET6=24) as pfroute:
    routes = pfroute.read_routes6()


assert routes == [
    ('::', 96, '::1', 'lo0', ['::1'], 1),
    ('::1', 128, '::1', 'lo0', ['::1'], 1),
    ('::ffff:0.0.0.0', 96, '::1', 'lo0', ['::1'], 1),
    ('2002::', 24, '::1', 'lo0', ['::1'], 1),
    ('2002:7f00::', 24, '::1', 'lo0', ['::1'], 1),
    ('2002:e000::', 20, '::1', 'lo0', ['::1'], 1),
    ('2002:ff00::', 24, '::1', 'lo0', ['::1'], 1),
    ('fe80::', 10, '::1', 'lo0', ['::1'], 1),
    ('fec0::', 10, '::1', 'lo0', ['::1'], 1),
    ('fe80:3::1', 128, 'fe80:3::1', 'lo0', ['fe80:3::1'], 1),
    ('ff01::', 16, '::1', 'lo0', ['::1'], 1),
    ('ff01:3::', 32, 'fe80:3::1', 'lo0', ['fe80:3::1'], 1),
    ('ff02::', 16, '::1', 'lo0', ['::1'], 1),
    ('ff02:3::', 32, 'fe80:3::1', 'lo0', ['fe80:3::1'], 1)
]

= FreeBSD 14.1 amd64 - read_routes()
~ mock_read_routes_bsd little_endian_only

import zlib

from scapy.arch.bpf.pfroute import _bsd_iff_flags
_FREEBSD_IFACES = {1: {'name': 'lo0', 'index': 1, 'flags': FlagValue(32841, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 28, 'index': 1, 'address': '::1', 'scope': 16}, {'af_family': 28, 'index': 1, 'address': 'fe80::1', 'scope': 32}, {'af_family': 2, 'index': 1, 'address': '127.0.0.1'}]}, 2: {'name': 'hn0', 'index': 2, 'flags': FlagValue(34883, _bsd_iff_flags), 'mac': '00:15:5d:00:65:07', 'ips': [{'af_family': 28, 'index': 2, 'address': 'fe80::215:5dff:fe00:6507', 'scope': 32}, {'af_family': 2, 'index': 2, 'address': '172.23.198.182'}]}}

_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789c136064656162606060e660603067200ceeb012a18808c008a55970c80b3061f2d7881f60c4256f21c4c4c0c6ccc6909167c0201acb90ca4ea43b20e61edb8670182b0bc8125606010663620c7020d2220280118d46072077d623490b0831324820c95b80f8cc0c0c39f90624d98b612e343d3002fd3f10e98109873c34fe117c507ca3c9ffffff01cea77a7ae01898f4c08c431edd9de8e141adf40000e8611aa8'))


with BSDLoader(FREEBSD=True, sysctldata=_PFROUTE_DATA, ifaces=_FREEBSD_IFACES, AF_INET6=28) as pfroute:
    routes = pfroute.read_routes()

assert routes == [
    (0, 0, '172.23.192.1', 'hn0', '172.23.198.182', 1),
    (2130706433, 4294967295, '0.0.0.0', 'lo0', '127.0.0.1', 1),
    (2887237632, 4294963200, '0.0.0.0', 'hn0', '172.23.198.182', 1),
    (2887239350, 4294967295, '0.0.0.0', 'lo0', '127.0.0.1', 1),
    (3758096384, 4026531840, '0.0.0.0', 'lo0', '127.0.0.1', 250),
    (3758096384, 4026531840, '0.0.0.0', 'hn0', '172.23.198.182', 250)
]

= FreeBSD 14.1 amd64 - read_routes6()
~ mock_read_routes_bsd little_endian_only

_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789ce5553b0e8240109d593e62b72121b1a0e00824165a72042e40696261f40a1ecd837816101236c28461872801e36bb678f3797979bb9ba3e722006c0380030890498aecc0f6f4193e8ec7fb191e295f75d02d3c86083b07e0724b457aa57b93d64f2fd0b0970ccc26ad6781e4a4b0e9d68d1f1d622e7ff29fc95b3f2f6bcddbdafd2cefe33c33f6ede763b87f2e3fb3d64f04bd889f0ec3737e9a3e7a7f691ee9bc4ffd233ad0e858fafd530c6fd3fdedf78fdbd3e44b813c5f4f6fd27a16663f378ecb97f15387aa77d7edf9aaeb1d1fced714a2024e1ba14eaa43454555d6ed46c7d2f97219dea69bfaf7afff41c55c50f9ff3adc3f979f2f44725d78'))


with BSDLoader(FREEBSD=True, sysctldata=_PFROUTE_DATA, ifaces=_FREEBSD_IFACES, AF_INET6=28) as pfroute:
    routes = pfroute.read_routes6()

assert routes == [
    ('::', 96, '::1', 'lo0', ['::1'], 1),
    ('::1', 128, '::', 'lo0', ['::1'], 1),
    ('::ffff:0.0.0.0', 96, '::1', 'lo0', ['::1'], 1),
    ('fe80::', 10, '::1', 'lo0', ['::1'], 1),
    ('fe80::', 64, '::', 'lo0', ['fe80::1'], 1),
    ('fe80::1', 128, '::', 'lo0', ['fe80::1'], 1),
    ('fe80::', 64, '::', 'hn0', ['fe80::215:5dff:fe00:6507'], 1),
    ('fe80::215:5dff:fe00:6507', 128, '::', 'lo0', ['::1'], 1),
    ('ff02::', 16, '::1', 'lo0', ['::1'], 1),
    ('ff00::', 8, '::', 'lo0', ['::1', 'fe80::1'], 250),
    ('ff00::', 8, '::', 'hn0', ['fe80::215:5dff:fe00:6507'], 250)
]

= NetBSD 10.0 amd64 - read_routes()
~ mock_read_routes_bsd little_endian_only

import zlib

from scapy.arch.bpf.pfroute import _bsd_iff_flags
_NETBSD_IFACES = {1: {'name': 'hvn0', 'index': 1, 'flags': FlagValue(34883, _bsd_iff_flags), 'mac': '00:15:5d:00:65:0a', 'ips': [{'af_family': 24, 'index': 1, 'address': 'fe80:1::7184:2b50:9fbe:e337', 'scope': 32}, {'af_family': 2, 'index': 1, 'address': '172.23.207.191'}]}, 2: {'name': 'lo0', 'index': 2, 'flags': FlagValue(32841, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 2, 'index': 2, 'address': '127.0.0.1'}, {'af_family': 24, 'index': 2, 'address': '::1', 'scope': 16}, {'af_family': 24, 'index': 2, 'address': 'fe80:2::1', 'scope': 32}]}}

_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789c3bc1c0c2c2c8c0c0c00cc4e60ca8c08a91816640800993bf46fc00868d42428c0c6c2c6c0c196579060ca2b10ca95cc8eacfef87a93b00f407c8486e0e4c7f600311cd94b81ed5ddf5987cb83f58ff0301c85d424c0c12c040cec937c0aa6e07d4fdac0c2c0cc6f4773fdc1de8ee24e4ee0bd0f4c3c88819ee2c2cd471232e7703d30b9c0f4e2758d4b183c2ffff0792d311b1f1402d80ee0e5cfec1161fc8fa6640e383550592a769058cef5d4943e6a3e75f3eb0fb813e108d15fa5c404387e000001e173214'))


with BSDLoader(NETBSD=True, sysctldata=_PFROUTE_DATA, ifaces=_NETBSD_IFACES, AF_INET6=24) as pfroute:
    routes = pfroute.read_routes()

assert routes == [
    (0, 0, '172.23.192.1', 'hvn0', '172.23.207.191', 1),
    (2130706432, 4294967040, '127.0.0.1', 'lo0', '127.0.0.1', 1),
    (2130706433, 4294967295, '0.0.0.0', 'lo0', '127.0.0.1', 1),
    (2887237632, 4294967295, '0.0.0.0', 'hvn0', '172.23.207.191', 1),
    (2887241663, 4294967295, '0.0.0.0', 'lo0', '172.23.207.191', 1),
    (2887237633, 4294967295, '0.0.0.0', 'hvn0', '', 1),
    (3758096384, 4026531840, '0.0.0.0', 'hvn0', '172.23.207.191', 250),
    (3758096384, 4026531840, '0.0.0.0', 'lo0', '127.0.0.1', 250)
]

= NetBSD 10.0 amd64 - read_routes6()
~ mock_read_routes_bsd little_endian_only

_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789ced97b14ec3301445af4da8aa964a5544a50e1dd8592a31f01b8c2c1d2b31205017d65682ffe86f30213e8331123fd09109d3368a8127fbd9898c2c87de29ce4dac77749f1d0722cb24807e17b8845bd78f1e0f7968326ee48bea62a40cdadeefe712e323e0f67eea350f12e53f35e3d7e67f43c97f8c0c171e75ff31bfae8b72a49cebd2e1a3e57d5d387c38f837489b5f397cb43a7fa5787fafe0fbda07e2f29f89c133e713e9ba4f52e796bc4ff4bddfffc64e907bc9fa442de22e589fc8c0bd29c7c9712bd6276a4dde9f2bde27d275f72aead772dc847b3710c28f3b947e700b939fe7021dc3fd21f986ed9fcb3ab879b89b6234c3bc679e7ff1747eb57e79d78845cdf37928b9eab271db72b5cd53f5f3ce8c94abf18b65f1750fd07c196ee3fb75ffbb42c95597ef7f97edfdd8eb54897aeb949eb79aae53ddc79edca1f7e52d37dbc74441cf9b51f396ff346f1927ef830efa028ffb7c47'))


with BSDLoader(NETBSD=True, sysctldata=_PFROUTE_DATA, ifaces=_NETBSD_IFACES, AF_INET6=24) as pfroute:
    routes = pfroute.read_routes6()

assert routes == [
    ('::', 104, '::1', 'lo0', ['::1'], 1),
    ('::', 96, '::1', 'lo0', ['::1'], 1),
    ('::1', 128, '::', 'lo0', ['::1'], 1),
    ('::127.0.0.0', 104, '::1', 'lo0', ['::1'], 1),
    ('::224.0.0.0', 100, '::1', 'lo0', ['::1'], 1),
    ('::255.0.0.0', 104, '::1', 'lo0', ['::1'], 1),
    ('::ffff:0.0.0.0', 96, '::1', 'lo0', ['::1'], 1),
    ('2001:db8::', 32, '::1', 'lo0', ['::1'], 1),
    ('2002::', 24, '::1', 'lo0', ['::1'], 1),
    ('2002:7f00::', 24, '::1', 'lo0', ['::1'], 1),
    ('2002:e000::', 20, '::1', 'lo0', ['::1'], 1),
    ('2002:ff00::', 24, '::1', 'lo0', ['::1'], 1),
    ('fe80::', 10, '::1', 'lo0', ['::1'], 1),
    ('fe80:1::', 64, '::', 'hvn0', ['fe80:1::7184:2b50:9fbe:e337'], 1),
    ('fe80:1::7184:2b50:9fbe:e337',
     128,
     '::',
     'lo0',
     ['fe80:1::7184:2b50:9fbe:e337'],
     1),
    ('fe80:2::', 64, 'fe80:2::1', 'lo0', ['fe80:2::1'], 1),
    ('fe80:2::1', 128, '::', 'lo0', ['fe80:2::1'], 1),
    ('ff01:1::', 32, '::', 'hvn0', ['fe80:1::7184:2b50:9fbe:e337'], 1),
    ('ff01:2::', 32, '::1', 'lo0', ['::1'], 1),
    ('ff02:1::', 32, '::', 'hvn0', ['fe80:1::7184:2b50:9fbe:e337'], 1),
    ('ff02:2::', 32, '::1', 'lo0', ['::1'], 1),
    ('ff00::', 8, '::', 'hvn0', ['fe80:1::7184:2b50:9fbe:e337'], 250),
    ('ff00::', 8, '::', 'lo0', ['::1', 'fe80:2::1'], 250)
]

= Darwin 23.6 (MacOS 14.5) x86_64 - read_routes()
~ mock_read_routes_bsd little_endian_only

import zlib

from scapy.arch.bpf.pfroute import _bsd_iff_flags
_DARWIN_IFACES = {1: {'name': 'lo0',  'index': 1,  'flags': FlagValue(32841, _bsd_iff_flags),  'mac': '00:00:00:00:00:00',  'ips': [{'af_family': 2, 'index': 1, 'address': '127.0.0.1'},   {'af_family': 30, 'index': 1, 'address': '::1', 'scope': 16},   {'af_family': 30, 'index': 1, 'address': 'fe80:1::1', 'scope': 32}]}, 2: {'name': 'gif0',  'index': 2,  'flags': FlagValue(32784, _bsd_iff_flags),  'mac': '00:00:00:00:00:00',  'ips': []}, 3: {'name': 'stf0',  'index': 3,  'flags': FlagValue(0, _bsd_iff_flags),  'mac': '00:00:00:00:00:00',  'ips': []}, 4: {'name': 'XHC2',  'index': 4,  'flags': FlagValue(0, _bsd_iff_flags),  'mac': '00:00:00:00:00:00',  'ips': []}, 5: {'name': 'en0',  'index': 5,  'flags': FlagValue(34915, _bsd_iff_flags),  'mac': '52:54:00:09:49:17',  'ips': [{'af_family': 30,    'index': 5,    'address': 'fe80:5::409:eec9:f06c:50ab',    'scope': 32},   {'af_family': 30,    'index': 5,    'address': 'fec0::89e:daf7:5cb1:f1f0',    'scope': 64},   {'af_family': 30,    'index': 5,    'address': 'fec0::c0c4:1f0b:61ba:ea8',    'scope': 64},   {'af_family': 2, 'index': 5, 'address': '10.0.2.15'}]}, 6: {'name': 'utun0',  'index': 6,  'flags': FlagValue(32849, _bsd_iff_flags),  'mac': '00:00:00:00:00:00',  'ips': [{'af_family': 30,    'index': 6,    'address': 'fe80:6::d36e:82de:94dc:84fc',    'scope': 32}]}, 7: {'name': 'utun1',  'index': 7,  'flags': FlagValue(32849, _bsd_iff_flags),  'mac': '00:00:00:00:00:00',  'ips': [{'af_family': 30,    'index': 7,    'address': 'fe80:7::7ce2:1f7b:2c29:a5ee',    'scope': 32}]}, 8: {'name': 'utun2',  'index': 8,  'flags': FlagValue(32849, _bsd_iff_flags),  'mac': '00:00:00:00:00:00',  'ips': [{'af_family': 30,    'index': 8,    'address': 'fe80:8::e4e0:bef:bf56:2605',    'scope': 32}]}, 9: {'name': 'utun3',  'index': 9,  'flags': FlagValue(32849, _bsd_iff_flags),  'mac': '00:00:00:00:00:00',  'ips': [{'af_family': 30,    'index': 9,    'address': 'fe80:9::ce81:b1c:bd2c:69e',    'scope': 32}]}}

_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789ccdd94d6813411400e0d9ddcc36e6608d4472a950a8e0d183e8a1a7da5a503008012948b11eaa78f0e6498b180a45c1802d08164f7ab3f4208817b1e84a0ff647ea0fd2832815a5e021018f518931dbce6c5e76df6cb2e36c3a0325bce936fbf5bdd96176669ad00c2584584963e060fd7382e0ed3315fc22a4ed3183718a98260df675f3b8c83c5dc41ceeab7f1acca6ca6366922f451ebf6596598c5d84b8b9f1fd3b01cb8bc58f17a35852e01b337b29b17dd774d5b65a4b97a1dee5c1305772db55f3bba6998b26cc7d6eedf2cc2872ed5ffe5f974df2671abd211eea7a4ce0d9403c59816703e963f7b2508f857b3a5037ef5e72751b34fa581fcf9305fe9ebb4a029785f4b17bd59a5d3661431bb5fbe700b76e2ae780eef2d4619f4f3807c43d1fa5b3e753da587ad7e7b4b11c583aad8d65fe50561bcbc29de7fa589e7c93b5a87ea6d30bcf6eca5a12c082cd77a2269aefd295d280ac45798d2aa541598b052c70edd3ca82ad93986548d612435e8ecb5a605e145986652deaf3f23ba19185c2b83d8b7d8caf61c2c6eedbf7f81a463876ab3d7fa25b62ca4bf5c81b8d2c6b1a59dee96339b9aa8f25b72c6b513ed755732bd12dc1671abe3b71cb9ae099c6deb3b62d97af47b7c455a3196dde03b2fd4e8f4696c72a2cd8781135d178a95bd655586093cfcbab823616e7fb2f590b7c0f505223a72cfd1e10836556d6a2be46e5872a2c2af27234f76053850536d9bc8c54c61fe96219bdf6e9be2e96b1f1a913ba582e5d397bbb5dcbddbac5bd3fdf631536a873d84f1b961bc1d81be6946d69fafb8bcc44492fe1f38cc7680ac24d4dd70a0cade2b035d529f0bdbc56b73ee06b2af7daa7be7abaf72ae6af5a30dec93da17b17266c184739eb1135d9bdf9b9bf8d18db9bb7d97e78a773e46c7e1983f14e3ee7af7fcc27dbb534ea5588e52ce52b88b17ab9cffa4fc4c573441393e02ca520744569cce5ed43ec6667290639b7d5dbe931dd38c18976def40fb15043e2'))


with BSDLoader(DARWIN=True, sysctldata=_PFROUTE_DATA, ifaces=_DARWIN_IFACES, AF_INET6=30) as pfroute:
    routes = pfroute.read_routes()

assert routes == [
    (0, 0, '10.0.2.2', 'en0', '10.0.2.15', 1),
    (167772672, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
    (167772674, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
    (167772674, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
    (167772675, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
    (167772687, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
    (167772927, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
    (2130706432, 4294967040, '127.0.0.1', 'lo0', '127.0.0.1', 1),
    (2130706433, 4294967295, '127.0.0.1', 'lo0', '127.0.0.1', 1),
    (2851995648, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
    (3758096384, 4043308800, '0.0.0.0', 'en0', '10.0.2.15', 1),
    (3758096635, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
    (4294967295, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1)
]

= Darwin 23.6 (MacOS 14.5) x86_64 - read_routes6()
~ mock_read_routes_bsd little_endian_only

_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789cd5dd5f8c13451800f0d96dbbdb5e8fbbdaebfd114eaf08148fdc830124045f0a1581981062f48c8698c32022313c1925c2c381f8a0a28290887f728644e0d0aa60ce80e60897887fe08120f060cc051030218af74713088a60b73b5bda6e77f73a33fbcd7dfbc27667d2fefaddb7dfcc6cb7a58f84122142488028e9e9b97f8f90cadb60c8a1c1656bbddbbbed6637297e6635e4d01e8c0c1d1b797ed9a7c67e5fceac99e6f9d35d5edf47b3567c5c73683fbd76d3d91d839b6f58667d0ce695fe99f5e2e3ba43fb860b6deb3bda770f59e6f018cc277597463e73b8f878d8a1fdd2f9e8f091ce54c83247c660be1cf0cd1c293e1e71683fb131da7ab843eb31f6f7e7cc4aeedf503049a6b801d2c2cc0a4fdb7e5a3374a2cdb7bc46fd28ef6c9d7f43a7ceacaad69b5416772d56c94c7a784e715ba59ae1562faaf5fec9ed55d6417a39e23b9b1ec6125fea858d2f87770e32ef5c19de1106efd4e06a920e8669894f0620bd2cf14d91ad64f2dbf308894df8f9a9f4f65d38bcab9079d722f36e42e67d1f99770099f72832ef6554ded4d3130999747c70188db70399772632ef1a64deadc8bcfdc8bcc79179cf48f18eb278833db96585d26e8a6a4aae31f8edbdc4e2d5ee25eaac7f546dfd475757e8e159905e96f57c4a5b5438b63a9eb880cbdb08eabdc2ea8d516f22f0072e6f10d4cb9c0f96b729b810973704ea1de6f64eedc2e59d06ea651a8f4bbcb33b7179ef17e455171a5ec5c35bcd56d93bef075cde07047961c68b6ca6a1458c1726bed94ce345315e98f1229b69fe1dd2cb5b1fb299d41a482fef7891cdcc3826c6eb73fe26cdfdd513b5cd62bcfe7dde52ead541bdccf95bf086f7e1f24640bdcce75bc15bb71797b71ed4cb7bbe65338bbe87f4f2c6379b79380de9e53ddf72de51482ff3fc61b2b9ffe633eb6a9079a390deeb2c5efdef652430ed3962ce226a21bd4ce75ba977022e6f12597c93a0f13df5138337a92c21d763f4110922f14e43e64d21f35ab7c0a2f0aae180e5ba0b99f76e245eeb56752cf1b5ee2c9f24c66baee7831ede6ab6626fcd68e1700c85f7aa68af8f9f1f27952bd17385c30b20bdd718bc334862595a0fd36748aa905e96f90ef5d2af582441c70b96f531f5d6522fe8fd041cf1ad93e165591f9779c3905e8ef8d22bc0b0e71b4b3da3de06195e8ef85a5e0dd2cb91bf2dd45b87c43b917ab1d433cb0b9abf2cd7a3cabca0d7cf583e8f2df382e6ef632c5ee560ba462d4c8041f3e1257e2fe87c6711bf17743c6e64f56a9657d929c6ebdfe7b1796fb8105fd0f1ed04c3fa38ef6db29e5201bd7f5280f72f48efbfacf9db34629140c76301de00a49769be53eabd538cd7e7fad06279c9f85fbf957a41f3e11cb357cefc813dbe72e60fccf537791f3d2aaafefa7cbea5909d6fb7bd3aa497793e999233dfb9c9ea9d5fd76d8a60f3418017b49eb5f27b41c70b01f105adbf02bc2164def17fbd2fe76de832eb426e038daf002fb6f8828e6f02bca0d74b047823c8bca0d753057841ef4714e005bd5f4e8017f47e39015ed0ebebcff27beb21bd02e20b7a7f8900ef1d905ea6fb4bd4d45c73964e9f1ed0cbf47961a917b49e317d7ea1a646b46e53bcfce53def427a99ee8729f57e8bccfb1ba497e97e8d9c771bad0b5db1eba0eb0b015ed0f585002fe8fa428017747d21c00bbabe6862f72e34f69b17c37ae3fcde6648af80f80ababf1a2cbe6d90de667e6f1299773232ef3dc8bc539079a7427a1f62f686bb2909f4f7f038bcafcbf03eceed6d7e02d2cb11df2d9484251fb6c9f072e4c376f3119a7cf89a9240e39b64f6d6bf623e8a7f07e97d92dd4bebc383a0df2fe4f06e91e1e5c887f7cc47b0f9c03e5f6fd8489f612912ef6b32bcacd727735e29f3878becde3764787f65f7be23c3cb910fdb917977caf0b2cf771a3e301fc1ce7738ceb70f2909cbf9d623c3cb91bfbb647839f2618f0c2fc7f946bdb0e71b47feeea5242cf9db2bc3cb11df7d32bc1cf3c94fe83380ce27c9695e6f3be8fd041cde2c32ef67c8bc9fcbf0728c6f5f5012687de0882ff5a2c9870332bc1ce3c5979484251ffacc4768f2e12b64de8332bc1cf5ec102561c95fea858def7fccf16da4d74b26fc88c4db23c3cb743fad9aba463e26247b7e45a8bc6d9c7bf5f2b671eead41e6ad45e6ad47e68d61f12a4d86f72d34f940bd75d0de192cdee0d205e6afbc1a9bfe22a497e9f7b90cef1aeb68f055486f1bb7577f81c73b90f31a5f3c5188a24c27a4f02514db667b0763f7e65ed7f6b40e6df9fdd8add2cdad6f2ff5878249650a8c3fbf9f882ba4a58afe87685e28b9401b71561d5e93e7772bcafef6c47486cc2ff8166d2ef1b5e5472f7587826aa311dfe3f43df8e8566fbb35f248272904cbcb599c078e5b9adf59fcba05e7a324b2a4d9bbbf71be197f0feb7cf3290fcaffe4b6b6d36b379ddd31b8f986b1ef920fb6be4071b6bd6e22aed992ceadbf11676332ed15e7957c71d6bdda365c685bdfd1be7bc8d87789b3ad2f509c6daf9b88eb6e71b6f537e26c7c01c52bce276d91aaca19f66abb743e3a7ca43395ff6bbac4d9d61728ceb6d74dc4c36e71b6f537e26c7c11c52bce97035cce8857db898dd1d6c31d5afe5a804b9c6d7d81e26c7bdd443ce216675bffa2719af8569f07ec6d02c7e9427c15fac68bdf8397bbd2fb7570f38ed3c4b73ca0ce70cf2fd7961f181d2971561aa72bf487740e1c6d8baef8a6ae77accee2fefdd6fc5de956acff7045b4f3964b5bd996cfb88895b01efdfa0ae79abb9de75cab64af74ae553257cadf7e6bfe066c769beb38d86dfdfaad3991879d674ee461b7cd1f1cecb67efdd63cc3c3ce33cff0b0dbc66407bbad5fbf35767bd879c66e0fbb6d9c73b0dbfa81d4970aeb49b7ba515b614cacd40fa4be28635b7357324bab2f4a75eb4307bb9cfaa254b7e672b0cba92f4a75eb1807bb9cfaa254b73670b0cba92f2ae2faa222ac2f2ae2faa222ae2f2ae2faa2fa535ffe079dfe8806'))


with BSDLoader(DARWIN=True, sysctldata=_PFROUTE_DATA, ifaces=_DARWIN_IFACES, AF_INET6=30) as pfroute:
    routes = pfroute.read_routes6()

assert routes == [
    ('::', 0, 'fe80:5::2', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
    ('::', 0, 'fe80:6::', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1),
    ('::', 0, 'fe80:7::', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1),
    ('::', 0, 'fe80:8::', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1),
    ('::', 0, 'fe80:9::', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1),
    ('::1', 128, '::1', 'lo0', ['::1'], 1),
    ('fe80:1::', 64, 'fe80:1::1', 'lo0', ['fe80:1::1'], 1),
    ('fe80:1::1', 128, '::', 'lo0', ['fe80:1::1'], 1),
    ('fe80:5::', 64, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
    ('fe80:5::2', 128, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
    ('fe80:5::409:eec9:f06c:50ab', 128, '::', 'lo0', ['fe80:5::409:eec9:f06c:50ab'], 1),
    ('fe80:6::', 64, 'fe80:6::d36e:82de:94dc:84fc', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1),
    ('fe80:6::d36e:82de:94dc:84fc', 128, '::', 'lo0', ['fe80:6::d36e:82de:94dc:84fc'], 1),
    ('fe80:7::', 64, 'fe80:7::7ce2:1f7b:2c29:a5ee', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1),
    ('fe80:7::7ce2:1f7b:2c29:a5ee', 128, '::', 'lo0', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1),
    ('fe80:8::', 64, 'fe80:8::e4e0:bef:bf56:2605', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1),
    ('fe80:8::e4e0:bef:bf56:2605', 128, '::', 'lo0', ['fe80:8::e4e0:bef:bf56:2605'], 1),
    ('fe80:9::', 64, 'fe80:9::ce81:b1c:bd2c:69e', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1),
    ('fe80:9::ce81:b1c:bd2c:69e', 128, '::', 'lo0', ['fe80:9::ce81:b1c:bd2c:69e'], 1),
    ('fec0::', 64, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
    ('fec0::2', 128, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
    ('fec0::89e:daf7:5cb1:f1f0', 128, '::', 'lo0', ['fec0::89e:daf7:5cb1:f1f0'], 1),
    ('fec0::c0c4:1f0b:61ba:ea8', 128, '::', 'lo0', ['fec0::c0c4:1f0b:61ba:ea8'], 1),
    ('ff00::', 8, '::1', 'lo0', ['::1'], 1),
    ('ff00::', 8, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
    ('ff00::', 8, 'fe80:6::d36e:82de:94dc:84fc', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1),
    ('ff00::', 8, 'fe80:7::7ce2:1f7b:2c29:a5ee', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1),
    ('ff00::', 8, 'fe80:8::e4e0:bef:bf56:2605', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1),
    ('ff00::', 8, 'fe80:9::ce81:b1c:bd2c:69e', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1),
    ('ff01:1::', 32, '::1', 'lo0', ['::1'], 1),
    ('ff01:5::', 32, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
    ('ff01:6::', 32, 'fe80:6::d36e:82de:94dc:84fc', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1),
    ('ff01:7::', 32, 'fe80:7::7ce2:1f7b:2c29:a5ee', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1),
    ('ff01:8::', 32, 'fe80:8::e4e0:bef:bf56:2605', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1),
    ('ff01:9::', 32, 'fe80:9::ce81:b1c:bd2c:69e', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1),
    ('ff02:1::', 32, '::1', 'lo0', ['::1'], 1),
    ('ff02:5::', 32, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
    ('ff02:6::', 32, 'fe80:6::d36e:82de:94dc:84fc', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1),
    ('ff02:7::', 32, 'fe80:7::7ce2:1f7b:2c29:a5ee', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1),
    ('ff02:8::', 32, 'fe80:8::e4e0:bef:bf56:2605', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1),
    ('ff02:9::', 32, 'fe80:9::ce81:b1c:bd2c:69e', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1)
]

############
############
+ Mocked _parse_tcpreplay_result(stdout, stderr, argv, results_dict)
~ mock_parse_tcpreplay_result

= Test mocked _parse_tcpreplay_result

from scapy.sendrecv import _parse_tcpreplay_result

stdout = """Actual: 1024 packets (198929 bytes) sent in 67.88 seconds.
Rated: 2930.6 bps, 0.02 Mbps, 15.09 pps
Statistics for network device: mon0
        Attempted packets:         1024
        Successful packets:        1024
        Failed packets:            0
        Retried packets (ENOBUFS): 0
        Retried packets (EAGAIN):  0"""

stderr = """Warning in sendpacket.c:sendpacket_open_pf() line 669:
Unsupported physical layer type 0x0323 on mon0.  Maybe it works, maybe it won't.  See tickets #123/318
sending out mon0
processing file: replay-example.pcap"""

argv = ['tcpreplay', '--intf1=mon0', '--multiplier=1.00', '--timer=nano', 'replay-example.pcap']
results_dict = _parse_tcpreplay_result(stdout, stderr, argv)

results_dict

assert results_dict["packets"] == 1024
assert results_dict["bytes"] == 198929
assert results_dict["time"] == 67.88
assert results_dict["bps"] == 2930.6
assert results_dict["mbps"] == 0.02
assert results_dict["pps"] == 15.09
assert results_dict["attempted"] == 1024
assert results_dict["successful"] == 1024
assert results_dict["failed"] == 0
assert results_dict["retried_enobufs"] == 0
assert results_dict["retried_eagain"] == 0
assert results_dict["command"] == " ".join(argv)
assert len(results_dict["warnings"]) == 3

= Test more recent version with flows

data = """Actual: 1 packets (42 bytes) sent in 0.000278 seconds
Rated: 151079.1 Bps, 1.20 Mbps, 3597.12 pps
Flows: 1 flows, 3597.12 fps, 1 flow packets, 0 non-flow
Statistics for network device: enp0s3
        Successful packets:        1
        Failed packets:            0
        Truncated packets:         0
        Retried packets (ENOBUFS): 0
        Retried packets (EAGAIN):  0
"""

results_dict = _parse_tcpreplay_result(data, "", [])
results_dict

expected = {
    'bps': 151079.1,
    'bytes': 42,
    'command': '',
    'failed': 0,
    'flow_packets': 1,
    'flows': 1,
    'fps': 3597.12,
    'mbps': 1.2,
    'non_flow': 0,
    'packets': 1,
    'pps': 3597.12,
    'retried_eagain': 0,
    'retried_enobufs': 0,
    'successful': 1,
    'time': 0.000278,
    'truncated': 0,
    'warnings': []
}

assert results_dict == expected

############
############
+ Mocked route() calls

= Mocked IPv4 routes calls

import scapy

conf.ifaces._add_fake_iface("enp3s0")
conf.ifaces._add_fake_iface("lo")

old_iface = conf.iface
old_loopback = conf.loopback_name
try:
    conf.iface = 'enp3s0'
    conf.loopback_name = 'lo'
    conf.route.invalidate_cache()
    conf.route.routes = [
        (4294967295, 4294967295, '0.0.0.0', 'wlan0', '', 281),
        (4294967295, 4294967295, '0.0.0.0', 'lo', '', 291),
        (4294967295, 4294967295, '0.0.0.0', 'enp3s0', '192.168.0.119', 281),
        (3758096384, 4026531840, '0.0.0.0', 'lo', '', 291),
        (3758096384, 4026531840, '0.0.0.0', 'wlan0', '', 281),
        (3758096384, 4026531840, '0.0.0.0', 'enp3s0', '1.1.1.1', 281),
        (3232235775, 4294967295, '0.0.0.0', 'enp3s0', '2.2.2.2', 281),
        (3232235639, 4294967295, '0.0.0.0', 'enp3s0', '3.3.3.3', 281),
        (3232235520, 4294967040, '0.0.0.0', 'enp3s0', '4.4.4.4', 281),
        (0, 0, '192.168.0.254', 'enp3s0', '192.168.0.119', 25)
    ]
    assert conf.route.route("192.168.0.0-10") == ('enp3s0', '4.4.4.4', '0.0.0.0')
    assert conf.route.route("192.168.0.119") == ('lo', '192.168.0.119', '0.0.0.0')
    assert conf.route.route("224.0.0.0") == ('enp3s0', '1.1.1.1', '0.0.0.0')
    assert conf.route.route("255.255.255.255") == ('enp3s0', '192.168.0.119', '0.0.0.0')
    assert conf.route.route("*") == ('enp3s0', '192.168.0.119', '192.168.0.254')
finally:
    conf.loopback_name = old_loopback
    conf.iface = old_iface
    conf.route.resync()
    conf.ifaces.reload()


= Mocked IPv6 routes calls

conf.ifaces._add_fake_iface("enp3s0")
conf.ifaces._add_fake_iface("lo")

old_iface = conf.iface
old_loopback = conf.loopback_name
try:
    conf.route6.ipv6_ifaces = set(['enp3s0', 'wlan0', 'lo'])
    conf.iface = 'enp3s0'
    conf.loopback_name = 'lo'
    conf.route6.invalidate_cache()
    conf.route6.routes = [
        ('fe80::dd17:1fa6:a123:ab4', 128, '::', 'lo', ['fe80::dd17:1fa6:a123:ab4'], 291),
        ('fe80::7101:5678:1234:da65', 128, '::', 'enp3s0', ['fe80::7101:5678:1234:da65'], 281),
        ('fe80::1f:ae12:4d2c:abff', 128, '::', 'wlan0', ['fe80::1f:ae12:4d2c:abff'], 281),
        ('fe80::', 64, '::', 'wlan0', ['fe80::1f:ae12:4d2c:abff'], 281),
        ('fe80::', 64, '::', 'lo', ['fe80::dd17:1fa6:a123:ab4'], 291),
        ('fe80::', 64, '::', 'enp3s0', ['fe80::7101:5678:1234:da65'], 281),
        ('2a01:e35:1e06:ab56:7010:6548:9646:fa77', 128, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281),
        ('2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8', 128, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281),
        ('2a01:e35:1e06:ab56::', 64, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281),
        ('::', 0, 'fe80::160c:64aa:ef6f:fe14', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281)
    ]
    assert conf.route6.route("2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8") == ('enp3s0', '2a01:e35:1e06:ab56:7010:6548:9646:fa77', '::')
    assert conf.route6.route("::1") == ('enp3s0', '2a01:e35:1e06:ab56:7010:6548:9646:fa77', 'fe80::160c:64aa:ef6f:fe14')
    assert conf.route6.route("ff02::1") == ('enp3s0', 'fe80::7101:5678:1234:da65', '::')
    assert conf.route6.route("fe80::1") == ('enp3s0', 'fe80::7101:5678:1234:da65', '::')
    assert conf.route6.route("fe80::1", dev='lo') == ('lo', 'fe80::dd17:1fa6:a123:ab4', '::')
finally:
    conf.loopback_name = old_loopback
    conf.iface = old_iface
    conf.route6.resync()
    conf.ifaces.reload()

= Windows: reset routes properly

if WINDOWS:
    from scapy.arch.windows import _route_add_loopback
    _route_add_loopback()


############
############
############
+ Tests of StreamSocket

= Test with DNS over TCP
~ netaccess

import socket
sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sck.connect(("8.8.8.8", 53))

class DNSTCP(Packet):
    name = "DNS over TCP"
    fields_desc = [ FieldLenField("len", None, fmt="!H", length_of="dns"),
                    PacketLenField("dns", 0, DNS, length_from=lambda p: p.len)]

ssck = StreamSocket(sck, DNSTCP)

r = ssck.sr1(DNSTCP(dns=DNS(rd=1, qd=DNSQR(qname="www.example.com"))), timeout=3)
sck.close()
assert DNSTCP in r and len(r.dns.an)

############
+ Tests of SSLStreamContext

= Test with recv() calls that return exact packet-length rawings
~ sslraweamsocket

import socket
class MockSocket(object):
    def __init__(self):
        self.l = [ b'\x00\x00\x00\x01', b'\x00\x00\x00\x02', b'\x00\x00\x00\x03' ]
    def recv(self, x):
        if len(self.l) == 0:
            return b""
        return self.l.pop(0)
    def fileno(self):
        return -1
    def close(self):
        return


class TestPacket(Packet):
    name = 'TestPacket'
    fields_desc = [
        IntField('data', 0)
    ]
    def guess_payload_class(self, p):
        return conf.padding_layer

s = MockSocket()
ss = SSLStreamSocket(s, basecls=TestPacket)

p = ss.recv()
assert p.data == 1
p = ss.recv()
assert p.data == 2
p = ss.recv()
assert p.data == 3
try:
    ss.recv()
    ret = False
except EOFError:
    ret = True

assert ret

= Test with recv() calls that return twice as much data as the exact packet-length
~ sslraweamsocket

import socket
class MockSocket(object):
    def __init__(self):
        self.l = [ b'\x00\x00\x00\x01\x00\x00\x00\x02', b'\x00\x00\x00\x03\x00\x00\x00\x04' ]
    def recv(self, x):
        if len(self.l) == 0:
            return b""
        return self.l.pop(0)
    def fileno(self):
        return -1
    def close(self):
        return


class TestPacket(Packet):
    name = 'TestPacket'
    fields_desc = [
        IntField('data', 0)
    ]
    def guess_payload_class(self, p):
        return conf.padding_layer

s = MockSocket()
ss = SSLStreamSocket(s, basecls=TestPacket)

p = ss.recv()
assert p.data == 1
p = ss.recv()
assert p.data == 2
p = ss.recv()
assert p.data == 3
p = ss.recv()
assert p.data == 4
try:
    ss.recv()
    ret = False
except EOFError:
    ret = True

assert ret

= Test with recv() calls that return not enough data
~ sslraweamsocket

import socket
class MockSocket(object):
    def __init__(self):
        self.l = [ b'\x00\x00', b'\x00\x01', b'\x00\x00\x00', b'\x02', b'\x00\x00', b'\x00', b'\x03' ]
    def recv(self, x):
        if len(self.l) == 0:
            return b""
        return self.l.pop(0)
    def fileno(self):
        return -1
    def close(self):
        return


class TestPacket(Packet):
    name = 'TestPacket'
    fields_desc = [
        IntField('data', 0)
    ]
    def guess_payload_class(self, p):
        return conf.padding_layer

s = MockSocket()
ss = SSLStreamSocket(s, basecls=TestPacket)

p = ss.recv()
assert p.data == 1

p = ss.recv()
assert p.data == 2

p = ss.recv()
assert p.data == 3

try:
    ss.recv()
    ret = False
except EOFError:
    ret = True

assert ret


############
############
+ Test correct conversion from binary to rawing of IPv6 addresses

= IPv6 bin to rawing conversion
from scapy.pton_ntop import _inet6_ntop, inet_ntop
import socket
for binfrm, address in [
        (b'\x00' * 16, '::'),
        (b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88',
         '1111:2222:3333:4444:5555:6666:7777:8888'),
        (b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x00\x00\x00\x00\x00\x00',
         '1111:2222:3333:4444:5555::'),
        (b'\x00\x00\x00\x00\x00\x00\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88',
         '::4444:5555:6666:7777:8888'),
        (b'\x00\x00\x00\x00\x33\x33\x44\x44\x00\x00\x00\x00\x00\x00\x88\x88',
         '0:0:3333:4444::8888'),
        (b'\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
         '1::'),
        (b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01',
         '::1'),
        (b'\x11\x11\x00\x00\x00\x00\x44\x44\x00\x00\x00\x00\x77\x77\x88\x88',
         '1111::4444:0:0:7777:8888'),
        (b'\x10\x00\x02\x00\x00\x30\x00\x04\x00\x05\x00\x60\x07\x00\x80\x00',
         '1000:200:30:4:5:60:700:8000'),
]:
    addr1 = inet_ntop(socket.AF_INET6, binfrm)
    addr2 = _inet6_ntop(binfrm)
    assert address == addr1 == addr2

= IPv6 bin to rawing conversion - Zero-block of length 1
binfrm = b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x00\x00\x88\x88'
addr1, addr2 = inet_ntop(socket.AF_INET6, binfrm), _inet6_ntop(binfrm)
# On Mac OS socket.inet_ntop is not fully compliant with RFC 5952 and
# shortens the single zero block to '::'. This is a valid IPv6 address
# representation anyway.
assert(addr1 in ['1111:2222:3333:4444:5555:6666:0:8888',
                 '1111:2222:3333:4444:5555:6666::8888'])
assert addr2 == '1111:2222:3333:4444:5555:6666:0:8888'

= IPv6 bin to rawing conversion - Illegal sizes
for binfrm in ["\x00" * 15, b"\x00" * 17]:
    rc = False
    try:
        inet_ntop(socket.AF_INET6, binfrm)
    except Exception as exc1:
        _exc1 = exc1
        rc = True
    assert rc
    try:
        _inet6_ntop(binfrm)
    except Exception as exc2:
        rc = isinstance(exc2, type(_exc1))
    assert rc


############
############
+ Addresses generators

= Net

assert list(Net("192.168.0.0/31")) == ["192.168.0.0", "192.168.0.1"]

assert "1.2.3.4" in Net("0.0.0.0/0")

assert "192.168.0.0/25" in Net("192.168.0.0/24")

assert "192.168.0.0/23" not in Net("192.168.0.0/24")

assert "0.0.0.0/1" in Net("0.0.0.0/0")

assert "0.0.0.0/0" not in Net("0.0.0.0/1")

assert Net("1.2.3.0/24") == Net("1.2.3.0", "1.2.3.255")

assert hash(Net("1.2.3.0/24")) == hash(Net("1.2.3.0", "1.2.3.255"))

= Net using name
~ netaccess

ip = IP(dst="www.google.com")
n1 = ip.dst
assert isinstance(n1, Net)
ip.show()

= Net using implicit format in IP

assert len(list(IP(dst=("192.168.0.100", "192.168.0.199")))) == 100

= Multiple IP addresses test
~ netaccess

ip = IP(dst=['192.168.0.1', 'www.google.fr'],ihl=(1,5))
assert ip.dst[0] == '192.168.0.1'
assert isinstance(ip.dst[1], Net)
src = ip.src
assert src
assert isinstance(src, str)

= OID

oid = OID("1.2.3.4.5.6-8")
sum(1 for o in oid) == 3
assert oid.__iterlen__() == 3

= Net6

n1 = Net6("2001:db8::/127")
assert len(list(n1)) == 2
assert len(n1) == 2

n2 = Net6("fec0::/110")
assert len(n2) == 262144

assert "ffff::ffff" in Net6("::/0")

assert "::/1" in Net6("::/0")

assert "::/0" not in Net6("::/1")

assert Net6("::/120") == Net6("::", "::ff")

assert hash(Net6("::/120")) == hash(Net6("::", "::ff"))

assert Net6("::1.2.3.0/120") == Net6("::1.2.3.0", "::1.2.3.255")

assert hash(Net6("::1.2.3.0/120")) == hash(Net6("::1.2.3.0", "::1.2.3.255"))

assert Net6("::1.2.3.0/120") != Net("1.2.3.0/24")

assert hash(Net6("::1.2.3.0/120")) != hash(Net("1.2.3.0/24"))

= Net6 using web address
~ netaccess ipv6

ip = IPv6(dst="www.google.com")
n1 = ip.dst
assert isinstance(n1, Net6)
assert "www.google.com" in repr(n1)
ip.show()

ip = IPv6(dst="www.yahoo.com")
assert IPv6(raw(ip)).dst == [p.dst for p in ip][0]

= Net6 using implicit format in IPv6

assert len(list(IPv6(dst=("fe80::1", "fe80::1f")))) == 31

= Multiple IPv6 addresses test
~ netaccess ipv6

ip = IPv6(dst=['2001:db8::1', 'www.google.fr'],hlim=(1,5))
assert ip.dst[0] == '2001:db8::1'
assert isinstance(ip.dst[1], Net6)
src = ip.src
assert src
assert isinstance(src, str)

= Test repr on Net
~ netaccess

conf.color_theme = BlackAndWhite()
output = repr(IP(src="www.google.com"))
assert 'Net("www.google.com/32")' in output

= Test repr on Net
~ netaccess ipv6

conf.color_theme = BlackAndWhite()
assert 'Net6("www.google.com/128")' in repr(IPv6(src="www.google.com"))

############
############
+ IPv6 helpers

= in6_getLocalUniquePrefix()

p = in6_getLocalUniquePrefix()
len(inet_pton(socket.AF_INET6, p)) == 16 and p.startswith("fd")

= Misc addresses manipulation functions

teredoAddrExtractInfo("2001:0:0a0b:0c0d:0028:f508:f508:08f5") == ("10.11.12.13", 40, "10.247.247.10", 2807)

ip6 = IP6Field("test", None)
ip6.i2repr("", "2001:0:0a0b:0c0d:0028:f508:f508:08f5") == "2001:0:0a0b:0c0d:0028:f508:f508:08f5 [Teredo srv: 10.11.12.13 cli: 10.247.247.10:2807]"
ip6.i2repr("", "2002:0102:0304::1") == "2002:0102:0304::1 [6to4 GW: 1.2.3.4]"

in6_iseui64("fe80::bae8:58ff:fed4:e5f6") == True

in6_isanycast("2001:db8::fdff:ffff:ffff:ff80") == True

a = inet_pton(socket.AF_INET6, "2001:db8::2807")
in6_xor(a, a) == b"\x00" * 16

a = inet_pton(socket.AF_INET6, "fe80::bae8:58ff:fed4:e5f6")
r = inet_ntop(socket.AF_INET6, in6_getnsma(a)) 
r == "ff02::1:ffd4:e5f6"

in6_isllsnmaddr(r) == True

in6_isdocaddr("2001:db8::2807") == True

in6_isaddrllallnodes("ff02::1") == True

in6_isaddrllallservers("ff02::2") == True

= in6_getscope()

assert in6_getscope("2001:db8::2807") == IPV6_ADDR_GLOBAL
assert in6_getscope("fec0::2807") == IPV6_ADDR_SITELOCAL
assert in6_getscope("fe80::2807") == IPV6_ADDR_LINKLOCAL
assert in6_getscope("ff02::2807") == IPV6_ADDR_LINKLOCAL
assert in6_getscope("ff0e::2807") == IPV6_ADDR_GLOBAL
assert in6_getscope("ff05::2807") == IPV6_ADDR_SITELOCAL
assert in6_getscope("ff01::2807") == IPV6_ADDR_LOOPBACK
assert in6_getscope("::1") == IPV6_ADDR_LOOPBACK

= construct_source_candidate_set()

dev_addresses = [('fe80::', IPV6_ADDR_LINKLOCAL, "linklocal"),('fec0::', IPV6_ADDR_SITELOCAL, "sitelocal"),('ff0e::', IPV6_ADDR_GLOBAL, "global")]

assert construct_source_candidate_set("2001:db8::2807", 0, dev_addresses) == ["ff0e::"]
assert construct_source_candidate_set("fec0::2807", 0, dev_addresses) == ["fec0::"]
assert construct_source_candidate_set("fe80::2807", 0, dev_addresses) == ["fe80::"]
assert construct_source_candidate_set("ff02::2807", 0, dev_addresses) == ["fe80::"]
assert construct_source_candidate_set("ff0e::2807", 0, dev_addresses) == ["ff0e::"]
assert construct_source_candidate_set("ff05::2807", 0, dev_addresses) == ["fec0::"]
assert construct_source_candidate_set("ff01::2807", 0, dev_addresses) == ["::1"]
assert construct_source_candidate_set("::", 0, dev_addresses) == ["ff0e::"]

= inet_pton()

from scapy.pton_ntop import _inet6_pton, inet_pton
import socket

ip6_bad_addrs = ["fe80::2e67:ef2d:7eca::ed8a",
                 "fe80:1234:abcd::192.168.40.12:abcd",
                 "fe80:1234:abcd::192.168.40",
                 "fe80:1234:abcd::192.168.400.12",
                 "1234:5678:9abc:def0:1234:5678:9abc:def0:",
                 "1234:5678:9abc:def0:1234:5678:9abc:def0:1234"]
for ip6 in ip6_bad_addrs:
    rc = False
    exc1 = None
    try:
        res1 = inet_pton(socket.AF_INET6, ip6)
    except Exception as e:
        rc = True
        exc1 = e
    assert rc
    rc = False
    try:
        res2 = _inet6_pton(ip6)
    except Exception as exc2:
        rc = isinstance(exc2, type(exc1))
    assert rc

ip6_good_addrs = [("fe80:1234:abcd::192.168.40.12",
                   b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\xc0\xa8(\x0c'),
                  ("fe80:1234:abcd::fe06",
                   b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\x00\x00\xfe\x06'),
                  ("fe80::2e67:ef2d:7ece:ed8a",
                   b'\xfe\x80\x00\x00\x00\x00\x00\x00.g\xef-~\xce\xed\x8a'),
                  ("::ffff",
                   b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff'),
                  ("ffff::",
                   b'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
                  ('::', b'\x00' * 16)]
for ip6, res in ip6_good_addrs:
    res1 = inet_pton(socket.AF_INET6, ip6)
    res2 = _inet6_pton(ip6)
    assert res == res1 == res2


############
############
+ Test Route class

= make_route()

r4 = Route()
tmp_route = r4.make_route(host="10.12.13.14")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561934, 4294967295, '0.0.0.0')

tmp_route = r4.make_route(net="10.12.13.0/24")
(tmp_route[0], tmp_route[1], tmp_route[2]) == (168561920, 4294967040, '0.0.0.0')

= add() & delt()

r4 = Route()
len_r4 = len(r4.routes)
r4.add(net="192.168.1.0/24", gw="1.2.3.4")
len(r4.routes) == len_r4 + 1
r4.delt(net="192.168.1.0/24", gw="1.2.3.4")
len(r4.routes) == len_r4

= ifchange()

r4.add(net="192.168.1.0/24", gw="1.2.3.4", dev=get_dummy_interface())
r4.ifchange(get_dummy_interface(), "5.6.7.8")
r4.routes[-1][4] == "5.6.7.8"

= ifdel()

r4.ifdel(get_dummy_interface())
len(r4.routes) == len_r4

= ifadd() & get_if_bcast()

r4 = Route()
len_r4 = len(r4.routes)

r4.ifadd(get_dummy_interface(), "1.2.3.4/24")
len(r4.routes) == len_r4 +1 

r4.get_if_bcast(get_dummy_interface()) == "1.2.3.255"

r4.ifdel(get_dummy_interface())
len(r4.routes) == len_r4

dummy_interface = get_dummy_interface()

bck_conf_route_routes = conf.route.routes
conf.route.routes = [
    (0, 0, '172.21.230.1', dummy_interface, '172.21.230.10', 1),                # 0.0.0.0         / 0.0.0.0         == 255.255.255.255
    (2851995648, 4294901760, '0.0.0.0', dummy_interface, '172.21.230.10', 1),   # 169.254.0.0     / 255.255.0.0     == 169.254.255.255
    (2887116288, 4294967040, '0.0.0.0', dummy_interface, '172.21.230.10', 1),   # 172.21.230.0    / 255.255.255.0   == 172.21.230.255
    (2887116289, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1),   # 172.21.230.1    / 255.255.255.255 == 172.21.230.1
    (3758096384, 4026531840, '0.0.0.0', dummy_interface, '172.21.230.10', 1),   # 224.0.0.0       / 240.0.0.0       == 239.255.255.255
    (3758096635, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1),   # 224.0.0.251     / 255.255.255.255 == 224.0.0.251
    (4294967295, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1),   # 255.255.255.255 / 255.255.255.255 == 255.255.255.255
    ]

assert sorted(conf.route.get_if_bcast(dummy_interface)) == sorted(['169.254.255.255', '172.21.230.255', '239.255.255.255'])
conf.route.routes = bck_conf_route_routes

= Remove dummy interface

conf.ifaces.reload()

############
############
+ Flags

= IP flags
~ IP

pkt = IP(flags="MF")
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
pkt.flags |= 'evil+MF'
pkt.flags &= 'DF+MF'
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'

pkt = IP(flags=3)
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'
pkt.flags = 6
assert not pkt.flags.MF
assert pkt.flags.DF
assert pkt.flags.evil
assert repr(pkt.flags) == '<Flag 6 (DF+evil)>'

assert len({IP().flags, IP().flags}) == 1

pkt = IP()
pkt.flags = ""
assert pkt.flags == 0

= TCP flags
~ TCP

pkt = TCP(flags="SA")
assert pkt.flags == 18
assert pkt.flags.S
assert pkt.flags.A
assert pkt.flags.SA
assert not any(getattr(pkt.flags, f) for f in 'FRPUECN')
assert repr(pkt.flags) == '<Flag 18 (SA)>'
pkt.flags.U = True
pkt.flags.S = False
assert pkt.flags.A
assert pkt.flags.U
assert pkt.flags.AU
assert not any(getattr(pkt.flags, f) for f in 'FSRPECN')
assert repr(pkt.flags) == '<Flag 48 (AU)>'
pkt.flags &= 'SFA'
pkt.flags |= 'P'
assert pkt.flags.P
assert pkt.flags.A
assert pkt.flags.PA
assert not any(getattr(pkt.flags, f) for f in 'FSRUECN')

pkt = TCP(flags=56)
assert all(getattr(pkt.flags, f) for f in 'PAU')
assert pkt.flags.PAU
assert not any(getattr(pkt.flags, f) for f in 'FSRECN')
assert repr(pkt.flags) == '<Flag 56 (PAU)>'
pkt.flags = 50
assert all(getattr(pkt.flags, f) for f in 'SAU')
assert pkt.flags.SAU
assert not any(getattr(pkt.flags, f) for f in 'FRPECN')
assert repr(pkt.flags) == '<Flag 50 (SAU)>'

= Flag values mutation with .raw_packet_cache
~ IP TCP

pkt = IP(raw(IP(flags="MF")/TCP(flags="SA")))
assert pkt.raw_packet_cache is not None
assert pkt[TCP].raw_packet_cache is not None
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
assert pkt[TCP].flags.S
assert pkt[TCP].flags.A
assert pkt[TCP].flags.SA
assert not any(getattr(pkt[TCP].flags, f) for f in 'FRPUECN')
assert repr(pkt[TCP].flags) == '<Flag 18 (SA)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
pkt[TCP].flags.U = True
pkt[TCP].flags.S = False
pkt = IP(raw(pkt))
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
assert pkt[TCP].flags.A
assert pkt[TCP].flags.U
assert pkt[TCP].flags.AU
assert not any(getattr(pkt[TCP].flags, f) for f in 'FSRPECN')
assert repr(pkt[TCP].flags) == '<Flag 48 (AU)>'

= Operations on flag values
~ TCP

p1, p2 = TCP(flags="SU"), TCP(flags="AU")
assert (p1.flags & p2.flags).U
assert not any(getattr(p1.flags & p2.flags, f) for f in 'FSRPAECN')
assert all(getattr(p1.flags | p2.flags, f) for f in 'SAU')
assert (p1.flags | p2.flags).SAU
assert not any(getattr(p1.flags | p2.flags, f) for f in 'FRPECN')

assert TCP(flags="SA").flags & TCP(flags="S").flags == TCP(flags="S").flags
assert TCP(flags="SA").flags | TCP(flags="S").flags == TCP(flags="SA").flags


############
############
+ 802.3

= Test detection

assert isinstance(Dot3(raw(Ether())),Ether)
assert isinstance(Ether(raw(Dot3())),Dot3)

a = Ether(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00')
assert isinstance(a,Dot3)
assert a.dst == 'ff:ff:ff:ff:ff:ff'
assert a.src == '00:00:00:00:00:00'

a = Dot3(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x90\x00')
assert isinstance(a,Ether)
assert a.dst == 'ff:ff:ff:ff:ff:ff'
assert a.src == '00:00:00:00:00:00'


############
############
+ ASN.1

= MIB
~ mib

import tempfile
fd, fname = tempfile.mkstemp()
os.write(fd, b"-- MIB test\nscapy       OBJECT IDENTIFIER ::= {test 2807}\n")
os.close(fd)

load_mib(fname)
assert sum(1 for k in conf.mib.d.values() if "scapy" in k) == 1

assert sum(1 for oid in conf.mib) > 100

= MIB - graph
~ mib

from unittest import mock

@mock.patch("scapy.asn1.mib.do_graph")
def get_mib_graph(do_graph):
    def store_graph(graph, **kargs):
        assert graph.startswith("""digraph "mib" {""")
        assert """"test.2807" [ label="scapy"  ];""" in graph
    do_graph.side_effect = store_graph
    conf.mib._make_graph()

get_mib_graph()

= MIB - test aliases
~ mib

# https://github.com/secdev/scapy/issues/2542
assert conf.mib._oidname("2.5.29.19") == "basicConstraints"

= DADict tests

a = DADict("test")
a[0] = "test_value1"
a["scapy"] = "test_value2"

assert a.test_value1 == 0
assert a.test_value2 == "scapy"

with ContextManagerCaptureOutput() as cmco:
    a._show()
    outp = cmco.get_output()

assert "scapy = 'test_value2'" in outp
assert "0 = 'test_value1'" in outp

= Test ETHER_TYPES

assert ETHER_TYPES.IPv4 == 2048
try:
    import warnings
    
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        ETHER_TYPES["BAOBAB"] = 0xffff
        assert ETHER_TYPES.BAOBAB == 0xffff
        assert issubclass(w[-1].category, DeprecationWarning)
except DeprecationWarning:
    # -Werror is used
    pass

= MIB - Check that MIB OIDs are not duplicated
~ mib

from scapy.asn1.mib import x509_oids_sets

_dct = {}
for d in x509_oids_sets:
    for elt in d:
        if elt in _dct:
            raise ValueError("OID %s already exists" % elt)
    _dct.update(d)

= BER tests

BER_id_enc(42) == '*'
BER_id_enc(2807) == b'\xbfw'

b = BERcodec_IPADDRESS()
r1 = b.enc("8.8.8.8")
r1 == b'@\x04\x08\x08\x08\x08'

r2 = b.dec(r1)[0]
r2.val == '8.8.8.8'

a = b'\x1f\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\x01\x01\x00C\x02\x01U0\x0f0\r\x06\x08+\x06\x01\x02\x01\x02\x01\x00\x02\x01!'
ret = False
try:
    BERcodec_Object.check_type(a)
except BER_BadTag_Decoding_Error:
    ret = True
else:
    ret = False

assert ret

= BER trigger failures

try:
    BERcodec_INTEGER.do_dec(b"\x02\x01")
    assert False
except BER_Decoding_Error:
    pass


############
############
+ Fields

= FieldLenField with BitField
class Test(Packet):
    name = "Test"
    fields_desc = [
        FieldLenField("BitCount", None, fmt="H", count_of="Values"),
        FieldLenField("ByteCount", None, fmt="B", length_of="Values"),
        FieldListField("Values", [], BitField("data", 0x0, size=1),
                       count_from=lambda pkt: pkt.BitCount),
    ]

pkt = Test(raw(Test(Values=[0, 0, 0, 0, 1, 1, 1, 1])))
assert pkt.BitCount == 8
assert pkt.ByteCount == 1

= PacketListField

class TestPacket(Packet):
  name = 'TestPacket'
  fields_desc = [ PacketListField('list', [], 0) ]

a = TestPacket()
a.list.append(1)
assert len(a.list) == 1

b = TestPacket()
assert len(b.list) == 0

= Test PacketListField deepcopy
class SubPacket(Packet):
    name = "SubPacket"
    fields_desc = [
        ByteField("mem", 1),
    ]

class TestPacket(Packet):
    name = "TestPacket"
    fields_desc = [
        PacketListField("packlist", SubPacket(), SubPacket),
    ]

a = TestPacket()
b = a.copy()
fuzz(b)
assert a.packlist[0].mem == 1

= PacketField

class InnerPacket(Packet):
    fields_desc = [ StrField("f_name", "test") ]

class TestPacket(Packet):
    fields_desc = [ PacketField("inner", InnerPacket(), InnerPacket) ]

p = TestPacket()
print(p.inner.f_name)
assert p.inner.f_name == b"test"

p = TestPacket()
p.inner.f_name = b"scapy"
assert p.inner.f_name == b"scapy"

p = TestPacket()
assert p.inner.f_name == b"test"

+ UUIDField

= Parsing a human-readable UUID
f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef')
f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef')

= Parsing a machine-encoded UUID
f = UUIDField('f', bytearray.fromhex('0123456789abcdef0123456789abcdef'))
f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef')

= Parsing a tuple of values
f = UUIDField('f', (0x01234567, 0x89ab, 0xcdef, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef))
f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef')

= Handle None values
f = UUIDField('f', None)
f.addfield(None, b'', f.default) == hex_bytes('00000000000000000000000000000000')

= Get a UUID for dissection
from uuid import UUID
f = UUIDField('f', None)
f.getfield(None, bytearray.fromhex('0123456789abcdef0123456789abcdef01')) == (b'\x01', UUID('01234567-89ab-cdef-0123-456789abcdef'))

= Verify little endian UUIDField
* The endianness of a UUIDField should be apply by block on each block in parenthesis '(01234567)-(89ab)-(cdef)-(01)(23)-(45)(67)(89)(ab)(cd)(ef)'
f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef', uuid_fmt=UUIDField.FORMAT_LE)
f.addfield(None, b'', f.default) == hex_bytes('67452301ab89efcd0123456789abcdef')

= Verify reversed UUIDField
* This should reverse the entire value as 128-bits
f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef', uuid_fmt=UUIDField.FORMAT_REV)
f.addfield(None, b'', f.default) == hex_bytes('efcdab8967452301efcdab8967452301')

+ RandUUID

= RandUUID setup

RANDUUID_TEMPLATE = '01234567-89ab-*-01*-*****ef'
RANDUUID_FIXED = uuid.uuid4()

= RandUUID default behaviour

ru = RandUUID()
assert ru._fix().version == 4
assert ru.command() == "RandUUID()"

= RandUUID incorrect implicit args

assert expect_exception(ValueError, lambda: RandUUID(node=0x1234, name="scapy"))
assert expect_exception(ValueError, lambda: RandUUID(node=0x1234, namespace=uuid.uuid4()))
assert expect_exception(ValueError, lambda: RandUUID(clock_seq=0x1234, name="scapy"))
assert expect_exception(ValueError, lambda: RandUUID(clock_seq=0x1234, namespace=uuid.uuid4()))
assert expect_exception(ValueError, lambda: RandUUID(name="scapy"))
assert expect_exception(ValueError, lambda: RandUUID(namespace=uuid.uuid4()))

= RandUUID v4 UUID (correct args)

u = RandUUID(version=4)._fix()
assert u.version == 4

u2 = RandUUID(version=4)._fix()
assert u2.version == 4

assert str(u) != str(u2)

= RandUUID v4 UUID (incorrect args)

assert expect_exception(ValueError, lambda: RandUUID(version=4, template=RANDUUID_TEMPLATE))
assert expect_exception(ValueError, lambda: RandUUID(version=4, node=0x1234))
assert expect_exception(ValueError, lambda: RandUUID(version=4, clock_seq=0x1234))
assert expect_exception(ValueError, lambda: RandUUID(version=4, namespace=uuid.uuid4()))
assert expect_exception(ValueError, lambda: RandUUID(version=4, name="scapy"))

= RandUUID v1 UUID

u = RandUUID(version=1)._fix()
assert u.version in [1, 4]

u = RandUUID(version=1, node=0x1234)._fix()
assert u.version == 1
assert u.node == 0x1234

u = RandUUID(version=1, clock_seq=0x1234)._fix()
assert u.version == 1
assert u.clock_seq == 0x1234

ru = RandUUID(version=1, node=0x1234, clock_seq=0x1bcd)
assert ru.command() == "RandUUID(node=4660, clock_seq=7117, version=1)"
u = ru._fix()
assert u.version == 1
assert u.node == 0x1234
assert u.clock_seq == 0x1bcd

= RandUUID v1 UUID (implicit version)

u = RandUUID(node=0x1234)._fix()
assert u.version == 1
assert u.node == 0x1234

u = RandUUID(clock_seq=0x1234)._fix()
assert u.version == 1
assert u.clock_seq == 0x1234

u = RandUUID(node=0x1234, clock_seq=0x1bcd)._fix()
assert u.version == 1
assert u.node == 0x1234
assert u.clock_seq == 0x1bcd

= RandUUID v1 UUID (incorrect args)

assert expect_exception(ValueError, lambda: RandUUID(version=1, template=RANDUUID_TEMPLATE))
assert expect_exception(ValueError, lambda: RandUUID(version=1, namespace=uuid.uuid4()))
assert expect_exception(ValueError, lambda: RandUUID(version=1, name="scapy"))

= RandUUID v5 UUID

ru = RandUUID(version=5, namespace=RANDUUID_FIXED, name="scapy")
u = ru._fix()
assert u.version == 5
assert ru.command() == "RandUUID(namespace=%r, name='scapy', version=5)" % RANDUUID_FIXED

u2 = RandUUID(version=5, namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u2.version == 5
assert u.bytes == u2.bytes

# implicit v5
u2 = RandUUID(namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u.bytes == u2.bytes

= RandUUID v5 UUID (incorrect args)

assert expect_exception(ValueError, lambda: RandUUID(version=5, template=RANDUUID_TEMPLATE))
assert expect_exception(ValueError, lambda: RandUUID(version=5, node=0x1234))
assert expect_exception(ValueError, lambda: RandUUID(version=5, clock_seq=0x1234))

= RandUUID v3 UUID

u = RandUUID(version=3, namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u.version == 3

u2 = RandUUID(version=3, namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u2.version == 3
assert u.bytes == u2.bytes

# implicit v5
u2 = RandUUID(namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u.bytes != u2.bytes

= RandUUID v3 UUID (incorrect args)

assert expect_exception(ValueError, lambda: RandUUID(version=5, template=RANDUUID_TEMPLATE))
assert expect_exception(ValueError, lambda: RandUUID(version=5, node=0x1234))
assert expect_exception(ValueError, lambda: RandUUID(version=5, clock_seq=0x1234))

= RandUUID looks like a UUID with str
assert re.match(r'[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}', str(RandUUID()), re.I) is not None

= RandUUID with a static part
* RandUUID template can contain static part such a 01234567-89ab-*-01*-*****ef
ru = RandUUID('01234567-89ab-*-01*-*****ef')
assert re.match(r'01234567-89ab-[0-9a-f]{4}-01[0-9a-f]{2}-[0-9a-f]{10}ef', str(ru), re.I) is not None
assert ru.command() == "RandUUID(template='01234567-89ab-*-01*-*****ef')"

= RandUUID with a range part
* RandUUID template can contain a part with a range of values such a 01234567-89ab-*-01*-****c0:c9ef
assert re.match(r'01234567-89ab-[0-9a-f]{4}-01[0-9a-f]{2}-[0-9a-f]{8}c[0-9]ef', str(RandUUID('01234567-89ab-*-01*-****c0:c9ef')), re.I) is not None

############
############
+ MPLS tests

= MPLS - build/dissection
from scapy.contrib.mpls import EoMCW, MPLS
p1 = MPLS()/IP()/UDP()
assert p1[MPLS].s == 1
p2 = MPLS()/MPLS()/IP()/UDP()
assert p2[MPLS].s == 0

p1[MPLS]
p1[IP]
p2[MPLS]
p2[MPLS:1]
p2[IP]

= MPLS encapsulated Ethernet with CW - build/dissection
p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22")
p /= MPLS(label=1)/EoMCW(seq=1234)
p /= Ether(dst="33:33:33:33:33:33", src="44:44:44:44:44:44")/IP()
p = Ether(raw(p))
assert p[EoMCW].zero == 0
assert p[EoMCW].reserved == 0
assert p[EoMCW].seq == 1234

=  MPLS encapsulated Ethernet without CW - build/dissection
p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22")
p /= MPLS(label=2)/MPLS(label=1)
p /= Ether(dst="33:33:33:33:33:33", src="44:44:44:44:44:44")/IP()
p = Ether(raw(p))
assert p[Ether:2].type == 0x0800

try:
    p[EoMCW]
except IndexError:
    ret = True
else:
    ret = False

assert ret
assert p[Ether:2].type == 0x0800

= MPLS encapsulated IP - build/dissection
p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22")
p /= MPLS(label=1)/IP()
p = Ether(raw(p))

try:
    p[EoMCW]
except IndexError:
    ret = True
else:
    ret = False

assert ret

try:
    p[Ether:2]
except IndexError:
    ret = True
else:
    ret = False

assert ret

p[IP]


############
############
+ PacketList methods

= sr()

class Req(Packet):
    fields_desc = [
        ByteField("raw", 0)
    ]
    def answers(self, other):
        return False

class Res(Packet):
    fields_desc = [
        ByteField("raw", 0)
    ]
    def answers(self, other):
        return other.__class__ == Req and other.raw == self.raw

pl = PacketList([Req(b"1"), Res(b"1"), Req(b"2"), Req(b"3"), Req(b"4"), Res(b"3"), Res(b"1"), Res(b"1"), Res(b"4")])

srl, rl = pl.sr()
assert len(srl) == 3
assert len(rl) == 3

srl, rl = pl.sr(lookahead=1)
assert len(srl) == 1
assert len(rl) == 7

srl, rl = pl.sr(lookahead=2)
assert len(srl) == 2
assert len(rl) == 5

srl, rl = pl.sr(lookahead=3)
assert len(srl) == 3
assert len(rl) == 3

pl = PacketList([Req(b"\x05"), Res(b"1"), Res(b"2"), Res(b"3"), Res(b"4"), Res(b"3"), Res(b"1"), Res(b"1"), Res(b"\x05")])

srl, rl = pl.sr(lookahead=3)
assert len(srl) == 0
assert len(rl) == 9

srl, rl = pl.sr(lookahead=7)
assert len(srl) == 0
assert len(rl) == 9

srl, rl = pl.sr(lookahead=8)
assert len(srl) == 1
assert len(rl) == 7

srl, rl = pl.sr(lookahead=0)
assert len(srl) == 1
assert len(rl) == 7

srl, rl = pl.sr(lookahead=None)
assert len(srl) == 1
assert len(rl) == 7

= pickle test
import pickle
import io

srl, rl = PacketList([Raw(b"1"), Raw(b"1"), Raw(b"2"), Raw(b"3"), Raw(b"4"), Raw(b"3"), Raw(b"1"), Raw(b"1"), Raw(b"4")]).sr()
assert len(srl) == 4

f = io.BytesIO()

pickle.dump(srl, f)

unp = pickle.loads(f.getvalue())

assert len(unp) == len(srl)
assert all(bytes(a[0]) == bytes(b[0]) for a, b in zip(unp, srl))

= plot()

from unittest import mock
import scapy.libs.matplot

@mock.patch("scapy.libs.matplot.plt")
def test_plot(mock_plt):
    def fake_plot(data, **kwargs):
        return data
    mock_plt.plot = fake_plot
    plist = PacketList([IP(id=i)/TCP() for i in range(10)])
    lines = plist.plot(lambda p: (p.time, p.id))
    assert len(lines) == 10

test_plot()

= diffplot()

from unittest import mock
import scapy.libs.matplot

@mock.patch("scapy.libs.matplot.plt")
def test_diffplot(mock_plt):
    def fake_plot(data, **kwargs):
        return data
    mock_plt.plot = fake_plot
    plist = PacketList([IP(id=i)/TCP() for i in range(10)])
    lines = plist.diffplot(lambda x,y: (x.time, y.id-x.id))
    assert len(lines) == 9

test_diffplot()

= multiplot()

from unittest import mock
import scapy.libs.matplot

@mock.patch("scapy.libs.matplot.plt")
def test_multiplot(mock_plt):
    def fake_plot(data, **kwargs):
        return data
    mock_plt.plot = fake_plot
    tmp = [IP(id=i)/TCP() for i in range(10)]
    plist = PacketList([tuple(tmp[i-2:i]) for i in range(2, 10, 2)])
    lines = plist.multiplot(lambda x, y: (y[IP].src, (y.time, y[IP].id)))
    assert len(lines) == 1
    assert len(lines[0]) == 4

test_multiplot()

= rawhexdump()

def test_rawhexdump():
    with ContextManagerCaptureOutput() as cmco:
        p = PacketList([IP()/TCP() for i in range(2)])
        p.rawhexdump()
        result_pl_rawhexdump = cmco.get_output()
    assert len(result_pl_rawhexdump.split('\n')) == 7
    assert result_pl_rawhexdump.startswith("0000  45 00 00 28")

test_rawhexdump()

= hexraw()

def test_hexraw():
    with ContextManagerCaptureOutput() as cmco:
        p = PacketList([IP()/Raw(str(i)) for i in range(2)])
        p.hexraw()
        result_pl_hexraw = cmco.get_output()
    assert len(result_pl_hexraw.split('\n')) == 5
    assert "0000  30" in result_pl_hexraw

test_hexraw()

= hexdump()

def test_hexdump():
    with ContextManagerCaptureOutput() as cmco:
        p = PacketList([IP()/Raw(str(i)) for i in range(2)])
        p.hexdump()
        result_pl_hexdump = cmco.get_output()
    assert len(result_pl_hexdump.split('\n')) == 7
    assert "0010  7F 00 00 01 31" in result_pl_hexdump

test_hexdump()

= import_hexcap()

@mock.patch("scapy.utils.input")
def test_import_hexcap(mock_input):
    data = """
0000  FF FF FF FF FF FF AA AA AA AA AA AA 08 00 45 00  ..............E.
0010  00 1C 00 01 00 00 40 01 7C DE 7F 00 00 01 7F 00  ......@.|.......
0020  00 01 08 00 F7 FF 00 00 00 00                    ..........
"""[1:].split("\n")
    lines = iter(data)
    mock_input.side_effect = lambda: next(lines)
    return import_hexcap()

pkt = test_import_hexcap()
pkt = Ether(pkt)
assert pkt[Ether].dst == "ff:ff:ff:ff:ff:ff"
assert pkt[IP].dst == "127.0.0.1"
assert ICMP in pkt

= import_hexcap(input_string)
data = """
0000  FF FF FF FF FF FF AA AA AA AA AA AA 08 00 45 00  ..............E.
0010  00 1C 00 01 00 00 40 01 7C DE 7F 00 00 01 7F 00  ......@.|.......
0020  00 01 08 00 F7 FF 00 00 00 00                    ..........
"""[1:]
pkt = import_hexcap(data)
pkt = Ether(pkt)
assert pkt[Ether].dst == "ff:ff:ff:ff:ff:ff"
assert pkt[IP].dst == "127.0.0.1"
assert ICMP in pkt

= padding()

def test_padding():
    with ContextManagerCaptureOutput() as cmco:
        p = PacketList([IP()/conf.padding_layer(str(i)) for i in range(2)])
        p.padding()
        result_pl_padding = cmco.get_output()
    assert len(result_pl_padding.split('\n')) == 5
    assert "0000  30" in result_pl_padding

test_padding()

= nzpadding()

def test_nzpadding():
    with ContextManagerCaptureOutput() as cmco:
        p = PacketList([IP()/conf.padding_layer("AB"), IP()/conf.padding_layer("\x00\x00")])
        p.nzpadding()
        result_pl_nzpadding = cmco.get_output()
    assert len(result_pl_nzpadding.split('\n')) == 3
    assert "0000  41 42" in result_pl_nzpadding

test_nzpadding()

= conversations()

from unittest import mock
@mock.patch("scapy.plist.do_graph")
def test_conversations(mock_do_graph):
    def fake_do_graph(graph, **kwargs):
        return graph
    mock_do_graph.side_effect = fake_do_graph
    plist = PacketList([IP(dst="127.0.0.2")/TCP(dport=i) for i in range(2)])
    plist.extend([IP(src="127.0.0.2")/TCP(sport=i) for i in range(2)])
    plist.extend([IPv6(dst="::2", src="::1")/TCP(sport=i) for i in range(2)])
    plist.extend([IPv6(src="::2", dst="::1")/TCP(sport=i) for i in range(2)])
    plist.extend([Ether()/ARP(pdst="127.0.0.1")])
    result_conversations = plist.conversations()
    assert len(result_conversations.split('\n')) == 8
    assert result_conversations.startswith('digraph "conv" {')
    assert "127.0.0.1" in result_conversations
    assert "::1" in result_conversations

test_conversations()

= sessions()

pl = PacketList([Ether()/IPv6()/ICMPv6EchoRequest(), Ether()/IPv6()/IPv6()])
pl.extend([Ether()/IP()/IP(), Ether()/ARP()])
pl.extend([Ether()/Ether()/IP()])
assert len(pl.sessions().keys()) == 5

= afterglow()

from unittest import mock
@mock.patch("scapy.plist.do_graph")
def test_afterglow(mock_do_graph):
    def fake_do_graph(graph, **kwargs):
        return graph
    mock_do_graph.side_effect = fake_do_graph
    plist = PacketList([IP(dst="127.0.0.2")/TCP(dport=i) for i in range(2)])
    plist.extend([IP(src="127.0.0.2")/TCP(sport=i) for i in range(2)])
    result_afterglow = plist.afterglow()
    assert len(result_afterglow.split('\n')) == 19
    assert result_afterglow.startswith('digraph "afterglow" {')

test_afterglow()

= psdump()

print("PYX: %d" % PYX)
if PYX:
    import tempfile
    import os
    filename = tempfile.mktemp(suffix=".eps")
    plist = PacketList([IP()/TCP()])
    plist.psdump(filename)
    assert os.path.exists(filename)
    os.unlink(filename)

= pdfdump()

print("PYX: %d" % PYX)
if PYX:
    import tempfile
    import os
    filename = tempfile.mktemp(suffix=".pdf")
    plist = PacketList([IP()/TCP()])
    plist.pdfdump(filename)
    assert os.path.exists(filename)
    os.unlink(filename)

= svgdump()

print("PYX: %d" % PYX)
if PYX:
    import tempfile
    import os
    filename = tempfile.mktemp(suffix=".svg")
    plist = PacketList([IP()/TCP()])
    plist.svgdump(filename)
    assert os.path.exists(filename)
    os.unlink(filename)

= __getstate__ / __setstate__ (used by pickle)

import pickle

frm = Ether(src='00:11:22:33:44:55', dst='00:22:33:44:55:66')/Raw()
frm.time = EDecimal(123.45)
frm.sniffed_on = "iface"
frm.wirelen = 1
pl = PacketList(res=[frm, frm], name='WhatAGreatName')
pickled = pickle.dumps(pl)
pl = pickle.loads(pickled)
assert pl.listname == "WhatAGreatName"
assert len(pl) == 2
assert pl[0].time == 123.45
assert pl[0].sniffed_on == "iface"
assert pl[0].wirelen == 1
assert pl[0][Ether].src == '00:11:22:33:44:55'
assert pl[1][Ether].dst == '00:22:33:44:55:66'

= EDecimal

# GH4488
p1, p2 = EDecimal('1722417787.778435252'), EDecimal('1722417787.778435216')
assert p1 != p2
assert p1 > p2
assert not (p1 < p2)
assert p1 == 1722417787.778435252  # float test
assert p2 == 1722417787.778435216
assert (p1, 0) > (p2, 1)

############
############
+ Scapy version

= _version()

import os
from datetime import datetime, timezone

version_filename = os.path.join(scapy._SCAPY_PKG_DIR, "VERSION")
mtime = datetime.fromtimestamp(os.path.getmtime(scapy.__file__), timezone.utc)
version = "2.0.0"
with open(version_filename, "w") as fd:
    fd.write(version)

os.environ["SCAPY_VERSION"] = "9.9.9"
assert scapy._version() == "9.9.9"
del os.environ["SCAPY_VERSION"]

assert scapy._version() == version
os.unlink(version_filename)

from unittest import mock
with mock.patch("scapy._version_from_git_archive") as archive:
    archive.return_value = "4.4.4"
    assert scapy._version() == "4.4.4"
    archive.side_effect = ValueError()
    with mock.patch("scapy._version_from_git_describe") as git:
        git.return_value = "3.3.3"
        assert scapy._version() == "3.3.3"
        git.side_effect = Exception()
        assert scapy._version() == mtime.strftime("%Y.%m.%d")
        with mock.patch("os.path.getmtime") as getmtime:
            getmtime.side_effect = Exception()
            assert scapy._version() == "0.0.0"


= UTscapy HTML output

import tempfile, os
from scapy.tools.UTscapy import TestCampaign, pack_html_campaigns
test_campaign = TestCampaign("test")
test_campaign.output_file = tempfile.mktemp()
html = pack_html_campaigns([test_campaign], None, local=True)
dirname = os.path.dirname(test_campaign.output_file)
filename_js = "%s/UTscapy.js" % dirname
filename_css = "%s/UTscapy.css" % dirname
assert os.path.isfile(filename_js)
assert os.path.isfile(filename_css)
os.remove(filename_js)
os.remove(filename_css)

= test get_temp_dir

dname = get_temp_dir()
assert os.path.isdir(dname)

= test fragleak functions
~ netaccess linux fragleak

from unittest import mock

@mock.patch("scapy.layers.inet.conf.L3socket")
@mock.patch("scapy.layers.inet.select.select")
@mock.patch("scapy.layers.inet.sr1")
def _test_fragleak(func, sr1, select, L3socket):
    packets = [IP(src="4.4.4.4")/ICMP()/IPerror(dst="8.8.8.8")/conf.padding_layer(load=b"greatdata")]
    iterator = iter(packets)
    ne = lambda *args, **kwargs: next(iterator)
    L3socket.side_effect = lambda: Bunch(recv=ne, send=lambda x: None)
    sr1.side_effect = ne
    select.side_effect = lambda a, b, c, d: a+b+c
    with ContextManagerCaptureOutput() as cmco:
        func("8.8.8.8", count=1)
        out = cmco.get_output()
        return "greatdata" in out

assert _test_fragleak(fragleak)
assert _test_fragleak(fragleak2)
