1. 安装zookeeper, kafka

2. 启动zookeeper, kafka server

3. 准备工作

在Mysql数据库创建一个table, t_student

加入maven需要的flink资源

1.10.0

2.12

org.scala-lang

scala-library

2.12.7

org.apache.flink

flink-streaming-scala_${scala.binary.version}

${flink.version}

org.apache.flink

flink-scala_${scala.binary.version}

${flink.version}

org.apache.flink

flink-connector-kafka-0.10_${scala.binary.version}

${flink.version}

org.apache.flink

flink-java

${flink.version}

compile

org.apache.flink

flink-streaming-java_${scala.binary.version}

${flink.version}

compile

消费端importcom.alibaba.fastjson.JSON;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;importjava.util.Properties;public classConsumerMain {

public static voidmain(String[] args) throwsException {

finalStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Properties props = newProperties();props.put("bootstrap.servers","localhost:9092");props.put("zookeeper.connect","localhost:2181");props.put("group.id","metric-group");props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset","latest");SingleOutputStreamOperator student = env.addSource(newFlinkKafkaConsumer010<>(

"student",newSimpleStringSchema(),props)).setParallelism(1)

.map(string -> JSON.parseObject(string,Student.class));student.addSink(newSinkToMySQL());env.execute("Flink add sink");}

}

public classSinkToMySQL extendsRichSinkFunction {

PreparedStatement ps;privateConnection connection;/*****@paramparameters*@throwsException*/@Overridepublic voidopen(Configuration parameters) throwsException {

super.open(parameters);connection= getConnection();String sql = "insert into t_student(id, name, address, age) values(?, ?, ?, ?);";ps= this.connection.prepareStatement(sql);}

@Overridepublic voidclose() throwsException {

super.close();if(connection!= null) {

connection.close();}

if(ps!= null) {

ps.close();}

}

/*** @param value* @param context* @throws Exception*/@Overridepublic voidinvoke(Student value,Context context) throwsException {

System.out.println(" value: "+ JSON.toJSONString(value));ps.setInt(1,value.getId());ps.setString(2,value.getName());ps.setString(3,value.getAddress());ps.setInt(4,value.getAge());ps.executeUpdate();}

privateConnection getConnection() throwsSQLException {

Connection con = null;try{

Class.forName("com.mysql.jdbc.Driver");con = DriverManager.getConnection("jdbc:mysql://localhost:3306/demo","root","Myh090928");} catch(Exception e) {

System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());}

returncon;}

}

具体代码可以从这下载

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