如何让 Kafka 完全运行在对象存储上?
- Published on
概述
在我们继续之前,让我们回顾一下 Kafka 的设计。消息系统使用操作系统文件系统进行数据存储,并利用内核页面缓存机制。系统不是试图将尽可能多的数据保存在内存中并将其刷新到文件系统,而是将所有数据传输到页面缓存,然后再刷新到磁盘。所有消息的写入和读取操作都必须通过页面缓存。
现代操作系统通常会借用未使用的内存(RAM)部分作为页面缓存。经常使用的磁盘数据会被填充到这个缓存中,避免过于频繁地直接访问磁盘,从而提高性能。

这种设计将计算和存储紧密耦合,这意味着添加更多机器是扩展存储的唯一方式。如果需要更多磁盘空间,就必须添加更多 CPU 和 RAM,这可能导致资源浪费。

在经历了由于 Kafka 紧密的计算-存储设计而导致的弹性和资源利用问题后,Uber 提出了 Kafka 分层存储(KIP-405)以避免 Kafka 的紧耦合设计。主要思想是代理将具有两层存储:本地和远程。第一个是代理的本磁盘,用于接收最新数据,而后者使用 HDFS/S3/GCS
等存储来持久化历史数据。

虽然将历史数据卸载到远程存储可以帮助 Kafka broker 的计算和存储层减少相互依赖,但 broker 并不是 100% 无状态的。AutoMQ 的工程师们想知道:"是否有一种方法可以将所有 Kafka 数据存储在对象存储中,同时仍然保持像在本地磁盘上一样的高性能?"
AutoMQ 存储架构
目前,AutoMQ 可以在 AWS、GCS 和 Azure 等主要云提供商上运行,但我将使用 AWS 的技术来描述其架构,以便与我从他们的博客和文档中学到的内容保持一致。
AutoMQ 的目标很简单:通过使其能够将所有消息写入对象存储而不牺牲性能来提高 Kafka 的效率和弹性。
他们通过重用 Apache Kafka 代码进行计算和协议,同时引入共享存储架构来替换 Kafka broker 的本地磁盘来实现这一目标。与维护本地和远程存储的分层存储方法不同,AutoMQ 希望使系统完全无状态。
从宏观角度来看,AutoMQ broker 将消息写入内存缓存。在异步将此消息写入对象存储之前,broker 必须首先将数据写入 WAL 存储以确保数据持久性。

缓存

AutoMQ 使用堆外缓存内存层来处理所有消息的读写,保证实时性能。它管理两种不同的缓存以满足不同需求:
- 日志缓存处理写入和热读取(需要最新数据的读取)
- 块缓存用于冷读取(访问历史数据)
如果数据在日志缓存中不可用,它将从块缓存中读取。块缓存通过使用预取和批量读取等技术提高了即使是历史读取的内存命中率,这有助于在冷读取操作期间保持性能。
预取是一种提前将预计需要的数据加载到内存中的技术,这样在需要时就可以立即使用,减少等待时间。批量读取是一种允许在单个操作中读取多个数据的技术。这减少了读取请求的数量并加快了数据检索速度。
每个缓存都有不同的数据淘汰策略。日志缓存有一个默认的最大大小(可配置)。如果达到限制,缓存将使用先进先出(FIFO)策略淘汰数据,以确保新数据的可用性。对于块缓存,AutoMQ 使用最近最少使用(LRU)策略来淘汰块数据。
内存缓存层为读写操作提供最低延迟;但它受限于机器内存大小且不可靠。如果 broker 机器崩溃,缓存中的数据就会丢失。这就是为什么 AutoMQ 需要一种方法来使数据传输更可靠。
预写日志(WAL)
数据使用直接 IO 从日缓存写入原始 EBS 设备。

EBS 是一种可以附加到 EC2 实例的持久性块级存储设备。Amazon EBS 提供从 SSD 到 HDD 的各种卷类型,允许用户根据需求选择。EBS 多重挂载功能允许将一个 EBS 卷挂载到多个 EC2 实例。在探索 AutoMQ 如何在后台从故障中恢复时,我们将重新讨论多重挂载功能。
EBS 存储充当预写日志(WAL),这是一个用于崩溃和事务恢复的仅追加磁盘结构。使用 B-Tree 进行存储管理的数据库通常包含这种用于恢复的数据结构;每个修改都必须在应用到数据之前通过 WAL。当机器从崩溃中恢复时,它可以读取 WAL 以恢复到之前的状态。

类似地,AutoMQ 将 EBS 设备视为 WAL。broker 必须确保消息已经在 WAL 中,然后才能写入 S3;当 broker 收到消息时,它写入内存缓存,并且只有在消息持久化到 EBS 后才返回"我收到了你的消息"响应。AutoMQ 使用 EBS 中的数据在 broker 失败的情况下进行恢复。我们将在接下来的章节中回到恢复过程。

