Stream

Dart 提供了 Stream 类型来处理异步数据流和事件流,尤其适用于实时数据更新、事件处理等场景。

1. Stream 是什么?

在 Dart 中,Stream 是一个用于处理异步事件的对象,表示一个可以发出一系列事件或数据的流。与 Future 相似,Stream 也用于异步编程,但与 Future 只返回一个结果不同,Stream 可以返回多个数据事件。

  • 数据流Stream 可以返回多个数据值,直到流完成或发生错误。
  • 异步操作:通常用于 UI 或者 I/O 操作的异步处理。

2. 如何创建 Stream

Dart 提供了几种方法来创建 Stream,如通过 Stream 工厂构造函数、StreamController 和生成器函数等。

2.1 通过生成器创建 Stream

你可以通过 async*sync* 函数创建 Stream

Stream<int> generateNumbers() async* {
  for (int i = 1; i <= 5; i++) {
    await Future.delayed(Duration(seconds: 1));
    yield i;  // 每秒钟发出一个整数
  }
}

在这个例子中,async* 用于异步生成数据。每次 yield 都会把一个数据值发射到流中。

2.2 通过 StreamController 创建 Stream

StreamController 是 Dart 中用于创建和管理 Stream 的工具,可以用它来动态地控制流的内容。

import 'dart:async';

void main() {
  final controller = StreamController<int>();

  // 监听流
  controller.stream.listen((data) {
    print('Received data: $data');
  });

  // 发出数据
  controller.add(1);
  controller.add(2);
  controller.add(3);

  // 关闭流
  controller.close();
}

这里通过 StreamController 创建了一个流,并且向其中添加数据,最终关闭流。

3. 如何监听 Stream

你可以通过 listen 方法来订阅和监听 Stream

final stream = generateNumbers();
stream.listen((data) {
  print('Received: $data');
});

listen 方法允许你处理 Stream 发出的每一个数据。你还可以提供额外的回调来处理错误和流结束的情况。

stream.listen(
  (data) => print('Received: $data'),
  onError: (error) => print('Error: $error'),
  onDone: () => print('Stream is done!'),
);

4. Stream 与 UI 结合:StreamBuilder

在 Flutter 中,StreamBuilder 是一个用于将 Stream 与 UI 更新结合的非常有用的小部件。每当流中的数据发生变化时,StreamBuilder 会自动重建 UI。

import 'package:flutter/material.dart';

void main() {
  runApp(MyApp());
}

class MyApp extends StatelessWidget {
  
  Widget build(BuildContext context) {
    return MaterialApp(
      home: StreamBuilderExample(),
    );
  }
}

class StreamBuilderExample extends StatelessWidget {
  Stream<int> generateNumbers() async* {
    for (int i = 1; i <= 5; i++) {
      await Future.delayed(Duration(seconds: 1));
      yield i;
    }
  }

  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('StreamBuilder Example')),
      body: Center(
        child: StreamBuilder<int>(
          stream: generateNumbers(),
          builder: (context, snapshot) {
            if (snapshot.connectionState == ConnectionState.waiting) {
              return CircularProgressIndicator();
            } else if (snapshot.hasError) {
              return Text('Error: ${snapshot.error}');
            } else if (snapshot.connectionState == ConnectionState.done) {
              return Text('Stream is done!');
            } else {
              return Text('Received: ${snapshot.data}');
            }
          },
        ),
      ),
    );
  }
}
  • StreamBuilder 会监听流的数据变化,并根据 snapshot 中的内容来更新 UI。
  • 你可以检查 snapshot.connectionState 来知道流的当前状态(如是否正在等待数据)。

5. 流的生命周期

Stream 有几个重要的生命周期状态:

  • ConnectionState.none:表示流还没有被订阅。
  • ConnectionState.waiting:表示流正在等待数据。
  • ConnectionState.active:表示流正在发送数据。
  • ConnectionState.done:表示流已经结束。

6. 处理流的错误

Stream 允许你在流中处理错误,通常通过 onError 回调函数来处理。

