Protobuf serializers for remote deployment #22332

This commit is contained in:
Johan Andrén 2017-03-16 15:12:35 +01:00 committed by GitHub
parent 3e4f44765c
commit 3643f18ded
24 changed files with 12700 additions and 314 deletions

View file

@ -3,12 +3,21 @@
*/
package akka.remote.serialization
import akka.actor._
import akka.protobuf.ByteString
import akka.remote.{ ContainerFormats, RemoteWatcher }
import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest }
import java.util.Optional
import java.io.NotSerializableException
import java.nio.charset.StandardCharsets
import java.util.Optional
import java.util.concurrent.TimeUnit
import akka.actor._
import akka.dispatch.Dispatchers
import akka.remote.routing.RemoteRouterConfig
import akka.remote.{ ContainerFormats, RemoteScope, RemoteWatcher, WireFormats }
import akka.routing._
import akka.serialization.{ BaseSerializer, Serialization, SerializationExtension, SerializerWithStringManifest }
import com.typesafe.config.{ Config, ConfigFactory, ConfigRenderOptions }
import scala.collection.JavaConverters._
import scala.concurrent.duration.{ FiniteDuration, TimeUnit }
class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer {
@ -18,23 +27,36 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
private val throwableSupport = new ThrowableSupport(system)
private val ParameterlessSerializedMessage = Array.empty[Byte]
private val EmptyConfig = ConfigFactory.empty()
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case identify: Identify serializeIdentify(identify)
case identity: ActorIdentity serializeActorIdentity(identity)
case Some(value) serializeSome(value)
case None ParameterlessSerializedMessage
case o: Optional[_] serializeOptional(o)
case r: ActorRef serializeActorRef(r)
case s: Status.Success serializeStatusSuccess(s)
case f: Status.Failure serializeStatusFailure(f)
case ex: ActorInitializationException serializeActorInitializationException(ex)
case t: Throwable throwableSupport.serializeThrowable(t)
case PoisonPill ParameterlessSerializedMessage
case Kill ParameterlessSerializedMessage
case RemoteWatcher.Heartbeat ParameterlessSerializedMessage
case hbrsp: RemoteWatcher.HeartbeatRsp serializeHeartbeatRsp(hbrsp)
case _ throw new IllegalArgumentException(s"Cannot serialize object of type [${obj.getClass.getName}]")
case identify: Identify serializeIdentify(identify)
case identity: ActorIdentity serializeActorIdentity(identity)
case Some(value) serializeSome(value)
case None ParameterlessSerializedMessage
case o: Optional[_] serializeOptional(o)
case r: ActorRef serializeActorRef(r)
case s: Status.Success serializeStatusSuccess(s)
case f: Status.Failure serializeStatusFailure(f)
case ex: ActorInitializationException serializeActorInitializationException(ex)
case t: Throwable throwableSupport.serializeThrowable(t)
case PoisonPill ParameterlessSerializedMessage
case Kill ParameterlessSerializedMessage
case RemoteWatcher.Heartbeat ParameterlessSerializedMessage
case hbrsp: RemoteWatcher.HeartbeatRsp serializeHeartbeatRsp(hbrsp)
case rs: RemoteScope serializeRemoteScope(rs)
case LocalScope ParameterlessSerializedMessage
case c: Config serializeConfig(c)
case dr: DefaultResizer serializeDefaultResizer(dr)
case fc: FromConfig serializeFromConfig(fc)
case bp: BalancingPool serializeBalancingPool(bp)
case bp: BroadcastPool serializeBroadcastPool(bp)
case rp: RandomPool serializeRandomPool(rp)
case rrp: RoundRobinPool serializeRoundRobinPool(rrp)
case sgp: ScatterGatherFirstCompletedPool serializeScatterGatherFirstCompletedPool(sgp)
case tp: TailChoppingPool serializeTailChoppingPool(tp)
case rrc: RemoteRouterConfig serializeRemoteRouterConfig(rrc)
case _ throw new IllegalArgumentException(s"Cannot serialize object of type [${obj.getClass.getName}]")
}
private def serializeIdentify(identify: Identify): Array[Byte] =
@ -80,6 +102,12 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
ContainerFormats.WatcherHeartbeatResponse.newBuilder().setUid(hbrsp.addressUid).build().toByteArray
}
private def serializeRemoteScope(rs: RemoteScope): Array[Byte] = {
val builder = WireFormats.RemoteScope.newBuilder()
builder.setNode(buildAddressData(rs.node))
builder.build().toByteArray
}
private def actorRefBuilder(actorRef: ActorRef): ContainerFormats.ActorRef.Builder =
ContainerFormats.ActorRef.newBuilder()
.setPath(Serialization.serializedActorPath(actorRef))
@ -101,6 +129,119 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
.build().toByteArray
}
private def serializeConfig(c: Config): Array[Byte] = {
c.root.render(ConfigRenderOptions.concise()).getBytes(StandardCharsets.UTF_8)
}
private def serializeDefaultResizer(dr: DefaultResizer): Array[Byte] = {
val builder = WireFormats.DefaultResizer.newBuilder()
builder.setBackoffRate(dr.backoffRate)
builder.setBackoffThreshold(dr.backoffThreshold)
builder.setLowerBound(dr.lowerBound)
builder.setMessagesPerResize(dr.messagesPerResize)
builder.setPressureThreshold(dr.pressureThreshold)
builder.setRampupRate(dr.rampupRate)
builder.setUpperBound(dr.upperBound)
builder.build().toByteArray
}
private def serializeFromConfig(fc: FromConfig): Array[Byte] =
if (fc == FromConfig) ParameterlessSerializedMessage
else {
val builder = WireFormats.FromConfig.newBuilder()
if (fc.resizer.isDefined)
builder.setResizer(payloadSupport.payloadBuilder(fc.resizer.get))
if (fc.routerDispatcher != Dispatchers.DefaultDispatcherId)
builder.setRouterDispatcher(fc.routerDispatcher)
builder.build().toByteArray
}
private def serializeBalancingPool(bp: BalancingPool): Array[Byte] = {
buildGenericRoutingPool(bp.nrOfInstances, bp.routerDispatcher, bp.usePoolDispatcher, bp.resizer).toByteArray
}
private def serializeBroadcastPool(bp: BroadcastPool): Array[Byte] = {
buildGenericRoutingPool(bp.nrOfInstances, bp.routerDispatcher, bp.usePoolDispatcher, bp.resizer).toByteArray
}
private def serializeRandomPool(rp: RandomPool): Array[Byte] = {
buildGenericRoutingPool(rp.nrOfInstances, rp.routerDispatcher, rp.usePoolDispatcher, rp.resizer).toByteArray
}
private def serializeRoundRobinPool(rp: RoundRobinPool): Array[Byte] = {
buildGenericRoutingPool(rp.nrOfInstances, rp.routerDispatcher, rp.usePoolDispatcher, rp.resizer).toByteArray
}
private def serializeScatterGatherFirstCompletedPool(sgp: ScatterGatherFirstCompletedPool): Array[Byte] = {
val builder = WireFormats.ScatterGatherPool.newBuilder()
builder.setGeneric(buildGenericRoutingPool(sgp.nrOfInstances, sgp.routerDispatcher, sgp.usePoolDispatcher, sgp.resizer))
builder.setWithin(buildFiniteDuration(sgp.within))
builder.build().toByteArray
}
private def serializeTailChoppingPool(tp: TailChoppingPool): Array[Byte] = {
val builder = WireFormats.TailChoppingPool.newBuilder()
builder.setGeneric(buildGenericRoutingPool(tp.nrOfInstances, tp.routerDispatcher, tp.usePoolDispatcher, tp.resizer))
builder.setWithin(buildFiniteDuration(tp.within))
builder.setInterval(buildFiniteDuration(tp.interval))
builder.build().toByteArray
}
private def serializeRemoteRouterConfig(rrc: RemoteRouterConfig): Array[Byte] = {
val builder = WireFormats.RemoteRouterConfig.newBuilder()
builder.setLocal(payloadSupport.payloadBuilder(rrc.local).build())
builder.addAllNodes(rrc.nodes.map(buildAddressData).asJava)
builder.build().toByteArray
}
private def buildGenericRoutingPool(
nrOfInstances: Int,
routerDispatcher: String,
usePoolDispatcher: Boolean,
resizer: Option[Resizer]): WireFormats.GenericRoutingPool = {
val builder = WireFormats.GenericRoutingPool.newBuilder()
builder.setNrOfInstances(nrOfInstances)
if (routerDispatcher != Dispatchers.DefaultDispatcherId) {
builder.setRouterDispatcher(routerDispatcher)
}
if (resizer.isDefined) {
builder.setResizer(payloadSupport.payloadBuilder(resizer.get))
}
builder.setUsePoolDispatcher(usePoolDispatcher)
builder.build()
}
private def timeUnitToWire(unit: TimeUnit): WireFormats.TimeUnit = unit match {
case TimeUnit.NANOSECONDS WireFormats.TimeUnit.NANOSECONDS
case TimeUnit.MICROSECONDS WireFormats.TimeUnit.MICROSECONDS
case TimeUnit.MILLISECONDS WireFormats.TimeUnit.MILLISECONDS
case TimeUnit.SECONDS WireFormats.TimeUnit.SECONDS
case TimeUnit.MINUTES WireFormats.TimeUnit.MINUTES
case TimeUnit.HOURS WireFormats.TimeUnit.HOURS
case TimeUnit.DAYS WireFormats.TimeUnit.DAYS
}
private def buildFiniteDuration(duration: FiniteDuration): WireFormats.FiniteDuration = {
WireFormats.FiniteDuration.newBuilder()
.setValue(duration.length)
.setUnit(timeUnitToWire(duration.unit))
.build()
}
private def buildAddressData(address: Address): WireFormats.AddressData = {
val builder = WireFormats.AddressData.newBuilder()
address match {
case Address(protocol, system, Some(host), Some(port))
builder.setProtocol(protocol)
builder.setSystem(system)
builder.setHostname(host)
builder.setPort(port)
builder.build()
case _ throw new IllegalArgumentException(s"Address [$address] could not be serialized: host or port missing.")
}
}
private val IdentifyManifest = "A"
private val ActorIdentityManifest = "B"
private val OptionManifest = "C"
@ -114,6 +255,18 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
private val RemoteWatcherHBManifest = "RWHB"
private val RemoteWatcherHBRespManifest = "RWHR"
private val ActorInitializationExceptionManifest = "AIEX"
private val LocalScopeManifest = "LS"
private val RemoteScopeManifest = "RS"
private val ConfigManifest = "CF"
private val FromConfigManifest = "FC"
private val DefaultResizerManifest = "DR"
private val BalancingPoolManifest = "ROBAP"
private val BroadcastPoolManifest = "ROBP"
private val RandomPoolManifest = "RORP"
private val RoundRobinPoolManifest = "RORRP"
private val ScatterGatherPoolManifest = "ROSGP"
private val TailChoppingPoolManifest = "ROTCP"
private val RemoteRouterConfigManifest = "RORRC"
private val fromBinaryMap = Map[String, Array[Byte] AnyRef](
IdentifyManifest deserializeIdentify,
@ -129,23 +282,48 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
KillManifest ((_) Kill),
RemoteWatcherHBManifest ((_) RemoteWatcher.Heartbeat),
RemoteWatcherHBRespManifest deserializeHeartbeatRsp,
ActorInitializationExceptionManifest deserializeActorInitializationException)
ActorInitializationExceptionManifest deserializeActorInitializationException,
LocalScopeManifest ((_) LocalScope),
RemoteScopeManifest deserializeRemoteScope,
ConfigManifest deserializeConfig,
FromConfigManifest deserializeFromConfig,
DefaultResizerManifest deserializeDefaultResizer,
BalancingPoolManifest deserializeBalancingPool,
BroadcastPoolManifest deserializeBroadcastPool,
RandomPoolManifest deserializeRandomPool,
RoundRobinPoolManifest deserializeRoundRobinPool,
ScatterGatherPoolManifest deserializeScatterGatherPool,
TailChoppingPoolManifest deserializeTailChoppingPool,
RemoteRouterConfigManifest deserializeRemoteRouterConfig
)
override def manifest(o: AnyRef): String =
o match {
case _: Identify IdentifyManifest
case _: ActorIdentity ActorIdentityManifest
case _: Option[Any] OptionManifest
case _: Optional[_] OptionalManifest
case _: ActorRef ActorRefManifest
case _: Status.Success StatusSuccessManifest
case _: Status.Failure StatusFailureManifest
case _: ActorInitializationException ActorInitializationExceptionManifest
case _: Throwable ThrowableManifest
case PoisonPill PoisonPillManifest
case Kill KillManifest
case RemoteWatcher.Heartbeat RemoteWatcherHBManifest
case _: RemoteWatcher.HeartbeatRsp RemoteWatcherHBRespManifest
case _: Identify IdentifyManifest
case _: ActorIdentity ActorIdentityManifest
case _: Option[Any] OptionManifest
case _: Optional[_] OptionalManifest
case _: ActorRef ActorRefManifest
case _: Status.Success StatusSuccessManifest
case _: Status.Failure StatusFailureManifest
case _: ActorInitializationException ActorInitializationExceptionManifest
case _: Throwable ThrowableManifest
case PoisonPill PoisonPillManifest
case Kill KillManifest
case RemoteWatcher.Heartbeat RemoteWatcherHBManifest
case _: RemoteWatcher.HeartbeatRsp RemoteWatcherHBRespManifest
case LocalScope LocalScopeManifest
case _: RemoteScope RemoteScopeManifest
case _: Config ConfigManifest
case _: FromConfig FromConfigManifest
case _: DefaultResizer DefaultResizerManifest
case _: BalancingPool BalancingPoolManifest
case _: BroadcastPool BroadcastPoolManifest
case _: RandomPool RandomPoolManifest
case _: RoundRobinPool RoundRobinPoolManifest
case _: ScatterGatherFirstCompletedPool ScatterGatherPoolManifest
case _: TailChoppingPool TailChoppingPoolManifest
case _: RemoteRouterConfig RemoteRouterConfigManifest
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
@ -224,4 +402,142 @@ class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerW
payloadSupport.deserializePayload(serializedEx.getCause).asInstanceOf[Throwable])
}
private def deserializeRemoteScope(bytes: Array[Byte]): RemoteScope = {
val rs = WireFormats.RemoteScope.parseFrom(bytes)
RemoteScope(
deserializeAddressData(rs.getNode)
)
}
private def deserializeConfig(bytes: Array[Byte]): Config = {
if (bytes.isEmpty) EmptyConfig
else ConfigFactory.parseString(new String(bytes, StandardCharsets.UTF_8))
}
private def deserializeFromConfig(bytes: Array[Byte]): FromConfig =
if (bytes.isEmpty) FromConfig
else {
val fc = WireFormats.FromConfig.parseFrom(bytes)
FromConfig(
resizer = if (fc.hasResizer) Some(payloadSupport.deserializePayload(fc.getResizer).asInstanceOf[Resizer]) else None,
routerDispatcher = if (fc.hasRouterDispatcher) fc.getRouterDispatcher else Dispatchers.DefaultDispatcherId
)
}
private def deserializeBalancingPool(bytes: Array[Byte]): BalancingPool = {
val bp = WireFormats.GenericRoutingPool.parseFrom(bytes)
BalancingPool(
nrOfInstances = bp.getNrOfInstances,
routerDispatcher = if (bp.hasRouterDispatcher) bp.getRouterDispatcher else Dispatchers.DefaultDispatcherId)
}
private def deserializeBroadcastPool(bytes: Array[Byte]): BroadcastPool = {
val bp = WireFormats.GenericRoutingPool.parseFrom(bytes)
BroadcastPool(
nrOfInstances = bp.getNrOfInstances,
resizer =
if (bp.hasResizer) Some(payloadSupport.deserializePayload(bp.getResizer).asInstanceOf[Resizer])
else None,
routerDispatcher = if (bp.hasRouterDispatcher) bp.getRouterDispatcher else Dispatchers.DefaultDispatcherId,
usePoolDispatcher = bp.getUsePoolDispatcher
)
}
private def deserializeRandomPool(bytes: Array[Byte]): RandomPool = {
val rp = WireFormats.GenericRoutingPool.parseFrom(bytes)
RandomPool(
nrOfInstances = rp.getNrOfInstances,
resizer =
if (rp.hasResizer) Some(payloadSupport.deserializePayload(rp.getResizer).asInstanceOf[Resizer])
else None,
routerDispatcher = if (rp.hasRouterDispatcher) rp.getRouterDispatcher else Dispatchers.DefaultDispatcherId,
usePoolDispatcher = rp.getUsePoolDispatcher
)
}
private def deserializeRoundRobinPool(bytes: Array[Byte]): RoundRobinPool = {
val rp = WireFormats.GenericRoutingPool.parseFrom(bytes)
RoundRobinPool(
nrOfInstances = rp.getNrOfInstances,
resizer =
if (rp.hasResizer) Some(payloadSupport.deserializePayload(rp.getResizer).asInstanceOf[Resizer])
else None,
routerDispatcher = if (rp.hasRouterDispatcher) rp.getRouterDispatcher else Dispatchers.DefaultDispatcherId,
usePoolDispatcher = rp.getUsePoolDispatcher
)
}
private def deserializeScatterGatherPool(bytes: Array[Byte]): ScatterGatherFirstCompletedPool = {
val sgp = WireFormats.ScatterGatherPool.parseFrom(bytes)
ScatterGatherFirstCompletedPool(
nrOfInstances = sgp.getGeneric.getNrOfInstances,
resizer =
if (sgp.getGeneric.hasResizer) Some(payloadSupport.deserializePayload(sgp.getGeneric.getResizer).asInstanceOf[Resizer])
else None,
within = deserializeFiniteDuration(sgp.getWithin),
routerDispatcher =
if (sgp.getGeneric.hasRouterDispatcher) sgp.getGeneric.getRouterDispatcher
else Dispatchers.DefaultDispatcherId
)
}
private def deserializeTailChoppingPool(bytes: Array[Byte]): TailChoppingPool = {
val tcp = WireFormats.TailChoppingPool.parseFrom(bytes)
TailChoppingPool(
nrOfInstances = tcp.getGeneric.getNrOfInstances,
resizer =
if (tcp.getGeneric.hasResizer) Some(payloadSupport.deserializePayload(tcp.getGeneric.getResizer).asInstanceOf[Resizer])
else None,
routerDispatcher = if (tcp.getGeneric.hasRouterDispatcher) tcp.getGeneric.getRouterDispatcher else Dispatchers.DefaultDispatcherId,
usePoolDispatcher = tcp.getGeneric.getUsePoolDispatcher,
within = deserializeFiniteDuration(tcp.getWithin),
interval = deserializeFiniteDuration(tcp.getInterval)
)
}
private def deserializeRemoteRouterConfig(bytes: Array[Byte]): RemoteRouterConfig = {
val rrc = WireFormats.RemoteRouterConfig.parseFrom(bytes)
RemoteRouterConfig(
local = payloadSupport.deserializePayload(rrc.getLocal).asInstanceOf[Pool],
nodes = rrc.getNodesList.asScala.map(deserializeAddressData)
)
}
private def deserializeDefaultResizer(bytes: Array[Byte]): DefaultResizer = {
val dr = WireFormats.DefaultResizer.parseFrom(bytes)
DefaultResizer(
lowerBound = dr.getLowerBound,
upperBound = dr.getUpperBound,
pressureThreshold = dr.getPressureThreshold,
rampupRate = dr.getRampupRate,
backoffThreshold = dr.getBackoffThreshold,
backoffRate = dr.getBackoffRate,
messagesPerResize = dr.getMessagesPerResize
)
}
private def deserializeTimeUnit(unit: WireFormats.TimeUnit): TimeUnit = unit match {
case WireFormats.TimeUnit.NANOSECONDS TimeUnit.NANOSECONDS
case WireFormats.TimeUnit.MICROSECONDS TimeUnit.MICROSECONDS
case WireFormats.TimeUnit.MILLISECONDS TimeUnit.MILLISECONDS
case WireFormats.TimeUnit.SECONDS TimeUnit.SECONDS
case WireFormats.TimeUnit.MINUTES TimeUnit.MINUTES
case WireFormats.TimeUnit.HOURS TimeUnit.HOURS
case WireFormats.TimeUnit.DAYS TimeUnit.DAYS
}
private def deserializeFiniteDuration(duration: WireFormats.FiniteDuration): FiniteDuration =
FiniteDuration(
duration.getValue,
deserializeTimeUnit(duration.getUnit)
)
private def deserializeAddressData(address: WireFormats.AddressData): Address = {
Address(
address.getProtocol,
address.getSystem,
address.getHostname,
address.getPort
)
}
}