JSON ServicesをApache Kafka トピックにストリーミング

Dibyendu Datta
Dibyendu Datta
Lead Technology Evangelist
CData JDBC Driver とKafka Connect JDBC コネクタを使用して、Apache Kafka でJSON servicesにアクセスし、ストリーミングできます。

Apache Kafka は、主にリアルタイムデータパイプラインやイベント駆動型アプリケーションの構築に使用されるオープンソースのストリーム処理プラットフォームです。CData JDBC Driver for JSONと組み合わせることで、Kafka はライブのJSON servicesを扱うことができます。この記事では、JSON データをApache Kafka トピックに接続、アクセス、ストリーミングする方法と、Confluent Control Center を起動してConfluent プラットフォームのKafka インフラストラクチャを使用して受信したJSON servicesをユーザーが安全に管理および監視できるようにする方法について説明します。

CData JDBC Driver は最適化されたデータ処理機能を内蔵しており、ライブのJSON servicesとのやり取りにおいて比類のないパフォーマンスを提供します。JSON に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接JSON にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)については組み込みのSQL エンジンを使用してクライアント側で処理します。組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してJSON servicesを操作および分析できます。

前提条件

Apache Kafka トピックでJSON servicesをストリーミングするためにCData JDBC Driver を接続する前に、クライアントのLinux ベースのシステムに以下をインストールおよび設定してください。

  1. Confluent Platform for Apache Kafka
  2. Confluent Hub CLI のインストール
  3. Confluent Platform 用のSelf-Managed Kafka JDBC Source Connector

