Ktor Server 中的 WebSockets
必要相依性:io.ktor:ktor-server-websockets
程式碼範例: server-websockets
WebSocket 是一種協定,可透過單一 TCP 連線在使用者瀏覽器與伺服器之間提供全雙工通訊工作階段。它對於建立需要與伺服器進行即時資料傳輸的應用程式特別有用。
Ktor 在伺服器端與用戶端均支援 WebSocket 協定。
Ktor 允許您:
- 配置基本的 WebSocket 設定,例如框架大小、ping 週期等等。
- 處理 WebSocket 工作階段,以便在伺服器與用戶端之間交換訊息。
- 新增 WebSocket 擴充套件。例如,您可以使用 Deflate 擴充套件或實作 自訂擴充套件。
若要了解用戶端的 WebSocket 支援,請參閱 WebSockets 用戶端外掛程式。
對於單向通訊工作階段,請考慮使用 Server-Sent Events (SSE)。當伺服器需要向用戶端傳送基於事件的更新時,SSE 特別有用。
新增相依性
要使用 WebSockets,您需要在建置指令碼中包含 ktor-server-websockets 構件:
安裝 WebSockets
要將 WebSockets 外掛程式安裝到應用程式,請將其傳遞給指定
install 函式。 下方的程式碼片段顯示如何安裝 WebSockets ... - ... 在
embeddedServer函式呼叫中。 - ... 在明確定義的
module中,該模組是Application類別的擴充函式。
配置 WebSockets
(選用)您可以透過傳遞 WebSocketOptions 在 install 區塊內配置外掛程式:
- 使用
pingPeriod屬性指定 ping 之間的時間間隔。 - 使用
timeout屬性設定連線關閉前的逾時時間。 - 使用
maxFrameSize屬性設定可接收或傳送的最大框架(Frame)大小。 - 使用
masking屬性指定是否啟用遮罩。 - 使用
contentConverter屬性設定用於序列化/反序列化的轉換器。
install(WebSockets) {
pingPeriod = 15.seconds
timeout = 15.seconds
maxFrameSize = Long.MAX_VALUE
masking = false
}處理 WebSockets 工作階段
API 總覽
安裝並配置 WebSockets 外掛程式後,您可以定義一個端點來處理 WebSocket 工作階段。要在伺服器上定義 WebSocket 端點,請在 routing 區塊內呼叫 webSocket 函式:
routing {
webSocket("/echo") {
// 處理 WebSocket 工作階段
}
}在此範例中,當使用 預設配置 時,伺服器會接受發送至 ws://localhost:8080/echo 的 WebSocket 請求。
在 webSocket 區塊內,您定義 WebSocket 工作階段的處理常式,該工作階段由 DefaultWebSocketServerSession 類別表示。 區塊內提供以下函式和屬性:
- 使用
send函式向用戶端傳送文字內容。 - 使用
incoming和outgoing屬性來存取用於接收和傳送 WebSocket 框架的通道。框架由Frame類別表示。 - 使用
close函式傳送帶有指定原因的關閉框架。
處理工作階段時,您可以檢查框架類型,例如:
Frame.Text是文字框架。對於此框架類型,您可以使用Frame.Text.readText()讀取其內容。Frame.Binary是二進制框架。對於此類型,您可以使用Frame.Binary.readBytes()讀取其內容。
請注意,
incoming通道不包含控制框架,例如 ping/pong 或關閉框架。 要處理控制框架並重新組裝分段框架,請使用 webSocketRaw 函式來處理 WebSocket 工作階段。
要獲取有關用戶端的資訊(例如用戶端的 IP 地址),請使用
call屬性。了解 一般請求資訊。
下面,我們將看看使用此 API 的範例。
範例:處理單一工作階段
下方的範例顯示如何建立 echo WebSocket 端點來處理與單一用戶端的工作階段:
routing {
webSocket("/echo") {
send("Please enter your name")
for (frame in incoming) {
frame as? Frame.Text ?: continue
val receivedText = frame.readText()
if (receivedText.equals("bye", ignoreCase = true)) {
close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
} else {
send(Frame.Text("Hi, $receivedText!"))
}
}
}
}如需完整範例,請參閱 server-websockets。
範例:處理多個工作階段
為了有效地管理多個 WebSocket 工作階段並處理廣播,您可以利用 Kotlin 的 SharedFlow。這種方法為管理 WebSocket 通訊提供了一種可擴展且並行友善的方法。以下是實作此模式的方法:
- 定義一個用於廣播訊息的
SharedFlow:
val messageResponseFlow = MutableSharedFlow<MessageResponse>()
val sharedFlow = messageResponseFlow.asSharedFlow()- 在您的 WebSocket 路由中,實作廣播和訊息處理邏輯:
webSocket("/ws") {
send("You are connected to WebSocket!")
val job = launch {
sharedFlow.collect { message ->
send(message.message)
}
}
runCatching {
incoming.consumeEach { frame ->
if (frame is Frame.Text) {
val receivedText = frame.readText()
val messageResponse = MessageResponse(receivedText)
messageResponseFlow.emit(messageResponse)
}
}
}.onFailure { exception ->
println("WebSocket exception: ${exception.localizedMessage}")
}.also {
job.cancel()
}
}runCatching 區塊處理傳入訊息並將其發射到 SharedFlow,然後由 SharedFlow 廣播給所有收集者。
透過使用此模式,您可以有效地管理多個 WebSocket 工作階段,而無需手動追蹤個別連線。這種方法非常適合具有許多並行 WebSocket 連線的應用程式,並提供了一種乾淨、反應式的方法來處理訊息廣播。
如需完整範例,請參閱 server-websockets-sharedflow。
WebSocket API 與 Ktor
來自 WebSocket API 的標準事件 以以下方式對應到 Ktor:
onConnect發生在區塊的開頭。onMessage發生在成功讀取訊息(例如透過incoming.receive())或使用暫停的反覆運算(suspended iteration)與for(frame in incoming)之後。onClose發生在incoming通道關閉時。這會完成暫停的反覆運算,或在嘗試接收訊息時拋出ClosedReceiveChannelException。onError等同於其他例外。
在 onClose 和 onError 中,都會設定 closeReason 屬性。
在以下範例中,只有在引發例外(ClosedReceiveChannelException 或其他例外)時才會退出無窮迴圈:
webSocket("/echo") {
println("onConnect")
try {
for (frame in incoming){
val text = (frame as Frame.Text).readText()
println("onMessage")
received += text
outgoing.send(Frame.Text(text))
}
} catch (e: ClosedReceiveChannelException) {
println("onClose ${closeReason.await()}")
} catch (e: Throwable) {
println("onError ${closeReason.await()}")
e.printStackTrace()
}
}