Skip to content

DuckDb does not insert arrow correctly if the record batch has been sliced #14386

@mg-jump

Description

@mg-jump

What happens?

If a record batch is created from slicing a different record batch it seems that boolean columns are not created correctly in duckdb. I have made a simple test where we upload a record batch into duck db as table "record_batch" then we can query

"select * from __record_batch__";

You would expect the record batch returned to be equal to the record batch put in. However, this is not always true. have noticed if the record batch was created by calling "slice" that the booelean columns seem to be in-accurate. If i do a deep copy of the record batch before putting into the db it is accurate.

To Reproduce

#include <cstdlib>
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/ipc/api.h>
#include <arrow/c/bridge.h>
#include <adbc.h>
#include <iostream>
std::string kDuckDbDriver = "/home/path_to/libduckdb.so";

#define CHECK_ARROW_FATAL(EXPR)                      \
  if (arrow::Status status = (EXPR); !status.ok()) { \
    std::cout << status.ToString() <<std::endl;       \
    exit(1);                                         \
  }

#define CHECK_ADBC_FATAL(EXPR)                                    \
  if (AdbcStatusCode status = (EXPR); status != ADBC_STATUS_OK) { \
    std::cout <<                          \
        "AdbcStatusCode " <<(int)status << " "<<(error.message ? error.message : ""); \
  }



class ReadOneBatchReader : public arrow::RecordBatchReader {
 public:
  ReadOneBatchReader(const std::shared_ptr<arrow::RecordBatch>& batch) : batch_(batch) {}
  virtual arrow::Status ReadNext(std::shared_ptr<arrow::RecordBatch>* batch) override {
    if (already_read) {
      *batch = nullptr;
      return arrow::Status::OK();
    }
    *batch = batch_;
    already_read = true;
    return arrow::Status::OK();
  }
  std::shared_ptr<arrow::Schema> schema() const override { return batch_->schema(); }

 private:
  std::shared_ptr<arrow::RecordBatch> batch_;
  bool already_read = false;
};

arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> Query(std::shared_ptr<arrow::RecordBatch>& batch, AdbcConnection connection) noexcept {
  std::string query = "select * from __record_batch__";
  ArrowArrayStream array;
  auto status =
      arrow::ExportRecordBatchReader(std::make_shared<ReadOneBatchReader>(batch), &array);
  if (!status.ok()) {
    return status;
  }
  auto clean = [array_ptr = &array, &connection]() {
    AdbcStatement statement = {};
    AdbcError error = {};
    CHECK_ADBC_FATAL(AdbcStatementNew(&connection, &statement, &error));
    CHECK_ADBC_FATAL(AdbcStatementSetSqlQuery(
        &statement, "drop table __record_batch__", &error));
    CHECK_ADBC_FATAL(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error));
    CHECK_ADBC_FATAL(AdbcStatementRelease(&statement, &error));
    if (array_ptr->release) {
      array_ptr->release(array_ptr);
    }
  };
  AdbcError error = {};
  AdbcStatement statement = {};
  CHECK_ADBC_FATAL(AdbcStatementNew(&connection, &statement, &error));
  CHECK_ADBC_FATAL(AdbcStatementSetOption(
      &statement, ADBC_INGEST_OPTION_TARGET_TABLE, "__record_batch__", &error));
  CHECK_ADBC_FATAL(AdbcStatementBindStream(&statement, &array, &error));
  CHECK_ADBC_FATAL(AdbcStatementExecuteQuery(&statement, nullptr, nullptr, &error));
  CHECK_ADBC_FATAL(AdbcStatementRelease(&statement, &error));

  statement = {};
  CHECK_ADBC_FATAL(AdbcStatementNew(&connection, &statement, &error));
  auto adbc_status = AdbcStatementSetSqlQuery(&statement, query.c_str(), &error);
  if (adbc_status != ADBC_STATUS_OK) {
    clean();
    return arrow::Status::UnknownError(error.message ? error.message : "");
  }
  ArrowArrayStream array_result;
  int64_t rows_affected;
  adbc_status =
      AdbcStatementExecuteQuery(&statement, &array_result, &rows_affected, &error);
  if (adbc_status != ADBC_STATUS_OK) {
    clean();
    AdbcStatementRelease(&statement, &error);
    return arrow::Status::UnknownError(error.message ? error.message : "");
  }
  CHECK_ADBC_FATAL(AdbcStatementRelease(&statement, &error));
  auto reader_result = arrow::ImportRecordBatchReader(&array_result);
  clean();
  return reader_result;
}

