观察者模式 (Observer Pattern)

概述 (Overview)

观察者模式是一种行为型设计模式,它定义了对象之间的一对多依赖关系,使得当一个对象改变状态时,所有依赖它的对象都会自动收到通知并更新。

Observer Pattern is a behavioral design pattern that defines a one-to-many dependency between objects so that when one object changes state, all of its dependents are notified and updated automatically.

意图 (Intent)

定义对象之间的一对多依赖关系,使得当一个对象改变状态时,所有依赖它的对象都会自动收到通知并更新。

Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.

适用场景 (When to Use)

  • 当需要实现事件处理系统时
  • 当需要将对象状态的改变通知给多个其他对象时
  • 当需要实现发布-订阅机制时
  • 当需要实现模型-视图分离时
  • 当需要支持广播通信时
  • 当需要实现松耦合的系统架构时

结构 (Structure)

基础观察者模式结构图 (Basic Observer Pattern Structure)

uses
uses
«interface»
Subject
+attach(observer: Observer)
+detach(observer: Observer)
+notify()
ConcreteSubject
-state: Object
-observers: List
+attach(observer: Observer)
+detach(observer: Observer)
+notify()
+getState()
+setState(state: Object)
«interface»
Observer
+update(subject: Subject)
ConcreteObserver
-state: Object
+update(subject: Subject)
Client
+configure()

实现方式 (Implementation Approaches)

1. 经典观察者模式 (Classic Observer Pattern)

// 主题接口 - Subject Interface
public interface Subject {
    void attach(Observer observer);
    void detach(Observer observer);
    void notifyObservers();
}

// 观察者接口 - Observer Interface
public interface Observer {
    void update(Subject subject);
}

// 具体主题 - Concrete Subject
public class WeatherStation implements Subject {
    private float temperature;
    private float humidity;
    private float pressure;
    private List<Observer> observers;
    
    public WeatherStation() {
        this.observers = new ArrayList<>();
    }
    
    @Override
    public void attach(Observer observer) {
        if (!observers.contains(observer)) {
            observers.add(observer);
            System.out.println("添加观察者: " + observer.getClass().getSimpleName());
        }
    }
    
    @Override
    public void detach(Observer observer) {
        if (observers.remove(observer)) {
            System.out.println("移除观察者: " + observer.getClass().getSimpleName());
        }
    }
    
    @Override
    public void notifyObservers() {
        System.out.println("\n通知所有观察者天气数据更新:");
        for (Observer observer : observers) {
            observer.update(this);
        }
    }
    
    // 当气象数据改变时,通知观察者
    public void setMeasurements(float temperature, float humidity, float pressure) {
        System.out.println("\n=== 气象站更新数据 ===");
        this.temperature = temperature;
        this.humidity = humidity;
        this.pressure = pressure;
        System.out.printf("温度: %.1f°C, 湿度: %.1f%%, 气压: %.1f hPa%n", 
                         temperature, humidity, pressure);
        notifyObservers();
    }
    
    // 获取当前气象数据
    public float getTemperature() {
        return temperature;
    }
    
    public float getHumidity() {
        return humidity;
    }
    
    public float getPressure() {
        return pressure;
    }
}

// 具体观察者 - Concrete Observers
public class CurrentConditionsDisplay implements Observer {
    private float temperature;
    private float humidity;
    
    @Override
    public void update(Subject subject) {
        if (subject instanceof WeatherStation) {
            WeatherStation weatherStation = (WeatherStation) subject;
            this.temperature = weatherStation.getTemperature();
            this.humidity = weatherStation.getHumidity();
            display();
        }
    }
    
    public void display() {
        System.out.println("当前天气显示:");
        System.out.printf("  当前温度: %.1f°C%n", temperature);
        System.out.printf("  当前湿度: %.1f%%%n", humidity);
    }
}

public class StatisticsDisplay implements Observer {
    private List<Float> temperatures = new ArrayList<>();
    private List<Float> humidities = new ArrayList<>();
    private List<Float> pressures = new ArrayList<>();
    
    @Override
    public void update(Subject subject) {
        if (subject instanceof WeatherStation) {
            WeatherStation weatherStation = (WeatherStation) subject;
            temperatures.add(weatherStation.getTemperature());
            humidities.add(weatherStation.getHumidity());
            pressures.add(weatherStation.getPressure());
            display();
        }
    }
    
    public void display() {
        System.out.println("气象统计显示:");
        if (!temperatures.isEmpty()) {
            System.out.printf("  平均温度: %.1f°C%n", 
                             temperatures.stream().mapToDouble(Float::doubleValue).average().orElse(0));
            System.out.printf("  最高温度: %.1f°C%n", 
                             temperatures.stream().max(Float::compareTo).orElse(0f));
            System.out.printf("  最低温度: %.1f°C%n", 
                             temperatures.stream().min(Float::compareTo).orElse(0f));
        }
    }
    
    public void clearStatistics() {
        temperatures.clear();
        humidities.clear();
        pressures.clear();
        System.out.println("统计数据已清空");
    }
}

public class ForecastDisplay implements Observer {
    private float currentPressure = 1013.0f; // 标准大气压
    private float lastPressure;
    
    @Override
    public void update(Subject subject) {
        if (subject instanceof WeatherStation) {
            WeatherStation weatherStation = (WeatherStation) subject;
            lastPressure = currentPressure;
            currentPressure = weatherStation.getPressure();
            display();
        }
    }
    
