blob: 1fe7ab31d26fbd123489e34b7f0d368559e5f070 [file] [log] [blame]
% Regression tests for obdscanner
~ vcan_socket needs_root linux not_pypy automotive_comm scanner
+ Configuration
~ conf
= Imports
# Execute the CAN interface mockup script in this session's namespace, then
# load the automotive ECU contrib layer into the same globals.
# NOTE(review): helpers used by later tests (drain_bus, new_can_socket0,
# can_socket_string_list, cleanup_interfaces, ...) are not defined in this
# file and presumably come from the mockup script - confirm.
mockup_path = scapy_path("test/contrib/automotive/interface_mockup.py")
with open(mockup_path) as mockup_file:
    mockup_source = mockup_file.read()

exec(mockup_source)
load_contrib("automotive.ecu", globals_dict=globals())
+ Usage tests
= Test wrong usage
# Run the scanner without any arguments: it must exit with a non-zero status
# and print a usage message on stderr.
print(sys.executable)
result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# Drain both pipes before looking at the exit status. Calling wait() first
# while stdout/stderr are PIPEs can deadlock once the child fills a pipe
# buffer; communicate() reads the output and waits for termination.
std_out, std_err = result.communicate()
returncode = result.returncode
assert returncode != 0
expected_output = plain_str(b'usage:')
assert expected_output in plain_str(std_err)
= Test show help
# Run the scanner with --help: it must exit with status 0 and print its
# description on stdout.
result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py", "--help"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# communicate() goes first: wait() on a child whose stdout/stderr are PIPEs
# can block forever if the help text fills the pipe buffer.
std_out, std_err = result.communicate()
assert result.returncode == 0
expected_output = plain_str(b'Scan for all possible obd service classes and their subfunctions.')
assert expected_output in plain_str(std_out)
+ Scan tests
= Load contribution layer
from scapy.contrib.automotive.obd.obd import *
+ Simulate scanner
= Test DTC scan
# Simulate an ECU that answers a service 03 (DTC) request and run the scanner
# tool against it twice; at least one run must report the answered request.
drain_bus(iface0)
s3 = OBD()/OBD_S03_PR(dtcs=[OBD_DTC()])
example_responses = [EcuResponse(responses=s3)]
scanner_cmd = [sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"]
# Start from empty captures: the except clause below only prints the error,
# so without this the final assertion could hit unbound names.
std_out1 = std_err1 = std_out2 = std_err2 = b""

with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \
        new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester:
    conf.verb = -1
    answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False)
    # The simulator thread stops when it sees the magic frame sent in finally.
    sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 15, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"})
    sim.start()
    try:
        result = subprocess.Popen(scanner_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        std_out1, std_err1 = result.communicate()
        result = subprocess.Popen(scanner_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        std_out2, std_err2 = result.communicate()
    except Exception as e:
        print(e)
    finally:
        # Always release the simulator, even if a scanner run failed.
        tester.send(b"\x01\xff\xff\xff\xff")
        sim.join(timeout=10)

expected_output = b"1 requests were sent, 1 answered"
assert bytes_encode(expected_output) in bytes_encode(std_out1) or bytes_encode(expected_output) in bytes_encode(std_out2)
= Test supported PIDs scan
# Simulate an ECU that advertises PID03, PID0B and PID0F for service 01 and
# run the scanner twice; every expected value must show up in one of the runs.
drain_bus(iface0)
s1_pid00 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID00(supported_pids="PID03+PID0B+PID0F")])
s6_mid00 = OBD()/OBD_S06_PR(data_records=[OBD_S06_PR_Record()/OBD_MID00(supported_mids="")])
s8_tid00 = OBD()/OBD_S08_PR(data_records=[OBD_S08_PR_Record()/OBD_TID00(supported_tids="")])
s9_iid00 = OBD()/OBD_S09_PR(data_records=[OBD_S09_PR_Record()/OBD_IID00(supported_iids="")])
s1_pid03 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID03(fuel_system1=0, fuel_system2=2)])
s1_pid0B = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0B(data=100)])
s1_pid0F = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0F(data=50)])
# Create answers for 'supported PIDs scan' (s3 is reused from the DTC scan test)
example_responses = \
    [EcuResponse(responses=s3),
     EcuResponse(responses=s1_pid00),
     EcuResponse(responses=s6_mid00),
     EcuResponse(responses=s8_tid00),
     EcuResponse(responses=s9_iid00),
     EcuResponse(responses=s1_pid03),
     EcuResponse(responses=s1_pid0B),
     EcuResponse(responses=s1_pid0F)]
