canal监控mysql表结构_canal 监控数据库表 快速使用
package com.guo.bootcanal.web;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry.*;import com
package com.guo.bootcanal.web;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;public classSimpleCanalClientExample {public static voidmain(String args[]) {//创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("120.79.210.194",8777), "example", "", "");int batchSize = 1000;int emptyCount = 0;try{
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();while (true) {
Message message= connector.getWithoutAck(batchSize); //获取指定数量的数据
long batchId =message.getId();int size =message.getEntries().size();if (batchId == -1 || size == 0) {
System.out.println("waitting...");try{
Thread.sleep(8000);
}catch(InterruptedException e) {
}
}else{
System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId);//提交确认//connector.rollback(batchId);//处理失败, 回滚数据
}//System.out.println("empty too many times, exit");
} catch(Exception e) {
System.out.println("Error:" +e.getMessage());
}finally{
connector.disconnect();
}
}private static void printEntry(Listentrys) {for(Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() ==EntryType.TRANSACTIONEND) {continue;
}
RowChange rowChage= null;try{
rowChage=RowChange.parseFrom(entry.getStoreValue());
}catch(Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" +entry.toString(),
e);
}
EventType eventType=rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));for(RowData rowData : rowChage.getRowDatasList()) {if (eventType ==EventType.DELETE) {
System.out.println("-------> delete");
printColumn(rowData.getBeforeColumnsList());
}else if (eventType ==EventType.INSERT) {
System.out.println("-------> insert");
printColumn(rowData.getAfterColumnsList());
}else{
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}private static void printColumn(Listcolumns) {for(Column column : columns) {
System.out.println(column.getName() + ":" + column.getValue() + "update=" +column.getUpdated());
}
}
}
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐


所有评论(0)