# blob: bddaf06315d24727e474063223b886aaef91e944 [file] [log] [blame]
% Regression tests for Scapy
# More information at http://www.secdev.org/projects/UTscapy/
############
############
+ Information on Scapy
= Setup
def expect_exception(e, c):
    """Call c() and tell whether it raised an exception matching e.
    Returns True when c() raises e, False when c() returns normally.
    An exception that does not match e is not handled here.
    """
    try:
        c()
    except e:
        return True
    else:
        return False
= Get conf
~ conf command
* Dump the current configuration
conf
IP().src
conf.loopback_name
= Test module version detection
~ conf
class FakeModule(object):
__version__ = "v1.12"
class FakeModule2(object):
__version__ = "5.143.3.12"
class FakeModule3(object):
__version__ = "v2.4.2.dev42"
from scapy.config import _version_checker
assert _version_checker(FakeModule, (1,11,5))
assert not _version_checker(FakeModule, (1,13))
assert _version_checker(FakeModule2, (5, 1))
assert not _version_checker(FakeModule2, (5, 143, 4))
assert _version_checker(FakeModule3, (2, 4, 2))
= Check Scapy version
from unittest import mock
import scapy
from scapy import _parse_tag, _version_from_git_describe
from scapy.config import _version_checker
# Canned reply returned by the patched Popen().communicate()
b = Bunch(returncode=0, communicate=lambda *args, **kargs: (b"v2.4.5rc1-261-g44b98e14", None))
with mock.patch('scapy.subprocess.Popen', return_value=b), mock.patch('scapy.os.path.isdir', return_value=True):
    class GitModuleScapy(object):
        __version__ = _version_from_git_describe()

# GH3847
# With isdir() patched to return False, the helper is expected to raise ValueError
with mock.patch('scapy.subprocess.Popen', return_value=b), mock.patch('scapy.os.path.isdir', return_value=False):
    try:
        _version_from_git_describe()
    except ValueError:
        pass
    else:
        assert False

assert GitModuleScapy.__version__ == '2.4.5rc1.dev261'
assert _version_checker(GitModuleScapy, (2, 4, 5))
= List layers
~ conf command
ls()
= List layers - advanced
~ conf command
with ContextManagerCaptureOutput() as cmco:
ls("IP", case_sensitive=True)
result_ls = cmco.get_output().split("\n")
assert all("IP" in x for x in result_ls if x.strip())
assert len(result_ls) >= 3
= List packet fields - ls
~ command
with ContextManagerCaptureOutput() as cmco:
ls(ARP(hwsrc="aa:aa:aa:aa:aa:aa", psrc="1.1.1.1"))
result_ls = cmco.get_output().split("\n")
result_ls
assert result_ls[5] == "hwsrc : MultipleTypeField (SourceMACField, StrFixedLenField) = 'aa:aa:aa:aa:aa:aa' ('None')"
assert result_ls[6] == "psrc : MultipleTypeField (SourceIPField, SourceIP6Field, StrFixedLenField) = '1.1.1.1' ('None')"
= List commands
~ conf command
lsc()
= List contribs
~ command
def test_list_contrib():
    """Capture what list_contrib() prints, then check the http2 entry and the line count."""
    with ContextManagerCaptureOutput() as captured:
        list_contrib()
        listing = captured.get_output()
    assert "http2 : HTTP/2 (RFC 7540, RFC 7541) status=loads" in listing
    listing_lines = listing.split('\n')
    assert len(listing_lines) > 40

test_list_contrib()
= Test packet show() on LatexTheme
* with LatexTheme
class SmallPacket(Packet):
    # Single one-byte field: keeps the rendered show() output short
    fields_desc = [ByteField("a", 0)]

# LatexTheme is swapped in only for the duration of the check. The finally
# clause puts the previous theme back even when the assertion fails, so a
# failure here does not leave LatexTheme active for the tests that follow.
conf_color_theme = conf.color_theme
conf.color_theme = LatexTheme()
try:
    pkt = SmallPacket()
    with ContextManagerCaptureOutput() as cmco:
        pkt.show()
        result = cmco.get_output().strip()
    assert result == '\\#\\#\\#[ \\textcolor{red}{\\bf SmallPacket} ]\\#\\#\\#\n \\textcolor{blue}{a} = \\textcolor{purple}{0}'
finally:
    conf.color_theme = conf_color_theme
= Test rfc()
~ command
dat = rfc(IP, ret=True).split("\n")
assert dat[0].replace(" ", "").strip() == "0123"
assert "0123456789" in dat[1].replace(" ", "")
for l in dat:
# only upper case and +-
assert re.match(r"[A-Z+-]*", l)
# Add a space before each + to avoid conflicts with UTscapy !
# They will be stripped below
result = """
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|VERSION| IHL | TOS | LEN |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| ID |FLAGS| FRAG |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| TTL | PROTO | CHKSUM |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SRC |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| DST |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| OPTIONS |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Fig. IP
""".strip()
result = [x.strip() for x in result.split("\n")]
output = [x.strip() for x in rfc(IP, ret=True).strip().split("\n")]
assert result == output
result = """
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| CODE | ID | LEN |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| TYPE |L|M|S|RES|VERSI| MESSAGE LEN |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| | DATA |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Fig. EAP_TTLS
""".strip()
result = [x.strip() for x in result.split("\n")]
output = [x.strip() for x in rfc(EAP_TTLS, ret=True).strip().split("\n")]
assert result == output
result = """
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|VERSION| TC | FL |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| PLEN | NH | HLIM |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| SRC |
+ +
| |
+ +
| |
+ +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| DST |
+ +
| |
+ +
| |
+ +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Fig. IPv6
""".strip()
result = [x.strip() for x in result.split("\n")]
output = [x.strip() for x in rfc(IPv6, ret=True).strip().split("\n")]
assert result == output
class TestPad(Packet):
fields_desc = [ShortField("f0", 0),
ShortField("f1", 0),
PadField(ByteField("f2", 1), 8),
PadField(ShortField("f3", 0), 4)]
result = """
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| F0 | F1 |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| F2 | padding |
+-+-+-+-+-+-+-+-+ +
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| F3 | padding |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
Fig. TestPad
""".strip()
result = [x.strip() for x in result.split("\n")]
output = [x.strip() for x in rfc(TestPad, ret=True).strip().split("\n")]
assert result == output
= Check that all contrib modules are well-configured
~ command
list_contrib(_debug=True)
= Configuration
~ conf
conf.debug_dissector = True
= Configuration conf.use_* LINUX
~ linux
# Assigning True to conf.use_bpf is expected to raise here. The failure
# marker lives in the else clause: placed inside the try body, its
# AssertionError would be caught by the handler and the check could never fail.
try:
    conf.use_bpf = True
except Exception:
    pass
else:
    assert False, "conf.use_bpf = True did not raise"

assert not conf.use_bpf
= Configuration conf.use_* WINDOWS
~ windows
# Assigning True to conf.use_bpf is expected to raise here. The failure
# marker lives in the else clause: placed inside the try body, its
# AssertionError would be caught by the handler and the check could never fail.
try:
    conf.use_bpf = True
except Exception:
    pass
else:
    assert False, "conf.use_bpf = True did not raise"

assert not conf.use_bpf
= Configuration conf.use_pcap
~ linux libpcap
if not conf.use_pcap:
assert not conf.iface.provider.libpcap
conf.use_pcap = True
assert conf.iface.provider.libpcap
for iface in conf.ifaces.values():
assert iface.provider.libpcap or iface.is_valid() == False
conf.use_pcap = False
assert not conf.iface.provider.libpcap
= Test layer filtering
~ filter
pkt = NetflowHeader()/NetflowHeaderV5()/NetflowRecordV5()
conf.layers.filter([NetflowHeader, NetflowHeaderV5])
assert NetflowRecordV5 not in NetflowHeader(bytes(pkt))
conf.layers.unfilter()
assert NetflowRecordV5 in NetflowHeader(bytes(pkt))
###########
###########
= UTscapy route check
* Check that UTscapy has correctly replaced the routes. Many tests won't work otherwise
p = IP().src
p
assert p == "127.0.0.1"
############
############
+ Scapy functions tests
= Interface related functions
from unittest import mock
conf.iface
get_if_addr(conf.iface)
get_if_hwaddr(conf.iface)
bytes_hex(get_if_raw_addr(conf.iface))
def get_dummy_interface():
    """Register a fake interface named "dummy0" in conf.ifaces and return that name"""
    dummy_name = "dummy0"
    conf.ifaces._add_fake_iface(dummy_name)
    return dummy_name

get_if_raw_addr(get_dummy_interface())
get_if_list()
get_working_if()
get_if_raw_addr6(conf.iface)
= More Interfaces related functions
# Test name resolution
old = conf.iface
conf.iface = conf.iface.name
assert conf.iface == old
assert isinstance(conf.iface, NetworkInterface)
assert conf.iface.is_valid()
from unittest import mock
@mock.patch("scapy.interfaces.conf.route.routes", [])
@mock.patch("scapy.interfaces.conf.ifaces.values")
def _test_get_working_if(rou):
rou.side_effect = lambda: []
assert get_working_if() is None
assert conf.iface + "a" # left +
assert "hey! are you, ready to go ? %s" % conf.iface # format
assert "cuz you know the way to go" + conf.iface # right +
_test_get_working_if()
= Test conf.ifaces
conf.iface
conf.ifaces
assert conf.iface in conf.ifaces.values()
assert conf.ifaces.dev_from_index(conf.iface.index) == conf.iface
assert conf.ifaces.dev_from_networkname(conf.iface.network_name) == conf.iface
conf.ifaces.data = {'a': NetworkInterface(InterfaceProvider(), {"name": 'a', "network_name": 'a', "description": 'a', "ips": ["127.0.0.1", "::1", "::2", "127.0.0.2"], "mac": 'aa:aa:aa:aa:aa:aa'})}
with ContextManagerCaptureOutput() as cmco:
conf.ifaces.show()
output = cmco.get_output()
data = """
Source Index Name MAC IPv4 IPv6
Unknown 0 a aa:aa:aa:aa:aa:aa 127.0.0.1 ::1
127.0.0.2 ::2
""".strip()
output = [x.strip() for x in output.strip().split("\n")]
data = [x.strip() for x in data.strip().split("\n")]
assert output == data
conf.ifaces.reload()
= Test extcap detection in conf.ifaces
~ linux extcap
import os
from scapy.libs.extcap import load_extcap
_bkp_extcap = conf.prog.extcap_folders
_bkp_providers = conf.ifaces.providers.copy()
conf.ifaces.providers.clear()
# Create some sort of extcap parody program
extcapfld = get_temp_dir()
extcapprog = os.path.join(extcapfld, "runner.sh")
data = """#!/usr/bin/env python3
import struct
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--extcap-interfaces', action='store_true')
parser.add_argument('--capture', action='store_true')
parser.add_argument('--extcap-config', action='store_true')
parser.add_argument('--scan-follow-rsp', action='store_true')
parser.add_argument('--scan-follow-aux', action='store_true')
parser.add_argument('--extcap-interface', type=str)
parser.add_argument('--fifo', type=str)
args = parser.parse_args()
if args.extcap_interfaces:
# List interfaces
print(bytes.fromhex("0a657874636170207b76657273696f6e3d342e312e317d7b646973706c61793d6e524620536e696666657220666f7220426c7565746f6f7468204c457d7b68656c703d68747470733a2f2f7777772e6e6f7264696373656d692e636f6d2f536f6674776172652d616e642d546f6f6c732f446576656c6f706d656e742d546f6f6c732f6e52462d536e69666665722d666f722d426c7565746f6f74682d4c457d0a696e74657266616365207b76616c75653d2f6465762f747479555342352d4e6f6e657d7b646973706c61793d6e524620536e696666657220666f7220426c7565746f6f7468204c457d0a636f6e74726f6c207b6e756d6265723d307d7b747970653d73656c6563746f727d7b646973706c61793d4465766963657d7b746f6f6c7469703d446576696365206c6973747d0a636f6e74726f6c207b6e756d6265723d317d7b747970653d73656c6563746f727d7b646973706c61793d4b65797d7b746f6f6c7469703d7d0a636f6e74726f6c207b6e756d6265723d327d7b747970653d737472696e677d7b646973706c61793d56616c75657d7b746f6f6c7469703d3620646967697420706173736b6579206f72203136206f7220333220627974657320656e6372797074696f6e206b657920696e2068657861646563696d616c207374617274696e67207769746820273078272c2062696720656e6469616e20666f726d61742e49662074686520656e7465726564206b65792069732073686f72746572207468616e203136206f722033322062797465732c2069742077696c6c206265207a65726f2d70616464656420696e2066726f6e74277d7b76616c69646174696f6e3d5c625e28285b302d395d7b367d297c2830785b302d39612d66412d465d7b312c36347d297c285b302d39412d46612d665d7b327d5b3a2d5d297b357d285b302d39412d46612d665d7b327d2920287075626c69637c72616e646f6d2929245c627d0a636f6e74726f6c207b6e756d6265723d337d7b747970653d737472696e677d7b646973706c61793d41647620486f707d7b64656661756c743d33372c33382c33397d7b746f6f6c7469703d4164766572746973696e67206368616e6e656c20686f702073657175656e63652e204368616e676520746865206f7264657220696e2077686963682074686520736e6966666572207377697463686573206164766572746973696e67206368616e6e656c732e2056616c6964206368616e6e656c73206172652033372c20333820616e642033392073657061726174656420627920636f6d6d612e7d7b76616c69646174696f6e3d5e5c732a282833377c33387c3339295c732a2c5c732a297b302c327d2833377c3
3387c3339297b317d5c732a247d7b72657175697265643d747275657d0a636f6e74726f6c207b6e756d6265723d377d7b747970653d627574746f6e7d7b646973706c61793d436c6561727d7b746f6f6c746f703d436c656172206f722072656d6f7665206465766963652066726f6d20446576696365206c6973747d0a636f6e74726f6c207b6e756d6265723d347d7b747970653d627574746f6e7d7b726f6c653d68656c707d7b646973706c61793d48656c707d7b746f6f6c7469703d416363657373207573657220677569646520286c61756e636865732062726f77736572297d0a636f6e74726f6c207b6e756d6265723d357d7b747970653d627574746f6e7d7b726f6c653d726573746f72657d7b646973706c61793d44656661756c74737d7b746f6f6c7469703d52657365747320746865207573657220696e7465726661636520616e6420636c6561727320746865206c6f672066696c657d0a636f6e74726f6c207b6e756d6265723d367d7b747970653d627574746f6e7d7b726f6c653d6c6f676765727d7b646973706c61793d4c6f677d7b746f6f6c7469703d4c6f672070657220696e746572666163657d0a76616c7565207b636f6e74726f6c3d307d7b76616c75653d207d7b646973706c61793d416c6c206164766572746973696e6720646576696365737d7b64656661756c743d747275657d0a76616c7565207b636f6e74726f6c3d307d7b76616c75653d5b30302c30302c30302c30302c30302c30302c305d7d7b646973706c61793d466f6c6c6f772049524b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d307d7b646973706c61793d4c656761637920506173736b65797d7b64656661756c743d747275657d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d317d7b646973706c61793d4c6567616379204f4f4220646174617d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d327d7b646973706c61793d4c6567616379204c544b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d337d7b646973706c61793d5343204c544b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d347d7b646973706c61793d53432050726976617465204b65797d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d357d7b646973706c61793d49524b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d367d7b646973706c61793d416464204c4520616464726573737d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d377d7b646973706c61793d466f6c6c6f77204c4520616464726573737d").decode())
elif args.extcap_interface and args.extcap_config:
# List config
print(bytes.fromhex("617267207b6e756d6265723d307d7b63616c6c3d2d2d6f6e6c792d6164766572746973696e677d7b646973706c61793d4f6e6c79206164766572746973696e67207061636b6574737d7b746f6f6c7469703d54686520736e69666665722077696c6c206f6e6c792063617074757265206164766572746973696e67207061636b6574732066726f6d207468652073656c6563746564206465766963657d7b747970653d626f6f6c666c61677d7b736176653d747275657d0a617267207b6e756d6265723d317d7b63616c6c3d2d2d6f6e6c792d6c65676163792d6164766572746973696e677d7b646973706c61793d4f6e6c79206c6567616379206164766572746973696e67207061636b6574737d7b746f6f6c7469703d54686520736e69666665722077696c6c206f6e6c792063617074757265206c6567616379206164766572746973696e67207061636b6574732066726f6d207468652073656c6563746564206465766963657d7b747970653d626f6f6c666c61677d7b736176653d747275657d0a617267207b6e756d6265723d327d7b63616c6c3d2d2d7363616e2d666f6c6c6f772d7273707d7b646973706c61793d46696e64207363616e20726573706f6e736520646174617d7b746f6f6c7469703d54686520736e69666665722077696c6c20666f6c6c6f77207363616e20726571756573747320616e64207363616e20726573706f6e73657320696e207363616e206d6f64657d7b747970653d626f6f6c666c61677d7b64656661756c743d747275657d7b736176653d747275657d0a617267207b6e756d6265723d337d7b63616c6c3d2d2d7363616e2d666f6c6c6f772d6175787d7b646973706c61793d46696e6420617578696c6961727920706f696e74657220646174617d7b746f6f6c7469703d54686520736e69666665722077696c6c20666f6c6c6f772061757820706f696e7465727320696e207363616e206d6f64657d7b747970653d626f6f6c666c61677d7b64656661756c743d747275657d7b736176653d747275657d0a617267207b6e756d6265723d337d7b63616c6c3d2d2d636f6465647d7b646973706c61793d5363616e20616e6420666f6c6c6f772064657669636573206f6e204c4520436f646564205048597d7b746f6f6c7469703d5363616e20666f72206465766963657320616e6420666f6c6c6f772061647665727469736572206f6e204c4520436f646564205048597d7b747970653d626f6f6c666c61677d7b64656661756c743d66616c73657d7b736176653d747275657d").decode())
elif args.capture and args.extcap_interface and args.fifo:
# Capture
pkts = [
bytes.fromhex("ffffffffffff00000000000008004500001c0001000040117cce7f0000017f0000010035003500080172")
]
with open(args.fifo, "wb", 0) as fd:
# header
fd.write(
struct.pack(
"IHHIIII",
0xa1b2c3d4,
2, 4, 0, 0, 65535, 1
)
)
for pkt in pkts:
fd.write(struct.pack("IIII", 0, 0, len(pkt), len(pkt)))
fd.write(bytes(pkt))
else:
raise ValueError("Bad arguments")
""".strip()
with open(extcapprog, "w") as fd:
fd.write(data)
print(data)
os.chmod(extcapprog, 0o777)
# Inject and load provider
conf.prog.extcap_folders = [extcapfld]
load_extcap()
print(conf.ifaces.providers)
conf.ifaces.reload()
# Now do the tests
iface = conf.ifaces.dev_from_networkname('/dev/ttyUSB5-None')
assert iface.name == "nRF Sniffer for Bluetooth LE"
sock = iface.l2listen()(iface=iface)
pkts = sock.sniff(timeout=2)
sock.close()
assert UDP in pkts[0]
config = iface.get_extcap_config()
assert config["arg"] == [
('0', '--only-advertising', 'Only advertising packets', '', ''),
('1', '--only-legacy-advertising', 'Only legacy advertising packets', '', ''),
('2', '--scan-follow-rsp', 'Find scan response data', 'true', ''),
('3', '--scan-follow-aux', 'Find auxiliary pointer data', 'true', ''),
('3', '--coded', 'Scan and follow devices on LE Coded PHY', 'false', '')
]
# Restore
conf.prog.extcap_folders = _bkp_extcap
conf.ifaces.providers = _bkp_providers
conf.ifaces.reload()
= Test read_routes6() - default output
routes6 = read_routes6()
if WINDOWS:
from scapy.arch.windows import _route_add_loopback
_route_add_loopback(routes6, True)
routes6
# Expected results:
# - one route if there is only the loopback interface
# - one route if IPv6 is supported but disabled on network interfaces
# - three routes if there is a network interface
# - on OpenBSD, only two routes on lo0 are expected
if routes6:
iflist = get_if_list()
if WINDOWS:
from scapy.arch.windows import _route_add_loopback
_route_add_loopback(ipv6=True, iflist=iflist)
if OPENBSD:
len(routes6) >= 2
elif iflist == [conf.loopback_name]:
len(routes6) == 1
elif len(iflist) >= 2:
len(routes6) >= 1
else:
False
else:
# IPv6 seems disabled. Force a route to ::1
conf.route6.routes.append(("::1", 128, "::", conf.loopback_name, ["::1"], 1))
conf.route6.ipv6_ifaces = set([conf.loopback_name])
True
= Build HBHOptUnknown for IPv6ExtHdrHopByHop with disabled autopad
~ ipv6 hbh opt
* Build the HBHOptUnknown of IPv6ExtHdrHopByHop with autopad=0
v6Opt = HBHOptUnknown(otype=3, optlen=7, optdata="Beijing")
pkt = Ether()/IPv6()/IPv6ExtHdrHopByHop(autopad=0, options=[v6Opt, ])
pkt.build()
= Build HBHOptUnknown for IPv6ExtHdrDestOpt with disabled autopad
~ ipv6 hbh opt
* Build the HBHOptUnknown of IPv6ExtHdrDestOpt with autopad=0
v6Opt = HBHOptUnknown(otype=3, optlen=6, optdata="Haikou")
pkt = Ether()/IPv6()/IPv6ExtHdrDestOpt(autopad=0, options=[v6Opt, ])
pkt.build()
= Test read_routes6() - check mandatory routes
import re
ll_route = re.compile(r"fe80:\d{0,2}:")
# match fe80::, fe80:5:, etc. (if scoped)
conf.route6
if len(routes6) > 2 and not WINDOWS:
# Identify routes to fe80::/64
assert sum(1 for r in routes6 if r[0] == "::1" and r[4] == ["::1"]) >= 1
if len(iflist) >= 2:
assert sum(1 for r in routes6 if ll_route.match(r[0]) and r[1] == 64) >= 1
try:
# Identify a route to a node IPv6 link-local address
assert sum(1 for r in routes6 if in6_islladdr(r[0]) and r[1] == 128) >= 1
except:
# IPv6 is not available, but we still check the loopback
assert conf.route6.route("::/0") == (conf.loopback_name, "::", "::")
assert sum(1 for r in routes6 if r[1] == 128 and r[4] == ["::1"]) >= 1
else:
True
= Test ifchange()
conf.route6.ifchange(conf.loopback_name, "::1/128")
if WINDOWS:
conf.netcache.in6_neighbor["::1"] = "ff:ff:ff:ff:ff:ff" # Restore fake cache
True
= Packet.route()
assert (Ether() / ARP()).route()[0] is not None
assert (Ether() / ARP()).payload.route()[0] is not None
assert (ARP(ptype=0, pdst="hello. this isn't a valid IP")).route()[0] is None
= utils/in4_is*
assert in4_ismaddr("224.0.0.1")
assert not in4_ismaddr("192.168.0.1")
assert in4_ismaddr("239.0.0.255")
assert in4_ismlladdr("224.0.0.1")
assert in4_ismlladdr("224.0.0.255")
assert not in4_ismlladdr("224.0.1.255")
assert in4_ismgladdr("235.0.0.1")
assert not in4_ismgladdr("224.0.0.1")
assert not in4_ismgladdr("239.0.0.1")
assert in4_ismlsaddr("239.0.0.1")
assert not in4_ismlsaddr("224.0.0.1")
assert in4_isaddrllallnodes("224.0.0.1")
assert not in4_isaddrllallnodes("224.0.0.3")
assert in4_getnsmac(b'\xe0\x00\x00\x01') == '01:00:5e:00:00:01'
assert getmacbyip("224.0.0.1") == '01:00:5e:00:00:01'
= plain_str test
data = b"\xffsweet\xef celestia\xab"
assert plain_str(data) == "\\xffsweet\\xef celestia\\xab"
############
############
+ compat.py
= test bytes_hex/hex_bytes
monty_data = b"Stop! Who approaches the Bridge of Death must answer me these questions three, 'ere the other side he see."
hex_data = bytes_hex(monty_data)
assert hex_data == b'53746f70212057686f20617070726f61636865732074686520427269646765206f66204465617468206d75737420616e73776572206d65207468657365207175657374696f6e732074687265652c202765726520746865206f746865722073696465206865207365652e'
assert hex_bytes(hex_data) == monty_data
= orb/chb
assert orb(b"\x01"[0]) == 1
assert chb(1) == b"\x01"
############
############
+ Main.py tests
= Pickle and unpickle a packet
import pickle
a = IP(dst="192.168.0.1")/UDP()
b = pickle.dumps(a)
c = pickle.loads(b)
assert c[IP].dst == "192.168.0.1"
assert raw(c) == raw(a)
= Usage test
from scapy.main import _usage
try:
_usage()
assert False
except SystemExit:
assert True
= Session test
import builtins
# This is automatic when using the console
def get_var(var):
    """Return the value stored under var in the "scapy_session" dict held by builtins."""
    session = builtins.__dict__["scapy_session"]
    return session[var]

def set_var(var, value):
    """Store value under var in the "scapy_session" dict held by builtins."""
    session = builtins.__dict__["scapy_session"]
    session[var] = value

def del_var(var):
    """Delete var from the "scapy_session" dict held by builtins."""
    session = builtins.__dict__["scapy_session"]
    del session[var]

init_session(None, {"init_value": 123})
set_var("test_value", "8.8.8.8") # test_value = "8.8.8.8"
save_session()
del_var("test_value")
load_session()
update_session()
assert get_var("test_value") == "8.8.8.8" #test_value == "8.8.8.8"
assert get_var("init_value") == 123
= Session test with fname
session_name = tempfile.mktemp()
init_session(session_name)
set_var("test_value", IP(dst="192.168.0.1"))  # test_value = IP(dst="192.168.0.1")
save_session(fname="%s.dat" % session_name)
del_var("test_value")
set_var("z", True)  # z = True
load_session(fname="%s.dat" % session_name)
# "z" was set after the session was saved, so looking it up after the reload
# is expected to fail. The failure marker lives in the else clause: placed
# inside the try body, its AssertionError would be swallowed by the handler
# and this check could never fail.
try:
    get_var("z")
except KeyError:
    pass
else:
    assert False, "z is still present after load_session()"

set_var("z", False)  # z = False
update_session(fname="%s.dat" % session_name)
assert get_var("test_value").dst == "192.168.0.1"  # test_value.dst == "192.168.0.1"
assert not get_var("z")
= Clear session files
os.remove("%s.dat" % session_name)
= Test temporary file creation
~ ci_only
scapy_delete_temp_files()
tmpfile = get_temp_file(autoext=".ut")
tmpfile
if WINDOWS:
assert "scapy" in tmpfile and tmpfile.lower().startswith('c:\\users\\appveyor\\appdata\\local\\temp')
else:
import platform
BYPASS_TMP = platform.python_implementation().lower() == "pypy" or DARWIN
assert "scapy" in tmpfile and (BYPASS_TMP == True or "/tmp/" in tmpfile)
assert conf.temp_files[0].endswith(".ut")
scapy_delete_temp_files()
assert len(conf.temp_files) == 0
= Emulate interact()
~ interact
import sys
from unittest import mock
from scapy.main import interact
from scapy.main import DEFAULT_PRESTART_FILE, DEFAULT_PRESTART, _read_config_file
_read_config_file(DEFAULT_PRESTART_FILE, _locals=globals(), default=DEFAULT_PRESTART)
# By now .config/scapy/startup.py should have been created
with open(DEFAULT_PRESTART_FILE, "r") as fd:
OLD_DEFAULT_PRESTART = fd.read()
with open(DEFAULT_PRESTART_FILE, "w+") as fd:
fd.write("conf.interactive_shell = 'ipython'")
# Detect IPython
try:
import IPython
except:
code_interact_import = "scapy.main.code.interact"
else:
code_interact_import = "IPython.embed"
@mock.patch(code_interact_import)
def interact_emulator(code_int, extra_args=None):
    """Run interact() with the target named by code_interact_import mocked out.
    :param code_int: mock object injected by mock.patch
    :param extra_args: optional list of extra command-line arguments
    """
    # None instead of a shared mutable [] default
    extra = [] if extra_args is None else extra_args
    try:
        code_int.side_effect = lambda *args, **kwargs: lambda *args, **kwargs: None
        interact(argv=["-s scapy1"] + extra, mybanner="What a test")
    finally:
        # Always put the default prompt back, whatever interact() did
        sys.ps1 = ">>> "

interact_emulator()  # Default
# An unknown option must make interact() bail out. SystemExit is listed
# explicitly because it is not an Exception subclass. The failure marker is in
# the else clause so that the handler cannot swallow it.
try:
    interact_emulator(extra_args=["-?"])  # Failing
except (Exception, SystemExit):
    pass
else:
    assert False, "interact() accepted the unknown option -?"

interact_emulator(extra_args=["-d"])  # Extended
= Emulate interact() and test startup.py with ptpython
~ interact
import sys
from unittest import mock
from scapy.main import DEFAULT_PRESTART_FILE, DEFAULT_PRESTART, _read_config_file
_read_config_file(DEFAULT_PRESTART_FILE, _locals=globals(), default=DEFAULT_PRESTART)
# By now .config/scapy/startup.py should have been created
with open(DEFAULT_PRESTART_FILE, "w+") as fd:
fd.write("conf.interactive_shell = 'ptpython'")
called = []
def checker(*args, **kwargs):
    """Check the "locals" and "history_filename" keyword arguments, then record the call in called."""
    embed_locals = kwargs.pop("locals")
    assert embed_locals["IP"]
    embed_histfile = kwargs.pop("history_filename")
    assert embed_histfile == conf.histfile
    called.append(True)

