add drain support and a few tool options
diff --git a/apps/bench.py b/apps/bench.py
index b99cc11..1caea9f 100644
--- a/apps/bench.py
+++ b/apps/bench.py
@@ -80,10 +80,10 @@
 SPEED_RX_UUID = '016A2CC7-E14B-4819-935F-1F56EAE4098D'
 
 DEFAULT_RFCOMM_UUID = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'
-DEFAULT_L2CAP_PSM = 1234
+DEFAULT_L2CAP_PSM = 128
 DEFAULT_L2CAP_MAX_CREDITS = 128
 DEFAULT_L2CAP_MTU = 1024
-DEFAULT_L2CAP_MPS = 1022
+DEFAULT_L2CAP_MPS = 1024
 
 DEFAULT_LINGER_TIME = 1.0
 DEFAULT_POST_CONNECTION_WAIT_TIME = 1.0
@@ -240,6 +240,23 @@
     return 0
 
 
+def log_stats(title, stats):
+    stats_min = min(stats)
+    stats_max = max(stats)
+    stats_avg = sum(stats) / len(stats)
+    logging.info(
+        color(
+            (
+                f'### {title} stats: '
+                f'min={stats_min:.2f}, '
+                f'max={stats_max:.2f}, '
+                f'average={stats_avg:.2f}'
+            ),
+            'cyan',
+        )
+    )
+
+
 class PacketType(enum.IntEnum):
     RESET = 0
     SEQUENCE = 1
@@ -253,14 +270,27 @@
 # Sender
 # -----------------------------------------------------------------------------
 class Sender:
-    def __init__(self, packet_io, start_delay, packet_size, packet_count):
+    def __init__(
+        self,
+        packet_io,
+        start_delay,
+        repeat,
+        repeat_delay,
+        pace,
+        packet_size,
+        packet_count,
+    ):
         self.tx_start_delay = start_delay
         self.tx_packet_size = packet_size
         self.tx_packet_count = packet_count
         self.packet_io = packet_io
         self.packet_io.packet_listener = self
+        self.repeat = repeat
+        self.repeat_delay = repeat_delay
+        self.pace = pace
         self.start_time = 0
         self.bytes_sent = 0
+        self.stats = []
         self.done = asyncio.Event()
 
     def reset(self):
@@ -271,27 +301,57 @@
         await self.packet_io.ready.wait()
         logging.info(color('--- Go!', 'blue'))
 
-        if self.tx_start_delay:
-            logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
-            await asyncio.sleep(self.tx_start_delay)
+        for run in range(self.repeat + 1):
+            self.done.clear()
 
-        logging.info(color('=== Sending RESET', 'magenta'))
-        await self.packet_io.send_packet(bytes([PacketType.RESET]))
-        self.start_time = time.time()
-        for tx_i in range(self.tx_packet_count):
-            packet_flags = PACKET_FLAG_LAST if tx_i == self.tx_packet_count - 1 else 0
-            packet = struct.pack(
-                '>bbI',
-                PacketType.SEQUENCE,
-                packet_flags,
-                tx_i,
-            ) + bytes(self.tx_packet_size - 6)
-            logging.info(color(f'Sending packet {tx_i}: {len(packet)} bytes', 'yellow'))
-            self.bytes_sent += len(packet)
-            await self.packet_io.send_packet(packet)
+            if run > 0 and self.repeat and self.repeat_delay:
+                logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green'))
+                await asyncio.sleep(self.repeat_delay)
 
