Kafka starts normally, but Flink consumes nothing from the topic and reports no error

Solution: configure the Kafka connector to start reading from the latest offset of every partition with startFromLatest(). By default the connector resumes from the consumer group's committed offsets, which can silently yield no records; startFromLatest() forces it to pick up newly produced data.

package com.atguigu.apitest.tabletest

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors._

/**
  * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved 
  *
  * Project: FlinkTutorial
  * Package: com.atguigu.apitest.tabletest
  * Version: 1.0
  *
  * Created by wushengran on 2020/8/10 14:23
  */
object TableApiTest {
  def main(args: Array[String]): Unit = {
    // 1. Create the stream execution environment and the table environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv = StreamTableEnvironment.create(env)

    tableEnv.connect(
      new Kafka()
        .version("0.11")
        .topic("sensor")
        .startFromLatest() // start reading from the latest offset of every partition
        .property("zookeeper.connect", "node001:2181")
        .property("bootstrap.servers", "node001:9092")
    ).withFormat(new Csv())
      .withSchema(new Schema() 
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaInputTable")

    val inputTable: Table = tableEnv.from("kafkaInputTable")
    inputTable.toAppendStream[(String,Long,Double)].print()
    env.execute("table api test! ")

  }
}
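
For the same scenario with the DataStream API instead of the Table API connector, the start offset can be set directly on the Kafka consumer. Below is a minimal sketch, assuming the same "sensor" topic and node001:9092 broker as above; the group.id value is hypothetical.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object KafkaLatestOffsetTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "node001:9092")
    props.setProperty("group.id", "sensor-test") // hypothetical consumer group id

    // FlinkKafkaConsumer011 matches the Kafka 0.11 connector used above
    val consumer = new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), props)
    consumer.setStartFromLatest() // same effect as startFromLatest() in the Table API descriptor

    env.addSource(consumer).print()
    env.execute("kafka latest offset test")
  }
}

Note that setStartFromLatest() only applies when the job starts without a checkpoint or savepoint; when restoring from state, the offsets stored in the checkpoint take precedence.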
