SQLAlchemy ORM を使用して Python で Elasticsearch のデータ にアクセスする方法
Python の豊富なモジュールエコシステムを活用することで、迅速に作業を開始し、システムを効果的に統合できます。CData Python Connector for Elasticsearch と SQLAlchemy ツールキットを使用して、Elasticsearch に接続された Python アプリケーションやスクリプトを構築できます。この記事では、SQLAlchemy を使用して Elasticsearch のデータ に接続し、クエリ、更新、削除、挿入を実行する方法を説明します。
CData Python Connector は最適化されたデータ処理機能を内蔵しており、Python からリアルタイムの Elasticsearch のデータ を操作する際に比類のないパフォーマンスを提供します。Elasticsearch に対して複雑な SQL クエリを発行すると、CData Connector はフィルタや集計などのサポートされている SQL 操作を直接 Elasticsearch にプッシュし、サポートされていない操作(多くの場合 SQL 関数や JOIN 操作)は組み込みの SQL エンジンを使用してクライアント側で処理します。
Elasticsearch データ連携について
CData を使用すれば、Elasticsearch のライブデータへのアクセスと統合がこれまでになく簡単になります。お客様は CData の接続機能を以下の目的で利用しています:
- SQL エンドポイントと REST エンドポイントの両方にアクセスでき、接続を最適化し、Elasticsearch データの読み書きに関してより多くのオプションを提供します。
- v2.2 以降およびオープンソース Elasticsearch サブスクリプションを含む、ほぼすべての Elasticsearch インスタンスに接続できます。
- SCORE() 関数を明示的に要求することなく、常にクエリ結果の関連性スコアを受け取ることができます。これにより、サードパーティツールからのアクセスが簡素化され、クエリ結果のテキスト関連性のランキングを簡単に確認できます。
- 複数のインデックスを検索でき、クライアントマシンではなく Elasticsearch がクエリと結果の管理・処理を担当します。
ユーザーは、Crystal Reports、Power BI、Excel などの分析ツールと Elasticsearch データを統合し、当社のツールを活用して、Elasticsearch を含むすべてのデータソースへの単一のフェデレートアクセスレイヤーを実現しています。
CData の Elasticsearch ソリューションの詳細については、ナレッジベース記事をご覧ください:CData Elasticsearch Driver Features & Differentiators
はじめに
Elasticsearch のデータ への接続
Elasticsearch のデータ への接続は、他のリレーショナルデータソースへの接続と同様です。必要な接続プロパティを使用して接続文字列を作成します。この記事では、接続文字列を create_engine 関数のパラメータとして渡します。
Elasticsearch 接続プロパティの取得・設定方法
接続するには、Server およびPort 接続プロパティを設定します。 認証には、User とPassword プロパティ、PKI (public key infrastructure)、またはその両方を設定します。 PKI を使用するには、SSLClientCert、SSLClientCertType、SSLClientCertSubject、およびSSLClientCertPassword プロパティを設定します。
CData 製品は、認証とTLS/SSL 暗号化にX-Pack Security を使用しています。TLS/SSL で接続するには、Server 値に'https://' を接頭します。Note: PKI を 使用するためには、TLS/SSL およびクライアント認証はX-Pack 上で有効化されていなければなりません。
接続されると、X-Pack では、設定したリルムをベースにユーザー認証およびロールの許可が実施されます。
以下の手順に従って SQLAlchemy をインストールし、Python オブジェクトを通じて Elasticsearch にアクセスしてみましょう。
必要なモジュールのインストール
pip ユーティリティを使用して、SQLAlchemy ツールキットと SQLAlchemy ORM パッケージをインストールします。
pip install sqlalchemy pip install sqlalchemy.orm
適切なモジュールをインポートします。
from sqlalchemy import create_engine, String, Column from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker
Python での Elasticsearch のデータ のモデリング
これで接続文字列を使用して接続できます。create_engine 関数を使用して、Elasticsearch のデータ を操作するための Engine を作成します。
注意: 接続文字列のプロパティに特殊文字が含まれている場合は、URL エンコードする必要があります。詳細については、SQL Alchemy ドキュメントを参照してください。
engine = create_engine("elasticsearch:///?Server=127.0.0.1&Port=9200&User=admin&Password=123456")
Elasticsearch のデータ のマッピングクラスの宣言
接続を確立したら、ORM でモデル化するテーブルのマッピングクラスを宣言します(この記事では、Orders テーブルをモデル化します)。sqlalchemy.ext.declarative.declarative_base 関数を使用して、一部またはすべてのフィールド(カラム)を定義した新しいクラスを作成します。
base = declarative_base() class Orders(base): __tablename__ = "Orders" OrderName = Column(String,primary_key=True) Freight = Column(String) ...
Elasticsearch のデータ のクエリ
マッピングクラスを準備したら、セッションオブジェクトを使用してデータソースにクエリを実行できます。Engine をセッションにバインドした後、セッションの query メソッドにマッピングクラスを渡します。
query メソッドの使用
engine = create_engine("elasticsearch:///?Server=127.0.0.1&Port=9200&User=admin&Password=123456")
factory = sessionmaker(bind=engine)
session = factory()
for instance in session.query(Orders).filter_by(ShipCity="New York"):
print("OrderName: ", instance.OrderName)
print("Freight: ", instance.Freight)
print("---------")
別の方法として、適切なテーブルオブジェクトと execute メソッドを使用することもできます。以下のコードはアクティブな session で動作します。
execute メソッドの使用
Orders_table = Orders.metadata.tables["Orders"]
for instance in session.execute(Orders_table.select().where(Orders_table.c.ShipCity == "New York")):
print("OrderName: ", instance.OrderName)
print("Freight: ", instance.Freight)
print("---------")
JOIN、集計、制限などのより複雑なクエリの例については、拡張機能のヘルプドキュメントを参照してください。
Elasticsearch のデータ の挿入
Elasticsearch のデータ を挿入するには、マッピングクラスのインスタンスを定義し、アクティブな session に追加します。セッションの commit 関数を呼び出して、追加されたすべてのインスタンスを Elasticsearch にプッシュします。
new_rec = Orders(OrderName="placeholder", ShipCity="New York") session.add(new_rec) session.commit()
Elasticsearch のデータ の更新
Elasticsearch のデータ を更新するには、フィルタクエリで目的のレコードを取得します。次に、フィールドの値を変更し、セッションの commit 関数を呼び出して、変更されたレコードを Elasticsearch にプッシュします。
updated_rec = session.query(Orders).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first() updated_rec.ShipCity = "New York" session.commit()
Elasticsearch のデータ の削除
Elasticsearch のデータ を削除するには、フィルタクエリで目的のレコードを取得します。次に、アクティブな session でレコードを削除し、セッションの commit 関数を呼び出して、指定されたレコード(行)に対して削除操作を実行します。
deleted_rec = session.query(Orders).filter_by(SOME_ID_COLUMN="SOME_ID_VALUE").first() session.delete(deleted_rec) session.commit()
無料トライアルと詳細情報
CData Python Connector for Elasticsearch の30日間の無料トライアルをダウンロードして、Elasticsearch のデータ に接続する Python アプリとスクリプトの構築を始めましょう。ご質問がありましたら、サポートチームまでお問い合わせください。