Problem
You are using RocksDB as the state store for your Structured Streaming application and you get an error message saying that the instance could not be acquired.
Caused by: java.lang.IllegalStateException: RocksDB instance could not be acquired by [ThreadId: 742, task: 140.3 in stage 3152, TID 553193] as it was not released by [ThreadId: 42, task: 140.1 in stage 3152, TID 553083] after 10009 ms StateStoreId(opId=0,partId=140,name=default)
Cause
Two concurrent tasks cannot modify the same RocksDBStateStore instance.
Concurrent tasks attempting to access the same state store (the state store tied to the same partition of state maintained by flatMapGroupsWithState) should be extremely rare. It can only happen if the task updating the store instance was restarted by the driver before the previous attempt had terminated.
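The failure mode described above can be modeled with a plain lock and a timed acquire. This is a simplified sketch for illustration only, not the RocksDB state store implementation: the object name, the `acquire` helper, and the use of `ReentrantLock` are all assumptions made for the example.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

object LockTimeoutSketch {
  // Hypothetical stand-in for the lock guarding one partition's state store instance.
  private val instanceLock = new ReentrantLock()

  // Try to take the lock; fail if it is still held by another thread after timeoutMs.
  def acquire(owner: String, timeoutMs: Long): Unit = {
    if (!instanceLock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
      throw new IllegalStateException(
        s"instance could not be acquired by $owner after $timeoutMs ms")
    }
  }

  // Returns true if a second attempt times out while the first attempt still holds the lock.
  def secondAttemptTimesOut(timeoutMs: Long): Boolean = {
    val held = new CountDownLatch(1)
    val done = new CountDownLatch(1)
    val firstAttempt = new Thread(new Runnable {
      override def run(): Unit = {
        acquire("first attempt", timeoutMs)
        held.countDown()
        done.await() // simulate a zombie task that has not released the instance yet
        instanceLock.unlock()
      }
    })
    firstAttempt.start()
    held.await() // make sure the first attempt owns the lock before retrying

    val timedOut =
      try {
        acquire("second attempt", timeoutMs)
        instanceLock.unlock()
        false
      } catch {
        case _: IllegalStateException => true
      }

    done.countDown() // let the first attempt finish and release the lock
    firstAttempt.join()
    timedOut
  }

  def main(args: Array[String]): Unit =
    println(secondAttemptTimesOut(200))
}
```

In this model the second attempt fails only because the first attempt outlives the timeout, which mirrors why a longer timeout can help when the earlier task is merely slow to clean up.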
Solution
This error prevents the state from being corrupted. Restart the query if you encounter this error.
If zombie tasks take too long to clean up their resources, the next task that tries to acquire the lock also fails. In this case, allow more time for the thread to clean up.
Set the wait time by configuring spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs in your SQL configuration. The value is in milliseconds.
%scala
spark.sql("set spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs = 20000")
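As an alternative sketch, the same key can be set through the session's runtime configuration instead of a SQL `set` command. This assumes a `SparkSession` already exists (as in a notebook) and that the setting is applied before the streaming query is started or restarted, since state store settings are most likely read when the query starts. The value 20000 is the example value used in this article, not a recommendation.

```scala
import org.apache.spark.sql.SparkSession

// Assumes a session already exists (for example, in a notebook); getOrCreate() returns it.
val spark = SparkSession.builder().getOrCreate()

// Full configuration key, as used in the SQL command in this article.
val lockTimeoutKey = "spark.sql.streaming.stateStore.rocksdb.lockAcquireTimeoutMs"

// Set the lock acquire timeout to 20000 ms (example value from this article).
spark.conf.set(lockTimeoutKey, "20000")

// Read the value back to confirm the session holds it before restarting the query.
println(spark.conf.get(lockTimeoutKey))
```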