When I tried to make process.spawn call the event loop in order to address bug 916566 comment 10, it triggered this "RuntimeError: This event loop is already running" error because the _create_pty_or_pipe copy_term_size parameter triggers synchronous spawn in an async context: > lib/portage/tests/ebuild/test_doebuild_spawn.py::DoebuildSpawnTestCase::testDoebuildSpawn > Traceback (most recent call last): > File "portage/lib/_emerge/AbstractEbuildProcess.py", line 155, in _start_post_builddir_lock > SpawnProcess._start(self) > File "portage/lib/_emerge/SpawnProcess.py", line 129, in _start > self._proc = self._spawn(self.args, **kwargs) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/_emerge/EbuildSpawnProcess.py", line 22, in _spawn > return self.spawn_func(args, env=env, **kwargs) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/portage/process.py", line 163, in spawn_bash > return spawn(args, opt_name=opt_name, **keywords) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/portage/process.py", line 636, in spawn > pid = start_func( > ^^^^^^^^^^^ > File "portage/lib/portage/process.py", line 1410, in _start_proc > proc.start() > File "portage/lib/_emerge/AsynchronousTask.py", line 34, in start > self._start() > File "portage/lib/portage/util/_async/ForkProcess.py", line 39, in _start > super()._start() > File "portage/lib/_emerge/SpawnProcess.py", line 65, in _start > master_fd, slave_fd = self._pipe(fd_pipes) > ^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/_emerge/SpawnProcess.py", line 224, in _pipe > got_pty, master_fd, slave_fd = _create_pty_or_pipe(copy_term_size=stdout_pipe) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/portage/util/_pty.py", line 74, in _create_pty_or_pipe > set_term_size(rows, columns, slave_fd) > File "portage/lib/portage/output.py", line 564, in set_term_size > spawn(cmd, env=os.environ, fd_pipes={0: fd}) > File "portage/lib/portage/process.py", line 697, in spawn > retval = 
loop.run_until_complete(pid.wait()) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/portage/util/_eventloop/asyncio_event_loop.py", line 149, in _run_until_complete > return self._loop.run_until_complete(future) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "/usr/lib/python3.12/asyncio/base_events.py", line 660, in run_until_complete > self._check_running() > File "/usr/lib/python3.12/asyncio/base_events.py", line 619, in _check_running > raise RuntimeError('This event loop is already running') > RuntimeError: This event loop is already running
We can use returnproc and then wait for it asynchronously if the event loop is already running: --- a/lib/portage/output.py +++ b/lib/portage/output.py @@ -1,2 +1,2 @@ -# Copyright 1998-2021 Gentoo Authors +# Copyright 1998-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 @@ -15,3 +15,5 @@ portage.proxy.lazyimport.lazyimport( globals(), + "portage.process:spawn", "portage.util:writemsg", + "portage.util.futures:asyncio", ) @@ -559,3 +561,2 @@ def set_term_size(lines, columns, fd): """ - from portage.process import spawn @@ -563,5 +564,13 @@ def set_term_size(lines, columns, fd): try: - spawn(cmd, env=os.environ, fd_pipes={0: fd}) + proc = spawn(cmd, env=os.environ, fd_pipes={0: fd}, returnproc=True) except CommandNotFound: writemsg(_("portage: stty: command not found\n"), noiselevel=-1) + else: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.ensure_future(proc.wait(), loop).add_done_callback( + lambda future: future.result() + ) + else: + loop.run_until_complete(proc.wait())
We also need to prevent spawn recursion via _create_pty_or_pipe copy_term_size. This works: diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py index 5e582e3322..7df16e07aa 100644 --- a/lib/_emerge/SpawnProcess.py +++ b/lib/_emerge/SpawnProcess.py @@ -42,5 +42,5 @@ class SpawnProcess(SubProcess): __slots__ = ( - ("args", "log_filter_file") + ("args", "create_pty", "log_filter_file") + _spawn_kwarg_names + ( @@ -219,4 +219,8 @@ class SpawnProcess(SubProcess): @param fd_pipes: pipes from which to copy terminal size if desired. """ + if self.create_pty is False: + # Prevent spawn recursion via copy_term_size for bug 923750. + return os.pipe() + stdout_pipe = None if not self.background: diff --git a/lib/portage/process.py b/lib/portage/process.py index a8414c0ec4..fb8aee57ed 100644 --- a/lib/portage/process.py +++ b/lib/portage/process.py @@ -1408,4 +1407,5 @@ def _start_proc( kwargs=kwargs, fd_pipes=fd_pipes, + create_pty=False, ) proc.start()
(In reply to Zac Medico from comment #1) > We can use returnproc and then wait for it asynchronously if the event loop > is already running: When testing with multiprocessing.set_start_method("spawn", force=True), I got a pickling error for os.environ, which was solved by copying it to a dict like this: --- a/lib/portage/output.py +++ b/lib/portage/output.py @@ -562,7 +562,7 @@ def set_term_size(lines, columns, fd): cmd = ["stty", "rows", str(lines), "columns", str(columns)] try: - proc = spawn(cmd, env=os.environ, fd_pipes={0: fd}, returnproc=True) + proc = spawn(cmd, env=dict(os.environ), fd_pipes={0: fd}, returnproc=True) except CommandNotFound: writemsg(_("portage: stty: command not found\n"), noiselevel=-1) else: I suppose we can make spawn internally copy env to a dict if it isn't one already.
The pickling error for lib/portage/tests/sync/test_sync_local.py: > Traceback (most recent call last): > File "portage/lib/portage/util/_async/AsyncFunction.py", line 41, in _target_wrapper > result = target(*args, **kwargs) > ^^^^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/portage/sync/controller.py", line 172, in sync > taskmaster.run_tasks(tasks, func, status, options=task_opts) > File "portage/lib/portage/sync/controller.py", line 65, in run_tasks > result = getattr(inst, func)(**kwargs) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/portage/sync/syncbase.py", line 370, in sync > return self.update() > ^^^^^^^^^^^^^ > File "portage/lib/portage/sync/modules/rsync/rsync.py", line 211, in update > unchanged, is_synced, exitcode, updatecache_flg = self._do_rsync( > ^^^^^^^^^^^^^^^ > File "portage/lib/portage/sync/modules/rsync/rsync.py", line 813, in _do_rsync > command.append(self.download_dir) > ^^^^^^^^^^^^^^^^^ > File "portage/lib/portage/sync/syncbase.py", line 128, in download_dir > self._download_dir = self.repo_storage.init_update() > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/portage/util/futures/_sync_decorator.py", line 23, in wrapper > return (loop or asyncio.get_event_loop()).run_until_complete( > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/portage/util/_eventloop/asyncio_event_loop.py", line 149, in _run_until_complete > return self._loop.run_until_complete(future) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "/usr/lib/python3.12/asyncio/base_events.py", line 684, in run_until_complete > return future.result() > ^^^^^^^^^^^^^^^ > File "portage/lib/portage/repository/storage/hardlink_quarantine.py", line 58, in init_update > await self._check_call(["rm", "-rf", update_location]) > File "portage/lib/portage/repository/storage/hardlink_quarantine.py", line 48, in _check_call > p.start() > File "portage/lib/_emerge/AsynchronousTask.py", line 34, in start > self._start() > File 
"portage/lib/_emerge/SpawnProcess.py", line 65, in _start > master_fd, slave_fd = self._pipe(fd_pipes) > ^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/_emerge/SpawnProcess.py", line 228, in _pipe > got_pty, master_fd, slave_fd = _create_pty_or_pipe(copy_term_size=stdout_pipe) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/portage/util/_pty.py", line 74, in _create_pty_or_pipe > set_term_size(rows, columns, slave_fd) > File "portage/lib/portage/output.py", line 565, in set_term_size > proc = spawn(cmd, env=os.environ, fd_pipes={0: fd}, returnproc=True) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/portage/process.py", line 636, in spawn > pid = start_func( > ^^^^^^^^^^^ > File "portage/lib/portage/process.py", line 1413, in _start_proc > proc.start() > File "portage/lib/_emerge/AsynchronousTask.py", line 34, in start > self._start() > File "portage/lib/portage/util/_async/ForkProcess.py", line 64, in _start > self._proc = self._spawn(self.args, fd_pipes=None) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "portage/lib/portage/util/_async/ForkProcess.py", line 226, in _spawn > proc.start() > File "/usr/lib/python3.12/multiprocessing/process.py", line 121, in start > self._popen = self._Popen(self) > ^^^^^^^^^^^^^^^^^ > File "/usr/lib/python3.12/multiprocessing/context.py", line 224, in _Popen > return _default_context.get_context().Process._Popen(process_obj) > ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ > File "/usr/lib/python3.12/multiprocessing/context.py", line 289, in _Popen > return Popen(process_obj) > ^^^^^^^^^^^^^^^^^^ > File "/usr/lib/python3.12/multiprocessing/popen_spawn_posix.py", line 32, in __init__ > super().__init__(process_obj) > File "/usr/lib/python3.12/multiprocessing/popen_fork.py", line 19, in __init__ > self._launch(process_obj) > File "/usr/lib/python3.12/multiprocessing/popen_spawn_posix.py", line 47, in _launch > reduction.dump(process_obj, fp) > File 
"/usr/lib/python3.12/multiprocessing/reduction.py", line 60, in dump > ForkingPickler(file, protocol).dump(obj) > AttributeError: Can't pickle local object '_createenviron.<locals>.encode'. Did you mean: '_download_dir'?
(In reply to Zac Medico from comment #2) > We also need to prevent spawn recursion via _create_pty_or_pipe > copy_term_size. This works: > > diff --git a/lib/_emerge/SpawnProcess.py b/lib/_emerge/SpawnProcess.py > index 5e582e3322..7df16e07aa 100644 > --- a/lib/_emerge/SpawnProcess.py > +++ b/lib/_emerge/SpawnProcess.py > @@ -42,5 +42,5 @@ class SpawnProcess(SubProcess): > > __slots__ = ( > - ("args", "log_filter_file") > + ("args", "create_pty", "log_filter_file") > + _spawn_kwarg_names > + ( > @@ -219,4 +219,8 @@ class SpawnProcess(SubProcess): > @param fd_pipes: pipes from which to copy terminal size if desired. > """ > + if self.create_pty is False: > + # Prevent spawn recursion via copy_term_size for bug 923750. > + return os.pipe() > + > stdout_pipe = None > if not self.background: > diff --git a/lib/portage/process.py b/lib/portage/process.py > index a8414c0ec4..fb8aee57ed 100644 > --- a/lib/portage/process.py > +++ b/lib/portage/process.py > @@ -1408,4 +1407,5 @@ def _start_proc( > kwargs=kwargs, > fd_pipes=fd_pipes, > + create_pty=False, > ) > proc.start() For now I've omitted this patch from https://github.com/gentoo/portage/pull/1252 because it applies on top of 305612d1b04aa06d3d1a1c8b51d046a644742fd5 which I reverted due to bug 923755.
The bug has been referenced in the following commit(s): https://gitweb.gentoo.org/proj/portage.git/commit/?id=c65d2d8b6c17d849e85f8c13c1a287ff1a07ccbd commit c65d2d8b6c17d849e85f8c13c1a287ff1a07ccbd Author: Zac Medico <zmedico@gentoo.org> AuthorDate: 2024-02-04 07:22:37 +0000 Commit: Zac Medico <zmedico@gentoo.org> CommitDate: 2024-02-04 08:27:37 +0000 process.spawn: Avoid os.environ pickling error This error was observed when pickling os.environ for multiprocessing start method "spawn" after set_term_size was updated to use returnproc: File "/usr/lib/python3.12/multiprocessing/reduction.py", line 60, in dump ForkingPickler(file, protocol).dump(obj) AttributeError: Can't pickle local object '_createenviron.<locals>.encode'. Did you mean: '_download_dir'? Bug: https://bugs.gentoo.org/923750 Signed-off-by: Zac Medico <zmedico@gentoo.org> lib/portage/process.py | 3 +++ 1 file changed, 3 insertions(+) https://gitweb.gentoo.org/proj/portage.git/commit/?id=f97e414ce980299acc962e357db24106d62e4c7c commit f97e414ce980299acc962e357db24106d62e4c7c Author: Zac Medico <zmedico@gentoo.org> AuthorDate: 2024-02-04 06:12:00 +0000 Commit: Zac Medico <zmedico@gentoo.org> CommitDate: 2024-02-04 08:27:37 +0000 set_term_size: Wait asynchronously if event loop is running When set_term_size is called from an asynchronous context, we can wait for the process to exit asynchronously. This will prevent a "RuntimeError: This event loop is already running" error in the future when synchronous spawn calls will need to run the event loop in order to wait for the spawned process(es). Bug: https://bugs.gentoo.org/923750 Signed-off-by: Zac Medico <zmedico@gentoo.org> lib/portage/output.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-)
(In reply to Zac Medico from comment #5) > For now I've omitted this patch from > https://github.com/gentoo/portage/pull/1252 because it applies on top of > 305612d1b04aa06d3d1a1c8b51d046a644742fd5 which I reverted due to bug 923755. Should be included in the equivalent of 305612d1b04aa06d3d1a1c8b51d046a644742fd5 the next time we merge it.
(In reply to Zac Medico from comment #7) > (In reply to Zac Medico from comment #5) > > For now I've omitted this patch from > > https://github.com/gentoo/portage/pull/1252 because it applies on top of > > 305612d1b04aa06d3d1a1c8b51d046a644742fd5 which I reverted due to bug 923755. > > Should be included in the equivalent of > 305612d1b04aa06d3d1a1c8b51d046a644742fd5 the next time we merge it. I've found a better fix that prevents ForkProcess from unnecessarily calling self._pipe and clobbering self.fd_pipes[1] and self.fd_pipes[2] from SpawnProcess: --- a/lib/portage/util/_async/ForkProcess.py +++ b/lib/portage/util/_async/ForkProcess.py @@ -80,5 +80,9 @@ class ForkProcess(SpawnProcess): if self._HAVE_SEND_HANDLE: - master_fd, slave_fd = self._pipe(self.fd_pipes) - self.fd_pipes[1] = slave_fd - self.fd_pipes[2] = slave_fd + if not any(fd in self.fd_pipes for fd in (1, 2)): + master_fd, slave_fd = self._pipe(self.fd_pipes) + self.fd_pipes[1] = slave_fd + self.fd_pipes[2] = slave_fd + else: + slave_fd = None + master_fd = os.open(os.devnull, os.O_RDONLY) self._files = self._files_dict(connection=connection, slave_fd=slave_fd) @@ -169,3 +173,4 @@ class ForkProcess(SpawnProcess): if hasattr(self._files, "slave_fd"): - os.close(self._files.slave_fd) + if self._files.slave_fd is not None: + os.close(self._files.slave_fd) del self._files.slave_fd
The bug has been referenced in the following commit(s): https://gitweb.gentoo.org/proj/portage.git/commit/?id=e8b31c86eaed645a8740fb2844e2935aee161e43 commit e8b31c86eaed645a8740fb2844e2935aee161e43 Author: Zac Medico <zmedico@gentoo.org> AuthorDate: 2024-02-05 05:55:11 +0000 Commit: Zac Medico <zmedico@gentoo.org> CommitDate: 2024-02-06 01:30:21 +0000 ForkProcess: Prevent redundant pipe and set_term_size recursion When process.spawn is updated to call ForkProcess for bug 916566, it needs to avoid recursion via set_term_size. Bug: https://bugs.gentoo.org/916566 Bug: https://bugs.gentoo.org/923750 Signed-off-by: Zac Medico <zmedico@gentoo.org> lib/_emerge/SpawnProcess.py | 31 ++++++++++++++++++++++++------- lib/portage/util/_async/ForkProcess.py | 28 +++++++++++++++++++++++----- 2 files changed, 47 insertions(+), 12 deletions(-)
The bug has been closed via the following commit(s): https://gitweb.gentoo.org/repo/gentoo.git/commit/?id=77c44c46194922509bc4f2b5cfc099412a560a69 commit 77c44c46194922509bc4f2b5cfc099412a560a69 Author: Sam James <sam@gentoo.org> AuthorDate: 2024-02-22 07:23:40 +0000 Commit: Sam James <sam@gentoo.org> CommitDate: 2024-02-22 07:23:50 +0000 sys-apps/portage: add 3.0.62 Closes: https://bugs.gentoo.org/663324 Closes: https://bugs.gentoo.org/728046 Closes: https://bugs.gentoo.org/891137 Closes: https://bugs.gentoo.org/906368 Closes: https://bugs.gentoo.org/916566 Closes: https://bugs.gentoo.org/921170 Closes: https://bugs.gentoo.org/921208 Closes: https://bugs.gentoo.org/921400 Closes: https://bugs.gentoo.org/922038 Closes: https://bugs.gentoo.org/922142 Closes: https://bugs.gentoo.org/923368 Closes: https://bugs.gentoo.org/923750 Closes: https://bugs.gentoo.org/923841 Closes: https://bugs.gentoo.org/923852 Closes: https://bugs.gentoo.org/923854 Closes: https://bugs.gentoo.org/924192 Closes: https://bugs.gentoo.org/924273 Closes: https://bugs.gentoo.org/924585 Closes: https://bugs.gentoo.org/921380 Signed-off-by: Sam James <sam@gentoo.org> sys-apps/portage/Manifest | 1 + sys-apps/portage/portage-3.0.62.ebuild | 246 +++++++++++++++++++++++++++++++++ 2 files changed, 247 insertions(+)
The bug has been referenced in the following commit(s): https://gitweb.gentoo.org/proj/portage.git/commit/?id=0ff7a3b28e0ec63d68d32e01145db8962d53774d commit 0ff7a3b28e0ec63d68d32e01145db8962d53774d Author: Zac Medico <zmedico@gentoo.org> AuthorDate: 2024-02-26 05:09:21 +0000 Commit: Zac Medico <zmedico@gentoo.org> CommitDate: 2024-02-26 05:09:21 +0000 SpawnProcess: Wait for async set_term_size Use the SpawnProcess _unregister method to handle async set_term_size results, avoiding possible messages like bug 925456 triggered: [ERROR] Task was destroyed but it is pending! Also update _BinpkgFetcherProcess and _EbuildFetcherProcess which inherit the _pty_ready attribute from SpawnProcess. Fixes: f97e414ce980 ("set_term_size: Wait asynchronously if event loop is running") Bug: https://bugs.gentoo.org/923750 Bug: https://bugs.gentoo.org/925456 Signed-off-by: Zac Medico <zmedico@gentoo.org> lib/_emerge/BinpkgFetcher.py | 6 ++++-- lib/_emerge/EbuildFetcher.py | 6 ++++-- lib/_emerge/SpawnProcess.py | 14 +++++++++++++- lib/portage/output.py | 13 +++++++++---- lib/portage/util/_pty.py | 23 ++++++++++++++++------- 5 files changed, 46 insertions(+), 16 deletions(-)