# blob: 4ced520357bef261d5a57f0b9e15dcb64bf173ae [file] [log] [blame]
% Regression tests for the Ecu utility
# More information at http://www.secdev.org/projects/UTscapy/
############
############
+ Setup
~ conf command
= Load modules
# Standard-library helpers used by the copy and itertools.product based tests.
import copy
import itertools
# Load the contrib modules and the CAN layer into this campaign's globals.
load_contrib("isotp", globals_dict=globals())
load_contrib("automotive.uds", globals_dict=globals())
load_contrib("automotive.gm.gmlan", globals_dict=globals())
load_layer("can", globals_dict=globals())
# NOTE(review): presumably selects the CAN byte order matching the pcap traces
# read further down -- confirm against the CAN layer documentation.
conf.contribs["CAN"]["swap-bytes"] = True
= Load Ecu module
# Load the Ecu utility, then the UDS and GMLAN state/logging helper modules.
# NOTE(review): presumably these star-imports register the state modifiers and
# log hooks the Ecu tests rely on -- confirm in the imported modules.
load_contrib("automotive.ecu", globals_dict=globals())
from scapy.contrib.automotive.uds_ecu_states import *
from scapy.contrib.automotive.uds_logging import *
from scapy.contrib.automotive.gm.gmlan_ecu_states import *
from scapy.contrib.automotive.gm.gmlan_logging import *
+ EcuState Basic checks
= Check EcuState basic functionality
# Item assignment fills the state; the expected repr lists "securityAccess"
# before "session" although "session" was assigned first.
state = EcuState()
state["session"] = 2
state["securityAccess"] = 4
state_repr = repr(state)
print(state_repr)
assert state_repr == "securityAccess4session2"
= More complex tests
# A constructor keyword becomes an attribute, and plain attribute assignment
# overwrites it.
state = EcuState(ses=4)
assert state.ses == 4
state.ses = 5
assert state.ses == 5
= Even more complex tests
# Mix attribute and item access on the same key, including a None value.
state = EcuState(myinfo="42")
state.ses = 5
assert state.ses == 5
# Item assignment and attribute access must address the same entry.
state["ses"] = None
assert state.ses is None
state.ses = 5
assert 5 == state.ses
assert "42" == state.myinfo
# The expected repr lists myinfo before ses.
assert repr(state) == "myinfo42ses5"
= Delete Attribute Test
# Deleting one attribute must remove only that entry from the state.
state = EcuState(myinfo="42")
state.ses = 5
assert state.ses == 5
del state.ses
# Reading the deleted attribute has to raise. The try body holds only the
# access itself, so no other failure can be mistaken for the expected error.
try:
    x = state.ses
except (KeyError, AttributeError):
    pass
else:
    assert False, "state.ses is still readable after 'del state.ses'"