ptpython_mocked_module = Bunch(
repl=Bunch(
embed=checker
)
)
modules_patched = {
"ptpython": ptpython_mocked_module,
"ptpython.repl": ptpython_mocked_module.repl,
"ptpython.repl.embed": ptpython_mocked_module.repl.embed,
}
with mock.patch.dict("sys.modules", modules_patched):
try:
interact()
finally:
sys.ps1 = ">>> "
# Restore
with open(DEFAULT_PRESTART_FILE, "w") as fd:
print(OLD_DEFAULT_PRESTART)
r = fd.write(OLD_DEFAULT_PRESTART)
assert called
= Test explore() with GUI mode
~ command
from unittest import mock
def test_explore_gui(is_layer, layer):
    """Run explore() against a mocked prompt_toolkit and return what it printed.
    :param is_layer: when true the mocked button dialog answers "layers", otherwise "contribs"
    :param layer: value answered by the mocked radio-list dialog
    """
    def fake_radiolist_dialog(*args, **kargs):
        return layer
    def fake_button_dialog(*args, **kargs):
        return "layers" if is_layer else "contribs"
    fake_dialogs = Bunch(radiolist_dialog=fake_radiolist_dialog, button_dialog=fake_button_dialog)
    fake_shortcuts = Bunch(dialogs=fake_dialogs)
    fake_formatted_text = Bunch(HTML=lambda x: x)
    prompt_toolkit_mocked_module = Bunch(shortcuts=fake_shortcuts, formatted_text=fake_formatted_text, __version__="2.0.0")
    # a mock.patch isn't enough to mock a module. Let's roll sys.modules
    modules_patched = {
        "prompt_toolkit": prompt_toolkit_mocked_module,
        "prompt_toolkit.shortcuts": fake_shortcuts,
        "prompt_toolkit.shortcuts.dialogs": fake_dialogs,
        "prompt_toolkit.formatted_text": fake_formatted_text,
    }
    with mock.patch.dict("sys.modules", modules_patched):
        with ContextManagerCaptureOutput() as captured:
            explore()
            printed = captured.get_output()
    return printed

conf.interactive = True
explore_dns = test_explore_gui(True, "scapy.layers.dns")
assert "DNS" in explore_dns
assert "DNS Question Record" in explore_dns
assert "DNSRRNSEC3" in explore_dns
assert "DNS TSIG Resource Record" in explore_dns
explore_avs = test_explore_gui(False, "avs")
assert "AVSWLANHeader" in explore_avs
assert "AVS WLAN Monitor Header" in explore_avs
= Test explore() with non-GUI mode
~ command
def test_explore_non_gui(layer):
    """Return the text printed by explore(layer)."""
    with ContextManagerCaptureOutput() as captured:
        explore(layer)
        printed = captured.get_output()
    return printed

explore_dns = test_explore_non_gui("scapy.layers.dns")
assert "DNS" in explore_dns
assert "DNS Question Record" in explore_dns
assert "DNSRRNSEC3" in explore_dns
assert "DNS TSIG Resource Record" in explore_dns
explore_avs = test_explore_non_gui("avs")
assert "AVSWLANHeader" in explore_avs
assert "AVS WLAN Monitor Header" in explore_avs
assert test_explore_non_gui("scapy.layers.dns") == test_explore_non_gui("dns")
assert test_explore_non_gui("scapy.contrib.avs") == test_explore_non_gui("avs")
try:
explore("unknown_module")
assert False # The previous should have raised an exception
except Scapy_Exception:
pass
= Test load_contrib overwrite
load_contrib("gtp")
assert GTPHeader.__module__ == "scapy.contrib.gtp"
load_contrib("gtp_v2")
assert GTPHeader.__module__ == "scapy.contrib.gtp_v2"
load_contrib("gtp")
assert GTPHeader.__module__ == "scapy.contrib.gtp"
= Test load_contrib failure
# NOTE(review): the bare except below also catches the AssertionError raised
# by `assert False`, so this test passes whether or not load_contrib() raises.
# Confirm which exception load_contrib() is meant to raise for an unknown
# name before tightening the handler.
try:
    load_contrib("doesnotexist")
    assert False
except:
    pass
= Test sane function
sane("A\x00\xFFB") == "A..B"
= Test lhex function
assert lhex(42) == "0x2a"
assert lhex((28,7)) == "(0x1c, 0x7)"
assert lhex([28,7]) == "[0x1c, 0x7]"
= Test restart function
from unittest import mock
conf.interactive = True
try:
from scapy.utils import restart
import os
@mock.patch("os.execv")
@mock.patch("subprocess.call")
@mock.patch("os._exit")
def _test(e, m, m2):
def check(x, y=[]):
z = [x] + y if not isinstance(x, list) else x + y
assert os.path.isfile(z[0])
assert os.path.isfile(z[1])
return 0
m2.side_effect = check
m.side_effect = check
e.side_effect = lambda x: None
restart()
_test()
finally:
conf.interactive = False
= Test linehexdump function
# BlackAndWhite is swapped in only for the comparison. The finally clause
# restores the previous theme even when the assertion fails, so a failure
# here does not change the theme seen by the tests that follow.
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
try:
    assert linehexdump(Ether(src="00:01:02:03:04:05"), dump=True) == 'FF FF FF FF FF FF 00 01 02 03 04 05 90 00 ..............'
finally:
    conf.color_theme = conf_color_theme
= Test chexdump function
chexdump(Ether(src="00:01:02:02:04:05"), dump=True) == "0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x02, 0x02, 0x04, 0x05, 0x90, 0x00"
= Test repr_hex function
repr_hex("scapy") == "7363617079"
= Test hexstr function
hexstr(b"A\x00\xFFB") == "41 00 FF 42 A..B"
= Test fletcher16 functions
assert fletcher16_checksum(b"\x28\x07") == 22319
assert fletcher16_checkbytes(b"\x28\x07", 1) == b"\xaf("
= Test hexdiff function
~ not_pypy
def test_hexdiff(a, b, algo=None, autojunk=False):
    """Return the text printed by hexdiff(a, b) while the BlackAndWhite theme is active.
    :param a: first object handed to hexdiff()
    :param b: second object handed to hexdiff()
    :param algo: forwarded to hexdiff() as algo
    :param autojunk: forwarded to hexdiff() as autojunk
    """
    conf_color_theme = conf.color_theme
    conf.color_theme = BlackAndWhite()
    try:
        with ContextManagerCaptureOutput() as cmco:
            hexdiff(a, b, algo=algo, autojunk=autojunk)
            result_hexdiff = cmco.get_output()
        conf.interactive = True
    finally:
        # Restore the caller's theme even if hexdiff() raised
        conf.color_theme = conf_color_theme
    return result_hexdiff

# Basic string test
result_hexdiff = test_hexdiff("abcde", "abCde")
expected = "0000 61 62 63 64 65 abcde\n"
expected += " 0000 61 62 43 64 65 abCde\n"
assert result_hexdiff == expected
# More advanced string test
result_hexdiff = test_hexdiff("add_common_", "_common_removed")
expected = "0000 61 64 64 5F 63 6F 6D 6D 6F 6E 5F add_common_ \n"
expected += " -003 5F 63 6F 6D 6D 6F 6E 5F 72 65 6D 6F 76 _common_remov\n"
expected += " 000d 65 64 ed\n"
assert result_hexdiff == expected
# Compare packets
result_hexdiff = test_hexdiff(IP(dst="127.0.0.1", src="127.0.0.1"), IP(dst="127.0.0.2", src="127.0.0.1"))
expected = "0000 45 00 00 14 00 01 00 00 40 00 7C E7 7F 00 00 01 E.......@.|.....\n"
expected += " 0000 45 00 00 14 00 01 00 00 40 00 7C E6 7F 00 00 01 E.......@.|.....\n"
expected += "0010 7F 00 00 01 ....\n"
expected += " 0010 7F 00 00 02 ....\n"
assert result_hexdiff == expected
# Compare using difflib
a = "A" * 1000 + "findme" + "B" * 1000
b = "A" * 1000 + "B" * 1000
ret1 = test_hexdiff(a, b, algo="difflib")
ret2 = test_hexdiff(a, b, algo="difflib", autojunk=True)
expected_ret1 = """
03d0 03d0 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 AAAAAAAAAAAAAAAA
03e0 41 41 41 41 41 41 41 41 66 69 6E 64 6D 65 42 42 AAAAAAAAfindmeBB
03e0 41 41 41 41 41 41 41 41 42 42 AAAAAAAA BB
03ea 03ea 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 BBBBBBBBBBBBBBBB
"""
expected_ret2 = """
03d0 03d0 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 AAAAAAAAAAAAAAAA
03e0 41 41 41 41 41 41 41 41 66 69 6E 64 6D 65 42 42 AAAAAAAAfindmeBB
03e0 41 41 41 41 41 41 41 41 42 42 42 42 42 42 42 42 AAAAAAAABBBBBBBB
03f0 03f0 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 BBBBBBBBBBBBBBBB
"""
assert ret1 != ret2
assert expected_ret1 in ret1
assert expected_ret2 in ret2
# Test corner cases that should not crash
hexdiff(b"abc", IP() / TCP())
hexdiff(IP() / TCP(), b"abc")
= Test mysummary functions - Ether
p = Ether(dst="ff:ff:ff:ff:ff:ff", src="ff:ff:ff:ff:ff:ff", type=0x9000)
p
assert p.mysummary() in ['ff:ff:ff:ff:ff:ff > ff:ff:ff:ff:ff:ff (%s)' % loop
for loop in ['0x9000', 'LOOP']]
= Test zerofree_randstring function
random.seed(0x2807)
zerofree_randstring(4) in [b"\xd2\x12\xe4\x5b", b'\xd3\x8b\x13\x12']
= Test strand function
assert strand(b"AC", b"BC") == b'@C'
= Test export_object and import_object functions
from unittest import mock
def test_export_import_object():
with ContextManagerCaptureOutput() as cmco:
export_object(2807)
result_export_object = cmco.get_output(eval_bytes=True)
assert result_export_object.startswith("eNprYPL9zqUHAAdrAf8=")
assert import_object(result_export_object) == 2807
test_export_import_object()
= Test tex_escape function
tex_escape("$#_") == "\\$\\#\\_"
= Test colgen function
f = colgen(range(3))
assert len([next(f) for i in range(2)]) == 2
= Test incremental_label function
f = incremental_label()
assert [next(f) for i in range(2)] == ["tag00000", "tag00001"]
= Test corrupt_* functions
import random
random.seed(0x2807)
assert corrupt_bytes("ABCDE") in [b"ABCDW", b"ABCDX"]
assert sane(corrupt_bytes("ABCDE", n=3)) in ["A.8D4", ".2.DE"]
assert corrupt_bits("ABCDE") in [b"EBCDE", b"ABCDG"]
assert sane(corrupt_bits("ABCDE", n=3)) in ["AF.EE", "QB.TE"]
= Test save_object and load_object functions
import os
import tempfile
# mkstemp() returns an already-open OS-level descriptor together with the
# path. Only the path is used below, so close the descriptor right away
# instead of leaking it for the rest of the test run.
fd, fname = tempfile.mkstemp()
os.close(fd)
# Round trip: the value written by save_object must be read back unchanged.
save_object(fname, 2807)
assert load_object(fname) == 2807
= Test whois function
~ netaccess
if not WINDOWS:
result = whois("193.0.6.139")
assert b"inetnum" in result and b"Amsterdam" in result
= Test manuf DB methods
~ manufdb
assert conf.manufdb._resolve_MAC("00:00:0F:01:02:03") == "Next:01:02:03"
assert conf.manufdb._get_short_manuf("00:00:0F:01:02:03") == "Next"
assert in6_addrtovendor("fe80::0200:0fff:fe01:0203").lower().startswith("next")
assert conf.manufdb.lookup("00:00:0F:01:02:03") == ('Next', 'Next, Inc.')
assert "00:00:0F" in conf.manufdb.reverse_lookup("Next")
= Test multiple wireshark's manuf formats
~ manufdb
new_format = """
# comment
00:00:00 JokyIsland Joky Insland Corp SA
00:01:12 SecdevCorp Secdev Corporation SA LLC
EE:05:01 Scapy Scapy CO LTD & CIE
FF:00:11 NoName
"""
old_format = """
# comment
00:00:00 JokyIsland # Joky Insland Corp SA
00:01:12 SecdevCorp # Secdev Corporation SA LLC
EE:05:01 Scapy # Scapy CO LTD & CIE
FF:00:11 NoName
"""
manuf1 = get_temp_file()
manuf2 = get_temp_file()
with open(manuf1, "w") as w:
w.write(old_format)
with open(manuf2, "w") as w:
w.write(new_format)
a = load_manuf(manuf1)
b = load_manuf(manuf2)
assert a.lookup("00:00:00") == ('JokyIsland', 'Joky Insland Corp SA')
assert a.lookup("FF:00:11:00:00:00") == ('NoName', 'NoName')
assert a.reverse_lookup("Scapy") == {'EE:05:01': ('Scapy', 'Scapy CO LTD & CIE')}
assert a.reverse_lookup("Secdevcorp") == {'00:01:12': ('SecdevCorp', 'Secdev Corporation SA LLC')}
assert b.lookup("00:00:00") == ('JokyIsland', 'Joky Insland Corp SA')
assert b.lookup("FF:00:11:00:00:00") == ('NoName', 'NoName')
assert b.reverse_lookup("Scapy") == {'EE:05:01': ('Scapy', 'Scapy CO LTD & CIE')}
assert b.reverse_lookup("Secdevcorp") == {'00:01:12': ('SecdevCorp', 'Secdev Corporation SA LLC')}
scapy_delete_temp_files()
= Test load_services
data_services = """
itu-bicc-stc 3097/sctp
cvsup 5999/udp # CVSup
x11 6000-6063/tcp # X Window System
x11 6000-6063/udp # X Window System
ndl-ahp-svc 6064/tcp # NDL-AHP-SVC
"""
# Write the sample services database to a temporary file and parse it.
services = get_temp_file()
with open(services, "w") as w:
    w.write(data_services)

tcp, udp, sctp = load_services(services)
# Forward lookup (port -> name) inside the "6000-6063" range.
assert tcp[6002] == "x11"
assert tcp.ndl_ahp_svc == 6064
# Reverse lookup (name -> port): x11 is declared as 6000-6063 in the sample
# data, so the returned port has to fall inside exactly that range.
assert tcp.x11 in range(6000, 6064)
assert udp[6002] == "x11"
assert udp.x11 in range(6000, 6064)
assert udp.cvsup == 5999
# Dashes in service names are exposed as underscores.
assert sctp[3097] == "itu_bicc_stc"
assert sctp.itu_bicc_stc == 3097
scapy_delete_temp_files()
= Test utility functions - network related
~ netaccess
assert atol("1.1.1.1") == 0x1010101
assert atol("192.168.0.1") == 0xc0a80001
= Test autorun functions
~ autorun
ret = autorun_get_text_interactive_session("IP().src")
ret
assert ret == (">>> IP().src\n'127.0.0.1'\n", '127.0.0.1')
ret = autorun_get_html_interactive_session("IP().src")
ret
assert ret == ("<span class=prompt>&gt;&gt;&gt; </span>IP().src\n'127.0.0.1'\n", '127.0.0.1')
ret = autorun_get_latex_interactive_session("IP().src")
ret
assert ret == ("\\textcolor{blue}{{\\tt\\char62}{\\tt\\char62}{\\tt\\char62} }IP().src\n'127.0.0.1'\n", '127.0.0.1')
ret = autorun_get_text_interactive_session("scapy_undefined")
assert "NameError" in ret[0]
= Test autorun with logging
cmds = """log_runtime.info(hex_bytes("446166742050756e6b"))\n"""
ret = autorun_get_text_interactive_session(cmds)
ret
assert "Daft Punk" in ret[0]
= Test utility TEX functions
assert tex_escape("{scapy}\\^$~#_&%|><") == "{\\tt\\char123}scapy{\\tt\\char125}{\\tt\\char92}\\^{}\\${\\tt\\char126}\\#\\_\\&\\%{\\tt\\char124}{\\tt\\char62}{\\tt\\char60}"
a = colgen(1, 2, 3)
assert next(a) == (1, 2, 2)
assert next(a) == (1, 3, 3)
assert next(a) == (2, 2, 1)
assert next(a) == (2, 3, 2)
assert next(a) == (2, 1, 3)
assert next(a) == (3, 3, 1)
assert next(a) == (3, 1, 2)
assert next(a) == (3, 2, 3)
= Test config file functions
saved_conf_verb = conf.verb
# Create a one-line configuration file that sets conf.verb to 42.
fd, fname = tempfile.mkstemp()
os.write(fd, b"conf.verb = 42\n")
os.close(fd)
from scapy.main import _read_config_file
# Restore the global verbosity in a finally clause so that a failure here
# does not leave conf.verb == 42 for the tests that follow.
try:
    _read_config_file(fname, globals(), locals())
    assert conf.verb == 42
finally:
    conf.verb = saved_conf_verb
