Add L2CAP CoC support (squashed)
[85542e0] fix test
[3748781] add ASAH sink example
[e782e29] add app
[83daa30] wip
[7f138a0] add test
[f732108] allow different address syntax
[9d0bbf8] rename deprecated methods
[eb303d5] add LE CoC support
diff --git a/apps/gg_bridge.py b/apps/gg_bridge.py
index d524913..ac3df8d 100644
--- a/apps/gg_bridge.py
+++ b/apps/gg_bridge.py
@@ -17,13 +17,14 @@
 # -----------------------------------------------------------------------------
 import asyncio
 import os
+import struct
 import logging
 import click
 from colors import color
 
 from bumble.device import Device, Peer
 from bumble.core import AdvertisingData
-from bumble.gatt import Service, Characteristic
+from bumble.gatt import Service, Characteristic, CharacteristicValue
 from bumble.utils import AsyncRunner
 from bumble.transport import open_transport_or_link
 from bumble.hci import HCI_Constant
@@ -41,13 +42,59 @@
 
 
 # -----------------------------------------------------------------------------
-class GattlinkHubBridge(Device.Listener):
+class GattlinkL2capEndpoint:
     def __init__(self):
-        self.peer              = None
-        self.rx_socket         = None
-        self.tx_socket         = None
-        self.rx_characteristic = None
-        self.tx_characteristic = None
+        self.l2cap_channel     = None
+        self.l2cap_packet      = b''
+        self.l2cap_packet_size = 0
+
+    # Called when an L2CAP SDU has been received
+    def on_coc_sdu(self, sdu):
+        print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
+        while len(sdu):
+            if self.l2cap_packet_size == 0:
+                # Expect a new packet
+                self.l2cap_packet_size = sdu[0] + 1
+                sdu = sdu[1:]
+            else:
+                bytes_needed = self.l2cap_packet_size - len(self.l2cap_packet)
+                chunk = min(bytes_needed, len(sdu))
+                self.l2cap_packet += sdu[:chunk]
+                sdu = sdu[chunk:]
+                if len(self.l2cap_packet) == self.l2cap_packet_size:
+                    self.on_l2cap_packet(self.l2cap_packet)
+                    self.l2cap_packet = b''
+                    self.l2cap_packet_size = 0
+
+
+# -----------------------------------------------------------------------------
+class GattlinkHubBridge(GattlinkL2capEndpoint, Device.Listener):
+    def __init__(self, device, peer_address):
+        super().__init__()
+        self.device                   = device
+        self.peer_address             = peer_address
+        self.peer                     = None
+        self.tx_socket                = None
+        self.rx_characteristic        = None
+        self.tx_characteristic        = None
+        self.l2cap_psm_characteristic = None
+
+        device.listener = self
+
+    async def start(self):
+        # Connect to the peer
+        print(f'=== Connecting to {self.peer_address}...')
+        await self.device.connect(self.peer_address)
+
+    async def connect_l2cap(self, psm):
+        print(color(f'### Connecting with L2CAP on PSM = {psm}', 'yellow'))
+        try:
+            self.l2cap_channel = await self.peer.connection.open_l2cap_channel(psm)
+            print(color('*** Connected', 'yellow'), self.l2cap_channel)
+            self.l2cap_channel.sink = self.on_coc_sdu
+
+        except Exception as error:
+            print(color(f'!!! Connection failed: {error}', 'red'))
 
     @AsyncRunner.run_in_task()
     async def on_connection(self, connection):
@@ -80,15 +127,24 @@
                 self.rx_characteristic = characteristic
             elif characteristic.uuid == GG_GATTLINK_TX_CHARACTERISTIC_UUID:
                 self.tx_characteristic = characteristic
+            elif characteristic.uuid == GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID:
+                self.l2cap_psm_characteristic = characteristic
         print('RX:', self.rx_characteristic)
         print('TX:', self.tx_characteristic)
+        print('PSM:', self.l2cap_psm_characteristic)
 
-        # Subscribe to TX
-        if self.tx_characteristic:
+        if self.l2cap_psm_characteristic:
+            # Subscribe to and then read the PSM value
+            await self.peer.subscribe(self.l2cap_psm_characteristic, self.on_l2cap_psm_received)
+            psm_bytes = await self.peer.read_value(self.l2cap_psm_characteristic)
+            psm = struct.unpack('<H', psm_bytes)[0]
+            await self.connect_l2cap(psm)
+        elif self.tx_characteristic:
+            # Subscribe to TX
             await self.peer.subscribe(self.tx_characteristic, self.on_tx_received)
             print(color('=== Subscribed to Gattlink TX', 'yellow'))
         else:
-            print(color('!!! Gattlink TX not found', 'red'))
+            print(color('!!! No Gattlink TX or PSM found', 'red'))
 
     def on_connection_failure(self, error):
         print(color(f'!!! Connection failed: {error}'))
@@ -99,31 +155,23 @@
         self.rx_characteristic = None
         self.peer = None
 
+    # Called when an L2CAP packet has been received
+    def on_l2cap_packet(self, packet):
+        print(color(f'<<< [L2CAP PACKET]: {len(packet)} bytes', 'cyan'))
+        print(color('>>> [UDP]', 'magenta'))
+        self.tx_socket.sendto(packet)
+
     # Called by the GATT client when a notification is received
     def on_tx_received(self, value):
-        print(color('>>> TX:', 'magenta'), value.hex())
+        print(color(f'<<< [GATT TX]: {len(value)} bytes', 'cyan'))
         if self.tx_socket:
+            print(color('>>> [UDP]', 'magenta'))
             self.tx_socket.sendto(value)
 
     # Called by asyncio when the UDP socket is created
-    def connection_made(self, transport):
-        pass
-
-    # Called by asyncio when a UDP datagram is received
-    def datagram_received(self, data, address):
-        print(color('<<< RX:', 'magenta'), data.hex())
-
-        # TODO: use a queue instead of creating a task everytime
-        if self.peer and self.rx_characteristic:
-            asyncio.create_task(self.peer.write_value(self.rx_characteristic, data))
-
-
-# -----------------------------------------------------------------------------
-class GattlinkNodeBridge(Device.Listener):
-    def __init__(self):
-        self.peer      = None
-        self.rx_socket = None
-        self.tx_socket = None
+    def on_l2cap_psm_received(self, value):
+        psm = struct.unpack('<H', value)[0]
+        asyncio.create_task(self.connect_l2cap(psm))
 
     # Called by asyncio when the UDP socket is created
     def connection_made(self, transport):
@@ -131,21 +179,130 @@
 
     # Called by asyncio when a UDP datagram is received
     def datagram_received(self, data, address):
-        print(color('<<< RX:', 'magenta'), data.hex())
+        print(color(f'<<< [UDP]: {len(data)} bytes', 'green'))
 
-        # TODO: use a queue instead of creating a task everytime
-        if self.peer and self.rx_characteristic:
+        if self.l2cap_channel:
+            print(color('>>> [L2CAP]', 'yellow'))
+            self.l2cap_channel.write(bytes([len(data) - 1]) + data)
+        elif self.peer and self.rx_characteristic:
+            print(color('>>> [GATT RX]', 'yellow'))
             asyncio.create_task(self.peer.write_value(self.rx_characteristic, data))
 
 
 # -----------------------------------------------------------------------------
-async def run(hci_transport, device_address, send_host, send_port, receive_host, receive_port):
+class GattlinkNodeBridge(GattlinkL2capEndpoint, Device.Listener):
+    def __init__(self, device):
+        super().__init__()
+        self.device            = device
+        self.peer              = None
+        self.tx_socket         = None
+        self.tx_subscriber     = None
+        self.rx_characteristic = None
+
+        # Register as a listener
+        device.listener = self
+
+        # Listen for incoming L2CAP CoC connections
+        psm = 0xFB
+        device.register_l2cap_channel_server(0xFB, self.on_coc)
+        print(f'### Listening for CoC connection on PSM {psm}')
+
+        # Setup the Gattlink service
+        self.rx_characteristic = Characteristic(
+            GG_GATTLINK_RX_CHARACTERISTIC_UUID,
+            Characteristic.WRITE_WITHOUT_RESPONSE,
+            Characteristic.WRITEABLE,
+            CharacteristicValue(write=self.on_rx_write)
+        )
+        self.tx_characteristic = Characteristic(
+            GG_GATTLINK_TX_CHARACTERISTIC_UUID,
+            Characteristic.NOTIFY,
+            Characteristic.READABLE
+        )
+        self.tx_characteristic.on('subscription', self.on_tx_subscription)
+        self.psm_characteristic = Characteristic(
+            GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID,
+            Characteristic.READ | Characteristic.NOTIFY,
+            Characteristic.READABLE,
+            bytes([psm, 0])
+        )
+        gattlink_service = Service(
+            GG_GATTLINK_SERVICE_UUID,
+            [
+                self.rx_characteristic,
+                self.tx_characteristic,
+                self.psm_characteristic
+            ]
+        )
+        device.add_services([gattlink_service])
+        device.advertising_data = bytes(
+            AdvertisingData([
+                (AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble GG', 'utf-8')),
+                (AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS,
+                 bytes(reversed(bytes.fromhex('ABBAFF00E56A484CB8328B17CF6CBFE8'))))
+            ])
+        )
+
+    async def start(self):
+        await self.device.start_advertising()
+
+    # Called by asyncio when the UDP socket is created
+    def connection_made(self, transport):
+        self.transport = transport
+
+    # Called by asyncio when a UDP datagram is received
+    def datagram_received(self, data, address):
+        print(color(f'<<< [UDP]: {len(data)} bytes', 'green'))
+
+        if self.l2cap_channel:
+            print(color('>>> [L2CAP]', 'yellow'))
+            self.l2cap_channel.write(bytes([len(data) - 1]) + data)
+        elif self.tx_subscriber:
+            print(color('>>> [GATT TX]', 'yellow'))
+            self.tx_characteristic.value = data
+            asyncio.create_task(self.device.notify_subscribers(self.tx_characteristic))
+
+    # Called when a write to the RX characteristic has been received
+    def on_rx_write(self, connection, data):
+        print(color(f'<<< [GATT RX]: {len(data)} bytes', 'cyan'))
+        print(color('>>> [UDP]', 'magenta'))
+        self.tx_socket.sendto(data)
+
+    # Called when the subscription to the TX characteristic has changed
+    def on_tx_subscription(self, peer, enabled):
+        print(f'### [GATT TX] subscription from {peer}: {"enabled" if enabled else "disabled"}')
+        if enabled:
+            self.tx_subscriber = peer
+        else:
+            self.tx_subscriber = None
+
+    # Called when an L2CAP packet is received
+    def on_l2cap_packet(self, packet):
+        print(color(f'<<< [L2CAP PACKET]: {len(packet)} bytes', 'cyan'))
+        print(color('>>> [UDP]', 'magenta'))
+        self.tx_socket.sendto(packet)
+
+    # Called when a new connection is established
+    def on_coc(self, channel):
+        print('*** CoC Connection', channel)
+        self.l2cap_channel = channel
+        channel.sink = self.on_coc_sdu
+
+
+# -----------------------------------------------------------------------------
+async def run(hci_transport, device_address, role_or_peer_address, send_host, send_port, receive_host, receive_port):
     print('<<< connecting to HCI...')
     async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
         print('<<< connected')
 
         # Instantiate a bridge object
-        bridge = GattlinkNodeBridge()
+        device = Device.with_hci('Bumble GG', device_address, hci_source, hci_sink)
+
+        # Instantiate a bridge object
+        if role_or_peer_address == 'node':
+            bridge = GattlinkNodeBridge(device)
+        else:
+            bridge = GattlinkHubBridge(device, role_or_peer_address)
 
         # Create a UDP to RX bridge (receive from UDP, send to RX)
         loop = asyncio.get_running_loop()
@@ -160,35 +317,8 @@
             remote_addr=(send_host, send_port)
         )
 
-        # Create a device to manage the host, with a custom listener
-        device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
-        device.listener = bridge
         await device.power_on()
