Azure App サービスでKafka のIFTTT フローをトリガー

杉本和也
杉本和也
リードエンジニア
この記事では、Logic Apps の標準ウィザードを使用してIFTTT (if-this-then-that) ワークフローを自動化する方法を説明します。



Azure Logic Apps は ノーコードでアプリを構築できるクラウドサービスです。CData API Server と連携することで、ノーコードアプリ開発用のKafka のデータへのクラウドベースのアクセスをノーコードで追加できます。本記事では、API Server 経由でAzure Logic Apps からKafka 連携を実現する方法を紹介します。

API Server の設定

以下のリンクからAPI Server の無償トライアルをスタートしたら、セキュアなKafka OData サービスを作成していきましょう。

Kafka への接続

Azure Logic Apps からKafka のデータを操作するには、まずKafka への接続を作成・設定します。

  1. API Server にログインして、「Connections」をクリック、さらに「接続を追加」をクリックします。 接続を追加
  2. 「接続を追加」をクリックして、データソースがAPI Server に事前にインストールされている場合は、一覧から「Kafka」を選択します。
  3. 事前にインストールされていない場合は、コネクタを追加していきます。コネクタ追加の手順は以下の記事にまとめてありますので、ご確認ください。
    CData コネクタの追加方法はこちら >>
  4. それでは、Kafka への接続設定を行っていきましょう! 接続設定
  5. Apache Kafka 接続プロパティの取得・設定方法

    それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。

    Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

    デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:

    1. UseSSLtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
    2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします

    Apache Kafka への認証

    続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:

    • Anonymous
    • Plain
    • SCRAM ログインモジュール
    • SSL クライアント証明書
    • Kerberos

    Anonymous 認証

    Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。

    匿名認証を行うには、以下のプロパティを設定してください。

    • AuthSchemeNone

    その他の認証方法については、ヘルプドキュメントをご確認ください。

  6. 接続情報の入力が完了したら、「保存およびテスト」をクリックします。

Apache Kafka 接続プロパティの取得・設定方法

それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。

Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:

  1. UseSSLtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
  2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします

Apache Kafka への認証

続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:

  • Anonymous
  • Plain
  • SCRAM ログインモジュール
  • SSL クライアント証明書
  • Kerberos

Anonymous 認証

Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。

匿名認証を行うには、以下のプロパティを設定してください。

  • AuthSchemeNone

その他の認証方法については、ヘルプドキュメントをご確認ください。

API Server のユーザー設定

次に、API Server 経由でKafka にアクセスするユーザーを作成します。「Users」ページでユーザーを追加・設定できます。やってみましょう。

  1. 「Users」ページで ユーザーを追加をクリックすると、「ユーザーを追加」ポップアップが開きます。
  2. 次に、「ロール」、「ユーザー名」、「権限」プロパティを設定し、「ユーザーを追加」をクリックします。
  3. その後、ユーザーの認証トークンが生成されます。各ユーザーの認証トークンとその他の情報は「Users」ページで確認できます。

Kafka 用のAPI エンドポイントの作成

ユーザーを作成したら、Kafka のデータ用のAPI エンドポイントを作成していきます。

  1. まず、「API」ページに移動し、 「 テーブルを追加」をクリックします。
  2. アクセスしたい接続を選択し、次へをクリックします。
  3. 接続を選択した状態で、各テーブルを選択して確認をクリックすることでエンドポイントを作成します。

OData のエンドポイントを取得

以上でKafka への接続を設定してユーザーを作成し、API Server でKafka データのAPI を追加しました。これで、OData 形式のKafka データをREST API で利用できます。API Server の「API」ページから、API のエンドポイントを表示およびコピーできます。

コネクションとOData エンドポイントを設定したら、Azure Logic Apps からKafka のデータに接続できます。

Logic App でKafka にアクセス

Logic App でAPI Server を使用し、Kafka の周りにプロセスフローを作成できます。HTTP + Swagger アクションは、Kafka に対して実行する操作を定義するためのウィザードを提供します。以下のステップでは、Logic App でKafka を取得する方法を説明しています。

テーブルにレコードの作成日を含むカラムがある場合は、以下のステップに従って新しいレコードのカラム値をチェックする関数を作成できます。それ以外の場合は、[Create a Logic App]セクションにスキップし、フィルタに一致するエンティティにメールを送信します。

