Salesforce Data Cloud のデータをApache Kafka トピックにストリーミング
Apache Kafka は、主にリアルタイムデータパイプラインやイベント駆動型アプリケーションの構築に使用されるオープンソースのストリーム処理プラットフォームです。CData JDBC Driver for Salesforce Data Cloudと組み合わせることで、Kafka はライブのSalesforce Data Cloud のデータを扱うことができます。この記事では、Salesforce Data Cloud データをApache Kafka トピックに接続、アクセス、ストリーミングする方法と、Confluent Control Center を起動してConfluent プラットフォームのKafka インフラストラクチャを使用して受信したSalesforce Data Cloud のデータをユーザーが安全に管理および監視できるようにする方法について説明します。
CData JDBC Driver は最適化されたデータ処理機能を内蔵しており、ライブのSalesforce Data Cloud のデータとのやり取りにおいて比類のないパフォーマンスを提供します。Salesforce Data Cloud に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接Salesforce Data Cloud にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)については組み込みのSQL エンジンを使用してクライアント側で処理します。組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してSalesforce Data Cloud のデータを操作および分析できます。
前提条件
Apache Kafka トピックでSalesforce Data Cloud のデータをストリーミングするためにCData JDBC Driver を接続する前に、クライアントのLinux ベースのシステムに以下をインストールおよび設定してください。
- Confluent Platform for Apache Kafka
- Confluent Hub CLI のインストール
- Confluent Platform 用のSelf-Managed Kafka JDBC Source Connector
Salesforce Data Cloud のデータへの新しいJDBC 接続を定義
- Linux ベースのシステムにCData JDBC Driver for Salesforce Data Cloudをダウンロードします。
- 以下の手順に従って新しいディレクトリを作成し、すべてのドライバーの内容を展開します:
- Salesforce Data Cloud という名前の新しいディレクトリを作成します。
mkdir SalesforceDataCloud
- ダウンロードしたドライバーファイル(.zip)をこの新しいディレクトリに移動します。
mv SalesforceDataCloudJDBCDriver.zip SalesforceDataCloud/
- CData SalesforceDataCloudJDBCDriver の内容をこの新しいディレクトリに解凍します。
unzip SalesforceDataCloudJDBCDriver.zip
- Salesforce Data Cloud という名前の新しいディレクトリを作成します。
- Salesforce Data Cloud ディレクトリを開き、lib フォルダに移動します。
ls cd lib/
- CData JDBC Driver for Salesforce Data Cloud のlib フォルダの内容をKafka Connect JDBC のlib フォルダにコピーします。Kafka Connect JDBC フォルダの内容を確認し、cdata.jdbc.salesforcedatacloud.jar ファイルがlib フォルダに正常にコピーされたことを確認します。
cp -r /path/to/CData JDBC Driver for Salesforce Data Cloud/lib/* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/ cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
- 以下のコマンドを使用して、CData Salesforce Data Cloud JDBC ドライバーのライセンスをインストールします。名前とメールアドレスを入力してください。
java -jar cdata.jdbc.salesforcedatacloud.jar -l
- プロダクトキーまたは"TRIAL" を入力します(ライセンスの有効期限が切れた場合は、CData サポートチームまでお問い合わせください)。
- 以下のコマンドを使用してConfluent ローカルサービスを起動します:
confluent local services start
これにより、Zookeeper、Kafka、Schema Registry、Kafka REST、Kafka CONNECT、ksqlDB、Control Center などのすべてのConfluent サービスが起動します。これで、CData JDBC Driver for Salesforce Data Cloud を使用してKafka Connect Driver 経由でksqlDB のKafka トピックにメッセージをストリーミングする準備が整いました。
- POST HTTP API リクエストを使用してKafka トピックを手動で作成します:
curl --location 'server_address:8083/connectors' --header 'Content-Type: application/json' --data '{ "name": "jdbc_source_cdata_salesforcedatacloud_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:salesforcedatacloud:", "topic.prefix": "salesforcedatacloud-01-", "mode": "bulk" } }'HTTP POST 本文(上記)で使用されるフィールドについて説明します:
- connector.class: 使用するKafka Connect コネクタのJava クラスを指定します。
- connection.url: Salesforce Data Cloud データに接続するためのJDBC 接続URL です。
組み込みの接続文字列デザイナー
JDBC URL の作成については、CData JDBC Driver for Salesforce Data Cloudに組み込まれた接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。
java -jar cdata.jdbc.salesforcedatacloud.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
それでは、Salesforce Data Cloud への認証方法を設定していきましょう。Salesforce Data Cloud では、OAuth 標準による認証をサポートしています。
OAuth 認証
AuthScheme をOAuth に設定してください。
デスクトップアプリケーション
CData 製品では、デスクトップでの認証を簡略化する埋め込みOAuth アプリケーションを提供しています。
また、Salesforce Data Cloud コンソールで設定および登録するカスタムOAuth アプリケーションを介してデスクトップから認証することも可能です。詳しくは、ヘルプドキュメントのカスタムOAuth アプリの作成をご確認ください。
接続する前に、以下のプロパティを設定してください。
- InitiateOAuth:GETANDREFRESH。InitiateOAuth を使用すれば、繰り返しOAuth の交換を行ったり、手動でOAuthAccessToken を設定する必要がなくなります
- OAuthClientId(カスタムアプリケーションのみ):カスタムOAuth アプリケーションの登録時に割り当てられたクライアントID
- OAuthClientSecret(カスタムアプリケーションのみ):カスタムOAuth アプリケーションの登録時に割り当てられたクライアントシークレット
接続すると、CData 製品がデフォルトブラウザでSalesforce Data Cloud のOAuth エンドポイントを開きます。ログインして、アプリケーションにアクセス許可を与えてください。
以下のようにドライバーがOAuth プロセスを完了します。
- コールバックURL からアクセストークンを取得します
- 古いトークンの期限が切れた際は、新しいアクセストークンを取得します
- OAuthSettingsLocation にOAuth 値を保存し、接続間で永続化します
Web アプリケーションやヘッドレスマシンを含むその他のOAuth 認証方法については、ヘルプドキュメントをご確認ください。
- topic.prefix: コネクタによって作成されるKafka トピックに追加されるプレフィックスです。「salesforcedatacloud-01-」に設定されています。
- mode: コネクタの動作モードを指定します。ここでは「bulk」に設定されており、コネクタがバルクデータ転送を実行するように設定されていることを示しています。
このリクエストにより、Salesforce Data Cloud のすべてのテーブル/コンテンツがKafka トピックとして追加されます。
注意: リクエストをPOST するIP アドレス(サーバー)は、Linux ネットワークのIP アドレスです。
- ksqlDB を実行し、トピックを一覧表示します。以下のコマンドを使用します:
ksql list topics;
- トピック内のデータを表示するには、以下のSQL ステートメントを入力します:
PRINT topic FROM BEGINNING;
Confluent Control Center への接続
Confluent Control Center のユーザーインターフェースにアクセスするには、上記のセクションで説明した"confluent local services" を実行し、ローカルブラウザでhttp://<server address>:9021/clusters/ と入力してください。
おわりに
CData JDBC Driver for Salesforce Data Cloudの30日間無償トライアルをダウンロードして、Salesforce Data Cloud データをApache Kafka にストリーミングしましょう。ご不明な点があれば、サポートチームまでお問い合わせください。