logo

基于 Postgres 的通知模式(Notifier Pattern)

Published on

Postgres 提供的 LISTEN/NOTIFY 是一项强大的功能,能在多种场景下发挥作用。然而 MySQL 和 SQLite仍然没有类似功能。

LISTEN/NOTIFY 的概念非常简单,几乎无需过多解释:客户端可以订阅特定的主题,其他客户端可以向这些主题发送消息,从而将消息传递给每个订阅者。这个想法可以通过简单的 psql shell 演示:

=# LISTEN test_topic;
LISTEN
Time: 2.828 ms

=# SELECT pg_notify('test_topic', 'test_message');
 pg_notify
-----------

(1 row)

Time: 17.892 ms
Asynchronous notification "test_topic" with payload "test_message" received from server process with PID 98481.

然而,尽管 LISTEN/NOTIFY 的原理简单,很多基于 Postgres 构建的应用在使用它时并未达到最佳效果,导致消耗过多的数据库连接资源,同时也很少考虑故障处理的问题。

通知器模式的核心原则

以下是实现通知器模式的一些关键原则:

  1. LISTEN 是与特定连接绑定的。订阅(LISTEN)成功后,必须保持原始连接可用,以接收消息。
  2. 应用程序中的多个组件可能会因不同需求订阅同一个主题。各组件的使用场景可能完全无关,但共享订阅主题。
  3. Postgres 连接资源是有限的,应尽量减少使用。优化连接数对于大规模应用尤其重要。
  4. 单个连接可以订阅多个主题

基于这些原则,通知器(Notifier) 的角色显而易见:它的任务是为每个进程保留一个单独的 Postgres 连接,允许同一程序的其他组件通过它订阅任意数量的主题,等待通知,并将收到的通知分发给订阅组件

“每个进程单一 Postgres 连接”是关键 。使用通知器模式,可以将 LISTEN/NOTIFY 占用的 Postgres 连接数降低到每个程序一个,这是相较于传统方法的一个显著优势。传统方法为每个主题在每个程序中都建立一个连接。而对于像 Go 语言这样易于实现高效进程内并发的语言,通知器模式可以将 LISTEN/NOTIFY 的连接开销几乎降为零。

Notifier Pattern

通知器的实现细节

以下是一个用于创建新订阅的 Listen 方法:

// Listen 返回一个订阅,调用者可以通过它接收通知通道的消息。
func (l *Notifier) Listen(channel string) *Subscription {
    l.mu.Lock()
    defer l.mu.Unlock()

    existingSubs := l.subscriptions[channel]

    sub := &Subscription{
        channel:        channel,
        listenChan:     make(chan string, 100), // 缓冲通道,用于存储通知消息
        notifyListener: l,
    }
    l.subscriptions[channel] = append(existingSubs, sub)

    if len(existingSubs) > 0 {
        // 如果该通道已有订阅,复用已建立的通道。
        sub.establishedChan = existingSubs[0].establishedChan
        sub.establishedChanClose = func() {} // 空操作,因为不是通道所有者
        return sub
    }

    // 通知器在成功建立 LISTEN 后关闭该通道,订阅者可据此确认。
    sub.establishedChan = make(chan struct{})
    sub.establishedChanClose = sync.OnceFunc(func() { close(sub.establishedChan) })

    l.channelChanges = append(l.channelChanges,
        channelChange{channel, sub.establishedChanClose, channelChangeOperationListen})

    // 取消 WaitForNotification 的阻塞以便立即处理通道变更。
    l.waitForNotificationCancel()

    return sub
}

重要实现细节

  1. 使用缓冲通道和非阻塞发送 通知器使用缓冲通道(如 make(chan string, 100))和非阻塞发送(通过 selectdefault 实现)。 当通知器收到大量通知时,如果每次都等待各个组件成功接收并处理通知,很容易导致系统滞后。因此,通知会通过非阻塞发送方式传递到通道中。如果通道已满,发送操作不会阻塞,而是直接丢弃通知。这种机制依赖缓冲区提供的弹性,确保通知不会轻易被丢弃。

    注意:各组件需及时处理各自的通知队列(inbox),这样即使某个组件滞后,整个系统仍然能够保持健康运行。

  2. 支持多组件订阅同一主题 如果多个组件订阅同一个主题,通知器内部会对订阅按主题进行组织。由于只使用一个连接,因此对于每个主题仅需执行一次 LISTEN 操作。如果发现已有订阅存在,则无需再次执行 LISTEN,而是直接为新组件添加订阅。

  3. 提供订阅确认通道 每个订阅会提供一个“已建立通道”(established channel),当 LISTEN 成功后,该通道会被关闭。这一功能虽然在大多数生产环境中并非必要,但在测试环境中极为重要。

    在测试用例中,如果 pg_notify 发送早于 LISTEN 的启动,通知将会丢失,从而导致测试间歇性失败。通过确认机制,测试用例可以先让通知器执行 LISTEN,等待确认成功后再发送 pg_notify,从而确保通知不丢失。

