模式
一种订阅机制, 在可观察对象事件发生时通知多个 “观察” 该对象的其他对象。中文以订阅者(观察者)和订阅对象(可观察对象)更容易理解,而发布者理解为统一的通知部门。
啊〰老师老师,有人就要问了,为什么不用Kafka?Redis?RabbitMQ?
没有为什么,Kafka、Redis、RabbitMQ都是消息队列,但观察者模式是一种更加通用的模式,可以用于非使命必达的场景。
- 发布者 (Publisher):
- 定义:当可观察对象发生变更,筛选对应的订阅者并发布他们关注的内容
- 订阅者 (Subscriber):
- 定义:除了有
update
方法,订阅者还需要实现逻辑来处理发布者的通知参数
场景
这个模式的生活场景巨多,就比如 一蓑烟雨 的博客就有文章订阅 哈哈哈
- 邮箱订阅:给感兴趣的人推送更新,当然现在不感兴趣也会被迫收到。
- 期刊订阅:小学订阅的小学生之友,还有英语老师让大家(可自愿)订阅的英语报。
- 菜市场:和老板娘说有漂亮的五花肉记得打电话给我。就是她有时候会忘记。
- 群聊通知:排除掉开启了免打扰的成员,剩下的都是订阅者。
案例
简单点
一个商品降价订阅通知,商品为小米SU7,为了能在线分享用 TypeScript 写案例分享。
以下代码点击 codesandbox 按钮即可运行。

