Ktor サーバーにおける WebSockets
必須の依存関係: io.ktor:ktor-server-websockets
コード例: server-websockets
WebSocketは、単一のTCP接続を介して、ユーザーのブラウザとサーバーとの間にフルデュプレックス(全二重)の通信セッションを提供するプロトコルです。これは、サーバーとの間でのリアルタイムなデータ転送を必要とするアプリケーションを作成する場合に特に便利です。
Ktorは、サーバー側とクライアント側の両方でWebSocketプロトコルをサポートしています。
Ktorでは以下のことが可能です。
- フレームサイズ、pingの間隔などの基本的なWebSocket設定を構成する。
- サーバーとクライアント間でメッセージを交換するためのWebSocketセッションを処理する。
- WebSocket拡張機能を追加する。例えば、Deflate拡張機能を使用したり、カスタム拡張機能を実装したりできます。
クライアント側でのWebSocketサポートについては、WebSocketsクライアントプラグインを参照してください。
一方向の通信セッションについては、Server-Sent Events (SSE)の使用を検討してください。SSEは、サーバーがクライアントに対してイベントベースの更新を送信する必要がある場合に特に有用です。
依存関係の追加
WebSocketsを使用するには、ビルドスクリプトにktor-server-websocketsアーティファクトを含める必要があります。
WebSocketsのインストール
アプリケーションにWebSocketsプラグインをインストールするには、指定された
install関数に渡します。 以下のコードスニペットは、WebSocketsをインストールする方法を示しています。 - ...
embeddedServer関数の呼び出し内。 - ...
Applicationクラスの拡張関数である、明示的に定義されたmodule内。
WebSocketsの構成
オプションとして、installブロック内でWebSocketOptionsを渡すことでプラグインを構成できます。
pingPeriodプロパティを使用して、pingの間隔を指定します。timeoutプロパティを使用して、接続を閉じるまでのタイムアウトを設定します。maxFrameSizeプロパティを使用して、受信または送信可能な最大フレームサイズを設定します。maskingプロパティを使用して、マスキングを有効にするかどうかを指定します。contentConverterプロパティを使用して、シリアライズ/デシリアライズのためのコンバーターを設定します。
install(WebSockets) {
pingPeriod = 15.seconds
timeout = 15.seconds
maxFrameSize = Long.MAX_VALUE
masking = false
}WebSocketsセッションの処理
APIの概要
WebSocketsプラグインをインストールして構成したら、WebSocketセッションを処理するためのエンドポイントを定義できます。サーバー上でWebSocketエンドポイントを定義するには、routingブロック内でwebSocket関数を呼び出します。
routing {
webSocket("/echo") {
// WebSocketセッションを処理する
}
}この例では、デフォルト設定が使用されている場合、サーバーはws://localhost:8080/echoへのWebSocketリクエストを受け入れます。
webSocketブロック内では、DefaultWebSocketServerSessionクラスによって表されるWebSocketセッションのハンドラーを定義します。 ブロック内では、以下の関数とプロパティが利用可能です。
send関数を使用して、テキストコンテンツをクライアントに送信します。incomingおよびoutgoingプロパティを使用して、WebSocketフレームを受信および送信するためのチャネルにアクセスします。フレームはFrameクラスによって表されます。close関数を使用して、指定された理由とともにクローズフレームを送信します。
セッションを処理する際、以下のようにフレームタイプを確認できます。
Frame.Textはテキストフレームです。このフレームタイプでは、Frame.Text.readText()を使用してその内容を読み取ることができます。Frame.Binaryはバイナリフレームです。このタイプでは、Frame.Binary.readBytes()を使用してその内容を読み取ることができます。
incomingチャネルには、ping/pongやクローズフレームなどの制御フレームは含まれていないことに注意してください。 制御フレームを処理したり、断片化されたフレームを再構成したりするには、webSocketRaw関数を使用してWebSocketセッションを処理します。
クライアントに関する情報(クライアントのIPアドレスなど)を取得するには、
callプロパティを使用します。一般的なリクエスト情報について詳しく学びましょう。
以下では、このAPIの使用例を見ていきます。
例: 単一セッションの処理
以下の例は、1つのクライアントとのセッションを処理するためのecho WebSocketエンドポイントを作成する方法を示しています。
routing {
webSocket("/echo") {
send("Please enter your name")
for (frame in incoming) {
frame as? Frame.Text ?: continue
val receivedText = frame.readText()
if (receivedText.equals("bye", ignoreCase = true)) {
close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
} else {
send(Frame.Text("Hi, $receivedText!"))
}
}
}
}完全な例については、server-websocketsを参照してください。
例: 複数セッションの処理
複数のWebSocketセッションを効率的に管理し、ブロードキャストを処理するために、KotlinのSharedFlowを活用できます。このアプローチは、WebSocket通信を管理するための、スケーラブルで並行処理に適した方法を提供します。このパターンの実装方法は以下の通りです。
- メッセージをブロードキャストするための
SharedFlowを定義します。
val messageResponseFlow = MutableSharedFlow<MessageResponse>()
val sharedFlow = messageResponseFlow.asSharedFlow()- WebSocketルート内で、ブロードキャストとメッセージ処理のロジックを実装します。
webSocket("/ws") {
send("You are connected to WebSocket!")
val job = launch {
sharedFlow.collect { message ->
send(message.message)
}
}
runCatching {
incoming.consumeEach { frame ->
if (frame is Frame.Text) {
val receivedText = frame.readText()
val messageResponse = MessageResponse(receivedText)
messageResponseFlow.emit(messageResponse)
}
}
}.onFailure { exception ->
println("WebSocket exception: ${exception.localizedMessage}")
}.also {
job.cancel()
}
}runCatchingブロックは受信メッセージを処理し、それらをSharedFlowにエミットします。その後、SharedFlowはすべてのコレクターにブロードキャストします。
このパターンを使用すると、個々の接続を手動で追跡することなく、複数のWebSocketセッションを効率的に管理できます。このアプローチは、多くの同時接続を持つアプリケーションでも適切にスケールし、メッセージのブロードキャストを処理するためのクリーンでリアクティブな方法を提供します。
完全な例については、server-websockets-sharedflowを参照してください。
WebSocket APIとKtor
WebSocket APIの標準イベントは、Ktorでは以下のようにマッピングされます。
onConnectはブロックの開始時に発生します。onMessageは、メッセージの読み取りに成功した後(例えば、incoming.receive()を使用した場合)、またはfor(frame in incoming)によるサスペンド・イテレーションを使用している場合に発生します。onCloseは、incomingチャネルが閉じられたときに発生します。これによりサスペンド・イテレーションが完了するか、メッセージを受信しようとしたときにClosedReceiveChannelExceptionがスローされます。onErrorは他の例外と同等です。
onCloseとonErrorの両方で、closeReasonプロパティが設定されます。
次の例では、例外(ClosedReceiveChannelExceptionまたはその他の例外)が発生したときにのみ、無限ループから抜け出します。
webSocket("/echo") {
println("onConnect")
try {
for (frame in incoming){
val text = (frame as Frame.Text).readText()
println("onMessage")
received += text
outgoing.send(Frame.Text(text))
}
} catch (e: ClosedReceiveChannelException) {
println("onClose ${closeReason.await()}")
} catch (e: Throwable) {
println("onError ${closeReason.await()}")
e.printStackTrace()
}
}