
Concurrent writers return error despite successfully writing to the file #2609

Open
Aiden-Frost opened this issue Jun 18, 2024 · 5 comments
Labels
bug Something isn't working

Comments

@Aiden-Frost

Environment

Delta-rs version: 0.18.1

Binding: rust

Environment:

  • Cloud provider: S3
  • OS: macOS Sonoma 14.5
  • Other:

Bug

What happened: I have multiple writers running as Python processes, all writing to the same table location. Each writer creates a pandas DataFrame and writes it to that exact location. There are 4 different outcomes for these writes:

  1. Writes to the file location and returns no error.
  2. Writes to the file location but returns an error:
    2.1. Delta transaction failed, version 0 already exists
    2.2. Generic error: A Delta Lake table already exists at that location
  3. Does not write to the file location and returns an error:
    3.1. Delta table already exists, write mode set to error

When a process throws error 2.1 or 2.2, I expect the write to have failed. But on inspecting the table I observe that the writer's data was appended anyway.

For the reproducible example below, suppose Process 1 and Process 2 hit error 2.2. I expected only Process 3's row to be present, but the table contained:

pd.read_parquet('s3a://test-bucket/file-4')
           x
0  Process 2
1  Process 1
2  Process 3

What you expected to happen: When a writer threw an error, I expected its write to have failed. When multiple writers write to the same location, only one should succeed and the others should fail with an error.

How to reproduce it:

import multiprocessing
import pandas as pd
from deltalake import write_deltalake

storage_options = {'AWS_S3_LOCKING_PROVIDER': 'dynamodb', 'DELTA_DYNAMO_TABLE_NAME': 'delta_log'}

def print_process_name(process_name):
    for i in range(10):
        try:
            df = pd.DataFrame({'x': [process_name]})
            print(f"WRITING process: {process_name}, i: {i}")
            write_deltalake(f's3a://test-bucket/file-{i}', df, storage_options=storage_options)
        except Exception as e:
            print(f"Error: {e} process: {process_name}, i: {i}")
        else:
            print(f"SUCCESS Process: {process_name}, i: {i}")
    

if __name__ == "__main__":

    process_names = ["Process 1", "Process 2", "Process 3"]


    processes = []
    for name in process_names:
        process = multiprocessing.Process(target=print_process_name, args=(name,))
        processes.append(process)


    for process in processes:
        process.start()

    for process in processes:
        process.join()

More details:

Regarding the Generic error: A Delta Lake table already exists at that location, I believe this is handled in this part of the code in crates/core/src/logstore/mod.rs:

    async fn is_delta_table_location(&self) -> DeltaResult<bool> {
        // TODO We should really be using HEAD here, but this fails in windows tests
        let object_store = self.object_store();
        let mut stream = object_store.list(Some(self.log_path()));
        if let Some(res) = stream.next().await {
            match res {
                Ok(_) => Ok(true),
                Err(ObjectStoreError::NotFound { .. }) => Ok(false),
                Err(err) => Err(err)?,
            }
        } else {
            Ok(false)
        }
    }

This function is called here crates/core/src/operations/create.rs:

            let table_state = if log_store.is_delta_table_location().await? {
                match mode {
                    SaveMode::ErrorIfExists => return Err(CreateError::TableAlreadyExists.into()),

The Delta table already exists, write mode set to error case is handled on the Python side, in python/deltalake/writer.py:

def write_deltalake 
....
            if mode == "error":
                raise FileExistsError(
                    "Delta table already exists, write mode set to error."
                )
@Aiden-Frost Aiden-Frost added the bug Something isn't working label Jun 18, 2024
@wjones127
Collaborator

When the writer threw an error, I expected the write to fail. When multiple writers are writing to the same location then only one should succeed and the other writes should fail and return error.

I think the information you are missing is that in a delta table, the writes happen in two stages: (1) write data files (Parquet), then (2) commit to the transaction log. It detects the table already exists and fails on step 2. The files created in step (1) that are part of the failed transaction can be cleaned up with the VACUUM operation.
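The two-stage protocol described above can be sketched with plain files. This is a hypothetical local-filesystem analogue, not deltalake's actual implementation: stage 1 writes a data file unconditionally, stage 2 claims the versioned commit file atomically with O_EXCL, so a writer that loses the race leaves an orphaned data file behind, exactly the situation the issue reports.

```python
import os
import tempfile


def two_stage_write(table_dir, data, version):
    """Toy model of a two-stage Delta-style commit (illustrative only)."""
    # Stage 1: write the data file. This always succeeds, even if the
    # transaction later fails, which is how orphaned data files appear.
    fd, data_path = tempfile.mkstemp(dir=table_dir, suffix=".parquet")
    with os.fdopen(fd, "w") as f:
        f.write(data)

    # Stage 2: commit by creating the versioned log entry atomically.
    # O_CREAT | O_EXCL fails if another writer already committed this
    # version, mimicking "Delta transaction failed, version 0 already exists".
    commit_path = os.path.join(table_dir, f"{version:020d}.json")
    try:
        fd = os.open(commit_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        # Lost the race: the stage-1 data file is now orphaned until a
        # VACUUM-style cleanup removes it.
        return False, data_path
    with os.fdopen(fd, "w") as f:
        f.write('{"add": "%s"}' % os.path.basename(data_path))
    return True, data_path
```

Running two writers against the same version shows the failure mode: the second call returns False, yet its data file still exists on disk.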

@Aiden-Frost
Author

Thank you for the clarification. I have gone through the transaction and vacuum documentation. Following the example, when I try to vacuum:

dt = DeltaTable("s3a://test-bucket/file-4")
dt.vacuum()
[]

It returns an empty list. Is there something I am missing to indicate that files from failed transactions should be cleaned up?

@wjones127
Collaborator

You should read the documentation for the vacuum method, particularly the retention_hours and enforce_retention_duration parameters.

https://delta-io.github.io/delta-rs/api/delta_table/#deltalake.DeltaTable.vacuum
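For intuition on why vacuum can legitimately return an empty list, here is a rough sketch of the decision a vacuum-style cleanup has to make. This is a simplified local-filesystem model with a hypothetical file layout, not deltalake's actual algorithm: a file is removable only if no commit references it and it is older than the retention window, so with the defaults (roughly a 7-day retention and a dry run) freshly orphaned files are protected and nothing is reported.

```python
import json
import os
import time


def vacuum(table_dir, retention_hours=168.0, dry_run=True,
           enforce_retention_duration=True):
    """Toy model of vacuum's file-selection logic (illustrative only)."""
    # Collect every data file referenced by a commit in _delta_log.
    referenced = set()
    log_dir = os.path.join(table_dir, "_delta_log")
    for name in os.listdir(log_dir):
        if not name.endswith(".json"):
            continue
        with open(os.path.join(log_dir, name)) as f:
            for line in f:
                action = json.loads(line)
                if "add" in action:
                    referenced.add(action["add"]["path"])

    # A file is an orphan only if it is unreferenced AND (when retention is
    # enforced) older than the retention cutoff. Fresh orphans are skipped,
    # which is why vacuum() with default settings can return [].
    cutoff = time.time() - retention_hours * 3600
    orphans = []
    for name in os.listdir(table_dir):
        path = os.path.join(table_dir, name)
        if name == "_delta_log" or name in referenced:
            continue
        if enforce_retention_duration and os.path.getmtime(path) > cutoff:
            continue  # too recent to delete safely
        orphans.append(name)

    if not dry_run:
        for name in orphans:
            os.remove(os.path.join(table_dir, name))
    return orphans
```

Under this model, passing something like retention_hours=0 together with enforce_retention_duration=False is what makes just-orphaned files eligible, mirroring the parameters the real vacuum method documents.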

@Aiden-Frost
Author

After going through the documentation, I set retention_hours=0 and enforce_retention_duration=False, but even after this I still get an empty list from vacuum.

In total there are 3 Parquet files generated by the sample program under one path. I observed that the 00000000000000000000.json file lists one Parquet file under the add field, while the other two Parquet files appear under add in the tmp commit.

@marcosmartinezfco

I have the same issue. Even if I vacuum the files, or delete and recreate the table, I see the same behavior: the data gets appended but the transaction-log commit fails with an error.
