# blob: 78fa349d8992f28456698b436a1cdc3cd1fd6fb0 [file] [log] [blame]
% Regression tests for the CANSocket
~ vcan_socket linux needs_root not_pypy
# More information at http://www.secdev.org/projects/UTscapy/
############
############
+ Configuration of CAN virtual sockets
= Load module
~ conf
# Set the CAN layer options first; load_layer("can") is called only afterwards.
conf.contribs['CAN'] = {'swap-bytes': False, 'remove-padding': True}
load_layer("can", globals_dict=globals())
# NOTE(review): presumably selects the python-can backend (inferred from the key name); it is set before the import below.
conf.contribs['CANSocket'] = {'use-python-can': True}
# Star import; presumably this is where CANSocket used by the tests below comes from.
from scapy.contrib.cansocket_python_can import *
= Setup string for vcan
~ conf command
# Shell line that loads the vcan module, creates vcan0 and brings it up.
# The steps are separated by ';', so each one runs whether or not the previous one succeeded.
bashCommand = ("/bin/bash -c '"
               "sudo modprobe vcan; "
               "sudo ip link add name vcan0 type vcan; "
               "sudo ip link set dev vcan0 up'")
= Load os
~ conf command
import os
import threading
from subprocess import call
= Setup vcan0
~ conf command
# Run the setup command; the comparison is the final expression of this test.
os.system(bashCommand) == 0
= Define commonly used functions
# Event set by sender() once it has finished (or aborted) sending.
send_done = threading.Event()

def sender(sock, msg):
    """Send one message, or every message of an iterable, on sock and then set send_done.
    :param sock: object exposing send(); each message is passed to it in order
    :param msg: a single message or an iterable of messages; bytes and str
        values are treated as ONE message instead of being iterated element-wise
    :raises: whatever sock.send() raises (send_done is still set in that case)
    """
    # bytes/str expose __iter__ on Python 3, which would split them into single elements.
    if isinstance(msg, (bytes, str)) or not hasattr(msg, "__iter__"):
        msg = [msg]
    try:
        for m in msg:
            sock.send(m)
    finally:
        # Signal completion even on failure so a thread waiting on send_done cannot hang.
        send_done.set()
