Kafka コネクタ¶
概要¶
このコネクタを使用すると、Apache Kafka のトピックを Presto のテーブルとして使用できます。各メッセージは Presto の行として表示されます。
トピックはライブにできます。データが到着すると行が表示され、メッセージが削除されると行が消えます。これにより、単一のクエリで同じテーブルに複数回アクセスする場合 (例: 自己結合を実行する場合) に奇妙な動作が発生する可能性があります。
注
Apache Kafka 2.3.1 以降がサポートされています。
構成¶
Kafka コネクタを構成するには、次の内容でカタログプロパティファイル etc/catalog/kafka.properties を作成し、プロパティを必要に応じて置き換えます。
connector.name=kafka
kafka.table-names=table1,table2
kafka.nodes=host1:port,host2:port
複数の Kafka クラスタ¶
必要な数のカタログを持つことができます。したがって、追加の Kafka クラスタがある場合は、別のプロパティファイルを etc/catalog に別の名前で追加するだけです (必ず .properties で終わるようにします)。たとえば、プロパティファイルに sales.properties という名前を付けると、Presto は構成されたコネクタを使用して sales という名前のカタログを作成します。
構成プロパティ¶
次の構成プロパティを使用できます。
| プロパティ名 | 説明 | 
|---|---|
| 
 | カタログによって提供されるすべてのテーブルのリスト | 
| 
 | テーブルのデフォルトのスキーマ名 | 
| 
 | Kafka クラスタ内のノードのリスト | 
| 
 | Kafka クラスタへの接続のタイムアウト | 
| 
 | ポーリングごとの最大レコード数 | 
| 
 | ポーリングごとに 1 つのパーティションから取得する最大バイト数 | 
| 
 | トピック記述ファイルを含むディレクトリ | 
| 
 | 内部カラムがテーブルスキーマの一部であるかどうかを制御します | 
kafka.table-names¶
このカタログによって提供されるすべてのテーブルのコンマ区切りリスト。テーブル名は、修飾なし (単純名) であり、デフォルトのスキーマ (下記参照) に配置されるか、スキーマ名 (<schema-name>.<table-name>) で修飾することができます。
ここで定義された各テーブルについて、テーブル記述ファイル (下記参照) が存在する場合があります。テーブル記述ファイルが存在しない場合、テーブル名は Kafka のトピック名として使用され、データカラムはテーブルにマッピングされません。テーブルには、引き続きすべての内部カラム (下記参照) が含まれます。
このプロパティは必須です。デフォルトはなく、少なくとも 1 つのテーブルを定義する必要があります。
kafka.default-schema¶
修飾スキーマ名なしで定義されたすべてのテーブルが含まれるスキーマを定義します。
このプロパティはオプションです。デフォルトは default です。
kafka.nodes¶
Kafka データノードの hostname:port ペアのコンマ区切りリスト。
このプロパティは必須です。デフォルトはなく、少なくとも 1 つのノードを定義する必要があります。
注
Presto は、ここに指定されたノードのサブセットのみであっても、メッセージが特定のノードにのみ配置される可能性があるため、クラスタのすべてのノードに接続できる必要があります。
kafka.connect-timeout¶
データノードへの接続のタイムアウト。ビジー状態の Kafka クラスタは、接続を受け入れるまでにかなりの時間がかかる場合があります。タイムアウトのためにクエリが失敗する場合は、この値を大きくすることが適切な戦略です。
このプロパティはオプションです。デフォルトは 10 秒 (10s) です。
kafka.max-poll-records¶
Kafka からの poll() ごとの最大レコード数。
このプロパティはオプションです。デフォルトは 500 です。
kafka.max-partition-fetch-bytes``¶
ポーリングごとに 1 つのパーティションから取得する最大バイト数
このプロパティはオプションです。デフォルトは 1MB です。
kafka.table-description-dir¶
1 つ以上の JSON ファイル (.json で終わる必要があります) を保持する Presto デプロイメント内のフォルダを参照します。これらのファイルにはテーブル記述ファイルが含まれます。
このプロパティはオプションです。デフォルトは etc/kafka です。
kafka.hide-internal-columns¶
テーブル記述ファイルで定義されたデータカラムに加えて、コネクタは各テーブルに追加の多数のカラムを保持します。これらのカラムが非表示になっている場合でも、クエリで使用できますが、DESCRIBE <テーブル名> または SELECT * には表示されません。
このプロパティはオプションです。デフォルトは true です。
内部カラム¶
定義された各テーブルについて、コネクタは次のカラムを保持します。
| カラム名 | 型 | 説明 | 
|---|---|---|
| 
 | BIGINT | この行を含む Kafka パーティションの ID。 | 