final stream = generateNumbers();
stream.listen(
  (data) => print('Received: $data'),
  onError: (error) => print('Error: $error'),
);

7. 关闭流

一旦 StreamController 的任务完成,应该调用 close() 方法来关闭流并释放资源。

controller.close();

8. 常见的流类型:

  • 单次流 (Single-subscription Stream):只能被订阅一次的流。适用于大多数场景。
  • 广播流 (Broadcast Stream):可以被多个监听者同时订阅。适用于多个组件需要接收相同数据的情况。
8.1 广播流:
final broadcastStream = Stream<int>.periodic(Duration(seconds: 1), (x) => x).asBroadcastStream();
broadcastStream.listen((data) => print('Listener 1: $data'));
broadcastStream.listen((data) => print('Listener 2: $data'));

9. 总结

  • Stream 是 Dart 用于处理异步数据流的工具,适用于实时数据流和事件处理。
  • 使用 StreamControllerasync* 等方法可以创建和管理 Stream
  • 在 Flutter 中,StreamBuilder 用于监听 Stream 并自动更新 UI。
  • 流的生命周期和异常处理是非常重要的,应该根据 Stream 的状态来适时响应。

理解这些基础知识将帮助你更好地处理 Dart 和 Flutter 中的异步操作。

StreamBuilder

StreamBuilder 是 Flutter 中一个非常重要的响应式构建组件,它能够监听一个 Stream(数据流),并在数据发生变化时自动重建 UI,非常适合处理异步数据流(如实时更新的数据、网络请求、用户交互事件等)。以下是详细介绍:


1. StreamBuilder 的核心作用

  • 监听数据流:绑定一个 Stream,当流中有新数据时触发 UI 更新。
  • 自动重建 UI:数据变化时,自动调用 builder 方法重新构建界面,无需手动管理状态。
  • 处理异步数据:完美适配 Flutter 的异步编程模型(如网络请求、数据库监听等)。

2. 基本用法

构造函数
StreamBuilder<T>({
  Key? key,
  required this.stream,       // 要监听的数据流
  this.initialData,           // 初始数据(流未发射数据时的默认值)
  required this.builder,      // UI 构建函数
})
关键参数
参数 类型 说明
stream Stream<T> 必须,要监听的数据流
initialData T? 可选,流初始值(流未发射数据时的默认值)
builder AsyncWidgetBuilder<T> 必须,根据流数据构建 UI 的函数

3. 完整代码示例

示例 1:监听一个简单的计数器流
import 'dart:async';
import 'package:flutter/material.dart';

class CounterStreamExample extends StatefulWidget {
  
  _CounterStreamExampleState createState() => _CounterStreamExampleState();
}

class _CounterStreamExampleState extends State<CounterStreamExample> {
  // 创建一个 StreamController 来管理数据流
  final StreamController<int> _controller = StreamController<int>();
  int _count = 0;

  
  void initState() {
    super.initState();
    // 模拟定时发送数据到流
    Timer.periodic(Duration(seconds: 1), (timer) {
      _count++;
      _controller.add(_count); // 向流中添加新数据
    });
  }

  
  void dispose() {
    _controller.close(); // 关闭流,防止内存泄漏
    super.dispose();
  }

  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('StreamBuilder 示例')),
      body: Center(
        child: StreamBuilder<int>(
          stream: _controller.stream, // 监听流
          initialData: 0,            // 初始值
          builder: (context, snapshot) {
            // snapshot 包含流的状态和最新数据
            return Text(
              '当前计数: ${snapshot.data}',
              style: TextStyle(fontSize: 24),
            );
          },
        ),
      ),
    );
  }
}

运行效果
屏幕中央会每秒自动更新一次数字(0→1→2→…)。


示例 2:结合 Future 使用(网络请求)
Future<List<String>> fetchData() async {
  await Future.delayed(Duration(seconds: 2)); // 模拟网络延迟
  return ['Apple', 'Banana', 'Orange'];
}

