SQLAlchemy ORM を使用して Python で Databricks のデータ にアクセスする方法

Jerod Johnson
Jerod Johnson
Senior Technology Evangelist
SQLAlchemy オブジェクトリレーショナルマッピングを使用して、Databricks のデータ を操作する Python アプリケーションとスクリプトを作成します。

Python の豊富なモジュールエコシステムを活用することで、迅速に作業を開始し、システムを効果的に統合できます。CData Python Connector for Databricks と SQLAlchemy ツールキットを使用して、Databricks に接続された Python アプリケーションやスクリプトを構築できます。この記事では、SQLAlchemy を使用して Databricks のデータ に接続し、クエリ、更新、削除、挿入を実行する方法を説明します。

CData Python Connector は最適化されたデータ処理機能を内蔵しており、Python からリアルタイムの Databricks のデータ を操作する際に比類のないパフォーマンスを提供します。Databricks に対して複雑な SQL クエリを発行すると、CData Connector はフィルタや集計などのサポートされている SQL 操作を直接 Databricks にプッシュし、サポートされていない操作(多くの場合 SQL 関数や JOIN 操作)は組み込みの SQL エンジンを使用してクライアント側で処理します。

Databricks データ連携について

CData を使用すれば、Databricks のライブデータへのアクセスと統合がこれまでになく簡単になります。お客様は CData の接続機能を以下の目的で利用しています:

  • Runtime バージョン 9.1 - 13.X から Pro および Classic Databricks SQL バージョンまで、すべてのバージョンの Databricks にアクセスできます。
  • あらゆるホスティングソリューションとの互換性により、お好みの環境で Databricks を使用し続けることができます。
  • パーソナルアクセストークン、Azure サービスプリンシパル、Azure AD など、さまざまな方法で安全に認証できます。
  • Databricks ファイルシステム、Azure Blob ストレージ、AWS S3 ストレージを使用して Databricks にデータをアップロードできます。

多くのお客様が、さまざまなシステムから Databricks データレイクハウスにデータを移行するために CData のソリューションを使用していますが、ライブ接続ソリューションを使用して、データベースと Databricks 間の接続をフェデレートしているお客様も多数います。これらのお客様は、SQL Server リンクサーバーまたは Polybase を使用して、既存の RDBMS 内から Databricks へのライブアクセスを実現しています。

一般的な Databricks のユースケースと CData のソリューションがデータの問題解決にどのように役立つかについては、ブログをご覧ください:What is Databricks Used For? 6 Use Cases


はじめに


Databricks のデータ への接続

Databricks のデータ への接続は、他のリレーショナルデータソースへの接続と同様です。必要な接続プロパティを使用して接続文字列を作成します。この記事では、接続文字列を create_engine 関数のパラメータとして渡します。

Databricks 接続プロパティの取得・設定方法

Databricks クラスターに接続するには、以下のプロパティを設定します。

  • Database:Databricks データベース名。
  • Server:Databricks クラスターのサーバーのホスト名
  • HTTPPath:Databricks クラスターのHTTP パス。
  • Token:個人用アクセストークン。この値は、Databricks インスタンスのユーザー設定ページに移動してアクセストークンタブを選択することで取得できます。
Databricks インスタンスで必要な値は、クラスターに移動して目的のクラスターを選択し、Advanced Options の下にあるJDBC/ODBC タブを選択することで見つけることができます。

Databricks への認証

CData は、次の認証スキームをサポートしています。

  • 個人用アクセストークン
  • Microsoft Entra ID(Azure AD)
  • Azure サービスプリンシパル
  • OAuthU2M
  • OAuthM2M

個人用アクセストークン

認証するには、次を設定します。

  • AuthSchemePersonalAccessToken
  • Token:Databricks サーバーへの接続に使用するトークン。Databricks インスタンスのユーザー設定ページに移動してアクセストークンタブを選択することで取得できます。

その他の認証方法については、ヘルプドキュメント の「はじめに」セクションを参照してください。

以下の手順に従って SQLAlchemy をインストールし、Python オブジェクトを通じて Databricks にアクセスしてみましょう。

