=typ,clu akka typed cluster sharding, #23698

This commit is contained in:
Konrad `ktoso` Malawski 2017-09-20 19:23:09 +09:00 committed by Patrik Nordwall
parent 01de74cf41
commit 386289ee70
15 changed files with 1074 additions and 277 deletions

View file

@ -65,7 +65,7 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup {
sys sys
} }
private var adaptedSystemUsed = false private var adaptedSystemUsed = false
lazy val adaptedSystem: ActorSystem[TypedSpec.Command] = { lazy val system: ActorSystem[TypedSpec.Command] = {
val sys = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf)) val sys = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf))
adaptedSystemUsed = true adaptedSystemUsed = true
sys sys
@ -90,7 +90,7 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup {
} }
trait AdaptedSystem { trait AdaptedSystem {
def system: ActorSystem[TypedSpec.Command] = adaptedSystem def system: ActorSystem[TypedSpec.Command] = TypedSpec.this.system
} }
implicit val timeout = setTimeout implicit val timeout = setTimeout
@ -100,7 +100,7 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup {
if (nativeSystemUsed) if (nativeSystemUsed)
Await.result(nativeSystem.terminate, timeout.duration) Await.result(nativeSystem.terminate, timeout.duration)
if (adaptedSystemUsed) if (adaptedSystemUsed)
Await.result(adaptedSystem.terminate, timeout.duration) Await.result(system.terminate, timeout.duration)
} }
// TODO remove after basing on ScalaTest 3 with async support // TODO remove after basing on ScalaTest 3 with async support

View file

