=cls #17908 Protobuf serializer for cluster sharding
parent 9b6b43335f
commit 46811ca1ce
9 changed files with 6178 additions and 23 deletions
File diff suppressed because it is too large
@@ -0,0 +1,50 @@
/**
 * Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
 */

option java_package = "akka.cluster.sharding.protobuf.msg";
option optimize_for = SPEED;

message CoordinatorState {

  message ShardEntry {
    required string shardId = 1;
    required string regionRef = 2;
  }

  repeated ShardEntry shards = 1;
  repeated string regions = 2;
  repeated string regionProxies = 3;
  repeated string unallocatedShards = 4;
}

message ActorRefMessage {
  required string ref = 1;
}

message ShardIdMessage {
  required string shard = 1;
}

message ShardHomeAllocated {
  required string shard = 1;
  required string region = 2;
}

message ShardHome {
  required string shard = 1;
  required string region = 2;
}

message EntityState {
  repeated string entities = 1;
}

message EntityStarted {
  required string entityId = 1;
}

message EntityStopped {
  required string entityId = 1;
}
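The messages above are plain proto2 definitions, so the generated Java classes expose the usual builder API. A minimal sketch of exercising them directly, assuming protoc has generated them into the ClusterShardingMessages outer class under the java_package declared above (the same classes the serializer below imports as `sm`); the `ProtoRoundTrip` object and sample values are illustrative only:

import akka.cluster.sharding.protobuf.msg.ClusterShardingMessages

object ProtoRoundTrip extends App {
  // Build a ShardHomeAllocated protobuf message with the generated builder API.
  val allocated = ClusterShardingMessages.ShardHomeAllocated.newBuilder()
    .setShard("shard-1")
    .setRegion("akka.tcp://Sys@host:2552/system/sharding/MyType")
    .build()

  // toByteArray / parseFrom round-trip the wire format.
  val bytes = allocated.toByteArray
  val parsed = ClusterShardingMessages.ShardHomeAllocated.parseFrom(bytes)
  assert(parsed.getShard == "shard-1")
  assert(parsed.getRegion == allocated.getRegion)
}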
@@ -91,3 +91,17 @@ akka.cluster.sharding {
  use-dispatcher = ""
}
# //#sharding-ext-config

# Protobuf serializer for Cluster Sharding messages
akka.actor {
  serializers {
    akka-sharding = "akka.cluster.sharding.protobuf.ClusterShardingMessageSerializer"
  }
  serialization-bindings {
    "akka.cluster.sharding.ClusterShardingSerializable" = akka-sharding
  }
  serialization-identifiers {
    "akka.cluster.sharding.protobuf.ClusterShardingMessageSerializer" = 13
  }
}
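With this configuration in place, any message extending the ClusterShardingSerializable marker trait is routed to the akka-sharding serializer, registered under identifier 13. A minimal sketch of checking that binding through SerializationExtension, assuming an ActorSystem with the akka-cluster-sharding reference.conf on its classpath; the `SerializerBindingCheck` object and system name are made up for the example:

import akka.actor.ActorSystem
import akka.cluster.sharding.ClusterShardingSerializable
import akka.serialization.SerializationExtension

object SerializerBindingCheck extends App {
  val system = ActorSystem("BindingCheck")
  try {
    val serialization = SerializationExtension(system)
    // The serialization-bindings entry above maps ClusterShardingSerializable
    // to the akka-sharding serializer, registered with identifier 13.
    val serializer = serialization.serializerFor(classOf[ClusterShardingSerializable])
    println(s"${serializer.getClass.getName} has identifier ${serializer.identifier}")
  } finally system.terminate()
}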
@@ -0,0 +1,9 @@
/**
 * Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster.sharding

/**
 * Marker trait for remote messages and persistent events/snapshots with special serializer.
 */
trait ClusterShardingSerializable extends Serializable
@@ -36,7 +36,9 @@ private[akka] object Shard {
   /**
    * A case class which represents a state change for the Shard
    */
-  sealed trait StateChange { val entityId: EntityId }
+  sealed trait StateChange extends ClusterShardingSerializable {
+    val entityId: EntityId
+  }
 
   /**
    * `State` change for starting an entity in this `Shard`
@@ -55,8 +57,8 @@ private[akka] object Shard {
   /**
    * Persistent state of the Shard.
    */
-  @SerialVersionUID(1L) final case class State private (
-    entities: Set[EntityId] = Set.empty)
+  @SerialVersionUID(1L) final case class State private[akka] (
+    entities: Set[EntityId] = Set.empty) extends ClusterShardingSerializable
 
   /**
    * Factory method for the [[akka.actor.Props]] of the [[Shard]] actor.
@@ -159,11 +159,11 @@ object ShardCoordinator {
   /**
    * Messages sent to the coordinator
    */
-  sealed trait CoordinatorCommand
+  sealed trait CoordinatorCommand extends ClusterShardingSerializable
   /**
    * Messages sent from the coordinator
    */
-  sealed trait CoordinatorMessage
+  sealed trait CoordinatorMessage extends ClusterShardingSerializable
   /**
    * `ShardRegion` registers to `ShardCoordinator`, until it receives [[RegisterAck]].
    */
@@ -218,24 +218,13 @@ object ShardCoordinator {
    */
   @SerialVersionUID(1L) final case class ShardStopped(shard: ShardId) extends CoordinatorCommand
 
-  /**
-   * Result of `allocateShard` is piped to self with this message.
-   */
-  @SerialVersionUID(1L) final case class AllocateShardResult(
-    shard: ShardId, shardRegion: Option[ActorRef], getShardHomeSender: ActorRef) extends CoordinatorCommand
-
-  /**
-   * Result of `rebalance` is piped to self with this message.
-   */
-  @SerialVersionUID(1L) final case class RebalanceResult(shards: Set[ShardId]) extends CoordinatorCommand
-
   /**
    * `ShardRegion` requests full handoff to be able to shutdown gracefully.
    */
   @SerialVersionUID(1L) final case class GracefulShutdownReq(shardRegion: ActorRef) extends CoordinatorCommand
 
   // DomainEvents for the persistent state of the event sourced ShardCoordinator
-  sealed trait DomainEvent
+  sealed trait DomainEvent extends ClusterShardingSerializable
   @SerialVersionUID(1L) final case class ShardRegionRegistered(region: ActorRef) extends DomainEvent
   @SerialVersionUID(1L) final case class ShardRegionProxyRegistered(regionProxy: ActorRef) extends DomainEvent
   @SerialVersionUID(1L) final case class ShardRegionTerminated(region: ActorRef) extends DomainEvent
@@ -250,13 +239,13 @@ object ShardCoordinator {
   /**
    * Persistent state of the event sourced ShardCoordinator.
    */
-  @SerialVersionUID(1L) final case class State private (
+  @SerialVersionUID(1L) final case class State private[akka] (
     // region for each shard
     shards: Map[ShardId, ActorRef] = Map.empty,
     // shards for each region
     regions: Map[ActorRef, Vector[ShardId]] = Map.empty,
     regionProxies: Set[ActorRef] = Set.empty,
-    unallocatedShards: Set[ShardId] = Set.empty) {
+    unallocatedShards: Set[ShardId] = Set.empty) extends ClusterShardingSerializable {
 
     def updated(event: DomainEvent): State = event match {
       case ShardRegionRegistered(region) ⇒
@@ -309,6 +298,17 @@ object ShardCoordinator {
 
   private final case class DelayedShardRegionTerminated(region: ActorRef)
 
+  /**
+   * Result of `allocateShard` is piped to self with this message.
+   */
+  private final case class AllocateShardResult(
+    shard: ShardId, shardRegion: Option[ActorRef], getShardHomeSender: ActorRef)
+
+  /**
+   * Result of `rebalance` is piped to self with this message.
+   */
+  private final case class RebalanceResult(shards: Set[ShardId])
+
   /**
    * INTERNAL API. Rebalancing process is performed by this actor.
    * It sends `BeginHandOff` to all `ShardRegion` actors followed by
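The `State.updated` function shown above is what the event sourced coordinator uses to fold recovered DomainEvents back into its State. A rough sketch of that replay, assuming it lives in an `akka.*` package because State and the events are `private[akka]`; the `StateReplaySketch` object and package name are hypothetical:

package akka.cluster.sharding.check

import akka.actor.ActorRef
import akka.cluster.sharding.ShardCoordinator.Internal._

object StateReplaySketch {
  // Fold persisted DomainEvents into the coordinator State, as recovery does.
  // e.g. replay(List(ShardRegionRegistered(region), ShardHomeAllocated("a", region)))
  def replay(events: List[DomainEvent]): State =
    events.foldLeft(State()) { (state, evt) ⇒ state.updated(evt) }
}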
@@ -0,0 +1,276 @@
/**
 * Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster.sharding.protobuf

import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
import java.util.zip.GZIPInputStream
import java.util.zip.GZIPOutputStream

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.breakOut

import akka.actor.ActorRef
import akka.actor.ExtendedActorSystem
import akka.cluster.sharding.Shard
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.protobuf.msg.{ ClusterShardingMessages => sm }
import akka.serialization.BaseSerializer
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
import akka.serialization.SerializerWithStringManifest
import com.google.protobuf.MessageLite

/**
 * INTERNAL API: Protobuf serializer of ClusterSharding messages.
 */
private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSystem)
  extends SerializerWithStringManifest with BaseSerializer {
  import ShardCoordinator.Internal._
  import Shard.{ State ⇒ EntityState, EntityStarted, EntityStopped }

  private lazy val serialization = SerializationExtension(system)

  private final val BufferSize = 1024 * 4

  private val CoordinatorStateManifest = "AA"
  private val ShardRegionRegisteredManifest = "AB"
  private val ShardRegionProxyRegisteredManifest = "AC"
  private val ShardRegionTerminatedManifest = "AD"
  private val ShardRegionProxyTerminatedManifest = "AE"
  private val ShardHomeAllocatedManifest = "AF"
  private val ShardHomeDeallocatedManifest = "AG"

  private val RegisterManifest = "BA"
  private val RegisterProxyManifest = "BB"
  private val RegisterAckManifest = "BC"
  private val GetShardHomeManifest = "BD"
  private val ShardHomeManifest = "BE"
  private val HostShardManifest = "BF"
  private val ShardStartedManifest = "BG"
  private val BeginHandOffManifest = "BH"
  private val BeginHandOffAckManifest = "BI"
  private val HandOffManifest = "BJ"
  private val ShardStoppedManifest = "BK"
  private val GracefulShutdownReqManifest = "BL"

  private val EntityStateManifest = "CA"
  private val EntityStartedManifest = "CB"
  private val EntityStoppedManifest = "CD"

  private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef](
    EntityStateManifest -> entityStateFromBinary,
    EntityStartedManifest -> entityStartedFromBinary,
    EntityStoppedManifest -> entityStoppedFromBinary,

    CoordinatorStateManifest -> coordinatorStateFromBinary,
    ShardRegionRegisteredManifest -> { bytes ⇒ ShardRegionRegistered(actorRefMessageFromBinary(bytes)) },
    ShardRegionProxyRegisteredManifest -> { bytes ⇒ ShardRegionProxyRegistered(actorRefMessageFromBinary(bytes)) },
    ShardRegionTerminatedManifest -> { bytes ⇒ ShardRegionTerminated(actorRefMessageFromBinary(bytes)) },
    ShardRegionProxyTerminatedManifest -> { bytes ⇒ ShardRegionProxyTerminated(actorRefMessageFromBinary(bytes)) },
    ShardHomeAllocatedManifest -> shardHomeAllocatedFromBinary,
    ShardHomeDeallocatedManifest -> { bytes ⇒ ShardHomeDeallocated(shardIdMessageFromBinary(bytes)) },

    RegisterManifest -> { bytes ⇒ Register(actorRefMessageFromBinary(bytes)) },
    RegisterProxyManifest -> { bytes ⇒ RegisterProxy(actorRefMessageFromBinary(bytes)) },
    RegisterAckManifest -> { bytes ⇒ RegisterAck(actorRefMessageFromBinary(bytes)) },
    GetShardHomeManifest -> { bytes ⇒ GetShardHome(shardIdMessageFromBinary(bytes)) },
    ShardHomeManifest -> shardHomeFromBinary,
    HostShardManifest -> { bytes ⇒ HostShard(shardIdMessageFromBinary(bytes)) },
    ShardStartedManifest -> { bytes ⇒ ShardStarted(shardIdMessageFromBinary(bytes)) },
    BeginHandOffManifest -> { bytes ⇒ BeginHandOff(shardIdMessageFromBinary(bytes)) },
    BeginHandOffAckManifest -> { bytes ⇒ BeginHandOffAck(shardIdMessageFromBinary(bytes)) },
    HandOffManifest -> { bytes ⇒ HandOff(shardIdMessageFromBinary(bytes)) },
    ShardStoppedManifest -> { bytes ⇒ ShardStopped(shardIdMessageFromBinary(bytes)) },
    GracefulShutdownReqManifest -> { bytes ⇒ GracefulShutdownReq(actorRefMessageFromBinary(bytes)) })

  override def manifest(obj: AnyRef): String = obj match {
    case _: EntityState ⇒ EntityStateManifest
    case _: EntityStarted ⇒ EntityStartedManifest
    case _: EntityStopped ⇒ EntityStoppedManifest

    case _: State ⇒ CoordinatorStateManifest
    case _: ShardRegionRegistered ⇒ ShardRegionRegisteredManifest
    case _: ShardRegionProxyRegistered ⇒ ShardRegionProxyRegisteredManifest
    case _: ShardRegionTerminated ⇒ ShardRegionTerminatedManifest
    case _: ShardRegionProxyTerminated ⇒ ShardRegionProxyTerminatedManifest
    case _: ShardHomeAllocated ⇒ ShardHomeAllocatedManifest
    case _: ShardHomeDeallocated ⇒ ShardHomeDeallocatedManifest

    case _: Register ⇒ RegisterManifest
    case _: RegisterProxy ⇒ RegisterProxyManifest
    case _: RegisterAck ⇒ RegisterAckManifest
    case _: GetShardHome ⇒ GetShardHomeManifest
    case _: ShardHome ⇒ ShardHomeManifest
    case _: HostShard ⇒ HostShardManifest
    case _: ShardStarted ⇒ ShardStartedManifest
    case _: BeginHandOff ⇒ BeginHandOffManifest
    case _: BeginHandOffAck ⇒ BeginHandOffAckManifest
    case _: HandOff ⇒ HandOffManifest
    case _: ShardStopped ⇒ ShardStoppedManifest
    case _: GracefulShutdownReq ⇒ GracefulShutdownReqManifest
    case _ ⇒
      throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
  }

  override def toBinary(obj: AnyRef): Array[Byte] = obj match {
    case m: State ⇒ compress(coordinatorStateToProto(m))
    case ShardRegionRegistered(ref) ⇒ actorRefMessageToProto(ref).toByteArray
    case ShardRegionProxyRegistered(ref) ⇒ actorRefMessageToProto(ref).toByteArray
    case ShardRegionTerminated(ref) ⇒ actorRefMessageToProto(ref).toByteArray
    case ShardRegionProxyTerminated(ref) ⇒ actorRefMessageToProto(ref).toByteArray
    case m: ShardHomeAllocated ⇒ shardHomeAllocatedToProto(m).toByteArray
    case ShardHomeDeallocated(shardId) ⇒ shardIdMessageToProto(shardId).toByteArray

    case Register(ref) ⇒ actorRefMessageToProto(ref).toByteArray
    case RegisterProxy(ref) ⇒ actorRefMessageToProto(ref).toByteArray
    case RegisterAck(ref) ⇒ actorRefMessageToProto(ref).toByteArray
    case GetShardHome(shardId) ⇒ shardIdMessageToProto(shardId).toByteArray
    case m: ShardHome ⇒ shardHomeToProto(m).toByteArray
    case HostShard(shardId) ⇒ shardIdMessageToProto(shardId).toByteArray
    case ShardStarted(shardId) ⇒ shardIdMessageToProto(shardId).toByteArray
    case BeginHandOff(shardId) ⇒ shardIdMessageToProto(shardId).toByteArray
    case BeginHandOffAck(shardId) ⇒ shardIdMessageToProto(shardId).toByteArray
    case HandOff(shardId) ⇒ shardIdMessageToProto(shardId).toByteArray
    case ShardStopped(shardId) ⇒ shardIdMessageToProto(shardId).toByteArray
    case GracefulShutdownReq(ref) ⇒
      actorRefMessageToProto(ref).toByteArray

    case m: EntityState ⇒ entityStateToProto(m).toByteArray
    case m: EntityStarted ⇒ entityStartedToProto(m).toByteArray
    case m: EntityStopped ⇒ entityStoppedToProto(m).toByteArray
    case _ ⇒
      throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
  }

  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    fromBinaryMap.get(manifest) match {
      case Some(f) ⇒ f(bytes)
      case None ⇒ throw new IllegalArgumentException(
        s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
    }

  private def coordinatorStateToProto(state: State): sm.CoordinatorState = {
    val regions = state.regions.map {
      case (regionRef, _) ⇒ Serialization.serializedActorPath(regionRef)
    }.toVector.asJava

    val builder = sm.CoordinatorState.newBuilder()

    state.shards.foreach {
      case (shardId, regionRef) ⇒
        val b = sm.CoordinatorState.ShardEntry.newBuilder()
          .setShardId(shardId)
          .setRegionRef(Serialization.serializedActorPath(regionRef))
        builder.addShards(b)
    }
    state.regions.foreach {
      case (regionRef, _) ⇒ builder.addRegions(Serialization.serializedActorPath(regionRef))
    }
    state.regionProxies.foreach { ref ⇒ builder.addRegionProxies(Serialization.serializedActorPath(ref)) }
    state.unallocatedShards.foreach { builder.addUnallocatedShards }

    builder.build()
  }

  private def coordinatorStateFromBinary(bytes: Array[Byte]): State =
    coordinatorStateFromProto(sm.CoordinatorState.parseFrom(decompress(bytes)))

  private def coordinatorStateFromProto(state: sm.CoordinatorState): State = {
    val shards: Map[String, ActorRef] =
      state.getShardsList.asScala.toVector.map { entry ⇒
        entry.getShardId -> resolveActorRef(entry.getRegionRef)
      }(breakOut)

    val regionsZero: Map[ActorRef, Vector[String]] =
      state.getRegionsList.asScala.toVector.map(resolveActorRef(_) -> Vector.empty[String])(breakOut)
    val regions: Map[ActorRef, Vector[String]] =
      shards.foldLeft(regionsZero) { case (acc, (shardId, regionRef)) ⇒ acc.updated(regionRef, acc(regionRef) :+ shardId) }

    val proxies: Set[ActorRef] = state.getRegionProxiesList.asScala.map { resolveActorRef }(breakOut)
    val unallocatedShards: Set[String] = state.getUnallocatedShardsList.asScala.toSet

    State(shards, regions, proxies, unallocatedShards)
  }

  private def actorRefMessageToProto(ref: ActorRef): sm.ActorRefMessage =
    sm.ActorRefMessage.newBuilder().setRef(Serialization.serializedActorPath(ref)).build()

  private def actorRefMessageFromBinary(bytes: Array[Byte]): ActorRef =
    resolveActorRef(sm.ActorRefMessage.parseFrom(bytes).getRef)

  private def shardIdMessageToProto(shardId: String): sm.ShardIdMessage =
    sm.ShardIdMessage.newBuilder().setShard(shardId).build()

  private def shardIdMessageFromBinary(bytes: Array[Byte]): String =
    sm.ShardIdMessage.parseFrom(bytes).getShard

  private def shardHomeAllocatedToProto(evt: ShardHomeAllocated): sm.ShardHomeAllocated =
    sm.ShardHomeAllocated.newBuilder().setShard(evt.shard)
      .setRegion(Serialization.serializedActorPath(evt.region)).build()

  private def shardHomeAllocatedFromBinary(bytes: Array[Byte]): ShardHomeAllocated = {
    val m = sm.ShardHomeAllocated.parseFrom(bytes)
    ShardHomeAllocated(m.getShard, resolveActorRef(m.getRegion))
  }

  private def shardHomeToProto(m: ShardHome): sm.ShardHome =
    sm.ShardHome.newBuilder().setShard(m.shard)
      .setRegion(Serialization.serializedActorPath(m.ref)).build()

  private def shardHomeFromBinary(bytes: Array[Byte]): ShardHome = {
    val m = sm.ShardHome.parseFrom(bytes)
    ShardHome(m.getShard, resolveActorRef(m.getRegion))
  }

  private def entityStateToProto(m: EntityState): sm.EntityState = {
    val b = sm.EntityState.newBuilder()
    m.entities.foreach(b.addEntities)
    b.build()
  }

  private def entityStateFromBinary(bytes: Array[Byte]): EntityState =
    EntityState(sm.EntityState.parseFrom(bytes).getEntitiesList.asScala.toSet)

  private def entityStartedToProto(evt: EntityStarted): sm.EntityStarted =
    sm.EntityStarted.newBuilder().setEntityId(evt.entityId).build()

  private def entityStartedFromBinary(bytes: Array[Byte]): EntityStarted =
    EntityStarted(sm.EntityStarted.parseFrom(bytes).getEntityId)

  private def entityStoppedToProto(evt: EntityStopped): sm.EntityStopped =
    sm.EntityStopped.newBuilder().setEntityId(evt.entityId).build()

  private def entityStoppedFromBinary(bytes: Array[Byte]): EntityStopped =
    EntityStopped(sm.EntityStopped.parseFrom(bytes).getEntityId)

  private def resolveActorRef(path: String): ActorRef = {
    system.provider.resolveActorRef(path)
  }

  private def compress(msg: MessageLite): Array[Byte] = {
    val bos = new ByteArrayOutputStream(BufferSize)
    val zip = new GZIPOutputStream(bos)
    msg.writeTo(zip)
    zip.close()
    bos.toByteArray
  }

  private def decompress(bytes: Array[Byte]): Array[Byte] = {
    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
    val out = new ByteArrayOutputStream()
    val buffer = new Array[Byte](BufferSize)

    @tailrec def readChunk(): Unit = in.read(buffer) match {
      case -1 ⇒ ()
      case n ⇒
        out.write(buffer, 0, n)
        readChunk()
    }

    readChunk()
    out.toByteArray
  }

}
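In normal operation the serializer is not called directly: remoting and persistence go through Akka's Serialization facade, which stores the string manifest next to the bytes and later hands it back to `fromBinary` so the right entry in `fromBinaryMap` is chosen. A minimal round-trip sketch, assuming an `akka.*` package (the sharding messages are `private[akka]`) and a plain local ActorSystem; the `ManifestRoundTrip` object and actor name are illustrative:

package akka.cluster.sharding.check

import akka.actor.{ ActorSystem, Props }
import akka.cluster.sharding.ShardCoordinator.Internal.ShardHomeAllocated
import akka.serialization.{ SerializationExtension, SerializerWithStringManifest }

object ManifestRoundTrip extends App {
  val system = ActorSystem("RoundTrip")
  try {
    val region = system.actorOf(Props.empty, "region1")
    val evt = ShardHomeAllocated("shard-1", region)

    // The bindings in reference.conf resolve this to ClusterShardingMessageSerializer.
    val serializer = SerializationExtension(system)
      .findSerializerFor(evt).asInstanceOf[SerializerWithStringManifest]

    // The manifest ("AF" for ShardHomeAllocated) travels with the bytes and selects
    // the deserializer from fromBinaryMap on the receiving side.
    val manifest = serializer.manifest(evt)
    val bytes = serializer.toBinary(evt)
    assert(serializer.fromBinary(bytes, manifest) == evt)
  } finally system.terminate()
}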
@@ -0,0 +1,73 @@
/**
 * Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.cluster.sharding.protobuf

import akka.actor.{ ExtendedActorSystem, Address }
import akka.testkit.AkkaSpec
import akka.actor.Props
import akka.cluster.sharding.ShardCoordinator
import akka.cluster.sharding.Shard

class ClusterShardingMessageSerializerSpec extends AkkaSpec {
  import ShardCoordinator.Internal._

  val serializer = new ClusterShardingMessageSerializer(system.asInstanceOf[ExtendedActorSystem])

  val region1 = system.actorOf(Props.empty, "region1")
  val region2 = system.actorOf(Props.empty, "region2")
  val region3 = system.actorOf(Props.empty, "region3")
  val regionProxy1 = system.actorOf(Props.empty, "regionProxy1")
  val regionProxy2 = system.actorOf(Props.empty, "regionProxy2")

  def checkSerialization(obj: AnyRef): Unit = {
    val blob = serializer.toBinary(obj)
    val ref = serializer.fromBinary(blob, serializer.manifest(obj))
    ref should ===(obj)
  }

  "ClusterShardingMessageSerializer" must {

    "be able to serialize ShardCoordinator snapshot State" in {
      val state = State(
        shards = Map("a" -> region1, "b" -> region2, "c" -> region2),
        regions = Map(region1 -> Vector("a"), region2 -> Vector("b", "c"), region3 -> Vector.empty[String]),
        regionProxies = Set(regionProxy1, regionProxy2),
        unallocatedShards = Set("d"))
      checkSerialization(state)
    }

    "be able to serialize ShardCoordinator domain events" in {
      checkSerialization(ShardRegionRegistered(region1))
      checkSerialization(ShardRegionProxyRegistered(regionProxy1))
      checkSerialization(ShardRegionTerminated(region1))
      checkSerialization(ShardRegionProxyTerminated(regionProxy1))
      checkSerialization(ShardHomeAllocated("a", region1))
      checkSerialization(ShardHomeDeallocated("a"))
    }

    "be able to serialize ShardCoordinator remote messages" in {
      checkSerialization(Register(region1))
      checkSerialization(RegisterProxy(regionProxy1))
      checkSerialization(RegisterAck(region1))
      checkSerialization(GetShardHome("a"))
      checkSerialization(ShardHome("a", region1))
      checkSerialization(HostShard("a"))
      checkSerialization(ShardStarted("a"))
      checkSerialization(BeginHandOff("a"))
      checkSerialization(BeginHandOffAck("a"))
      checkSerialization(HandOff("a"))
      checkSerialization(ShardStopped("a"))
      checkSerialization(GracefulShutdownReq(region1))
    }

    "be able to serialize PersistentShard snapshot state" in {
      checkSerialization(Shard.State(Set("e1", "e2", "e3")))
    }

    "be able to serialize PersistentShard domain events" in {
      checkSerialization(Shard.EntityStarted("e1"))
      checkSerialization(Shard.EntityStopped("e1"))
    }
  }
}
@@ -30,9 +30,9 @@ import scala.collection.immutable.TreeMap
 import akka.serialization.SerializerWithStringManifest
 
 /**
- * Protobuf serializer of DistributedPubSubMediator messages.
+ * INTERNAL API: Protobuf serializer of DistributedPubSubMediator messages.
  */
-class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem)
+private[akka] class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem)
   extends SerializerWithStringManifest with BaseSerializer {
 
   private lazy val serialization = SerializationExtension(system)

@@ -79,7 +79,7 @@ class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem)
       s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
   }
 
-  def compress(msg: MessageLite): Array[Byte] = {
+  private def compress(msg: MessageLite): Array[Byte] = {
     val bos = new ByteArrayOutputStream(BufferSize)
     val zip = new GZIPOutputStream(bos)
     msg.writeTo(zip)

@@ -87,7 +87,7 @@ class DistributedPubSubMessageSerializer(val system: ExtendedActorSystem)
     bos.toByteArray
   }
 
-  def decompress(bytes: Array[Byte]): Array[Byte] = {
+  private def decompress(bytes: Array[Byte]): Array[Byte] = {
     val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
     val out = new ByteArrayOutputStream()
     val buffer = new Array[Byte](BufferSize)