Apache NiFi でHubDB のデータにバッチ操作を実行
Apache NiFi は、データルーティング、変換、およびシステム間連携ロジックを強力かつスケーラブルに構築できるプラットフォームです。CData JDBC Driver for HubDB と組み合わせることで、NiFi からリアルタイムHubDB のデータを操作できるようになります。この記事では、CSV ファイルからデータを読み取り、Apache NiFi(バージョン1.9.0 以降)でCData JDBC Driver forHubDB のデータを使用してバッチ操作(INSERT/UPDATE/DELETE)を実行する方法を説明します。
最適化されたデータ処理が組み込まれたCData JDBC Driver は、リアルタイムHubDB のデータを扱う上で比類のないパフォーマンスを提供します。HubDB に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作をHubDB に直接プッシュし、サポートされていない操作(主にSQL 関数やJOIN 操作)は組み込みSQL エンジンを利用してクライアント側で処理します。組み込みの動的メタデータクエリを使用すると、ネイティブデータ型を使ってHubDB のデータを操作・分析できます。
JDBC URL の生成
Apache NiFi からHubDB のデータに接続するには、JDBC URL が必要です。
組み込みの接続文字列デザイナー
JDBC URL の作成をサポートするために、HubDB JDBC Driver に組み込まれている接続文字列デザイナーが使用できます。JAR ファイルをダブルクリックするか、コマンドラインからJAR ファイルを実行します。
java -jar cdata.jdbc.hubdb.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
HubDBデータソースへの接続には、パブリックHubSpotアプリケーションを使用したOAuth認証とプライベートアプリケーショントークンを使用した認証の2つの方法があります。
カスタムOAuthアプリを使用する
すべてのOAuthフローでAuthSchemeを"OAuth"に設定する必要があります。特定の認証ニーズ(デスクトップアプリケーション、Webアプリケーション、ヘッドレスマシン)に必要な接続プロパティについては、ヘルプドキュメントを確認してください。
アプリケーションを登録し、OAuthクライアント認証情報を取得するには、以下の手順を実行してください。
- HubSpotアプリ開発者アカウントにログインします。
- アプリ開発者アカウントである必要があります。標準のHubSpotアカウントではパブリックアプリを作成できません。
- 開発者アカウントのホームページで、アプリタブをクリックします。
- アプリを作成をクリックします。
- アプリ情報タブで、ユーザーが接続する際に表示される値を入力し、必要に応じて変更します。これらの値には、パブリックアプリケーション名、アプリケーションロゴ、アプリケーションの説明が含まれます。
- 認証タブで、「リダイレクトURL」ボックスにコールバックURLを入力します。
- デスクトップアプリケーションを作成する場合は、http://localhost:33333のようなローカルにアクセス可能なURLに設定します。
- Webアプリケーションを作成する場合は、ユーザーがアプリケーションを承認した際にリダイレクトされる信頼できるURLに設定します。
- アプリを作成をクリックします。HubSpotがアプリケーションとそれに関連する認証情報を生成します。
- 認証タブで、クライアントIDとクライアントシークレットを確認します。これらは後でドライバーを設定する際に使用します。
スコープの下で、アプリケーションの意図する機能に必要なスコープを選択します。
テーブルにアクセスするには、最低限以下のスコープが必要です:
- hubdb
- oauth
- crm.objects.owners.read
- 変更を保存をクリックします。
- 統合に必要な機能にアクセスできる本番ポータルにアプリケーションをインストールします。
- 「インストールURL(OAuth)」の下で、完全なURLをコピーをクリックして、アプリケーションのインストールURLをコピーします。
- コピーしたリンクをブラウザで開きます。アプリケーションをインストールする標準アカウントを選択します。
- アプリを接続をクリックします。結果のタブは閉じて構いません。
プライベートアプリを使用する
HubSpotプライベートアプリケーショントークンを使用して接続するには、AuthSchemeプロパティを"PrivateApp"に設定します。
以下の手順に従ってプライベートアプリケーショントークンを生成できます:
- HubDBアカウントで、メインナビゲーションバーの設定アイコン(歯車)をクリックします。
- 左サイドバーメニューで、統合 > プライベートアプリに移動します。
- プライベートアプリを作成をクリックします。
- 基本情報タブで、アプリケーションの詳細(名前、ロゴ、説明)を設定します。
- スコープタブで、プライベートアプリケーションがアクセスできるようにしたい各スコープに対して読み取りまたは書き込みを選択します。
- テーブルにアクセスするには、最低限hubdbとcrm.objects.owners.readが必要です。
- アプリケーションの設定が完了したら、右上のアプリを作成をクリックします。
- アプリケーションのアクセストークンに関する情報を確認し、作成を続行をクリックし、その後トークンを表示をクリックします。
- コピーをクリックして、プライベートアプリケーショントークンをコピーします。
接続するには、PrivateAppTokenを取得したプライベートアプリケーショントークンに設定します。
Apache NiFi でのバッチ操作(INSERT/UPDATE/DELETE)
以下のサンプルフローは、次のNiFi プロセッサに基づいています:
- ListFile - ローカルファイルシステムからファイルリストを取得し、取得した各ファイルにFlowFile を作成します。
- FetchFile - ListFile プロセッサから受け取ったFlowFile のコンテンツを読み取ります。
- PutDatabaseRecord - 指定されたRecordReader を使用して、FetchFile プロセッサからのフローファイルからレコードを入力します。これらのレコードはSQL 文に変換され、単一のトランザクションとして実行されます。
- LogAttribute - 指定されたログレベルでFlowFile の属性を出力します。
完成したフローは以下のようになります:
注意事項
1. CSV ファイルのカラム名は、挿入/更新/削除するデータソーステーブルのレコードのカラム名と一致する必要があります。
2. Apache NiFi バージョン1.9.0 より前のバージョンでは、PutDatabaseRecord プロセッサの Maximum Batch Size プロパティがサポートされていません。
設定
バッチINSERT、UPDATE、またはDELETE を実行するには、NiFi プロセッサを以下のように設定します:
- ListFile プロセッサの設定:Input Directory プロパティを、CSV ファイルを取得するローカルフォルダパスに設定します。 File Filter プロパティを、名前が式に一致するファイルのみを選択する正規表現に設定します。 例:CSV ファイルのフルパスがC:\Users\Public\Documents\InsertNiFi.csv の場合、プロパティは以下の画像のように設定します:
- FetchFile プロセッサの設定 FetchFile プロセッサのプロパティ設定はデフォルト値のままにします:
- PutDatabaseRecord プロセッサの設定
- Record Reader プロパティをCSV Reader Controller Service に設定します。CSV Reader Controller Service を、CSV ファイルの形式に合わせて設定します。
- Statement Type プロパティをINSERT に設定します。
-
Database Connection Pooling Service を、ドライバー設定を保持するDBCPConnection Pool に設定します。ドライバーはBulk API を使用するように設定する必要があります。
プロパティ 値 Database Connection URL jdbc:hubdb:AuthScheme=OAuth;OAuthClientID=MyOAuthClientID;OAuthClientSecret=MyOAuthClientSecret;CallbackURL=http://localhost:33333; Database Driver Class Name cdata.jdbc.hubdb.HubDBDriver - Catalog Name プロパティを、テーブルが属するカタログの名前に設定します。
- Schema Name プロパティを、テーブルが属するスキーマの名前に設定します。
- Table Name プロパティを、INSERT 先のテーブル名に設定します。
-
Maximum Batch Size プロパティを、単一のバッチに含めるレコードの最大数に設定します。
-
Record Reader プロパティをCSV Reader Controller Service に設定します。CSV Reader Controller Service を、CSV ファイルの形式に合わせて設定します。
- Statement Type プロパティをUPDATE に設定します。
- Database Connection Pooling Service を、ドライバー設定を保持するDBCPConnection Pool に設定します。ドライバーはBulk API を使用するように設定する必要があります。上記と同じDatabase Connection URL 形式を使用します。
- Catalog Name プロパティを、テーブルが属するカタログの名前に設定します。
- Schema Name プロパティを、テーブルが属するスキーマの名前に設定します。
- Table Name プロパティを、UPDATE 対象のテーブル名に設定します。
- Update Keys プロパティを、UPDATE に必要なカラム名に設定します。
-
Maximum Batch Size プロパティを、単一のバッチに含めるレコードの最大数に設定します。
-
Record Reader プロパティをCSV Reader Controller Service に設定します。CSV Reader Controller Service を、CSV ファイルの形式に合わせて設定します。
- Statement Type プロパティをDELETE に設定します。
- Database Connection Pooling Service を、ドライバー設定を保持するDBCPConnection Pool に設定します。ドライバーはBulk API を使用するように設定する必要があります。上記と同じDatabase Connection URL 形式を使用します。
- Catalog Name プロパティを、テーブルが属するカタログの名前に設定します。
- Schema Name プロパティを、テーブルが属するスキーマの名前に設定します。
- Table Name プロパティを、UPDATE 対象のテーブル名に設定します。
- INSERT やUPDATE のStatement Type とは異なり、DELETE 操作ではMaximum Batch Size プロパティは表示されません。ただし、操作は引き続きバッチで処理されます。変更しない場合、バッチあたりの最大レコード数はデフォルト値の2000 です。DELETE 操作で使用するMaximum Batch Size の値を変更するには、Statement Type をINSERT またはUPDATE に変更し、Maximum Batch Size プロパティの値を変更して「Apply Changes」をクリックします。最後に、プロセッサの設定を再度開き、Statement Type をDELETE に戻して「Apply Changes」をクリックします。
-
LogAttribute プロセッサの設定
最後に、LogAttribute プロセッサを設定して、ユースケースに基づいてログに記録または無視する属性とログレベルを指定します。
INSERT 操作
バッチINSERT 操作を実行するには、PutDatabaseRecord プロセッサを以下のように設定します:
UPDATE 操作
バッチUPDATE 操作を実行するには、PutDatabaseRecord プロセッサを以下のように設定します:
DELETE 操作
バッチDELETE 操作を実行するには、PutDatabaseRecord プロセッサを以下のように設定します:
無償トライアルと詳細情報
CData JDBC Driver for HubDB の30日間無償トライアルをダウンロードして、Apache NiFi でリアルタイムHubDB のデータの操作をはじめましょう。ご不明な点があれば、サポートチームにお問い合わせください。