Implementing a lock in Elasticsearch

Implementing a lock in Elasticsearch

At Kestra, the workflow orchestration and scheduling platform I work for, we have two repository implementations: JDBC (with support for H2, MySQL, and Postgres) and Elasticsearch.

In JDBC, locking a record for processing is fairly easy:

  1. SELECT FOR UPDATE ... WHERE ... will place a lock on the record(s)
  2. We process the record(s) in the same transaction
  3. UPDATE...

At the end of the transaction, the database will automatically release the locks obtained via SELECT FOR UPDATE.

But Elasticsearch does not support locks or transactions. Is there a way to simulate a lock?

Index vs Doc API

Elasticsearch is both a NoSQL database and a search engine. Like any search engine, it relies on an asynchronous indexing phase.

This has two consequences:

  1. A search may not return an existing document.
  2. Indexing will overwrite any existing documents by default.

There are two solutions to overcome this:

  1. Use the Doc API to load a document from its ID instead of performing a search. For this to work, you must set the document ID when indexing it and not let Elasticsearch defines an ID for you.
  2. When creating a lock, use opType: Create to ensure that no document exists for the same ID. This prevents a record from being created between the time you check whether a lock exists and the time you create the lock document.

LockRepository

Here is a potential implementation for a Lock CRUD repository:

    public Optional<Lock> findById(String category, String id)  throws IOException {
        GetRequest getRequest = new GetRequest.Builder()
            .index("locks)
            .id(id)
            .build();

                GetResponse<Lock> getResponse = client.get(getRequest, Lock.class);
        return Optional.ofNullable(getResponse.source());
    }

    public boolean create(Lock newLock) throws IOException {
        try {
            // we use OptType.Create, so if a doc already exists, it will refuse to index a new one.
            IndexRequest<Lock> request = new IndexRequest.Builder<Lock>()
                .index("locks)"
                .id(newLock.uid())
                .document(newLock)
                .opType(OpType.Create)
                .build();
            client.index(request);
            return true;
        } catch (IOException e) {
            // for conflicts, we return false to indicate that we were not able to create the lock
            if (e instanceof ResponseException respException && 
                                respException.getResponse().getStatusLine().getStatusCode() == 409) {
                return false;
            }
            throw e;
        }
    }

    public void deleteById(String category, String id) throws IOException {
        DeleteRequest request = new DeleteRequest.Builder()
            .index("locks")
            .id(IdUtils.fromParts(category, id))
            .build();

        client.delete(request);
    }

LockService

We will now implement a LockService that allows the execution of a Runnable guarded by a lock to simulate a transaction.

The principle is as follows:

  • We lock:
    • If a lock already exists, we wait.
    • Otherwise, we create a lock; if the creation returns false, it indicates a conflict, so we wait.
    • Otherwise, we have successfully acquired a lock!
  • We execute the Runnable
  • We unlock: we delete the lock

Here is an example of an implementation; our implementation is a bit more complex, but the idea is the same:

public void doInLock(String category, String id, Duration timeout, Runnable runnable) {
    lock(id); // our real implementation handle a timeout
    try {
        runnable.run();
    } finally {
        unlock(id);
    }
}

    private void lock(String id) throws LockException {
    do {
        Optional&lt;Lock&gt; existing = lockRepository.findById(category, id);
        if (existing.isEmpty()) {
            // we can try to lock!
            Lock newLock = new Lock(id, Instant.now());
            if (lockRepository.create(newLock)) {
                            // yeah, we locked!
                return true;
            }

        try {
            Thread.sleep(1); // this is not ideal, you can also use Thread.onSpinWait()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new LockException(e);
        }
    } while (true); // &lt;-- please don't do that but implement a timeout
}

private void unlock(String category, String id) {
    Optional&lt;Lock&gt; existing = lockRepository.findById(category, id);
    if (existing.isEmpty()) {
        // should not occur but better be safe
        return;
    }

    lockRepository.deleteById(category, id);
}

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.