int main() {
    arrow::MemoryPool* pool = arrow::default_memory_pool();

    // Create Builders for each column
    arrow::StringBuilder date_builder(pool);
    arrow::Int64Builder time_builder(pool);
    arrow::DictionaryBuilder<arrow::StringType> source_builder(pool);
    arrow::Int32Builder channel_builder(pool);
    arrow::DictionaryBuilder<arrow::StringType> b_symbol_builder(pool);
    arrow::DictionaryBuilder<arrow::StringType> symbol_builder(pool);
    arrow::DictionaryBuilder<arrow::StringType> prime_symbol_builder(pool);
    arrow::Int64Builder event_id_builder(pool);
    arrow::BooleanBuilder bool_0_builder(pool);
    arrow::BooleanBuilder bool_1_builder(pool);
    arrow::BooleanBuilder is_string_builder(pool);
    arrow::Int64Builder id_builder(pool);
    arrow::Int32Builder qty_builder(pool);
    arrow::Int32Builder b_total_qty_builder(pool);
    arrow::DoubleBuilder price_builder(pool);
    arrow::Int32Builder qty_3_builder(pool);
    arrow::BooleanBuilder bool4_builder(pool);
    arrow::BooleanBuilder is_stop_builder(pool);

    // Populate with the example data
    std::vector<std::string> dates = {"2024-10-14", "2024-10-14", "2024-10-14", "2024-10-14", "2024-10-14", 
                                            "2024-10-14", "2024-10-14", "2024-10-14", "2024-10-14", "2024-10-14"};
    std::vector<int64_t> times = {1728856800040503141, 1728856800040503141, 1728856800040503141, 1728856800052961261, 
                                  1728856800055141669, 1728856800063910245, 1728856800074775351, 1728856800080599547, 
                                  1728856800086024633, 1728856800097860063};
    std::vector<std::string> sources = {"SOURCE", "SOURCE", "SOURCE", "SOURCE", "SOURCE", 
                                             "SOURCE", "SOURCE", "SOURCE", "SOURCE", "SOURCE"};
    std::vector<int32_t> channels = {207, 207, 207, 207, 207, 207, 207, 207, 207, 207};
    std::vector<std::string> b_symbols = {"symbol", "symbol", "symbol", "symbol", "symbol", 
                                                  "symbol", "symbol", "symbol", "symbol", "symbol"};
    std::vector<std::string> symbols = {"symbol", "symbol", "symbol", "symbol", "symbol", 
                                        "symbol", "symbol", "symbol", "symbol", "symbol"};
    std::vector<std::string> prime_symbols = {"STRING@0", "STRING@0", "STRING@0", "STRING@0", "STRING@0", 
                                              "STRING@0", "STRING@0", "STRING@0", "STRING@0", "STRING@0"};
    std::vector<int64_t> event_ids = {733, 733, 733, 755, 793, 803, 810, 815, 818, 825};
    std::vector<bool> bool_0s = {false, false, false, true, true, true, true, false, false, true};
    std::vector<bool> bool_1 = {false, false, false, false, false, false, false, false, false, false};
    std::vector<bool> is_string = {false, false, false, false, false, false, false, false, false, false};
    std::vector<int64_t> ids = {6413949158507, 6413949158507, 6413949158507, 6413949158776, 
                                      6413949158812, 6413949158817, 6413949158821, 6413949158825, 
                                      6413949158826, 6413949158833};
    std::vector<int32_t> qtys = {1, 3, 4, 1, 1, 2, 1, 1, 1, 1};
    std::vector<int32_t> qty_2 = {8, 8, 8, 1, 1, 2, 1, 1, 1, 1};
    std::vector<double> prices = {585450.0, 585475.0, 585500.0, 585425.0, 585425.0, 
                                  585425.0, 585425.0, 585450.0, 585450.0, 585425.0};
    std::vector<int32_t> qty_3s = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
    std::vector<bool> bool4 = {true, true, true, false, false, false, false, false, false, false};
    std::vector<bool> is_stop = {false, false, false, false, false, false, false, false, false, false};

    // Insert values into builders
    for (size_t i = 0; i < dates.size(); ++i) {
        (void)date_builder.Append(dates[i]);
        (void)time_builder.Append(times[i]);
        (void)source_builder.Append(sources[i]);
        (void)channel_builder.Append(channels[i]);
        (void)b_symbol_builder.Append(b_symbols[i]);
        (void)symbol_builder.Append(symbols[i]);
        (void)prime_symbol_builder.Append(prime_symbols[i]);
        (void)event_id_builder.Append(event_ids[i]);
        (void)bool_0_builder.Append(bool_0s[i]);
        (void)bool_1_builder.Append(bool_1[i]);
        (void)is_string_builder.Append(is_string[i]);
        (void)id_builder.Append(ids[i]);
        (void)qty_builder.Append(qtys[i]);
        (void)b_total_qty_builder.Append(qty_2[i]);
        (void)price_builder.Append(prices[i]);
        (void)qty_3_builder.Append(qty_3s[i]);
        (void)bool4_builder.Append(bool4[i]);
        (void)is_stop_builder.Append(is_stop[i]);
    }

    // Finish builders to create Arrays
    std::shared_ptr<arrow::Array> date_array;
    std::shared_ptr<arrow::Array> time_array;
    std::shared_ptr<arrow::Array> source_array;
    std::shared_ptr<arrow::Array> channel_array;
    std::shared_ptr<arrow::Array> b_symbol_array;
    std::shared_ptr<arrow::Array> symbol_array;
    std::shared_ptr<arrow::Array> prime_symbol_array;
    std::shared_ptr<arrow::Array> event_id_array;
    std::shared_ptr<arrow::Array> bool_0_array;
    std::shared_ptr<arrow::Array> bool_1_array;
    std::shared_ptr<arrow::Array> is_string_array;
    std::shared_ptr<arrow::Array> id_array;
    std::shared_ptr<arrow::Array> qty_array;
    std::shared_ptr<arrow::Array> b_total_qty_array;
    std::shared_ptr<arrow::Array> price_array;
    std::shared_ptr<arrow::Array> qty_3_array;
    std::shared_ptr<arrow::Array> bool4_array;
    std::shared_ptr<arrow::Array> is_stop_array;

    (void)date_builder.Finish(&date_array);
    (void)time_builder.Finish(&time_array);
    (void)source_builder.Finish(&source_array);
    (void)channel_builder.Finish(&channel_array);
    (void)b_symbol_builder.Finish(&b_symbol_array);
    (void)symbol_builder.Finish(&symbol_array);
    (void)prime_symbol_builder.Finish(&prime_symbol_array);
    (void)event_id_builder.Finish(&event_id_array);
    (void)bool_0_builder.Finish(&bool_0_array);
    (void)bool_1_builder.Finish(&bool_1_array);
    (void)is_string_builder.Finish(&is_string_array);
    (void)id_builder.Finish(&id_array);
    (void)qty_builder.Finish(&qty_array);
    (void)b_total_qty_builder.Finish(&b_total_qty_array);
    (void)price_builder.Finish(&price_array);
    (void)qty_3_builder.Finish(&qty_3_array);
    (void)bool4_builder.Finish(&bool4_array);
    (void)is_stop_builder.Finish(&is_stop_array);

    // Create Schema
    std::vector<std::shared_ptr<arrow::Field>> schema_fields = {
        arrow::field("Date", arrow::utf8()),
        arrow::field("Time", arrow::int64()),
        arrow::field("Source", arrow::dictionary(arrow::int32(), arrow::utf8())),
        arrow::field("Channel", arrow::int32()),
        arrow::field("bSymbol", arrow::dictionary(arrow::int32(), arrow::utf8())),
        arrow::field("Symbol", arrow::dictionary(arrow::int32(), arrow::utf8())),
        arrow::field("PrimeSymbol", arrow::dictionary(arrow::int32(), arrow::utf8())),
        arrow::field("EventId", arrow::int64()),
        arrow::field("bool0", arrow::boolean()),
        arrow::field("bool1", arrow::boolean()),
        arrow::field("Isstring", arrow::boolean()),
        arrow::field("aId", arrow::int64()),
        arrow::field("Qty", arrow::int32()),
        arrow::field("bTotalQty", arrow::int32()),
        arrow::field("Price", arrow::float64()),
        arrow::field("Qty3", arrow::int32()),
        arrow::field("bool4", arrow::boolean()),
        arrow::field("IsStop", arrow::boolean())
    };

    auto schema = std::make_shared<arrow::Schema>(schema_fields);

    // Create RecordBatch
    auto record_batch = arrow::RecordBatch::Make(schema, dates.size(),
                                                 {date_array, time_array, source_array, channel_array,
                                                  b_symbol_array, symbol_array, prime_symbol_array, event_id_array,
                                                  bool_0_array, bool_1_array, is_string_array, id_array,
                                                  qty_array, b_total_qty_array, price_array, qty_3_array,
                                                  bool4_array, is_stop_array});

  AdbcError error = {};
  AdbcDatabase database;
  assert(AdbcDatabaseNew(&database, &error) == 0);
  assert(AdbcDatabaseSetOption(&database, "driver", kDuckDbDriver.c_str(), &error) == 0);
  assert(AdbcDatabaseSetOption(&database, "entrypoint", "duckdb_adbc_init", &error) == 0);
  assert(AdbcDatabaseInit(&database, &error) == 0);
  AdbcConnection connection;
  assert(AdbcConnectionNew(&connection, &error) == 0);
  assert(AdbcConnectionInit(&connection, &database, &error) == 0);

  auto temp_record = record_batch->Slice(4,1);
  auto res = *(*arrow::Table::FromRecordBatchReader(((*Query(temp_record, connection)).get())))->CombineChunksToBatch();
  std::cout << "-----------------temp_record-----------------" << std::endl;
  std::cout << temp_record->ToString()<< std::endl;
  std::cout << "-----------------res-----------------" << std::endl;
  std::cout << res->ToString()<< std::endl;
  assert(res->Equals(*record_batch));
  return 0;
}

you will see that column bool0 is different in temp_record compared to res

OS:

rhel8

DuckDB Version:

1.1.2

DuckDB Client:

c++, libduckdb.so

Hardware:

intel

Full Name:

Maxwell Gomez

Affiliation:

Trading Firm (please DM for name)

What is the latest build you tested with? If possible, we recommend testing with the latest nightly build.

I have tested with a stable release

Did you include all relevant data sets for reproducing the issue?

No - Other reason (please specify in the issue body)

Did you include all code required to reproduce the issue?

  • Yes, I have

Did you include all relevant configuration (e.g., CPU architecture, Python version, Linux distribution) to reproduce the issue?

  • Yes, I have

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions