Kafka のデータをAI アシスタントのPEP から利用する方法

杉本和也
杉本和也
リードエンジニア
AI アシスタントPEP から Kafka のデータに連携し、チャットボットでリアルタイムKafka のデータを使う方法



CData API Server と ADO.NET Provider / JDBC Driver for ApacheKafka を使って、AI アシスタントPEP(https://pep.work/) から Kafka に接続して、チャットボットでリアルタイムKafkaデータを使った応答を可能にする方法を説明します。

API Server の設定

以下のリンクからAPI Server の無償トライアルをスタートしたら、セキュアなKafka OData サービスを作成していきましょう。

Kafka への接続

PEP からKafka のデータを操作するには、まずKafka への接続を作成・設定します。

  1. API Server にログインして、「Connections」をクリック、さらに「接続を追加」をクリックします。 接続を追加
  2. 「接続を追加」をクリックして、データソースがAPI Server に事前にインストールされている場合は、一覧から「Kafka」を選択します。
  3. 事前にインストールされていない場合は、コネクタを追加していきます。コネクタ追加の手順は以下の記事にまとめてありますので、ご確認ください。
    CData コネクタの追加方法はこちら >>
  4. それでは、Kafka への接続設定を行っていきましょう! 接続設定
  5. Apache Kafka 接続プロパティの取得・設定方法

    それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。

    Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

    デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:

    1. UseSSLtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
    2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします

    Apache Kafka への認証

    続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:

    • Anonymous
    • Plain
    • SCRAM ログインモジュール
    • SSL クライアント証明書
    • Kerberos

    Anonymous 認証

    Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。

    匿名認証を行うには、以下のプロパティを設定してください。

    • AuthSchemeNone

    その他の認証方法については、ヘルプドキュメントをご確認ください。

  6. 接続情報の入力が完了したら、「保存およびテスト」をクリックします。

Apache Kafka 接続プロパティの取得・設定方法

それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。

Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:

  1. UseSSLtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
  2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします

Apache Kafka への認証

続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:

  • Anonymous
  • Plain
  • SCRAM ログインモジュール
  • SSL クライアント証明書
  • Kerberos

Anonymous 認証

Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。

匿名認証を行うには、以下のプロパティを設定してください。

  • AuthSchemeNone

その他の認証方法については、ヘルプドキュメントをご確認ください。

API Server のユーザー設定

次に、API Server 経由でKafka にアクセスするユーザーを作成します。「Users」ページでユーザーを追加・設定できます。やってみましょう。

  1. 「Users」ページで ユーザーを追加をクリックすると、「ユーザーを追加」ポップアップが開きます。
  2. 次に、「ロール」、「ユーザー名」、「権限」プロパティを設定し、「ユーザーを追加」をクリックします。
  3. その後、ユーザーの認証トークンが生成されます。各ユーザーの認証トークンとその他の情報は「Users」ページで確認できます。

Kafka 用のAPI エンドポイントの作成

ユーザーを作成したら、Kafka のデータ用のAPI エンドポイントを作成していきます。

  1. まず、「API」ページに移動し、 「 テーブルを追加」をクリックします。
  2. アクセスしたい接続を選択し、次へをクリックします。
  3. 接続を選択した状態で、各テーブルを選択して確認をクリックすることでエンドポイントを作成します。

OData のエンドポイントを取得

以上でKafka への接続を設定してユーザーを作成し、API Server でKafka データのAPI を追加しました。これで、OData 形式のKafka データをREST API で利用できます。API Server の「API」ページから、API のエンドポイントを表示およびコピーできます。

オンプレミスDB やファイルからのAPI Server 使用(オプション)

オンプレミスRDB やExcel/CSV などのファイルのデータを使用する場合には、API Server のCloug Gateway / SSH ポートフォワーディングが便利です。是非、Cloud Gatway の設定方法 記事を参考にしてください。

PEP でKafka のデータをチャットで使う設定

前項まででAPI ができたので、ここからはPEP 側での設定作業です。

PEP でKafka のAPI をコールするシナリオを作成

PEP 側ではあらかじめSlack などのアプリケーションを構成しておきます。ここにAPI Server をコールするAI アシスタントのシナリオを追加していきます。

PEP
  • 「会話編集」→「シナリオ」に移動し、新しいシナリオを作成します。
  • PEP
  • シナリオ名を入力します。この例では、条件はSlack Homeを 選択しています。
  • PEP
  • 「新規作成」ボタンをクリックすると、以下のようなシナリオエディタ画面に移動します。これでシナリオを作成する準備ができました。
  • PEP

Kafka のAPI の接続箇所の設定

具体的にAPI連携を含めたシナリオを作っていきます。

  • 最初に検索したい文字列を受け取るための対話ノードを配置して、以下のように構成します。
  • PEP
  • ポイントは分岐条件で「以下のすべての会話」、発言内容の記憶で「film_name」として、会話した内容を変数に保存することです。
  • PEP
  • 対話ノードを配置したら、APIノードを追加して、対話ノードから接続していきます。
  • PEP
  • APIノードでは、以下のように CData API Server へのリクエストを構成します。
    ポイントは2つです。CData API Server は Filter 機能によって、取得するデータを絞り込むことができます。その指定が「$filter=contains(title,'{{ state.scenario_4.form.film_name }}')」の部分です。 「{{ state.scenario_4.form.film_name }}」は直前の対話ノードで設定した変数をパラメータとして渡しています。「scenario_4」はPEP のシナリオIDですので、作成したシナリオに合わせて変更してください。
  • HTTPヘッダーにAPI Server のユーザートークンを指定します。
    • API名:任意の名前を設定
    • URL:http://apiserverurl/api.rsc/film_list?$filter=contains(title,'{{ state.scenario_4.form.film_name }}')
    • HTTPメソッド:GET
    • Content-Type:JSON
    • HTTPヘッダー:キー「x-cdata-authtoken」、値「予め取得したAPI Serverのトークン」を指定
  • PEP

これで保存すると、API ノードが使えるようになります。

取得したKafka からのレスポンスの表示方法

最後に後続の対話ノードを構成し、取得した検索結果を表示します。

PEP

対話ノードの応答文には、以下のような文章を埋め込みます。

検索結果は以下のとおりです。

    {% for item in state.scenario_4.api_response.film_search.value %}
    フィルムID:{{ item.FID }}
    タイトル : {{ item.title }}
    値段:{{ item.price }}
    カテゴリー:{{ item.category }}
    俳優: -----

    {% endfor %}
  
PEP

ここでポイントになるのは、API Server から受け取るJSONのレスポンスの分解方法です。 API Server のレスポンスはオブジェクト直下にvalueという配列要素があり、このレスポンスは「{{state.scenario_4.api_response.film_search.value}}」の形でアクセスできます。
設定イメージ:state.シナリオID.api_response.APIノード名.value
これを「FOR」での繰り返し処理で取得して、文章として表示する仕組みになっています。

PEP

これで設定は完了です。実際に動かしてみると、Kafka →API Server → PEP 経由でデータを取得して表示していることがわかるかと思います。

CData API Server の無償版およびトライアル

CData API Server は、無償版および30日の無償トライアルがあります。是非、API Server ページ から製品をダウンロードしてお試しください。

はじめる準備はできましたか?

詳細はこちら、または無料トライアルにお申し込みください:

CData API Server お問い合わせ