111. 数据层架构设计:打造高性能、可扩展的移动应用数据层

摘要

数据层是移动应用架构的核心基础设施,负责数据的获取、存储、缓存和同步。本文深入探讨大型Android应用的数据层架构设计,涵盖Repository模式、多数据源管理、数据同步策略、离线支持、数据一致性保证等核心议题。通过完整的架构设计和Kotlin实现,展示如何构建一个高性能、可扩展、易测试的数据层架构,支撑复杂业务场景和百万级用户规模。

关键词: Repository模式、数据源管理、数据同步、离线优先、数据一致性、Flow、Kotlin


一、数据层架构概述

1.1 数据层的核心职责

外部依赖

数据层 Data Layer

领域层 Domain Layer

展示层 Presentation Layer

数据管理

数据源层

UI Components

ViewModel

Use Cases

Domain Models

Repository

Remote Source

Local Source

Cache Source

Memory Source

Sync Manager

Cache Manager

State Manager

REST API

Database

Disk Storage

Memory Cache

核心职责

  • 数据获取:从远程API、本地数据库、缓存等多个数据源获取数据
  • 数据存储:持久化数据到本地数据库、文件系统
  • 数据缓存:多级缓存策略提升性能
  • 数据同步:在线/离线数据同步、冲突解决
  • 数据转换:API模型到领域模型的转换
  • 状态管理:数据加载状态、错误处理

1.2 数据层设计原则

/**
 * 数据层设计原则
 *
 * 1. 单一数据源原则 (Single Source of Truth)
 *    - Repository是唯一的数据访问入口
 *    - 避免UI层直接访问数据源
 *
 * 2. 离线优先原则 (Offline First)
 *    - 优先使用本地数据
 *    - 后台同步远程数据
 *
 * 3. 响应式数据流原则
 *    - 使用Flow/LiveData等响应式数据流
 *    - 数据变化自动通知观察者
 *
 * 4. 分层原则
 *    - 清晰的数据源分层
 *    - Repository封装数据源细节
 *
 * 5. 可测试原则
 *    - 依赖注入
 *    - 接口抽象
 */

二、Repository模式深度实践

2.1 Repository接口设计

package com.example.security.data.repository


/**
 * 设备仓储接口
 *
 * 定义设备数据访问的标准契约
 */
interface DeviceRepository {

    /**
     * 获取设备列表 - 响应式数据流
     *
     * @param forceRefresh 是否强制刷新
     * @return 设备列表Flow,自动推送数据变化
     */
    fun getDevices(forceRefresh: Boolean = false): Flow<Result<List<Device>>>

    /**
     * 获取单个设备详情
     *
     * @param deviceId 设备ID
     * @return 设备详情Flow
     */
    fun getDevice(deviceId: String): Flow<Result<Device>>

    /**
     * 获取设备实时状态
     *
     * @param deviceId 设备ID
     * @return 设备状态Flow,实时推送状态变化
     */
    fun getDeviceStatus(deviceId: String): Flow<Result<DeviceStatus>>

    /**
     * 更新设备信息
     *
     * @param device 设备信息
     * @return 更新结果
     */
    suspend fun updateDevice(device: Device): Result<Device>

    /**
     * 删除设备
     *
     * @param deviceId 设备ID
     * @return 删除结果
     */
    suspend fun deleteDevice(deviceId: String): Result<Unit>

    /**
     * 同步设备数据
     *
     * @return 同步结果
     */
    suspend fun syncDevices(): Result<Unit>

    /**
     * 清除缓存
     */
    suspend fun clearCache()
}

2.2 Repository实现 - 多数据源协调

package com.example.security.data.repository


/**
 * 设备仓储实现
 *
 * 协调远程数据源、本地数据源和缓存数据源
 * 实现离线优先、自动同步、多级缓存策略
 */
