| import multiprocessing.pool |
| import multiprocessing.util as util |
| |
| from .queue import SimpleQueue |
| |
| |
| def clean_worker(*args, **kwargs): |
| import gc |
| multiprocessing.pool.worker(*args, **kwargs) |
| # Regular multiprocessing workers don't fully clean up after themselves, |
| # so we have to explicitly trigger garbage collection to make sure that all |
| # destructors are called... |
| gc.collect() |
| |
| |
| class Pool(multiprocessing.pool.Pool): |
| """Pool implementation which uses our version of SimpleQueue. |
| This lets us pass tensors in shared memory across processes instead of |
| serializing the underlying data.""" |
| |
| def _setup_queues(self): |
| self._inqueue = SimpleQueue() |
| self._outqueue = SimpleQueue() |
| self._quick_put = self._inqueue._writer.send |
| self._quick_get = self._outqueue._reader.recv |
| |
| def _repopulate_pool(self): |
| """Bring the number of pool processes up to the specified number, |
| for use after reaping workers which have exited. |
| """ |
| for i in range(self._processes - len(self._pool)): |
| # changed worker -> clean_worker |
| args = (self._inqueue, self._outqueue, |
| self._initializer, |
| self._initargs, self._maxtasksperchild) |
| if hasattr(self, '_wrap_exception'): |
| args += (self._wrap_exception,) |
| w = self.Process(target=clean_worker, args=args) |
| self._pool.append(w) |
| w.name = w.name.replace('Process', 'PoolWorker') |
| w.daemon = True |
| w.start() |
| util.debug('added worker') |