设计模式-17观察者模式Observer Pattern
·
观察者模式 (Observer Pattern)
概述 (Overview)
观察者模式是一种行为型设计模式,它定义了对象之间的一对多依赖关系,使得当一个对象改变状态时,所有依赖它的对象都会自动收到通知并更新。
Observer Pattern is a behavioral design pattern that defines a one-to-many dependency between objects so that when one object changes state, all of its dependents are notified and updated automatically.
意图 (Intent)
定义对象之间的一对多依赖关系,使得当一个对象改变状态时,所有依赖它的对象都会自动收到通知并更新。
Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.
适用场景 (When to Use)
- 当需要实现事件处理系统时
- 当需要将对象状态的改变通知给多个其他对象时
- 当需要实现发布-订阅机制时
- 当需要实现模型-视图分离时
- 当需要支持广播通信时
- 当需要实现松耦合的系统架构时
结构 (Structure)
基础观察者模式结构图 (Basic Observer Pattern Structure)
实现方式 (Implementation Approaches)
1. 经典观察者模式 (Classic Observer Pattern)
// 主题接口 - Subject Interface
public interface Subject {
void attach(Observer observer);
void detach(Observer observer);
void notifyObservers();
}
// 观察者接口 - Observer Interface
public interface Observer {
void update(Subject subject);
}
// 具体主题 - Concrete Subject
public class WeatherStation implements Subject {
private float temperature;
private float humidity;
private float pressure;
private List<Observer> observers;
public WeatherStation() {
this.observers = new ArrayList<>();
}
@Override
public void attach(Observer observer) {
if (!observers.contains(observer)) {
observers.add(observer);
System.out.println("添加观察者: " + observer.getClass().getSimpleName());
}
}
@Override
public void detach(Observer observer) {
if (observers.remove(observer)) {
System.out.println("移除观察者: " + observer.getClass().getSimpleName());
}
}
@Override
public void notifyObservers() {
System.out.println("\n通知所有观察者天气数据更新:");
for (Observer observer : observers) {
observer.update(this);
}
}
// 当气象数据改变时,通知观察者
public void setMeasurements(float temperature, float humidity, float pressure) {
System.out.println("\n=== 气象站更新数据 ===");
this.temperature = temperature;
this.humidity = humidity;
this.pressure = pressure;
System.out.printf("温度: %.1f°C, 湿度: %.1f%%, 气压: %.1f hPa%n",
temperature, humidity, pressure);
notifyObservers();
}
// 获取当前气象数据
public float getTemperature() {
return temperature;
}
public float getHumidity() {
return humidity;
}
public float getPressure() {
return pressure;
}
}
// 具体观察者 - Concrete Observers
public class CurrentConditionsDisplay implements Observer {
private float temperature;
private float humidity;
@Override
public void update(Subject subject) {
if (subject instanceof WeatherStation) {
WeatherStation weatherStation = (WeatherStation) subject;
this.temperature = weatherStation.getTemperature();
this.humidity = weatherStation.getHumidity();
display();
}
}
public void display() {
System.out.println("当前天气显示:");
System.out.printf(" 当前温度: %.1f°C%n", temperature);
System.out.printf(" 当前湿度: %.1f%%%n", humidity);
}
}
public class StatisticsDisplay implements Observer {
private List<Float> temperatures = new ArrayList<>();
private List<Float> humidities = new ArrayList<>();
private List<Float> pressures = new ArrayList<>();
@Override
public void update(Subject subject) {
if (subject instanceof WeatherStation) {
WeatherStation weatherStation = (WeatherStation) subject;
temperatures.add(weatherStation.getTemperature());
humidities.add(weatherStation.getHumidity());
pressures.add(weatherStation.getPressure());
display();
}
}
public void display() {
System.out.println("气象统计显示:");
if (!temperatures.isEmpty()) {
System.out.printf(" 平均温度: %.1f°C%n",
temperatures.stream().mapToDouble(Float::doubleValue).average().orElse(0));
System.out.printf(" 最高温度: %.1f°C%n",
temperatures.stream().max(Float::compareTo).orElse(0f));
System.out.printf(" 最低温度: %.1f°C%n",
temperatures.stream().min(Float::compareTo).orElse(0f));
}
}
public void clearStatistics() {
temperatures.clear();
humidities.clear();
pressures.clear();
System.out.println("统计数据已清空");
}
}
public class ForecastDisplay implements Observer {
private float currentPressure = 1013.0f; // 标准大气压
private float lastPressure;
@Override
public void update(Subject subject) {
if (subject instanceof WeatherStation) {
WeatherStation weatherStation = (WeatherStation) subject;
lastPressure = currentPressure;
currentPressure = weatherStation.getPressure();
display();
}
}
public void display() {
System.out.println("天气预报显示:");
System.out.printf(" 当前气压: %.1f hPa%n", currentPressure);
if (currentPressure > lastPressure) {
System.out.println(" 预报: 天气正在改善");
} else if (currentPressure == lastPressure) {
System.out.println(" 预报: 天气保持稳定");
} else {
System.out.println(" 预报: 天气可能转差");
}
}
}
// 客户端使用 - Client Usage
public class WeatherStationDemo {
public static void main(String[] args) {
System.out.println("=== 气象站观察者模式演示 ===");
// 创建气象站(主题)
WeatherStation weatherStation = new WeatherStation();
// 创建显示设备(观察者)
CurrentConditionsDisplay currentDisplay = new CurrentConditionsDisplay();
StatisticsDisplay statisticsDisplay = new StatisticsDisplay();
ForecastDisplay forecastDisplay = new ForecastDisplay();
// 注册观察者
weatherStation.attach(currentDisplay);
weatherStation.attach(statisticsDisplay);
weatherStation.attach(forecastDisplay);
// 模拟气象数据更新
System.out.println("\n--- 第一次数据更新 ---");
weatherStation.setMeasurements(25.0f, 65.0f, 1013.0f);
System.out.println("\n--- 第二次数据更新 ---");
weatherStation.setMeasurements(26.5f, 70.0f, 1012.0f);
System.out.println("\n--- 第三次数据更新 ---");
weatherStation.setMeasurements(24.0f, 90.0f, 1011.0f);
// 移除一个观察者
System.out.println("\n--- 移除统计显示器 ---");
weatherStation.detach(statisticsDisplay);
System.out.println("\n--- 第四次数据更新 ---");
weatherStation.setMeasurements(23.0f, 85.0f, 1010.0f);
System.out.println("\n=== 演示完成 ===");
}
}
2. 推模型观察者模式 (Push Model Observer Pattern)
// 推模型主题接口 - Push Model Subject Interface
public interface PushSubject {
void attach(PushObserver observer);
void detach(PushObserver observer);
void notifyObservers(Map<String, Object> data);
}
// 推模型观察者接口 - Push Model Observer Interface
public interface PushObserver {
void update(Map<String, Object> data);
}
// 股票数据主题 - Stock Data Subject
public class StockData implements PushSubject {
private String symbol;
private double price;
private int volume;
private double change;
private List<PushObserver> observers;
public StockData(String symbol) {
this.symbol = symbol;
this.observers = new ArrayList<>();
}
@Override
public void attach(PushObserver observer) {
observers.add(observer);
System.out.println("添加股票观察者: " + observer.getClass().getSimpleName());
}
@Override
public void detach(PushObserver observer) {
observers.remove(observer);
System.out.println("移除股票观察者: " + observer.getClass().getSimpleName());
}
@Override
public void notifyObservers(Map<String, Object> data) {
System.out.println("\n推送股票数据给所有观察者:");
for (PushObserver observer : observers) {
observer.update(data);
}
}
public void setStockData(double price, int volume, double change) {
System.out.println("\n=== 股票数据更新 ===");
this.price = price;
this.volume = volume;
this.change = change;
// 创建推送数据
Map<String, Object> data = new HashMap<>();
data.put("symbol", symbol);
data.put("price", price);
data.put("volume", volume);
data.put("change", change);
data.put("changePercent", (change / (price - change)) * 100);
data.put("timestamp", new Date());
System.out.printf("股票 %s: 价格=%.2f, 成交量=%d, 涨跌=%.2f%n",
symbol, price, volume, change);
notifyObservers(data);
}
}
// 具体推模型观察者 - Concrete Push Model Observers
public class PriceAlert implements PushObserver {
private double alertThreshold;
public PriceAlert(double alertThreshold) {
this.alertThreshold = alertThreshold;
}
@Override
public void update(Map<String, Object> data) {
Double price = (Double) data.get("price");
String symbol = (String) data.get("symbol");
System.out.println("价格警报:");
System.out.printf(" 股票 %s 当前价格: %.2f%n", symbol, price);
if (price >= alertThreshold) {
System.out.printf(" ⚠️ 警告: 价格超过阈值 %.2f%n", alertThreshold);
} else {
System.out.printf(" ✅ 正常: 价格在阈值以下%n");
}
}
}
public class VolumeAnalyzer implements PushObserver {
private static final int HIGH_VOLUME_THRESHOLD = 1000000;
@Override
public void update(Map<String, Object> data) {
Integer volume = (Integer) data.get("volume");
String symbol = (String) data.get("symbol");
System.out.println("成交量分析:");
System.out.printf(" 股票 %s 成交量: %d%n", symbol, volume);
if (volume > HIGH_VOLUME_THRESHOLD) {
System.out.println(" 📈 高成交量: 市场活跃");
} else if (volume > HIGH_VOLUME_THRESHOLD / 2) {
System.out.println(" 📊 中等成交量: 正常交易");
} else {
System.out.println(" 📉 低成交量: 交易清淡");
}
}
}
public class TrendIndicator implements PushObserver {
private List<Double> priceHistory = new ArrayList<>();
private static final int HISTORY_SIZE = 5;
@Override
public void update(Map<String, Object> data) {
Double price = (Double) data.get("price");
Double change = (Double) data.get("change");
String symbol = (String) data.get("symbol");
priceHistory.add(price);
if (priceHistory.size() > HISTORY_SIZE) {
priceHistory.remove(0);
}
System.out.println("趋势指标:");
System.out.printf(" 股票 %s 当前价格: %.2f, 变化: %.2f%n", symbol, price, change);
if (priceHistory.size() >= HISTORY_SIZE) {
boolean isUpTrend = true;
boolean isDownTrend = true;
for (int i = 1; i < priceHistory.size(); i++) {
if (priceHistory.get(i) <= priceHistory.get(i-1)) {
isUpTrend = false;
}
if (priceHistory.get(i) >= priceHistory.get(i-1)) {
isDownTrend = false;
}
}
if (isUpTrend) {
System.out.println(" 📈 上升趋势: 价格连续上涨");
} else if (isDownTrend) {
System.out.println(" 📉 下降趋势: 价格连续下跌");
} else {
System.out.println(" 📊 震荡趋势: 价格波动");
}
}
}
}
// 客户端使用 - Client Usage
public class StockMarketDemo {
public static void main(String[] args) {
System.out.println("=== 股票市场观察者模式演示(推模型)===");
// 创建股票数据主题
StockData appleStock = new StockData("AAPL");
StockData googleStock = new StockData("GOOGL");
// 创建观察者
PriceAlert priceAlert = new PriceAlert(150.0);
VolumeAnalyzer volumeAnalyzer = new VolumeAnalyzer();
TrendIndicator trendIndicator = new TrendIndicator();
// 注册观察者
appleStock.attach(priceAlert);
appleStock.attach(volumeAnalyzer);
appleStock.attach(trendIndicator);
googleStock.attach(priceAlert);
googleStock.attach(volumeAnalyzer);
// 模拟股票数据更新
System.out.println("\n--- 苹果股票数据更新 ---");
appleStock.setStockData(145.30, 2500000, 2.50);
appleStock.setStockData(147.80, 2800000, 2.50);
appleStock.setStockData(149.20, 3200000, 1.40);
appleStock.setStockData(151.50, 3500000, 2.30);
appleStock.setStockData(153.80, 3800000, 2.30);
System.out.println("\n--- 谷歌股票数据更新 ---");
googleStock.setStockData(2750.50, 1800000, -15.20);
googleStock.setStockData(2745.30, 1900000, -5.20);
googleStock.setStockData(2740.80, 1700000, -4.50);
System.out.println("\n=== 推模型演示完成 ===");
}
}
3. 事件总线观察者模式 (Event Bus Observer Pattern)
// 事件类型枚举 - Event Type Enum
public enum EventType {
USER_LOGIN, USER_LOGOUT, USER_REGISTER,
ORDER_CREATED, ORDER_PAID, ORDER_SHIPPED,
SYSTEM_ALERT, SYSTEM_MAINTENANCE
}
// 事件类 - Event Class
public class Event {
private final EventType type;
private final Object data;
private final String source;
private final Date timestamp;
public Event(EventType type, Object data, String source) {
this.type = type;
this.data = data;
this.source = source;
this.timestamp = new Date();
}
// Getters
public EventType getType() { return type; }
public Object getData() { return data; }
public String getSource() { return source; }
public Date getTimestamp() { return timestamp; }
@Override
public String toString() {
return String.format("Event{type=%s, source='%s', time=%s}", type, source, timestamp);
}
}
// 事件监听器接口 - Event Listener Interface
public interface EventListener {
void onEvent(Event event);
boolean supports(EventType eventType);
}
// 具体事件监听器 - Concrete Event Listeners
public class UserEventListener implements EventListener {
@Override
public void onEvent(Event event) {
System.out.println("用户事件监听器收到事件: " + event);
switch (event.getType()) {
case USER_LOGIN:
handleUserLogin(event);
break;
case USER_LOGOUT:
handleUserLogout(event);
break;
case USER_REGISTER:
handleUserRegister(event);
break;
default:
break;
}
}
@Override
public boolean supports(EventType eventType) {
return eventType == EventType.USER_LOGIN ||
eventType == EventType.USER_LOGOUT ||
eventType == EventType.USER_REGISTER;
}
private void handleUserLogin(Event event) {
System.out.println("处理用户登录: " + event.getData());
// 可以在这里添加登录逻辑,如记录登录时间、更新用户状态等
}
private void handleUserLogout(Event event) {
System.out.println("处理用户登出: " + event.getData());
// 可以在这里添加登出逻辑,如清理会话、记录登出时间等
}
private void handleUserRegister(Event event) {
System.out.println("处理用户注册: " + event.getData());
// 可以在这里添加注册逻辑,如发送欢迎邮件、初始化用户数据等
}
}
public class OrderEventListener implements EventListener {
@Override
public void onEvent(Event event) {
System.out.println("订单事件监听器收到事件: " + event);
switch (event.getType()) {
case ORDER_CREATED:
handleOrderCreated(event);
break;
case ORDER_PAID:
handleOrderPaid(event);
break;
case ORDER_SHIPPED:
handleOrderShipped(event);
break;
default:
break;
}
}
@Override
public boolean supports(EventType eventType) {
return eventType == EventType.ORDER_CREATED ||
eventType == EventType.ORDER_PAID ||
eventType == EventType.ORDER_SHIPPED;
}
private void handleOrderCreated(Event event) {
System.out.println("处理订单创建: " + event.getData());
// 可以在这里添加订单创建逻辑,如库存检查、发送确认邮件等
}
private void handleOrderPaid(Event event) {
System.out.println("处理订单支付: " + event.getData());
// 可以在这里添加支付逻辑,如更新订单状态、安排发货等
}
private void handleOrderShipped(Event event) {
System.out.println("处理订单发货: " + event.getData());
// 可以在这里添加发货逻辑,如更新物流信息、发送发货通知等
}
}
public class SystemEventListener implements EventListener {
@Override
public void onEvent(Event event) {
System.out.println("系统事件监听器收到事件: " + event);
switch (event.getType()) {
case SYSTEM_ALERT:
handleSystemAlert(event);
break;
case SYSTEM_MAINTENANCE:
handleSystemMaintenance(event);
break;
default:
break;
}
}
@Override
public boolean supports(EventType eventType) {
return eventType == EventType.SYSTEM_ALERT ||
eventType == EventType.SYSTEM_MAINTENANCE;
}
private void handleSystemAlert(Event event) {
System.out.println("🚨 系统警报: " + event.getData());
// 可以在这里添加警报处理逻辑,如发送通知、记录日志等
}
private void handleSystemMaintenance(Event event) {
System.out.println("🔧 系统维护: " + event.getData());
// 可以在这里添加维护处理逻辑,如通知用户、暂停服务等
}
}
// 事件总线(观察者模式实现)- Event Bus (Observer Pattern Implementation)
public class EventBus implements Subject {
private final Map<EventType, List<EventListener>> listeners;
private final String busName;
private boolean asyncProcessing;
private ExecutorService executorService;
public EventBus(String busName, boolean asyncProcessing) {
this.busName = busName;
this.asyncProcessing = asyncProcessing;
this.listeners = new ConcurrentHashMap<>();
// 初始化事件类型映射
for (EventType type : EventType.values()) {
listeners.put(type, new CopyOnWriteArrayList<>());
}
if (asyncProcessing) {
this.executorService = Executors.newCachedThreadPool();
}
}
@Override
public void attach(Observer observer) {
if (observer instanceof EventListener) {
EventListener listener = (EventListener) observer;
for (EventType type : EventType.values()) {
if (listener.supports(type)) {
listeners.get(type).add(listener);
}
}
System.out.println("事件监听器已注册: " + listener.getClass().getSimpleName());
}
}
@Override
public void detach(Observer observer) {
if (observer instanceof EventListener) {
EventListener listener = (EventListener) observer;
for (EventType type : EventType.values()) {
listeners.get(type).remove(listener);
}
System.out.println("事件监听器已移除: " + listener.getClass().getSimpleName());
}
}
@Override
public void notifyObservers() {
// 事件总线不实现此方法,使用publishEvent代替
}
public void publishEvent(Event event) {
System.out.println("\n[" + busName + "] 发布事件: " + event);
List<EventListener> eventListeners = listeners.get(event.getType());
if (asyncProcessing) {
// 异步处理事件
for (EventListener listener : eventListeners) {
executorService.submit(() -> {
try {
listener.onEvent(event);
} catch (Exception e) {
System.err.println("事件监听器处理失败: " + e.getMessage());
}
});
}
} else {
// 同步处理事件
for (EventListener listener : eventListeners) {
try {
listener.onEvent(event);
} catch (Exception e) {
System.err.println("事件监听器处理失败: " + e.getMessage());
}
}
}
}
public void shutdown() {
if (executorService != null) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
}
// 事件源 - Event Sources
public class UserService {
private EventBus eventBus;
public UserService(EventBus eventBus) {
this.eventBus = eventBus;
}
public void userLogin(String username) {
System.out.println("\n用户服务:处理用户登录 - " + username);
Event event = new Event(EventType.USER_LOGIN, username, "UserService");
eventBus.publishEvent(event);
}
public void userLogout(String username) {
System.out.println("\n用户服务:处理用户登出 - " + username);
Event event = new Event(EventType.USER_LOGOUT, username, "UserService");
eventBus.publishEvent(event);
}
public void userRegister(String username, String email) {
System.out.println("\n用户服务:处理用户注册 - " + username);
Map<String, String> userData = new HashMap<>();
userData.put("username", username);
userData.put("email", email);
Event event = new Event(EventType.USER_REGISTER, userData, "UserService");
eventBus.publishEvent(event);
}
}
public class OrderService {
private EventBus eventBus;
public OrderService(EventBus eventBus) {
this.eventBus = eventBus;
}
public void createOrder(String orderId, String customerName, BigDecimal amount) {
System.out.println("\n订单服务:创建订单 - " + orderId);
Map<String, Object> orderData = new HashMap<>();
orderData.put("orderId", orderId);
orderData.put("customerName", customerName);
orderData.put("amount", amount);
Event event = new Event(EventType.ORDER_CREATED, orderData, "OrderService");
eventBus.publishEvent(event);
}
public void payOrder(String orderId) {
System.out.println("\n订单服务:支付订单 - " + orderId);
Event event = new Event(EventType.ORDER_PAID, orderId, "OrderService");
eventBus.publishEvent(event);
}
public void shipOrder(String orderId) {
System.out.println("\n订单服务:发货订单 - " + orderId);
Event event = new Event(EventType.ORDER_SHIPPED, orderId, "OrderService");
eventBus.publishEvent(event);
}
}
public class SystemMonitor {
private EventBus eventBus;
public SystemMonitor(EventBus eventBus) {
this.eventBus = eventBus;
}
public void sendAlert(String message) {
System.out.println("\n系统监控:发送警报");
Event event = new Event(EventType.SYSTEM_ALERT, message, "SystemMonitor");
eventBus.publishEvent(event);
}
public void startMaintenance(String message) {
System.out.println("\n系统监控:开始维护");
Event event = new Event(EventType.SYSTEM_MAINTENANCE, message, "SystemMonitor");
eventBus.publishEvent(event);
}
}
// 客户端使用 - Client Usage
public class EventBusObserverDemo {
public static void main(String[] args) {
System.out.println("=== 事件总线观察者模式演示 ===");
// 创建事件总线(主题)
EventBus eventBus = new EventBus("MainEventBus", true); // 使用异步处理
// 创建事件监听器(观察者)
EventListener userListener = new UserEventListener();
EventListener orderListener = new OrderEventListener();
EventListener systemListener = new SystemEventListener();
// 注册监听器
eventBus.attach(userListener);
eventBus.attach(orderListener);
eventBus.attach(systemListener);
// 创建事件源
UserService userService = new UserService(eventBus);
OrderService orderService = new OrderService(eventBus);
SystemMonitor systemMonitor = new SystemMonitor(eventBus);
// 模拟各种业务操作
System.out.println("\n--- 模拟用户注册 ---");
userService.userRegister("张三", "zhangsan@example.com");
System.out.println("\n--- 模拟用户登录 ---");
userService.userLogin("张三");
System.out.println("\n--- 模拟订单流程 ---");
orderService.createOrder("ORDER-001", "张三", new BigDecimal("299.99"));
orderService.payOrder("ORDER-001");
orderService.shipOrder("ORDER-001");
System.out.println("\n--- 模拟系统事件 ---");
systemMonitor.sendAlert("CPU使用率过高");
systemMonitor.startMaintenance("系统将于今晚进行维护");
System.out.println("\n--- 模拟用户登出 ---");
userService.userLogout("张三");
// 等待异步处理完成
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 清理资源
eventBus.shutdown();
System.out.println("\n=== 事件总线观察者模式演示完成 ===");
}
}
框架源码中的应用 (Framework Applications)
1. Java Observable - 可观察类
// Java Observable类中的观察者模式应用
// Java Observable Observer Pattern Application
// 可观察的股票数据 - Observable Stock Data
public class ObservableStockData extends Observable {
private String symbol;
private double price;
private int volume;
private double change;
public ObservableStockData(String symbol) {
this.symbol = symbol;
}
public void setStockData(double price, int volume, double change) {
System.out.println("\n=== 可观察股票数据更新 ===");
// 保存旧值用于比较
Double oldPrice = this.price;
Integer oldVolume = this.volume;
Double oldChange = this.change;
this.price = price;
this.volume = volume;
this.change = change;
System.out.printf("股票 %s: 价格=%.2f, 成交量=%d, 涨跌=%.2f%n",
symbol, price, volume, change);
// 设置变化标志
setChanged();
// 创建通知数据
Map<String, Object> updateData = new HashMap<>();
updateData.put("symbol", symbol);
updateData.put("price", price);
updateData.put("volume", volume);
updateData.put("change", change);
updateData.put("changePercent", (change / (price - change)) * 100);
updateData.put("oldPrice", oldPrice);
updateData.put("oldVolume", oldVolume);
updateData.put("oldChange", oldChange);
// 通知观察者
notifyObservers(updateData);
}
// 获取当前股票数据
public String getSymbol() { return symbol; }
public double getPrice() { return price; }
public int getVolume() { return volume; }
public double getChange() { return change; }
}
// 股票观察者 - Stock Observer
public class StockObserver implements Observer {
private String name;
public StockObserver(String name) {
this.name = name;
}
@Override
public void update(Observable o, Object arg) {
if (o instanceof ObservableStockData && arg instanceof Map) {
ObservableStockData stockData = (ObservableStockData) o;
@SuppressWarnings("unchecked")
Map<String, Object> data = (Map<String, Object>) arg;
System.out.println("[" + name + "] 收到股票数据更新:");
System.out.printf(" 股票: %s, 价格: %.2f, 成交量: %d%n",
stockData.getSymbol(), stockData.getPrice(), stockData.getVolume());
// 分析价格变化
Double oldPrice = (Double) data.get("oldPrice");
if (oldPrice != null && oldPrice > 0) {
double priceChange = stockData.getPrice() - oldPrice;
double changePercent = (priceChange / oldPrice) * 100;
System.out.printf(" 价格变化: %.2f (%.2f%%)%n", priceChange, changePercent);
if (Math.abs(changePercent) > 5.0) {
System.out.println(" ⚠️ 价格大幅波动警告!");
}
}
// 分析成交量变化
Integer oldVolume = (Integer) data.get("oldVolume");
if (oldVolume != null && oldVolume > 0) {
double volumeChange = stockData.getVolume() - oldVolume;
double volumeChangePercent = (volumeChange / oldVolume) * 100;
System.out.printf(" 成交量变化: %d (%.2f%%)%n", (int) volumeChange, volumeChangePercent);
}
}
}
public String getName() {
return name;
}
}
// 高级股票观察者 - Advanced Stock Observer
public class AdvancedStockObserver implements Observer {
private String name;
private List<Double> priceHistory = new ArrayList<>();
private static final int HISTORY_SIZE = 10;
public AdvancedStockObserver(String name) {
this.name = name;
}
@Override
public void update(Observable o, Object arg) {
if (o instanceof ObservableStockData && arg instanceof Map) {
ObservableStockData stockData = (ObservableStockData) o;
@SuppressWarnings("unchecked")
Map<String, Object> data = (Map<String, Object>) arg;
priceHistory.add(stockData.getPrice());
if (priceHistory.size() > HISTORY_SIZE) {
priceHistory.remove(0);
}
System.out.println("[" + name + "] 高级分析:");
System.out.printf(" 当前价格: %.2f%n", stockData.getPrice());
// 计算移动平均线
if (priceHistory.size() >= 5) {
double ma5 = priceHistory.subList(priceHistory.size() - 5, priceHistory.size())
.stream().mapToDouble(Double::doubleValue).average().orElse(0);
System.out.printf(" 5日移动平均: %.2f%n", ma5);
if (stockData.getPrice() > ma5) {
System.out.println(" 📈 价格高于5日均线 - 看涨信号");
} else {
System.out.println(" 📉 价格低于5日均线 - 看跌信号");
}
}
// 计算相对强弱指标(简化版)
if (priceHistory.size() >= HISTORY_SIZE) {
int gains = 0;
int losses = 0;
for (int i = 1; i < priceHistory.size(); i++) {
double change = priceHistory.get(i) - priceHistory.get(i-1);
if (change > 0) gains++;
else if (change < 0) losses++;
}
double rs = (double) gains / losses;
double rsi = 100 - (100 / (1 + rs));
System.out.printf(" RSI指标: %.2f%n", rsi);
if (rsi > 70) {
System.out.println(" ⚠️ RSI超买 - 可能回调");
} else if (rsi < 30) {
System.out.println(" 💹 RSI超卖 - 可能反弹");
} else {
System.out.println(" 📊 RSI正常区间");
}
}
}
}
}
// 客户端使用 - Client Usage
public class JavaObservableDemo {
public static void main(String[] args) {
System.out.println("=== Java Observable观察者模式演示 ===");
// 创建可观察的股票数据
ObservableStockData appleStock = new ObservableStockData("AAPL");
// 创建观察者
StockObserver basicObserver = new StockObserver("基础观察者");
StockObserver anotherObserver = new StockObserver("另一个观察者");
AdvancedStockObserver advancedObserver = new AdvancedStockObserver("高级观察者");
// 添加观察者
appleStock.addObserver(basicObserver);
appleStock.addObserver(anotherObserver);
appleStock.addObserver(advancedObserver);
// 模拟股票数据更新
System.out.println("\n--- 第一次股票数据更新 ---");
appleStock.setStockData(150.00, 2500000, 2.50);
System.out.println("\n--- 第二次股票数据更新 ---");
appleStock.setStockData(152.30, 2800000, 2.30);
System.out.println("\n--- 第三次股票数据更新 ---");
appleStock.setStockData(149.80, 3200000, -2.50);
// 删除一个观察者
System.out.println("\n--- 删除一个观察者 ---");
appleStock.deleteObserver(anotherObserver);
System.out.println("\n--- 第四次股票数据更新 ---");
appleStock.setStockData(151.20, 2900000, 1.40);
// 显示观察者数量
System.out.println("\n当前观察者数量: " + appleStock.countObservers());
System.out.println("\n=== Java Observable演示完成 ===");
}
}
2. Spring ApplicationEvent - 应用事件
// Spring应用事件中的观察者模式应用
// Spring ApplicationEvent Observer Pattern Application
// 自定义应用事件 - Custom Application Event
public class UserRegistrationEvent extends ApplicationEvent {
private final String username;
private final String email;
private final Date registrationTime;
public UserRegistrationEvent(Object source, String username, String email) {
super(source);
this.username = username;
this.email = email;
this.registrationTime = new Date();
}
public String getUsername() { return username; }
public String getEmail() { return email; }
public Date getRegistrationTime() { return registrationTime; }
}
public class OrderStatusChangeEvent extends ApplicationEvent {
private final String orderId;
private final String oldStatus;
private final String newStatus;
private final BigDecimal orderAmount;
public OrderStatusChangeEvent(Object source, String orderId, String oldStatus,
String newStatus, BigDecimal orderAmount) {
super(source);
this.orderId = orderId;
this.oldStatus = oldStatus;
this.newStatus = newStatus;
this.orderAmount = orderAmount;
}
public String getOrderId() { return orderId; }
public String getOldStatus() { return oldStatus; }
public String getNewStatus() { return newStatus; }
public BigDecimal getOrderAmount() { return orderAmount; }
}
// 事件监听器 - Event Listeners
@Component
public class EmailNotificationListener implements ApplicationListener<UserRegistrationEvent> {
private static final Logger logger = LoggerFactory.getLogger(EmailNotificationListener.class);
@Override
public void onApplicationEvent(UserRegistrationEvent event) {
logger.info("邮件通知监听器:用户注册 - username: {}, email: {}",
event.getUsername(), event.getEmail());
// 模拟发送欢迎邮件
try {
Thread.sleep(1000); // 模拟邮件发送时间
logger.info("欢迎邮件发送成功给: {}", event.getEmail());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("邮件发送被中断", e);
}
}
}
@Component
public class SmsNotificationListener implements ApplicationListener<UserRegistrationEvent> {
private static final Logger logger = LoggerFactory.getLogger(SmsNotificationListener.class);
@Override
public void onApplicationEvent(UserRegistrationEvent event) {
logger.info("短信通知监听器:用户注册 - username: {}", event.getUsername());
// 模拟发送短信
try {
Thread.sleep(500); // 模拟短信发送时间
logger.info("注册短信发送成功给: {}", event.getUsername());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("短信发送被中断", e);
}
}
}
@Component
public class OrderProcessingListener implements ApplicationListener<OrderStatusChangeEvent> {
private static final Logger logger = LoggerFactory.getLogger(OrderProcessingListener.class);
@Override
public void onApplicationEvent(OrderStatusChangeEvent event) {
logger.info("订单处理监听器:订单状态变更 - orderId: {}, {} -> {}, amount: {}",
event.getOrderId(), event.getOldStatus(), event.getNewStatus(), event.getOrderAmount());
// 模拟订单处理
try {
Thread.sleep(2000); // 模拟处理时间
logger.info("订单处理完成: {}", event.getOrderId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("订单处理被中断", e);
}
}
}
@Component
public class InventoryUpdateListener implements ApplicationListener<OrderStatusChangeEvent> {
private static final Logger logger = LoggerFactory.getLogger(InventoryUpdateListener.class);
@Override
public void onApplicationEvent(OrderStatusChangeEvent event) {
logger.info("库存更新监听器:订单状态变更 - orderId: {}", event.getOrderId());
// 模拟库存更新
try {
Thread.sleep(800); // 模拟更新时间
logger.info("库存更新完成 for order: {}", event.getOrderId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("库存更新被中断", e);
}
}
}
// 事件发布服务 - Event Publishing Service
@Service
public class UserService {
private static final Logger logger = LoggerFactory.getLogger(UserService.class);
private final ApplicationEventPublisher eventPublisher;
@Autowired
public UserService(ApplicationEventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}
public void registerUser(String username, String email) {
logger.info("用户服务:注册用户 - username: {}, email: {}", username, email);
// 模拟用户注册逻辑
try {
Thread.sleep(100); // 模拟处理时间
// 发布用户注册事件
UserRegistrationEvent event = new UserRegistrationEvent(this, username, email);
eventPublisher.publishEvent(event);
logger.info("用户注册完成: {}", username);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("用户注册被中断", e);
}
}
}
@Service
public class OrderService {
private static final Logger logger = LoggerFactory.getLogger(OrderService.class);
private final ApplicationEventPublisher eventPublisher;
@Autowired
public OrderService(ApplicationEventPublisher eventPublisher) {
this.eventPublisher = eventPublisher;
}
public void changeOrderStatus(String orderId, String oldStatus, String newStatus, BigDecimal amount) {
logger.info("订单服务:订单状态变更 - orderId: {}, {} -> {}, amount: {}",
orderId, oldStatus, newStatus, amount);
// 模拟订单状态变更逻辑
try {
Thread.sleep(200); // 模拟处理时间
// 发布订单状态变更事件
OrderStatusChangeEvent event = new OrderStatusChangeEvent(this, orderId, oldStatus, newStatus, amount);
eventPublisher.publishEvent(event);
logger.info("订单状态变更完成: {}", orderId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("订单状态变更被中断", e);
}
}
}
// 客户端使用 - Client Usage
@SpringBootApplication
public class SpringEventObserverDemo {
public static void main(String[] args) {
SpringApplication.run(SpringEventObserverDemo.class, args);
System.out.println("=== Spring ApplicationEvent观察者模式演示 ===");
System.out.println("启动Spring Boot应用,测试应用事件功能");
}
@Component
public static class EventObserverDemoRunner implements CommandLineRunner {
@Autowired
private UserService userService;
@Autowired
private OrderService orderService;
@Override
public void run(String... args) throws Exception {
System.out.println("\n--- 开始Spring应用事件观察者模式演示 ---");
// 演示用户注册事件
System.out.println("\n--- 演示用户注册事件 ---");
userService.registerUser("张三", "zhangsan@example.com");
userService.registerUser("李四", "lisi@example.com");
// 等待事件处理完成
Thread.sleep(2000);
// 演示订单状态变更事件
System.out.println("\n--- 演示订单状态变更事件 ---");
orderService.changeOrderStatus("ORDER-001", "CREATED", "PAID", new BigDecimal("299.99"));
orderService.changeOrderStatus("ORDER-001", "PAID", "SHIPPED", new BigDecimal("299.99"));
orderService.changeOrderStatus("ORDER-001", "SHIPPED", "DELIVERED", new BigDecimal("299.99"));
// 等待事件处理完成
Thread.sleep(3000);
System.out.println("\nSpring应用事件观察者模式演示完成");
}
}
}
观察者模式与其他模式比较 (Comparison with Other Patterns)
| 特性 | 观察者模式 | 中介者模式 | 责任链模式 | 发布-订阅 |
|---|---|---|---|---|
| 目的 | 状态通知 | 交互协调 | 请求传递 | 消息分发 |
| 通信方式 | 一对一/一对多 | 多对多 | 链式传递 | 广播 |
| 耦合度 | 低耦合 | 中等耦合 | 低耦合 | 极低耦合 |
| 使用场景 | 事件通知 | 复杂交互 | 处理链 | 消息系统 |
| 实现复杂度 | 简单 | 中等 | 中等 | 复杂 |
最佳实践 (Best Practices)
1. 主题设计 (Subject Design)
- 保持主题的接口简单明了
- 避免在主题中包含过多的业务逻辑
- 考虑使用接口隔离原则
2. 观察者设计 (Observer Design)
- 保持观察者的独立性
- 避免观察者之间的依赖关系
- 考虑观察者的执行顺序
3. 通知策略 (Notification Strategy)
- 选择合适的通知时机(拉模型 vs 推模型)
- 考虑异步通知以提高性能
- 处理好通知的顺序和优先级
4. 错误处理 (Error Handling)
- 处理观察者异常,避免影响其他观察者
- 考虑使用try-catch包装观察者调用
- 提供适当的错误恢复机制
5. 性能优化 (Performance Optimization)
- 使用合适的数据结构存储观察者
- 考虑使用异步处理提高响应速度
- 避免在通知过程中进行耗时操作
优缺点 (Pros and Cons)
优点 (Advantages)
- 松耦合:观察者和主题之间是抽象耦合
- 动态关系:可以在运行时添加或删除观察者
- 广播通信:主题可以同时通知多个观察者
- 简化对象:每个对象只需要关注自己的职责
- 扩展性:易于添加新的观察者类型
缺点 (Disadvantages)
- 意外更新:可能导致意外的更新和循环依赖
- 性能开销:通知所有观察者可能带来性能开销
- 调试困难:由于动态关系,调试可能变得复杂
- 内存泄漏:如果观察者没有正确移除,可能导致内存泄漏
- 更新顺序:观察者的更新顺序可能不确定
实际应用建议 (Practical Application Tips)
1. 生命周期管理
- 确保正确管理观察者的生命周期
- 在不需要时及时移除观察者
- 考虑使用弱引用避免内存泄漏
2. 更新策略
- 选择合适的更新策略(拉模型 vs 推模型)
- 考虑使用增量更新减少数据传输
- 实现适当的更新频率控制
3. 线程安全
- 在并发环境下确保线程安全
- 考虑使用不可变对象作为通知数据
- 处理好同步和异步更新的问题
4. 测试策略
- 单独测试每个观察者的逻辑
- 测试观察者的注册和移除
- 验证通知的正确性和完整性
总结 (Summary)
观察者模式是一种基础且强大的行为型设计模式,它通过定义对象之间的一对多依赖关系,实现了松耦合的事件通知机制。这种模式在现代软件架构中有着广泛的应用,从GUI系统到事件驱动架构,从消息系统到实时数据推送。
在实际应用中,观察者模式特别适合以下场景:
- 需要实现事件驱动架构的系统
- 需要实现模型-视图分离的应用程序
- 需要支持广播通信的消息系统
- 需要实现发布-订阅机制的服务
- 需要实现实时数据推送的应用
通过合理使用观察者模式,可以构建出松耦合、可扩展、易于维护的系统架构。然而,需要注意处理好生命周期管理、性能优化和错误处理等问题,确保系统的稳定性和可靠性。
更多推荐



所有评论(0)