Golang 设计模式:观察者模式

Golang 设计模式:观察者模式

tk_sky 38 2024-02-25

Golang 设计模式:观察者模式

理论

观察者模式也是一种经典的设计模式,主要适用于多对一的订阅/发布场景。

什么是多对一的订阅/发布场景:

  • 有多名观察者,观察一个被观察事物
  • 定义了一对多的依赖关系,当一个对象状态发生改变,其依赖者会收到通知并自动更新

这种模式的存在主要是为了降低观察者和目标之间的耦合,又被叫做发布-订阅模式。

当然,这种场景其实有很多不同的解决方案,只是设计模式提供了一种理论化规范化的解决方法。

在观察者模式中,我们需要定义三种核心的角色:

  • Observer:观察者,需要关注事物动态
  • Event:事物的变更事件。使用 topic 来标识事物身份和变更类型;使用 val 来表示变更详情
  • EventBus:事件总线,其实就是在观察者和事物之间的代理层,负责维护管理观察者,并向其同步事物的变更事件。

EventBus 是观察者模式的核心,实现了观察者与事物的解耦:

  • 观察者通过向 EventBus 完成注册操作,声明自己关心的事件类型,而不需要直接找事物要
  • 事物在发生变更时只用向 EventBus 统一汇报,不用维护每个观察者的交互
  • Eventbus 负责维护每个观察者和事物之间的映射,在变更事件到达时逐一通知

实现

定义一下三个核心角色:

type Event struct {
    Topic string
    Val   interface{}
}

type Observer interface {
    OnChange(ctx context.Context, e *Event) error
}

type EventBus interface {
    Subscribe(topic string, o Observer)
    Unsubscribe(topic string, o Observer)
    Publish(ctx context.Context, e *Event)
}

观察者需要实现 OnChange 方法,用于给 EventBus 在事件发生时做回调,类似这样:

type BaseObserver struct {
    name string
}

func NewBaseObserver(name string) *BaseObserver {
    return &BaseObserver{
        name: name,
    }
}

func (b *BaseObserver) OnChange(ctx context.Context, e *Event) error {
    fmt.Printf("observer: %s, event key: %s, event val: %v", b.name, e.Topic, e.Val)
    // ...
    return nil
}

EventBus 需要实现 Subscribe 和 Unsubscribe 方法暴露给观察者,新增和删除订阅关系:

type BaseEventBus struct {
    mux       sync.RWMutex
    observers map[string]map[Observer]struct{}
}

func NewBaseEventBus() BaseEventBus {
    return BaseEventBus{
        observers: make(map[string]map[Observer]struct{}),
    }
}

func (b *BaseEventBus) Subscribe(topic string, o Observer) {
    b.mux.Lock()
    defer b.mux.Unlock()
    _, ok := b.observers[topic]
    if !ok {
        b.observers[topic] = make(map[Observer]struct{})
    }
    b.observers[topic][o] = struct{}{}
}

func (b *BaseEventBus) Unsubscribe(topic string, o Observer) {
    b.mux.Lock()
    defer b.mux.Unlock()
    delete(b.observers[topic], o)
}

关于之后 EventBus 要如何把变更事件推送给观察者,可以分为同步和异步两种模式。

在同步模式下,EventBus 在接收到 event 时,会根据事件类型 topic 匹配到对应的观察者列表,然后串行遍历分别调用每个观察者的 OnChange 方法进行通知,并对中途的 error 进行聚合,放到 handleErr 方法统一处理:

type SyncEventBus struct {
    BaseEventBus
}

func NewSyncEventBus() *SyncEventBus {
    return &SyncEventBus{
        BaseEventBus: NewBaseEventBus(),
    }
}

func (s *SyncEventBus) Publish(ctx context.Context, e *Event) {
    s.mux.RLock()
    defer s.mux.RUnlock()
    subscribers := s.observers[e.Topic]

    errs := make(map[Observer]error)
    for subscriber := range subscribers {
        if err := subscriber.OnChange(ctx, e); err != nil {
            errs[subscriber] = err
        }
    }
    s.handleErr(ctx, errs)
}

在异步模式下,EventBus 会在启动的时候跑一个守护协程,专门负责处理错误。在收到 event 时,EventBus 会并发地调用 Observer.OnChange 方法通知观察者,遇到的错误统一用 channel 汇总到 handleErr 协程处理:

type observerWithErr struct {
    o   Observer
    err error
}

type AsyncEventBus struct {
    BaseEventBus
    errC chan *observerWithErr
    ctx  context.Context
    stop context.CancelFunc
}

func NewAsyncEventBus() *AsyncEventBus {
    aBus := AsyncEventBus{
        BaseEventBus: NewBaseEventBus(),
    }
    aBus.ctx, aBus.stop = context.WithCancel(context.Background())
    go aBus.handleErr()
    return &aBus
}

func (a *AsyncEventBus) Stop() {
    a.stop()
}

func (a *AsyncEventBus) Publish(ctx context.Context, e *Event) {
    a.mux.RLock()
    defer a.mux.RUnlock()
    subscribers := a.observers[e.Topic]
    for subscriber := range subscribers {
        subscriber := subscriber
        go func() {
            if err := subscriber.OnChange(ctx, e); err != nil {
                select {
                case <-a.ctx.Done():
                case a.errC <- &observerWithErr{
                    o:   subscriber,
                    err: err,
                }:
                }
            }
        }()
    }
}

func (a *AsyncEventBus) handleErr() {
    for {
        select {
        case <-a.ctx.Done():
            return
        case resp := <-a.errC:
            fmt.Printf("observer: %v, err: %v", resp.o, resp.err)
        }
    }
}

应用

观察者模式的实现比较简单,但是它对应的场景还是非常广泛的,消息队列、各种回调等就可以采用类似的思想。

在消息队列中,EventBus 对应的就是消息队列中间件,除了基础的消息传递能力,可以为整个架构提供分布式解耦、流量削峰等等能力,用途广泛;Event 对应的就是消息,通过 topic 区分,由 producer 提供;Observer 就是消费者 consumer,对 topic 进行订阅,然后进行对应的消费逻辑。

另外在很多非关系型数据库里也有采用这种思想的回调设计,比如 etcd 和 ceph。一般非关系数据库都维护有一个监听模块,维护用户创建的 watcher,采用订阅制来维持映射关系。

总结

又是一种简单的设计模式,水一篇(

一段话总结:

观察者模式是用于多对一的订阅/发布场景的设计模式,主要思路是在观察者和被观察对象之间添加一个中间层,实现观察者和被观察对象的解耦。根据事件的通知模式,可以分为同步和异步两种方法