在Flink中,我们知道map ,flatMap,reduce算子都可以自定义函数实现,比如MapFunction:
public class MyMapFunction implements MapFunction{ @Override public Integer map(String s) throws Exception { return Integer.parseInt(s); } }
同时,Flink中还提供了对应的Rich函数,比如RichMapFunction,RichFlatMapFunction,RichReduceFunciton,而Rich相关函数都会继承AbstractRichFunction,这个类中会实现如下几个方法:
// Flink在算子调用前会执行open方法 public void open(Configuration parameters) throws Exception // 获取Flink运行时上下文,每个并行的算子任务都有一个上下文,会记录算子执行过程中一些信息,包括算子的并行度、任务序号、广播数据、累加器、监控数据、以及重要的状态数据 public RuntimeContext getRuntimeContext() ; public void close()
Rich算子是Flink中状态计算的实现入口,我们这里模拟实现一个:
public class MyRichMapFunction extends RichMapFunction{ private MapState mapState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); mapState = getRuntimeContext().getMapState(new MapStateDescriptor ("testCount",String.class,Integer.class)); } @Override public void close() throws Exception { super.close(); } @Override public Integer map(String s) throws Exception { if(!mapState.contains(s)){ mapState.put(s,0); } mapState.put(s,mapState.get(s)+1); return Integer.parseInt(s); } }
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)