模式
一种订阅机制, 在可观察对象事件发生时通知多个 “观察” 该对象的其他对象。中文以订阅者(观察者)和订阅对象(可观察对象)更容易理解,而发布者理解为统一的通知部门。
啊〰老师老师,有人就要问了,为什么不用Kafka?Redis?RabbitMQ? 没有为什么,Kafka、Redis、RabbitMQ都是消息队列,但观察者模式是一种更加通用的模式,可以用于非使命必达的场景。
发布者 (Publisher):
定义:当可观察对象发生变更,筛选对应的订阅者并发布他们关注的内容
订阅者 (Subscriber):
定义:除了有update
方法,订阅者还需要实现逻辑来处理发布者的通知参数
场景
这个模式的生活场景巨多,就比如 一蓑烟雨 的博客就有文章订阅 哈哈哈
邮箱订阅:给感兴趣的人推送更新,当然现在不感兴趣也会被迫收到。
期刊订阅:小学订阅的小学生之友,还有英语老师让大家(可自愿)订阅的英语报。
菜市场:和老板娘说有漂亮的五花肉记得打电话给我。就是她有时候会忘记。
群聊通知:排除掉开启了免打扰的成员,剩下的都是订阅者。
案例 简单点
一个商品降价订阅通知,商品为小米SU7,为了能在线分享用 TypeScript 写案例分享。
以下代码点击 codesandbox 按钮即可运行。
观察者接口
定义了基本的观察者接口,有观察者的信息和可观察对象的变更回调方法update()
观察者接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 export interface Observer { update (product : string , price : number ): void ; userUUID : string ; email : string ; subscriptionType : SubscriptionType ; discountThreshold?: number ; }export class SubscriptionType { private constructor (public readonly model: string ) {} static readonly IN_STOCK = new SubscriptionType ("IN_STOCK" ); static readonly DISCOUNT = new SubscriptionType ("DISCOUNT" ); static readonly DISCOUNT_TO = new SubscriptionType ("DISCOUNT_TO" ); getDescription (): string { switch (this .model ) { case "IN_STOCK" : return "来货通知" ; case "DISCOUNT" : return "降价通知" ; case "DISCOUNT_TO" : return "降价到预期通知" ; default : return "未知订阅" ; } } }
观察者实现
实现了观察者,增加了发送邮箱这个实际的通知方法,在update()
实现通知调用
观察者接口 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import {logger} from "../util/Logger" import { Observer , SubscriptionType } from "./Observer" ;export class UserObserver implements Observer { constructor ( public userUUID: string , public email: string , public subscriptionType: SubscriptionType, public discountThreshold?: number ) {} update (product : string , price : number ): void { switch (this .subscriptionType ) { case SubscriptionType .IN_STOCK : this .sendEmailNotification (`${product} 来货了!` ); break ; case SubscriptionType .DISCOUNT : this .sendEmailNotification (`${product} 现在已经降价至 $${price} !` ); break ; case SubscriptionType .DISCOUNT_TO : this .sendEmailNotification ( `${product} 现在已经降价至 $${price} , 满足您期待的降价 $${ this .discountThreshold ?? 0 } % !` ); break ; } } private sendEmailNotification (message : string ): void { logger.info (`发送邮件 ${this .email} : ${message} ` ); } }
可观察者接口
定义了基本的可观察者接口,主要有订阅、取消订阅、通知三要素。
可观察者接口 1 2 3 4 5 6 7 8 9 10 11 12 13 import { Observer } from "../Observer" ;export interface Observable { subscribe (observer : Observer ): void ; unsubscribe (observer : Observer ): void ; notifyObservers (): void ; }
可观察者实现
实现了一个商品观察对象
可观察者实现 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 import { Observable } from "./Observable" ;import { Observer , SubscriptionType } from "../Observer" ;import { logger } from "../../util/Logger" ;export class ProductObservable implements Observable { private publishers : Observer [] = []; private currentPrice : number = 0.0 ; private originalPrice : number = 100.0 ; constructor (private product: string ) { logger.info ( `创建可观察对象(商品:${product} ),价格 $${this .originalPrice} ` ); } subscribe (publisher : Observer ): void { this .publishers .push (publisher); logger.info ( `用户UUID: ${publisher.userUUID} ,成功订阅商品 ${ this .product } ,订阅类型 ${publisher.subscriptionType.getDescription()} .` ); } unsubscribe (publisher : Observer ): void { this .publishers = this .publishers .filter ( (obs ) => obs.userUUID !== publisher.userUUID ); logger.info ( `用户UUID: ${publisher.userUUID} ,取消订阅商品 ${this .product} ` ); } notifyObservers (): void { for (const publisher of this .publishers ) { switch (publisher.subscriptionType ) { case SubscriptionType .IN_STOCK : publisher.update (this .product , this .currentPrice ); break ; case SubscriptionType .DISCOUNT : if (this .currentPrice < this .originalPrice ) { publisher.update (this .product , this .currentPrice ); } break ; case SubscriptionType .DISCOUNT_TO : if (this .currentPrice <= (publisher.discountThreshold ?? 0 )) { publisher.update (this .product , this .currentPrice ); } break ; } } } productRestocked (): void { logger.info (`商品 ${this .product} 采购成功` ); this .notifyObservers (); } productDiscounted (newPrice : number ): void { this .currentPrice = newPrice; if (newPrice === this .originalPrice ) { logger.info (`商品 ${this .product} 恢复原价` ); } else { logger.info (`商品 ${this .product} 降价至: $${this .currentPrice} ` ); } this .notifyObservers (); } }
测试效果
创建 小米SU7 这个可观察对象 三个用户关注了 小米SU7,关注类型不一样 在 小米SU7 库存和价格变动时候可以观测到对应的通知变化
测试 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 import { ProductObservable } from "./observable/ProductObservable" ;import { UserObserver } from "./UserObserver" ;import { SubscriptionType } from "./Observer" ;import { logger } from "../util/Logger" ;export const TestObserver = ( ) => { const su7Notifier = new ProductObservable ("小米SU7" ); const user1 = new UserObserver ( "UUID-1111" , "user1@thatcoder.cn" , SubscriptionType .IN_STOCK ); const user2 = new UserObserver ( "UUID-2222" , "user2@thatcoder.cn" , SubscriptionType .DISCOUNT ); const user3 = new UserObserver ( "UUID-3333" , "user3@thatcoder.cn" , SubscriptionType .DISCOUNT_TO , 50 ); su7Notifier.subscribe (user1); su7Notifier.subscribe (user2); su7Notifier.subscribe (user3); su7Notifier.productRestocked (); su7Notifier.productDiscounted (60.0 ); su7Notifier.productDiscounted (100.0 ); su7Notifier.productDiscounted (45.0 ); su7Notifier.unsubscribe (user1); su7Notifier.productRestocked (); };
测试结果 和预想一致,可观察对象只需要关注自己的变动就可以了,用户考虑的就多了(还要点击订阅)。 降价到60,所以用户3不被通知 用户1取消订阅,所以来货了也不被通知 当然这是最简单的示例
Spring监听机制 Spring有EventListener
类似去定义一个事件的处理逻辑,相当于在里面写了订阅者的通知方法。ApplicationEventPublisher
会去发布定义的事件,相当于可观察者的对象发生了变动。不同的是我们只关心发布和处理逻辑即可,中间的调用交给了Listener
。
生命周期事件 在包 org.springframework.context.event 下面有很多与 ApplicationContext
生命周期相关的事件,这些事件都继承自 ApplicationContextEvent
,包括 ContextRefreshedEvent
, ContextStartedEvent
, ContextStoppedEvent
, ContextClosedEvent
。 到了对应的生命周期会调用订阅。
启动和刷新 1 2 3 4 5 6 7 8 9 10 import org.springframework.context.ApplicationListenerimport org.springframework.context.event.ContextRefreshedEvent import org.springframework.stereotype.Component @Component class StartupListener : ApplicationListener <ContextRefreshedEvent > { override fun onApplicationEvent (event: ContextRefreshedEvent ) { println("应用刷新成功!" ) } }
事务监听
@TransactionalEventListener 举例一个下单成功后的发布事务
事件定义 1 data class OrderPlacedEvent (val orderId: String, val userEmail: String)
事件处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import org.springframework.context.event.TransactionalEventListenerimport org.springframework.stereotype.Component@Component class OrderPlacedEventListener { @TransactionalEventListener @Async fun handleOrderPlacedEvent (event: OrderPlacedEvent ) { val orderId = event.orderId val userEmail = event.userEmail println("发送 $orderId 信息到用户邮箱 $userEmail " ) } }
事件触发 1 2 3 4 5 6 7 8 9 10 11 12 13 14 import org.springframework.context.ApplicationEventPublisherimport org.springframework.stereotype.Serviceimport org.springframework.transaction.annotation .Transactional@Service class OrderService (private val eventPublisher: ApplicationEventPublisher) { @Transactional fun save (order: Order ) { eventPublisher.publishEvent(OrderPlacedEvent(orderId, userEmail)) } }
总结 优点
代码解耦 :观察者和订阅者的逻辑分开,订阅者只引用了抽象的发布者接口,每个可观察者只需要关注自己的实现。
抽象耦合 :如上代码解耦后逻辑上依然保持着抽象的耦合,订阅者只需要注册订阅即可
缺点
隐式依赖 :抽象耦合就代表着事件通知机制是隐式的,系统的行为可能变得难以预测和理解。及时补充文档,不然就慢慢DEBUG。
瞬时峰值 :某个可观察对象有大量订阅时,触发update
带来的巨额性能开销可能会导致性能瓶颈,甚至系统阻塞。注意异步和削峰。
并发问题 :多线程中,事件的发布和订阅者的变动可能带来并发问题。需要复杂的同步机制来确保线程安全,比如ConcurrentModificationException
。除了线程安全的集合可能还需要考虑显式锁、读写锁或原子操作。