Skip to content
Server Plugin

Ktor Server 中的 WebSockets

必要相依性io.ktor:ktor-server-websockets

程式碼範例 server-websockets

Native 伺服器
Ktor 支援 Kotlin/Native,並允許您在沒有額外執行階段或虛擬機的情況下執行伺服器。
支援:✅

WebSocket 是一種協定,可透過單一 TCP 連線在使用者瀏覽器與伺服器之間提供全雙工通訊工作階段。它對於建立需要與伺服器進行即時資料傳輸的應用程式特別有用。

Ktor 在伺服器端與用戶端均支援 WebSocket 協定。

Ktor 允許您:

  • 配置基本的 WebSocket 設定,例如框架大小、ping 週期等等。
  • 處理 WebSocket 工作階段,以便在伺服器與用戶端之間交換訊息。
  • 新增 WebSocket 擴充套件。例如,您可以使用 Deflate 擴充套件或實作 自訂擴充套件

若要了解用戶端的 WebSocket 支援,請參閱 WebSockets 用戶端外掛程式

對於單向通訊工作階段,請考慮使用 Server-Sent Events (SSE)。當伺服器需要向用戶端傳送基於事件的更新時,SSE 特別有用。

新增相依性

要使用 WebSockets,您需要在建置指令碼中包含 ktor-server-websockets 構件:

Kotlin
Groovy
XML

安裝 WebSockets

要將 WebSockets 外掛程式安裝到應用程式,請將其傳遞給指定

模組
模組允許您透過對路由進行分組來建構應用程式。
中的 install 函式。 下方的程式碼片段顯示如何安裝 WebSockets ...

  • ... 在 embeddedServer 函式呼叫中。
  • ... 在明確定義的 module 中,該模組是 Application 類別的擴充函式。
kotlin
kotlin

配置 WebSockets

(選用)您可以透過傳遞 WebSocketOptionsinstall 區塊內配置外掛程式:

  • 使用 pingPeriod 屬性指定 ping 之間的時間間隔。
  • 使用 timeout 屬性設定連線關閉前的逾時時間。
  • 使用 maxFrameSize 屬性設定可接收或傳送的最大框架(Frame)大小。
  • 使用 masking 屬性指定是否啟用遮罩。
  • 使用 contentConverter 屬性設定用於序列化/反序列化的轉換器。
kotlin
install(WebSockets) {
    pingPeriod = 15.seconds
    timeout = 15.seconds
    maxFrameSize = Long.MAX_VALUE
    masking = false
}

處理 WebSockets 工作階段

API 總覽

安裝並配置 WebSockets 外掛程式後,您可以定義一個端點來處理 WebSocket 工作階段。要在伺服器上定義 WebSocket 端點,請在 routing 區塊內呼叫 webSocket 函式:

kotlin
routing { 
    webSocket("/echo") {
       // 處理 WebSocket 工作階段
    }
}

在此範例中,當使用 預設配置 時,伺服器會接受發送至 ws://localhost:8080/echo 的 WebSocket 請求。

webSocket 區塊內,您定義 WebSocket 工作階段的處理常式,該工作階段由 DefaultWebSocketServerSession 類別表示。 區塊內提供以下函式和屬性:

  • 使用 send 函式向用戶端傳送文字內容。
  • 使用 incomingoutgoing 屬性來存取用於接收和傳送 WebSocket 框架的通道。框架由 Frame 類別表示。
  • 使用 close 函式傳送帶有指定原因的關閉框架。

處理工作階段時,您可以檢查框架類型,例如:

  • Frame.Text 是文字框架。對於此框架類型,您可以使用 Frame.Text.readText() 讀取其內容。
  • Frame.Binary 是二進制框架。對於此類型,您可以使用 Frame.Binary.readBytes() 讀取其內容。

請注意,incoming 通道不包含控制框架,例如 ping/pong 或關閉框架。 要處理控制框架並重新組裝分段框架,請使用 webSocketRaw 函式來處理 WebSocket 工作階段。

要獲取有關用戶端的資訊(例如用戶端的 IP 地址),請使用 call 屬性。了解 一般請求資訊

下面,我們將看看使用此 API 的範例。

範例:處理單一工作階段

下方的範例顯示如何建立 echo WebSocket 端點來處理與單一用戶端的工作階段:

kotlin
routing {
    webSocket("/echo") {
        send("Please enter your name")
        for (frame in incoming) {
            frame as? Frame.Text ?: continue
            val receivedText = frame.readText()
            if (receivedText.equals("bye", ignoreCase = true)) {
                close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
            } else {
                send(Frame.Text("Hi, $receivedText!"))
            }
        }
    }
}

如需完整範例,請參閱 server-websockets

範例:處理多個工作階段

為了有效地管理多個 WebSocket 工作階段並處理廣播,您可以利用 Kotlin 的 SharedFlow。這種方法為管理 WebSocket 通訊提供了一種可擴展且並行友善的方法。以下是實作此模式的方法:

  1. 定義一個用於廣播訊息的 SharedFlow
kotlin
val messageResponseFlow = MutableSharedFlow<MessageResponse>()
val sharedFlow = messageResponseFlow.asSharedFlow()
  1. 在您的 WebSocket 路由中,實作廣播和訊息處理邏輯:
kotlin

        webSocket("/ws") {
            send("You are connected to WebSocket!")

            val job = launch {
                sharedFlow.collect { message ->
                    send(message.message)
                }
            }

            runCatching {
                incoming.consumeEach { frame ->
                    if (frame is Frame.Text) {
                        val receivedText = frame.readText()
                        val messageResponse = MessageResponse(receivedText)
                        messageResponseFlow.emit(messageResponse)
                    }
                }
            }.onFailure { exception ->
                println("WebSocket exception: ${exception.localizedMessage}")
            }.also {
                job.cancel()
            }
        }

runCatching 區塊處理傳入訊息並將其發射到 SharedFlow,然後由 SharedFlow 廣播給所有收集者。

透過使用此模式,您可以有效地管理多個 WebSocket 工作階段,而無需手動追蹤個別連線。這種方法非常適合具有許多並行 WebSocket 連線的應用程式,並提供了一種乾淨、反應式的方法來處理訊息廣播。

如需完整範例,請參閱 server-websockets-sharedflow

WebSocket API 與 Ktor

來自 WebSocket API 的標準事件 以以下方式對應到 Ktor:

  • onConnect 發生在區塊的開頭。
  • onMessage 發生在成功讀取訊息(例如透過 incoming.receive())或使用暫停的反覆運算(suspended iteration)與 for(frame in incoming) 之後。
  • onClose 發生在 incoming 通道關閉時。這會完成暫停的反覆運算,或在嘗試接收訊息時拋出 ClosedReceiveChannelException
  • onError 等同於其他例外。

onCloseonError 中,都會設定 closeReason 屬性。

在以下範例中,只有在引發例外(ClosedReceiveChannelException 或其他例外)時才會退出無窮迴圈:

kotlin
webSocket("/echo") {
    println("onConnect")
    try {
        for (frame in incoming){
            val text = (frame as Frame.Text).readText()
            println("onMessage")
            received += text
            outgoing.send(Frame.Text(text))
        }
    } catch (e: ClosedReceiveChannelException) {
        println("onClose ${closeReason.await()}")
    } catch (e: Throwable) {
        println("onError ${closeReason.await()}")
        e.printStackTrace()
    }
}