    public void display() {
        System.out.println("天气预报显示:");
        System.out.printf("  当前气压: %.1f hPa%n", currentPressure);
        
        if (currentPressure > lastPressure) {
            System.out.println("  预报: 天气正在改善");
        } else if (currentPressure == lastPressure) {
            System.out.println("  预报: 天气保持稳定");
        } else {
            System.out.println("  预报: 天气可能转差");
        }
    }
}

// 客户端使用 - Client Usage
public class WeatherStationDemo {
    public static void main(String[] args) {
        System.out.println("=== 气象站观察者模式演示 ===");
        
        // 创建气象站(主题)
        WeatherStation weatherStation = new WeatherStation();
        
        // 创建显示设备(观察者)
        CurrentConditionsDisplay currentDisplay = new CurrentConditionsDisplay();
        StatisticsDisplay statisticsDisplay = new StatisticsDisplay();
        ForecastDisplay forecastDisplay = new ForecastDisplay();
        
        // 注册观察者
        weatherStation.attach(currentDisplay);
        weatherStation.attach(statisticsDisplay);
        weatherStation.attach(forecastDisplay);
        
        // 模拟气象数据更新
        System.out.println("\n--- 第一次数据更新 ---");
        weatherStation.setMeasurements(25.0f, 65.0f, 1013.0f);
        
        System.out.println("\n--- 第二次数据更新 ---");
        weatherStation.setMeasurements(26.5f, 70.0f, 1012.0f);
        
        System.out.println("\n--- 第三次数据更新 ---");
        weatherStation.setMeasurements(24.0f, 90.0f, 1011.0f);
        
        // 移除一个观察者
        System.out.println("\n--- 移除统计显示器 ---");
        weatherStation.detach(statisticsDisplay);
        
        System.out.println("\n--- 第四次数据更新 ---");
        weatherStation.setMeasurements(23.0f, 85.0f, 1010.0f);
        
        System.out.println("\n=== 演示完成 ===");
    }
}

2. 推模型观察者模式 (Push Model Observer Pattern)

// 推模型主题接口 - Push Model Subject Interface
public interface PushSubject {
    void attach(PushObserver observer);
    void detach(PushObserver observer);
    void notifyObservers(Map<String, Object> data);
}

// 推模型观察者接口 - Push Model Observer Interface
public interface PushObserver {
    void update(Map<String, Object> data);
}

// 股票数据主题 - Stock Data Subject
public class StockData implements PushSubject {
    private String symbol;
    private double price;
    private int volume;
    private double change;
    private List<PushObserver> observers;
    
    public StockData(String symbol) {
        this.symbol = symbol;
        this.observers = new ArrayList<>();
    }
    
    @Override
    public void attach(PushObserver observer) {
        observers.add(observer);
        System.out.println("添加股票观察者: " + observer.getClass().getSimpleName());
    }
    
    @Override
    public void detach(PushObserver observer) {
        observers.remove(observer);
        System.out.println("移除股票观察者: " + observer.getClass().getSimpleName());
    }
    
    @Override
    public void notifyObservers(Map<String, Object> data) {
        System.out.println("\n推送股票数据给所有观察者:");
        for (PushObserver observer : observers) {
            observer.update(data);
        }
    }
    
    public void setStockData(double price, int volume, double change) {
        System.out.println("\n=== 股票数据更新 ===");
        this.price = price;
        this.volume = volume;
        this.change = change;
        
        // 创建推送数据
        Map<String, Object> data = new HashMap<>();
        data.put("symbol", symbol);
        data.put("price", price);
        data.put("volume", volume);
        data.put("change", change);
        data.put("changePercent", (change / (price - change)) * 100);
        data.put("timestamp", new Date());
        
        System.out.printf("股票 %s: 价格=%.2f, 成交量=%d, 涨跌=%.2f%n", 
                         symbol, price, volume, change);
        
        notifyObservers(data);
    }
}

// 具体推模型观察者 - Concrete Push Model Observers
public class PriceAlert implements PushObserver {
    private double alertThreshold;
    
    public PriceAlert(double alertThreshold) {
        this.alertThreshold = alertThreshold;
    }
    
    @Override
    public void update(Map<String, Object> data) {
        Double price = (Double) data.get("price");
        String symbol = (String) data.get("symbol");
        
        System.out.println("价格警报:");
        System.out.printf("  股票 %s 当前价格: %.2f%n", symbol, price);
        
        if (price >= alertThreshold) {
            System.out.printf("  ⚠️  警告: 价格超过阈值 %.2f%n", alertThreshold);
        } else {
            System.out.printf("  ✅ 正常: 价格在阈值以下%n");
        }
    }
}

public class VolumeAnalyzer implements PushObserver {
    private static final int HIGH_VOLUME_THRESHOLD = 1000000;
    
    @Override
    public void update(Map<String, Object> data) {
        Integer volume = (Integer) data.get("volume");
        String symbol = (String) data.get("symbol");
        
        System.out.println("成交量分析:");
        System.out.printf("  股票 %s 成交量: %d%n", symbol, volume);
        
        if (volume > HIGH_VOLUME_THRESHOLD) {
            System.out.println("  📈 高成交量: 市场活跃");
        } else if (volume > HIGH_VOLUME_THRESHOLD / 2) {
            System.out.println("  📊 中等成交量: 正常交易");
        } else {
            System.out.println("  📉 低成交量: 交易清淡");
        }
    }
}

