1. 什么是 Semaphore?

Semaphore(信号量)是一种用于控制并发访问的同步机制。它本质上是一个计数器,用来限制同时访问某个资源的线程数量。

1.1 基本概念

想象一个停车场,有10个停车位:

  • 信号量计数器:代表可用停车位数量
  • acquire():车辆进入停车场(占用一个停车位)
  • release():车辆离开停车场(释放一个停车位)
// 创建一个允许3个线程同时访问的信号量
Semaphore semaphore = new Semaphore(3);

1.2 Semaphore vs 其他同步机制

同步机制 特点 使用场景
Semaphore 控制同时访问数量 限制并发线程数
Mutex/Lock 互斥访问(只允许1个) 保护临界区
CountDownLatch 等待多个任务完成 协调多线程启动

2. Android 中的 Semaphore

在 Android 开发中,Semaphore 主要用于:

  • 控制网络请求并发数
  • 限制数据库连接
  • 管理线程池大小
  • 控制资源访问

2.1 基本使用方法

import java.util.concurrent.Semaphore;

public class BasicSemaphoreExample {
    // 创建允许2个线程同时访问的信号量
    private Semaphore semaphore = new Semaphore(2);
    
    public void accessResource() {
        try {
            // 获取许可证(如果没有可用许可证,线程会阻塞)
            semaphore.acquire();
            
            // 执行需要限制并发的操作
            System.out.println(Thread.currentThread().getName() + " 正在访问资源");
            Thread.sleep(2000); // 模拟耗时操作
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放许可证(非常重要!)
            semaphore.release();
            System.out.println(Thread.currentThread().getName() + " 释放资源");
        }
    }
}

2.2 核心方法详解

public class SemaphoreMethods {
    private Semaphore semaphore = new Semaphore(3);
    
    // 1. acquire() - 获取一个许可证
    public void basicAcquire() {
        try {
            semaphore.acquire(); // 阻塞直到获得许可证
            // 执行操作
        } catch (InterruptedException e) {
            // 处理中断
        } finally {
            semaphore.release(); // 必须释放
        }
    }
    
    // 2. tryAcquire() - 尝试获取许可证(非阻塞)
    public void tryAcquireExample() {
        if (semaphore.tryAcquire()) {
            try {
                // 成功获得许可证,执行操作
                System.out.println("获得许可证,执行操作");
            } finally {
                semaphore.release();
            }
        } else {
            // 没有可用许可证
            System.out.println("没有可用许可证,稍后重试");
        }
    }
    
