284.Semaphore (信号量)
Semaphore(信号量)是一种用于控制并发访问的同步机制。它本质上是一个计数器,用来限制同时访问某个资源的线程数量。
·
1. 什么是 Semaphore?
Semaphore(信号量)是一种用于控制并发访问的同步机制。它本质上是一个计数器,用来限制同时访问某个资源的线程数量。
1.1 基本概念
想象一个停车场,有10个停车位:
- 信号量计数器:代表可用停车位数量
- acquire():车辆进入停车场(占用一个停车位)
- release():车辆离开停车场(释放一个停车位)
// 创建一个允许3个线程同时访问的信号量
Semaphore semaphore = new Semaphore(3);
1.2 Semaphore vs 其他同步机制
| 同步机制 | 特点 | 使用场景 |
|---|---|---|
| Semaphore | 控制同时访问数量 | 限制并发线程数 |
| Mutex/Lock | 互斥访问(只允许1个) | 保护临界区 |
| CountDownLatch | 等待多个任务完成 | 协调多线程启动 |
2. Android 中的 Semaphore
在 Android 开发中,Semaphore 主要用于:
- 控制网络请求并发数
- 限制数据库连接
- 管理线程池大小
- 控制资源访问
2.1 基本使用方法
import java.util.concurrent.Semaphore;
public class BasicSemaphoreExample {
// 创建允许2个线程同时访问的信号量
private Semaphore semaphore = new Semaphore(2);
public void accessResource() {
try {
// 获取许可证(如果没有可用许可证,线程会阻塞)
semaphore.acquire();
// 执行需要限制并发的操作
System.out.println(Thread.currentThread().getName() + " 正在访问资源");
Thread.sleep(2000); // 模拟耗时操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放许可证(非常重要!)
semaphore.release();
System.out.println(Thread.currentThread().getName() + " 释放资源");
}
}
}
2.2 核心方法详解
public class SemaphoreMethods {
private Semaphore semaphore = new Semaphore(3);
// 1. acquire() - 获取一个许可证
public void basicAcquire() {
try {
semaphore.acquire(); // 阻塞直到获得许可证
// 执行操作
} catch (InterruptedException e) {
// 处理中断
} finally {
semaphore.release(); // 必须释放
}
}
// 2. tryAcquire() - 尝试获取许可证(非阻塞)
public void tryAcquireExample() {
if (semaphore.tryAcquire()) {
try {
// 成功获得许可证,执行操作
System.out.println("获得许可证,执行操作");
} finally {
semaphore.release();
}
} else {
// 没有可用许可证
System.out.println("没有可用许可证,稍后重试");
}
}
// 3. tryAcquire(timeout) - 带超时的尝试获取
public void tryAcquireWithTimeout() {
try {
if (semaphore.tryAcquire(5, TimeUnit.SECONDS)) {
try {
// 在5秒内获得了许可证
System.out.println("获得许可证");
} finally {
semaphore.release();
}
} else {
System.out.println("5秒内未能获得许可证");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 4. acquire(permits) - 一次获取多个许可证
public void acquireMultiple() {
try {
semaphore.acquire(2); // 一次获取2个许可证
try {
// 执行需要2个许可证的操作
} finally {
semaphore.release(2); // 释放2个许可证
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3. Android 实际应用场景
3.1 控制网络请求并发
public class NetworkRequestManager {
// 限制同时进行的网络请求数为3个
private static final Semaphore NETWORK_SEMAPHORE = new Semaphore(3);
public void makeNetworkRequest(String url, Callback callback) {
new Thread(() -> {
try {
// 获取网络请求许可证
NETWORK_SEMAPHORE.acquire();
// 执行网络请求
performNetworkRequest(url, new Callback() {
@Override
public void onSuccess(String response) {
callback.onSuccess(response);
// 请求完成,释放许可证
NETWORK_SEMAPHORE.release();
}
@Override
public void onError(Exception e) {
callback.onError(e);
// 请求失败,也要释放许可证
NETWORK_SEMAPHORE.release();
}
});
} catch (InterruptedException e) {
callback.onError(e);
}
}).start();
}
private void performNetworkRequest(String url, Callback callback) {
// 实际的网络请求逻辑
try {
// 模拟网络请求
Thread.sleep(2000);
callback.onSuccess("Response from " + url);
} catch (InterruptedException e) {
callback.onError(e);
}
}
interface Callback {
void onSuccess(String response);
void onError(Exception e);
}
}
3.2 Android 文件下载管理器
public class DownloadManager {
// 限制同时下载的文件数量
private static final int MAX_CONCURRENT_DOWNLOADS = 2;
private Semaphore downloadSemaphore = new Semaphore(MAX_CONCURRENT_DOWNLOADS);
public void downloadFile(String url, String fileName, DownloadCallback callback) {
new AsyncTask<Void, Integer, Boolean>() {
@Override
protected void onPreExecute() {
try {
// 尝试获取下载许可证
downloadSemaphore.acquire();
callback.onStarted();
} catch (InterruptedException e) {
callback.onError("无法获取下载许可:" + e.getMessage());
cancel(true);
}
}
@Override
protected Boolean doInBackground(Void... voids) {
try {
// 执行文件下载
return performDownload(url, fileName,
progress -> publishProgress(progress));
} catch (Exception e) {
return false;
}
}
@Override
protected void onProgressUpdate(Integer... values) {
callback.onProgress(values[0]);
}
@Override
protected void onPostExecute(Boolean success) {
// 下载完成,释放许可证
downloadSemaphore.release();
if (success) {
callback.onCompleted();
} else {
callback.onError("下载失败");
}
}
@Override
protected void onCancelled() {
// 取消时也要释放许可证
downloadSemaphore.release();
callback.onError("下载被取消");
}
}.execute();
}
private boolean performDownload(String url, String fileName,
ProgressCallback progressCallback) {
// 实际下载逻辑
try {
for (int i = 0; i <= 100; i += 10) {
Thread.sleep(200); // 模拟下载进度
progressCallback.onProgress(i);
}
return true;
} catch (InterruptedException e) {
return false;
}
}
interface DownloadCallback {
void onStarted();
void onProgress(int progress);
void onCompleted();
void onError(String error);
}
interface ProgressCallback {
void onProgress(int progress);
}
}
3.3 数据库连接池管理
public class DatabaseConnectionPool {
private static final int MAX_CONNECTIONS = 5;
private Semaphore connectionSemaphore = new Semaphore(MAX_CONNECTIONS);
private Queue<SQLiteDatabase> availableConnections = new LinkedList<>();
public DatabaseConnectionPool(Context context) {
// 初始化连接池
for (int i = 0; i < MAX_CONNECTIONS; i++) {
SQLiteDatabase db = new DatabaseHelper(context).getWritableDatabase();
availableConnections.offer(db);
}
}
public void executeQuery(String query, DatabaseCallback callback) {
new Thread(() -> {
SQLiteDatabase connection = null;
try {
// 获取数据库连接许可证
connectionSemaphore.acquire();
// 从连接池获取连接
synchronized (availableConnections) {
connection = availableConnections.poll();
}
if (connection != null) {
// 执行数据库操作
Cursor cursor = connection.rawQuery(query, null);
List<String> results = new ArrayList<>();
while (cursor.moveToNext()) {
results.add(cursor.getString(0));
}
cursor.close();
// 回调结果
callback.onSuccess(results);
} else {
callback.onError("无法获取数据库连接");
}
} catch (InterruptedException e) {
callback.onError("获取连接被中断:" + e.getMessage());
} catch (Exception e) {
callback.onError("数据库操作失败:" + e.getMessage());
} finally {
// 归还连接到连接池
if (connection != null) {
synchronized (availableConnections) {
availableConnections.offer(connection);
}
}
// 释放许可证
connectionSemaphore.release();
}
}).start();
}
interface DatabaseCallback {
void onSuccess(List<String> results);
void onError(String error);
}
}
4. 高级用法和最佳实践
4.1 公平性控制
public class FairSemaphoreExample {
// 公平信号量 - 按照请求顺序分配许可证
private Semaphore fairSemaphore = new Semaphore(3, true);
// 非公平信号量 - 不保证顺序(默认)
private Semaphore unfairSemaphore = new Semaphore(3, false);
public void demonstrateFairness() {
// 创建多个线程测试公平性
for (int i = 0; i < 10; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("线程 " + threadId + " 请求许可证");
fairSemaphore.acquire();
System.out.println("线程 " + threadId + " 获得许可证");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
fairSemaphore.release();
System.out.println("线程 " + threadId + " 释放许可证");
}
}).start();
}
}
}
4.2 动态调整许可证数量
public class DynamicSemaphore {
private Semaphore semaphore = new Semaphore(3);
// 增加许可证数量
public void increasePermits(int permits) {
semaphore.release(permits);
System.out.println("增加了 " + permits + " 个许可证");
}
// 减少许可证数量
public void decreasePermits(int permits) {
try {
semaphore.acquire(permits);
System.out.println("减少了 " + permits + " 个许可证");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 获取当前可用许可证数量
public int getAvailablePermits() {
return semaphore.availablePermits();
}
// 获取等待许可证的线程数量
public int getQueueLength() {
return semaphore.getQueueLength();
}
}
4.3 与 RxJava 结合使用
public class RxSemaphoreExample {
private Semaphore semaphore = new Semaphore(2);
public Observable<String> processWithSemaphore(String data) {
return Observable.fromCallable(() -> {
semaphore.acquire();
try {
// 执行处理逻辑
Thread.sleep(1000);
return "处理完成: " + data;
} finally {
semaphore.release();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
// 使用示例
public void example() {
for (int i = 0; i < 10; i++) {
processWithSemaphore("数据" + i)
.subscribe(
result -> Log.d("RxSemaphore", result),
error -> Log.e("RxSemaphore", "错误: " + error.getMessage())
);
}
}
}
5. 常见陷阱和注意事项
5.1 必须配对使用 acquire() 和 release()
// ❌ 错误示例 - 可能导致许可证泄漏
public void badExample() {
try {
semaphore.acquire();
if (someCondition) {
return; // 忘记释放许可证!
}
// 执行操作
} catch (Exception e) {
return; // 又忘记释放许可证!
}
semaphore.release(); // 只有正常情况下才会执行到这里
}
// ✅ 正确示例 - 使用 finally 确保释放
public void goodExample() {
try {
semaphore.acquire();
if (someCondition) {
return;
}
// 执行操作
} catch (Exception e) {
// 处理异常
} finally {
semaphore.release(); // 无论什么情况都会执行
}
}
5.2 避免死锁
public class DeadlockAvoidance {
private Semaphore semaphore1 = new Semaphore(1);
private Semaphore semaphore2 = new Semaphore(1);
// ❌ 可能导致死锁
public void badMethod1() {
try {
semaphore1.acquire();
semaphore2.acquire();
// 执行操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore2.release();
semaphore1.release();
}
}
public void badMethod2() {
try {
semaphore2.acquire(); // 不同的获取顺序
semaphore1.acquire();
// 执行操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore1.release();
semaphore2.release();
}
}
// ✅ 避免死锁 - 统一获取顺序
public void goodMethod1() {
try {
semaphore1.acquire(); // 总是先获取 semaphore1
semaphore2.acquire(); // 再获取 semaphore2
// 执行操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore2.release();
semaphore1.release();
}
}
}
5.3 处理中断
public class InterruptHandling {
private Semaphore semaphore = new Semaphore(3);
public void handleInterruption() {
boolean acquired = false;
try {
semaphore.acquire();
acquired = true;
// 执行可能被中断的操作
Thread.sleep(5000);
} catch (InterruptedException e) {
// 处理中断
Thread.currentThread().interrupt(); // 重新设置中断状态
Log.w("Semaphore", "操作被中断");
} finally {
if (acquired) {
semaphore.release();
}
}
}
}
6. 性能考虑和监控
6.1 性能监控
public class SemaphoreMonitor {
private Semaphore semaphore;
private String name;
public SemaphoreMonitor(int permits, String name) {
this.semaphore = new Semaphore(permits);
this.name = name;
}
public void acquire() throws InterruptedException {
long startTime = System.currentTimeMillis();
semaphore.acquire();
long waitTime = System.currentTimeMillis() - startTime;
Log.d("SemaphoreMonitor", String.format(
"%s - 获取许可证耗时: %dms, 剩余许可证: %d, 等待队列: %d",
name, waitTime, semaphore.availablePermits(),
semaphore.getQueueLength()
));
}
public void release() {
semaphore.release();
Log.d("SemaphoreMonitor", String.format(
"%s - 释放许可证, 剩余许可证: %d",
name, semaphore.availablePermits()
));
}
public void printStatus() {
Log.i("SemaphoreMonitor", String.format(
"%s 状态 - 可用许可证: %d, 等待线程数: %d",
name, semaphore.availablePermits(), semaphore.getQueueLength()
));
}
}
7. 完整示例:图片加载器
这是一个综合运用 Semaphore 的完整示例:
public class ImageLoader {
private static final int MAX_CONCURRENT_LOADS = 3;
private Semaphore loadSemaphore = new Semaphore(MAX_CONCURRENT_LOADS);
private LruCache<String, Bitmap> memoryCache; // 内存缓存,用于存储已加载的图片
private ExecutorService executor = Executors.newCachedThreadPool(); // 异步任务执行器,用于并发加载图片
public ImageLoader() {
// 初始化内存缓存
final int maxMemory = (int) (Runtime.getRuntime().maxMemory() / 1024); // 获取当前应用可用的最大内存
final int cacheSize = maxMemory / 8; // 缓存大小设置为最大内存的 1/8
memoryCache = new LruCache<String, Bitmap>(cacheSize) {
@Override
protected int sizeOf(String key, Bitmap bitmap) {
return bitmap.getByteCount() / 1024; // 根据图片字节大小计算缓存占用空间
}
};
}
/**
* 加载图片并显示到指定的 ImageView 中。
* - 如果图片已存在于内存缓存中,直接从缓存加载。
* - 如果图片不在缓存中,则通过网络下载图片,并更新到 UI。
*
* @param url 图片的下载地址
* @param imageView 需要显示图片的目标 ImageView
* @param callback 加载成功或失败的回调接口
*/
public void loadImage(String url, ImageView imageView, LoadCallback callback) {
// 检查缓存是否已有图片
Bitmap cached = memoryCache.get(url);
if (cached != null) {
imageView.setImageBitmap(cached); // 直接设置缓存中的图片
callback.onSuccess(); // 调用成功回调
return;
}
// 异步加载图片
executor.submit(() -> {
boolean acquired = false; // 标记是否成功获取信号量
try {
loadSemaphore.acquire(); // 获取信号量,限制同时加载图片的任务数量
acquired = true; // 如果成功获取信号量,标记为 true
// 再次检查缓存,避免重复加载
Bitmap doubleCached = memoryCache.get(url);
if (doubleCached != null) {
runOnUiThread(() -> {
imageView.setImageBitmap(doubleCached); // 设置图片到 ImageView
callback.onSuccess(); // 调用成功回调
});
return;
}
// 下载图片
Bitmap bitmap = downloadImage(url);
if (bitmap != null) {
memoryCache.put(url, bitmap); // 将图片存入缓存
runOnUiThread(() -> {
imageView.setImageBitmap(bitmap); // 更新 UI 显示图片
callback.onSuccess(); // 调用成功回调
});
} else {
runOnUiThread(() -> callback.onError("下载失败")); // 下载失败时调用错误回调
}
} catch (InterruptedException e) {
runOnUiThread(() -> callback.onError("加载被中断")); // 线程被中断时调用错误回调
} catch (Exception e) {
runOnUiThread(() -> callback.onError("加载出错: " + e.getMessage())); // 发生异常时调用错误回调
} finally {
if (acquired) {
loadSemaphore.release(); // 释放信号量,允许其他任务继续执行
}
}
});
}
/**
* 从指定的 URL 下载图片。
* - 使用网络连接获取图片数据,并将其转换为 Bitmap。
*
* @param url 图片的下载地址
* @return 下载的 Bitmap 对象,或 null 如果下载失败
*/
private Bitmap downloadImage(String url) {
try {
URL imageUrl = new URL(url); // 创建 URL 对象
InputStream inputStream = imageUrl.openConnection().getInputStream(); // 打开连接并获取输入流
return BitmapFactory.decodeStream(inputStream); // 将输入流解析为 Bitmap
} catch (Exception e) {
return null; // 如果发生异常,返回 null
}
}
/**
* 在主线程上执行指定的 Runnable。
* - 使用 Android 的 Handler 实现,将任务切换到主线程。
*
* @param runnable 要在主线程执行的任务
*/
private void runOnUiThread(Runnable runnable) {
new Handler(Looper.getMainLooper()).post(runnable); // 使用主线程的 Looper 调度任务
}
/**
* 加载图片的回调接口。
* - 提供成功和错误的回调方法。
*/
interface LoadCallback {
void onSuccess(); // 加载成功时的回调
void onError(String message); // 加载失败时的回调
}
/**
* 清理资源。
* - 关闭 ExecutorService,释放相关资源。
*/
public void shutdown() {
executor.shutdown(); // 关闭线程池
}
}
8. 总结
Semaphore 是 Android 开发中非常有用的并发控制工具:
主要优点:
- 精确控制:可以精确控制同时访问资源的线程数量
- 灵活性:支持公平/非公平模式,可动态调整许可证数量
- 性能:相比于 synchronized,在某些场景下性能更好
适用场景:
- 控制网络请求并发数
- 限制文件下载/上传数量
- 管理数据库连接池
- 控制 CPU 密集型任务的并发度
注意事项:
- 必须配对使用 acquire() 和 release()
- 使用 finally 块确保许可证释放
- 注意死锁问题
- 合理设置许可证数量
更多推荐


所有评论(0)