public class TrendIndicator implements PushObserver {
    private List<Double> priceHistory = new ArrayList<>();
    private static final int HISTORY_SIZE = 5;
    
    @Override
    public void update(Map<String, Object> data) {
        Double price = (Double) data.get("price");
        Double change = (Double) data.get("change");
        String symbol = (String) data.get("symbol");
        
        priceHistory.add(price);
        if (priceHistory.size() > HISTORY_SIZE) {
            priceHistory.remove(0);
        }
        
        System.out.println("趋势指标:");
        System.out.printf("  股票 %s 当前价格: %.2f, 变化: %.2f%n", symbol, price, change);
        
        if (priceHistory.size() >= HISTORY_SIZE) {
            boolean isUpTrend = true;
            boolean isDownTrend = true;
            
            for (int i = 1; i < priceHistory.size(); i++) {
                if (priceHistory.get(i) <= priceHistory.get(i-1)) {
                    isUpTrend = false;
                }
                if (priceHistory.get(i) >= priceHistory.get(i-1)) {
                    isDownTrend = false;
                }
            }
            
            if (isUpTrend) {
                System.out.println("  📈 上升趋势: 价格连续上涨");
            } else if (isDownTrend) {
                System.out.println("  📉 下降趋势: 价格连续下跌");
            } else {
                System.out.println("  📊 震荡趋势: 价格波动");
            }
        }
    }
}

// 客户端使用 - Client Usage
public class StockMarketDemo {
    public static void main(String[] args) {
        System.out.println("=== 股票市场观察者模式演示(推模型)===");
        
        // 创建股票数据主题
        StockData appleStock = new StockData("AAPL");
        StockData googleStock = new StockData("GOOGL");
        
        // 创建观察者
        PriceAlert priceAlert = new PriceAlert(150.0);
        VolumeAnalyzer volumeAnalyzer = new VolumeAnalyzer();
        TrendIndicator trendIndicator = new TrendIndicator();
        
        // 注册观察者
        appleStock.attach(priceAlert);
        appleStock.attach(volumeAnalyzer);
        appleStock.attach(trendIndicator);
        
        googleStock.attach(priceAlert);
        googleStock.attach(volumeAnalyzer);
        
        // 模拟股票数据更新
        System.out.println("\n--- 苹果股票数据更新 ---");
        appleStock.setStockData(145.30, 2500000, 2.50);
        appleStock.setStockData(147.80, 2800000, 2.50);
        appleStock.setStockData(149.20, 3200000, 1.40);
        appleStock.setStockData(151.50, 3500000, 2.30);
        appleStock.setStockData(153.80, 3800000, 2.30);
        
        System.out.println("\n--- 谷歌股票数据更新 ---");
        googleStock.setStockData(2750.50, 1800000, -15.20);
        googleStock.setStockData(2745.30, 1900000, -5.20);
        googleStock.setStockData(2740.80, 1700000, -4.50);
        
        System.out.println("\n=== 推模型演示完成 ===");
    }
}

3. 事件总线观察者模式 (Event Bus Observer Pattern)

// 事件类型枚举 - Event Type Enum
public enum EventType {
    USER_LOGIN, USER_LOGOUT, USER_REGISTER, 
    ORDER_CREATED, ORDER_PAID, ORDER_SHIPPED,
    SYSTEM_ALERT, SYSTEM_MAINTENANCE
}

// 事件类 - Event Class
public class Event {
    private final EventType type;
    private final Object data;
    private final String source;
    private final Date timestamp;
    
    public Event(EventType type, Object data, String source) {
        this.type = type;
        this.data = data;
        this.source = source;
        this.timestamp = new Date();
    }
    
    // Getters
    public EventType getType() { return type; }
    public Object getData() { return data; }
    public String getSource() { return source; }
    public Date getTimestamp() { return timestamp; }
    
    @Override
    public String toString() {
        return String.format("Event{type=%s, source='%s', time=%s}", type, source, timestamp);
    }
}

// 事件监听器接口 - Event Listener Interface
public interface EventListener {
    void onEvent(Event event);
    boolean supports(EventType eventType);
}

// 具体事件监听器 - Concrete Event Listeners
public class UserEventListener implements EventListener {
    @Override
    public void onEvent(Event event) {
        System.out.println("用户事件监听器收到事件: " + event);
        
        switch (event.getType()) {
            case USER_LOGIN:
                handleUserLogin(event);
                break;
            case USER_LOGOUT:
                handleUserLogout(event);
                break;
            case USER_REGISTER:
                handleUserRegister(event);
                break;
            default:
                break;
        }
    }
    
    @Override
    public boolean supports(EventType eventType) {
        return eventType == EventType.USER_LOGIN || 
               eventType == EventType.USER_LOGOUT || 
               eventType == EventType.USER_REGISTER;
    }
    
    private void handleUserLogin(Event event) {
        System.out.println("处理用户登录: " + event.getData());
        // 可以在这里添加登录逻辑,如记录登录时间、更新用户状态等
    }
    
    private void handleUserLogout(Event event) {
        System.out.println("处理用户登出: " + event.getData());
        // 可以在这里添加登出逻辑,如清理会话、记录登出时间等
    }
    
    private void handleUserRegister(Event event) {
        System.out.println("处理用户注册: " + event.getData());
        // 可以在这里添加注册逻辑,如发送欢迎邮件、初始化用户数据等
    }
}

