实际工作中,可能会遇到想控制Flink数据流速度的情况,比如每5秒最多输出3条数据,这时候如果使用默认的TimeWindow或者CountWindow都不好达到要求,这时候就可以进行自定义窗口的触发器Trigger,修改触发窗口执行计算的条件。
trigger接口有五个方法允许trigger对不同的事件做出反应:
onElement():进入窗口的每个元素都会调用该方法。
onEventTime():事件时间窗口触发的时候被调用。
onProcessingTime():处理时间窗口触发的时候会被调用。
onMerge():有状态的触发器相关,并在它们相应的窗口合并时合并两个触发器的状态,例如使用会话窗口。
clear():该方法主要是执行窗口的删除 *** 作。
关于上述方法需要注意两点:
1).前三方法决定着如何通过返回一个TriggerResult来 *** 作输入事件。
CONTINUE:什么都不做。FIRE:触发计算。PURE:清除窗口的元素。FIRE_AND_PURE:触发计算和清除窗口元素。
如下为简单示例:
// 示例:实现每5秒钟最多输出3条数据 stream .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))) .trigger(new Trigger() { private ValueStateDescriptor countStateDec = new ValueStateDescriptor ("countState", Long.class); private Long num; //清除状态并执行窗口计算 private TriggerResult fireAndPurge(TimeWindow window, TriggerContext ctx) throws Exception { clear(window, ctx); return TriggerResult.FIRE; } //每来一个元素调用一次 @Override public TriggerResult onElement(JavaBean element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { ValueState countState = ctx.getPartitionedState(countStateDec); //给num赋值 num = countState.value() == null ? 0L : countState.value(); num ++; countState.update(num); System.out.println("num:" + num); if (num >= 3L) { //如果数据到了三条,但是时间还没到,线程等待 if (timestamp < window.getEnd()) { Thread.sleep(window.getEnd() - System.currentTimeMillis()); } return fireAndPurge(window, ctx); } else { return TriggerResult.CONTINUE; } } //处理时间窗口触发时,执行该方法 @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { if (time >= window.getEnd()) { return TriggerResult.CONTINUE; } else { System.out.println("fire with process time: " + time); return TriggerResult.FIRE; } } //事件时间窗口触发时,执行该方法 @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } //执行窗口的删除 *** 作 @Override public void clear(TimeWindow timeWindow, TriggerContext ctx) throws Exception { ValueState countState = ctx.getPartitionedState(countStateDec); countState.clear(); } })
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)