@Singleton
class DeviceRepositoryImpl @Inject constructor(
    private val remoteDataSource: RemoteDeviceDataSource,
    private val localDataSource: LocalDeviceDataSource,
    private val cacheDataSource: DeviceCacheDataSource,
    private val deviceMapper: DeviceMapper,
    private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) : DeviceRepository {

    companion object {
        private const val CACHE_EXPIRY_MINUTES = 5L
    }

    /**
     * 获取设备列表 - 离线优先策略
     *
     * 数据流:
     * 1. 先返回缓存数据(如果有且未过期)
     * 2. 再返回本地数据库数据
     * 3. 后台刷新远程数据
     * 4. 更新本地数据库和缓存
     * 5. 推送最新数据
     */
    override fun getDevices(forceRefresh: Boolean): Flow<Result<List<Device>>> = flow {
        // 1. 首先尝试从内存缓存获取
        if (!forceRefresh) {
            val cachedDevices = cacheDataSource.getDevices()
            if (cachedDevices.isNotEmpty() && !isCacheExpired()) {
                emit(Result.Success(cachedDevices))
            }
        }

        // 2. 从本地数据库获取
        localDataSource.getDevices()
            .catch { e ->
                // 本地数据获取失败,记录日志但不中断流
                emit(Result.Error(e))
            }
            .collect { localDevices ->
                if (localDevices.isNotEmpty()) {
                    // 发送本地数据
                    emit(Result.Success(localDevices))

                    // 更新内存缓存
                    cacheDataSource.saveDevices(localDevices)
                }
            }

        // 3. 根据策略决定是否刷新远程数据
        if (forceRefresh || shouldRefreshFromRemote()) {
            emit(Result.Loading)

            try {
                // 从远程API获取最新数据
                val remoteResult = remoteDataSource.fetchDevices()

                when (remoteResult) {
                    is Result.Success -> {
                        val devices = remoteResult.data

                        // 保存到本地数据库
                        localDataSource.saveDevices(devices)

                        // 更新内存缓存
                        cacheDataSource.saveDevices(devices)
                        cacheDataSource.updateCacheTimestamp()

                        // 发送最新数据
                        emit(Result.Success(devices))
                    }
                    is Result.Error -> {
                        // 远程获取失败,使用本地数据,发送错误通知
                        emit(Result.Error(remoteResult.exception))
                    }
                    is Result.Loading -> {
                        // 保持Loading状态
                    }
                }
            } catch (e: Exception) {
                // 网络异常,继续使用本地数据
                emit(Result.Error(e))
            }
        }
    }.flowOn(ioDispatcher)

    /**
     * 获取单个设备 - 缓存优先策略
     */
    override fun getDevice(deviceId: String): Flow<Result<Device>> = flow {
        // 1. 先从缓存获取
        val cachedDevice = cacheDataSource.getDevice(deviceId)
        if (cachedDevice != null && !isCacheExpired()) {
            emit(Result.Success(cachedDevice))
        }

        // 2. 从本地数据库获取
        localDataSource.getDevice(deviceId)
            .catch { e -> emit(Result.Error(e)) }
            .collect { device ->
                emit(Result.Success(device))
                cacheDataSource.saveDevice(device)
            }

        // 3. 后台刷新远程数据
        if (shouldRefreshFromRemote()) {
            try {
                val remoteResult = remoteDataSource.fetchDevice(deviceId)
                if (remoteResult is Result.Success) {
                    val device = remoteResult.data
                    localDataSource.saveDevice(device)
                    cacheDataSource.saveDevice(device)
                    emit(Result.Success(device))
                }
            } catch (e: Exception) {
                // 忽略远程刷新错误
            }
        }
    }.flowOn(ioDispatcher)

    /**
     * 获取设备实时状态 - WebSocket + 轮询策略
     */
    override fun getDeviceStatus(deviceId: String): Flow<Result<DeviceStatus>> =
        remoteDataSource.observeDeviceStatus(deviceId)
            .catch { e ->
                // WebSocket失败,降级到轮询
                emitAll(pollDeviceStatus(deviceId))
            }
            .onEach { result ->
                // 缓存最新状态
                if (result is Result.Success) {
                    cacheDataSource.saveDeviceStatus(deviceId, result.data)
                }
            }
            .flowOn(ioDispatcher)

    /**
     * 更新设备信息 - 乐观更新策略
     */
    override suspend fun updateDevice(device: Device): Result<Device> = withContext(ioDispatcher) {
        try {
            // 1. 乐观更新本地数据库
            localDataSource.saveDevice(device)
            cacheDataSource.saveDevice(device)

            // 2. 同步到远程服务器
            val remoteResult = remoteDataSource.updateDevice(device)

            when (remoteResult) {
                is Result.Success -> {
                    // 远程更新成功,使用服务器返回的数据
                    val updatedDevice = remoteResult.data
                    localDataSource.saveDevice(updatedDevice)
                    cacheDataSource.saveDevice(updatedDevice)
                    Result.Success(updatedDevice)
                }
                is Result.Error -> {
                    // 远程更新失败,回滚本地数据
                    // 这里可以实现冲突解决策略
                    Result.Error(remoteResult.exception)
                }
                else -> Result.Error(Exception("Unknown error"))
            }
        } catch (e: Exception) {
            Result.Error(e)
        }
    }

    /**
     * 删除设备 - 远程优先策略
     */
    override suspend fun deleteDevice(deviceId: String): Result<Unit> = withContext(ioDispatcher) {
        try {
            // 1. 先删除远程数据
            val remoteResult = remoteDataSource.deleteDevice(deviceId)

            if (remoteResult is Result.Success) {
                // 2. 删除本地数据
                localDataSource.deleteDevice(deviceId)
                cacheDataSource.deleteDevice(deviceId)
                Result.Success(Unit)
            } else {
                remoteResult as Result.Error
                Result.Error(remoteResult.exception)
            }
        } catch (e: Exception) {
            Result.Error(e)
        }
    }

    /**
     * 同步设备数据
     */
    override suspend fun syncDevices(): Result<Unit> = withContext(ioDispatcher) {
        try {
            // 1. 获取本地待同步数据
            val localDevices = localDataSource.getPendingSyncDevices()

            // 2. 同步到远程
            localDevices.forEach { device ->
                remoteDataSource.updateDevice(device)
                localDataSource.markSynced(device.id)
            }

            // 3. 从远程拉取最新数据
            val remoteResult = remoteDataSource.fetchDevices()
            if (remoteResult is Result.Success) {
                localDataSource.saveDevices(remoteResult.data)
                cacheDataSource.saveDevices(remoteResult.data)
            }

            Result.Success(Unit)
        } catch (e: Exception) {
            Result.Error(e)
        }
    }

    override suspend fun clearCache() {
        cacheDataSource.clear()
    }

    // 私有辅助方法

    private fun isCacheExpired(): Boolean {
        val lastCacheTime = cacheDataSource.getLastCacheTimestamp()
        val currentTime = System.currentTimeMillis()
        val expiryTime = TimeUnit.MINUTES.toMillis(CACHE_EXPIRY_MINUTES)
        return currentTime - lastCacheTime > expiryTime
    }

    private fun shouldRefreshFromRemote(): Boolean {
        return isCacheExpired() || !cacheDataSource.hasCache()
    }

    private fun pollDeviceStatus(deviceId: String): Flow<Result<DeviceStatus>> = flow {
        while (true) {
            val result = remoteDataSource.fetchDeviceStatus(deviceId)
            emit(result)
            kotlinx.coroutines.delay(5000) // 5秒轮询一次
        }
    }
}

