Kafka コネクタ¶
概要¶
このコネクタを使用すると、Apache Kafka のトピックを Presto のテーブルとして使用できます。各メッセージは Presto の行として表示されます。
トピックはライブにできます。データが到着すると行が表示され、メッセージが削除されると行が消えます。これにより、単一のクエリで同じテーブルに複数回アクセスする場合 (例: 自己結合を実行する場合) に奇妙な動作が発生する可能性があります。
注
Apache Kafka 2.3.1 以降がサポートされています。
構成¶
Kafka コネクタを構成するには、次の内容でカタログプロパティファイル etc/catalog/kafka.properties
を作成し、プロパティを必要に応じて置き換えます。
connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port
複数の Kafka クラスタ¶
必要な数のカタログを持つことができます。したがって、追加の Kafka クラスタがある場合は、別のプロパティファイルを etc/catalog
に別の名前で追加するだけです (必ず .properties
で終わるようにします)。たとえば、プロパティファイルに sales.properties
という名前を付けると、Presto は構成されたコネクタを使用して sales
という名前のカタログを作成します。
構成プロパティ¶
次の構成プロパティを使用できます。
プロパティ名 |
説明 |
---|---|
|
カタログによって提供されるすべてのテーブルのリスト |
|
テーブルのデフォルトのスキーマ名 |
|
Kafka クラスタ内のノードのリスト |
|
Kafka クラスタへの接続のタイムアウト |
|
ポーリングごとの最大レコード数 |
|
ポーリングごとに 1 つのパーティションから取得する最大バイト数 |
|
トピック記述ファイルを含むディレクトリ |
|
内部カラムがテーブルスキーマの一部であるかどうかを制御します |
kafka.table-names
¶
このカタログによって提供されるすべてのテーブルのコンマ区切りリスト。テーブル名は、修飾なし (単純名) であり、デフォルトのスキーマ (下記参照) に配置されるか、スキーマ名 (<schema-name>.<table-name>
) で修飾することができます。
ここで定義された各テーブルについて、テーブル記述ファイル (下記参照) が存在する場合があります。テーブル記述ファイルが存在しない場合、テーブル名は Kafka のトピック名として使用され、データカラムはテーブルにマッピングされません。テーブルには、引き続きすべての内部カラム (下記参照) が含まれます。
このプロパティは必須です。デフォルトはなく、少なくとも 1 つのテーブルを定義する必要があります。
kafka.default-schema
¶
修飾スキーマ名なしで定義されたすべてのテーブルが含まれるスキーマを定義します。
このプロパティはオプションです。デフォルトは default
です。
kafka.nodes
¶
Kafka データノードの hostname:port
ペアのコンマ区切りリスト。
このプロパティは必須です。デフォルトはなく、少なくとも 1 つのノードを定義する必要があります。
注
Presto は、ここに指定されたノードのサブセットのみであっても、メッセージが特定のノードにのみ配置される可能性があるため、クラスタのすべてのノードに接続できる必要があります。
kafka.connect-timeout
¶
データノードへの接続のタイムアウト。ビジー状態の Kafka クラスタは、接続を受け入れるまでにかなりの時間がかかる場合があります。タイムアウトのためにクエリが失敗する場合は、この値を大きくすることが適切な戦略です。
このプロパティはオプションです。デフォルトは 10 秒 (10s
) です。
kafka.max-poll-records
¶
Kafka からの poll() ごとの最大レコード数。
このプロパティはオプションです。デフォルトは 500
です。
kafka.max-partition-fetch-bytes``¶
ポーリングごとに 1 つのパーティションから取得する最大バイト数
このプロパティはオプションです。デフォルトは 1MB
です。
kafka.table-description-dir
¶
1 つ以上の JSON ファイル (.json
で終わる必要があります) を保持する Presto デプロイメント内のフォルダを参照します。これらのファイルにはテーブル記述ファイルが含まれます。
このプロパティはオプションです。デフォルトは etc/kafka
です。
kafka.hide-internal-columns
¶
テーブル記述ファイルで定義されたデータカラムに加えて、コネクタは各テーブルに追加の多数のカラムを保持します。これらのカラムが非表示になっている場合でも、クエリで使用できますが、DESCRIBE <テーブル名>
または SELECT *
には表示されません。
このプロパティはオプションです。デフォルトは true
です。
内部カラム¶
定義された各テーブルについて、コネクタは次のカラムを保持します。
カラム名 |
型 |
説明 |
---|---|---|
|
BIGINT |
この行を含む Kafka パーティションの ID。 |
|
BIGINT |
BIGINT |
|
_message_corrupt |
BOOLEAN |
|
_message |
VARCHAR |
|
BIGINT |
_message_length |
|
_message_corrupt |
メッセージのバイト数。 |
|
_message |
BOOLEAN |
|
BIGINT |
_key |
VARCHAR
UTF-8 エンコード文字列としてのキーバイト。これは、テキストキーにのみ役立ちます。
_key_length
注
BIGINT
キーのバイト数。
{
"tableName": ...,
"schemaName": ...,
"topicName": ...,
"key": {
"dataFormat": ...,
"fields": [
...
]
},
"message": {
"dataFormat": ...,
"fields": [
...
]
}
}
テーブル定義ファイルがないテーブルの場合、 |
テーブル定義ファイル¶ |
型 |
説明 |
---|---|---|---|
|
必須 |
文字列 |
このファイルで定義される Presto のテーブル名。 |
|
オプション |
文字列 |
テーブルが含まれるスキーマ。省略した場合、デフォルトのスキーマ名が使用されます。 |
|
必須 |
文字列 |
マッピングされる Kafka トピック。 |
|
オプション |
JSON オブジェクト |
メッセージキーにマッピングされるデータ列のフィールド定義。 |
|
オプション |
JSON オブジェクト |
メッセージ自体にマッピングされるデータ列のフィールド定義。 |
Kafka におけるキーとメッセージ¶
Kafka 0.8 以降では、トピック内の各メッセージにはオプションのキーを含めることができます。テーブル定義ファイルには、データをテーブル列にマッピングするためのキーとメッセージの両方のセクションが含まれています。
テーブル定義の key
フィールドと message
フィールドはそれぞれ JSON オブジェクトであり、以下の 2 つのフィールドを含む必要があります。
テーブル定義ファイルがないテーブルの場合、 |
テーブル定義ファイル¶ |
型 |
説明 |
---|---|---|---|
|
必須 |
文字列 |
このフィールドグループのデコーダを選択します。 |
|
必須 |
JSON 配列 |
フィールド定義のリスト。各フィールド定義は Presto テーブルに新しい列を作成します。 |
各フィールド定義は JSON オブジェクトです
{
"name": ...,
"type": ...,
"dataFormat": ...,
"mapping": ...,
"formatHint": ...,
"hidden": ...,
"comment": ...
}
テーブル定義ファイルがないテーブルの場合、 |
テーブル定義ファイル¶ |
型 |
説明 |
---|---|---|---|
|
必須 |
文字列 |
Presto テーブルの列の名前。 |
|
必須 |
文字列 |
列の Presto 型。 |
|
オプション |
文字列 |
このフィールドの列デコーダを選択します。デフォルトでは、この行データ形式と列型に対するデフォルトのデコーダが使用されます。 |
|
オプション |
文字列 |
Avro スキーマが存在するパスまたは URL。Avro デコーダでのみ使用されます。 |
|
オプション |
文字列 |
列のマッピング情報。これはデコーダ固有です。以下を参照してください。 |
|
オプション |
文字列 |
列デコーダに対して、列固有のフォーマットヒントを設定します。 |
|
オプション |
ブール値 |
|
|
オプション |
文字列 |
|
キーまたはメッセージのフィールド記述に制限はありません。
行のデコード¶
キーとメッセージの場合、デコーダを使用してメッセージとキーデータをテーブル列にマッピングします。
Kafka コネクタには以下のデコーダが含まれています
raw
- Kafka メッセージは解釈されず、生のメッセージバイトの範囲がテーブル列にマッピングされます。csv
- Kafka メッセージはカンマ区切りのメッセージとして解釈され、フィールドがテーブル列にマッピングされます。json
- Kafka メッセージは JSON として解析され、JSON フィールドがテーブル列にマッピングされます。avro
- Kafka メッセージは Avro スキーマに基づいて解析され、Avro フィールドがテーブル列にマッピングされます。
注
テーブルのテーブル定義ファイルが存在しない場合、dummy
デコーダが使用され、列は何も公開されません。
raw
デコーダ¶
raw デコーダは、Kafka メッセージまたはキーからの生の (バイトベースの) 値の読み取りをサポートし、それを Presto 列に変換します。
フィールドでは、以下の属性がサポートされています
dataFormat
- 変換されたデータ型の幅を選択しますtype
- Presto データ型 (サポートされているデータ型のリストについては以下の表を参照してください)mapping
-<開始位置>[:<終了位置>]
; 変換するバイトの開始位置と終了位置 (オプション)
dataFormat
属性は、変換されるバイト数を選択します。存在しない場合、BYTE
が想定されます。すべての値は符号付きです。
サポートされている値は次のとおりです
BYTE
- 1 バイトSHORT
- 2 バイト (ビッグエンディアン)INT
- 4 バイト (ビッグエンディアン)LONG
- 8 バイト (ビッグエンディアン)FLOAT
- 4 バイト (IEEE 754 形式)DOUBLE
- 8 バイト (IEEE 754 形式)
type
属性は、値をマッピングする Presto データ型を定義します。
列に割り当てられた Presto 型に応じて、dataFormat の異なる値を使用できます
Presto データ型 |
許可される |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
mapping
属性は、デコードに使用されるキーまたはメッセージ内のバイトの範囲を指定します。コロン (<開始位置>[:<終了位置>]
) で区切られた 1 つまたは 2 つの数値にすることができます。
開始位置のみが指定されている場合
固定幅型の場合、列は指定された
dataFormat
に適切なバイト数を使用します (上記を参照)。
VARCHAR
値がデコードされると、開始位置からメッセージの最後まで、すべてのバイトが使用されます。
開始位置と終了位置が指定されている場合
固定幅型の場合、サイズは指定された
dataFormat
で使用されるバイト数と等しくなければなりません。
VARCHAR
の場合、開始位置 (含む) と終了位置 (除く) の間のすべてのバイトが使用されます。
mapping
属性が指定されていない場合、開始位置を 0 に設定し、終了位置を未定義のままにするのと同じです。
数値データ型 (BIGINT
、INTEGER
、SMALLINT
、TINYINT
、DOUBLE
) のデコードスキームは簡単です。バイトのシーケンスが入力メッセージから読み取られ、以下のいずれかに従ってデコードされます
ビッグエンディアンエンコーディング (整数型の場合)
IEEE 754 形式 (
DOUBLE
の場合)。
デコードされたバイトシーケンスの長さは、dataFormat
によって暗示されます。
VARCHAR
データ型の場合、バイトのシーケンスは UTF-8 エンコーディングに従って解釈されます。
csv
デコーダ¶
CSV デコーダは、メッセージまたはキーを表すバイトを UTF-8 エンコーディングを使用して文字列に変換し、その結果を CSV (カンマ区切り値) 行として解釈します。
フィールドの場合、type
属性と mapping
属性を定義する必要があります
type
- Presto データ型 (サポートされているデータ型のリストについては以下の表を参照してください)mapping
- CSV レコード内のフィールドのインデックス
dataFormat
と formatHint
はサポートされておらず、省略する必要があります。
以下の表は、type
で使用できるサポートされている Presto 型とデコードスキームを示しています
Presto データ型 |
デコード規則 |
---|---|
BIGINT INTEGER SMALLINT TINYINT |
Java |
|
Java |
|
「true」の文字シーケンスは |
|
そのまま使用されます |
json
デコーダ¶
JSON デコーダは、メッセージまたはキーを表すバイトを、RFC 4627 に従って JSON に変換します。メッセージまたはキーは、配列または単純型ではなく、JSON オブジェクトに変換される必要があることに注意してください。
フィールドでは、以下の属性がサポートされています
type
- 列の Presto 型。dataFormat
- 列に使用されるフィールドデコーダ。mapping
- JSON オブジェクトからフィールドを選択するための、スラッシュで区切られたフィールド名のリストformatHint
-custom-date-time
のみ。以下を参照してください。
JSON デコーダは複数のフィールドデコーダをサポートしており、標準テーブル列には _default
が使用され、日付と時刻ベースの型には複数のデコーダが使用されます。
以下の表は、type
で使用できる Presto データ型と、dataFormat
属性を介して指定できる一致するフィールドデコーダを示しています
Presto データ型 |
許可される |
---|---|
BIGINT INTEGER SMALLINT TINYINT DOUBLE _message_corrupt _message VARCHAR(x) |
デフォルトのフィールドデコーダ (省略された |
TIMESTAMP TIMESTAMP WITH TIME ZONE TIME TIME WITH TIME ZONE |
|
|
|
デフォルトフィールドデコーダー¶
これは、すべてのPresto物理データ型をサポートする標準のフィールドデコーダーです。フィールド値は、JSON変換ルールによってブール値、long値、double値、または文字列値に強制変換されます。日付/時刻に基づかない列には、このデコーダーを使用する必要があります。
日付と時刻のデコーダー¶
JSONオブジェクトの値をPrestoのDATE
、TIME
、TIME WITH TIME ZONE
、TIMESTAMP
、またはTIMESTAMP WITH TIME ZONE
列に変換するには、フィールド定義のdataFormat
属性を使用して特別なデコーダーを選択する必要があります。
iso8601
- テキストベースで、テキストフィールドをISO 8601タイムスタンプとして解析します。rfc2822
- テキストベースで、テキストフィールドをRFC 2822タイムスタンプとして解析します。custom-date-time
- テキストベースで、formatHint
属性を介して指定されたJoda形式パターンに従ってテキストフィールドを解析します。形式パターンは、https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.htmlに準拠する必要があります。
milliseconds-since-epoch
- 数値ベースで、テキストまたは数値をエポックからのミリ秒数として解釈します。seconds-since-epoch
- 数値ベースで、テキストまたは数値をエポックからのミリ秒数として解釈します。
TIMESTAMP WITH TIME ZONE
およびTIME WITH TIME ZONE
データ型の場合、デコードされた値にタイムゾーン情報が含まれている場合、その情報がPresto値で使用されます。それ以外の場合、結果のタイムゾーンはUTC
に設定されます。
avro
デコーダー¶
Avroデコーダーは、スキーマに基づいてAvro形式のメッセージまたはキーを表すバイトを変換します。メッセージにはAvroスキーマが埋め込まれている必要があります。PrestoはスキーマレスAvroデコードをサポートしていません。
キー/メッセージの場合、avro
デコーダーを使用する場合、dataSchema
を定義する必要があります。これは、デコードする必要があるメッセージの有効なAvroスキーマファイルの場所を指す必要があります。この場所は、リモートWebサーバー(例:dataSchema: 'http://example.org/schema/avro_data.avsc'
)またはローカルファイルシステム(例:dataSchema: '/usr/local/schema/avro_data.avsc'
)にすることができます。この場所にPrestoコーディネーターノードからアクセスできない場合、デコーダーは失敗します。
フィールドでは、以下の属性がサポートされています
name
- Prestoテーブルの列の名前。type
- 列の Presto 型。mapping
- Avroスキーマからフィールドを選択するためのスラッシュ区切りのフィールド名のリスト。mapping
で指定されたフィールドが元のAvroスキーマに存在しない場合、読み取り操作はNULLを返します。
以下の表は、同等のAvroフィールドタイプに対してtype
で使用できる、サポートされているPrestoタイプをリストします。
Presto データ型 |
許可されるAvroデータ型 |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Avroスキーマの進化¶
Avroデコーダーは、後方互換性のあるスキーマの進化機能をサポートしています。後方互換性を使用すると、古いスキーマで作成されたAvroデータを読み取るために、新しいスキーマを使用できます。Avroスキーマの変更は、Prestoのトピック定義ファイルにも反映する必要があります。新しく追加/名前変更されたフィールドは、Avroスキーマファイルにデフォルト値が設定されている必要があります。
スキーマの進化の動作は次のとおりです
新しいスキーマに追加された列:古いスキーマで作成されたデータは、テーブルが新しいスキーマを使用している場合、デフォルト値を生成します。
新しいスキーマで削除された列:古いスキーマで作成されたデータは、削除された列からデータを生成しなくなります。
新しいスキーマで名前が変更された列:これは列を削除して新しい列を追加するのと同じであり、古いスキーマで作成されたデータは、テーブルが新しいスキーマを使用している場合、デフォルト値を生成します。
新しいスキーマで列のタイプを変更する:Avroで型の強制変換がサポートされている場合、変換が行われます。互換性のない型の場合、エラーがスローされます。