fix wrong log in receiveDeltaPropagation
This commit is contained in:
parent
9855e2896f
commit
dc070e4e1c
1 changed files with 59 additions and 59 deletions
|
|
@ -123,29 +123,29 @@ object ReplicatorSettings {
|
||||||
* in the `Set`.
|
* in the `Set`.
|
||||||
*/
|
*/
|
||||||
final class ReplicatorSettings(
|
final class ReplicatorSettings(
|
||||||
val role: Option[String],
|
val role: Option[String],
|
||||||
val gossipInterval: FiniteDuration,
|
val gossipInterval: FiniteDuration,
|
||||||
val notifySubscribersInterval: FiniteDuration,
|
val notifySubscribersInterval: FiniteDuration,
|
||||||
val maxDeltaElements: Int,
|
val maxDeltaElements: Int,
|
||||||
val dispatcher: String,
|
val dispatcher: String,
|
||||||
val pruningInterval: FiniteDuration,
|
val pruningInterval: FiniteDuration,
|
||||||
val maxPruningDissemination: FiniteDuration,
|
val maxPruningDissemination: FiniteDuration,
|
||||||
val durableStoreProps: Either[(String, Config), Props],
|
val durableStoreProps: Either[(String, Config), Props],
|
||||||
val durableKeys: Set[KeyId],
|
val durableKeys: Set[KeyId],
|
||||||
val pruningMarkerTimeToLive: FiniteDuration,
|
val pruningMarkerTimeToLive: FiniteDuration,
|
||||||
val durablePruningMarkerTimeToLive: FiniteDuration,
|
val durablePruningMarkerTimeToLive: FiniteDuration,
|
||||||
val deltaCrdtEnabled: Boolean) {
|
val deltaCrdtEnabled: Boolean) {
|
||||||
|
|
||||||
// For backwards compatibility
|
// For backwards compatibility
|
||||||
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
|
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
|
||||||
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) =
|
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration) =
|
||||||
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
|
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
|
||||||
maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days, true)
|
maxPruningDissemination, Right(Props.empty), Set.empty, 6.hours, 10.days, true)
|
||||||
|
|
||||||
// For backwards compatibility
|
// For backwards compatibility
|
||||||
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
|
def this(role: Option[String], gossipInterval: FiniteDuration, notifySubscribersInterval: FiniteDuration,
|
||||||
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration,
|
maxDeltaElements: Int, dispatcher: String, pruningInterval: FiniteDuration, maxPruningDissemination: FiniteDuration,
|
||||||
durableStoreProps: Either[(String, Config), Props], durableKeys: Set[String]) =
|
durableStoreProps: Either[(String, Config), Props], durableKeys: Set[String]) =
|
||||||
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
|
this(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher, pruningInterval,
|
||||||
maxPruningDissemination, durableStoreProps, durableKeys, 6.hours, 10.days, true)
|
maxPruningDissemination, durableStoreProps, durableKeys, 6.hours, 10.days, true)
|
||||||
|
|
||||||
|
|
@ -174,7 +174,7 @@ final class ReplicatorSettings(
|
||||||
copy(pruningInterval = pruningInterval, maxPruningDissemination = maxPruningDissemination)
|
copy(pruningInterval = pruningInterval, maxPruningDissemination = maxPruningDissemination)
|
||||||
|
|
||||||
def withPruningMarkerTimeToLive(
|
def withPruningMarkerTimeToLive(
|
||||||
pruningMarkerTimeToLive: FiniteDuration,
|
pruningMarkerTimeToLive: FiniteDuration,
|
||||||
durablePruningMarkerTimeToLive: FiniteDuration): ReplicatorSettings =
|
durablePruningMarkerTimeToLive: FiniteDuration): ReplicatorSettings =
|
||||||
copy(
|
copy(
|
||||||
pruningMarkerTimeToLive = pruningMarkerTimeToLive,
|
pruningMarkerTimeToLive = pruningMarkerTimeToLive,
|
||||||
|
|
@ -201,18 +201,18 @@ final class ReplicatorSettings(
|
||||||
copy(deltaCrdtEnabled = deltaCrdtEnabled)
|
copy(deltaCrdtEnabled = deltaCrdtEnabled)
|
||||||
|
|
||||||
private def copy(
|
private def copy(
|
||||||
role: Option[String] = role,
|
role: Option[String] = role,
|
||||||
gossipInterval: FiniteDuration = gossipInterval,
|
gossipInterval: FiniteDuration = gossipInterval,
|
||||||
notifySubscribersInterval: FiniteDuration = notifySubscribersInterval,
|
notifySubscribersInterval: FiniteDuration = notifySubscribersInterval,
|
||||||
maxDeltaElements: Int = maxDeltaElements,
|
maxDeltaElements: Int = maxDeltaElements,
|
||||||
dispatcher: String = dispatcher,
|
dispatcher: String = dispatcher,
|
||||||
pruningInterval: FiniteDuration = pruningInterval,
|
pruningInterval: FiniteDuration = pruningInterval,
|
||||||
maxPruningDissemination: FiniteDuration = maxPruningDissemination,
|
maxPruningDissemination: FiniteDuration = maxPruningDissemination,
|
||||||
durableStoreProps: Either[(String, Config), Props] = durableStoreProps,
|
durableStoreProps: Either[(String, Config), Props] = durableStoreProps,
|
||||||
durableKeys: Set[KeyId] = durableKeys,
|
durableKeys: Set[KeyId] = durableKeys,
|
||||||
pruningMarkerTimeToLive: FiniteDuration = pruningMarkerTimeToLive,
|
pruningMarkerTimeToLive: FiniteDuration = pruningMarkerTimeToLive,
|
||||||
durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive,
|
durablePruningMarkerTimeToLive: FiniteDuration = durablePruningMarkerTimeToLive,
|
||||||
deltaCrdtEnabled: Boolean = deltaCrdtEnabled): ReplicatorSettings =
|
deltaCrdtEnabled: Boolean = deltaCrdtEnabled): ReplicatorSettings =
|
||||||
new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher,
|
new ReplicatorSettings(role, gossipInterval, notifySubscribersInterval, maxDeltaElements, dispatcher,
|
||||||
pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys,
|
pruningInterval, maxPruningDissemination, durableStoreProps, durableKeys,
|
||||||
pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, deltaCrdtEnabled)
|
pruningMarkerTimeToLive, durablePruningMarkerTimeToLive, deltaCrdtEnabled)
|
||||||
|
|
@ -436,7 +436,7 @@ object Replicator {
|
||||||
* for example not access `sender()` reference of an enclosing actor.
|
* for example not access `sender()` reference of an enclosing actor.
|
||||||
*/
|
*/
|
||||||
final case class Update[A <: ReplicatedData](key: Key[A], writeConsistency: WriteConsistency,
|
final case class Update[A <: ReplicatedData](key: Key[A], writeConsistency: WriteConsistency,
|
||||||
request: Option[Any])(val modify: Option[A] ⇒ A)
|
request: Option[Any])(val modify: Option[A] ⇒ A)
|
||||||
extends Command[A] with NoSerializationVerificationNeeded {
|
extends Command[A] with NoSerializationVerificationNeeded {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -606,9 +606,9 @@ object Replicator {
|
||||||
* The `DataEnvelope` wraps a data entry and carries state of the pruning process for the entry.
|
* The `DataEnvelope` wraps a data entry and carries state of the pruning process for the entry.
|
||||||
*/
|
*/
|
||||||
final case class DataEnvelope(
|
final case class DataEnvelope(
|
||||||
data: ReplicatedData,
|
data: ReplicatedData,
|
||||||
pruning: Map[UniqueAddress, PruningState] = Map.empty,
|
pruning: Map[UniqueAddress, PruningState] = Map.empty,
|
||||||
deltaVersions: VersionVector = VersionVector.empty)
|
deltaVersions: VersionVector = VersionVector.empty)
|
||||||
extends ReplicatorMessage {
|
extends ReplicatorMessage {
|
||||||
|
|
||||||
import PruningState._
|
import PruningState._
|
||||||
|
|
@ -1227,7 +1227,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
||||||
def isLocalSender(): Boolean = !replyTo.path.address.hasGlobalScope
|
def isLocalSender(): Boolean = !replyTo.path.address.hasGlobalScope
|
||||||
|
|
||||||
def receiveUpdate(key: KeyR, modify: Option[ReplicatedData] ⇒ ReplicatedData,
|
def receiveUpdate(key: KeyR, modify: Option[ReplicatedData] ⇒ ReplicatedData,
|
||||||
writeConsistency: WriteConsistency, req: Option[Any]): Unit = {
|
writeConsistency: WriteConsistency, req: Option[Any]): Unit = {
|
||||||
val localValue = getData(key.id)
|
val localValue = getData(key.id)
|
||||||
|
|
||||||
def deltaOrPlaceholder(d: DeltaReplicatedData): Option[ReplicatedDelta] = {
|
def deltaOrPlaceholder(d: DeltaReplicatedData): Option[ReplicatedDelta] = {
|
||||||
|
|
@ -1542,7 +1542,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
||||||
case NonFatal(e) ⇒
|
case NonFatal(e) ⇒
|
||||||
// catching in case we need to support rolling upgrades that are
|
// catching in case we need to support rolling upgrades that are
|
||||||
// mixing nodes with incompatible delta-CRDT types
|
// mixing nodes with incompatible delta-CRDT types
|
||||||
log.warning("Couldn't process DeltaPropagation from [] due to {}", fromNode, e)
|
log.warning("Couldn't process DeltaPropagation from [{}] due to {}", fromNode, e)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// !deltaCrdtEnabled
|
// !deltaCrdtEnabled
|
||||||
|
|
@ -1880,15 +1880,15 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] object WriteAggregator {
|
@InternalApi private[akka] object WriteAggregator {
|
||||||
def props(
|
def props(
|
||||||
key: KeyR,
|
key: KeyR,
|
||||||
envelope: Replicator.Internal.DataEnvelope,
|
envelope: Replicator.Internal.DataEnvelope,
|
||||||
delta: Option[Replicator.Internal.Delta],
|
delta: Option[Replicator.Internal.Delta],
|
||||||
consistency: Replicator.WriteConsistency,
|
consistency: Replicator.WriteConsistency,
|
||||||
req: Option[Any],
|
req: Option[Any],
|
||||||
nodes: Set[Address],
|
nodes: Set[Address],
|
||||||
unreachable: Set[Address],
|
unreachable: Set[Address],
|
||||||
replyTo: ActorRef,
|
replyTo: ActorRef,
|
||||||
durable: Boolean): Props =
|
durable: Boolean): Props =
|
||||||
Props(new WriteAggregator(key, envelope, delta, consistency, req, nodes, unreachable, replyTo, durable))
|
Props(new WriteAggregator(key, envelope, delta, consistency, req, nodes, unreachable, replyTo, durable))
|
||||||
.withDeploy(Deploy.local)
|
.withDeploy(Deploy.local)
|
||||||
}
|
}
|
||||||
|
|
@ -1897,15 +1897,15 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] class WriteAggregator(
|
@InternalApi private[akka] class WriteAggregator(
|
||||||
key: KeyR,
|
key: KeyR,
|
||||||
envelope: Replicator.Internal.DataEnvelope,
|
envelope: Replicator.Internal.DataEnvelope,
|
||||||
delta: Option[Replicator.Internal.Delta],
|
delta: Option[Replicator.Internal.Delta],
|
||||||
consistency: Replicator.WriteConsistency,
|
consistency: Replicator.WriteConsistency,
|
||||||
req: Option[Any],
|
req: Option[Any],
|
||||||
override val nodes: Set[Address],
|
override val nodes: Set[Address],
|
||||||
override val unreachable: Set[Address],
|
override val unreachable: Set[Address],
|
||||||
replyTo: ActorRef,
|
replyTo: ActorRef,
|
||||||
durable: Boolean) extends ReadWriteAggregator {
|
durable: Boolean) extends ReadWriteAggregator {
|
||||||
|
|
||||||
import Replicator._
|
import Replicator._
|
||||||
import Replicator.Internal._
|
import Replicator.Internal._
|
||||||
|
|
@ -2009,13 +2009,13 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] object ReadAggregator {
|
@InternalApi private[akka] object ReadAggregator {
|
||||||
def props(
|
def props(
|
||||||
key: KeyR,
|
key: KeyR,
|
||||||
consistency: Replicator.ReadConsistency,
|
consistency: Replicator.ReadConsistency,
|
||||||
req: Option[Any],
|
req: Option[Any],
|
||||||
nodes: Set[Address],
|
nodes: Set[Address],
|
||||||
unreachable: Set[Address],
|
unreachable: Set[Address],
|
||||||
localValue: Option[Replicator.Internal.DataEnvelope],
|
localValue: Option[Replicator.Internal.DataEnvelope],
|
||||||
replyTo: ActorRef): Props =
|
replyTo: ActorRef): Props =
|
||||||
Props(new ReadAggregator(key, consistency, req, nodes, unreachable, localValue, replyTo))
|
Props(new ReadAggregator(key, consistency, req, nodes, unreachable, localValue, replyTo))
|
||||||
.withDeploy(Deploy.local)
|
.withDeploy(Deploy.local)
|
||||||
|
|
||||||
|
|
@ -2025,13 +2025,13 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi private[akka] class ReadAggregator(
|
@InternalApi private[akka] class ReadAggregator(
|
||||||
key: KeyR,
|
key: KeyR,
|
||||||
consistency: Replicator.ReadConsistency,
|
consistency: Replicator.ReadConsistency,
|
||||||
req: Option[Any],
|
req: Option[Any],
|
||||||
override val nodes: Set[Address],
|
override val nodes: Set[Address],
|
||||||
override val unreachable: Set[Address],
|
override val unreachable: Set[Address],
|
||||||
localValue: Option[Replicator.Internal.DataEnvelope],
|
localValue: Option[Replicator.Internal.DataEnvelope],
|
||||||
replyTo: ActorRef) extends ReadWriteAggregator {
|
replyTo: ActorRef) extends ReadWriteAggregator {
|
||||||
|
|
||||||
import Replicator._
|
import Replicator._
|
||||||
import Replicator.Internal._
|
import Replicator.Internal._
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue