Problem
You are attempting to write DataFrames into OpenSearch indices using the org.opensearch.client:opensearch-spark-30_2.12 library when you encounter the error AnalysisException: Incompatible format detected.
Cause
This error is caused by the presence of a _delta_log folder in the root directory of the Databricks File System (DBFS).
When Apache Spark detects this folder, it assumes the target path is a Delta table and raises the AnalysisException error when you attempt to write non-Delta data using the OpenSearch format.
Solution
Verify the presence of the _delta_log folder in the root directory.
dbutils.fs.ls("dbfs:/_delta_log/")
If the folder is present, delete it to remove the conflict. If you might still need its contents, take a backup before deletion, as sketched below.
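One way to take that backup is a recursive copy to another DBFS location. The destination path below is a placeholder; substitute one of your own.
# Placeholder backup destination; choose a path that suits your environment.
dbutils.fs.cp("dbfs:/_delta_log/", "dbfs:/backup/_delta_log/", recurse=True)
Then delete the folder.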
dbutils.fs.rm("dbfs:/_delta_log/", True)
After deleting the _delta_log folder, re-run the job to write the DataFrame into OpenSearch using the specified function. The following is an example of an upsert operation in the us-east-1 region; adjust the parameters as required for your environment.
def saveToElasticSearch(df):
    # Upserts df into OpenSearch over HTTPS with AWS SigV4 request signing.
    # openSearchDomainPath and index must be defined before this function is called.
    (df.write.format("org.opensearch.spark.sql")
        .option("opensearch.nodes", openSearchDomainPath)
        .option("opensearch.nodes.wan.only", True)
        .option("opensearch.port", "443")
        .option("opensearch.net.ssl", "true")
        .option("opensearch.aws.sigv4.enabled", "true")
        .option("opensearch.aws.sigv4.region", "us-east-1")
        .option("opensearch.batch.size.entries", 200)
        .option("opensearch.mapping.id", "id")            # column used as the document ID
        .option("opensearch.write.operation", "upsert")   # update existing documents, insert new ones
        .mode("append")
        .save(index))
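To invoke the function, define the connection variables it references. This is a minimal sketch with a hypothetical domain endpoint and index name; replace both with the values for your environment.
openSearchDomainPath = "search-mydomain.us-east-1.es.amazonaws.com"  # placeholder domain endpoint
index = "my-index"                                                   # placeholder index name

saveToElasticSearch(df)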
Confirm that the job now completes successfully.
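As an optional spot check, you can read the index back through the same connector and count the documents. This sketch assumes the connector's read path accepts the same connection options used for the write above.
# Read the index back and count documents; options mirror the write path.
docs = (spark.read.format("org.opensearch.spark.sql")
    .option("opensearch.nodes", openSearchDomainPath)
    .option("opensearch.nodes.wan.only", True)
    .option("opensearch.port", "443")
    .option("opensearch.net.ssl", "true")
    .option("opensearch.aws.sigv4.enabled", "true")
    .option("opensearch.aws.sigv4.region", "us-east-1")
    .load(index))
print(docs.count())  # number of documents currently visible in the index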
Note
To prevent similar conflicts in the future, avoid creating Delta tables at the root location in DBFS. Regularly check for and clean up any unintended folders that may interfere with your workflows.
For more information, please review the What is Delta Lake? (AWS | Azure | GCP) documentation.