SQLAlchemy ORM を使用して Python で HDFS のデータ にアクセスする方法

Jerod Johnson
Jerod Johnson
Senior Technology Evangelist
SQLAlchemy オブジェクトリレーショナルマッピングを使用して、HDFS のデータ を操作する Python アプリケーションとスクリプトを作成します。

Python の豊富なモジュールエコシステムを活用することで、迅速に作業を開始し、システムを効果的に統合できます。CData Python Connector for HDFS と SQLAlchemy ツールキットを使用して、HDFS に接続された Python アプリケーションやスクリプトを構築できます。この記事では、SQLAlchemy を使用して HDFS のデータ に接続し、クエリを実行する方法を説明します。

CData Python Connector は最適化されたデータ処理機能を内蔵しており、Python からリアルタイムの HDFS のデータ を操作する際に比類のないパフォーマンスを提供します。HDFS に対して複雑な SQL クエリを発行すると、CData Connector はフィルタや集計などのサポートされている SQL 操作を直接 HDFS にプッシュし、サポートされていない操作(多くの場合 SQL 関数や JOIN 操作)は組み込みの SQL エンジンを使用してクライアント側で処理します。

HDFS のデータ への接続

HDFS のデータ への接続は、他のリレーショナルデータソースへの接続と同様です。必要な接続プロパティを使用して接続文字列を作成します。この記事では、接続文字列を create_engine 関数のパラメータとして渡します。

HDFS 接続プロパティの取得・設定方法

HDFS への認証には、次の接続プロパティを設定します。

  • Host:HDFS インスタンスのホストに設定。
  • Port:HDFS インスタンスのポートに設定。デフォルトのポートは"9870" です。

以下の手順に従って SQLAlchemy をインストールし、Python オブジェクトを通じて HDFS にアクセスしてみましょう。

必要なモジュールのインストール

pip ユーティリティを使用して、SQLAlchemy ツールキットと SQLAlchemy ORM パッケージをインストールします。

pip install sqlalchemy
pip install sqlalchemy.orm

適切なモジュールをインポートします。

from sqlalchemy import create_engine, String, Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Python での HDFS のデータ のモデリング

これで接続文字列を使用して接続できます。create_engine 関数を使用して、HDFS のデータ を操作するための Engine を作成します。

注意: 接続文字列のプロパティに特殊文字が含まれている場合は、URL エンコードする必要があります。詳細については、SQL Alchemy ドキュメントを参照してください。

engine = create_engine("hdfs:///?Host=sandbox-hdp.hortonworks.com&Port=50070&Path=/user/root&User=root")

HDFS のデータ のマッピングクラスの宣言

接続を確立したら、ORM でモデル化するテーブルのマッピングクラスを宣言します(この記事では、Files テーブルをモデル化します)。sqlalchemy.ext.declarative.declarative_base 関数を使用して、一部またはすべてのフィールド(カラム)を定義した新しいクラスを作成します。

base = declarative_base()
class Files(base):
	__tablename__ = "Files"
	FileId = Column(String,primary_key=True)
	ChildrenNum = Column(String)
	...

HDFS のデータ のクエリ

マッピングクラスを準備したら、セッションオブジェクトを使用してデータソースにクエリを実行できます。Engine をセッションにバインドした後、セッションの query メソッドにマッピングクラスを渡します。

query メソッドの使用

engine = create_engine("hdfs:///?Host=sandbox-hdp.hortonworks.com&Port=50070&Path=/user/root&User=root")
factory = sessionmaker(bind=engine)
session = factory()
for instance in session.query(Files).filter_by(FileId="119116"):
	print("FileId: ", instance.FileId)
	print("ChildrenNum: ", instance.ChildrenNum)
	print("---------")

別の方法として、適切なテーブルオブジェクトと execute メソッドを使用することもできます。以下のコードはアクティブな session で動作します。

execute メソッドの使用

Files_table = Files.metadata.tables["Files"]
for instance in session.execute(Files_table.select().where(Files_table.c.FileId == "119116")):
	print("FileId: ", instance.FileId)
	print("ChildrenNum: ", instance.ChildrenNum)
	print("---------")

JOIN、集計、制限などのより複雑なクエリの例については、拡張機能のヘルプドキュメントを参照してください。

無料トライアルと詳細情報

CData Python Connector for HDFS の30日間の無料トライアルをダウンロードして、HDFS のデータ に接続する Python アプリとスクリプトの構築を始めましょう。ご質問がありましたら、サポートチームまでお問い合わせください。

はじめる準備はできましたか?

HDFS Connector のコミュニティライセンスをダウンロード:

 ダウンロード

詳細:

HDFS Icon HDFS Python Connector お問い合わせ

HDFS データ連携用Python コネクタライブラリ。HDFS データをpandas、SQLAlchemy、Dash、petl などの人気のPython ツールにシームレスに統合。