Kafka コネクタ

概要

このコネクタを使用すると、Apache Kafka のトピックを Presto のテーブルとして使用できます。各メッセージは Presto の行として表示されます。

トピックはライブにできます。データが到着すると行が表示され、メッセージが削除されると行が消えます。これにより、単一のクエリで同じテーブルに複数回アクセスする場合 (例: 自己結合を実行する場合) に奇妙な動作が発生する可能性があります。

Apache Kafka 2.3.1 以降がサポートされています。

構成

Kafka コネクタを構成するには、次の内容でカタログプロパティファイル etc/catalog/kafka.properties を作成し、プロパティを必要に応じて置き換えます。

connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port

複数の Kafka クラスタ

必要な数のカタログを持つことができます。したがって、追加の Kafka クラスタがある場合は、別のプロパティファイルを etc/catalog に別の名前で追加するだけです (必ず .properties で終わるようにします)。たとえば、プロパティファイルに sales.properties という名前を付けると、Presto は構成されたコネクタを使用して sales という名前のカタログを作成します。

構成プロパティ

次の構成プロパティを使用できます。

プロパティ名

説明

kafka.table-names

カタログによって提供されるすべてのテーブルのリスト

kafka.default-schema

テーブルのデフォルトのスキーマ名

kafka.nodes

Kafka クラスタ内のノードのリスト

kafka.connect-timeout

Kafka クラスタへの接続のタイムアウト

kafka.max-poll-records

ポーリングごとの最大レコード数

kafka.max-partition-fetch-bytes

ポーリングごとに 1 つのパーティションから取得する最大バイト数

kafka.table-description-dir

トピック記述ファイルを含むディレクトリ

kafka.hide-internal-columns

内部カラムがテーブルスキーマの一部であるかどうかを制御します

kafka.table-names

このカタログによって提供されるすべてのテーブルのコンマ区切りリスト。テーブル名は、修飾なし (単純名) であり、デフォルトのスキーマ (下記参照) に配置されるか、スキーマ名 (<schema-name>.<table-name>) で修飾することができます。

ここで定義された各テーブルについて、テーブル記述ファイル (下記参照) が存在する場合があります。テーブル記述ファイルが存在しない場合、テーブル名は Kafka のトピック名として使用され、データカラムはテーブルにマッピングされません。テーブルには、引き続きすべての内部カラム (下記参照) が含まれます。

このプロパティは必須です。デフォルトはなく、少なくとも 1 つのテーブルを定義する必要があります。

kafka.default-schema

修飾スキーマ名なしで定義されたすべてのテーブルが含まれるスキーマを定義します。

このプロパティはオプションです。デフォルトは default です。

kafka.nodes

Kafka データノードの hostname:port ペアのコンマ区切りリスト。

このプロパティは必須です。デフォルトはなく、少なくとも 1 つのノードを定義する必要があります。

Presto は、ここに指定されたノードのサブセットのみであっても、メッセージが特定のノードにのみ配置される可能性があるため、クラスタのすべてのノードに接続できる必要があります。

kafka.connect-timeout

データノードへの接続のタイムアウト。ビジー状態の Kafka クラスタは、接続を受け入れるまでにかなりの時間がかかる場合があります。タイムアウトのためにクエリが失敗する場合は、この値を大きくすることが適切な戦略です。

このプロパティはオプションです。デフォルトは 10 秒 (10s) です。

kafka.max-poll-records

Kafka からの poll() ごとの最大レコード数。

このプロパティはオプションです。デフォルトは 500 です。

kafka.max-partition-fetch-bytes``

ポーリングごとに 1 つのパーティションから取得する最大バイト数

このプロパティはオプションです。デフォルトは 1MB です。

kafka.table-description-dir

1 つ以上の JSON ファイル (.json で終わる必要があります) を保持する Presto デプロイメント内のフォルダを参照します。これらのファイルにはテーブル記述ファイルが含まれます。

このプロパティはオプションです。デフォルトは etc/kafka です。

kafka.hide-internal-columns

テーブル記述ファイルで定義されたデータカラムに加えて、コネクタは各テーブルに追加の多数のカラムを保持します。これらのカラムが非表示になっている場合でも、クエリで使用できますが、DESCRIBE <テーブル名> または SELECT * には表示されません。

このプロパティはオプションです。デフォルトは true です。

内部カラム

定義された各テーブルについて、コネクタは次のカラムを保持します。

カラム名

説明

_partition_id

BIGINT

この行を含む Kafka パーティションの ID。

_partition_offset

BIGINT

BIGINT

この行の Kafka パーティション内のオフセット。

_message_corrupt

BOOLEAN

