| % Regression tests for Scapy Answering Machines |
| |
| # More information at http://www.secdev.org/projects/UTscapy/ |
| |
| |
| ############ |
| ############ |
| + Answering Machines |
| |
| = Generic answering machine mocker |
| from unittest import mock |
| @mock.patch("scapy.ansmachine.sniff") |
| def test_am(cls_name, packet_query, check_reply, mock_sniff, **kargs): |
| packet_query = packet_query.__class__(bytes(packet_query)) |
| def sniff(*args,**kargs): |
| kargs["prn"](packet_query) |
| mock_sniff.side_effect = sniff |
| am = cls_name(**kargs) |
| called = [False] |
| def _sndrpl(x): |
| called[0] = True |
| check_reply(x.__class__(bytes(x))) |
| am.send_reply = _sndrpl |
| am() |
| assert called[0], "Filter never passed for AnsweringMachine !" |
| |
| |
| = BOOT_am |
| def check_BOOTP_am_reply(packet): |
| assert BOOTP in packet and packet[BOOTP].op == 2 |
| assert packet[BOOTP].yiaddr == "192.168.1.128" and packet[BOOTP].giaddr == "192.168.1.1" |
| |
| test_am(BOOTP_am, |
| Ether()/IP()/UDP()/BOOTP(op=1), |
| check_BOOTP_am_reply) |
| |
| |
| = DHCP_am |
| def check_DHCP_am_reply(packet): |
| assert DHCP in packet and len(packet[DHCP].options) |
| assert ("domain", b"localnet") in packet[DHCP].options |
| |
| test_am(DHCP_am, |
| Ether()/IP()/UDP()/BOOTP(op=1)/DHCP(options=[('message-type', 'request')]), |
| check_DHCP_am_reply, |
| domain="localnet") |
| |
| |
| = ARP_am |
| def check_ARP_am_reply(packet): |
| assert ARP in packet and packet[ARP].psrc == "10.28.7.1" |
| assert packet[ARP].hwsrc == "00:01:02:03:04:05" |
| |
| test_am(ARP_am, |
| Ether()/ARP(pdst="10.28.7.1"), |
| check_ARP_am_reply, |
| IP_addr="10.28.7.1", |
| ARP_addr="00:01:02:03:04:05") |
| |
| = ICMPEcho_am |
| def check_ICMP_am_reply(packet): |
| packet.show() |
| assert packet[Ether].src != "ff:ff:ff:ff:ff:ff" |
| assert packet[Ether].dst == "aa:aa:aa:aa:aa:aa" |
| assert IP in packet and ICMP in packet |
| assert packet[IP].dst == "1.1.1.1" |
| assert packet[IP].src == "2.2.2.2" |
| assert packet[ICMP].seq == 12 |
| |
| test_am(ICMPEcho_am, |
| Ether(src="aa:aa:aa:aa:aa:aa", dst="ff:ff:ff:ff:ff:ff")/IP(src="1.1.1.1", dst="2.2.2.2")/ICMP(seq=12), |
| check_ICMP_am_reply) |
| |
| = DNS_am |
| def check_DNS_am_reply(packet): |
| assert packet[Ether].src == "bb:bb:bb:bb:bb:bb" |
| assert packet[Ether].dst == "aa:aa:aa:aa:aa:aa" |
| assert packet[IP].src == "127.0.0.2" |
| assert packet[IP].dst == "127.0.0.1" |
| assert DNS in packet and packet[DNS].ancount == 1 |
| assert packet[DNS].an[0].rdata == "192.168.1.1" |
| assert packet[DNS].qd[0].qname == b"www.secdev.org." |
| |
| test_am(DNS_am, |
| Ether(src="aa:aa:aa:aa:aa:aa", dst="bb:bb:bb:bb:bb:bb")/IP(src="127.0.0.1", dst="127.0.0.2")/UDP()/DNS(qd=DNSQR(qname="www.secdev.org")), |
| check_DNS_am_reply, |
| joker="192.168.1.1") |
| |
| def check_DNS_am_reply_srvmatch(packet): |
| assert DNS in packet and packet[DNS].ancount == 1 |
| assert isinstance(packet[DNS].an[0], DNSRRSRV) |
| assert packet[DNS].an[0].rrname == b'_ldap._tcp.dc._msdcs.scapy.fr.' |
| assert packet[DNS].an[0].port == 389 |
| assert packet[DNS].an[0].target == b'dc.scapy.fr.' |
| |
| test_am(DNS_am, |
| Ether()/IP()/UDP()/DNS(qd=DNSQR(qname=b'_ldap._tcp.dc._msdcs.scapy.fr.', qtype="SRV")), |
| check_DNS_am_reply_srvmatch, |
| srvmatch={"_ldap._tcp.dc._msdcs.scapy.fr": (389, "dc.scapy.fr")}) |
| |
| def check_DNS_am_reply_arpa(packet): |
| assert DNS in packet and packet[DNS].ancount == 1 |
| assert packet[DNS].an[0].rdata == b"scapy." |
| assert packet[DNS].an[0].rrname == b"1.0.16.172.in-addr.arpa." |
| |
| test_am(DNS_am, |
| Ether()/IP()/UDP()/DNS(qd=DNSQR(qname=b"1.0.16.172.in-addr.arpa.", qtype="PTR")), |
| check_DNS_am_reply_arpa, |
| jokerarpa="scapy") |
| |
| def check_DNS_am_reply2(packet): |
| assert DNS in packet and packet[DNS].ancount == 2 |
| assert packet[DNS].an[0].rdata == "128.0.0.1" |
| assert packet[DNS].an[1].rdata == "::1" |
| |
| test_am(DNS_am, |
| Ether()/IP(b'E\x00\x00H\x00\x01\x00\x00@\x11|\xa2\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x004\xe8\x9a\x00\x00\x01\x00\x00\x02\x00\x00\x00\x00\x00\x00\x06gaagle\x03com\x00\x00\x01\x00\x01\x06google\x03com\x00\x00\x1c\x00\x01'), |
| check_DNS_am_reply2, |
| match={"google.com": ("127.0.0.1", "::1"), "gaagle.com": "128.0.0.1"}, |
| joker=False) |
| |
| assert DNS_am().make_reply(Ether()) is None |
| assert DNS_am().make_reply(Ether()/IP()) is None |
| assert DNS_am().make_reply(Ether()/IP()/UDP()) is None |
| assert DNS_am().make_reply( |
| Ether()/IP()/UDP()/DNS(b'q\xa04\x00\x00\xa0\x01\x00\xf3\x00\x01\x04\x01y') |
| ) is None |
| |
| = LLMNR_am |
| def check_LLMNR_am_am_reply(packet): |
| # assert packet[Ether].src == get_if_hwaddr(conf.iface) |
| assert packet[Ether].dst == "aa:aa:aa:aa:aa:aa" |
| # assert packet[IP].src == get_if_addr(conf.iface) |
| assert packet[IP].dst == "192.168.0.1" |
| assert packet[UDP].dport == 51938 |
| assert packet[UDP].sport == 5355 |
| assert LLMNRResponse in packet and packet[LLMNRResponse].ancount == 1 and packet[LLMNRResponse].qdcount == 1 |
| assert packet[LLMNRResponse].qd[0].qname == b"TEST." |
| assert packet[LLMNRResponse].an[0].rdata == "192.168.1.1" |
| assert packet[LLMNRResponse].an[0].rrname == b"TEST." |
| assert packet[LLMNRResponse].an[0].ttl == 60 |
| |
| test_am(LLMNR_am, |
| Ether(src="aa:aa:aa:aa:aa:aa", dst="01:00:5e:00:00:fc")/IP(src="192.168.0.1", dst="224.0.0.252")/UDP(dport=5355, sport=51938)/LLMNRQuery(qd=DNSQR(qname=b"TEST.", qtype="A")), |
| check_LLMNR_am_am_reply, |
| ttl=60, |
| match={"TEST": "192.168.1.1"}) |
| |
| = mDNS_am |
| def check_mDNS_am_reply(packet): |
| packet.show() |
| # assert packet[Ether].src == get_if_hwaddr(conf.iface) |
| assert packet[Ether].dst == "01:00:5e:00:00:fb" |
| # assert packet[IP].src == get_if_addr(conf.iface) |
| assert packet[IP].dst == "224.0.0.251" |
| assert packet[IP].ttl == 255 |
| assert packet[UDP].dport == 5353 |
| assert packet[UDP].sport == 5353 |
| assert DNS in packet and packet[DNS].ancount == 1 and packet[DNS].qdcount == 0 |
| assert packet[DNS].an[0].rdata == "192.168.1.1" |
| assert packet[DNS].an[0].rrname == b"TEST.local." |
| assert packet[DNS].an[0].ttl == 10 |
| |
| test_am(mDNS_am, |
| Ether(src="aa:aa:aa:aa:aa:aa", dst="01:00:5e:00:00:fb")/IP(src="192.168.0.1", dst="224.0.0.251", ttl=1)/UDP(dport=5353, sport=5353)/DNS(qd=DNSQR(qname=b"TEST.local.", qtype="A")), |
| check_mDNS_am_reply, |
| joker="192.168.1.1") |
| |
| |
| def check_mDNS_am_reply2(packet): |
| # $ avahi-resolve -n bonjour.local |
| packet.show() |
| # assert packet[Ether].src == get_if_hwaddr(conf.iface) |
| assert packet[Ether].dst == "01:00:5e:00:00:fb" |
| # assert packet[IP].src == get_if_addr(conf.iface) |
| assert packet[IP].dst == "224.0.0.251" |
| assert packet[IP].ttl == 255 |
| assert packet[UDP].dport == 5353 |
| assert packet[UDP].sport == 5353 |
| assert DNS in packet and packet[DNS].ancount == 2 and packet[DNS].qdcount == 0 |
| assert packet[DNS].an[0].rdata == "192.168.1.1" |
| assert packet[DNS].an[0].rrname == b"bonjour.local." |
| assert packet[DNS].an[0].ttl == 120 |
| assert packet[DNS].an[1].type == 47 |
| assert packet[DNS].an[1].rrname == b"bonjour.local." |
| assert packet[DNS].an[1].ttl == 120 |
| |
| test_am(mDNS_am, |
| Ether(b'\x01\x00^\x00\x00\xfb\xaa\xaa\xaa\xaa\xaa\xaa\x08\x00E\x00\x00A\xce}@\x00\xff\x11\x0b\x89\xc0\xa8\x00\x01\xe0\x00\x00\xfb\x14\xe9\x14\xe9\x00-\xdbl\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x07bonjour\x05local\x00\x00\x01\x00\x01\xc0\x0c\x00\x1c\x00\x01'), |
| check_mDNS_am_reply2, |
| joker="192.168.1.1", |
| ttl=120) |
| |
| = DHCPv6_am - Basic Instantiaion |
| ~ osx netaccess |
| a = DHCPv6_am() |
| a.usage() |
| |
| a.parse_options(dns="2001:500::1035", domain="localdomain, local", duid=None, |
| iface=conf.iface, advpref=255, sntpservers=None, |
| sipdomains=None, sipservers=None, |
| nisdomain=None, nisservers=None, |
| nispdomain=None, nispservers=None, |
| bcmcsdomains=None, bcmcsservers=None, |
| debug=1) |
| |
| = DHCPv6_am - SOLICIT |
| ~ osx netaccess |
| req = IPv6(dst="::1")/UDP()/DHCP6(msgtype=1)/DHCP6OptClientId(duid=DUID_LLT()) |
| assert a.is_request(req) |
| res = a.make_reply(req) |
| assert not a.is_request(res) |
| assert res[DHCP6_Advertise] |
| assert res[DHCP6OptPref].prefval == 255 |
| assert res[DHCP6OptReconfAccept] |
| a.print_reply(req, res) |
| |
| = DHCPv6_am - INFO-REQUEST |
| ~ osx netaccess |
| req = IPv6(dst="::1")/UDP()/DHCP6(msgtype=11)/DHCP6OptClientId(duid=DUID_LLT()) |
| assert a.is_request(req) |
| res = a.make_reply(req) |
| assert not a.is_request(res) |
| assert res[DHCP6_Reply] |
| assert "local" in res[DHCP6OptDNSDomains].dnsdomains |
| a.print_reply(req, res) |
| |
| = DHCPv6_am - REQUEST |
| ~ osx netaccess |
| req = IPv6(dst="::1")/UDP()/DHCP6(msgtype=3)/DHCP6OptClientId(duid=DUID_LLT())/DHCP6OptServerId(duid=a.duid) |
| assert a.is_request(req) |
| res = a.make_reply(req) |
| assert not a.is_request(res) |
| assert res[UDP].dport == 546 |
| assert res[DHCP6_Solicit] |
| a.print_reply(req, res) |
| |
| = WiFi_am |
| from unittest import mock |
| @mock.patch("scapy.layers.dot11.sniff") |
| def test_WiFi_am(packet_query, check_reply, mock_sniff, **kargs): |
| def sniff(*args,**kargs): |
| kargs["prn"](packet_query) |
| mock_sniff.side_effect = sniff |
| am = WiFi_am(**kargs) |
| am.send_reply = check_reply |
| am() |
| |
| def check_WiFi_am_reply(packet): |
| assert isinstance(packet, list) and len(packet) == 2 |
| assert TCP in packet[0] and Raw in packet[0] and raw(packet[0][Raw]) == b"5c4pY" |
| |
| test_WiFi_am(Dot11(FCfield="to-DS")/IP()/TCP()/"Scapy", |
| check_WiFi_am_reply, |
| iffrom="scapy0", ifto="scapy1", replace="5c4pY", pattern="Scapy") |
| |
| |
| = NBNS_am |
| def check_NBNS_am_reply(name): |
| def check(packet): |
| packet.show() |
| assert packet[Ether].src != "ff:ff:ff:ff:ff:ff" |
| assert packet[Ether].dst == "aa:aa:aa:aa:aa:aa" |
| assert NBNSQueryResponse in packet and packet[NBNSQueryResponse].RR_NAME == name |
| return check |
| |
| for server_name in (None, "", b"test", "test"): |
| test_am(NBNS_am, |
| Ether(src="aa:aa:aa:aa:aa:aa", dst="ff:ff:ff:ff:ff:ff")/IP()/UDP()/NBNSHeader()/NBNSQueryRequest(QUESTION_NAME="test"), |
| check_NBNS_am_reply(b"test"), |
| server_name=server_name) |
| |
| test_am(NBNS_am, |
| Ether(src="aa:aa:aa:aa:aa:aa", dst="ff:ff:ff:ff:ff:ff")/IP()/UDP()/NBNSHeader()/NBNSQueryRequest(QUESTION_NAME=b"\x85"), |
| check_NBNS_am_reply(b"\x85"), |
| server_name=b"\x85") |
| |
| = LdapPing_am |
| def check_LdapPing_am_reply(packet): |
| nlogon = packet[CLDAP].protocolOp.attributes[0] |
| assert nlogon.type == b"Netlogon" |
| logonresp = NETLOGON(nlogon.values[0].value.val) |
| assert isinstance(logonresp, NETLOGON_SAM_LOGON_RESPONSE_EX) |
| logonresp.show() |
| assert logonresp.DnsForestName == b'scapy.fr.', "DnsForestName" |
| assert logonresp.DnsDomainName == b'scapy.fr.', "DnsDomainName" |
| assert logonresp.DnsHostName == b'DC.scapy.fr.', "DnsHostName" |
| assert logonresp.NetbiosDomainName == b'SCAPY.', "NetbiosDomainName" |
| assert logonresp.NetbiosComputerName == b'DC.', "NetbiosComputerName" |
| assert logonresp.NtVersion == 3, "NtVersion" |
| assert logonresp.Flags == 0x3f3fd, "Flags" |
| assert logonresp.ClientSiteName == b'Default-First-Site-Name.', "ClientSiteName" |
| |
| test_am(LdapPing_am, |
| Ether(b'\xaa\xaa\xaa\xaa\xaa\xaa\xbb\xbb\xbb\xbb\xbb\xbb\x08\x00E\x00\x00\xaf\x9d\xb1\x00\x00\x80\x11\x9c\x89\xac\x13P\x01\xac\x13W\xdb\xc7{\x01\x85\x00\x9bV[0q\x02\x01\x01cl\x04\x00\n\x01\x00\n\x01\x00\x02\x01\x00\x02\x01\x00\x01\x01\x00\xa0M\xa3\x15\x04\tDnsDomain\x04\x08scapy.fr\xa3\x0e\x04\x04Host\x04\x06HOST01\xa3\r\x04\x05NtVer\x04\x04\x16\x00\x00 \xa3\x15\x04\x0bDnsHostName\x04\x06HOST010\n\x04\x08Netlogon'), |
| check_LdapPing_am_reply, |
| NetbiosComputerName="DC", |
| NetbiosDomainName="SCAPY", |
| DnsForestName="scapy.fr") |
| |
| |
| def check_NBNS_LdapPing_am_reply(packet): |
| packet.show() |
| assert SMBMailslot_Write in packet, "SMBMailslot_Write" |
| assert packet[SMBMailslot_Write].Name == b'\\MAILSLOT\\NET\\GETDC510CC0AD', "SMBMailslot_Write.Name" |
| logonresp = NETLOGON(packet[SMBMailslot_Write].Data.load) |
| logonresp.show() |
| assert logonresp.DcSockAddrSize == 16, "DcSockAddrSize" |
| assert isinstance(logonresp.DcSockAddr, DcSockAddr) |
| assert logonresp.DcSockAddr.sin_family == 2, "sin_family" |
| assert logonresp.DcSockAddr.sin_port == 0, "sin_port" |
| assert logonresp.DcSockAddr.sin_zero == 0, "sin_zero" |
| assert logonresp.DcSockAddr.sin_addr == get_if_addr(conf.iface) |
| assert logonresp.DnsForestName == b'scapy.fr.', "DnsForestName" |
| assert logonresp.DnsDomainName == b'scapy.fr.', "DnsDomainName" |
| assert logonresp.DnsHostName == b'DC.scapy.fr.', "DnsHostName" |
| assert logonresp.NetbiosDomainName == b'SCAPY.', "NetbiosDomainName" |
| assert logonresp.NetbiosComputerName == b'DC.', "NetbiosComputerName" |
| assert logonresp.NtVersion == 13, "NtVersion" |
| assert logonresp.Flags == 0x3f3fd, "Flags" |
| assert logonresp.ClientSiteName == b'Default-First-Site-Name.', "ClientSiteName" |
| |
| test_am(LdapPing_am, |
| Ether(b'\xaa\xaa\xaa\xaa\xaa\xaa\xbb\xbb\xbb\xbb\xbb\xbb\x08\x00E\x00\x01\n\xff\x82\x00\x00\x80\x11:]\xac\x13P\x01\xac\x13W\xdb\x00\x8a\x00\x8a\x00\xf6\xd5\xcb\x10\x02\xde\x9d\xac\x13P\x01\x00\x8a\x00\xe0\x00\x00 EIEPFDFEDADBCACACACACACACACACAAA\x00 FDEDEBFAFJCACACACACACACACACACABM\x00\xffSMB%\x00\x00\x00\x00\x18\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xfe\x00\x00\x00\x00\x11\x00\x00@\x00\x02\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\\\x00@\x00\\\x00\x03\x00\x01\x00\x00\x00\x02\x00W\x00\\MAILSLOT\\NET\\NETLOGON\x00\x12\x00\x00\x00H\x00O\x00S\x00T\x000\x001\x00\x00\x00\x00\x00\\MAILSLOT\\NET\\GETDC510CC0AD\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0b\x00\x00 \xff\xff\xff\xff'), |
| check_NBNS_LdapPing_am_reply, |
| NetbiosComputerName="DC", |
| NetbiosDomainName="SCAPY", |
| DnsForestName="scapy.fr") |