pool.py

import multiprocessing.pool
import multiprocessing.util as util

from .queue import SimpleQueue


def clean_worker(*args, **kwargs):
    import gc

    multiprocessing.pool.worker(*args, **kwargs)
    # Regular multiprocessing workers don't fully clean up after themselves,
    # so we have to explicitly trigger garbage collection to make sure that all
    # destructors are called...
    gc.collect()


class Pool(multiprocessing.pool.Pool):
    """Pool implementation which uses our version of SimpleQueue.

    This lets us pass tensors in shared memory across processes instead of
    serializing the underlying data.
    """

    def _setup_queues(self):
        self._inqueue = SimpleQueue()
        self._outqueue = SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

    def _repopulate_pool(self):
        """Increase the number of pool processes to the specified number.

        Bring the number of pool processes up to the specified number, for use
        after reaping workers which have exited.
        """
        for i in range(self._processes - len(self._pool)):
            # changed worker -> clean_worker
            args = (
                self._inqueue,
                self._outqueue,
                self._initializer,
                self._initargs,
                self._maxtasksperchild,
            )
            if hasattr(self, "_wrap_exception"):
                args += (self._wrap_exception,)
            w = self.Process(target=clean_worker, args=args)
            self._pool.append(w)
            w.name = w.name.replace("Process", "PoolWorker")
            w.daemon = True
            w.start()
            util.debug("added worker")
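

# Usage sketch (not part of the original module): a minimal illustration of how
# this Pool can stand in for multiprocessing.pool.Pool. It assumes the module
# lives inside a package that also provides the shared-memory SimpleQueue
# imported above, so it has to be run as a package module (e.g. via
# ``python -m <package>.pool``); the ``_square`` helper is purely illustrative.
def _square(x):
    # Tasks submitted through Pool.map are executed by clean_worker, which runs
    # the regular worker loop and then forces a gc.collect().
    return x * x


if __name__ == "__main__":
    # Pool subclasses multiprocessing.pool.Pool, so the familiar map/close/join
    # API applies; results come back through the SimpleQueue-backed out-queue.
    with Pool(processes=2) as pool:
        print(pool.map(_square, range(8)))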