J’ai enfin pris le temps de tester Apache Pinot

J’ai enfin pris le temps de tester Apache Pinot

Cela faisait très longtemps que j’avais envie de tester Apache Pinot et j’ai enfin pris le temps de le faire !

Tout d’abord, une rapide description de Pinot

Pinot est un datastore OLAP (OnLine Analytical Processing) distribuée et temps réel, spécialement conçu pour fournir des analyses à très faible latence, même à un débit extrêmement élevé. Il peut ingérer des données directement à partir de sources de données en continu (stream) ou de sources de données par lots (batch).
Au cœur du système se trouve un stockage en colonne, avec plusieurs techniques d’indexation et de pré-agrégation intelligentes pour une faible latence.
Pinot a été construit par des ingénieurs de LinkedIn et Uber et est conçu pour scaler up ou out sans limites. Les performances restent toujours constantes en fonction de la taille de votre cluster et d’un seuil de requêtes par seconde (RPS) attendu.

Un cluster Pinot est constitué des éléments suivants :

  • Pinot Controller : constitué de Apache Helix (cluster management) et Apache Zookeeper (coordination), c’est le composant central de Pinot qui va prendre en charge l’orchestration du cluster, la réplication, et la gestion de l’état des différents composants du cluster.
  • Pinot Broker : reçoit les requêtes des clients et achemine leurs exécutions vers un ou plusieurs serveurs Pinot avant de renvoyer une réponse consolidée.
  • Pinot Server : stocke les segments (une partie d’une table) et exécute les requêtes. Un serveur peut être soit real-time (en cas de données streamée) soit offline (en cas de données envoyées par batch et immuable).
  • Pinot Minion : composant optionnel permettant d’exécuter des tâches en tâche de fond au sein du cluster, par exemple pour de la purge de données.

Premier lancement

Commençons donc pas le commencement : lancer Pinot localement ! Apache Pinot étant un système distribué à plusieurs composants (Zookeeper, Pinot Controller, Pinot Broker, Pinot Server), je décide donc de passer par une image Docker tout en un pour le tester localement, ça me semble le plus simple.

En me basant sur le guide Getting Started, je lance Pinot avec cette commande Docker qui permet de le démarrer avec un jeu de données de statistique de Baseball pré-importé. Le conteneur, après avoir instancié les composants et importer les données, va ensuite exécuter un ensemble de requêtes et afficher leurs résultats dans les logs.

docker run \
    -p 9000:9000 \
    apachepinot/pinot:0.9.3 QuickStart \
    -type batch

Après démarrage de Pinot et import du jeu de données, je peux utiliser la console Pinot (disponible sur le port 9000 par défaut) pour accéder au cluster.

Celle-ci permet de visualiser l’état du cluster via l’onglet Cluster Management.

Et de lancer des requêtes via l’onglet Query Console. J’y fais un petit count(*) de la table baseballStats créé, la requête s’effectue quasiment immédiatement (quelques millisecondes) mais en même temps il n’y a que 97889 lignes donc c’est normal.

Analyse des émissions de carbone européenne

Bon, tout ça c’est joli, mais pour dépasser le Hello World et vraiment tester Pinot, il va me falloir un jeu de données un peu plus volumineux avec lequel faire joujou.

La commission européenne mets à disposition un ensemble de donnée en Open Data, c’est le moment d’en profiter.

Je jette mon dévolu sur un jeu de données sur les émissions de gaz à effets de serre dans la zone euro (vous pouvez le trouver sur cette page) : Air emissions accounts by NACE Rev. 2 activity de 4 millions de points de données.

Voici la description du jeu de données : This data set reports the emissions of greenhouse gases and air pollutants broken down by 64 industries (classified by NACE Rev. 2) plus households. Concepts and principles are the same as in national accounts. Complete data starts from reference year 2008.

Il va maintenant falloir charger les données, chaque colonne est séparée par une tabulation, l’absence de donnée est définie par :, la première colonne contenant la catégorie NACE il serait judicieux de la séparer en quatre colonnes : les intitulés sont airpol,nace_r2,unit,geo\time.

Pour envoyer cette donnée à Pinot, je me suis basé sur le guide Pushing your data to Pinot.

Pour commencer, il faut définir un schéma à la donnée, voici celui que j’ai utilisé :

{
  "schemaName": "greenhouseGazEmission",
  "dimensionFieldSpecs": [
    {
      "name": "airpol",
      "dataType": "STRING"
    },
    {
      "name": "nace_r2",
      "dataType": "STRING"
    },
    {
      "name": "unit",
      "dataType": "STRING"
    },
    {
      "name": "geo",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "2020",
      "dataType": "FLOAT"
    },
    {
      "name": "2019",
      "dataType": "FLOAT"
    },
    [...] // repeate the same for all fields down to 1995
  ]
}

Dans ce schéma, on définit deux types de champs, les champs de type dimensions : ce sont des chaînes de caractères sur lesquels nous allons pouvoir filtrer ou grouper les données, et les champs de type métrique qui sont ici tous des floats (un champ par année de donnée disponible) sur lesquels nous allons pouvoir faire des calculs (agrégations).

Il faut ensuite définir la table où stocker les données, voici la définition de la table, elle est pour l’instant très simple :

{
  "tableName": "greenhouseGazEmission",
  "segmentsConfig" : {
    "replication" : "1",
    "schemaName" : "greenhouseGazEmission"
  },
  "tableIndexConfig" : {
    "invertedIndexColumns" : [],
    "loadMode"  : "MMAP"
  },
  "tenants" : {
    "broker":"DefaultTenant",
    "server":"DefaultTenant"
  },
  "tableType":"OFFLINE",
  "metadata": {}
}

Pour pouvoir créer cette table, j’ai redémarré un cluster Pinot depuis le Docker Compose proposé dans la documentation, puis utilisé la commande Docker suivante qui va lancer une commande de création de la table depuis les définitions de schéma et de table précédemment créés :

docker run --rm -ti \
    --network=pinot_default \
    -v ~/dev/pinot/data:/tmp/pinot-quick-start \
    --name pinot-batch-table-creation \
    apachepinot/pinot:0.9.3 AddTable \
    -schemaFile /tmp/pinot-quick-start/greenhousGazEmission-schema.json \
    -tableConfigFile /tmp/pinot-quick-start/greenhousGazEmission-table.json \
    -controllerHost manual-pinot-controller \
    -controllerPort 9000 -exec

Nous pouvons ensuite aller vérifier via la console Pinot (http://localhost:9000) que la table a bien été créée.

Maintenant, place à l’insertion des données !

Le fichier est de type TSV et contient : quand il n’y a pas de données, pour le préparer à l’ingestion dans Pinot nous allons exécuter une commande sed pour le transformer en CSV (donc remplacer les tabulations par des virgules) et supprimer certains caractères incorrects, cette commande sed va aussi modifier la ligne des noms des champs qu’il faudra ensuite remettre à ceux attendu par le schéma.

sed 's/\t/,/g;s/://g;s/[pseb]//g;s/[[:blank:]]//g' env_ac_ainah_r2.tsv > data/env_ac_ainah_r2.csv

L’ingestion dans Pinot se fait via un job d’insertion défini au format yaml.

executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/tmp/pinot-quick-start'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/pinot-quick-start/segments/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'greenhouseGazEmission'
  schemaURI: 'http://pinot-controller:9000/tables/greenhouseGazEmission/schema'
  tableConfigURI: 'http://pinot-controller:9000/tables/greenhouseGazEmission'
pinotClusterSpecs:
  - controllerURI: 'http://pinot-controller:9000'

Ce job définit entre autre :

  • jobType: SegmentCreationAndTarPush : le job va créer un segment de table. Un segment est une partition des données de la table. Si votre jeu de données est important il faudra découper le fichier CSV pour pouvoir lancer plusieurs jobs et obtenir plusieurs segments.
  • inputDirURI et includeFileNamePattern pour définir l’endroit où chercher le ou les CSV des données à charger.
  • recordReaderSpec qui définit le format des données CSV.
  • tableSpec qui définit la spécification de la table cible, celle que nous avons définie précédemment.

Pour lancer le job, vous pouvez utiliser la commande Docker suivante :

docker run --rm -ti \
    --network=pinot_default \
    -v ~/dev/pinot/data:/tmp/pinot-quick-start \
    --name pinot-data-ingestion-job \
    apachepinot/pinot:0.9.3 LaunchDataIngestionJob \
    -jobSpecFile /tmp/pinot-quick-start/job-ingest.yml

Après ingestion, nous avons maintenant 266456 lignes dans notre table que nous pouvons ensuite requêter depuis la console Pinot.

Par exemple via la requête suivante : select geo, sum(2019), sum(2020), sum(2021) from greenhouseGazEmission group by geo

Pinot utilise Apache Calcite pour le requêtage, nous pouvons donc utiliser du SQL ANSI ce qui simplifie grandement le requêtage. La requête ci-dessus va donner les résultats suivants :

Le jeu de donnée étant assez petit, les requêtes s’exécutent en quelques millisecondes malgré qu’aucun indexe n’ait été créé.

Essayons maintenant la requête suivante : select sum(2019), sum(2020), sum(2021) from greenhouseGazEmission where unit = 'G_HAB', elle s’exécute en 24ms, et on peut voir les informations de requêtage suivant :

  • numDocsScanned: 66614
  • totalDocs: 266456
  • numEntriesScannedInFilter: 266456

Pinot a donc scanné 66614 documents sur 266456, les documents qui correspondent à l’unit G_HAB. Lors de la phase de filter, il a scanné 266456 entrées, il a donc fait un full scan sur la table.

Optimisation du requêtage grâce aux indexes

Pinot permet l’ajout d’indexe pour optimiser le requêtage, dans la requête précédemment utilisée, la colonne unit était utilisé pour filtrer les données.

Je vais donc modifier la structure de table pour ajouter un index inversé sur la colonne unit.

Pour faire cela nous pouvons modifier la description de la table pour ajouter cet indexe :

{
  "tableName": "greenhouseGazEmission",
  "tableIndexConfig" : {
    "invertedIndexColumns" : ["unit"],
    "loadMode"  : "MMAP"
  },
  [...]
}

Pour plus de simplicité, j’ai utilisé l’interface graphique de Pinot pour ajouter l’index (en éditant la table) puis recharger les segments. Recharger les segments après une modification de table est nécessaire pour que l’index soit créé et mis à jour.

Après relance de la requête, on voit que le numEntriesScannedInFilter passe à 0, Pinot utilisant l’indexe précédemment créé.

Une des forces de Pinot est qu’il supporte beaucoup de type d’indexes différent, ce qui permet d’implémenter des cas d’utilisation différents, et d’optimiser pour le requêtage ou le stockage, chaque indexe utilisant de l’espace disque.

Pour aller plus loin sur les capacités d’indexation de Pinot vous pouvez lire mon article : Apache Pinot et de ses différents types d’indexes.

2 réflexions sur « J’ai enfin pris le temps de tester Apache Pinot »

Laisser un commentaire

Ce site utilise Akismet pour réduire les indésirables. En savoir plus sur comment les données de vos commentaires sont utilisées.