Google Data Fusion で Kafka に連携した ETL プロセスを作成

Jerod Johnson
Jerod Johnson
Senior Technology Evangelist
CData JDBC Driver を Google Data Fusion にロードし、Kafka のデータ にリアルタイムでアクセスできる ETL プロセスを作成します。

Google Data Fusion を使用すると、セルフサービス型のデータ連携を行い、異なるデータソースを統合できます。CData JDBC Driver for Apache Kafka をアップロードすることで、Google Data Fusion のパイプライン内から Kafka のデータ にリアルタイムでアクセスできるようになります。CData JDBC Driver を使用すると、Kafka のデータ を Google Data Fusion でネイティブにサポートされている任意のデータソースにパイプできますが、この記事では、Kafka から Google BigQuery へデータをパイプする方法を説明します。

CData JDBC Driver for Apache Kafka を Google Data Fusion にアップロード

CData JDBC Driver for Apache Kafka を Google Data Fusion インスタンスにアップロードして、Kafka のデータ にリアルタイムでアクセスしましょう。Google Data Fusion では JDBC ドライバーの命名規則に制限があるため、JAR ファイルを driver-version.jar という形式に合わせてコピーまたはリネームしてください。例:cdataapachekafka-2020.jar

  1. Google Data Fusion インスタンスを開きます
  2. をクリックしてエンティティを追加し、ドライバーをアップロードします
  3. "Upload driver" タブで、リネームした JAR ファイルをドラッグまたは参照します。
  4. "Driver configuration" タブで以下を設定します:
    • Name: ドライバーの名前(cdata.jdbc.apachekafka)を作成し、メモしておきます
    • Class name: JDBC クラス名を設定します:(cdata.jdbc.apachekafka.ApacheKafkaDriver)
  5. "Finish" をクリックします

Google Data Fusion で Kafka のデータ に接続

JDBC Driver をアップロードしたら、Google Data Fusion のパイプラインで Kafka のデータ にリアルタイムでアクセスできます。

  1. Pipeline Studio に移動して、新しいパイプラインを作成します
  2. "Source" オプションから "Database" をクリックして、JDBC Driver 用のソースを追加します
  3. Database ソースの "Properties" をクリックしてプロパティを編集します

    NOTE:Google Data Fusion で JDBC Driver を使用するには、ライセンス(製品版またはトライアル)とランタイムキー(RTK)が必要です。ライセンス(またはトライアル)の取得については、CData までお問い合わせください。

    • Label を設定します
    • Reference Name を将来の参照用の値に設定します(例:cdata-apachekafka)
    • Plugin Type を "jdbc" に設定します
    • Connection String を Kafka の JDBC URL に設定します。例:

      jdbc:apachekafka:RTK=5246...;User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;

      Apache Kafka 接続プロパティの取得・設定方法

      それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。

      Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

      デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:

      1. UseSSLtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
      2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします

      Apache Kafka への認証

      続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:

      • Anonymous
      • Plain
      • SCRAM ログインモジュール
      • SSL クライアント証明書
      • Kerberos

      Anonymous 認証

      Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。

      匿名認証を行うには、以下のプロパティを設定してください。

      • AuthSchemeNone

      その他の認証方法については、ヘルプドキュメントをご確認ください。

      ビルトイン接続文字列デザイナー

      JDBC URL の作成には、Kafka JDBC Driver に組み込まれている接続文字列デザイナーを使用できます。JAR ファイルをダブルクリックするか、コマンドラインから JAR ファイルを実行してください。

            java -jar cdata.jdbc.apachekafka.jar
            

      接続プロパティを入力し、接続文字列をクリップボードにコピーします。

    • Import Query を Kafka から取得したいデータを抽出する SQL クエリに設定します。例:
      SELECT * FROM SampleTable_1
  4. "Sink" タブから、同期先シンクを追加します(この例では Google BigQuery を使用します)
  5. BigQuery シンクの "Properties" をクリックしてプロパティを編集します
    • Label を設定します
    • Reference Name を apachekafka-bigquery のような値に設定します
    • Project ID を特定の Google BigQuery プロジェクト ID に設定します(またはデフォルトの "auto-detect" のままにします)
    • Dataset を特定の Google BigQuery データセットに設定します
    • Table を Kafka のデータ を挿入するテーブル名に設定します

Source と Sink を設定すると、Kafka のデータ を Google BigQuery にパイプする準備が整います。パイプラインを保存してデプロイしてください。パイプラインを実行すると、Google Data Fusion が Kafka からリアルタイムデータをリクエストし、Google BigQuery にインポートします。

これはシンプルなパイプラインの例ですが、変換、分析、条件などを使用してより複雑な Kafka パイプラインを作成できます。CData JDBC Driver for Apache Kafka の 30日間の無償トライアルをダウンロードして、今すぐ Google Data Fusion で Kafka のデータ をリアルタイムで活用しましょう。

はじめる準備はできましたか?

Apache Kafka Driver の無料トライアルをダウンロードしてお試しください:

 ダウンロード

詳細:

Apache Kafka Icon Apache Kafka JDBC Driver お問い合わせ

Apache Kafka データに連携するJava アプリケーションを素早く、簡単に開発できる便利なドライバー。