[Image: An example of an eternally pending task, Poland 2022.]

Convenient scheduler in Python

Introduction

Python has become an all-purpose language. It is especially commonly used in analytics and for solving algorithmic problems within data science, but it is also popular in web development. This combination makes it a reasonable choice for various extract-transform-load (ETL) tasks.

However, many of these tasks are rather small and don’t require large frameworks such as Airflow or Luigi. When polling one or several web pages for data, a simple Python script plus crontab is more than sufficient. Still, when a project gets a little bigger, managing multiple jobs with cron becomes cumbersome. At the same time, a bare installation of Airflow for “small jobs” needs at least 4 GB of RAM and 2 CPUs. In terms of AWS costs, that means at least a t2.small instance running at all times.

Is there anything in between? Something small enough to run on, say, a t2.nano (very cheap), yet fairly “maintainable” and “extendable”?

In this post, I would like to share a simple approach that uses Python’s schedule package with a few modifications.

Python scheduler

The Python schedule library offers simple task scheduling. It is installable using pip and fairly easy to use. Unfortunately, the documentation doesn’t provide examples of using it within a larger project:

import schedule
import time


def job():
    print("I'm working...")


# Run job every 3 second/minute/hour/day/week,
# starting 3 second/minute/hour/day/week from now
schedule.every(3).seconds.do(job)
schedule.every(3).minutes.do(job)
schedule.every(3).hours.do(job)
schedule.every(3).days.do(job)
schedule.every(3).weeks.do(job)

# Run job every minute at the 23rd second
schedule.every().minute.at(":23").do(job)

# Run job every hour at the 42nd minute
schedule.every().hour.at(":42").do(job)

# Run jobs every 5th hour, 20 minutes and 30 seconds in.
# If current time is 02:00, first execution is at 06:20:30
schedule.every(5).hours.at("20:30").do(job)

# Run job every day at specific HH:MM and HH:MM:SS
schedule.every().day.at("10:30").do(job)
schedule.every().day.at("10:30:42").do(job)

# Run job on a specific day of the week
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

As you can see, all functions are called at the module level, which is fine for a one-off script. However, if you have several different jobs, the code quickly becomes cluttered, especially if different callables require different parameters.

In other words, it may be preferable to take advantage of an object-oriented approach and define some “architecture” around it.

Using it in a project

Let’s say, for the sake of argument, that we have a set of dedicated ETL tasks, modeled using the following abstract class:

from abc import ABC, abstractmethod
from typing import Any, TypeVar


E = TypeVar("E", bound="BaseETL")


class BaseETL(ABC):
    def __init__(self, **kwargs: Any) -> None:
        self.raw_data = None
        self.transformed_data = None

    @abstractmethod
    def extract(self: E, **kwargs: Any) -> E:
        ...

    @abstractmethod
    def transform(self: E, **kwargs: Any) -> E:
        ...

    @abstractmethod
    def load(self, **kwargs: Any) -> Any:
        ...

    def run(self, **kwargs: Any) -> None:
        # extract and transform return self, which allows chaining the calls
        self.extract(**kwargs).transform(**kwargs).load(**kwargs)

Any class that implements an ETL process would inherit from this base class. The extract method could, for example, fetch a website. Then transform would turn the raw HTML into a format acceptable by a database. Finally, load would save the data to the database. All three methods, executed in this order, are wrapped by the run method; the chaining works because extract and transform return self.
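
To make the contract concrete, here is a minimal sketch of a subclass. The class name and the data are made up for illustration; a real extract would fetch a page instead:

from etl import BaseETL  # or wherever BaseETL lives in your project


class UppercaseETL(BaseETL):
    def extract(self, **kwargs):
        self.raw_data = "hello scheduler"  # stand-in for fetching a page
        return self

    def transform(self, **kwargs):
        self.transformed_data = self.raw_data.upper()
        return self

    def load(self, **kwargs):
        print(self.transformed_data)  # stand-in for a database insert


UppercaseETL().run()  # prints "HELLO SCHEDULER"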

Now, after the ETL classes are defined, we would like to schedule each of them through the schedule module in a nice fashion.

Two example ETL tasks

For brevity, in the following examples, let’s skip the inheritance and only focus on the run method. Assume that the extract, transform and load methods are implemented elsewhere.

etl.py

class DummyETL:  # normally DummyETL(BaseETL)
    def __init__(self, init_param: int) -> None:
        # super().__init__()  # - not needed here
        self.init_param = init_param

    def run(self, p1: int, p2: int) -> None:
        print(f"{self.__class__.__name__}({self.init_param}, p1={p1}, p2={p1})")


class EvenDummierETL:  # same...
    def __init__(self, init_param: int) -> None:
        # super().__init__()  # - same
        self.init_param = init_param

    def run(self, p1: int) -> None:
        print(f"{self.__class__.__name__}({self.init_param}, p1={p1})")

The constructors’ parameters can, for instance, specify the URLs of the pages to scrape. The run methods’ parameters, in turn, can be used to pass secrets.
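
For example, a secret could be read from an environment variable at runtime and handed to run. The variable name P1_SECRET below is made up for illustration:

import os

from etl import DummyETL

task = DummyETL(init_param=13)
task.run(p1=int(os.environ.get("P1_SECRET", "1")), p2=2)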

Now that we have the ETL classes defined, let’s create a separate registry that associates the processes with some sort of schedule.

registry.py

import schedule

from etl import DummyETL, EvenDummierETL


def get_registry():
    dummy_etl = DummyETL(init_param=13)
    dummier_etl = EvenDummierETL(init_param=15)

    return [
        (dummy_etl, schedule.every(1).seconds),
        (dummier_etl, schedule.every(1).minutes.at(":05")),
    ]

The get_registry function is the place where the schedule is defined. Although the parameters’ values are hard-coded here, you can easily imagine the function loading them from a config file. Either way, it returns a list of tuples pairing the ETL objects with Jobs (from schedule). Note that this pairing is our convention. The jobs are not yet associated with any particular Scheduler (again, from schedule). However, the convention allows us to do so in any other part of the project, so we don’t have to bind them to the module-level scheduler as the documentation example does.
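
A minimal sketch of such a config-driven registry, assuming a hypothetical registry.json with made-up keys, could look like this:

import json

import schedule

from etl import DummyETL, EvenDummierETL


def get_registry(path: str = "registry.json"):
    # registry.json could contain e.g.:
    # {"dummy_init": 13, "dummy_seconds": 1, "dummier_init": 15, "dummier_at": ":05"}
    with open(path) as f:
        cfg = json.load(f)

    return [
        (DummyETL(init_param=cfg["dummy_init"]), schedule.every(cfg["dummy_seconds"]).seconds),
        (EvenDummierETL(init_param=cfg["dummier_init"]), schedule.every(1).minutes.at(cfg["dummier_at"])),
    ]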

Our scheduler-based scheduler

Finally, let’s create a new class that will activate the whole mechanism.

scheduler.py

import time
from typing import Dict, List, Tuple, TypeVar

from schedule import Job, Scheduler

from etl import E  # it could also be imported from e.g. etl.base


S = TypeVar("S", bound="TaskScheduler")


class TaskScheduler:
    def __init__(self, registry: List[Tuple[E, Job]]) -> None:
        self.scheduler = Scheduler()
        self.registry = []

        for task, job in registry:
            self.registry.append(task)
            self.scheduler.jobs.append(job)

    def register(self: S, run_params: Dict) -> S:
        jobs = self.scheduler.get_jobs()
        for task, job in zip(self.registry, jobs):
            # fall back to no parameters if a task has none defined
            params = run_params.get(task.__class__.__name__, {})
            job.do(task.run, **params)

        return self

    def run(self, polling_seconds: int) -> None:
        while True:
            time.sleep(polling_seconds)
            self.scheduler.run_pending()

Our TaskScheduler uses composition: it creates a single Scheduler instance and adds the previously registered jobs to it. Although not enforced, we use typing to give a strong hint about what should be provided to the constructor to properly register the jobs. The register method then binds each task’s run method to its job. Last but not least, run activates the machinery.

A script that uses this implementation would look like this:

run.py

from registry import get_registry
from scheduler import TaskScheduler


if __name__ == "__main__":
    run_params = {
        "DummyETL": dict(p1=1, p2=2),  # e.g. from environment variables
        "EvenDummierETL": dict(p1=3),
    }

    registry = get_registry()  # e.g. from the script's args or a config file
    task_scheduler = TaskScheduler(registry).register(run_params)
    task_scheduler.run(polling_seconds=1)

Probably the weakest point of this solution is the convention of using __class__.__name__ as the keys of the run_params dictionary. However, considering the simplicity of the approach, it may be acceptable, especially if these parameters are defined at runtime. There are many alternatives; one of them would be an additional abstraction layer with objects like DummyTask serving as a bridge between the ETL objects and the registry.
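
One possible shape of such a bridge, with all names hypothetical, is sketched below:

from dataclasses import dataclass, field
from typing import Any, Dict

from schedule import Job


@dataclass
class DummyTask:
    etl: Any  # the ETL object to run
    job: Job  # its schedule Job
    run_params: Dict = field(default_factory=dict)  # parameters for etl.run

    def bind(self) -> None:
        # attach the ETL's run method, with its parameters, to the job
        self.job.do(self.etl.run, **self.run_params)

With such objects, the registry could return DummyTasks directly, and the scheduler would no longer need to peek at class names.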

Another approach to TaskScheduler

Coming back to the TaskScheduler, we can also define it through inheritance rather than composition. That would mean extending schedule’s native Scheduler class. In this case, the TaskScheduler would look as follows:

class TaskScheduler(Scheduler):  # <- here
    def __init__(self, registry: List[Tuple[E, Job]]) -> None:
        super().__init__()  # <- here
        self.registry = []

        for task, job in registry:
            self.registry.append(task)
            self.jobs.append(job)  # <- here

    def register(self: S, run_params: Dict) -> S:
        jobs = self.get_jobs()  # <- here
        for task, job in zip(self.registry, jobs):
            params = run_params.get(task.__class__.__name__, {})
            job.do(task.run, **params)

        return self

    def run(self, polling_seconds: int) -> None:
        while True:
            time.sleep(polling_seconds)
            self.run_pending()  # <- and here

Which of the two is better, if either? You decide ;).

Conclusion

In this brief article, we have shown how the simple schedule module can be extended to create a small, working ETL machine. Most importantly, the approach helps to better organize the code within a small project without bringing out the big guns.