Ktor 伺服器中的 WebSocket
所需相依性: io.ktor:ktor-server-websockets
程式碼範例: server-websockets
WebSocket 是一種協定,透過單一 TCP 連線在用戶的瀏覽器和伺服器之間提供全雙工通訊會話。它對於建立需要從伺服器即時傳輸資料到伺服器以及從伺服器接收即時資料的應用程式特別有用。
Ktor 同時支援伺服器端和用戶端的 WebSocket 協定。
Ktor 允許您:
- 配置基本的 WebSocket 設定,例如訊框大小、ping 週期等。
- 處理 WebSocket 會話,用於伺服器和用戶端之間交換訊息。
- 新增 WebSocket 擴充功能。例如,您可以使用 Deflate 擴充功能或實作 自訂擴充功能。
若要了解用戶端 WebSocket 支援,請參閱 WebSocket 用戶端外掛程式。
對於單向通訊會話,請考慮使用 Server-Sent Events (SSE)。SSE 在伺服器需要向用戶端傳送基於事件的更新時特別有用。
新增相依性
若要使用 WebSockets
,您需要將 ktor-server-websockets
artifact 加入到建置腳本中:
安裝 WebSockets
若要將 WebSockets
外掛程式安裝到應用程式中,請在指定的
install
函式。以下程式碼片段展示如何安裝 WebSockets
... - ... 在
embeddedServer
函式呼叫內部。 - ... 在明確定義的
module
內部,它是Application
類別的擴充函式。
配置 WebSockets
您可以選擇在 install
區塊內部配置此外掛程式,透過傳遞 WebSocketOptions:
- 使用
pingPeriod
屬性來指定 ping 之間的持續時間。 - 使用
timeout
屬性來設定逾時時間,達到該時間後連線將被關閉。 - 使用
maxFrameSize
屬性來設定可接收或傳送的最大訊框大小。 - 使用
masking
屬性來指定是否啟用遮罩。 - 使用
contentConverter
屬性來設定用於序列化/反序列化的轉換器。
install(WebSockets) {
pingPeriod = 15.seconds
timeout = 15.seconds
maxFrameSize = Long.MAX_VALUE
masking = false
}
處理 WebSocket 會話
API 概述
一旦您安裝並配置了 WebSockets
外掛程式,您就可以定義一個端點來處理 WebSocket 會話。若要在伺服器上定義 WebSocket 端點,請在 路由 區塊內部呼叫 webSocket
函式:
routing {
webSocket("/echo") {
// Handle a WebSocket session
}
}
在此範例中,當使用 預設配置 時,伺服器接受 ws://localhost:8080/echo
的 WebSocket 請求。
在 webSocket
區塊內部,您定義 WebSocket 會話的處理常式,該處理常式由 DefaultWebSocketServerSession 類別表示。 以下函式和屬性在此區塊內可用:
- 使用
send
函式將文字內容傳送給用戶端。 - 使用
incoming
和outgoing
屬性來存取用於接收和傳送 WebSocket 訊框的頻道。訊框由Frame
類別表示。 - 使用
close
函式傳送帶有指定原因的關閉訊框。
處理會話時,您可以檢查訊框類型,例如:
Frame.Text
是一個文字訊框。對於此訊框類型,您可以使用Frame.Text.readText()
讀取其內容。Frame.Binary
是一個二進位訊框。對於此類型,您可以使用Frame.Binary.readBytes()
讀取其內容。
請注意,
incoming
頻道不包含控制訊框,例如 ping/pong 或關閉訊框。 若要處理控制訊框並重新組合分段訊框,請使用 webSocketRaw 函式來處理 WebSocket 會話。
若要取得用戶端資訊(例如用戶端的 IP 位址),請使用
call
屬性。了解 一般請求資訊。
下面,我們將看看使用此 API 的範例。
範例:處理單一會話
下面的範例展示如何建立 echo
WebSocket 端點以處理與一個用戶端的會話:
routing {
webSocket("/echo") {
send("Please enter your name")
for (frame in incoming) {
frame as? Frame.Text ?: continue
val receivedText = frame.readText()
if (receivedText.equals("bye", ignoreCase = true)) {
close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
} else {
send(Frame.Text("Hi, $receivedText!"))
}
}
}
}
如需完整範例,請參閱 server-websockets。
範例:處理多個會話
為了有效管理多個 WebSocket 會話並處理廣播,您可以利用 Kotlin 的 SharedFlow
。 這種方法提供了一種可擴展且併發友好的方法,用於管理 WebSocket 通訊。以下是實作此模式的方法:
- 定義一個用於廣播訊息的
SharedFlow
:
val messageResponseFlow = MutableSharedFlow<MessageResponse>()
val sharedFlow = messageResponseFlow.asSharedFlow()
- 在您的 WebSocket 路由中,實作廣播和訊息處理邏輯:
webSocket("/ws") {
send("You are connected to WebSocket!")
val job = launch {
sharedFlow.collect { message ->
send(message.message)
}
}
runCatching {
incoming.consumeEach { frame ->
if (frame is Frame.Text) {
val receivedText = frame.readText()
val messageResponse = MessageResponse(receivedText)
messageResponseFlow.emit(messageResponse)
}
}
}.onFailure { exception ->
println("WebSocket exception: ${exception.localizedMessage}")
}.also {
job.cancel()
}
}
runCatching
區塊處理傳入訊息並將它們發射到 SharedFlow
,然後廣播給所有收集器。
透過使用此模式,您可以有效地管理多個 WebSocket 會話,而無需手動追蹤個別連線。這種方法對於具有許多併發 WebSocket 連線的應用程式具有良好的擴展性,並提供一種簡潔、響應式的方式來處理訊息廣播。
如需完整範例,請參閱 server-websockets-sharedflow。
WebSocket API 與 Ktor
WebSocket API 中的標準事件 以以下方式對應到 Ktor:
onConnect
發生在區塊的開頭。onMessage
發生在成功讀取訊息(例如,使用incoming.receive()
)或使用for(frame in incoming)
進行掛起迭代之後。onClose
發生在incoming
頻道關閉時。這將完成掛起迭代,或在嘗試接收訊息時拋出ClosedReceiveChannelException
。onError
等同於其他例外。
在 onClose
和 onError
中,closeReason
屬性會被設定。
在以下範例中,無限迴圈只會在發生例外(無論是 ClosedReceiveChannelException
還是其他例外)時才會退出:
webSocket("/echo") {
println("onConnect")
try {
for (frame in incoming){
val text = (frame as Frame.Text).readText()
println("onMessage")
received += text
outgoing.send(Frame.Text(text))
}
} catch (e: ClosedReceiveChannelException) {
println("onClose ${closeReason.await()}")
} catch (e: Throwable) {
println("onError ${closeReason.await()}")
e.printStackTrace()
}
}