Hardcore! Learn the Common Flink Stream-Processing Operators in One Article
Source: TechWeb    Published: 2021-03-24

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

Read Kafka data into Flink:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val source = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

Socket-based source:

val source = env.socketTextStream("IP", PORT)

II. Transform Operators

1. map

Converts each element of the data stream into another element:

dataStream.map { x => x * 2 }

2. FlatMap

Takes one element and produces zero, one, or more elements. A flatMap function that splits sentences into words:

dataStream.flatMap { str => str.split(" ") }

3. Filter

Evaluates a boolean function for each element and keeps the elements for which it returns true. A filter that drops zero values:

dataStream.filter { _ != 0 }
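
The three operators above behave like their counterparts on ordinary Scala collections. The sketch below is only a model built on the standard library, not Flink code; the object and method names are made up for illustration.

```scala
// Plain-Scala model of map, flatMap and filter (no Flink dependency).
object BasicTransforms {
  // map: exactly one output element per input element
  def doubled(xs: Seq[Int]): Seq[Int] = xs.map(x => x * 2)

  // flatMap: zero, one, or more output elements per input element
  def words(lines: Seq[String]): Seq[String] =
    lines.flatMap(str => str.split(" ").toSeq)

  // filter: keep only the elements for which the predicate is true
  def nonZero(xs: Seq[Int]): Seq[Int] = xs.filter(_ != 0)
}
```

For example, BasicTransforms.words(Seq("hello flink", "stream")) yields the three words hello, flink, and stream.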

4. KeyBy

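Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.

This transformation returns a KeyedStream, which is required, among other things, for using keyed state: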

dataStream.keyBy(0)
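
To make the idea of hash partitioning concrete, here is a simplified model in plain Scala. It is an assumption-laden sketch: Flink's real assignment goes through key groups rather than a bare hashCode modulo, and partitionOf and partition are hypothetical helpers, not Flink API.

```scala
// Simplified model of hash partitioning by key (not Flink's actual algorithm).
object KeyByModel {
  // Hypothetical helper: maps a key to one of `parallelism` partitions.
  // floorMod keeps the result non-negative even for negative hash codes.
  def partitionOf(key: Any, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Groups (key, value) records by the partition their key hashes to.
  def partition[K, V](records: Seq[(K, V)], parallelism: Int): Map[Int, Seq[(K, V)]] =
    records.groupBy { case (k, _) => partitionOf(k, parallelism) }
}
```

Whatever the parallelism, two records with the same key always land in the same partition, which is the property keyed state relies on.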

5. Reduce

A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value:

keyedStream.reduce{_+_}
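
A rolling reduce emits one result per incoming element rather than a single final value. The following plain-Scala sketch models that behaviour for the elements of one key; rollingReduce is a hypothetical helper, not part of Flink.

```scala
// Plain-Scala model of a rolling reduce over the elements of a single key.
object RollingReduce {
  // The first element is emitted as-is; every later element is combined
  // with the previously emitted value, and the combined value is emitted.
  def rollingReduce[T](xs: Seq[T])(f: (T, T) => T): Seq[T] =
    if (xs.isEmpty) Seq.empty else xs.tail.scanLeft(xs.head)(f)
}
```

With the inputs 1, 2, 3, 4, 5 and the function _ + _, the emitted values are 1, 3, 6, 10, 15.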

6. Fold

A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value:

val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i })
// When applied to the sequence (1,2,3,4,5), this emits "start-1", "start-1-2", "start-1-2-3", ...
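
The rolling fold can be modelled with scanLeft from the Scala standard library. This is a sketch of the semantics only; rollingFold is a hypothetical name and the code does not use Flink.

```scala
// Plain-Scala model of a rolling fold with an initial value.
object RollingFold {
  // scanLeft yields the initial value followed by every intermediate result;
  // the initial value itself is never emitted, so it is dropped with tail.
  def rollingFold[T, R](xs: Seq[T], init: R)(f: (R, T) => R): Seq[R] =
    xs.scanLeft(init)(f).tail
}
```

Calling RollingFold.rollingFold(Seq(1, 2, 3), "start")((str, i) => str + "-" + i) returns "start-1", "start-1-2", "start-1-2-3".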

7. Aggregations

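Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in that field (the same applies to max and maxBy):

keyedStream.sum(0)
keyedStream.min(0)
keyedStream.max(0)
keyedStream.minBy(0)
keyedStream.maxBy(0)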
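
The difference between min and minBy is easiest to see on records with more than one field. The sketch below is a plain-Scala model under one stated assumption: with min, only the aggregated field is updated and the remaining fields keep the values of the first element seen. Reading and both helpers are hypothetical names.

```scala
// Plain-Scala model contrasting rolling min with rolling minBy.
object MinVsMinBy {
  final case class Reading(sensor: String, temp: Int, ts: Long)

  // min (assumed behaviour): track the minimum temp, keep the other fields
  // of the first element.
  def rollingMin(xs: Seq[Reading]): Seq[Reading] =
    if (xs.isEmpty) Seq.empty
    else xs.tail.scanLeft(xs.head)((acc, r) => acc.copy(temp = math.min(acc.temp, r.temp)))

  // minBy: emit the whole element that holds the minimum temp so far
  // (on ties the earlier element is kept).
  def rollingMinBy(xs: Seq[Reading]): Seq[Reading] =
    if (xs.isEmpty) Seq.empty
    else xs.tail.scanLeft(xs.head)((acc, r) => if (r.temp < acc.temp) r else acc)
}
```

For the readings (s1, 30, ts=1), (s1, 20, ts=2), (s1, 25, ts=3), the last value from rollingMin is (s1, 20, ts=1), while rollingMinBy ends with (s1, 20, ts=2), the actual element that carried the minimum.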

8. Window

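Windows can be defined on already-partitioned KeyedStreams. Windows group the data in each key according to some characteristic (for example, the data that arrived within the last 5 seconds). Windows are not covered in detail here.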

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));
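
As a rough illustration of what a 5-second tumbling window does, the sketch below assigns timestamped events to fixed, non-overlapping windows using only the Scala standard library. It assumes non-negative millisecond timestamps and no window offset; windowStart and assign are hypothetical helpers, not Flink API.

```scala
// Plain-Scala model of tumbling-window assignment.
object TumblingWindows {
  // Start of the window that contains timestamp `ts` (milliseconds).
  def windowStart(ts: Long, sizeMs: Long): Long = ts - Math.floorMod(ts, sizeMs)

  // Groups (timestamp, value) events by the start of their window.
  def assign[T](events: Seq[(Long, T)], sizeMs: Long): Map[Long, Seq[T]] =
    events
      .groupBy { case (ts, _) => windowStart(ts, sizeMs) }
      .map { case (start, evs) => start -> evs.map(_._2) }
}
```

With a window size of 5000 ms, events at 1000 and 4999 fall into the window starting at 0, and an event at 5000 opens the next window.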

9. WindowAll

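Windows can also be defined on regular DataStreams. Windows group all stream events according to some characteristic (for example, the data that arrived within the last 5 seconds).

Note: in many cases this is a non-parallel transformation. All records are gathered in a single task for the windowAll operator.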

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))

10. Window Apply

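Applies a general function to the window as a whole.

Note: if you are using a windowAll transformation, you need to use an AllWindowFunction instead.

Below is a function that manually sums the elements of a window:

windowedStream.apply { WindowFunction }
allWindowedStream.apply { AllWindowFunction }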

11. Window Reduce

Applies a functional reduce function to the window and returns the reduced value:

windowedStream.reduce{_+_}

12. Window Fold

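Applies a functional fold function to the window and returns the folded value:

val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i })
// When applied to the sequence (1,2,3,4,5), this folds the sequence into the string "start-1-2-3-4-5"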

13. Union

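Union of two or more data streams, creating a new stream that contains all elements from all of the streams. Note: if you union a data stream with itself, each element appears twice in the resulting stream: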

dataStream.union(otherStream1,otherStream2,...)

14. Window Join

Joins two data streams on a given key and a common window:

dataStream.join(otherStream)
    .where(...).equalTo(...)  // key selectors for the two streams
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new JoinFunction() {...})

15. Interval Join

Joins two elements e1 and e2 of two keyed streams that share a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound:

keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2))
    .upperBoundExclusive(true)
    .lowerBoundExclusive(true)
    .process(new IntervalJoinFunction() {...})
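
The interval condition can be checked directly on small in-memory collections. The following is a plain-Scala sketch of the inclusive-bounds case only (it does not model the exclusive-bound options); Event and intervalJoin are hypothetical names, not Flink API.

```scala
// Plain-Scala model of an interval join with inclusive bounds.
object IntervalJoinModel {
  final case class Event(key: String, ts: Long, value: String)

  // Pairs (e1, e2) with equal keys such that
  // e1.ts + lowerBound <= e2.ts <= e1.ts + upperBound.
  def intervalJoin(left: Seq[Event], right: Seq[Event],
                   lowerBound: Long, upperBound: Long): Seq[(Event, Event)] =
    for {
      e1 <- left
      e2 <- right
      if e1.key == e2.key
      if e1.ts + lowerBound <= e2.ts && e2.ts <= e1.ts + upperBound
    } yield (e1, e2)
}
```

With bounds of -2 and 2, a left event at timestamp 10 matches right events of the same key at timestamps 8 through 12, but not one at 13 or one with a different key.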

16. Window CoGroup

Cogroups two data streams on a given key and a common window:

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new CoGroupFunction() {...})

17. Connect

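"Connects" two data streams while retaining their types. Connecting allows state to be shared between the two streams:

DataStream<Integer> someStream = ...;
DataStream<String> otherStream = ...;
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
// ... stands for omitted intermediate operations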

18. CoMap锛孋oFlatMap

Similar to map and flatMap, but on a connected data stream:

connectedStreams.map(
    (_: Int) => true,
    (_: String) => false
)
connectedStreams.flatMap(
    (_: Int) => true,
    (_: String) => false
)

19. Split

Splits a stream into two or more streams according to some criterion:

val split = someDataStream.split(
    (num: Int) =>
        (num % 2) match {
            case 0 => List("even")
            case 1 => List("odd")
        }
)

20. Select

Selects one or more streams from a split stream:

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even", "odd");
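
Split and select can be pictured as tagging each element with one or more names and then filtering by name. The sketch below models that in plain Scala; tag and select are hypothetical helpers. It uses Math.floorMod instead of %, because in Scala -3 % 2 is -1, which would match neither case 0 nor case 1.

```scala
// Plain-Scala model of split (tagging) and select (filtering by tag).
object SplitSelectModel {
  // Tags a number as "even" or "odd"; floorMod also handles negative numbers.
  def tag(num: Int): List[String] = Math.floorMod(num, 2) match {
    case 0 => List("even")
    case _ => List("odd")
  }

  // Keeps the numbers that carry at least one of the requested tags.
  def select(nums: Seq[Int], names: String*): Seq[Int] =
    nums.filter(n => tag(n).exists(t => names.contains(t)))
}
```

Selecting "even" from 1, 2, 3, 4 gives 2 and 4; selecting both "even" and "odd" returns every element.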

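III. Sink Operators

Data can be written to:

local files (see batch processing), local collections (see batch processing), HDFS (see batch processing)

In addition, the following are supported:

sink to Kafka, sink to MySQL, sink to Redis

The following uses a sink to Kafka as an example: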

val sinkTopic = "test"

// Case class
case class Student(id: Int, name: String, addr: String, sex: String)

val mapper: ObjectMapper = new ObjectMapper()

// Convert an object to a JSON string
def toJsonString(T: Object): String = {
    mapper.registerModule(DefaultScalaModule)
    mapper.writeValueAsString(T)
}

def main(args: Array[String]): Unit = {
    // 1. Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Prepare the data
    val dataStream: DataStream[Student] = env.fromElements(
        Student(8, "xiaoming", "beijingbiejing", "female")
    )
    // Convert each student to a string
    val studentStream: DataStream[String] = dataStream.map(student =>
        toJsonString(student) // One of the SerializerFeature values must be given explicitly here, otherwise an error about matching two methods at once is reported
    )
    //studentStream.print()
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092")
    val myProducer = new FlinkKafkaProducer011[String](sinkTopic, new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), prop)
    studentStream.addSink(myProducer)
    studentStream.print()
    env.execute("Flinkaddsink")
}

