RocksDB fails to acquire a lock

When using RocksDB as a state store, you may need to increase the lock acquire timeout in the SQL config.

Written by Adam Pavlacka

Last published at: February 25th, 2023

Problem

You are using RocksDB as the state store for your Structured Streaming application and you get an error message saying that the RocksDB instance could not be acquired.

Caused by: java.lang.IllegalStateException: RocksDB instance could not be acquired by [ThreadId: 742, task: 140.3 in stage 3152, TID 553193] as it was not released by [ThreadId: 42, task: 140.1 in stage 3152, TID 553083] after 10009 ms
StateStoreId(opId=0,partId=140,name=default)

Cause

Two concurrent tasks cannot modify the same RocksDBStateStore instance.

Concurrent tasks attempting to access the same state store (the state store tied to the same partition of state maintained by flatMapGroupsWithState) should be extremely rare. It can only happen if the task updating the store instance was restarted by the driver before the previous attempt had terminated.

Info

Abrupt node termination, like when a spot instance terminates, can also cause this error.

Solution

This error prevents the state from being corrupted. Restart the query if you encounter this error.
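
The restart can also be automated. The following is a minimal sketch, not a built-in Spark feature: `isRocksDbLockError` and `restartOnLockError` are hypothetical helper names, and the check assumes the lock failure can be recognized by the message text shown in the error above somewhere in the exception's cause chain.

```scala
// Hypothetical helpers; the names are illustrative and not part of Spark.

// True if any exception in the cause chain carries the lock-acquisition message.
def isRocksDbLockError(error: Throwable): Boolean =
  Iterator.iterate(error)(_.getCause)
    .takeWhile(_ != null)
    .take(20) // guard against cyclic cause chains
    .exists { t =>
      Option(t.getMessage).exists(_.contains("RocksDB instance could not be acquired"))
    }

// Runs `runQuery` and re-runs it when it fails with the lock error,
// up to `maxRestarts` times, pausing `pauseMs` between attempts.
// Any other failure, or a lock error after the last restart, is rethrown.
// Returns the number of restarts that were needed.
def restartOnLockError(maxRestarts: Int, pauseMs: Long)(runQuery: () => Unit): Int = {
  var restarts = 0
  var finished = false
  while (!finished) {
    try {
      runQuery()
      finished = true
    } catch {
      case e: Exception if isRocksDbLockError(e) && restarts < maxRestarts =>
        restarts += 1
        Thread.sleep(pauseMs)
    }
  }
  restarts
}
```

In a notebook, this could be called as `restartOnLockError(3, 30000L)(() => startQuery().awaitTermination())`, where `startQuery` is a placeholder for whatever function starts your streaming query. The retry limit keeps a persistent failure from looping indefinitely.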

If zombie tasks take too long to clean up their resources, the next task that tries to acquire the lock also fails. In this case, you should allow more time for the thread to clean up.

Set the wait time for the thread by configuring spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs in your SQL configuration. The value is in milliseconds.

%scala
// Allow up to 20 seconds (20000 ms) to acquire the RocksDB instance lock.
spark.sql("set spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs = 20000")
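
As an alternative sketch, the same setting can be applied through the session's runtime configuration and read back to confirm the value. This assumes a notebook where the `spark` session is already defined. The `timeoutKey` name is illustrative, and 20000 simply mirrors the example above.

```scala
// Assumes `spark` is the SparkSession that the notebook provides.
val timeoutKey = "spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs"

// Allow up to 20 seconds (20000 ms) for the previous task attempt to release the instance.
spark.conf.set(timeoutKey, "20000")

// Read the value back to confirm what the session is configured with.
println(spark.conf.get(timeoutKey))
```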