三、多数据源架构设计

3.1 数据源层次结构

Cache Sources

Local Sources

Remote Sources

Data Source Layer

Repository Layer

Repository

Remote Data Source

Local Data Source

Cache Data Source

REST API

WebSocket

gRPC

Room Database

SharedPreferences

File Storage

Memory Cache

Disk Cache

3.2 远程数据源实现

package com.example.security.data.source.remote


/**
 * 远程设备数据源
 *
 * 负责与远程API交互
 */
@Singleton
class RemoteDeviceDataSource @Inject constructor(
    private val apiService: DeviceApiService,
    private val deviceMapper: DeviceMapper,
    private val webSocketClient: DeviceWebSocketClient
) {

    /**
     * 从远程API获取设备列表
     */
    suspend fun fetchDevices(): Result<List<Device>> {
        return try {
            val response = apiService.getDevices()
            if (response.isSuccessful) {
                val deviceDtos = response.body() ?: emptyList()
                val devices = deviceDtos.map { deviceMapper.mapToDomain(it) }
                Result.Success(devices)
            } else {
                Result.Error(Exception("API Error: ${response.code()}"))
            }
        } catch (e: Exception) {
            Result.Error(e)
        }
    }

    /**
     * 从远程API获取单个设备
     */
    suspend fun fetchDevice(deviceId: String): Result<Device> {
        return try {
            val response = apiService.getDevice(deviceId)
            if (response.isSuccessful) {
                val deviceDto = response.body()
                if (deviceDto != null) {
                    Result.Success(deviceMapper.mapToDomain(deviceDto))
                } else {
                    Result.Error(Exception("Device not found"))
                }
            } else {
                Result.Error(Exception("API Error: ${response.code()}"))
            }
        } catch (e: Exception) {
            Result.Error(e)
        }
    }

    /**
     * 通过WebSocket观察设备状态
     */
    fun observeDeviceStatus(deviceId: String): Flow<Result<DeviceStatus>> = flow {
        webSocketClient.connect(deviceId)
            .collect { statusDto ->
                val status = deviceMapper.mapStatusToDomain(statusDto)
                emit(Result.Success(status))
            }
    }

    /**
     * 轮询获取设备状态
     */
    suspend fun fetchDeviceStatus(deviceId: String): Result<DeviceStatus> {
        return try {
            val response = apiService.getDeviceStatus(deviceId)
            if (response.isSuccessful) {
                val statusDto = response.body()
                if (statusDto != null) {
                    Result.Success(deviceMapper.mapStatusToDomain(statusDto))
                } else {
                    Result.Error(Exception("Status not found"))
                }
            } else {
                Result.Error(Exception("API Error: ${response.code()}"))
            }
        } catch (e: Exception) {
            Result.Error(e)
        }
    }

    /**
     * 更新设备信息到远程服务器
     */
    suspend fun updateDevice(device: Device): Result<Device> {
        return try {
            val deviceDto = deviceMapper.mapToDto(device)
            val response = apiService.updateDevice(device.id, deviceDto)
            if (response.isSuccessful) {
                val updatedDto = response.body()
                if (updatedDto != null) {
                    Result.Success(deviceMapper.mapToDomain(updatedDto))
                } else {
                    Result.Error(Exception("Update failed"))
                }
            } else {
                Result.Error(Exception("API Error: ${response.code()}"))
            }
        } catch (e: Exception) {
            Result.Error(e)
        }
    }

    /**
     * 删除远程设备
     */
    suspend fun deleteDevice(deviceId: String): Result<Unit> {
        return try {
            val response = apiService.deleteDevice(deviceId)
            if (response.isSuccessful) {
                Result.Success(Unit)
            } else {
                Result.Error(Exception("API Error: ${response.code()}"))
            }
        } catch (e: Exception) {
            Result.Error(e)
        }
    }
}