この行のメッセージをデコーダーがデコードできなかった場合は True。True の場合、メッセージからマッピングされたデータカラムは無効として扱う必要があります。

_message

VARCHAR

UTF-8 エンコード文字列としてのメッセージバイト。これは、テキストトピックにのみ役立ちます。

BIGINT

_message_length

BIGINT

_message_corrupt

メッセージのバイト数。

_key_corrupt

_message

BOOLEAN

キーデコーダーがこの行のキーをデコードできなかった場合は True。True の場合、キーからマッピングされたデータカラムは無効として扱う必要があります。

BIGINT

_key

VARCHAR

UTF-8 エンコード文字列としてのキーバイト。これは、テキストキーにのみ役立ちます。

_key_length

BIGINT

キーのバイト数。

{
    "tableName": ...,
    "schemaName": ...,
    "topicName": ...,
    "key": {
        "dataFormat": ...,
        "fields": [
            ...
        ]
    },
    "message": {
        "dataFormat": ...,
        "fields": [
            ...
       ]
    }
}

テーブル定義ファイルがないテーブルの場合、_key_corrupt カラムと _message_corrupt カラムは常に false になります。

テーブル定義ファイル

説明

tableName

必須

文字列

このファイルで定義される Presto のテーブル名。

schemaName

オプション

文字列

テーブルが含まれるスキーマ。省略した場合、デフォルトのスキーマ名が使用されます。

topicName

必須

文字列

マッピングされる Kafka トピック。

key

オプション

JSON オブジェクト

メッセージキーにマッピングされるデータ列のフィールド定義。

message

オプション

JSON オブジェクト

メッセージ自体にマッピングされるデータ列のフィールド定義。

Kafka におけるキーとメッセージ

Kafka 0.8 以降では、トピック内の各メッセージにはオプションのキーを含めることができます。テーブル定義ファイルには、データをテーブル列にマッピングするためのキーとメッセージの両方のセクションが含まれています。

テーブル定義の key フィールドと message フィールドはそれぞれ JSON オブジェクトであり、以下の 2 つのフィールドを含む必要があります。

テーブル定義ファイルがないテーブルの場合、_key_corrupt カラムと _message_corrupt カラムは常に false になります。

テーブル定義ファイル

説明

dataFormat

必須

文字列

このフィールドグループのデコーダを選択します。

fields

必須

JSON 配列

フィールド定義のリスト。各フィールド定義は Presto テーブルに新しい列を作成します。

各フィールド定義は JSON オブジェクトです

{
    "name": ...,
    "type": ...,
    "dataFormat": ...,
    "mapping": ...,
    "formatHint": ...,
    "hidden": ...,
    "comment": ...
}

テーブル定義ファイルがないテーブルの場合、_key_corrupt カラムと _message_corrupt カラムは常に false になります。

テーブル定義ファイル

説明

name

必須

文字列

Presto テーブルの列の名前。

type

必須

文字列

列の Presto 型。

dataFormat

オプション

文字列

このフィールドの列デコーダを選択します。デフォルトでは、この行データ形式と列型に対するデフォルトのデコーダが使用されます。

dataSchema

オプション

文字列

Avro スキーマが存在するパスまたは URL。Avro デコーダでのみ使用されます。

mapping

オプション

文字列

列のマッピング情報。これはデコーダ固有です。以下を参照してください。

formatHint

オプション

文字列

列デコーダに対して、列固有のフォーマットヒントを設定します。

hidden

オプション

ブール値

DESCRIBE <テーブル名> および SELECT * から列を隠します。デフォルトは false です。

comment

オプション

文字列

DESCRIBE <テーブル名> で表示される列コメントを追加します。

キーまたはメッセージのフィールド記述に制限はありません。

行のデコード

キーとメッセージの場合、デコーダを使用してメッセージとキーデータをテーブル列にマッピングします。

Kafka コネクタには以下のデコーダが含まれています

  • raw - Kafka メッセージは解釈されず、生のメッセージバイトの範囲がテーブル列にマッピングされます。

  • csv - Kafka メッセージはカンマ区切りのメッセージとして解釈され、フィールドがテーブル列にマッピングされます。

  • json - Kafka メッセージは JSON として解析され、JSON フィールドがテーブル列にマッピングされます。

  • avro - Kafka メッセージは Avro スキーマに基づいて解析され、Avro フィールドがテーブル列にマッピングされます。

テーブルのテーブル定義ファイルが存在しない場合、dummy デコーダが使用され、列は何も公開されません。

raw デコーダ

raw デコーダは、Kafka メッセージまたはキーからの生の (バイトベースの) 値の読み取りをサポートし、それを Presto 列に変換します。

