diff --git a/graalpython/com.oracle.graal.python.test/src/tests/conftest.toml b/graalpython/com.oracle.graal.python.test/src/tests/conftest.toml index db128a5123..dfae892489 100644 --- a/graalpython/com.oracle.graal.python.test/src/tests/conftest.toml +++ b/graalpython/com.oracle.graal.python.test/src/tests/conftest.toml @@ -20,7 +20,6 @@ partial_splits_individual_tests = true # on Windows, yet, add their files here. exclude_on = ['win32', 'win32-github'] selector = [ - "test_multiprocessing_graalpy.py", # import _winapi "test_pathlib.py", "test_posix.py", # import posix "test_pyio.py", # pyio imports msvcrt diff --git a/graalpython/com.oracle.graal.python.test/src/tests/test_entropy_subprocess.py b/graalpython/com.oracle.graal.python.test/src/tests/test_entropy_subprocess.py index 0016e3b083..07f6b0bad2 100644 --- a/graalpython/com.oracle.graal.python.test/src/tests/test_entropy_subprocess.py +++ b/graalpython/com.oracle.graal.python.test/src/tests/test_entropy_subprocess.py @@ -48,6 +48,9 @@ from tests.util import needs_capi +POSIX_BACKEND_IS_JAVA = sys.implementation.name == "graalpy" and __graalpython__.posix_module_backend() == "java" + + @unittest.skipUnless(sys.implementation.name == "graalpy" and sys.platform.startswith("linux"), "Linux GraalPy-specific test") class EntropySubprocessTests(unittest.TestCase): HASH_SECRET_BYTES = 24 @@ -180,6 +183,7 @@ def test_multiprocessing_process_import_does_not_use_initrandom(self): "ok", ) + @unittest.skipIf(POSIX_BACKEND_IS_JAVA, "multiprocessing doesn't work on Java POSIX backend") def test_multiprocessing_deliver_challenge_does_not_use_additional_initrandom(self): self.assert_initrandom_bytes_used( self.RANDOM_MODULE_BYTES, diff --git a/graalpython/com.oracle.graal.python.test/src/tests/test_multiprocessing.py b/graalpython/com.oracle.graal.python.test/src/tests/test_multiprocessing.py index 0bdcf368f9..726fda1202 100644 --- a/graalpython/com.oracle.graal.python.test/src/tests/test_multiprocessing.py +++ b/graalpython/com.oracle.graal.python.test/src/tests/test_multiprocessing.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022, 2024, Oracle and/or its affiliates. All rights reserved. +# Copyright (c) 2022, 2026, Oracle and/or its affiliates. All rights reserved. # DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. # # The Universal Permissive License (UPL), Version 1.0 @@ -42,16 +42,16 @@ import unittest +POSIX_BACKEND_IS_JAVA = sys.implementation.name == 'graalpy' and __graalpython__.posix_module_backend() == 'java' + + +@unittest.skipIf(POSIX_BACKEND_IS_JAVA, "multiprocessing doesn't work on emulated backend") class MultiprocessingTest(unittest.TestCase): @classmethod def tearDownClass(cls): import multiprocessing.resource_tracker multiprocessing.resource_tracker._resource_tracker._stop() - @unittest.skipIf( - sys.implementation.name == 'graalpy' and __graalpython__.posix_module_backend() == 'java', - reason="TODO multiprocessing.Array doesn't work on emulated backend", - ) def test_array_read(self): # This used to be buggy due to wrong usage of memoryview offsets when two objects were allocated in the same block # Don't remove the unused value on the next line diff --git a/graalpython/com.oracle.graal.python.test/src/tests/test_multiprocessing_graalpy.py b/graalpython/com.oracle.graal.python.test/src/tests/test_multiprocessing_graalpy.py deleted file mode 100644 index 033071c400..0000000000 --- a/graalpython/com.oracle.graal.python.test/src/tests/test_multiprocessing_graalpy.py +++ /dev/null @@ -1,104 +0,0 @@ -# Copyright (c) 2020, 2026, Oracle and/or its affiliates. All rights reserved. -# DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. -# -# The Universal Permissive License (UPL), Version 1.0 -# -# Subject to the condition set forth below, permission is hereby granted to any -# person obtaining a copy of this software, associated documentation and/or -# data (collectively the "Software"), free of charge and under any and all -# copyright rights in the Software, and any and all patent rights owned or -# freely licensable by each licensor hereunder covering either (i) the -# unmodified Software as contributed to or provided by such licensor, or (ii) -# the Larger Works (as defined below), to deal in both -# -# (a) the Software, and -# -# (b) any piece of software and/or hardware listed in the lrgrwrks.txt file if -# one is included with the Software each a "Larger Work" to which the Software -# is contributed by such licensors), -# -# without restriction, including without limitation the rights to copy, create -# derivative works of, display, perform, and distribute the Software and make, -# use, sell, offer for sale, import, export, have made, and have sold the -# Software and the Larger Work(s), and to sublicense the foregoing rights on -# either these or other terms. -# -# This license is subject to the following condition: -# -# The above copyright notice and either this complete permission notice or at a -# minimum a reference to the UPL must be included in all copies or substantial -# portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. -import multiprocessing -from multiprocessing.connection import wait -from functools import wraps -from dataclasses import dataclass - -import os -import sys -import time - -from tests.util import skip_if_sandboxed - - -if sys.implementation.name == 'graalpy': - def graalpy_multiprocessing(test): - @wraps(test) - def set_graalpy(): - original_ctx = multiprocessing.get_context().get_start_method() - try: - multiprocessing.set_start_method('graalpy', force=True) - test() - finally: - multiprocessing.set_start_method(original_ctx, force=True) - return set_graalpy - - - @graalpy_multiprocessing - def test_SemLock_raises_on_non_string_name(): - from _multiprocessing_graalpy import SemLock - try: - SemLock(kind=1, value=1, name={1: 2}, maxvalue=1, unlink=1) - except TypeError: - pass - else: - assert False - - - @graalpy_multiprocessing - @skip_if_sandboxed("Sandboxed runs use an emulated backend for multiprocessing wait") - def test_wait_timeout(): - timeout = 3 - a, b = multiprocessing.Pipe() - x, y = multiprocessing.connection.Pipe(False) # Truffle multiprocessing pipe - for fds in [[a, b], [x, y], [a, b, x, y]]: - start = time.monotonic() - res = wait(fds, timeout) - delta = time.monotonic() - start - assert not res - # The GraalPy multiprocessing wait path actively polls fake file descriptors and may - # overshoot under scheduling contention. - assert delta < timeout * 8 - assert delta > timeout / 2 - - - @graalpy_multiprocessing - @skip_if_sandboxed("Sandboxed runs use an emulated backend for multiprocessing wait") - def test_wait(): - a, b = multiprocessing.Pipe() - x, y = multiprocessing.connection.Pipe(False) # Truffle multiprocessing pipe - a.send(42) - res = wait([b, y], 3) - assert res == [b], "res1" - assert b.recv() == 42, "res2" - y.send(33) - res = wait([b, x], 3) - assert res == [x], "res3" - assert x.recv() == 33, "res4" diff --git a/graalpython/com.oracle.graal.python.test/src/tests/timings-darwin.json b/graalpython/com.oracle.graal.python.test/src/tests/timings-darwin.json index a690dc7bba..6d2677b503 100644 --- a/graalpython/com.oracle.graal.python.test/src/tests/timings-darwin.json +++ b/graalpython/com.oracle.graal.python.test/src/tests/timings-darwin.json @@ -166,7 +166,6 @@ "graalpython/com.oracle.graal.python.test/src/tests/test_module_property.py": 0.01, "graalpython/com.oracle.graal.python.test/src/tests/test_mro.py": 0.03, "graalpython/com.oracle.graal.python.test/src/tests/test_multiprocessing.py": 0.2, - "graalpython/com.oracle.graal.python.test/src/tests/test_multiprocessing_graalpy.py": 9.08, "graalpython/com.oracle.graal.python.test/src/tests/test_nonlocal.py": 0.0, "graalpython/com.oracle.graal.python.test/src/tests/test_object.py": 0.0, "graalpython/com.oracle.graal.python.test/src/tests/test_operator.py": 0.01, @@ -233,4 +232,4 @@ "graalpython/com.oracle.graal.python.test/src/tests/test_yield_from.py": 0.03, "graalpython/com.oracle.graal.python.test/src/tests/test_zipimport.py": 0.05, "graalpython/com.oracle.graal.python.test/src/tests/test_zlib.py": 0.02 -} \ No newline at end of file +} diff --git a/graalpython/com.oracle.graal.python.test/src/tests/timings-linux.json b/graalpython/com.oracle.graal.python.test/src/tests/timings-linux.json index 75a77cdae2..f1ad67a8d4 100644 --- a/graalpython/com.oracle.graal.python.test/src/tests/timings-linux.json +++ b/graalpython/com.oracle.graal.python.test/src/tests/timings-linux.json @@ -169,7 +169,6 @@ "graalpython/com.oracle.graal.python.test/src/tests/test_module_property.py": 0.07999999999999999, "graalpython/com.oracle.graal.python.test/src/tests/test_mro.py": 0.0, "graalpython/com.oracle.graal.python.test/src/tests/test_multiprocessing.py": 0.36, - "graalpython/com.oracle.graal.python.test/src/tests/test_multiprocessing_graalpy.py": 9.059999999999999, "graalpython/com.oracle.graal.python.test/src/tests/test_nonlocal.py": 0.0, "graalpython/com.oracle.graal.python.test/src/tests/test_object.py": 0.0, "graalpython/com.oracle.graal.python.test/src/tests/test_operator.py": 0.0, @@ -236,4 +235,4 @@ "graalpython/com.oracle.graal.python.test/src/tests/test_yield_from.py": 0.01, "graalpython/com.oracle.graal.python.test/src/tests/test_zipimport.py": 0.060000000000000005, "graalpython/com.oracle.graal.python.test/src/tests/test_zlib.py": 0.03 -} \ No newline at end of file +} diff --git a/graalpython/com.oracle.graal.python.test/src/tests/unittest_tags/test_multiprocessing_graalpy.txt b/graalpython/com.oracle.graal.python.test/src/tests/unittest_tags/test_multiprocessing_graalpy.txt deleted file mode 100644 index 2bd40ad8d2..0000000000 --- a/graalpython/com.oracle.graal.python.test/src/tests/unittest_tags/test_multiprocessing_graalpy.txt +++ /dev/null @@ -1,18 +0,0 @@ -test.test_multiprocessing_graalpy.ChallengeResponseTest.test_challengeresponse @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.MiscTestCase.test__all__ @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.OtherTest.test_answer_challenge_auth_failure @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.OtherTest.test_deliver_challenge_auth_failure @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.SemLockTests.test_semlock_subclass @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.TestInvalidFamily.test_invalid_family @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.TestNoForkBomb.test_noforkbomb @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.TestPoolNotLeakOnFailure.test_release_unused_processes @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.TestResourceTracker.test_resource_tracker @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.TestSimpleQueue.test_close @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.TestSimpleQueue.test_empty_exceptions @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.TestStartMethod.test_get_all @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.TestStdinBadfiledescriptor.test_flushing @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.TestStdinBadfiledescriptor.test_pool_in_process @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.TestStdinBadfiledescriptor.test_queue_in_process @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.TestWait.test_neg_timeout @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy.TestWait.test_wait_timeout @ linux-aarch64-github,linux-x86_64-github -test.test_multiprocessing_graalpy._TestImportStar.test_import @ linux-aarch64-github,linux-x86_64-github diff --git a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/PythonLanguage.java b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/PythonLanguage.java index 3dafa77699..929e713df5 100644 --- a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/PythonLanguage.java +++ b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/PythonLanguage.java @@ -45,7 +45,6 @@ import java.util.Map; import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Semaphore; import java.util.logging.Level; import org.graalvm.collections.EconomicMap; @@ -343,20 +342,6 @@ public boolean isSingleContext() { @CompilationFinal(dimensions = 1) public static final PythonAbstractObject[] CONTEXT_INSENSITIVE_SINGLETONS = new PythonAbstractObject[]{PNone.NONE, PEllipsis.INSTANCE, PNotImplemented.NOT_IMPLEMENTED}; - /** - * Named semaphores are shared between all processes in a system, and they persist until the - * system is shut down, unless explicitly removed. We interpret this as meaning they all exist - * globally per language instance, that is, they are shared between different Contexts in the - * same engine. - * - * Top level contexts use this map to initialize their shared multiprocessing data. Inner - * children contexts created for the multiprocessing module ignore this map in - * {@link PythonLanguage} and instead inherit it in the shared multiprocessing data from their - * parent context. This way, the child inner contexts do not have to run in the same engine - * (have the same language instance), but can still share the named semaphores. - */ - public final ConcurrentHashMap namedSemaphores = new ConcurrentHashMap<>(); - @CompilationFinal(dimensions = 1) private volatile Object[] engineOptionsStorage; @CompilationFinal private volatile OptionValues engineOptions; @CompilationFinal private boolean useNativePrimitiveStorage; diff --git a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/Python3Core.java b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/Python3Core.java index 64a0c471f3..e89b7b6604 100644 --- a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/Python3Core.java +++ b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/Python3Core.java @@ -195,8 +195,6 @@ import com.oracle.graal.python.builtins.modules.lzma.LZMACompressorBuiltins; import com.oracle.graal.python.builtins.modules.lzma.LZMADecompressorBuiltins; import com.oracle.graal.python.builtins.modules.lzma.LZMAModuleBuiltins; -import com.oracle.graal.python.builtins.modules.multiprocessing.GraalPySemLockBuiltins; -import com.oracle.graal.python.builtins.modules.multiprocessing.MultiprocessingGraalPyModuleBuiltins; import com.oracle.graal.python.builtins.modules.multiprocessing.MultiprocessingModuleBuiltins; import com.oracle.graal.python.builtins.modules.multiprocessing.SemLockBuiltins; import com.oracle.graal.python.builtins.modules.pickle.PickleBufferBuiltins; @@ -765,8 +763,6 @@ private static PythonBuiltins[] initializeBuiltins(TruffleLanguage.Env env) { // _multiprocessing PythonImageBuildOptions.WITHOUT_NATIVE_POSIX ? null : new MultiprocessingModuleBuiltins(), PythonImageBuildOptions.WITHOUT_NATIVE_POSIX ? null : new SemLockBuiltins(), - new MultiprocessingGraalPyModuleBuiltins(), - new GraalPySemLockBuiltins(), new WarningsModuleBuiltins(), new GraalPythonModuleBuiltins(), diff --git a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/PythonBuiltinClassType.java b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/PythonBuiltinClassType.java index d1a36fd72a..e7dbb5c3b9 100644 --- a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/PythonBuiltinClassType.java +++ b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/PythonBuiltinClassType.java @@ -124,7 +124,6 @@ import com.oracle.graal.python.builtins.modules.lsprof.ProfilerBuiltins; import com.oracle.graal.python.builtins.modules.lzma.LZMACompressorBuiltins; import com.oracle.graal.python.builtins.modules.lzma.LZMADecompressorBuiltins; -import com.oracle.graal.python.builtins.modules.multiprocessing.GraalPySemLockBuiltins; import com.oracle.graal.python.builtins.modules.multiprocessing.SemLockBuiltins; import com.oracle.graal.python.builtins.modules.pickle.PickleBufferBuiltins; import com.oracle.graal.python.builtins.modules.pickle.PicklerBuiltins; @@ -635,7 +634,6 @@ passed as positional arguments to zip(). The i-th element in every tuple PLock("LockType", PythonObject, newBuilder().publishInModule(J__THREAD).disallowInstantiation().slots(CommonLockBuiltins.SLOTS, LockTypeBuiltins.SLOTS)), PRLock("RLock", PythonObject, newBuilder().publishInModule(J__THREAD).basetype().slots(CommonLockBuiltins.SLOTS, RLockBuiltins.SLOTS)), PSemLock("SemLock", PythonObject, newBuilder().publishInModule("_multiprocessing").basetype().slots(SemLockBuiltins.SLOTS)), - PGraalPySemLock("SemLock", PythonObject, newBuilder().publishInModule("_multiprocessing_graalpy").basetype().slots(GraalPySemLockBuiltins.SLOTS)), PSocket("socket", PythonObject, newBuilder().publishInModule(J__SOCKET).basetype().slots(SocketBuiltins.SLOTS)), PStaticmethod("staticmethod", PythonObject, newBuilder().publishInModule(J_BUILTINS).basetype().addDict(24).slots(StaticmethodBuiltins.SLOTS).doc(""" staticmethod(function) -> method diff --git a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/PosixModuleBuiltins.java b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/PosixModuleBuiltins.java index 67a5506dde..f46e4f938b 100644 --- a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/PosixModuleBuiltins.java +++ b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/PosixModuleBuiltins.java @@ -887,9 +887,6 @@ static PNone close(VirtualFrame frame, int fd, @Cached GilNode gil, @Cached PConstructAndRaiseNode.Lazy constructAndRaiseNode) { try { - if (context.getSharedMultiprocessingData().decrementFDRefCount(fd)) { - return PNone.NONE; - } gil.release(true); try { posixLib.close(context.getPosixSupport(), fd); diff --git a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/multiprocessing/GraalPySemLockBuiltins.java b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/multiprocessing/GraalPySemLockBuiltins.java deleted file mode 100644 index 6156dcd4a0..0000000000 --- a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/multiprocessing/GraalPySemLockBuiltins.java +++ /dev/null @@ -1,345 +0,0 @@ -/* - * Copyright (c) 2019, 2025, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * The Universal Permissive License (UPL), Version 1.0 - * - * Subject to the condition set forth below, permission is hereby granted to any - * person obtaining a copy of this software, associated documentation and/or - * data (collectively the "Software"), free of charge and under any and all - * copyright rights in the Software, and any and all patent rights owned or - * freely licensable by each licensor hereunder covering either (i) the - * unmodified Software as contributed to or provided by such licensor, or (ii) - * the Larger Works (as defined below), to deal in both - * - * (a) the Software, and - * - * (b) any piece of software and/or hardware listed in the lrgrwrks.txt file if - * one is included with the Software each a "Larger Work" to which the Software - * is contributed by such licensors), - * - * without restriction, including without limitation the rights to copy, create - * derivative works of, display, perform, and distribute the Software and make, - * use, sell, offer for sale, import, export, have made, and have sold the - * Software and the Larger Work(s), and to sublicense the foregoing rights on - * either these or other terms. - * - * This license is subject to the following condition: - * - * The above copyright notice and either this complete permission notice or at a - * minimum a reference to the UPL must be included in all copies or substantial - * portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ -package com.oracle.graal.python.builtins.modules.multiprocessing; - -import static com.oracle.graal.python.nodes.SpecialMethodNames.J___ENTER__; -import static com.oracle.graal.python.nodes.SpecialMethodNames.J___EXIT__; - -import java.util.List; -import java.util.concurrent.Semaphore; - -import com.oracle.graal.python.PythonLanguage; -import com.oracle.graal.python.annotations.ArgumentClinic; -import com.oracle.graal.python.annotations.Slot; -import com.oracle.graal.python.annotations.Slot.SlotKind; -import com.oracle.graal.python.annotations.Slot.SlotSignature; -import com.oracle.graal.python.annotations.Builtin; -import com.oracle.graal.python.builtins.CoreFunctions; -import com.oracle.graal.python.builtins.Python3Core; -import com.oracle.graal.python.builtins.PythonBuiltinClassType; -import com.oracle.graal.python.builtins.PythonBuiltins; -import com.oracle.graal.python.builtins.modules.multiprocessing.GraalPySemLockBuiltinsClinicProviders.SemLockNodeClinicProviderGen; -import com.oracle.graal.python.builtins.objects.PNone; -import com.oracle.graal.python.builtins.objects.type.TpSlots; -import com.oracle.graal.python.builtins.objects.type.TypeNodes; -import com.oracle.graal.python.lib.PyFloatAsDoubleNode; -import com.oracle.graal.python.nodes.ErrorMessages; -import com.oracle.graal.python.nodes.PRaiseNode; -import com.oracle.graal.python.nodes.function.PythonBuiltinBaseNode; -import com.oracle.graal.python.nodes.function.PythonBuiltinNode; -import com.oracle.graal.python.nodes.function.builtins.PythonClinicBuiltinNode; -import com.oracle.graal.python.nodes.function.builtins.PythonTernaryBuiltinNode; -import com.oracle.graal.python.nodes.function.builtins.PythonTernaryClinicBuiltinNode; -import com.oracle.graal.python.nodes.function.builtins.PythonUnaryBuiltinNode; -import com.oracle.graal.python.nodes.function.builtins.clinic.ArgumentClinicProvider; -import com.oracle.graal.python.runtime.GilNode; -import com.oracle.graal.python.runtime.PythonContext; -import com.oracle.graal.python.runtime.PythonContext.SharedMultiprocessingData; -import com.oracle.graal.python.runtime.object.PFactory; -import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary; -import com.oracle.truffle.api.dsl.Bind; -import com.oracle.truffle.api.dsl.Cached; -import com.oracle.truffle.api.dsl.GenerateNodeFactory; -import com.oracle.truffle.api.dsl.NodeFactory; -import com.oracle.truffle.api.dsl.Specialization; -import com.oracle.truffle.api.frame.VirtualFrame; -import com.oracle.truffle.api.nodes.Node; -import com.oracle.truffle.api.strings.TruffleString; - -@CoreFunctions(extendClasses = {PythonBuiltinClassType.PGraalPySemLock}) -public final class GraalPySemLockBuiltins extends PythonBuiltins { - - public static final TpSlots SLOTS = GraalPySemLockBuiltinsSlotsGen.SLOTS; - - @Override - protected List> getNodeFactories() { - return GraalPySemLockBuiltinsFactory.getFactories(); - } - - @Override - public void initialize(Python3Core core) { - addBuiltinConstant("SEM_VALUE_MAX", Integer.MAX_VALUE); - super.initialize(core); - } - - @Slot(value = SlotKind.tp_new, isComplex = true) - @SlotSignature(name = "SemLock", parameterNames = {"cls", "kind", "value", "maxvalue", "name", "unlink"}) - @ArgumentClinic(name = "kind", conversion = ArgumentClinic.ClinicConversion.Int) - @ArgumentClinic(name = "value", conversion = ArgumentClinic.ClinicConversion.Int) - @ArgumentClinic(name = "maxvalue", conversion = ArgumentClinic.ClinicConversion.Int) - @ArgumentClinic(name = "name", conversion = ArgumentClinic.ClinicConversion.TString) - @ArgumentClinic(name = "unlink", conversion = ArgumentClinic.ClinicConversion.IntToBoolean) - @GenerateNodeFactory - abstract static class SemLockNode extends PythonClinicBuiltinNode { - @Specialization - static PGraalPySemLock construct(Object cls, int kind, int value, @SuppressWarnings("unused") int maxValue, TruffleString name, boolean unlink, - @Bind Node inliningTarget, - @Cached TypeNodes.GetInstanceShape getInstanceShape, - @Cached PRaiseNode raiseNode) { - if (kind != PGraalPySemLock.RECURSIVE_MUTEX && kind != PGraalPySemLock.SEMAPHORE) { - throw raiseNode.raise(inliningTarget, PythonBuiltinClassType.ValueError, ErrorMessages.UNRECOGNIZED_KIND); - } - Semaphore semaphore = newSemaphore(value); - if (!unlink) { - // CPython creates a named semaphore, and if unlink != 0 unlinks - // it directly, so it cannot be accessed by other processes. We - // have to explicitly link it, so we do that here if we - // must. CPython always uses O_CREAT | O_EXCL for creating named - // semaphores, so a conflict raises. - SharedMultiprocessingData multiprocessing = PythonContext.get(inliningTarget).getSharedMultiprocessingData(); - if (multiprocessing.getNamedSemaphore(name) != null) { - throw raiseNode.raise(inliningTarget, PythonBuiltinClassType.FileExistsError, ErrorMessages.SEMAPHORE_NAME_TAKEN, name); - } else { - multiprocessing.putNamedSemaphore(name, semaphore); - } - } - return PFactory.createGraalPySemLock(cls, getInstanceShape.execute(cls), name, kind, semaphore); - } - - @TruffleBoundary - private static Semaphore newSemaphore(int value) { - return new Semaphore(value); - } - - @Override - protected ArgumentClinicProvider getArgumentClinic() { - return SemLockNodeClinicProviderGen.INSTANCE; - } - } - - @Builtin(name = "_count", minNumOfPositionalArgs = 1) - @GenerateNodeFactory - abstract static class CountNode extends PythonUnaryBuiltinNode { - @Specialization - static int getCount(PGraalPySemLock self) { - return self.getCount(); - } - } - - @Builtin(name = "_is_mine", minNumOfPositionalArgs = 1) - @GenerateNodeFactory - abstract static class IsMineNode extends PythonUnaryBuiltinNode { - @Specialization - static boolean isMine(PGraalPySemLock self) { - return self.isMine(); - } - } - - @Builtin(name = "_is_zero", minNumOfPositionalArgs = 1) - @GenerateNodeFactory - abstract static class IsZeroNode extends PythonUnaryBuiltinNode { - @Specialization - static boolean isZero(PGraalPySemLock self) { - return self.isZero(); - } - } - - @Builtin(name = "_get_value", minNumOfPositionalArgs = 1) - @GenerateNodeFactory - abstract static class GetValueNode extends PythonUnaryBuiltinNode { - @Specialization - static int getValue(PGraalPySemLock self) { - return self.getValue(); - } - } - - @Builtin(name = "handle", minNumOfPositionalArgs = 1, isGetter = true) - @GenerateNodeFactory - abstract static class GetHandleNode extends PythonUnaryBuiltinNode { - @Specialization - @TruffleBoundary - static int getHandle(PGraalPySemLock self) { - return self.hashCode(); - } - } - - @Builtin(name = "name", minNumOfPositionalArgs = 1, isGetter = true) - @GenerateNodeFactory - abstract static class GetNameNode extends PythonUnaryBuiltinNode { - @Specialization - static TruffleString getName(PGraalPySemLock self) { - return self.getName(); - } - } - - @Builtin(name = "maxvalue", minNumOfPositionalArgs = 1, isGetter = true) - @GenerateNodeFactory - abstract static class GetMaxValue extends PythonUnaryBuiltinNode { - @Specialization - static Object getMax(@SuppressWarnings("unused") PGraalPySemLock self) { - return Integer.MAX_VALUE; - } - } - - @Builtin(name = "kind", minNumOfPositionalArgs = 1, isGetter = true) - @GenerateNodeFactory - abstract static class GetKindNode extends PythonUnaryBuiltinNode { - @Specialization - static int getKind(PGraalPySemLock self) { - return self.getKind(); - } - } - - @Builtin(name = "acquire", minNumOfPositionalArgs = 1, parameterNames = {"self", "blocking", "timeout"}) - @ArgumentClinic(name = "blocking", conversion = ArgumentClinic.ClinicConversion.Boolean, defaultValue = "true") - @GenerateNodeFactory - abstract static class AcquireNode extends PythonTernaryClinicBuiltinNode { - - @Override - protected ArgumentClinicProvider getArgumentClinic() { - return GraalPySemLockBuiltinsClinicProviders.AcquireNodeClinicProviderGen.INSTANCE; - } - - protected static boolean isFast(PGraalPySemLock self) { - return self.getKind() == PGraalPySemLock.RECURSIVE_MUTEX && self.isMine(); - } - - @Specialization(guards = "isFast(self)") - static boolean fast(PGraalPySemLock self, @SuppressWarnings("unused") boolean blocking, @SuppressWarnings("unused") Object timeout) { - self.increaseCount(); - return true; - } - - @Specialization(guards = "!isFast(self)") - static boolean slow(VirtualFrame frame, PGraalPySemLock self, boolean blocking, Object timeoutObj, - @Bind Node inliningTarget, - @Cached PyFloatAsDoubleNode asDoubleNode, - @Cached GilNode gil) { - boolean hasDeadline = !(timeoutObj instanceof PNone); - long timeoutMs = 0; - if (hasDeadline) { - timeoutMs = (long) (asDoubleNode.execute(frame, inliningTarget, timeoutObj) * 1000); - if (timeoutMs < 0) { - timeoutMs = 0; - } - } - boolean acquired = self.acquireNonBlocking(); - if (acquired) { - return true; - } - if (!blocking) { - return false; - } - gil.release(true); - try { - if (hasDeadline) { - return self.acquireTimeout(inliningTarget, timeoutMs); - } else { - return self.acquireBlocking(inliningTarget); - } - } finally { - gil.acquire(); - } - } - } - - @Builtin(name = J___ENTER__, minNumOfPositionalArgs = 1, parameterNames = {"self", "blocking", "timeout"}) - @GenerateNodeFactory - abstract static class EnterLockNode extends PythonTernaryBuiltinNode { - @Specialization - static Object doEnter(VirtualFrame frame, PGraalPySemLock self, Object blocking, Object timeout, - @Cached AcquireNode acquireNode) { - return acquireNode.execute(frame, self, blocking, timeout); - } - } - - @Builtin(name = "_rebuild", minNumOfPositionalArgs = 4, parameterNames = {"handle", "kind", "maxvalue", "name"}) - @ArgumentClinic(name = "kind", conversion = ArgumentClinic.ClinicConversion.Int) - @ArgumentClinic(name = "name", conversion = ArgumentClinic.ClinicConversion.TString) - @GenerateNodeFactory - abstract static class RebuildNode extends PythonClinicBuiltinNode { - @Override - protected ArgumentClinicProvider getArgumentClinic() { - return GraalPySemLockBuiltinsClinicProviders.RebuildNodeClinicProviderGen.INSTANCE; - } - - @Specialization - static Object doEnter(@SuppressWarnings("unused") Object handle, int kind, @SuppressWarnings("unused") Object maxvalue, TruffleString name, - @Bind Node inliningTarget, - @Bind PythonLanguage language) { - SharedMultiprocessingData multiprocessing = PythonContext.get(inliningTarget).getSharedMultiprocessingData(); - Semaphore semaphore = multiprocessing.getNamedSemaphore(name); - if (semaphore == null) { - // TODO can this even happen? cpython simply creates a semlock object with the - // provided handle - semaphore = newSemaphore(); - } - return PFactory.createGraalPySemLock(language, name, kind, semaphore); - } - - @TruffleBoundary - private static Semaphore newSemaphore() { - return new Semaphore(0); - } - } - - @Builtin(name = "release", minNumOfPositionalArgs = 1) - @GenerateNodeFactory - abstract static class ReleaseLockNode extends PythonUnaryBuiltinNode { - @Specialization - static Object doRelease(PGraalPySemLock self, - @Bind Node inliningTarget, - @Cached PRaiseNode raiseNode) { - if (self.getKind() == PGraalPySemLock.RECURSIVE_MUTEX) { - if (!self.isMine()) { - throw raiseNode.raise(inliningTarget, PythonBuiltinClassType.AssertionError, ErrorMessages.ATTEMP_TO_RELEASE_RECURSIVE_LOCK); - } - if (self.getCount() > 1) { - self.decreaseCount(); - return PNone.NONE; - } - assert self.getCount() == 1; - } - self.release(); - return PNone.NONE; - } - } - - @Builtin(name = J___EXIT__, minNumOfPositionalArgs = 4) - @GenerateNodeFactory - abstract static class ExitLockNode extends PythonBuiltinNode { - @Specialization - static Object exit(PGraalPySemLock self, @SuppressWarnings("unused") Object type, @SuppressWarnings("unused") Object value, @SuppressWarnings("unused") Object traceback) { - self.release(); - return PNone.NONE; - } - } -} diff --git a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/multiprocessing/MultiprocessingGraalPyModuleBuiltins.java b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/multiprocessing/MultiprocessingGraalPyModuleBuiltins.java deleted file mode 100644 index 53e3fe8e50..0000000000 --- a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/multiprocessing/MultiprocessingGraalPyModuleBuiltins.java +++ /dev/null @@ -1,456 +0,0 @@ -/* - * Copyright (c) 2019, 2025, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * The Universal Permissive License (UPL), Version 1.0 - * - * Subject to the condition set forth below, permission is hereby granted to any - * person obtaining a copy of this software, associated documentation and/or - * data (collectively the "Software"), free of charge and under any and all - * copyright rights in the Software, and any and all patent rights owned or - * freely licensable by each licensor hereunder covering either (i) the - * unmodified Software as contributed to or provided by such licensor, or (ii) - * the Larger Works (as defined below), to deal in both - * - * (a) the Software, and - * - * (b) any piece of software and/or hardware listed in the lrgrwrks.txt file if - * one is included with the Software each a "Larger Work" to which the Software - * is contributed by such licensors), - * - * without restriction, including without limitation the rights to copy, create - * derivative works of, display, perform, and distribute the Software and make, - * use, sell, offer for sale, import, export, have made, and have sold the - * Software and the Larger Work(s), and to sublicense the foregoing rights on - * either these or other terms. - * - * This license is subject to the following condition: - * - * The above copyright notice and either this complete permission notice or at a - * minimum a reference to the UPL must be included in all copies or substantial - * portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ -package com.oracle.graal.python.builtins.modules.multiprocessing; - -import static com.oracle.graal.python.builtins.PythonBuiltinClassType.OSError; - -import java.util.List; -import java.util.Objects; -import java.util.concurrent.Semaphore; - -import com.oracle.graal.python.PythonLanguage; -import com.oracle.graal.python.annotations.Builtin; -import com.oracle.graal.python.builtins.CoreFunctions; -import com.oracle.graal.python.builtins.Python3Core; -import com.oracle.graal.python.builtins.PythonBuiltins; -import com.oracle.graal.python.builtins.modules.PosixModuleBuiltins; -import com.oracle.graal.python.builtins.objects.PNone; -import com.oracle.graal.python.builtins.objects.buffer.PythonBufferAccessLibrary; -import com.oracle.graal.python.builtins.objects.bytes.PBytes; -import com.oracle.graal.python.builtins.objects.common.SequenceNodes; -import com.oracle.graal.python.builtins.objects.common.SequenceStorageNodes; -import com.oracle.graal.python.builtins.objects.exception.OSErrorEnum; -import com.oracle.graal.python.builtins.objects.ints.PInt; -import com.oracle.graal.python.builtins.objects.list.PList; -import com.oracle.graal.python.builtins.objects.thread.PThread; -import com.oracle.graal.python.builtins.objects.tuple.PTuple; -import com.oracle.graal.python.lib.PyObjectGetItem; -import com.oracle.graal.python.lib.PyObjectSizeNode; -import com.oracle.graal.python.nodes.ErrorMessages; -import com.oracle.graal.python.nodes.PConstructAndRaiseNode; -import com.oracle.graal.python.nodes.PRaiseNode; -import com.oracle.graal.python.nodes.builtins.ListNodes; -import com.oracle.graal.python.nodes.function.PythonBuiltinBaseNode; -import com.oracle.graal.python.nodes.function.PythonBuiltinNode; -import com.oracle.graal.python.nodes.function.builtins.PythonBinaryBuiltinNode; -import com.oracle.graal.python.nodes.function.builtins.PythonUnaryBuiltinNode; -import com.oracle.graal.python.nodes.util.CannotCastException; -import com.oracle.graal.python.nodes.util.CastToJavaDoubleNode; -import com.oracle.graal.python.nodes.util.CastToJavaIntExactNode; -import com.oracle.graal.python.nodes.util.CastToJavaIntLossyNode; -import com.oracle.graal.python.runtime.GilNode; -import com.oracle.graal.python.runtime.PosixSupportLibrary; -import com.oracle.graal.python.runtime.PosixSupportLibrary.Timeval; -import com.oracle.graal.python.runtime.PythonContext; -import com.oracle.graal.python.runtime.PythonContext.SharedMultiprocessingData; -import com.oracle.graal.python.runtime.object.PFactory; -import com.oracle.graal.python.runtime.sequence.storage.SequenceStorage; -import com.oracle.graal.python.util.ArrayBuilder; -import com.oracle.graal.python.util.PythonUtils; -import com.oracle.truffle.api.CompilerDirectives; -import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary; -import com.oracle.truffle.api.TruffleContext; -import com.oracle.truffle.api.TruffleLogger; -import com.oracle.truffle.api.dsl.Bind; -import com.oracle.truffle.api.dsl.Cached; -import com.oracle.truffle.api.dsl.Cached.Shared; -import com.oracle.truffle.api.dsl.GenerateNodeFactory; -import com.oracle.truffle.api.dsl.NodeFactory; -import com.oracle.truffle.api.dsl.Specialization; -import com.oracle.truffle.api.frame.VirtualFrame; -import com.oracle.truffle.api.library.CachedLibrary; -import com.oracle.truffle.api.nodes.Node; -import com.oracle.truffle.api.strings.TruffleString; - -@CoreFunctions(defineModule = "_multiprocessing_graalpy") -public final class MultiprocessingGraalPyModuleBuiltins extends PythonBuiltins { - - private static final TruffleLogger LOGGER = PythonLanguage.getLogger(MultiprocessingGraalPyModuleBuiltins.class); - - @Override - protected List> getNodeFactories() { - return MultiprocessingGraalPyModuleBuiltinsFactory.getFactories(); - } - - @Override - public void initialize(Python3Core core) { - // TODO: add necessary entries to the dict - addBuiltinConstant("flags", PFactory.createDict(core.getLanguage())); - super.initialize(core); - } - - @GenerateNodeFactory - @Builtin(name = "sem_unlink", parameterNames = {"name"}) - abstract static class SemUnlink extends PythonUnaryBuiltinNode { - @Specialization - PNone doit(VirtualFrame frame, TruffleString name, - @Bind Node inliningTarget, - @Cached PConstructAndRaiseNode.Lazy constructAndRaiseNode) { - Semaphore prev = getContext().getSharedMultiprocessingData().removeNamedSemaphore(name); - if (prev == null) { - throw constructAndRaiseNode.get(inliningTarget).raiseFileNotFoundError(frame, ErrorMessages.NO_SUCH_FILE_OR_DIR_WITH_LABEL, "semaphores", name); - } - return PNone.NONE; - } - } - - @Builtin(name = "_spawn_context", minNumOfPositionalArgs = 3, parameterNames = {"fd", "sentinel", "keepFds"}) - @GenerateNodeFactory - abstract static class SpawnContextNode extends PythonBuiltinNode { - @Specialization - long spawn(int fd, int sentinel, PList keepFds, - @Bind Node inliningTarget, - @Cached SequenceStorageNodes.GetItemNode getItem, - @Cached CastToJavaIntExactNode castToJavaIntNode) { - SequenceStorage storage = keepFds.getSequenceStorage(); - int length = storage.length(); - int[] keep = new int[length]; - for (int i = 0; i < length; i++) { - Object item = getItem.execute(storage, i); - keep[i] = castToJavaIntNode.execute(inliningTarget, item); - } - PythonContext context = getContext(); - long tid = context.spawnTruffleContext(fd, sentinel, keep); - return convertTid(tid); - } - } - - @Builtin(name = "_gettid") - @GenerateNodeFactory - abstract static class GetTidNode extends PythonBuiltinNode { - @Specialization - @TruffleBoundary - long getTid( - @Bind Node inliningTarget) { - return convertTid(PThread.getThreadId(Objects.requireNonNull(PythonContext.get(inliningTarget).getMainThread()))); - } - } - - @Builtin(name = "_waittid", minNumOfPositionalArgs = 2, parameterNames = {"tid", "options"}) - @GenerateNodeFactory - abstract static class WaitTidNode extends PythonBinaryBuiltinNode { - @Specialization - PTuple waittid(long id, @SuppressWarnings("unused") int options, - @Bind PythonLanguage language) { - long tid = convertTid(id); - // TODO implement for options - WNOHANG and 0 - final SharedMultiprocessingData multiprocessing = getContext().getSharedMultiprocessingData(); - Thread thread = multiprocessing.getChildContextThread(tid); - if (thread != null && thread.isAlive()) { - return PFactory.createTuple(language, new Object[]{0, 0, 0}); - } - - PythonContext.ChildContextData data = multiprocessing.getChildContextData(tid); - /* - * The assumption made here is that once _waittid returns the exit code, the caller - * caches it and never calls _waittid again, so we do not need to keep the data and can - * clean it. See popen_truffleprocess that calls the _waittid builtin. - */ - multiprocessing.removeChildContextData(tid); - return PFactory.createTuple(language, new Object[]{id, data.wasSignaled() ? data.getExitCode() : 0, data.getExitCode()}); - } - } - - @Builtin(name = "_terminate_spawned_thread", minNumOfPositionalArgs = 2, parameterNames = {"tid", "sig"}) - @GenerateNodeFactory - abstract static class TerminateThreadNode extends PythonBinaryBuiltinNode { - @Specialization - @TruffleBoundary - Object terminate(long id, PInt sig) { - final SharedMultiprocessingData multiprocessing = getContext().getSharedMultiprocessingData(); - Thread thread = multiprocessing.getChildContextThread(convertTid(id)); - if (thread != null && thread.isAlive()) { - PythonContext.ChildContextData data = multiprocessing.getChildContextData(convertTid(id)); - try { - data.awaitRunning(); - TruffleContext truffleCtx = data.getTruffleContext(); - if (truffleCtx != null && !truffleCtx.isCancelling() && data.compareAndSetExiting(false, true)) { - LOGGER.fine("terminating spawned thread"); - data.setSignaled(sig.intValue()); - truffleCtx.closeCancelled(this, "_terminate_spawned_thread"); - } - } catch (InterruptedException ex) { - LOGGER.finest("got interrupt while terminating spawned thread"); - } - } - return PNone.NONE; - } - } - - private static long convertTid(long tid) { - return tid * -1; - } - - @Builtin(name = "_pipe", minNumOfPositionalArgs = 0) - @GenerateNodeFactory - abstract static class PipeNode extends PythonBuiltinNode { - - @Specialization - PTuple pipe(@Cached GilNode gil, - @Bind PythonLanguage language) { - int[] pipe; - PythonContext ctx = getContext(); - SharedMultiprocessingData sharedData = ctx.getSharedMultiprocessingData(); - gil.release(true); - try { - pipe = sharedData.pipe(); - ctx.getChildContextFDs().add(pipe[0]); - ctx.getChildContextFDs().add(pipe[1]); - } finally { - gil.acquire(); - } - return PFactory.createTuple(language, new Object[]{pipe[0], pipe[1]}); - } - } - - @Builtin(name = "_write", minNumOfPositionalArgs = 2, parameterNames = {"fd", "data"}) - @GenerateNodeFactory - public abstract static class WriteNode extends PythonBinaryBuiltinNode { - @Specialization(limit = "1") - Object doWrite(int fd, PBytes data, - @CachedLibrary("data") PythonBufferAccessLibrary bufferLib, - @Shared @Cached GilNode gil) { - SharedMultiprocessingData sharedData = getContext().getSharedMultiprocessingData(); - gil.release(true); - try { - byte[] bytes = bufferLib.getCopiedByteArray(data); - sharedData.addPipeData(fd, bytes, - () -> { - throw PRaiseNode.raiseStatic(this, OSError, ErrorMessages.BAD_FILE_DESCRIPTOR); - }, - () -> { - throw PConstructAndRaiseNode.getUncached().raiseOSError(null, OSErrorEnum.EPIPE); - }); - return bytes.length; - } finally { - gil.acquire(); - } - } - - @Specialization(limit = "1") - Object doWrite(long fd, PBytes data, - @CachedLibrary("data") PythonBufferAccessLibrary bufferLib, - @Shared @Cached GilNode gil) { - return doWrite((int) fd, data, bufferLib, gil); - } - } - - @Builtin(name = "_read", minNumOfPositionalArgs = 2, parameterNames = {"fd", "length"}) - @GenerateNodeFactory - public abstract static class ReadNode extends PythonBinaryBuiltinNode { - @Specialization - Object doReadInt(int fd, @SuppressWarnings("unused") Object length, - @Bind PythonLanguage language, - @Shared @Cached GilNode gil) { - SharedMultiprocessingData sharedData = getContext().getSharedMultiprocessingData(); - gil.release(true); - try { - Object data = sharedData.takePipeData(this, fd, () -> { - throw PRaiseNode.raiseStatic(this, OSError, ErrorMessages.BAD_FILE_DESCRIPTOR); - }); - if (data == PNone.NONE) { - return PFactory.createEmptyBytes(language); - } - return PFactory.createBytes(language, (byte[]) data); - } finally { - gil.acquire(); - } - } - - @Specialization - Object doReadLong(long fd, Object length, - @Bind PythonLanguage language, - @Shared @Cached GilNode gil) { - return doReadInt((int) fd, length, language, gil); - } - } - - @Builtin(name = "_close", minNumOfPositionalArgs = 1, parameterNames = {"fd"}) - @GenerateNodeFactory - public abstract static class CloseNode extends PythonUnaryBuiltinNode { - @Specialization - PNone close(@SuppressWarnings("unused") int fd) { - assert fd < 0; - SharedMultiprocessingData sharedData = getContext().getSharedMultiprocessingData(); - if (!sharedData.decrementFDRefCount(fd)) { - sharedData.closePipe(fd); - } - return PNone.NONE; - } - - @Specialization - PNone close(@SuppressWarnings("unused") long fd) { - return close((int) fd); - } - } - - @Builtin(name = "_select", minNumOfPositionalArgs = 4) - @GenerateNodeFactory - abstract static class SelectNode extends PythonBuiltinNode { - /* - * We would like to poll two different things with a timeout: the actual file descriptors - * and the Java managed LinkedBlockingQueues. - * - * The LinkedBlockingQueue does not expose anything that would allow us to wait on multiple - * LinkedBlockingQueues at once, so we'd have to spawn a thread for each or roll out our own - * synchronization of take/offer to allow that. - * - * The actual file descriptors could be backed by Java POSIX emulation layer, or by the - * native POSIX implementation -- the `select` can run actual native select, which we cannot - * easily interrupt from Java if one of the LinkedBlockingQueue is unblocked earlier than - * the native select returns. - * - * Given all these complexities, for the time being, we do active waiting here, but at least - * without holding the GIL, and we also yield in every iteration. - */ - - @Specialization - Object doGeneric(VirtualFrame frame, Object multiprocessingFdsList, Object multiprocessingObjsList, Object posixFileObjsList, Object timeoutObj, - @Bind Node inliningTarget, - @Bind PythonLanguage language, - @Cached PosixModuleBuiltins.FileDescriptorConversionNode fdConvertor, - @Cached PyObjectSizeNode sizeNode, - @Cached PyObjectGetItem getItem, - @Cached SequenceNodes.GetObjectArrayNode getObjectArrayNode, - @Cached ListNodes.FastConstructListNode constructListNode, - @Cached CastToJavaIntLossyNode castToJava, - @Cached CastToJavaDoubleNode castToDouble, - @Cached GilNode gil, - @Cached PConstructAndRaiseNode.Lazy constructAndRaiseNode) { - PythonContext context = getContext(); - SharedMultiprocessingData sharedData = context.getSharedMultiprocessingData(); - - PList list = constructListNode.execute(frame, inliningTarget, multiprocessingFdsList); - int size = sizeNode.execute(frame, inliningTarget, list); - int[] multiprocessingFds = new int[size]; - for (int i = 0; i < size; i++) { - Object pythonObject = getItem.execute(frame, inliningTarget, list, i); - multiprocessingFds[i] = toInt(inliningTarget, castToJava, pythonObject); - } - - Object[] posixFileObjs = getObjectArrayNode.execute(inliningTarget, posixFileObjsList); - int[] posixFds = new int[posixFileObjs.length]; - for (int i = 0; i < posixFileObjs.length; i++) { - posixFds[i] = toInt(inliningTarget, castToJava, fdConvertor.execute(frame, posixFileObjs[i])); - } - - double timeout = castToDouble.execute(inliningTarget, timeoutObj); - - Object[] multiprocessingObjs = getObjectArrayNode.execute(inliningTarget, multiprocessingObjsList); - gil.release(true); - try { - boolean[] selectedMultiprocessingFds = new boolean[multiprocessingFds.length]; - boolean[] selectedPosixFds = new boolean[posixFds.length]; - - doSelect(context.getPosixSupport(), sharedData, posixFds, selectedPosixFds, multiprocessingFds, selectedMultiprocessingFds, timeout); - - ArrayBuilder result = new ArrayBuilder<>(4); - for (int i = 0; i < selectedMultiprocessingFds.length; i++) { - if (selectedMultiprocessingFds[i]) { - result.add(multiprocessingObjs[i]); - } - } - for (int i = 0; i < selectedPosixFds.length; i++) { - if (selectedPosixFds[i]) { - result.add(posixFileObjs[i]); - } - } - - return PFactory.createList(language, result.toArray(new Object[0])); - } catch (PosixSupportLibrary.PosixException e) { - throw constructAndRaiseNode.get(inliningTarget).raiseOSErrorFromPosixException(frame, e); - } finally { - gil.acquire(); - } - } - - private static int toInt(Node inliningTarget, CastToJavaIntLossyNode castToJava, Object pythonObject) { - try { - return castToJava.execute(inliningTarget, pythonObject); - } catch (CannotCastException e) { - throw CompilerDirectives.shouldNotReachHere(); - } - } - - @TruffleBoundary - private static void doSelect(Object posix, SharedMultiprocessingData sharedData, - int[] posixFds, boolean[] selectedPosixFds, - int[] multiprocessingFds, boolean[] selectedMultiprocessingFds, - double timeoutInS) throws PosixSupportLibrary.PosixException { - PosixSupportLibrary posixLib = PosixSupportLibrary.getUncached(); - boolean blocking = timeoutInS >= 0; - boolean untilReady = timeoutInS == 0; - long deadline = 0; - if (blocking && !untilReady) { - long timeout = (long) (timeoutInS * 1000_000_000.0); - deadline = System.nanoTime() + timeout; - } - while (true) { - boolean selected = false; - if (posixFds.length > 0) { - PosixSupportLibrary.SelectResult selectResult = posixLib.select(posix, posixFds, - PythonUtils.EMPTY_INT_ARRAY, PythonUtils.EMPTY_INT_ARRAY, Timeval.SELECT_TIMEOUT_NOW); - System.arraycopy(selectResult.getReadFds(), 0, selectedPosixFds, 0, selectedPosixFds.length); - if (blocking) { - for (boolean b : selectedPosixFds) { - selected |= b; - } - } - } - for (int i = 0; i < multiprocessingFds.length; i++) { - int fd = multiprocessingFds[i]; - selectedMultiprocessingFds[i] = !sharedData.isBlocking(fd); - if (selectedMultiprocessingFds[i]) { - selected = true; - } - } - if (!blocking || selected) { - return; - } - if (deadline != 0 && deadline - System.nanoTime() < 0) { - return; - } - Thread.yield(); - } - } - } - -} diff --git a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/multiprocessing/PGraalPySemLock.java b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/multiprocessing/PGraalPySemLock.java deleted file mode 100644 index f17cebb238..0000000000 --- a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/multiprocessing/PGraalPySemLock.java +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Copyright (c) 2019, 2023, Oracle and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * The Universal Permissive License (UPL), Version 1.0 - * - * Subject to the condition set forth below, permission is hereby granted to any - * person obtaining a copy of this software, associated documentation and/or - * data (collectively the "Software"), free of charge and under any and all - * copyright rights in the Software, and any and all patent rights owned or - * freely licensable by each licensor hereunder covering either (i) the - * unmodified Software as contributed to or provided by such licensor, or (ii) - * the Larger Works (as defined below), to deal in both - * - * (a) the Software, and - * - * (b) any piece of software and/or hardware listed in the lrgrwrks.txt file if - * one is included with the Software each a "Larger Work" to which the Software - * is contributed by such licensors), - * - * without restriction, including without limitation the rights to copy, create - * derivative works of, display, perform, and distribute the Software and make, - * use, sell, offer for sale, import, export, have made, and have sold the - * Software and the Larger Work(s), and to sublicense the foregoing rights on - * either these or other terms. - * - * This license is subject to the following condition: - * - * The above copyright notice and either this complete permission notice or at a - * minimum a reference to the UPL must be included in all copies or substantial - * portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. - */ -package com.oracle.graal.python.builtins.modules.multiprocessing; - -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -import com.oracle.graal.python.builtins.objects.thread.AbstractPythonLock; -import com.oracle.graal.python.builtins.objects.thread.PThread; -import com.oracle.truffle.api.CompilerDirectives.TruffleBoundary; -import com.oracle.truffle.api.TruffleSafepoint; -import com.oracle.truffle.api.nodes.Node; -import com.oracle.truffle.api.object.Shape; -import com.oracle.truffle.api.strings.TruffleString; - -public final class PGraalPySemLock extends AbstractPythonLock { - public static final int RECURSIVE_MUTEX = 0; - public static final int SEMAPHORE = 1; - - private final Semaphore semaphore; - private final int kind; - private final TruffleString name; - - private long lastThreadID = -1; - private int count; - - public PGraalPySemLock(Object cls, Shape instanceShape, TruffleString name, int kind, Semaphore sharedSemaphore) { - super(cls, instanceShape); - this.name = name; - this.semaphore = sharedSemaphore; - this.kind = kind; - } - - @Override - @TruffleBoundary - protected boolean acquireNonBlocking() { - boolean ret = semaphore.tryAcquire(); - if (ret) { - lastThreadID = PThread.getThreadId(Thread.currentThread()); - count++; - } - return ret; - } - - @Override - @TruffleBoundary - protected boolean acquireBlocking(Node node) { - boolean[] b = new boolean[1]; - TruffleSafepoint.setBlockedThreadInterruptible(node, (s) -> { - s.acquire(); - b[0] = true; - }, semaphore); - if (b[0]) { - lastThreadID = PThread.getThreadId(Thread.currentThread()); - count++; - } - return b[0]; - } - - @Override - @TruffleBoundary - protected boolean acquireTimeout(Node node, long timeout) { - boolean[] b = new boolean[1]; - TruffleSafepoint.setBlockedThreadInterruptible(node, (s) -> b[0] = s.tryAcquire(timeout, TimeUnit.MILLISECONDS), semaphore); - if (b[0]) { - lastThreadID = PThread.getThreadId(Thread.currentThread()); - count++; - } - return b[0]; - } - - @Override - @TruffleBoundary - public void release() { - semaphore.release(); - count--; - } - - @Override - @TruffleBoundary - public boolean locked() { - return semaphore.availablePermits() == 0; - } - - @TruffleBoundary - public int getValue() { - return semaphore.availablePermits(); - } - - public int getCount() { - return count; - } - - public void increaseCount() { - count++; - lastThreadID = PThread.getThreadId(Thread.currentThread()); - } - - public void decreaseCount() { - count--; - } - - @TruffleBoundary - public boolean isMine() { - return count > 0 && lastThreadID == PThread.getThreadId(Thread.currentThread()); - } - - public boolean isZero() { - return semaphore.availablePermits() == 0; - } - - public int getKind() { - return kind; - } - - public TruffleString getName() { - return name; - } -} diff --git a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/multiprocessing/SemLockBuiltins.java b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/multiprocessing/SemLockBuiltins.java index 974672c697..34a1bb2347 100644 --- a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/multiprocessing/SemLockBuiltins.java +++ b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/builtins/modules/multiprocessing/SemLockBuiltins.java @@ -123,7 +123,7 @@ static PSemLock construct(VirtualFrame frame, Object cls, int kind, int value, i @Cached TypeNodes.GetInstanceShape getInstanceShape, @Cached PConstructAndRaiseNode.Lazy constructAndRaiseNode, @Cached PRaiseNode raiseNode) { - if (kind != PGraalPySemLock.RECURSIVE_MUTEX && kind != PGraalPySemLock.SEMAPHORE) { + if (kind != PSemLock.RECURSIVE_MUTEX && kind != PSemLock.SEMAPHORE) { throw raiseNode.raise(inliningTarget, ValueError, ErrorMessages.UNRECOGNIZED_KIND); } Object posixName = posixLib.createPathFromString(posixSupport, name); diff --git a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/nodes/exception/TopLevelExceptionHandler.java b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/nodes/exception/TopLevelExceptionHandler.java index 4171d870ba..2f395900bd 100644 --- a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/nodes/exception/TopLevelExceptionHandler.java +++ b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/nodes/exception/TopLevelExceptionHandler.java @@ -147,9 +147,6 @@ public Object execute(VirtualFrame frame) { throw new PythonInterruptedException(); } catch (AbstractTruffleException e) { assert !PArguments.isPythonFrame(frame); - if (e instanceof PException pe && pe.getEscapedException() instanceof PBaseException managedException && getContext().isChildContext() && isSystemExit(managedException)) { - return handleChildContextExit(managedException); - } throw handlePythonException(e); } catch (ThreadDeath e) { // Do not handle, result of TruffleContext.closeCancelled() @@ -212,9 +209,6 @@ private void exit(int exitCode) { throw new PythonExitException(this, exitCode); } if (!source.isInteractive()) { - if (getContext().isChildContext()) { - getContext().getChildContextData().setExitCode(1); - } throw new PythonExitException(this, exitCode); } } @@ -278,20 +272,6 @@ private void handleSystemExit(PBaseException pythonException) { } } - @TruffleBoundary - private Object handleChildContextExit(PBaseException pythonException) throws PException { - // avoid throwing PythonExitException from spawned child context, return only exitCode - try { - return getExitCode(pythonException); - } catch (CannotCastException cce) { - if (handleAlwaysRunExceptHook(getContext(), pythonException)) { - return 1; - } else { - throw pythonException.getExceptionForReraise(pythonException.getTraceback()); - } - } - } - private static int getExitCode(PBaseException pythonException) throws CannotCastException { final Object[] exceptionAttributes = pythonException.getExceptionAttributes(); int exitcode = 0; diff --git a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/runtime/PythonContext.java b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/runtime/PythonContext.java index a771560e41..b0c9e309bb 100644 --- a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/runtime/PythonContext.java +++ b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/runtime/PythonContext.java @@ -86,14 +86,8 @@ import java.util.Optional; import java.util.Random; import java.util.WeakHashMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; @@ -133,7 +127,6 @@ import com.oracle.graal.python.builtins.objects.str.PString; import com.oracle.graal.python.builtins.objects.str.StringNodes.StringReplaceNode; import com.oracle.graal.python.builtins.objects.thread.PLock; -import com.oracle.graal.python.builtins.objects.thread.PThread; import com.oracle.graal.python.builtins.objects.tuple.PTuple; import com.oracle.graal.python.lib.PyObjectCallMethodObjArgs; import com.oracle.graal.python.lib.PyObjectGetAttr; @@ -149,7 +142,6 @@ import com.oracle.graal.python.nodes.object.SetDictNode; import com.oracle.graal.python.nodes.statement.AbstractImportNode; import com.oracle.graal.python.nodes.util.CannotCastException; -import com.oracle.graal.python.nodes.util.CastToJavaIntLossyNode; import com.oracle.graal.python.nodes.util.CastToTruffleStringNode; import com.oracle.graal.python.runtime.AsyncHandler.AsyncAction; import com.oracle.graal.python.runtime.PythonContextFactory.GetThreadStateNodeGen; @@ -170,7 +162,6 @@ import com.oracle.graal.python.util.ShutdownHook; import com.oracle.graal.python.util.Supplier; import com.oracle.graal.python.util.SuppressFBWarnings; -import com.oracle.truffle.api.CallTarget; import com.oracle.truffle.api.CompilerAsserts; import com.oracle.truffle.api.CompilerDirectives; import com.oracle.truffle.api.CompilerDirectives.CompilationFinal; @@ -179,8 +170,6 @@ import com.oracle.truffle.api.HostCompilerDirectives.InliningCutoff; import com.oracle.truffle.api.RootCallTarget; import com.oracle.truffle.api.ThreadLocalAction; -import com.oracle.truffle.api.TruffleContext; -import com.oracle.truffle.api.TruffleContext.Builder; import com.oracle.truffle.api.TruffleFile; import com.oracle.truffle.api.TruffleLanguage; import com.oracle.truffle.api.TruffleLanguage.ContextReference; @@ -200,7 +189,6 @@ import com.oracle.truffle.api.interop.UnsupportedMessageException; import com.oracle.truffle.api.nodes.Node; import com.oracle.truffle.api.nodes.RootNode; -import com.oracle.truffle.api.source.Source; import com.oracle.truffle.api.strings.TruffleString; import com.oracle.truffle.api.strings.TruffleString.Encoding; import com.oracle.truffle.api.utilities.CyclicAssumption; @@ -837,11 +825,6 @@ public Thread getOwner() { @CompilationFinal private long perfCounterStart = System.nanoTime(); - public static final String CHILD_CONTEXT_DATA = "childContextData"; - @CompilationFinal private ArrayList childContextFDs; - private final ChildContextData childContextData; - private final SharedMultiprocessingData sharedMultiprocessingData; - private boolean codecsInitialized; private final List codecSearchPath = new ArrayList<>(); private final Map codecSearchCache = new HashMap<>(); @@ -947,303 +930,9 @@ public void leaveAsyncHandler() { inAsyncHandler.set(false); } - public static final class ChildContextData { - private int exitCode = 0; - private boolean signaled; - private final PythonContext parentCtx; - private TruffleWeakReference ctx; - - private final AtomicBoolean exiting = new AtomicBoolean(false); - private final CountDownLatch running = new CountDownLatch(1); - - public ChildContextData(PythonContext parentCtx) { - this.parentCtx = parentCtx; - } - - public void setExitCode(int exitCode) { - this.exitCode = exitCode; - } - - public int getExitCode() { - return this.exitCode; - } - - public void setSignaled(int signaledCode) { - this.signaled = true; - this.exitCode = signaledCode; - } - - public boolean wasSignaled() { - return this.signaled; - } - - private void setTruffleContext(TruffleContext ctx) { - assert this.ctx == null; - assert ctx != null; - this.ctx = new TruffleWeakReference<>(ctx); - } - - public TruffleContext getTruffleContext() { - return ctx.get(); - } - - public void awaitRunning() throws InterruptedException { - running.await(); - } - - public boolean compareAndSetExiting(boolean expect, boolean update) { - return exiting.compareAndSet(expect, update); - } - } - - public static final class SharedMultiprocessingData { - - /** - * A sentinel object that remains in the {@link LinkedBlockingQueue} in the - * {@link #pipeData}. It is pushed there in #close so that any blocking #take calls can wake - * up and react to the end of the stream. - */ - private static final Object SENTINEL = new Object(); - - private final AtomicInteger fdCounter = new AtomicInteger(0); - - /** - * Maps the two fake file descriptors created in {@link #pipe()} to one - * {@link LinkedBlockingQueue} - */ - private final ConcurrentSkipListMap> pipeData = new ConcurrentSkipListMap<>(); - - /** - * Holds ref count of file descriptors which were passed over to a spawned child context. - * This can be either:
- *
    - *
  • fake file descriptors created via {@link #pipe()}
  • - *
  • real file descriptors coming from the posix implementation
  • - *
- */ - private final ConcurrentHashMap fdRefCount = new ConcurrentHashMap<>(); - - public SharedMultiprocessingData(ConcurrentHashMap namedSemaphores) { - this.namedSemaphores = namedSemaphores; - } - - /** - * Increases reference count for the given file descriptor. - */ - @TruffleBoundary - private void incrementFDRefCount(int fd) { - fdRefCount.compute(fd, (f, count) -> (count == null) ? 1 : count + 1); - } - - /** - * Decreases reference count for the given file descriptor. - * - * @return {@code true} if ref count was decreased, {@code false} if ref count isn't tracked - * anymore. - */ - @TruffleBoundary - public boolean decrementFDRefCount(int fd) { - Integer cnt = fdRefCount.computeIfPresent(fd, (f, count) -> { - if (count == 0 || count == Integer.MIN_VALUE) { - return Integer.MIN_VALUE; - } else { - assert count > 0; - return count - 1; - } - }); - return cnt != null && !fdRefCount.remove(fd, Integer.MIN_VALUE); - } - - /** - * @return fake (negative) fd values to avoid clash with real file descriptors and to detect - * potential usage by other python builtins - */ - @TruffleBoundary - public int[] pipe() { - LinkedBlockingQueue q = new LinkedBlockingQueue<>(); - int writeFD = fdCounter.addAndGet(-2); - assert isWriteFD(writeFD); - int readFD = getPairFd(writeFD); - pipeData.put(readFD, q); - pipeData.put(writeFD, q); - return new int[]{readFD, writeFD}; - } - - /** - * Adding pipe data needs no special synchronization, since we guarantee there is only ever - * one or no queue registered for a given fd. - */ - @TruffleBoundary - public void addPipeData(int fd, byte[] bytes, Runnable noFDHandler, Runnable brokenPipeHandler) { - assert isWriteFD(fd); - LinkedBlockingQueue q = pipeData.get(fd); - if (q == null) { - // the write end is already closed - noFDHandler.run(); - throw CompilerDirectives.shouldNotReachHere(); - } - int fd2 = getPairFd(fd); - if (isClosed(fd2)) { - // the read end is already closed - brokenPipeHandler.run(); - throw CompilerDirectives.shouldNotReachHere(); - } - q.add(bytes); - } - - /** - * Closing the read end of a pipe just removes the mapping from that fd to the queue. - * Closing the write end adds the {@link #SENTINEL} value as the last value. There is a - * potential race here for incorrect code that concurrently writes to the write end via - * {@link #addPipeData}, in that the sentinel may prevent writes from being visible. - */ - @TruffleBoundary - public void closePipe(int fd) { - LinkedBlockingQueue q = pipeData.remove(fd); - if (q != null && isWriteFD(fd)) { - q.offer(SENTINEL); - } - } - - /** - * This needs no additional synchronization, since if the write-end of the pipe is already - * closed, the {@link #take} call will return appropriately. - */ - @TruffleBoundary - public Object takePipeData(Node node, int fd, Runnable noFDHandler) { - LinkedBlockingQueue q = pipeData.get(fd); - if (q == null) { - noFDHandler.run(); - throw CompilerDirectives.shouldNotReachHere(); - } - Object[] o = new Object[]{PNone.NONE}; - TruffleSafepoint.setBlockedThreadInterruptible(node, (lbq) -> { - o[0] = take(lbq); - }, q); - return o[0]; - } - - /** - * This uses {@link ConcurrentSkipListMap#compute} to determine the blocking state. The - * runnable may be run multiple times, so we need to check and write all possible results to - * the result array. This ensures that if there is concurrent modification of the - * {@link #pipeData}, we will get a valid result. - */ - @TruffleBoundary - public boolean isBlocking(int fd) { - boolean[] result = new boolean[]{false}; - pipeData.compute(fd, (f, q) -> { - if (q == null) { - result[0] = false; - } else { - int fd2 = getPairFd(fd); - if (isClosed(fd2)) { - result[0] = false; - } else { - // this uses q.isEmpty() instead of our isEmpty(q), because we are not - // interested in the race between closing fd2 and this runnable. If the - // SENTINEL is pushed in the meantime, we should return false, just as if - // we had observed fd2 to be closed already. - result[0] = q.isEmpty(); - } - } - return q; - }); - return result[0]; - } - - private static int getPairFd(int fd) { - return isWriteFD(fd) ? fd + 1 : fd - 1; - } - - private static boolean isWriteFD(int fd) { - return fd % 2 == 0; - } - - private static Object take(LinkedBlockingQueue q) throws InterruptedException { - Object v = q.take(); - if (v == SENTINEL) { - q.offer(SENTINEL); - return PythonUtils.EMPTY_BYTE_ARRAY; - } else { - return v; - } - } - - private boolean isClosed(int fd) { - // since there is no way that any thread can be trying to read/write to this pipe FD - // legally before it was added to pipeData in #pipe above, we don't need to - // synchronize. If the FD is taken, and it's not in pipe data, this is a race in the - // program, because some thread is just arbitrarily probing FDs. - return fd >= fdCounter.get() && pipeData.get(fd) == null; - } - - /** - * @see PythonLanguage#namedSemaphores - */ - private final ConcurrentHashMap namedSemaphores; - - @TruffleBoundary - public void putNamedSemaphore(TruffleString name, Semaphore sem) { - namedSemaphores.put(name, sem); - } - - @TruffleBoundary - public Semaphore getNamedSemaphore(TruffleString name) { - return namedSemaphores.get(name); - } - - @TruffleBoundary - public Semaphore removeNamedSemaphore(TruffleString name) { - return namedSemaphores.remove(name); - } - - private final ConcurrentHashMap childContextThreads = new ConcurrentHashMap<>(); - - /** - * {@code ChildContextData} outlives its own context, because the parent needs to be able to - * access the exit code even after the child context was closed and thread disposed. We - * dispose the mapping to {@code ChildContextData} when the Python code (our internal Python - * code) asks for the exit code for the first time after the child exited. - */ - private final ConcurrentHashMap childContextData = new ConcurrentHashMap<>(); - - @TruffleBoundary - public Thread getChildContextThread(long tid) { - return childContextThreads.get(tid); - } - - @TruffleBoundary - public void putChildContextThread(long id, Thread thread) { - childContextThreads.put(id, thread); - } - - @TruffleBoundary - public void removeChildContextThread(long id) { - childContextThreads.remove(id); - } - - @TruffleBoundary - public ChildContextData getChildContextData(long tid) { - return childContextData.get(tid); - } - - @TruffleBoundary - public void removeChildContextData(long tid) { - childContextData.remove(tid); - } - - @TruffleBoundary - public void putChildContextData(long id, ChildContextData data) { - childContextData.put(id, data); - } - } - public PythonContext(PythonLanguage language, TruffleLanguage.Env env) { super(language, env); this.env = env; - this.childContextData = (ChildContextData) env.getConfig().get(CHILD_CONTEXT_DATA); - this.sharedMultiprocessingData = this.childContextData == null ? new SharedMultiprocessingData(language.namedSemaphores) : childContextData.parentCtx.sharedMultiprocessingData; this.handler = new AsyncHandler(this); this.sharedFinalizer = new AsyncHandler.SharedFinalizer(this); this.optionValues = PythonOptions.createOptionValuesStorage(env); @@ -1259,101 +948,6 @@ public static PythonContext get(Node node) { return REFERENCE.get(node); } - public boolean isChildContext() { - return childContextData != null; - } - - public ChildContextData getChildContextData() { - return childContextData; - } - - public SharedMultiprocessingData getSharedMultiprocessingData() { - return sharedMultiprocessingData; - } - - public long spawnTruffleContext(int fd, int sentinel, int[] fdsToKeep) { - ChildContextData data = new ChildContextData(isChildContext() ? childContextData.parentCtx : this); - Builder childContextBuilder = data.parentCtx.env.newInnerContextBuilder().// - forceSharing(getOption(PythonOptions.ForceSharingForInnerContexts)).// - inheritAllAccess(true).// - initializeCreatorContext(true).// - option("python.IsolateNativeModules", "true").// - // TODO always force java posix in spawned: test_multiprocessing_spawn fails - // with that. Gives "OSError: [Errno 9] Bad file number" - // option("python.PosixModuleBackend", "java").// - config(PythonContext.CHILD_CONTEXT_DATA, data); - - TruffleContext childContext = childContextBuilder.build(); - data.setTruffleContext(childContext); - ChildContextThread childContextRunnable = new ChildContextThread(fd, sentinel, data); - Thread thread = data.parentCtx.env.newTruffleThreadBuilder(childContextRunnable).context(childContext).threadGroup(threadGroup).build(); - long tid = PThread.getThreadId(thread); - getSharedMultiprocessingData().putChildContextThread(tid, thread); - getSharedMultiprocessingData().putChildContextData(tid, data); - for (int fdToKeep : fdsToKeep) { - // prevent file descriptors from being closed when passed to another "process", - // equivalent to fds_to_keep arg in posix fork_exec - getSharedMultiprocessingData().incrementFDRefCount(fdToKeep); - } - start(thread); - return tid; - } - - @TruffleBoundary - private static void start(Thread thread) { - thread.start(); - } - - public synchronized ArrayList getChildContextFDs() { - if (childContextFDs == null) { - childContextFDs = new ArrayList<>(); - } - return childContextFDs; - } - - private static final class ChildContextThread implements Runnable { - private static final TruffleLogger MULTIPROCESSING_LOGGER = PythonLanguage.getLogger(ChildContextThread.class); - private static final Source MULTIPROCESSING_SOURCE = Source.newBuilder(PythonLanguage.ID, - "from multiprocessing.popen_truffleprocess import spawn_truffleprocess; spawn_truffleprocess(fd, sentinel)", - "").internal(true).build(); - - private final int fd; - private final ChildContextData data; - private final int sentinel; - - public ChildContextThread(int fd, int sentinel, ChildContextData data) { - this.fd = fd; - this.data = data; - this.sentinel = sentinel; - } - - @Override - public void run() { - try { - MULTIPROCESSING_LOGGER.fine("starting spawned child context"); - CallTarget ct = PythonContext.get(null).getEnv().parsePublic(MULTIPROCESSING_SOURCE, "fd", "sentinel"); - try { - data.running.countDown(); - Object res = ct.call(fd, sentinel); - int exitCode = CastToJavaIntLossyNode.executeUncached(res); - data.setExitCode(exitCode); - } finally { - if (data.compareAndSetExiting(false, true)) { - try { - MULTIPROCESSING_LOGGER.log(Level.FINE, "closed spawned child context"); - } catch (Throwable t) { - MULTIPROCESSING_LOGGER.log(Level.FINE, "exception while closing spawned child context", t); - } - } - data.parentCtx.sharedMultiprocessingData.closePipe(sentinel); - } - } catch (ThreadDeath td) { - // as a result of of TruffleContext.closeCancelled() - throw td; - } - } - } - public ThreadGroup getThreadGroup() { return threadGroup; } @@ -2178,11 +1772,6 @@ public void finalizeContext() { } // interrupt and join or kill system threads joinSystemThreads(); - for (int fd : getChildContextFDs()) { - if (!getSharedMultiprocessingData().decrementFDRefCount(fd)) { - getSharedMultiprocessingData().closePipe(fd); - } - } mainThread = null; if (stdioFlushFailed) { throw new PythonExitException(null, 120); @@ -2842,7 +2431,6 @@ public void disposeThread(Thread thread, boolean canRunGuestCode, boolean markSh } ts.dispose(thread == Thread.currentThread(), markShuttingDown); releaseSentinelLock(ts.sentinelLock); - getSharedMultiprocessingData().removeChildContextThread(PThread.getThreadId(thread)); } private static void releaseSentinelLock(WeakReference sentinelLockWeakref) { diff --git a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/runtime/object/PFactory.java b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/runtime/object/PFactory.java index 62ce9d3a9e..4c4e50ac31 100644 --- a/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/runtime/object/PFactory.java +++ b/graalpython/com.oracle.graal.python/src/com/oracle/graal/python/runtime/object/PFactory.java @@ -32,8 +32,6 @@ import java.lang.ref.ReferenceQueue; import java.math.BigInteger; import java.util.LinkedHashMap; -import java.util.concurrent.Semaphore; - import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; @@ -69,7 +67,6 @@ import com.oracle.graal.python.builtins.modules.json.PJSONScanner; import com.oracle.graal.python.builtins.modules.lsprof.Profiler; import com.oracle.graal.python.builtins.modules.lzma.LZMAObject; -import com.oracle.graal.python.builtins.modules.multiprocessing.PGraalPySemLock; import com.oracle.graal.python.builtins.modules.multiprocessing.PSemLock; import com.oracle.graal.python.builtins.modules.pickle.PPickleBuffer; import com.oracle.graal.python.builtins.modules.pickle.PPickler; @@ -1122,14 +1119,6 @@ public static PSemLock createSemLock(Object cls, Shape shape, long handle, int k return new PSemLock(cls, shape, handle, kind, maxValue, name); } - public static PGraalPySemLock createGraalPySemLock(PythonLanguage language, TruffleString name, int kind, Semaphore sharedSemaphore) { - return createGraalPySemLock(PythonBuiltinClassType.PGraalPySemLock, PythonBuiltinClassType.PGraalPySemLock.getInstanceShape(language), name, kind, sharedSemaphore); - } - - public static PGraalPySemLock createGraalPySemLock(Object cls, Shape shape, TruffleString name, int kind, Semaphore sharedSemaphore) { - return new PGraalPySemLock(cls, shape, name, kind, sharedSemaphore); - } - public static PScandirIterator createScandirIterator(PythonLanguage language, PythonContext context, Object dirStream, PosixFileHandle path, boolean needsRewind) { return new PScandirIterator(PythonBuiltinClassType.PScandirIterator, PythonBuiltinClassType.PScandirIterator.getInstanceShape(language), context, dirStream, path, needsRewind); } diff --git a/graalpython/lib-python/3/multiprocessing/connection.py b/graalpython/lib-python/3/multiprocessing/connection.py index d7619ede2d..e3b7a95d34 100644 --- a/graalpython/lib-python/3/multiprocessing/connection.py +++ b/graalpython/lib-python/3/multiprocessing/connection.py @@ -19,17 +19,12 @@ import tempfile import itertools -# Begin Truffle change -# import _multiprocessing -import weakref -# End Truffle change +import _multiprocessing from . import util from . import AuthenticationError, BufferTooShort -# Begin Truffle change -from .context import reduction, _default_context -# End Truffle change +from .context import reduction _ForkingPickler = reduction.ForkingPickler try: @@ -40,6 +35,8 @@ raise _winapi = None +# GraalPy change: temporary until we implement proper multiprocessing for Windows +_winapi = None # # # @@ -81,9 +78,8 @@ def arbitrary_address(family): elif family == 'AF_UNIX': return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir()) elif family == 'AF_PIPE': - # GraalVM change: add thread ID, we may be in the same process return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' % - (_default_context._get_id(), next(_mmap_counter)), dir="") + (os.getpid(), next(_mmap_counter)), dir="") else: raise ValueError('unrecognized family') @@ -118,33 +114,13 @@ def address_type(address): # Connection classes # - -# Begin Truffle change -class _ConnectionFinalizer: - def __init__(self, fd): - self.fd = fd - - def close(self): - if self.fd: - try: - _default_context._close(self.fd) - finally: - self.fd = None -# End Truffle change - - class _ConnectionBase: _handle = None def __init__(self, handle, readable=True, writable=True): handle = handle.__index__() - # Begin Truffle change - if handle < 0 and not _default_context._is_graalpy(): + if handle < 0: raise ValueError("invalid handle") - # Use finalize instead of __del__ - self._finalizer = _ConnectionFinalizer(handle) - weakref.finalize(self, self._finalizer.close) - # End Truffle change if not readable and not writable: raise ValueError( "at least one of `readable` and `writable` must be True") @@ -201,9 +177,7 @@ def close(self): """Close the connection""" if self._handle is not None: try: - # Begin Truffle change - self._finalizer.close() - # End Truffle change + self._close() finally: self._handle = None @@ -402,19 +376,11 @@ def _close(self, _close=_multiprocessing.closesocket): _read = _multiprocessing.recv else: def _close(self, _close=os.close): - # Begin Truffle change - # _close(self._handle) - _default_context._close(self._handle) - # End Truffle change + _close(self._handle) _write = os.write _read = os.read def _send(self, buf, write=_write): - # Begin Truffle change - if self._handle < 0 and _default_context._is_graalpy(): - self._send_mp_write(buf.tobytes()) - return - # End Truffle change remaining = len(buf) while True: n = write(self._handle, buf) @@ -424,10 +390,6 @@ def _send(self, buf, write=_write): buf = buf[n:] def _recv(self, size, read=_read): - # Begin Truffle change - if self._handle < 0 and _default_context._is_graalpy(): - return self._recv_mp_read(size) - # End Truffle change buf = io.BytesIO() handle = self._handle remaining = size @@ -444,11 +406,6 @@ def _recv(self, size, read=_read): return buf def _send_bytes(self, buf): - # Begin Truffle change - if self._handle < 0 and _default_context._is_graalpy(): - self._send_mp_write(buf.tobytes()) - return - # End Truffle change n = len(buf) if n > 0x7fffffff: pre_header = struct.pack("!i", -1) @@ -472,10 +429,6 @@ def _send_bytes(self, buf): self._send(header + buf) def _recv_bytes(self, maxsize=None): - # Begin Truffle change - if self._handle < 0 and _default_context._is_graalpy(): - return self._recv_mp_read(maxsize) - # End Truffle change buf = self._recv(4) size, = struct.unpack("!i", buf.getvalue()) if size == -1: @@ -485,19 +438,6 @@ def _recv_bytes(self, maxsize=None): return None return self._recv(size) - # Begin Truffle change - def _recv_mp_read(self, size): - # size is irelevant, _multiprocessing._read returns - # the whole byte array at once - from _multiprocessing_graalpy import _read - chunk = _read(self._handle, size) - return io.BytesIO(chunk) - - def _send_mp_write(self, bytes): - from _multiprocessing_graalpy import _write - _write(self._handle, bytes) - # End Truffle change - def _poll(self, timeout): r = wait([self], timeout) return bool(r) @@ -603,10 +543,7 @@ def Pipe(duplex=True): c1 = Connection(s1.detach()) c2 = Connection(s2.detach()) else: - # Begin Truffle change - # fd1, fd2 = os.pipe() - fd1, fd2 = _default_context._pipe() - # End Truffle change + fd1, fd2 = os.pipe() c1 = Connection(fd1, writable=False) c2 = Connection(fd2, readable=False) @@ -1174,7 +1111,6 @@ def wait(object_list, timeout=None): else: -# Begin Truffle change: original wait works only with actual file descriptors import selectors # poll/select have the advantage of not requiring any extra file @@ -1185,7 +1121,7 @@ def wait(object_list, timeout=None): else: _WaitSelector = selectors.SelectSelector - def _original_wait(object_list, timeout=None): + def wait(object_list, timeout=None): ''' Wait till an object in object_list is ready/readable. @@ -1208,46 +1144,6 @@ def _original_wait(object_list, timeout=None): if timeout < 0: return ready - - def wait(object_list, timeout=None): - ''' - Wait till an object in object_list is ready/readable. - - Returns list of those objects in object_list which are ready/readable. - #''' - - if not _default_context._is_graalpy(): - return _original_wait(object_list, timeout) - mp_select_list = [] - mp_original_objs = [] - selectors_list = [] - for obj in object_list: - fileno = obj.fileno() if hasattr(obj, "fileno") else obj - if(fileno < 0): - mp_select_list.append(fileno) - mp_original_objs.append(obj) - else: - selectors_list.append(obj) - - # If there are no "fake" multiprocessing fds, then just use the original implementation - if not mp_select_list: - return _original_wait(object_list, timeout) - - # From the docs: - # timeout is None -> block until ready - # timeout <= 0 -> no blocking, just check - # Our internal builtin only takes float value to make it simpler: - # timeout == 0 -> block until ready - # timeout < 0 -> no blocking - if timeout is None: - normalized_timeout = 0 - else: - t = float(timeout) - normalized_timeout = -1 if t == 0 else t - from _multiprocessing_graalpy import _select - return _select(mp_select_list, mp_original_objs, selectors_list, normalized_timeout) -# End Truffle change - # # Make connection and socket objects shareable if possible # diff --git a/graalpython/lib-python/3/multiprocessing/context.py b/graalpython/lib-python/3/multiprocessing/context.py index b2e05351a0..673cde194c 100644 --- a/graalpython/lib-python/3/multiprocessing/context.py +++ b/graalpython/lib-python/3/multiprocessing/context.py @@ -23,24 +23,6 @@ class TimeoutError(ProcessError): class AuthenticationError(ProcessError): pass - -# Begin Truffle change -_graalpy_backend_futurewarned = False - - -def _warn_graalpy_backend_futurewarning_once(stacklevel): - import warnings - global _graalpy_backend_futurewarned - if not _graalpy_backend_futurewarned: - warnings.warn( - "The 'graalpy' multiprocessing backend is deprecated and will be removed in a future " - "GraalPy release. Use the 'spawn' start method where available.", - FutureWarning, - stacklevel=stacklevel, - ) - _graalpy_backend_futurewarned = True -# End Truffle change - # # Base type for contexts. Bound methods of an instance of this type are included in __all__ of __init__.py # @@ -231,57 +213,6 @@ def reducer(self, reduction): def _check_available(self): pass - # Begin Truffle change - def _is_graalpy(self): - is_graalpy = isinstance(self.get_context(), GraalPyContext) - if is_graalpy: - _warn_graalpy_backend_futurewarning_once(stacklevel=3) - return is_graalpy - - def _get_id(self): - if self._is_graalpy(): - from _multiprocessing_graalpy import _gettid - return _gettid() - import os - return os.getpid() - - def _SemLock(self, kind, value, maxvalue, name, unlink): - if self._is_graalpy(): - from _multiprocessing_graalpy import SemLock - else: - from _multiprocessing import SemLock - return SemLock(kind, value, maxvalue, name, unlink) - - def _SemLock_rebuild(self, *args): - if self._is_graalpy(): - from _multiprocessing_graalpy import SemLock - else: - from _multiprocessing import SemLock - return SemLock._rebuild(*args) - - def _sem_unlink(self, name): - if self._is_graalpy(): - from _multiprocessing_graalpy import sem_unlink - else: - from _multiprocessing import sem_unlink - sem_unlink(name) - - def _close(self, fd): - if fd < 0: - from _multiprocessing_graalpy import _close as close - else: - from os import close - close(fd) - - def _pipe(self): - if self._is_graalpy(): - from _multiprocessing_graalpy import _pipe as pipe - else: - from os import pipe - return pipe() - - # End Truffle change - # # Type of default context -- underlying context can be set at most once # @@ -327,52 +258,30 @@ def get_start_method(self, allow_none=False): return self._actual_context._name def get_all_start_methods(self): - # Begin Truffle change - methods = ['spawn', 'graalpy'] if __graalpython__.posix_module_backend() == 'native' else ['graalpy'] - return methods - # End Truffle change + # GraalPy change + return ['spawn'] # # Context types for fixed start method # -# Begin Truffle change -class GraalPyProcess(process.BaseProcess): - _start_method = 'graalpy' - @staticmethod - def _Popen(process_obj): - _warn_graalpy_backend_futurewarning_once(stacklevel=3) - from multiprocessing.popen_truffleprocess import Popen - return Popen(process_obj) - - @staticmethod - def _after_fork(): - pass - - -class GraalPyContext(BaseContext): - _name = 'graalpy' - Process = GraalPyProcess -# End Truffle change - - if sys.platform != 'win32': class ForkProcess(process.BaseProcess): _start_method = 'fork' @staticmethod def _Popen(process_obj): - # Begin Truffle change - # from .popen_fork import Popen - # return Popen(process_obj) - raise NotImplementedError("'fork' not supported in graalpython") - # End Truffle change + # GraalPy change + raise NotImplementedError("'fork' not supported on GraalPy") class SpawnProcess(process.BaseProcess): _start_method = 'spawn' @staticmethod def _Popen(process_obj): + # GraalPy change + if __graalpython__.posix_module_backend() == 'java': + raise ValueError("multiprocessing not supported with the java POSIX backend") from .popen_spawn_posix import Popen return Popen(process_obj) @@ -385,11 +294,8 @@ class ForkServerProcess(process.BaseProcess): _start_method = 'forkserver' @staticmethod def _Popen(process_obj): - # Begin Truffle change - # from .popen_forkserver import Popen - # return Popen(process_obj) - raise NotImplementedError("'forkserver' not supported in GraalPy") - # End Truffle change + # GraalPy change + raise NotImplementedError("'forkserver' not supported on GraalPy") class ForkContext(BaseContext): _name = 'fork' @@ -410,22 +316,14 @@ def _check_available(self): 'fork': ForkContext(), 'spawn': SpawnContext(), 'forkserver': ForkServerContext(), - # Begin Truffle change - 'graalpy': GraalPyContext(), - # End Truffle change } - # Begin Truffle change - # if sys.platform == 'darwin': - # # bpo-33725: running arbitrary code after fork() is no longer reliable - # # on macOS since macOS 10.14 (Mojave). Use spawn by default instead. - # _default_context = DefaultContext(_concrete_contexts['spawn']) - # else: - # _default_context = DefaultContext(_concrete_contexts['fork']) - if __graalpython__.posix_module_backend() == 'native': + # GraalPy change + if True: + # bpo-33725: running arbitrary code after fork() is no longer reliable + # on macOS since macOS 10.14 (Mojave). Use spawn by default instead. _default_context = DefaultContext(_concrete_contexts['spawn']) else: - _default_context = DefaultContext(_concrete_contexts['graalpy']) - # End Truffle change + _default_context = DefaultContext(_concrete_contexts['fork']) else: @@ -447,13 +345,8 @@ class SpawnContext(BaseContext): _concrete_contexts = { 'spawn': SpawnContext(), - # Begin Truffle change - 'graalpy': GraalPyContext(), - # End Truffle change } - # Begin Truffle change - _default_context = DefaultContext(_concrete_contexts['graalpy']) - # End Truffle change + _default_context = DefaultContext(_concrete_contexts['spawn']) # # Force the start method diff --git a/graalpython/lib-python/3/multiprocessing/pool.py b/graalpython/lib-python/3/multiprocessing/pool.py index 9185118355..c0f2e741bc 100644 --- a/graalpython/lib-python/3/multiprocessing/pool.py +++ b/graalpython/lib-python/3/multiprocessing/pool.py @@ -919,6 +919,30 @@ def _set(self, i, obj): # # +# GraalPy change: ThreadPool would still have used semaphore-based +# multiprocessing lock. Use a threading lock to make thread pools work +# on emulated posix backend +class _ThreadPoolContext: + + def __init__(self, ctx): + self._ctx = ctx + + def __getattr__(self, name): + return getattr(self._ctx, name) + + def get_context(self, method=None): + if method is None: + return self + return self._ctx.get_context(method) + + def Lock(self): + return threading.Lock() + + def SimpleQueue(self): + from .queues import SimpleQueue + return SimpleQueue(ctx=self) + + class ThreadPool(Pool): _wrap_exception = False @@ -928,7 +952,8 @@ def Process(ctx, *args, **kwds): return Process(*args, **kwds) def __init__(self, processes=None, initializer=None, initargs=()): - Pool.__init__(self, processes, initializer, initargs) + # GraalPy change + Pool.__init__(self, processes, initializer, initargs, context=_ThreadPoolContext(get_context())) def _setup_queues(self): self._inqueue = queue.SimpleQueue() diff --git a/graalpython/lib-python/3/multiprocessing/popen_truffleprocess.py b/graalpython/lib-python/3/multiprocessing/popen_truffleprocess.py deleted file mode 100644 index 0f8361d2d9..0000000000 --- a/graalpython/lib-python/3/multiprocessing/popen_truffleprocess.py +++ /dev/null @@ -1,147 +0,0 @@ -import io -import os -import signal - -from .context import reduction, set_spawning_popen -from . import spawn -from . import util -from . import process - -from _multiprocessing_graalpy import _waittid, _terminate_spawned_thread, _spawn_context, _pipe, _read, _write, _close - -__all__ = ['Popen'] - -# -# Wrapper for an fd used while launching a process -# - -class _DupFd(object): - def __init__(self, fd): - self.fd = fd - def detach(self): - return self.fd - -# -# Start child truffle context -# -# inspired by popen_spawn_posix and popen_spawn_fork -# - -class Popen(object): - method = 'graalpy' - - DupFd = _DupFd - - def __init__(self, process_obj): - util._flush_std_streams() - self._fds = [] - self.returncode = None - self.finalizer = None - self._launch(process_obj) - - def duplicate_for_child(self, fd): - self._fds.append(fd) - return fd - - def poll(self, flag=os.WNOHANG): - if self.returncode is None: - try: - # this is different than in popen_fork -> os.waitpid(self.pid, flag) - # we have no real proces pid and a process - # which could have been evenutally signaled - tid, sigcode, exitcode = _waittid(self._tid, flag) - except OSError as e: - return None - if tid == self._tid: - if sigcode > 0: - self.returncode = -sigcode - else: - self.returncode = exitcode - return self.returncode - - def wait(self, timeout=None): - if self.returncode is None: - # begin change - # if timeout is not None: - # from multiprocessing.connection import wait - # if not wait([self.sentinel], timeout): - # return None - - # this method was copied from popen_fork, and is called (only?) from process.join() - # - TODO docs says that process.join(timeout=None) should block, - # but if so, than it entirely relies on this popen.wait() - # and calling wait() only if timeout != None would not block => - # => call wait() always, even if timeout == None - # - see also _test_multiprocessing.py/test_sentinel: - # after p.join() (which should return once the process is done), - # wait_for_handle() is still called with a timeout - why so if the process is already done? - # the test (and other) fail(s) with the original impl commented above, - # raising the timeout value in wait_for_handle helps, graalpython gets more time to finish the process - from multiprocessing.connection import wait - if not wait([self.sentinel], timeout): - return None - # end change - # This shouldn't block if wait() returned successfully. - return self.poll(os.WNOHANG if timeout == 0.0 else 0) - return self.returncode - - def _send_signal(self, sig): - if self.returncode is None: - _terminate_spawned_thread(self._tid, sig) - - def terminate(self): - self._send_signal(signal.SIGTERM) - - def kill(self): - self._send_signal(signal.SIGKILL) - - def _launch(self, process_obj): - prep_data = spawn.get_preparation_data(process_obj._name) - fp = io.BytesIO() - - parent_r = child_w = child_r = parent_w = None - - try: - parent_r, child_w = _pipe() - child_r, parent_w = _pipe() - - set_spawning_popen(self) - try: - reduction.dump(prep_data, fp) - reduction.dump(process_obj, fp) - finally: - set_spawning_popen(None) - - self.sentinel = parent_r - _write(parent_w, fp.getbuffer().tobytes()) - - self._fds.extend([child_r, child_w]) - self._tid = _spawn_context(child_r, child_w, self._fds) - self.pid = self._tid - finally: - fds_to_close = [] - for fd in (parent_r, parent_w): - if fd is not None: - fds_to_close.append(fd) - self.finalizer = util.Finalize(self, util.close_fds, fds_to_close) - - for fd in (child_r, child_w): - if fd is not None: - _close(fd) - - def close(self): - if self.finalizer is not None: - self.finalizer() - - -# Entry point to the child context thread -def spawn_truffleprocess(fd, parent_sentinel): - process.current_process()._inheriting = True - try: - bytesIO = io.BytesIO(_read(fd, 1024)) - preparation_data = reduction.pickle.load(bytesIO) - spawn.prepare(preparation_data) - self = reduction.pickle.load(bytesIO) - finally: - del process.current_process()._inheriting - return self._bootstrap(parent_sentinel) diff --git a/graalpython/lib-python/3/multiprocessing/process.py b/graalpython/lib-python/3/multiprocessing/process.py index ad9381442f..271ba3fd32 100644 --- a/graalpython/lib-python/3/multiprocessing/process.py +++ b/graalpython/lib-python/3/multiprocessing/process.py @@ -83,11 +83,7 @@ def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, count = next(_process_counter) self._identity = _current_process._identity + (count,) self._config = _current_process._config.copy() - # Begin Truffle change - # self._parent_pid = os.getpid() - from .context import _default_context - self._parent_pid = _default_context._get_id() - # End Truffle change + self._parent_pid = os.getpid() self._parent_name = _current_process.name self._popen = None self._closed = False @@ -117,13 +113,8 @@ def start(self): ''' self._check_closed() assert self._popen is None, 'cannot start a process twice' - # Begin Truffle change - # assert self._parent_pid == os.getpid(), \ - # 'can only start a process object created by current process' - from .context import _default_context - assert self._parent_pid == _default_context._get_id(), \ + assert self._parent_pid == os.getpid(), \ 'can only start a process object created by current process' - # End Truffle change assert not _current_process._config.get('daemon'), \ 'daemonic processes are not allowed to have children' _cleanup() @@ -153,11 +144,7 @@ def join(self, timeout=None): Wait until child process terminates ''' self._check_closed() - # Begin Truffle change - # assert self._parent_pid == os.getpid(), 'can only join a child process' - from .context import _default_context - assert self._parent_pid == _default_context._get_id(), 'can only join a child process' - # End Truffle change + assert self._parent_pid == os.getpid(), 'can only join a child process' assert self._popen is not None, 'can only join a started process' res = self._popen.wait(timeout) if res is not None: @@ -170,12 +157,8 @@ def is_alive(self): self._check_closed() if self is _current_process: return True - # Begin Truffle change - # assert self._parent_pid == os.getpid(), 'can only test a child process' - from .context import _default_context - assert self._parent_pid == _default_context._get_id(), 'can only test a child process' - # End Truffle change - + assert self._parent_pid == os.getpid(), 'can only test a child process' + if self._popen is None: return False @@ -255,11 +238,7 @@ def ident(self): ''' self._check_closed() if self is _current_process: - # Begin Truffle change - # return os.getpid() - from .context import _default_context - return _default_context._get_id() - # End Truffle change + return os.getpid() else: return self._popen and self._popen.pid @@ -279,15 +258,11 @@ def sentinel(self): def __repr__(self): exitcode = None - from .context import _default_context if self is _current_process: status = 'started' elif self._closed: status = 'closed' - # Begin Truffle change - # elif self._parent_pid != os.getpid(): - elif self._parent_pid != _default_context._get_id(): - # End Truffle change + elif self._parent_pid != os.getpid(): status = 'unknown' elif self._popen is None: status = 'initial' diff --git a/graalpython/lib-python/3/multiprocessing/queues.py b/graalpython/lib-python/3/multiprocessing/queues.py index 9cfb6f9f20..852ae87b27 100644 --- a/graalpython/lib-python/3/multiprocessing/queues.py +++ b/graalpython/lib-python/3/multiprocessing/queues.py @@ -20,9 +20,7 @@ from queue import Empty, Full -# Begin Truffle change -# import _multiprocessing -# End Truffle change +import _multiprocessing from . import connection from . import context diff --git a/graalpython/lib-python/3/multiprocessing/resource_tracker.py b/graalpython/lib-python/3/multiprocessing/resource_tracker.py index ffa2f37261..0d903fdcbb 100644 --- a/graalpython/lib-python/3/multiprocessing/resource_tracker.py +++ b/graalpython/lib-python/3/multiprocessing/resource_tracker.py @@ -24,10 +24,6 @@ from . import spawn from . import util -# Begin Truffle change -from .context import _default_context -# End Truffle change - __all__ = ['ensure_running', 'register', 'unregister'] _HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask') @@ -38,28 +34,22 @@ } if os.name == 'posix': - # Begin Truffle change - try: - import _multiprocessing - import _posixshmem - - # Use sem_unlink() to clean up named semaphores. - # - # sem_unlink() may be missing if the Python build process detected the - # absence of POSIX named semaphores. In that case, no named semaphores were - # ever opened, so no cleanup would be necessary. - if hasattr(_multiprocessing, 'sem_unlink'): - _CLEANUP_FUNCS.update({ - 'semaphore': _multiprocessing.sem_unlink, - }) - # GraalPy chagen: comment out until we get shm support - # _CLEANUP_FUNCS.update({ - # 'shared_memory': _posixshmem.shm_unlink, - # }) - except ImportError: - # We don't have _multiprocessing, so we're running graalpy mode - pass - # End Truffle change + import _multiprocessing + import _posixshmem + + # Use sem_unlink() to clean up named semaphores. + # + # sem_unlink() may be missing if the Python build process detected the + # absence of POSIX named semaphores. In that case, no named semaphores were + # ever opened, so no cleanup would be necessary. + if hasattr(_multiprocessing, 'sem_unlink'): + _CLEANUP_FUNCS.update({ + 'semaphore': _multiprocessing.sem_unlink, + }) + # GraalPy change: comment out unit we implement _posixshmem properly + # _CLEANUP_FUNCS.update({ + # 'shared_memory': _posixshmem.shm_unlink, + # }) class ReentrantCallError(RuntimeError): @@ -107,11 +97,6 @@ def ensure_running(self): This can be run from any process. Usually a child process will use the resource created by its parent.''' - # Begin Truffle change - if _default_context._is_graalpy(): - # No resource_tracker needed in graalpy mode - return - # End Truffle change with self._lock: if self._lock._recursion_count() > 1: # The code below is certainly not reentrant-safe, so bail out @@ -176,11 +161,6 @@ def ensure_running(self): def _check_alive(self): '''Check that the pipe has not been closed by sending a probe.''' - # Begin Truffle change - if _default_context._is_graalpy(): - # No resource_tracker needed in graalpy mode - return True - # End Truffle change try: # We cannot use send here as it calls ensure_running, creating # a cycle. @@ -199,11 +179,6 @@ def unregister(self, name, rtype): self._send('UNREGISTER', name, rtype) def _send(self, cmd, name, rtype): - # Begin Truffle change - if _default_context._is_graalpy(): - # No resource_tracker needed in graalpy mode - return - # End Truffle change try: self.ensure_running() except ReentrantCallError: diff --git a/graalpython/lib-python/3/multiprocessing/synchronize.py b/graalpython/lib-python/3/multiprocessing/synchronize.py index dfa996e0b7..0f682b9a09 100644 --- a/graalpython/lib-python/3/multiprocessing/synchronize.py +++ b/graalpython/lib-python/3/multiprocessing/synchronize.py @@ -14,9 +14,7 @@ import threading import sys import tempfile -# Begin Truffle change -# import _multiprocessing -# End Truffle change +import _multiprocessing import time from . import context @@ -26,30 +24,20 @@ # Try to import the mp.synchronize module cleanly, if it fails # raise ImportError for platforms lacking a working sem_open implementation. # See issue 3770 -# Begin Truffle change -# try: -# from _multiprocessing import SemLock, sem_unlink -# except (ImportError): -# raise ImportError("This platform lacks a functioning sem_open" + -# " implementation, therefore, the required" + -# " synchronization primitives needed will not" + -# " function, see issue 3770.") -# Begin Truffle change +try: + from _multiprocessing import SemLock, sem_unlink +except (ImportError): + raise ImportError("This platform lacks a functioning sem_open" + + " implementation, therefore, the required" + + " synchronization primitives needed will not" + + " function, see issue 3770.") # # Constants # RECURSIVE_MUTEX, SEMAPHORE = list(range(2)) -# Begin Truffle change -try: - import _multiprocessing - SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX -except ImportError: - import _multiprocessing_graalpy - SEM_VALUE_MAX = _multiprocessing_graalpy.SemLock.SEM_VALUE_MAX - -# Begin Truffle change +SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX # # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock` @@ -66,11 +54,9 @@ def __init__(self, kind, value, maxvalue, *, ctx): unlink_now = sys.platform == 'win32' or self._is_fork_ctx for i in range(100): try: - # Begin Truffle change - sl = self._semlock = ctx._SemLock( + sl = self._semlock = _multiprocessing.SemLock( kind, value, maxvalue, self._make_name(), unlink_now) - # End Truffle change except FileExistsError: pass else: @@ -98,9 +84,7 @@ def _after_fork(obj): @staticmethod def _cleanup(name): from .resource_tracker import unregister - # Begin Truffle change - context._default_context._sem_unlink(name) - # End Truffle change + sem_unlink(name) unregister(name, "semaphore") def _make_methods(self): @@ -128,10 +112,7 @@ def __getstate__(self): return (h, sl.kind, sl.maxvalue, sl.name) def __setstate__(self, state): - # Begin Truffle change - ctx = context._default_context.get_context() - self._semlock = ctx._SemLock_rebuild(*state) - # End Truffle change + self._semlock = _multiprocessing.SemLock._rebuild(*state) util.debug('recreated blocker with handle %r' % state[0]) self._make_methods() # Ensure that deserialized SemLock can be serialized again (gh-108520). diff --git a/graalpython/lib-python/3/multiprocessing/util.py b/graalpython/lib-python/3/multiprocessing/util.py index 26df2b15c6..79559823fb 100644 --- a/graalpython/lib-python/3/multiprocessing/util.py +++ b/graalpython/lib-python/3/multiprocessing/util.py @@ -18,10 +18,6 @@ from . import process -# Begin Truffle change -from .context import _default_context -# End Truffle change - __all__ = [ 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger', 'log_to_stderr', 'get_temp_dir', 'register_after_fork', @@ -205,21 +201,15 @@ def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): self._args = args self._kwargs = kwargs or {} self._key = (exitpriority, next(_finalizer_counter)) - # Begin Truffle change -# self._pid = os.getpid() - self._pid = _default_context._get_id() - # End Truffle change - + self._pid = os.getpid() + _finalizer_registry[self._key] = self def __call__(self, wr=None, # Need to bind these locally because the globals can have # been cleared at shutdown _finalizer_registry=_finalizer_registry, - # Begin Truffle change - #sub_debug=sub_debug, getpid=os.getpid): - sub_debug=sub_debug, getpid=_default_context._get_id): - # Begin Truffle change + sub_debug=sub_debug, getpid=os.getpid): ''' Run the callback unless it has already been called or cancelled ''' @@ -476,9 +466,7 @@ def spawnv_passfds(path, args, passfds): def close_fds(*fds): """Close each file descriptor given as an argument""" for fd in fds: - # Begin Truffle change - _default_context._close(fd) - # End Truffle change + os.close(fd) def _cleanup_tests(): diff --git a/graalpython/lib-python/3/test/_test_multiprocessing.py b/graalpython/lib-python/3/test/_test_multiprocessing.py index 625454b371..1dccb7a6e4 100644 --- a/graalpython/lib-python/3/test/_test_multiprocessing.py +++ b/graalpython/lib-python/3/test/_test_multiprocessing.py @@ -78,6 +78,10 @@ HAS_SHAREDCTYPES = False HAS_SHMEM = False IS_LINUX = sys.platform.startswith("linux") +IS_GRAALPY_JAVA_POSIX = ( + sys.implementation.name == 'graalpy' and + __graalpython__.posix_module_backend() == 'java' +) # End Truffle change try: @@ -240,10 +244,6 @@ def return_four_if_spawn(): multiprocessing.set_start_method(orig_start_method, force=True) -# GraalPy change -def get_id(): - return multiprocessing.context._default_context._get_id() - # # Creates a wrapper for a function which records the time it takes to finish # @@ -332,10 +332,7 @@ def test_current(self): self.assertTrue(not current.daemon) self.assertIsInstance(authkey, bytes) self.assertTrue(len(authkey) > 0) - # Begin Truffle change - # self.assertEqual(current.ident, os.getpid()) - self.assertEqual(current.ident, get_id()) - # End Truffle change + self.assertEqual(current.ident, os.getpid()) self.assertEqual(current.exitcode, None) def test_set_executable(self): @@ -413,10 +410,7 @@ def test_parent_process_attributes(self): p.join() parent_pid, parent_name = rconn.recv() self.assertEqual(parent_pid, self.current_process().pid) - # Begin Truffle change - # self.assertEqual(parent_pid, os.getpid()) - self.assertEqual(parent_pid, get_id()) - # End Truffle change + self.assertEqual(parent_pid, os.getpid()) self.assertEqual(parent_name, self.current_process().name) @classmethod @@ -3239,7 +3233,8 @@ def test_mymanager_context_prestarted(self): manager.start() with manager: self.common(manager) - self.assertEqual(manager._process.exitcode, 0) + # GraalPy change: JVM exits with 143 on SIGTERM + self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM, 128 + signal.SIGTERM)) def common(self, manager): foo = manager.Foo() @@ -5174,10 +5169,7 @@ def _child_test_wait(cls, w, slow): for i in range(10): if slow: time.sleep(random.random() * 0.100) - # Begin Truffle change - #w.send((i, os.getpid())) - w.send((i, get_id())) - # End Truffle change + w.send((i, os.getpid())) w.close() def test_wait(self, slow=False): @@ -5626,13 +5618,8 @@ def check_context(self, ctx): p.join() self.assertEqual(child_method, ctx.get_start_method()) - @support.impl_detail("It's not possible to switch between graalpy and spawn context without changing the global default", graalpy=False) def test_context(self): - # Begin Truffle change - # 'fork' and 'forkserver' not supported - # for method in ('fork', 'spawn', 'forkserver'): for method in multiprocessing.get_all_start_methods(): - # End Truffle change try: ctx = multiprocessing.get_context(method) except ValueError: @@ -5651,16 +5638,16 @@ def test_context_check_module_types(self): with self.assertRaisesRegex(TypeError, 'module_names must be a list of strings'): ctx.set_forkserver_preload([1, 2, 3]) - @support.impl_detail("It's not possible to switch between graalpy and spawn context without changing the global default", graalpy=False) def test_set_get(self): multiprocessing.set_forkserver_preload(PRELOAD) count = 0 + if not multiprocessing.get_all_start_methods(): + self.assertIsNone(multiprocessing.get_start_method(allow_none=True)) + self.assertRaises(ValueError, multiprocessing.get_start_method) + return old_method = multiprocessing.get_start_method() try: - # Begin Truffle change - # 'fork' and 'forkserver' not supported for method in multiprocessing.get_all_start_methods(): - # End Truffle change try: multiprocessing.set_start_method(method, force=True) except ValueError: @@ -5680,11 +5667,11 @@ def test_set_get(self): def test_get_all(self): methods = multiprocessing.get_all_start_methods() if sys.platform == 'win32': - # GraalVM change - self.assertEqual(methods, ['graalpy']) + self.assertEqual(methods, ['spawn']) + elif IS_GRAALPY_JAVA_POSIX: + self.assertEqual(methods, []) else: - # GraalVM change - self.assertEqual(methods, ['spawn', 'graalpy']) + self.assertEqual(methods, ['spawn']) def test_preload_resources(self): if multiprocessing.get_start_method() != 'forkserver': diff --git a/graalpython/lib-python/3/test/conftest.toml b/graalpython/lib-python/3/test/conftest.toml index c66c6c79e5..7597dd6430 100644 --- a/graalpython/lib-python/3/test/conftest.toml +++ b/graalpython/lib-python/3/test/conftest.toml @@ -21,7 +21,6 @@ selector = [ 'test_imaplib', 'test_ftplib', 'test_multiprocessing_spawn', - 'test_multiprocessing_graalpy', # trying to avoid transient issues there, not sure about the reason 'test_unittest', 'test_logging', @@ -47,7 +46,4 @@ selector = [ 'test_peepholer', # We don't have fork, so although we pass surprisingly many tests in there, it's not going to cover anything useful 'test_multiprocessing_fork', - # Transiently fails with java.lang.IllegalStateException: There is an active child contexts after finalizeContext! - # when the runner exits. We don't know which tests can trigger this, so we exclude the whole file. - 'test_multiprocessing_graalpy', ] diff --git a/graalpython/lib-python/3/test/test_multiprocessing_graalpy.py b/graalpython/lib-python/3/test/test_multiprocessing_graalpy.py deleted file mode 100644 index 8f1bd328fc..0000000000 --- a/graalpython/lib-python/3/test/test_multiprocessing_graalpy.py +++ /dev/null @@ -1,7 +0,0 @@ -import unittest -import test._test_multiprocessing - -test._test_multiprocessing.install_tests_in_module_dict(globals(), 'graalpy') - -if __name__ == '__main__': - unittest.main()