Api improvements of Sharding handOffMessage, #23686

* move definition of handOffMessage to the message extractor
* rename entityMessage to unwrapMessage
* update and cleanup test
This commit is contained in:
Patrik Nordwall 2018-02-01 06:56:58 +01:00
parent f1aa12daf2
commit 92de8375db
2 changed files with 170 additions and 122 deletions

View file

@ -3,17 +3,27 @@
*/
package akka.cluster.sharding.typed
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.cluster.sharding.ShardCoordinator.{ LeastShardAllocationStrategy, ShardAllocationStrategy }
import akka.cluster.sharding.{ ClusterSharding UntypedClusterSharding, ShardRegion UntypedShardRegion }
import akka.cluster.typed.Cluster
import akka.actor.typed.internal.adapter.{ ActorRefAdapter, ActorSystemAdapter }
import akka.actor.typed.scaladsl.adapter.PropsAdapter
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props }
import scala.language.implicitConversions
import scala.reflect.ClassTag
import akka.actor.typed.Behavior.UntypedBehavior
import akka.actor.typed.internal.adapter.ActorRefAdapter
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.actor.typed.Props
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.{ ClusterSharding UntypedClusterSharding }
import akka.cluster.sharding.{ ShardRegion UntypedShardRegion }
import akka.cluster.typed.Cluster
import akka.event.Logging
import akka.event.LoggingAdapter
/**
* Default envelope type that may be used with Cluster Sharding.
@ -23,7 +33,7 @@ import akka.actor.typed.Behavior.UntypedBehavior
* but a convenient way to send envelope-wrapped messages via cluster sharding.
*
* The alternative way of routing messages through sharding is to not use envelopes,
* and have the message types themselfs carry identifiers.
* and have the message types themselves carry identifiers.
*/
final case class ShardingEnvelope[A](entityId: String, message: A) // TODO think if should remain a case class
@ -57,25 +67,25 @@ object ShardingMessageExtractor {
*
* This is recommended since it does not force details about sharding into the entity protocol
*/
def apply[A](maxNumberOfShards: Int): ShardingMessageExtractor[ShardingEnvelope[A], A] =
new HashCodeMessageExtractor[A](maxNumberOfShards)
def apply[A](maxNumberOfShards: Int, handOffStopMessage: A): ShardingMessageExtractor[ShardingEnvelope[A], A] =
new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage)
/**
* Create a message extractor for a protocol where the entity id is available in each message.
* Scala API: Create a message extractor for a protocol where the entity id is available in each message.
*/
def noEnvelope[A](
maxNumberOfShards: Int,
extractEntityId: A String): ShardingMessageExtractor[A, A] =
new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards) {
// TODO catch MatchError here and return null for those to yield an "unhandled" when partial functions are used?
maxNumberOfShards: Int,
handOffStopMessage: A)(
extractEntityId: A String): ShardingMessageExtractor[A, A] =
new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards, handOffStopMessage) {
def entityId(message: A) = extractEntityId(message)
}
}
/**
* Entirely customizable typed message extractor. Prefer [[HashCodeMessageExtractor]] or [[HashCodeNoEnvelopeMessageExtractor]]
* if possible.
* Entirely customizable typed message extractor. Prefer [[HashCodeMessageExtractor]] or
* [[HashCodeNoEnvelopeMessageExtractor]]if possible.
*
* @tparam E Possibly an Envelope around the messages accepted by the entity actor, is the same as `A` if there is no
* envelope.
@ -89,6 +99,12 @@ trait ShardingMessageExtractor[E, A] {
*/
def entityId(message: E): String
/**
* Extract the entity id from an incoming `message`. Only messages that passed the [[entityId]]
* function will be used as input to this function.
*/
def shardId(message: E): String
/**
* Extract the message to send to the entity from an incoming `message`.
* Note that the extracted message does not have to be the same as the incoming
@ -98,18 +114,16 @@ trait ShardingMessageExtractor[E, A] {
* If the returned value is `null`, and the entity isn't running yet the entity will be started
* but no message will be delivered to it.
*/
def entityMessage(message: E): A // TODO "unwrapMessage" is how I'd call it?
def unwrapMessage(message: E): A
/**
* Extract the entity id from an incoming `message`. Only messages that passed the [[entityId]]
* function will be used as input to this function.
* Message sent to an entity to tell it to stop, e.g. when rebalanced.
* The message defined here is not passed to `entityId`, `shardId` or `unwrapMessage`.
*/
def shardId(message: E): String
def handOffStopMessage: A
}
/**
* Java API:
*
* Default message extractor type, using envelopes to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
@ -117,15 +131,17 @@ trait ShardingMessageExtractor[E, A] {
*
* @tparam A The type of message accepted by the entity actor
*/
final class HashCodeMessageExtractor[A](maxNumberOfShards: Int) extends ShardingMessageExtractor[ShardingEnvelope[A], A] {
final class HashCodeMessageExtractor[A](
val maxNumberOfShards: Int,
override val handOffStopMessage: A)
extends ShardingMessageExtractor[ShardingEnvelope[A], A] {
def entityId(envelope: ShardingEnvelope[A]): String = envelope.entityId
def entityMessage(envelope: ShardingEnvelope[A]): A = envelope.message
def unwrapMessage(envelope: ShardingEnvelope[A]): A = envelope.message
def shardId(envelope: ShardingEnvelope[A]): String = (math.abs(envelope.entityId.hashCode) % maxNumberOfShards).toString
}
/**
* Java API:
*
* Default message extractor type, using a property of the message to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
@ -133,8 +149,11 @@ final class HashCodeMessageExtractor[A](maxNumberOfShards: Int) extends Sharding
*
* @tparam A The type of message accepted by the entity actor
*/
abstract class HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards: Int) extends ShardingMessageExtractor[A, A] {
final def entityMessage(message: A): A = message
abstract class HashCodeNoEnvelopeMessageExtractor[A](
val maxNumberOfShards: Int,
override val handOffStopMessage: A)
extends ShardingMessageExtractor[A, A] {
final def unwrapMessage(message: A): A = message
def shardId(message: A): String = {
val id = entityId(message)
if (id != null) (math.abs(id.hashCode) % maxNumberOfShards).toString
@ -146,8 +165,10 @@ abstract class HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards: Int) ext
/**
* The key of an entity type, the `name` must be unique.
*
* Not for user extension.
*/
abstract class EntityTypeKey[T] {
@DoNotInherit abstract class EntityTypeKey[T] {
def name: String
}
@ -195,6 +216,7 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
private val cluster = Cluster(system)
private val untypedSystem = system.toUntyped
private val untypedSharding = akka.cluster.sharding.ClusterSharding(untypedSystem)
private val log: LoggingAdapter = Logging(untypedSystem, classOf[ClusterSharding])
override def spawn[A](
behavior: Behavior[A],
@ -203,18 +225,17 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = {
val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards)
spawn(behavior, entityProps, typeKey, settings, extractor, defaultShardAllocationStrategy(settings), handOffStopMessage)
val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards, handOffStopMessage)
spawn(behavior, entityProps, typeKey, settings, extractor, defaultShardAllocationStrategy(settings))
}
override def spawn[E, A](
behavior: Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
handOffStopMessage: A): ActorRef[E] =
spawn(behavior, entityProps, typeKey, settings, messageExtractor, defaultShardAllocationStrategy(settings), handOffStopMessage)
behavior: Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E] =
spawn(behavior, entityProps, typeKey, settings, messageExtractor, defaultShardAllocationStrategy(settings))
override def spawn[E, A](
behavior: Behavior[A],
@ -222,14 +243,13 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
extractor: ShardingMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: A): ActorRef[E] = {
allocationStrategy: ShardAllocationStrategy): ActorRef[E] = {
val untypedSettings = ClusterShardingSettings.toUntypedSettings(settings)
val ref =
if (settings.shouldHostShard(cluster)) {
system.log.info("Starting Shard Region [{}]...")
log.info("Starting Shard Region [{}]...", typeKey.name)
val untypedProps = behavior match {
case u: UntypedBehavior[_] u.untypedProps // PersistentBehavior
@ -242,7 +262,7 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
untypedSettings,
extractor, extractor,
defaultShardAllocationStrategy(settings),
handOffStopMessage)
extractor.handOffStopMessage)
} else {
system.log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...")
@ -274,7 +294,7 @@ final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSh
case msg: E @unchecked if extractor.entityId(msg) ne null
// we're evaluating entityId twice, I wonder if we could do it just once (same was in old sharding's Java DSL)
(extractor.entityId(msg), extractor.entityMessage(msg))
(extractor.entityId(msg), extractor.unwrapMessage(msg))
}
@InternalApi
private implicit def convertExtractShardId[E, A](extractor: ShardingMessageExtractor[E, A]): UntypedShardRegion.ExtractShardId = {
@ -295,11 +315,9 @@ sealed abstract class ClusterSharding extends Extension {
*
* @param behavior The behavior for entities
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param handOffStopMessage Message sent to an entity to tell it to stop
* @param handOffStopMessage Message sent to an entity to tell it to stop, e.g. when rebalanced.
* @tparam A The type of command the entity accepts
*/
// TODO: FYI, I think it would be very good to have rule that "behavior, otherstuff"
// TODO: or "behavior, props, otherstuff" be the consistent style we want to promote in parameter ordering, WDYT?
def spawn[A](
behavior: Behavior[A],
props: Props,
@ -314,8 +332,8 @@ sealed abstract class ClusterSharding extends Extension {
* @param behavior The behavior for entities
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
* @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards
* @param handOffStopMessage Message sent to an entity to tell it to stop
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
@ -325,8 +343,7 @@ sealed abstract class ClusterSharding extends Extension {
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: A): ActorRef[E]
allocationStrategy: ShardAllocationStrategy): ActorRef[E]
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role.
@ -334,17 +351,16 @@ sealed abstract class ClusterSharding extends Extension {
* @param behavior The behavior for entities
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param handOffStopMessage Message sent to an entity to tell it to stop
* @param messageExtractor Extract entityId, shardId, and unwrap messages.
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawn[E, A](
behavior: Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
handOffStopMessage: A): ActorRef[E]
behavior: Behavior[A],
entityProps: Props,
typeKey: EntityTypeKey[A],
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A]): ActorRef[E]
/**
* Create an `ActorRef`-like reference to a specific sharded entity.

View file

@ -6,19 +6,25 @@ package akka.cluster.sharding.typed
import java.nio.charset.StandardCharsets
import scala.concurrent.duration._
import akka.actor.ExtendedActorSystem
import akka.actor.typed.{ ActorRef, ActorRefResolver, Props, TypedAkkaSpecWithShutdown }
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorRefResolver
import akka.actor.typed.ActorSystem
import akka.actor.typed.Props
import akka.actor.typed.TypedAkkaSpecWithShutdown
import akka.cluster.MemberStatus
import akka.cluster.typed.{ Cluster, Join }
import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.cluster.typed.Leave
import akka.serialization.SerializerWithStringManifest
import akka.testkit.typed.TestKit
import akka.testkit.typed.scaladsl.TestProbe
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import scala.concurrent.duration._
import org.scalatest.time.Span
object ClusterShardingSpec {
val config = ConfigFactory.parseString(
@ -55,12 +61,10 @@ object ClusterShardingSpec {
final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol
final case class StopPlz() extends TestProtocol
sealed trait IdTestProtocol extends java.io.Serializable {
def id: String
}
sealed trait IdTestProtocol extends java.io.Serializable
final case class IdReplyPlz(id: String, toMe: ActorRef[String]) extends IdTestProtocol
final case class IdWhoAreYou(id: String, replyTo: ActorRef[String]) extends IdTestProtocol
final case class IdStopPlz(id: String) extends IdTestProtocol
final case class IdStopPlz() extends IdTestProtocol
class Serializer(system: ExtendedActorSystem) extends SerializerWithStringManifest {
def identifier: Int = 48
@ -89,7 +93,7 @@ object ClusterShardingSpec {
case _: StopPlz Array.emptyByteArray
case IdReplyPlz(id, ref) idAndRefToBinary(id, ref)
case IdWhoAreYou(id, ref) idAndRefToBinary(id, ref)
case IdStopPlz(id) id.getBytes(StandardCharsets.UTF_8)
case _: IdStopPlz Array.emptyByteArray
}
private def actorRefFromBinary[T](bytes: Array[Byte]): ActorRef[T] =
@ -108,29 +112,22 @@ object ClusterShardingSpec {
case "c" StopPlz()
case "A" IdReplyPlz.tupled(idAndRefFromBinary(bytes))
case "B" IdWhoAreYou.tupled(idAndRefFromBinary(bytes))
case "C" IdStopPlz(new String(bytes, StandardCharsets.UTF_8))
case "C" IdStopPlz()
}
}
}
class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterShardingSpec.config)
with TypedAkkaSpecWithShutdown with ScalaFutures with Eventually {
class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterShardingSpec.config) with TypedAkkaSpecWithShutdown {
import ClusterShardingSpec._
import akka.actor.typed.scaladsl.adapter._
implicit val s = system
val sharding = ClusterSharding(system)
implicit val untypedSystem = system.toUntyped
val untypedSystem2 = akka.actor.ActorSystem(system.name, system.settings.config)
val system2 = untypedSystem2.toTyped
val system2 = ActorSystem(Behaviors.ignore[Any], name = system.name, config = system.settings.config)
val sharding2 = ClusterSharding(system2)
override def afterAll(): Unit = {
system2.terminate().futureValue
TestKit.shutdown(system2, 5.seconds)
super.afterAll()
}
@ -140,7 +137,8 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
Behaviors.stopped
case (ctx, WhoAreYou(replyTo))
replyTo ! s"I'm ${ctx.self.path.name}"
val address = Cluster(ctx.system).selfMember.address
replyTo ! s"I'm ${ctx.self.path.name} at ${address.host.get}:${address.port.get}"
Behaviors.same
case (_, ReplyPlz(toMe))
@ -150,11 +148,12 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
val typeKey2 = EntityTypeKey[IdTestProtocol]("no-envelope-shard")
val behaviorWithId = Behaviors.immutable[IdTestProtocol] {
case (_, IdStopPlz(_))
case (_, IdStopPlz())
Behaviors.stopped
case (ctx, IdWhoAreYou(_, replyTo))
replyTo ! s"I'm ${ctx.self.path.name}"
val address = Cluster(ctx.system).selfMember.address
replyTo ! s"I'm ${ctx.self.path.name} at ${address.host.get}:${address.port.get}"
Behaviors.same
case (_, IdReplyPlz(_, toMe))
@ -162,6 +161,44 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
Behaviors.same
}
val shardingRef1 = sharding.spawn(
behavior,
Props.empty,
typeKey,
ClusterShardingSettings(system),
10,
StopPlz())
val shardingRef2 = sharding2.spawn(
behavior,
Props.empty,
typeKey,
ClusterShardingSettings(system2),
10,
StopPlz())
val shardingRef3 = sharding.spawn(
behaviorWithId,
Props.empty,
typeKey2,
ClusterShardingSettings(system),
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
case IdReplyPlz(id, _) id
case IdWhoAreYou(id, _) id
case other throw new IllegalArgumentException(s"Unexpected message $other")
})
val shardingRef4 = sharding2.spawn(
behaviorWithId,
Props.empty,
typeKey2,
ClusterShardingSettings(system2),
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
case IdReplyPlz(id, _) id
case IdWhoAreYou(id, _) id
case other throw new IllegalArgumentException(s"Unexpected message $other")
})
"Typed cluster sharding" must {
"join cluster" in {
@ -180,50 +217,18 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
}
"send messages via cluster sharding, using envelopes" in {
val ref = sharding.spawn(
behavior,
Props.empty,
typeKey,
ClusterShardingSettings(system),
10,
StopPlz())
sharding2.spawn(
behavior,
Props.empty,
typeKey,
ClusterShardingSettings(system2),
10,
StopPlz())
(1 to 10).foreach { n
val p = TestProbe[String]()
ref ! ShardingEnvelope(s"test$n", ReplyPlz(p.ref))
p.expectMessage(3.seconds, "Hello!")
ref ! ShardingEnvelope(s"test$n", StopPlz())
shardingRef1 ! ShardingEnvelope(s"test$n", ReplyPlz(p.ref))
p.expectMessage("Hello!")
}
}
"send messages via cluster sharding, without envelopes" in {
val ref = sharding.spawn(
behaviorWithId,
Props.empty,
typeKey2,
ClusterShardingSettings(system),
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, _.id),
IdStopPlz("THE_ID_HERE"))
sharding2.spawn(
behaviorWithId,
Props.empty,
typeKey2,
ClusterShardingSettings(system2),
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, _.id),
IdStopPlz("THE_ID_HERE"))
(1 to 10).foreach { n
val p = TestProbe[String]()
ref ! IdReplyPlz(s"test$n", p.ref)
p.expectMessage(3.seconds, "Hello!")
ref ! IdStopPlz(s"test$n")
shardingRef3 ! IdReplyPlz(s"test$n", p.ref)
p.expectMessage("Hello!")
}
}
@ -248,10 +253,10 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
val p = TestProbe[String]()
charlieRef ! WhoAreYou(p.ref)
p.expectMessage(3.seconds, "I'm charlie")
p.expectMessageType[String] should startWith("I'm charlie")
charlieRef tell WhoAreYou(p.ref)
p.expectMessage(3.seconds, "I'm charlie")
p.expectMessageType[String] should startWith("I'm charlie")
charlieRef ! StopPlz()
}
@ -261,12 +266,39 @@ class ClusterShardingSpec extends TestKit("ClusterShardingSpec", ClusterSharding
val charlieRef = sharding.entityRefFor(typeKey, "charlie")
val reply1 = bobRef ? WhoAreYou // TODO document that WhoAreYou(_) would not work
reply1.futureValue should ===("I'm bob")
reply1.futureValue should startWith("I'm bob")
val reply2 = charlieRef ask WhoAreYou
reply2.futureValue should ===("I'm charlie")
reply2.futureValue should startWith("I'm charlie")
bobRef ! StopPlz()
}
"use the handOffStopMessage for leaving/rebalance" in {
var replies1 = Set.empty[String]
(1 to 10).foreach { n
val p = TestProbe[String]()
shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(p.ref))
replies1 += p.expectMessageType[String]
}
replies1.size should ===(10)
Cluster(system2).manager ! Leave(Cluster(system2).selfMember.address)
implicit val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, Span(100, org.scalatest.time.Millis))
eventually {
// if it wouldn't receive the StopPlz and stop the leaving would take longer and this would fail
Cluster(system2).isTerminated should ===(true)
}
var replies2 = Set.empty[String]
(1 to 10).foreach { n
val p = TestProbe[String]()
shardingRef1 ! ShardingEnvelope(s"test$n", WhoAreYou(p.ref))
replies2 += p.expectMessageType[String](10.seconds)
}
replies2.size should ===(10)
replies2 should !==(replies1) // different addresses
}
}
}