フィールドでは、以下の属性がサポートされています

  • dataFormat - 変換されたデータ型の幅を選択します

  • type - Presto データ型 (サポートされているデータ型のリストについては以下の表を参照してください)

  • mapping - <開始位置>[:<終了位置>]; 変換するバイトの開始位置と終了位置 (オプション)

dataFormat 属性は、変換されるバイト数を選択します。存在しない場合、BYTE が想定されます。すべての値は符号付きです。

サポートされている値は次のとおりです

  • BYTE - 1 バイト

  • SHORT - 2 バイト (ビッグエンディアン)

  • INT - 4 バイト (ビッグエンディアン)

  • LONG - 8 バイト (ビッグエンディアン)

  • FLOAT - 4 バイト (IEEE 754 形式)

  • DOUBLE - 8 バイト (IEEE 754 形式)

type 属性は、値をマッピングする Presto データ型を定義します。

列に割り当てられた Presto 型に応じて、dataFormat の異なる値を使用できます

Presto データ型

許可される dataFormat

BIGINT

BYTESHORTINTLONG

INTEGER

BYTESHORTINT

SMALLINT

BYTESHORT

TINYINT

BYTE

DOUBLE

DOUBLEFLOAT

_message_corrupt

BYTESHORTINTLONG

VARCHAR / VARCHAR(x)

BYTE

mapping 属性は、デコードに使用されるキーまたはメッセージ内のバイトの範囲を指定します。コロン (<開始位置>[:<終了位置>]) で区切られた 1 つまたは 2 つの数値にすることができます。

開始位置のみが指定されている場合

  • 固定幅型の場合、列は指定された dataFormat に適切なバイト数を使用します (上記を参照)。

  • VARCHAR 値がデコードされると、開始位置からメッセージの最後まで、すべてのバイトが使用されます。

開始位置と終了位置が指定されている場合

  • 固定幅型の場合、サイズは指定された dataFormat で使用されるバイト数と等しくなければなりません。

  • VARCHAR の場合、開始位置 (含む) と終了位置 (除く) の間のすべてのバイトが使用されます。

mapping 属性が指定されていない場合、開始位置を 0 に設定し、終了位置を未定義のままにするのと同じです。

数値データ型 (BIGINTINTEGERSMALLINTTINYINTDOUBLE) のデコードスキームは簡単です。バイトのシーケンスが入力メッセージから読み取られ、以下のいずれかに従ってデコードされます

  • ビッグエンディアンエンコーディング (整数型の場合)

  • IEEE 754 形式 ( DOUBLE の場合)。

デコードされたバイトシーケンスの長さは、dataFormat によって暗示されます。

VARCHAR データ型の場合、バイトのシーケンスは UTF-8 エンコーディングに従って解釈されます。

csv デコーダ

CSV デコーダは、メッセージまたはキーを表すバイトを UTF-8 エンコーディングを使用して文字列に変換し、その結果を CSV (カンマ区切り値) 行として解釈します。

フィールドの場合、type 属性と mapping 属性を定義する必要があります

  • type - Presto データ型 (サポートされているデータ型のリストについては以下の表を参照してください)

  • mapping - CSV レコード内のフィールドのインデックス

dataFormatformatHint はサポートされておらず、省略する必要があります。

以下の表は、type で使用できるサポートされている Presto 型とデコードスキームを示しています

Presto データ型

デコード規則

BIGINT
INTEGER
SMALLINT
TINYINT

Java Long.parseLong() を使用してデコード

DOUBLE

Java Double.parseDouble() を使用してデコード

_message_corrupt

「true」の文字シーケンスは true にマッピングされます。その他の文字シーケンスは false にマッピングされます

VARCHAR / VARCHAR(x)

そのまま使用されます

json デコーダ

JSON デコーダは、メッセージまたはキーを表すバイトを、RFC 4627 に従って JSON に変換します。メッセージまたはキーは、配列または単純型ではなく、JSON オブジェクトに変換される必要があることに注意してください。

フィールドでは、以下の属性がサポートされています

  • type - 列の Presto 型。

  • dataFormat - 列に使用されるフィールドデコーダ。

  • mapping - JSON オブジェクトからフィールドを選択するための、スラッシュで区切られたフィールド名のリスト

  • formatHint - custom-date-time のみ。以下を参照してください。

JSON デコーダは複数のフィールドデコーダをサポートしており、標準テーブル列には _default が使用され、日付と時刻ベースの型には複数のデコーダが使用されます。

以下の表は、type で使用できる Presto データ型と、dataFormat 属性を介して指定できる一致するフィールドデコーダを示しています

Presto データ型

許可される dataFormat

