| % Regression tests for obdscanner |
| ~ vcan_socket needs_root linux not_pypy automotive_comm scanner |
| |
| + Configuration |
| ~ conf |
| |
| = Imports |
| with open(scapy_path("test/contrib/automotive/interface_mockup.py")) as f: |
| exec(f.read()) |
| |
| load_contrib("automotive.ecu", globals_dict=globals()) |
| |
| + Usage tests |
| |
| = Test wrong usage |
| print(sys.executable) |
| result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| returncode = result.wait() |
| std_out, std_err = result.communicate() |
| assert returncode != 0 |
| |
| expected_output = plain_str(b'usage:') |
| assert expected_output in plain_str(std_err) |
| |
| |
| = Test show help |
| result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py", "--help"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| assert result.wait() == 0 |
| std_out, std_err = result.communicate() |
| expected_output = plain_str(b'Scan for all possible obd service classes and their subfunctions.') |
| assert expected_output in plain_str(std_out) |
| |
| + Scan tests |
| |
| = Load contribution layer |
| |
| from scapy.contrib.automotive.obd.obd import * |
| |
| + Simulate scanner |
| |
| = Test DTC scan |
| |
| drain_bus(iface0) |
| |
| s3 = OBD()/OBD_S03_PR(dtcs=[OBD_DTC()]) |
| |
| example_responses = [EcuResponse(responses=s3)] |
| |
| with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \ |
| new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester: |
| conf.verb = -1 |
| answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False) |
| sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 15, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"}) |
| sim.start() |
| try: |
| result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| std_out1, std_err1 = result.communicate() |
| result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| std_out2, std_err2 = result.communicate() |
| except Exception as e: |
| print(e) |
| finally: |
| tester.send(b"\x01\xff\xff\xff\xff") |
| sim.join(timeout=10) |
| expected_output = b"1 requests were sent, 1 answered" |
| assert bytes_encode(expected_output) in bytes_encode(std_out1) or bytes_encode(expected_output) in bytes_encode(std_out2) |
| |
| = Test supported PIDs scan |
| |
| drain_bus(iface0) |
| |
| s1_pid00 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID00(supported_pids="PID03+PID0B+PID0F")]) |
| s6_mid00 = OBD()/OBD_S06_PR(data_records=[OBD_S06_PR_Record()/OBD_MID00(supported_mids="")]) |
| s8_tid00 = OBD()/OBD_S08_PR(data_records=[OBD_S08_PR_Record()/OBD_TID00(supported_tids="")]) |
| s9_iid00 = OBD()/OBD_S09_PR(data_records=[OBD_S09_PR_Record()/OBD_IID00(supported_iids="")]) |
| |
| s1_pid03 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID03(fuel_system1=0, fuel_system2=2)]) |
| s1_pid0B = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0B(data=100)]) |
| s1_pid0F = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0F(data=50)]) |
| |
| # Create answers for 'supported PIDs scan' |
| example_responses = \ |
| [EcuResponse(responses=s3), |
| EcuResponse(responses=s1_pid00), |
| EcuResponse(responses=s6_mid00), |
| EcuResponse(responses=s8_tid00), |
| EcuResponse(responses=s9_iid00), |
| EcuResponse(responses=s1_pid03), |
| EcuResponse(responses=s1_pid0B), |
| EcuResponse(responses=s1_pid0F)] |
| |
| |
| |
| with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \ |
| new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester: |
| answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False) |
| sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"}) |
| sim.start() |
| try: |
| result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| std_out1, std_err1 = result.communicate() |
| print(std_out2, std_err2) |
| result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| std_out2, std_err2 = result.communicate() |
| print(std_out2, std_err2) |
| except Exception as e: |
| print(e) |
| finally: |
| tester.send(b"\x01\xff\xff\xff\xff") |
| sim.join(timeout=10) |
| expected_output = ["supported_pids=PID0F+PID0B+PID03", "fuel_system1=OpenLoopInsufficientEngineTemperature fuel_system2=ClosedLoop", "data=100 kPa", "data=50.0 deg. C"] |
| for out in expected_output: |
| assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2) |
| |
| = Test only Service 01 PIDs scan |
| |
| drain_bus(iface0) |
| |
| s1_pid00 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID00(supported_pids="PID03+PID0B+PID0F")]) |
| s6_mid00 = OBD()/OBD_S06_PR(data_records=[OBD_S06_PR_Record()/OBD_MID00(supported_mids="")]) |
| s8_tid00 = OBD()/OBD_S08_PR(data_records=[OBD_S08_PR_Record()/OBD_TID00(supported_tids="")]) |
| s9_iid00 = OBD()/OBD_S09_PR(data_records=[OBD_S09_PR_Record()/OBD_IID00(supported_iids="")]) |
| |
| s1_pid03 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID03(fuel_system1=0, fuel_system2=2)]) |
| s1_pid0B = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0B(data=100)]) |
| s1_pid0F = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID0F(data=50)]) |
| |
| # Create answers for 'supported PIDs scan' |
| example_responses = \ |
| [EcuResponse(responses=s3), |
| EcuResponse(responses=s1_pid00), |
| EcuResponse(responses=s6_mid00), |
| EcuResponse(responses=s8_tid00), |
| EcuResponse(responses=s9_iid00), |
| EcuResponse(responses=s1_pid03), |
| EcuResponse(responses=s1_pid0B), |
| EcuResponse(responses=s1_pid0F)] |
| |
| |
| |
| with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \ |
| new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester: |
| answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False) |
| sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"}) |
| sim.start() |
| try: |
| result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-1"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| std_out1, std_err1 = result.communicate() |
| print(std_out1, std_err1) |
| result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-1"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| std_out2, std_err2 = result.communicate() |
| print(std_out2, std_err2) |
| except Exception as e: |
| print(e) |
| finally: |
| tester.send(b"\x01\xff\xff\xff\xff") |
| sim.join(timeout=10) |
| expected_output = ["supported_pids=PID0F+PID0B+PID03", "fuel_system1=OpenLoopInsufficientEngineTemperature fuel_system2=ClosedLoop", "data=100 kPa", "data=50.0 deg. C"] |
| for out in expected_output: |
| assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2) |
| |
| |
| = Test full scan |
| |
| drain_bus(iface0) |
| |
| # Add unsupported PID |
| s1_pid01 = OBD()/OBD_S01_PR(data_records=[OBD_S01_PR_Record()/OBD_PID01()]) |
| example_responses.append(EcuResponse(responses=s1_pid01)) |
| |
| with new_can_socket0() as isocan, ISOTPSocket(isocan, 0x7e8, 0x7e0, basecls=OBD, padding=True) as ecu, \ |
| new_can_socket0() as isocan2, ISOTPSocket(isocan2, 0x7e0, 0x7e8, basecls=OBD, padding=True) as tester: |
| answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=OBD, verbose=False) |
| sim = threading.Thread(target=answering_machine, kwargs={'verbose': False, 'timeout': 100, 'stop_filter': lambda p: bytes(p) == b"\x01\xff\xff\xff\xff"}) |
| sim.start() |
| try: |
| result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-f", "-1", "-3"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| std_out1, std_err1 = result.communicate() |
| result = subprocess.Popen([sys.executable, "scapy/tools/automotive/obdscanner.py"] + can_socket_string_list + ["-s", "0x7e0", "-d", "0x7e8", "-t", "0.30", "-f", "-1", "-3"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
| std_out2, std_err2 = result.communicate() |
| except Exception as e: |
| print(e) |
| finally: |
| tester.send(b"\x01\xff\xff\xff\xff") |
| sim.join(timeout=10) |
| expected_output = ["256 requests were sent", "1 requests were sent, 1 answered"] |
| for out in expected_output: |
| assert bytes_encode(out) in bytes_encode(std_out1) or bytes_encode(out) in bytes_encode(std_out2) |
| |
| + Cleanup |
| |
| = Delete vcan interfaces |
| |
| assert cleanup_interfaces() |