【flink-stream开发中,将 kafka 的数据写入 hbase 中】
创建hive外部表关联hbase表(先创建hbase表,再创建hive外部表关联)
·
1、用到的各种工具
- jdk-1.8
- scala-2.12.12
- flink-1.14
- hive-3.1.2
- hbase-2.2.4
- zookeeper-3.5.7
- hadoop-3.1.3
- idea-2024
- sbt-1.10
2、项目实现流程
-
数据结构
- 数据来源为自造的数据,模仿的是电商数据
- 以下,是
kafka
中获取到的json数据格式:
{
“order_id”: 3989,
“order_sn”: “2024090739040063”,
“customer_id”: 7723,
“shipping_user”: “商凤兰”,
“province”: “上海市”,
“city”: “上海市”,
“address”: “上海市上海市淮海中路3817022号19层”,
“create_time”:“20240712023342”
}
-
创建hive外部表关联hbase表(先创建hbase表,再创建hive外部表关联)
# hbase 中创建表很简单,“hbase_hive”为表名 “cf”为列族,创建表有两种写法:
create "hbase_hive",{NAME => "cf"}
# 创建表的简写
create "hbase_hive" "cf"
# 创建一个表名为 hive_hbase 的Hive外部表管理Hbase的 hbase_hive 表,创建hive外部表关联hbase表,其目的就是用hive更好的管理hbase表
create table hive_hbase (
# key 是作为hbase表的行键,底下的 cf 是作为hbase的order表的列族,后面跟着的字段是作为列限定符
create external table hive_hbase (
key string,
order_id string,
order_sn string,
customer_id string,
shipping_user string,
province string,
create_time string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" =
"
:key,
cf:order_id,
cf:order_sn,
cf:customer_id,
cf:shipping_user,
cf:province,
cf:create_time
"
)
tblproperties ("hbase.table.name" = "hbase_hive","hbase.mapred.output.outputtable" = "hbase_hive");
- 创建完表之后,在idea中用flink将kafka数据抽取存入hbase中
- 使用flink的dataStreamAPI和tableAPI
- 可能会使用到的flink依赖项:
- 代码部分,用的scala语言编写的
- 使用flink的dataStreamAPI和tableAPI
import com.alibaba.fastjson2.JSON
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.{concat, currentTimestamp, dateFormat, row}
import org.apache.flink.table.api.{$, DataTypes, LiteralIntExpression}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import scala.util.Random
object Test {
// 使用样例类更好对应的管理kafka中的json数据
case class order_master(
order_id: String,
order_sn: String,
customer_id: String,
shipping_user: String,
province: String,
create_time: String
)
def main(args: Array[String]): Unit = {
// 创建流执行环境
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
// 创建表执行环境
val tableEnv = StreamTableEnvironment.create(streamEnv)
// 连接kafka
val kafka_source = KafkaSource.builder[String]()
.setBootstrapServers("bigdata1:9092")
.setTopics("fact_order_master")
.setGroupId("test-consumer-group")
.setValueOnlyDeserializer(new SimpleStringSchema())
.setStartingOffsets(OffsetsInitializer.earliest())
.build()
// 获取kafka数据,不设置水印,放入样例类
val stream_data = streamEnv.fromSource(kafka_source, WatermarkStrategy.noWatermarks(), "kafka_source")
.map(data => {
val jSONObject = JSON.parseObject(data)
val order_id = jSONObject.getString("order_id")
val order_sn = jSONObject.getString("order_sn")
val customer_id = jSONObject.getString("customer_id")
val shipping_user = jSONObject.getString("shipping_user")
val province = jSONObject.getString("province")
val create_time = jSONObject.getString("create_time")
order_master(order_id, order_sn, customer_id, shipping_user, province, create_time)
})
// 将dataStream转换成tableStream
val transaction_tableStream_table = tableEnv.fromDataStream(stream_data)
// 创建kafka数据的临时表
tableEnv.createTemporaryView("kafka_source_table", transaction_tableStream_table)
// 创建hbase的临时表,用于将其数据存于hbase中
val hbase_temporary_table =
"""
|create temporary table hbase_sink_table (
|key string,
|cf ROW<order_id string,order_sn string,customer_id string,shipping_user string,province string,create_time string>,
|primary key (key) not enforced
|) with (
|'connector' = 'hbase-2.2',
|'table-name' = 'hbase_hive',
|'zookeeper.quorum' = 'bigdata1:2181'
|)
|""".stripMargin
tableEnv.executeSql(hbase_temporary_table)
// 数据存入hbase中,hbase的行键,将其设置成0-9的随机数+最近时间戳,列族名为cf
tableEnv.from("kafka_source_table")
.select(
concat(Random.nextInt(10).cast(DataTypes.STRING()), dateFormat(currentTimestamp(), "yyyyMMddHHmmssSSS")).as("key"),
row($("order_id"), $("order_sn"), $("customer_id"), $("shipping_user"), $("province"), $("create_time")).as("cf")
)
.executeInsert("hbase_sink_table")
}
}
- 代码运行成功后,搁hbase和hive中查看前几条数据

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)