diff --git a/akka-cluster-sharding/src/main/mima-filters/2.6.5.backwards.excludes/shard-allocation-client.excludes b/akka-cluster-sharding/src/main/mima-filters/2.6.5.backwards.excludes/shard-allocation-client.excludes new file mode 100644 index 0000000000..ca2bcab05a --- /dev/null +++ b/akka-cluster-sharding/src/main/mima-filters/2.6.5.backwards.excludes/shard-allocation-client.excludes @@ -0,0 +1,4 @@ +# Add methods to trait not for user extension +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.external.javadsl.ExternalShardAllocationClient.setShardLocations") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient.updateShardLocations") + diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/internal/ExternalShardAllocationClientImpl.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/internal/ExternalShardAllocationClientImpl.scala index 5af5f9f28c..813d5cf640 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/internal/ExternalShardAllocationClientImpl.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/internal/ExternalShardAllocationClientImpl.scala @@ -36,6 +36,7 @@ import akka.dispatch.MessageDispatcher import akka.event.Logging import akka.pattern.ask import akka.util.JavaDurationConverters._ +import akka.util.ccompat.JavaConverters._ import akka.util.PrettyDuration._ import akka.util.Timeout @@ -92,4 +93,21 @@ final private[external] class ExternalShardAllocationClientImpl(system: ActorSys } override def getShardLocations(): CompletionStage[ShardLocations] = shardLocations().toJava + + override def updateShardLocations(locations: Map[ShardId, Address]): Future[Done] = { + log.debug("updateShardLocations {} for {}", locations, Key) + (replicator ? Update(Key, LWWMap.empty[ShardId, String], WriteLocal, None) { existing => + locations.foldLeft(existing) { + case (acc, (shardId, address)) => acc.put(self, shardId, address.toString) + } + }).flatMap { + case UpdateSuccess(_, _) => Future.successful(Done) + case UpdateTimeout => + Future.failed(new ClientTimeoutException(s"Unable to update shard location after ${timeout.duration.pretty}")) + } + } + + override def setShardLocations(locations: java.util.Map[ShardId, Address]): CompletionStage[Done] = { + updateShardLocations(locations.asScala.toMap).toJava + } } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/javadsl/ExternalShardAllocationClient.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/javadsl/ExternalShardAllocationClient.scala index fd561757c3..ee8d73360f 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/javadsl/ExternalShardAllocationClient.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/javadsl/ExternalShardAllocationClient.scala @@ -28,10 +28,21 @@ trait ExternalShardAllocationClient { * * @param shard The shard identifier * @param location Location (akka node) to allocate the shard to - * @return Confirmation that the update has been propagated to a majority of cluster nodes + * @return Conformation that the update has been written to the local node */ def setShardLocation(shard: ShardId, location: Address): CompletionStage[Done] + /** + * Update all of the provided ShardLocations. + * The [[Address]] should match one of the nodes in the cluster. If the node has not joined + * the cluster yet it will be moved to that node after the first cluster + * sharding rebalance it does. + * + * @param locations to update + * @return Confirmation that the update has been written to the local node + */ + def setShardLocations(locations: java.util.Map[ShardId, Address]): CompletionStage[Done] + /** * Get all the current shard locations that have been set via setShardLocation */ diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/scaladsl/ExternalShardAllocationClient.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/scaladsl/ExternalShardAllocationClient.scala index aa9e21af03..694e03c918 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/scaladsl/ExternalShardAllocationClient.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/external/scaladsl/ExternalShardAllocationClient.scala @@ -24,7 +24,7 @@ trait ExternalShardAllocationClient { * Update the given shard's location. The [[Address]] should * match one of the nodes in the cluster. If the node has not joined * the cluster yet it will be moved to that node after the first cluster - * sharding rebalance. + * sharding rebalance it does. * * @param shard The shard identifier * @param location Location (akka node) to allocate the shard to @@ -32,6 +32,17 @@ trait ExternalShardAllocationClient { */ def updateShardLocation(shard: ShardId, location: Address): Future[Done] + /** + * Update all of the provided ShardLocations. + * The [[Address]] should match one of the nodes in the cluster. If the node has not joined + * the cluster yet it will be moved to that node after the first cluster + * sharding rebalance it does. + * + * @param locations to update + * @return Confirmation that the update has been propagates to a majority of cluster nodes + */ + def updateShardLocations(locations: Map[ShardId, Address]): Future[Done] + /** * Get all the current shard locations that have been set via updateShardLocation */ diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala index dbf977d8cd..89ee7a38ee 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala @@ -131,7 +131,7 @@ abstract class ExternalShardAllocationSpec val forthAddress = address(forth) runOn(second) { system.log.info("Allocating {} on {}", onForthShardId, forthAddress) - ExternalShardAllocation(system).clientFor(typeName).updateShardLocation(onForthShardId, forthAddress) + ExternalShardAllocation(system).clientFor(typeName).updateShardLocations(Map(onForthShardId -> forthAddress)) } enterBarrier("allocated-to-new-node") runOn(forth) {