External sharding client handle empty DData key (#28668)
This commit is contained in:
parent
e487088a27
commit
f1dbb79b71
2 changed files with 13 additions and 0 deletions
|
|
@ -17,6 +17,7 @@ import akka.cluster.ddata.LWWMap
|
||||||
import akka.cluster.ddata.Replicator.Get
|
import akka.cluster.ddata.Replicator.Get
|
||||||
import akka.cluster.ddata.Replicator.GetFailure
|
import akka.cluster.ddata.Replicator.GetFailure
|
||||||
import akka.cluster.ddata.Replicator.GetSuccess
|
import akka.cluster.ddata.Replicator.GetSuccess
|
||||||
|
import akka.cluster.ddata.Replicator.NotFound
|
||||||
import akka.cluster.ddata.Replicator.ReadMajority
|
import akka.cluster.ddata.Replicator.ReadMajority
|
||||||
import akka.cluster.ddata.Replicator.Update
|
import akka.cluster.ddata.Replicator.Update
|
||||||
import akka.cluster.ddata.Replicator.UpdateSuccess
|
import akka.cluster.ddata.Replicator.UpdateSuccess
|
||||||
|
|
@ -79,6 +80,8 @@ final private[external] class ExternalShardAllocationClientImpl(system: ActorSys
|
||||||
case success @ GetSuccess(`Key`, _) =>
|
case success @ GetSuccess(`Key`, _) =>
|
||||||
Future.successful(
|
Future.successful(
|
||||||
success.get(Key).entries.transform((_, asStr) => ShardLocation(AddressFromURIString(asStr))))
|
success.get(Key).entries.transform((_, asStr) => ShardLocation(AddressFromURIString(asStr))))
|
||||||
|
case NotFound(_, _) =>
|
||||||
|
Future.successful(Map.empty[ShardId, ShardLocation])
|
||||||
case GetFailure(_, _) =>
|
case GetFailure(_, _) =>
|
||||||
Future.failed((new ClientTimeoutException(s"Unable to get shard locations after ${timeout.duration.pretty}")))
|
Future.failed((new ClientTimeoutException(s"Unable to get shard locations after ${timeout.duration.pretty}")))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,16 @@ class ExternalShardAllocationStrategySpec extends AkkaSpec("""
|
||||||
|
|
||||||
val requester = TestProbe()
|
val requester = TestProbe()
|
||||||
|
|
||||||
|
"ExternalShardAllocationClient" must {
|
||||||
|
"default to no locations if sharding never started" in {
|
||||||
|
ExternalShardAllocation(system)
|
||||||
|
.clientFor("not found")
|
||||||
|
.shardLocations()
|
||||||
|
.futureValue
|
||||||
|
.locations shouldEqual Map.empty
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
"ExternalShardAllocation allocate" must {
|
"ExternalShardAllocation allocate" must {
|
||||||
"default to requester if query times out" in {
|
"default to requester if query times out" in {
|
||||||
val (strat, _) = createStrategy()
|
val (strat, _) = createStrategy()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue