| #!/usr/bin/env python3 |
| # Copyright 2023 The Pigweed Authors |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| # use this file except in compliance with the License. You may obtain a copy of |
| # the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations under |
| # the License. |
| """Tests for the transfer service client.""" |
| |
| import enum |
| import math |
| import os |
| import unittest |
| from typing import Iterable |
| |
| from pw_status import Status |
| from pw_rpc import callback_client, client, ids, packets |
| from pw_rpc.internal import packet_pb2 |
| |
| import pw_transfer |
| from pw_transfer import ProtocolVersion |
| |
| from pw_transfer import transfer_pb2 |
| |
| _TRANSFER_SERVICE_ID = ids.calculate('pw.transfer.Transfer') |
| _FIRST_SESSION_ID = 1 |
| _ARBITRARY_TRANSFER_ID = 66 |
| |
| # If the default timeout is too short, some tests become flaky on Windows. |
| DEFAULT_TIMEOUT_S = 0.3 |
| |
| |
class _Method(enum.Enum):
    """IDs of the transfer service RPC methods exercised by these tests.

    Each value is the ID that ``ids.calculate`` derives from the method name;
    the test helpers use it as the ``method_id`` of the packets they build.
    """

    READ = ids.calculate('Read')
    WRITE = ids.calculate('Write')
| |
| |
| # pylint: disable=missing-function-docstring, missing-class-docstring |
| |
| |
| class TransferManagerTest(unittest.TestCase): |
| # pylint: disable=too-many-public-methods |
| """Tests for the transfer manager.""" |
| |
def setUp(self) -> None:
    """Creates an RPC client wired to the in-test fake server.

    The client's only channel (ID 1) is given ``_handle_request`` as its
    output, and the transfer service stub is looked up on that channel.
    """
    rpc_impl = callback_client.Impl()
    loopback_channel = client.Channel(1, self._handle_request)
    self._client = client.Client.from_modules(
        rpc_impl, [loopback_channel], (transfer_pb2,)
    )

    channel_client = self._client.channel(1)
    self._service = channel_client.rpcs.pw.transfer.Transfer

    # Chunks the client has sent, in the order they were received.
    self._sent_chunks: list[transfer_pb2.Chunk] = []
    # Queued groups of response packets; see _handle_request for how they
    # are consumed.
    self._packets_to_send: list[list[packet_pb2.RpcPacket]] = []
| |
def _enqueue_server_responses(
    self, method: _Method, responses: Iterable[Iterable[transfer_pb2.Chunk]]
) -> None:
    """Queues groups of chunks for the fake server to send to the client.

    Args:
      method: Transfer RPC method whose ID is stamped on every packet.
      responses: Groups of chunks. Each chunk is serialized into a
        SERVER_STREAM packet on channel 1, and each group is appended to
        the response queue as one entry.
    """
    for group in responses:
        self._packets_to_send.append(
            [
                packet_pb2.RpcPacket(
                    type=packet_pb2.PacketType.SERVER_STREAM,
                    channel_id=1,
                    service_id=_TRANSFER_SERVICE_ID,
                    method_id=method.value,
                    status=Status.OK.value,
                    payload=chunk.SerializeToString(),
                )
                for chunk in group
            ]
        )
| |
def _enqueue_server_error(self, method: _Method, error: Status) -> None:
    """Queues one SERVER_ERROR packet for ``method`` carrying ``error``.

    Args:
      method: Transfer RPC method whose ID is stamped on the packet.
      error: Status placed in the packet's ``status`` field.
    """
    error_packet = packet_pb2.RpcPacket(
        type=packet_pb2.PacketType.SERVER_ERROR,
        channel_id=1,
        service_id=_TRANSFER_SERVICE_ID,
        method_id=method.value,
        status=error.value,
    )
    # The packet forms a response group of its own.
    self._packets_to_send.append([error_packet])
| |
def _handle_request(self, data: bytes) -> None:
    """Plays the role of the server for packets written by the client.

    Packets other than CLIENT_STREAM are ignored. For a CLIENT_STREAM
    packet, the chunk in its payload is recorded in ``_sent_chunks`` and the
    next queued group of responses, if any, is fed back into the client.

    Args:
      data: Encoded RPC packet written to the channel by the client.
    """
    packet = packets.decode(data)
    # Compare by value rather than identity: the enum constant is an
    # integer, and an `is` check on integers only succeeds by virtue of the
    # interpreter's small-int caching.
    if packet.type != packet_pb2.PacketType.CLIENT_STREAM:
        return

    chunk = transfer_pb2.Chunk()
    chunk.MergeFromString(packet.payload)
    self._sent_chunks.append(chunk)

    if not self._packets_to_send:
        return

    # Exactly one queued group is consumed per chunk received.
    for response in self._packets_to_send.pop(0):
        # Address the response to the call this request belongs to.
        response.call_id = packet.call_id
        self._client.process_packet(response.SerializeToString())
| |
def _received_data(self) -> bytearray:
    """Returns the concatenated ``data`` of every chunk sent so far."""
    received = bytearray()
    for sent in self._sent_chunks:
        received += sent.data
    return received
| |
def test_read_transfer_basic(self) -> None:
    """Reads a resource that the server delivers in a single chunk."""
    manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    # One group with one chunk: it is sent in reply to the first chunk the
    # client writes.
    self._enqueue_server_responses(
        _Method.READ,
        (
            (
                transfer_pb2.Chunk(
                    transfer_id=3, offset=0, data=b'abc', remaining_bytes=0
                ),
            ),
        ),
    )

    data = manager.read(3)
    self.assertEqual(data, b'abc')
    self.assertEqual(len(self._sent_chunks), 2)
    # The last chunk sent by the client must carry an explicit status of 0.
    self.assertTrue(self._sent_chunks[-1].HasField('status'))
    self.assertEqual(self._sent_chunks[-1].status, 0)
| |
def test_read_transfer_multichunk(self) -> None:
    """Reads a resource that the server delivers as two data chunks."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    # Both chunks belong to one response group, so they are delivered
    # together in reply to a single chunk from the client.
    first_half = transfer_pb2.Chunk(
        transfer_id=3, offset=0, data=b'abc', remaining_bytes=3
    )
    second_half = transfer_pb2.Chunk(
        transfer_id=3, offset=3, data=b'def', remaining_bytes=0
    )
    self._enqueue_server_responses(_Method.READ, [[first_half, second_half]])

    received = transfer_manager.read(3)
    self.assertEqual(received, b'abcdef')
    self.assertEqual(len(self._sent_chunks), 2)

    final_chunk = self._sent_chunks[-1]
    self.assertTrue(final_chunk.HasField('status'))
    self.assertEqual(final_chunk.status, 0)
| |
def test_read_transfer_progress_callback(self) -> None:
    """Checks the progress updates reported during a two-chunk read."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    server_chunks = [
        transfer_pb2.Chunk(
            transfer_id=3, offset=0, data=b'abc', remaining_bytes=3
        ),
        transfer_pb2.Chunk(
            transfer_id=3, offset=3, data=b'def', remaining_bytes=0
        ),
    ]
    self._enqueue_server_responses(_Method.READ, [server_chunks])

    # Every update passed to the callback is collected here.
    updates: list[pw_transfer.ProgressStats] = []

    received = transfer_manager.read(3, updates.append)
    self.assertEqual(received, b'abcdef')
    self.assertEqual(len(self._sent_chunks), 2)

    final_chunk = self._sent_chunks[-1]
    self.assertTrue(final_chunk.HasField('status'))
    self.assertEqual(final_chunk.status, 0)

    # One update is expected per data chunk received.
    expected_updates = [
        pw_transfer.ProgressStats(3, 3, 6),
        pw_transfer.ProgressStats(6, 6, 6),
    ]
    self.assertEqual(updates, expected_updates)
| |
def test_read_transfer_retry_bad_offset(self) -> None:
    """Read transfer in which the server sends an unexpected offset."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    def data_chunk(
        offset: int, data: bytes, remaining: int
    ) -> transfer_pb2.Chunk:
        return transfer_pb2.Chunk(
            transfer_id=3, offset=offset, data=data, remaining_bytes=remaining
        )

    first_attempt = [
        data_chunk(0, b'123', 6),
        # Incorrect offset; expecting 3.
        data_chunk(1, b'456', 3),
    ]
    second_attempt = [
        data_chunk(3, b'456', 3),
        data_chunk(6, b'789', 0),
    ]
    self._enqueue_server_responses(
        _Method.READ, [first_attempt, second_attempt]
    )

    received = transfer_manager.read(3)
    self.assertEqual(received, b'123456789')

    # Two transfer parameter requests should have been sent.
    self.assertEqual(len(self._sent_chunks), 3)
    final_chunk = self._sent_chunks[-1]
    self.assertTrue(final_chunk.HasField('status'))
    self.assertEqual(final_chunk.status, 0)
| |
def test_read_transfer_recovery_sends_parameters_on_retry(self) -> None:
    """Server repeats a chunk at a bad offset during a read transfer."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    def data_chunk(
        offset: int, data: bytes, remaining: int
    ) -> transfer_pb2.Chunk:
        return transfer_pb2.Chunk(
            transfer_id=3, offset=offset, data=data, remaining_bytes=remaining
        )

    # (offset, data, remaining_bytes) of a burst that starts at offset 1
    # although the transfer has to begin at offset 0.
    misaligned_burst = [(1, b'234', 5), (4, b'567', 2), (7, b'8', 1)]

    self._enqueue_server_responses(
        _Method.READ,
        [
            # Bad offset, enter recovery state. Only one parameters chunk
            # should be sent.
            [data_chunk(*fields) for fields in misaligned_burst],
            # Only one parameters chunk should be sent after the server
            # retries the same offset twice.
            [
                data_chunk(*fields)
                for fields in misaligned_burst + [(7, b'8', 1)]
            ],
            [data_chunk(0, b'123456789', 0)],
        ],
    )

    received = transfer_manager.read(3)
    self.assertEqual(received, b'123456789')

    self.assertEqual(len(self._sent_chunks), 4)

    # The first three chunks all request data from offset 0.
    chunk_type = transfer_pb2.Chunk.Type
    expected_types = [
        chunk_type.START,
        chunk_type.PARAMETERS_RETRANSMIT,
        chunk_type.PARAMETERS_RETRANSMIT,
    ]
    for sent, expected_type in zip(self._sent_chunks, expected_types):
        self.assertEqual(sent.type, expected_type)
        self.assertEqual(sent.offset, 0)

    self.assertEqual(self._sent_chunks[3].type, chunk_type.COMPLETION)
| |
def test_read_transfer_retry_timeout(self) -> None:
    """Server stays silent after the first read transfer parameters."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    # Send nothing in response to the initial parameters.
    silence: list[transfer_pb2.Chunk] = []
    whole_resource = transfer_pb2.Chunk(
        transfer_id=3, offset=0, data=b'xyz', remaining_bytes=0
    )
    self._enqueue_server_responses(
        _Method.READ, [silence, [whole_resource]]
    )

    received = transfer_manager.read(3)
    self.assertEqual(received, b'xyz')

    # Two transfer parameter requests should have been sent.
    self.assertEqual(len(self._sent_chunks), 3)
    final_chunk = self._sent_chunks[-1]
    self.assertTrue(final_chunk.HasField('status'))
    self.assertEqual(final_chunk.status, 0)
| |
def test_read_transfer_lifetime_retries(self) -> None:
    """Read transfer fails once the lifetime retry limit is reached."""
    transfer_manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        max_retries=2**32 - 1,
        max_lifetime_retries=4,
    )

    silence: list[transfer_pb2.Chunk] = []
    expected_chunk = transfer_pb2.Chunk(
        transfer_id=43, offset=0, data=b'xyz'
    )
    # Two unanswered requests (retries 1 and 2), then the expected chunk.
    # Nothing else is queued, so the maximum lifetime retry count should
    # be hit.
    self._enqueue_server_responses(
        _Method.READ, [silence, silence, [expected_chunk]]
    )

    with self.assertRaises(pw_transfer.Error) as raised:
        transfer_manager.read(43)

    self.assertEqual(len(self._sent_chunks), 5)

    error = raised.exception
    self.assertEqual(error.resource_id, 43)
    self.assertEqual(error.status, Status.DEADLINE_EXCEEDED)
| |
def test_read_transfer_timeout(self) -> None:
    """Read transfer with no server responses ends in DEADLINE_EXCEEDED."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    # No responses are queued, so every chunk the client sends goes
    # unanswered.
    with self.assertRaises(pw_transfer.Error) as raised:
        transfer_manager.read(27)

    error = raised.exception
    self.assertEqual(error.resource_id, 27)
    self.assertEqual(error.status, Status.DEADLINE_EXCEEDED)

    # The client should have sent four transfer parameters requests: one
    # initial, and three retries.
    self.assertEqual(len(self._sent_chunks), 4)
| |
def test_read_transfer_error(self) -> None:
    """A status chunk from the server is surfaced as a transfer error."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    not_found = transfer_pb2.Chunk(
        transfer_id=31, status=Status.NOT_FOUND.value
    )
    self._enqueue_server_responses(_Method.READ, [[not_found]])

    with self.assertRaises(pw_transfer.Error) as raised:
        transfer_manager.read(31)

    error = raised.exception
    self.assertEqual(error.resource_id, 31)
    self.assertEqual(error.status, Status.NOT_FOUND)
| |
def test_read_transfer_server_error(self) -> None:
    """An RPC-level server error makes the read fail with INTERNAL."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    # The queued packet is a SERVER_ERROR, not a transfer chunk.
    self._enqueue_server_error(_Method.READ, Status.NOT_FOUND)

    with self.assertRaises(pw_transfer.Error) as raised:
        transfer_manager.read(31)

    error = raised.exception
    self.assertEqual(error.resource_id, 31)
    self.assertEqual(error.status, Status.INTERNAL)
