For things like repoman, it would be useful to have a parallel aux_get method that takes an input iterator and returns an iterator that yields results as they become available. There's a prototype implementation here:

https://github.com/funtoo/portage-gentoo/blob/aux_get-fork/pym/portage/dbapi/porttree.py#L577

My plan is to add an asynchronous aux_get method that returns a Future. A separate parallel aux_get method will use the existing TaskScheduler class to handle scheduling with max_jobs and max_load parameters, while a loop like the following yields results as they become available, using a function with the same behavior as asyncio.wait:

> # future_generator populates the futures dictionary that
> # is used in the while loop
> scheduler = TaskScheduler(
>     future_generator,
>     max_jobs=max_jobs,
>     max_load=max_load,
>     event_loop=loop)
>
> try:
>     scheduler.start()
>
>     # scheduler should ensure that futures is non-empty until
>     # future_generator is exhausted
>     while futures:
>         # like asyncio.wait, wait takes the futures as one iterable
>         done, pending = loop.run_until_complete(
>             wait(list(futures.values()),
>             return_when=FIRST_COMPLETED))
>         for future in done:
>             del futures[id(future)]
>             yield future
>
> finally:
>     # cleanup in case of interruption by SIGINT, etc
>     scheduler.cancel()
>     scheduler.wait()

A class like the following can be used to wrap Future instances for scheduling by TaskScheduler:

> class FutureWrapper(AbstractPollTask):
>     __slots__ = ('future',)
>     def _start(self):
>         self.future.add_done_callback(self._done_callback)
>
>     def _done_callback(self, future):
>         if future.cancelled():
>             self.cancelled = True
>             self.returncode = -signal.SIGINT
>         elif future.exception() is None:
>             self.returncode = os.EX_OK
>         else:
>             self.returncode = 1
>         self.wait()
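For anyone who wants to experiment with the "yield results as they become available" idea outside of portage, here is a minimal sketch using only the standard library's asyncio. It is not the proposed implementation: it does not use TaskScheduler, it has no max_load handling, and the names iter_completed_sketch, coro_factories and job are hypothetical. It only shows the combination of a bounded number of running jobs with asyncio.wait and FIRST_COMPLETED.

```python
import asyncio


def iter_completed_sketch(coro_factories, max_jobs=2):
    """Yield finished tasks as they complete, with at most max_jobs running.

    coro_factories is an iterable of zero-argument callables that return
    coroutines (a hypothetical input shape, chosen so that nothing starts
    running before it has been scheduled).
    """
    loop = asyncio.new_event_loop()
    factories = iter(coro_factories)
    pending = set()
    exhausted = False
    try:
        while True:
            # Top up the running set until max_jobs is reached or the
            # input iterator is exhausted.
            while not exhausted and len(pending) < max_jobs:
                try:
                    factory = next(factories)
                except StopIteration:
                    exhausted = True
                else:
                    pending.add(loop.create_task(factory()))
            if not pending:
                break
            # Block until at least one running task has finished.
            done, pending = loop.run_until_complete(
                asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED))
            for task in done:
                yield task
    finally:
        # Cleanup if the consumer stops early or is interrupted.
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(asyncio.wait(pending))
        loop.close()


async def job(name, delay):
    # Stand-in for real work such as a metadata lookup.
    await asyncio.sleep(delay)
    return name


def run_demo():
    factories = [
        lambda: job("slow", 0.2),
        lambda: job("fast", 0.01),
        lambda: job("mid", 0.05),
    ]
    return [task.result()
            for task in iter_completed_sketch(factories, max_jobs=2)]
```

Calling run_demo() returns the job names in completion order rather than submission order. Since the third job is only started once one of the first two has finished, max_jobs really does bound the number of concurrent jobs.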
Patches posted for review:

https://archives.gentoo.org/gentoo-portage-dev/message/41514982d7ac24d85d618cbb001ea3da
https://github.com/gentoo/portage/pull/260
The bug has been referenced in the following commit(s):

https://gitweb.gentoo.org/proj/portage.git/commit/?id=e43f6c583ed9205abbdcb11340c81d7dd97ccc11

commit e43f6c583ed9205abbdcb11340c81d7dd97ccc11
Author:     Zac Medico <zmedico@gentoo.org>
AuthorDate: 2018-02-25 23:19:58 +0000
Commit:     Zac Medico <zmedico@gentoo.org>
CommitDate: 2018-02-28 17:22:20 +0000

    Add iter_completed convenience function (bug 648790)

    The iter_completed function is similar to asyncio.as_completed, but
    takes an iterator of futures as input, and includes support for
    max_jobs and max_load parameters. The default values for max_jobs
    and max_load correspond to multiprocessing.cpu_count().

    Example usage for async_aux_get:

        import portage
        from portage.util.futures.iter_completed import iter_completed

        portdb = portage.portdb
        # aux_get has many inputs, and the same cpv can exist in multiple
        # repositories, so the caller is responsible for mapping futures
        # back to their aux_get inputs
        future_cpv = {}

        def future_generator():
            for cpv in portdb.cp_list('sys-apps/portage'):
                future = portdb.async_aux_get(cpv, portage.auxdbkeys)
                future_cpv[id(future)] = cpv
                yield future

        for future in iter_completed(future_generator()):
            cpv = future_cpv.pop(id(future))
            try:
                result = future.result()
            except KeyError as e:
                # aux_get failed
                print('error:', cpv, e)
            else:
                print(cpv, result)

    See: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed
    Bug: https://bugs.gentoo.org/648790

 .../tests/util/futures/test_iter_completed.py |  50 ++++++++++
 pym/portage/util/_async/AsyncTaskFuture.py    |  31 +++++++
 pym/portage/util/futures/iter_completed.py    |  63 +++++++++++++
 pym/portage/util/futures/wait.py              | 102 +++++++++++++++++++++
 4 files changed, 246 insertions(+)

https://gitweb.gentoo.org/proj/portage.git/commit/?id=e3960e27f1b75120c2c0511b6207bd2ebdcc26a3

commit e3960e27f1b75120c2c0511b6207bd2ebdcc26a3
Author:     Zac Medico <zmedico@gentoo.org>
AuthorDate: 2018-02-25 22:27:13 +0000
Commit:     Zac Medico <zmedico@gentoo.org>
CommitDate: 2018-02-28 17:22:19 +0000

    portdbapi: add async_aux_get method (bug 648790)

    Add async_aux_get method that returns a Future and otherwise
    behaves identically to aux_get. Use async_aux_get to implement
    the synchronous aux_get method.

    Bug: https://bugs.gentoo.org/648790
    Reviewed-by: Alec Warner <antarus@gentoo.org>

 pym/portage/dbapi/porttree.py | 99 ++++++++++++++++++++++++++++++++++---------
 1 file changed, 78 insertions(+), 21 deletions(-)
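The pattern described in the second commit (an async method that returns a Future, with the synchronous method implemented on top of it) can be illustrated with a small standalone sketch. This is not the portdbapi code: MetadataDB, async_get and get are hypothetical names, and the lookup is a plain dictionary access scheduled on a private asyncio event loop.

```python
import asyncio


class MetadataDB:
    """Hypothetical stand-in for a metadata database (not portdbapi)."""

    def __init__(self, data):
        self._data = data
        self._loop = asyncio.new_event_loop()

    def async_get(self, key):
        """Return a Future that resolves to the value, or raises KeyError."""
        future = self._loop.create_future()

        def _lookup():
            # The caller may have cancelled the future before the loop ran.
            if future.cancelled():
                return
            try:
                future.set_result(self._data[key])
            except KeyError as e:
                future.set_exception(e)

        # Defer the lookup so that it runs inside the event loop.
        self._loop.call_soon(_lookup)
        return future

    def get(self, key):
        """Synchronous wrapper: run the loop until the future is done."""
        return self._loop.run_until_complete(self.async_get(key))

    def close(self):
        self._loop.close()
```

With this layout there is only one code path for the lookup itself. The synchronous get propagates the same KeyError that a caller of async_get would see from future.result().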
The async_aux_get method has been used in the fixes for a number of bugs involving event loop recursion:

bug 653810
bug 653844
bug 653946
bug 654038
bug 655378
Fixed in portage-2.3.40-r1.