+ Basic Packet Tests()
= CAN Packet init
# Expected serialisation of the frame built below (compared as the final expression).
expected_wire = b'\x00\x00\x07\xff\x08\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08'
canframe = CAN(identifier=0x7ff, length=8,
               data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
bytes(canframe) == expected_wire
+ Basic Socket Tests()
= CAN Socket Init
# Open a CANSocket on vcan0 and close it again straight away.
sock1 = CANSocket(bustype='socketcan', channel='vcan0')
sock1.close()
# Drop the name, then rebind it to None.
del sock1
sock1 = None
= CAN Socket send recv small packet
# One-byte frame sent on sock2 and expected back on sock1 (both on vcan0).
frame = CAN(identifier=0x7ff, length=1, data=b'\x01')
sock1 = CANSocket(bustype='socketcan', channel='vcan0')
sock2 = CANSocket(bustype='socketcan', channel='vcan0')
sock2.send(frame)
rx = sock1.recv()
# Both sockets are closed explicitly before the comparison.
sock1.close()
sock2.close()
assert rx == frame
= CAN Socket send recv small packet test with
# Same exchange as a one-byte send/recv, but the sockets are managed by 'with'.
frame = CAN(identifier=0x7ff, length=1, data=b'\x01')
with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0') as sock2:
    sock2.send(frame)
    rx = sock1.recv()

assert rx == frame
= CAN Socket send recv ISOTP_Packet
from scapy.contrib.isotp import ISOTPHeader, ISOTP_FF
# ISOTP first frame sent on sock2; sock1 is expected to deliver it as this CAN frame.
expected = CAN(identifier=0x7ff, length=8, data=b'\x10\x64abcdef')
first_frame = ISOTPHeader(identifier=0x7ff) / ISOTP_FF(message_size=100, data=b'abcdef')
with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0') as sock2:
    sock2.send(first_frame)
    rx = sock1.recv()

assert rx == expected
= CAN Socket basecls test
# With basecls set to Raw, the received object is compared against Raw(bytes(frame)).
frame = CAN(identifier=0x7ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0') as sock2:
    sock1.basecls = Raw
    sock2.send(frame)
    rx = sock1.recv()

assert rx == Raw(bytes(frame))
= CAN Socket send recv
# Full eight-byte frame; basecls is set to CAN after the send and before the recv.
frame = CAN(identifier=0x7ff, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0') as sock2:
    sock2.send(frame)
    sock1.basecls = CAN
    rx = sock1.recv()

assert rx == frame
= CAN Socket send recv swapped
# Same exchange as the plain send/recv test, but with 'swap-bytes' enabled.
conf.contribs['CAN']['swap-bytes'] = True
try:
    with CANSocket(bustype='socketcan', channel='vcan0') as sock1, \
            CANSocket(bustype='socketcan', channel='vcan0') as sock2:
        sock2.send(CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
        sock1.basecls = CAN
        rx = sock1.recv()
    assert rx == CAN(identifier=0x7ff,length=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
finally:
    # Always switch the global flag back: a failure above must not leave
    # 'swap-bytes' enabled for the tests that follow.
    conf.contribs['CAN']['swap-bytes'] = False
= sniff with filtermask 0x7ff
# Six frames, three of which carry identifier 0x200; the receiver filters on 0x200/0x7ff.
msgs = [CAN(identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
        for ident in (0x200, 0x300, 0x300, 0x200, 0x100, 0x200)]
with CANSocket(bustype='socketcan', channel='vcan0',
               can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0') as sock2:
    for m in msgs:
        sock2.send(m)
    packets = sock1.sniff(timeout=0.1, count=3)
    assert len(packets) == 3
= sniff with filtermask 0x700
# Six frames, four of them in the 0x2xx range; the receiver filters on 0x200/0x700.
msgs = [CAN(identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
        for ident in (0x212, 0x300, 0x2ff, 0x1ff, 0x200, 0x2aa)]
with CANSocket(bustype='socketcan', channel='vcan0',
               can_filters=[{'can_id': 0x200, 'can_mask': 0x700}]) as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0') as sock2:
    for m in msgs:
        sock2.send(m)
    packets = sock1.sniff(timeout=0.1, count=4)
    assert len(packets) == 4
= sniff with filtermask 0x0ff
# Six frames, four of them with a zero low byte; the receiver filters on 0x200/0xff.
msgs = [CAN(identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
        for ident in (0x200, 0x301, 0x300, 0x1ff, 0x700, 0x100)]
with CANSocket(bustype='socketcan', channel='vcan0',
               can_filters=[{'can_id': 0x200, 'can_mask': 0xff}]) as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0') as sock2:
    for m in msgs:
        sock2.send(m)
    packets = sock1.sniff(timeout=0.1, count=4)
    assert len(packets) == 4
= sniff with multiple filters
# Seven frames; the receiver installs one exact-match filter for each of four identifiers.
msgs = [CAN(identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
        for ident in (0x200, 0x300, 0x400, 0x500, 0x600, 0x700, 0x7ff)]
with CANSocket(bustype='socketcan', channel='vcan0',
               can_filters=[{'can_id': wanted, 'can_mask': 0x7ff}
                            for wanted in (0x200, 0x400, 0x600, 0x7ff)]) as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0') as sock2:
    for m in msgs:
        sock2.send(m)
    packets = sock1.sniff(timeout=0.1, count=4)
    assert len(packets) == 4
= sniff with filtermask 0x7ff
# Six frames, four of which carry identifier 0x200; the receiver filters on 0x200/0x7ff.
msgs = [CAN(identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
        for ident in (0x200, 0x200, 0x300, 0x200, 0x100, 0x200)]
with CANSocket(bustype='socketcan', channel='vcan0',
               can_filters=[{'can_id': 0x200, 'can_mask': 0x7ff}]) as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0') as sock2:
    for m in msgs:
        sock2.send(m)
    packets = sock1.sniff(timeout=0.1, count=4)
    assert len(packets) == 4
= sniff with filtermask 0x1FFFFFFF
# Six extended frames, two of which carry 0x10000000; the receiver filters on the full 29-bit mask.
msgs = [CAN(flags='extended', identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
        for ident in (0x10010000, 0x10020000, 0x10000000, 0x10030000, 0x10040000, 0x10000000)]
with CANSocket(bustype='socketcan', channel='vcan0',
               can_filters=[{'can_id': 0x10000000, 'can_mask': 0x1fffffff}]) as sock1, \
        CANSocket(bustype='socketcan', channel='vcan0') as sock2:
    for m in msgs:
        sock2.send(m)
    packets = sock1.sniff(timeout=0.1, count=2)
    assert len(packets) == 2
+ bridge and sniff tests
= bridge and sniff setup vcan1 package forwarding
# Create vcan1; the bridge tests need a second virtual interface.
bashCommand = "/bin/bash -c 'sudo ip link add name vcan1 type vcan; sudo ip link set dev vcan1 up'"
assert 0 == os.system(bashCommand)
sock0 = CANSocket(bustype='socketcan', channel='vcan0')
sock1 = CANSocket(bustype='socketcan', channel='vcan1')
bridgeStarted = threading.Event()

def bridge():
    """Run bridge_and_sniff between vcan0 and vcan1, handing every frame on unchanged."""
    def pnr(pkt):
        return pkt
    # 'with' closes both bridge sockets even if bridge_and_sniff raises.
    with CANSocket(bustype='socketcan', channel='vcan0', bitrate=250000) as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1', bitrate=250000) as bSock1:
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr,
                         timeout=0.5, started_callback=bridgeStarted.set, count=6)

threadBridge = threading.Thread(target=bridge)
threadBridge.start()
try:
    bridgeStarted.wait(timeout=1)
    for ident in (0x10010000, 0x10020000, 0x10000000, 0x10030000, 0x10040000, 0x10000000):
        sock0.send(CAN(flags='extended', identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
    packetsVCan1 = sock1.sniff(timeout=0.5, count=6)
    assert len(packetsVCan1) == 6
finally:
    # Release the sockets and wait for the bridge thread even when the test fails.
    sock1.close()
    sock0.close()
    threadBridge.join(timeout=3)

assert not threadBridge.is_alive()
= bridge and sniff setup vcan0 package forwarding
sock0 = CANSocket(bustype='socketcan', channel='vcan0')
sock1 = CANSocket(bustype='socketcan', channel='vcan1')
bridgeStarted = threading.Event()

def bridge():
    """Run bridge_and_sniff between vcan0 and vcan1, handing every frame on unchanged."""
    def pnr(pkt):
        return pkt
    # 'with' closes both bridge sockets even if bridge_and_sniff raises.
    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr,
                         timeout=0.5, started_callback=bridgeStarted.set, count=4)

threadBridge = threading.Thread(target=bridge)
threadBridge.start()
try:
    bridgeStarted.wait(timeout=1)
    for ident in (0x10010000, 0x10010000, 0x80, 0x10010000):
        sock1.send(CAN(flags='extended', identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06'))
    packetsVCan0 = sock0.sniff(timeout=0.5, count=4)
    assert len(packetsVCan0) == 4
finally:
    # Release the sockets and wait for the bridge thread even when the test fails.
    sock0.close()
    sock1.close()
    threadBridge.join(timeout=3)

assert not threadBridge.is_alive()
=bridge and sniff setup vcan0 vcan1 package forwarding both directions
sock0 = CANSocket(bustype='socketcan', channel='vcan0')
sock1 = CANSocket(bustype='socketcan', channel='vcan1')
bridgeStarted = threading.Event()

def bridge():
    """Run bridge_and_sniff between vcan0 and vcan1, handing every frame on unchanged."""
    def pnr(pkt):
        return pkt
    # 'with' closes both bridge sockets even if bridge_and_sniff raises.
    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr,
                         timeout=0.5, started_callback=bridgeStarted.set, count=10)

threadBridge = threading.Thread(target=bridge)
threadBridge.start()
try:
    bridgeStarted.wait(timeout=1)
    for ident in (0x25, 0x20, 0x25, 0x25, 0x20, 0x30):
        sock0.send(CAN(flags='extended', identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
    for ident in (0x40, 0x40, 0x80, 0x40):
        sock1.send(CAN(flags='extended', identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06'))
    packetsVCan0 = sock0.sniff(timeout=0.5, count=4)
    packetsVCan1 = sock1.sniff(timeout=0.5, count=6)
    assert len(packetsVCan0) == 4
    assert len(packetsVCan1) == 6
finally:
    # Release the sockets and wait for the bridge thread even when the test fails.
    sock0.close()
    sock1.close()
    threadBridge.join(timeout=3)

assert not threadBridge.is_alive()
=bridge and sniff setup vcan1 package change
sock0 = CANSocket(bustype='socketcan', channel='vcan0')
sock1 = CANSocket(bustype='socketcan', channel='vcan1', can_filters=[{'can_id': 0x10010000, 'can_mask': 0x1fffffff}])
bridgeStarted = threading.Event()

def bridgeWithPackageChangeVCan0ToVCan1():
    """Run bridge_and_sniff with an xfrm12 hook that overwrites data and identifier of each frame."""
    def pnr(pkt):
        pkt.data = b'\x08\x07\x06\x05\x04\x03\x02\x01'
        pkt.identifier = 0x10010000
        return pkt
    # 'with' closes both bridge sockets even if bridge_and_sniff raises.
    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr,
                         timeout=0.5, started_callback=bridgeStarted.set, count=6)

threadBridge = threading.Thread(target=bridgeWithPackageChangeVCan0ToVCan1)
threadBridge.start()
try:
    bridgeStarted.wait(timeout=1)
    for ident in (0x10010000, 0x10020000, 0x10000000, 0x10030000, 0x10040000, 0x10000000):
        sock0.send(CAN(flags='extended', identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
    packetsVCan1 = sock1.sniff(timeout=0.5, count=6)
    assert len(packetsVCan1) == 6
finally:
    # Release the sockets and wait for the bridge thread even when the test fails.
    sock0.close()
    sock1.close()
    threadBridge.join(timeout=3)

assert not threadBridge.is_alive()
=bridge and sniff setup vcan0 package change
sock1 = CANSocket(bustype='socketcan', channel='vcan1')
sock0 = CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x10010000, 'can_mask': 0x1fffffff}])
bridgeStarted = threading.Event()

def bridgeWithPackageChangeVCan1ToVCan0():
    """Run bridge_and_sniff with an xfrm21 hook that overwrites data and identifier of each frame."""
    def pnr(pkt):
        pkt.data = b'\x08\x07\x06\x05\x04\x03\x02\x01'
        pkt.identifier = 0x10010000
        return pkt
    # 'with' closes both bridge sockets even if bridge_and_sniff raises.
    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm21=pnr,
                         timeout=0.5, started_callback=bridgeStarted.set, count=4)

threadBridge = threading.Thread(target=bridgeWithPackageChangeVCan1ToVCan0)
threadBridge.start()
try:
    bridgeStarted.wait(timeout=1)
    for ident in (0x10010000, 0x10010000, 0x10050000, 0x10010000):
        sock1.send(CAN(flags='extended', identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06'))
    packetsVCan0 = sock0.sniff(timeout=0.5, count=4)
    assert len(packetsVCan0) == 4
finally:
    # Release the sockets and wait for the bridge thread even when the test fails.
    sock0.close()
    sock1.close()
    threadBridge.join(timeout=3)

assert not threadBridge.is_alive()
=bridge and sniff setup vcan0 and vcan1 package change in both directions
sock0 = CANSocket(bustype='socketcan', channel='vcan0', can_filters=[{'can_id': 0x10010000, 'can_mask': 0x1fffffff}])
sock1 = CANSocket(bustype='socketcan', channel='vcan1', can_filters=[{'can_id': 0x10010000, 'can_mask': 0x1fffffff}])
bridgeStarted = threading.Event()

def bridgeWithPackageChangeBothDirections():
    """Run bridge_and_sniff with the same rewriting hook installed as xfrm12 and xfrm21."""
    def pnr(pkt):
        pkt.data = b'\x08\x07\x06\x05\x04\x03\x02\x01'
        pkt.identifier = 0x10010000
        return pkt
    # 'with' closes both bridge sockets even if bridge_and_sniff raises.
    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr, xfrm21=pnr,
                         timeout=0.5, started_callback=bridgeStarted.set, count=10)

threadBridge = threading.Thread(target=bridgeWithPackageChangeBothDirections)
threadBridge.start()
try:
    bridgeStarted.wait(timeout=1)
    for ident in (0x10010000, 0x10020000, 0x10000000, 0x10030000, 0x10040000, 0x10000000):
        sock0.send(CAN(flags='extended', identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
    for ident in (0x10010000, 0x10010000, 0x10050000, 0x10010000):
        sock1.send(CAN(flags='extended', identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06'))
    packetsVCan0 = sock0.sniff(timeout=0.5, count=4)
    packetsVCan1 = sock1.sniff(timeout=0.5, count=6)
    assert len(packetsVCan0) == 4
    assert len(packetsVCan1) == 6
finally:
    # Release the sockets and wait for the bridge thread even when the test fails.
    sock0.close()
    sock1.close()
    threadBridge.join(timeout=3)

assert not threadBridge.is_alive()
=bridge and sniff setup vcan0 package remove
sock0 = CANSocket(bustype='socketcan', channel='vcan0')
sock1 = CANSocket(bustype='socketcan', channel='vcan1')
bridgeStarted = threading.Event()

def bridgeWithRemovePackageFromVCan0ToVCan1():
    """Run bridge_and_sniff with an xfrm12 hook that returns False for identifier 0x10020000."""
    def pnr(pkt):
        # False instead of the packet for the one identifier under test.
        return False if pkt.identifier == 0x10020000 else pkt
    # 'with' closes both bridge sockets even if bridge_and_sniff raises.
    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnr,
                         timeout=0.5, started_callback=bridgeStarted.set, count=6)

threadBridge = threading.Thread(target=bridgeWithRemovePackageFromVCan0ToVCan1)
threadBridge.start()
try:
    bridgeStarted.wait(timeout=1)
    for ident in (0x10010000, 0x10020000, 0x10000000, 0x10030000, 0x10040000, 0x10000000):
        sock0.send(CAN(flags='extended', identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
    packetsVCan1 = sock1.sniff(timeout=0.5, count=5)
    assert len(packetsVCan1) == 5
finally:
    # Release the sockets and wait for the bridge thread even when the test fails.
    sock0.close()
    sock1.close()
    threadBridge.join(timeout=3)

assert not threadBridge.is_alive()
=bridge and sniff setup vcan1 package remove
sock0 = CANSocket(bustype='socketcan', channel='vcan0')
sock1 = CANSocket(bustype='socketcan', channel='vcan1')
bridgeStarted = threading.Event()

def bridgeWithRemovePackageFromVCan1ToVCan0():
    """Run bridge_and_sniff with an xfrm21 hook that returns False for identifier 0x10050000."""
    def pnr(pkt):
        # False instead of the packet for the one identifier under test.
        return False if pkt.identifier == 0x10050000 else pkt
    # 'with' closes both bridge sockets even if bridge_and_sniff raises.
    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm21=pnr,
                         timeout=0.5, started_callback=bridgeStarted.set, count=4)

threadBridge = threading.Thread(target=bridgeWithRemovePackageFromVCan1ToVCan0)
threadBridge.start()
try:
    bridgeStarted.wait(timeout=1)
    for ident in (0x10010000, 0x10010000, 0x10050000, 0x10010000):
        sock1.send(CAN(flags='extended', identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06'))
    packetsVCan0 = sock0.sniff(timeout=0.5, count=3)
    assert len(packetsVCan0) == 3
finally:
    # Release the sockets and wait for the bridge thread even when the test fails.
    sock0.close()
    sock1.close()
    threadBridge.join(timeout=3)

assert not threadBridge.is_alive()
=bridge and sniff setup vcan0 and vcan1 package remove both directions
sock0 = CANSocket(bustype='socketcan', channel='vcan0')
sock1 = CANSocket(bustype='socketcan', channel='vcan1')
bridgeStarted = threading.Event()

def bridgeWithRemovePackageInBothDirections():
    """Run bridge_and_sniff with one dropping hook per direction (pnrA as xfrm12, pnrB as xfrm21)."""
    def pnrA(pkt):
        # False instead of the packet for identifier 0x10020000.
        return False if pkt.identifier == 0x10020000 else pkt
    def pnrB(pkt):
        # False instead of the packet for identifier 0x10050000.
        return False if pkt.identifier == 0x10050000 else pkt
    # 'with' closes both bridge sockets even if bridge_and_sniff raises.
    with CANSocket(bustype='socketcan', channel='vcan0') as bSock0, \
            CANSocket(bustype='socketcan', channel='vcan1') as bSock1:
        bSock0.timeout = 0.01
        bSock1.timeout = 0.01
        bridge_and_sniff(if1=bSock0, if2=bSock1, xfrm12=pnrA, xfrm21=pnrB,
                         timeout=0.5, started_callback=bridgeStarted.set, count=10)

threadBridge = threading.Thread(target=bridgeWithRemovePackageInBothDirections)
threadBridge.start()
try:
    bridgeStarted.wait(timeout=1)
    for ident in (0x10010000, 0x10020000, 0x10000000, 0x10030000, 0x10040000, 0x10000000):
        sock0.send(CAN(flags='extended', identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))
    for ident in (0x10010000, 0x10010000, 0x10050000, 0x10010000):
        sock1.send(CAN(flags='extended', identifier=ident, length=8, data=b'\x01\x02\x03\x04\x05\x04\x05\x06'))
    packetsVCan0 = sock0.sniff(timeout=0.5, count=3)
    packetsVCan1 = sock1.sniff(timeout=0.5, count=5)
    assert len(packetsVCan0) == 3
    assert len(packetsVCan1) == 5
finally:
    # Release the sockets and wait for the bridge thread even when the test fails.
    sock0.close()
    sock1.close()
    threadBridge.join(timeout=3)

assert not threadBridge.is_alive()
= Delete vcan interfaces
~ needs_root linux vcan_socket
# Try to delete BOTH interfaces before reporting: a failure on vcan0 must not
# leave vcan1 behind for the next run.
not_deleted = [ifname for ifname in ("vcan0", "vcan1")
               if 0 != call(["sudo", "ip", "link", "delete", ifname])]
if not_deleted:
    raise Exception("%s could not be deleted" % ", ".join(not_deleted))