blob: a4844e006de517fc254e8fe0e52c2c06e9371ff1 [file] [log] [blame]
% Regression tests for Scapy Answering Machines
# More information at http://www.secdev.org/projects/UTscapy/
############
############
+ Answering Machines
= Generic answering machine mocker
from unittest import mock
@mock.patch("scapy.ansmachine.sniff")
def test_am(cls_name, packet_query, check_reply, mock_sniff, **kargs):
    """Drive the answering machine ``cls_name`` with a single mocked query.
    ``scapy.ansmachine.sniff`` is patched; the stand-in passes the query to
    the ``prn`` callback it is given. ``send_reply`` is replaced on the
    instance so the reply goes to ``check_reply`` instead. Remaining keyword
    arguments are forwarded to the ``cls_name`` constructor.
    """
    # Rebuild the query from its bytes through its own class before use.
    wire_query = packet_query.__class__(bytes(packet_query))
    reply_seen = False
    def fake_sniff(*_args, **sniff_opts):
        sniff_opts["prn"](wire_query)
    def capture_reply(reply):
        nonlocal reply_seen
        reply_seen = True
        # The reply is rebuilt from bytes as well before it is checked.
        check_reply(reply.__class__(bytes(reply)))
    mock_sniff.side_effect = fake_sniff
    machine = cls_name(**kargs)
    machine.send_reply = capture_reply
    machine()
    assert reply_seen, "Filter never passed for AnsweringMachine !"
= BOOTP_am
def check_BOOTP_am_reply(packet):
    """Check the BOOTP layer of the reply: op 2 and the expected addresses."""
    assert BOOTP in packet
    bootp = packet[BOOTP]
    assert bootp.op == 2
    assert bootp.yiaddr == "192.168.1.128"
    assert bootp.giaddr == "192.168.1.1"

bootp_query = Ether()/IP()/UDP()/BOOTP(op=1)
test_am(BOOTP_am, bootp_query, check_BOOTP_am_reply)
= DHCP_am
def check_DHCP_am_reply(packet):
    """Check that the reply has non-empty DHCP options including the domain."""
    assert DHCP in packet
    dhcp_options = packet[DHCP].options
    assert len(dhcp_options)
    assert ("domain", b"localnet") in dhcp_options

dhcp_query = Ether()/IP()/UDP()/BOOTP(op=1)/DHCP(options=[('message-type', 'request')])
test_am(DHCP_am, dhcp_query, check_DHCP_am_reply, domain="localnet")
= ARP_am
def check_ARP_am_reply(packet):
    """Check that the ARP reply carries the configured IP and MAC as source."""
    assert ARP in packet
    arp = packet[ARP]
    assert arp.psrc == "10.28.7.1"
    assert arp.hwsrc == "00:01:02:03:04:05"

arp_query = Ether()/ARP(pdst="10.28.7.1")
test_am(
    ARP_am,
    arp_query,
    check_ARP_am_reply,
    IP_addr="10.28.7.1",
    ARP_addr="00:01:02:03:04:05",
)
= ICMPEcho_am
def check_ICMP_am_reply(packet):
    """Check that the ICMP reply swaps the query's addresses and keeps seq."""
    packet.show()
    eth = packet[Ether]
    assert eth.src != "ff:ff:ff:ff:ff:ff"
    assert eth.dst == "aa:aa:aa:aa:aa:aa"
    assert IP in packet
    assert ICMP in packet
    ip = packet[IP]
    assert ip.dst == "1.1.1.1"
    assert ip.src == "2.2.2.2"
    assert packet[ICMP].seq == 12

icmp_query = Ether(src="aa:aa:aa:aa:aa:aa", dst="ff:ff:ff:ff:ff:ff")/IP(src="1.1.1.1", dst="2.2.2.2")/ICMP(seq=12)
test_am(ICMPEcho_am, icmp_query, check_ICMP_am_reply)
= DNS_am
def check_DNS_am_reply(packet):
    """Check addressing and the single joker answer of the DNS reply."""
    eth = packet[Ether]
    assert eth.src == "bb:bb:bb:bb:bb:bb"
    assert eth.dst == "aa:aa:aa:aa:aa:aa"
    ip = packet[IP]
    assert ip.src == "127.0.0.2"
    assert ip.dst == "127.0.0.1"
    assert DNS in packet
    dns = packet[DNS]
    assert dns.ancount == 1
    assert dns.an[0].rdata == "192.168.1.1"
    assert dns.qd[0].qname == b"www.secdev.org."

