Skip to content
Server Plugin

Ktor Server 中的 WebSockets

所需依赖项io.ktor:ktor-server-websockets

代码示例 server-websockets

原生服务器
Ktor 支持 Kotlin/Native,允许您在没有额外运行时或虚拟机的情况下运行服务器。
支持:✅

WebSocket 是一种协议,它通过单个 TCP 连接在用户的浏览器和服务器之间提供全双工通信会话。它对于创建需要与服务器进行实时数据传输的应用特别有用。

Ktor 在服务器端和客户端都支持 WebSocket 协议。

Ktor 允许您:

  • 配置基础 WebSocket 设置,例如帧大小、ping 周期等。
  • 处理用于在服务器和客户端之间交换消息的 WebSocket 会话。
  • 添加 WebSocket 扩展。例如,您可以使用 Deflate 扩展或实现 自定义扩展

要了解客户端的 WebSocket 支持,请参阅 WebSockets 客户端插件

对于单向通信会话,请考虑使用 服务器发送事件 (SSE)。在服务器需要向客户端发送基于事件的更新时,SSE 特别有用。

添加依赖项

要使用 WebSockets,您需要在构建脚本中包含 ktor-server-websockets 构件:

Kotlin
Groovy
XML

安装 WebSockets

要将 WebSockets 插件安装到应用,请将其传递给指定

模块
模块允许您通过对路由进行分组来构建应用程序。
中的 install 函数。 下面的代码片段显示了如何安装 WebSockets ...

  • ... 在 embeddedServer 函数调用内部。
  • ... 在显式定义的 moduleApplication 类的扩展函数)内部。
kotlin
kotlin

配置 WebSockets

或者,您可以通过传递 WebSocketOptionsinstall 代码块内配置插件:

  • 使用 pingPeriod 属性指定 ping 之间的时间间隔。
  • 使用 timeout 属性设置连接关闭前的超时时间。
  • 使用 maxFrameSize 属性设置可以接收或发送的最大帧。
  • 使用 masking 属性指定是否启用掩码。
  • 使用 contentConverter 属性设置序列化/反序列化的转换器。
kotlin
install(WebSockets) {
    pingPeriod = 15.seconds
    timeout = 15.seconds
    maxFrameSize = Long.MAX_VALUE
    masking = false
}

处理 WebSockets 会话

API 概览

安装并配置 WebSockets 插件后,您可以定义一个端点来处理 WebSocket 会话。要在服务器上定义 WebSocket 端点,请在 routing 代码块中调用 webSocket 函数:

kotlin
routing { 
    webSocket("/echo") {
       // 处理 WebSocket 会话
    }
}

在此示例中,当使用 默认配置 时,服务器接受指向 ws://localhost:8080/echo 的 WebSocket 请求。

webSocket 代码块中,您可以定义 WebSocket 会话的处理程序,该会话由 DefaultWebSocketServerSession 类表示。 代码块中提供以下函数和属性:

  • 使用 send 函数向客户端发送文本内容。
  • 使用 incomingoutgoing 属性访问用于接收和发送 WebSocket 帧的通道。帧由 Frame 类表示。
  • 使用 close 函数发送带有指定原因的关闭帧。

在处理会话时,您可以检查帧类型,例如:

  • Frame.Text 是文本帧。对于此帧类型,您可以使用 Frame.Text.readText() 读取其内容。
  • Frame.Binary 是二进制帧。对于此类型,您可以使用 Frame.Binary.readBytes() 读取其内容。

请注意,incoming 通道不包含控制帧,例如 ping/pong 或关闭帧。 要处理控制帧并重组分段帧,请使用 webSocketRaw 函数来处理 WebSocket 会话。

要获取有关客户端的信息(例如客户端的 IP 地址),请使用 call 属性。详细了解 常规请求信息

下面,我们将查看使用此 API 的示例。

示例:处理单个会话

下面的示例展示了如何创建 echo WebSocket 端点以处理与一个客户端的会话:

kotlin
routing {
    webSocket("/echo") {
        send("Please enter your name")
        for (frame in incoming) {
            frame as? Frame.Text ?: continue
            val receivedText = frame.readText()
            if (receivedText.equals("bye", ignoreCase = true)) {
                close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
            } else {
                send(Frame.Text("Hi, $receivedText!"))
            }
        }
    }
}

有关完整示例,请参阅 server-websockets

示例:处理多个会话

为了高效地管理多个 WebSocket 会话并处理广播,您可以利用 Kotlin 的 SharedFlow。 这种方法为管理 WebSocket 通信提供了一种可扩展且并发友好的方法。以下是实现此模式的方法:

  1. 定义用于广播消息的 SharedFlow
kotlin
val messageResponseFlow = MutableSharedFlow<MessageResponse>()
val sharedFlow = messageResponseFlow.asSharedFlow()
  1. 在您的 WebSocket 路由中,实现广播和消息处理逻辑:
kotlin

        webSocket("/ws") {
            send("You are connected to WebSocket!")

            val job = launch {
                sharedFlow.collect { message ->
                    send(message.message)
                }
            }

            runCatching {
                incoming.consumeEach { frame ->
                    if (frame is Frame.Text) {
                        val receivedText = frame.readText()
                        val messageResponse = MessageResponse(receivedText)
                        messageResponseFlow.emit(messageResponse)
                    }
                }
            }.onFailure { exception ->
                println("WebSocket exception: ${exception.localizedMessage}")
            }.also {
                job.cancel()
            }
        }

runCatching 代码块处理传入消息并将其发送到 SharedFlow,然后由其广播给所有收集器。

通过使用此模式,您可以高效地管理多个 WebSocket 会话,而无需手动跟踪单个连接。这种方法非常适合具有许多并发 WebSocket 连接的应用,并提供了一种干净、响应式的方式来处理消息广播。

有关完整示例,请参阅 server-websockets-sharedflow

WebSocket API 与 Ktor

来自 WebSocket API 的标准事件 以以下方式映射到 Ktor:

  • onConnect 发生在代码块开始处。
  • onMessage 发生在成功读取消息(例如,使用 incoming.receive())或使用挂起迭代 for(frame in incoming) 之后。
  • onClose 发生在 incoming 通道关闭时。这将完成挂起迭代,或者在尝试接收消息时抛出 ClosedReceiveChannelException
  • onError 等同于其他异常。

onCloseonError 中,都会设置 closeReason 属性。

在以下示例中,仅当抛出异常(ClosedReceiveChannelException 或其他异常)时,才会退出无限循环:

kotlin
webSocket("/echo") {
    println("onConnect")
    try {
        for (frame in incoming){
            val text = (frame as Frame.Text).readText()
            println("onMessage")
            received += text
            outgoing.send(Frame.Text(text))
        }
    } catch (e: ClosedReceiveChannelException) {
        println("onClose ${closeReason.await()}")
    } catch (e: Throwable) {
        println("onError ${closeReason.await()}")
        e.printStackTrace()
    }
}