Skip to content

Redis Client Reference

redis_client

Async Redis client helpers for storing problems efficiently.

save_to_redis async

save_to_redis(key: str, value: str) -> None

Save a key-value pair to Redis.

Source code in src/clients/redis_client.py
async def save_to_redis(key: str, value: str) -> None:
    """Store ``value`` under ``key`` in Redis when a client is available.

    When the module-level ``redis`` client is falsy (for example, it has not
    been set up yet) the call returns silently: nothing is written and
    nothing is logged.

    Args:
        key: Redis key to write.
        value: Value to store under ``key``.
    """
    # Best-effort write: unlike the problem helpers, a missing client is not
    # treated as an error here.
    if not redis:
        return

    await redis.set(key, value)
    logger.info(f"Saved to Redis: {key}")

save_problem_redis async

save_problem_redis(problem: Problem) -> str

Persist a Problem instance in Redis using msgpack.

Parameters:

Name Type Description Default
problem Problem

The Problem instance to store.

required

Returns:

Name Type Description
str str

The Redis key used (problem_id).

Source code in src/clients/redis_client.py
async def save_problem_redis(problem: Problem) -> str:
    """Write a Problem to Redis under its own ``problem_id``.

    The value stored is whatever ``_serialize_problem`` produces for the
    given problem.

    Args:
        problem: The Problem instance to store.

    Returns:
        str: The Redis key that was written, i.e. ``problem.problem_id``.

    Raises:
        RuntimeError: If the module-level Redis client is ``None``.
    """
    if redis is None:
        raise RuntimeError("Redis is not initialized. Call init_redis() first.")

    # Serialize before touching Redis so a serialization error never leaves
    # a partially handled write behind.
    encoded = _serialize_problem(problem)
    await redis.set(problem.problem_id, encoded)
    logger.info(f"Saved problem to Redis: {problem.problem_id}")

    return problem.problem_id

load_problem_redis async

load_problem_redis(problem_id: str) -> Problem | None

Retrieve a Problem payload from Redis.

Parameters:

Name Type Description Default
problem_id str

The UUID key used when saving the problem.

required

Returns:

Type Description
Problem | None

Problem | None: The deserialized problem, or None if the key is missing.

Source code in src/clients/redis_client.py
async def load_problem_redis(problem_id: str) -> Problem | None:
    """Fetch the problem stored under ``problem_id`` and deserialize it.

    Args:
        problem_id: The key the problem was saved under.

    Returns:
        Problem | None: The result of ``_deserialize_problem`` applied to the
        stored value, or ``None`` when Redis holds nothing for the key.

    Raises:
        RuntimeError: If the module-level Redis client is ``None``.
    """
    if redis is None:
        raise RuntimeError("Redis is not initialized. Call init_redis() first.")

    stored = await redis.get(problem_id)
    # A missing key comes back as None; there is nothing to deserialize.
    return None if stored is None else _deserialize_problem(stored)

check_id_exists async

check_id_exists(problem_id: str) -> bool

Check if a problem ID exists in Redis.

Parameters:

Name Type Description Default
problem_id str

The UUID key to check.

required

Returns: bool: True if the ID exists, False otherwise.

Source code in src/clients/redis_client.py
async def check_id_exists(problem_id: str) -> bool:
    """Report whether Redis currently holds a key equal to ``problem_id``.

    Args:
        problem_id: The key to look up.

    Returns:
        bool: True when ``redis.exists`` reports exactly one match for the
        key, False otherwise.

    Raises:
        RuntimeError: If the module-level Redis client is ``None``.
    """
    if redis is None:
        raise RuntimeError("Redis is not initialized. Call init_redis() first.")

    # exists() yields a count of matching keys; a single key gives 0 or 1.
    return (await redis.exists(problem_id)) == 1

subscribe_to_problem_channel async

subscribe_to_problem_channel() -> PubSub

Subscribe to a Redis channel for pub/sub.

Returns:

Name Type Description
PubSub PubSub

The Redis PubSub object.

Source code in src/clients/redis_client.py
async def subscribe_to_problem_channel() -> PubSub:
    """Subscribe to the ``"problems"`` Redis channel and return the live PubSub.

    The returned object is still subscribed and open. The caller owns it and
    must unsubscribe and close it when finished (for example with
    ``unsunscribe_and_close_pubsub``).

    Returns:
        PubSub: The Redis PubSub object, subscribed to ``"problems"``.

    Raises:
        RuntimeError: If the module-level Redis client is ``None``.
        Exception: Whatever ``pubsub.subscribe`` raises; the PubSub is closed
            before the error propagates.
    """
    global redis
    channel_name = "problems"
    if redis is None:
        raise RuntimeError("Redis is not initialized. Call init_redis() first.")

    pubsub = redis.pubsub()
    try:
        await pubsub.subscribe(channel_name)
    except BaseException:
        # Release the PubSub only when subscribing failed (BaseException so
        # that task cancellation is covered too). Cleaning up on the success
        # path would hand the caller a closed, unsubscribed object.
        await pubsub.close()
        raise

    logger.info(f"Subscribed to Redis channel: {channel_name}")
    return pubsub

receive_message async

receive_message(pubsub, timeout: float) -> dict | None

Receive a message from the Redis PubSub channel.

Parameters:

Name Type Description Default
pubsub PubSub

The Redis PubSub object.

required
timeout float

Time in seconds to wait for a message.

required

Returns:

Type Description
dict | None

dict | None: The message data or None if no message.

Source code in src/clients/redis_client.py
async def receive_message(pubsub, timeout: float) -> dict | None:
    """Receive a message from the Redis PubSub channel.

    Args:
        pubsub (PubSub): The Redis PubSub object.
        timeout (float): Time in seconds to wait for a message.

    Returns:
        dict | None: The message data or None if no message.
    """
    while True:
        message = await pubsub.get_message(
            ignore_subscribe_messages=True, timeout=timeout
        )
        if message is None:
            return None
        elif message["type"] == "message":
            return message["data"]
        else:
            continue

unsunscribe_and_close_pubsub async

unsunscribe_and_close_pubsub(pubsub) -> None

Unsubscribe from all channels and close the PubSub connection.

Parameters:

Name Type Description Default
pubsub PubSub

The Redis PubSub object.

required
Source code in src/clients/redis_client.py
async def unsunscribe_and_close_pubsub(pubsub) -> None:
    """Unsubscribe the PubSub from its channels, then close it.

    A falsy ``pubsub`` (such as ``None``) makes the call a no-op.

    Args:
        pubsub (PubSub): The Redis PubSub object to shut down.
    """
    if not pubsub:
        return

    # unsubscribe() with no arguments is called first, then the connection
    # is closed.
    await pubsub.unsubscribe()
    await pubsub.close()
    logger.info("Unsubscribed and closed PubSub connection")

redis_sync_client

Synchronous Redis client for solver workers.

create_redis_client

create_redis_client() -> redis.Redis

Create a new synchronous Redis client.

Returns:

Type Description
Redis

redis.Redis: A new Redis client instance.

Raises:

Type Description
Exception

If connection or ping fails.

Source code in src/clients/redis_sync_client.py
def create_redis_client() -> redis.Redis:  # type: ignore
    """
    Build a synchronous Redis client and verify it with a ping.

    The URL is assembled from the module-level ``REDIS_HOST_URL`` and
    ``REDIS_PORT`` values, and responses are left undecoded
    (``decode_responses=False``).

    Returns:
        redis.Redis: A client that has answered a ping.

    Raises:
        Exception: Any error raised while building the client or pinging the
            server; it is logged and then re-raised unchanged.
    """
    try:
        connection = redis.Redis.from_url(  # type: ignore
            f"redis://{REDIS_HOST_URL}:{REDIS_PORT}",
            decode_responses=False,
        )
        # Fail fast: an unreachable server should surface here rather than on
        # the first real command.
        connection.ping()
        logger.info("Redis sync client connected successfully")
        return connection
    except Exception as e:
        logger.error(f"Failed to create Redis client: {e}")
        raise

save_solution_to_redis

save_solution_to_redis(
    client: Redis, solution: Solution
) -> None

Save a solution to Redis and update the problem with solution data.

Parameters:

Name Type Description Default
client Redis

Redis client instance.

required
solution Solution

The Solution object containing problem_id, solution_data, and status.

required

Raises:

Type Description
RuntimeError

If client is None.

Exception

If save fails.

Source code in src/clients/redis_sync_client.py
def save_solution_to_redis(
    client: redis.Redis,  # type: ignore
    solution: Solution,
) -> None:
    """
    Attach a solution to its stored problem and write the problem back to Redis.

    The value stored under ``solution.problem_id`` is loaded and deserialized,
    its ``solution`` and ``status`` attributes are overwritten from
    ``solution.solution_data`` and ``solution.status``, and the re-serialized
    problem is written back under the same key. The solution is not stored
    under any key of its own: when no non-empty ``bytes`` value exists for the
    key, nothing is written and a warning is logged.

    NOTE(review): the get / modify / set sequence is not atomic. If several
    writers can target the same problem key concurrently, one update may
    overwrite another — confirm whether that can happen.

    Args:
        client: Redis client instance.
        solution: The Solution object containing problem_id, solution_data, and status.

    Raises:
        RuntimeError: If client is None.
        Exception: Any error raised while reading, deserializing, serializing
            or writing; it is logged and then re-raised unchanged.
    """
    if client is None:
        raise RuntimeError("Redis client is None")

    try:
        # Load and update the problem with solution data
        problem_bytes = client.get(solution.problem_id)
        if problem_bytes and isinstance(problem_bytes, bytes):
            # Imported lazily, exactly as before — presumably to avoid an
            # import cycle; confirm before hoisting to module level.
            from clients.util import _deserialize_problem, _serialize_problem

            problem = _deserialize_problem(problem_bytes)
            problem.solution = solution.solution_data
            problem.status = solution.status

            # Save updated problem back to Redis
            updated_problem_bytes = _serialize_problem(problem)
            client.set(solution.problem_id, updated_problem_bytes)
            logger.info(f"Problem {solution.problem_id} updated with solution")
        else:
            # Nothing is persisted on this path, so the message must not claim
            # that the solution was saved.
            logger.warning(
                f"Problem {solution.problem_id} not found in Redis, solution was not saved"
            )

    except Exception as e:
        logger.error(f"Failed to save solution to Redis: {e}")
        raise

close_redis_client

close_redis_client(client: Optional[Redis]) -> None

Safely close Redis client connection.

Parameters:

Name Type Description Default
client Optional[Redis]

The Redis client to close.

required
Source code in src/clients/redis_sync_client.py
def close_redis_client(client: Optional[redis.Redis]) -> None:  # type: ignore
    """
    Close a Redis client without letting close errors escape.

    A falsy ``client`` (such as ``None``) is ignored. Any exception raised
    while closing is logged as a warning rather than propagated.

    Args:
        client: The Redis client to close, or None.
    """
    if not client:
        return

    try:
        client.close()
        logger.info("Redis client closed")
    except Exception as e:
        # Best-effort shutdown: report the problem but never raise from here.
        logger.warning(f"Error closing Redis client: {e}")