S3 URLs were not working when `use_memory_table` is set to `false`. Loading the table returned the following error:
```
Error: DataFusion error: Internal error: No suitable object store found for s3://roapitest/blogs_flattened.parquet. This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
```
So I went through the DataFusion repo, did a bit of debugging, and found that when `use_memory_table: false` is set, the loader calls a DataFusion function called `infer_schema`, which in turn uses the `object_store` registry, and that lookup is what actually causes the error. The registry is a dashmap that maps URL schemes (local path, s3, hdfs, ...) to object stores, and by default only the local file system store is registered. S3 URLs have to be configured manually, with the bucket name, using a function called `register_store` on the `SessionContext`'s `RuntimeEnv`.
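In other words, the fix boils down to registering an S3-backed store under the `s3` scheme and the bucket host before schema inference runs. A minimal sketch of just that step, using the same `AmazonS3Builder` and `register_store(scheme, host, store)` calls as the full snippets below (bucket name and env-var handling are placeholders):

```rust
use std::sync::Arc;
use datafusion::prelude::SessionContext;
use object_store::aws::AmazonS3Builder;

fn main() {
    let ctx = SessionContext::new();

    // Build the store from credentials in the environment.
    let s3 = AmazonS3Builder::new()
        .with_bucket_name("roapi-test")
        .with_region(std::env::var("AWS_REGION").unwrap())
        .with_access_key_id(std::env::var("AWS_ACCESS_KEY_ID").unwrap())
        .with_secret_access_key(std::env::var("AWS_SECRET_ACCESS_KEY").unwrap())
        .build()
        .unwrap();

    // Key the store by scheme + bucket so `s3://roapi-test/...` URLs resolve;
    // after this, schema inference on an s3 ListingTableUrl can find the store.
    ctx.runtime_env()
        .object_store_registry
        .register_store("s3", "roapi-test", Arc::new(s3));
}
```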
Using `register_store` I was able to get S3 working without loading the table into memory, but in `columnq` there is a global `SessionContext`, while the table loaders, for example the parquet loader in `columnq/src/table/parquet.rs`, create a local `SessionContext` of their own, so the store has to be registered in both places:
```rust
// in columnq/src/columnq.rs
pub fn new_with_config(config: SessionConfig) -> Self {
    let dfctx = SessionContext::with_config(config);

    // Build an S3 object store from environment variables.
    let bucket_name = "roapi-test";
    let region = std::env::var("AWS_REGION").unwrap();
    let endpoint = std::env::var("AWS_ENDPOINT_URL").unwrap();
    let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").unwrap();
    let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").unwrap();
    let s3_builder = AmazonS3Builder::new()
        .with_access_key_id(access_key_id)
        .with_bucket_name(bucket_name)
        .with_endpoint(endpoint)
        .with_region(region)
        .with_secret_access_key(secret_access_key)
        .build()
        .map_err(|e| {
            println!("{:?}", e);
            ColumnQError::MissingOption
        })
        .unwrap();

    // Register the store under the `s3` scheme and bucket host so that
    // `s3://roapi-test/...` URLs resolve.
    dfctx.runtime_env().object_store_registry.register_store(
        "s3",
        "roapi-test",
        Arc::new(s3_builder),
    );

    let schema_map = HashMap::<String, arrow::datatypes::SchemaRef>::new();
    Self {
        dfctx,
        schema_map,
        kv_catalog: HashMap::new(),
    }
}
```
```rust
// in columnq/src/table/parquet.rs
pub async fn to_datafusion_table(t: &TableSource) -> Result<Arc<dyn TableProvider>, ColumnQError> {
    let opt = t
        .option
        .clone()
        .unwrap_or_else(|| TableLoadOption::parquet(TableOptionParquet::default()));
    let TableOptionParquet { use_memory_table } = opt.as_parquet()?;

    if *use_memory_table {
        to_mem_table(t).await
    } else {
        let table_url = ListingTableUrl::parse(t.get_uri_str())?;
        let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
        let schemaref = match &t.schema {
            Some(s) => Arc::new(s.into()),
            None => {
                // No schema provided: infer it from the file, which needs a
                // context that can resolve `s3://` URLs, so this local context
                // has to register the S3 store all over again.
                let ctx = SessionContext::new();
                let bucket_name = "roapi-test";
                let region = std::env::var("AWS_REGION").unwrap();
                let endpoint = std::env::var("AWS_ENDPOINT_URL").unwrap();
                let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").unwrap();
                let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").unwrap();
                let s3_builder = AmazonS3Builder::new()
                    .with_access_key_id(access_key_id)
                    .with_bucket_name(bucket_name)
                    .with_endpoint(endpoint)
                    .with_region(region)
                    .with_secret_access_key(secret_access_key)
                    .build()
                    .map_err(|e| {
                        println!("{:?}", e);
                        ColumnQError::MissingOption
                    })?;
                ctx.runtime_env().object_store_registry.register_store(
                    "s3",
                    "roapi-test",
                    Arc::new(s3_builder),
                );
                options.infer_schema(&ctx.state(), &table_url).await?
            }
        };
        let table_config = ListingTableConfig::new(table_url)
            .with_listing_options(options)
            .with_schema(schemaref);
        Ok(Arc::new(ListingTable::try_new(table_config)?))
    }
}
```
As you can see, I have to call `register_store` twice just to make S3 URLs work for parquet files; if I also want them to work for CSV files, I would have to call it a third time, which I don't think is efficient. Do you think there is a better way to do this, for example having a single global `SessionContext`?
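Roughly, what I have in mind is something like the sketch below: register the store once on the global context owned by `ColumnQ`, and pass that context into the table loaders instead of building a local one. The helper name and the changed `to_datafusion_table` signature are hypothetical, just to illustrate the idea:

```rust
// Hypothetical sketch (imports and types as in the snippets above).
use std::sync::Arc;
use datafusion::prelude::SessionContext;
use object_store::aws::AmazonS3Builder;

// Register the S3 store once, on the global SessionContext owned by ColumnQ.
fn register_s3_store(dfctx: &SessionContext, bucket: &str) -> Result<(), ColumnQError> {
    let s3 = AmazonS3Builder::new()
        .with_bucket_name(bucket)
        .with_region(std::env::var("AWS_REGION").unwrap())
        .with_endpoint(std::env::var("AWS_ENDPOINT_URL").unwrap())
        .with_access_key_id(std::env::var("AWS_ACCESS_KEY_ID").unwrap())
        .with_secret_access_key(std::env::var("AWS_SECRET_ACCESS_KEY").unwrap())
        .build()
        .map_err(|_| ColumnQError::MissingOption)?;
    dfctx
        .runtime_env()
        .object_store_registry
        .register_store("s3", bucket, Arc::new(s3));
    Ok(())
}

// Hypothetical signature change: the loader borrows the shared context and
// uses it for schema inference instead of creating its own SessionContext.
pub async fn to_datafusion_table(
    dfctx: &SessionContext,
    t: &TableSource,
) -> Result<Arc<dyn TableProvider>, ColumnQError> {
    // ...same as before, except schema inference becomes
    // `options.infer_schema(&dfctx.state(), &table_url).await?`
    // and no local context or second register_store call is needed.
    todo!()
}
```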