這篇文章將為大家詳細(xì)講解有關(guān)Kafka Connect及FileConnector的示例分析,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個(gè)參考,希望大家閱讀完這篇文章后對(duì)相關(guān)知識(shí)有一定的了解。
站在用戶的角度思考問(wèn)題,與客戶深入溝通,找到北屯網(wǎng)站設(shè)計(jì)與北屯網(wǎng)站推廣的解決方案,憑借多年的經(jīng)驗(yàn),讓設(shè)計(jì)與互聯(lián)網(wǎng)技術(shù)結(jié)合,創(chuàng)造個(gè)性化、用戶體驗(yàn)好的作品,建站類型包括:成都做網(wǎng)站、成都網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣、空間域名、網(wǎng)頁(yè)空間、企業(yè)郵箱。業(yè)務(wù)覆蓋北屯地區(qū)。
Kafka是一個(gè)使用越來(lái)越廣的消息系統(tǒng),尤其是在大數(shù)據(jù)開(kāi)發(fā)中(實(shí)時(shí)數(shù)據(jù)處理和分析)。為何集成其他系統(tǒng)和解耦應(yīng)用,經(jīng)常使用Producer來(lái)發(fā)送消息到Broker,并使用Consumer來(lái)消費(fèi)Broker中的消息。Kafka Connect是到0.9版本才提供的并極大的簡(jiǎn)化了其他系統(tǒng)與Kafka的集成。Kafka Connect運(yùn)用用戶快速定義并實(shí)現(xiàn)各種Connector(File,Jdbc,Hdfs等),這些功能讓大批量數(shù)據(jù)導(dǎo)入/導(dǎo)出Kafka很方便。
如圖中所示,左側(cè)的Sources負(fù)責(zé)從其他異構(gòu)系統(tǒng)中讀取數(shù)據(jù)并導(dǎo)入到Kafka中;右側(cè)的Sinks是把Kafka中的數(shù)據(jù)寫(xiě)入到其他的系統(tǒng)中。
Kafka Connector很多,包括開(kāi)源和商業(yè)版本的。如下列表中是常用的開(kāi)源的Connector
Connectors | References |
---|---|
Jdbc | Source, Sink |
Elastic Search | Sink1, Sink2, Sink3 |
Cassandra | Source1, Source 2, Sink1, Sink2 |
MongoDB | Source |
HBase | Sink |
Syslog | Source |
MQTT (Source) | Source |
Twitter (Source) | Source, Sink |
S3 | Sink1, Sink2 |
商業(yè)版的可以通過(guò)Confluent.io獲得
本例演示如何使用Kafka Connect把Source(test.txt)轉(zhuǎn)為流數(shù)據(jù)再寫(xiě)入到Destination(test.sink.txt)中。如下圖所示:
本例使用到了兩個(gè)Connector:
FileStreamSource:從test.txt中讀取并發(fā)布到Broker中
FileStreamSink:從Broker中讀取數(shù)據(jù)并寫(xiě)入到test.sink.txt文件中 其中的Source使用到的配置文件是${KAFKA_HOME}/config/connect-file-source.properties
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
其中的Sink使用到的配置文件是${KAFKA_HOME}/config/connect-file-sink.properties
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
Broker使用到的配置文件是${KAFKA_HOME}/config/connect-standalone.properties
bootstrap.servers=localhost:9092key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=trueinternal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
需要熟悉Kafka的一些命令行,參考本系列之前的文章Apache Kafka系列(二) 命令行工具(CLI)
[root@localhost bin]# cd /opt/kafka_2.11-0.11.0.0/
[root@localhost kafka_2.11-0.11.0.0]# ls
bin config libs LICENSE logs NOTICE site-docs
[root@localhost kafka_2.11-0.11.0.0]# ./bin/zookeeper-server-start.sh ./config/zookeeper.properties &
[root@localhost kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh ./config/server.properties &
[root@localhost kafka_2.11-0.11.0.0]# ./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
3.3.3 打開(kāi)console-consumer
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
[root@Server4 kafka_2.12-0.11.0.0]# echo 'firest line' >> test.txt
[root@Server4 kafka_2.12-0.11.0.0]# echo 'second line' >> test.txt
{"schema":{"type":"string","optional":false},"payload":"firest line"}
{"schema":{"type":"string","optional":false},"payload":"second line"}
[root@Server4 kafka_2.12-0.11.0.0]# cat test.sink.txt
firest line
second line
關(guān)于Kafka Connect及FileConnector的示例分析就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。
當(dāng)前標(biāo)題:KafkaConnect及FileConnector的示例分析
當(dāng)前地址:http://bm7419.com/article26/pcescg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供手機(jī)網(wǎng)站建設(shè)、虛擬主機(jī)、App設(shè)計(jì)、網(wǎng)站改版、云服務(wù)器、網(wǎng)站制作
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)