新しいKafka エンティティを確認

特定の新しいKafka エンティティを見つけるために、インターバルの開始日時の値を取得する関数を作成できます。

  1. [Azure Portal]で、[New]->[Function App]->[Create]と進みます。
  2. 名前を入力し、サブスクリプション、リソースグループ、App Service プラン、そしてストレージアカウントを選択します。
  3. Function App を選択し、Webhook + API シナリオを選択します。
  4. 言語を選択します。この記事では、JavaScript を使用します。
  5. 以下のコードを追加し、JSON オブジェクトで前の時間を返します。
    module.exports = function (context, data) {
      var d = new Date();
      d.setHours(d.getHours()-1);
      // Response of the function to be used later.
      context.res = {
        body: {
          start: d
        }
      };
      context.done();
    };
    

トリガーにKafka を追加

以下のステップに従って、フィルタに一致する結果をKafka で検索するトリガーを作成します。上記の関数を作成した場合は、返されたインターバルの開始後に作成されたオブジェクトを検索できます。

  1. Azure Portal で[New]をクリックし、[Web + Mobile]セクションで[Logic App]を選択してリソースグループとApp Service プランを選択します。
  2. これで、Logic App Designer で使用可能なウィザードが使用できます。このウィザードには、Logic App の設定ブレードからアクセスできます。Blank Logic App templateを選択します。
  3. Kafka オブジェクトをポーリングするRecurrence アクションを追加します。この記事では、一時間ごとにポーリングします。タイムゾーンを選択します。デフォルトはUTC です。
  4. 関数アクションを追加します。[Add Action]ダイアログのメニューを展開し、リジョンにAzure 関数を表示するオプションを選択します。先に作成したFunction App を選択し、インターバル開始を返す関数を選択します。
  5. からの中括弧のペア({})を入力し、からのペイロードオブジェクトを関数に渡します。
  6. HTTP + Swagger アクションを追加し、API Server のOData エンドポイントを入力します。
    http://MySite:MyPort/api.rsc/@MyAuthtoken/$oas
  7. [Return SampleTable_1]操作を選択します。
  8. 各プロパティの説明を使用して、取得する列やフィルターなどの追加パラメータを指定します。以下はフィルタの一例です。

    Column2 eq '100'

  9. getInterval 関数から返された日時値を使用するには、SampleTable_1 テーブルの日時列で[ge]演算子を使用し、ダイアログでBody パラメータを選択します。日時値を囲むには、クォーテーションを使用する必要があることに注意して下さい。

    An OData filter on the results of an Azure Function App, getToday.(Salesforce is shown.)
  10. [Code View]に切り替え、$filter 式を変更してインターバルの開始を含むプロパティを抽出します。'@{body('MyFunc')['MyProp']' 構文を使用します。

    "getAllAccount": {
      "inputs": {
        "method": "get",
          "queries": {
            "$filter":"CreatedDate ge '@{body('getInterval')['start']}'"
          },
          "uri": "https://MySite:MyPort/api.rsc/@MyAuthtoken/SampleTable_1"
      }
    

これで、ワークフローのデータソースおよび宛先としてKafka にアクセスできます。

新しレコードをメールで送信

以下のステップに従って、新しいSampleTable_1 エンティティを含むレポートをメールで送信します。

  1. [Logic Apps Designer]で[SMTP - Send Email]アクションを追加します。
  2. SMTP サーバーに必要な情報を構成します。
  3. From、To、Subject、Body を構成します。返されたKafka 列からパラメータを追加できます。

[Save]をクリックし、[Run]をクリックして過去一時間に作成されたKafka レコードに関する電子メール通知を送信します。

Dynamic columns added to template the body of an email.(Salesforce is shown.)

クラウドアプリケーションからKafka のデータへのライブ接続

Azure Logic Apps からKafka のリアルタイムデータに直接接続できるようになりました。これで、Kafka のデータを複製せずにより多くの接続とアプリを作成できます。

クラウドアプリケーションから直接100を超えるSaaS 、ビッグデータ、NoSQL ソースへのリアルタイムデータアクセスを取得するには、API Server を参照してください。

はじめる準備はできましたか?

詳細はこちら、または無料トライアルにお申し込みください:

CData API Server お問い合わせ