本記事ではCData Salesforce Data Cloud ドライバーでデータレイクオブジェクトにデータを追加したり更新したりする方法と、その際の注意点について説明します。
CData Salesforce Data Cloud ドライバーはAPI を通してSalesforce Data Cloud へのアクセスを行います。Salesforce Data Cloud の更新API がサポートしている更新処理はUpsert (挿入または更新)とDelete (削除)です。Upsert はデータベース操作の一つで、レコードが存在しない場合は挿入(Insert)し、存在する場合は更新(Update)する便利な機能を持ちます。例えば、レコードの更新時に当該レコードの存在有無を事前に確認し、存在する場合はUpdate、存在しない場合はInsert といった一連の処理を行いたいとき、条件分岐によって2つの操作を使い分けることなく、Upsert という1つの処理で済ませることができます。
API は純粋なInsert 処理やUpdate 処理をサポートしないため、ドライバーもこれらの処理はサポートしません。そのためUpsert 句は実行できますが、Insert 句やUpdate 句を実行するとエラーが発生します。この挙動を変えてInsert 句やUpdate 句を実行できるようにする方法については後半で解説します。
データを更新するSQL例
データの更新はUpsert 句を使います。1つのSQLで1つのレコードを更新する単体のSQL と、複数のレコードを一度に処理するバルク処理の両方をサポートしています。
Upsert 句のSQL 例を以下に示します。このクエリではSampleObject というテスト用のデータレイクオブジェクトを更新します。このテスト用オブジェクトではId カラムがキーとして設定されています。そのためId=100のレコードがある場合はName とEmail を更新、レコードが無い場合は新規に追加するよう動作します。
UPSERT INTO SampleObject(Id,Name,Email) VALUES(100,'John','john@test.com');
バルク処理でUpsert を行うSQL 例を以下に示します。以下のように一時テーブル(末尾に#TEMPを付与)に追加または更新する複数のレコードを登録し、UPSERT SELECT 文でそれをまとめてAPI に送信します。このクエリでは3つのレコードをまとめて送信してUpsert 処理を行います。1つ1つのレコードについて同じキーのレコードがある場合は更新、無い場合は追加となります。
INSERT INTO SampleObject#TEMP(Id,Name,Email) VALUES(100,'John','john@test.com');
INSERT INTO SampleObject#TEMP(Id,Name,Email) VALUES(101,'Mike','mike@test.com');
INSERT INTO SampleObject#TEMP(Id,Name,Email) VALUES(102,'Andy','andy@test.com');
UPSERT INTO SampleObject(Id,Name,Email) SELECT Id, Name, Email FROM SampleObject#TEMP;
上と同じ処理をJava のバッチ処理で実装するコードの例を以下に示します。
Connection conn = DriverManager.getConnection("jdbc:salesforcedatacloud:",prop);
String query = "UPSERT INTO SampleObject(Id,Name,Email) VALUES(?,?,?)";
PreparedStatement pstmt = conn.prepareStatement(query);
pstmt.setString(1, "100");
pstmt.setString(2, "John");
pstmt.setString(3, "john@test.com");
pstmt.addBatch();
pstmt.setString(1, "101");
pstmt.setString(2, "Mike");
pstmt.setString(3, "mike@test.com");
pstmt.addBatch();
pstmt.setString(1, "102");
pstmt.setString(2, "Andy");
pstmt.setString(3, "andy@test.com");
pstmt.addBatch();
pstmt.executeBatch();
更新処理の動作例
以下に動作例として更新処理の実行前と実行後のデータを示します。
クエリ実行前
データレイクオブジェクトSampleObjecにテスト用データとして2件のレコードが登録されています。
クエリ実行後
以下のクエリを実行した後のデータを示します。
INSERT INTO SampleObject#TEMP(Id,Name,Email) VALUES(101,'Mike','mike@test.com');
INSERT INTO SampleObject#TEMP(Id,Name,Email) VALUES(102,'Andy','andy@test.com');
UPSERT INTO SampleObject(Id,Name,Email) SELECT Id, Name, Email FROM SampleObject#TEMP;
Id=102 のレコードが追加され、存在するId=101 のレコードのName とEmail が更新されました。
注意点
単体、バルクどちらの処理もクエリの完了からデータが実際に更新されるまで時間がかかることに注意が必要です。テスト環境ではデータの反映まで単体のSQLで3~5分、バルク処理で5分~8分ほどの時間がかかることを確認しています。もし更新したデータに対して何らかの処理を行う一連のプログラムやフローを構築する場合はこの更新時間を考慮する必要があります。
Insert句やUpdate句でUpsert処理を実行する方法
ドライバーはデータの追加、更新のためのSQL としてUpsert 句をサポートしますが、ツールによってはInsert 句やUpdate 句など典型的なSQL 文以外を実行できない場合があります。その際はEnableAsUpsert プロパティを設定することで、Insert 句やUpdate 句でUpsert 処理を実行するよう制御を変えることができます。たとえばEnableAsUpsert に「INSERT」を設定するとInsert 句を実行したときにUpsert 処理を実行します。EnableAsUpsert プロパティの設定値とそのときの動作は以下の表を参照してください。
EnableAsUpsert プロパティの設定値と動作
| プロパティ値 |
Insert句の挙動 |
Update句の挙動 |
| NONE |
エラーを発生 |
エラーを発生 |
| INSERT |
Upsert処理を実行 |
エラーを発生 |
| UPDATE |
エラーを発生 |
Upsert処理を実行 |
| ALL |
Upsert処理を実行 |
Upsert処理を実行 |
注意点としてはInsert 句、Update 句、Upsert 句のどれを実行してもAPI のUpsert 処理が実行されることです。たとえばEnableAsUpsert=ALL の場合、以下の3つのSQL の実行結果はすべて同じく、Id=100のレコードがあればName とEmail を更新、なければ新規追加という結果になります。これはバルク処理でも同様です。
// 全て同じ結果
// INSERT
INSERT INTO SampleObject(Id,Name,Email) VALUES(100,John,john@test.com);
// UPSERT
UPSERT INTO SampleObject(Id,Name,Email) VALUES(100,John,john@test.com);
// UPDATE
UPDATE SampleObject SET Name = 'John', Email = 'john@test.com' WHERE Id = 100;
おわりに
本記事ではSalesforce Data Cloud ドライバーでデータレイクにデータを書き込んだり更新したりする方法とその注意点を説明しました。Salesforce Data Cloud ドライバーは30日間無料で試用できます。Salesforce Data Cloud との連携をご検討の方はぜひお試しください