Flink学习笔记(十二):Flink State生命周期 - Time-To-Live (TTL)

Flink学习笔记(十二):Flink State生命周期 - Time-To-Live (TTL),第1张

Flink学习笔记(十二):Flink State生命周期 - Time-To-Live (TTL)

在前面的 Flink学习笔记(十一):flink KeyedState运用介绍了如何使用state进行sum *** 作。但是数据流通常是长时间运行,那么存在的状态将越来越多,如何解决这个问题呢?

1、Flink State Time-To-Live (TTL)

Flink提供了StateTtlConfig机制进行处理。首先我们看下提供的策略类型

  • TTL 刷新策略(默认OnCreateAndWrite)
策略类型描述StateTtlConfig.UpdateType.Disabled禁用TTL,永不过期StateTtlConfig.UpdateType.OnCreateAndWrite每次写 *** 作都会更新State的最后访问时间StateTtlConfig.UpdateType.OnReadAndWrite每次读写 *** 作都会跟新State的最后访问时间
  • 状态可见性(默认NeverReturnExpired)
策略类型描述StateTtlConfig.StateVisibility.NeverReturnExpired永不返回过期状态StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp可以返回过期但尚未被清理的状态值

具体可以参考flink的官方文档

里面有更具体的介绍,包括state类型,清理策略和相关例子

2、实例

还是上面文章中的一个例子

我们可以看到在keybystream中配置了StateTtlConfig,配置方式如下,当一个状态超过两秒后重新计算状态

StateTtlConfig ttlConfig = StateTtlConfig
         newBuilder(Time.seconds(2))
         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
         .build();
         
ValueStateDescriptor stateDescriptor = new ValueStateDescriptor("key-fruit", Types.TUPLE(Types.STRING, Types.INT));
stateDescriptor.enableTimeToLive(ttlConfig);

当然清除状态可以使用cleanupIncrementally,如

StateTtlConfig ttlConfig = StateTtlConfig
         newBuilder(Time.seconds(2))
         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
         .cleanupIncrementally(10, true)
         .build();

我们看下完整代码

public class TestStateTtlConfig {
    private static final String[] FRUIT = {"苹果", "梨", "西瓜", "葡萄", "火龙果", "橘子", "桃子", "香蕉"};

    public static void main(String args[]) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream> fruit = env.addSource(new SourceFunction>() {
            private volatile boolean isRunning = true;
            private final Random random = new Random();

            @Override
            public void run(SourceContext> ctx) throws Exception {
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(1);
                    ctx.collect(Tuple2.of(FRUIT[random.nextInt(FRUIT.length)], 1));
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });

        fruit.keyBy(0).map(new RichMapFunction, Tuple2>() {
            private ValueState> valueState;

            @Override
            public void open(Configuration parameters) throws Exception {
                StateTtlConfig ttlConfig = StateTtlConfig
                        .newBuilder(Time.seconds(2))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                        .cleanupIncrementally(10, true)
                        .build();

                ValueStateDescriptor stateDescriptor = new ValueStateDescriptor("key-fruit", Types.TUPLE(Types.STRING, Types.INT));
                stateDescriptor.enableTimeToLive(ttlConfig);
                valueState = getRuntimeContext().getState(stateDescriptor);
            }

            @Override
            public Tuple2 map(Tuple2 tuple2) throws Exception {
                Tuple2 currentState = valueState.value();
                // 初始化 ValueState 值
                if (null == currentState) {
                    currentState = new Tuple2<>(tuple2.f0, 0);
                }

                Tuple2 newState = new Tuple2<>(currentState.f0, currentState.f1 + tuple2.f1);
                // 更新 ValueState 值
                valueState.update(newState);

                return Tuple2.of(newState.f0, newState.f1);
            }
        }).print();

        env.execute("fruit");
    }

执行结果

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/zaji/5684999.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存