    // 3. tryAcquire(timeout) - 带超时的尝试获取
    public void tryAcquireWithTimeout() {
        try {
            if (semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
                try {
                    // 在5秒内获得了许可证
                    System.out.println("获得许可证");
                } finally {
                    semaphore.release();
                }
            } else {
                System.out.println("5秒内未能获得许可证");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    // 4. acquire(permits) - 一次获取多个许可证
    public void acquireMultiple() {
        try {
            semaphore.acquire(2); // 一次获取2个许可证
            try {
                // 执行需要2个许可证的操作
            } finally {
                semaphore.release(2); // 释放2个许可证
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

3. Android 实际应用场景

3.1 控制网络请求并发

public class NetworkRequestManager {
    // 限制同时进行的网络请求数为3个
    private static final Semaphore NETWORK_SEMAPHORE = new Semaphore(3);
    
    public void makeNetworkRequest(String url, Callback callback) {
        new Thread(() -> {
            try {
                // 获取网络请求许可证
                NETWORK_SEMAPHORE.acquire();
                
                // 执行网络请求
                performNetworkRequest(url, new Callback() {
                    @Override
                    public void onSuccess(String response) {
                        callback.onSuccess(response);
                        // 请求完成,释放许可证
                        NETWORK_SEMAPHORE.release();
                    }
                    
                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                        // 请求失败,也要释放许可证
                        NETWORK_SEMAPHORE.release();
                    }
                });
                
            } catch (InterruptedException e) {
                callback.onError(e);
            }
        }).start();
    }
    
    private void performNetworkRequest(String url, Callback callback) {
        // 实际的网络请求逻辑
        try {
            // 模拟网络请求
            Thread.sleep(2000);
            callback.onSuccess("Response from " + url);
        } catch (InterruptedException e) {
            callback.onError(e);
        }
    }
    
    interface Callback {
        void onSuccess(String response);
        void onError(Exception e);
    }
}

3.2 Android 文件下载管理器

public class DownloadManager {
    // 限制同时下载的文件数量
    private static final int MAX_CONCURRENT_DOWNLOADS = 2;
    private Semaphore downloadSemaphore = new Semaphore(MAX_CONCURRENT_DOWNLOADS);
    
    public void downloadFile(String url, String fileName, DownloadCallback callback) {
        new AsyncTask<Void, Integer, Boolean>() {
            @Override
            protected void onPreExecute() {
                try {
                    // 尝试获取下载许可证
                    downloadSemaphore.acquire();
                    callback.onStarted();
                } catch (InterruptedException e) {
                    callback.onError("无法获取下载许可:" + e.getMessage());
                    cancel(true);
                }
            }
            
            @Override
            protected Boolean doInBackground(Void... voids) {
                try {
                    // 执行文件下载
                    return performDownload(url, fileName, 
                        progress -> publishProgress(progress));
                } catch (Exception e) {
                    return false;
                }
            }
            
            @Override
            protected void onProgressUpdate(Integer... values) {
                callback.onProgress(values[0]);
            }
            
            @Override
            protected void onPostExecute(Boolean success) {
                // 下载完成,释放许可证
                downloadSemaphore.release();
                
                if (success) {
                    callback.onCompleted();
                } else {
                    callback.onError("下载失败");
                }
            }
            
            @Override
            protected void onCancelled() {
                // 取消时也要释放许可证
                downloadSemaphore.release();
                callback.onError("下载被取消");
            }
        }.execute();
    }
    
    private boolean performDownload(String url, String fileName, 
                                  ProgressCallback progressCallback) {
        // 实际下载逻辑
        try {
            for (int i = 0; i <= 100; i += 10) {
                Thread.sleep(200); // 模拟下载进度
                progressCallback.onProgress(i);
            }
            return true;
        } catch (InterruptedException e) {
            return false;
        }
    }
    
    interface DownloadCallback {
        void onStarted();
        void onProgress(int progress);
        void onCompleted();
        void onError(String error);
    }
    
    interface ProgressCallback {
        void onProgress(int progress);
    }
}

3.3 数据库连接池管理

public class DatabaseConnectionPool {
    private static final int MAX_CONNECTIONS = 5;
    private Semaphore connectionSemaphore = new Semaphore(MAX_CONNECTIONS);
    private Queue<SQLiteDatabase> availableConnections = new LinkedList<>();
    
    public DatabaseConnectionPool(Context context) {
        // 初始化连接池
        for (int i = 0; i < MAX_CONNECTIONS; i++) {
            SQLiteDatabase db = new DatabaseHelper(context).getWritableDatabase();
            availableConnections.offer(db);
        }
    }
    
    public void executeQuery(String query, DatabaseCallback callback) {
        new Thread(() -> {
            SQLiteDatabase connection = null;
            try {
                // 获取数据库连接许可证
                connectionSemaphore.acquire();
                
                // 从连接池获取连接
                synchronized (availableConnections) {
                    connection = availableConnections.poll();
                }
                
                if (connection != null) {
                    // 执行数据库操作
                    Cursor cursor = connection.rawQuery(query, null);
                    List<String> results = new ArrayList<>();
                    
                    while (cursor.moveToNext()) {
                        results.add(cursor.getString(0));
                    }
                    cursor.close();
                    
                    // 回调结果
                    callback.onSuccess(results);
                } else {
                    callback.onError("无法获取数据库连接");
                }
                
            } catch (InterruptedException e) {
                callback.onError("获取连接被中断:" + e.getMessage());
            } catch (Exception e) {
                callback.onError("数据库操作失败:" + e.getMessage());
            } finally {
                // 归还连接到连接池
                if (connection != null) {
                    synchronized (availableConnections) {
                        availableConnections.offer(connection);
                    }
                }
                // 释放许可证
                connectionSemaphore.release();
            }
        }).start();
    }
    
    interface DatabaseCallback {
        void onSuccess(List<String> results);
        void onError(String error);
    }
}

4. 高级用法和最佳实践

4.1 公平性控制

public class FairSemaphoreExample {
    // 公平信号量 - 按照请求顺序分配许可证
    private Semaphore fairSemaphore = new Semaphore(3, true);
    
    // 非公平信号量 - 不保证顺序(默认)
    private Semaphore unfairSemaphore = new Semaphore(3, false);
    
    public void demonstrateFairness() {
        // 创建多个线程测试公平性
        for (int i = 0; i < 10; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    System.out.println("线程 " + threadId + " 请求许可证");
                    fairSemaphore.acquire();
                    
                    System.out.println("线程 " + threadId + " 获得许可证");
                    Thread.sleep(1000);
                    
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    fairSemaphore.release();
                    System.out.println("线程 " + threadId + " 释放许可证");
                }
            }).start();
        }
    }
}

4.2 动态调整许可证数量

public class DynamicSemaphore {
    private Semaphore semaphore = new Semaphore(3);
    
    // 增加许可证数量
    public void increasePermits(int permits) {
        semaphore.release(permits);
        System.out.println("增加了 " + permits + " 个许可证");
    }
    
    // 减少许可证数量
    public void decreasePermits(int permits) {
        try {
            semaphore.acquire(permits);
            System.out.println("减少了 " + permits + " 个许可证");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    // 获取当前可用许可证数量
    public int getAvailablePermits() {
        return semaphore.availablePermits();
    }
    
    // 获取等待许可证的线程数量
    public int getQueueLength() {
        return semaphore.getQueueLength();
    }
}

4.3 与 RxJava 结合使用

public class RxSemaphoreExample {
    private Semaphore semaphore = new Semaphore(2);
    
    public Observable<String> processWithSemaphore(String data) {
        return Observable.fromCallable(() -> {
            semaphore.acquire();
            try {
                // 执行处理逻辑
                Thread.sleep(1000);
                return "处理完成: " + data;
            } finally {
                semaphore.release();
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
    }
    
    // 使用示例
    public void example() {
        for (int i = 0; i < 10; i++) {
            processWithSemaphore("数据" + i)
                .subscribe(
                    result -> Log.d("RxSemaphore", result),
                    error -> Log.e("RxSemaphore", "错误: " + error.getMessage())
                );
        }
    }
}

5. 常见陷阱和注意事项

5.1 必须配对使用 acquire() 和 release()

// ❌ 错误示例 - 可能导致许可证泄漏
public void badExample() {
    try {
        semaphore.acquire();
        if (someCondition) {
            return; // 忘记释放许可证!
        }
        // 执行操作
    } catch (Exception e) {
        return; // 又忘记释放许可证!
    }
    semaphore.release(); // 只有正常情况下才会执行到这里
}

// ✅ 正确示例 - 使用 finally 确保释放
public void goodExample() {
    try {
        semaphore.acquire();
        if (someCondition) {
            return;
        }
        // 执行操作
    } catch (Exception e) {
        // 处理异常
    } finally {
        semaphore.release(); // 无论什么情况都会执行
    }
}

5.2 避免死锁

public class DeadlockAvoidance {
    private Semaphore semaphore1 = new Semaphore(1);
    private Semaphore semaphore2 = new Semaphore(1);
    
    // ❌ 可能导致死锁
    public void badMethod1() {
        try {
            semaphore1.acquire();
            semaphore2.acquire();
            // 执行操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore2.release();
            semaphore1.release();
        }
    }
    
    public void badMethod2() {
        try {
            semaphore2.acquire(); // 不同的获取顺序
            semaphore1.acquire();
            // 执行操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore1.release();
            semaphore2.release();
        }
    }
    
    // ✅ 避免死锁 - 统一获取顺序
    public void goodMethod1() {
        try {
            semaphore1.acquire(); // 总是先获取 semaphore1
            semaphore2.acquire(); // 再获取 semaphore2
            // 执行操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            semaphore2.release();
            semaphore1.release();
        }
    }
}

5.3 处理中断

public class InterruptHandling {
    private Semaphore semaphore = new Semaphore(3);
    
    public void handleInterruption() {
        boolean acquired = false;
        try {
            semaphore.acquire();
            acquired = true;
            
            // 执行可能被中断的操作
            Thread.sleep(5000);
            
        } catch (InterruptedException e) {
            // 处理中断
            Thread.currentThread().interrupt(); // 重新设置中断状态
            Log.w("Semaphore", "操作被中断");
        } finally {
            if (acquired) {
                semaphore.release();
            }
        }
    }
}

6. 性能考虑和监控

6.1 性能监控

public class SemaphoreMonitor {
    private Semaphore semaphore;
    private String name;
    
    public SemaphoreMonitor(int permits, String name) {
        this.semaphore = new Semaphore(permits);
        this.name = name;
    }
    
    public void acquire() throws InterruptedException {
        long startTime = System.currentTimeMillis();
        semaphore.acquire();
        long waitTime = System.currentTimeMillis() - startTime;
        
        Log.d("SemaphoreMonitor", String.format(
            "%s - 获取许可证耗时: %dms, 剩余许可证: %d, 等待队列: %d",
            name, waitTime, semaphore.availablePermits(), 
            semaphore.getQueueLength()
        ));
    }
    
    public void release() {
        semaphore.release();
        Log.d("SemaphoreMonitor", String.format(
            "%s - 释放许可证, 剩余许可证: %d",
            name, semaphore.availablePermits()
        ));
    }
    
    public void printStatus() {
        Log.i("SemaphoreMonitor", String.format(
            "%s 状态 - 可用许可证: %d, 等待线程数: %d",
            name, semaphore.availablePermits(), semaphore.getQueueLength()
        ));
    }
}

7. 完整示例:图片加载器

这是一个综合运用 Semaphore 的完整示例:

public class ImageLoader {
    private static final int MAX_CONCURRENT_LOADS = 3;
    private Semaphore loadSemaphore = new Semaphore(MAX_CONCURRENT_LOADS); 
    private LruCache<String, Bitmap> memoryCache; // 内存缓存,用于存储已加载的图片
    private ExecutorService executor = Executors.newCachedThreadPool(); // 异步任务执行器,用于并发加载图片
    
    public ImageLoader() {
        // 初始化内存缓存
        final int maxMemory = (int) (Runtime.getRuntime().maxMemory() / 1024); // 获取当前应用可用的最大内存
        final int cacheSize = maxMemory / 8; // 缓存大小设置为最大内存的 1/8
        
        memoryCache = new LruCache<String, Bitmap>(cacheSize) {
            @Override
            protected int sizeOf(String key, Bitmap bitmap) {
                return bitmap.getByteCount() / 1024; // 根据图片字节大小计算缓存占用空间
            }
        };
    }
    
    /**
     * 加载图片并显示到指定的 ImageView 中。
     * - 如果图片已存在于内存缓存中,直接从缓存加载。
     * - 如果图片不在缓存中,则通过网络下载图片,并更新到 UI。
     *
     * @param url 图片的下载地址
     * @param imageView 需要显示图片的目标 ImageView
     * @param callback 加载成功或失败的回调接口
     */
    public void loadImage(String url, ImageView imageView, LoadCallback callback) {
        // 检查缓存是否已有图片
        Bitmap cached = memoryCache.get(url);
        if (cached != null) {
            imageView.setImageBitmap(cached); // 直接设置缓存中的图片
            callback.onSuccess(); // 调用成功回调
            return;
        }
        
        // 异步加载图片
        executor.submit(() -> {
            boolean acquired = false; // 标记是否成功获取信号量
            try {
                loadSemaphore.acquire(); // 获取信号量,限制同时加载图片的任务数量
                acquired = true; // 如果成功获取信号量,标记为 true
                
                // 再次检查缓存,避免重复加载
                Bitmap doubleCached = memoryCache.get(url);
                if (doubleCached != null) {
                    runOnUiThread(() -> {
                        imageView.setImageBitmap(doubleCached); // 设置图片到 ImageView
                        callback.onSuccess(); // 调用成功回调
                    });
                    return;
                }
                
                // 下载图片
                Bitmap bitmap = downloadImage(url);
                if (bitmap != null) {
                    memoryCache.put(url, bitmap); // 将图片存入缓存
                    runOnUiThread(() -> {
                        imageView.setImageBitmap(bitmap); // 更新 UI 显示图片
                        callback.onSuccess(); // 调用成功回调
                    });
                } else {
                    runOnUiThread(() -> callback.onError("下载失败")); // 下载失败时调用错误回调
                }
                
            } catch (InterruptedException e) {
                runOnUiThread(() -> callback.onError("加载被中断")); // 线程被中断时调用错误回调
            } catch (Exception e) {
                runOnUiThread(() -> callback.onError("加载出错: " + e.getMessage())); // 发生异常时调用错误回调
            } finally {
                if (acquired) {
                    loadSemaphore.release(); // 释放信号量,允许其他任务继续执行
                }
            }
        });
    }
    
    /**
     * 从指定的 URL 下载图片。
     * - 使用网络连接获取图片数据,并将其转换为 Bitmap。
     *
     * @param url 图片的下载地址
     * @return 下载的 Bitmap 对象,或 null 如果下载失败
     */
    private Bitmap downloadImage(String url) {
        try {
            URL imageUrl = new URL(url); // 创建 URL 对象
            InputStream inputStream = imageUrl.openConnection().getInputStream(); // 打开连接并获取输入流
            return BitmapFactory.decodeStream(inputStream); // 将输入流解析为 Bitmap
        } catch (Exception e) {
            return null; // 如果发生异常,返回 null
        }
    }
    
    /**
     * 在主线程上执行指定的 Runnable。
     * - 使用 Android 的 Handler 实现,将任务切换到主线程。
     *
     * @param runnable 要在主线程执行的任务
     */
    private void runOnUiThread(Runnable runnable) {
        new Handler(Looper.getMainLooper()).post(runnable); // 使用主线程的 Looper 调度任务
    }
    
    /**
     * 加载图片的回调接口。
     * - 提供成功和错误的回调方法。
     */
    interface LoadCallback {
        void onSuccess(); // 加载成功时的回调
        void onError(String message); // 加载失败时的回调
    }
    
    /**
     * 清理资源。
     * - 关闭 ExecutorService,释放相关资源。
     */
    public void shutdown() {
        executor.shutdown(); // 关闭线程池
    }
}

8. 总结

Semaphore 是 Android 开发中非常有用的并发控制工具:

主要优点:

  • 精确控制:可以精确控制同时访问资源的线程数量
  • 灵活性:支持公平/非公平模式,可动态调整许可证数量
  • 性能:相比于 synchronized,在某些场景下性能更好

适用场景:

  • 控制网络请求并发数
  • 限制文件下载/上传数量
  • 管理数据库连接池
  • 控制 CPU 密集型任务的并发度

注意事项:

  • 必须配对使用 acquire() 和 release()
  • 使用 finally 块确保许可证释放
  • 注意死锁问题
  • 合理设置许可证数量

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