3.3 本地数据源实现

package com.example.security.data.source.local


/**
 * 本地设备数据源
 *
 * 负责与本地数据库交互
 */
@Singleton
class LocalDeviceDataSource @Inject constructor(
    private val deviceDao: DeviceDao,
    private val deviceMapper: DeviceMapper
) {

    /**
     * 获取所有设备 - 响应式
     */
    fun getDevices(): Flow<List<Device>> {
        return deviceDao.getAllDevices()
            .map { entities ->
                entities.map { deviceMapper.mapToDomain(it) }
            }
    }

    /**
     * 获取单个设备 - 响应式
     */
    fun getDevice(deviceId: String): Flow<Device> {
        return deviceDao.getDevice(deviceId)
            .map { entity ->
                deviceMapper.mapToDomain(entity)
            }
    }

    /**
     * 保存单个设备
     */
    suspend fun saveDevice(device: Device) {
        val entity = deviceMapper.mapToEntity(device)
        deviceDao.insertDevice(entity)
    }

    /**
     * 保存多个设备
     */
    suspend fun saveDevices(devices: List<Device>) {
        val entities = devices.map { deviceMapper.mapToEntity(it) }
        deviceDao.insertDevices(entities)
    }

    /**
     * 删除设备
     */
    suspend fun deleteDevice(deviceId: String) {
        deviceDao.deleteDevice(deviceId)
    }

    /**
     * 获取待同步设备
     */
    suspend fun getPendingSyncDevices(): List<Device> {
        return deviceDao.getPendingSyncDevices()
            .map { deviceMapper.mapToDomain(it) }
    }

    /**
     * 标记设备已同步
     */
    suspend fun markSynced(deviceId: String) {
        deviceDao.markSynced(deviceId, System.currentTimeMillis())
    }

    /**
     * 清空所有设备
     */
    suspend fun clearAll() {
        deviceDao.deleteAll()
    }
}

3.4 缓存数据源实现

package com.example.security.data.source.cache


/**
 * 设备缓存数据源
 *
 * 使用LruCache实现内存缓存
 */
@Singleton
class DeviceCacheDataSource @Inject constructor() {

    companion object {
        private const val CACHE_SIZE = 50 // 缓存50个设备
    }

    // 设备列表缓存
    private var devicesCache: List<Device>? = null

    // 单个设备缓存
    private val deviceCache = LruCache<String, Device>(CACHE_SIZE)

    // 设备状态缓存
    private val statusCache = LruCache<String, DeviceStatus>(CACHE_SIZE)

    // 缓存时间戳
    private var cacheTimestamp: Long = 0L

    /**
     * 获取所有设备缓存
     */
    fun getDevices(): List<Device> {
        return devicesCache ?: emptyList()
    }

