diff --git a/akka-contrib/docs/cluster-sharding.rst b/akka-contrib/docs/cluster-sharding.rst index 8e799607aa..924f70a673 100644 --- a/akka-contrib/docs/cluster-sharding.rst +++ b/akka-contrib/docs/cluster-sharding.rst @@ -35,7 +35,7 @@ its state if it is valuable. When using the sharding extension you are first, typically at system startup on each node in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start`` -method. +method. ``ClusterSharding.start`` gives you the reference which you can pass along. .. includecode:: @contribSrc@/src/test/java/akka/contrib/pattern/ClusterShardingTest.java#counter-start @@ -79,7 +79,7 @@ its state if it is valuable. When using the sharding extension you are first, typically at system startup on each node in the cluster, supposed to register the supported entry types with the ``ClusterSharding.start`` -method. +method. ``ClusterSharding.start`` gives you the reference which you can pass along. .. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala#counter-start diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala index 292e22d976..18db79b17a 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala @@ -215,18 +215,20 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * that passed the `idExtractor` will be used * @param allocationStrategy possibility to use a custom shard allocation and * rebalancing logic + * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard */ def start( typeName: String, entryProps: Option[Props], idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, - allocationStrategy: ShardAllocationStrategy): Unit = { + allocationStrategy: ShardAllocationStrategy): ActorRef = { implicit val timeout = system.settings.CreationTimeout val startMsg = Start(typeName, entryProps, idExtractor, shardResolver, allocationStrategy) val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) regions.put(typeName, shardRegion) + shardRegion } /** @@ -250,12 +252,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * be `unhandled`, i.e. posted as `Unhandled` messages on the event stream * @param shardResolver function to determine the shard id for an incoming message, only messages * that passed the `idExtractor` will be used + * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard */ def start( typeName: String, entryProps: Option[Props], idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver): Unit = { + shardResolver: ShardRegion.ShardResolver): ActorRef = { start(typeName, entryProps, idExtractor, shardResolver, new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance)) @@ -278,12 +281,13 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * entry from the incoming message * @param allocationStrategy possibility to use a custom shard allocation and * rebalancing logic + * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard */ def start( typeName: String, entryProps: Props, messageExtractor: ShardRegion.MessageExtractor, - allocationStrategy: ShardAllocationStrategy): Unit = { + allocationStrategy: ShardAllocationStrategy): ActorRef = { start(typeName, entryProps = Option(entryProps), idExtractor = { @@ -312,11 +316,12 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * entry actors itself * @param messageExtractor functions to extract the entry id, shard id, and the message to send to the * entry from the incoming message + * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard */ def start( typeName: String, entryProps: Props, - messageExtractor: ShardRegion.MessageExtractor): Unit = { + messageExtractor: ShardRegion.MessageExtractor): ActorRef = { start(typeName, entryProps, messageExtractor, new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance)) diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala index ddbf424804..a4aa2b2522 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala @@ -11,7 +11,6 @@ import akka.actor.Identify import akka.actor.PoisonPill import akka.actor.Props import akka.cluster.Cluster -import akka.cluster.ClusterEvent._ import akka.persistence.EventsourcedProcessor import akka.persistence.Persistence import akka.persistence.journal.leveldb.SharedLeveldbJournal @@ -460,7 +459,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult "easy to use with extensions" in within(50.seconds) { runOn(third, fourth, fifth, sixth) { //#counter-start - ClusterSharding(system).start( + val counterRegion: ActorRef = ClusterSharding(system).start( typeName = "Counter", entryProps = Some(Props[Counter]), idExtractor = idExtractor, @@ -502,6 +501,22 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult } enterBarrier("after-9") + + } + "easy API for starting" in within(50.seconds) { + runOn(first) { + val counterRegionViaStart: ActorRef = ClusterSharding(system).start( + typeName = "ApiTest", + entryProps = Some(Props[Counter]), + idExtractor = idExtractor, + shardResolver = shardResolver) + + val counterRegionViaGet: ActorRef = ClusterSharding(system).shardRegion("ApiTest") + + counterRegionViaStart should equal(counterRegionViaGet) + } + enterBarrier("after-10") + } } diff --git a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java index c2b0408753..0147d978c0 100644 --- a/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java +++ b/akka-contrib/src/test/java/akka/contrib/pattern/ClusterShardingTest.java @@ -4,7 +4,6 @@ package akka.contrib.pattern; -import java.util.Iterator; import java.util.concurrent.TimeUnit; import scala.concurrent.duration.Duration; @@ -64,8 +63,8 @@ public class ClusterShardingTest { //#counter-extractor //#counter-start - ClusterSharding.get(system).start("Counter", Props.create(Counter.class), - messageExtractor); + ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter", Props.create(Counter.class), + messageExtractor); //#counter-start //#counter-usage