Kotlin Compose Multiplatform下Socket解决方案
对于跨平台socket的实现方式有很多,先不谈跨平台处理,一般我们要在Web端实现实时消息通信的方式是使用SockJs,服务端使用使用Spring的情况下自带了对于websocket的支持,可以通过集成相关库来完成消息通信在我们使用SockJs的时候,Web端使用库,使用SockJS尝试建立WebSocket连接时,Spring会接收该连接请求,并尝试建立WebSocket通道。如果WebSock
原文链接
前言
对于跨平台socket的实现方式有很多,先不谈跨平台处理,一般我们要在Web端实现实时消息通信的方式是使用SockJs,服务端使用使用Spring的情况下自带了对于websocket的支持,可以通过集成相关库来完成消息通信
在我们使用SockJs的时候,Web端使用sockjs-client库,使用SockJS尝试建立WebSocket连接时,Spring会接收该连接请求,并尝试建立WebSocket通道。如果WebSocket不可用,Web端会回退到其他协议(HTTP Streaming、HTTP Long Polling),Spring可以根据不同的协议做出不同的处理。众所周知,websocket是传输层协议,而在之上的具体实现的应用层协议一般为STOMP,消息以帧的形式传递,类似于HTTP,且支持发布/订阅模式
这方面的内容可以参考本站使用Spring构建实时聊天通知的页面应用以及使用Spring构建实时聊天通知的页面应用(续篇)相关内容
实现选择
如果使用SockJs,那么比较通用的跨平台方案还是使用krossbow完成会比较方便,但是这个库其实还是不是很成熟,使用下来各种小问题还是不少的。如果你的服务端不是已经确定使用Spring的SockJs标准处理方案,或者你有时间再写一套连接方案以兼容不同的客户端,其实有很多对于非Web客户端更好写的实现手段,比如:WebSocket+消息订阅、Socket.IO、MQTT甚至如果你没有服务端可以直接使Firebase
实现
关于服务端的实现,前言中的跳转文档中已经给出,这里不再赘述,这里只谈客户端
我们这里使用Koin进行依赖注入,对这里不太了解的小伙伴可以参考前文Kotlin Compose Multiplatform下导航解决方案中的Koin部分,当然也可以自己使用静态对象做单例做相似处理
引包
kotlin {
sourceSets {
commonMain.dependencies {
implementation("org.hildan.krossbow:krossbow-stomp-core:4.5.0")
implementation("org.hildan.krossbow:krossbow-websocket-sockjs:4.5.0")
implementation("org.glassfish.tyrus.bundles:tyrus-standalone-client:1.19")
implementation("com.fasterxml.jackson.core:jackson-databind:2.13.5")
}
}
}
之前基本没有聊过包的版本问题,因为对于公共包而言,虽然我们说首位版本号更新是可以接受无法兼容的,但是对于这种常用的大体量的公共包而言,基本都会保留/兼容之前的过时接口和类,一般无脑上最新或者次新稳定版本就行了。但是几个包是有兼容性问题的,而且版本不对很容易出现Class Not Found,Method Not Found这种问题,所以谨慎的小伙伴可以选择这里给出的已经配好的版本
连接
连接socket的情况一般有三种:
- 主动登录连接:用户输入账号密码,手机号验证码等主动进行的登录事件进行连接
- 被动登录连接:用户退出应用程序,再次打开应用程序,使用之前保留的
Token进行登录 - 断网重连:用户网络断开后,重新处理登录状态
被动登录连接
创建一个MainScreenModel存储应用主视图对象模型,在其中建立firstTryLinkSocket变量,并将该模型对象使用Koin进行注入:
class MainScreenModel : ScreenModel {
//...
private val _firstTryLinkSocket = MutableStateFlow(true)
val firstTryLinkSocket = _firstTryLinkSocket.asStateFlow()
fun triedLinkSocket() {
_firstTryLinkSocket.value = false
}
//...
}
以及socket相关的变量初始化:
class MainScreenModel : ScreenModel {
//...
private val _collectorJob = MutableStateFlow<Job?>(null)
private val _socketClient = MutableStateFlow(StompClient(SockJSClient()) {
instrumentation = object : KrossbowInstrumentation {
override suspend fun onWebSocketClosed(cause: Throwable?) {
globalDataModel.resetSocketConnected(false)
}
}
})
private val _socketSession = MutableStateFlow<StompSession?>(null)
val socketSession = _socketSession.asStateFlow()
//...
}
在进入应用程序封面页面,尝试进行连接,从存储文件中读入用户信息解析token,如果能读取解析到,则尝试进行连接。关于跨平台的文件存储可以参考Kotlin Compose Multiplatform下数据持久化解决方案
//...
dataStorageManager: DataStorageManager = koinInject()
mainModel: MainScreenModel = koinInject()
val commonApiCoroutine = rememberCoroutineScope()
//...
val firstTryLinkSocket = mainModel.firstTryLinkSocket.value
val userDataStringDb = dataStorageManager.getNonFlowString(DataStorageManager.USER_DATA)
//...
if (userDataStringDb.isNotBlank() && firstTryLinkSocket) {
mainModel.triedLinkSocket()
val userDataDb: UserDataModel = baseJsonConf.decodeFromString(userDataStringDb)
if (!userDataDb.token.isNullOrBlank()) {
commonApiCoroutine.launch {
mainModel.login(
dbData = userDataDb,
forceLogin = true
)
}
}
}
登录连接,需要先检查token是否过期,过期直接消除登录状态:
suspend fun login(
account: String = "", passwd: String = "",
dbData: UserDataModel? = null,
forceLogin: Boolean = false,
) {
//....
val isLogin = BaseApi().isLogin(_userState.value.token)
if (!isLogin) {
if (globalDataModel.netStatus.value) {
globalDataModel.clearLocalUserState()
} else {
globalDataModel.resetSocketConnected(false)
}
if (!forceLogin) return
} else {
chatScreenModel.updateChatData(_userState.value.token)
}
//...
CoroutineScope(Dispatchers.IO).launch(socketExceptionHandlerWithReconnect) {
try {
_collectorJob.value?.cancel()
_socketSession.value?.disconnect()
_collectorJob.value = null
_socketSession.value = null
} catch (ignore: Exception) {
}
_socketSession.value = _socketClient.value.connect(
"your_url"
)
val subscription: Flow<String> = _socketSession.value!!.subscribeText(
"your_subscribe"
)
globalDataModel.resetSocketConnected(true)
_collectorJob.value = _commonCoroutine.launch(socketExceptionHandlerWithReconnect) {
subscription.collect { msg ->
val chatRow: ChatRowModel = baseJsonConf.decodeFromString(msg)
chatScreenModel.pushChatMessage(_userState.value.token, chatRow)
}
}
}
}
此时就成功连接并且订阅完成,当收到消息时候就可以通过baseJsonConf.decodeFromString()就行反序列化获取对象,并且进行相应的处理。对于序列化的相关内容可以参考Kotlin Compose Multiplatform下实现HTTP请求。注意这里需要使用CoroutineScope(Dispatchers.IO)来创建一个在I/O线程池中执行的协程作用域进行socket连接,否则对于移动端会报该项工作不能在主线程当中执行
主动发送消息到服务端:
CoroutineScope(Dispatchers.IO).launch(mainModel.socketExceptionHandler) {
socketSession?.sendText(
"your_url", "your_message"
)
chatScreenModel.updateInputContent(chatId, "")
}
这里一般的处理方案是将各种信息封装到body当中,在服务端进行解析来发送给指定订阅者
主动登录连接
后面对于socket本身的处理都一样,前面调用login函数时,传入账号密码相关参数进行处理:
suspend fun login(
account: String = "", passwd: String = "",
dbData: UserDataModel? = null,
forceLogin: Boolean = false,
) {
//...
_userState.value.userData = BaseApi().login(account, passwd)
val thisToken = _userState.value.userData.token
if (!thisToken.isNullOrBlank()) {
_syncUserData.value = true
chatScreenModel.updateChatData(thisToken)
}
_userState.value.token = _userState.value.userData.token ?: ""
if (_userState.value.token.isBlank()) return
//...
chatScreenModel.updateChatData(_userState.value.token)
//...
CoroutineScope(Dispatchers.IO).launch(socketExceptionHandlerWithReconnect) {
try {
_collectorJob.value?.cancel()
_socketSession.value?.disconnect()
_collectorJob.value = null
_socketSession.value = null
} catch (ignore: Exception) {
}
_socketSession.value = _socketClient.value.connect(
"your_url"
)
val subscription: Flow<String> = _socketSession.value!!.subscribeText(
"your_subscribe"
)
globalDataModel.resetSocketConnected(true)
_collectorJob.value = _commonCoroutine.launch(socketExceptionHandlerWithReconnect) {
subscription.collect { msg ->
val chatRow: ChatRowModel = baseJsonConf.decodeFromString(msg)
chatScreenModel.pushChatMessage(_userState.value.token, chatRow)
}
}
}
}
通过登录获取token,而不是从持久化文件当中获取
断网重连
细心的小伙伴可能发现了,我们在进行连接/订阅/发送的时候都加入了异常处理函数,socketExceptionHandlerWithReconnect以及mainModel.socketExceptionHandler,主要处理对于断网或者网络不稳定时候的重连机制,如果不做任何处理,那么在网络不稳定的时候应用程序就会崩溃
对于发送失败的情况,我们不做主动断连重连,只是提示用户网络不稳定即可,此时在用户节目的表现仅为消息发送不出去,弹出网络不稳定的通知:
val socketExceptionHandler = CoroutineExceptionHandler { _, exception ->
println("CoroutineException Caught $exception")
globalDataModel.resetSocketConnected(false)
globalDataModel.checkNetwork()
}
对于心跳异常,或者没有连接/订阅成功的情况,我们会每隔几秒进行重连:
private val socketExceptionHandlerWithReconnect = CoroutineExceptionHandler { _, exception ->
println("Reconnect CoroutineException Caught $exception")
globalDataModel.resetSocketConnected(false)
globalDataModel.checkNetwork()
CoroutineScope(Dispatchers.IO).launch {
delay(5000)
if (globalDataModel.userState.value.token.isNotBlank()) {
login(
dbData = globalDataModel.userState.value.userData,
forceLogin = true
)
}
}
}
至此就可以构建一个基本的稳定sockJs连接客户端。如果有对于代码块的不理解,可以参考下方源码,结合相关上下文
源码
参考资料
原文链接
更多推荐


所有评论(0)