    /**
     * 保存设备列表到缓存
     */
    fun saveDevices(devices: List<Device>) {
        devicesCache = devices
        // 同时更新单个设备缓存
        devices.forEach { device ->
            deviceCache.put(device.id, device)
        }
    }

    /**
     * 获取单个设备缓存
     */
    fun getDevice(deviceId: String): Device? {
        return deviceCache.get(deviceId)
    }

    /**
     * 保存单个设备到缓存
     */
    fun saveDevice(device: Device) {
        deviceCache.put(device.id, device)

        // 更新列表缓存中的对应项
        devicesCache?.let { list ->
            val index = list.indexOfFirst { it.id == device.id }
            if (index != -1) {
                val mutableList = list.toMutableList()
                mutableList[index] = device
                devicesCache = mutableList
            }
        }
    }

    /**
     * 删除设备缓存
     */
    fun deleteDevice(deviceId: String) {
        deviceCache.remove(deviceId)
        statusCache.remove(deviceId)

        devicesCache?.let { list ->
            devicesCache = list.filter { it.id != deviceId }
        }
    }

    /**
     * 保存设备状态到缓存
     */
    fun saveDeviceStatus(deviceId: String, status: DeviceStatus) {
        statusCache.put(deviceId, status)
    }

    /**
     * 获取设备状态缓存
     */
    fun getDeviceStatus(deviceId: String): DeviceStatus? {
        return statusCache.get(deviceId)
    }

    /**
     * 更新缓存时间戳
     */
    fun updateCacheTimestamp() {
        cacheTimestamp = System.currentTimeMillis()
    }

    /**
     * 获取缓存时间戳
     */
    fun getLastCacheTimestamp(): Long {
        return cacheTimestamp
    }

    /**
     * 是否有缓存
     */
    fun hasCache(): Boolean {
        return devicesCache != null && devicesCache!!.isNotEmpty()
    }

    /**
     * 清空所有缓存
     */
    fun clear() {
        devicesCache = null
        deviceCache.evictAll()
        statusCache.evictAll()
        cacheTimestamp = 0L
    }
}

四、数据同步与离线支持

4.1 数据同步架构

数据源

同步管理器

同步触发器

同步队列

同步策略

同步

同步

手动同步

自动同步

网络恢复

定期同步

Sync Manager

冲突解决

数据合并

优先级管理

Sync Queue

重试机制

Local DB

Remote API

4.2 同步管理器实现

package com.example.security.data.sync


/**
 * 数据同步管理器
 *
 * 负责本地数据与远程数据的双向同步
 */
