Bug 648790 - sys-apps/portage: add parallel aux_get method for things like repoman to use
Status: RESOLVED FIXED
Alias: None
Product: Portage Development
Classification: Unclassified
Component: Enhancement/Feature Requests
Hardware: All All
Importance: Normal normal
Assignee: Portage team
URL:
Whiteboard:
Keywords: InVCS
Depends on:
Blocks: 651804 653810 653844 653946 654038 655378 666940
Reported: 2018-02-25 19:57 UTC by Zac Medico
Modified: 2018-09-24 03:36 UTC (History)
Description Zac Medico gentoo-dev 2018-02-25 19:57:51 UTC
For things like repoman, it would be useful to have a parallel aux_get method that takes an input iterator and returns an iterator that yields results as they become available. There's a prototype implementation here:

https://github.com/funtoo/portage-gentoo/blob/aux_get-fork/pym/portage/dbapi/porttree.py#L577

My plan is to add an asynchronous aux_get method that returns a Future. A separate parallel aux_get method will use the existing TaskScheduler class to handle scheduling with max_jobs and max_load parameters, while a loop like the following yields results as they become available, using a function with the same behavior as asyncio.wait:

> # future_generator populates the futures dictionary that
> # is used in the while loop
> scheduler = TaskScheduler(
> 	future_generator,
> 	max_jobs=max_jobs,
> 	max_load=max_load,
> 	event_loop=loop)
> 
> try:
> 	scheduler.start()
> 
> 	# scheduler should ensure that futures is non-empty until
> 	# future_generator is exhausted
> 	while futures:
> 		done, pending = loop.run_until_complete(
> 			wait(list(futures.values()),
> 				return_when=FIRST_COMPLETED))
> 		for future in done:
> 			del futures[id(future)]
> 			yield future
> 
> finally:
> 	# cleanup in case of interruption by SIGINT, etc
> 	scheduler.cancel()
> 	scheduler.wait()
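
The yield-as-completed part of the loop above can be tried in isolation with only the standard library. The following is a minimal sketch, assuming plain asyncio tasks in place of TaskScheduler and the real aux_get work; iter_first_completed and delayed are hypothetical names used for illustration only and are not part of portage:

```python
import asyncio


def iter_first_completed(loop, futures):
    """Yield futures from the id -> future dict as each one finishes."""
    while futures:
        # asyncio.wait takes an iterable of futures, not positional arguments
        done, _pending = loop.run_until_complete(
            asyncio.wait(list(futures.values()),
                         return_when=asyncio.FIRST_COMPLETED))
        for future in done:
            del futures[id(future)]
            yield future


async def delayed(value, delay):
    # Hypothetical stand-in for real asynchronous work
    await asyncio.sleep(delay)
    return value


loop = asyncio.new_event_loop()
try:
    tasks = [loop.create_task(delayed(name, delay))
             for name, delay in (("slow", 0.3), ("fast", 0.01))]
    futures = {id(task): task for task in tasks}
    # Results arrive in completion order, not submission order
    results = [f.result() for f in iter_first_completed(loop, futures)]
finally:
    loop.close()
```

Unlike the loop in the description, this sketch has no max_jobs or max_load throttling: every task is started up front, so it only illustrates the FIRST_COMPLETED polling pattern.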

A class like the following can be used to wrap Future instances for scheduling by TaskScheduler:

> import os
> import signal
> 
> # Adapts a Future to the task interface that TaskScheduler expects
> class FutureWrapper(AbstractPollTask):
> 	__slots__ = ('future',)
> 	def _start(self):
> 		self.future.add_done_callback(self._done_callback)
> 
> 	def _done_callback(self, future):
> 		if future.cancelled():
> 			self.cancelled = True
> 			self.returncode = -signal.SIGINT
> 		elif future.exception() is None:
> 			self.returncode = os.EX_OK
> 		else:
> 			self.returncode = 1
> 		self.wait()
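
The return code mapping performed by _done_callback can be exercised without portage. This is a minimal sketch, assuming concurrent.futures.Future as a stand-in for whatever Future type portage uses, and a hypothetical helper named returncode_for that mirrors the branches above:

```python
import os
import signal
from concurrent.futures import Future


def returncode_for(future):
    """Map a finished future to a process-style return code."""
    if future.cancelled():
        # Same convention as above: cancellation looks like death by SIGINT
        return -signal.SIGINT
    if future.exception() is None:
        return os.EX_OK
    return 1


returncodes = []


def record(future):
    # Done callback: runs as soon as the future finishes or is cancelled
    returncodes.append(returncode_for(future))


ok = Future()
ok.add_done_callback(record)
ok.set_result("metadata")

failed = Future()
failed.add_done_callback(record)
failed.set_exception(KeyError("missing"))

cancelled = Future()
cancelled.add_done_callback(record)
cancelled.cancel()
```

The cancelled() check has to come first, because calling exception() on a cancelled future raises CancelledError instead of returning a value.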
Comment 2 Larry the Git Cow gentoo-dev 2018-02-28 18:41:02 UTC
The bug has been referenced in the following commit(s):

https://gitweb.gentoo.org/proj/portage.git/commit/?id=e43f6c583ed9205abbdcb11340c81d7dd97ccc11

commit e43f6c583ed9205abbdcb11340c81d7dd97ccc11
Author:     Zac Medico <zmedico@gentoo.org>
AuthorDate: 2018-02-25 23:19:58 +0000
Commit:     Zac Medico <zmedico@gentoo.org>
CommitDate: 2018-02-28 17:22:20 +0000

    Add iter_completed convenience function (bug 648790)
    
    The iter_completed function is similar to asyncio.as_completed, but
    takes an iterator of futures as input, and includes support for
    max_jobs and max_load parameters. The default values for max_jobs
    and max_load correspond to multiprocessing.cpu_count().
    
    Example usage for async_aux_get:
    
      import portage
      from portage.util.futures.iter_completed import iter_completed
    
      portdb = portage.portdb
      # aux_get has many inputs, and the same cpv can exist in multiple
      # repositories, so the caller is responsible for mapping futures
      # back to their aux_get inputs
      future_cpv = {}
    
      def future_generator():
        for cpv in portdb.cp_list('sys-apps/portage'):
          future = portdb.async_aux_get(cpv, portage.auxdbkeys)
          future_cpv[id(future)] = cpv
          yield future
    
      for future in iter_completed(future_generator()):
        cpv = future_cpv.pop(id(future))
        try:
          result = future.result()
        except KeyError as e:
          # aux_get failed
          print('error:', cpv, e)
        else:
          print(cpv, result)
    
    See: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed
    Bug: https://bugs.gentoo.org/648790

 .../tests/util/futures/test_iter_completed.py      |  50 ++++++++++
 pym/portage/util/_async/AsyncTaskFuture.py         |  31 +++++++
 pym/portage/util/futures/iter_completed.py         |  63 +++++++++++++
 pym/portage/util/futures/wait.py                   | 102 +++++++++++++++++++++
 4 files changed, 246 insertions(+)
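
For readers without a portage checkout, the idea behind iter_completed (pull futures lazily from an iterator, keep at most max_jobs in flight, yield each one as it finishes) can be approximated with the standard library. This is a rough sketch under stated assumptions, not the implementation from the commit: iter_completed_sketch and fake_aux_get are hypothetical names, there is no max_load handling, and asyncio tasks stand in for async_aux_get futures.

```python
import asyncio


def iter_completed_sketch(future_iter, loop, max_jobs=2):
    """Yield futures as they complete, with at most max_jobs pending."""
    future_iter = iter(future_iter)
    pending = set()
    exhausted = False
    while True:
        # Top up: advancing the iterator is what starts the next job
        while not exhausted and len(pending) < max_jobs:
            try:
                pending.add(next(future_iter))
            except StopIteration:
                exhausted = True
        if not pending:
            break
        done, pending = loop.run_until_complete(
            asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED))
        for future in done:
            yield future


async def fake_aux_get(cpv, delay):
    # Hypothetical stand-in for a metadata lookup
    await asyncio.sleep(delay)
    return cpv


loop = asyncio.new_event_loop()
started = []


def future_generator():
    for cpv, delay in (("a-1", 0.3), ("a-2", 0.01), ("a-3", 0.01)):
        started.append(cpv)
        yield loop.create_task(fake_aux_get(cpv, delay))


try:
    completed = [f.result() for f in
                 iter_completed_sketch(future_generator(), loop, max_jobs=2)]
finally:
    loop.close()
```

Because the generator is only advanced when a slot is free, the third lookup does not start until one of the first two has finished, which is the throttling behavior that max_jobs is meant to provide.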

https://gitweb.gentoo.org/proj/portage.git/commit/?id=e3960e27f1b75120c2c0511b6207bd2ebdcc26a3

commit e3960e27f1b75120c2c0511b6207bd2ebdcc26a3
Author:     Zac Medico <zmedico@gentoo.org>
AuthorDate: 2018-02-25 22:27:13 +0000
Commit:     Zac Medico <zmedico@gentoo.org>
CommitDate: 2018-02-28 17:22:19 +0000

    portdbapi: add async_aux_get method (bug 648790)
    
    Add async_aux_get method that returns a Future and otherwise
    behaves identically to aux_get. Use async_aux_get to implement
    the synchronous aux_get method.
    
    Bug: https://bugs.gentoo.org/648790
    Reviewed-by: Alec Warner <antarus@gentoo.org>

 pym/portage/dbapi/porttree.py | 99 ++++++++++++++++++++++++++++++++++---------
 1 file changed, 78 insertions(+), 21 deletions(-)
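
The pattern described in this commit message, a synchronous method implemented on top of an asynchronous one that returns a Future, can be illustrated with a small self-contained sketch. MetadataDB, async_get and get are hypothetical names chosen for illustration; they are not portdbapi's API, and the dict lookup stands in for the real metadata retrieval:

```python
import asyncio


class MetadataDB:
    """Toy database with an async lookup and a sync wrapper around it."""

    def __init__(self, data, loop):
        self._data = data
        self._loop = loop

    def async_get(self, key):
        # Return a Future that the event loop will complete later
        future = self._loop.create_future()

        def _lookup():
            try:
                future.set_result(self._data[key])
            except KeyError as e:
                future.set_exception(e)

        self._loop.call_soon(_lookup)
        return future

    def get(self, key):
        # Synchronous variant: drive the loop until the Future is done.
        # A stored exception is re-raised here, as in the async case.
        return self._loop.run_until_complete(self.async_get(key))


loop = asyncio.new_event_loop()
try:
    db = MetadataDB({"sys-apps/portage-2.3.40": {"SLOT": "0"}}, loop)
    metadata = db.get("sys-apps/portage-2.3.40")
    try:
        db.get("sys-apps/missing-1.0")
    except KeyError:
        missing_raised = True
    else:
        missing_raised = False
finally:
    loop.close()
```

Note that a wrapper like get cannot be called while the same event loop is already running, since run_until_complete does not nest.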
Comment 3 Zac Medico gentoo-dev 2018-06-02 00:20:53 UTC
The async_aux_get method has been used in the fixes for a number of bugs involving event loop recursion:

bug 653810
bug 653844
bug 653946
bug 654038
bug 655378
Comment 4 Zac Medico gentoo-dev 2018-07-02 18:44:45 UTC
Fixed in portage-2.3.40-r1.