-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Description
What happens?
If a record batch is created by slicing another record batch, its boolean columns do not appear to be ingested correctly by DuckDB. I have made a simple test that uploads a record batch into DuckDB as the table "__record_batch__" and then queries it with
"select * from __record_batch__";
You would expect the record batch returned to be equal to the record batch that was put in. However, this is not always true. I have noticed that if the record batch was created by calling "Slice", the boolean columns come back inaccurate. If I make a deep copy of the record batch before putting it into the database, the result is accurate.
To Reproduce
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include <arrow/api.h>
#include <arrow/c/bridge.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>
#include <adbc.h>
// Path of the DuckDB shared library that is handed to ADBC as the "driver"
// option. The value is a placeholder and has to be edited to point at a real
// libduckdb.so before running. Declared const because it is never modified.
const std::string kDuckDbDriver = "/home/path_to/libduckdb.so";
// Evaluates EXPR (an expression yielding an arrow::Status) exactly once. If the
// status is not OK, its text is printed to stdout and the process exits with
// code 1.
//
// The body is wrapped in do/while(0) so the macro expands to one statement and
// cannot capture a following `else`. The temporary has a deliberately unusual
// name so that EXPR may itself refer to a variable called `status`.
// Invoke it with a trailing semicolon.
#define CHECK_ARROW_FATAL(EXPR)                                      \
  do {                                                               \
    if (arrow::Status check_arrow_fatal_status_ = (EXPR);            \
        !check_arrow_fatal_status_.ok()) {                           \
      std::cout << check_arrow_fatal_status_.ToString() << std::endl; \
      std::exit(1);                                                  \
    }                                                                \
  } while (0)
// Evaluates EXPR (an expression yielding an AdbcStatusCode) exactly once. If
// the code is not ADBC_STATUS_OK, the numeric code and the message held by the
// AdbcError variable named `error` are printed to stdout on their own line and
// the process exits with code 1, matching CHECK_ARROW_FATAL.
//
// Requirements at the expansion site: an `AdbcError error` must be in scope
// (it is the struct passed to the ADBC call in EXPR), and the macro must be
// followed by a semicolon. The do/while(0) wrapper makes the expansion a
// single statement so it is safe inside unbraced if/else.
#define CHECK_ADBC_FATAL(EXPR)                                             \
  do {                                                                     \
    if (AdbcStatusCode check_adbc_fatal_status_ = (EXPR);                  \
        check_adbc_fatal_status_ != ADBC_STATUS_OK) {                      \
      std::cout << "AdbcStatusCode "                                       \
                << static_cast<int>(check_adbc_fatal_status_) << " "       \
                << (error.message ? error.message : "") << std::endl;      \
      std::exit(1);                                                        \
    }                                                                      \
  } while (0)
// A RecordBatchReader that hands out one pre-built record batch exactly once.
// The first ReadNext() call yields the stored batch; every later call sets the
// output to nullptr, which is how the reader reports end-of-stream.
class ReadOneBatchReader : public arrow::RecordBatchReader {
 public:
  // Keeps a shared reference to `batch`. `batch` must not be null, because
  // schema() dereferences it unconditionally.
  explicit ReadOneBatchReader(const std::shared_ptr<arrow::RecordBatch>& batch)
      : batch_(batch) {}

  // Writes the stored batch to `*batch` on the first call and nullptr on all
  // subsequent calls. Always returns an OK status.
  arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override {
    if (already_read_) {
      *batch = nullptr;
      return arrow::Status::OK();
    }
    *batch = batch_;
    already_read_ = true;
    return arrow::Status::OK();
  }

  // Returns the schema of the stored batch.
  std::shared_ptr<arrow::Schema> schema() const override { return batch_->schema(); }

