blob: 718788f994484fdb297589398d54fe742b85452d [file] [log] [blame]
Alan Viverette3da604b2020-06-10 18:34:39 +00001/*
2 * Copyright (C) 2019 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package com.android.internal.infra;
18
19import static java.util.concurrent.TimeUnit.SECONDS;
20
21import android.os.AsyncTask;
22import android.os.ParcelFileDescriptor;
23
24import com.android.internal.util.FunctionalUtils.ThrowingConsumer;
25import com.android.internal.util.FunctionalUtils.ThrowingFunction;
26
27import libcore.io.IoUtils;
28
29import java.io.ByteArrayOutputStream;
30import java.io.Closeable;
31import java.io.IOException;
32import java.io.InputStream;
33import java.io.OutputStream;
34import java.util.concurrent.Executor;
35
36/**
37 * Utility class for streaming bytes across IPC, using standard APIs such as
38 * {@link InputStream}/{@link OutputStream} or simply {@code byte[]}
39 *
40 * <p>
41 * To use this, you'll want to declare your IPC methods to accept a {@link ParcelFileDescriptor},
42 * and call them from within lambdas passed to {@link #receiveBytes}/{@link #sendBytes},
43 * passing on the provided {@link ParcelFileDescriptor}.
44 *
45 * <p>
46 * E.g.:
47 * {@code
48 * //IFoo.aidl
49 * oneway interface IFoo {
50 * void sendGreetings(in ParcelFileDescriptor pipe);
51 * void receiveGreetings(in ParcelFileDescriptor pipe);
52 * }
53 *
54 * //Foo.java
55 * mServiceConnector.postAsync(service -> RemoteStream.sendBytes(
56 * pipe -> service.sendGreetings(pipe, greetings)))...
57 *
58 * mServiceConnector.postAsync(service -> RemoteStream.receiveBytes(
59 * pipe -> service.receiveGreetings(pipe)))
60 * .whenComplete((greetings, err) -> ...);
61 * }
62 *
63 * <p>
64 * Each operation has a 30 second timeout by default, as it's possible for an operation to be
65 * stuck forever otherwise.
66 * You can {@link #cancelTimeout cancel} and/or {@link #orTimeout set a custom timeout}, using the
67 * {@link AndroidFuture} you get as a result.
68 *
69 * <p>
70 * You can also {@link #cancel} the operation, which will result in closing the underlying
71 * {@link ParcelFileDescriptor}.
72 *
73 * @see #sendBytes
74 * @see #receiveBytes
75 *
76 * @param <RES> the result of a successful streaming.
77 * @param <IOSTREAM> either {@link InputStream} or {@link OutputStream} depending on the direction.
78 */
79public abstract class RemoteStream<RES, IOSTREAM extends Closeable>
80 extends AndroidFuture<RES>
81 implements Runnable {
82
83 private final ThrowingFunction<IOSTREAM, RES> mHandleStream;
84 private volatile ParcelFileDescriptor mLocalPipe;
85
86 /**
87 * Call an IPC, and process incoming bytes as an {@link InputStream} within {@code read}.
88 *
89 * @param ipc action to perform the IPC. Called directly on the calling thread.
90 * @param read action to read from an {@link InputStream}, transforming data into {@code R}.
91 * Called asynchronously on the background thread.
92 * @param <R> type of the end result of reading the bytes (if any).
93 * @return an {@link AndroidFuture} that can be used to track operation's completion and
94 * retrieve its result (if any).
95 */
96 public static <R> AndroidFuture<R> receiveBytes(
97 ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingFunction<InputStream, R> read) {
98 return new RemoteStream<R, InputStream>(
99 ipc, read, AsyncTask.THREAD_POOL_EXECUTOR, true /* read */) {
100 @Override
101 protected InputStream createStream(ParcelFileDescriptor fd) {
102 return new ParcelFileDescriptor.AutoCloseInputStream(fd);
103 }
104 };
105 }
106
107 /**
108 * Call an IPC, and asynchronously return incoming bytes as {@code byte[]}.
109 *
110 * @param ipc action to perform the IPC. Called directly on the calling thread.
111 * @return an {@link AndroidFuture} that can be used to track operation's completion and
112 * retrieve its result.
113 */
114 public static AndroidFuture<byte[]> receiveBytes(ThrowingConsumer<ParcelFileDescriptor> ipc) {
115 return receiveBytes(ipc, RemoteStream::readAll);
116 }
117
118 /**
119 * Convert a given {@link InputStream} into {@code byte[]}.
120 *
121 * <p>
122 * This doesn't close the given {@link InputStream}
123 */
124 public static byte[] readAll(InputStream inputStream) throws IOException {
125 ByteArrayOutputStream combinedBuffer = new ByteArrayOutputStream();
126 byte[] buffer = new byte[16 * 1024];
127 while (true) {
128 int numRead = inputStream.read(buffer);
129 if (numRead == -1) {
130 break;
131 }
132 combinedBuffer.write(buffer, 0, numRead);
133 }
134 return combinedBuffer.toByteArray();
135 }
136
137 /**
138 * Call an IPC, and perform sending bytes via an {@link OutputStream} within {@code write}.
139 *
140 * @param ipc action to perform the IPC. Called directly on the calling thread.
141 * @param write action to write to an {@link OutputStream}, optionally returning operation
142 * result as {@code R}. Called asynchronously on the background thread.
143 * @param <R> type of the end result of writing the bytes (if any).
144 * @return an {@link AndroidFuture} that can be used to track operation's completion and
145 * retrieve its result (if any).
146 */
147 public static <R> AndroidFuture<R> sendBytes(
148 ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingFunction<OutputStream, R> write) {
149 return new RemoteStream<R, OutputStream>(
150 ipc, write, AsyncTask.THREAD_POOL_EXECUTOR, false /* read */) {
151 @Override
152 protected OutputStream createStream(ParcelFileDescriptor fd) {
153 return new ParcelFileDescriptor.AutoCloseOutputStream(fd);
154 }
155 };
156 }
157
158 /**
159 * Same as {@link #sendBytes(ThrowingConsumer, ThrowingFunction)}, but explicitly avoids
160 * returning a result.
161 */
162 public static AndroidFuture<Void> sendBytes(
163 ThrowingConsumer<ParcelFileDescriptor> ipc, ThrowingConsumer<OutputStream> write) {
164 return sendBytes(ipc, os -> {
165 write.acceptOrThrow(os);
166 return null;
167 });
168 }
169
170 /**
171 * Same as {@link #sendBytes(ThrowingConsumer, ThrowingFunction)}, but providing the data to
172 * send eagerly as {@code byte[]}.
173 */
174 public static AndroidFuture<Void> sendBytes(
175 ThrowingConsumer<ParcelFileDescriptor> ipc, byte[] data) {
176 return sendBytes(ipc, os -> {
177 os.write(data);
178 return null;
179 });
180 }
181
182 private RemoteStream(
183 ThrowingConsumer<ParcelFileDescriptor> ipc,
184 ThrowingFunction<IOSTREAM, RES> handleStream,
185 Executor backgroundExecutor,
186 boolean read) {
187 mHandleStream = handleStream;
188
189 ParcelFileDescriptor[] pipe;
190 try {
191 //TODO consider using createReliablePipe
192 pipe = ParcelFileDescriptor.createPipe();
193 try (ParcelFileDescriptor remotePipe = pipe[read ? 1 : 0]) {
194 ipc.acceptOrThrow(remotePipe);
195 // Remote pipe end is duped by binder call. Local copy is not needed anymore
196 }
197
198 mLocalPipe = pipe[read ? 0 : 1];
199 backgroundExecutor.execute(this);
200
201 // Guard against getting stuck forever
202 orTimeout(30, SECONDS);
203 } catch (Throwable e) {
204 completeExceptionally(e);
205 // mLocalPipe closes in #onCompleted
206 }
207 }
208
209 protected abstract IOSTREAM createStream(ParcelFileDescriptor fd);
210
211 @Override
212 public void run() {
213 try (IOSTREAM stream = createStream(mLocalPipe)) {
214 complete(mHandleStream.applyOrThrow(stream));
215 } catch (Throwable t) {
216 completeExceptionally(t);
217 }
218 }
219
220 @Override
221 protected void onCompleted(RES res, Throwable err) {
222 super.onCompleted(res, err);
223 IoUtils.closeQuietly(mLocalPipe);
224 }
225}