Skip to content
Server Plugin

Ktor サーバーにおける WebSockets

必須の依存関係: io.ktor:ktor-server-websockets

コード例: server-websockets

ネイティブサーバー
Ktor supports Kotlin/Native and allows you to run a server without an additional runtime or virtual machine.
のサポート: ✅

WebSocketは、単一のTCP接続を介して、ユーザーのブラウザとサーバーとの間にフルデュプレックス(全二重)の通信セッションを提供するプロトコルです。これは、サーバーとの間でのリアルタイムなデータ転送を必要とするアプリケーションを作成する場合に特に便利です。

Ktorは、サーバー側とクライアント側の両方でWebSocketプロトコルをサポートしています。

Ktorでは以下のことが可能です。

  • フレームサイズ、pingの間隔などの基本的なWebSocket設定を構成する。
  • サーバーとクライアント間でメッセージを交換するためのWebSocketセッションを処理する。
  • WebSocket拡張機能を追加する。例えば、Deflate拡張機能を使用したり、カスタム拡張機能を実装したりできます。

クライアント側でのWebSocketサポートについては、WebSocketsクライアントプラグインを参照してください。

一方向の通信セッションについては、Server-Sent Events (SSE)の使用を検討してください。SSEは、サーバーがクライアントに対してイベントベースの更新を送信する必要がある場合に特に有用です。

依存関係の追加

WebSocketsを使用するには、ビルドスクリプトにktor-server-websocketsアーティファクトを含める必要があります。

Kotlin
Groovy
XML

WebSocketsのインストール

アプリケーションにWebSocketsプラグインをインストールするには、指定された

モジュール
Modules allow you to structure your application by grouping routes.
内のinstall関数に渡します。 以下のコードスニペットは、WebSocketsをインストールする方法を示しています。

  • ... embeddedServer関数の呼び出し内。
  • ... Applicationクラスの拡張関数である、明示的に定義されたmodule内。
kotlin
kotlin

WebSocketsの構成

オプションとして、installブロック内でWebSocketOptionsを渡すことでプラグインを構成できます。

  • pingPeriodプロパティを使用して、pingの間隔を指定します。
  • timeoutプロパティを使用して、接続を閉じるまでのタイムアウトを設定します。
  • maxFrameSizeプロパティを使用して、受信または送信可能な最大フレームサイズを設定します。
  • maskingプロパティを使用して、マスキングを有効にするかどうかを指定します。
  • contentConverterプロパティを使用して、シリアライズ/デシリアライズのためのコンバーターを設定します。
kotlin
install(WebSockets) {
    pingPeriod = 15.seconds
    timeout = 15.seconds
    maxFrameSize = Long.MAX_VALUE
    masking = false
}

WebSocketsセッションの処理

APIの概要

WebSocketsプラグインをインストールして構成したら、WebSocketセッションを処理するためのエンドポイントを定義できます。サーバー上でWebSocketエンドポイントを定義するには、routingブロック内でwebSocket関数を呼び出します。

kotlin
routing { 
    webSocket("/echo") {
       // WebSocketセッションを処理する
    }
}

この例では、デフォルト設定が使用されている場合、サーバーはws://localhost:8080/echoへのWebSocketリクエストを受け入れます。

webSocketブロック内では、DefaultWebSocketServerSessionクラスによって表されるWebSocketセッションのハンドラーを定義します。 ブロック内では、以下の関数とプロパティが利用可能です。

  • send関数を使用して、テキストコンテンツをクライアントに送信します。
  • incomingおよびoutgoingプロパティを使用して、WebSocketフレームを受信および送信するためのチャネルにアクセスします。フレームはFrameクラスによって表されます。
  • close関数を使用して、指定された理由とともにクローズフレームを送信します。

セッションを処理する際、以下のようにフレームタイプを確認できます。

  • Frame.Textはテキストフレームです。このフレームタイプでは、Frame.Text.readText()を使用してその内容を読み取ることができます。
  • Frame.Binaryはバイナリフレームです。このタイプでは、Frame.Binary.readBytes()を使用してその内容を読み取ることができます。