dns_query = Ether(src="aa:aa:aa:aa:aa:aa", dst="bb:bb:bb:bb:bb:bb")/IP(src="127.0.0.1", dst="127.0.0.2")/UDP()/DNS(qd=DNSQR(qname="www.secdev.org"))
test_am(DNS_am, dns_query, check_DNS_am_reply, joker="192.168.1.1")
def check_DNS_am_reply_srvmatch(packet):
    """Check that the SRV query is answered with the configured port/target."""
    assert DNS in packet
    dns = packet[DNS]
    assert dns.ancount == 1
    assert isinstance(dns.an[0], DNSRRSRV)
    assert dns.an[0].rrname == b'_ldap._tcp.dc._msdcs.scapy.fr.'
    assert dns.an[0].port == 389
    assert dns.an[0].target == b'dc.scapy.fr.'

srv_query = Ether()/IP()/UDP()/DNS(qd=DNSQR(qname=b'_ldap._tcp.dc._msdcs.scapy.fr.', qtype="SRV"))
test_am(
    DNS_am,
    srv_query,
    check_DNS_am_reply_srvmatch,
    srvmatch={"_ldap._tcp.dc._msdcs.scapy.fr": (389, "dc.scapy.fr")},
)
def check_DNS_am_reply_arpa(packet):
    """Check that the PTR query is answered with the ``jokerarpa`` name."""
    assert DNS in packet
    dns = packet[DNS]
    assert dns.ancount == 1
    assert dns.an[0].rdata == b"scapy."
    assert dns.an[0].rrname == b"1.0.16.172.in-addr.arpa."

ptr_query = Ether()/IP()/UDP()/DNS(qd=DNSQR(qname=b"1.0.16.172.in-addr.arpa.", qtype="PTR"))
test_am(DNS_am, ptr_query, check_DNS_am_reply_arpa, jokerarpa="scapy")
def check_DNS_am_reply2(packet):
    """Check the two answers returned for the two-question query."""
    assert DNS in packet
    assert packet[DNS].ancount == 2
    answers = packet[DNS].an
    assert answers[0].rdata == "128.0.0.1"
    assert answers[1].rdata == "::1"

two_question_query = Ether()/IP(b'E\x00\x00H\x00\x01\x00\x00@\x11|\xa2\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x004\xe8\x9a\x00\x00\x01\x00\x00\x02\x00\x00\x00\x00\x00\x00\x06gaagle\x03com\x00\x00\x01\x00\x01\x06google\x03com\x00\x00\x1c\x00\x01')
test_am(
    DNS_am,
    two_question_query,
    check_DNS_am_reply2,
    match={"google.com": ("127.0.0.1", "::1"), "gaagle.com": "128.0.0.1"},
    joker=False,
)
for lower_layers in (Ether(), Ether()/IP(), Ether()/IP()/UDP()):
    assert DNS_am().make_reply(lower_layers) is None

raw_dns_query = Ether()/IP()/UDP()/DNS(b'q\xa04\x00\x00\xa0\x01\x00\xf3\x00\x01\x04\x01y')
assert DNS_am().make_reply(raw_dns_query) is None
= LLMNR_am
def check_LLMNR_am_am_reply(packet):
    """Check addressing, ports and the single answer of the LLMNR reply."""
    # Disabled check: assert packet[Ether].src == get_if_hwaddr(conf.iface)
    assert packet[Ether].dst == "aa:aa:aa:aa:aa:aa"
    # Disabled check: assert packet[IP].src == get_if_addr(conf.iface)
    assert packet[IP].dst == "192.168.0.1"
    udp = packet[UDP]
    assert udp.dport == 51938
    assert udp.sport == 5355
    assert LLMNRResponse in packet
    llmnr = packet[LLMNRResponse]
    assert llmnr.ancount == 1
    assert llmnr.qdcount == 1
    assert llmnr.qd[0].qname == b"TEST."
    assert llmnr.an[0].rdata == "192.168.1.1"
    assert llmnr.an[0].rrname == b"TEST."
    assert llmnr.an[0].ttl == 60

