こんにちは、CData Software でエンジニアをしている宮本です。
先日、Google からDataform がGCP の傘下になったとアナウンスがありました。
cloud.google.com
自分自身、Dataform というサービスを知らなかったのですが、Webサイトやドキュメントなどを読んでみると、どうやらELT というか、ロードしたあとのデータ変換(ELTのTの部分)に特化したサービスというのがなんとなくわかってきました。
Dataform が接続できるデータウェアハウスは、BigQuery をはじめRedShift やSnowflake などに接続して利用できるようです。
機能面では、例えばBigQuery ではいくつかの処理を1つにまとめてストアドプロシージャとして作成し、それを外部から呼ぶことはできますが、Dataform ではその部分がより扱いやすくなった印象です。(まだ全容を把握できていませんが)
今回はそのDataform でいくつかBigQuery 上で行う処理を作成し、外部のELT ツールからDataform を実行するような構成を組んでみたいと思います。
※ELT について
最近使ったETL、ELTサービス(ツール)でデータ収集タスクについて考える / etl-elt-datacollect-task - Speaker Deck
やってみること

CData Sync でSaaS(今回はSalesforce) → BigQuery のロードを行ったあと、Dataform で①連携データのチェック、②連携用テーブルとメインテーブルのマージを行います。
※CDataSync では差分データのみの転送ができるが、BigQuery へはInsertAll で追加されていく形なので、メインテーブルを用意して差分データを毎回マージして管理するイメージ
ELT のEL (Extract Load)部分をCDataSync、T(Transform)をDataform という構成をとります。
CData Sync とは
クラウドサービスからデータベースへのノーコードレプリケートアプリケーションになります。レプリケートとは同期するという意味になりますので、CDataSync から接続できるクラウドサービスのデータをデータベースにまるっとコピーすることができます。
https://jp.cdata.com/
手順
CData Sync のインストール~ジョブ作成まで
データをBigQuery にロードする部分は、CDataSyncのインストールからSalesforce → BigQuery のジョブ実行を以下記事にて紹介していますので、本記事では省略します。
www.cdatablog.jp
※CDataSync は30日間の無償評価版があります。
ジョブ実行~メインテーブル作成
ジョブ実行
今回はLead_Sync という名前でBigQuery にテーブルを作成して連携するジョブを作成してみます。
以下のクエリをカスタムクエリボタンをクリックして入力します。
REPLICATE [Lead_Sync] SELECT * FROM [Lead]
作成したらチェックボックスをオンにして、実行ボタンを押すことでBigQuery にSalesforce のLead データが連携されます。(22件連携されたようです)
Salesforce のLead データに変更がない状態でもう一度実行すると、0件で完了しました。なので差分更新が効いていることが確認できますね。
メインテーブル作成
連携用テーブルはLead_Sync という名前で先ほど作成しました。Lead_Sync には常に前回からの差分データだけを存在するようなテーブルとします。
ただ初回ジョブ実行時はLead_Sync に全件連携されてきていますので、それををもとにしてメインテーブルを作成します。
メインテーブルの作成はBigQuery コンソールで「テーブルコピー」&「Insert into select * ~」で作成しました。最後に、連携用テーブル(Lead_Sync) にTruncate をかけてレコードを全削除しておきます。
(ここの部分もDataform で多分やれるはずです)
これでBigQuery へロードするジョブと連携用テーブル、メインテーブルの作成が完了しました。
ではここからDataform の作業です。 以下のリンクからアカウントを作成します。
Dataform | Manage data pipelines in BigQuery
アカウント作成後、プロジェクトを新規作成します。
BigQuery に接続するにはサービスアカウントでの接続が必要なようですので、サービスアカウント作成後、JSONタイプの秘密鍵をダウンロードしときます。
サービスアカウントのロールは今回はBigQuery 管理者を付与しときました。
これでプロジェクトができたのでDataform を始めることができますね。
Schemas を選択してみるとBigQuery にあるテーブルがずらーっと表示されています。
テーブルの中もプレビューできますね。
①連携データのチェック
最初に連携データのチェックを行うSQL を作成するのですが、Dataform ではアサーションを使用できるのでこれを使ってチェックする処理を作成します。
Test data quality with assertions | Dataform
では、FILES からdefinitions にマウスカーソルをもってきて「・・・」をクリックします。
New file をクリックします。
ファイル名を設定し、SQL かJavaScript を選べますが、今回はSQLX を選択します。(SQLXはSQLの拡張機能版ライブラリのようです)
今回はEmail アドレスが入っていないレコードがあるのかをチェックしていきます。以下のコードをDataform に登録します。
config {
type: "assertion",
}
SELECT
Id,Name,Email
FROM
demo.Lead_Sync
WHERE
Email IS NULL
入力したら自動的にコンパイルされて右側に結果が表示されます。エラーの場合はエラー箇所が表示されるようになってました。
コンパイルされたクエリは下のRUN STATEMENT で実行できます。
では実際にアサーションを実行してみます。
引っかかった場合は、クエリ結果が表示されます。
ちなみに結果はテーブルで保存されるようです。
これでデータチェックの部分は準備できました。
②連携用テーブルとメインテーブルのマージ
また同じようにNew でファイルを作成します。
先ほどはAssertion を選択しましたが、今回はテーブルでもビューでもどちらでも大丈夫です。
以下のマージを行うSQL をセットしました。
対象レコードがない場合は追加する処理内容となっています。
※Lead_Main がメインテーブルです。
MERGE demo.Lead_Main T USING demo.Lead_Sync S ON T.Id = S.Id
WHEN MATCHED THEN
UPDATE
SET
Id = S.Id,
Name = S.Name,
LastName = S.LastName,
FirstName = S.FirstName,
Title = S.Title,
Company = S.Company,
Phone = S.Phone,
MobilePhone = S.MobilePhone,
Fax = S.Fax,
Email = S.Email,
AnnualRevenue = S.AnnualRevenue,
LastModifiedDate = S.LastModifiedDate
WHEN NOT MATCHED THEN
INSERT
(
Id,
Name,
LastName,
FirstName,
Title,
Company,
Phone,
MobilePhone,
Fax,
Email,
AnnualRevenue,
LastModifiedDate
)
VALUES(
Id,
Name,
LastName,
FirstName,
Title,
Company,
Phone,
MobilePhone,
Fax,
Email,
AnnualRevenue,
LastModifiedDate
)
これでDataform で実行するSQL が完成です。
スケジューリング設定
外部から実行するためにはスケジューリング設定を行って、実行するSQL をひとまとめにする必要があるようです。
では、Schedules & environments をクリックし、スケジューリング設定を行います。
今回はスケジューリング実行じゃなくて、外部からの実行となりますので、「Enable this schedule」はオフにしておきます。
外部から実行する際に、Schedule name が必要となりますのでコピーしておきます。
API Keyの取得&プロジェクトID の確認
Dataform での最後の作業はAPI Keyの取得&プロジェクトIDの確認です。
API KeyはホームボタンからProject settings をクリックします。
ここからAPI Key を取得することができます。
プロジェクトIDはURL のこの部分のID になります。
これでDataform での作業が終わりました。
最後にELT ツールのCData Sync から実行するように設定していきます。
※作成したSQL などはコミットしておく必要があるようです。コミット方法は画面上部のCOMMIT ボタン押下です。
CData Sync では連携ジョブ実行後に行う処理を色々と定義することができるようになっています。
ジョブ画面のイベントタブのPost Job イベントに以下定義を追加します。
これで一連の処理の作成・設定が完了しました。
Salesforce のレコードを変更してみる
Salesforce でEmail がnull のレコードを作成しておきます。
現時点ではSalesforce でEmail 項目にnullのレコードが1件ある状態。
CData Sync から連携ジョブを実行
この状態でCData Sync のジョブを実行してみましょう。
結果はSaleforce 側で更新されたデータ1件が連携されました。
Dataform で実行結果をみてみると、マージ処理もアサーションも正常終了したようですね。
BigQuery コンソールでも確認してみます。
マージされたメインテーブルにEmail 項目がnull のレコードが反映されているのが確認できました。
また、dataform_assertions.assert_Lead というビューが作成されていて、アサーションでエラーとなったレコードが参照できるようになっていました。
これで、CData Sync でジョブをスケジューリングしてあげれば、あとは自動的にBigQuery のマージまでやってくれるようになります。
ちなみにDataform では結果をメールやSlack に通知できるようなので、エラーの時だけ通知がくるように設定することで運用も楽になりそうですね。
おわりに
いかがでしたでしょうか。Dataform のほんの一部分を使ってELT ツールと組み合わせてみました。マージ以外にも加工であったり、削除したりといろいろなケースに合わせて使えそうですね。
今日使ったCData Sync は30日間無償での利用が可能であり、Dataform については現在無料で使用できるようです。
www.cdata.com