Presto Worker REST API

Presto のコーディネータは、クエリフラグメントを実行し、クエリ結果を取得するために Presto ワーカーと通信します。Presto ワーカーは、中間結果を交換するために互いに通信します。この章では、これらの通信で使用される REST API について説明します。

タスクリソースは、クエリフラグメントの実行を開始し、ステータスを追跡し、結果を取得するために使用されます。

制御プレーン

コーディネータは、クエリフラグメントの実行を開始し、実行ステータスを追跡するために、次の HTTP メソッドを使用します。

  • POST/v1/task/{taskId} に送信すると、POST 本体で指定されたクエリフラグメントの実行が開始されます。リクエストには、処理する初期スプリットのセットをオプションで含めることができます。リクエストでは、結果をパーティション分割する方法も指定します。例:指定された出力列を使用して指定された数の出力バッファにハッシュパーティションするか、すべての結果を単一の出カバッファに結合するか、結合された結果を複数の出力バッファにブロードキャストします。

  • その後、POST/v1/task/{taskId} に送信すると、処理するための追加のスプリットを提供し、最終的にさらにスプリットが来ないことを指定できます。

  • GET/v1/task/{taskId}/status に送信すると、現在の実行ステータスを示す TaskStatus JSON ドキュメントが返されます。

  • GET/v1/task/{taskId} に送信すると、実行ステータスに関する拡張情報を含む TaskInfo JSON ドキュメントが返されます。

  • DELETE/v1/task/{taskId} に送信すると、完了したタスクが削除されるか、進行中のタスクがキャンセルされます。

  • GET/v1/task に送信すると、すべてのタスクの TaskInfo のリストを含む JSON ドキュメントが返されます。

コーディネータからのステータスリクエストには、X-Presto-Current-StateX-Presto-Max-Wait の 2 つの HTTP ヘッダーが含まれています。X-Presto-Current-State は、コーディネータが認識しているタスクの状態を指定します。ワーカー上のタスクの状態が異なる場合、ワーカーはすぐに応答します。ワーカー上のタスクの状態がコーディネータ上のタスクの状態と同じ場合、ワーカーはタスクの状態が変わるまで待機してから応答します。X-Presto-Max-Wait HTTP ヘッダーは、最大待機時間を指定します。ワーカーは、タスクの状態が同じであっても、その時間後に応答します。

この設計により、ワーカーをタイトなループでポーリングすることなく、コーディネータがタイムリーにタスク状態の変化を受け取ることができるようになります。

GET/v1/task/{taskId} に送信して拡張タスク情報を要求する場合も、同じ設計が適用されます。

データプレーン

コーディネータが最終的なクエリ結果を取得する場合、またはダウンストリームワーカーがアップストリームワーカーから中間結果を取得する場合、次の HTTP メソッドが使用されます。

  • GET{taskId}/results/{bufferId}/{token} に送信すると、指定された出力バッファから次のバッチの結果が返されます。前のバッチの受信を確認します。

  • GET{taskId}/results/{bufferId}/{token}/acknowledge に送信すると、結果の受信が確認され、ワーカーが結果を削除できるようになります。

  • DELETE{taskId}/results/{bufferId} に送信すると、エラーが発生した場合、指定された出力バッファからすべての結果が削除されます。

  • オプションとして、HEAD リクエストを {taskId}/results/{bufferId} に対して行い、データ以外のページシーケンス関連のヘッダーを取得できます。これを使用して、バッファが完了したかどうかを確認するか、データをフェッチせずにバッファされているデータの量を確認します。前のバッチの結果の受信を確認します。

コーディネータとワーカーは、チャンク単位で結果を取得します。それらは、X-Presto-Max-Size HTTP ヘッダーを使用して、チャンクの最大サイズ(バイト単位)を指定します。各チャンクは、単調増加シーケンス番号(トークンと呼ばれることもあります)で識別されます。結果に対する最初の要求は、シーケンス番号ゼロを指定します。応答には以下が含まれます。

  • X-Presto-Page-Sequence-Id HTTP ヘッダーとして要求されたシーケンス番号。

  • X-Presto-Page-End-Sequence-Id HTTP ヘッダーとして、チャンクの受信を確認し、次のチャンクを要求するために使用するシーケンス番号。

  • true の値を持つ X-Presto-Buffer-Complete HTTP ヘッダーとして、結果がそれ以上ないことの指示。

  • X-Presto-Buffer-Remaining-Bytes HTTP ヘッダーとして、出力バッファに残っているバイト数。これは、次のリクエストで返されるページのサイズ(バイト単位)のカンマ区切りリストを返す必要があります。これは、アップストリームタスクがデータ交換を最適化するヒントとして使用できます。

応答の本文には、シリアライズされたページワイヤフォーマット のページのリストが含まれています。

