加入收藏 | 设为首页 | 会员中心 | 我要投稿 惠州站长网 (https://www.0752zz.com.cn/)- 办公协同、云通信、物联设备、操作系统、高性能计算!
当前位置: 首页 > 教程 > 正文

对Spark的那些【魔改】

发布时间:2018-08-18 11:12:25 所属栏目:教程 来源:祝威廉
导读:副标题#e# 技术沙龙 | 邀您于8月25日与国美/AWS/转转三位专家共同探讨小程序电商实战 前言 这两年做 streamingpro 时,不可避免的需要对Spark做大量的增强。就如同我之前吐槽的,Spark大量使用了new进行对象的创建,导致里面的实现基本没有办法进行替换。

SparkEnv是在SparkContext初始化的时候创建的,该对象里面包含了closureSerializer,该对象通过new JavaSerializer创建。既然序列化太慢,又因为我们其实是在Local模式下,本身是可以不需要序列化的,所以我们这里想办法把closureSerializer的实现替换掉。正如我们前面吐槽,因为在Spark代码里写死了,没有暴露任何自定义的可能性,所以我们又要魔改一下了。

首先,我们新建一个SparkEnv的子类:

  1. class WowSparkEnv( 
  2.                    ....) extends SparkEnv( 

接着实现一个自定义的Serializer:

  1. class LocalNonOpSerializerInstance(javaD: SerializerInstance) extends SerializerInstance { 
  2.  
  3.   private def isClosure(cls: Class[_]): Boolean = { 
  4.     cls.getName.contains("$anonfun$") 
  5.   } 
  6.  
  7.   override def serialize[T: ClassTag](t: T): ByteBuffer = { 
  8.     if (isClosure(t.getClass)) { 
  9.       val uuid = UUID.randomUUID().toString 
  10.       LocalNonOpSerializerInstance.maps.put(uuid, t.asInstanceOf[AnyRef]) 
  11.       ByteBuffer.wrap(uuid.getBytes()) 
  12.     } else { 
  13.       javaD.serialize(t) 
  14.     } 
  15.  
  16.   } 
  17.  
  18.   override def deserialize[T: ClassTag](bytes: ByteBuffer): T = { 
  19.     val s = StandardCharsets.UTF_8.decode(bytes).toString() 
  20.     if (LocalNonOpSerializerInstance.maps.containsKey(s)) { 
  21.       LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T] 
  22.     } else { 
  23.       bytes.flip() 
  24.       javaD.deserialize(bytes) 
  25.     } 
  26.  
  27.   } 
  28.  
  29.   override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = { 
  30.     val s = StandardCharsets.UTF_8.decode(bytes).toString() 
  31.     if (LocalNonOpSerializerInstance.maps.containsKey(s)) { 
  32.       LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T] 
  33.     } else { 
  34.       bytes.flip() 
  35.       javaD.deserialize(bytes, loader) 
  36.     } 
  37.   } 
  38.  
  39.   override def serializeStream(s: OutputStream): SerializationStream = { 
  40.     javaD.serializeStream(s) 
  41.   } 
  42.  
  43.   override def deserializeStream(s: InputStream): DeserializationStream = { 
  44.     javaD.deserializeStream(s) 
  45.   } 

(编辑:惠州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

推荐文章
    热点阅读