Snowflake のデータをApache Kafka トピックにストリーミング

Dibyendu Datta
Dibyendu Datta
Lead Technology Evangelist
CData JDBC Driver とKafka Connect JDBC コネクタを使用して、Apache Kafka でSnowflake のデータにアクセスし、ストリーミングできます。

Apache Kafka は、主にリアルタイムデータパイプラインやイベント駆動型アプリケーションの構築に使用されるオープンソースのストリーム処理プラットフォームです。CData JDBC Driver for Snowflakeと組み合わせることで、Kafka はライブのSnowflake のデータを扱うことができます。この記事では、Snowflake データをApache Kafka トピックに接続、アクセス、ストリーミングする方法と、Confluent Control Center を起動してConfluent プラットフォームのKafka インフラストラクチャを使用して受信したSnowflake のデータをユーザーが安全に管理および監視できるようにする方法について説明します。

CData JDBC Driver は最適化されたデータ処理機能を内蔵しており、ライブのSnowflake のデータとのやり取りにおいて比類のないパフォーマンスを提供します。Snowflake に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作を直接Snowflake にプッシュし、サポートされていない操作(多くの場合SQL 関数やJOIN 操作)については組み込みのSQL エンジンを使用してクライアント側で処理します。組み込みの動的メタデータクエリにより、ネイティブのデータ型を使用してSnowflake のデータを操作および分析できます。

Snowflake データ連携について

CData は、Snowflake のライブデータへのアクセスと統合を簡素化します。お客様は CData の接続機能を以下の目的で活用しています:

  • Snowflake データを迅速かつ効率的に読み書きできます。
  • 指定された Warehouse、Database、Schema のメタデータを動的に取得できます。
  • OAuth、OKTA、Azure AD、Azure マネージド サービス ID、PingFederate、秘密鍵など、さまざまな方法で認証できます。

多くの CData ユーザーは、CData ソリューションを使用して、お気に入りのツールやアプリケーションから Snowflake にアクセスし、さまざまなシステムからデータを Snowflake にレプリケートして、包括的なウェアハウジングと分析を行っています。

CData ソリューションとの Snowflake 統合についての詳細は、ブログをご覧ください:https://jp.cdata.com/blog/snowflake-integrations


はじめに


前提条件

Apache Kafka トピックでSnowflake のデータをストリーミングするためにCData JDBC Driver を接続する前に、クライアントのLinux ベースのシステムに以下をインストールおよび設定してください。

  1. Confluent Platform for Apache Kafka
  2. Confluent Hub CLI のインストール
  3. Confluent Platform 用のSelf-Managed Kafka JDBC Source Connector

