Skip to content
Server Plugin

Ktor 服务器中的 WebSockets

所需依赖项: io.ktor:ktor-server-websockets

代码示例: server-websockets

原生服务器
Ktor 支持 Kotlin/Native,允许您无需额外的运行时或虚拟机即可运行服务器。
支持: ✅

WebSocket 是一种协议,它通过单个 TCP 连接在用户浏览器和服务器之间提供全双工通信会话。 它对于创建需要从服务器和向服务器实时数据传输的应用程序特别有用。

Ktor 在服务端和客户端都支持 WebSocket 协议。

Ktor 允许您:

  • 配置基本的 WebSocket 设置,例如帧大小、ping 周期等。
  • 处理 WebSocket 会话,用于服务器和客户端之间交换消息。
  • 添加 WebSocket 扩展。例如,您可以使用 Deflate 扩展或实现一个自定义扩展

关于客户端 WebSocket 支持,请参见 WebSockets 客户端插件

对于单向通信会话,请考虑使用 Server-Sent Events (SSE)。SSE 在服务器需要向客户端发送基于事件的更新时特别有用。

添加依赖项

要使用 WebSockets,您需要在构建脚本中引入 ktor-server-websockets artifact:

Kotlin
Groovy
XML

安装 WebSockets

要将 WebSockets 插件安装到应用程序, 请在指定的

模块
模块允许您通过分组路由来组织您的应用程序。
中将其传递给 install 函数。 以下代码片段展示了如何安装 WebSockets...

  • ...在 embeddedServer 函数调用内部。
  • ...在显式定义的 module 内部,`module` 是 Application 类的扩展函数。
kotlin
kotlin

配置 WebSockets

您可以选择在 install 代码块内部通过传递 WebSocketOptions 来配置该插件:

  • 使用 pingPeriod 属性指定 ping 间隔。
  • 使用 timeout 属性设置连接关闭的超时时间。
  • 使用 maxFrameSize 属性设置可以接收或发送的最大帧。
  • 使用 masking 属性指定是否启用掩码。
  • 使用 contentConverter 属性设置用于序列化/反序列化的转换器。
kotlin
install(WebSockets) {
    pingPeriod = 15.seconds
    timeout = 15.seconds
    maxFrameSize = Long.MAX_VALUE
    masking = false
}

处理 WebSockets 会话

API 概览

一旦您安装并配置了 WebSockets 插件,就可以定义一个端点来处理 Websocket 会话。要在服务器上定义 WebSocket 端点,请在 routing 代码块内调用 webSocket 函数:

kotlin
routing { 
    webSocket("/echo") {
       // Handle a WebSocket session
    }
}

在此示例中,当使用默认配置时,服务器接受对 ws://localhost:8080/echo 的 WebSocket 请求。

webSocket 代码块内部,您定义了 WebSocket 会话的处理器,该处理器由 DefaultWebSocketServerSession 类表示。 以下函数和属性在该代码块内可用:

  • 使用 send 函数向客户端发送文本内容。
  • 使用 incomingoutgoing 属性访问用于接收和发送 WebSocket 帧的通道。帧由 Frame 类表示。
  • 使用 close 函数发送带有指定原因的关闭帧。

处理会话时,您可以检测帧类型,例如:

  • Frame.Text 是一个文本帧。对于这种帧类型,您可以使用 Frame.Text.readText() 读取其内容。
  • Frame.Binary 是一个二进制帧。对于这种类型,您可以使用 Frame.Binary.readBytes() 读取其内容。

请注意,incoming 通道不包含控制帧,例如 ping/pong 或关闭帧。 要处理控制帧并重组分片帧,请使用 webSocketRaw 函数来处理 WebSocket 会话。

要获取有关客户端的信息(例如客户端的 IP 地址),请使用 call 属性。了解一般请求信息

下面,我们将看看使用此 API 的示例。

示例:处理单个会话

以下示例展示了如何创建 echo WebSocket 端点来处理与单个客户端的会话:

kotlin
routing {
    webSocket("/echo") {
        send("Please enter your name")
        for (frame in incoming) {
            frame as? Frame.Text ?: continue
            val receivedText = frame.readText()
            if (receivedText.equals("bye", ignoreCase = true)) {
                close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
            } else {
                send(Frame.Text("Hi, $receivedText!"))
            }
        }
    }
}

有关完整示例,请参见 server-websockets

示例:处理多个会话

为了有效管理多个 WebSocket 会话并处理广播,您可以利用 Kotlin 的 SharedFlow。 此方法提供了一种可伸缩且并发友好的方法来管理 WebSocket 通信。以下是实现此模式的方法:

  1. 定义一个用于广播消息的 SharedFlow
kotlin
val messageResponseFlow = MutableSharedFlow<MessageResponse>()
val sharedFlow = messageResponseFlow.asSharedFlow()
  1. 在您的 WebSocket 路由中,实现广播和消息处理逻辑:
kotlin

        webSocket("/ws") {
            send("You are connected to WebSocket!")

            val job = launch {
                sharedFlow.collect { message ->
                    send(message.message)
                }
            }

            runCatching {
                incoming.consumeEach { frame ->
                    if (frame is Frame.Text) {
                        val receivedText = frame.readText()
                        val messageResponse = MessageResponse(receivedText)
                        messageResponseFlow.emit(messageResponse)
                    }
                }
            }.onFailure { exception ->
                println("WebSocket exception: ${exception.localizedMessage}")
            }.also {
                job.cancel()
            }
        }

runCatching 代码块处理传入消息并将其发送到 SharedFlow,然后 SharedFlow 广播给所有收集者。

通过使用此模式,您可以有效地管理多个 WebSocket 会话,而无需手动跟踪单个连接。此方法对于具有许多并发 WebSocket 连接的应用程序具有良好的伸缩性,并提供了一种清晰、反应式的方式来处理消息广播。

有关完整示例,请参见 server-websockets-sharedflow

WebSocket API 与 Ktor

WebSocket API 中的标准事件 以以下方式映射到 Ktor:

  • onConnect 在代码块开始时发生。
  • onMessage 在成功读取消息后(例如,使用 incoming.receive())或使用 for(frame in incoming) 进行挂起迭代后发生。
  • onCloseincoming 通道关闭时发生。这将完成挂起迭代,或在尝试接收消息时抛出 ClosedReceiveChannelException
  • onError 等同于其他异常。

onCloseonError 中,closeReason 属性都被设置。

在以下示例中,无限循环仅在异常发生时(无论是 ClosedReceiveChannelException 还是其他异常)退出:

kotlin
webSocket("/echo") {
    println("onConnect")
    try {
        for (frame in incoming){
            val text = (frame as Frame.Text).readText()
            println("onMessage")
            received += text
            outgoing.send(Frame.Text(text))
        }
    } catch (e: ClosedReceiveChannelException) {
        println("onClose ${closeReason.await()}")
    } catch (e: Throwable) {
        println("onError ${closeReason.await()}")
        e.printStackTrace()
    }
}