Apache Pinot and its various types of indexes

Some time ago, I finally took the time to test Apache Pinot; you can find the story of my first experiments here.

Apache Pinot is a distributed real-time OnLine Analytical Processing (OLAP) datastore specifically designed to provide ultra-low latency analytics, even at extremely high throughput. If you don’t know it, start by reading my introductory article before this one.

One of Pinot's strengths is its wide range of index types, and it is these indexes that we will explore in this article.

The Chicago Crimes dataset

We will use the Chicago Crimes dataset from Google BigQuery, which we will export to CSV. To retrieve this dataset, go to the BigQuery interface and search for the public project bigquery-public-data, then its chicago_crime dataset; navigating to this URL should do the same: https://console.cloud.google.com/bigquery?p=bigquery-public-data&d=chicago_crime.

You then need to copy this table to a dataset in your own GCP project, then export it to CSV.
This will give you 6 CSVs of about 250MB each, i.e. 1.5GB of data to analyze.
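
If you prefer to script the export, here is a minimal sketch using the bq and gsutil CLIs; the project, dataset, and bucket names (my_project, my_dataset, my-bucket) are placeholders to adapt:

# Copy the public table into a dataset of your own GCP project
# (my_project, my_dataset and my-bucket are hypothetical names)
bq cp bigquery-public-data:chicago_crime.crime my_project:my_dataset.chicago_crime

# A table of this size must be exported to Cloud Storage as sharded CSVs
bq extract --destination_format CSV \
    my_project:my_dataset.chicago_crime \
    'gs://my-bucket/chicago-crime/crime-*.csv'

# Download the shards into the data directory used later in this article
gsutil cp 'gs://my-bucket/chicago-crime/crime-*.csv' ~/dev/pinot/crime/data/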

This dataset contains crime data for the city of Chicago over several years; its description can be found here: https://console.cloud.google.com/marketplace/details/city-of-chicago-public-data/chicago-crime?filter=solution-type:dataset.

Once the CSVs have been retrieved, you will need to define a schema and a table, then create a Job to import the data.

Here is the schema we are going to use:

{
  "schemaName": "chicagoCrimes",
  "dimensionFieldSpecs": [
    {
      "name": "unique_key", "dataType": "LONG"
    },
    {
      "name": "case_number", "dataType": "STRING"
    },
    {
      "name": "block", "dataType": "STRING"
    },
    {
      "name": "iucr", "dataType": "STRING"
    },
    {
      "name": "primary_type", "dataType": "STRING"
    },
    {
      "name": "description", "dataType": "STRING"
    },
    {
      "name": "location_description", "dataType": "STRING"
    },
    {
      "name":"district", "dataType": "INT"
    },
    {
      "name": "ward", "dataType": "INT"
    },
    {
      "name": "community_area", "dataType": "INT"
    },
    {
      "name": "fbi_code", "dataType": "STRING"
    },
    {
      "name": "year", "dataType": "INT"
    },
    {
      "name": "location", "dataType": "STRING"
    },
    {
      "name": "arrest", "dataType": "BOOLEAN"
    },
    {
      "name": "domestic", "dataType": "BOOLEAN"
    },
    {
      "name": "beat", "dataType": "BOOLEAN"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "x_coordinate", "dataType": "FLOAT"
    },
    {
      "name": "y_coordinate", "dataType": "FLOAT"
    },
    {
      "name": "latitude", "dataType":"FLOAT"
    },
    {
      "name": "longitude", "dataType": "FLOAT"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "date",
      "dataType": "STRING",
      "format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss z",
      "granularity": "1:SECONDS"
    },
    {
      "name": "updated_on",
      "dataType": "STRING",
      "format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss z",
      "granularity": "1:SECONDS"
    }
  ]
}

And here is the associated table definition; it does not contain any index for now:

{
  "tableName": "chicagoCrimes",
  "segmentsConfig" : {
    "replication" : "1",
    "schemaName" : "chicagoCrimes"
  },
  "tableIndexConfig" : {
    "invertedIndexColumns" : [],
    "loadMode"  : "MMAP"
  },
  "tenants" : {
    "broker":"DefaultTenant",
    "server":"DefaultTenant"
  },
  "tableType":"OFFLINE",
  "metadata": {}
}

To ingest the data, you can use the following job spec:

executionFrameworkSpec:
  name: 'standalone'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner'
jobType: SegmentCreationAndTarPush
inputDirURI: '/tmp/pinot-quick-start'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: '/tmp/pinot-quick-start/segments/'
overwriteOutput: true
pinotFSSpecs:
  - scheme: file
    className: org.apache.pinot.spi.filesystem.LocalPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'chicagoCrimes'
  schemaURI: 'http://pinot-controller:9000/tables/chicagoCrimes/schema'
  tableConfigURI: 'http://pinot-controller:9000/tables/chicagoCrimes'
pinotClusterSpecs:
  - controllerURI: 'http://pinot-controller:9000'

We will now start a Pinot cluster; the easiest way is to use the Docker Compose file from the Pinot documentation.

To start the cluster via Docker Compose, run the command docker compose up. After a few minutes, you will have a running Pinot cluster whose interface is accessible at http://localhost:9000.
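
Before going further, you can check that the controller is up through its health endpoint (a quick sanity check; port 9000 is the mapping used by the documentation's Docker Compose file):

curl http://localhost:9000/health
# expected answer: OK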

The Pinot image allows you to launch table creation or data ingestion jobs. The following commands assume that the necessary resources are in a directory ~/dev/pinot/crime which contains:

  • The table schema in the file chicagoCrimes-schema.json
  • The configuration of the table in the file chicagoCrimes-table.json
  • The 6 data CSVs in a subdirectory data

To create the table you can use the following docker command:

docker run --rm -ti \
    --network=pinot_default \
    -v ~/dev/pinot/crime:/tmp/pinot-quick-start \
    --name pinot-batch-table-creation \
    apachepinot/pinot:0.10.0 AddTable \
    -schemaFile /tmp/pinot-quick-start/chicagoCrimes-schema.json \
    -tableConfigFile /tmp/pinot-quick-start/chicagoCrimes-table.json \
    -controllerHost pinot-controller \
    -controllerPort 9000 -exec
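
As an alternative to the AddTable command, the schema and table can also be created through the controller's REST API (a sketch; both endpoints appear in the controller's Swagger UI at http://localhost:9000/help):

curl -X POST -H 'Content-Type: application/json' \
    -d @chicagoCrimes-schema.json http://localhost:9000/schemas

curl -X POST -H 'Content-Type: application/json' \
    -d @chicagoCrimes-table.json http://localhost:9000/tables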

To ingest the data you can use the following docker command:

docker run --rm -ti \
    --network=pinot_default \
    -v ~/dev/pinot/crime:/tmp/pinot-quick-start \
    --name pinot-data-ingestion-job \
    apachepinot/pinot:0.10.0 LaunchDataIngestionJob \
    -jobSpecFile /tmp/pinot-quick-start/job-ingest.yml

After ingestion, we have 6452716 rows in our table.
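
You can check this count from the query console or directly against the broker's REST API; the JSON response also contains the execution statistics (timeUsedMs, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter) used in the comparisons below. A sketch, assuming the broker is exposed on port 8099 as in the documentation's Docker Compose file:

curl -s -X POST http://localhost:8099/query/sql \
    -H 'Content-Type: application/json' \
    -d '{"sql": "select count(*) from chicagoCrimes"}'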

Performance without indexes

To test the performance of Pinot without any index, we will run a few queries from the Pinot administration console:

select count(*) from chicagoCrimes

select year, count(*) from chicagoCrimes where arrest = true group by year
 
select year, count(*) from chicagoCrimes where primary_type='NARCOTICS' group by year

select year, count(*) from chicagoCrimes where x_coordinate>1180000 group by year

select year, count(*) from chicagoCrimes where ward=45 group by year

select year, sum(community_area) from chicagoCrimes group by year

Observation: the queries run in a few (tens of) milliseconds on a dataset of 1.5 GB and 6.5 million rows.

Segments take up 476MB on disk.
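
The size of the segments can be retrieved from the controller API (a quick sketch):

curl http://localhost:9000/tables/chicagoCrimes/size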

The secret of this good performance without indexes is that each field is stored in a forward index: dictionary-encoded by default for dimension columns, raw values otherwise.

Dictionary-encoded forward index with bit compression

In a dictionary-encoded forward index, an identifier is assigned to each unique value in a column, and a dictionary is built to map each identifier to its value. The forward index then stores the bit-compressed identifiers. If a column has few unique values, dictionary encoding can significantly improve storage efficiency.

Source: https://docs.pinot.apache.org/basics/indexing/forward-index.
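
For example, with three rows whose primary_type values are MURDER, MURDER and DRUGS, the encoding conceptually looks like this:

dictionary:     0 → DRUGS, 1 → MURDER
forward index:  [1, 1, 0]   (one dictionary id per row)

With only two distinct values, each id fits in a single bit, regardless of the length of the original strings.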

Pinot indexes

Inverted index

In an inverted index, a mapping is created for each distinct value of a field. This mapping stores the list of documents that contain the value (in practice, as a bitmap of document ids).

For example, for the following documents:

Document ID   primary_type
1             MURDER
2             MURDER
3             DRUGS

You will have the following inverted index:

primary_type   Document IDs
MURDER         1, 2
DRUGS          3

Bloom Filter

A Bloom filter makes it possible to skip segments that are guaranteed not to contain any record matching an EQUALITY predicate.

Range index

It works like an inverted index, but creates one index mapping per range of values instead of one per distinct value.

This saves space for columns with many distinct values.

For columns of type TIMESTAMP, a dedicated index exists and serves the same purpose: the timestamp index.

Star-tree

The Star-Tree data structure offers a configurable trade-off between space and time and lets us achieve a hard upper bound for query latencies for a given use case.

Source: https://docs.pinot.apache.org/basics/indexing/star-tree-index.

A star-tree index pre-computes and stores one or more pre-aggregations.

It uses a tree data structure: at each level of the tree, documents are split on one dimension, and sub-aggregations are pre-computed at certain nodes.

When querying, Pinot selects the nodes relevant to the query and aggregates their pre-computed sub-aggregations.

A star-tree index is a tree structure that contains the following types of nodes:

  • Root node: single root node, from which the rest of the tree can be traversed.
  • Leaf node: can contain at most T records, where T is configurable.
  • Non-leaf node: a node with more than T records is further split into child nodes.
  • Star node: contains the pre-aggregated records after removing the dimension on which the data was split at this level.
  • Dimensions split order ([D1, D2]): ordered list of dimensions used to determine the dimension to split on for a given level of the tree.
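
As an illustration, here is a simplified sketch of the tree produced by the configuration used later in this article (a split on the single dimension year, with the function SUM__community_area):

root
 ├── year = 2001 → pre-aggregated record: sum(community_area) for 2001
 ├── year = 2002 → pre-aggregated record: sum(community_area) for 2002
 ├── ...
 └── year = *    → star node: sum(community_area) over all years

A query like select year, sum(community_area) from chicagoCrimes group by year then only needs to read one pre-aggregated record per year node instead of scanning every row.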

The results

To test the different types of indexes, we will create a chicagoCrimesWithIdx table with a set of indexes and run the same queries on the table with indexes and the one without to compare the performance.

The following table definition reuses the chicagoCrimes schema but adds indexes on some fields.

{
  "tableName": "chicagoCrimesWithIdx",
  "segmentsConfig" : {
    "replication" : "1",
    "schemaName" : "chicagoCrimes"
  },
  "tableIndexConfig" : {
    "invertedIndexColumns" : ["primary_type"],
    "bloomFilterColumns": ["ward"],
    "rangeIndexColumns": ["x_coordinate"],
    "starTreeIndexConfigs": [{
      "dimensionsSplitOrder": ["year"],
      "skipStarNodeCreationForDimensions": [],
      "functionColumnPairs": ["SUM__community_area"],
      "maxLeafRecords": 1
    }],
    "loadMode"  : "MMAP"
  },
  "tenants" : {
    "broker":"DefaultTenant",
    "server":"DefaultTenant"
  },
  "tableType":"OFFLINE",
  "metadata": {}
}

You can create the table via the following Docker command:

docker run --rm -ti \
    --network=pinot_default \
    -v ~/dev/pinot/crime:/tmp/pinot-quick-start \
    --name pinot-batch-table-creation \
    apachepinot/pinot:0.10.0 AddTable \
    -schemaFile /tmp/pinot-quick-start/chicagoCrimes-schema.json \
    -tableConfigFile /tmp/pinot-quick-start/chicagoCrimes-table-with-idx.json \
    -controllerHost pinot-controller \
    -controllerPort 9000 -exec

And load the data into it via the following Docker command; the job job-ingest-with-idx.yml is the same as job-ingest.yml, except that it targets the new table:

docker run --rm -ti \
    --network=pinot_default \
    -v ~/dev/pinot/crime:/tmp/pinot-quick-start \
    --name pinot-data-ingestion-job \
    apachepinot/pinot:0.10.0 LaunchDataIngestionJob \
    -jobSpecFile /tmp/pinot-quick-start/job-ingest-with-idx.yml

Inverted index

select year, count(*) from chicagoCrimes where primary_type='NARCOTICS' group by year
Index          timeUsedMs   numDocsScanned   numEntriesScannedInFilter   numEntriesScannedPostFilter
Without index  25ms         636118           6452716                     636118
With index     11ms         636118           0                           636118

We see here the advantage of an inverted index: the filter (the WHERE clause) did not scan any entries because it used the index.
Query execution time improved significantly, from 25ms to 11ms.

Range index

select year, count(*) from chicagoCrimes where x_coordinate>1180000 group by year
Index          timeUsedMs   numDocsScanned   numEntriesScannedInFilter   numEntriesScannedPostFilter
Without index  29ms         990885           6452716                     990885
With index     11ms         990885           641072                      990885

We see here the benefit of a range index: the filter (the WHERE clause) scanned far fewer entries because it used the index. As the index only has one entry per range, it still had to scan part of the entries (about 10% here).
Query execution time improved significantly, from 29ms to 11ms.

Bloom filter

select year, count(*) from chicagoCrimesWithIdx where ward=45 group by year

A Bloom filter prunes the segments to be processed (segment pruning). Although we have 6 segments here, they were created without any partitioning logic (purely technical segments), so the Bloom filter has no effect.

To benefit from a Bloom filter on the ward field, the segments would have had to be created based on the value of this field (one segment per value range, for example).
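
Pinot can declare such a layout via data partitioning; here is a minimal sketch, to verify against the documentation, of what could be added to tableIndexConfig (the input files themselves must be partitioned on ward upstream; the Murmur function and the number of partitions are arbitrary examples):

"segmentPartitionConfig": {
  "columnPartitionMap": {
    "ward": {
      "functionName": "Murmur",
      "numPartitions": 8
    }
  }
}

Combined with a routing configuration ("segmentPrunerTypes": ["partition"]), the broker can then prune the segments that cannot contain the requested ward value.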

Star-tree index

select year, sum(community_area) from chicagoCrimes group by year
Index          timeUsedMs   numDocsScanned   numEntriesScannedInFilter   numEntriesScannedPostFilter
Without index  31ms         6452716          0                           12905432
With index     6ms          132              0                           264

With a star-tree index, documents are pre-computed with pre-aggregations.
Instead of scanning the table documents, the index documents were scanned, hence the 132 documents scanned (which presumably corresponds to one pre-aggregated document per year in each of the 6 segments).
This query only used data from the index and therefore ran very quickly.
Query execution time improved significantly, from 31ms to 6ms.

Conclusion

Without indexes, the performance of a Pinot query is already very good thanks to its optimized storage of fields in forward indexes. Adding specific indexes brings a significant performance gain, even for queries scanning a large part of the table data. Please note that the query times given in this article come from local executions on a dataset that is small for Pinot, so they cannot be extrapolated to a real dataset on a production Pinot deployment.

The star-tree index allows pre-aggregation without requiring large storage space; queries that use it become ultra-fast because they read a small number of pre-built documents instead of requiring a full scan of the segments. It is, for me, the most interesting and innovative index offered by Pinot.