Snowflake のデータへの新しいJDBC 接続を定義

  1. Linux ベースのシステムにCData JDBC Driver for Snowflakeをダウンロードします。
  2. 以下の手順に従って新しいディレクトリを作成し、すべてのドライバーの内容を展開します:
    1. Snowflake という名前の新しいディレクトリを作成します。
      		mkdir Snowflake
      		
    2. ダウンロードしたドライバーファイル(.zip)をこの新しいディレクトリに移動します。
      		mv SnowflakeJDBCDriver.zip Snowflake/
      		
    3. CData SnowflakeJDBCDriver の内容をこの新しいディレクトリに解凍します。
      		unzip SnowflakeJDBCDriver.zip
      		
  3. Snowflake ディレクトリを開き、lib フォルダに移動します。
    ls
    cd lib/
    
  4. CData JDBC Driver for Snowflakelib フォルダの内容をKafka Connect JDBClib フォルダにコピーします。Kafka Connect JDBC フォルダの内容を確認し、cdata.jdbc.snowflake.jar ファイルがlib フォルダに正常にコピーされたことを確認します。
    cp -r /path/to/CData JDBC Driver for Snowflake/lib/* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
    cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
    
  5. 以下のコマンドを使用して、CData Snowflake JDBC ドライバーのライセンスをインストールします。名前とメールアドレスを入力してください。
    	java -jar cdata.jdbc.snowflake.jar -l
    	
  6. プロダクトキーまたは"TRIAL" を入力します(ライセンスの有効期限が切れた場合は、CData サポートチームまでお問い合わせください)。
  7. 以下のコマンドを使用してConfluent ローカルサービスを起動します:
    	confluent local services start
    	

    これにより、Zookeeper、Kafka、Schema Registry、Kafka REST、Kafka CONNECT、ksqlDB、Control Center などのすべてのConfluent サービスが起動します。これで、CData JDBC Driver for Snowflake を使用してKafka Connect Driver 経由でksqlDB のKafka トピックにメッセージをストリーミングする準備が整いました。

    Confluent ローカルサービスを起動
  8. POST HTTP API リクエストを使用してKafka トピックを手動で作成します:
     curl --location 'server_address:8083/connectors'
    	--header 'Content-Type: application/json'
    	--data '{
    		"name": "jdbc_source_cdata_snowflake_01",
    		"config": {
    			"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    			"connection.url": "jdbc:snowflake:User=Admin;Password=test123;Server=localhost;Database=Northwind;Warehouse=TestWarehouse;Account=Tester1;",
    		"topic.prefix": "snowflake-01-",
    		"mode": "bulk"
    		}
    	}'
    

    HTTP POST 本文(上記)で使用されるフィールドについて説明します:

    • connector.class: 使用するKafka Connect コネクタのJava クラスを指定します。
    • connection.url: Snowflake データに接続するためのJDBC 接続URL です。

      組み込みの接続文字列デザイナー

      JDBC URL の作成については、CData JDBC Driver for Snowflakeに組み込まれた接続文字列デザイナーを使用してください。JAR ファイルをダブルクリックするか、コマンドラインからjar ファイルを実行します。

      		java -jar cdata.jdbc.snowflake.jar
      		

      接続プロパティを入力し、接続文字列をクリップボードにコピーします。

      それでは、Snowflake データベースに接続していきましょう。認証に加えて、以下の接続プロパティを設定します。

      • Url:お使いのSnowflake URL を指定します。例:https://orgname-myaccount.snowflakecomputing.com
        • Legacy URL を使用する場合:https://myaccount.region.snowflakecomputing.com
        • ご自身のURL は以下のステップで確認できます。
          1. Snowflake UI の左下にあるユーザー名をクリックします
          2. Account ID にカーソルを合わせます
          3. Copy Account URL アイコンをクリックして、アカウントURL をコピーします
      • Database(オプション):CData 製品によって公開されるテーブルとビューを、特定のSnowflake データベースのものに制限したい場合に設定します
      • Schema(オプション):CData 製品によって公開されるテーブルとビューを、特定のSnowflake データベーススキーマのものに制限したい場合に設定します

      Snowflakeへの認証

      CData 製品では、Snowflake ユーザー認証、フェデレーション認証、およびSSL クライアント認証をサポートしています。認証するには、UserPassword を設定し、AuthScheme プロパティで認証方法を選択してください。

      キーペア認証

      ユーザーアカウントに定義されたプライベートキーを使用してセキュアなトークンを作成し、キーペア認証で接続することも可能です。この方法で接続するには、AuthSchemePRIVATEKEY に設定し、以下の値を設定してください。

      • User:認証に使用するユーザーアカウント
      • PrivateKey:プライベートキーを含む.pem ファイルへのパスなど、ユーザーに使用されるプライベートキー
      • PrivateKeyType:プライベートキーを含むキーストアの種類(PEMKEY_FILE、PFXFILE など)
      • PrivateKeyPassword:指定されたプライベートキーのパスワード

      その他の認証方法については、ヘルプドキュメントの「Snowflakeへの認証」セクションをご確認ください。

      組み込みの接続文字列デザイナーを使用してJDBC URL を生成(Salesforce の例)
    • topic.prefix: コネクタによって作成されるKafka トピックに追加されるプレフィックスです。「snowflake-01-」に設定されています。
    • mode: コネクタの動作モードを指定します。ここでは「bulk」に設定されており、コネクタがバルクデータ転送を実行するように設定されていることを示しています。

    このリクエストにより、Snowflake のすべてのテーブル/コンテンツがKafka トピックとして追加されます。

    注意: リクエストをPOST するIP アドレス(サーバー)は、Linux ネットワークのIP アドレスです。

  9. ksqlDB を実行し、トピックを一覧表示します。以下のコマンドを使用します:
    ksql
    list topics;
    
    Kafka トピックを一覧表示(BigCommerce の例)
  10. トピック内のデータを表示するには、以下のSQL ステートメントを入力します:
    PRINT topic FROM BEGINNING;
    

Confluent Control Center への接続

Confluent Control Center のユーザーインターフェースにアクセスするには、上記のセクションで説明した"confluent local services" を実行し、ローカルブラウザでhttp://<server address>:9021/clusters/ と入力してください。

Confluent Control Center に接続

おわりに

CData JDBC Driver for Snowflakeの30日間無償トライアルをダウンロードして、Snowflake データをApache Kafka にストリーミングしましょう。ご不明な点があれば、サポートチームまでお問い合わせください。

はじめる準備はできましたか?

Snowflake Driver の無料トライアルをダウンロードしてお試しください:

 ダウンロード

詳細:

Snowflake Enterprise Data Warehouse Icon Snowflake JDBC Driver お問い合わせ

驚くほど簡単にJDBC でJava アプリケーションからSnowflake にデータ連携!