=clt #18232 Serializer for ClusterSingleton
This commit is contained in:
parent
7693f1e5d3
commit
33e05ea59a
4 changed files with 118 additions and 5 deletions
|
|
@ -153,3 +153,16 @@ akka.cluster.singleton-proxy {
|
|||
buffer-size = 1000
|
||||
}
|
||||
# //#singleton-proxy-config
|
||||
|
||||
# Serializer for cluster ClusterSingleton messages
|
||||
akka.actor {
|
||||
serializers {
|
||||
akka-singleton = "akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer"
|
||||
}
|
||||
serialization-bindings {
|
||||
"akka.cluster.singleton.ClusterSingletonMessage" = akka-singleton
|
||||
}
|
||||
serialization-identifiers {
|
||||
"akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer" = 14
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -111,6 +111,11 @@ final class ClusterSingletonManagerSettings(
|
|||
new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval)
|
||||
}
|
||||
|
||||
/**
|
||||
* Marker trait for remote messages with special serializer.
|
||||
*/
|
||||
sealed trait ClusterSingletonMessage extends Serializable
|
||||
|
||||
object ClusterSingletonManager {
|
||||
|
||||
/**
|
||||
|
|
@ -136,25 +141,25 @@ object ClusterSingletonManager {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private object Internal {
|
||||
private[akka] object Internal {
|
||||
/**
|
||||
* Sent from new oldest to previous oldest to initiate the
|
||||
* hand-over process. `HandOverInProgress` and `HandOverDone`
|
||||
* are expected replies.
|
||||
*/
|
||||
case object HandOverToMe
|
||||
case object HandOverToMe extends ClusterSingletonMessage
|
||||
/**
|
||||
* Confirmation by the previous oldest that the hand
|
||||
* over process, shut down of the singleton actor, has
|
||||
* started.
|
||||
*/
|
||||
case object HandOverInProgress
|
||||
case object HandOverInProgress extends ClusterSingletonMessage
|
||||
/**
|
||||
* Confirmation by the previous oldest that the singleton
|
||||
* actor has been terminated and the hand-over process is
|
||||
* completed.
|
||||
*/
|
||||
case object HandOverDone
|
||||
case object HandOverDone extends ClusterSingletonMessage
|
||||
/**
|
||||
* Sent from from previous oldest to new oldest to
|
||||
* initiate the normal hand-over process.
|
||||
|
|
@ -162,7 +167,7 @@ object ClusterSingletonManager {
|
|||
* oldest immediately, without knowing who was previous
|
||||
* oldest.
|
||||
*/
|
||||
case object TakeOverFromMe
|
||||
case object TakeOverFromMe extends ClusterSingletonMessage
|
||||
|
||||
final case class HandOverRetry(count: Int)
|
||||
final case class TakeOverRetry(count: Int)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,63 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster.singleton.protobuf
|
||||
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverDone
|
||||
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverInProgress
|
||||
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverToMe
|
||||
import akka.cluster.singleton.ClusterSingletonManager.Internal.TakeOverFromMe
|
||||
import akka.serialization.BaseSerializer
|
||||
import akka.serialization.SerializationExtension
|
||||
import akka.serialization.SerializerWithStringManifest
|
||||
|
||||
/**
|
||||
* INTERNAL API: Serializer of ClusterSingleton messages.
|
||||
* It is actually not using protobuf, but if we add more messages to
|
||||
* the ClusterSingleton we want to make protobuf representations of them.
|
||||
*/
|
||||
private[akka] class ClusterSingletonMessageSerializer(val system: ExtendedActorSystem)
|
||||
extends SerializerWithStringManifest with BaseSerializer {
|
||||
|
||||
private lazy val serialization = SerializationExtension(system)
|
||||
|
||||
private val HandOverToMeManifest = "A"
|
||||
private val HandOverInProgressManifest = "B"
|
||||
private val HandOverDoneManifest = "C"
|
||||
private val TakeOverFromMeManifest = "D"
|
||||
|
||||
private val emptyByteArray = Array.empty[Byte]
|
||||
|
||||
private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef](
|
||||
HandOverToMeManifest -> { _ ⇒ HandOverToMe },
|
||||
HandOverInProgressManifest -> { _ ⇒ HandOverInProgress },
|
||||
HandOverDoneManifest -> { _ ⇒ HandOverDone },
|
||||
TakeOverFromMeManifest -> { _ ⇒ TakeOverFromMe })
|
||||
|
||||
override def manifest(obj: AnyRef): String = obj match {
|
||||
case HandOverToMe ⇒ HandOverToMeManifest
|
||||
case HandOverInProgress ⇒ HandOverInProgressManifest
|
||||
case HandOverDone ⇒ HandOverDoneManifest
|
||||
case TakeOverFromMe ⇒ TakeOverFromMeManifest
|
||||
case _ ⇒
|
||||
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
|
||||
}
|
||||
|
||||
override def toBinary(obj: AnyRef): Array[Byte] = obj match {
|
||||
case HandOverToMe ⇒ emptyByteArray
|
||||
case HandOverInProgress ⇒ emptyByteArray
|
||||
case HandOverDone ⇒ emptyByteArray
|
||||
case TakeOverFromMe ⇒ emptyByteArray
|
||||
case _ ⇒
|
||||
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
|
||||
}
|
||||
|
||||
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
|
||||
fromBinaryMap.get(manifest) match {
|
||||
case Some(f) ⇒ f(bytes)
|
||||
case None ⇒ throw new IllegalArgumentException(
|
||||
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,32 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster.singleton.protobuf
|
||||
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverDone
|
||||
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverInProgress
|
||||
import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverToMe
|
||||
import akka.cluster.singleton.ClusterSingletonManager.Internal.TakeOverFromMe
|
||||
|
||||
class ClusterSingletonMessageSerializerSpec extends AkkaSpec {
|
||||
|
||||
val serializer = new ClusterSingletonMessageSerializer(system.asInstanceOf[ExtendedActorSystem])
|
||||
|
||||
def checkSerialization(obj: AnyRef): Unit = {
|
||||
val blob = serializer.toBinary(obj)
|
||||
val ref = serializer.fromBinary(blob, serializer.manifest(obj))
|
||||
ref should ===(obj)
|
||||
}
|
||||
|
||||
"ClusterSingletonMessages" must {
|
||||
|
||||
"be serializable" in {
|
||||
checkSerialization(HandOverDone)
|
||||
checkSerialization(HandOverInProgress)
|
||||
checkSerialization(HandOverToMe)
|
||||
checkSerialization(TakeOverFromMe)
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue