KafkaConnect及FileConnector的示例分析

這篇文章將為大家詳細(xì)講解有關(guān)Kafka Connect及FileConnector的示例分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。

站在用戶的角度思考問(wèn)題,與客戶深入溝通,找到北屯網(wǎng)站設(shè)計(jì)與北屯網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶體驗(yàn)好的作品,建站類型包括:成都做網(wǎng)站、成都網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、空間域名、網(wǎng)頁(yè)空間、企業(yè)郵箱。業(yè)務(wù)覆蓋北屯地區(qū)。

一. Kafka Connect簡(jiǎn)介

Kafka是一個(gè)使用越來(lái)越廣的消息系統(tǒng),尤其是在大數(shù)據(jù)開(kāi)發(fā)中(實(shí)時(shí)數(shù)據(jù)處理和分析)。為何集成其他系統(tǒng)和解耦應(yīng)用,經(jīng)常使用Producer來(lái)發(fā)送消息到Broker,并使用Consumer來(lái)消費(fèi)Broker中的消息。Kafka Connect是到0.9版本才提供的并極大的簡(jiǎn)化了其他系統(tǒng)與Kafka的集成。Kafka Connect運(yùn)用用戶快速定義并實(shí)現(xiàn)各種Connector(File,Jdbc,Hdfs等),這些功能讓大批量數(shù)據(jù)導(dǎo)入/導(dǎo)出Kafka很方便。

Kafka Connect及FileConnector的示例分析

如圖中所示,左側(cè)的Sources負(fù)責(zé)從其他異構(gòu)系統(tǒng)中讀取數(shù)據(jù)并導(dǎo)入到Kafka中;右側(cè)的Sinks是把Kafka中的數(shù)據(jù)寫(xiě)入到其他的系統(tǒng)中。

二. 各種Kafka Connector

Kafka Connector很多,包括開(kāi)源和商業(yè)版本的。如下列表中是常用的開(kāi)源的Connector

ConnectorsReferences
JdbcSource, Sink
Elastic SearchSink1, Sink2, Sink3
CassandraSource1, Source 2, Sink1, Sink2
MongoDBSource
HBaseSink
SyslogSource
MQTT (Source)Source
Twitter (Source)Source, Sink
S3Sink1, Sink2

商業(yè)版的可以通過(guò)Confluent.io獲得

三. 示例

3.1 FileConnector Demo

本例演示如何使用Kafka Connect把Source(test.txt)轉(zhuǎn)為流數(shù)據(jù)再寫(xiě)入到Destination(test.sink.txt)中。如下圖所示:

Kafka Connect及FileConnector的示例分析

本例使用到了兩個(gè)Connector:

  • FileStreamSource:從test.txt中讀取并發(fā)布到Broker中

  • FileStreamSink:從Broker中讀取數(shù)據(jù)并寫(xiě)入到test.sink.txt文件中 其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test

其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties

bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
3.2 運(yùn)行Demo

需要熟悉Kafka的一些命令行,參考本系列之前的文章Apache Kafka系列(二) 命令行工具(CLI)

3.2.1 啟動(dòng)Kafka Broker
[root@localhost bin]# cd /opt/kafka_2.11-0.11.0.0/
[root@localhost kafka_2.11-0.11.0.0]# ls
bin  config  libs  LICENSE  logs  NOTICE  site-docs
[root@localhost kafka_2.11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@localhost kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &
3.2.2 啟動(dòng)Source Connector和Sink Connector
[root@localhost kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

3.3.3 打開(kāi)console-consumer

./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
3.3.4 寫(xiě)入到test.txt文件中,并觀察3.3.3中的變化
[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt
3.3.3中打開(kāi)的窗口輸出如下
{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}
3.3.5 查看test.sink.txt
[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt
firest line
second line

關(guān)于Kafka Connect及FileConnector的示例分析就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。

當(dāng)前標(biāo)題:KafkaConnect及FileConnector的示例分析
當(dāng)前地址:http://bm7419.com/article26/pcescg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供手機(jī)網(wǎng)站建設(shè)、虛擬主機(jī)App設(shè)計(jì)、網(wǎng)站改版云服務(wù)器、網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

綿陽(yáng)服務(wù)器托管