Skip to content

Update multiprocess from 3.13.5 #6000

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions Lib/multiprocessing/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import tempfile
import itertools

import _multiprocessing

from . import util

Expand All @@ -28,6 +27,7 @@
_ForkingPickler = reduction.ForkingPickler

try:
import _multiprocessing
import _winapi
from _winapi import WAIT_OBJECT_0, WAIT_ABANDONED_0, WAIT_TIMEOUT, INFINITE
except ImportError:
Expand Down Expand Up @@ -846,7 +846,7 @@ def PipeClient(address):
_LEGACY_LENGTHS = (_MD5ONLY_MESSAGE_LENGTH, _MD5_DIGEST_LEN)


def _get_digest_name_and_payload(message: bytes) -> (str, bytes):
def _get_digest_name_and_payload(message): # type: (bytes) -> tuple[str, bytes]
"""Returns a digest name and the payload for a response hash.

If a legacy protocol is detected based on the message length
Expand Down Expand Up @@ -956,7 +956,7 @@ def answer_challenge(connection, authkey: bytes):
f'Protocol error, expected challenge: {message=}')
message = message[len(_CHALLENGE):]
if len(message) < _MD5ONLY_MESSAGE_LENGTH:
raise AuthenticationError('challenge too short: {len(message)} bytes')
raise AuthenticationError(f'challenge too short: {len(message)} bytes')
digest = _create_response(authkey, message)
connection.send_bytes(digest)
response = connection.recv_bytes(256) # reject large message
Expand Down Expand Up @@ -1012,8 +1012,20 @@ def _exhaustive_wait(handles, timeout):
# returning the first signalled might create starvation issues.)
L = list(handles)
ready = []
# Windows limits WaitForMultipleObjects at 64 handles, and we use a
# few for synchronisation, so we switch to batched waits at 60.
if len(L) > 60:
try:
res = _winapi.BatchedWaitForMultipleObjects(L, False, timeout)
except TimeoutError:
return []
ready.extend(L[i] for i in res)
if res:
L = [h for i, h in enumerate(L) if i > res[0] & i not in res]
timeout = 0
while L:
res = _winapi.WaitForMultipleObjects(L, False, timeout)
short_L = L[:60] if len(L) > 60 else L
res = _winapi.WaitForMultipleObjects(short_L, False, timeout)
if res == WAIT_TIMEOUT:
break
elif WAIT_OBJECT_0 <= res < WAIT_OBJECT_0 + len(L):
Expand Down
2 changes: 1 addition & 1 deletion Lib/multiprocessing/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def freeze_support(self):
'''Check whether this is a fake forked process in a frozen executable.
If so then run code specified by commandline and exit.
'''
if sys.platform == 'win32' and getattr(sys, 'frozen', False):
if self.get_start_method() == 'spawn' and getattr(sys, 'frozen', False):
from .spawn import freeze_support
freeze_support()

Expand Down
6 changes: 6 additions & 0 deletions Lib/multiprocessing/forkserver.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import atexit
import errno
import os
import selectors
Expand Down Expand Up @@ -167,6 +168,8 @@ def ensure_running(self):
def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
'''Run forkserver.'''
if preload:
if sys_path is not None:
sys.path[:] = sys_path
if '__main__' in preload and main_path is not None:
process.current_process()._inheriting = True
try:
Expand Down Expand Up @@ -271,13 +274,16 @@ def sigchld_handler(*_unused):
selector.close()
unused_fds = [alive_r, child_w, sig_r, sig_w]
unused_fds.extend(pid_to_fd.values())
atexit._clear()
atexit.register(util._exit_function)
code = _serve_one(child_r, fds,
unused_fds,
old_handlers)
except Exception:
sys.excepthook(*sys.exc_info())
sys.stderr.flush()
finally:
atexit._run_exitfuncs()
os._exit(code)
else:
# Send pid to client process
Expand Down
51 changes: 34 additions & 17 deletions Lib/multiprocessing/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ def dispatch(c, id, methodname, args=(), kwds={}):
kind, result = c.recv()
if kind == '#RETURN':
return result
raise convert_to_error(kind, result)
try:
raise convert_to_error(kind, result)
finally:
del result # break reference cycle

def convert_to_error(kind, result):
if kind == '#ERROR':
Expand Down Expand Up @@ -755,22 +758,29 @@ class BaseProxy(object):
_address_to_local = {}
_mutex = util.ForkAwareThreadLock()

# Each instance gets a `_serial` number. Unlike `id(...)`, this number
# is never reused.
_next_serial = 1

def __init__(self, token, serializer, manager=None,
authkey=None, exposed=None, incref=True, manager_owned=False):
with BaseProxy._mutex:
tls_idset = BaseProxy._address_to_local.get(token.address, None)
if tls_idset is None:
tls_idset = util.ForkAwareLocal(), ProcessLocalSet()
BaseProxy._address_to_local[token.address] = tls_idset
tls_serials = BaseProxy._address_to_local.get(token.address, None)
if tls_serials is None:
tls_serials = util.ForkAwareLocal(), ProcessLocalSet()
BaseProxy._address_to_local[token.address] = tls_serials

self._serial = BaseProxy._next_serial
BaseProxy._next_serial += 1

# self._tls is used to record the connection used by this
# thread to communicate with the manager at token.address
self._tls = tls_idset[0]
self._tls = tls_serials[0]

# self._idset is used to record the identities of all shared
# objects for which the current process owns references and
# self._all_serials is a set used to record the identities of all
# shared objects for which the current process owns references and
# which are in the manager at token.address
self._idset = tls_idset[1]
self._all_serials = tls_serials[1]

self._token = token
self._id = self._token.id
Expand Down Expand Up @@ -833,7 +843,10 @@ def _callmethod(self, methodname, args=(), kwds={}):
conn = self._Client(token.address, authkey=self._authkey)
dispatch(conn, None, 'decref', (token.id,))
return proxy
raise convert_to_error(kind, result)
try:
raise convert_to_error(kind, result)
finally:
del result # break reference cycle

def _getvalue(self):
'''
Expand All @@ -850,20 +863,20 @@ def _incref(self):
dispatch(conn, None, 'incref', (self._id,))
util.debug('INCREF %r', self._token.id)

self._idset.add(self._id)
self._all_serials.add(self._serial)

state = self._manager and self._manager._state

self._close = util.Finalize(
self, BaseProxy._decref,
args=(self._token, self._authkey, state,
self._tls, self._idset, self._Client),
args=(self._token, self._serial, self._authkey, state,
self._tls, self._all_serials, self._Client),
exitpriority=10
)

@staticmethod
def _decref(token, authkey, state, tls, idset, _Client):
idset.discard(token.id)
def _decref(token, serial, authkey, state, tls, idset, _Client):
idset.discard(serial)

# check whether manager is still alive
if state is None or state.value == State.STARTED:
Expand Down Expand Up @@ -1159,15 +1172,19 @@ def __imul__(self, value):
self._callmethod('__imul__', (value,))
return self

__class_getitem__ = classmethod(types.GenericAlias)

DictProxy = MakeProxyType('DictProxy', (

_BaseDictProxy = MakeProxyType('DictProxy', (
'__contains__', '__delitem__', '__getitem__', '__iter__', '__len__',
'__setitem__', 'clear', 'copy', 'get', 'items',
'keys', 'pop', 'popitem', 'setdefault', 'update', 'values'
))
DictProxy._method_to_typeid_ = {
_BaseDictProxy._method_to_typeid_ = {
'__iter__': 'Iterator',
}
class DictProxy(_BaseDictProxy):
__class_getitem__ = classmethod(types.GenericAlias)


ArrayProxy = MakeProxyType('ArrayProxy', (
Expand Down
2 changes: 1 addition & 1 deletion Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self._initargs = initargs

if processes is None:
processes = os.cpu_count() or 1
processes = os.process_cpu_count() or 1
if processes < 1:
raise ValueError("Number of processes must be at least 1")
if maxtasksperchild is not None:
Expand Down
4 changes: 4 additions & 0 deletions Lib/multiprocessing/popen_fork.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import atexit
import os
import signal

Expand Down Expand Up @@ -66,10 +67,13 @@ def _launch(self, process_obj):
self.pid = os.fork()
if self.pid == 0:
try:
atexit._clear()
atexit.register(util._exit_function)
os.close(parent_r)
os.close(parent_w)
code = process_obj._bootstrap(parent_sentinel=child_r)
finally:
atexit._run_exitfuncs()
os._exit(code)
else:
os.close(child_w)
Expand Down
4 changes: 3 additions & 1 deletion Lib/multiprocessing/popen_spawn_win32.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import signal
import sys
import _winapi
from subprocess import STARTUPINFO, STARTF_FORCEOFFFEEDBACK

from .context import reduction, get_spawning_popen, set_spawning_popen
from . import spawn
Expand Down Expand Up @@ -74,7 +75,8 @@ def __init__(self, process_obj):
try:
hp, ht, pid, tid = _winapi.CreateProcess(
python_exe, cmd,
None, None, False, 0, env, None, None)
None, None, False, 0, env, None,
STARTUPINFO(dwFlags=STARTF_FORCEOFFFEEDBACK))
_winapi.CloseHandle(ht)
except:
_winapi.CloseHandle(rhandle)
Expand Down
7 changes: 2 additions & 5 deletions Lib/multiprocessing/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,8 @@ def _bootstrap(self, parent_sentinel=None):
# _run_after_forkers() is executed
del old_process
util.info('child process calling self.run()')
try:
self.run()
exitcode = 0
finally:
util._exit_function()
self.run()
exitcode = 0
except SystemExit as e:
if e.code is None:
exitcode = 0
Expand Down
2 changes: 0 additions & 2 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

from queue import Empty, Full

import _multiprocessing

from . import connection
from . import context
_ForkingPickler = context.reduction.ForkingPickler
Expand Down
Loading
Loading