incomingチャネルには、ping/pongやクローズフレームなどの制御フレームは含まれていないことに注意してください。 制御フレームを処理したり、断片化されたフレームを再構成したりするには、webSocketRaw関数を使用してWebSocketセッションを処理します。

クライアントに関する情報(クライアントのIPアドレスなど)を取得するには、callプロパティを使用します。一般的なリクエスト情報について詳しく学びましょう。

以下では、このAPIの使用例を見ていきます。

例: 単一セッションの処理

以下の例は、1つのクライアントとのセッションを処理するためのecho WebSocketエンドポイントを作成する方法を示しています。

kotlin
routing {
    webSocket("/echo") {
        send("Please enter your name")
        for (frame in incoming) {
            frame as? Frame.Text ?: continue
            val receivedText = frame.readText()
            if (receivedText.equals("bye", ignoreCase = true)) {
                close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
            } else {
                send(Frame.Text("Hi, $receivedText!"))
            }
        }
    }
}

完全な例については、server-websocketsを参照してください。

例: 複数セッションの処理

複数のWebSocketセッションを効率的に管理し、ブロードキャストを処理するために、KotlinのSharedFlowを活用できます。このアプローチは、WebSocket通信を管理するための、スケーラブルで並行処理に適した方法を提供します。このパターンの実装方法は以下の通りです。

  1. メッセージをブロードキャストするためのSharedFlowを定義します。
kotlin
val messageResponseFlow = MutableSharedFlow<MessageResponse>()
val sharedFlow = messageResponseFlow.asSharedFlow()
  1. WebSocketルート内で、ブロードキャストとメッセージ処理のロジックを実装します。
kotlin

        webSocket("/ws") {
            send("You are connected to WebSocket!")

            val job = launch {
                sharedFlow.collect { message ->
                    send(message.message)
                }
            }

            runCatching {
                incoming.consumeEach { frame ->
                    if (frame is Frame.Text) {
                        val receivedText = frame.readText()
                        val messageResponse = MessageResponse(receivedText)
                        messageResponseFlow.emit(messageResponse)
                    }
                }
            }.onFailure { exception ->
                println("WebSocket exception: ${exception.localizedMessage}")
            }.also {
                job.cancel()
            }
        }

runCatchingブロックは受信メッセージを処理し、それらをSharedFlowにエミットします。その後、SharedFlowはすべてのコレクターにブロードキャストします。

このパターンを使用すると、個々の接続を手動で追跡することなく、複数のWebSocketセッションを効率的に管理できます。このアプローチは、多くの同時接続を持つアプリケーションでも適切にスケールし、メッセージのブロードキャストを処理するためのクリーンでリアクティブな方法を提供します。

完全な例については、server-websockets-sharedflowを参照してください。

WebSocket APIとKtor

WebSocket APIの標準イベントは、Ktorでは以下のようにマッピングされます。

  • onConnectはブロックの開始時に発生します。
  • onMessageは、メッセージの読み取りに成功した後(例えば、incoming.receive()を使用した場合)、またはfor(frame in incoming)によるサスペンド・イテレーションを使用している場合に発生します。
  • onCloseは、incomingチャネルが閉じられたときに発生します。これによりサスペンド・イテレーションが完了するか、メッセージを受信しようとしたときにClosedReceiveChannelExceptionがスローされます。
  • onErrorは他の例外と同等です。

onCloseonErrorの両方で、closeReasonプロパティが設定されます。

次の例では、例外(ClosedReceiveChannelExceptionまたはその他の例外)が発生したときにのみ、無限ループから抜け出します。

kotlin
webSocket("/echo") {
    println("onConnect")
    try {
        for (frame in incoming){
            val text = (frame as Frame.Text).readText()
            println("onMessage")
            received += text
            outgoing.send(Frame.Text(text))
        }
    } catch (e: ClosedReceiveChannelException) {
        println("onClose ${closeReason.await()}")
    } catch (e: Throwable) {
        println("onError ${closeReason.await()}")
        e.printStackTrace()
    }
}