Apache Kafka へのAirtable のデータのETL / ELT パイプラインを作ってデータを統合する方法
データ分析基盤へのAirtable のデータの取り込みのニーズが高まっています。CData Sync は、数百のSaaS / DB のデータをApache Kafka をはじめとする各種DB / データウェアハウスにノーコードで統合・レプリケーション(複製)が可能なETL / ELT ツールです。
本記事では、Airtable のデータをCData Sync を使ってApache Kafka に統合するデータパイプラインを作っていきます。
CData Sync とは?
CData Sync は、レポーティング、アナリティクス、機械学習、AI などで使えるよう、社内のデータを一か所に統合して管理できるデータ基盤をノーコードで構築できるETL ツールで、以下の特徴を持っています。
- Airtable をはじめとする数百種類のSaaS / DB データに対応
- Apache Kafka など多くのRDB、データレイク、データストア、データウェアハウスに同期可能
- 業務データのデータ分析基盤へのETL / ELT 機能に特化し、極限まで設定操作をシンプルに
- 主要なSaaS データの差分更新やCDC(Change Data Capture、変更データキャプチャ)のサポート
- フレキシブルなSQL / dbt 連携での取得データの変換
CData Sync では、1.データソースとしてAirtable の接続を設定、2.同期先としてApache Kafka の接続を設定、3.Airtable からApache Kafka へのレプリケーションジョブの作成、という3つのステップだけでレプリケーション処理を作成可能です。以下に具体的な設定手順を説明します。
1. データソースとしてAirtable の接続を設定
まずはじめに、CData Sync のブラウザ管理コンソールにログインします。CData Sync のインストールをまだ行っていない方は、本記事の製品リンクから「CData Sync」をクリックしてCData Sync をインストールしてください。30日間の無償トライアルをご利用いただけます。インストール後にCData Sync が起動して、ブラウザ設定画面が開きます。
それでは、データソース側にAirtable を設定していきましょう。左の「接続」タブをクリックします。
- 「+接続の追加」ボタンをクリックします。
- 「データソース」タブを選択して、リスト表示されるデータソースを選ぶか、検索バーにデータソース名を入力して、Airtable を見つけます。
- Airtable の右側の「→」をクリックして、Airtable アカウントへの接続画面を開きます。もし、Airtable のコネクタがデフォルトでCData Sync にインストールされていない場合には、ダウンロードアイコン(コネクタのアップロードアイコン)をクリックし、「ダウンロード」をクリックすると、CData Sync にコネクタがインストールされます。
- 接続プロパティにAirtable に接続するアカウント情報を入力をします。
Airtable への接続
それでは、Airtable に接続していきましょう。CData 製品は、Airtable にテーブルとビューを要求します。 Schema プロパティ(オプション)を使用すると、表示されるテーブルおよびビューを特定のベースに制限できます。 特定のベースに制限したい場合は、このプロパティを使用するスキーマの名前に設定してください。(これはAirtable のBase 名に相当します。)
すべてのAirtable Bases に加えて、DataModelInformation という名前の静的スキーマもご利用いただけます。 このスキーマでは、Bases、Tables、Users のような静的テーブルをクエリできます。 DisplayObjectIds がTrue に設定されている場合、Schema の値は名前ではなくAirtable Base id に設定する必要があります。
Airtableへの認証
続いて、認証方法を設定しましょう。個人用アクセストークンまたはOAuth PKCE のいずれかを使用してAirtable に認証できます。
個人用アクセストークン
個人用アクセストークンをまだ生成していない場合は、以下のステップで生成してみましょう。
- ユーザーアカウントにログインします
- "https://airtable.com/create/tokens" に移動します
- Create new token をクリックします
- Scopes で、Add a scope をクリックして以下の各スコープを追加します
- data.records:read
- data.records:write
- schema.bases:read
- Access で、トークンにアクセス権を付与するすべてのワークスペースとベースを追加します
- Create token をクリックしてトークンを生成します。生成されたトークンは一度しか表示されませんので、必ずコピーして保存してください
次に、以下の設定を行います。
- AuthScheme:PersonalAccessToken
- Token:先ほど生成した個人用アクセストークンの値
OAuth PKCE については、 href="/kb/help/" target="_blank">ヘルプドキュメントの「はじめに」をご確認ください。
- 「作成およびテスト」をクリックして、正しくAirtable に接続できているかをテストして保存します。これでレプリケーションのデータソースとしてAirtable への接続が設定されました。
2. 同期先としてApache Kafka の接続を設定
次に、Airtable のデータを書き込む先(=同期先)として、Apache Kafka を設定します。同じく「接続」タブを開きます。
- 「+接続の追加」ボタンをクリックします。
- 「同期先」タブを選択して、リスト表示されるデータソースを選ぶか、検索バーにデータソース名を入力して、Apache Kafka を見つけます。
- Apache Kafka の右側の「→」をクリックして、Apache Kafka データベースへの接続画面を開きます。もし、Apache Kafka のコネクタがデフォルトでCData Sync にインストールされていない場合には、ダウンロードアイコン(コネクタのアップロードアイコン)をクリックし、「ダウンロード」をクリックすると、CData Sync にコネクタがインストールされます。
- 必要な接続プロパティを入力します。
- Bootstrap Servers - 接続するApache Kafka Bootstrap サーバーのアドレスを設定。
- Auth Scheme - 認証スキームを選択。デフォルト設定はPlain で、ユーザーのログイン情報を使用します。
- User - Apache Kafka への認証に使用するユーザー名を入力。
- Password - Apache Kafka への認証に使用するパスワードを入力。
- Type Detection Scheme - 使用する型検出用スキーム(None、RowScan、SchemaRegistry、MessageOnly)を指定。デフォルトは「None」です。
- Use SSL - Secure Sockets Layer(SSL)プロトコルを使用するかどうかを指定。デフォルト値はFalse です。
- 「作成およびテスト」をクリックして、正しく接続できているかをテストします。
- これで同期先としてApache Kafka を設定できました。CData Sync では、Apache Kafka のデータベース名を指定するだけで同期するAirtable に併せたテーブルスキーマを自動的に作成(CREATE TABLE)してくれます。同期データに合わせたテーブルを事前に作成するなどの面倒な手順は必要ありません。もちろん、既存テーブルにマッピングを行いデータ同期を行うことも可能です。
3. Airtable からApache Kafka へのレプリケーションジョブの作成
CData Sync では、レプリケーションをジョブ単位で設定します。ジョブは、Airtable からApache Kafka という単位で設定し、複数のテーブルを含むことができます。レプリケーションジョブ設定には、「ジョブ」タブに進み、「+ジョブを追加」ボタンをクリックします。
「ジョブを追加」画面が開き、以下を入力します:
- 名前:ジョブの名前
- データソース:ドロップダウンリストから先に設定したAirtable を選択
- 同期先:先に設定したApache Kafka を選択
すべてのオブジェクトをレプリケーションする場合
Airtable のすべてのオブジェクト / テーブルをレプリケーションするには、「種類」セクションで「すべて同期」を選択して、「ジョブを追加」ボタンで確定します。
作成したジョブ画面で、右上の「▷実行」ボタンをクリックするだけで、全Airtable テーブルのApache Kafka への同期を行うことができます。
オブジェクトを選択してレプリケーションする場合
Airtable から特定のオブジェクト / テーブルを選択してレプリケーションを行うことが可能です。「種類」セクションでは、「標準(個別設定)」を選んでください。
次に「ジョブ」画面で、「タスク」タブをクリックし、「タスクを追加」ボタンをクリックします。 
するとCData Sync で利用可能なオブジェクト / テーブルのリストが表示されるので、レプリケーションを行うオブジェクトにチェックを付けます(複数選択可)。「ジョブを追加」ボタンで確定します。
作成したジョブ画面で、「▷実行」ボタンをクリックして(もしくは各タスク毎の実行ボタンを押して)、レプリケーションジョブを実行します。 
このようにとても簡単にAirtable からApache Kafka への同期を行うことができました。
CData Sync の主要な機能を試してみる:スケジューリング・差分更新・ETL
ジョブのスケジュール起動設定
CData Sync では、同期ジョブを1日に1回や15分に1回などのスケジュール起動をすることができます。ジョブ画面の「概要」タブから「スケジュール」パネルを選び、「⚙設定」ボタンをクリックします。「間隔」と同期時間の「毎時何分」を設定し、「保存」を押して設定を完了します。これでCData Sync が同期ジョブをスケジュール実行してくれます。ユーザーはダッシュボードで同期ジョブの状態をチェックするだけです。
差分更新
CData Sync では、主要なデータソースでは、差分更新が可能です。差分更新では、最後のジョブ実行時からデータソース側でデータの追加・変更があったデータだけを同期するので、レプリケーションのクエリ・通信のコストを圧倒的に抑えることが可能です。
差分更新を有効化するには、ジョブの「概要」タブから「差分更新」パネルを選び、「⚙設定」ボタンをクリックします。「開始日」と「レプリケーション間隔」を設定して、「保存」します。
SQL での取得データのカスタマイズ
CData Sync は、デフォルトではAirtable のオブジェクト / テーブルをそのままApache Kafka に複製しますが、ここにSQL、またはdbt 連携でのETL 処理を組み込むことができます。テーブルカラムが多すぎる場合や、データ管理の観点から一部のカラムだけをレプリケーションしたり、さらにデータの絞り込み(フィルタリング)をしたデータだけをレプリケーションすることが可能です。
ジョブの「概要」タブ、「タスク」タブへと進みます。選択されたタスク(テーブル)の「▶」の左側のメニューをクリックし、「編集」を選びます。タスクの編集画面が開きます。
UI からカラムを選択する場合には、「カラム」タブから「マッピング編集」をクリックします。レプリケーションで使用しないカラムからチェックを外します。
SQL を記述して、フィルタリングなどのカスタマイズを行うには、「クエリ」タブをクリックし、REPLICATE 「テーブル名」の後に標準SQL でフィルタリングを行います。
Airtable からApache Kafka へのデータ同期には、ぜひCData Sync をご利用ください
このようにノーコードで簡単にAirtable のデータをApache Kafka にレプリケーションできます。データ分析、AI やノーコードツールからのデータ利用などさまざまな用途でCData Sync をご利用いただけます。30日の無償トライアルで、シンプルでパワフルなデータパイプラインを体感してください。
日本のユーザー向けにCData Sync は、UI の日本語化、ドキュメントの日本語化、日本語でのテクニカルサポートを提供しています。
CData Sync の 導入事例を併せてご覧ください。