JSON servicesへの新しいJDBC 接続を定義

  1. Linux ベースのシステムにCData JDBC Driver for JSONをダウンロードします。
  2. 以下の手順に従って新しいディレクトリを作成し、すべてのドライバーの内容を展開します:
    1. JSON という名前の新しいディレクトリを作成します。
      		mkdir JSON
      		
    2. ダウンロードしたドライバーファイル(.zip)をこの新しいディレクトリに移動します。
      		mv JSONJDBCDriver.zip JSON/
      		
    3. CData JSONJDBCDriver の内容をこの新しいディレクトリに解凍します。
      		unzip JSONJDBCDriver.zip
      		
  3. JSON ディレクトリを開き、lib フォルダに移動します。
    ls
    cd lib/
    
  4. CData JDBC Driver for JSONlib フォルダの内容をKafka Connect JDBClib フォルダにコピーします。Kafka Connect JDBC フォルダの内容を確認し、cdata.jdbc.json.jar ファイルがlib フォルダに正常にコピーされたことを確認します。
    cp -r /path/to/CData JDBC Driver for JSON/lib/* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
    cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
    
  5. 以下のコマンドを使用して、CData JSON JDBC ドライバーのライセンスをインストールします。名前とメールアドレスを入力してください。
    	java -jar cdata.jdbc.json.jar -l
    	
  6. プロダクトキーまたは"TRIAL" を入力します(ライセンスの有効期限が切れた場合は、CData サポートチームまでお問い合わせください)。
  7. 以下のコマンドを使用してConfluent ローカルサービスを起動します:
    	confluent local services start
    	

    これにより、Zookeeper、Kafka、Schema Registry、Kafka REST、Kafka CONNECT、ksqlDB、Control Center などのすべてのConfluent サービスが起動します。これで、CData JDBC Driver for JSON を使用してKafka Connect Driver 経由でksqlDB のKafka トピックにメッセージをストリーミングする準備が整いました。

    Confluent ローカルサービスを起動
  8. POST HTTP API リクエストを使用してKafka トピックを手動で作成します:
     curl --location 'server_address:8083/connectors'
    	--header 'Content-Type: application/json'
    	--data '{
    		"name": "jdbc_source_cdata_json_01",
    		"config": {
    			"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    			"connection.url": "jdbc:json:URI=C:/people.json;DataModel=Relational;",
    		"topic.prefix": "json-01-",
    		"mode": "bulk"
    		}
    	}'
    

    HTTP POST 本文(上記)で使用されるフィールドについて説明します:

    • connector.class: 使用するKafka Connect コネクタのJava クラスを指定します。
    • connection.url: JSON データに接続するためのJDBC 接続URL です。

      組み込みの接続文字列デザイナー

      JDBC URL の作成については、CData JDBC Driver for JSONに組み込まれた接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。

      		java -jar cdata.jdbc.json.jar
      		

      接続プロパティを入力し、接続文字列をクリップボードにコピーします。

      データソースへの認証については、ヘルプドキュメントの「はじめに」を参照してください。CData 製品は、JSON API を双方向データベーステーブルとして、JSON ファイルを読み取り専用ビュー(ローカル ファイル、一般的なクラウドサービスに保存されているファイル、FTP サーバー)としてモデル化します。HTTP Basic、Digest、NTLM、OAuth、FTP などの主要な認証スキームがサポートされています。詳細はヘルプドキュメントの「はじめに」を参照してください。

      URI を設定して認証値を入力したら、DataModel を設定してデータ表現とデータ構造をより厳密に一致させます。

      DataModel プロパティは、データをどのようにテーブルに表現するかを制御するプロパティで、次の基本設定を切り替えます。

      • Document(デフォルト):JSON データのトップレベルのドキュメントビューをモデル化します。CData 製品 は、ネストされたオブジェクト配列を集約されたJSON オブジェクトとして返します。
      • FlattenedDocuments:ネストされた配列オブジェクトと親オブジェクトを、単一テーブルに暗黙的に結合します。
      • Relational:階層データから個々の関連テーブルを返します。テーブルには、親ドキュメントにリンクする主キーと外部キーが含まれています。

      リレーショナル表現の設定についての詳細は、ヘルプドキュメントの「JSON データのモデリング」を参照してください。また、以下の例で使用されているサンプルデータも確認できます。データには人や所有する車、それらの車に行われたさまざまなメンテナンスサービスのエントリが含まれています。

      Amazon S3 内のJSON への接続

      URI をバケット内のJSON ドキュメントに設定します。さらに、次のプロパティを設定して認証します。

      • AWSAccessKey:AWS アクセスキー(username)に設定。
      • AWSSecretKey:AWS シークレットキーに設定。

      Box 内のJSON への接続

      URI をJSON ファイルへのパスに設定します。Box へ認証するには、OAuth 認証標準を使います。 認証方法については、Box への接続 を参照してください。

      Dropbox 内のJSON への接続

      URI をJSON ファイルへのパスに設定します。Dropbox へ認証するには、OAuth 認証標準を使います。 認証方法については、Dropbox への接続 を参照してください。ユーザーアカウントまたはサービスアカウントで認証できます。ユーザーアカウントフローでは、以下の接続文字列で示すように、ユーザー資格情報の接続プロパティを設定する必要はありません。 URI=dropbox://folder1/file.json; InitiateOAuth=GETANDREFRESH; OAuthClientId=oauthclientid1; OAuthClientSecret=oauthcliensecret1; CallbackUrl=http://localhost:12345;

      SharePoint Online SOAP 内のJSON への接続

      URI をJSON ファイルを含むドキュメントライブラリに設定します。認証するには、User、Password、およびStorageBaseURL を設定します。

      SharePoint Online REST 内のJSON への接続

      URI をJSON ファイルを含むドキュメントライブラリに設定します。StorageBaseURL は任意です。指定しない場合、ドライバーはルートドライブで動作します。 認証するには、OAuth 認証標準を使用します。

      FTP 内のJSON への接続

      URI をJSON ファイルへのパスが付いたサーバーのアドレスに設定します。認証するには、User およびPassword を設定します。

      Google Drive 内のJSON への接続

      デスクトップアプリケーションからのGoogle への認証には、InitiateOAuth をGETANDREFRESH に設定して、接続してください。詳細はドキュメントの「Google Drive への接続」を参照してください。

      組み込みの接続文字列デザイナーを使用してJDBC URL を生成(Salesforce の例)
    • topic.prefix: コネクタによって作成されるKafka トピックに追加されるプレフィックスです。「json-01-」に設定されています。
    • mode: コネクタの動作モードを指定します。ここでは「bulk」に設定されており、コネクタがバルクデータ転送を実行するように設定されていることを示しています。

    このリクエストにより、JSON のすべてのテーブル/コンテンツがKafka トピックとして追加されます。

    注意: リクエストをPOST するIP アドレス(サーバー)は、Linux ネットワークのIP アドレスです。

  9. ksqlDB を実行し、トピックを一覧表示します。以下のコマンドを使用します:
    ksql
    list topics;
    
    Kafka トピックを一覧表示(BigCommerce の例)
  10. トピック内のデータを表示するには、以下のSQL ステートメントを入力します:
    PRINT topic FROM BEGINNING;
    

Confluent Control Center への接続

Confluent Control Center のユーザーインターフェースにアクセスするには、上記のセクションで説明した"confluent local services" を実行し、ローカルブラウザでhttp://<server address>:9021/clusters/ と入力してください。

Confluent Control Center に接続

おわりに

CData JDBC Driver for JSONの30日間無償トライアルをダウンロードして、JSON データをApache Kafka にストリーミングしましょう。ご不明な点があれば、サポートチームまでお問い合わせください。

はじめる準備はできましたか?

JSON Driver の無料トライアルをダウンロードしてお試しください:

 ダウンロード

詳細:

JSON Icon JSON JDBC Driver お問い合わせ

JSON Web サービス連携のパワフルなJava アプリケーションを素早く作成して配布。