-
Notifications
You must be signed in to change notification settings - Fork 18
Closed
Description
Current Behavior
[2025/05/20 08:13:26.733 +00:00] [DEBUG] [pipeline/flow_graph_dd_node.go:206] ["DDNode receive insert messages"] [segmentID=458133418831953688] [channel=cdc-test-downstream-49-rootcoord-dml_6_458133418831953657v0] [numRows=919] [startPosTs=458156517462900739] [endPosTs=458156518996443139]
[2025/05/20 08:13:26.734 +00:00] [WARN] [writebuffer/write_buffer.go:711] ["failed to transfer insert msg to insert data"] [error="field bm25_sparse not found when converting insert msg to insert data: field not found[field=111]"] [errorVerbose="field bm25_sparse not found when converting insert msg to insert data: field not found[field=111]\n(1) attached stack trace\n -- stack trace:\n | github.com/milvus-io/milvus/pkg/v2/util/merr.WrapErrFieldNotFound[...]\n | \t/workspace/source/pkg/util/merr/utils.go:1034\n | github.com/milvus-io/milvus/internal/storage.ColumnBasedInsertMsgToInsertData\n | \t/workspace/source/internal/storage/utils.go:512\n | github.com/milvus-io/milvus/internal/storage.InsertMsgToInsertData\n | \t/workspace/source/internal/storage/utils.go:701\n | github.com/milvus-io/milvus/internal/flushcommon/writebuffer.PrepareInsert\n | \t/workspace/source/internal/flushcommon/writebuffer/write_buffer.go:709\n | github.com/milvus-io/milvus/internal/flushcommon/pipeline.(*writeNode).Operate\n | \t/workspace/source/internal/flushcommon/pipeline/flow_graph_write_node.go:86\n | github.com/milvus-io/milvus/internal/util/flowgraph.(*nodeCtxManager).workNodeStart\n | \t/workspace/source/internal/util/flowgraph/node.go:131\n | runtime.goexit\n | \t/usr/local/go/src/runtime/asm_amd64.s:1700\nWraps: (2) field bm25_sparse not found when converting insert msg to insert data\nWraps: (3) field not found[field=111]\nError types: (1) *withstack.withStack (2) *errutil.withPrefix (3) merr.milvusError"]
[2025/05/20 08:13:26.734 +00:00] [ERROR] [pipeline/flow_graph_write_node.go:88] ["failed to prepare data"] [error="field bm25_sparse not found when converting insert msg to insert data: field not found[field=111]"] [errorVerbose="field bm25_sparse not found when converting insert msg to insert data: field not found[field=111]\n(1) attached stack trace\n -- stack trace:\n | github.com/milvus-io/milvus/pkg/v2/util/merr.WrapErrFieldNotFound[...]\n | \t/workspace/source/pkg/util/merr/utils.go:1034\n | github.com/milvus-io/milvus/internal/storage.ColumnBasedInsertMsgToInsertData\n | \t/workspace/source/internal/storage/utils.go:512\n | github.com/milvus-io/milvus/internal/storage.InsertMsgToInsertData\n | \t/workspace/source/internal/storage/utils.go:701\n | github.com/milvus-io/milvus/internal/flushcommon/writebuffer.PrepareInsert\n | \t/workspace/source/internal/flushcommon/writebuffer/write_buffer.go:709\n | github.com/milvus-io/milvus/internal/flushcommon/pipeline.(*writeNode).Operate\n | \t/workspace/source/internal/flushcommon/pipeline/flow_graph_write_node.go:86\n | github.com/milvus-io/milvus/internal/util/flowgraph.(*nodeCtxManager).workNodeStart\n | \t/workspace/source/internal/util/flowgraph/node.go:131\n | runtime.goexit\n | \t/usr/local/go/src/runtime/asm_amd64.s:1700\nWraps: (2) field bm25_sparse not found when converting insert msg to insert data\nWraps: (3) field not found[field=111]\nError types: (1) *withstack.withStack (2) *errutil.withPrefix (3) merr.milvusError"] [stack="github.com/milvus-io/milvus/internal/flushcommon/pipeline.(*writeNode).Operate\n\t/workspace/source/internal/flushcommon/pipeline/flow_graph_write_node.go:88\ngithub.com/milvus-io/milvus/internal/util/flowgraph.(*nodeCtxManager).workNodeStart\n\t/workspace/source/internal/util/flowgraph/node.go:131"]
panic: field bm25_sparse not found when converting insert msg to insert data: field not found[field=111]
goroutine 507 gp=0xc00110ba40 m=9 mp=0xc0007c2008 [running]:
panic({0x5de91e0?, 0xc001a709f0?})
/usr/local/go/src/runtime/panic.go:811 +0x168 fp=0xc00805baa8 sp=0xc00805b9f8 pc=0x1f58a08
github.com/milvus-io/milvus/internal/flushcommon/pipeline.(*writeNode).Operate(0xc001889320, {0xc0011fab50?, 0xc001a709c0?, 0xf?})
/workspace/source/internal/flushcommon/pipeline/flow_graph_write_node.go:89 +0x10f0 fp=0xc00805bee0 sp=0xc00805baa8 pc=0x4c5eff0
github.com/milvus-io/milvus/internal/util/flowgraph.(*nodeCtxManager).workNodeStart(0xc0019499e0)
/workspace/source/internal/util/flowgraph/node.go:131 +0x26f fp=0xc00805bfc8 sp=0xc00805bee0 pc=0x4c3ff8f
github.com/milvus-io/milvus/internal/util/flowgraph.(*nodeCtxManager).Start.gowrap1()
/workspace/source/internal/util/flowgraph/node.go:95 +0x25 fp=0xc00805bfe0 sp=0xc00805bfc8 pc=0x4c3fce5
runtime.goexit({})
/usr/local/go/src/runtime/asm_amd64.s:1700 +0x1 fp=0xc00805bfe8 sp=0xc00805bfe0 pc=0x1f62501
created by github.com/milvus-io/milvus/internal/util/flowgraph.(*nodeCtxManager).Start in goroutine 460
/workspace/source/internal/util/flowgraph/node.go:95 +0x138
CDC log:
[2025/05/20 09:37:25.976 +00:00] [INFO] [reader/replicate_channel_manager.go:992] ["add collection to channel handler"] [channel_name=cdc-test-upstream-50-rootcoord-dml_0_458157790958193462v0] [collection_id=458157790958193462] [collection_name=cdc_create_task_insert_2025_05_20_17_37_24_317527] [seek_channel=cdc-test-upstream-50-rootcoord-dml_0]
[2025/05/20 09:37:25.976 +00:00] [INFO] [msgdispatcher/dispatcher.go:217] ["begin to work"] [pchannel=cdc-test-upstream-50-rootcoord-dml_0] [isMain=true]
[2025/05/20 09:37:25.976 +00:00] [INFO] [reader/replicate_channel_manager.go:975] ["start to handle the msg pack"] [channel_name=cdc-test-upstream-50-rootcoord-dml_0_458157790958193462v0]
[2025/05/20 09:37:25.976 +00:00] [INFO] [reader/replicate_channel_manager.go:1488] ["begin timestamp is 0, use end timestamp"] [target_channel=cdc-test-downstream-50-rootcoord-dml_0] [end_ts=458157941009940481] [hasValidMsg=false]
[2025/05/20 09:37:25.977 +00:00] [DEBUG] [reader/replicate_channel_manager.go:1843] ["time tick msg"] [channel=cdc-test-downstream-50-rootcoord-dml_0] [max_ts=458157941009940481]
[2025/05/20 09:37:26.203 +00:00] [INFO] [maintenance/log_msg.go:69] ["log msg duration reached target"] [module=maintenance] [startTime=2025/05/20 09:29:11.680 +00:00]
[2025/05/20 09:37:26.204 +00:00] [DEBUG] [reader/replicate_channel_manager.go:1744] ["receive msg"] [msg=Insert] [collection_name=cdc_create_task_insert_2025_05_20_17_37_24_317527] [shard_name=cdc-test-upstream-50-rootcoord-dml_0_458157790958193462v0] [data_len=2085]
[2025/05/20 09:37:26.204 +00:00] [DEBUG] [reader/replicate_channel_manager.go:1744] ["receive msg"] [msg=Insert] [collection_name=cdc_create_task_insert_2025_05_20_17_37_24_317527] [shard_name=cdc-test-upstream-50-rootcoord-dml_0_458157790958193462v0] [data_len=915]
[2025/05/20 09:37:26.204 +00:00] [DEBUG] [reader/replicate_channel_manager.go:1843] ["time tick msg"] [channel=cdc-test-downstream-50-rootcoord-dml_0] [max_ts=458157941285191682]
[2025/05/20 09:37:26.578 +00:00] [INFO] [server/cdc_impl.go:1078] ["start to replicate channel"] [channel=cdc-test-downstream-50-rootcoord-dml_0]
[2025/05/20 09:37:26.965 +00:00] [INFO] [writer/channel_writer.go:365] ["receive msg"] [type=Flush] [database=default]
[2025/05/20 09:37:26.988 +00:00] [INFO] [writer/channel_writer.go:375] ["finish to handle msg"] [type=Flush] [database=default]
[2025/05/20 09:37:27.209 +00:00] [DEBUG] [reader/replicate_channel_manager.go:1744] ["receive msg"] [msg=Insert] [collection_name=cdc_create_task_insert_2025_05_20_17_37_24_317527] [shard_name=cdc-test-upstream-50-rootcoord-dml_0_458157790958193462v0] [data_len=2472]
[2025/05/20 09:37:27.209 +00:00] [DEBUG] [reader/replicate_channel_manager.go:1744] ["receive msg"] [msg=Insert] [collection_name=cdc_create_task_insert_2025_05_20_17_37_24_317527] [shard_name=cdc-test-upstream-50-rootcoord-dml_0_458157790958193462v0] [data_len=528]
[2025/05/20 09:37:29.806 +00:00] [INFO] [writer/replicate_message_manager.go:126] ["new replicate message handler"] [channelName=cdc-test-downstream-50-rootcoord-dml_0]
[2025/05/20 09:37:29.841 +00:00] [DEBUG] [writer/channel_writer.go:286] ["replicate msg"] [channel=cdc-test-downstream-50-rootcoord-dml_0] [type=Insert] [db=default] [collection=cdc_create_task_insert_2025_05_20_17_37_24_317527] [partition=_default] [insert_data_len=2085]
[2025/05/20 09:37:29.843 +00:00] [DEBUG] [writer/channel_writer.go:286] ["replicate msg"] [channel=cdc-test-downstream-50-rootcoord-dml_0] [type=Insert] [db=default] [collection=cdc_create_task_insert_2025_05_20_17_37_24_317527] [partition=_default] [insert_data_len=915]
[2025/05/20 09:37:29.948 +00:00] [DEBUG] [writer/channel_writer.go:286] ["replicate msg"] [channel=cdc-test-downstream-50-rootcoord-dml_0] [type=Insert] [db=default] [collection=cdc_create_task_insert_2025_05_20_17_37_24_317527] [partition=_default] [insert_data_len=2472]
[2025/05/20 09:37:29.949 +00:00] [DEBUG] [writer/channel_writer.go:286] ["replicate msg"] [channel=cdc-test-downstream-50-rootcoord-dml_0] [type=Insert] [db=default] [collection=cdc_create_task_insert_2025_05_20_17_37_24_317527] [partition=_default] [insert_data_len=528]
[2025/05/20 09:37:30.208 +00:00] [INFO] [writer/channel_writer.go:365] ["receive msg"] [type=CreateIndex] [collection=cdc_create_task_insert_2025_05_20_17_37_24_317527] [database=default]
[2025/05/20 09:37:30.395 +00:00] [INFO] [writer/channel_writer.go:375] ["finish to handle msg"] [type=CreateIndex] [collection=cdc_create_task_insert_2025_05_20_17_37_24_317527] [database=default]
[2025/05/20 09:37:34.028 +00:00] [INFO] [writer/channel_writer.go:365] ["receive msg"] [type=CreateIndex] [collection=cdc_create_task_insert_2025_05_20_17_37_24_317527] [database=default]
[2025/05/20 09:37:34.067 +00:00] [INFO] [writer/channel_writer.go:375] ["finish to handle msg"] [type=CreateIndex] [collection=cdc_create_task_insert_2025_05_20_17_37_24_317527] [database=default]
[2025/05/20 09:37:34.581 +00:00] [INFO] [writer/channel_writer.go:365] ["receive msg"] [type=CreateIndex] [collection=cdc_create_task_insert_2025_05_20_17_37_24_317527] [database=default]
[2025/05/20 09:37:34.606 +00:00] [INFO] [writer/channel_writer.go:375] ["finish to handle msg"] [type=CreateIndex] [collection=cdc_create_task_insert_2025_05_20_17_37_24_317527] [database=default]
[2025/05/20 09:37:35.165 +00:00] [INFO] [writer/channel_writer.go:365] ["receive msg"] [type=CreateIndex] [collection=cdc_create_task_insert_2025_05_20_17_37_24_317527] [database=default]
[2025/05/20 09:37:35.168 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=0] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:37:50.182 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=4] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:38:30.195 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=8] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:39:06.000 +00:00] [DEBUG] [reader/replicate_channel_manager.go:1843] ["time tick msg"] [channel=cdc-test-downstream-50-rootcoord-dml_0] [max_ts=458157967447425025]
[2025/05/20 09:39:10.207 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=12] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:39:50.221 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=16] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:40:30.236 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=20] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:40:46.198 +00:00] [DEBUG] [reader/replicate_channel_manager.go:1843] ["time tick msg"] [channel=cdc-test-downstream-50-rootcoord-dml_0] [max_ts=458157993700884481]
[2025/05/20 09:41:10.249 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=24] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:41:50.263 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=28] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:42:25.999 +00:00] [DEBUG] [reader/replicate_channel_manager.go:1843] ["time tick msg"] [channel=cdc-test-downstream-50-rootcoord-dml_0] [max_ts=458158019862855682]
[2025/05/20 09:42:30.282 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=32] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:43:10.294 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=36] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:43:50.308 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=40] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:44:06.198 +00:00] [DEBUG] [reader/replicate_channel_manager.go:1843] ["time tick msg"] [channel=cdc-test-downstream-50-rootcoord-dml_0] [max_ts=458158046142791681]
[2025/05/20 09:44:30.322 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=44] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:45:10.350 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=48] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:45:45.999 +00:00] [DEBUG] [reader/replicate_channel_manager.go:1843] ["time tick msg"] [channel=cdc-test-downstream-50-rootcoord-dml_0] [max_ts=458158072291655681]
[2025/05/20 09:45:50.364 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=52] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:46:30.379 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=56] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:47:10.391 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=60] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:47:26.398 +00:00] [DEBUG] [reader/replicate_channel_manager.go:1843] ["time tick msg"] [channel=cdc-test-downstream-50-rootcoord-dml_0] [max_ts=458158098610913281]
[2025/05/20 09:47:50.402 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=64] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
[2025/05/20 09:48:30.431 +00:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=68] [error="only BM25 Function output field support BM25 metric type: invalid parameter[expected=valid index params][actual=invalid index params]"]
Test code:
@pytest.mark.parametrize("nullable", [True])
def test_cdc_sync_insert_with_full_datatype_request(self, upstream_host, upstream_port, downstream_host, downstream_port, nullable):
    """
    target: test cdc default
    method: insert entities in upstream
    expected: entities in downstream is inserted
    """
    connections.connect(host=upstream_host, port=upstream_port, token="root:Milvus")
    collection_name = prefix + "insert_" + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f')
    # Full-datatype schema, including a BM25 function output field (bm25_sparse).
    fields = [
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True, description="id"),
        FieldSchema(name="int64", dtype=DataType.INT64, nullable=nullable, description="int64"),
        FieldSchema(name="float", dtype=DataType.FLOAT, nullable=nullable, description="float"),
        FieldSchema(name="varchar", dtype=DataType.VARCHAR, max_length=1024, nullable=nullable, enable_match=True, enable_analyzer=True, description="varchar"),
        FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=1024, enable_match=True, enable_analyzer=True, description="text"),
        FieldSchema(name="json", dtype=DataType.JSON, nullable=nullable, description="json"),
        FieldSchema(name="bool", dtype=DataType.BOOL, nullable=nullable, description="bool"),
        FieldSchema(name="array", dtype=DataType.ARRAY, element_type=DataType.INT64, nullable=nullable, max_capacity=10, description="array"),
        FieldSchema(name="float_embedding", dtype=DataType.FLOAT_VECTOR, dim=128, description="float_embedding"),
        FieldSchema(name="binary_embedding", dtype=DataType.BINARY_VECTOR, dim=128, description="binary_embedding"),
        FieldSchema(name="sparse_vector", dtype=DataType.SPARSE_FLOAT_VECTOR, description="sparse_vector"),
        FieldSchema(name="bm25_sparse", dtype=DataType.SPARSE_FLOAT_VECTOR, description="bm25_sparse"),
    ]
    # bm25_sparse is generated server-side from "text"; it is never supplied in insert payloads.
    bm25_func = Function(
        name="bm25",
        function_type=FunctionType.BM25,
        input_field_names=["text"],
        output_field_names=["bm25_sparse"],
        params={})
    schema = CollectionSchema(fields, "test collection")
    schema.add_function(bm25_func)
    c = Collection(name=collection_name, schema=schema)
    log.info(f"create collection {collection_name} in upstream")
    # insert data to upstream
    nb = 3000
    data = [
        {
            "int64": i,
            "float": np.float32(i),
            "varchar": fake.sentence(),
            "text": fake.text(),
            "json": {"number": i, "varchar": str(i), "bool": bool(i)},
            "bool": bool(i),
            "array": [i for _ in range(10)],
            "float_embedding": [random.random() for _ in range(128)],
            "binary_embedding": [random.randint(0, 255) for _ in range(128//8)],
            "sparse_vector": {1: 0.5, 100: 0.3, 500: 0.8, 1024: 0.2, 5000: 0.6}
        }
        for i in range(nb)
    ]
    c.insert(data)
    # add null data (nullable fields set to None; "text" must stay non-null for BM25 input)
    data = [
        {
            "int64": None,
            "float": None,
            "varchar": None,
            "text": fake.text(),
            "json": None,
            "bool": None,
            "array": [],
            "float_embedding": [random.random() for _ in range(128)],
            "binary_embedding": [random.randint(0, 255) for _ in range(128//8)],
            "sparse_vector": {1: 0.5, 100: 0.3, 500: 0.8, 1024: 0.2, 5000: 0.6}
        }
        for i in range(nb)
    ]
    c.insert(data)
    c.flush(timeout=60)
    # One index per vector field; bm25_sparse uses the BM25 metric type.
    float_emb_index_params = {"index_type": "HNSW", "params": {"M": 128, "efConstruction": 128}, "metric_type": "L2"}
    binary_emb_index_params = {"index_type": "BIN_IVF_FLAT", "params": {"nlist": 128}, "metric_type": "HAMMING"}
    sparse_index_params = {"index_type": "SPARSE_INVERTED_INDEX", "params": {}, "metric_type": "IP"}
    bm25_index_params = {"index_type": "SPARSE_INVERTED_INDEX", "params": {"k1": 1.2, "b": 0.75}, "metric_type": "BM25"}
    c.create_index("float_embedding", float_emb_index_params)
    c.create_index("binary_embedding", binary_emb_index_params)
    c.create_index("sparse_vector", sparse_index_params)
    c.create_index("bm25_sparse", bm25_index_params)
    c.load()
    # get number of entities in upstream
    log.info(f"number of entities in upstream: {c.num_entities}")
    upstream_entities = c.num_entities
    upstream_index = [index.to_dict() for index in c.indexes]
    upstream_count = c.query(
        expr="",
        output_fields=["count(*)"]
    )[0]["count(*)"]
    assert upstream_count == upstream_entities
    # check collections in downstream
    connections.disconnect("default")
    self.connect_downstream(downstream_host, downstream_port)
    c_downstream = Collection(name=collection_name)
    timeout = 120
    t0 = time.time()
    log.info(f"all collections in downstream {list_collections()}")
    # Poll until the downstream entity count matches the upstream, or the timeout expires.
    # FIX: the original `while True and ...` condition made the inner timeout branch
    # unreachable, so the timeout-failure message could never be logged.
    synced = False
    while time.time() - t0 < timeout:
        # get the number of entities in downstream
        if c_downstream.num_entities == upstream_entities:
            log.info(f"collection synced in downstream successfully cost time: {time.time() - t0:.2f}s")
            synced = True
            break
        log.info(f"sync progress:{c_downstream.num_entities / upstream_entities * 100:.2f}%")
        time.sleep(10)
    if not synced:
        log.info(f"collection synced in downstream failed with timeout: {time.time() - t0:.2f}s")
    try:
        c_downstream.flush(timeout=60)
    except Exception as e:
        log.info(f"flush err: {str(e)}")
    assert c_downstream.num_entities == upstream_entities
    # check index in downstream
    # FIX: index dicts are unorderable in Python 3 — sorted() on a list of dicts
    # raises TypeError; sort by a deterministic string key instead.
    downstream_index = [index.to_dict() for index in c_downstream.indexes]
    assert sorted(upstream_index, key=str) == sorted(downstream_index, key=str)
    # check count in downstream
    # FIX: query() returns a list of row dicts — index the list first, then the
    # field key, matching the upstream access above (was `["count(*)"][0]`).
    downstream_count = c_downstream.query(
        expr="",
        output_fields=["count(*)"]
    )[0]["count(*)"]
    assert downstream_count == upstream_count
Expected Behavior
No response
Steps To Reproduce
No response
Environment
milvus version: 2.5-20250520-7fc7e519-amd64
cdc version: main-2025-05-20-4f5d178
Anything else?
No response
Metadata
Metadata
Assignees
Labels
No labels