 private:
  std::shared_ptr<arrow::RecordBatch> batch_;  // the single batch to emit
  bool already_read_ = false;                  // true once batch_ has been handed out
};
arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> Query(std::shared_ptr<arrow::RecordBatch>& batch, AdbcConnection connection) noexcept {
std::string query = "select * from __record_batch__";
ArrowArrayStream array;
auto status =
arrow::ExportRecordBatchReader(std::make_shared<ReadOneBatchReader>(batch), &array);
if (!status.ok()) {
return status;
}
auto clean = [array_ptr = &array, &connection]() {
AdbcStatement statement = {};
AdbcError error = {};
CHECK_ADBC_FATAL(AdbcStatementNew(&connection, &statement, &error));
CHECK_ADBC_FATAL(AdbcStatementSetSqlQuery(
&statement, "drop table __record_batch__", &error));
CHECK_ADBC_FATAL(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error));
CHECK_ADBC_FATAL(AdbcStatementRelease(&statement, &error));
if (array_ptr->release) {
array_ptr->release(array_ptr);
}
};
AdbcError error = {};
AdbcStatement statement = {};
CHECK_ADBC_FATAL(AdbcStatementNew(&connection, &statement, &error));
CHECK_ADBC_FATAL(AdbcStatementSetOption(
&statement, ADBC_INGEST_OPTION_TARGET_TABLE, "__record_batch__", &error));
CHECK_ADBC_FATAL(AdbcStatementBindStream(&statement, &array, &error));
CHECK_ADBC_FATAL(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error));
CHECK_ADBC_FATAL(AdbcStatementRelease(&statement, &error));
statement = {};
CHECK_ADBC_FATAL(AdbcStatementNew(&connection, &statement, &error));
auto adbc_status = AdbcStatementSetSqlQuery(&statement, query.c_str(), &error);
if (adbc_status != ADBC_STATUS_OK) {
clean();
return arrow::Status::UnknownError(error.message ? error.message : "");
}
ArrowArrayStream array_result;
int64_t rows_affected;
adbc_status =
AdbcStatementExecuteQuery(&statement, &array_result, &rows_affected, &error);
if (adbc_status != ADBC_STATUS_OK) {
clean();
AdbcStatementRelease(&statement, &error);
return arrow::Status::UnknownError(error.message ? error.message : "");
}
CHECK_ADBC_FATAL(AdbcStatementRelease(&statement, &error));
auto reader_result = arrow::ImportRecordBatchReader(&array_result);
clean();
return reader_result;
}
int main() {
arrow::MemoryPool* pool = arrow::default_memory_pool();
// Create Builders for each column
arrow::StringBuilder date_builder(pool);
arrow::Int64Builder time_builder(pool);
arrow::DictionaryBuilder<arrow::StringType> source_builder(pool);
arrow::Int32Builder channel_builder(pool);
arrow::DictionaryBuilder<arrow::StringType> b_symbol_builder(pool);
arrow::DictionaryBuilder<arrow::StringType> symbol_builder(pool);
arrow::DictionaryBuilder<arrow::StringType> prime_symbol_builder(pool);
arrow::Int64Builder event_id_builder(pool);
arrow::BooleanBuilder bool_0_builder(pool);
arrow::BooleanBuilder bool_1_builder(pool);
arrow::BooleanBuilder is_string_builder(pool);
arrow::Int64Builder id_builder(pool);
arrow::Int32Builder qty_builder(pool);
arrow::Int32Builder b_total_qty_builder(pool);
arrow::DoubleBuilder price_builder(pool);
arrow::Int32Builder qty_3_builder(pool);
arrow::BooleanBuilder bool4_builder(pool);
arrow::BooleanBuilder is_stop_builder(pool);
// Populate with the example data
std::vector<std::string> dates = {"2024-10-14", "2024-10-14", "2024-10-14", "2024-10-14", "2024-10-14",
"2024-10-14", "2024-10-14", "2024-10-14", "2024-10-14", "2024-10-14"};
std::vector<int64_t> times = {1728856800040503141, 1728856800040503141, 1728856800040503141, 1728856800052961261,
1728856800055141669, 1728856800063910245, 1728856800074775351, 1728856800080599547,
1728856800086024633, 1728856800097860063};
std::vector<std::string> sources = {"SOURCE", "SOURCE", "SOURCE", "SOURCE", "SOURCE",
"SOURCE", "SOURCE", "SOURCE", "SOURCE", "SOURCE"};
std::vector<int32_t> channels = {207, 207, 207, 207, 207, 207, 207, 207, 207, 207};
std::vector<std::string> b_symbols = {"symbol", "symbol", "symbol", "symbol", "symbol",
"symbol", "symbol", "symbol", "symbol", "symbol"};
std::vector<std::string> symbols = {"symbol", "symbol", "symbol", "symbol", "symbol",
"symbol", "symbol", "symbol", "symbol", "symbol"};
std::vector<std::string> prime_symbols = {"STRING@0", "STRING@0", "STRING@0", "STRING@0", "STRING@0",
"STRING@0", "STRING@0", "STRING@0", "STRING@0", "STRING@0"};
std::vector<int64_t> event_ids = {733, 733, 733, 755, 793, 803, 810, 815, 818, 825};
std::vector<bool> bool_0s = {false, false, false, true, true, true, true, false, false, true};
std::vector<bool> bool_1 = {false, false, false, false, false, false, false, false, false, false};
std::vector<bool> is_string = {false, false, false, false, false, false, false, false, false, false};
std::vector<int64_t> ids = {6413949158507, 6413949158507, 6413949158507, 6413949158776,
6413949158812, 6413949158817, 6413949158821, 6413949158825,
6413949158826, 6413949158833};
std::vector<int32_t> qtys = {1, 3, 4, 1, 1, 2, 1, 1, 1, 1};
std::vector<int32_t> qty_2 = {8, 8, 8, 1, 1, 2, 1, 1, 1, 1};
std::vector<double> prices = {585450.0, 585475.0, 585500.0, 585425.0, 585425.0,
585425.0, 585425.0, 585450.0, 585450.0, 585425.0};
std::vector<int32_t> qty_3s = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
std::vector<bool> bool4 = {true, true, true, false, false, false, false, false, false, false};
std::vector<bool> is_stop = {false, false, false, false, false, false, false, false, false, false};
// Insert values into builders
for (size_t i = 0; i < dates.size(); ++i) {
(void)date_builder.Append(dates[i]);
(void)time_builder.Append(times[i]);
(void)source_builder.Append(sources[i]);
(void)channel_builder.Append(channels[i]);
(void)b_symbol_builder.Append(b_symbols[i]);
(void)symbol_builder.Append(symbols[i]);
(void)prime_symbol_builder.Append(prime_symbols[i]);
(void)event_id_builder.Append(event_ids[i]);
(void)bool_0_builder.Append(bool_0s[i]);
(void)bool_1_builder.Append(bool_1[i]);
(void)is_string_builder.Append(is_string[i]);
(void)id_builder.Append(ids[i]);
(void)qty_builder.Append(qtys[i]);
(void)b_total_qty_builder.Append(qty_2[i]);
(void)price_builder.Append(prices[i]);
(void)qty_3_builder.Append(qty_3s[i]);
(void)bool4_builder.Append(bool4[i]);
(void)is_stop_builder.Append(is_stop[i]);
}
// Finish builders to create Arrays
std::shared_ptr<arrow::Array> date_array;
std::shared_ptr<arrow::Array> time_array;
std::shared_ptr<arrow::Array> source_array;
std::shared_ptr<arrow::Array> channel_array;
std::shared_ptr<arrow::Array> b_symbol_array;
std::shared_ptr<arrow::Array> symbol_array;
std::shared_ptr<arrow::Array> prime_symbol_array;
std::shared_ptr<arrow::Array> event_id_array;
std::shared_ptr<arrow::Array> bool_0_array;
std::shared_ptr<arrow::Array> bool_1_array;
std::shared_ptr<arrow::Array> is_string_array;
std::shared_ptr<arrow::Array> id_array;
std::shared_ptr<arrow::Array> qty_array;
std::shared_ptr<arrow::Array> b_total_qty_array;
std::shared_ptr<arrow::Array> price_array;
std::shared_ptr<arrow::Array> qty_3_array;
std::shared_ptr<arrow::Array> bool4_array;
std::shared_ptr<arrow::Array> is_stop_array;
(void)date_builder.Finish(&date_array);
(void)time_builder.Finish(&time_array);
(void)source_builder.Finish(&source_array);
(void)channel_builder.Finish(&channel_array);
(void)b_symbol_builder.Finish(&b_symbol_array);
(void)symbol_builder.Finish(&symbol_array);
(void)prime_symbol_builder.Finish(&prime_symbol_array);
(void)event_id_builder.Finish(&event_id_array);
(void)bool_0_builder.Finish(&bool_0_array);
(void)bool_1_builder.Finish(&bool_1_array);
(void)is_string_builder.Finish(&is_string_array);
(void)id_builder.Finish(&id_array);
(void)qty_builder.Finish(&qty_array);
(void)b_total_qty_builder.Finish(&b_total_qty_array);
(void)price_builder.Finish(&price_array);
(void)qty_3_builder.Finish(&qty_3_array);
(void)bool4_builder.Finish(&bool4_array);
(void)is_stop_builder.Finish(&is_stop_array);
// Create Schema
std::vector<std::shared_ptr<arrow::Field>> schema_fields = {
arrow::field("Date", arrow::utf8()),
arrow::field("Time", arrow::int64()),
arrow::field("Source", arrow::dictionary(arrow::int32(), arrow::utf8())),
arrow::field("Channel", arrow::int32()),
arrow::field("bSymbol", arrow::dictionary(arrow::int32(), arrow::utf8())),
arrow::field("Symbol", arrow::dictionary(arrow::int32(), arrow::utf8())),
arrow::field("PrimeSymbol", arrow::dictionary(arrow::int32(), arrow::utf8())),
arrow::field("EventId", arrow::int64()),
arrow::field("bool0", arrow::boolean()),
arrow::field("bool1", arrow::boolean()),
arrow::field("Isstring", arrow::boolean()),
arrow::field("aId", arrow::int64()),
arrow::field("Qty", arrow::int32()),
arrow::field("bTotalQty", arrow::int32()),
arrow::field("Price", arrow::float64()),
arrow::field("Qty3", arrow::int32()),
arrow::field("bool4", arrow::boolean()),
arrow::field("IsStop", arrow::boolean())
};
auto schema = std::make_shared<arrow::Schema>(schema_fields);
// Create RecordBatch
auto record_batch = arrow::RecordBatch::Make(schema, dates.size(),
{date_array, time_array, source_array, channel_array,
b_symbol_array, symbol_array, prime_symbol_array, event_id_array,
bool_0_array, bool_1_array, is_string_array, id_array,
qty_array, b_total_qty_array, price_array, qty_3_array,
bool4_array, is_stop_array});
AdbcError error = {};
AdbcDatabase database;
assert(AdbcDatabaseNew(&database, &error) == 0);
assert(AdbcDatabaseSetOption(&database, "driver", kDuckDbDriver.c_str(), &error) == 0);
assert(AdbcDatabaseSetOption(&database, "entrypoint", "duckdb_adbc_init", &error) == 0);
assert(AdbcDatabaseInit(&database, &error) == 0);
AdbcConnection connection;
assert(AdbcConnectionNew(&connection, &error) == 0);
assert(AdbcConnectionInit(&connection, &database, &error) == 0);
auto temp_record = record_batch->Slice(4,1);
auto res = *(*arrow::Table::FromRecordBatchReader(((*Query(temp_record, connection)).get())))->CombineChunksToBatch();
std::cout << "-----------------temp_record-----------------" << std::endl;
std::cout << temp_record->ToString()<< std::endl;
std::cout << "-----------------res-----------------" << std::endl;
std::cout << res->ToString()<< std::endl;
assert(res->Equals(*record_batch));
return 0;
}
You will see that column bool0 in res is different from column bool0 in temp_record.
OS:
rhel8
DuckDB Version:
1.1.2
DuckDB Client:
c++, libduckdb.so
Hardware:
intel
Full Name:
Maxwell Gomez
Affiliation:
Trading Firm (please DM for name)
What is the latest build you tested with? If possible, we recommend testing with the latest nightly build.
I have tested with a stable release
Did you include all relevant data sets for reproducing the issue?
No - Other reason (please specify in the issue body)
Did you include all code required to reproduce the issue?
- Yes, I have
Did you include all relevant configuration (e.g., CPU architecture, Python version, Linux distribution) to reproduce the issue?
- Yes, I have