在这里插入图片描述

作为一款高性能 MPP 数据库,Apache Doris 在 PB 级数据分析场景中表现出色,但许多用户在数据导入阶段常陷入选择困境。本文将彻底拆解 Stream Load、Broker Load、Routine Load、Insert into 四大核心方案,从原理解析到导入调优,从数据转换到故障排查,提供一站式解决方案。

一、技术原理深度拆解

1.1 核心数据流向

Client(提交任务) → [FE (协调任务)] → BE (数据写入/均衡)

FE节点:负责元数据管理、任务调度

BE节点:数据导入、副本同步、数据分片存储

注:stream load可以直接指定BE导入数据

1.2 方案对比

维度 Stream Load Broker Load Routine Load Insert into
使用场景 本地文件 / 程序写入 对象存储 / HDFS Kafka 实时写入 JBDC 接口 / 外部表 / 对象存储 / HDFS
单次导入数据量 小于 10GB 可达数百 GB 微批导入 MB 到 GB 视内存而定
支持文件格式 CSV、JSON、Parquet、ORC CSV、JSON、Parquet、ORC CSV、JSON SQL 方式获取

二、数据导入转换及优化分析

2.1 数据转换关键技术

数据转换

转化案例

LOAD LABEL db.transform_demo (
  DATA INFILE("hdfs://path/data.csv")
  INTO TABLE target_table
  COLUMNS TERMINATED BY ","
  (raw_ts, uid, temp_value, dyn_col)
  PRECEDING FILTER uid = 666
  SET (
      event_time = from_unixtime(raw_ts),
      value = if(temp_value = 'null', null, cast(temp_value as DECIMAL(10,2))),
      category = case
          when dyn_col in ('A','B') then 'Group1'
          else 'Group2'
      end
  )
  WHERE raw_ts > 666
)

关键技术点:

  • 列映射:将源数据列映射到目标表的不同列。【可调整列顺序及只导入特定列
  • 列变换:使用函数和表达式对源数据进行实时转换。【案例中 SET 设置
  • 前置过滤:在列映射和列变换前过滤掉不需要的原始数据。【案例中 PRECEDING 设置
  • 后置过滤:在列映射和列变换后对数据最终结果进行过滤。【案例中 WHERE 设置

2.2 性能优化体系

优先判断是不是资源的瓶颈,搜集CPU、内存、IO、网卡的监控!!!

1.Group Commit 优化机制

通过将多个小批量导入在后台合并成一个大的事务提交,显著提升了高并发小批量写入的性能。

主要针对:

  • INSERT INTO tbl VALUES(…) 语句
  • Stream Load 导入

参数配置优化:

参数 默认值 优化建议
group_commit_interval_ms 1000 根据业务对数据可见性延迟的容忍度来设置
group_commit_data_bytes 104857600 根据系统内存资源和数据可靠性要求来权衡。
group_commit off_mode sync_mode:适用于高并发写入场景;async_mode:适用于写入延迟敏感以及高频写入
2.MemTable 优化

MemTable 前移进一步减少导入过程中的开销,MemTable 将生成的 Segment 数据发给下游节点,减少了数据多次编码的开销,同时使内存反压更准确和及时。此外,我们使用了 Streaming RPC 来替代了 Ping-pong RPC,减少了数据传输过程中的等待。

MemTable 前移在 2.1 版本中默认开启。

如果在使用过程中遇到问题、希望回退到原有的导入方式,可以在 MySQL 连接中设置环境变量 enable_memtable_on_sink_node=false 来关闭 MemTable 前移。

3.单副本导入

该能力会从多个副本中选择一个副本作为主副本(其他副本为从副本),且只对主副本进行计算,当主副本的数据文件都写入成功后,通知从副本所在节点直接拉取主副本的数据文件,实现副本间的数据同步,当所有从副本节点拉取完后进行返回或超时返回(大多数副本成功即返回成功)。

开启参数

FE 配置:enable_single_replica_load = true
BE 配置:enable_single_replica_load = true
环境变量(insert into): set experimental_enable_single_replica_insert = true;
4.参数调优

Stream Load

设置sync_tablet_meta=false(无高可用需求时),减少元数据同步。
控制并发数不超过 BE 的 HTTP Server 线程数(默认 48)(be 参数webserver_num_workers)。

Routine Load

任务配置:
增大批次参数:max_batch_interval=60s 
max_batch_size=1G。(注意数据可见性的时间要求)
调整并发数:desired_concurrent_number需小于 BE 的routine_load_consumer_pool_size(默认 10)。
Fe:max_routine_load_task_num_per_be=1024,max_routine_load_task_concurrent_num=256
Be:routine_load_consumer_pool_size=10

Broker Load

并行度调优:
设置任务参数:load_parallelism=8、send_batch_parallelism=1。(后者仅在关闭 memtable 前移时生效)
调整 FE 全局参数:max_broker_concurrency=10。

本次导入并发数 = Math.min(源文件大小/min_bytes_per_broker_scanner,max_broker_concurrency,当前BE节点个数 * load_parallelism)

提高compaction速度

BE 参数

max_base_compaction_task_num_per_disk:默认值 2,每个磁盘最大compaction任务数 (视情况增大)
vertical_compaction_num_columns_per_group:默认值 5,在列式 compaction 中,组成一个合并组的列个数 (视情况增大)
max_base_compaction_threads :默认值 2, Base Compaction 线程池中线程数量的最大值,-1 表示每个磁盘一个线程。(视情况增大)

FE 参数

async_loading_load_task_pool_size:默认值10, loading_load任务执行程序池大小。该池大小限制了正在运行的最大 loading_load任务数。当前,它仅限制 broker load的 loading_load任务的数量。

三、避坑指南:常见故障解析

  1. Stream load : Doris的Stream Load那些事儿,你踩过哪些“坑”?

  2. Routine load:Doris的Routine Load导入指南

  3. 导入慢分析及优化:Doris 导入慢该如何排查及优化?

四、全景方案选型决策树

通过本文的系统性梳理,读者可精准匹配业务场景与技术方案。建议收藏本文作为 Doris 数据导入的案头手册,随时应对各类数据接入挑战。如需更详细指导,关注公众号回复 “微信” 进入Doris官方社区获取原厂人员指导!

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