29
03/2021
Spark任务中空间数据的序列化
一、引言
/**
 * Helpers for converting JTS geometries to and from WKB (Well-Known Binary).
 *
 * A separate reader and writer is kept per thread via `ThreadLocal`, so the
 * instances are reused across calls but never shared between threads.
 */
object WKBUtils {

  /** One `WKBReader` per thread, created lazily on first use in that thread. */
  private val readerPool = new ThreadLocal[WKBReader] {
    override def initialValue(): WKBReader = new WKBReader
  }

  /**
   * One `WKBWriter` per thread, created lazily on first use in that thread.
   *
   * NOTE(review): the writer is built with its default settings; confirm that
   * these keep everything the job needs (e.g. Z ordinates / SRID) -- TODO verify.
   */
  private val writerPool = new ThreadLocal[WKBWriter] {
    override def initialValue(): WKBWriter = new WKBWriter
  }

  /**
   * Decodes a geometry from its WKB representation.
   *
   * @param bytes WKB-encoded geometry
   * @return the geometry parsed by the current thread's reader
   */
  def read(bytes: Array[Byte]): Geometry = readerPool.get.read(bytes)

  /**
   * Encodes a geometry as WKB.
   *
   * @param geom geometry to encode
   * @return the bytes produced by the current thread's writer
   */
  def write(geom: Geometry): Array[Byte] = writerPool.get.write(geom)
}

/**
 * Kryo serializer that stores a geometry as a length-prefixed WKB payload:
 * a 4-byte length written with `writeInt`, followed by the WKB bytes.
 */
class GeometrySerializer extends Serializer[Geometry] {

  /** Writes the WKB byte count, then the WKB bytes themselves. */
  override def write(kryo: Kryo, output: Output, geom: Geometry): Unit = {
    val spatialBytes: Array[Byte] = WKBUtils.write(geom)
    output.writeInt(spatialBytes.length)
    output.writeBytes(spatialBytes)
  }

  /** Reads the byte count written by `write`, then decodes that many WKB bytes. */
  override def read(kryo: Kryo, input: Input, clazz: Class[Geometry]): Geometry = {
    val length = input.readInt()
    WKBUtils.read(input.readBytes(length))
  }
}

/**
 * Registers [[GeometrySerializer]] with Kryo for the concrete geometry classes
 * listed below, sharing a single serializer instance between them.
 *
 * NOTE(review): only these six classes are registered; other geometry classes
 * (e.g. GeometryCollection, LinearRing) presumably fall back to Kryo's default
 * serializer -- confirm whether they need to be registered as well.
 */
class GeometryKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    val geometrySerializer = new GeometrySerializer
    // Same classes, in the same order, as the original one-call-per-class list.
    Seq[Class[_]](
      classOf[Point],
      classOf[LineString],
      classOf[Polygon],
      classOf[MultiPoint],
      classOf[MultiLineString],
      classOf[MultiPolygon]
    ).foreach(geometryClass => kryo.register(geometryClass, geometrySerializer))
  }
}

// Driver-side setup: select Kryo as Spark's serializer and point it at the
// registrator above so geometries are serialized through GeometrySerializer.
val conf = new SparkConf()
  .set("spark.serializer", classOf[KryoSerializer].getName)
  .set("spark.kryo.registrator", classOf[GeometryKryoRegistrator].getName)
val spark = new SparkContext(conf)
转载请注明:康瑞部落 » Spark任务中空间数据的序列化

0 条评论