| % Regression tests for TFTP |
| |
| # More information at http://www.secdev.org/projects/UTscapy/ |
| |
| + TFTP coverage tests |
| |
| = Test answers |
| |
# (reply, request) pairs for which reply.answers(request) must hold.
accepted = [
    (TFTP_DATA(block=1), TFTP_RRQ()),
    (TFTP_ACK(block=1), TFTP_DATA(block=1)),
    (TFTP_ACK(block=0), TFTP_RRQ()),
    (TFTP_ERROR(), TFTP_DATA()),
    (TFTP_ERROR(), TFTP_ACK()),
    (TFTP_OACK(), TFTP_WRQ()),
]
for reply, request in accepted:
    assert reply.answers(request)

# (reply, request) pairs for which reply.answers(request) must NOT hold.
rejected = [
    (TFTP_WRQ(), TFTP_RRQ()),
    (TFTP_RRQ(), TFTP_WRQ()),
    (TFTP_ACK(block=0), TFTP_DATA(block=1)),
    (TFTP_ACK(), TFTP_ACK()),
]
for reply, request in rejected:
    assert not reply.answers(request)
| |
| + TFTP Automatons |
| ~ linux |
| |
| = Utilities |
| ~ linux |
| |
| from scapy.automaton import select_objects |
| |
class MockTFTPSocket(object):
    """Socket stand-in that replays a scripted list of packets.
    Subclasses set ``packets`` to the sequence that ``recv`` hands out.
    The list is a class attribute, so all instances of one subclass
    consume the same queue unless an instance rebinds the attribute.
    """
    # Scripted packets, consumed front to back by recv().
    packets = []
    def recv(self, n=None):
        """Pop and return the next scripted packet; ``n`` is ignored.
        Raises IndexError when no packet is left.
        """
        return self.packets.pop(0)
    def send(self, *args, **kargs):
        """Accept any arguments and discard them."""
        pass
    def close(self):
        """Do nothing; there is no resource to release."""
        pass
    @classmethod
    def select(cls, inputs, remain):
        """Return the objects of ``inputs`` considered ready.
        While the first mock socket found in ``inputs`` still has packets
        queued, every mock socket in ``inputs`` is returned. Otherwise the
        mock sockets are filtered out and the remaining objects are passed
        to ``select_objects`` together with ``remain``.
        """
        mocks = [s for s in inputs if isinstance(s, cls)]
        if mocks:
            if mocks[0].packets:
                return mocks
            # Queue drained: only the non-mock objects are handed on.
            inputs = [s for s in inputs if not isinstance(s, cls)]
        return select_objects(inputs, remain)
| |
| |
| = TFTP_read() automaton |
| ~ linux |
| |
class MockReadSocket(MockTFTPSocket):
    # Scripted replies: block 1 carries 512 bytes, block 2 carries "<3".
    packets = [
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_DATA(block=number) / payload
        for number, payload in ((1, "P" * 512), (2, "<3"))
    ]

# The mock class is passed as both ll and recvsock.
tftp_read = TFTP_read(
    "file.txt", "1.2.3.4",
    sport=0x2807,
    ll=MockReadSocket,
    recvsock=MockReadSocket,
    debug=5,
)

# run() is expected to return both payloads concatenated.
expected = b"P" * 512 + b"<3"
res = tftp_read.run()
assert res == expected
| |
| = TFTP_read() automaton error |
| ~ linux |
| |
class MockReadSocket(MockTFTPSocket):
    # The only scripted reply is an error packet with code 2.
    packets = [
        IP(src="1.2.3.4")
        / UDP(dport=0x2807)
        / TFTP_ERROR(errorcode=2, errormsg="Fatal error"),
    ]

tftp_read = TFTP_read(
    "file.txt", "1.2.3.4",
    sport=0x2807,
    ll=MockReadSocket,
    recvsock=MockReadSocket,
)

# run() must raise Automaton.ErrorState; keep the exception so that its
# text can be checked once the handler has finished.
failure = None
try:
    tftp_read.run()
except Automaton.ErrorState as exc:
    failure = exc

assert failure is not None
text = str(failure)
assert "Reached ERROR" in text
assert "ERROR Access violation" in text
| |
| |
| = TFTP_write() automaton |
| ~ linux |
| |
# Filled by MockWriteSocket.send() with every Raw payload that is sent.
data_received = b""

class MockWriteSocket(MockTFTPSocket):
    # Scripted replies: acknowledgements for blocks 0 and 1.
    packets = [
        IP(src="1.2.3.4") / UDP(dport=0x2807) / TFTP_ACK(block=number)
        for number in (0, 1)
    ]
    def send(self, *args, **kargs):
        """Append the Raw load of the first positional argument, if any."""
        global data_received
        if not args:
            return
        sent = args[0]
        if Raw in sent:
            data_received += sent[Raw].load

payload = "P" * 767 + "Scapy <3"
tftp_write = TFTP_write(
    "file.txt", payload, "1.2.3.4",
    sport=0x2807,
    ll=MockWriteSocket,
    recvsock=MockWriteSocket,
)