-
-        # Connect to the peer
-        # print(f'=== Connecting to {device_address}...')
-        # await device.connect(device_address)
-
-        # TODO move to class
-        gattlink_service = Service(
-            GG_GATTLINK_SERVICE_UUID,
-            [
-                Characteristic(
-                    GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID,
-                    Characteristic.READ,
-                    Characteristic.READABLE,
-                    bytes([193, 0])
-                )
-            ]
-        )
-        device.add_services([gattlink_service])
-        device.advertising_data = bytes(
-            AdvertisingData([
-                (AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble GG', 'utf-8')),
-                (AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, bytes(reversed(bytes.fromhex('ABBAFF00E56A484CB8328B17CF6CBFE8'))))
-            ])
-        )
-        await device.start_advertising()
+        await bridge.start()
 
         # Wait until the source terminates
         await hci_source.wait_for_termination()
@@ -197,15 +327,16 @@
 @click.command()
 @click.argument('hci_transport')
 @click.argument('device_address')
[email protected]('role_or_peer_address')
 @click.option('-sh', '--send-host', type=str, default='127.0.0.1', help='UDP host to send to')
 @click.option('-sp', '--send-port', type=int, default=9001, help='UDP port to send to')
 @click.option('-rh', '--receive-host', type=str, default='127.0.0.1', help='UDP host to receive on')
 @click.option('-rp', '--receive-port', type=int, default=9000, help='UDP port to receive on')
-def main(hci_transport, device_address, send_host, send_port, receive_host, receive_port):
-    logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
-    asyncio.run(run(hci_transport, device_address, send_host, send_port, receive_host, receive_port))
+def main(hci_transport, device_address, role_or_peer_address, send_host, send_port, receive_host, receive_port):
+    asyncio.run(run(hci_transport, device_address, role_or_peer_address, send_host, send_port, receive_host, receive_port))
 
 
 # -----------------------------------------------------------------------------
+logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
 if __name__ == '__main__':
     main()
diff --git a/apps/l2cap_bridge.py b/apps/l2cap_bridge.py
new file mode 100644
index 0000000..ba658c2
--- /dev/null
+++ b/apps/l2cap_bridge.py
@@ -0,0 +1,331 @@
+# Copyright 2021-2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -----------------------------------------------------------------------------
+# Imports
+# -----------------------------------------------------------------------------
+import asyncio
+import click
+import logging
+import os
+from colors import color
+
+from bumble.transport import open_transport_or_link
+from bumble.device import Device
+from bumble.utils import FlowControlAsyncPipe
+from bumble.hci import HCI_Constant
+
+
+# -----------------------------------------------------------------------------
+class ServerBridge:
+    """
+    L2CAP CoC server bridge: waits for a peer to connect an L2CAP CoC channel
+    on a specified PSM. When the connection is made, the bridge connects a TCP
+    socket to a remote host and bridges the data in both directions, with flow
+    control.
+    When the L2CAP CoC channel is closed, the bridge disconnects the TCP socket
+    and waits for a new L2CAP CoC channel to be connected.
+    When the TCP connection is closed by the TCP server, XXXX
+    """
+    def __init__(
+        self,
+        psm,
+        max_credits,
+        mtu,
+        mps,
+        tcp_host,
+        tcp_port
+    ):
+        self.psm         = psm
+        self.max_credits = max_credits
+        self.mtu         = mtu
+        self.mps         = mps
+        self.tcp_host    = tcp_host
+        self.tcp_port    = tcp_port
+
+    async def start(self, device):
+        # Listen for incoming L2CAP CoC connections
+        device.register_l2cap_channel_server(
+            psm         = self.psm,
+            server      = self.on_coc,
+            max_credits = self.max_credits,
+            mtu         = self.mtu,
+            mps         = self.mps
+        )
+        print(color(f'### Listening for CoC connection on PSM {self.psm}', 'yellow'))
+
+        def on_ble_connection(connection):
+            def on_ble_disconnection(reason):
+                print(color('@@@ Bluetooth disconnection:', 'red'), HCI_Constant.error_name(reason))
+
+            print(color('@@@ Bluetooth connection:', 'green'), connection)
+            connection.on('disconnection', on_ble_disconnection)
+
+        device.on('connection', on_ble_connection)
+
+        await device.start_advertising(auto_restart=True)
+
+    # Called when a new L2CAP connection is established
+    def on_coc(self, l2cap_channel):
+        print(color('*** L2CAP channel:', 'cyan'), l2cap_channel)
+
+        class Pipe:
+            def __init__(self, bridge, l2cap_channel):
+                self.bridge        = bridge
+                self.tcp_transport = None
+                self.l2cap_channel = l2cap_channel
+
+                l2cap_channel.on('close', self.on_l2cap_close)
+                l2cap_channel.sink = self.on_coc_sdu
+
+            async def connect_to_tcp(self):
+                # Connect to the TCP server
+                print(color(f'### Connecting to TCP {self.bridge.tcp_host}:{self.bridge.tcp_port}...', 'yellow'))
+
+                class TcpClientProtocol(asyncio.Protocol):
+                    def __init__(self, pipe):
+                        self.pipe = pipe
+
+                    def connection_lost(self, error):
+                        print(color(f'!!! TCP connection lost: {error}', 'red'))
+                        if self.pipe.l2cap_channel is not None:
+                            asyncio.create_task(self.pipe.l2cap_channel.disconnect())
+
+                    def data_received(self, data):
+                        print(f'<<< Received on TCP: {len(data)}')
+                        self.pipe.l2cap_channel.write(data)
+
+                try:
+                    self.tcp_transport, _ = await asyncio.get_running_loop().create_connection(
+                        lambda: TcpClientProtocol(self),
+                        host=self.bridge.tcp_host,
+                        port=self.bridge.tcp_port,
+                    )
+                    print(color('### Connected', 'green'))
+                except Exception as error:
+                    print(color(f'!!! Connection failed: {error}', 'red'))
+                    await self.l2cap_channel.disconnect()
+
+            def on_l2cap_close(self):
+                self.l2cap_channel = None
+                if self.tcp_transport is not None:
+                    self.tcp_transport.close()
+
+            def on_coc_sdu(self, sdu):
+                print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
+                if self.tcp_transport is None:
+                    print(color('!!! TCP socket not open, dropping', 'red'))
+                    return
+                self.tcp_transport.write(sdu)
+
+        pipe = Pipe(self, l2cap_channel)
+
+        asyncio.create_task(pipe.connect_to_tcp())
+
+
+# -----------------------------------------------------------------------------
+class ClientBridge:
+    """
+    L2CAP CoC client bridge: connects to a BLE device, then waits for an inbound
+    TCP connection on a specified port number. When a TCP client connects, an
+    L2CAP CoC channel connection to the BLE device is established, and the data
+    is bridged in both directions, with flow control.
+    When the TCP connection is closed by the client, the L2CAP CoC channel is
+    disconnected, but the connection to the BLE device remains, ready for a new
+    TCP client to connect.
+    When the L2CAP CoC channel is closed, XXXX
+    """
+
+    READ_CHUNK_SIZE = 4096
+
+    def __init__(
+        self,
+        psm,
+        max_credits,
+        mtu,
+        mps,
+        address,
+        tcp_host,
+        tcp_port
+    ):
+        self.psm         = psm
+        self.max_credits = max_credits
+        self.mtu         = mtu
+        self.mps         = mps
+        self.address     = address
+        self.tcp_host    = tcp_host
+        self.tcp_port    = tcp_port
+
+    async def start(self, device):
+        print(color(f'### Connecting to {self.address}...', 'yellow'))
+        connection = await device.connect(self.address)
+        print(color('### Connected', 'green'))
+
+        # Called when the BLE connection is disconnected
+        def on_ble_disconnection(reason):
+            print(color('@@@ Bluetooth disconnection:', 'red'), HCI_Constant.error_name(reason))
+
+        connection.on('disconnection', on_ble_disconnection)
+
+        # Called when a TCP connection is established
+        async def on_tcp_connection(reader, writer):
+            peername = writer.get_extra_info('peername')
+            print(color(f'<<< TCP connection from {peername}', 'magenta'))
+
+            def on_coc_sdu(sdu):
+                print(color(f'<<< [L2CAP SDU]: {len(sdu)} bytes', 'cyan'))
+                l2cap_to_tcp_pipe.write(sdu)
+
+            def on_l2cap_close():
+                print(color('*** L2CAP channel closed', 'red'))
+                l2cap_to_tcp_pipe.stop()
+                writer.close()
+
+            # Connect a new L2CAP channel
+            print(color(f'>>> Opening L2CAP channel on PSM = {self.psm}', 'yellow'))
+            try:
+                l2cap_channel = await connection.open_l2cap_channel(
+                    psm         = self.psm,
+                    max_credits = self.max_credits,
+                    mtu         = self.mtu,
+                    mps         = self.mps
+                )
+                print(color('*** L2CAP channel:', 'cyan'), l2cap_channel)
+            except Exception as error:
+                print(color(f'!!! Connection failed: {error}', 'red'))
+                writer.close()
+                return
+
+            l2cap_channel.sink = on_coc_sdu
+            l2cap_channel.on('close', on_l2cap_close)
+
+            # Start a flow control pipe from L2CAP to TCP
+            l2cap_to_tcp_pipe = FlowControlAsyncPipe(
+                l2cap_channel.pause_reading,
+                l2cap_channel.resume_reading,
+                writer.write,
+                writer.drain
+            )
+            l2cap_to_tcp_pipe.start()
+
+            # Pipe data from TCP to L2CAP
+            while True:
+                try:
+                    data = await reader.read(self.READ_CHUNK_SIZE)
+
+                    if len(data) == 0:
+                        print(color('!!! End of stream', 'red'))
+                        await l2cap_channel.disconnect()
+                        return
+
+                    print(color(f'<<< [TCP DATA]: {len(data)} bytes', 'blue'))
+                    l2cap_channel.write(data)
+                    await l2cap_channel.drain()
+                except Exception as error:
+                    print(f'!!! Exception: {error}')
+                    break
+
+            writer.close()
+            print(color('~~~ Bye bye', 'magenta'))
+
+        await asyncio.start_server(
+            on_tcp_connection,
+            host=self.tcp_host if self.tcp_host != '_' else None,
+            port=self.tcp_port
+        )
+        print(color(f'### Listening for TCP connections on port {self.tcp_port}', 'magenta'))
+
+
+# -----------------------------------------------------------------------------
+async def run(device_config, hci_transport, bridge):
+    print('<<< connecting to HCI...')
+    async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
+        print('<<< connected')
+
+        device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
+
+        # Let's go
+        await device.power_on()
+        await bridge.start(device)
+
+        # Wait until the transport terminates
+        await hci_source.wait_for_termination()
+
+
+# -----------------------------------------------------------------------------
[email protected]()
[email protected]_context
[email protected]('--device-config',         help='Device configuration file', required=True)
[email protected]('--hci-transport',         help='HCI transport', required=True)
[email protected]('--psm',                   help='PSM for L2CAP CoC', type=int, default=1234)
[email protected]('--l2cap-coc-max-credits', help='Maximum L2CAP CoC Credits', type=click.IntRange(1, 65535), default=128)
[email protected]('--l2cap-coc-mtu',         help='L2CAP CoC MTU', type=click.IntRange(23, 65535), default=1022)
[email protected]('--l2cap-coc-mps',         help='L2CAP CoC MPS', type=click.IntRange(23, 65533), default=1024)
+def cli(context, device_config, hci_transport, psm, l2cap_coc_max_credits, l2cap_coc_mtu, l2cap_coc_mps):
+    context.ensure_object(dict)
+    context.obj['device_config'] = device_config
+    context.obj['hci_transport'] = hci_transport
+    context.obj['psm']           = psm
+    context.obj['max_credits']   = l2cap_coc_max_credits
+    context.obj['mtu']           = l2cap_coc_mtu
+    context.obj['mps']           = l2cap_coc_mps
+
+
+# -----------------------------------------------------------------------------
[email protected]()
[email protected]_context
[email protected]('--tcp-host', help='TCP host', default='localhost')
[email protected]('--tcp-port', help='TCP port', default=9544)
+def server(context, tcp_host, tcp_port):
+    bridge = ServerBridge(
+        context.obj['psm'],
+        context.obj['max_credits'],
+        context.obj['mtu'],
+        context.obj['mps'],
+        tcp_host,
+        tcp_port)
+    asyncio.run(run(
+        context.obj['device_config'],
+        context.obj['hci_transport'],
+        bridge
+    ))
+
+
+# -----------------------------------------------------------------------------
[email protected]()
[email protected]_context
[email protected]('bluetooth-address')
[email protected]('--tcp-host', help='TCP host', default='_')
[email protected]('--tcp-port', help='TCP port', default=9543)
+def client(context, bluetooth_address, tcp_host, tcp_port):
+    bridge = ClientBridge(
+        context.obj['psm'],
+        context.obj['max_credits'],
+        context.obj['mtu'],
+        context.obj['mps'],
+        bluetooth_address,
+        tcp_host,
+        tcp_port
+    )
+    asyncio.run(run(
+        context.obj['device_config'],
+        context.obj['hci_transport'],
+        bridge
+    ))
+
+
+# -----------------------------------------------------------------------------
+logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
+if __name__ == '__main__':
+    cli(obj={})
diff --git a/bumble/avdtp.py b/bumble/avdtp.py
index 759e38c..7fe4fbb 100644
--- a/bumble/avdtp.py
+++ b/bumble/avdtp.py
@@ -351,7 +351,7 @@
                 logger.debug('pump canceled')
 
         # Pump packets
-        self.pump_task = asyncio.get_running_loop().create_task(pump_packets())
+        self.pump_task = asyncio.create_task(pump_packets())
 
     async def stop(self):
         # Stop the pump
@@ -1890,10 +1890,10 @@
         self.configuration = configuration
 
     def on_start_command(self):
-        asyncio.get_running_loop().create_task(self.start())
+        asyncio.create_task(self.start())
 
     def on_suspend_command(self):
-        asyncio.get_running_loop().create_task(self.stop())
+        asyncio.create_task(self.stop())
 
 
 # -----------------------------------------------------------------------------
diff --git a/bumble/device.py b/bumble/device.py
index b12fad5..50801c7 100644
--- a/bumble/device.py
+++ b/bumble/device.py
@@ -43,6 +43,13 @@
 # -----------------------------------------------------------------------------
 # Constants
 # -----------------------------------------------------------------------------
+DEVICE_MIN_SCAN_INTERVAL                      = 25
+DEVICE_MAX_SCAN_INTERVAL                      = 10240
+DEVICE_MIN_SCAN_WINDOW                        = 25
+DEVICE_MAX_SCAN_WINDOW                        = 10240
+DEVICE_MIN_LE_RSSI                            = -127
+DEVICE_MAX_LE_RSSI                            = 20
+
 DEVICE_DEFAULT_ADDRESS                        = '00:00:00:00:00:00'
 DEVICE_DEFAULT_ADVERTISING_INTERVAL           = 1000  # ms
 DEVICE_DEFAULT_ADVERTISING_DATA               = ''
@@ -62,20 +69,15 @@
 DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT = 720  # ms
 DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH       = 0   # ms
 DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH       = 0   # ms
-
-DEVICE_MIN_SCAN_INTERVAL                      = 25
-DEVICE_MAX_SCAN_INTERVAL                      = 10240
-DEVICE_MIN_SCAN_WINDOW                        = 25
-DEVICE_MAX_SCAN_WINDOW                        = 10240
-DEVICE_MIN_LE_RSSI                            = -127
-DEVICE_MAX_LE_RSSI                            = 20
+DEVICE_DEFAULT_L2CAP_COC_MTU                  = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU
+DEVICE_DEFAULT_L2CAP_COC_MPS                  = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS
+DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS          = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS
 
 
 # -----------------------------------------------------------------------------
 # Classes
 # -----------------------------------------------------------------------------
 
-
 # -----------------------------------------------------------------------------
 class Advertisement:
     TX_POWER_NOT_AVAILABLE = HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE
@@ -429,7 +431,16 @@
     def create_l2cap_connector(self, psm):
         return self.device.create_l2cap_connector(self, psm)
 
-    async def disconnect(self, reason = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR):
+    async def open_l2cap_channel(
+        self,
+        psm,
+        max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
+        mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
+        mps=DEVICE_DEFAULT_L2CAP_COC_MPS
+    ):
+        return await self.device.open_l2cap_channel(self, psm, max_credits, mtu, mps)
+
+    async def disconnect(self, reason=HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR):
         return await self.device.disconnect(self, reason)
 
     async def pair(self):
@@ -563,6 +574,7 @@
         with open(filename, 'r') as file:
             self.load_from_dict(json.load(file))
 
+
 # -----------------------------------------------------------------------------
 # Decorators used with the following Device class
 # (we define them outside of the Device class, because defining decorators
@@ -685,7 +697,7 @@
         self.classic_enabled            = False
         self.inquiry_response           = None
         self.address_resolver           = None
-        self.classic_pending_accepts    = { Address.ANY: [] }  # Futures, by BD address OR [Futures] for Address.ANY
+        self.classic_pending_accepts    = {Address.ANY: []}  # Futures, by BD address OR [Futures] for Address.ANY
 
         # Use the initial config or a default
         self.public_address = Address('00:00:00:00:00:00')
@@ -785,15 +797,35 @@
                 if transport is None or connection.transport == transport:
                     return connection
 
-    def register_l2cap_server(self, psm, server):
-        self.l2cap_channel_manager.register_server(psm, server)
-
     def create_l2cap_connector(self, connection, psm):
         return lambda: self.l2cap_channel_manager.connect(connection, psm)
 
     def create_l2cap_registrar(self, psm):
         return lambda handler: self.register_l2cap_server(psm, handler)
 
+    def register_l2cap_server(self, psm, server):
+        self.l2cap_channel_manager.register_server(psm, server)
+
+    def register_l2cap_channel_server(
+        self,
+        psm,
+        server,
+        max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
+        mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
+        mps=DEVICE_DEFAULT_L2CAP_COC_MPS
+    ):
+        return self.l2cap_channel_manager.register_le_coc_server(psm, server, max_credits, mtu, mps)
+
+    async def open_l2cap_channel(
+        self,
+        connection,
+        psm,
+        max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
+        mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
+        mps=DEVICE_DEFAULT_L2CAP_COC_MPS
+    ):
+        return await self.l2cap_channel_manager.open_le_coc(connection, psm, max_credits, mtu, mps)
+
     def send_l2cap_pdu(self, connection_handle, cid, pdu):
         self.host.send_l2cap_pdu(connection_handle, cid, pdu)
 
@@ -1185,13 +1217,15 @@
         def on_connection(connection):
             if transport == BT_LE_TRANSPORT or (
                 # match BR/EDR connection event against peer address
-                connection.transport == transport and connection.peer_address == peer_address):
+                connection.transport == transport and connection.peer_address == peer_address
+            ):
                 pending_connection.set_result(connection)
 
         def on_connection_failure(error):
             if transport == BT_LE_TRANSPORT or (
                 # match BR/EDR connection failure event against peer address
-                error.transport == transport and error.peer_address == peer_address):
+                error.transport == transport and error.peer_address == peer_address
+            ):
                 pending_connection.set_exception(error)
 
         # Create a future so that we can wait for the connection's result