scanner_cmd = [sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"]
# Reset the captures so output left over from an earlier test can never
# satisfy the assertions below when a scanner run fails.
std_out1 = std_err1 = std_out2 = std_err2 = b""

with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \
        new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester:
    answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False)
    # The simulator thread stops when it sees the magic frame sent in finally.
    sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"})
    sim.start()
    try:
        result = subprocess.Popen(scanner_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        std_out1, std_err1 = result.communicate()
        # Show the output of the run that just finished (first run).
        print(std_out1, std_err1)
        result = subprocess.Popen(scanner_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        std_out2, std_err2 = result.communicate()
        print(std_out2, std_err2)
    except Exception as e:
        print(e)
    finally:
        # Always release the simulator, even if a scanner run failed.
        tester.send(b"\x01\xff\xff\xff\xff")
        sim.join(timeout=10)

expected_output = ["supported_pids=PID0F+PID0B+PID03", "fuel_system1=OpenLoopInsufficientEngineTemperature fuel_system2=ClosedLoop", "data=100 kPa", "data=50.0 deg. C"]
for out in expected_output:
    assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2)
= Test only Service 01 PIDs scan
# Same simulated ECU as the supported PIDs scan, but the scanner is started
# with the additional "-1" option; the service 01 values must still be found.
drain_bus(iface0)
s1_pid00 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID00(supported_pids="PID03+PID0B+PID0F")])
s6_mid00 = OBD()/OBD_S06_PR(data_records=[OBD_S06_PR_Record()/OBD_MID00(supported_mids="")])
s8_tid00 = OBD()/OBD_S08_PR(data_records=[OBD_S08_PR_Record()/OBD_TID00(supported_tids="")])
s9_iid00 = OBD()/OBD_S09_PR(data_records=[OBD_S09_PR_Record()/OBD_IID00(supported_iids="")])
s1_pid03 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID03(fuel_system1=0, fuel_system2=2)])
s1_pid0B = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0B(data=100)])
s1_pid0F = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0F(data=50)])
# Create answers for 'only Service 01 PIDs scan' (s3 is reused from the DTC scan test)
example_responses = \
    [EcuResponse(responses=s3),
     EcuResponse(responses=s1_pid00),
     EcuResponse(responses=s6_mid00),
     EcuResponse(responses=s8_tid00),
     EcuResponse(responses=s9_iid00),
     EcuResponse(responses=s1_pid03),
     EcuResponse(responses=s1_pid0B),
     EcuResponse(responses=s1_pid0F)]
scanner_cmd = [sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-1"]
# Reset the captures: the expected strings below are identical to those of
# the previous test, so stale output would make a failed run pass silently.
std_out1 = std_err1 = std_out2 = std_err2 = b""

with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \
        new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester:
    answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False)
    # The simulator thread stops when it sees the magic frame sent in finally.
    sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"})
    sim.start()
    try:
        result = subprocess.Popen(scanner_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        std_out1, std_err1 = result.communicate()
        print(std_out1, std_err1)
        result = subprocess.Popen(scanner_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        std_out2, std_err2 = result.communicate()
        print(std_out2, std_err2)
    except Exception as e:
        print(e)
    finally:
        # Always release the simulator, even if a scanner run failed.
        tester.send(b"\x01\xff\xff\xff\xff")
        sim.join(timeout=10)

expected_output = ["supported_pids=PID0F+PID0B+PID03", "fuel_system1=OpenLoopInsufficientEngineTemperature fuel_system2=ClosedLoop", "data=100 kPa", "data=50.0 deg. C"]
for out in expected_output:
    assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2)
= Test full scan
# Run the scanner with "-f -1 -3" against the simulated ECU and check the
# request counters it reports.
# NOTE: this test extends the example_responses list built by the preceding
# test instead of creating its own, so it depends on that test having run.
drain_bus(iface0)
# Add unsupported PID
s1_pid01 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID01()])
example_responses.append(EcuResponse(responses=s1_pid01))
scanner_cmd = [sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-f", "-1", "-3"]
# Reset the captures so output left over from an earlier test can never
# satisfy the assertions below when a scanner run fails.
std_out1 = std_err1 = std_out2 = std_err2 = b""

with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \
        new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester:
    answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False)
    # The simulator thread stops when it sees the magic frame sent in finally.
    sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"})
    sim.start()
    try:
        result = subprocess.Popen(scanner_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        std_out1, std_err1 = result.communicate()
        result = subprocess.Popen(scanner_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        std_out2, std_err2 = result.communicate()
    except Exception as e:
        print(e)
    finally:
        # Always release the simulator, even if a scanner run failed.
        tester.send(b"\x01\xff\xff\xff\xff")
        sim.join(timeout=10)

expected_output = ["256 requests were sent", "1 requests were sent, 1 answered"]
for out in expected_output:
    assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2)
+ Cleanup
= Delete vcan interfaces
# Delete the vcan interfaces; the cleanup helper must return a truthy result.
interfaces_removed = cleanup_interfaces()
assert interfaces_removed