asyncio.Queue - any benefit in async put with maxsize?


I have a synchronous callback function that puts a task into an asyncio.Queue so it can be processed by an async worker. Currently, I have something like this:

import asyncio
import os
import sys

queue = asyncio.Queue(maxsize=0)

async def worker():
    try:
        while True:
            task = await queue.get()
            print(f'processing task: {task}')
            queue.task_done()
    except asyncio.QueueShutDown:  # raised by get() after shutdown(); Python 3.13+
        return

def on_read():
    # Synchronous callback invoked by the event loop when stdin is readable.
    data = os.read(0, 10)
    if data:
        queue.put_nowait(data)
    else:
        queue.shutdown()

loop = asyncio.new_event_loop()
loop.add_reader(0, on_read)
loop.run_until_complete(worker())

The code could be used like this:

for i in $(seq 3); do echo $i; sleep 1; done | python3 minimal_queue.py

Note that I use put_nowait() to put the task in the queue. This could theoretically raise QueueFull, but since the queue is unbounded (maxsize=0) that can never happen. The downside is that the queue can grow until the system runs out of memory.
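A minimal sketch of that behavior: with maxsize=0 the queue is unbounded, so put_nowait() always succeeds and memory is the only limit.

```python
import asyncio

# Unbounded queue: put_nowait() never raises QueueFull, the queue just grows.
q = asyncio.Queue(maxsize=0)
for i in range(100_000):
    q.put_nowait(i)  # never raises; each item stays in memory until consumed
print(q.qsize())
```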

I was thinking about changing this to set a maximum size on the queue and then use loop.create_task(queue.put(data)). But I am unsure about this. My guess is that under load this would just queue up a ton of blocked tasks, which might fill up memory even faster.
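That guess can be checked with a small sketch (toy sizes, not the original program): once the queue is full, every create_task(queue.put(...)) becomes a pending task that holds its payload in memory, so the bound on the queue does not bound total memory use.

```python
import asyncio

async def demo():
    q = asyncio.Queue(maxsize=2)
    # Schedule more puts than the queue can hold; each blocked put()
    # lives on as a pending Task keeping its item alive.
    tasks = [asyncio.create_task(q.put(i)) for i in range(10)]
    await asyncio.sleep(0)  # give every task one step
    pending = sum(1 for t in tasks if not t.done())
    for t in tasks:  # cancel the blocked puts so the loop exits cleanly
        t.cancel()
    return q.qsize(), pending

size, pending = asyncio.run(demo())
print(size, pending)  # 2 items fit in the queue, the other 8 puts are blocked tasks
```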

The third option would be to stick with put_nowait() but set a nonzero maxsize. This would fail much earlier (with QueueFull), but it would not eat up all the memory on the system and would therefore have less impact on other processes.
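A sketch of that third option, assuming the caller wants to drop items (one possible policy; shedding load, logging, or pausing the reader are alternatives) once the bound is hit:

```python
import asyncio

# Bounded queue + put_nowait(): QueueFull is raised immediately at the
# bound, so the producer can react instead of exhausting memory.
q = asyncio.Queue(maxsize=3)

dropped = 0
for i in range(10):
    try:
        q.put_nowait(i)
    except asyncio.QueueFull:
        dropped += 1  # here: drop the item; real code might do something smarter
print(q.qsize(), dropped)
```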

I currently lean towards the third option. But I am not sure whether I fully understand how these queues work. Is my reasoning correct?