StreamBuilder<List<String>>(
  stream: fetchData().asStream(), // 将 Future 转为 Stream
  builder: (context, snapshot) {
    if (snapshot.connectionState == ConnectionState.waiting) {
      return CircularProgressIndicator(); // 加载中
    } else if (snapshot.hasError) {
      return Text('Error: ${snapshot.error}'); // 错误处理
    } else {
      return ListView.builder(
        itemCount: snapshot.data!.length,
        itemBuilder: (context, index) {
          return ListTile(title: Text(snapshot.data![index]));
        },
      );
    }
  },
)

4. StreamBuilder 的核心机制

AsyncSnapshot 对象

builder 方法接收一个 AsyncSnapshot<T> 参数,它包含以下关键信息:

属性 类型 说明
data T? 流的最新数据(可能为 null
error Object? 流中的错误(如果有)
connectionState ConnectionState 流的连接状态
ConnectionState 的四种状态
状态 说明
none 未监听任何流
waiting 正在等待流的首次数据
active 流已激活,有数据或错误
done 流已关闭(如调用了 stream.close()

5. 实际应用场景

  1. 实时数据更新

    • 聊天消息列表(Firebase Realtime Database)
    • 股票价格变动
    • 传感器数据(如温度、心率)
  2. 网络请求

    • 分页加载数据
    • 表单提交状态反馈
  3. 用户交互事件流

    • 按钮点击事件
    • 手势滑动事件
  4. 动画控制

    • 定时动画更新
    • 物理模拟(如游戏角色移动)

6. 注意事项

  1. 必须关闭 StreamController
    StatefulWidgetdispose() 方法中调用 streamController.close(),避免内存泄漏。

  2. 处理错误状态
    始终检查 snapshot.hasError,避免未捕获的异常导致应用崩溃。

  3. 避免频繁重建
    如果流发射数据过于频繁(如每秒多次),考虑使用 throttledebounce 限制更新频率。

  4. StatefulWidget 的区别

    • StreamBuilder 自动管理状态,适合数据驱动 UI的场景。
    • StatefulWidget 需要手动调用 setState(),适合复杂交互逻辑

7. 对比其他方案

方案 适用场景 特点
StreamBuilder 异步数据流 自动监听流,重建 UI
FutureBuilder 单次异步操作 监听 Future,一次性结果
ValueListenableBuilder 局部状态共享 监听 ValueListenable
ChangeNotifierProvider 复杂状态管理 结合 Provider 使用

总结

StreamBuilder 是 Flutter 中处理异步数据流的利器,它的核心优势在于:

  1. 响应式编程:数据变化自动触发 UI 更新。
  2. 简洁的代码:无需手动管理状态(如 setState())。
  3. 强大的状态管理:通过 AsyncSnapshot 精细控制加载、错误等状态。

如果你的应用需要处理实时数据、网络请求或用户事件流,StreamBuilder 通常是首选方案! 🚀

WebSocket

在 Flutter 中使用 WebSocket 来实现实时通信,可以通过 web_socket_channel 包来轻松实现。WebSocket 是一个全双工协议,它允许客户端与服务器建立持久连接,适合用在需要实时更新的场景(如聊天、实时通知、在线游戏等)。

下面是如何在 Flutter 中使用 WebSocket 的详细教学。

1. 安装 web_socket_channel

首先,在你的 pubspec.yaml 文件中添加 web_socket_channel 依赖:

dependencies:
  flutter:
    sdk: flutter
  web_socket_channel: ^2.0.0

然后运行 flutter pub get 来安装依赖。

2. 创建 WebSocket 客户端

在 Flutter 中,你可以通过 WebSocketChannel 来连接 WebSocket 服务并进行数据通信。这里展示了一个简单的 Flutter WebSocket 客户端,能够与 WebSocket 服务器进行通信。

2.1 创建一个 WebSocket 连接

下面是一个简单的 Flutter 页面,包含 WebSocket 客户端的实现:

import 'package:flutter/material.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

class WebSocketChatPage extends StatefulWidget {
  
  _WebSocketChatPageState createState() => _WebSocketChatPageState();
}

class _WebSocketChatPageState extends State<WebSocketChatPage> {
  final TextEditingController _controller = TextEditingController();
  final WebSocketChannel _channel = WebSocketChannel.connect(
    Uri.parse('ws://localhost:8000/ws/chat/'),  // 这里的地址要根据你的 FastAPI 服务器地址来更改
  );

  
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(title: Text('WebSocket Chat')),
      body: Column(
        children: [
          Expanded(
            child: StreamBuilder(
              stream: _channel.stream,  // 监听 WebSocket 数据流
              builder: (context, snapshot) {
                if (snapshot.connectionState == ConnectionState.waiting) {
                  return Center(child: CircularProgressIndicator());
                }
                return ListView(
                  children: [
                    if (snapshot.hasData)
                      ListTile(
                        title: Text(snapshot.data),
                      ),
                  ],
                );
              },
            ),
          ),
          Padding(
            padding: const EdgeInsets.all(8.0),
            child: Row(
              children: [
                Expanded(
                  child: TextField(
                    controller: _controller,
                    decoration: InputDecoration(hintText: 'Enter message'),
                  ),
                ),
                IconButton(
                  icon: Icon(Icons.send),
                  onPressed: () {
                    if (_controller.text.isNotEmpty) {
                      _channel.sink.add(_controller.text);  // 发送消息到服务器
                      _controller.clear();
                    }
                  },
                ),
              ],
            ),
          ),
        ],
      ),
    );
  }

  
  void dispose() {
    _channel.sink.close();  // 关闭 WebSocket 连接
    super.dispose();
  }
}
代码解析:
  • WebSocketChannel.connect:使用这个方法连接 WebSocket 服务端,你需要传入服务端的地址。这里我们使用了 ws://localhost:8000/ws/chat/,这是一个 FastAPI WebSocket 服务端的路径(根据实际情况修改)。
  • StreamBuilder:用于监听 WebSocket 数据流。每当从 WebSocket 服务器接收到数据时,StreamBuilder 会自动重建界面。它会监听 snapshot.data 来显示服务器返回的消息。
  • TextField 和 IconButton:让用户输入消息并通过 WebSocket 发送。

