blob: cf0f741bf148912afe41a08fe07bbc9465a80698 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import inspect
import json
import random
import readline
import socket
import sys
import time
import requests
import struct
import asyncio
from concurrent.futures import ThreadPoolExecutor
import rf_packets as rf
class T4AT:
def __init__(self, reader, writer):
self.nfcid1 = bytes([0x08]) + int.to_bytes(random.randint(0, 0xffffff), length=3)
self.rats_response = bytes([0x2, 0x0])
self.reader = reader
self.writer = writer
async def _read(self) -> rf.RfPacket:
header_bytes = await self.reader.read(2)
packet_length = int.from_bytes(header_bytes, byteorder='little')
packet_bytes = await self.reader.read(packet_length)
packet = rf.RfPacket.parse_all(packet_bytes)
packet.show()
return packet
def _write(self, packet: rf.RfPacket):
packet_bytes = packet.serialize()
header_bytes = int.to_bytes(len(packet_bytes), length=2, byteorder='little')
self.writer.write(header_bytes + packet_bytes)
async def listen(self):
"""Emulate device in passive listen mode. Respond to poll requests until
the device is activated by a select command."""
while True:
packet = await self._read()
match packet:
case rf.PollCommand(technology=rf.Technology.NFC_A):
self._write(rf.NfcAPollResponse(
nfcid1=self.nfcid1, int_protocol=0b01))
case rf.T4ATSelectCommand(_):
self._write(rf.T4ATSelectResponse(
rats_response=self.rats_response,
receiver=packet.sender))
print(f"t4at device selected by #{packet.sender}")
await self.active(packet.sender)
case _:
pass
async def poll(self):
"""Emulate device in passive poll mode. Automatically selects the
first discovered device."""
while True:
try:
self._write(rf.PollCommand(technology=rf.Technology.NFC_A))
packet = await asyncio.wait_for(self._read(), timeout=1.0)
match packet:
# [DIGITAL] Table 20: SEL_RES Response Format
# 01b: Configured for Type 4A Tag Platform
case rf.NfcAPollResponse(int_protocol=0b01):
nfcid1 = bytes(packet.nfcid1)
print(f"discovered t4at device with nfcid1 #{nfcid1.hex()}")
self._write(rf.T4ATSelectCommand(receiver=packet.sender, param=0))
response = await asyncio.wait_for(
self.wait_for_select_response(packet.sender), timeout=1.0)
print(f"t4at device activation complete")
await self.active(response.sender)
case _:
pass
time.sleep(0.050);
except TimeoutError:
pass
time.sleep(0.050);
try:
signature = bytes([0x1, 0x2, 0x3, 0x4]);
self._write(rf.PollCommand(technology=rf.Technology.NFC_RAW, data=signature))
await asyncio.wait_for(self._read(), timeout=1.0)
except TimeoutError:
pass
async def wait_for_select_response(self, sender_id: int):
while True:
packet = await self._read()
if isinstance(packet, rf.T4ATSelectResponse) and packet.sender == sender_id:
return packet
async def active(self, peer: int):
"""Active mode. Respond to data requests until the device
is deselected."""
while True:
packet = await self._read()
match packet:
case rf.DeactivateNotification(_):
return
case rf.Data(_):
pass
case _:
pass
async def run(address: str, rf_port: int, mode: str):
"""Emulate a T4AT compatible device in Listen mode."""
try:
reader, writer = await asyncio.open_connection(address, rf_port)
device = T4AT(reader, writer)
if mode == 'poll':
await device.poll()
elif mode == 'listen':
await device.listen()
else:
print(f"unsupported device mode {mode}")
except Exception as exn:
print(
f'Failed to connect to Casimir server at address {address}:{rf_port}:\n' +
f' {exn}\n' +
'Make sure the server is running')
exit(1)
def main():
"""Start a Casimir interactive console."""
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--address',
type=str,
default='127.0.0.1',
help='Select the casimir server address')
parser.add_argument('--rf-port',
type=int,
default=7001,
help='Select the casimir TCP RF port')
parser.add_argument('--mode',
type=str,
choices=['poll', 'listen'],
default='poll',
help='Select the tag mode')
asyncio.run(run(**vars(parser.parse_args())))
if __name__ == '__main__':
main()