| // Copyright 2016 The Chromium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include "mojo/core/node_controller.h" |
| |
| #include <algorithm> |
| #include <limits> |
| #include <vector> |
| |
| #include "base/bind.h" |
| #include "base/containers/queue.h" |
| #include "base/location.h" |
| #include "base/logging.h" |
| #include "base/macros.h" |
| #include "base/message_loop/message_loop_current.h" |
| #include "base/metrics/histogram_macros.h" |
| #include "base/process/process_handle.h" |
| #include "base/rand_util.h" |
| #include "base/time/time.h" |
| #include "base/timer/elapsed_timer.h" |
| #include "mojo/core/broker.h" |
| #include "mojo/core/broker_host.h" |
| #include "mojo/core/configuration.h" |
| #include "mojo/core/core.h" |
| #include "mojo/core/request_context.h" |
| #include "mojo/core/user_message_impl.h" |
| #include "mojo/public/cpp/platform/named_platform_channel.h" |
| #include "mojo/public/cpp/platform/platform_channel.h" |
| |
| #if defined(OS_WIN) |
| #include <windows.h> |
| #endif |
| |
| #if defined(OS_MACOSX) && !defined(OS_IOS) |
| #include "mojo/core/mach_port_relay.h" |
| #endif |
| |
| #if !defined(OS_NACL) |
| #include "crypto/random.h" |
| #endif |
| |
| namespace mojo { |
| namespace core { |
| |
| namespace { |
| |
| #if defined(OS_NACL) |
| template <typename T> |
| void GenerateRandomName(T* out) { |
| base::RandBytes(out, sizeof(T)); |
| } |
| #else |
| template <typename T> |
| void GenerateRandomName(T* out) { |
| crypto::RandBytes(out, sizeof(T)); |
| } |
| #endif |
| |
| ports::NodeName GetRandomNodeName() { |
| ports::NodeName name; |
| GenerateRandomName(&name); |
| return name; |
| } |
| |
| Channel::MessagePtr SerializeEventMessage(ports::ScopedEvent event) { |
| if (event->type() == ports::Event::Type::kUserMessage) { |
| // User message events must already be partially serialized. |
| return UserMessageImpl::FinalizeEventMessage( |
| ports::Event::Cast<ports::UserMessageEvent>(&event)); |
| } |
| |
| void* data; |
| size_t size = event->GetSerializedSize(); |
| auto message = NodeChannel::CreateEventMessage(size, size, &data, 0); |
| event->Serialize(data); |
| return message; |
| } |
| |
| ports::ScopedEvent DeserializeEventMessage( |
| const ports::NodeName& from_node, |
| Channel::MessagePtr channel_message) { |
| void* data; |
| size_t size; |
| NodeChannel::GetEventMessageData(channel_message.get(), &data, &size); |
| auto event = ports::Event::Deserialize(data, size); |
| if (!event) |
| return nullptr; |
| |
| if (event->type() != ports::Event::Type::kUserMessage) |
| return event; |
| |
| // User messages require extra parsing. |
| const size_t event_size = event->GetSerializedSize(); |
| |
| // Note that if this weren't true, the event couldn't have been deserialized |
| // in the first place. |
| DCHECK_LE(event_size, size); |
| |
| auto message_event = ports::Event::Cast<ports::UserMessageEvent>(&event); |
| auto message = UserMessageImpl::CreateFromChannelMessage( |
| message_event.get(), std::move(channel_message), |
| static_cast<uint8_t*>(data) + event_size, size - event_size); |
| message->set_source_node(from_node); |
| |
| message_event->AttachMessage(std::move(message)); |
| return std::move(message_event); |
| } |
| |
| // Used by NodeController to watch for shutdown. Since no IO can happen once |
| // the IO thread is killed, the NodeController can cleanly drop all its peers |
| // at that time. |
| class ThreadDestructionObserver |
| : public base::MessageLoopCurrent::DestructionObserver { |
| public: |
| static void Create(scoped_refptr<base::TaskRunner> task_runner, |
| const base::Closure& callback) { |
| if (task_runner->RunsTasksInCurrentSequence()) { |
| // Owns itself. |
| new ThreadDestructionObserver(callback); |
| } else { |
| task_runner->PostTask(FROM_HERE, |
| base::Bind(&Create, task_runner, callback)); |
| } |
| } |
| |
| private: |
| explicit ThreadDestructionObserver(const base::Closure& callback) |
| : callback_(callback) { |
| base::MessageLoopCurrent::Get()->AddDestructionObserver(this); |
| } |
| |
| ~ThreadDestructionObserver() override { |
| base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this); |
| } |
| |
| // base::MessageLoopCurrent::DestructionObserver: |
| void WillDestroyCurrentMessageLoop() override { |
| callback_.Run(); |
| delete this; |
| } |
| |
| const base::Closure callback_; |
| |
| DISALLOW_COPY_AND_ASSIGN(ThreadDestructionObserver); |
| }; |
| |
| } // namespace |
| |
| NodeController::~NodeController() {} |
| |
| NodeController::NodeController(Core* core) |
| : core_(core), |
| name_(GetRandomNodeName()), |
| node_(new ports::Node(name_, this)) { |
| DVLOG(1) << "Initializing node " << name_; |
| } |
| |
| #if defined(OS_MACOSX) && !defined(OS_IOS) |
| void NodeController::CreateMachPortRelay(base::PortProvider* port_provider) { |
| base::AutoLock lock(mach_port_relay_lock_); |
| DCHECK(!mach_port_relay_); |
| mach_port_relay_.reset(new MachPortRelay(port_provider)); |
| } |
| #endif |
| |
| void NodeController::SetIOTaskRunner( |
| scoped_refptr<base::TaskRunner> task_runner) { |
| io_task_runner_ = task_runner; |
| ThreadDestructionObserver::Create( |
| io_task_runner_, |
| base::Bind(&NodeController::DropAllPeers, base::Unretained(this))); |
| } |
| |
| void NodeController::SendBrokerClientInvitation( |
| base::ProcessHandle target_process, |
| ConnectionParams connection_params, |
| const std::vector<std::pair<std::string, ports::PortRef>>& attached_ports, |
| const ProcessErrorCallback& process_error_callback) { |
| // Generate the temporary remote node name here so that it can be associated |
| // with the ports "attached" to this invitation. |
| ports::NodeName temporary_node_name; |
| GenerateRandomName(&temporary_node_name); |
| |
| { |
| base::AutoLock lock(reserved_ports_lock_); |
| PortMap& port_map = reserved_ports_[temporary_node_name]; |
| for (auto& entry : attached_ports) { |
| auto result = port_map.emplace(entry.first, entry.second); |
| DCHECK(result.second) << "Duplicate attachment: " << entry.first; |
| } |
| } |
| |
| ScopedProcessHandle scoped_target_process = |
| ScopedProcessHandle::CloneFrom(target_process); |
| io_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce(&NodeController::SendBrokerClientInvitationOnIOThread, |
| base::Unretained(this), std::move(scoped_target_process), |
| std::move(connection_params), temporary_node_name, |
| process_error_callback)); |
| } |
| |
| void NodeController::AcceptBrokerClientInvitation( |
| ConnectionParams connection_params) { |
| DCHECK(!GetConfiguration().is_broker_process); |
| #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA) |
| // Use the bootstrap channel for the broker and receive the node's channel |
| // synchronously as the first message from the broker. |
| DCHECK(connection_params.endpoint().is_valid()); |
| base::ElapsedTimer timer; |
| broker_ = std::make_unique<Broker>( |
| connection_params.TakeEndpoint().TakePlatformHandle()); |
| PlatformChannelEndpoint endpoint = broker_->GetInviterEndpoint(); |
| |
| if (!endpoint.is_valid()) { |
| // Most likely the inviter's side of the channel has already been closed and |
| // the broker was unable to negotiate a NodeChannel pipe. In this case we |
| // can cancel our connection to our inviter. |
| DVLOG(1) << "Cannot connect to invalid inviter channel."; |
| CancelPendingPortMerges(); |
| return; |
| } |
| connection_params = ConnectionParams(std::move(endpoint)); |
| #endif |
| |
| io_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce(&NodeController::AcceptBrokerClientInvitationOnIOThread, |
| base::Unretained(this), std::move(connection_params))); |
| } |
| |
| void NodeController::ConnectIsolated(ConnectionParams connection_params, |
| const ports::PortRef& port, |
| base::StringPiece connection_name) { |
| io_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce(&NodeController::ConnectIsolatedOnIOThread, |
| base::Unretained(this), base::Passed(&connection_params), |
| port, connection_name.as_string())); |
| } |
| |
| void NodeController::SetPortObserver(const ports::PortRef& port, |
| scoped_refptr<PortObserver> observer) { |
| node_->SetUserData(port, std::move(observer)); |
| } |
| |
| void NodeController::ClosePort(const ports::PortRef& port) { |
| SetPortObserver(port, nullptr); |
| int rv = node_->ClosePort(port); |
| DCHECK_EQ(rv, ports::OK) << " Failed to close port: " << port.name(); |
| } |
| |
| int NodeController::SendUserMessage( |
| const ports::PortRef& port, |
| std::unique_ptr<ports::UserMessageEvent> message) { |
| return node_->SendUserMessage(port, std::move(message)); |
| } |
| |
| void NodeController::MergePortIntoInviter(const std::string& name, |
| const ports::PortRef& port) { |
| scoped_refptr<NodeChannel> inviter; |
| bool reject_merge = false; |
| { |
| // Hold |pending_port_merges_lock_| while getting |inviter|. Otherwise, |
| // there is a race where the inviter can be set, and |pending_port_merges_| |
| // be processed between retrieving |inviter| and adding the merge to |
| // |pending_port_merges_|. |
| base::AutoLock lock(pending_port_merges_lock_); |
| inviter = GetInviterChannel(); |
| if (reject_pending_merges_) { |
| reject_merge = true; |
| } else if (!inviter) { |
| pending_port_merges_.push_back(std::make_pair(name, port)); |
| return; |
| } |
| } |
| if (reject_merge) { |
| node_->ClosePort(port); |
| DVLOG(2) << "Rejecting port merge for name " << name |
| << " due to closed inviter channel."; |
| return; |
| } |
| |
| inviter->RequestPortMerge(port.name(), name); |
| } |
| |
| int NodeController::MergeLocalPorts(const ports::PortRef& port0, |
| const ports::PortRef& port1) { |
| return node_->MergeLocalPorts(port0, port1); |
| } |
| |
| base::WritableSharedMemoryRegion NodeController::CreateSharedBuffer( |
| size_t num_bytes) { |
| #if !defined(OS_MACOSX) && !defined(OS_NACL_SFI) && !defined(OS_FUCHSIA) |
| // Shared buffer creation failure is fatal, so always use the broker when we |
| // have one; unless of course the embedder forces us not to. |
| if (!GetConfiguration().force_direct_shared_memory_allocation && broker_) |
| return broker_->GetWritableSharedMemoryRegion(num_bytes); |
| #endif |
| return base::WritableSharedMemoryRegion::Create(num_bytes); |
| } |
| |
| void NodeController::RequestShutdown(const base::Closure& callback) { |
| { |
| base::AutoLock lock(shutdown_lock_); |
| shutdown_callback_ = callback; |
| shutdown_callback_flag_.Set(true); |
| } |
| |
| AttemptShutdownIfRequested(); |
| } |
| |
| void NodeController::NotifyBadMessageFrom(const ports::NodeName& source_node, |
| const std::string& error) { |
| scoped_refptr<NodeChannel> peer = GetPeerChannel(source_node); |
| if (peer) |
| peer->NotifyBadMessage(error); |
| } |
| |
| void NodeController::SendBrokerClientInvitationOnIOThread( |
| ScopedProcessHandle target_process, |
| ConnectionParams connection_params, |
| ports::NodeName temporary_node_name, |
| const ProcessErrorCallback& process_error_callback) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| #if !defined(OS_MACOSX) && !defined(OS_NACL) && !defined(OS_FUCHSIA) |
| PlatformChannel node_channel; |
| ConnectionParams node_connection_params(node_channel.TakeLocalEndpoint()); |
| // BrokerHost owns itself. |
| BrokerHost* broker_host = |
| new BrokerHost(target_process.get(), std::move(connection_params), |
| process_error_callback); |
| bool channel_ok = broker_host->SendChannel( |
| node_channel.TakeRemoteEndpoint().TakePlatformHandle()); |
| |
| #if defined(OS_WIN) |
| if (!channel_ok) { |
| // On Windows the above operation may fail if the channel is crossing a |
| // session boundary. In that case we fall back to a named pipe. |
| NamedPlatformChannel::Options options; |
| NamedPlatformChannel named_channel(options); |
| node_connection_params = |
| ConnectionParams(named_channel.TakeServerEndpoint()); |
| broker_host->SendNamedChannel(named_channel.GetServerName()); |
| } |
| #else |
| CHECK(channel_ok); |
| #endif // defined(OS_WIN) |
| |
| scoped_refptr<NodeChannel> channel = |
| NodeChannel::Create(this, std::move(node_connection_params), |
| io_task_runner_, process_error_callback); |
| |
| #else // !defined(OS_MACOSX) && !defined(OS_NACL) |
| scoped_refptr<NodeChannel> channel = |
| NodeChannel::Create(this, std::move(connection_params), io_task_runner_, |
| process_error_callback); |
| #endif // !defined(OS_MACOSX) && !defined(OS_NACL) |
| |
| // We set up the invitee channel with a temporary name so it can be identified |
| // as a pending invitee if it writes any messages to the channel. We may start |
| // receiving messages from it (though we shouldn't) as soon as Start() is |
| // called below. |
| |
| pending_invitations_.insert(std::make_pair(temporary_node_name, channel)); |
| |
| channel->SetRemoteNodeName(temporary_node_name); |
| channel->SetRemoteProcessHandle(std::move(target_process)); |
| channel->Start(); |
| |
| channel->AcceptInvitee(name_, temporary_node_name); |
| } |
| |
| void NodeController::AcceptBrokerClientInvitationOnIOThread( |
| ConnectionParams connection_params) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| { |
| base::AutoLock lock(inviter_lock_); |
| DCHECK(inviter_name_ == ports::kInvalidNodeName); |
| |
| // At this point we don't know the inviter's name, so we can't yet insert it |
| // into our |peers_| map. That will happen as soon as we receive an |
| // AcceptInvitee message from them. |
| bootstrap_inviter_channel_ = |
| NodeChannel::Create(this, std::move(connection_params), io_task_runner_, |
| ProcessErrorCallback()); |
| // Prevent the inviter pipe handle from being closed on shutdown. Pipe |
| // closure may be used by the inviter to detect the invitee process has |
| // exited. |
| bootstrap_inviter_channel_->LeakHandleOnShutdown(); |
| } |
| bootstrap_inviter_channel_->Start(); |
| } |
| |
| void NodeController::ConnectIsolatedOnIOThread( |
| ConnectionParams connection_params, |
| ports::PortRef port, |
| const std::string& connection_name) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| scoped_refptr<NodeChannel> channel = NodeChannel::Create( |
| this, std::move(connection_params), io_task_runner_, {}); |
| |
| RequestContext request_context; |
| ports::NodeName token; |
| GenerateRandomName(&token); |
| pending_isolated_connections_.emplace( |
| token, IsolatedConnection{channel, port, connection_name}); |
| if (!connection_name.empty()) { |
| // If a connection already exists with this name, drop it. |
| auto it = named_isolated_connections_.find(connection_name); |
| if (it != named_isolated_connections_.end()) { |
| ports::NodeName connection_node = it->second; |
| if (connection_node != name_) { |
| DropPeer(connection_node, nullptr); |
| } else { |
| auto pending_it = pending_isolated_connections_.find(connection_node); |
| if (pending_it != pending_isolated_connections_.end()) { |
| node_->ClosePort(pending_it->second.local_port); |
| pending_isolated_connections_.erase(pending_it); |
| } |
| named_isolated_connections_.erase(it); |
| } |
| } |
| named_isolated_connections_.emplace(connection_name, token); |
| } |
| |
| channel->SetRemoteNodeName(token); |
| channel->Start(); |
| |
| channel->AcceptPeer(name_, token, port.name()); |
| } |
| |
| scoped_refptr<NodeChannel> NodeController::GetPeerChannel( |
| const ports::NodeName& name) { |
| base::AutoLock lock(peers_lock_); |
| auto it = peers_.find(name); |
| if (it == peers_.end()) |
| return nullptr; |
| return it->second; |
| } |
| |
| scoped_refptr<NodeChannel> NodeController::GetInviterChannel() { |
| ports::NodeName inviter_name; |
| { |
| base::AutoLock lock(inviter_lock_); |
| inviter_name = inviter_name_; |
| } |
| return GetPeerChannel(inviter_name); |
| } |
| |
| scoped_refptr<NodeChannel> NodeController::GetBrokerChannel() { |
| if (GetConfiguration().is_broker_process) |
| return nullptr; |
| |
| ports::NodeName broker_name; |
| { |
| base::AutoLock lock(broker_lock_); |
| broker_name = broker_name_; |
| } |
| return GetPeerChannel(broker_name); |
| } |
| |
| void NodeController::AddPeer(const ports::NodeName& name, |
| scoped_refptr<NodeChannel> channel, |
| bool start_channel) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| DCHECK(name != ports::kInvalidNodeName); |
| DCHECK(channel); |
| |
| channel->SetRemoteNodeName(name); |
| |
| OutgoingMessageQueue pending_messages; |
| { |
| base::AutoLock lock(peers_lock_); |
| if (peers_.find(name) != peers_.end()) { |
| // This can happen normally if two nodes race to be introduced to each |
| // other. The losing pipe will be silently closed and introduction should |
| // not be affected. |
| DVLOG(1) << "Ignoring duplicate peer name " << name; |
| return; |
| } |
| |
| auto result = peers_.insert(std::make_pair(name, channel)); |
| DCHECK(result.second); |
| |
| DVLOG(2) << "Accepting new peer " << name << " on node " << name_; |
| |
| auto it = pending_peer_messages_.find(name); |
| if (it != pending_peer_messages_.end()) { |
| std::swap(pending_messages, it->second); |
| pending_peer_messages_.erase(it); |
| } |
| } |
| |
| if (start_channel) |
| channel->Start(); |
| |
| // Flush any queued message we need to deliver to this node. |
| while (!pending_messages.empty()) { |
| channel->SendChannelMessage(std::move(pending_messages.front())); |
| pending_messages.pop(); |
| } |
| } |
| |
| void NodeController::DropPeer(const ports::NodeName& name, |
| NodeChannel* channel) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| { |
| base::AutoLock lock(peers_lock_); |
| auto it = peers_.find(name); |
| |
| if (it != peers_.end()) { |
| ports::NodeName peer = it->first; |
| peers_.erase(it); |
| DVLOG(1) << "Dropped peer " << peer; |
| } |
| |
| pending_peer_messages_.erase(name); |
| pending_invitations_.erase(name); |
| } |
| |
| std::vector<ports::PortRef> ports_to_close; |
| { |
| // Clean up any reserved ports. |
| base::AutoLock lock(reserved_ports_lock_); |
| auto it = reserved_ports_.find(name); |
| if (it != reserved_ports_.end()) { |
| for (auto& entry : it->second) |
| ports_to_close.emplace_back(entry.second); |
| reserved_ports_.erase(it); |
| } |
| } |
| |
| bool is_inviter; |
| { |
| base::AutoLock lock(inviter_lock_); |
| is_inviter = (name == inviter_name_ || |
| (channel && channel == bootstrap_inviter_channel_)); |
| } |
| |
| // If the error comes from the inviter channel, we also need to cancel any |
| // port merge requests, so that errors can be propagated to the message |
| // pipes. |
| if (is_inviter) |
| CancelPendingPortMerges(); |
| |
| auto connection_it = pending_isolated_connections_.find(name); |
| if (connection_it != pending_isolated_connections_.end()) { |
| IsolatedConnection& connection = connection_it->second; |
| ports_to_close.push_back(connection.local_port); |
| if (!connection.name.empty()) |
| named_isolated_connections_.erase(connection.name); |
| pending_isolated_connections_.erase(connection_it); |
| } |
| |
| for (const auto& port : ports_to_close) |
| node_->ClosePort(port); |
| |
| node_->LostConnectionToNode(name); |
| AttemptShutdownIfRequested(); |
| } |
| |
| void NodeController::SendPeerEvent(const ports::NodeName& name, |
| ports::ScopedEvent event) { |
| Channel::MessagePtr event_message = SerializeEventMessage(std::move(event)); |
| if (!event_message) |
| return; |
| scoped_refptr<NodeChannel> peer = GetPeerChannel(name); |
| #if defined(OS_WIN) |
| if (event_message->has_handles()) { |
| // If we're sending a message with handles we aren't the destination |
| // node's inviter or broker (i.e. we don't know its process handle), ask |
| // the broker to relay for us. |
| scoped_refptr<NodeChannel> broker = GetBrokerChannel(); |
| if (!peer || !peer->HasRemoteProcessHandle()) { |
| if (!GetConfiguration().is_broker_process && broker) { |
| broker->RelayEventMessage(name, std::move(event_message)); |
| } else { |
| base::AutoLock lock(broker_lock_); |
| pending_relay_messages_[name].emplace(std::move(event_message)); |
| } |
| return; |
| } |
| } |
| #elif defined(OS_MACOSX) && !defined(OS_IOS) |
| if (event_message->has_mach_ports()) { |
| // Messages containing Mach ports are always routed through the broker, even |
| // if the broker process is the intended recipient. |
| bool use_broker = false; |
| if (!GetConfiguration().is_broker_process) { |
| base::AutoLock lock(inviter_lock_); |
| use_broker = (bootstrap_inviter_channel_ || |
| inviter_name_ != ports::kInvalidNodeName); |
| } |
| |
| if (use_broker) { |
| scoped_refptr<NodeChannel> broker = GetBrokerChannel(); |
| if (broker) { |
| broker->RelayEventMessage(name, std::move(event_message)); |
| } else { |
| base::AutoLock lock(broker_lock_); |
| pending_relay_messages_[name].emplace(std::move(event_message)); |
| } |
| return; |
| } |
| } |
| #endif // defined(OS_WIN) |
| |
| if (peer) { |
| peer->SendChannelMessage(std::move(event_message)); |
| return; |
| } |
| |
| // If we don't know who the peer is and we are the broker, we can only assume |
| // the peer is invalid, i.e., it's either a junk name or has already been |
| // disconnected. |
| scoped_refptr<NodeChannel> broker = GetBrokerChannel(); |
| if (!broker) { |
| DVLOG(1) << "Dropping message for unknown peer: " << name; |
| return; |
| } |
| |
| // If we aren't the broker, assume we just need to be introduced and queue |
| // until that can be either confirmed or denied by the broker. |
| bool needs_introduction = false; |
| { |
| base::AutoLock lock(peers_lock_); |
| // We may have been introduced on another thread by the time we get here. |
| // Double-check to be safe. |
| auto it = peers_.find(name); |
| if (it == peers_.end()) { |
| auto& queue = pending_peer_messages_[name]; |
| needs_introduction = queue.empty(); |
| queue.emplace(std::move(event_message)); |
| } else { |
| peer = it->second; |
| } |
| } |
| if (needs_introduction) |
| broker->RequestIntroduction(name); |
| else if (peer) |
| peer->SendChannelMessage(std::move(event_message)); |
| } |
| |
| void NodeController::DropAllPeers() { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| std::vector<scoped_refptr<NodeChannel>> all_peers; |
| { |
| base::AutoLock lock(inviter_lock_); |
| if (bootstrap_inviter_channel_) { |
| // |bootstrap_inviter_channel_| isn't null'd here becuase we rely on its |
| // existence to determine whether or not this is the root node. Once |
| // bootstrap_inviter_channel_->ShutDown() has been called, |
| // |bootstrap_inviter_channel_| is essentially a dead object and it |
| // doesn't matter if it's deleted now or when |this| is deleted. Note: |
| // |bootstrap_inviter_channel_| is only modified on the IO thread. |
| all_peers.push_back(bootstrap_inviter_channel_); |
| } |
| } |
| |
| { |
| base::AutoLock lock(peers_lock_); |
| for (const auto& peer : peers_) |
| all_peers.push_back(peer.second); |
| for (const auto& peer : pending_invitations_) |
| all_peers.push_back(peer.second); |
| peers_.clear(); |
| pending_invitations_.clear(); |
| pending_peer_messages_.clear(); |
| pending_isolated_connections_.clear(); |
| named_isolated_connections_.clear(); |
| } |
| |
| for (const auto& peer : all_peers) |
| peer->ShutDown(); |
| |
| if (destroy_on_io_thread_shutdown_) |
| delete this; |
| } |
| |
| void NodeController::ForwardEvent(const ports::NodeName& node, |
| ports::ScopedEvent event) { |
| DCHECK(event); |
| if (node == name_) |
| node_->AcceptEvent(std::move(event)); |
| else |
| SendPeerEvent(node, std::move(event)); |
| |
| AttemptShutdownIfRequested(); |
| } |
| |
| void NodeController::BroadcastEvent(ports::ScopedEvent event) { |
| Channel::MessagePtr channel_message = SerializeEventMessage(std::move(event)); |
| DCHECK(channel_message && !channel_message->has_handles()); |
| |
| scoped_refptr<NodeChannel> broker = GetBrokerChannel(); |
| if (broker) |
| broker->Broadcast(std::move(channel_message)); |
| else |
| OnBroadcast(name_, std::move(channel_message)); |
| } |
| |
| void NodeController::PortStatusChanged(const ports::PortRef& port) { |
| scoped_refptr<ports::UserData> user_data; |
| node_->GetUserData(port, &user_data); |
| |
| PortObserver* observer = static_cast<PortObserver*>(user_data.get()); |
| if (observer) { |
| observer->OnPortStatusChanged(); |
| } else { |
| DVLOG(2) << "Ignoring status change for " << port.name() << " because it " |
| << "doesn't have an observer."; |
| } |
| } |
| |
| void NodeController::OnAcceptInvitee(const ports::NodeName& from_node, |
| const ports::NodeName& inviter_name, |
| const ports::NodeName& token) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| scoped_refptr<NodeChannel> inviter; |
| { |
| base::AutoLock lock(inviter_lock_); |
| if (bootstrap_inviter_channel_ && |
| inviter_name_ == ports::kInvalidNodeName) { |
| inviter_name_ = inviter_name; |
| inviter = bootstrap_inviter_channel_; |
| } |
| } |
| |
| if (!inviter) { |
| DLOG(ERROR) << "Unexpected AcceptInvitee message from " << from_node; |
| DropPeer(from_node, nullptr); |
| return; |
| } |
| |
| inviter->SetRemoteNodeName(inviter_name); |
| inviter->AcceptInvitation(token, name_); |
| |
| // NOTE: The invitee does not actually add its inviter as a peer until |
| // receiving an AcceptBrokerClient message from the broker. The inviter will |
| // request that said message be sent upon receiving AcceptInvitation. |
| |
| DVLOG(1) << "Broker client " << name_ << " accepting invitation from " |
| << inviter_name; |
| } |
| |
| void NodeController::OnAcceptInvitation(const ports::NodeName& from_node, |
| const ports::NodeName& token, |
| const ports::NodeName& invitee_name) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| auto it = pending_invitations_.find(from_node); |
| if (it == pending_invitations_.end() || token != from_node) { |
| DLOG(ERROR) << "Received unexpected AcceptInvitation message from " |
| << from_node; |
| DropPeer(from_node, nullptr); |
| return; |
| } |
| |
| { |
| base::AutoLock lock(reserved_ports_lock_); |
| auto it = reserved_ports_.find(from_node); |
| if (it != reserved_ports_.end()) { |
| // Swap the temporary node name's reserved ports into an entry keyed by |
| // the real node name. |
| auto result = |
| reserved_ports_.emplace(invitee_name, std::move(it->second)); |
| DCHECK(result.second); |
| reserved_ports_.erase(it); |
| } |
| } |
| |
| scoped_refptr<NodeChannel> channel = it->second; |
| pending_invitations_.erase(it); |
| |
| DCHECK(channel); |
| |
| DVLOG(1) << "Node " << name_ << " accepted invitee " << invitee_name; |
| |
| AddPeer(invitee_name, channel, false /* start_channel */); |
| |
| // TODO(rockot): We could simplify invitee initialization if we could |
| // synchronously get a new async broker channel from the broker. For now we do |
| // it asynchronously since it's only used to facilitate handle passing, not |
| // handle creation. |
| scoped_refptr<NodeChannel> broker = GetBrokerChannel(); |
| if (broker) { |
| // Inform the broker of this new client. |
| broker->AddBrokerClient(invitee_name, channel->CloneRemoteProcessHandle()); |
| } else { |
| // If we have no broker, either we need to wait for one, or we *are* the |
| // broker. |
| scoped_refptr<NodeChannel> inviter = GetInviterChannel(); |
| if (!inviter) { |
| base::AutoLock lock(inviter_lock_); |
| inviter = bootstrap_inviter_channel_; |
| } |
| |
| if (!inviter) { |
| // Yes, we're the broker. We can initialize the client directly. |
| channel->AcceptBrokerClient(name_, PlatformHandle()); |
| } else { |
| // We aren't the broker, so wait for a broker connection. |
| base::AutoLock lock(broker_lock_); |
| pending_broker_clients_.push(invitee_name); |
| } |
| } |
| } |
| |
| void NodeController::OnAddBrokerClient(const ports::NodeName& from_node, |
| const ports::NodeName& client_name, |
| base::ProcessHandle process_handle) { |
| ScopedProcessHandle scoped_process_handle(process_handle); |
| |
| scoped_refptr<NodeChannel> sender = GetPeerChannel(from_node); |
| if (!sender) { |
| DLOG(ERROR) << "Ignoring AddBrokerClient from unknown sender."; |
| return; |
| } |
| |
| if (GetPeerChannel(client_name)) { |
| DLOG(ERROR) << "Ignoring AddBrokerClient for known client."; |
| DropPeer(from_node, nullptr); |
| return; |
| } |
| |
| PlatformChannel broker_channel; |
| ConnectionParams connection_params(broker_channel.TakeLocalEndpoint()); |
| scoped_refptr<NodeChannel> client = |
| NodeChannel::Create(this, std::move(connection_params), io_task_runner_, |
| ProcessErrorCallback()); |
| |
| #if defined(OS_WIN) |
| // The broker must have a working handle to the client process in order to |
| // properly copy other handles to and from the client. |
| if (!scoped_process_handle.is_valid()) { |
| DLOG(ERROR) << "Broker rejecting client with invalid process handle."; |
| return; |
| } |
| #endif |
| client->SetRemoteProcessHandle(std::move(scoped_process_handle)); |
| |
| AddPeer(client_name, client, true /* start_channel */); |
| |
| DVLOG(1) << "Broker " << name_ << " accepting client " << client_name |
| << " from peer " << from_node; |
| |
| sender->BrokerClientAdded( |
| client_name, broker_channel.TakeRemoteEndpoint().TakePlatformHandle()); |
| } |
| |
| void NodeController::OnBrokerClientAdded(const ports::NodeName& from_node, |
| const ports::NodeName& client_name, |
| PlatformHandle broker_channel) { |
| scoped_refptr<NodeChannel> client = GetPeerChannel(client_name); |
| if (!client) { |
| DLOG(ERROR) << "BrokerClientAdded for unknown client " << client_name; |
| return; |
| } |
| |
| // This should have come from our own broker. |
| if (GetBrokerChannel() != GetPeerChannel(from_node)) { |
| DLOG(ERROR) << "BrokerClientAdded from non-broker node " << from_node; |
| return; |
| } |
| |
| DVLOG(1) << "Client " << client_name << " accepted by broker " << from_node; |
| |
| client->AcceptBrokerClient(from_node, std::move(broker_channel)); |
| } |
| |
| void NodeController::OnAcceptBrokerClient(const ports::NodeName& from_node, |
| const ports::NodeName& broker_name, |
| PlatformHandle broker_channel) { |
| DCHECK(!GetConfiguration().is_broker_process); |
| |
| // This node should already have an inviter in bootstrap mode. |
| ports::NodeName inviter_name; |
| scoped_refptr<NodeChannel> inviter; |
| { |
| base::AutoLock lock(inviter_lock_); |
| inviter_name = inviter_name_; |
| inviter = bootstrap_inviter_channel_; |
| bootstrap_inviter_channel_ = nullptr; |
| } |
| DCHECK(inviter_name == from_node); |
| DCHECK(inviter); |
| |
| base::queue<ports::NodeName> pending_broker_clients; |
| std::unordered_map<ports::NodeName, OutgoingMessageQueue> |
| pending_relay_messages; |
| { |
| base::AutoLock lock(broker_lock_); |
| broker_name_ = broker_name; |
| std::swap(pending_broker_clients, pending_broker_clients_); |
| std::swap(pending_relay_messages, pending_relay_messages_); |
| } |
| DCHECK(broker_name != ports::kInvalidNodeName); |
| |
| // It's now possible to add both the broker and the inviter as peers. |
| // Note that the broker and inviter may be the same node. |
| scoped_refptr<NodeChannel> broker; |
| if (broker_name == inviter_name) { |
| DCHECK(!broker_channel.is_valid()); |
| broker = inviter; |
| } else { |
| DCHECK(broker_channel.is_valid()); |
| broker = NodeChannel::Create( |
| this, |
| ConnectionParams(PlatformChannelEndpoint(std::move(broker_channel))), |
| io_task_runner_, ProcessErrorCallback()); |
| AddPeer(broker_name, broker, true /* start_channel */); |
| } |
| |
| AddPeer(inviter_name, inviter, false /* start_channel */); |
| |
| { |
| // Complete any port merge requests we have waiting for the inviter. |
| base::AutoLock lock(pending_port_merges_lock_); |
| for (const auto& request : pending_port_merges_) |
| inviter->RequestPortMerge(request.second.name(), request.first); |
| pending_port_merges_.clear(); |
| } |
| |
| // Feed the broker any pending invitees of our own. |
| while (!pending_broker_clients.empty()) { |
| const ports::NodeName& invitee_name = pending_broker_clients.front(); |
| auto it = peers_.find(invitee_name); |
| if (it != peers_.end()) { |
| broker->AddBrokerClient(invitee_name, |
| it->second->CloneRemoteProcessHandle()); |
| } |
| pending_broker_clients.pop(); |
| } |
| |
| #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
| // Have the broker relay any messages we have waiting. |
| for (auto& entry : pending_relay_messages) { |
| const ports::NodeName& destination = entry.first; |
| auto& message_queue = entry.second; |
| while (!message_queue.empty()) { |
| broker->RelayEventMessage(destination, std::move(message_queue.front())); |
| message_queue.pop(); |
| } |
| } |
| #endif |
| |
| DVLOG(1) << "Client " << name_ << " accepted by broker " << broker_name; |
| } |
| |
| void NodeController::OnEventMessage(const ports::NodeName& from_node, |
| Channel::MessagePtr channel_message) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| auto event = DeserializeEventMessage(from_node, std::move(channel_message)); |
| if (!event) { |
| // We silently ignore unparseable events, as they may come from a process |
| // running a newer version of Mojo. |
| DVLOG(1) << "Ignoring invalid or unknown event from " << from_node; |
| return; |
| } |
| |
| node_->AcceptEvent(std::move(event)); |
| |
| AttemptShutdownIfRequested(); |
| } |
| |
| void NodeController::OnRequestPortMerge( |
| const ports::NodeName& from_node, |
| const ports::PortName& connector_port_name, |
| const std::string& name) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| DVLOG(2) << "Node " << name_ << " received RequestPortMerge for name " << name |
| << " and port " << connector_port_name << "@" << from_node; |
| |
| ports::PortRef local_port; |
| { |
| base::AutoLock lock(reserved_ports_lock_); |
| auto it = reserved_ports_.find(from_node); |
| // TODO(https://crbug.com/822034): We should send a notification back to the |
| // requestor so they can clean up their dangling port in this failure case. |
| // This requires changes to the internal protocol, which can't be made yet. |
| // Until this is done, pipes from |MojoExtractMessagePipeFromInvitation()| |
| // will never break if the given name was invalid. |
| if (it == reserved_ports_.end()) { |
| DVLOG(1) << "Ignoring port merge request from node " << from_node << ". " |
| << "No ports reserved for that node."; |
| return; |
| } |
| |
| PortMap& port_map = it->second; |
| auto port_it = port_map.find(name); |
| if (port_it == port_map.end()) { |
| DVLOG(1) << "Ignoring request to connect to port for unknown name " |
| << name << " from node " << from_node; |
| return; |
| } |
| local_port = port_it->second; |
| port_map.erase(port_it); |
| if (port_map.empty()) |
| reserved_ports_.erase(it); |
| } |
| |
| int rv = node_->MergePorts(local_port, from_node, connector_port_name); |
| if (rv != ports::OK) |
| DLOG(ERROR) << "MergePorts failed: " << rv; |
| } |
| |
| void NodeController::OnRequestIntroduction(const ports::NodeName& from_node, |
| const ports::NodeName& name) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| scoped_refptr<NodeChannel> requestor = GetPeerChannel(from_node); |
| if (from_node == name || name == ports::kInvalidNodeName || !requestor) { |
| DLOG(ERROR) << "Rejecting invalid OnRequestIntroduction message from " |
| << from_node; |
| DropPeer(from_node, nullptr); |
| return; |
| } |
| |
| scoped_refptr<NodeChannel> new_friend = GetPeerChannel(name); |
| if (!new_friend) { |
| // We don't know who they're talking about! |
| requestor->Introduce(name, PlatformHandle()); |
| } else { |
| PlatformChannel new_channel; |
| requestor->Introduce(name, |
| new_channel.TakeLocalEndpoint().TakePlatformHandle()); |
| new_friend->Introduce( |
| from_node, new_channel.TakeRemoteEndpoint().TakePlatformHandle()); |
| } |
| } |
| |
| void NodeController::OnIntroduce(const ports::NodeName& from_node, |
| const ports::NodeName& name, |
| PlatformHandle channel_handle) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| if (!channel_handle.is_valid()) { |
| node_->LostConnectionToNode(name); |
| |
| DVLOG(1) << "Could not be introduced to peer " << name; |
| base::AutoLock lock(peers_lock_); |
| pending_peer_messages_.erase(name); |
| return; |
| } |
| |
| scoped_refptr<NodeChannel> channel = NodeChannel::Create( |
| this, |
| ConnectionParams(PlatformChannelEndpoint(std::move(channel_handle))), |
| io_task_runner_, ProcessErrorCallback()); |
| |
| DVLOG(1) << "Adding new peer " << name << " via broker introduction."; |
| AddPeer(name, channel, true /* start_channel */); |
| } |
| |
| void NodeController::OnBroadcast(const ports::NodeName& from_node, |
| Channel::MessagePtr message) { |
| DCHECK(!message->has_handles()); |
| |
| auto event = DeserializeEventMessage(from_node, std::move(message)); |
| if (!event) { |
| // We silently ignore unparseable events, as they may come from a process |
| // running a newer version of Mojo. |
| DVLOG(1) << "Ignoring request to broadcast invalid or unknown event from " |
| << from_node; |
| return; |
| } |
| |
| base::AutoLock lock(peers_lock_); |
| for (auto& iter : peers_) { |
| // Clone and send the event to each known peer. Events which cannot be |
| // cloned cannot be broadcast. |
| ports::ScopedEvent clone = event->Clone(); |
| if (!clone) { |
| DVLOG(1) << "Ignoring request to broadcast invalid event from " |
| << from_node << " [type=" << static_cast<uint32_t>(event->type()) |
| << "]"; |
| return; |
| } |
| |
| iter.second->SendChannelMessage(SerializeEventMessage(std::move(clone))); |
| } |
| } |
| |
| #if defined(OS_WIN) || (defined(OS_MACOSX) && !defined(OS_IOS)) |
| void NodeController::OnRelayEventMessage(const ports::NodeName& from_node, |
| base::ProcessHandle from_process, |
| const ports::NodeName& destination, |
| Channel::MessagePtr message) { |
| // The broker should always know which process this came from. |
| DCHECK(from_process != base::kNullProcessHandle); |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| if (GetBrokerChannel()) { |
| // Only the broker should be asked to relay a message. |
| LOG(ERROR) << "Non-broker refusing to relay message."; |
| DropPeer(from_node, nullptr); |
| return; |
| } |
| |
| if (destination == name_) { |
| // Great, we can deliver this message locally. |
| OnEventMessage(from_node, std::move(message)); |
| return; |
| } |
| |
| scoped_refptr<NodeChannel> peer = GetPeerChannel(destination); |
| if (peer) |
| peer->EventMessageFromRelay(from_node, std::move(message)); |
| else |
| DLOG(ERROR) << "Dropping relay message for unknown node " << destination; |
| } |
| |
| void NodeController::OnEventMessageFromRelay(const ports::NodeName& from_node, |
| const ports::NodeName& source_node, |
| Channel::MessagePtr message) { |
| if (GetPeerChannel(from_node) != GetBrokerChannel()) { |
| LOG(ERROR) << "Refusing relayed message from non-broker node."; |
| DropPeer(from_node, nullptr); |
| return; |
| } |
| |
| OnEventMessage(source_node, std::move(message)); |
| } |
| #endif |
| |
| void NodeController::OnAcceptPeer(const ports::NodeName& from_node, |
| const ports::NodeName& token, |
| const ports::NodeName& peer_name, |
| const ports::PortName& port_name) { |
| DCHECK(io_task_runner_->RunsTasksInCurrentSequence()); |
| |
| auto it = pending_isolated_connections_.find(from_node); |
| if (it == pending_isolated_connections_.end()) { |
| DLOG(ERROR) << "Received unexpected AcceptPeer message from " << from_node; |
| DropPeer(from_node, nullptr); |
| return; |
| } |
| |
| IsolatedConnection& connection = it->second; |
| scoped_refptr<NodeChannel> channel = std::move(connection.channel); |
| ports::PortRef local_port = connection.local_port; |
| if (!connection.name.empty()) |
| named_isolated_connections_[connection.name] = peer_name; |
| pending_isolated_connections_.erase(it); |
| DCHECK(channel); |
| |
| if (name_ != peer_name) { |
| // It's possible (e.g. in tests) that we may "connect" to ourself, in which |
| // case we skip this |AddPeer()| call and go straight to merging ports. |
| // Note that we explicitly drop any prior connection to the same peer so |
| // that new isolated connections can replace old ones. |
| DropPeer(peer_name, nullptr); |
| AddPeer(peer_name, channel, false /* start_channel */); |
| DVLOG(1) << "Node " << name_ << " accepted peer " << peer_name; |
| } |
| |
| // We need to choose one side to initiate the port merge. It doesn't matter |
| // who does it as long as they don't both try. Simple solution: pick the one |
| // with the "smaller" port name. |
| if (local_port.name() < port_name) |
| node()->MergePorts(local_port, peer_name, port_name); |
| } |
| |
| void NodeController::OnChannelError(const ports::NodeName& from_node, |
| NodeChannel* channel) { |
| if (io_task_runner_->RunsTasksInCurrentSequence()) { |
| RequestContext request_context(RequestContext::Source::SYSTEM); |
| DropPeer(from_node, channel); |
| } else { |
| io_task_runner_->PostTask( |
| FROM_HERE, |
| base::Bind(&NodeController::OnChannelError, base::Unretained(this), |
| from_node, base::RetainedRef(channel))); |
| } |
| } |
| |
| #if defined(OS_MACOSX) && !defined(OS_IOS) |
| MachPortRelay* NodeController::GetMachPortRelay() { |
| { |
| base::AutoLock lock(inviter_lock_); |
| // Return null if we're not the root. |
| if (bootstrap_inviter_channel_ || inviter_name_ != ports::kInvalidNodeName) |
| return nullptr; |
| } |
| |
| base::AutoLock lock(mach_port_relay_lock_); |
| return mach_port_relay_.get(); |
| } |
| #endif |
| |
| void NodeController::CancelPendingPortMerges() { |
| std::vector<ports::PortRef> ports_to_close; |
| |
| { |
| base::AutoLock lock(pending_port_merges_lock_); |
| reject_pending_merges_ = true; |
| for (const auto& port : pending_port_merges_) |
| ports_to_close.push_back(port.second); |
| pending_port_merges_.clear(); |
| } |
| |
| for (const auto& port : ports_to_close) |
| node_->ClosePort(port); |
| } |
| |
| void NodeController::DestroyOnIOThreadShutdown() { |
| destroy_on_io_thread_shutdown_ = true; |
| } |
| |
| void NodeController::AttemptShutdownIfRequested() { |
| if (!shutdown_callback_flag_) |
| return; |
| |
| base::Closure callback; |
| { |
| base::AutoLock lock(shutdown_lock_); |
| if (shutdown_callback_.is_null()) |
| return; |
| if (!node_->CanShutdownCleanly( |
| ports::Node::ShutdownPolicy::ALLOW_LOCAL_PORTS)) { |
| DVLOG(2) << "Unable to cleanly shut down node " << name_; |
| return; |
| } |
| |
| callback = shutdown_callback_; |
| shutdown_callback_.Reset(); |
| shutdown_callback_flag_.Set(false); |
| } |
| |
| DCHECK(!callback.is_null()); |
| |
| callback.Run(); |
| } |
| |
| NodeController::IsolatedConnection::IsolatedConnection() = default; |
| |
| NodeController::IsolatedConnection::IsolatedConnection( |
| const IsolatedConnection& other) = default; |
| |
| NodeController::IsolatedConnection::IsolatedConnection( |
| IsolatedConnection&& other) = default; |
| |
| NodeController::IsolatedConnection::IsolatedConnection( |
| scoped_refptr<NodeChannel> channel, |
| const ports::PortRef& local_port, |
| base::StringPiece name) |
| : channel(std::move(channel)), local_port(local_port), name(name) {} |
| |
| NodeController::IsolatedConnection::~IsolatedConnection() = default; |
| |
| NodeController::IsolatedConnection& NodeController::IsolatedConnection:: |
| operator=(const IsolatedConnection& other) = default; |
| |
| NodeController::IsolatedConnection& NodeController::IsolatedConnection:: |
| operator=(IsolatedConnection&& other) = default; |
| |
| } // namespace core |
| } // namespace mojo |