必要なモジュールのインストール

pip ユーティリティを使用して、SQLAlchemy ツールキットと SQLAlchemy ORM パッケージをインストールします。

pip install sqlalchemy
pip install sqlalchemy.orm

適切なモジュールをインポートします。

from sqlalchemy import create_engine, String, Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Python での Databricks のデータ のモデリング

これで接続文字列を使用して接続できます。create_engine 関数を使用して、Databricks のデータ を操作するための Engine を作成します。

注意: 接続文字列のプロパティに特殊文字が含まれている場合は、URL エンコードする必要があります。詳細については、SQL Alchemy ドキュメントを参照してください。

engine = create_engine("databricks:///?Server=127.0.0.1&HTTPPath=MyHTTPPath&User=MyUser&Token=MyToken")

Databricks のデータ のマッピングクラスの宣言

接続を確立したら、ORM でモデル化するテーブルのマッピングクラスを宣言します(この記事では、Customers テーブルをモデル化します)。sqlalchemy.ext.declarative.declarative_base 関数を使用して、一部またはすべてのフィールド(カラム)を定義した新しいクラスを作成します。

base = declarative_base()
class Customers(base):
	__tablename__ = "Customers"
	City = Column(String,primary_key=True)
	CompanyName = Column(String)
	...

Databricks のデータ のクエリ

マッピングクラスを準備したら、セッションオブジェクトを使用してデータソースにクエリを実行できます。Engine をセッションにバインドした後、セッションの query メソッドにマッピングクラスを渡します。

query メソッドの使用

engine = create_engine("databricks:///?Server=127.0.0.1&HTTPPath=MyHTTPPath&User=MyUser&Token=MyToken")
factory = sessionmaker(bind=engine)
session = factory()
for instance in session.query(Customers).filter_by(Country="US"):
	print("City: ", instance.City)
	print("CompanyName: ", instance.CompanyName)
	print("---------")

別の方法として、適切なテーブルオブジェクトと execute メソッドを使用することもできます。以下のコードはアクティブな session で動作します。

execute メソッドの使用

Customers_table = Customers.metadata.tables["Customers"]
for instance in session.execute(Customers_table.select().where(Customers_table.c.Country == "US")):
	print("City: ", instance.City)
	print("CompanyName: ", instance.CompanyName)
	print("---------")

JOIN、集計、制限などのより複雑なクエリの例については、拡張機能のヘルプドキュメントを参照してください。

Databricks のデータ の挿入

Databricks のデータ を挿入するには、マッピングクラスのインスタンスを定義し、アクティブな session に追加します。セッションの commit 関数を呼び出して、追加されたすべてのインスタンスを Databricks にプッシュします。

new_rec = Customers(City="placeholder", Country="US")
session.add(new_rec)
session.commit()

Databricks のデータ の更新

Databricks のデータ を更新するには、フィルタクエリで目的のレコードを取得します。次に、フィールドの値を変更し、セッションの commit 関数を呼び出して、変更されたレコードを Databricks にプッシュします。

updated_rec = session.query(Customers).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first()
updated_rec.Country = "US"
session.commit()

Databricks のデータ の削除

Databricks のデータ を削除するには、フィルタクエリで目的のレコードを取得します。次に、アクティブな session でレコードを削除し、セッションの commit 関数を呼び出して、指定されたレコード(行)に対して削除操作を実行します。

deleted_rec = session.query(Customers).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first()
session.delete(deleted_rec)
session.commit()

無料トライアルと詳細情報

CData Python Connector for Databricks の30日間の無料トライアルをダウンロードして、Databricks のデータ に接続する Python アプリとスクリプトの構築を始めましょう。ご質問がありましたら、サポートチームまでお問い合わせください。

はじめる準備はできましたか?

Databricks Connector のコミュニティライセンスをダウンロード:

 ダウンロード

詳細:

Databricks Icon Databricks Python Connector お問い合わせ

Databricks データ接続用のPython コネクタライブラリ。Pandas、SQLAlchemy、Dash & petl など人気のPython ツールとDatabricks を連携。