111. 数据层架构设计:打造高性能、可扩展的移动应用数据层
本文探讨了移动应用数据层架构设计,重点阐述了Repository模式在多数据源管理中的应用。数据层作为核心基础设施,承担数据获取、存储、缓存和同步等职责。设计遵循单一数据源、离线优先、响应式数据流等原则,通过Kotlin实现展示了如何协调远程API、本地数据库和缓存数据源。采用分层架构和Flow响应式编程,支持离线优先策略、自动数据同步和多级缓存,为复杂业务场景和百万级用户规模提供高性能、可扩展的
·
111. 数据层架构设计:打造高性能、可扩展的移动应用数据层
摘要
数据层是移动应用架构的核心基础设施,负责数据的获取、存储、缓存和同步。本文深入探讨大型Android应用的数据层架构设计,涵盖Repository模式、多数据源管理、数据同步策略、离线支持、数据一致性保证等核心议题。通过完整的架构设计和Kotlin实现,展示如何构建一个高性能、可扩展、易测试的数据层架构,支撑复杂业务场景和百万级用户规模。
关键词: Repository模式、数据源管理、数据同步、离线优先、数据一致性、Flow、Kotlin
一、数据层架构概述
1.1 数据层的核心职责
核心职责:
- 数据获取:从远程API、本地数据库、缓存等多个数据源获取数据
- 数据存储:持久化数据到本地数据库、文件系统
- 数据缓存:多级缓存策略提升性能
- 数据同步:在线/离线数据同步、冲突解决
- 数据转换:API模型到领域模型的转换
- 状态管理:数据加载状态、错误处理
1.2 数据层设计原则
/**
* 数据层设计原则
*
* 1. 单一数据源原则 (Single Source of Truth)
* - Repository是唯一的数据访问入口
* - 避免UI层直接访问数据源
*
* 2. 离线优先原则 (Offline First)
* - 优先使用本地数据
* - 后台同步远程数据
*
* 3. 响应式数据流原则
* - 使用Flow/LiveData等响应式数据流
* - 数据变化自动通知观察者
*
* 4. 分层原则
* - 清晰的数据源分层
* - Repository封装数据源细节
*
* 5. 可测试原则
* - 依赖注入
* - 接口抽象
*/
二、Repository模式深度实践
2.1 Repository接口设计
package com.example.security.data.repository
/**
* 设备仓储接口
*
* 定义设备数据访问的标准契约
*/
interface DeviceRepository {
/**
* 获取设备列表 - 响应式数据流
*
* @param forceRefresh 是否强制刷新
* @return 设备列表Flow,自动推送数据变化
*/
fun getDevices(forceRefresh: Boolean = false): Flow<Result<List<Device>>>
/**
* 获取单个设备详情
*
* @param deviceId 设备ID
* @return 设备详情Flow
*/
fun getDevice(deviceId: String): Flow<Result<Device>>
/**
* 获取设备实时状态
*
* @param deviceId 设备ID
* @return 设备状态Flow,实时推送状态变化
*/
fun getDeviceStatus(deviceId: String): Flow<Result<DeviceStatus>>
/**
* 更新设备信息
*
* @param device 设备信息
* @return 更新结果
*/
suspend fun updateDevice(device: Device): Result<Device>
/**
* 删除设备
*
* @param deviceId 设备ID
* @return 删除结果
*/
suspend fun deleteDevice(deviceId: String): Result<Unit>
/**
* 同步设备数据
*
* @return 同步结果
*/
suspend fun syncDevices(): Result<Unit>
/**
* 清除缓存
*/
suspend fun clearCache()
}
2.2 Repository实现 - 多数据源协调
package com.example.security.data.repository
/**
* 设备仓储实现
*
* 协调远程数据源、本地数据源和缓存数据源
* 实现离线优先、自动同步、多级缓存策略
*/
@Singleton
class DeviceRepositoryImpl @Inject constructor(
private val remoteDataSource: RemoteDeviceDataSource,
private val localDataSource: LocalDeviceDataSource,
private val cacheDataSource: DeviceCacheDataSource,
private val deviceMapper: DeviceMapper,
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) : DeviceRepository {
companion object {
private const val CACHE_EXPIRY_MINUTES = 5L
}
/**
* 获取设备列表 - 离线优先策略
*
* 数据流:
* 1. 先返回缓存数据(如果有且未过期)
* 2. 再返回本地数据库数据
* 3. 后台刷新远程数据
* 4. 更新本地数据库和缓存
* 5. 推送最新数据
*/
override fun getDevices(forceRefresh: Boolean): Flow<Result<List<Device>>> = flow {
// 1. 首先尝试从内存缓存获取
if (!forceRefresh) {
val cachedDevices = cacheDataSource.getDevices()
if (cachedDevices.isNotEmpty() && !isCacheExpired()) {
emit(Result.Success(cachedDevices))
}
}
// 2. 从本地数据库获取
localDataSource.getDevices()
.catch { e ->
// 本地数据获取失败,记录日志但不中断流
emit(Result.Error(e))
}
.collect { localDevices ->
if (localDevices.isNotEmpty()) {
// 发送本地数据
emit(Result.Success(localDevices))
// 更新内存缓存
cacheDataSource.saveDevices(localDevices)
}
}
// 3. 根据策略决定是否刷新远程数据
if (forceRefresh || shouldRefreshFromRemote()) {
emit(Result.Loading)
try {
// 从远程API获取最新数据
val remoteResult = remoteDataSource.fetchDevices()
when (remoteResult) {
is Result.Success -> {
val devices = remoteResult.data
// 保存到本地数据库
localDataSource.saveDevices(devices)
// 更新内存缓存
cacheDataSource.saveDevices(devices)
cacheDataSource.updateCacheTimestamp()
// 发送最新数据
emit(Result.Success(devices))
}
is Result.Error -> {
// 远程获取失败,使用本地数据,发送错误通知
emit(Result.Error(remoteResult.exception))
}
is Result.Loading -> {
// 保持Loading状态
}
}
} catch (e: Exception) {
// 网络异常,继续使用本地数据
emit(Result.Error(e))
}
}
}.flowOn(ioDispatcher)
/**
* 获取单个设备 - 缓存优先策略
*/
override fun getDevice(deviceId: String): Flow<Result<Device>> = flow {
// 1. 先从缓存获取
val cachedDevice = cacheDataSource.getDevice(deviceId)
if (cachedDevice != null && !isCacheExpired()) {
emit(Result.Success(cachedDevice))
}
// 2. 从本地数据库获取
localDataSource.getDevice(deviceId)
.catch { e -> emit(Result.Error(e)) }
.collect { device ->
emit(Result.Success(device))
cacheDataSource.saveDevice(device)
}
// 3. 后台刷新远程数据
if (shouldRefreshFromRemote()) {
try {
val remoteResult = remoteDataSource.fetchDevice(deviceId)
if (remoteResult is Result.Success) {
val device = remoteResult.data
localDataSource.saveDevice(device)
cacheDataSource.saveDevice(device)
emit(Result.Success(device))
}
} catch (e: Exception) {
// 忽略远程刷新错误
}
}
}.flowOn(ioDispatcher)
/**
* 获取设备实时状态 - WebSocket + 轮询策略
*/
override fun getDeviceStatus(deviceId: String): Flow<Result<DeviceStatus>> =
remoteDataSource.observeDeviceStatus(deviceId)
.catch { e ->
// WebSocket失败,降级到轮询
emitAll(pollDeviceStatus(deviceId))
}
.onEach { result ->
// 缓存最新状态
if (result is Result.Success) {
cacheDataSource.saveDeviceStatus(deviceId, result.data)
}
}
.flowOn(ioDispatcher)
/**
* 更新设备信息 - 乐观更新策略
*/
override suspend fun updateDevice(device: Device): Result<Device> = withContext(ioDispatcher) {
try {
// 1. 乐观更新本地数据库
localDataSource.saveDevice(device)
cacheDataSource.saveDevice(device)
// 2. 同步到远程服务器
val remoteResult = remoteDataSource.updateDevice(device)
when (remoteResult) {
is Result.Success -> {
// 远程更新成功,使用服务器返回的数据
val updatedDevice = remoteResult.data
localDataSource.saveDevice(updatedDevice)
cacheDataSource.saveDevice(updatedDevice)
Result.Success(updatedDevice)
}
is Result.Error -> {
// 远程更新失败,回滚本地数据
// 这里可以实现冲突解决策略
Result.Error(remoteResult.exception)
}
else -> Result.Error(Exception("Unknown error"))
}
} catch (e: Exception) {
Result.Error(e)
}
}
/**
* 删除设备 - 远程优先策略
*/
override suspend fun deleteDevice(deviceId: String): Result<Unit> = withContext(ioDispatcher) {
try {
// 1. 先删除远程数据
val remoteResult = remoteDataSource.deleteDevice(deviceId)
if (remoteResult is Result.Success) {
// 2. 删除本地数据
localDataSource.deleteDevice(deviceId)
cacheDataSource.deleteDevice(deviceId)
Result.Success(Unit)
} else {
remoteResult as Result.Error
Result.Error(remoteResult.exception)
}
} catch (e: Exception) {
Result.Error(e)
}
}
/**
* 同步设备数据
*/
override suspend fun syncDevices(): Result<Unit> = withContext(ioDispatcher) {
try {
// 1. 获取本地待同步数据
val localDevices = localDataSource.getPendingSyncDevices()
// 2. 同步到远程
localDevices.forEach { device ->
remoteDataSource.updateDevice(device)
localDataSource.markSynced(device.id)
}
// 3. 从远程拉取最新数据
val remoteResult = remoteDataSource.fetchDevices()
if (remoteResult is Result.Success) {
localDataSource.saveDevices(remoteResult.data)
cacheDataSource.saveDevices(remoteResult.data)
}
Result.Success(Unit)
} catch (e: Exception) {
Result.Error(e)
}
}
override suspend fun clearCache() {
cacheDataSource.clear()
}
// 私有辅助方法
private fun isCacheExpired(): Boolean {
val lastCacheTime = cacheDataSource.getLastCacheTimestamp()
val currentTime = System.currentTimeMillis()
val expiryTime = TimeUnit.MINUTES.toMillis(CACHE_EXPIRY_MINUTES)
return currentTime - lastCacheTime > expiryTime
}
private fun shouldRefreshFromRemote(): Boolean {
return isCacheExpired() || !cacheDataSource.hasCache()
}
private fun pollDeviceStatus(deviceId: String): Flow<Result<DeviceStatus>> = flow {
while (true) {
val result = remoteDataSource.fetchDeviceStatus(deviceId)
emit(result)
kotlinx.coroutines.delay(5000) // 5秒轮询一次
}
}
}
三、多数据源架构设计
3.1 数据源层次结构
3.2 远程数据源实现
package com.example.security.data.source.remote
/**
* 远程设备数据源
*
* 负责与远程API交互
*/
@Singleton
class RemoteDeviceDataSource @Inject constructor(
private val apiService: DeviceApiService,
private val deviceMapper: DeviceMapper,
private val webSocketClient: DeviceWebSocketClient
) {
/**
* 从远程API获取设备列表
*/
suspend fun fetchDevices(): Result<List<Device>> {
return try {
val response = apiService.getDevices()
if (response.isSuccessful) {
val deviceDtos = response.body() ?: emptyList()
val devices = deviceDtos.map { deviceMapper.mapToDomain(it) }
Result.Success(devices)
} else {
Result.Error(Exception("API Error: ${response.code()}"))
}
} catch (e: Exception) {
Result.Error(e)
}
}
/**
* 从远程API获取单个设备
*/
suspend fun fetchDevice(deviceId: String): Result<Device> {
return try {
val response = apiService.getDevice(deviceId)
if (response.isSuccessful) {
val deviceDto = response.body()
if (deviceDto != null) {
Result.Success(deviceMapper.mapToDomain(deviceDto))
} else {
Result.Error(Exception("Device not found"))
}
} else {
Result.Error(Exception("API Error: ${response.code()}"))
}
} catch (e: Exception) {
Result.Error(e)
}
}
/**
* 通过WebSocket观察设备状态
*/
fun observeDeviceStatus(deviceId: String): Flow<Result<DeviceStatus>> = flow {
webSocketClient.connect(deviceId)
.collect { statusDto ->
val status = deviceMapper.mapStatusToDomain(statusDto)
emit(Result.Success(status))
}
}
/**
* 轮询获取设备状态
*/
suspend fun fetchDeviceStatus(deviceId: String): Result<DeviceStatus> {
return try {
val response = apiService.getDeviceStatus(deviceId)
if (response.isSuccessful) {
val statusDto = response.body()
if (statusDto != null) {
Result.Success(deviceMapper.mapStatusToDomain(statusDto))
} else {
Result.Error(Exception("Status not found"))
}
} else {
Result.Error(Exception("API Error: ${response.code()}"))
}
} catch (e: Exception) {
Result.Error(e)
}
}
/**
* 更新设备信息到远程服务器
*/
suspend fun updateDevice(device: Device): Result<Device> {
return try {
val deviceDto = deviceMapper.mapToDto(device)
val response = apiService.updateDevice(device.id, deviceDto)
if (response.isSuccessful) {
val updatedDto = response.body()
if (updatedDto != null) {
Result.Success(deviceMapper.mapToDomain(updatedDto))
} else {
Result.Error(Exception("Update failed"))
}
} else {
Result.Error(Exception("API Error: ${response.code()}"))
}
} catch (e: Exception) {
Result.Error(e)
}
}
/**
* 删除远程设备
*/
suspend fun deleteDevice(deviceId: String): Result<Unit> {
return try {
val response = apiService.deleteDevice(deviceId)
if (response.isSuccessful) {
Result.Success(Unit)
} else {
Result.Error(Exception("API Error: ${response.code()}"))
}
} catch (e: Exception) {
Result.Error(e)
}
}
}
3.3 本地数据源实现
package com.example.security.data.source.local
/**
* 本地设备数据源
*
* 负责与本地数据库交互
*/
@Singleton
class LocalDeviceDataSource @Inject constructor(
private val deviceDao: DeviceDao,
private val deviceMapper: DeviceMapper
) {
/**
* 获取所有设备 - 响应式
*/
fun getDevices(): Flow<List<Device>> {
return deviceDao.getAllDevices()
.map { entities ->
entities.map { deviceMapper.mapToDomain(it) }
}
}
/**
* 获取单个设备 - 响应式
*/
fun getDevice(deviceId: String): Flow<Device> {
return deviceDao.getDevice(deviceId)
.map { entity ->
deviceMapper.mapToDomain(entity)
}
}
/**
* 保存单个设备
*/
suspend fun saveDevice(device: Device) {
val entity = deviceMapper.mapToEntity(device)
deviceDao.insertDevice(entity)
}
/**
* 保存多个设备
*/
suspend fun saveDevices(devices: List<Device>) {
val entities = devices.map { deviceMapper.mapToEntity(it) }
deviceDao.insertDevices(entities)
}
/**
* 删除设备
*/
suspend fun deleteDevice(deviceId: String) {
deviceDao.deleteDevice(deviceId)
}
/**
* 获取待同步设备
*/
suspend fun getPendingSyncDevices(): List<Device> {
return deviceDao.getPendingSyncDevices()
.map { deviceMapper.mapToDomain(it) }
}
/**
* 标记设备已同步
*/
suspend fun markSynced(deviceId: String) {
deviceDao.markSynced(deviceId, System.currentTimeMillis())
}
/**
* 清空所有设备
*/
suspend fun clearAll() {
deviceDao.deleteAll()
}
}
3.4 缓存数据源实现
package com.example.security.data.source.cache
/**
* 设备缓存数据源
*
* 使用LruCache实现内存缓存
*/
@Singleton
class DeviceCacheDataSource @Inject constructor() {
companion object {
private const val CACHE_SIZE = 50 // 缓存50个设备
}
// 设备列表缓存
private var devicesCache: List<Device>? = null
// 单个设备缓存
private val deviceCache = LruCache<String, Device>(CACHE_SIZE)
// 设备状态缓存
private val statusCache = LruCache<String, DeviceStatus>(CACHE_SIZE)
// 缓存时间戳
private var cacheTimestamp: Long = 0L
/**
* 获取所有设备缓存
*/
fun getDevices(): List<Device> {
return devicesCache ?: emptyList()
}
/**
* 保存设备列表到缓存
*/
fun saveDevices(devices: List<Device>) {
devicesCache = devices
// 同时更新单个设备缓存
devices.forEach { device ->
deviceCache.put(device.id, device)
}
}
/**
* 获取单个设备缓存
*/
fun getDevice(deviceId: String): Device? {
return deviceCache.get(deviceId)
}
/**
* 保存单个设备到缓存
*/
fun saveDevice(device: Device) {
deviceCache.put(device.id, device)
// 更新列表缓存中的对应项
devicesCache?.let { list ->
val index = list.indexOfFirst { it.id == device.id }
if (index != -1) {
val mutableList = list.toMutableList()
mutableList[index] = device
devicesCache = mutableList
}
}
}
/**
* 删除设备缓存
*/
fun deleteDevice(deviceId: String) {
deviceCache.remove(deviceId)
statusCache.remove(deviceId)
devicesCache?.let { list ->
devicesCache = list.filter { it.id != deviceId }
}
}
/**
* 保存设备状态到缓存
*/
fun saveDeviceStatus(deviceId: String, status: DeviceStatus) {
statusCache.put(deviceId, status)
}
/**
* 获取设备状态缓存
*/
fun getDeviceStatus(deviceId: String): DeviceStatus? {
return statusCache.get(deviceId)
}
/**
* 更新缓存时间戳
*/
fun updateCacheTimestamp() {
cacheTimestamp = System.currentTimeMillis()
}
/**
* 获取缓存时间戳
*/
fun getLastCacheTimestamp(): Long {
return cacheTimestamp
}
/**
* 是否有缓存
*/
fun hasCache(): Boolean {
return devicesCache != null && devicesCache!!.isNotEmpty()
}
/**
* 清空所有缓存
*/
fun clear() {
devicesCache = null
deviceCache.evictAll()
statusCache.evictAll()
cacheTimestamp = 0L
}
}
四、数据同步与离线支持
4.1 数据同步架构
4.2 同步管理器实现
package com.example.security.data.sync
/**
* 数据同步管理器
*
* 负责本地数据与远程数据的双向同步
*/
@Singleton
class DataSyncManager @Inject constructor(
private val localDataSource: LocalDeviceDataSource,
private val remoteDataSource: RemoteDeviceDataSource,
private val networkMonitor: NetworkMonitor,
private val syncStateManager: SyncStateManager,
private val conflictResolver: ConflictResolver,
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) {
private val syncQueue = ConcurrentLinkedQueue<SyncTask>()
private val syncScope = CoroutineScope(ioDispatcher + SupervisorJob())
// 同步状态Flow
private val _syncState = MutableStateFlow<SyncState>(SyncState.Idle)
val syncState: StateFlow<SyncState> = _syncState.asStateFlow()
init {
// 监听网络状态,网络恢复时自动同步
syncScope.launch {
networkMonitor.isOnline
.filter { it } // 只在网络恢复时触发
.collect {
autoSync()
}
}
}
/**
* 手动触发完整同步
*/
suspend fun sync(): Result<Unit> = withContext(ioDispatcher) {
if (_syncState.value is SyncState.Syncing) {
return@withContext Result.Error(Exception("Sync already in progress"))
}
_syncState.value = SyncState.Syncing(progress = 0)
try {
// 1. 上传本地变更
val uploadResult = uploadLocalChanges()
if (uploadResult is Result.Error) {
_syncState.value = SyncState.Error(uploadResult.exception)
return@withContext uploadResult
}
_syncState.value = SyncState.Syncing(progress = 50)
// 2. 下载远程变更
val downloadResult = downloadRemoteChanges()
if (downloadResult is Result.Error) {
_syncState.value = SyncState.Error(downloadResult.exception)
return@withContext downloadResult
}
_syncState.value = SyncState.Success(System.currentTimeMillis())
Result.Success(Unit)
} catch (e: Exception) {
_syncState.value = SyncState.Error(e)
Result.Error(e)
}
}
/**
* 自动同步 - 后台静默同步
*/
private suspend fun autoSync() {
if (!networkMonitor.isOnline.value) return
if (_syncState.value is SyncState.Syncing) return
sync()
}
/**
* 上传本地变更到远程
*/
private suspend fun uploadLocalChanges(): Result<Unit> {
try {
// 获取所有待同步的本地设备
val pendingDevices = localDataSource.getPendingSyncDevices()
if (pendingDevices.isEmpty()) {
return Result.Success(Unit)
}
// 批量上传
val uploadResults = pendingDevices.map { device ->
async(ioDispatcher) {
uploadDevice(device)
}
}.awaitAll()
// 检查是否有失败的上传
val failedUploads = uploadResults.filterIsInstance<Result.Error>()
if (failedUploads.isNotEmpty()) {
return Result.Error(Exception("Some uploads failed"))
}
return Result.Success(Unit)
} catch (e: Exception) {
return Result.Error(e)
}
}
/**
* 上传单个设备
*/
private suspend fun uploadDevice(device: Device): Result<Unit> {
return when (val result = remoteDataSource.updateDevice(device)) {
is Result.Success -> {
// 标记为已同步
localDataSource.markSynced(device.id)
Result.Success(Unit)
}
is Result.Error -> {
// 上传失败,加入重试队列
addToRetryQueue(SyncTask.Upload(device))
result
}
else -> Result.Error(Exception("Unknown error"))
}
}
/**
* 下载远程变更到本地
*/
private suspend fun downloadRemoteChanges(): Result<Unit> {
try {
// 获取远程所有设备
val remoteResult = remoteDataSource.fetchDevices()
if (remoteResult is Result.Error) {
return remoteResult
}
val remoteDevices = (remoteResult as Result.Success).data
// 获取本地所有设备
val localDevices = localDataSource.getDevices().first()
// 合并数据,解决冲突
val mergedDevices = mergeData(localDevices, remoteDevices)
// 保存合并后的数据
localDataSource.saveDevices(mergedDevices)
return Result.Success(Unit)
} catch (e: Exception) {
return Result.Error(e)
}
}
/**
* 合并本地和远程数据
*/
private suspend fun mergeData(
localDevices: List<Device>,
remoteDevices: List<Device>
): List<Device> {
val localMap = localDevices.associateBy { it.id }
val remoteMap = remoteDevices.associateBy { it.id }
val mergedMap = mutableMapOf<String, Device>()
// 处理远程设备
remoteDevices.forEach { remoteDevice ->
val localDevice = localMap[remoteDevice.id]
if (localDevice == null) {
// 远程新增设备,直接使用
mergedMap[remoteDevice.id] = remoteDevice
} else {
// 设备冲突,使用冲突解决器
val resolvedDevice = conflictResolver.resolve(localDevice, remoteDevice)
mergedMap[remoteDevice.id] = resolvedDevice
}
}
// 处理本地独有设备(远程已删除)
localDevices.forEach { localDevice ->
if (!remoteMap.containsKey(localDevice.id)) {
// 本地独有,保留本地数据(后续会上传)
if (!mergedMap.containsKey(localDevice.id)) {
mergedMap[localDevice.id] = localDevice
}
}
}
return mergedMap.values.toList()
}
/**
* 添加到重试队列
*/
private fun addToRetryQueue(task: SyncTask) {
syncQueue.offer(task)
// 启动重试任务
syncScope.launch {
delay(5000) // 5秒后重试
retrySync()
}
}
/**
* 重试同步
*/
private suspend fun retrySync() {
val task = syncQueue.poll() ?: return
when (task) {
is SyncTask.Upload -> {
val result = uploadDevice(task.device)
if (result is Result.Error && task.retryCount < 3) {
// 继续重试
addToRetryQueue(task.copy(retryCount = task.retryCount + 1))
}
}
is SyncTask.Download -> {
// 处理下载重试
}
}
}
}
/**
* 同步任务
*/
sealed class SyncTask(open val retryCount: Int = 0) {
data class Upload(
val device: Device,
override val retryCount: Int = 0
) : SyncTask(retryCount)
data class Download(
val deviceId: String,
override val retryCount: Int = 0
) : SyncTask(retryCount)
}
/**
* 同步状态
*/
sealed class SyncState {
object Idle : SyncState()
data class Syncing(val progress: Int) : SyncState()
data class Success(val timestamp: Long) : SyncState()
data class Error(val exception: Exception) : SyncState()
}
4.3 冲突解决策略
package com.example.security.data.sync
/**
* 数据冲突解决器
*
* 当本地数据和远程数据冲突时,决定使用哪个版本
*/
@Singleton
class ConflictResolver @Inject constructor() {
/**
* 解决设备数据冲突
*
* 策略:
* 1. 比较时间戳,使用最新的数据
* 2. 如果时间戳相同,优先使用远程数据
* 3. 对于某些字段(如用户自定义名称),优先保留本地修改
*/
fun resolve(localDevice: Device, remoteDevice: Device): Device {
// 1. 如果远程数据更新,直接使用远程数据
if (remoteDevice.updatedAt > localDevice.updatedAt) {
return remoteDevice
}
// 2. 如果本地数据更新,保留本地数据
if (localDevice.updatedAt > remoteDevice.updatedAt) {
return localDevice
}
// 3. 时间戳相同,进行字段级别合并
return mergeDevices(localDevice, remoteDevice)
}
/**
* 字段级别合并
*/
private fun mergeDevices(localDevice: Device, remoteDevice: Device): Device {
// 优先保留本地用户自定义字段
return remoteDevice.copy(
name = localDevice.name.takeIf { it != remoteDevice.name } ?: remoteDevice.name,
// 其他字段使用远程数据
)
}
}
五、数据层最佳实践
5.1 错误处理与重试
package com.example.security.data.common
/**
* 网络请求重试扩展
*/
suspend fun <T> retryWithExponentialBackoff(
times: Int = 3,
initialDelay: Long = 1000, // 1秒
maxDelay: Long = 10000, // 10秒
factor: Double = 2.0,
block: suspend () -> T
): T {
var currentDelay = initialDelay
repeat(times - 1) { attempt ->
try {
return block()
} catch (e: Exception) {
// 判断是否应该重试
if (!shouldRetry(e)) {
throw e
}
}
delay(currentDelay)
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
}
return block() // 最后一次尝试
}
/**
* 判断异常是否应该重试
*/
private fun shouldRetry(exception: Exception): Boolean {
return when (exception) {
is java.net.SocketTimeoutException -> true
is java.net.UnknownHostException -> true
is java.io.IOException -> true
else -> false
}
}
5.2 数据映射器
package com.example.security.data.mapper
/**
* 设备数据映射器
*
* 负责不同层之间的数据模型转换
*/
class DeviceMapper @Inject constructor() {
/**
* DTO -> Domain Model
*/
fun mapToDomain(dto: DeviceDto): Device {
return Device(
id = dto.id,
name = dto.name,
type = dto.type,
model = dto.model,
serialNumber = dto.serialNumber,
firmwareVersion = dto.firmwareVersion,
isOnline = dto.isOnline,
batteryLevel = dto.batteryLevel,
createdAt = dto.createdAt,
updatedAt = dto.updatedAt
)
}
/**
* Domain Model -> DTO
*/
fun mapToDto(device: Device): DeviceDto {
return DeviceDto(
id = device.id,
name = device.name,
type = device.type,
model = device.model,
serialNumber = device.serialNumber,
firmwareVersion = device.firmwareVersion,
isOnline = device.isOnline,
batteryLevel = device.batteryLevel,
createdAt = device.createdAt,
updatedAt = device.updatedAt
)
}
/**
* Entity -> Domain Model
*/
fun mapToDomain(entity: DeviceEntity): Device {
return Device(
id = entity.id,
name = entity.name,
type = entity.type,
model = entity.model,
serialNumber = entity.serialNumber,
firmwareVersion = entity.firmwareVersion,
isOnline = entity.isOnline,
batteryLevel = entity.batteryLevel,
createdAt = entity.createdAt,
updatedAt = entity.updatedAt
)
}
/**
* Domain Model -> Entity
*/
fun mapToEntity(device: Device): DeviceEntity {
return DeviceEntity(
id = device.id,
name = device.name,
type = device.type,
model = device.model,
serialNumber = device.serialNumber,
firmwareVersion = device.firmwareVersion,
isOnline = device.isOnline,
batteryLevel = device.batteryLevel,
createdAt = device.createdAt,
updatedAt = device.updatedAt,
syncStatus = SyncStatus.SYNCED,
lastSyncTime = System.currentTimeMillis()
)
}
}
5.3 数据层测试
package com.example.security.data.repository
class DeviceRepositoryImplTest {
private lateinit var repository: DeviceRepositoryImpl
private lateinit var remoteDataSource: RemoteDeviceDataSource
private lateinit var localDataSource: LocalDeviceDataSource
private lateinit var cacheDataSource: DeviceCacheDataSource
@Before
fun setup() {
remoteDataSource = mockk()
localDataSource = mockk()
cacheDataSource = mockk()
repository = DeviceRepositoryImpl(
remoteDataSource = remoteDataSource,
localDataSource = localDataSource,
cacheDataSource = cacheDataSource,
deviceMapper = DeviceMapper()
)
}
@Test
fun `getDevices returns cached data first`() = runTest {
// Given
val cachedDevices = listOf(
Device(id = "1", name = "Device 1"),
Device(id = "2", name = "Device 2")
)
every { cacheDataSource.getDevices() } returns cachedDevices
every { cacheDataSource.getLastCacheTimestamp() } returns System.currentTimeMillis()
// When
val result = repository.getDevices(forceRefresh = false).first()
// Then
assertTrue(result is Result.Success)
assertEquals(cachedDevices, (result as Result.Success).data)
verify { cacheDataSource.getDevices() }
verify(exactly = 0) { localDataSource.getDevices() }
verify(exactly = 0) { remoteDataSource.fetchDevices() }
}
@Test
fun `getDevices fetches from remote when cache expired`() = runTest {
// Given
val remoteDevices = listOf(
Device(id = "1", name = "Device 1"),
Device(id = "2", name = "Device 2")
)
every { cacheDataSource.getDevices() } returns emptyList()
every { cacheDataSource.getLastCacheTimestamp() } returns 0L
every { localDataSource.getDevices() } returns flowOf(emptyList())
coEvery { remoteDataSource.fetchDevices() } returns Result.Success(remoteDevices)
coEvery { localDataSource.saveDevices(any()) } just Runs
every { cacheDataSource.saveDevices(any()) } just Runs
every { cacheDataSource.updateCacheTimestamp() } just Runs
// When
val result = repository.getDevices(forceRefresh = false).first()
// Then
assertTrue(result is Result.Success)
assertEquals(remoteDevices, (result as Result.Success).data)
coVerify { remoteDataSource.fetchDevices() }
coVerify { localDataSource.saveDevices(remoteDevices) }
verify { cacheDataSource.saveDevices(remoteDevices) }
}
}
六、总结
6.1 关键技术点
- Repository模式:单一数据源,封装数据访问逻辑
- 多数据源协调:远程API、本地数据库、内存缓存的协同工作
- 离线优先:优先使用本地数据,后台同步远程数据
- 响应式数据流:使用Flow实现数据变化的自动推送
- 数据同步:双向同步、冲突解决、重试机制
- 分层架构:清晰的数据层、领域层、展示层分离
6.2 架构优势
- 高性能:多级缓存、懒加载、响应式数据流
- 高可用:离线支持、自动同步、错误处理
- 可扩展:模块化设计、依赖注入、接口抽象
- 易测试:清晰的分层、Mock友好的接口设计
- 易维护:单一职责、代码复用、统一的数据访问模式
6.3 最佳实践建议
- 始终使用Repository作为数据访问入口
- 实现离线优先策略,提升用户体验
- 使用Flow实现响应式数据流
- 设计合理的缓存策略,平衡性能和数据新鲜度
- 实现完善的数据同步和冲突解决机制
- 充分的单元测试和集成测试
- 监控数据层性能,及时发现和解决问题
通过本文的架构设计和实现,您可以构建一个高性能、可靠、易维护的移动应用数据层,为上层业务提供坚实的数据基础设施支持。
更多推荐


所有评论(0)