Python でDatabricks のデータを変換・出力するETL 処理を作る方法

加藤龍彦
加藤龍彦
デジタルマーケティング
CData Python Connector とpetl モジュールを使って、Databricks のデータを変換後にCSV ファイルに吐き出すETL 処理を実装します。

Pythonエコシステムには多くのモジュールがあり、システム構築を素早く効率的に行うことができます。本記事では、CData Python Connector for Databricks とpetl フレームワークを使って、Databricks のデータにPython から接続してデータを変換、CSV に出力するETL 変換を実装してみます。

CData Python Connector は効率的なデータ処理によりDatabricks のデータ にPython から接続し、高いパフォーマンスを発揮します。Databricks にデータをクエリする際、ドライバーはフィルタリング、集計などがサポートされている場合SQL 処理を直接Databricks 側に行わせ、サポートされていないSQL 処理については、組み込みのSQL エンジンによりクライアント側で処理を行います(JOIN やSQL 関数など)。

必要なモジュールのインストール

pip で必要なモジュールおよびフレームワークをインストールします:

pip install petl
pip install pandas

Python でDatabricks のデータをETL 処理するアプリを構築

モジュールとフレームワークをインストールしたら、ETL アプリケーションを組んでいきます。コードのスニペットは以下の通りです。フルコードは記事の末尾に付いています。

CData Connector を含むモジュールをインポートします。

import petl as etl
import pandas as pd
import cdata.databricks as mod

接続文字列で接続を確立します。connect 関数を使って、CData Databricks Connector からDatabricks への接続を行います

cnxn = mod.connect("Server=127.0.0.1;HTTPPath=MyHTTPPath;User=MyUser;Token=MyToken;")

Databricks 接続プロパティの取得・設定方法

Databricks クラスターに接続するには、以下のプロパティを設定します。

  • Database:Databricks データベース名。
  • Server:Databricks クラスターのサーバーのホスト名
  • HTTPPath:Databricks クラスターのHTTP パス。
  • Token:個人用アクセストークン。この値は、Databricks インスタンスのユーザー設定ページに移動してアクセストークンタブを選択することで取得できます。
Databricks インスタンスで必要な値は、クラスターに移動して目的のクラスターを選択し、Advanced Options の下にあるJDBC/ODBC タブを選択することで見つけることができます。

Databricks への認証

CData は、次の認証スキームをサポートしています。

  • 個人用アクセストークン
  • Microsoft Entra ID(Azure AD)
  • Azure サービスプリンシパル
  • OAuthU2M
  • OAuthM2M

個人用アクセストークン

認証するには、次を設定します。

  • AuthSchemePersonalAccessToken
  • Token:Databricks サーバーへの接続に使用するトークン。Databricks インスタンスのユーザー設定ページに移動してアクセストークンタブを選択することで取得できます。

その他の認証方法については、ヘルプドキュメント の「はじめに」セクションを参照してください。

Databricks をクエリするSQL 文の作成

Databricks にはSQL でデータアクセスが可能です。Customers エンティティからのデータを読み出します。

sql = "SELECT City, CompanyName FROM Customers WHERE Country = 'US'"

Databricks データのETL 処理

DataFrame に格納されたクエリ結果を使って、petl でETL(抽出・変換・ロード)パイプラインを組みます。この例では、Databricks のデータ を取得して、CompanyName カラムでデータをソートして、CSV ファイルにデータをロードします。

table1 = etl.fromdb(cnxn,sql)

table2 = etl.sort(table1,'CompanyName')

etl.tocsv(table2,'customers_data.csv')

CData Python Connector for Databricks を使えば、データベースを扱う場合と同感覚で、Databricks のデータ を扱うことができ、petl のようなETL パッケージから直接データにアクセスが可能になります。

おわりに

Databricks Python Connector の30日の無償トライアル をぜひダウンロードして、Databricks のデータ への接続をPython アプリやスクリプトから簡単に作成しましょう。



フルソースコード

import petl as etl
import pandas as pd
import cdata.databricks as mod

cnxn = mod.connect("Server=127.0.0.1;HTTPPath=MyHTTPPath;User=MyUser;Token=MyToken;")

sql = "SELECT City, CompanyName FROM Customers WHERE Country = 'US'"

table1 = etl.fromdb(cnxn,sql)

table2 = etl.sort(table1,'CompanyName')

etl.tocsv(table2,'customers_data.csv')

はじめる準備はできましたか?

Databricks Connector のコミュニティライセンスをダウンロード:

 ダウンロード

詳細:

Databricks Icon Databricks Python Connector お問い合わせ

Databricks データ接続用のPython コネクタライブラリ。Pandas、SQLAlchemy、Dash & petl など人気のPython ツールとDatabricks を連携。