原文链接

欢迎大家对于本站的访问 - AsterCasc

前言

对于跨平台socket的实现方式有很多,先不谈跨平台处理,一般我们要在Web端实现实时消息通信的方式是使用SockJs,服务端使用使用Spring的情况下自带了对于websocket的支持,可以通过集成相关库来完成消息通信

在我们使用SockJs的时候,Web端使用sockjs-client库,使用SockJS尝试建立WebSocket连接时,Spring会接收该连接请求,并尝试建立WebSocket通道。如果WebSocket不可用,Web端会回退到其他协议(HTTP StreamingHTTP Long Polling),Spring可以根据不同的协议做出不同的处理。众所周知,websocket是传输层协议,而在之上的具体实现的应用层协议一般为STOMP,消息以帧的形式传递,类似于HTTP,且支持发布/订阅模式

这方面的内容可以参考本站使用Spring构建实时聊天通知的页面应用以及使用Spring构建实时聊天通知的页面应用(续篇)相关内容

实现选择

如果使用SockJs,那么比较通用的跨平台方案还是使用krossbow完成会比较方便,但是这个库其实还是不是很成熟,使用下来各种小问题还是不少的。如果你的服务端不是已经确定使用SpringSockJs标准处理方案,或者你有时间再写一套连接方案以兼容不同的客户端,其实有很多对于非Web客户端更好写的实现手段,比如:WebSocket+消息订阅、Socket.IOMQTT甚至如果你没有服务端可以直接使Firebase

实现

关于服务端的实现,前言中的跳转文档中已经给出,这里不再赘述,这里只谈客户端

我们这里使用Koin进行依赖注入,对这里不太了解的小伙伴可以参考前文Kotlin Compose Multiplatform下导航解决方案中的Koin部分,当然也可以自己使用静态对象做单例做相似处理

引包

kotlin {
	sourceSets {
		commonMain.dependencies {
			implementation("org.hildan.krossbow:krossbow-stomp-core:4.5.0")  
			implementation("org.hildan.krossbow:krossbow-websocket-sockjs:4.5.0")  
			implementation("org.glassfish.tyrus.bundles:tyrus-standalone-client:1.19")  
			implementation("com.fasterxml.jackson.core:jackson-databind:2.13.5")
		}
	}
}

之前基本没有聊过包的版本问题,因为对于公共包而言,虽然我们说首位版本号更新是可以接受无法兼容的,但是对于这种常用的大体量的公共包而言,基本都会保留/兼容之前的过时接口和类,一般无脑上最新或者次新稳定版本就行了。但是几个包是有兼容性问题的,而且版本不对很容易出现Class Not FoundMethod Not Found这种问题,所以谨慎的小伙伴可以选择这里给出的已经配好的版本

连接

连接socket的情况一般有三种:

  • 主动登录连接:用户输入账号密码,手机号验证码等主动进行的登录事件进行连接
  • 被动登录连接:用户退出应用程序,再次打开应用程序,使用之前保留的Token进行登录
  • 断网重连:用户网络断开后,重新处理登录状态
被动登录连接

创建一个MainScreenModel存储应用主视图对象模型,在其中建立firstTryLinkSocket变量,并将该模型对象使用Koin进行注入:


class MainScreenModel : ScreenModel {
	//...
	
	private val _firstTryLinkSocket = MutableStateFlow(true)  
	val firstTryLinkSocket = _firstTryLinkSocket.asStateFlow()  
	fun triedLinkSocket() {  
	    _firstTryLinkSocket.value = false  
	}
	
	//...
}

以及socket相关的变量初始化:

class MainScreenModel : ScreenModel {
	//...
	
	private val _collectorJob = MutableStateFlow<Job?>(null)  
	private val _socketClient = MutableStateFlow(StompClient(SockJSClient()) {  
	    instrumentation = object : KrossbowInstrumentation {  
	        override suspend fun onWebSocketClosed(cause: Throwable?) {  
	            globalDataModel.resetSocketConnected(false)  
	        }  
	    }  
	})  
	private val _socketSession = MutableStateFlow<StompSession?>(null)  
	val socketSession = _socketSession.asStateFlow()
	
	//...
}

在进入应用程序封面页面,尝试进行连接,从存储文件中读入用户信息解析token,如果能读取解析到,则尝试进行连接。关于跨平台的文件存储可以参考Kotlin Compose Multiplatform下数据持久化解决方案


//...

dataStorageManager: DataStorageManager = koinInject()
mainModel: MainScreenModel = koinInject()
val commonApiCoroutine = rememberCoroutineScope()

//...

val firstTryLinkSocket = mainModel.firstTryLinkSocket.value  
val userDataStringDb = dataStorageManager.getNonFlowString(DataStorageManager.USER_DATA)

//...

if (userDataStringDb.isNotBlank() && firstTryLinkSocket) {  
    mainModel.triedLinkSocket()  
    val userDataDb: UserDataModel = baseJsonConf.decodeFromString(userDataStringDb)  
    if (!userDataDb.token.isNullOrBlank()) {  
        commonApiCoroutine.launch {  
            mainModel.login(  
                dbData = userDataDb,  
                forceLogin = true  
            )  
        }  
    }  
}

登录连接,需要先检查token是否过期,过期直接消除登录状态:

