diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index e85ee21932..db2e7a7885 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -16,7 +16,7 @@ import scala.concurrent.duration.FiniteDuration /** * INTERNAL API: Use `BalancingPool` instead of this dispatcher directly. - * + * * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed * that all actors using the same instance of this dispatcher can process all messages that have been sent to one of the actors. I.e. the * actors belong to a pool of actors, and to the client there is no guarantee about which actor instance actually processes a given message. diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala index 5490e17f96..4caca9a3ea 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/Replicator.scala @@ -44,6 +44,7 @@ import akka.actor.ExtendedActorSystem import akka.actor.SupervisorStrategy import akka.actor.OneForOneStrategy import akka.actor.ActorInitializationException +import java.util.concurrent.TimeUnit object ReplicatorSettings { @@ -894,30 +895,39 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog else normalReceive val load: Receive = { - case LoadData(data) ⇒ - data.foreach { - case (key, d) ⇒ - val envelope = DataEnvelope(d) - write(key, envelope) match { - case Some(newEnvelope) ⇒ - if (newEnvelope.data ne envelope.data) - durableStore ! Store(key, newEnvelope.data, None) - case None ⇒ - } - } - case LoadAllCompleted ⇒ - context.become(normalReceive) - self ! FlushChanges + val startTime = System.nanoTime() + var count = 0 - case GetReplicaCount ⇒ - // 0 until durable data has been loaded, used by test - sender() ! ReplicaCount(0) + { + case LoadData(data) ⇒ + count += data.size + data.foreach { + case (key, d) ⇒ + val envelope = DataEnvelope(d) + write(key, envelope) match { + case Some(newEnvelope) ⇒ + if (newEnvelope.data ne envelope.data) + durableStore ! Store(key, newEnvelope.data, None) + case None ⇒ + } + } + case LoadAllCompleted ⇒ + log.debug( + "Loading {} entries from durable store took {} ms", + count, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)) + context.become(normalReceive) + self ! FlushChanges - case RemovedNodePruningTick | FlushChanges | GossipTick ⇒ - // ignore scheduled ticks when loading durable data - case m @ (_: Read | _: Write | _: Status | _: Gossip) ⇒ - // ignore gossip and replication when loading durable data - log.debug("ignoring message [{}] when loading durable data", m.getClass.getName) + case GetReplicaCount ⇒ + // 0 until durable data has been loaded, used by test + sender() ! ReplicaCount(0) + + case RemovedNodePruningTick | FlushChanges | GossipTick ⇒ + // ignore scheduled ticks when loading durable data + case m @ (_: Read | _: Write | _: Status | _: Gossip) ⇒ + // ignore gossip and replication when loading durable data + log.debug("ignoring message [{}] when loading durable data", m.getClass.getName) + } } val normalReceive: Receive = { diff --git a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala index 656cbbd65d..2d3ae3bcf9 100644 --- a/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala +++ b/akka-distributed-data/src/multi-jvm/scala/akka/cluster/ddata/DurableDataSpec.scala @@ -112,7 +112,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) runOn(first) { val r = newReplicator() - within(5.seconds) { + within(10.seconds) { awaitAssert { r ! GetReplicaCount expectMsg(ReplicaCount(1)) @@ -158,7 +158,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) join(second, first) val r = newReplicator() - within(5.seconds) { + within(10.seconds) { awaitAssert { r ! GetReplicaCount expectMsg(ReplicaCount(2)) @@ -247,7 +247,7 @@ abstract class DurableDataSpec(multiNodeConfig: DurableDataSpecConfig) new TestKit(sys1) with ImplicitSender { val r = newReplicator(sys1) - within(5.seconds) { + within(10.seconds) { awaitAssert { r ! GetReplicaCount expectMsg(ReplicaCount(1))