Java で Kafka のデータ エンティティのオブジェクト リレーショナル マッピング(ORM)を行う
Hibernate を使用して、オブジェクト指向のドメインモデルを従来のリレーショナルデータベースにマッピングできます。以下のチュートリアルでは、CData JDBC Driver for Kafka を使用して Hibernate で Kafka リポジトリの ORM を生成する方法を説明します。
本記事では IDE として Eclipse を使用していますが、CData JDBC Driver for Kafka は Java ランタイム環境をサポートするあらゆる製品で使用できます。Knowledge Base には、IntelliJ IDEA や NetBeans から Kafka のデータ に接続するためのチュートリアルも用意しています。
Hibernate のインストール
以下の手順に従って、Eclipse に Hibernate プラグインをインストールします。
- Eclipse で、Help -> Install New Software を選択します。
- Work With ボックスに「http://download.jboss.org/jbosstools/neon/stable/updates/」を入力します。
- フィルタボックスに「Hibernate」と入力します。
- Hibernate Tools を選択します。
新規プロジェクトの作成
以下の手順に従って、新しいプロジェクトにドライバー JAR を追加します。
- 新規プロジェクトを作成します。プロジェクトタイプとして Java Project を選択し、Next をクリックします。プロジェクト名を入力して Finish をクリックします。
- プロジェクトを右クリックして Properties をクリックします。Java Build Path をクリックし、Libraries タブを開きます。
- Add External JARs をクリックして、インストールディレクトリの lib サブフォルダにある cdata.jdbc.apachekafka.jar ライブラリを追加します。
Hibernate 設定ファイルの追加
以下の手順に従って、Kafka のデータ への接続プロパティを設定します。
- 新規プロジェクトを右クリックし、New -> Hibernate -> Hibernate Configuration File (cfg.xml) を選択します。
- src を親フォルダとして選択し、Next をクリックします。
以下の値を入力します:
- Hibernate version:: 5.2
- Database dialect: Derby
- Driver class: cdata.jdbc.apachekafka.ApacheKafkaDriver
Connection URL: JDBC URL です。jdbc:apachekafka: から始まり、セミコロン区切りの接続プロパティが続きます。
Apache Kafka 接続プロパティの取得・設定方法
それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:
- UseSSL をtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします
Apache Kafka への認証
続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous 認証
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、以下のプロパティを設定してください。
- AuthScheme:None
その他の認証方法については、ヘルプドキュメントをご確認ください。
組み込みの接続文字列デザイナー
JDBC URL の構築には、Kafka JDBC Driver に組み込まれている接続文字列デザイナーを使用できます。JAR ファイルをダブルクリックするか、コマンドラインから jar ファイルを実行してください。
java -jar cdata.jdbc.apachekafka.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
一般的な JDBC URL は以下のとおりです:
jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
Hibernate から Kafka のデータ に接続
以下の手順に従って、前のステップで作成した設定を選択します。
- Hibernate Configurations パースペクティブに切り替えます: Window -> Open Perspective -> Hibernate。
- Hibernate Configurations パネルを右クリックし、Add Configuration をクリックします。
- Hibernate version を 5.2 に設定します。
- Browse ボタンをクリックし、プロジェクトを選択します。
- Configuration file フィールドで、Setup -> Use Existing をクリックし、hibernate.cfg.xml ファイルの場所(このデモでは src フォルダ内)を選択します。
- Classpath タブで、User Entries の下に何もない場合は、Add External JARS をクリックしてドライバー jar を再度追加します。設定が完了したら OK をクリックします。
- 新しく作成した Hibernate 設定ファイルの Database ノードを展開します。
Kafka のデータ のリバースエンジニアリング
以下の手順に従って、reveng.xml 設定ファイルを生成します。オブジェクトとしてアクセスするテーブルを指定します。
- Package Explorer に戻ります。
- プロジェクトを右クリックし、New -> Hibernate -> Hibernate Reverse Engineering File (reveng.xml) を選択します。Next をクリックします。
- src を親フォルダとして選択し、Next をクリックします。
- Console configuration ドロップダウンメニューで、上記で作成した Hibernate 設定ファイルを選択し、Refresh をクリックします。
- ノードを展開し、リバースエンジニアリングするテーブルを選択します。完了したら Finish をクリックします。
Hibernate の実行設定
以下の手順に従って、Kafka テーブルの POJO(Plain Old Java Object)を生成します。
- メニューバーから、Run -> Hibernate Code Generation -> Hibernate Code Generation Configurations をクリックします。
- Console configuration ドロップダウンメニューで、前のセクションで作成した Hibernate 設定ファイルを選択します。Output directory の横にある Browse をクリックし、src を選択します。
- Reverse Engineer from JDBC Connection チェックボックスを有効にします。Setup ボタンをクリックし、Use Existing をクリックして、hibernate.reveng.xml ファイルの場所(このデモでは src フォルダ内)を選択します。
- Exporters タブで、Domain code (.java) と Hibernate XML Mappings (hbm.xml) をチェックします。
- Run をクリックします。
前のステップのリバースエンジニアリング設定に基づいて、1 つ以上の POJO が作成されます。
マッピングタグの挿入
生成した各マッピングに対して、hibernate.cfg.xml にマッピングタグを作成し、Hibernate がマッピングリソースを参照できるようにする必要があります。hibernate.cfg.xml を開き、以下のようにマッピングタグを挿入します:
cdata.apachekafka.ApacheKafkaDriver
jdbc:apachekafka:User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;
org.hibernate.dialect.SQLServerDialect
SQL の実行
前のステップで作成したエンティティを使用して、Kafka のデータ のデータの検索と変更が可能になります:
import java.util.*;
import org.hibernate.Session;
import org.hibernate.cfg.Configuration;
import org.hibernate.query.Query;
public class App {
public static void main(final String[] args) {
Session session = new
Configuration().configure().buildSessionFactory().openSession();
String SELECT = "FROM SampleTable_1 S WHERE Column2 = :Column2";
Query q = session.createQuery(SELECT, SampleTable_1.class);
q.setParameter("Column2","100");
List<SampleTable_1> resultList = (List<SampleTable_1>) q.list();
for(SampleTable_1 s: resultList){
System.out.println(s.getId());
System.out.println(s.getColumn1());
}
}
}