Skip to content

Service

service.tvtime_data

TVTime Data Service

Returns:

| Name | Type | Description |
| --- | --- | --- |
| TVTimeDataService | | Service to handle TVTime data |

TVTimeDataService

Source code in src/service/tvtime_data.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class TVTimeDataService:
    """Service that reads a user's TVTime data through the injected client.

    Every method forwards the given ``username`` to the client stored in
    ``self.redis`` and returns the client's result unchanged.
    """

    def __init__(self, redis_client: Annotated[RedisOMClient, Depends()]):
        """Keep the injected ``RedisOMClient`` on the instance as ``redis``."""
        self.redis = redis_client

    def get_status(self, username):
        """Return a dict holding the client's ``exists`` and ``get_ttl`` results.

        The dict has exactly two keys, ``"exists"`` and ``"ttl"``.
        """
        # Existence is queried before the TTL, mirroring the key order.
        exists = self.redis.exists(username)
        ttl = self.redis.get_ttl(username)
        return {"exists": exists, "ttl": ttl}

    def get_all_data(self, username):
        """Return the result of the client's ``get_tvtime_data`` for ``username``."""
        all_data = self.redis.get_tvtime_data(username)
        return all_data

    def get_watch_next(self, username):
        """Return the result of the client's ``get_tvtime_watch_next``."""
        watch_next = self.redis.get_tvtime_watch_next(username)
        return watch_next

    def get_not_watched_for_while(self, username):
        """Return the result of the client's ``get_tvtime_not_watched_for_while``."""
        stale = self.redis.get_tvtime_not_watched_for_while(username)
        return stale

    def get_not_started_yet(self, username):
        """Return the result of the client's ``get_tvtime_not_started_yet``."""
        not_started = self.redis.get_tvtime_not_started_yet(username)
        return not_started

redis = redis_client instance-attribute

__init__(redis_client)

Source code in src/service/tvtime_data.py
14
15
def __init__(self, redis_client: Annotated[RedisOMClient, Depends()]):
    """Keep the injected ``RedisOMClient`` on the instance as ``redis``."""
    # The other methods of the service reach the client via ``self.redis``.
    self.redis = redis_client

get_all_data(username)

Source code in src/service/tvtime_data.py
24
25
def get_all_data(self, username):
    """Return the result of the client's ``get_tvtime_data`` for ``username``."""
    all_data = self.redis.get_tvtime_data(username)
    return all_data

get_not_started_yet(username)

Source code in src/service/tvtime_data.py
33
34
def get_not_started_yet(self, username):
    """Return the result of the client's ``get_tvtime_not_started_yet``."""
    not_started = self.redis.get_tvtime_not_started_yet(username)
    return not_started

get_not_watched_for_while(username)

Source code in src/service/tvtime_data.py
30
31
def get_not_watched_for_while(self, username):
    """Return the result of the client's ``get_tvtime_not_watched_for_while``."""
    stale = self.redis.get_tvtime_not_watched_for_while(username)
    return stale

get_status(username)

Source code in src/service/tvtime_data.py
17
18
19
20
21
22
def get_status(self, username):
    """Return a dict holding the client's ``exists`` and ``get_ttl`` results.

    The dict has exactly two keys, ``"exists"`` and ``"ttl"``.
    """
    # Existence is queried before the TTL, mirroring the key order.
    exists = self.redis.exists(username)
    ttl = self.redis.get_ttl(username)
    return {"exists": exists, "ttl": ttl}

get_watch_next(username)

Source code in src/service/tvtime_data.py
27
28
def get_watch_next(self, username):
    """Return the result of the client's ``get_tvtime_watch_next``."""
    watch_next = self.redis.get_tvtime_watch_next(username)
    return watch_next

service.tvtime_scraper

TVTime Scraper Service

Returns:

| Name | Type | Description |
| --- | --- | --- |
| TVTimeScraperService | | Service to handle scraper tasks |

TVTimeScraperService

Source code in src/service/tvtime_scraper.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class TVTimeScraperService:
    """Service that queues scrape tasks and reports their status.

    Holds the module-level ``celery`` object and the injected
    ``RedisOMClient`` as instance attributes.
    """

    def __init__(self, redis_client: Annotated[RedisOMClient, Depends()]):
        """Store the injected client and the module-level ``celery`` object."""
        self.celery = celery
        self.redis = redis_client

    def scrape(self, user: TVTimeUser) -> str:
        """Queue ``scrape_task`` for ``user`` and return the queued task's id."""
        logger.debug("Scraping Service Started")
        queued = scrape_task.delay(user)
        return queued.id

    def get_status(self, task_id):
        """Return a dict with the ``id`` and ``status`` of the given task.

        The values are read from an ``AsyncResult`` built for ``task_id``
        against ``self.celery``.
        """
        logger.debug("Task id {task_id}", task_id=task_id)
        # Possible status values are Celery's built-in states:
        # https://docs.celeryq.dev/en/latest/userguide/tasks.html#built-in-states
        async_result = AsyncResult(id=task_id, app=self.celery)
        return {"id": async_result.id, "status": async_result.status}

celery = celery instance-attribute

redis = redis_client instance-attribute

__init__(redis_client)

Source code in src/service/tvtime_scraper.py
18
19
20
def __init__(self, redis_client: Annotated[RedisOMClient, Depends()]):
    """Store the injected client and the module-level ``celery`` object."""
    # ``celery`` is not a parameter; it is resolved from the enclosing module.
    self.celery = celery
    self.redis = redis_client

get_status(task_id)

Source code in src/service/tvtime_scraper.py
27
28
29
30
31
def get_status(self, task_id):
    """Return a dict with the ``id`` and ``status`` of the given task.

    The values are read from an ``AsyncResult`` built for ``task_id``
    against ``self.celery``.
    """
    logger.debug("Task id {task_id}", task_id=task_id)
    # Possible status values are Celery's built-in states:
    # https://docs.celeryq.dev/en/latest/userguide/tasks.html#built-in-states
    async_result = AsyncResult(id=task_id, app=self.celery)
    return {"id": async_result.id, "status": async_result.status}

scrape(user)

Source code in src/service/tvtime_scraper.py
22
23
24
25
def scrape(self, user: TVTimeUser) -> str:
    """Queue ``scrape_task`` for ``user`` and return the queued task's id."""
    logger.debug("Scraping Service Started")
    queued = scrape_task.delay(user)
    return queued.id