Introduction à Kestra

Introduction à Kestra

Kestra est un orchestrateur et scheduler de donnée open source. Avec Kestra, les workflows de données, appelés flows, utilisent le format YAML et sont exécutés par son moteur via un appel API, l’interface utilisateur, ou un trigger (webhook, schedule, SQL query, Pub/Sub message, …).

Les notions importantes de Kestra sont :

  • Le flow : qui décrit la manière dont la donnée sera orchestrée (le workflow donc). C’est une suite de tasks.
  • La task : une étape du flow qui va permettre de réaliser une action sur une ou plusieurs données entrantes, les inputs, puis de générer zéro ou une donnée en sortie, l’output.
  • L’execution : qui est une instance de flow suite à un déclenchement.
  • Le trigger : qui définie la manière dont un flow peut être déclenché.

Nous allons voir ces notions en pratique dans la suite de l’article.

Pour lancer Kestra en local, le plus simple est d’utiliser le fichier Docker compose fournit, puis de le lancer via docker compose up.

Ce Docker compose va lancer Kestra en mode standalone (tous les composants en un seul processus) avec une base de données PostgreSQL comme queue et repository (pour le stockage et l’exécution des flows) et un storage local (pour le stockage des données des flows).

Kestra est composé de plusieurs composants qui communiquent ensemble via des queues et stockent les informations des flows via des repositories, ces queues et repositories peuvent être implémentées de différentes manières (memory, JDBC, Kafka, Elasticsearch). Je ne vais pas entrer en détail dans l’architecture de Kestra aujourd’hui, mais pour les plus curieux vous pouvez vous référer à la documentation d’architecture de Kestra.

Une fois Kestra démarré, une interface graphique sera disponible sur le port 8080 : http://localhost:8080, c’est via cette interface que nous allons faire tous les exemples de cet article.

Au premier lancement, un Guided Tour vous sera proposé, vous pouvez le suivre ou le passer pour pouvoir suivre les exemples de cet article.

Mon premier flow

Pour créer un flow, allez dans le menu Flows puis cliquez sur le bouton Create en bas à droite. Vous voici maintenant avec un textarea dans lequel vous allez pouvoir entrer le descriptif YAML du flow.

id: hello-world
namespace: fr.loicmathieu.example

tasks:
  - id: hello
    type: io.kestra.core.tasks.debugs.Echo
    format: "Hello World"

Un flow a :

  • Une propriété id qui est son identifiant unique au sein d’un namespace.
  • Une propriété namespace, un namespace est un espace de nommage pour les flux, les namespaces sont hierarchique comme les répertoires d’un système de fichier.
  • Une propriété tasks qui est la liste des tâches à réaliser à l’exécution du flow.

Ici, nous avons ajouté une seule tâche qui a trois propriétés :

  • Une propriété id qui est son identifiant unique au sein du flow.
  • Une propriété type qui est le nom de la classe qui représente la tâche.
  • Une propriété format, c’est une propriété propre aux tâches de type Echo qui définit le format du message à logguer (une tâche de type Echo est comme la commande shell echo).

Chaque tâche aura ses propres propriétés, elles sont documentées dans la documentation en ligne ainsi que dans la documentation intégrée à l’interface graphique de Kestra (Documentation -> Plugins -> puis sélectionnez un plugin dans le menu de droite).

Au sein de l’éditeur, la description YAML du flow est validé et de l’autocomplétion est disponible pour le type et les propriétés des tâches via CTRL+SPACE.

Kestra se base sur un système de plugins qui permet d’ajouter des tâches (mais aussi des triggers ou des conditions qu’on verra plus tard). Par défaut, il y a très peu de plugins fournit dans Kestra mais l’image Docker utilisée dans le Docker compose est construite en ajoutant tous les plugins maintenus par l’équipe de développement de Kestra, leur documentation est ici : https://kestra.io/plugins/.

Pour lancer ce premier flow, allez dans l’onglet Executions puis cliquez sur le bouton New Execution.
Vous basculerez alors sur la vue Gantt de l’exécution du flow qui est mise à jour en temps réelle avec le statut de l’exécution.

L’onglet Logs permet de voir les logs de l’exécution.

On peut remarquer le log Hello World généré par la tâche hello.

Ajoutons maintenant un input nommé name à notre flow pour lui donne le nom de la personne à qui dire bonjour.
Pour cela, il faut éditer le flow en sélectionnant le flow, soit via la navigation breadcrumb en haut ( Home / Flows / fr.loicmathieu.example.hello-world), soit en cliquant à gauche sur Flows et en le sélectionnant dans la liste ; puis aller dans l’onglet Source.

On peut passer des données à un flow via des inputs. Nous allons définir un input name de type STRING.

id: hello-world
namespace: fr.loicmathieu.example

inputs:
  - type: STRING
    name: name

tasks:
  - id: hello
    type: io.kestra.core.tasks.debugs.Echo
    format: "Hello {{inputs.name}}"

Quand on clique sur le boutton New Execution pour exécuter un flow, un formulaire nous demande de renseigner les inputs du flow.

Après avoir rentré une valeur pour le nom et exécuté le flow, on peut constater que le log contient bien la valeur de l’input.

Hello {{inputs.name}} est une expression qui utilise le moteur de templating Pebble qui va faire un rendu de ce qui est entre les moustaches {{ et }}. Ici l’expression Pebble va chercher la valeur de la variable inputs.name qui pointe vers l’input de nom name.

Inputs et BigQuery

Dans cet exemple, le flow va prendre en entrée (en input) un fichier CSV, le charger dans Google BigQuery, puis exécuter une requête sur les données chargées et afficher dans les logs le résultat.

J’utilise ici BigQuery car c’est plus simple que de démarrer une base de données, mais cela nécessite un compte Google Cloud et un service account de configuré. Pour faire cela, il faut créer un service account Google Cloud et le renseigner dans la propriété serviceAccount de la tâche BigQuery. Pour éviter de coder en dure cette variable, il est possible d’utiliser une variable d’environement ou une variable globale au cluster, voir la documentation en ligne sur les variables.

Sans plus tarder, la solution ; )

id: beer
namespace: fr.loicmathieu.example
description: A flow to handle my beers data

inputs:
  - type: FILE
    description: Beers data
    name: beers

tasks:
  - id: start-log
    type: io.kestra.core.tasks.debugs.Echo
    format: "{{taskrun.startDate}} - {{task.id}} - Launching the flow"
  - id: load-beers
    type: io.kestra.plugin.gcp.bigquery.Load
    serviceAccount: "<service account key>"
    csvOptions:
      fieldDelimiter: ","
    destinationTable: beers.beers
    format: CSV
    from: "{{inputs.beers}}"
  - id: query-beers
    type: io.kestra.plugin.gcp.bigquery.Query
    serviceAccount: "<service account key>"
    fetchOne: true
    sql: |
      SELECT count(*) as count FROM beers.beers
  - id: end-log
    type: io.kestra.core.tasks.debugs.Echo
    format: "{{taskrun.startDate}} - {{outputs['query-beers'].row.count}} beers loaded"

J’ai défini ici un input de type FILE nommé beers. Kestra va détecter le type de l’input et générer le formulaire d’exécution du flow correspondant. Cela permettra d’uploader un fichier CSV contenant une liste de bière. Ce fichier sera stocké dans le stockage interne de Kestra et disponible dans toutes les tâches.

La tâche load-beers de type io.kestra.plugin.gcp.bigquery.Load permet de charger un fichier CSV dans une table BigQuery. Le dataset BigQuery beers doit avoir été créé avant l’exécution de cette tâche. La propriété from prend un fichier du stockage interne (en réalité l’URI du fichier, celui-ci sera récupéré au dernier moment depuis le stockage interne). J’utilise ici l’expression Pebble {{inputs.beers}} pour récupérer le fichier passé en input au flow.

La tâche query-beers de type io.kestra.plugin.gcp.bigquery.Query permet d’effectuer une requête BigQuery. Il y a plusieurs manières de stocker le résultat de la requête. Ici, j’utilise la propriété fetchOne: true qui configure la tâche pour récupérer une seule ligne et mettre le résultat de la requête en output de la tâche. Il est aussi possible de charger toutes les lignes (fetch: true), ou de stocker les résultats dans le stockage interne de Kestra (store: true) ce qui est conseillé pour les requêtes qui ramènent beaucoup de lignes.

La tâche end-log va écrire un log, nous l’avons déjà vu précédemment. Ici, nous voulons écrire dans les logs le nombre d’enregistrements chargés depuis la base de données, nous allons donc récupérer l’output correspondant de la tâche query-beers via une expression Pebble : {{outputs['query-beers'].row.count}}.

L’expression {{outputs['query-beers'].row.count}} peut sembler intrigante :

  • outputs['query-beers'] : signifie l’output de la tâche query-beers, nous avons jusqu’à présent vu la notation pointée (.) pour accéder aux outputs, mais quand celui-ci contenant un caractère ‘-‘, on est obligé d’utiliser la notation subscript ([]) car ‘-‘ est un caractère spécial pour Pebble.
  • row : est le nom de l’attribut mis en output dans la tâche query-beers, une tâche peut avoir plusieurs attributs en outputs, se référer à la documentation de la tâche pour leur liste.
  • count : est le nom de la colonne.

ForEach et format de fichier

Dans cet exemple, nous allons requêter les 10 pages Wikipedia les plus vues grâce au dataset public BigQuery wikipedia.pageviews_2023 pour les langues français, anglais et allemand. Puis nous allons transformer le résultat en CSV.

id: wikipedia-top-ten
namespace: fr.loicmathieu.example
description: A flow that loads wikipedia top 10 FR pages each hour

tasks:
  - id: start-log
    type: io.kestra.core.tasks.debugs.Echo
    format: "{{taskrun.startDate}} - {{task.id}} - Launching the flow"
  - id: for-each-countries
    type: io.kestra.core.tasks.flows.EachSequential
    tasks:
      - id: query-top-ten
        type: io.kestra.plugin.gcp.bigquery.Query
        serviceAccount: "<service account key>"
        sql: |
          SELECT DATETIME(datehour) as date, title, views 
          FROM `bigquery-public-data.wikipedia.pageviews_2023` 
          WHERE DATE(datehour) = current_date() and wiki = '{{taskrun.value}}' and title not in ('Cookie_(informatique)', 'Wikipédia:Accueil_principal', 'Spécial:Recherche')
          ORDER BY datehour desc, views desc
          LIMIT 10
        store: true
      - id: write-csv
        type: io.kestra.plugin.serdes.csv.CsvWriter
        from: "{{outputs['query-top-ten'][taskrun.value].uri}}"
    value: '["fr", "en", "de"]'

La tâche for-each-countries de type io.kestra.core.tasks.flows.EachSequential permet de faire une boucle. Elle va réaliser plusieurs fois la liste des tâches enfant pour les valeurs passées dans sa propriété value ; ici les langues fr, en et de.

La tâche query-top-ten de type io.kestra.plugin.gcp.bigquery.Query va exécuter une requête sur BigQuery qui sera stockée dans le stockage interne de Kestra (store: true). Elle utilise l’expression Pebble {{taskrun.value}} qui permet de récupérer la valeur en cours de la boucle EachSequential.

La tâche write-csv de type io.kestra.plugin.serdes.csv.CsvWriter va réécrire le fichier stocké par la tâche précédente au format CSV. Par défaut, Kestra utilise le format de stockage objet Amazon Ion, cette tâche permet donc de passer du format Ion au format CSV. Elle utilise l’expression Pebble {{outputs['query-top-ten'][taskrun.value].uri}} dont l’attribut [taskrun.value].uri permet de récupérer la valeur de l’attribut uri pour l’itération de boucle actuelle.

Après exécution du flow, vous pouvez aller dans l’onglet Outputs pour accéder aux outputs des tâches et, entre autres, télécharger les fichiers CSV générés par le flow.

Trigger

Par défaut, un flow ne peut être lancé que manuellement via l’interface graphique ou l’API de Kestra.

Il est possible de déclancher un flow depuis un évènement extérieur, c’est le rôle du trigger.

Kestra inclu de base trois triggers: flow, qui permet de déclancher un flow depuis un autre flow, webhook qui permet de déclancher un flow depuis une URL webhook et schedule qui permet de déclencher un flow périodiquement depuis une expression cron. De nombreux autres triggers sont disponibles au sein des plugins de Kestra et permettent de déclancher un flow depuis un message dans un broker, un fichier, ou la présence d’un enregistrement dans une table de base de données par exemple.

L’exemple suivant va déclencher un flow toutes les minutes, il utilise une cron expression pour définir sa périodicité de déclenchement.

id: flow-with-trigger
namespace: fr.loicmathieu.example

triggers:
  - id: schedule
    type: io.kestra.core.models.triggers.types.Schedule
    cron: "*/1 * * * *"


tasks:
  - id: "echo"
    type: "io.kestra.core.tasks.debugs.Echo"
    format: "{{task.id}} > {{taskrun.startDate}}"

Traitement de donnée avec Python et Pandas

Kestra offre des fonctionnalités avancées via des tâches permettant d’exécuter des scripts Bash, Python ou NodeJS.

L’exemple suivant va requêter le même dataset BigQuery des pages vues Wikipedia, l’écrire au format CSV, puis utiliser ce CSV dans une tâche de type io.kestra.core.tasks.scripts.Python qui permet de lancer un script Python.

Cette tâche prend en propriétés :

  • inputFiles : une liste de fichier qui doit contenir le fichier main.py qui sera appelé par la tâche. Un deuxième ficher data.csv est défini qui va permettre d’accéder localement au fichier créé par la tâche write-csv. Kestra va automatiquement le récupérer depuis son internal storage et le mettre à disposition dans le répertoire de travail de la tâche Python.
  • requirements : une liste de dépendances pip, ici, nous avons mis la librairie Pandas qui permet d’analyser les données du fichier CSV.
id: wikipedia-top-ten-pyhton-panda
namespace: fr.loicmathieu.example
description: A flow that loads wikipedia top 10 FR pages each hour

tasks:
  - id: query-top-ten
    type: io.kestra.plugin.gcp.bigquery.Query
    serviceAccount: "<service account key>"
    sql: |
      SELECT DATETIME(datehour) as date, title, views 
      FROM `bigquery-public-data.wikipedia.pageviews_2023` 
      WHERE DATE(datehour) = current_date() and wiki = 'fr' and title not in ('Cookie_(informatique)', 'Wikipédia:Accueil_principal', 'Spécial:Recherche')
      ORDER BY datehour desc, views desc
      LIMIT 10
    store: true
  - id: write-csv
    type: io.kestra.plugin.serdes.csv.CsvWriter
    from: "{{outputs['query-top-ten'].uri}}"
  - id: "python"
    type: io.kestra.core.tasks.scripts.Python
    inputFiles:
      data.csv: "{{outputs['write-csv'].uri}}"
      main.py: |
        import pandas as pd
        from kestra import Kestra
        data = pd.read_csv("data.csv")
        data.info()
        sumOfViews = data['views'].sum()
        Kestra.outputs({'sumOfViews': int(sumOfViews)})
    requirements:
      - pandas

Le script Python va utiliser Pandas pour lire le fichier CSV et le transformer en data frame Pandas, puis réaliser la somme de la colonne views. Cette somme sera ensuite mise en output de la tâche grâce à la librairie Python Kestra.

Voici les logs d’exécution de ce flow.

Conclusion

Dans cet article introductif, nous avons vu les principaux concepts de Kestra et quelques exemples de flow.

Pour aller plus loin, vous pouvez consulter la documentation en ligne ainsi que la liste des plugins.

Ketra est un projet communautaire open source disponible sur GitHub, n’hésitez pas à :

Laisser un commentaire

Ce site utilise Akismet pour réduire les indésirables. En savoir plus sur comment les données de vos commentaires sont utilisées.