### Current Behavior

### Expected Behavior

_No response_

### Steps To Reproduce

```python
import time
from datetime import datetime

from pymilvus import Collection, connections, list_collections

# `client`, `prefix`, `log`, and `InsertEntitiesCollectionChecker` are provided
# by the milvus-cdc test harness.

def test_milvus_cdc_collection(self, upstream_host, upstream_port, downstream_host, downstream_port):
    """
    target: test cdc default
    method: create task with default params
    expected: create successfully
    """
    collection_name = prefix + datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f")
    request_data = {
        "milvus_connect_param": {
            "host": downstream_host,
            "port": int(downstream_port),
            "username": "",
            "password": "",
            "enable_tls": False,
            "ignore_partition": False,
            "connect_timeout": 10
        },
        "collection_infos": [
            {
                "name": collection_name
            }
        ]
    }
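    # Note: this payload is the create-task `request_data` that the CDC server
    # expects; `milvus_connect_param` points at the *downstream* Milvus instance
    # that the task replicates into.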
    # create a cdc task
    rsp, result = client.create_task(request_data)
    assert result
    log.info(f"create task response: {rsp}")
    task_id = rsp['task_id']
    # get the cdc task
    rsp, result = client.get_task(task_id)
    assert result
    log.info(f"get task {task_id} response: {rsp}")
    # create a collection in upstream and keep inserting entities into it
    connections.connect(host=upstream_host, port=upstream_port)
    checker = InsertEntitiesCollectionChecker(host=upstream_host, port=upstream_port, c_name=collection_name)
    checker.run()
    time.sleep(120)
    all_collections = list_collections()
    # pause the insert task
    log.info("start to pause the insert task")
    checker.pause()
    log.info("pause the insert task successfully")
    # check the collection in upstream
    num_entities_upstream = checker.get_num_entities()
    log.info(f"num_entities_upstream: {num_entities_upstream}")
    count_by_query_upstream = checker.get_count_by_query()
    log.info(f"count_by_query_upstream: {count_by_query_upstream}")
    # check the collection in downstream
    connections.disconnect("default")
    log.info(f"start to connect to downstream {downstream_host} {downstream_port}")
    connections.connect(host=downstream_host, port=downstream_port)
    all_collections = list_collections()
    collection = Collection(name=collection_name)
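    # Build an index downstream before load(); Milvus needs an index on the
    # vector field for the collection to be loadable and queryable.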
    collection.create_index(field_name="float_vector", index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}})
    collection.load()
    # wait for the collection to be synced
    timeout = 120
    t0 = time.time()
    while time.time() - t0 < timeout:
        count_by_query_downstream = len(collection.query(expr=checker.query_expr, output_fields=checker.output_fields))
        if count_by_query_downstream == count_by_query_upstream:
            break
        time.sleep(1)
    if time.time() - t0 > timeout:
        raise Exception(f"Timeout waiting for collection {collection_name} to be synced")
    log.info(f"count_by_query_downstream: {count_by_query_downstream}")
    assert count_by_query_upstream == count_by_query_downstream
    # wait for the collection to be flushed
    time.sleep(20)
    collection.flush()
    num_entities_downstream = collection.num_entities
    log.info(f"num_entities_downstream: {num_entities_downstream}")
    assert num_entities_upstream == num_entities_downstream, f"num_entities_upstream {num_entities_upstream} != num_entities_downstream {num_entities_downstream}"
    # delete the entities in upstream
    connections.disconnect("default")
    log.info(f"start to connect to upstream {upstream_host} {upstream_port}")
    connections.connect(host=upstream_host, port=upstream_port)
    log.info("start to delete the entities in upstream")
    delete_expr = f"int64 in {[i for i in range(0, 3000)]}"
    checker.collection.delete(delete_expr)
    # wait until the deletion is visible in upstream
    t0 = time.time()
    while time.time() - t0 < timeout:
        res = checker.collection.query(expr=delete_expr, output_fields=checker.output_fields)
        if len(res) == 0:
            break
        else:
            log.info(f"res: {len(res)}")
            time.sleep(1)
    if time.time() - t0 > timeout:
        raise Exception("Timeout waiting for delete entities in upstream")
    log.info(f"res: {res}")
    count_by_query_upstream = len(res)
    assert count_by_query_upstream == 0
    log.info("delete the entities in upstream successfully")
    # check the collection in downstream
    connections.disconnect("default")
    log.info(f"start to connect to downstream {downstream_host} {downstream_port}")
    connections.connect(host=downstream_host, port=downstream_port)
    collection = Collection(name=collection_name)
    collection.load()
    # wait for the deletion to be synced
    timeout = 120
    t0 = time.time()
    while time.time() - t0 < timeout:
        count_by_query_downstream = len(collection.query(expr=delete_expr, output_fields=checker.output_fields))
        if count_by_query_downstream == count_by_query_upstream:
            log.info(f"cost time: {time.time() - t0} to sync the delete entities")
            break
        else:
            log.info(f"count_by_query_downstream: {count_by_query_downstream}")
            time.sleep(1)
    if time.time() - t0 > timeout:
        raise Exception(f"Timeout waiting for collection {collection_name} to be synced")
    log.info(f"count_by_query_downstream: {count_by_query_downstream}")
    assert count_by_query_upstream == count_by_query_downstream
    # drop the collection in upstream
    connections.disconnect("default")
    log.info(f"start to connect to upstream {upstream_host} {upstream_port}")
    connections.connect(host=upstream_host, port=upstream_port)
    log.info("start to drop the collection in upstream")
    checker.collection.drop()
    t0 = time.time()
    while time.time() - t0 < timeout:
        if collection_name not in list_collections():
            break
        time.sleep(1)
        log.info(f"collection: {collection_name} still exists")
    if time.time() - t0 > timeout:
        log.error(f"Timeout waiting for collection {collection_name} to be dropped")
    log.info("drop the collection in upstream successfully")
    # check the collection in downstream
    connections.disconnect("default")
    log.info(f"start to connect to downstream {downstream_host} {downstream_port}")
    connections.connect(host=downstream_host, port=downstream_port)
    t0 = time.time()
    while time.time() - t0 < timeout:
        log.info(f"all collections in downstream: {list_collections()}")
        if collection_name not in list_collections():
            log.info(f"cost time: {time.time() - t0} to drop the collection")
            break
        time.sleep(1)
        log.info(f"collection: {collection_name} still exists")
    if time.time() - t0 > timeout:
        log.error(f"Timeout waiting for collection {collection_name} to be dropped")
    assert collection_name not in list_collections()
```
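For reference, `client.create_task` in the harness wraps the CDC server's HTTP API. A minimal hand-rolled equivalent might look like the sketch below; the `http://localhost:8444/cdc` endpoint and the `request_type`/`request_data` envelope are assumptions based on the milvus-cdc README, so adjust them to your deployment:

```python
import requests

CDC_URL = "http://localhost:8444/cdc"  # assumed default milvus-cdc HTTP endpoint

def create_task(request_data: dict) -> dict:
    """Sketch of a create-task call to the CDC server (not the harness client)."""
    resp = requests.post(CDC_URL, json={
        "request_type": "create",      # assumed request envelope
        "request_data": request_data,  # same payload as in the repro above
    })
    resp.raise_for_status()
    return resp.json()
```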
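The repro also repeats the same poll-until-timeout pattern for the insert sync, the delete sync, and the drop. A small helper like the hypothetical `wait_until` below would collapse those loops and avoid accidentally reusing a stale `t0`; this is only a sketch, not part of the test harness:

```python
import time

def wait_until(predicate, timeout=120, interval=1.0, desc="condition"):
    """Poll `predicate` until it returns True, or raise after `timeout` seconds."""
    t0 = time.time()
    while time.time() - t0 < timeout:
        if predicate():
            return time.time() - t0  # seconds until the condition held
        time.sleep(interval)
    raise TimeoutError(f"Timeout ({timeout}s) waiting for {desc}")

# Example: the drop-sync loop becomes
# wait_until(lambda: collection_name not in list_collections(),
#            desc=f"collection {collection_name} to be dropped")
```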
### Environment
_No response_
### Anything else?
_No response_