multiprocessing Pool and generators


First look at the following code:


pool = multiprocessing.Pool(processes=N)
batch = []
for item in generator():
    batch.append(item)
    if len(batch) == 10:
        pool.apply_async(my_fun, args=(batch,))
        batch = []
# leftovers
pool.apply_async(my_fun, args=(batch,))



Essentially I'm retrieving data from a generator, collecting it into a list, and then spawning a process that consumes the batch of data.



This may look fine but when the consumers (aka the pool processes) are slower than the producer (aka the generator) memory usage of the main process grows until the generator stops or... the system runs out of memory.



How can I avoid this problem?





Have you tried building a list of lists and using pool.map_async()? Or maybe starmap_async?
– wwii
Jun 27 at 20:09







see similar question stackoverflow.com/questions/17241663/… on how to use a queue with a process pool.
– Michael Doubez
Jun 27 at 21:12





apply_async returns an AsyncResult object, I don't see that you are using it anywhere.
– wwii
Jun 28 at 3:04







2 Answers



You might want to use a limited-size queue in this case.


q = multiprocessing.Queue(maxSize)



When created with a maximum size, the queue does the counting for you and blocks the thread calling q.put() when it is full, so you can never have more than a certain number of work items pending and the memory needed to store them stays bounded.
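A minimal sketch of that idea (not from the original answer): a bounded multiprocessing.Queue feeds a fixed set of worker processes, and q.put() blocks whenever the workers fall behind, so the producer is throttled automatically. Here generator and my_fun are stand-ins for the ones in the question.

import multiprocessing

def generator():
    # stand-in for the real data source in the question
    yield from range(100)

def my_fun(batch):
    # stand-in for the real work in the question
    return sum(batch)

def worker(q):
    # pull batches until the None sentinel arrives
    while True:
        batch = q.get()
        if batch is None:
            break
        my_fun(batch)

if __name__ == '__main__':
    N = 4
    q = multiprocessing.Queue(maxsize=2 * N)   # bounded: q.put() blocks when full
    workers = [multiprocessing.Process(target=worker, args=(q,)) for _ in range(N)]
    for w in workers:
        w.start()

    batch = []
    for item in generator():
        batch.append(item)
        if len(batch) == 10:
            q.put(batch)    # blocks while the queue is full, capping memory use
            batch = []
    if batch:               # leftovers
        q.put(batch)

    for _ in workers:
        q.put(None)         # one sentinel per worker
    for w in workers:
        w.join()

Note that this swaps the Pool for plain worker processes reading from the queue; the point is only that the bounded queue provides the back-pressure for you.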



Alternatively, you could use a counting semaphore (e.g., multiprocessing.BoundedSemaphore(maxSize)). Acquire it each time you get a work item from the generator and release it in your work function (my_fun) once the item is processed. This way, the maximum number of work items waiting to be processed will never exceed the initial value of the semaphore.
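A minimal sketch of the semaphore variant (again not from the original answer). One detail: instead of releasing inside my_fun, which runs in a different process, this version releases the semaphore in the apply_async callback, which runs in the parent process once a batch is done; the effect on the number of in-flight batches is the same. generator and my_fun are again stand-ins.

import multiprocessing

def generator():
    # stand-in for the real data source in the question
    yield from range(100)

def my_fun(batch):
    # stand-in for the real work in the question
    return sum(batch)

if __name__ == '__main__':
    N = 4
    sem = multiprocessing.BoundedSemaphore(2 * N)   # at most 2*N batches in flight

    with multiprocessing.Pool(processes=N) as pool:
        batch = []
        for item in generator():
            batch.append(item)
            if len(batch) == 10:
                sem.acquire()   # blocks once 2*N batches are pending or in progress
                pool.apply_async(my_fun, args=(batch,),
                                 callback=lambda _: sem.release(),
                                 error_callback=lambda _: sem.release())
                batch = []
        if batch:               # leftovers
            pool.apply_async(my_fun, args=(batch,))
        pool.close()
        pool.join()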





Thanks, I think the semaphore will do.
– Manuel
Jun 28 at 8:19



Use the grouper itertools recipe to chunk the data from your generator.





Use the infrastructure in concurrent.futures to handle task submission and result retrieval with the processes.



You could either wait for a whole group of tasks to complete before submitting more, or keep the process pool full by submitting a new task each time one finishes; both variants are shown below.



Setup (attempt to simulate your process):


import concurrent.futures
import itertools, time, collections, random
from pprint import pprint

# from itertools recipes
def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

# generator/iterator facsimile
class G:
    '''Long-winded range(n)'''
    def __init__(self, n=108):
        self.n = n
        self.a = []
    def __iter__(self):
        return self
    def __next__(self):
        #self.a.append(time.perf_counter())
        if self.n < 0:
            raise StopIteration
        x = self.n
        self.n -= 1
        return x

def my_func(*args):
    time.sleep(random.randint(1, 10))
    return sum(*args)



Wait for groups of tasks to complete


if __name__ == '__main__':
    nworkers = 4
    g = G()
    # generate data three-at-a-time
    data = grouper(g, 3, 0)
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for args in data:
            print(f'pending:{len(executor._pending_work_items)}')
            # block submission - limit pending tasks to conserve resources (memory)
            if len(executor._pending_work_items) == nworkers:
                # wait till all complete and get the results
                futures = concurrent.futures.wait(fs, return_when=concurrent.futures.ALL_COMPLETED)
                #print(futures)
                results.extend(future.result() for future in futures.done)
                fs = list(futures.not_done)
            # add a new task
            fs.append(executor.submit(my_func, args))
        # data exhausted - get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'pending:{len(executor._pending_work_items)}')
            result = future.result()
            results.append(result)

    pprint(results)



Keep the process pool full.


if __name__ == '__main__':
    nworkers = 4
    g = G()
    # generate data three-at-a-time
    data = grouper(g, 3, 0)
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for args in data:
            print(f'pending:{len(executor._pending_work_items)}')
            # block submission - limit pending tasks to conserve resources (memory)
            if len(executor._pending_work_items) == nworkers:
                # wait till one completes and get the result
                futures = concurrent.futures.wait(fs, return_when=concurrent.futures.FIRST_COMPLETED)
                #print(futures)
                results.extend(future.result() for future in futures.done)
                fs = list(futures.not_done)
            # add a new task
            fs.append(executor.submit(my_func, args))
        # data exhausted - get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'pending:{len(executor._pending_work_items)}')
            result = future.result()
            results.append(result)

    pprint(results)





Well, the main problem is that the data does not fit into memory (hence the use of a generator) so I can't see how this avoids that.
– Manuel
Jun 28 at 7:37





I agree with @Manuel, this does not solve the memory issue (at least theoretically).
– diningphil
Jun 28 at 8:21






@Manuel - see edit.
– wwii
Jun 30 at 5:01





@diningphil - Please see edit: is that better?
– wwii
Jun 30 at 15:08






