1、用到的各种工具

  • jdk-1.8
  • scala-2.12.12
  • flink-1.14
  • hive-3.1.2
  • hbase-2.2.4
  • zookeeper-3.5.7
  • hadoop-3.1.3
  • idea-2024
  • sbt-1.10

2、项目实现流程

  1. 数据结构

    • 数据来源为自造的数据,模仿的是电商数据
    • 以下,是kafka中获取到的json数据格式:
      {
      “order_id”: 3989,
      “order_sn”: “2024090739040063”,
      “customer_id”: 7723,
      “shipping_user”: “商凤兰”,
      “province”: “上海市”,
      “city”: “上海市”,
      “address”: “上海市上海市淮海中路3817022号19层”,
      “create_time”:“20240712023342”
      }
  2. 创建hive外部表关联hbase表(先创建hbase表,再创建hive外部表关联)

# hbase 中创建表很简单,“hbase_hive”为表名 “cf”为列族,创建表有两种写法:
create "hbase_hive",{NAME => "cf"}
# 创建表的简写
create "hbase_hive" "cf"

在这里插入图片描述

# 创建一个表名为 hive_hbase 的Hive外部表管理Hbase的 hbase_hive 表,创建hive外部表关联hbase表,其目的就是用hive更好的管理hbase表
create table hive_hbase (
# key 是作为hbase表的行键,底下的 cf 是作为hbase的order表的列族,后面跟着的字段是作为列限定符
create external table hive_hbase (
key string,
order_id string,
order_sn string,
customer_id string,
shipping_user string,
province string,
create_time string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ("hbase.columns.mapping" = 
"
:key,
cf:order_id,
cf:order_sn,
cf:customer_id,
cf:shipping_user,
cf:province,
cf:create_time
"
)
tblproperties ("hbase.table.name" = "hbase_hive","hbase.mapred.output.outputtable" = "hbase_hive");

在这里插入图片描述

  1. 创建完表之后,在idea中用flink将kafka数据抽取存入hbase中
    • 使用flink的dataStreamAPI和tableAPI
      • 可能会使用到的flink依赖项:
      • 在这里插入图片描述
    • 代码部分,用的scala语言编写的
import com.alibaba.fastjson2.JSON
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.Expressions.{concat, currentTimestamp, dateFormat, row}
import org.apache.flink.table.api.{$, DataTypes, LiteralIntExpression}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

import scala.util.Random


object Test {
	//	  使用样例类更好对应的管理kafka中的json数据
  case class order_master(
                           order_id: String,
                           order_sn: String,
                           customer_id: String,
                           shipping_user: String,
                           province: String,
                           create_time: String
                         )

  def main(args: Array[String]): Unit = {
    //    创建流执行环境
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    //    创建表执行环境
    val tableEnv = StreamTableEnvironment.create(streamEnv)
    //    连接kafka
    val kafka_source = KafkaSource.builder[String]()
      .setBootstrapServers("bigdata1:9092")
      .setTopics("fact_order_master")
      .setGroupId("test-consumer-group")
      .setValueOnlyDeserializer(new SimpleStringSchema())
      .setStartingOffsets(OffsetsInitializer.earliest())
      .build()
    //    获取kafka数据,不设置水印,放入样例类
    val stream_data = streamEnv.fromSource(kafka_source, WatermarkStrategy.noWatermarks(), "kafka_source")
      .map(data => {
        val jSONObject = JSON.parseObject(data)
        val order_id = jSONObject.getString("order_id")
        val order_sn = jSONObject.getString("order_sn")
        val customer_id = jSONObject.getString("customer_id")
        val shipping_user = jSONObject.getString("shipping_user")
        val province = jSONObject.getString("province")
        val create_time = jSONObject.getString("create_time")
        order_master(order_id, order_sn, customer_id, shipping_user, province, create_time)
      })
    //    将dataStream转换成tableStream
    val transaction_tableStream_table = tableEnv.fromDataStream(stream_data)
    //    创建kafka数据的临时表
    tableEnv.createTemporaryView("kafka_source_table", transaction_tableStream_table)
    //    创建hbase的临时表,用于将其数据存于hbase中
    val hbase_temporary_table =
      """
        |create temporary table hbase_sink_table (
        |key string,
        |cf ROW<order_id string,order_sn string,customer_id string,shipping_user string,province string,create_time string>,
        |primary key (key) not enforced
        |) with (
        |'connector' = 'hbase-2.2',
        |'table-name' = 'hbase_hive',
        |'zookeeper.quorum' = 'bigdata1:2181'
        |)
        |""".stripMargin

    tableEnv.executeSql(hbase_temporary_table)
    //    数据存入hbase中,hbase的行键,将其设置成0-9的随机数+最近时间戳,列族名为cf
    tableEnv.from("kafka_source_table")
      .select(
        concat(Random.nextInt(10).cast(DataTypes.STRING()), dateFormat(currentTimestamp(), "yyyyMMddHHmmssSSS")).as("key"),
        row($("order_id"), $("order_sn"), $("customer_id"), $("shipping_user"), $("province"), $("create_time")).as("cf")
      )
      .executeInsert("hbase_sink_table")
  }
}

  • 代码运行成功后,搁hbase和hive中查看前几条数据
    在这里插入图片描述
    在这里插入图片描述
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