AWS Glue で Boto3 を使って参照ファイルをインポートする方法
このエントリーでは、boto3 を使用して RSD ファイルなどの参照ファイルを S3 から AWS Glue エグゼキュータにダウンロードする方法を解説します。これにより、JDBC ドライバーが必要なファイルを参照・使用できるようになります。
Date Entered: 3/26/2020 Last Updated: 4/2/2020 Author: Garrett Bird
データソースによっては、ローカルファイルへの直接接続を必要としたり、データモデルを定義するための RSD スキーマファイルに依存したりする場合があります。いずれの場合も、S3 に保存された参照ファイルは、AWS Glue で動作するドライバーから直接アクセスすることができません。そのため、これらのファイルを使用する前に、ジョブでファイルを取得する必要があります。ここで役立つのが boto3 です。具体的には、JDBC ドライバーが関連ファイルにアクセスできるようにするには、JDBC ドライバーを実際に使用する前に boto3 を使って S3 からファイルをダウンロードするだけです。
事前準備
このエントリーでは、JSON プロバイダーを例として使用します。JSON プロバイダーはローカルファイルをデータソースとして利用でき、また RSD ファイルを使用してテーブルのメタデータを定義することもできます。まだ完了していない場合は、こちらの記事で説明されている事前準備の手順を実行してください。その後、参照ファイルを S3 にアップロードします。このエントリーでは、以下の JSON ファイルと RSD ファイルを使用します。
people.json
{
"people": [
{
"personal": {
"age": 20,
"gender": "M",
"name": {
"first": "John",
"last": "Doe"
}
},
"vehicles": [
{
"type": "car",
"model": "Honda Civic",
"insurance": {
"company": "ABC Insurance",
"policy_num": "12345"
},
"maintenance": [
{
"date": "07-17-2017",
"desc": "oil change"
},
{
"date": "01-03-2018",
"desc": "new tires"
}
]
},
{
"type": "truck",
"model": "Dodge Ram",
"insurance": {
"company": "ABC Insurance",
"policy_num": "12345"
},
"maintenance": [
{
"date": "08-27-2017",
"desc": "new tires"
},
{
"date": "01-08-2018",
"desc": "oil change"
}
]
}
],
"source": "internet"
},
{
"personal": {
"age": 24,
"gender": "F",
"name": {
"first": "Jane",
"last": "Roberts"
}
},
"vehicles": [
{
"type": "car",
"model": "Toyota Camry",
"insurance": {
"company": "Car Insurance",
"policy_num": "98765"
},
"maintenance": [
{
"date": "05-11-2017",
"desc": "tires rotated"
},
{
"date": "11-03-2017",
"desc": "oil change"
}
]
},
{
"type": "car",
"model": "Honda Accord",
"insurance": {
"company": "Car Insurance",
"policy_num": "98765"
},
"maintenance": [
{
"date": "10-07-2017",
"desc": "new air filter"
},
{
"date": "01-13-2018",
"desc": "new brakes"
}
]
}
],
"source": "phone"
}
]
}
people.rsd
<api:script xmlns:api="http://apiscript.com/ns?v1" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<!-- カラム定義でカラムの動作を指定し、XPath を使用して JSON からカラム値を抽出します。 -->
<api:info title="people" desc="Generated schema file." xmlns:other="http://apiscript.com/ns?v1">
<!-- ここで名前、型、カラムサイズを変更できます。 -->
<attr name="date" xs:type="date" readonly="false" other:xPath="/json/people/vehicles/maintenance/date" />
<attr name="desc" xs:type="string" readonly="false" other:xPath="/json/people/vehicles/maintenance/desc" />
<attr name="insurance.company" xs:type="string" readonly="false" other:xPath="/json/people/vehicles/insurance/company" />
<attr name="insurance.policy_num" xs:type="string" readonly="false" other:xPath="/json/people/vehicles/insurance/policy_num" />
<attr name="maintenance:_id" xs:type="string" readonly="false" key="true" other:xPath="/json/people/vehicles/maintenance/_id" />
<attr name="model" xs:type="string" readonly="false" other:xPath="/json/people/vehicles/model" />
<attr name="people:_id" xs:type="string" readonly="false" key="true" other:xPath="/json/people/_id" />
<attr name="personal.age" xs:type="integer" readonly="false" other:xPath="/json/people/personal/age" />
<attr name="personal.gender" xs:type="string" readonly="false" other:xPath="/json/people/personal/gender" />
<attr name="personal.name.first" xs:type="string" readonly="false" other:xPath="/json/people/personal/name/first" />
<attr name="personal.name.last" xs:type="string" readonly="false" other:xPath="/json/people/personal/name/last" />
<attr name="source" xs:type="string" readonly="false" other:xPath="/json/people/source" />
<attr name="type" xs:type="string" readonly="false" other:xPath="/json/people/vehicles/type" />
<attr name="vehicles:_id" xs:type="string" readonly="false" key="true" other:xPath="/json/people/vehicles/_id" />
</api:info>
<api:set attr="DataModel" value="FLATTENEDDOCUMENTS" />
<api:set attr="URI" value="/tmp/people.json" />
<api:set attr="JSONPath" value="$.people.vehicles.maintenance;$.people.vehicles;$.people" />
<!-- GET メソッドは SELECT に対応します。ここで SELECT ステートメントのデフォルト処理をオーバーライドできます。処理結果はスキーマの出力にプッシュされます。詳細は SELECT Execution を参照してください。 -->
<api:script method="GET">
<api:call op="jsonproviderGet">
<api:push/>
</api:call>
</api:script>
<!-- INSERT のサポートを追加するには、ヘルプ内の INSERT Execution ページで詳細情報と例を参照してください。 -->
<api:script method="POST">
<api:set attr="method" value="POST"/>
<api:call op="jsonproviderGet">
<api:throw code="500" desc="Inserts are not currently supported."/>
<api:push/>
</api:call>
</api:script>
<!-- UPDATE のサポートを追加するには、ヘルプ内の UPDATE Execution ページで詳細情報と例を参照してください。 -->
<api:script method="MERGE">
<api:set attr="method" value="PUT"/>
<api:call op="jsonproviderGet">
<api:throw code="500" desc="Updates are not currently supported."/>
<api:push/>
</api:call>
</api:script>
<!-- DELETE のサポートを追加するには、ヘルプ内の DELETE Execution ページで詳細情報と例を参照してください。 -->
<api:script method="DELETE">
<api:set attr="method" value="DELETE"/>
<api:call op="jsonproviderGet">
<api:throw code="500" desc="Deletes are not currently supported."/>
<api:push/>
</api:call>
</api:script>
</api:script>
Glue スクリプトの作成
必要なリソースを S3 にアップロードしたら、スクリプトを作成します。基本的にはこちらの記事とほぼ同じですが、以下の変更を加えます。
- boto3、botocore、TransferConfig を含む追加のインポートを記述します。
- S3 リソースから目的のファイルをダウンロードするコードを追加します。tmp ディレクトリはすべてのユーザーが書き込み可能なため、ダウンロード先として最適です。
- 設定の中で「use_threads」を false に設定しています。これにより、ファイルのダウンロードが完了する前にドライバーがファイルを参照しようとする並行処理の問題を防ぐことができます。
- JDBC URL を変更し、直接の接続プロパティではなくインポートした RSD ファイルを使用するようにします。これは Location を tmp ディレクトリに設定することで実現します。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import boto3
import botocore
from boto3.s3.transfer import TransferConfig
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
sparkSession = glueContext.spark_session
##boto3 を使用して JDBC ドライバーが参照するファイルをダウンロード
s3 = boto3.resource('s3')
dlConfig = TransferConfig(use_threads=False)
try:
s3.Bucket('mybucket').download_file('people.json', '/tmp/people.json', Config=dlConfig)
s3.Bucket('mybucket').download_file('people.rsd', '/tmp/people.rsd', Config=dlConfig)
except Exception as e:
if e.response['Error']['Code'] == "404":
print("The object does not exist.")
else:
raise
##CData JDBC ドライバーを使用して people テーブルから JSON サービスを DataFrame に読み込み
##JDBC URL とドライバークラス名を設定
source_df = sparkSession.read.format("jdbc").option("url", "jdbc:json:RTK=5246...;Location=/tmp;").option("dbtable", "people").option("driver", "cdata.jdbc.json.JSONDriver").load()
glueJob = Job(glueContext)
glueJob.init(args['JOB_NAME'], args)
##DataFrame を AWS Glue の DynamicFrame オブジェクトに変換
dynamic_dframe = DynamicFrame.fromDF(source_df, glueContext, "dynamic_df")
##DynamicFrame を CSV 形式のファイルとして S3 バケットのフォルダに書き込み
##事前に定義された接続を使用して、任意の Amazon データストア(SQL Server、Redshift など)に書き込むことも可能
retDatasink4 = glueContext.write_dynamic_frame.from_options(frame = dynamic_dframe, connection_type = "s3", connection_options = {"path": "s3://mybucket/outfiles"}, format = "csv", transformation_ctx = "datasink4")
glueJob.commit()
すべての準備が完了すると、上記のスクリプトは JSON ファイルをデータソースとして使用し、RSD ファイルでカラムマッピングを行い、CSV ファイルを S3 バケットに出力します。参照ファイルを必要とする他のデータソースについても、同様の変更を加えることで対応できます。
We appreciate your feedback. If you have any questions, comments, or suggestions about this entry, please contact our support team at support@cdata.co.jp.