@Singleton
class DataSyncManager @Inject constructor(
    private val localDataSource: LocalDeviceDataSource,
    private val remoteDataSource: RemoteDeviceDataSource,
    private val networkMonitor: NetworkMonitor,
    private val syncStateManager: SyncStateManager,
    private val conflictResolver: ConflictResolver,
    private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) {

    private val syncQueue = ConcurrentLinkedQueue<SyncTask>()
    private val syncScope = CoroutineScope(ioDispatcher + SupervisorJob())

    // 同步状态Flow
    private val _syncState = MutableStateFlow<SyncState>(SyncState.Idle)
    val syncState: StateFlow<SyncState> = _syncState.asStateFlow()

    init {
        // 监听网络状态,网络恢复时自动同步
        syncScope.launch {
            networkMonitor.isOnline
                .filter { it } // 只在网络恢复时触发
                .collect {
                    autoSync()
                }
        }
    }

    /**
     * 手动触发完整同步
     */
    suspend fun sync(): Result<Unit> = withContext(ioDispatcher) {
        if (_syncState.value is SyncState.Syncing) {
            return@withContext Result.Error(Exception("Sync already in progress"))
        }

        _syncState.value = SyncState.Syncing(progress = 0)

        try {
            // 1. 上传本地变更
            val uploadResult = uploadLocalChanges()
            if (uploadResult is Result.Error) {
                _syncState.value = SyncState.Error(uploadResult.exception)
                return@withContext uploadResult
            }

            _syncState.value = SyncState.Syncing(progress = 50)

            // 2. 下载远程变更
            val downloadResult = downloadRemoteChanges()
            if (downloadResult is Result.Error) {
                _syncState.value = SyncState.Error(downloadResult.exception)
                return@withContext downloadResult
            }

            _syncState.value = SyncState.Success(System.currentTimeMillis())
            Result.Success(Unit)

        } catch (e: Exception) {
            _syncState.value = SyncState.Error(e)
            Result.Error(e)
        }
    }

    /**
     * 自动同步 - 后台静默同步
     */
    private suspend fun autoSync() {
        if (!networkMonitor.isOnline.value) return
        if (_syncState.value is SyncState.Syncing) return

        sync()
    }

    /**
     * 上传本地变更到远程
     */
    private suspend fun uploadLocalChanges(): Result<Unit> {
        try {
            // 获取所有待同步的本地设备
            val pendingDevices = localDataSource.getPendingSyncDevices()

            if (pendingDevices.isEmpty()) {
                return Result.Success(Unit)
            }

            // 批量上传
            val uploadResults = pendingDevices.map { device ->
                async(ioDispatcher) {
                    uploadDevice(device)
                }
            }.awaitAll()

            // 检查是否有失败的上传
            val failedUploads = uploadResults.filterIsInstance<Result.Error>()
            if (failedUploads.isNotEmpty()) {
                return Result.Error(Exception("Some uploads failed"))
            }

            return Result.Success(Unit)

        } catch (e: Exception) {
            return Result.Error(e)
        }
    }

    /**
     * 上传单个设备
     */
    private suspend fun uploadDevice(device: Device): Result<Unit> {
        return when (val result = remoteDataSource.updateDevice(device)) {
            is Result.Success -> {
                // 标记为已同步
                localDataSource.markSynced(device.id)
                Result.Success(Unit)
            }
            is Result.Error -> {
                // 上传失败,加入重试队列
                addToRetryQueue(SyncTask.Upload(device))
                result
            }
            else -> Result.Error(Exception("Unknown error"))
        }
    }

    /**
     * 下载远程变更到本地
     */
    private suspend fun downloadRemoteChanges(): Result<Unit> {
        try {
            // 获取远程所有设备
            val remoteResult = remoteDataSource.fetchDevices()

            if (remoteResult is Result.Error) {
                return remoteResult
            }

            val remoteDevices = (remoteResult as Result.Success).data

            // 获取本地所有设备
            val localDevices = localDataSource.getDevices().first()

            // 合并数据,解决冲突
            val mergedDevices = mergeData(localDevices, remoteDevices)

            // 保存合并后的数据
            localDataSource.saveDevices(mergedDevices)

            return Result.Success(Unit)

        } catch (e: Exception) {
            return Result.Error(e)
        }
    }

    /**
     * 合并本地和远程数据
     */
    private suspend fun mergeData(
        localDevices: List<Device>,
        remoteDevices: List<Device>
    ): List<Device> {
        val localMap = localDevices.associateBy { it.id }
        val remoteMap = remoteDevices.associateBy { it.id }

        val mergedMap = mutableMapOf<String, Device>()

        // 处理远程设备
        remoteDevices.forEach { remoteDevice ->
            val localDevice = localMap[remoteDevice.id]

            if (localDevice == null) {
                // 远程新增设备,直接使用
                mergedMap[remoteDevice.id] = remoteDevice
            } else {
                // 设备冲突,使用冲突解决器
                val resolvedDevice = conflictResolver.resolve(localDevice, remoteDevice)
                mergedMap[remoteDevice.id] = resolvedDevice
            }
        }

        // 处理本地独有设备(远程已删除)
        localDevices.forEach { localDevice ->
            if (!remoteMap.containsKey(localDevice.id)) {
                // 本地独有,保留本地数据(后续会上传)
                if (!mergedMap.containsKey(localDevice.id)) {
                    mergedMap[localDevice.id] = localDevice
                }
            }
        }

        return mergedMap.values.toList()
    }

    /**
     * 添加到重试队列
     */
    private fun addToRetryQueue(task: SyncTask) {
        syncQueue.offer(task)

        // 启动重试任务
        syncScope.launch {
            delay(5000) // 5秒后重试
            retrySync()
        }
    }

    /**
     * 重试同步
     */
    private suspend fun retrySync() {
        val task = syncQueue.poll() ?: return

        when (task) {
            is SyncTask.Upload -> {
                val result = uploadDevice(task.device)
                if (result is Result.Error && task.retryCount < 3) {
                    // 继续重试
                    addToRetryQueue(task.copy(retryCount = task.retryCount + 1))
                }
            }
            is SyncTask.Download -> {
                // 处理下载重试
            }
        }
    }
}

/**
 * 同步任务
 */
sealed class SyncTask(open val retryCount: Int = 0) {
    data class Upload(
        val device: Device,
        override val retryCount: Int = 0
    ) : SyncTask(retryCount)