@@ -1336,7 +1370,7 @@
         if peer_address == Address.NIL:
             raise ValueError('accept on nil address')
 
-         # Create a future so that we can wait for the request
+        # Create a future so that we can wait for the request
         pending_request = asyncio.get_running_loop().create_future()
 
         if peer_address == Address.ANY:
@@ -1349,8 +1383,7 @@
         try:
             # Wait for a request or a completed connection
             result = await (asyncio.wait_for(pending_request, timeout) if timeout else pending_request)
-
-        except:
+        except Exception:
             # Remove future from device context
             if peer_address == Address.ANY:
                 self.classic_pending_accepts[Address.ANY].remove(pending_request)
@@ -1710,26 +1743,32 @@
             connection.remove_listener('connection_encryption_failure', on_encryption_failure)
 
     # [Classic only]
-    async def request_remote_name(self, remote: Connection | Address):
+    async def request_remote_name(self, remote):  # remote: Connection | Address
         # Set up event handlers
         pending_name = asyncio.get_running_loop().create_future()
 
         if type(remote) == Address:
             peer_address = remote
-            handler = self.on('remote_name',
+            handler = self.on(
+                'remote_name',
                 lambda address, remote_name:
-                    pending_name.set_result(remote_name) if address == remote else None)
-            failure_handler = self.on('remote_name_failure',
+                    pending_name.set_result(remote_name) if address == remote else None
+            )
+            failure_handler = self.on(
+                'remote_name_failure',
                 lambda address, error_code:
-                    pending_name.set_exception(HCI_Error(error_code)) if address == remote else None)
+                    pending_name.set_exception(HCI_Error(error_code)) if address == remote else None
+            )
         else:
             peer_address = remote.peer_address
-            handler = remote.on('remote_name',
-                lambda:
-                    pending_name.set_result(remote.peer_name))
-            failure_handler = remote.on('remote_name_failure',
-                lambda error_code:
-                    pending_name.set_exception(HCI_Error(error_code)))
+            handler = remote.on(
+                'remote_name',
+                lambda: pending_name.set_result(remote.peer_name)
+            )
+            failure_handler = remote.on(
+                'remote_name_failure',
+                lambda error_code: pending_name.set_exception(HCI_Error(error_code))
+            )
 
         try:
             result = await self.send_command(
@@ -2097,7 +2136,6 @@
             else:
                 self.emit('remote_name_failure', address, error)
 
-
     # [Classic only]
     @host_event_handler
     @try_with_connection_from_address
diff --git a/bumble/gatt.py b/bumble/gatt.py
index 0847652..cc66329 100644
--- a/bumble/gatt.py
+++ b/bumble/gatt.py
@@ -25,6 +25,7 @@
 import asyncio
 import types
 import logging
+from pyee import EventEmitter
 from colors import color
 
 from .core import *
diff --git a/bumble/gatt_client.py b/bumble/gatt_client.py
index c1c5276..07116fa 100644
--- a/bumble/gatt_client.py
+++ b/bumble/gatt_client.py
@@ -273,7 +273,7 @@
             if response.op_code == ATT_ERROR_RESPONSE:
                 if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR:
                     # Unexpected end
-                    logger.waning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}')
+                    logger.warning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}')
                     # TODO raise appropriate exception
                     return
                 break
@@ -337,7 +337,7 @@
             if response.op_code == ATT_ERROR_RESPONSE:
                 if response.error_code != ATT_ATTRIBUTE_NOT_FOUND_ERROR:
                     # Unexpected end
-                    logger.waning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}')
+                    logger.warning(f'!!! unexpected error while discovering services: {HCI_Constant.error_name(response.error_code)}')
                     # TODO raise appropriate exception
                     return
                 break
diff --git a/bumble/gatt_server.py b/bumble/gatt_server.py
index 656df6b..c5d8dc1 100644
--- a/bumble/gatt_server.py
+++ b/bumble/gatt_server.py
@@ -155,7 +155,7 @@
         return cccd or bytes([0, 0])
 
     def write_cccd(self, connection, characteristic, value):
-        logger.debug(f'Subscription update for connection={connection.handle:04X}, handle={characteristic.handle:04X}: {value.hex()}')
+        logger.debug(f'Subscription update for connection=0x{connection.handle:04X}, handle=0x{characteristic.handle:04X}: {value.hex()}')
 
         # Sanity check
         if len(value) != 2:
@@ -204,7 +204,7 @@
         logger.debug(f'GATT Notify from server: [0x{connection.handle:04X}] {notification}')
         self.send_gatt_pdu(connection.handle, bytes(notification))
 
-    async def indicate_subscriber(self, connection, attribute, value=None, force=False):
+    async def indicate_subscriber(self, connection, attribute, force=False):
         # Check if there's a subscriber
         if not force:
             subscribers = self.subscribers.get(connection.handle)
diff --git a/bumble/hci.py b/bumble/hci.py
index d4cf7cc..cf9f682 100644
--- a/bumble/hci.py
+++ b/bumble/hci.py
@@ -2466,9 +2466,10 @@
 
 
 # -----------------------------------------------------------------------------
+@HCI_Command.command()
 class HCI_Read_Synchronous_Flow_Control_Enable_Command(HCI_Command):
     '''
-    See Bluetooth spec @ 7.3.36 Write Synchronous Flow Control Enable Command
+    See Bluetooth spec @ 7.3.36 Read Synchronous Flow Control Enable Command
     '''
 
 
diff --git a/bumble/host.py b/bumble/host.py
index 32b2194..a57299b 100644
--- a/bumble/host.py
+++ b/bumble/host.py
@@ -79,6 +79,8 @@
         self.local_version                    = None
         self.local_supported_commands         = bytes(64)
         self.local_le_features                = 0
+        self.suggested_max_tx_octets          = 251   # Max allowed
+        self.suggested_max_tx_time            = 2120  # Max allowed
         self.command_semaphore                = asyncio.Semaphore(1)
         self.long_term_key_provider           = None
         self.link_key_provider                = None
