AWS Lambda でリアルタイムKafka のデータにアクセス(IntelliJ IDEA を使用)

Dibyendu Datta
Dibyendu Datta
Lead Technology Evangelist
IntelliJ IDEA と CData JDBC Driver を使用して、AWS Lambda からリアルタイムKafka のデータに接続。

AWS Lambda は、新しい情報やイベントに素早く応答するアプリケーションを構築できるコンピューティングサービスです。CData JDBC Driver for Apache Kafka と組み合わせることで、AWS Lambda 関数からリアルタイムKafka のデータを操作できます。この記事では、IntelliJ で Maven を使用して AWS Lambda 関数を構築し、Kafka のデータに接続してクエリを実行する方法を説明します。

最適化されたデータ処理機能を組み込んだ CData JDBC ドライバは、リアルタイムKafka のデータとのインタラクションにおいて卓越したパフォーマンスを発揮します。Kafka に対して複雑な SQL クエリを発行すると、ドライバーはフィルタや集計などのサポートされている SQL 操作を直接Kafkaにプッシュし、サポートされていない操作(主に SQL 関数や JOIN 操作)は組み込みの SQL エンジンを使用してクライアント側で処理します。さらに、動的メタデータクエリ機能により、ネイティブのデータ型を使用してKafka のデータの操作・分析が可能です。

ステップ1:接続プロパティの設定と接続文字列の構築

CData JDBC Driver for Apache Kafka のインストーラーをダウンロードし、パッケージを解凍して JAR ファイルを実行してドライバーをインストールします。次に、必要な接続プロパティを収集します。

Apache Kafka 接続プロパティの取得・設定方法

それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。

Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:

  1. UseSSLtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
  2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします

Apache Kafka への認証

続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:

  • Anonymous
  • Plain
  • SCRAM ログインモジュール
  • SSL クライアント証明書
  • Kerberos

Anonymous 認証

Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。

匿名認証を行うには、以下のプロパティを設定してください。

  • AuthSchemeNone

その他の認証方法については、ヘルプドキュメントをご確認ください。

NOTE: AWS Lambda 関数で JDBC ドライバーを使用するには、ライセンス(製品版または試用版)とランタイムキー(RTK)が必要です。ライセンス(または試用版)の取得については、弊社営業チームまでお問い合わせください

組み込みの接続文字列デザイナー

JDBC URL の構築には、Kafka JDBC Driver に組み込まれている接続文字列デザイナーを使用できます。JAR ファイルをダブルクリックするか、コマンドラインから JAR ファイルを実行してください。

java -jar cdata.jdbc.apachekafka.jar

接続プロパティ(RTK を含む)を入力し、接続文字列をクリップボードにコピーします。

ステップ2:IntelliJ でプロジェクトを作成

  1. IntelliJ IDEA で「New Project」をクリックします。
  2. Generators から「Maven Archetype」を選択します。
  3. プロジェクトに名前を付け、Archetype として「maven.archetypes:maven-archetype-quickstart」を選択します。
  4. 「Create」をクリックします。

CData JDBC Driver for Apache Kafka JAR ファイルのインストール

プロジェクトのルートフォルダから以下の Maven コマンドを実行して、JAR ファイルをプロジェクトにインストールします。

mvn install:install-file -Dfile="PATH/TO/CData JDBC Driver for Apache Kafka 20XX/lib/cdata.jdbc.apachekafka.jar" -DgroupId="org.cdata.connectors" -DartifactId="cdata-apachekafka-connector" -Dversion="23" -Dpackaging=jar

依存関係の追加

Maven プロジェクトの pom.xml ファイル内で、AWS とCData JDBC Driver for Apache Kafkaを依存関係として追加します(<dependencies> 要素内に以下の XML を追加)。

  • AWS
    <dependency>
       <groupId>com.amazonaws</groupId>
       <artifactId>aws-lambda-java-core</artifactId>
       <version>1.2.2</version> <!--Replace with the actual version-->
    </dependency>
  • CData JDBC Driver for Apache Kafka
    <dependency>
       <groupId>org.cdata.connectors</groupId>
       <artifactId>cdata-apachekafka-connector</artifactId>
       <version>25</version> <!--Replace with the actual version-->
    </dependency>
  • Fat JAR 作成用の Maven Shade Plugin
    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.4.1</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <createDependencyReducedPom>false</createDependencyReducedPom>
                <transformers>
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                    <mainClass>com.example.CDataLambda</mainClass>
                      <!-- Change to your actual Lambda handler class -->
                  </transformer>
                </transformers>
              </configuration>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>

AWS Lambda 関数の作成

このサンプルプロジェクトでは、CDataLambda.java と CDataLambdaTest.java の2つのソースファイルを作成します。

