| % Regression tests for the CANSocket |
| ~ vcan_socket linux needs_root not_pypy |
| |
| # More information at http://www.secdev.org/projects/UTscapy/ |
| |
| |
| ############ |
| ############ |
| + Configuration of CAN virtual sockets |
| |
| = Load module |
| ~ conf |
| |
| conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True} |
| load_layer("can", globals_dict=globals()) |
| conf.contribs['CANSocket'] = {'use-python-can': True} |
| from scapy.contrib.cansocket_python_can import * |
| |
| = Setup string for vcan |
| ~ conf command |
| bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'" |
| |
| = Load os |
| ~ conf command |
| |
| import os |
| import threading |
| from subprocess import call |
| |
| = Setup vcan0 |
| ~ conf command |
| |
| 0 == os.system(bashCommand) |
| |
| = Define commonly used functions |
| |
# Set by sender() once every message has been handed to the socket.
send_done = threading.Event()


def sender(sock, msg):
    """Send one message or a sequence of messages on *sock*.

    :param sock: object exposing a ``send(message)`` method.
    :param msg: a single message, or any object defining ``__iter__``
        whose items are sent one by one.

    After the last ``send`` call returns, the module-level ``send_done``
    event is set.
    """
    # Anything without __iter__ is treated as a single message.
    # NOTE(review): bytes/str also define __iter__ and would therefore be
    # sent element-wise - confirm callers only pass packets or lists.
    if not hasattr(msg, "__iter__"):
        msg = [msg]
    for m in msg:
        sock.send(m)
    send_done.set()
