| ######################## |
| % Pipetool related tests |
| ######################## |
| |
| + Basic tests |
| |
| = Test default test case |
| |
| s = PeriodicSource("hello", 1, name="src") |
| d1 = Drain(name="d1") |
| c = ConsoleSink(name="c") |
| tf = TransformDrain(lambda x: "Got %s" % x) |
| s > d1 > c |
| d1 > tf |
| try: |
| t = TermSink(name="PipeToolsPeriodicTest", keepterm=False) |
| tf > t |
| except (IOError, OSError): |
| pass |
| |
| p = PipeEngine(s) |
| p.start() |
| time.sleep(3) |
| s.msg = [] |
| p.stop() |
| |
| = Test add_pipe |
| |
| s = AutoSource() |
| p = PipeEngine(s) |
| p.add(Pipe()) |
| assert len(p.active_pipes) == 2 |
| |
| x = p.spawn_Pipe() |
| assert len(p.active_pipes) == 3 |
| assert isinstance(x, Pipe) |
| |
| = Test exhausted source |
| |
| s = AutoSource() |
| s._gen_data("hello") |
| s.is_exhausted = True |
| d1 = Drain(name="d1") |
| c = ConsoleSink(name="c") |
| s > d1 > c |
| |
| p = PipeEngine(s) |
| p.start() |
| p.wait_and_stop() |
| |
| = Test add_pipe on running instance |
| |
| p = PipeEngine() |
| p.start() |
| |
| s = CLIFeeder() |
| |
| d1 = Drain(name="d1") |
| c = QueueSink(name="c") |
| s > d1 > c |
| |
| p.add(s) |
| |
| s.send("hello") |
| s.send("hi") |
| |
| assert c.q.get(timeout=5) == "hello" |
| assert c.q.get(timeout=5) == "hi" |
| |
| p.stop() |
| |
| = Test Operators |
| |
| s = AutoSource() |
| p = PipeEngine(s) |
| assert p == p |
| |
| a = AutoSource() |
| b = AutoSource() |
| a >> b |
| assert len(a.high_sinks) == 1 |
| assert len(a.high_sources) == 0 |
| assert len(b.high_sinks) == 0 |
| assert len(b.high_sources) == 1 |
| a |
| b |
| |
| a = Sink() |
| b = AutoSource() |
| a << b |
| assert len(a.high_sinks) == 0 |
| assert len(a.high_sources) == 1 |
| assert len(b.high_sinks) == 1 |
| assert len(b.high_sources) == 0 |
| a |
| b |
| |
| a = Sink() |
| b = Sink() |
| a % b |
| assert len(a.sinks) == 1 |
| assert len(a.sources) == 1 |
| assert len(b.sinks) == 1 |
| assert len(b.sources) == 1 |
| |
| a = Sink() |
| b = Sink() |
| a//b |
| assert len(a.high_sinks) == 1 |
| assert len(a.high_sources) == 1 |
| assert len(b.high_sinks) == 1 |
| assert len(b.high_sources) == 1 |
| |
| a = AutoSource() |
| b = Sink() |
| a^b |
| assert len(b.trigger_sources) == 1 |
| assert len(a.trigger_sinks) == 1 |
| |
| = Test doc |
| |
| s = AutoSource() |
| p = PipeEngine(s) |
| p.list_pipes() |
| p.list_pipes_detailed() |
| |
| = Test RawConsoleSink with CLIFeeder |
| |
| p = PipeEngine() |
| |
| s = CLIFeeder() |
| s.send("hello") |
| s.is_exhausted = True |
| |
| r, w = os.pipe() |
| |
| d1 = Drain(name="d1") |
| c = RawConsoleSink(name="c") |
| c._write_pipe = w |
| s > d1 > c |
| |
| p.add(s) |
| p.start() |
| |
| assert os.read(r, 20) == b"hello\n" |
| p.wait_and_stop() |
| |
| = Test QueueSink with CLIFeeder |
| |
| p = PipeEngine() |
| |
| s = CLIFeeder() |
| s.send("hello") |
| s.is_exhausted = True |
| |
| d1 = Drain(name="d1") |
| c = QueueSink(name="c") |
| s > d1 > c |
| |
| p.add(s) |
| p.start() |
| |
| p.wait_and_stop() |
| assert c.recv() == "hello" |
| assert c.recv(block=False) is None |
| |
| = Test UpDrain |
| |
| test_val = None |
| |
| class TestSink(Sink): |
| def high_push(self, msg): |
| global test_val |
| test_val = msg |
| |
| p = PipeEngine() |
| |
| s = CLIFeeder() |
| s.send("hello") |
| s.is_exhausted = True |
| |
| d1 = UpDrain(name="d1") |
| c = TestSink(name="c") |
| s > d1 |
| d1 >> c |
| |
| p.add(s) |
| p.start() |
| |
| p.wait_and_stop() |
| assert test_val == "hello" |
| |
| = Test DownDrain |
| |
| test_val = None |
| |
| class TestSink(Sink): |
| def push(self, msg): |
| global test_val |
| test_val = msg |
| |
| p = PipeEngine() |
| |
| s = CLIHighFeeder() |
| s.send("hello") |
| s.is_exhausted = True |
| |
| d1 = DownDrain(name="d1") |
| c = TestSink(name="c") |
| s >> d1 |
| d1 > c |
| |
| p.add(s) |
| p.start() |
| |
| p.wait_and_stop() |
| assert test_val == "hello" |
| |
| = Test PeriodicSource exhaustion |
| |
| s = PeriodicSource("", 1) |
| s.msg = [] |
| p = PipeEngine(s) |
| p.start() |
| p.wait_and_stop() |
| |
| + Advanced ScapyPipes pipetools tests |
| |
| = Test SniffSource |
| |
| from unittest import mock |
| fd = ObjectPipe("sniffsource") |
| fd.write("test") |
| |
| @mock.patch("scapy.scapypipes.conf.L2listen") |
| def _test(l2listen): |
| l2listen.return_value=Bunch(close=lambda *args: None, fileno=lambda: fd.fileno(), recv=lambda *args: Raw("data")) |
| p = PipeEngine() |
| s = SniffSource() |
| assert s.s is None |
| d1 = Drain(name="d1") |
| c = QueueSink(name="c") |
| s > d1 > c |
| p.add(s) |
| p.start() |
| x = c.q.get(2) |
| assert bytes(x) == b"data" |
| assert s.s is not None |
| p.stop() |
| |
| try: |
| _test() |
| finally: |
| fd.close() |
| |
| = Test SniffSource with socket |
| |
| fd = ObjectPipe("sniffsource_socket") |
| fd.write("test") |
| |
| class FakeSocket(object): |
| def __init__(self): |
| self.times = 0 |
| def recv(self, x=None): |
| if self.times > 2: |
| return |
| self.times += 1 |
| return Raw(b'hello') |
| def fileno(self): |
| return fd.fileno() |
| |
| try: |
| p = PipeEngine() |
| s = SniffSource(socket=FakeSocket()) |
| assert s.s is not None |
| d = Drain() |
| c = QueueSink() |
| p.add(s > d > c) |
| p.start() |
| msg = c.q.get(timeout=1) |
| p.stop() |
| assert raw(msg) == b'hello' |
| finally: |
| fd.close() |
| |
| = Test SniffSource with invalid args |
| |
| try: |
| s = SniffSource(iface='eth0', socket='not a socket') |
| except ValueError: |
| pass |
| else: |
| # expected ValueError |
| assert False |
| |
| = Test exhausted AutoSource and SniffSource |
| |
| from unittest import mock |
| from scapy.error import Scapy_Exception |
| |
| def _fail(): |
| raise Scapy_Exception() |
| |
| a = AutoSource() |
| a._send = mock.MagicMock(side_effect=_fail) |
| a.send("x") |
| try: |
| a.deliver() |
| except: |
| pass |
| |
| s = SniffSource() |
| s.s = mock.MagicMock() |
| s.s.recv = mock.MagicMock(side_effect=_fail) |
| try: |
| s.deliver() |
| except: |
| pass |
| |
| = Test WiresharkSink |
| ~ wiresharksink |
| |
| q = ObjectPipe("wiresharksink") |
| pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP() |
| |
| from unittest import mock |
| with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen: |
| sink = WiresharkSink() |
| sink.start() |
| |
| sink.push(pkt) |
| |
| q.recv() |
| q.recv() |
| assert raw(pkt) in q.recv() |
| |
| popen.assert_called_once_with( |
| [conf.prog.wireshark, '-Slki', '-'], stdin=subprocess.PIPE, stdout=None, |
| stderr=None) |
| |
| = Test WiresharkSink with linktype |
| ~ wiresharksink |
| |
| linktype = scapy.data.DLT_EN3MB |
| q = ObjectPipe("wiresharksink_linktype") |
| pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP() |
| |
| from unittest import mock |
| with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen: |
| sink = WiresharkSink(linktype=linktype) |
| sink.start() |
| |
| sink.push(pkt) |
| |
| chb(linktype) in q.recv() |
| q.recv() |
| assert raw(pkt) in q.recv() |
| |
| = Test WiresharkSink with args |
| ~ wiresharksink |
| |
| linktype = scapy.data.DLT_EN3MB |
| q = ObjectPipe("wiresharksink_args") |
| pkt = Ether(dst="aa:aa:aa:aa:aa:aa", src="bb:bb:bb:bb:bb:bb")/IP(dst="127.0.0.1", src="127.0.0.1")/ICMP() |
| |
| from unittest import mock |
| with mock.patch("scapy.scapypipes.subprocess.Popen", return_value=Bunch(stdin=q)) as popen: |
| sink = WiresharkSink(args=['-c', '1']) |
| sink.start() |
| |
| sink.push(pkt) |
| |
| popen.assert_called_once_with( |
| [conf.prog.wireshark, '-Slki', '-', '-c', '1'], |
| stdin=subprocess.PIPE, stdout=None, stderr=None) |
| |
| = Test RdpcapSource and WrpcapSink |
| |
| dname = get_temp_dir() |
| |
| req = Ether(b'E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') |
| rpy = Ether(b'\x8c\xf8\x13C5P\xdcS`\xeb\x80H\x08\x00E\x00\x00\x1c\x00\x00\x00\x004\x01\x1d\x04\xd8:\xd0\x83\xc0\xa8\x00w\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') |
| |
| wrpcap(os.path.join(dname, "t.pcap"), [req, rpy]) |
| |
| p = PipeEngine() |
| |
| s = RdpcapSource(os.path.join(dname, "t.pcap")) |
| d1 = Drain(name="d1") |
| c = WrpcapSink(os.path.join(dname, "t2.pcap.gz"), name="c", gz=1) |
| s > d1 > c |
| p.add(s) |
| p.start() |
| p.wait_and_stop() |
| |
| results = rdpcap(os.path.join(dname, "t2.pcap.gz")) |
| |
| assert raw(results[0]) == raw(req) |
| assert raw(results[1]) == raw(rpy) |
| |
| os.unlink(os.path.join(dname, "t.pcap")) |
| os.unlink(os.path.join(dname, "t2.pcap.gz")) |
| |
| = Test InjectSink and Inject3Sink |
| ~ needs_root |
| |
| from unittest import mock |
| |
| a = IP(dst="192.168.0.1")/ICMP() |
| msgs = [] |
| |
| class FakeSocket(object): |
| def __init__(self, *arg, **karg): |
| pass |
| def close(self): |
| pass |
| def send(self, msg): |
| global msgs |
| msgs.append(msg) |
| |
| @mock.patch("scapy.scapypipes.conf.L2socket", FakeSocket) |
| @mock.patch("scapy.scapypipes.conf.L3socket", FakeSocket) |
| def _inject_sink(i3): |
| s = CLIFeeder() |
| s.send(a) |
| s.is_exhausted = True |
| d1 = Drain(name="d1") |
| c = Inject3Sink() if i3 else InjectSink() |
| s > d1 > c |
| p = PipeEngine(s) |
| p.start() |
| p.wait_and_stop() |
| |
| _inject_sink(False) # InjectSink |
| _inject_sink(True) # Inject3Sink |
| |
| assert msgs == [a,a] |
| |
| = TriggerDrain and TriggeredValve with CLIFeeder |
| |
| s = CLIFeeder() |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredValve() |
| c = QueueSink() |
| |
| s > d1 > d2 > c |
| d1 ^ d2 |
| |
| p = PipeEngine(s) |
| p.start() |
| |
| s.send("hello") |
| s.send("trigger") |
| s.send("hello2") |
| s.send("trigger") |
| s.send("hello3") |
| |
| assert c.q.get(timeout=5) == "hello" |
| assert c.q.get(timeout=5) == "trigger" |
| assert c.q.get(timeout=5) == "hello3" |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredValve with CLIHighFeeder |
| |
| s = CLIHighFeeder() |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredValve() |
| c = QueueSink() |
| |
| s >> d1 |
| d1 >> d2 |
| d2 >> c |
| d1 ^ d2 |
| |
| p = PipeEngine(s) |
| p.start() |
| |
| s.send("hello") |
| s.send("trigger") |
| s.send("hello2") |
| s.send("trigger") |
| s.send("hello3") |
| |
| assert c.q.get(timeout=5) == "hello" |
| assert c.q.get(timeout=5) == "trigger" |
| assert c.q.get(timeout=5) == "hello3" |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredQueueingValve with CLIFeeder |
| |
| s = CLIFeeder() |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredValve() |
| c = QueueSink() |
| |
| s > d1 > d2 > c |
| d1 ^ d2 |
| |
| p = PipeEngine(s) |
| p.start() |
| |
| s.send("hello") |
| s.send("trigger") |
| s.send("hello2") |
| s.send("trigger") |
| s.send("hello3") |
| |
| assert c.q.get(timeout=5) == "hello" |
| assert c.q.get(timeout=5) == "trigger" |
| assert c.q.get(timeout=5) == "hello3" |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredSwitch with CLIFeeder on high channel |
| |
| s = CLIFeeder() |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredSwitch() |
| c = QueueSink() |
| |
| s > d1 > d2 |
| d2 >> c |
| d1 ^ d2 |
| |
| p = PipeEngine(s) |
| p.start() |
| |
| s.send("hello") |
| s.send("trigger") |
| s.send("hello2") |
| s.send("trigger") |
| s.send("hello3") |
| |
| assert c.q.get(timeout=5) == "trigger" |
| assert c.q.get(timeout=5) == "hello2" |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredSwitch with CLIHighFeeder on low channel |
| |
| s = CLIHighFeeder() |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredSwitch() |
| c = QueueSink() |
| |
| s >> d1 |
| d1 >> d2 |
| d2 > c |
| d1 ^ d2 |
| |
| p = PipeEngine(s) |
| p.start() |
| |
| s.send("hello") |
| s.send("trigger") |
| s.send("hello2") |
| s.send("trigger") |
| s.send("hello3") |
| |
| assert c.q.get(timeout=5) == "hello" |
| assert c.q.get(timeout=5) == "trigger" |
| assert c.q.get(timeout=5) == "hello3" |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredMessage |
| |
| s = CLIFeeder() |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredMessage("hello") |
| c = QueueSink() |
| |
| s > d1 > d2 > c |
| d1 ^ d2 |
| |
| p = PipeEngine(s) |
| p.start() |
| |
| s.send("trigger") |
| |
| r = [c.q.get(timeout=5), c.q.get(timeout=5)] |
| assert "hello" in r |
| assert "trigger" in r |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredQueueingValve on low channel |
| |
| p = PipeEngine() |
| |
| s = CLIFeeder() |
| r, w = os.pipe() |
| |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredQueueingValve() |
| c = QueueSink(name="c") |
| s > d1 > d2 > c |
| d1 ^ d2 |
| |
| p.add(s) |
| p.start() |
| |
| s.send("trigger") |
| s.send("hello") |
| s.send("trigger") |
| assert c.q.get(timeout=3) == "trigger" |
| assert c.q.get(timeout=3) in ['hello', 'trigger'] |
| assert c.q.get(timeout=3) in ['hello', 'trigger'] |
| assert d2.q.qsize() == 0 |
| |
| p.stop() |
| |
| = TriggerDrain and TriggeredQueueingValve on high channel |
| |
| p = PipeEngine() |
| |
| s = CLIHighFeeder() |
| r, w = os.pipe() |
| |
| d1 = TriggerDrain(lambda x:x=="trigger") |
| d2 = TriggeredQueueingValve() |
| c = QueueSink(name="c") |
| s >> d1 >> d2 >> c |
| d1 ^ d2 |
| |
| p.add(s) |
| p.start() |
| |
| s.send("trigger") |
| s.send("hello") |
| s.send("trigger") |
| assert c.q.get(timeout=3) == "trigger" |
| assert c.q.get(timeout=3) == "hello" |
| assert d2.q.qsize() == 0 |
| |
| p.stop() |
| |
| = UDPDrain |
| |
| p = PipeEngine() |
| |
| s = CLIFeeder() |
| s2 = CLIHighFeeder() |
| d1 = UDPDrain() |
| c = QueueSink() |
| |
| s > d1 > c |
| s2 >> d1 >> c |
| |
| p.add(s) |
| p.add(s2) |
| p.start() |
| |
| pkt = DNS() |
| |
| s.send(IP(src="127.0.0.1")/UDP()/DNS()) |
| s2.send(pkt) |
| |
| res = [c.q.get(timeout=2), c.q.get(timeout=2)] |
| assert raw(pkt) in res |
| res.remove(raw(pkt)) |
| assert DNS in res[0] and res[0][UDP].sport == 1234 |
| |
| p.stop() |
| |
| = FDSourceSink on a ObjectPipe object |
| |
| obj = ObjectPipe("fdsourcesink") |
| obj.send("hello") |
| |
| s = FDSourceSink(obj) |
| d = Drain() |
| c = QueueSink() |
| s > d > c |
| |
| s.push("data") |
| s.deliver() |
| assert c.q.get(timeout=1) == "hello" |
| |
| = UDPClientPipe and UDPServerPipe |
| ~ networking needs_root |
| |
| p = PipeEngine() |
| |
| s = CLIFeeder() |
| srv = UDPServerPipe(name="srv", port=10000) |
| cli = UDPClientPipe(name="cli", addr="127.0.0.1", port=10000) |
| c = QueueSink(name="c") |
| |
| s > cli |
| srv > c |
| |
| p.add(s, c) |
| p.start() |
| |
| s.send(b"hello") |
| p.start() |
| assert c.recv() == b"hello" |
| p.stop() |
| srv.stop() |
| |
| = TCPConnectPipe networking test |
| ~ networking needs_root |
| |
| p = PipeEngine() |
| |
| s = CLIFeeder() |
| d1 = TCPConnectPipe(addr="www.google.com", port=80) |
| c = QueueSink() |
| |
| s > d1 > c |
| |
| p.add(s) |
| p.start() |
| |
| from scapy.layers.http import HTTPRequest, HTTP |
| s.send(bytes(HTTP()/HTTPRequest(Host="www.google.com"))) |
| result = c.q.get(timeout=10) |
| p.stop() |
| |
| result |
| assert result.startswith(b"HTTP/1.1 200 OK") or result.startswith(b"HTTP/1.1 302 Found") |
| |