# The attribute that was not deleted must still be there.
assert state.myinfo == "42"
= Copy tests
# copy.copy() must give an independent state: changing the copy leaves the
# original untouched, while values set before the copy are carried over.
state = EcuState(myinfo="42")
state.ses = 5
ns = copy.copy(state)
ns.ses = 6
assert ns.ses == 6
assert state.ses == 5
assert ns.myinfo == "42"
= Move tests
# Plain assignment only binds a second name to the same object, so a change
# made through "ns" is visible through "state" as well.
state = EcuState(myinfo="42")
state.ses = 5
ns = state
ns.ses = 6
assert ns.ses == 6
assert state.ses == 6
assert ns.myinfo == "42"
= equal tests
# Equality and hash must follow the stored values.
state = EcuState(myinfo="42")
state.ses = 5
ns = copy.copy(state)
# A fresh copy is equal and hashes identically.
assert state == ns
assert hash(state) == hash(ns)
# A differing value breaks both equality and hash equality.
ns.ses = 6
assert state != ns
assert hash(state) != hash(ns)
# Restoring the value restores both.
ns.ses = 5
assert state == ns
assert hash(state) == hash(ns)
# An additional key on one side makes the states differ as well.
ns.sa = 5
assert state != ns
assert hash(state) != hash(ns)
= hash tests
# Same sequence as the equality test, but checking hash() only.
state = EcuState(myinfo="42")
state.ses = 5
ns = copy.copy(state)
assert hash(state) == hash(ns)
# Differing value -> differing hash.
ns.ses = 6
assert hash(state) != hash(ns)
# Value restored -> hash matches again.
ns.ses = 5
assert hash(state) == hash(ns)
# Extra key on the copy -> differing hash.
ns.sa = 5
assert hash(state) != hash(ns)
= command tests
# command() is expected to return a constructor expression for the state.
state = EcuState(myinfo="42")
state.ses = 5
# Bare call: no assertion uses this return value.
state.command()
assert "EcuState(myinfo='42', ses=5)" == state.command()
= less than tests
# Ordering checks; each assertion compares the states as built up so far.
s1 = EcuState()
s2 = EcuState()
s1.a = 1
s2.a = 2
# s1: a=1 / s2: a=2
assert s1 < s2
s1.b = 4
# s1: a=1, b=4 / s2: a=2 (s1 now has a key that s2 lacks)
assert s1 > s2
s2.b = 1
# s1: a=1, b=4 / s2: a=2, b=1
assert s1 < s2
s1.a = 2
# s1: a=2, b=4 / s2: a=2, b=1
assert s1 > s2
= less than tests 2
# The same key holds a str on one side and an int on the other: the ordering
# comparison itself is expected to raise TypeError.
s1 = EcuState()
s2 = EcuState()
s1.c = "x"
s2.c = 4
exception = False
try:
    assert s1 < s2
except TypeError:
    exception = True

# Fails when the comparison went through without raising.
assert exception
= less than tests 3
# Same keys on both sides, both values differ in opposite directions.
# s1: A=1, a=2 / s2: A=2, a=1
# NOTE(review): the expected result suggests "A" is compared before "a" --
# confirm against the EcuState ordering implementation.
s1 = EcuState()
s2 = EcuState()
s1.A = 1
s1.a = 2
s2.A = 2
s2.a = 1
assert s1 < s2
= less than tests 4
# Only key "A" is shared; the second key differs by name ("a" vs "b").
# s1: A=1, a=2 / s2: A=2, b=100
s1 = EcuState()
s2 = EcuState()
s1.A = 1
s1.a = 2
s2.A = 2
s2.b = 100
assert s1 < s2
= less than tests 5
# Same key layout as the previous test, but s1 now has the larger "A" value.
# s1: A=100, a=2 / s2: A=2, b=100
s1 = EcuState()
s2 = EcuState()
s1.A = 100
s1.a = 2
s2.A = 2
s2.b = 100
assert s1 > s2
# A state is neither greater nor less than itself.
assert not s1 > s1
assert not s1 < s1
= less than tests 6
# No key is shared between the two states.
# s1: A=100, B=200 / s2: a=2, b=1
# NOTE(review): s1 is expected to be smaller despite its larger values, so the
# key names presumably take part in the ordering -- confirm.
s1 = EcuState()
s2 = EcuState()
s1.A = 100
s1.B = 200
s2.a = 2
s2.b = 1
assert s1 < s2
= contains test
# Containment ("in") between a multi-valued state s1 and a state s2.
# A single value that appears in the list is contained, never the reverse.
s1 = EcuState(ses=[1,2,3])
s2 = EcuState(ses=1)
assert s1 != s2
assert s2 in s1
assert s1 not in s2
# The value 1 is missing from the list -> not contained.
s1 = EcuState(ses=[2,3])
s2 = EcuState(ses=1)
assert s1 != s2
assert s2 not in s1
assert s1 not in s2
# s1 sets a key (security) that s2 does not have -> not contained.
s1 = EcuState(ses=[1,2,3], security=5)
s2 = EcuState(ses=1)
assert s1 != s2
assert s2 not in s1
assert s1 not in s2
# With None among s1's security values, a state without that key is contained.
s1 = EcuState(ses=range(4), security=[None, 5])
s2 = EcuState(ses=1)
assert s1 != s2
assert s2 in s1
assert s1 not in s2
# A multi-valued s2 covering a subset of s1's ses values.
s1 = EcuState(ses=range(4), security=[None, 5])
s2 = EcuState(ses=range(2))
assert s1 != s2
assert s2 < s1
assert s2 in s1
assert s1 not in s2
s1 = EcuState(ses=range(4), security=[None, 5])
s2 = EcuState(ses=range(2), security=5)
assert s1 != s2
assert s2 in s1
assert s1 not in s2
# range(5) includes 4, which s1 (range(4)) does not offer -> not contained.
s1 = EcuState(ses=range(4), security=[None, 5])
s2 = EcuState(ses=range(5))
assert s1 != s2
assert s2 not in s1
assert s1 not in s2
# Value lists may nest ranges and lists; _expand() output is printed for
# inspection only.
s1 = EcuState(ses=range(4), security=[None, range(5)])
s2 = EcuState(ses=3)
print(s1._expand())
print(s2._expand())
assert s1 != s2
assert s2 in s1
assert s1 not in s2
# The value 10 sits in the innermost nested list.
s1 = EcuState(ses=range(4), security=[None, range(5), [5, 7, range(4), [range(10), 10]]])
s2 = EcuState(ses=3, security=10)
print(s1._expand())
print(s2._expand())
assert s1 != s2
assert s2 in s1
assert s1 not in s2
# Mixed int/str values: "B" is present in the nested list ...
s1 = EcuState(ses=range(4), security=[None, range(5), [5, 7, range(4), [range(10), "B"]]])
s2 = EcuState(ses=3, security="B")
print(s1._expand())
print(s2._expand())
assert s1 != s2
assert s2 in s1
assert s1 not in s2
# ... while "C" is not.
s1 = EcuState(ses=range(4), security=[None, range(5), [5, 7, range(4), [range(10), "B"]]])
s2 = EcuState(ses=3, security="C")
print(s1._expand())
print(s2._expand())
assert s1 != s2
assert s2 not in s1
assert s1 not in s2
# Both keys set on both sides; only ses is multi-valued.
s1 = EcuState(ses=range(3), security=5)
s2 = EcuState(ses=1, security=5)
assert s1 != s2
assert s2 in s1
assert s1 not in s2
# A generator is accepted as value source (it yields 1, 3, 5, 7, 9).
s1 = EcuState(ses=range(3), security=(x for x in range(1, 10, 2)))
s2 = EcuState(ses=1, security=5)
assert s1 != s2
assert s2 in s1
assert s1 not in s2
# Identical states contain each other and are equal.
s1 = EcuState(ses=[1,2,3])
s2 = EcuState(ses=[1,2,3])
assert s1 in s2
assert s2 in s1
assert s1 == s2
s1 = EcuState(ses=1)
s2 = EcuState(ses=1)
assert s1 in s2
assert s2 in s1
assert s1 == s2
# Every single-valued combination drawn from s1's value sets must be contained
# in s1, and s1 must never be contained in such a combination.
s1 = EcuState(ses=range(3), security=range(5))
for ses, sec in itertools.product(range(3), range(5)):
    s2 = EcuState(ses=ses, security=sec)
    assert s1 != s2
    assert s2 in s1
    assert s1 not in s2

# Same check with explicit lists instead of ranges.
s1 = EcuState(ses=[0, 1, 2], security=[43, 44])
for ses, sec in itertools.product(range(3), range(43, 45)):
    s2 = EcuState(ses=ses, security=sec)
    assert s1 != s2
    assert s2 in s1
    assert s1 not in s2

# Same check with string values for "security".
s1 = EcuState(ses=[0, 1, 2], security=["a", "b"])
for ses, sec in itertools.product(range(3), "ab"):
    s2 = EcuState(ses=ses, security=sec)
    assert s1 != s2
    assert s2 in s1
    # A TypeError from the reverse containment check is tolerated here.
    # NOTE(review): presumably raised when the list in s1 is compared with
    # the plain string held by s2 -- confirm in EcuState.__contains__.
    try:
        assert s1 not in s2
    except TypeError:
        pass

# Here s1 is a plain Python list of states: "in" is ordinary list membership.
s1 = [EcuState(ses=1), EcuState(ses=2), EcuState(ses=3)]
s2 = EcuState(ses=3)
assert s2 in s1
assert s1 not in s2
# String values are compared as whole values: "SEC" and "SOC" do not match.
s1 = EcuState(ses=1, sa="SEC")
s2 = EcuState(ses=1, sa="SOC")
assert s1 not in s2
assert s2 not in s1
assert s1 != s2
# Identical string values: mutual containment and equality.
s1 = EcuState(ses=1, sa="SEC")
s2 = EcuState(ses=1, sa="SEC")
assert s1 in s2
assert s2 in s1
assert s1 == s2
# A single string is contained in a list of strings, not the other way round.
s1 = EcuState(ses=1, sa="SEC")
s2 = EcuState(ses=1, sa=["SEC", "SOL"])
assert s1 in s2
assert s2 not in s1
assert s1 != s2
# Same with a bytes value inside a list that mixes bytes and str.
s1 = EcuState(ses=1, sa=b"SEC")
s2 = EcuState(ses=1, sa=[b"SEC", "SOL"])
assert s1 in s2
assert s2 not in s1
assert s1 != s2
+ EcuState modification tests
= Basic definitions for tests
# Two minimal packet classes; only myPack2 gets a state modifier attached.
class myPack1(Packet):
    fields_desc = [
        IntField("fakefield", 1)
    ]

class myPack2(Packet):
    fields_desc = [
        IntField("statefield", 1)
    ]

@EcuState.extend_pkt_with_modifier(myPack2)
def modify_ecu_state(self, req, ecustate):
    # type: (Packet, Packet, EcuState) -> None
    """Copy this packet's ``statefield`` value into ``ecustate.state``."""
    ecustate.state = self.statefield

pkt = myPack1()/myPack2()
st = EcuState()
# A fresh state has no "state" attribute yet, so reading it must raise.
exception = False
try:
    assert st.state == 1
except AttributeError:
    exception = True

