Apache Airflow で Act-On データを連携

Jerod Johnson
Jerod Johnson
Senior Technology Evangelist
CData JDBC Driver を使用して、Apache Airflow で Act-On のデータ にアクセスして処理。

Apache Airflow は、データエンジニアリングワークフローの作成、スケジューリング、モニタリングをサポートするツールです。 CData JDBC Driver for Act-On と組み合わせることで、Airflow からリアルタイムの Act-On のデータ を扱うことができます。 この記事では、Apache Airflow インスタンスから Act-On のデータ に接続してクエリを実行し、結果を CSV ファイルに保存する方法を説明します。

CData JDBC ドライバーは、最適化されたデータ処理機能を組み込んでおり、 リアルタイムの Act-On のデータ を扱う際に比類のないパフォーマンスを発揮します。複雑な SQL クエリを Act-On に発行すると、 ドライバーはフィルタや集計などのサポートされている SQL 操作を直接 Act-On にプッシュし、 サポートされていない操作(主に SQL 関数や JOIN 操作)は組み込みの SQL エンジンを使用してクライアント側で処理します。 また、組み込みの動的メタデータクエリ機能により、ネイティブのデータ型を使用して Act-On のデータ の操作・分析が可能です。

Act-On への接続を設定

組み込みの接続文字列デザイナー

JDBC URL の構築には、Act-On JDBC Driver に組み込まれている接続文字列デザイナーを使用できます。JAR ファイルをダブルクリックするか、コマンドラインから JAR ファイルを実行してください。

java -jar cdata.jdbc.acton.jar

接続プロパティを入力し、接続文字列をクリップボードにコピーします。

ActOn はOAuth 認証標準を利用しています。OAuth を使って認証するには、アプリケーションを作成してOAuthClientIdOAuthClientSecret、およびCallbackURL 接続プロパティを取得する必要があります。

認証方法についての詳細は、ヘルプドキュメントの「認証の使用」を参照してください。

クラスター環境やクラウドで JDBC ドライバーをホストする場合は、ライセンス(製品版またはトライアル版)とランタイムキー(RTK)が必要です。ライセンス(またはトライアル)の取得については、弊社営業チームにお問い合わせください

以下は、JDBC 接続に必要な主なプロパティです。

プロパティ
データベース接続 URLjdbc:acton:RTK=5246...;
データベースドライバークラス名cdata.jdbc.acton.ActOnDriver

Airflow で JDBC 接続を設定

  1. Apache Airflow インスタンスにログインします。
  2. Airflow インスタンスのナビゲーションバーで、Admin にカーソルを合わせ、Connections をクリックします。
  3. 次の画面で + ボタンをクリックして、新しい接続を作成します。
  4. Add Connection フォームで、必要な接続プロパティを入力します:
    • Connection Id:接続の名前を入力します(例:acton_jdbc)
    • Connection Type:JDBC Connection
    • Connection URL:上記の JDBC 接続 URL(例:jdbc:acton:RTK=5246...;)
    • Driver Class:cdata.jdbc.acton.ActOnDriver
    • Driver Path:PATH/TO/cdata.jdbc.acton.jar
  5. フォーム下部の Test ボタンをクリックして、新しい接続をテストします。
  6. 新しい接続を保存すると、次の画面で接続リストに新しい行が追加されたことを示す緑色のバナーが表示されます。

DAG の作成

Airflow の DAG は、ワークフローのプロセスを保存し、トリガーすることでワークフローを実行できるエンティティです。 ここでのワークフローは、Act-On のデータ に対して SQL クエリを実行し、結果を CSV ファイルに保存するというシンプルなものです。

  1. まず、ホームディレクトリに「airflow」フォルダがあるはずです。その中に「dags」という新しいディレクトリを作成します。 ここに Python ファイルを保存すると、UI 上で Airflow DAG として表示されます。
  2. 次に、新しい Python ファイルを作成し、act-on_hook.py という名前を付けます。このファイルに以下のコードを挿入してください:
    	import time
    	from datetime import datetime
    	from airflow.decorators import dag, task
    	from airflow.providers.jdbc.hooks.jdbc import JdbcHook
    	import pandas as pd
    
    	# DAG を宣言
    	@dag(dag_id="act-on_hook", schedule_interval="0 10 * * *", start_date=datetime(2022,2,15), catchup=False, tags=['load_csv'])
    
    	# DAG 関数を定義
    	def extract_and_load():
    	# タスクを定義
    		@task()
    		def jdbc_extract():
    			try:
    				hook = JdbcHook(jdbc_conn_id="jdbc")
    				sql = """ select * from Account """
    				df = hook.get_pandas_df(sql)
    				df.to_csv("/{some_file_path}/{name_of_csv}.csv",header=False, index=False, quoting=1)
    				# print(df.head())
    				print(df)
    				tbl_dict = df.to_dict('dict')
    				return tbl_dict
    			except Exception as e:
    				print("Data extract error: " + str(e))
    
    		jdbc_extract()
    
    	sf_extract_and_load = extract_and_load()
    
  3. このファイルを保存し、Airflow インスタンスを更新します。DAG のリストに「act-on_hook」という新しい DAG が表示されるはずです。
  4. この DAG をクリックし、次の画面で一時停止スイッチをクリックして青色にオンにします。次に、トリガー(再生)ボタンをクリックして DAG を実行します。これにより、act-on_hook.py ファイル内の SQL クエリが実行され、コード内で指定したファイルパスに CSV として結果がエクスポートされます。
  5. 新しい DAG をトリガーした後、Downloads フォルダ(または Python スクリプト内で指定した場所)を確認すると、CSV ファイルが作成されていることがわかります。この例では account.csv です。
  6. CSV ファイルを開くと、Apache Airflow によって Act-On のデータ が CSV 形式で利用可能になっていることを確認できます。

詳細情報と無料トライアル

CData JDBC Driver for Act-On の30日間無料トライアルをダウンロードして、Apache Airflow でリアルタイムの Act-On のデータ を活用してみてください。ご質問があれば、サポートチームまでお気軽にお問い合わせください。

はじめる準備はできましたか?

Act-On Driver の無料トライアルをダウンロードしてお試しください:

 ダウンロード

詳細:

Act-On Icon Act-On JDBC Driver お問い合わせ

驚くほど簡単にJDBC でJava アプリケーションにAct-On マーケティングオートメーションのCampaigns、Programs、Reports データを連携!