Pentaho Data Integration でKafka のデータを連携
CData JDBC Driver for Apache Kafka を使用すると、データパイプラインからリアルタイムデータにアクセスできます。Pentaho Data Integration は、ETL(Extraction, Transformation, and Loading)エンジンであり、データをクレンジングし、アクセス可能な統一フォーマットでデータを格納します。この記事では、Kafka のデータ に JDBC データソースとして接続し、Pentaho Data Integration で Kafka のデータ をベースにしたジョブやトランスフォーメーションを構築する方法を説明します。
Kafka への接続を設定
Apache Kafka 接続プロパティの取得・設定方法
それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:
- UseSSL をtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします
Apache Kafka への認証
続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous 認証
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、以下のプロパティを設定してください。
- AuthScheme:None
その他の認証方法については、ヘルプドキュメントをご確認ください。
組み込みの接続文字列デザイナー
JDBC URL の構築を支援するには、Kafka JDBC Driver に組み込まれている接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインから JAR ファイルを実行します。
java -jar cdata.jdbc.apachekafka.jar
接続プロパティを設定し、接続文字列をクリップボードにコピーします。
JDBC URL を設定する際には、Max Rows 接続プロパティの設定も検討してください。これにより返される行数が制限され、レポートやビジュアライゼーションの設計時にパフォーマンスを向上させることができます。
一般的な JDBC URL は次のようになります:
jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
接続文字列を保存して、Pentaho Data Integration で使用します。
Pentaho DI から Kafka に接続
Pentaho Data Integration を開き、「Database Connection」を選択して CData JDBC Driver for Apache Kafka への接続を設定します。
- 「General」をクリックします。
- Connection name を設定します(例:Kafka Connection)。
- Connection type を「Generic database」に設定します。
- Access を「Native (JDBC)」に設定します。
- Custom connection URL に Kafka の接続文字列を設定します(例:
jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
)。 - Custom driver class name を「cdata.jdbc.apachekafka.ApacheKafkaDriver」に設定します。
- 接続をテストし、「OK」をクリックして保存します。
Kafka のデータパイプラインを作成
CData JDBC Driver を使用して Kafka への接続が設定されたら、新しいトランスフォーメーションまたはジョブを作成する準備が整いました。
- 「File」>>「New」>>「Transformation/job」をクリックします。
- 「Table input」オブジェクトをワークフローパネルにドラッグし、Kafka 接続を選択します。
- 「Get SQL select statement」をクリックし、Database Explorer を使用して利用可能なテーブルとビューを表示します。
- テーブルを選択し、必要に応じてデータをプレビューして確認します。
ここから、適切な同期先を選択し、レプリケーション中にデータを変更、フィルタリング、その他の処理を行うトランスフォーメーションを追加することで、トランスフォーメーションまたはジョブを続行できます。
無料トライアルと詳細情報
CData JDBC Driver for Apache Kafka の 30日間無料トライアルをダウンロードして、Pentaho Data Integration で Kafka のデータ のリアルタイムデータを今すぐ活用しましょう。