public class OrderEventListener implements EventListener {
    @Override
    public void onEvent(Event event) {
        System.out.println("订单事件监听器收到事件: " + event);
        
        switch (event.getType()) {
            case ORDER_CREATED:
                handleOrderCreated(event);
                break;
            case ORDER_PAID:
                handleOrderPaid(event);
                break;
            case ORDER_SHIPPED:
                handleOrderShipped(event);
                break;
            default:
                break;
        }
    }
    
    @Override
    public boolean supports(EventType eventType) {
        return eventType == EventType.ORDER_CREATED || 
               eventType == EventType.ORDER_PAID || 
               eventType == EventType.ORDER_SHIPPED;
    }
    
    private void handleOrderCreated(Event event) {
        System.out.println("处理订单创建: " + event.getData());
        // 可以在这里添加订单创建逻辑,如库存检查、发送确认邮件等
    }
    
    private void handleOrderPaid(Event event) {
        System.out.println("处理订单支付: " + event.getData());
        // 可以在这里添加支付逻辑,如更新订单状态、安排发货等
    }
    
    private void handleOrderShipped(Event event) {
        System.out.println("处理订单发货: " + event.getData());
        // 可以在这里添加发货逻辑,如更新物流信息、发送发货通知等
    }
}

public class SystemEventListener implements EventListener {
    @Override
    public void onEvent(Event event) {
        System.out.println("系统事件监听器收到事件: " + event);
        
        switch (event.getType()) {
            case SYSTEM_ALERT:
                handleSystemAlert(event);
                break;
            case SYSTEM_MAINTENANCE:
                handleSystemMaintenance(event);
                break;
            default:
                break;
        }
    }
    
    @Override
    public boolean supports(EventType eventType) {
        return eventType == EventType.SYSTEM_ALERT || 
               eventType == EventType.SYSTEM_MAINTENANCE;
    }
    
    private void handleSystemAlert(Event event) {
        System.out.println("🚨 系统警报: " + event.getData());
        // 可以在这里添加警报处理逻辑,如发送通知、记录日志等
    }
    
    private void handleSystemMaintenance(Event event) {
        System.out.println("🔧 系统维护: " + event.getData());
        // 可以在这里添加维护处理逻辑,如通知用户、暂停服务等
    }
}

// 事件总线(观察者模式实现)- Event Bus (Observer Pattern Implementation)
public class EventBus implements Subject {
    private final Map<EventType, List<EventListener>> listeners;
    private final String busName;
    private boolean asyncProcessing;
    private ExecutorService executorService;
    
    public EventBus(String busName, boolean asyncProcessing) {
        this.busName = busName;
        this.asyncProcessing = asyncProcessing;
        this.listeners = new ConcurrentHashMap<>();
        
        // 初始化事件类型映射
        for (EventType type : EventType.values()) {
            listeners.put(type, new CopyOnWriteArrayList<>());
        }
        
        if (asyncProcessing) {
            this.executorService = Executors.newCachedThreadPool();
        }
    }
    
    @Override
    public void attach(Observer observer) {
        if (observer instanceof EventListener) {
            EventListener listener = (EventListener) observer;
            for (EventType type : EventType.values()) {
                if (listener.supports(type)) {
                    listeners.get(type).add(listener);
                }
            }
            System.out.println("事件监听器已注册: " + listener.getClass().getSimpleName());
        }
    }
    
    @Override
    public void detach(Observer observer) {
        if (observer instanceof EventListener) {
            EventListener listener = (EventListener) observer;
            for (EventType type : EventType.values()) {
                listeners.get(type).remove(listener);
            }
            System.out.println("事件监听器已移除: " + listener.getClass().getSimpleName());
        }
    }
    
    @Override
    public void notifyObservers() {
        // 事件总线不实现此方法,使用publishEvent代替
    }
    
    public void publishEvent(Event event) {
        System.out.println("\n[" + busName + "] 发布事件: " + event);
        
        List<EventListener> eventListeners = listeners.get(event.getType());
        
        if (asyncProcessing) {
            // 异步处理事件
            for (EventListener listener : eventListeners) {
                executorService.submit(() -> {
                    try {
                        listener.onEvent(event);
                    } catch (Exception e) {
                        System.err.println("事件监听器处理失败: " + e.getMessage());
                    }
                });
            }
        } else {
            // 同步处理事件
            for (EventListener listener : eventListeners) {
                try {
                    listener.onEvent(event);
                } catch (Exception e) {
                    System.err.println("事件监听器处理失败: " + e.getMessage());
                }
            }
        }
    }
    
    public void shutdown() {
        if (executorService != null) {
            executorService.shutdown();
            try {
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    executorService.shutdownNow();
                }
            } catch (InterruptedException e) {
                executorService.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }
}

// 事件源 - Event Sources
public class UserService {
    private EventBus eventBus;
    
    public UserService(EventBus eventBus) {
        this.eventBus = eventBus;
    }
    
    public void userLogin(String username) {
        System.out.println("\n用户服务:处理用户登录 - " + username);
        Event event = new Event(EventType.USER_LOGIN, username, "UserService");
        eventBus.publishEvent(event);
    }
    
    public void userLogout(String username) {
        System.out.println("\n用户服务:处理用户登出 - " + username);
        Event event = new Event(EventType.USER_LOGOUT, username, "UserService");
        eventBus.publishEvent(event);
    }
    
