Flink-使用合流操作進行實時對賬需求的實現(xiàn)-創(chuàng)新互聯(lián)

學Flink第八章多流轉(zhuǎn)換的時候,進行合流操作.connect()使用到了第九章狀態(tài)編程的知識,感覺總體不是很清晰,因此學完狀態(tài)編程后現(xiàn)在進行重溫并細化一些細節(jié)

創(chuàng)新互聯(lián)公司專注于中大型企業(yè)的網(wǎng)站設(shè)計、成都網(wǎng)站設(shè)計和網(wǎng)站改版、網(wǎng)站營銷服務(wù),追求商業(yè)策劃與數(shù)據(jù)分析、創(chuàng)意藝術(shù)與技術(shù)開發(fā)的融合,累計客戶超過千家,服務(wù)滿意度達97%。幫助廣大客戶順利對接上互聯(lián)網(wǎng)浪潮,準確優(yōu)選出符合自己需要的互聯(lián)網(wǎng)運用,我們將一直專注品牌網(wǎng)站制作和互聯(lián)網(wǎng)程序開發(fā),在前進的路上,與客戶一起成長!
  1. 業(yè)務(wù)背景

在這里插入圖片描述

  • 步驟一:

用戶進行支付的時候,后臺是需要調(diào)用第三方服務(wù)平臺進行服務(wù),即用戶支付請求,頁面將會跳轉(zhuǎn)到第三方支付平臺支付

  • 步驟二:

用戶進行支付之后,第三方支付平臺給到用戶前端支出反饋,并且給我們平臺發(fā)送用戶已經(jīng)付款的消息

  • 步驟三:

第三方支付平臺需要將錢再轉(zhuǎn)入到我們平臺賬戶

  1. 出現(xiàn)的問題以及需求
  • 問題

如果進行到圖中④,如果發(fā)生數(shù)據(jù)丟失,那么用戶已經(jīng)支付的消息無法傳達給到后臺,而后不能關(guān)閉訂單

  • 需求

因此需要進行實時對賬操作,即用戶提交的支付請求(客戶端),以及第三方支付平臺給到的請求(三方端),兩者可以當成兩條流

  • 結(jié)果

如果進行兩條流的操作后不匹配,那么將進行預(yù)警

  1. 一些細節(jié)考慮
  • 兩個流都給他標上時間戳(使用watermark標志)

  • 使用狀態(tài)編程保存狀態(tài)以及設(shè)置定時器,來進行兩條流的連接以及等待

    • 如果對方流中有我流的數(shù)據(jù),那么直接輸出成功;如果沒有則更新我流狀態(tài),注冊定時器等待另一個流

    • 然后用ontimer()觸發(fā)定時器:判斷條件如果兩條流中還有狀態(tài)沒被清空,說明沒匹配上

  1. 上代碼
  • 代碼
public class BillCheckExample {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        //來自app的支付日志
        SingleOutputStreamOperator>appStream = env.fromElements(
                Tuple3.of("order-1", "app", 1000L),
                Tuple3.of("order-2", "app", 2000L),
                Tuple3.of("order-3", "app", 3500L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner>() {@Override
                    public long extractTimestamp(Tuple3element, long recordTimestamp) {return element.f2;
                    }
                })
        );
        //來自第三方平臺的支付日志
        SingleOutputStreamOperator>thirdpartStream = env.fromElements(
                Tuple4.of("order-1", "third-party", "success", 3000L),
                Tuple4.of("order-3", "third-party", "success", 4000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.
     >forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner>() {@Override
                    public long extractTimestamp(Tuple4element, long recordTimestamp) {return element.f3;
                    }
                })
        );

        //檢測同一支付單在兩條流中是否匹配,等待一段時間后,不匹配就報警
//        //這種也可以
//        appStream.keyBy(data->data.f0)
//                .connect(thirdpartStream.keyBy(data ->data.f0));
//
        appStream.connect(thirdpartStream)
                        .keyBy(data->data.f0,data->data.f0)
                        .process(new OrderMatchResult())
                        .print();


        env.execute();
    }
    //自定義實現(xiàn)CoFunction
    public static class OrderMatchResult extends CoProcessFunction,
                                                Tuple4,String>{//定義狀態(tài)變量,用來保存已經(jīng)到達的事件
        private ValueState>appEventState;
        private ValueState>thirdPartyEventState;

        //運行上下文環(huán)境中獲取狀態(tài)
        @Override
        public void open(Configuration parameters) throws Exception {appEventState = getRuntimeContext().getState(
                    new ValueStateDescriptor>("app-event", Types.TUPLE(Types.STRING,Types.STRING,Types.LONG))
            );
        thirdPartyEventState = getRuntimeContext().getState(
                new ValueStateDescriptor>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
        );
        }

        @Override
        public void processElement1(Tuple3value, CoProcessFunction, Tuple4, String>.Context ctx, Collectorout) throws Exception {//來的是app event,看另一條流中事件是否來過
            if(thirdPartyEventState.value()!=null){out.collect("對賬成功:"+value+" "+thirdPartyEventState.value());
                //清空狀態(tài)
                thirdPartyEventState.clear();
            }else{//如果沒來就等待,并且更新狀態(tài)
                appEventState.update(value);
                //注冊一個5秒后的定時器,開始等待另一條的事件
                ctx.timerService().registerEventTimeTimer(value.f2+5000L);
            }

        }

        @Override
        public void processElement2(Tuple4value, CoProcessFunction, Tuple4, String>.Context ctx, Collectorout) throws Exception {//來的是app event,看另一條流中事件是否來過
            if(appEventState.value()!=null){out.collect("對賬成功:"+appEventState.value()+" "+value);
                //清空狀態(tài)
                appEventState.clear();
            }else{//如果沒來就等待,并且更新狀態(tài)
                thirdPartyEventState.update(value);
                //注冊一個5秒后的定時器,開始等待另一條的事件
                ctx.timerService().registerEventTimeTimer(value.f3);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collectorout) throws Exception {//定時器觸發(fā),判斷狀態(tài),如果某個狀態(tài)不為空,說明另一條中事件沒來
            //并且不會存在兩個都不為空,因為其中一個不為空后會被清除
            //沒有沒清空表示失敗
            if(appEventState.value()!=null){out.collect("對賬失?。?+appEventState.value()+" "+"第三方支付平臺信息未到");
            }
            if(thirdPartyEventState.value()!=null){out.collect("對賬失敗:"+thirdPartyEventState.value()+" "+"APP信息信息未到");
            }
            //清空所有數(shù)據(jù)
            appEventState.clear();
            thirdPartyEventState.clear();
        }
    }

}
  • 結(jié)果
對賬成功:(order-1,app,1000) (order-1,third-party,success,3000)
對賬成功:(order-3,app,3500) (order-3,third-party,success,4000)
對賬失?。?order-2,app,2000) 第三方支付平臺信息未到

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

文章名稱:Flink-使用合流操作進行實時對賬需求的實現(xiàn)-創(chuàng)新互聯(lián)
鏈接分享:http://bm7419.com/article34/ihjpe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供建站公司ChatGPT、虛擬主機、網(wǎng)站排名外貿(mào)網(wǎng)站建設(shè)、網(wǎng)站維護

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

營銷型網(wǎng)站建設(shè)