Apache NiFi でNetSuite のデータにバッチ操作を実行
Apache NiFi は、データルーティング、変換、およびシステム間連携ロジックを強力かつスケーラブルに構築できるプラットフォームです。CData JDBC Driver for NetSuite と組み合わせることで、NiFi からリアルタイムNetSuite のデータを操作できるようになります。この記事では、CSV ファイルからデータを読み取り、Apache NiFi(バージョン1.9.0 以降)でCData JDBC Driver forNetSuite のデータを使用してバッチ操作(INSERT/UPDATE/DELETE)を実行する方法を説明します。
最適化されたデータ処理が組み込まれたCData JDBC Driver は、リアルタイムNetSuite のデータを扱う上で比類のないパフォーマンスを提供します。NetSuite に複雑なSQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされているSQL 操作をNetSuite に直接プッシュし、サポートされていない操作(主にSQL 関数やJOIN 操作)は組み込みSQL エンジンを利用してクライアント側で処理します。組み込みの動的メタデータクエリを使用すると、ネイティブデータ型を使ってNetSuite のデータを操作・分析できます。
NetSuite データ連携について
CData は、Oracle NetSuite のライブデータにアクセスし、統合するための最も簡単な方法を提供します。お客様は CData の接続機能を以下の目的で使用しています:
- Standard、CRM、OneWorld を含む、すべてのエディションの NetSuite にアクセスできます。
- SuiteTalk API(SOAP ベース)のすべてのバージョンと、SQL のように機能し、より簡単なデータクエリと操作を可能にする SuiteQL に接続できます。
- Saved Searches のサポートにより、事前定義されたレポートとカスタムレポートにアクセスできます。
- トークンベースおよび OAuth 2.0 で安全に認証でき、あらゆるユースケースで互換性とセキュリティを確保します。
- SQL ストアドプロシージャを使用して、ファイルのアップロード・ダウンロード、レコードや関連付けのアタッチ・デタッチ、ロールの取得、追加のテーブルやカラム情報の取得、ジョブ結果の取得などの機能的なアクションを実行できます。
お客様は、Power BI や Excel などのお気に入りの分析ツールからライブ NetSuite データにアクセスするために CData ソリューションを使用しています。また、CData Sync を直接使用するか、Azure Data Factory などの他のアプリケーションとの CData の互換性を活用して、NetSuite データを包括的なデータベースやデータウェアハウスに統合しています。CData は、Oracle NetSuite のお客様が NetSuite からデータを取得し、NetSuite にデータをプッシュするアプリを簡単に作成できるよう支援し、他のソースからのデータを NetSuite と統合することを可能にしています。
当社の Oracle NetSuite ソリューションの詳細については、ブログをご覧ください:Drivers in Focus Part 2: Replicating and Consolidating ... NetSuite Accounting Data
はじめに
JDBC URL の生成
Apache NiFi からNetSuite のデータに接続するには、JDBC URL が必要です。
組み込みの接続文字列デザイナー
JDBC URL の作成をサポートするために、NetSuite JDBC Driver に組み込まれている接続文字列デザイナーが使用できます。JAR ファイルをダブルクリックするか、コマンドラインからJAR ファイルを実行します。
java -jar cdata.jdbc.netsuite.jar
接続プロパティを入力し、接続文字列をクリップボードにコピーします。
NetSuiteへの接続
NetSuite では、2種類のAPI でデータにアクセスできます。どちらのAPI を使用するかは、Schema 接続プロパティで以下のいずれかを選択して指定してください。
- SuiteTalk は、NetSuite との通信に使用されるSOAP ベースの従来から提供されているサービスです。幅広いエンティティをサポートし、INSERT / UPDATE / DELETE の操作も対応しています。ただし、SuiteQL API と比べるとデータの取得速度が劣ります。また、サーバーサイドでのJOIN に対応していないため、これらの処理はCData 製品がクライアントサイドで実行します。
- SuiteQL は、より新しいAPI です。JOIN、GROUP BY、集計、カラムフィルタリングをサーバーサイドで処理できるため、SuiteTalk よりもはるかに高速にデータを取得できます。ただし、NetSuite データへのアクセスは読み取り専用となります。
データの取得のみが目的でしたらSuiteQL をお勧めします。データの取得と変更の両方が必要な場合は、SuiteTalk をお選びください。
NetSuite への認証
CData 製品では、以下の認証方式がご利用いただけます。
- トークンベース認証(TBA)はOAuth1.0に似た仕組みです。2020.2以降のSuiteTalk とSuiteQL の両方で利用できます。
- OAuth 2.0 認証(OAuth 2.0 認可コードグラントフロー)は、SuiteQL でのみご利用いただけます。
- OAuth JWT 認証は、OAuth2.0 クライアント認証フローの一つで、クライアント認証情報を含むJWT を使用してNetSuite データへのアクセスを要求します。
トークンベース認証(OAuth1.0)
トークンベース認証(TBA)は、基本的にOAuth 1.0 の仕組みです。この認証方式はSuiteTalk とSuiteQL の両方でサポートされています。管理者権限をお持ちの方がNetSuite UI 内でOAuthClientId、OAuthClientSecret、OAuthAccessToken、OAuthAccessTokenSecret を直接作成することで設定できます。 NetSuite UI でのトークン作成手順については、ヘルプドキュメントの「はじめに」セクションをご参照ください。
アクセストークンを作成したら、以下の接続プロパティを設定して接続してみましょう。
- AuthScheme = Token
- AccountId = 接続先のアカウント
- OAuthClientId = アプリケーション作成時に表示されるコンシューマーキー
- OAuthClientSecret = アプリケーション作成時に表示されるコンシューマーシークレット
- OAuthAccessToken = アクセストークン作成時のトークンID
- OAuthAccessTokenSecret = アクセストークン作成時のトークンシークレット
その他の認証方法については、ヘルプドキュメントの「はじめに」をご確認ください。
Apache NiFi でのバッチ操作(INSERT/UPDATE/DELETE)
以下のサンプルフローは、次のNiFi プロセッサに基づいています:
- ListFile - ローカルファイルシステムからファイルリストを取得し、取得した各ファイルにFlowFile を作成します。
- FetchFile - ListFile プロセッサから受け取ったFlowFile のコンテンツを読み取ります。
- PutDatabaseRecord - 指定されたRecordReader を使用して、FetchFile プロセッサからのフローファイルからレコードを入力します。これらのレコードはSQL 文に変換され、単一のトランザクションとして実行されます。
- LogAttribute - 指定されたログレベルでFlowFile の属性を出力します。
完成したフローは以下のようになります:
注意事項
1. CSV ファイルのカラム名は、挿入/更新/削除するデータソーステーブルのレコードのカラム名と一致する必要があります。
2. Apache NiFi バージョン1.9.0 より前のバージョンでは、PutDatabaseRecord プロセッサの Maximum Batch Size プロパティがサポートされていません。
設定
バッチINSERT、UPDATE、またはDELETE を実行するには、NiFi プロセッサを以下のように設定します:
- ListFile プロセッサの設定:Input Directory プロパティを、CSV ファイルを取得するローカルフォルダパスに設定します。 File Filter プロパティを、名前が式に一致するファイルのみを選択する正規表現に設定します。 例:CSV ファイルのフルパスがC:\Users\Public\Documents\InsertNiFi.csv の場合、プロパティは以下の画像のように設定します:
- FetchFile プロセッサの設定 FetchFile プロセッサのプロパティ設定はデフォルト値のままにします:
- PutDatabaseRecord プロセッサの設定
- Record Reader プロパティをCSV Reader Controller Service に設定します。CSV Reader Controller Service を、CSV ファイルの形式に合わせて設定します。
- Statement Type プロパティをINSERT に設定します。
-
Database Connection Pooling Service を、ドライバー設定を保持するDBCPConnection Pool に設定します。ドライバーはBulk API を使用するように設定する必要があります。
プロパティ 値 Database Connection URL jdbc:netsuite:AccountId=XABC123456;Schema=SuiteTalk;AuthScheme=Token;OAuthClientId=MyOAuthClientId;OAuthClientSecret=MyOAuthClientSecret;OAuthAccessToken=MyOAuthAccessToken;OAuthAccessTokenSecret=MyOAuthAccessTokenSecret; Database Driver Class Name cdata.jdbc.netsuite.NetSuiteDriver - Catalog Name プロパティを、テーブルが属するカタログの名前に設定します。
- Schema Name プロパティを、テーブルが属するスキーマの名前に設定します。
- Table Name プロパティを、INSERT 先のテーブル名に設定します。
-
Maximum Batch Size プロパティを、単一のバッチに含めるレコードの最大数に設定します。
-
Record Reader プロパティをCSV Reader Controller Service に設定します。CSV Reader Controller Service を、CSV ファイルの形式に合わせて設定します。
- Statement Type プロパティをUPDATE に設定します。
- Database Connection Pooling Service を、ドライバー設定を保持するDBCPConnection Pool に設定します。ドライバーはBulk API を使用するように設定する必要があります。上記と同じDatabase Connection URL 形式を使用します。
- Catalog Name プロパティを、テーブルが属するカタログの名前に設定します。
- Schema Name プロパティを、テーブルが属するスキーマの名前に設定します。
- Table Name プロパティを、UPDATE 対象のテーブル名に設定します。
- Update Keys プロパティを、UPDATE に必要なカラム名に設定します。
-
Maximum Batch Size プロパティを、単一のバッチに含めるレコードの最大数に設定します。
-
Record Reader プロパティをCSV Reader Controller Service に設定します。CSV Reader Controller Service を、CSV ファイルの形式に合わせて設定します。
- Statement Type プロパティをDELETE に設定します。
- Database Connection Pooling Service を、ドライバー設定を保持するDBCPConnection Pool に設定します。ドライバーはBulk API を使用するように設定する必要があります。上記と同じDatabase Connection URL 形式を使用します。
- Catalog Name プロパティを、テーブルが属するカタログの名前に設定します。
- Schema Name プロパティを、テーブルが属するスキーマの名前に設定します。
- Table Name プロパティを、UPDATE 対象のテーブル名に設定します。
- INSERT やUPDATE のStatement Type とは異なり、DELETE 操作ではMaximum Batch Size プロパティは表示されません。ただし、操作は引き続きバッチで処理されます。変更しない場合、バッチあたりの最大レコード数はデフォルト値の2000 です。DELETE 操作で使用するMaximum Batch Size の値を変更するには、Statement Type をINSERT またはUPDATE に変更し、Maximum Batch Size プロパティの値を変更して「Apply Changes」をクリックします。最後に、プロセッサの設定を再度開き、Statement Type をDELETE に戻して「Apply Changes」をクリックします。
-
LogAttribute プロセッサの設定
最後に、LogAttribute プロセッサを設定して、ユースケースに基づいてログに記録または無視する属性とログレベルを指定します。
INSERT 操作
バッチINSERT 操作を実行するには、PutDatabaseRecord プロセッサを以下のように設定します:
UPDATE 操作
バッチUPDATE 操作を実行するには、PutDatabaseRecord プロセッサを以下のように設定します:
DELETE 操作
バッチDELETE 操作を実行するには、PutDatabaseRecord プロセッサを以下のように設定します:
無償トライアルと詳細情報
CData JDBC Driver for NetSuite の30日間無償トライアルをダウンロードして、Apache NiFi でリアルタイムNetSuite のデータの操作をはじめましょう。ご不明な点があれば、サポートチームにお問い合わせください。