Spark-SparkSQL深入学习系列八(转自OopsOutOfMemory)

 /** Spark SQL源码分析系列文章*/ 

在SQL的世界里,除了官方提供的常用的处理函数之外,一般都会提供可扩展的对外自定义函数接口,这已经成为一种事实的标准。

  在前面Spark
SQL源码分析之核心流程
一文中,已经介绍了Spark SQL Catalyst Analyzer的作用,其中包含了ResolveFunctions这个解析函数的功能。但是随着Spark1.1版本的发布,Spark
SQL的代码有很多新完善和新功能了,和我先前基于1.0的源码分析多少有些不同,比如支持UDF:

  spark1.0及以前的实现:

[java] view
plain
 copy

  1. protected[sql] lazy val catalog: Catalog = new SimpleCatalog  
  2. @transient  
  3. protected[sql] lazy val analyzer: Analyzer =  
  4.   new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true) //EmptyFunctionRegistry空实现  
  5. @transient  
  6. protected[sql] val optimizer = Optimizer  

  Spark1.1及以后的实现:

[java] view
plain
 copy

  1. protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry //SimpleFunctionRegistry实现,支持简单的UDF  
  2.   
  3. @transient  
  4. protected[sql] lazy val analyzer: Analyzer =  
  5.   new Analyzer(catalog, functionRegistry, caseSensitive = true)  

一、引子:

  对于SQL语句中的函数,会经过SqlParser的的解析成UnresolvedFunction。UnresolvedFunction最后会被Analyzer解析。

 SqlParser:

 除了非官方定义的函数外,还可以定义自定义函数,sql parser会进行解析。

[java] view
plain
 copy

  1. ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ {  
  2.     case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs)  

  将SqlParser传入的udfName和exprs封装成一个class class UnresolvedFunction继承自Expression。

  只是这个Expression的dataType等一系列属性和eval计算方法均无法访问,强制访问会抛出异常,因为它没有被Resolved,只是一个载体。

[java] view
plain
 copy

  1. case class UnresolvedFunction(name: String, children: Seq[Expression]) extends Expression {  
  2.   override def dataType = throw new UnresolvedException(this, "dataType")  
  3.   override def foldable = throw new UnresolvedException(this, "foldable")  
  4.   override def nullable = throw new UnresolvedException(this, "nullable")  
  5.   override lazy val resolved = false  
  6.   
  7.   // Unresolved functions are transient at compile time and don't get evaluated during execution.  
  8.   override def eval(input: Row = null): EvaluatedType =  
  9.     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")  
  10.   
  11.   override def toString = s"'$name(${children.mkString(",")})"  
  12. }<strong></strong>  

Analyzer:

  Analyzer初始化的时候会需要Catalog,database和table的元数据关系,以及FunctionRegistry来维护UDF名称和UDF实现的元数据,这里使用SimpleFunctionRegistry。

[java] view
plain
 copy

  1. /** 
  2.  * Replaces [[UnresolvedFunction]]s with concrete [[catalyst.expressions.Expression Expressions]]. 
  3.  */  
  4. object ResolveFunctions extends Rule[LogicalPlan] {  
  5.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
  6.     case q: LogicalPlan =>  
  7.       q transformExpressions { //对当前LogicalPlan进行transformExpressions操作  
  8.         case u @ UnresolvedFunction(name, children) if u.childrenResolved => //如果遍历到了UnresolvedFunction  
  9.           registry.lookupFunction(name, children) //从UDF元数据表里查找udf函数  
  10.       }  
  11.   }  
  12. }  

二、UDF注册

2.1 UDFRegistration

  

  registerFunction("len", (x:String)=>x.length)

  registerFunction是UDFRegistration下的方法,SQLContext现在实现了UDFRegistration这个trait,只要导入SQLContext,即可以使用udf功能。

  UDFRegistration核心方法registerFunction:

  registerFunction方法签名def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit

  接受一个udfName 和 一个FunctionN,可以是Function1 到Function22。即这个udf的参数只支持1-22个。(scala的痛啊)

  内部builder通过ScalaUdf来构造一个Expression,这里ScalaUdf继承自Expression(可以简单的理解目前的SimpleUDF即是一个Catalyst的一个Expression),传入scala的function作为UDF的实现,并且用反射检查字段类型是否是Catalyst允许的,见ScalaReflection.

[java] view
plain
 copy

  1. def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit = {  
  2. def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)//构造Expression  
  3. functionRegistry.registerFunction(name, builder)//向SQLContext的functionRegistry(维护了一个hashMap来管理udf映射)注册  

