harden DurableDataSpec, #22037
This commit is contained in:
parent
eeda6cc2c5
commit
38a133ece0
3 changed files with 36 additions and 26 deletions
|
|
@ -16,7 +16,7 @@ import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API: Use `BalancingPool` instead of this dispatcher directly.
|
* INTERNAL API: Use `BalancingPool` instead of this dispatcher directly.
|
||||||
*
|
*
|
||||||
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
|
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
|
||||||
* that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the
|
* that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the
|
||||||
* actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
|
* actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message.
|
||||||
|
|
|
||||||
|
|
@ -44,6 +44,7 @@ import akka.actor.ExtendedActorSystem
|
||||||
import akka.actor.SupervisorStrategy
|
import akka.actor.SupervisorStrategy
|
||||||
import akka.actor.OneForOneStrategy
|
import akka.actor.OneForOneStrategy
|
||||||
import akka.actor.ActorInitializationException
|
import akka.actor.ActorInitializationException
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
object ReplicatorSettings {
|
object ReplicatorSettings {
|
||||||
|
|
||||||
|
|
@ -894,30 +895,39 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
||||||
else normalReceive
|
else normalReceive
|
||||||
|
|
||||||
val load: Receive = {
|
val load: Receive = {
|
||||||
case LoadData(data) ⇒
|
val startTime = System.nanoTime()
|
||||||
data.foreach {
|
var count = 0
|
||||||
case (key, d) ⇒
|
|
||||||
val envelope = DataEnvelope(d)
|
|
||||||
write(key, envelope) match {
|
|
||||||
case Some(newEnvelope) ⇒
|
|
||||||
if (newEnvelope.data ne envelope.data)
|
|
||||||
durableStore ! Store(key, newEnvelope.data, None)
|
|
||||||
case None ⇒
|
|
||||||
}
|
|
||||||
}
|
|
||||||
case LoadAllCompleted ⇒
|
|
||||||
context.become(normalReceive)
|
|
||||||
self ! FlushChanges
|
|
||||||
|
|
||||||
case GetReplicaCount ⇒
|
{
|
||||||
// 0 until durable data has been loaded, used by test
|
case LoadData(data) ⇒
|
||||||
sender() ! ReplicaCount(0)
|
count += data.size
|
||||||
|
data.foreach {
|
||||||
|
case (key, d) ⇒
|
||||||
|
val envelope = DataEnvelope(d)
|
||||||
|
write(key, envelope) match {
|
||||||
|
case Some(newEnvelope) ⇒
|
||||||
|
if (newEnvelope.data ne envelope.data)
|
||||||
|
durableStore ! Store(key, newEnvelope.data, None)
|
||||||
|
case None ⇒
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case LoadAllCompleted ⇒
|
||||||
|
log.debug(
|
||||||
|
"Loading {} entries from durable store took {} ms",
|
||||||
|
count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime))
|
||||||
|
context.become(normalReceive)
|
||||||
|
self ! FlushChanges
|
||||||
|
|
||||||
case RemovedNodePruningTick | FlushChanges | GossipTick ⇒
|
case GetReplicaCount ⇒
|
||||||
// ignore scheduled ticks when loading durable data
|
// 0 until durable data has been loaded, used by test
|
||||||
case m @ (_: Read | _: Write | _: Status | _: Gossip) ⇒
|
sender() ! ReplicaCount(0)
|
||||||
// ignore gossip and replication when loading durable data
|
|
||||||
log.debug("ignoring message [{}] when loading durable data", m.getClass.getName)
|
case RemovedNodePruningTick | FlushChanges | GossipTick ⇒
|
||||||
|
// ignore scheduled ticks when loading durable data
|
||||||
|
case m @ (_: Read | _: Write | _: Status | _: Gossip) ⇒
|
||||||
|
// ignore gossip and replication when loading durable data
|
||||||
|
log.debug("ignoring message [{}] when loading durable data", m.getClass.getName)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
val normalReceive: Receive = {
|
val normalReceive: Receive = {
|
||||||
|
|
|
||||||
|
|
@ -112,7 +112,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
|
||||||
runOn(first) {
|
runOn(first) {
|
||||||
|
|
||||||
val r = newReplicator()
|
val r = newReplicator()
|
||||||
within(5.seconds) {
|
within(10.seconds) {
|
||||||
awaitAssert {
|
awaitAssert {
|
||||||
r ! GetReplicaCount
|
r ! GetReplicaCount
|
||||||
expectMsg(ReplicaCount(1))
|
expectMsg(ReplicaCount(1))
|
||||||
|
|
@ -158,7 +158,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
|
||||||
join(second, first)
|
join(second, first)
|
||||||
|
|
||||||
val r = newReplicator()
|
val r = newReplicator()
|
||||||
within(5.seconds) {
|
within(10.seconds) {
|
||||||
awaitAssert {
|
awaitAssert {
|
||||||
r ! GetReplicaCount
|
r ! GetReplicaCount
|
||||||
expectMsg(ReplicaCount(2))
|
expectMsg(ReplicaCount(2))
|
||||||
|
|
@ -247,7 +247,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig)
|
||||||
new TestKit(sys1) with ImplicitSender {
|
new TestKit(sys1) with ImplicitSender {
|
||||||
|
|
||||||
val r = newReplicator(sys1)
|
val r = newReplicator(sys1)
|
||||||
within(5.seconds) {
|
within(10.seconds) {
|
||||||
awaitAssert {
|
awaitAssert {
|
||||||
r ! GetReplicaCount
|
r ! GetReplicaCount
|
||||||
expectMsg(ReplicaCount(1))
|
expectMsg(ReplicaCount(1))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue