Mule アプリケーションからKafka のデータにアクセス:CData JDBC Driver
CData JDBC Driver for ApacheKafka はKafka のデータをMule アプリケーションと連携することで、読み、書き、更新、削除といった機能をおなじみのSQL クエリを使って実現します。JDBC ドライバーを使えば、Kafka のデータをバックアップ、変換、レポート作成、分析するMule アプリケーションをユーザーは簡単に作成できます。
本記事では、Mule プロジェクト内でCData JDBC Driver for ApacheKafka を使用して、Kafka のデータのWeb インターフェースを作成する方法を紹介します。作成したアプリケーションを使えば、HTTP 経由でKafka のデータをリクエストして、JSON 形式で結果を取得できます。まったく同様の手順で、すべてのCData JDBC ドライバで250 を超えるデータソースのWeb インターフェースを作成できます。手順は以下のとおりです。
- Anypoint Studio で新しいMule プロジェクトを作る。
- Message Flow にHTTP コネクタを追加する。
- HTTP コネクタのアドレスを設定する。

- HTTP コネクタの追加後、Database Select コネクタを同じフローに追加する。
- データベースへの新しい接続を作成し(または既存の接続を編集し)、プロパティを設定する。
- 接続を「Generic Connection」に設定
- Required Libraries セクションでCData JDBC ドライバのJAR ファイルを指定する(例:cdata.jdbc.apachekafka.jar)。
- Kafka の接続文字列にURL を指定
Apache Kafka 接続プロパティの取得・設定方法
それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:
- UseSSL をtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします
Apache Kafka への認証
続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous 認証
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、以下のプロパティを設定してください。
- AuthScheme:None
その他の認証方法については、ヘルプドキュメントをご確認ください。
組み込みの接続文字列デザイナ
JDBC 用のURL の作成にサポートが必要な場合は、Kafka JDBC Driver に組み込まれた接続文字列デザイナを使用できます。JAR ファイルをダブルクリックするか、コマンドラインからJAR ファイルを実行してください。
java -jar cdata.jdbc.apachekafka.jar
接続プロパティを入力して、接続文字列をクリップボードにコピーします。
- Driver クラス名をcdata.jdbc.apachekafka.ApacheKafkaDriver に指定します。
- 「接続テスト」をクリックします。
- SQL Query Text をKafka のデータをリクエストするためのSQL クエリに設定します。例えば、
SELECT Id, Column1 FROM SampleTable_1 WHERE Column2 = '100'
。
- Transform Message コンポーネントをフローに追加します。
- Output スクリプトを次のように設定して、ペイロードをJSON に変換します。
%dw 2.0 output application/json --- payload
- Kafka のデータを閲覧するには、HTTP コネクタ用に設定したアドレスに移動します(デフォルトでは、localhost:8081):http://localhost:8081。Web ブラウザおよびJSON エンドポイントを使用可能な他のツール内で、Kafka のデータをJSON として利用できます。
これで、カスタムアプリケーションおよび他のさまざまなBI、帳票、ETL ツールからKafka のデータを(JSON データとして)扱うための簡易なWeb インターフェースを作成できました。Mule アプリケーションからお好みのデータソースにアクセスできる、JDBC Driver for ApacheKafka の30日の無償評価版のダウンロードはこちらから。