tftp_write.run()
# Everything handed to send() must add up to the original payload.
assert data_received == (b"P" * 767 + b"Scapy <3")
| |
| = TFTP_write() automaton error |
| ~ linux |
| |
class MockWriteSocket(MockTFTPSocket):
    # The only scripted reply is an error packet with code 2.
    packets = [
        IP(src="1.2.3.4")
        / UDP(dport=0x2807)
        / TFTP_ERROR(errorcode=2, errormsg="Fatal error"),
    ]

payload = "P" * 767 + "Scapy <3"
tftp_write = TFTP_write(
    "file.txt", payload, "1.2.3.4",
    sport=0x2807,
    ll=MockWriteSocket,
    recvsock=MockWriteSocket,
)

# run() must raise Automaton.ErrorState; keep the exception so that its
# text can be checked once the handler has finished.
failure = None
try:
    tftp_write.run()
except Automaton.ErrorState as exc:
    failure = exc

assert failure is not None
text = str(failure)
assert "Reached ERROR" in text
assert "ERROR Access violation" in text
| |
| |
| = TFTP_WRQ_server() automaton |
| ~ linux |
| |
# Layers shared by every scripted packet; "/" builds a new packet each time.
wrq_header = IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP()

class MockWRQSocket(MockTFTPSocket):
    # Scripted traffic: a write request followed by two data blocks.
    packets = [
        wrq_header / TFTP_WRQ(filename="scapy.txt"),
        wrq_header / TFTP_DATA(block=1) / ("P" * 512),
        wrq_header / TFTP_DATA(block=2) / "<3",
    ]

tftp_wrq = TFTP_WRQ_server(
    ip="1.2.3.4", sport=0x2807,
    ll=MockWRQSocket,
    recvsock=MockWRQSocket,
)

# run() is expected to return the file name and the joined block payloads.
result = tftp_wrq.run()
assert result == (b"scapy.txt", b"P" * 512 + b"<3")
| |
| = TFTP_WRQ_server() automaton with options |
| ~ linux |
| |
# Layers shared by every scripted packet; "/" builds a new packet each time.
wrq_header = IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP()
# The write request carries a single option: blksize = 100.
wrq_options = TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")])

class MockWRQSocket(MockTFTPSocket):
    # Scripted traffic: the request with options, then two data blocks,
    # the first of which is 100 bytes long.
    packets = [
        wrq_header / TFTP_WRQ(filename="scapy.txt") / wrq_options,
        wrq_header / TFTP_DATA(block=1) / ("P" * 100),
        wrq_header / TFTP_DATA(block=2) / "<3",
    ]

tftp_wrq = TFTP_WRQ_server(
    ip="1.2.3.4", sport=0x2807,
    ll=MockWRQSocket,
    recvsock=MockWRQSocket,
)

# run() is expected to return the file name and the joined block payloads.
result = tftp_wrq.run()
assert result == (b"scapy.txt", b"P" * 100 + b"<3")
| |
| = TFTP_RRQ_server() automaton |
| ~ linux |
| |
import os
import tempfile

sent_data = "P" * 512 + "<3"

# The server below serves files from this directory, so the temporary file
# is created there explicitly instead of wherever TMPDIR happens to point.
served_dir = "/tmp/"

# NamedTemporaryFile creates the file itself, which avoids the race that the
# deprecated tempfile.mktemp() leaves open; delete=False keeps the file on
# disk after it is closed so that the server can read it.
fdesc = tempfile.NamedTemporaryFile(
    mode="w", suffix=".txt", dir=served_dir, delete=False
)
with fdesc:
    fdesc.write(sent_data)

filename = fdesc.name

# Filled by MockRRQSocket.send() with the payload of every TFTP_DATA packet.
received_data = ""

class MockRRQSocket(MockTFTPSocket):
    # Scripted traffic: a read request for "scapy.txt" with a blksize
    # option, a read request for the temporary file (by base name, relative
    # to served_dir), then acknowledgements for blocks 1 and 2.
    packets = [
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename="scapy.txt") / TFTP_Options(options=[TFTP_Option(oname="blksize", value="100")]),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_RRQ(filename=os.path.basename(filename)) / TFTP_Options(),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=1),
        IP(dst="1.2.3.4") / UDP(dport=0x2807) / TFTP() / TFTP_ACK(block=2),
    ]
    def send(self, *args, **kargs):
        """Append the decoded Raw load of each TFTP_DATA packet sent."""
        global received_data
        if len(args):
            pkt = args[0]
            if TFTP_DATA in pkt:
                received_data += pkt[Raw].load.decode("utf-8")

tftp_rrq = TFTP_RRQ_server(ip="1.2.3.4", sport=0x2807, dir=served_dir, serve_one=True,
                           ll=MockRRQSocket,
                           recvsock=MockRRQSocket)

try:
    tftp_rrq.run()
finally:
    # Remove the temporary file even if the automaton raises.
    os.unlink(filename)

assert received_data == sent_data