= Test config file functions failures
from scapy.main import _read_config_file, _probe_config_folder
assert _read_config_file(_probe_config_folder("filethatdoesnotexistnorwillever.tsppajfsrdrr")) is None
= Test CacheInstance repr
conf.netcache
= Test pyx detection functions
from unittest.mock import patch
def _r(*args, **kwargs):
raise OSError
with patch("scapy.libs.test_pyx.subprocess.check_call", _r):
from scapy.libs.test_pyx import _test_pyx
assert _test_pyx() == False
= Test matplotlib detection functions
from unittest.mock import MagicMock, patch
bck_scapy_libs_matplot = sys.modules.get("scapy.libs.matplot", None)
if bck_scapy_libs_matplot:
del sys.modules["scapy.libs.matplot"]
mock_matplotlib = MagicMock()
mock_matplotlib.get_backend.return_value = "inline"
mock_matplotlib.pyplot = MagicMock()
mock_matplotlib.pyplot.plt = None
with patch.dict("sys.modules", **{ "matplotlib": mock_matplotlib, "matplotlib.lines": mock_matplotlib}):
from scapy.libs.matplot import MATPLOTLIB, MATPLOTLIB_INLINED, MATPLOTLIB_DEFAULT_PLOT_KARGS, Line2D
assert MATPLOTLIB == 1
assert MATPLOTLIB_INLINED == 1
assert "marker" in MATPLOTLIB_DEFAULT_PLOT_KARGS
mock_matplotlib.get_backend.return_value = "ko"
with patch.dict("sys.modules", **{ "matplotlib": mock_matplotlib, "matplotlib.lines": mock_matplotlib}):
from scapy.libs.matplot import MATPLOTLIB, MATPLOTLIB_INLINED, MATPLOTLIB_DEFAULT_PLOT_KARGS
assert MATPLOTLIB == 1
assert MATPLOTLIB_INLINED == 0
assert "marker" in MATPLOTLIB_DEFAULT_PLOT_KARGS
if bck_scapy_libs_matplot:
sys.modules["scapy.libs.matplot"] = bck_scapy_libs_matplot
############
############
+ Basic tests
* Those tests are here mainly to check nothing has been broken
* and to catch Exceptions
= Packet class methods
p = IP()/ICMP()
ret = p.do_build_ps()
assert ret[0] == b"@\x00\x00\x00\x00\x01\x00\x00@\x01\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\x00\x00\x00\x00\x00\x00"
assert len(ret[1]) == 2
assert p[ICMP].firstlayer() == p
assert p.command() == "IP()/ICMP()"
p.decode_payload_as(UDP)
assert p.sport == 2048 and p.dport == 63487
= hide_defaults
conf_color_theme = conf.color_theme
conf.color_theme = BlackAndWhite()
p = IP(ttl=64)/ICMP()
assert repr(p) in ["<IP frag=0 ttl=64 proto=icmp |<ICMP |>>", "<IP frag=0 ttl=64 proto=1 |<ICMP |>>"]
p.hide_defaults()
assert repr(p) in ["<IP frag=0 proto=icmp |<ICMP |>>", "<IP frag=0 proto=1 |<ICMP |>>"]
conf.color_theme = conf_color_theme
= split_layers
p = IP()/ICMP()
s = raw(p)
split_layers(IP, ICMP, proto=1)
assert Raw in IP(s)
bind_layers(IP, ICMP, frag=0, proto=1)
= fuzz
r = fuzz(IP(tos=2)/ICMP())
assert r.tos == 2
z = r.ttl
assert r.ttl != z
assert r.ttl != z
= fuzz a Packet with MultipleTypeField
fuzz(ARP(pdst="127.0.0.1"))
fuzz(IP()/ARP(pdst='10.0.0.254'))
= fuzz on packets with advanced RandNum
x = IP(dst="8.8.8.8")/fuzz(UDP()/NTP(version=4))
x.show2()
x = IP(raw(x))
assert NTP in x
= fuzz on packets with FlagsField
assert isinstance(fuzz(TCP()).flags, VolatileValue)
= Building some packets
~ basic IP TCP UDP NTP LLC SNAP Dot11
IP()/TCP()
Ether()/IP()/UDP()/NTP()
Dot11()/LLC()/SNAP()/IP()/TCP()/"XXX"
IP(ttl=25)/TCP(sport=12, dport=42)
IP().summary()
= Manipulating some packets
~ basic IP TCP
a=IP(ttl=4)/TCP()
a.ttl
a.ttl=10
del a.ttl
a.ttl
TCP in a
a[TCP]
a[TCP].dport=[80,443]
a
assert a.copy().time == a.time
a=3
= Bind string array as payload
~ basic
assert bytes(Raw("sca")/"py") == b"scapy"
assert bytes(Raw("sca")/b"py") == b"scapy"
assert bytes(Raw("sca")/bytearray(b"py")) == b"scapy"
assert bytes("sca"/Raw("py")) == b"scapy"
assert bytes(b"sca"/Raw("py")) == b"scapy"
assert bytes(bytearray(b"sca")/Raw("py")) == b"scapy"
a=Raw("sca")
a.add_payload("py")
assert bytes(a) == b"scapy"
a=Raw("sca")
a.add_payload(b"py")
assert bytes(a) == b"scapy"
a=Raw("sca")
a.add_payload(bytearray(b"py"))
assert bytes(a) == b"scapy"
= Checking overloads
~ basic IP TCP Ether
a=Ether()/IP()/TCP()
r = a.proto
r
r == 6
= sprintf() function
~ basic sprintf Ether IP UDP NTP
a=Ether()/IP()/IP(ttl=4)/UDP()/NTP()
r = a.sprintf("%type% %IP.ttl% %#05xr,UDP.sport% %IP:2.ttl%")
r
r in ['0x800 64 0x07b 4', 'IPv4 64 0x07b 4']
= sprintf() function
~ basic sprintf IP TCP SNAP LLC Dot11
* This test is on the conditional substring feature of <tt>sprintf()</tt>
a=Dot11()/LLC()/SNAP()/IP()/TCP()
r = a.sprintf("{IP:{TCP:flags=%TCP.flags%}{UDP:port=%UDP.ports%} %IP.src%}")
r
r == 'flags=S 127.0.0.1'
= haslayer function
~ basic haslayer IP TCP ICMP ISAKMP
x=IP(id=1)/ISAKMP_payload_SA(prop=ISAKMP_payload_SA(prop=IP()/ICMP()))/TCP()
r = (TCP in x, ICMP in x, IP in x, UDP in x)
r
r == (True,True,True,False)
= getlayer function
~ basic getlayer IP ISAKMP UDP
x=IP(id=1)/ISAKMP_payload_SA(prop=IP(id=2)/UDP(dport=1))/IP(id=3)/UDP(dport=2)
x[IP]
x[IP:2]
x[IP:3]
x.getlayer(IP,3)
x.getlayer(IP,4)
x[UDP]
x[UDP:1]
x[UDP:2]
assert(x[IP].id == 1 and x[IP:2].id == 2 and x[IP:3].id == 3 and
x.getlayer(IP).id == 1 and x.getlayer(IP,3).id == 3 and
x.getlayer(IP,4) == None and
x[UDP].dport == 1 and x[UDP:2].dport == 2)
try:
x[IP:4]
except IndexError:
True
else:
False
= getlayer / haslayer with name
~ basic getlayer IP ICMP IPerror TCPerror
x = IP() / ICMP() / IPerror()
assert x.getlayer(ICMP) is not None
assert x.getlayer(IPerror) is not None
assert x.getlayer("IP in ICMP") is not None
assert x.getlayer(TCPerror) is None
assert x.getlayer("TCP in ICMP") is None
assert x.haslayer(ICMP)
assert x.haslayer(IPerror)
assert x.haslayer("IP in ICMP")
assert not x.haslayer(TCPerror)
assert not x.haslayer("TCP in ICMP")
= getlayer with a filter
~ getlayer IP
pkt = IP() / IP(ttl=3) / IP()
assert pkt[IP::{"ttl":3}].ttl == 3
assert pkt.getlayer(IP, ttl=3).ttl == 3
assert IPv6ExtHdrHopByHop(options=[HBHOptUnknown()]).getlayer(HBHOptUnknown, otype=42) is None
= specific haslayer and getlayer implementations for EAP
~ haslayer getlayer EAP
pkt = Ether() / EAPOL() / EAP_MD5()
assert EAP in pkt
assert pkt.haslayer(EAP)
assert isinstance(pkt[EAP], EAP_MD5)
assert isinstance(pkt.getlayer(EAP), EAP_MD5)
= specific haslayer and getlayer implementations for RadiusAttribute
~ haslayer getlayer RadiusAttribute
pkt = RadiusAttr_EAP_Message()
assert RadiusAttribute in pkt
assert pkt.haslayer(RadiusAttribute)
assert isinstance(pkt[RadiusAttribute], RadiusAttr_EAP_Message)
assert isinstance(pkt.getlayer(RadiusAttribute), RadiusAttr_EAP_Message)
= equality
~ basic
w=Ether()/IP()/UDP(dport=53)
x=Ether()/IP(version=4)/UDP()
y=Ether()/IP()/UDP(dport=4)
z=Ether()/IP()/UDP()/NTP()
t=Ether()/IP()/TCP()
assert x != y and x != z and x != t and y != z and y != t and z != t and w == x
= answers
~ basic
a1, a2 = "1.2.3.4", "5.6.7.8"
p1 = IP(src=a1, dst=a2)/ICMP(type=8)
p2 = IP(src=a2, dst=a1)/ICMP(type=0)
assert p1.hashret() == p2.hashret()
assert not p1.answers(p2)
assert p2.answers(p1)
assert p1 > p2
assert p2 < p1
assert p1 == p1
conf_back = conf.checkIPinIP
conf.checkIPinIP = True
px = [IP()/p1, IPv6()/p1]
assert not any(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert not any(p2.answers(p) for p in px)
conf.checkIPinIP = False
assert all(p.hashret() == p2.hashret() for p in px)
assert not any(p.answers(p2) for p in px)
assert all(p2.answers(p) for p in px)
conf.checkIPinIP = conf_back
= answers - Net
~ netaccess
a1, a2 = Net("www.google.com"), Net("www.secdev.org")
prt1, prt2 = 12345, 54321
s1, s2 = 2767216324, 3845532842
p1 = IP(src=a1, dst=a2)/TCP(flags='SA', seq=s1, ack=s2, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='R', seq=s2, ack=0, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
# Not available yet because of IPv6
# a1, a2 = Net6("www.google.com"), Net6("www.secdev.org")
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='RA', seq=0, ack=s1+1, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+1, sport=prt2, dport=prt1)
assert p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2)
p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+10, sport=prt2, dport=prt1)
assert not p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert not p1.answers(p2)
p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1+9, ack=s2+10, sport=prt1, dport=prt2)
assert not p2.answers(p1)
assert not p1.answers(p2)
= conf.checkIPsrc
conf_checkIPsrc = conf.checkIPsrc
conf.checkIPsrc = 0
query = IP(id=42676, src='10.128.0.7', dst='192.168.0.1')/ICMP(id=26)
answer = IP(src='192.168.48.19', dst='10.128.0.7')/ICMP(type=11)/IPerror(id=42676, src='192.168.51.23', dst='192.168.0.1')/ICMPerror(id=26)
assert answer.answers(query)
conf.checkIPsrc = conf_checkIPsrc
############
############
+ command() / json() tests
~ command
= Test command() with normal packet
pkt = IP(dst="127.0.0.1", src="127.0.0.1") / UDP(dport=12345, sport=654)
assert pkt.command() == "IP(src='127.0.0.1', dst='127.0.0.1')/UDP(sport=654, dport=12345)"
= Test json() with normal packet
assert pkt.json() == '{"version": 4, "ihl": null, "tos": 0, "len": null, "id": 1, "flags": 0, "frag": 0, "ttl": 64, "proto": 17, "chksum": null, "src": "127.0.0.1", "dst": "127.0.0.1", "payload": {"sport": 654, "dport": 12345, "len": null, "chksum": null}}'
= Test command() with nested packet
pkt = DNS(qd=[DNSQR(qtype="A", qname="google.com")])
assert pkt.command() == "DNS(qd=[DNSQR(qname=b'google.com.', qtype=1)])"
= Test json() with nested packet
assert pkt.json() == '{"length": null, "id": 0, "qr": 0, "opcode": 0, "aa": 0, "tc": 0, "rd": 1, "ra": 0, "z": 0, "ad": 0, "cd": 0, "rcode": 0, "qdcount": null, "ancount": null, "nscount": null, "arcount": null, "qd": [{"qname": "google.com.", "qtype": 1, "unicastresponse": 0, "qclass": 1}]}'
= Test command() with ASN.1 packet
pkt = KRB_AP_REP(bytes(KRB_AP_REP(encPart=EncryptedData())))
assert pkt.command() == "KRB_AP_REP(pvno=ASN1_INTEGER(5), msgType=ASN1_INTEGER(15), encPart=EncryptedData(etype=ASN1_INTEGER(23), kvno=None, cipher=ASN1_STRING(b'')))"
= Test json() with ASN.1 packet
assert pkt.json() == '{"pvno": {"type": "ASN1_INTEGER", "value": "5"}, "msgType": {"type": "ASN1_INTEGER", "value": "15"}, "encPart": {"etype": {"type": "ASN1_INTEGER", "value": "23"}, "kvno": null, "cipher": {"type": "ASN1_STRING", "value": ""}}}'
= Test command() with meaningless payload
pkt = PPTPStartControlConnectionReply() / IP(dst="127.0.0.1", src="127.0.0.1")
assert pkt.command() == "PPTPStartControlConnectionReply()/IP(src='127.0.0.1', dst='127.0.0.1')"
= Test json() with meaningless payload
assert pkt.json() == '{"len": 156, "type": 1, "magic_cookie": 439041101, "ctrl_msg_type": 2, "reserved_0": 0, "protocol_version": 256, "result_code": 1, "error_code": 0, "framing_capabilities": 0, "bearer_capabilities": 0, "maximum_channels": 65535, "firmware_revision": 256, "host_name": "linux", "vendor_string": "", "payload": {"version": 4, "ihl": null, "tos": 0, "len": null, "id": 1, "flags": 0, "frag": 0, "ttl": 64, "proto": 0, "chksum": null, "src": "127.0.0.1", "dst": "127.0.0.1"}}'
############
############
+ Tests on padding
= Padding assembly
r = raw(Padding("abc"))
r
assert r == b"abc"
r = raw(Padding("abc")/Padding("def"))
r
assert r == b"abcdef"
r = raw(Raw("ABC")/Padding("abc")/Padding("def"))
r
assert r == b"ABCabcdef"
r = raw(Raw("ABC")/Padding("abc")/Raw("DEF")/Padding("def"))
r
assert r == b"ABCDEFabcdef"
= Padding and length computation
p = IP(raw(IP()/Padding("abc")))
p
assert p.len == 20 and len(p) == 23
p = IP(raw(IP()/Raw("ABC")/Padding("abc")))
p
assert p.len == 23 and len(p) == 26
p = IP(raw(IP()/Raw("ABC")/Padding("abc")/Padding("def")))
p
assert p.len == 23 and len(p) == 29
= PadField test
~ PadField padding
class TestPad(Packet):
fields_desc = [ PadField(StrNullField("st", b""), 6, padwith=b"\xff"), StrField("id", b"")]
assert TestPad() == TestPad(raw(TestPad()))
assert raw(TestPad(st=b"st", id=b"id")) == b'st\x00\xff\xff\xffid'
= ReversePadField
~ PadField padding
class TestReversePad(Packet):
fields_desc = [ ByteField("a", 0),
ReversePadField(IntField("b", 0), 4)]
assert raw(TestReversePad(a=1, b=0xffffffff)) == b'\x01\x00\x00\x00\xff\xff\xff\xff'
assert TestReversePad(raw(TestReversePad(a=1, b=0xffffffff))).b == 0xffffffff
############
############
+ Tests on default value changes mechanism
= Creation of an IPv3 class from IP class with different default values
class IPv3(IP):
version = 3
ttl = 32
= Test of IPv3 class
a = IPv3()
v,t = a.version, a.ttl
v,t
assert (v,t) == (3,32)
r = raw(a)
r
assert r == b'5\x00\x00\x14\x00\x01\x00\x00 \x00\xac\xe7\x7f\x00\x00\x01\x7f\x00\x00\x01'
############
############
+ ASN.1 tests
= ASN1 - ASN1_Object
assert ASN1_Object(1) == ASN1_Object(1)
assert ASN1_Object(1) > ASN1_Object(0)
assert ASN1_Object(1) >= ASN1_Object(1)
assert ASN1_Object(0) < ASN1_Object(1)
assert ASN1_Object(1) <= ASN1_Object(2)
assert ASN1_Object(1) != ASN1_Object(2)
ASN1_Object(2).show()
= ASN1 - RandASN1Object
a = RandASN1Object()
random.seed(0x2807)
o = bytes(a)
o
assert o in [
b'\x1e\x023V', # PyPy 2.7
b'A\x02\x07q', # Python 2.7
b'F\x02\xfe\x92', # python 3.7-3.9
]
= ASN1 - ASN1_BIT_STRING
a = ASN1_BIT_STRING("test", readable=True)
a
assert a.val == '01110100011001010111001101110100'
assert raw(a) == b'\x03\x05\x00test'
a = ASN1_BIT_STRING(b"\xff"*16, readable=True)
a
assert a.val == "1" * 128
assert raw(a) == b'\x03\x11\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'
= ASN1 - ASN1_SEQUENCE
a = ASN1_SEQUENCE([ASN1_Object(1), ASN1_Object(0)])
assert a.strshow() == '# ASN1_SEQUENCE:\n <ASN1_Object[1]>\n <ASN1_Object[0]>\n'
= ASN1 - ASN1_DECODING_ERROR
a = ASN1_DECODING_ERROR("error", exc=OSError(1))
assert repr(a) == "<ASN1_DECODING_ERROR['error']{{1}}>"
b = ASN1_DECODING_ERROR("error", exc=OSError(ASN1_BIT_STRING("0")))
assert repr(b) in ["<ASN1_DECODING_ERROR['error']{{<ASN1_BIT_STRING[0]=b'\\x00' (7 unused bits)>}}>",
"<ASN1_DECODING_ERROR['error']{{<ASN1_BIT_STRING[0]='\\x00' (7 unused bits)>}}>"]
= ASN1 - ASN1_INTEGER
a = ASN1_INTEGER(int("1"*23))
assert repr(a) in ["0x25a55a46e5da99c71c7 <ASN1_INTEGER[1111111111...1111111111]>",
"0x25a55a46e5da99c71c7 <ASN1_INTEGER[1111111111...111111111L]>"]
= ASN1 - ASN1_OID
assert raw(ASN1_OID("")) == b"\x06\x00"
= RandASN1Object(), specific crashes
import random
# ASN1F_NUMERIC_STRING
random.seed(1514315682)
raw(RandASN1Object())
# ASN1F_VIDEOTEX_STRING
random.seed(1240186058)
raw(RandASN1Object())
# ASN1F_UTC_TIME & ASN1F_GENERALIZED_TIME
random.seed(1873503288)
raw(RandASN1Object())
= SSID is parsed properly even with the presence of RSN Information
~ SSID RSN Information
# A regression test for https://github.com/secdev/scapy/pull/2685.
# https://github.com/secdev/scapy/issues/2683 describes a packet with
# RSN Information that isn't parsed properly,
# causing the SSID to be overridden.
# This test checks the SSID is parsed properly.
filename = scapy_path("/test/pcaps/bad_rsn_parsing_overrides_ssid.pcap")
frame = rdpcap(filename)[0]
beacon = frame.getlayer(5)
ssid = beacon.network_stats()['ssid']
assert ssid == "ROUTE-821E295"
= SSID is parsed properly even when the Country Information Tag Element has an odd length (not complying with the standard) and a missing pad byte
~ Missing Pad Byte in Country Info
# A regression test for https://github.com/secdev/scapy/pull/2685.
# https://github.com/secdev/scapy/issues/4132 describes a packet with
# a Country Information element tag that has an odd length, even though it's against the standard.
# The transmitter should have added a padding byte to make the length even, but it didn't.
# The effect on scapy used to be improper parsing of the next tag elements, causing the SSID to be overridden.
# This test checks the SSID is parsed properly.
from io import BytesIO
pcapfile = BytesIO(b'\n\r\r\n\x80\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x03\x00\x10\x00Linux 6.1.21-v8+\x04\x00E\x00Dumpcap (Wireshark) 3.4.10 (Git v3.4.10 packaged as 3.4.10-0+deb11u1)\x00\x00\x00\x00\x00\x00\x00\x80\x00\x00\x00\x01\x00\x00\x00@\x00\x00\x00\x7f\x00\x00\x00\x00\x04\x00\x00\x02\x00\x05\x00wifi2\x00\x00\x00\t\x00\x01\x00\t\x00\x00\x00\x0c\x00\x10\x00Linux 6.1.21-v8+\x00\x00\x00\x00@\x00\x00\x00\x06\x00\x00\x00\xb0\x01\x00\x00\x00\x00\x00\x00c\xd3\x87\x17\xe3c5\x82\x90\x01\x00\x00\x90\x01\x00\x00\x00\x00 \x00\xae@\x00\xa0 \x08\x00\xa0 \x08\x00\x00\x10\x0cd\x14@\x01\xa9\x00\x0c\x00\x00\x00\xa6\x00\xa8\x01\x80\x00\x00\x00\xff\xff\xff\xff\xff\xff\x02\xbf\xaf\x9f\xf8\x07\x02\xbf\xaf\x9f\xf8\x070\x96[p\xdcM\x06\x00\x00\x00d\x00\x11\x00\x00\x00\x01\x08\x8c\x12\x98$\xb0H`l\x03\x01,\x05\x04\x00\x01\x00\x00\x07QUS \x01\r\x80$\x01\x80(\x01\x80,\x01\x800\x01\x804\x01\x808\x01\x80<\x01\x80@\x01\x80d\x01\x80h\x01\x80l\x01\x80p\x01\x80t\x01\x80x\x01\x80|\x01\x80\x80\x01\x80\x84\x01\x80\x88\x01\x80\x8c\x01\x80\x90\x01\x80\x95\x01\x80\x99\x01\x80\x9d\x01\x80\xa1\x01\x80\xa5\x01\x800\x14\x01\x00\x00\x0f\xac\x04\x01\x00\x00\x0f\xac\x04\x01\x00\x00\x0f\xac\x02\x0c\x00;\x02s\x00-\x1a,\t\x13\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00,\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00=\x16,\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xdd\x18\x00P\xf2\x02\x01\x01\x81\x00\x03\xa4\x00\x00\'\xa4\x00\x00BC]\x00a\x11.\x00\xdd;\x00P\xf2\x04\x10J\x00\x01\x10\x10D\x00\x01\x02\x10I\x00\x06\x007*\x00\x01 \x10\x11\x00\x1358" Hisense Roku TV\x10T\x00\x08\x00\x07\x00P\xf2\x04\x00\x01\xdd\x16\xc8:k\x01\x01\x1048<@dhlptx|\x80\x84\x88\x8c\x90\xdd\x12Po\x9a\t\x02\x02\x00!\x0b\x03\x06\x00\x02\xbf\xaf\x9f\xf8\x07\xdd\rPo\x9a\n\x00\x00\x06\x01\x11\x1cD\x002\xf5N\xfbh\xb0\x01\x00\x00')
pktpcap = rdpcap(pcapfile)
frame = pktpcap[0]
beacon = frame.getlayer(4)
stats = beacon.network_stats()
ssid = stats['ssid']
assert ssid == ""
country = stats['country']
assert country == 'US'
############
############
+ Network tests
* Those tests need network access
= Sending and receiving an ICMP
~ netaccess needs_root IP ICMP icmp_firewall
def _test():
with no_debug_dissector():
x = sr1(IP(dst="www.google.com")/ICMP(),timeout=3)
assert x is not None
x
assert x[IP].ottl() in [32, 64, 128, 255]
assert 0 <= x[IP].hops() <= 126
assert ICMP in x and x[ICMP].type == 0
retry_test(_test)
= Sending a TCP syn message at layer 2 and layer 3
~ netaccess needs_root IP
def _test():
tmp = send(IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), return_packets=True, realtime=True)
assert len(tmp) == 1
tmp = sendp(Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), return_packets=True, realtime=True)
assert len(tmp) == 1
p = Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S")
from decimal import Decimal
p.time = Decimal(p.time)
tmp = sendp(p, return_packets=True, realtime=True)
assert len(tmp) == 1
retry_test(_test)
= Latency check: localhost ICMP
~ netaccess needs_root linux latency
# Note: still needs to enforce L3RawSocket as this won't work otherwise with libpcap
sock = conf.L3socket
conf.L3socket = L3RawSocket
def _test():
req = IP(dst="127.0.0.1")/ICMP()
ans = sr1(req, timeout=3)
assert (ans.time - req.sent_time) >= 0
assert (ans.time - req.sent_time) <= 1e-3
try:
retry_test(_test)
finally:
conf.L3socket = sock
= Test sniffing on multiple sockets
~ netaccess needs_root sniff
# This test sniffs on the same interface twice at the same time, to
# simulate sniffing on multiple interfaces.
def _test():
iface = conf.route.route(str(Net("www.google.com")))[0]
port = int(RandShort())
pkt = IP(dst="www.google.com")/TCP(sport=port, dport=80, flags="S")
def cb():
sr1(pkt, timeout=3)
sniffer = AsyncSniffer(started_callback=cb,
iface=[iface, iface],
lfilter=lambda x: TCP in x and x[TCP].dport == port,
prn=lambda x: x.summary(),
count=2)
sniffer.start()
sniffer.join(timeout=3)
assert len(sniffer.results) == 2
for pkt in sniffer.results:
assert pkt.sniffed_on == iface
retry_test(_test)
= Test sniffing with AsyncSniffer on failed
try:
sniffer = AsyncSniffer(iface="this_interface_does_not_exists")
sniffer.start()
sniffer.join()
assert False, "Should have errored by now"
except ValueError:
assert True
= Sending a TCP syn 'forever' at layer 2 and layer 3
~ netaccess needs_root IP
def _test():
    # One send/receive round with srloop (IP packet), then one with
    # srploop (same packet wrapped in Ether). Both results are expected to
    # be tuples whose first element holds exactly one entry.
    tmp = srloop(IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), count=1, timeout=3)
    assert isinstance(tmp, tuple) and len(tmp[0]) == 1
    tmp = srploop(Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), count=1, timeout=3)
    assert isinstance(tmp, tuple) and len(tmp[0]) == 1

retry_test(_test)
= Sending and receiving a TCP syn with flooding methods
~ netaccess needs_root IP flood
from functools import partial
# flooding methods do not support timeout. Packing the test for security
def _test_flood(ip, flood_function, add_ether=False):
with no_debug_dissector():
p = IP(dst=ip)/TCP(sport=RandShort(), dport=80, flags="S")
if add_ether:
p = Ether()/p
p.show2()
x = flood_function(p, timeout=0.5, maxretries=10)
if type(x) == tuple:
x = x[0][0][1]
x
assert x[IP].ottl() in [32, 64, 128, 255]
assert 0 <= x[IP].hops() <= 126
_test_srflood = partial(_test_flood, "www.google.com", srflood)
retry_test(_test_srflood)
_test_sr1flood = partial(_test_flood, "www.google.fr", sr1flood)
retry_test(_test_sr1flood)
_test_srpflood = partial(_test_flood, "www.google.net", srpflood, True)
retry_test(_test_srpflood)
_test_srp1flood = partial(_test_flood, "www.google.co.uk", srp1flood, True)
retry_test(_test_srp1flood)
= test chainEX
~ netaccess
import socket
# The TCP socket is created but never connected; the test expects sr1()
# with chainEX=True to raise on it.
sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ssck = StreamSocket(sck)
# The "should not get here" assertion lives in the else clause on purpose:
# raised inside the try body, its AssertionError would be caught by
# `except Exception` and the test could never fail.
try:
    r = ssck.sr1(ICMP(type='echo-request'), timeout=0.1, chainEX=True, threaded=False)
except Exception:
    pass
else:
    assert False, "sr1() with chainEX=True should have raised"
finally:
    sck.close()
= Sending and receiving an ICMPv6EchoRequest
~ netaccess ipv6
def _test():
    with no_debug_dissector():
        x = sr1(IPv6(dst="www.google.com")/ICMPv6EchoRequest(),timeout=3)
    # Check for a missing answer first, so that it fails with a clear
    # assertion instead of an error while indexing x below.
    assert x is not None
    assert x[IPv6].ottl() in [32, 64, 128, 255]
    assert 0 <= x[IPv6].hops() <= 126
    # The answer must be an ICMPv6 echo reply (type 129). This has to be an
    # assert: a bare expression inside a function is evaluated and dropped.
    assert ICMPv6EchoReply in x and x[ICMPv6EchoReply].type == 129

retry_test(_test)
= Whois request
~ netaccess IP as_resolvers
* This test retries on failure because it often fails
def _test():
IP(src="8.8.8.8").whois()
retry_test(_test)
= AS resolvers
~ netaccess IP as_resolvers
* This test retries on failure because it often fails
def _test():
ret = conf.AS_resolver.resolve("8.8.8.8", "8.8.4.4")
assert (len(ret) == 2)
assert any(x[1] == "AS15169" for x in ret)
retry_test(_test)
riswhois_data = b"route: 8.8.8.0/24\ndescr: Google\norigin: AS15169\nnotify: [email protected]\nmnt-by: MAINT-AS15169\nchanged: [email protected] 20150728\nsource: RADB\n\nroute: 8.0.0.0/9\ndescr: Proxy-registered route object\norigin: AS3356\nremarks: auto-generated route object\nremarks: this next line gives the robot something to recognize\nremarks: L'enfer, c'est les autres\nremarks: \nremarks: This route object is for a Level 3 customer route\nremarks: which is being exported under this origin AS.\nremarks: \nremarks: This route object was created because no existing\nremarks: route object with the same origin was found, and\nremarks: since some Level 3 peers filter based on these objects\nremarks: this route may be rejected if this object is not created.\nremarks: \nremarks: Please contact [email protected] if you have any\nremarks: questions regarding this object.\nmnt-by: LEVEL3-MNT\nchanged: [email protected] 20060203\nsource: LEVEL3\n\n\n"
ret = AS_resolver_riswhois()._parse_whois(riswhois_data)
assert ret == ('AS15169', 'Google')
retry_test(_test)
# This test is too buggy, and is simulated below
#def _test():
# ret = AS_resolver_cymru().resolve("8.8.8.8")
# assert (len(ret) == 1)
# all(x[1] == "AS15169" for x in ret)
#
#retry_test(_test)
cymru_bulk_data = """
Bulk mode; whois.cymru.com [2017-10-03 08:38:08 +0000]
24776 | 217.25.178.5 | INFOCLIP-AS, FR
36459 | 192.30.253.112 | GITHUB - GitHub, Inc., US
26496 | 68.178.213.61 | AS-26496-GO-DADDY-COM-LLC - GoDaddy.com, LLC, US
"""
tmp = AS_resolver_cymru().parse(cymru_bulk_data)
assert len(tmp) == 3
assert [l[1] for l in tmp] == ['AS24776', 'AS36459', 'AS26496']
= AS resolver - IPv6
~ netaccess IP as_resolvers
* This test retries on failure because it often fails
def _test():
as_resolver6 = AS_resolver6()
ret = as_resolver6.resolve("2001:4860:4860::8888", "2001:4860:4860::4444")
assert (len(ret) == 2)
assert any(x[1] == 15169 for x in ret)
retry_test(_test)
= AS resolver - socket error
~ IP
* This test checks that a failing resolver will not crash a script
class MockAS_resolver(object):
def resolve(self, *ips):
raise socket.error
asrm = AS_resolver_multi(MockAS_resolver())
assert len(asrm.resolve(["8.8.8.8", "8.8.4.4"])) == 0
= sendpfast
~ tcpreplay
old_interactive = conf.interactive
conf.interactive = False
try:
sendpfast([])
assert False
except Exception:
assert True
conf.interactive = old_interactive
assert True
############
############
+ Generator tests
= Implicit logic 1
~ IP TCP
a = Ether() / IP(ttl=(5, 10)) / TCP(dport=[80, 443])
ls(a)
ls(a, verbose=True)
l = [p for p in a]
len(l) == 12
= Implicit logic 2
~ IP
a = IP(ttl=[1,2,(5,9)])
ls(a)
ls(a, verbose=True)
l = [p for p in a]
len(l) == 7
= Implicit logic 3
# In case there's a single option: __iter__ should not return self
a = Ether()/IP(src="127.0.0.1", dst="127.0.0.1")/ICMP()
for i in a:
i.sent_time = 1
assert a.sent_time is None
# In case they are several, self should never be returned
a = Ether()/IP(src="127.0.0.1", dst="127.0.0.1")/ICMP(seq=(0, 5))
for i in a:
i.sent_time = 1
assert a.sent_time is None
############
############
+ Real usages
= Port scan
~ netaccess needs_root IP TCP
def _test():
with no_debug_dissector():
ans,unans=sr(IP(dst="www.google.com/30")/TCP(dport=[80,443]), timeout=2)
# New format: all Python versions
ans.make_table(lambda s, r: (s.dst, s.dport, r.sprintf("{TCP:%TCP.flags%}{ICMP:%ICMP.code%}")))
retry_test(_test)
= Send & receive with debug_match
~ netaccess needs_root IP ICMP
def _test():
    old_debug_match = conf.debug_match
    conf.debug_match = True
    # Put the global flag back in a finally clause so that a failed attempt
    # (exception in sr() or a failing assertion) does not leave
    # conf.debug_match enabled for whatever runs next.
    try:
        with no_debug_dissector():
            ans, unans = sr(IP(dst="www.google.fr") / TCP(sport=RandShort(), dport=80, flags="S"), timeout=2)
        # Each result exposes the pair both by name and by index.
        assert ans[0].query == ans[0][0]
        assert ans[0].answer == ans[0][1]
    finally:
        conf.debug_match = old_debug_match
    assert ans and not unans

retry_test(_test)
= Send & receive with retry
~ netaccess needs_root IP ICMP
def _test():
with no_debug_dissector():
ans, unans = sr(IP(dst=["8.8.8.8", "1.2.3.4"]) / TCP(sport=RandShort(), dport=53, flags="S"), timeout=2, retry=1)
assert len(ans) == 1 and len(unans) == 1
retry_test(_test)
= Send & receive with multi
~ netaccess needs_root IP ICMP
def _test():
    """sr() with multi=1 towards two destinations: at least one answer, one unanswered."""
    syn = TCP(sport=RandShort(), dport=53, flags="S")
    with no_debug_dissector():
        ans, unans = sr(IP(dst=["8.8.8.8", "1.2.3.4"]) / syn, timeout=2, multi=1)
    assert len(ans) >= 1
    assert len(unans) == 1

retry_test(_test)
= Traceroute function
~ netaccess needs_root tcpdump
* Let's test traceroute
def _test():
    """Run traceroute() and display the result list and its first sent packet."""
    with no_debug_dissector():
        ans, unans = traceroute("www.slashdot.org")
    ans.nsummary()
    # The first entry must unpack into a (sent, received) pair.
    sent, _received = ans[0]
    sent.show()
    sent.show(2)

retry_test(_test)
= send() and sniff()
~ netaccess needs_root
def _test():
    """Emit three marker frames (IP source 9.0.0.0) on the default interface."""
    marker = Ether()/IP(src="9.0.0.0")/UDP()
    sendp(marker, count=3, iface=conf.iface)

# The frames are sent from the sniffer's started_callback; one matching
# packet must be captured before the timeout.
r = sniff(
    timeout=3,
    count=1,
    lfilter=lambda x: IP in x and x[IP].src == "9.0.0.0",
    iface=conf.iface,
    started_callback=_test,
)
assert r
= sniff() with socket failure
* GH issue 3631
REFPACKET = Ether()/IP()/UDP()
# A socket that fails after 10 packets
class OOPipe(ObjectPipe):
    """ObjectPipe that re-queues REFPACKET after each read and breaks on the 11th recv()."""
    def recv(self, x=MTU):
        # The call counter lives on the instance and is created lazily.
        calls = getattr(self, "i", 0) + 1
        self.i = calls
        if calls == 11:
            self.close()
            raise OSError("Giant failure")
        received = super(OOPipe, self).recv(x)
        # Queue the next packet so that the pipe always has something to read.
        self.send(REFPACKET)
        return received

o = OOPipe()
o.send(REFPACKET)
pkts = sniff(opened_socket=[o], timeout=3)
# Ten successful reads are expected before the simulated failure.
assert len(pkts) == 10
= GH issue 3306
~ netaccess needs_root
send(fuzz(ARP()))
= Test SuperSocket.select
~ select
from unittest import mock
@mock.patch("scapy.supersocket.select")
def _test_select(select):
    """Return True when SuperSocket.select() propagates an IOError raised by select().

    Only IOError is caught: any other exception now fails the test instead
    of being counted as success by a bare ``except``.
    """
    select.side_effect = IOError(0)
    try:
        SuperSocket.select([])
    except IOError:
        return True
    return False

assert _test_select()
= Test L2ListenTcpdump socket
~ netaccess
# Needs to be fixed. Fails randomly
#import time
#for i in range(10):
# read_s = L2ListenTcpdump(iface=conf.iface)
# out_s = conf.L2socket(iface=conf.iface)
# time.sleep(5) # wait for read_s to be ready
# icmp_r = Ether()/IP(dst="secdev.org")/ICMP()
# res = sndrcv(out_s, icmp_r, timeout=5, rcv_pks=read_s)[0]
# read_s.close()
# out_s.close()
# time.sleep(5)
# if res:
# break
#
#response = res[0][1]
#assert response[ICMP].type == 0
True
= Test set of sent_time by sr
~ netaccess needs_root IP ICMP
def _test():
    """After sr(), the packet object that was passed in must carry a sent_time."""
    probe = IP(dst="8.8.8.8")/ICMP()
    sr(probe, timeout=2)
    assert probe.sent_time is not None

retry_test(_test)
= Test set of sent_time by sr (multiple packets)
~ netaccess needs_root IP ICMP
def _test():
    """After sr() on a list, every packet object of the list must carry a sent_time."""
    probes = [IP(dst="8.8.8.8")/ICMP(), IP(dst="8.8.4.4")/ICMP()]
    sr(probes, timeout=2)
    for probe in probes:
        assert probe.sent_time is not None

retry_test(_test)
= Test set of sent_time by srflood
~ netaccess needs_root IP ICMP
def _test():
    """After srflood(), the packet object that was passed in must carry a sent_time."""
    probe = IP(dst="8.8.8.8")/ICMP()
    srflood(probe, timeout=0.5)
    assert probe.sent_time is not None

retry_test(_test)
= Test set of sent_time by srflood (multiple packets)
~ netaccess needs_root IP ICMP
def _test():
    """After srflood() on a list, every packet object of the list must carry a sent_time."""
    probes = [IP(dst="8.8.8.8")/ICMP(), IP(dst="8.8.4.4")/ICMP()]
    srflood(probes, timeout=0.5)
    for probe in probes:
        assert probe.sent_time is not None

retry_test(_test)
############
############
+ ManuFDB tests
= __repr__
# Evaluate the length of the manufacturer database when one is loaded;
# otherwise the test trivially yields True.
len(conf.manufdb) if conf.manufdb else True
= check _resolve_MAC
# Only check the lookup when a manufacturer database is loaded.
if not conf.manufdb:
    True
else:
    assert conf.manufdb._resolve_MAC("00:00:17") == "Oracle"

