+con #3843 Add ClusterSingletonProxy

This commit is contained in:
Giovanni Botta 2014-01-31 15:10:43 -05:00 committed by giodude
parent 9cc49d9ece
commit ee01a8dffe
5 changed files with 441 additions and 35 deletions

View file

@ -44,20 +44,29 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
"""))
"""))
nodeConfig(first, second, third, fourth, fifth, sixth)(
ConfigFactory.parseString("akka.cluster.roles =[worker]"))
object PointToPointChannel {
case object RegisterConsumer
case object UnregisterConsumer
case object RegistrationOk
case object UnexpectedRegistration
case object UnregistrationOk
case object UnexpectedUnregistration
case object Reset
case object ResetOk
}
/**
@ -67,6 +76,7 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
* singleton instances).
*/
class PointToPointChannel extends Actor with ActorLogging {
import PointToPointChannel._
def receive = idle
@ -107,12 +117,15 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
object Consumer {
case object End
case object GetCurrent
case object Ping
case object Pong
}
/**
* The Singleton actor
*/
class Consumer(queue: ActorRef, delegateTo: ActorRef) extends Actor {
import Consumer._
import PointToPointChannel._
@ -135,6 +148,8 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
queue ! UnregisterConsumer
case UnregistrationOk
context stop self
case Ping
sender ! Pong
//#consumer-end
}
}
@ -146,12 +161,15 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
// subscribe to MemberEvent, re-subscribe when restart
override def preStart(): Unit =
Cluster(context.system).subscribe(self, classOf[MemberEvent])
override def postStop(): Unit =
Cluster(context.system).unsubscribe(self)
val role = "worker"
// sort by age, oldest first
val ageOrdering = Ordering.fromLessThan[Member] { (a, b) a.isOlderThan(b) }
val ageOrdering = Ordering.fromLessThan[Member] {
(a, b) a.isOlderThan(b)
}
var membersByAge: immutable.SortedSet[Member] =
immutable.SortedSet.empty(ageOrdering)
@ -161,27 +179,38 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
m.status == MemberStatus.Up && m.hasRole(role))
case MemberUp(m) if (m.hasRole(role)) membersByAge += m
case MemberRemoved(m, _) if (m.hasRole(role)) membersByAge -= m
case other consumer foreach { _.tell(other, sender()) }
case other consumer foreach {
_.tell(other, sender())
}
}
def consumer: Option[ActorSelection] =
membersByAge.headOption map (m context.actorSelection(
RootActorPath(m.address) / "user" / "singleton" / "consumer"))
}
//#singleton-proxy
}
class ClusterSingletonManagerMultiJvmNode1 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode2 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode3 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode4 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode5 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode6 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode7 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerMultiJvmNode8 extends ClusterSingletonManagerSpec
class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterSingletonManagerSpec._
import ClusterSingletonManagerSpec.PointToPointChannel._
import ClusterSingletonManagerSpec.Consumer._
@ -192,6 +221,13 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
val controllerRootActorPath = node(controller)
var _msg = 0
def msg(): Int = {
_msg += 1
_msg
}
def queue: ActorRef = {
// this is used from inside actor construction, i.e. other thread, and must therefore not call `node(controller`
system.actorSelection(controllerRootActorPath / "user" / "queue").tell(Identify("queue"), identifyProbe.ref)
@ -201,7 +237,10 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
Cluster(system) join node(to).address
if (Cluster(system).selfRoles.contains("worker")) createSingleton()
if (Cluster(system).selfRoles.contains("worker")) {
createSingleton()
createSingletonProxy()
}
}
}
@ -227,6 +266,43 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
//#create-singleton-manager
}
def createSingletonProxy(): ActorRef = {
//#create-singleton-proxy
system.actorOf(ClusterSingletonProxy.props(
singletonPath = "/user/singleton/consumer",
role = Some("worker")),
name = "consumerProxy")
//#create-singleton-proxy
}
def verifyProxyMsg(oldest: RoleName, proxyNode: RoleName, msg: Int): Unit = {
enterBarrier("before-" + msg + "-proxy-verified")
// send a message to the proxy
runOn(proxyNode) {
// make sure that the proxy has received membership changes
// and points to the current singleton
val p = TestProbe()
within(5.seconds) {
awaitAssert {
println("#Ping")
system.actorSelection("/user/consumerProxy").tell(Ping, p.ref)
p.expectMsg(1.second, Pong)
println("#Pong")
}
}
// then send the real message
system.actorSelection("/user/consumerProxy") ! msg
}
// expect a message on the oldest node
runOn(oldest) {
expectMsg(5.seconds, msg)
}
enterBarrier("after-" + msg + "-proxy-verified")
}
def consumer(oldest: RoleName): ActorSelection =
system.actorSelection(RootActorPath(node(oldest).address) / "user" / "singleton" / "consumer")
@ -285,35 +361,50 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
join(first, first)
awaitMemberUp(memberProbe, first)
verifyRegistration(first)
verifyMsg(first, msg = 1)
verifyMsg(first, msg = msg())
// join the observer node as well, which should not influence since it doesn't have the "worker" role
join(observer, first)
awaitMemberUp(memberProbe, observer, first)
verifyProxyMsg(first, first, msg = msg())
join(second, first)
awaitMemberUp(memberProbe, second, observer, first)
verifyMsg(first, msg = 2)
verifyMsg(first, msg = msg())
verifyProxyMsg(first, second, msg = msg())
join(third, first)
awaitMemberUp(memberProbe, third, second, observer, first)
verifyMsg(first, msg = 3)
verifyMsg(first, msg = msg())
verifyProxyMsg(first, third, msg = msg())
join(fourth, first)
awaitMemberUp(memberProbe, fourth, third, second, observer, first)
verifyMsg(first, msg = 4)
verifyMsg(first, msg = msg())
verifyProxyMsg(first, fourth, msg = msg())
join(fifth, first)
awaitMemberUp(memberProbe, fifth, fourth, third, second, observer, first)
verifyMsg(first, msg = 5)
verifyMsg(first, msg = msg())
verifyProxyMsg(first, fifth, msg = msg())
join(sixth, first)
awaitMemberUp(memberProbe, sixth, fifth, fourth, third, second, observer, first)
verifyMsg(first, msg = 6)
verifyMsg(first, msg = msg())
verifyProxyMsg(first, sixth, msg = msg())
enterBarrier("after-1")
}
"let the proxy route messages to the singleton in a 6 node cluster" in within(60 seconds) {
verifyProxyMsg(first, first, msg = msg())
verifyProxyMsg(first, second, msg = msg())
verifyProxyMsg(first, third, msg = msg())
verifyProxyMsg(first, fourth, msg = msg())
verifyProxyMsg(first, fifth, msg = msg())
verifyProxyMsg(first, sixth, msg = msg())
}
"hand over when oldest leaves in 6 nodes cluster " in within(30 seconds) {
val leaveRole = first
val newOldestRole = second
@ -323,7 +414,12 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
}
verifyRegistration(second)
verifyMsg(second, msg = 7)
verifyMsg(second, msg = msg())
verifyProxyMsg(second, second, msg = msg())
verifyProxyMsg(second, third, msg = msg())
verifyProxyMsg(second, fourth, msg = msg())
verifyProxyMsg(second, fifth, msg = msg())
verifyProxyMsg(second, sixth, msg = msg())
runOn(leaveRole) {
system.actorSelection("/user/singleton").tell(Identify("singleton"), identifyProbe.ref)
@ -346,19 +442,26 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
crash(second)
verifyRegistration(third)
verifyMsg(third, msg = 8)
verifyMsg(third, msg = msg())
verifyProxyMsg(third, third, msg = msg())
verifyProxyMsg(third, fourth, msg = msg())
verifyProxyMsg(third, fifth, msg = msg())
verifyProxyMsg(third, sixth, msg = msg())
}
"take over when two oldest crash in 3 nodes cluster" in within(60 seconds) {
crash(third, fourth)
verifyRegistration(fifth)
verifyMsg(fifth, msg = 9)
verifyMsg(fifth, msg = msg())
verifyProxyMsg(fifth, fifth, msg = msg())
verifyProxyMsg(fifth, sixth, msg = msg())
}
"take over when oldest crashes in 2 nodes cluster" in within(60 seconds) {
crash(fifth)
verifyRegistration(sixth)
verifyMsg(sixth, msg = 10)
verifyMsg(sixth, msg = msg())
verifyProxyMsg(sixth, sixth, msg = msg())
}
}