| |
| + Basic Packet Tests() |
| = CAN Packet init |
| |
| canframe = CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') |
| bytes(canframe) == b'\x00\x00\x07\xff\x08\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08' |
| |
| + Basic Socket Tests() |
| = CAN Socket Init |
| |
| sock1 = CANSocket(bustype='socketcan', channel='vcan0') |
| sock1.close() |
| del sock1 |
| sock1 = None |
| |
| = CAN Socket send recv small packet |
| |
| sock1 = CANSocket(bustype='socketcan', channel='vcan0') |
| sock2 = CANSocket(bustype='socketcan', channel='vcan0') |
| |
| sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01')) |
| rx = sock1.recv() |
| sock1.close() |
| sock2.close() |
| |
| assert rx == CAN(identifier=0x7ff,length=1,data=b'\x01') |
| |
| = CAN Socket send recv small packet test with |
| |
| with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \ |
| CANSocket(bustype='socketcan', channel='vcan0') as sock2: |
| sock2.send(CAN(identifier=0x7ff,length=1,data=b'\x01')) |
| rx = sock1.recv() |
| |
| assert rx == CAN(identifier=0x7ff,length=1,data=b'\x01') |
| |
| |
| = CAN Socket send recv ISOTP_Packet |
| |
| from scapy.contrib.isotp import ISOTPHeader, ISOTP_FF |
| |
| with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \ |
| CANSocket(bustype='socketcan', channel='vcan0') as sock2: |
| sock2.send(ISOTPHeader(identifier=0x7ff)/ISOTP_FF(message_size=100, data=b'abcdef')) |
| rx = sock1.recv() |
| assert rx == CAN(identifier=0x7ff,length=8,data=b'\x10\x64abcdef') |
| |
| |
| = CAN Socket basecls test |
| |
| with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \ |
| CANSocket(bustype='socketcan', channel='vcan0') as sock2: |
| sock1.basecls = Raw |
| sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| rx = sock1.recv() |
| assert rx == Raw(bytes(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))) |
| |
| = CAN Socket send recv |
| |
| with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \ |
| CANSocket(bustype='socketcan', channel='vcan0') as sock2: |
| sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock1.basecls = CAN |
| rx = sock1.recv() |
| assert rx == CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') |
| |
| = CAN Socket send recv swapped |
| |
| conf.contribs['CAN']['swap-bytes'] = True |
| |
| with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \ |
| CANSocket(bustype='socketcan', channel='vcan0') as sock2: |
| sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock1.basecls = CAN |
| rx = sock1.recv() |
| assert rx == CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') |
| |
| conf.contribs['CAN']['swap-bytes'] = False |
| |
| = sniff with filtermask 0x7ff |
| |
| msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x100, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] |
| |
| with CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \ |
| CANSocket(bustype='socketcan', channel='vcan0') as sock2: |
| for m in msgs: |
| sock2.send(m) |
| packets = sock1.sniff(timeout=0.1, count=3) |
| assert len(packets) == 3 |
| |
| |
| = sniff with filtermask 0x700 |
| |
| msgs = [CAN(identifier=0x212, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x2ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x1ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x2aa, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] |
| |
| with CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x700}]) as sock1, \ |
| CANSocket(bustype='socketcan', channel='vcan0') as sock2: |
| for m in msgs: |
| sock2.send(m) |
| packets = sock1.sniff(timeout=0.1, count=4) |
| assert len(packets) == 4 |
| |
| = sniff with filtermask 0x0ff |
| |
| msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x301, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x1ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x700, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x100, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] |
| |
| with CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0xff}]) as sock1, \ |
| CANSocket(bustype='socketcan', channel='vcan0') as sock2: |
| for m in msgs: |
| sock2.send(m) |
| packets = sock1.sniff(timeout=0.1, count=4) |
| assert len(packets) == 4 |
| |
| = sniff with multiple filters |
| |
| msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x400, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x500, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x600, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x700, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x7ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] |
| |
| with CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}, {'can_id': 0x400, 'can_mask': 0x7ff}, {'can_id': 0x600, 'can_mask': 0x7ff}, {'can_id': 0x7ff, 'can_mask': 0x7ff}]) as sock1, \ |
| CANSocket(bustype='socketcan', channel='vcan0') as sock2: |
| for m in msgs: |
| sock2.send(m) |
| packets = sock1.sniff(timeout=0.1, count=4) |
| assert len(packets) == 4 |
| |
| = sniff with filtermask 0x7ff, four matching frames |
| |
| msgs = [CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x300, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x100, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(identifier=0x200, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] |
| |
| |
| with CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \ |
| CANSocket(bustype='socketcan', channel='vcan0') as sock2: |
| for m in msgs: |
| sock2.send(m) |
| packets = sock1.sniff(timeout=0.1, count=4) |
| assert len(packets) == 4 |
| |
| = sniff with filtermask 0x1FFFFFFF |
| |
| msgs = [CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'), |
| CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')] |
| |
| with CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x10000000, 'can_mask': 0x1fffffff}]) as sock1, \ |
| CANSocket(bustype='socketcan', channel='vcan0') as sock2: |
| for m in msgs: |
| sock2.send(m) |
| packets = sock1.sniff(timeout=0.1, count=2) |
| assert len(packets) == 2 |
| |
| |
| + bridge and sniff tests |
| = bridge and sniff setup vcan1 package forwarding |
| |
| bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'" |
| assert 0 == os.system(bashCommand) |
| |
| sock0 = CANSocket(bustype='socketcan', channel='vcan0') |
| sock1 = CANSocket(bustype='socketcan', channel='vcan1') |
| |
| bridgeStarted = threading.Event() |
def bridge():
    """Bridge vcan0 and vcan1, forwarding frames unchanged in both directions.

    Used as the target of a helper thread. ``bridgeStarted.set`` is passed
    as ``started_callback`` so the test can wait for the bridge before it
    starts sending. Both sockets are opened as context managers so they
    are closed even if ``bridge_and_sniff`` raises.
    """
    def pnr(pkt):
        # Identity transform: hand the received frame back unmodified.
        return pkt

    with CANSocket(bustype='socketcan', channel='vcan0',
                   bitrate=250000) as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1',
                      bitrate=250000) as bSock1:
        # Set a short receive timeout on both sockets before bridging.
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr,
                         timeout=0.5, started_callback=bridgeStarted.set,
                         count=6)
| |
| threadBridge = threading.Thread(target=bridge) |
| threadBridge.start() |
| bridgeStarted.wait(timeout=1) |
| |
| sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| |
| packetsVCan1 = sock1.sniff(timeout=0.5, count=6) |
| assert len(packetsVCan1) == 6 |
| |
| sock1.close() |
| sock0.close() |
| |
| threadBridge.join(timeout=3) |
| assert not threadBridge.is_alive() |
| |
| = bridge and sniff setup vcan0 package forwarding |
| |
| sock0 = CANSocket(bustype='socketcan', channel='vcan0') |
| sock1 = CANSocket(bustype='socketcan', channel='vcan1') |
| |
| bridgeStarted = threading.Event() |
def bridge():
    """Bridge vcan0 and vcan1, forwarding frames unchanged in both directions.

    Used as the target of a helper thread. ``bridgeStarted.set`` is passed
    as ``started_callback`` so the test can wait for the bridge before it
    starts sending. Both sockets are opened as context managers so they
    are closed even if ``bridge_and_sniff`` raises.
    """
    def pnr(pkt):
        # Identity transform: hand the received frame back unmodified.
        return pkt

    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        # Set a short receive timeout on both sockets before bridging.
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr,
                         timeout=0.5, started_callback=bridgeStarted.set,
                         count=4)
| |
| threadBridge = threading.Thread(target=bridge) |
| threadBridge.start() |
| bridgeStarted.wait(timeout=1) |
| |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x80, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| |
| packetsVCan0 = sock0.sniff(timeout=0.5, count=4) |
| assert len(packetsVCan0) == 4 |
| |
| sock0.close() |
| sock1.close() |
| |
| threadBridge.join(timeout=3) |
| assert not threadBridge.is_alive() |
| |
| =bridge and sniff setup vcan0 vcan1 package forwarding both directions |
| |
| sock0 = CANSocket(bustype='socketcan', channel='vcan0') |
| sock1 = CANSocket(bustype='socketcan', channel='vcan1') |
| |
| bridgeStarted = threading.Event() |
def bridge():
    """Bridge vcan0 and vcan1, forwarding frames unchanged in both directions.

    Used as the target of a helper thread. ``bridgeStarted.set`` is passed
    as ``started_callback`` so the test can wait for the bridge before it
    starts sending. Both sockets are opened as context managers so they
    are closed even if ``bridge_and_sniff`` raises.
    """
    def pnr(pkt):
        # Identity transform: hand the received frame back unmodified.
        return pkt

    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        # Set a short receive timeout on both sockets before bridging.
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr,
                         timeout=0.5, started_callback=bridgeStarted.set,
                         count=10)
| |
| threadBridge = threading.Thread(target=bridge) |
| threadBridge.start() |
| bridgeStarted.wait(timeout=1) |
| |
| sock0.send(CAN(flags='extended', identifier=0x25, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x20, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x25, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x25, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x20, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x30, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock1.send(CAN(flags='extended', identifier=0x40, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x40, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x80, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x40, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| |
| packetsVCan0 = sock0.sniff(timeout=0.5, count=4) |
| packetsVCan1 = sock1.sniff(timeout=0.5, count=6) |
| assert len(packetsVCan0) == 4 |
| assert len(packetsVCan1) == 6 |
| |
| sock0.close() |
| sock1.close() |
| |
| threadBridge.join(timeout=3) |
| assert not threadBridge.is_alive() |
| |
| =bridge and sniff setup vcan1 package change |
| |
| sock0 = CANSocket(bustype='socketcan', channel='vcan0') |
| sock1 = CANSocket(bustype='socketcan', channel='vcan1', can_filters=[{'can_id': 0x10010000, 'can_mask': 0x1fffffff}]) |
| |
| bridgeStarted = threading.Event() |
def bridgeWithPackageChangeVCan0ToVCan1():
    """Bridge vcan0 and vcan1, rewriting frames passed through ``xfrm12``.

    Used as the target of a helper thread. Only ``xfrm12`` (if1=vcan0 to
    if2=vcan1) is supplied; it overwrites payload and identifier of every
    frame it is given. ``bridgeStarted.set`` is passed as
    ``started_callback``. Both sockets are opened as context managers so
    they are closed even if ``bridge_and_sniff`` raises.
    """
    def pnr(pkt):
        # Replace payload and identifier, then return the modified frame.
        pkt.data = b'\x08\x07\x06\x05\x04\x03\x02\x01'
        pkt.identifier = 0x10010000
        return pkt

    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        # Set a short receive timeout on both sockets before bridging.
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, timeout=0.5,
                         started_callback=bridgeStarted.set, count=6)
| |
| threadBridge = threading.Thread(target=bridgeWithPackageChangeVCan0ToVCan1) |
| threadBridge.start() |
| bridgeStarted.wait(timeout=1) |
| sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| |
| packetsVCan1 = sock1.sniff(timeout=0.5, count=6) |
| assert len(packetsVCan1) == 6 |
| |
| sock0.close() |
| sock1.close() |
| |
| threadBridge.join(timeout=3) |
| assert not threadBridge.is_alive() |
| |
| =bridge and sniff setup vcan0 package change |
| |
| sock1 = CANSocket(bustype='socketcan', channel='vcan1') |
| sock0 = CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x10010000, 'can_mask': 0x1fffffff}]) |
| |
| bridgeStarted = threading.Event() |
def bridgeWithPackageChangeVCan1ToVCan0():
    """Bridge vcan0 and vcan1, rewriting frames passed through ``xfrm21``.

    Used as the target of a helper thread. Only ``xfrm21`` (if2=vcan1 to
    if1=vcan0) is supplied; it overwrites payload and identifier of every
    frame it is given. ``bridgeStarted.set`` is passed as
    ``started_callback``. Both sockets are opened as context managers so
    they are closed even if ``bridge_and_sniff`` raises.
    """
    def pnr(pkt):
        # Replace payload and identifier, then return the modified frame.
        pkt.data = b'\x08\x07\x06\x05\x04\x03\x02\x01'
        pkt.identifier = 0x10010000
        return pkt

    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        # Set a short receive timeout on both sockets before bridging.
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm21=pnr, timeout=0.5,
                         started_callback=bridgeStarted.set, count=4)
