Skip to content
Server Plugin

KtorサーバーにおけるWebSockets

必須の依存関係: io.ktor:ktor-server-websockets

コード例: server-websockets

ネイティブサーバー
KtorはKotlin/Nativeをサポートしており、追加のランタイムや仮想マシンなしでサーバーを実行できます。
のサポート: ✅

WebSocketは、単一のTCP接続を介してユーザーのブラウザとサーバー間で全二重通信セッションを提供するプロトコルです。これは、サーバーとの間でリアルタイムのデータ転送を必要とするアプリケーションの作成に特に役立ちます。

Ktorは、サーバー側とクライアント側の両方でWebSocketプロトコルをサポートしています。

Ktorを使用すると、次のことができます。

  • 基本的なWebSocket設定(フレームサイズ、ping期間など)を構成できます。
  • サーバーとクライアント間でメッセージを交換するためのWebSocketセッションを処理できます。
  • WebSocket拡張機能を追加できます。例えば、Deflate拡張機能を使用したり、カスタム拡張機能を実装したりできます。

クライアント側でのWebSocketサポートについては、WebSocketsクライアントプラグインを参照してください。

一方向通信セッションの場合、Server-Sent Events (SSE)の使用を検討してください。SSEは、サーバーがクライアントにイベントベースの更新を送信する必要がある場合に特に役立ちます。

依存関係を追加

WebSocketsを使用するには、ビルドスクリプトにktor-server-websocketsアーティファクトを含める必要があります。

Kotlin
Groovy
XML

WebSocketsをインストールする

アプリケーションにWebSocketsプラグインをインストールするには、指定された

モジュール
モジュールを使用すると、ルートをグループ化してアプリケーションを構造化できます。
内のinstall関数に渡します。 以下のコードスニペットは、WebSocketsをインストールする方法を示しています...

  • ... embeddedServer関数呼び出しの内部。
  • ... 明示的に定義されたmoduleApplicationクラスの拡張関数)の内部。
kotlin
kotlin

WebSocketsを構成する

オプションで、WebSocketOptionsを渡すことで、installブロック内でプラグインを構成できます。

  • pingPeriodプロパティを使用して、ping間の期間を指定します。
  • timeoutプロパティを使用して、接続が閉じられるまでのタイムアウトを設定します。
  • maxFrameSizeプロパティを使用して、送受信できる最大フレームを設定します。
  • maskingプロパティを使用して、マスキングが有効になっているかどうかを指定します。
  • contentConverterプロパティを使用して、シリアライゼーション/デシリアライゼーション用のコンバーターを設定します。
kotlin
install(WebSockets) {
    pingPeriod = 15.seconds
    timeout = 15.seconds
    maxFrameSize = Long.MAX_VALUE
    masking = false
}

WebSocketセッションを処理する

APIの概要

WebSocketsプラグインをインストールして構成したら、WebSocketセッションを処理するエンドポイントを定義できます。サーバー上でWebSocketエンドポイントを定義するには、ルーティングブロック内でwebSocket関数を呼び出します。

kotlin
routing { 
    webSocket("/echo") {
       // Handle a WebSocket session
    }
}

この例では、デフォルト構成が使用されている場合、サーバーはws://localhost:8080/echoへのWebSocketリクエストを受け入れます。

webSocketブロック内では、DefaultWebSocketServerSessionクラスで表されるWebSocketセッションのハンドラーを定義します。 ブロック内で以下の関数とプロパティを使用できます。

  • send関数を使用して、テキストコンテンツをクライアントに送信します。
  • incomingおよびoutgoingプロパティを使用して、WebSocketフレームを受信および送信するためのチャネルにアクセスします。フレームはFrameクラスで表されます。
  • close関数を使用して、指定された理由でクローズフレームを送信します。

セッションを処理する際に、フレームタイプを確認できます。例えば:

  • Frame.Textはテキストフレームです。このフレームタイプの場合、Frame.Text.readText()を使用してその内容を読み取ることができます。
  • Frame.Binaryはバイナリフレームです。このタイプの場合、Frame.Binary.readBytes()を使用してその内容を読み取ることができます。

