diff --git a/extension/parquet/include/parquet_bss_encoder.hpp b/extension/parquet/include/parquet_bss_encoder.hpp index 49b1ab05b4..78f75a0c75 100644 --- a/extension/parquet/include/parquet_bss_encoder.hpp +++ b/extension/parquet/include/parquet_bss_encoder.hpp @@ -30,7 +30,7 @@ public: } void FinishWrite(WriteStream &writer) { - D_ASSERT(count == total_value_count); + D_ASSERT(count <= total_value_count); writer.WriteData(buffer.get(), total_value_count * bit_width); } diff --git a/extension/parquet/include/parquet_dlba_encoder.hpp b/extension/parquet/include/parquet_dlba_encoder.hpp index ef7d19f0cf..f9c1964e16 100644 --- a/extension/parquet/include/parquet_dlba_encoder.hpp +++ b/extension/parquet/include/parquet_dlba_encoder.hpp @@ -33,9 +33,8 @@ public: } void FinishWrite(WriteStream &writer) { - D_ASSERT(stream->GetPosition() == total_string_size); dbp_encoder.FinishWrite(writer); - writer.WriteData(buffer.get(), total_string_size); + writer.WriteData(stream->GetData(), stream->GetPosition()); } private: diff --git a/extension/parquet/include/writer/boolean_column_writer.hpp b/extension/parquet/include/writer/boolean_column_writer.hpp index f513f15a57..6c650cb030 100644 --- a/extension/parquet/include/writer/boolean_column_writer.hpp +++ b/extension/parquet/include/writer/boolean_column_writer.hpp @@ -24,7 +24,7 @@ public: void WriteVector(WriteStream &temp_writer, ColumnWriterStatistics *stats_p, ColumnWriterPageState *state_p, Vector &input_column, idx_t chunk_start, idx_t chunk_end) override; - unique_ptr InitializePageState(PrimitiveColumnWriterState &state) override; + unique_ptr InitializePageState(PrimitiveColumnWriterState &state, idx_t page_idx) override; void FlushPageState(WriteStream &temp_writer, ColumnWriterPageState *state_p) override; idx_t GetRowSize(const Vector &vector, const idx_t index, const PrimitiveColumnWriterState &state) const override; diff --git a/extension/parquet/include/writer/enum_column_writer.hpp b/extension/parquet/include/writer/enum_column_writer.hpp index 
724bfab6de..ab4772eb23 100644 --- a/extension/parquet/include/writer/enum_column_writer.hpp +++ b/extension/parquet/include/writer/enum_column_writer.hpp @@ -28,7 +28,7 @@ public: void WriteVector(WriteStream &temp_writer, ColumnWriterStatistics *stats_p, ColumnWriterPageState *page_state_p, Vector &input_column, idx_t chunk_start, idx_t chunk_end) override; - unique_ptr InitializePageState(PrimitiveColumnWriterState &state) override; + unique_ptr InitializePageState(PrimitiveColumnWriterState &state, idx_t page_idx) override; void FlushPageState(WriteStream &temp_writer, ColumnWriterPageState *state_p) override; diff --git a/extension/parquet/include/writer/primitive_column_writer.hpp b/extension/parquet/include/writer/primitive_column_writer.hpp index 6315efbd74..f3bea0323b 100644 --- a/extension/parquet/include/writer/primitive_column_writer.hpp +++ b/extension/parquet/include/writer/primitive_column_writer.hpp @@ -93,7 +93,7 @@ protected: virtual unique_ptr InitializeStatsState(); - //! Initialize the writer for a specific page. Only used for scalar types. - virtual unique_ptr InitializePageState(PrimitiveColumnWriterState &state); + //! Initialize the writer for the page described by state.page_info[page_idx]. Only used for scalar types. + virtual unique_ptr InitializePageState(PrimitiveColumnWriterState &state, idx_t page_idx); //! Flushes the writer for a specific page. Only used for scalar types. 
virtual void FlushPageState(WriteStream &temp_writer, ColumnWriterPageState *state); diff --git a/extension/parquet/include/writer/templated_column_writer.hpp b/extension/parquet/include/writer/templated_column_writer.hpp index 027af57fe6..2ece1ff5a6 100644 --- a/extension/parquet/include/writer/templated_column_writer.hpp +++ b/extension/parquet/include/writer/templated_column_writer.hpp @@ -126,11 +126,13 @@ public: return std::move(result); } - unique_ptr InitializePageState(PrimitiveColumnWriterState &state_p) override { + unique_ptr InitializePageState(PrimitiveColumnWriterState &state_p, + idx_t page_idx) override { auto &state = state_p.Cast>(); - - auto result = make_uniq>(state.total_value_count, state.total_string_size, - state.encoding, state.dictionary); + // Size the page state from this page's entries rather than state.total_value_count (which spans all pages); NULL rows are skipped in WriteVector, so fewer values may actually be written. + const auto &page_info = state_p.page_info[page_idx]; + auto result = make_uniq>( + page_info.row_count - page_info.empty_count, state.total_string_size, state.encoding, state.dictionary); return std::move(result); } diff --git a/extension/parquet/writer/boolean_column_writer.cpp b/extension/parquet/writer/boolean_column_writer.cpp index bcfd78b3ea..3432eb2ed1 100644 --- a/extension/parquet/writer/boolean_column_writer.cpp +++ b/extension/parquet/writer/boolean_column_writer.cpp @@ -86,7 +86,8 @@ void BooleanColumnWriter::WriteVector(WriteStream &temp_writer, ColumnWriterStat } } -unique_ptr BooleanColumnWriter::InitializePageState(PrimitiveColumnWriterState &state) { +unique_ptr BooleanColumnWriter::InitializePageState(PrimitiveColumnWriterState &state, + idx_t page_idx) { return make_uniq(); } diff --git a/extension/parquet/writer/enum_column_writer.cpp b/extension/parquet/writer/enum_column_writer.cpp index 8518019efe..c6ae42827a 100644 --- a/extension/parquet/writer/enum_column_writer.cpp +++ 
b/extension/parquet/writer/enum_column_writer.cpp @@ -65,7 +65,8 @@ void EnumColumnWriter::WriteVector(WriteStream &temp_writer, ColumnWriterStatist } } -unique_ptr EnumColumnWriter::InitializePageState(PrimitiveColumnWriterState &state) { +unique_ptr EnumColumnWriter::InitializePageState(PrimitiveColumnWriterState &state, + idx_t page_idx) { return make_uniq(bit_width); } diff --git a/extension/parquet/writer/primitive_column_writer.cpp b/extension/parquet/writer/primitive_column_writer.cpp index 9e3515de9d..e1f54ca880 100644 --- a/extension/parquet/writer/primitive_column_writer.cpp +++ b/extension/parquet/writer/primitive_column_writer.cpp @@ -28,7 +28,8 @@ void PrimitiveColumnWriter::RegisterToRowGroup(duckdb_parquet::RowGroup &row_gro row_group.columns.push_back(std::move(column_chunk)); } -unique_ptr PrimitiveColumnWriter::InitializePageState(PrimitiveColumnWriterState &state) { +unique_ptr PrimitiveColumnWriter::InitializePageState(PrimitiveColumnWriterState &state, + idx_t page_idx) { return nullptr; } @@ -114,7 +115,7 @@ void PrimitiveColumnWriter::BeginWrite(ColumnWriterState &state_p) { MaxValue(NextPowerOfTwo(page_info.estimated_page_size), MemoryStream::DEFAULT_INITIAL_CAPACITY)); write_info.write_count = page_info.empty_count; write_info.max_write_count = page_info.row_count; - write_info.page_state = InitializePageState(state); + write_info.page_state = InitializePageState(state, page_idx); write_info.compressed_size = 0; write_info.compressed_data = nullptr;