基于 Postgres 的通知模式(Notifier Pattern)
- Published on
Postgres 提供的 LISTEN/NOTIFY 是一项强大的功能,能在多种场景下发挥作用。然而 MySQL 和 SQLite仍然没有类似功能。
LISTEN/NOTIFY
的概念非常简单,几乎无需过多解释:客户端可以订阅特定的主题,其他客户端可以向这些主题发送消息,从而将消息传递给每个订阅者。这个想法可以通过简单的 psql
shell 演示:
=# LISTEN test_topic;
LISTEN
Time: 2.828 ms
=# SELECT pg_notify('test_topic', 'test_message');
pg_notify
-----------
(1 row)
Time: 17.892 ms
Asynchronous notification "test_topic" with payload "test_message" received from server process with PID 98481.
然而,尽管 LISTEN/NOTIFY
的原理简单,很多基于 Postgres 构建的应用在使用它时并未达到最佳效果,导致消耗过多的数据库连接资源,同时也很少考虑故障处理的问题。
通知器模式的核心原则
以下是实现通知器模式的一些关键原则:
LISTEN
是与特定连接绑定的。订阅(LISTEN
)成功后,必须保持原始连接可用,以接收消息。- 应用程序中的多个组件可能会因不同需求订阅同一个主题。各组件的使用场景可能完全无关,但共享订阅主题。
- Postgres 连接资源是有限的,应尽量减少使用。优化连接数对于大规模应用尤其重要。
- 单个连接可以订阅多个主题。
基于这些原则,通知器(Notifier) 的角色显而易见:它的任务是为每个进程保留一个单独的 Postgres 连接,允许同一程序的其他组件通过它订阅任意数量的主题,等待通知,并将收到的通知分发给订阅组件。
“每个进程单一 Postgres 连接”是关键 。使用通知器模式,可以将 LISTEN/NOTIFY
占用的 Postgres 连接数降低到每个程序一个,这是相较于传统方法的一个显著优势。传统方法为每个主题在每个程序中都建立一个连接。而对于像 Go 语言这样易于实现高效进程内并发的语言,通知器模式可以将 LISTEN/NOTIFY
的连接开销几乎降为零。
通知器的实现细节
以下是一个用于创建新订阅的 Listen
方法:
// Listen 返回一个订阅,调用者可以通过它接收通知通道的消息。
func (l *Notifier) Listen(channel string) *Subscription {
l.mu.Lock()
defer l.mu.Unlock()
existingSubs := l.subscriptions[channel]
sub := &Subscription{
channel: channel,
listenChan: make(chan string, 100), // 缓冲通道,用于存储通知消息
notifyListener: l,
}
l.subscriptions[channel] = append(existingSubs, sub)
if len(existingSubs) > 0 {
// 如果该通道已有订阅,复用已建立的通道。
sub.establishedChan = existingSubs[0].establishedChan
sub.establishedChanClose = func() {} // 空操作,因为不是通道所有者
return sub
}
// 通知器在成功建立 LISTEN 后关闭该通道,订阅者可据此确认。
sub.establishedChan = make(chan struct{})
sub.establishedChanClose = sync.OnceFunc(func() { close(sub.establishedChan) })
l.channelChanges = append(l.channelChanges,
channelChange{channel, sub.establishedChanClose, channelChangeOperationListen})
// 取消 WaitForNotification 的阻塞以便立即处理通道变更。
l.waitForNotificationCancel()
return sub
}
重要实现细节
使用缓冲通道和非阻塞发送 通知器使用缓冲通道(如
make(chan string, 100)
)和非阻塞发送(通过select
和default
实现)。 当通知器收到大量通知时,如果每次都等待各个组件成功接收并处理通知,很容易导致系统滞后。因此,通知会通过非阻塞发送方式传递到通道中。如果通道已满,发送操作不会阻塞,而是直接丢弃通知。这种机制依赖缓冲区提供的弹性,确保通知不会轻易被丢弃。注意:各组件需及时处理各自的通知队列(inbox),这样即使某个组件滞后,整个系统仍然能够保持健康运行。
支持多组件订阅同一主题 如果多个组件订阅同一个主题,通知器内部会对订阅按主题进行组织。由于只使用一个连接,因此对于每个主题仅需执行一次
LISTEN
操作。如果发现已有订阅存在,则无需再次执行LISTEN
,而是直接为新组件添加订阅。提供订阅确认通道 每个订阅会提供一个“已建立通道”(
established channel
),当LISTEN
成功后,该通道会被关闭。这一功能虽然在大多数生产环境中并非必要,但在测试环境中极为重要。在测试用例中,如果
pg_notify
发送早于LISTEN
的启动,通知将会丢失,从而导致测试间歇性失败。通过确认机制,测试用例可以先让通知器执行LISTEN
,等待确认成功后再发送pg_notify
,从而确保通知不丢失。
// EstablishedC 是一个通道,当通知器成功建立连接后,该通道会被关闭。
// 这在测试用例中尤为有用,因为它可以用来确认监听器不仅已启动,
// 而且已成功在通道上开始监听后再继续操作。
// 如果是对已建立通道的新订阅,则 EstablishedC 已经关闭,因此等待它总是安全的。
//
// 无法完全保证通知器一定能成功建立监听,
// 因此调用方通常会结合 context 完成通道(done)、停止通道(stop channel)
// 和/或超时机制一起使用 `select`。
//
// 当通知器停止时,该通道会始终被关闭。
func (s *Subscription) EstablishedC() <-chan struct{} { return s.establishedChan }
可中断的接收机制
SQL 标准中没有提供用于等待通知的规范方法。通常,这需要通过驱动层的特殊函数实现,比如 Pgx 提供的 WaitForNotification
。
这些方法通常会阻塞,直到接收到通知。然而,当我们仅使用单个连接时,这种阻塞机制可能会导致问题。比如:如果通知器正在一个阻塞的接收循环中,而此时另一个组件想要添加新的订阅并需要发起 LISTEN
操作怎么办?
为了解决这个问题,需要确保等待循环是可中断的。以下是使用 Go 语言实现这一机制的示例代码:
func (l *Notifier) runOnce(ctx context.Context) error {
// 处理订阅通道的变更,例如 `LISTEN` 或 `UNLISTEN` 操作。
if err := l.processChannelChanges(ctx); err != nil {
return err
}
// WaitForNotification 是一个阻塞函数。
// 为了能够定期唤醒处理新的 `LISTEN` 或 `UNLISTEN` 操作,
// 我们为监听操作设置了一个上下文超时(context deadline)。
// 如果超时了,但原因与上下文超时无关,则不会将其视为错误。
notification, err := func() (*pgconn.Notification, error) {
const listenTimeout = 30 * time.Second
// 创建一个带超时的上下文,用于控制等待通知的时限。
ctx, cancel := context.WithTimeout(ctx, listenTimeout)
defer cancel()
// 提供一种机制,用于在有新的订阅变更时取消阻塞等待。
l.mu.Lock()
l.waitForNotificationCancel = cancel
l.mu.Unlock()
// 调用驱动层的 WaitForNotification 等待通知。
notification, err := l.conn.WaitForNotification(ctx)
if err != nil {
return nil, xerrors.Errorf("等待通知时出错: %w", err)
}
return notification, nil
}()
if err != nil {
// 如果错误是由于取消(cancellation)或超时引起,
// 并且父上下文没有错误,则返回 nil。
if (errors.Is(err, context.Canceled) ||
errors.Is(err, context.DeadlineExceeded)) && ctx.Err() == nil {
return nil
}
return err
}
// 在发送通知之前加读锁,确保多线程安全。
l.mu.RLock()
defer l.mu.RUnlock()
// 通知订阅者(如果没有订阅者或订阅为空,则此操作无效)。
for _, sub := range l.subscriptions[notification.Channel] {
sub.listenChan <- notification.Payload
}
return nil
}
内部闭包的处理机制
闭包调用了 WaitForNotification
方法,但设置了一个默认的上下文超时(30 秒),使函数能够周期性地循环运行。此外,它还将上下文的取消函数 l.waitForNotificationCancel
存储下来。
当调用 Listen
方法并需要添加新订阅时,会调用 l.waitForNotificationCancel
。这会立即取消当前的等待操作,处理新的订阅请求后,闭包会重新进入等待状态。
“让它崩溃” 的思想
由于程序中所有通知的处理都依赖单个主连接,因此监控该连接的健康状态并做出适当反应至关重要。如果不进行妥善处理,所有基于 LISTEN/NOTIFY
的功能都会同时崩溃。
一种显而易见的反应方法是:关闭当前连接,使用连接池获取一个新连接,重新为每个活动订阅发起 LISTEN
,然后重新进入等待循环。
但是,有时候要保证状态能够被干净地重置可能会比较复杂。因此,另一种选择是遵循“让它崩溃”的思路: 当连接无法恢复时,停止程序,让程序通过正常的启动逻辑重新进入健康状态。
// 如果通知器出现不健康状态,则重启工作进程。
// 这种情况通常不会发生,因为通知器内置了一个重试机制,
// 在放弃之前会尽量保持连接状态。
notifier.AddUnhealthyCallback(closeShutdown)
在实际使用中,我们发现这种极端情况非常罕见(在一年多的使用中只发生过一次)。当它真的发生时,允许程序崩溃并重启并未引发任何额外的中断问题。
PgBouncer 的使用
在使用PgBouncer时,LISTEN
仅在会话池模式(session pooling)下支持,而不支持事务池模式(transaction pooling),因为通知只会发送给发起 LISTEN
操作的原始会话。
使用通知器模式要求应用程序为 LISTEN/NOTIFY
专门分配一个单独的连接,但应用程序的其他部分可以自由地使用 PgBouncer 的事务池模式或语句池模式,从而最大化连接使用效率。