    public void userRegister(String username, String email) {
        System.out.println("\n用户服务:处理用户注册 - " + username);
        Map<String, String> userData = new HashMap<>();
        userData.put("username", username);
        userData.put("email", email);
        Event event = new Event(EventType.USER_REGISTER, userData, "UserService");
        eventBus.publishEvent(event);
    }
}

public class OrderService {
    private EventBus eventBus;
    
    public OrderService(EventBus eventBus) {
        this.eventBus = eventBus;
    }
    
    public void createOrder(String orderId, String customerName, BigDecimal amount) {
        System.out.println("\n订单服务:创建订单 - " + orderId);
        Map<String, Object> orderData = new HashMap<>();
        orderData.put("orderId", orderId);
        orderData.put("customerName", customerName);
        orderData.put("amount", amount);
        Event event = new Event(EventType.ORDER_CREATED, orderData, "OrderService");
        eventBus.publishEvent(event);
    }
    
    public void payOrder(String orderId) {
        System.out.println("\n订单服务:支付订单 - " + orderId);
        Event event = new Event(EventType.ORDER_PAID, orderId, "OrderService");
        eventBus.publishEvent(event);
    }
    
    public void shipOrder(String orderId) {
        System.out.println("\n订单服务:发货订单 - " + orderId);
        Event event = new Event(EventType.ORDER_SHIPPED, orderId, "OrderService");
        eventBus.publishEvent(event);
    }
}

public class SystemMonitor {
    private EventBus eventBus;
    
    public SystemMonitor(EventBus eventBus) {
        this.eventBus = eventBus;
    }
    
    public void sendAlert(String message) {
        System.out.println("\n系统监控:发送警报");
        Event event = new Event(EventType.SYSTEM_ALERT, message, "SystemMonitor");
        eventBus.publishEvent(event);
    }
    
    public void startMaintenance(String message) {
        System.out.println("\n系统监控:开始维护");
        Event event = new Event(EventType.SYSTEM_MAINTENANCE, message, "SystemMonitor");
        eventBus.publishEvent(event);
    }
}

// 客户端使用 - Client Usage
public class EventBusObserverDemo {
    public static void main(String[] args) {
        System.out.println("=== 事件总线观察者模式演示 ===");
        
        // 创建事件总线(主题)
        EventBus eventBus = new EventBus("MainEventBus", true); // 使用异步处理
        
        // 创建事件监听器(观察者)
        EventListener userListener = new UserEventListener();
        EventListener orderListener = new OrderEventListener();
        EventListener systemListener = new SystemEventListener();
        
        // 注册监听器
        eventBus.attach(userListener);
        eventBus.attach(orderListener);
        eventBus.attach(systemListener);
        
        // 创建事件源
        UserService userService = new UserService(eventBus);
        OrderService orderService = new OrderService(eventBus);
        SystemMonitor systemMonitor = new SystemMonitor(eventBus);
        
        // 模拟各种业务操作
        System.out.println("\n--- 模拟用户注册 ---");
        userService.userRegister("张三", "zhangsan@example.com");
        
        System.out.println("\n--- 模拟用户登录 ---");
        userService.userLogin("张三");
        
        System.out.println("\n--- 模拟订单流程 ---");
        orderService.createOrder("ORDER-001", "张三", new BigDecimal("299.99"));
        orderService.payOrder("ORDER-001");
        orderService.shipOrder("ORDER-001");
        
        System.out.println("\n--- 模拟系统事件 ---");
        systemMonitor.sendAlert("CPU使用率过高");
        systemMonitor.startMaintenance("系统将于今晚进行维护");
        
        System.out.println("\n--- 模拟用户登出 ---");
        userService.userLogout("张三");
        
        // 等待异步处理完成
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        // 清理资源
        eventBus.shutdown();
        
        System.out.println("\n=== 事件总线观察者模式演示完成 ===");
    }
}

框架源码中的应用 (Framework Applications)

1. Java Observable - 可观察类

// Java Observable类中的观察者模式应用
// Java Observable Observer Pattern Application

// 可观察的股票数据 - Observable Stock Data
public class ObservableStockData extends Observable {
    private String symbol;
    private double price;
    private int volume;
    private double change;
    
    public ObservableStockData(String symbol) {
        this.symbol = symbol;
    }
    
    public void setStockData(double price, int volume, double change) {
        System.out.println("\n=== 可观察股票数据更新 ===");
        
        // 保存旧值用于比较
        Double oldPrice = this.price;
        Integer oldVolume = this.volume;
        Double oldChange = this.change;
        
        this.price = price;
        this.volume = volume;
        this.change = change;
        
        System.out.printf("股票 %s: 价格=%.2f, 成交量=%d, 涨跌=%.2f%n", 
                         symbol, price, volume, change);
        
        // 设置变化标志
        setChanged();
        
        // 创建通知数据
        Map<String, Object> updateData = new HashMap<>();
        updateData.put("symbol", symbol);
        updateData.put("price", price);
        updateData.put("volume", volume);
        updateData.put("change", change);
        updateData.put("changePercent", (change / (price - change)) * 100);
        updateData.put("oldPrice", oldPrice);
        updateData.put("oldVolume", oldVolume);
        updateData.put("oldChange", oldChange);
        
        // 通知观察者
        notifyObservers(updateData);
    }
    
    // 获取当前股票数据
    public String getSymbol() { return symbol; }
    public double getPrice() { return price; }
    public int getVolume() { return volume; }
    public double getChange() { return change; }
}

// 股票观察者 - Stock Observer
public class StockObserver implements Observer {
    private String name;
    
