blob: 4a69695f6665bfa4db57c50fb61af9982296d966 [file] [log] [blame]
% send, sniff, sr* tests for Scapy
~ needs_root
############
############
+ Test bridge_and_sniff() using tap sockets
~ tap
= Create two tap interfaces
import subprocess
from threading import Thread
tap0, tap1 = [TunTapInterface("tap%d" % i) for i in range(2)]
chk_kwargs = {"timeout": 3}
if LINUX:
for i in range(2):
assert subprocess.check_call(["ip", "link", "set", "tap%d" % i, "up"], **chk_kwargs) == 0
else:
for i in range(2):
assert subprocess.check_call(["ifconfig", "tap%d" % i, "up"], **chk_kwargs) == 0
= Run a sniff thread on the tap1 **interface**
* It will terminate when 5 IP packets from 192.0.2.1 have been sniffed
started = threading.Event()
t_sniff = Thread(
target=sniff,
kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary,
"lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
"started_callback": started.set},
name="tests sniff 1")
t_sniff.start()
started.wait(timeout=5)
= Run a bridge_and_sniff thread between the taps **sockets**
* It will terminate when 5 IP packets from 192.0.2.1 have been forwarded
started = threading.Event()
t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1),
kwargs={"store": False, "count": 5, 'prn': Packet.summary,
"lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
"started_callback": started.set},
name="tests bridge_and_sniff 1")
t_bridge.start()
started.wait(timeout=5)
= Send five IP packets from 192.0.2.1 to the tap0 **interface**
sendp([Ether(dst=ETHER_BROADCAST) / IP(src="192.0.2.1") / ICMP()], iface="tap0",
count=5)
= Wait for the threads
t_bridge.join(5)
t_sniff.join(5)
assert not t_bridge.is_alive()
assert not t_sniff.is_alive()
= Run a sniff thread on the tap1 **interface**
* It will terminate when 5 IP packets from 198.51.100.1 have been sniffed
started = threading.Event()
t_sniff = Thread(
target=sniff,
kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary,
"lfilter": lambda p: IP in p and p[IP].src == "198.51.100.1",
"started_callback": started.set},
name="tests sniff 2")
t_sniff.start()
started.wait(timeout=5)
= Run a bridge_and_sniff thread between the taps **sockets**
* It will "NAT" packets from 192.0.2.1 to 198.51.100.1 and will terminate when 5 IP packets have been forwarded
def nat_1_2(pkt):
if IP in pkt and pkt[IP].src == "192.0.2.1":
pkt[IP].src = "198.51.100.1"
del pkt[IP].chksum
return pkt
return False
started = threading.Event()
t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1),
kwargs={"store": False, "count": 5, 'prn': Packet.summary,
"xfrm12": nat_1_2,
"lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
"started_callback": started.set},
name="tests bridge_and_sniff 2")
t_bridge.start()
started.wait(timeout=5)
= Send five IP packets from 192.0.2.1 to the tap0 **interface**
sendp([Ether(dst=ETHER_BROADCAST) / IP(src="192.0.2.1") / ICMP()], iface="tap0",
count=5)
= Wait for the threads
t_bridge.join(5)
t_sniff.join(5)
assert not t_bridge.is_alive()
assert not t_sniff.is_alive()
= Delete the tap interfaces
if conf.use_pypy:
# See https://pypy.readthedocs.io/en/latest/cpython_differences.html
tap0.close()
tap1.close()
else:
del tap0, tap1
############
############
+ Test bridge_and_sniff() using tun sockets
~ tun not_libpcap
= Create two tun interfaces
import subprocess
from threading import Thread
tun0, tun1 = [TunTapInterface("tun%d" % i) for i in range(2)]
chk_kwargs = {"timeout": 3}
if LINUX:
for i in range(2):
assert subprocess.check_call(["ip", "link", "set", "tun%d" % i, "up"], **chk_kwargs) == 0
assert subprocess.check_call([
"ip", "addr", "change",
"192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"], **chk_kwargs) == 0
else:
for i in range(2):
assert subprocess.check_call(["ifconfig", "tun%d" % i, "up"], **chk_kwargs) == 0
assert subprocess.check_call(["ifconfig", "tun0", "192.0.2.1", "192.0.2.2"], **chk_kwargs) == 0
= Run a sniff thread on the tun1 **interface**
* It will terminate when 5 IP packets from 192.0.2.1 have been sniffed
started = threading.Event()
t_sniff = Thread(target=sniff,
kwargs={"iface": "tun1", "count": 5,
"prn": Packet.summary,
"lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
"started_callback": started.set},
name="tests sniff 3")
t_sniff.start()
started.wait(timeout=5)
= Run a bridge_and_sniff thread between the tuns **sockets**
* It will terminate when 5 IP packets from 192.0.2.1 have been forwarded.
started = threading.Event()
t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1),
kwargs={"store": False, "count": 5, 'prn': Packet.summary,
"xfrm12": lambda pkt: pkt,
"lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
"started_callback": started.set},
name="tests bridge_and_sniff 3")
t_bridge.start()
started.wait(timeout=5)
= Send five IP packets from 192.0.2.1 to the tun0 **interface**
conf.route.add(net="192.0.2.2/32", dev="tun0")
send([IP(src="192.0.2.1", dst="192.0.2.2") / ICMP()], count=5, iface="tun0")
conf.route.delt(net="192.0.2.2/32", dev="tun0")
= Wait for the threads
t_bridge.join(5)
t_sniff.join(5)
assert not t_bridge.is_alive()
assert not t_sniff.is_alive()
= Run a sniff thread on the tun1 **interface**
* It will terminate when 5 IP packets from 198.51.100.1 have been sniffed
started = threading.Event()
t_sniff = Thread(target=sniff,
kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary,
"lfilter": lambda p: IP in p and p[IP].src == "198.51.100.1",
"started_callback": started.set},
name="tests sniff 4")
t_sniff.start()
started.wait(timeout=5)
= Run a bridge_and_sniff thread between the tuns **sockets**
* It will "NAT" packets from 192.0.2.1 to 198.51.100.1 and will terminate when 5 IP packets have been forwarded
def nat_1_2(pkt):
if IP in pkt and pkt[IP].src == "192.0.2.1":
pkt[IP].src = "198.51.100.1"
del pkt[IP].chksum
return pkt
return False
started = threading.Event()
t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1),
kwargs={"store": False, "count": 5, 'prn': Packet.summary,
"xfrm12": nat_1_2,
"lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
"started_callback": started.set},
name="tests bridge_and_sniff 4")
t_bridge.start()
started.wait(timeout=5)
= Send five IP packets from 192.0.2.1 to the tun0 **interface**
conf.route.add(net="192.0.2.2/32", dev="tun0")
send([IP(src="192.0.2.1", dst="192.0.2.2") / ICMP()], count=5, iface="tun0")
conf.route.delt(net="192.0.2.2/32", dev="tun0")
= Wait for the threads
t_bridge.join(5)
t_sniff.join(5)
assert not t_bridge.is_alive()
assert not t_sniff.is_alive()
= Delete the tun interfaces
if conf.use_pypy:
# See https://pypy.readthedocs.io/en/latest/cpython_differences.html
tun0.close()
tun1.close()
else:
del tun0, tun1
############
############
+ Test bridge_and_sniff() using veth pairs
~ linux needs_root veth
= Ensure bridge_and_sniff does not close sockets if data is send within xfrm on ingress interface
from scapy.arch.linux import VEthPair
with VEthPair('a_0', 'a_1') as veth_0:
with VEthPair('b_0', 'b_1') as veth_1:
xfrm_count = {
'a_0':0,
'b_0': 0
}
def xfrm_x(pkt):
pkt_tx = pkt.copy()
ether_lyr = pkt_tx[Ether]
ether_lyr.type = 0x1234 # we send to peer interface - avoid loop
# send on receiving interface - triggers return None on recv() in L2Socket
sendp(pkt_tx, iface=pkt.sniffed_on)
global xfrm_count
xfrm_count[pkt.sniffed_on] = xfrm_count[pkt.sniffed_on] + 1
return True
started = threading.Event()
t_bridge = Thread(target=bridge_and_sniff,
args=('a_0', 'b_0'),
kwargs={
'xfrm12': xfrm_x,
'xfrm21': xfrm_x,
'store': False,
'count': 4,
'lfilter': lambda p: Ether in p and p[Ether].type == 0xbeef,
"started_callback": started.set},
name="tests bridge_and_sniff VEthPair")
t_bridge.start()
started.wait(timeout=5)
# send frames in both directions
for if_name in ['a_1', 'b_1', 'a_1', 'b_1']:
sendp([Ether(type=0xbeef) /
Raw(b'On a scale from one to ten what is your favourite colour of the alphabet?')],
iface=if_name)
t_bridge.join(1)
# now test of the socket used in bridge_and_sniff() was alive all the time
assert (xfrm_count['a_0'] == 2)
assert (xfrm_count['b_0'] == 2)
############
############
+ Test arpleak() using a tap socket
~ tap tcpdump
= Create a tap interface
from unittest import mock
import struct
import subprocess
from threading import Thread
import time
tap0 = TunTapInterface("tap0")
chk_kwargs = {"timeout": 3}
if LINUX:
assert subprocess.check_call(["ip", "link", "set", "tap0", "up"], **chk_kwargs) == 0
else:
assert subprocess.check_call(["ifconfig", "tap0", "up"], **chk_kwargs) == 0
= Check for arpleak
def answer_arp_leak(pkt):
mymac = b"\x00\x01\x02\x03\x04\x06"
myip = b"\xc0\x00\x02\x02" # 192.0.2.2
if not ARP in pkt:
return
e_src = pkt.src
pkt = raw(pkt[ARP])
if pkt[:4] != b'\x00\x01\x08\x00':
print("Invalid ARP")
return
hwlen, plen, op = struct.unpack('>BBH', pkt[4:8])
if op != 1:
print("Invalid ARP op")
return
fmt = ('%ds%ds' % (hwlen, plen)) * 2
hwsrc, psrc, hwdst, pdst = struct.unpack(fmt,
pkt[8:8 + (plen + hwlen) * 2])
if pdst[:4] != myip[:plen]:
print("Invalid ARP pdst %r" % pdst)
return
ans = Ether(dst=e_src, src=mymac, type=0x0806)
ans /= (b'\x00\x01\x08\x00' +
struct.pack('>BBH' + fmt,
hwlen, plen, 2, mymac, myip, hwsrc, psrc))
tap0.send(ans)
print('Answered!')
started = threading.Event()
t_answer = Thread(
target=sniff,
kwargs={"prn": answer_arp_leak, "timeout": 10, "store": False,
"opened_socket": tap0,
"started_callback": started.set},
name="tests answer_arp_leak")
t_answer.start()
started.wait(timeout=5)
@mock.patch("scapy.layers.l2.get_if_addr")
@mock.patch("scapy.layers.l2.get_if_hwaddr")
def test_arpleak(mock_get_if_hwaddr, mock_get_if_addr, hwlen=255, plen=255):
conf.route.ifadd("tap0", "192.0.2.0/24")
mock_get_if_addr.side_effect = lambda _: "192.0.2.1"
mock_get_if_hwaddr.side_effect = lambda _: "00:01:02:03:04:05"
return arpleak("192.0.2.2/31", timeout=2, hwlen=hwlen, plen=plen)
ans, unans = test_arpleak()
assert len(ans) == 1
assert len(unans) == 1
ans, unans = test_arpleak(hwlen=6)
assert len(ans) == 1
assert len(unans) == 1
ans, unans = test_arpleak(plen=4)
assert len(ans) == 1
assert len(unans) == 1
t_answer.join(15)
if t_answer.is_alive():
raise Exception("Test timed out")
if conf.use_pypy:
# See https://pypy.readthedocs.io/en/latest/cpython_differences.html
tap0.close()
else:
del tap0
#####
#####
+ Test sr() on multiple interfaces
= Setup multiple linux interfaces and ranges
~ linux needs_root dbg
import os
exit_status = os.system("ip netns add blob0")
exit_status |= os.system("ip netns add blob1")
exit_status |= os.system("ip link add name scapy0.0 type veth peer name scapy0.1")
exit_status |= os.system("ip link add name scapy1.0 type veth peer name scapy1.1")
exit_status |= os.system("ip link set scapy0.1 netns blob0 up")
exit_status |= os.system("ip link set scapy1.1 netns blob1 up")
exit_status |= os.system("ip addr add 100.64.2.1/24 dev scapy0.0")
exit_status |= os.system("ip addr add 100.64.3.1/24 dev scapy1.0")
exit_status |= os.system("ip --netns blob0 addr add 100.64.2.2/24 dev scapy0.1")
exit_status |= os.system("ip --netns blob1 addr add 100.64.3.2/24 dev scapy1.1")
exit_status |= os.system("ip link set scapy0.0 up")
exit_status |= os.system("ip link set scapy1.0 up")
assert exit_status == 0
conf.ifaces.reload()
conf.route.resync()
try:
pkts = sr(IP(dst=["100.64.2.2", "100.64.3.2"])/ICMP(), timeout=1)[0]
assert len(pkts) == 2
assert pkts[0].answer.src in ["100.64.2.2", "100.64.3.2"]
assert pkts[1].answer.src in ["100.64.2.2", "100.64.3.2"]
finally:
e = os.system("ip netns del blob0")
e = os.system("ip netns del blob1")
conf.ifaces.reload()
conf.route.resync()
= sr() performance test
~ linux needs_root veth not_pypy
import subprocess
import shlex
try:
# Create a dedicated network name space to simulate remote host
subprocess.check_call(shlex.split("sudo ip netns add scapy"))
# Create a virtual Ethernet pair to connect default and new NS
subprocess.check_call(shlex.split("sudo ip link add type veth"))
# Move veth1 to the new NS
subprocess.check_call(shlex.split("sudo ip link set veth1 netns scapy"))
# Setup vNIC in the default NS
subprocess.check_call(shlex.split("sudo ip link set veth0 up"))
subprocess.check_call(shlex.split("sudo ip addr add 192.168.168.1/24 dev veth0"))
# Setup vNIC in the dedicated NS
subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set lo up"))
subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set veth1 up"))
subprocess.check_call(shlex.split("sudo ip netns exec scapy ip addr add 192.168.168.2/24 dev veth1"))
# Perform test
conf.route.resync()
res, unansw = sr(IP(dst='192.168.168.2') / ICMP(seq=(1, 1000)), timeout=1, verbose=False)
finally:
try:
# Bring down the interfaces
subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set veth1 down"))
subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set lo down"))
# Delete the namespace
subprocess.check_call(shlex.split("sudo ip netns delete scapy"))
# Remove the virtual Ethernet pair
subprocess.check_call(shlex.split("sudo ip link delete veth0"))
except subprocess.CalledProcessError as e:
print(f"Error during cleanup: {e}")
len(res) == 1000