Skip to content
Server Plugin

Ktor 伺服器中的 WebSocket

所需相依性: io.ktor:ktor-server-websockets

程式碼範例: server-websockets

原生伺服器
Ktor 支援 Kotlin/Native 並允許您在沒有額外執行時或虛擬機器的情況下執行伺服器。
支援: ✅

WebSocket 是一種協定,透過單一 TCP 連線在用戶的瀏覽器和伺服器之間提供全雙工通訊會話。它對於建立需要從伺服器即時傳輸資料到伺服器以及從伺服器接收即時資料的應用程式特別有用。

Ktor 同時支援伺服器端和用戶端的 WebSocket 協定。

Ktor 允許您:

  • 配置基本的 WebSocket 設定,例如訊框大小、ping 週期等。
  • 處理 WebSocket 會話,用於伺服器和用戶端之間交換訊息。
  • 新增 WebSocket 擴充功能。例如,您可以使用 Deflate 擴充功能或實作 自訂擴充功能

若要了解用戶端 WebSocket 支援,請參閱 WebSocket 用戶端外掛程式

對於單向通訊會話,請考慮使用 Server-Sent Events (SSE)。SSE 在伺服器需要向用戶端傳送基於事件的更新時特別有用。

新增相依性

若要使用 WebSockets,您需要將 ktor-server-websockets artifact 加入到建置腳本中:

Kotlin
Groovy
XML

安裝 WebSockets

若要將 WebSockets 外掛程式安裝到應用程式中,請在指定的

模組
模組允許您透過分組路由來組織應用程式。
中將其傳遞給 install 函式。以下程式碼片段展示如何安裝 WebSockets ...

  • ... 在 embeddedServer 函式呼叫內部。
  • ... 在明確定義的 module 內部,它是 Application 類別的擴充函式。
kotlin
kotlin

配置 WebSockets

您可以選擇在 install 區塊內部配置此外掛程式,透過傳遞 WebSocketOptions

  • 使用 pingPeriod 屬性來指定 ping 之間的持續時間。
  • 使用 timeout 屬性來設定逾時時間,達到該時間後連線將被關閉。
  • 使用 maxFrameSize 屬性來設定可接收或傳送的最大訊框大小。
  • 使用 masking 屬性來指定是否啟用遮罩。
  • 使用 contentConverter 屬性來設定用於序列化/反序列化的轉換器。
kotlin
install(WebSockets) {
    pingPeriod = 15.seconds
    timeout = 15.seconds
    maxFrameSize = Long.MAX_VALUE
    masking = false
}

處理 WebSocket 會話

API 概述

一旦您安裝並配置了 WebSockets 外掛程式,您就可以定義一個端點來處理 WebSocket 會話。若要在伺服器上定義 WebSocket 端點,請在 路由 區塊內部呼叫 webSocket 函式:

kotlin
routing { 
    webSocket("/echo") {
       // Handle a WebSocket session
    }
}

在此範例中,當使用 預設配置 時,伺服器接受 ws://localhost:8080/echo 的 WebSocket 請求。

webSocket 區塊內部,您定義 WebSocket 會話的處理常式,該處理常式由 DefaultWebSocketServerSession 類別表示。 以下函式和屬性在此區塊內可用:

  • 使用 send 函式將文字內容傳送給用戶端。
  • 使用 incomingoutgoing 屬性來存取用於接收和傳送 WebSocket 訊框的頻道。訊框由 Frame 類別表示。
  • 使用 close 函式傳送帶有指定原因的關閉訊框。

處理會話時,您可以檢查訊框類型,例如:

  • Frame.Text 是一個文字訊框。對於此訊框類型,您可以使用 Frame.Text.readText() 讀取其內容。
  • Frame.Binary 是一個二進位訊框。對於此類型,您可以使用 Frame.Binary.readBytes() 讀取其內容。

請注意,incoming 頻道不包含控制訊框,例如 ping/pong 或關閉訊框。 若要處理控制訊框並重新組合分段訊框,請使用 webSocketRaw 函式來處理 WebSocket 會話。

若要取得用戶端資訊(例如用戶端的 IP 位址),請使用 call 屬性。了解 一般請求資訊

下面,我們將看看使用此 API 的範例。

範例:處理單一會話

下面的範例展示如何建立 echo WebSocket 端點以處理與一個用戶端的會話:

kotlin
routing {
    webSocket("/echo") {
        send("Please enter your name")
        for (frame in incoming) {
            frame as? Frame.Text ?: continue
            val receivedText = frame.readText()
            if (receivedText.equals("bye", ignoreCase = true)) {
                close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
            } else {
                send(Frame.Text("Hi, $receivedText!"))
            }
        }
    }
}

如需完整範例,請參閱 server-websockets

範例:處理多個會話

為了有效管理多個 WebSocket 會話並處理廣播,您可以利用 Kotlin 的 SharedFlow。 這種方法提供了一種可擴展且併發友好的方法,用於管理 WebSocket 通訊。以下是實作此模式的方法:

  1. 定義一個用於廣播訊息的 SharedFlow
kotlin
val messageResponseFlow = MutableSharedFlow<MessageResponse>()
val sharedFlow = messageResponseFlow.asSharedFlow()
  1. 在您的 WebSocket 路由中,實作廣播和訊息處理邏輯:
kotlin

        webSocket("/ws") {
            send("You are connected to WebSocket!")

            val job = launch {
                sharedFlow.collect { message ->
                    send(message.message)
                }
            }

            runCatching {
                incoming.consumeEach { frame ->
                    if (frame is Frame.Text) {
                        val receivedText = frame.readText()
                        val messageResponse = MessageResponse(receivedText)
                        messageResponseFlow.emit(messageResponse)
                    }
                }
            }.onFailure { exception ->
                println("WebSocket exception: ${exception.localizedMessage}")
            }.also {
                job.cancel()
            }
        }

runCatching 區塊處理傳入訊息並將它們發射到 SharedFlow,然後廣播給所有收集器。

透過使用此模式,您可以有效地管理多個 WebSocket 會話,而無需手動追蹤個別連線。這種方法對於具有許多併發 WebSocket 連線的應用程式具有良好的擴展性,並提供一種簡潔、響應式的方式來處理訊息廣播。

如需完整範例,請參閱 server-websockets-sharedflow

WebSocket API 與 Ktor

WebSocket API 中的標準事件 以以下方式對應到 Ktor:

  • onConnect 發生在區塊的開頭。
  • onMessage 發生在成功讀取訊息(例如,使用 incoming.receive())或使用 for(frame in incoming) 進行掛起迭代之後。
  • onClose 發生在 incoming 頻道關閉時。這將完成掛起迭代,或在嘗試接收訊息時拋出 ClosedReceiveChannelException
  • onError 等同於其他例外。

onCloseonError 中,closeReason 屬性會被設定。

在以下範例中,無限迴圈只會在發生例外(無論是 ClosedReceiveChannelException 還是其他例外)時才會退出:

kotlin
webSocket("/echo") {
    println("onConnect")
    try {
        for (frame in incoming){
            val text = (frame as Frame.Text).readText()
            println("onMessage")
            received += text
            outgoing.send(Frame.Text(text))
        }
    } catch (e: ClosedReceiveChannelException) {
        println("onClose ${closeReason.await()}")
    } catch (e: Throwable) {
        println("onError ${closeReason.await()}")
        e.printStackTrace()
    }
}