-
Notifications
You must be signed in to change notification settings - Fork 7.7k
Closed
Milestone
Description
adapter大致的调度结构:
canalMsgConsumer.connect();
while (running) {
for (int i = 0; i < retry; i++) {
canalMsgConsumer.getMessage() ;
canalMsgConsumer.ack / rollback();
}
}
canalMsgConsumer.disconnect();
这里会在adapter running退出时执行disconnect逻辑,目前部分的connector组件的disconnect逻辑有unsubscribe的行为
比如,以CanalTCPConsumer为例:
public void disconnect() {
canalConnector.unsubscribe();
canalConnector.disconnect();
}
}
tcp模式下unsubscribe发送给canal-server,会导致清理cursor位点,如果此时canal-server出现重启,就会从最新的binlog开始消费,从而binlog出现数据丢失的风险
yangzi1104
Metadata
Metadata
Assignees
Labels
No labels