Databricks(AWS)でKafka のデータを処理・分析
Databricks は、Apache Spark を通じたデータ処理機能を提供するクラウドベースのサービスです。CData JDBC Driver と組み合わせることで、Databricks を使用してリアルタイムKafka のデータに対してデータエンジニアリングとデータサイエンスを実行できます。この記事では、AWS でCData JDBC Driver をホストし、Databricks でリアルタイムKafka のデータに接続して処理する方法を説明します。
最適化されたデータ処理が組み込まれたCData JDBC Driver は、リアルタイムKafka のデータを扱う上で比類のないパフォーマンスを提供します。Kafka に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作をKafka に直接プッシュし、サポートされていない操作(主にSQL 関数やJOIN 操作)は組み込みSQL エンジンを利用してクライアント側で処理します。組み込みの動的メタデータクエリを使用すると、ネイティブデータ型を使ってKafka のデータを操作・分析できます。
CData JDBC Driver をDatabricks にインストール
Databricks でリアルタイムKafka のデータを操作するには、Databricks クラスターにドライバーをインストールします。
- Databricks の管理画面に移動し、対象のクラスターを選択します。
- Libraries タブで「Install New」をクリックします。
- Library Source として「Upload」を選択し、Library Type として「Jar」を選択します。
- インストール場所(通常はC:\Program Files\CData[product_name]\lib)からJDBC JAR ファイル(cdata.jdbc.apachekafka.jar)をアップロードします。
ノートブックでKafka のデータにアクセス:Python
JAR ファイルをインストールしたら、Databricks でリアルタイムKafka のデータを操作する準備が整いました。ワークスペースに新しいノートブックを作成します。ノートブックに名前を付け、言語としてPython を選択し(Scala も利用可能)、JDBC ドライバーをインストールしたクラスターを選択します。ノートブックが起動したら、接続を設定し、Kafka をクエリして、基本的なレポートを作成できます。
Kafka への接続を設定
JDBC Driver クラスを参照し、JDBC URL で使用する接続文字列を構築してKafka に接続します。また、JDBC URL でRTK プロパティを設定する必要があります(Beta ドライバーを使用している場合を除く)。このプロパティの設定方法については、インストールに含まれるライセンスファイルを参照してください。
ステップ1:接続情報
driver = "cdata.jdbc.apachekafka.ApacheKafkaDriver" url = "jdbc:apachekafka:RTK=5246...;User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;"
組み込みの接続文字列デザイナー
JDBC URL の作成をサポートするために、Kafka JDBC Driver に組み込まれている接続文字列デザイナーが使用できます。JAR ファイルをダブルクリックするか、コマンドラインからJAR ファイルを実行します。
java -jar cdata.jdbc.apachekafka.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
Apache Kafka 接続プロパティの取得・設定方法
それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。
Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。
デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:
- UseSSL をtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
- SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします
Apache Kafka への認証
続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:
- Anonymous
- Plain
- SCRAM ログインモジュール
- SSL クライアント証明書
- Kerberos
Anonymous 認証
Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。
匿名認証を行うには、以下のプロパティを設定してください。
- AuthScheme:None
その他の認証方法については、ヘルプドキュメントをご確認ください。
Kafka のデータをロード
接続を設定したら、CData JDBC Driver と接続情報を使用して、Kafka のデータをDataFrame としてロードできます。
ステップ2:データの読み取り
remote_table = spark.read.format ( "jdbc" ) \ .option ( "driver" , driver) \ .option ( "url" , url) \ .option ( "dbtable" , "SampleTable_1") \ .load ()
Kafka のデータを表示
ロードしたKafka のデータをdisplay 関数を呼び出して確認します。
ステップ3:結果の確認
display (remote_table.select ("Id"))
Databricks でKafka のデータを分析
Databricks SparkSQL でデータを処理するには、ロードしたデータをTemp View として登録します。
ステップ4:ビューまたはテーブルを作成
remote_table.createOrReplaceTempView ( "SAMPLE_VIEW" )
Temp View を作成したら、SparkSQL を使用してKafka のデータをレポート、ビジュアライゼーション、分析用に取得できます。
% sql SELECT Id, Column1 FROM SAMPLE_VIEW ORDER BY Column1 DESC LIMIT 5
Kafka からのデータは、対象のノートブックでのみ利用可能です。他のユーザーと共有したい場合は、テーブルとして保存します。
remote_table.write.format ( "parquet" ) .saveAsTable ( "SAMPLE_TABLE" )
CData JDBC Driver for Apache Kafka の30日間無償トライアルをダウンロードして、Databricks でリアルタイムKafka のデータの操作をはじめましょう。ご不明な点があれば、サポートチームにお問い合わせください。