blob: f622f385eec710d8e36c6b6c720041cd5828343e [file] [log] [blame]
########################
% Pipetool related tests
########################
+ Basic tests
= Test default test case
s = PeriodicSource("hello", 1, name="src")
d1 = Drain(name="d1")
c = ConsoleSink(name="c")
tf = TransformDrain(lambda x: "Got %s" % x)
s > d1 > c
d1 > tf
try:
t = TermSink(name="PipeToolsPeriodicTest", keepterm=False)
tf > t
except (IOError, OSError):
pass
p = PipeEngine(s)
p.start()
time.sleep(3)
s.msg = []
p.stop()
= Test add_pipe
s = AutoSource()
p = PipeEngine(s)
p.add(Pipe())
assert len(p.active_pipes) == 2
x = p.spawn_Pipe()
assert len(p.active_pipes) == 3
assert isinstance(x, Pipe)
= Test exhausted source
s = AutoSource()
s._gen_data("hello")
s.is_exhausted = True
d1 = Drain(name="d1")
c = ConsoleSink(name="c")
s > d1 > c
p = PipeEngine(s)
p.start()
p.wait_and_stop()
= Test add_pipe on running instance
p = PipeEngine()
p.start()
s = CLIFeeder()
d1 = Drain(name="d1")
c = QueueSink(name="c")
s > d1 > c
p.add(s)
s.send("hello")
s.send("hi")
assert c.q.get(timeout=5) == "hello"
assert c.q.get(timeout=5) == "hi"
p.stop()
= Test Operators
s = AutoSource()
p = PipeEngine(s)
assert p == p
a = AutoSource()
b = AutoSource()
a >> b
assert len(a.high_sinks) == 1
assert len(a.high_sources) == 0
assert len(b.high_sinks) == 0
assert len(b.high_sources) == 1
a
b
a = Sink()
b = AutoSource()
a << b
assert len(a.high_sinks) == 0
assert len(a.high_sources) == 1
assert len(b.high_sinks) == 1
assert len(b.high_sources) == 0
a
b
a = Sink()
b = Sink()
a % b
assert len(a.sinks) == 1
assert len(a.sources) == 1
assert len(b.sinks) == 1
assert len(b.sources) == 1
a = Sink()
b = Sink()
a//b
assert len(a.high_sinks) == 1
assert len(a.high_sources) == 1
assert len(b.high_sinks) == 1
assert len(b.high_sources) == 1
a = AutoSource()
b = Sink()
a^b
assert len(b.trigger_sources) == 1
assert len(a.trigger_sinks) == 1
= Test doc
s = AutoSource()
p = PipeEngine(s)
p.list_pipes()
p.list_pipes_detailed()
= Test RawConsoleSink with CLIFeeder
p = PipeEngine()
s = CLIFeeder()
s.send("hello")
s.is_exhausted = True
r, w = os.pipe()
d1 = Drain(name="d1")
c = RawConsoleSink(name="c")
c._write_pipe = w
s > d1 > c
p.add(s)
p.start()
assert os.read(r, 20) == b"hello\n"
p.wait_and_stop()
= Test QueueSink with CLIFeeder
p = PipeEngine()
s = CLIFeeder()
s.send("hello")
s.is_exhausted = True
d1 = Drain(name="d1")
c = QueueSink(name="c")
s > d1 > c
p.add(s)
p.start()
p.wait_and_stop()
assert c.recv() == "hello"
assert c.recv(block=False) is None
= Test UpDrain
test_val = None
class TestSink(Sink):
def high_push(self, msg):
global test_val
test_val = msg
p = PipeEngine()
s = CLIFeeder()
s.send("hello")
s.is_exhausted = True
d1 = UpDrain(name="d1")
c = TestSink(name="c")
s > d1
d1 >> c
p.add(s)
p.start()
p.wait_and_stop()
assert test_val == "hello"
= Test DownDrain
test_val = None
class TestSink(Sink):
def push(self, msg):
global test_val
test_val = msg
p = PipeEngine()
s = CLIHighFeeder()
s.send("hello")
s.is_exhausted = True
d1 = DownDrain(name="d1")
c = TestSink(name="c")
s >> d1
d1 > c
p.add(s)
p.start()
p.wait_and_stop()
assert test_val == "hello"
= Test PeriodicSource exhaustion
s = PeriodicSource("", 1)
s.msg = []
p = PipeEngine(s)
p.start()
p.wait_and_stop()
+ Advanced ScapyPipes pipetools tests
= Test SniffSource
from unittest import mock
fd = ObjectPipe("sniffsource")
fd.write("test")
@mock.patch("scapy.scapypipes.conf.L2listen")
def _test(l2listen):
l2listen.return_value=Bunch(close=lambda *args: None, fileno=lambda: fd.fileno(), recv=lambda *args: Raw("data"))
p = PipeEngine()
s = SniffSource()
assert s.s is None
d1 = Drain(name="d1")
c = QueueSink(name="c")
s > d1 > c
p.add(s)
p.start()
x = c.q.get(2)
assert bytes(x) == b"data"
assert s.s is not None
p.stop()
try:
_test()
finally:
fd.close()
= Test SniffSource with socket
fd = ObjectPipe("sniffsource_socket")
fd.write("test")
class FakeSocket(object):
def __init__(self):
self.times = 0
def recv(self, x=None):
if self.times > 2:
return
self.times += 1
return Raw(b'hello')
def fileno(self):
return fd.fileno()
try:
p = PipeEngine()
s = SniffSource(socket=FakeSocket())
assert s.s is not None
d = Drain()
c = QueueSink()
p.add(s > d > c)
p.start()
msg = c.q.get(timeout=1)
p.stop()
assert raw(msg) == b'hello'
finally:
fd.close()
= Test SniffSource with invalid args
try:
s = SniffSource(iface='eth0', socket='not a socket')
except ValueError:
pass
else:
# expected ValueError
assert False
= Test exhausted AutoSource and SniffSource
from unittest import mock
from scapy.error import Scapy_Exception
def _fail():
raise Scapy_Exception()
a = AutoSource()
a._send = mock.MagicMock(side_effect=_fail)
a.send("x")
try:
a.deliver()
except:
pass
s = SniffSource()
s.s = mock.MagicMock()
s.s.recv = mock.MagicMock(side_effect=_fail)
try:
s.deliver()
except:
pass
= Test WiresharkSink
~ wiresharksink
q = ObjectPipe("wiresharksink")
pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP()
from unittest import mock
with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen:
sink = WiresharkSink()
sink.start()
sink.push(pkt)
q.recv()
q.recv()
assert raw(pkt) in q.recv()
popen.assert_called_once_with(
[conf.prog.wireshark, '-Slki', '-'], stdin=subprocess.PIPE, stdout=None,
stderr=None)
= Test WiresharkSink with linktype
~ wiresharksink
linktype = scapy.data.DLT_EN3MB
q = ObjectPipe("wiresharksink_linktype")
pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP()
from unittest import mock
with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen:
sink = WiresharkSink(linktype=linktype)
sink.start()
sink.push(pkt)
chb(linktype) in q.recv()
q.recv()
assert raw(pkt) in q.recv()
= Test WiresharkSink with args
~ wiresharksink
linktype = scapy.data.DLT_EN3MB
q = ObjectPipe("wiresharksink_args")
pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP()
from unittest import mock
with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen:
sink = WiresharkSink(args=['-c', '1'])
sink.start()
sink.push(pkt)
popen.assert_called_once_with(
[conf.prog.wireshark, '-Slki', '-', '-c', '1'],
stdin=subprocess.PIPE, stdout=None, stderr=None)
= Test RdpcapSource and WrpcapSink
dname = get_temp_dir()
req = Ether(b'E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
rpy = Ether(b'\x8c\xf8\x13C5P\xdcS`\xeb\x80H\x08\x00E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
wrpcap(os.path.join(dname, "t.pcap"), [req, rpy])
p = PipeEngine()
s = RdpcapSource(os.path.join(dname, "t.pcap"))
d1 = Drain(name="d1")
c = WrpcapSink(os.path.join(dname, "t2.pcap.gz"), name="c", gz=1)
s > d1 > c
p.add(s)
p.start()
p.wait_and_stop()
results = rdpcap(os.path.join(dname, "t2.pcap.gz"))
assert raw(results[0]) == raw(req)
assert raw(results[1]) == raw(rpy)
os.unlink(os.path.join(dname, "t.pcap"))
os.unlink(os.path.join(dname, "t2.pcap.gz"))
= Test InjectSink and Inject3Sink
~ needs_root
from unittest import mock
a = IP(dst="192.168.0.1")/ICMP()
msgs = []
class FakeSocket(object):
def __init__(self, *arg, **karg):
pass
def close(self):
pass
def send(self, msg):
global msgs
msgs.append(msg)
@mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket)
@mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket)
def _inject_sink(i3):
s = CLIFeeder()
s.send(a)
s.is_exhausted = True
d1 = Drain(name="d1")
c = Inject3Sink() if i3 else InjectSink()
s > d1 > c
p = PipeEngine(s)
p.start()
p.wait_and_stop()
_inject_sink(False) # InjectSink
_inject_sink(True) # Inject3Sink
assert msgs == [a,a]
= TriggerDrain and TriggeredValve with CLIFeeder
s = CLIFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredValve()
c = QueueSink()
s > d1 > d2 > c
d1 ^ d2
p = PipeEngine(s)
p.start()
s.send("hello")
s.send("trigger")
s.send("hello2")
s.send("trigger")
s.send("hello3")
assert c.q.get(timeout=5) == "hello"
assert c.q.get(timeout=5) == "trigger"
assert c.q.get(timeout=5) == "hello3"
p.stop()
= TriggerDrain and TriggeredValve with CLIHighFeeder
s = CLIHighFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredValve()
c = QueueSink()
s >> d1
d1 >> d2
d2 >> c
d1 ^ d2
p = PipeEngine(s)
p.start()
s.send("hello")
s.send("trigger")
s.send("hello2")
s.send("trigger")
s.send("hello3")
assert c.q.get(timeout=5) == "hello"
assert c.q.get(timeout=5) == "trigger"
assert c.q.get(timeout=5) == "hello3"
p.stop()
= TriggerDrain and TriggeredQueueingValve with CLIFeeder
s = CLIFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredValve()
c = QueueSink()
s > d1 > d2 > c
d1 ^ d2
p = PipeEngine(s)
p.start()
s.send("hello")
s.send("trigger")
s.send("hello2")
s.send("trigger")
s.send("hello3")
assert c.q.get(timeout=5) == "hello"
assert c.q.get(timeout=5) == "trigger"
assert c.q.get(timeout=5) == "hello3"
p.stop()
= TriggerDrain and TriggeredSwitch with CLIFeeder on high channel
s = CLIFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredSwitch()
c = QueueSink()
s > d1 > d2
d2 >> c
d1 ^ d2
p = PipeEngine(s)
p.start()
s.send("hello")
s.send("trigger")
s.send("hello2")
s.send("trigger")
s.send("hello3")
assert c.q.get(timeout=5) == "trigger"
assert c.q.get(timeout=5) == "hello2"
p.stop()
= TriggerDrain and TriggeredSwitch with CLIHighFeeder on low channel
s = CLIHighFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredSwitch()
c = QueueSink()
s >> d1
d1 >> d2
d2 > c
d1 ^ d2
p = PipeEngine(s)
p.start()
s.send("hello")
s.send("trigger")
s.send("hello2")
s.send("trigger")
s.send("hello3")
assert c.q.get(timeout=5) == "hello"
assert c.q.get(timeout=5) == "trigger"
assert c.q.get(timeout=5) == "hello3"
p.stop()
= TriggerDrain and TriggeredMessage
s = CLIFeeder()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredMessage("hello")
c = QueueSink()
s > d1 > d2 > c
d1 ^ d2
p = PipeEngine(s)
p.start()
s.send("trigger")
r = [c.q.get(timeout=5), c.q.get(timeout=5)]
assert "hello" in r
assert "trigger" in r
p.stop()
= TriggerDrain and TriggeredQueueingValve on low channel
p = PipeEngine()
s = CLIFeeder()
r, w = os.pipe()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredQueueingValve()
c = QueueSink(name="c")
s > d1 > d2 > c
d1 ^ d2
p.add(s)
p.start()
s.send("trigger")
s.send("hello")
s.send("trigger")
assert c.q.get(timeout=3) == "trigger"
assert c.q.get(timeout=3) in ['hello', 'trigger']
assert c.q.get(timeout=3) in ['hello', 'trigger']
assert d2.q.qsize() == 0
p.stop()
= TriggerDrain and TriggeredQueueingValve on high channel
p = PipeEngine()
s = CLIHighFeeder()
r, w = os.pipe()
d1 = TriggerDrain(lambda x:x=="trigger")
d2 = TriggeredQueueingValve()
c = QueueSink(name="c")
s >> d1 >> d2 >> c
d1 ^ d2
p.add(s)
p.start()
s.send("trigger")
s.send("hello")
s.send("trigger")
assert c.q.get(timeout=3) == "trigger"
assert c.q.get(timeout=3) == "hello"
assert d2.q.qsize() == 0
p.stop()
= UDPDrain
p = PipeEngine()
s = CLIFeeder()
s2 = CLIHighFeeder()
d1 = UDPDrain()
c = QueueSink()
s > d1 > c
s2 >> d1 >> c
p.add(s)
p.add(s2)
p.start()
pkt = DNS()
s.send(IP(src="127.0.0.1")/UDP()/DNS())
s2.send(pkt)
res = [c.q.get(timeout=2), c.q.get(timeout=2)]
assert raw(pkt) in res
res.remove(raw(pkt))
assert DNS in res[0] and res[0][UDP].sport == 1234
p.stop()
= FDSourceSink on a ObjectPipe object
obj = ObjectPipe("fdsourcesink")
obj.send("hello")
s = FDSourceSink(obj)
d = Drain()
c = QueueSink()
s > d > c
s.push("data")
s.deliver()
assert c.q.get(timeout=1) == "hello"
= UDPClientPipe and UDPServerPipe
~ networking needs_root
p = PipeEngine()
s = CLIFeeder()
srv = UDPServerPipe(name="srv", port=10000)
cli = UDPClientPipe(name="cli", addr="127.0.0.1", port=10000)
c = QueueSink(name="c")
s > cli
srv > c
p.add(s, c)
p.start()
s.send(b"hello")
p.start()
assert c.recv() == b"hello"
p.stop()
srv.stop()
= TCPConnectPipe networking test
~ networking needs_root
p = PipeEngine()
s = CLIFeeder()
d1 = TCPConnectPipe(addr="www.google.com", port=80)
c = QueueSink()
s > d1 > c
p.add(s)
p.start()
from scapy.layers.http import HTTPRequest, HTTP
s.send(bytes(HTTP()/HTTPRequest(Host="www.google.com")))
result = c.q.get(timeout=10)
p.stop()
result
assert result.startswith(b"HTTP/1.1 200 OK") or result.startswith(b"HTTP/1.1 302 Found")