incomingチャネルには、ping/pongやクローズフレームなどの制御フレームは含まれていないことに注意してください。 制御フレームを処理し、断片化されたフレームを再構築するには、webSocketRaw関数を使用してWebSocketセッションを処理します。

クライアントに関する情報(クライアントのIPアドレスなど)を取得するには、callプロパティを使用します。一般的なリクエスト情報について学びましょう。

以下では、このAPIの使用例を見ていきます。

例: 単一セッションの処理

以下の例は、1つのクライアントとのセッションを処理するecho WebSocketエンドポイントを作成する方法を示しています。

kotlin
routing {
    webSocket("/echo") {
        send("Please enter your name")
        for (frame in incoming) {
            frame as? Frame.Text ?: continue
            val receivedText = frame.readText()
            if (receivedText.equals("bye", ignoreCase = true)) {
                close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
            } else {
                send(Frame.Text("Hi, $receivedText!"))
            }
        }
    }
}

完全な例については、server-websocketsを参照してください。

例: 複数セッションの処理

複数のWebSocketセッションを効率的に管理し、ブロードキャストを処理するには、KotlinのSharedFlowを利用できます。このアプローチは、WebSocket通信を管理するためのスケーラブルで並行処理に適した方法を提供します。このパターンを実装する方法を以下に示します。

  1. メッセージをブロードキャストするためのSharedFlowを定義します。
kotlin
val messageResponseFlow = MutableSharedFlow<MessageResponse>()
val sharedFlow = messageResponseFlow.asSharedFlow()
  1. WebSocketルートで、ブロードキャストとメッセージ処理ロジックを実装します。
kotlin

        webSocket("/ws") {
            send("You are connected to WebSocket!")

            val job = launch {
                sharedFlow.collect { message ->
                    send(message.message)
                }
            }

            runCatching {
                incoming.consumeEach { frame ->
                    if (frame is Frame.Text) {
                        val receivedText = frame.readText()
                        val messageResponse = MessageResponse(receivedText)
                        messageResponseFlow.emit(messageResponse)
                    }
                }
            }.onFailure { exception ->
                println("WebSocket exception: ${exception.localizedMessage}")
            }.also {
                job.cancel()
            }
        }

runCatchingブロックは、受信メッセージを処理し、それらをSharedFlowに発行し、その後すべてのコレクターにブロードキャストします。

このパターンを使用することで、個々の接続を手動で追跡することなく、複数のWebSocketセッションを効率的に管理できます。このアプローチは、多数の同時WebSocket接続を持つアプリケーションでうまくスケールし、メッセージブロードキャストを処理するためのクリーンでリアクティブな方法を提供します。

完全な例については、server-websockets-sharedflowを参照してください。

WebSocket APIとKtor

WebSocket APIの標準イベントは、Ktorでは次のようにマッピングされます。

  • onConnectはブロックの開始時に発生します。
  • onMessageは、メッセージを正常に読み取った後(例: incoming.receive()を使用した場合)、またはfor(frame in incoming)でサスペンドされたイテレーションを使用した後に発生します。
  • onCloseは、incomingチャネルが閉じられたときに発生します。これにより、サスペンドされたイテレーションが完了するか、メッセージを受信しようとしたときにClosedReceiveChannelExceptionがスローされます。
  • onErrorは他の例外と同等です。

onCloseonErrorの両方で、closeReasonプロパティが設定されます。

以下の例では、無限ループは例外が発生した場合にのみ終了します(ClosedReceiveChannelExceptionまたは他の例外)。

kotlin
webSocket("/echo") {
    println("onConnect")
    try {
        for (frame in incoming){
            val text = (frame as Frame.Text).readText()
            println("onMessage")
            received += text
            outgoing.send(Frame.Text(text))
        }
    } catch (e: ClosedReceiveChannelException) {
        println("onClose ${closeReason.await()}")
    } catch (e: Throwable) {
        println("onError ${closeReason.await()}")
        e.printStackTrace()
    }
}