| |
| threadBridge = threading.Thread(target=bridgeWithPackageChangeVCan1ToVCan0) |
| threadBridge.start() |
| bridgeStarted.wait(timeout=1) |
| |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10050000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| |
| packetsVCan0 = sock0.sniff(timeout=0.5, count=4) |
| assert len(packetsVCan0) == 4 |
| |
| sock0.close() |
| sock1.close() |
| |
| threadBridge.join(timeout=3) |
| assert not threadBridge.is_alive() |
| |
| =bridge and sniff setup vcan0 and vcan1 package change in both directions |
| |
| sock0 = CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x10010000, 'can_mask': 0x1fffffff}]) |
| sock1 = CANSocket(bustype='socketcan', channel='vcan1', can_filters=[{'can_id': 0x10010000, 'can_mask': 0x1fffffff}]) |
| |
| bridgeStarted = threading.Event() |
def bridgeWithPackageChangeBothDirections():
    """Bridge vcan0 and vcan1, rewriting frames in both directions.

    Used as the target of a helper thread. The same transform is supplied
    as ``xfrm12`` and ``xfrm21``; it overwrites payload and identifier of
    every frame it is given. ``bridgeStarted.set`` is passed as
    ``started_callback``. Both sockets are opened as context managers so
    they are closed even if ``bridge_and_sniff`` raises.
    """
    def pnr(pkt):
        # Replace payload and identifier, then return the modified frame.
        pkt.data = b'\x08\x07\x06\x05\x04\x03\x02\x01'
        pkt.identifier = 0x10010000
        return pkt

    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        # Set a short receive timeout on both sockets before bridging.
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr,
                         timeout=0.5, started_callback=bridgeStarted.set,
                         count=10)
| |
| threadBridge = threading.Thread(target=bridgeWithPackageChangeBothDirections) |
| threadBridge.start() |
| bridgeStarted.wait(timeout=1) |
| |
| sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10050000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| |
| packetsVCan0 = sock0.sniff(timeout=0.5, count=4) |
| packetsVCan1 = sock1.sniff(timeout=0.5, count=6) |
| assert len(packetsVCan0) == 4 |
| assert len(packetsVCan1) == 6 |
| |
| sock0.close() |
| sock1.close() |
| |
| threadBridge.join(timeout=3) |
| assert not threadBridge.is_alive() |
| |
| =bridge and sniff setup vcan0 package remove |
| |
| sock0 = CANSocket(bustype='socketcan', channel='vcan0') |
| sock1 = CANSocket(bustype='socketcan', channel='vcan1') |
| |
| bridgeStarted = threading.Event() |
def bridgeWithRemovePackageFromVCan0ToVCan1():
    """Bridge vcan0 and vcan1, filtering one identifier via ``xfrm12``.

    Used as the target of a helper thread. Only ``xfrm12`` (if1=vcan0 to
    if2=vcan1) is supplied; it returns ``False`` for identifier 0x10020000
    and the unchanged frame otherwise. ``bridgeStarted.set`` is passed as
    ``started_callback``. Both sockets are opened as context managers so
    they are closed even if ``bridge_and_sniff`` raises.
    """
    def pnr(pkt):
        # False marks the frame for removal; the surrounding test expects
        # one frame fewer on vcan1 than was sent on vcan0.
        if pkt.identifier == 0x10020000:
            return False
        return pkt

    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        # Set a short receive timeout on both sockets before bridging.
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, timeout=0.5,
                         started_callback=bridgeStarted.set, count=6)
