| // Copyright 2024 The Pigweed Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| // use this file except in compliance with the License. You may obtain a copy of |
| // the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| // License for the specific language governing permissions and limitations under |
| // the License. |
| |
| // Client binary for the cross-language integration test. |
| // |
| // Usage: |
| // bazel-bin/pw_transfer/integration_test_client 3300 <<< "resource_id: 12 |
| // file: '/tmp/myfile.txt'" |
| // |
| // WORK IN PROGRESS, SEE b/228516801 |
| #include "pw_transfer/client.h" |
| |
| #include <sys/socket.h> |
| |
| #include <cstddef> |
| #include <cstdio> |
| |
| #include "google/protobuf/text_format.h" |
| #include "pw_assert/check.h" |
| #include "pw_log/log.h" |
| #include "pw_rpc/channel.h" |
| #include "pw_rpc/integration_testing.h" |
| #include "pw_status/status.h" |
| #include "pw_status/try.h" |
| #include "pw_stream/std_file_stream.h" |
| #include "pw_sync/binary_semaphore.h" |
| #include "pw_thread/thread.h" |
| #include "pw_thread_stl/options.h" |
| #include "pw_transfer/integration_test/config.pb.h" |
| #include "pw_transfer/transfer_thread.h" |
| |
| namespace pw::transfer::integration_test { |
| namespace { |
| |
| // This is the maximum size of the socket send buffers. Ideally, this is set |
| // to the lowest allowed value to minimize buffering between the proxy and |
| // clients so rate limiting causes the client to block and wait for the |
| // integration test proxy to drain rather than allowing OS buffers to backlog |
| // large quantities of data. |
| // |
| // Note that the OS may chose to not strictly follow this requested buffer size. |
| // Still, setting this value to be as small as possible does reduce bufer sizes |
| // significantly enough to better reflect typical inter-device communication. |
| // |
| // For this to be effective, servers should also configure their sockets to a |
| // smaller receive buffer size. |
| constexpr int kMaxSocketSendBufferSize = 1; |
| |
| constexpr size_t kDefaultMaxWindowSizeBytes = 16384; |
| |
| thread::Options& TransferThreadOptions() { |
| static thread::stl::Options options; |
| return options; |
| } |
| |
| // Transfer status, valid only after semaphore is acquired. |
| // |
| // We need to bundle the status and semaphore together because a pw_function |
| // callback can at most capture the reference to one variable (and we need to |
| // both set the status and release the semaphore). |
| struct TransferResult { |
| Status status = Status::Unknown(); |
| sync::BinarySemaphore completed; |
| }; |
| |
| // Create a pw_transfer client and perform the transfer actions. |
| pw::Status PerformTransferActions(const pw::transfer::ClientConfig& config) { |
| constexpr size_t kMaxPayloadSize = rpc::MaxSafePayloadSize(); |
| std::byte chunk_buffer[kMaxPayloadSize]; |
| std::byte encode_buffer[kMaxPayloadSize]; |
| transfer::Thread<2, 2> transfer_thread(chunk_buffer, encode_buffer); |
| thread::Thread system_thread(TransferThreadOptions(), transfer_thread); |
| |
| // As much as we don't want to dynamically allocate an array, |
| // variable length arrays (VLA) are nonstandard, and a std::vector could cause |
| // references to go stale if the vector's underlying buffer is resized. This |
| // array of TransferResults needs to outlive the loop that performs the |
| // actual transfer actions due to how some references to TransferResult |
| // may persist beyond the lifetime of a transfer. |
| const int num_actions = config.transfer_actions().size(); |
| auto transfer_results = std::make_unique<TransferResult[]>(num_actions); |
| |
| pw::transfer::Client client(rpc::integration_test::client(), |
| rpc::integration_test::kChannelId, |
| transfer_thread, |
| kDefaultMaxWindowSizeBytes); |
| |
| client.set_max_retries(config.max_retries()); |
| client.set_max_lifetime_retries(config.max_lifetime_retries()); |
| |
| Status status = pw::OkStatus(); |
| for (int i = 0; i < num_actions; i++) { |
| const pw::transfer::TransferAction& action = config.transfer_actions()[i]; |
| TransferResult& result = transfer_results[i]; |
| // If no protocol version is specified, default to the latest version. |
| pw::transfer::ProtocolVersion protocol_version = |
| action.protocol_version() == |
| pw::transfer::TransferAction::ProtocolVersion:: |
| TransferAction_ProtocolVersion_UNKNOWN_VERSION |
| ? pw::transfer::ProtocolVersion::kLatest |
| : static_cast<pw::transfer::ProtocolVersion>( |
| action.protocol_version()); |
| if (action.transfer_type() == |
| pw::transfer::TransferAction::TransferType:: |
| TransferAction_TransferType_WRITE_TO_SERVER) { |
| pw::stream::StdFileReader input(action.file_path().c_str()); |
| pw::Result<pw::transfer::Client::Handle> handle = client.Write( |
| action.resource_id(), |
| input, |
| [&result](Status status) { |
| result.status = status; |
| result.completed.release(); |
| }, |
| protocol_version, |
| pw::transfer::cfg::kDefaultClientTimeout, |
| pw::transfer::cfg::kDefaultInitialChunkTimeout, |
| action.initial_offset()); |
| if (handle.ok()) { |
| // Wait for the transfer to complete. We need to do this here so that |
| // the StdFileReader doesn't go out of scope. |
| result.completed.acquire(); |
| } else { |
| result.status = handle.status(); |
| } |
| |
| input.Close(); |
| |
| } else if (action.transfer_type() == |
| pw::transfer::TransferAction::TransferType:: |
| TransferAction_TransferType_READ_FROM_SERVER) { |
| pw::stream::StdFileWriter output(action.file_path().c_str()); |
| pw::Result<pw::transfer::Client::Handle> handle = client.Read( |
| action.resource_id(), |
| output, |
| [&result](Status status) { |
| result.status = status; |
| result.completed.release(); |
| }, |
| protocol_version, |
| pw::transfer::cfg::kDefaultClientTimeout, |
| pw::transfer::cfg::kDefaultInitialChunkTimeout, |
| action.initial_offset()); |
| if (handle.ok()) { |
| // Wait for the transfer to complete. |
| result.completed.acquire(); |
| } else { |
| result.status = handle.status(); |
| } |
| |
| output.Close(); |
| } else { |
| PW_LOG_ERROR("Unrecognized transfer action type %d", |
| action.transfer_type()); |
| status = pw::Status::InvalidArgument(); |
| break; |
| } |
| |
| if (int(result.status.code()) != int(action.expected_status())) { |
| PW_LOG_ERROR("Failed to perform action:\n%s", |
| action.DebugString().c_str()); |
| status = result.status.ok() ? Status::Unknown() : result.status; |
| break; |
| } |
| } |
| |
| transfer_thread.Terminate(); |
| |
| system_thread.join(); |
| |
| // The RPC thread must join before destroying transfer objects as the transfer |
| // service may still reference the transfer thread or transfer client objects. |
| pw::rpc::integration_test::TerminateClient(); |
| return status; |
| } |
| |
| } // namespace |
| } // namespace pw::transfer::integration_test |
| |
| int main(int argc, char* argv[]) { |
| if (argc < 2) { |
| PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]); |
| return 1; |
| } |
| |
| const int port = std::atoi(argv[1]); |
| |
| std::string config_string; |
| std::string line; |
| while (std::getline(std::cin, line)) { |
| config_string = config_string + line + '\n'; |
| } |
| pw::transfer::ClientConfig config; |
| |
| bool ok = |
| google::protobuf::TextFormat::ParseFromString(config_string, &config); |
| if (!ok) { |
| PW_LOG_INFO("Failed to parse config: %s", config_string.c_str()); |
| PW_LOG_INFO("Usage: %s PORT <<< config textproto", argv[0]); |
| return 1; |
| } else { |
| PW_LOG_INFO("Client loaded config:\n%s", config.DebugString().c_str()); |
| } |
| |
| if (!pw::rpc::integration_test::InitializeClient(port).ok()) { |
| return 1; |
| } |
| |
| int retval = pw::rpc::integration_test::SetClientSockOpt( |
| SOL_SOCKET, |
| SO_SNDBUF, |
| &pw::transfer::integration_test::kMaxSocketSendBufferSize, |
| sizeof(pw::transfer::integration_test::kMaxSocketSendBufferSize)); |
| PW_CHECK_INT_EQ(retval, |
| 0, |
| "Failed to configure socket send buffer size with errno=%d", |
| errno); |
| |
| if (!pw::transfer::integration_test::PerformTransferActions(config).ok()) { |
| PW_LOG_INFO("Failed to transfer!"); |
| return 1; |
| } |
| return 0; |
| } |