Introduction to Kestra

Kestra is an open-source data orchestrator and scheduler. With Kestra, data workflows, called flows, use the YAML format and are executed by its engine via an API call, the user interface, or a trigger (webhook, schedule, SQL query, Pub/Sub message, …).

The key concepts in Kestra are:

  • The flow: describes how the data will be orchestrated (it is the workflow). It is a sequence of tasks.
  • The task: a step in the flow that performs an action on one or more pieces of incoming data, the inputs, and then generates zero or one piece of output data, the output.
  • The execution: an instance of a flow, created following a trigger.
  • The trigger: defines how a flow can be triggered.
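
As a rough mental model only (this is not how Kestra is implemented), the relationship between these four concepts can be sketched in Python. Every name below (Task, Flow, Execution, trigger) is a hypothetical stand-in chosen for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

# Hypothetical stand-ins for illustration; these are not Kestra classes.

@dataclass
class Task:
    id: str
    # A task receives the execution context and returns its output.
    run: Callable[[Dict], Dict]

@dataclass
class Flow:
    id: str
    namespace: str
    tasks: List[Task]

@dataclass
class Execution:
    flow: Flow
    inputs: Dict
    outputs: Dict = field(default_factory=dict)

    def run(self) -> Dict:
        # Tasks run in sequence; each output is stored under the task id.
        for task in self.flow.tasks:
            context = {"inputs": self.inputs, "outputs": self.outputs}
            self.outputs[task.id] = task.run(context)
        return self.outputs

def trigger(flow: Flow, inputs: Dict) -> Execution:
    # A trigger creates a new execution, i.e. one instance of the flow.
    return Execution(flow=flow, inputs=inputs)

hello = Task(id="hello", run=lambda ctx: {"message": f"Hello {ctx['inputs']['name']}"})
flow = Flow(id="hello-world", namespace="fr.loicmathieu.example", tasks=[hello])
execution = trigger(flow, {"name": "Kestra"})
print(execution.run())
```

The same flow can be triggered many times; each call to trigger yields a separate execution with its own inputs and outputs.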

We will see these concepts in practice in the rest of this article.

To launch Kestra locally, the easiest way is to use the provided Docker Compose file and run it with docker compose up.

This Docker Compose file will run Kestra in standalone mode (all components in a single process) with a PostgreSQL database as queue and repository (for storing and executing flows), and local storage (for storing flow data).

Kestra is composed of several components that communicate via queues and store flow information via repositories. These queues and repositories can be implemented in different ways (memory, JDBC, Kafka, Elasticsearch). I’m not going into detail about Kestra’s architecture today; if you are curious, you can refer to Kestra’s architecture documentation.

Once Kestra is started, a graphical interface is available on port 8080: http://localhost:8080. We will use this interface for all the examples in this article.

At first launch, a Guided Tour is offered; you can follow it, or skip it and go straight to the examples in this article.

My first flow

To create a flow, go to the Flows menu and then click on the Create button at the bottom right. You now have a text area in which you can enter the YAML description of the flow.

id: hello-world
namespace: fr.loicmathieu.example

tasks:
  - id: hello
    type: io.kestra.core.tasks.debugs.Echo
    format: "Hello World"

A flow has:

  • An id property that is its unique identifier within a namespace.
  • A namespace property; namespaces are hierarchical, like directories in a file system.
  • A tasks property which is the list of tasks to be performed at the execution of the flow.

Here we have added a single task that has three properties:

  • An id property that is its unique identifier within the flow.
  • A type property which is the name of the class that represents the task.
  • A format property, specific to Echo tasks, which defines the format of the message to be logged (an Echo task is like the echo shell command).

Each task will have its own properties, which are documented in the online documentation as well as in the documentation integrated in the Kestra graphical interface (Documentation -> Plugins -> then select a plugin in the right menu).

Within the editor, the YAML description of the flow is validated and autocompletion is available for the type and properties of the tasks via CTRL+SPACE.

Kestra is based on a plugin system that allows you to add tasks (as well as triggers and conditions, which we will see later). By default, Kestra ships with very few plugins, but the Docker image used in the Docker Compose file includes all the plugins maintained by the Kestra development team. Their documentation is here: https://kestra.io/plugins/.

To launch this first flow, go to the Executions tab and click the New Execution button. This switches to the Gantt view of the flow execution, which is updated in real time with the status of the execution.

The Logs tab allows you to see the runtime logs.

Note the Hello World log generated by the hello task.

Now let’s add an input named name to our flow, to pass in the name of the person to say hello to. To do this, we need to edit the flow: select it either via the breadcrumb navigation at the top (Home / Flows / fr.loicmathieu.example.hello-world), or by clicking on Flows and selecting it from the list; then go to the Source tab.

We can pass data to a flow via inputs. We will define a name input of type STRING.

id: hello-world
namespace: fr.loicmathieu.example

inputs:
  - type: STRING
    name: name

tasks:
  - id: hello
    type: io.kestra.core.tasks.debugs.Echo
    format: "Hello {{inputs.name}}"

When we click on the New Execution button, a form asks us to fill in the flow inputs.

After entering a value for the name and running the flow, we can see that the log contains the value of the input.

Hello {{inputs.name}} is an expression that uses the Pebble templating engine, which renders whatever is between the {{ and }} delimiters (the "mustaches"). Here the Pebble expression fetches the value of the inputs.name variable, which points to the input named name.
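
To make the substitution concrete, here is a toy Python sketch of what rendering such an expression involves. It is not Pebble and supports only dotted variable paths; the render function and the sample value are assumptions made for illustration.

```python
import re

# Matches {{ ... }} and captures the expression inside, ignoring surrounding spaces.
EXPRESSION = re.compile(r"\{\{\s*(.*?)\s*\}\}")

def render(template: str, variables: dict) -> str:
    """Replace each {{a.b}} with the value found by walking the dotted path."""
    def resolve(match):
        value = variables
        for key in match.group(1).split("."):
            value = value[key]
        return str(value)
    return EXPRESSION.sub(resolve, template)

print(render("Hello {{inputs.name}}", {"inputs": {"name": "Kestra"}}))
```

Text outside the delimiters is left untouched, which is why the log shows Hello followed by the value entered in the form.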

Inputs and BigQuery

In this example, the flow will take a CSV file as input, load it into Google BigQuery, then run a query on the loaded data and display the result in the logs.

I’m using BigQuery here because it’s easier than starting a database, but it requires a Google Cloud account and a service account. You need to create a Google Cloud service account and use it in the serviceAccount property of the BigQuery tasks. To avoid hard-coding this value, you can use an environment variable or a cluster-wide global variable; see the online documentation on variables.

Without further ado, the solution 😉

id: beer
namespace: fr.loicmathieu.example
description: A flow to handle my beers data

inputs:
  - type: FILE
    description: Beers data
    name: beers

tasks:
  - id: start-log
    type: io.kestra.core.tasks.debugs.Echo
    format: "{{taskrun.startDate}} - {{task.id}} - Launching the flow"
  - id: load-beers
    type: io.kestra.plugin.gcp.bigquery.Load
    serviceAccount: "<service account key>"
    csvOptions:
      fieldDelimiter: ","
    destinationTable: beers.beers
    format: CSV
    from: "{{inputs.beers}}"
  - id: query-beers
    type: io.kestra.plugin.gcp.bigquery.Query
    serviceAccount: "<service account key>"
    fetchOne: true
    sql: |
      SELECT count(*) as count FROM beers.beers
  - id: end-log
    type: io.kestra.core.tasks.debugs.Echo
    format: "{{taskrun.startDate}} - {{outputs['query-beers'].row.count}} beers loaded"

Here I have defined an input of type FILE named beers. Kestra detects the type of the input and generates the corresponding flow execution form, which lets us upload a CSV file containing a list of beers. This file is stored in Kestra’s internal storage and is available to all tasks.

The load-beers task of type io.kestra.plugin.gcp.bigquery.Load loads a CSV file into a BigQuery table. The BigQuery dataset beers must exist before this task is executed. The from property takes a file from the internal storage (more precisely the URI of the file, whose content is retrieved from the internal storage at the last moment). Here I use the Pebble expression {{inputs.beers}} to retrieve the file passed as input to the flow.

The query-beers task of type io.kestra.plugin.gcp.bigquery.Query is used to perform a BigQuery query. There are several ways to store the query result. Here, I use the fetchOne: true property which configures the task to fetch a single row and put the result of the query in the task output. It is also possible to load all rows (fetch: true), or store the results in Kestra’s internal storage (store: true) which is recommended for queries that bring back many rows.

The end-log task writes a log, as we have already seen. Here, we want to log the number of records loaded, so we get the corresponding output from the query-beers task via a Pebble expression: {{outputs['query-beers'].row.count}}.

The expression {{outputs['query-beers'].row.count}} deserves a closer look:

  • outputs['query-beers']: the output of the query-beers task. So far we have used the dotted notation (.) to access variables, but because this task id contains a ‘-‘ character, we have to use the subscript notation ([]): ‘-‘ is a special character for Pebble.
  • row: the name of the attribute set as output by the query-beers task. A task can have multiple output attributes; refer to the task documentation for their list.
  • count: the name of the column.
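
The same pitfall exists in Python, which makes for a convenient analogy (Pebble's parser is a different one, so this is only an illustration). In the sketch below the outputs dictionary and its value 42 are made up to mirror the shape described above.

```python
import ast

# Made-up output shaped like the one described above; the value 42 is illustrative.
outputs = {"query-beers": {"row": {"count": 42}}}

dotted = ast.parse("outputs.query-beers", mode="eval").body
subscript = ast.parse("outputs['query-beers']", mode="eval").body

# With dotted notation the dash is read as a minus sign: outputs.query - beers
print(type(dotted).__name__, type(dotted.op).__name__)
# With subscript notation the whole string is used as the key
print(type(subscript).__name__)
print(outputs["query-beers"]["row"]["count"])
```

Task ids without a dash, such as hello, do not have this problem and can be reached with the dotted notation.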

ForEach and file format

In this example, we will query the 10 most viewed Wikipedia pages using the BigQuery public dataset wikipedia.pageviews_2023 for French, English and German languages. Then we will transform the result into CSV.

id: wikipedia-top-ten
namespace: fr.loicmathieu.example
description: A flow that loads wikipedia top 10 FR pages each hour

tasks:
  - id: start-log
    type: io.kestra.core.tasks.debugs.Echo
    format: "{{taskrun.startDate}} - {{task.id}} - Launching the flow"
  - id: for-each-countries
    type: io.kestra.core.tasks.flows.EachSequential
    tasks:
      - id: query-top-ten
        type: io.kestra.plugin.gcp.bigquery.Query
        serviceAccount: "<service account key>"
        sql: |
          SELECT DATETIME(datehour) as date, title, views 
          FROM `bigquery-public-data.wikipedia.pageviews_2023` 
          WHERE DATE(datehour) = current_date() and wiki = '{{taskrun.value}}' and title not in ('Cookie_(informatique)', 'Wikipédia:Accueil_principal', 'Spécial:Recherche')
          ORDER BY datehour desc, views desc
          LIMIT 10
        store: true
      - id: write-csv
        type: io.kestra.plugin.serdes.csv.CsvWriter
        from: "{{outputs['query-top-ten'][taskrun.value].uri}}"
    value: '["fr", "en", "de"]'

The for-each-countries task of type io.kestra.core.tasks.flows.EachSequential allows for looping. It runs its list of child tasks once for each value passed in its value property; here the languages fr, en and de.

The query-top-ten task of type io.kestra.plugin.gcp.bigquery.Query executes a query on BigQuery whose result is stored in Kestra’s internal storage (store: true). It uses the Pebble expression {{taskrun.value}}, which retrieves the current value from the EachSequential loop.

The write-csv task of type io.kestra.plugin.serdes.csv.CsvWriter will rewrite the file stored by the previous task to the CSV format. By default, Kestra uses the Amazon Ion object storage format, so this task switches from the Ion to the CSV format. It uses the Pebble expression {{outputs['query-top-ten'][taskrun.value].uri}} whose [taskrun.value].uri attribute retrieves the value of the uri attribute for the current loop iteration.
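
The way outputs are keyed by loop value can be sketched in plain Python. This is a simplified model, not Kestra's engine: the each_sequential helper, the task functions and the placeholder:// URIs are all hypothetical, and the value string is assumed to be read as JSON.

```python
import json

def each_sequential(values, child_tasks):
    """Run every child task once per value, in order, collecting outputs."""
    outputs = {}
    for value in values:
        taskrun = {"value": value}
        for task_id, task in child_tasks:
            # Outputs are stored per task id, then per loop value.
            outputs.setdefault(task_id, {})[value] = task(taskrun, outputs)
    return outputs

def query_top_ten(taskrun, outputs):
    # Placeholder URI; a real task would store its result in internal storage.
    return {"uri": f"placeholder://query-top-ten/{taskrun['value']}.ion"}

def write_csv(taskrun, outputs):
    # Mirrors outputs['query-top-ten'][taskrun.value].uri
    source = outputs["query-top-ten"][taskrun["value"]]["uri"]
    return {"uri": source.replace(".ion", ".csv")}

values = json.loads('["fr", "en", "de"]')
outputs = each_sequential(
    values, [("query-top-ten", query_top_ten), ("write-csv", write_csv)]
)
print(outputs["write-csv"]["de"]["uri"])
```

Because each iteration stores its own entry, write-csv has to index the query-top-ten outputs with the current value to find the file produced in the same iteration.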

After executing the flow, you can go to the Outputs tab to access the task outputs and, among other things, download the CSV files generated by the flow.

Trigger

By default, a flow can only be run manually, via the graphical interface or the Kestra API.
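
For the API route, a minimal Python sketch might look like the following. The endpoint path is an assumption based on older Kestra releases and may differ in your version, so check the API reference before relying on it; build_trigger_request is a hypothetical helper, and the request is only built here, not sent.

```python
import urllib.request

def build_trigger_request(base_url: str, namespace: str, flow_id: str) -> urllib.request.Request:
    # ASSUMPTION: endpoint path taken from older Kestra releases; verify for your version.
    url = f"{base_url}/api/v1/executions/trigger/{namespace}/{flow_id}"
    return urllib.request.Request(url, method="POST")

request = build_trigger_request(
    "http://localhost:8080", "fr.loicmathieu.example", "hello-world"
)
print(request.get_method(), request.full_url)

# To actually send it against a running local instance:
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```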

It is also possible to trigger a flow from an external event; this is the role of the trigger.

Kestra includes three basic triggers: flow, which triggers a flow from another flow; webhook, which triggers a flow from a webhook URL; and schedule, which triggers a flow periodically from a cron expression. Many other triggers are available in Kestra’s plugins; they can trigger a flow from a message in a broker, a file, or the presence of a record in a database table, for example.

The following example triggers a flow every minute; it uses a cron expression to define how often it is triggered.

id: flow-with-trigger
namespace: fr.loicmathieu.example

triggers:
  - id: schedule
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "*/1 * * * *"


tasks:
  - id: "echo"
    type: "io.kestra.core.tasks.debugs.Echo"
    format: "{{task.id}} > {{taskrun.startDate}}"
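
To see why */1 in the first field means every minute, here is a small Python sketch of how the minute field of a cron expression can be interpreted. It handles only *, */n and a single number, which is far less than a real cron parser; minute_matches is a hypothetical helper.

```python
def minute_matches(field: str, minute: int) -> bool:
    """Tell whether a cron minute field (*, */n or a number) matches a minute."""
    if field == "*":
        return True
    if field.startswith("*/"):
        # */n matches every n-th minute, starting from minute 0.
        return minute % int(field[2:]) == 0
    return int(field) == minute

# */1 matches all 60 minutes of the hour, exactly like *.
print(all(minute_matches("*/1", m) for m in range(60)))
# */15 matches only minutes 0, 15, 30 and 45.
print([m for m in range(60) if minute_matches("*/15", m)])
```

Changing the first field to */15 would therefore run the flow four times an hour instead of sixty.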

Data processing with Python and Pandas

Kestra offers advanced functionality via tasks that allow you to execute Bash, Python or NodeJS scripts.

The following example queries the same BigQuery dataset of Wikipedia page views, writes the result to CSV format, and then uses that CSV in an io.kestra.core.tasks.scripts.Python task, which runs a Python script.

This task takes as properties:

  • inputFiles: a list of files, which must contain the main.py file that the task will run. A second file, data.csv, gives local access to the file created by the write-csv task: Kestra automatically retrieves it from its internal storage and makes it available in the working directory of the Python task.
  • requirements: a list of pip dependencies; here we add the Pandas library, used to analyze the data in the CSV file.

id: wikipedia-top-ten-pyhton-panda
namespace: fr.loicmathieu.example
description: A flow that loads wikipedia top 10 FR pages each hour

tasks:
  - id: query-top-ten
    type: io.kestra.plugin.gcp.bigquery.Query
    serviceAccount: "<service account key>"
    sql: |
      SELECT DATETIME(datehour) as date, title, views 
      FROM `bigquery-public-data.wikipedia.pageviews_2023` 
      WHERE DATE(datehour) = current_date() and wiki = 'fr' and title not in ('Cookie_(informatique)', 'Wikipédia:Accueil_principal', 'Spécial:Recherche')
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true
  - id: write-csv
    type: io.kestra.plugin.serdes.csv.CsvWriter
    from: "{{outputs['query-top-ten'].uri}}"
  - id: "python"
    type: io.kestra.core.tasks.scripts.Python
    inputFiles:
      data.csv: "{{outputs['write-csv'].uri}}"
      main.py: |
        import pandas as pd
        from kestra import Kestra
        data = pd.read_csv("data.csv")
        data.info()
        sumOfViews = data['views'].sum()
        Kestra.outputs({'sumOfViews': int(sumOfViews)})
    requirements:
      - pandas

The Python script will use Pandas to read the CSV file and transform it into a Pandas data frame, then perform the sum of the views column. This sum will then be put into the task output using the Python Kestra library.
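
To check what the script computes without running Kestra or installing Pandas, the same sum can be reproduced with Python's standard csv module. This is a stand-in for the Pandas version, not a replacement for it, and the sample rows below are made up; only the column names come from the query above.

```python
import csv
import io

# Made-up sample with the same columns as the query result (date, title, views).
SAMPLE_CSV = """date,title,views
2023-01-01T10:00:00,Page_A,120
2023-01-01T10:00:00,Page_B,80
"""

def sum_of_views(csv_text: str) -> int:
    """Sum the views column, like data['views'].sum() in the Pandas script."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return sum(int(row["views"]) for row in reader)

print(sum_of_views(SAMPLE_CSV))
```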

Here are the execution logs of this flow.

Conclusion

In this introductory article, we have seen the main concepts of Kestra and some flow examples.

To go further, you can check out the online documentation as well as the plugin list.

Kestra is an open-source community project available on GitHub.