############
############
+ Ether tests with IPv6
= Ether IPv6 checking for dst
~ netaccess ipv6
p = Ether()/IPv6(dst="www.google.com")/TCP()
assert p.dst != p[IPv6].dst
p.show()
############
############
+ pcap / pcapng format support
~ pcap
= Variable creations
from io import BytesIO
pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacVo*\n\x00(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV_-\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\xf90\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00')
pcapngfile = BytesIO(b'\n\r\r\n\\\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00,\x00File created by merging: \nFile1: test.pcap \n\x04\x00\x08\x00mergecap\x00\x00\x00\x00\\\x00\x00\x00\x01\x00\x00\x00\\\x00\x00\x00e\x00\x00\x00\xff\xff\x00\x00\x02\x006\x00Unknown/not available in original file format(libpcap)\x00\x00\t\x00\x01\x00\x06\x00\x00\x00\x00\x00\x00\x00\\\x00\x00\x00\x06\x00\x00\x00H\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00/\xfc[\xcd(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00H\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00\x1f\xff[\xcd\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r<\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00\xb9\x02\\\xcd\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00<\x00\x00\x00')
pcapnanofile = BytesIO(b"M<\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacV\xc9\xc1\xb5'(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV-;\xc1'\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\x9aL\xcf'\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00")
pcapwirelenfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00}\x87pZ.\xa2\x08\x00\x0f\x00\x00\x00\x10\x00\x00\x00\xff\xff\xff\xff\xff\xff GG\xee\xdd\xa8\x90\x00a')
pcapngdefaults = BytesIO(base64.b64decode(b'Cg0NChwAAABNPCsaAQAAAP//////////HAAAAAEAAAAgAAAAEgEAAP//AAAJAAEACUeZiQAAAAAgAAAAAQAAACAAAAASAQAA//8AAAkAAQAJAAAAAAAAACAAAAABAAAAIAAAABIBAAD//wAACQABAAkAAAAAAAAAIAAAAAEAAAAgAAAAEgEAAP//AAAJAAEACQAAAAAAAAAgAAAABgAAAIQBAAADAAAApO/bFdgJaeBiAQAAYgEAAFVVVVVVVVXV////////IMbr4D7PCABFAAFIlQkAAEAR5JwAAAAA/////wBEAEMBNJDsAQEGAFSpVwIACoAAAAAAAAAAAAAAAAAAAAAAACDG6+A+zwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABjglNjNQEB/wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAsOs+bAAAhAEAAAYAAACAAQAAAwAAAKTv2xXIDYznYAEAAGABAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABRgGPAAAEEal3qf5wqO////rhbgdsATJi0U5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGDQpOVFM6IHNzZHA6YWxpdmUNClNFUlZFUjogRnJlZUJTRC84LjAgVVBuUC8xLjAgUGFuYXNvbmljLU1JTC1ETE5BLVNWLzEuMA0KVVNOOiB1dWlkOjRENDU0OTMwLTAyMDAtMTAwMC04MDAxLTIwQzZFQkUwM0VDRg0KDQpcQcvWgAEAAAYAAAC4AQAAAwAAAKTv2xV4Ao3nlQEAAJUBAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABewGQAAAEEalBqf5wqO////rhbgdsAWfu+k5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHVybjpwYW5hc29uaWMtY29tOmRldmljZTpwMDBSZW1vdGVDb250cm9sbGVyOjENCk5UUzogc3NkcDphbGl2ZQ0KU0VSVkVSOiBGcmVlQlNELzguMCBVUG5QLzEuMCBQYW5hc29uaWMtTUlMLURMTkEtU1YvMS4wDQpVU046IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGOjp1cm46cGFuYXNvbmljLWNvbTpkZXZpY2U6cDAwUmVtb3RlQ29udHJvbGxlcjoxDQoNCrLVKmoAAAC4AQAABgAAAHgBAAADAAAApO/bFVjbjedXAQAAVwEAAFVVVVVVVVXVAQBef//6IMbr4D7PCABFAAE9AZEAAAQRqX6p/nCo7///+uFuB2wBKaZATk9USUZZICogSFRUUC8xLjENCkhP
U1Q6IDIzOS4yNTUuMjU1LjI1MDoxOTAwDQpDQUNIRS1DT05UUk9MOiBtYXgtYWdlPTE4MDANCkxPQ0FUSU9OOiBodHRwOi8vMTY5LjI1NC4xMTIuMTY4OjU1MDAwL25yYy9kZGQueG1sDQpOVDogdXBucDpyb290ZGV2aWNlDQpOVFM6IHNzZHA6YWxpdmUNClNFUlZFUjogRnJlZUJTRC84LjAgVVBuUC8xLjAgUGFuYXNvbmljLU1JTC1ETE5BLVNWLzEuMA0KVVNOOiB1dWlkOjRENDU0OTMwLTAyMDAtMTAwMC04MDAxLTIwQzZFQkUwM0VDRjo6dXBucDpyb290ZGV2aWNlDQoNCjagXoUAeAEAAAYAAAC0AQAAAwAAAKTv2xXYw47nkwEAAJMBAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABeQGSAAAEEalBqf5wqO////rhbgdsAWWV4E5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHVybjpwYW5hc29uaWMtY29tOnNlcnZpY2U6cDAwTmV0d29ya0NvbnRyb2w6MQ0KTlRTOiBzc2RwOmFsaXZlDQpTRVJWRVI6IEZyZWVCU0QvOC4wIFVQblAvMS4wIFBhbmFzb25pYy1NSUwtRExOQS1TVi8xLjANClVTTjogdXVpZDo0RDQ1NDkzMC0wMjAwLTEwMDAtODAwMS0yMEM2RUJFMDNFQ0Y6OnVybjpwYW5hc29uaWMtY29tOnNlcnZpY2U6cDAwTmV0d29ya0NvbnRyb2w6MQ0KDQovXKFrALQBAAAGAAAAqAEAAAMAAACk79sVuJKP54cBAACHAQAAVVVVVVVVVdUBAF5///ogxuvgPs8IAEUAAW0BkwAABBGpTKn+cKjv///64W4HbAFZRNJOT1RJRlkgKiBIVFRQLzEuMQ0KSE9TVDogMjM5LjI1NS4yNTUuMjUwOjE5MDANCkNBQ0hFLUNPTlRST0w6IG1heC1hZ2U9MTgwMA0KTE9DQVRJT046IGh0dHA6Ly8xNjkuMjU0LjExMi4xNjg6NTUwMDAvbnJjL2RkZC54bWwNCk5UOiB1cm46ZGlhbC1tdWx0aXNjcmVlbi1vcmc6c2VydmljZTpkaWFsOjENCk5UUzogc3NkcDphbGl2ZQ0KU0VSVkVSOiBGcmVlQlNELzguMCBVUG5QLzEuMCBQYW5hc29uaWMtTUlMLURMTkEtU1YvMS4wDQpVU046IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGOjp1cm46ZGlhbC1tdWx0aXNjcmVlbi1vcmc6c2VydmljZTpkaWFsOjENCg0KLn5A6QCoAQAA'))
= Read a pcap file
pktpcap = rdpcap(pcapfile)
= Read a pcapng file
pktpcapng = rdpcap(pcapngfile)
assert pktpcapng[0].time == 1454163407.666223
= Read a pcap file with nanosecond precision
from decimal import Decimal
pktpcapnano = rdpcap(pcapnanofile)
# Compare against an exact Decimal: a float literal cannot hold all nineteen
# significant digits, so it would not pin down the nanosecond part.
# NOTE(review): assumes the reader keeps the timestamp as an exact decimal
# (seconds 1454163407, nanoseconds 666223049 in the capture) - confirm.
assert pktpcapnano[0].time == Decimal("1454163407.666223049")
= Read a pcapng file with nanosecond precision and default tsresol
pktpcapngdefaults = rdpcap(pcapngdefaults)
assert pktpcapngdefaults[0].time == 1575115986.114775512
assert Ether in pktpcapngdefaults[0]
= Read a pcapng with little-endian SHB
# The bundled macOS capture must yield at least one packet.
pktcapng = sniff(offline=scapy_path("/test/pcaps/macos.pcapng.gz"))
assert len(pktcapng) > 0
= Write a pcapng
# Assemble a pcapng file block by block (SHB, IDB, then one EPB) with the raw
# writer, then read it back and compare size, payload and timestamp.
ts = 1632568366.384185
tmpfile = get_temp_file(autoext=".pcapng")
r = RawPcapNgWriter(tmpfile)
r._write_block_shb()
r._write_block_idb(linktype=DLT_EN10MB)
frame = raw(Ether()/"Hello Scapy!!!")
r._write_block_epb(frame, ifid=0, timestamp=ts)
r.f.close()
assert os.path.getsize(tmpfile) == 108
l = rdpcap(tmpfile)
assert b"Scapy" in l[0][Raw].load
assert l[0].time == ts
= Check wrpcapng()
# Round trip through wrpcapng()/rdpcap(): file size, payload and timestamp.
tmpfile = get_temp_file(autoext=".pcapng")
p = Ether()/"Hello Scapy!!!"
p.time = 1632568366.384185
wrpcapng(tmpfile, p)
assert os.stat(tmpfile).st_size == 108
l = rdpcap(tmpfile)
assert b"Scapy" in l[0][Raw].load
# Compare with the timestamp set on this packet rather than with a variable
# left behind by another test, so this test can run on its own.
assert l[0].time == p.time
# Second round trip: a packet comment must survive as well.
p = Ether() / IPv6() / TCP()
p.comment = b"Hello Scapy!"
wrpcapng(tmpfile, p)
l = rdpcap(tmpfile)
assert l[0].comment == p.comment
= rdpcap on fifo
~ linux
f = get_temp_file()
# Swap the regular temporary file for a named pipe at the same path.
os.unlink(f)
os.mkfifo(f)
frame = bytes(Ether(dst="ff:ff:ff:ff:ff:ff")/"Hello Scapy!!!")
p = Ether(frame)
# The sniffer is started first and reads the pipe while wrpcap() writes to it.
s = AsyncSniffer(offline=f)
s.start()
wrpcap(f, p)
s.join(timeout=1)
assert s.results[0] == p
= Check multiple packets with different combination of linktype,comment,direction,sniffed_on fields. test both wrpcap() and wrpcapng()
import random
import string
# Fixed seed: the two random strings and the final shuffle are reproducible.
random.seed(0x2807)
# Two sample packets, each rebuilt from its own bytes: one Ether, one IP.
ptypes = [
    Ether((Ether() / IPv6() / TCP()).build()),
    IP((IP() / IPv6() / TCP()).build()),
]
ifaces = [None, '', 'i', 'int0', ''.join(random.choices(string.printable, k=20))]
comments = [None, '', 'a', 'abcd', ''.join(random.choices(string.printable, k=20))]
directions = [None, 0, 1, 2, 3]
plist = []
for iface in ifaces:
    for comment in comments:
        if comment is not None:
            comment = comment.encode('utf-8')
        for direction in directions:
            for p in ptypes:
                # A named interface is paired with a single packet type, selected by its position in `ifaces`.
                if iface is not None and type(ptypes[ifaces.index(iface) % len(ptypes)]) != type(p):
                    continue
                pnew = p.copy()
                pnew.time = 1632568366.384185
                pnew.sniffed_on = iface
                pnew.direction = direction
                pnew.comment = comment
                plist.append(pnew)

random.shuffle(plist)
# Part 1: pcapng keeps comment, direction, interface and time.
tmpfile = get_temp_file(autoext=".pcapng")
wrpcapng(tmpfile, plist)
plist_check = rdpcap(tmpfile)
assert len(plist_check) == len(plist)
for i, expected in enumerate(plist):
    actual = plist_check[i]
    assert actual.comment == expected.comment
    assert actual.direction == expected.direction
    assert actual.sniffed_on == expected.sniffed_on
    assert actual.time == expected.time
    # if interface is unknown, verify pkt bytes integrity and that linktype was set to first packet
    if expected.sniffed_on is None:
        assert bytes(actual) == bytes(expected)
        assert type(actual) == type(plist[0])
    else:
        assert actual == expected

# Part 2: classic pcap keeps time and bytes; every packet is read back with
# the type of the first packet of the list.
tmpfile = get_temp_file(autoext=".pcap")
wrpcap(tmpfile, plist)
plist_check = rdpcap(tmpfile)
for i, expected in enumerate(plist):
    actual = plist_check[i]
    assert actual.time == expected.time
    assert type(actual) == type(plist[0])
    assert bytes(actual) == bytes(expected)

= PcapNg - Process Information Block
pib_pcapng_file = BytesIO(b'\n\r\r\n\xbc\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x02\x00\x05\x00arm64\x00\x00\x00\x03\x00f\x00Darwin Kernel Version 23.3.0: Thu Dec 21 02:29:41 PST 2023; root:xnu-10002.81.5~11/RELEASE_ARM64_T8122\x00\x00\x04\x00 \x00tcpdump (libpcap version 1.10.1)\x00\x00\x00\x00\xbc\x00\x00\x00\x01\x00\x00\x00 \x00\x00\x00\x01\x00\x00\x00\x00\x00\x08\x00\x02\x00\x03\x00en0\x00\x00\x00\x00\x00 \x00\x00\x00\x01\x00\x00\x80 \x00\x00\x00$\'\x00\x00\x02\x00\x06\x00trustd\x00\x00\x00\x00\x00\x00 \x00\x00\x00\x01\x00\x00\x80$\x00\x00\x00")\x00\x00\x02\x00\x0c\x00mobileassetd\x00\x00\x00\x00$\x00\x00\x00\x06\x00\x00\x00\x90\x00\x00\x00\x00\x00\x00\x00\xfb\x18\x06\x00EcqdB\x00\x00\x00B\x00\x00\x00\xe8\x9f\x80\xfa\x8c\xc6P\xa6\xd8\xd5\x83v\x08\x00E\x00\x004\x00\x00@\x00@\x06\x90T\nh\x01\xc3\xc0\xe5\xdd_\xf4\xb8\x00P\x95\xc3\xcb\x01\xcb\xeb\x11\xe8\x80\x11\x08\x00\x0c\xe6\x00\x00\x01\x01\x08\n\xbe\xb8\xd4\xb3\xbb\x9b4\xbc\x00\x00\x01\x80\x04\x00\x00\x00\x00\x00\x03\x80\x04\x00\x01\x00\x00\x00\x02\x00\x04\x00\x02\x00\x00\x00\x02\x80\x04\x00\x00\x00\x00\x00\x04\x80\x04\x00\x10\x00\x00\x00\x00\x00\x00\x00\x90\x00\x00\x00')
# One TCP packet is expected, carrying two process-information entries, the
# "proc" one naming the "trustd" process.
l = rdpcap(pib_pcapng_file)
assert len(l) == 1
assert TCP in l[0]
proc_info = l[0].process_information
assert len(proc_info) == 2
assert proc_info["proc"]["name"] == "trustd"
= OSS-Fuzz Findings
from io import BytesIO
# Issue 68352
file = BytesIO(b"\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x1c\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00\xe4\x00\x00\x00\x00\x00\x04\x00\x14\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\xe4\x00\x00\x00\x00\x00\x04\x00\x02\x00\t\x00b'ens16\xb0'\x00\x00\x00\x00\x00\x00\x00(\x00\x00\x00\x06\x00\x00\x004\x00\x00\x00\x01\x00\x00\x00}\x17\x06\x00\xb5t\x1d\x85\x14\x00\x00\x00\x14\x00\x00\x00E\x00\x00\x14\x00\x01\x00\x00@\x00|\xe7\x7f\x00\x00\x01\x7f\x00\x00\x014\x00\x00\x00")
rdpcap(file)
# Issue 68354
file = BytesIO(b'\n\r\r\n\xff\xfe\xfe\xffM<+\x1a')
try:
rdpcap(file)
except Scapy_Exception:
pass
# Issue #70115
file = BytesIO(b"\n\r\r\n\x00\x00\x008\x1a+<M\x00\x01\x00\x00\xef\xff\xff\x81\x01\x00\x00\x01\x00\x00\x00\x0c\x00\x00\x00\x00\x00\x00\x00\x009\x00\x00\x00\x01\x00\x00\x000\x00e\x00\x00\x00\x00\x00\x00\x00\x008\x00\x00\x00\x06\x00\x00\x00d\x00\x00\x00\x00'\x00\x00\x00\x01\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x000\x00\x00\x00\x01\x00\x00\x008\x00\x00\n\x0c\x00A\x05\xc0\x01\x00\x00\x00\x00\x00d\x00\x00\x00\x00\x00\x00\x00\x00\x000\x00\x00\x00\x01\x00\x00\x008\x00\x00\n\x0c\x00A\x05\xc0\x83\x00\x00\x043\x01\x00\x00\x00\x00\x00d\x00\x00\x00\x00d")
l = rdpcap(file)
# Issue #69628
file = BytesIO(b"\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00\x04{\xdcf\xc2\xa5\x07\x008\x00\x00\x008\x00\x00\x00A]+\xdb]\x04\x8e(6\n\x99\xcb\x08\x00E\x00\x00*\x00\x01\x00\x00@\x06\xe3V\x07\x87\xa5m\x17\x15\xd3m\x01\x85\x01\x85\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\xc5_\x00\x000\x00")
l = rdpcap(file)
assert l[0][LDAP].summary() == "LDAP"
# Issue #69628 - 32-bit alternative
file = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00%\xa8\xddfK\x1b\x05\x00\xca\xca\xca\xca*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x86"\x11&\xab3\x08\x06\x00\x01\x08\x00\x06\x04\x00\x01]\x80\x0f\x13*r\n\x00\x02\x0f\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
l = rdpcap(file)
assert len(l) == 0 or ARP in l[0]
= Read a pcap file with wirelen != captured len
pktpcapwirelen = rdpcap(pcapwirelenfile)
= Check all packet lists are the same
# The pcap, pcapng and nanosecond-pcap captures must hold the same packets
# and, once converted to float, the same timestamps.
_captures = (pktpcap, pktpcapng, pktpcapnano)
_as_lists = [list(capture) for capture in _captures]
assert _as_lists[0] == _as_lists[1] == _as_lists[2]
_times = [[float(p.time) for p in capture] for capture in _captures]
assert _times[0] == _times[1] == _times[2]
= Check packets from pcap file
assert all(IP in pkt for pkt in pktpcap)
assert all(any(proto in pkt for pkt in pktpcap) for proto in [ICMP, UDP, TCP])
= Check wirelen value from pcap file
assert len(pktpcapwirelen) == 1
assert pktpcapwirelen[0].wirelen is not None
assert len(pktpcapwirelen[0]) < pktpcapwirelen[0].wirelen
= Check wrpcap() then rdpcap() with wirelen
import os
import tempfile
# Write the truncated capture to a temporary file and read it back. The file
# is removed on every exit path; it was previously left behind.
fdesc, filename = tempfile.mkstemp()
try:
    with os.fdopen(fdesc, "wb") as fdesc:
        wrpcap(fdesc, pktpcapwirelen)
    newpktpcapwirelen = rdpcap(filename)
finally:
    os.unlink(filename)

assert len(newpktpcapwirelen) == 1
assert newpktpcapwirelen[0].wirelen is not None
# The captured bytes are fewer than the recorded on-wire length, and that
# length must survive the round trip unchanged.
assert len(newpktpcapwirelen[0]) < newpktpcapwirelen[0].wirelen
assert newpktpcapwirelen[0].wirelen == pktpcapwirelen[0].wirelen
= Check wrpcap() then rdpcap() with sent_time on SndRcvList
# For a (sent, received) pair, the written timestamps must be the sent
# packet's sent_time followed by the received packet's time.
f = get_temp_file()
s, r = Ether()/IP(), Ether()/IP()
s.sent_time = 1
r.time = 2
wrpcap(f, SndRcvList([(s, r)]))
pcap = rdpcap(f)
assert (pcap[0].time, pcap[1].time) == (1, 2)
= Check wrpcap()
# Dump the reference capture through an already-open binary file object.
# `filename` is reused by the offline sniff() tests that follow.
fdesc, filename = tempfile.mkstemp()
with os.fdopen(fdesc, "wb") as fdesc:
    wrpcap(fdesc, pktpcap)

= Check offline sniff() (by PacketList)
# sniff() accepts a PacketList as its offline source and returns both packets.
l = sniff(offline=PacketList([IP()/TCP(), IP()/TCP()]))
assert len(l) == 2
for p in l:
    assert TCP in p

= Check offline sniff() (by filename)
assert list(pktpcap) == list(sniff(offline=filename))
= Check offline sniff() (by file object)
# Use a context manager so the handle is closed even when the comparison fails.
with open(filename, "rb") as fdesc:
    assert list(pktpcap) == list(sniff(offline=fdesc))

= Check offline sniff() with a filter (by filename)
~ tcpdump libpcap
# Filter the capture file once per protocol, using the lower-cased layer name
# as the filter expression, and compare with in-memory filtering.
pktpcap_flt = []
for proto in [ICMP, UDP, TCP]:
    pktpcap_flt.append((proto, sniff(offline=filename, filter=proto.__name__.lower())))

for proto, packets in pktpcap_flt:
    assert list(pktpcap[proto]) == list(packets)

= Check offline sniff() with a filter (by file object)
~ tcpdump libpcap
# Same check as with a filename, but handing sniff() an open file object.
with open(filename, "rb") as fdesc:
    pktpcap_tcp = sniff(offline=fdesc, filter="tcp")

assert list(pktpcap[TCP]) == list(pktpcap_tcp)
# Last user of the shared capture file: remove it.
os.unlink(filename)
= Check offline sniff() with a PcapNg file and a filter (by file object)
~ tcpdump libpcap
pcapng_data = b'\n\r\r\n`\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x04\x009\x00TShark (Wireshark) 3.2.3 (Git v3.2.3 packaged as 3.2.3-1)\x00\x00\x00\x00\x00\x00\x00`\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00\xe4\x00\x00\x00\xff\xff\x00\x00\x14\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x98\xcd\x05\x00\x19\x83\xf7\x9e\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r<\x00\x00\x00'
if OPENBSD:
    # Note: OpenBSD tcpdump does not support PcapNg
    assert True
else:
    fdesc, filename = tempfile.mkstemp()
    try:
        # Reuse the descriptor returned by mkstemp() instead of reopening the path.
        with os.fdopen(fdesc, "wb") as fd:
            fd.write(pcapng_data)
        packets = sniff(offline=filename, filter="udp")
    finally:
        # Remove the temporary capture even when sniff() raises.
        os.unlink(filename)
    assert UDP in packets[0]

= Check offline sniff() with Packets and tcpdump with a filter
~ tcpdump libpcap
# A packet expanding to two UDP datagrams, given either directly or as an
# explicit list, must pass a "udp" filter entirely...
for offline_source in (IP()/UDP(sport=(10000, 10001)), [p for p in IP()/UDP(sport=(10000, 10001))]):
    l = sniff(offline=offline_source, filter="udp")
    assert len(l) == 2
    assert all(UDP in p for p in l)

# ...and be rejected entirely by a "tcp" filter.
l = sniff(offline=IP()/UDP(sport=(10000, 10001)), filter="tcp")
assert len(l) == 0
= Check offline sniff() with Packets, tcpdump and a bad filter
~ tcpdump libpcap
# An invalid filter expression must be reported as a Scapy_Exception.
bad_filter_rejected = False
try:
    sniff(offline=IP()/UDP(), filter="bad filter")
except Scapy_Exception:
    bad_filter_rejected = True

assert bad_filter_rejected
= Check online sniff() with a bad filter
~ needs_root tcpdump libpcap
# An invalid filter expression must be reported as a Scapy_Exception.
bad_filter_rejected = False
try:
    sniff(timeout=0, filter="bad filter")
except Scapy_Exception:
    bad_filter_rejected = True

assert bad_filter_rejected
= Check offline sniff with lfilter
assert len(sniff(offline=[IP()/UDP(), IP()/TCP()], lfilter=lambda x: TCP in x)) == 1
= Check offline sniff() without a tcpdump binary
~ tcpdump
from unittest import mock
conf_prog_tcpdump = conf.prog.tcpdump
conf.prog.tcpdump = "tcpdump_fake"
def _test_sniff_notcpdump():
    """Return True when a filtered offline sniff() fails without a tcpdump binary.

    The previous version put ``assert False`` inside a ``try`` guarded by a
    bare ``except``, which swallowed the AssertionError: the test could never
    fail. The configured tcpdump program is restored on every exit path.
    """
    try:
        sniff(offline="fake.pcap", filter="tcp")
    except Exception:
        return True
    finally:
        conf.prog.tcpdump = conf_prog_tcpdump
    return False

assert _test_sniff_notcpdump()
= Check wrpcap(nano=True)
# Shift the first timestamp by one nanosecond, write with nano=True and check
# that the shifted value is read back unchanged.
fdesc, filename = tempfile.mkstemp()
pktpcapnano[0].time += Decimal('1E-9')
with os.fdopen(fdesc, "wb") as fdesc:
    wrpcap(fdesc, pktpcapnano, nano=True)

pktpcapnanoread = rdpcap(filename)
assert pktpcapnanoread[0].time == pktpcapnano[0].time
os.unlink(filename)
= Check PcapNg with nanosecond precision using obsolete packet block
* first packet from capture file icmp2.ntar -- https://wiki.wireshark.org/Development/PcapNg?action=AttachFile&do=view&target=icmp2.ntar
pcapngfile = BytesIO(b'\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xa8\x03\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\xff\xff\x00\x00\r\x00\x01\x00\x04\x04K\x00\t\x00\x01\x00\tK=N\x00\x00\x00\x00(\x00\x00\x00\x02\x00\x00\x00n\x00\x00\x00\x00\x00\x00\x00e\x14\x00\x00)4\'ON\x00\x00\x00N\x00\x00\x00\x00\x12\xf0\x11h\xd6\x00\x13r\t{\xea\x08\x00E\x00\x00<\x90\xa1\x00\x00\x80\x01\x8e\xad\xc0\xa8M\x07\xc0\xa8M\x1a\x08\x00r[\x03\x00\xd8\x00abcdefghijklmnopqrstuvwabcdefghi\xeay$\xf6\x00\x00n\x00\x00\x00')
pktpcapng = rdpcap(pcapngfile)
assert len(pktpcapng) == 1
pkt = pktpcapng[0]
# weird, but wireshark agrees
assert pkt.time == 22425.352221737
# Walk down the layers: Ether / IP / ICMP / Raw / Padding, then nothing.
for layer_cls in (Ether, IP, ICMP):
    assert isinstance(pkt, layer_cls)
    pkt = pkt.payload

assert isinstance(pkt, Raw)
assert pkt.load == b'abcdefghijklmnopqrstuvwabcdefghi'
pkt = pkt.payload
assert isinstance(pkt, Padding)
assert pkt.load == b'\xeay$\xf6'
pkt = pkt.payload
assert isinstance(pkt, NoPayload)
= Check PcapNg using Simple Packet Block
* previous file with the (obsolete) packet block replaced by a Simple Packet Block
pcapngfile = BytesIO(b'\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xa8\x03\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\x01\x00\x00\x00\xff\xff\x00\x00\r\x00\x01\x00\x04\x04K\x00\t\x00\x01\x00\tK=N\x00\x00\x00\x00(\x00\x00\x00\x03\x00\x00\x00`\x00\x00\x00N\x00\x00\x00\x00\x12\xf0\x11h\xd6\x00\x13r\t{\xea\x08\x00E\x00\x00<\x90\xa1\x00\x00\x80\x01\x8e\xad\xc0\xa8M\x07\xc0\xa8M\x1a\x08\x00r[\x03\x00\xd8\x00abcdefghijklmnopqrstuvwabcdefghi\xeay$\xf6\x00\x00`\x00\x00\x00')
pktpcapng = rdpcap(pcapngfile)
assert len(pktpcapng) == 1
pkt = pktpcapng[0]
# Walk down the layers: Ether / IP / ICMP / Raw / Padding, then nothing.
for layer_cls in (Ether, IP, ICMP):
    assert isinstance(pkt, layer_cls)
    pkt = pkt.payload

assert isinstance(pkt, Raw)
assert pkt.load == b'abcdefghijklmnopqrstuvwabcdefghi'
pkt = pkt.payload
assert isinstance(pkt, Padding)
assert pkt.load == b'\xeay$\xf6'
pkt = pkt.payload
assert isinstance(pkt, NoPayload)
= Invalid pcapng files
from io import BytesIO
# Invalid PCAPNG format -> Raise
try:
invalid_pcapngfile_1 = BytesIO(b'\n\r\r\n\r\x00\x00\x00M<+\x1a\xb2<\xb2\xa1\x01\x00\x00\x00\r\x00\x00\x00M<+\x1a\x80\xaa\xb2\x02')
rdpcap(invalid_pcapngfile_1)
assert False
except Scapy_Exception:
pass
# Invalid Packet in PCAPNG -> return
invalid_pcapngfile_2 = BytesIO(b'\n\r\r\n\x00\x00\x00\x1c\x1a+<M\x00\x01\x00\x00\x00\x00\x00\x01\x00\x00\x00\x10\x00\x00\x00\x1c\x00\x00\x00\x01\x00\x00\x00\x10\x12\x12\x12\x12\x00\x00\x00\x10')
assert len(rdpcap(invalid_pcapngfile_2)) == 0
# Invalid interface ID in PCAPNG -> raise Scapy_Exception
try:
invalid_pcapngfile_3 = BytesIO(b'\n\n\n\x14\x00\x00\x00M<+\x1a \x14\x00\x00\x00\x03\x00\x00\x00\x14\x00\x00\x00 \x14\x00\x00\x00')
rdpcap(invalid_pcapngfile_3)
assert False
except Scapy_Exception:
pass
# Invalid SPB in PCAPNG -> raise Scapy_Exception
try:
invalid_pcapngfile_4 = BytesIO(b'\n\n\n\x14\x00\x00\x00M<+\x1a \x14\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00 \x14\x00\x00\x00\x03\x00\x00\x00\x0c\x00\x00\x00\x0c\x00\x00\x00')
rdpcap(invalid_pcapngfile_4)
assert False
except Scapy_Exception:
pass
= Check PcapWriter on null write
f = BytesIO()
w = PcapWriter(f)
# Writing an empty list must not produce any output yet.
w.write([])
assert not f.getvalue()
# Stop being closed for reals, but we still want to have the header written
with mock.patch.object(f, 'close') as cf:
    w.close()
    cf.assert_called_once_with()

assert f.getvalue()
= Check PcapWriter sets correct linktype after null write
f = BytesIO()
w = PcapWriter(f)
# An empty write produces no output; the first real packet does.
w.write([])
assert not f.getvalue()
w.write(Ether()/IP()/ICMP())
assert f.getvalue()
# Stop being closed for reals, but we still want to have the header written
with mock.patch.object(f, 'close') as cf:
    w.close()
    cf.assert_called_once_with()

# `or None` keeps the numeric result of seek() from becoming the test's value.
f.seek(0) or None
assert f.getvalue()
r = PcapReader(f)
f.seek(0) or None
assert r.LLcls is Ether
assert r.linktype == DLT_EN10MB
l = [ p for p in RawPcapReader(f) ]
assert len(l) == 1
= Check RawPcapReader on pcap
~ pcap
fd = get_temp_file()
wrpcap(fd, [Ether()/IP()/ICMP()])
raw_records = [p for p in RawPcapReader(fd)]
assert len(raw_records) == 1
# Every record read must unpack into exactly two items.
for (x, y) in RawPcapReader(fd):
    pass

= Check RawPcapReader with a Context Manager
~ pcap
filename = get_temp_file(fd=False)
wrpcap(filename, [IP()/TCP(), IP()/UDP()])
# Using the reader in a `with` statement and calling next() on it must not
# raise a TypeError.
usable_as_context_manager = True
try:
    with RawPcapReader(filename) as reader:
        packet = next(reader, None)
except TypeError:
    usable_as_context_manager = False

assert usable_as_context_manager
= Check RawPcapWriter
~ pcap
# GH3256
fd = get_temp_file()
with RawPcapWriter(fd, linktype=1) as w:
w.write(b"test")
fd = get_temp_file()
with RawPcapWriter(fd) as w:
w.write(b"test")
assert w.linktype == 1
= Check tcpdump()
~ tcpdump
from io import BytesIO
* No very specific tests because we do not want to depend on tcpdump output
pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x000}$]\xff\\\t\x006\x00\x00\x006\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x000}$]\x87i\t\x00*\x00\x00\x00*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r0}$]\xfbp\t\x00*\x00\x00\x00*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00')
data = tcpdump(pcapfile, dump=True, args=['-nn']).split(b'\n')
print(data)
assert b'127.0.0.1.20 > 127.0.0.1.80:' in data[0]
assert b'127.0.0.1.53 > 127.0.0.1.53:' in data[1]
assert b'127.0.0.1 > 127.0.0.1:' in data[2]
* Non existing tcpdump binary
from unittest import mock
conf_prog_tcpdump = conf.prog.tcpdump
conf.prog.tcpdump = "tcpdump_fake"
def _test_tcpdump_notcpdump():
    """Return True when tcpdump() fails because the configured binary is missing.

    The previous version put ``assert False`` inside a ``try`` guarded by a
    bare ``except``, which swallowed the AssertionError: the test could never
    fail. The configured tcpdump program is restored on every exit path.
    """
    try:
        tcpdump(IP()/TCP())
    except Exception:
        return True
    finally:
        conf.prog.tcpdump = conf_prog_tcpdump
    return False