// EstablishedC 是一个通道,当通知器成功建立连接后,该通道会被关闭。
// 这在测试用例中尤为有用,因为它可以用来确认监听器不仅已启动,
// 而且已成功在通道上开始监听后再继续操作。
// 如果是对已建立通道的新订阅,则 EstablishedC 已经关闭,因此等待它总是安全的。
//
// 无法完全保证通知器一定能成功建立监听,
// 因此调用方通常会结合 context 完成通道(done)、停止通道(stop channel)
// 和/或超时机制一起使用 `select`。
//
// 当通知器停止时,该通道会始终被关闭。
func (s *Subscription) EstablishedC() <-chan struct{} { return s.establishedChan }

可中断的接收机制

SQL 标准中没有提供用于等待通知的规范方法。通常,这需要通过驱动层的特殊函数实现,比如 Pgx 提供的 WaitForNotification

这些方法通常会阻塞,直到接收到通知。然而,当我们仅使用单个连接时,这种阻塞机制可能会导致问题。比如:如果通知器正在一个阻塞的接收循环中,而此时另一个组件想要添加新的订阅并需要发起 LISTEN 操作怎么办?

为了解决这个问题,需要确保等待循环是可中断的。以下是使用 Go 语言实现这一机制的示例代码:

func (l *Notifier) runOnce(ctx context.Context) error {
    // 处理订阅通道的变更,例如 `LISTEN` 或 `UNLISTEN` 操作。
    if err := l.processChannelChanges(ctx); err != nil {
        return err
    }

    // WaitForNotification 是一个阻塞函数。
    // 为了能够定期唤醒处理新的 `LISTEN` 或 `UNLISTEN` 操作,
    // 我们为监听操作设置了一个上下文超时(context deadline)。
    // 如果超时了,但原因与上下文超时无关,则不会将其视为错误。
    notification, err := func() (*pgconn.Notification, error) {
        const listenTimeout = 30 * time.Second

        // 创建一个带超时的上下文,用于控制等待通知的时限。
        ctx, cancel := context.WithTimeout(ctx, listenTimeout)
        defer cancel()

        // 提供一种机制,用于在有新的订阅变更时取消阻塞等待。
        l.mu.Lock()
        l.waitForNotificationCancel = cancel
        l.mu.Unlock()

        // 调用驱动层的 WaitForNotification 等待通知。
        notification, err := l.conn.WaitForNotification(ctx)
        if err != nil {
            return nil, xerrors.Errorf("等待通知时出错: %w", err)
        }

        return notification, nil
    }()
    if err != nil {
        // 如果错误是由于取消(cancellation)或超时引起,
        // 并且父上下文没有错误,则返回 nil。
        if (errors.Is(err, context.Canceled) ||
            errors.Is(err, context.DeadlineExceeded)) && ctx.Err() == nil {
            return nil
        }

        return err
    }

    // 在发送通知之前加读锁,确保多线程安全。
    l.mu.RLock()
    defer l.mu.RUnlock()

    // 通知订阅者(如果没有订阅者或订阅为空,则此操作无效)。
    for _, sub := range l.subscriptions[notification.Channel] {
        sub.listenChan <- notification.Payload
    }

    return nil
}

内部闭包的处理机制

闭包调用了 WaitForNotification 方法,但设置了一个默认的上下文超时(30 秒),使函数能够周期性地循环运行。此外,它还将上下文的取消函数 l.waitForNotificationCancel 存储下来。

当调用 Listen 方法并需要添加新订阅时,会调用 l.waitForNotificationCancel。这会立即取消当前的等待操作,处理新的订阅请求后,闭包会重新进入等待状态。

“让它崩溃” 的思想

由于程序中所有通知的处理都依赖单个主连接,因此监控该连接的健康状态并做出适当反应至关重要。如果不进行妥善处理,所有基于 LISTEN/NOTIFY 的功能都会同时崩溃。

一种显而易见的反应方法是:关闭当前连接,使用连接池获取一个新连接,重新为每个活动订阅发起 LISTEN,然后重新进入等待循环。

但是,有时候要保证状态能够被干净地重置可能会比较复杂。因此,另一种选择是遵循“让它崩溃”的思路: 当连接无法恢复时,停止程序,让程序通过正常的启动逻辑重新进入健康状态。

// 如果通知器出现不健康状态,则重启工作进程。
// 这种情况通常不会发生,因为通知器内置了一个重试机制,
// 在放弃之前会尽量保持连接状态。
notifier.AddUnhealthyCallback(closeShutdown)

在实际使用中,我们发现这种极端情况非常罕见(在一年多的使用中只发生过一次)。当它真的发生时,允许程序崩溃并重启并未引发任何额外的中断问题。

PgBouncer 的使用

在使用PgBouncer时,LISTEN 仅在会话池模式(session pooling)下支持,而不支持事务池模式(transaction pooling),因为通知只会发送给发起 LISTEN 操作的原始会话。

使用通知器模式要求应用程序为 LISTEN/NOTIFY 专门分配一个单独的连接,但应用程序的其他部分可以自由地使用 PgBouncer 的事务池模式或语句池模式,从而最大化连接使用效率。