blob: 2e6ddf69dd8361e9c0d7341a7886d5c6f0ab00a6 [file] [log] [blame]
/*
*
* Copyright 2015 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#import "InteropTests.h"
#include <grpc/status.h>
#import <GRPCClient/GRPCCall+ChannelArg.h>
#import <GRPCClient/GRPCCall+Cronet.h>
#import <GRPCClient/GRPCCall+Interceptor.h>
#import <GRPCClient/GRPCCall+Tests.h>
#import <GRPCClient/GRPCInterceptor.h>
#import <GRPCClient/internal_testing/GRPCCall+InternalTests.h>
#import <ProtoRPC/ProtoRPC.h>
#import <RxLibrary/GRXBufferedPipe.h>
#import <RxLibrary/GRXWriter+Immediate.h>
#import <grpc/grpc.h>
#import <grpc/support/log.h>
#import "src/objective-c/tests/RemoteTestClient/Messages.pbobjc.h"
#import "src/objective-c/tests/RemoteTestClient/Test.pbobjc.h"
#import "src/objective-c/tests/RemoteTestClient/Test.pbrpc.h"
#import "../Common/TestUtils.h"
#import "InteropTestsBlockCallbacks.h"
#define SMALL_PAYLOAD_SIZE 10
#define LARGE_REQUEST_PAYLOAD_SIZE 271828
#define LARGE_RESPONSE_PAYLOAD_SIZE 314159
static const int kTestRetries = 3;
extern const char *kCFStreamVarName;
@interface RMTStreamingOutputCallRequest (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize
requestedResponseSize:(NSNumber *)responseSize;
@end
@implementation RMTStreamingOutputCallRequest (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize
requestedResponseSize:(NSNumber *)responseSize {
RMTStreamingOutputCallRequest *request = [self message];
RMTResponseParameters *parameters = [RMTResponseParameters message];
parameters.size = responseSize.intValue;
[request.responseParametersArray addObject:parameters];
request.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue];
return request;
}
@end
@interface RMTStreamingOutputCallResponse (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize;
@end
@implementation RMTStreamingOutputCallResponse (Constructors)
+ (instancetype)messageWithPayloadSize:(NSNumber *)payloadSize {
RMTStreamingOutputCallResponse *response = [self message];
response.payload.type = RMTPayloadType_Compressable;
response.payload.body = [NSMutableData dataWithLength:payloadSize.unsignedIntegerValue];
return response;
}
@end
BOOL isRemoteInteropTest(NSString *host) {
return [host isEqualToString:@"grpc-test.sandbox.googleapis.com"];
}
@interface DefaultInterceptorFactory : NSObject <GRPCInterceptorFactory>
- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager;
@end
@implementation DefaultInterceptorFactory
- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager {
dispatch_queue_t queue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
return [[GRPCInterceptor alloc] initWithInterceptorManager:interceptorManager
dispatchQueue:queue];
}
@end
@interface HookInterceptorFactory : NSObject <GRPCInterceptorFactory>
- (instancetype)
initWithDispatchQueue:(dispatch_queue_t)dispatchQueue
startHook:(void (^)(GRPCRequestOptions *requestOptions,
GRPCCallOptions *callOptions,
GRPCInterceptorManager *manager))startHook
writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
GRPCInterceptorManager *manager))receiveNextMessagesHook
responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
GRPCInterceptorManager *manager))responseHeaderHook
responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
GRPCInterceptorManager *manager))responseCloseHook
didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook;
- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager;
@end
@interface HookInterceptor : GRPCInterceptor
- (instancetype)
initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager
dispatchQueue:(dispatch_queue_t)dispatchQueue
startHook:(void (^)(GRPCRequestOptions *requestOptions,
GRPCCallOptions *callOptions,
GRPCInterceptorManager *manager))startHook
writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
GRPCInterceptorManager *manager))receiveNextMessagesHook
responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
GRPCInterceptorManager *manager))responseHeaderHook
responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
GRPCInterceptorManager *manager))responseCloseHook
didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook;
@end
@implementation HookInterceptorFactory {
@protected
void (^_startHook)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
GRPCInterceptorManager *manager);
void (^_writeDataHook)(id data, GRPCInterceptorManager *manager);
void (^_finishHook)(GRPCInterceptorManager *manager);
void (^_receiveNextMessagesHook)(NSUInteger numberOfMessages, GRPCInterceptorManager *manager);
void (^_responseHeaderHook)(NSDictionary *initialMetadata, GRPCInterceptorManager *manager);
void (^_responseDataHook)(id data, GRPCInterceptorManager *manager);
void (^_responseCloseHook)(NSDictionary *trailingMetadata, NSError *error,
GRPCInterceptorManager *manager);
void (^_didWriteDataHook)(GRPCInterceptorManager *manager);
dispatch_queue_t _dispatchQueue;
}
- (instancetype)
initWithDispatchQueue:(dispatch_queue_t)dispatchQueue
startHook:(void (^)(GRPCRequestOptions *requestOptions,
GRPCCallOptions *callOptions,
GRPCInterceptorManager *manager))startHook
writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
GRPCInterceptorManager *manager))receiveNextMessagesHook
responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
GRPCInterceptorManager *manager))responseHeaderHook
responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
GRPCInterceptorManager *manager))responseCloseHook
didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook {
if ((self = [super init])) {
_dispatchQueue = dispatchQueue;
_startHook = startHook;
_writeDataHook = writeDataHook;
_finishHook = finishHook;
_receiveNextMessagesHook = receiveNextMessagesHook;
_responseHeaderHook = responseHeaderHook;
_responseDataHook = responseDataHook;
_responseCloseHook = responseCloseHook;
_didWriteDataHook = didWriteDataHook;
}
return self;
}
- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager {
return [[HookInterceptor alloc] initWithInterceptorManager:interceptorManager
dispatchQueue:_dispatchQueue
startHook:_startHook
writeDataHook:_writeDataHook
finishHook:_finishHook
receiveNextMessagesHook:_receiveNextMessagesHook
responseHeaderHook:_responseHeaderHook
responseDataHook:_responseDataHook
responseCloseHook:_responseCloseHook
didWriteDataHook:_didWriteDataHook];
}
@end
@implementation HookInterceptor {
void (^_startHook)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
GRPCInterceptorManager *manager);
void (^_writeDataHook)(id data, GRPCInterceptorManager *manager);
void (^_finishHook)(GRPCInterceptorManager *manager);
void (^_receiveNextMessagesHook)(NSUInteger numberOfMessages, GRPCInterceptorManager *manager);
void (^_responseHeaderHook)(NSDictionary *initialMetadata, GRPCInterceptorManager *manager);
void (^_responseDataHook)(id data, GRPCInterceptorManager *manager);
void (^_responseCloseHook)(NSDictionary *trailingMetadata, NSError *error,
GRPCInterceptorManager *manager);
void (^_didWriteDataHook)(GRPCInterceptorManager *manager);
GRPCInterceptorManager *_manager;
dispatch_queue_t _dispatchQueue;
}
- (dispatch_queue_t)dispatchQueue {
return _dispatchQueue;
}
- (instancetype)
initWithInterceptorManager:(GRPCInterceptorManager *)interceptorManager
dispatchQueue:(dispatch_queue_t)dispatchQueue
startHook:(void (^)(GRPCRequestOptions *requestOptions,
GRPCCallOptions *callOptions,
GRPCInterceptorManager *manager))startHook
writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
GRPCInterceptorManager *manager))receiveNextMessagesHook
responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
GRPCInterceptorManager *manager))responseHeaderHook
responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
GRPCInterceptorManager *manager))responseCloseHook
didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook {
if ((self = [super initWithInterceptorManager:interceptorManager dispatchQueue:dispatchQueue])) {
_startHook = startHook;
_writeDataHook = writeDataHook;
_finishHook = finishHook;
_receiveNextMessagesHook = receiveNextMessagesHook;
_responseHeaderHook = responseHeaderHook;
_responseDataHook = responseDataHook;
_responseCloseHook = responseCloseHook;
_didWriteDataHook = didWriteDataHook;
_dispatchQueue = dispatchQueue;
_manager = interceptorManager;
}
return self;
}
- (void)startWithRequestOptions:(GRPCRequestOptions *)requestOptions
callOptions:(GRPCCallOptions *)callOptions {
if (_startHook) {
_startHook(requestOptions, callOptions, _manager);
}
}
- (void)writeData:(id)data {
if (_writeDataHook) {
_writeDataHook(data, _manager);
}
}
- (void)finish {
if (_finishHook) {
_finishHook(_manager);
}
}
- (void)receiveNextMessages:(NSUInteger)numberOfMessages {
if (_receiveNextMessagesHook) {
_receiveNextMessagesHook(numberOfMessages, _manager);
}
}
- (void)didReceiveInitialMetadata:(NSDictionary *)initialMetadata {
if (_responseHeaderHook) {
_responseHeaderHook(initialMetadata, _manager);
}
}
- (void)didReceiveData:(id)data {
if (_responseDataHook) {
_responseDataHook(data, _manager);
}
}
- (void)didCloseWithTrailingMetadata:(NSDictionary *)trailingMetadata error:(NSError *)error {
if (_responseCloseHook) {
_responseCloseHook(trailingMetadata, error, _manager);
}
}
- (void)didWriteData {
if (_didWriteDataHook) {
_didWriteDataHook(_manager);
}
}
@end
@interface GlobalInterceptorFactory : HookInterceptorFactory
@property BOOL enabled;
- (instancetype)initWithDispatchQueue:(dispatch_queue_t)dispatchQueue;
- (void)setStartHook:(void (^)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
GRPCInterceptorManager *manager))startHook
writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
GRPCInterceptorManager *manager))receiveNextMessagesHook
responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
GRPCInterceptorManager *manager))responseHeaderHook
responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
GRPCInterceptorManager *manager))responseCloseHook
didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook;
@end
@implementation GlobalInterceptorFactory
- (instancetype)initWithDispatchQueue:(dispatch_queue_t)dispatchQueue {
_enabled = NO;
return [super initWithDispatchQueue:dispatchQueue
startHook:nil
writeDataHook:nil
finishHook:nil
receiveNextMessagesHook:nil
responseHeaderHook:nil
responseDataHook:nil
responseCloseHook:nil
didWriteDataHook:nil];
}
- (GRPCInterceptor *)createInterceptorWithManager:(GRPCInterceptorManager *)interceptorManager {
if (_enabled) {
return [[HookInterceptor alloc] initWithInterceptorManager:interceptorManager
dispatchQueue:_dispatchQueue
startHook:_startHook
writeDataHook:_writeDataHook
finishHook:_finishHook
receiveNextMessagesHook:_receiveNextMessagesHook
responseHeaderHook:_responseHeaderHook
responseDataHook:_responseDataHook
responseCloseHook:_responseCloseHook
didWriteDataHook:_didWriteDataHook];
} else {
return nil;
}
}
- (void)setStartHook:(void (^)(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
GRPCInterceptorManager *manager))startHook
writeDataHook:(void (^)(id data, GRPCInterceptorManager *manager))writeDataHook
finishHook:(void (^)(GRPCInterceptorManager *manager))finishHook
receiveNextMessagesHook:(void (^)(NSUInteger numberOfMessages,
GRPCInterceptorManager *manager))receiveNextMessagesHook
responseHeaderHook:(void (^)(NSDictionary *initialMetadata,
GRPCInterceptorManager *manager))responseHeaderHook
responseDataHook:(void (^)(id data, GRPCInterceptorManager *manager))responseDataHook
responseCloseHook:(void (^)(NSDictionary *trailingMetadata, NSError *error,
GRPCInterceptorManager *manager))responseCloseHook
didWriteDataHook:(void (^)(GRPCInterceptorManager *manager))didWriteDataHook {
_startHook = startHook;
_writeDataHook = writeDataHook;
_finishHook = finishHook;
_receiveNextMessagesHook = receiveNextMessagesHook;
_responseHeaderHook = responseHeaderHook;
_responseDataHook = responseDataHook;
_responseCloseHook = responseCloseHook;
_didWriteDataHook = didWriteDataHook;
}
@end
static GlobalInterceptorFactory *globalInterceptorFactory = nil;
static dispatch_once_t initGlobalInterceptorFactory;
#pragma mark Tests
@implementation InteropTests
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Warc-performSelector-leaks"
- (void)retriableTest:(SEL)selector retries:(int)retries timeout:(NSTimeInterval)timeout {
for (int i = 0; i < retries; i++) {
NSDate *waitUntil = [[NSDate date] dateByAddingTimeInterval:timeout];
NSCondition *cv = [[NSCondition alloc] init];
__block BOOL done = NO;
[cv lock];
dispatch_async(dispatch_get_global_queue(QOS_CLASS_BACKGROUND, 0), ^{
[self performSelector:selector];
[cv lock];
done = YES;
[cv signal];
[cv unlock];
});
while (!done && [waitUntil timeIntervalSinceNow] > 0) {
[cv waitUntilDate:waitUntil];
}
if (done) {
[cv unlock];
break;
} else {
[cv unlock];
[self tearDown];
[self setUp];
}
}
}
#pragma clang diagnostic pop
+ (XCTestSuite *)defaultTestSuite {
if (self == [InteropTests class]) {
return [XCTestSuite testSuiteWithName:@"InteropTestsEmptySuite"];
}
return super.defaultTestSuite;
}
+ (NSString *)host {
return nil;
}
// This number indicates how many bytes of overhead does Protocol Buffers encoding add onto the
// message. The number varies as different message.proto is used on different servers. The actual
// number for each interop server is overridden in corresponding derived test classes.
- (int32_t)encodingOverhead {
return 0;
}
// For backwards compatibility
+ (GRPCTransportType)transportType {
return GRPCTransportTypeChttp2BoringSSL;
}
+ (GRPCTransportID)transport {
return NULL;
}
+ (NSString *)PEMRootCertificates {
return nil;
}
+ (NSString *)hostNameOverride {
return nil;
}
+ (void)setUp {
dispatch_once(&initGlobalInterceptorFactory, ^{
dispatch_queue_t globalInterceptorQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
globalInterceptorFactory =
[[GlobalInterceptorFactory alloc] initWithDispatchQueue:globalInterceptorQueue];
[GRPCCall2 registerGlobalInterceptor:globalInterceptorFactory];
});
}
- (void)setUp {
self.continueAfterFailure = YES;
[GRPCCall resetHostSettings];
GRPCResetCallConnections();
XCTAssertNotNil([[self class] host]);
}
- (void)testEmptyUnaryRPC {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiter, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyUnary"];
GPBEmpty *request = [GPBEmpty message];
__weak RMTTestService *weakService = service;
[service emptyCallWithRequest:request
handler:^(GPBEmpty *response, NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNil(error, @"Finished with unexpected error: %@", error);
id expectedResponse = [GPBEmpty message];
XCTAssertEqualObjects(response, expectedResponse);
[expectation fulfill];
}];
waiter(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testEmptyUnaryRPCWithV2API {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectReceive =
[self expectationWithDescription:@"EmptyUnaryWithV2API received message"];
__weak XCTestExpectation *expectComplete =
[self expectationWithDescription:@"EmptyUnaryWithV2API completed"];
GPBEmpty *request = [GPBEmpty message];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
__weak RMTTestService *weakService = service;
GRPCUnaryProtoCall *call = [service
emptyCallWithMessage:request
responseHandler:[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
if (weakService == nil) {
return;
}
if (message) {
id expectedResponse = [GPBEmpty message];
XCTAssertEqualObjects(message, expectedResponse);
[expectReceive fulfill];
}
}
closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNil(error, @"Unexpected error: %@", error);
[expectComplete fulfill];
}]
callOptions:options];
[call start];
waiterBlock(@[ expectReceive, expectComplete ], GRPCInteropTestTimeoutDefault);
});
}
// Test that responses can be dispatched even if we do not run main run-loop
- (void)testAsyncDispatchWithV2API {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
XCTestExpectation *receiveExpect = [self expectationWithDescription:@"receiveExpect"];
XCTestExpectation *closeExpect = [self expectationWithDescription:@"closeExpect"];
GPBEmpty *request = [GPBEmpty message];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
__block BOOL messageReceived = NO;
__block BOOL done = NO;
__weak RMTTestService *weakService = service;
GRPCUnaryProtoCall *call = [service
emptyCallWithMessage:request
responseHandler:[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
if (weakService == nil) {
return;
}
if (message) {
id expectedResponse = [GPBEmpty message];
XCTAssertEqualObjects(message, expectedResponse);
messageReceived = YES;
}
[receiveExpect fulfill];
}
closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNil(error, @"Unexpected error: %@", error);
done = YES;
[closeExpect fulfill];
}]
callOptions:options];
[call start];
waiterBlock(@[ receiveExpect, closeExpect ], GRPCInteropTestTimeoutDefault);
XCTAssertTrue(messageReceived);
XCTAssertTrue(done);
});
}
- (void)testLargeUnaryRPC {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"LargeUnary"];
RMTSimpleRequest *request = [RMTSimpleRequest message];
request.responseType = RMTPayloadType_Compressable;
request.responseSize = LARGE_RESPONSE_PAYLOAD_SIZE;
request.payload.body = [NSMutableData dataWithLength:LARGE_REQUEST_PAYLOAD_SIZE];
__weak RMTTestService *weakService = service;
[service unaryCallWithRequest:request
handler:^(RMTSimpleResponse *response, NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNil(error, @"Finished with unexpected error: %@", error);
RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
expectedResponse.payload.type = RMTPayloadType_Compressable;
expectedResponse.payload.body =
[NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE];
XCTAssertEqualObjects(response, expectedResponse);
[expectation fulfill];
}];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testUnaryResponseHandler {
// The test does not work on a remote server since it does not echo a trailer
if ([[self class] isRemoteTest]) {
return;
}
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
XCTestExpectation *expectComplete = [self expectationWithDescription:@"call complete"];
XCTestExpectation *expectCompleteMainQueue =
[self expectationWithDescription:@"main queue call complete"];
RMTSimpleRequest *request = [RMTSimpleRequest message];
request.responseType = RMTPayloadType_Compressable;
request.responseSize = LARGE_RESPONSE_PAYLOAD_SIZE;
request.payload.body = [NSMutableData dataWithLength:LARGE_REQUEST_PAYLOAD_SIZE];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
const unsigned char raw_bytes[] = {1, 2, 3, 4};
NSData *trailer_data = [NSData dataWithBytes:raw_bytes length:sizeof(raw_bytes)];
options.initialMetadata = @{
@"x-grpc-test-echo-trailing-bin" : trailer_data,
@"x-grpc-test-echo-initial" : @"test-header"
};
__weak RMTTestService *weakService = service;
__block GRPCUnaryResponseHandler *handler = [[GRPCUnaryResponseHandler alloc]
initWithResponseHandler:^(GPBMessage *response, NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNil(error, @"Unexpected error: %@", error);
RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
expectedResponse.payload.type = RMTPayloadType_Compressable;
expectedResponse.payload.body =
[NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE];
XCTAssertEqualObjects(response, expectedResponse);
XCTAssertEqualObjects(handler.responseHeaders[@"x-grpc-test-echo-initial"],
@"test-header");
XCTAssertEqualObjects(handler.responseTrailers[@"x-grpc-test-echo-trailing-bin"],
trailer_data);
[expectComplete fulfill];
}
responseDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)];
__block GRPCUnaryResponseHandler *handlerMainQueue = [[GRPCUnaryResponseHandler alloc]
initWithResponseHandler:^(GPBMessage *response, NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNil(error, @"Unexpected error: %@", error);
RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
expectedResponse.payload.type = RMTPayloadType_Compressable;
expectedResponse.payload.body =
[NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE];
XCTAssertEqualObjects(response, expectedResponse);
XCTAssertEqualObjects(handlerMainQueue.responseHeaders[@"x-grpc-test-echo-initial"],
@"test-header");
XCTAssertEqualObjects(handlerMainQueue.responseTrailers[@"x-grpc-test-echo-trailing-bin"],
trailer_data);
[expectCompleteMainQueue fulfill];
}
responseDispatchQueue:nil];
[[service unaryCallWithMessage:request responseHandler:handler callOptions:options] start];
[[service unaryCallWithMessage:request responseHandler:handlerMainQueue
callOptions:options] start];
waiterBlock(@[ expectComplete, expectCompleteMainQueue ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testLargeUnaryRPCWithV2API {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectReceive =
[self expectationWithDescription:@"LargeUnaryWithV2API received message"];
__weak XCTestExpectation *expectComplete =
[self expectationWithDescription:@"LargeUnaryWithV2API received complete"];
RMTSimpleRequest *request = [RMTSimpleRequest message];
request.responseType = RMTPayloadType_Compressable;
request.responseSize = LARGE_RESPONSE_PAYLOAD_SIZE;
request.payload.body = [NSMutableData dataWithLength:LARGE_REQUEST_PAYLOAD_SIZE];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
__weak RMTTestService *weakService = service;
GRPCUnaryProtoCall *call = [service
unaryCallWithMessage:request
responseHandler:[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
if (weakService == nil) {
return;
}
XCTAssertNotNil(message);
if (message) {
RMTSimpleResponse *expectedResponse =
[RMTSimpleResponse message];
expectedResponse.payload.type = RMTPayloadType_Compressable;
expectedResponse.payload.body =
[NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE];
XCTAssertEqualObjects(message, expectedResponse);
[expectReceive fulfill];
}
}
closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNil(error, @"Unexpected error: %@", error);
[expectComplete fulfill];
}]
callOptions:options];
[call start];
waiterBlock(@[ expectReceive, expectComplete ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testConcurrentRPCsWithErrorsWithV2API {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
NSMutableArray *completeExpectations = [NSMutableArray array];
NSMutableArray *calls = [NSMutableArray array];
int num_rpcs = 10;
for (int i = 0; i < num_rpcs; ++i) {
[completeExpectations
addObject:[self expectationWithDescription:
[NSString stringWithFormat:@"Received trailer for RPC %d", i]]];
RMTSimpleRequest *request = [RMTSimpleRequest message];
request.responseType = RMTPayloadType_Compressable;
request.responseSize = SMALL_PAYLOAD_SIZE;
request.payload.body = [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE];
if (i % 3 == 0) {
request.responseStatus.code = GRPC_STATUS_UNAVAILABLE;
} else if (i % 7 == 0) {
request.responseStatus.code = GRPC_STATUS_CANCELLED;
}
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
__weak RMTTestService *weakService = service;
GRPCUnaryProtoCall *call = [service
unaryCallWithMessage:request
responseHandler:[[InteropTestsBlockCallbacks alloc]
initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
if (weakService == nil) {
return;
}
if (message) {
RMTSimpleResponse *expectedResponse =
[RMTSimpleResponse message];
expectedResponse.payload.type = RMTPayloadType_Compressable;
expectedResponse.payload.body =
[NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE];
XCTAssertEqualObjects(message, expectedResponse);
}
}
closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
if (weakService == nil) {
return;
}
[completeExpectations[i] fulfill];
}]
callOptions:options];
[calls addObject:call];
}
for (int i = 0; i < num_rpcs; ++i) {
GRPCUnaryProtoCall *call = calls[i];
[call start];
}
waiterBlock(completeExpectations, GRPCInteropTestTimeoutDefault);
});
}
- (void)concurrentRPCsWithErrors {
const int kNumRpcs = 10;
__block int completedCallCount = 0;
NSCondition *cv = [[NSCondition alloc] init];
NSDate *waitUntil = [[NSDate date] dateByAddingTimeInterval:GRPCInteropTestTimeoutDefault];
[cv lock];
for (int i = 0; i < kNumRpcs; ++i) {
RMTSimpleRequest *request = [RMTSimpleRequest message];
request.responseType = RMTPayloadType_Compressable;
request.responseSize = SMALL_PAYLOAD_SIZE;
request.payload.body = [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE];
if (i % 3 == 0) {
request.responseStatus.code = GRPC_STATUS_UNAVAILABLE;
} else if (i % 7 == 0) {
request.responseStatus.code = GRPC_STATUS_CANCELLED;
}
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak RMTTestService *weakService = service;
GRPCProtoCall *call = [service
RPCToUnaryCallWithRequest:request
handler:^(RMTSimpleResponse *response, NSError *error) {
if (weakService == nil) {
return;
}
if (error == nil) {
RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
expectedResponse.payload.type = RMTPayloadType_Compressable;
expectedResponse.payload.body =
[NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE];
XCTAssertEqualObjects(response, expectedResponse);
}
// DEBUG
[cv lock];
if (++completedCallCount == kNumRpcs) {
[cv signal];
}
[cv unlock];
}];
[call setResponseDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)];
[call start];
}
while (completedCallCount < kNumRpcs && [waitUntil timeIntervalSinceNow] > 0) {
[cv waitUntilDate:waitUntil];
}
[cv unlock];
}
- (void)testConcurrentRPCsWithErrors {
[self retriableTest:@selector(concurrentRPCsWithErrors) retries:kTestRetries timeout:10];
}
- (void)testPacketCoalescing {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"LargeUnary"];
RMTSimpleRequest *request = [RMTSimpleRequest message];
request.responseType = RMTPayloadType_Compressable;
request.responseSize = SMALL_PAYLOAD_SIZE;
request.payload.body = [NSMutableData dataWithLength:SMALL_PAYLOAD_SIZE];
[GRPCCall enableOpBatchLog:YES];
__weak RMTTestService *weakService = service;
[service unaryCallWithRequest:request
handler:^(RMTSimpleResponse *response, NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNil(error, @"Finished with unexpected error: %@", error);
RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
expectedResponse.payload.type = RMTPayloadType_Compressable;
expectedResponse.payload.body = [NSMutableData dataWithLength:10];
XCTAssertEqualObjects(response, expectedResponse);
// The test is a success if there is a batch of exactly 3 ops
// (SEND_INITIAL_METADATA, SEND_MESSAGE, SEND_CLOSE_FROM_CLIENT).
// Without packet coalescing each batch of ops contains only one op.
NSArray *opBatches = [GRPCCall obtainAndCleanOpBatchLog];
const NSInteger kExpectedOpBatchSize = 3;
for (NSObject *o in opBatches) {
if ([o isKindOfClass:[NSArray class]]) {
NSArray *batch = (NSArray *)o;
if ([batch count] == kExpectedOpBatchSize) {
[expectation fulfill];
break;
}
}
}
}];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
[GRPCCall enableOpBatchLog:NO];
});
}
- (void)test4MBResponsesAreAccepted {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"MaxResponseSize"];
RMTSimpleRequest *request = [RMTSimpleRequest message];
const int32_t kPayloadSize =
4 * 1024 * 1024 - self.encodingOverhead; // 4MB - encoding overhead
request.responseSize = kPayloadSize;
__weak RMTTestService *weakService = service;
[service unaryCallWithRequest:request
handler:^(RMTSimpleResponse *response, NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNil(error, @"Finished with unexpected error: %@", error);
XCTAssertEqual(response.payload.body.length, kPayloadSize);
[expectation fulfill];
}];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testResponsesOverMaxSizeFailWithActionableMessage {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation =
[self expectationWithDescription:@"ResponseOverMaxSize"];
RMTSimpleRequest *request = [RMTSimpleRequest message];
const int32_t kPayloadSize = 4 * 1024 * 1024 - self.encodingOverhead + 1; // 1B over max size
request.responseSize = kPayloadSize;
__weak RMTTestService *weakService = service;
[service unaryCallWithRequest:request
handler:^(RMTSimpleResponse *response, NSError *error) {
if (weakService == nil) {
return;
}
// TODO(jcanizales): Catch the error and rethrow it with an
// actionable message:
// - Use +[GRPCCall setResponseSizeLimit:forHost:] to set a
// higher limit.
// - If you're developing the server, consider using response
// streaming, or let clients filter
// responses by setting a google.protobuf.FieldMask in the
// request:
// https://github.com/protocolbuffers/protobuf/blob/master/src/google/protobuf/field_mask.proto
XCTAssertEqualObjects(
error.localizedDescription,
@"CLIENT: Received message larger than max (4194305 vs. 4194304)");
[expectation fulfill];
}];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testResponsesOver4MBAreAcceptedIfOptedIn {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation =
[self expectationWithDescription:@"HigherResponseSizeLimit"];
__block NSError *callError = nil;
RMTSimpleRequest *request = [RMTSimpleRequest message];
const size_t kPayloadSize = 5 * 1024 * 1024; // 5MB
request.responseSize = kPayloadSize;
[GRPCCall setResponseSizeLimit:6 * 1024 * 1024 forHost:[[self class] host]];
__weak RMTTestService *weakService = service;
[service unaryCallWithRequest:request
handler:^(RMTSimpleResponse *response, NSError *error) {
if (weakService == nil) {
return;
}
callError = error;
[expectation fulfill];
}];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
XCTAssertNil(callError, @"Finished with unexpected error: %@", callError);
});
}
- (void)testClientStreamingRPC {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"ClientStreaming"];
RMTStreamingInputCallRequest *request1 = [RMTStreamingInputCallRequest message];
request1.payload.body = [NSMutableData dataWithLength:27182];
RMTStreamingInputCallRequest *request2 = [RMTStreamingInputCallRequest message];
request2.payload.body = [NSMutableData dataWithLength:8];
RMTStreamingInputCallRequest *request3 = [RMTStreamingInputCallRequest message];
request3.payload.body = [NSMutableData dataWithLength:1828];
RMTStreamingInputCallRequest *request4 = [RMTStreamingInputCallRequest message];
request4.payload.body = [NSMutableData dataWithLength:45904];
GRXWriter *writer = [GRXWriter writerWithContainer:@[ request1, request2, request3, request4 ]];
__weak RMTTestService *weakService = service;
[service
streamingInputCallWithRequestsWriter:writer
handler:^(RMTStreamingInputCallResponse *response,
NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNil(error, @"Finished with unexpected error: %@",
error);
RMTStreamingInputCallResponse *expectedResponse =
[RMTStreamingInputCallResponse message];
expectedResponse.aggregatedPayloadSize = 74922;
XCTAssertEqualObjects(response, expectedResponse);
[expectation fulfill];
}];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testServerStreamingRPC {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"ServerStreaming"];
NSArray *expectedSizes = @[ @31415, @9, @2653, @58979 ];
RMTStreamingOutputCallRequest *request = [RMTStreamingOutputCallRequest message];
for (NSNumber *size in expectedSizes) {
RMTResponseParameters *parameters = [RMTResponseParameters message];
parameters.size = [size intValue];
[request.responseParametersArray addObject:parameters];
}
__block int index = 0;
__weak RMTTestService *weakService = service;
[service
streamingOutputCallWithRequest:request
eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
NSError *error) {
if (weakService == nil) {
return;
}
assertBlock(
error == nil,
[NSString
stringWithFormat:@"Finished with unexpected error: %@", error]);
assertBlock(done || response,
@"Event handler called without an event.");
if (response) {
assertBlock(index < 4, @"More than 4 responses received.");
id expected = [RMTStreamingOutputCallResponse
messageWithPayloadSize:expectedSizes[index]];
assertBlock(
[response isEqual:expected],
[NSString
stringWithFormat:@"response %@ not equal to expected %@",
response, expected]);
index += 1;
}
if (done) {
assertBlock(
index == 4,
[NSString stringWithFormat:@"Received %@ responses instead of 4.",
@(index)]);
[expectation fulfill];
}
}];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testPingPongRPC {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPong"];
NSArray *requests = @[ @27182, @8, @1828, @45904 ];
NSArray *responses = @[ @31415, @9, @2653, @58979 ];
GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];
__block int index = 0;
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
[requestsBuffer writeValue:request];
__weak RMTTestService *weakService = service;
[service
fullDuplexCallWithRequestsWriter:requestsBuffer
eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
NSError *error) {
if (weakService == nil) {
return;
}
assertBlock(
error == nil,
[NSString stringWithFormat:@"Finished with unexpected error: %@",
error]);
assertBlock(done || response,
@"Event handler called without an event.");
if (response) {
assertBlock(index < 4, @"More than 4 responses received.");
id expected = [RMTStreamingOutputCallResponse
messageWithPayloadSize:responses[index]];
assertBlock(
[response isEqual:expected],
[NSString
stringWithFormat:@"response %@ not equal to expected %@",
response, expected]);
index += 1;
if (index < 4) {
id request = [RMTStreamingOutputCallRequest
messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
[requestsBuffer writeValue:request];
} else {
[requestsBuffer writesFinishedWithError:nil];
}
}
if (done) {
assertBlock(
index == 4,
[NSString
stringWithFormat:@"Received %@ responses instead of 4.",
@(index)]);
[expectation fulfill];
}
}];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testPingPongRPCWithV2API {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPongWithV2API"];
NSArray *requests = @[ @27182, @8, @1828, @45904 ];
NSArray *responses = @[ @31415, @9, @2653, @58979 ];
__block int index = 0;
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
__weak __block GRPCStreamingProtoCall *weakCall;
GRPCStreamingProtoCall *call = [service
fullDuplexCallWithResponseHandler:
[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
GRPCStreamingProtoCall *localCall = weakCall;
if (localCall == nil) {
return;
}
assertBlock(index < 4, @"More than 4 responses received.");
id expected =
[RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]];
assertBlock([message isEqual:expected],
[NSString stringWithFormat:@"message %@ not equal to expected %@",
message, expected]);
index += 1;
if (index < 4) {
id request =
[RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
[localCall writeMessage:request];
} else {
[localCall finish];
}
}
closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
if (weakCall == nil) {
return;
}
assertBlock(
error == nil,
[NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
assertBlock(
index == 4,
[NSString stringWithFormat:@"Received %@ responses instead of 4.", @(index)]);
[expectation fulfill];
}]
callOptions:options];
weakCall = call;
[call start];
[call writeMessage:request];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testPingPongRPCWithFlowControl {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"PingPongWithV2API"];
NSArray *requests = @[ @27182, @8, @1828, @45904 ];
NSArray *responses = @[ @31415, @9, @2653, @58979 ];
__block int index = 0;
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
options.flowControlEnabled = YES;
__block int writeMessageCount = 0;
__weak __block GRPCStreamingProtoCall *weakCall;
GRPCStreamingProtoCall *call = [service
fullDuplexCallWithResponseHandler:
[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
GRPCStreamingProtoCall *localCall = weakCall;
if (localCall == nil) {
return;
}
assertBlock((index < 4), @"More than 4 responses received.");
id expected =
[RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]];
assertBlock(
[message isEqual:expected],
[NSString stringWithFormat:@"message %@ not equal to %@", message, expected]);
index += 1;
if (index < 4) {
id request =
[RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
[localCall writeMessage:request];
[localCall receiveNextMessage];
} else {
[localCall finish];
}
}
closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
if (weakCall == nil) {
return;
}
assertBlock(
error == nil,
[NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
assertBlock(
index == 4,
[NSString stringWithFormat:@"Received %i responses instead of 4.", index]);
[expectation fulfill];
}
writeMessageCallback:^{
writeMessageCount++;
}]
callOptions:options];
weakCall = call;
[call start];
[call receiveNextMessage];
[call writeMessage:request];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
assertBlock(
writeMessageCount == 4,
[NSString stringWithFormat:@"writeMessageCount %@ not equal to 4", @(writeMessageCount)]);
});
}
- (void)testEmptyStreamRPC {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"EmptyStream"];
__weak RMTTestService *weakService = service;
[service
fullDuplexCallWithRequestsWriter:[GRXWriter emptyWriter]
eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNil(error, @"Finished with unexpected error: %@", error);
XCTAssert(done, @"Unexpected response: %@", response);
[expectation fulfill];
}];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testCancelAfterBeginRPC {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"CancelAfterBegin"];
// A buffered pipe to which we never write any value acts as a writer that just hangs.
GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];
__weak RMTTestService *weakService = service;
GRPCProtoCall *call = [service
RPCToStreamingInputCallWithRequestsWriter:requestsBuffer
handler:^(RMTStreamingInputCallResponse *response,
NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
[expectation fulfill];
}];
XCTAssertEqual(call.state, GRXWriterStateNotStarted);
[call start];
XCTAssertEqual(call.state, GRXWriterStateStarted);
[call cancel];
XCTAssertEqual(call.state, GRXWriterStateFinished);
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testCancelAfterBeginRPCWithV2API {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation =
[self expectationWithDescription:@"CancelAfterBeginWithV2API"];
// A buffered pipe to which we never write any value acts as a writer that just hangs.
__weak RMTTestService *weakService = service;
GRPCStreamingProtoCall *call = [service
streamingInputCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
if (weakService == nil) {
return;
}
XCTFail(@"Not expected to receive message");
}
closeCallback:^(NSDictionary *trailingMetadata,
NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertEqual(error.code,
GRPC_STATUS_CANCELLED);
[expectation fulfill];
}]
callOptions:nil];
[call start];
[call cancel];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testCancelAfterFirstResponseRPC {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation =
[self expectationWithDescription:@"CancelAfterFirstResponse"];
// A buffered pipe to which we write a single value but never close
GRXBufferedPipe *requestsBuffer = [[GRXBufferedPipe alloc] init];
__block BOOL receivedResponse = NO;
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
requestedResponseSize:@31415];
[requestsBuffer writeValue:request];
__weak RMTTestService *weakService = service;
__block GRPCProtoCall *call = [service
RPCToFullDuplexCallWithRequestsWriter:requestsBuffer
eventHandler:^(BOOL done, RMTStreamingOutputCallResponse *response,
NSError *error) {
if (weakService == nil) {
return;
}
if (receivedResponse) {
XCTAssert(done, @"Unexpected extra response %@", response);
XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
[expectation fulfill];
} else {
XCTAssertNil(error, @"Finished with unexpected error: %@",
error);
XCTAssertFalse(done, @"Finished without response");
XCTAssertNotNil(response);
receivedResponse = YES;
[call cancel];
}
}];
[call start];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testCancelAfterFirstResponseRPCWithV2API {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *completionExpectation =
[self expectationWithDescription:@"Call completed."];
__weak XCTestExpectation *responseExpectation =
[self expectationWithDescription:@"Received response."];
__block BOOL receivedResponse = NO;
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = self.class.transportType;
options.transport = [[self class] transport];
options.PEMRootCertificates = self.class.PEMRootCertificates;
options.hostNameOverride = [[self class] hostNameOverride];
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
requestedResponseSize:@31415];
__weak __block GRPCStreamingProtoCall *weakCall;
GRPCStreamingProtoCall *call = [service
fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
GRPCStreamingProtoCall *localCall = weakCall;
if (localCall == nil) {
return;
}
XCTAssertFalse(receivedResponse);
receivedResponse = YES;
[localCall cancel];
[responseExpectation fulfill];
}
closeCallback:^(NSDictionary *trailingMetadata,
NSError *error) {
if (weakCall == nil) {
return;
}
XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
[completionExpectation fulfill];
}]
callOptions:options];
weakCall = call;
[call start];
[call writeMessage:request];
waiterBlock(@[ completionExpectation, responseExpectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testCancelAfterFirstRequestWithV2API {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *completionExpectation =
[self expectationWithDescription:@"Call completed."];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = self.class.transportType;
options.transport = [[self class] transport];
options.PEMRootCertificates = self.class.PEMRootCertificates;
options.hostNameOverride = [[self class] hostNameOverride];
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:@21782
requestedResponseSize:@31415];
__weak RMTTestService *weakService = service;
GRPCStreamingProtoCall *call = [service
fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
if (weakService == nil) {
return;
}
XCTFail(@"Received unexpected response.");
}
closeCallback:^(NSDictionary *trailingMetadata,
NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
[completionExpectation fulfill];
}]
callOptions:options];
[call start];
[call writeMessage:request];
[call cancel];
waiterBlock(@[ completionExpectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testRPCAfterClosingOpenConnections {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation =
[self expectationWithDescription:@"RPC after closing connection"];
GPBEmpty *request = [GPBEmpty message];
__weak RMTTestService *weakService = service;
[service
emptyCallWithRequest:request
handler:^(GPBEmpty *response, NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNil(error, @"First RPC finished with unexpected error: %@", error);
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wdeprecated-declarations"
[GRPCCall closeOpenConnections];
#pragma clang diagnostic pop
[weakService
emptyCallWithRequest:request
handler:^(GPBEmpty *response, NSError *error) {
XCTAssertNil(
error,
@"Second RPC finished with unexpected error: %@",
error);
[expectation fulfill];
}];
}];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testCompressedUnaryRPC {
// This test needs to be disabled for remote test because interop server grpc-test
// does not support compression.
if (isRemoteInteropTest([[self class] host])) {
return;
}
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"LargeUnary"];
RMTSimpleRequest *request = [RMTSimpleRequest message];
request.responseType = RMTPayloadType_Compressable;
request.responseSize = LARGE_RESPONSE_PAYLOAD_SIZE;
request.payload.body = [NSMutableData dataWithLength:LARGE_REQUEST_PAYLOAD_SIZE];
request.expectCompressed.value = YES;
[GRPCCall setDefaultCompressMethod:GRPCCompressGzip forhost:[[self class] host]];
__weak RMTTestService *weakService = service;
[service unaryCallWithRequest:request
handler:^(RMTSimpleResponse *response, NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNil(error, @"Finished with unexpected error: %@", error);
RMTSimpleResponse *expectedResponse = [RMTSimpleResponse message];
expectedResponse.payload.type = RMTPayloadType_Compressable;
expectedResponse.payload.body =
[NSMutableData dataWithLength:LARGE_RESPONSE_PAYLOAD_SIZE];
XCTAssertEqualObjects(response, expectedResponse);
[expectation fulfill];
}];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
// TODO(b/268379869): This test has a race and is flaky in any configurations. One possible way to
// deflake this test is to find a way to disable ping ack on the interop server for this test case.
- (void)testKeepaliveWithV2API {
return;
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
if ([[self class] transport] == gGRPCCoreCronetID) {
// Cronet does not support keepalive
return;
}
__weak XCTestExpectation *expectation = [self expectationWithDescription:@"Keepalive"];
const NSTimeInterval kTestTimeout = 5;
NSNumber *kRequestSize = @27182;
NSNumber *kResponseSize = @31415;
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:kRequestSize
requestedResponseSize:kResponseSize];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
options.keepaliveInterval = 1.5;
options.keepaliveTimeout = 0;
__weak RMTTestService *weakService = service;
GRPCStreamingProtoCall *call = [service
fullDuplexCallWithResponseHandler:
[[InteropTestsBlockCallbacks alloc]
initWithInitialMetadataCallback:nil
messageCallback:nil
closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
if (weakService == nil) {
return;
}
XCTAssertNotNil(error);
XCTAssertEqual(
error.code, GRPC_STATUS_UNAVAILABLE,
@"Received status %@ instead of UNAVAILABLE (14).",
@(error.code));
[expectation fulfill];
}]
callOptions:options];
[call writeMessage:request];
[call start];
waiterBlock(@[ expectation ], kTestTimeout);
[call finish];
});
}
- (void)testDefaultInterceptor {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation =
[self expectationWithDescription:@"testDefaultInterceptor"];
NSArray *requests = @[ @27182, @8, @1828, @45904 ];
NSArray *responses = @[ @31415, @9, @2653, @58979 ];
__block int index = 0;
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
options.interceptorFactories = @[ [[DefaultInterceptorFactory alloc] init] ];
__weak __block GRPCStreamingProtoCall *weakCall;
GRPCStreamingProtoCall *call = [service
fullDuplexCallWithResponseHandler:
[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
GRPCStreamingProtoCall *localCall = weakCall;
if (localCall == nil) {
return;
}
assertBlock(index < 4, @"More than 4 responses received.");
id expected =
[RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]];
assertBlock([message isEqual:expected],
[NSString stringWithFormat:@"message %@ not equal to expected %@",
message, expected]);
index += 1;
if (index < 4) {
id request =
[RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
[localCall writeMessage:request];
} else {
[localCall finish];
}
}
closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
if (weakCall == nil) {
return;
}
assertBlock(
index == 4,
[NSString stringWithFormat:@"Received %@ responses instead of 4.", @(index)]);
assertBlock(
error == nil,
[NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
[expectation fulfill];
}]
callOptions:options];
weakCall = call;
[call start];
[call writeMessage:request];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
});
}
- (void)testLoggingInterceptor {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation =
[self expectationWithDescription:@"testLoggingInterceptor"];
__block NSUInteger startCount = 0;
__block NSUInteger writeDataCount = 0;
__block NSUInteger finishCount = 0;
__block NSUInteger receiveNextMessageCount = 0;
__block NSUInteger responseHeaderCount = 0;
__block NSUInteger responseDataCount = 0;
__block NSUInteger responseCloseCount = 0;
__block NSUInteger didWriteDataCount = 0;
id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
startHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
GRPCInterceptorManager *manager) {
startCount++;
XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
[manager startNextInterceptorWithRequest:[requestOptions copy]
callOptions:[callOptions copy]];
}
writeDataHook:^(id data, GRPCInterceptorManager *manager) {
writeDataCount++;
[manager writeNextInterceptorWithData:data];
}
finishHook:^(GRPCInterceptorManager *manager) {
finishCount++;
[manager finishNextInterceptor];
}
receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
receiveNextMessageCount++;
[manager receiveNextInterceptorMessages:numberOfMessages];
}
responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
responseHeaderCount++;
[manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
}
responseDataHook:^(id data, GRPCInterceptorManager *manager) {
responseDataCount++;
[manager forwardPreviousInterceptorWithData:data];
}
responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
GRPCInterceptorManager *manager) {
responseCloseCount++;
[manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
error:error];
}
didWriteDataHook:^(GRPCInterceptorManager *manager) {
didWriteDataCount++;
[manager forwardPreviousInterceptorDidWriteData];
}];
NSArray *requests = @[ @1, @2, @3, @4 ];
NSArray *responses = @[ @1, @2, @3, @4 ];
__block int messageIndex = 0;
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[messageIndex]
requestedResponseSize:responses[messageIndex]];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
options.flowControlEnabled = YES;
options.interceptorFactories = @[ factory ];
__block int writeMessageCount = 0;
__block __weak GRPCStreamingProtoCall *weakCall;
GRPCStreamingProtoCall *call = [service
fullDuplexCallWithResponseHandler:
[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
GRPCStreamingProtoCall *localCall = weakCall;
if (localCall == nil) {
return;
}
assertBlock((messageIndex < 4), @"More than 4 responses received.");
id expected = [RMTStreamingOutputCallResponse
messageWithPayloadSize:responses[messageIndex]];
assertBlock([message isEqual:expected],
[NSString stringWithFormat:@"message %@ not equal to expected %@",
message, expected]);
messageIndex += 1;
if (messageIndex < 4) {
id request = [RMTStreamingOutputCallRequest
messageWithPayloadSize:requests[messageIndex]
requestedResponseSize:responses[messageIndex]];
[localCall writeMessage:request];
[localCall receiveNextMessage];
} else {
[localCall finish];
}
}
closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
if (weakCall == nil) {
return;
}
assertBlock(
error == nil,
[NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
assertBlock(messageIndex == 4,
[NSString stringWithFormat:@"Received %@ responses instead of 4.",
@(messageIndex)]);
[expectation fulfill];
}
writeMessageCallback:^{
writeMessageCount++;
}]
callOptions:options];
weakCall = call;
[call start];
[call receiveNextMessage];
[call writeMessage:request];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
assertBlock(startCount == 1, [NSString stringWithFormat:@"%@", @(startCount)]);
assertBlock(writeDataCount == 4, [NSString stringWithFormat:@"%@", @(writeDataCount)]);
assertBlock(finishCount == 1, [NSString stringWithFormat:@"%@", @(finishCount)]);
assertBlock(receiveNextMessageCount == 4,
[NSString stringWithFormat:@"%@", @(receiveNextMessageCount)]);
assertBlock(responseHeaderCount == 1,
[NSString stringWithFormat:@"%@", @(responseHeaderCount)]);
assertBlock(responseDataCount == 4, [NSString stringWithFormat:@"%@", @(responseDataCount)]);
assertBlock(responseCloseCount == 1, [NSString stringWithFormat:@"%@", @(responseCloseCount)]);
assertBlock(didWriteDataCount == 4, [NSString stringWithFormat:@"%@", @(didWriteDataCount)]);
assertBlock(writeMessageCount == 4, [NSString stringWithFormat:@"%@", @(writeMessageCount)]);
});
}
// Chain a default interceptor and a hook interceptor which, after one write, cancels the call
// under the hood but forward further data to the user.
- (void)testHijackingInterceptor {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
NSUInteger kCancelAfterWrites = 1;
__weak XCTestExpectation *expectUserCallComplete =
[self expectationWithDescription:@"User call completed."];
__weak XCTestExpectation *expectResponseCallbackComplete =
[self expectationWithDescription:@"Hook interceptor response callback completed"];
NSArray *responses = @[ @1, @2, @3, @4 ];
__block int index = 0;
__block NSUInteger startCount = 0;
__block NSUInteger writeDataCount = 0;
__block NSUInteger finishCount = 0;
__block NSUInteger responseHeaderCount = 0;
__block NSUInteger responseDataCount = 0;
__block NSUInteger responseCloseCount = 0;
id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
startHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
GRPCInterceptorManager *manager) {
startCount++;
[manager startNextInterceptorWithRequest:[requestOptions copy]
callOptions:[callOptions copy]];
}
writeDataHook:^(id data, GRPCInterceptorManager *manager) {
writeDataCount++;
if (index < kCancelAfterWrites) {
[manager writeNextInterceptorWithData:data];
} else if (index == kCancelAfterWrites) {
[manager cancelNextInterceptor];
[manager forwardPreviousInterceptorWithData:[[RMTStreamingOutputCallResponse
messageWithPayloadSize:responses[index]]
data]];
} else { // (index > kCancelAfterWrites)
[manager forwardPreviousInterceptorWithData:[[RMTStreamingOutputCallResponse
messageWithPayloadSize:responses[index]]
data]];
}
}
finishHook:^(GRPCInterceptorManager *manager) {
finishCount++;
// finish must happen after the hijacking, so directly reply with a close
[manager forwardPreviousInterceptorCloseWithTrailingMetadata:@{@"grpc-status" : @"0"}
error:nil];
[manager shutDown];
}
receiveNextMessagesHook:nil
responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
responseHeaderCount++;
[manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
}
responseDataHook:^(id data, GRPCInterceptorManager *manager) {
responseDataCount++;
[manager forwardPreviousInterceptorWithData:data];
}
responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
GRPCInterceptorManager *manager) {
responseCloseCount++;
// since we canceled the call, it should return cancel error
XCTAssertNil(trailingMetadata);
XCTAssertNotNil(error);
XCTAssertEqual(error.code, GRPC_STATUS_CANCELLED);
[expectResponseCallbackComplete fulfill];
}
didWriteDataHook:nil];
NSArray *requests = @[ @1, @2, @3, @4 ];
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
options.interceptorFactories = @[ [[DefaultInterceptorFactory alloc] init], factory ];
__weak __block GRPCStreamingProtoCall *weakCall;
GRPCStreamingProtoCall *call = [service
fullDuplexCallWithResponseHandler:
[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
GRPCStreamingProtoCall *localCall = weakCall;
if (localCall == nil) {
return;
}
assertBlock(index < 4, @"More than 4 responses received.");
id expected =
[RMTStreamingOutputCallResponse messageWithPayloadSize:responses[index]];
assertBlock([message isEqual:expected],
[NSString stringWithFormat:@"message %@ not equal to expected %@",
message, expected]);
index += 1;
if (index < 4) {
id request =
[RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
[localCall writeMessage:request];
[localCall receiveNextMessage];
} else {
[self waitForExpectations:@[ expectResponseCallbackComplete ]
timeout:GRPCInteropTestTimeoutDefault];
[localCall finish];
}
}
closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
if (weakCall == nil) {
return;
}
assertBlock(
error == nil,
[NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
assertBlock(
index == 4,
[NSString stringWithFormat:@"Received %@ responses instead of 4.", @(index)]);
[expectUserCallComplete fulfill];
}]
callOptions:options];
weakCall = call;
[call start];
[call receiveNextMessage];
[call writeMessage:request];
waiterBlock(@[ expectUserCallComplete ], GRPCInteropTestTimeoutDefault);
assertBlock(startCount == 1, [NSString stringWithFormat:@"%@", @(startCount)]);
assertBlock(writeDataCount == 4, [NSString stringWithFormat:@"%@", @(writeDataCount)]);
assertBlock(finishCount == 1, [NSString stringWithFormat:@"%@", @(finishCount)]);
assertBlock(responseHeaderCount == 1,
[NSString stringWithFormat:@"%@", @(responseHeaderCount)]);
assertBlock(responseDataCount == 1, [NSString stringWithFormat:@"%@", @(responseDataCount)]);
assertBlock(responseCloseCount == 1, [NSString stringWithFormat:@"%@", @(responseCloseCount)]);
});
}
- (void)testGlobalInterceptor {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation =
[self expectationWithDescription:@"testGlobalInterceptor"];
__block NSUInteger startCount = 0;
__block NSUInteger writeDataCount = 0;
__block NSUInteger finishCount = 0;
__block NSUInteger receiveNextMessageCount = 0;
__block NSUInteger responseHeaderCount = 0;
__block NSUInteger responseDataCount = 0;
__block NSUInteger responseCloseCount = 0;
__block NSUInteger didWriteDataCount = 0;
[globalInterceptorFactory
setStartHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
GRPCInterceptorManager *manager) {
startCount++;
XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
[manager startNextInterceptorWithRequest:[requestOptions copy]
callOptions:[callOptions copy]];
}
writeDataHook:^(id data, GRPCInterceptorManager *manager) {
writeDataCount++;
[manager writeNextInterceptorWithData:data];
}
finishHook:^(GRPCInterceptorManager *manager) {
finishCount++;
[manager finishNextInterceptor];
}
receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
receiveNextMessageCount++;
[manager receiveNextInterceptorMessages:numberOfMessages];
}
responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
responseHeaderCount++;
[manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
}
responseDataHook:^(id data, GRPCInterceptorManager *manager) {
responseDataCount++;
[manager forwardPreviousInterceptorWithData:data];
}
responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
GRPCInterceptorManager *manager) {
responseCloseCount++;
[manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
error:error];
}
didWriteDataHook:^(GRPCInterceptorManager *manager) {
didWriteDataCount++;
[manager forwardPreviousInterceptorDidWriteData];
}];
NSArray *requests = @[ @1, @2, @3, @4 ];
NSArray *responses = @[ @1, @2, @3, @4 ];
__block int index = 0;
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
options.flowControlEnabled = YES;
globalInterceptorFactory.enabled = YES;
__block int writeMessageCount = 0;
__weak __block GRPCStreamingProtoCall *weakCall;
__block GRPCStreamingProtoCall *call = [service
fullDuplexCallWithResponseHandler:
[[InteropTestsBlockCallbacks alloc] initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
GRPCStreamingProtoCall *localCall = weakCall;
if (localCall == nil) {
return;
}
assertBlock(index < 4, @"More than 4 responses received.");
index += 1;
if (index < 4) {
id request =
[RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
[localCall writeMessage:request];
[localCall receiveNextMessage];
} else {
[localCall finish];
}
}
closeCallback:^(NSDictionary *trailingMetadata, NSError *error) {
if (weakCall == nil) {
return;
}
assertBlock(
error == nil,
[NSString stringWithFormat:@"Finished with unexpected error: %@", error]);
[expectation fulfill];
}
writeMessageCallback:^{
writeMessageCount++;
}]
callOptions:options];
weakCall = call;
[call start];
[call receiveNextMessage];
[call writeMessage:request];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
assertBlock(startCount == 1, [NSString stringWithFormat:@"%@", @(startCount)]);
assertBlock(writeDataCount == 4, [NSString stringWithFormat:@"%@", @(writeDataCount)]);
assertBlock(finishCount == 1, [NSString stringWithFormat:@"%@", @(finishCount)]);
assertBlock(receiveNextMessageCount == 4,
[NSString stringWithFormat:@"%@", @(receiveNextMessageCount)]);
assertBlock(responseHeaderCount == 1,
[NSString stringWithFormat:@"%@", @(responseHeaderCount)]);
assertBlock(responseDataCount == 4, [NSString stringWithFormat:@"%@", @(responseDataCount)]);
assertBlock(responseCloseCount == 1, [NSString stringWithFormat:@"%@", @(responseCloseCount)]);
assertBlock(didWriteDataCount == 4, [NSString stringWithFormat:@"%@", @(didWriteDataCount)]);
assertBlock(writeMessageCount == 4, [NSString stringWithFormat:@"%@", @(writeMessageCount)]);
globalInterceptorFactory.enabled = NO;
});
}
- (void)testConflictingGlobalInterceptors {
id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
startHook:nil
writeDataHook:nil
finishHook:nil
receiveNextMessagesHook:nil
responseHeaderHook:nil
responseDataHook:nil
responseCloseHook:nil
didWriteDataHook:nil];
@try {
[GRPCCall2 registerGlobalInterceptor:factory];
XCTFail(@"Did not receive an exception when registering global interceptor the second time");
} @catch (NSException *exception) {
// Do nothing; test passes
}
}
- (void)testInterceptorAndGlobalInterceptor {
GRPCTestRunWithFlakeRepeats(self, ^(GRPCTestWaiter waiterBlock, GRPCTestAssert assertBlock) {
RMTTestService *service = [RMTTestService serviceWithHost:[[self class] host]];
__weak XCTestExpectation *expectation =
[self expectationWithDescription:@"testInterceptorAndGlobalInterceptor"];
__block NSUInteger startCount = 0;
__block NSUInteger writeDataCount = 0;
__block NSUInteger finishCount = 0;
__block NSUInteger receiveNextMessageCount = 0;
__block NSUInteger responseHeaderCount = 0;
__block NSUInteger responseDataCount = 0;
__block NSUInteger responseCloseCount = 0;
__block NSUInteger didWriteDataCount = 0;
id<GRPCInterceptorFactory> factory = [[HookInterceptorFactory alloc]
initWithDispatchQueue:dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL)
startHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
GRPCInterceptorManager *manager) {
startCount++;
XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
[manager startNextInterceptorWithRequest:[requestOptions copy]
callOptions:[callOptions copy]];
}
writeDataHook:^(id data, GRPCInterceptorManager *manager) {
writeDataCount++;
[manager writeNextInterceptorWithData:data];
}
finishHook:^(GRPCInterceptorManager *manager) {
finishCount++;
[manager finishNextInterceptor];
}
receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
receiveNextMessageCount++;
[manager receiveNextInterceptorMessages:numberOfMessages];
}
responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
responseHeaderCount++;
[manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
}
responseDataHook:^(id data, GRPCInterceptorManager *manager) {
responseDataCount++;
[manager forwardPreviousInterceptorWithData:data];
}
responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
GRPCInterceptorManager *manager) {
responseCloseCount++;
[manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
error:error];
}
didWriteDataHook:^(GRPCInterceptorManager *manager) {
didWriteDataCount++;
[manager forwardPreviousInterceptorDidWriteData];
}];
__block NSUInteger globalStartCount = 0;
__block NSUInteger globalWriteDataCount = 0;
__block NSUInteger globalFinishCount = 0;
__block NSUInteger globalReceiveNextMessageCount = 0;
__block NSUInteger globalResponseHeaderCount = 0;
__block NSUInteger globalResponseDataCount = 0;
__block NSUInteger globalResponseCloseCount = 0;
__block NSUInteger globalDidWriteDataCount = 0;
[globalInterceptorFactory
setStartHook:^(GRPCRequestOptions *requestOptions, GRPCCallOptions *callOptions,
GRPCInterceptorManager *manager) {
globalStartCount++;
XCTAssertEqualObjects(requestOptions.host, [[self class] host]);
XCTAssertEqualObjects(requestOptions.path, @"/grpc.testing.TestService/FullDuplexCall");
XCTAssertEqual(requestOptions.safety, GRPCCallSafetyDefault);
[manager startNextInterceptorWithRequest:[requestOptions copy]
callOptions:[callOptions copy]];
}
writeDataHook:^(id data, GRPCInterceptorManager *manager) {
globalWriteDataCount++;
[manager writeNextInterceptorWithData:data];
}
finishHook:^(GRPCInterceptorManager *manager) {
globalFinishCount++;
[manager finishNextInterceptor];
}
receiveNextMessagesHook:^(NSUInteger numberOfMessages, GRPCInterceptorManager *manager) {
globalReceiveNextMessageCount++;
[manager receiveNextInterceptorMessages:numberOfMessages];
}
responseHeaderHook:^(NSDictionary *initialMetadata, GRPCInterceptorManager *manager) {
globalResponseHeaderCount++;
[manager forwardPreviousInterceptorWithInitialMetadata:initialMetadata];
}
responseDataHook:^(id data, GRPCInterceptorManager *manager) {
globalResponseDataCount++;
[manager forwardPreviousInterceptorWithData:data];
}
responseCloseHook:^(NSDictionary *trailingMetadata, NSError *error,
GRPCInterceptorManager *manager) {
globalResponseCloseCount++;
[manager forwardPreviousInterceptorCloseWithTrailingMetadata:trailingMetadata
error:error];
}
didWriteDataHook:^(GRPCInterceptorManager *manager) {
globalDidWriteDataCount++;
[manager forwardPreviousInterceptorDidWriteData];
}];
NSArray *requests = @[ @1, @2, @3, @4 ];
NSArray *responses = @[ @1, @2, @3, @4 ];
__block int index = 0;
id request = [RMTStreamingOutputCallRequest messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
GRPCMutableCallOptions *options = [[GRPCMutableCallOptions alloc] init];
// For backwards compatibility
options.transportType = [[self class] transportType];
options.transport = [[self class] transport];
options.PEMRootCertificates = [[self class] PEMRootCertificates];
options.hostNameOverride = [[self class] hostNameOverride];
options.flowControlEnabled = YES;
options.interceptorFactories = @[ factory ];
globalInterceptorFactory.enabled = YES;
__block int writeMessageCount = 0;
__weak __block GRPCStreamingProtoCall *weakCall;
GRPCStreamingProtoCall *call = [service
fullDuplexCallWithResponseHandler:[[InteropTestsBlockCallbacks alloc]
initWithInitialMetadataCallback:nil
messageCallback:^(id message) {
GRPCStreamingProtoCall *localCall = weakCall;
if (localCall == nil) {
return;
}
index += 1;
if (index < 4) {
id request = [RMTStreamingOutputCallRequest
messageWithPayloadSize:requests[index]
requestedResponseSize:responses[index]];
[localCall writeMessage:request];
[localCall receiveNextMessage];
} else {
[localCall finish];
}
}
closeCallback:^(NSDictionary *trailingMetadata,
NSError *error) {
if (weakCall == nil) {
return;
}
[expectation fulfill];
}
writeMessageCallback:^{
writeMessageCount++;
}]
callOptions:options];
weakCall = call;
[call start];
[call receiveNextMessage];
[call writeMessage:request];
waiterBlock(@[ expectation ], GRPCInteropTestTimeoutDefault);
assertBlock(startCount == 1, [NSString stringWithFormat:@"%@", @(startCount)]);
assertBlock(writeDataCount == 4, [NSString stringWithFormat:@"%@", @(writeDataCount)]);
assertBlock(finishCount == 1, [NSString stringWithFormat:@"%@", @(finishCount)]);
assertBlock(receiveNextMessageCount == 4,
[NSString stringWithFormat:@"%@", @(receiveNextMessageCount)]);
assertBlock(responseHeaderCount == 1,
[NSString stringWithFormat:@"%@", @(responseHeaderCount)]);
assertBlock(responseDataCount == 4, [NSString stringWithFormat:@"%@", @(responseDataCount)]);
assertBlock(responseCloseCount == 1, [NSString stringWithFormat:@"%@", @(responseCloseCount)]);
assertBlock(didWriteDataCount == 4, [NSString stringWithFormat:@"%@", @(didWriteDataCount)]);
assertBlock(globalStartCount == 1, [NSString stringWithFormat:@"%@", @(globalStartCount)]);
assertBlock(globalWriteDataCount == 4,
[NSString stringWithFormat:@"%@", @(globalWriteDataCount)]);
assertBlock(globalFinishCount == 1, [NSString stringWithFormat:@"%@", @(globalFinishCount)]);
assertBlock(globalReceiveNextMessageCount == 4,
[NSString stringWithFormat:@"%@", @(globalReceiveNextMessageCount)]);
assertBlock(globalResponseHeaderCount == 1,
[NSString stringWithFormat:@"%@", @(globalResponseHeaderCount)]);
assertBlock(globalResponseDataCount == 4,
[NSString stringWithFormat:@"%@", @(globalResponseDataCount)]);
assertBlock(globalResponseCloseCount == 1,
[NSString stringWithFormat:@"%@", @(globalResponseCloseCount)]);
assertBlock(globalDidWriteDataCount == 4,
[NSString stringWithFormat:@"%@", @(globalDidWriteDataCount)]);
assertBlock(writeMessageCount == 4, [NSString stringWithFormat:@"%@", @(writeMessageCount)]);
globalInterceptorFactory.enabled = NO;
});
}
@end