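Kafka connector dependency (Maven):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.10.0</version>
</dependency>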
Reading Kafka data into Flink:
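import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
// consume the "sensor" topic, decoding each record value as a String
val source = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))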
Reading from a network socket:
// "IP" and PORT are placeholders for the host and port to connect to
val source = env.socketTextStream("IP", PORT)
II. Transform Operators
1. map
Transforms each element of the DataStream into exactly one other element. For example, doubling every value:
dataStream.map { x => x * 2 }
2. FlatMap
Takes one element and produces zero, one, or more elements. A flatMap function that splits sentences into words:
dataStream.flatMap { str => str.split(" ") }
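To make the zero/one/many behaviour concrete, here is a small model written with ordinary Scala collections rather than the Flink API (the object name `FlatMapModel` is hypothetical). Filtering out empty tokens is what lets a blank line produce zero output elements.

```scala
object FlatMapModel {
  // Split each sentence into words. Empty tokens are dropped, so an empty
  // line yields zero elements, a one-word line one element, and so on.
  def words(lines: Seq[String]): Seq[String] =
    lines.flatMap(line => line.split(" ").filter(_.nonEmpty))
}
```

For the input lines "to be", "" and "or", the model emits "to", "be", "or": two elements, then none, then one.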
3. Filter
Evaluates a boolean function for each element and keeps the elements for which the function returns true. A filter that removes zero values:
dataStream.filter { _ != 0 }
4. KeyBy
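Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.
This transformation returns a KeyedStream, which is required in order to use keyed state: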
dataStream.keyBy(0)
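The partitioning guarantee (same key, same partition; partitions disjoint) can be sketched with plain Scala collections. This is a simplified model that assumes partition = hashCode modulo parallelism; Flink's actual scheme first hashes keys into key groups, which is not reproduced here. `KeyByModel` is a hypothetical name.

```scala
object KeyByModel {
  // Simplified assumption: partition index = non-negative hashCode mod parallelism.
  def partitionOf(key: Any, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Every (key, value) record lands in exactly one partition, and all records
  // sharing a key land in the same one.
  def partition[K, V](records: Seq[(K, V)], parallelism: Int): Map[Int, Seq[(K, V)]] =
    records.groupBy { case (k, _) => partitionOf(k, parallelism) }
}
```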
5. Reduce
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value:
keyedStream.reduce{_+_}
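A rolling reduce keeps one running value per key and emits the updated value for every incoming element. The following plain-Scala model (not the Flink API; `RollingReduceModel` is a hypothetical name) shows which values would be emitted for a keyed input:

```scala
import scala.collection.mutable

object RollingReduceModel {
  // For each incoming (key, value): combine it with the last reduced value for
  // that key (if any), remember the result, and emit it immediately.
  def rollingReduce[K](input: Seq[(K, Int)])(f: (Int, Int) => Int): Seq[(K, Int)] = {
    val state = mutable.Map.empty[K, Int] // last reduced value per key
    input.map { case (k, v) =>
      val next = state.get(k).map(f(_, v)).getOrElse(v)
      state(k) = next
      (k, next)
    }
  }
}
```

For input (a,1), (b,10), (a,2), (a,3), (b,5) with `_ + _`, the model emits (a,1), (b,10), (a,3), (a,6), (b,15): each key's running sum evolves independently.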
6. Fold
A "rolling" fold with an initial value on a keyed data stream. Combines the current element with the last folded value and emits the new value:
val result: DataStream[String] = keyedStream.fold("start")((str, i) => { str + "-" + i })
// Applied to the sequence (1, 2, 3, 4, 5), this emits "start-1", "start-1-2", "start-1-2-3", ...
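For a single key, the rolling fold corresponds to `scanLeft` on an ordinary Scala collection, minus the initial value, which is not emitted on its own. A minimal model (hypothetical name `RollingFoldModel`, not the Flink API):

```scala
object RollingFoldModel {
  // scanLeft keeps every intermediate accumulator; drop(1) removes the bare
  // initial value so only the per-element results remain.
  def rollingFold(input: Seq[Int], initial: String): Seq[String] =
    input.scanLeft(initial)((str, i) => str + "-" + i).drop(1)
}
```

A window fold, by contrast, emits only the final accumulator of each window, which corresponds to `foldLeft` over the window's elements.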
7. Aggregations
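Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in that field (the same applies to max and maxBy):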
keyedStream.sum(0)
keyedStream.min(0)
keyedStream.max(0)
keyedStream.minBy(0)
keyedStream.maxBy(0)
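The min/minBy difference can be modelled with plain Scala collections. The model assumes the behaviour commonly observed in Flink: with `min`, only the aggregated field tracks the minimum while the other fields keep the values of the first record seen for the key; with `minBy`, the whole record holding the minimum is emitted. `Reading` and `MinVsMinByModel` are hypothetical names.

```scala
object MinVsMinByModel {
  final case class Reading(id: String, ts: Long, temp: Double)

  // Rolling min on `temp` (assumed semantics): only the temp field is updated,
  // the remaining fields stay as in the first record.
  def rollingMin(in: Seq[Reading]): Seq[Reading] =
    in.scanLeft(Option.empty[Reading]) {
      case (None, r)      => Some(r)
      case (Some(acc), r) => Some(acc.copy(temp = math.min(acc.temp, r.temp)))
    }.flatten

  // Rolling minBy on `temp`: emits the complete record holding the minimum
  // (on ties, the earlier record is kept).
  def rollingMinBy(in: Seq[Reading]): Seq[Reading] =
    in.scanLeft(Option.empty[Reading]) {
      case (None, r)      => Some(r)
      case (Some(acc), r) => Some(if (r.temp < acc.temp) r else acc)
    }.flatten
}
```

With readings at ts 1, 2, 3 and temperatures 30, 20, 25, the last `min` result is (s1, 1, 20.0), a mix of the first record and the minimum, while the last `minBy` result is the actual record (s1, 2, 20.0).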
8. Window
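Windows can be defined on an already partitioned KeyedStream. Windows group the data of each key according to some characteristic (for example, the data that arrived within the last 5 seconds). Windows are not covered in detail here.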
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));
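A tumbling window assigns each event to exactly one window based on its timestamp. The sketch below computes the window start for a given size and zero offset with plain Scala, assuming this matches Flink's assignment for the default offset; `TumblingWindowModel` is a hypothetical name.

```scala
object TumblingWindowModel {
  // Start of the window [start, start + size) containing `timestamp`.
  // floorMod keeps negative timestamps in the correct window.
  def windowStart(timestamp: Long, size: Long): Long =
    timestamp - Math.floorMod(timestamp, size)

  // Group (key, timestamp) events by key and window start, as a keyed
  // tumbling window does.
  def assign[K](events: Seq[(K, Long)], size: Long): Map[(K, Long), Seq[(K, Long)]] =
    events.groupBy { case (k, ts) => (k, windowStart(ts, size)) }
}
```

With 5-second windows (size 5000 ms), timestamps 0 to 4999 fall into the window starting at 0 and 5000 to 9999 into the window starting at 5000.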
9. WindowAll
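Windows can also be defined on regular (non-keyed) DataStreams. Windows group all stream events according to some characteristic (for example, the data that arrived within the last 5 seconds).
Note: in many cases this is a non-parallel transformation. All records are gathered in a single task of the windowAll operator.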
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
10. Window Apply
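Applies a general function to the window as a whole.
Note: if you are using a windowAll transformation, you need to use an AllWindowFunction instead.
The general form is shown below; the function passed in could, for example, sum the elements of the window manually: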
windowedStream.apply { WindowFunction }
// on a non-keyed windowAll stream, pass an AllWindowFunction
allWindowedStream.apply { AllWindowFunction }
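A window function receives the key, the window, and all buffered elements of that window at once, which is why `apply` keeps every element until the window fires, whereas a window `reduce` can combine elements incrementally. The following plain-Scala model (not Flink's `WindowFunction` interface; all names are hypothetical) sums each tumbling window of keyed, timestamped events:

```scala
object WindowApplyModel {
  final case class Window(start: Long, end: Long)

  // The "window function": it sees the key, the window and *all* of the
  // window's elements, and emits one (key, windowStart, sum) record.
  def sumWindow(key: String, window: Window, elements: Iterable[Int]): Seq[(String, Long, Int)] =
    Seq((key, window.start, elements.sum))

  // Buffer (key, timestamp, value) events per key and tumbling window of
  // `size` ms, then call the window function once per (key, window).
  def applyAll(events: Seq[(String, Long, Int)], size: Long): Seq[(String, Long, Int)] = {
    val buffered = events.groupBy { case (k, ts, _) => (k, ts - Math.floorMod(ts, size)) }
    buffered.toSeq.sortBy(_._1).flatMap { case ((k, start), es) =>
      sumWindow(k, Window(start, start + size), es.map(_._3))
    }
  }
}
```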
11. Window Reduce
Applies a functional reduce function to the window and returns the reduced value:
windowedStream.reduce{_+_}
12. Window Fold
Applies a functional fold function to the window and returns the folded value:
val result: DataStream[String] = windowedStream.fold("start")((str, i) => { str + "-" + i })
// Applied to the sequence (1, 2, 3, 4, 5), this folds the whole window into the single string "start-1-2-3-4-5"
13. Union
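Union of two or more data streams, creating a new stream that contains all the elements of all the streams. Note: if you union a data stream with itself, every element appears twice in the resulting stream: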
dataStream.union(otherStream1,otherStream2,...)
14. Window Join
Joins two data streams on a given key and a common window:
dataStream.join(otherStream)
  .where(<key selector>).equalTo(<key selector>)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .apply(new JoinFunction() {...})
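A window join behaves like an inner join: only pairs with equal keys whose timestamps fall into the same window reach the join function. A plain-Scala model of that pairing rule for tumbling windows (hypothetical names, not the Flink API):

```scala
object WindowJoinModel {
  private def start(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // Every (left, right) pair with equal keys whose timestamps share the same
  // tumbling window of `size` ms is passed to the join function `f`.
  def join[K, A, B, R](left: Seq[(K, Long, A)], right: Seq[(K, Long, B)], size: Long)(f: (A, B) => R): Seq[R] =
    for {
      (lk, lt, a) <- left
      (rk, rt, b) <- right
      if lk == rk && start(lt, size) == start(rt, size)
    } yield f(a, b)
}
```

Elements without a partner in the same key and window produce no output at all.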
15. Interval Join
Joins two elements e1 and e2 of two keyed streams that share a common key, within a given time interval, such that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound:
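// With both bounds exclusive, this joins elements where key1 == key2 and
// e1.timestamp - 2 < e2.timestamp < e1.timestamp + 2
keyedStream.intervalJoin(otherKeyedStream)
  .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
  .upperBoundExclusive(true) // optional
  .lowerBoundExclusive(true) // optional
  .process(new ProcessJoinFunction() {...})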
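The interval condition, including the optional exclusive bounds, can be written as a plain predicate. This models the condition only (hypothetical `IntervalJoinModel`, not the Flink API), with both bounds inclusive unless marked exclusive:

```scala
object IntervalJoinModel {
  // True if rightTs lies in [leftTs + lower, leftTs + upper], where each end of
  // the interval can be made exclusive.
  def inInterval(leftTs: Long, rightTs: Long, lower: Long, upper: Long,
                 lowerExclusive: Boolean = false, upperExclusive: Boolean = false): Boolean = {
    val lo = leftTs + lower
    val hi = leftTs + upper
    val aboveLower = if (lowerExclusive) rightTs > lo else rightTs >= lo
    val belowUpper = if (upperExclusive) rightTs < hi else rightTs <= hi
    aboveLower && belowUpper
  }
}
```

For a left element at timestamp 10 and bounds (-2, 2), a right element at 8 matches with inclusive bounds but not with exclusive ones, while one at 11 matches either way.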
16. Window CoGroup
CoGroups two data streams on a given key and a common window:
dataStream.coGroup(otherStream)
  .where(0).equalTo(1)
  .window(TumblingEventTimeWindows.of(Time.seconds(3)))
  .apply(new CoGroupFunction() {...})
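Unlike a join, a coGroup hands the function both groups for each key and window, even when one side has no elements there. A plain-Scala model of that behaviour for tumbling windows (hypothetical names, not the Flink API):

```scala
object CoGroupModel {
  private def start(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // For every (key, windowStart) present on either side, call `f` once with
  // the left group and the right group; either group may be empty.
  def coGroup[K, A, B, R](left: Seq[(K, Long, A)], right: Seq[(K, Long, B)], size: Long)
                         (f: (Seq[A], Seq[B]) => R): Map[(K, Long), R] = {
    val l = left.groupBy { case (k, ts, _) => (k, start(ts, size)) }
    val r = right.groupBy { case (k, ts, _) => (k, start(ts, size)) }
    (l.keySet ++ r.keySet).map { wk =>
      wk -> f(l.getOrElse(wk, Seq.empty).map(_._3), r.getOrElse(wk, Seq.empty).map(_._3))
    }.toMap
  }
}
```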
17. Connect
"Connects" two data streams, each retaining its own type. Connecting allows the two streams to share state:
DataStream<Integer> someStream = ...;
DataStream<String> otherStream = ...;
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
// "..." stands for the omitted upstream operations
18. CoMap, CoFlatMap
Similar to map and flatMap, but on a connected data stream:
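connectedStreams.map(
  (_: Int) => true,
  (_: String) => false
)
// flatMap functions must return a collection (zero or more results per element)
connectedStreams.flatMap(
  (_: Int) => Seq(true),
  (_: String) => Seq(false)
)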
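A connected stream carries two element types side by side. Modelling each element as `Either[Int, String]` (an assumption made for illustration; `CoMapModel` is a hypothetical name, not the Flink API), CoMap applies one function per side and merges the results into a single output type, while CoFlatMap lets each side emit zero or more results:

```scala
object CoMapModel {
  // CoMap: Left elements go through f1, Right elements through f2.
  def coMap[R](in: Seq[Either[Int, String]])(f1: Int => R, f2: String => R): Seq[R] =
    in.map(_.fold(f1, f2))

  // CoFlatMap: each side returns a collection, and the results are flattened.
  def coFlatMap[R](in: Seq[Either[Int, String]])(f1: Int => Seq[R], f2: String => Seq[R]): Seq[R] =
    in.flatMap(_.fold(f1, f2))
}
```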
19. Split
Splits the stream into two or more streams according to some criterion:
val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case _ => List("odd") // also covers negative odd numbers, where num % 2 == -1
    }
)
20. Select
Selects one or more streams from a split stream:
val even = split.select("even")
val odd = split.select("odd")
val all = split.select("even", "odd")
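Split and select can be modelled as tagging followed by filtering. In this plain-Scala sketch (hypothetical `SplitSelectModel`, not the Flink API), the even/odd test uses `num % 2 == 0` because on the JVM `num % 2` is -1 for negative odd numbers, so matching only on 0 and 1 would miss them:

```scala
object SplitSelectModel {
  // "Split": attach one or more output names to each element.
  def tag(num: Int): List[String] =
    if (num % 2 == 0) List("even") else List("odd")

  // "Select": keep the elements carrying any of the requested names.
  def select(in: Seq[Int], names: String*): Seq[Int] =
    in.filter(n => tag(n).exists(t => names.contains(t)))
}
```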
III. Sink Operators
Data can be written to local files, local collections, and HDFS (for these, refer to the batch processing article).
In addition, data can be sunk to Kafka, MySQL, and Redis.
The following example sinks to Kafka:
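import java.util.Properties

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011

object KafkaSinkExample { // illustrative object name
  val sinkTopic = "test"

  // sample case class
  case class Student(id: Int, name: String, addr: String, sex: String)

  val mapper: ObjectMapper = new ObjectMapper()
  // register the Scala module once so that case classes serialize to JSON
  mapper.registerModule(DefaultScalaModule)

  // convert an object to a JSON string
  def toJsonString(obj: Object): String = mapper.writeValueAsString(obj)

  def main(args: Array[String]): Unit = {
    // 1. create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. prepare the data
    val dataStream: DataStream[Student] =
      env.fromElements(Student(8, "xiaoming", "beijingbiejing", "female"))
    // 3. convert each Student to a JSON string
    val studentStream: DataStream[String] = dataStream.map(student => toJsonString(student))
    // 4. configure the Kafka producer; each String record is serialized with SimpleStringSchema
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092")
    val myProducer = new FlinkKafkaProducer011[String](sinkTopic, new SimpleStringSchema(), prop)
    studentStream.addSink(myProducer)
    studentStream.print()
    env.execute("Flink add sink")
  }
}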