Apache NiFi で Kafka に接続
Apache NiFi は、強力かつスケーラブルなデータルーティング、変換、システム間連携ロジックの有向グラフをサポートしています。CData JDBC Driver for Apache Kafka と組み合わせることで、NiFi からリアルタイムのKafka のデータ を操作できます。この記事では、Apache NiFi Flow からKafka のデータ に接続してクエリを実行する方法を説明します。
CData JDBC Driver は、最適化されたデータ処理機能が組み込まれており、リアルタイムのKafka のデータ とのやり取りにおいて比類のないパフォーマンスを提供します。複雑なSQL クエリをKafka に発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接Kafka にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)は組み込みのSQL エンジンでクライアント側に処理します。また、組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してKafka のデータ を操作・分析できます。
Apache NiFi でKafka のデータ に接続
- CData JDBC Driver for Apache Kafka のインストーラーをダウンロードし、パッケージを解凍して、.exe ファイルを実行してドライバーをインストールします。
CData JDBC Driver のJAR ファイル(およびライセンスファイルがある場合はそれも)、cdata.jdbc.apachekafka.jar(および cdata.jdbc.apachekafka.lic)を Apache NiFi の lib サブフォルダにコピーします(例:C:\nifi-1.3.0-bin\nifi-1.3.0\lib)。
Windows では、CData JDBC Driver のデフォルトのインストール場所は C:\Program Files\CData\CData JDBC Driver for Apache Kafka です。
bin サブフォルダにある run-nifi.bat ファイルを実行して Apache NiFi を起動します(例:C:\nifi-1.3.0-bin\nifi-1.3.0\bin)。
(または)
コマンドプロンプトで対象のディレクトリに移動し、run-nifi.bat ファイルを実行します:
cd C:\nifi-1.3.0-bin\nifi-1.3.0\bin .\run-nifi.bat
Web ブラウザで Apache NiFi の UI に移動します:https://localhost:8443/nifi でアクセスできます。
注意:古いバージョンの Apache NiFi を使用している場合は、http://localhost:8080/nifi からアクセスする必要があります。以前のバージョンでは HTTP プロトコルが使用されていましたが、最新バージョンでは HTTPS が標準になっています。デフォルトでは、HTTP はポート 8080 で動作し、HTTPS はポート 8443 を使用します。
URL から Apache NiFi にアクセスすると、ログイン用のユーザー名とパスワードの入力を求められます。
ログイン資格情報を取得するには、NiFi のインストールディレクトリ内の log ディレクトリにある「App.log」ファイルを確認してください。このファイルには通常、NiFi インターフェースにアクセスするために必要な情報が含まれています。
- NiFi Flow のワークスペースを右クリックし、「Controller Services」をクリックします。
- ボタンをクリックして、新しいコントローラーサービスを作成します。
- Controller Services セクションで、新しく作成した「DBCPConnection Pool」を見つけ、メニュー()から「Edit」をクリックして新しい接続を設定します。
以下のプロパティを入力します:
- Database Connection URL:jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
- Database Driver Class Name:cdata.jdbc.apachekafka.ApacheKafkaDriver
- Database Driver Location(s):Apache NiFi の lib フォルダへのパス(JAR ファイルが配置されている場所)。
組み込みの接続文字列デザイナー
JDBC URL の構築には、Kafka JDBC Driver に組み込まれている接続文字列デザイナーを使用できます。JAR ファイルをダブルクリックするか、コマンドラインから JAR ファイルを実行してください。
java -jar cdata.jdbc.apachekafka.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
Apache Kafka 接続プロパティの取得・設定方法
それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:
- UseSSL をtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします
Apache Kafka への認証
続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous 認証
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、以下のプロパティを設定してください。
- AuthScheme:None
その他の認証方法については、ヘルプドキュメントをご確認ください。
- Controller Services セクションで、新しく作成した DBCPConnection Pool を見つけ、メニュー()から「Enable」をクリックして新しい接続を有効にします。
- 「Enable Controller Service」ウィンドウで、Scope を「Service and referencing components」に設定します。
- 接続を確立し、SELECT クエリを実行するには、プロセッサー(黄色でハイライトされている部分)をワークスペースにドラッグ&ドロップします。
- 「ExecuteSQL」プロセッサーを選択し、「Add」ボタンをクリックしてワークスペースに表示させます。
- 追加したプロセッサー(ExecuteSQL)をダブルクリックして、接続ページを開きます。
- Properties セクションで、必要な情報を入力します。Database Connection Pooling Service を作成した DBCPConnectionPool に一致させ、SQL select query セクションに実行したい SQL クエリを設定してください。
- Relationships に移動し、実行プロセスの成功時と失敗時にコンポーネントがどのように処理を進めるかのオプションを選択してください。
- ExecuteSQL コンポーネントを有効にするには、それを選択して Operation セクションの「Enable」をクリックするか、右クリックして「Enable」を選択します。
これでKafka のデータ が Apache NiFi で使用できるようになりました。たとえば、DBCPConnection Pool を QueryDatabaseTable プロセッサーのソースとして使用できます(以下に表示)。
30日間の無料トライアルをダウンロードして、Apache NiFi でリアルタイムのKafka のデータ を操作してみてください。ご不明な点は、サポートチームまでお気軽にお問い合わせください。