Hive Study Notes Part 10: User-Defined Aggregate Functions (UDAF)
 * Size of the buffer for merged values: it holds a string length, so 4 bytes.
 * @return estimated buffer size
 */
@Override
public int estimate() {
    return JavaDataModel.PRIMITIVES1;
}
}
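This section opens mid-listing: the fragment above is presumably the tail of FieldLengthAggregationBuffer from the previous step. For readability, here is a minimal sketch of what the complete class might look like, reconstructed from that fragment and from the calls the evaluator below makes (setValue, add, getValue); the AbstractAggregationBuffer base class is inferred from the @Override on estimate(), so treat this as an assumption rather than the original listing:

package com.bolingcavalry.hiveudf.udaf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.util.JavaDataModel;

// Reconstructed sketch, not the original listing: holds the running total
// of field lengths for one group.
public class FieldLengthAggregationBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer {

    private Integer value = 0;

    public Integer getValue() {
        return value;
    }

    public void setValue(Integer value) {
        this.value = value;
    }

    // accumulate one length into the running total
    public void add(int addValue) {
        value += addValue;
    }

    /**
     * Size of the buffer for merged values: it holds a string length, so 4 bytes.
     * @return estimated buffer size
     */
    @Override
    public int estimate() {
        return JavaDataModel.PRIMITIVES1;
    }
}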

  2. Create FieldLengthUDAFEvaluator.java, which contains the entire UDAF logic. The key code is commented; please read it together with the earlier diagram. The core idea: iterate processes the field values of the current group, merge combines the scattered partial results, and terminate produces the final result for the group:

package com.bolingcavalry.hiveudf.udaf;

import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;

public class FieldLengthUDAFEvaluator extends GenericUDAFEvaluator {

    PrimitiveObjectInspector inputOI;
    ObjectInspector outputOI;
    PrimitiveObjectInspector integerOI;

    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        super.init(m, parameters);
        // In COMPLETE and PARTIAL1 mode, the input is raw table data
        if (Mode.PARTIAL1.equals(m) || Mode.COMPLETE.equals(m)) {
            inputOI = (PrimitiveObjectInspector) parameters[0];
        } else {
            // In PARTIAL2 and FINAL mode, the input is the previous stage's init return value
            integerOI = (PrimitiveObjectInspector) parameters[0];
        }
        outputOI = ObjectInspectorFactory.getReflectionObjectInspector(
                Integer.class,
                ObjectInspectorFactory.ObjectInspectorOptions.JAVA
        );
        // The return value tells the next stage the type of this stage's output
        return outputOI;
    }

    @Override
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
        return new FieldLengthAggregationBuffer();
    }

    @Override
    public void reset(AggregationBuffer agg) throws HiveException {
        ((FieldLengthAggregationBuffer) agg).setValue(0);
    }

    @Override
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        if (null == parameters || parameters.length < 1) {
            return;
        }
        Object javaObj = inputOI.getPrimitiveJavaObject(parameters[0]);
        ((FieldLengthAggregationBuffer) agg).add(String.valueOf(javaObj).length());
    }

    @Override
    public Object terminate(AggregationBuffer agg) throws HiveException {
        return ((FieldLengthAggregationBuffer) agg).getValue();
    }

    @Override
    public Object terminatePartial(AggregationBuffer agg) throws HiveException {
        return terminate(agg);
    }

    @Override
    public void merge(AggregationBuffer agg, Object partial) throws HiveException {
        ((FieldLengthAggregationBuffer) agg).add((Integer) integerOI.getPrimitiveJavaObject(partial));
    }
}
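To see how these pieces fit the four-mode lifecycle, here is an illustrative, hedged walk-through (not from the source article) that drives the evaluator by hand the way a map task would in PARTIAL1 mode; PrimitiveObjectInspectorFactory.javaStringObjectInspector is the stock Hive inspector for Java strings:

package com.bolingcavalry.hiveudf.udaf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Illustrative demo, not from the source article
public class FieldLengthEvaluatorDemo {
    public static void main(String[] args) throws Exception {
        FieldLengthUDAFEvaluator eval = new FieldLengthUDAFEvaluator();
        // PARTIAL1: raw rows in, partial aggregation out (the map stage)
        eval.init(GenericUDAFEvaluator.Mode.PARTIAL1, new ObjectInspector[]{
                PrimitiveObjectInspectorFactory.javaStringObjectInspector});
        GenericUDAFEvaluator.AggregationBuffer buf = eval.getNewAggregationBuffer();
        eval.iterate(buf, new Object[]{"guangzhou"}); // length 9
        eval.iterate(buf, new Object[]{"shenzhen"});  // length 8
        System.out.println(eval.terminatePartial(buf)); // prints 17
    }
}

In a real query, Hive then ships the partial result (17) to a reducer initialized in FINAL mode, where merge() folds each partial into a fresh buffer and terminate() emits the group total.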

  3. Finally, FieldLength.java. This class is used when registering the UDAF with Hive; it is responsible for instantiating FieldLengthUDAFEvaluator for Hive to use:

package com.bolingcavalry.hiveudf.udaf;

import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

public class FieldLength extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info) throws SemanticException {
        return new FieldLengthUDAFEvaluator();
    }

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
        return new FieldLengthUDAFEvaluator();
    }
}
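As a side note, a hedged sketch rather than the article's code: production resolvers commonly validate the argument list before returning an evaluator, throwing Hive's UDFArgumentTypeException for a bad call such as udf_fieldlength(province, city). A hypothetical variant:

package com.bolingcavalry.hiveudf.udaf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

// Hypothetical variant of FieldLength with argument-count validation
public class FieldLengthChecked extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
        if (info.length != 1) {
            // the first argument of the exception is the 0-based position of the offending parameter
            throw new UDFArgumentTypeException(info.length - 1,
                    "Exactly one argument is expected");
        }
        return new FieldLengthUDAFEvaluator();
    }
}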

That completes the coding; next comes deployment and a hands-on run.

Deployment and hands-on run

This deployment registers the function as a temporary function; if you want to register it as a permanent function, please refer to the earlier article.

  1. In the directory containing pom.xml, run mvn clean package -U to compile and build;

  2. The build produces the file hiveudf-1.0-SNAPSHOT.jar in the target directory;

  3. Upload the jar to the Hive server; here I put it in the /home/hadoop/udf directory;

  4. Enter a Hive session and run the following command to add the jar:

add jar /home/hadoop/udf/hiveudf-1.0-SNAPSHOT.jar;

  5. Run the following command to register the function:

create temporary function udf_fieldlength as 'com.bolingcavalry.hiveudf.udaf.FieldLength';

  6. Find a table suited to a group by query to try it on. Here I use the address table created in an earlier article; its full contents are:

hive> select * from address;

OK

1 guangdong guangzhou

2 guangdong shenzhen

3 shanxi xian

4 shanxi hanzhong

6 jiangshu nanjing

  7. Run the following SQL:

select province, count(city), udf_fieldlength(city) from address group by province;

The results are shown below: for guangdong, the combined length of guangzhou and shenzhen is 17; for jiangshu, nanjing is 7; and for shanxi, xian and hanzhong total 12, all as expected:

Total MapReduce CPU Time Spent: 2 seconds 730 msec

OK

guangdong 2 17

jiangshu 1 7

shanxi 2 12

Time taken: 28.484 seconds, Fetched: 3 row(s)
