|  | #!/usr/bin/env ruby | 
|  |  | 
|  | # Copyright 2016 gRPC authors. | 
|  | # | 
|  | # Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | # you may not use this file except in compliance with the License. | 
|  | # You may obtain a copy of the License at | 
|  | # | 
|  | #     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, software | 
|  | # distributed under the License is distributed on an "AS IS" BASIS, | 
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | # See the License for the specific language governing permissions and | 
|  | # limitations under the License. | 
|  |  | 
|  | require 'optparse' | 
|  | require 'thread' | 
|  | require_relative '../pb/test/client' | 
|  | require_relative './metrics_server' | 
|  | require_relative '../lib/grpc' | 
|  |  | 
|  | class QpsGauge < Gauge | 
|  | @query_count | 
|  | @query_mutex | 
|  | @start_time | 
|  |  | 
|  | def initialize | 
|  | @query_count = 0 | 
|  | @query_mutex = Mutex.new | 
|  | @start_time = Time.now | 
|  | end | 
|  |  | 
|  | def increment_queries | 
|  | @query_mutex.synchronize { @query_count += 1} | 
|  | end | 
|  |  | 
|  | def get_name | 
|  | 'qps' | 
|  | end | 
|  |  | 
|  | def get_type | 
|  | 'long' | 
|  | end | 
|  |  | 
|  | def get_value | 
|  | (@query_mutex.synchronize { @query_count / (Time.now - @start_time) }).to_i | 
|  | end | 
|  | end | 
|  |  | 
|  | def start_metrics_server(port) | 
|  | host = "0.0.0.0:#{port}" | 
|  | server = GRPC::RpcServer.new | 
|  | server.add_http2_port(host, :this_port_is_insecure) | 
|  | service = MetricsServiceImpl.new | 
|  | server.handle(service) | 
|  | server_thread = Thread.new { server.run_till_terminated } | 
|  | [server, service, server_thread] | 
|  | end | 
|  |  | 
|  | StressArgs = Struct.new(:server_addresses, :test_cases, :duration, | 
|  | :channels_per_server, :concurrent_calls, :metrics_port) | 
|  |  | 
|  | def start(stress_args) | 
|  | running = true | 
|  | threads = [] | 
|  | qps_gauge = QpsGauge.new | 
|  | metrics_server, metrics_service, metrics_thread = | 
|  | start_metrics_server(stress_args.metrics_port) | 
|  | metrics_service.register_gauge(qps_gauge) | 
|  | stress_args.server_addresses.each do |address| | 
|  | stress_args.channels_per_server.times do | 
|  | client_args = Args.new | 
|  | client_args.host, client_args.port = address.split(':') | 
|  | client_args.secure = false | 
|  | client_args.test_case = '' | 
|  | stub = create_stub(client_args) | 
|  | named_tests = NamedTests.new(stub, client_args) | 
|  | stress_args.concurrent_calls.times do | 
|  | threads << Thread.new do | 
|  | while running | 
|  | named_tests.method(stress_args.test_cases.sample).call | 
|  | qps_gauge.increment_queries | 
|  | end | 
|  | end | 
|  | end | 
|  | end | 
|  | end | 
|  | if stress_args.duration >= 0 | 
|  | sleep stress_args.duration | 
|  | running = false | 
|  | metrics_server.stop | 
|  | p "QPS: #{qps_gauge.get_value}" | 
|  | threads.each { |thd| thd.join; } | 
|  | end | 
|  | metrics_thread.join | 
|  | end | 
|  |  | 
|  | def parse_stress_args | 
|  | stress_args = StressArgs.new | 
|  | stress_args.server_addresses = ['localhost:8080'] | 
|  | stress_args.test_cases = [] | 
|  | stress_args.duration = -1 | 
|  | stress_args.channels_per_server = 1 | 
|  | stress_args.concurrent_calls = 1 | 
|  | stress_args.metrics_port = '8081' | 
|  | OptionParser.new do |opts| | 
|  | opts.on('--server_addresses [LIST]', Array) do |addrs| | 
|  | stress_args.server_addresses = addrs | 
|  | end | 
|  | opts.on('--test_cases cases', Array) do |cases| | 
|  | stress_args.test_cases = (cases.map do |item| | 
|  | split = item.split(':') | 
|  | [split[0]] * split[1].to_i | 
|  | end).reduce([], :+) | 
|  | end | 
|  | opts.on('--test_duration_secs [INT]', OptionParser::DecimalInteger) do |time| | 
|  | stress_args.duration = time | 
|  | end | 
|  | opts.on('--num_channels_per_server [INT]', OptionParser::DecimalInteger) do |channels| | 
|  | stress_args.channels_per_server = channels | 
|  | end | 
|  | opts.on('--num_stubs_per_channel [INT]', OptionParser::DecimalInteger) do |stubs| | 
|  | stress_args.concurrent_calls = stubs | 
|  | end | 
|  | opts.on('--metrics_port [port]') do |port| | 
|  | stress_args.metrics_port = port | 
|  | end | 
|  | end.parse! | 
|  | stress_args | 
|  | end | 
|  |  | 
|  | def main | 
|  | opts = parse_stress_args | 
|  | start(opts) | 
|  | end | 
|  |  | 
|  | if __FILE__ == $0 | 
|  | main | 
|  | end |