Amazon Athena のデータをApache Kafka トピックにストリーミング
Apache Kafka は、主にリアルタイムデータパイプラインやイベント駆動型アプリケーションの構築に使用されるオープンソースのストリーム処理プラットフォームです。CData JDBC Driver for Amazon Athenaと組み合わせることで、Kafka はライブのAmazon Athena のデータを扱うことができます。この記事では、Amazon Athena データをApache Kafka トピックに接続、アクセス、ストリーミングする方法と、Confluent Control Center を起動してConfluent プラットフォームのKafka インフラストラクチャを使用して受信したAmazon Athena のデータをユーザーが安全に管理および監視できるようにする方法について説明します。
CData JDBC Driver は最適化されたデータ処理機能を内蔵しており、ライブのAmazon Athena のデータとのやり取りにおいて比類のないパフォーマンスを提供します。Amazon Athena に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接Amazon Athena にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)については組み込みのSQL エンジンを使用してクライアント側で処理します。組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してAmazon Athena のデータを操作および分析できます。
Amazon Athena データ連携について
CData は、Amazon Athena のライブデータにアクセスし、統合するための最も簡単な方法を提供します。お客様は CData の接続機能を以下の目的で使用しています:
- IAM 認証情報、アクセスキー、インスタンスプロファイルなど、さまざまな方法で安全に認証できます。多様なセキュリティニーズに対応し、認証プロセスを簡素化します。
- 詳細なエラーメッセージにより、セットアップを効率化し、問題を迅速に解決できます。
- サーバーサイドでのクエリ実行により、パフォーマンスを向上させ、クライアントリソースへの負荷を最小限に抑えます。
ユーザーは、Tableau、Power BI、Excel などの分析ツールと Athena を統合し、お気に入りのツールから詳細な分析を行うことができます。
CData を使用した Amazon Athena のユニークなユースケースについては、ブログ記事をご覧ください:https://jp.cdata.com/blog/amazon-athena-use-cases
はじめに
前提条件
Apache Kafka トピックでAmazon Athena のデータをストリーミングするためにCData JDBC Driver を接続する前に、クライアントのLinux ベースのシステムに以下をインストールおよび設定してください。
- Confluent Platform for Apache Kafka
- Confluent Hub CLI のインストール
- Confluent Platform 用のSelf-Managed Kafka JDBC Source Connector
Amazon Athena のデータへの新しいJDBC 接続を定義
- Linux ベースのシステムにCData JDBC Driver for Amazon Athenaをダウンロードします。
- 以下の手順に従って新しいディレクトリを作成し、すべてのドライバーの内容を展開します:
- Amazon Athena という名前の新しいディレクトリを作成します。
mkdir AmazonAthena
- ダウンロードしたドライバーファイル(.zip)をこの新しいディレクトリに移動します。
mv AmazonAthenaJDBCDriver.zip AmazonAthena/
- CData AmazonAthenaJDBCDriver の内容をこの新しいディレクトリに解凍します。
unzip AmazonAthenaJDBCDriver.zip
- Amazon Athena という名前の新しいディレクトリを作成します。
- Amazon Athena ディレクトリを開き、lib フォルダに移動します。
ls cd lib/
- CData JDBC Driver for Amazon Athena のlib フォルダの内容をKafka Connect JDBC のlib フォルダにコピーします。Kafka Connect JDBC フォルダの内容を確認し、cdata.jdbc.amazonathena.jar ファイルがlib フォルダに正常にコピーされたことを確認します。
cp -r /path/to/CData JDBC Driver for Amazon Athena/lib/* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
- 以下のコマンドを使用して、CData Amazon Athena JDBC ドライバーのライセンスをインストールします。名前とメールアドレスを入力してください。
java -jar cdata.jdbc.amazonathena.jar -l
- プロダクトキーまたは"TRIAL" を入力します(ライセンスの有効期限が切れた場合は、CData サポートチームまでお問い合わせください)。
- 以下のコマンドを使用してConfluent ローカルサービスを起動します:
confluent local services start
これにより、Zookeeper、Kafka、Schema Registry、Kafka REST、Kafka CONNECT、ksqlDB、Control Center などのすべてのConfluent サービスが起動します。これで、CData JDBC Driver for Amazon Athena を使用してKafka Connect Driver 経由でksqlDB のKafka トピックにメッセージをストリーミングする準備が整いました。
- POST HTTP API リクエストを使用してKafka トピックを手動で作成します:
curl --location 'server_address:8083/connectors' --header 'Content-Type: application/json' --data '{ "name": "jdbc_source_cdata_amazonathena_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:amazonathena:AccessKey='a123';SecretKey='s123';Region='IRELAND';Database='sampledb';S3StagingDirectory='s3://bucket/staging/';", "topic.prefix": "amazonathena-01-", "mode": "bulk" } }'HTTP POST 本文(上記)で使用されるフィールドについて説明します:
- connector.class: 使用するKafka Connect コネクタのJava クラスを指定します。
- connection.url: Amazon Athena データに接続するためのJDBC 接続URL です。
組み込みの接続文字列デザイナー
JDBC URL の作成については、CData JDBC Driver for Amazon Athenaに組み込まれた接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。
java -jar cdata.jdbc.amazonathena.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
Amazon Athena 接続プロパティの取得・設定方法
それでは、早速Athena に接続していきましょう。
データに接続するには、以下の接続パラメータを指定します。
- DataSource:接続するAmazon Athena データソース。
- Database:接続するAmazon Athena データベース。
- AWSRegion:Amazon Athena データがホストされているリージョン。
- S3StagingDirectory:クエリの結果を保存するS3 フォルダ。
Database またはDataSource が設定されていない場合、CData 製品はAmazon Athena の利用可能なデータソースからすべてのデータベースのリスト化を試みます。そのため、両方のプロパティを設定することでCData 製品のパフォーマンスが向上します。
Amazon Athena の認証設定
CData 製品は幅広い認証オプションに対応しています。詳しくはヘルプドキュメントの「はじめに」を参照してみてください。
AWS キーを取得
IAM ユーザーの認証情報を取得するには、以下のステップお試しください。
- IAM コンソールにサインインします。
- ナビゲーションペインでユーザーを選択します。
- ユーザーのアクセスキーを作成または管理するには、ユーザーを選択してからセキュリティ認証情報タブに移動します。
AWS ルートアカウントの資格情報を取得するには、以下のステップをお試しください。
- ルートアカウントの認証情報を使用してAWS 管理コンソールにサインインします。
- アカウント名または番号を選択します。
- 表示されたメニューでMy Security Credentials を選択します。
- ルートアカウントのアクセスキーを管理または作成するには、Continue to Security Credentials をクリックし、[Access Keys]セクションを展開します。
その他の認証オプションについては、ヘルプドキュメントの「Amazon Athena への認証」を参照してください。
- topic.prefix: コネクタによって作成されるKafka トピックに追加されるプレフィックスです。「amazonathena-01-」に設定されています。
- mode: コネクタの動作モードを指定します。ここでは「bulk」に設定されており、コネクタがバルクデータ転送を実行するように設定されていることを示しています。
このリクエストにより、Amazon Athena のすべてのテーブル/コンテンツがKafka トピックとして追加されます。
注意: リクエストをPOST するIP アドレス(サーバー)は、Linux ネットワークのIP アドレスです。
- ksqlDB を実行し、トピックを一覧表示します。以下のコマンドを使用します:
ksql list topics;
- トピック内のデータを表示するには、以下のSQL ステートメントを入力します:
PRINT topic FROM BEGINNING;
Confluent Control Center への接続
Confluent Control Center のユーザーインターフェースにアクセスするには、上記のセクションで説明した"confluent local services" を実行し、ローカルブラウザでhttp://<server address>:9021/clusters/ と入力してください。
おわりに
CData JDBC Driver for Amazon Athenaの30日間無償トライアルをダウンロードして、Amazon Athena データをApache Kafka にストリーミングしましょう。ご不明な点があれば、サポートチームまでお問い合わせください。