-        await self.done.wait()
-        logging.info(color('=== Done!', 'magenta'))
+            if self.tx_start_delay:
+                logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
+                await asyncio.sleep(self.tx_start_delay)
+
+            logging.info(color('=== Sending RESET', 'magenta'))
+            await self.packet_io.send_packet(bytes([PacketType.RESET]))
+            self.start_time = time.time()
+            self.bytes_sent = 0
+            for tx_i in range(self.tx_packet_count):
+                packet_flags = (
+                    PACKET_FLAG_LAST if tx_i == self.tx_packet_count - 1 else 0
+                )
+                packet = struct.pack(
+                    '>bbI',
+                    PacketType.SEQUENCE,
+                    packet_flags,
+                    tx_i,
+                ) + bytes(self.tx_packet_size - 6 - self.packet_io.overhead_size)
+                logging.info(
+                    color(
+                        f'Sending packet {tx_i}: {self.tx_packet_size} bytes', 'yellow'
+                    )
+                )
+                self.bytes_sent += len(packet)
+                await self.packet_io.send_packet(packet)
+
+                if self.pace is None:
+                    continue
+
+                if self.pace > 0:
+                    await asyncio.sleep(self.pace / 1000)
+                else:
+                    await self.packet_io.drain()
+
+            await self.done.wait()
+
+            run_counter = f'[{run + 1} of {self.repeat + 1}]' if self.repeat else ''
+            logging.info(color(f'=== {run_counter} Done!', 'magenta'))
+
+            if self.repeat:
+                log_stats('Run', self.stats)
+
+        if self.repeat:
+            logging.info(color('--- End of runs', 'blue'))
 
     def on_packet_received(self, packet):
         try:
@@ -302,6 +362,7 @@
         if packet_type == PacketType.ACK:
             elapsed = time.time() - self.start_time
             average_tx_speed = self.bytes_sent / elapsed
+            self.stats.append(average_tx_speed)
             logging.info(
                 color(
                     f'@@@ Received ACK. Speed: average={average_tx_speed:.4f}'
@@ -320,17 +381,17 @@
     start_timestamp: float
     last_timestamp: float
 
-    def __init__(self, packet_io):
+    def __init__(self, packet_io, linger):
         self.reset()
         self.packet_io = packet_io
         self.packet_io.packet_listener = self
+        self.linger = linger
         self.done = asyncio.Event()
 
     def reset(self):
         self.expected_packet_index = 0
-        self.start_timestamp = 0.0
-        self.last_timestamp = 0.0
-        self.bytes_received = 0
+        self.measurements = [(time.time(), 0)]
+        self.total_bytes_received = 0
 
     def on_packet_received(self, packet):
         try:
@@ -338,12 +399,9 @@
         except ValueError:
             return
 
-        now = time.time()
-
         if packet_type == PacketType.RESET:
             logging.info(color('=== Received RESET', 'magenta'))
             self.reset()
-            self.start_timestamp = now
             return
 
         try:
@@ -352,7 +410,8 @@
             return
         logging.info(
             f'<<< Received packet {packet_index}: '
-            f'flags=0x{packet_flags:02X}, {len(packet)} bytes'
+            f'flags=0x{packet_flags:02X}, '
+            f'{len(packet) + self.packet_io.overhead_size} bytes'
         )
 
         if packet_index != self.expected_packet_index:
@@ -363,19 +422,27 @@
                 )
             )
 
-        elapsed_since_start = now - self.start_timestamp
-        elapsed_since_last = now - self.last_timestamp
-        self.bytes_received += len(packet)
+        now = time.time()
+        elapsed_since_start = now - self.measurements[0][0]
+        elapsed_since_last = now - self.measurements[-1][0]
+        self.measurements.append((now, len(packet)))
+        self.total_bytes_received += len(packet)
         instant_rx_speed = len(packet) / elapsed_since_last
-        average_rx_speed = self.bytes_received / elapsed_since_start
+        average_rx_speed = self.total_bytes_received / elapsed_since_start
+        window = self.measurements[-64:]
+        windowed_rx_speed = sum(measurement[1] for measurement in window[1:]) / (
+            window[-1][0] - window[0][0]
+        )
         logging.info(
             color(
-                f'Speed: instant={instant_rx_speed:.4f}, average={average_rx_speed:.4f}',
+                'Speed: '
+                f'instant={instant_rx_speed:.4f}, '
+                f'windowed={windowed_rx_speed:.4f}, '
+                f'average={average_rx_speed:.4f}',
                 'yellow',
             )
         )
 