    public StockObserver(String name) {
        this.name = name;
    }
    
    @Override
    public void update(Observable o, Object arg) {
        if (o instanceof ObservableStockData && arg instanceof Map) {
            ObservableStockData stockData = (ObservableStockData) o;
            @SuppressWarnings("unchecked")
            Map<String, Object> data = (Map<String, Object>) arg;
            
            System.out.println("[" + name + "] 收到股票数据更新:");
            System.out.printf("  股票: %s, 价格: %.2f, 成交量: %d%n", 
                             stockData.getSymbol(), stockData.getPrice(), stockData.getVolume());
            
            // 分析价格变化
            Double oldPrice = (Double) data.get("oldPrice");
            if (oldPrice != null && oldPrice > 0) {
                double priceChange = stockData.getPrice() - oldPrice;
                double changePercent = (priceChange / oldPrice) * 100;
                System.out.printf("  价格变化: %.2f (%.2f%%)%n", priceChange, changePercent);
                
                if (Math.abs(changePercent) > 5.0) {
                    System.out.println("  ⚠️  价格大幅波动警告!");
                }
            }
            
            // 分析成交量变化
            Integer oldVolume = (Integer) data.get("oldVolume");
            if (oldVolume != null && oldVolume > 0) {
                double volumeChange = stockData.getVolume() - oldVolume;
                double volumeChangePercent = (volumeChange / oldVolume) * 100;
                System.out.printf("  成交量变化: %d (%.2f%%)%n", (int) volumeChange, volumeChangePercent);
            }
        }
    }
    
    public String getName() {
        return name;
    }
}

// 高级股票观察者 - Advanced Stock Observer
public class AdvancedStockObserver implements Observer {
    private String name;
    private List<Double> priceHistory = new ArrayList<>();
    private static final int HISTORY_SIZE = 10;
    
    public AdvancedStockObserver(String name) {
        this.name = name;
    }
    
    @Override
    public void update(Observable o, Object arg) {
        if (o instanceof ObservableStockData && arg instanceof Map) {
            ObservableStockData stockData = (ObservableStockData) o;
            @SuppressWarnings("unchecked")
            Map<String, Object> data = (Map<String, Object>) arg;
            
            priceHistory.add(stockData.getPrice());
            if (priceHistory.size() > HISTORY_SIZE) {
                priceHistory.remove(0);
            }
            
            System.out.println("[" + name + "] 高级分析:");
            System.out.printf("  当前价格: %.2f%n", stockData.getPrice());
            
            // 计算移动平均线
            if (priceHistory.size() >= 5) {
                double ma5 = priceHistory.subList(priceHistory.size() - 5, priceHistory.size())
                    .stream().mapToDouble(Double::doubleValue).average().orElse(0);
                System.out.printf("  5日移动平均: %.2f%n", ma5);
                
                if (stockData.getPrice() > ma5) {
                    System.out.println("  📈 价格高于5日均线 - 看涨信号");
                } else {
                    System.out.println("  📉 价格低于5日均线 - 看跌信号");
                }
            }
            
            // 计算相对强弱指标(简化版)
            if (priceHistory.size() >= HISTORY_SIZE) {
                int gains = 0;
                int losses = 0;
                
                for (int i = 1; i < priceHistory.size(); i++) {
                    double change = priceHistory.get(i) - priceHistory.get(i-1);
                    if (change > 0) gains++;
                    else if (change < 0) losses++;
                }
                
                double rs = (double) gains / losses;
                double rsi = 100 - (100 / (1 + rs));
                System.out.printf("  RSI指标: %.2f%n", rsi);
                
                if (rsi > 70) {
                    System.out.println("  ⚠️  RSI超买 - 可能回调");
                } else if (rsi < 30) {
                    System.out.println("  💹  RSI超卖 - 可能反弹");
                } else {
                    System.out.println("  📊  RSI正常区间");
                }
            }
        }
    }
}

// 客户端使用 - Client Usage
public class JavaObservableDemo {
    public static void main(String[] args) {
        System.out.println("=== Java Observable观察者模式演示 ===");
        
        // 创建可观察的股票数据
        ObservableStockData appleStock = new ObservableStockData("AAPL");
        
        // 创建观察者
        StockObserver basicObserver = new StockObserver("基础观察者");
        StockObserver anotherObserver = new StockObserver("另一个观察者");
        AdvancedStockObserver advancedObserver = new AdvancedStockObserver("高级观察者");
        
        // 添加观察者
        appleStock.addObserver(basicObserver);
        appleStock.addObserver(anotherObserver);
        appleStock.addObserver(advancedObserver);
        
        // 模拟股票数据更新
        System.out.println("\n--- 第一次股票数据更新 ---");
        appleStock.setStockData(150.00, 2500000, 2.50);
        
        System.out.println("\n--- 第二次股票数据更新 ---");
        appleStock.setStockData(152.30, 2800000, 2.30);
        
        System.out.println("\n--- 第三次股票数据更新 ---");
        appleStock.setStockData(149.80, 3200000, -2.50);
        
        // 删除一个观察者
        System.out.println("\n--- 删除一个观察者 ---");
        appleStock.deleteObserver(anotherObserver);
        
        System.out.println("\n--- 第四次股票数据更新 ---");
        appleStock.setStockData(151.20, 2900000, 1.40);
        
        // 显示观察者数量
        System.out.println("\n当前观察者数量: " + appleStock.countObservers());
        
        System.out.println("\n=== Java Observable演示完成 ===");
    }
}

2. Spring ApplicationEvent - 应用事件

// Spring应用事件中的观察者模式应用
// Spring ApplicationEvent Observer Pattern Application

// 自定义应用事件 - Custom Application Event
public class UserRegistrationEvent extends ApplicationEvent {
    private final String username;
    private final String email;
    private final Date registrationTime;
    