@@ -138,6 +140,22 @@
             f'hc_total_num_le_acl_data_packets={self.hc_total_num_le_acl_data_packets}'
         )
 
+        if (
+            self.supports_command(HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND) and
+            self.supports_command(HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND)
+        ):
+            response = await self.send_command(HCI_LE_Read_Suggested_Default_Data_Length_Command())
+            suggested_max_tx_octets = response.return_parameters.suggested_max_tx_octets
+            suggested_max_tx_time   = response.return_parameters.suggested_max_tx_time
+            if (
+                suggested_max_tx_octets != self.suggested_max_tx_octets or
+                suggested_max_tx_time != self.suggested_max_tx_time
+            ):
+                await self.send_command(HCI_LE_Write_Suggested_Default_Data_Length_Command(
+                    suggested_max_tx_octets = self.suggested_max_tx_octets,
+                    suggested_max_tx_time   = self.suggested_max_tx_time
+                ))
+
         self.reset_done = True
 
     @property
diff --git a/bumble/l2cap.py b/bumble/l2cap.py
index 927454e..78ec0ec 100644
--- a/bumble/l2cap.py
+++ b/bumble/l2cap.py
@@ -19,6 +19,7 @@
 import logging
 import struct
 
+from collections import deque
 from colors import color
 from pyee import EventEmitter
 
@@ -43,13 +44,23 @@
 
 L2CAP_DEFAULT_MTU = 2048  # Default value for the MTU we are willing to accept
 
+L2CAP_DEFAULT_CONNECTIONLESS_MTU = 1024
+
 # See Bluetooth spec @ Vol 3, Part A - Table 2.1: CID name space on ACL-U, ASB-U, and AMP-U logical links
 L2CAP_ACL_U_DYNAMIC_CID_RANGE_START = 0x0040
 L2CAP_ACL_U_DYNAMIC_CID_RANGE_END   = 0xFFFF
 
 # See Bluetooth spec @ Vol 3, Part A - Table 2.2: CID name space on LE-U logical link
 L2CAP_LE_U_DYNAMIC_CID_RANGE_START = 0x0040
-L2CAP_LE_U_DYNAMIC_CID_RANGE_START = 0x007F
+L2CAP_LE_U_DYNAMIC_CID_RANGE_END   = 0x007F
+
+# PSM Range - See Bluetooth spec @ Vol 3, Part A / Table 4.5: PSM ranges and usage
+L2CAP_PSM_DYNAMIC_RANGE_START = 0x1001
+L2CAP_PSM_DYNAMIC_RANGE_END   = 0xFFFF
+
+# LE PSM Ranges - See Bluetooth spec @ Vol 3, Part A / Table 4.19: LE Credit Based Connection Request LE_PSM ranges
+L2CAP_LE_PSM_DYNAMIC_RANGE_START = 0x0080
+L2CAP_LE_PSM_DYNAMIC_RANGE_END   = 0x00FF
 
 # Frame types
 L2CAP_COMMAND_REJECT                       = 0x01
@@ -107,8 +118,13 @@
 L2CAP_SIGNALING_MTU_EXCEEDED_REASON = 0x0001
 L2CAP_INVALID_CID_IN_REQUEST_REASON = 0x0002
 
-L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU = 2048
-L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS = 2048
+L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS             = 65535
+L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU                 = 23
+L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS                 = 23
+L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS                 = 65533
+L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU             = 2046
+L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS             = 2048
+L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS = 256
 
 L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE = 0x01
 
@@ -172,7 +188,7 @@
         self.identifier = pdu[1]
         length = struct.unpack_from('<H', pdu, 2)[0]
         if length + 4 != len(pdu):
-            logger.warn(color(f'!!! length mismatch: expected {len(pdu) - 4} but got {length}', 'red'))
+            logger.warning(color(f'!!! length mismatch: expected {len(pdu) - 4} but got {length}', 'red'))
         if hasattr(self, 'fields'):
             self.init_from_bytes(pdu, 4)
         return self
@@ -185,10 +201,10 @@
     def decode_configuration_options(data):
         options = []
         while len(data) >= 2:
-            type = data[0]
+            type   = data[0]
             length = data[1]
-            value = data[2:2 + length]
-            data = data[2 + length:]
+            value  = data[2:2 + length]
+            data   = data[2 + length:]
             options.append((type, value))
 
         return options
@@ -268,7 +284,10 @@
 
 # -----------------------------------------------------------------------------
 @L2CAP_Control_Frame.subclass([
-    ('psm',        2),
+    ('psm', {
+        'parser': lambda data, offset: L2CAP_Connection_Request.parse_psm(data, offset),
+        'serializer': lambda value: L2CAP_Connection_Request.serialize_psm(value)
+    }),
     ('source_cid', 2)
 ])
 class L2CAP_Connection_Request(L2CAP_Control_Frame):
@@ -276,6 +295,28 @@
     See Bluetooth spec @ Vol 3, Part A - 4.2 CONNECTION REQUEST
     '''
 
+    @staticmethod
+    def parse_psm(data, offset=0):
+        psm_length = 2
+        psm = data[offset] | data[offset + 1] << 8
+
+        # The PSM field extends until the first even octet (inclusive)
+        while data[offset + psm_length - 1] % 2 == 1:
+            psm |= data[offset + psm_length] << (8 * psm_length)
+            psm_length += 1
+
+        return offset + psm_length, psm
+
+    @staticmethod
+    def serialize_psm(psm):
+        serialized = struct.pack('<H', psm & 0xFFFF)
+        psm >>= 16
+        while psm:
+            serialized += bytes([psm & 0xFF])
+            psm >>= 8
+
+        return serialized
+
 
 # -----------------------------------------------------------------------------
 @L2CAP_Control_Frame.subclass([
@@ -289,16 +330,16 @@
     See Bluetooth spec @ Vol 3, Part A - 4.3 CONNECTION RESPONSE
     '''
 
-    CONNECTION_SUCCESSFUL                               = 0x0000
-    CONNECTION_PENDING                                  = 0x0001
-    CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED             = 0x0002
-    CONNECTION_REFUSED_SECURITY_BLOCK                   = 0x0003
-    CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE           = 0x0004
-    CONNECTION_REFUSED_INVALID_SOURCE_CID               = 0x0006
-    CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED     = 0x0007
-    CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS          = 0x000B
+    CONNECTION_SUCCESSFUL                           = 0x0000
+    CONNECTION_PENDING                              = 0x0001
+    CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED         = 0x0002
+    CONNECTION_REFUSED_SECURITY_BLOCK               = 0x0003
+    CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE       = 0x0004
+    CONNECTION_REFUSED_INVALID_SOURCE_CID           = 0x0006
+    CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED = 0x0007
+    CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS      = 0x000B
 