观察者接口
定义了基本的观察者接口,有观察者的信息和可观察对象的变更回调方法update()
观察者接口1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| export interface Observer { update(product: string, price: number): void; userUUID: string; email: string; subscriptionType: SubscriptionType; discountThreshold?: number; }
export class SubscriptionType { private constructor(public readonly model: string) {}
static readonly IN_STOCK = new SubscriptionType("IN_STOCK"); static readonly DISCOUNT = new SubscriptionType("DISCOUNT"); static readonly DISCOUNT_TO = new SubscriptionType("DISCOUNT_TO");
getDescription(): string { switch (this.model) { case "IN_STOCK": return "来货通知"; case "DISCOUNT": return "降价通知"; case "DISCOUNT_TO": return "降价到预期通知"; default: return "未知订阅"; } } }
|
观察者实现
实现了观察者,增加了发送邮箱这个实际的通知方法,在update()
实现通知调用
观察者接口1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import {logger} from "../util/Logger" import { Observer, SubscriptionType } from "./Observer";
export class UserObserver implements Observer { constructor( public userUUID: string, public email: string, public subscriptionType: SubscriptionType, public discountThreshold?: number ) {}
update(product: string, price: number): void { switch (this.subscriptionType) { case SubscriptionType.IN_STOCK: this.sendEmailNotification(`${product} 来货了!`); break; case SubscriptionType.DISCOUNT: this.sendEmailNotification(`${product} 现在已经降价至 $${price}!`); break; case SubscriptionType.DISCOUNT_TO: this.sendEmailNotification( `${product} 现在已经降价至 $${price}, 满足您期待的降价 $${ this.discountThreshold ?? 0 }% !` ); break; } }
private sendEmailNotification(message: string): void { logger.info(`发送邮件 ${this.email}: ${message}`); } }
|
可观察者接口
定义了基本的可观察者接口,主要有订阅、取消订阅、通知三要素。
可观察者接口1 2 3 4 5 6 7 8 9 10 11 12 13
| import { Observer } from "../Observer";
export interface Observable { subscribe(observer: Observer): void;
unsubscribe(observer: Observer): void;
notifyObservers(): void; }
|
可观察者实现
实现了一个商品观察对象
可观察者实现1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| import { Observable } from "./Observable"; import { Observer, SubscriptionType } from "../Observer"; import { logger } from "../../util/Logger";
export class ProductObservable implements Observable { private publishers: Observer[] = []; private currentPrice: number = 0.0; private originalPrice: number = 100.0;
constructor(private product: string) { logger.info( `创建可观察对象(商品:${product}),价格 $${this.originalPrice}` ); }
subscribe(publisher: Observer): void { this.publishers.push(publisher); logger.info( `用户UUID: ${publisher.userUUID} ,成功订阅商品 ${ this.product } ,订阅类型 ${publisher.subscriptionType.getDescription()}.` ); }
unsubscribe(publisher: Observer): void { this.publishers = this.publishers.filter( (obs) => obs.userUUID !== publisher.userUUID ); logger.info( `用户UUID: ${publisher.userUUID} ,取消订阅商品 ${this.product} ` ); }
notifyObservers(): void { for (const publisher of this.publishers) { switch (publisher.subscriptionType) { case SubscriptionType.IN_STOCK: publisher.update(this.product, this.currentPrice); break; case SubscriptionType.DISCOUNT: if (this.currentPrice < this.originalPrice) { publisher.update(this.product, this.currentPrice); } break; case SubscriptionType.DISCOUNT_TO: if (this.currentPrice <= (publisher.discountThreshold ?? 0)) { publisher.update(this.product, this.currentPrice); } break; } } } productRestocked(): void { logger.info(`商品 ${this.product} 采购成功`); this.notifyObservers(); }
productDiscounted(newPrice: number): void { this.currentPrice = newPrice; if (newPrice === this.originalPrice) { logger.info(`商品 ${this.product} 恢复原价`); } else { logger.info(`商品 ${this.product} 降价至: $${this.currentPrice}`); } this.notifyObservers(); } }
|
测试效果
创建 小米SU7 这个可观察对象
三个用户关注了 小米SU7,关注类型不一样
在 小米SU7 库存和价格变动时候可以观测到对应的通知变化
测试1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| import { ProductObservable } from "./observable/ProductObservable"; import { UserObserver } from "./UserObserver"; import { SubscriptionType } from "./Observer"; import { logger } from "../util/Logger";
export const TestObserver = () => { const su7Notifier = new ProductObservable("小米SU7");
const user1 = new UserObserver( "UUID-1111", "user1@thatcoder.cn", SubscriptionType.IN_STOCK ); const user2 = new UserObserver( "UUID-2222", "user2@thatcoder.cn", SubscriptionType.DISCOUNT ); const user3 = new UserObserver( "UUID-3333", "user3@thatcoder.cn", SubscriptionType.DISCOUNT_TO, 50 );
su7Notifier.subscribe(user1); su7Notifier.subscribe(user2); su7Notifier.subscribe(user3);
su7Notifier.productRestocked();
su7Notifier.productDiscounted(60.0);
su7Notifier.productDiscounted(100.0);
su7Notifier.productDiscounted(45.0);
su7Notifier.unsubscribe(user1);
su7Notifier.productRestocked(); };
|
测试结果
和预想一致,可观察对象只需要关注自己的变动就可以了,用户考虑的就多了(还要点击订阅)。
降价到60,所以用户3不被通知
用户1取消订阅,所以来货了也不被通知
当然这是最简单的示例
Spring监听机制
Spring有EventListener
类似去定义一个事件的处理逻辑,相当于在里面写了订阅者的通知方法。ApplicationEventPublisher
会去发布定义的事件,相当于可观察者的对象发生了变动。不同的是我们只关心发布和处理逻辑即可,中间的调用交给了Listener
。
生命周期事件
在包 org.springframework.context.event 下面有很多与 ApplicationContext
生命周期相关的事件,这些事件都继承自 ApplicationContextEvent
,包括 ContextRefreshedEvent
, ContextStartedEvent
, ContextStoppedEvent
, ContextClosedEvent
。
到了对应的生命周期会调用订阅。
启动和刷新1 2 3 4 5 6 7 8 9 10
| import org.springframework.context.ApplicationListener import org.springframework.context.event.ContextRefreshedEvent import org.springframework.stereotype.Component @Component class StartupListener : ApplicationListener<ContextRefreshedEvent> { override fun onApplicationEvent(event: ContextRefreshedEvent) { println("应用刷新成功!") } }
|
事务监听
@TransactionalEventListener
举例一个下单成功后的发布事务
事件定义1
| data class OrderPlacedEvent(val orderId: String, val userEmail: String)
|
事件处理1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import org.springframework.context.event.TransactionalEventListener import org.springframework.stereotype.Component
@Component class OrderPlacedEventListener {
@TransactionalEventListener @Async fun handleOrderPlacedEvent(event: OrderPlacedEvent) { val orderId = event.orderId val userEmail = event.userEmail println("发送 $orderId 信息到用户邮箱 $userEmail") } }
|
事件触发1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import org.springframework.context.ApplicationEventPublisher import org.springframework.stereotype.Service import org.springframework.transaction.annotation.Transactional
@Service class OrderService(private val eventPublisher: ApplicationEventPublisher) {
@Transactional fun save(order: Order) { eventPublisher.publishEvent(OrderPlacedEvent(orderId, userEmail)) } }
|
总结
优点
- 代码解耦:观察者和订阅者的逻辑分开,订阅者只引用了抽象的发布者接口,每个可观察者只需要关注自己的实现。
- 抽象耦合:如上代码解耦后逻辑上依然保持着抽象的耦合,订阅者只需要注册订阅即可
缺点
- 隐式依赖:抽象耦合就代表着事件通知机制是隐式的,系统的行为可能变得难以预测和理解。及时补充文档,不然就慢慢DEBUG。
- 瞬时峰值:某个可观察对象有大量订阅时,触发
update
带来的巨额性能开销可能会导致性能瓶颈,甚至系统阻塞。注意异步和削峰。
- 并发问题:多线程中,事件的发布和订阅者的变动可能带来并发问题。需要复杂的同步机制来确保线程安全,比如
ConcurrentModificationException
。除了线程安全的集合可能还需要考虑显式锁、读写锁或原子操作。