Cluster APIs for typed, #21226

* Cluster management (join, leave, etc.), as sketched below
* Cluster membership subscriptions (MemberUp, MemberRemoved, etc.)
* New SelfUp and SelfRemoved events
* Change the signature of awaitAssert to return the value (not binary compatible)
* Cluster singleton API
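A minimal sketch of how these pieces fit together (illustrative only; `system` is assumed to be an adapted typed ActorSystem and `probe` a TestProbe[AnyRef], neither of which is defined in this commit message):

import akka.typed.cluster._

val cluster = Cluster(system)
// subscribe to the new SelfUp event before joining
cluster.subscriptions ! Subscribe(probe.ref, classOf[SelfUp])
// join this node to itself to form a single-node cluster
cluster.manager ! Join(cluster.selfMember.address)
probe.expectMsgType[SelfUp]
// leave again and observe the new SelfRemoved event
cluster.subscriptions ! Subscribe(probe.ref, classOf[SelfRemoved])
cluster.manager ! Leave(cluster.selfMember.address)
probe.expectMsgType[SelfRemoved]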
Johan Andrén 2017-09-21 17:58:29 +02:00 committed by Patrik Nordwall
parent 94f0492873
commit c31f6b862f
22 changed files with 1397 additions and 56 deletions

View file

@ -15,7 +15,7 @@ import akka.dispatch.MonitorableThreadFactory
import akka.event.{ Logging, LoggingAdapter }
import akka.japi.Util
import akka.pattern._
import akka.remote.{ DefaultFailureDetectorRegistry, FailureDetector, _ }
import akka.remote.{ DefaultFailureDetectorRegistry, _ }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.annotation.varargs
@ -215,6 +215,11 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
*/
def state: CurrentClusterState = readView.state
/**
* Current snapshot of the member itself
*/
def selfMember: Member = readView.self
/**
* Subscribe to one or more cluster domain events.
* The `to` classes can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]]

View file

@ -13,7 +13,7 @@ import akka.cluster.MemberStatus._
import akka.event.EventStream
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.actor.DeadLetterSuppression
import akka.annotation.InternalApi
import akka.annotation.{ DoNotInherit, InternalApi }
import scala.collection.breakOut
import scala.runtime.AbstractFunction5
@ -53,8 +53,11 @@ object ClusterEvent {
/**
* Marker interface for cluster domain events.
*
* Not intended for user extension.
*/
sealed trait ClusterDomainEvent extends DeadLetterSuppression
@DoNotInherit
trait ClusterDomainEvent extends DeadLetterSuppression
// for binary compatibility (used to be a case class)
object CurrentClusterState extends AbstractFunction5[immutable.SortedSet[Member], Set[Member], Set[Address], Option[Address], Map[String, Option[Address]], CurrentClusterState] {

View file

@ -5,12 +5,14 @@
package akka.cluster
import java.io.Closeable
import scala.collection.immutable
import akka.actor.{ Actor, ActorRef, Address, Props }
import akka.cluster.ClusterEvent._
import akka.actor.PoisonPill
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.actor.Deploy
import akka.util.OptionVal
/**
* INTERNAL API
@ -29,6 +31,10 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
@volatile
private var _reachability: Reachability = Reachability.empty
// lazy init below, updated when state is updated
@volatile
private var _cachedSelf: OptionVal[Member] = OptionVal.None
/**
* Current internal cluster stats, updated periodically via event bus.
*/
@ -44,39 +50,46 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
override def postStop(): Unit = cluster.unsubscribe(self)
def receive = {
case e: ClusterDomainEvent ⇒ e match {
case SeenChanged(convergence, seenBy) ⇒
_state = _state.copy(seenBy = seenBy)
case ReachabilityChanged(reachability) ⇒
_reachability = reachability
case MemberRemoved(member, _) ⇒
_state = _state.copy(members = _state.members - member, unreachable = _state.unreachable - member)
case UnreachableMember(member) ⇒
// replace current member with new member (might have different status, only address is used in equals)
_state = _state.copy(unreachable = _state.unreachable - member + member)
case ReachableMember(member) ⇒
_state = _state.copy(unreachable = _state.unreachable - member)
case event: MemberEvent ⇒
// replace current member with new member (might have different status, only address is used in equals)
val newUnreachable =
if (_state.unreachable.contains(event.member)) _state.unreachable - event.member + event.member
else _state.unreachable
_state = _state.copy(
members = _state.members - event.member + event.member,
unreachable = newUnreachable)
case LeaderChanged(leader) ⇒
_state = _state.copy(leader = leader)
case RoleLeaderChanged(role, leader) ⇒
_state = _state.copy(roleLeaderMap = _state.roleLeaderMap + (role → leader))
case stats: CurrentInternalStats ⇒ _latestStats = stats
case ClusterShuttingDown ⇒
case e: ClusterDomainEvent ⇒
e match {
case SeenChanged(convergence, seenBy) ⇒
_state = _state.copy(seenBy = seenBy)
case ReachabilityChanged(reachability) ⇒
_reachability = reachability
case MemberRemoved(member, _) ⇒
_state = _state.copy(members = _state.members - member, unreachable = _state.unreachable - member)
case UnreachableMember(member) ⇒
// replace current member with new member (might have different status, only address is used in equals)
_state = _state.copy(unreachable = _state.unreachable - member + member)
case ReachableMember(member) ⇒
_state = _state.copy(unreachable = _state.unreachable - member)
case event: MemberEvent ⇒
// replace current member with new member (might have different status, only address is used in equals)
val newUnreachable =
if (_state.unreachable.contains(event.member)) _state.unreachable - event.member + event.member
else _state.unreachable
_state = _state.copy(
members = _state.members - event.member + event.member,
unreachable = newUnreachable)
case LeaderChanged(leader) ⇒
_state = _state.copy(leader = leader)
case RoleLeaderChanged(role, leader) ⇒
_state = _state.copy(roleLeaderMap = _state.roleLeaderMap + (role → leader))
case stats: CurrentInternalStats ⇒ _latestStats = stats
case ClusterShuttingDown ⇒
case r: ReachableDataCenter ⇒
_state = _state.withUnreachableDataCenters(_state.unreachableDataCenters - r.dataCenter)
case r: UnreachableDataCenter ⇒
_state = _state.withUnreachableDataCenters(_state.unreachableDataCenters + r.dataCenter)
case r: ReachableDataCenter ⇒
_state = _state.withUnreachableDataCenters(_state.unreachableDataCenters - r.dataCenter)
case r: UnreachableDataCenter ⇒
_state = _state.withUnreachableDataCenters(_state.unreachableDataCenters + r.dataCenter)
}
}
e match {
case e: MemberEvent if e.member.address == selfAddress ⇒
_cachedSelf = OptionVal.Some(e.member)
case _ ⇒
}
case s: CurrentClusterState ⇒ _state = s
}
}).withDispatcher(cluster.settings.UseDispatcher).withDeploy(Deploy.local), name = "clusterEventBusListener")
@ -85,9 +98,19 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
def state: CurrentClusterState = _state
def self: Member = {
_cachedSelf match {
case OptionVal.None ⇒
// lazy initialization here, later updated from elsewhere
_cachedSelf = OptionVal.Some(selfFromStateOrPlaceholder)
_cachedSelf.get
case OptionVal.Some(member) ⇒ member
}
}
private def selfFromStateOrPlaceholder = {
import cluster.selfUniqueAddress
state.members.find(_.uniqueAddress == selfUniqueAddress).
getOrElse(Member(selfUniqueAddress, cluster.selfRoles).copy(status = MemberStatus.Removed))
state.members.find(_.uniqueAddress == selfUniqueAddress)
.getOrElse(Member(selfUniqueAddress, cluster.selfRoles).copy(status = MemberStatus.Removed))
}
/**
@ -162,8 +185,10 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
/**
* Unsubscribe to cluster events.
*/
def close(): Unit =
def close(): Unit = {
_cachedSelf = OptionVal.Some(self.copy(MemberStatus.Removed))
if (!eventBusListener.isTerminated)
eventBusListener ! PoisonPill
}
}

View file

@ -90,7 +90,7 @@ object Member {
* INTERNAL API
* Create a new member with status Joining.
*/
private[cluster] def apply(uniqueAddress: UniqueAddress, roles: Set[String]): Member =
private[akka] def apply(uniqueAddress: UniqueAddress, roles: Set[String]): Member =
new Member(uniqueAddress, Int.MaxValue, Joining, roles)
/**

View file

@ -0,0 +1,5 @@
# awaitAssert signature update introduced in #23613
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.testkit.TestKitBase.awaitAssert")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.awaitAssert")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.testkit.TestKit.awaitAssert")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.testkit.javadsl.TestKit.awaitAssert")

View file

@ -4,18 +4,21 @@
package akka.testkit
import language.postfixOps
import scala.annotation.{ tailrec }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.reflect.ClassTag
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import akka.util.{ Timeout, BoxedType }
import akka.util.{ BoxedType, OptionVal, Timeout }
import scala.util.control.NonFatal
import scala.Some
import java.util.concurrent.TimeUnit
import akka.actor.IllegalActorStateException
import akka.actor.DeadLetter
import akka.actor.Terminated
@ -283,7 +286,9 @@ trait TestKitBase {
}
/**
* Evaluate the given assert every `interval` until it does not throw an exception.
* Evaluate the given assert every `interval` until it does not throw an exception and return the
* result.
*
* If the `max` timeout expires the last exception is thrown.
*
* If no timeout is given, take it from the innermost enclosing `within`
@ -292,19 +297,29 @@ trait TestKitBase {
* Note that the timeout is scaled using Duration.dilated,
* which uses the configuration entry "akka.test.timefactor".
*/
def awaitAssert(a: ⇒ Any, max: Duration = Duration.Undefined, interval: Duration = 100.millis) {
def awaitAssert[A](a: ⇒ A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A = {
val _max = remainingOrDilated(max)
val stop = now + _max
@tailrec
def poll(t: Duration) {
val failed =
try { a; false } catch {
def poll(t: Duration): A = {
// cannot use null-ness of result as signal it failed
// because Java API and not wanting to return a value will be "return null"
var failed = false
val result: A =
try {
val aRes = a
failed = false
aRes
} catch {
case NonFatal(e) ⇒
failed = true
if ((now + t) >= stop) throw e
true
else null.asInstanceOf[A]
}
if (failed) {
if (!failed) result
else {
Thread.sleep(t.toMillis)
poll((stop - now) min interval)
}
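Because awaitAssert now returns the value of the last (successful) evaluation, the asserted value can be captured directly. A small usage sketch (the names cluster and probe and the ScalaTest matchers are illustrative, not part of this diff):

// retries the block until it stops throwing, then returns its result
val upMember: Member = probe.awaitAssert {
cluster.selfMember.status should ===(MemberStatus.Up)
cluster.selfMember
}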

View file

@ -189,9 +189,9 @@ class TestKit(system: ActorSystem) {
def awaitCond(max: Duration, interval: Duration, message: String, p: Supplier[Boolean]): Unit =
tp.awaitCond(p.get, max, interval, message)
def awaitAssert(a: Supplier[Any]): Unit = tp.awaitAssert(a.get)
def awaitAssert[A](a: Supplier[A]): A = tp.awaitAssert(a.get)
def awaitAssert(max: Duration, a: Supplier[Any]): Unit = tp.awaitAssert(a.get, max)
def awaitAssert[A](max: Duration, a: Supplier[A]): A = tp.awaitAssert(a.get, max)
/**
* Evaluate the given assert every `interval` until it does not throw an exception.
@ -200,7 +200,7 @@ class TestKit(system: ActorSystem) {
* Note that the timeout is scaled using Duration.dilated,
* which uses the configuration entry "akka.test.timefactor".
*/
def awaitAssert(max: Duration, interval: Duration, a: Supplier[Any]): Unit = tp.awaitAssert(a.get, max, interval)
def awaitAssert[A](max: Duration, interval: Duration, a: Supplier[A]): A = tp.awaitAssert(a.get, max, interval)
/**
* Same as `expectMsg(remainingOrDefault, obj)`, but correctly treating the timeFactor.

View file

@ -0,0 +1,22 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.testkit.javadsl
import akka.typed.ActorSystem
import akka.typed.testkit.TestKitSettings
/**
* Java API:
*/
class TestProbe[M](name: String, system: ActorSystem[_], settings: TestKitSettings) extends akka.typed.testkit.scaladsl.TestProbe[M](name)(system, settings) {
def this(system: ActorSystem[_], settings: TestKitSettings) = this("testProbe", system, settings)
/**
* Same as `expectMsgType[T](remainingOrDefault)`, but correctly treating the timeFactor.
*/
def expectMsgType[T <: M](t: Class[T]): T =
expectMsgClass_internal(remainingOrDefault, t)
}

View file

@ -5,19 +5,25 @@ package akka.typed.testkit.scaladsl
import scala.concurrent.duration._
import java.util.concurrent.BlockingDeque
import akka.typed.Behavior
import akka.typed.scaladsl.Actor
import akka.typed.ActorSystem
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicInteger
import akka.typed.ActorRef
import akka.util.Timeout
import akka.util.PrettyDuration.PrettyPrintableDuration
import scala.concurrent.Await
import com.typesafe.config.Config
import akka.typed.testkit.TestKitSettings
import akka.util.BoxedType
import scala.annotation.tailrec
import scala.reflect.ClassTag
import scala.util.control.NonFatal
object TestProbe {
private val testActorId = new AtomicInteger(0)
@ -35,6 +41,7 @@ object TestProbe {
}
class TestProbe[M](name: String)(implicit val system: ActorSystem[_], val settings: TestKitSettings) {
import TestProbe._
private val queue = new LinkedBlockingDeque[M]
@ -219,11 +226,48 @@ class TestProbe[M](name: String)(implicit val system: ActorSystem[_], val settin
def expectMsgType[T <: M](max: FiniteDuration)(implicit t: ClassTag[T]): T =
expectMsgClass_internal(max.dilated, t.runtimeClass.asInstanceOf[Class[T]])
private def expectMsgClass_internal[C](max: FiniteDuration, c: Class[C]): C = {
private[akka] def expectMsgClass_internal[C](max: FiniteDuration, c: Class[C]): C = {
val o = receiveOne(max)
assert(o != null, s"timeout ($max) during expectMsgClass waiting for $c")
assert(BoxedType(c) isInstance o, s"expected $c, found ${o.getClass} ($o)")
o.asInstanceOf[C]
}
/**
* Evaluate the given assert every `interval` until it does not throw an exception and return the
* result.
*
* If the `max` timeout expires the last exception is thrown.
*
* If no timeout is given, take it from the innermost enclosing `within`
* block.
*
* Note that the timeout is scaled using Duration.dilated,
* which uses the configuration entry "akka.test.timefactor".
*/
def awaitAssert[A](a: ⇒ A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A = {
val _max = remainingOrDilated(max)
val stop = now + _max
@tailrec
def poll(t: Duration): A = {
val result: A =
try {
a
} catch {
case NonFatal(e) ⇒
if ((now + t) >= stop) throw e
else null.asInstanceOf[A]
}
if (result != null) result
else {
Thread.sleep(t.toMillis)
poll((stop - now) min interval)
}
}
poll(_max min interval)
}
}

View file

@ -0,0 +1,65 @@
package akka.typed.cluster;
import akka.cluster.ClusterEvent;
import akka.typed.ActorSystem;
import akka.typed.testkit.TestKitSettings;
import akka.typed.testkit.javadsl.TestProbe;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import scala.concurrent.Await;
import scala.concurrent.duration.Duration;
public class ClusterApiTest extends JUnitSuite {
@Test
public void joinLeaveAndObserve() throws Exception {
Config config = ConfigFactory.parseString(
"akka.actor.provider = cluster \n" +
"akka.remote.artery.enabled = true \n"+
"akka.remote.netty.tcp.port = 0 \n"+
"akka.remote.artery.canonical.port = 0 \n"+
"akka.cluster.jmx.multi-mbeans-in-same-jvm = on \n"+
"akka.coordinated-shutdown.terminate-actor-system = off \n"+
"akka.actor { \n"+
" serialize-messages = off \n"+
" allow-java-serialization = off \n"+
"}"
);
ActorSystem<?> system1 = ActorSystem.wrap(akka.actor.ActorSystem.create("ClusterApiTest", config));
ActorSystem<?> system2 = ActorSystem.wrap(akka.actor.ActorSystem.create("ClusterApiTest", config));
try {
TestKitSettings testKitSettings = new TestKitSettings(system1.settings().config());
Cluster cluster1 = Cluster.get(system1);
Cluster cluster2 = Cluster.get(system2);
TestProbe<ClusterEvent.ClusterDomainEvent> probe1 = new TestProbe<>(system1, testKitSettings);
cluster1.subscriptions().tell(new Subscribe<>(probe1.ref().narrow(), SelfUp.class));
cluster1.manager().tell(new Join(cluster1.selfMember().address()));
probe1.expectMsgType(SelfUp.class);
TestProbe<ClusterEvent.ClusterDomainEvent> probe2 = new TestProbe<>(system2, testKitSettings);
cluster2.subscriptions().tell(new Subscribe<>(probe2.ref().narrow(), SelfUp.class));
cluster2.manager().tell(new Join(cluster1.selfMember().address()));
probe2.expectMsgType(SelfUp.class);
cluster2.subscriptions().tell(new Subscribe<>(probe2.ref().narrow(), SelfRemoved.class));
cluster2.manager().tell(new Leave(cluster2.selfMember().address()));
probe2.expectMsgType(SelfRemoved.class);
} finally {
// TODO no java API to terminate actor system
Await.result(system1.terminate().zip(system2.terminate()), Duration.create("5 seconds"));
}
}
}

View file

@ -0,0 +1,113 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus
import akka.typed.TypedSpec
import akka.typed.internal.adapter.ActorSystemAdapter
import akka.typed.scaladsl.adapter._
import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl.TestProbe
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Await
import scala.concurrent.duration._
object ClusterApiSpec {
val config = ConfigFactory.parseString(
"""
akka.actor.provider = cluster
akka.remote.artery.enabled = true
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.cluster.jmx.multi-mbeans-in-same-jvm = on
akka.coordinated-shutdown.terminate-actor-system = off
akka.actor {
serialize-messages = off
allow-java-serialization = off
}
""")
}
class ClusterApiSpec extends TypedSpec(ClusterApiSpec.config) with ScalaFutures {
val testSettings = TestKitSettings(adaptedSystem)
val clusterNode1 = Cluster(adaptedSystem)
val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem)
object `A typed cluster` {
def `01 must join a cluster and observe events from both sides`() = {
val system2 = akka.actor.ActorSystem(adaptedSystem.name, adaptedSystem.settings.config)
val adaptedSystem2 = system2.toTyped
try {
val clusterNode2 = Cluster(adaptedSystem2)
val node1Probe = TestProbe[AnyRef]()(adaptedSystem, testSettings)
val node2Probe = TestProbe[AnyRef]()(adaptedSystem2, testSettings)
// initial cached selfMember
clusterNode1.selfMember.status should ===(MemberStatus.Removed)
clusterNode2.selfMember.status should ===(MemberStatus.Removed)
// check that subscriptions work
clusterNode1.subscriptions ! Subscribe(node1Probe.ref, classOf[MemberEvent])
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
node1Probe.expectMsgType[MemberUp].member.uniqueAddress == clusterNode1.selfMember.uniqueAddress
// check that cached selfMember is updated
node1Probe.awaitAssert(
clusterNode1.selfMember.status should ===(MemberStatus.Up)
)
// subscribing to OnSelfUp when already up
clusterNode1.subscriptions ! Subscribe(node1Probe.ref, classOf[SelfUp])
node1Probe.expectMsgType[SelfUp]
// selfMember update and on up subscription on node 2 when joining
clusterNode2.subscriptions ! Subscribe(node2Probe.ref, classOf[SelfUp])
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
node2Probe.awaitAssert(
clusterNode2.selfMember.status should ===(MemberStatus.Up)
)
node2Probe.expectMsgType[SelfUp]
// events about node2 joining to subscriber on node1
node1Probe.expectMsgType[MemberJoined].member.uniqueAddress == clusterNode2.selfMember.uniqueAddress
node1Probe.expectMsgType[MemberUp].member.uniqueAddress == clusterNode1.selfMember.uniqueAddress
// OnSelfRemoved and subscription events around node2 leaving
clusterNode2.subscriptions ! Subscribe(node2Probe.ref, classOf[SelfRemoved])
clusterNode2.manager ! Leave(clusterNode2.selfMember.address)
// node1 seeing all those transition events
node1Probe.expectMsgType[MemberLeft].member.uniqueAddress == clusterNode2.selfMember.uniqueAddress
node1Probe.expectMsgType[MemberExited].member.uniqueAddress == clusterNode2.selfMember.uniqueAddress
node1Probe.expectMsgType[MemberRemoved].member.uniqueAddress == clusterNode2.selfMember.uniqueAddress
// selfMember updated and self removed event gotten
node2Probe.awaitAssert(
clusterNode2.selfMember.status should ===(MemberStatus.Removed)
)
node2Probe.expectMsg(SelfRemoved(MemberStatus.Exiting))
// subscribing to SelfRemoved when already removed yields immediate message back
clusterNode2.subscriptions ! Subscribe(node2Probe.ref, classOf[SelfRemoved])
node2Probe.expectMsg(SelfRemoved(MemberStatus.Exiting))
// subscribing to SelfUp when already removed yields nothing
clusterNode2.subscriptions ! Subscribe(node2Probe.ref, classOf[SelfUp])
node2Probe.expectNoMsg(100.millis)
} finally {
Await.result(system2.terminate(), 3.seconds)
}
}
}
}

View file

@ -0,0 +1,55 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster
import akka.cluster.sharding.ClusterShardingSettings
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.adapter._
import akka.typed.{ ActorSystem }
class ClusterShardingApiSpec {
// Compile only for now
val system: akka.actor.ActorSystem = ???
val typedSystem: ActorSystem[Nothing] = system.toTyped
val cluster = Cluster(typedSystem)
trait EntityProtocol
case class Add(thing: String) extends EntityProtocol
case object PassHence extends EntityProtocol
val entityBehavior =
Actor.deferred[EntityProtocol] { _ ⇒
var things: List[String] = Nil
Actor.immutable[EntityProtocol] { (_, msg) ⇒
msg match {
case Add(thing) ⇒
things = thing :: things
Actor.same
case PassHence ⇒
Actor.stopped
}
}
}
val sharding = ClusterSharding(typedSystem).spawn(
entityBehavior,
"things-lists",
ClusterShardingSettings(typedSystem.settings.config),
maxNumberOfShards = 25,
handOffStopMessage = PassHence
)
sharding ! ShardingEnvelope("1", Add("bananas"))
val entity1 = ClusterSharding.entityRefFor("1", sharding)
entity1 ! Add("pineapple")
// start but no command
sharding ! StartEntity("2")
}

View file

@ -0,0 +1,151 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster
import java.nio.charset.StandardCharsets
import akka.actor.ExtendedActorSystem
import akka.serialization.SerializerWithStringManifest
import akka.typed.internal.adapter.ActorSystemAdapter
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.adapter._
import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl.TestProbe
import akka.typed.{ ActorRef, Props, TypedSpec }
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.Await
import scala.concurrent.duration._
object ClusterSingletonApiSpec {
val config = ConfigFactory.parseString(
"""
akka.actor {
provider = cluster
serialize-messages = off
allow-java-serialization = off
serializers {
test = "akka.typed.cluster.ClusterSingletonApiSpec$PingSerializer"
}
serialization-bindings {
"akka.typed.cluster.ClusterSingletonApiSpec$Ping" = test
"akka.typed.cluster.ClusterSingletonApiSpec$Pong$" = test
"akka.typed.cluster.ClusterSingletonApiSpec$Perish$" = test
}
}
akka.remote.artery.enabled = true
akka.remote.artery.canonical.port = 25552
akka.cluster.jmx.multi-mbeans-in-same-jvm = on
""")
trait PingProtocol
case object Pong
case class Ping(respondTo: ActorRef[Pong.type]) extends PingProtocol
case object Perish extends PingProtocol
val pingPong = Actor.immutable[PingProtocol] { (ctx, msg) ⇒
msg match {
case Ping(respondTo) ⇒
respondTo ! Pong
Actor.same
case Perish ⇒
Actor.stopped
}
}
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
def identifier: Int = 47
def manifest(o: AnyRef): String = o match {
case _: Ping ⇒ "a"
case Pong ⇒ "b"
case Perish ⇒ "c"
}
def toBinary(o: AnyRef): Array[Byte] = o match {
case p: Ping ⇒ ActorRefResolver(system.toTyped).toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8)
case Pong ⇒ Array.emptyByteArray
case Perish ⇒ Array.emptyByteArray
}
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case "a" ⇒ Ping(ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8)))
case "b" ⇒ Pong
case "c" ⇒ Perish
}
}
}
class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config) with ScalaFutures {
import ClusterSingletonApiSpec._
implicit val testSettings = TestKitSettings(adaptedSystem)
val clusterNode1 = Cluster(adaptedSystem)
val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem)
val system2 = akka.actor.ActorSystem(
adaptedSystem.name,
ConfigFactory.parseString(
"""
akka.remote.artery.canonical.port = 0
akka.cluster.roles = ["singleton"]
"""
).withFallback(adaptedSystem.settings.config))
val adaptedSystem2 = system2.toTyped
val clusterNode2 = Cluster(adaptedSystem2)
object `A typed cluster singleton` {
def `01 must be accessible from two nodes in a cluster`() = {
val node1UpProbe = TestProbe[SelfUp]()(adaptedSystem, implicitly[TestKitSettings])
clusterNode1.subscriptions ! Subscribe(node1UpProbe.ref, classOf[SelfUp])
val node2UpProbe = TestProbe[SelfUp]()(adaptedSystem2, implicitly[TestKitSettings])
clusterNode1.subscriptions ! Subscribe(node2UpProbe.ref, classOf[SelfUp])
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
node1UpProbe.expectMsgType[SelfUp]
node2UpProbe.expectMsgType[SelfUp]
val cs1 = ClusterSingleton(adaptedSystem)
val cs2 = ClusterSingleton(adaptedSystem2)
val settings = ClusterSingletonSettings(adaptedSystem).withRole("singleton")
val node1ref = cs1.spawn(pingPong, "ping-pong", Props.empty, settings, Perish)
val node2ref = cs2.spawn(pingPong, "ping-pong", Props.empty, settings, Perish)
// subsequent spawning returns the same refs
cs1.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node1ref)
cs2.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node2ref)
val node1PongProbe = TestProbe[Pong.type]()(adaptedSystem, implicitly[TestKitSettings])
val node2PongProbe = TestProbe[Pong.type]()(adaptedSystem2, implicitly[TestKitSettings])
node1PongProbe.awaitAssert({
node1ref ! Ping(node1PongProbe.ref)
node1PongProbe.expectMsg(Pong)
}, 3.seconds)
node2PongProbe.awaitAssert({
node2ref ! Ping(node2PongProbe.ref)
node2PongProbe.expectMsg(Pong)
}, 3.seconds)
}
}
override def afterAll(): Unit = {
super.afterAll()
Await.result(system2.terminate(), 3.seconds)
}
}

View file

@ -0,0 +1,112 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.scaladsl.adapter
import java.nio.charset.StandardCharsets
import akka.Done
import akka.testkit.AkkaSpec
import akka.typed.{ ActorRef, ActorSystem }
import akka.typed.scaladsl.Actor
import akka.actor.{ ExtendedActorSystem, ActorSystem ⇒ UntypedActorSystem }
import akka.cluster.Cluster
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
import com.typesafe.config.ConfigFactory
import scala.concurrent.Promise
import akka.typed.cluster.ActorRefResolver
import akka.typed.internal.adapter.ActorRefAdapter
class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
override def identifier = 41
override def manifest(o: AnyRef) = "a"
override def toBinary(o: AnyRef) = o match {
case RemotingSpec.Ping(who) ⇒
ActorRefResolver(system.toTyped).toSerializationFormat(who).getBytes(StandardCharsets.UTF_8)
}
override def fromBinary(bytes: Array[Byte], manifest: String) = {
val str = new String(bytes, StandardCharsets.UTF_8)
val ref = ActorRefResolver(system.toTyped).resolveActorRef[String](str)
RemotingSpec.Ping(ref)
}
}
object RemotingSpec {
def config = ConfigFactory.parseString(
s"""
akka {
loglevel = debug
actor {
provider = cluster
warn-about-java-serializer-usage = off
serialize-creators = off
serializers {
test = "akka.typed.scaladsl.adapter.PingSerializer"
}
serialization-bindings {
"akka.typed.scaladsl.adapter.RemotingSpec$$Ping" = test
}
}
remote.artery {
enabled = on
canonical {
hostname = 127.0.0.1
port = 0
}
}
}
""")
case class Ping(sender: ActorRef[String])
}
class RemotingSpec extends AkkaSpec(RemotingSpec.config) {
import RemotingSpec._
val typedSystem = system.toTyped
"the adapted system" should {
"something something" in {
val pingPromise = Promise[Done]()
val ponger = Actor.immutable[Ping]((_, msg) ⇒
msg match {
case Ping(sender) ⇒
pingPromise.success(Done)
sender ! "pong"
Actor.stopped
})
// typed actor on system1
val pingPongActor = system.spawn(ponger, "pingpong")
val system2 = UntypedActorSystem(system.name + "-system2", RemotingSpec.config)
val typedSystem2 = system2.toTyped
try {
// resolve the actor from node2
val remoteRefStr = ActorRefResolver(typedSystem).toSerializationFormat(pingPongActor)
val remoteRef: ActorRef[Ping] =
ActorRefResolver(typedSystem2).resolveActorRef[Ping](remoteRefStr)
val pongPromise = Promise[Done]()
val recipient = system2.spawn(Actor.immutable[String] { (_, msg) ⇒
pongPromise.success(Done)
Actor.stopped
}, "recipient")
remoteRef ! Ping(recipient)
pingPromise.future.futureValue should ===(Done)
pongPromise.future.futureValue should ===(Done)
} finally {
system2.terminate()
}
}
}
}

View file

@ -220,14 +220,14 @@ object ActorSystem {
val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext,
Some(PropsAdapter(() ⇒ guardianBehavior, guardianProps)), actorSystemSettings)
untyped.start()
new ActorSystemAdapter(untyped)
ActorSystemAdapter.AdapterExtension(untyped).adapter
}
/**
* Wrap an untyped [[akka.actor.ActorSystem]] such that it can be used from
* Akka Typed [[Behavior]].
*/
def wrap(untyped: a.ActorSystem): ActorSystem[Nothing] = new ActorSystemAdapter(untyped.asInstanceOf[a.ActorSystemImpl])
def wrap(untyped: a.ActorSystem): ActorSystem[Nothing] = ActorSystemAdapter.AdapterExtension(untyped.asInstanceOf[a.ActorSystemImpl]).adapter
}
/**

View file

@ -0,0 +1,159 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster
import akka.actor.Address
import akka.annotation.DoNotInherit
import akka.cluster.ClusterEvent.{ ClusterDomainEvent, CurrentClusterState }
import akka.cluster._
import akka.japi.Util
import akka.typed.cluster.internal.AdapterClusterImpl
import akka.typed.{ ActorRef, ActorSystem, Extension, ExtensionId }
import scala.collection.immutable
/**
* Messages for subscribing to changes in the cluster state
*
* Not intended for user extension.
*/
@DoNotInherit
sealed trait ClusterStateSubscription
/**
* Subscribe to cluster state changes. The initial state of the cluster will be sent as
* a "replay" of the subscribed events.
*
* @param subscriber A subscriber that will receive events until it is unsubscribed or stops
* @param eventClass The type of events to subscribe to, can be individual event types such as
* `ReachabilityEvent` or one of the common supertypes, such as `MemberEvent` to get
* all the subtypes of events.
*/
final case class Subscribe[A <: ClusterDomainEvent](
subscriber: ActorRef[A],
eventClass: Class[A]) extends ClusterStateSubscription
/**
* Subscribe to this node being up; after this event is sent the subscription is automatically
* cancelled. If the node is already up the event is also sent to the subscriber. If the node was up
* but is no longer, because it left or is leaving the cluster, no event is sent and the subscription
* request is ignored.
*
* Note: Only emitted for the typed cluster.
*/
final case class SelfUp(currentClusterState: CurrentClusterState) extends ClusterDomainEvent
/**
* Subscribe to this node being removed from the cluster. If the node was already removed from the cluster
* when this subscription is created it will be responded to immediately from the subscriptions actor.
*
* Note: Only emitted for the typed cluster.
*/
final case class SelfRemoved(previousStatus: MemberStatus) extends ClusterDomainEvent
final case class Unsubscribe[T](subscriber: ActorRef[T]) extends ClusterStateSubscription
final case class GetCurrentState(recipient: ActorRef[CurrentClusterState]) extends ClusterStateSubscription
/**
* Not intended for user extension.
*/
@DoNotInherit
sealed trait ClusterCommand
/**
* Try to join this cluster node with the node specified by 'address'.
*
* An actor system can only join a cluster once. Additional attempts will be ignored.
* When it has successfully joined it must be restarted to be able to join another
* cluster or to join the same cluster again.
*
* The name of the [[akka.actor.ActorSystem]] must be the same for all members of a
* cluster.
*/
final case class Join(address: Address) extends ClusterCommand
/**
* Scala API: Join the specified seed nodes without defining them in config.
* Especially useful from tests when Addresses are unknown before startup time.
*
* An actor system can only join a cluster once. Additional attempts will be ignored.
* When it has successfully joined it must be restarted to be able to join another
* cluster or to join the same cluster again.
*/
final case class JoinSeedNodes(seedNodes: immutable.Seq[Address]) extends ClusterCommand {
/**
* Java API: Join the specified seed nodes without defining them in config.
* Especially useful from tests when Addresses are unknown before startup time.
*
* An actor system can only join a cluster once. Additional attempts will be ignored.
* When it has successfully joined it must be restarted to be able to join another
* cluster or to join the same cluster again.
*
* Creates a defensive copy of the list to ensure immutability.
*/
def this(seedNodes: java.util.List[Address]) = this(Util.immutableSeq(seedNodes))
}
/**
* Send command to issue state transition to LEAVING for the node specified by 'address'.
* The member will go through the status changes [[MemberStatus]] `Leaving` (not published to
* subscribers) followed by [[MemberStatus]] `Exiting` and finally [[MemberStatus]] `Removed`.
*
* Note that this command can be issued to any member in the cluster, not necessarily the
* one that is leaving. The cluster extension, but not the actor system or JVM, of the
* leaving member will be shutdown after the leader has changed status of the member to
* Exiting. Thereafter the member will be removed from the cluster. Normally this is
* handled automatically, but in case of network failures during this process it might
* still be necessary to set the nodes status to Down in order to complete the removal.
*/
final case class Leave(address: Address) extends ClusterCommand
/**
* Send command to DOWN the node specified by 'address'.
*
* When a member is considered by the failure detector to be unreachable the leader is not
* allowed to perform its duties, such as changing status of new joining members to 'Up'.
* The status of the unreachable member must be changed to 'Down', which can be done with
* this method.
*/
final case class Down(address: Address) extends ClusterCommand
/**
* Akka Typed Cluster API entry point
*/
object Cluster extends ExtensionId[Cluster] {
def createExtension(system: ActorSystem[_]): Cluster = new AdapterClusterImpl(system)
def get(system: ActorSystem[_]): Cluster = apply(system)
}
/**
* Not intended for user extension.
*/
@DoNotInherit
abstract class Cluster extends Extension {
/** Details about this cluster node itself */
def selfMember: Member
/** Returns true if this cluster instance has been shut down. */
def isTerminated: Boolean
/** Current snapshot state of the cluster. */
def state: CurrentClusterState
/**
* @return an actor that allows for subscribing to messages when the cluster state changes
*/
def subscriptions: ActorRef[ClusterStateSubscription]
/**
* @return an actor that accepts commands to join, leave and down nodes in a cluster
*/
def manager: ActorRef[ClusterCommand]
}
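For reference, a short sketch of reading cluster state through this API (illustrative; `system` is an adapted typed ActorSystem and `stateProbe` a TestProbe[CurrentClusterState], both assumed here):

val cluster = Cluster(system)
// one-off snapshot via the subscriptions actor
cluster.subscriptions ! GetCurrentState(stateProbe.ref)
val snapshot: CurrentClusterState = stateProbe.expectMsgType[CurrentClusterState]
// or read the locally cached view directly
val cached: CurrentClusterState = cluster.state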

View file

@ -0,0 +1,196 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ClusterShardingSettings
import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props }
sealed case class ShardingEnvelope[A](entityId: String, message: A)
object StartEntity {
def apply[A](entityId: String): ShardingEnvelope[A] =
new ShardingEnvelope[A](entityId, null.asInstanceOf[A])
}
object TypedMessageExtractor {
/**
* Scala API:
*
* Create the default message extractor, using envelopes to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*/
def apply[A](maxNumberOfShards: Int): TypedMessageExtractor[ShardingEnvelope[A], A] =
new DefaultMessageExtractor[A](maxNumberOfShards)
/**
* Scala API:
*
* Create a message extractor for a protocol where the entity id is available in each message.
*/
def noEnvelope[A](
maxNumberOfShards: Int,
extractEntityId: A ⇒ String
): TypedMessageExtractor[A, A] =
new DefaultNoEnvelopeMessageExtractor[A](maxNumberOfShards) {
// TODO catch MatchError here and return null for those to yield an "unhandled" when partial functions are used?
def entityId(message: A) = extractEntityId(message)
}
}
/**
* Entirely customizable typed message extractor. Prefer [[DefaultMessageExtractor]] or [[DefaultNoEnvelopeMessageExtractor]]
* if possible.
*
* @tparam E Possibly an envelope around the messages accepted by the entity actor, is the same as `A` if there is no
* envelope.
* @tparam A The type of message accepted by the entity actor
*/
trait TypedMessageExtractor[E, A] {
/**
* Extract the entity id from an incoming `message`. If `null` is returned
* the message will be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
*/
def entityId(message: E): String
/**
* Extract the message to send to the entity from an incoming `message`.
* Note that the extracted message does not have to be the same as the incoming
* message to support wrapping in message envelope that is unwrapped before
* sending to the entity actor.
*
* If the returned value is `null`, and the entity isn't running yet the entity will be started
* but no message will be delivered to it.
*/
def entityMessage(message: E): A
/**
* Extract the entity id from an incoming `message`. Only messages that passed the [[#entityId]]
* function will be used as input to this function.
*/
def shardId(message: E): String
}
/**
* Java API:
*
* Default message extractor type, using envelopes to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*
* @tparam A The type of message accepted by the entity actor
*/
final class DefaultMessageExtractor[A](maxNumberOfShards: Int) extends TypedMessageExtractor[ShardingEnvelope[A], A] {
def entityId(envelope: ShardingEnvelope[A]) = envelope.entityId
def entityMessage(envelope: ShardingEnvelope[A]) = envelope.message
def shardId(envelope: ShardingEnvelope[A]) = (math.abs(envelope.entityId.hashCode) % maxNumberOfShards).toString
}
/**
* Java API:
*
* Default message extractor type, using a property of the message to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*
* @tparam A The type of message accepted by the entity actor
*/
abstract class DefaultNoEnvelopeMessageExtractor[A](maxNumberOfShards: Int) extends TypedMessageExtractor[A, A] {
def entityMessage(message: A) = message
def shardId(message: A) = {
val id = entityId(message)
if (id != null) (math.abs(id.hashCode) % maxNumberOfShards).toString
else null
}
}
/**
* A reference to an entityId with local access to sharding, allowing for actor-like interaction
*
* The entity ref must be resolved locally and cannot be sent to another node.
*
* TODO what about ask: should it actually implement ActorRef, so that callers do not have
* to know about it at all, or is a distinct but lookalike API good enough?
*/
trait EntityRef[A] {
/**
* Send a message to the entity referenced by this EntityRef using *at-most-once*
* messaging semantics.
*/
def tell(msg: A): Unit
}
object EntityRef {
implicit final class EntityRefOps[T](val ref: EntityRef[T]) extends AnyVal {
/**
* Send a message to the Actor referenced by this ActorRef using *at-most-once*
* messaging semantics.
*/
def !(msg: T): Unit = ref.tell(msg)
}
}
object ClusterSharding extends ExtensionId[ClusterSharding] {
def createExtension(system: ActorSystem[_]): ClusterSharding = ???
/**
* Create an ActorRef-like reference to a specific sharded entity. Messages sent to it will be wrapped
* in a [[ShardingEnvelope]] and passed to the local shard region or proxy.
*/
def entityRefFor[A](entityId: String, actorRef: ActorRef[ShardingEnvelope[A]]): EntityRef[A] =
new EntityRef[A] {
def tell(msg: A): Unit = actorRef ! ShardingEnvelope(entityId, msg)
}
}
trait ClusterSharding extends Extension {
/**
* Spawn a shard region or a proxy depending on whether the settings require a role and whether this node has such a role.
*
* Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the
* recipient actor.
* A [[DefaultMessageExtractor]] will be used for extracting entityId and shardId
* [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy.
*
* @param behavior The behavior for entities
* @param typeName A name that uniquely identifies the type of entity in this cluster
* @param handOffStopMessage Message sent to an entity to tell it to stop
* @tparam A The type of command the entity accepts
*/
def spawn[A](
behavior: Behavior[A],
typeName: String,
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]]
/**
* Spawn a shard region or a proxy depending on whether the settings require a role and whether this node has such a role.
*
* @param behavior The behavior for entities
* @param typeName A name that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param handOffStopMessage Message sent to an entity to tell it to stop
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawn[E, A](
behavior: Behavior[A],
typeName: String,
entityProps: Props,
settings: ClusterShardingSettings,
messageExtractor: TypedMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: A
): ActorRef[E]
}
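A brief sketch of how messages reach a sharded entity with this API (illustrative; `shardRegion` stands for the ActorRef[ShardingEnvelope[EntityProtocol]] returned by ClusterSharding(system).spawn, and Add/EntityProtocol are the example messages from the compile-only spec above):

// send through the region, wrapping the message in an envelope that carries the entityId
shardRegion ! ShardingEnvelope("entity-1", Add("bananas"))
// or obtain an EntityRef that hides the envelope from the caller
val entity: EntityRef[EntityProtocol] = ClusterSharding.entityRefFor("entity-1", shardRegion)
entity ! Add("pineapple")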

View file

@ -0,0 +1,135 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster
import akka.actor.NoSerializationVerificationNeeded
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.singleton.{ ClusterSingletonManagerSettings, ClusterSingletonProxySettings }
import akka.typed.cluster.internal.AdaptedClusterSingletonImpl
import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props }
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration
object ClusterSingletonSettings {
def apply(
system: ActorSystem[_]
): ClusterSingletonSettings = fromConfig(system.settings.config.getConfig("akka.cluster"))
def fromConfig(
config: Config
): ClusterSingletonSettings = {
// TODO introduce a config namespace for typed singleton and read that?
// currently singleton name is required and then discarded, for example
val mgrSettings = ClusterSingletonManagerSettings(config.getConfig("singleton"))
val proxySettings = ClusterSingletonProxySettings(config.getConfig("singleton-proxy"))
new ClusterSingletonSettings(
mgrSettings.role,
proxySettings.dataCenter,
proxySettings.singletonIdentificationInterval,
mgrSettings.removalMargin,
mgrSettings.handOverRetryInterval,
proxySettings.bufferSize
)
}
}
final class ClusterSingletonSettings(
val role: Option[String],
val dataCenter: Option[DataCenter],
val singletonIdentificationInterval: FiniteDuration,
val removalMargin: FiniteDuration,
val handOverRetryInterval: FiniteDuration,
val bufferSize: Int) extends NoSerializationVerificationNeeded {
def withRole(role: String): ClusterSingletonSettings = copy(role = Some(role))
def withNoRole(): ClusterSingletonSettings = copy(role = None)
def withDataCenter(dataCenter: DataCenter): ClusterSingletonSettings = copy(dataCenter = Some(dataCenter))
def withNoDataCenter(): ClusterSingletonSettings = copy(dataCenter = None)
def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonSettings = copy(removalMargin = removalMargin)
def withHandoverRetryInterval(handOverRetryInterval: FiniteDuration): ClusterSingletonSettings = copy(handOverRetryInterval = handOverRetryInterval)
def withBufferSize(bufferSize: Int): ClusterSingletonSettings = copy(bufferSize = bufferSize)
private def copy(
role: Option[String] = role,
dataCenter: Option[DataCenter] = dataCenter,
singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval,
removalMargin: FiniteDuration = removalMargin,
handOverRetryInterval: FiniteDuration = handOverRetryInterval,
bufferSize: Int = bufferSize) =
new ClusterSingletonSettings(role, dataCenter, singletonIdentificationInterval, removalMargin, handOverRetryInterval, bufferSize)
/**
* INTERNAL API:
*/
@InternalApi
private[akka] def toManagerSettings(singletonName: String): ClusterSingletonManagerSettings =
new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval)
/**
* INTERNAL API:
*/
@InternalApi
private[akka] def toProxySettings(singletonName: String): ClusterSingletonProxySettings =
new ClusterSingletonProxySettings(singletonName, role, singletonIdentificationInterval, bufferSize)
/**
* INTERNAL API:
*/
@InternalApi
private[akka] def shouldRunManager(cluster: Cluster): Boolean =
(role.isEmpty || cluster.selfMember.roles(role.get)) &&
(dataCenter.isEmpty || dataCenter.contains(cluster.selfMember.dataCenter))
}
object ClusterSingleton extends ExtensionId[ClusterSingleton] {
override def createExtension(system: ActorSystem[_]): ClusterSingleton = new AdaptedClusterSingletonImpl(system)
/**
* Java API:
*/
def get(system: ActorSystem[_]): ClusterSingleton = apply(system)
}
/**
* INTERNAL API:
*/
@InternalApi
private[akka] object ClusterSingletonImpl {
def managerNameFor(singletonName: String) = s"singletonManager${singletonName}"
}
/**
* Not intended for user extension.
*/
@DoNotInherit
trait ClusterSingleton extends Extension {
/**
* Start if needed and provide a proxy to a named singleton
*
* If there already is a manager running for the given `singletonName` on this node, no additional manager is started.
* If there already is a proxy running for the given `singletonName` on this node, an [[ActorRef]] to that is returned.
*
* @param singletonName A cluster global unique name for this singleton
* @return A proxy actor that can be used to communicate with the singleton in the cluster
*/
def spawn[A](
behavior: Behavior[A],
singletonName: String,
props: Props,
settings: ClusterSingletonSettings,
terminationMessage: A
): ActorRef[A]
}
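And a compact sketch of starting or looking up a singleton (illustrative; `system`, `pingPong`, `Ping`, `Perish` and `replyProbe` follow the ClusterSingletonApiSpec above and are assumed here):

val singletonSettings = ClusterSingletonSettings(system).withRole("singleton")
// spawn returns a proxy; calling it again with the same name returns the same proxy
val proxy: ActorRef[PingProtocol] =
ClusterSingleton(system).spawn(pingPong, "ping-pong", Props.empty, singletonSettings, Perish)
proxy ! Ping(replyProbe.ref)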

View file

@ -0,0 +1,153 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.internal
import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.{ ClusterEvent, MemberStatus }
import akka.typed.{ ActorRef, ActorSystem, Terminated }
import akka.typed.cluster._
import akka.typed.internal.adapter.ActorSystemAdapter
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.adapter._
/**
* INTERNAL API:
*/
@InternalApi
private[akka] object AdapterClusterImpl {
private sealed trait SeenState
private case object BeforeUp extends SeenState
private case object Up extends SeenState
private case class Removed(previousStatus: MemberStatus) extends SeenState
private def subscriptionsBehavior(adaptedCluster: akka.cluster.Cluster) = Actor.deferred[ClusterStateSubscription] { ctx ⇒
var seenState: SeenState = BeforeUp
var upSubscribers: List[ActorRef[SelfUp]] = Nil
var removedSubscribers: List[ActorRef[SelfRemoved]] = Nil
adaptedCluster.subscribe(ctx.self.toUntyped, ClusterEvent.initialStateAsEvents, classOf[MemberEvent])
// important to not eagerly refer to it or we get a cycle here
lazy val cluster = Cluster(ctx.system)
def onSelfMemberEvent(event: MemberEvent): Unit = {
event match {
case ClusterEvent.MemberUp(_) ⇒
seenState = Up
val upMessage = SelfUp(cluster.state)
upSubscribers.foreach(_ ! upMessage)
upSubscribers = Nil
case ClusterEvent.MemberRemoved(_, previousStatus) ⇒
seenState = Removed(previousStatus)
val removedMessage = SelfRemoved(previousStatus)
removedSubscribers.foreach(_ ! removedMessage)
removedSubscribers = Nil
case _ ⇒ // This is fine.
}
}
Actor.immutable[AnyRef] { (ctx, msg) ⇒
msg match {
case Subscribe(subscriber: ActorRef[SelfUp] @unchecked, clazz) if clazz == classOf[SelfUp] ⇒
seenState match {
case Up ⇒ subscriber ! SelfUp(adaptedCluster.state)
case BeforeUp ⇒
ctx.watch(subscriber)
upSubscribers = subscriber :: upSubscribers
case _: Removed ⇒
// self did join, but is now no longer up, we want to avoid subscribing
// to not get a memory leak, but also not signal anything
}
Actor.same
case Subscribe(subscriber: ActorRef[SelfRemoved] @unchecked, clazz) if clazz == classOf[SelfRemoved] ⇒
seenState match {
case BeforeUp | Up ⇒ removedSubscribers = subscriber :: removedSubscribers
case Removed(s) ⇒ subscriber ! SelfRemoved(s)
}
Actor.same
case Subscribe(subscriber, eventClass) ⇒
adaptedCluster.subscribe(subscriber.toUntyped, initialStateMode = ClusterEvent.initialStateAsEvents, eventClass)
Actor.same
case Unsubscribe(subscriber) ⇒
adaptedCluster.unsubscribe(subscriber.toUntyped)
Actor.same
case GetCurrentState(sender) ⇒
adaptedCluster.sendCurrentClusterState(sender.toUntyped)
Actor.same
case evt: MemberEvent if evt.member.uniqueAddress == cluster.selfMember.uniqueAddress ⇒
onSelfMemberEvent(evt)
Actor.same
case _: MemberEvent ⇒
Actor.same
}
}.onSignal {
case (_, Terminated(ref)) ⇒
upSubscribers = upSubscribers.filterNot(_ == ref)
removedSubscribers = removedSubscribers.filterNot(_ == ref)
Actor.same
}.narrow[ClusterStateSubscription]
}
private def managerBehavior(adaptedCluster: akka.cluster.Cluster) = Actor.immutable[ClusterCommand]((ctx, msg) ⇒
msg match {
case Join(address) ⇒
adaptedCluster.join(address)
Actor.same
case Leave(address) ⇒
adaptedCluster.leave(address)
Actor.same
case Down(address) ⇒
adaptedCluster.down(address)
Actor.same
case JoinSeedNodes(addresses) ⇒
adaptedCluster.joinSeedNodes(addresses)
Actor.same
}
)
}
/**
* INTERNAL API:
*/
@InternalApi
private[akka] final class AdapterClusterImpl(system: ActorSystem[_]) extends Cluster {
import AdapterClusterImpl._
require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted actor systems can be used for cluster features")
private val untypedSystem = ActorSystemAdapter.toUntyped(system)
private def extendedUntyped = untypedSystem.asInstanceOf[ExtendedActorSystem]
private val untypedCluster = akka.cluster.Cluster(untypedSystem)
override def selfMember = untypedCluster.selfMember
override def isTerminated = untypedCluster.isTerminated
override def state = untypedCluster.state
// must not be lazy as it also updates the cached selfMember
override val subscriptions: ActorRef[ClusterStateSubscription] = extendedUntyped.systemActorOf(
PropsAdapter(subscriptionsBehavior(untypedCluster)), "clusterStateSubscriptions")
override lazy val manager: ActorRef[ClusterCommand] = extendedUntyped.systemActorOf(
PropsAdapter(managerBehavior(untypedCluster)), "clusterCommandManager")
}

View file

@ -0,0 +1,62 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster.internal
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{ Function ⇒ JFunction }
import akka.actor.{ ExtendedActorSystem, InvalidActorNameException }
import akka.annotation.InternalApi
import akka.cluster.singleton.{ ClusterSingletonProxy, ClusterSingletonManager ⇒ OldSingletonManager }
import akka.typed.cluster.{ Cluster, ClusterSingleton, ClusterSingletonImpl, ClusterSingletonSettings }
import akka.typed.internal.adapter.ActorSystemAdapter
import akka.typed.scaladsl.adapter._
import akka.typed.{ ActorRef, ActorSystem, Behavior, Props }
/**
* INTERNAL API:
*/
@InternalApi
private[akka] final class AdaptedClusterSingletonImpl(system: ActorSystem[_]) extends ClusterSingleton {
require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted actor systems can be used for the typed cluster singleton")
import ClusterSingletonImpl._
private lazy val cluster = Cluster(system)
private val untypedSystem = ActorSystemAdapter.toUntyped(system).asInstanceOf[ExtendedActorSystem]
private val proxies = new ConcurrentHashMap[String, ActorRef[_]]()
override def spawn[A](
behavior: Behavior[A],
singletonName: String,
props: Props,
settings: ClusterSingletonSettings,
terminationMessage: A) = {
if (settings.shouldRunManager(cluster)) {
val managerName = managerNameFor(singletonName)
// start singleton on this node
val adaptedProps = PropsAdapter(behavior, props)
try {
untypedSystem.systemActorOf(
OldSingletonManager.props(adaptedProps, terminationMessage, settings.toManagerSettings(singletonName)),
managerName)
} catch {
case ex: InvalidActorNameException if ex.getMessage.endsWith("is not unique!") ⇒
// This is fine. We just wanted to make sure it is running and it already is
}
}
val proxyCreator = new JFunction[String, ActorRef[_]] {
def apply(singletonName: String): ActorRef[_] = {
val proxyName = s"singletonProxy$singletonName"
untypedSystem.systemActorOf(
ClusterSingletonProxy.props(s"/system/${managerNameFor(singletonName)}", settings.toProxySettings(singletonName)),
proxyName)
}
}
proxies.computeIfAbsent(singletonName, proxyCreator).asInstanceOf[ActorRef[A]]
}
}

View file

@ -7,10 +7,14 @@ package adapter
import akka.{ actor ⇒ a, dispatch ⇒ d }
import akka.dispatch.sysmsg
import scala.concurrent.ExecutionContextExecutor
import akka.util.Timeout
import scala.concurrent.Future
import akka.annotation.InternalApi
import akka.typed.scaladsl.adapter.AdapterExtension
import scala.annotation.unchecked.uncheckedVariance
/**
@ -80,7 +84,18 @@ import scala.annotation.unchecked.uncheckedVariance
}
private[typed] object ActorSystemAdapter {
def apply(untyped: a.ActorSystem): ActorSystem[Nothing] = new ActorSystemAdapter(untyped.asInstanceOf[a.ActorSystemImpl])
def apply(untyped: a.ActorSystem): ActorSystem[Nothing] = AdapterExtension(untyped).adapter
// to make sure we never create more than one adapter for the same actor system
class AdapterExtension(system: a.ExtendedActorSystem) extends a.Extension {
val adapter = new ActorSystemAdapter(system.asInstanceOf[a.ActorSystemImpl])
}
object AdapterExtension extends a.ExtensionId[AdapterExtension] with a.ExtensionIdProvider {
override def get(system: a.ActorSystem): AdapterExtension = super.get(system)
override def lookup = AdapterExtension
override def createExtension(system: a.ExtendedActorSystem): AdapterExtension =
new AdapterExtension(system)
}
def toUntyped[U](sys: ActorSystem[_]): a.ActorSystem =
sys match {
@ -103,3 +118,4 @@ private[typed] object ActorSystemAdapter {
"receptionist"))
}
}

View file

@ -158,7 +158,12 @@ lazy val streamTestsTck = akkaModule("akka-stream-tests-tck")
.dependsOn(streamTestkit % "test->test", stream)
lazy val typed = akkaModule("akka-typed")
.dependsOn(testkit % "compile->compile;test->test", cluster % "compile->compile;test->test", distributedData)
.dependsOn(
testkit % "compile->compile;test->test",
cluster % "compile->compile;test->test",
clusterTools,
clusterSharding,
distributedData)
lazy val typedTests = akkaModule("akka-typed-tests")
.dependsOn(typed, typedTestkit % "compile->compile;test->test")