diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf index ed792f7235..a933cd5837 100644 --- a/akka-cluster-tools/src/main/resources/reference.conf +++ b/akka-cluster-tools/src/main/resources/reference.conf @@ -153,3 +153,16 @@ akka.cluster.singleton-proxy { buffer-size = 1000 } # //#singleton-proxy-config + +# Serializer for cluster ClusterSingleton messages +akka.actor { + serializers { + akka-singleton = "akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer" + } + serialization-bindings { + "akka.cluster.singleton.ClusterSingletonMessage" = akka-singleton + } + serialization-identifiers { + "akka.cluster.singleton.protobuf.ClusterSingletonMessageSerializer" = 14 + } +} diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 366c4f3e55..8d4cd3c8d7 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -111,6 +111,11 @@ final class ClusterSingletonManagerSettings( new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval) } +/** + * Marker trait for remote messages with special serializer. + */ +sealed trait ClusterSingletonMessage extends Serializable + object ClusterSingletonManager { /** @@ -136,25 +141,25 @@ object ClusterSingletonManager { /** * INTERNAL API */ - private object Internal { + private[akka] object Internal { /** * Sent from new oldest to previous oldest to initiate the * hand-over process. `HandOverInProgress` and `HandOverDone` * are expected replies. */ - case object HandOverToMe + case object HandOverToMe extends ClusterSingletonMessage /** * Confirmation by the previous oldest that the hand * over process, shut down of the singleton actor, has * started. */ - case object HandOverInProgress + case object HandOverInProgress extends ClusterSingletonMessage /** * Confirmation by the previous oldest that the singleton * actor has been terminated and the hand-over process is * completed. */ - case object HandOverDone + case object HandOverDone extends ClusterSingletonMessage /** * Sent from from previous oldest to new oldest to * initiate the normal hand-over process. @@ -162,7 +167,7 @@ object ClusterSingletonManager { * oldest immediately, without knowing who was previous * oldest. */ - case object TakeOverFromMe + case object TakeOverFromMe extends ClusterSingletonMessage final case class HandOverRetry(count: Int) final case class TakeOverRetry(count: Int) diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/protobuf/ClusterSingletonMessageSerializer.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/protobuf/ClusterSingletonMessageSerializer.scala new file mode 100644 index 0000000000..63e59a8f17 --- /dev/null +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/protobuf/ClusterSingletonMessageSerializer.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.cluster.singleton.protobuf + +import akka.actor.ExtendedActorSystem +import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverDone +import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverInProgress +import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverToMe +import akka.cluster.singleton.ClusterSingletonManager.Internal.TakeOverFromMe +import akka.serialization.BaseSerializer +import akka.serialization.SerializationExtension +import akka.serialization.SerializerWithStringManifest + +/** + * INTERNAL API: Serializer of ClusterSingleton messages. + * It is actually not using protobuf, but if we add more messages to + * the ClusterSingleton we want to make protobuf representations of them. + */ +private[akka] class ClusterSingletonMessageSerializer(val system: ExtendedActorSystem) + extends SerializerWithStringManifest with BaseSerializer { + + private lazy val serialization = SerializationExtension(system) + + private val HandOverToMeManifest = "A" + private val HandOverInProgressManifest = "B" + private val HandOverDoneManifest = "C" + private val TakeOverFromMeManifest = "D" + + private val emptyByteArray = Array.empty[Byte] + + private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte] ⇒ AnyRef]( + HandOverToMeManifest -> { _ ⇒ HandOverToMe }, + HandOverInProgressManifest -> { _ ⇒ HandOverInProgress }, + HandOverDoneManifest -> { _ ⇒ HandOverDone }, + TakeOverFromMeManifest -> { _ ⇒ TakeOverFromMe }) + + override def manifest(obj: AnyRef): String = obj match { + case HandOverToMe ⇒ HandOverToMeManifest + case HandOverInProgress ⇒ HandOverInProgressManifest + case HandOverDone ⇒ HandOverDoneManifest + case TakeOverFromMe ⇒ TakeOverFromMeManifest + case _ ⇒ + throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") + } + + override def toBinary(obj: AnyRef): Array[Byte] = obj match { + case HandOverToMe ⇒ emptyByteArray + case HandOverInProgress ⇒ emptyByteArray + case HandOverDone ⇒ emptyByteArray + case TakeOverFromMe ⇒ emptyByteArray + case _ ⇒ + throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") + } + + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = + fromBinaryMap.get(manifest) match { + case Some(f) ⇒ f(bytes) + case None ⇒ throw new IllegalArgumentException( + s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") + } + +} diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/protobuf/ClusterSingletonMessageSerializerSpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/protobuf/ClusterSingletonMessageSerializerSpec.scala new file mode 100644 index 0000000000..4d87caa7b2 --- /dev/null +++ b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/protobuf/ClusterSingletonMessageSerializerSpec.scala @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.cluster.singleton.protobuf + +import akka.actor.ExtendedActorSystem +import akka.testkit.AkkaSpec +import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverDone +import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverInProgress +import akka.cluster.singleton.ClusterSingletonManager.Internal.HandOverToMe +import akka.cluster.singleton.ClusterSingletonManager.Internal.TakeOverFromMe + +class ClusterSingletonMessageSerializerSpec extends AkkaSpec { + + val serializer = new ClusterSingletonMessageSerializer(system.asInstanceOf[ExtendedActorSystem]) + + def checkSerialization(obj: AnyRef): Unit = { + val blob = serializer.toBinary(obj) + val ref = serializer.fromBinary(blob, serializer.manifest(obj)) + ref should ===(obj) + } + + "ClusterSingletonMessages" must { + + "be serializable" in { + checkSerialization(HandOverDone) + checkSerialization(HandOverInProgress) + checkSerialization(HandOverToMe) + checkSerialization(TakeOverFromMe) + } + } +}