-    CONNECTION_RESULT_NAMES = {
+    RESULT_NAMES = {
         CONNECTION_SUCCESSFUL:                           'CONNECTION_SUCCESSFUL',
         CONNECTION_PENDING:                              'CONNECTION_PENDING',
         CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED:         'CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED',
@@ -311,7 +352,7 @@
 
     @staticmethod
     def result_name(result):
-        return name_or_number(L2CAP_Connection_Response.CONNECTION_RESULT_NAMES, result)
+        return name_or_number(L2CAP_Connection_Response.RESULT_NAMES, result)
 
 
 # -----------------------------------------------------------------------------
@@ -485,10 +526,10 @@
 
 # -----------------------------------------------------------------------------
 @L2CAP_Control_Frame.subclass([
-    ('le_psm', 2),
-    ('source_cid', 2),
-    ('mtu', 2),
-    ('mps', 2),
+    ('le_psm',          2),
+    ('source_cid',      2),
+    ('mtu',             2),
+    ('mps',             2),
     ('initial_credits', 2)
 ])
 class L2CAP_LE_Credit_Based_Connection_Request(L2CAP_Control_Frame):
@@ -521,7 +562,7 @@
     CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED     = 0x000A
     CONNECTION_REFUSED_UNACCEPTABLE_PARAMETERS          = 0x000B
 
-    CONNECTION_RESULT_NAMES = {
+    RESULT_NAMES = {
         CONNECTION_SUCCESSFUL:                               'CONNECTION_SUCCESSFUL',
         CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED:             'CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED',
         CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE:           'CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE',
@@ -536,12 +577,12 @@
 
     @staticmethod
     def result_name(result):
-        return name_or_number(L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_RESULT_NAMES, result)
+        return name_or_number(L2CAP_LE_Credit_Based_Connection_Response.RESULT_NAMES, result)
 
 
 # -----------------------------------------------------------------------------
 @L2CAP_Control_Frame.subclass([
-    ('cid', 2),
+    ('cid',     2),
     ('credits', 2)
 ])
 class L2CAP_LE_Flow_Control_Credit(L2CAP_Control_Frame):
@@ -619,6 +660,9 @@
     def send_pdu(self, pdu):
         self.manager.send_pdu(self.connection, self.destination_cid, pdu)
 
+    def send_control_frame(self, frame):
+        self.manager.send_control_frame(self.connection, self.signaling_cid, frame)
+
     async def send_request(self, request):
         # Check that there isn't already a request pending
         if self.response:
@@ -637,15 +681,16 @@
         elif self.sink:
             self.sink(pdu)
         else:
-            logger.warn(color('received pdu without a pending request or sink', 'red'))
-
-    def send_control_frame(self, frame):
-        self.manager.send_control_frame(self.connection, self.signaling_cid, frame)
+            logger.warning(color('received pdu without a pending request or sink', 'red'))
 
     async def connect(self):
         if self.state != Channel.CLOSED:
             raise InvalidStateError('invalid state')
 
+        # Check that we can start a new connection
+        if self.connection_result:
+            raise RuntimeError('connection already pending')
+
         self.change_state(Channel.WAIT_CONNECT_RSP)
         self.send_control_frame(
             L2CAP_Connection_Request(
@@ -657,7 +702,12 @@
 
         # Create a future to wait for the state machine to get to a success or error state
         self.connection_result = asyncio.get_running_loop().create_future()
-        return await self.connection_result
+
+        # Wait for the connection to succeed or fail
+        try:
+            return await self.connection_result
+        finally:
+            self.connection_result = None
 
     async def disconnect(self):
         if self.state != Channel.OPEN:
@@ -708,7 +758,7 @@
 
     def on_connection_response(self, response):
         if self.state != Channel.WAIT_CONNECT_RSP:
-            logger.warn(color('invalid state', 'red'))
+            logger.warning(color('invalid state', 'red'))
             return
 
         if response.result == L2CAP_Connection_Response.CONNECTION_SUCCESSFUL:
@@ -734,7 +784,7 @@
             self.state != Channel.WAIT_CONFIG_REQ and
             self.state != Channel.WAIT_CONFIG_REQ_RSP
         ):
-            logger.warn(color('invalid state', 'red'))
+            logger.warning(color('invalid state', 'red'))
             return
 
         # Decode the options
@@ -750,7 +800,7 @@
                 source_cid = self.destination_cid,
                 flags      = 0x0000,
                 result     = L2CAP_Configure_Response.SUCCESS,
-                options    = request.options  # TODO: don't accept everthing blindly
+                options    = request.options  # TODO: don't accept everything blindly
             )
         )
         if self.state == Channel.WAIT_CONFIG:
@@ -777,7 +827,7 @@
                     self.connection_result = None
                 self.emit('open')
             else:
-                logger.warn(color('invalid state', 'red'))
+                logger.warning(color('invalid state', 'red'))
         elif response.result == L2CAP_Configure_Response.FAILURE_UNACCEPTABLE_PARAMETERS:
             # Re-configure with what's suggested in the response
             self.send_control_frame(
@@ -789,7 +839,7 @@
                 )
             )
         else:
-            logger.warn(color(f'!!! configuration rejected: {L2CAP_Configure_Response.result_name(response.result)}', 'red'))
+            logger.warning(color(f'!!! configuration rejected: {L2CAP_Configure_Response.result_name(response.result)}', 'red'))
             # TODO: decide how to fail gracefully
 
     def on_disconnection_request(self, request):
@@ -805,15 +855,15 @@
             self.emit('close')
             self.manager.on_channel_closed(self)
         else:
-            logger.warn(color('invalid state', 'red'))
+            logger.warning(color('invalid state', 'red'))
 
     def on_disconnection_response(self, response):
         if self.state != Channel.WAIT_DISCONNECT:
-            logger.warn(color('invalid state', 'red'))
+            logger.warning(color('invalid state', 'red'))
             return
 
         if response.destination_cid != self.destination_cid or response.source_cid != self.source_cid:
-            logger.warn('unexpected source or destination CID')
+            logger.warning('unexpected source or destination CID')
             return
 
         self.change_state(Channel.CLOSED)
@@ -828,22 +878,362 @@
 
 
 # -----------------------------------------------------------------------------
+class LeConnectionOrientedChannel(EventEmitter):
+    """
+    LE Credit-based Connection Oriented Channel
+    """
+
+    INIT             = 0
+    CONNECTED        = 1
+    CONNECTING       = 2
+    DISCONNECTING    = 3
+    DISCONNECTED     = 4
+    CONNECTION_ERROR = 5
+
+    STATE_NAMES = {
+        INIT:             'INIT',
+        CONNECTED:        'CONNECTED',
+        CONNECTING:       'CONNECTING',
+        DISCONNECTING:    'DISCONNECTING',
+        DISCONNECTED:     'DISCONNECTED',
+        CONNECTION_ERROR: 'CONNECTION_ERROR'
+    }
+
+    @staticmethod
+    def state_name(state):
+        return name_or_number(LeConnectionOrientedChannel.STATE_NAMES, state)
+
+    def __init__(
+        self,
+        manager,
+        connection,
+        le_psm,
+        source_cid,
+        destination_cid,
+        mtu,
+        mps,
+        credits,
+        peer_mtu,
+        peer_mps,
+        peer_credits,
+        connected
+    ):
+        super().__init__()
+        self.manager                = manager
+        self.connection             = connection
+        self.le_psm                 = le_psm
+        self.source_cid             = source_cid
+        self.destination_cid        = destination_cid
+        self.mtu                    = mtu
+        self.mps                    = mps
+        self.credits                = credits
+        self.peer_mtu               = peer_mtu
+        self.peer_mps               = peer_mps
+        self.peer_credits           = peer_credits
+        self.peer_max_credits       = self.peer_credits
+        self.peer_credits_threshold = self.peer_max_credits // 2
+        self.in_sdu                 = None
+        self.in_sdu_length          = 0
+        self.out_queue              = deque()
+        self.out_sdu                = None
+        self.sink                   = None
+        self.connection_result      = None
+        self.disconnection_result   = None
+        self.drained                = asyncio.Event()
+
+        self.drained.set()
+
+        if connected:
+            self.state = LeConnectionOrientedChannel.CONNECTED
+        else:
+            self.state = LeConnectionOrientedChannel.INIT
+
+    def change_state(self, new_state):
+        logger.debug(f'{self} state change -> {color(self.state_name(new_state), "cyan")}')
+        self.state = new_state
+
+        if new_state == self.CONNECTED:
+            self.emit('open')
+        elif new_state == self.DISCONNECTED:
+            self.emit('close')
+
+    def send_pdu(self, pdu):
+        self.manager.send_pdu(self.connection, self.destination_cid, pdu)
+
+    def send_control_frame(self, frame):
+        self.manager.send_control_frame(self.connection, L2CAP_LE_SIGNALING_CID, frame)
+
+    async def connect(self):
+        # Check that we're in the right state
+        if self.state != self.INIT:
+            raise InvalidStateError('not in a connectable state')
+
+        # Check that we can start a new connection
+        identifier = self.manager.next_identifier(self.connection)
+        if identifier in self.manager.le_coc_requests:
+            raise RuntimeError('too many concurrent connection requests')
+
+        self.change_state(self.CONNECTING)
+        request = L2CAP_LE_Credit_Based_Connection_Request(
+            identifier      = identifier,
+            le_psm          = self.le_psm,
+            source_cid      = self.source_cid,
+            mtu             = self.mtu,
+            mps             = self.mps,
+            initial_credits = self.peer_credits
+        )
+        self.manager.le_coc_requests[identifier] = request
+        self.send_control_frame(request)
+
+        # Create a future to wait for the response
+        self.connection_result = asyncio.get_running_loop().create_future()
+
+        # Wait for the connection to succeed or fail
+        return await self.connection_result
+
+    async def disconnect(self):
+        # Check that we're connected
+        if self.state != self.CONNECTED:
+            raise InvalidStateError('not connected')
+
+        self.change_state(self.DISCONNECTING)
+        self.flush_output()
+        self.send_control_frame(
+            L2CAP_Disconnection_Request(
+                identifier      = self.manager.next_identifier(self.connection),
+                destination_cid = self.destination_cid,
+                source_cid      = self.source_cid
+            )
+        )
+
+        # Create a future to wait for the state machine to get to a success or error state
+        self.disconnection_result = asyncio.get_running_loop().create_future()
+        return await self.disconnection_result
+
+    def on_pdu(self, pdu):
+        if self.sink is None:
+            logger.warning('received pdu without a sink')
+            return
+
+        if self.state != self.CONNECTED:
+            logger.warning('received PDU while not connected, dropping')
+
+        # Manage the peer credits
+        if self.peer_credits == 0:
+            logger.warning('received LE frame when peer out of credits')
+        else:
+            self.peer_credits -= 1
+            if self.peer_credits <= self.peer_credits_threshold:
+                # The credits fell below the threshold, replenish them to the max
+                self.send_control_frame(
+                    L2CAP_LE_Flow_Control_Credit(
+                        identifier = self.manager.next_identifier(self.connection),
+                        cid        = self.source_cid,
+                        credits    = self.peer_max_credits - self.peer_credits
+                    )
+                )
+                self.peer_credits = self.peer_max_credits
+
+        # Check if this starts a new SDU
+        if self.in_sdu is None:
+            # Start a new SDU
+            self.in_sdu = pdu
+        else:
+            # Continue an SDU
+            self.in_sdu += pdu
+
+        # Check if the SDU is complete
+        if self.in_sdu_length == 0:
+            # We don't know the size yet, check if we have received the header to compute it
+            if len(self.in_sdu) >= 2:
+                self.in_sdu_length = struct.unpack_from('<H', self.in_sdu, 0)[0]
+        if self.in_sdu_length == 0:
+            # We'll compute it later
+            return
+        if len(self.in_sdu) < 2 + self.in_sdu_length:
+            # Not complete yet
+            logger.debug(f'SDU: {len(self.in_sdu) - 2} of {self.in_sdu_length} bytes received')
+            return
+        if len(self.in_sdu) != 2 + self.in_sdu_length:
+            # Overflow
+            logger.warning(f'SDU overflow: sdu_length={self.in_sdu_length}, received {len(self.in_sdu) - 2}')
+            # TODO: we should disconnect
+            self.in_sdu = None
+            self.in_sdu_length = 0
+            return
+
+        # Send the SDU to the sink
+        logger.debug(f'SDU complete: 2+{len(self.in_sdu) - 2} bytes')
+        self.sink(self.in_sdu[2:])
+
+        # Prepare for a new SDU
+        self.in_sdu = None
+        self.in_sdu_length = 0
+
+    def on_connection_response(self, response):
+        # Look for a matching pending response result
+        if self.connection_result is None:
+            logger.warning(f'received unexpected connection response (id={response.identifier})')
+            return
+
+        if response.result == L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL:
+            self.destination_cid = response.destination_cid
+            self.peer_mtu        = response.mtu
+            self.peer_mps        = response.mps
+            self.credits         = response.initial_credits
+            self.connected       = True
+            self.connection_result.set_result(self)
+            self.change_state(self.CONNECTED)
+        else:
+            self.connection_result.set_exception(
+                ProtocolError(
+                    response.result,
+                    'l2cap',
+                    L2CAP_LE_Credit_Based_Connection_Response.result_name(response.result))
+            )
+            self.change_state(self.CONNECTION_ERROR)
+
+        # Cleanup
+        self.connection_result = None
+
+    def on_credits(self, credits):
+        self.credits += credits
+        logger.debug(f'received {credits} credits, total = {self.credits}')
+
+        # Try to send more data if we have any queued up
+        self.process_output()
+
+    def on_disconnection_request(self, request):
+        self.send_control_frame(
+            L2CAP_Disconnection_Response(
+                identifier      = request.identifier,
+                destination_cid = request.destination_cid,
+                source_cid      = request.source_cid
+            )
+        )
+        self.change_state(self.DISCONNECTED)
+        self.flush_output()
+
+    def on_disconnection_response(self, response):
+        if self.state != self.DISCONNECTING:
+            logger.warning(color('invalid state', 'red'))
+            return
+
+        if response.destination_cid != self.destination_cid or response.source_cid != self.source_cid:
+            logger.warning('unexpected source or destination CID')
+            return
+
+        self.change_state(self.DISCONNECTED)
+        if self.disconnection_result:
+            self.disconnection_result.set_result(None)
+            self.disconnection_result = None
+
+    def flush_output(self):
+        self.out_queue.clear()
+        self.out_sdu = None
+
+    def process_output(self):
+        while self.credits > 0:
+            if self.out_sdu is not None:
+                # Finish the current SDU
+                packet = self.out_sdu[:self.peer_mps]
+                self.send_pdu(packet)
+                self.credits -= 1
+                logger.debug(f'sent {len(packet)} bytes, {self.credits} credits left')
+                if len(packet) == len(self.out_sdu):
+                    # We sent everything
+                    self.out_sdu = None
+                else:
+                    # Keep what's still left to send
+                    self.out_sdu = self.out_sdu[len(packet):]
+                continue
+            elif self.out_queue:
+                # Create the next SDU (2 bytes header plus up to MTU bytes payload)
+                logger.debug(f'assembling SDU from {len(self.out_queue)} packets in output queue')
+                payload = b''
+                while self.out_queue and len(payload) < self.peer_mtu:
+                    # We can add more data to the payload
+                    chunk = self.out_queue[0][:self.peer_mtu - len(payload)]
+                    payload += chunk
+                    self.out_queue[0] = self.out_queue[0][len(chunk):]
+                    if len(self.out_queue[0]) == 0:
+                        # We consumed the entire buffer, remove it
+                        self.out_queue.popleft()
+                        logger.debug(f'packet completed, {len(self.out_queue)} left in queue')
+
+                # Construct the SDU with its header
+                assert len(payload) != 0
+                logger.debug(f'SDU complete: {len(payload)} payload bytes')
+                self.out_sdu = struct.pack('<H', len(payload)) + payload
+            else:
+                # Nothing left to send for now
+                self.drained.set()
+                return
+
+    def write(self, data):
+        if self.state != self.CONNECTED:
+            logger.warning('not connected, dropping data')
+            return
+
+        # Queue the data
+        self.out_queue.append(data)
+        self.drained.clear()
+        logger.debug(f'{len(data)} bytes packet queued, {len(self.out_queue)} packets in queue')
+
+        # Send what we can
+        self.process_output()
+
+    async def drain(self):
+        await self.drained.wait()
+
+    def pause_reading(self):
+        # TODO: not implemented yet
+        pass
+
+    def resume_reading(self):
+        # TODO: not implemented yet
+        pass
+
+    def __str__(self):
+        return f'CoC({self.source_cid}->{self.destination_cid}, State={self.state_name(self.state)}, PSM={self.le_psm}, MTU={self.mtu}/{self.peer_mtu}, MPS={self.mps}/{self.peer_mps}, credits={self.credits}/{self.peer_credits})'
+
+
+# -----------------------------------------------------------------------------
 class ChannelManager:
-    def __init__(self, extended_features=None, connectionless_mtu=1024):
-        self.host               = None
-        self.channels           = {}  # Channels, mapped by connection and cid
-        # Fixed channel handlers, mapped by cid
-        self.fixed_channels     = {
-            L2CAP_SIGNALING_CID: None, L2CAP_LE_SIGNALING_CID: None}
+    def __init__(self, extended_features=[], connectionless_mtu=L2CAP_DEFAULT_CONNECTIONLESS_MTU):
+        self._host              = None
         self.identifiers        = {}  # Incrementing identifier values by connection
+        self.channels           = {}  # All channels, mapped by connection and source cid
+        self.fixed_channels     = {   # Fixed channel handlers, mapped by cid
+            L2CAP_SIGNALING_CID: None, L2CAP_LE_SIGNALING_CID: None
+        }
         self.servers            = {}  # Servers accepting connections, by PSM
-        self.extended_features  = [] if extended_features is None else extended_features
+        self.le_coc_channels    = {}  # LE CoC channels, mapped by connection and destination cid
+        self.le_coc_servers     = {}  # LE CoC - Servers accepting connections, by PSM
+        self.le_coc_requests    = {}  # LE CoC connection requests, by identifier
+        self.extended_features  = extended_features
         self.connectionless_mtu = connectionless_mtu
 
+    @property
+    def host(self):
+        return self._host
+
+    @host.setter
+    def host(self, host):
+        if self._host is not None:
+            self._host.remove_listener('disconnection', self.on_disconnection)
+        self._host = host
+        if host is not None:
+            host.add_listener('disconnection', self.on_disconnection)
+
     def find_channel(self, connection_handle, cid):
         if connection_channels := self.channels.get(connection_handle):
             return connection_channels.get(cid)
 
+    def find_le_coc_channel(self, connection_handle, cid):
+        if connection_channels := self.le_coc_channels.get(connection_handle):
+            return connection_channels.get(cid)
+
     @staticmethod
     def find_free_br_edr_cid(channels):
         # Pick the smallest valid CID that's not already in the list
@@ -853,6 +1243,24 @@
             if cid not in channels:
                 return cid
 
+    @staticmethod
+    def find_free_le_cid(channels):
+        # Pick the smallest valid CID that's not already in the list
+        # (not necessarily the most efficient algorithm, but the list of CID is
+        # very small in practice)
+        for cid in range(L2CAP_LE_U_DYNAMIC_CID_RANGE_START, L2CAP_LE_U_DYNAMIC_CID_RANGE_END + 1):
+            if cid not in channels:
+                return cid
+
+    @staticmethod
+    def check_le_coc_parameters(max_credits, mtu, mps):
+        if max_credits < 1 or max_credits > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_CREDITS:
+            raise ValueError('max credits out of range')
+        if mtu < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MTU:
+            raise ValueError('MTU too small')
+        if mps < L2CAP_LE_CREDIT_BASED_CONNECTION_MIN_MPS or mps > L2CAP_LE_CREDIT_BASED_CONNECTION_MAX_MPS:
+            raise ValueError('MPS out of range')
+
     def next_identifier(self, connection):
         identifier = (self.identifiers.setdefault(connection.handle, 0) + 1) % 256
         self.identifiers[connection.handle] = identifier
@@ -866,8 +1274,77 @@
             del self.fixed_channels[cid]
 
     def register_server(self, psm, server):
+        if psm == 0:
+            # Find a free PSM
+            for candidate in range(L2CAP_PSM_DYNAMIC_RANGE_START, L2CAP_PSM_DYNAMIC_RANGE_END + 1, 2):
+                if (candidate >> 8) % 2 == 1:
+                    continue
+                if candidate in self.servers:
+                    continue
+                psm = candidate
+                break
+            else:
+                raise InvalidStateError('no free PSM')
+        else:
+            # Check that the PSM isn't already in use
+            if psm in self.servers:
+                raise ValueError('PSM already in use')
+
+            # Check that the PSM is valid
+            if psm % 2 == 0:
+                raise ValueError('invalid PSM (not odd)')
+            check = psm >> 8
+            while check:
+                if check % 2 != 0:
+                    raise ValueError('invalid PSM')
+                check >>= 8
+
         self.servers[psm] = server
 
+        return psm
+
+    def register_le_coc_server(
+        self,
+        psm,
+        server,
+        max_credits=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS,
+        mtu=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
+        mps=L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS
+    ):
+        self.check_le_coc_parameters(max_credits, mtu, mps)
+
+        if psm == 0:
+            # Find a free PSM
+            for candidate in range(L2CAP_LE_PSM_DYNAMIC_RANGE_START, L2CAP_LE_PSM_DYNAMIC_RANGE_END + 1):
+                if candidate in self.le_coc_servers:
+                    continue
+                psm = candidate
+                break
+            else:
+                raise InvalidStateError('no free PSM')
+        else:
+            # Check that the PSM isn't already in use
+            if psm in self.le_coc_servers:
+                raise ValueError('PSM already in use')
+
+        self.le_coc_servers[psm] = (
+            server,
+            max_credits or L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS,
+            mtu or L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
+            mps or L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS
+        )
+
+        return psm
+
+    def on_disconnection(self, connection_handle, reason):
+        logger.debug(f'disconnection from {connection_handle}, cleaning up channels')
+        if connection_handle in self.channels:
+            del self.channels[connection_handle]
+        if connection_handle in self.le_coc_channels:
+            del self.le_coc_channels[connection_handle]
+        if connection_handle in self.identifiers:
+            del self.identifiers[connection_handle]
+
     def send_pdu(self, connection, cid, pdu):
         pdu_str = pdu.hex() if type(pdu) is bytes else str(pdu)
         logger.debug(f'{color(">>> Sending L2CAP PDU", "blue")} on connection [0x{connection.handle:04X}] (CID={cid}) {connection.peer_address}: {pdu_str}')
@@ -883,7 +1360,7 @@
             self.fixed_channels[cid](connection.handle, pdu)
         else:
             if (channel := self.find_channel(connection.handle, cid)) is None:
-                logger.warn(color(f'channel not found for 0x{connection.handle:04X}:{cid}', 'red'))
+                logger.warning(color(f'channel not found for 0x{connection.handle:04X}:{cid}', 'red'))
                 return
 
             channel.on_pdu(pdu)
@@ -927,7 +1404,6 @@
 
     def on_l2cap_command_reject(self, connection, cid, packet):
         logger.warning(f'{color("!!! Command rejected:", "red")} {packet.reason}')
-        pass
 
     def on_l2cap_connection_request(self, connection, cid, request):
         # Check if there's a server for this PSM
@@ -959,7 +1435,7 @@
             server(channel)
             channel.on_connection_request(request)
         else:
-            logger.warn(f'No server for connection 0x{connection.handle:04X} on PSM {request.psm}')
+            logger.warning(f'No server for connection 0x{connection.handle:04X} on PSM {request.psm}')
             self.send_control_frame(
                 connection,
                 cid,
@@ -974,35 +1450,35 @@
 
     def on_l2cap_connection_response(self, connection, cid, response):
         if (channel := self.find_channel(connection.handle, response.source_cid)) is None:
-            logger.warn(color(f'channel {response.source_cid} not found for 0x{connection.handle:04X}:{cid}', 'red'))
+            logger.warning(color(f'channel {response.source_cid} not found for 0x{connection.handle:04X}:{cid}', 'red'))
             return
 
         channel.on_connection_response(response)
 
     def on_l2cap_configure_request(self, connection, cid, request):
         if (channel := self.find_channel(connection.handle, request.destination_cid)) is None:
-            logger.warn(color(f'channel {request.destination_cid} not found for 0x{connection.handle:04X}:{cid}', 'red'))
+            logger.warning(color(f'channel {request.destination_cid} not found for 0x{connection.handle:04X}:{cid}', 'red'))
             return
 
         channel.on_configure_request(request)
 
     def on_l2cap_configure_response(self, connection, cid, response):
         if (channel := self.find_channel(connection.handle, response.source_cid)) is None:
-            logger.warn(color(f'channel {response.source_cid} not found for 0x{connection.handle:04X}:{cid}', 'red'))
+            logger.warning(color(f'channel {response.source_cid} not found for 0x{connection.handle:04X}:{cid}', 'red'))
             return
 
         channel.on_configure_response(response)
 
     def on_l2cap_disconnection_request(self, connection, cid, request):
         if (channel := self.find_channel(connection.handle, request.destination_cid)) is None:
-            logger.warn(color(f'channel {request.destination_cid} not found for 0x{connection.handle:04X}:{cid}', 'red'))
+            logger.warning(color(f'channel {request.destination_cid} not found for 0x{connection.handle:04X}:{cid}', 'red'))
             return
 
         channel.on_disconnection_request(request)
 
     def on_l2cap_disconnection_response(self, connection, cid, response):
         if (channel := self.find_channel(connection.handle, response.source_cid)) is None:
-            logger.warn(color(f'channel {response.source_cid} not found for 0x{connection.handle:04X}:{cid}', 'red'))
+            logger.warning(color(f'channel {response.source_cid} not found for 0x{connection.handle:04X}:{cid}', 'red'))
             return
 
         channel.on_disconnection_response(response)
@@ -1076,25 +1552,123 @@
             )
 
     def on_l2cap_connection_parameter_update_response(self, connection, cid, response):
+        # TODO: check response
         pass
 
     def on_l2cap_le_credit_based_connection_request(self, connection, cid, request):
-        # FIXME: temp fixed values
-        self.send_control_frame(
-            connection,
-            cid,
-            L2CAP_LE_Credit_Based_Connection_Response(
-                identifier      = request.identifier,
-                destination_cid = 194,  # FIXME: for testing only
-                mtu             = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
-                mps             = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS,
-                initial_credits = 3,  # FIXME: for testing only
-                result          = L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL
-            )
-        )
+        if request.le_psm in self.le_coc_servers:
+            (server, max_credits, mtu, mps) = self.le_coc_servers[request.le_psm]
 
-    def on_l2cap_le_flow_control_credit(self, connection, cid, packet):
-        pass
+            # Check that the CID isn't already used
+            le_connection_channels = self.le_coc_channels.setdefault(connection.handle, {})
+            if request.source_cid in le_connection_channels:
+                logger.warning(f'source CID {request.source_cid} already in use')
+                self.send_control_frame(
+                    connection,
+                    cid,
+                    L2CAP_LE_Credit_Based_Connection_Response(
+                        identifier      = request.identifier,
+                        destination_cid = 0,
+                        mtu             = mtu,
+                        mps             = mps,
+                        initial_credits = 0,
+                        result          = L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_SOURCE_CID_ALREADY_ALLOCATED
+                    )
+                )
+                return
+
+            # Find a free CID for this new channel
+            connection_channels = self.channels.setdefault(connection.handle, {})
+            source_cid = self.find_free_le_cid(connection_channels)
+            if source_cid is None:  # Should never happen!
+                self.send_control_frame(
+                    connection,
+                    cid,
+                    L2CAP_LE_Credit_Based_Connection_Response(
+                        identifier      = request.identifier,
+                        destination_cid = 0,
+                        mtu             = mtu,
+                        mps             = mps,
+                        initial_credits = 0,
+                        result          = L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_NO_RESOURCES_AVAILABLE,
+                    )
+                )
+                return
+
+            # Create a new channel
+            logger.debug(f'creating LE CoC server channel with cid={source_cid} for psm {request.le_psm}')
+            channel = LeConnectionOrientedChannel(
+                self,
+                connection,
+                request.le_psm,
+                source_cid,
+                request.source_cid,
+                mtu,
+                mps,
+                request.initial_credits,
+                request.mtu,
+                request.mps,
+                max_credits,
+                True
+            )
+            connection_channels[source_cid] = channel
+            le_connection_channels[request.source_cid] = channel
+
+            # Respond
+            self.send_control_frame(
+                connection,
+                cid,
+                L2CAP_LE_Credit_Based_Connection_Response(
+                    identifier      = request.identifier,
+                    destination_cid = source_cid,
+                    mtu             = mtu,
+                    mps             = mps,
+                    initial_credits = max_credits,
+                    result          = L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_SUCCESSFUL
+                )
+            )
+
+            # Notify
+            server(channel)
+        else:
+            logger.info(f'No LE server for connection 0x{connection.handle:04X} on PSM {request.le_psm}')
+            self.send_control_frame(
+                connection,
+                cid,
+                L2CAP_LE_Credit_Based_Connection_Response(
+                    identifier      = request.identifier,
+                    destination_cid = 0,
+                    mtu             = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU,
+                    mps             = L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS,
+                    initial_credits = 0,
+                    result          = L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_REFUSED_LE_PSM_NOT_SUPPORTED,
+                )
+            )
+
+    def on_l2cap_le_credit_based_connection_response(self, connection, cid, response):
+        # Find the pending request by identifier
+        request = self.le_coc_requests.get(response.identifier)
+        if request is None:
+            logger.warning(color('!!! received response for unknown request', 'red'))
+            return
+        del self.le_coc_requests[response.identifier]
+
+        # Find the channel for this request
+        channel = self.find_channel(connection.handle, request.source_cid)
+        if channel is None:
+            logger.warning(color(f'received connection response for an unknown channel (cid={request.source_cid})', 'red'))
+            return
+
+        # Process the response
+        channel.on_connection_response(response)
+
+    def on_l2cap_le_flow_control_credit(self, connection, cid, credit):
+        channel = self.find_le_coc_channel(connection.handle, credit.cid)
+        if channel is None:
+            logger.warning(f'received credits for an unknown channel (cid={credit.cid}')
+            return
+
+        channel.on_credits(credit.credits)
 
     def on_channel_closed(self, channel):
         connection_channels = self.channels.get(channel.connection.handle)
@@ -1102,22 +1676,65 @@
             if channel.source_cid in connection_channels:
                 del connection_channels[channel.source_cid]
 
-    async def connect(self, connection, psm):
-        # NOTE: this implementation hard-codes BR/EDR more
-        # TODO: LE mode (maybe?)
+    async def open_le_coc(self, connection, psm, max_credits, mtu, mps):
+        self.check_le_coc_parameters(max_credits, mtu, mps)
 
-        # Find a free CID for a new channel
+        # Find a free CID for the new channel
         connection_channels = self.channels.setdefault(connection.handle, {})
-        cid = self.find_free_br_edr_cid(connection_channels)
-        if cid is None:  # Should never happen!
+        source_cid = self.find_free_le_cid(connection_channels)
+        if source_cid is None:  # Should never happen!
             raise RuntimeError('all CIDs already in use')
 
         # Create the channel
-        logger.debug(f'creating client channel with cid={cid} for psm {psm}')
-        channel = Channel(self, connection, L2CAP_SIGNALING_CID, psm, cid, L2CAP_MIN_BR_EDR_MTU)
-        connection_channels[cid] = channel
+        logger.debug(f'creating coc channel with cid={source_cid} for psm {psm}')
+        channel = LeConnectionOrientedChannel(
+            manager         = self,
+            connection      = connection,
+            le_psm          = psm,
+            source_cid      = source_cid,
+            destination_cid = 0,
+            mtu             = mtu,
+            mps             = mps,
+            credits         = 0,
+            peer_mtu        = 0,
+            peer_mps        = 0,
+            peer_credits    = max_credits,
+            connected       = False
+        )
+        connection_channels[source_cid] = channel
 
         # Connect
-        await channel.connect()
+        try:
+            await channel.connect()
+        except Exception as error:
+            logger.warning(f'connection failed: {error}')
+            del connection_channels[source_cid]
+            raise
+
+        # Remember the channel by source CID and destination CID
+        le_connection_channels = self.le_coc_channels.setdefault(connection.handle, {})
+        le_connection_channels[channel.destination_cid] = channel
+
+        return channel
+
+    async def connect(self, connection, psm):
+        # NOTE: this implementation hard-codes BR/EDR
+
+        # Find a free CID for a new channel
+        connection_channels = self.channels.setdefault(connection.handle, {})
+        source_cid = self.find_free_br_edr_cid(connection_channels)
+        if source_cid is None:  # Should never happen!
+            raise RuntimeError('all CIDs already in use')
+
+        # Create the channel
+        logger.debug(f'creating client channel with cid={source_cid} for psm {psm}')
+        channel = Channel(self, connection, L2CAP_SIGNALING_CID, psm, source_cid, L2CAP_MIN_BR_EDR_MTU)
+        connection_channels[source_cid] = channel
+
+        # Connect
+        try:
+            await channel.connect()
+        except Exception:
+            del connection_channels[source_cid]
 
         return channel
diff --git a/bumble/transport/common.py b/bumble/transport/common.py
index d5c1ae9..0f5d27f 100644
--- a/bumble/transport/common.py
+++ b/bumble/transport/common.py
@@ -274,7 +274,7 @@
                     self.terminated.set_result(error)
                     break
 
-        self.pump_task = asyncio.get_running_loop().create_task(pump_packets())
+        self.pump_task = asyncio.create_task(pump_packets())
 
     def close(self):
         if self.pump_task:
@@ -304,7 +304,7 @@
                     logger.warn(f'exception while sending packet: {error}')
                     break
 
-        self.pump_task = asyncio.get_running_loop().create_task(pump_packets())
+        self.pump_task = asyncio.create_task(pump_packets())
 
     def close(self):
         if self.pump_task:
diff --git a/bumble/utils.py b/bumble/utils.py
index 1ab3fd7..5d8ab95 100644
--- a/bumble/utils.py
+++ b/bumble/utils.py
@@ -18,6 +18,7 @@
 import asyncio
 import logging
 import traceback
+import collections
 from functools import wraps
 from colors import color
 from pyee import EventEmitter
@@ -140,3 +141,95 @@
             return wrapper
 
         return decorator
+
+
+# -----------------------------------------------------------------------------
+class FlowControlAsyncPipe:
+    """
+    Asyncio pipe with flow control. When writing to the pipe, the source is
+    paused (by calling a function passed in when the pipe is created) if the
+    amount of queued data exceeds a specified threshold.
+    """
+    def __init__(self, pause_source, resume_source, write_to_sink=None, drain_sink=None, threshold=0):
+        self.pause_source  = pause_source
+        self.resume_source = resume_source
+        self.write_to_sink = write_to_sink
+        self.drain_sink    = drain_sink
+        self.threshold     = threshold
+        self.queue         = collections.deque()  # Queue of packets
+        self.queued_bytes  = 0                    # Number of bytes in the queue
+        self.ready_to_pump = asyncio.Event()
+        self.paused        = False
+        self.source_paused = False
+        self.pump_task     = None
+
+    def start(self):
+        if self.pump_task is None:
+            self.pump_task = asyncio.create_task(self.pump())
+
+        self.check_pump()
+
+    def stop(self):
+        if self.pump_task is not None:
+            self.pump_task.cancel()
+            self.pump_task = None
+
+    def write(self, packet):
+        self.queued_bytes += len(packet)
+        self.queue.append(packet)
+
+        # Pause the source if we're over the threshold
+        if self.queued_bytes > self.threshold and not self.source_paused:
+            logger.debug(f'pausing source (queued={self.queued_bytes})')
+            self.pause_source()
+            self.source_paused = True
+
+        self.check_pump()
+
+    def pause(self):
+        if not self.paused:
+            self.paused = True
+            if not self.source_paused:
+                self.pause_source()
+                self.source_paused = True
+            self.check_pump()
+
+    def resume(self):
+        if self.paused:
+            self.paused = False
+            if self.source_paused:
+                self.resume_source()
+                self.source_paused = False
+            self.check_pump()
+
+    def can_pump(self):
+        return self.queue and not self.paused and self.write_to_sink is not None
+
+    def check_pump(self):
+        if self.can_pump():
+            self.ready_to_pump.set()
+        else:
+            self.ready_to_pump.clear()
+
+    async def pump(self):
+        while True:
+            # Wait until we can try to pump packets
+            await self.ready_to_pump.wait()
+
+            # Try to pump a packet
+            if self.can_pump():
+                packet = self.queue.pop()
+                self.write_to_sink(packet)
+                self.queued_bytes -= len(packet)
+
+                # Drain the sink if we can
+                if self.drain_sink:
+                    await self.drain_sink()
+
+                # Check if we can accept more
+                if self.queued_bytes <= self.threshold and self.source_paused:
+                    logger.debug(f'resuming source (queued={self.queued_bytes})')
+                    self.source_paused = False
+                    self.resume_source()
+
+            self.check_pump()
diff --git a/examples/asha_sink1.json b/examples/asha_sink1.json
new file mode 100644
index 0000000..badef8b
--- /dev/null
+++ b/examples/asha_sink1.json
@@ -0,0 +1,5 @@
+{
+    "name": "Bumble Aid Left",
+    "address": "F1:F2:F3:F4:F5:F6",
+    "keystore": "JsonKeyStore"
+}
diff --git a/examples/asha_sink2.json b/examples/asha_sink2.json
new file mode 100644
index 0000000..785d406
--- /dev/null
+++ b/examples/asha_sink2.json
@@ -0,0 +1,5 @@
+{
+    "name": "Bumble Aid Right",
+    "address": "F7:F8:F9:FA:FB:FC",
+    "keystore": "JsonKeyStore"
+}
diff --git a/examples/run_asha_sink.py b/examples/run_asha_sink.py
new file mode 100644
index 0000000..bebb5de
--- /dev/null
+++ b/examples/run_asha_sink.py
@@ -0,0 +1,161 @@
+# Copyright 2021-2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -----------------------------------------------------------------------------
+# Imports
+# -----------------------------------------------------------------------------
+import asyncio
+import struct
+import sys
+import os
+import logging
+
+from bumble.core import AdvertisingData
+from bumble.device import Device
+from bumble.transport import open_transport_or_link
+from bumble.hci import UUID
+from bumble.gatt import (
+    Service,
+    Characteristic,
+    CharacteristicValue
+)
+
+
+# -----------------------------------------------------------------------------
+# Constants
+# -----------------------------------------------------------------------------
+ASHA_SERVICE                             = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
+ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID('6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties')
+ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC  = UUID('f0d4de7e-4a88-476c-9d9f-1937b0996cc0', 'AudioControlPoint')
+ASHA_AUDIO_STATUS_CHARACTERISTIC         = UUID('38663f1a-e711-4cac-b641-326b56404837', 'AudioStatus')
+ASHA_VOLUME_CHARACTERISTIC               = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume')
+ASHA_LE_PSM_OUT_CHARACTERISTIC           = UUID('2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT')
+
+
+# -----------------------------------------------------------------------------
+async def main():
+    if len(sys.argv) != 4:
+        print('Usage: python run_asha_sink.py <device-config> <transport-spec> <audio-file>')
+        print('example: python run_asha_sink.py device1.json usb:0 audio_out.g722')
+        return
+
+    audio_out = open(sys.argv[3], 'wb')
+
+    async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
+        device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
+
+        # Handler for audio control commands
+        def on_audio_control_point_write(connection, value):
+            print('--- AUDIO CONTROL POINT Write:', value.hex())
+            opcode = value[0]
+            if opcode == 1:
+                # Start
+                audio_type = ('Unknown', 'Ringtone', 'Phone Call', 'Media')[value[2]]
+                print(f'### START: codec={value[1]}, audio_type={audio_type}, volume={value[3]}, otherstate={value[4]}')
+            elif opcode == 2:
+                print('### STOP')
+            elif opcode == 3:
+                print(f'### STATUS: connected={value[1]}')
+
+            # Respond with a status
+            asyncio.create_task(device.notify_subscribers(audio_status_characteristic, force=True))
+
+        # Handler for volume control
+        def on_volume_write(connection, value):
+            print('--- VOLUME Write:', value[0])
+
+        # Register an L2CAP CoC server
+        def on_coc(channel):
+            def on_data(data):
+                print('<<< Voice data received:', data.hex())
+                audio_out.write(data)
+
+            channel.sink = on_data
+
+        psm = device.register_l2cap_channel_server(0, on_coc, 8)
+        print(f'### LE_PSM_OUT = {psm}')
+
+        # Add the ASHA service to the GATT server
+        read_only_properties_characteristic = Characteristic(
+            ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
+            Characteristic.READ,
+            Characteristic.READABLE,
+            bytes([
+                0x01,        # Version
+                0x00,        # Device Capabilities [Left, Monaural]
+                0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08,  # HiSyncId
+                0x01,        # Feature Map [LE CoC audio output streaming supported]
+                0x00, 0x00,  # Render Delay
+                0x00, 0x00,  # RFU
+                0x02, 0x00   # Codec IDs [G.722 at 16 kHz]
+            ])
+        )
+        audio_control_point_characteristic = Characteristic(
+            ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
+            Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
+            Characteristic.WRITEABLE,
+            CharacteristicValue(write=on_audio_control_point_write)
+        )
+        audio_status_characteristic = Characteristic(
+            ASHA_AUDIO_STATUS_CHARACTERISTIC,
+            Characteristic.READ | Characteristic.NOTIFY,
+            Characteristic.READABLE,
+            bytes([0])
+        )
+        volume_characteristic = Characteristic(
+            ASHA_VOLUME_CHARACTERISTIC,
+            Characteristic.WRITE_WITHOUT_RESPONSE,
+            Characteristic.WRITEABLE,
+            CharacteristicValue(write=on_volume_write)
+        )
+        le_psm_out_characteristic = Characteristic(
+            ASHA_LE_PSM_OUT_CHARACTERISTIC,
+            Characteristic.READ,
+            Characteristic.READABLE,
+            struct.pack('<H', psm)
+        )
+        device.add_service(Service(
+            ASHA_SERVICE,
+            [
+                read_only_properties_characteristic,
+                audio_control_point_characteristic,
+                audio_status_characteristic,
+                volume_characteristic,
+                le_psm_out_characteristic
+            ]
+        ))
+
+        # Set the advertising data
+        device.advertising_data = bytes(
+            AdvertisingData([
+                (AdvertisingData.COMPLETE_LOCAL_NAME, bytes(device.name, 'utf-8')),
+                (AdvertisingData.FLAGS, bytes([0x06])),
+                (AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(ASHA_SERVICE)),
+                (AdvertisingData.SERVICE_DATA_16_BIT_UUID, bytes(ASHA_SERVICE) + bytes([
+                    0x01,  # Protocol Version
+                    0x00,  # Capability
+                    0x01, 0x02, 0x03, 0x04  # Truncated HiSyncID
+                ]))
+            ])
+        )
+
+        # Go!
+        await device.power_on()
+        await device.start_advertising(auto_restart=True)
+
+        await hci_source.wait_for_termination()
+
+# -----------------------------------------------------------------------------
+logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
+asyncio.run(main())
diff --git a/examples/run_notifier.py b/examples/run_notifier.py
index f52b56c..15173cb 100644
--- a/examples/run_notifier.py
+++ b/examples/run_notifier.py
@@ -50,10 +50,16 @@
 
 
 # -----------------------------------------------------------------------------
+# Alternative way to listen for subscriptions
+# -----------------------------------------------------------------------------
+def on_my_characteristic_subscription(peer, enabled):
+    print(f'### My characteristic from {peer}: {"enabled" if enabled else "disabled"}')
+
+# -----------------------------------------------------------------------------
 async def main():
     if len(sys.argv) < 3:
-        print('Usage: run_gatt_server.py <device-config> <transport-spec>')
-        print('example: run_gatt_server.py device1.json usb:0')
+        print('Usage: run_notifier.py <device-config> <transport-spec>')
+        print('example: run_notifier.py device1.json usb:0')
         return
 
     print('<<< connecting to HCI...')
@@ -83,6 +89,7 @@
             Characteristic.READABLE,
             bytes([0x42])
         )
+        characteristic3.on('subscription', on_my_characteristic_subscription)
         custom_service = Service(
             '50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
             [characteristic1, characteristic2, characteristic3]
diff --git a/examples/run_rfcomm_client.py b/examples/run_rfcomm_client.py
index 83ef848..76586c3 100644
--- a/examples/run_rfcomm_client.py
+++ b/examples/run_rfcomm_client.py
@@ -98,6 +98,7 @@
 
     await sdp_client.disconnect()
 
+
 # -----------------------------------------------------------------------------
 class TcpServerProtocol(asyncio.Protocol):
     def __init__(self, rfcomm_session):
@@ -173,7 +174,7 @@
         print('*** Encryption on')
 
         # Create a client and start it
-        print('@@@ Starting to RFCOMM client...')
+        print('@@@ Starting RFCOMM client...')
         rfcomm_client = Client(device, connection)
         rfcomm_mux = await rfcomm_client.start()
         print('@@@ Started')
@@ -192,7 +193,7 @@
         if len(sys.argv) == 6:
             # A TCP port was specified, start listening
             tcp_port = int(sys.argv[5])
-            asyncio.get_running_loop().create_task(tcp_server(tcp_port, session))
+            asyncio.create_task(tcp_server(tcp_port, session))
 
         await hci_source.wait_for_termination()
 
diff --git a/setup.cfg b/setup.cfg
index ff99226..c64dcce 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -51,6 +51,7 @@
     bumble-controller-info = bumble.apps.controller_info:main
     bumble-gatt-dump = bumble.apps.gatt_dump:main
     bumble-hci-bridge = bumble.apps.hci_bridge:main
+    bumble-l2cap-bridge = bumble.apps.l2cap_bridge:main
     bumble-pair = bumble.apps.pair:main
     bumble-scan = bumble.apps.scan:main
     bumble-show = bumble.apps.show:main
@@ -64,6 +65,7 @@
 test =
     pytest >= 6.2
     pytest-asyncio >= 0.17
+    coverage >= 6.4
 development =
     invoke >= 1.4
     nox >= 2022
diff --git a/tests/l2cap_test.py b/tests/l2cap_test.py
new file mode 100644
index 0000000..319038b
--- /dev/null
+++ b/tests/l2cap_test.py
@@ -0,0 +1,284 @@
+# Copyright 2021-2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -----------------------------------------------------------------------------
+# Imports
+# -----------------------------------------------------------------------------
+import asyncio
+import logging
+import os
+import random
+import pytest
+
+from bumble.controller import Controller
+from bumble.link import LocalLink
+from bumble.device import Device
+from bumble.host import Host
+from bumble.transport import AsyncPipeSink
+from bumble.core import ProtocolError
+from bumble.l2cap import (
+    L2CAP_Connection_Request
+)
+
+
+# -----------------------------------------------------------------------------
+# Logging
+# -----------------------------------------------------------------------------
+logger = logging.getLogger(__name__)
+
+
+# -----------------------------------------------------------------------------
+class TwoDevices:
+    def __init__(self):
+        self.connections = [None, None]
+
+        self.link = LocalLink()
+        self.controllers = [
+            Controller('C1', link = self.link),
+            Controller('C2', link = self.link)
+        ]
+        self.devices = [
+            Device(
+                address = 'F0:F1:F2:F3:F4:F5',
+                host    = Host(self.controllers[0], AsyncPipeSink(self.controllers[0]))
+            ),
+            Device(
+                address = 'F5:F4:F3:F2:F1:F0',
+                host    = Host(self.controllers[1], AsyncPipeSink(self.controllers[1]))
+            )
+        ]
+
+        self.paired = [None, None]
+
+    def on_connection(self, which, connection):
+        self.connections[which] = connection
+
+    def on_paired(self, which, keys):
+        self.paired[which] = keys
+
+
+# -----------------------------------------------------------------------------
+async def setup_connection():
+    # Create two devices, each with a controller, attached to the same link
+    two_devices = TwoDevices()
+
+    # Attach listeners
+    two_devices.devices[0].on('connection', lambda connection: two_devices.on_connection(0, connection))
+    two_devices.devices[1].on('connection', lambda connection: two_devices.on_connection(1, connection))
+
+    # Start
+    await two_devices.devices[0].power_on()
+    await two_devices.devices[1].power_on()
+
+    # Connect the two devices
+    await two_devices.devices[0].connect(two_devices.devices[1].random_address)
+
+    # Check the post conditions
+    assert(two_devices.connections[0] is not None)
+    assert(two_devices.connections[1] is not None)
+
+    return two_devices
+
+
+# -----------------------------------------------------------------------------
+def test_helpers():
+    psm = L2CAP_Connection_Request.serialize_psm(0x01)
+    assert(psm == bytes([0x01, 0x00]))
+
+    psm = L2CAP_Connection_Request.serialize_psm(0x1023)
+    assert(psm == bytes([0x23, 0x10]))
+
+    psm = L2CAP_Connection_Request.serialize_psm(0x242311)
+    assert(psm == bytes([0x11, 0x23, 0x24]))
+
+    (offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x01, 0x00, 0x44]), 1)
+    assert(offset == 3)
+    assert(psm == 0x01)
+
+    (offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x23, 0x10, 0x44]), 1)
+    assert(offset == 3)
+    assert(psm == 0x1023)
+
+    (offset, psm) = L2CAP_Connection_Request.parse_psm(bytes([0x00, 0x11, 0x23, 0x24, 0x44]), 1)
+    assert(offset == 4)
+    assert(psm == 0x242311)
+
+    rq = L2CAP_Connection_Request(psm = 0x01, source_cid = 0x44)
+    brq = bytes(rq)
+    srq = L2CAP_Connection_Request.from_bytes(brq)
+    assert(srq.psm == rq.psm)
+    assert(srq.source_cid == rq.source_cid)
+
+
+# -----------------------------------------------------------------------------
[email protected]
+async def test_basic_connection():
+    devices = await setup_connection()
+    psm = 1234
+
+    # Check that if there's no one listening, we can't connect
+    with pytest.raises(ProtocolError):
+        l2cap_channel = await devices.connections[0].open_l2cap_channel(psm)
+
+    # Now add a listener
+    incoming_channel = None
+    received = []
+
+    def on_coc(channel):
+        nonlocal incoming_channel
+        incoming_channel = channel
+
+        def on_data(data):
+            received.append(data)
+
+        channel.sink = on_data
+
+    devices.devices[1].register_l2cap_channel_server(psm, on_coc)
+    l2cap_channel = await devices.connections[0].open_l2cap_channel(psm)
+
+    messages = (
+        bytes([1, 2, 3]),
+        bytes([4, 5, 6]),
+        bytes(10000)
+    )
+    for message in messages:
+        l2cap_channel.write(message)
+        await asyncio.sleep(0)
+
+    await l2cap_channel.drain()
+
+    # Test closing
+    closed = [False, False]
+    closed_event = asyncio.Event()
+
+    def on_close(which, event):
+        closed[which] = True
+        if event:
+            event.set()
+
+    l2cap_channel.on('close', lambda: on_close(0, None))
+    incoming_channel.on('close', lambda: on_close(1, closed_event))
+    await l2cap_channel.disconnect()
+    assert(closed == [True, True])
+    await closed_event.wait()
+
+    sent_bytes = b''.join(messages)
+    received_bytes = b''.join(received)
+    assert(sent_bytes == received_bytes)
+
+
+# -----------------------------------------------------------------------------
+async def transfer_payload(max_credits, mtu, mps):
+    devices = await setup_connection()
+
+    received = []
+
+    def on_coc(channel):
+        def on_data(data):
+            received.append(data)
+
+        channel.sink = on_data
+
+    psm = devices.devices[1].register_l2cap_channel_server(
+        psm         = 0,
+        server      = on_coc,
+        max_credits = max_credits,
+        mtu         = mtu,
+        mps         = mps
+    )
+    l2cap_channel = await devices.connections[0].open_l2cap_channel(psm)
+
+    messages = [
+        bytes([1, 2, 3, 4, 5, 6, 7]) * x
+        for x in (3, 10, 100, 500, 789)
+    ]
+    for message in messages:
+        l2cap_channel.write(message)
+        await asyncio.sleep(0)
+        if random.randint(0, 5) == 1:
+            await l2cap_channel.drain()
+
+    await l2cap_channel.drain()
+    await l2cap_channel.disconnect()
+
+    sent_bytes = b''.join(messages)
+    received_bytes = b''.join(received)
+    assert(sent_bytes == received_bytes)
+
+
[email protected]
+async def test_transfer():
+    for max_credits in (1, 10, 100, 10000):
+        for mtu in (23, 24, 25, 26, 50, 200, 255, 256, 1000):
+            for mps in (23, 24, 25, 26, 50, 200, 255, 256, 1000):
+                # print(max_credits, mtu, mps)
+                await transfer_payload(max_credits, mtu, mps)
+
+
+# -----------------------------------------------------------------------------
[email protected]
+async def test_bidirectional_transfer():
+    devices = await setup_connection()
+
+    client_received = []
+    server_received = []
+    server_channel  = None
+
+    def on_server_coc(channel):
+        nonlocal server_channel
+        server_channel = channel
+
+        def on_server_data(data):
+            server_received.append(data)
+
+        channel.sink = on_server_data
+
+    def on_client_data(data):
+        client_received.append(data)
+
+    psm = devices.devices[1].register_l2cap_channel_server(psm=0, server=on_server_coc)
+    client_channel = await devices.connections[0].open_l2cap_channel(psm)
+    client_channel.sink = on_client_data
+
+    messages = [
+        bytes([1, 2, 3, 4, 5, 6, 7]) * x
+        for x in (3, 10, 100)
+    ]
+    for message in messages:
+        client_channel.write(message)
+        await client_channel.drain()
+        await asyncio.sleep(0)
+        server_channel.write(message)
+        await server_channel.drain()
+
+    await client_channel.disconnect()
+
+    message_bytes = b''.join(messages)
+    client_received_bytes = b''.join(client_received)
+    server_received_bytes = b''.join(server_received)
+    assert(client_received_bytes == message_bytes)
+    assert(server_received_bytes == message_bytes)
+
+
+# -----------------------------------------------------------------------------
+async def run():
+    test_helpers()
+    await test_basic_connection()
+    await test_transfer()
+    await test_bidirectional_transfer()
+
+# -----------------------------------------------------------------------------
+if __name__ == '__main__':
+    logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
+    asyncio.run(run())