| 12345678910111213141516171819202122232425262728293031323334353637383940414243 |
- # mypy: allow-untyped-defs
- import io
- import multiprocessing.queues
- import pickle
- from multiprocessing.reduction import ForkingPickler
- class ConnectionWrapper:
- """Proxy class for _multiprocessing.Connection which uses ForkingPickler for object serialization."""
- def __init__(self, conn):
- self.conn = conn
- def send(self, obj):
- buf = io.BytesIO()
- ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
- self.send_bytes(buf.getvalue())
- def recv(self):
- buf = self.recv_bytes()
- return pickle.loads(buf)
- def __getattr__(self, name):
- if "conn" in self.__dict__:
- return getattr(self.conn, name)
- raise AttributeError(f"'{type(self).__name__}' object has no attribute 'conn'")
- class Queue(multiprocessing.queues.Queue):
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self._reader: ConnectionWrapper = ConnectionWrapper(self._reader)
- self._writer: ConnectionWrapper = ConnectionWrapper(self._writer)
- self._send = self._writer.send
- self._recv = self._reader.recv
- class SimpleQueue(multiprocessing.queues.SimpleQueue):
- def _make_methods(self):
- if not isinstance(self._reader, ConnectionWrapper):
- self._reader: ConnectionWrapper = ConnectionWrapper(self._reader)
- self._writer: ConnectionWrapper = ConnectionWrapper(self._writer)
- super()._make_methods() # type: ignore[misc]
|