【补丁】Stream和WebSocket
Stream 和 WebSocket 都是处理异步数据和实时通信的常用工具,尤其在 Dart 和 Flutter 开发中应用广泛。这两者结合使用时,可以实现高效的实时通信与动态数据更新,提升用户体验。
Stream
Dart 提供了 Stream 类型来处理异步数据流和事件流,尤其适用于实时数据更新、事件处理等场景。
1. Stream 是什么?
在 Dart 中,Stream 是一个用于处理异步事件的对象,表示一个可以发出一系列事件或数据的流。与 Future 相似,Stream 也用于异步编程,但与 Future 只返回一个结果不同,Stream 可以返回多个数据事件。
- 数据流:
Stream可以返回多个数据值,直到流完成或发生错误。 - 异步操作:通常用于 UI 或者 I/O 操作的异步处理。
2. 如何创建 Stream?
Dart 提供了几种方法来创建 Stream,如通过 Stream 工厂构造函数、StreamController 和生成器函数等。
2.1 通过生成器创建 Stream
你可以通过 async* 或 sync* 函数创建 Stream:
Stream<int> generateNumbers() async* {
for (int i = 1; i <= 5; i++) {
await Future.delayed(Duration(seconds: 1));
yield i; // 每秒钟发出一个整数
}
}
在这个例子中,async* 用于异步生成数据。每次 yield 都会把一个数据值发射到流中。
2.2 通过 StreamController 创建 Stream
StreamController 是 Dart 中用于创建和管理 Stream 的工具,可以用它来动态地控制流的内容。
import 'dart:async';
void main() {
final controller = StreamController<int>();
// 监听流
controller.stream.listen((data) {
print('Received data: $data');
});
// 发出数据
controller.add(1);
controller.add(2);
controller.add(3);
// 关闭流
controller.close();
}
这里通过 StreamController 创建了一个流,并且向其中添加数据,最终关闭流。
3. 如何监听 Stream?
你可以通过 listen 方法来订阅和监听 Stream:
final stream = generateNumbers();
stream.listen((data) {
print('Received: $data');
});
listen 方法允许你处理 Stream 发出的每一个数据。你还可以提供额外的回调来处理错误和流结束的情况。
stream.listen(
(data) => print('Received: $data'),
onError: (error) => print('Error: $error'),
onDone: () => print('Stream is done!'),
);
4. Stream 与 UI 结合:StreamBuilder
在 Flutter 中,StreamBuilder 是一个用于将 Stream 与 UI 更新结合的非常有用的小部件。每当流中的数据发生变化时,StreamBuilder 会自动重建 UI。
import 'package:flutter/material.dart';
void main() {
runApp(MyApp());
}
class MyApp extends StatelessWidget {
Widget build(BuildContext context) {
return MaterialApp(
home: StreamBuilderExample(),
);
}
}
class StreamBuilderExample extends StatelessWidget {
Stream<int> generateNumbers() async* {
for (int i = 1; i <= 5; i++) {
await Future.delayed(Duration(seconds: 1));
yield i;
}
}
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('StreamBuilder Example')),
body: Center(
child: StreamBuilder<int>(
stream: generateNumbers(),
builder: (context, snapshot) {
if (snapshot.connectionState == ConnectionState.waiting) {
return CircularProgressIndicator();
} else if (snapshot.hasError) {
return Text('Error: ${snapshot.error}');
} else if (snapshot.connectionState == ConnectionState.done) {
return Text('Stream is done!');
} else {
return Text('Received: ${snapshot.data}');
}
},
),
),
);
}
}
StreamBuilder会监听流的数据变化,并根据snapshot中的内容来更新 UI。- 你可以检查
snapshot.connectionState来知道流的当前状态(如是否正在等待数据)。
5. 流的生命周期
Stream 有几个重要的生命周期状态:
ConnectionState.none:表示流还没有被订阅。ConnectionState.waiting:表示流正在等待数据。ConnectionState.active:表示流正在发送数据。ConnectionState.done:表示流已经结束。
6. 处理流的错误
Stream 允许你在流中处理错误,通常通过 onError 回调函数来处理。
final stream = generateNumbers();
stream.listen(
(data) => print('Received: $data'),
onError: (error) => print('Error: $error'),
);
7. 关闭流
一旦 StreamController 的任务完成,应该调用 close() 方法来关闭流并释放资源。
controller.close();
8. 常见的流类型:
- 单次流 (
Single-subscription Stream):只能被订阅一次的流。适用于大多数场景。 - 广播流 (
Broadcast Stream):可以被多个监听者同时订阅。适用于多个组件需要接收相同数据的情况。
8.1 广播流:
final broadcastStream = Stream<int>.periodic(Duration(seconds: 1), (x) => x).asBroadcastStream();
broadcastStream.listen((data) => print('Listener 1: $data'));
broadcastStream.listen((data) => print('Listener 2: $data'));
9. 总结
Stream是 Dart 用于处理异步数据流的工具,适用于实时数据流和事件处理。- 使用
StreamController和async*等方法可以创建和管理Stream。 - 在 Flutter 中,
StreamBuilder用于监听Stream并自动更新 UI。 - 流的生命周期和异常处理是非常重要的,应该根据
Stream的状态来适时响应。
理解这些基础知识将帮助你更好地处理 Dart 和 Flutter 中的异步操作。
StreamBuilder
StreamBuilder 是 Flutter 中一个非常重要的响应式构建组件,它能够监听一个 Stream(数据流),并在数据发生变化时自动重建 UI,非常适合处理异步数据流(如实时更新的数据、网络请求、用户交互事件等)。以下是详细介绍:
1. StreamBuilder 的核心作用
- 监听数据流:绑定一个
Stream,当流中有新数据时触发 UI 更新。 - 自动重建 UI:数据变化时,自动调用
builder方法重新构建界面,无需手动管理状态。 - 处理异步数据:完美适配 Flutter 的异步编程模型(如网络请求、数据库监听等)。
2. 基本用法
构造函数
StreamBuilder<T>({
Key? key,
required this.stream, // 要监听的数据流
this.initialData, // 初始数据(流未发射数据时的默认值)
required this.builder, // UI 构建函数
})
关键参数
| 参数 | 类型 | 说明 |
|---|---|---|
stream |
Stream<T> |
必须,要监听的数据流 |
initialData |
T? |
可选,流初始值(流未发射数据时的默认值) |
builder |
AsyncWidgetBuilder<T> |
必须,根据流数据构建 UI 的函数 |
3. 完整代码示例
示例 1:监听一个简单的计数器流
import 'dart:async';
import 'package:flutter/material.dart';
class CounterStreamExample extends StatefulWidget {
_CounterStreamExampleState createState() => _CounterStreamExampleState();
}
class _CounterStreamExampleState extends State<CounterStreamExample> {
// 创建一个 StreamController 来管理数据流
final StreamController<int> _controller = StreamController<int>();
int _count = 0;
void initState() {
super.initState();
// 模拟定时发送数据到流
Timer.periodic(Duration(seconds: 1), (timer) {
_count++;
_controller.add(_count); // 向流中添加新数据
});
}
void dispose() {
_controller.close(); // 关闭流,防止内存泄漏
super.dispose();
}
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('StreamBuilder 示例')),
body: Center(
child: StreamBuilder<int>(
stream: _controller.stream, // 监听流
initialData: 0, // 初始值
builder: (context, snapshot) {
// snapshot 包含流的状态和最新数据
return Text(
'当前计数: ${snapshot.data}',
style: TextStyle(fontSize: 24),
);
},
),
),
);
}
}
运行效果:
屏幕中央会每秒自动更新一次数字(0→1→2→…)。
示例 2:结合 Future 使用(网络请求)
Future<List<String>> fetchData() async {
await Future.delayed(Duration(seconds: 2)); // 模拟网络延迟
return ['Apple', 'Banana', 'Orange'];
}
StreamBuilder<List<String>>(
stream: fetchData().asStream(), // 将 Future 转为 Stream
builder: (context, snapshot) {
if (snapshot.connectionState == ConnectionState.waiting) {
return CircularProgressIndicator(); // 加载中
} else if (snapshot.hasError) {
return Text('Error: ${snapshot.error}'); // 错误处理
} else {
return ListView.builder(
itemCount: snapshot.data!.length,
itemBuilder: (context, index) {
return ListTile(title: Text(snapshot.data![index]));
},
);
}
},
)
4. StreamBuilder 的核心机制
AsyncSnapshot 对象
builder 方法接收一个 AsyncSnapshot<T> 参数,它包含以下关键信息:
| 属性 | 类型 | 说明 |
|---|---|---|
data |
T? |
流的最新数据(可能为 null) |
error |
Object? |
流中的错误(如果有) |
connectionState |
ConnectionState |
流的连接状态 |
ConnectionState 的四种状态
| 状态 | 说明 |
|---|---|
none |
未监听任何流 |
waiting |
正在等待流的首次数据 |
active |
流已激活,有数据或错误 |
done |
流已关闭(如调用了 stream.close()) |
5. 实际应用场景
-
实时数据更新
- 聊天消息列表(Firebase Realtime Database)
- 股票价格变动
- 传感器数据(如温度、心率)
-
网络请求
- 分页加载数据
- 表单提交状态反馈
-
用户交互事件流
- 按钮点击事件
- 手势滑动事件
-
动画控制
- 定时动画更新
- 物理模拟(如游戏角色移动)
6. 注意事项
-
必须关闭 StreamController
在StatefulWidget的dispose()方法中调用streamController.close(),避免内存泄漏。 -
处理错误状态
始终检查snapshot.hasError,避免未捕获的异常导致应用崩溃。 -
避免频繁重建
如果流发射数据过于频繁(如每秒多次),考虑使用throttle或debounce限制更新频率。 -
与
StatefulWidget的区别StreamBuilder自动管理状态,适合数据驱动 UI的场景。StatefulWidget需要手动调用setState(),适合复杂交互逻辑。
7. 对比其他方案
| 方案 | 适用场景 | 特点 |
|---|---|---|
StreamBuilder |
异步数据流 | 自动监听流,重建 UI |
FutureBuilder |
单次异步操作 | 监听 Future,一次性结果 |
ValueListenableBuilder |
局部状态共享 | 监听 ValueListenable |
ChangeNotifierProvider |
复杂状态管理 | 结合 Provider 使用 |
总结
StreamBuilder 是 Flutter 中处理异步数据流的利器,它的核心优势在于:
- 响应式编程:数据变化自动触发 UI 更新。
- 简洁的代码:无需手动管理状态(如
setState())。 - 强大的状态管理:通过
AsyncSnapshot精细控制加载、错误等状态。
如果你的应用需要处理实时数据、网络请求或用户事件流,StreamBuilder 通常是首选方案! 🚀
WebSocket
在 Flutter 中使用 WebSocket 来实现实时通信,可以通过 web_socket_channel 包来轻松实现。WebSocket 是一个全双工协议,它允许客户端与服务器建立持久连接,适合用在需要实时更新的场景(如聊天、实时通知、在线游戏等)。
下面是如何在 Flutter 中使用 WebSocket 的详细教学。
1. 安装 web_socket_channel 包
首先,在你的 pubspec.yaml 文件中添加 web_socket_channel 依赖:
dependencies:
flutter:
sdk: flutter
web_socket_channel: ^2.0.0
然后运行 flutter pub get 来安装依赖。
2. 创建 WebSocket 客户端
在 Flutter 中,你可以通过 WebSocketChannel 来连接 WebSocket 服务并进行数据通信。这里展示了一个简单的 Flutter WebSocket 客户端,能够与 WebSocket 服务器进行通信。
2.1 创建一个 WebSocket 连接
下面是一个简单的 Flutter 页面,包含 WebSocket 客户端的实现:
import 'package:flutter/material.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
class WebSocketChatPage extends StatefulWidget {
_WebSocketChatPageState createState() => _WebSocketChatPageState();
}
class _WebSocketChatPageState extends State<WebSocketChatPage> {
final TextEditingController _controller = TextEditingController();
final WebSocketChannel _channel = WebSocketChannel.connect(
Uri.parse('ws://localhost:8000/ws/chat/'), // 这里的地址要根据你的 FastAPI 服务器地址来更改
);
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(title: Text('WebSocket Chat')),
body: Column(
children: [
Expanded(
child: StreamBuilder(
stream: _channel.stream, // 监听 WebSocket 数据流
builder: (context, snapshot) {
if (snapshot.connectionState == ConnectionState.waiting) {
return Center(child: CircularProgressIndicator());
}
return ListView(
children: [
if (snapshot.hasData)
ListTile(
title: Text(snapshot.data),
),
],
);
},
),
),
Padding(
padding: const EdgeInsets.all(8.0),
child: Row(
children: [
Expanded(
child: TextField(
controller: _controller,
decoration: InputDecoration(hintText: 'Enter message'),
),
),
IconButton(
icon: Icon(Icons.send),
onPressed: () {
if (_controller.text.isNotEmpty) {
_channel.sink.add(_controller.text); // 发送消息到服务器
_controller.clear();
}
},
),
],
),
),
],
),
);
}
void dispose() {
_channel.sink.close(); // 关闭 WebSocket 连接
super.dispose();
}
}
代码解析:
- WebSocketChannel.connect:使用这个方法连接 WebSocket 服务端,你需要传入服务端的地址。这里我们使用了
ws://localhost:8000/ws/chat/,这是一个 FastAPI WebSocket 服务端的路径(根据实际情况修改)。 - StreamBuilder:用于监听 WebSocket 数据流。每当从 WebSocket 服务器接收到数据时,
StreamBuilder会自动重建界面。它会监听snapshot.data来显示服务器返回的消息。 - TextField 和 IconButton:让用户输入消息并通过 WebSocket 发送。
3. WebSocket 连接与服务器的交互
你可以使用一个简单的 FastAPI 服务来和 Flutter WebSocket 客户端进行交互。
FastAPI WebSocket 服务端代码:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws/chat/")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept() # 接受 WebSocket 连接
try:
while True:
msg = await websocket.receive_text() # 接收客户端的消息
await websocket.send_text(f"Message received: {msg}") # 发送回应给客户端
except WebSocketDisconnect:
print("Client disconnected")
4. 启动 FastAPI 服务器
使用 FastAPI 服务器运行 WebSocket 服务:
uvicorn main:app --reload
这会启动一个本地服务器,监听端口 8000。你可以通过浏览器或者 WebSocket 客户端连接到 ws://localhost:8000/ws/chat/。
5. Flutter 客户端与 FastAPI WebSocket 交互
运行上面 Flutter 代码时,Flutter 客户端会连接到 FastAPI WebSocket 服务。当你在 Flutter 应用中输入消息并点击发送时,消息会通过 WebSocket 发送到 FastAPI 服务器,服务器会将消息返回,Flutter 客户端接收到消息后会显示在界面上。
6. 关闭 WebSocket 连接
在 Flutter 中,使用 WebSocketChannel.sink.close() 来关闭 WebSocket 连接,通常在页面销毁时调用:
void dispose() {
_channel.sink.close(); // 关闭 WebSocket 连接
super.dispose();
}
7. 总结
- 使用
web_socket_channel库,你可以在 Flutter 中很方便地创建 WebSocket 客户端。 - 与 FastAPI 服务端的 WebSocket 通信非常简单,FastAPI 提供了原生的 WebSocket 支持。
- 通过 WebSocket,可以实现实时通信,如聊天应用、实时数据更新等功能。
通过这个例子,你可以看到如何使用 Flutter 和 FastAPI 配合使用 WebSocket 来实现实时通信功能。
更多推荐


所有评论(0)