Kafkaコネクタチュートリアル¶
はじめに¶
Presto用Kafkaコネクタを使用すると、PrestoからApache Kafkaのライブトピックデータにアクセスできます。このチュートリアルでは、トピックを設定する方法と、Prestoテーブルを支えるトピック記述ファイルを作成する方法を説明します。
インストール¶
このチュートリアルでは、Prestoと動作するローカルPrestoインストールに精通していることを前提としています(Prestoのデプロイを参照)。Apache KafkaのセットアップとPrestoとの統合に焦点を当てます。
ステップ1:Apache Kafkaをインストールする¶
Apache Kafkaをダウンロードして解凍します。
注記
このチュートリアルは、Apache Kafka 0.8.1でテストされました。Apache Kafkaの0.8.xバージョンであれば動作するはずです。
ZooKeeperとKafkaサーバーを起動します
$ bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
$ bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
これにより、ポート2181
でZookeeperが、ポート9092
でKafkaが起動します。
ステップ2:データをロードする¶
Mavenセントラルからtpch-kafkaローダーをダウンロードします
$ curl -o kafka-tpch https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_0811-1.0.sh
$ chmod 755 kafka-tpch
次に、kafka-tpch
プログラムを実行して、tpchデータを含む複数のトピックをプリロードします
$ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny
2014-07-28T17:17:07.594-0700 INFO main com.facebook.airlift.log.Logging Logging to stderr
2014-07-28T17:17:07.623-0700 INFO main de.softwareforge.kafka.LoadCommand Processing tables: [customer, orders, lineitem, part, partsupp, supplier, nation, region]
2014-07-28T17:17:07.981-0700 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
2014-07-28T17:17:07.981-0700 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
2014-07-28T17:17:07.981-0700 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
2014-07-28T17:17:07.982-0700 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
2014-07-28T17:17:07.982-0700 INFO pool-1-thread-5 de.softwareforge.kafka.LoadCommand Loading table 'partsupp' into topic 'tpch.partsupp'...
2014-07-28T17:17:07.982-0700 INFO pool-1-thread-6 de.softwareforge.kafka.LoadCommand Loading table 'supplier' into topic 'tpch.supplier'...
2014-07-28T17:17:07.982-0700 INFO pool-1-thread-7 de.softwareforge.kafka.LoadCommand Loading table 'nation' into topic 'tpch.nation'...
2014-07-28T17:17:07.982-0700 INFO pool-1-thread-8 de.softwareforge.kafka.LoadCommand Loading table 'region' into topic 'tpch.region'...
2014-07-28T17:17:10.612-0700 ERROR pool-1-thread-8 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.region
2014-07-28T17:17:10.781-0700 INFO pool-1-thread-8 de.softwareforge.kafka.LoadCommand Generated 5 rows for table 'region'.
2014-07-28T17:17:10.797-0700 ERROR pool-1-thread-3 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.lineitem
2014-07-28T17:17:10.932-0700 ERROR pool-1-thread-1 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.customer
2014-07-28T17:17:11.068-0700 ERROR pool-1-thread-2 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.orders
2014-07-28T17:17:11.200-0700 ERROR pool-1-thread-6 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.supplier
2014-07-28T17:17:11.319-0700 INFO pool-1-thread-6 de.softwareforge.kafka.LoadCommand Generated 100 rows for table 'supplier'.
2014-07-28T17:17:11.333-0700 ERROR pool-1-thread-4 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.part
2014-07-28T17:17:11.466-0700 ERROR pool-1-thread-5 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.partsupp
2014-07-28T17:17:11.597-0700 ERROR pool-1-thread-7 kafka.producer.async.DefaultEventHandler Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: tpch.nation
2014-07-28T17:17:11.706-0700 INFO pool-1-thread-7 de.softwareforge.kafka.LoadCommand Generated 25 rows for table 'nation'.
2014-07-28T17:17:12.180-0700 INFO pool-1-thread-1 de.softwareforge.kafka.LoadCommand Generated 1500 rows for table 'customer'.
2014-07-28T17:17:12.251-0700 INFO pool-1-thread-4 de.softwareforge.kafka.LoadCommand Generated 2000 rows for table 'part'.
2014-07-28T17:17:12.905-0700 INFO pool-1-thread-2 de.softwareforge.kafka.LoadCommand Generated 15000 rows for table 'orders'.
2014-07-28T17:17:12.919-0700 INFO pool-1-thread-5 de.softwareforge.kafka.LoadCommand Generated 8000 rows for table 'partsupp'.
2014-07-28T17:17:13.877-0700 INFO pool-1-thread-3 de.softwareforge.kafka.LoadCommand Generated 60175 rows for table 'lineitem'.
Kafkaには、クエリを実行するためのデータがプリロードされた複数のトピックが用意されています。
ステップ3:KafkaトピックをPrestoに認識させる¶
Prestoインストールで、Kafkaコネクタのカタログプロパティファイルetc/catalog/kafka.properties
を追加します。このファイルには、Kafkaノードとトピックがリストされています
connector.name=kafka
kafka.nodes=localhost:9092
kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region
kafka.hide-internal-columns=false
Prestoを起動します
$ bin/launcher start
Kafkaテーブルはすべて設定でtpch.
プレフィックスが付いているため、テーブルはtpch
スキーマにあります。プロパティファイルの名前がkafka.properties
であるため、コネクタはkafka
カタログにマウントされます。
Presto CLIを起動します
$ ./presto --catalog kafka --schema tpch
テーブルをリストして、動作していることを確認します
presto:tpch> SHOW TABLES;
Table
----------
customer
lineitem
nation
orders
part
partsupp
region
supplier
(8 rows)
ステップ4:基本的なデータクエリ¶
Kafkaデータは構造化されておらず、メッセージの形式を記述するメタデータがありません。追加の設定を行わない場合、Kafkaコネクタはデータにアクセスして生の形式でマッピングできますが、組み込みのカラム以外に実際のカラムはありません
presto:tpch> DESCRIBE customer;
Column | Type | Extra | Comment
-------------------+---------+-------+---------------------------------------------
_partition_id | bigint | | Partition Id
_partition_offset | bigint | | Offset for the message within the partition
_key | varchar | | Key text
_key_corrupt | boolean | | Key data is corrupt
_key_length | bigint | | Total number of key bytes
_message | varchar | | Message text
_message_corrupt | boolean | | Message data is corrupt
_message_length | bigint | | Total number of message bytes
(11 rows)
presto:tpch> SELECT count(*) FROM customer;
_col0
-------
1500
presto:tpch> SELECT _message FROM customer LIMIT 5;
_message
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
{"rowNumber":1,"customerKey":1,"name":"Customer#000000001","address":"IVhzIApeRb ot,c,E","nationKey":15,"phone":"25-989-741-2988","accountBalance":711.56,"marketSegment":"BUILDING","comment":"to the even, regular platelets. regular, ironic epitaphs nag e"}
{"rowNumber":3,"customerKey":3,"name":"Customer#000000003","address":"MG9kdTD2WBHm","nationKey":1,"phone":"11-719-748-3364","accountBalance":7498.12,"marketSegment":"AUTOMOBILE","comment":" deposits eat slyly ironic, even instructions. express foxes detect slyly. blithel
{"rowNumber":5,"customerKey":5,"name":"Customer#000000005","address":"KvpyuHCplrB84WgAiGV6sYpZq7Tj","nationKey":3,"phone":"13-750-942-6364","accountBalance":794.47,"marketSegment":"HOUSEHOLD","comment":"n accounts will have to unwind. foxes cajole accor"}
{"rowNumber":7,"customerKey":7,"name":"Customer#000000007","address":"TcGe5gaZNgVePxU5kRrvXBfkasDTea","nationKey":18,"phone":"28-190-982-9759","accountBalance":9561.95,"marketSegment":"AUTOMOBILE","comment":"ainst the ironic, express theodolites. express, even pinto bean
{"rowNumber":9,"customerKey":9,"name":"Customer#000000009","address":"xKiAFTjUsCuxfeleNqefumTrjS","nationKey":8,"phone":"18-338-906-3675","accountBalance":8324.07,"marketSegment":"FURNITURE","comment":"r theodolites according to the requests wake thinly excuses: pending
(5 rows)
presto:tpch> SELECT sum(cast(json_extract_scalar(_message, '$.accountBalance') AS double)) FROM customer LIMIT 10;
_col0
------------
6681865.59
(1 row)
KafkaからのデータはPrestoを使用してクエリできますが、まだ実際のテーブル形状ではありません。生のデータは_message
および_key
カラムを介して利用できますが、カラムにデコードされていません。サンプルデータはJSON形式であるため、Prestoに組み込まれているJSON関数と演算子を使用してデータをスライスできます。
ステップ5:トピック記述ファイルを追加する¶
Kafkaコネクタは、生のデータをテーブル形式に変換するためのトピック記述ファイルをサポートしています。これらのファイルは、Prestoインストールのetc/kafka
フォルダにあり、.json
で終わる必要があります。ファイル名はテーブル名と一致させることをお勧めしますが、必須ではありません。
次のファイルをetc/kafka/tpch.customer.json
として追加し、Prestoを再起動します
{
"tableName": "customer",
"schemaName": "tpch",
"topicName": "tpch.customer",
"key": {
"dataFormat": "raw",
"fields": [
{
"name": "kafka_key",
"dataFormat": "LONG",
"type": "BIGINT",
"hidden": "false"
}
]
}
}
customerテーブルには、kafka_key
という追加のカラムが追加されました。
presto:tpch> DESCRIBE customer;
Column | Type | Extra | Comment
-------------------+---------+-------+---------------------------------------------
kafka_key | bigint | |
_partition_id | bigint | | Partition Id
_partition_offset | bigint | | Offset for the message within the partition
_key | varchar | | Key text
_key_corrupt | boolean | | Key data is corrupt
_key_length | bigint | | Total number of key bytes
_message | varchar | | Message text
_message_corrupt | boolean | | Message data is corrupt
_message_length | bigint | | Total number of message bytes
(12 rows)
presto:tpch> SELECT kafka_key FROM customer ORDER BY kafka_key LIMIT 10;
kafka_key
-----------
0
1
2
3
4
5
6
7
8
9
(10 rows)
トピック定義ファイルは、内部Kafkaキー(8バイトの生のlong)をPrestoのBIGINT
カラムにマッピングします。
ステップ6:トピックメッセージのすべての値をカラムにマッピングする¶
etc/kafka/tpch.customer.json
ファイルを更新して、メッセージのフィールドを追加し、Prestoを再起動します。メッセージ内のフィールドはJSONであるため、json
データ形式を使用します。これは、キーとメッセージに異なるデータ形式が使用される例です。
{
"tableName": "customer",
"schemaName": "tpch",
"topicName": "tpch.customer",
"key": {
"dataFormat": "raw",
"fields": [
{
"name": "kafka_key",
"dataFormat": "LONG",
"type": "BIGINT",
"hidden": "false"
}
]
},
"message": {
"dataFormat": "json",
"fields": [
{
"name": "row_number",
"mapping": "rowNumber",
"type": "BIGINT"
},
{
"name": "customer_key",
"mapping": "customerKey",
"type": "BIGINT"
},
{
"name": "name",
"mapping": "name",
"type": "VARCHAR"
},
{
"name": "address",
"mapping": "address",
"type": "VARCHAR"
},
{
"name": "nation_key",
"mapping": "nationKey",
"type": "BIGINT"
},
{
"name": "phone",
"mapping": "phone",
"type": "VARCHAR"
},
{
"name": "account_balance",
"mapping": "accountBalance",
"type": "DOUBLE"
},
{
"name": "market_segment",
"mapping": "marketSegment",
"type": "VARCHAR"
},
{
"name": "comment",
"mapping": "comment",
"type": "VARCHAR"
}
]
}
}
これで、メッセージのJSONのすべてのフィールドについて、カラムが定義され、以前の合計クエリはaccount_balance
カラムで直接操作できます
presto:tpch> DESCRIBE customer;
Column | Type | Extra | Comment
-------------------+---------+-------+---------------------------------------------
kafka_key | bigint | |
row_number | bigint | |
customer_key | bigint | |
name | varchar | |
address | varchar | |
nation_key | bigint | |
phone | varchar | |
account_balance | double | |
market_segment | varchar | |
comment | varchar | |
_partition_id | bigint | | Partition Id
_partition_offset | bigint | | Offset for the message within the partition
_key | varchar | | Key text
_key_corrupt | boolean | | Key data is corrupt
_key_length | bigint | | Total number of key bytes
_message | varchar | | Message text
_message_corrupt | boolean | | Message data is corrupt
_message_length | bigint | | Total number of message bytes
(21 rows)
presto:tpch> SELECT * FROM customer LIMIT 5;
kafka_key | row_number | customer_key | name | address | nation_key | phone | account_balance | market_segment | comment
-----------+------------+--------------+--------------------+---------------------------------------+------------+-----------------+-----------------+----------------+---------------------------------------------------------------------------------------------------------
1 | 2 | 2 | Customer#000000002 | XSTf4,NCwDVaWNe6tEgvwfmRchLXak | 13 | 23-768-687-3665 | 121.65 | AUTOMOBILE | l accounts. blithely ironic theodolites integrate boldly: caref
3 | 4 | 4 | Customer#000000004 | XxVSJsLAGtn | 4 | 14-128-190-5944 | 2866.83 | MACHINERY | requests. final, regular ideas sleep final accou
5 | 6 | 6 | Customer#000000006 | sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn | 20 | 30-114-968-4951 | 7638.57 | AUTOMOBILE | tions. even deposits boost according to the slyly bold packages. final accounts cajole requests. furious
7 | 8 | 8 | Customer#000000008 | I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5 | 17 | 27-147-574-9335 | 6819.74 | BUILDING | among the slyly regular theodolites kindle blithely courts. carefully even theodolites haggle slyly alon
9 | 10 | 10 | Customer#000000010 | 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 | 5 | 15-741-346-9870 | 2753.54 | HOUSEHOLD | es regular deposits haggle. fur
(5 rows)
presto:tpch> SELECT sum(account_balance) FROM customer LIMIT 10;
_col0
------------
6681865.59
(1 row)
これで、customer
トピックメッセージのすべてのフィールドがPrestoテーブルカラムとして利用できるようになりました。
ステップ7:ライブデータを使用する¶
Prestoは、Kafkaのライブデータを到着時にクエリできます。データのライブフィードをシミュレートするために、このチュートリアルでは、Kafkaへのライブツイートのフィードを設定します。
ライブTwitterフィードを設定する¶
twistrツールをダウンロードします
$ curl -o twistr https://repo1.maven.org/maven2/de/softwareforge/twistr_kafka_0811/1.2/twistr_kafka_0811-1.2.sh
$ chmod 755 twistr
https://dev.twitter.com/で開発者アカウントを作成し、アクセスとコンシューマートークンを設定します。
twistr.properties
ファイルを作成し、アクセスとコンシューマーのキーとシークレットを配置します
twistr.access-token-key=...
twistr.access-token-secret=...
twistr.consumer-key=...
twistr.consumer-secret=...
twistr.kafka.brokers=localhost:9092
Prestoにtweetsテーブルを作成する¶
etc/catalog/kafka.properties
ファイルにtweetsテーブルを追加します
connector.name=kafka
kafka.nodes=localhost:9092
kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region,tweets
kafka.hide-internal-columns=false
Twitterフィードのトピック定義ファイルをetc/kafka/tweets.json
として追加します
{
"tableName": "tweets",
"topicName": "twitter_feed",
"dataFormat": "json",
"key": {
"dataFormat": "raw",
"fields": [
{
"name": "kafka_key",
"dataFormat": "LONG",
"type": "BIGINT",
"hidden": "false"
}
]
},
"message": {
"dataFormat":"json",
"fields": [
{
"name": "text",
"mapping": "text",
"type": "VARCHAR"
},
{
"name": "user_name",
"mapping": "user/screen_name",
"type": "VARCHAR"
},
{
"name": "lang",
"mapping": "lang",
"type": "VARCHAR"
},
{
"name": "created_at",
"mapping": "created_at",
"type": "TIMESTAMP",
"dataFormat": "rfc2822"
},
{
"name": "favorite_count",
"mapping": "favorite_count",
"type": "BIGINT"
},
{
"name": "retweet_count",
"mapping": "retweet_count",
"type": "BIGINT"
},
{
"name": "favorited",
"mapping": "favorited",
"type": "BOOLEAN"
},
{
"name": "id",
"mapping": "id_str",
"type": "VARCHAR"
},
{
"name": "in_reply_to_screen_name",
"mapping": "in_reply_to_screen_name",
"type": "VARCHAR"
},
{
"name": "place_name",
"mapping": "place/full_name",
"type": "VARCHAR"
}
]
}
}
このテーブルには明示的なスキーマ名がないため、default
スキーマに配置されます。
ライブデータを送信する¶
twistrツールを起動します
$ java -Dness.config.location=file:$(pwd) -Dness.config=twistr -jar ./twistr
twistr
はTwitter APIに接続し、「サンプルツイート」フィードをtwitter_feed
というKafkaトピックに送信します。
ライブデータに対してクエリを実行します
$ ./presto-cli --catalog kafka --schema default
presto:default> SELECT count(*) FROM tweets;
_col0
-------
4467
(1 row)
presto:default> SELECT count(*) FROM tweets;
_col0
-------
4517
(1 row)
presto:default> SELECT count(*) FROM tweets;
_col0
-------
4572
(1 row)
presto:default> SELECT kafka_key, user_name, lang, created_at FROM tweets LIMIT 10;
kafka_key | user_name | lang | created_at
--------------------+-----------------+------+-------------------------
494227746231685121 | burncaniff | en | 2014-07-29 14:07:31.000
494227746214535169 | gu8tn | ja | 2014-07-29 14:07:31.000
494227746219126785 | pequitamedicen | es | 2014-07-29 14:07:31.000
494227746201931777 | josnyS | ht | 2014-07-29 14:07:31.000
494227746219110401 | Cafe510 | en | 2014-07-29 14:07:31.000
494227746210332673 | Da_JuanAnd_Only | en | 2014-07-29 14:07:31.000
494227746193956865 | Smile_Kidrauhl6 | pt | 2014-07-29 14:07:31.000
494227750426017793 | CashforeverCD | en | 2014-07-29 14:07:32.000
494227750396653569 | FilmArsivimiz | tr | 2014-07-29 14:07:32.000
494227750388256769 | jmolas | es | 2014-07-29 14:07:32.000
(10 rows)
Kafkaへのライブフィードがあり、Prestoを使用してクエリできます。
エピローグ:タイムスタンプ¶
前のステップで設定したツイートフィードには、各ツイートの created_at
属性として、RFC 2822 形式のタイムスタンプが含まれています。
presto:default> SELECT DISTINCT json_extract_scalar(_message, '$.created_at')) AS raw_date
-> FROM tweets LIMIT 5;
raw_date
--------------------------------
Tue Jul 29 21:07:31 +0000 2014
Tue Jul 29 21:07:32 +0000 2014
Tue Jul 29 21:07:33 +0000 2014
Tue Jul 29 21:07:34 +0000 2014
Tue Jul 29 21:07:35 +0000 2014
(5 rows)
ツイートテーブルのトピック定義ファイルには、rfc2822
コンバーターを使用したタイムスタンプへのマッピングが含まれています。
...
{
"name": "created_at",
"mapping": "created_at",
"type": "TIMESTAMP",
"dataFormat": "rfc2822"
},
...
これにより、未加工のデータを Presto のタイムスタンプ列にマッピングできます。
presto:default> SELECT created_at, raw_date FROM (
-> SELECT created_at, json_extract_scalar(_message, '$.created_at') AS raw_date
-> FROM tweets)
-> GROUP BY 1, 2 LIMIT 5;
created_at | raw_date
-------------------------+--------------------------------
2014-07-29 14:07:20.000 | Tue Jul 29 21:07:20 +0000 2014
2014-07-29 14:07:21.000 | Tue Jul 29 21:07:21 +0000 2014
2014-07-29 14:07:22.000 | Tue Jul 29 21:07:22 +0000 2014
2014-07-29 14:07:23.000 | Tue Jul 29 21:07:23 +0000 2014
2014-07-29 14:07:24.000 | Tue Jul 29 21:07:24 +0000 2014
(5 rows)
Kafka コネクタには、ISO 8601、RFC 2822 テキスト形式、およびエポックからの秒数またはミリ秒を使用した数値ベースのタイムスタンプ用のコンバーターが含まれています。 また、Joda-Time フォーマット文字列を使用してテキスト列を解析する、汎用的なテキストベースのフォーマッターも用意されています。