    public UserRegistrationEvent(Object source, String username, String email) {
        super(source);
        this.username = username;
        this.email = email;
        this.registrationTime = new Date();
    }
    
    public String getUsername() { return username; }
    public String getEmail() { return email; }
    public Date getRegistrationTime() { return registrationTime; }
}

public class OrderStatusChangeEvent extends ApplicationEvent {
    private final String orderId;
    private final String oldStatus;
    private final String newStatus;
    private final BigDecimal orderAmount;
    
    public OrderStatusChangeEvent(Object source, String orderId, String oldStatus, 
                                String newStatus, BigDecimal orderAmount) {
        super(source);
        this.orderId = orderId;
        this.oldStatus = oldStatus;
        this.newStatus = newStatus;
        this.orderAmount = orderAmount;
    }
    
    public String getOrderId() { return orderId; }
    public String getOldStatus() { return oldStatus; }
    public String getNewStatus() { return newStatus; }
    public BigDecimal getOrderAmount() { return orderAmount; }
}

// 事件监听器 - Event Listeners
@Component
public class EmailNotificationListener implements ApplicationListener<UserRegistrationEvent> {
    private static final Logger logger = LoggerFactory.getLogger(EmailNotificationListener.class);
    
    @Override
    public void onApplicationEvent(UserRegistrationEvent event) {
        logger.info("邮件通知监听器:用户注册 - username: {}, email: {}", 
                   event.getUsername(), event.getEmail());
        
        // 模拟发送欢迎邮件
        try {
            Thread.sleep(1000); // 模拟邮件发送时间
            logger.info("欢迎邮件发送成功给: {}", event.getEmail());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("邮件发送被中断", e);
        }
    }
}

@Component
public class SmsNotificationListener implements ApplicationListener<UserRegistrationEvent> {
    private static final Logger logger = LoggerFactory.getLogger(SmsNotificationListener.class);
    
    @Override
    public void onApplicationEvent(UserRegistrationEvent event) {
        logger.info("短信通知监听器:用户注册 - username: {}", event.getUsername());
        
        // 模拟发送短信
        try {
            Thread.sleep(500); // 模拟短信发送时间
            logger.info("注册短信发送成功给: {}", event.getUsername());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("短信发送被中断", e);
        }
    }
}

@Component
public class OrderProcessingListener implements ApplicationListener<OrderStatusChangeEvent> {
    private static final Logger logger = LoggerFactory.getLogger(OrderProcessingListener.class);
    
    @Override
    public void onApplicationEvent(OrderStatusChangeEvent event) {
        logger.info("订单处理监听器:订单状态变更 - orderId: {}, {} -> {}, amount: {}", 
                   event.getOrderId(), event.getOldStatus(), event.getNewStatus(), event.getOrderAmount());
        
        // 模拟订单处理
        try {
            Thread.sleep(2000); // 模拟处理时间
            logger.info("订单处理完成: {}", event.getOrderId());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("订单处理被中断", e);
        }
    }
}

@Component
public class InventoryUpdateListener implements ApplicationListener<OrderStatusChangeEvent> {
    private static final Logger logger = LoggerFactory.getLogger(InventoryUpdateListener.class);
    
    @Override
    public void onApplicationEvent(OrderStatusChangeEvent event) {
        logger.info("库存更新监听器:订单状态变更 - orderId: {}", event.getOrderId());
        
        // 模拟库存更新
        try {
            Thread.sleep(800); // 模拟更新时间
            logger.info("库存更新完成 for order: {}", event.getOrderId());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("库存更新被中断", e);
        }
    }
}

// 事件发布服务 - Event Publishing Service
@Service
public class UserService {
    private static final Logger logger = LoggerFactory.getLogger(UserService.class);
    private final ApplicationEventPublisher eventPublisher;
    
    @Autowired
    public UserService(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }
    
    public void registerUser(String username, String email) {
        logger.info("用户服务:注册用户 - username: {}, email: {}", username, email);
        
        // 模拟用户注册逻辑
        try {
            Thread.sleep(100); // 模拟处理时间
            
            // 发布用户注册事件
            UserRegistrationEvent event = new UserRegistrationEvent(this, username, email);
            eventPublisher.publishEvent(event);
            
            logger.info("用户注册完成: {}", username);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("用户注册被中断", e);
        }
    }
}

@Service
public class OrderService {
    private static final Logger logger = LoggerFactory.getLogger(OrderService.class);
    private final ApplicationEventPublisher eventPublisher;
    
    @Autowired
    public OrderService(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }
    
    public void changeOrderStatus(String orderId, String oldStatus, String newStatus, BigDecimal amount) {
        logger.info("订单服务:订单状态变更 - orderId: {}, {} -> {}, amount: {}", 
                   orderId, oldStatus, newStatus, amount);
        
        // 模拟订单状态变更逻辑
        try {
            Thread.sleep(200); // 模拟处理时间
            
            // 发布订单状态变更事件
            OrderStatusChangeEvent event = new OrderStatusChangeEvent(this, orderId, oldStatus, newStatus, amount);
            eventPublisher.publishEvent(event);
            
            logger.info("订单状态变更完成: {}", orderId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("订单状态变更被中断", e);
        }
    }
}

// 客户端使用 - Client Usage
@SpringBootApplication
public class SpringEventObserverDemo {
    
