Embulk を使用して Kafka ののデータをデータベースにロードする方法

Jerod Johnson
Jerod Johnson
Senior Technology Evangelist
CData JDBC Driver とオープンソースのETL/ELT ツールであるEmbulk を使ってKafka のデータをデータベースにロードする方法を解説します。

Embulk はオープンソースのバルクデータローダーです。CData JDBC Driver for Apache Kafka と組み合わせることで、Kafka から任意の同期先にデータを簡単にロードできます。この記事では、CData JDBC Driver for Apache Kafka をEmbulk で使用してKafka のデータをMySQL データベースにロードする方法を解説します。

CData JDBC Driver は、最適化されたデータ処理機能を内蔵しており、リアルタイムのKafka のデータに対して比類のないパフォーマンスを発揮します。Kafka に対して複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接Kafka にプッシュし、サポートされていない操作(SQL 関数やJOIN 操作など)は組み込みのSQL エンジンを使用してクライアント側で処理します。

Kafka への JDBC 接続を設定

Embulk でバルクロードジョブを作成する前に、JDBC Driver のJAR ファイルのインストール場所(通常はC:\Program Files\CData\CData JDBC Driver for Apache Kafka\lib)を確認しておきます。

Embulk はJDBC 接続をサポートしているため、Kafka に簡単に接続してSQL クエリを実行できます。バルクロードジョブを作成する前に、Kafka への認証用のJDBC URL を作成します。

Apache Kafka 接続プロパティの取得・設定方法

それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。

Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:

  1. UseSSLtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
  2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします

Apache Kafka への認証

続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:

  • Anonymous
  • Plain
  • SCRAM ログインモジュール
  • SSL クライアント証明書
  • Kerberos

Anonymous 認証

Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。

匿名認証を行うには、以下のプロパティを設定してください。

  • AuthSchemeNone

その他の認証方法については、ヘルプドキュメントをご確認ください。

組み込みの接続文字列デザイナー

JDBC URL の作成には、Kafka JDBC Driver に組み込まれている接続文字列デザイナーを使用できます。JAR ファイルをダブルクリックするか、コマンドラインからJAR ファイルを実行します。

java -jar cdata.jdbc.apachekafka.jar

接続プロパティを入力し、接続文字列をクリップボードにコピーします。

以下は、Kafka への一般的なJDBC 接続文字列です。

jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;

Embulk で Kafka ののデータをロード

CData JDBC Driver をインストールしてJDBC 接続文字列を作成したら、必要なEmbulk プラグインをインストールします。

Embulk の入力・出力プラグインをインストール

  1. Embulk にJDBC 入力プラグインをインストールします。
    https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-jdbc
  2. embulk gem install embulk-input-jdbc
    
  3. この記事では、同期先データベースとしてMySQL を使用します。出力プラグインを使用して、SQL Server、PostgreSQL、またはGoogle BigQuery を同期先として選択することもできます。
    https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-mysql
    embulk gem install embulk-output-mysql
    

入力プラグインと出力プラグインをインストールしたら、Embulk を使用してKafka のデータをMySQL にロードする準備が整いました。

Kafka ののデータをロードするジョブを作成

まず、Embulk で設定ファイルを作成します。ファイル名はapachekafka-mysql.yml のようにします。

  1. 入力プラグインのオプションには、CData JDBC Driver for Apache Kafka、ドライバーJAR ファイルへのパス、ドライバークラス(例:cdata.jdbc.apachekafka.ApacheKafkaDriver)、および上記のJDBC URL を指定します。
  2. 出力プラグインのオプションには、MySQL データベースの値と認証情報を指定します。

設定ファイルのサンプル(apachekafka-mysql.yml)

in:
	type: jdbc
	driver_path: C:\Program Files\CData[product_name] 20xx\lib\cdata.jdbc.apachekafka.jar
	driver_class: cdata.jdbc.apachekafka.ApacheKafkaDriver
	url: jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
	table: "SampleTable_1"
out:
	type: mysql
	host: localhost
	database: DatabaseName
	user: UserId
	password: UserPassword
	table: "SampleTable_1"
	mode: insert

ファイルを作成したら、Embulk ジョブを実行します。

embulk run apachekafka-mysql.yml

Embulk ジョブを実行すると、MySQL テーブルにKafka のデータが格納されます。

フィルタリングした Kafka ののデータをロード

テーブルから直接データをロードするだけでなく、カスタムSQL クエリを使用してロードするデータをより詳細に制御できます。また、クエリフィールドのSQL WHERE 句で最終更新カラムを設定することで、増分ロードを実行することもできます。

in:
	type: jdbc
	driver_path: C:\Program Files\CData[product_name] 20xx\lib\cdata.jdbc.apachekafka.jar
	driver_class: cdata.jdbc.apachekafka.ApacheKafkaDriver
	url: jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
	query: "SELECT Id, Column1 FROM SampleTable_1 WHERE [RecordId] = 1"
out:
	type: mysql
	host: localhost
	database: DatabaseName
	user: UserId
	password: UserPassword
	table: "SampleTable_1"
	mode: insert

詳細情報と無料トライアル

CData JDBC Driver for Apache Kafka をコネクタとして使用することで、Embulk のデータロードジョブにKafka のデータを統合できます。また、200 を超えるエンタープライズデータソース向けドライバーにより、あらゆるエンタープライズSaaS、ビッグデータ、NoSQL ソースも統合できます。30日間の無料トライアルをダウンロードして、今すぐお試しください。

はじめる準備はできましたか?

Apache Kafka Driver の無料トライアルをダウンロードしてお試しください:

 ダウンロード

詳細:

Apache Kafka Icon Apache Kafka JDBC Driver お問い合わせ

Apache Kafka データに連携するJava アプリケーションを素早く、簡単に開発できる便利なドライバー。