Implémenter un lock en Elasticsearch

Implémenter un lock en Elasticsearch

A Kestra, la plateforme d'orchestration et de scheduling de workflow pour laquelle je travaille, nous avons deux implémentations de repository: JDBC (avec support pour H2, MySQL et Postgres) et Elasticsearch.

En JDBC, locker un enregistrement pour le traiter est assez facile :

  1. SELECT FOR UPDATE ... WHERE ... va poser un lock sur le ou les enregistrements
  2. On traite le ou les enregistrements dans la même transaction
  3. UPDATE...

En fin de transaction, la BDD va automatiquement relâcher les locks obtenu via le SELECT FOR UPDATE.

Mais Elasticsearch ne supporte ni les locks ni les transactions, existe-t-il un moyen de simuler un lock?

Index vs Doc API

Elasticsearch est en même temps une base de données NoSQL et un moteur de recherche. Comme tout moteur de recherche, il se base sur une phase d'indexation asynchrone.

Cela a deux conséquences :

  1. Une recherche peut ne pas retourner un document existant
  2. Une indexation va par défaut écraser tout document existant

Pour palier à ça, il y a deux solutions :

  1. Utiliser la Doc API pour charger un document depuis son identifiant au lieu de faire une recherche. Pour que cela marche, il faut fixer l'identifiant du document lors de son indexation et ne pas laisser Elasticsearch définir un identifiant à notre place.
  2. Lors de la création d'un lock, utiliser opType: Create pour être sûr qu'aucun document n'existe pour un même identifiant. Cela permet d'éviter qu'un enregistrement n'ait été créé entre le moment où on regarde si un lock existe et celui où on crée le document de lock.

LockRepository

Voici une potentielle implémentation pour un repository CRUD de Lock:

    public Optional<Lock> findById(String category, String id)  throws IOException {
        GetRequest getRequest = new GetRequest.Builder()
            .index("locks)
            .id(id)
            .build();

                GetResponse<Lock> getResponse = client.get(getRequest, Lock.class);
        return Optional.ofNullable(getResponse.source());
    }

    public boolean create(Lock newLock) throws IOException {
        try {
            // we use OptType.Create, so if a doc already exists, it will refuse to index a new one.
            IndexRequest<Lock> request = new IndexRequest.Builder<Lock>()
                .index("locks)"
                .id(newLock.uid())
                .document(newLock)
                .opType(OpType.Create)
                .build();
            client.index(request);
            return true;
        } catch (IOException e) {
            // for conflicts, we return false to indicate that we were not able to create the lock
            if (e instanceof ResponseException respException && 
                    respException.getResponse().getStatusLine().getStatusCode() == 409) {
                return false;
            }
            throw e;
        }
    }

    public void deleteById(String category, String id) throws IOException {
        DeleteRequest request = new DeleteRequest.Builder()
            .index("locks")
            .id(IdUtils.fromParts(category, id))
            .build();

        client.delete(request);
    }
[/code]

<h2>LockService</h2>

Nous allons maintenant implémenter un <code>LockService</code> qui a une méthode qui permet d'exécuter un <code>Runnable</code> gardé par un lock pour simuler une transaction.

Le principe est le suivant :
- On lock :
    - Si un lock existe déjà, on attend
    - Sinon, on crée un lock, si la création retourne false ça indique un conflit, donc on attend
    - Sinon, on a réussit à poser un lock !
- On execute le <code>Runnable</code>
- On dé-lock: on supprime le lock

Voici un exemple d'implémentation, notre implémentation est un peu plus complexe, mais l'idée est là :

```java
    public void doInLock(String category, String id, Duration timeout, Runnable runnable) {
        lock(id); // our real implementation handle a timeout
        try {
            runnable.run();
        } finally {
            unlock(id);
        }
    }

<pre><code>    private void lock(String id) throws LockException {
    do {
        Optional&amp;lt;Lock&amp;gt; existing = lockRepository.findById(category, id);
        if (existing.isEmpty()) {
            // we can try to lock!
            Lock newLock = new Lock(id, Instant.now());
            if (lockRepository.create(newLock)) {
                            // yeah, we locked!
                return true;
            }

        try {
            Thread.sleep(1); // this is not ideal, you can also use Thread.onSpinWait()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new LockException(e);
        }
    } while (true); // &amp;lt;-- please don&amp;#039;t do that but implement a timeout
}

private void unlock(String category, String id) {
    Optional&amp;lt;Lock&amp;gt; existing = lockRepository.findById(category, id);
    if (existing.isEmpty()) {
        // should not occur but better be safe
        return;
    }

    lockRepository.deleteById(category, id);
}

Laisser un commentaire

Ce site utilise Akismet pour réduire les indésirables. En savoir plus sur la façon dont les données de vos commentaires sont traitées.