Ktor Server 中的 WebSockets
所需依赖项:io.ktor:ktor-server-websockets
代码示例: server-websockets
WebSocket 是一种协议,它通过单个 TCP 连接在用户的浏览器和服务器之间提供全双工通信会话。它对于创建需要与服务器进行实时数据传输的应用特别有用。
Ktor 在服务器端和客户端都支持 WebSocket 协议。
Ktor 允许您:
- 配置基础 WebSocket 设置,例如帧大小、ping 周期等。
- 处理用于在服务器和客户端之间交换消息的 WebSocket 会话。
- 添加 WebSocket 扩展。例如,您可以使用 Deflate 扩展或实现 自定义扩展。
要了解客户端的 WebSocket 支持,请参阅 WebSockets 客户端插件。
对于单向通信会话,请考虑使用 服务器发送事件 (SSE)。在服务器需要向客户端发送基于事件的更新时,SSE 特别有用。
添加依赖项
要使用 WebSockets,您需要在构建脚本中包含 ktor-server-websockets 构件:
安装 WebSockets
要将 WebSockets 插件安装到应用,请将其传递给指定
install 函数。 下面的代码片段显示了如何安装 WebSockets ... - ... 在
embeddedServer函数调用内部。 - ... 在显式定义的
module(Application类的扩展函数)内部。
配置 WebSockets
或者,您可以通过传递 WebSocketOptions 在 install 代码块内配置插件:
- 使用
pingPeriod属性指定 ping 之间的时间间隔。 - 使用
timeout属性设置连接关闭前的超时时间。 - 使用
maxFrameSize属性设置可以接收或发送的最大帧。 - 使用
masking属性指定是否启用掩码。 - 使用
contentConverter属性设置序列化/反序列化的转换器。
install(WebSockets) {
pingPeriod = 15.seconds
timeout = 15.seconds
maxFrameSize = Long.MAX_VALUE
masking = false
}处理 WebSockets 会话
API 概览
安装并配置 WebSockets 插件后,您可以定义一个端点来处理 WebSocket 会话。要在服务器上定义 WebSocket 端点,请在 routing 代码块中调用 webSocket 函数:
routing {
webSocket("/echo") {
// 处理 WebSocket 会话
}
}在此示例中,当使用 默认配置 时,服务器接受指向 ws://localhost:8080/echo 的 WebSocket 请求。
在 webSocket 代码块中,您可以定义 WebSocket 会话的处理程序,该会话由 DefaultWebSocketServerSession 类表示。 代码块中提供以下函数和属性:
- 使用
send函数向客户端发送文本内容。 - 使用
incoming和outgoing属性访问用于接收和发送 WebSocket 帧的通道。帧由Frame类表示。 - 使用
close函数发送带有指定原因的关闭帧。
在处理会话时,您可以检查帧类型,例如:
Frame.Text是文本帧。对于此帧类型,您可以使用Frame.Text.readText()读取其内容。Frame.Binary是二进制帧。对于此类型,您可以使用Frame.Binary.readBytes()读取其内容。
请注意,
incoming通道不包含控制帧,例如 ping/pong 或关闭帧。 要处理控制帧并重组分段帧,请使用 webSocketRaw 函数来处理 WebSocket 会话。
要获取有关客户端的信息(例如客户端的 IP 地址),请使用
call属性。详细了解 常规请求信息。
下面,我们将查看使用此 API 的示例。
示例:处理单个会话
下面的示例展示了如何创建 echo WebSocket 端点以处理与一个客户端的会话:
routing {
webSocket("/echo") {
send("Please enter your name")
for (frame in incoming) {
frame as? Frame.Text ?: continue
val receivedText = frame.readText()
if (receivedText.equals("bye", ignoreCase = true)) {
close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
} else {
send(Frame.Text("Hi, $receivedText!"))
}
}
}
}有关完整示例,请参阅 server-websockets。
示例:处理多个会话
为了高效地管理多个 WebSocket 会话并处理广播,您可以利用 Kotlin 的 SharedFlow。 这种方法为管理 WebSocket 通信提供了一种可扩展且并发友好的方法。以下是实现此模式的方法:
- 定义用于广播消息的
SharedFlow:
val messageResponseFlow = MutableSharedFlow<MessageResponse>()
val sharedFlow = messageResponseFlow.asSharedFlow()- 在您的 WebSocket 路由中,实现广播和消息处理逻辑:
webSocket("/ws") {
send("You are connected to WebSocket!")
val job = launch {
sharedFlow.collect { message ->
send(message.message)
}
}
runCatching {
incoming.consumeEach { frame ->
if (frame is Frame.Text) {
val receivedText = frame.readText()
val messageResponse = MessageResponse(receivedText)
messageResponseFlow.emit(messageResponse)
}
}
}.onFailure { exception ->
println("WebSocket exception: ${exception.localizedMessage}")
}.also {
job.cancel()
}
}runCatching 代码块处理传入消息并将其发送到 SharedFlow,然后由其广播给所有收集器。
通过使用此模式,您可以高效地管理多个 WebSocket 会话,而无需手动跟踪单个连接。这种方法非常适合具有许多并发 WebSocket 连接的应用,并提供了一种干净、响应式的方式来处理消息广播。
有关完整示例,请参阅 server-websockets-sharedflow。
WebSocket API 与 Ktor
来自 WebSocket API 的标准事件 以以下方式映射到 Ktor:
onConnect发生在代码块开始处。onMessage发生在成功读取消息(例如,使用incoming.receive())或使用挂起迭代for(frame in incoming)之后。onClose发生在incoming通道关闭时。这将完成挂起迭代,或者在尝试接收消息时抛出ClosedReceiveChannelException。onError等同于其他异常。
在 onClose 和 onError 中,都会设置 closeReason 属性。
在以下示例中,仅当抛出异常(ClosedReceiveChannelException 或其他异常)时,才会退出无限循环:
webSocket("/echo") {
println("onConnect")
try {
for (frame in incoming){
val text = (frame as Frame.Text).readText()
println("onMessage")
received += text
outgoing.send(Frame.Text(text))
}
} catch (e: ClosedReceiveChannelException) {
println("onClose ${closeReason.await()}")
} catch (e: Throwable) {
println("onError ${closeReason.await()}")
e.printStackTrace()
}
}