Skip to content

RabbitMQ Client Reference

rabbitmq_client

RabbitMQ async client for puzzle job queuing.

init_rabbitmq async

init_rabbitmq() -> None

Initialize RabbitMQ connection and declare queue.

Source code in src/clients/rabbitmq_client.py
async def init_rabbitmq() -> None:
    """Initialize RabbitMQ connection and declare queue."""
    global connection, channel
    try:
        rabbitmq_url = f"amqp://{RABBITMQ_USERNAME}:{RABBITMQ_PASSWORD}@{RABBITMQ_HOST}:{RABBITMQ_PORT}/"
        connection = await aio_pika.connect_robust(rabbitmq_url) # type: ignore
        channel = await connection.channel() # type: ignore
        await channel.declare_queue(RABBITMQ_PROBLEMS_QUEUE_NAME, durable=True) # type: ignore
        logger.success("RabbitMQ connected successfully")
    except Exception as e:
        logger.error("RabbitMQ connection failed: {}", str(e))
        raise

close_rabbitmq async

close_rabbitmq() -> None

Close RabbitMQ connection.

Raises:

Type Description
Exception

If closing the connection fails.

Source code in src/clients/rabbitmq_client.py
async def close_rabbitmq() -> None:
    """Close RabbitMQ connection.

    Raises:
        Exception: If closing the connection fails.
    """
    global connection, channel
    try:
        if channel:
            await channel.close()
        if connection:
            await connection.close()
        logger.info("RabbitMQ closed successfully")
    except Exception as e:
        logger.error("Error closing RabbitMQ: {error}", error=str(e))
        raise

enqueue_problem async

enqueue_problem(problem: Problem) -> None

Publish a Problem instance to the queue using msgpack serialization.

Parameters:

Name Type Description Default
problem Problem

The Problem instance to enqueue.

required

Raises:

Type Description
RuntimeError

If channel is not initialized.

Exception

If publishing fails.

Source code in src/clients/rabbitmq_client.py
async def enqueue_problem(problem: Problem) -> None:
    """Publish a Problem instance to the queue using msgpack serialization.

    Args:
        problem: The Problem instance to enqueue.

    Raises:
        RuntimeError: If channel is not initialized.
        Exception: If publishing fails.
    """
    global channel
    if channel is None:
        raise RuntimeError("RabbitMQ channel not initialized. Call init_rabbitmq() first.")

    try:
        payload = _serialize_problem(problem)
        await channel.default_exchange.publish(
            aio_pika.Message(body=payload),
            routing_key=RABBITMQ_PROBLEMS_QUEUE_NAME
        )
        logger.info("Problem object enqueued: {problem_id}", problem_id=problem.problem_id)
    except Exception as e:
        logger.error("Error enqueuing problem object: {error}", error=str(e))
        raise

rabbitmq_sync_client

Synchronous RabbitMQ client for solver workers.

create_rabbitmq_connection

create_rabbitmq_connection() -> pika.BlockingConnection

Create a new blocking RabbitMQ connection.

Returns:

Type Description
BlockingConnection

pika.BlockingConnection: A new connection to RabbitMQ.

Raises:

Type Description
Exception

If connection fails.

Source code in src/clients/rabbitmq_sync_client.py
def create_rabbitmq_connection() -> pika.BlockingConnection:
    """
    Create a new blocking RabbitMQ connection.

    Returns:
        pika.BlockingConnection: A new connection to RabbitMQ.

    Raises:
        Exception: If connection fails.
    """
    try:
        credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD)
        parameters = pika.ConnectionParameters(
            host=RABBITMQ_HOST,
            port=int(RABBITMQ_PORT),
            credentials=credentials,
            heartbeat=600,
            blocked_connection_timeout=300,
        )
        connection = pika.BlockingConnection(parameters)
        logger.info("RabbitMQ blocking connection created successfully")
        return connection
    except Exception as e:
        logger.error(f"Failed to create RabbitMQ connection: {e}")
        raise

create_rabbitmq_channel

create_rabbitmq_channel(
    connection: BlockingConnection,
) -> pika.adapters.blocking_connection.BlockingChannel

Create a channel from an existing RabbitMQ connection.

Parameters:

Name Type Description Default
connection BlockingConnection

An active RabbitMQ connection.

required

Returns:

Type Description
BlockingChannel

pika.adapters.blocking_connection.BlockingChannel: A new channel.

Raises:

Type Description
Exception

If channel creation fails.

Source code in src/clients/rabbitmq_sync_client.py
def create_rabbitmq_channel(connection: pika.BlockingConnection) -> "pika.adapters.blocking_connection.BlockingChannel":
    """
    Create a channel from an existing RabbitMQ connection.

    Args:
        connection: An active RabbitMQ connection.

    Returns:
        pika.adapters.blocking_connection.BlockingChannel: A new channel.

    Raises:
        Exception: If channel creation fails.
    """
    try:
        channel = connection.channel()
        logger.info("RabbitMQ channel created successfully")
        return channel
    except Exception as e:
        logger.error(f"Failed to create RabbitMQ channel: {e}")
        raise

setup_consumer_channel

setup_consumer_channel(
    channel: BlockingChannel, prefetch_count: int = 1
) -> None

Configure a channel for consuming messages.

Parameters:

Name Type Description Default
channel BlockingChannel

The channel to configure.

required
prefetch_count int

Number of messages to prefetch (default: 1 for fair dispatch).

1
Source code in src/clients/rabbitmq_sync_client.py
def setup_consumer_channel(channel: "pika.adapters.blocking_connection.BlockingChannel", prefetch_count: int = 1) -> None:
    """
    Configure a channel for consuming messages.

    Args:
        channel: The channel to configure.
        prefetch_count: Number of messages to prefetch (default: 1 for fair dispatch).
    """
    # Declare queue (idempotent)
    channel.queue_declare(queue=RABBITMQ_PROBLEMS_QUEUE_NAME, durable=True)

    # Set QoS for fair dispatch
    channel.basic_qos(prefetch_count=prefetch_count)

    logger.info(f"Consumer channel configured with prefetch_count={prefetch_count}")

close_rabbitmq_connection

close_rabbitmq_connection(
    connection: Optional[BlockingConnection],
    channel: Optional[BlockingChannel] = None,
) -> None

Safely close RabbitMQ channel and connection.

Parameters:

Name Type Description Default
connection Optional[BlockingConnection]

The connection to close.

required
channel Optional[BlockingChannel]

Optional channel to close first.

None
Source code in src/clients/rabbitmq_sync_client.py
def close_rabbitmq_connection(
    connection: Optional[pika.BlockingConnection],
    channel: Optional["pika.adapters.blocking_connection.BlockingChannel"] = None
) -> None:
    """
    Safely close RabbitMQ channel and connection.

    Args:
        connection: The connection to close.
        channel: Optional channel to close first.
    """
    if channel:
        try:
            channel.close()
            logger.info("RabbitMQ channel closed")
        except Exception as e:
            logger.warning(f"Error closing RabbitMQ channel: {e}")

    if connection:
        try:
            connection.close()
            logger.info("RabbitMQ connection closed")
        except Exception as e:
            logger.warning(f"Error closing RabbitMQ connection: {e}")