-        self.last_timestamp = now
         self.expected_packet_index = packet_index + 1
 
         if packet_flags & PACKET_FLAG_LAST:
@@ -385,7 +452,8 @@
                 )
             )
             logging.info(color('@@@ Received last packet', 'green'))
-            self.done.set()
+            if not self.linger:
+                self.done.set()
 
     async def run(self):
         await self.done.wait()
@@ -396,16 +464,31 @@
 # Ping
 # -----------------------------------------------------------------------------
 class Ping:
-    def __init__(self, packet_io, start_delay, packet_size, packet_count):
+    def __init__(
+        self,
+        packet_io,
+        start_delay,
+        repeat,
+        repeat_delay,
+        pace,
+        packet_size,
+        packet_count,
+    ):
         self.tx_start_delay = start_delay
         self.tx_packet_size = packet_size
         self.tx_packet_count = packet_count
         self.packet_io = packet_io
         self.packet_io.packet_listener = self
+        self.repeat = repeat
+        self.repeat_delay = repeat_delay
+        self.pace = pace
         self.done = asyncio.Event()
         self.current_packet_index = 0
         self.ping_sent_time = 0.0
         self.latencies = []
+        self.min_stats = []
+        self.max_stats = []
+        self.avg_stats = []
 
     def reset(self):
         pass
@@ -415,21 +498,53 @@
         await self.packet_io.ready.wait()
         logging.info(color('--- Go!', 'blue'))
 
-        if self.tx_start_delay:
-            logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
-            await asyncio.sleep(self.tx_start_delay)
+        for run in range(self.repeat + 1):
+            self.done.clear()
 
-        logging.info(color('=== Sending RESET', 'magenta'))
-        await self.packet_io.send_packet(bytes([PacketType.RESET]))
+            if run > 0 and self.repeat and self.repeat_delay:
+                logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green'))
+                await asyncio.sleep(self.repeat_delay)
 
-        await self.send_next_ping()
+            if self.tx_start_delay:
+                logging.info(color(f'*** Startup delay: {self.tx_start_delay}', 'blue'))
+                await asyncio.sleep(self.tx_start_delay)
 
-        await self.done.wait()
-        average_latency = sum(self.latencies) / len(self.latencies)
-        logging.info(color(f'@@@ Average latency: {average_latency:.2f}'))
-        logging.info(color('=== Done!', 'magenta'))
+            logging.info(color('=== Sending RESET', 'magenta'))
+            await self.packet_io.send_packet(bytes([PacketType.RESET]))
+
+            self.current_packet_index = 0
+            await self.send_next_ping()
+
+            await self.done.wait()
+
+            min_latency = min(self.latencies)
+            max_latency = max(self.latencies)
+            avg_latency = sum(self.latencies) / len(self.latencies)
+            logging.info(color(
+                '@@@ Latencies: '
+                f'min={min_latency:.2f}, '
+                f'max={max_latency:.2f}, '
+                f'average={avg_latency:.2f}'))
+
+            self.min_stats.append(min_latency)
+            self.max_stats.append(max_latency)
+            self.avg_stats.append(avg_latency)
+
+            run_counter = f'[{run + 1} of {self.repeat + 1}]' if self.repeat else ''
+            logging.info(color(f'=== {run_counter} Done!', 'magenta'))
+
+            if self.repeat:
+                log_stats('Min Latency', self.min_stats)
+                log_stats('Max Latency', self.max_stats)
+                log_stats('Average Latency', self.avg_stats)
+
+        if self.repeat:
+            logging.info(color('--- End of runs', 'blue'))
 
     async def send_next_ping(self):
