Apache Presto - Connettore KAFKA

Il connettore Kafka per Presto consente di accedere ai dati da Apache Kafka utilizzando Presto.

Prerequisiti

Scarica e installa l'ultima versione dei seguenti progetti Apache.

  • Apache ZooKeeper
  • Apache Kafka

Avvia ZooKeeper

Avvia il server ZooKeeper utilizzando il seguente comando.

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Ora ZooKeeper avvia il port su 2181.

Avvia Kafka

Avvia Kafka in un altro terminale usando il seguente comando.

$ bin/kafka-server-start.sh config/server.properties

Dopo l'avvio di kafka, utilizza il numero di porta 9092.

Dati TPCH

Scarica tpch-kafka

$  curl -o kafka-tpch 
https://repo1.maven.org/maven2/de/softwareforge/kafka_tpch_0811/1.0/kafka_tpch_ 
0811-1.0.sh

Ora hai scaricato il caricatore dalla centrale di Maven usando il comando sopra. Riceverai una risposta simile alla seguente.

% Total    % Received % Xferd  Average Speed   Time    Time     Time  Current 
                                 Dload  Upload   Total   Spent    Left  Speed 
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0  
  5 21.6M    5 1279k    0     0  83898      0  0:04:30  0:00:15  0:04:15  129k
  6 21.6M    6 1407k    0     0  86656      0  0:04:21  0:00:16  0:04:05  131k  
 24 21.6M   24 5439k    0     0   124k      0  0:02:57  0:00:43  0:02:14  175k 
 24 21.6M   24 5439k    0     0   124k      0  0:02:58  0:00:43  0:02:15  160k 
 25 21.6M   25 5736k    0     0   128k      0  0:02:52  0:00:44  0:02:08  181k 
 ………………………..

Quindi, rendilo eseguibile utilizzando il seguente comando,

$ chmod 755 kafka-tpch

Esegui tpch-kafka

Eseguire il programma kafka-tpch per precaricare una serie di argomenti con i dati tpch utilizzando il seguente comando.

Query

$ ./kafka-tpch load --brokers localhost:9092 --prefix tpch. --tpch-type tiny

Risultato

2016-07-13T16:15:52.083+0530 INFO main io.airlift.log.Logging Logging 
to stderr
2016-07-13T16:15:52.124+0530 INFO main de.softwareforge.kafka.LoadCommand
Processing tables: [customer, orders, lineitem, part, partsupp, supplier,
nation, region]
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-1
de.softwareforge.kafka.LoadCommand Loading table 'customer' into topic 'tpch.customer'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-2
de.softwareforge.kafka.LoadCommand Loading table 'orders' into topic 'tpch.orders'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-3
de.softwareforge.kafka.LoadCommand Loading table 'lineitem' into topic 'tpch.lineitem'...
2016-07-13T16:15:52.834+0530 INFO pool-1-thread-4
de.softwareforge.kafka.LoadCommand Loading table 'part' into topic 'tpch.part'...
………………………
……………………….

Ora, le tabelle Kafka clienti, ordini, fornitori, ecc. Vengono caricate utilizzando tpch.

Aggiungi impostazioni di configurazione

Aggiungiamo le seguenti impostazioni di configurazione del connettore Kafka sul server Presto.

connector.name = kafka  

kafka.nodes = localhost:9092  

kafka.table-names = tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp, 
tpch.supplier,tpch.nation,tpch.region  

kafka.hide-internal-columns = false

Nella configurazione precedente, le tabelle Kafka vengono caricate utilizzando il programma Kafka-tpch.

Avvia Presto CLI

Avvia Presto CLI utilizzando il seguente comando,

$ ./presto --server localhost:8080 --catalog kafka —schema tpch;

Qui “tpch" è uno schema per il connettore Kafka e riceverai una risposta come la seguente.

presto:tpch>

Elenca tabelle

La seguente query elenca tutte le tabelle in “tpch” schema.

Query

presto:tpch> show tables;

Risultato

Table 
---------- 
 customer 
 lineitem 
 nation 
 orders
 part 
 partsupp 
 region 
 supplier

Descrivi la tabella dei clienti

La seguente query descrive “customer” tavolo.

Query

presto:tpch> describe customer;

Risultato

Column           |  Type   |                   Comment 
-------------------+---------+--------------------------------------------- 
 _partition_id     | bigint  | Partition Id 
 _partition_offset | bigint  | Offset for the message within the partition 
 _segment_start    | bigint  | Segment start offset 
 _segment_end      | bigint  | Segment end offset 
 _segment_count    | bigint  | Running message count per segment 
 _key              | varchar | Key text 
 _key_corrupt      | boolean | Key data is corrupt 
 _key_length       | bigint  | Total number of key bytes 
 _message          | varchar | Message text 
 _message_corrupt  | boolean | Message data is corrupt 
 _message_length   | bigint  | Total number of message bytes