| |
def test_read_transfer_adaptive_window_slow_start(self) -> None:
    """Legacy read: the requested window doubles after each full window."""
    chunk_size = 16
    payload = b'#' * chunk_size
    chunk_type = transfer_pb2.Chunk.Type

    transfer_manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        max_chunk_size_bytes=chunk_size,
        default_protocol_version=ProtocolVersion.LEGACY,
    )

    # Chunk indices sent per window: first window 1 chunk, second window
    # 2 chunks.
    responses = [
        [
            transfer_pb2.Chunk(
                transfer_id=_ARBITRARY_TRANSFER_ID,
                type=chunk_type.DATA,
                offset=index * chunk_size,
                data=payload,
            )
            for index in window
        ]
        for window in [[0], [1, 2]]
    ]
    # Third window: finish transfer.
    responses.append(
        [
            transfer_pb2.Chunk(
                transfer_id=_ARBITRARY_TRANSFER_ID,
                type=chunk_type.DATA,
                offset=3 * chunk_size,
                data=payload,
                remaining_bytes=0,
            )
        ]
    )
    self._enqueue_server_responses(_Method.READ, responses)

    received = transfer_manager.read(_ARBITRARY_TRANSFER_ID)

    # First parameters: 1 chunk window.
    expected_sent = [
        transfer_pb2.Chunk(
            type=chunk_type.START,
            transfer_id=_ARBITRARY_TRANSFER_ID,
            resource_id=_ARBITRARY_TRANSFER_ID,
            pending_bytes=chunk_size,
            max_chunk_size_bytes=chunk_size,
            window_end_offset=chunk_size,
        )
    ]
    # Second and third parameters, as (offset, window size) in chunks:
    # a 2 chunk window, then a 4 chunk window.
    expected_sent += [
        transfer_pb2.Chunk(
            type=chunk_type.PARAMETERS_CONTINUE,
            transfer_id=_ARBITRARY_TRANSFER_ID,
            offset=start * chunk_size,
            pending_bytes=length * chunk_size,
            max_chunk_size_bytes=chunk_size,
            window_end_offset=(start + length) * chunk_size,
        )
        for start, length in [(1, 2), (2, 4)]
    ]
    expected_sent.append(
        transfer_pb2.Chunk(
            type=chunk_type.COMPLETION,
            transfer_id=_ARBITRARY_TRANSFER_ID,
            status=Status.OK.value,
        )
    )

    self.assertEqual(self._sent_chunks, expected_sent)
    self.assertEqual(received, b'#' * (4 * chunk_size))
| |
def test_read_transfer_adaptive_window_congestion_avoidance(self) -> None:
    """Legacy read: window is halved on loss, then grows by one chunk."""
    chunk_size = 16
    payload = b'#' * chunk_size
    chunk_type = transfer_pb2.Chunk.Type

    transfer_manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        max_chunk_size_bytes=chunk_size,
        default_protocol_version=ProtocolVersion.LEGACY,
    )

    # Chunk indices sent per window:
    #   first window: 1 chunk,
    #   second window: 2 chunks,
    #   third window: the wrong offset, triggering a retransmission,
    #   fourth window: the expected offset.
    responses = [
        [
            transfer_pb2.Chunk(
                transfer_id=_ARBITRARY_TRANSFER_ID,
                type=chunk_type.DATA,
                offset=index * chunk_size,
                data=payload,
            )
            for index in window
        ]
        for window in [[0], [1, 2], [5], [3, 4]]
    ]
    # Fifth window: finish the transfer.
    responses.append(
        [
            transfer_pb2.Chunk(
                transfer_id=_ARBITRARY_TRANSFER_ID,
                type=chunk_type.DATA,
                offset=5 * chunk_size,
                data=payload,
                remaining_bytes=0,
            )
        ]
    )
    self._enqueue_server_responses(_Method.READ, responses)

    received = transfer_manager.read(_ARBITRARY_TRANSFER_ID)

    # First parameters: 1 chunk window.
    expected_sent = [
        transfer_pb2.Chunk(
            type=chunk_type.START,
            transfer_id=_ARBITRARY_TRANSFER_ID,
            resource_id=_ARBITRARY_TRANSFER_ID,
            pending_bytes=chunk_size,
            max_chunk_size_bytes=chunk_size,
            window_end_offset=chunk_size,
        )
    ]
    # Later parameters as (type, offset, window size), both in chunks.
    later_parameters = [
        # Second parameters: 2 chunk window.
        (chunk_type.PARAMETERS_CONTINUE, 1, 2),
        # Third parameters: 4 chunk window.
        (chunk_type.PARAMETERS_CONTINUE, 2, 4),
        # Fourth parameters: data loss, retransmit and halve window.
        (chunk_type.PARAMETERS_RETRANSMIT, 3, 2),
        # Fifth parameters: in congestion avoidance, window size now only
        # increases by one chunk instead of doubling.
        (chunk_type.PARAMETERS_CONTINUE, 4, 3),
    ]
    expected_sent += [
        transfer_pb2.Chunk(
            type=kind,
            transfer_id=_ARBITRARY_TRANSFER_ID,
            offset=start * chunk_size,
            pending_bytes=length * chunk_size,
            max_chunk_size_bytes=chunk_size,
            window_end_offset=(start + length) * chunk_size,
        )
        for kind, start, length in later_parameters
    ]
    expected_sent.append(
        transfer_pb2.Chunk(
            type=chunk_type.COMPLETION,
            transfer_id=_ARBITRARY_TRANSFER_ID,
            status=Status.OK.value,
        )
    )

    self.assertEqual(self._sent_chunks, expected_sent)
    self.assertEqual(received, b'#' * (6 * chunk_size))
| |
def test_read_transfer_v2_adaptive_window_slow_start(self) -> None:
    """Version 2 read: the requested window doubles after each window."""
    chunk_size = 16
    payload = b'#' * chunk_size
    chunk_type = transfer_pb2.Chunk.Type
    version_two = ProtocolVersion.VERSION_TWO.value

    transfer_manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        max_chunk_size_bytes=chunk_size,
        default_protocol_version=ProtocolVersion.VERSION_TWO,
    )

    # The server first acknowledges the start of the session.
    responses = [
        [
            transfer_pb2.Chunk(
                session_id=_FIRST_SESSION_ID,
                type=chunk_type.START_ACK,
                protocol_version=version_two,
            )
        ]
    ]
    # Chunk indices sent per window: first window 1 chunk, second window
    # 2 chunks.
    responses += [
        [
            transfer_pb2.Chunk(
                session_id=_FIRST_SESSION_ID,
                type=chunk_type.DATA,
                offset=index * chunk_size,
                data=payload,
            )
            for index in window
        ]
        for window in [[0], [1, 2]]
    ]
    # Third window: finish transfer.
    responses.append(
        [
            transfer_pb2.Chunk(
                session_id=_FIRST_SESSION_ID,
                type=chunk_type.DATA,
                offset=3 * chunk_size,
                data=payload,
                remaining_bytes=0,
            )
        ]
    )
    # Finally, the server acknowledges the client's completion chunk.
    responses.append(
        [
            transfer_pb2.Chunk(
                session_id=_FIRST_SESSION_ID,
                type=chunk_type.COMPLETION_ACK,
            )
        ]
    )
    self._enqueue_server_responses(_Method.READ, responses)

    received = transfer_manager.read(_ARBITRARY_TRANSFER_ID)

    expected_sent = [
        transfer_pb2.Chunk(
            transfer_id=_ARBITRARY_TRANSFER_ID,
            resource_id=_ARBITRARY_TRANSFER_ID,
            desired_session_id=_FIRST_SESSION_ID,
            pending_bytes=chunk_size,
            max_chunk_size_bytes=chunk_size,
            window_end_offset=chunk_size,
            type=chunk_type.START,
            protocol_version=version_two,
        ),
        # First parameters: 1 chunk window.
        transfer_pb2.Chunk(
            session_id=_FIRST_SESSION_ID,
            type=chunk_type.START_ACK_CONFIRMATION,
            offset=0,
            max_chunk_size_bytes=chunk_size,
            window_end_offset=chunk_size,
            protocol_version=version_two,
        ),
    ]
    # Second and third parameters, as (offset, window size) in chunks:
    # a 2 chunk window, then a 4 chunk window.
    expected_sent += [
        transfer_pb2.Chunk(
            session_id=_FIRST_SESSION_ID,
            type=chunk_type.PARAMETERS_CONTINUE,
            offset=start * chunk_size,
            max_chunk_size_bytes=chunk_size,
            window_end_offset=(start + length) * chunk_size,
        )
        for start, length in [(1, 2), (2, 4)]
    ]
    expected_sent.append(
        transfer_pb2.Chunk(
            session_id=_FIRST_SESSION_ID,
            type=chunk_type.COMPLETION,
            status=Status.OK.value,
        )
    )

    self.assertEqual(self._sent_chunks, expected_sent)
    self.assertEqual(received, b'#' * (4 * chunk_size))