assert exception
# A packet counts as modifier packet as soon as it carries a myPack2 layer.
assert EcuState.is_modifier_pkt(pkt)
assert not EcuState.is_modifier_pkt(myPack1())
# Applying the packet yields a state that differs from the input state and
# carries the default statefield value.
mod = EcuState.get_modified_ecu_state(pkt, Raw(), st)
assert mod != st
assert mod.state == 1
# The modifier layer is also found behind several non-modifier layers.
pkt2 = myPack1()/myPack1()/myPack1()/myPack2(statefield=5)
mod2 = EcuState.get_modified_ecu_state(pkt2, Raw(), mod)
assert mod != mod2
assert mod < mod2
# Two modifier layers in one packet: the result must order below state=5.
# NOTE(review): presumably only the first myPack2 layer (statefield=4) is
# applied -- confirm in EcuState.get_modified_ecu_state.
pkt2 = myPack1()/myPack1()/myPack1()/myPack2(statefield=4)/myPack2(statefield=5)
mod2 = EcuState.get_modified_ecu_state(pkt2, Raw(), mod)
mod.state = 5
assert mod != mod2
assert mod > mod2
+ EcuResponse tests
= Basic checks
# A response registered for session 1 must support exactly that state ...
resp = EcuResponse(EcuState(session=1), UDS()/UDS_DSCPR(b"\x03"))
assert not resp.supports_state(EcuState())
assert not resp.supports_state(EcuState(session=2))
assert resp.supports_state(EcuState(session=1))
# ... and must answer the DiagnosticSessionControl request built from the
# same payload byte.
assert resp.answers(UDS()/UDS_DSC(b"\x03"))
= Command checks
# command() must produce an expression that evaluates back to an equal
# EcuResponse. eval() only ever sees the text generated two lines above.
resp = EcuResponse(EcuState(session=1), UDS()/UDS_DSCPR(b"\x03"))
cmd = resp.command()
print(cmd)
resp1 = eval(cmd)
assert resp1 == resp
= Command checks 2
# Round-trip through command()/eval() with several states and responses.
# Both packets are rebuilt from their bytes before use.
p1 = UDS(bytes(UDS()/UDS_NR(b"\x10\x00")))
p2 = UDS(bytes(UDS()/UDS_DSCPR(b"\x03")))
resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2])
cmd = resp.command()
print(cmd)
resp1 = eval(cmd)
# The rebuilt response and the original must support each other's states ...
assert any(resp1.supports_state(s) for s in resp.states)
assert any(resp.supports_state(s) for s in resp1.states)
# ... and carry byte-identical response packets in the same order.
assert len(resp.responses) == len(resp1.responses)
assert all(bytes(x) == bytes(y) for x, y in zip(resp.responses, resp1.responses))
assert resp1 == resp
= Compare check
# Equality between EcuResponse objects.
p1 = UDS(bytes(UDS()/UDS_NR(b"\x10\x00")))
p2 = UDS(bytes(UDS()/UDS_DSCPR(b"\x03")))
resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2])
resp1 = EcuResponse([EcuState(session=1)], [p1, p2])
resp2 = EcuResponse([EcuState(session=2)], [p1, p2])
resp3 = EcuResponse([EcuState(session=1)], [p2])
# Same responses, state lists overlap in session 1 -> expected equal.
# NOTE(review): equality evidently tolerates a partial overlap of the state
# lists -- confirm in EcuResponse.__eq__.
assert resp == resp1
# Same responses, no common state -> not equal.
assert resp != resp2
# Common state, different response list -> not equal.
assert resp != resp3
= Key response check
# With several responses stored, both the EcuResponse as a whole and its
# key_response must answer the request.
req = UDS()/UDS_DSC(b"\x03")
p1 = UDS(bytes(UDS()/UDS_NR(b"\x10\x00")))
p2 = UDS(bytes(UDS()/UDS_DSCPR(b"\x03")))
resp = EcuResponse([EcuState(session=1), EcuState(session=3)], [p1, p2])
assert resp.answers(req)
assert resp.key_response.answers(req)
+ Ecu Simple operations
= Log all commands applied to an Ecu
# Five DiagnosticSessionControl requests, one per session type; the log must
# hold one entry per request, in the order the requests were applied.
session_types = [3, 4, 5, 6, 2]
msgs = [UDS(service=16) / UDS_DSC(diagnosticSessionType=t) for t in session_types]
ecu = Ecu(verbose=False, store_supported_responses=False)
ecu.update(PacketList(msgs))
dsc_log = ecu.log["DiagnosticSessionControl"]
assert len(dsc_log) == 5
# Each log entry unpacks into a timestamp and the logged value.
timestamp, value = dsc_log[0]
assert timestamp > 0
assert value == "extendedDiagnosticSession"
assert dsc_log[-1][1] == "programmingSession"
= Trace all commands applied to an Ecu
# A DiagnosticSessionControl request followed by its positive response; after
# the update the Ecu state must hold the confirmed session.
# The session parameter record is written as its four raw bytes. The earlier
# literal had doubled backslashes, i.e. literal "\x.." text instead of bytes.
# NOTE(review): 00 32 / 01 f4 are presumably the P2 / P2* server timings of
# the UDS positive response -- confirm against ISO 14229-1.
msgs = [UDS(service=16) / UDS_DSC(diagnosticSessionType=3),
        UDS(service=80) / UDS_DSCPR(diagnosticSessionType=3,
                                    sessionParameterRecord=b'\x00\x32\x01\xf4')]