BIGINT
INTEGER
SMALLINT
TINYINT
DOUBLE
_message_corrupt
_message
VARCHAR(x)

デフォルトのフィールドデコーダ (省略された dataFormat 属性)

TIMESTAMP
TIMESTAMP WITH TIME ZONE
TIME
TIME WITH TIME ZONE

custom-date-timeiso8601rfc2822milliseconds-since-epochseconds-since-epoch

DATE

custom-date-timeiso8601rfc2822

デフォルトフィールドデコーダー

これは、すべてのPresto物理データ型をサポートする標準のフィールドデコーダーです。フィールド値は、JSON変換ルールによってブール値、long値、double値、または文字列値に強制変換されます。日付/時刻に基づかない列には、このデコーダーを使用する必要があります。

日付と時刻のデコーダー

JSONオブジェクトの値をPrestoのDATETIMETIME WITH TIME ZONETIMESTAMP、またはTIMESTAMP WITH TIME ZONE列に変換するには、フィールド定義のdataFormat属性を使用して特別なデコーダーを選択する必要があります。

  • iso8601 - テキストベースで、テキストフィールドをISO 8601タイムスタンプとして解析します。

  • rfc2822 - テキストベースで、テキストフィールドをRFC 2822タイムスタンプとして解析します。

  • custom-date-time - テキストベースで、formatHint属性を介して指定されたJoda形式パターンに従ってテキストフィールドを解析します。

    形式パターンは、https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.htmlに準拠する必要があります。

  • milliseconds-since-epoch - 数値ベースで、テキストまたは数値をエポックからのミリ秒数として解釈します。

  • seconds-since-epoch - 数値ベースで、テキストまたは数値をエポックからのミリ秒数として解釈します。

TIMESTAMP WITH TIME ZONEおよびTIME WITH TIME ZONEデータ型の場合、デコードされた値にタイムゾーン情報が含まれている場合、その情報がPresto値で使用されます。それ以外の場合、結果のタイムゾーンはUTCに設定されます。

avroデコーダー

Avroデコーダーは、スキーマに基づいてAvro形式のメッセージまたはキーを表すバイトを変換します。メッセージにはAvroスキーマが埋め込まれている必要があります。PrestoはスキーマレスAvroデコードをサポートしていません。

キー/メッセージの場合、avroデコーダーを使用する場合、dataSchemaを定義する必要があります。これは、デコードする必要があるメッセージの有効なAvroスキーマファイルの場所を指す必要があります。この場所は、リモートWebサーバー(例:dataSchema: 'http://example.org/schema/avro_data.avsc')またはローカルファイルシステム(例:dataSchema: '/usr/local/schema/avro_data.avsc')にすることができます。この場所にPrestoコーディネーターノードからアクセスできない場合、デコーダーは失敗します。

フィールドでは、以下の属性がサポートされています

  • name - Prestoテーブルの列の名前。

  • type - 列の Presto 型。

  • mapping - Avroスキーマからフィールドを選択するためのスラッシュ区切りのフィールド名のリスト。mappingで指定されたフィールドが元のAvroスキーマに存在しない場合、読み取り操作はNULLを返します。

以下の表は、同等のAvroフィールドタイプに対してtypeで使用できる、サポートされているPrestoタイプをリストします。

Presto データ型

許可されるAvroデータ型

BIGINT

INTLONG

DOUBLE

DOUBLEFLOAT

_message_corrupt

_message_corrupt

VARCHAR / VARCHAR(x)

STRING

VARBINARY

FIXEDBYTES

ARRAY

ARRAY

MAP

MAP

Avroスキーマの進化

Avroデコーダーは、後方互換性のあるスキーマの進化機能をサポートしています。後方互換性を使用すると、古いスキーマで作成されたAvroデータを読み取るために、新しいスキーマを使用できます。Avroスキーマの変更は、Prestoのトピック定義ファイルにも反映する必要があります。新しく追加/名前変更されたフィールドは、Avroスキーマファイルにデフォルト値が設定されている必要があります

スキーマの進化の動作は次のとおりです

  • 新しいスキーマに追加された列:古いスキーマで作成されたデータは、テーブルが新しいスキーマを使用している場合、デフォルト値を生成します。

  • 新しいスキーマで削除された列:古いスキーマで作成されたデータは、削除された列からデータを生成しなくなります。

  • 新しいスキーマで名前が変更された列:これは列を削除して新しい列を追加するのと同じであり、古いスキーマで作成されたデータは、テーブルが新しいスキーマを使用している場合、デフォルト値を生成します。

  • 新しいスキーマで列のタイプを変更する:Avroで型の強制変換がサポートされている場合、変換が行われます。互換性のない型の場合、エラーがスローされます。