SQLAlchemy ORM を使用して Python で Amazon Athena のデータ にアクセスする方法

Jerod Johnson
Jerod Johnson
Senior Technology Evangelist
SQLAlchemy オブジェクトリレーショナルマッピングを使用して、Amazon Athena のデータ を操作する Python アプリケーションとスクリプトを作成します。

Python の豊富なモジュールエコシステムを活用することで、迅速に作業を開始し、システムを効果的に統合できます。CData Python Connector for Amazon Athena と SQLAlchemy ツールキットを使用して、Amazon Athena に接続された Python アプリケーションやスクリプトを構築できます。この記事では、SQLAlchemy を使用して Amazon Athena のデータ に接続し、クエリ、更新、削除、挿入を実行する方法を説明します。

CData Python Connector は最適化されたデータ処理機能を内蔵しており、Python からリアルタイムの Amazon Athena のデータ を操作する際に比類のないパフォーマンスを提供します。Amazon Athena に対して複雑な SQL クエリを発行すると、CData Connector はフィルタや集計などのサポートされている SQL 操作を直接 Amazon Athena にプッシュし、サポートされていない操作(多くの場合 SQL 関数や JOIN 操作)は組み込みの SQL エンジンを使用してクライアント側で処理します。

Amazon Athena データ連携について

CData は、Amazon Athena のライブデータにアクセスし、統合するための最も簡単な方法を提供します。お客様は CData の接続機能を以下の目的で使用しています:

  • IAM 認証情報、アクセスキー、インスタンスプロファイルなど、さまざまな方法で安全に認証できます。多様なセキュリティニーズに対応し、認証プロセスを簡素化します。
  • 詳細なエラーメッセージにより、セットアップを効率化し、問題を迅速に解決できます。
  • サーバーサイドでのクエリ実行により、パフォーマンスを向上させ、クライアントリソースへの負荷を最小限に抑えます。

ユーザーは、Tableau、Power BI、Excel などの分析ツールと Athena を統合し、お気に入りのツールから詳細な分析を行うことができます。

CData を使用した Amazon Athena のユニークなユースケースについては、ブログ記事をご覧ください:https://jp.cdata.com/blog/amazon-athena-use-cases


はじめに


Amazon Athena のデータ への接続

Amazon Athena のデータ への接続は、他のリレーショナルデータソースへの接続と同様です。必要な接続プロパティを使用して接続文字列を作成します。この記事では、接続文字列を create_engine 関数のパラメータとして渡します。

Amazon Athena 接続プロパティの取得・設定方法

それでは、早速Athena に接続していきましょう。

データに接続するには、以下の接続パラメータを指定します。

  • DataSource:接続するAmazon Athena データソース。
  • Database:接続するAmazon Athena データベース。
  • AWSRegion:Amazon Athena データがホストされているリージョン。
  • S3StagingDirectory:クエリの結果を保存するS3 フォルダ。

Database またはDataSource が設定されていない場合、CData 製品はAmazon Athena の利用可能なデータソースからすべてのデータベースのリスト化を試みます。そのため、両方のプロパティを設定することでCData 製品のパフォーマンスが向上します。

Amazon Athena の認証設定

CData 製品は幅広い認証オプションに対応しています。詳しくはヘルプドキュメントの「はじめに」を参照してみてください。

AWS キーを取得

IAM ユーザーの認証情報を取得するには、以下のステップお試しください。

  1. IAM コンソールにサインインします。
  2. ナビゲーションペインでユーザーを選択します。
  3. ユーザーのアクセスキーを作成または管理するには、ユーザーを選択してからセキュリティ認証情報タブに移動します。

AWS ルートアカウントの資格情報を取得するには、以下のステップをお試しください。

  1. ルートアカウントの認証情報を使用してAWS 管理コンソールにサインインします。
  2. アカウント名または番号を選択します。
  3. 表示されたメニューでMy Security Credentials を選択します。
  4. ルートアカウントのアクセスキーを管理または作成するには、Continue to Security Credentials をクリックし、[Access Keys]セクションを展開します。

その他の認証オプションについては、ヘルプドキュメントの「Amazon Athena への認証」を参照してください。

以下の手順に従って SQLAlchemy をインストールし、Python オブジェクトを通じて Amazon Athena にアクセスしてみましょう。

必要なモジュールのインストール

