Bulk update of external shard allocation (#28699)

* Apply suggestions from code review
Co-authored-by: Sean Glover <sean@seanglover.com>
This commit is contained in:
Christopher Batey 2020-05-05 16:18:44 +01:00 committed by GitHub
parent 0099ebc2b8
commit b2509efdb0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 47 additions and 3 deletions

View file

@ -0,0 +1,4 @@
# Add methods to trait not for user extension
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.external.javadsl.ExternalShardAllocationClient.setShardLocations")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient.updateShardLocations")

View file

@ -36,6 +36,7 @@ import akka.dispatch.MessageDispatcher
import akka.event.Logging
import akka.pattern.ask
import akka.util.JavaDurationConverters._
import akka.util.ccompat.JavaConverters._
import akka.util.PrettyDuration._
import akka.util.Timeout
@ -92,4 +93,21 @@ final private[external] class ExternalShardAllocationClientImpl(system: ActorSys
}
override def getShardLocations(): CompletionStage[ShardLocations] = shardLocations().toJava
override def updateShardLocations(locations: Map[ShardId, Address]): Future[Done] = {
log.debug("updateShardLocations {} for {}", locations, Key)
(replicator ? Update(Key, LWWMap.empty[ShardId, String], WriteLocal, None) { existing =>
locations.foldLeft(existing) {
case (acc, (shardId, address)) => acc.put(self, shardId, address.toString)
}
}).flatMap {
case UpdateSuccess(_, _) => Future.successful(Done)
case UpdateTimeout =>
Future.failed(new ClientTimeoutException(s"Unable to update shard location after ${timeout.duration.pretty}"))
}
}
override def setShardLocations(locations: java.util.Map[ShardId, Address]): CompletionStage[Done] = {
updateShardLocations(locations.asScala.toMap).toJava
}
}

View file

@ -28,10 +28,21 @@ trait ExternalShardAllocationClient {
*
* @param shard The shard identifier
* @param location Location (akka node) to allocate the shard to
* @return Confirmation that the update has been propagated to a majority of cluster nodes
* @return Conformation that the update has been written to the local node
*/
def setShardLocation(shard: ShardId, location: Address): CompletionStage[Done]
/**
* Update all of the provided ShardLocations.
* The [[Address]] should match one of the nodes in the cluster. If the node has not joined
* the cluster yet it will be moved to that node after the first cluster
* sharding rebalance it does.
*
* @param locations to update
* @return Confirmation that the update has been written to the local node
*/
def setShardLocations(locations: java.util.Map[ShardId, Address]): CompletionStage[Done]
/**
* Get all the current shard locations that have been set via setShardLocation
*/

View file

@ -24,7 +24,7 @@ trait ExternalShardAllocationClient {
* Update the given shard's location. The [[Address]] should
* match one of the nodes in the cluster. If the node has not joined
* the cluster yet it will be moved to that node after the first cluster
* sharding rebalance.
* sharding rebalance it does.
*
* @param shard The shard identifier
* @param location Location (akka node) to allocate the shard to
@ -32,6 +32,17 @@ trait ExternalShardAllocationClient {
*/
def updateShardLocation(shard: ShardId, location: Address): Future[Done]
/**
* Update all of the provided ShardLocations.
* The [[Address]] should match one of the nodes in the cluster. If the node has not joined
* the cluster yet it will be moved to that node after the first cluster
* sharding rebalance it does.
*
* @param locations to update
* @return Confirmation that the update has been propagates to a majority of cluster nodes
*/
def updateShardLocations(locations: Map[ShardId, Address]): Future[Done]
/**
* Get all the current shard locations that have been set via updateShardLocation
*/

View file

@ -131,7 +131,7 @@ abstract class ExternalShardAllocationSpec
val forthAddress = address(forth)
runOn(second) {
system.log.info("Allocating {} on {}", onForthShardId, forthAddress)
ExternalShardAllocation(system).clientFor(typeName).updateShardLocation(onForthShardId, forthAddress)
ExternalShardAllocation(system).clientFor(typeName).updateShardLocations(Map(onForthShardId -> forthAddress))
}
enterBarrier("allocated-to-new-node")
runOn(forth) {