    data class Download(
        val deviceId: String,
        override val retryCount: Int = 0
    ) : SyncTask(retryCount)
}

/**
 * 同步状态
 */
sealed class SyncState {
    object Idle : SyncState()
    data class Syncing(val progress: Int) : SyncState()
    data class Success(val timestamp: Long) : SyncState()
    data class Error(val exception: Exception) : SyncState()
}

4.3 冲突解决策略

package com.example.security.data.sync


/**
 * 数据冲突解决器
 *
 * 当本地数据和远程数据冲突时,决定使用哪个版本
 */
@Singleton
class ConflictResolver @Inject constructor() {

    /**
     * 解决设备数据冲突
     *
     * 策略:
     * 1. 比较时间戳,使用最新的数据
     * 2. 如果时间戳相同,优先使用远程数据
     * 3. 对于某些字段(如用户自定义名称),优先保留本地修改
     */
    fun resolve(localDevice: Device, remoteDevice: Device): Device {
        // 1. 如果远程数据更新,直接使用远程数据
        if (remoteDevice.updatedAt > localDevice.updatedAt) {
            return remoteDevice
        }

        // 2. 如果本地数据更新,保留本地数据
        if (localDevice.updatedAt > remoteDevice.updatedAt) {
            return localDevice
        }

        // 3. 时间戳相同,进行字段级别合并
        return mergeDevices(localDevice, remoteDevice)
    }

    /**
     * 字段级别合并
     */
    private fun mergeDevices(localDevice: Device, remoteDevice: Device): Device {
        // 优先保留本地用户自定义字段
        return remoteDevice.copy(
            name = localDevice.name.takeIf { it != remoteDevice.name } ?: remoteDevice.name,
            // 其他字段使用远程数据
        )
    }
}

五、数据层最佳实践

5.1 错误处理与重试

package com.example.security.data.common


/**
 * 网络请求重试扩展
 */
suspend fun <T> retryWithExponentialBackoff(
    times: Int = 3,
    initialDelay: Long = 1000, // 1秒
    maxDelay: Long = 10000,    // 10秒
    factor: Double = 2.0,
    block: suspend () -> T
): T {
    var currentDelay = initialDelay
    repeat(times - 1) { attempt ->
        try {
            return block()
        } catch (e: Exception) {
            // 判断是否应该重试
            if (!shouldRetry(e)) {
                throw e
            }
        }
        delay(currentDelay)
        currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
    }
    return block() // 最后一次尝试
}

/**
 * 判断异常是否应该重试
 */
private fun shouldRetry(exception: Exception): Boolean {
    return when (exception) {
        is java.net.SocketTimeoutException -> true
        is java.net.UnknownHostException -> true
        is java.io.IOException -> true
        else -> false
    }
}

5.2 数据映射器

package com.example.security.data.mapper


/**
 * 设备数据映射器
 *
 * 负责不同层之间的数据模型转换
 */
class DeviceMapper @Inject constructor() {

    /**
     * DTO -> Domain Model
     */
    fun mapToDomain(dto: DeviceDto): Device {
        return Device(
            id = dto.id,
            name = dto.name,
            type = dto.type,
            model = dto.model,
            serialNumber = dto.serialNumber,
            firmwareVersion = dto.firmwareVersion,
            isOnline = dto.isOnline,
            batteryLevel = dto.batteryLevel,
            createdAt = dto.createdAt,
            updatedAt = dto.updatedAt
        )
    }

    /**
     * Domain Model -> DTO
     */
    fun mapToDto(device: Device): DeviceDto {
        return DeviceDto(
            id = device.id,
            name = device.name,
            type = device.type,
            model = device.model,
            serialNumber = device.serialNumber,
            firmwareVersion = device.firmwareVersion,
            isOnline = device.isOnline,
            batteryLevel = device.batteryLevel,
            createdAt = device.createdAt,
            updatedAt = device.updatedAt
        )
    }

    /**
     * Entity -> Domain Model
     */
    fun mapToDomain(entity: DeviceEntity): Device {
        return Device(
            id = entity.id,
            name = entity.name,
            type = entity.type,
            model = entity.model,
            serialNumber = entity.serialNumber,
            firmwareVersion = entity.firmwareVersion,
            isOnline = entity.isOnline,
            batteryLevel = entity.batteryLevel,
            createdAt = entity.createdAt,
            updatedAt = entity.updatedAt
        )
    }

