# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'forwardable'
require_relative '../grpc'

# GRPC contains the General RPC module.
module GRPC
  # The BidiCall class orchestrates execution of a BiDi stream on a client or
  # server.
  #
  # Writes go through #write_loop and reads through #read_loop. On a client,
  # #run_on_client runs the write loop on a background thread while the caller
  # consumes the read loop; on a server, #run_on_server runs the write loop on
  # the calling thread.
  class BidiCall
    include Core::CallOps
    include Core::StatusCodes
    include Core::TimeConsts

    # Creates a BidiCall.
    #
    # BidiCall should only be created after a call is accepted.  That means
    # different things on a client and a server.  On the client, the call is
    # accepted after call.invoke.  On the server, this is after call.accept.
    #
    # #initialize cannot determine if the call is accepted or not; so if a
    # call that's not accepted is used here, the error won't be visible until
    # #run_on_client or #run_on_server is called.
    #
    # @param call [Core::Call] the call used by the BidiCall
    # @param marshal [Function] f(obj)->string that marshal requests
    # @param unmarshal [Function] f(string)->obj that unmarshals responses
    # @param metadata_received [true|false] indicates if metadata has already
    #    been received. Should always be true for server calls
    # @param req_view [Object, nil] optional object that, when present, is
    #    asked to send_initial_metadata before every write and is handed to
    #    two-argument reply generators in #run_on_server
    # @raise [ArgumentError] if call is not a Core::Call
    def initialize(call, marshal, unmarshal, metadata_received: false,
                   req_view: nil)
      fail(ArgumentError, 'not a call') unless call.is_a? Core::Call
      @call = call
      @marshal = marshal
      @op_notifier = nil # signals completion on clients
      @unmarshal = unmarshal
      @metadata_received = metadata_received
      # NOTE(review): the next four variables are assigned here but are not
      # read anywhere else in this class.
      @reads_complete = false
      @writes_complete = false
      @complete = false
      @done_mutex = Mutex.new
      @req_view = req_view
      # Writer thread; only set by #run_on_client.
      @enq_th = nil
    end

    # Begins orchestration of the Bidi stream for a client sending requests.
    #
    # The requests are written from a newly started background thread. The
    # method either returns an Enumerator of the responses, or accepts a
    # block that is invoked with each response.
    #
    # @param requests the Enumerable of requests to send
    # @param set_input_stream_done [Proc] called back when we're done
    #   reading the input stream
    # @param set_output_stream_done [Proc] called back when we're done
    #   sending data on the output stream
    # @return [Enumerator] an Enumerator of responses when no block is given
    def run_on_client(requests,
                      set_input_stream_done,
                      set_output_stream_done,
                      &blk)
      @enq_th = Thread.new do
        write_loop(requests, set_output_stream_done: set_output_stream_done)
      end
      read_loop(set_input_stream_done, &blk)
    end

    # Begins orchestration of the Bidi stream for a server generating replies.
    #
    # N.B. gen_each_reply is a func(Enumerable<Requests>)
    #
    # It takes an enumerable of requests as an arg, in case there is a
    # relationship between the stream of requests and the stream of replies.
    #
    # This does not mean that there must necessarily be one.  E.g, the
    # replies produced by gen_each_reply could ignore the received_msgs
    #
    # @param [Proc] gen_each_reply generates the BiDi stream replies; it must
    #   report an arity of exactly 1 (requests) or 2 (requests, req_view)
    # @param [Enumerable] requests The enumerable of requests to run
    # @raise [RuntimeError] if gen_each_reply has any other arity
    def run_on_server(gen_each_reply, requests)
      # Pass in the optional call object parameter if possible
      replies =
        case gen_each_reply.arity
        when 1 then gen_each_reply.call(requests)
        when 2 then gen_each_reply.call(requests, @req_view)
        else fail 'Illegal arity of reply generator'
        end

      write_loop(replies, is_client: false)
    end

    ##
    # Read the next stream iteration
    #
    # No block is forwarded, so this returns the Enumerator form of the read
    # loop; nothing is read until that Enumerator is iterated.
    #
    # @param [Proc] finalize_stream callback to call when the reads have been
    #   completely read through.
    # @param [Boolean] is_client If this is a client or server request
    # @return [Enumerator] an Enumerator over the unmarshalled messages
    #
    def read_next_loop(finalize_stream, is_client = false)
      read_loop(finalize_stream, is_client: is_client)
    end

    private

    END_OF_READS = :end_of_reads
    END_OF_WRITES = :end_of_writes

    # performs a read using @call.run_batch, ensures metadata is set up
    #
    # The first read also requests the initial metadata unless it was already
    # received, and stores it on the call.
    #
    # @return the batch result, or nil if run_batch raised a Core::CallError
    #   (the error is logged, not re-raised)
    def read_using_run_batch
      ops = { RECV_MESSAGE => nil }
      ops[RECV_INITIAL_METADATA] = nil unless @metadata_received
      begin
        batch_result = @call.run_batch(ops)
        unless @metadata_received
          @call.metadata = batch_result.metadata
          @metadata_received = true
        end
        batch_result
      rescue GRPC::Core::CallError => e
        GRPC.logger.warn('bidi call: read_using_run_batch failed')
        GRPC.logger.warn(e)
        nil
      end
    end

    # Marshals and sends every item of requests, then (on clients) sends the
    # close-from-client op.
    #
    # A Core::CallError while sending stops the loop without raising. Any
    # other StandardError cancels the call on a client and is re-raised on a
    # server.
    #
    # @param requests [Enumerable] the messages to send
    # @param is_client [true|false] whether this side is the client
    # @param set_output_stream_done [Proc, nil] called once writing has
    #   finished, whatever the outcome; only used on the client side and
    #   skipped when nil
    # rubocop:disable Metrics/PerceivedComplexity
    def write_loop(requests, is_client: true, set_output_stream_done: nil)
      GRPC::Core.fork_unsafe_begin
      GRPC.logger.debug('bidi-write-loop: starting')
      count = 0
      requests.each do |req|
        GRPC.logger.debug("bidi-write-loop: #{count}")
        count += 1
        payload = @marshal.call(req)
        # Fails if status already received
        begin
          @req_view.send_initial_metadata unless @req_view.nil?
          @call.run_batch(SEND_MESSAGE => payload)
        rescue GRPC::Core::CallError => e
          # This is almost definitely caused by a status arriving while still
          # writing. Don't re-throw the error
          GRPC.logger.warn('bidi-write-loop: ended with error')
          GRPC.logger.warn(e)
          break
        end
      end
      GRPC.logger.debug("bidi-write-loop: #{count} writes done")
      if is_client
        GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting")
        begin
          @call.run_batch(SEND_CLOSE_FROM_CLIENT => nil)
        rescue GRPC::Core::CallError => e
          GRPC.logger.warn('bidi-write-loop: send close failed')
          GRPC.logger.warn(e)
        end
        GRPC.logger.debug('bidi-write-loop: done')
      end
      GRPC.logger.debug('bidi-write-loop: finished')
    rescue StandardError => e
      GRPC.logger.warn('bidi-write-loop: failed')
      GRPC.logger.warn(e)
      if is_client
        @call.cancel_with_status(GRPC::Core::StatusCodes::UNKNOWN,
                                 "GRPC bidi call error: #{e.inspect}")
      else
        raise e
      end
    ensure
      GRPC::Core.fork_unsafe_end
      # The callback defaults to nil; calling it unguarded would raise from
      # this ensure clause and hide the real outcome of the write loop.
      set_output_stream_done.call if is_client && set_output_stream_done
    end
    # rubocop:enable Metrics/PerceivedComplexity

    # Provides an enumerator that yields results of remote reads
    #
    # With a block, reads until a nil batch or nil message is seen and yields
    # each unmarshalled message. Without a block, returns an Enumerator that
    # does the same when iterated.
    #
    # @param set_input_stream_done [Proc] always called once reading stops,
    #   including when an error is raised
    # @param is_client [true|false] on a client the final status is fetched,
    #   stored on the call and checked before the loop ends
    # @raise [StandardError] any error raised while reading, unmarshalling,
    #   checking the status or running the block is logged and re-raised
    def read_loop(set_input_stream_done, is_client: true)
      unless block_given?
        return enum_for(:read_loop,
                        set_input_stream_done,
                        is_client: is_client)
      end
      GRPC.logger.debug('bidi-read-loop: starting')
      begin
        count = 0
        # each iteration performs one blocking read
        loop do
          GRPC.logger.debug("bidi-read-loop: #{count}")
          count += 1
          batch_result = read_using_run_batch

          # handle the next message
          if batch_result.nil? || batch_result.message.nil?
            GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}")

            if is_client
              batch_result = @call.run_batch(RECV_STATUS_ON_CLIENT => nil)
              @call.status = batch_result.status
              @call.trailing_metadata = @call.status.metadata if @call.status
              GRPC.logger.debug("bidi-read-loop: done status #{@call.status}")
              batch_result.check_status
            end

            GRPC.logger.debug('bidi-read-loop: done reading!')
            break
          end

          res = @unmarshal.call(batch_result.message)
          yield res
        end
      rescue StandardError => e
        GRPC.logger.warn('bidi: read-loop failed')
        GRPC.logger.warn(e)
        raise e
      ensure
        set_input_stream_done.call
      end
      GRPC.logger.debug('bidi-read-loop: finished')
      # Make sure that the write loop is done before finishing the call.
      # Note that blocking is ok at this point because we've already received
      # a status. There is no writer thread unless #run_on_client started one.
      @enq_th.join if is_client && @enq_th
    end
  end
end