2.2 注册Function:

注意:这里FunctionBuilder是一个type FunctionBuilder = Seq[Expression] => Expression

[java] view
plain
 copy

  1. class SimpleFunctionRegistry extends FunctionRegistry {  
  2.   val functionBuilders = new mutable.HashMap[String, FunctionBuilder]() //udf映射关系维护[udfName,Expression]  
  3.   
  4.   def registerFunction(name: String, builder: FunctionBuilder) = { //put expression进Map  
  5.     functionBuilders.put(name, builder)  
  6.   }  
  7.   
  8.   override def lookupFunction(name: String, children: Seq[Expression]): Expression = {  
  9.     functionBuilders(name)(children) //查找udf,返回Expression  
  10.   }  
  11. }  

至此,我们将一个scala function注册为一个catalyst的一个Expression,这就是spark的simple udf。

三、UDF计算:

UDF既然已经被封装为catalyst树里的一个Expression节点,那么计算的时候也就是计算ScalaUdf的eval方法。

先通过Row和表达式计算function所需要的参数,最后通过反射调用function,来达到计算udf的目的。

 ScalaUdf继承自Expression:

scalaUdf接受一个function, dataType,和一系列表达式。

比较简单,看注释即可:

[java] view
plain
 copy

  1. case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])  
  2.   extends Expression {  
  3.   
  4.   type EvaluatedType = Any  
  5.   
  6.   def nullable = true  
  7.   
  8.   override def toString = s"scalaUDF(${children.mkString(",")})"  
  9.  override def eval(input: Row): Any = {  
  10.     val result = children.size match {  
  11.       case 0 => function.asInstanceOf[() => Any]()  
  12.       case 1 => function.asInstanceOf[(Any) => Any](children(0).eval(input)) //反射调用function  
  13.       case 2 =>  
  14.         function.asInstanceOf[(Any, Any) => Any](  
  15.           children(0).eval(input), //表达式参数计算  
  16.           children(1).eval(input))  
  17.       case 3 =>  
  18.         function.asInstanceOf[(Any, Any, Any) => Any](  
  19.           children(0).eval(input),  
  20.           children(1).eval(input),  
  21.           children(2).eval(input))  
  22.       case 4 =>  
  23.      ......  
  24.        case 22 => //scala function只支持22个参数,这里枚举了。  
  25.         function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](  
  26.           children(0).eval(input),  
  27.           children(1).eval(input),  
  28.           children(2).eval(input),  
  29.           children(3).eval(input),  
  30.           children(4).eval(input),  
  31.           children(5).eval(input),  
  32.           children(6).eval(input),  
  33.           children(7).eval(input),  
  34.           children(8).eval(input),  
  35.           children(9).eval(input),  
  36.           children(10).eval(input),  
  37.           children(11).eval(input),  
  38.           children(12).eval(input),  
  39.           children(13).eval(input),  
  40.           children(14).eval(input),  
  41.           children(15).eval(input),  
  42.           children(16).eval(input),  
  43.           children(17).eval(input),  
  44.           children(18).eval(input),  
  45.           children(19).eval(input),  
  46.           children(20).eval(input),  
  47.           children(21).eval(input))  

四、总结

    Spark目前的UDF其实就是scala function。将scala function封装到一个Catalyst Expression当中,在进行sql计算时,使用同样的Eval方法对当前输入Row进行计算。

    编写一个spark udf非常简单,只需给UDF起个函数名,并且传递一个scala function即可。依靠scala函数编程的表现能力,使得编写scala udf比较简单,且相较hive的udf更容易使人理解。

——EOF——

原创文章,转载请注明:

转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

本文链接地址:http://blog.csdn.net/oopsoom/article/details/39395641

注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

时间: 2016-05-11

Spark-SparkSQL深入学习系列八(转自OopsOutOfMemory)的相关文章

Python爬虫学习系列教程

Python版本:2.7 一.爬虫入门 1. Python爬虫入门一之综述 2. Python爬虫入门二之爬虫基础了解 3. Python爬虫入门三之Urllib库的基本使用 4. Python爬虫入门四之Urllib库的高级用法 5. Python爬虫入门五之URLError异常处理 6. Python爬虫入门六之Cookie的使用 7. Python爬虫入门七之正则表达式 二.爬虫实战 1. Python爬虫实战一之爬取糗事百科段子 2. Python爬虫实战二之爬取百度贴吧帖子 3. Py

Asp.Net Web API 2 官网菜鸟学习系列导航[持续更新中]

原文:Asp.Net Web API 2 官网菜鸟学习系列导航[持续更新中] 前言 本来一直参见于微软官网进行学习的, 官网网址http://www.asp.net/web-api.出于自己想锻炼一下学习阅读英文文章的目的,又可以学习下微软新发布的技术,其实也很久了,但自己菜鸟一枚,对自己来说都是新技术了.鉴于以上两个原因,本人打算借助google翻译和有道词典,来翻译学习这个系列,并通过博客园来记录自己的翻译学习过程.由于自己阅读水平的确太菜,在借助工具的情况下,有时候搞出来的也是蹩脚的语句,

kvm虚拟化学习笔记(八)之kvm虚拟机vnc配置

原创作品,允许转载,转载时请务必以超链接形式标明文章 原始出处 .作者信息和本声明.否则将追究法律责任.http://koumm.blog.51cto.com/703525/1291803 KVM虚拟化学习笔记系列文章列表 ---------------------------------------- kvm虚拟化学习笔记(一)之kvm虚拟化环境安装http://koumm.blog.51cto.com/703525/1288795 kvm虚拟化学习笔记(二)之linux kvm虚拟机安装 h

ExtJs2.0学习系列(2)--Ext.Panel

上一篇文章ExtJs2.0学习系列(1)--Ext.MessageBox ,受到了大家的褒贬不一,还是有的朋友提出好的建议,在此表示感谢! 今天介绍extjs中的Panel组件. //html代码 <div id="container"> </div> //js代码 var p = new Ext.Panel({ title: 'My Panel',//标题 collapsible:true,//右上角上的那个收缩按钮,设为false则不显示 renderTo:

ExtJs2.0学习系列(6)--Ext.FormPanel之第三式(ComboBox篇)

前言:说句实话,此extjs系列的文章在博客园中的热度不高,可能是学这玩意的人不多吧,但是我觉得有这么个系列的文章对于中国朋友非常有帮助!请大家支持! 上篇ExtJs2.0学习系列(5)--Ext.FormPanel之第二式中我们讨论了下fieldset和表单验证的知识,今天我们接着深入解析表单元素中ComboBox组件的使用.会涉及 到.net简单服务器数据交互,但暂不做深入讨论,以后会详细分析服务器交互相关,不过可能要等较长一段时间,呵呵! 5.服务器数据作为ComboBox的数据源实例 首

JAVA/JSP学习系列之八(改写MySQL翻页例子)

js|mysql|翻页 一.前言 其实,改写后的JDBC Data-Source是运行在Servlet中的,通过JNDI去查找数据源.我用Orion试的,将本站<JAVA/JSP学习系列之六(MySQL翻页例子) > 简单改写了一下. 二.配置 (1)JDBC 需要将用到的JDBC驱动Copy到[ORION]/lib目录下 (2)data-source 在[ORION]/config/data-sources.xml文件中加入如下: 〈data-source class="com.e

ExtJs2.0学习系列

ExtJs2.0学习系列(15)--extjs换肤 ExtJs2.0学习系列(14)--Ext.TreePanel之第三式(可增删改的树) ExtJs2.0学习系列(13)--Ext.TreePanel之第二式 ExtJs2.0学习系列(12)--Ext.TreePanel之第一式 ExtJs2.0学习系列(11)--Ext.XTemplate ExtJs2.0学习系列(10)--Ext.TabPanel之第二式 ExtJs2.0学习系列(9)--Ext.TabPanel之第一式 ExtJs2.

ExtJs2.0学习系列(12)--Ext.TreePanel之第一式

今天开始,我们就开始一起学习TreePanel了,道个歉,上篇的代码很乱阿. 我总是喜欢用最简单的例子开始,去理解最基本的使用方法,减少对i后面高级使用的干扰! TreePanel是继承自Panel,所以很多在Panel中谈到的属性这里可能会一笔带过,如有问题,请参考ExtJs2.0学习系列(2)--Ext.Panel 1.第一个静态树--最简单的树 效果图: html代码: <div id="container"> </div> js代码: Ext.onRea

ExtJs2.0学习系列(3)--Ext.Window

前面介绍了panel组件--ExtJs2.0学习系列(2)--Ext.Panel,今天将介绍window组件,它继承自panel. 先介绍个最简单例子 //html代码 <div id="win" class="x-hidden"> </div> //js代码 var w=new Ext.Window({ contentEl:"win",//主体显示的html元素,也可以写为el:"win" width