blob: d651fccee3f1fb3749b2743146844a680a818158 [file] [log] [blame]
% Regression tests for Linux only
# More information at http://www.secdev.org/projects/UTscapy/
############
############
+ Linux only test
= L3RawSocket
~ netaccess IP TCP linux needs_root
# Send a TCP SYN to a public web server and sanity-check the IP header of
# the answer.
# NOTE(review): the test title mentions L3RawSocket, but nothing in this test
# selects that socket class -- confirm this is intended.
with no_debug_dissector():
    x = sr1(IP(dst="www.google.com")/TCP(sport=RandShort(), dport=80, flags="S"),timeout=3)

x
# sr1() yields None when nothing answers before the timeout; fail with a clear
# message instead of a TypeError on the subscripts below.
assert x is not None, "no answer received from www.google.com within 3 seconds"
assert x[IP].ottl() in [32, 64, 128, 255]
assert 0 <= x[IP].hops() <= 126
# TODO: fix this test (randomly stuck)
# ex: https://travis-ci.org/secdev/scapy/jobs/247473497
#= Supersocket _flush_fd
#~ needs_root linux
#
#import select
#
#from scapy.arch.linux import _flush_fd
#socket = conf.L2listen()
#select.select([socket],[],[],2)
#_flush_fd(socket.ins)
= Interface aliases & sub-interfaces
~ linux needs_root
import os

# Build a dummy interface with an alias (scapy0:0) and a VLAN sub-interface
# (scapy0.42), then check the routes Scapy reads back for them.
exit_status = os.system("ip link add name scapy0 type dummy")
try:
    exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0")
    exit_status = os.system("ip link set scapy0 up")
    exit_status = os.system("ifconfig scapy0:0 inet 198.51.100.1/24 up")
    # The checks only run when the alias could be created; when the ifconfig
    # command fails the test is silently treated as passed.
    if exit_status == 0:
        exit_status = os.system("ip addr show scapy0")
        print(get_if_list())
        conf.route.resync()
        print(conf.route.routes)
        assert conf.route.route("198.51.100.254") == ("scapy0", "198.51.100.1", "0.0.0.0")
        # 3325256704 is 198.51.100.0 and 4294967040 is 255.255.255.0
        route_alias = (3325256704, 4294967040, "0.0.0.0", "scapy0", "198.51.100.1", 0)
        assert route_alias in conf.route.routes
        exit_status = os.system("ip link add link scapy0 name scapy0.42 type vlan id 42")
        exit_status = os.system("ip addr add 203.0.113.42/24 dev scapy0.42")
        exit_status = os.system("ip link set scapy0.42 up")
        exit_status = os.system("ip route add 192.0.2.43/32 via 203.0.113.41")
        print(get_if_list())
        conf.route.resync()
        print(conf.route.routes)
        assert conf.route.route("192.0.2.43") == ("scapy0.42", "203.0.113.42", "203.0.113.41")
        # 3221226027 is 192.0.2.43 and 4294967295 is 255.255.255.255
        route_specific = (3221226027, 4294967295, "203.0.113.41", "scapy0.42", "0.0.0.0", 0)
        assert route_specific in conf.route.routes
        assert conf.route.route("203.0.113.42") == ('scapy0.42', '203.0.113.42', '0.0.0.0')
        assert conf.route.route("203.0.113.43") == ('scapy0.42', '203.0.113.42', '0.0.0.0')
finally:
    # Always remove the dummy interface, including when ifconfig failed or an
    # assertion tripped; otherwise later tests that create scapy0 inherit it.
    exit_status = os.system("ip link del name dev scapy0")
    # Drop the routes that pointed at the interface that was just removed.
    conf.route.resync()
= Test scoped interface addresses
~ linux needs_root
import os

