Kafka のデータ のPostgreSQL インターフェースを作成

加藤龍彦
加藤龍彦
デジタルマーケティング
Kafka JDBC Driver のリモート機能を使用し、データアクセス用のPostgreSQL エントリポイントを作成します。

PostgreSQL には多くの対応クライアントがあります。標準のドライバーからBI、アナリティクスツールまで、PostgreSQL はデータ接続の人気のインターフェースです。JDBC ドライバーを使用することで、簡単に任意の標準クライアントから接続できるPostgreSQL エントリポイントを作成できます。

Kafka にPostgreSQL データベースとしてアクセスするには、CData JDBC Driver for ApacheKafka とJDBC foreign data wrapper (FDW) を使用します。この記事ではFDW をコンパイルしてインストールし、PostgreSQL サーバーからKafka にクエリを実行します。

JDBC データソースとしてKafka のデータに接続する

JDBC データソースとしてKafka に接続するには、以下が必要です。

  • Driver のJAR パス:JAR ファイルは、インストールディレクトリのlib サブフォルダにあります。
  • Driver クラス

    cdata.jdbc.apachekafka.ApacheKafkaDriver
    
  • JDBC URL: URL は、"jdbc:apachekafka:" で始まり、セミコロンで区切られた名前と値の組み合わせで任意の接続プロパティを含めることができます。

    Apache Kafka 接続プロパティの取得・設定方法

    それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。

    Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

    デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:

    1. UseSSLtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
    2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします

    Apache Kafka への認証

    続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:

    • Anonymous
    • Plain
    • SCRAM ログインモジュール
    • SSL クライアント証明書
    • Kerberos

    Anonymous 認証

    Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。

    匿名認証を行うには、以下のプロパティを設定してください。

    • AuthSchemeNone

    その他の認証方法については、ヘルプドキュメントをご確認ください。

    ビルトイン接続文字列デザイナ

    JDBC URL の構成については、Kafka JDBC Driver に組み込まれている接続文字列デザイナを使用できます。JAR ファイルのダブルクリック、またはコマンドラインからJAR ファイルを実行します。

    java -jar cdata.jdbc.apachekafka.jar
    

    接続プロパティを入力し、接続文字列をクリップボードにコピーします。

    Using the built-in connection string designer to generate a JDBC URL (Salesforce is shown.)

    以下は一般的なJDBC URL です。

    jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
    

JDBC FDW を構築する

FDW は、PostgreSQL を再コンパイルせずに、PostgreSQL の拡張機能としてインストールできます。例としてjdbc2_fdw 拡張子を使用します。

  1. ご使用のバージョンのJRE 共有オブジェクトから、/usr/lib/libjvm.so にシンボリックリンクを追加します。コマンド例:
    ln -s /usr/lib/jvm/java-6-openjdk/jre/lib/amd64/server/libjvm.so /usr/lib/libjvm.so
    
  2. ビルドするには、以下のコマンドを実行してください。
    make install USE_PGXS=1
    

Kafka のデータをPostgreSQL データベースとしてクエリする

拡張機能をインストールした後、以下のステップに従ってKafka へのクエリの実行を開始します。

  1. データベースにログイン
  2. データベースの拡張機能をロード
    CREATE EXTENSION jdbc2_fdw;
    
  3. Kafka のオブジェクトを作成
    CREATE SERVER ApacheKafka
    FOREIGN DATA WRAPPER jdbc2_fdw OPTIONS (
    drivername 'cdata.jdbc.apachekafka.ApacheKafkaDriver',
    url 'jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;',
    querytimeout '15',
    jarfile '/home/MyUser/CData/CData\ JDBC\ Driver\ for\ Salesforce MyDriverEdition/lib/cdata.jdbc.apachekafka.jar');
    
  4. PostgreSQL デーモンに認識されているユーザーのユーザー名とパスワードのユーザーマッピングを作成
    CREATE USER MAPPING for postgres SERVER ApacheKafka OPTIONS (
    username 'admin',
    password 'test');
    
  5. ローカルデータベースに外部テーブルを作成
    postgres=# CREATE FOREIGN TABLE sampletable_1 (
    sampletable_1_id text,
    sampletable_1_Id text,
    sampletable_1_Column1 numeric)
    SERVER ApacheKafka OPTIONS (
    table_name 'sampletable_1');
    
Kafka に対して 読み取り/書き込みコマンドを実行可能にする
postgres=# SELECT * FROM sampletable_1;

おわりに

このようにCData JDBC Driver for ApacheKafka を使って簡単にKafka のデータを取得して検索対象にすることができました。ぜひ、30日の無償評価版 をお試しください。

はじめる準備はできましたか?

Apache Kafka Driver の無料トライアルをダウンロードしてお試しください:

 ダウンロード

詳細:

Apache Kafka Icon Apache Kafka JDBC Driver お問い合わせ

Apache Kafka データに連携するJava アプリケーションを素早く、簡単に開発できる便利なドライバー。