| |
def test_read_transfer_v2_adaptive_window_congestion_avoidance(
    self,
) -> None:
    """Version 2 read: window is halved on loss, then grows by one chunk."""
    chunk_size = 16
    payload = b'#' * chunk_size
    chunk_type = transfer_pb2.Chunk.Type
    version_two = ProtocolVersion.VERSION_TWO.value

    transfer_manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        max_chunk_size_bytes=chunk_size,
        default_protocol_version=ProtocolVersion.VERSION_TWO,
    )

    # The server first acknowledges the start of the session.
    responses = [
        [
            transfer_pb2.Chunk(
                session_id=_FIRST_SESSION_ID,
                type=chunk_type.START_ACK,
                protocol_version=version_two,
            )
        ]
    ]
    # Chunk indices sent per window:
    #   first window: 1 chunk,
    #   second window: 2 chunks,
    #   third window: the wrong offset, triggering a retransmission,
    #   fourth window: the expected offset.
    responses += [
        [
            transfer_pb2.Chunk(
                session_id=_FIRST_SESSION_ID,
                type=chunk_type.DATA,
                offset=index * chunk_size,
                data=payload,
            )
            for index in window
        ]
        for window in [[0], [1, 2], [5], [3, 4]]
    ]
    # Fifth window: finish the transfer.
    responses.append(
        [
            transfer_pb2.Chunk(
                session_id=_FIRST_SESSION_ID,
                type=chunk_type.DATA,
                offset=5 * chunk_size,
                data=payload,
                remaining_bytes=0,
            )
        ]
    )
    # Finally, the server acknowledges the client's completion chunk.
    responses.append(
        [
            transfer_pb2.Chunk(
                session_id=_FIRST_SESSION_ID,
                type=chunk_type.COMPLETION_ACK,
            )
        ]
    )
    self._enqueue_server_responses(_Method.READ, responses)

    received = transfer_manager.read(_ARBITRARY_TRANSFER_ID)

    expected_sent = [
        transfer_pb2.Chunk(
            type=chunk_type.START,
            transfer_id=_ARBITRARY_TRANSFER_ID,
            resource_id=_ARBITRARY_TRANSFER_ID,
            desired_session_id=_FIRST_SESSION_ID,
            pending_bytes=chunk_size,
            max_chunk_size_bytes=chunk_size,
            window_end_offset=chunk_size,
            protocol_version=version_two,
        ),
        # First parameters: 1 chunk window.
        transfer_pb2.Chunk(
            session_id=_FIRST_SESSION_ID,
            type=chunk_type.START_ACK_CONFIRMATION,
            offset=0,
            max_chunk_size_bytes=chunk_size,
            window_end_offset=chunk_size,
            protocol_version=version_two,
        ),
    ]
    # Later parameters as (type, offset, window size), both in chunks.
    later_parameters = [
        # Second parameters: 2 chunk window.
        (chunk_type.PARAMETERS_CONTINUE, 1, 2),
        # Third parameters: 4 chunk window.
        (chunk_type.PARAMETERS_CONTINUE, 2, 4),
        # Fourth parameters: data loss, retransmit and halve window.
        (chunk_type.PARAMETERS_RETRANSMIT, 3, 2),
        # Fifth parameters: in congestion avoidance, window size now only
        # increases by one chunk instead of doubling.
        (chunk_type.PARAMETERS_CONTINUE, 4, 3),
    ]
    expected_sent += [
        transfer_pb2.Chunk(
            type=kind,
            session_id=_FIRST_SESSION_ID,
            offset=start * chunk_size,
            max_chunk_size_bytes=chunk_size,
            window_end_offset=(start + length) * chunk_size,
        )
        for kind, start, length in later_parameters
    ]
    expected_sent.append(
        transfer_pb2.Chunk(
            type=chunk_type.COMPLETION,
            session_id=_FIRST_SESSION_ID,
            status=Status.OK.value,
        )
    )

    self.assertEqual(self._sent_chunks, expected_sent)
    self.assertEqual(received, b'#' * (6 * chunk_size))
| |
def test_write_transfer_basic(self) -> None:
    """Writes five bytes that are sent within a single window."""
    transfer_manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
    )

    parameters = transfer_pb2.Chunk(
        transfer_id=4,
        offset=0,
        pending_bytes=32,
        max_chunk_size_bytes=8,
    )
    completion = transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value)
    self._enqueue_server_responses(
        _Method.WRITE, [[parameters], [completion]]
    )

    transfer_manager.write(4, b'hello')
    self.assertEqual(len(self._sent_chunks), 2)
    self.assertEqual(self._received_data(), b'hello')
| |
def test_write_transfer_max_chunk_size(self) -> None:
    """Data longer than max_chunk_size_bytes is split across chunks."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    parameters = transfer_pb2.Chunk(
        transfer_id=4,
        offset=0,
        pending_bytes=32,
        max_chunk_size_bytes=8,
    )
    completion = transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value)
    # The empty group in the middle leaves one client chunk unanswered.
    nothing: list[transfer_pb2.Chunk] = []
    self._enqueue_server_responses(
        _Method.WRITE, [[parameters], nothing, [completion]]
    )

    transfer_manager.write(4, b'hello world')
    self.assertEqual(len(self._sent_chunks), 3)
    self.assertEqual(self._received_data(), b'hello world')

    _, first_data, second_data = self._sent_chunks
    self.assertEqual(first_data.data, b'hello wo')
    self.assertEqual(second_data.data, b'rld')
| |
def test_write_transfer_multiple_parameters(self) -> None:
    """Server requests the data in two separate eight-byte windows."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    # One parameters chunk per window, requested at offsets 0 and 8.
    responses = [
        [
            transfer_pb2.Chunk(
                transfer_id=4,
                offset=window_start,
                pending_bytes=8,
                max_chunk_size_bytes=8,
            )
        ]
        for window_start in [0, 8]
    ]
    responses.append(
        [transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value)]
    )
    self._enqueue_server_responses(_Method.WRITE, responses)

    transfer_manager.write(4, b'data to write')
    self.assertEqual(len(self._sent_chunks), 3)
    self.assertEqual(self._received_data(), b'data to write')

    _, first_data, second_data = self._sent_chunks
    self.assertEqual(first_data.data, b'data to ')
    self.assertEqual(second_data.data, b'write')
| |
def test_write_transfer_progress_callback(self) -> None:
    """Checks the progress updates reported during a two-window write."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    # One parameters chunk per window, requested at offsets 0 and 8.
    responses = [
        [
            transfer_pb2.Chunk(
                transfer_id=4,
                offset=window_start,
                pending_bytes=8,
                max_chunk_size_bytes=8,
            )
        ]
        for window_start in [0, 8]
    ]
    responses.append(
        [transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value)]
    )
    self._enqueue_server_responses(_Method.WRITE, responses)

    # Every update passed to the callback is collected here.
    updates: list[pw_transfer.ProgressStats] = []

    transfer_manager.write(4, b'data to write', updates.append)
    self.assertEqual(len(self._sent_chunks), 3)
    self.assertEqual(self._received_data(), b'data to write')

    _, first_data, second_data = self._sent_chunks
    self.assertEqual(first_data.data, b'data to ')
    self.assertEqual(second_data.data, b'write')

    expected_updates = [
        pw_transfer.ProgressStats(8, 0, 13),
        pw_transfer.ProgressStats(13, 8, 13),
        pw_transfer.ProgressStats(13, 13, 13),
    ]
    self.assertEqual(updates, expected_updates)
| |
def test_write_transfer_rewind(self) -> None:
    """Write transfer in which the server asks for an earlier offset again."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    # (offset, size) of each parameters chunk; size is used for both
    # pending_bytes and max_chunk_size_bytes.
    requested_windows = [
        (0, 8),
        (8, 8),
        (4, 8),  # rewind
        (12, 16),  # update max size
    ]
    responses = [
        [
            transfer_pb2.Chunk(
                transfer_id=4,
                offset=window_start,
                pending_bytes=size,
                max_chunk_size_bytes=size,
            )
        ]
        for window_start, size in requested_windows
    ]
    responses.append(
        [transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value)]
    )
    self._enqueue_server_responses(_Method.WRITE, responses)

    transfer_manager.write(4, b'pigweed data transfer')
    self.assertEqual(len(self._sent_chunks), 5)

    # Data expected in the chunks that follow the first one sent.
    expected_payloads = [b'pigweed ', b'data tra', b'eed data', b' transfer']
    for sent, payload in zip(self._sent_chunks[1:], expected_payloads):
        self.assertEqual(sent.data, payload)
