multiprocessing Pool and generators

First, look at the following code:
pool = multiprocessing.Pool(processes=N)
batch = []
for item in generator():
    batch.append(item)
    if len(batch) == 10:
        pool.apply_async(my_fun, args=(batch,))
        batch = []
# leftovers
pool.apply_async(my_fun, args=(batch,))
Essentially I'm retrieving data from a generator, collecting it into a list, and then spawning a process that consumes the batch of data.
This may look fine, but when the consumers (aka the pool processes) are slower than the producer (aka the generator), the memory usage of the main process grows until the generator stops or... the system runs out of memory.
How can I avoid this problem?
Have you tried to build a list of lists and use pool.map_async()? Or maybe starmap_async?
– wwii Jun 27 at 20:09
See the similar question stackoverflow.com/questions/17241663/… on how to use a queue with a process pool.
– Michael Doubez Jun 27 at 21:12
apply_async returns an AsyncResult object; I don't see that you are using it anywhere.
– wwii Jun 28 at 3:04
2 Answers
You might want to use a limited-size queue in this case:
q = multiprocessing.Queue(maxSize)
When created with a maximum size, the queue provides the necessary counting and blocks the thread calling q.put() when it is full, so you can never have more than a certain number of pending work items and therefore limit the memory needed to store them.
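For illustration, here is a minimal sketch of the bounded-queue approach. It uses plain worker processes reading batches from the queue and a None sentinel to shut them down; worker, N_WORKERS, MAX_PENDING and the stand-in generator/my_fun are placeholder names, not part of the answer.

import multiprocessing

def my_fun(batch):                     # stand-in for the question's my_fun
    print(sum(batch))

def generator():                       # stand-in for the question's generator
    yield from range(100)

def worker(q):
    # pull batches until a None sentinel arrives
    while True:
        batch = q.get()
        if batch is None:
            break
        my_fun(batch)

if __name__ == '__main__':
    N_WORKERS = 4
    MAX_PENDING = 8                    # cap on queued batches
    q = multiprocessing.Queue(maxsize=MAX_PENDING)
    procs = [multiprocessing.Process(target=worker, args=(q,))
             for _ in range(N_WORKERS)]
    for p in procs:
        p.start()

    batch = []
    for item in generator():
        batch.append(item)
        if len(batch) == 10:
            q.put(batch)               # blocks while MAX_PENDING batches are waiting
            batch = []
    if batch:                          # leftovers
        q.put(batch)

    for _ in procs:
        q.put(None)                    # one sentinel per worker
    for p in procs:
        p.join()

Because q.put() blocks once maxsize batches are waiting, the producer can never run far ahead of the consumers, which is what bounds the memory use.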
Alternatively, you could use a counting semaphore (e.g., multiprocessing.BoundedSemaphore(maxSize)). Acquire it each time you get a work item from the generator and release it in your work function (my_fun) once the item is processed. This way, the maximum number of work items waiting to be processed will never exceed the initial value of the semaphore.
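And a minimal sketch of the semaphore variant, assuming the semaphore is handed to the pool workers through the pool's initializer so that my_fun can release it; init_worker, MAX_PENDING and the stand-in generator are illustrative names, not from the answer.

import multiprocessing

_sem = None

def init_worker(sem):
    # runs once in each pool worker; keeps the shared semaphore where my_fun can reach it
    global _sem
    _sem = sem

def my_fun(batch):
    try:
        print(sum(batch))              # stand-in for the real work
    finally:
        _sem.release()                 # free one slot once this batch is done

def generator():                       # stand-in for the question's generator
    yield from range(100)

if __name__ == '__main__':
    MAX_PENDING = 8
    sem = multiprocessing.BoundedSemaphore(MAX_PENDING)
    pool = multiprocessing.Pool(processes=4,
                                initializer=init_worker, initargs=(sem,))
    batch = []
    for item in generator():
        batch.append(item)
        if len(batch) == 10:
            sem.acquire()              # blocks while MAX_PENDING batches are in flight
            pool.apply_async(my_fun, args=(batch,))
            batch = []
    if batch:                          # leftovers
        sem.acquire()
        pool.apply_async(my_fun, args=(batch,))
    pool.close()
    pool.join()

The acquire in the main process and the matching release in the worker keep at most MAX_PENDING batches alive at any time, however fast the generator produces them.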
Thanks, I think the semaphore will do.
– Manuel Jun 28 at 8:19
Use the grouper itertools recipe to chunk the data from your generator.
Use the infrastructure in concurrent.futures to handle task submission and retrieval with the processes.
You could either wait for a whole group of tasks to complete before submitting more, or keep the process pool full; both variants are shown below.
Setup (attempt to simulate your process):
import concurrent.futures
import itertools, time, collections, random
from pprint import pprint

# from itertools recipes
def grouper(iterable, n, fillvalue=None):
    "Collect data into fixed-length chunks or blocks"
    # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

# generator/iterator facsimile
class G:
    '''Long-winded range(n)'''
    def __init__(self, n=108):
        self.n = n
        self.a = []
    def __iter__(self):
        return self
    def __next__(self):
        #self.a.append(time.perf_counter())
        if self.n < 0:
            raise StopIteration
        x = self.n
        self.n -= 1
        return x

def my_func(*args):
    time.sleep(random.randint(1,10))
    return sum(*args)
Wait for groups of tasks to complete
if __name__ == '__main__':
    nworkers = 4
    g = G()
    # generate data three-at-a-time
    data = grouper(g, 3, 0)
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for args in data:
            print(f'pending:{len(executor._pending_work_items)}')
            # block submission - limit pending tasks to conserve resources (memory)
            if len(executor._pending_work_items) == nworkers:
                # wait till all complete and get the results
                futures = concurrent.futures.wait(fs, return_when=concurrent.futures.ALL_COMPLETED)
                #print(futures)
                results.extend(future.result() for future in futures.done)
                fs = list(futures.not_done)
            # add a new task
            fs.append(executor.submit(my_func, args))
        # data exhausted - get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'pending:{len(executor._pending_work_items)}')
            result = future.result()
            results.append(result)
    pprint(results)
Keep the process pool full.
if __name__ == '__main__':
    nworkers = 4
    g = G()
    # generate data three-at-a-time
    data = grouper(g, 3, 0)
    results = []
    fs = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=nworkers) as executor:
        for args in data:
            print(f'pending:{len(executor._pending_work_items)}')
            # block submission - limit pending tasks to conserve resources (memory)
            if len(executor._pending_work_items) == nworkers:
                # wait till one completes and get the result
                futures = concurrent.futures.wait(fs, return_when=concurrent.futures.FIRST_COMPLETED)
                #print(futures)
                results.extend(future.result() for future in futures.done)
                fs = list(futures.not_done)
            # add a new task
            fs.append(executor.submit(my_func, args))
        # data exhausted - get leftover results as they finish
        for future in concurrent.futures.as_completed(fs):
            print(f'pending:{len(executor._pending_work_items)}')
            result = future.result()
            results.append(result)
    pprint(results)
Well, the main problem is that the data does not fit into memory (hence the use of a generator), so I can't see how this avoids that.
– Manuel Jun 28 at 7:37
I agree with @Manuel; this does not solve the memory issue (at least theoretically).
– diningphil Jun 28 at 8:21
@Manuel - see edit.
– wwii Jun 30 at 5:01
@diningphil - Please see edit: is that better?
– wwii Jun 30 at 15:08