Flink中Watermarks怎么用
这篇文章将为大家详细讲解有关Flink中Watermarks怎么用,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。

成都创新互联是专业的讷河网站建设公司,讷河接单;提供网站制作、做网站,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行讷河网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!
Watermarks水印:为输入的数据流的设置一个时间事件(时间戳),对窗口内的数据输入流无序与延迟提供解决方案
示例环境
java.version: 1.8.xflink.version: 1.11.1
TimestampsAndWatermarks.java
import com.flink.examples.DataSource;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
/**
* @Description Watermarks水印:为输入的数据流的设置一个时间事件(时间戳),对窗口内的数据输入流无序与延迟提供解决方案
*/
public class TimestampsAndWatermarks {
/**
* 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/event_timestamps_watermarks.html
*/
/**
* 遍历集合,分别打印不同性别的信息,对于执行超时,自动触发定时器
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/*
TimeCharacteristic有三种时间类型:
ProcessingTime:以operator处理的时间为准,它使用的是机器的系统时间来作为data stream的时间;
IngestionTime:以数据进入flink streaming data flow的时间为准;
EventTime:以数据自带的时间戳字段为准,应用程序需要指定如何从record中抽取时间戳字段;需要实现assignTimestampsAndWatermarks方法,并设置时间水位线;
*/
//使用event time,需要指定事件的时间戳
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
//设置自动生成水印的时间周期,避免数据流量大的情况下,频繁添加水印导致计算性能降低。
env.getConfig().setAutoWatermarkInterval(1000L);
List> tuple3List = DataSource.getTuple3ToList();
DataStream> inStream = env.addSource(new MyRichSourceFunction());
DataStream> dataStream = inStream
//为一个水位线,这个Watermarks在不断的变化,一旦Watermarks大于了某个window的end_time,就会触发此window的计算,Watermarks就是用来触发window计算的。
//Duration.ofSeconds(2),到数据流到达flink后,再水位线中设置延迟时间,也就是在所有数据流的最大的事件时间比window窗口结束时间大或相等时,再延迟多久触发window窗口结束;
// .assignTimestampsAndWatermarks(
// WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(2))
// .withTimestampAssigner((element, timestamp) -> {
// long times = System.currentTimeMillis() ;
// System.out.println(element.f1 + ","+ element.f0 + "的水位线为:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
// return times;
// })
// )
.assignTimestampsAndWatermarks(new MyWatermarkStrategy()
.withTimestampAssigner(new SerializableTimestampAssigner>() {
@Override
public long extractTimestamp(Tuple3 element, long timestamp) {
long times = System.currentTimeMillis();
System.out.println(element.f1 + "," + element.f0 + "的水位线为:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
return times;
}
}))
//分区窗口
.keyBy((KeySelector, String>) k -> k.f1)
//触发3s滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
//执行窗口数据,对keyBy数据流批量处理
.apply(new WindowFunction, Tuple2, String, TimeWindow>(){
@Override
public void apply(String s, TimeWindow window, Iterable> input, Collector> out) throws Exception {
long times = System.currentTimeMillis() ;
System.out.println();
System.out.println("窗口处理时间:" + DateFormatUtils.format(new Date(times), "yyyy-MM-dd HH:mm:ss"));
Iterator> iterator = input.iterator();
int total = 0;
int size = 0;
String sex = "";
while (iterator.hasNext()){
Tuple3 tuple3 = iterator.next();
total += tuple3.f2;
size ++;
sex = tuple3.f1;
}
out.collect(new Tuple2<>(sex, total / size));
}
});
dataStream.print();
env.execute("flink Filter job");
}
/**
* 定期水印生成器
*/
public static class MyWatermarkStrategy implements WatermarkStrategy>{
@Override
public WatermarkGenerator> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator>() {
//设置固定的延迟量3.5 seconds
private final long maxOutOfOrderness = 3500;
private long currentMaxTimestamp;
/**
* 事件处理
* @param event 数据流对象
* @param eventTimestamp 事件水位线时间
* @param output 输出
*/
@Override
public void onEvent(Tuple3 event, long eventTimestamp, WatermarkOutput output) {
currentMaxTimestamp = Math.max(System.currentTimeMillis(), eventTimestamp);
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// 拿上一个水印时间 - 延迟量 = 等于给的窗口最终数据最后时间(如果在窗口到期内,未发生新的水印事件,则按window正常结束时间计算,当在最后水印时间-延迟量的时间范围内,有新的数据流进入,则会重新触发窗口内对全部数据流计算)
output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
}
};
}
}
/**
* 模拟数据持续输出
*/
public static class MyRichSourceFunction extends RichSourceFunction> {
@Override
public void run(SourceContext> ctx) throws Exception {
List> tuple3List = DataSource.getTuple3ToList();
int j = 0;
for (int i=0;i<100;i++){
if (i%6 == 0){
j=0;
}
ctx.collect(tuple3List.get(j));
//1秒钟输出一个
Thread.sleep(1 * 1000);
j ++;
}
}
@Override
public void cancel() {
try{
super.close();
}catch (Exception e){
e.printStackTrace();
}
}
}
} 打印结果
man,张三的水位线为:2020-12-27 10:28:20 girl,李四的水位线为:2020-12-27 10:28:21 man,王五的水位线为:2020-12-27 10:28:22 girl,刘六的水位线为:2020-12-27 10:28:23 girl,伍七的水位线为:2020-12-27 10:28:24 窗口处理时间:2020-12-27 10:28:25 (man,20) man,吴八的水位线为:2020-12-27 10:28:25 man,张三的水位线为:2020-12-27 10:28:26 girl,李四的水位线为:2020-12-27 10:28:27 窗口处理时间:2020-12-27 10:28:28 (girl,28) 窗口处理时间:2020-12-27 10:28:28 (man,29)
关于“Flink中Watermarks怎么用”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
网站名称:Flink中Watermarks怎么用
URL链接:http://www.jxjierui.cn/article/ggside.html


咨询
建站咨询
