Closed
Description
Environment
- canal version: 1.1.5
- mysql version: 5.7.30
- druid version: 1.2.6
Issue Description
When DDL is parsed with druid (canal.instance.filter.druid.ddl = true), the following ALTER statement is not parsed successfully:
ALTER TABLE utf8test.test default character set latin1;
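Similar issue
Expected behavior
The DDL should be parsed successfully when druid is used.
Without druid parsing, the DDL is parsed successfully and the message below is produced.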
canal.instance.filter.druid.ddl = false
Message received in Kafka
{
"data": null,
"database": "utf8test",
"es": 1632473286000,
"id": 1,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "/* ApplicationName=DBeaver 21.1.4 - SQLEditor <test> */ ALTER TABLE utf8test.test default character set latin1",
"sqlType": null,
"table": "test",
"ts": 1632473338798,
"type": "ALTER"
}
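Actual behavior
The DDL statement is ignored and no message is sent to Kafka.
Root cause
At line 67 of com/alibaba/otter/canal/parse/inbound/mysql/ddl/DruidDdlParser.java, the items member of the alterTable parsed by druid is an empty list, so the ddlResults returned by the parse method is empty.
As a result, notFilter in com/alibaba/otter/canal/parse/inbound/mysql/dbsync/LogEventConvert.java is false.
for (SQLAlterTableItem item : alterTable.getItems()) // getItems() returns an empty list
The relevant code is as follows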
List<DdlResult> ddlResults = new ArrayList<>();
for (SQLStatement statement : stmtList) {
    if (statement instanceof SQLCreateTableStatement) {
        DdlResult ddlResult = new DdlResult();
        SQLCreateTableStatement createTable = (SQLCreateTableStatement) statement;
        processName(ddlResult, schmeaName, createTable.getName(), false);
        ddlResult.setType(EventType.CREATE);
        ddlResults.add(ddlResult);
    } else if (statement instanceof SQLAlterTableStatement) {
        SQLAlterTableStatement alterTable = (SQLAlterTableStatement) statement;
        for (SQLAlterTableItem item : alterTable.getItems()) {
            if (item instanceof SQLAlterTableRename) {
                DdlResult ddlResult = new DdlResult();
                processName(ddlResult, schmeaName, alterTable.getName(), true);
                processName(ddlResult, schmeaName, ((SQLAlterTableRename) item).getToName(), false);
                ddlResult.setType(EventType.RENAME);
                ddlResults.add(ddlResult);
            } else if (item instanceof SQLAlterTableAddIndex) {
                DdlResult ddlResult = new DdlResult();
                processName(ddlResult, schmeaName, alterTable.getName(), false);
                ddlResult.setType(EventType.CINDEX);
                ddlResults.add(ddlResult);
            } else if (item instanceof SQLAlterTableDropIndex || item instanceof SQLAlterTableDropKey) {
                DdlResult ddlResult = new DdlResult();
                processName(ddlResult, schmeaName, alterTable.getName(), false);
                ddlResult.setType(EventType.DINDEX);
                ddlResults.add(ddlResult);
            } else if (item instanceof SQLAlterTableAddConstraint) {
                DdlResult ddlResult = new DdlResult();
                processName(ddlResult, schmeaName, alterTable.getName(), false);
                SQLConstraint constraint = ((SQLAlterTableAddConstraint) item).getConstraint();
                if (constraint instanceof SQLUnique) {
                    ddlResult.setType(EventType.CINDEX);
                    ddlResults.add(ddlResult);
                }
            } else if (item instanceof SQLAlterTableDropConstraint) {
                DdlResult ddlResult = new DdlResult();
                processName(ddlResult, schmeaName, alterTable.getName(), false);
                ddlResult.setType(EventType.DINDEX);
                ddlResults.add(ddlResult);
            } else {
                DdlResult ddlResult = new DdlResult();
                processName(ddlResult, schmeaName, alterTable.getName(), false);
                ddlResult.setType(EventType.ALTER);
                ddlResults.add(ddlResult);
            }
        }
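Suggested fix
The loop above does not cover ALTER operations that assign options on the table itself, because these are not part of items. Add a check on tableOptions: if the statement has any table options, return a ddlResult object as well.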
if (alterTable.getTableOptions().size() > 0) {
    DdlResult ddlResult = new DdlResult();
    processName(ddlResult, schmeaName, alterTable.getName(), false);
    ddlResult.setType(EventType.ALTER);
    ddlResults.add(ddlResult);
}
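To show the effect of the suggested check in isolation, here is a minimal, self-contained sketch. It does not use druid or canal: the class AlterDdlSketch, the nested AlterTable type, and the string event types are hypothetical stand-ins for SQLAlterTableStatement, getItems(), getTableOptions() and EventType, so it only models the control flow described above and is not the actual patch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AlterDdlSketch {

    /** Hypothetical stand-in for druid's SQLAlterTableStatement. */
    static final class AlterTable {
        final String tableName;
        final List<String> items;        // models getItems()
        final List<String> tableOptions; // models getTableOptions()

        AlterTable(String tableName, List<String> items, List<String> tableOptions) {
            this.tableName = tableName;
            this.items = items;
            this.tableOptions = tableOptions;
        }
    }

    /** Returns one event type per result, modelling how ddlResults is filled. */
    static List<String> parse(AlterTable alterTable) {
        List<String> ddlResults = new ArrayList<>();
        // Simplified version of the per-item loop: only two item kinds are modelled.
        for (String item : alterTable.items) {
            if (item.equals("RENAME")) {
                ddlResults.add("RENAME");
            } else {
                ddlResults.add("ALTER");
            }
        }
        // The suggested check: a statement that only sets table options has no
        // items, so without this branch nothing would be added for it.
        if (alterTable.tableOptions.size() > 0) {
            ddlResults.add("ALTER");
        }
        return ddlResults;
    }

    public static void main(String[] args) {
        // Models: ALTER TABLE utf8test.test default character set latin1
        AlterTable charsetOnly = new AlterTable(
                "test",
                Collections.<String>emptyList(),
                Arrays.asList("CHARACTER SET=latin1"));
        System.out.println(parse(charsetOnly)); // prints [ALTER]
    }
}
```

In this model, a statement that carries both items and table options produces one result per item plus one extra ALTER result, because the check is applied unconditionally as in the suggestion. Whether that duplicate matters for canal would need to be verified against the real code.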