@ -35,21 +35,21 @@ object ClusterApiSpec {
class ClusterApiSpec extends TypedSpec(ClusterApiSpec.config) with ScalaFutures { class ClusterApiSpec extends TypedSpec(ClusterApiSpec.config) with ScalaFutures {
val testSettings = TestKitSettings(adaptedSystem) val testSettings = TestKitSettings(system)
val clusterNode1 = Cluster(adaptedSystem) val clusterNode1 = Cluster(system)
val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem) val untypedSystem1 = system.toUntyped
object `A typed cluster` { object `A typed cluster` {
def `01 must join a cluster and observe events from both sides`() = { def `01 must join a cluster and observe events from both sides`() = {
val system2 = akka.actor.ActorSystem(adaptedSystem.name, adaptedSystem.settings.config) val system2 = akka.actor.ActorSystem(system.name, system.settings.config)
val adaptedSystem2 = system2.toTyped val adaptedSystem2 = system2.toTyped
try { try {
val clusterNode2 = Cluster(adaptedSystem2) val clusterNode2 = Cluster(adaptedSystem2)
val node1Probe = TestProbe[AnyRef]()(adaptedSystem, testSettings) val node1Probe = TestProbe[AnyRef]()(system, testSettings)
val node2Probe = TestProbe[AnyRef]()(adaptedSystem2, testSettings) val node2Probe = TestProbe[AnyRef]()(adaptedSystem2, testSettings)
// initial cached selfMember // initial cached selfMember

View file

@ -1,55 +0,0 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster
import akka.cluster.sharding.ClusterShardingSettings
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.adapter._
import akka.typed.{ ActorSystem }
class ClusterShardingApiSpec {
// Compile only for now
val system: akka.actor.ActorSystem = ???
val typedSystem: ActorSystem[Nothing] = system.toTyped
val cluster = Cluster(typedSystem)
trait EntityProtocol
case class Add(thing: String) extends EntityProtocol
case object PassHence extends EntityProtocol
val entityBehavior =
Actor.deferred[EntityProtocol] { _
var things: List[String] = Nil
Actor.immutable[EntityProtocol] { (_, msg)
msg match {
case Add(thing)
things = thing :: things
Actor.same
case PassHence
Actor.stopped
}
}
}
val sharding = ClusterSharding(typedSystem).spawn(
entityBehavior,
"things-lists",
ClusterShardingSettings(typedSystem.settings.config),
maxNumberOfShards = 25,
handOffStopMessage = PassHence
)
sharding ! ShardingEnvelope("1", Add("bananas"))
val entity1 = ClusterSharding.entityRefFor("1", sharding)
entity1 ! Add("pineapple")
// start but no command
sharding ! StartEntity("2")
}

View file

@ -88,23 +88,23 @@ object ClusterSingletonApiSpec {
class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config) with ScalaFutures { class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config) with ScalaFutures {
import ClusterSingletonApiSpec._ import ClusterSingletonApiSpec._
implicit val testSettings = TestKitSettings(adaptedSystem) implicit val testSettings = TestKitSettings(system)
val clusterNode1 = Cluster(adaptedSystem) val clusterNode1 = Cluster(system)
val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem) val untypedSystem1 = system.toUntyped
val system2 = akka.actor.ActorSystem( val system2 = akka.actor.ActorSystem(
adaptedSystem.name, system.name,
ConfigFactory.parseString( ConfigFactory.parseString(
""" """
akka.cluster.roles = ["singleton"] akka.cluster.roles = ["singleton"]
""").withFallback(adaptedSystem.settings.config)) """).withFallback(system.settings.config))
val adaptedSystem2 = system2.toTyped val adaptedSystem2 = system2.toTyped
val clusterNode2 = Cluster(adaptedSystem2) val clusterNode2 = Cluster(adaptedSystem2)
object `A typed cluster singleton` { object `A typed cluster singleton` {
def `01 must be accessible from two nodes in a cluster`() = { def `01 must be accessible from two nodes in a cluster`() = {
val node1UpProbe = TestProbe[SelfUp]()(adaptedSystem, implicitly[TestKitSettings]) val node1UpProbe = TestProbe[SelfUp]()(system, implicitly[TestKitSettings])
clusterNode1.subscriptions ! Subscribe(node1UpProbe.ref, classOf[SelfUp]) clusterNode1.subscriptions ! Subscribe(node1UpProbe.ref, classOf[SelfUp])
val node2UpProbe = TestProbe[SelfUp]()(adaptedSystem2, implicitly[TestKitSettings]) val node2UpProbe = TestProbe[SelfUp]()(adaptedSystem2, implicitly[TestKitSettings])
@ -116,10 +116,10 @@ class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config)
node1UpProbe.expectMsgType[SelfUp] node1UpProbe.expectMsgType[SelfUp]
node2UpProbe.expectMsgType[SelfUp] node2UpProbe.expectMsgType[SelfUp]
val cs1 = ClusterSingleton(adaptedSystem) val cs1 = ClusterSingleton(system)
val cs2 = ClusterSingleton(adaptedSystem2) val cs2 = ClusterSingleton(adaptedSystem2)
val settings = ClusterSingletonSettings(adaptedSystem).withRole("singleton") val settings = ClusterSingletonSettings(system).withRole("singleton")
val node1ref = cs1.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) val node1ref = cs1.spawn(pingPong, "ping-pong", Props.empty, settings, Perish)
val node2ref = cs2.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) val node2ref = cs2.spawn(pingPong, "ping-pong", Props.empty, settings, Perish)
@ -127,7 +127,7 @@ class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config)
cs1.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node1ref) cs1.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node1ref)
cs2.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node2ref) cs2.spawn(pingPong, "ping-pong", Props.empty, settings, Perish) should ===(node2ref)
val node1PongProbe = TestProbe[Pong.type]()(adaptedSystem, implicitly[TestKitSettings]) val node1PongProbe = TestProbe[Pong.type]()(system, implicitly[TestKitSettings])
val node2PongProbe = TestProbe[Pong.type]()(adaptedSystem2, implicitly[TestKitSettings]) val node2PongProbe = TestProbe[Pong.type]()(adaptedSystem2, implicitly[TestKitSettings])
node1PongProbe.awaitAssert({ node1PongProbe.awaitAssert({

View file

@ -30,9 +30,9 @@ class MiscMessageSerializerSpec extends TypedSpec(MiscMessageSerializerSpec.conf
def `must serialize and deserialize typed actor refs `(): Unit = { def `must serialize and deserialize typed actor refs `(): Unit = {
val ref = (adaptedSystem ? Create(Actor.empty[Unit], "some-actor")).futureValue val ref = (system ? Create(Actor.empty[Unit], "some-actor")).futureValue
val serialization = SerializationExtension(ActorSystemAdapter.toUntyped(adaptedSystem)) val serialization = SerializationExtension(ActorSystemAdapter.toUntyped(system))
val serializer = serialization.findSerializerFor(ref) match { val serializer = serialization.findSerializerFor(ref) match {
case s: SerializerWithStringManifest s case s: SerializerWithStringManifest s

View file

@ -97,6 +97,7 @@ object ClusterReceptionistSpec {
class ClusterReceptionistSpec extends TypedSpec(ClusterReceptionistSpec.config) { class ClusterReceptionistSpec extends TypedSpec(ClusterReceptionistSpec.config) {
import ClusterReceptionistSpec._ import ClusterReceptionistSpec._
val adaptedSystem = system
implicit val testSettings = TestKitSettings(adaptedSystem) implicit val testSettings = TestKitSettings(adaptedSystem)
val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem) val untypedSystem1 = ActorSystemAdapter.toUntyped(adaptedSystem)
val clusterNode1 = Cluster(untypedSystem1) val clusterNode1 = Cluster(untypedSystem1)

View file

@ -0,0 +1,176 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed.cluster.sharding
import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy
import akka.typed.{ ActorRef, ActorSystem, Props, TypedSpec }
import akka.typed.cluster.Cluster
import akka.typed.internal.adapter.ActorSystemAdapter
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.adapter._
import akka.typed.testkit.TestKitSettings
import akka.typed.testkit.scaladsl.TestProbe
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._
object ClusterShardingSpec {
val config = ConfigFactory.parseString(
"""
akka.actor.provider = cluster
// akka.loglevel = debug
akka.remote.artery.enabled = true
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
akka.cluster.jmx.multi-mbeans-in-same-jvm = on
akka.coordinated-shutdown.terminate-actor-system = off
akka.actor {
serialize-messages = off
allow-java-serialization = off
}
""".stripMargin)
sealed trait TestProtocol
final case class ReplyPlz(toMe: ActorRef[String]) extends TestProtocol
final case class WhoAreYou(replyTo: ActorRef[String]) extends TestProtocol
final case class StopPlz() extends TestProtocol
sealed trait IdTestProtocol { def id: String }
final case class IdReplyPlz(id: String, toMe: ActorRef[String]) extends IdTestProtocol
final case class IdWhoAreYou(id: String, replyTo: ActorRef[String]) extends IdTestProtocol
final case class IdStopPlz(id: String) extends IdTestProtocol
}
class ClusterShardingSpec extends TypedSpec(ClusterShardingSpec.config) with ScalaFutures {
import akka.typed.scaladsl.adapter._
import ClusterShardingSpec._
implicit val s = system
implicit val testkitSettings = TestKitSettings(system)
val sharding = ClusterSharding(system)
implicit val untypedSystem = system.toUntyped
private val untypedCluster = akka.cluster.Cluster(untypedSystem)
val behavior = Actor.immutable[TestProtocol] {
case (_, StopPlz())
Actor.stopped
case (ctx, WhoAreYou(replyTo))
replyTo ! s"I'm ${ctx.self.path.name}"
Actor.same
case (_, ReplyPlz(toMe))
toMe ! "Hello!"
Actor.same
}
val behaviorWithId = Actor.immutable[IdTestProtocol] {
case (_, IdStopPlz(_))
Actor.stopped
case (ctx, IdWhoAreYou(_, replyTo))
replyTo ! s"I'm ${ctx.self.path.name}"
Actor.same
case (_, IdReplyPlz(_, toMe))
toMe ! "Hello!"
Actor.same
}
object `Typed cluster sharding` {
untypedCluster.join(untypedCluster.selfAddress)
def `01 must send messsages via cluster sharding, using envelopes`(): Unit = {
val ref = sharding.spawn(
behavior,
Props.empty,
"envelope-shard",
ClusterShardingSettings(system),
10,
StopPlz())
val p = TestProbe[String]()
ref ! ShardingEnvelope("test", ReplyPlz(p.ref))
p.expectMsg(3.seconds, "Hello!")
ref ! ShardingEnvelope("test", StopPlz())
}
def `02 must send messsages via cluster sharding, without envelopes`(): Unit = {
val ref = sharding.spawn(
behaviorWithId,
Props.empty,
"no-envelope-shard",
ClusterShardingSettings(system),
ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, _.id),
IdStopPlz("THE_ID_HERE"))
val p = TestProbe[String]()
ref ! IdReplyPlz("test", p.ref)
p.expectMsg(3.seconds, "Hello!")
ref ! IdStopPlz("test")
}
// def `03 fail if starting sharding for already used typeName, but with wrong type`(): Unit = {
// val ex = intercept[Exception] {
// sharding.spawn(
// Actor.empty[String],
// Props.empty,
// "example-02",
// ClusterShardingSettings(adaptedSystem),
// 10,
// "STOP"
// )
// }
//
// ex.getMessage should include("already started")
// }
untypedCluster.join(untypedCluster.selfAddress)
def `11 EntityRef - tell`(): Unit = {
val charlieRef: EntityRef[TestProtocol] =
sharding.entityRefFor[TestProtocol]("envelope-shard", "charlie")
val p = TestProbe[String]()
charlieRef ! WhoAreYou(p.ref)
p.expectMsg(3.seconds, "I'm charlie")
charlieRef tell WhoAreYou(p.ref)
p.expectMsg(3.seconds, "I'm charlie")
charlieRef ! StopPlz()
}
def `11 EntityRef - ask`(): Unit = {
val bobRef: EntityRef[TestProtocol] =
sharding.entityRefFor[TestProtocol]("envelope-shard", "bob")
val charlieRef: EntityRef[TestProtocol] =
sharding.entityRefFor[TestProtocol]("envelope-shard", "charlie")
val p = TestProbe[String]()
val reply1 = bobRef ? WhoAreYou // TODO document that WhoAreYou(_) would not work
reply1.futureValue should ===("I'm bob")
val reply2 = charlieRef ask WhoAreYou
reply2.futureValue should ===("I'm charlie")
bobRef ! StopPlz()
}
}
}

View file

@ -1,196 +0,0 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.typed.cluster
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ClusterShardingSettings
import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props }
sealed case class ShardingEnvelope[A](entityId: String, message: A)
object StartEntity {
def apply[A](entityId: String): ShardingEnvelope[A] =
new ShardingEnvelope[A](entityId, null.asInstanceOf[A])
}
object TypedMessageExtractor {
/**
* Scala API:
*
* Create the default message extractor, using envelopes to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*/
def apply[A](maxNumberOfShards: Int): TypedMessageExtractor[ShardingEnvelope[A], A] =
new DefaultMessageExtractor[A](maxNumberOfShards)
/**
* Scala API:
*
* Create a message extractor for a protocol where the entity id is available in each message.
*/
def noEnvelope[A](
maxNumberOfShards: Int,
extractEntityId: A String
): TypedMessageExtractor[A, A] =
new DefaultNoEnvelopeMessageExtractor[A](maxNumberOfShards) {
// TODO catch MatchError here and return null for those to yield an "unhandled" when partial functions are used?
def entityId(message: A) = extractEntityId(message)
}
}
/**
* Entirely customizable typed message extractor. Prefer [[DefaultMessageExtractor]] or [[DefaultNoEnvelopeMessageExtractor]]
* if possible.
*
* @tparam E Possibly an envelope around the messages accepted by the entity actor, is the same as `A` if there is no
* envelope.
* @tparam A The type of message accepted by the entity actor
*/
trait TypedMessageExtractor[E, A] {
/**
* Extract the entity id from an incoming `message`. If `null` is returned
* the message will be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
*/
def entityId(message: E): String
/**
* Extract the message to send to the entity from an incoming `message`.
* Note that the extracted message does not have to be the same as the incoming
* message to support wrapping in message envelope that is unwrapped before
* sending to the entity actor.
*
* If the returned value is `null`, and the entity isn't running yet the entity will be started
* but no message will be delivered to it.
*/
def entityMessage(message: E): A
/**
* Extract the entity id from an incoming `message`. Only messages that passed the [[#entityId]]
* function will be used as input to this function.
*/
def shardId(message: E): String
}
/**
* Java API:
*
* Default message extractor type, using envelopes to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*
* @tparam A The type of message accepted by the entity actor
*/
final class DefaultMessageExtractor[A](maxNumberOfShards: Int) extends TypedMessageExtractor[ShardingEnvelope[A], A] {
def entityId(envelope: ShardingEnvelope[A]) = envelope.entityId
def entityMessage(envelope: ShardingEnvelope[A]) = envelope.message
def shardId(envelope: ShardingEnvelope[A]) = (math.abs(envelope.entityId.hashCode) % maxNumberOfShards).toString
}
/**
* Java API:
*
* Default message extractor type, using a property of the message to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*
* @tparam A The type of message accepted by the entity actor
*/
abstract class DefaultNoEnvelopeMessageExtractor[A](maxNumberOfShards: Int) extends TypedMessageExtractor[A, A] {
def entityMessage(message: A) = message
def shardId(message: A) = {
val id = entityId(message)
if (id != null) (math.abs(id.hashCode) % maxNumberOfShards).toString
else null
}
}
/**
* A reference to an entityId and the local access to sharding, allows for actor-like interaction
*
* The entity ref must be resolved locally and cannot be sent to another node.
*
* TODO what about ask, should it actually implement ActorRef to be exactly like ActorRef and callers does not have
* to know at all about it or is it good with a distinction but lookalike API?
*/
trait EntityRef[A] {
/**
* Send a message to the entity referenced by this EntityRef using *at-most-once*
* messaging semantics.
*/
def tell(msg: A): Unit
}
object EntityRef {
implicit final class EntityRefOps[T](val ref: EntityRef[T]) extends AnyVal {
/**
* Send a message to the Actor referenced by this ActorRef using *at-most-once*
* messaging semantics.
*/
def !(msg: T): Unit = ref.tell(msg)
}
}
object ClusterSharding extends ExtensionId[ClusterSharding] {
def createExtension(system: ActorSystem[_]): ClusterSharding = ???
/**
* Create an ActorRef-like reference to a specific sharded entity. Messages sent to it will be wrapped
* in a [[ShardingEnvelope]] and passed to the local shard region or proxy.
*/
def entityRefFor[A](entityId: String, actorRef: ActorRef[ShardingEnvelope[A]]): EntityRef[A] =
new EntityRef[A] {
def tell(msg: A): Unit = actorRef ! ShardingEnvelope(entityId, msg)
}
}
trait ClusterSharding extends Extension {
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role.
*
* Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the
* recipient actor.
* A [[DefaultMessageExtractor]] will be used for extracting entityId and shardId
* [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy.
*
* @param behavior The behavior for entities
* @param typeName A name that uniquely identifies the type of entity in this cluster
* @param handOffStopMessage Message sent to an entity to tell it to stop
* @tparam A The type of command the entity accepts
*/
def spawn[A](
behavior: Behavior[A],
typeName: String,
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]]
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role.
*
* @param behavior The behavior for entities
* @param typeName A name that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param handOffStopMessage Message sent to an entity to tell it to stop
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawn[E, A](
behavior: Behavior[A],
typeName: String,
entityProps: Props,
settings: ClusterShardingSettings,
messageExtractor: TypedMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: A
): ActorRef[E]
}

View file

@ -6,12 +6,14 @@ package akka.typed.cluster
import akka.actor.NoSerializationVerificationNeeded import akka.actor.NoSerializationVerificationNeeded
import akka.annotation.{ DoNotInherit, InternalApi } import akka.annotation.{ DoNotInherit, InternalApi }
import akka.cluster.ClusterSettings.DataCenter import akka.cluster.ClusterSettings.DataCenter
import akka.cluster.singleton.{ ClusterSingletonManagerSettings, ClusterSingletonProxySettings } import akka.cluster.singleton.{ ClusterSingletonProxySettings, ClusterSingletonManagerSettings UntypedClusterSingletonManagerSettings }
import akka.typed.cluster.internal.AdaptedClusterSingletonImpl import akka.typed.cluster.internal.AdaptedClusterSingletonImpl
import akka.typed.internal.adapter.ActorSystemAdapter
import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props } import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props }
import com.typesafe.config.Config import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.{ Duration, FiniteDuration }
object ClusterSingletonSettings { object ClusterSingletonSettings {
def apply( def apply(
@ -71,8 +73,8 @@ final class ClusterSingletonSettings(
* INTERNAL API: * INTERNAL API:
*/ */
@InternalApi @InternalApi
private[akka] def toManagerSettings(singletonName: String): ClusterSingletonManagerSettings = private[akka] def toManagerSettings(singletonName: String): UntypedClusterSingletonManagerSettings =
new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval) new UntypedClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval)
/** /**
* INTERNAL API: * INTERNAL API:
@ -133,3 +135,92 @@ trait ClusterSingleton extends Extension {
): ActorRef[A] ): ActorRef[A]
} }
object ClusterSingletonManagerSettings {
import akka.typed.scaladsl.adapter._
/**
* Create settings from the default configuration
* `akka.cluster.singleton`.
*/
def apply(system: ActorSystem[_]): ClusterSingletonManagerSettings =
apply(system.settings.config.getConfig("akka.cluster.singleton"))
.withRemovalMargin(akka.cluster.Cluster(system.toUntyped).settings.DownRemovalMargin)
/**
* Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.singleton`.
*/
def apply(config: Config): ClusterSingletonManagerSettings =
new ClusterSingletonManagerSettings(
singletonName = config.getString("singleton-name"),
role = roleOption(config.getString("role")),
removalMargin = Duration.Zero, // defaults to ClusterSettins.DownRemovalMargin
handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis)
/**
* Java API: Create settings from the default configuration
* `akka.cluster.singleton`.
*/
def create(system: ActorSystem[_]): ClusterSingletonManagerSettings = apply(system)
/**
* Java API: Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.singleton`.
*/
def create(config: Config): ClusterSingletonManagerSettings = apply(config)
/**
* INTERNAL API
*/
private[akka] def roleOption(role: String): Option[String] =
if (role == "") None else Option(role)
}
/**
* @param singletonName The actor name of the child singleton actor.
*
* @param role Singleton among the nodes tagged with specified role.
* If the role is not specified it's a singleton among all nodes in
* the cluster.
*
* @param removalMargin Margin until the singleton instance that belonged to
* a downed/removed partition is created in surviving partition. The purpose of
* this margin is that in case of a network partition the singleton actors
* in the non-surviving partitions must be stopped before corresponding actors
* are started somewhere else. This is especially important for persistent
* actors.
*
* @param handOverRetryInterval When a node is becoming oldest it sends hand-over
* request to previous oldest, that might be leaving the cluster. This is
* retried with this interval until the previous oldest confirms that the hand
* over has started or the previous oldest member is removed from the cluster
* (+ `removalMargin`).
*/
final class ClusterSingletonManagerSettings(
val singletonName: String,
val role: Option[String],
val removalMargin: FiniteDuration,
val handOverRetryInterval: FiniteDuration) extends NoSerializationVerificationNeeded {
def withSingletonName(name: String): ClusterSingletonManagerSettings = copy(singletonName = name)
def withRole(role: String): ClusterSingletonManagerSettings = copy(role = UntypedClusterSingletonManagerSettings.roleOption(role))
def withRole(role: Option[String]) = copy(role = role)
def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonManagerSettings =
copy(removalMargin = removalMargin)
def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings =
copy(handOverRetryInterval = retryInterval)
private def copy(
singletonName: String = singletonName,
role: Option[String] = role,
removalMargin: FiniteDuration = removalMargin,
handOverRetryInterval: FiniteDuration = handOverRetryInterval): ClusterSingletonManagerSettings =
new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval)
}

View file

@ -135,7 +135,7 @@ private[akka] final class AdapterClusterImpl(system: ActorSystem[_]) extends Clu
import AdapterClusterImpl._ import AdapterClusterImpl._
require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted actor systems can be used for cluster features") require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted actor systems can be used for cluster features")
private val untypedSystem = ActorSystemAdapter.toUntyped(system) private val untypedSystem = system.toUntyped
private def extendedUntyped = untypedSystem.asInstanceOf[ExtendedActorSystem] private def extendedUntyped = untypedSystem.asInstanceOf[ExtendedActorSystem]
private val untypedCluster = akka.cluster.Cluster(untypedSystem) private val untypedCluster = akka.cluster.Cluster(untypedSystem)

View file

@ -21,9 +21,10 @@ import akka.typed.{ ActorRef, ActorSystem, Behavior, Props }
private[akka] final class AdaptedClusterSingletonImpl(system: ActorSystem[_]) extends ClusterSingleton { private[akka] final class AdaptedClusterSingletonImpl(system: ActorSystem[_]) extends ClusterSingleton {
require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted actor systems can be used for the typed cluster singleton") require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted actor systems can be used for the typed cluster singleton")
import ClusterSingletonImpl._ import ClusterSingletonImpl._
import akka.typed.scaladsl.adapter._
private lazy val cluster = Cluster(system) private lazy val cluster = Cluster(system)
private val untypedSystem = ActorSystemAdapter.toUntyped(system).asInstanceOf[ExtendedActorSystem] private val untypedSystem = system.toUntyped.asInstanceOf[ExtendedActorSystem]
private val proxies = new ConcurrentHashMap[String, ActorRef[_]]() private val proxies = new ConcurrentHashMap[String, ActorRef[_]]()

View file

@ -0,0 +1,344 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed.cluster.sharding
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.cluster.sharding.ShardCoordinator.{ LeastShardAllocationStrategy, ShardAllocationStrategy }
import akka.cluster.sharding.{ ClusterSharding UntypedClusterSharding, ShardRegion UntypedShardRegion }
import akka.typed.cluster.Cluster
import akka.typed.internal.adapter.{ ActorRefAdapter, ActorSystemAdapter }
import akka.typed.scaladsl.adapter.PropsAdapter
import akka.typed.{ ActorRef, ActorSystem, Behavior, Extension, ExtensionId, Props }
import scala.language.implicitConversions
/**
* Default envelope type that may be used with Cluster Sharding.
*
* Cluster Sharding provides a default [[HashCodeMessageExtractor]] that is able to handle
* these types of messages, by hashing the entityId into into the shardId. It is not the only,
* but a convenient way to send envelope-wrapped messages via cluster sharding.
*
* The alternative way of routing messages through sharding is to not use envelopes,
* and have the message types themselfs carry identifiers.
*/
final case class ShardingEnvelope[A](entityId: String, message: A) // TODO think if should remain a case class
/** Allows starting a specific Sharded Entity by its entity identifier */
object StartEntity {
/**
* Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the
* specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it.
*/
def apply[A](entityId: String): ShardingEnvelope[A] =
new ShardingEnvelope[A](entityId, null.asInstanceOf[A]) // TODO should we instead sub-class here somehow?
/**
* Java API
*
* Returns [[ShardingEnvelope]] which can be sent via Cluster Sharding in order to wake up the
* specified (by `entityId`) Sharded Entity, ''without'' delivering a real message to it.
*/
def create[A](msgClass: Class[A], entityId: String): ShardingEnvelope[A] =
apply[A](entityId)
}
object ShardingMessageExtractor {
/**
* Scala API:
*
* Create the default message extractor, using envelopes to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*/
def apply[A](maxNumberOfShards: Int): ShardingMessageExtractor[ShardingEnvelope[A], A] =
new HashCodeMessageExtractor[A](maxNumberOfShards)
/**
* Create a message extractor for a protocol where the entity id is available in each message.
*/
def noEnvelope[A](
maxNumberOfShards: Int,
extractEntityId: A String
): ShardingMessageExtractor[A, A] =
new HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards) {
// TODO catch MatchError here and return null for those to yield an "unhandled" when partial functions are used?
def entityId(message: A) = extractEntityId(message)
}
}
/**
* Entirely customizable typed message extractor. Prefer [[HashCodeMessageExtractor]] or [[HashCodeNoEnvelopeMessageExtractor]]
* if possible.
*
* @tparam E Possibly an Envelope around the messages accepted by the entity actor, is the same as `A` if there is no
* envelope.
* @tparam A The type of message accepted by the entity actor
*/
trait ShardingMessageExtractor[E, A] {
/**
* Extract the entity id from an incoming `message`. If `null` is returned
* the message will be `unhandled`, i.e. posted as `Unhandled` messages on the event stream
*/
def entityId(message: E): String
/**
* Extract the message to send to the entity from an incoming `message`.
* Note that the extracted message does not have to be the same as the incoming
* message to support wrapping in message envelope that is unwrapped before
* sending to the entity actor.
*
* If the returned value is `null`, and the entity isn't running yet the entity will be started
* but no message will be delivered to it.
*/
def entityMessage(message: E): A // TODO "unwrapMessage" is how I'd call it?
/**
* Extract the entity id from an incoming `message`. Only messages that passed the [[#entityId]]
* function will be used as input to this function.
*/
def shardId(message: E): String
}
/**
* Java API:
*
* Default message extractor type, using envelopes to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*
* @tparam A The type of message accepted by the entity actor
*/
final class HashCodeMessageExtractor[A](maxNumberOfShards: Int) extends ShardingMessageExtractor[ShardingEnvelope[A], A] {
def entityId(envelope: ShardingEnvelope[A]): String = envelope.entityId
def entityMessage(envelope: ShardingEnvelope[A]): A = envelope.message
def shardId(envelope: ShardingEnvelope[A]): String = (math.abs(envelope.entityId.hashCode) % maxNumberOfShards).toString
}
/**
* Java API:
*
* Default message extractor type, using a property of the message to identify what entity a message is for
* and the hashcode of the entityId to decide which shard an entity belongs to.
*
* This is recommended since it does not force details about sharding into the entity protocol
*
* @tparam A The type of message accepted by the entity actor
*/
abstract class HashCodeNoEnvelopeMessageExtractor[A](maxNumberOfShards: Int) extends ShardingMessageExtractor[A, A] {
final def entityMessage(message: A): A = message
def shardId(message: A): String = {
val id = entityId(message)
if (id != null) (math.abs(id.hashCode) % maxNumberOfShards).toString
else null
}
override def toString = s"HashCodeNoEnvelopeMessageExtractor($maxNumberOfShards)"
}
object ClusterSharding extends ExtensionId[ClusterSharding] {
override def createExtension(system: ActorSystem[_]): ClusterSharding =
new AdaptedClusterShardingImpl(system)
/** Java API */
def get(system: ActorSystem[_]): ClusterSharding = apply(system)
}
/** INTERNAL API */
@InternalApi
final class AdaptedClusterShardingImpl(system: ActorSystem[_]) extends ClusterSharding {
import akka.typed.scaladsl.adapter._
require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted untyped actor systems can be used for cluster features")
private val cluster = Cluster(system)
private val untypedSystem = system.toUntyped
private val untypedSharding = akka.cluster.sharding.ClusterSharding(untypedSystem)
override def spawn[A](
behavior: Behavior[A],
entityProps: Props,
typeName: String,
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]] = {
val extractor = new HashCodeMessageExtractor[A](maxNumberOfShards)
spawn(behavior, entityProps, typeName, settings, extractor, defaultShardAllocationStrategy(settings), handOffStopMessage)
}
override def spawn[E, A](
behavior: Behavior[A],
entityProps: Props,
typeName: String,
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
handOffStopMessage: A): ActorRef[E] =
spawn(behavior, entityProps, typeName, settings, messageExtractor, defaultShardAllocationStrategy(settings), handOffStopMessage)
override def spawn[E, A](
behavior: Behavior[A],
entityProps: Props,
typeName: String,
settings: ClusterShardingSettings,
extractor: ShardingMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: A): ActorRef[E] = {
val untypedSettings = ClusterShardingSettings.toUntypedSettings(settings)
val ref =
if (settings.shouldHostShard(cluster)) {
system.log.info("Starting Shard Region [{}]...")
untypedSharding.start(
typeName,
PropsAdapter(behavior, entityProps),
untypedSettings,
extractor, extractor,
defaultShardAllocationStrategy(settings),
handOffStopMessage
)
} else {
system.log.info("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...")
untypedSharding.startProxy(
typeName,
settings.role,
dataCenter = None, // TODO what about the multi-dc value here?
extractShardId = extractor,
extractEntityId = extractor
)
}
ActorRefAdapter(ref)
}
override def entityRefFor[A](typeName: String, entityId: String): EntityRef[A] = {
new AdaptedEntityRefImpl[A](untypedSharding.shardRegion(typeName), entityId)
}
override def getEntityRefFor[A](msgClass: Class[A], typeName: String, entityId: String): EntityRef[A] =
entityRefFor[A](typeName, entityId)
override def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy = {
val threshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold
val maxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance
new LeastShardAllocationStrategy(threshold, maxSimultaneousRebalance)
}
// --- extractor conversions ---
@InternalApi
private implicit def convertExtractEntityId[E, A](extractor: ShardingMessageExtractor[E, A]): UntypedShardRegion.ExtractEntityId = {
// TODO what if msg was null
case msg: E if extractor.entityId(msg.asInstanceOf[E]) ne null
// we're evaluating entityId twice, I wonder if we could do it just once (same was in old sharding's Java DSL)
(extractor.entityId(msg.asInstanceOf[E]), extractor.entityMessage(msg.asInstanceOf[E]))
}
@InternalApi
private implicit def convertExtractShardId[E, A](extractor: ShardingMessageExtractor[E, A]): UntypedShardRegion.ExtractShardId = {
case msg: E extractor.shardId(msg)
}
}
@DoNotInherit
sealed trait ClusterSharding extends Extension {
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role.
*
* Messages are sent to the entities by wrapping the messages in a [[ShardingEnvelope]] with the entityId of the
* recipient actor.
* A [[HashCodeMessageExtractor]] will be used for extracting entityId and shardId
* [[akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy]] will be used for shard allocation strategy.
*
* @param behavior The behavior for entities
* @param typeName A name that uniquely identifies the type of entity in this cluster
* @param handOffStopMessage Message sent to an entity to tell it to stop
* @tparam A The type of command the entity accepts
*/
// TODO: FYI, I think it would be very good to have rule that "behavior, otherstuff"
// TODO: or "behavior, props, otherstuff" be the consistent style we want to promote in parameter ordering, WDYT?
def spawn[A](
behavior: Behavior[A],
props: Props,
typeName: String,
settings: ClusterShardingSettings,
maxNumberOfShards: Int,
handOffStopMessage: A): ActorRef[ShardingEnvelope[A]]
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role.
*
* @param behavior The behavior for entities
* @param typeName A name that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param allocationStrategy Allocation strategy which decides on which nodes to allocate new shards
* @param handOffStopMessage Message sent to an entity to tell it to stop
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawn[E, A](
behavior: Behavior[A],
entityProps: Props,
typeName: String,
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: A
): ActorRef[E]
/**
* Spawn a shard region or a proxy depending on if the settings require role and if this node has such a role.
*
* @param behavior The behavior for entities
* @param typeName A name that uniquely identifies the type of entity in this cluster
* @param entityProps Props to apply when starting an entity
* @param handOffStopMessage Message sent to an entity to tell it to stop
* @tparam E A possible envelope around the message the entity accepts
* @tparam A The type of command the entity accepts
*/
def spawn[E, A](
behavior: Behavior[A],
entityProps: Props,
typeName: String,
settings: ClusterShardingSettings,
messageExtractor: ShardingMessageExtractor[E, A],
handOffStopMessage: A
): ActorRef[E]
/**
* Create an `ActorRef`-like reference to a specific sharded entity.
* Currently you have to correctly specify the type of messages the target can handle.
*
* Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the
* here provided `entityId`.
*
* FIXME a more typed version of this API will be explored in https://github.com/akka/akka/issues/23690
*
* For in-depth documentation of its semantics, see [[EntityRef]].
*/
def entityRefFor[A](typeName: String, entityId: String): EntityRef[A]
/**
* Java API: Create an `ActorRef`-like reference to a specific sharded entity.
* Messages sent to it will be wrapped in a [[ShardingEnvelope]] and passed to the local shard region or proxy.
*
* Messages sent through this [[EntityRef]] will be wrapped in a [[ShardingEnvelope]] including the
* here provided `entityId`.
*
* FIXME a more typed version of this API will be explored in https://github.com/akka/akka/issues/23690
*
* For in-depth documentation of its semantics, see [[EntityRef]].
*/
def getEntityRefFor[A](msgClass: Class[A], typeName: String, entityId: String): EntityRef[A]
/** The default ShardAllocationStrategy currently is [[LeastShardAllocationStrategy]] however could be changed in the future. */
def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy
}

View file

@ -0,0 +1,282 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed.cluster.sharding
import akka.actor.NoSerializationVerificationNeeded
import akka.annotation.InternalApi
import akka.cluster.sharding.{ ClusterShardingSettings UntypedShardingSettings }
import akka.cluster.singleton.{ ClusterSingletonManagerSettings UntypedClusterSingletonManagerSettings }
import akka.typed.ActorSystem
import akka.typed.cluster.{ Cluster, ClusterSingletonManagerSettings }
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration
object ClusterShardingSettings {
/** Scala API: Creates new cluster sharding settings object */
def apply(system: ActorSystem[_]): ClusterShardingSettings =
fromConfig(system.settings.config.getConfig("akka.cluster.sharding"))
def fromConfig(config: Config): ClusterShardingSettings = {
val untypedSettings = UntypedShardingSettings(config)
fromUntypedSettings(untypedSettings)
}
/** Java API: Creates new cluster sharding settings object */
def create(system: ActorSystem[_]): ClusterShardingSettings =
apply(system)
/** INTERNAL API: Indended only for internal use, it is not recommended to keep converting between the setting types */
private[akka] def fromUntypedSettings(untypedSettings: UntypedShardingSettings): ClusterShardingSettings = {
new ClusterShardingSettings(
role = untypedSettings.role,
rememberEntities = untypedSettings.rememberEntities,
journalPluginId = untypedSettings.journalPluginId,
snapshotPluginId = untypedSettings.snapshotPluginId,
stateStoreMode = StateStoreMode.byName(untypedSettings.stateStoreMode),
new TuningParameters(untypedSettings.tuningParameters),
new ClusterSingletonManagerSettings(
untypedSettings.coordinatorSingletonSettings.singletonName,
untypedSettings.coordinatorSingletonSettings.role,
untypedSettings.coordinatorSingletonSettings.removalMargin,
untypedSettings.coordinatorSingletonSettings.handOverRetryInterval
)
)
}
/** INTERNAL API: Indended only for internal use, it is not recommended to keep converting between the setting types */
private[akka] def toUntypedSettings(settings: ClusterShardingSettings): UntypedShardingSettings = {
new UntypedShardingSettings(
role = settings.role,
rememberEntities = settings.rememberEntities,
journalPluginId = settings.journalPluginId,
snapshotPluginId = settings.snapshotPluginId,
stateStoreMode = settings.stateStoreMode.name,
new UntypedShardingSettings.TuningParameters(
bufferSize = settings.tuningParameters.bufferSize,
coordinatorFailureBackoff = settings.tuningParameters.coordinatorFailureBackoff,
retryInterval = settings.tuningParameters.retryInterval,
handOffTimeout = settings.tuningParameters.handOffTimeout,
shardStartTimeout = settings.tuningParameters.shardStartTimeout,
shardFailureBackoff = settings.tuningParameters.shardFailureBackoff,
entityRestartBackoff = settings.tuningParameters.entityRestartBackoff,
rebalanceInterval = settings.tuningParameters.rebalanceInterval,
snapshotAfter = settings.tuningParameters.snapshotAfter,
keepNrOfBatches = settings.tuningParameters.keepNrOfBatches,
leastShardAllocationRebalanceThreshold = settings.tuningParameters.leastShardAllocationRebalanceThreshold, // TODO extract it a bit
leastShardAllocationMaxSimultaneousRebalance = settings.tuningParameters.leastShardAllocationMaxSimultaneousRebalance,
waitingForStateTimeout = settings.tuningParameters.waitingForStateTimeout,
updatingStateTimeout = settings.tuningParameters.updatingStateTimeout,
entityRecoveryStrategy = settings.tuningParameters.entityRecoveryStrategy,
entityRecoveryConstantRateStrategyFrequency = settings.tuningParameters.entityRecoveryConstantRateStrategyFrequency,
entityRecoveryConstantRateStrategyNumberOfEntities = settings.tuningParameters.entityRecoveryConstantRateStrategyNumberOfEntities
),
new UntypedClusterSingletonManagerSettings(
settings.coordinatorSingletonSettings.singletonName,
settings.coordinatorSingletonSettings.role,
settings.coordinatorSingletonSettings.removalMargin,
settings.coordinatorSingletonSettings.handOverRetryInterval
))
}
private def roleOption(role: String): Option[String] =
if (role == "" || role == null) None else Option(role)
sealed trait StateStoreMode { def name: String }
object StateStoreMode {
def byName(name: String): StateStoreMode =
if (name == StateStoreModePersistence.name) StateStoreModePersistence
else if (name == StateStoreModeDData.name) StateStoreModeDData
else throw new IllegalArgumentException("Not recognized StateStoreMode, only 'persistence' and 'ddata' are supported.")
}
final case object StateStoreModePersistence extends StateStoreMode { override def name = "persistence" }
final case object StateStoreModeDData extends StateStoreMode { override def name = "ddata" }
// generated using kaze-class
final class TuningParameters private (
val bufferSize: Int,
val coordinatorFailureBackoff: FiniteDuration,
val entityRecoveryConstantRateStrategyFrequency: FiniteDuration,
val entityRecoveryConstantRateStrategyNumberOfEntities: Int,
val entityRecoveryStrategy: String,
val entityRestartBackoff: FiniteDuration,
val handOffTimeout: FiniteDuration,
val keepNrOfBatches: Int,
val leastShardAllocationMaxSimultaneousRebalance: Int,
val leastShardAllocationRebalanceThreshold: Int,
val rebalanceInterval: FiniteDuration,
val retryInterval: FiniteDuration,
val shardFailureBackoff: FiniteDuration,
val shardStartTimeout: FiniteDuration,
val snapshotAfter: Int,
val updatingStateTimeout: FiniteDuration,
val waitingForStateTimeout: FiniteDuration) {
def this(untyped: UntypedShardingSettings.TuningParameters) {
this(
bufferSize = untyped.bufferSize,
coordinatorFailureBackoff = untyped.coordinatorFailureBackoff,
retryInterval = untyped.retryInterval,
handOffTimeout = untyped.handOffTimeout,
shardStartTimeout = untyped.shardStartTimeout,
shardFailureBackoff = untyped.shardFailureBackoff,
entityRestartBackoff = untyped.entityRestartBackoff,
rebalanceInterval = untyped.rebalanceInterval,
snapshotAfter = untyped.snapshotAfter,
keepNrOfBatches = untyped.keepNrOfBatches,
leastShardAllocationRebalanceThreshold = untyped.leastShardAllocationRebalanceThreshold, // TODO extract it a bit
leastShardAllocationMaxSimultaneousRebalance = untyped.leastShardAllocationMaxSimultaneousRebalance,
waitingForStateTimeout = untyped.waitingForStateTimeout,
updatingStateTimeout = untyped.updatingStateTimeout,
entityRecoveryStrategy = untyped.entityRecoveryStrategy,
entityRecoveryConstantRateStrategyFrequency = untyped.entityRecoveryConstantRateStrategyFrequency,
entityRecoveryConstantRateStrategyNumberOfEntities = untyped.entityRecoveryConstantRateStrategyNumberOfEntities
)
}
require(
entityRecoveryStrategy == "all" || entityRecoveryStrategy == "constant",
s"Unknown 'entity-recovery-strategy' [$entityRecoveryStrategy], valid values are 'all' or 'constant'")
def withBufferSize(value: Int): TuningParameters = copy(bufferSize = value)
def withCoordinatorFailureBackoff(value: FiniteDuration): TuningParameters = copy(coordinatorFailureBackoff = value)
def withEntityRecoveryConstantRateStrategyFrequency(value: FiniteDuration): TuningParameters = copy(entityRecoveryConstantRateStrategyFrequency = value)
def withEntityRecoveryConstantRateStrategyNumberOfEntities(value: Int): TuningParameters = copy(entityRecoveryConstantRateStrategyNumberOfEntities = value)
def withEntityRecoveryStrategy(value: java.lang.String): TuningParameters = copy(entityRecoveryStrategy = value)
def withEntityRestartBackoff(value: FiniteDuration): TuningParameters = copy(entityRestartBackoff = value)
def withHandOffTimeout(value: FiniteDuration): TuningParameters = copy(handOffTimeout = value)
def withKeepNrOfBatches(value: Int): TuningParameters = copy(keepNrOfBatches = value)
def withLeastShardAllocationMaxSimultaneousRebalance(value: Int): TuningParameters = copy(leastShardAllocationMaxSimultaneousRebalance = value)
def withLeastShardAllocationRebalanceThreshold(value: Int): TuningParameters = copy(leastShardAllocationRebalanceThreshold = value)
def withRebalanceInterval(value: FiniteDuration): TuningParameters = copy(rebalanceInterval = value)
def withRetryInterval(value: FiniteDuration): TuningParameters = copy(retryInterval = value)
def withShardFailureBackoff(value: FiniteDuration): TuningParameters = copy(shardFailureBackoff = value)
def withShardStartTimeout(value: FiniteDuration): TuningParameters = copy(shardStartTimeout = value)
def withSnapshotAfter(value: Int): TuningParameters = copy(snapshotAfter = value)
def withUpdatingStateTimeout(value: FiniteDuration): TuningParameters = copy(updatingStateTimeout = value)
def withWaitingForStateTimeout(value: FiniteDuration): TuningParameters = copy(waitingForStateTimeout = value)
private def copy(
bufferSize: Int = bufferSize,
coordinatorFailureBackoff: FiniteDuration = coordinatorFailureBackoff,
entityRecoveryConstantRateStrategyFrequency: FiniteDuration = entityRecoveryConstantRateStrategyFrequency,
entityRecoveryConstantRateStrategyNumberOfEntities: Int = entityRecoveryConstantRateStrategyNumberOfEntities,
entityRecoveryStrategy: java.lang.String = entityRecoveryStrategy,
entityRestartBackoff: FiniteDuration = entityRestartBackoff,
handOffTimeout: FiniteDuration = handOffTimeout,
keepNrOfBatches: Int = keepNrOfBatches,
leastShardAllocationMaxSimultaneousRebalance: Int = leastShardAllocationMaxSimultaneousRebalance,
leastShardAllocationRebalanceThreshold: Int = leastShardAllocationRebalanceThreshold,
rebalanceInterval: FiniteDuration = rebalanceInterval,
retryInterval: FiniteDuration = retryInterval,
shardFailureBackoff: FiniteDuration = shardFailureBackoff,
shardStartTimeout: FiniteDuration = shardStartTimeout,
snapshotAfter: Int = snapshotAfter,
updatingStateTimeout: FiniteDuration = updatingStateTimeout,
waitingForStateTimeout: FiniteDuration = waitingForStateTimeout): TuningParameters = new TuningParameters(
bufferSize = bufferSize,
coordinatorFailureBackoff = coordinatorFailureBackoff,
entityRecoveryConstantRateStrategyFrequency = entityRecoveryConstantRateStrategyFrequency,
entityRecoveryConstantRateStrategyNumberOfEntities = entityRecoveryConstantRateStrategyNumberOfEntities,
entityRecoveryStrategy = entityRecoveryStrategy,
entityRestartBackoff = entityRestartBackoff,
handOffTimeout = handOffTimeout,
keepNrOfBatches = keepNrOfBatches,
leastShardAllocationMaxSimultaneousRebalance = leastShardAllocationMaxSimultaneousRebalance,
leastShardAllocationRebalanceThreshold = leastShardAllocationRebalanceThreshold,
rebalanceInterval = rebalanceInterval,
retryInterval = retryInterval,
shardFailureBackoff = shardFailureBackoff,
shardStartTimeout = shardStartTimeout,
snapshotAfter = snapshotAfter,
updatingStateTimeout = updatingStateTimeout,
waitingForStateTimeout = waitingForStateTimeout)
override def toString =
s"""TuningParameters(${bufferSize},${coordinatorFailureBackoff},${entityRecoveryConstantRateStrategyFrequency},${entityRecoveryConstantRateStrategyNumberOfEntities},${entityRecoveryStrategy},${entityRestartBackoff},${handOffTimeout},${keepNrOfBatches},${leastShardAllocationMaxSimultaneousRebalance},${leastShardAllocationRebalanceThreshold},${rebalanceInterval},${retryInterval},${shardFailureBackoff},${shardStartTimeout},${snapshotAfter},${updatingStateTimeout},${waitingForStateTimeout})"""
}
}
/**
* @param role specifies that this entity type requires cluster nodes with a specific role.
* If the role is not specified all nodes in the cluster are used.
* @param rememberEntities true if active entity actors shall be automatically restarted upon `Shard`
* restart. i.e. if the `Shard` is started on a different `ShardRegion` due to rebalance or crash.
* @param journalPluginId Absolute path to the journal plugin configuration entity that is to
* be used for the internal persistence of ClusterSharding. If not defined the default
* journal plugin is used. Note that this is not related to persistence used by the entity
* actors.
* @param snapshotPluginId Absolute path to the snapshot plugin configuration entity that is to
* be used for the internal persistence of ClusterSharding. If not defined the default
* snapshot plugin is used. Note that this is not related to persistence used by the entity
* actors.
* @param tuningParameters additional tuning parameters, see descriptions in reference.conf
*/
final class ClusterShardingSettings(
val role: Option[String],
val rememberEntities: Boolean,
val journalPluginId: String,
val snapshotPluginId: String,
val stateStoreMode: ClusterShardingSettings.StateStoreMode,
val tuningParameters: ClusterShardingSettings.TuningParameters,
val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded {
import akka.typed.cluster.sharding.ClusterShardingSettings.{ StateStoreModeDData, StateStoreModePersistence }
require(
stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData,
s"Unknown 'state-store-mode' [$stateStoreMode], " +
s"valid values are '${StateStoreModeDData.name}' or '${StateStoreModePersistence.name}'")
/** If true, this node should run the shard region, otherwise just a shard proxy should started on this node. */
@InternalApi
private[akka] def shouldHostShard(cluster: Cluster): Boolean =
role.isEmpty || cluster.selfMember.roles(role.get)
def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role))
def withRole(role: Option[String]): ClusterShardingSettings = copy(role = role)
def withRememberEntities(rememberEntities: Boolean): ClusterShardingSettings =
copy(rememberEntities = rememberEntities)
def withJournalPluginId(journalPluginId: String): ClusterShardingSettings =
copy(journalPluginId = journalPluginId)
def withSnapshotPluginId(snapshotPluginId: String): ClusterShardingSettings =
copy(snapshotPluginId = snapshotPluginId)
def withTuningParameters(tuningParameters: ClusterShardingSettings.TuningParameters): ClusterShardingSettings =
copy(tuningParameters = tuningParameters)
def withStateStoreMode(stateStoreMode: ClusterShardingSettings.StateStoreMode): ClusterShardingSettings =
copy(stateStoreMode = stateStoreMode)
/**
* The `role` of the `ClusterSingletonManagerSettings` is not used. The `role` of the
* coordinator singleton will be the same as the `role` of `ClusterShardingSettings`.
*/
def withCoordinatorSingletonSettings(coordinatorSingletonSettings: ClusterSingletonManagerSettings): ClusterShardingSettings =
copy(coordinatorSingletonSettings = coordinatorSingletonSettings)
private def copy(
role: Option[String] = role,
rememberEntities: Boolean = rememberEntities,
journalPluginId: String = journalPluginId,
snapshotPluginId: String = snapshotPluginId,
stateStoreMode: ClusterShardingSettings.StateStoreMode = stateStoreMode,
tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters,
coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings =
new ClusterShardingSettings(
role,
rememberEntities,
journalPluginId,
snapshotPluginId,
stateStoreMode,
tuningParameters,
coordinatorSingletonSettings)
}

View file

@ -0,0 +1,129 @@
/*
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.typed.cluster.sharding
import akka.actor.{ InternalActorRef, Scheduler }
import akka.annotation.InternalApi
import akka.pattern.{ AskTimeoutException, PromiseActorRef }
import akka.typed.ActorRef
import akka.typed.scaladsl.AskPattern
import akka.typed.scaladsl.AskPattern.PromiseRef
import akka.util.Timeout
import scala.concurrent.Future
/**
* A reference to an sharded Entity, which allows `ActorRef`-like usage.
*
* An [[EntityRef]] is NOT an [[ActorRef]]by designin order to be explicit about the fact that the life-cycle
* of a sharded Entity is very different than a plain Actors. Most notably, this is shown by features of Entities
* such as re-balancing (an active Entity to a different node) or passivation. Both of which are aimed to be completely
* transparent to users of such Entity. In other words, if this were to be a plain ActorRef, it would be possible to
* apply DeathWatch to it, which in turn would then trigger when the sharded Actor stopped, breaking the illusion that
* Entity refs are "always there". Please note that while not encouraged, it is possible to expose an Actor's `self`
* [[ActorRef]] and watch it in case such notification is desired.
*/
trait EntityRef[A] {
/**
* Send a message to the entity referenced by this EntityRef using *at-most-once*
* messaging semantics.
*/
def tell(msg: A): Unit
/**
* Allows to "ask" the [[EntityRef]] for a reply.
* See [[akka.typed.scaladsl.AskPattern]] for a complete write-up of this pattern
*
* Example usage:
* {{{
* case class Request(msg: String, replyTo: ActorRef[Reply])
* case class Reply(msg: String)
*
* implicit val timeout = Timeout(3.seconds)
* val target: EntityRef[Request] = ...
* val f: Future[Reply] = target ? (Request("hello", _))
* }}}
*
* Please note that an implicit [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern.
*/
def ask[U](f: ActorRef[U] A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U]
}
object EntityRef {
implicit final class EntityRefOps[A](val ref: EntityRef[A]) extends AnyVal {
/**
* Send a message to the Actor referenced by this ActorRef using *at-most-once*
* messaging semantics.
*/
def !(msg: A): Unit = ref.tell(msg)
/**
* Allows to "ask" the [[EntityRef]] for a reply.
* See [[akka.typed.scaladsl.AskPattern]] for a complete write-up of this pattern
*
* Example usage:
* {{{
* case class Request(msg: String, replyTo: ActorRef[Reply])
* case class Reply(msg: String)
*
* implicit val timeout = Timeout(3.seconds)
* val target: EntityRef[Request] = ...
* val f: Future[Reply] = target ? (Request("hello", _))
* }}}
*
* Please note that an implicit [[akka.util.Timeout]] and [[akka.actor.Scheduler]] must be available to use this pattern.
*/
def ?[U](f: ActorRef[U] A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] =
ref.ask(f)(timeout, scheduler)
}
}
@InternalApi
private[akka] final class AdaptedEntityRefImpl[A](shardRegion: akka.actor.ActorRef, entityId: String) extends EntityRef[A] {
import akka.pattern.ask
override def tell(msg: A): Unit =
shardRegion ! ShardingEnvelope(entityId, msg)
override def ask[U](f: (ActorRef[U]) A)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = {
import akka.typed._
val p = new EntityPromiseRef[U](shardRegion.asInstanceOf[InternalActorRef], timeout)
val m = f(p.ref)
if (p.promiseRef ne null) p.promiseRef.messageClassName = m.getClass.getName
shardRegion ! ShardingEnvelope(entityId, m)
p.future
}
/** Similar to [[akka.typed.scaladsl.AskPattern.PromiseRef]] but for an [[EntityRef]] target. */
@InternalApi
private final class EntityPromiseRef[U](untyped: InternalActorRef, timeout: Timeout) {
import akka.typed.internal.{ adapter adapt }
// Note: _promiseRef mustn't have a type pattern, since it can be null
private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) =
if (untyped.isTerminated)
(
adapt.ActorRefAdapter[U](untyped.provider.deadLetters),
Future.failed[U](new AskTimeoutException(s"Recipient[$untyped] had already been terminated.")),
null)
else if (timeout.duration.length <= 0)
(
adapt.ActorRefAdapter[U](untyped.provider.deadLetters),
Future.failed[U](new IllegalArgumentException(s"Timeout length must be positive, question not sent to [$untyped]")),
null
)
else {
val a = PromiseActorRef(untyped.provider, timeout, untyped, "unknown")
val b = adapt.ActorRefAdapter[U](a)
(b, a.result.future.asInstanceOf[Future[U]], a)
}
val ref: ActorRef[U] = _ref
val future: Future[U] = _future
val promiseRef: PromiseActorRef = _promiseRef
}
}

View file

@ -12,11 +12,14 @@ import akka.actor.Scheduler
import akka.typed.internal.FunctionRef import akka.typed.internal.FunctionRef
import akka.actor.RootActorPath import akka.actor.RootActorPath
import akka.actor.Address import akka.actor.Address
import akka.annotation.InternalApi
import akka.typed.ActorRef import akka.typed.ActorRef
import akka.typed.internal.{ adapter adapt } import akka.typed.internal.{ adapter adapt }
/** /**
* The ask-pattern implements the initiator side of a requestreply protocol. * The ask-pattern implements the initiator side of a requestreply protocol.
* The `?` operator is pronounced as "ask".
*
* The party that asks may be within or without an Actor, since the * The party that asks may be within or without an Actor, since the
* implementation will fabricate a (hidden) [[ActorRef]] that is bound to a * implementation will fabricate a (hidden) [[ActorRef]] that is bound to a
* [[scala.concurrent.Promise]]. This ActorRef will need to be injected in the * [[scala.concurrent.Promise]]. This ActorRef will need to be injected in the
@ -36,6 +39,27 @@ import akka.typed.internal.{ adapter ⇒ adapt }
*/ */
object AskPattern { object AskPattern {
implicit class Askable[T](val ref: ActorRef[T]) extends AnyVal { implicit class Askable[T](val ref: ActorRef[T]) extends AnyVal {
/**
* The ask-pattern implements the initiator side of a requestreply protocol.
* The `?` operator is pronounced as "ask".
*
* The party that asks may be within or without an Actor, since the
* implementation will fabricate a (hidden) [[ActorRef]] that is bound to a
* [[scala.concurrent.Promise]]. This ActorRef will need to be injected in the
* message that is sent to the target Actor in order to function as a reply-to
* address, therefore the argument to the ask / `?`
* operator is not the message itself but a function that given the reply-to
* address will create the message.
*
* {{{
* case class Request(msg: String, replyTo: ActorRef[Reply])
* case class Reply(msg: String)
*
* implicit val timeout = Timeout(3.seconds)
* val target: ActorRef[Request] = ...
* val f: Future[Reply] = target ? (Request("hello", _))
* }}}
*/
def ?[U](f: ActorRef[U] T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = def ?[U](f: ActorRef[U] T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] =
ref match { ref match {
case a: adapt.ActorRefAdapter[_] askUntyped(ref, a.untyped, timeout, f) case a: adapt.ActorRefAdapter[_] askUntyped(ref, a.untyped, timeout, f)
@ -44,7 +68,7 @@ object AskPattern {
} }
} }
private class PromiseRef[U](target: ActorRef[_], untyped: InternalActorRef, timeout: Timeout) { private final class PromiseRef[U](target: ActorRef[_], untyped: InternalActorRef, timeout: Timeout) {
// Note: _promiseRef mustn't have a type pattern, since it can be null // Note: _promiseRef mustn't have a type pattern, since it can be null
private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) = private[this] val (_ref: ActorRef[U], _future: Future[U], _promiseRef) =