    /**
     * Domain Model -> Entity
     */
    fun mapToEntity(device: Device): DeviceEntity {
        return DeviceEntity(
            id = device.id,
            name = device.name,
            type = device.type,
            model = device.model,
            serialNumber = device.serialNumber,
            firmwareVersion = device.firmwareVersion,
            isOnline = device.isOnline,
            batteryLevel = device.batteryLevel,
            createdAt = device.createdAt,
            updatedAt = device.updatedAt,
            syncStatus = SyncStatus.SYNCED,
            lastSyncTime = System.currentTimeMillis()
        )
    }
}

5.3 数据层测试

package com.example.security.data.repository


class DeviceRepositoryImplTest {

    private lateinit var repository: DeviceRepositoryImpl
    private lateinit var remoteDataSource: RemoteDeviceDataSource
    private lateinit var localDataSource: LocalDeviceDataSource
    private lateinit var cacheDataSource: DeviceCacheDataSource

    @Before
    fun setup() {
        remoteDataSource = mockk()
        localDataSource = mockk()
        cacheDataSource = mockk()

        repository = DeviceRepositoryImpl(
            remoteDataSource = remoteDataSource,
            localDataSource = localDataSource,
            cacheDataSource = cacheDataSource,
            deviceMapper = DeviceMapper()
        )
    }

    @Test
    fun `getDevices returns cached data first`() = runTest {
        // Given
        val cachedDevices = listOf(
            Device(id = "1", name = "Device 1"),
            Device(id = "2", name = "Device 2")
        )

        every { cacheDataSource.getDevices() } returns cachedDevices
        every { cacheDataSource.getLastCacheTimestamp() } returns System.currentTimeMillis()

        // When
        val result = repository.getDevices(forceRefresh = false).first()

        // Then
        assertTrue(result is Result.Success)
        assertEquals(cachedDevices, (result as Result.Success).data)

        verify { cacheDataSource.getDevices() }
        verify(exactly = 0) { localDataSource.getDevices() }
        verify(exactly = 0) { remoteDataSource.fetchDevices() }
    }

    @Test
    fun `getDevices fetches from remote when cache expired`() = runTest {
        // Given
        val remoteDevices = listOf(
            Device(id = "1", name = "Device 1"),
            Device(id = "2", name = "Device 2")
        )

        every { cacheDataSource.getDevices() } returns emptyList()
        every { cacheDataSource.getLastCacheTimestamp() } returns 0L
        every { localDataSource.getDevices() } returns flowOf(emptyList())
        coEvery { remoteDataSource.fetchDevices() } returns Result.Success(remoteDevices)
        coEvery { localDataSource.saveDevices(any()) } just Runs
        every { cacheDataSource.saveDevices(any()) } just Runs
        every { cacheDataSource.updateCacheTimestamp() } just Runs

        // When
        val result = repository.getDevices(forceRefresh = false).first()

        // Then
        assertTrue(result is Result.Success)
        assertEquals(remoteDevices, (result as Result.Success).data)

        coVerify { remoteDataSource.fetchDevices() }
        coVerify { localDataSource.saveDevices(remoteDevices) }
        verify { cacheDataSource.saveDevices(remoteDevices) }
    }
}

六、总结

6.1 关键技术点

  1. Repository模式:单一数据源,封装数据访问逻辑
  2. 多数据源协调:远程API、本地数据库、内存缓存的协同工作
  3. 离线优先:优先使用本地数据,后台同步远程数据
  4. 响应式数据流:使用Flow实现数据变化的自动推送
  5. 数据同步:双向同步、冲突解决、重试机制
  6. 分层架构:清晰的数据层、领域层、展示层分离

6.2 架构优势

  • 高性能:多级缓存、懒加载、响应式数据流
  • 高可用:离线支持、自动同步、错误处理
  • 可扩展:模块化设计、依赖注入、接口抽象
  • 易测试:清晰的分层、Mock友好的接口设计
  • 易维护:单一职责、代码复用、统一的数据访问模式

6.3 最佳实践建议

  1. 始终使用Repository作为数据访问入口
  2. 实现离线优先策略,提升用户体验
  3. 使用Flow实现响应式数据流
  4. 设计合理的缓存策略,平衡性能和数据新鲜度
  5. 实现完善的数据同步和冲突解决机制
  6. 充分的单元测试和集成测试
  7. 监控数据层性能,及时发现和解决问题

通过本文的架构设计和实现,您可以构建一个高性能、可靠、易维护的移动应用数据层,为上层业务提供坚实的数据基础设施支持。

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