Python でAzure Data Lake Storage のデータを変換・出力するETL 処理を作る方法
Pythonエコシステムには多くのモジュールがあり、システム構築を素早く効率的に行うことができます。本記事では、CData Python Connector for ADLS とpetl フレームワークを使って、Azure Data Lake Storage のデータにPython から接続してデータを変換、CSV に出力するETL 変換を実装してみます。
CData Python Connector は効率的なデータ処理によりAzure Data Lake Storage のデータ にPython から接続し、高いパフォーマンスを発揮します。Azure Data Lake Storage にデータをクエリする際、ドライバーはフィルタリング、集計などがサポートされている場合SQL 処理を直接Azure Data Lake Storage 側に行わせ、サポートされていないSQL 処理については、組み込みのSQL エンジンによりクライアント側で処理を行います(JOIN やSQL 関数など)。
必要なモジュールのインストール
pip で必要なモジュールおよびフレームワークをインストールします:
pip install petl pip install pandas
Python でAzure Data Lake Storage のデータをETL 処理するアプリを構築
モジュールとフレームワークをインストールしたら、ETL アプリケーションを組んでいきます。コードのスニペットは以下の通りです。フルコードは記事の末尾に付いています。
CData Connector を含むモジュールをインポートします。
import petl as etl import pandas as pd import cdata.adls as mod
接続文字列で接続を確立します。connect 関数を使って、CData Azure Data Lake Storage Connector からAzure Data Lake Storage への接続を行います
cnxn = mod.connect("Schema=ADLSGen2;Account=myAccount;FileSystem=myFileSystem;AccessKey=myAccessKey;")
Azure Data Lake Storage 接続プロパティの取得・設定方法
Azure Data Lake Storage Gen2 への接続
それでは、Gen2 Data Lake Storage アカウントに接続していきましょう。接続するには、以下のプロパティを設定します。
- Account:ストレージアカウントの名前
- FileSystem:このアカウントに使用されるファイルシステム名。例えば、Azure Blob コンテナの名前
- Directory(オプション):レプリケートされたファイルが保存される場所へのパス。パスが指定されない場合、ファイルはルートディレクトリに保存されます
Azure Data Lake Storage Gen2への認証
続いて、認証方法を設定しましょう。CData 製品では、5つの認証方法をサポートしています:アクセスキー(AccessKey)の使用、共有アクセス署名(SAS)の使用、Azure Active Directory OAuth(AzureAD)経由、Azure サービスプリンシパル(AzureServicePrincipal またはAzureServicePrincipalCert)経由、およびManaged Service Identity(AzureMSI)経由です。
アクセスキー
アクセスキーを使用して接続するには、まずADLS Gen2ストレージアカウントで利用可能なアクセスキーを取得する必要があります。
Azure ポータルでの手順は以下のとおりです:
- ADLS Gen2ストレージアカウントにアクセスします
- 設定でアクセスキーを選択します
- 利用可能なアクセスキーの1つの値をAccessKey 接続プロパティにコピーします
接続の準備ができたら、以下のプロパティを設定してください。
- AuthScheme:AccessKey
- AccessKey:先ほどAzure ポータルで取得したアクセスキーの値
共有アクセス署名(SAS)
共有アクセス署名を使用して接続するには、まずAzure Storage Explorer ツールを使用して署名を生成する必要があります。
接続の準備ができたら、以下のプロパティを設定してください。
- AuthScheme:SAS
- SharedAccessSignature:先ほど生成した共有アクセス署名の値
その他の認証方法については、 href="/kb/help/" target="_blank">ヘルプドキュメントの「Azure Data Lake Storage Gen2への認証」セクションをご確認ください。
Azure Data Lake Storage をクエリするSQL 文の作成
Azure Data Lake Storage にはSQL でデータアクセスが可能です。Resources エンティティからのデータを読み出します。
sql = "SELECT FullPath, Permission FROM Resources WHERE Type = 'FILE'"
Azure Data Lake Storage データのETL 処理
DataFrame に格納されたクエリ結果を使って、petl でETL(抽出・変換・ロード)パイプラインを組みます。この例では、Azure Data Lake Storage のデータ を取得して、Permission カラムでデータをソートして、CSV ファイルにデータをロードします。
table1 = etl.fromdb(cnxn,sql) table2 = etl.sort(table1,'Permission') etl.tocsv(table2,'resources_data.csv')
CData Python Connector for ADLS を使えば、データベースを扱う場合と同感覚で、Azure Data Lake Storage のデータ を扱うことができ、petl のようなETL パッケージから直接データにアクセスが可能になります。
おわりに
Azure Data Lake Storage Python Connector の30日の無償トライアル をぜひダウンロードして、Azure Data Lake Storage のデータ への接続をPython アプリやスクリプトから簡単に作成しましょう。
フルソースコード
import petl as etl
import pandas as pd
import cdata.adls as mod
cnxn = mod.connect("Schema=ADLSGen2;Account=myAccount;FileSystem=myFileSystem;AccessKey=myAccessKey;")
sql = "SELECT FullPath, Permission FROM Resources WHERE Type = 'FILE'"
table1 = etl.fromdb(cnxn,sql)
table2 = etl.sort(table1,'Permission')
etl.tocsv(table2,'resources_data.csv')