| % tuntap tests for Scapy |
| |
| # Packet capture-based tests are in sendsniff.uts |
| |
| ####### |
| + Test Linux-specific protocol headers for TunTap |
| ~ linux tun not_libpcap |
| |
| = Linux-specific protocol headers |
| |
| # Build: stacking IP on LinuxTunPacketInfo must set type to 2048 (0x0800). |
| p = LinuxTunPacketInfo()/IP() |
| assert p.type == 2048 |
| |
| # Dissect: re-parsing the raw bytes must keep the type and yield an IP payload. |
| p = LinuxTunPacketInfo(raw(p)) |
| assert p.type == 2048 |
| assert isinstance(p.payload, IP) |
| |
| # Same build check for IPv6: type must become 0x86dd. |
| p = LinuxTunPacketInfo()/IPv6() |
| assert p.type == 0x86dd |
| |
| # Same dissect check for IPv6: type is kept and the payload is IPv6. |
| p = LinuxTunPacketInfo(raw(p)) |
| assert p.type == 0x86dd |
| |
| assert isinstance(p.payload, IPv6) |
| |
| ####### |
| + Test tun device |
| |
| ~ tun needs_root not_libpcap |
| |
| = Create a tun interface |
| |
| import subprocess |
| from threading import Thread |
| |
| tun0 = TunTapInterface("tun0", strip_packet_info=False) |
| |
| if LINUX: |
| assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0 |
| assert subprocess.check_call([ |
| "ip", "addr", "change", |
| "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0 |
| elif BSD: |
| assert subprocess.check_call(["ifconfig", "tun0", "up"]) == 0 |
| assert subprocess.check_call([ |
| "ifconfig", "tun0", "192.0.2.1", "192.0.2.2"]) == 0 |
| else: |
| raise NotImplementedError() |
| |
| conf.ifaces.reload() |
| conf.route.resync() |
| conf.route6.resync() |
| |
| = Setup ICMPEcho_am on the interface |
| |
| # Answering machine obtained from tun0, created with count=3. |
| am = tun0.am(ICMPEcho_am, count=3) |
| # Sniff timeout option set to 5 (presumably seconds) so the thread cannot wait forever. |
| am.defoptsniff['timeout'] = 5 |
| # Run the answering machine in a background thread; the next test sends the pings. |
| t_am = Thread(target=am) |
| t_am.start() |
| |
| = Send ping packets from OS into scapy |
| |
| # ICMP packets with seq 1..3 addressed to 192.0.2.2. |
| send(IP(dst="192.0.2.2")/ICMP(seq=(1,3))) |
| |
| = Cleanup |
| |
| # Wait at most 3 seconds for the answering-machine thread to finish. |
| t_am.join(timeout=3) |
| |
| # Close the tun0 object created by the first test of this set. |
| tun0.close() |
| |
| ####### |
| + Test strip_packet_info=False on Linux |
| |
| ~ tun linux needs_root not_libpcap |
| |
| = Create a tun interface |
| |
| if not LINUX: |
| raise NotImplementedError() |
| |
| import subprocess |
| |
| tun0 = TunTapInterface("tun0", strip_packet_info=False) |
| |
| assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0 |
| assert subprocess.check_call([ |
| "ip", "addr", "change", |
| "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0 |
| |
| conf.ifaces.reload() |
| conf.route.resync() |
| conf.route6.resync() |
| |
| = Send ping packets from Linux into Scapy |
| |
| def cb(): |
| send(IP(dst="192.0.2.2")/ICMP(seq=(1,3))) |
| |
| t = AsyncSniffer(opened_socket=tun0, lfilter=lambda x: ICMP in x, started_callback=cb, count=3) |
| t.start() |
| t.join(timeout=3) |
| |
| assert len(t.results) >= 3 |
| icmp4_sequences = set() |
| |
| for pkt in t.results: |
| pkt |
| assert isinstance(pkt, LinuxTunPacketInfo) |
| if not isinstance(pkt.payload, IP) or ICMP not in pkt: |
| # We might get IPv6 router solicitation or other traffic... |
| continue |
| if pkt[IP].src != '192.0.2.1' or pkt[IP].dst != '192.0.2.2' or pkt[ICMP].type != 8: |
| continue |
| icmp4_sequences.add(pkt.seq) |
| |
| # Expect to get 3 different ICMP sequence numbers |
| assert len(icmp4_sequences) == 3 |
| |
| = Delete the tun interface |
| # Close the tun0 object created at the start of this set. |
| tun0.close() |
| |
| + Test strip_packet_info=True and IPv6 |
| |
| ~ tun needs_root ipv6 not_libpcap |
| |
| = Create a tun interface with IPv4 + IPv6 |
| |
| import subprocess |
| |
| tun0 = TunTapInterface("tun0", strip_packet_info=True) |
| |
| if LINUX: |
| assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0 |
| assert subprocess.check_call([ |
| "ip", "addr", "change", |
| "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0 |
| assert subprocess.check_call([ |
| "ip", "-6", "addr", "add", |
| "2001:db8::1", "peer", "2001:db8::2", "dev", "tun0"]) == 0 |
| elif BSD: |
| assert subprocess.check_call(["ifconfig", "tun0", "up"]) == 0 |
| assert subprocess.check_call([ |
| "ifconfig", "tun0", "192.0.2.1", "192.0.2.2"]) == 0 |
| assert subprocess.check_call([ |
| "ifconfig", "tun0", "inet6", "2001:db8::1/128", "2001:db8::2"]) == 0 |
| else: |
| raise NotImplementedError() |
| |
| conf.ifaces.reload() |
| conf.route.resync() |
| conf.route6.resync() |
| |
| = Send ping packets from OS into Scapy |
| |
| def cb(): |
| send(IP(dst="192.0.2.2")/ICMP(seq=(1,3))) |
| send(IPv6(dst="2001:db8::2")/ICMPv6EchoRequest(seq=(1,3))) |
| |
| t = AsyncSniffer(opened_socket=tun0, lfilter=lambda x: ICMP in x or ICMPv6EchoRequest in x, started_callback=cb, count=6) |
| t.start() |
| t.join(timeout=3) |
| |
| assert len(t.results) >= 6 |
| icmp4_sequences = set() |
| icmp6_sequences = set() |
| |
| for pkt in t.results: |
| pkt |
| assert isinstance(pkt, (IP, IPv6)) |
| if (isinstance(pkt, IP) and |
| pkt[IP].src == "192.0.2.1" and pkt[IP].dst == "192.0.2.2" and |
| ICMP in pkt and pkt[ICMP].type == 8): |
| icmp4_sequences.add(pkt[ICMP].seq) |
| if (isinstance(pkt, IPv6) and |
| pkt[IPv6].src == "2001:db8::1" and pkt[IPv6].dst == "2001:db8::2" and |
| ICMPv6EchoRequest in pkt): |
| icmp6_sequences.add(pkt[ICMPv6EchoRequest].seq) |
| |
| # Expect to get 3 different ICMP sequence numbers |
| assert len(icmp4_sequences) == 3, ( |
| "Expected 3 IPv4 ICMP ping packets, got: " + repr(icmp4_sequences)) |
| assert len(icmp6_sequences) == 3, ( |
| "Expected 3 IPv6 ICMP ping packets, got: " + repr(icmp6_sequences)) |
| |
| = Delete the tun interface |
| # Close the tun0 object created at the start of this set. |
| tun0.close() |
| |
| + Test tap interfaces |
| |
| ~ tap needs_root not_libpcap |
| |
| = Create a tap interface with IPv4 |
| |
| import subprocess |
| |
| tap0 = TunTapInterface("tap0") |
| |
| if LINUX: |
| assert subprocess.check_call(["ip", "link", "set", "tap0", "up"]) == 0 |
| assert subprocess.check_call([ |
| "ip", "addr", "change", "192.0.2.1/30", "dev", "tap0"]) == 0 |
| assert subprocess.check_call([ |
| "ip", "neigh", "replace", |
| "192.0.2.2", "lladdr", "20:00:00:20:00:00", "dev", "tap0"]) == 0 |
| else: |
| assert subprocess.check_call(["ifconfig", "tap0", "up"]) == 0 |
| assert subprocess.check_call([ |
| "ifconfig", "tap0", "192.0.2.1", "netmask", "255.255.255.252"]) == 0 |
| assert subprocess.check_call([ |
| "arp", "-s", "192.0.2.2", "20:00:00:20:00:00", "temp"]) == 0 |
| |
| conf.ifaces.reload() |
| conf.route.resync() |
| conf.route6.resync() |
| |
| = Send ping packets from OS into Scapy |
| |
| conf.ifaces |
| conf.route |
| |
| def cb(): |
| sendp(Ether(dst="ff:ff:ff:ff:ff:ff")/IP(dst="192.0.2.2")/ICMP(seq=(1,3)), iface="tap0") |
| |
| t = AsyncSniffer(opened_socket=tap0, lfilter=lambda x: ICMP in x, started_callback=cb, count=3) |
| t.start() |
| t.join(timeout=3) |
| |
| assert len(t.results) >= 3 |
| icmp4_sequences = set() |
| |
| for pkt in t.results: |
| pkt |
| assert isinstance(pkt, Ether) |
| if (IP in pkt and |
| pkt[IP].src == "192.0.2.1" and pkt[IP].dst == "192.0.2.2" and |
| ICMP in pkt and pkt[ICMP].type == 8): |
| icmp4_sequences.add(pkt[ICMP].seq) |
| |
| # Expect to get 3 different ICMP sequence numbers |
| assert len(icmp4_sequences) == 3, ( |
| "Expected 3 IPv4 ICMP ping packets, got: " + repr(icmp4_sequences)) |
| |
| = Delete the tap interface |
| # Close the tap0 object created at the start of this set. |
| tap0.close() |
| |
| + Refresh interfaces |
| ~ linux tun tap not_libpcap |
| |
| = Cleanup |
| |
| # Reload Scapy's interface list and resync both routing tables once the |
| # tun/tap devices used above have been closed. |
| conf.ifaces.reload() |
| conf.route.resync() |
| conf.route6.resync() |