Merge pull request #25625 from btomala/feature/25348_missing_serializers_for_shard_region
Protobuf serializers for ShardRegionStats
This commit is contained in:
commit
1b8da91b45
4 changed files with 1306 additions and 14 deletions
File diff suppressed because it is too large
Load diff
|
|
@ -53,6 +53,15 @@ message ShardStats {
|
|||
required int32 entityCount = 2;
|
||||
}
|
||||
|
||||
message ShardRegionStats {
|
||||
repeated MapFieldEntry stats = 1;
|
||||
}
|
||||
|
||||
message MapFieldEntry {
|
||||
optional string key = 1;
|
||||
optional int32 value = 2;
|
||||
}
|
||||
|
||||
message StartEntity {
|
||||
required string entityId = 1;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import akka.serialization.SerializerWithStringManifest
|
|||
import akka.protobuf.MessageLite
|
||||
import java.io.NotSerializableException
|
||||
|
||||
import akka.cluster.sharding.ShardRegion.{ StartEntity, StartEntityAck }
|
||||
import akka.cluster.sharding.ShardRegion._
|
||||
|
||||
/**
|
||||
* INTERNAL API: Protobuf serializer of ClusterSharding messages.
|
||||
|
|
@ -65,6 +65,8 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
|
|||
|
||||
private val GetShardStatsManifest = "DA"
|
||||
private val ShardStatsManifest = "DB"
|
||||
private val GetShardRegionStatsManifest = "DC"
|
||||
private val ShardRegionStatsManifest = "DD"
|
||||
|
||||
private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef](
|
||||
EntityStateManifest → entityStateFromBinary,
|
||||
|
|
@ -94,6 +96,8 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
|
|||
|
||||
GetShardStatsManifest → { bytes ⇒ GetShardStats },
|
||||
ShardStatsManifest → { bytes ⇒ shardStatsFromBinary(bytes) },
|
||||
GetShardRegionStatsManifest → { bytes ⇒ GetShardRegionStats },
|
||||
ShardRegionStatsManifest → { bytes ⇒ shardRegionStatsFromBinary(bytes) },
|
||||
|
||||
StartEntityManifest → { startEntityFromBinary(_) },
|
||||
StartEntityAckManifest → { startEntityAckFromBinary(_) }
|
||||
|
|
@ -130,6 +134,8 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
|
|||
|
||||
case GetShardStats ⇒ GetShardStatsManifest
|
||||
case _: ShardStats ⇒ ShardStatsManifest
|
||||
case GetShardRegionStats ⇒ GetShardRegionStatsManifest
|
||||
case _: ShardRegionStats ⇒ ShardRegionStatsManifest
|
||||
case _ ⇒
|
||||
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
|
||||
}
|
||||
|
|
@ -157,15 +163,17 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
|
|||
case GracefulShutdownReq(ref) ⇒
|
||||
actorRefMessageToProto(ref).toByteArray
|
||||
|
||||
case m: EntityState ⇒ entityStateToProto(m).toByteArray
|
||||
case m: EntityStarted ⇒ entityStartedToProto(m).toByteArray
|
||||
case m: EntityStopped ⇒ entityStoppedToProto(m).toByteArray
|
||||
case m: EntityState ⇒ entityStateToProto(m).toByteArray
|
||||
case m: EntityStarted ⇒ entityStartedToProto(m).toByteArray
|
||||
case m: EntityStopped ⇒ entityStoppedToProto(m).toByteArray
|
||||
|
||||
case s: StartEntity ⇒ startEntityToByteArray(s)
|
||||
case s: StartEntityAck ⇒ startEntityAckToByteArray(s)
|
||||
case s: StartEntity ⇒ startEntityToByteArray(s)
|
||||
case s: StartEntityAck ⇒ startEntityAckToByteArray(s)
|
||||
|
||||
case GetShardStats ⇒ Array.emptyByteArray
|
||||
case m: ShardStats ⇒ shardStatsToProto(m).toByteArray
|
||||
case GetShardStats ⇒ Array.emptyByteArray
|
||||
case m: ShardStats ⇒ shardStatsToProto(m).toByteArray
|
||||
case GetShardRegionStats ⇒ Array.emptyByteArray
|
||||
case m: ShardRegionStats ⇒ shardRegionStatsToProto(m).toByteArray
|
||||
|
||||
case _ ⇒
|
||||
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
|
||||
|
|
@ -280,6 +288,21 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
|
|||
ShardStats(parsed.getShard, parsed.getEntityCount)
|
||||
}
|
||||
|
||||
private def shardRegionStatsToProto(evt: ShardRegionStats): sm.ShardRegionStats = {
|
||||
val b = sm.ShardRegionStats.newBuilder()
|
||||
evt.stats.foreach {
|
||||
case (sid, no) ⇒
|
||||
b.addStats(sm.MapFieldEntry.newBuilder().setKey(sid).setValue(no).build())
|
||||
}
|
||||
b.build()
|
||||
}
|
||||
|
||||
private def shardRegionStatsFromBinary(bytes: Array[Byte]): ShardRegionStats = {
|
||||
val parsed = sm.ShardRegionStats.parseFrom(bytes)
|
||||
val stats: Map[String, Int] = parsed.getStatsList.asScala.map(e ⇒ e.getKey -> e.getValue)(breakOut)
|
||||
ShardRegionStats(stats)
|
||||
}
|
||||
|
||||
private def startEntityToByteArray(s: StartEntity): Array[Byte] = {
|
||||
val builder = sm.StartEntity.newBuilder()
|
||||
builder.setEntityId(s.entityId)
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package akka.cluster.sharding.protobuf
|
|||
import akka.actor.ExtendedActorSystem
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.Props
|
||||
import akka.cluster.sharding.ShardRegion.ShardId
|
||||
import akka.cluster.sharding.{ Shard, ShardCoordinator, ShardRegion }
|
||||
|
||||
class ClusterShardingMessageSerializerSpec extends AkkaSpec {
|
||||
|
|
@ -78,6 +79,15 @@ class ClusterShardingMessageSerializerSpec extends AkkaSpec {
|
|||
checkSerialization(Shard.ShardStats("a", 23))
|
||||
}
|
||||
|
||||
"be able to serialize GetShardRegionStats" in {
|
||||
checkSerialization(ShardRegion.GetShardRegionStats)
|
||||
}
|
||||
|
||||
"be able to serialize ShardRegionStats" in {
|
||||
checkSerialization(ShardRegion.ShardRegionStats(Map.empty[ShardId, Int]))
|
||||
checkSerialization(ShardRegion.ShardRegionStats(Map[ShardId, Int]("a" -> 23)))
|
||||
}
|
||||
|
||||
"be able to serialize StartEntity" in {
|
||||
checkSerialization(ShardRegion.StartEntity("42"))
|
||||
checkSerialization(ShardRegion.StartEntityAck("13", "37"))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue