#博学谷IT技术支持#  

 一、概述

1.1 功能

  • 导入数据:将MySQL、Oracle中的数据导入到Hadoop的HDFS、Hive、HBASE等数据存储系统。
  • 导出数据:从HDFS、Hive中导出数据到关系数据库MySQL等。

1.2 工作机制

  • 将导入或导出命令翻译成MapReduce实现。

1.3 安装测试

 #测试你的sqoop是否能查看MySQL中所有的数据库
 sqoop list-databases \
 --connect jdbc:mysql://hadoop01:3306/ \
 --username root \
 --password 123456

二、数据导入

  • 全量数据:表中的所有数据。
  • 增量数据:上次操作之后至今产生的数据。
  • 数据子集:所有数据中的一部分数据。

2.1 全量导入MySQL数据到HDFS

sqoop import \
-Dorg.apache.sqoop.splitter.allow_text_splitter=true \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--target-dir /sqoop/result3 \
--delete-target-dir \
--fields-terminated-by '\t' \
--split-by name \
--table emp \
--m 2

2.2 全量数据导入至Hive

sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table emp_conn \
--hive-import \
--hive-database test \
--fields-terminated-by '\t' \
--m 1 

2.3 全量数据导入至Hive-Hcatalog

sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table emp \
--fields-terminated-by '\t' \
--hcatalog-database test \
--hcatalog-table emp_hive \
-m 1

2.4 增量导入

  • append模式
    • 必须有一列自增的值,按照自增的int值判断。
    • 无法导入更新的数据。
#第一次,全量导入-首先执行以下指令先将我们之前的数据导入
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--target-dir /sqoop/appendresult \
--table emp \
--m 1

#查看生成的数据文件,发现数据已经导入到hdfs中.

#模拟新增加数据,然后在mysql的emp中插入2条数据:
insert into `userdb`.`emp` (`id`, `name`, `deg`, `salary`, `dept`) values ('1206', 'allen', 'admin', '30000', 'tp');
insert into `userdb`.`emp` (`id`, `name`, `deg`, `salary`, `dept`) values ('1207', 'woon', 'admin', '40000', 'tp');

#第二次导入,执行如下的指令,实现增量的导入:
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table emp --m 1 \
--target-dir /sqoop/appendresult \
--incremental append \
--check-column id \
--last-value 1205
  • lastmodified模式
    • 必须包含动态时间变化一列,按照数据变化的时间判断。
    • 既导入新增的数据也导入更新的数据。
# 首先我们要在mysql中创建一个customer表,指定一个时间戳字段
create table userdb.customertest(
  id int,name varchar(20),
  last_mod timestamp default current_timestamp on update current_timestamp
);
#此处的时间戳设置为在数据的产生和更新时都会发生改变. 

#插入如下记录:
insert into userdb.customertest(id,name) values(1,'neil');
insert into userdb.customertest(id,name) values(2,'jack');
insert into userdb.customertest(id,name) values(3,'martin');
insert into userdb.customertest(id,name) values(4,'tony');
insert into userdb.customertest(id,name) values(5,'eric');

#第一次全量导入:此时执行sqoop指令将数据导入hdfs:
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--target-dir /sqoop/lastmodifiedresult \
--table customertest --m 1

#再次插入一条数据进入customertest表
insert into userdb.customertest(id,name) values(6,'james');
#更新一条已有的数据,这条数据的时间戳会更新为我们更新数据时的系统时间.
update userdb.customertest set name = 'NEIL' where id = 1;


#第二次导入:执行如下指令,把id字段作为merge-key:
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--table customertest \
--target-dir /sqoop/lastmodifiedresult \
--incremental lastmodified \
--check-column last_mod \
--last-value '2022-10-08 15:40:27' \
--m 1 \
--merge-key id 
  • --query过滤
sqoop import \
--connect jdbc:mysql://192.168.88.80:3306/userdb \
--username root \
--password 123456 \
--query "select * from customertest where last_mod  >= '2022-09-12 00:00:00' and last_mod <= '2022-09-12 23:59:59'   and  \$CONDITIONS" \
--fields-terminated-by '\001' \
--hcatalog-database test \
--hcatalog-table customertest \
-m 1
Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