assert _test_tcpdump_notcpdump()
# Also check with use_tempfile=True (for non-OSX platforms)
pcapfile.seek(0) or None
tempfile_count = len(conf.temp_files)
data = tcpdump(pcapfile, dump=True, args=['-nn'], use_tempfile=True).split(b'\n')
print(data)
assert b'127.0.0.1.20 > 127.0.0.1.80:' in data[0]
assert b'127.0.0.1.53 > 127.0.0.1.53:' in data[1]
assert b'127.0.0.1 > 127.0.0.1:' in data[2]
# We should have another tempfile tracked.
assert len(conf.temp_files) > tempfile_count
# Check with a simple packet
data = tcpdump([Ether()/IP()/ICMP()], dump=True, args=['-nn']).split(b'\n')
print(data)
assert b'127.0.0.1 > 127.0.0.1: ICMP' in data[0].upper()
= Check tcpdump() command with linktype
~ tcpdump libpcap
f = BytesIO()
pkt = Ether()/IP()/ICMP()
# Stand-in process object whose stdin is the BytesIO inspected below.
fake_proc = Bunch(stdin=f, wait=lambda: None)
with mock.patch('subprocess.Popen', return_value=fake_proc) as popen:
    # Prevent closing the BytesIO
    with mock.patch.object(f, 'close'):
        tcpdump([pkt], linktype="DLT_EN10MB", use_tempfile=False)

# The expected command line has no '-U' flag on OpenBSD.
extra_flags = [] if OPENBSD else ['-U']
expected_command = [conf.prog.tcpdump, '-y', 'EN10MB'] + extra_flags + ['-r', '-']
popen.assert_called_once_with(
    expected_command,
    stdin=subprocess.PIPE, stdout=None, stderr=None)
written = f.getvalue()
print(bytes_hex(written))
assert raw(pkt) in written
f.close()
del f, pkt
= Check tcpdump() command with linktype and args
~ tcpdump libpcap
f = BytesIO()
pkt = Ether()/IP()/ICMP()
# Stand-in process object whose stdin is the BytesIO inspected below.
fake_proc = Bunch(stdin=f, wait=lambda: None)
with mock.patch('subprocess.Popen', return_value=fake_proc) as popen:
    # Prevent closing the BytesIO
    with mock.patch.object(f, 'close'):
        tcpdump([pkt], linktype=scapy.data.DLT_EN10MB, use_tempfile=False)

# The expected command line has no '-U' flag on OpenBSD.
extra_flags = [] if OPENBSD else ['-U']
expected_command = [conf.prog.tcpdump, '-y', 'EN10MB'] + extra_flags + ['-r', '-']
popen.assert_called_once_with(
    expected_command,
    stdin=subprocess.PIPE, stdout=None, stderr=None)
written = f.getvalue()
print(bytes_hex(written))
assert raw(pkt) in written
f.close()
del f, pkt
= Check sniff() offline with linktype & 802.11 filter
~ tcpdump linux
fd = get_temp_file()
wrpcap(fd, [RadioTap()/Dot11()/Dot11ProbeReq(), RadioTap()/Dot11()])
lst = sniff(offline=fd, filter="subtype probe-req")
assert len(lst) == 1
= Check tcpdump() command rejects non-string input for prog
# A non-string `prog` must be rejected with a ValueError that mentions 'prog'.
pkt = Ether()/IP()/ICMP()
try:
    tcpdump([pkt], prog=+17607067425, args=['-nn'])
except ValueError as e:
    # Exceptions always expose `.args`; the former `e.message` fallback was
    # unreachable and has been dropped.
    assert 'prog' in e.args[0]
else:
    assert False, 'expected exception'

