!con #3597 Remove hand over data message in cluster singleton

This commit is contained in:
Patrik Nordwall 2013-09-10 13:35:51 +02:00
parent beba5d9f76
commit 23f933afe3
11 changed files with 89 additions and 143 deletions

View file

@ -28,9 +28,8 @@ supplied ``Props``. ``ClusterSingletonManager`` makes sure that at most one sing
is running at any point in time.
The singleton actor is always running on the oldest member, which can be determined by
``Member#isOlderThan``. This can change when removing members. A graceful hand over can normally
be performed when current oldest node is leaving the cluster. Be aware that there is a short
time period when there is no active singleton during the hand-over process.
``Member#isOlderThan``. This can change when removing that member from the cluster. Be aware
that there is a short time period when there is no active singleton during the hand-over process.
The cluster failure detector will notice when oldest node becomes unreachable due to
things like JVM crash, hard shut down, or network failure. Then a new oldest node will

View file

@ -26,12 +26,9 @@ object ClusterSingletonManager {
/**
* Scala API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]].
* Note that the `singletonProps` function is applied when creating
* the singleton actor and it must not use members that are not thread safe, e.g.
* mutable state in enclosing actor.
*/
def props(
singletonProps: Option[Any] Props,
singletonProps: Props,
singletonName: String,
terminationMessage: Any,
role: Option[String],
@ -43,35 +40,28 @@ object ClusterSingletonManager {
/**
* Java API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]].
* Note that the `singletonPropsFactory` is invoked when creating
* the singleton actor and it must not use members that are not thread safe, e.g.
* mutable state in enclosing actor.
*/
def props(
singletonProps: Props,
singletonName: String,
terminationMessage: Any,
role: String,
maxHandOverRetries: Int,
maxTakeOverRetries: Int,
retryInterval: FiniteDuration,
singletonPropsFactory: ClusterSingletonPropsFactory): Props =
props(handOverData singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage,
retryInterval: FiniteDuration): Props =
props(singletonProps, singletonName, terminationMessage,
ClusterSingletonManager.Internal.roleOption(role), maxHandOverRetries, maxTakeOverRetries, retryInterval)
/**
* Java API: Factory method for `ClusterSingletonManager` [[akka.actor.Props]]
* with default values.
* Note that the `singletonPropsFactory` is invoked when creating
* the singleton actor and it must not use members that are not thread safe, e.g.
* mutable state in enclosing actor.
*/
def defaultProps(
singletonProps: Props,
singletonName: String,
terminationMessage: Any,
role: String,
singletonPropsFactory: ClusterSingletonPropsFactory): Props =
props(handOverData singletonPropsFactory.create(handOverData.orNull), singletonName, terminationMessage,
ClusterSingletonManager.Internal.roleOption(role))
role: String): Props =
props(singletonProps, singletonName, terminationMessage, ClusterSingletonManager.Internal.roleOption(role))
/**
* INTERNAL API
@ -103,12 +93,9 @@ object ClusterSingletonManager {
/**
* Confirmation by the previous oldest that the singleton
* actor has been terminated and the hand-over process is
* completed. The `handOverData` holds the message, if any,
* sent from the singleton actor to its parent ClusterSingletonManager
* when shutting down. It is passed to the `singletonProps`
* factory on the new oldest node.
* completed.
*/
case class HandOverDone(handOverData: Option[Any])
case object HandOverDone
/**
* Sent from from previous oldest to new oldest to
* initiate the normal hand-over process.
@ -135,11 +122,10 @@ object ClusterSingletonManager {
case object Uninitialized extends Data
case class YoungerData(oldestOption: Option[Address]) extends Data
case class BecomingOldestData(previousOldestOption: Option[Address]) extends Data
case class OldestData(singleton: ActorRef, singletonTerminated: Boolean = false,
handOverData: Option[Any] = None) extends Data
case class WasOldestData(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any],
case class OldestData(singleton: ActorRef, singletonTerminated: Boolean = false) extends Data
case class WasOldestData(singleton: ActorRef, singletonTerminated: Boolean,
newOldestOption: Option[Address]) extends Data
case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef], handOverData: Option[Any]) extends Data
case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data
case object EndData extends Data
val HandOverRetryTimer = "hand-over-retry"
@ -263,22 +249,6 @@ object ClusterSingletonManager {
}
}
/**
* Java API. Factory for the [[akka.actor.Props]] of the singleton
* actor instance. Used in constructor of
* [[akka.contrib.pattern.ClusterSingletonManager]]
*/
@SerialVersionUID(1L)
trait ClusterSingletonPropsFactory extends Serializable {
/**
* Create the `Props` from the `handOverData` sent from
* previous singleton. `handOverData` might be null
* when no hand-over took place, or when the there is no need
* for sending data to the new singleton.
*/
def create(handOverData: Any): Props
}
/**
* Thrown when a consistent state can't be determined within the
* defined retry limits. Eventually it will reach a stable state and
@ -305,11 +275,6 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess
* there is a short time period when there is no active singleton during the
* hand-over process.
*
* The singleton actor can at any time send a message to its parent
* ClusterSingletonManager and this message will be passed to the
* `singletonProps` factory on the new oldest node when a graceful
* hand-over is performed.
*
* The cluster failure detector will notice when oldest node
* becomes unreachable due to things like JVM crash, hard shut down,
* or network failure. Then a new oldest node will take over and a
@ -329,23 +294,13 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess
*
* ==Arguments==
*
* '''''singletonProps''''' Factory for [[akka.actor.Props]] of the
* singleton actor instance. The `Option` parameter is the the
* `handOverData` sent from previous singleton. `handOverData`
* might be None when no hand-over took place, or when the there
* is no need for sending data to the new singleton. The `handOverData`
* is typically passed as parameter to the constructor of the
* singleton actor. Note that the `singletonProps` function is applied when creating
* the singleton actor and it must not use members that are not thread safe, e.g.
* mutable state in enclosing actor.
* '''''singletonProps''''' [[akka.actor.Props]] of the singleton actor instance.
*
* '''''singletonName''''' The actor name of the child singleton actor.
*
* '''''terminationMessage''''' When handing over to a new oldest node
* this `terminationMessage` is sent to the singleton actor to tell
* it to finish its work, close resources, and stop. It can sending
* a message back to the parent ClusterSingletonManager, which will
* passed to the `singletonProps` factory on the new oldest node.
* it to finish its work, close resources, and stop.
* The hand-over to the new oldest node is completed when the
* singleton actor is terminated.
* Note that [[akka.actor.PoisonPill]] is a perfectly fine
@ -380,7 +335,7 @@ class ClusterSingletonManagerIsStuck(message: String) extends AkkaException(mess
* stopped for certain corner cases.
*/
class ClusterSingletonManager(
singletonProps: Option[Any] Props,
singletonProps: Props,
singletonName: String,
terminationMessage: Any,
role: Option[String],
@ -472,7 +427,7 @@ class ClusterSingletonManager(
oldestChangedReceived = true
if (oldestOption == selfAddressOption && memberCount == 1)
// alone, oldest immediately
gotoOldest(None)
gotoOldest()
else if (oldestOption == selfAddressOption)
goto(BecomingOldest) using BecomingOldestData(None)
else
@ -485,8 +440,8 @@ class ClusterSingletonManager(
if (oldestOption == selfAddressOption) {
logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldestOption)
previousOldestOption match {
case None gotoOldest(None)
case Some(prev) if removed.contains(prev) gotoOldest(None)
case None gotoOldest()
case Some(prev) if removed.contains(prev) gotoOldest()
case Some(prev)
peer(prev) ! HandOverToMe
goto(BecomingOldest) using BecomingOldestData(previousOldestOption)
@ -517,9 +472,9 @@ class ClusterSingletonManager(
cancelTimer(HandOverRetryTimer)
stay
case Event(HandOverDone(handOverData), BecomingOldestData(Some(previousOldest)))
case Event(HandOverDone, BecomingOldestData(Some(previousOldest)))
if (sender.path.address == previousOldest)
gotoOldest(handOverData)
gotoOldest()
else {
logInfo("Ignoring HandOverDone in BecomingOldest from [{}]. Expected previous oldest [{}]",
sender.path.address, previousOldest)
@ -551,21 +506,21 @@ class ClusterSingletonManager(
// can't send HandOverToMe, previousOldest unknown for new node (or restart)
// previous oldest might be down or removed, so no TakeOverFromMe message is received
logInfo("Timeout in BecomingOldest. Previous oldest unknown, removed and no TakeOver request.")
gotoOldest(None)
gotoOldest()
} else
throw new ClusterSingletonManagerIsStuck(
s"Becoming singleton oldest was stuck because previous oldest [${previousOldestOption}] is unresponsive")
}
def gotoOldest(handOverData: Option[Any]): State = {
def gotoOldest(): State = {
logInfo("Singleton manager [{}] starting singleton actor", cluster.selfAddress)
val singleton = context watch context.actorOf(singletonProps(handOverData), singletonName)
val singleton = context watch context.actorOf(singletonProps, singletonName)
goto(Oldest) using OldestData(singleton)
}
when(Oldest) {
case Event(OldestChanged(oldestOption), OldestData(singleton, singletonTerminated, handOverData))
case Event(OldestChanged(oldestOption), OldestData(singleton, singletonTerminated))
oldestChangedReceived = true
logInfo("Oldest observed OldestChanged: [{} -> {}]", cluster.selfAddress, oldestOption)
oldestOption match {
@ -573,30 +528,27 @@ class ClusterSingletonManager(
// already oldest
stay
case Some(a) if !selfExited && removed.contains(a)
gotoHandingOver(singleton, singletonTerminated, handOverData, None)
gotoHandingOver(singleton, singletonTerminated, None)
case Some(a)
// send TakeOver request in case the new oldest doesn't know previous oldest
peer(a) ! TakeOverFromMe
setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false)
goto(WasOldest) using WasOldestData(singleton, singletonTerminated, handOverData, newOldestOption = Some(a))
goto(WasOldest) using WasOldestData(singleton, singletonTerminated, newOldestOption = Some(a))
case None
// new oldest will initiate the hand-over
setTimer(TakeOverRetryTimer, TakeOverRetry(1), retryInterval, repeat = false)
goto(WasOldest) using WasOldestData(singleton, singletonTerminated, handOverData, newOldestOption = None)
goto(WasOldest) using WasOldestData(singleton, singletonTerminated, newOldestOption = None)
}
case Event(HandOverToMe, OldestData(singleton, singletonTerminated, handOverData))
gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender))
case Event(HandOverToMe, OldestData(singleton, singletonTerminated))
gotoHandingOver(singleton, singletonTerminated, Some(sender))
case Event(singletonHandOverMessage, d @ OldestData(singleton, _, _)) if sender == singleton
stay using d.copy(handOverData = Some(singletonHandOverMessage))
case Event(Terminated(ref), d @ OldestData(singleton, _, _)) if ref == singleton
case Event(Terminated(ref), d @ OldestData(singleton, _)) if ref == singleton
stay using d.copy(singletonTerminated = true)
}
when(WasOldest) {
case Event(TakeOverRetry(count), WasOldestData(_, _, _, newOldestOption))
case Event(TakeOverRetry(count), WasOldestData(_, _, newOldestOption))
if (count <= maxTakeOverRetries) {
logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption)
newOldestOption foreach { peer(_) ! TakeOverFromMe }
@ -605,49 +557,43 @@ class ClusterSingletonManager(
} else
throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [${newOldestOption}] never occured")
case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, handOverData, _))
gotoHandingOver(singleton, singletonTerminated, handOverData, Some(sender))
case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, _))
gotoHandingOver(singleton, singletonTerminated, Some(sender))
case Event(MemberRemoved(m, _), WasOldestData(singleton, singletonTerminated, handOverData, Some(newOldest))) if !selfExited && m.address == newOldest
case Event(MemberRemoved(m, _), WasOldestData(singleton, singletonTerminated, Some(newOldest))) if !selfExited && m.address == newOldest
addRemoved(m.address)
gotoHandingOver(singleton, singletonTerminated, handOverData, None)
gotoHandingOver(singleton, singletonTerminated, None)
case Event(singletonHandOverMessage, d @ WasOldestData(singleton, _, _, _)) if sender == singleton
stay using d.copy(handOverData = Some(singletonHandOverMessage))
case Event(Terminated(ref), d @ WasOldestData(singleton, _, _, _)) if ref == singleton
case Event(Terminated(ref), d @ WasOldestData(singleton, _, _)) if ref == singleton
stay using d.copy(singletonTerminated = true)
}
def gotoHandingOver(singleton: ActorRef, singletonTerminated: Boolean, handOverData: Option[Any], handOverTo: Option[ActorRef]): State = {
def gotoHandingOver(singleton: ActorRef, singletonTerminated: Boolean, handOverTo: Option[ActorRef]): State = {
if (singletonTerminated) {
handOverDone(handOverTo, handOverData)
handOverDone(handOverTo)
} else {
handOverTo foreach { _ ! HandOverInProgress }
singleton ! terminationMessage
goto(HandingOver) using HandingOverData(singleton, handOverTo, handOverData)
goto(HandingOver) using HandingOverData(singleton, handOverTo)
}
}
when(HandingOver) {
case (Event(Terminated(ref), HandingOverData(singleton, handOverTo, handOverData))) if ref == singleton
handOverDone(handOverTo, handOverData)
case (Event(Terminated(ref), HandingOverData(singleton, handOverTo))) if ref == singleton
handOverDone(handOverTo)
case Event(HandOverToMe, d @ HandingOverData(singleton, handOverTo, _)) if handOverTo == Some(sender)
case Event(HandOverToMe, d @ HandingOverData(singleton, handOverTo)) if handOverTo == Some(sender)
// retry
sender ! HandOverInProgress
stay
case Event(singletonHandOverMessage, d @ HandingOverData(singleton, _, _)) if sender == singleton
stay using d.copy(handOverData = Some(singletonHandOverMessage))
}
def handOverDone(handOverTo: Option[ActorRef], handOverData: Option[Any]): State = {
def handOverDone(handOverTo: Option[ActorRef]): State = {
val newOldest = handOverTo.map(_.path.address)
logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newOldest)
handOverTo foreach { _ ! HandOverDone(handOverData) }
handOverTo foreach { _ ! HandOverDone }
if (selfExited || removed.contains(cluster.selfAddress))
goto(End) using EndData
else

View file

@ -78,7 +78,7 @@ class ClusterSingletonManagerChaosSpec extends MultiNodeSpec(ClusterSingletonMan
def createSingleton(): ActorRef = {
system.actorOf(ClusterSingletonManager.props(
singletonProps = handOverData Props(classOf[Echo], testActor),
singletonProps = Props(classOf[Echo], testActor),
singletonName = "echo",
terminationMessage = PoisonPill,
role = None),

View file

@ -111,15 +111,11 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
/**
* The Singleton actor
*/
class Consumer(handOverData: Option[Any], queue: ActorRef, delegateTo: ActorRef) extends Actor {
class Consumer(queue: ActorRef, delegateTo: ActorRef) extends Actor {
import Consumer._
import PointToPointChannel._
var current: Int = handOverData match {
case Some(x: Int) x
case Some(x) throw new IllegalArgumentException(s"handOverData must be an Int, got [${x}]")
case None 0
}
var current = 0
override def preStart(): Unit = queue ! RegisterConsumer
@ -137,9 +133,6 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
case End
queue ! UnregisterConsumer
case UnregistrationOk
// reply to ClusterSingletonManager with hand over data,
// which will be passed as parameter to new consumer singleton
context.parent ! current
context stop self
//#consumer-end
}
@ -226,8 +219,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
def createSingleton(): ActorRef = {
//#create-singleton-manager
system.actorOf(ClusterSingletonManager.props(
singletonProps = handOverData
Props(classOf[Consumer], handOverData, queue, testActor),
singletonProps = Props(classOf[Consumer], queue, testActor),
singletonName = "consumer",
terminationMessage = End,
role = Some("worker")),
@ -238,12 +230,12 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
def consumer(oldest: RoleName): ActorSelection =
system.actorSelection(RootActorPath(node(oldest).address) / "user" / "singleton" / "consumer")
def verifyRegistration(oldest: RoleName, expectedCurrent: Int): Unit = {
def verifyRegistration(oldest: RoleName): Unit = {
enterBarrier("before-" + oldest.name + "-registration-verified")
runOn(oldest) {
expectMsg(RegistrationOk)
consumer(oldest) ! GetCurrent
expectMsg(expectedCurrent)
expectMsg(0)
}
enterBarrier("after-" + oldest.name + "-registration-verified")
}
@ -292,7 +284,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
join(first, first)
awaitMemberUp(memberProbe, first)
verifyRegistration(first, expectedCurrent = 0)
verifyRegistration(first)
verifyMsg(first, msg = 1)
// join the observer node as well, which should not influence since it doesn't have the "worker" role
@ -330,7 +322,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
Cluster(system) leave node(leaveRole).address
}
verifyRegistration(second, expectedCurrent = 6)
verifyRegistration(second)
verifyMsg(second, msg = 7)
runOn(leaveRole) {
@ -353,19 +345,19 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
enterBarrier("logs-muted")
crash(second)
verifyRegistration(third, expectedCurrent = 0)
verifyRegistration(third)
verifyMsg(third, msg = 8)
}
"take over when two oldest crash in 3 nodes cluster" in within(60 seconds) {
crash(third, fourth)
verifyRegistration(fifth, expectedCurrent = 0)
verifyRegistration(fifth)
verifyMsg(fifth, msg = 9)
}
"take over when oldest crashes in 2 nodes cluster" in within(60 seconds) {
crash(fifth)
verifyRegistration(sixth, expectedCurrent = 0)
verifyRegistration(sixth)
verifyMsg(sixth, msg = 10)
}

View file

@ -31,13 +31,9 @@ public class ClusterSingletonManagerTest {
//#create-singleton-manager
system.actorOf(
ClusterSingletonManager.defaultProps("consumer", new End(), "worker",
new ClusterSingletonPropsFactory() {
@Override
public Props create(Object handOverData) {
return Props.create(Consumer.class, handOverData, queue, testActor);
}
}), "singleton");
ClusterSingletonManager.defaultProps(
Props.create(Consumer.class, queue, testActor), "consumer",
new End(), "worker"), "singleton");
//#create-singleton-manager
}

View file

@ -5,4 +5,4 @@
################################
Migration from 2.0.x to 2.1.x is described in the
`documentation of 2.1 <http://doc.akka.io/docs/akka/2.1.2/project/migration-guide-2.0.x-2.1.x.html>`_.
`documentation of 2.1 <http://doc.akka.io/docs/akka/2.1.4/project/migration-guide-2.0.x-2.1.x.html>`_.

View file

@ -0,0 +1,21 @@
.. _migration-2.3:
################################
Migration Guide 2.2.x to 2.3.x
################################
The 2.2 release contains some structural changes that require some
simple, mechanical source-level changes in client code.
When migrating from earlier versions you should first follow the instructions for
migrating :ref:`1.3.x to 2.0.x <migration-2.0>` and then :ref:`2.0.x to 2.1.x <migration-2.1>`
and then :ref:`2.1.x to 2.2.x <migration-2.2>`.
Removed hand over data in cluster singleton
===========================================
The support for passing data from previous singleton instance to new instance
in a graceful leaving scenario has been removed. Valuable state should be persisted
in durable storage instead, e.g. using akka-persistence. The constructor/props parameters
of ``ClusterSingletonManager`` has been changed to ordinary ``Props`` parameter for the
singleton actor instead of the factory parameter.

View file

@ -6,7 +6,6 @@ import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.contrib.pattern.ClusterSingletonManager;
import akka.contrib.pattern.ClusterSingletonPropsFactory;
public class StatsSampleOneMasterMain {
@ -23,13 +22,9 @@ public class StatsSampleOneMasterMain {
//#create-singleton-manager
system.actorOf(ClusterSingletonManager.defaultProps(
"statsService", PoisonPill.getInstance(), "compute",
new ClusterSingletonPropsFactory() {
@Override
public Props create(Object handOverData) {
return Props.create(StatsService.class);
}
}), "singleton");
Props.create(StatsService.class),
"statsService", PoisonPill.getInstance(), "compute"),
"singleton");
//#create-singleton-manager
system.actorOf(Props.create(StatsFacade.class), "statsFacade");

View file

@ -155,7 +155,7 @@ object StatsSampleOneMaster {
//#create-singleton-manager
system.actorOf(ClusterSingletonManager.props(
singletonProps = _ Props[StatsService], singletonName = "statsService",
singletonProps = Props[StatsService], singletonName = "statsService",
terminationMessage = PoisonPill, role = Some("compute")),
name = "singleton")
//#create-singleton-manager

View file

@ -85,7 +85,7 @@ abstract class StatsSampleSingleMasterSpec extends MultiNodeSpec(StatsSampleSing
Cluster(system).unsubscribe(testActor)
system.actorOf(ClusterSingletonManager.props(
singletonProps = _ Props[StatsService], singletonName = "statsService",
singletonProps = Props[StatsService], singletonName = "statsService",
terminationMessage = PoisonPill, role = Some("compute")), name = "singleton")
system.actorOf(Props[StatsFacade], "statsFacade")

View file

@ -19,7 +19,6 @@ import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender
import sample.cluster.stats.japi.StatsMessages._
import akka.contrib.pattern.ClusterSingletonPropsFactory
object StatsSampleSingleMasterJapiSpecConfig extends MultiNodeConfig {
// register the named roles (nodes) of the test
@ -85,12 +84,10 @@ abstract class StatsSampleSingleMasterJapiSpec extends MultiNodeSpec(StatsSample
Cluster(system).unsubscribe(testActor)
system.actorOf(ClusterSingletonManager.defaultProps(
Props[StatsService],
singletonName = "statsService",
terminationMessage = PoisonPill,
role = null,
singletonPropsFactory = new ClusterSingletonPropsFactory {
def create(handOverData: Any) = Props[StatsService]
}), name = "singleton")
role = null), name = "singleton")
system.actorOf(Props[StatsFacade], "statsFacade")