| % Regression tests for the Ecu utility |
| |
| # More information at http://www.secdev.org/projects/UTscapy/ |
| |
| ############ |
| ############ |
| |
| + Setup |
| ~ conf command |
| |
| = Load modules |
| # Import the stdlib helpers used by later tests, load the isotp, UDS and GMLAN contribs plus the CAN layer |
| # into the test globals, and switch on the CAN "swap-bytes" contrib option. |
| |
| import copy |
| import itertools |
| |
| load_contrib("isotp", globals_dict=globals()) |
| load_contrib("automotive.uds", globals_dict=globals()) |
| load_contrib("automotive.gm.gmlan", globals_dict=globals()) |
| load_layer("can", globals_dict=globals()) |
| conf.contribs["CAN"]["swap-bytes"] = True |
| |
| = Load Ecu module |
| # Load the automotive.ecu contrib and star-import the UDS and GMLAN *_ecu_states / *_logging modules. |
| # NOTE(review): presumably these imports register the state modifiers and log formatters the Ecu tests rely on - confirm. |
| |
| load_contrib("automotive.ecu", globals_dict=globals()) |
| from scapy.contrib.automotive.uds_ecu_states import * |
| from scapy.contrib.automotive.uds_logging import * |
| from scapy.contrib.automotive.gm.gmlan_ecu_states import * |
| from scapy.contrib.automotive.gm.gmlan_logging import * |
| |
| + EcuState Basic checks |
| |
| = Check EcuState basic functionality |
| # Item assignment on an empty EcuState. The expected repr concatenates each key with its value and lists |
| # "securityAccess" before "session" although it was assigned second. |
| # NOTE(review): presumably repr orders the keys alphabetically - confirm against EcuState.__repr__. |
| |
| state = EcuState() |
| state["session"] = 2 |
| state["securityAccess"] = 4 |
| print(repr(state)) |
| assert repr(state) == "securityAccess4session2" |
| |
| = More complex tests |
| # A constructor keyword must be readable as an attribute and must be reassignable afterwards. |
| |
| state = EcuState(ses=4) |
| assert state.ses == 4 |
| state.ses = 5 |
| assert state.ses == 5 |
| |
| = Even more complex tests |
| # Mixes attribute and item access on the same key and checks that a constructor value survives later updates. |
| |
| state = EcuState(myinfo="42") |
| |
| state.ses = 5 |
| assert state.ses == 5 |
| |
| # A value written through item access must be visible through attribute access. |
| state["ses"] = None |
| assert state.ses is None |
| |
| state.ses = 5 |
| assert 5 == state.ses |
| |
| assert "42" == state.myinfo |
| assert repr(state) == "myinfo42ses5" |
| |
| = Delete Attribute Test |
| # After `del state.ses` reading the attribute must raise KeyError or AttributeError, while other attributes stay intact. |
| |
| state = EcuState(myinfo="42") |
| |
| state.ses = 5 |
| assert state.ses == 5 |
| |
| del state.ses |
| |
| # `assert False` is only reached if the deleted attribute is still readable; the resulting AssertionError |
| # is not in the except tuple, so it fails the test instead of being swallowed. |
| try: |
| x = state.ses |
| assert False |
| except (KeyError, AttributeError): |
| assert state.myinfo == "42" |
| |
| = Copy tests |
| # copy.copy must yield an independent state: changing `ses` on the copy leaves the original at 5, |
| # while the untouched `myinfo` value is carried over. |
| |
| state = EcuState(myinfo="42") |
| state.ses = 5 |
| |
| ns = copy.copy(state) |
| |
| ns.ses = 6 |
| |
| assert ns.ses == 6 |
| assert state.ses == 5 |
| assert ns.myinfo == "42" |
| |
| |
| = Move tests |
| # Counterpart of the copy test: plain assignment binds a second name to the same object, |
| # so the change made through `ns` is also seen through `state`. |
| |
| state = EcuState(myinfo="42") |
| state.ses = 5 |
| |
| ns = state |
| |
| ns.ses = 6 |
| |
| assert ns.ses == 6 |
| assert state.ses == 6 |
| assert ns.myinfo == "42" |
| |
| = equal tests |
| # Equality and hash must agree: a fresh copy compares and hashes equal, and both change together |
| # when a value differs or an extra attribute is added. |
| |
| state = EcuState(myinfo="42") |
| state.ses = 5 |
| |
| ns = copy.copy(state) |
| |
| assert state == ns |
| assert hash(state) == hash(ns) |
| |
| # Differing value for an existing attribute. |
| ns.ses = 6 |
| |
| assert state != ns |
| assert hash(state) != hash(ns) |
| |
| # Restoring the value restores equality. |
| ns.ses = 5 |
| |
| assert state == ns |
| assert hash(state) == hash(ns) |
| |
| # An attribute present on only one side also breaks equality. |
| ns.sa = 5 |
| |
| assert state != ns |
| assert hash(state) != hash(ns) |
| |
| |
| = hash tests |
| # Same sequence as the equality test, but checking only hash(): equal for a copy, different after a value |
| # change, equal again once restored, and different after adding an attribute. |
| |
| state = EcuState(myinfo="42") |
| state.ses = 5 |
| |
| ns = copy.copy(state) |
| |
| assert hash(state) == hash(ns) |
| |
| ns.ses = 6 |
| |
| assert hash(state) != hash(ns) |
| |
| ns.ses = 5 |
| |
| assert hash(state) == hash(ns) |
| |
| ns.sa = 5 |
| |
| assert hash(state) != hash(ns) |
| |
| = command tests |
| # command() must return a constructor expression listing every attribute as a keyword argument. |
| |
| state = EcuState(myinfo="42") |
| state.ses = 5 |
| |
| # The first call discards its result; only the second call is asserted. |
| state.command() |
| assert "EcuState(myinfo='42', ses=5)" == state.command() |
| |
| = less than tests |
| # Follows how the ordering of two states flips as attributes are added and changed step by step. |
| |
| s1 = EcuState() |
| s2 = EcuState() |
| |
| s1.a = 1 |
| s2.a = 2 |
| |
| assert s1 < s2 |
| |
| # s1 gains an attribute that s2 does not have yet. |
| s1.b = 4 |
| |
| assert s1 > s2 |
| |
| # Both now have `a` and `b`; s1.a is still smaller than s2.a. |
| s2.b = 1 |
| |
| assert s1 < s2 |
| |
| # With `a` equal on both sides, s1.b (4) versus s2.b (1) is the only difference left. |
| s1.a = 2 |
| |
| assert s1 > s2 |
| |
| = less than tests 2 |
| # Ordering two states whose shared key holds a str on one side and an int on the other must raise TypeError. |
| |
| s1 = EcuState() |
| s2 = EcuState() |
| |
| s1.c = "x" |
| s2.c = 4 |
| exception = False |
| |
| # If the comparison does not raise TypeError, the test fails either on this assert or on the final one. |
| try: |
| assert s1 < s2 |
| except TypeError: |
| exception = True |
| |
| assert exception |
| |
| = less than tests 3 |
| # Both states have the keys "A" and "a" with swapped values; the expected result follows the "A" values (1 < 2). |
| # NOTE(review): presumably keys are compared in sorted order, upper case first - confirm against EcuState.__lt__. |
| |
| s1 = EcuState() |
| s2 = EcuState() |
| |
| |
| s1.A = 1 |
| s1.a = 2 |
| |
| s2.A = 2 |
| s2.a = 1 |
| |
| assert s1 < s2 |
| |
| = less than tests 4 |
| # The states share only the key "A" (1 versus 2); their second keys differ ("a" versus "b"). s1 must sort first. |
| |
| s1 = EcuState() |
| s2 = EcuState() |
| |
| |
| s1.A = 1 |
| s1.a = 2 |
| |
| s2.A = 2 |
| s2.b = 100 |
| |
| assert s1 < s2 |
| |
| = less than tests 5 |
| # Same key layout as the previous test but with s1.A larger (100 versus 2), so s1 must sort after s2. |
| # Also checks that a state is neither greater nor less than itself. |
| |
| s1 = EcuState() |
| s2 = EcuState() |
| |
| |
| s1.A = 100 |
| s1.a = 2 |
| |
| s2.A = 2 |
| s2.b = 100 |
| |
| assert s1 > s2 |
| assert not s1 > s1 |
| assert not s1 < s1 |
| |
| = less than tests 6 |
| # The states share no key at all: s1 uses "A"/"B", s2 uses "a"/"b". s1 is expected to sort first |
| # even though its values are larger. |
| |
| s1 = EcuState() |
| s2 = EcuState() |
| |
| |
| s1.A = 100 |
| s1.B = 200 |
| |
| s2.a = 2 |
| s2.b = 1 |
| |
| assert s1 < s2 |
| |
| = contains test |
| # Exercises `in` between states. A state whose attributes hold collections (list, range, generator, nested |
| # lists) must contain a state holding one of the listed values; the reverse direction must not hold unless |
| # both states are equal. |
| |
| s1 = EcuState(ses=[1,2,3]) |
| s2 = EcuState(ses=1) |
| |
| assert s1 != s2 |
| assert s2 in s1 |
| assert s1 not in s2 |
| |
| # Value not present in the list. |
| s1 = EcuState(ses=[2,3]) |
| s2 = EcuState(ses=1) |
| |
| assert s1 != s2 |
| assert s2 not in s1 |
| assert s1 not in s2 |
| |
| |
| # s1 has an extra attribute that s2 lacks, so s2 is not contained. |
| s1 = EcuState(ses=[1,2,3], security=5) |
| s2 = EcuState(ses=1) |
| |
| assert s1 != s2 |
| assert s2 not in s1 |
| assert s1 not in s2 |
| |
| # With None among the allowed `security` values, a state without `security` is contained. |
| # NOTE(review): presumably None stands for "attribute absent" - confirm against EcuState. |
| s1 = EcuState(ses=range(4), security=[None, 5]) |
| s2 = EcuState(ses=1) |
| |
| assert s1 != s2 |
| assert s2 in s1 |
| assert s1 not in s2 |
| |
| # A range-valued state can itself be contained in a state with a wider range. |
| s1 = EcuState(ses=range(4), security=[None, 5]) |
| s2 = EcuState(ses=range(2)) |
| |
| assert s1 != s2 |
| assert s2 < s1 |
| assert s2 in s1 |
| assert s1 not in s2 |
| |
| s1 = EcuState(ses=range(4), security=[None, 5]) |
| s2 = EcuState(ses=range(2), security=5) |
| |
| assert s1 != s2 |
| assert s2 in s1 |
| assert s1 not in s2 |
| |
| # range(5) reaches beyond range(4), so neither state contains the other. |
| s1 = EcuState(ses=range(4), security=[None, 5]) |
| s2 = EcuState(ses=range(5)) |
| |
| assert s1 != s2 |
| assert s2 not in s1 |
| assert s1 not in s2 |
| |
| # Nested collections: a range inside a list. |
| s1 = EcuState(ses=range(4), security=[None, range(5)]) |
| s2 = EcuState(ses=3) |
| |
| print(s1._expand()) |
| print(s2._expand()) |
| |
| assert s1 != s2 |
| assert s2 in s1 |
| assert s1 not in s2 |
| |
| # Deeper nesting: the value 10 only occurs in the innermost list. |
| s1 = EcuState(ses=range(4), security=[None, range(5), [5, 7, range(4), [range(10), 10]]]) |
| s2 = EcuState(ses=3, security=10) |
| |
| print(s1._expand()) |
| print(s2._expand()) |
| |
| assert s1 != s2 |
| assert s2 in s1 |
| assert s1 not in s2 |
| |
| # Mixed int and str values in the nested collection. |
| s1 = EcuState(ses=range(4), security=[None, range(5), [5, 7, range(4), [range(10), "B"]]]) |
| s2 = EcuState(ses=3, security="B") |
| |
| print(s1._expand()) |
| print(s2._expand()) |
| |
| assert s1 != s2 |
| assert s2 in s1 |
| assert s1 not in s2 |
| |
| s1 = EcuState(ses=range(4), security=[None, range(5), [5, 7, range(4), [range(10), "B"]]]) |
| s2 = EcuState(ses=3, security="C") |
| |
| print(s1._expand()) |
| print(s2._expand()) |
| |
| assert s1 != s2 |
| assert s2 not in s1 |
| assert s1 not in s2 |
| |
| s1 = EcuState(ses=range(3), security=5) |
| s2 = EcuState(ses=1, security=5) |
| |
| assert s1 != s2 |
| assert s2 in s1 |
| assert s1 not in s2 |
| |
| # A generator expression is accepted as a collection of allowed values as well. |
| s1 = EcuState(ses=range(3), security=(x for x in range(1, 10, 2))) |
| s2 = EcuState(ses=1, security=5) |
| |
| assert s1 != s2 |
| assert s2 in s1 |
| assert s1 not in s2 |
| |
| # Equal states contain each other. |
| s1 = EcuState(ses=[1,2,3]) |
| s2 = EcuState(ses=[1,2,3]) |
| |
| assert s1 in s2 |
| assert s2 in s1 |
| assert s1 == s2 |
| |
| s1 = EcuState(ses=1) |
| s2 = EcuState(ses=1) |
| |
| assert s1 in s2 |
| assert s2 in s1 |
| assert s1 == s2 |
| |
| # Every (ses, security) combination drawn from the two ranges must be contained in s1. |
| s1 = EcuState(ses=range(3), security=range(5)) |
| for ses, sec in itertools.product(range(3), range(5)): |
| s2 = EcuState(ses=ses, security=sec) |
| assert s1 != s2 |
| assert s2 in s1 |
| assert s1 not in s2 |
| |
| |
| s1 = EcuState(ses=[0, 1, 2], security=[43, 44]) |
| for ses, sec in itertools.product(range(3), range(43, 45)): |
| s2 = EcuState(ses=ses, security=sec) |
| assert s1 != s2 |
| assert s2 in s1 |
| assert s1 not in s2 |
| |
| # With str values the reverse containment check is allowed to raise TypeError; the handler tolerates it. |
| s1 = EcuState(ses=[0, 1, 2], security=["a", "b"]) |
| for ses, sec in itertools.product(range(3), (x for x in "ab")): |
| s2 = EcuState(ses=ses, security=sec) |
| assert s1 != s2 |
| assert s2 in s1 |
| try: |
| assert s1 not in s2 |
| except TypeError: |
| assert True |
| |
| # Here s1 is a plain Python list of states: `s2 in s1` is list membership, |
| # while `s1 not in s2` hands the list to the state's containment check. |
| s1 = [EcuState(ses=1), EcuState(ses=2), EcuState(ses=3)] |
| s2 = EcuState(ses=3) |
| |
| assert s2 in s1 |
| assert s1 not in s2 |
| |
| |
| # String values are compared as whole values: "SEC" and "SOC" do not contain each other. |
| s1 = EcuState(ses=1, sa="SEC") |
| s2 = EcuState(ses=1, sa="SOC") |
| |
| assert s1 not in s2 |
| assert s2 not in s1 |
| assert s1 != s2 |
| |
| s1 = EcuState(ses=1, sa="SEC") |
| s2 = EcuState(ses=1, sa="SEC") |
| |
| assert s1 in s2 |
| assert s2 in s1 |
| assert s1 == s2 |
| |
| |
| s1 = EcuState(ses=1, sa="SEC") |
| s2 = EcuState(ses=1, sa=["SEC", "SOL"]) |
| |
| |
| assert s1 in s2 |
| assert s2 not in s1 |
| assert s1 != s2 |
| |
| |
| # Same check with a bytes value inside a list that mixes bytes and str. |
| s1 = EcuState(ses=1, sa=b"SEC") |
| s2 = EcuState(ses=1, sa=[b"SEC", "SOL"]) |
| |
| assert s1 in s2 |
| assert s2 not in s1 |
| assert s1 != s2 |
| |
| |
| + EcuState modification tests |
| |
| = Basic definitions for tests |
| # Defines two minimal packet classes, attaches a state modifier to myPack2 through |
| # EcuState.extend_pkt_with_modifier, and checks modifier detection and get_modified_ecu_state. |
| |
| # Packet without a modifier; used as filler in front of myPack2. |
| class myPack1(Packet): |
| fields_desc = [ |
| IntField("fakefield", 1) |
| ] |
| |
| # Packet whose `statefield` value is written into the Ecu state by the modifier below. |
| class myPack2(Packet): |
| fields_desc = [ |
| IntField("statefield", 1) |
| ] |
| |
| # Modifier attached to myPack2: copies the packet's `statefield` into `ecustate.state`; `req` is unused. |
| @EcuState.extend_pkt_with_modifier(myPack2) |
| def modify_ecu_state(self, req, ecustate): |
| # type: (Packet, Packet, EcuState) -> None |
| ecustate.state = self.statefield |
| |
| pkt = myPack1()/myPack2() |
| st = EcuState() |
| exception = False |
| |
| # A fresh EcuState has no `state` attribute yet, so reading it must raise AttributeError. |
| try: |
| assert st.state == 1 |
| except AttributeError: |
| exception = True |
| |
| assert exception == True |
| # A packet counts as a modifier packet when it contains a myPack2 layer; myPack1 alone does not. |
| assert EcuState.is_modifier_pkt(pkt) |
| assert not EcuState.is_modifier_pkt(myPack1()) |
| |
| # The result differs from the input state and carries the default statefield value of 1. |
| mod = EcuState.get_modified_ecu_state(pkt, Raw(), st) |
| assert mod != st |
| assert mod.state ==1 |
| |
| # The modifier layer is found behind several non-modifier layers. |
| pkt2 = myPack1()/myPack1()/myPack1()/myPack2(statefield=5) |
| mod2 = EcuState.get_modified_ecu_state(pkt2, Raw(), mod) |
| |
| assert mod != mod2 |
| assert mod < mod2 |
| |
| # Two stacked myPack2 layers (4, then 5): the result must sort below state == 5. |
| # NOTE(review): presumably only the first modifier layer (statefield=4) is applied - confirm. |
| pkt2 = myPack1()/myPack1()/myPack1()/myPack2(statefield=4)/myPack2(statefield=5) |
| mod2 = EcuState.get_modified_ecu_state(pkt2, Raw(), mod) |
| mod.state = 5 |
| assert mod != mod2 |
| assert mod > mod2 |
| |
| + EcuResponse tests |
| |
| = Basic checks |
| # An EcuResponse bound to session=1 must support exactly that state (neither the empty state nor session=2) |
| # and its UDS_DSCPR response must answer the matching UDS_DSC request. |
| |
| resp = EcuResponse(EcuState(session=1), UDS()/UDS_DSCPR(b"\x03")) |
| |
| assert not resp.supports_state(EcuState()) |
| assert not resp.supports_state(EcuState(session=2)) |
| assert resp.supports_state(EcuState(session=1)) |
| assert resp.answers(UDS()/UDS_DSC(b"\x03")) |
| |
| = Command checks |
| # Round trip: evaluating the string returned by command() must rebuild an equal EcuResponse. |
| |
| resp = EcuResponse(EcuState(session=1), UDS()/UDS_DSCPR(b"\x03")) |
| cmd = resp.command() |
| |
| print(cmd) |
| # eval() is applied to a string produced by the test itself, not to external input. |
| resp1 = eval(cmd) |
| assert resp1 == resp |
| |
| = Command checks 2 |
| # Same command()/eval round trip with several states and several responses. |
| |
| # Build the packets from raw bytes so they are dissected packets rather than freshly stacked layers. |
| p1 = UDS(bytes(UDS()/UDS_NR(b"\x10\x00"))) |
| p2 = UDS(bytes(UDS()/UDS_DSCPR(b"\x03"))) |
| |
| resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2]) |
| cmd = resp.command() |
| |
| print(cmd) |
| resp1 = eval(cmd) |
| # Compare states, response count and response bytes separately before the overall equality check. |
| assert any(resp1.supports_state(s) for s in resp.states) |
| assert any(resp.supports_state(s) for s in resp1.states) |
| assert len(resp.responses) == len(resp1.responses) |
| assert all(bytes(x) == bytes(y) for x, y in zip(resp.responses, resp1.responses)) |
| assert resp1 == resp |
| |
| = Compare check |
| # Equality between EcuResponse objects: the asserts expect equality when the responses match and the |
| # state lists share a state, and inequality when either the states or the responses differ. |
| |
| p1 = UDS(bytes(UDS()/UDS_NR(b"\x10\x00"))) |
| p2 = UDS(bytes(UDS()/UDS_DSCPR(b"\x03"))) |
| |
| resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2]) |
| |
| # Same responses, subset of the states. |
| resp1 = EcuResponse([EcuState(session=1)], [p1, p2]) |
| |
| # Same responses, state not present in `resp`. |
| resp2 = EcuResponse([EcuState(session=2)], [p1, p2]) |
| # Shared state, different response list. |
| resp3 = EcuResponse([EcuState(session=1)], [p2]) |
| |
| |
| assert resp == resp1 |
| assert resp != resp2 |
| assert resp != resp3 |
| |
| = Key response check |
| # With a negative response (p1) and a positive response (p2) stored, both the EcuResponse |
| # and its key_response must answer the DiagnosticSessionControl request. |
| |
| req = UDS()/UDS_DSC(b"\x03") |
| p1 = UDS(bytes(UDS()/UDS_NR(b"\x10\x00"))) |
| p2 = UDS(bytes(UDS()/UDS_DSCPR(b"\x03"))) |
| |
| resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2]) |
| |
| assert resp.answers(req) |
| assert resp.key_response.answers(req) |
| |
| |
| |
| + Ecu Simple operations |
| |
| = Log all commands applied to an Ecu |
| # Five DiagnosticSessionControl requests are fed to an Ecu with response storing disabled; each one must |
| # show up in ecu.log["DiagnosticSessionControl"] as a (timestamp, value) entry. |
| |
| msgs = [UDS(service=16) / UDS_DSC(diagnosticSessionType=3), |
| UDS(service=16) / UDS_DSC(diagnosticSessionType=4), |
| UDS(service=16) / UDS_DSC(diagnosticSessionType=5), |
| UDS(service=16) / UDS_DSC(diagnosticSessionType=6), |
| UDS(service=16) / UDS_DSC(diagnosticSessionType=2)] |
| |
| ecu = Ecu(verbose=False, store_supported_responses=False) |
| ecu.update(PacketList(msgs)) |
| assert len(ecu.log["DiagnosticSessionControl"]) == 5 |
| timestamp, value = ecu.log["DiagnosticSessionControl"][0] |
| assert timestamp > 0 |
| # The first entry belongs to session type 3, the last one to session type 2. |
| assert value == "extendedDiagnosticSession" |
| assert ecu.log["DiagnosticSessionControl"][-1][1] == "programmingSession" |
| |
| |
| = Trace all commands applied to an Ecu |
| |
| msgs = [UDS(service=16) / UDS_DSC(diagnosticSessionType=3), |
| UDS(service=80) / UDS_DSCPR(diagnosticSessionType=3, sessionParameterRecord=b'\\x002\\x01\\xf4')] |
| |
| ecu = Ecu(verbose=True, logging=False, store_supported_responses=False) |
| ecu.update(PacketList(msgs)) |
| assert ecu.state.session == 3 |
| |
| |
| = Generate supported responses of an Ecu |
| |
| msgs = [UDS(service=16) / UDS_DSC(diagnosticSessionType=3), |
| UDS(service=80) / UDS_DSCPR(diagnosticSessionType=3, sessionParameterRecord=b'\\x002\\x01\\xf4'), |
| UDS(service=16) / UDS_DSC(diagnosticSessionType=4)] |
| |
| ecu = Ecu(verbose=False, logging=False, store_supported_responses=True) |
| ecu.update(PacketList(msgs)) |
| supported_responses = ecu.supported_responses |
| unanswered_packets = ecu.unanswered_packets |
| assert ecu.state.session == 3 |
| assert len(supported_responses) == 1 |
| assert len(unanswered_packets) == 1 |
| |
| response = supported_responses[0] |
| print(response.command()) |
| assert response.supports_state(EcuState()) |
| assert response.key_response.service == 80 |
| assert unanswered_packets[0].diagnosticSessionType == 4 |
| |
| |
| + Ecu Advanced checks |
| |
| = Analyze multiple UDS messages |
| # Sniffs 50 messages from the offline capture ecu_trace.pcap.gz through an ISOTPSession with basecls=UDS, |
| # feeds them to a default Ecu in one update() call and spot-checks the stored responses and the log. |
| |
| udsmsgs = sniff(offline=scapy_path("test/pcaps/ecu_trace.pcap.gz"), |
| session=ISOTPSession(use_ext_address=False, basecls=UDS), |
| count=50, timeout=3) |
| |
| assert len(udsmsgs) == 50 |
| |
| ecu = Ecu() |
| ecu.update(udsmsgs) |
| # Response 0: session change to type 3, observed in the initial (empty) state. |
| response = ecu.supported_responses[0] |
| assert response.supports_state(EcuState()) |
| assert response.key_response.service == 80 |
| assert response.key_response.diagnosticSessionType == 3 |
| # Response 1: session change to type 2, observed while in session 3. |
| response = ecu.supported_responses[1] |
| assert response.supports_state(EcuState(session=3)) |
| assert response.key_response.service == 80 |
| assert response.key_response.diagnosticSessionType == 2 |
| # Response 4: service 110 for data identifier 61786, observed in session 2 with security_level 18. |
| response = ecu.supported_responses[4] |
| print(response) |
| state = EcuState(session=2, security_level=18) |
| print(state) |
| assert response.supports_state(state) |
| assert response.key_response.service == 110 |
| assert response.key_response.dataIdentifier == 61786 |
| assert len(ecu.log["TransferData"]) == 2 |
| |
| + EcuSession tests |
| |
| = Analyze on the fly with EcuSession |
| # Same capture and same expectations as the offline Ecu.update() test, but the Ecu is filled while sniffing: |
| # an EcuSession is passed as supersession of the ISOTPSession, and the result is read from session.ecu. |
| |
| session = EcuSession() |
| |
| with PcapReader(scapy_path("test/pcaps/ecu_trace.pcap.gz")) as sock: |
| udsmsgs = sniff(session=ISOTPSession(supersession=session, use_ext_address=False, basecls=UDS), count=50, opened_socket=sock, timeout=3) |
| |
| assert len(udsmsgs) == 50 |
| |
| ecu = session.ecu |
| # Response 0: session change to type 3, observed in the initial (empty) state. |
| response = ecu.supported_responses[0] |
| assert response.supports_state(EcuState()) |
| assert response.key_response.service == 80 |
| assert response.key_response.diagnosticSessionType == 3 |
| # Response 1: session change to type 2, observed while in session 3. |
| response = ecu.supported_responses[1] |
| assert response.supports_state(EcuState(session=3)) |
| assert response.key_response.service == 80 |
| assert response.key_response.diagnosticSessionType == 2 |
| # Response 4: service 110 for data identifier 61786, observed in session 2 with security_level 18. |
| response = ecu.supported_responses[4] |
| print(response) |
| state = EcuState(session=2, security_level=18) |
| print(state) |
| assert response.supports_state(state) |
| assert response.key_response.service == 110 |
| assert response.key_response.dataIdentifier == 61786 |
| assert len(ecu.log["TransferData"]) == 2 |
| |
| |
| = Analyze on the fly with EcuSession GMLAN1 |
| # Reads gmlan_trace.pcap.gz step by step from a single PcapReader (2, 6, 2 and 6 messages) while one |
| # EcuSession accumulates state, and checks the Ecu state and stored response count after every step. |
| |
| session = EcuSession() |
| |
| conf.contribs['CAN']['swap-bytes'] = True |
| |
| # Every sniff() call creates a new ISOTPSession but reuses the same EcuSession and the same open reader. |
| # Only the first call passes a timeout. |
| with PcapReader(scapy_path("test/pcaps/gmlan_trace.pcap.gz")) as sock: |
| gmlanmsgs = sniff(session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), count=2, opened_socket=sock, timeout=3) |
| ecu = session.ecu |
| print("Check 1 after change to diagnostic mode") |
| assert len(ecu.supported_responses) == 1 |
| assert ecu.state == EcuState(session=3) |
| gmlanmsgs = sniff(session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), count=6, opened_socket=sock) |
| ecu = session.ecu |
| print("Check 2 after some more messages were read1") |
| assert len(ecu.supported_responses) == 3 |
| print("Check 2 after some more messages were read2") |
| assert ecu.state.session == 3 |
| print("assert 1") |
| assert ecu.state.communication_control == 1 |
| gmlanmsgs = sniff(session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), count=2, opened_socket=sock) |
| ecu = session.ecu |
| print("Check 3 after change to programming mode (bootloader)") |
| assert len(ecu.supported_responses) == 4 |
| assert ecu.state.session == 2 |
| assert ecu.state.communication_control == 1 |
| gmlanmsgs = sniff(session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), count=6, opened_socket=sock) |
| ecu = session.ecu |
| print("Check 4 after gaining security access") |
| assert len(ecu.supported_responses) == 6 |
| assert ecu.state.session == 2 |
| assert ecu.state.security_level == 2 |
| assert ecu.state.communication_control == 1 |
| |
| = Analyze on the fly with EcuSession GMLAN logging test |
| # Runs the GMLAN capture through an EcuSession with response storing disabled and checks that the Ecu log |
| # holds one entry per sniffed message for the selected services. |
| |
| session = EcuSession(verbose=False, store_supported_responses=False) |
| |
| conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4 |
| conf.contribs['CAN']['swap-bytes'] = True |
| |
| # NOTE(review): conf.debug_dissector is switched on here and not restored within this test - confirm that is intended. |
| conf.debug_dissector = True |
| gmlanmsgs = sniff(offline=scapy_path("test/pcaps/gmlan_trace.pcap.gz"), |
| session=ISOTPSession(supersession=session, rx_id=[0x241, 0x641, 0x101], basecls=GMLAN), |
| count=200, timeout=6) |
| |
| ecu = session.ecu |
| # Nothing may be stored because store_supported_responses is False. |
| assert len(ecu.supported_responses) == 0 |
| |
| # For each service, the number of sniffed messages must match the number of log entries. |
| assert len([m for m in gmlanmsgs if m.sprintf("%GMLAN.service%") == "TransferData"]) == len(ecu.log["TransferData"]) |
| assert len([m for m in gmlanmsgs if m.sprintf("%GMLAN.service%") == "RequestDownload"]) == len(ecu.log["RequestDownload"]) |
| assert len([m for m in gmlanmsgs if m.sprintf("%GMLAN.service%") == "ReadDataByIdentifier"]) == len(ecu.log["ReadDataByIdentifier"]) |
| |
| assert len(ecu.log["SecurityAccess"]) == 2 |
| assert len(ecu.log["SecurityAccessPositiveResponse"]) == 2 |
| |
| assert ecu.log["TransferData"][-1][1][0] == "downloadAndExecuteOrExecute" |