Pentaho Data Integration でKafka のデータを連携

Jerod Johnson
Jerod Johnson
Senior Technology Evangelist
Pentaho Data Integration で Kafka のデータ をベースにした ETL パイプラインを構築します。

CData JDBC Driver for Apache Kafka を使用すると、データパイプラインからリアルタイムデータにアクセスできます。Pentaho Data Integration は、ETL(Extraction, Transformation, and Loading)エンジンであり、データをクレンジングし、アクセス可能な統一フォーマットでデータを格納します。この記事では、Kafka のデータ に JDBC データソースとして接続し、Pentaho Data Integration で Kafka のデータ をベースにしたジョブやトランスフォーメーションを構築する方法を説明します。

Kafka への接続を設定

Apache Kafka 接続プロパティの取得・設定方法

それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。

Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:

  1. UseSSLtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
  2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします

Apache Kafka への認証

続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:

  • Anonymous
  • Plain
  • SCRAM ログインモジュール
  • SSL クライアント証明書
  • Kerberos

Anonymous 認証

Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。

匿名認証を行うには、以下のプロパティを設定してください。

  • AuthSchemeNone

その他の認証方法については、ヘルプドキュメントをご確認ください。

組み込みの接続文字列デザイナー

JDBC URL の構築を支援するには、Kafka JDBC Driver に組み込まれている接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインから JAR ファイルを実行します。

java -jar cdata.jdbc.apachekafka.jar

接続プロパティを設定し、接続文字列をクリップボードにコピーします。

JDBC URL を設定する際には、Max Rows 接続プロパティの設定も検討してください。これにより返される行数が制限され、レポートやビジュアライゼーションの設計時にパフォーマンスを向上させることができます。

一般的な JDBC URL は次のようになります:

jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;

接続文字列を保存して、Pentaho Data Integration で使用します。

Pentaho DI から Kafka に接続

Pentaho Data Integration を開き、「Database Connection」を選択して CData JDBC Driver for Apache Kafka への接続を設定します。

  1. 「General」をクリックします。
  2. Connection name を設定します(例:Kafka Connection)。
  3. Connection type を「Generic database」に設定します。
  4. Access を「Native (JDBC)」に設定します。
  5. Custom connection URL に Kafka の接続文字列を設定します(例:
    jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
    )。
  6. Custom driver class name を「cdata.jdbc.apachekafka.ApacheKafkaDriver」に設定します。
  7. 接続をテストし、「OK」をクリックして保存します。

Kafka のデータパイプラインを作成

CData JDBC Driver を使用して Kafka への接続が設定されたら、新しいトランスフォーメーションまたはジョブを作成する準備が整いました。

  1. 「File」>>「New」>>「Transformation/job」をクリックします。
  2. 「Table input」オブジェクトをワークフローパネルにドラッグし、Kafka 接続を選択します。
  3. 「Get SQL select statement」をクリックし、Database Explorer を使用して利用可能なテーブルとビューを表示します。
  4. テーブルを選択し、必要に応じてデータをプレビューして確認します。

ここから、適切な同期先を選択し、レプリケーション中にデータを変更、フィルタリング、その他の処理を行うトランスフォーメーションを追加することで、トランスフォーメーションまたはジョブを続行できます。

無料トライアルと詳細情報

CData JDBC Driver for Apache Kafka の 30日間無料トライアルをダウンロードして、Pentaho Data Integration で Kafka のデータ のリアルタイムデータを今すぐ活用しましょう。

はじめる準備はできましたか?

Apache Kafka Driver の無料トライアルをダウンロードしてお試しください:

 ダウンロード

詳細:

Apache Kafka Icon Apache Kafka JDBC Driver お問い合わせ

Apache Kafka データに連携するJava アプリケーションを素早く、簡単に開発できる便利なドライバー。