blob: e20b8b662d0d75819e1d6166ce7905caabf9eedf [file] [log] [blame]
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require 'forwardable'
require_relative '../grpc'
# GRPC contains the General RPC module.
module GRPC
# The BiDiCall class orchestrates execution of a BiDi stream on a client or
# server.
class BidiCall
include Core::CallOps
include Core::StatusCodes
include Core::TimeConsts
# Creates a BidiCall.
#
# BidiCall should only be created after a call is accepted. That means
# different things on a client and a server. On the client, the call is
# accepted after call.invoke. On the server, this is after call.accept.
#
# #initialize cannot determine if the call is accepted or not; so if a
# call that's not accepted is used here, the error won't be visible until
# the BidiCall#run is called.
#
# deadline is the absolute deadline for the call.
#
# @param call [Call] the call used by the ActiveCall
# @param marshal [Function] f(obj)->string that marshal requests
# @param unmarshal [Function] f(string)->obj that unmarshals responses
# @param metadata_received [true|false] indicates if metadata has already
# been received. Should always be true for server calls
def initialize(call, marshal, unmarshal, metadata_received: false,
req_view: nil)
fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
@call = call
@marshal = marshal
@op_notifier = nil # signals completion on clients
@unmarshal = unmarshal
@metadata_received = metadata_received
@reads_complete = false
@writes_complete = false
@complete = false
@done_mutex = Mutex.new
@req_view = req_view
end
# Begins orchestration of the Bidi stream for a client sending requests.
#
# The method either returns an Enumerator of the responses, or accepts a
# block that can be invoked with each response.
#
# @param requests the Enumerable of requests to send
# @param set_input_stream_done [Proc] called back when we're done
# reading the input stream
# @param set_output_stream_done [Proc] called back when we're done
# sending data on the output stream
# @return an Enumerator of requests to yield
def run_on_client(requests,
set_input_stream_done,
set_output_stream_done,
&blk)
@enq_th = Thread.new do
write_loop(requests, set_output_stream_done: set_output_stream_done)
end
read_loop(set_input_stream_done, &blk)
end
# Begins orchestration of the Bidi stream for a server generating replies.
#
# N.B. gen_each_reply is a func(Enumerable<Requests>)
#
# It takes an enumerable of requests as an arg, in case there is a
# relationship between the stream of requests and the stream of replies.
#
# This does not mean that must necessarily be one. E.g, the replies
# produced by gen_each_reply could ignore the received_msgs
#
# @param [Proc] gen_each_reply generates the BiDi stream replies.
# @param [Enumerable] requests The enumerable of requests to run
def run_on_server(gen_each_reply, requests)
replies = nil
# Pass in the optional call object parameter if possible
if gen_each_reply.arity == 1
replies = gen_each_reply.call(requests)
elsif gen_each_reply.arity == 2
replies = gen_each_reply.call(requests, @req_view)
else
fail 'Illegal arity of reply generator'
end
write_loop(replies, is_client: false)
end
##
# Read the next stream iteration
#
# @param [Proc] finalize_stream callback to call when the reads have been
# completely read through.
# @param [Boolean] is_client If this is a client or server request
#
def read_next_loop(finalize_stream, is_client = false)
read_loop(finalize_stream, is_client: is_client)
end
private
END_OF_READS = :end_of_reads
END_OF_WRITES = :end_of_writes
# performs a read using @call.run_batch, ensures metadata is set up
def read_using_run_batch
ops = { RECV_MESSAGE => nil }
ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
begin
batch_result = @call.run_batch(ops)
unless @metadata_received
@call.metadata = batch_result.metadata
@metadata_received = true
end
batch_result
rescue GRPC::Core::CallError => e
GRPC.logger.warn('bidi call: read_using_run_batch failed')
GRPC.logger.warn(e)
nil
end
end
# set_output_stream_done is relevant on client-side
# rubocop:disable Metrics/PerceivedComplexity
def write_loop(requests, is_client: true, set_output_stream_done: nil)
GRPC::Core.fork_unsafe_begin
GRPC.logger.debug('bidi-write-loop: starting')
count = 0
requests.each do |req|
GRPC.logger.debug("bidi-write-loop: #{count}")
count += 1
payload = @marshal.call(req)
# Fails if status already received
begin
@req_view.send_initial_metadata unless @req_view.nil?
@call.run_batch(SEND_MESSAGE => payload)
rescue GRPC::Core::CallError => e
# This is almost definitely caused by a status arriving while still
# writing. Don't re-throw the error
GRPC.logger.warn('bidi-write-loop: ended with error')
GRPC.logger.warn(e)
break
end
end
GRPC.logger.debug("bidi-write-loop: #{count} writes done")
if is_client
GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
begin
@call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
rescue GRPC::Core::CallError => e
GRPC.logger.warn('bidi-write-loop: send close failed')
GRPC.logger.warn(e)
end
GRPC.logger.debug('bidi-write-loop: done')
end
GRPC.logger.debug('bidi-write-loop: finished')
rescue StandardError => e
GRPC.logger.warn('bidi-write-loop: failed')
GRPC.logger.warn(e)
if is_client
@call.cancel_with_status(GRPC::Core::StatusCodes::UNKNOWN,
"GRPC bidi call error: #{e.inspect}")
else
raise e
end
ensure
GRPC::Core.fork_unsafe_end
set_output_stream_done.call if is_client
end
# rubocop:enable Metrics/PerceivedComplexity
# Provides an enumerator that yields results of remote reads
def read_loop(set_input_stream_done, is_client: true)
return enum_for(:read_loop,
set_input_stream_done,
is_client: is_client) unless block_given?
GRPC.logger.debug('bidi-read-loop: starting')
begin
count = 0
# queue the initial read before beginning the loop
loop do
GRPC.logger.debug("bidi-read-loop: #{count}")
count += 1
batch_result = read_using_run_batch
# handle the next message
if batch_result.nil? || batch_result.message.nil?
GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")
if is_client
batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
@call.status = batch_result.status
@call.trailing_metadata = @call.status.metadata if @call.status
GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
batch_result.check_status
end
GRPC.logger.debug('bidi-read-loop: done reading!')
break
end
res = @unmarshal.call(batch_result.message)
yield res
end
rescue StandardError => e
GRPC.logger.warn('bidi: read-loop failed')
GRPC.logger.warn(e)
raise e
ensure
set_input_stream_done.call
end
GRPC.logger.debug('bidi-read-loop: finished')
# Make sure that the write loop is done before finishing the call.
# Note that blocking is ok at this point because we've already received
# a status
@enq_th.join if is_client
end
end
end