| 
 | BIGINT | BIGINT | 
| 
 | _message_corrupt | BOOLEAN | 
| 
 | _message | VARCHAR | 
| 
 | BIGINT | _message_length | 
| 
 | _message_corrupt | メッセージのバイト数。 | 
| 
 | _message | BOOLEAN | 
| 
 | BIGINT | _key | 
VARCHAR
UTF-8 エンコード文字列としてのキーバイト。これは、テキストキーにのみ役立ちます。
_key_length
注
BIGINT
キーのバイト数。
{
    "tableName": ...,
    "schemaName": ...,
    "topicName": ...,
    "key": {
        "dataFormat": ...,
        "fields": [
            ...
        ]
    },
    "message": {
        "dataFormat": ...,
        "fields": [
            ...
       ]
    }
}
| テーブル定義ファイルがないテーブルの場合、 | テーブル定義ファイル¶ | 型 | 説明 | 
|---|---|---|---|
| 
 | 必須 | 文字列 | このファイルで定義される Presto のテーブル名。 | 
| 
 | オプション | 文字列 | テーブルが含まれるスキーマ。省略した場合、デフォルトのスキーマ名が使用されます。 | 
| 
 | 必須 | 文字列 | マッピングされる Kafka トピック。 | 
| 
 | オプション | JSON オブジェクト | メッセージキーにマッピングされるデータ列のフィールド定義。 | 
| 
 | オプション | JSON オブジェクト | メッセージ自体にマッピングされるデータ列のフィールド定義。 | 
Kafka におけるキーとメッセージ¶
Kafka 0.8 以降では、トピック内の各メッセージにはオプションのキーを含めることができます。テーブル定義ファイルには、データをテーブル列にマッピングするためのキーとメッセージの両方のセクションが含まれています。
テーブル定義の key フィールドと message フィールドはそれぞれ JSON オブジェクトであり、以下の 2 つのフィールドを含む必要があります。
| テーブル定義ファイルがないテーブルの場合、 | テーブル定義ファイル¶ | 型 | 説明 | 
|---|---|---|---|
| 
 | 必須 | 文字列 | このフィールドグループのデコーダを選択します。 | 
| 
 | 必須 | JSON 配列 | フィールド定義のリスト。各フィールド定義は Presto テーブルに新しい列を作成します。 | 
各フィールド定義は JSON オブジェクトです
{
    "name": ...,
    "type": ...,
    "dataFormat": ...,
    "mapping": ...,
    "formatHint": ...,
    "hidden": ...,
    "comment": ...
}
| テーブル定義ファイルがないテーブルの場合、 | テーブル定義ファイル¶ | 型 | 説明 | 
|---|---|---|---|
| 
 | 必須 | 文字列 | Presto テーブルの列の名前。 | 
| 
 | 必須 | 文字列 | 列の Presto 型。 | 
| 
 | オプション | 文字列 | このフィールドの列デコーダを選択します。デフォルトでは、この行データ形式と列型に対するデフォルトのデコーダが使用されます。 | 
| 
 | オプション | 文字列 | Avro スキーマが存在するパスまたは URL。Avro デコーダでのみ使用されます。 | 
| 
 | オプション | 文字列 | 列のマッピング情報。これはデコーダ固有です。以下を参照してください。 | 
| 
 | オプション | 文字列 | 列デコーダに対して、列固有のフォーマットヒントを設定します。 | 
| 
 | オプション | ブール値 | 
 | 
| 
 | オプション | 文字列 | 
 | 
