aio-pika
Helios' instrumentation of aio-pika lets developers visualize, troubleshoot, and test application flows that include aio-pika operations.
If you consume messages by any method other than aio-pika's native consume, there is no scope in which the message context can be applied. The context therefore has to be applied manually, using Helios' AioPikaMessageContext in a with statement.
For example, wrapping the simple consumer example from the aio-pika documentation:
import asyncio
import logging

import aio_pika
from helios import AioPikaMessageContext


async def main() -> None:
    logging.basicConfig(level=logging.DEBUG)
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    queue_name = "test_queue"

    async with connection:
        # Creating channel
        channel = await connection.channel()

        # Will take no more than 10 messages in advance
        await channel.set_qos(prefetch_count=10)

        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                # Apply the Helios message context manually for this message
                with AioPikaMessageContext(message, queue):
                    async with message.process():
                        print(message.body)

                        if queue.name in message.body.decode():
                            break


if __name__ == "__main__":
    asyncio.run(main())
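To illustrate why a with scope is needed when iterating manually, the following is a minimal, standard-library-only sketch of how a context manager can make a per-message context active for exactly the duration of the handling code. It is not Helios' implementation: the names current_message_id, message_scope, handle, and consume are hypothetical and exist only for this illustration.

```python
import asyncio
import contextvars
from contextlib import contextmanager

# Hypothetical stand-in for a per-message trace context (not a Helios API).
current_message_id = contextvars.ContextVar("current_message_id", default=None)


@contextmanager
def message_scope(message_id):
    """Make message_id the active context for the duration of the with block."""
    token = current_message_id.set(message_id)
    try:
        yield
    finally:
        # Restore whatever was active before the block was entered.
        current_message_id.reset(token)


async def handle(body):
    # Code running inside the scope can see which message it belongs to.
    return (current_message_id.get(), body)


async def consume(messages):
    results = []
    for message_id, body in messages:
        # Without this with block, handle() would see no active message.
        with message_scope(message_id):
            results.append(await handle(body))
    # Outside any scope the context is back to its default.
    results.append((current_message_id.get(), None))
    return results


if __name__ == "__main__":
    print(asyncio.run(consume([("m1", b"hello"), ("m2", b"world")])))
```

In this sketch, everything awaited inside the with block observes the active message, and the previous state is restored on exit, even if the handler raises. The with AioPikaMessageContext(message, queue) block in the consumer above plays the analogous role of delimiting the handling of one message.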
Trace visualization