aio-pika

Helios' instrumentation of aio-pika lets developers visualize, troubleshoot, and test application flows that include aio-pika operations.

If you receive messages by any method other than aio-pika's native consume, the instrumentation has no scope in which to apply the message context.
In that case, apply the context manually by wrapping the message handling in a with statement that uses Helios' AioPikaMessageContext.
For example, wrapping the simple consumer example from the aio-pika documentation:

import asyncio
import logging

import aio_pika
from helios import AioPikaMessageContext


async def main() -> None:
    logging.basicConfig(level=logging.DEBUG)
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    queue_name = "test_queue"

    async with connection:
        # Creating channel
        channel = await connection.channel()

        # Will take no more than 10 messages in advance
        await channel.set_qos(prefetch_count=10)

        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
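                # The iterator gives Helios no scope of its own, so apply
                # the message context manually around the handling code
                with AioPikaMessageContext(message, queue):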
                    async with message.process():
                        print(message.body)

                        if queue.name in message.body.decode():
                            break


if __name__ == "__main__":
    asyncio.run(main())

Trace visualization