# Create two dummy interfaces with fixed MAC addresses. Every status is OR-ed
# into exit_status so that a failure of any single command is noticed.
exit_status = os.system("ip link add name scapy0 type dummy")
exit_status |= os.system("ip link add name scapy1 type dummy")
exit_status |= os.system("ip addr add 192.0.2.1/24 dev scapy0")
exit_status |= os.system("ip addr add 192.0.3.1/24 dev scapy1")
exit_status |= os.system("ip link set scapy0 address 00:01:02:03:04:05 multicast on up")
exit_status |= os.system("ip link set scapy1 address 06:07:08:09:10:11 multicast on up")
try:
    # The setup check lives inside the try block so the interfaces are removed
    # even when the setup itself went wrong.
    assert exit_status == 0
    conf.ifaces.reload()
    conf.route.resync()
    conf.route6.resync()
    conf.route6
    # IPv4: the "%iface" suffix on the destination selects the interface whose
    # MAC and IP addresses are used as sources.
    a = Ether()/IP(dst="224.0.0.1%scapy0")
    assert a[Ether].src == "00:01:02:03:04:05"
    assert a[IP].src == "192.0.2.1"
    b = Ether()/IP(dst="224.0.0.1%scapy1")
    assert b[Ether].src == "06:07:08:09:10:11"
    assert b[IP].src == "192.0.3.1"
    c = Ether()/IP(dst="224.0.0.1/24%scapy1")
    assert c[Ether].src == "06:07:08:09:10:11"
    assert c[IP].src == "192.0.3.1"
    # IPv6: the expected sources are the EUI-64 link-local addresses that
    # correspond to the two MAC addresses set above.
    a = Ether()/IPv6(dst="ff02::fb%scapy0")
    assert a[Ether].src == "00:01:02:03:04:05"
    assert a[IPv6].src == "fe80::201:2ff:fe03:405"
    b = Ether()/IPv6(dst="ff02::fb%scapy1")
    assert b[Ether].src == "06:07:08:09:10:11"
    assert b[IPv6].src == "fe80::407:8ff:fe09:1011"
    c = Ether()/IPv6(dst="ff02::fb/30%scapy1")
    assert c[Ether].src == "06:07:08:09:10:11"
    assert c[IPv6].src == "fe80::407:8ff:fe09:1011"
finally:
    # Remove both interfaces and refresh Scapy's view of the system.
    exit_status = os.system("ip link del scapy0")
    exit_status = os.system("ip link del scapy1")
    conf.ifaces.reload()
    conf.route.resync()
    conf.route6.resync()
= catch loopback device missing
~ linux needs_root
from unittest.mock import patch

# The real lo device (or its address) cannot be removed without causing
# trouble, so Scapy is pointed at a loopback name that this file never
# creates. The test passes as long as read_routes() does not raise.
missing_loopback = patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo_x')
with missing_loopback:
    routes = read_routes()
= catch loopback device no address assigned
~ linux needs_root
import os
import socket
from unittest.mock import patch

try:
    # A dummy device stands in for the loopback interface.
    exit_status = os.system("ip link add name scapy_lo type dummy")
    assert exit_status == 0
    exit_status = os.system("ip link set dev scapy_lo up")
    assert exit_status == 0
    # First pass: the device is up but has no address yet.
    with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo'):
        routes = read_routes()
    # Second pass: same call once an address has been assigned.
    exit_status = os.system("ip addr add dev scapy_lo 10.10.0.1/24")
    assert exit_status == 0
    with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo'):
        routes = read_routes()
    # Keep only the routes attached to the dummy device, ordered by their
    # first field.
    lo_routes = []
    for dst_int, msk_int, gw_str, if_name, if_addr, metric in routes:
        if if_name == "scapy_lo":
            lo_routes.append((ltoa(dst_int), ltoa(msk_int), gw_str, if_name, if_addr, metric))
    lo_routes = sorted(lo_routes, key=lambda entry: entry[0])
    expected_routes = [
        (168427520, 4294967040, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0),
        (168427521, 4294967295, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0),
        (168427775, 4294967295, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0),
    ]
    # NOTE(review): both lists are only printed, never compared, and
    # expected_routes holds raw integers while lo_routes holds ltoa() output.
    # Confirm whether an assertion was intended here.
    print(lo_routes)
    print(expected_routes)
finally:
    exit_status = os.system("ip link del dev scapy_lo")
    assert exit_status == 0
= IPv6 link-local address selection
from unittest.mock import patch

# Register a fake interface and make it the only IPv6-capable one, then check
# that a packet to the all-nodes multicast address picks up its link-local
# address and MAC address as sources.
conf.ifaces._add_fake_iface("scapy0", 'e2:39:91:79:19:10')
bck_conf_iface = conf.iface
try:
    conf.route6.routes = [('fe80::', 64, '::', 'scapy0', ['fe80::e039:91ff:fe79:1910'], 256)]
    conf.route6.ipv6_ifaces = {'scapy0'}
    conf.iface = "scapy0"
    p = Ether()/IPv6(dst="ff02::1")/ICMPv6NIQueryName(data="ff02::1")
    print(p.sprintf("%Ether.src% > %Ether.dst%\n%IPv6.src% > %IPv6.dst%"))
    ip6_ll_address = 'fe80::e039:91ff:fe79:1910'
    print(p[IPv6].src, ip6_ll_address)
    assert p[IPv6].src == ip6_ll_address
    mac_address = 'e2:39:91:79:19:10'
    print(p[Ether].src, mac_address)
    assert p[Ether].src == mac_address
finally:
    # Put the global configuration back even when an assertion fails, so the
    # fake route table and default interface do not leak into later tests.
    conf.iface = bck_conf_iface
    conf.route6.resync()
= IPv6 - check OS routes
~ linux ipv6
# Query the IPv6 addresses once and run every check against that single
# snapshot rather than asking the system again for each assertion.
addrs = in6_getifaddr()
if addrs:
    # Each entry is indexed as addr[0] (the address) and addr[2] (the interface).
    assert all(in6_isvalid(addr[0]) for addr in addrs), 'invalid ipv6 address'
    ifaces6 = [addr[2] for addr in addrs]
    assert all(iface in ifaces6 for iface in conf.route6.ipv6_ifaces), 'ipv6 interface has route but no real'
= veth interface error handling
~ linux needs_root veth
import subprocess
from scapy.arch.linux import VEthPair

# Creating a veth pair with an over-long interface name is expected to fail
# with CalledProcessError. Any other exception propagates with its own
# traceback instead of being replaced by a bare "assert False".
try:
    veth = VEthPair('this_IF_name_is_to_long_and_will_cause_an_error', 'veth_scapy_1')
    veth.setup()
except subprocess.CalledProcessError:
    pass
else:
    assert False, "VEthPair.setup() accepted an over-long interface name"
= veth interface usage - ctx manager
~ linux needs_root veth
from threading import Condition, Thread
from scapy.arch.linux import VEthPair

cond_started = Condition()

def _sniffer_started():
    # Called by sniff() once capturing has begun: wake up the main thread.
    global cond_started
    with cond_started:
        cond_started.notify()

# Hold the condition before the sniffer thread exists so that its
# notification cannot fire before the main thread is waiting.
cond_started.acquire()
try:
    with VEthPair('veth_scapy_0', 'veth_scapy_1') as veth:
        if_list = get_if_list()
        assert ('veth_scapy_0' in if_list)
        assert ('veth_scapy_1' in if_list)
        frm_count = 0
        def _sniffer():
            global frm_count
            sniffed = sniff(iface='veth_scapy_1',
                            store=True,
                            count=2,
                            lfilter=lambda p: Ether in p and p[Ether].type == 0xbeef,
                            started_callback=_sniffer_started,
                            timeout=3)
            # Only reached once sniff() has returned. join() below waits 1 s
            # while sniff() times out after 3 s, so this is set in time only
            # when both frames were captured.
            frm_count = 2
        t_sniffer = Thread(target=_sniffer, name="linux.uts sniff veth_scapy_1")
        t_sniffer.start()
        # Bounded wait: a sniffer that never starts fails the test instead of
        # blocking the whole campaign forever.
        assert cond_started.wait(5), "sniffer did not start within 5 seconds"
        sendp(Ether(type=0xbeef)/Raw(b'0123456789'),
              iface='veth_scapy_0',
              count=2)
        t_sniffer.join(1)
        assert frm_count == 2
    # Leaving the context manager must have removed both interfaces.
    if_list = get_if_list()
    assert ('veth_scapy_0' not in if_list)
    assert ('veth_scapy_1' not in if_list)
finally:
    cond_started.release()
= veth interface usage - manual interface handling
~ linux needs_root veth
from threading import Condition, Thread
from scapy.arch.linux import VEthPair

cond_started = Condition()

def _sniffer_started():
    # Called by sniff() once capturing has begun: wake up the main thread.
    global cond_started
    with cond_started:
        cond_started.notify()

frm_count = 0

def _sniffer():
    global frm_count
    sniffed = sniff(iface='veth_scapy_1',
                    store=True,
                    count=2,
                    lfilter=lambda p: Ether in p and p[Ether].type == 0xbeef,
                    started_callback=_sniffer_started,
                    timeout=3)
    # Only reached once sniff() has returned. join() below waits 1 s while
    # sniff() times out after 3 s, so this is set in time only when both
    # frames were captured.
    frm_count = 2

veth = VEthPair('veth_scapy_0', 'veth_scapy_1')
veth.setup()
# Hold the condition before the sniffer thread exists so that its notification
# cannot fire before the main thread is waiting.
cond_started.acquire()
try:
    veth.up()
    conf.ifaces.reload()
    if_list = get_if_list()
    assert ('veth_scapy_0' in if_list)
    assert ('veth_scapy_1' in if_list)
    t_sniffer = Thread(target=_sniffer, name="linux.uts sniff veth_scapy_1 2")
    t_sniffer.start()
    # Bounded wait: a sniffer that never starts fails the test instead of
    # blocking the whole campaign forever.
    assert cond_started.wait(5), "sniffer did not start within 5 seconds"
    sendp(Ether(type=0xbeef)/Raw(b'0123456789'),
          iface='veth_scapy_0',
          count=2)
    t_sniffer.join(1)
    assert frm_count == 2