    public static void main(String[] args) {
        SpringApplication.run(SpringEventObserverDemo.class, args);
        
        System.out.println("=== Spring ApplicationEvent观察者模式演示 ===");
        System.out.println("启动Spring Boot应用,测试应用事件功能");
    }
    
    @Component
    public static class EventObserverDemoRunner implements CommandLineRunner {
        
        @Autowired
        private UserService userService;
        
        @Autowired
        private OrderService orderService;
        
        @Override
        public void run(String... args) throws Exception {
            System.out.println("\n--- 开始Spring应用事件观察者模式演示 ---");
            
            // 演示用户注册事件
            System.out.println("\n--- 演示用户注册事件 ---");
            userService.registerUser("张三", "zhangsan@example.com");
            userService.registerUser("李四", "lisi@example.com");
            
            // 等待事件处理完成
            Thread.sleep(2000);
            
            // 演示订单状态变更事件
            System.out.println("\n--- 演示订单状态变更事件 ---");
            orderService.changeOrderStatus("ORDER-001", "CREATED", "PAID", new BigDecimal("299.99"));
            orderService.changeOrderStatus("ORDER-001", "PAID", "SHIPPED", new BigDecimal("299.99"));
            orderService.changeOrderStatus("ORDER-001", "SHIPPED", "DELIVERED", new BigDecimal("299.99"));
            
            // 等待事件处理完成
            Thread.sleep(3000);
            
            System.out.println("\nSpring应用事件观察者模式演示完成");
        }
    }
}

观察者模式与其他模式比较 (Comparison with Other Patterns)

特性 观察者模式 中介者模式 责任链模式 发布-订阅
目的 状态通知 交互协调 请求传递 消息分发
通信方式 一对一/一对多 多对多 链式传递 广播
耦合度 低耦合 中等耦合 低耦合 极低耦合
使用场景 事件通知 复杂交互 处理链 消息系统
实现复杂度 简单 中等 中等 复杂

最佳实践 (Best Practices)

1. 主题设计 (Subject Design)

  • 保持主题的接口简单明了
  • 避免在主题中包含过多的业务逻辑
  • 考虑使用接口隔离原则

2. 观察者设计 (Observer Design)

  • 保持观察者的独立性
  • 避免观察者之间的依赖关系
  • 考虑观察者的执行顺序

3. 通知策略 (Notification Strategy)

  • 选择合适的通知时机(拉模型 vs 推模型)
  • 考虑异步通知以提高性能
  • 处理好通知的顺序和优先级

4. 错误处理 (Error Handling)

  • 处理观察者异常,避免影响其他观察者
  • 考虑使用try-catch包装观察者调用
  • 提供适当的错误恢复机制

5. 性能优化 (Performance Optimization)

  • 使用合适的数据结构存储观察者
  • 考虑使用异步处理提高响应速度
  • 避免在通知过程中进行耗时操作

优缺点 (Pros and Cons)

优点 (Advantages)

  • 松耦合:观察者和主题之间是抽象耦合
  • 动态关系:可以在运行时添加或删除观察者
  • 广播通信:主题可以同时通知多个观察者
  • 简化对象:每个对象只需要关注自己的职责
  • 扩展性:易于添加新的观察者类型

缺点 (Disadvantages)

  • 意外更新:可能导致意外的更新和循环依赖
  • 性能开销:通知所有观察者可能带来性能开销
  • 调试困难:由于动态关系,调试可能变得复杂
  • 内存泄漏:如果观察者没有正确移除,可能导致内存泄漏
  • 更新顺序:观察者的更新顺序可能不确定

实际应用建议 (Practical Application Tips)

1. 生命周期管理

  • 确保正确管理观察者的生命周期
  • 在不需要时及时移除观察者
  • 考虑使用弱引用避免内存泄漏

2. 更新策略

  • 选择合适的更新策略(拉模型 vs 推模型)
  • 考虑使用增量更新减少数据传输
  • 实现适当的更新频率控制

3. 线程安全

  • 在并发环境下确保线程安全
  • 考虑使用不可变对象作为通知数据
  • 处理好同步和异步更新的问题

4. 测试策略

  • 单独测试每个观察者的逻辑
  • 测试观察者的注册和移除
  • 验证通知的正确性和完整性

总结 (Summary)

观察者模式是一种基础且强大的行为型设计模式,它通过定义对象之间的一对多依赖关系,实现了松耦合的事件通知机制。这种模式在现代软件架构中有着广泛的应用,从GUI系统到事件驱动架构,从消息系统到实时数据推送。

在实际应用中,观察者模式特别适合以下场景:

  • 需要实现事件驱动架构的系统
  • 需要实现模型-视图分离的应用程序
  • 需要支持广播通信的消息系统
  • 需要实现发布-订阅机制的服务
  • 需要实现实时数据推送的应用

通过合理使用观察者模式,可以构建出松耦合、可扩展、易于维护的系统架构。然而,需要注意处理好生命周期管理、性能优化和错误处理等问题,确保系统的稳定性和可靠性。

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