= Check tcpdump() command with tshark
~ tshark
pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacVo*\n\x00(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV_-\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\xf90\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00')
# tshark doesn't need workarounds on OSX
tempfile_count = len(conf.temp_files)
tshark_output = tcpdump(pcapfile, prog=conf.prog.tshark, getfd=True, args=['-T', 'fields', '-e', 'ip.ttl', '-e', 'ip.proto'])
# One output line per packet: drop the last byte of the line, split on tabs
# and convert each field to an integer.
values = []
for line in tshark_output:
    values.append(tuple(int(val) for val in line[:-1].split(b'\t')))

assert values == [(64, 6), (64, 17), (64, 1)]
# No additional temporary file may have been registered.
assert len(conf.temp_files) == tempfile_count
= Check tdecode command directly for tshark
~ tshark
pkts = [
    Ether()/IP(src='192.0.2.1', dst='192.0.2.2')/ICMP(type='echo-request')/Raw(b'X'*100),
    Ether()/IP(src='192.0.2.2', dst='192.0.2.1')/ICMP(type='echo-reply')/Raw(b'X'*100),
]
# tshark doesn't need workarounds on OSX
tempfile_count = len(conf.temp_files)
r = tdecode(pkts, dump=True)
r
# Both directions of the ping exchange must show up in the decoded output.
for expected in (b'Src: 192.0.2.1', b'Src: 192.0.2.2', b'Dst: 192.0.2.2', b'Dst: 192.0.2.1', b'Echo (ping) request', b'Echo (ping) reply', b'ICMP'):
    assert expected in r

# No additional temporary file may have been registered.
assert len(conf.temp_files) == tempfile_count
= Check tdecode with linktype
~ tshark
# These are the same as the ping packets above
pkts = [
b'\xff\xff\xff\xff\xff\xff\xac"\x0b\xc5j\xdb\x08\x00E\x00\x00\x80\x00\x01\x00\x00@\x01\xf6x\xc0\x00\x02\x01\xc0\x00\x02\x02\x08\x00\xb6\xbe\x00\x00\x00\x00XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
b'\xff\xff\xff\xff\xff\xff\xac"\x0b\xc5j\xdb\x08\x00E\x00\x00\x80\x00\x01\x00\x00@\x01\xf6x\xc0\x00\x02\x02\xc0\x00\x02\x01\x00\x00\xbe\xbe\x00\x00\x00\x00XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX',
]
# tshark doesn't need workarounds on OSX
tempfile_count = len(conf.temp_files)
r = tdecode(pkts, dump=True, linktype=DLT_EN10MB)
# Both directions of the ping exchange must show up in the decoded output.
for expected in (b'Src: 192.0.2.1', b'Src: 192.0.2.2', b'Dst: 192.0.2.2', b'Dst: 192.0.2.1', b'Echo (ping) request', b'Echo (ping) reply', b'ICMP'):
    assert expected in r

# No additional temporary file may have been registered.
assert len(conf.temp_files) == tempfile_count
= Run scapy's tshark command
~ needs_root
tshark(count=1, timeout=3)
= Check wireshark()
~ wireshark
f = BytesIO()
pkt = Ether()/IP()/ICMP()
# Stand-in process object whose stdin is the BytesIO inspected below.
fake_proc = Bunch(stdin=f)
with mock.patch('subprocess.Popen', return_value=fake_proc) as popen:
    # Prevent closing the BytesIO
    with mock.patch.object(f, 'close'):
        wireshark([pkt])

expected_command = [conf.prog.wireshark, '-ki', '-']
popen.assert_called_once_with(
    expected_command,
    stdin=subprocess.PIPE, stdout=None, stderr=None)
written = f.getvalue()
print(bytes_hex(written))
assert raw(pkt) in written
f.close()
del f, pkt
= Check Raw IP pcap files
import os
import tempfile
# wrpcap() creates the file itself, so only a unique path is requested here.
filename = tempfile.mktemp(suffix=".pcap")
try:
    # Written with the DLT_RAW linktype; rdpcap() must give back an IP packet
    # followed by an IPv6 packet.
    wrpcap(filename, [IP()/UDP(), IPv6()/UDP()], linktype=DLT_RAW)
    packets = rdpcap(filename)
    assert isinstance(packets[0], IP) and isinstance(packets[1], IPv6)
finally:
    # Remove the capture even when an assertion fails, like the sibling
    # wrpcap() tests do; the file may be missing if wrpcap() itself failed.
    if os.path.exists(filename):
        os.remove(filename)
= Check wrpcap() with no packet
import tempfile
filename = tempfile.mktemp(suffix=".pcap")
# Writing an empty packet list must still produce a non-empty file.
wrpcap(filename, [])
capture_size = os.stat(filename).st_size
assert capture_size != 0
os.remove(filename)
= Check wrpcap() with SndRcvList
import tempfile
filename = tempfile.mktemp(suffix=".pcap")
# A single (sent, received) pair: both packets are expected in the capture.
exchanges = SndRcvList(res=[(Ether()/IP(), Ether()/IP())])
wrpcap(filename, exchanges)
reread = rdpcap(filename)
assert len(reread) == 2
os.remove(filename)
= Check wrpcap() with different packets types
from unittest import mock
import os
import tempfile
filename = tempfile.mktemp()
# scapy.utils.warning is replaced by a mock so the emitted message can be inspected.
with mock.patch("scapy.utils.warning") as warning:
    wrpcap(filename, [IP(), Ether(), IP(), IP()])

os.remove(filename)
# Positional arguments of the most recent warning() call.
last_warning_args = warning.call_args[0]
assert any("Inconsistent" in arg for arg in last_warning_args)
= Check wrpcap() with the Loopback layer
~ tshark
import os
import tempfile
# Both loopback header flavours must be written in a way tshark decodes as ICMP.
for cls in [Loopback, LoopbackOpenBSD]:
    filename = tempfile.mktemp(suffix=".pcap")
    try:
        wrpcap(filename, [cls()/IP()/ICMP()])
        # getfd=True hands back a file-like object; join everything it yields.
        return_value = b"".join(tcpdump(filename, prog=conf.prog.tshark, getfd=True))
        assert b"Echo (ping) request" in return_value
    finally:
        # Best-effort cleanup: the capture used to be left behind on every
        # iteration. Removal errors are ignored on purpose so that a reader
        # still holding the file cannot turn a passing test into a failure.
        try:
            os.remove(filename)
        except OSError:
            pass
############
############
+ ERF Ethernet format support
= Variable creations
erffile = BytesIO(b'3;!E_9\x92_\x02\x04\x00p\x00\x00\x00P\x00\x00\x00\x0fS?\xca\xc0\x1cjz\x18\x90\xed\x81\x00\x01:\x08\x00E\x00\x00(\xdf\xab@\x00;\x06\xb3s\n\x01]\xdb\n\xfb9\xda\xc3v\x84\xecD\x16\xb9\xab\xda\xa1b\xf9P\x10f\x98\x18\xcb\x00\x00\x00\x00\x90\x9e\xd7\xd2_\x929_\x0f\x9e\xcd\x1f\x01\x88\xb9\x15[/s<\x01\x88\xb9\x15[/\xcd\x1f\x01\x88\xb9\x15[/0\xcd"E_9\x92_\x02\x04\x00p\x00\x00\x00P\x00\x00\x1cjz\x18\x90\xed\x00\x0fS?\xca\xc0\x08\x00E\x00\x00(\xa2\xdd@\x00@\x06\xebA\n\xfb9\xda\n\x01]\xdb\x84\xec\xc3v\xda\xa1b\xf9D\x16\xb9\xacP\x10\x9a\xf0\xe4q\x00\x00\x00\x00\x00\x00\x00\x00o\xbc\xe2{_\x929_\x0f\x9f+3\x01\x88\xb9\x15u\x1e(^\x01\x88\xb9\x15u\x1e+3\x01\x88\xb9\x15u\x1e')
erffilewithheader = BytesIO(b'4;!E_9\x92_\x82\x00\x00x\x00\x00\x00P\x00\x00\x1a+<M^o\x00\x00\x00\x0fS?\xca\xc0\x1cjz\x18\x90\xed\x81\x00\x01:\x08\x00E\x00\x00(\xdf\xab@\x00;\x06\xb3s\n\x01]\xdb\n\xfb9\xda\xc3v\x84\xecD\x16\xb9\xab\xda\xa1b\xf9P\x10f\x98\x18\xcb\x00\x00\x00\x00\x90\x9e\xd7\xd2_\x929_\x0f\x9e\xcd\x1f\x01\x88\xb9\x15[/s<\x01\x88\xb9\x15[/\xcd\x1f\x01\x88\xb9\x15[/0\xcd"E_9\x92_\x82\x00\x00x\x00\x00\x00P\x00\x00\x1a+<M^o\x00\x00\x1cjz\x18\x90\xed\x00\x0fS?\xca\xc0\x08\x00E\x00\x00(\xa2\xdd@\x00@\x06\xebA\n\xfb9\xda\n\x01]\xdb\x84\xec\xc3v\xda\xa1b\xf9D\x16\xb9\xacP\x10\x9a\xf0\xe4q\x00\x00\x00\x00\x00\x00\x00\x00o\xbc\xe2{_\x929_\x0f\x9f+3\x01\x88\xb9\x15u\x1e(^\x01\x88\xb9\x15u\x1e+3\x01\x88\xb9\x15u\x1e')
= Check reading of ERF Ethernet file
pkterf = rderf(erffile)
assert pkterf[0].time == 1603418463.270038318
assert pkterf[0][IP].src == "10.1.93.219"
assert pkterf[0][IP].dst == "10.251.57.218"
assert pkterf[0][Ether].src == "1c:6a:7a:18:90:ed"
assert pkterf[0][Ether].dst == "00:0f:53:3f:ca:c0"
= Check writing of ERF Ethernet file
import os
import tempfile
# Part 1: write through an already opened binary file object.
raw_fd, filename = tempfile.mkstemp()
fdesc = os.fdopen(raw_fd, "wb")
try:
    wrerf(fdesc, pkterf)
    fdesc.close()
    newpkterf = rderf(filename)
finally:
    # close() is harmless on an already closed file; the temp file is always removed.
    fdesc.close()
    os.remove(filename)

assert pkterf[1][Ether].src == newpkterf[1][Ether].src
assert len(pkterf) == len(newpkterf)
# Timestamp and wire length of the first two records must survive the round trip.
for idx in (0, 1):
    assert newpkterf[idx].time is not None
    assert newpkterf[idx].wirelen is not None
    assert newpkterf[idx].time == pkterf[idx].time
    assert newpkterf[idx].wirelen == pkterf[idx].wirelen

# Part 2: write by path, twice, in append mode.
raw_fd, filename = tempfile.mkstemp()
# Only the path is needed here; close the descriptor instead of leaking it.
os.close(raw_fd)
try:
    wrerf(filename, pkterf, append=True)
    wrerf(filename, pkterf, append=True)
    newdoublepkterf = rderf(filename)
finally:
    os.remove(filename)

# Appending the same packets twice must double the record count.
assert len(newpkterf) * 2 == len(newdoublepkterf)
= Check rderf
pkterf = rderf(erffilewithheader)
assert pkterf[1].time == 1603418463.270062279
assert pkterf[1][Ether].src == "00:0f:53:3f:ca:c0"
############
############
+ Mocked read_routes() and read_routes6() calls
= Create patcher util
~ mock_read_routes_bsd little_endian_only
# Helpers that mock socket functions and platform constants so pfroute can be reloaded as if on a BSD system
from unittest import mock
from scapy.pton_ntop import inet_pton
import scapy.arch.bpf.pfroute
# Keep references to the genuine values before anything gets patched.
og_afinet6 = socket.AF_INET6
og_inet_pton = socket.inet_pton
og_inet_ntop = socket.inet_ntop
def mock_inet_pton(af, data):
    """inet_pton() wrapper: families 24, 28 and 30 are served as the host's real AF_INET6."""
    family = og_afinet6 if af in (24, 28, 30) else af
    return og_inet_pton(family, data)

def mock_inet_ntop(af, data):
    """inet_ntop() wrapper: families 24, 28 and 30 are served as the host's real AF_INET6."""
    family = og_afinet6 if af in (24, 28, 30) else af
    return og_inet_ntop(family, data)
class BSDLoader:
    """
    Context manager that reloads scapy.arch.bpf.pfroute under mocked platform settings.
    While active, socket.AF_INET6, socket.inet_pton/inet_ntop and the
    scapy.consts OPENBSD/NETBSD/DARWIN flags are patched, the module is
    reloaded, and its `_sr1_bsdsysctl` and `_get_if_list` helpers are
    replaced by stubs returning the canned data given to the constructor.
    :param OPENBSD: value patched into scapy.consts.OPENBSD
    :param FREEBSD: accepted for symmetry; currently not patched (see NOTE below)
    :param NETBSD: value patched into scapy.consts.NETBSD
    :param DARWIN: value patched into scapy.consts.DARWIN
    :param sysctldata: raw bytes wrapped in pfroute.pfmsghdrs() and returned by the stubbed `_sr1_bsdsysctl`
    :param ifaces: value returned by the stubbed `_get_if_list` (defaults to a fresh empty dict)
    :param AF_INET6: value patched into socket.AF_INET6
    :returns: (from ``with``) the reloaded pfroute module
    """
    def __init__(self, OPENBSD=False, FREEBSD=False, NETBSD=False, DARWIN=False, sysctldata=None, ifaces=None, AF_INET6=socket.AF_INET6):
        self.sysctldata = sysctldata
        # A new dict per instance: a mutable default would be shared by every loader.
        self.ifaces = {} if ifaces is None else ifaces
        # NOTE(review): set directly on the socket module and never restored;
        # presumably needed on hosts whose socket module lacks AF_LINK.
        socket.AF_LINK = 18
        self.loadpatches = [
            mock.patch('socket.AF_INET6', AF_INET6),
            mock.patch('socket.inet_pton', side_effect=mock_inet_pton),
            mock.patch('socket.inet_ntop', side_effect=mock_inet_ntop),
            mock.patch('scapy.consts.OPENBSD', OPENBSD),
            # NOTE(review): the FREEBSD flag is deliberately left unpatched
            # here; the reason is not visible in this file.
            # mock.patch('scapy.consts.FREEBSD', FREEBSD),
            mock.patch('scapy.consts.NETBSD', NETBSD),
            mock.patch('scapy.consts.DARWIN', DARWIN),
        ]
        self.patches = []
        self._cleanup = None
    def __enter__(self):
        import contextlib
        # Every started patch registers its stop() on the stack. If anything
        # below raises, leaving this `with` undoes what was already applied
        # (__exit__ is not called when __enter__ fails).
        with contextlib.ExitStack() as stack:
            # Apply patches that only occur when loading
            for p in self.loadpatches:
                p.start()
                stack.callback(p.stop)
            # Reload module
            pfroute = importlib.reload(scapy.arch.bpf.pfroute)
            # Now apply post-load patches
            self.patches = [
                mock.patch.object(
                    pfroute,
                    '_sr1_bsdsysctl',
                    return_value=pfroute.pfmsghdrs(self.sysctldata)
                ),
                mock.patch.object(
                    pfroute,
                    '_get_if_list',
                    return_value=self.ifaces,
                ),
            ]
            for p in self.patches:
                p.start()
                stack.callback(p.stop)
            # Success: keep the callbacks alive until __exit__.
            self._cleanup = stack.pop_all()
        return pfroute
    def __exit__(self, *args, **kwargs):
        # Stops all patches in reverse start order; each stop() runs even if
        # a previous one raised.
        cleanup, self._cleanup = self._cleanup, None
        if cleanup is not None:
            cleanup.close()
= OpenBSD 7.5 amd64 - read_routes()
~ mock_read_routes_bsd little_endian_only
import zlib
_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789c7bc1c0ca92c0c0c8c0c0c0c160cec2c0e0ccc10006de0f1850003f0376c08a431c06049830f96bc40f30e2925710626460636163c828cb3360108d6548e5c4aabf0bc6576248c9482ec8494d2c4e4dc1e78e03607f323380fd09243d71f853109be6067c2623dcf5008d5fcfc080e2cf0f48f20a42cc0c1240e7e4e41be0340f593fbafbbd71b81f2b20d2fdf504dcff9f2aee6704bbdf151873d8dc8f961ce0ee67c4264ec0bd18ee0702cadc0fe2b280ddefc8d880d5fdb80031ee07a66b747e1732ffff7f440a22359f5c80bb9f1912fe2ccc58ddcf03a5cd675f494316c71a2f98f6c1bd09761f1002dd26f4b900bb7ad4f820d73fd0f4c4a280d53f5c04dc4dc03f70fb88711f25fe3980ee1f0607acfe61a7c83fe7ffa3f2d1d317f9ee1f05a360148c8251300a46c128180543030000bd836967'))
with BSDLoader(OPENBSD=True, sysctldata=_PFROUTE_DATA, AF_INET6=24) as pfroute:
routes = pfroute.read_routes()
assert routes == [
(0, 0, '172.23.192.1', 'hvn0', '172.23.192.138', 1),
(3758096384, 4026531840, '127.0.0.1', 'lo0', '127.0.0.1', 1),
(2130706432, 4278190080, '127.0.0.1', 'lo0', '127.0.0.1', 1),
(2130706433, 4294967295, '127.0.0.1', 'lo0', '127.0.0.1', 1),
(2887237632, 4294963200, '172.23.192.138', 'hvn0', '172.23.192.138', 1),
(2887237633, 4294967295, '0.0.0.0', 'hvn0', '172.23.192.138', 1),
(2887237770, 4294967295, '0.0.0.0', 'hvn0', '172.23.192.138', 1),
(2887241727, 4294967295, '172.23.192.138', 'hvn0', '172.23.192.138', 1)
]
= OpenBSD 7.5 amd64 - read_routes6()
~ mock_read_routes_bsd little_endian_only
import zlib
_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789ced96bb0dc2301086cf38481125a248411131011d3505a2600906406204325a4661040a6a4264f130d6c5b19c87e2f07f458adcd9be2f7fe190984647924414d3a67c1e6252dcf7544f56dfb24cbceac2ac171a7a633a979494e39fce6baffde9e32f94ff8e56eab5e9bfe076c98866ecf6eee7fbf8ebdfa03dffbef2ffcd6f38f977eb9f4eecf5aaf9347fb6311cff8bd77ce3f1bf7acdf7f5bfb18de1f8f3f9fd4bfe8f8a5e67ff9c5f1f8c7f6eaf57cdd79ffffbfe4fd5eb0ef297dcf9aef5a6f77fd5fe7de55f087bdd80f1e7d7b7977fa4fcb7af7bdaf467c7cff899b8f34b7f69abbbe4cfad0f26ffc6ff3ffcfa60f29f0c347f00000000000000000000306a9e0a72ae83'))
with BSDLoader(OPENBSD=True, sysctldata=_PFROUTE_DATA, AF_INET6=24) as pfroute:
routes = pfroute.read_routes6()
assert routes == [
('::', 96, '::1', 'lo0', ['::1'], 1),
('::1', 128, '::1', 'lo0', ['::1'], 1),
('::ffff:0.0.0.0', 96, '::1', 'lo0', ['::1'], 1),
('2002::', 24, '::1', 'lo0', ['::1'], 1),
('2002:7f00::', 24, '::1', 'lo0', ['::1'], 1),
('2002:e000::', 20, '::1', 'lo0', ['::1'], 1),
('2002:ff00::', 24, '::1', 'lo0', ['::1'], 1),
('fe80::', 10, '::1', 'lo0', ['::1'], 1),
('fec0::', 10, '::1', 'lo0', ['::1'], 1),
('fe80:3::1', 128, 'fe80:3::1', 'lo0', ['fe80:3::1'], 1),
('ff01::', 16, '::1', 'lo0', ['::1'], 1),
('ff01:3::', 32, 'fe80:3::1', 'lo0', ['fe80:3::1'], 1),
('ff02::', 16, '::1', 'lo0', ['::1'], 1),
('ff02:3::', 32, 'fe80:3::1', 'lo0', ['fe80:3::1'], 1)
]
= FreeBSD 14.1 amd64 - read_routes()
~ mock_read_routes_bsd little_endian_only
import zlib
from scapy.arch.bpf.pfroute import _bsd_iff_flags
_FREEBSD_IFACES = {1: {'name': 'lo0', 'index': 1, 'flags': FlagValue(32841, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 28, 'index': 1, 'address': '::1', 'scope': 16}, {'af_family': 28, 'index': 1, 'address': 'fe80::1', 'scope': 32}, {'af_family': 2, 'index': 1, 'address': '127.0.0.1'}]}, 2: {'name': 'hn0', 'index': 2, 'flags': FlagValue(34883, _bsd_iff_flags), 'mac': '00:15:5d:00:65:07', 'ips': [{'af_family': 28, 'index': 2, 'address': 'fe80::215:5dff:fe00:6507', 'scope': 32}, {'af_family': 2, 'index': 2, 'address': '172.23.198.182'}]}}
_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789c136064656162606060e660603067200ceeb012a18808c008a55970c80b3061f2d7881f60c4256f21c4c4c0c6ccc6909167c0201acb90ca4ea43b20e61edb8670182b0bc8125606010663620c7020d2220280118d46072077d623490b0831324820c95b80f8cc0c0c39f90624d98b612e343d3002fd3f10e98109873c34fe117c507ca3c9ffffff01cea77a7ae01898f4c08c431edd9de8e141adf40000e8611aa8'))
with BSDLoader(FREEBSD=True, sysctldata=_PFROUTE_DATA, ifaces=_FREEBSD_IFACES, AF_INET6=28) as pfroute:
routes = pfroute.read_routes()
assert routes == [
(0, 0, '172.23.192.1', 'hn0', '172.23.198.182', 1),
(2130706433, 4294967295, '0.0.0.0', 'lo0', '127.0.0.1', 1),
(2887237632, 4294963200, '0.0.0.0', 'hn0', '172.23.198.182', 1),
(2887239350, 4294967295, '0.0.0.0', 'lo0', '127.0.0.1', 1),
(3758096384, 4026531840, '0.0.0.0', 'lo0', '127.0.0.1', 250),
(3758096384, 4026531840, '0.0.0.0', 'hn0', '172.23.198.182', 250)
]
= FreeBSD 14.1 amd64 - read_routes6()
~ mock_read_routes_bsd little_endian_only
_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789ce5553b0e8240109d593e62b72121b1a0e00824165a72042e40696261f40a1ecd837816101236c28461872801e36bb678f3797979bb9ba3e722006c0380030890498aecc0f6f4193e8ec7fb191e295f75d02d3c86083b07e0724b457aa57b93d64f2fd0b0970ccc26ad6781e4a4b0e9d68d1f1d622e7ff29fc95b3f2f6bcddbdafd2cefe33c33f6ede763b87f2e3fb3d64f04bd889f0ec3737e9a3e7a7f691ee9bc4ffd233ad0e858fafd530c6fd3fdedf78fdbd3e44b813c5f4f6fd27a16663f378ecb97f15387aa77d7edf9aaeb1d1fced714a2024e1ba14eaa43454555d6ed46c7d2f97219dea69bfaf7afff41c55c50f9ff3adc3f979f2f44725d78'))
with BSDLoader(FREEBSD=True, sysctldata=_PFROUTE_DATA, ifaces=_FREEBSD_IFACES, AF_INET6=28) as pfroute:
routes = pfroute.read_routes6()
assert routes == [
('::', 96, '::1', 'lo0', ['::1'], 1),
('::1', 128, '::', 'lo0', ['::1'], 1),
('::ffff:0.0.0.0', 96, '::1', 'lo0', ['::1'], 1),
('fe80::', 10, '::1', 'lo0', ['::1'], 1),
('fe80::', 64, '::', 'lo0', ['fe80::1'], 1),
('fe80::1', 128, '::', 'lo0', ['fe80::1'], 1),
('fe80::', 64, '::', 'hn0', ['fe80::215:5dff:fe00:6507'], 1),
('fe80::215:5dff:fe00:6507', 128, '::', 'lo0', ['::1'], 1),
('ff02::', 16, '::1', 'lo0', ['::1'], 1),
('ff00::', 8, '::', 'lo0', ['::1', 'fe80::1'], 250),
('ff00::', 8, '::', 'hn0', ['fe80::215:5dff:fe00:6507'], 250)
]
= NetBSD 10.0 amd64 - read_routes()
~ mock_read_routes_bsd little_endian_only
import zlib
from scapy.arch.bpf.pfroute import _bsd_iff_flags
_NETBSD_IFACES = {1: {'name': 'hvn0', 'index': 1, 'flags': FlagValue(34883, _bsd_iff_flags), 'mac': '00:15:5d:00:65:0a', 'ips': [{'af_family': 24, 'index': 1, 'address': 'fe80:1::7184:2b50:9fbe:e337', 'scope': 32}, {'af_family': 2, 'index': 1, 'address': '172.23.207.191'}]}, 2: {'name': 'lo0', 'index': 2, 'flags': FlagValue(32841, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 2, 'index': 2, 'address': '127.0.0.1'}, {'af_family': 24, 'index': 2, 'address': '::1', 'scope': 16}, {'af_family': 24, 'index': 2, 'address': 'fe80:2::1', 'scope': 32}]}}
_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789c3bc1c0c2c2c8c0c0c00cc4e60ca8c08a91816640800993bf46fc00868d42428c0c6c2c6c0c196579060ca2b10ca95cc8eacfef87a93b00f407c8486e0e4c7f600311cd94b81ed5ddf5987cb83f58ff0301c85d424c0c12c040cec937c0aa6e07d4fdac0c2c0cc6f4773fdc1de8ee24e4ee0bd0f4c3c88819ee2c2cd471232e7703d30b9c0f4e2758d4b183c2ffff0792d311b1f1402d80ee0e5cfec1161fc8fa6640e383550592a769058cef5d4943e6a3e75f3eb0fb813e108d15fa5c404387e000001e173214'))
with BSDLoader(NETBSD=True, sysctldata=_PFROUTE_DATA, ifaces=_NETBSD_IFACES, AF_INET6=24) as pfroute:
routes = pfroute.read_routes()
assert routes == [
(0, 0, '172.23.192.1', 'hvn0', '172.23.207.191', 1),
(2130706432, 4294967040, '127.0.0.1', 'lo0', '127.0.0.1', 1),
(2130706433, 4294967295, '0.0.0.0', 'lo0', '127.0.0.1', 1),
(2887237632, 4294967295, '0.0.0.0', 'hvn0', '172.23.207.191', 1),
(2887241663, 4294967295, '0.0.0.0', 'lo0', '172.23.207.191', 1),
(2887237633, 4294967295, '0.0.0.0', 'hvn0', '', 1),
(3758096384, 4026531840, '0.0.0.0', 'hvn0', '172.23.207.191', 250),
(3758096384, 4026531840, '0.0.0.0', 'lo0', '127.0.0.1', 250)
]
= NetBSD 10.0 amd64 - read_routes6()
~ mock_read_routes_bsd little_endian_only
_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789ced97b14ec3301445af4da8aa964a5544a50e1dd8592a31f01b8c2c1d2b31205017d65682ffe86f30213e8331123fd09109d3368a8127fbd9898c2c87de29ce4dac77749f1d0722cb24807e17b8845bd78f1e0f7968326ee48bea62a40cdadeefe712e323e0f67eea350f12e53f35e3d7e67f43c97f8c0c171e75ff31bfae8b72a49cebd2e1a3e57d5d387c38f837489b5f397cb43a7fa5787fafe0fbda07e2f29f89c133e713e9ba4f52e796bc4ff4bddfffc64e907bc9fa442de22e589fc8c0bd29c7c9712bd6276a4dde9f2bde27d275f72aead772dc847b3710c28f3b947e700b939fe7021dc3fd21f986ed9fcb3ab879b89b6234c3bc679e7ff1747eb57e79d78845cdf37928b9eab271db72b5cd53f5f3ce8c94abf18b65f1750fd07c196ee3fb75ffbb42c95597ef7f97edfdd8eb54897aeb949eb79aae53ddc79edca1f7e52d37dbc74441cf9b51f396ff346f1927ef830efa028ffb7c47'))
with BSDLoader(NETBSD=True, sysctldata=_PFROUTE_DATA, ifaces=_NETBSD_IFACES, AF_INET6=24) as pfroute:
routes = pfroute.read_routes6()
assert routes == [
('::', 104, '::1', 'lo0', ['::1'], 1),
('::', 96, '::1', 'lo0', ['::1'], 1),
('::1', 128, '::', 'lo0', ['::1'], 1),
('::127.0.0.0', 104, '::1', 'lo0', ['::1'], 1),
('::224.0.0.0', 100, '::1', 'lo0', ['::1'], 1),
('::255.0.0.0', 104, '::1', 'lo0', ['::1'], 1),
('::ffff:0.0.0.0', 96, '::1', 'lo0', ['::1'], 1),
('2001:db8::', 32, '::1', 'lo0', ['::1'], 1),
('2002::', 24, '::1', 'lo0', ['::1'], 1),
('2002:7f00::', 24, '::1', 'lo0', ['::1'], 1),
('2002:e000::', 20, '::1', 'lo0', ['::1'], 1),
('2002:ff00::', 24, '::1', 'lo0', ['::1'], 1),
('fe80::', 10, '::1', 'lo0', ['::1'], 1),
('fe80:1::', 64, '::', 'hvn0', ['fe80:1::7184:2b50:9fbe:e337'], 1),
('fe80:1::7184:2b50:9fbe:e337',
128,
'::',
'lo0',
['fe80:1::7184:2b50:9fbe:e337'],
1),
('fe80:2::', 64, 'fe80:2::1', 'lo0', ['fe80:2::1'], 1),
('fe80:2::1', 128, '::', 'lo0', ['fe80:2::1'], 1),
('ff01:1::', 32, '::', 'hvn0', ['fe80:1::7184:2b50:9fbe:e337'], 1),
('ff01:2::', 32, '::1', 'lo0', ['::1'], 1),
('ff02:1::', 32, '::', 'hvn0', ['fe80:1::7184:2b50:9fbe:e337'], 1),
('ff02:2::', 32, '::1', 'lo0', ['::1'], 1),
('ff00::', 8, '::', 'hvn0', ['fe80:1::7184:2b50:9fbe:e337'], 250),
('ff00::', 8, '::', 'lo0', ['::1', 'fe80:2::1'], 250)
]
= Darwin 23.6 (MacOS 14.5) x86_64 - read_routes()
~ mock_read_routes_bsd little_endian_only
import zlib
from scapy.arch.bpf.pfroute import _bsd_iff_flags
_DARWIN_IFACES = {1: {'name': 'lo0', 'index': 1, 'flags': FlagValue(32841, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 2, 'index': 1, 'address': '127.0.0.1'}, {'af_family': 30, 'index': 1, 'address': '::1', 'scope': 16}, {'af_family': 30, 'index': 1, 'address': 'fe80:1::1', 'scope': 32}]}, 2: {'name': 'gif0', 'index': 2, 'flags': FlagValue(32784, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': []}, 3: {'name': 'stf0', 'index': 3, 'flags': FlagValue(0, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': []}, 4: {'name': 'XHC2', 'index': 4, 'flags': FlagValue(0, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': []}, 5: {'name': 'en0', 'index': 5, 'flags': FlagValue(34915, _bsd_iff_flags), 'mac': '52:54:00:09:49:17', 'ips': [{'af_family': 30, 'index': 5, 'address': 'fe80:5::409:eec9:f06c:50ab', 'scope': 32}, {'af_family': 30, 'index': 5, 'address': 'fec0::89e:daf7:5cb1:f1f0', 'scope': 64}, {'af_family': 30, 'index': 5, 'address': 'fec0::c0c4:1f0b:61ba:ea8', 'scope': 64}, {'af_family': 2, 'index': 5, 'address': '10.0.2.15'}]}, 6: {'name': 'utun0', 'index': 6, 'flags': FlagValue(32849, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 30, 'index': 6, 'address': 'fe80:6::d36e:82de:94dc:84fc', 'scope': 32}]}, 7: {'name': 'utun1', 'index': 7, 'flags': FlagValue(32849, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 30, 'index': 7, 'address': 'fe80:7::7ce2:1f7b:2c29:a5ee', 'scope': 32}]}, 8: {'name': 'utun2', 'index': 8, 'flags': FlagValue(32849, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 30, 'index': 8, 'address': 'fe80:8::e4e0:bef:bf56:2605', 'scope': 32}]}, 9: {'name': 'utun3', 'index': 9, 'flags': FlagValue(32849, _bsd_iff_flags), 'mac': '00:00:00:00:00:00', 'ips': [{'af_family': 30, 'index': 9, 'address': 'fe80:9::ce81:b1c:bd2c:69e', 'scope': 32}]}}
_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789ccdd94d6813411400e0d9ddcc36e6608d4472a950a8e0d183e8a1a7da5a503008012948b11eaa78f0e6498b180a45c1802d08164f7ab3f4208817b1e84a0ff647ea0fd2832815a5e021018f518931dbce6c5e76df6cb2e36c3a0325bce936fbf5bdd96176669ad00c2584584963e060fd7382e0ed3315fc22a4ed3183718a98260df675f3b8c83c5dc41ceeab7f1acca6ca6366922f451ebf6596598c5d84b8b9f1fd3b01cb8bc58f17a35852e01b337b29b17dd774d5b65a4b97a1dee5c1305772db55f3bba6998b26cc7d6eedf2cc2872ed5ffe5f974df2671abd211eea7a4ce0d9403c59816703e963f7b2508f857b3a5037ef5e72751b34fa581fcf9305fe9ebb4a029785f4b17bd59a5d3661431bb5fbe700b76e2ae780eef2d4619f4f3807c43d1fa5b3e753da587ad7e7b4b11c583aad8d65fe50561bcbc29de7fa589e7c93b5a87ea6d30bcf6eca5a12c082cd77a2269aefd295d280ac45798d2aa541598b052c70edd3ca82ad93986548d612435e8ecb5a605e145986652deaf3f23ba19185c2b83d8b7d8caf61c2c6eedbf7f81a463876ab3d7fa25b62ca4bf5c81b8d2c6b1a59dee96339b9aa8f25b72c6b513ed755732bd12dc1671abe3b71cb9ae099c6deb3b62d97af47b7c455a3196dde03b2fd4e8f4696c72a2cd8781135d178a95bd655586093cfcbab823616e7fb2f590b7c0f505223a72cfd1e10836556d6a2be46e5872a2c2af27234f76053850536d9bc8c54c61fe96219bdf6e9be2e96b1f1a913ba582e5d397bbb5dcbddbac5bd3fdf631536a873d84f1b961bc1d81be6946d69fafb8bcc44492fe1f38cc7680ac24d4dd70a0cade2b035d529f0bdbc56b73ee06b2af7daa7be7abaf72ae6af5a30dec93da17b17266c184739eb1135d9bdf9b9bf8d18db9bb7d97e78a773e46c7e1983f14e3ee7af7fcc27dbb534ea5588e52ce52b88b17ab9cffa4fc4c573441393e02ca520744569cce5ed43ec6667290639b7d5dbe931dd38c18976def40fb15043e2'))
with BSDLoader(DARWIN=True, sysctldata=_PFROUTE_DATA, ifaces=_DARWIN_IFACES, AF_INET6=30) as pfroute:
routes = pfroute.read_routes()
assert routes == [
(0, 0, '10.0.2.2', 'en0', '10.0.2.15', 1),
(167772672, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
(167772674, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
(167772674, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
(167772675, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
(167772687, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
(167772927, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
(2130706432, 4294967040, '127.0.0.1', 'lo0', '127.0.0.1', 1),
(2130706433, 4294967295, '127.0.0.1', 'lo0', '127.0.0.1', 1),
(2851995648, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
(3758096384, 4043308800, '0.0.0.0', 'en0', '10.0.2.15', 1),
(3758096635, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1),
(4294967295, 4294967295, '0.0.0.0', 'en0', '10.0.2.15', 1)
]
= Darwin 23.6 (MacOS 14.5) x86_64 - read_routes6()
~ mock_read_routes_bsd little_endian_only
_PFROUTE_DATA = zlib.decompress(bytes.fromhex('789cd5dd5f8c13451800f0d96dbbdb5e8fbbdaebfd114eaf08148fdc830124045f0a1581981062f48c8698c32022313c1925c2c381f8a0a28290887f728644e0d0aa60ce80e60897887fe08120f060cc051030218af74713088a60b73b5bda6e77f73a33fbcd7dfbc27667d2fefaddb7dfcc6cb7a58f84122142488028e9e9b97f8f90cadb60c8a1c1656bbddbbbed6637297e6635e4d01e8c0c1d1b797ed9a7c67e5fceac99e6f9d35d5edf47b3567c5c73683fbd76d3d91d839b6f58667d0ce695fe99f5e2e3ba43fb860b6deb3bda770f59e6f018cc277597463e73b8f878d8a1fdd2f9e8f091ce54c83247c660be1cf0cd1c293e1e71683fb131da7ab843eb31f6f7e7cc4aeedf503049a6b801d2c2cc0a4fdb7e5a3374a2cdb7bc46fd28ef6c9d7f43a7ceacaad69b5416772d56c94c7a784e715ba59ae1562faaf5fec9ed55d6417a39e23b9b1ec6125fea858d2f87770e32ef5c19de1106efd4e06a920e8669894f0620bd2cf14d91ad64f2dbf308894df8f9a9f4f65d38bcab9079d722f36e42e67d1f99770099f72832ef6554ded4d3130999747c70188db70399772632ef1a64deadc8bcfdc8bcc79179cf48f18eb278833db96585d26e8a6a4aae31f8edbdc4e2d5ee25eaac7f546dfd475757e8e159905e96f57c4a5b5438b63a9eb880cbdb08eabdc2ea8d516f22f0072e6f10d4cb9c0f96b729b810973704ea1de6f64eedc2e59d06ea651a8f4bbcb33b7179ef17e455171a5ec5c35bcd56d93bef075cde07047961c68b6ca6a1458c1726bed94ce345315e98f1229b69fe1dd2cb5b1fb299d41a482fef7891cdcc3826c6eb73fe26cdfdd513b5cd62bcfe7dde52ead541bdccf95bf086f7e1f24640bdcce75bc15bb71797b71ed4cb7bbe65338bbe87f4f2c6379b79380de9e53ddf72de51482ff3fc61b2b9ffe633eb6a9079a390deeb2c5efdef652430ed3962ce226a21bd4ce75ba977022e6f12597c93a0f13df5138337a92c21d763f4110922f14e43e64d21f35ab7c0a2f0aae180e5ba0b99f76e245eeb56752cf1b5ee2c9f24c66baee7831ede6ab6626fcd68e1700c85f7aa68af8f9f1f27952bd17385c30b20bdd718bc334862595a0fd36748aa905e96f90ef5d2af582441c70b96f531f5d6522fe8fd041cf1ad93e165591f9779c3905e8ef8d22bc0b0e71b4b3da3de06195e8ef85a5e0dd2cb91bf2dd45b87c43b917ab1d433cb0b9abf2cd7a3cabca0d7cf583e8f2df382e6ef632c5ee560ba462d4c8041f3e1257e2fe87c6711bf17743c6e64f56a9657d929c6ebdfe7b1796fb8105fd0f1ed04c3fa38ef6db29e5201bd7f5280f72f48efbfacf9db34629140c76301de00a49769be53eabd538cd
7e7fad06279c9f85fbf957a41f3e11cb357cefc813dbe72e60fccf537791f3d2aaafefa7cbea5909d6fb7bd3aa497793e999233dfb9c9ea9d5fd76d8a60f3418017b49eb5f27b41c70b01f105adbf02bc2164def17fbd2fe76de832eb426e038daf002fb6f8828e6f02bca0d74b047823c8bca0d753057841ef4714e005bd5f4e8017f47e39015ed0ebebcff27beb21bd02e20b7a7f8900ef1d905ea6fb4bd4d45c73964e9f1ed0cbf47961a917b49e317d7ea1a646b46e53bcfce53def427a99ee8729f57e8bccfb1ba497e97e8d9c771bad0b5db1eba0eb0b015ed0f585002fe8fa428017747d21c00bbabe6862f72e34f69b17c37ae3fcde6648af80f80ababf1a2cbe6d90de667e6f1299773232ef3dc8bc539079a7427a1f62f686bb2909f4f7f038bcafcbf03eceed6d7e02d2cb11df2d9484251fb6c9f072e4c376f3119a7cf89a9240e39b64f6d6bf623e8a7f07e97d92dd4bebc383a0df2fe4f06e91e1e5c887f7cc47b0f9c03e5f6fd8489f612912ef6b32bcacd727735e29f3878becde3764787f65f7be23c3cb910fdb917977caf0b2cf771a3e301fc1ce7738ceb70f2909cbf9d623c3cb91bfbb647839f2618f0c2fc7f946bdb0e71b47feeea5242cf9db2bc3cb11df7d32bc1cf3c94fe83380ce27c9695e6f3be8fd041cde2c32ef67c8bc9fcbf0728c6f5f5012687de0882ff5a2c9870332bc1ce3c5979484251ffacc4768f2e12b64de8332bc1cf5ec102561c95fea858def7fccf16da4d74b26fc88c4db23c3cb743fad9aba463e26247b7e45a8bc6d9c7bf5f2b671eead41e6ad45e6ad47e68d61f12a4d86f72d34f940bd75d0de192cdee0d205e6afbc1a9bfe22a497e9f7b90cef1aeb68f055486f1bb7577f81c73b90f31a5f3c5188a24c27a4f02514db667b0763f7e65ed7f6b40e6df9fdd8add2cdad6f2ff5878249650a8c3fbf9f882ba4a58afe87685e28b9401b71561d5e93e7772bcafef6c47486cc2ff8166d2ef1b5e5472f7587826aa311dfe3f43df8e8566fbb35f248272904cbcb599c078e5b9adf59fcba05e7a324b2a4d9bbbf71be197f0feb7cf3290fcaffe4b6b6d36b379ddd31b8f986b1ef920fb6be4071b6bd6e22aed992ceadbf11676332ed15e7957c71d6bdda365c685bdfd1be7bc8d87789b3ad2f509c6daf9b88eb6e71b6f537e26c7c01c52bce276d91aaca19f66abb743e3a7ca43395ff6bbac4d9d61728ceb6d74dc4c36e71b6f537e26c7c11c52bce97035cce8857db898dd1d6c31d5afe5a804b9c6d7d81e26c7bdd443ce216675bffa2719af8569f07ec6d02c7e9427c15fac68bdf8397bbd2fb7570f38ed3c4b73ca0ce70cf2fd7961f181d2971561aa72bf487740e1c6d8baef8a6ae77accee2fefdd6fc5de956acff7045b4f39
64b5bd996cfb88895b01efdfa0ae79abb9de75cab64af74ae553257cadf7e6bfe066c769beb38d86dfdfaad3991879d674ee461b7cd1f1cecb67efdd63cc3c3ce33cff0b0dbc66407bbad5fbf35767bd879c66e0fbb6d9c73b0dbfa81d4970aeb49b7ba515b614cacd40fa4be28635b7357324bab2f4a75eb4307bb9cfaa254b7e672b0cba92f4a75eb1807bb9cfaa254b73670b0cba92f2ae2faa222ac2f2ae2faa222ae2f2ae2faa2fa535ffe079dfe8806'))
with BSDLoader(DARWIN=True, sysctldata=_PFROUTE_DATA, ifaces=_DARWIN_IFACES, AF_INET6=30) as pfroute:
routes = pfroute.read_routes6()
assert routes == [
('::', 0, 'fe80:5::2', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
('::', 0, 'fe80:6::', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1),
('::', 0, 'fe80:7::', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1),
('::', 0, 'fe80:8::', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1),
('::', 0, 'fe80:9::', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1),
('::1', 128, '::1', 'lo0', ['::1'], 1),
('fe80:1::', 64, 'fe80:1::1', 'lo0', ['fe80:1::1'], 1),
('fe80:1::1', 128, '::', 'lo0', ['fe80:1::1'], 1),
('fe80:5::', 64, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
('fe80:5::2', 128, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
('fe80:5::409:eec9:f06c:50ab', 128, '::', 'lo0', ['fe80:5::409:eec9:f06c:50ab'], 1),
('fe80:6::', 64, 'fe80:6::d36e:82de:94dc:84fc', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1),
('fe80:6::d36e:82de:94dc:84fc', 128, '::', 'lo0', ['fe80:6::d36e:82de:94dc:84fc'], 1),
('fe80:7::', 64, 'fe80:7::7ce2:1f7b:2c29:a5ee', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1),
('fe80:7::7ce2:1f7b:2c29:a5ee', 128, '::', 'lo0', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1),
('fe80:8::', 64, 'fe80:8::e4e0:bef:bf56:2605', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1),
('fe80:8::e4e0:bef:bf56:2605', 128, '::', 'lo0', ['fe80:8::e4e0:bef:bf56:2605'], 1),
('fe80:9::', 64, 'fe80:9::ce81:b1c:bd2c:69e', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1),
('fe80:9::ce81:b1c:bd2c:69e', 128, '::', 'lo0', ['fe80:9::ce81:b1c:bd2c:69e'], 1),
('fec0::', 64, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
('fec0::2', 128, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
('fec0::89e:daf7:5cb1:f1f0', 128, '::', 'lo0', ['fec0::89e:daf7:5cb1:f1f0'], 1),
('fec0::c0c4:1f0b:61ba:ea8', 128, '::', 'lo0', ['fec0::c0c4:1f0b:61ba:ea8'], 1),
('ff00::', 8, '::1', 'lo0', ['::1'], 1),
('ff00::', 8, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
('ff00::', 8, 'fe80:6::d36e:82de:94dc:84fc', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1),
('ff00::', 8, 'fe80:7::7ce2:1f7b:2c29:a5ee', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1),
('ff00::', 8, 'fe80:8::e4e0:bef:bf56:2605', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1),
('ff00::', 8, 'fe80:9::ce81:b1c:bd2c:69e', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1),
('ff01:1::', 32, '::1', 'lo0', ['::1'], 1),
('ff01:5::', 32, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
('ff01:6::', 32, 'fe80:6::d36e:82de:94dc:84fc', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1),
('ff01:7::', 32, 'fe80:7::7ce2:1f7b:2c29:a5ee', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1),
('ff01:8::', 32, 'fe80:8::e4e0:bef:bf56:2605', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1),
('ff01:9::', 32, 'fe80:9::ce81:b1c:bd2c:69e', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1),
('ff02:1::', 32, '::1', 'lo0', ['::1'], 1),
('ff02:5::', 32, '::', 'en0', ['fe80:5::409:eec9:f06c:50ab'], 1),
('ff02:6::', 32, 'fe80:6::d36e:82de:94dc:84fc', 'utun0', ['fe80:6::d36e:82de:94dc:84fc'], 1),
('ff02:7::', 32, 'fe80:7::7ce2:1f7b:2c29:a5ee', 'utun1', ['fe80:7::7ce2:1f7b:2c29:a5ee'], 1),
('ff02:8::', 32, 'fe80:8::e4e0:bef:bf56:2605', 'utun2', ['fe80:8::e4e0:bef:bf56:2605'], 1),
('ff02:9::', 32, 'fe80:9::ce81:b1c:bd2c:69e', 'utun3', ['fe80:9::ce81:b1c:bd2c:69e'], 1)
]
############
############
+ Mocked _parse_tcpreplay_result(stdout, stderr, argv, results_dict)
~ mock_parse_tcpreplay_result
= Test mocked _parse_tcpreplay_result
from scapy.sendrecv import _parse_tcpreplay_result
stdout = """Actual: 1024 packets (198929 bytes) sent in 67.88 seconds.
Rated: 2930.6 bps, 0.02 Mbps, 15.09 pps
Statistics for network device: mon0
Attempted packets: 1024
Successful packets: 1024
Failed packets: 0
Retried packets (ENOBUFS): 0
Retried packets (EAGAIN): 0"""
stderr = """Warning in sendpacket.c:sendpacket_open_pf() line 669:
Unsupported physical layer type 0x0323 on mon0. Maybe it works, maybe it won't. See tickets #123/318
sending out mon0
processing file: replay-example.pcap"""
argv = ['tcpreplay', '--intf1=mon0', '--multiplier=1.00', '--timer=nano', 'replay-example.pcap']
results_dict = _parse_tcpreplay_result(stdout, stderr, argv)
results_dict
assert results_dict["packets"] == 1024
assert results_dict["bytes"] == 198929
assert results_dict["time"] == 67.88
assert results_dict["bps"] == 2930.6
assert results_dict["mbps"] == 0.02
assert results_dict["pps"] == 15.09
assert results_dict["attempted"] == 1024
assert results_dict["successful"] == 1024
assert results_dict["failed"] == 0
assert results_dict["retried_enobufs"] == 0
assert results_dict["retried_eagain"] == 0
assert results_dict["command"] == " ".join(argv)
assert len(results_dict["warnings"]) == 3
= Test more recent version with flows
data = """Actual: 1 packets (42 bytes) sent in 0.000278 seconds
Rated: 151079.1 Bps, 1.20 Mbps, 3597.12 pps
Flows: 1 flows, 3597.12 fps, 1 flow packets, 0 non-flow
Statistics for network device: enp0s3
Successful packets: 1
Failed packets: 0
Truncated packets: 0
Retried packets (ENOBUFS): 0
Retried packets (EAGAIN): 0
"""
results_dict = _parse_tcpreplay_result(data, "", [])
results_dict
expected = {
'bps': 151079.1,
'bytes': 42,
'command': '',
'failed': 0,
'flow_packets': 1,
'flows': 1,
'fps': 3597.12,
'mbps': 1.2,
'non_flow': 0,
'packets': 1,
'pps': 3597.12,
'retried_eagain': 0,
'retried_enobufs': 0,
'successful': 1,
'time': 0.000278,
'truncated': 0,
'warnings': []
}
assert results_dict == expected
############
############
+ Mocked route() calls
= Mocked IPv4 routes calls
import scapy
conf.ifaces._add_fake_iface("enp3s0")
conf.ifaces._add_fake_iface("lo")
# Routing table injected into conf.route for the duration of the test.
# Presumably (network, netmask, gateway, iface, source address, metric) -- confirm against scapy.route.
mocked_routes = [
    (4294967295, 4294967295, '0.0.0.0', 'wlan0', '', 281),
    (4294967295, 4294967295, '0.0.0.0', 'lo', '', 291),
    (4294967295, 4294967295, '0.0.0.0', 'enp3s0', '192.168.0.119', 281),
    (3758096384, 4026531840, '0.0.0.0', 'lo', '', 291),
    (3758096384, 4026531840, '0.0.0.0', 'wlan0', '', 281),
    (3758096384, 4026531840, '0.0.0.0', 'enp3s0', '1.1.1.1', 281),
    (3232235775, 4294967295, '0.0.0.0', 'enp3s0', '2.2.2.2', 281),
    (3232235639, 4294967295, '0.0.0.0', 'enp3s0', '3.3.3.3', 281),
    (3232235520, 4294967040, '0.0.0.0', 'enp3s0', '4.4.4.4', 281),
    (0, 0, '192.168.0.254', 'enp3s0', '192.168.0.119', 25)
]
# Destination given to conf.route.route() and the tuple it must return.
expected_lookups = [
    ("192.168.0.0-10", ('enp3s0', '4.4.4.4', '0.0.0.0')),
    ("192.168.0.119", ('lo', '192.168.0.119', '0.0.0.0')),
    ("224.0.0.0", ('enp3s0', '1.1.1.1', '0.0.0.0')),
    ("255.255.255.255", ('enp3s0', '192.168.0.119', '0.0.0.0')),
    ("*", ('enp3s0', '192.168.0.119', '192.168.0.254')),
]
old_iface = conf.iface
old_loopback = conf.loopback_name
try:
    conf.iface = 'enp3s0'
    conf.loopback_name = 'lo'
    conf.route.invalidate_cache()
    conf.route.routes = mocked_routes
    for lookup_dst, lookup_result in expected_lookups:
        assert conf.route.route(lookup_dst) == lookup_result
finally:
    # Put the real configuration back whatever the outcome.
    conf.loopback_name = old_loopback
    conf.iface = old_iface
    conf.route.resync()
    conf.ifaces.reload()
= Mocked IPv6 routes calls
conf.ifaces._add_fake_iface("enp3s0")
conf.ifaces._add_fake_iface("lo")
# Routing table injected into conf.route6 for the duration of the test.
mocked_routes6 = [
    ('fe80::dd17:1fa6:a123:ab4', 128, '::', 'lo', ['fe80::dd17:1fa6:a123:ab4'], 291),
    ('fe80::7101:5678:1234:da65', 128, '::', 'enp3s0', ['fe80::7101:5678:1234:da65'], 281),
    ('fe80::1f:ae12:4d2c:abff', 128, '::', 'wlan0', ['fe80::1f:ae12:4d2c:abff'], 281),
    ('fe80::', 64, '::', 'wlan0', ['fe80::1f:ae12:4d2c:abff'], 281),
    ('fe80::', 64, '::', 'lo', ['fe80::dd17:1fa6:a123:ab4'], 291),
    ('fe80::', 64, '::', 'enp3s0', ['fe80::7101:5678:1234:da65'], 281),
    ('2a01:e35:1e06:ab56:7010:6548:9646:fa77', 128, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281),
    ('2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8', 128, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281),
    ('2a01:e35:1e06:ab56::', 64, '::', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281),
    ('::', 0, 'fe80::160c:64aa:ef6f:fe14', 'enp3s0', ['2a01:e35:1e06:ab56:7010:6548:9646:fa77', '2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8'], 281)
]
# (destination, extra keyword arguments, tuple conf.route6.route() must return)
expected_lookups6 = [
    ("2a01:e35:1e06:ab56:512:8bb7:8ab8:14a8", {}, ('enp3s0', '2a01:e35:1e06:ab56:7010:6548:9646:fa77', '::')),
    ("::1", {}, ('enp3s0', '2a01:e35:1e06:ab56:7010:6548:9646:fa77', 'fe80::160c:64aa:ef6f:fe14')),
    ("ff02::1", {}, ('enp3s0', 'fe80::7101:5678:1234:da65', '::')),
    ("fe80::1", {}, ('enp3s0', 'fe80::7101:5678:1234:da65', '::')),
    ("fe80::1", {'dev': 'lo'}, ('lo', 'fe80::dd17:1fa6:a123:ab4', '::')),
]
old_iface = conf.iface
old_loopback = conf.loopback_name
try:
    conf.route6.ipv6_ifaces = {'enp3s0', 'wlan0', 'lo'}
    conf.iface = 'enp3s0'
    conf.loopback_name = 'lo'
    conf.route6.invalidate_cache()
    conf.route6.routes = mocked_routes6
    for lookup_dst, lookup_kwargs, lookup_result in expected_lookups6:
        assert conf.route6.route(lookup_dst, **lookup_kwargs) == lookup_result
finally:
    # Put the real configuration back whatever the outcome.
    conf.loopback_name = old_loopback
    conf.iface = old_iface
    conf.route6.resync()
    conf.ifaces.reload()
= Windows: reset routes properly
if WINDOWS:
from scapy.arch.windows import _route_add_loopback
_route_add_loopback()
############
############
############
+ Tests of StreamSocket
= Test with DNS over TCP
~ netaccess
import socket
class DNSTCP(Packet):
    """DNS message as carried over TCP: a 2-byte length prefix, then the DNS payload."""
    name = "DNS over TCP"
    fields_desc = [ FieldLenField("len", None, fmt="!H", length_of="dns"),
                    PacketLenField("dns", 0, DNS, length_from=lambda p: p.len)]

sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    sck.connect(("8.8.8.8", 53))
    ssck = StreamSocket(sck, DNSTCP)
    r = ssck.sr1(DNSTCP(dns=DNS(rd=1, qd=DNSQR(qname="www.example.com"))), timeout=3)
finally:
    # Release the TCP connection even when connect() or sr1() raises.
    sck.close()

# sr1() gives None on timeout; fail with a readable message instead of a TypeError.
assert r is not None, "no DNS answer received within the timeout"
assert DNSTCP in r and len(r.dns.an)
############
+ Tests of SSLStreamSocket
= Test with recv() calls that return exact packet-length strings
~ sslraweamsocket
import socket
class MockSocket(object):
    """Fake socket: each recv() call returns the next queued chunk, then b"" once empty."""
    def __init__(self):
        self.l = [ b'\x00\x00\x00\x01', b'\x00\x00\x00\x02', b'\x00\x00\x00\x03' ]
    def recv(self, x):
        # The requested size x is ignored; a whole queued chunk is returned.
        return self.l.pop(0) if self.l else b""
    def fileno(self):
        return -1
    def close(self):
        return None

class TestPacket(Packet):
    """One IntField; anything after it is handed to conf.padding_layer."""
    name = 'TestPacket'
    fields_desc = [IntField('data', 0)]
    def guess_payload_class(self, p):
        return conf.padding_layer

s = MockSocket()
ss = SSLStreamSocket(s, basecls=TestPacket)
# One packet per queued chunk, in order.
for _expected in (1, 2, 3):
    p = ss.recv()
    assert p.data == _expected

# With the queue drained, the next recv() has to raise EOFError.
try:
    ss.recv()
except EOFError:
    ret = True
else:
    ret = False

assert ret
= Test with recv() calls that return twice as much data as the exact packet-length
~ sslraweamsocket
import socket
class MockSocket(object):
    """Fake socket: each recv() call returns a chunk holding two packets, then b"" once empty."""
    def __init__(self):
        self.l = [ b'\x00\x00\x00\x01\x00\x00\x00\x02', b'\x00\x00\x00\x03\x00\x00\x00\x04' ]
    def recv(self, x):
        # The requested size x is ignored; a whole queued chunk is returned.
        return self.l.pop(0) if self.l else b""
    def fileno(self):
        return -1
    def close(self):
        return None

class TestPacket(Packet):
    """One IntField; anything after it is handed to conf.padding_layer."""
    name = 'TestPacket'
    fields_desc = [IntField('data', 0)]
    def guess_payload_class(self, p):
        return conf.padding_layer

s = MockSocket()
ss = SSLStreamSocket(s, basecls=TestPacket)
# Two chunks carry four packets; each recv() must still yield exactly one.
for _expected in (1, 2, 3, 4):
    p = ss.recv()
    assert p.data == _expected

# With the queue drained, the next recv() has to raise EOFError.
try:
    ss.recv()
except EOFError:
    ret = True
else:
    ret = False

assert ret
= Test with recv() calls that return not enough data
~ sslraweamsocket
import socket
class MockSocket(object):
    """Fake socket: recv() returns fragments shorter than one packet, then b"" once empty."""
    def __init__(self):
        self.l = [ b'\x00\x00', b'\x00\x01', b'\x00\x00\x00', b'\x02', b'\x00\x00', b'\x00', b'\x03' ]
    def recv(self, x):
        # The requested size x is ignored; a whole queued fragment is returned.
        return self.l.pop(0) if self.l else b""
    def fileno(self):
        return -1
    def close(self):
        return None

class TestPacket(Packet):
    """One IntField; anything after it is handed to conf.padding_layer."""
    name = 'TestPacket'
    fields_desc = [IntField('data', 0)]
    def guess_payload_class(self, p):
        return conf.padding_layer

s = MockSocket()
ss = SSLStreamSocket(s, basecls=TestPacket)
# Fragments of 2+2, 3+1 and 2+1+1 bytes must be reassembled into three packets.
for _expected in (1, 2, 3):
    p = ss.recv()
    assert p.data == _expected

# With the queue drained, the next recv() has to raise EOFError.
try:
    ss.recv()
except EOFError:
    ret = True
else:
    ret = False

assert ret
############
############
+ Test correct conversion from binary to string of IPv6 addresses
= IPv6 bin to string conversion
from scapy.pton_ntop import _inet6_ntop, inet_ntop
import socket
# (packed 16-byte address, expected text form) pairs; both implementations
# must agree with the expectation.
_ntop_vectors = [
    (b'\x00' * 16, '::'),
    (b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88',
     '1111:2222:3333:4444:5555:6666:7777:8888'),
    (b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x00\x00\x00\x00\x00\x00',
     '1111:2222:3333:4444:5555::'),
    (b'\x00\x00\x00\x00\x00\x00\x44\x44\x55\x55\x66\x66\x77\x77\x88\x88',
     '::4444:5555:6666:7777:8888'),
    (b'\x00\x00\x00\x00\x33\x33\x44\x44\x00\x00\x00\x00\x00\x00\x88\x88',
     '0:0:3333:4444::8888'),
    (b'\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
     '1::'),
    (b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01',
     '::1'),
    (b'\x11\x11\x00\x00\x00\x00\x44\x44\x00\x00\x00\x00\x77\x77\x88\x88',
     '1111::4444:0:0:7777:8888'),
    (b'\x10\x00\x02\x00\x00\x30\x00\x04\x00\x05\x00\x60\x07\x00\x80\x00',
     '1000:200:30:4:5:60:700:8000'),
]
for binfrm, address in _ntop_vectors:
    addr1 = inet_ntop(socket.AF_INET6, binfrm)
    addr2 = _inet6_ntop(binfrm)
    assert address == addr1
    assert addr1 == addr2
= IPv6 bin to string conversion - Zero-block of length 1
binfrm = b'\x11\x11\x22\x22\x33\x33\x44\x44\x55\x55\x66\x66\x00\x00\x88\x88'
addr1, addr2 = inet_ntop(socket.AF_INET6, binfrm), _inet6_ntop(binfrm)
# On Mac OS socket.inet_ntop is not fully compliant with RFC 5952 and
# shortens the single zero block to '::'. This is a valid IPv6 address
# representation anyway.
assert(addr1 in ['1111:2222:3333:4444:5555:6666:0:8888',
'1111:2222:3333:4444:5555:6666::8888'])
assert addr2 == '1111:2222:3333:4444:5555:6666:0:8888'
= IPv6 bin to string conversion - Illegal sizes
# Inputs of 15 and 17 bytes are not valid packed IPv6 addresses: both
# implementations must raise, and with the same exception type.
for binfrm in ["\x00" * 15, b"\x00" * 17]:
    rc = False
    try:
        inet_ntop(socket.AF_INET6, binfrm)
    except Exception as exc1:
        _exc1 = exc1
        rc = True
    assert rc
    # Reset the flag: otherwise the True left by the check above would let a
    # non-raising _inet6_ntop() pass unnoticed.
    rc = False
    try:
        _inet6_ntop(binfrm)
    except Exception as exc2:
        rc = isinstance(exc2, type(_exc1))
    assert rc
############
############
+ Addresses generators
= Net
# Iterating a /31 yields exactly its two addresses.
assert list(Net("192.168.0.0/31")) == ["192.168.0.0", "192.168.0.1"]
# Membership accepts a single address or a CIDR string; a CIDR string is
# "in" a Net only when the Net covers it entirely.
assert "1.2.3.4" in Net("0.0.0.0/0")
assert "192.168.0.0/25" in Net("192.168.0.0/24")
assert "192.168.0.0/23" not in Net("192.168.0.0/24")
assert "0.0.0.0/1" in Net("0.0.0.0/0")
assert "0.0.0.0/0" not in Net("0.0.0.0/1")
# The CIDR form and the explicit (start, stop) form compare and hash equal.
assert Net("1.2.3.0/24") == Net("1.2.3.0", "1.2.3.255")
assert hash(Net("1.2.3.0/24")) == hash(Net("1.2.3.0", "1.2.3.255"))
= Net using name
~ netaccess
ip = IP(dst="www.google.com")
n1 = ip.dst
assert isinstance(n1, Net)
ip.show()
= Net using implicit format in IP
assert len(list(IP(dst=("192.168.0.100", "192.168.0.199")))) == 100
= Multiple IP addresses test
~ netaccess
ip = IP(dst=['192.168.0.1', 'www.google.fr'],ihl=(1,5))
assert ip.dst[0] == '192.168.0.1'
assert isinstance(ip.dst[1], Net)
src = ip.src
assert src
assert isinstance(src, str)
= OID
oid = OID("1.2.3.4.5.6-8")
# The trailing "6-8" range must expand to three OIDs, both when iterating
# and through __iterlen__(). The first check used to be a bare comparison
# whose result was discarded.
assert sum(1 for o in oid) == 3
assert oid.__iterlen__() == 3
= Net6
# A /127 holds two addresses, whether counted by iteration or by len().
n1 = Net6("2001:db8::/127")
assert len(list(n1)) == 2
assert len(n1) == 2
# len() on a /110 is checked without iterating: 2**(128-110) addresses.
n2 = Net6("fec0::/110")
assert len(n2) == 262144
# Membership accepts a single address or a CIDR string; a CIDR string is
# "in" a Net6 only when the Net6 covers it entirely.
assert "ffff::ffff" in Net6("::/0")
assert "::/1" in Net6("::/0")
assert "::/0" not in Net6("::/1")
# The CIDR form and the explicit (start, stop) form compare and hash equal,
# including with IPv4-style dotted suffixes.
assert Net6("::/120") == Net6("::", "::ff")
assert hash(Net6("::/120")) == hash(Net6("::", "::ff"))
assert Net6("::1.2.3.0/120") == Net6("::1.2.3.0", "::1.2.3.255")
assert hash(Net6("::1.2.3.0/120")) == hash(Net6("::1.2.3.0", "::1.2.3.255"))
# A Net6 never equals (or hashes like) a Net, even when the low bits match.
assert Net6("::1.2.3.0/120") != Net("1.2.3.0/24")
assert hash(Net6("::1.2.3.0/120")) != hash(Net("1.2.3.0/24"))
= Net6 using web address
~ netaccess ipv6
ip = IPv6(dst="www.google.com")
n1 = ip.dst
assert isinstance(n1, Net6)
assert "www.google.com" in repr(n1)
ip.show()
ip = IPv6(dst="www.yahoo.com")
assert IPv6(raw(ip)).dst == [p.dst for p in ip][0]
= Net6 using implicit format in IPv6
assert len(list(IPv6(dst=("fe80::1", "fe80::1f")))) == 31
= Multiple IPv6 addresses test
~ netaccess ipv6
ip = IPv6(dst=['2001:db8::1', 'www.google.fr'],hlim=(1,5))
assert ip.dst[0] == '2001:db8::1'
assert isinstance(ip.dst[1], Net6)
src = ip.src
assert src
assert isinstance(src, str)
= Test repr on Net
~ netaccess
conf.color_theme = BlackAndWhite()
output = repr(IP(src="www.google.com"))
assert 'Net("www.google.com/32")' in output
= Test repr on Net6
~ netaccess ipv6
conf.color_theme = BlackAndWhite()
assert 'Net6("www.google.com/128")' in repr(IPv6(src="www.google.com"))
############
############
+ IPv6 helpers
= in6_getLocalUniquePrefix()
p = in6_getLocalUniquePrefix()
# The prefix must parse as a full IPv6 address and start with "fd".
# Explicit asserts replace a bare final expression.
assert len(inet_pton(socket.AF_INET6, p)) == 16
assert p.startswith("fd")
= Misc addresses manipulation functions
# Every expectation is an explicit assert: as bare comparisons, all but the
# final one were evaluated and thrown away.
assert teredoAddrExtractInfo("2001:0:0a0b:0c0d:0028:f508:f508:08f5") == ("10.11.12.13", 40, "10.247.247.10", 2807)
ip6 = IP6Field("test", None)
# i2repr() appends the decoded Teredo / 6to4 details to the address.
assert ip6.i2repr("", "2001:0:0a0b:0c0d:0028:f508:f508:08f5") == "2001:0:0a0b:0c0d:0028:f508:f508:08f5 [Teredo srv: 10.11.12.13 cli: 10.247.247.10:2807]"
assert ip6.i2repr("", "2002:0102:0304::1") == "2002:0102:0304::1 [6to4 GW: 1.2.3.4]"
assert in6_iseui64("fe80::bae8:58ff:fed4:e5f6")
assert in6_isanycast("2001:db8::fdff:ffff:ffff:ff80")
a = inet_pton(socket.AF_INET6, "2001:db8::2807")
# XOR of an address with itself is all zeroes.
assert in6_xor(a, a) == b"\x00" * 16
a = inet_pton(socket.AF_INET6, "fe80::bae8:58ff:fed4:e5f6")
r = inet_ntop(socket.AF_INET6, in6_getnsma(a))
assert r == "ff02::1:ffd4:e5f6"
assert in6_isllsnmaddr(r)
assert in6_isdocaddr("2001:db8::2807")
assert in6_isaddrllallnodes("ff02::1")
assert in6_isaddrllallservers("ff02::2")
= in6_getscope()
# Unicast addresses.
assert in6_getscope("2001:db8::2807") == IPV6_ADDR_GLOBAL
assert in6_getscope("fec0::2807") == IPV6_ADDR_SITELOCAL
assert in6_getscope("fe80::2807") == IPV6_ADDR_LINKLOCAL
# Multicast addresses: the expected scope varies with the second byte
# (ff02, ff0e, ff05, ff01).
assert in6_getscope("ff02::2807") == IPV6_ADDR_LINKLOCAL
assert in6_getscope("ff0e::2807") == IPV6_ADDR_GLOBAL
assert in6_getscope("ff05::2807") == IPV6_ADDR_SITELOCAL
assert in6_getscope("ff01::2807") == IPV6_ADDR_LOOPBACK
# The loopback address itself.
assert in6_getscope("::1") == IPV6_ADDR_LOOPBACK
= construct_source_candidate_set()
# One candidate per scope: (address, scope constant, third item).
# NOTE(review): the third item looks like a label or interface name - confirm.
dev_addresses = [('fe80::', IPV6_ADDR_LINKLOCAL, "linklocal"),('fec0::', IPV6_ADDR_SITELOCAL, "sitelocal"),('ff0e::', IPV6_ADDR_GLOBAL, "global")]
# Each destination must select the candidate registered with the matching scope.
assert construct_source_candidate_set("2001:db8::2807", 0, dev_addresses) == ["ff0e::"]
assert construct_source_candidate_set("fec0::2807", 0, dev_addresses) == ["fec0::"]
assert construct_source_candidate_set("fe80::2807", 0, dev_addresses) == ["fe80::"]
assert construct_source_candidate_set("ff02::2807", 0, dev_addresses) == ["fe80::"]
assert construct_source_candidate_set("ff0e::2807", 0, dev_addresses) == ["ff0e::"]
assert construct_source_candidate_set("ff05::2807", 0, dev_addresses) == ["fec0::"]
# A ff01:: destination yields "::1", which is not one of the candidates above.
assert construct_source_candidate_set("ff01::2807", 0, dev_addresses) == ["::1"]
# The unspecified destination yields the global-scope candidate.
assert construct_source_candidate_set("::", 0, dev_addresses) == ["ff0e::"]
= inet_pton()
from scapy.pton_ntop import _inet6_pton, inet_pton
import socket
# Malformed addresses: both implementations must raise, and with the same
# exception type.
ip6_bad_addrs = ["fe80::2e67:ef2d:7eca::ed8a",
                 "fe80:1234:abcd::192.168.40.12:abcd",
                 "fe80:1234:abcd::192.168.40",
                 "fe80:1234:abcd::192.168.400.12",
                 "1234:5678:9abc:def0:1234:5678:9abc:def0:",
                 "1234:5678:9abc:def0:1234:5678:9abc:def0:1234"]
for ip6 in ip6_bad_addrs:
    exc1 = None
    try:
        res1 = inet_pton(socket.AF_INET6, ip6)
    except Exception as e:
        exc1 = e
        rc = True
    else:
        rc = False
    assert rc
    try:
        res2 = _inet6_pton(ip6)
    except Exception as exc2:
        rc = isinstance(exc2, type(exc1))
    else:
        rc = False
    assert rc

# Well-formed addresses: both implementations must return the expected
# 16 packed bytes.
ip6_good_addrs = [("fe80:1234:abcd::192.168.40.12",
                   b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\xc0\xa8(\x0c'),
                  ("fe80:1234:abcd::fe06",
                   b'\xfe\x80\x124\xab\xcd\x00\x00\x00\x00\x00\x00\x00\x00\xfe\x06'),
                  ("fe80::2e67:ef2d:7ece:ed8a",
                   b'\xfe\x80\x00\x00\x00\x00\x00\x00.g\xef-~\xce\xed\x8a'),
                  ("::ffff",
                   b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff'),
                  ("ffff::",
                   b'\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'),
                  ('::', b'\x00' * 16)]
for ip6, res in ip6_good_addrs:
    res1 = inet_pton(socket.AF_INET6, ip6)
    res2 = _inet6_pton(ip6)
    assert res == res1
    assert res1 == res2
############
############
+ Test Route class
= make_route()
r4 = Route()
# Only the first three items of the route tuple are compared: network and
# netmask as integers, then the gateway. Both checks are explicit asserts;
# the first used to be a discarded bare comparison.
tmp_route = r4.make_route(host="10.12.13.14")
assert (tmp_route[0], tmp_route[1], tmp_route[2]) == (168561934, 4294967295, '0.0.0.0')
tmp_route = r4.make_route(net="10.12.13.0/24")
assert (tmp_route[0], tmp_route[1], tmp_route[2]) == (168561920, 4294967040, '0.0.0.0')
= add() & delt()
r4 = Route()
len_r4 = len(r4.routes)
# add() must append exactly one route and delt() must remove it again.
# Both checks are explicit asserts; the first used to be discarded.
r4.add(net="192.168.1.0/24", gw="1.2.3.4")
assert len(r4.routes) == len_r4 + 1
r4.delt(net="192.168.1.0/24", gw="1.2.3.4")
assert len(r4.routes) == len_r4
= ifchange()
# Reuses r4 from the add() & delt() test.
r4.add(net="192.168.1.0/24", gw="1.2.3.4", dev=get_dummy_interface())
r4.ifchange(get_dummy_interface(), "5.6.7.8")
# Item 4 of the route tuple must now hold the new address.
assert r4.routes[-1][4] == "5.6.7.8"
= ifdel()
# Reuses r4 and len_r4 from the previous tests: dropping the dummy interface
# must bring the table back to its initial size.
r4.ifdel(get_dummy_interface())
assert len(r4.routes) == len_r4
= ifadd() & get_if_bcast()
r4 = Route()
len_r4 = len(r4.routes)
r4.ifadd(get_dummy_interface(), "1.2.3.4/24")
assert len(r4.routes) == len_r4 + 1
# get_if_bcast() is treated as a collection of addresses further down
# (it is passed to sorted()), so test membership rather than equality with
# a single string.
assert "1.2.3.255" in r4.get_if_bcast(get_dummy_interface())
r4.ifdel(get_dummy_interface())
assert len(r4.routes) == len_r4
dummy_interface = get_dummy_interface()
bck_conf_route_routes = conf.route.routes
try:
    conf.route.routes = [
        (0, 0, '172.21.230.1', dummy_interface, '172.21.230.10', 1), # 0.0.0.0 / 0.0.0.0 == 255.255.255.255
        (2851995648, 4294901760, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 169.254.0.0 / 255.255.0.0 == 169.254.255.255
        (2887116288, 4294967040, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 172.21.230.0 / 255.255.255.0 == 172.21.230.255
        (2887116289, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 172.21.230.1 / 255.255.255.255 == 172.21.230.1
        (3758096384, 4026531840, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 224.0.0.0 / 240.0.0.0 == 239.255.255.255
        (3758096635, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 224.0.0.251 / 255.255.255.255 == 224.0.0.251
        (4294967295, 4294967295, '0.0.0.0', dummy_interface, '172.21.230.10', 1), # 255.255.255.255 / 255.255.255.255 == 255.255.255.255
    ]
    # Only three of the seven routes are expected to contribute a broadcast
    # address: the default route and the /32 host routes do not.
    assert sorted(conf.route.get_if_bcast(dummy_interface)) == sorted(['169.254.255.255', '172.21.230.255', '239.255.255.255'])
finally:
    # Put the real routing table back even if the assertion fails.
    conf.route.routes = bck_conf_route_routes
= Remove dummy interface
conf.ifaces.reload()
############
############
+ Flags
= IP flags
~ IP
# Flags given by name at construction time.
pkt = IP(flags="MF")
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
# Individual flags can be toggled through attribute assignment.
pkt.flags.MF = 0
pkt.flags.DF = 1
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
# In-place |= and &= accept '+'-separated flag names; "evil" is set by the
# first operation and masked out again by the second.
pkt.flags |= 'evil+MF'
pkt.flags &= 'DF+MF'
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'
# Flags given as an integer at construction time.
pkt = IP(flags=3)
assert pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 3 (MF+DF)>'
# Assigning an integer replaces the whole value.
pkt.flags = 6
assert not pkt.flags.MF
assert pkt.flags.DF
assert pkt.flags.evil
assert repr(pkt.flags) == '<Flag 6 (DF+evil)>'
# Flag values are hashable, and equal values collapse in a set.
assert len({IP().flags, IP().flags}) == 1
# An empty string means "no flag set".
pkt = IP()
pkt.flags = ""
assert pkt.flags == 0
= TCP flags
~ TCP
pkt = TCP(flags="SA")
assert pkt.flags == 18
assert pkt.flags.S
assert pkt.flags.A
assert pkt.flags.SA
assert not any(getattr(pkt.flags, f) for f in 'FRPUECN')
assert repr(pkt.flags) == '<Flag 18 (SA)>'
pkt.flags.U = True
pkt.flags.S = False
assert pkt.flags.A
assert pkt.flags.U
assert pkt.flags.AU
assert not any(getattr(pkt.flags, f) for f in 'FSRPECN')
assert repr(pkt.flags) == '<Flag 48 (AU)>'
pkt.flags &= 'SFA'
pkt.flags |= 'P'
assert pkt.flags.P
assert pkt.flags.A
assert pkt.flags.PA
assert not any(getattr(pkt.flags, f) for f in 'FSRUECN')
pkt = TCP(flags=56)
assert all(getattr(pkt.flags, f) for f in 'PAU')
assert pkt.flags.PAU
assert not any(getattr(pkt.flags, f) for f in 'FSRECN')
assert repr(pkt.flags) == '<Flag 56 (PAU)>'
pkt.flags = 50
assert all(getattr(pkt.flags, f) for f in 'SAU')
assert pkt.flags.SAU
assert not any(getattr(pkt.flags, f) for f in 'FRPECN')
assert repr(pkt.flags) == '<Flag 50 (SAU)>'
= Flag values mutation with .raw_packet_cache
~ IP TCP
pkt = IP(raw(IP(flags="MF")/TCP(flags="SA")))
assert pkt.raw_packet_cache is not None
assert pkt[TCP].raw_packet_cache is not None
assert pkt.flags.MF
assert not pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 1 (MF)>'
assert pkt[TCP].flags.S
assert pkt[TCP].flags.A
assert pkt[TCP].flags.SA
assert not any(getattr(pkt[TCP].flags, f) for f in 'FRPUECN')
assert repr(pkt[TCP].flags) == '<Flag 18 (SA)>'
pkt.flags.MF = 0
pkt.flags.DF = 1
pkt[TCP].flags.U = True
pkt[TCP].flags.S = False
pkt = IP(raw(pkt))
assert not pkt.flags.MF
assert pkt.flags.DF
assert not pkt.flags.evil
assert repr(pkt.flags) == '<Flag 2 (DF)>'
assert pkt[TCP].flags.A
assert pkt[TCP].flags.U
assert pkt[TCP].flags.AU
assert not any(getattr(pkt[TCP].flags, f) for f in 'FSRPECN')
assert repr(pkt[TCP].flags) == '<Flag 48 (AU)>'
= Operations on flag values
~ TCP
# & and | between two flag values give a flag value again, so the
# per-flag attributes can be read straight off the result.
p1, p2 = TCP(flags="SU"), TCP(flags="AU")
# Intersection: only U is common to "SU" and "AU".
assert (p1.flags & p2.flags).U
assert not any(getattr(p1.flags & p2.flags, f) for f in 'FSRPAECN')
# Union: S, A and U, and nothing else.
assert all(getattr(p1.flags | p2.flags, f) for f in 'SAU')
assert (p1.flags | p2.flags).SAU
assert not any(getattr(p1.flags | p2.flags, f) for f in 'FRPECN')
# The results compare equal to flag values built directly.
assert TCP(flags="SA").flags & TCP(flags="S").flags == TCP(flags="S").flags
assert TCP(flags="SA").flags | TCP(flags="S").flags == TCP(flags="SA").flags
############
############
+ 802.3
= Test detection
# Dissecting with either class must give back the class that matches the
# bytes: a default Ether() parsed as Dot3 comes out as Ether, and vice versa.
assert isinstance(Dot3(raw(Ether())),Ether)
assert isinstance(Ether(raw(Dot3())),Dot3)
# Bytes 12-13 are 0x0000 here: dissected as Dot3 even when Ether is asked for.
a = Ether(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00')
assert isinstance(a,Dot3)
assert a.dst == 'ff:ff:ff:ff:ff:ff'
assert a.src == '00:00:00:00:00:00'
# Bytes 12-13 are 0x9000 here: dissected as Ether even when Dot3 is asked for.
a = Dot3(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x90\x00')
assert isinstance(a,Ether)
assert a.dst == 'ff:ff:ff:ff:ff:ff'
assert a.src == '00:00:00:00:00:00'
############
############
+ ASN.1
= MIB
~ mib
import tempfile
fd, fname = tempfile.mkstemp()
try:
    os.write(fd, b"-- MIB test\nscapy OBJECT IDENTIFIER ::= {test 2807}\n")
finally:
    os.close(fd)

try:
    load_mib(fname)
finally:
    # mkstemp() leaves deletion to the caller; remove the file once it has
    # been loaded so test runs do not pile up temporary files.
    os.unlink(fname)

# Exactly one known name contains "scapy", and the MIB holds more than 100 OIDs.
assert sum(1 for k in conf.mib.d.values() if "scapy" in k) == 1
assert sum(1 for oid in conf.mib) > 100
= MIB - graph
~ mib
from unittest import mock
@mock.patch("scapy.asn1.mib.do_graph")
def get_mib_graph(do_graph):
    """Run conf.mib._make_graph() with do_graph mocked and check the graph source it receives."""
    def store_graph(graph, **kargs):
        # Called in place of do_graph: inspect the generated graph source.
        assert graph.startswith('digraph "mib" {')
        assert '"test.2807" [ label="scapy" ];' in graph
    do_graph.side_effect = store_graph
    conf.mib._make_graph()

get_mib_graph()
= MIB - test aliases
~ mib
# https://github.com/secdev/scapy/issues/2542
assert conf.mib._oidname("2.5.29.19") == "basicConstraints"
= DADict tests
a = DADict("test")
a[0] = "test_value1"
a["scapy"] = "test_value2"
# Stored values become attributes that give back their key.
assert a.test_value1 == 0
assert a.test_value2 == "scapy"
with ContextManagerCaptureOutput() as cmco:
    a._show()
    outp = cmco.get_output()

# _show() prints one "key = value" line per entry.
for _line in ("scapy = 'test_value2'", "0 = 'test_value1'"):
    assert _line in outp
= Test ETHER_TYPES
import warnings
assert ETHER_TYPES.IPv4 == 2048
# Adding an entry by item assignment must still work, and must emit a
# DeprecationWarning.
try:
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        ETHER_TYPES["BAOBAB"] = 0xffff
        assert ETHER_TYPES.BAOBAB == 0xffff
        assert issubclass(w[-1].category, DeprecationWarning)
except DeprecationWarning:
    # -Werror is used
    pass
= MIB - Check that MIB OIDs are not duplicated
~ mib
from scapy.asn1.mib import x509_oids_sets
# Merge the OID dictionaries one by one; a key that was already merged from
# an earlier dictionary is a duplicate.
_dct = {}
for d in x509_oids_sets:
    _dup = next((elt for elt in d if elt in _dct), None)
    if _dup is not None:
        raise ValueError("OID %s already exists" % _dup)
    _dct.update(d)
= BER tests
BER_id_enc(42) == '*'
BER_id_enc(2807) == b'\xbfw'
b = BERcodec_IPADDRESS()
r1 = b.enc("8.8.8.8")
r1 == b'@\x04\x08\x08\x08\x08'
r2 = b.dec(r1)[0]
r2.val == '8.8.8.8'
a = b'\x1f\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\x01\x01\x00C\x02\x01U0\x0f0\r\x06\x08+\x06\x01\x02\x01\x02\x01\x00\x02\x01!'
ret = False
try:
BERcodec_Object.check_type(a)
except BER_BadTag_Decoding_Error:
ret = True
else:
ret = False
assert ret
= BER trigger failures
# Decoding b"\x02\x01" must raise BER_Decoding_Error; returning normally is
# a failure.
try:
    BERcodec_INTEGER.do_dec(b"\x02\x01")
except BER_Decoding_Error:
    pass
else:
    assert False
############
############
+ Fields
= FieldLenField with BitField
class Test(Packet):
    """A list of 1-bit values preceded by its element count and its length."""
    name = "Test"
    fields_desc = [
        FieldLenField("BitCount", None, fmt="H", count_of="Values"),
        FieldLenField("ByteCount", None, fmt="B", length_of="Values"),
        FieldListField("Values", [], BitField("data", 0x0, size=1),
                       count_from=lambda pkt: pkt.BitCount),
    ]

# Build, then dissect the bytes again: eight 1-bit values must be reported
# as 8 items occupying 1 byte.
_built = raw(Test(Values=[0, 0, 0, 0, 1, 1, 1, 1]))
pkt = Test(_built)
assert pkt.BitCount == 8
assert pkt.ByteCount == 1
= PacketListField
class TestPacket(Packet):
    """Packet whose only field is a PacketListField with an empty-list default."""
    name = 'TestPacket'
    fields_desc = [PacketListField('list', [], 0)]

# Appending to one instance's list...
a = TestPacket()
a.list.append(1)
assert len(a.list) == 1
# ...must not show up in a fresh instance.
b = TestPacket()
assert len(b.list) == 0
= Test PacketListField deepcopy
class SubPacket(Packet):
    """Single byte field 'mem', default 1."""
    name = "SubPacket"
    fields_desc = [
        ByteField("mem", 1),
    ]

class TestPacket(Packet):
    """Holds a PacketListField whose default is a SubPacket."""
    name = "TestPacket"
    fields_desc = [
        PacketListField("packlist", SubPacket(), SubPacket),
    ]

a = TestPacket()
b = a.copy()
# Fuzzing the copy must leave the original's sub-packet untouched.
fuzz(b)
assert a.packlist[0].mem == 1
= PacketField
class InnerPacket(Packet):
    """Single string field 'f_name', default "test"."""
    fields_desc = [StrField("f_name", "test")]

class TestPacket(Packet):
    """Wraps an InnerPacket in a PacketField."""
    fields_desc = [PacketField("inner", InnerPacket(), InnerPacket)]

p = TestPacket()
print(p.inner.f_name)
assert p.inner.f_name == b"test"
# Changing the inner packet of one instance...
p = TestPacket()
p.inner.f_name = b"scapy"
assert p.inner.f_name == b"scapy"
# ...must not change the default seen by the next instance.
p = TestPacket()
assert p.inner.f_name == b"test"
+ UUIDField
= Parsing a human-readable UUID
f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef')
# Explicit assert instead of a bare final comparison.
assert f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef')
= Parsing a machine-encoded UUID
f = UUIDField('f', bytearray.fromhex('0123456789abcdef0123456789abcdef'))
# Explicit assert instead of a bare final comparison.
assert f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef')
= Parsing a tuple of values
f = UUIDField('f', (0x01234567, 0x89ab, 0xcdef, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef))
# Explicit assert instead of a bare final comparison.
assert f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef')
= Handle None values
f = UUIDField('f', None)
# A None default must serialise as sixteen zero bytes.
assert f.addfield(None, b'', f.default) == hex_bytes('00000000000000000000000000000000')
= Get a UUID for dissection
from uuid import UUID
f = UUIDField('f', None)
# getfield() must consume 16 bytes and return (remaining bytes, UUID).
assert f.getfield(None, bytearray.fromhex('0123456789abcdef0123456789abcdef01')) == (b'\x01', UUID('01234567-89ab-cdef-0123-456789abcdef'))
= Verify little endian UUIDField
* The endianness of a UUIDField should be applied block by block, to each block shown in parentheses: '(01234567)-(89ab)-(cdef)-(01)(23)-(45)(67)(89)(ab)(cd)(ef)'
f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef', uuid_fmt=UUIDField.FORMAT_LE)
# Explicit assert instead of a bare final comparison.
assert f.addfield(None, b'', f.default) == hex_bytes('67452301ab89efcd0123456789abcdef')
= Verify reversed UUIDField
* This should reverse the entire value as 128-bits
f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef', uuid_fmt=UUIDField.FORMAT_REV)
# Explicit assert instead of a bare final comparison.
assert f.addfield(None, b'', f.default) == hex_bytes('efcdab8967452301efcdab8967452301')
+ RandUUID
= RandUUID setup
RANDUUID_TEMPLATE = '01234567-89ab-*-01*-*****ef'
RANDUUID_FIXED = uuid.uuid4()
= RandUUID default behaviour
ru = RandUUID()
assert ru._fix().version == 4
assert ru.command() == "RandUUID()"
= RandUUID incorrect implicit args
assert expect_exception(ValueError, lambda: RandUUID(node=0x1234, name="scapy"))
assert expect_exception(ValueError, lambda: RandUUID(node=0x1234, namespace=uuid.uuid4()))
assert expect_exception(ValueError, lambda: RandUUID(clock_seq=0x1234, name="scapy"))
assert expect_exception(ValueError, lambda: RandUUID(clock_seq=0x1234, namespace=uuid.uuid4()))
assert expect_exception(ValueError, lambda: RandUUID(name="scapy"))
assert expect_exception(ValueError, lambda: RandUUID(namespace=uuid.uuid4()))
= RandUUID v4 UUID (correct args)
u = RandUUID(version=4)._fix()
assert u.version == 4
u2 = RandUUID(version=4)._fix()
assert u2.version == 4
assert str(u) != str(u2)
= RandUUID v4 UUID (incorrect args)
assert expect_exception(ValueError, lambda: RandUUID(version=4, template=RANDUUID_TEMPLATE))
assert expect_exception(ValueError, lambda: RandUUID(version=4, node=0x1234))
assert expect_exception(ValueError, lambda: RandUUID(version=4, clock_seq=0x1234))
assert expect_exception(ValueError, lambda: RandUUID(version=4, namespace=uuid.uuid4()))
assert expect_exception(ValueError, lambda: RandUUID(version=4, name="scapy"))
= RandUUID v1 UUID
# With no node or clock_seq, either version 1 or version 4 is accepted.
# NOTE(review): the reason 4 is tolerated here is not visible in this file - confirm.
u = RandUUID(version=1)._fix()
assert u.version in [1, 4]
# An explicit node must be carried into the generated UUID.
u = RandUUID(version=1, node=0x1234)._fix()
assert u.version == 1
assert u.node == 0x1234
# An explicit clock_seq must be carried into the generated UUID.
u = RandUUID(version=1, clock_seq=0x1234)._fix()
assert u.version == 1
assert u.clock_seq == 0x1234
# Both together; command() must print the arguments in decimal.
ru = RandUUID(version=1, node=0x1234, clock_seq=0x1bcd)
assert ru.command() == "RandUUID(node=4660, clock_seq=7117, version=1)"
u = ru._fix()
assert u.version == 1
assert u.node == 0x1234
assert u.clock_seq == 0x1bcd
= RandUUID v1 UUID (implicit version)
u = RandUUID(node=0x1234)._fix()
assert u.version == 1
assert u.node == 0x1234
u = RandUUID(clock_seq=0x1234)._fix()
assert u.version == 1
assert u.clock_seq == 0x1234
u = RandUUID(node=0x1234, clock_seq=0x1bcd)._fix()
assert u.version == 1
assert u.node == 0x1234
assert u.clock_seq == 0x1bcd
= RandUUID v1 UUID (incorrect args)
assert expect_exception(ValueError, lambda: RandUUID(version=1, template=RANDUUID_TEMPLATE))
assert expect_exception(ValueError, lambda: RandUUID(version=1, namespace=uuid.uuid4()))
assert expect_exception(ValueError, lambda: RandUUID(version=1, name="scapy"))
= RandUUID v5 UUID
ru = RandUUID(version=5, namespace=RANDUUID_FIXED, name="scapy")
u = ru._fix()
assert u.version == 5
assert ru.command() == "RandUUID(namespace=%r, name='scapy', version=5)" % RANDUUID_FIXED
u2 = RandUUID(version=5, namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u2.version == 5
assert u.bytes == u2.bytes
# implicit v5
u2 = RandUUID(namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u.bytes == u2.bytes
= RandUUID v5 UUID (incorrect args)
assert expect_exception(ValueError, lambda: RandUUID(version=5, template=RANDUUID_TEMPLATE))
assert expect_exception(ValueError, lambda: RandUUID(version=5, node=0x1234))
assert expect_exception(ValueError, lambda: RandUUID(version=5, clock_seq=0x1234))
= RandUUID v3 UUID
# The same namespace and name must give the same version 3 UUID twice.
u = RandUUID(version=3, namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u.version == 3
u2 = RandUUID(version=3, namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u2.version == 3
assert u.bytes == u2.bytes
# Without an explicit version the same arguments give a version 5 UUID,
# which must differ from the version 3 one.
u2 = RandUUID(namespace=RANDUUID_FIXED, name="scapy")._fix()
assert u.bytes != u2.bytes
= RandUUID v3 UUID (incorrect args)
# These must exercise version 3, as the test title says; the original lines
# repeated the version 5 checks from the test before.
assert expect_exception(ValueError, lambda: RandUUID(version=3, template=RANDUUID_TEMPLATE))
assert expect_exception(ValueError, lambda: RandUUID(version=3, node=0x1234))
assert expect_exception(ValueError, lambda: RandUUID(version=3, clock_seq=0x1234))
= RandUUID looks like a UUID with str
assert re.match(r'[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}', str(RandUUID()), re.I) is not None
= RandUUID with a static part
* A RandUUID template can contain a static part, such as 01234567-89ab-*-01*-*****ef
ru = RandUUID('01234567-89ab-*-01*-*****ef')
assert re.match(r'01234567-89ab-[0-9a-f]{4}-01[0-9a-f]{2}-[0-9a-f]{10}ef', str(ru), re.I) is not None
assert ru.command() == "RandUUID(template='01234567-89ab-*-01*-*****ef')"
= RandUUID with a range part
* A RandUUID template can contain a part with a range of values, such as 01234567-89ab-*-01*-****c0:c9ef
assert re.match(r'01234567-89ab-[0-9a-f]{4}-01[0-9a-f]{2}-[0-9a-f]{8}c[0-9]ef', str(RandUUID('01234567-89ab-*-01*-****c0:c9ef')), re.I) is not None
############
############
+ MPLS tests
= MPLS - build/dissection
from scapy.contrib.mpls import EoMCW, MPLS
# With a single MPLS header the "s" field is expected to be 1.
p1 = MPLS()/IP()/UDP()
assert p1[MPLS].s == 1
# With two stacked MPLS headers the first one is expected to have s == 0.
p2 = MPLS()/MPLS()/IP()/UDP()
assert p2[MPLS].s == 0
# Bare layer lookups: indexing a layer that is absent raises IndexError
# (the tests further down rely on this), so each line checks that the
# layer is present.
p1[MPLS]
p1[IP]
p2[MPLS]
p2[MPLS:1]
p2[IP]
= MPLS encapsulated Ethernet with CW - build/dissection
p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22")
p /= MPLS(label=1)/EoMCW(seq=1234)
p /= Ether(dst="33:33:33:33:33:33", src="44:44:44:44:44:44")/IP()
p = Ether(raw(p))
assert p[EoMCW].zero == 0
assert p[EoMCW].reserved == 0
assert p[EoMCW].seq == 1234
= MPLS encapsulated Ethernet without CW - build/dissection
p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22")
p /= MPLS(label=2)/MPLS(label=1)
p /= Ether(dst="33:33:33:33:33:33", src="44:44:44:44:44:44")/IP()
p = Ether(raw(p))
# The inner Ethernet header must be found, with an IPv4 ethertype.
assert p[Ether:2].type == 0x0800
# No control word was built, so none may be dissected.
ret = False
try:
    p[EoMCW]
except IndexError:
    ret = True

assert ret
assert p[Ether:2].type == 0x0800
= MPLS encapsulated IP - build/dissection
p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22")
p /= MPLS(label=1)/IP()
p = Ether(raw(p))
# Neither a control word nor a second Ethernet header may be dissected.
ret = False
try:
    p[EoMCW]
except IndexError:
    ret = True

assert ret
ret = False
try:
    p[Ether:2]
except IndexError:
    ret = True

assert ret
# The IP layer, on the other hand, must be present.
p[IP]
############
############
+ PacketList methods
= sr()
class Req(Packet):
    """Request: one byte field; never answers anything."""
    fields_desc = [
        ByteField("raw", 0)
    ]
    def answers(self, other):
        return False

class Res(Packet):
    """Response: answers a Req carrying the same byte."""
    fields_desc = [
        ByteField("raw", 0)
    ]
    def answers(self, other):
        return other.__class__ == Req and other.raw == self.raw

pl = PacketList([Req(b"1"), Res(b"1"), Req(b"2"), Req(b"3"), Req(b"4"), Res(b"3"), Res(b"1"), Res(b"1"), Res(b"4")])
# (sr() keyword arguments, expected matched pairs, expected leftover packets)
for _kwargs, _matched, _leftover in [
        ({}, 3, 3),
        ({"lookahead": 1}, 1, 7),
        ({"lookahead": 2}, 2, 5),
        ({"lookahead": 3}, 3, 3),
]:
    srl, rl = pl.sr(**_kwargs)
    assert len(srl) == _matched
    assert len(rl) == _leftover

# The only matching response sits 8 packets after its request: lookahead
# values of 3 and 7 miss it, while 8, 0 and None find it.
pl = PacketList([Req(b"\x05"), Res(b"1"), Res(b"2"), Res(b"3"), Res(b"4"), Res(b"3"), Res(b"1"), Res(b"1"), Res(b"\x05")])
for _lookahead, _matched, _leftover in [
        (3, 0, 9),
        (7, 0, 9),
        (8, 1, 7),
        (0, 1, 7),
        (None, 1, 7),
]:
    srl, rl = pl.sr(lookahead=_lookahead)
    assert len(srl) == _matched
    assert len(rl) == _leftover
= pickle test
import pickle
import io
# Pair up the Raw packets with sr(); only the matched pairs are pickled.
srl, rl = PacketList([Raw(b"1"), Raw(b"1"), Raw(b"2"), Raw(b"3"), Raw(b"4"), Raw(b"3"), Raw(b"1"), Raw(b"1"), Raw(b"4")]).sr()
assert len(srl) == 4
# Round-trip through pickle (the data is produced right here, so loading
# it back is safe).
f = io.BytesIO()
pickle.dump(srl, f)
unp = pickle.loads(f.getvalue())
# Same number of pairs, and the first packet of each pair is byte-identical.
assert len(unp) == len(srl)
assert all(bytes(a[0]) == bytes(b[0]) for a, b in zip(unp, srl))
= plot()
from unittest import mock
import scapy.libs.matplot
@mock.patch("scapy.libs.matplot.plt")
def test_plot(mock_plt):
    """Call PacketList.plot() with plt mocked; plt.plot hands its data argument back."""
    mock_plt.plot = lambda data, **kwargs: data
    plist = PacketList([IP(id=i)/TCP() for i in range(10)])
    lines = plist.plot(lambda p: (p.time, p.id))
    # One entry per packet.
    assert len(lines) == 10

test_plot()
= diffplot()
from unittest import mock
import scapy.libs.matplot
@mock.patch("scapy.libs.matplot.plt")
def test_diffplot(mock_plt):
def fake_plot(data, **kwargs):
return data
mock_plt.plot = fake_plot
plist = PacketList([IP(id=i)/TCP() for i in range(10)])
lines = plist.diffplot(lambda x,y: (x.time, y.id-x.id))
assert len(lines) == 9
test_diffplot()
= multiplot()
from unittest import mock
import scapy.libs.matplot
@mock.patch("scapy.libs.matplot.plt")
def test_multiplot(mock_plt):
def fake_plot(data, **kwargs):
return data
mock_plt.plot = fake_plot
tmp = [IP(id=i)/TCP() for i in range(10)]
plist = PacketList([tuple(tmp[i-2:i]) for i in range(2, 10, 2)])
lines = plist.multiplot(lambda x, y: (y[IP].src, (y.time, y[IP].id)))
assert len(lines) == 1
assert len(lines[0]) == 4
test_multiplot()
= rawhexdump()
def test_rawhexdump():
    """Capture the output of PacketList.rawhexdump() for two IP/TCP packets and check its shape."""
    with ContextManagerCaptureOutput() as cmco:
        p = PacketList([IP()/TCP() for i in range(2)])
        p.rawhexdump()
        result_pl_rawhexdump = cmco.get_output()
    _dump_lines = result_pl_rawhexdump.split('\n')
    assert len(_dump_lines) == 7
    assert result_pl_rawhexdump.startswith("0000 45 00 00 28")

test_rawhexdump()
= hexraw()
def test_hexraw():
with ContextManagerCaptureOutput() as cmco:
p = PacketList([IP()/Raw(str(i)) for i in range(2)])
p.hexraw()
result_pl_hexraw = cmco.get_output()
assert len(result_pl_hexraw.split('\n')) == 5
assert "0000 30" in result_pl_hexraw
test_hexraw()
= hexdump()
def test_hexdump():
with ContextManagerCaptureOutput() as cmco:
p = PacketList([IP()/Raw(str(i)) for i in range(2)])
p.hexdump()
result_pl_hexdump = cmco.get_output()
assert len(result_pl_hexdump.split('\n')) == 7
assert "0010 7F 00 00 01 31" in result_pl_hexdump
test_hexdump()
= import_hexcap()
# Imported here so that this test does not depend on an earlier test having
# brought "mock" into the shared namespace.
from unittest import mock
@mock.patch("scapy.utils.input")
def test_import_hexcap(mock_input):
    """Feed a three-line hexdump to import_hexcap() through a mocked input()."""
    data = """
0000 FF FF FF FF FF FF AA AA AA AA AA AA 08 00 45 00 ..............E.
0010 00 1C 00 01 00 00 40 01 7C DE 7F 00 00 01 7F 00 ......@.|.......
0020 00 01 08 00 F7 FF 00 00 00 00 ..........
"""[1:].split("\n")
    lines = iter(data)
    # Each input() call returns the next line of the dump.
    mock_input.side_effect = lambda: next(lines)
    return import_hexcap()

pkt = test_import_hexcap()
pkt = Ether(pkt)
assert pkt[Ether].dst == "ff:ff:ff:ff:ff:ff"
assert pkt[IP].dst == "127.0.0.1"
assert ICMP in pkt
= import_hexcap(input_string)
data = """
0000 FF FF FF FF FF FF AA AA AA AA AA AA 08 00 45 00 ..............E.
0010 00 1C 00 01 00 00 40 01 7C DE 7F 00 00 01 7F 00 ......@.|.......
0020 00 01 08 00 F7 FF 00 00 00 00 ..........
"""[1:]
pkt = import_hexcap(data)
pkt = Ether(pkt)
assert pkt[Ether].dst == "ff:ff:ff:ff:ff:ff"
assert pkt[IP].dst == "127.0.0.1"
assert ICMP in pkt
= padding()
def test_padding():
    """Exercise PacketList.padding() on two IP packets carrying padding.
    The padding layers hold the strings "0" and "1"; the captured output
    must split into five pieces and show byte 0x30 at offset 0000.
    """
    packets = PacketList(
        [IP()/conf.padding_layer(str(number)) for number in range(2)]
    )
    with ContextManagerCaptureOutput() as capture:
        packets.padding()
        padding_text = capture.get_output()
    pieces = padding_text.split('\n')
    assert len(pieces) == 5
    assert "0000 30" in padding_text

test_padding()
= nzpadding()
def test_nzpadding():
    """Exercise PacketList.nzpadding() with non-zero and all-zero padding.
    One packet is padded with "AB" and one with two NUL bytes. The
    captured output must split into three pieces and show bytes 41 42;
    presumably the all-zero padding is left out of the report.
    """
    nonzero_padded = IP()/conf.padding_layer("AB")
    zero_padded = IP()/conf.padding_layer("\x00\x00")
    packets = PacketList([nonzero_padded, zero_padded])
    with ContextManagerCaptureOutput() as capture:
        packets.nzpadding()
        nzpadding_text = capture.get_output()
    # Two newline characters <=> three pieces when splitting on '\n'.
    assert nzpadding_text.count('\n') == 2
    assert "0000 41 42" in nzpadding_text

test_nzpadding()
= conversations()
from unittest import mock

def test_conversations():
    """Exercise PacketList.conversations() with do_graph() stubbed out.
    scapy.plist.do_graph is patched to return the graph it is given, so
    conversations() hands back the graph source, which is then inspected.
    """
    with mock.patch("scapy.plist.do_graph") as mock_do_graph:
        # Echo the graph description instead of rendering it.
        mock_do_graph.side_effect = lambda graph, **kwargs: graph
        plist = PacketList([IP(dst="127.0.0.2")/TCP(dport=port) for port in range(2)])
        plist.extend([IP(src="127.0.0.2")/TCP(sport=port) for port in range(2)])
        plist.extend([IPv6(dst="::2", src="::1")/TCP(sport=port) for port in range(2)])
        plist.extend([IPv6(src="::2", dst="::1")/TCP(sport=port) for port in range(2)])
        plist.extend([Ether()/ARP(pdst="127.0.0.1")])
        graph_source = plist.conversations()
        graph_lines = graph_source.split('\n')
        assert len(graph_lines) == 8
        assert graph_source.startswith('digraph "conv" {')
        assert "127.0.0.1" in graph_source
        assert "::1" in graph_source

test_conversations()
= sessions()
# Five packets with different layer stacks; sessions() is expected to file
# each of them under its own key.
pl = PacketList([
    Ether()/IPv6()/ICMPv6EchoRequest(),
    Ether()/IPv6()/IPv6(),
])
pl.extend([
    Ether()/IP()/IP(),
    Ether()/ARP(),
    Ether()/Ether()/IP(),
])
assert len(pl.sessions().keys()) == 5
= afterglow()
from unittest import mock

@mock.patch("scapy.plist.do_graph")
def test_afterglow(mock_do_graph):
    """Exercise PacketList.afterglow() with do_graph() stubbed out.
    The patched do_graph() returns the graph it is given, so afterglow()
    hands back the graph source, which is then inspected.
    """
    # Echo the graph description instead of rendering it.
    mock_do_graph.side_effect = lambda graph, **kwargs: graph
    plist = PacketList([IP(dst="127.0.0.2")/TCP(dport=port) for port in range(2)])
    plist.extend([IP(src="127.0.0.2")/TCP(sport=port) for port in range(2)])
    graph_source = plist.afterglow()
    # Eighteen newline characters <=> nineteen pieces when splitting.
    assert graph_source.count('\n') == 18
    assert graph_source.startswith('digraph "afterglow" {')

test_afterglow()
= psdump()
# Check that PacketList.psdump() creates the requested EPS file.
# Only runs when PyX support is available (PYX is truthy).
print("PYX: %d" % PYX)
if PYX:
    import os
    import tempfile
    # tempfile.mktemp() is deprecated and race-prone, and the file used to
    # be left behind when psdump() or the assertion failed. A private
    # temporary directory avoids both: the path cannot exist beforehand
    # and the directory is removed on exit, whatever happens.
    with tempfile.TemporaryDirectory() as tmpdir:
        filename = os.path.join(tmpdir, "plist.eps")
        plist = PacketList([IP()/TCP()])
        plist.psdump(filename)
        assert os.path.exists(filename)
= pdfdump()
# Check that PacketList.pdfdump() creates the requested PDF file.
# Only runs when PyX support is available (PYX is truthy).
print("PYX: %d" % PYX)
if PYX:
    import os
    import tempfile
    # tempfile.mktemp() is deprecated and race-prone, and the file used to
    # be left behind when pdfdump() or the assertion failed. A private
    # temporary directory avoids both: the path cannot exist beforehand
    # and the directory is removed on exit, whatever happens.
    with tempfile.TemporaryDirectory() as tmpdir:
        filename = os.path.join(tmpdir, "plist.pdf")
        plist = PacketList([IP()/TCP()])
        plist.pdfdump(filename)
        assert os.path.exists(filename)
= svgdump()
# Check that PacketList.svgdump() creates the requested SVG file.
# Only runs when PyX support is available (PYX is truthy).
print("PYX: %d" % PYX)
if PYX:
    import os
    import tempfile
    # tempfile.mktemp() is deprecated and race-prone, and the file used to
    # be left behind when svgdump() or the assertion failed. A private
    # temporary directory avoids both: the path cannot exist beforehand
    # and the directory is removed on exit, whatever happens.
    with tempfile.TemporaryDirectory() as tmpdir:
        filename = os.path.join(tmpdir, "plist.svg")
        plist = PacketList([IP()/TCP()])
        plist.svgdump(filename)
        assert os.path.exists(filename)
= __getstate__ / __setstate__ (used by pickle)
import pickle

# Build one frame carrying capture metadata, put it twice in a named
# PacketList, and check that a pickle round trip keeps everything.
frm = Ether(src='00:11:22:33:44:55', dst='00:22:33:44:55:66')/Raw()
frm.time, frm.sniffed_on, frm.wirelen = EDecimal(123.45), "iface", 1
pl = PacketList(res=[frm, frm], name='WhatAGreatName')
pickled = pickle.dumps(pl)
pl = pickle.loads(pickled)

# List-level state.
assert pl.listname == "WhatAGreatName"
assert len(pl) == 2

# Per-packet metadata of the first element.
assert pl[0].time == 123.45
assert pl[0].sniffed_on == "iface"
assert pl[0].wirelen == 1

# Layer fields of both elements.
assert pl[0][Ether].src == '00:11:22:33:44:55'
assert pl[1][Ether].dst == '00:22:33:44:55:66'
= EDecimal
# GH4488
# Two timestamps that differ only in their last two decimal digits: the
# comparisons below must see them as distinct and correctly ordered.
p1 = EDecimal('1722417787.778435252')
p2 = EDecimal('1722417787.778435216')
assert p1 != p2
assert p1 > p2
assert not (p1 < p2)
# Comparing against plain floats must still report equality.
assert p1 == 1722417787.778435252 # float test
assert p2 == 1722417787.778435216
# Ordering must also hold when the values are compared inside tuples.
assert (p1, 0) > (p2, 1)
############
############
+ Scapy version
= _version()
# Walk through the sources scapy._version() consults, in the order this
# test expects them: SCAPY_VERSION environment variable, VERSION file,
# git archive, git describe, modification time of the package, and
# finally the "0.0.0" fallback.
import os
from datetime import datetime, timezone
from unittest import mock

version_filename = os.path.join(scapy._SCAPY_PKG_DIR, "VERSION")
mtime = datetime.fromtimestamp(os.path.getmtime(scapy.__file__), timezone.utc)
version = "2.0.0"
with open(version_filename, "w") as fd:
    fd.write(version)

# The fake VERSION file and the environment override must not outlive this
# test, even when an assertion fails: left behind, they would change the
# result of every later scapy._version() call.
try:
    os.environ["SCAPY_VERSION"] = "9.9.9"
    assert scapy._version() == "9.9.9"
    del os.environ["SCAPY_VERSION"]
    assert scapy._version() == version
finally:
    os.environ.pop("SCAPY_VERSION", None)
    os.unlink(version_filename)

# With neither override in place, disable the remaining sources one by one.
with mock.patch("scapy._version_from_git_archive") as archive:
    archive.return_value = "4.4.4"
    assert scapy._version() == "4.4.4"
    archive.side_effect = ValueError()
    with mock.patch("scapy._version_from_git_describe") as git:
        git.return_value = "3.3.3"
        assert scapy._version() == "3.3.3"
        git.side_effect = Exception()
        # Both git sources fail: the date of the package file is used.
        assert scapy._version() == mtime.strftime("%Y.%m.%d")
        with mock.patch("os.path.getmtime") as getmtime:
            getmtime.side_effect = Exception()
            # Nothing left to try.
            assert scapy._version() == "0.0.0"
= UTscapy HTML output
# Check that pack_html_campaigns(..., local=True) drops UTscapy.js and
# UTscapy.css next to the campaign's output file.
import os
import tempfile
from scapy.tools.UTscapy import TestCampaign, pack_html_campaigns

test_campaign = TestCampaign("test")
# Use a private directory instead of a tempfile.mktemp() name: the assets
# are written beside the output file, so the shared temp root could already
# hold stale copies (or be written to concurrently), and nothing was
# cleaned up when an assertion failed.
with tempfile.TemporaryDirectory() as tmpdir:
    test_campaign.output_file = os.path.join(tmpdir, "campaign.html")
    html = pack_html_campaigns([test_campaign], None, local=True)
    dirname = os.path.dirname(test_campaign.output_file)
    filename_js = os.path.join(dirname, "UTscapy.js")
    filename_css = os.path.join(dirname, "UTscapy.css")
    assert os.path.isfile(filename_js)
    assert os.path.isfile(filename_css)
= test get_temp_dir
# Check that get_temp_dir() returns the path of an existing directory.
# Import os here, as the sibling tests do, so this test does not depend on
# an earlier test having imported it.
import os
dname = get_temp_dir()
assert os.path.isdir(dname)
= test fragleak functions
~ netaccess linux fragleak
from unittest import mock

@mock.patch("scapy.layers.inet.conf.L3socket")
@mock.patch("scapy.layers.inet.select.select")
@mock.patch("scapy.layers.inet.sr1")
def _test_fragleak(func, sr1, select, L3socket):
    """Run a fragleak-style function against canned network replies.
    sr1(), select.select() and conf.L3socket are patched as seen from
    scapy.layers.inet, so no real traffic is needed: the single prepared
    reply carries b"greatdata" in its padding layer.
    :param func: function under test, called as func("8.8.8.8", count=1)
    :returns: True if "greatdata" shows up in what func printed
    """
    replies = iter([
        IP(src="4.4.4.4")/ICMP()/IPerror(dst="8.8.8.8")/conf.padding_layer(load=b"greatdata"),
    ])
    def next_reply(*args, **kwargs):
        return next(replies)
    def fake_socket():
        # Stand-in socket: recv() replays the canned reply, send() is a no-op.
        return Bunch(recv=next_reply, send=lambda x: None)
    def fake_select(a, b, c, d):
        # Report every descriptor that was passed in.
        return a+b+c
    L3socket.side_effect = fake_socket
    sr1.side_effect = next_reply
    select.side_effect = fake_select
    with ContextManagerCaptureOutput() as capture:
        func("8.8.8.8", count=1)
        printed = capture.get_output()
    return "greatdata" in printed

assert _test_fragleak(fragleak)
assert _test_fragleak(fragleak2)