考虑到 EBS 的高成本(特别是 IOPS 优化的 SSD 类型)是很重要的。由于 AutoMQ 中的 EBS 设备主要用作 WAL 以确保消息持久性,系统只需要很小的 EBS 卷。AutoMQ 默认 WAL 大小设置为 10GB。
对象存储
对象存储存储所有 AutoMQ 数据。用户可以使用 AWS S3 或 Google GCS 等服务作为这一层。云对象服务以其极高的持久性、可扩展性和成本效益而闻名。broker 从日志缓存异步写入数据到对象存储。
AutoMQ 在对象存储中的数据文件具有以下组件:
- DataBlock: 存储实际数据
- IndexBlock: 存储索引
- Footer: 存储文件元数据

- DataBlock 包含实际数据。
- IndexBlock 是由 DataBlockIndex 项组成的固定 36 字节块。项目数量与文件中的 DataBlock 数量相关。每个 DataIndexBlock 中的信息有助于定位 DataBlock 位置。
- Footer 是一个固定的 48 字节块,包含 IndexBlock 的位置和大小,使快速访问索引数据成为可能。
写入过程
从用户的角度来看,AutoMQ 的写入过程与 Apache Kafka 类似。首先创建一个包含消息值和目标主题的记录。然后,消息被序列化并批量通过网络发送。
关键的区别在于 broker 如何处理消息持久化。
在 Kafka 中,broker 将消息写入页面缓存,然后刷新到本地磁盘。它们不实现任何内存缓存,而是将所有工作交给操作系统。
在 AutoMQ 中,情况则大不相同。让我们仔细看看消息写入过程:

- 生产者将消息发送给 broker 并等待响应
- broker 将接收到的消息放入日志缓存(一个堆外内存缓存)
堆外内存在 Java 中是在 Java 堆之外管理的。与由 JVM 处理和垃圾回收的堆内存不同,堆外内存不是自动管理的。开发人员必须手动分配和释放堆外内存。由于 JVM 不会自动清理堆外内存,如果处理不当,这可能更复杂且容易发生内存泄漏。
- 消息随后使用直接 I/O 写入 WAL(EBS)设备。一旦消息成功写入 EBS,broker 就会向生产者发送成功响应。(我将在下一节解释这个过程。)
直接 I/O 是一种通过直接读写磁盘来绕过操作系统文件系统缓存的方法,这可以减少延迟并提高大数据传输的性能。实现直接 I/O 通常需要更复杂的应用程序逻辑,因为开发人员必须管理数据对齐、缓冲区分配和其他低级细节。
- 日志缓存中的消息在落地到 WAL 后会异步写入对象存储。
从缓存到 WAL 的旅程
消息使用 SlidingWindow 抽象从日志缓存写入 WAL,它为每条记录分配写入位置并管理写入过程。SlidingWindow 有几个位置:

- Start Offset: 这个偏移量标记滑动窗口的开始;系统已经写入了这个偏移量之前的记录
- Next Offset: 下一个未写入的位置;新记录从这里开始。Start 和 Next Offsets 之间的数据尚未完全写入
- Max Offset: 这是滑动窗口的结束;当 Next Offset 达到这个点时,它将尝试扩展窗口
为了更好地理解,让我们查看 AutoMQ 用于促进写入 EBS 过程的一些新数据结构:

- block: 最小的 IO 单元,包含一个或多个记录,写入磁盘时对齐到 4 KiB
- writingBlocks: 当前正在写入的块集合;AutoMQ 在完成写入磁盘后移除这些块
- pendingBlocks: 等待写入的块;当 IO 线程池完成时,新块会放在这里,当有空间时移动到 writingBlocks
- currentBlock: 来自缓存的最新日志。需要写入的记录放在这个块中。新记录也在这里分配逻辑偏移量。当 currentBlock 满时,所有块都放入待处理块中。此时,系统将创建一个新的当前块。
在准备好所有先决条件信息后,我们将了解数据写入 EBS 的过程:

- 过程从传入记录的追加请求开始
- 记录被添加到 currentBlock,分配偏移量,并异步返回给调用者
- 如果 currentBlock 达到特定大小或时间限制,它会将所有块移动到 pendingBlocks。AutoMQ 将创建一个新的 currentBlock
- 如果 writingBlocks 少于 IO 线程池大小,则将 pendingBlocks 中的一个块移动到 writingBlocks 进行写入
- 一旦块写入磁盘,它就从 writingBlocks 中移除;系统重新启动滑动窗口的 Start Offset。标记追加请求完成
从缓存到对象存储的旅程

当日志缓存中累积了足够的数据时,AutoMQ 会触发上传到对象存储。LogCache 中的数据按 streamId
和 startOffset
排序。然后 AutoMQ 按批次将数据从缓存写入对象存储,每个批次按相同的顺序上传。
如前所述,对象存储中的数据文件包括 DataBlock、IndexBlock 和 Footer。
在 AutoMQ 完成写入 DataBlock 后,它使用之前写入的信息构建 IndexBlock。由于已经知道每个 DataBlock 在对象中的位置,这些数据用于为每个 DataBlock 创建一个 DataBlockIndex。IndexBlock 中的 DataBlockIndex 数量与 DataBlock 的数量相对应。
最后,Footer 元数据块记录了与 IndexBlock 数据位置相关的信息。
读取过程
AutoMQ 消费者像 Apache Kafka 一样开始消费过程。它们发出带有所需偏移量位置的异步拉取请求。
在收到请求后,broker 搜索消息并将其返回给消费者。消费者根据当前偏移量位置和其长度计算下一个偏移量位置,准备下一个请求:
next_offset = current_offset + current_message_length
物理数据读取路径与传统方式有所不同。
AutoMQ 尝试尽可能多地从内存中提供数据读取服务。最初,Kafka 从页面缓存中读取数据。如果消息不在那里,操作系统将访问磁盘并将所需数据填充到页面缓存中以服务请求。

AutoMQ 中的读取操作遵循以下路径:
- 如果请求需要最近写入的数据,它从日志缓存中读取。需要注意的是,只有已经写入 WAL 的消息才可用于满足请求
- 如果数据不在日志缓存中,操作会检查块缓存
块缓存通过从对象存储加载数据来填充。如果数据仍然找不到,AutoMQ 会尝试预取。预取允许系统加载预计不久后会需要的数据。由于消费者从特定位置顺序读取消息,预取数据可以提高缓存命中率,提升读取性能。
为了加快对象存储中的数据查找速度,broker 使用文件的 Footer
来找到 IndexBlock
的位置。IndexBlock 中的数据按 (streamId, startOffset)
排序,通过二分查找可以快速识别正确的 DataBlock
。
一旦定位到 DataBlock,broker 可以通过遍历 DataBlock 中的所有记录批次来高效地找到所需数据。
DataBlock 中的记录批次数量会影响检索特定偏移量的时间。为了解决这个问题,在上传过程中,来自同一个流的所有数据被分成 1MB 的段,确保每个 DataBlock 中的记录批次数量不会降低检索速度。
恢复机制
如前所述,EBS 存储作为 AutoMQ 的预写日志(WAL),帮助使消息从内存到对象存储的写入过程更可靠。让我们设想一个 AutoMQ 集群有两个 broker(A 和 B)的情况,每个 broker 都有两个关联的 EBS 存储。让我们看看 AutoMQ 是如何实现可靠的消息传输的:

如前所述,只有当 broker 确认消息已经落地到 WAL(EBS)时,消息才被认为成功接收。
那么,如果其中一个 broker(比如 broker A)崩溃了会怎样?该 broker 的 EBS 存储设备会怎样?那些还没有写入对象存储的 EBS 数据又会怎样?
AutoMQ 利用 AWS EBS 多重挂载功能来处理这种情况。在 broker A 宕机后,EBS 设备 A 将被挂载到 broker B。当 broker B 有两个 EBS 卷时,它会通过标签知道哪个是从空闲状态挂载的。Broker B 会将 EBS 存储 A 的数据刷新到 S3,然后删除该卷。此外,当将孤立的 EBS 卷挂载到 Broker B 时,AutoMQ 利用 NVME 预留来防止意外数据写入该卷。这些策略显著加快了故障转移过程。
新创建的 broker 将拥有新的 EBS 存储。
元数据管理
我们将通过探索 AutoMQ 如何管理集群元数据来结束本文。它重用了 Kafka 的 KRaft 机制。我在写 Kafka 系列时没有深入研究 KRaft,所以这是学习这个元数据管理模型的好机会。😊
AutoMQ 利用基于 Kafka 的 Kraft 模式的最新元数据管理架构。
传统的 Kafka 依赖单独的 ZooKeeper 服务器进行集群元数据管理,但 KRaft 消除了 ZooKeeper,简化了 Kafka 并增强了弹性。在 KRaft 模式下,Kafka 使用基于 Raft 的内部控制器仲裁组(一组负责维护和确保元数据一致性的 broker)。Raft 共识算法用于选举领导者并在仲裁组之间复制元数据更改。KRaft 模式下的每个 broker 都保留元数据的本地副本,而控制器仲裁组的领导者管理更新并将其复制到所有 broker,从而减少操作复杂性和潜在的故障点。

AutoMQ 也有一个控制器仲裁组来确定控制器领导者。集群元数据(包括主题/分区与数据之间的映射、分区与 broker 之间的映射等)存储在领导者中。只有领导者可以修改这些元数据;如果 broker 想要更改它,必须与领导者通信。元数据被复制到每个 broker;元数据的任何更改都由控制器传播到每个 broker。
结语
在本文中,我们探索了 AutoMQ 如何创造性地利用云服务来实现一个关键目标:将所有 Kafka 消息存储在几乎无限的对象存储中,同时保持 Kafka 原有的性能和兼容性。