Databricks(AWS)でKafka のデータを処理・分析

Jerod Johnson
Jerod Johnson
Senior Technology Evangelist
CData、AWS、Databricks を使用して、リアルタイムKafka のデータに対してデータエンジニアリングとデータサイエンスを実行。

Databricks は、Apache Spark を通じたデータ処理機能を提供するクラウドベースのサービスです。CData JDBC Driver と組み合わせることで、Databricks を使用してリアルタイムKafka のデータに対してデータエンジニアリングとデータサイエンスを実行できます。この記事では、AWS でCData JDBC Driver をホストし、Databricks でリアルタイムKafka のデータに接続して処理する方法を説明します。

最適化されたデータ処理が組み込まれたCData JDBC Driver は、リアルタイムKafka のデータを扱う上で比類のないパフォーマンスを提供します。Kafka に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作をKafka に直接プッシュし、サポートされていない操作(主にSQL 関数やJOIN 操作)は組み込みSQL エンジンを利用してクライアント側で処理します。組み込みの動的メタデータクエリを使用すると、ネイティブデータ型を使ってKafka のデータを操作・分析できます。

CData JDBC Driver をDatabricks にインストール

Databricks でリアルタイムKafka のデータを操作するには、Databricks クラスターにドライバーをインストールします。

  1. Databricks の管理画面に移動し、対象のクラスターを選択します。
  2. Libraries タブで「Install New」をクリックします。
  3. Library Source として「Upload」を選択し、Library Type として「Jar」を選択します。
  4. インストール場所(通常はC:\Program Files\CData[product_name]\lib)からJDBC JAR ファイル(cdata.jdbc.apachekafka.jar)をアップロードします。

ノートブックでKafka のデータにアクセス:Python

JAR ファイルをインストールしたら、Databricks でリアルタイムKafka のデータを操作する準備が整いました。ワークスペースに新しいノートブックを作成します。ノートブックに名前を付け、言語としてPython を選択し(Scala も利用可能)、JDBC ドライバーをインストールしたクラスターを選択します。ノートブックが起動したら、接続を設定し、Kafka をクエリして、基本的なレポートを作成できます。

Kafka への接続を設定

JDBC Driver クラスを参照し、JDBC URL で使用する接続文字列を構築してKafka に接続します。また、JDBC URL でRTK プロパティを設定する必要があります(Beta ドライバーを使用している場合を除く)。このプロパティの設定方法については、インストールに含まれるライセンスファイルを参照してください。

ステップ1:接続情報

driver = "cdata.jdbc.apachekafka.ApacheKafkaDriver"
url = "jdbc:apachekafka:RTK=5246...;User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;"

組み込みの接続文字列デザイナー

JDBC URL の作成をサポートするために、Kafka JDBC Driver に組み込まれている接続文字列デザイナーが使用できます。JAR ファイルをダブルクリックするか、コマンドラインからJAR ファイルを実行します。

java -jar cdata.jdbc.apachekafka.jar

接続プロパティを入力し、接続文字列をクリップボードにコピーします。

Apache Kafka 接続プロパティの取得・設定方法

それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。

Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:

  1. UseSSLtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
  2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします

Apache Kafka への認証

続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:

  • Anonymous
  • Plain
  • SCRAM ログインモジュール
  • SSL クライアント証明書
  • Kerberos

Anonymous 認証

Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。

匿名認証を行うには、以下のプロパティを設定してください。

  • AuthSchemeNone

その他の認証方法については、ヘルプドキュメントをご確認ください。

Kafka のデータをロード

接続を設定したら、CData JDBC Driver と接続情報を使用して、Kafka のデータをDataFrame としてロードできます。

ステップ2:データの読み取り

remote_table = spark.read.format ( "jdbc" ) \
	.option ( "driver" , driver) \
	.option ( "url" , url) \
	.option ( "dbtable" , "SampleTable_1") \
	.load ()

Kafka のデータを表示

ロードしたKafka のデータをdisplay 関数を呼び出して確認します。

ステップ3:結果の確認

display (remote_table.select ("Id"))

Databricks でKafka のデータを分析

Databricks SparkSQL でデータを処理するには、ロードしたデータをTemp View として登録します。

ステップ4:ビューまたはテーブルを作成

remote_table.createOrReplaceTempView ( "SAMPLE_VIEW" )

Temp View を作成したら、SparkSQL を使用してKafka のデータをレポート、ビジュアライゼーション、分析用に取得できます。

% sql

SELECT Id, Column1 FROM SAMPLE_VIEW ORDER BY Column1 DESC LIMIT 5

Kafka からのデータは、対象のノートブックでのみ利用可能です。他のユーザーと共有したい場合は、テーブルとして保存します。

remote_table.write.format ( "parquet" ) .saveAsTable ( "SAMPLE_TABLE" )

CData JDBC Driver for Apache Kafka の30日間無償トライアルをダウンロードして、Databricks でリアルタイムKafka のデータの操作をはじめましょう。ご不明な点があれば、サポートチームにお問い合わせください。

はじめる準備はできましたか?

Apache Kafka Driver の無料トライアルをダウンロードしてお試しください:

 ダウンロード

詳細:

Apache Kafka Icon Apache Kafka JDBC Driver お問い合わせ

Apache Kafka データに連携するJava アプリケーションを素早く、簡単に開発できる便利なドライバー。