finally:
    cond_started.release()
    # Tear the pair down even when a check above failed, so the interfaces do
    # not leak into later tests; destroy() still runs if down() raises.
    try:
        veth.down()
    finally:
        veth.destroy()
        conf.ifaces.reload()

if_list = get_if_list()
assert ('veth_scapy_0' not in if_list)
assert ('veth_scapy_1' not in if_list)
= Routing table, interface with no names
~ linux
from unittest.mock import patch

def test_read_routes():
    # Replacement for ioctl(): requests whose second argument is 0x8912
    # (presumably SIOCGIFCONF -- confirm) get their third argument handed
    # back, every other request fails with IOError.
    def fake_ioctl(*args, **kwargs):
        if args[1] != 0x8912:
            raise IOError
        return args[2]
    # read_routes() only has to survive the failing ioctl() calls.
    with patch("scapy.arch.linux.ioctl") as mock_ioctl:
        mock_ioctl.side_effect = fake_ioctl
        read_routes()

test_read_routes()
= L3PacketSocket sendto exception
~ linux needs_root
from scapy.arch.linux import L3PacketSocket

import socket
from unittest import mock

@mock.patch("scapy.arch.linux.socket.socket.sendto")
def test_L3PacketSocket_sendto_python3(mock_sendto):
    # Every sendto() call raises OSError with errno 22; the test passes only
    # if L3PacketSocket.send() copes with that without raising.
    mock_sendto.side_effect = OSError(22, 2807)
    l3ps = L3PacketSocket()
    try:
        l3ps.send(IP(dst="8.8.8.8")/ICMP())
    finally:
        # Release the underlying sockets deterministically instead of
        # leaving them to garbage collection.
        l3ps.close()
    return True

assert test_L3PacketSocket_sendto_python3()
= Test _interface_selection
~ netaccess linux needs_root
import os
from scapy.sendrecv import _interface_selection

# Before any extra interface exists, a public destination must resolve to
# the default interface, flagged as not IPv6.
assert _interface_selection(IP(dst="8.8.8.8")/UDP()) == (conf.iface, False)
exit_status = os.system("ip link add name scapy0 type dummy")
try:
    exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0")
    exit_status = os.system("ip addr add fc00::/24 dev scapy0")
    exit_status = os.system("ip link set scapy0 up")
    conf.ifaces.reload()
    conf.route.resync()
    conf.route6.resync()
    # Destinations inside the prefixes configured on scapy0 must select it.
    assert _interface_selection(IP(dst="192.0.2.42")/UDP()) == ("scapy0", False)
    assert _interface_selection(IPv6(dst="fc00::ae0d")/UDP()) == ("scapy0", True)
finally:
    # Remove the interface and refresh Scapy's state even when an assertion
    # failed, so scapy0 and its routes do not leak into later tests.
    exit_status = os.system("ip link del name dev scapy0")
    conf.ifaces.reload()
    conf.route.resync()
    conf.route6.resync()
= Test 802.1Q sniffing
~ linux needs_root veth
from scapy.arch.linux import VEthPair
from threading import Thread, Condition

def _send():
    # Started by the sniffer once it is capturing: emit two ICMP packets on
    # the left VLAN interface.
    probe = Ether()/IP(dst="198.51.100.2")/ICMP()
    sendp(probe, iface='vlanleft0', count=2)

# VLAN 42 sub-interfaces on both ends of the veth pair, one address each.
vlan_setup_commands = (
    "ip link add link right0 name vlanright0 type vlan id 42",
    "ip link add link left0 name vlanleft0 type vlan id 42",
    "ip link set vlanright0 up",
    "ip link set vlanleft0 up",
    "ip addr add 198.51.100.1/24 dev vlanleft0",
    "ip addr add 198.51.100.2/24 dev vlanright0",
)

with VEthPair("left0", "right0") as veth:
    for command in vlan_setup_commands:
        exit_status = os.system(command)
    # Sniff on the parent interface right0 and keep only Dot1Q frames.
    sniffer = AsyncSniffer(
        iface="right0",
        lfilter=lambda pkt: Dot1Q in pkt,
        count=2,
        timeout=5,
        started_callback=_send,
    )
    sniffer.start()
    sniffer.join(1)
    if sniffer.running:
        sniffer.stop()
        raise Scapy_Exception("Sniffer did not stop !")
    results = sniffer.results

assert len(results) == 2
assert all(Dot1Q in frame for frame in results)
= Reload interfaces & routes
# Last step of the campaign: reload the interface list and resync both the
# IPv4 and IPv6 routing tables (presumably so that nothing left behind by the
# tests above stays in Scapy's configuration).
conf.ifaces.reload()
conf.route.resync()
conf.route6.resync()