| #!/usr/bin/env python3 |
| # Copyright (c) PLUMgrid, Inc. |
| # Licensed under the Apache License, Version 2.0 (the "License") |
| |
| from bcc import BPF |
| import ctypes as ct |
| import random |
| import time |
| import subprocess |
| from bcc.utils import get_online_cpus |
| from unittest import main, TestCase |
| |
class TestArray(TestCase):
    """Tests for BPF_ARRAY tables and BPF_PERF_OUTPUT perf buffers."""

    def test_simple(self):
        """Store two values with ctypes keys and read them back via items()."""
        b = BPF(text=b"""BPF_ARRAY(table1, u64, 128);""")
        t1 = b[b"table1"]
        t1[ct.c_int(0)] = ct.c_ulonglong(100)
        t1[ct.c_int(127)] = ct.c_ulonglong(1000)
        for i, v in t1.items():
            if i.value == 0:
                self.assertEqual(v.value, 100)
            if i.value == 127:
                self.assertEqual(v.value, 1000)
        self.assertEqual(len(t1), 128)

    def test_native_type(self):
        """Index the array with plain Python ints, including negative ones.

        The test expects ``t1[-2]`` to read back what was stored at ``-2``
        and ``t1[-1]`` to hold the same value as ``t1[127]``.
        """
        b = BPF(text=b"""BPF_ARRAY(table1, u64, 128);""")
        t1 = b[b"table1"]
        t1[0] = ct.c_ulonglong(100)
        t1[-2] = ct.c_ulonglong(37)
        t1[127] = ct.c_ulonglong(1000)
        for i, v in t1.items():
            if i.value == 0:
                self.assertEqual(v.value, 100)
            if i.value == 127:
                self.assertEqual(v.value, 1000)
        self.assertEqual(len(t1), 128)
        self.assertEqual(t1[-2].value, 37)
        self.assertEqual(t1[-1].value, t1[127].value)

    def test_perf_buffer(self):
        """Expect at least one perf event after a child process sleeps.

        A kprobe on the nanosleep and clock_nanosleep syscalls submits a
        timestamp to the ``events`` perf buffer; the callback counts events.
        """
        self.counter = 0

        class Data(ct.Structure):
            _fields_ = [("ts", ct.c_ulonglong)]

        def cb(cpu, data, size):
            # NOTE(review): the test requires the reported size to be strictly
            # larger than the struct -- presumably perf sample padding; confirm.
            self.assertGreater(size, ct.sizeof(Data))
            self.counter += 1

        def lost_cb(lost):
            self.assertGreater(lost, 0)

        text = b"""
BPF_PERF_OUTPUT(events);
int do_sys_nanosleep(void *ctx) {
    struct {
        u64 ts;
    } data = {bpf_ktime_get_ns()};
    events.perf_submit(ctx, &data, sizeof(data));
    return 0;
}
"""
        b = BPF(text=text)
        try:
            b.attach_kprobe(event=b.get_syscall_fnname(b"nanosleep"),
                            fn_name=b"do_sys_nanosleep")
            b.attach_kprobe(event=b.get_syscall_fnname(b"clock_nanosleep"),
                            fn_name=b"do_sys_nanosleep")
            b[b"events"].open_perf_buffer(cb, lost_cb=lost_cb)
            subprocess.call(['sleep', '0.1'])
            b.perf_buffer_poll()
            self.assertGreater(self.counter, 0)
        finally:
            # Always release the BPF resources, even when an assertion or the
            # attach/poll calls above fail.
            b.cleanup()

    def test_perf_buffer_for_each_cpu(self):
        """Expect at least one perf event per online CPU.

        ``sleep`` is pinned to each online CPU in turn with ``taskset`` and
        the received events are collected in ``self.events``.
        """
        self.events = []

        class Data(ct.Structure):
            _fields_ = [("cpu", ct.c_ulonglong)]

        def cb(cpu, data, size):
            # NOTE(review): the test requires the reported size to be strictly
            # larger than the struct -- presumably perf sample padding; confirm.
            self.assertGreater(size, ct.sizeof(Data))
            event = ct.cast(data, ct.POINTER(Data)).contents
            self.events.append(event)

        def lost_cb(lost):
            self.assertGreater(lost, 0)

        text = b"""
BPF_PERF_OUTPUT(events);
int do_sys_nanosleep(void *ctx) {
    struct {
        u64 cpu;
    } data = {bpf_get_smp_processor_id()};
    events.perf_submit(ctx, &data, sizeof(data));
    return 0;
}
"""
        b = BPF(text=text)
        try:
            b.attach_kprobe(event=b.get_syscall_fnname(b"nanosleep"),
                            fn_name=b"do_sys_nanosleep")
            b.attach_kprobe(event=b.get_syscall_fnname(b"clock_nanosleep"),
                            fn_name=b"do_sys_nanosleep")
            b[b"events"].open_perf_buffer(cb, lost_cb=lost_cb)
            online_cpus = get_online_cpus()
            for cpu in online_cpus:
                subprocess.call(['taskset', '-c', str(cpu), 'sleep', '0.1'])
            b.perf_buffer_poll()
        finally:
            # Always release the BPF resources, even if attaching or polling
            # raised.
            b.cleanup()
        self.assertGreaterEqual(
            len(self.events), len(online_cpus),
            'Received only {}/{} events'.format(len(self.events),
                                                len(online_cpus)))
| |
# Script entry point: run the test cases through unittest's runner when this
# file is executed directly rather than imported.
if __name__ == "__main__":
    main()