キーまたはメッセージのフィールド記述に制限はありません。
行のデコード¶
キーとメッセージの場合、デコーダを使用してメッセージとキーデータをテーブル列にマッピングします。
Kafka コネクタには以下のデコーダが含まれています
- raw- Kafka メッセージは解釈されず、生のメッセージバイトの範囲がテーブル列にマッピングされます。
- csv- Kafka メッセージはカンマ区切りのメッセージとして解釈され、フィールドがテーブル列にマッピングされます。
- json- Kafka メッセージは JSON として解析され、JSON フィールドがテーブル列にマッピングされます。
- avro- Kafka メッセージは Avro スキーマに基づいて解析され、Avro フィールドがテーブル列にマッピングされます。
注
テーブルのテーブル定義ファイルが存在しない場合、dummy デコーダが使用され、列は何も公開されません。
raw デコーダ¶
raw デコーダは、Kafka メッセージまたはキーからの生の (バイトベースの) 値の読み取りをサポートし、それを Presto 列に変換します。
フィールドでは、以下の属性がサポートされています
- dataFormat- 変換されたデータ型の幅を選択します
- type- Presto データ型 (サポートされているデータ型のリストについては以下の表を参照してください)
- mapping-- <開始位置>[:<終了位置>]; 変換するバイトの開始位置と終了位置 (オプション)
dataFormat 属性は、変換されるバイト数を選択します。存在しない場合、BYTE が想定されます。すべての値は符号付きです。
サポートされている値は次のとおりです
- BYTE- 1 バイト
- SHORT- 2 バイト (ビッグエンディアン)
- INT- 4 バイト (ビッグエンディアン)
- LONG- 8 バイト (ビッグエンディアン)
- FLOAT- 4 バイト (IEEE 754 形式)
- DOUBLE- 8 バイト (IEEE 754 形式)
type 属性は、値をマッピングする Presto データ型を定義します。
列に割り当てられた Presto 型に応じて、dataFormat の異なる値を使用できます
| Presto データ型 | 許可される  | 
|---|---|
| 
 | 
 | 
| 
 | 
 | 
| 
 | 
 | 
| 
 | 
 | 
| 
 | 
 | 
| 
 | 
 | 
| 
 | 
 | 
mapping 属性は、デコードに使用されるキーまたはメッセージ内のバイトの範囲を指定します。コロン (<開始位置>[:<終了位置>]) で区切られた 1 つまたは 2 つの数値にすることができます。
開始位置のみが指定されている場合
固定幅型の場合、列は指定された
dataFormatに適切なバイト数を使用します (上記を参照)。
VARCHAR値がデコードされると、開始位置からメッセージの最後まで、すべてのバイトが使用されます。
開始位置と終了位置が指定されている場合
固定幅型の場合、サイズは指定された
dataFormatで使用されるバイト数と等しくなければなりません。
VARCHARの場合、開始位置 (含む) と終了位置 (除く) の間のすべてのバイトが使用されます。
mapping 属性が指定されていない場合、開始位置を 0 に設定し、終了位置を未定義のままにするのと同じです。
数値データ型 (BIGINT、INTEGER、SMALLINT、TINYINT、DOUBLE) のデコードスキームは簡単です。バイトのシーケンスが入力メッセージから読み取られ、以下のいずれかに従ってデコードされます
ビッグエンディアンエンコーディング (整数型の場合)
IEEE 754 形式 (
DOUBLEの場合)。
デコードされたバイトシーケンスの長さは、dataFormat によって暗示されます。
VARCHAR データ型の場合、バイトのシーケンスは UTF-8 エンコーディングに従って解釈されます。
csv デコーダ¶
CSV デコーダは、メッセージまたはキーを表すバイトを UTF-8 エンコーディングを使用して文字列に変換し、その結果を CSV (カンマ区切り値) 行として解釈します。
フィールドの場合、type 属性と mapping 属性を定義する必要があります
- type- Presto データ型 (サポートされているデータ型のリストについては以下の表を参照してください)
- mapping- CSV レコード内のフィールドのインデックス
dataFormat と formatHint はサポートされておらず、省略する必要があります。
以下の表は、type で使用できるサポートされている Presto 型とデコードスキームを示しています
| Presto データ型 | デコード規則 | 
|---|---|
| BIGINTINTEGERSMALLINTTINYINT | Java  | 
| 
 | Java  | 
| 
 | 「true」の文字シーケンスは  | 
| 
 | そのまま使用されます | 
