Golang 设计模式:观察者模式
理论
观察者模式也是一种经典的设计模式,主要适用于多对一的订阅/发布场景。
什么是多对一的订阅/发布场景:
- 有多名观察者,观察一个被观察事物
- 定义了一对多的依赖关系,当一个对象状态发生改变,其依赖者会收到通知并自动更新
这种模式的存在主要是为了降低观察者和目标之间的耦合,又被叫做发布-订阅模式。
当然,这种场景其实有很多不同的解决方案,只是设计模式提供了一种理论化规范化的解决方法。
在观察者模式中,我们需要定义三种核心的角色:
- Observer:观察者,需要关注事物动态
- Event:事物的变更事件。使用 topic 来标识事物身份和变更类型;使用 val 来表示变更详情
- EventBus:事件总线,其实就是在观察者和事物之间的代理层,负责维护管理观察者,并向其同步事物的变更事件。
EventBus 是观察者模式的核心,实现了观察者与事物的解耦:
- 观察者通过向 EventBus 完成注册操作,声明自己关心的事件类型,而不需要直接找事物要
- 事物在发生变更时只用向 EventBus 统一汇报,不用维护每个观察者的交互
- Eventbus 负责维护每个观察者和事物之间的映射,在变更事件到达时逐一通知
实现
定义一下三个核心角色:
type Event struct {
Topic string
Val interface{}
}
type Observer interface {
OnChange(ctx context.Context, e *Event) error
}
type EventBus interface {
Subscribe(topic string, o Observer)
Unsubscribe(topic string, o Observer)
Publish(ctx context.Context, e *Event)
}
观察者需要实现 OnChange 方法,用于给 EventBus 在事件发生时做回调,类似这样:
type BaseObserver struct {
name string
}
func NewBaseObserver(name string) *BaseObserver {
return &BaseObserver{
name: name,
}
}
func (b *BaseObserver) OnChange(ctx context.Context, e *Event) error {
fmt.Printf("observer: %s, event key: %s, event val: %v", b.name, e.Topic, e.Val)
// ...
return nil
}
EventBus 需要实现 Subscribe 和 Unsubscribe 方法暴露给观察者,新增和删除订阅关系:
type BaseEventBus struct {
mux sync.RWMutex
observers map[string]map[Observer]struct{}
}
func NewBaseEventBus() BaseEventBus {
return BaseEventBus{
observers: make(map[string]map[Observer]struct{}),
}
}
func (b *BaseEventBus) Subscribe(topic string, o Observer) {
b.mux.Lock()
defer b.mux.Unlock()
_, ok := b.observers[topic]
if !ok {
b.observers[topic] = make(map[Observer]struct{})
}
b.observers[topic][o] = struct{}{}
}
func (b *BaseEventBus) Unsubscribe(topic string, o Observer) {
b.mux.Lock()
defer b.mux.Unlock()
delete(b.observers[topic], o)
}
关于之后 EventBus 要如何把变更事件推送给观察者,可以分为同步和异步两种模式。
在同步模式下,EventBus 在接收到 event 时,会根据事件类型 topic 匹配到对应的观察者列表,然后串行遍历分别调用每个观察者的 OnChange 方法进行通知,并对中途的 error 进行聚合,放到 handleErr 方法统一处理:
type SyncEventBus struct {
BaseEventBus
}
func NewSyncEventBus() *SyncEventBus {
return &SyncEventBus{
BaseEventBus: NewBaseEventBus(),
}
}
func (s *SyncEventBus) Publish(ctx context.Context, e *Event) {
s.mux.RLock()
defer s.mux.RUnlock()
subscribers := s.observers[e.Topic]
errs := make(map[Observer]error)
for subscriber := range subscribers {
if err := subscriber.OnChange(ctx, e); err != nil {
errs[subscriber] = err
}
}
s.handleErr(ctx, errs)
}
在异步模式下,EventBus 会在启动的时候跑一个守护协程,专门负责处理错误。在收到 event 时,EventBus 会并发地调用 Observer.OnChange 方法通知观察者,遇到的错误统一用 channel 汇总到 handleErr 协程处理:
type observerWithErr struct {
o Observer
err error
}
type AsyncEventBus struct {
BaseEventBus
errC chan *observerWithErr
ctx context.Context
stop context.CancelFunc
}
func NewAsyncEventBus() *AsyncEventBus {
aBus := AsyncEventBus{
BaseEventBus: NewBaseEventBus(),
}
aBus.ctx, aBus.stop = context.WithCancel(context.Background())
go aBus.handleErr()
return &aBus
}
func (a *AsyncEventBus) Stop() {
a.stop()
}
func (a *AsyncEventBus) Publish(ctx context.Context, e *Event) {
a.mux.RLock()
defer a.mux.RUnlock()
subscribers := a.observers[e.Topic]
for subscriber := range subscribers {
subscriber := subscriber
go func() {
if err := subscriber.OnChange(ctx, e); err != nil {
select {
case <-a.ctx.Done():
case a.errC <- &observerWithErr{
o: subscriber,
err: err,
}:
}
}
}()
}
}
func (a *AsyncEventBus) handleErr() {
for {
select {
case <-a.ctx.Done():
return
case resp := <-a.errC:
fmt.Printf("observer: %v, err: %v", resp.o, resp.err)
}
}
}
应用
观察者模式的实现比较简单,但是它对应的场景还是非常广泛的,消息队列、各种回调等就可以采用类似的思想。
在消息队列中,EventBus 对应的就是消息队列中间件,除了基础的消息传递能力,可以为整个架构提供分布式解耦、流量削峰等等能力,用途广泛;Event 对应的就是消息,通过 topic 区分,由 producer 提供;Observer 就是消费者 consumer,对 topic 进行订阅,然后进行对应的消费逻辑。
另外在很多非关系型数据库里也有采用这种思想的回调设计,比如 etcd 和 ceph。一般非关系数据库都维护有一个监听模块,维护用户创建的 watcher,采用订阅制来维持映射关系。
总结
又是一种简单的设计模式,水一篇(
一段话总结:
观察者模式是用于多对一的订阅/发布场景的设计模式,主要思路是在观察者和被观察对象之间添加一个中间层,实现观察者和被观察对象的解耦。根据事件的通知模式,可以分为同步和异步两种方法