ecu = Ecu(verbose=True, logging=False, store_supported_responses=False)
ecu.update(PacketList(msgs))
assert ecu.state.session == 3
= Generate supported responses of an Ecu
# One answered request (DSC type 3 with its positive response) and one request
# that never gets a response (DSC type 4).
# The session parameter record is written as its four raw bytes. The earlier
# literal had doubled backslashes, i.e. literal "\x.." text instead of bytes.
msgs = [UDS(service=16) / UDS_DSC(diagnosticSessionType=3),
        UDS(service=80) / UDS_DSCPR(diagnosticSessionType=3,
                                    sessionParameterRecord=b'\x00\x32\x01\xf4'),
        UDS(service=16) / UDS_DSC(diagnosticSessionType=4)]
ecu = Ecu(verbose=False, logging=False, store_supported_responses=True)
ecu.update(PacketList(msgs))
supported_responses = ecu.supported_responses
unanswered_packets = ecu.unanswered_packets
assert ecu.state.session == 3
assert len(supported_responses) == 1
assert len(unanswered_packets) == 1
response = supported_responses[0]
print(response.command())
# The response was recorded while the Ecu was still in its initial state.
assert response.supports_state(EcuState())
assert response.key_response.service == 80
assert unanswered_packets[0].diagnosticSessionType == 4
+ Ecu Advanced checks
= Analyze multiple UDS messages
# Reassemble the first 50 UDS messages of the recorded trace and feed them to
# an Ecu in one go.
trace_file = scapy_path("test/pcaps/ecu_trace.pcap.gz")
isotp_session = ISOTPSession(use_ext_address=False, basecls=UDS)
udsmsgs = sniff(offline=trace_file, session=isotp_session, count=50, timeout=3)
assert len(udsmsgs) == 50
ecu = Ecu()
ecu.update(udsmsgs)
supported = ecu.supported_responses
# Entry 0: recorded in the initial state, confirms session type 3.
response = supported[0]
assert response.supports_state(EcuState())
assert response.key_response.service == 80
assert response.key_response.diagnosticSessionType == 3
# Entry 1: recorded in session 3, confirms session type 2.
response = supported[1]
assert response.supports_state(EcuState(session=3))
assert response.key_response.service == 80
assert response.key_response.diagnosticSessionType == 2
# Entry 4: recorded in session 2 with security level 18.
response = supported[4]
print(response)
state = EcuState(session=2, security_level=18)
print(state)
assert response.supports_state(state)
assert response.key_response.service == 110
assert response.key_response.dataIdentifier == 61786
assert len(ecu.log["TransferData"]) == 2
+ EcuSession tests
= Analyze on the fly with EcuSession
# Same trace as the offline analysis, but the Ecu is updated on the fly by an
# EcuSession chained behind the ISOTP reassembly.
session = EcuSession()
# Only the sniff call needs the reader; the with block closes it afterwards.
with PcapReader(scapy_path("test/pcaps/ecu_trace.pcap.gz")) as sock:
    udsmsgs = sniff(session=ISOTPSession(supersession=session,
                                         use_ext_address=False,
                                         basecls=UDS),
                    count=50, opened_socket=sock, timeout=3)

