Entity Framework 6 からKafka のデータに連携

加藤龍彦
加藤龍彦
デジタルマーケティング
この記事は、Entity Framework のcode-first アプローチを使って、Kafka に接続する方法を説明します。Entity Framework 6 は.NET 4.5 以上で利用可能です。

Entity Framework はobject-relational mapping フレームワークで、データをオブジェクトとして扱うために使われます。Visual Studio のADO.NET Entity Data Model ウィザードを実行するとEntity Model を作成できますが、このモデルファーストアプローチでは、データソースに変更があった場合やエンティティ操作をより制御したい場合は不都合があります。この記事では、CData ADO.NET Provider を使いコードファーストアプローチでKafka にアクセスします。

  1. Visual Studio を起動し、新しいWindows Form アプリケーションを作成します。ここでは、.NET 4.5 のC# プロジェクトを使います。
  2. Visual Studio の [パッケージ マネージャー コンソール]から'Install-Package EntityFramework' コマンドを実行し、最新のEntity Framework をインストールします。
  3. プロジェクトのApp.config ファイルを修正して、Kafka Entity Framework 6 アセンブリおよびコネクションストリングへの参照を追加します。

    Apache Kafka 接続プロパティの取得・設定方法

    それでは、Apache Kafka に接続していきましょう。.NET ベースのエディションは、Confluent.Kafka およびlibrdkafka ライブラリに依存して機能します。 これらのアセンブリはインストーラーにバンドルされており、CData 製品と一緒に自動的にインストールされます。 別のインストール方法をご利用の場合は、NuGet から依存関係のあるConfluent.Kafka 2.6.0をインストールしてください。

    Apache Kafka サーバーのアドレスを指定するには、BootstrapServers パラメータを使用します。

    デフォルトでは、CData 製品はデータソースとPLAINTEXT で通信しており、これはすべてのデータが暗号化なしで送信されることを意味します。 通信を暗号化したい場合は、以下の設定を行ってください:

    1. UseSSLtrue に設定し、CData 製品がSSL 暗号化を使用するように構成します
    2. SSLServerCert およびSSLServerCertType を設定して、サーバー証明書をロードします

    Apache Kafka への認証

    続いて、認証方法を設定しましょう。Apache Kafka データソースでは、以下の認証方法をサポートしています:

    • Anonymous
    • Plain
    • SCRAM ログインモジュール
    • SSL クライアント証明書
    • Kerberos

    Anonymous 認証

    Apache Kafka の特定のオンプレミスデプロイメントでは、認証接続プロパティを設定することなくApache Kafka に接続できます。 このような接続はanonymous(匿名)と呼ばれます。

    匿名認証を行うには、以下のプロパティを設定してください。

    • AuthSchemeNone

    その他の認証方法については、ヘルプドキュメントをご確認ください。

    
    <configuration>
       ... <connectionStrings>
        <add name="ApacheKafkaContext" connectionString="Offline=False;User=admin;Password=pass;BootStrapServers=https://localhost:9091;Topic=MyTopic;" providerName="System.Data.CData.ApacheKafka" />
      </connectionStrings>
      <entityFramework>
        <providers>
           ... <provider invariantName="System.Data.CData.ApacheKafka" type="System.Data.CData.ApacheKafka.ApacheKafkaProviderServices, System.Data.CData.ApacheKafka.Entities.EF6" />
        </providers>
      <entityFramework>
    </configuration>
    </code>
    
  4. インストールディレクトリの[lib] > 4.0 サブフォルダにあるSystem.Data.CData.ApacheKafka.Entities.EF6.dll を設定し、プロジェクトを作成してEntity Framework 6 を使うためのセットアップを完了します。
  5. この時点でプロジェクトを作成し、すべてが正しく動作していることを確認してください。これで、Entity Framework を使ってコーディングを開始できます。
  6. プロジェクトに新しい.cs ファイルを追加し、そこにクラスを追加します。これがデータベースのコンテキストとなり、DbContext クラスを拡張します。この例では、クラス名はApacheKafkaContext です。以下のサンプルコードは、OnModelCreating メソッドをオーバーライドして次の変更を加えます:
    • PluralizingTableNameConvention をModelBuilder Conventions から削除。
    • MigrationHistory テーブルへのリクエストを削除。
    
    using System.Data.Entity;
    using System.Data.Entity.Infrastructure;
    using System.Data.Entity.ModelConfiguration.Conventions;
    class ApacheKafkaContext :DbContext {
    	public ApacheKafkaContext() { }
    	protected override void OnModelCreating(DbModelBuilder modelBuilder) {  // To remove the requests to the Migration History table
    		Database.SetInitializer<ApacheKafkaContext>(null); // To remove the plural names modelBuilder.Conventions.Remove<PluralizingTableNameConvention>();
    	}
    }
    
  7. もう一つ.cs ファイルを作成し、ファイル名を呼び出そうとしているKafka のエンティティ、例えばSampleTable_1 にします。このファイルでは、エンティティとエンティティ設定の両方を定義します。以下に例を示します。
    
    using System.Data.Entity.ModelConfiguration;
    using System.ComponentModel.DataAnnotations.Schema;
    public class SampleTable_1 {
    	[DatabaseGeneratedAttribute(DatabaseGeneratedOption.Identity)]
    	public System.String Id { get; set; }
    	public System.String Id { get; set; }
    }
    public class SampleTable_1Map :EntityTypeConfiguration<SampleTable_1> {
    	public SampleTable_1Map() {
    		this.ToTable("SampleTable_1");
    		this.HasKey(SampleTable_1 => SampleTable_1.Id);
    		this.Property(SampleTable_1 => SampleTable_1.Id);
    	}
    }
    
  8. エンティティの作成が済んだので、コンテキストクラスにエンティティを追加します:
    
    public DbSet<SampleTable_1> SampleTable_1 { set; get; }
    
  9. コンテキストとエンティティの作成が完了したら、別クラスでデータをクエリできます。例:
    ApacheKafkaContext context = new ApacheKafkaContext();
    context.Configuration.UseDatabaseNullSemantics = true;
    var query = from line in context.SampleTable_1 select line;
    

はじめる準備はできましたか?

Apache Kafka Data Provider の無料トライアルをダウンロードしてお試しください:

 ダウンロード

詳細:

Apache Kafka Icon Apache Kafka ADO.NET Provider お問い合わせ

Apache Kafka データに連携する.NET アプリケーションを素早く、簡単に開発できる便利なドライバー。