package com.guo.bootcanal.web;

import com.alibaba.otter.canal.client.CanalConnector;

import com.alibaba.otter.canal.client.CanalConnectors;

import com.alibaba.otter.canal.protocol.CanalEntry.*;

import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;

import java.util.List;public classSimpleCanalClientExample {public static voidmain(String args[]) {//创建链接

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("120.79.210.194",8777), "example", "", "");int batchSize = 1000;int emptyCount = 0;try{

connector.connect();

connector.subscribe(".*\\..*");

connector.rollback();while (true) {

Message message= connector.getWithoutAck(batchSize); //获取指定数量的数据

long batchId =message.getId();int size =message.getEntries().size();if (batchId == -1 || size == 0) {

System.out.println("waitting...");try{

Thread.sleep(8000);

}catch(InterruptedException e) {

}

}else{

System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);

printEntry(message.getEntries());

}

connector.ack(batchId);//提交确认//connector.rollback(batchId);//处理失败, 回滚数据

}//System.out.println("empty too many times, exit");

} catch(Exception e) {

System.out.println("Error:" +e.getMessage());

}finally{

connector.disconnect();

}

}private static void printEntry(Listentrys) {for(Entry entry : entrys) {if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() ==EntryType.TRANSACTIONEND) {continue;

}

RowChange rowChage= null;try{

rowChage=RowChange.parseFrom(entry.getStoreValue());

}catch(Exception e) {throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" +entry.toString(),

e);

}

EventType eventType=rowChage.getEventType();

System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",

entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),

entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),

eventType));for(RowData rowData : rowChage.getRowDatasList()) {if (eventType ==EventType.DELETE) {

System.out.println("-------> delete");

printColumn(rowData.getBeforeColumnsList());

}else if (eventType ==EventType.INSERT) {

System.out.println("-------> insert");

printColumn(rowData.getAfterColumnsList());

}else{

System.out.println("-------> before");

printColumn(rowData.getBeforeColumnsList());

System.out.println("-------> after");

printColumn(rowData.getAfterColumnsList());

}

}

}

}private static void printColumn(Listcolumns) {for(Column column : columns) {

System.out.println(column.getName() + ":" + column.getValue() + "update=" +column.getUpdated());

}

}

}

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