+        if self.pace:
+            await asyncio.sleep(self.pace / 1000)
+
         packet = struct.pack(
             '>bbI',
             PacketType.SEQUENCE,
@@ -488,10 +603,11 @@
 class Pong:
     expected_packet_index: int
 
-    def __init__(self, packet_io):
+    def __init__(self, packet_io, linger):
         self.reset()
         self.packet_io = packet_io
         self.packet_io.packet_listener = self
+        self.linger = linger
         self.done = asyncio.Event()
 
     def reset(self):
@@ -536,7 +652,7 @@
             )
         )
 
-        if packet_flags & PACKET_FLAG_LAST:
+        if packet_flags & PACKET_FLAG_LAST and not self.linger:
             self.done.set()
 
     async def run(self):
@@ -554,6 +670,7 @@
         self.speed_tx = None
         self.packet_listener = None
         self.ready = asyncio.Event()
+        self.overhead_size = 0
 
     async def on_connection(self, connection):
         peer = Peer(connection)
@@ -603,6 +720,9 @@
     async def send_packet(self, packet):
         await self.speed_tx.write_value(packet)
 
+    async def drain(self):
+        pass
+
 
 # -----------------------------------------------------------------------------
 # GattServer
@@ -612,6 +732,7 @@
         self.device = device
         self.packet_listener = None
         self.ready = asyncio.Event()
