# blob: 5ad435060a978b915ca5ae07a2512ec084ae5d8a [file] [log] [blame]
% Regression tests for TFTP
# More information at http://www.secdev.org/projects/UTscapy/
+ TFTP coverage tests
= Test answers
# Requests: DATA answers a read request; a request never answers a request.
assert TFTP_DATA(block=1).answers(TFTP_RRQ())
assert not TFTP_WRQ().answers(TFTP_RRQ())
assert not TFTP_RRQ().answers(TFTP_WRQ())
# ACK: matched on the block number for DATA, block 0 for a read request.
assert TFTP_ACK(block=1).answers(TFTP_DATA(block=1))
assert not TFTP_ACK(block=0).answers(TFTP_DATA(block=1))
assert TFTP_ACK(block=0).answers(TFTP_RRQ())
assert not TFTP_ACK().answers(TFTP_ACK())
# ERROR answers both DATA and ACK (checked separately so a failure names one).
assert TFTP_ERROR().answers(TFTP_DATA())
assert TFTP_ERROR().answers(TFTP_ACK())
# OACK answers a write request.
assert TFTP_OACK().answers(TFTP_WRQ())
+ TFTP Automatons
~ linux
= Utilities
~ linux
from scapy.automaton import select_objects
class MockTFTPSocket(object):
    """Socket stand-in that replays the canned ``packets`` list of its class.
    Subclasses override ``packets`` with what the automaton under test should
    receive; sent packets are discarded unless ``send`` is overridden.
    """
    # Class-level queue consumed in place by recv(); subclasses provide
    # their own list.
    packets = []
    def recv(self, n=None):
        """Pop and return the next queued packet; ``n`` is accepted but unused."""
        return self.packets.pop(0)
    def send(self, *args, **kargs):
        """Accept any arguments and do nothing."""
        pass
    def close(self):
        """Do nothing."""
        pass
    @classmethod
    def select(cls, inputs, remain):
        """Return the ``cls`` instances found in ``inputs`` while the first one still has queued packets; otherwise call ``select_objects`` on the inputs that are not ``cls`` instances."""
        mocks = [s for s in inputs if isinstance(s, cls)]
        if mocks:
            if mocks[0].packets:
                return mocks
            # Queue exhausted: keep the mock sockets away from select_objects.
            inputs = [s for s in inputs if not isinstance(s, cls)]
        return select_objects(inputs, remain)
= TFTP_read() automaton
~ linux
# Replayed to the client: a 512-byte DATA block followed by a 2-byte one.
class MockReadSocket(MockTFTPSocket):
    packets = [
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=1) / ("P" * 512),
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=2) / "<3",
    ]

# The mock class is used both as the sending (ll) and receiving socket.
tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807,
                      ll=MockReadSocket,
                      recvsock=MockReadSocket, debug=5)

res = tftp_read.run()
# The result must be the concatenation of both DATA payloads.
assert res == (b"P" * 512 + b"<3")
= TFTP_read() automaton error
~ linux
# The only queued reply is a TFTP ERROR with code 2.
class MockReadSocket(MockTFTPSocket):
    packets = [
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error"),
    ]

tftp_read = TFTP_read("file.txt", "1.2.3.4", sport=0x2807,
                      ll=MockReadSocket,
                      recvsock=MockReadSocket)

# run() is expected to raise; completing normally is itself a failure.
try:
    tftp_read.run()
except Automaton.ErrorState as e:
    assert "Reached ERROR" in str(e)
    assert "ERROR Access violation" in str(e)
else:
    assert False, "TFTP_read() did not raise Automaton.ErrorState"
= TFTP_write() automaton
~ linux
# Collects every Raw payload the automaton sends through the mock socket.
data_received = b""
class MockWriteSocket(MockTFTPSocket):
    # Queued replies: ACK of block 0, then ACK of block 1.
    packets = [
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=0),
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=1),
    ]
    def send(self, *args, **kargs):
        """Append the Raw payload of the sent packet, if any, to ``data_received``."""
        global data_received
        if args and Raw in args[0]:
            data_received += args[0][Raw].load

tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807,
                        ll=MockWriteSocket,
                        recvsock=MockWriteSocket)

tftp_write.run()
# Everything handed to TFTP_write must have gone out through send().
assert data_received == (b"P" * 767 + b"Scapy <3")
= TFTP_write() automaton error
~ linux
# The only queued reply is a TFTP ERROR with code 2.
class MockWriteSocket(MockTFTPSocket):
    packets = [
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ERROR(errorcode=2, errormsg="Fatal error"),
    ]

tftp_write = TFTP_write("file.txt", "P" * 767 + "Scapy <3", "1.2.3.4", sport=0x2807,
                        ll=MockWriteSocket,
                        recvsock=MockWriteSocket)

# run() is expected to raise; completing normally is itself a failure.
try:
    tftp_write.run()
except Automaton.ErrorState as e:
    assert "Reached ERROR" in str(e)
    assert "ERROR Access violation" in str(e)
else:
    assert False, "TFTP_write() did not raise Automaton.ErrorState"
= TFTP_WRQ_server() automaton
~ linux
# Replayed to the server: a write request for "scapy.txt", then a 512-byte
# DATA block and a 2-byte one.
class MockWRQSocket(MockTFTPSocket):
    packets = [
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt"),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 512),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3",
    ]

tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807,
                           ll=MockWRQSocket,
                           recvsock=MockWRQSocket)

# run() must return the requested file name and the reassembled payload.
assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 512 + b"<3"))
= TFTP_WRQ_server() automaton with options
~ linux
# Same exchange as the plain WRQ test, but the request carries a
# blksize=100 option and the first DATA block is 100 bytes long.
class MockWRQSocket(MockTFTPSocket):
    packets = [
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_WRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=1) / ("P" * 100),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_DATA(block=2) / "<3",
    ]

tftp_wrq = TFTP_WRQ_server(ip="1.2.3.4", sport=0x2807,
                           ll=MockWRQSocket,
                           recvsock=MockWRQSocket)

# run() must return the requested file name and the reassembled payload.
assert tftp_wrq.run() == (b"scapy.txt", (b"P" * 100 + b"<3"))
= TFTP_RRQ_server() automaton
~ linux
import os
import tempfile
sent_data = "P" * 512 + "<3"
# The server below is rooted at tmp_dir, so the file is created there
# explicitly instead of wherever tempfile would default to. The file is
# created atomically (no mktemp race) and kept after close (delete=False).
tmp_dir = "/tmp/"
with tempfile.NamedTemporaryFile(mode="w", suffix=".txt", dir=tmp_dir, delete=False) as fdesc:
    fdesc.write(sent_data)
    filename = fdesc.name

# Collects the decoded payload of every TFTP_DATA packet the server sends.
received_data = ""
class MockRRQSocket(MockTFTPSocket):
    # First request names "scapy.txt", which this test does not create
    # (presumably exercising the missing-file path -- TODO confirm); the
    # second asks for the temporary file by its name relative to tmp_dir.
    packets = [
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename=os.path.basename(filename)) / TFTP_Options(),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=1),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=2),
    ]
    def send(self, *args, **kargs):
        """Append the payload of a sent TFTP_DATA packet to ``received_data``."""
        global received_data
        if args and TFTP_DATA in args[0]:
            received_data += args[0][Raw].load.decode("utf-8")

tftp_rrq = TFTP_RRQ_server(ip="1.2.3.4", sport=0x2807, dir=tmp_dir, serve_one=True,
                           ll=MockRRQSocket,
                           recvsock=MockRRQSocket)

# Remove the temporary file even when the run or the assertion fails.
try:
    tftp_rrq.run()
    assert received_data == sent_data
finally:
    os.unlink(filename)