| |
def test_write_transfer_bad_offset(self) -> None:
    """Write fails with OUT_OF_RANGE for an offset past the data's end."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    valid_request = transfer_pb2.Chunk(
        transfer_id=4,
        offset=0,
        pending_bytes=8,
        max_chunk_size_bytes=8,
    )
    out_of_range_request = transfer_pb2.Chunk(
        transfer_id=4,
        offset=100,  # larger offset than data
        pending_bytes=8,
        max_chunk_size_bytes=8,
    )
    completion = transfer_pb2.Chunk(transfer_id=4, status=Status.OK.value)
    self._enqueue_server_responses(
        _Method.WRITE,
        [[valid_request], [out_of_range_request], [completion]],
    )

    with self.assertRaises(pw_transfer.Error) as raised:
        transfer_manager.write(4, b'small data')

    error = raised.exception
    self.assertEqual(error.resource_id, 4)
    self.assertEqual(error.status, Status.OUT_OF_RANGE)
| |
def test_write_transfer_error(self) -> None:
    """A status chunk from the server is surfaced as a transfer error."""
    transfer_manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    unavailable = transfer_pb2.Chunk(
        transfer_id=21, status=Status.UNAVAILABLE.value
    )
    self._enqueue_server_responses(_Method.WRITE, [[unavailable]])

    with self.assertRaises(pw_transfer.Error) as raised:
        transfer_manager.write(21, b'no write')

    error = raised.exception
    self.assertEqual(error.resource_id, 21)
    self.assertEqual(error.status, Status.UNAVAILABLE)
| |
def test_write_transfer_server_error(self) -> None:
    """Verifies a server error during a write surfaces as INTERNAL.

    A NOT_FOUND server error is enqueued for the Write method; the raised
    pw_transfer.Error is expected to carry INTERNAL for resource 21.
    """
    manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )
    self._enqueue_server_error(_Method.WRITE, Status.NOT_FOUND)

    with self.assertRaises(pw_transfer.Error) as context:
        manager.write(21, b'server error')

    raised = context.exception
    self.assertEqual(raised.resource_id, 21)
    self.assertEqual(raised.status, Status.INTERNAL)
| |
def test_write_transfer_timeout_after_initial_chunk(self) -> None:
    """Verifies a legacy write gives up when the server never responds.

    No server responses are enqueued. With max_retries=2 the client is
    expected to send the START chunk three times in total and then raise
    pw_transfer.Error with DEADLINE_EXCEEDED.
    """
    manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=0.001,
        max_retries=2,
        default_protocol_version=ProtocolVersion.LEGACY,
    )

    with self.assertRaises(pw_transfer.Error) as context:
        manager.write(22, b'no server response!')

    start_chunk = transfer_pb2.Chunk(
        transfer_id=22,
        resource_id=22,
        type=transfer_pb2.Chunk.Type.START,
    )
    # One initial transmission followed by two retries.
    self.assertEqual(self._sent_chunks, [start_chunk] * 3)

    raised = context.exception
    self.assertEqual(raised.resource_id, 22)
    self.assertEqual(raised.status, Status.DEADLINE_EXCEEDED)
| |
def test_write_transfer_timeout_after_intermediate_chunk(self) -> None:
    """Tests write transfers that time out after an intermediate chunk.

    The server sends a single parameters chunk and then goes silent. The
    client is expected to send both data chunks, re-send the final data
    chunk once per retry (max_retries=2), and then raise pw_transfer.Error
    with DEADLINE_EXCEEDED for resource 22.
    """
    manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        max_retries=2,
        default_protocol_version=ProtocolVersion.LEGACY,
    )

    # The only server response: a window for all ten bytes, five at a time.
    self._enqueue_server_responses(
        _Method.WRITE,
        [
            [
                transfer_pb2.Chunk(
                    transfer_id=22, pending_bytes=10, max_chunk_size_bytes=5
                )
            ]
        ],
    )

    with self.assertRaises(pw_transfer.Error) as context:
        manager.write(22, b'0123456789')

    last_data_chunk = transfer_pb2.Chunk(
        transfer_id=22,
        data=b'56789',
        offset=5,
        remaining_bytes=0,
        type=transfer_pb2.Chunk.Type.DATA,
    )

    self.assertEqual(
        self._sent_chunks,
        [
            transfer_pb2.Chunk(
                transfer_id=22,
                resource_id=22,
                type=transfer_pb2.Chunk.Type.START,
            ),
            transfer_pb2.Chunk(
                transfer_id=22,
                data=b'01234',
                type=transfer_pb2.Chunk.Type.DATA,
            ),
            last_data_chunk,  # last chunk
            last_data_chunk,  # retry 1
            last_data_chunk,  # retry 2
        ],
    )

    exception = context.exception
    self.assertEqual(exception.resource_id, 22)
    self.assertEqual(exception.status, Status.DEADLINE_EXCEEDED)
| |
def test_write_zero_pending_bytes_is_internal_error(self) -> None:
    """Verifies a parameters chunk with zero pending bytes fails the write.

    The server responds with pending_bytes=0; the raised pw_transfer.Error
    is expected to carry INTERNAL for resource 23.
    """
    manager = pw_transfer.Manager(
        self._service, default_response_timeout_s=DEFAULT_TIMEOUT_S
    )

    empty_window = transfer_pb2.Chunk(transfer_id=23, pending_bytes=0)
    self._enqueue_server_responses(_Method.WRITE, ((empty_window,),))

    with self.assertRaises(pw_transfer.Error) as context:
        manager.write(23, b'no write')

    raised = context.exception
    self.assertEqual(raised.resource_id, 23)
    self.assertEqual(raised.status, Status.INTERNAL)
| |
def test_v2_read_transfer_basic(self) -> None:
    """Exercises a complete, uneventful protocol version 2 read.

    The server acknowledges the START, delivers the whole payload in one
    data chunk, and acknowledges the client's COMPLETION.
    """
    chunk_type = transfer_pb2.Chunk.Type
    v2 = ProtocolVersion.VERSION_TWO.value

    manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        default_protocol_version=ProtocolVersion.VERSION_TWO,
    )

    start_ack = transfer_pb2.Chunk(
        resource_id=39,
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.START_ACK,
        protocol_version=v2,
    )
    payload = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.DATA,
        offset=0,
        data=b'version two',
        remaining_bytes=0,
    )
    completion_ack = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.COMPLETION_ACK,
    )
    self._enqueue_server_responses(
        _Method.READ,
        ((start_ack,), (payload,), (completion_ack,)),
    )

    data = manager.read(39)

    expected_start = transfer_pb2.Chunk(
        transfer_id=39,
        resource_id=39,
        desired_session_id=_FIRST_SESSION_ID,
        pending_bytes=1024,
        max_chunk_size_bytes=1024,
        window_end_offset=1024,
        type=chunk_type.START,
        protocol_version=v2,
    )
    # pending_bytes should no longer exist as server and client have
    # agreed on v2.
    expected_confirmation = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.START_ACK_CONFIRMATION,
        max_chunk_size_bytes=1024,
        window_end_offset=1024,
        protocol_version=v2,
    )
    expected_completion = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.COMPLETION,
        status=Status.OK.value,
    )
    self.assertEqual(
        self._sent_chunks,
        [expected_start, expected_confirmation, expected_completion],
    )

    self.assertEqual(data, b'version two')
| |
def test_v2_read_transfer_legacy_fallback(self) -> None:
    """Exercises a v2 read against a server that only speaks legacy.

    The client opens with a v2 START chunk, but the server replies with a
    legacy data chunk. The client is expected to finish the transfer with
    a legacy (transfer_id-addressed) COMPLETION chunk.
    """
    chunk_type = transfer_pb2.Chunk.Type

    manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        default_protocol_version=ProtocolVersion.VERSION_TWO,
    )

    # Respond to the START chunk with a legacy data transfer chunk instead
    # of a START_ACK.
    legacy_data = transfer_pb2.Chunk(
        transfer_id=40,
        type=chunk_type.DATA,
        offset=0,
        data=b'sorry, legacy only',
        remaining_bytes=0,
    )
    self._enqueue_server_responses(_Method.READ, ((legacy_data,),))

    data = manager.read(40)

    expected_start = transfer_pb2.Chunk(
        transfer_id=40,
        resource_id=40,
        desired_session_id=_FIRST_SESSION_ID,
        pending_bytes=1024,
        max_chunk_size_bytes=1024,
        window_end_offset=1024,
        type=chunk_type.START,
        protocol_version=ProtocolVersion.VERSION_TWO.value,
    )
    expected_completion = transfer_pb2.Chunk(
        transfer_id=40,
        type=chunk_type.COMPLETION,
        status=Status.OK.value,
    )
    self.assertEqual(
        self._sent_chunks, [expected_start, expected_completion]
    )

    self.assertEqual(data, b'sorry, legacy only')
| |
def test_v2_read_transfer_re_sent_data(self) -> None:
    """Tests a v2 read transfer in which the server re-sends received data.

    The server follows its first data chunk (bytes 0-7) with a chunk that
    repeats bytes 4-7. The client is expected to answer the retransmission
    with a PARAMETERS_CONTINUE chunk at offset 8 and still assemble the
    complete payload from the remaining data.
    """
    manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        default_protocol_version=ProtocolVersion.VERSION_TWO,
    )

    self._enqueue_server_responses(
        _Method.READ,
        (
            (
                transfer_pb2.Chunk(
                    resource_id=39,
                    session_id=_FIRST_SESSION_ID,
                    type=transfer_pb2.Chunk.Type.START_ACK,
                    protocol_version=ProtocolVersion.VERSION_TWO.value,
                ),
            ),
            (
                transfer_pb2.Chunk(
                    session_id=_FIRST_SESSION_ID,
                    type=transfer_pb2.Chunk.Type.DATA,
                    offset=0,
                    data=b'01234567',
                ),
                # Re-send already transmitted data.
                transfer_pb2.Chunk(
                    session_id=_FIRST_SESSION_ID,
                    type=transfer_pb2.Chunk.Type.DATA,
                    offset=4,
                    data=b'4567',
                ),
            ),
            (
                transfer_pb2.Chunk(
                    session_id=_FIRST_SESSION_ID,
                    type=transfer_pb2.Chunk.Type.DATA,
                    offset=8,
                    data=b'89abcdef',
                    remaining_bytes=0,
                ),
            ),
            (
                transfer_pb2.Chunk(
                    session_id=_FIRST_SESSION_ID,
                    type=transfer_pb2.Chunk.Type.COMPLETION_ACK,
                ),
            ),
        ),
    )

    data = manager.read(39)

    self.assertEqual(
        self._sent_chunks,
        [
            transfer_pb2.Chunk(
                transfer_id=39,
                resource_id=39,
                desired_session_id=_FIRST_SESSION_ID,
                pending_bytes=1024,
                max_chunk_size_bytes=1024,
                window_end_offset=1024,
                type=transfer_pb2.Chunk.Type.START,
                protocol_version=ProtocolVersion.VERSION_TWO.value,
            ),
            transfer_pb2.Chunk(
                session_id=_FIRST_SESSION_ID,
                type=transfer_pb2.Chunk.Type.START_ACK_CONFIRMATION,
                max_chunk_size_bytes=1024,
                window_end_offset=1024,
                protocol_version=ProtocolVersion.VERSION_TWO.value,
            ),
            # Should send a continue chunk in response to retransmission.
            transfer_pb2.Chunk(
                session_id=_FIRST_SESSION_ID,
                type=transfer_pb2.Chunk.Type.PARAMETERS_CONTINUE,
                offset=8,
                max_chunk_size_bytes=1024,
                window_end_offset=1024,
            ),
            transfer_pb2.Chunk(
                session_id=_FIRST_SESSION_ID,
                type=transfer_pb2.Chunk.Type.COMPLETION,
                status=Status.OK.value,
            ),
        ],
    )

    # The repeated bytes must not be duplicated in the result.
    self.assertEqual(data, b'0123456789abcdef')
| |
def test_v2_write_transfer_basic(self) -> None:
    """Exercises a complete, uneventful protocol version 2 write.

    After the opening handshake the server grants a 32-byte window with
    8-byte chunks; the client is expected to send the 15-byte payload as
    two data chunks and acknowledge the server's COMPLETION.
    """
    chunk_type = transfer_pb2.Chunk.Type
    v2 = ProtocolVersion.VERSION_TWO.value

    manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        default_protocol_version=ProtocolVersion.VERSION_TWO,
    )

    start_ack = transfer_pb2.Chunk(
        resource_id=72,
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.START_ACK,
        protocol_version=v2,
    )
    parameters = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.PARAMETERS_RETRANSMIT,
        offset=0,
        window_end_offset=32,
        max_chunk_size_bytes=8,
    )
    completion = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.COMPLETION,
        status=Status.OK.value,
    )
    self._enqueue_server_responses(
        _Method.WRITE,
        (
            (start_ack,),
            (parameters,),
            (),  # In response to the first data chunk.
            (completion,),
        ),
    )

    manager.write(72, b'write version 2')

    expected_sent = [
        transfer_pb2.Chunk(
            transfer_id=72,
            resource_id=72,
            desired_session_id=_FIRST_SESSION_ID,
            type=chunk_type.START,
            protocol_version=v2,
        ),
        transfer_pb2.Chunk(
            session_id=_FIRST_SESSION_ID,
            type=chunk_type.START_ACK_CONFIRMATION,
            protocol_version=v2,
        ),
        transfer_pb2.Chunk(
            session_id=_FIRST_SESSION_ID,
            type=chunk_type.DATA,
            offset=0,
            data=b'write ve',
        ),
        transfer_pb2.Chunk(
            session_id=_FIRST_SESSION_ID,
            type=chunk_type.DATA,
            offset=8,
            data=b'rsion 2',
            remaining_bytes=0,
        ),
        transfer_pb2.Chunk(
            session_id=_FIRST_SESSION_ID,
            type=chunk_type.COMPLETION_ACK,
        ),
    ]
    self.assertEqual(self._sent_chunks, expected_sent)

    self.assertEqual(self._received_data(), b'write version 2')
| |
def test_v2_write_transfer_legacy_fallback(self) -> None:
    """Exercises a v2 write against a server that only speaks legacy.

    The client opens with a v2 START chunk, but the server answers with a
    legacy parameters chunk. The client is expected to continue with
    legacy (transfer_id-addressed) data chunks.
    """
    chunk_type = transfer_pb2.Chunk.Type

    manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        default_protocol_version=ProtocolVersion.VERSION_TWO,
    )

    # Send a parameters chunk immediately per the legacy protocol.
    legacy_parameters = transfer_pb2.Chunk(
        transfer_id=76,
        type=chunk_type.PARAMETERS_RETRANSMIT,
        offset=0,
        pending_bytes=32,
        window_end_offset=32,
        max_chunk_size_bytes=8,
    )
    legacy_completion = transfer_pb2.Chunk(
        transfer_id=76,
        type=chunk_type.COMPLETION,
        status=Status.OK.value,
    )
    self._enqueue_server_responses(
        _Method.WRITE,
        (
            (legacy_parameters,),
            (),  # In response to the first data chunk.
            (legacy_completion,),
        ),
    )

    manager.write(76, b'write v... NOPE')

    expected_sent = [
        transfer_pb2.Chunk(
            transfer_id=76,
            resource_id=76,
            desired_session_id=_FIRST_SESSION_ID,
            type=chunk_type.START,
            protocol_version=ProtocolVersion.VERSION_TWO.value,
        ),
        transfer_pb2.Chunk(
            transfer_id=76,
            type=chunk_type.DATA,
            offset=0,
            data=b'write v.',
        ),
        transfer_pb2.Chunk(
            transfer_id=76,
            type=chunk_type.DATA,
            offset=8,
            data=b'.. NOPE',
            remaining_bytes=0,
        ),
    ]
    self.assertEqual(self._sent_chunks, expected_sent)

    self.assertEqual(self._received_data(), b'write v... NOPE')
| |
def test_v2_server_error(self) -> None:
    """Exercises a server-reported failure right after the v2 handshake.

    The server acknowledges the START and then sends a COMPLETION chunk
    with DATA_LOSS. The client is expected to acknowledge it and raise
    pw_transfer.Error with DATA_LOSS for resource 43.
    """
    chunk_type = transfer_pb2.Chunk.Type
    v2 = ProtocolVersion.VERSION_TWO.value

    manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        default_protocol_version=ProtocolVersion.VERSION_TWO,
    )

    start_ack = transfer_pb2.Chunk(
        resource_id=43,
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.START_ACK,
        protocol_version=v2,
    )
    failed_completion = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.COMPLETION,
        status=Status.DATA_LOSS.value,
    )
    self._enqueue_server_responses(
        _Method.READ, ((start_ack,), (failed_completion,))
    )

    with self.assertRaises(pw_transfer.Error) as context:
        manager.read(43)

    expected_sent = [
        transfer_pb2.Chunk(
            transfer_id=43,
            desired_session_id=_FIRST_SESSION_ID,
            resource_id=43,
            pending_bytes=1024,
            max_chunk_size_bytes=1024,
            window_end_offset=1024,
            type=chunk_type.START,
            protocol_version=v2,
        ),
        transfer_pb2.Chunk(
            session_id=_FIRST_SESSION_ID,
            type=chunk_type.START_ACK_CONFIRMATION,
            max_chunk_size_bytes=1024,
            window_end_offset=1024,
            protocol_version=v2,
        ),
        # Client sends a COMPLETION_ACK in response to the server.
        transfer_pb2.Chunk(
            session_id=_FIRST_SESSION_ID,
            type=chunk_type.COMPLETION_ACK,
        ),
    ]
    self.assertEqual(self._sent_chunks, expected_sent)

    raised = context.exception
    self.assertEqual(raised.resource_id, 43)
    self.assertEqual(raised.status, Status.DATA_LOSS)
| |
def test_v2_timeout_during_opening_handshake(self) -> None:
    """Exercises a v2 read whose START chunk is never answered.

    No server responses are enqueued, so the client is expected to send
    the START chunk four times in total and then raise pw_transfer.Error
    with DEADLINE_EXCEEDED for resource 41.
    """
    manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        default_protocol_version=ProtocolVersion.VERSION_TWO,
    )

    # Deliberately no enqueued server responses.
    with self.assertRaises(pw_transfer.Error) as context:
        manager.read(41)

    expected_start = transfer_pb2.Chunk(
        transfer_id=41,
        resource_id=41,
        desired_session_id=_FIRST_SESSION_ID,
        pending_bytes=1024,
        max_chunk_size_bytes=1024,
        window_end_offset=1024,
        type=transfer_pb2.Chunk.Type.START,
        protocol_version=ProtocolVersion.VERSION_TWO.value,
    )
    # The opening chunk should be sent initially, then retried three times.
    expected_sent = [expected_start] * 4
    self.assertEqual(self._sent_chunks, expected_sent)

    raised = context.exception
    self.assertEqual(raised.resource_id, 41)
    self.assertEqual(raised.status, Status.DEADLINE_EXCEEDED)
| |
def test_v2_timeout_recovery_during_opening_handshake(self) -> None:
    """Exercises a v2 write that recovers from handshake timeouts.

    The server ignores the START_ACK_CONFIRMATION and its first retry,
    then answers the second retry with transfer parameters. The write is
    expected to complete normally from there.
    """
    chunk_type = transfer_pb2.Chunk.Type
    v2 = ProtocolVersion.VERSION_TWO.value

    manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        default_protocol_version=ProtocolVersion.VERSION_TWO,
    )

    start_ack = transfer_pb2.Chunk(
        resource_id=73,
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.START_ACK,
        protocol_version=v2,
    )
    parameters = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.PARAMETERS_RETRANSMIT,
        offset=0,
        window_end_offset=32,
        max_chunk_size_bytes=8,
    )
    completion = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.COMPLETION,
        status=Status.OK.value,
    )
    self._enqueue_server_responses(
        _Method.WRITE,
        (
            (start_ack,),
            (),  # Don't respond to the START_ACK_CONFIRMATION.
            (),  # Don't respond to the first START_ACK_CONFIRMATION retry.
            (parameters,),
            (),  # In response to the first data chunk.
            (completion,),
        ),
    )

    manager.write(73, b'write timeout 2')

    expected_start = transfer_pb2.Chunk(
        transfer_id=73,
        resource_id=73,
        desired_session_id=_FIRST_SESSION_ID,
        type=chunk_type.START,
        protocol_version=v2,
    )
    start_ack_confirmation = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.START_ACK_CONFIRMATION,
        protocol_version=v2,
    )
    first_data = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.DATA,
        offset=0,
        data=b'write ti',
    )
    final_data = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.DATA,
        offset=8,
        data=b'meout 2',
        remaining_bytes=0,
    )
    completion_ack = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.COMPLETION_ACK,
    )
    self.assertEqual(
        self._sent_chunks,
        [
            expected_start,
            start_ack_confirmation,  # Initial transmission
            start_ack_confirmation,  # Retry 1
            start_ack_confirmation,  # Retry 2
            first_data,
            final_data,
            completion_ack,
        ],
    )

    self.assertEqual(self._received_data(), b'write timeout 2')
| |
def test_v2_closing_handshake_bad_chunk(self) -> None:
    """Exercises an unexpected server chunk during the v2 closing handshake.

    Instead of acknowledging the client's COMPLETION, the server first
    repeats its final data chunk. The client is expected to re-send the
    COMPLETION and still return the payload.
    """
    chunk_type = transfer_pb2.Chunk.Type
    v2 = ProtocolVersion.VERSION_TWO.value

    def final_data_chunk() -> transfer_pb2.Chunk:
        # Build a fresh message each time, as the server sends it twice.
        return transfer_pb2.Chunk(
            session_id=_FIRST_SESSION_ID,
            type=chunk_type.DATA,
            offset=0,
            data=b'version two',
            remaining_bytes=0,
        )

    manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        default_protocol_version=ProtocolVersion.VERSION_TWO,
    )

    start_ack = transfer_pb2.Chunk(
        resource_id=47,
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.START_ACK,
        protocol_version=v2,
    )
    completion_ack = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.COMPLETION_ACK,
    )
    self._enqueue_server_responses(
        _Method.READ,
        (
            (start_ack,),
            (final_data_chunk(),),
            # In response to the COMPLETION, re-send the last chunk instead
            # of a COMPLETION_ACK.
            (final_data_chunk(),),
            (completion_ack,),
        ),
    )

    data = manager.read(47)

    expected_start = transfer_pb2.Chunk(
        transfer_id=47,
        resource_id=47,
        desired_session_id=_FIRST_SESSION_ID,
        pending_bytes=1024,
        max_chunk_size_bytes=1024,
        window_end_offset=1024,
        type=chunk_type.START,
        protocol_version=v2,
    )
    expected_confirmation = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.START_ACK_CONFIRMATION,
        max_chunk_size_bytes=1024,
        window_end_offset=1024,
        protocol_version=v2,
    )
    expected_completion = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.COMPLETION,
        status=Status.OK.value,
    )
    self.assertEqual(
        self._sent_chunks,
        [
            expected_start,
            expected_confirmation,
            expected_completion,
            # Completion should be re-sent following the repeated chunk.
            expected_completion,
        ],
    )

    self.assertEqual(data, b'version two')
| |
def test_v2_timeout_during_closing_handshake(self) -> None:
    """Exercises a v2 read whose COMPLETION is never acknowledged.

    The server delivers the payload but never sends COMPLETION_ACK. The
    client is expected to send COMPLETION four times in total and still
    return the data successfully.
    """
    chunk_type = transfer_pb2.Chunk.Type
    v2 = ProtocolVersion.VERSION_TWO.value

    manager = pw_transfer.Manager(
        self._service,
        default_response_timeout_s=DEFAULT_TIMEOUT_S,
        default_protocol_version=ProtocolVersion.VERSION_TWO,
    )

    start_ack = transfer_pb2.Chunk(
        resource_id=47,
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.START_ACK,
        protocol_version=v2,
    )
    payload = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.DATA,
        offset=0,
        data=b'dropped completion',
        remaining_bytes=0,
    )
    # Never send the expected COMPLETION_ACK chunk.
    self._enqueue_server_responses(
        _Method.READ, ((start_ack,), (payload,))
    )

    data = manager.read(47)

    expected_start = transfer_pb2.Chunk(
        transfer_id=47,
        resource_id=47,
        desired_session_id=_FIRST_SESSION_ID,
        pending_bytes=1024,
        max_chunk_size_bytes=1024,
        window_end_offset=1024,
        type=chunk_type.START,
        protocol_version=v2,
    )
    expected_confirmation = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.START_ACK_CONFIRMATION,
        max_chunk_size_bytes=1024,
        window_end_offset=1024,
        protocol_version=v2,
    )
    expected_completion = transfer_pb2.Chunk(
        session_id=_FIRST_SESSION_ID,
        type=chunk_type.COMPLETION,
        status=Status.OK.value,
    )
    self.assertEqual(
        self._sent_chunks,
        [
            expected_start,
            expected_confirmation,
            expected_completion,
            # The completion should be retried per the usual retry flow.
            expected_completion,
            expected_completion,
            expected_completion,
        ],
    )

    # Despite timing out following several retries, the transfer should
    # still conclude successfully, as failing to receive a COMPLETION_ACK
    # is not fatal.
    self.assertEqual(data, b'dropped completion')
| |
| |
class ProgressStatsTest(unittest.TestCase):
    """Tests for pw_transfer.ProgressStats percentages and string form."""

    def test_received_percent_known_total(self) -> None:
        """percent_received() yields the expected value for a known total."""
        cases = (
            ((75, 0, 100), 0.0),
            ((75, 50, 100), 50.0),
            ((100, 100, 100), 100.0),
        )
        for stats_args, expected_percent in cases:
            self.assertEqual(
                pw_transfer.ProgressStats(*stats_args).percent_received(),
                expected_percent,
            )

    def test_received_percent_unknown_total(self) -> None:
        """percent_received() is NaN when the total is None."""
        for stats_args in ((75, 50, None), (100, 100, None)):
            percent = pw_transfer.ProgressStats(
                *stats_args
            ).percent_received()
            self.assertTrue(math.isnan(percent))

    def test_str_known_total(self) -> None:
        """str() mentions all three numbers when the total is known."""
        stats = str(pw_transfer.ProgressStats(75, 50, 100))
        for expected_text in ('75', '50', '100'):
            self.assertIn(expected_text, stats)

    def test_str_unknown_total(self) -> None:
        """str() mentions 'unknown' when the total is None."""
        stats = str(pw_transfer.ProgressStats(75, 50, None))
        for expected_text in ('75', '50', 'unknown'):
            self.assertIn(expected_text, stats)
| |
| |
if __name__ == '__main__':
    # TODO: b/265975025 - Only run this test in upstream Pigweed until the
    # occasional hangs are fixed.
    # The tests run only when PW_ROOT is set, non-empty, and equal to
    # PW_PROJECT_ROOT.
    pw_root = os.environ.get('PW_ROOT')
    if not pw_root or pw_root != os.environ.get('PW_PROJECT_ROOT'):
        print('Skipping transfer_test.py due to possible hangs (b/265975025).')
    else:
        unittest.main()