+        self.overhead_size = 0
 
         # Setup the GATT service
         self.speed_tx = Characteristic(
@@ -653,6 +774,9 @@
     async def send_packet(self, packet):
         await self.device.notify_subscribers(self.speed_rx, packet)
 
+    async def drain(self):
+        pass
+
 
 # -----------------------------------------------------------------------------
 # StreamedPacketIO
@@ -664,6 +788,7 @@
         self.rx_packet = b''
         self.rx_packet_header = b''
         self.rx_packet_need = 0
+        self.overhead_size = 2
 
     def on_packet(self, packet):
         while packet:
@@ -715,6 +840,7 @@
         self.max_credits = max_credits
         self.mtu = mtu
         self.mps = mps
+        self.l2cap_channel = None
         self.ready = asyncio.Event()
 
     async def on_connection(self, connection: Connection) -> None:
@@ -736,9 +862,10 @@
             logging.info(color(f'!!! Connection failed: {error}', 'red'))
             return
 
-        l2cap_channel.sink = self.on_packet
-        l2cap_channel.on('close', self.on_l2cap_close)
         self.io_sink = l2cap_channel.write
+        self.l2cap_channel = l2cap_channel
+        l2cap_channel.on('close', self.on_l2cap_close)
+        l2cap_channel.sink = self.on_packet
 
         self.ready.set()
 
@@ -748,6 +875,10 @@
     def on_l2cap_close(self):
         logging.info(color('*** L2CAP channel closed', 'red'))
 
+    async def drain(self):
+        assert self.l2cap_channel
+        await self.l2cap_channel.drain()
+
 
 # -----------------------------------------------------------------------------
 # L2capServer
@@ -786,6 +917,7 @@
         logging.info(color(f'*** L2CAP channel: {l2cap_channel}', 'cyan'))
 
         self.io_sink = l2cap_channel.write
+        self.l2cap_channel = l2cap_channel
         l2cap_channel.on('close', self.on_l2cap_close)
         l2cap_channel.sink = self.on_packet
 
@@ -795,6 +927,10 @@
         logging.info(color('*** L2CAP channel closed', 'red'))
         self.l2cap_channel = None
 
+    async def drain(self):
+        assert self.l2cap_channel
+        await self.l2cap_channel.drain()
+
 
 # -----------------------------------------------------------------------------
 # RfcommClient
@@ -805,6 +941,7 @@
         self.device = device
         self.channel = channel
         self.uuid = uuid
+        self.rfcomm_session = None
         self.ready = asyncio.Event()
 
     async def on_connection(self, connection):
@@ -840,12 +977,17 @@
 
         rfcomm_session.sink = self.on_packet
         self.io_sink = rfcomm_session.write
+        self.rfcomm_session = rfcomm_session
 
         self.ready.set()
 
     def on_disconnection(self, _):
         pass
 
+    async def drain(self):
+        assert self.rfcomm_session
+        await self.rfcomm_session.drain()
+
 
 # -----------------------------------------------------------------------------
 # RfcommServer
@@ -853,6 +995,7 @@
 class RfcommServer(StreamedPacketIO):
     def __init__(self, device, channel):
         super().__init__()
+        self.dlc = None
         self.ready = asyncio.Event()
 
         # Create and register a server
@@ -881,6 +1024,11 @@
         logging.info(color(f'*** DLC connected: {dlc}', 'blue'))
         dlc.sink = self.on_packet
         self.io_sink = dlc.write
+        self.dlc = dlc
+
+    async def drain(self):
+        assert self.dlc
+        await self.dlc.drain()
 
 
 # -----------------------------------------------------------------------------
@@ -1030,6 +1178,7 @@
 
             await role.run()
             await asyncio.sleep(DEFAULT_LINGER_TIME)
+            await self.connection.disconnect()
 
     def on_disconnection(self, reason):
         logging.info(color(f'!!! Disconnection: reason={reason}', 'red'))
@@ -1120,12 +1269,8 @@
 
         # Stop being discoverable and connectable
         if self.classic:
-
-            async def stop_being_discoverable_connectable():
-                await self.device.set_discoverable(False)
-                await self.device.set_connectable(False)
-
-            AsyncRunner.spawn(stop_being_discoverable_connectable())
+            AsyncRunner.spawn(self.device.set_discoverable(False))
+            AsyncRunner.spawn(self.device.set_connectable(False))
 
         # Request a new data length if needed
         if self.extended_data_length:
@@ -1141,6 +1286,10 @@
         self.connection = None
         self.role.reset()
 
+        if self.classic:
+            AsyncRunner.spawn(self.device.set_discoverable(True))
+            AsyncRunner.spawn(self.device.set_connectable(True))
+
     def on_connection_parameters_update(self):
         print_connection(self.connection)
 
@@ -1168,10 +1317,22 @@
             return GattServer(device)
 
         if mode == 'l2cap-client':
-            return L2capClient(device, psm=ctx.obj['l2cap_psm'])
+            return L2capClient(
+                device,
+                psm=ctx.obj['l2cap_psm'],
+                mtu=ctx.obj['l2cap_mtu'],
+                mps=ctx.obj['l2cap_mps'],
+                max_credits=ctx.obj['l2cap_max_credits'],
+            )
 
         if mode == 'l2cap-server':
-            return L2capServer(device, psm=ctx.obj['l2cap_psm'])
+            return L2capServer(
+                device,
+                psm=ctx.obj['l2cap_psm'],
+                mtu=ctx.obj['l2cap_mtu'],
+                mps=ctx.obj['l2cap_mps'],
+                max_credits=ctx.obj['l2cap_max_credits'],
+            )
 
         if mode == 'rfcomm-client':
             return RfcommClient(
@@ -1197,23 +1358,29 @@
             return Sender(
                 packet_io,
                 start_delay=ctx.obj['start_delay'],
+                repeat=ctx.obj['repeat'],
+                repeat_delay=ctx.obj['repeat_delay'],
+                pace=ctx.obj['pace'],
                 packet_size=ctx.obj['packet_size'],
                 packet_count=ctx.obj['packet_count'],
             )
 
         if role == 'receiver':
-            return Receiver(packet_io)
+            return Receiver(packet_io, ctx.obj['linger'])
 
         if role == 'ping':
             return Ping(
                 packet_io,
                 start_delay=ctx.obj['start_delay'],
+                repeat=ctx.obj['repeat'],
+                repeat_delay=ctx.obj['repeat_delay'],
+                pace=ctx.obj['pace'],
                 packet_size=ctx.obj['packet_size'],
                 packet_count=ctx.obj['packet_count'],
             )
 
         if role == 'pong':
-            return Pong(packet_io)
+            return Pong(packet_io, ctx.obj['linger'])
 
         raise ValueError('invalid role')
 
@@ -1267,12 +1434,30 @@
     help='L2CAP PSM to use',
 )
 @click.option(
+    '--l2cap-mtu',
+    type=int,
+    default=DEFAULT_L2CAP_MTU,
+    help='L2CAP MTU to use',
+)
[email protected](
+    '--l2cap-mps',
+    type=int,
+    default=DEFAULT_L2CAP_MPS,
+    help='L2CAP MPS to use',
+)
[email protected](
+    '--l2cap-max-credits',
+    type=int,
+    default=DEFAULT_L2CAP_MAX_CREDITS,
+    help='L2CAP maximum number of credits allowed for the peer',
+)
[email protected](
     '--packet-size',
     '-s',
     metavar='SIZE',
     type=click.IntRange(8, 4096),
     default=500,
-    help='Packet size (server role)',
+    help='Packet size (client or ping role)',
 )
 @click.option(
     '--packet-count',
@@ -1280,7 +1465,7 @@
     metavar='COUNT',
     type=int,
     default=10,
-    help='Packet count (server role)',
+    help='Packet count (client or ping role)',
 )
 @click.option(
     '--start-delay',
@@ -1288,7 +1473,39 @@
     metavar='SECONDS',
     type=int,
     default=1,
-    help='Start delay (server role)',
+    help='Start delay (client or ping role)',
+)
[email protected](
+    '--repeat',
+    metavar='N',
+    type=int,
+    default=0,
+    help=(
+        'Repeat the run N times (client and ping roles) '
+        '(0, which is the default, to run just once) '
+    ),
+)
[email protected](
+    '--repeat-delay',
+    metavar='SECONDS',
+    type=int,
+    default=1,
+    help=('Delay, in seconds, between repeats'),
+)
[email protected](
+    '--pace',
+    metavar='MILLISECONDS',
+    type=int,
+    default=0,
+    help=(
+        'Wait N milliseconds between packets '
+        '(0, which is the default, to send as fast as possible) '
+    ),
+)
[email protected](
+    '--linger',
+    is_flag=True,
+    help="Don't exit at the end of a run (server and pong roles)",
 )
 @click.pass_context
 def bench(
@@ -1301,9 +1518,16 @@
     packet_size,
     packet_count,
     start_delay,
+    repeat,
+    repeat_delay,
+    pace,
+    linger,
     rfcomm_channel,
     rfcomm_uuid,
     l2cap_psm,
+    l2cap_mtu,
+    l2cap_mps,
+    l2cap_max_credits,
 ):
     ctx.ensure_object(dict)
     ctx.obj['device_config'] = device_config
@@ -1313,9 +1537,16 @@
     ctx.obj['rfcomm_channel'] = rfcomm_channel
     ctx.obj['rfcomm_uuid'] = rfcomm_uuid
     ctx.obj['l2cap_psm'] = l2cap_psm
+    ctx.obj['l2cap_mtu'] = l2cap_mtu
+    ctx.obj['l2cap_mps'] = l2cap_mps
+    ctx.obj['l2cap_max_credits'] = l2cap_max_credits
     ctx.obj['packet_size'] = packet_size
     ctx.obj['packet_count'] = packet_count
     ctx.obj['start_delay'] = start_delay
+    ctx.obj['repeat'] = repeat
+    ctx.obj['repeat_delay'] = repeat_delay
+    ctx.obj['pace'] = pace
+    ctx.obj['linger'] = linger
 
     ctx.obj['extended_data_length'] = (
         [int(x) for x in extended_data_length.split('/')]