Flink中自定义Rich函数实现

Flink中自定义Rich函数实现,第1张

Flink中自定义Rich函数实现

在Flink中,我们知道map ,flatMap,reduce算子都可以自定义函数实现,比如MapFunction:

public class MyMapFunction implements MapFunction {
    @Override
    public Integer map(String s) throws Exception {
        return Integer.parseInt(s);
    }
}

同时,Flink中还提供了对应的Rich函数,比如RichMapFunction,RichFlatMapFunction,RichReduceFunciton,而Rich相关函数都会继承AbstractRichFunction,这个类中会实现如下几个方法:

// Flink在算子调用前会执行open方法
public void open(Configuration parameters) throws Exception 
// 获取Flink运行时上下文,每个并行的算子任务都有一个上下文,会记录算子执行过程中一些信息,包括算子的并行度、任务序号、广播数据、累加器、监控数据、以及重要的状态数据
public RuntimeContext getRuntimeContext() ;
public void close()

Rich算子是Flink中状态计算的实现入口,我们这里模拟实现一个:

public class MyRichMapFunction extends RichMapFunction {

    private MapState mapState;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        mapState = getRuntimeContext().getMapState(new MapStateDescriptor("testCount",String.class,Integer.class));
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public Integer map(String s) throws Exception {
        if(!mapState.contains(s)){
            mapState.put(s,0);
        }
        mapState.put(s,mapState.get(s)+1);
        return Integer.parseInt(s);
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://www.outofmemory.cn/zaji/5700653.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存