llmnr_query = Ether(src="aa:aa:aa:aa:aa:aa", dst="01:00:5e:00:00:fc")/IP(src="192.168.0.1", dst="224.0.0.252")/UDP(dport=5355, sport=51938)/LLMNRQuery(qd=DNSQR(qname=b"TEST.", qtype="A"))
test_am(
    LLMNR_am,
    llmnr_query,
    check_LLMNR_am_am_reply,
    ttl=60,
    match={"TEST": "192.168.1.1"},
)
= mDNS_am
def check_mDNS_am_reply(packet):
    """Check addressing, TTLs, ports and the single answer of the mDNS reply."""
    packet.show()
    # Disabled check: assert packet[Ether].src == get_if_hwaddr(conf.iface)
    assert packet[Ether].dst == "01:00:5e:00:00:fb"
    # Disabled check: assert packet[IP].src == get_if_addr(conf.iface)
    ip = packet[IP]
    assert ip.dst == "224.0.0.251"
    assert ip.ttl == 255
    udp = packet[UDP]
    assert udp.dport == 5353
    assert udp.sport == 5353
    assert DNS in packet
    dns = packet[DNS]
    assert dns.ancount == 1
    assert dns.qdcount == 0
    assert dns.an[0].rdata == "192.168.1.1"
    assert dns.an[0].rrname == b"TEST.local."
    assert dns.an[0].ttl == 10

mdns_query = Ether(src="aa:aa:aa:aa:aa:aa", dst="01:00:5e:00:00:fb")/IP(src="192.168.0.1", dst="224.0.0.251", ttl=1)/UDP(dport=5353, sport=5353)/DNS(qd=DNSQR(qname=b"TEST.local.", qtype="A"))
test_am(mDNS_am, mdns_query, check_mDNS_am_reply, joker="192.168.1.1")
def check_mDNS_am_reply2(packet):
    """Check the two-answer mDNS reply to the raw ``bonjour.local`` query."""
    # $ avahi-resolve -n bonjour.local
    packet.show()
    # Disabled check: assert packet[Ether].src == get_if_hwaddr(conf.iface)
    assert packet[Ether].dst == "01:00:5e:00:00:fb"
    # Disabled check: assert packet[IP].src == get_if_addr(conf.iface)
    ip = packet[IP]
    assert ip.dst == "224.0.0.251"
    assert ip.ttl == 255
    udp = packet[UDP]
    assert udp.dport == 5353
    assert udp.sport == 5353
    assert DNS in packet
    dns = packet[DNS]
    assert dns.ancount == 2
    assert dns.qdcount == 0
    assert dns.an[0].rdata == "192.168.1.1"
    assert dns.an[0].rrname == b"bonjour.local."
    assert dns.an[0].ttl == 120
    assert dns.an[1].type == 47
    assert dns.an[1].rrname == b"bonjour.local."
    assert dns.an[1].ttl == 120

