Kafka のデータ のPostgreSQL インターフェースを作成
PostgreSQL には多くの対応クライアントがあります。標準のドライバーからBI、アナリティクスツールまで、PostgreSQL はデータ接続の人気のインターフェースです。JDBC ドライバーを使用することで、簡単に任意の標準クライアントから接続できるPostgreSQL エントリポイントを作成できます。
Kafka にPostgreSQL データベースとしてアクセスするには、CData JDBC Driver for ApacheKafka とJDBC foreign data wrapper (FDW) を使用します。この記事ではFDW をコンパイルしてインストールし、PostgreSQL サーバーからKafka にクエリを実行します。
JDBC データソースとしてKafka のデータに接続する
JDBC データソースとしてKafka に接続するには、以下が必要です。
- Driver のJAR パス:JAR ファイルは、インストールディレクトリのlib サブフォルダにあります。
Driver クラス
cdata.jdbc.apachekafka.ApacheKafkaDriver
- JDBC URL:
URL は、"jdbc:apachekafka:" で始まり、セミコロンで区切られた名前と値の組み合わせで任意の接続プロパティを含めることができます。
Apache Kafka 接続プロパティの取得・設定方法
それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:
- UseSSL をtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします
Apache Kafka への認証
続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous 認証
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、以下のプロパティを設定してください。
- AuthScheme:None
その他の認証方法については、ヘルプドキュメントをご確認ください。
ビルトイン接続文字列デザイナ
JDBC URL の構成については、Kafka JDBC Driver に組み込まれている接続文字列デザイナを使用できます。JAR ファイルのダブルクリック、またはコマンドラインからJAR ファイルを実行します。
java -jar cdata.jdbc.apachekafka.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
以下は一般的なJDBC URL です。
jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
JDBC FDW を構築する
FDW は、PostgreSQL を再コンパイルせずに、PostgreSQL の拡張機能としてインストールできます。例としてjdbc2_fdw 拡張子を使用します。
- ご使用のバージョンのJRE 共有オブジェクトから、/usr/lib/libjvm.so にシンボリックリンクを追加します。コマンド例:
ln -s /usr/lib/jvm/java-6-openjdk/jre/lib/amd64/server/libjvm.so /usr/lib/libjvm.so
- ビルドするには、以下のコマンドを実行してください。
make install USE_PGXS=1
Kafka のデータをPostgreSQL データベースとしてクエリする
拡張機能をインストールした後、以下のステップに従ってKafka へのクエリの実行を開始します。
- データベースにログイン
-
データベースの拡張機能をロード
CREATE EXTENSION jdbc2_fdw;
-
Kafka のオブジェクトを作成
CREATE SERVER ApacheKafka FOREIGN DATA WRAPPER jdbc2_fdw OPTIONS ( drivername 'cdata.jdbc.apachekafka.ApacheKafkaDriver', url 'jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;', querytimeout '15', jarfile '/home/MyUser/CData/CData\ JDBC\ Driver\ for\ Salesforce MyDriverEdition/lib/cdata.jdbc.apachekafka.jar');
-
PostgreSQL デーモンに認識されているユーザーのユーザー名とパスワードのユーザーマッピングを作成
CREATE USER MAPPING for postgres SERVER ApacheKafka OPTIONS ( username 'admin', password 'test');
-
ローカルデータベースに外部テーブルを作成
postgres=# CREATE FOREIGN TABLE sampletable_1 ( sampletable_1_id text, sampletable_1_Id text, sampletable_1_Column1 numeric) SERVER ApacheKafka OPTIONS ( table_name 'sampletable_1');
postgres=# SELECT * FROM sampletable_1;
おわりに
このようにCData JDBC Driver for ApacheKafka を使って簡単にKafka のデータを取得して検索対象にすることができました。ぜひ、30日の無償評価版 をお試しください。