assert len(udsmsgs) == 50
ecu = session.ecu
# Entry 0: recorded in the initial state, confirms session type 3.
response = ecu.supported_responses[0]
assert response.supports_state(EcuState())
assert response.key_response.service == 80
assert response.key_response.diagnosticSessionType == 3
# Entry 1: recorded in session 3, confirms session type 2.
response = ecu.supported_responses[1]
assert response.supports_state(EcuState(session=3))
assert response.key_response.service == 80
assert response.key_response.diagnosticSessionType == 2
# Entry 4: recorded in session 2 with security level 18.
response = ecu.supported_responses[4]
print(response)
state = EcuState(session=2, security_level=18)
print(state)
assert response.supports_state(state)
assert response.key_response.service == 110
assert response.key_response.dataIdentifier == 61786
assert len(ecu.log["TransferData"]) == 2
= Analyze on the fly with EcuSession GMLAN1
# Read the GMLAN trace in four steps through one shared EcuSession and check
# the Ecu state after each step.
session = EcuSession()
conf.contribs['CAN']['swap-bytes'] = True
# Every sniff call below reads from "sock", so all steps have to stay inside
# the with block that keeps the reader open.
with PcapReader(scapy_path("test/pcaps/gmlan_trace.pcap.gz")) as sock:
    # Step 1: two messages.
    gmlanmsgs = sniff(session=ISOTPSession(supersession=session,
                                           rx_id=[0x241, 0x641, 0x101],
                                           basecls=GMLAN),
                      count=2, opened_socket=sock, timeout=3)
    ecu = session.ecu
    print("Check 1 after change to diagnostic mode")
    assert len(ecu.supported_responses) == 1
    assert ecu.state == EcuState(session=3)
    # Step 2: six more messages.
    gmlanmsgs = sniff(session=ISOTPSession(supersession=session,
                                           rx_id=[0x241, 0x641, 0x101],
                                           basecls=GMLAN),
                      count=6, opened_socket=sock)
    ecu = session.ecu
    print("Check 2 after some more messages were read1")
    assert len(ecu.supported_responses) == 3
    print("Check 2 after some more messages were read2")
    assert ecu.state.session == 3
    print("assert 1")
    assert ecu.state.communication_control == 1
    # Step 3: two more messages.
    gmlanmsgs = sniff(session=ISOTPSession(supersession=session,
                                           rx_id=[0x241, 0x641, 0x101],
                                           basecls=GMLAN),
                      count=2, opened_socket=sock)
    ecu = session.ecu
    print("Check 3 after change to programming mode (bootloader)")
    assert len(ecu.supported_responses) == 4
    assert ecu.state.session == 2
    assert ecu.state.communication_control == 1
    # Step 4: six more messages.
    gmlanmsgs = sniff(session=ISOTPSession(supersession=session,
                                           rx_id=[0x241, 0x641, 0x101],
                                           basecls=GMLAN),
                      count=6, opened_socket=sock)
    ecu = session.ecu
    print("Check 4 after gaining security access")
    assert len(ecu.supported_responses) == 6
    assert ecu.state.session == 2
    assert ecu.state.security_level == 2
    assert ecu.state.communication_control == 1
= Analyze on the fly with EcuSession GMLAN logging test
# Logging-only run over the GMLAN trace: no supported responses are stored,
# but the per-service log must match what was sniffed.
session = EcuSession(verbose=False, store_supported_responses=False)
conf.contribs['GMLAN']['GMLAN_ECU_AddressingScheme'] = 4
conf.contribs['CAN']['swap-bytes'] = True
conf.debug_dissector = True
gmlanmsgs = sniff(offline=scapy_path("test/pcaps/gmlan_trace.pcap.gz"),
                  session=ISOTPSession(supersession=session,
                                       rx_id=[0x241, 0x641, 0x101],
                                       basecls=GMLAN),
                  count=200, timeout=6)
ecu = session.ecu
assert len(ecu.supported_responses) == 0
# For each of these services the number of sniffed messages must equal the
# number of log entries.
for service_name in ("TransferData", "RequestDownload", "ReadDataByIdentifier"):
    sniffed = [m for m in gmlanmsgs if m.sprintf("%GMLAN.service%") == service_name]
    assert len(sniffed) == len(ecu.log[service_name])

assert len(ecu.log["SecurityAccess"]) == 2
assert len(ecu.log["SecurityAccessPositiveResponse"]) == 2
assert ecu.log["TransferData"][-1][1][0] == "downloadAndExecuteOrExecute"