blob: 0c897d519985400ffdab2abdd194157a5b4d200d [file] [log] [blame]
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'forwardable'
require 'weakref'
require_relative 'bidi_call'
class Struct
# BatchResult is the struct returned by calls to call#start_batch.
class BatchResult
# check_status returns the status, raising an error if the status
# is non-nil and not OK.
def check_status
return nil if status.nil?
if status.code != GRPC::Core::StatusCodes::OK
GRPC.logger.debug("Failing with status #{status}")
# raise BadStatus, propagating the metadata if present.
fail GRPC::BadStatus.new_status_exception(
status.code, status.details, status.metadata,
status.debug_error_string)
end
status
end
end
end
# GRPC contains the General RPC module.
module GRPC
# The ActiveCall class provides simple methods for sending marshallable
# data to a call
class ActiveCall # rubocop:disable Metrics/ClassLength
include Core::TimeConsts
include Core::CallOps
extend Forwardable
attr_reader :deadline, :metadata_sent, :metadata_to_send, :peer, :peer_cert
def_delegators :@call, :cancel, :metadata, :write_flag, :write_flag=,
:trailing_metadata, :status
# client_invoke begins a client invocation.
#
# Flow Control note: this blocks until flow control accepts that client
# request can go ahead.
#
# deadline is the absolute deadline for the call.
#
# == Keyword Arguments ==
# any keyword arguments are treated as metadata to be sent to the server
# if a keyword value is a list, multiple metadata for it's key are sent
#
# @param call [Call] a call on which to start and invocation
# @param metadata [Hash] the metadata
def self.client_invoke(call, metadata = {})
fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
call.run_batch(SEND_INITIAL_METADATA => metadata)
end
# Creates an ActiveCall.
#
# ActiveCall should only be created after a call is accepted. That
# means different things on a client and a server. On the client, the
# call is accepted after calling call.invoke. On the server, this is
# after call.accept.
#
# #initialize cannot determine if the call is accepted or not; so if a
# call that's not accepted is used here, the error won't be visible until
# the ActiveCall methods are called.
#
# deadline is the absolute deadline for the call.
#
# @param call [Call] the call used by the ActiveCall
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param deadline [Fixnum] the deadline for the call to complete
# @param started [true|false] indicates that metadata was sent
# @param metadata_received [true|false] indicates if metadata has already
# been received. Should always be true for server calls
def initialize(call, marshal, unmarshal, deadline, started: true,
metadata_received: false, metadata_to_send: nil)
fail(TypeError, '!Core::Call') unless call.is_a? Core::Call
@call = call
@deadline = deadline
@marshal = marshal
@unmarshal = unmarshal
@metadata_received = metadata_received
@metadata_sent = started
@op_notifier = nil
fail(ArgumentError, 'Already sent md') if started && metadata_to_send
@metadata_to_send = metadata_to_send || {} unless started
@send_initial_md_mutex = Mutex.new
@output_stream_done = false
@input_stream_done = false
@call_finished = false
@call_finished_mu = Mutex.new
@client_call_executed = false
@client_call_executed_mu = Mutex.new
# set the peer now so that the accessor can still function
# after the server closes the call
@peer = call.peer
end
# Sends the initial metadata that has yet to be sent.
# Does nothing if metadata has already been sent for this call.
def send_initial_metadata(new_metadata = {})
@send_initial_md_mutex.synchronize do
return if @metadata_sent
@metadata_to_send.merge!(new_metadata)
ActiveCall.client_invoke(@call, @metadata_to_send)
@metadata_sent = true
end
end
# output_metadata are provides access to hash that can be used to
# save metadata to be sent as trailer
def output_metadata
@output_metadata ||= {}
end
# cancelled indicates if the call was cancelled
def cancelled?
!@call.status.nil? && @call.status.code == Core::StatusCodes::CANCELLED
end
# multi_req_view provides a restricted view of this ActiveCall for use
# in a server client-streaming handler.
def multi_req_view
MultiReqView.new(self)
end
# single_req_view provides a restricted view of this ActiveCall for use in
# a server request-response handler.
def single_req_view
SingleReqView.new(self)
end
# operation provides a restricted view of this ActiveCall for use as
# a Operation.
def operation
@op_notifier = Notifier.new
Operation.new(self)
end
##
# Returns a restricted view of this ActiveCall for use in interceptors
#
# @return [InterceptableView]
#
def interceptable
InterceptableView.new(self)
end
def receive_and_check_status
ops = { RECV_STATUS_ON_CLIENT => nil }
ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
batch_result = @call.run_batch(ops)
unless @metadata_received
@call.metadata = batch_result.metadata
@metadata_received = true
end
set_input_stream_done
attach_status_results_and_complete_call(batch_result)
end
def attach_status_results_and_complete_call(recv_status_batch_result)
unless recv_status_batch_result.status.nil?
@call.trailing_metadata = recv_status_batch_result.status.metadata
end
@call.status = recv_status_batch_result.status
# The RECV_STATUS in run_batch always succeeds
# Check the status for a bad status or failed run batch
recv_status_batch_result.check_status
end
# remote_send sends a request to the remote endpoint.
#
# It blocks until the remote endpoint accepts the message.
#
# @param req [Object, String] the object to send or it's marshal form.
# @param marshalled [false, true] indicates if the object is already
# marshalled.
def remote_send(req, marshalled = false)
send_initial_metadata
GRPC.logger.debug("sending #{req}, marshalled? #{marshalled}")
payload = marshalled ? req : @marshal.call(req)
@call.run_batch(SEND_MESSAGE => payload)
end
# send_status sends a status to the remote endpoint.
#
# @param code [int] the status code to send
# @param details [String] details
# @param assert_finished [true, false] when true(default), waits for
# FINISHED.
# @param metadata [Hash] metadata to send to the server. If a value is a
# list, mulitple metadata for its key are sent
def send_status(code = OK, details = '', assert_finished = false,
metadata: {})
send_initial_metadata
ops = {
SEND_STATUS_FROM_SERVER => Struct::Status.new(code, details, metadata)
}
ops[RECV_CLOSE_ON_SERVER] = nil if assert_finished
@call.run_batch(ops)
set_output_stream_done
nil
end
# Intended for use on server-side calls when a single request from
# the client is expected (i.e., unary and server-streaming RPC types).
def read_unary_request
req = remote_read
set_input_stream_done
req
end
def server_unary_response(req, trailing_metadata: {},
code: Core::StatusCodes::OK, details: 'OK')
ops = {}
ops[SEND_MESSAGE] = @marshal.call(req)
ops[SEND_STATUS_FROM_SERVER] = Struct::Status.new(
code, details, trailing_metadata)
ops[RECV_CLOSE_ON_SERVER] = nil
@send_initial_md_mutex.synchronize do
ops[SEND_INITIAL_METADATA] = @metadata_to_send unless @metadata_sent
@metadata_sent = true
end
@call.run_batch(ops)
set_output_stream_done
end
# remote_read reads a response from the remote endpoint.
#
# It blocks until the remote endpoint replies with a message or status.
# On receiving a message, it returns the response after unmarshalling it.
# On receiving a status, it returns nil if the status is OK, otherwise
# raising BadStatus
def remote_read
ops = { RECV_MESSAGE => nil }
ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
batch_result = @call.run_batch(ops)
unless @metadata_received
@call.metadata = batch_result.metadata
@metadata_received = true
end
get_message_from_batch_result(batch_result)
rescue GRPC::Core::CallError => e
GRPC.logger.info("remote_read: #{e}")
nil
end
def get_message_from_batch_result(recv_message_batch_result)
unless recv_message_batch_result.nil? ||
recv_message_batch_result.message.nil?
return @unmarshal.call(recv_message_batch_result.message)
end
GRPC.logger.debug('found nil; the final response has been sent')
nil
end
# each_remote_read passes each response to the given block or returns an
# enumerator the responses if no block is given.
# Used to generate the request enumerable for
# server-side client-streaming RPC's.
#
# == Enumerator ==
#
# * #next blocks until the remote endpoint sends a READ or FINISHED
# * for each read, enumerator#next yields the response
# * on status
# * if it's is OK, enumerator#next raises StopException
# * if is not OK, enumerator#next raises RuntimeException
#
# == Block ==
#
# * if provided it is executed for each response
# * the call blocks until no more responses are provided
#
# @return [Enumerator] if no block was given
def each_remote_read
return enum_for(:each_remote_read) unless block_given?
begin
loop do
resp = remote_read
break if resp.nil? # the last response was received
yield resp
end
ensure
set_input_stream_done
end
end
# each_remote_read_then_finish passes each response to the given block or
# returns an enumerator of the responses if no block is given.
#
# It is like each_remote_read, but it blocks on finishing on detecting
# the final message.
#
# == Enumerator ==
#
# * #next blocks until the remote endpoint sends a READ or FINISHED
# * for each read, enumerator#next yields the response
# * on status
# * if it's is OK, enumerator#next raises StopException
# * if is not OK, enumerator#next raises RuntimeException
#
# == Block ==
#
# * if provided it is executed for each response
# * the call blocks until no more responses are provided
#
# @return [Enumerator] if no block was given
def each_remote_read_then_finish
return enum_for(:each_remote_read_then_finish) unless block_given?
loop do
resp = remote_read
break if resp.nil? # the last response was received
yield resp
end
receive_and_check_status
ensure
set_input_stream_done
end
# request_response sends a request to a GRPC server, and returns the
# response.
#
# @param req [Object] the request sent to the server
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def request_response(req, metadata: {})
raise_error_if_already_executed
ops = {
SEND_MESSAGE => @marshal.call(req),
SEND_CLOSE_FROM_CLIENT => nil,
RECV_INITIAL_METADATA => nil,
RECV_MESSAGE => nil,
RECV_STATUS_ON_CLIENT => nil
}
@send_initial_md_mutex.synchronize do
# Metadata might have already been sent if this is an operation view
unless @metadata_sent
ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
end
@metadata_sent = true
end
begin
batch_result = @call.run_batch(ops)
# no need to check for cancellation after a CallError because this
# batch contains a RECV_STATUS op
ensure
set_input_stream_done
set_output_stream_done
end
@call.metadata = batch_result.metadata
attach_status_results_and_complete_call(batch_result)
get_message_from_batch_result(batch_result)
end
# client_streamer sends a stream of requests to a GRPC server, and
# returns a single response.
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an Enumerable
# that allows dynamic construction of the marshallable objects.
#
# @param requests [Object] an Enumerable of requests to send
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
# @return [Object] the response received from the server
def client_streamer(requests, metadata: {})
raise_error_if_already_executed
begin
send_initial_metadata(metadata)
requests.each { |r| @call.run_batch(SEND_MESSAGE => @marshal.call(r)) }
rescue GRPC::Core::CallError => e
receive_and_check_status # check for Cancelled
raise e
rescue => e
set_input_stream_done
raise e
ensure
set_output_stream_done
end
batch_result = @call.run_batch(
SEND_CLOSE_FROM_CLIENT => nil,
RECV_INITIAL_METADATA => nil,
RECV_MESSAGE => nil,
RECV_STATUS_ON_CLIENT => nil
)
set_input_stream_done
@call.metadata = batch_result.metadata
attach_status_results_and_complete_call(batch_result)
get_message_from_batch_result(batch_result)
end
# server_streamer sends one request to the GRPC server, which yields a
# stream of responses.
#
# responses provides an enumerator over the streamed responses, i.e. it
# follows Ruby's #each iteration protocol. The enumerator blocks while
# waiting for each response, stops when the server signals that no
# further responses will be supplied. If the implicit block is provided,
# it is executed with each response as the argument and no result is
# returned.
#
# @param req [Object] the request sent to the server
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
# @return [Enumerator|nil] a response Enumerator
def server_streamer(req, metadata: {})
raise_error_if_already_executed
ops = {
SEND_MESSAGE => @marshal.call(req),
SEND_CLOSE_FROM_CLIENT => nil
}
@send_initial_md_mutex.synchronize do
# Metadata might have already been sent if this is an operation view
unless @metadata_sent
ops[SEND_INITIAL_METADATA] = @metadata_to_send.merge!(metadata)
end
@metadata_sent = true
end
begin
@call.run_batch(ops)
rescue GRPC::Core::CallError => e
receive_and_check_status # checks for Cancelled
raise e
rescue => e
set_input_stream_done
raise e
ensure
set_output_stream_done
end
replies = enum_for(:each_remote_read_then_finish)
return replies unless block_given?
replies.each { |r| yield r }
end
# bidi_streamer sends a stream of requests to the GRPC server, and yields
# a stream of responses.
#
# This method takes an Enumerable of requests, and returns and enumerable
# of responses.
#
# == requests ==
#
# requests provides an 'iterable' of Requests. I.e. it follows Ruby's
# #each enumeration protocol. In the simplest case, requests will be an
# array of marshallable objects; in typical case it will be an
# Enumerable that allows dynamic construction of the marshallable
# objects.
#
# == responses ==
#
# This is an enumerator of responses. I.e, its #next method blocks
# waiting for the next response. Also, if at any point the block needs
# to consume all the remaining responses, this can be done using #each or
# #collect. Calling #each or #collect should only be done if
# the_call#writes_done has been called, otherwise the block will loop
# forever.
#
# @param requests [Object] an Enumerable of requests to send
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
# @return [Enumerator, nil] a response Enumerator
def bidi_streamer(requests, metadata: {}, &blk)
raise_error_if_already_executed
# Metadata might have already been sent if this is an operation view
begin
send_initial_metadata(metadata)
rescue GRPC::Core::CallError => e
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
set_input_stream_done
set_output_stream_done
attach_status_results_and_complete_call(batch_result)
raise e
rescue => e
set_input_stream_done
set_output_stream_done
raise e
end
bd = BidiCall.new(@call,
@marshal,
@unmarshal,
metadata_received: @metadata_received)
bd.run_on_client(requests,
proc { set_input_stream_done },
proc { set_output_stream_done },
&blk)
end
# run_server_bidi orchestrates a BiDi stream processing on a server.
#
# N.B. gen_each_reply is a func(Enumerable<Requests>)
#
# It takes an enumerable of requests as an arg, in case there is a
# relationship between the stream of requests and the stream of replies.
#
# This does not mean that must necessarily be one. E.g, the replies
# produced by gen_each_reply could ignore the received_msgs
#
# @param mth [Proc] generates the BiDi stream replies
# @param interception_ctx [InterceptionContext]
#
def run_server_bidi(mth, interception_ctx)
view = multi_req_view
bidi_call = BidiCall.new(
@call,
@marshal,
@unmarshal,
metadata_received: @metadata_received,
req_view: view
)
requests = bidi_call.read_next_loop(proc { set_input_stream_done }, false)
interception_ctx.intercept!(
:bidi_streamer,
call: view,
method: mth,
requests: requests
) do
bidi_call.run_on_server(mth, requests)
end
end
# Waits till an operation completes
def wait
return if @op_notifier.nil?
GRPC.logger.debug("active_call.wait: on #{@op_notifier}")
@op_notifier.wait
end
# Signals that an operation is done.
# Only relevant on the client-side (this is a no-op on the server-side)
def op_is_done
return if @op_notifier.nil?
@op_notifier.notify(self)
end
# Add to the metadata that will be sent from the server.
# Fails if metadata has already been sent.
# Unused by client calls.
def merge_metadata_to_send(new_metadata = {})
@send_initial_md_mutex.synchronize do
fail('cant change metadata after already sent') if @metadata_sent
@metadata_to_send.merge!(new_metadata)
end
end
def attach_peer_cert(peer_cert)
@peer_cert = peer_cert
end
private
# To be called once the "input stream" has been completelly
# read through (i.e, done reading from client or received status)
# note this is idempotent
def set_input_stream_done
@call_finished_mu.synchronize do
@input_stream_done = true
maybe_finish_and_close_call_locked
end
end
# To be called once the "output stream" has been completelly
# sent through (i.e, done sending from client or sent status)
# note this is idempotent
def set_output_stream_done
@call_finished_mu.synchronize do
@output_stream_done = true
maybe_finish_and_close_call_locked
end
end
def maybe_finish_and_close_call_locked
return unless @output_stream_done && @input_stream_done
return if @call_finished
@call_finished = true
op_is_done
@call.close
end
# Starts the call if not already started
# @param metadata [Hash] metadata to be sent to the server. If a value is
# a list, multiple metadata for its key are sent
def start_call(metadata = {})
merge_metadata_to_send(metadata) && send_initial_metadata
end
def raise_error_if_already_executed
@client_call_executed_mu.synchronize do
if @client_call_executed
fail GRPC::Core::CallError, 'attempting to re-run a call'
end
@client_call_executed = true
end
end
def self.view_class(*visible_methods)
Class.new do
extend ::Forwardable
def_delegators :@wrapped, *visible_methods
# @param wrapped [ActiveCall] the call whose methods are shielded
def initialize(wrapped)
@wrapped = wrapped
end
end
end
# SingleReqView limits access to an ActiveCall's methods for use in server
# handlers that receive just one request.
SingleReqView = view_class(:cancelled?, :deadline, :metadata,
:output_metadata, :peer, :peer_cert,
:send_initial_metadata,
:metadata_to_send,
:merge_metadata_to_send,
:metadata_sent)
# MultiReqView limits access to an ActiveCall's methods for use in
# server client_streamer handlers.
MultiReqView = view_class(:cancelled?, :deadline,
:each_remote_read, :metadata, :output_metadata,
:peer, :peer_cert,
:send_initial_metadata,
:metadata_to_send,
:merge_metadata_to_send,
:metadata_sent)
# Operation limits access to an ActiveCall's methods for use as
# a Operation on the client.
Operation = view_class(:cancel, :cancelled?, :deadline, :execute,
:metadata, :status, :start_call, :wait, :write_flag,
:write_flag=, :trailing_metadata)
# InterceptableView further limits access to an ActiveCall's methods
# for use in interceptors on the client, exposing only the deadline
InterceptableView = view_class(:deadline)
end
end