diff --git a/akka-contrib/docs/cluster-singleton.rst b/akka-contrib/docs/cluster-singleton.rst index 7af56932c3..da6030424e 100644 --- a/akka-contrib/docs/cluster-singleton.rst +++ b/akka-contrib/docs/cluster-singleton.rst @@ -19,8 +19,8 @@ such as single-point of bottleneck. Single-point of failure is also a relevant c but for some cases this feature takes care of that by making sure that another singleton instance will eventually be started. -The cluster singleton is implemented by ``akka.contrib.pattern.ClusterSingletonManager``. -It manages singleton actor instance among all cluster nodes or a group of nodes tagged with +The cluster singleton pattern is implemented by ``akka.contrib.pattern.ClusterSingletonManager``. +It manages one singleton actor instance among all cluster nodes or a group of nodes tagged with a specific role. ``ClusterSingletonManager`` is an actor that is supposed to be started on all nodes, or all nodes with specified role, in the cluster. The actual singleton actor is started by the ``ClusterSingletonManager`` on the oldest node by creating a child actor from @@ -37,11 +37,16 @@ take over and a new singleton actor is created. For these failure scenarios ther not be a graceful hand-over, but more than one active singletons is prevented by all reasonable means. Some corner cases are eventually resolved by configurable timeouts. -You access the singleton actor with ``actorSelection`` using the names you have -specified when creating the ClusterSingletonManager. You can subscribe to -``akka.cluster.ClusterEvent.MemberEvent`` and sort the members by age -(``Member#isOlderThan``) to keep track of oldest member. -Alternatively the singleton actor may broadcast its existence when it is started. +You can access the singleton actor by using the provided ``akka.contrib.pattern.ClusterSingletonProxy``, +which will route all messages to the current instance of the singleton. The proxy will keep track of +the oldest node in the cluster and resolve the singleton's ``ActorRef`` by explicitly sending the +singleton's ``actorSelection`` the ``akka.actor.Identify`` message and waiting for it to reply. +This is performed periodically if the singleton doesn't reply within a certain (configurable) time. +Given the implementation, there might be periods of time during which the ``ActorRef`` is unavailable, +e.g., when a node leaves the cluster. In these cases, the proxy will stash away all messages until it +is able to identify the singleton. It's worth noting that messages can always be lost because of the +distributed nature of these actors. As always, additional logic should be implemented in the singleton +(acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery. An Example ---------- @@ -84,26 +89,20 @@ Here is how the singleton actor handles the ``terminationMessage`` in this examp .. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#consumer-end -With the names given above the path of singleton actor can be constructed by subscribing to -``MemberEvent`` cluster event and sort the members by age to keep track of oldest member. +Note that you can send back current state to the ``ClusterSingletonManager`` before terminating. +This message will be sent over to the ``ClusterSingletonManager`` at the new oldest node and it +will be passed to the ``singletonProps`` factory when creating the new singleton instance. + +With the names given above, access to the singleton can be obtained from any cluster node using a properly +configured proxy. In Scala: -.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#singleton-proxy +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala#create-singleton-proxy In Java: -.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterSingletonManagerTest.java#singleton-proxy - -The checks of ``role`` can be omitted if you don't limit the singleton to the group of members -tagged with a specific role. - -Note that the hand-over might still be in progress and the singleton actor might not be started yet -when you receive the member event. - -A nice alternative to the above proxy is to use :ref:`distributed-pub-sub`. Let the singleton -actor register itself to the mediator with ``DistributedPubSubMediator.Put`` message when it is -started. Send messages to the singleton actor via the mediator with ``DistributedPubSubMediator.SendToAll``. +.. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterSingletonManagerTest.java#create-singleton-proxy A more comprehensive sample is available in the `Typesafe Activator `_ tutorial named `Distributed workers with Akka and Scala! `_ diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonProxy.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonProxy.scala new file mode 100644 index 0000000000..b328e9206e --- /dev/null +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonProxy.scala @@ -0,0 +1,208 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package akka.contrib.pattern + +import akka.actor._ +import akka.cluster.{ MemberStatus, Cluster, Member } +import scala.collection.immutable +import akka.cluster.ClusterEvent._ +import akka.cluster.ClusterEvent.MemberRemoved +import akka.cluster.ClusterEvent.MemberUp +import akka.actor.RootActorPath +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ClusterEvent.MemberExited +import scala.concurrent.duration._ +import scala.language.postfixOps + +object ClusterSingletonProxy { + /** + * Scala API: Factory method for `ClusterSingletonProxy` [[akka.actor.Props]]. + * + * @param singletonPath The logical path of the singleton, i.e., /user/singletonManager/singleton. + * @param role The role of the cluster nodes where the singleton can be deployed. If None, then any node will do. + * @param singletonIdentificationInterval Interval at which the proxy will try to resolve the singleton instance. + * @return The singleton proxy Props. + */ + def props(singletonPath: String, role: Option[String], singletonIdentificationInterval: FiniteDuration = 1.second): Props = Props(classOf[ClusterSingletonProxy], singletonPath, role, singletonIdentificationInterval) + + /** + * Java API: Factory method for `ClusterSingletonProxy` [[akka.actor.Props]]. + * + * @param singletonPath The logical path of the singleton, i.e., /user/singletonManager/singleton. + * @param role The role of the cluster nodes where the singleton can be deployed. If null, then any node will do. + * @param singletonIdentificationInterval Interval at which the proxy will try to resolve the singleton instance. + * @return The singleton proxy Props. + */ + def props(singletonPath: String, role: String, singletonIdentificationInterval: FiniteDuration): Props = + props(singletonPath, roleOption(role), singletonIdentificationInterval) + + /** + * Java API: Factory method for `ClusterSingletonProxy` [[akka.actor.Props]]. The interval at which the proxy will try + * to resolve the singleton instance is set to 1 second. + * + * @param singletonPath The logical path of the singleton, i.e., /user/singletonManager/singleton. + * @param role The role of the cluster nodes where the singleton can be deployed. If null, then any node will do. + * @return The singleton proxy Props. + */ + def defaultProps(singletonPath: String, role: String): Props = props(singletonPath, role, 1 second) + + private def roleOption(role: String): Option[String] = role match { + case null | "" ⇒ None + case _ ⇒ Some(role) + } + + private case object TryToIdentifySingleton + +} + +/** + * The `ClusterSingletonProxy` works together with the [[akka.contrib.pattern.ClusterSingletonManager]] to provide a + * distributed proxy to the singleton actor. + * + * The proxy can be started on every node where the singleton needs to be reached and used as if it were the singleton + * itself. It will then act as a router to the currently running singleton instance. If the singleton is not currently + * available, e.g., during hand off or startup, the proxy will stash the messages sent to the singleton and then unstash + * them when the singleton is finally available. The proxy mixes in the [[akka.actor.Stash]] trait, so it can be + * configured accordingly. + * + * The proxy works by keeping track of the oldest cluster member. When a new oldest member is identified, e.g., because + * the older one left the cluster, or at startup, the proxy will try to identify the singleton on the oldest member by + * periodically sending an [[akka.actor.Identify]] message until the singleton responds with its + * [[akka.actor.ActorIdentity]]. + * + * Note that this is a best effort implementation: messages can always be lost due to the distributed nature of the + * actors involved. + * + * @param singletonPathString The logical path of the singleton. This does not include the node address or actor system + * name, e.g., it can be something like /user/singletonManager/singleton. + * @param role Cluster role on which the singleton is deployed. This is required to keep track only of the members where + * the singleton can actually exist. + * @param singletonIdentificationInterval Periodicity at which the proxy sends the `Identify` message to the current + * singleton actor selection. + */ +class ClusterSingletonProxy(singletonPathString: String, role: Option[String], singletonIdentificationInterval: FiniteDuration) extends Actor with Stash with ActorLogging { + + val singletonPath = singletonPathString.split("/") + var identifyCounter = 0 + var identifyId = createIdentifyId(identifyCounter) + def createIdentifyId(i: Int) = "identify-singleton-" + singletonPath mkString "/" + i + var identifyTimer: Option[Cancellable] = None + + val cluster = Cluster(context.system) + var singleton: Option[ActorRef] = None + // sort by age, oldest first + val ageOrdering = Ordering.fromLessThan[Member] { + (a, b) ⇒ a.isOlderThan(b) + } + var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) + + // subscribe to MemberEvent, re-subscribe when restart + override def preStart(): Unit = { + cancelTimer() + cluster.subscribe(self, classOf[MemberEvent]) + } + + override def postStop(): Unit = { + cancelTimer() + cluster.unsubscribe(self) + } + + def cancelTimer() = { + identifyTimer.foreach(_.cancel()) + identifyTimer = None + } + + def matchingRole(member: Member): Boolean = role match { + case None ⇒ true + case Some(r) ⇒ member.hasRole(r) + } + + def handleInitial(state: CurrentClusterState): Unit = { + trackChange { + () ⇒ + membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.collect { + case m if m.status == MemberStatus.Up && matchingRole(m) ⇒ m + } + } + } + + /** + * Discard old singleton ActorRef and send a periodic message to self to identify the singleton. + */ + def identifySingleton() { + import context.dispatcher + log.debug("Creating singleton identification timer...") + identifyCounter += 1 + identifyId = createIdentifyId(identifyCounter) + singleton = None + cancelTimer() + identifyTimer = Some(context.system.scheduler.schedule(0 milliseconds, singletonIdentificationInterval, self, ClusterSingletonProxy.TryToIdentifySingleton)) + } + + def trackChange(block: () ⇒ Unit): Unit = { + val before = membersByAge.headOption + block() + val after = membersByAge.headOption + // if the head has changed, I need to find the new singleton + if (before != after) identifySingleton() + } + + /** + * Adds new member if it has the right role. + * @param m New cluster member. + */ + def add(m: Member): Unit = { + if (matchingRole(m)) + trackChange { + () ⇒ membersByAge += m + } + } + + /** + * Removes a member. + * @param m Cluster member to remove. + */ + def remove(m: Member): Unit = { + if (matchingRole(m)) + trackChange { + () ⇒ membersByAge -= m + } + } + + def receive = { + // cluster logic + case state: CurrentClusterState ⇒ handleInitial(state) + case MemberUp(m) ⇒ add(m) + case mEvent: MemberEvent if mEvent.isInstanceOf[MemberExited] || mEvent.isInstanceOf[MemberRemoved] ⇒ remove(mEvent.member) + case _: MemberEvent ⇒ // do nothing + + // singleton identification logic + case ActorIdentity(identifyId, Some(s)) ⇒ + // if the new singleton is defined, unstash all messages + log.info("Singleton identified: {}", s.path) + singleton = Some(s) + cancelTimer() + unstashAll() + case _: ActorIdentity ⇒ // do nothing + case ClusterSingletonProxy.TryToIdentifySingleton if identifyTimer.isDefined ⇒ + membersByAge.headOption.foreach { + oldest ⇒ + val singletonAddress = RootActorPath(oldest.address) / singletonPath + log.debug("Trying to identify singleton at {}", singletonAddress) + context.actorSelection(singletonAddress) ! Identify(identifyId) + } + + // forwarding/stashing logic + case msg: Any ⇒ + singleton match { + case Some(s) ⇒ + log.debug("Forwarding message to current singleton instance {}", msg) + s forward msg + case None ⇒ + log.debug("No singleton available, stashing message {}", msg) + stash() + } + } +} diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala index f5325614dc..359a4244d1 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterSingletonManagerSpec.scala @@ -44,20 +44,29 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { akka.actor.provider = "akka.cluster.ClusterActorRefProvider" akka.remote.log-remote-lifecycle-events = off akka.cluster.auto-down-unreachable-after = 0s - """)) + """)) nodeConfig(first, second, third, fourth, fifth, sixth)( ConfigFactory.parseString("akka.cluster.roles =[worker]")) object PointToPointChannel { + case object RegisterConsumer + case object UnregisterConsumer + case object RegistrationOk + case object UnexpectedRegistration + case object UnregistrationOk + case object UnexpectedUnregistration + case object Reset + case object ResetOk + } /** @@ -67,6 +76,7 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { * singleton instances). */ class PointToPointChannel extends Actor with ActorLogging { + import PointToPointChannel._ def receive = idle @@ -107,12 +117,15 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { object Consumer { case object End case object GetCurrent + case object Ping + case object Pong } /** * The Singleton actor */ class Consumer(queue: ActorRef, delegateTo: ActorRef) extends Actor { + import Consumer._ import PointToPointChannel._ @@ -135,6 +148,8 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { queue ! UnregisterConsumer case UnregistrationOk ⇒ context stop self + case Ping ⇒ + sender ! Pong //#consumer-end } } @@ -146,12 +161,15 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { // subscribe to MemberEvent, re-subscribe when restart override def preStart(): Unit = Cluster(context.system).subscribe(self, classOf[MemberEvent]) + override def postStop(): Unit = Cluster(context.system).unsubscribe(self) val role = "worker" // sort by age, oldest first - val ageOrdering = Ordering.fromLessThan[Member] { (a, b) ⇒ a.isOlderThan(b) } + val ageOrdering = Ordering.fromLessThan[Member] { + (a, b) ⇒ a.isOlderThan(b) + } var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering) @@ -161,27 +179,38 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig { m.status == MemberStatus.Up && m.hasRole(role)) case MemberUp(m) ⇒ if (m.hasRole(role)) membersByAge += m case MemberRemoved(m, _) ⇒ if (m.hasRole(role)) membersByAge -= m - case other ⇒ consumer foreach { _.tell(other, sender()) } + case other ⇒ consumer foreach { + _.tell(other, sender()) + } } def consumer: Option[ActorSelection] = membersByAge.headOption map (m ⇒ context.actorSelection( RootActorPath(m.address) / "user" / "singleton" / "consumer")) } + //#singleton-proxy } class ClusterSingletonManagerMultiJvmNode1 extends ClusterSingletonManagerSpec + class ClusterSingletonManagerMultiJvmNode2 extends ClusterSingletonManagerSpec + class ClusterSingletonManagerMultiJvmNode3 extends ClusterSingletonManagerSpec + class ClusterSingletonManagerMultiJvmNode4 extends ClusterSingletonManagerSpec + class ClusterSingletonManagerMultiJvmNode5 extends ClusterSingletonManagerSpec + class ClusterSingletonManagerMultiJvmNode6 extends ClusterSingletonManagerSpec + class ClusterSingletonManagerMultiJvmNode7 extends ClusterSingletonManagerSpec + class ClusterSingletonManagerMultiJvmNode8 extends ClusterSingletonManagerSpec class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerSpec) with STMultiNodeSpec with ImplicitSender { + import ClusterSingletonManagerSpec._ import ClusterSingletonManagerSpec.PointToPointChannel._ import ClusterSingletonManagerSpec.Consumer._ @@ -192,6 +221,13 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS val controllerRootActorPath = node(controller) + var _msg = 0 + + def msg(): Int = { + _msg += 1 + _msg + } + def queue: ActorRef = { // this is used from inside actor construction, i.e. other thread, and must therefore not call `node(controller` system.actorSelection(controllerRootActorPath / "user" / "queue").tell(Identify("queue"), identifyProbe.ref) @@ -201,7 +237,10 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS def join(from: RoleName, to: RoleName): Unit = { runOn(from) { Cluster(system) join node(to).address - if (Cluster(system).selfRoles.contains("worker")) createSingleton() + if (Cluster(system).selfRoles.contains("worker")) { + createSingleton() + createSingletonProxy() + } } } @@ -227,6 +266,43 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS //#create-singleton-manager } + def createSingletonProxy(): ActorRef = { + //#create-singleton-proxy + system.actorOf(ClusterSingletonProxy.props( + singletonPath = "/user/singleton/consumer", + role = Some("worker")), + name = "consumerProxy") + //#create-singleton-proxy + } + + def verifyProxyMsg(oldest: RoleName, proxyNode: RoleName, msg: Int): Unit = { + enterBarrier("before-" + msg + "-proxy-verified") + + // send a message to the proxy + runOn(proxyNode) { + // make sure that the proxy has received membership changes + // and points to the current singleton + val p = TestProbe() + within(5.seconds) { + awaitAssert { + println("#Ping") + system.actorSelection("/user/consumerProxy").tell(Ping, p.ref) + p.expectMsg(1.second, Pong) + println("#Pong") + } + } + // then send the real message + system.actorSelection("/user/consumerProxy") ! msg + } + + // expect a message on the oldest node + runOn(oldest) { + expectMsg(5.seconds, msg) + } + + enterBarrier("after-" + msg + "-proxy-verified") + } + def consumer(oldest: RoleName): ActorSelection = system.actorSelection(RootActorPath(node(oldest).address) / "user" / "singleton" / "consumer") @@ -285,35 +361,50 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS join(first, first) awaitMemberUp(memberProbe, first) verifyRegistration(first) - verifyMsg(first, msg = 1) + verifyMsg(first, msg = msg()) // join the observer node as well, which should not influence since it doesn't have the "worker" role join(observer, first) awaitMemberUp(memberProbe, observer, first) + verifyProxyMsg(first, first, msg = msg()) join(second, first) awaitMemberUp(memberProbe, second, observer, first) - verifyMsg(first, msg = 2) + verifyMsg(first, msg = msg()) + verifyProxyMsg(first, second, msg = msg()) join(third, first) awaitMemberUp(memberProbe, third, second, observer, first) - verifyMsg(first, msg = 3) + verifyMsg(first, msg = msg()) + verifyProxyMsg(first, third, msg = msg()) join(fourth, first) awaitMemberUp(memberProbe, fourth, third, second, observer, first) - verifyMsg(first, msg = 4) + verifyMsg(first, msg = msg()) + verifyProxyMsg(first, fourth, msg = msg()) join(fifth, first) awaitMemberUp(memberProbe, fifth, fourth, third, second, observer, first) - verifyMsg(first, msg = 5) + verifyMsg(first, msg = msg()) + verifyProxyMsg(first, fifth, msg = msg()) join(sixth, first) awaitMemberUp(memberProbe, sixth, fifth, fourth, third, second, observer, first) - verifyMsg(first, msg = 6) + verifyMsg(first, msg = msg()) + verifyProxyMsg(first, sixth, msg = msg()) enterBarrier("after-1") } + "let the proxy route messages to the singleton in a 6 node cluster" in within(60 seconds) { + verifyProxyMsg(first, first, msg = msg()) + verifyProxyMsg(first, second, msg = msg()) + verifyProxyMsg(first, third, msg = msg()) + verifyProxyMsg(first, fourth, msg = msg()) + verifyProxyMsg(first, fifth, msg = msg()) + verifyProxyMsg(first, sixth, msg = msg()) + } + "hand over when oldest leaves in 6 nodes cluster " in within(30 seconds) { val leaveRole = first val newOldestRole = second @@ -323,7 +414,12 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS } verifyRegistration(second) - verifyMsg(second, msg = 7) + verifyMsg(second, msg = msg()) + verifyProxyMsg(second, second, msg = msg()) + verifyProxyMsg(second, third, msg = msg()) + verifyProxyMsg(second, fourth, msg = msg()) + verifyProxyMsg(second, fifth, msg = msg()) + verifyProxyMsg(second, sixth, msg = msg()) runOn(leaveRole) { system.actorSelection("/user/singleton").tell(Identify("singleton"), identifyProbe.ref) @@ -346,19 +442,26 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS crash(second) verifyRegistration(third) - verifyMsg(third, msg = 8) + verifyMsg(third, msg = msg()) + verifyProxyMsg(third, third, msg = msg()) + verifyProxyMsg(third, fourth, msg = msg()) + verifyProxyMsg(third, fifth, msg = msg()) + verifyProxyMsg(third, sixth, msg = msg()) } "take over when two oldest crash in 3 nodes cluster" in within(60 seconds) { crash(third, fourth) verifyRegistration(fifth) - verifyMsg(fifth, msg = 9) + verifyMsg(fifth, msg = msg()) + verifyProxyMsg(fifth, fifth, msg = msg()) + verifyProxyMsg(fifth, sixth, msg = msg()) } "take over when oldest crashes in 2 nodes cluster" in within(60 seconds) { crash(fifth) verifyRegistration(sixth) - verifyMsg(sixth, msg = 10) + verifyMsg(sixth, msg = msg()) + verifyProxyMsg(sixth, sixth, msg = msg()) } } diff --git a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterSingletonManagerTest.java b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterSingletonManagerTest.java index 5497ea868e..9672a8c9b6 100644 --- a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterSingletonManagerTest.java +++ b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterSingletonManagerTest.java @@ -34,6 +34,10 @@ public class ClusterSingletonManagerTest { system.actorOf(ClusterSingletonManager.defaultProps(Props.create(Consumer.class, queue, testActor), "consumer", new End(), "worker"), "singleton"); //#create-singleton-manager + + //#create-singleton-proxy + system.actorOf(ClusterSingletonProxy.defaultProps("user/singleton/consumer", "worker"), "consumerProxy"); + //#create-singleton-proxy } static//documentation of how to keep track of the oldest member in user land diff --git a/akka-contrib/src/test/scala/akka/contrib/pattern/ClusterSingletonProxySpec.scala b/akka-contrib/src/test/scala/akka/contrib/pattern/ClusterSingletonProxySpec.scala new file mode 100644 index 0000000000..fe74286932 --- /dev/null +++ b/akka-contrib/src/test/scala/akka/contrib/pattern/ClusterSingletonProxySpec.scala @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.contrib.pattern + +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } +import akka.testkit.{ TestProbe, TestKit } +import akka.actor._ +import com.typesafe.config.ConfigFactory +import akka.cluster.Cluster +import scala.concurrent.duration._ + +class ClusterSingletonProxySpec extends WordSpecLike with Matchers with BeforeAndAfterAll { + + import ClusterSingletonProxySpec._ + + val seed = new ActorSys() + seed.cluster.join(seed.cluster.selfAddress) + + val testSystems = (0 until 4).map(_ ⇒ new ActorSys(joinTo = Some(seed.cluster.selfAddress))) :+ seed + + "The cluster singleton proxy" must { + "correctly identify the singleton" in { + testSystems.foreach(_.testProxy("Hello")) + testSystems.foreach(_.testProxy("World")) + } + } + + override def afterAll() = testSystems.foreach(_.system.shutdown()) +} + +object ClusterSingletonProxySpec { + + class ActorSys(name: String = "ClusterSingletonProxySystem", joinTo: Option[Address] = None) + extends TestKit(ActorSystem(name, ConfigFactory.parseString(cfg))) { + + val cluster = Cluster(system) + joinTo.foreach(address ⇒ cluster.join(address)) + + cluster.registerOnMemberUp { + system.actorOf(ClusterSingletonManager.props( + singletonProps = Props[Singleton], + singletonName = "singleton", + terminationMessage = PoisonPill, + role = None, + maxHandOverRetries = 5, + maxTakeOverRetries = 2), name = "singletonManager") + } + + val proxy = system.actorOf(ClusterSingletonProxy.props("user/singletonManager/singleton", None), s"singletonProxy-${cluster.selfAddress.port.getOrElse(0)}") + + def testProxy(msg: String) { + val probe = TestProbe() + probe.send(proxy, msg) + // 25 seconds to make sure the singleton was started up + probe.expectMsg(25.seconds, "Got " + msg) + } + } + + val cfg = """akka { + + loglevel = INFO + + cluster { + auto-down-unreachable-after = 10s + + min-nr-of-members = 2 + } + + actor.provider = "akka.cluster.ClusterActorRefProvider" + + remote { + log-remote-lifecycle-events = off + netty.tcp { + hostname = "127.0.0.1" + port = 0 + } + } + } + """ + + class Singleton extends Actor with ActorLogging { + + log.info("Singleton created on {}", Cluster(context.system).selfAddress) + + def receive: Actor.Receive = { + case msg ⇒ + sender ! "Got " + msg + } + } + +}