json デコーダ¶
JSON デコーダは、メッセージまたはキーを表すバイトを、RFC 4627 に従って JSON に変換します。メッセージまたはキーは、配列または単純型ではなく、JSON オブジェクトに変換される必要があることに注意してください。
フィールドでは、以下の属性がサポートされています
- type- 列の Presto 型。
- dataFormat- 列に使用されるフィールドデコーダ。
- mapping- JSON オブジェクトからフィールドを選択するための、スラッシュで区切られたフィールド名のリスト
- formatHint-- custom-date-timeのみ。以下を参照してください。
JSON デコーダは複数のフィールドデコーダをサポートしており、標準テーブル列には _default が使用され、日付と時刻ベースの型には複数のデコーダが使用されます。
以下の表は、type で使用できる Presto データ型と、dataFormat 属性を介して指定できる一致するフィールドデコーダを示しています
| Presto データ型 | 許可される  | 
|---|---|
| BIGINTINTEGERSMALLINTTINYINTDOUBLE_message_corrupt_messageVARCHAR(x) | デフォルトのフィールドデコーダ (省略された  | 
| TIMESTAMPTIMESTAMP WITH TIME ZONETIMETIME WITH TIME ZONE | 
 | 
| 
 | 
 | 
デフォルトフィールドデコーダー¶
これは、すべてのPresto物理データ型をサポートする標準のフィールドデコーダーです。フィールド値は、JSON変換ルールによってブール値、long値、double値、または文字列値に強制変換されます。日付/時刻に基づかない列には、このデコーダーを使用する必要があります。
日付と時刻のデコーダー¶
JSONオブジェクトの値をPrestoのDATE、TIME、TIME WITH TIME ZONE、TIMESTAMP、またはTIMESTAMP WITH TIME ZONE列に変換するには、フィールド定義のdataFormat属性を使用して特別なデコーダーを選択する必要があります。
- iso8601- テキストベースで、テキストフィールドをISO 8601タイムスタンプとして解析します。
- rfc2822- テキストベースで、テキストフィールドをRFC 2822タイムスタンプとして解析します。
- custom-date-time- テキストベースで、- formatHint属性を介して指定されたJoda形式パターンに従ってテキストフィールドを解析します。
- 形式パターンは、https://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.htmlに準拠する必要があります。 
 
- milliseconds-since-epoch- 数値ベースで、テキストまたは数値をエポックからのミリ秒数として解釈します。
- seconds-since-epoch- 数値ベースで、テキストまたは数値をエポックからのミリ秒数として解釈します。
TIMESTAMP WITH TIME ZONEおよびTIME WITH TIME ZONEデータ型の場合、デコードされた値にタイムゾーン情報が含まれている場合、その情報がPresto値で使用されます。それ以外の場合、結果のタイムゾーンはUTCに設定されます。
avroデコーダー¶
Avroデコーダーは、スキーマに基づいてAvro形式のメッセージまたはキーを表すバイトを変換します。メッセージにはAvroスキーマが埋め込まれている必要があります。PrestoはスキーマレスAvroデコードをサポートしていません。
キー/メッセージの場合、avroデコーダーを使用する場合、dataSchemaを定義する必要があります。これは、デコードする必要があるメッセージの有効なAvroスキーマファイルの場所を指す必要があります。この場所は、リモートWebサーバー(例:dataSchema: 'http://example.org/schema/avro_data.avsc')またはローカルファイルシステム(例:dataSchema: '/usr/local/schema/avro_data.avsc')にすることができます。この場所にPrestoコーディネーターノードからアクセスできない場合、デコーダーは失敗します。
フィールドでは、以下の属性がサポートされています
- name- Prestoテーブルの列の名前。
- type- 列の Presto 型。
- mapping- Avroスキーマからフィールドを選択するためのスラッシュ区切りのフィールド名のリスト。- mappingで指定されたフィールドが元のAvroスキーマに存在しない場合、読み取り操作はNULLを返します。
以下の表は、同等のAvroフィールドタイプに対してtypeで使用できる、サポートされているPrestoタイプをリストします。
| Presto データ型 | 許可されるAvroデータ型 | 
|---|---|
| 
 | 
 | 
| 
 | 
 | 
| 
 | 
 | 
| 
 | 
 | 
| 
 | 
 | 
| 
 | 
 | 
| 
 | 
 | 
Avroスキーマの進化¶
Avroデコーダーは、後方互換性のあるスキーマの進化機能をサポートしています。後方互換性を使用すると、古いスキーマで作成されたAvroデータを読み取るために、新しいスキーマを使用できます。Avroスキーマの変更は、Prestoのトピック定義ファイルにも反映する必要があります。新しく追加/名前変更されたフィールドは、Avroスキーマファイルにデフォルト値が設定されている必要があります。
スキーマの進化の動作は次のとおりです
- 新しいスキーマに追加された列:古いスキーマで作成されたデータは、テーブルが新しいスキーマを使用している場合、デフォルト値を生成します。 
- 新しいスキーマで削除された列:古いスキーマで作成されたデータは、削除された列からデータを生成しなくなります。 
- 新しいスキーマで名前が変更された列:これは列を削除して新しい列を追加するのと同じであり、古いスキーマで作成されたデータは、テーブルが新しいスキーマを使用している場合、デフォルト値を生成します。 
- 新しいスキーマで列のタイプを変更する:Avroで型の強制変換がサポートされている場合、変換が行われます。互換性のない型の場合、エラーがスローされます。