pip ユーティリティを使用して、SQLAlchemy ツールキットと SQLAlchemy ORM パッケージをインストールします。

pip install sqlalchemy
pip install sqlalchemy.orm

適切なモジュールをインポートします。

from sqlalchemy import create_engine, String, Column
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Python での Amazon Athena のデータ のモデリング

これで接続文字列を使用して接続できます。create_engine 関数を使用して、Amazon Athena のデータ を操作するための Engine を作成します。

注意: 接続文字列のプロパティに特殊文字が含まれている場合は、URL エンコードする必要があります。詳細については、SQL Alchemy ドキュメントを参照してください。

engine = create_engine("amazonathena:///?AccessKey='a123'&SecretKey='s123'&Region='IRELAND'&Database='sampledb'&S3StagingDirectory='s3://bucket/staging/'")

Amazon Athena のデータ のマッピングクラスの宣言

接続を確立したら、ORM でモデル化するテーブルのマッピングクラスを宣言します(この記事では、Customers テーブルをモデル化します)。sqlalchemy.ext.declarative.declarative_base 関数を使用して、一部またはすべてのフィールド(カラム)を定義した新しいクラスを作成します。

base = declarative_base()
class Customers(base):
	__tablename__ = "Customers"
	Name = Column(String,primary_key=True)
	TotalDue = Column(String)
	...

Amazon Athena のデータ のクエリ

マッピングクラスを準備したら、セッションオブジェクトを使用してデータソースにクエリを実行できます。Engine をセッションにバインドした後、セッションの query メソッドにマッピングクラスを渡します。

query メソッドの使用

engine = create_engine("amazonathena:///?AccessKey='a123'&SecretKey='s123'&Region='IRELAND'&Database='sampledb'&S3StagingDirectory='s3://bucket/staging/'")
factory = sessionmaker(bind=engine)
session = factory()
for instance in session.query(Customers).filter_by(CustomerId="12345"):
	print("Name: ", instance.Name)
	print("TotalDue: ", instance.TotalDue)
	print("---------")

別の方法として、適切なテーブルオブジェクトと execute メソッドを使用することもできます。以下のコードはアクティブな session で動作します。

execute メソッドの使用

Customers_table = Customers.metadata.tables["Customers"]
for instance in session.execute(Customers_table.select().where(Customers_table.c.CustomerId == "12345")):
	print("Name: ", instance.Name)
	print("TotalDue: ", instance.TotalDue)
	print("---------")

JOIN、集計、制限などのより複雑なクエリの例については、拡張機能のヘルプドキュメントを参照してください。

Amazon Athena のデータ の挿入

Amazon Athena のデータ を挿入するには、マッピングクラスのインスタンスを定義し、アクティブな session に追加します。セッションの commit 関数を呼び出して、追加されたすべてのインスタンスを Amazon Athena にプッシュします。

new_rec = Customers(Name="placeholder", CustomerId="12345")
session.add(new_rec)
session.commit()

Amazon Athena のデータ の更新

Amazon Athena のデータ を更新するには、フィルタクエリで目的のレコードを取得します。次に、フィールドの値を変更し、セッションの commit 関数を呼び出して、変更されたレコードを Amazon Athena にプッシュします。

updated_rec = session.query(Customers).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first()
updated_rec.CustomerId = "12345"
session.commit()

Amazon Athena のデータ の削除

Amazon Athena のデータ を削除するには、フィルタクエリで目的のレコードを取得します。次に、アクティブな session でレコードを削除し、セッションの commit 関数を呼び出して、指定されたレコード(行)に対して削除操作を実行します。

deleted_rec = session.query(Customers).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first()
session.delete(deleted_rec)
session.commit()

無料トライアルと詳細情報

CData Python Connector for Amazon Athena の30日間の無料トライアルをダウンロードして、Amazon Athena のデータ に接続する Python アプリとスクリプトの構築を始めましょう。ご質問がありましたら、サポートチームまでお問い合わせください。

はじめる準備はできましたか?

Amazon Athena Connector のコミュニティライセンスをダウンロード:

 ダウンロード

詳細:

Amazon Athena Icon Amazon Athena Python Connector お問い合わせ

Amazon Athena へのデータ連携用のPython Connecotr ライブラリ。 pandas、SQLAlchemy、Dash、petl などの主要なPython ツールにAmazon Athena をシームレスに統合。