diff --git a/akka-typed-tests/src/test/scala/akka/typed/TypedSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/TypedSpec.scala index e2b0a6b886..b17da3bc3c 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/TypedSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/TypedSpec.scala @@ -65,7 +65,7 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup { sys } private var adaptedSystemUsed = false - lazy val adaptedSystem: ActorSystem[TypedSpec.Command] = { + lazy val system: ActorSystem[TypedSpec.Command] = { val sys = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf)) adaptedSystemUsed = true sys @@ -90,7 +90,7 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup { } trait AdaptedSystem { - def system: ActorSystem[TypedSpec.Command] = adaptedSystem + def system: ActorSystem[TypedSpec.Command] = TypedSpec.this.system } implicit val timeout = setTimeout @@ -100,7 +100,7 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup { if (nativeSystemUsed) Await.result(nativeSystem.terminate, timeout.duration) if (adaptedSystemUsed) - Await.result(adaptedSystem.terminate, timeout.duration) + Await.result(system.terminate, timeout.duration) } // TODO remove after basing on ScalaTest 3 with async support diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterApiSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterApiSpec.scala index 2c488e6321..f3d0556657 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterApiSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterApiSpec.scala @@ -35,21 +35,21 @@ object ClusterApiSpec { class ClusterApiSpec extends TypedSpec(ClusterApiSpec.config) with ScalaFutures { - val testSettings = TestKitSettings(adaptedSystem) - val clusterNode1 = Cluster(adaptedSystem) - val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem) + val testSettings = TestKitSettings(system) + val clusterNode1 = Cluster(system) + val untypedSystem1 = system.toUntyped object `A typed cluster` { def `01 must join a cluster and observe events from both sides`() = { - val system2 = akka.actor.ActorSystem(adaptedSystem.name, adaptedSystem.settings.config) + val system2 = akka.actor.ActorSystem(system.name, system.settings.config) val adaptedSystem2 = system2.toTyped try { val clusterNode2 = Cluster(adaptedSystem2) - val node1Probe = TestProbe[AnyRef]()(adaptedSystem, testSettings) + val node1Probe = TestProbe[AnyRef]()(system, testSettings) val node2Probe = TestProbe[AnyRef]()(adaptedSystem2, testSettings) // initial cached selfMember diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterShardingApiSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterShardingApiSpec.scala deleted file mode 100644 index b364c98922..0000000000 --- a/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterShardingApiSpec.scala +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Copyright (C) 2009-2017 Lightbend Inc. - */ -package akka.typed.cluster - -import akka.cluster.sharding.ClusterShardingSettings -import akka.typed.scaladsl.Actor -import akka.typed.scaladsl.adapter._ -import akka.typed.{ ActorSystem } - -class ClusterShardingApiSpec { - - // Compile only for now - - val system: akka.actor.ActorSystem = ??? - val typedSystem: ActorSystem[Nothing] = system.toTyped - val cluster = Cluster(typedSystem) - - trait EntityProtocol - case class Add(thing: String) extends EntityProtocol - case object PassHence extends EntityProtocol - - val entityBehavior = - Actor.deferred[EntityProtocol] { _ ⇒ - var things: List[String] = Nil - - Actor.immutable[EntityProtocol] { (_, msg) ⇒ - msg match { - case Add(thing) ⇒ - things = thing :: things - Actor.same - - case PassHence ⇒ - Actor.stopped - } - } - } - - val sharding = ClusterSharding(typedSystem).spawn( - entityBehavior, - "things-lists", - ClusterShardingSettings(typedSystem.settings.config), - maxNumberOfShards = 25, - handOffStopMessage = PassHence - ) - - sharding ! ShardingEnvelope("1", Add("bananas")) - - val entity1 = ClusterSharding.entityRefFor("1", sharding) - entity1 ! Add("pineapple") - - // start but no command - sharding ! StartEntity("2") - -} diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterSingletonApiSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterSingletonApiSpec.scala index 02fffe5b2e..afb8b9b14f 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterSingletonApiSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/ClusterSingletonApiSpec.scala @@ -88,23 +88,23 @@ object ClusterSingletonApiSpec { class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config) with ScalaFutures { import ClusterSingletonApiSpec._ - implicit val testSettings = TestKitSettings(adaptedSystem) - val clusterNode1 = Cluster(adaptedSystem) - val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem) + implicit val testSettings = TestKitSettings(system) + val clusterNode1 = Cluster(system) + val untypedSystem1 = system.toUntyped val system2 = akka.actor.ActorSystem( - adaptedSystem.name, + system.name, ConfigFactory.parseString( """ akka.cluster.roles = ["singleton"] - """).withFallback(adaptedSystem.settings.config)) + """).withFallback(system.settings.config)) val adaptedSystem2 = system2.toTyped val clusterNode2 = Cluster(adaptedSystem2) object `A typed cluster singleton` { def `01 must be accessible from two nodes in a cluster`() = { - val node1UpProbe = TestProbe[SelfUp]()(adaptedSystem, implicitly[TestKitSettings]) + val node1UpProbe = TestProbe[SelfUp]()(system, implicitly[TestKitSettings]) clusterNode1.subscriptions ! Subscribe(node1UpProbe.ref, classOf[SelfUp]) val node2UpProbe = TestProbe[SelfUp]()(adaptedSystem2, implicitly[TestKitSettings]) @@ -116,10 +116,10 @@ class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config) node1UpProbe.expectMsgType[SelfUp] node2UpProbe.expectMsgType[SelfUp] - val cs1 = ClusterSingleton(adaptedSystem) + val cs1 = ClusterSingleton(system) val cs2 = ClusterSingleton(adaptedSystem2) - val settings = ClusterSingletonSettings(adaptedSystem).withRole("singleton") + val settings = ClusterSingletonSettings(system).withRole("singleton") val node1ref = cs1.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) val node2ref = cs2.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) @@ -127,7 +127,7 @@ class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config) cs1.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node1ref) cs2.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node2ref) - val node1PongProbe = TestProbe[Pong.type]()(adaptedSystem, implicitly[TestKitSettings]) + val node1PongProbe = TestProbe[Pong.type]()(system, implicitly[TestKitSettings]) val node2PongProbe = TestProbe[Pong.type]()(adaptedSystem2, implicitly[TestKitSettings]) node1PongProbe.awaitAssert({ diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/internal/MiscMessageSerializerSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/internal/MiscMessageSerializerSpec.scala index 66e18d02ad..a02ec5c605 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/cluster/internal/MiscMessageSerializerSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/internal/MiscMessageSerializerSpec.scala @@ -30,9 +30,9 @@ class MiscMessageSerializerSpec extends TypedSpec(MiscMessageSerializerSpec.conf def `must serialize and deserialize typed actor refs `(): Unit = { - val ref = (adaptedSystem ? Create(Actor.empty[Unit], "some-actor")).futureValue + val ref = (system ? Create(Actor.empty[Unit], "some-actor")).futureValue - val serialization = SerializationExtension(ActorSystemAdapter.toUntyped(adaptedSystem)) + val serialization = SerializationExtension(ActorSystemAdapter.toUntyped(system)) val serializer = serialization.findSerializerFor(ref) match { case s: SerializerWithStringManifest ⇒ s diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/receptionist/ClusterReceptionistSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/receptionist/ClusterReceptionistSpec.scala index f7f36df26f..f1e8f316b0 100644 --- a/akka-typed-tests/src/test/scala/akka/typed/cluster/receptionist/ClusterReceptionistSpec.scala +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/receptionist/ClusterReceptionistSpec.scala @@ -97,6 +97,7 @@ object ClusterReceptionistSpec { class ClusterReceptionistSpec extends TypedSpec(ClusterReceptionistSpec.config) { import ClusterReceptionistSpec._ + val adaptedSystem = system implicit val testSettings = TestKitSettings(adaptedSystem) val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem) val clusterNode1 = Cluster(untypedSystem1) diff --git a/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingSpec.scala b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingSpec.scala new file mode 100644 index 0000000000..f075c0272d --- /dev/null +++ b/akka-typed-tests/src/test/scala/akka/typed/cluster/sharding/ClusterShardingSpec.scala @@ -0,0 +1,176 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.typed.cluster.sharding + +import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy +import akka.typed.{ ActorRef, ActorSystem, Props, TypedSpec } +import akka.typed.cluster.Cluster +import akka.typed.internal.adapter.ActorSystemAdapter +import akka.typed.scaladsl.Actor +import akka.typed.scaladsl.adapter._ +import akka.typed.testkit.TestKitSettings +import akka.typed.testkit.scaladsl.TestProbe +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.duration._ + +object ClusterShardingSpec { + val config = ConfigFactory.parseString( + """ + akka.actor.provider = cluster + + // akka.loglevel = debug + + akka.remote.artery.enabled = true + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.remote.artery.canonical.hostname = 127.0.0.1 + + akka.cluster.jmx.multi-mbeans-in-same-jvm = on + + akka.coordinated-shutdown.terminate-actor-system = off + + akka.actor { + serialize-messages = off + allow-java-serialization = off + } + """.stripMargin) + + sealed trait TestProtocol + final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol + final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol + final case class StopPlz() extends TestProtocol + + sealed trait IdTestProtocol { def id: String } + final case class IdReplyPlz(id: String, toMe: ActorRef[String]) extends IdTestProtocol + final case class IdWhoAreYou(id: String, replyTo: ActorRef[String]) extends IdTestProtocol + final case class IdStopPlz(id: String) extends IdTestProtocol + +} + +class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with ScalaFutures { + import akka.typed.scaladsl.adapter._ + import ClusterShardingSpec._ + + implicit val s = system + implicit val testkitSettings = TestKitSettings(system) + val sharding = ClusterSharding(system) + + implicit val untypedSystem = system.toUntyped + private val untypedCluster = akka.cluster.Cluster(untypedSystem) + + val behavior = Actor.immutable[TestProtocol] { + case (_, StopPlz()) ⇒ + Actor.stopped + + case (ctx, WhoAreYou(replyTo)) ⇒ + replyTo ! s"I'm ${ctx.self.path.name}" + Actor.same + + case (_, ReplyPlz(toMe)) ⇒ + toMe ! "Hello!" + Actor.same + } + val behaviorWithId = Actor.immutable[IdTestProtocol] { + case (_, IdStopPlz(_)) ⇒ + Actor.stopped + + case (ctx, IdWhoAreYou(_, replyTo)) ⇒ + replyTo ! s"I'm ${ctx.self.path.name}" + Actor.same + + case (_, IdReplyPlz(_, toMe)) ⇒ + toMe ! "Hello!" + Actor.same + } + + object `Typed cluster sharding` { + + untypedCluster.join(untypedCluster.selfAddress) + + def `01 must send messsages via cluster sharding, using envelopes`(): Unit = { + val ref = sharding.spawn( + behavior, + Props.empty, + "envelope-shard", + ClusterShardingSettings(system), + 10, + StopPlz()) + + val p = TestProbe[String]() + ref ! ShardingEnvelope("test", ReplyPlz(p.ref)) + p.expectMsg(3.seconds, "Hello!") + + ref ! ShardingEnvelope("test", StopPlz()) + } + def `02 must send messsages via cluster sharding, without envelopes`(): Unit = { + val ref = sharding.spawn( + behaviorWithId, + Props.empty, + "no-envelope-shard", + ClusterShardingSettings(system), + ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, _.id), + IdStopPlz("THE_ID_HERE")) + + val p = TestProbe[String]() + ref ! IdReplyPlz("test", p.ref) + p.expectMsg(3.seconds, "Hello!") + + ref ! IdStopPlz("test") + } + + // def `03 fail if starting sharding for already used typeName, but with wrong type`(): Unit = { + // val ex = intercept[Exception] { + // sharding.spawn( + // Actor.empty[String], + // Props.empty, + // "example-02", + // ClusterShardingSettings(adaptedSystem), + // 10, + // "STOP" + // ) + // } + // + // ex.getMessage should include("already started") + // } + + untypedCluster.join(untypedCluster.selfAddress) + + def `11 EntityRef - tell`(): Unit = { + val charlieRef: EntityRef[TestProtocol] = + sharding.entityRefFor[TestProtocol]("envelope-shard", "charlie") + + val p = TestProbe[String]() + + charlieRef ! WhoAreYou(p.ref) + p.expectMsg(3.seconds, "I'm charlie") + + charlieRef tell WhoAreYou(p.ref) + p.expectMsg(3.seconds, "I'm charlie") + + charlieRef ! StopPlz() + } + + def `11 EntityRef - ask`(): Unit = { + val bobRef: EntityRef[TestProtocol] = + sharding.entityRefFor[TestProtocol]("envelope-shard", "bob") + val charlieRef: EntityRef[TestProtocol] = + sharding.entityRefFor[TestProtocol]("envelope-shard", "charlie") + + val p = TestProbe[String]() + + val reply1 = bobRef ? WhoAreYou // TODO document that WhoAreYou(_) would not work + reply1.futureValue should ===("I'm bob") + + val reply2 = charlieRef ask WhoAreYou + reply2.futureValue should ===("I'm charlie") + + bobRef ! StopPlz() + } + + } + +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ClusterSharding.scala b/akka-typed/src/main/scala/akka/typed/cluster/ClusterSharding.scala deleted file mode 100644 index a21f21161f..0000000000 --- a/akka-typed/src/main/scala/akka/typed/cluster/ClusterSharding.scala +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Copyright (C) 2009-2017 Lightbend Inc. - */ -package akka.typed.cluster - -import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy -import akka.cluster.sharding.ClusterShardingSettings -import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props } - -sealed case class ShardingEnvelope[A](entityId: String, message: A) -object StartEntity { - def apply[A](entityId: String): ShardingEnvelope[A] = - new ShardingEnvelope[A](entityId, null.asInstanceOf[A]) -} - -object TypedMessageExtractor { - - /** - * Scala API: - * - * Create the default message extractor, using envelopes to identify what entity a message is for - * and the hashcode of the entityId to decide which shard an entity belongs to. - * - * This is recommended since it does not force details about sharding into the entity protocol - */ - def apply[A](maxNumberOfShards: Int): TypedMessageExtractor[ShardingEnvelope[A], A] = - new DefaultMessageExtractor[A](maxNumberOfShards) - - /** - * Scala API: - * - * Create a message extractor for a protocol where the entity id is available in each message. - */ - def noEnvelope[A]( - maxNumberOfShards: Int, - extractEntityId: A ⇒ String - ): TypedMessageExtractor[A, A] = - new DefaultNoEnvelopeMessageExtractor[A](maxNumberOfShards) { - // TODO catch MatchError here and return null for those to yield an "unhandled" when partial functions are used? - def entityId(message: A) = extractEntityId(message) - } - -} - -/** - * Entirely customizable typed message extractor. Prefer [[DefaultMessageExtractor]] or [[DefaultNoEnvelopeMessageExtractor]] - * if possible. - * - * @tparam E Possibly an envelope around the messages accepted by the entity actor, is the same as `A` if there is no - * envelope. - * @tparam A The type of message accepted by the entity actor - */ -trait TypedMessageExtractor[E, A] { - - /** - * Extract the entity id from an incoming `message`. If `null` is returned - * the message will be `unhandled`, i.e. posted as `Unhandled` messages on the event stream - */ - def entityId(message: E): String - - /** - * Extract the message to send to the entity from an incoming `message`. - * Note that the extracted message does not have to be the same as the incoming - * message to support wrapping in message envelope that is unwrapped before - * sending to the entity actor. - * - * If the returned value is `null`, and the entity isn't running yet the entity will be started - * but no message will be delivered to it. - */ - def entityMessage(message: E): A - - /** - * Extract the entity id from an incoming `message`. Only messages that passed the [[#entityId]] - * function will be used as input to this function. - */ - def shardId(message: E): String -} - -/** - * Java API: - * - * Default message extractor type, using envelopes to identify what entity a message is for - * and the hashcode of the entityId to decide which shard an entity belongs to. - * - * This is recommended since it does not force details about sharding into the entity protocol - * - * @tparam A The type of message accepted by the entity actor - */ -final class DefaultMessageExtractor[A](maxNumberOfShards: Int) extends TypedMessageExtractor[ShardingEnvelope[A], A] { - def entityId(envelope: ShardingEnvelope[A]) = envelope.entityId - def entityMessage(envelope: ShardingEnvelope[A]) = envelope.message - def shardId(envelope: ShardingEnvelope[A]) = (math.abs(envelope.entityId.hashCode) % maxNumberOfShards).toString -} - -/** - * Java API: - * - * Default message extractor type, using a property of the message to identify what entity a message is for - * and the hashcode of the entityId to decide which shard an entity belongs to. - * - * This is recommended since it does not force details about sharding into the entity protocol - * - * @tparam A The type of message accepted by the entity actor - */ -abstract class DefaultNoEnvelopeMessageExtractor[A](maxNumberOfShards: Int) extends TypedMessageExtractor[A, A] { - def entityMessage(message: A) = message - def shardId(message: A) = { - val id = entityId(message) - if (id != null) (math.abs(id.hashCode) % maxNumberOfShards).toString - else null - } -} - -/** - * A reference to an entityId and the local access to sharding, allows for actor-like interaction - * - * The entity ref must be resolved locally and cannot be sent to another node. - * - * TODO what about ask, should it actually implement ActorRef to be exactly like ActorRef and callers does not have - * to know at all about it or is it good with a distinction but lookalike API? - */ -trait EntityRef[A] { - /** - * Send a message to the entity referenced by this EntityRef using *at-most-once* - * messaging semantics. - */ - def tell(msg: A): Unit -} - -object EntityRef { - implicit final class EntityRefOps[T](val ref: EntityRef[T]) extends AnyVal { - /** - * Send a message to the Actor referenced by this ActorRef using *at-most-once* - * messaging semantics. - */ - def !(msg: T): Unit = ref.tell(msg) - } -} - -object ClusterSharding extends ExtensionId[ClusterSharding] { - def createExtension(system: ActorSystem[_]): ClusterSharding = ??? - - /** - * Create an ActorRef-like reference to a specific sharded entity. Messages sent to it will be wrapped - * in a [[ShardingEnvelope]] and passed to the local shard region or proxy. - */ - def entityRefFor[A](entityId: String, actorRef: ActorRef[ShardingEnvelope[A]]): EntityRef[A] = - new EntityRef[A] { - def tell(msg: A): Unit = actorRef ! ShardingEnvelope(entityId, msg) - } - -} - -trait ClusterSharding extends Extension { - - /** - * Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role. - * - * Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the - * recipient actor. - * A [[DefaultMessageExtractor]] will be used for extracting entityId and shardId - * [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy. - * - * @param behavior The behavior for entities - * @param typeName A name that uniquely identifies the type of entity in this cluster - * @param handOffStopMessage Message sent to an entity to tell it to stop - * @tparam A The type of command the entity accepts - */ - def spawn[A]( - behavior: Behavior[A], - typeName: String, - settings: ClusterShardingSettings, - maxNumberOfShards: Int, - handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] - - /** - * Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role. - * - * @param behavior The behavior for entities - * @param typeName A name that uniquely identifies the type of entity in this cluster - * @param entityProps Props to apply when starting an entity - * @param handOffStopMessage Message sent to an entity to tell it to stop - * @tparam E A possible envelope around the message the entity accepts - * @tparam A The type of command the entity accepts - */ - def spawn[E, A]( - behavior: Behavior[A], - typeName: String, - entityProps: Props, - settings: ClusterShardingSettings, - messageExtractor: TypedMessageExtractor[E, A], - allocationStrategy: ShardAllocationStrategy, - handOffStopMessage: A - ): ActorRef[E] - -} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/ClusterSingleton.scala b/akka-typed/src/main/scala/akka/typed/cluster/ClusterSingleton.scala index 46201a687c..fc5cd036bb 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/ClusterSingleton.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/ClusterSingleton.scala @@ -6,12 +6,14 @@ package akka.typed.cluster import akka.actor.NoSerializationVerificationNeeded import akka.annotation.{ DoNotInherit, InternalApi } import akka.cluster.ClusterSettings.DataCenter -import akka.cluster.singleton.{ ClusterSingletonManagerSettings, ClusterSingletonProxySettings } +import akka.cluster.singleton.{ ClusterSingletonProxySettings, ClusterSingletonManagerSettings ⇒ UntypedClusterSingletonManagerSettings } import akka.typed.cluster.internal.AdaptedClusterSingletonImpl +import akka.typed.internal.adapter.ActorSystemAdapter import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props } import com.typesafe.config.Config +import scala.concurrent.duration._ -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{ Duration, FiniteDuration } object ClusterSingletonSettings { def apply( @@ -71,8 +73,8 @@ final class ClusterSingletonSettings( * INTERNAL API: */ @InternalApi - private[akka] def toManagerSettings(singletonName: String): ClusterSingletonManagerSettings = - new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval) + private[akka] def toManagerSettings(singletonName: String): UntypedClusterSingletonManagerSettings = + new UntypedClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval) /** * INTERNAL API: @@ -133,3 +135,92 @@ trait ClusterSingleton extends Extension { ): ActorRef[A] } + +object ClusterSingletonManagerSettings { + import akka.typed.scaladsl.adapter._ + + /** + * Create settings from the default configuration + * `akka.cluster.singleton`. + */ + def apply(system: ActorSystem[_]): ClusterSingletonManagerSettings = + apply(system.settings.config.getConfig("akka.cluster.singleton")) + .withRemovalMargin(akka.cluster.Cluster(system.toUntyped).settings.DownRemovalMargin) + + /** + * Create settings from a configuration with the same layout as + * the default configuration `akka.cluster.singleton`. + */ + def apply(config: Config): ClusterSingletonManagerSettings = + new ClusterSingletonManagerSettings( + singletonName = config.getString("singleton-name"), + role = roleOption(config.getString("role")), + removalMargin = Duration.Zero, // defaults to ClusterSettins.DownRemovalMargin + handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis) + + /** + * Java API: Create settings from the default configuration + * `akka.cluster.singleton`. + */ + def create(system: ActorSystem[_]): ClusterSingletonManagerSettings = apply(system) + + /** + * Java API: Create settings from a configuration with the same layout as + * the default configuration `akka.cluster.singleton`. + */ + def create(config: Config): ClusterSingletonManagerSettings = apply(config) + + /** + * INTERNAL API + */ + private[akka] def roleOption(role: String): Option[String] = + if (role == "") None else Option(role) + +} + +/** + * @param singletonName The actor name of the child singleton actor. + * + * @param role Singleton among the nodes tagged with specified role. + * If the role is not specified it's a singleton among all nodes in + * the cluster. + * + * @param removalMargin Margin until the singleton instance that belonged to + * a downed/removed partition is created in surviving partition. The purpose of + * this margin is that in case of a network partition the singleton actors + * in the non-surviving partitions must be stopped before corresponding actors + * are started somewhere else. This is especially important for persistent + * actors. + * + * @param handOverRetryInterval When a node is becoming oldest it sends hand-over + * request to previous oldest, that might be leaving the cluster. This is + * retried with this interval until the previous oldest confirms that the hand + * over has started or the previous oldest member is removed from the cluster + * (+ `removalMargin`). + */ +final class ClusterSingletonManagerSettings( + val singletonName: String, + val role: Option[String], + val removalMargin: FiniteDuration, + val handOverRetryInterval: FiniteDuration) extends NoSerializationVerificationNeeded { + + def withSingletonName(name: String): ClusterSingletonManagerSettings = copy(singletonName = name) + + def withRole(role: String): ClusterSingletonManagerSettings = copy(role = UntypedClusterSingletonManagerSettings.roleOption(role)) + + def withRole(role: Option[String]) = copy(role = role) + + def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonManagerSettings = + copy(removalMargin = removalMargin) + + def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings = + copy(handOverRetryInterval = retryInterval) + + private def copy( + singletonName: String = singletonName, + role: Option[String] = role, + removalMargin: FiniteDuration = removalMargin, + handOverRetryInterval: FiniteDuration = handOverRetryInterval): ClusterSingletonManagerSettings = + new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval) +} + diff --git a/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterImpl.scala b/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterImpl.scala index f5325b12e3..198400716a 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterImpl.scala @@ -135,7 +135,7 @@ private[akka] final class AdapterClusterImpl(system: ActorSystem[_]) extends Clu import AdapterClusterImpl._ require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted actor systems can be used for cluster features") - private val untypedSystem = ActorSystemAdapter.toUntyped(system) + private val untypedSystem = system.toUntyped private def extendedUntyped = untypedSystem.asInstanceOf[ExtendedActorSystem] private val untypedCluster = akka.cluster.Cluster(untypedSystem) diff --git a/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterSingletonImpl.scala b/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterSingletonImpl.scala index 0f982e8e81..ef60ddb4d4 100644 --- a/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterSingletonImpl.scala +++ b/akka-typed/src/main/scala/akka/typed/cluster/internal/AdaptedClusterSingletonImpl.scala @@ -21,9 +21,10 @@ import akka.typed.{ ActorRef, ActorSystem, Behavior, Props } private[akka] final class AdaptedClusterSingletonImpl(system: ActorSystem[_]) extends ClusterSingleton { require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted actor systems can be used for the typed cluster singleton") import ClusterSingletonImpl._ + import akka.typed.scaladsl.adapter._ private lazy val cluster = Cluster(system) - private val untypedSystem = ActorSystemAdapter.toUntyped(system).asInstanceOf[ExtendedActorSystem] + private val untypedSystem = system.toUntyped.asInstanceOf[ExtendedActorSystem] private val proxies = new ConcurrentHashMap[String, ActorRef[_]]() diff --git a/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterSharding.scala b/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterSharding.scala new file mode 100644 index 0000000000..7ec0a531b1 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterSharding.scala @@ -0,0 +1,344 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.cluster.sharding + +import akka.annotation.{ DoNotInherit, InternalApi } +import akka.cluster.sharding.ShardCoordinator.{ LeastShardAllocationStrategy, ShardAllocationStrategy } +import akka.cluster.sharding.{ ClusterSharding ⇒ UntypedClusterSharding, ShardRegion ⇒ UntypedShardRegion } +import akka.typed.cluster.Cluster +import akka.typed.internal.adapter.{ ActorRefAdapter, ActorSystemAdapter } +import akka.typed.scaladsl.adapter.PropsAdapter +import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props } + +import scala.language.implicitConversions + +/** + * Default envelope type that may be used with Cluster Sharding. + * + * Cluster Sharding provides a default [[HashCodeMessageExtractor]] that is able to handle + * these types of messages, by hashing the entityId into into the shardId. It is not the only, + * but a convenient way to send envelope-wrapped messages via cluster sharding. + * + * The alternative way of routing messages through sharding is to not use envelopes, + * and have the message types themselfs carry identifiers. + */ +final case class ShardingEnvelope[A](entityId: String, message: A) // TODO think if should remain a case class + +/** Allows starting a specific Sharded Entity by its entity identifier */ +object StartEntity { + + /** + * Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the + * specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it. + */ + def apply[A](entityId: String): ShardingEnvelope[A] = + new ShardingEnvelope[A](entityId, null.asInstanceOf[A]) // TODO should we instead sub-class here somehow? + + /** + * Java API + * + * Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the + * specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it. + */ + def create[A](msgClass: Class[A], entityId: String): ShardingEnvelope[A] = + apply[A](entityId) +} + +object ShardingMessageExtractor { + + /** + * Scala API: + * + * Create the default message extractor, using envelopes to identify what entity a message is for + * and the hashcode of the entityId to decide which shard an entity belongs to. + * + * This is recommended since it does not force details about sharding into the entity protocol + */ + def apply[A](maxNumberOfShards: Int): ShardingMessageExtractor[ShardingEnvelope[A], A] = + new HashCodeMessageExtractor[A](maxNumberOfShards) + + /** + * Create a message extractor for a protocol where the entity id is available in each message. + */ + def noEnvelope[A]( + maxNumberOfShards: Int, + extractEntityId: A ⇒ String + ): ShardingMessageExtractor[A, A] = + new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards) { + // TODO catch MatchError here and return null for those to yield an "unhandled" when partial functions are used? + def entityId(message: A) = extractEntityId(message) + } + +} + +/** + * Entirely customizable typed message extractor. Prefer [[HashCodeMessageExtractor]] or [[HashCodeNoEnvelopeMessageExtractor]] + * if possible. + * + * @tparam E Possibly an Envelope around the messages accepted by the entity actor, is the same as `A` if there is no + * envelope. + * @tparam A The type of message accepted by the entity actor + */ +trait ShardingMessageExtractor[E, A] { + + /** + * Extract the entity id from an incoming `message`. If `null` is returned + * the message will be `unhandled`, i.e. posted as `Unhandled` messages on the event stream + */ + def entityId(message: E): String + + /** + * Extract the message to send to the entity from an incoming `message`. + * Note that the extracted message does not have to be the same as the incoming + * message to support wrapping in message envelope that is unwrapped before + * sending to the entity actor. + * + * If the returned value is `null`, and the entity isn't running yet the entity will be started + * but no message will be delivered to it. + */ + def entityMessage(message: E): A // TODO "unwrapMessage" is how I'd call it? + + /** + * Extract the entity id from an incoming `message`. Only messages that passed the [[#entityId]] + * function will be used as input to this function. + */ + def shardId(message: E): String +} + +/** + * Java API: + * + * Default message extractor type, using envelopes to identify what entity a message is for + * and the hashcode of the entityId to decide which shard an entity belongs to. + * + * This is recommended since it does not force details about sharding into the entity protocol + * + * @tparam A The type of message accepted by the entity actor + */ +final class HashCodeMessageExtractor[A](maxNumberOfShards: Int) extends ShardingMessageExtractor[ShardingEnvelope[A], A] { + def entityId(envelope: ShardingEnvelope[A]): String = envelope.entityId + def entityMessage(envelope: ShardingEnvelope[A]): A = envelope.message + def shardId(envelope: ShardingEnvelope[A]): String = (math.abs(envelope.entityId.hashCode) % maxNumberOfShards).toString +} + +/** + * Java API: + * + * Default message extractor type, using a property of the message to identify what entity a message is for + * and the hashcode of the entityId to decide which shard an entity belongs to. + * + * This is recommended since it does not force details about sharding into the entity protocol + * + * @tparam A The type of message accepted by the entity actor + */ +abstract class HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards: Int) extends ShardingMessageExtractor[A, A] { + final def entityMessage(message: A): A = message + def shardId(message: A): String = { + val id = entityId(message) + if (id != null) (math.abs(id.hashCode) % maxNumberOfShards).toString + else null + } + + override def toString = s"HashCodeNoEnvelopeMessageExtractor($maxNumberOfShards)" +} + +object ClusterSharding extends ExtensionId[ClusterSharding] { + + override def createExtension(system: ActorSystem[_]): ClusterSharding = + new AdaptedClusterShardingImpl(system) + + /** Java API */ + def get(system: ActorSystem[_]): ClusterSharding = apply(system) +} + +/** INTERNAL API */ +@InternalApi +final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSharding { + import akka.typed.scaladsl.adapter._ + require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted untyped actor systems can be used for cluster features") + + private val cluster = Cluster(system) + private val untypedSystem = system.toUntyped + private val untypedSharding = akka.cluster.sharding.ClusterSharding(untypedSystem) + + override def spawn[A]( + behavior: Behavior[A], + entityProps: Props, + typeName: String, + settings: ClusterShardingSettings, + maxNumberOfShards: Int, + handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = { + val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards) + spawn(behavior, entityProps, typeName, settings, extractor, defaultShardAllocationStrategy(settings), handOffStopMessage) + } + + override def spawn[E, A]( + behavior: Behavior[A], + entityProps: Props, + typeName: String, + settings: ClusterShardingSettings, + messageExtractor: ShardingMessageExtractor[E, A], + handOffStopMessage: A): ActorRef[E] = + spawn(behavior, entityProps, typeName, settings, messageExtractor, defaultShardAllocationStrategy(settings), handOffStopMessage) + + override def spawn[E, A]( + behavior: Behavior[A], + entityProps: Props, + typeName: String, + settings: ClusterShardingSettings, + extractor: ShardingMessageExtractor[E, A], + allocationStrategy: ShardAllocationStrategy, + handOffStopMessage: A): ActorRef[E] = { + + val untypedSettings = ClusterShardingSettings.toUntypedSettings(settings) + + val ref = + if (settings.shouldHostShard(cluster)) { + system.log.info("Starting Shard Region [{}]...") + untypedSharding.start( + typeName, + PropsAdapter(behavior, entityProps), + untypedSettings, + extractor, extractor, + defaultShardAllocationStrategy(settings), + handOffStopMessage + ) + } else { + system.log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...") + + untypedSharding.startProxy( + typeName, + settings.role, + dataCenter = None, // TODO what about the multi-dc value here? + extractShardId = extractor, + extractEntityId = extractor + ) + } + + ActorRefAdapter(ref) + } + + override def entityRefFor[A](typeName: String, entityId: String): EntityRef[A] = { + new AdaptedEntityRefImpl[A](untypedSharding.shardRegion(typeName), entityId) + } + + override def getEntityRefFor[A](msgClass: Class[A], typeName: String, entityId: String): EntityRef[A] = + entityRefFor[A](typeName, entityId) + + override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = { + val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold + val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance + new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance) + } + + // --- extractor conversions --- + @InternalApi + private implicit def convertExtractEntityId[E, A](extractor: ShardingMessageExtractor[E, A]): UntypedShardRegion.ExtractEntityId = { + // TODO what if msg was null + case msg: E if extractor.entityId(msg.asInstanceOf[E]) ne null ⇒ + // we're evaluating entityId twice, I wonder if we could do it just once (same was in old sharding's Java DSL) + + (extractor.entityId(msg.asInstanceOf[E]), extractor.entityMessage(msg.asInstanceOf[E])) + } + @InternalApi + private implicit def convertExtractShardId[E, A](extractor: ShardingMessageExtractor[E, A]): UntypedShardRegion.ExtractShardId = { + case msg: E ⇒ extractor.shardId(msg) + } +} + +@DoNotInherit +sealed trait ClusterSharding extends Extension { + + /** + * Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role. + * + * Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the + * recipient actor. + * A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId + * [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy. + * + * @param behavior The behavior for entities + * @param typeName A name that uniquely identifies the type of entity in this cluster + * @param handOffStopMessage Message sent to an entity to tell it to stop + * @tparam A The type of command the entity accepts + */ + // TODO: FYI, I think it would be very good to have rule that "behavior, otherstuff" + // TODO: or "behavior, props, otherstuff" be the consistent style we want to promote in parameter ordering, WDYT? + def spawn[A]( + behavior: Behavior[A], + props: Props, + typeName: String, + settings: ClusterShardingSettings, + maxNumberOfShards: Int, + handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] + + /** + * Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role. + * + * @param behavior The behavior for entities + * @param typeName A name that uniquely identifies the type of entity in this cluster + * @param entityProps Props to apply when starting an entity + * @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards + * @param handOffStopMessage Message sent to an entity to tell it to stop + * @tparam E A possible envelope around the message the entity accepts + * @tparam A The type of command the entity accepts + */ + def spawn[E, A]( + behavior: Behavior[A], + entityProps: Props, + typeName: String, + settings: ClusterShardingSettings, + messageExtractor: ShardingMessageExtractor[E, A], + allocationStrategy: ShardAllocationStrategy, + handOffStopMessage: A + ): ActorRef[E] + + /** + * Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role. + * + * @param behavior The behavior for entities + * @param typeName A name that uniquely identifies the type of entity in this cluster + * @param entityProps Props to apply when starting an entity + * @param handOffStopMessage Message sent to an entity to tell it to stop + * @tparam E A possible envelope around the message the entity accepts + * @tparam A The type of command the entity accepts + */ + def spawn[E, A]( + behavior: Behavior[A], + entityProps: Props, + typeName: String, + settings: ClusterShardingSettings, + messageExtractor: ShardingMessageExtractor[E, A], + handOffStopMessage: A + ): ActorRef[E] + + /** + * Create an `ActorRef`-like reference to a specific sharded entity. + * Currently you have to correctly specify the type of messages the target can handle. + * + * Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the + * here provided `entityId`. + * + * FIXME a more typed version of this API will be explored in https://github.com/akka/akka/issues/23690 + * + * For in-depth documentation of its semantics, see [[EntityRef]]. + */ + def entityRefFor[A](typeName: String, entityId: String): EntityRef[A] + + /** + * Java API: Create an `ActorRef`-like reference to a specific sharded entity. + * Messages sent to it will be wrapped in a [[ShardingEnvelope]] and passed to the local shard region or proxy. + * + * Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the + * here provided `entityId`. + * + * FIXME a more typed version of this API will be explored in https://github.com/akka/akka/issues/23690 + * + * For in-depth documentation of its semantics, see [[EntityRef]]. + */ + def getEntityRefFor[A](msgClass: Class[A], typeName: String, entityId: String): EntityRef[A] + + /** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */ + def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterShardingSettings.scala b/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterShardingSettings.scala new file mode 100644 index 0000000000..a8fa24860f --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/sharding/ClusterShardingSettings.scala @@ -0,0 +1,282 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ + +package akka.typed.cluster.sharding + +import akka.actor.NoSerializationVerificationNeeded +import akka.annotation.InternalApi +import akka.cluster.sharding.{ ClusterShardingSettings ⇒ UntypedShardingSettings } +import akka.cluster.singleton.{ ClusterSingletonManagerSettings ⇒ UntypedClusterSingletonManagerSettings } +import akka.typed.ActorSystem +import akka.typed.cluster.{ Cluster, ClusterSingletonManagerSettings } +import com.typesafe.config.Config + +import scala.concurrent.duration.FiniteDuration + +object ClusterShardingSettings { + + /** Scala API: Creates new cluster sharding settings object */ + def apply(system: ActorSystem[_]): ClusterShardingSettings = + fromConfig(system.settings.config.getConfig("akka.cluster.sharding")) + + def fromConfig(config: Config): ClusterShardingSettings = { + val untypedSettings = UntypedShardingSettings(config) + fromUntypedSettings(untypedSettings) + } + + /** Java API: Creates new cluster sharding settings object */ + def create(system: ActorSystem[_]): ClusterShardingSettings = + apply(system) + + /** INTERNAL API: Indended only for internal use, it is not recommended to keep converting between the setting types */ + private[akka] def fromUntypedSettings(untypedSettings: UntypedShardingSettings): ClusterShardingSettings = { + new ClusterShardingSettings( + role = untypedSettings.role, + rememberEntities = untypedSettings.rememberEntities, + journalPluginId = untypedSettings.journalPluginId, + snapshotPluginId = untypedSettings.snapshotPluginId, + stateStoreMode = StateStoreMode.byName(untypedSettings.stateStoreMode), + new TuningParameters(untypedSettings.tuningParameters), + new ClusterSingletonManagerSettings( + untypedSettings.coordinatorSingletonSettings.singletonName, + untypedSettings.coordinatorSingletonSettings.role, + untypedSettings.coordinatorSingletonSettings.removalMargin, + untypedSettings.coordinatorSingletonSettings.handOverRetryInterval + ) + ) + } + + /** INTERNAL API: Indended only for internal use, it is not recommended to keep converting between the setting types */ + private[akka] def toUntypedSettings(settings: ClusterShardingSettings): UntypedShardingSettings = { + new UntypedShardingSettings( + role = settings.role, + rememberEntities = settings.rememberEntities, + journalPluginId = settings.journalPluginId, + snapshotPluginId = settings.snapshotPluginId, + stateStoreMode = settings.stateStoreMode.name, + new UntypedShardingSettings.TuningParameters( + bufferSize = settings.tuningParameters.bufferSize, + coordinatorFailureBackoff = settings.tuningParameters.coordinatorFailureBackoff, + retryInterval = settings.tuningParameters.retryInterval, + handOffTimeout = settings.tuningParameters.handOffTimeout, + shardStartTimeout = settings.tuningParameters.shardStartTimeout, + shardFailureBackoff = settings.tuningParameters.shardFailureBackoff, + entityRestartBackoff = settings.tuningParameters.entityRestartBackoff, + rebalanceInterval = settings.tuningParameters.rebalanceInterval, + snapshotAfter = settings.tuningParameters.snapshotAfter, + keepNrOfBatches = settings.tuningParameters.keepNrOfBatches, + leastShardAllocationRebalanceThreshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold, // TODO extract it a bit + leastShardAllocationMaxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance, + waitingForStateTimeout = settings.tuningParameters.waitingForStateTimeout, + updatingStateTimeout = settings.tuningParameters.updatingStateTimeout, + entityRecoveryStrategy = settings.tuningParameters.entityRecoveryStrategy, + entityRecoveryConstantRateStrategyFrequency = settings.tuningParameters.entityRecoveryConstantRateStrategyFrequency, + entityRecoveryConstantRateStrategyNumberOfEntities = settings.tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities + ), + new UntypedClusterSingletonManagerSettings( + settings.coordinatorSingletonSettings.singletonName, + settings.coordinatorSingletonSettings.role, + settings.coordinatorSingletonSettings.removalMargin, + settings.coordinatorSingletonSettings.handOverRetryInterval + )) + + } + + private def roleOption(role: String): Option[String] = + if (role == "" || role == null) None else Option(role) + + sealed trait StateStoreMode { def name: String } + object StateStoreMode { + def byName(name: String): StateStoreMode = + if (name == StateStoreModePersistence.name) StateStoreModePersistence + else if (name == StateStoreModeDData.name) StateStoreModeDData + else throw new IllegalArgumentException("Not recognized StateStoreMode, only 'persistence' and 'ddata' are supported.") + } + final case object StateStoreModePersistence extends StateStoreMode { override def name = "persistence" } + final case object StateStoreModeDData extends StateStoreMode { override def name = "ddata" } + + // generated using kaze-class + final class TuningParameters private ( + val bufferSize: Int, + val coordinatorFailureBackoff: FiniteDuration, + val entityRecoveryConstantRateStrategyFrequency: FiniteDuration, + val entityRecoveryConstantRateStrategyNumberOfEntities: Int, + val entityRecoveryStrategy: String, + val entityRestartBackoff: FiniteDuration, + val handOffTimeout: FiniteDuration, + val keepNrOfBatches: Int, + val leastShardAllocationMaxSimultaneousRebalance: Int, + val leastShardAllocationRebalanceThreshold: Int, + val rebalanceInterval: FiniteDuration, + val retryInterval: FiniteDuration, + val shardFailureBackoff: FiniteDuration, + val shardStartTimeout: FiniteDuration, + val snapshotAfter: Int, + val updatingStateTimeout: FiniteDuration, + val waitingForStateTimeout: FiniteDuration) { + + def this(untyped: UntypedShardingSettings.TuningParameters) { + this( + bufferSize = untyped.bufferSize, + coordinatorFailureBackoff = untyped.coordinatorFailureBackoff, + retryInterval = untyped.retryInterval, + handOffTimeout = untyped.handOffTimeout, + shardStartTimeout = untyped.shardStartTimeout, + shardFailureBackoff = untyped.shardFailureBackoff, + entityRestartBackoff = untyped.entityRestartBackoff, + rebalanceInterval = untyped.rebalanceInterval, + snapshotAfter = untyped.snapshotAfter, + keepNrOfBatches = untyped.keepNrOfBatches, + leastShardAllocationRebalanceThreshold = untyped.leastShardAllocationRebalanceThreshold, // TODO extract it a bit + leastShardAllocationMaxSimultaneousRebalance = untyped.leastShardAllocationMaxSimultaneousRebalance, + waitingForStateTimeout = untyped.waitingForStateTimeout, + updatingStateTimeout = untyped.updatingStateTimeout, + entityRecoveryStrategy = untyped.entityRecoveryStrategy, + entityRecoveryConstantRateStrategyFrequency = untyped.entityRecoveryConstantRateStrategyFrequency, + entityRecoveryConstantRateStrategyNumberOfEntities = untyped.entityRecoveryConstantRateStrategyNumberOfEntities + ) + + } + + require( + entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant", + s"Unknown 'entity-recovery-strategy' [$entityRecoveryStrategy], valid values are 'all' or 'constant'") + + def withBufferSize(value: Int): TuningParameters = copy(bufferSize = value) + def withCoordinatorFailureBackoff(value: FiniteDuration): TuningParameters = copy(coordinatorFailureBackoff = value) + def withEntityRecoveryConstantRateStrategyFrequency(value: FiniteDuration): TuningParameters = copy(entityRecoveryConstantRateStrategyFrequency = value) + def withEntityRecoveryConstantRateStrategyNumberOfEntities(value: Int): TuningParameters = copy(entityRecoveryConstantRateStrategyNumberOfEntities = value) + def withEntityRecoveryStrategy(value: java.lang.String): TuningParameters = copy(entityRecoveryStrategy = value) + def withEntityRestartBackoff(value: FiniteDuration): TuningParameters = copy(entityRestartBackoff = value) + def withHandOffTimeout(value: FiniteDuration): TuningParameters = copy(handOffTimeout = value) + def withKeepNrOfBatches(value: Int): TuningParameters = copy(keepNrOfBatches = value) + def withLeastShardAllocationMaxSimultaneousRebalance(value: Int): TuningParameters = copy(leastShardAllocationMaxSimultaneousRebalance = value) + def withLeastShardAllocationRebalanceThreshold(value: Int): TuningParameters = copy(leastShardAllocationRebalanceThreshold = value) + def withRebalanceInterval(value: FiniteDuration): TuningParameters = copy(rebalanceInterval = value) + def withRetryInterval(value: FiniteDuration): TuningParameters = copy(retryInterval = value) + def withShardFailureBackoff(value: FiniteDuration): TuningParameters = copy(shardFailureBackoff = value) + def withShardStartTimeout(value: FiniteDuration): TuningParameters = copy(shardStartTimeout = value) + def withSnapshotAfter(value: Int): TuningParameters = copy(snapshotAfter = value) + def withUpdatingStateTimeout(value: FiniteDuration): TuningParameters = copy(updatingStateTimeout = value) + def withWaitingForStateTimeout(value: FiniteDuration): TuningParameters = copy(waitingForStateTimeout = value) + + private def copy( + bufferSize: Int = bufferSize, + coordinatorFailureBackoff: FiniteDuration = coordinatorFailureBackoff, + entityRecoveryConstantRateStrategyFrequency: FiniteDuration = entityRecoveryConstantRateStrategyFrequency, + entityRecoveryConstantRateStrategyNumberOfEntities: Int = entityRecoveryConstantRateStrategyNumberOfEntities, + entityRecoveryStrategy: java.lang.String = entityRecoveryStrategy, + entityRestartBackoff: FiniteDuration = entityRestartBackoff, + handOffTimeout: FiniteDuration = handOffTimeout, + keepNrOfBatches: Int = keepNrOfBatches, + leastShardAllocationMaxSimultaneousRebalance: Int = leastShardAllocationMaxSimultaneousRebalance, + leastShardAllocationRebalanceThreshold: Int = leastShardAllocationRebalanceThreshold, + rebalanceInterval: FiniteDuration = rebalanceInterval, + retryInterval: FiniteDuration = retryInterval, + shardFailureBackoff: FiniteDuration = shardFailureBackoff, + shardStartTimeout: FiniteDuration = shardStartTimeout, + snapshotAfter: Int = snapshotAfter, + updatingStateTimeout: FiniteDuration = updatingStateTimeout, + waitingForStateTimeout: FiniteDuration = waitingForStateTimeout): TuningParameters = new TuningParameters( + bufferSize = bufferSize, + coordinatorFailureBackoff = coordinatorFailureBackoff, + entityRecoveryConstantRateStrategyFrequency = entityRecoveryConstantRateStrategyFrequency, + entityRecoveryConstantRateStrategyNumberOfEntities = entityRecoveryConstantRateStrategyNumberOfEntities, + entityRecoveryStrategy = entityRecoveryStrategy, + entityRestartBackoff = entityRestartBackoff, + handOffTimeout = handOffTimeout, + keepNrOfBatches = keepNrOfBatches, + leastShardAllocationMaxSimultaneousRebalance = leastShardAllocationMaxSimultaneousRebalance, + leastShardAllocationRebalanceThreshold = leastShardAllocationRebalanceThreshold, + rebalanceInterval = rebalanceInterval, + retryInterval = retryInterval, + shardFailureBackoff = shardFailureBackoff, + shardStartTimeout = shardStartTimeout, + snapshotAfter = snapshotAfter, + updatingStateTimeout = updatingStateTimeout, + waitingForStateTimeout = waitingForStateTimeout) + + override def toString = + s"""TuningParameters(${bufferSize},${coordinatorFailureBackoff},${entityRecoveryConstantRateStrategyFrequency},${entityRecoveryConstantRateStrategyNumberOfEntities},${entityRecoveryStrategy},${entityRestartBackoff},${handOffTimeout},${keepNrOfBatches},${leastShardAllocationMaxSimultaneousRebalance},${leastShardAllocationRebalanceThreshold},${rebalanceInterval},${retryInterval},${shardFailureBackoff},${shardStartTimeout},${snapshotAfter},${updatingStateTimeout},${waitingForStateTimeout})""" + } +} + +/** + * @param role specifies that this entity type requires cluster nodes with a specific role. + * If the role is not specified all nodes in the cluster are used. + * @param rememberEntities true if active entity actors shall be automatically restarted upon `Shard` + * restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash. + * @param journalPluginId Absolute path to the journal plugin configuration entity that is to + * be used for the internal persistence of ClusterSharding. If not defined the default + * journal plugin is used. Note that this is not related to persistence used by the entity + * actors. + * @param snapshotPluginId Absolute path to the snapshot plugin configuration entity that is to + * be used for the internal persistence of ClusterSharding. If not defined the default + * snapshot plugin is used. Note that this is not related to persistence used by the entity + * actors. + * @param tuningParameters additional tuning parameters, see descriptions in reference.conf + */ +final class ClusterShardingSettings( + val role: Option[String], + val rememberEntities: Boolean, + val journalPluginId: String, + val snapshotPluginId: String, + val stateStoreMode: ClusterShardingSettings.StateStoreMode, + val tuningParameters: ClusterShardingSettings.TuningParameters, + val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded { + + import akka.typed.cluster.sharding.ClusterShardingSettings.{ StateStoreModeDData, StateStoreModePersistence } + require( + stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData, + s"Unknown 'state-store-mode' [$stateStoreMode], " + + s"valid values are '${StateStoreModeDData.name}' or '${StateStoreModePersistence.name}'") + + /** If true, this node should run the shard region, otherwise just a shard proxy should started on this node. */ + @InternalApi + private[akka] def shouldHostShard(cluster: Cluster): Boolean = + role.isEmpty || cluster.selfMember.roles(role.get) + + def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role)) + + def withRole(role: Option[String]): ClusterShardingSettings = copy(role = role) + + def withRememberEntities(rememberEntities: Boolean): ClusterShardingSettings = + copy(rememberEntities = rememberEntities) + + def withJournalPluginId(journalPluginId: String): ClusterShardingSettings = + copy(journalPluginId = journalPluginId) + + def withSnapshotPluginId(snapshotPluginId: String): ClusterShardingSettings = + copy(snapshotPluginId = snapshotPluginId) + + def withTuningParameters(tuningParameters: ClusterShardingSettings.TuningParameters): ClusterShardingSettings = + copy(tuningParameters = tuningParameters) + + def withStateStoreMode(stateStoreMode: ClusterShardingSettings.StateStoreMode): ClusterShardingSettings = + copy(stateStoreMode = stateStoreMode) + + /** + * The `role` of the `ClusterSingletonManagerSettings` is not used. The `role` of the + * coordinator singleton will be the same as the `role` of `ClusterShardingSettings`. + */ + def withCoordinatorSingletonSettings(coordinatorSingletonSettings: ClusterSingletonManagerSettings): ClusterShardingSettings = + copy(coordinatorSingletonSettings = coordinatorSingletonSettings) + + private def copy( + role: Option[String] = role, + rememberEntities: Boolean = rememberEntities, + journalPluginId: String = journalPluginId, + snapshotPluginId: String = snapshotPluginId, + stateStoreMode: ClusterShardingSettings.StateStoreMode = stateStoreMode, + tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters, + coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings = + new ClusterShardingSettings( + role, + rememberEntities, + journalPluginId, + snapshotPluginId, + stateStoreMode, + tuningParameters, + coordinatorSingletonSettings) +} diff --git a/akka-typed/src/main/scala/akka/typed/cluster/sharding/EntityRef.scala b/akka-typed/src/main/scala/akka/typed/cluster/sharding/EntityRef.scala new file mode 100644 index 0000000000..68df245919 --- /dev/null +++ b/akka-typed/src/main/scala/akka/typed/cluster/sharding/EntityRef.scala @@ -0,0 +1,129 @@ +/* + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.typed.cluster.sharding + +import akka.actor.{ InternalActorRef, Scheduler } +import akka.annotation.InternalApi +import akka.pattern.{ AskTimeoutException, PromiseActorRef } +import akka.typed.ActorRef +import akka.typed.scaladsl.AskPattern +import akka.typed.scaladsl.AskPattern.PromiseRef +import akka.util.Timeout + +import scala.concurrent.Future + +/** + * A reference to an sharded Entity, which allows `ActorRef`-like usage. + * + * An [[EntityRef]] is NOT an [[ActorRef]]–by design–in order to be explicit about the fact that the life-cycle + * of a sharded Entity is very different than a plain Actors. Most notably, this is shown by features of Entities + * such as re-balancing (an active Entity to a different node) or passivation. Both of which are aimed to be completely + * transparent to users of such Entity. In other words, if this were to be a plain ActorRef, it would be possible to + * apply DeathWatch to it, which in turn would then trigger when the sharded Actor stopped, breaking the illusion that + * Entity refs are "always there". Please note that while not encouraged, it is possible to expose an Actor's `self` + * [[ActorRef]] and watch it in case such notification is desired. + */ +trait EntityRef[A] { + + /** + * Send a message to the entity referenced by this EntityRef using *at-most-once* + * messaging semantics. + */ + def tell(msg: A): Unit + + /** + * Allows to "ask" the [[EntityRef]] for a reply. + * See [[akka.typed.scaladsl.AskPattern]] for a complete write-up of this pattern + * + * Example usage: + * {{{ + * case class Request(msg: String, replyTo: ActorRef[Reply]) + * case class Reply(msg: String) + * + * implicit val timeout = Timeout(3.seconds) + * val target: EntityRef[Request] = ... + * val f: Future[Reply] = target ? (Request("hello", _)) + * }}} + * + * Please note that an implicit [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern. + */ + def ask[U](f: ActorRef[U] ⇒ A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] + +} + +object EntityRef { + implicit final class EntityRefOps[A](val ref: EntityRef[A]) extends AnyVal { + /** + * Send a message to the Actor referenced by this ActorRef using *at-most-once* + * messaging semantics. + */ + def !(msg: A): Unit = ref.tell(msg) + + /** + * Allows to "ask" the [[EntityRef]] for a reply. + * See [[akka.typed.scaladsl.AskPattern]] for a complete write-up of this pattern + * + * Example usage: + * {{{ + * case class Request(msg: String, replyTo: ActorRef[Reply]) + * case class Reply(msg: String) + * + * implicit val timeout = Timeout(3.seconds) + * val target: EntityRef[Request] = ... + * val f: Future[Reply] = target ? (Request("hello", _)) + * }}} + * + * Please note that an implicit [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern. + */ + def ?[U](f: ActorRef[U] ⇒ A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = + ref.ask(f)(timeout, scheduler) + } +} + +@InternalApi +private[akka] final class AdaptedEntityRefImpl[A](shardRegion: akka.actor.ActorRef, entityId: String) extends EntityRef[A] { + import akka.pattern.ask + + override def tell(msg: A): Unit = + shardRegion ! ShardingEnvelope(entityId, msg) + + override def ask[U](f: (ActorRef[U]) ⇒ A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = { + import akka.typed._ + val p = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout) + val m = f(p.ref) + if (p.promiseRef ne null) p.promiseRef.messageClassName = m.getClass.getName + shardRegion ! ShardingEnvelope(entityId, m) + p.future + } + + /** Similar to [[akka.typed.scaladsl.AskPattern.PromiseRef]] but for an [[EntityRef]] target. */ + @InternalApi + private final class EntityPromiseRef[U](untyped: InternalActorRef, timeout: Timeout) { + import akka.typed.internal.{ adapter ⇒ adapt } + + // Note: _promiseRef mustn't have a type pattern, since it can be null + private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) = + if (untyped.isTerminated) + ( + adapt.ActorRefAdapter[U](untyped.provider.deadLetters), + Future.failed[U](new AskTimeoutException(s"Recipient[$untyped] had already been terminated.")), + null) + else if (timeout.duration.length <= 0) + ( + adapt.ActorRefAdapter[U](untyped.provider.deadLetters), + Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$untyped]")), + null + ) + else { + val a = PromiseActorRef(untyped.provider, timeout, untyped, "unknown") + val b = adapt.ActorRefAdapter[U](a) + (b, a.result.future.asInstanceOf[Future[U]], a) + } + + val ref: ActorRef[U] = _ref + val future: Future[U] = _future + val promiseRef: PromiseActorRef = _promiseRef + } + +} diff --git a/akka-typed/src/main/scala/akka/typed/scaladsl/AskPattern.scala b/akka-typed/src/main/scala/akka/typed/scaladsl/AskPattern.scala index df49e223b2..2a11ce5365 100644 --- a/akka-typed/src/main/scala/akka/typed/scaladsl/AskPattern.scala +++ b/akka-typed/src/main/scala/akka/typed/scaladsl/AskPattern.scala @@ -12,11 +12,14 @@ import akka.actor.Scheduler import akka.typed.internal.FunctionRef import akka.actor.RootActorPath import akka.actor.Address +import akka.annotation.InternalApi import akka.typed.ActorRef import akka.typed.internal.{ adapter ⇒ adapt } /** * The ask-pattern implements the initiator side of a request–reply protocol. + * The `?` operator is pronounced as "ask". + * * The party that asks may be within or without an Actor, since the * implementation will fabricate a (hidden) [[ActorRef]] that is bound to a * [[scala.concurrent.Promise]]. This ActorRef will need to be injected in the @@ -36,6 +39,27 @@ import akka.typed.internal.{ adapter ⇒ adapt } */ object AskPattern { implicit class Askable[T](val ref: ActorRef[T]) extends AnyVal { + /** + * The ask-pattern implements the initiator side of a request–reply protocol. + * The `?` operator is pronounced as "ask". + * + * The party that asks may be within or without an Actor, since the + * implementation will fabricate a (hidden) [[ActorRef]] that is bound to a + * [[scala.concurrent.Promise]]. This ActorRef will need to be injected in the + * message that is sent to the target Actor in order to function as a reply-to + * address, therefore the argument to the ask / `?` + * operator is not the message itself but a function that given the reply-to + * address will create the message. + * + * {{{ + * case class Request(msg: String, replyTo: ActorRef[Reply]) + * case class Reply(msg: String) + * + * implicit val timeout = Timeout(3.seconds) + * val target: ActorRef[Request] = ... + * val f: Future[Reply] = target ? (Request("hello", _)) + * }}} + */ def ?[U](f: ActorRef[U] ⇒ T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = ref match { case a: adapt.ActorRefAdapter[_] ⇒ askUntyped(ref, a.untyped, timeout, f) @@ -44,7 +68,7 @@ object AskPattern { } } - private class PromiseRef[U](target: ActorRef[_], untyped: InternalActorRef, timeout: Timeout) { + private final class PromiseRef[U](target: ActorRef[_], untyped: InternalActorRef, timeout: Timeout) { // Note: _promiseRef mustn't have a type pattern, since it can be null private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) =