rename ClusterSharding start to init, #25711 (#25867)

* rename ClusterSharding start to init, #25711

* fix test
This commit is contained in:
Patrik Nordwall 2018-11-05 18:25:18 +01:00
parent 9c1153b1a6
commit 4d115f19a6
14 changed files with 53 additions and 57 deletions

View file

@ -133,7 +133,7 @@ import akka.util.Timeout
private val shardCommandActors: ConcurrentHashMap[String, ActorRef[scaladsl.ClusterSharding.ShardCommand]] = new ConcurrentHashMap
// scaladsl impl
override def start[M, E](entity: scaladsl.Entity[M, E]): ActorRef[E] = {
override def init[M, E](entity: scaladsl.Entity[M, E]): ActorRef[E] = {
val settings = entity.settings match {
case None ClusterShardingSettings(system)
case Some(s) s
@ -144,14 +144,14 @@ import akka.util.Timeout
case Some(e) e
}).asInstanceOf[ShardingMessageExtractor[E, M]]
internalStart(entity.createBehavior, entity.entityProps, entity.typeKey,
internalInit(entity.createBehavior, entity.entityProps, entity.typeKey,
entity.stopMessage, settings, extractor, entity.allocationStrategy)
}
// javadsl impl
override def start[M, E](entity: javadsl.Entity[M, E]): ActorRef[E] = {
override def init[M, E](entity: javadsl.Entity[M, E]): ActorRef[E] = {
import scala.compat.java8.OptionConverters._
start(new scaladsl.Entity(
init(new scaladsl.Entity(
createBehavior = (ctx: EntityContext) Behaviors.setup[M] { actorContext
entity.createBehavior(
new javadsl.EntityContext[M](ctx.entityId, ctx.shard, actorContext.asJava))
@ -165,7 +165,7 @@ import akka.util.Timeout
))
}
private def internalStart[M, E](
private def internalInit[M, E](
behavior: EntityContext Behavior[M],
entityProps: Props,
typeKey: scaladsl.EntityTypeKey[M],
@ -241,7 +241,7 @@ import akka.util.Timeout
typeNames.putIfAbsent(typeKey.name, messageClassName) match {
case spawnedMessageClassName: String if messageClassName != spawnedMessageClassName
throw new IllegalArgumentException(s"[${typeKey.name}] already started for [$spawnedMessageClassName]")
throw new IllegalArgumentException(s"[${typeKey.name}] already initialized for [$spawnedMessageClassName]")
case _ ()
}

View file

@ -7,7 +7,6 @@ package javadsl
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.BiFunction
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
@ -72,7 +71,7 @@ object ClusterSharding {
* to route the message with the entity id to the final destination.
*
* This extension is supposed to be used by first, typically at system startup on each node
* in the cluster, registering the supported entity types with the [[ClusterSharding#spawn]]
* in the cluster, registering the supported entity types with the [[ClusterSharding#init]]
* method, which returns the `ShardRegion` actor reference for a named entity type.
* Messages to the entities are always sent via that `ActorRef`, i.e. the local `ShardRegion`.
* Messages can also be sent via the [[EntityRef]] retrieved with [[ClusterSharding#entityRefFor]],
@ -176,7 +175,7 @@ abstract class ClusterSharding {
* @tparam M The type of message the entity accepts
* @tparam E A possible envelope around the message the entity accepts
*/
def start[M, E](entity: Entity[M, E]): ActorRef[E]
def init[M, E](entity: Entity[M, E]): ActorRef[E]
/**
* Create an `ActorRef`-like reference to a specific sharded entity.
@ -199,7 +198,7 @@ abstract class ClusterSharding {
object Entity {
/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional
* Defines how the entity should be created. Used in [[ClusterSharding#init]]. More optional
* settings can be defined using the `with` methods of the returned [[Entity]].
*
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent actors
@ -217,7 +216,7 @@ object Entity {
}
/**
* Defines how the [[PersistentEntity]] should be created. Used in [[ClusterSharding#start]]. Any [[Behavior]] can
* Defines how the [[PersistentEntity]] should be created. Used in [[ClusterSharding#init]]. Any [[Behavior]] can
* be used as a sharded entity actor, but the combination of sharding and persistent actors is very common
* and therefore this factory is provided as convenience.
*
@ -246,7 +245,7 @@ object Entity {
}
/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]].
* Defines how the entity should be created. Used in [[ClusterSharding#init]].
*/
final class Entity[M, E] private[akka] (
val createBehavior: JFunction[EntityContext[M], Behavior[M]],

View file

@ -19,7 +19,6 @@ import akka.actor.typed.ExtensionSetup
import akka.actor.typed.RecipientRef
import akka.actor.typed.Props
import akka.actor.typed.internal.InternalRecipientRef
import akka.actor.typed.scaladsl.ActorContext
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
@ -27,7 +26,6 @@ import akka.cluster.sharding.typed.internal.ClusterShardingImpl
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.cluster.sharding.ShardRegion.{ StartEntity UntypedStartEntity }
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.PersistentBehavior
object ClusterSharding extends ExtensionId[ClusterSharding] {
@ -74,7 +72,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] {
* to route the message with the entity id to the final destination.
*
* This extension is supposed to be used by first, typically at system startup on each node
* in the cluster, registering the supported entity types with the [[ClusterSharding#spawn]]
* in the cluster, registering the supported entity types with the [[ClusterSharding#init]]
* method, which returns the `ShardRegion` actor reference for a named entity type.
* Messages to the entities are always sent via that `ActorRef`, i.e. the local `ShardRegion`.
* Messages can also be sent via the [[EntityRef]] retrieved with [[ClusterSharding#entityRefFor]],
@ -178,7 +176,7 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
* @tparam M The type of message the entity accepts
* @tparam E A possible envelope around the message the entity accepts
*/
def start[M, E](entity: Entity[M, E]): ActorRef[E]
def init[M, E](entity: Entity[M, E]): ActorRef[E]
/**
* Create an `ActorRef`-like reference to a specific sharded entity.
@ -206,12 +204,12 @@ trait ClusterSharding extends Extension { javadslSelf: javadsl.ClusterSharding
object Entity {
/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]]. More optional
* Defines how the entity should be created. Used in [[ClusterSharding#init]]. More optional
* settings can be defined using the `with` methods of the returned [[Entity]].
*
* Any [[Behavior]] can be used as a sharded entity actor, but the combination of sharding and persistent actors
* is very common and therefore [[PersistentEntity]] is provided as a convenience for creating such
* [[PersistentBehavior]].
* `PersistentBehavior`.
*
* @param typeKey A key that uniquely identifies the type of entity in this cluster
* @param createBehavior Create the behavior for an entity given a [[EntityContext]] (includes entityId)
@ -224,7 +222,7 @@ object Entity {
}
/**
* Defines how the entity should be created. Used in [[ClusterSharding#start]].
* Defines how the entity should be created. Used in [[ClusterSharding#init]].
*/
final class Entity[M, E] private[akka] (
val createBehavior: EntityContext Behavior[M],

View file

@ -67,14 +67,14 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh
formCluster(first, second, third, fourth)
}
"start sharding" in {
"init sharding" in {
val sharding = ClusterSharding(typedSystem)
val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.start(
val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.init(
Entity(typeKey, _ multiDcPinger))
val probe = TestProbe[Pong]
shardRegion ! ShardingEnvelope(entityId, Ping(probe.ref))
probe.expectMessage(max = 10.seconds, Pong(cluster.selfMember.dataCenter))
enterBarrier("sharding-started")
enterBarrier("sharding-initialized")
}
"be able to message via entity ref" in {
@ -96,7 +96,7 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh
"be able to message cross dc via proxy" in {
runOn(first, second) {
val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).start(
val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).init(
Entity(
typeKey,
_ multiDcPinger)

View file

@ -21,7 +21,6 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import static org.junit.Assert.assertEquals;
@ -135,7 +134,7 @@ public class ClusterShardingPersistenceTest extends JUnitSuite {
ClusterSharding sharding = ClusterSharding.get(testKit.system());
sharding.start(Entity.ofPersistentEntity(TestPersistentEntity.ENTITY_TYPE_KEY,
sharding.init(Entity.ofPersistentEntity(TestPersistentEntity.ENTITY_TYPE_KEY,
entityContext -> new TestPersistentEntity(entityContext.getEntityId()))
.withStopMessage(StopPlz.INSTANCE));

View file

@ -42,7 +42,7 @@ public class HelloWorldPersistentEntityExample {
this.system = system;
sharding = ClusterSharding.get(system);
sharding.start(
sharding.init(
Entity.ofPersistentEntity(
HelloWorld.ENTITY_TYPE_KEY,
ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId())));

View file

@ -41,7 +41,7 @@ public class HelloWorldPersistentEntityExampleTest extends JUnitSuite {
cluster.manager().tell(new Join(cluster.selfMember().address()));
ClusterSharding sharding = ClusterSharding.get(testKit.system());
sharding.start(
sharding.init(
Entity.ofPersistentEntity(
HelloWorld.ENTITY_TYPE_KEY,
ctx -> new HelloWorld(ctx.getActorContext(), ctx.getEntityId())));

View file

@ -89,22 +89,22 @@ public class ShardingCompileOnlyTest {
}
//#counter-passivate
public static void startPassivateExample() {
public static void initPassivateExample() {
ActorSystem system = ActorSystem.create(
Behaviors.empty(), "ShardingExample"
);
ClusterSharding sharding = ClusterSharding.get(system);
//#counter-passivate-start
//#counter-passivate-init
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
sharding.start(
sharding.init(
Entity.of(
typeKey,
ctx -> counter2(ctx.getShard(), ctx.getEntityId()))
.withStopMessage(new GoodByeCounter()));
//#counter-passivate-start
//#counter-passivate-init
}
public static void example() {
@ -117,14 +117,14 @@ public class ShardingCompileOnlyTest {
ClusterSharding sharding = ClusterSharding.get(system);
//#sharding-extension
//#start
//#init
EntityTypeKey<CounterCommand> typeKey = EntityTypeKey.create(CounterCommand.class, "Counter");
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.start(
ActorRef<ShardingEnvelope<CounterCommand>> shardRegion = sharding.init(
Entity.of(
typeKey,
ctx -> counter(ctx.getEntityId(),0)));
//#start
//#init
//#send
EntityRef<CounterCommand> counterOne = sharding.entityRefFor(typeKey, "counter-`");
@ -143,7 +143,7 @@ public class ShardingCompileOnlyTest {
//#persistence
EntityTypeKey<BlogCommand> blogTypeKey = EntityTypeKey.create(BlogCommand.class, "BlogPost");
sharding.start(
sharding.init(
Entity.of(
blogTypeKey,
ctx -> BlogBehavior.behavior(ctx.getEntityId())));

View file

@ -139,7 +139,7 @@ class ClusterShardingPersistenceSpec extends ScalaTestWithActorTestKit(ClusterSh
"Typed cluster sharding with persistent actor" must {
ClusterSharding(system).start(Entity(
ClusterSharding(system).init(Entity(
typeKey,
ctx persistentEntity(ctx.entityId, ctx.shard)))

View file

@ -188,17 +188,17 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
Behaviors.same
}
private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(Entity(
private val shardingRef1: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.init(Entity(
typeKey,
ctx behavior(ctx.shard))
.withStopMessage(StopPlz()))
private val shardingRef2 = sharding2.start(Entity(
private val shardingRef2 = sharding2.init(Entity(
typeKey,
ctx behavior(ctx.shard))
.withStopMessage(StopPlz()))
private val shardingRef3: ActorRef[IdTestProtocol] = sharding.start(Entity(
private val shardingRef3: ActorRef[IdTestProtocol] = sharding.init(Entity(
typeKey2,
ctx behaviorWithId(ctx.shard))
.withMessageExtractor(ShardingMessageExtractor.noEnvelope[IdTestProtocol](10, IdStopPlz()) {
@ -209,7 +209,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
.withStopMessage(IdStopPlz())
)
private val shardingRef4 = sharding2.start(Entity(
private val shardingRef4 = sharding2.init(Entity(
typeKey2,
ctx behaviorWithId(ctx.shard))
.withMessageExtractor(
@ -268,7 +268,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
val p = TestProbe[String]()
val typeKey3 = EntityTypeKey[TestProtocol]("passivate-test")
val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(Entity(
val shardingRef3: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.init(Entity(
typeKey3,
ctx behavior(ctx.shard, Some(stopProbe.ref)))
.withStopMessage(StopPlz()))
@ -289,7 +289,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
val p = TestProbe[String]()
val typeKey4 = EntityTypeKey[TestProtocol]("passivate-test-poison")
val shardingRef4: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.start(Entity(
val shardingRef4: ActorRef[ShardingEnvelope[TestProtocol]] = sharding.init(Entity(
typeKey4,
ctx behavior(ctx.shard, Some(stopProbe.ref))))
// no StopPlz stopMessage
@ -305,16 +305,16 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
p.expectMessage("Hello!")
}
"fail if starting sharding for already used typeName, but with a different type" in {
// sharding has been already started with EntityTypeKey[TestProtocol]("envelope-shard")
"fail if init sharding for already used typeName, but with a different type" in {
// sharding has been already initialized with EntityTypeKey[TestProtocol]("envelope-shard")
val ex = intercept[Exception] {
sharding.start(Entity(
sharding.init(Entity(
EntityTypeKey[IdTestProtocol]("envelope-shard"),
ctx behaviorWithId(ctx.shard))
.withStopMessage(IdStopPlz()))
}
ex.getMessage should include("already started")
ex.getMessage should include("already initialized")
}
"EntityRef - tell" in {
@ -374,7 +374,7 @@ class ClusterShardingSpec extends ScalaTestWithActorTestKit(ClusterShardingSpec.
"EntityRef - AskTimeoutException" in {
val ignorantKey = EntityTypeKey[TestProtocol]("ignorant")
sharding.start(Entity(
sharding.init(Entity(
ignorantKey,
_ Behaviors.ignore[TestProtocol])
.withStopMessage(StopPlz()))

View file

@ -23,7 +23,7 @@ object HelloWorldPersistentEntityExample {
// registration at startup
private val sharding = ClusterSharding(system)
sharding.start(Entity(
sharding.init(Entity(
typeKey = HelloWorld.entityTypeKey,
createBehavior = entityContext HelloWorld.persistentEntity(entityContext.entityId)))

View file

@ -35,7 +35,7 @@ class HelloWorldPersistentEntityExampleSpec extends ScalaTestWithActorTestKit(He
super.beforeAll()
Cluster(system).manager ! Join(Cluster(system).selfMember.address)
sharding.start(Entity(
sharding.init(Entity(
HelloWorld.entityTypeKey,
ctx HelloWorld.persistentEntity(ctx.entityId)))
}

View file

@ -43,13 +43,13 @@ object ShardingCompileOnlySpec {
}
//#counter
//#start
//#init
val TypeKey = EntityTypeKey[CounterCommand]("Counter")
val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.start(Entity(
val shardRegion: ActorRef[ShardingEnvelope[CounterCommand]] = sharding.init(Entity(
typeKey = TypeKey,
createBehavior = ctx counter(ctx.entityId, 0)))
//#start
//#init
//#send
// With an EntityRef
@ -64,7 +64,7 @@ object ShardingCompileOnlySpec {
//#persistence
val BlogTypeKey = EntityTypeKey[BlogCommand]("BlogPost")
ClusterSharding(system).start(Entity(
ClusterSharding(system).init(Entity(
typeKey = BlogTypeKey,
createBehavior = ctx behavior(ctx.entityId)))
//#persistence
@ -98,7 +98,7 @@ object ShardingCompileOnlySpec {
}
}
sharding.start(Entity(
sharding.init(Entity(
typeKey = TypeKey,
createBehavior = ctx counter2(ctx.shard, ctx.entityId))
.withStopMessage(GoodByeCounter))

View file

@ -45,10 +45,10 @@ Java
Each Entity type has a key that is then used to retrieve an EntityRef for a given entity identifier.
Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #start }
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #init }
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #start }
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #init }
Messages to a specific entity are then sent via an EntityRef.
It is also possible to wrap methods in a `ShardingEnvelop` or define extractor functions and send messages directly to the shard region.
@ -115,7 +115,7 @@ Scala
: @@snip [ShardingCompileOnlySpec.scala](/akka-cluster-sharding-typed/src/test/scala/docs/akka/cluster/sharding/typed/ShardingCompileOnlySpec.scala) { #counter-messages #counter-passivate }
Java
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter-passivate #counter-passivate-start }
: @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter-passivate #counter-passivate-init }
Note that in the above example the `stopMessage` is specified as `GoodByeCounter`. That message will be sent to
the entity when it's supposed to stop itself due to rebalance or passivation. If the `stopMessage` is not defined