KtorサーバーにおけるWebSockets
必須の依存関係: io.ktor:ktor-server-websockets
コード例: server-websockets
WebSocketは、単一のTCP接続を介してユーザーのブラウザとサーバー間で全二重通信セッションを提供するプロトコルです。これは、サーバーとの間でリアルタイムのデータ転送を必要とするアプリケーションの作成に特に役立ちます。
Ktorは、サーバー側とクライアント側の両方でWebSocketプロトコルをサポートしています。
Ktorを使用すると、次のことができます。
- 基本的なWebSocket設定(フレームサイズ、ping期間など)を構成できます。
- サーバーとクライアント間でメッセージを交換するためのWebSocketセッションを処理できます。
- WebSocket拡張機能を追加できます。例えば、Deflate拡張機能を使用したり、カスタム拡張機能を実装したりできます。
クライアント側でのWebSocketサポートについては、WebSocketsクライアントプラグインを参照してください。
一方向通信セッションの場合、Server-Sent Events (SSE)の使用を検討してください。SSEは、サーバーがクライアントにイベントベースの更新を送信する必要がある場合に特に役立ちます。
依存関係を追加
WebSockets
を使用するには、ビルドスクリプトにktor-server-websockets
アーティファクトを含める必要があります。
WebSocketsをインストールする
アプリケーションにWebSockets
プラグインをインストールするには、指定された
install
関数に渡します。 以下のコードスニペットは、WebSockets
をインストールする方法を示しています... - ...
embeddedServer
関数呼び出しの内部。 - ... 明示的に定義された
module
(Application
クラスの拡張関数)の内部。
WebSocketsを構成する
オプションで、WebSocketOptionsを渡すことで、install
ブロック内でプラグインを構成できます。
pingPeriod
プロパティを使用して、ping間の期間を指定します。timeout
プロパティを使用して、接続が閉じられるまでのタイムアウトを設定します。maxFrameSize
プロパティを使用して、送受信できる最大フレームを設定します。masking
プロパティを使用して、マスキングが有効になっているかどうかを指定します。contentConverter
プロパティを使用して、シリアライゼーション/デシリアライゼーション用のコンバーターを設定します。
install(WebSockets) {
pingPeriod = 15.seconds
timeout = 15.seconds
maxFrameSize = Long.MAX_VALUE
masking = false
}
WebSocketセッションを処理する
APIの概要
WebSockets
プラグインをインストールして構成したら、WebSocketセッションを処理するエンドポイントを定義できます。サーバー上でWebSocketエンドポイントを定義するには、ルーティングブロック内でwebSocket
関数を呼び出します。
routing {
webSocket("/echo") {
// Handle a WebSocket session
}
}
この例では、デフォルト構成が使用されている場合、サーバーはws://localhost:8080/echo
へのWebSocketリクエストを受け入れます。
webSocket
ブロック内では、DefaultWebSocketServerSessionクラスで表されるWebSocketセッションのハンドラーを定義します。 ブロック内で以下の関数とプロパティを使用できます。
send
関数を使用して、テキストコンテンツをクライアントに送信します。incoming
およびoutgoing
プロパティを使用して、WebSocketフレームを受信および送信するためのチャネルにアクセスします。フレームはFrame
クラスで表されます。close
関数を使用して、指定された理由でクローズフレームを送信します。
セッションを処理する際に、フレームタイプを確認できます。例えば:
Frame.Text
はテキストフレームです。このフレームタイプの場合、Frame.Text.readText()
を使用してその内容を読み取ることができます。Frame.Binary
はバイナリフレームです。このタイプの場合、Frame.Binary.readBytes()
を使用してその内容を読み取ることができます。
incoming
チャネルには、ping/pongやクローズフレームなどの制御フレームは含まれていないことに注意してください。 制御フレームを処理し、断片化されたフレームを再構築するには、webSocketRaw関数を使用してWebSocketセッションを処理します。
クライアントに関する情報(クライアントのIPアドレスなど)を取得するには、
call
プロパティを使用します。一般的なリクエスト情報について学びましょう。
以下では、このAPIの使用例を見ていきます。
例: 単一セッションの処理
以下の例は、1つのクライアントとのセッションを処理するecho
WebSocketエンドポイントを作成する方法を示しています。
routing {
webSocket("/echo") {
send("Please enter your name")
for (frame in incoming) {
frame as? Frame.Text ?: continue
val receivedText = frame.readText()
if (receivedText.equals("bye", ignoreCase = true)) {
close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
} else {
send(Frame.Text("Hi, $receivedText!"))
}
}
}
}
完全な例については、server-websocketsを参照してください。
例: 複数セッションの処理
複数のWebSocketセッションを効率的に管理し、ブロードキャストを処理するには、KotlinのSharedFlow
を利用できます。このアプローチは、WebSocket通信を管理するためのスケーラブルで並行処理に適した方法を提供します。このパターンを実装する方法を以下に示します。
- メッセージをブロードキャストするための
SharedFlow
を定義します。
val messageResponseFlow = MutableSharedFlow<MessageResponse>()
val sharedFlow = messageResponseFlow.asSharedFlow()
- WebSocketルートで、ブロードキャストとメッセージ処理ロジックを実装します。
webSocket("/ws") {
send("You are connected to WebSocket!")
val job = launch {
sharedFlow.collect { message ->
send(message.message)
}
}
runCatching {
incoming.consumeEach { frame ->
if (frame is Frame.Text) {
val receivedText = frame.readText()
val messageResponse = MessageResponse(receivedText)
messageResponseFlow.emit(messageResponse)
}
}
}.onFailure { exception ->
println("WebSocket exception: ${exception.localizedMessage}")
}.also {
job.cancel()
}
}
runCatching
ブロックは、受信メッセージを処理し、それらをSharedFlow
に発行し、その後すべてのコレクターにブロードキャストします。
このパターンを使用することで、個々の接続を手動で追跡することなく、複数のWebSocketセッションを効率的に管理できます。このアプローチは、多数の同時WebSocket接続を持つアプリケーションでうまくスケールし、メッセージブロードキャストを処理するためのクリーンでリアクティブな方法を提供します。
完全な例については、server-websockets-sharedflowを参照してください。
WebSocket APIとKtor
WebSocket APIの標準イベントは、Ktorでは次のようにマッピングされます。
onConnect
はブロックの開始時に発生します。onMessage
は、メッセージを正常に読み取った後(例:incoming.receive()
を使用した場合)、またはfor(frame in incoming)
でサスペンドされたイテレーションを使用した後に発生します。onClose
は、incoming
チャネルが閉じられたときに発生します。これにより、サスペンドされたイテレーションが完了するか、メッセージを受信しようとしたときにClosedReceiveChannelException
がスローされます。onError
は他の例外と同等です。
onClose
とonError
の両方で、closeReason
プロパティが設定されます。
以下の例では、無限ループは例外が発生した場合にのみ終了します(ClosedReceiveChannelException
または他の例外)。
webSocket("/echo") {
println("onConnect")
try {
for (frame in incoming){
val text = (frame as Frame.Text).readText()
println("onMessage")
received += text
outgoing.send(Frame.Text(text))
}
} catch (e: ClosedReceiveChannelException) {
println("onClose ${closeReason.await()}")
} catch (e: Throwable) {
println("onError ${closeReason.await()}")
e.printStackTrace()
}
}