bonjour_query = Ether(b'\x01\x00^\x00\x00\xfb\xaa\xaa\xaa\xaa\xaa\xaa\x08\x00E\x00\x00A\xce}@\x00\xff\x11\x0b\x89\xc0\xa8\x00\x01\xe0\x00\x00\xfb\x14\xe9\x14\xe9\x00-\xdbl\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x07bonjour\x05local\x00\x00\x01\x00\x01\xc0\x0c\x00\x1c\x00\x01')
test_am(
    mDNS_am,
    bonjour_query,
    check_mDNS_am_reply2,
    joker="192.168.1.1",
    ttl=120,
)
= DHCPv6_am - Basic Instantiation
~ osx netaccess
a = DHCPv6_am()
a.usage()
dhcp6_opts = dict.fromkeys(
    (
        "duid", "sntpservers",
        "sipdomains", "sipservers",
        "nisdomain", "nisservers",
        "nispdomain", "nispservers",
        "bcmcsdomains", "bcmcsservers",
    ),
    None,
)
dhcp6_opts.update(
    dns="2001:500::1035",
    domain="localdomain, local",
    iface=conf.iface,
    advpref=255,
    debug=1,
)
a.parse_options(**dhcp6_opts)
= DHCPv6_am - SOLICIT
~ osx netaccess
req = IPv6(dst="::1")/UDP()
req /= DHCP6(msgtype=1)
req /= DHCP6OptClientId(duid=DUID_LLT())
assert a.is_request(req)
res = a.make_reply(req)
assert not a.is_request(res)
assert res[DHCP6_Advertise]
# 255 is the advpref value passed to parse_options in the instantiation test.
assert res[DHCP6OptPref].prefval == 255
assert res[DHCP6OptReconfAccept]
a.print_reply(req, res)
= DHCPv6_am - INFO-REQUEST
~ osx netaccess
req = IPv6(dst="::1")/UDP()
req /= DHCP6(msgtype=11)
req /= DHCP6OptClientId(duid=DUID_LLT())
assert a.is_request(req)
res = a.make_reply(req)
assert not a.is_request(res)
assert res[DHCP6_Reply]
# "local" is one of the names in the domain string given to parse_options.
assert "local" in res[DHCP6OptDNSDomains].dnsdomains
a.print_reply(req, res)
= DHCPv6_am - REQUEST
~ osx netaccess
req = IPv6(dst="::1")/UDP()
req /= DHCP6(msgtype=3)
req /= DHCP6OptClientId(duid=DUID_LLT())
req /= DHCP6OptServerId(duid=a.duid)
assert a.is_request(req)
res = a.make_reply(req)
assert not a.is_request(res)
assert res[UDP].dport == 546
# NOTE(review): the reply is expected to carry DHCP6_Solicit rather than
# DHCP6_Reply - confirm this mirrors what DHCPv6_am.make_reply builds.
assert res[DHCP6_Solicit]
a.print_reply(req, res)
= WiFi_am
from unittest import mock
@mock.patch("scapy.layers.dot11.sniff")
def test_WiFi_am(packet_query, check_reply, mock_sniff, **kargs):
    """Drive ``WiFi_am`` with a single mocked query and verify its reply.
    ``scapy.layers.dot11.sniff`` is patched; the stand-in passes
    ``packet_query`` to the ``prn`` callback it is given. ``send_reply`` is
    replaced on the instance so the reply goes to ``check_reply`` instead.
    Remaining keyword arguments are forwarded to the ``WiFi_am`` constructor.
    Raises AssertionError if ``send_reply`` is never reached.
    """
    reply_seen = False
    def fake_sniff(*_args, **sniff_opts):
        sniff_opts["prn"](packet_query)
    def capture_reply(reply):
        nonlocal reply_seen
        reply_seen = True
        check_reply(reply)
    mock_sniff.side_effect = fake_sniff
    am = WiFi_am(**kargs)
    am.send_reply = capture_reply
    am()
    # Same guard as test_am: without it the test passes vacuously whenever
    # the machine drops the query before reaching send_reply.
    assert reply_seen, "Filter never passed for AnsweringMachine !"
def check_WiFi_am_reply(packet):
    """Check that the reply is a two-packet list whose first payload is replaced."""
    assert isinstance(packet, list)
    assert len(packet) == 2
    first = packet[0]
    assert TCP in first
    assert Raw in first
    assert raw(first[Raw]) == b"5c4pY"

wifi_query = Dot11(FCfield="to-DS")/IP()/TCP()/"Scapy"
test_WiFi_am(
    wifi_query,
    check_WiFi_am_reply,
    iffrom="scapy0",
    ifto="scapy1",
    replace="5c4pY",
    pattern="Scapy",
)
= NBNS_am
def check_NBNS_am_reply(name):
    """Return a reply checker that expects ``RR_NAME`` to equal ``name``."""
    def check(packet):
        packet.show()
        eth = packet[Ether]
        assert eth.src != "ff:ff:ff:ff:ff:ff"
        assert eth.dst == "aa:aa:aa:aa:aa:aa"
        assert NBNSQueryResponse in packet
        assert packet[NBNSQueryResponse].RR_NAME == name
    return check

def _nbns_query(question_name):
    """Build the NBNS query used by these tests for ``question_name``."""
    return Ether(src="aa:aa:aa:aa:aa:aa", dst="ff:ff:ff:ff:ff:ff")/IP()/UDP()/NBNSHeader()/NBNSQueryRequest(QUESTION_NAME=question_name)

for server_name in (None, "", b"test", "test"):
    test_am(NBNS_am, _nbns_query("test"), check_NBNS_am_reply(b"test"), server_name=server_name)

test_am(NBNS_am, _nbns_query(b"\x85"), check_NBNS_am_reply(b"\x85"), server_name=b"\x85")
= LdapPing_am
def check_LdapPing_am_reply(packet):
    """Check the NETLOGON response carried in the CLDAP reply's first attribute."""
    nlogon = packet[CLDAP].protocolOp.attributes[0]
    assert nlogon.type == b"Netlogon"
    logonresp = NETLOGON(nlogon.values[0].value.val)
    assert isinstance(logonresp, NETLOGON_SAM_LOGON_RESPONSE_EX)
    logonresp.show()
    # Each field is checked in order; its name doubles as the failure message.
    expected_fields = (
        ("DnsForestName", b'scapy.fr.'),
        ("DnsDomainName", b'scapy.fr.'),
        ("DnsHostName", b'DC.scapy.fr.'),
        ("NetbiosDomainName", b'SCAPY.'),
        ("NetbiosComputerName", b'DC.'),
        ("NtVersion", 3),
        ("Flags", 0x3f3fd),
        ("ClientSiteName", b'Default-First-Site-Name.'),
    )
    for field_name, expected in expected_fields:
        assert getattr(logonresp, field_name) == expected, field_name

