diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterSharding.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterSharding.scala index 8d32e45cbc..ed7c5098e9 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterSharding.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterSharding.scala @@ -3,17 +3,27 @@ */ package akka.cluster.sharding.typed -import akka.annotation.{ DoNotInherit, InternalApi } -import akka.cluster.sharding.ShardCoordinator.{ LeastShardAllocationStrategy, ShardAllocationStrategy } -import akka.cluster.sharding.{ ClusterSharding ⇒ UntypedClusterSharding, ShardRegion ⇒ UntypedShardRegion } -import akka.cluster.typed.Cluster -import akka.actor.typed.internal.adapter.{ ActorRefAdapter, ActorSystemAdapter } -import akka.actor.typed.scaladsl.adapter.PropsAdapter -import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props } - import scala.language.implicitConversions import scala.reflect.ClassTag + import akka.actor.typed.Behavior.UntypedBehavior +import akka.actor.typed.internal.adapter.ActorRefAdapter +import akka.actor.typed.internal.adapter.ActorSystemAdapter +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.Extension +import akka.actor.typed.ExtensionId +import akka.actor.typed.Props +import akka.annotation.DoNotInherit +import akka.annotation.InternalApi +import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy +import akka.cluster.sharding.{ ClusterSharding ⇒ UntypedClusterSharding } +import akka.cluster.sharding.{ ShardRegion ⇒ UntypedShardRegion } +import akka.cluster.typed.Cluster +import akka.event.Logging +import akka.event.LoggingAdapter /** * Default envelope type that may be used with Cluster Sharding. @@ -23,7 +33,7 @@ import akka.actor.typed.Behavior.UntypedBehavior * but a convenient way to send envelope-wrapped messages via cluster sharding. * * The alternative way of routing messages through sharding is to not use envelopes, - * and have the message types themselfs carry identifiers. + * and have the message types themselves carry identifiers. */ final case class ShardingEnvelope[A](entityId: String, message: A) // TODO think if should remain a case class @@ -57,25 +67,25 @@ object ShardingMessageExtractor { * * This is recommended since it does not force details about sharding into the entity protocol */ - def apply[A](maxNumberOfShards: Int): ShardingMessageExtractor[ShardingEnvelope[A], A] = - new HashCodeMessageExtractor[A](maxNumberOfShards) + def apply[A](maxNumberOfShards: Int, handOffStopMessage: A): ShardingMessageExtractor[ShardingEnvelope[A], A] = + new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) /** - * Create a message extractor for a protocol where the entity id is available in each message. + * Scala API: Create a message extractor for a protocol where the entity id is available in each message. */ def noEnvelope[A]( - maxNumberOfShards: Int, - extractEntityId: A ⇒ String): ShardingMessageExtractor[A, A] = - new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards) { - // TODO catch MatchError here and return null for those to yield an "unhandled" when partial functions are used? + maxNumberOfShards: Int, + handOffStopMessage: A)( + extractEntityId: A ⇒ String): ShardingMessageExtractor[A, A] = + new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) { def entityId(message: A) = extractEntityId(message) } } /** - * Entirely customizable typed message extractor. Prefer [[HashCodeMessageExtractor]] or [[HashCodeNoEnvelopeMessageExtractor]] - * if possible. + * Entirely customizable typed message extractor. Prefer [[HashCodeMessageExtractor]] or + * [[HashCodeNoEnvelopeMessageExtractor]]if possible. * * @tparam E Possibly an Envelope around the messages accepted by the entity actor, is the same as `A` if there is no * envelope. @@ -89,6 +99,12 @@ trait ShardingMessageExtractor[E, A] { */ def entityId(message: E): String + /** + * Extract the entity id from an incoming `message`. Only messages that passed the [[entityId]] + * function will be used as input to this function. + */ + def shardId(message: E): String + /** * Extract the message to send to the entity from an incoming `message`. * Note that the extracted message does not have to be the same as the incoming @@ -98,18 +114,16 @@ trait ShardingMessageExtractor[E, A] { * If the returned value is `null`, and the entity isn't running yet the entity will be started * but no message will be delivered to it. */ - def entityMessage(message: E): A // TODO "unwrapMessage" is how I'd call it? + def unwrapMessage(message: E): A /** - * Extract the entity id from an incoming `message`. Only messages that passed the [[entityId]] - * function will be used as input to this function. + * Message sent to an entity to tell it to stop, e.g. when rebalanced. + * The message defined here is not passed to `entityId`, `shardId` or `unwrapMessage`. */ - def shardId(message: E): String + def handOffStopMessage: A } /** - * Java API: - * * Default message extractor type, using envelopes to identify what entity a message is for * and the hashcode of the entityId to decide which shard an entity belongs to. * @@ -117,15 +131,17 @@ trait ShardingMessageExtractor[E, A] { * * @tparam A The type of message accepted by the entity actor */ -final class HashCodeMessageExtractor[A](maxNumberOfShards: Int) extends ShardingMessageExtractor[ShardingEnvelope[A], A] { +final class HashCodeMessageExtractor[A]( + val maxNumberOfShards: Int, + override val handOffStopMessage: A) + extends ShardingMessageExtractor[ShardingEnvelope[A], A] { + def entityId(envelope: ShardingEnvelope[A]): String = envelope.entityId - def entityMessage(envelope: ShardingEnvelope[A]): A = envelope.message + def unwrapMessage(envelope: ShardingEnvelope[A]): A = envelope.message def shardId(envelope: ShardingEnvelope[A]): String = (math.abs(envelope.entityId.hashCode) % maxNumberOfShards).toString } /** - * Java API: - * * Default message extractor type, using a property of the message to identify what entity a message is for * and the hashcode of the entityId to decide which shard an entity belongs to. * @@ -133,8 +149,11 @@ final class HashCodeMessageExtractor[A](maxNumberOfShards: Int) extends Sharding * * @tparam A The type of message accepted by the entity actor */ -abstract class HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards: Int) extends ShardingMessageExtractor[A, A] { - final def entityMessage(message: A): A = message +abstract class HashCodeNoEnvelopeMessageExtractor[A]( + val maxNumberOfShards: Int, + override val handOffStopMessage: A) + extends ShardingMessageExtractor[A, A] { + final def unwrapMessage(message: A): A = message def shardId(message: A): String = { val id = entityId(message) if (id != null) (math.abs(id.hashCode) % maxNumberOfShards).toString @@ -146,8 +165,10 @@ abstract class HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards: Int) ext /** * The key of an entity type, the `name` must be unique. + * + * Not for user extension. */ -abstract class EntityTypeKey[T] { +@DoNotInherit abstract class EntityTypeKey[T] { def name: String } @@ -195,6 +216,7 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh private val cluster = Cluster(system) private val untypedSystem = system.toUntyped private val untypedSharding = akka.cluster.sharding.ClusterSharding(untypedSystem) + private val log: LoggingAdapter = Logging(untypedSystem, classOf[ClusterSharding]) override def spawn[A]( behavior: Behavior[A], @@ -203,18 +225,17 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh settings: ClusterShardingSettings, maxNumberOfShards: Int, handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = { - val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards) - spawn(behavior, entityProps, typeKey, settings, extractor, defaultShardAllocationStrategy(settings), handOffStopMessage) + val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) + spawn(behavior, entityProps, typeKey, settings, extractor, defaultShardAllocationStrategy(settings)) } override def spawn[E, A]( - behavior: Behavior[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - messageExtractor: ShardingMessageExtractor[E, A], - handOffStopMessage: A): ActorRef[E] = - spawn(behavior, entityProps, typeKey, settings, messageExtractor, defaultShardAllocationStrategy(settings), handOffStopMessage) + behavior: Behavior[A], + entityProps: Props, + typeKey: EntityTypeKey[A], + settings: ClusterShardingSettings, + messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E] = + spawn(behavior, entityProps, typeKey, settings, messageExtractor, defaultShardAllocationStrategy(settings)) override def spawn[E, A]( behavior: Behavior[A], @@ -222,14 +243,13 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh typeKey: EntityTypeKey[A], settings: ClusterShardingSettings, extractor: ShardingMessageExtractor[E, A], - allocationStrategy: ShardAllocationStrategy, - handOffStopMessage: A): ActorRef[E] = { + allocationStrategy: ShardAllocationStrategy): ActorRef[E] = { val untypedSettings = ClusterShardingSettings.toUntypedSettings(settings) val ref = if (settings.shouldHostShard(cluster)) { - system.log.info("Starting Shard Region [{}]...") + log.info("Starting Shard Region [{}]...", typeKey.name) val untypedProps = behavior match { case u: UntypedBehavior[_] ⇒ u.untypedProps // PersistentBehavior @@ -242,7 +262,7 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh untypedSettings, extractor, extractor, defaultShardAllocationStrategy(settings), - handOffStopMessage) + extractor.handOffStopMessage) } else { system.log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...") @@ -274,7 +294,7 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh case msg: E @unchecked if extractor.entityId(msg) ne null ⇒ // we're evaluating entityId twice, I wonder if we could do it just once (same was in old sharding's Java DSL) - (extractor.entityId(msg), extractor.entityMessage(msg)) + (extractor.entityId(msg), extractor.unwrapMessage(msg)) } @InternalApi private implicit def convertExtractShardId[E, A](extractor: ShardingMessageExtractor[E, A]): UntypedShardRegion.ExtractShardId = { @@ -295,11 +315,9 @@ sealed abstract class ClusterSharding extends Extension { * * @param behavior The behavior for entities * @param typeKey A key that uniquely identifies the type of entity in this cluster - * @param handOffStopMessage Message sent to an entity to tell it to stop + * @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced. * @tparam A The type of command the entity accepts */ - // TODO: FYI, I think it would be very good to have rule that "behavior, otherstuff" - // TODO: or "behavior, props, otherstuff" be the consistent style we want to promote in parameter ordering, WDYT? def spawn[A]( behavior: Behavior[A], props: Props, @@ -314,8 +332,8 @@ sealed abstract class ClusterSharding extends Extension { * @param behavior The behavior for entities * @param typeKey A key that uniquely identifies the type of entity in this cluster * @param entityProps Props to apply when starting an entity + * @param messageExtractor Extract entityId, shardId, and unwrap messages. * @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards - * @param handOffStopMessage Message sent to an entity to tell it to stop * @tparam E A possible envelope around the message the entity accepts * @tparam A The type of command the entity accepts */ @@ -325,8 +343,7 @@ sealed abstract class ClusterSharding extends Extension { typeKey: EntityTypeKey[A], settings: ClusterShardingSettings, messageExtractor: ShardingMessageExtractor[E, A], - allocationStrategy: ShardAllocationStrategy, - handOffStopMessage: A): ActorRef[E] + allocationStrategy: ShardAllocationStrategy): ActorRef[E] /** * Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role. @@ -334,17 +351,16 @@ sealed abstract class ClusterSharding extends Extension { * @param behavior The behavior for entities * @param typeKey A key that uniquely identifies the type of entity in this cluster * @param entityProps Props to apply when starting an entity - * @param handOffStopMessage Message sent to an entity to tell it to stop + * @param messageExtractor Extract entityId, shardId, and unwrap messages. * @tparam E A possible envelope around the message the entity accepts * @tparam A The type of command the entity accepts */ def spawn[E, A]( - behavior: Behavior[A], - entityProps: Props, - typeKey: EntityTypeKey[A], - settings: ClusterShardingSettings, - messageExtractor: ShardingMessageExtractor[E, A], - handOffStopMessage: A): ActorRef[E] + behavior: Behavior[A], + entityProps: Props, + typeKey: EntityTypeKey[A], + settings: ClusterShardingSettings, + messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E] /** * Create an `ActorRef`-like reference to a specific sharded entity. diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingSpec.scala index 436b8eaf97..a4f52d8be3 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/ClusterShardingSpec.scala @@ -6,19 +6,25 @@ package akka.cluster.sharding.typed import java.nio.charset.StandardCharsets +import scala.concurrent.duration._ + import akka.actor.ExtendedActorSystem -import akka.actor.typed.{ ActorRef, ActorRefResolver, Props, TypedAkkaSpecWithShutdown } import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorRefResolver +import akka.actor.typed.ActorSystem +import akka.actor.typed.Props +import akka.actor.typed.TypedAkkaSpecWithShutdown import akka.cluster.MemberStatus -import akka.cluster.typed.{ Cluster, Join } +import akka.cluster.typed.Cluster +import akka.cluster.typed.Join +import akka.cluster.typed.Leave import akka.serialization.SerializerWithStringManifest import akka.testkit.typed.TestKit import akka.testkit.typed.scaladsl.TestProbe import com.typesafe.config.ConfigFactory -import org.scalatest.concurrent.{ Eventually, ScalaFutures } - -import scala.concurrent.duration._ +import org.scalatest.time.Span object ClusterShardingSpec { val config = ConfigFactory.parseString( @@ -55,12 +61,10 @@ object ClusterShardingSpec { final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol final case class StopPlz() extends TestProtocol - sealed trait IdTestProtocol extends java.io.Serializable { - def id: String - } + sealed trait IdTestProtocol extends java.io.Serializable final case class IdReplyPlz(id: String, toMe: ActorRef[String]) extends IdTestProtocol final case class IdWhoAreYou(id: String, replyTo: ActorRef[String]) extends IdTestProtocol - final case class IdStopPlz(id: String) extends IdTestProtocol + final case class IdStopPlz() extends IdTestProtocol class Serializer(system: ExtendedActorSystem) extends SerializerWithStringManifest { def identifier: Int = 48 @@ -89,7 +93,7 @@ object ClusterShardingSpec { case _: StopPlz ⇒ Array.emptyByteArray case IdReplyPlz(id, ref) ⇒ idAndRefToBinary(id, ref) case IdWhoAreYou(id, ref) ⇒ idAndRefToBinary(id, ref) - case IdStopPlz(id) ⇒ id.getBytes(StandardCharsets.UTF_8) + case _: IdStopPlz ⇒ Array.emptyByteArray } private def actorRefFromBinary[T](bytes: Array[Byte]): ActorRef[T] = @@ -108,29 +112,22 @@ object ClusterShardingSpec { case "c" ⇒ StopPlz() case "A" ⇒ IdReplyPlz.tupled(idAndRefFromBinary(bytes)) case "B" ⇒ IdWhoAreYou.tupled(idAndRefFromBinary(bytes)) - case "C" ⇒ IdStopPlz(new String(bytes, StandardCharsets.UTF_8)) + case "C" ⇒ IdStopPlz() } } } -class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterShardingSpec.config) - with TypedAkkaSpecWithShutdown with ScalaFutures with Eventually { - +class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterShardingSpec.config) with TypedAkkaSpecWithShutdown { import ClusterShardingSpec._ - import akka.actor.typed.scaladsl.adapter._ - implicit val s = system val sharding = ClusterSharding(system) - implicit val untypedSystem = system.toUntyped - - val untypedSystem2 = akka.actor.ActorSystem(system.name, system.settings.config) - val system2 = untypedSystem2.toTyped + val system2 = ActorSystem(Behaviors.ignore[Any], name = system.name, config = system.settings.config) val sharding2 = ClusterSharding(system2) override def afterAll(): Unit = { - system2.terminate().futureValue + TestKit.shutdown(system2, 5.seconds) super.afterAll() } @@ -140,7 +137,8 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding Behaviors.stopped case (ctx, WhoAreYou(replyTo)) ⇒ - replyTo ! s"I'm ${ctx.self.path.name}" + val address = Cluster(ctx.system).selfMember.address + replyTo ! s"I'm ${ctx.self.path.name} at ${address.host.get}:${address.port.get}" Behaviors.same case (_, ReplyPlz(toMe)) ⇒ @@ -150,11 +148,12 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard") val behaviorWithId = Behaviors.immutable[IdTestProtocol] { - case (_, IdStopPlz(_)) ⇒ + case (_, IdStopPlz()) ⇒ Behaviors.stopped case (ctx, IdWhoAreYou(_, replyTo)) ⇒ - replyTo ! s"I'm ${ctx.self.path.name}" + val address = Cluster(ctx.system).selfMember.address + replyTo ! s"I'm ${ctx.self.path.name} at ${address.host.get}:${address.port.get}" Behaviors.same case (_, IdReplyPlz(_, toMe)) ⇒ @@ -162,6 +161,44 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding Behaviors.same } + val shardingRef1 = sharding.spawn( + behavior, + Props.empty, + typeKey, + ClusterShardingSettings(system), + 10, + StopPlz()) + + val shardingRef2 = sharding2.spawn( + behavior, + Props.empty, + typeKey, + ClusterShardingSettings(system2), + 10, + StopPlz()) + + val shardingRef3 = sharding.spawn( + behaviorWithId, + Props.empty, + typeKey2, + ClusterShardingSettings(system), + ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { + case IdReplyPlz(id, _) ⇒ id + case IdWhoAreYou(id, _) ⇒ id + case other ⇒ throw new IllegalArgumentException(s"Unexpected message $other") + }) + + val shardingRef4 = sharding2.spawn( + behaviorWithId, + Props.empty, + typeKey2, + ClusterShardingSettings(system2), + ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) { + case IdReplyPlz(id, _) ⇒ id + case IdWhoAreYou(id, _) ⇒ id + case other ⇒ throw new IllegalArgumentException(s"Unexpected message $other") + }) + "Typed cluster sharding" must { "join cluster" in { @@ -180,50 +217,18 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding } "send messages via cluster sharding, using envelopes" in { - val ref = sharding.spawn( - behavior, - Props.empty, - typeKey, - ClusterShardingSettings(system), - 10, - StopPlz()) - sharding2.spawn( - behavior, - Props.empty, - typeKey, - ClusterShardingSettings(system2), - 10, - StopPlz()) - (1 to 10).foreach { n ⇒ val p = TestProbe[String]() - ref ! ShardingEnvelope(s"test$n", ReplyPlz(p.ref)) - p.expectMessage(3.seconds, "Hello!") - ref ! ShardingEnvelope(s"test$n", StopPlz()) + shardingRef1 ! ShardingEnvelope(s"test$n", ReplyPlz(p.ref)) + p.expectMessage("Hello!") } } "send messages via cluster sharding, without envelopes" in { - val ref = sharding.spawn( - behaviorWithId, - Props.empty, - typeKey2, - ClusterShardingSettings(system), - ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, _.id), - IdStopPlz("THE_ID_HERE")) - sharding2.spawn( - behaviorWithId, - Props.empty, - typeKey2, - ClusterShardingSettings(system2), - ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, _.id), - IdStopPlz("THE_ID_HERE")) - (1 to 10).foreach { n ⇒ val p = TestProbe[String]() - ref ! IdReplyPlz(s"test$n", p.ref) - p.expectMessage(3.seconds, "Hello!") - ref ! IdStopPlz(s"test$n") + shardingRef3 ! IdReplyPlz(s"test$n", p.ref) + p.expectMessage("Hello!") } } @@ -248,10 +253,10 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding val p = TestProbe[String]() charlieRef ! WhoAreYou(p.ref) - p.expectMessage(3.seconds, "I'm charlie") + p.expectMessageType[String] should startWith("I'm charlie") charlieRef tell WhoAreYou(p.ref) - p.expectMessage(3.seconds, "I'm charlie") + p.expectMessageType[String] should startWith("I'm charlie") charlieRef ! StopPlz() } @@ -261,12 +266,39 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding val charlieRef = sharding.entityRefFor(typeKey, "charlie") val reply1 = bobRef ? WhoAreYou // TODO document that WhoAreYou(_) would not work - reply1.futureValue should ===("I'm bob") + reply1.futureValue should startWith("I'm bob") val reply2 = charlieRef ask WhoAreYou - reply2.futureValue should ===("I'm charlie") + reply2.futureValue should startWith("I'm charlie") bobRef ! StopPlz() } + + "use the handOffStopMessage for leaving/rebalance" in { + var replies1 = Set.empty[String] + (1 to 10).foreach { n ⇒ + val p = TestProbe[String]() + shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(p.ref)) + replies1 += p.expectMessageType[String] + } + replies1.size should ===(10) + + Cluster(system2).manager ! Leave(Cluster(system2).selfMember.address) + + implicit val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, Span(100, org.scalatest.time.Millis)) + eventually { + // if it wouldn't receive the StopPlz and stop the leaving would take longer and this would fail + Cluster(system2).isTerminated should ===(true) + } + + var replies2 = Set.empty[String] + (1 to 10).foreach { n ⇒ + val p = TestProbe[String]() + shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(p.ref)) + replies2 += p.expectMessageType[String](10.seconds) + } + replies2.size should ===(10) + replies2 should !==(replies1) // different addresses + } } }