現(xiàn)如今,我們來到了數(shù)據(jù)時代,數(shù)據(jù)信息化與我們的生活與工作息息相關(guān)。此篇文章簡述利用大數(shù)據(jù)框架,實(shí)時處理數(shù)據(jù)的流程與相關(guān)框架的介紹,主要包括:
網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)!專注于網(wǎng)頁設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、微信小程序、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了承德縣免費(fèi)建站歡迎大家使用!
數(shù)據(jù)實(shí)時處理的概念和意義
數(shù)據(jù)實(shí)時處理能做什么
數(shù)據(jù)實(shí)時處理架構(gòu)簡介
數(shù)據(jù)實(shí)時處理代碼演示
什么是數(shù)據(jù)實(shí)時處理呢?我個人對數(shù)據(jù)實(shí)時處理的理解為:數(shù)據(jù)從生成->實(shí)時采集->實(shí)時緩存存儲->(準(zhǔn))實(shí)時計(jì)算->實(shí)時落地->實(shí)時展示->實(shí)時分析。這一個流程線下來,處理數(shù)據(jù)的速度在秒級甚至毫秒級。
數(shù)據(jù)實(shí)時處理有什么意義呢?我們得到數(shù)據(jù)可以進(jìn)行數(shù)據(jù)分析,利用數(shù)據(jù)統(tǒng)計(jì)方法,從錯綜復(fù)雜的數(shù)據(jù)關(guān)系中梳理出事物的聯(lián)系,比如發(fā)展趨勢、影響因素、因果關(guān)系等。甚至建立一些BI,對一些數(shù)據(jù)的有用信息進(jìn)行可視化呈現(xiàn),并形成數(shù)據(jù)故事。
何為數(shù)據(jù)的實(shí)時計(jì)算?我們從數(shù)據(jù)源端拿到數(shù)據(jù),可能不盡如人意,我們想對得到的數(shù)據(jù)進(jìn)行 ETL 操作、或者進(jìn)行關(guān)聯(lián)等等,那么我們就會用到數(shù)據(jù)的實(shí)時計(jì)算。目前主流的實(shí)時計(jì)算框架有 spark,storm,flink 等。
數(shù)據(jù)的實(shí)時落地,意思是將我們的源數(shù)據(jù)或者計(jì)算好的數(shù)據(jù)進(jìn)行實(shí)時的存儲。在大數(shù)據(jù)領(lǐng)域,推薦使用 HDFS,ES 等進(jìn)行存儲。
我們拿到了數(shù)據(jù),要會用數(shù)據(jù)的價(jià)值。數(shù)據(jù)的價(jià)值體現(xiàn)在數(shù)據(jù)中相互關(guān)聯(lián)關(guān)系,或與歷史關(guān)聯(lián),或能預(yù)測未來。我們實(shí)時得到數(shù)據(jù),不僅能夠利用前端框架進(jìn)行實(shí)時展示,還可以對其中的一些數(shù)據(jù)進(jìn)行算法訓(xùn)練,預(yù)測未來走勢等。
example:
淘寶雙 11 大屏,每年的雙 11 是淘寶粉絲瘋狂的日子。馬云會在雙 11 的當(dāng)天在阿里總部豎起一面大的電子屏幕,展示淘寶這一天的成績。例如成交額,訪問人數(shù),訂單量,下單量,成交量等等。這個電子大屏的背后,就是用到的我們所說的數(shù)據(jù)的實(shí)時處理。首先,阿里的服務(wù)器遍布全國各地,這些服務(wù)器收集PC端、手機(jī)端等日志,上報(bào)到服務(wù)器,在服務(wù)上部署數(shù)據(jù)采集工具。接下來,由于數(shù)據(jù)量龐大,需要做數(shù)據(jù)的緩存緩沖處理。下一步,對原始日志進(jìn)行實(shí)時的計(jì)算,比如篩選出上面所述的各個指標(biāo)。最后,通過接口或者其他形式,進(jìn)行前端屏幕的實(shí)時展示。
接下來是我們介紹的重點(diǎn),先放一張數(shù)據(jù)流程圖:
cdn.xitu.io/2018/9/3/1659d6798453f811?imageView2/0/w/1280/h/960/format/webp/ignore-error/1">
數(shù)據(jù)采集端,選用目前采集數(shù)據(jù)的主流控件 flume。
數(shù)據(jù)緩沖緩存,選用分布式消息隊(duì)列 kafka。
數(shù)據(jù)實(shí)時計(jì)算,選用 spark 計(jì)算引擎。
數(shù)據(jù)存儲位置,選用分布式數(shù)據(jù)存儲 ES。
其他,指從 ES 中拿到數(shù)據(jù)后進(jìn)行可視化展示,數(shù)據(jù)分析等。
下面將分別簡單的介紹下各個組件:
flume 是一個分布式的數(shù)據(jù)收集系統(tǒng),具有高可靠、高可用、事務(wù)管理、失敗重啟、聚合和傳輸?shù)裙δ?。?shù)據(jù)處理速度快,完全可以用于生產(chǎn)環(huán)境。
flume 的核心概念有:event,agent,source,channel,sink
flume 的數(shù)據(jù)流由事件 (event) 貫穿始終。event 是 flume 的基本數(shù)據(jù)單位,它攜帶日志數(shù)據(jù)并且攜帶數(shù)據(jù)的頭信息,這些 event 由 agent 外部的 source 生成,當(dāng) source 捕獲事件后會進(jìn)行特定的格式化,然后 source 會把事件推入 channel 中??梢园?channel 看作是一個緩沖區(qū),它將保存事件直到 sink 處理完該事件。sink 負(fù)責(zé)持久化日志或者把事件推向另一個 source。
flume 的核心是 agent。agent 是一個 java 進(jìn)程,運(yùn)行在日志收集端,通過 agent 接收日志,然后暫存起來,再發(fā)送到目的地。 每臺機(jī)器運(yùn)行一個 agent。 agent 里面可以包含多個 source,channel,sink。
source 是數(shù)據(jù)的收集端,負(fù)責(zé)將數(shù)據(jù)捕獲后進(jìn)行特殊的格式化,將數(shù)據(jù)封裝到 event 里,然后將事件推入 channel 中。flume 提供了很多內(nèi)置的 source,支持 avro,log4j,syslog 等等。如果內(nèi)置的 source 無法滿足環(huán)境的需求,flume 還支持自定義 source。
channel 是連接 source 和 sink 的組件,大家可以將它看做一個數(shù)據(jù)的緩沖區(qū)(數(shù)據(jù)隊(duì)列),它可以將事件暫存到內(nèi)存中也可以持久化到本地磁盤上, 直到 sink 處理完該事件。兩個較為常用的 channel,MemoryChannel 和 FileChannel。
sink 從 channel 中取出事件,然后將數(shù)據(jù)發(fā)到別處,可以向文件系統(tǒng)、數(shù)據(jù)庫、hadoop、kafka,也可以是其他 agent 的 source。
flume 的可靠性:當(dāng)節(jié)點(diǎn)出現(xiàn)故障時,日志能夠被傳送到其他節(jié)點(diǎn)上而不會丟失。Flume 提供了可靠性保障,收到數(shù)據(jù)首先寫到磁盤上,當(dāng)數(shù)據(jù)傳送成功后,再刪除;如果數(shù)據(jù)發(fā)送失敗,可以重新發(fā)送。
flume 的可恢復(fù)性:可恢復(fù)性是靠 channel。
口述抽象,上兩張官網(wǎng)貼圖:
單個 agent 收集數(shù)據(jù)流程圖
多個 agent 協(xié)作處理數(shù)據(jù)流程圖
Kafka 是一個高吞吐量的分布式發(fā)布-訂閱消息系統(tǒng)。企業(yè)中一般使用 kafka 做消息中間件,做緩沖緩存處理。需要 zookeeper 分布式協(xié)調(diào)組件管理。
kafka 的設(shè)計(jì)目標(biāo):
提供優(yōu)秀的消息持久化能力,對 TB 級以上數(shù)據(jù)也能保證常數(shù)時間的訪問性能。
高吞吐率。即使在非常廉價(jià)的機(jī)器上也能做到每臺機(jī)每秒 100000 條消息的傳輸。
支持 kafka server 間的消息分區(qū),及分布式消費(fèi),同時保證每個 partition 內(nèi)的消息順序傳輸。
同時支持離線數(shù)據(jù)處理和實(shí)時數(shù)據(jù)處理。
kafka 核心概念
broker:消息中間件處理結(jié)點(diǎn),一個 kafka 節(jié)點(diǎn)就是一個 broker,多個 broker 可以組成一個 kafka 集群。
topic:主題,kafka 集群能夠同時負(fù)責(zé)多個 topic 的分發(fā)。
partition:topic 物理上的分組,一個 topic 可以分為多個 partition,每個 partition 是一個有序的隊(duì)列。
offset:每個 partition 都由一系列有序的、不可變的消息組成,這些消息被連續(xù)的追加到 partition 中。partition 中的每個消息都有一個連續(xù)的序列號叫做 offset,用于 partition 唯一標(biāo)識一條消息。
producer:負(fù)責(zé)發(fā)布消息到 kafka broker。
consumer:消息消費(fèi)者,向 kafka broker讀取消息的客戶端。
consumer group:每個 consumer 屬于一個特定的 consumer group。
貼兩張官網(wǎng)圖
prodecer-broker-consumer
分區(qū)圖
spark 是一個分布式的計(jì)算框架,是我目前認(rèn)為最火的計(jì)算框架。
spark,是一種"one stack to rulethem all"的大數(shù)據(jù)計(jì)算框架,期望使用一個技術(shù)棧就完美地解決大數(shù)據(jù)領(lǐng)域的各種計(jì)算任務(wù)。apache 官方,對 spark 的定義是:通用的大數(shù)據(jù)快速處理引擎(一“棧”式)。
spark core 用于離線計(jì)算
spark sql 用于交互式查詢
spark streaming,structed streaming 用于實(shí)時流式計(jì)算
spark MLlib 用于機(jī)器學(xué)習(xí)
spark GraphX 用于圖計(jì)算
速度快:spar k基于內(nèi)存進(jìn)行計(jì)算(當(dāng)然也有部分計(jì)算基于磁盤,比如 shuffle)。
容易上手開發(fā):spark 的基于 rdd 的計(jì)算模型,比 hadoop 的基于 map-reduce 的計(jì)算模型要更加易于理解,更加易于上手開發(fā),實(shí)現(xiàn)各種復(fù)雜功能。
通用性:spark 提供的技術(shù)組件,可以一站式地完成大數(shù)據(jù)領(lǐng)域的離線批處理、交互式查詢、流式計(jì)算、機(jī)器學(xué)習(xí)、圖計(jì)算等常見的任務(wù)。
與其他技術(shù)的完美集成:例如 hadoop,hdfs、hive、hbase 負(fù)責(zé)存儲,yarn 負(fù)責(zé)資源調(diào)度,spark 負(fù)責(zé)大數(shù)據(jù)計(jì)算。
極高的活躍度:spark 目前是 apache 的頂級項(xiàng)目,全世界有大量的優(yōu)秀工程師是 spark 的 committer,并且世界上很多頂級的 IT 公司都在大規(guī)模地使用 spark。
貼個spark架構(gòu)圖
需要搭建 flume 集群,kafka 集群,es 集群,zookeeper 集群,由于本例 spark 是在本地模式運(yùn)行,所以無需搭建 spark 集群。
搭建好集群后,根據(jù)集群組件直接的整合關(guān)系,配置好配置文件。其中主要的配置為 flume 的配置,如下圖:
可以看到,我們的 agent 的 source 為 r1,channel 為 c1,sink 為 k1,source 為我本地 nc 服務(wù),收集日志時,只需要打開 9999 端口就可以把日志收集。channel 選擇為 memory 內(nèi)存模式。sink 為 kafka 的 topic8 主題。
開啟 zookeeper 服務(wù)。其中 QuorumPeerMain 為 zookeeper 進(jìn)程。
開啟 kafka 服務(wù)。
開啟 es 服務(wù)。
開啟 flume 服務(wù)。其中 Application 為 flume 進(jìn)程。
創(chuàng)建好 es 對應(yīng)的表,表有三個字段,對應(yīng)代碼里面的 case class(代碼隨后貼上)。
代碼如下:
package?run?import?org.apache.kafka.common.serialization.StringDeserializer?import?org.apache.log4j.Logger?import?org.apache.spark.{SparkConf,?SparkContext}?import?org.apache.spark.sql.SparkSession?import?org.apache.spark.streaming.dstream.DStream?import?org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe?import?org.apache.spark.streaming.kafka010.KafkaUtils?import?org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent?import?org.apache.spark.streaming.{Seconds,?StreamingContext}?import?org.elasticsearch.spark.rdd.EsSpark?/**???*?@author?wangjx???*?測試kafka數(shù)據(jù)進(jìn)行統(tǒng)計(jì)??kafka自身維護(hù)offset(建議使用自定義維護(hù)方式維護(hù)偏移量)???*/?object?SparkStreamingAutoOffsetKafka?{???//定義樣例類?與es表對應(yīng)???case?class?people(name:String,country:String,age:Int)???def?main(args:?Array[String]):?Unit?=?{?????val?logger?=?Logger.getLogger(this.getClass);?????//spark?配置?????val?conf?=?new?SparkConf().setAppName("SparkStreamingAutoOffsetKafka").setMaster("local[2]")?????conf.set("es.index.auto.create","true")?????conf.set("es.nodes","127.0.0.1")?????conf.set("es.port","9200")?????//spark?streaming實(shí)時計(jì)算初始化?定義每10秒一個批次?準(zhǔn)實(shí)時處理?企業(yè)一般都是準(zhǔn)實(shí)時?比如每隔10秒統(tǒng)計(jì)近1分鐘的數(shù)據(jù)等等?????val?ssc?=?new?StreamingContext(conf,?Seconds(10))?????val?spark?=?SparkSession.builder()???????.config(conf)???????.getOrCreate()?????spark.sparkContext.setLogLevel("WARN");?????//設(shè)置kafka參數(shù)?????val?kafkaParams?=?Map[String,?Object](???????"bootstrap.servers"?->?"x:9092",???????"key.deserializer"?->?classOf[StringDeserializer],???????"value.deserializer"?->?classOf[StringDeserializer],???????"group.id"?->?"exactly-once",???????"auto.offset.reset"?->?"latest",???????"enable.auto.commit"?->?(false:?java.lang.Boolean)?????)?????//kafka主題?????val?topic?=?Set("kafka8")?????//從kafka獲取數(shù)據(jù)?????val?stream?=?KafkaUtils.createDirectStream[String,?String](???????ssc,???????PreferConsistent,???????Subscribe[String,?String](topic,?kafkaParams)?????)?????//具體的業(yè)務(wù)邏輯?????val?kafkaValue:?DStream[String]?=?stream.flatMap(line=>Some(line.value()))?????val?peopleStream?=?kafkaValue???????.map(_.split(":"))???????//形成people樣例對象???????.map(m=>people(m(0),m(1),m(2).toInt))?????//存入ES?????peopleStream.foreachRDD(rdd?=>{???????EsSpark.saveToEs(rdd,?"people/man")?????})?????//啟動程序入口?????ssc.start()?????ssc.awaitTermination()???}?}?復(fù)制代碼
分享題目:簡述大數(shù)據(jù)實(shí)時處理框架
當(dāng)前URL:http://bm7419.com/article34/ijhppe.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設(shè)計(jì)公司、網(wǎng)站制作、微信小程序、手機(jī)網(wǎng)站建設(shè)、服務(wù)器托管、ChatGPT
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)