suspend fun login(  
    account: String = "", passwd: String = "",  
    dbData: UserDataModel? = null,  
    forceLogin: Boolean = false,  
) {  

	//....

	val isLogin = BaseApi().isLogin(_userState.value.token)  
	if (!isLogin) {  
	    if (globalDataModel.netStatus.value) {  
	        globalDataModel.clearLocalUserState()  
	    } else {  
	        globalDataModel.resetSocketConnected(false)  
	    }  
	    if (!forceLogin) return  
	} else {  
	    chatScreenModel.updateChatData(_userState.value.token)  
	}

	//...
	
    CoroutineScope(Dispatchers.IO).launch(socketExceptionHandlerWithReconnect) {  
        try {  
            _collectorJob.value?.cancel()  
            _socketSession.value?.disconnect()  
            _collectorJob.value = null  
            _socketSession.value = null  
        } catch (ignore: Exception) {  
        }  
  
        _socketSession.value = _socketClient.value.connect(  
            "your_url"
        )  
        val subscription: Flow<String> = _socketSession.value!!.subscribeText(  
            "your_subscribe"
        )  
        globalDataModel.resetSocketConnected(true)  
        _collectorJob.value = _commonCoroutine.launch(socketExceptionHandlerWithReconnect) {  
            subscription.collect { msg ->  
                val chatRow: ChatRowModel = baseJsonConf.decodeFromString(msg)  
                chatScreenModel.pushChatMessage(_userState.value.token, chatRow)  
            }  
        }  
    }
}

此时就成功连接并且订阅完成,当收到消息时候就可以通过baseJsonConf.decodeFromString()就行反序列化获取对象,并且进行相应的处理。对于序列化的相关内容可以参考Kotlin Compose Multiplatform下实现HTTP请求。注意这里需要使用CoroutineScope(Dispatchers.IO)来创建一个在I/O线程池中执行的协程作用域进行socket连接,否则对于移动端会报该项工作不能在主线程当中执行

主动发送消息到服务端:

    CoroutineScope(Dispatchers.IO).launch(mainModel.socketExceptionHandler) {  
        socketSession?.sendText(  
            "your_url", "your_message"
        )  
        chatScreenModel.updateInputContent(chatId, "")  
}

这里一般的处理方案是将各种信息封装到body当中,在服务端进行解析来发送给指定订阅者

主动登录连接

后面对于socket本身的处理都一样,前面调用login函数时,传入账号密码相关参数进行处理:

suspend fun login(  
    account: String = "", passwd: String = "",  
    dbData: UserDataModel? = null,  
    forceLogin: Boolean = false,  
) {  
	//...
	_userState.value.userData = BaseApi().login(account, passwd)  
    val thisToken = _userState.value.userData.token  
    if (!thisToken.isNullOrBlank()) {  
        _syncUserData.value = true  
        chatScreenModel.updateChatData(thisToken)  
    }  
    _userState.value.token = _userState.value.userData.token ?: ""  
    if (_userState.value.token.isBlank()) return  

	//...
	
	chatScreenModel.updateChatData(_userState.value.token)  
  
	//...
	
    CoroutineScope(Dispatchers.IO).launch(socketExceptionHandlerWithReconnect) {  
        try {  
            _collectorJob.value?.cancel()  
            _socketSession.value?.disconnect()  
            _collectorJob.value = null  
            _socketSession.value = null  
        } catch (ignore: Exception) {  
        }  
  
        _socketSession.value = _socketClient.value.connect(  
            "your_url"
        )  
        val subscription: Flow<String> = _socketSession.value!!.subscribeText(  
            "your_subscribe"
        )  
        globalDataModel.resetSocketConnected(true)  
        _collectorJob.value = _commonCoroutine.launch(socketExceptionHandlerWithReconnect) {  
            subscription.collect { msg ->  
                val chatRow: ChatRowModel = baseJsonConf.decodeFromString(msg)  
                chatScreenModel.pushChatMessage(_userState.value.token, chatRow)  
            }  
        }  
    }
}

通过登录获取token,而不是从持久化文件当中获取

断网重连

细心的小伙伴可能发现了,我们在进行连接/订阅/发送的时候都加入了异常处理函数,socketExceptionHandlerWithReconnect以及mainModel.socketExceptionHandler,主要处理对于断网或者网络不稳定时候的重连机制,如果不做任何处理,那么在网络不稳定的时候应用程序就会崩溃

对于发送失败的情况,我们不做主动断连重连,只是提示用户网络不稳定即可,此时在用户节目的表现仅为消息发送不出去,弹出网络不稳定的通知:

val socketExceptionHandler = CoroutineExceptionHandler { _, exception ->  
    println("CoroutineException Caught $exception")  
    globalDataModel.resetSocketConnected(false)  
    globalDataModel.checkNetwork()  
}

对于心跳异常,或者没有连接/订阅成功的情况,我们会每隔几秒进行重连:

private val socketExceptionHandlerWithReconnect = CoroutineExceptionHandler { _, exception ->  
    println("Reconnect CoroutineException Caught $exception")  
    globalDataModel.resetSocketConnected(false)  
    globalDataModel.checkNetwork()  
    CoroutineScope(Dispatchers.IO).launch {  
        delay(5000)  
        if (globalDataModel.userState.value.token.isNotBlank()) {  
            login(  
                dbData = globalDataModel.userState.value.userData,  
                forceLogin = true  
            )  
        }  
    }  
}

至此就可以构建一个基本的稳定sockJs连接客户端。如果有对于代码块的不理解,可以参考下方源码,结合相关上下文

源码

Tomoyo

参考资料

Krossbow

Krossbow Doc

原文链接

欢迎大家对于本站的访问 - AsterCasc

Logo

开源鸿蒙跨平台开发社区汇聚开发者与厂商,共建“一次开发,多端部署”的开源生态,致力于降低跨端开发门槛,推动万物智联创新。

更多推荐