logo

Databricks 集群与 Spark 高并发场景优化指南

Published on

大多数运行在 Spark 上的应用程序(如 ETL 工作负载或机器学习任务)通常都是单线程运行的。我们较少遇到需要同时向数据库集群提交多个作业的高并发应用场景。

本文将总结如何针对高并发应用场景优化 Databricks 集群配置,以及相关的最佳实践。

集群级别优化

1. 启用 Photon 加速引擎

  • Databricks 声称 Photon 是一个向量化查询引擎,可以提升对 Delta Lake 表的读取、写入和复杂连接操作的速度
  • 需要注意的是并非所有 worker 类型都支持 Photon,选择 worker 类型时要考虑这一点

2. 启用磁盘缓存(Delta 缓存)

  • 启用后,parquet 文件的远程副本将缓存在 worker 节点的本地磁盘上
  • 后续查询可直接从缓存获取数据
  • 可以使用支持缓存加速的 worker 类型
  • 也可以通过设置集群属性开启:
spark.databricks.io.cache.enabled="true" 

3. 高并发集群模式

  • 在高并发模式下,用户代码运行在独立的线程中
  • 注意:使用 Unity Catalog 元存储时无法使用此模式
  • 该模式也不支持 Scala 脚本

4. 调整 Spark 最大并发运行数

  • spark.databricks.maxConcurrentRuns 定义了集群可以同时执行的查询数量
  • 默认值为 10
  • 随着集群水平扩展(增加 worker)或垂直扩展(增加资源),可以适当提高此值

5. 启用删除向量

  • 传统方式:每次删除或更新 Delta Lake 表后,相关的 parquet 文件会被标记删除并创建新文件
  • 删除向量方式:将更新和删除记录保存在一个文件(向量)中,避免频繁重建文件
  • 读取时会同时读取这个向量文件以识别已删除或更新的记录
  • 在优化或自动压缩等操作时,才会重建文件
  • 可在建表时启用或通过以下方式修改现有表:
TBLPROPERTIES (delta.enableDeletionVectors = true)

代码层面优化

1. 使用 Databricks 特有功能

  • Optimize: 优化文件大小提升读取效率
  • Auto compaction: 自动压缩小文件
  • Optimized writes: 写入前在执行器间重排数据,生成较大的文件而不是许多小文件

2. 谨慎使用 Spark UDFs

  • UDFs 用于实现 Spark API 不支持的自定义逻辑
  • 缺点:执行效率不如 DataFrame API 方法
  • 原因:Spark 无法高效并行化 UDFs,且有序列化开销
  • 建议:尽可能使用 Spark DataFrame API 操作

3. 合理使用 collect()

  • collect() 会将分布在 worker 节点的数据收集到驱动程序内存
  • 处理大型数据框时要谨慎使用

4. broadcast 变量的使用

  • broadcast 变量在每个 worker 节点都存储一个副本
  • 适用于转换操作中频繁使用的变量
  • 注意:大型变量会占用大量 worker 内存
  • 更新时需要同步所有副本

5. 分区调优

  • 合理设置 shuffle 分区数(默认 200)
  • 建议设置为集群核心数的倍数
  • 可启用自动优化:
spark.databricks.adaptive.autoOptimizeShuffle.enabled=true

6. 启用自适应查询执行(AQE)

  • 在执行阶段之间优化查询
  • 通过以下命令检查是否启用:
spark.conf.get("spark.sql.adaptive.enabled")

7. 谓词下推

  • Spark 自动将过滤条件下推到数据库层面
  • 减少数据读取量,提升性能

8. 合理使用数据框缓存

  • 频繁使用的数据框可用 df.cache() 缓存
  • 注意:缓存会占用 worker 内存,需谨慎使用