cldap_ping = Ether(b'\xaa\xaa\xaa\xaa\xaa\xaa\xbb\xbb\xbb\xbb\xbb\xbb\x08\x00E\x00\x00\xaf\x9d\xb1\x00\x00\x80\x11\x9c\x89\xac\x13P\x01\xac\x13W\xdb\xc7{\x01\x85\x00\x9bV[0q\x02\x01\x01cl\x04\x00\n\x01\x00\n\x01\x00\x02\x01\x00\x02\x01\x00\x01\x01\x00\xa0M\xa3\x15\x04\tDnsDomain\x04\x08scapy.fr\xa3\x0e\x04\x04Host\x04\x06HOST01\xa3\r\x04\x05NtVer\x04\x04\x16\x00\x00 \xa3\x15\x04\x0bDnsHostName\x04\x06HOST010\n\x04\x08Netlogon')
test_am(
    LdapPing_am,
    cldap_ping,
    check_LdapPing_am_reply,
    NetbiosComputerName="DC",
    NetbiosDomainName="SCAPY",
    DnsForestName="scapy.fr",
)
def check_NBNS_LdapPing_am_reply(packet):
    """Check the NETLOGON response carried in the SMB mailslot write reply."""
    packet.show()
    assert SMBMailslot_Write in packet, "SMBMailslot_Write"
    mailslot = packet[SMBMailslot_Write]
    assert mailslot.Name == b'\\MAILSLOT\\NET\\GETDC510CC0AD', "SMBMailslot_Write.Name"
    logonresp = NETLOGON(packet[SMBMailslot_Write].Data.load)
    logonresp.show()
    assert logonresp.DcSockAddrSize == 16, "DcSockAddrSize"
    assert isinstance(logonresp.DcSockAddr, DcSockAddr)
    # Field names double as failure messages for the table-driven checks.
    for field_name, expected in (("sin_family", 2), ("sin_port", 0), ("sin_zero", 0)):
        assert getattr(logonresp.DcSockAddr, field_name) == expected, field_name
    assert logonresp.DcSockAddr.sin_addr == get_if_addr(conf.iface)
    expected_fields = (
        ("DnsForestName", b'scapy.fr.'),
        ("DnsDomainName", b'scapy.fr.'),
        ("DnsHostName", b'DC.scapy.fr.'),
        ("NetbiosDomainName", b'SCAPY.'),
        ("NetbiosComputerName", b'DC.'),
        ("NtVersion", 13),
        ("Flags", 0x3f3fd),
        ("ClientSiteName", b'Default-First-Site-Name.'),
    )
    for field_name, expected in expected_fields:
        assert getattr(logonresp, field_name) == expected, field_name

mailslot_ping = Ether(b'\xaa\xaa\xaa\xaa\xaa\xaa\xbb\xbb\xbb\xbb\xbb\xbb\x08\x00E\x00\x01\n\xff\x82\x00\x00\x80\x11:]\xac\x13P\x01\xac\x13W\xdb\x00\x8a\x00\x8a\x00\xf6\xd5\xcb\x10\x02\xde\x9d\xac\x13P\x01\x00\x8a\x00\xe0\x00\x00 EIEPFDFEDADBCACACACACACACACACAAA\x00 FDEDEBFAFJCACACACACACACACACACABM\x00\xffSMB%\x00\x00\x00\x00\x18\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xfe\x00\x00\x00\x00\x11\x00\x00@\x00\x02\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\\\x00@\x00\\\x00\x03\x00\x01\x00\x00\x00\x02\x00W\x00\\MAILSLOT\\NET\\NETLOGON\x00\x12\x00\x00\x00H\x00O\x00S\x00T\x000\x001\x00\x00\x00\x00\x00\\MAILSLOT\\NET\\GETDC510CC0AD\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0b\x00\x00 \xff\xff\xff\xff')
test_am(
    LdapPing_am,
    mailslot_ping,
    check_NBNS_LdapPing_am_reply,
    NetbiosComputerName="DC",
    NetbiosDomainName="SCAPY",
    DnsForestName="scapy.fr",
)