3. WebSocket 连接与服务器的交互

你可以使用一个简单的 FastAPI 服务来和 Flutter WebSocket 客户端进行交互。

FastAPI WebSocket 服务端代码:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws/chat/")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()  # 接受 WebSocket 连接
    try:
        while True:
            msg = await websocket.receive_text()  # 接收客户端的消息
            await websocket.send_text(f"Message received: {msg}")  # 发送回应给客户端
    except WebSocketDisconnect:
        print("Client disconnected")

4. 启动 FastAPI 服务器

使用 FastAPI 服务器运行 WebSocket 服务:

uvicorn main:app --reload

这会启动一个本地服务器,监听端口 8000。你可以通过浏览器或者 WebSocket 客户端连接到 ws://localhost:8000/ws/chat/

5. Flutter 客户端与 FastAPI WebSocket 交互

运行上面 Flutter 代码时,Flutter 客户端会连接到 FastAPI WebSocket 服务。当你在 Flutter 应用中输入消息并点击发送时,消息会通过 WebSocket 发送到 FastAPI 服务器,服务器会将消息返回,Flutter 客户端接收到消息后会显示在界面上。

6. 关闭 WebSocket 连接

在 Flutter 中,使用 WebSocketChannel.sink.close() 来关闭 WebSocket 连接,通常在页面销毁时调用:


void dispose() {
  _channel.sink.close();  // 关闭 WebSocket 连接
  super.dispose();
}

7. 总结

  • 使用 web_socket_channel 库,你可以在 Flutter 中很方便地创建 WebSocket 客户端。
  • 与 FastAPI 服务端的 WebSocket 通信非常简单,FastAPI 提供了原生的 WebSocket 支持。
  • 通过 WebSocket,可以实现实时通信,如聊天应用、实时数据更新等功能。

通过这个例子,你可以看到如何使用 Flutter 和 FastAPI 配合使用 WebSocket 来实现实时通信功能。

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