diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index f0b47a99c2..532ee06109 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -15,7 +15,7 @@ import akka.dispatch.MonitorableThreadFactory import akka.event.{ Logging, LoggingAdapter } import akka.japi.Util import akka.pattern._ -import akka.remote.{ DefaultFailureDetectorRegistry, FailureDetector, _ } +import akka.remote.{ DefaultFailureDetectorRegistry, _ } import com.typesafe.config.{ Config, ConfigFactory } import scala.annotation.varargs @@ -215,6 +215,11 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { */ def state: CurrentClusterState = readView.state + /** + * Current snapshot of the member itself + */ + def selfMember: Member = readView.self + /** * Subscribe to one or more cluster domain events. * The `to` classes can be [[akka.cluster.ClusterEvent.ClusterDomainEvent]] diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index e4e769c79c..c8bb5def1c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -13,7 +13,7 @@ import akka.cluster.MemberStatus._ import akka.event.EventStream import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.actor.DeadLetterSuppression -import akka.annotation.InternalApi +import akka.annotation.{ DoNotInherit, InternalApi } import scala.collection.breakOut import scala.runtime.AbstractFunction5 @@ -53,8 +53,11 @@ object ClusterEvent { /** * Marker interface for cluster domain events. + * + * Not intended for user extension. */ - sealed trait ClusterDomainEvent extends DeadLetterSuppression + @DoNotInherit + trait ClusterDomainEvent extends DeadLetterSuppression // for binary compatibility (used to be a case class) object CurrentClusterState extends AbstractFunction5[immutable.SortedSet[Member], Set[Member], Set[Address], Option[Address], Map[String, Option[Address]], CurrentClusterState] { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 40b32777de..fd553f9d7a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -5,12 +5,14 @@ package akka.cluster import java.io.Closeable + import scala.collection.immutable import akka.actor.{ Actor, ActorRef, Address, Props } import akka.cluster.ClusterEvent._ import akka.actor.PoisonPill -import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.actor.Deploy +import akka.util.OptionVal /** * INTERNAL API @@ -29,6 +31,10 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { @volatile private var _reachability: Reachability = Reachability.empty + // lazy init below, updated when state is updated + @volatile + private var _cachedSelf: OptionVal[Member] = OptionVal.None + /** * Current internal cluster stats, updated periodically via event bus. */ @@ -44,39 +50,46 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { override def postStop(): Unit = cluster.unsubscribe(self) def receive = { - case e: ClusterDomainEvent ⇒ e match { - case SeenChanged(convergence, seenBy) ⇒ - _state = _state.copy(seenBy = seenBy) - case ReachabilityChanged(reachability) ⇒ - _reachability = reachability - case MemberRemoved(member, _) ⇒ - _state = _state.copy(members = _state.members - member, unreachable = _state.unreachable - member) - case UnreachableMember(member) ⇒ - // replace current member with new member (might have different status, only address is used in equals) - _state = _state.copy(unreachable = _state.unreachable - member + member) - case ReachableMember(member) ⇒ - _state = _state.copy(unreachable = _state.unreachable - member) - case event: MemberEvent ⇒ - // replace current member with new member (might have different status, only address is used in equals) - val newUnreachable = - if (_state.unreachable.contains(event.member)) _state.unreachable - event.member + event.member - else _state.unreachable - _state = _state.copy( - members = _state.members - event.member + event.member, - unreachable = newUnreachable) - case LeaderChanged(leader) ⇒ - _state = _state.copy(leader = leader) - case RoleLeaderChanged(role, leader) ⇒ - _state = _state.copy(roleLeaderMap = _state.roleLeaderMap + (role → leader)) - case stats: CurrentInternalStats ⇒ _latestStats = stats - case ClusterShuttingDown ⇒ + case e: ClusterDomainEvent ⇒ + e match { + case SeenChanged(convergence, seenBy) ⇒ + _state = _state.copy(seenBy = seenBy) + case ReachabilityChanged(reachability) ⇒ + _reachability = reachability + case MemberRemoved(member, _) ⇒ + _state = _state.copy(members = _state.members - member, unreachable = _state.unreachable - member) + case UnreachableMember(member) ⇒ + // replace current member with new member (might have different status, only address is used in equals) + _state = _state.copy(unreachable = _state.unreachable - member + member) + case ReachableMember(member) ⇒ + _state = _state.copy(unreachable = _state.unreachable - member) + case event: MemberEvent ⇒ + // replace current member with new member (might have different status, only address is used in equals) + val newUnreachable = + if (_state.unreachable.contains(event.member)) _state.unreachable - event.member + event.member + else _state.unreachable + _state = _state.copy( + members = _state.members - event.member + event.member, + unreachable = newUnreachable) + case LeaderChanged(leader) ⇒ + _state = _state.copy(leader = leader) + case RoleLeaderChanged(role, leader) ⇒ + _state = _state.copy(roleLeaderMap = _state.roleLeaderMap + (role → leader)) + case stats: CurrentInternalStats ⇒ _latestStats = stats + case ClusterShuttingDown ⇒ - case r: ReachableDataCenter ⇒ - _state = _state.withUnreachableDataCenters(_state.unreachableDataCenters - r.dataCenter) - case r: UnreachableDataCenter ⇒ - _state = _state.withUnreachableDataCenters(_state.unreachableDataCenters + r.dataCenter) + case r: ReachableDataCenter ⇒ + _state = _state.withUnreachableDataCenters(_state.unreachableDataCenters - r.dataCenter) + case r: UnreachableDataCenter ⇒ + _state = _state.withUnreachableDataCenters(_state.unreachableDataCenters + r.dataCenter) - } + } + + e match { + case e: MemberEvent if e.member.address == selfAddress ⇒ + _cachedSelf = OptionVal.Some(e.member) + case _ ⇒ + } case s: CurrentClusterState ⇒ _state = s } }).withDispatcher(cluster.settings.UseDispatcher).withDeploy(Deploy.local), name = "clusterEventBusListener") @@ -85,9 +98,19 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { def state: CurrentClusterState = _state def self: Member = { + _cachedSelf match { + case OptionVal.None ⇒ + // lazy initialization here, later updated from elsewhere + _cachedSelf = OptionVal.Some(selfFromStateOrPlaceholder) + _cachedSelf.get + case OptionVal.Some(member) ⇒ member + } + } + + private def selfFromStateOrPlaceholder = { import cluster.selfUniqueAddress - state.members.find(_.uniqueAddress == selfUniqueAddress). - getOrElse(Member(selfUniqueAddress, cluster.selfRoles).copy(status = MemberStatus.Removed)) + state.members.find(_.uniqueAddress == selfUniqueAddress) + .getOrElse(Member(selfUniqueAddress, cluster.selfRoles).copy(status = MemberStatus.Removed)) } /** @@ -162,8 +185,10 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { /** * Unsubscribe to cluster events. */ - def close(): Unit = + def close(): Unit = { + _cachedSelf = OptionVal.Some(self.copy(MemberStatus.Removed)) if (!eventBusListener.isTerminated) eventBusListener ! PoisonPill + } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala index e784067c74..dcc7cce527 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Member.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala @@ -90,7 +90,7 @@ object Member { * INTERNAL API * Create a new member with status Joining. */ - private[cluster] def apply(uniqueAddress: UniqueAddress, roles: Set[String]): Member = + private[akka] def apply(uniqueAddress: UniqueAddress, roles: Set[String]): Member = new Member(uniqueAddress, Int.MaxValue, Joining, roles) /** diff --git a/akka-testkit/src/main/mima-filters/2.5.5.backwards.excludes b/akka-testkit/src/main/mima-filters/2.5.5.backwards.excludes new file mode 100644 index 0000000000..3dfc51599b --- /dev/null +++ b/akka-testkit/src/main/mima-filters/2.5.5.backwards.excludes @@ -0,0 +1,5 @@ +# awaitAssert signature update introduced in #23613 +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.testkit.TestKitBase.awaitAssert") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.testkit.TestKitBase.awaitAssert") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.testkit.TestKit.awaitAssert") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.testkit.javadsl.TestKit.awaitAssert") \ No newline at end of file diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 51896b23ea..df34c61b08 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -4,18 +4,21 @@ package akka.testkit import language.postfixOps -import scala.annotation.{ tailrec } +import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ import scala.reflect.ClassTag import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger + import akka.actor._ -import akka.util.{ Timeout, BoxedType } +import akka.util.{ BoxedType, OptionVal, Timeout } + import scala.util.control.NonFatal import scala.Some import java.util.concurrent.TimeUnit + import akka.actor.IllegalActorStateException import akka.actor.DeadLetter import akka.actor.Terminated @@ -283,7 +286,9 @@ trait TestKitBase { } /** - * Evaluate the given assert every `interval` until it does not throw an exception. + * Evaluate the given assert every `interval` until it does not throw an exception and return the + * result. + * * If the `max` timeout expires the last exception is thrown. * * If no timeout is given, take it from the innermost enclosing `within` @@ -292,19 +297,29 @@ trait TestKitBase { * Note that the timeout is scaled using Duration.dilated, * which uses the configuration entry "akka.test.timefactor". */ - def awaitAssert(a: ⇒ Any, max: Duration = Duration.Undefined, interval: Duration = 100.millis) { + def awaitAssert[A](a: ⇒ A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A = { val _max = remainingOrDilated(max) val stop = now + _max @tailrec - def poll(t: Duration) { - val failed = - try { a; false } catch { + def poll(t: Duration): A = { + // cannot use null-ness of result as signal it failed + // because Java API and not wanting to return a value will be "return null" + var failed = false + val result: A = + try { + val aRes = a + failed = false + aRes + } catch { case NonFatal(e) ⇒ + failed = true if ((now + t) >= stop) throw e - true + else null.asInstanceOf[A] } - if (failed) { + + if (!failed) result + else { Thread.sleep(t.toMillis) poll((stop - now) min interval) } diff --git a/akka-testkit/src/main/scala/akka/testkit/javadsl/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/javadsl/TestKit.scala index 050af8d563..0e8fa7169e 100644 --- a/akka-testkit/src/main/scala/akka/testkit/javadsl/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/javadsl/TestKit.scala @@ -189,9 +189,9 @@ class TestKit(system: ActorSystem) { def awaitCond(max: Duration, interval: Duration, message: String, p: Supplier[Boolean]): Unit = tp.awaitCond(p.get, max, interval, message) - def awaitAssert(a: Supplier[Any]): Unit = tp.awaitAssert(a.get) + def awaitAssert[A](a: Supplier[A]): A = tp.awaitAssert(a.get) - def awaitAssert(max: Duration, a: Supplier[Any]): Unit = tp.awaitAssert(a.get, max) + def awaitAssert[A](max: Duration, a: Supplier[A]): A = tp.awaitAssert(a.get, max) /** * Evaluate the given assert every `interval` until it does not throw an exception. @@ -200,7 +200,7 @@ class TestKit(system: ActorSystem) { * Note that the timeout is scaled using Duration.dilated, * which uses the configuration entry "akka.test.timefactor". */ - def awaitAssert(max: Duration, interval: Duration, a: Supplier[Any]): Unit = tp.awaitAssert(a.get, max, interval) + def awaitAssert[A](max: Duration, interval: Duration, a: Supplier[A]): A = tp.awaitAssert(a.get, max, interval) /** * Same as `expectMsg(remainingOrDefault, obj)`, but correctly treating the timeFactor. diff --git a/akka-typed-testkit/src/main/scala/akka/typed/testkit/javadsl/TestProbe.scala b/akka-typed-testkit/src/main/scala/akka/typed/testkit/javadsl/TestProbe.scala new file mode 100644 index 0000000000..01a3fac9f2 --- /dev/null +++ b/akka-typed-testkit/src/main/scala/akka/typed/testkit/javadsl/TestProbe.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.testkit.javadsl + +import akka.typed.ActorSystem +import akka.typed.testkit.TestKitSettings + +/** + * Java API: + */ +class TestProbe[M](name: String, system: ActorSystem[_], settings: TestKitSettings) extends akka.typed.testkit.scaladsl.TestProbe[M](name)(system, settings) { + + def this(system: ActorSystem[_], settings: TestKitSettings) = this("testProbe", system, settings) + + /** + * Same as `expectMsgType[T](remainingOrDefault)`, but correctly treating the timeFactor. + */ + def expectMsgType[T <: M](t: Class[T]): T = + expectMsgClass_internal(remainingOrDefault, t) + +} diff --git a/akka-typed-testkit/src/main/scala/akka/typed/testkit/scaladsl/TestProbe.scala b/akka-typed-testkit/src/main/scala/akka/typed/testkit/scaladsl/TestProbe.scala index d6be133da0..f3c96e0ebe 100644 --- a/akka-typed-testkit/src/main/scala/akka/typed/testkit/scaladsl/TestProbe.scala +++ b/akka-typed-testkit/src/main/scala/akka/typed/testkit/scaladsl/TestProbe.scala @@ -5,19 +5,25 @@ package akka.typed.testkit.scaladsl import scala.concurrent.duration._ import java.util.concurrent.BlockingDeque + import akka.typed.Behavior import akka.typed.scaladsl.Actor import akka.typed.ActorSystem import java.util.concurrent.LinkedBlockingDeque import java.util.concurrent.atomic.AtomicInteger + import akka.typed.ActorRef import akka.util.Timeout import akka.util.PrettyDuration.PrettyPrintableDuration + import scala.concurrent.Await import com.typesafe.config.Config import akka.typed.testkit.TestKitSettings import akka.util.BoxedType + +import scala.annotation.tailrec import scala.reflect.ClassTag +import scala.util.control.NonFatal object TestProbe { private val testActorId = new AtomicInteger(0) @@ -35,6 +41,7 @@ object TestProbe { } class TestProbe[M](name: String)(implicit val system: ActorSystem[_], val settings: TestKitSettings) { + import TestProbe._ private val queue = new LinkedBlockingDeque[M] @@ -219,11 +226,48 @@ class TestProbe[M](name: String)(implicit val system: ActorSystem[_], val settin def expectMsgType[T <: M](max: FiniteDuration)(implicit t: ClassTag[T]): T = expectMsgClass_internal(max.dilated, t.runtimeClass.asInstanceOf[Class[T]]) - private def expectMsgClass_internal[C](max: FiniteDuration, c: Class[C]): C = { + private[akka] def expectMsgClass_internal[C](max: FiniteDuration, c: Class[C]): C = { val o = receiveOne(max) assert(o != null, s"timeout ($max) during expectMsgClass waiting for $c") assert(BoxedType(c) isInstance o, s"expected $c, found ${o.getClass} ($o)") o.asInstanceOf[C] } + /** + * Evaluate the given assert every `interval` until it does not throw an exception and return the + * result. + * + * If the `max` timeout expires the last exception is thrown. + * + * If no timeout is given, take it from the innermost enclosing `within` + * block. + * + * Note that the timeout is scaled using Duration.dilated, + * which uses the configuration entry "akka.test.timefactor". + */ + def awaitAssert[A](a: ⇒ A, max: Duration = Duration.Undefined, interval: Duration = 100.millis): A = { + val _max = remainingOrDilated(max) + val stop = now + _max + + @tailrec + def poll(t: Duration): A = { + val result: A = + try { + a + } catch { + case NonFatal(e) ⇒ + if ((now + t) >= stop) throw e + else null.asInstanceOf[A] + } + + if (result != null) result + else { + Thread.sleep(t.toMillis) + poll((stop - now) min interval) + } + } + + poll(_max min interval) + } + } diff --git a/akka-typed-tests/src/test/java/akka/typed/cluster/ClusterApiTest.java b/akka-typed-tests/src/test/java/akka/typed/cluster/ClusterApiTest.java new file mode 100644 index 0000000000..605cfe0419 --- /dev/null +++ b/akka-typed-tests/src/test/java/akka/typed/cluster/ClusterApiTest.java @@ -0,0 +1,65 @@ +package akka.typed.cluster; + +import akka.cluster.ClusterEvent; +import akka.typed.ActorSystem; +import akka.typed.testkit.TestKitSettings; +import akka.typed.testkit.javadsl.TestProbe; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; +import scala.concurrent.Await; +import scala.concurrent.duration.Duration; + +public class ClusterApiTest extends JUnitSuite { + + + + @Test + public void joinLeaveAndObserve() throws Exception { + Config config = ConfigFactory.parseString( + "akka.actor.provider = cluster \n" + + "akka.remote.artery.enabled = true \n"+ + "akka.remote.netty.tcp.port = 0 \n"+ + "akka.remote.artery.canonical.port = 0 \n"+ + "akka.cluster.jmx.multi-mbeans-in-same-jvm = on \n"+ + "akka.coordinated-shutdown.terminate-actor-system = off \n"+ + "akka.actor { \n"+ + " serialize-messages = off \n"+ + " allow-java-serialization = off \n"+ + "}" + ); + + ActorSystem system1 = ActorSystem.wrap(akka.actor.ActorSystem.create("ClusterApiTest", config)); + ActorSystem system2 = ActorSystem.wrap(akka.actor.ActorSystem.create("ClusterApiTest", config)); + + try { + TestKitSettings testKitSettings = new TestKitSettings(system1.settings().config()); + + Cluster cluster1 = Cluster.get(system1); + Cluster cluster2 = Cluster.get(system2); + + TestProbe probe1 = new TestProbe<>(system1, testKitSettings); + + cluster1.subscriptions().tell(new Subscribe<>(probe1.ref().narrow(), SelfUp.class)); + cluster1.manager().tell(new Join(cluster1.selfMember().address())); + probe1.expectMsgType(SelfUp.class); + + TestProbe probe2 = new TestProbe<>(system2, testKitSettings); + cluster2.subscriptions().tell(new Subscribe<>(probe2.ref().narrow(), SelfUp.class)); + cluster2.manager().tell(new Join(cluster1.selfMember().address())); + probe2.expectMsgType(SelfUp.class); + + + cluster2.subscriptions().tell(new Subscribe<>(probe2.ref().narrow(), SelfRemoved.class)); + cluster2.manager().tell(new Leave(cluster2.selfMember().address())); + + probe2.expectMsgType(SelfRemoved.class); + } finally { + // TODO no java API to terminate actor system + Await.result(system1.terminate().zip(system2.terminate()), Duration.create("5 seconds")); + } + + } + +} \ No newline at end of file diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterApiSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterApiSpec.scala new file mode 100644 index 0000000000..7c046c1c28 --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterApiSpec.scala @@ -0,0 +1,113 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.cluster + +import akka.cluster.ClusterEvent._ +import akka.cluster.MemberStatus +import akka.typed.TypedSpec +import akka.typed.internal.adapter.ActorSystemAdapter +import akka.typed.scaladsl.adapter._ +import akka.typed.testkit.TestKitSettings +import akka.typed.testkit.scaladsl.TestProbe +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.Await +import scala.concurrent.duration._ + +object ClusterApiSpec { + val config = ConfigFactory.parseString( + """ + akka.actor.provider = cluster + akka.remote.artery.enabled = true + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.cluster.jmx.multi-mbeans-in-same-jvm = on + akka.coordinated-shutdown.terminate-actor-system = off + akka.actor { + serialize-messages = off + allow-java-serialization = off + } + """) +} + +class ClusterApiSpec extends TypedSpec(ClusterApiSpec.config) with ScalaFutures { + + val testSettings = TestKitSettings(adaptedSystem) + val clusterNode1 = Cluster(adaptedSystem) + val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem) + + object `A typed cluster` { + + def `01 must join a cluster and observe events from both sides`() = { + + val system2 = akka.actor.ActorSystem(adaptedSystem.name, adaptedSystem.settings.config) + val adaptedSystem2 = system2.toTyped + + try { + val clusterNode2 = Cluster(adaptedSystem2) + + val node1Probe = TestProbe[AnyRef]()(adaptedSystem, testSettings) + val node2Probe = TestProbe[AnyRef]()(adaptedSystem2, testSettings) + + // initial cached selfMember + clusterNode1.selfMember.status should ===(MemberStatus.Removed) + clusterNode2.selfMember.status should ===(MemberStatus.Removed) + + // check that subscriptions work + clusterNode1.subscriptions ! Subscribe(node1Probe.ref, classOf[MemberEvent]) + clusterNode1.manager ! Join(clusterNode1.selfMember.address) + node1Probe.expectMsgType[MemberUp].member.uniqueAddress == clusterNode1.selfMember.uniqueAddress + + // check that cached selfMember is updated + node1Probe.awaitAssert( + clusterNode1.selfMember.status should ===(MemberStatus.Up) + ) + + // subscribing to OnSelfUp when already up + clusterNode1.subscriptions ! Subscribe(node1Probe.ref, classOf[SelfUp]) + node1Probe.expectMsgType[SelfUp] + + // selfMember update and on up subscription on node 2 when joining + clusterNode2.subscriptions ! Subscribe(node2Probe.ref, classOf[SelfUp]) + clusterNode2.manager ! Join(clusterNode1.selfMember.address) + node2Probe.awaitAssert( + clusterNode2.selfMember.status should ===(MemberStatus.Up) + ) + node2Probe.expectMsgType[SelfUp] + + // events about node2 joining to subscriber on node1 + node1Probe.expectMsgType[MemberJoined].member.uniqueAddress == clusterNode2.selfMember.uniqueAddress + node1Probe.expectMsgType[MemberUp].member.uniqueAddress == clusterNode1.selfMember.uniqueAddress + + // OnSelfRemoved and subscription events around node2 leaving + clusterNode2.subscriptions ! Subscribe(node2Probe.ref, classOf[SelfRemoved]) + clusterNode2.manager ! Leave(clusterNode2.selfMember.address) + + // node1 seeing all those transition events + node1Probe.expectMsgType[MemberLeft].member.uniqueAddress == clusterNode2.selfMember.uniqueAddress + node1Probe.expectMsgType[MemberExited].member.uniqueAddress == clusterNode2.selfMember.uniqueAddress + node1Probe.expectMsgType[MemberRemoved].member.uniqueAddress == clusterNode2.selfMember.uniqueAddress + + // selfMember updated and self removed event gotten + node2Probe.awaitAssert( + clusterNode2.selfMember.status should ===(MemberStatus.Removed) + ) + node2Probe.expectMsg(SelfRemoved(MemberStatus.Exiting)) + + // subscribing to SelfRemoved when already removed yields immediate message back + clusterNode2.subscriptions ! Subscribe(node2Probe.ref, classOf[SelfRemoved]) + node2Probe.expectMsg(SelfRemoved(MemberStatus.Exiting)) + + // subscribing to SelfUp when already removed yields nothing + clusterNode2.subscriptions ! Subscribe(node2Probe.ref, classOf[SelfUp]) + node2Probe.expectNoMsg(100.millis) + + } finally { + Await.result(system2.terminate(), 3.seconds) + } + } + } + +} diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterShardingApiSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterShardingApiSpec.scala new file mode 100644 index 0000000000..b364c98922 --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterShardingApiSpec.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.cluster + +import akka.cluster.sharding.ClusterShardingSettings +import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.adapter._ +import akka.typed.{ ActorSystem } + +class ClusterShardingApiSpec { + + // Compile only for now + + val system: akka.actor.ActorSystem = ??? + val typedSystem: ActorSystem[Nothing] = system.toTyped + val cluster = Cluster(typedSystem) + + trait EntityProtocol + case class Add(thing: String) extends EntityProtocol + case object PassHence extends EntityProtocol + + val entityBehavior = + Actor.deferred[EntityProtocol] { _ ⇒ + var things: List[String] = Nil + + Actor.immutable[EntityProtocol] { (_, msg) ⇒ + msg match { + case Add(thing) ⇒ + things = thing :: things + Actor.same + + case PassHence ⇒ + Actor.stopped + } + } + } + + val sharding = ClusterSharding(typedSystem).spawn( + entityBehavior, + "things-lists", + ClusterShardingSettings(typedSystem.settings.config), + maxNumberOfShards = 25, + handOffStopMessage = PassHence + ) + + sharding ! ShardingEnvelope("1", Add("bananas")) + + val entity1 = ClusterSharding.entityRefFor("1", sharding) + entity1 ! Add("pineapple") + + // start but no command + sharding ! StartEntity("2") + +} diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterSingletonApiSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterSingletonApiSpec.scala new file mode 100644 index 0000000000..4b81b24167 --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterSingletonApiSpec.scala @@ -0,0 +1,151 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.cluster + +import java.nio.charset.StandardCharsets + +import akka.actor.ExtendedActorSystem +import akka.serialization.SerializerWithStringManifest +import akka.typed.internal.adapter.ActorSystemAdapter +import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.adapter._ +import akka.typed.testkit.TestKitSettings +import akka.typed.testkit.scaladsl.TestProbe +import akka.typed.{ ActorRef, Props, TypedSpec } +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.Await +import scala.concurrent.duration._ + +object ClusterSingletonApiSpec { + + val config = ConfigFactory.parseString( + """ + akka.actor { + provider = cluster + serialize-messages = off + allow-java-serialization = off + + serializers { + test = "akka.typed.cluster.ClusterSingletonApiSpec$PingSerializer" + } + serialization-bindings { + "akka.typed.cluster.ClusterSingletonApiSpec$Ping" = test + "akka.typed.cluster.ClusterSingletonApiSpec$Pong$" = test + "akka.typed.cluster.ClusterSingletonApiSpec$Perish$" = test + } + } + akka.remote.artery.enabled = true + akka.remote.artery.canonical.port = 25552 + akka.cluster.jmx.multi-mbeans-in-same-jvm = on + """) + + trait PingProtocol + case object Pong + case class Ping(respondTo: ActorRef[Pong.type]) extends PingProtocol + + case object Perish extends PingProtocol + + val pingPong = Actor.immutable[PingProtocol] { (ctx, msg) ⇒ + + msg match { + case Ping(respondTo) ⇒ + respondTo ! Pong + Actor.same + + case Perish ⇒ + Actor.stopped + } + + } + + class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + def identifier: Int = 47 + def manifest(o: AnyRef): String = o match { + case _: Ping ⇒ "a" + case Pong ⇒ "b" + case Perish ⇒ "c" + } + + def toBinary(o: AnyRef): Array[Byte] = o match { + case p: Ping ⇒ ActorRefResolver(system.toTyped).toSerializationFormat(p.respondTo).getBytes(StandardCharsets.UTF_8) + case Pong ⇒ Array.emptyByteArray + case Perish ⇒ Array.emptyByteArray + } + + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { + case "a" ⇒ Ping(ActorRefResolver(system.toTyped).resolveActorRef(new String(bytes, StandardCharsets.UTF_8))) + case "b" ⇒ Pong + case "c" ⇒ Perish + } + } +} + +class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config) with ScalaFutures { + import ClusterSingletonApiSpec._ + + implicit val testSettings = TestKitSettings(adaptedSystem) + val clusterNode1 = Cluster(adaptedSystem) + val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem) + + val system2 = akka.actor.ActorSystem( + adaptedSystem.name, + ConfigFactory.parseString( + """ + akka.remote.artery.canonical.port = 0 + akka.cluster.roles = ["singleton"] + """ + ).withFallback(adaptedSystem.settings.config)) + val adaptedSystem2 = system2.toTyped + val clusterNode2 = Cluster(adaptedSystem2) + + object `A typed cluster singleton` { + + def `01 must be accessible from two nodes in a cluster`() = { + val node1UpProbe = TestProbe[SelfUp]()(adaptedSystem, implicitly[TestKitSettings]) + clusterNode1.subscriptions ! Subscribe(node1UpProbe.ref, classOf[SelfUp]) + + val node2UpProbe = TestProbe[SelfUp]()(adaptedSystem2, implicitly[TestKitSettings]) + clusterNode1.subscriptions ! Subscribe(node2UpProbe.ref, classOf[SelfUp]) + + clusterNode1.manager ! Join(clusterNode1.selfMember.address) + clusterNode2.manager ! Join(clusterNode1.selfMember.address) + + node1UpProbe.expectMsgType[SelfUp] + node2UpProbe.expectMsgType[SelfUp] + + val cs1 = ClusterSingleton(adaptedSystem) + val cs2 = ClusterSingleton(adaptedSystem2) + + val settings = ClusterSingletonSettings(adaptedSystem).withRole("singleton") + val node1ref = cs1.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) + val node2ref = cs2.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) + + // subsequent spawning returns the same refs + cs1.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node1ref) + cs2.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node2ref) + + val node1PongProbe = TestProbe[Pong.type]()(adaptedSystem, implicitly[TestKitSettings]) + val node2PongProbe = TestProbe[Pong.type]()(adaptedSystem2, implicitly[TestKitSettings]) + + node1PongProbe.awaitAssert({ + node1ref ! Ping(node1PongProbe.ref) + node1PongProbe.expectMsg(Pong) + }, 3.seconds) + + node2PongProbe.awaitAssert({ + node2ref ! Ping(node2PongProbe.ref) + node2PongProbe.expectMsg(Pong) + }, 3.seconds) + + } + } + + override def afterAll(): Unit = { + super.afterAll() + Await.result(system2.terminate(), 3.seconds) + } + +} diff --git a/akka-typed-tests/src/test/scala/akka/typed/scaladsl/adapter/RemotingSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/scaladsl/adapter/RemotingSpec.scala new file mode 100644 index 0000000000..fdbb190b57 --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/scaladsl/adapter/RemotingSpec.scala @@ -0,0 +1,112 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.scaladsl.adapter + +import java.nio.charset.StandardCharsets + +import akka.Done +import akka.testkit.AkkaSpec +import akka.typed.{ ActorRef, ActorSystem } +import akka.typed.scaladsl.Actor +import akka.actor.{ ExtendedActorSystem, ActorSystem ⇒ UntypedActorSystem } +import akka.cluster.Cluster +import akka.serialization.{ BaseSerializer, SerializerWithStringManifest } +import com.typesafe.config.ConfigFactory + +import scala.concurrent.Promise +import akka.typed.cluster.ActorRefResolver +import akka.typed.internal.adapter.ActorRefAdapter + +class PingSerializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { + override def identifier = 41 + override def manifest(o: AnyRef) = "a" + override def toBinary(o: AnyRef) = o match { + case RemotingSpec.Ping(who) ⇒ + ActorRefResolver(system.toTyped).toSerializationFormat(who).getBytes(StandardCharsets.UTF_8) + } + override def fromBinary(bytes: Array[Byte], manifest: String) = { + val str = new String(bytes, StandardCharsets.UTF_8) + val ref = ActorRefResolver(system.toTyped).resolveActorRef[String](str) + RemotingSpec.Ping(ref) + } +} + +object RemotingSpec { + def config = ConfigFactory.parseString( + s""" + akka { + loglevel = debug + actor { + provider = cluster + warn-about-java-serializer-usage = off + serialize-creators = off + serializers { + test = "akka.typed.scaladsl.adapter.PingSerializer" + } + serialization-bindings { + "akka.typed.scaladsl.adapter.RemotingSpec$$Ping" = test + } + } + remote.artery { + enabled = on + canonical { + hostname = 127.0.0.1 + port = 0 + } + } + } + """) + + case class Ping(sender: ActorRef[String]) +} + +class RemotingSpec extends AkkaSpec(RemotingSpec.config) { + + import RemotingSpec._ + + val typedSystem = system.toTyped + + "the adapted system" should { + + "something something" in { + + val pingPromise = Promise[Done]() + val ponger = Actor.immutable[Ping]((_, msg) ⇒ + msg match { + case Ping(sender) ⇒ + pingPromise.success(Done) + sender ! "pong" + Actor.stopped + }) + + // typed actor on system1 + val pingPongActor = system.spawn(ponger, "pingpong") + + val system2 = UntypedActorSystem(system.name + "-system2", RemotingSpec.config) + val typedSystem2 = system2.toTyped + try { + + // resolve the actor from node2 + val remoteRefStr = ActorRefResolver(typedSystem).toSerializationFormat(pingPongActor) + val remoteRef: ActorRef[Ping] = + ActorRefResolver(typedSystem2).resolveActorRef[Ping](remoteRefStr) + + val pongPromise = Promise[Done]() + val recipient = system2.spawn(Actor.immutable[String] { (_, msg) ⇒ + pongPromise.success(Done) + Actor.stopped + }, "recipient") + remoteRef ! Ping(recipient) + + pingPromise.future.futureValue should ===(Done) + pongPromise.future.futureValue should ===(Done) + + } finally { + system2.terminate() + } + } + + } + +} diff --git a/akka-typed/src/main/scala/akka/typed/ActorSystem.scala b/akka-typed/src/main/scala/akka/typed/ActorSystem.scala index 323f3ee53f..f80cfbc70f 100644 --- a/akka-typed/src/main/scala/akka/typed/ActorSystem.scala +++ b/akka-typed/src/main/scala/akka/typed/ActorSystem.scala @@ -220,14 +220,14 @@ object ActorSystem { val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext, Some(PropsAdapter(() ⇒ guardianBehavior, guardianProps)), actorSystemSettings) untyped.start() - new ActorSystemAdapter(untyped) + ActorSystemAdapter.AdapterExtension(untyped).adapter } /** * Wrap an untyped [[akka.actor.ActorSystem]] such that it can be used from * Akka Typed [[Behavior]]. */ - def wrap(untyped: a.ActorSystem): ActorSystem[Nothing] = new ActorSystemAdapter(untyped.asInstanceOf[a.ActorSystemImpl]) + def wrap(untyped: a.ActorSystem): ActorSystem[Nothing] = ActorSystemAdapter.AdapterExtension(untyped.asInstanceOf[a.ActorSystemImpl]).adapter } /** diff --git a/akka-typed/src/main/scala/akka/typed/cluster/Cluster.scala b/akka-typed/src/main/scala/akka/typed/cluster/Cluster.scala new file mode 100644 index 0000000000..93670c7b79 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/Cluster.scala @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.cluster + +import akka.actor.Address +import akka.annotation.DoNotInherit +import akka.cluster.ClusterEvent.{ ClusterDomainEvent, CurrentClusterState } +import akka.cluster._ +import akka.japi.Util +import akka.typed.cluster.internal.AdapterClusterImpl +import akka.typed.{ ActorRef, ActorSystem, Extension, ExtensionId } + +import scala.collection.immutable + +/** + * Messages for subscribing to changes in the cluster state + * + * Not intended for user extension. + */ +@DoNotInherit +sealed trait ClusterStateSubscription + +/** + * Subscribe to cluster state changes. The initial state of the cluster will be sent as + * a "replay" of the subscribed events. + * + * @param subscriber A subscriber that will receive events until it is unsubscribed or stops + * @param eventClass The type of events to subscribe to, can be individual event types such as + * `ReachabilityEvent` or one of the common supertypes, such as `MemberEvent` to get + * all the subtypes of events. + */ +final case class Subscribe[A <: ClusterDomainEvent]( + subscriber: ActorRef[A], + eventClass: Class[A]) extends ClusterStateSubscription + +/** + * Subscribe to this node being up, after sending this event the subscription is automatically + * cancelled. If the node is already up the event is also sent to the subscriber. If the node was up + * but is no more because it left or is leaving the cluster, no event is sent and the subscription + * request is ignored. + * + * Note: Only emitted for the typed cluster. + */ +final case class SelfUp(currentClusterState: CurrentClusterState) extends ClusterDomainEvent + +/** + * Subscribe to this node being removed from the cluster. If the node was already removed from the cluster + * when this subscription is created it will be responded to immediately from the subscriptions actor. + * + * Note: Only emitted for the typed cluster. + */ +final case class SelfRemoved(previousStatus: MemberStatus) extends ClusterDomainEvent + +final case class Unsubscribe[T](subscriber: ActorRef[T]) extends ClusterStateSubscription +final case class GetCurrentState(recipient: ActorRef[CurrentClusterState]) extends ClusterStateSubscription + +/** + * Not intended for user extension. + */ +@DoNotInherit +sealed trait ClusterCommand + +/** + * Try to join this cluster node with the node specified by 'address'. + * + * An actor system can only join a cluster once. Additional attempts will be ignored. + * When it has successfully joined it must be restarted to be able to join another + * cluster or to join the same cluster again. + * + * The name of the [[akka.actor.ActorSystem]] must be the same for all members of a + * cluster. + */ +final case class Join(address: Address) extends ClusterCommand + +/** + * Scala API: Join the specified seed nodes without defining them in config. + * Especially useful from tests when Addresses are unknown before startup time. + * + * An actor system can only join a cluster once. Additional attempts will be ignored. + * When it has successfully joined it must be restarted to be able to join another + * cluster or to join the same cluster again. + */ +final case class JoinSeedNodes(seedNodes: immutable.Seq[Address]) extends ClusterCommand { + + /** + * Java API: Join the specified seed nodes without defining them in config. + * Especially useful from tests when Addresses are unknown before startup time. + * + * An actor system can only join a cluster once. Additional attempts will be ignored. + * When it has successfully joined it must be restarted to be able to join another + * cluster or to join the same cluster again. + * + * Creates a defensive copy of the list to ensure immutability. + */ + def this(seedNodes: java.util.List[Address]) = this(Util.immutableSeq(seedNodes)) + +} + +/** + * Send command to issue state transition to LEAVING for the node specified by 'address'. + * The member will go through the status changes [[MemberStatus]] `Leaving` (not published to + * subscribers) followed by [[MemberStatus]] `Exiting` and finally [[MemberStatus]] `Removed`. + * + * Note that this command can be issued to any member in the cluster, not necessarily the + * one that is leaving. The cluster extension, but not the actor system or JVM, of the + * leaving member will be shutdown after the leader has changed status of the member to + * Exiting. Thereafter the member will be removed from the cluster. Normally this is + * handled automatically, but in case of network failures during this process it might + * still be necessary to set the node’s status to Down in order to complete the removal. + */ +final case class Leave(address: Address) extends ClusterCommand + +/** + * Send command to DOWN the node specified by 'address'. + * + * When a member is considered by the failure detector to be unreachable the leader is not + * allowed to perform its duties, such as changing status of new joining members to 'Up'. + * The status of the unreachable member must be changed to 'Down', which can be done with + * this method. + */ +final case class Down(address: Address) extends ClusterCommand + +/** + * Akka Typed Cluster API entry point + */ +object Cluster extends ExtensionId[Cluster] { + + def createExtension(system: ActorSystem[_]): Cluster = new AdapterClusterImpl(system) + + def get(system: ActorSystem[_]): Cluster = apply(system) +} + +/** + * Not intended for user extension. + */ +@DoNotInherit +abstract class Cluster extends Extension { + + /** Details about this cluster node itself */ + def selfMember: Member + + /** Returns true if this cluster instance has be shutdown. */ + def isTerminated: Boolean + + /** Current snapshot state of the cluster. */ + def state: CurrentClusterState + + /** + * @return an actor that allows for subscribing to messages when the cluster state changes + */ + def subscriptions: ActorRef[ClusterStateSubscription] + + /** + * @return an actor that accepts commands to join, leave and down nodes in a cluster + */ + def manager: ActorRef[ClusterCommand] + +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ClusterSharding.scala b/akka-typed/src/main/scala/akka/typed/cluster/ClusterSharding.scala new file mode 100644 index 0000000000..a21f21161f --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/ClusterSharding.scala @@ -0,0 +1,196 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.cluster + +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy +import akka.cluster.sharding.ClusterShardingSettings +import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props } + +sealed case class ShardingEnvelope[A](entityId: String, message: A) +object StartEntity { + def apply[A](entityId: String): ShardingEnvelope[A] = + new ShardingEnvelope[A](entityId, null.asInstanceOf[A]) +} + +object TypedMessageExtractor { + + /** + * Scala API: + * + * Create the default message extractor, using envelopes to identify what entity a message is for + * and the hashcode of the entityId to decide which shard an entity belongs to. + * + * This is recommended since it does not force details about sharding into the entity protocol + */ + def apply[A](maxNumberOfShards: Int): TypedMessageExtractor[ShardingEnvelope[A], A] = + new DefaultMessageExtractor[A](maxNumberOfShards) + + /** + * Scala API: + * + * Create a message extractor for a protocol where the entity id is available in each message. + */ + def noEnvelope[A]( + maxNumberOfShards: Int, + extractEntityId: A ⇒ String + ): TypedMessageExtractor[A, A] = + new DefaultNoEnvelopeMessageExtractor[A](maxNumberOfShards) { + // TODO catch MatchError here and return null for those to yield an "unhandled" when partial functions are used? + def entityId(message: A) = extractEntityId(message) + } + +} + +/** + * Entirely customizable typed message extractor. Prefer [[DefaultMessageExtractor]] or [[DefaultNoEnvelopeMessageExtractor]] + * if possible. + * + * @tparam E Possibly an envelope around the messages accepted by the entity actor, is the same as `A` if there is no + * envelope. + * @tparam A The type of message accepted by the entity actor + */ +trait TypedMessageExtractor[E, A] { + + /** + * Extract the entity id from an incoming `message`. If `null` is returned + * the message will be `unhandled`, i.e. posted as `Unhandled` messages on the event stream + */ + def entityId(message: E): String + + /** + * Extract the message to send to the entity from an incoming `message`. + * Note that the extracted message does not have to be the same as the incoming + * message to support wrapping in message envelope that is unwrapped before + * sending to the entity actor. + * + * If the returned value is `null`, and the entity isn't running yet the entity will be started + * but no message will be delivered to it. + */ + def entityMessage(message: E): A + + /** + * Extract the entity id from an incoming `message`. Only messages that passed the [[#entityId]] + * function will be used as input to this function. + */ + def shardId(message: E): String +} + +/** + * Java API: + * + * Default message extractor type, using envelopes to identify what entity a message is for + * and the hashcode of the entityId to decide which shard an entity belongs to. + * + * This is recommended since it does not force details about sharding into the entity protocol + * + * @tparam A The type of message accepted by the entity actor + */ +final class DefaultMessageExtractor[A](maxNumberOfShards: Int) extends TypedMessageExtractor[ShardingEnvelope[A], A] { + def entityId(envelope: ShardingEnvelope[A]) = envelope.entityId + def entityMessage(envelope: ShardingEnvelope[A]) = envelope.message + def shardId(envelope: ShardingEnvelope[A]) = (math.abs(envelope.entityId.hashCode) % maxNumberOfShards).toString +} + +/** + * Java API: + * + * Default message extractor type, using a property of the message to identify what entity a message is for + * and the hashcode of the entityId to decide which shard an entity belongs to. + * + * This is recommended since it does not force details about sharding into the entity protocol + * + * @tparam A The type of message accepted by the entity actor + */ +abstract class DefaultNoEnvelopeMessageExtractor[A](maxNumberOfShards: Int) extends TypedMessageExtractor[A, A] { + def entityMessage(message: A) = message + def shardId(message: A) = { + val id = entityId(message) + if (id != null) (math.abs(id.hashCode) % maxNumberOfShards).toString + else null + } +} + +/** + * A reference to an entityId and the local access to sharding, allows for actor-like interaction + * + * The entity ref must be resolved locally and cannot be sent to another node. + * + * TODO what about ask, should it actually implement ActorRef to be exactly like ActorRef and callers does not have + * to know at all about it or is it good with a distinction but lookalike API? + */ +trait EntityRef[A] { + /** + * Send a message to the entity referenced by this EntityRef using *at-most-once* + * messaging semantics. + */ + def tell(msg: A): Unit +} + +object EntityRef { + implicit final class EntityRefOps[T](val ref: EntityRef[T]) extends AnyVal { + /** + * Send a message to the Actor referenced by this ActorRef using *at-most-once* + * messaging semantics. + */ + def !(msg: T): Unit = ref.tell(msg) + } +} + +object ClusterSharding extends ExtensionId[ClusterSharding] { + def createExtension(system: ActorSystem[_]): ClusterSharding = ??? + + /** + * Create an ActorRef-like reference to a specific sharded entity. Messages sent to it will be wrapped + * in a [[ShardingEnvelope]] and passed to the local shard region or proxy. + */ + def entityRefFor[A](entityId: String, actorRef: ActorRef[ShardingEnvelope[A]]): EntityRef[A] = + new EntityRef[A] { + def tell(msg: A): Unit = actorRef ! ShardingEnvelope(entityId, msg) + } + +} + +trait ClusterSharding extends Extension { + + /** + * Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role. + * + * Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the + * recipient actor. + * A [[DefaultMessageExtractor]] will be used for extracting entityId and shardId + * [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy. + * + * @param behavior The behavior for entities + * @param typeName A name that uniquely identifies the type of entity in this cluster + * @param handOffStopMessage Message sent to an entity to tell it to stop + * @tparam A The type of command the entity accepts + */ + def spawn[A]( + behavior: Behavior[A], + typeName: String, + settings: ClusterShardingSettings, + maxNumberOfShards: Int, + handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] + + /** + * Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role. + * + * @param behavior The behavior for entities + * @param typeName A name that uniquely identifies the type of entity in this cluster + * @param entityProps Props to apply when starting an entity + * @param handOffStopMessage Message sent to an entity to tell it to stop + * @tparam E A possible envelope around the message the entity accepts + * @tparam A The type of command the entity accepts + */ + def spawn[E, A]( + behavior: Behavior[A], + typeName: String, + entityProps: Props, + settings: ClusterShardingSettings, + messageExtractor: TypedMessageExtractor[E, A], + allocationStrategy: ShardAllocationStrategy, + handOffStopMessage: A + ): ActorRef[E] + +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ClusterSingleton.scala b/akka-typed/src/main/scala/akka/typed/cluster/ClusterSingleton.scala new file mode 100644 index 0000000000..46201a687c --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/ClusterSingleton.scala @@ -0,0 +1,135 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.cluster + +import akka.actor.NoSerializationVerificationNeeded +import akka.annotation.{ DoNotInherit, InternalApi } +import akka.cluster.ClusterSettings.DataCenter +import akka.cluster.singleton.{ ClusterSingletonManagerSettings, ClusterSingletonProxySettings } +import akka.typed.cluster.internal.AdaptedClusterSingletonImpl +import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props } +import com.typesafe.config.Config + +import scala.concurrent.duration.FiniteDuration + +object ClusterSingletonSettings { + def apply( + system: ActorSystem[_] + ): ClusterSingletonSettings = fromConfig(system.settings.config.getConfig("akka.cluster")) + + def fromConfig( + config: Config + ): ClusterSingletonSettings = { + // TODO introduce a config namespace for typed singleton and read that? + // currently singleton name is required and then discarded, for example + val mgrSettings = ClusterSingletonManagerSettings(config.getConfig("singleton")) + val proxySettings = ClusterSingletonProxySettings(config.getConfig("singleton-proxy")) + new ClusterSingletonSettings( + mgrSettings.role, + proxySettings.dataCenter, + proxySettings.singletonIdentificationInterval, + mgrSettings.removalMargin, + mgrSettings.handOverRetryInterval, + proxySettings.bufferSize + ) + } +} + +final class ClusterSingletonSettings( + val role: Option[String], + val dataCenter: Option[DataCenter], + val singletonIdentificationInterval: FiniteDuration, + val removalMargin: FiniteDuration, + val handOverRetryInterval: FiniteDuration, + val bufferSize: Int) extends NoSerializationVerificationNeeded { + + def withRole(role: String): ClusterSingletonSettings = copy(role = Some(role)) + + def withNoRole(): ClusterSingletonSettings = copy(role = None) + + def withDataCenter(dataCenter: DataCenter): ClusterSingletonSettings = copy(dataCenter = Some(dataCenter)) + + def withNoDataCenter(): ClusterSingletonSettings = copy(dataCenter = None) + + def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonSettings = copy(removalMargin = removalMargin) + + def withHandoverRetryInterval(handOverRetryInterval: FiniteDuration): ClusterSingletonSettings = copy(handOverRetryInterval = handOverRetryInterval) + + def withBufferSize(bufferSize: Int): ClusterSingletonSettings = copy(bufferSize = bufferSize) + + private def copy( + role: Option[String] = role, + dataCenter: Option[DataCenter] = dataCenter, + singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval, + removalMargin: FiniteDuration = removalMargin, + handOverRetryInterval: FiniteDuration = handOverRetryInterval, + bufferSize: Int = bufferSize) = + new ClusterSingletonSettings(role, dataCenter, singletonIdentificationInterval, removalMargin, handOverRetryInterval, bufferSize) + + /** + * INTERNAL API: + */ + @InternalApi + private[akka] def toManagerSettings(singletonName: String): ClusterSingletonManagerSettings = + new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval) + + /** + * INTERNAL API: + */ + @InternalApi + private[akka] def toProxySettings(singletonName: String): ClusterSingletonProxySettings = + new ClusterSingletonProxySettings(singletonName, role, singletonIdentificationInterval, bufferSize) + + /** + * INTERNAL API: + */ + @InternalApi + private[akka] def shouldRunManager(cluster: Cluster): Boolean = + (role.isEmpty || cluster.selfMember.roles(role.get)) && + (dataCenter.isEmpty || dataCenter.contains(cluster.selfMember.dataCenter)) + +} + +object ClusterSingleton extends ExtensionId[ClusterSingleton] { + + override def createExtension(system: ActorSystem[_]): ClusterSingleton = new AdaptedClusterSingletonImpl(system) + + /** + * Java API: + */ + def get(system: ActorSystem[_]): ClusterSingleton = apply(system) +} + +/** + * INTERNAL API: + */ +@InternalApi +private[akka] object ClusterSingletonImpl { + def managerNameFor(singletonName: String) = s"singletonManager${singletonName}" +} + +/** + * Not intended for user extension. + */ +@DoNotInherit +trait ClusterSingleton extends Extension { + + /** + * Start if needed and provide a proxy to a named singleton + * + * If there already is a manager running for the given `singletonName` on this node, no additional manager is started. + * If there already is a proxy running for the given `singletonName` on this node, an [[ActorRef]] to that is returned. + * + * @param singletonName A cluster global unique name for this singleton + * @return A proxy actor that can be used to communicate with the singleton in the cluster + */ + def spawn[A]( + behavior: Behavior[A], + singletonName: String, + props: Props, + settings: ClusterSingletonSettings, + terminationMessage: A + ): ActorRef[A] + +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterImpl.scala b/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterImpl.scala new file mode 100644 index 0000000000..f5325b12e3 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterImpl.scala @@ -0,0 +1,153 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.cluster.internal + +import akka.actor.ExtendedActorSystem +import akka.annotation.InternalApi +import akka.cluster.ClusterEvent.MemberEvent +import akka.cluster.{ ClusterEvent, MemberStatus } +import akka.typed.{ ActorRef, ActorSystem, Terminated } +import akka.typed.cluster._ +import akka.typed.internal.adapter.ActorSystemAdapter +import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.adapter._ + +/** + * INTERNAL API: + */ +@InternalApi +private[akka] object AdapterClusterImpl { + + private sealed trait SeenState + private case object BeforeUp extends SeenState + private case object Up extends SeenState + private case class Removed(previousStatus: MemberStatus) extends SeenState + + private def subscriptionsBehavior(adaptedCluster: akka.cluster.Cluster) = Actor.deferred[ClusterStateSubscription] { ctx ⇒ + var seenState: SeenState = BeforeUp + var upSubscribers: List[ActorRef[SelfUp]] = Nil + var removedSubscribers: List[ActorRef[SelfRemoved]] = Nil + + adaptedCluster.subscribe(ctx.self.toUntyped, ClusterEvent.initialStateAsEvents, classOf[MemberEvent]) + + // important to not eagerly refer to it or we get a cycle here + lazy val cluster = Cluster(ctx.system) + def onSelfMemberEvent(event: MemberEvent): Unit = { + event match { + case ClusterEvent.MemberUp(_) ⇒ + seenState = Up + val upMessage = SelfUp(cluster.state) + upSubscribers.foreach(_ ! upMessage) + upSubscribers = Nil + + case ClusterEvent.MemberRemoved(_, previousStatus) ⇒ + seenState = Removed(previousStatus) + val removedMessage = SelfRemoved(previousStatus) + removedSubscribers.foreach(_ ! removedMessage) + removedSubscribers = Nil + + case _ ⇒ // This is fine. + } + } + + Actor.immutable[AnyRef] { (ctx, msg) ⇒ + + msg match { + case Subscribe(subscriber: ActorRef[SelfUp] @unchecked, clazz) if clazz == classOf[SelfUp] ⇒ + seenState match { + case Up ⇒ subscriber ! SelfUp(adaptedCluster.state) + case BeforeUp ⇒ + ctx.watch(subscriber) + upSubscribers = subscriber :: upSubscribers + case _: Removed ⇒ + // self did join, but is now no longer up, we want to avoid subscribing + // to not get a memory leak, but also not signal anything + } + Actor.same + + case Subscribe(subscriber: ActorRef[SelfRemoved] @unchecked, clazz) if clazz == classOf[SelfRemoved] ⇒ + seenState match { + case BeforeUp | Up ⇒ removedSubscribers = subscriber :: removedSubscribers + case Removed(s) ⇒ subscriber ! SelfRemoved(s) + } + Actor.same + + case Subscribe(subscriber, eventClass) ⇒ + adaptedCluster.subscribe(subscriber.toUntyped, initialStateMode = ClusterEvent.initialStateAsEvents, eventClass) + Actor.same + + case Unsubscribe(subscriber) ⇒ + adaptedCluster.unsubscribe(subscriber.toUntyped) + Actor.same + + case GetCurrentState(sender) ⇒ + adaptedCluster.sendCurrentClusterState(sender.toUntyped) + Actor.same + + case evt: MemberEvent if evt.member.uniqueAddress == cluster.selfMember.uniqueAddress ⇒ + onSelfMemberEvent(evt) + Actor.same + + case _: MemberEvent ⇒ + Actor.same + + } + }.onSignal { + + case (_, Terminated(ref)) ⇒ + upSubscribers = upSubscribers.filterNot(_ == ref) + removedSubscribers = removedSubscribers.filterNot(_ == ref) + Actor.same + + }.narrow[ClusterStateSubscription] + } + + private def managerBehavior(adaptedCluster: akka.cluster.Cluster) = Actor.immutable[ClusterCommand]((ctx, msg) ⇒ + msg match { + case Join(address) ⇒ + adaptedCluster.join(address) + Actor.same + + case Leave(address) ⇒ + adaptedCluster.leave(address) + Actor.same + + case Down(address) ⇒ + adaptedCluster.down(address) + Actor.same + + case JoinSeedNodes(addresses) ⇒ + adaptedCluster.joinSeedNodes(addresses) + Actor.same + + } + + ) + +} + +/** + * INTERNAL API: + */ +@InternalApi +private[akka] final class AdapterClusterImpl(system: ActorSystem[_]) extends Cluster { + import AdapterClusterImpl._ + + require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted actor systems can be used for cluster features") + private val untypedSystem = ActorSystemAdapter.toUntyped(system) + private def extendedUntyped = untypedSystem.asInstanceOf[ExtendedActorSystem] + private val untypedCluster = akka.cluster.Cluster(untypedSystem) + + override def selfMember = untypedCluster.selfMember + override def isTerminated = untypedCluster.isTerminated + override def state = untypedCluster.state + + // must not be lazy as it also updates the cached selfMember + override val subscriptions: ActorRef[ClusterStateSubscription] = extendedUntyped.systemActorOf( + PropsAdapter(subscriptionsBehavior(untypedCluster)), "clusterStateSubscriptions") + + override lazy val manager: ActorRef[ClusterCommand] = extendedUntyped.systemActorOf( + PropsAdapter(managerBehavior(untypedCluster)), "clusterCommandManager") + +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterSingletonImpl.scala b/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterSingletonImpl.scala new file mode 100644 index 0000000000..0f982e8e81 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterSingletonImpl.scala @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package akka.typed.cluster.internal + +import java.util.concurrent.ConcurrentHashMap +import java.util.function.{ Function ⇒ JFunction } + +import akka.actor.{ ExtendedActorSystem, InvalidActorNameException } +import akka.annotation.InternalApi +import akka.cluster.singleton.{ ClusterSingletonProxy, ClusterSingletonManager ⇒ OldSingletonManager } +import akka.typed.cluster.{ Cluster, ClusterSingleton, ClusterSingletonImpl, ClusterSingletonSettings } +import akka.typed.internal.adapter.ActorSystemAdapter +import akka.typed.scaladsl.adapter._ +import akka.typed.{ ActorRef, ActorSystem, Behavior, Props } + +/** + * INTERNAL API: + */ +@InternalApi +private[akka] final class AdaptedClusterSingletonImpl(system: ActorSystem[_]) extends ClusterSingleton { + require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted actor systems can be used for the typed cluster singleton") + import ClusterSingletonImpl._ + + private lazy val cluster = Cluster(system) + private val untypedSystem = ActorSystemAdapter.toUntyped(system).asInstanceOf[ExtendedActorSystem] + + private val proxies = new ConcurrentHashMap[String, ActorRef[_]]() + + override def spawn[A]( + behavior: Behavior[A], + singletonName: String, + props: Props, + settings: ClusterSingletonSettings, + terminationMessage: A) = { + + if (settings.shouldRunManager(cluster)) { + val managerName = managerNameFor(singletonName) + // start singleton on this node + val adaptedProps = PropsAdapter(behavior, props) + try { + untypedSystem.systemActorOf( + OldSingletonManager.props(adaptedProps, terminationMessage, settings.toManagerSettings(singletonName)), + managerName) + } catch { + case ex: InvalidActorNameException if ex.getMessage.endsWith("is not unique!") ⇒ + // This is fine. We just wanted to make sure it is running and it already is + } + } + + val proxyCreator = new JFunction[String, ActorRef[_]] { + def apply(singletonName: String): ActorRef[_] = { + val proxyName = s"singletonProxy$singletonName" + untypedSystem.systemActorOf( + ClusterSingletonProxy.props(s"/system/${managerNameFor(singletonName)}", settings.toProxySettings(singletonName)), + proxyName) + } + } + + proxies.computeIfAbsent(singletonName, proxyCreator).asInstanceOf[ActorRef[A]] + } +} diff --git a/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorSystemAdapter.scala b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorSystemAdapter.scala index 0aa621f49d..6ed6ad7360 100644 --- a/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorSystemAdapter.scala +++ b/akka-typed/src/main/scala/akka/typed/internal/adapter/ActorSystemAdapter.scala @@ -7,10 +7,14 @@ package adapter import akka.{ actor ⇒ a, dispatch ⇒ d } import akka.dispatch.sysmsg + import scala.concurrent.ExecutionContextExecutor import akka.util.Timeout + import scala.concurrent.Future import akka.annotation.InternalApi +import akka.typed.scaladsl.adapter.AdapterExtension + import scala.annotation.unchecked.uncheckedVariance /** @@ -80,7 +84,18 @@ import scala.annotation.unchecked.uncheckedVariance } private[typed] object ActorSystemAdapter { - def apply(untyped: a.ActorSystem): ActorSystem[Nothing] = new ActorSystemAdapter(untyped.asInstanceOf[a.ActorSystemImpl]) + def apply(untyped: a.ActorSystem): ActorSystem[Nothing] = AdapterExtension(untyped).adapter + + // to make sure we do never create more than one adapter for the same actor system + class AdapterExtension(system: a.ExtendedActorSystem) extends a.Extension { + val adapter = new ActorSystemAdapter(system.asInstanceOf[a.ActorSystemImpl]) + } + object AdapterExtension extends a.ExtensionId[AdapterExtension] with a.ExtensionIdProvider { + override def get(system: a.ActorSystem): AdapterExtension = super.get(system) + override def lookup = AdapterExtension + override def createExtension(system: a.ExtendedActorSystem): AdapterExtension = + new AdapterExtension(system) + } def toUntyped[U](sys: ActorSystem[_]): a.ActorSystem = sys match { @@ -103,3 +118,4 @@ private[typed] object ActorSystemAdapter { "receptionist")) } } + diff --git a/build.sbt b/build.sbt index 87128366e0..e7e1544077 100644 --- a/build.sbt +++ b/build.sbt @@ -158,7 +158,12 @@ lazy val streamTestsTck = akkaModule("akka-stream-tests-tck") .dependsOn(streamTestkit % "test->test", stream) lazy val typed = akkaModule("akka-typed") - .dependsOn(testkit % "compile->compile;test->test", cluster % "compile->compile;test->test", distributedData) + .dependsOn( + testkit % "compile->compile;test->test", + cluster % "compile->compile;test->test", + clusterTools, + clusterSharding, + distributedData) lazy val typedTests = akkaModule("akka-typed-tests") .dependsOn(typed, typedTestkit % "compile->compile;test->test")