| // Copyright 2016 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "mojo/core/channel.h" |
| |
| #include <errno.h> |
| #include <sys/socket.h> |
| |
| #include <algorithm> |
| #include <limits> |
| #include <memory> |
| |
| #include "base/bind.h" |
| #include "base/containers/queue.h" |
| #include "base/location.h" |
| #include "base/macros.h" |
| #include "base/memory/ref_counted.h" |
| #include "base/message_loop/message_loop_current.h" |
| #include "base/message_loop/message_pump_for_io.h" |
| #include "base/synchronization/lock.h" |
| #include "base/task_runner.h" |
| #include "build/build_config.h" |
| #include "mojo/core/core.h" |
| #include "mojo/public/cpp/platform/socket_utils_posix.h" |
| |
| #if !defined(OS_NACL) |
| #include <sys/uio.h> |
| #endif |
| |
| #if defined(OS_MACOSX) && !defined(OS_IOS) |
| #include "mojo/core/mach_port_relay.h" |
| #endif |
| |
| namespace mojo { |
| namespace core { |
| |
| namespace { |
| |
| const size_t kMaxBatchReadCapacity = 256 * 1024; |
| |
| // A view over a Channel::Message object. The write queue uses these since |
| // large messages may need to be sent in chunks. |
| class MessageView { |
| public: |
| // Owns |message|. |offset| indexes the first unsent byte in the message. |
| MessageView(Channel::MessagePtr message, size_t offset) |
| : message_(std::move(message)), |
| offset_(offset), |
| handles_(message_->TakeHandlesForTransport()) { |
| DCHECK_GT(message_->data_num_bytes(), offset_); |
| } |
| |
| MessageView(MessageView&& other) { *this = std::move(other); } |
| |
| MessageView& operator=(MessageView&& other) { |
| message_ = std::move(other.message_); |
| offset_ = other.offset_; |
| handles_ = std::move(other.handles_); |
| return *this; |
| } |
| |
| ~MessageView() {} |
| |
| const void* data() const { |
| return static_cast<const char*>(message_->data()) + offset_; |
| } |
| |
| size_t data_num_bytes() const { return message_->data_num_bytes() - offset_; } |
| |
| size_t data_offset() const { return offset_; } |
| void advance_data_offset(size_t num_bytes) { |
| DCHECK_GT(message_->data_num_bytes(), offset_ + num_bytes); |
| offset_ += num_bytes; |
| } |
| |
| std::vector<PlatformHandleInTransit> TakeHandles() { |
| return std::move(handles_); |
| } |
| Channel::MessagePtr TakeMessage() { return std::move(message_); } |
| |
| void SetHandles(std::vector<PlatformHandleInTransit> handles) { |
| handles_ = std::move(handles); |
| } |
| |
| private: |
| Channel::MessagePtr message_; |
| size_t offset_; |
| std::vector<PlatformHandleInTransit> handles_; |
| |
| DISALLOW_COPY_AND_ASSIGN(MessageView); |
| }; |
| |
| class ChannelPosix : public Channel, |
| #if defined(OS_MACOSX) && !defined(OS_IOS) |
| public MachPortRelay::Observer, |
| #endif |
| public base::MessageLoopCurrent::DestructionObserver, |
| public base::MessagePumpForIO::FdWatcher { |
| public: |
| ChannelPosix(Delegate* delegate, |
| ConnectionParams connection_params, |
| scoped_refptr<base::TaskRunner> io_task_runner) |
| : Channel(delegate), self_(this), io_task_runner_(io_task_runner) { |
| if (connection_params.server_endpoint().is_valid()) |
| server_ = connection_params.TakeServerEndpoint(); |
| else |
| socket_ = connection_params.TakeEndpoint().TakePlatformHandle().TakeFD(); |
| |
| CHECK(server_.is_valid() || socket_.is_valid()); |
| } |
| |
| void Start() override { |
| #if defined(OS_MACOSX) && !defined(OS_IOS) |
| auto* relay = Core::Get()->GetMachPortRelay(); |
| if (relay) { |
| // We should only have a relay if we know the remote process handle, |
| // because that means we're in the broker process. |
| relay->AddObserver(this); |
| } |
| #endif |
| |
| if (io_task_runner_->RunsTasksInCurrentSequence()) { |
| StartOnIOThread(); |
| } else { |
| io_task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(&ChannelPosix::StartOnIOThread, this)); |
| } |
| } |
| |
| void ShutDownImpl() override { |
| // Always shut down asynchronously when called through the public interface. |
| io_task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(&ChannelPosix::ShutDownOnIOThread, this)); |
| } |
| |
| void Write(MessagePtr message) override { |
| #if defined(OS_MACOSX) && !defined(OS_IOS) |
| // If this message has Mach ports and we have a MachPortRelay, use the relay |
| // to rewrite the ports as receive rights from which the send right can be |
| // read. See |MachPortRelay::SendPortsToProcess()|. |
| // |
| // Note that if we don't have a relay, the receiving process must, and they |
| // must also have the ability to extract a send right from the ports that |
| // are already attached. |
| MachPortRelay* relay = Core::Get()->GetMachPortRelay(); |
| if (relay && remote_process().is_valid() && message->has_mach_ports()) { |
| if (relay->port_provider()->TaskForPid(remote_process().get()) == |
| MACH_PORT_NULL) { |
| // We also need to have a task port for the remote process before we can |
| // send it any other ports. If we don't have one yet, queue the message |
| // until OnProcessReady() is invoked. |
| base::AutoLock lock(task_port_wait_lock_); |
| pending_outgoing_with_mach_ports_.emplace_back(std::move(message)); |
| return; |
| } |
| |
| relay->SendPortsToProcess(message.get(), remote_process().get()); |
| } |
| #endif |
| |
| bool write_error = false; |
| { |
| base::AutoLock lock(write_lock_); |
| if (reject_writes_) |
| return; |
| if (outgoing_messages_.empty()) { |
| if (!WriteNoLock(MessageView(std::move(message), 0))) |
| reject_writes_ = write_error = true; |
| } else { |
| outgoing_messages_.emplace_back(std::move(message), 0); |
| } |
| } |
| if (write_error) { |
| // Invoke OnWriteError() asynchronously on the IO thread, in case Write() |
| // was called by the delegate, in which case we should not re-enter it. |
| io_task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(&ChannelPosix::OnWriteError, this, |
| Error::kDisconnected)); |
| } |
| } |
| |
| void LeakHandle() override { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| leak_handle_ = true; |
| } |
| |
| bool GetReadPlatformHandles(const void* payload, |
| size_t payload_size, |
| size_t num_handles, |
| const void* extra_header, |
| size_t extra_header_size, |
| std::vector<PlatformHandle>* handles, |
| bool* deferred) override { |
| if (num_handles > std::numeric_limits<uint16_t>::max()) |
| return false; |
| #if defined(OS_MACOSX) && !defined(OS_IOS) |
| // On OSX, we can have mach ports which are located in the extra header |
| // section. |
| using MachPortsEntry = Channel::Message::MachPortsEntry; |
| using MachPortsExtraHeader = Channel::Message::MachPortsExtraHeader; |
| if (extra_header_size < |
| sizeof(MachPortsExtraHeader) + num_handles * sizeof(MachPortsEntry)) { |
| return false; |
| } |
| const MachPortsExtraHeader* mach_ports_header = |
| reinterpret_cast<const MachPortsExtraHeader*>(extra_header); |
| size_t num_mach_ports = mach_ports_header->num_ports; |
| if (num_mach_ports > num_handles) |
| return false; |
| if (incoming_fds_.size() + num_mach_ports < num_handles) |
| return true; |
| |
| std::vector<PlatformHandleInTransit> handles_in_transit(num_handles); |
| const MachPortsEntry* mach_ports = mach_ports_header->entries; |
| |
| // If we know the remote process handle, we assume all incoming Mach ports |
| // are send right references owned by the remote process. Otherwise they're |
| // receive ports we can use to read a send right. |
| const bool extract_send_rights = remote_process().is_valid(); |
| for (size_t i = 0, mach_port_index = 0; i < num_handles; ++i) { |
| if (mach_port_index < num_mach_ports && |
| mach_ports[mach_port_index].index == i) { |
| mach_port_t port_name = |
| static_cast<mach_port_t>(mach_ports[mach_port_index].mach_port); |
| if (extract_send_rights) { |
| handles_in_transit[i] = |
| PlatformHandleInTransit::CreateForMachPortName(port_name); |
| } else { |
| handles_in_transit[i] = PlatformHandleInTransit( |
| PlatformHandle(MachPortRelay::ReceiveSendRight( |
| base::mac::ScopedMachReceiveRight(port_name)))); |
| } |
| mach_port_index++; |
| } else { |
| if (incoming_fds_.empty()) |
| return false; |
| handles_in_transit[i] = PlatformHandleInTransit( |
| PlatformHandle(std::move(incoming_fds_.front()))); |
| incoming_fds_.pop_front(); |
| } |
| } |
| if (extract_send_rights && num_mach_ports) { |
| MachPortRelay* relay = Core::Get()->GetMachPortRelay(); |
| DCHECK(relay); |
| // Extracting send rights requires that we have a task port for the |
| // remote process, which we may not yet have. |
| if (relay->port_provider()->TaskForPid(remote_process().get()) != |
| MACH_PORT_NULL) { |
| // We do have a task port, so extract the send rights immediately. |
| for (auto& handle : handles_in_transit) { |
| if (handle.is_mach_port_name()) { |
| handle = PlatformHandleInTransit(PlatformHandle(relay->ExtractPort( |
| handle.mach_port_name(), remote_process().get()))); |
| } |
| } |
| } else { |
| // No task port, we have to defer this message. |
| *deferred = true; |
| base::AutoLock lock(task_port_wait_lock_); |
| std::vector<uint8_t> data(payload_size); |
| memcpy(data.data(), payload, payload_size); |
| pending_incoming_with_mach_ports_.emplace_back( |
| std::move(data), std::move(handles_in_transit)); |
| return true; |
| } |
| } |
| |
| handles->resize(handles_in_transit.size()); |
| for (size_t i = 0; i < handles->size(); ++i) |
| handles->at(i) = handles_in_transit[i].TakeHandle(); |
| #else |
| if (incoming_fds_.size() < num_handles) |
| return true; |
| |
| handles->resize(num_handles); |
| for (size_t i = 0; i < num_handles; ++i) { |
| handles->at(i) = PlatformHandle(std::move(incoming_fds_.front())); |
| incoming_fds_.pop_front(); |
| } |
| #endif |
| |
| return true; |
| } |
| |
| private: |
| ~ChannelPosix() override { |
| DCHECK(!read_watcher_); |
| DCHECK(!write_watcher_); |
| } |
| |
| void StartOnIOThread() { |
| DCHECK(!read_watcher_); |
| DCHECK(!write_watcher_); |
| read_watcher_.reset( |
| new base::MessagePumpForIO::FdWatchController(FROM_HERE)); |
| base::MessageLoopCurrent::Get()->AddDestructionObserver(this); |
| if (server_.is_valid()) { |
| base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor( |
| server_.platform_handle().GetFD().get(), false /* persistent */, |
| base::MessagePumpForIO::WATCH_READ, read_watcher_.get(), this); |
| } else { |
| write_watcher_.reset( |
| new base::MessagePumpForIO::FdWatchController(FROM_HERE)); |
| base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor( |
| socket_.get(), true /* persistent */, |
| base::MessagePumpForIO::WATCH_READ, read_watcher_.get(), this); |
| base::AutoLock lock(write_lock_); |
| FlushOutgoingMessagesNoLock(); |
| } |
| } |
| |
| void WaitForWriteOnIOThread() { |
| base::AutoLock lock(write_lock_); |
| WaitForWriteOnIOThreadNoLock(); |
| } |
| |
| void WaitForWriteOnIOThreadNoLock() { |
| if (pending_write_) |
| return; |
| if (!write_watcher_) |
| return; |
| if (io_task_runner_->RunsTasksInCurrentSequence()) { |
| pending_write_ = true; |
| base::MessageLoopCurrentForIO::Get()->WatchFileDescriptor( |
| socket_.get(), false /* persistent */, |
| base::MessagePumpForIO::WATCH_WRITE, write_watcher_.get(), this); |
| } else { |
| io_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce(&ChannelPosix::WaitForWriteOnIOThread, this)); |
| } |
| } |
| |
| void ShutDownOnIOThread() { |
| base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this); |
| |
| read_watcher_.reset(); |
| write_watcher_.reset(); |
| if (leak_handle_) { |
| ignore_result(socket_.release()); |
| server_.TakePlatformHandle().release(); |
| } else { |
| socket_.reset(); |
| ignore_result(server_.TakePlatformHandle()); |
| } |
| #if defined(OS_MACOSX) |
| fds_to_close_.clear(); |
| #endif |
| |
| #if defined(OS_MACOSX) && !defined(OS_IOS) |
| auto* relay = Core::Get()->GetMachPortRelay(); |
| if (relay) |
| relay->RemoveObserver(this); |
| #endif |
| |
| // May destroy the |this| if it was the last reference. |
| self_ = nullptr; |
| } |
| |
| #if defined(OS_MACOSX) && !defined(OS_IOS) |
| // MachPortRelay::Observer: |
| void OnProcessReady(base::ProcessHandle process) override { |
| if (process != remote_process().get()) |
| return; |
| |
| io_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce( |
| &ChannelPosix::FlushPendingMessagesWithMachPortsOnIOThread, this)); |
| } |
| |
| void FlushPendingMessagesWithMachPortsOnIOThread() { |
| // We have a task port for the remote process. Now we can send or accept |
| // any pending messages with Mach ports. |
| std::vector<RawIncomingMessage> incoming; |
| std::vector<MessagePtr> outgoing; |
| { |
| base::AutoLock lock(task_port_wait_lock_); |
| if (reject_incoming_messages_with_mach_ports_) |
| return; |
| std::swap(pending_incoming_with_mach_ports_, incoming); |
| std::swap(pending_outgoing_with_mach_ports_, outgoing); |
| } |
| |
| DCHECK(remote_process().is_valid()); |
| base::ProcessHandle process = remote_process().get(); |
| MachPortRelay* relay = Core::Get()->GetMachPortRelay(); |
| DCHECK(relay); |
| for (auto& message : incoming) { |
| Channel::Delegate* d = delegate(); |
| if (!d) |
| break; |
| std::vector<PlatformHandle> handles(message.handles.size()); |
| for (size_t i = 0; i < message.handles.size(); ++i) { |
| if (message.handles[i].is_mach_port_name()) { |
| handles[i] = PlatformHandle( |
| relay->ExtractPort(message.handles[i].mach_port_name(), process)); |
| } else { |
| DCHECK(!message.handles[i].owning_process().is_valid()); |
| handles[i] = message.handles[i].TakeHandle(); |
| } |
| } |
| d->OnChannelMessage(message.data.data(), message.data.size(), |
| std::move(handles)); |
| } |
| |
| for (auto& message : outgoing) { |
| relay->SendPortsToProcess(message.get(), process); |
| Write(std::move(message)); |
| } |
| } |
| #endif |
| |
| // base::MessageLoopCurrent::DestructionObserver: |
| void WillDestroyCurrentMessageLoop() override { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| if (self_) |
| ShutDownOnIOThread(); |
| } |
| |
| // base::MessagePumpForIO::FdWatcher: |
| void OnFileCanReadWithoutBlocking(int fd) override { |
| if (server_.is_valid()) { |
| CHECK_EQ(fd, server_.platform_handle().GetFD().get()); |
| #if !defined(OS_NACL) |
| read_watcher_.reset(); |
| base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this); |
| |
| AcceptSocketConnection(server_.platform_handle().GetFD().get(), &socket_); |
| ignore_result(server_.TakePlatformHandle()); |
| if (!socket_.is_valid()) { |
| OnError(Error::kConnectionFailed); |
| return; |
| } |
| StartOnIOThread(); |
| #else |
| NOTREACHED(); |
| #endif |
| return; |
| } |
| CHECK_EQ(fd, socket_.get()); |
| |
| bool validation_error = false; |
| bool read_error = false; |
| size_t next_read_size = 0; |
| size_t buffer_capacity = 0; |
| size_t total_bytes_read = 0; |
| size_t bytes_read = 0; |
| do { |
| buffer_capacity = next_read_size; |
| char* buffer = GetReadBuffer(&buffer_capacity); |
| DCHECK_GT(buffer_capacity, 0u); |
| |
| std::vector<base::ScopedFD> incoming_fds; |
| ssize_t read_result = |
| SocketRecvmsg(socket_.get(), buffer, buffer_capacity, &incoming_fds); |
| for (auto& fd : incoming_fds) |
| incoming_fds_.emplace_back(std::move(fd)); |
| |
| if (read_result > 0) { |
| bytes_read = static_cast<size_t>(read_result); |
| total_bytes_read += bytes_read; |
| if (!OnReadComplete(bytes_read, &next_read_size)) { |
| read_error = true; |
| validation_error = true; |
| break; |
| } |
| } else if (read_result == 0 || |
| (errno != EAGAIN && errno != EWOULDBLOCK)) { |
| read_error = true; |
| break; |
| } |
| } while (bytes_read == buffer_capacity && |
| total_bytes_read < kMaxBatchReadCapacity && next_read_size > 0); |
| if (read_error) { |
| // Stop receiving read notifications. |
| read_watcher_.reset(); |
| if (validation_error) |
| OnError(Error::kReceivedMalformedData); |
| else |
| OnError(Error::kDisconnected); |
| } |
| } |
| |
| void OnFileCanWriteWithoutBlocking(int fd) override { |
| bool write_error = false; |
| { |
| base::AutoLock lock(write_lock_); |
| pending_write_ = false; |
| if (!FlushOutgoingMessagesNoLock()) |
| reject_writes_ = write_error = true; |
| } |
| if (write_error) |
| OnWriteError(Error::kDisconnected); |
| } |
| |
| // Attempts to write a message directly to the channel. If the full message |
| // cannot be written, it's queued and a wait is initiated to write the message |
| // ASAP on the I/O thread. |
| bool WriteNoLock(MessageView message_view) { |
| if (server_.is_valid()) { |
| outgoing_messages_.emplace_front(std::move(message_view)); |
| return true; |
| } |
| size_t bytes_written = 0; |
| do { |
| message_view.advance_data_offset(bytes_written); |
| |
| ssize_t result; |
| std::vector<PlatformHandleInTransit> handles = message_view.TakeHandles(); |
| if (!handles.empty()) { |
| iovec iov = {const_cast<void*>(message_view.data()), |
| message_view.data_num_bytes()}; |
| std::vector<base::ScopedFD> fds(handles.size()); |
| for (size_t i = 0; i < handles.size(); ++i) |
| fds[i] = handles[i].TakeHandle().TakeFD(); |
| // TODO: Handle lots of handles. |
| result = SendmsgWithHandles(socket_.get(), &iov, 1, fds); |
| if (result >= 0) { |
| #if defined(OS_MACOSX) |
| // There is a bug on OSX which makes it dangerous to close |
| // a file descriptor while it is in transit. So instead we |
| // store the file descriptor in a set and send a message to |
| // the recipient, which is queued AFTER the message that |
| // sent the FD. The recipient will reply to the message, |
| // letting us know that it is now safe to close the file |
| // descriptor. For more information, see: |
| // http://crbug.com/298276 |
| MessagePtr fds_message( |
| new Channel::Message(sizeof(fds[0]) * fds.size(), 0, |
| Message::MessageType::HANDLES_SENT)); |
| memcpy(fds_message->mutable_payload(), fds.data(), |
| sizeof(fds[0]) * fds.size()); |
| outgoing_messages_.emplace_back(std::move(fds_message), 0); |
| { |
| base::AutoLock l(fds_to_close_lock_); |
| for (auto& fd : fds) |
| fds_to_close_.emplace_back(std::move(fd)); |
| } |
| #endif // defined(OS_MACOSX) |
| } else { |
| // Message transmission failed, so pull the FDs back into |handles| |
| // so they can be held by the Message again. |
| for (size_t i = 0; i < fds.size(); ++i) { |
| handles[i] = |
| PlatformHandleInTransit(PlatformHandle(std::move(fds[i]))); |
| } |
| } |
| } else { |
| result = SocketWrite(socket_.get(), message_view.data(), |
| message_view.data_num_bytes()); |
| } |
| |
| if (result < 0) { |
| if (errno != EAGAIN && |
| errno != EWOULDBLOCK |
| #if defined(OS_MACOSX) |
| // On OS X if sendmsg() is trying to send fds between processes and |
| // there isn't enough room in the output buffer to send the fd |
| // structure over atomically then EMSGSIZE is returned. |
| // |
| // EMSGSIZE presents a problem since the system APIs can only call |
| // us when there's room in the socket buffer and not when there is |
| // "enough" room. |
| // |
| // The current behavior is to return to the event loop when EMSGSIZE |
| // is received and hopefull service another FD. This is however |
| // still technically a busy wait since the event loop will call us |
| // right back until the receiver has read enough data to allow |
| // passing the FD over atomically. |
| && errno != EMSGSIZE |
| #endif |
| ) { |
| return false; |
| } |
| message_view.SetHandles(std::move(handles)); |
| outgoing_messages_.emplace_front(std::move(message_view)); |
| WaitForWriteOnIOThreadNoLock(); |
| return true; |
| } |
| |
| bytes_written = static_cast<size_t>(result); |
| } while (bytes_written < message_view.data_num_bytes()); |
| |
| return FlushOutgoingMessagesNoLock(); |
| } |
| |
| bool FlushOutgoingMessagesNoLock() { |
| base::circular_deque<MessageView> messages; |
| std::swap(outgoing_messages_, messages); |
| |
| while (!messages.empty()) { |
| if (!WriteNoLock(std::move(messages.front()))) |
| return false; |
| |
| messages.pop_front(); |
| if (!outgoing_messages_.empty()) { |
| // The message was requeued by WriteNoLock(), so we have to wait for |
| // pipe to become writable again. Repopulate the message queue and exit. |
| // If sending the message triggered any control messages, they may be |
| // in |outgoing_messages_| in addition to or instead of the message |
| // being sent. |
| std::swap(messages, outgoing_messages_); |
| while (!messages.empty()) { |
| outgoing_messages_.push_front(std::move(messages.back())); |
| messages.pop_back(); |
| } |
| return true; |
| } |
| } |
| |
| return true; |
| } |
| |
| #if defined(OS_MACOSX) |
| bool OnControlMessage(Message::MessageType message_type, |
| const void* payload, |
| size_t payload_size, |
| std::vector<PlatformHandle> handles) override { |
| switch (message_type) { |
| case Message::MessageType::HANDLES_SENT: { |
| if (payload_size == 0) |
| break; |
| MessagePtr message(new Channel::Message( |
| payload_size, 0, Message::MessageType::HANDLES_SENT_ACK)); |
| memcpy(message->mutable_payload(), payload, payload_size); |
| Write(std::move(message)); |
| return true; |
| } |
| |
| case Message::MessageType::HANDLES_SENT_ACK: { |
| size_t num_fds = payload_size / sizeof(int); |
| if (num_fds == 0 || payload_size % sizeof(int) != 0) |
| break; |
| |
| const int* fds = reinterpret_cast<const int*>(payload); |
| if (!CloseHandles(fds, num_fds)) |
| break; |
| return true; |
| } |
| |
| default: |
| break; |
| } |
| |
| return false; |
| } |
| |
| // Closes handles referenced by |fds|. Returns false if |num_fds| is 0, or if |
| // |fds| does not match a sequence of handles in |fds_to_close_|. |
| bool CloseHandles(const int* fds, size_t num_fds) { |
| base::AutoLock l(fds_to_close_lock_); |
| if (!num_fds) |
| return false; |
| |
| auto start = std::find_if( |
| fds_to_close_.begin(), fds_to_close_.end(), |
| [&fds](const base::ScopedFD& fd) { return fd.get() == fds[0]; }); |
| if (start == fds_to_close_.end()) |
| return false; |
| |
| auto it = start; |
| size_t i = 0; |
| // The FDs in the message should match a sequence of handles in |
| // |fds_to_close_|. |
| // TODO(wez): Consider making |fds_to_close_| a circular_deque<> |
| // for greater efficiency? Or assign a unique Id to each FD-containing |
| // message, and map that to a vector of FDs to close, to avoid the |
| // need for this traversal? Id could even be the first FD in the message. |
| for (; i < num_fds && it != fds_to_close_.end(); i++, ++it) { |
| if (it->get() != fds[i]) |
| return false; |
| } |
| if (i != num_fds) |
| return false; |
| |
| // Close the FDs by erase()ing their ScopedFDs. |
| fds_to_close_.erase(start, it); |
| return true; |
| } |
| #endif // defined(OS_MACOSX) |
| |
| void OnWriteError(Error error) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| DCHECK(reject_writes_); |
| |
| if (error == Error::kDisconnected) { |
| // If we can't write because the pipe is disconnected then continue |
| // reading to fetch any in-flight messages, relying on end-of-stream to |
| // signal the actual disconnection. |
| if (read_watcher_) { |
| write_watcher_.reset(); |
| return; |
| } |
| } |
| |
| OnError(error); |
| } |
| |
| // Keeps the Channel alive at least until explicit shutdown on the IO thread. |
| scoped_refptr<Channel> self_; |
| |
| // We may be initialized with a server socket, in which case this will be |
| // valid until it accepts an incoming connection. |
| PlatformChannelServerEndpoint server_; |
| |
| // The socket over which to communicate. May be passed in at construction time |
| // or accepted over |server_|. |
| base::ScopedFD socket_; |
| |
| scoped_refptr<base::TaskRunner> io_task_runner_; |
| |
| // These watchers must only be accessed on the IO thread. |
| std::unique_ptr<base::MessagePumpForIO::FdWatchController> read_watcher_; |
| std::unique_ptr<base::MessagePumpForIO::FdWatchController> write_watcher_; |
| |
| base::circular_deque<base::ScopedFD> incoming_fds_; |
| |
| // Protects |pending_write_| and |outgoing_messages_|. |
| base::Lock write_lock_; |
| bool pending_write_ = false; |
| bool reject_writes_ = false; |
| base::circular_deque<MessageView> outgoing_messages_; |
| |
| bool leak_handle_ = false; |
| |
| #if defined(OS_MACOSX) |
| base::Lock fds_to_close_lock_; |
| std::vector<base::ScopedFD> fds_to_close_; |
| #if !defined(OS_IOS) |
| // Guards access to the send/receive queues below. These are messages that |
| // can't be fully accepted from or dispatched to the Channel user yet because |
| // we're still waiting on a task port for the remote process. |
| struct RawIncomingMessage { |
| RawIncomingMessage(std::vector<uint8_t> data, |
| std::vector<PlatformHandleInTransit> handles) |
| : data(std::move(data)), handles(std::move(handles)) {} |
| RawIncomingMessage(RawIncomingMessage&&) = default; |
| ~RawIncomingMessage() = default; |
| |
| std::vector<uint8_t> data; |
| std::vector<PlatformHandleInTransit> handles; |
| }; |
| |
| base::Lock task_port_wait_lock_; |
| bool reject_incoming_messages_with_mach_ports_ = false; |
| std::vector<MessagePtr> pending_outgoing_with_mach_ports_; |
| std::vector<RawIncomingMessage> pending_incoming_with_mach_ports_; |
| #endif // !defined(OS_IOS) |
| #endif // defined(OS_MACOSX) |
| |
| DISALLOW_COPY_AND_ASSIGN(ChannelPosix); |
| }; |
| |
| } // namespace |
| |
| // static |
| scoped_refptr<Channel> Channel::Create( |
| Delegate* delegate, |
| ConnectionParams connection_params, |
| scoped_refptr<base::TaskRunner> io_task_runner) { |
| return new ChannelPosix(delegate, std::move(connection_params), |
| io_task_runner); |
| } |
| |
| } // namespace core |
| } // namespace mojo |