blob: cd10231f710f1139590545b3b9e3bd88c12544f2 [file] [log] [blame]
Haibo Huangd8830302020-03-03 10:09:46 -08001__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
2
3import subprocess
Haibo Huangd8830302020-03-03 10:09:46 -08004
5from . import events
6from . import protocols
7from . import streams
8from . import tasks
9from .log import logger
10
11
12PIPE = subprocess.PIPE
13STDOUT = subprocess.STDOUT
14DEVNULL = subprocess.DEVNULL
15
16
17class SubprocessStreamProtocol(streams.FlowControlMixin,
18 protocols.SubprocessProtocol):
19 """Like StreamReaderProtocol, but for a subprocess."""
20
21 def __init__(self, limit, loop):
22 super().__init__(loop=loop)
23 self._limit = limit
24 self.stdin = self.stdout = self.stderr = None
25 self._transport = None
26 self._process_exited = False
27 self._pipe_fds = []
28 self._stdin_closed = self._loop.create_future()
29
30 def __repr__(self):
31 info = [self.__class__.__name__]
32 if self.stdin is not None:
33 info.append(f'stdin={self.stdin!r}')
34 if self.stdout is not None:
35 info.append(f'stdout={self.stdout!r}')
36 if self.stderr is not None:
37 info.append(f'stderr={self.stderr!r}')
38 return '<{}>'.format(' '.join(info))
39
40 def connection_made(self, transport):
41 self._transport = transport
42
43 stdout_transport = transport.get_pipe_transport(1)
44 if stdout_transport is not None:
45 self.stdout = streams.StreamReader(limit=self._limit,
46 loop=self._loop)
47 self.stdout.set_transport(stdout_transport)
48 self._pipe_fds.append(1)
49
50 stderr_transport = transport.get_pipe_transport(2)
51 if stderr_transport is not None:
52 self.stderr = streams.StreamReader(limit=self._limit,
53 loop=self._loop)
54 self.stderr.set_transport(stderr_transport)
55 self._pipe_fds.append(2)
56
57 stdin_transport = transport.get_pipe_transport(0)
58 if stdin_transport is not None:
59 self.stdin = streams.StreamWriter(stdin_transport,
60 protocol=self,
61 reader=None,
62 loop=self._loop)
63
64 def pipe_data_received(self, fd, data):
65 if fd == 1:
66 reader = self.stdout
67 elif fd == 2:
68 reader = self.stderr
69 else:
70 reader = None
71 if reader is not None:
72 reader.feed_data(data)
73
74 def pipe_connection_lost(self, fd, exc):
75 if fd == 0:
76 pipe = self.stdin
77 if pipe is not None:
78 pipe.close()
79 self.connection_lost(exc)
80 if exc is None:
81 self._stdin_closed.set_result(None)
82 else:
83 self._stdin_closed.set_exception(exc)
84 return
85 if fd == 1:
86 reader = self.stdout
87 elif fd == 2:
88 reader = self.stderr
89 else:
90 reader = None
91 if reader is not None:
92 if exc is None:
93 reader.feed_eof()
94 else:
95 reader.set_exception(exc)
96
97 if fd in self._pipe_fds:
98 self._pipe_fds.remove(fd)
99 self._maybe_close_transport()
100
101 def process_exited(self):
102 self._process_exited = True
103 self._maybe_close_transport()
104
105 def _maybe_close_transport(self):
106 if len(self._pipe_fds) == 0 and self._process_exited:
107 self._transport.close()
108 self._transport = None
109
110 def _get_close_waiter(self, stream):
111 if stream is self.stdin:
112 return self._stdin_closed
113
114
115class Process:
116 def __init__(self, transport, protocol, loop):
117 self._transport = transport
118 self._protocol = protocol
119 self._loop = loop
120 self.stdin = protocol.stdin
121 self.stdout = protocol.stdout
122 self.stderr = protocol.stderr
123 self.pid = transport.get_pid()
124
125 def __repr__(self):
126 return f'<{self.__class__.__name__} {self.pid}>'
127
128 @property
129 def returncode(self):
130 return self._transport.get_returncode()
131
132 async def wait(self):
133 """Wait until the process exit and return the process return code."""
134 return await self._transport._wait()
135
136 def send_signal(self, signal):
137 self._transport.send_signal(signal)
138
139 def terminate(self):
140 self._transport.terminate()
141
142 def kill(self):
143 self._transport.kill()
144
145 async def _feed_stdin(self, input):
146 debug = self._loop.get_debug()
147 self.stdin.write(input)
148 if debug:
149 logger.debug(
150 '%r communicate: feed stdin (%s bytes)', self, len(input))
151 try:
152 await self.stdin.drain()
153 except (BrokenPipeError, ConnectionResetError) as exc:
154 # communicate() ignores BrokenPipeError and ConnectionResetError
155 if debug:
156 logger.debug('%r communicate: stdin got %r', self, exc)
157
158 if debug:
159 logger.debug('%r communicate: close stdin', self)
160 self.stdin.close()
161
162 async def _noop(self):
163 return None
164
165 async def _read_stream(self, fd):
166 transport = self._transport.get_pipe_transport(fd)
167 if fd == 2:
168 stream = self.stderr
169 else:
170 assert fd == 1
171 stream = self.stdout
172 if self._loop.get_debug():
173 name = 'stdout' if fd == 1 else 'stderr'
174 logger.debug('%r communicate: read %s', self, name)
175 output = await stream.read()
176 if self._loop.get_debug():
177 name = 'stdout' if fd == 1 else 'stderr'
178 logger.debug('%r communicate: close %s', self, name)
179 transport.close()
180 return output
181
182 async def communicate(self, input=None):
183 if input is not None:
184 stdin = self._feed_stdin(input)
185 else:
186 stdin = self._noop()
187 if self.stdout is not None:
188 stdout = self._read_stream(1)
189 else:
190 stdout = self._noop()
191 if self.stderr is not None:
192 stderr = self._read_stream(2)
193 else:
194 stderr = self._noop()
Yi Kong71199322022-08-30 15:53:45 +0800195 stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
Haibo Huangd8830302020-03-03 10:09:46 -0800196 await self.wait()
197 return (stdout, stderr)
198
199
200async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
Yi Kong71199322022-08-30 15:53:45 +0800201 limit=streams._DEFAULT_LIMIT, **kwds):
202 loop = events.get_running_loop()
Haibo Huangd8830302020-03-03 10:09:46 -0800203 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
204 loop=loop)
205 transport, protocol = await loop.subprocess_shell(
206 protocol_factory,
207 cmd, stdin=stdin, stdout=stdout,
208 stderr=stderr, **kwds)
209 return Process(transport, protocol, loop)
210
211
212async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
Yi Kong71199322022-08-30 15:53:45 +0800213 stderr=None, limit=streams._DEFAULT_LIMIT,
214 **kwds):
215 loop = events.get_running_loop()
Haibo Huangd8830302020-03-03 10:09:46 -0800216 protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
217 loop=loop)
218 transport, protocol = await loop.subprocess_exec(
219 protocol_factory,
220 program, *args,
221 stdin=stdin, stdout=stdout,
222 stderr=stderr, **kwds)
223 return Process(transport, protocol, loop)