結果の最初のチャンクを受け取ると、クライアントは X-Presto-Page-End-Sequence-Id シーケンス番号を使用して、結果の次のチャンクを要求します。次のチャンクを要求すると、前のチャンクの受信が自動的に確認されます。クライアントは、true の値を持つ X-Presto-Buffer-Complete HTTP ヘッダーを受け取るまで、結果のフェッチを続けます。

クライアントが次のデータチャンクをすぐにフェッチしないことにした場合は、{taskId}/results/{bufferId}/{token}/acknowledge で GET を使用して明示的な ACK を送信します。クライアントは、トークンを、以前に受信した X-Presto-Page-End-Sequence-Id ヘッダーの値に設定します。

ワーカーが応答の生成でタイムアウトした場合、またはタスクが既に失敗または中断されている場合、ワーカーは空の結果を返します。クライアントはリクエストの再試行を試みることができます。タスクがターミナル状態にある場合、制御プレーンが最終的に状態の変更を処理すると想定されます。

クライアントが応答を見逃した場合、リクエストを繰り返すことができ、ワーカーは再度結果を送信します。シーケンス番号の ACK を受け取ると、ワーカーはそのシーケンス番号よりも小さいすべての結果を削除し、クライアントはそれらを再フェッチできなくなります。

出力バッファゼロから2チャンクの結果を取得するためのサンプルメッセージパッシングダイアグラムを次に示します。

../_images/worker-protocol-results.png

出力バッファ

データシャッフルは、ダウンストリームステージのワーカーが、アップストリームステージのワーカーから結果を取得する処理です。各アップストリームワーカーは、ダウンストリームステージのワーカー数と同じ数の出力バッファを設定します。出力バッファは、0から始まる連続した番号で識別されます。各ダウンストリームワーカーには単一の出力バッファが割り当てられ、それを用いてすべてのアップストリームワーカーから結果を取得します。

以下の図は、3つのダウンストリームワーカーを示しています。これらには、出力バッファ番号0、1、2が割り当てられています。各アップストリームワーカーは3つの出力バッファを持っています。ダウンストリームワーカー#0は、バッファ番号0を使用してすべてのアップストリームワーカーから結果を取得します。ダウンストリームワーカー#1は、バッファ番号1を使用してすべてのアップストリームワーカーから結果を取得します。ダウンストリームワーカー#2は、バッファ番号2を使用してすべてのアップストリームワーカーから結果を取得します。

../_images/worker-protocol-output-buffers.png

障害処理

タスクの失敗は、TaskStatusTaskInfoの更新を通じてコーディネーターに報告されます。

タスクの失敗が検出されると、コーディネーターは残りのすべてのタスクを中止し、クライアントにクエリ失敗を報告します。タスクの失敗が発生した場合、または中止要求を受信した場合、それ以降の処理はすべて停止し、残りのタスク出力はすべて破棄されます。

失敗したタスクまたは中止されたタスクは、カスケード障害を防ぐために、通常通りデータプレーン要求に応答し続けます。出力は失敗時に完全に破棄されるため、後続の応答はすべて空になります。ダウンストリームタスクが正常に終了して不正な結果を生成するのを防ぐために、X-Presto-Buffer-Completeヘッダーはfalseに設定されます。

クライアントにとっては、これらの応答は正常なタスクの応答と区別できません。要求バーストを避けるため、空の結果セットで応答する前に、標準的な遅延が適用されます。

問題の診断

HTTPリクエストのログは、プロトコル関連の問題の診断に役立ちます。

config.propertiesファイルを使用して、リクエストログを有効にすることができます。

Prestoの場合

http-server.log.enabled=true
http-server.log.path=<request_log_file_path>

Prestissimoの場合(ログは標準ログに書き込まれます)

http-server.enable-access-log=true

特定のプロトコルインタラクションを追跡するには、grepを使用します。

交換

cat stderr* | grep '/v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results'
I0402 15:33:06.928076   625 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:06] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/0 HTTP/1.1" 200 0   57
I0402 15:33:07.181629   625 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:07] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/0 HTTP/1.1" 200 94024   0
I0402 15:33:25.392717   675 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:25] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213/1 HTTP/1.1" 200 0   0
I0402 15:33:25.393162   675 AccessLogFilter.cpp:69] 2401:db00:126c:f2f:face:0:3e1:0 - - [2024-04-02 15:33:25] "DELETE /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/results/213 HTTP/1.1" 200 0   0

TaskStatus更新

cat stderr* | grep '/v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status'
I0402 15:33:34.629278   668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:34] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739   1000
I0402 15:33:35.636466   668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:35] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739   1000
I0402 15:33:36.644189   668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:36] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 739   1000
I0402 15:33:36.768704   668 AccessLogFilter.cpp:69] 2401:db00:1210:4267:face:0:15:0 - - [2024-04-02 15:33:36] "GET /v1/task/20240402_223203_00000_kg5tr.11.0.455.0/status HTTP/1.1" 200 717   115

ログレコードには、応答ステータス、応答サイズ、応答時間などの情報が含まれており、遅延やタイムアウトを含むインタラクションの流れを理解するのに役立ちます。