|  | #region Copyright notice and license | 
|  |  | 
|  | // Copyright 2015-2016 gRPC authors. | 
|  | // | 
|  | // Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | // you may not use this file except in compliance with the License. | 
|  | // You may obtain a copy of the License at | 
|  | // | 
|  | //     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | // | 
|  | // Unless required by applicable law or agreed to in writing, software | 
|  | // distributed under the License is distributed on an "AS IS" BASIS, | 
|  | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | // See the License for the specific language governing permissions and | 
|  | // limitations under the License. | 
|  |  | 
|  | #endregion | 
|  |  | 
|  | using System; | 
|  | using System.Collections.Generic; | 
|  | using System.Diagnostics; | 
|  | using System.Linq; | 
|  | using System.Threading; | 
|  | using System.Threading.Tasks; | 
|  |  | 
|  | using CommandLine; | 
|  | using CommandLine.Text; | 
|  | using Grpc.Core; | 
|  | using Grpc.Core.Logging; | 
|  | using Grpc.Core.Utils; | 
|  | using Grpc.Testing; | 
|  |  | 
|  | namespace Grpc.IntegrationTesting | 
|  | { | 
|  | public class StressTestClient | 
|  | { | 
|  | static readonly ILogger Logger = GrpcEnvironment.Logger.ForType<StressTestClient>(); | 
|  | const double SecondsToNanos = 1e9; | 
|  |  | 
|  | private class ClientOptions | 
|  | { | 
|  | [Option("server_addresses", Default = "localhost:8080")] | 
|  | public string ServerAddresses { get; set; } | 
|  |  | 
|  | [Option("test_cases", Default = "large_unary:100")] | 
|  | public string TestCases { get; set; } | 
|  |  | 
|  | [Option("test_duration_secs", Default = -1)] | 
|  | public int TestDurationSecs { get; set; } | 
|  |  | 
|  | [Option("num_channels_per_server", Default = 1)] | 
|  | public int NumChannelsPerServer { get; set; } | 
|  |  | 
|  | [Option("num_stubs_per_channel", Default = 1)] | 
|  | public int NumStubsPerChannel { get; set; } | 
|  |  | 
|  | [Option("metrics_port", Default = 8081)] | 
|  | public int MetricsPort { get; set; } | 
|  | } | 
|  |  | 
|  | ClientOptions options; | 
|  | List<string> serverAddresses; | 
|  | Dictionary<string, int> weightedTestCases; | 
|  | WeightedRandomGenerator testCaseGenerator; | 
|  |  | 
|  | // cancellation will be emitted once test_duration_secs has elapsed. | 
|  | CancellationTokenSource finishedTokenSource = new CancellationTokenSource(); | 
|  | Histogram histogram = new Histogram(0.01, 60 * SecondsToNanos); | 
|  |  | 
|  | private StressTestClient(ClientOptions options, List<string> serverAddresses, Dictionary<string, int> weightedTestCases) | 
|  | { | 
|  | this.options = options; | 
|  | this.serverAddresses = serverAddresses; | 
|  | this.weightedTestCases = weightedTestCases; | 
|  | this.testCaseGenerator = new WeightedRandomGenerator(this.weightedTestCases); | 
|  | } | 
|  |  | 
|  | public static void Run(string[] args) | 
|  | { | 
|  | GrpcEnvironment.SetLogger(new ConsoleLogger()); | 
|  | var parserResult = Parser.Default.ParseArguments<ClientOptions>(args) | 
|  | .WithNotParsed((x) => Environment.Exit(1)) | 
|  | .WithParsed(options => { | 
|  | GrpcPreconditions.CheckArgument(options.NumChannelsPerServer > 0); | 
|  | GrpcPreconditions.CheckArgument(options.NumStubsPerChannel > 0); | 
|  |  | 
|  | var serverAddresses = options.ServerAddresses.Split(','); | 
|  | GrpcPreconditions.CheckArgument(serverAddresses.Length > 0, "You need to provide at least one server address"); | 
|  |  | 
|  | var testCases = ParseWeightedTestCases(options.TestCases); | 
|  | GrpcPreconditions.CheckArgument(testCases.Count > 0, "You need to provide at least one test case"); | 
|  |  | 
|  | var interopClient = new StressTestClient(options, serverAddresses.ToList(), testCases); | 
|  | interopClient.Run().Wait(); | 
|  | }); | 
|  | } | 
|  |  | 
|  | async Task Run() | 
|  | { | 
|  | var metricsServer = new Server() | 
|  | { | 
|  | Services = { MetricsService.BindService(new MetricsServiceImpl(histogram)) }, | 
|  | Ports = { { "[::]", options.MetricsPort, ServerCredentials.Insecure } } | 
|  | }; | 
|  | metricsServer.Start(); | 
|  |  | 
|  | if (options.TestDurationSecs >= 0) | 
|  | { | 
|  | finishedTokenSource.CancelAfter(TimeSpan.FromSeconds(options.TestDurationSecs)); | 
|  | } | 
|  |  | 
|  | var tasks = new List<Task>(); | 
|  | var channels = new List<Channel>(); | 
|  | foreach (var serverAddress in serverAddresses) | 
|  | { | 
|  | for (int i = 0; i < options.NumChannelsPerServer; i++) | 
|  | { | 
|  | var channel = new Channel(serverAddress, ChannelCredentials.Insecure); | 
|  | channels.Add(channel); | 
|  | for (int j = 0; j < options.NumStubsPerChannel; j++) | 
|  | { | 
|  | var client = new TestService.TestServiceClient(channel); | 
|  | var task = Task.Factory.StartNew(() => RunBodyAsync(client).GetAwaiter().GetResult(), | 
|  | TaskCreationOptions.LongRunning); | 
|  | tasks.Add(task); | 
|  | } | 
|  | } | 
|  | } | 
|  | await Task.WhenAll(tasks); | 
|  |  | 
|  | foreach (var channel in channels) | 
|  | { | 
|  | await channel.ShutdownAsync(); | 
|  | } | 
|  |  | 
|  | await metricsServer.ShutdownAsync(); | 
|  | } | 
|  |  | 
|  | async Task RunBodyAsync(TestService.TestServiceClient client) | 
|  | { | 
|  | Logger.Info("Starting stress test client thread."); | 
|  | while (!finishedTokenSource.Token.IsCancellationRequested) | 
|  | { | 
|  | var testCase = testCaseGenerator.GetNext(); | 
|  |  | 
|  | var stopwatch = Stopwatch.StartNew(); | 
|  |  | 
|  | await RunTestCaseAsync(client, testCase); | 
|  |  | 
|  | stopwatch.Stop(); | 
|  | histogram.AddObservation(stopwatch.Elapsed.TotalSeconds * SecondsToNanos); | 
|  | } | 
|  | Logger.Info("Stress test client thread finished."); | 
|  | } | 
|  |  | 
|  | async Task RunTestCaseAsync(TestService.TestServiceClient client, string testCase) | 
|  | { | 
|  | switch (testCase) | 
|  | { | 
|  | case "empty_unary": | 
|  | InteropClient.RunEmptyUnary(client); | 
|  | break; | 
|  | case "large_unary": | 
|  | InteropClient.RunLargeUnary(client); | 
|  | break; | 
|  | case "client_streaming": | 
|  | await InteropClient.RunClientStreamingAsync(client); | 
|  | break; | 
|  | case "server_streaming": | 
|  | await InteropClient.RunServerStreamingAsync(client); | 
|  | break; | 
|  | case "ping_pong": | 
|  | await InteropClient.RunPingPongAsync(client); | 
|  | break; | 
|  | case "empty_stream": | 
|  | await InteropClient.RunEmptyStreamAsync(client); | 
|  | break; | 
|  | case "cancel_after_begin": | 
|  | await InteropClient.RunCancelAfterBeginAsync(client); | 
|  | break; | 
|  | case "cancel_after_first_response": | 
|  | await InteropClient.RunCancelAfterFirstResponseAsync(client); | 
|  | break; | 
|  | case "timeout_on_sleeping_server": | 
|  | await InteropClient.RunTimeoutOnSleepingServerAsync(client); | 
|  | break; | 
|  | case "custom_metadata": | 
|  | await InteropClient.RunCustomMetadataAsync(client); | 
|  | break; | 
|  | case "status_code_and_message": | 
|  | await InteropClient.RunStatusCodeAndMessageAsync(client); | 
|  | break; | 
|  | default: | 
|  | throw new ArgumentException("Unsupported test case  " + testCase); | 
|  | } | 
|  | } | 
|  |  | 
|  | static Dictionary<string, int> ParseWeightedTestCases(string weightedTestCases) | 
|  | { | 
|  | var result = new Dictionary<string, int>(); | 
|  | foreach (var weightedTestCase in weightedTestCases.Split(',')) | 
|  | { | 
|  | var parts = weightedTestCase.Split(new char[] {':'}, 2); | 
|  | GrpcPreconditions.CheckArgument(parts.Length == 2, "Malformed test_cases option."); | 
|  | result.Add(parts[0], int.Parse(parts[1])); | 
|  | } | 
|  | return result; | 
|  | } | 
|  |  | 
|  | class WeightedRandomGenerator | 
|  | { | 
|  | readonly Random random = new Random(); | 
|  | readonly List<Tuple<int, string>> cumulativeSums; | 
|  | readonly int weightSum; | 
|  |  | 
|  | public WeightedRandomGenerator(Dictionary<string, int> weightedItems) | 
|  | { | 
|  | cumulativeSums = new List<Tuple<int, string>>(); | 
|  | weightSum = 0; | 
|  | foreach (var entry in weightedItems) | 
|  | { | 
|  | weightSum += entry.Value; | 
|  | cumulativeSums.Add(Tuple.Create(weightSum, entry.Key)); | 
|  | } | 
|  | } | 
|  |  | 
|  | public string GetNext() | 
|  | { | 
|  | int rand = random.Next(weightSum); | 
|  | foreach (var entry in cumulativeSums) | 
|  | { | 
|  | if (rand < entry.Item1) | 
|  | { | 
|  | return entry.Item2; | 
|  | } | 
|  | } | 
|  | throw new InvalidOperationException("GetNext() failed."); | 
|  | } | 
|  | } | 
|  |  | 
|  | class MetricsServiceImpl : MetricsService.MetricsServiceBase | 
|  | { | 
|  | const string GaugeName = "csharp_overall_qps"; | 
|  |  | 
|  | readonly Histogram histogram; | 
|  | readonly WallClockStopwatch wallClockStopwatch = new WallClockStopwatch(); | 
|  |  | 
|  | public MetricsServiceImpl(Histogram histogram) | 
|  | { | 
|  | this.histogram = histogram; | 
|  | } | 
|  |  | 
|  | public override Task<GaugeResponse> GetGauge(GaugeRequest request, ServerCallContext context) | 
|  | { | 
|  | if (request.Name == GaugeName) | 
|  | { | 
|  | long qps = GetQpsAndReset(); | 
|  |  | 
|  | return Task.FromResult(new GaugeResponse | 
|  | { | 
|  | Name = GaugeName, | 
|  | LongValue = qps | 
|  | }); | 
|  | } | 
|  | throw new RpcException(new Status(StatusCode.InvalidArgument, "Gauge does not exist")); | 
|  | } | 
|  |  | 
|  | public override async Task GetAllGauges(EmptyMessage request, IServerStreamWriter<GaugeResponse> responseStream, ServerCallContext context) | 
|  | { | 
|  | long qps = GetQpsAndReset(); | 
|  |  | 
|  | var response = new GaugeResponse | 
|  | { | 
|  | Name = GaugeName, | 
|  | LongValue = qps | 
|  | }; | 
|  | await responseStream.WriteAsync(response); | 
|  | } | 
|  |  | 
|  | long GetQpsAndReset() | 
|  | { | 
|  | var snapshot = histogram.GetSnapshot(true); | 
|  | var elapsedSnapshot = wallClockStopwatch.GetElapsedSnapshot(true); | 
|  |  | 
|  | return (long) (snapshot.Count / elapsedSnapshot.TotalSeconds); | 
|  | } | 
|  | } | 
|  | } | 
|  | } |