| |
| threadBridge = threading.Thread(target=bridgeWithRemovePackageFromVCan0ToVCan1) |
| threadBridge.start() |
| bridgeStarted.wait(timeout=1) |
| |
| sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| |
| packetsVCan1 = sock1.sniff(timeout=0.5, count=5) |
| |
| assert len(packetsVCan1) == 5 |
| |
| sock0.close() |
| sock1.close() |
| |
| threadBridge.join(timeout=3) |
| assert not threadBridge.is_alive() |
| |
| =bridge and sniff setup vcan1 package remove |
| |
| sock0 = CANSocket(bustype='socketcan', channel='vcan0') |
| sock1 = CANSocket(bustype='socketcan', channel='vcan1') |
| |
| bridgeStarted = threading.Event() |
def bridgeWithRemovePackageFromVCan1ToVCan0():
    """Bridge vcan0 and vcan1, filtering one identifier via ``xfrm21``.

    Used as the target of a helper thread. Only ``xfrm21`` (if2=vcan1 to
    if1=vcan0) is supplied; it returns ``False`` for identifier 0x10050000
    and the unchanged frame otherwise. ``bridgeStarted.set`` is passed as
    ``started_callback``. Both sockets are opened as context managers so
    they are closed even if ``bridge_and_sniff`` raises.
    """
    def pnr(pkt):
        # False marks the frame for removal; the surrounding test expects
        # one frame fewer on vcan0 than was sent on vcan1.
        if pkt.identifier == 0x10050000:
            return False
        return pkt

    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        # Set a short receive timeout on both sockets before bridging.
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm21=pnr, timeout=0.5,
                         started_callback=bridgeStarted.set, count=4)
| |
| threadBridge = threading.Thread(target=bridgeWithRemovePackageFromVCan1ToVCan0) |
| threadBridge.start() |
| bridgeStarted.wait(timeout=1) |
| |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10050000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| |
| packetsVCan0 = sock0.sniff(timeout=0.5, count=3) |
| |
| assert len(packetsVCan0) == 3 |
| |
| sock0.close() |
| sock1.close() |
| |
| threadBridge.join(timeout=3) |
| assert not threadBridge.is_alive() |
| |
| |
| =bridge and sniff setup vcan0 and vcan1 package remove both directions |
| |
| sock0 = CANSocket(bustype='socketcan', channel='vcan0') |
| sock1 = CANSocket(bustype='socketcan', channel='vcan1') |
| |
| bridgeStarted = threading.Event() |
def bridgeWithRemovePackageInBothDirections():
    """Bridge vcan0 and vcan1, filtering one identifier per direction.

    Used as the target of a helper thread. ``xfrm12`` returns ``False``
    for identifier 0x10020000 and ``xfrm21`` returns ``False`` for
    identifier 0x10050000; all other frames are returned unchanged.
    ``bridgeStarted.set`` is passed as ``started_callback``. Both sockets
    are opened as context managers so they are closed even if
    ``bridge_and_sniff`` raises.
    """
    def pnrA(pkt):
        # Transform for if1 -> if2: False marks the frame for removal.
        if pkt.identifier == 0x10020000:
            return False
        return pkt

    def pnrB(pkt):
        # Transform for if2 -> if1: False marks the frame for removal.
        if pkt.identifier == 0x10050000:
            return False
        return pkt

    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        # Set a short receive timeout on both sockets before bridging.
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnrA, xfrm21=pnrB,
                         timeout=0.5, started_callback=bridgeStarted.set,
                         count=10)
| |
| threadBridge = threading.Thread(target=bridgeWithRemovePackageInBothDirections) |
| threadBridge.start() |
| bridgeStarted.wait(timeout=1) |
| |
| sock0.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10020000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10030000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10040000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock0.send(CAN(flags='extended', identifier=0x10000000, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10050000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| sock1.send(CAN(flags='extended', identifier=0x10010000, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06')) |
| |
| packetsVCan0 = sock0.sniff(timeout=0.5, count=3) |
| packetsVCan1 = sock1.sniff(timeout=0.5, count=5) |
| |
| assert len(packetsVCan0) == 3 |
| assert len(packetsVCan1) == 5 |
| |
| threadBridge.join(timeout=3) |
| assert not threadBridge.is_alive() |
| |
| sock0.close() |
| sock1.close() |
| |
| = Delete vcan interfaces |
| ~ needs_root linux vcan_socket |
| |
| if 0 != call(["sudo", "ip" ,"link", "delete", "vcan0"]): |
| raise Exception("vcan0 could not be deleted") |
| |
| if 0 != call(["sudo", "ip" ,"link", "delete", "vcan1"]): |
| raise Exception("vcan1 could not be deleted") |