Lambda 関数の定義

  1. CDataLambda クラスを AWS Lambda SDK の RequestHandler インターフェースを実装するように更新します。handleRequest メソッドを追加する必要があります。このメソッドは、Lambda 関数がトリガーされたときに以下のタスクを実行します:
    1. 入力を使用して SQL クエリを構築
    2. CData JDBC Driver for Apache Kafka を登録
    3. JDBC を使用してKafkaへの接続を確立
    4. Kafka で SQL クエリを実行
    5. 結果をコンソールに出力
    6. 出力メッセージを返す
  2. 以下の完全な Lambda クラスを使用してください。インポート、クラス定義、handleRequest メソッドが含まれています。DriverManager.getConnection 呼び出し内の接続文字列値は、実際の値に置き換えてください。

    package com.example;
    
    import com.amazonaws.services.lambda.runtime.Context;
    import com.amazonaws.services.lambda.runtime.RequestHandler;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    public class CDataLambda implements RequestHandler < Object, String > {
    
      @Override
      public String handleRequest(Object input, Context context) {
        String query = "SELECT * FROM " + input;
    
        String bucketName = "MY_AWS_BUCKET";
        try {
          Class.forName("cdata.jdbc.apachekafka.ApacheKafkaDriver");
          cdata.jdbc.apachekafka.ApacheKafkaDriver driver = new cdata.jdbc.apachekafka.ApacheKafkaDriver();
          DriverManager.registerDriver(driver);
        } catch (SQLException ex) {
          // Registering the driver failed
          throw new RuntimeException("Failed to register JDBC driver", ex);
        } catch (ClassNotFoundException e) {
          // The driver class was not found in the classpath
          throw new RuntimeException("JDBC Driver class not found", e);
    
        }
        Connection connection = null;
        try {
          connection = DriverManager.getConnection("jdbc:cdata:apachekafka:RTK=52465...;User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;");
        } catch (SQLException ex) {
          context.getLogger().log("Error getting connection: " + ex.getMessage());
        } catch (Exception ex) {
          context.getLogger().log("Error: " + ex.getMessage());
        }
    
        if (connection != null) {
          context.getLogger().log("Connected Successfully!
    ");
        }
    
        ResultSet resultSet = null;
        try {
          //executing query
          Statement stmt = connection.createStatement();
          resultSet = stmt.executeQuery(query);
    
          ResultSetMetaData metaData = resultSet.getMetaData();
          int numCols = metaData.getColumnCount();
    
          //printing the results
          while (resultSet.next()) {
            for (int i = 1; i <= numCols; i++) {
              System.out.printf("%-25s", (resultSet.getObject(i) != null) ? resultSet.getObject(i).toString().replaceAll("
    ", "") : null);
            }
            System.out.print("
    ");
          }
        } catch (SQLException ex) {
          System.out.println("SQL Exception: " + ex.getMessage());
        } catch (Exception ex) {
          System.out.println("General exception: " + ex.getMessage());
        }
        return "v24 query: " + query + " complete";
      }
    }
    
    

ステップ3:Lambda 関数のデプロイと実行

IntelliJ で関数をビルドしたら、Maven プロジェクト全体を単一の JAR ファイルとしてデプロイする準備が整います。

  1. IntelliJ で mvn install コマンドを使用して SNAPSHOT JAR ファイルをビルドします。

    Note: Maven Shade Plugin は target フォルダに2つの JAR を生成します。AWS Lambda には常に、すべての必要な依存関係を含むサイズの大きい -shaded.jar ファイルをアップロードしてください。

  2. AWS Lambda で新しい関数を作成します(または既存の関数を開きます)。
  3. 関数に名前を付け、IAM ロールを選択し、タイムアウト値を関数が完了するのに十分な値に設定します(クエリの結果サイズによって異なります)。
  4. 「Upload from」->「.zip file」をクリックし、SNAPSHOT JAR ファイルを選択します。
  5. 「Runtime settings」セクションで「Edit」をクリックし、Handler を handleRequest メソッドに設定します(例:package.class::handleRequest)。
  6. これで関数をテストできます。「Event JSON」フィールドにテーブル名を設定し、「Test」をクリックします。

無償トライアル・詳細情報

CData JDBC Driver for Apache Kafka の30日間の無償トライアルをダウンロードして、AWS Lambda でリアルタイムKafka のデータを活用してみてください。ご不明な点があれば、サポートチームまでお気軽にお問い合わせください。

はじめる準備はできましたか?

Apache Kafka Driver の無料トライアルをダウンロードしてお試しください:

 ダウンロード

詳細:

Apache Kafka Icon Apache Kafka JDBC Driver お問い合わせ

Apache Kafka データに連携するJava アプリケーションを素早く、簡単に開発できる便利なドライバー。