SparkEnv is created when SparkContext is initialized. It holds a closureSerializer, which is instantiated via `new JavaSerializer`. Since serialization is slow, and since we are running in local mode where serialization is not actually needed, we want to swap out the closureSerializer implementation. As we complained earlier, this is hard-coded inside Spark with no extension point exposed, so once again we have to hack it.
First, we create a subclass of SparkEnv:
class WowSparkEnv(
  ....) extends SparkEnv(
Next, we implement a custom Serializer:
import java.io.{InputStream, OutputStream}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

import scala.reflect.ClassTag

import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}

class LocalNonOpSerializerInstance(javaD: SerializerInstance) extends SerializerInstance {

  private def isClosure(cls: Class[_]): Boolean = {
    cls.getName.contains("$anonfun$")
  }

  override def serialize[T: ClassTag](t: T): ByteBuffer = {
    if (isClosure(t.getClass)) {
      // Do not serialize the closure at all: stash it in a shared map
      // and ship only a UUID token. This is safe because in local mode
      // driver and executor run in the same JVM.
      val uuid = UUID.randomUUID().toString
      LocalNonOpSerializerInstance.maps.put(uuid, t.asInstanceOf[AnyRef])
      ByteBuffer.wrap(uuid.getBytes(StandardCharsets.UTF_8))
    } else {
      javaD.serialize(t)
    }
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer): T = {
    val s = StandardCharsets.UTF_8.decode(bytes).toString
    if (LocalNonOpSerializerInstance.maps.containsKey(s)) {
      LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T]
    } else {
      // Not one of our tokens: rewind the buffer (decode consumed it)
      // and fall back to Java serialization.
      bytes.flip()
      javaD.deserialize(bytes)
    }
  }

  override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = {
    val s = StandardCharsets.UTF_8.decode(bytes).toString
    if (LocalNonOpSerializerInstance.maps.containsKey(s)) {
      LocalNonOpSerializerInstance.maps.remove(s).asInstanceOf[T]
    } else {
      bytes.flip()
      javaD.deserialize(bytes, loader)
    }
  }

  override def serializeStream(s: OutputStream): SerializationStream = {
    javaD.serializeStream(s)
  }

  override def deserializeStream(s: InputStream): DeserializationStream = {
    javaD.deserializeStream(s)
  }
}

object LocalNonOpSerializerInstance {
  // Holds stashed closures by UUID; shared between serialize and deserialize.
  val maps = new ConcurrentHashMap[String, AnyRef]()
}
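The core trick above, replacing serialization with an in-process map lookup keyed by a UUID token, can be sketched without any Spark dependency. The names below (ClosureRegistry, stash, fetch) are illustrative, not Spark API:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// In-process registry: this only works because "sender" and "receiver"
// share one JVM, exactly the situation in Spark local mode.
object ClosureRegistry {
  val maps = new ConcurrentHashMap[String, AnyRef]()

  // "Serialize": store the object and return a UUID token as bytes.
  def stash(obj: AnyRef): ByteBuffer = {
    val uuid = UUID.randomUUID().toString
    maps.put(uuid, obj)
    ByteBuffer.wrap(uuid.getBytes(StandardCharsets.UTF_8))
  }

  // "Deserialize": decode the token and look the original object back up.
  def fetch[T](bytes: ByteBuffer): T = {
    val key = StandardCharsets.UTF_8.decode(bytes).toString
    maps.remove(key).asInstanceOf[T]
  }
}

val f: Int => Int = _ + 1
val token = ClosureRegistry.stash(f)
val g = ClosureRegistry.fetch[Int => Int](token.duplicate())
println(g(41)) // prints 42: the very same function object came back
```

Note that `fetch` removes the entry, mirroring the `maps.remove(s)` in the serializer above; without removal the map would leak one entry per serialized closure.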