Implementing a lock in Elasticsearch

At Kestra, the workflow orchestration and scheduling platform I work for, we have two repository implementations: JDBC (with support for H2, MySQL, and Postgres) and Elasticsearch.

In JDBC, locking a record for processing is fairly easy:

  1. SELECT ... WHERE ... FOR UPDATE places a lock on the matching record(s).
  2. We process the record(s) in the same transaction.
  3. UPDATE ... writes the result.

At the end of the transaction, the database will automatically release the locks obtained via SELECT FOR UPDATE.
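
As a rough illustration of these three steps, here is a minimal JDBC sketch. The executions table, its id and state columns, and the process method are hypothetical names chosen for the example; they are not taken from Kestra's code.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

final class JdbcLockSketch {
    // Hypothetical table and columns, used only for illustration.
    static final String SELECT_FOR_UPDATE =
            "SELECT id, state FROM executions WHERE id = ? FOR UPDATE";
    static final String UPDATE =
            "UPDATE executions SET state = ? WHERE id = ?";

    /** Locks the row, updates it, and commits. Returns false if no row matched. */
    static boolean process(Connection connection, String id, String newState) throws SQLException {
        boolean previousAutoCommit = connection.getAutoCommit();
        connection.setAutoCommit(false); // start an explicit transaction
        try {
            boolean found;
            // Step 1: lock the record
            try (PreparedStatement select = connection.prepareStatement(SELECT_FOR_UPDATE)) {
                select.setString(1, id);
                try (ResultSet rows = select.executeQuery()) {
                    found = rows.next(); // the row stays locked until commit or rollback
                }
            }
            // Steps 2 and 3: process and update in the same transaction
            if (found) {
                try (PreparedStatement update = connection.prepareStatement(UPDATE)) {
                    update.setString(1, newState);
                    update.setString(2, id);
                    update.executeUpdate();
                }
            }
            connection.commit(); // the database releases the row lock here
            return found;
        } catch (SQLException | RuntimeException e) {
            connection.rollback(); // a rollback releases the row lock as well
            throw e;
        } finally {
            connection.setAutoCommit(previousAutoCommit);
        }
    }
}
```

The important point is that nothing in this code releases the lock explicitly: ending the transaction does it.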

But Elasticsearch does not support locks or transactions. Is there a way to simulate a lock?

Index vs Doc API

Elasticsearch is both a NoSQL database and a search engine. Like any search engine, it relies on an asynchronous indexing phase.

This has two consequences:

  1. A search may not return a document that was just indexed, because a document only becomes searchable after the next index refresh.
  2. Indexing will overwrite any existing documents by default.

There are two solutions to overcome this:

  1. Use the Doc API to load a document by its ID instead of performing a search. For this to work, you must set the document ID yourself when indexing it rather than letting Elasticsearch generate one for you.
  2. When creating a lock, use OpType.Create so that indexing fails if a document already exists for the same ID. This prevents a record from being created between the time you check whether a lock exists and the time you create the lock document.
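
To make the difference between the two write modes concrete, here is a small in-memory analogy. It is not Elasticsearch client code: InMemoryIndex is a hypothetical class backed by a ConcurrentHashMap, where index behaves like the default overwrite and create behaves like OpType.Create.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for an index keyed by document ID (illustration only). */
final class InMemoryIndex {
    private final Map<String, String> documents = new ConcurrentHashMap<>();

    /** Like the default index operation: writes the document, replacing any existing one. */
    void index(String id, String document) {
        documents.put(id, document);
    }

    /** Like a create operation: writes only if the ID is free. Returns false on conflict. */
    boolean create(String id, String document) {
        // putIfAbsent is atomic, so two concurrent callers cannot both succeed
        return documents.putIfAbsent(id, document) == null;
    }

    /** Like a get by ID: a direct lookup rather than a search. */
    Optional<String> get(String id) {
        return Optional.ofNullable(documents.get(id));
    }
}
```

With index, a second writer silently replaces the first writer's lock document. With create, the second writer is told about the conflict, which is exactly what a lock needs.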

LockRepository

Here is a potential implementation for a Lock CRUD repository:

public Optional<Lock> findById(String category, String id) throws IOException {
        // build the document ID the same way as deleteById, so both target the same document
        GetRequest getRequest = new GetRequest.Builder()
                .index("locks")
                .id(IdUtils.fromParts(category, id))
                .build();

        GetResponse<Lock> getResponse = client.get(getRequest, Lock.class);
        return Optional.ofNullable(getResponse.source());
}

public boolean create(Lock newLock) throws IOException {
        try {
                // we use OpType.Create, so if a doc already exists, it will refuse to index a new one.
                IndexRequest<Lock> request = new IndexRequest.Builder<Lock>()
                        .index("locks")
                        .id(newLock.uid())
                        .document(newLock)
                        .opType(OpType.Create)
                        .build();
                client.index(request);
                return true;
        } catch (IOException e) {
                // for conflicts, we return false to indicate that we were not able to create the lock
                if (e instanceof ResponseException respException && 
                                respException.getResponse().getStatusLine().getStatusCode() == 409) {
                        return false;
                }
                throw e;
        }
}

public void deleteById(String category, String id) throws IOException {
        DeleteRequest request = new DeleteRequest.Builder()
                .index("locks")
                .id(IdUtils.fromParts(category, id))
                .build();

        client.delete(request);
}

LockService

We will now implement a LockService that allows the execution of a Runnable guarded by a lock to simulate a transaction.

The principle is as follows:

  • We lock:
    • If a lock already exists, we wait.
    • Otherwise, we create a lock; if the creation returns false, it indicates a conflict, so we wait.
    • Otherwise, we have successfully acquired a lock!
  • We execute the Runnable
  • We unlock: we delete the lock

Here is an example of an implementation; our implementation is a bit more complex, but the idea is the same:

public void doInLock(String category, String id, Duration timeout, Runnable runnable) throws IOException {
        lock(category, id, timeout);
        try {
                runnable.run();
        } finally {
                unlock(category, id);
        }
}

private void lock(String category, String id, Duration timeout) throws IOException, LockException {
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        do {
                Optional<Lock> existing = lockRepository.findById(category, id);
                if (existing.isEmpty()) {
                        // we can try to lock!
                        Lock newLock = new Lock(id, Instant.now());
                        if (lockRepository.create(newLock)) {
                                // yeah, we locked!
                                return;
                        }
                }

                try {
                        Thread.sleep(1); // this is not ideal, you can also use Thread.onSpinWait()
                } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new LockException(e);
                }
        } while (System.currentTimeMillis() < deadline);

        // the deadline passed without acquiring the lock (java.util.concurrent.TimeoutException)
        throw new LockException(new TimeoutException("Unable to acquire lock " + id + " within " + timeout));
}

private void unlock(String category, String id) throws IOException {
        Optional<Lock> existing = lockRepository.findById(category, id);
        if (existing.isEmpty()) {
                // should not occur but better be safe
                return;
        }

        if (!existing.get().getOwner().equals(ServerInstance.INSTANCE_ID)) {
                // if the lock has been acquired by someone else in the meantime, we do not release it
                return;
        }

        lockRepository.deleteById(category, id);
}
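
To see the lock, run, unlock cycle work end to end without a cluster, here is a self-contained simulation. It is only a sketch: InMemoryLockService and LockDemo are hypothetical names, and a ConcurrentHashMap stands in for the locks index, with putIfAbsent playing the role of a create with OpType.Create.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class InMemoryLockService {
    // Stand-in for the "locks" index: lock ID -> owner
    private final Map<String, String> locks = new ConcurrentHashMap<>();
    private final String owner;

    InMemoryLockService(String owner) {
        this.owner = owner;
    }

    void doInLock(String id, Duration timeout, Runnable runnable) throws InterruptedException {
        lock(id, timeout);
        try {
            runnable.run();
        } finally {
            locks.remove(id, owner); // removes the entry only if we still own it
        }
    }

    private void lock(String id, Duration timeout) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        do {
            // atomic create-if-absent: succeeds for exactly one caller at a time
            if (locks.putIfAbsent(id, owner) == null) {
                return;
            }
            Thread.sleep(1);
        } while (deadline - System.nanoTime() > 0);
        throw new IllegalStateException("Could not acquire lock " + id + " within " + timeout);
    }
}

final class LockDemo {
    static int counter = 0; // deliberately not atomic: the lock is what protects it

    static int run(int threads, int iterations) throws InterruptedException {
        counter = 0;
        InMemoryLockService service = new InMemoryLockService("instance-1");
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < iterations; j++) {
                    try {
                        service.doInLock("counter", Duration.ofSeconds(10), () -> counter++);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return counter;
    }
}
```

Calling LockDemo.run(4, 100) increments a plain int from four threads. Because every increment runs under the lock, no update should be lost and the result should equal threads × iterations.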

This is not a perfect implementation of a distributed lock. As mentioned above, our actual implementation is more complex. Among other things, Kestra has a distributed liveness check mechanism that allows us to remove locks held by an instance detected as dead.
