Data Lake Hudi (9): Inserting Data into Hudi with Spark
Table of Contents
0. Related Articles
1. Development Notes
2. Environment Setup
2.1. Server Environment
2.2. Maven Project
3. Maven Dependencies
4. Core Code
0. Related Articles
大数据基础知识点 文章汇总
1. Development Notes
Apache Hudi was originally developed at Uber to provide low-latency database-style access with high efficiency. Hudi introduces the concept of Hudi tables, which support CRUD operations and are read and written through the Hudi API on top of the Spark framework.
2. Environment Setup
2.1. Server Environment
For the server environment needed to write data into Hudi from Spark, see another post in this series; installing HDFS on CentOS 7 is sufficient. Link: 数据湖之Hudi(6):Hudi与Spark和HDFS的集成安装使用
2.2. Maven Project
Create a Maven project in IDEA, copy the core-site.xml and hdfs-site.xml configuration files from the server into the project, and add a log4j.properties file.
The log4j.properties file contains:
log4j.rootLogger=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
Note: since the program runs locally, the hostname-to-IP mappings for the Hadoop nodes must be configured in the local hosts file.
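For example, assuming the Hadoop node is named node1 at 192.168.88.100 (both placeholder values; substitute your own server's hostname and IP), the hosts entry would look like:

```
192.168.88.100  node1
```

On Windows the hosts file is C:\Windows\System32\drivers\etc\hosts; on Linux and macOS it is /etc/hosts.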
3. Maven Dependencies
<repositories>
<repository>
<id>aliyun</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>jboss</id>
<url>http://repository.jboss.com/nexus/content/groups/public</url>
</repository>
</repositories>
<properties>
<scala.version>2.12.10</scala.version>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.0.0</spark.version>
<hadoop.version>3.0.0</hadoop.version>
<hudi.version>0.9.0</hudi.version>
</properties>
<dependencies>
<!-- Scala language dependency -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark Core dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL dependency -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Hadoop Client dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- hudi-spark3 -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-spark3-bundle_2.12</artifactId>
<version>${hudi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.12</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/classes</outputDirectory>
<testOutputDirectory>target/test-classes</testOutputDirectory>
<resources>
<resource>
<directory>${project.basedir}/src/main/resources</directory>
</resource>
</resources>
<!-- Maven compiler plugins -->
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
4. Core Code
In the project created above, add a new Scala object named Demo01_InsertForCOW, which generates simulated data and inserts it into a Hudi table using COW (Copy-on-Write) mode.
Requirements: use the official QuickstartUtils to generate 100 simulated Trip ride records, convert them to a DataFrame, and save them into a Hudi table. The code is essentially the same as what you would run on the spark-shell command line.
The complete code:
package com.ouyang.hudi.crud
import org.apache.hudi.QuickstartUtils.DataGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
/**
 * @ date: 2022/2/23
 * @ author: yangshibiao
 * @ desc: Generate simulated data and insert it into a Hudi table (COW mode).
 *       Uses the official QuickstartUtils to generate 100 simulated Trip
 *       ride records, converts them to a DataFrame and saves it into a
 *       Hudi table; essentially the same code as on the spark-shell command line.
 */
object Demo01_InsertForCOW {
def main(args: Array[String]): Unit = {
// Create a SparkSession instance and configure its properties
val spark: SparkSession = {
SparkSession.builder()
.appName(this.getClass.getSimpleName.stripSuffix("$"))
.master("local[4]")
// Use Kryo serialization
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
}
// Define variables: table name and save path
val tableName: String = "tbl_trips_cow"
val tablePath: String = "/hudi-warehouse/tbl_trips_cow"
// Build the data generator that simulates business data
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConverters._
import spark.implicits._
// Step 1: generate simulated ride data
val dataGen: DataGenerator = new DataGenerator()
val inserts = convertToStringList(dataGen.generateInserts(100))
// Convert the generated records into a DataFrame
val insertDF: DataFrame = spark.read.json(
spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
)
insertDF.printSchema()
insertDF.show(10, truncate = false)
// Step 2: insert the data into the Hudi table
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
insertDF.write
.mode(SaveMode.Append)
.format("hudi")
.option("hoodie.insert.shuffle.parallelism", "2")
.option("hoodie.upsert.shuffle.parallelism", "2")
// Hudi table property settings
.option(PRECOMBINE_FIELD.key(), "ts")
.option(RECORDKEY_FIELD.key(), "uuid")
.option(PARTITIONPATH_FIELD.key(), "partitionpath")
.option(TBL_NAME.key(), tableName)
.save(tablePath)
}
}
When you run the program locally on Windows, you may see an error like `null\bin\winutils.exe in the Hadoop binaries`. This only means the Hadoop native binaries are not installed on the local machine; for this demo it can be safely ignored.
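If you would rather silence the winutils warning than ignore it, one common workaround is to point hadoop.home.dir at a local directory containing a winutils.exe that matches your Hadoop version, before the SparkSession is created (a sketch; D:\hadoop is a placeholder path):

```scala
// NOTE: "D:\\hadoop" is a hypothetical path; point it at the directory
// whose bin\ subfolder contains winutils.exe for your Hadoop version.
System.setProperty("hadoop.home.dir", "D:\\hadoop")
```

Set this at the very top of main, before any Spark or Hadoop classes are touched.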
The program prints the schema and the first few rows of the generated data:
root
|-- begin_lat: double (nullable = true)
|-- begin_lon: double (nullable = true)
|-- driver: string (nullable = true)
|-- end_lat: double (nullable = true)
|-- end_lon: double (nullable = true)
|-- fare: double (nullable = true)
|-- partitionpath: string (nullable = true)
|-- rider: string (nullable = true)
|-- ts: long (nullable = true)
|-- uuid: string (nullable = true)
+-------------------+-------------------+----------+-------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
|begin_lat |begin_lon |driver |end_lat |end_lon |fare |partitionpath |rider |ts |uuid |
+-------------------+-------------------+----------+-------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
|0.4726905879569653 |0.46157858450465483|driver-213|0.754803407008858 |0.9671159942018241 |34.158284716382845|americas/brazil/sao_paulo |rider-213|1645620263263|550e7186-203c-48a8-9964-edf12e0dfbe3|
|0.6100070562136587 |0.8779402295427752 |driver-213|0.3407870505929602 |0.5030798142293655 |43.4923811219014 |americas/brazil/sao_paulo |rider-213|1645074858260|c8d5e237-6589-419e-bef7-221faa4faa13|
|0.5731835407930634 |0.4923479652912024 |driver-213|0.08988581780930216|0.42520899698713666|64.27696295884016 |americas/united_states/san_francisco|rider-213|1645298902122|d64b94ec-d8e8-44f3-a5c0-e205e034aa5d|
|0.21624150367601136|0.14285051259466197|driver-213|0.5890949624813784 |0.0966823831927115 |93.56018115236618 |americas/united_states/san_francisco|rider-213|1645132033863|fd8f9051-b5d2-4403-8002-8bb173df5dc8|
|0.40613510977307 |0.5644092139040959 |driver-213|0.798706304941517 |0.02698359227182834|17.851135255091155|asia/india/chennai |rider-213|1645254343160|160c7699-7f5e-4ec3-ba76-9ae63ae815af|
|0.8742041526408587 |0.7528268153249502 |driver-213|0.9197827128888302 |0.362464770874404 |19.179139106643607|americas/united_states/san_francisco|rider-213|1645452263906|fe9d75c0-f326-4cef-8596-4248a57d1fea|
|0.1856488085068272 |0.9694586417848392 |driver-213|0.38186367037201974|0.25252652214479043|33.92216483948643 |americas/united_states/san_francisco|rider-213|1645133755620|5d149bc7-78a8-46df-b2b0-a038dc79e378|
|0.0750588760043035 |0.03844104444445928|driver-213|0.04376353354538354|0.6346040067610669 |66.62084366450246 |americas/brazil/sao_paulo |rider-213|1645362187498|da2dd8e5-c2d9-45e2-8c96-520927e5458d|
|0.651058505660742 |0.8192868687714224 |driver-213|0.20714896002914462|0.06224031095826987|41.06290929046368 |asia/india/chennai |rider-213|1645575914370|f01e9d28-df30-454c-a780-b56cd5b43ce7|
|0.11488393157088261|0.6273212202489661 |driver-213|0.7454678537511295 |0.3954939864908973 |27.79478688582596 |americas/united_states/san_francisco|rider-213|1645094601577|bd4ae628-3885-4b26-8a50-c14f8e42a265|
+-------------------+-------------------+----------+-------------------+-------------------+------------------+------------------------------------+---------+-------------+------------------------------------+
only showing top 10 rows
After running the program, you will find that the data has been written to HDFS under /hudi-warehouse/tbl_trips_cow.
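To double-check the write from code rather than the HDFS web UI, the table can be read back through the Hudi datasource (a sketch reusing the `spark` session from the demo above; this performs a snapshot query, which is Hudi's default query type):

```scala
// Read the Hudi table back as a DataFrame (snapshot query is the default)
val tripsDF = spark.read.format("hudi").load("/hudi-warehouse/tbl_trips_cow")
// A single insert run of the demo should yield 100 records
println(s"record count: ${tripsDF.count()}")
tripsDF.select("uuid", "rider", "fare", "partitionpath").show(5, truncate = false)
```

Note that the result also contains the Hudi metadata columns (`_hoodie_commit_time`, `_hoodie_record_key`, etc.) in addition to the original fields.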
Note: the posts in this Hudi series are study notes based on the official Hudi documentation, with some personal interpretation added; please bear with any shortcomings ☺
Note: links to other related posts (covering various data-lake topics, Hudi included) can be found here -> 数据湖 文章汇总
