
Parquet s3 urls not working when use_memory_table is set to false #227

@elliot14A

Description


S3 URLs were not working when use_memory_table was set to false.
ROAPI was returning the following error:

Error: DataFusion error: Internal error: No suitable object store found for s3://roapitest/blogs_flattened.parquet. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
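
For context, the lookup failure is easy to reproduce outside roapi with the same DataFusion calls the loader below uses (a minimal sketch; it assumes the DataFusion version columnq currently pins and a tokio runtime):

use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTableUrl};
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // A fresh context only has the local filesystem store registered.
    let ctx = SessionContext::new();
    let table_url = ListingTableUrl::parse("s3://roapitest/blogs_flattened.parquet")?;
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
    // Fails with "No suitable object store found for s3://..." because the
    // registry has no entry for the s3 scheme.
    let _schema = options.infer_schema(&ctx.state(), &table_url).await?;
    Ok(())
}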

So I went through the DataFusion repo and did a little debugging, and found out the following: when use_memory_table is false, columnq calls a DataFusion function named infer_schema, which in turn looks the URL up in object_store_registry, and that lookup is what fails. The registry is a DashMap that maps URL schemes (and hosts) such as file, s3, and hdfs to object stores, and by default only the local filesystem store is registered. To make s3 URLs resolve, we have to manually register a store for the bucket by calling register_store on the ObjectStoreRegistry in the SessionContext's RuntimeEnv.
Using register_store, the required functionality (querying S3 without loading the table into memory) seems to work. The problem is that columnq has one global SessionContext, but the table loaders create their own local ones; for example, the parquet loader in columnq/src/table/parquet.rs builds a local SessionContext, as shown in the snippets below.

//in columnq/src/columnq.rs
pub fn new_with_config(config: SessionConfig) -> Self {
    let dfctx = SessionContext::with_config(config);

    // Manually register an S3 object store for the bucket so that
    // s3:// URLs can be resolved by this (global) context.
    let bucket_name = "roapi-test";
    let region = std::env::var("AWS_REGION").unwrap();
    let endpoint = std::env::var("AWS_ENDPOINT_URL").unwrap();
    let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").unwrap();
    let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").unwrap();
    let s3_store = AmazonS3Builder::new()
        .with_access_key_id(access_key_id)
        .with_bucket_name(bucket_name)
        .with_endpoint(endpoint)
        .with_region(region)
        .with_secret_access_key(secret_access_key)
        .build()
        .map_err(|e| {
            println!("{:?}", e);
            ColumnQError::MissingOption
        })
        .unwrap();
    dfctx.runtime_env().object_store_registry.register_store(
        "s3",
        bucket_name,
        Arc::new(s3_store),
    );

    let schema_map = HashMap::<String, arrow::datatypes::SchemaRef>::new();
    Self {
        dfctx,
        schema_map,
        kv_catalog: HashMap::new(),
    }
}
//in columnq/src/table/parquet.rs
pub async fn to_datafusion_table(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQError> {
    let opt = t
        .option
        .clone()
        .unwrap_or_else(|| TableLoadOption::parquet(TableOptionParquet::default()));
    let TableOptionParquet { use_memory_table } = opt.as_parquet()?;

    if *use_memory_table {
        to_mem_table(t).await
    } else {
        let table_url = ListingTableUrl::parse(t.get_uri_str())?;
        let options = ListingOptions::new(Arc::new(ParquetFormat::default()));

        let schemaref = match &t.schema {
            Some(s) => Arc::new(s.into()),
            None => {
                // This local context knows nothing about the store registered
                // on the global one, so the whole S3 setup has to be repeated
                // here before infer_schema can resolve the s3:// URL.
                let ctx = SessionContext::new();

                let bucket_name = "roapi-test";
                let region = std::env::var("AWS_REGION").unwrap();
                let endpoint = std::env::var("AWS_ENDPOINT_URL").unwrap();
                let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").unwrap();
                let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").unwrap();
                let s3_store = AmazonS3Builder::new()
                    .with_access_key_id(access_key_id)
                    .with_bucket_name(bucket_name)
                    .with_endpoint(endpoint)
                    .with_region(region)
                    .with_secret_access_key(secret_access_key)
                    .build()
                    .map_err(|e| {
                        println!("{:?}", e);
                        ColumnQError::MissingOption
                    })?;
                ctx.runtime_env().object_store_registry.register_store(
                    "s3",
                    bucket_name,
                    Arc::new(s3_store),
                );
                options.infer_schema(&ctx.state(), &table_url).await?
            }
        };

        let table_config = ListingTableConfig::new(table_url)
            .with_listing_options(options)
            .with_schema(schemaref);
        Ok(Arc::new(ListingTable::try_new(table_config)?))
    }
}

As you can see, I have to call register_store twice just to make s3 URLs work for parquet files. If I want it to work for CSV files as well, I have to call it a third time, which I don't think is efficient.

Do you think there is a better way to do this, such as having a single global SessionContext?
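
To make the idea concrete, here is roughly what I have in mind, as a minimal sketch (register_s3_store is a name I made up, not an existing columnq function; the env-var handling and error mapping just mirror the code above, and I'm assuming ColumnQError lives at crate::error):

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use object_store::aws::AmazonS3Builder;

use crate::error::ColumnQError;

// Hypothetical helper: register an Amazon S3 object store for one bucket
// on the given context, in exactly one place.
pub fn register_s3_store(ctx: &SessionContext, bucket_name: &str) -> Result<(), ColumnQError> {
    let region = std::env::var("AWS_REGION").map_err(|_| ColumnQError::MissingOption)?;
    let endpoint = std::env::var("AWS_ENDPOINT_URL").map_err(|_| ColumnQError::MissingOption)?;
    let access_key_id =
        std::env::var("AWS_ACCESS_KEY_ID").map_err(|_| ColumnQError::MissingOption)?;
    let secret_access_key =
        std::env::var("AWS_SECRET_ACCESS_KEY").map_err(|_| ColumnQError::MissingOption)?;

    let s3_store = AmazonS3Builder::new()
        .with_bucket_name(bucket_name)
        .with_region(region)
        .with_endpoint(endpoint)
        .with_access_key_id(access_key_id)
        .with_secret_access_key(secret_access_key)
        .build()
        .map_err(|_| ColumnQError::MissingOption)?;

    // Same registry call as in the snippets above, but now in one place.
    ctx.runtime_env()
        .object_store_registry
        .register_store("s3", bucket_name, Arc::new(s3_store));
    Ok(())
}

new_with_config would call this once on the global dfctx, and to_datafusion_table would take that shared SessionContext as a parameter instead of constructing a local SessionContext::new(), so infer_schema sees the registered store and the per-format duplication goes away.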
