Merge branch 'master' into feature-active-active-event-sourcing
This commit is contained in:
commit
7bf12721c1
48 changed files with 935 additions and 688 deletions
|
|
@ -1,5 +1,9 @@
|
|||
pullRequests.frequency = "@monthly"
|
||||
|
||||
updates.pin = [
|
||||
{ groupId = "com.fasterxml.jackson.core", artifactId = "jackson-databind", version = "2.10." }
|
||||
]
|
||||
|
||||
updates.ignore = [
|
||||
{ groupId = "com.google.protobuf", artifactId = "protobuf-java" },
|
||||
{ groupId = "org.scalameta", artifactId = "scalafmt-core" },
|
||||
|
|
|
|||
|
|
@ -283,6 +283,7 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually
|
|||
}
|
||||
|
||||
"stop restarting the child after reaching maxNrOfRetries limit (Backoff.onStop)" in {
|
||||
val supervisorWatcher = TestProbe()
|
||||
val supervisor = create(onStopOptions(maxNrOfRetries = 2))
|
||||
def waitForChild: Option[ActorRef] = {
|
||||
eventually(timeout(1.second), interval(50.millis)) {
|
||||
|
|
@ -295,7 +296,7 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually
|
|||
expectMsgType[BackoffSupervisor.CurrentChild].ref
|
||||
}
|
||||
|
||||
watch(supervisor)
|
||||
supervisorWatcher.watch(supervisor)
|
||||
|
||||
supervisor ! BackoffSupervisor.GetRestartCount
|
||||
expectMsg(BackoffSupervisor.RestartCount(0))
|
||||
|
|
@ -327,11 +328,12 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually
|
|||
watch(c3)
|
||||
c3 ! PoisonPill
|
||||
expectTerminated(c3)
|
||||
expectTerminated(supervisor)
|
||||
supervisorWatcher.expectTerminated(supervisor)
|
||||
}
|
||||
|
||||
"stop restarting the child after reaching maxNrOfRetries limit (Backoff.onFailure)" in {
|
||||
filterException[TestException] {
|
||||
val supervisorWatcher = TestProbe()
|
||||
val supervisor = create(onFailureOptions(maxNrOfRetries = 2))
|
||||
|
||||
def waitForChild: Option[ActorRef] = {
|
||||
|
|
@ -345,7 +347,7 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually
|
|||
expectMsgType[BackoffSupervisor.CurrentChild].ref
|
||||
}
|
||||
|
||||
watch(supervisor)
|
||||
supervisorWatcher.watch(supervisor)
|
||||
|
||||
supervisor ! BackoffSupervisor.GetRestartCount
|
||||
expectMsg(BackoffSupervisor.RestartCount(0))
|
||||
|
|
@ -377,26 +379,25 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually
|
|||
awaitAssert(c3 should !==(c2))
|
||||
watch(c3)
|
||||
c3 ! "boom"
|
||||
withClue("Expected child and supervisor to terminate") {
|
||||
Set(expectMsgType[Terminated].actor, expectMsgType[Terminated].actor) shouldEqual Set(c3, supervisor)
|
||||
}
|
||||
|
||||
expectTerminated(c3)
|
||||
supervisorWatcher.expectTerminated(supervisor)
|
||||
}
|
||||
}
|
||||
|
||||
"stop restarting the child if final stop message received (Backoff.onStop)" in {
|
||||
val stopMessage = "stop"
|
||||
val supervisorWatcher = TestProbe()
|
||||
val supervisor: ActorRef = create(onStopOptions(maxNrOfRetries = 100).withFinalStopMessage(_ == stopMessage))
|
||||
supervisor ! BackoffSupervisor.GetCurrentChild
|
||||
val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get
|
||||
watch(c1)
|
||||
watch(supervisor)
|
||||
supervisorWatcher.watch(supervisor)
|
||||
|
||||
supervisor ! stopMessage
|
||||
expectMsg("stop")
|
||||
c1 ! PoisonPill
|
||||
expectTerminated(c1)
|
||||
expectTerminated(supervisor)
|
||||
supervisorWatcher.expectTerminated(supervisor)
|
||||
}
|
||||
|
||||
"supervisor must not stop when final stop message has not been received" in {
|
||||
|
|
@ -406,7 +407,6 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually
|
|||
supervisor ! BackoffSupervisor.GetCurrentChild
|
||||
val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get
|
||||
watch(c1)
|
||||
watch(supervisor)
|
||||
supervisorWatcher.watch(supervisor)
|
||||
|
||||
c1 ! PoisonPill
|
||||
|
|
@ -417,7 +417,7 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually
|
|||
supervisorWatcher.expectNoMessage(20.millis) // supervisor must not terminate
|
||||
|
||||
supervisor ! stopMessage
|
||||
expectTerminated(supervisor)
|
||||
supervisorWatcher.expectTerminated(supervisor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -318,7 +318,7 @@ import scala.util.Success
|
|||
case OptionVal.Some(t) =>
|
||||
throw new IllegalStateException(
|
||||
s"Invalid access by thread from the outside of $self. " +
|
||||
s"Current message is processed by $t, but also accessed from from ${Thread.currentThread()}.")
|
||||
s"Current message is processed by $t, but also accessed from ${Thread.currentThread()}.")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,2 @@
|
|||
# #29490 Add withRole to ShardedDaemonProcessSettings, internal constructor
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.typed.ShardedDaemonProcessSettings.this")
|
||||
|
|
@ -33,7 +33,7 @@ object ShardedDaemonProcessSettings {
|
|||
def fromConfig(config: Config): ShardedDaemonProcessSettings = {
|
||||
val keepAliveInterval = config.getDuration("keep-alive-interval").asScala
|
||||
|
||||
new ShardedDaemonProcessSettings(keepAliveInterval, None)
|
||||
new ShardedDaemonProcessSettings(keepAliveInterval, None, None)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -44,7 +44,8 @@ object ShardedDaemonProcessSettings {
|
|||
@ApiMayChange
|
||||
final class ShardedDaemonProcessSettings @InternalApi private[akka] (
|
||||
val keepAliveInterval: FiniteDuration,
|
||||
val shardingSettings: Option[ClusterShardingSettings]) {
|
||||
val shardingSettings: Option[ClusterShardingSettings],
|
||||
val role: Option[String]) {
|
||||
|
||||
/**
|
||||
* Scala API: The interval each parent of the sharded set is pinged from each node in the cluster.
|
||||
|
|
@ -52,7 +53,7 @@ final class ShardedDaemonProcessSettings @InternalApi private[akka] (
|
|||
* Note: How the sharded set is kept alive may change in the future meaning this setting may go away.
|
||||
*/
|
||||
def withKeepAliveInterval(keepAliveInterval: FiniteDuration): ShardedDaemonProcessSettings =
|
||||
new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings)
|
||||
copy(keepAliveInterval = keepAliveInterval)
|
||||
|
||||
/**
|
||||
* Java API: The interval each parent of the sharded set is pinged from each node in the cluster.
|
||||
|
|
@ -60,7 +61,7 @@ final class ShardedDaemonProcessSettings @InternalApi private[akka] (
|
|||
* Note: How the sharded set is kept alive may change in the future meaning this setting may go away.
|
||||
*/
|
||||
def withKeepAliveInterval(keepAliveInterval: Duration): ShardedDaemonProcessSettings =
|
||||
new ShardedDaemonProcessSettings(keepAliveInterval.asScala, shardingSettings)
|
||||
copy(keepAliveInterval = keepAliveInterval.asScala)
|
||||
|
||||
/**
|
||||
* Specify sharding settings that should be used for the sharded daemon process instead of loading from config.
|
||||
|
|
@ -68,5 +69,20 @@ final class ShardedDaemonProcessSettings @InternalApi private[akka] (
|
|||
* changing those settings will be ignored.
|
||||
*/
|
||||
def withShardingSettings(shardingSettings: ClusterShardingSettings): ShardedDaemonProcessSettings =
|
||||
new ShardedDaemonProcessSettings(keepAliveInterval, Some(shardingSettings))
|
||||
copy(shardingSettings = Option(shardingSettings))
|
||||
|
||||
/**
|
||||
* Specifies that the ShardedDaemonProcess should run on nodes with a specific role.
|
||||
* If the role is not specified all nodes in the cluster are used. If the given role does
|
||||
* not match the role of the current node the the ShardedDaemonProcess will not be started.
|
||||
*/
|
||||
def withRole(role: String): ShardedDaemonProcessSettings =
|
||||
copy(role = Option(role))
|
||||
|
||||
private def copy(
|
||||
keepAliveInterval: FiniteDuration = keepAliveInterval,
|
||||
shardingSettings: Option[ClusterShardingSettings] = shardingSettings,
|
||||
role: Option[String] = role): ShardedDaemonProcessSettings =
|
||||
new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings, role)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,8 @@ import akka.cluster.sharding.typed.scaladsl.Entity
|
|||
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
|
||||
import akka.cluster.sharding.typed.scaladsl.StartEntity
|
||||
import akka.cluster.typed.Cluster
|
||||
import akka.cluster.typed.SelfUp
|
||||
import akka.cluster.typed.Subscribe
|
||||
import akka.util.PrettyDuration
|
||||
|
||||
/**
|
||||
|
|
@ -39,7 +41,8 @@ private[akka] object ShardedDaemonProcessImpl {
|
|||
|
||||
object KeepAlivePinger {
|
||||
sealed trait Event
|
||||
case object Tick extends Event
|
||||
private case object Tick extends Event
|
||||
private case object StartTick extends Event
|
||||
|
||||
def apply[T](
|
||||
settings: ShardedDaemonProcessSettings,
|
||||
|
|
@ -47,19 +50,22 @@ private[akka] object ShardedDaemonProcessImpl {
|
|||
identities: Set[EntityId],
|
||||
shardingRef: ActorRef[ShardingEnvelope[T]]): Behavior[Event] =
|
||||
Behaviors.setup { context =>
|
||||
Cluster(context.system).subscriptions ! Subscribe(
|
||||
context.messageAdapter[SelfUp](_ => StartTick),
|
||||
classOf[SelfUp])
|
||||
Behaviors.withTimers { timers =>
|
||||
def triggerStartAll(): Unit = {
|
||||
identities.foreach(id => shardingRef ! StartEntity(id))
|
||||
}
|
||||
|
||||
context.log.debug2(
|
||||
s"Starting Sharded Daemon Process KeepAlivePinger for [{}], with ping interval [{}]",
|
||||
name,
|
||||
PrettyDuration.format(settings.keepAliveInterval))
|
||||
timers.startTimerWithFixedDelay(Tick, settings.keepAliveInterval)
|
||||
triggerStartAll()
|
||||
|
||||
Behaviors.receiveMessage {
|
||||
case StartTick =>
|
||||
triggerStartAll()
|
||||
context.log.debug2(
|
||||
s"Starting Sharded Daemon Process KeepAlivePinger for [{}], with ping interval [{}]",
|
||||
name,
|
||||
PrettyDuration.format(settings.keepAliveInterval))
|
||||
timers.startTimerWithFixedDelay(Tick, settings.keepAliveInterval)
|
||||
Behaviors.same
|
||||
case Tick =>
|
||||
triggerStartAll()
|
||||
context.log.debug("Periodic ping sent to [{}] processes", identities.size)
|
||||
|
|
@ -116,7 +122,7 @@ private[akka] final class ShardedDaemonProcessImpl(system: ActorSystem[_])
|
|||
val shardingBaseSettings =
|
||||
settings.shardingSettings match {
|
||||
case None =>
|
||||
// defaults in akka.cluster.sharding but allow overrides specifically for actor-set
|
||||
// defaults in akka.cluster.sharding but allow overrides specifically for sharded-daemon-process
|
||||
ClusterShardingSettings.fromConfig(
|
||||
system.settings.config.getConfig("akka.cluster.sharded-daemon-process.sharding"))
|
||||
case Some(shardingSettings) => shardingSettings
|
||||
|
|
@ -124,7 +130,7 @@ private[akka] final class ShardedDaemonProcessImpl(system: ActorSystem[_])
|
|||
|
||||
new ClusterShardingSettings(
|
||||
numberOfShards,
|
||||
shardingBaseSettings.role,
|
||||
if (settings.role.isDefined) settings.role else shardingBaseSettings.role,
|
||||
shardingBaseSettings.dataCenter,
|
||||
false, // remember entities disabled
|
||||
"",
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ abstract class ShardedDaemonProcessSpec
|
|||
|
||||
"init actor set" in {
|
||||
ShardedDaemonProcess(typedSystem).init("the-fearless", 4, id => ProcessActor(id))
|
||||
enterBarrier("actor-set-initialized")
|
||||
enterBarrier("sharded-daemon-process-initialized")
|
||||
runOn(first) {
|
||||
val startedIds = (0 to 3).map { _ =>
|
||||
val event = probe.expectMessageType[ProcessActorEvent](5.seconds)
|
||||
|
|
@ -101,7 +101,7 @@ abstract class ShardedDaemonProcessSpec
|
|||
}.toSet
|
||||
startedIds.size should ===(4)
|
||||
}
|
||||
enterBarrier("actor-set-started")
|
||||
enterBarrier("sharded-daemon-process-started")
|
||||
}
|
||||
|
||||
// FIXME test removing one cluster node and verify all are alive (how do we do that?)
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ import akka.actor.typed.ActorRef
|
|||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.cluster.MemberStatus
|
||||
import akka.cluster.sharding.typed.ClusterShardingSettings
|
||||
import akka.cluster.sharding.typed.ShardedDaemonProcessSettings
|
||||
import akka.cluster.typed.Cluster
|
||||
import akka.cluster.typed.Join
|
||||
|
|
@ -96,8 +95,7 @@ class ShardedDaemonProcessSpec
|
|||
|
||||
"not run if the role does not match node role" in {
|
||||
val probe = createTestProbe[Any]()
|
||||
val settings =
|
||||
ShardedDaemonProcessSettings(system).withShardingSettings(ClusterShardingSettings(system).withRole("workers"))
|
||||
val settings = ShardedDaemonProcessSettings(system).withRole("workers")
|
||||
ShardedDaemonProcess(system).init("roles", 3, id => MyActor(id, probe.ref), settings, None)
|
||||
|
||||
probe.expectNoMessage()
|
||||
|
|
|
|||
|
|
@ -774,7 +774,8 @@ private[akka] class Shard(
|
|||
private def startEntity(entityId: EntityId, ackTo: Option[ActorRef]): Unit = {
|
||||
entities.entityState(entityId) match {
|
||||
case Active(_) =>
|
||||
log.debug("Request to start entity [{}] (Already started)", entityId)
|
||||
if (verboseDebug)
|
||||
log.debug("Request to start entity [{}] (Already started)", entityId)
|
||||
touchLastMessageTimestamp(entityId)
|
||||
ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId))
|
||||
case _: RememberingStart =>
|
||||
|
|
|
|||
|
|
@ -448,6 +448,10 @@ object ShardCoordinator {
|
|||
*/
|
||||
private final case class RebalanceResult(shards: Set[ShardId])
|
||||
|
||||
private[akka] object RebalanceWorker {
|
||||
final case class ShardRegionTerminated(region: ActorRef)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API. Rebalancing process is performed by this actor.
|
||||
* It sends `BeginHandOff` to all `ShardRegion` actors followed by
|
||||
|
|
@ -467,7 +471,6 @@ object ShardCoordinator {
|
|||
import Internal._
|
||||
|
||||
regions.foreach { region =>
|
||||
context.watch(region)
|
||||
region ! BeginHandOff(shard)
|
||||
}
|
||||
var remaining = regions
|
||||
|
|
@ -480,7 +483,7 @@ object ShardCoordinator {
|
|||
case BeginHandOffAck(`shard`) =>
|
||||
log.debug("BeginHandOffAck for shard [{}] received from [{}].", shard, sender())
|
||||
acked(sender())
|
||||
case Terminated(shardRegion) =>
|
||||
case ShardRegionTerminated(shardRegion) =>
|
||||
log.debug("ShardRegion [{}] terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard)
|
||||
acked(shardRegion)
|
||||
case ReceiveTimeout =>
|
||||
|
|
@ -489,7 +492,6 @@ object ShardCoordinator {
|
|||
}
|
||||
|
||||
private def acked(shardRegion: ActorRef) = {
|
||||
context.unwatch(shardRegion)
|
||||
remaining -= shardRegion
|
||||
if (remaining.isEmpty) {
|
||||
log.debug("All shard regions acked, handing off shard [{}].", shard)
|
||||
|
|
@ -547,6 +549,7 @@ abstract class ShardCoordinator(
|
|||
var state = State.empty.withRememberEntities(settings.rememberEntities)
|
||||
// rebalanceInProgress for the ShardId keys, pending GetShardHome requests by the ActorRef values
|
||||
var rebalanceInProgress = Map.empty[ShardId, Set[ActorRef]]
|
||||
var rebalanceWorkers: Set[ActorRef] = Set.empty
|
||||
var unAckedHostShards = Map.empty[ShardId, Cancellable]
|
||||
// regions that have requested handoff, for graceful shutdown
|
||||
var gracefulShutdownInProgress = Set.empty[ActorRef]
|
||||
|
|
@ -687,6 +690,7 @@ abstract class ShardCoordinator(
|
|||
continueRebalance(shards)
|
||||
|
||||
case RebalanceDone(shard, ok) =>
|
||||
rebalanceWorkers -= sender()
|
||||
if (ok)
|
||||
log.debug("Rebalance shard [{}] completed successfully.", shard)
|
||||
else
|
||||
|
|
@ -887,7 +891,8 @@ abstract class ShardCoordinator(
|
|||
}
|
||||
}
|
||||
|
||||
def regionTerminated(ref: ActorRef): Unit =
|
||||
def regionTerminated(ref: ActorRef): Unit = {
|
||||
rebalanceWorkers.foreach(_ ! RebalanceWorker.ShardRegionTerminated(ref))
|
||||
if (state.regions.contains(ref)) {
|
||||
log.debug("ShardRegion terminated: [{}]", ref)
|
||||
regionTerminationInProgress += ref
|
||||
|
|
@ -903,6 +908,7 @@ abstract class ShardCoordinator(
|
|||
allocateShardHomesForRememberEntities()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def regionProxyTerminated(ref: ActorRef): Unit =
|
||||
if (state.regionProxies.contains(ref)) {
|
||||
|
|
@ -974,7 +980,7 @@ abstract class ShardCoordinator(
|
|||
case Some(rebalanceFromRegion) =>
|
||||
rebalanceInProgress = rebalanceInProgress.updated(shard, Set.empty)
|
||||
log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
|
||||
context.actorOf(
|
||||
rebalanceWorkers += context.actorOf(
|
||||
rebalanceWorkerProps(
|
||||
shard,
|
||||
rebalanceFromRegion,
|
||||
|
|
@ -1150,8 +1156,8 @@ private[akka] class DDataShardCoordinator(
|
|||
case additional => ReadMajorityPlus(settings.tuningParameters.waitingForStateTimeout, majorityMinCap, additional)
|
||||
}
|
||||
private val stateWriteConsistency = settings.tuningParameters.coordinatorStateWriteMajorityPlus match {
|
||||
case Int.MaxValue => WriteAll(settings.tuningParameters.waitingForStateTimeout)
|
||||
case additional => WriteMajorityPlus(settings.tuningParameters.waitingForStateTimeout, majorityMinCap, additional)
|
||||
case Int.MaxValue => WriteAll(settings.tuningParameters.updatingStateTimeout)
|
||||
case additional => WriteMajorityPlus(settings.tuningParameters.updatingStateTimeout, majorityMinCap, additional)
|
||||
}
|
||||
|
||||
implicit val node: Cluster = Cluster(context.system)
|
||||
|
|
|
|||
|
|
@ -95,7 +95,7 @@ private[akka] final class DDataRememberEntitiesShardStore(
|
|||
|
||||
if (log.isDebugEnabled) {
|
||||
log.debug(
|
||||
"Starting up DDataRememberEntitiesStore, write timeout: [{}], read timeout: [{}], majority min cap: [{}]",
|
||||
"Starting up DDataRememberEntitiesStore, read timeout: [{}], write timeout: [{}], majority min cap: [{}]",
|
||||
settings.tuningParameters.waitingForStateTimeout.pretty,
|
||||
settings.tuningParameters.updatingStateTimeout.pretty,
|
||||
majorityMinCap)
|
||||
|
|
|
|||
|
|
@ -284,8 +284,9 @@ class RandomizedSplitBrainResolverIntegrationSpec
|
|||
val side1 = nodes.take(1 + random.nextInt(nodes.size - 1))
|
||||
val side2 = nodes.drop(side1.size)
|
||||
|
||||
val numberOfFlaky = random.nextInt(5)
|
||||
val healLastFlay = numberOfFlaky > 0 && random.nextBoolean()
|
||||
// The test is limited to one flaky step, see issue #29185.
|
||||
val numberOfFlaky = if (cleanSplit) 0 else 1
|
||||
val healLastFlaky = numberOfFlaky > 0 && random.nextBoolean()
|
||||
val flaky: Map[Int, (RoleName, List[RoleName])] =
|
||||
(0 until numberOfFlaky).map { i =>
|
||||
val from = nodes(random.nextInt(nodes.size))
|
||||
|
|
@ -296,11 +297,15 @@ class RandomizedSplitBrainResolverIntegrationSpec
|
|||
|
||||
val delays = (0 until 10).map(_ => 2 + random.nextInt(13))
|
||||
|
||||
log.info(s"Generated $scenario with random seed [$randomSeed] in round [$c]: " +
|
||||
s"cleanSplit [$cleanSplit], healCleanSplit [$healCleanSplit] " +
|
||||
(if (cleanSplit) s"side1 [${side1.map(_.name).mkString(", ")}], side2 [${side2.map(_.name).mkString(", ")}] ") +
|
||||
s"flaky [${flaky.map { case (_, (from, to)) => from.name -> to.map(_.name).mkString("(", ", ", ")") }.mkString("; ")}] " +
|
||||
s"delays [${delays.mkString(", ")}]")
|
||||
log.info(
|
||||
s"Generated $scenario with random seed [$randomSeed] in round [$c]: " +
|
||||
s"cleanSplit [$cleanSplit], healCleanSplit [$healCleanSplit] " +
|
||||
(if (cleanSplit)
|
||||
s"side1 [${side1.map(_.name).mkString(", ")}], side2 [${side2.map(_.name).mkString(", ")}] "
|
||||
else " ") +
|
||||
s", flaky [${flaky.map { case (_, (from, to)) => from.name -> to.map(_.name).mkString("(", ", ", ")") }.mkString("; ")}] " +
|
||||
s", healLastFlaky [$healLastFlaky] " +
|
||||
s", delays [${delays.mkString(", ")}]")
|
||||
|
||||
var delayIndex = 0
|
||||
def nextDelay(): Unit = {
|
||||
|
|
@ -330,7 +335,7 @@ class RandomizedSplitBrainResolverIntegrationSpec
|
|||
nextDelay()
|
||||
}
|
||||
|
||||
if (healLastFlay) {
|
||||
if (healLastFlaky) {
|
||||
val (prevFrom, prevTo) = flaky(flaky.size - 1)
|
||||
for (n <- prevTo)
|
||||
passThrough(prevFrom, n)
|
||||
|
|
|
|||
|
|
@ -46,9 +46,10 @@ abstract class ClusterShardingLeavingSpecConfig(mode: String)
|
|||
extends MultiNodeClusterShardingConfig(
|
||||
mode,
|
||||
loglevel = "DEBUG",
|
||||
additionalConfig = """
|
||||
additionalConfig =
|
||||
"""
|
||||
akka.cluster.sharding.verbose-debug-logging = on
|
||||
akka.cluster.sharding.rebalance-interval = 120 s
|
||||
akka.cluster.sharding.rebalance-interval = 1s # make rebalancing more likely to happen to test for https://github.com/akka/akka/issues/29093
|
||||
akka.cluster.sharding.distributed-data.majority-min-cap = 1
|
||||
akka.cluster.sharding.coordinator-state.write-majority-plus = 1
|
||||
akka.cluster.sharding.coordinator-state.read-majority-plus = 1
|
||||
|
|
|
|||
|
|
@ -135,7 +135,7 @@ abstract class ExternalShardAllocationSpec
|
|||
}
|
||||
enterBarrier("allocated-to-new-node")
|
||||
runOn(forth) {
|
||||
joinWithin(first)
|
||||
joinWithin(first, max = 10.seconds)
|
||||
}
|
||||
enterBarrier("forth-node-joined")
|
||||
runOn(first, second, third) {
|
||||
|
|
|
|||
|
|
@ -94,24 +94,30 @@ class ClusterSingletonManagerLeaseSpec
|
|||
awaitClusterUp(controller, first)
|
||||
enterBarrier("initial-up")
|
||||
runOn(second) {
|
||||
joinWithin(first)
|
||||
awaitAssert({
|
||||
cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up)
|
||||
}, 10.seconds)
|
||||
within(10.seconds) {
|
||||
joinWithin(first)
|
||||
awaitAssert {
|
||||
cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up)
|
||||
}
|
||||
}
|
||||
}
|
||||
enterBarrier("second-up")
|
||||
runOn(third) {
|
||||
joinWithin(first)
|
||||
awaitAssert({
|
||||
cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up)
|
||||
}, 10.seconds)
|
||||
within(10.seconds) {
|
||||
joinWithin(first)
|
||||
awaitAssert {
|
||||
cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up)
|
||||
}
|
||||
}
|
||||
}
|
||||
enterBarrier("third-up")
|
||||
runOn(fourth) {
|
||||
joinWithin(first)
|
||||
awaitAssert({
|
||||
cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up, Up)
|
||||
}, 10.seconds)
|
||||
within(10.seconds) {
|
||||
joinWithin(first)
|
||||
awaitAssert {
|
||||
cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up, Up)
|
||||
}
|
||||
}
|
||||
}
|
||||
enterBarrier("fourth-up")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -60,14 +60,18 @@ import akka.util.ccompat._
|
|||
!members.exists(member => member.dataCenter == selfDc && convergenceMemberStatus(member.status))
|
||||
|
||||
// If another member in the data center that is UP or LEAVING and has not seen this gossip or is exiting
|
||||
// convergence cannot be reached. For the first member in a secondary DC all members must have seen
|
||||
// the gossip state.
|
||||
def memberHinderingConvergenceExists =
|
||||
// convergence cannot be reached. For the first member in a secondary DC all Joining, WeaklyUp, Up or Leaving
|
||||
// members must have seen the gossip state. The reason for the stronger requirement for a first member in a
|
||||
// secondary DC is that first member should only be moved to Up once to ensure that the first upNumber is
|
||||
// only assigned once.
|
||||
def memberHinderingConvergenceExists = {
|
||||
val memberStatus = if (firstMemberInDc) convergenceMemberStatus + Joining + WeaklyUp else convergenceMemberStatus
|
||||
members.exists(
|
||||
member =>
|
||||
(firstMemberInDc || member.dataCenter == selfDc) &&
|
||||
convergenceMemberStatus(member.status) &&
|
||||
memberStatus(member.status) &&
|
||||
!(latestGossip.seenByNode(member.uniqueAddress) || exitingConfirmed(member.uniqueAddress)))
|
||||
}
|
||||
|
||||
// Find cluster members in the data center that are unreachable from other members of the data center
|
||||
// excluding observations from members outside of the data center, that have status DOWN or is passed in as confirmed exiting.
|
||||
|
|
|
|||
|
|
@ -94,7 +94,7 @@ abstract class MultiDcJoin2Spec extends MultiNodeSpec(MultiDcJoin2MultiJvmSpec)
|
|||
// at the same time join fifth, which is the difference compared to MultiDcJoinSpec
|
||||
runOn(fifth) {
|
||||
Cluster(system).join(second)
|
||||
within(10.seconds) {
|
||||
within(20.seconds) {
|
||||
awaitAssert {
|
||||
Cluster(system).state.members
|
||||
.exists(m => m.address == address(fifth) && m.status == MemberStatus.Up) should ===(true)
|
||||
|
|
|
|||
|
|
@ -5,15 +5,14 @@
|
|||
package akka.cluster
|
||||
|
||||
import java.lang.management.ManagementFactory
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.postfixOps
|
||||
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import language.postfixOps
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
|
||||
import akka.actor.Actor
|
||||
|
|
@ -24,10 +23,8 @@ import akka.actor.ActorSystem
|
|||
import akka.actor.Address
|
||||
import akka.actor.Deploy
|
||||
import akka.actor.Identify
|
||||
import akka.actor.OneForOneStrategy
|
||||
import akka.actor.Props
|
||||
import akka.actor.RootActorPath
|
||||
import akka.actor.SupervisorStrategy._
|
||||
import akka.actor.Terminated
|
||||
import akka.cluster.ClusterEvent.CurrentClusterState
|
||||
import akka.cluster.ClusterEvent.CurrentInternalStats
|
||||
|
|
@ -35,31 +32,24 @@ import akka.cluster.ClusterEvent.MemberEvent
|
|||
import akka.remote.DefaultFailureDetectorRegistry
|
||||
import akka.remote.PhiAccrualFailureDetector
|
||||
import akka.remote.RARP
|
||||
import akka.remote.RemoteScope
|
||||
import akka.remote.artery.ArterySettings.AeronUpd
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.routing.FromConfig
|
||||
import akka.testkit._
|
||||
import akka.testkit.TestEvent._
|
||||
import akka.testkit._
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import akka.util.Helpers.Requiring
|
||||
|
||||
/**
|
||||
* This test is intended to be used as long running stress test
|
||||
* of cluster related features. Number of nodes and duration of
|
||||
* of cluster membership features. Number of nodes and duration of
|
||||
* the test steps can be configured. The test scenario is organized as
|
||||
* follows:
|
||||
* 1. join nodes in various ways up to the configured total number of nodes
|
||||
* 2 while nodes are joining a few cluster aware routers are also working
|
||||
* 3. exercise concurrent joining and shutdown of nodes repeatedly
|
||||
* 4. exercise cluster aware routers, including high throughput
|
||||
* 5. exercise many actors in a tree structure
|
||||
* 6. exercise remote supervision
|
||||
* 7. gossip without any changes to the membership
|
||||
* 8. leave and shutdown nodes in various ways
|
||||
* 9. while nodes are removed remote death watch is also exercised
|
||||
* 10. while nodes are removed a few cluster aware routers are also working
|
||||
* 2. exercise concurrent joining and shutdown of nodes repeatedly
|
||||
* 3. gossip without any changes to the membership
|
||||
* 4. leave and shutdown nodes in various ways
|
||||
* 5. while nodes are removed remote death watch is also exercised
|
||||
*
|
||||
* By default it uses 13 nodes.
|
||||
* Example of sbt command line parameters to double that:
|
||||
|
|
@ -99,32 +89,19 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
# scale the *-duration settings with this factor
|
||||
duration-factor = 1
|
||||
join-remove-duration = 90s
|
||||
work-batch-size = 100
|
||||
work-batch-interval = 2s
|
||||
payload-size = 1000
|
||||
normal-throughput-duration = 30s
|
||||
high-throughput-duration = 10s
|
||||
supervision-duration = 10s
|
||||
supervision-one-iteration = 2.5s
|
||||
idle-gossip-duration = 10s
|
||||
expected-test-duration = 600s
|
||||
# actors are created in a tree structure defined
|
||||
# by tree-width (number of children for each actor) and
|
||||
# tree-levels, total number of actors can be calculated by
|
||||
# (width * math.pow(width, levels) - 1) / (width - 1)
|
||||
tree-width = 4
|
||||
tree-levels = 4
|
||||
# scale convergence within timeouts with this factor
|
||||
convergence-within-factor = 1.0
|
||||
# set to off to only test cluster membership
|
||||
exercise-actors = on
|
||||
}
|
||||
|
||||
akka.actor.provider = cluster
|
||||
akka.cluster {
|
||||
failure-detector.acceptable-heartbeat-pause = 10s
|
||||
downing-provider-class = akka.cluster.testkit.AutoDowning
|
||||
testkit.auto-down-unreachable-after = 1s
|
||||
failure-detector.acceptable-heartbeat-pause = 3s
|
||||
downing-provider-class = akka.cluster.sbr.SplitBrainResolverProvider
|
||||
split-brain-resolver {
|
||||
stable-after = 5s
|
||||
}
|
||||
publish-stats-interval = 1s
|
||||
}
|
||||
akka.loggers = ["akka.testkit.TestEventListener"]
|
||||
|
|
@ -135,32 +112,6 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
parallelism-max = 8
|
||||
}
|
||||
|
||||
akka.actor.deployment {
|
||||
/master-node-1/workers {
|
||||
router = round-robin-pool
|
||||
cluster {
|
||||
enabled = on
|
||||
max-nr-of-instances-per-node = 1
|
||||
allow-local-routees = on
|
||||
}
|
||||
}
|
||||
/master-node-2/workers {
|
||||
router = round-robin-group
|
||||
routees.paths = ["/user/worker"]
|
||||
cluster {
|
||||
enabled = on
|
||||
allow-local-routees = on
|
||||
}
|
||||
}
|
||||
/master-node-3/workers = {
|
||||
router = round-robin-pool
|
||||
cluster {
|
||||
enabled = on
|
||||
max-nr-of-instances-per-node = 1
|
||||
allow-local-routees = on
|
||||
}
|
||||
}
|
||||
}
|
||||
# test is using Java serialization and not priority to rewrite
|
||||
akka.actor.allow-java-serialization = on
|
||||
akka.actor.warn-about-java-serializer-usage = off
|
||||
|
|
@ -190,21 +141,11 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
val numberOfNodesShutdown = getInt("nr-of-nodes-shutdown") * nFactor
|
||||
val numberOfNodesJoinRemove = getInt("nr-of-nodes-join-remove") // not scaled by nodes factor
|
||||
|
||||
val workBatchSize = getInt("work-batch-size")
|
||||
val workBatchInterval = testConfig.getMillisDuration("work-batch-interval")
|
||||
val payloadSize = getInt("payload-size")
|
||||
val dFactor = getInt("duration-factor")
|
||||
val joinRemoveDuration = testConfig.getMillisDuration("join-remove-duration") * dFactor
|
||||
val normalThroughputDuration = testConfig.getMillisDuration("normal-throughput-duration") * dFactor
|
||||
val highThroughputDuration = testConfig.getMillisDuration("high-throughput-duration") * dFactor
|
||||
val supervisionDuration = testConfig.getMillisDuration("supervision-duration") * dFactor
|
||||
val supervisionOneIteration = testConfig.getMillisDuration("supervision-one-iteration") * dFactor
|
||||
val idleGossipDuration = testConfig.getMillisDuration("idle-gossip-duration") * dFactor
|
||||
val expectedTestDuration = testConfig.getMillisDuration("expected-test-duration") * dFactor
|
||||
val treeWidth = getInt("tree-width")
|
||||
val treeLevels = getInt("tree-levels")
|
||||
val convergenceWithinFactor = getDouble("convergence-within-factor")
|
||||
val exerciseActors = getBoolean("exercise-actors")
|
||||
|
||||
require(
|
||||
numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially + numberOfNodesJoiningOneByOneSmall +
|
||||
|
|
@ -438,154 +379,6 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Master of routers
|
||||
*
|
||||
* Flow control, to not flood the consumers, is handled by scheduling a
|
||||
* batch of messages to be sent to the router when half of the number
|
||||
* of outstanding messages remains.
|
||||
*
|
||||
* It uses a simple message retry mechanism. If an ack of a sent message
|
||||
* is not received within a timeout, that message will be resent to the router,
|
||||
* infinite number of times.
|
||||
*
|
||||
* When it receives the `End` command it will stop sending messages to the router,
|
||||
* resends continuous, until all outstanding acks have been received, and then
|
||||
* finally it replies with `WorkResult` to the sender of the `End` command, and stops
|
||||
* itself.
|
||||
*/
|
||||
class Master(settings: StressMultiJvmSpec.Settings, batchInterval: FiniteDuration, tree: Boolean) extends Actor {
|
||||
val workers = context.actorOf(FromConfig.props(Props[Worker]()), "workers")
|
||||
val payload = Array.fill(settings.payloadSize)(ThreadLocalRandom.current.nextInt(127).toByte)
|
||||
val retryTimeout = 5.seconds.dilated(context.system)
|
||||
val idCounter = Iterator.from(0)
|
||||
var sendCounter = 0L
|
||||
var ackCounter = 0L
|
||||
var outstanding = Map.empty[JobId, JobState]
|
||||
var startTime = 0L
|
||||
|
||||
import context.dispatcher
|
||||
val resendTask = context.system.scheduler.scheduleWithFixedDelay(3.seconds, 3.seconds, self, RetryTick)
|
||||
|
||||
override def postStop(): Unit = {
|
||||
resendTask.cancel()
|
||||
super.postStop()
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case Begin =>
|
||||
startTime = System.nanoTime
|
||||
self ! SendBatch
|
||||
context.become(working)
|
||||
case RetryTick =>
|
||||
}
|
||||
|
||||
def working: Receive = {
|
||||
case Ack(id) =>
|
||||
outstanding -= id
|
||||
ackCounter += 1
|
||||
if (outstanding.size == settings.workBatchSize / 2)
|
||||
if (batchInterval == Duration.Zero) self ! SendBatch
|
||||
else context.system.scheduler.scheduleOnce(batchInterval, self, SendBatch)
|
||||
case SendBatch => sendJobs()
|
||||
case RetryTick => resend()
|
||||
case End =>
|
||||
done(sender())
|
||||
context.become(ending(sender()))
|
||||
}
|
||||
|
||||
def ending(replyTo: ActorRef): Receive = {
|
||||
case Ack(id) =>
|
||||
outstanding -= id
|
||||
ackCounter += 1
|
||||
done(replyTo)
|
||||
case SendBatch =>
|
||||
case RetryTick => resend()
|
||||
}
|
||||
|
||||
def done(replyTo: ActorRef): Unit =
|
||||
if (outstanding.isEmpty) {
|
||||
val duration = (System.nanoTime - startTime).nanos
|
||||
replyTo ! WorkResult(duration, sendCounter, ackCounter)
|
||||
context.stop(self)
|
||||
}
|
||||
|
||||
def sendJobs(): Unit = {
|
||||
(0 until settings.workBatchSize).foreach { _ =>
|
||||
send(createJob())
|
||||
}
|
||||
}
|
||||
|
||||
def createJob(): Job = {
|
||||
if (tree)
|
||||
TreeJob(
|
||||
idCounter.next(),
|
||||
payload,
|
||||
ThreadLocalRandom.current.nextInt(settings.treeWidth),
|
||||
settings.treeLevels,
|
||||
settings.treeWidth)
|
||||
else SimpleJob(idCounter.next(), payload)
|
||||
}
|
||||
|
||||
def resend(): Unit = {
|
||||
outstanding.values.foreach { jobState =>
|
||||
if (jobState.deadline.isOverdue())
|
||||
send(jobState.job)
|
||||
}
|
||||
}
|
||||
|
||||
def send(job: Job): Unit = {
|
||||
outstanding += job.id -> JobState(Deadline.now + retryTimeout, job)
|
||||
sendCounter += 1
|
||||
workers ! job
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used by Master as routee
|
||||
*/
|
||||
class Worker extends Actor with ActorLogging {
|
||||
def receive = {
|
||||
case SimpleJob(id, _) => sender() ! Ack(id)
|
||||
case TreeJob(id, payload, idx, levels, width) =>
|
||||
// create the actors when first TreeJob message is received
|
||||
val totalActors = ((width * math.pow(width, levels) - 1) / (width - 1)).toInt
|
||||
log.debug(
|
||||
"Creating [{}] actors in a tree structure of [{}] levels and each actor has [{}] children",
|
||||
totalActors,
|
||||
levels,
|
||||
width)
|
||||
val tree = context.actorOf(Props(classOf[TreeNode], levels, width), "tree")
|
||||
tree.forward((idx, SimpleJob(id, payload)))
|
||||
context.become(treeWorker(tree))
|
||||
}
|
||||
|
||||
def treeWorker(tree: ActorRef): Receive = {
|
||||
case SimpleJob(id, _) => sender() ! Ack(id)
|
||||
case TreeJob(id, payload, idx, _, _) =>
|
||||
tree.forward((idx, SimpleJob(id, payload)))
|
||||
}
|
||||
}
|
||||
|
||||
class TreeNode(level: Int, width: Int) extends Actor {
|
||||
require(level >= 1)
|
||||
def createChild(): Actor = if (level == 1) new Leaf else new TreeNode(level - 1, width)
|
||||
val indexedChildren =
|
||||
(0 until width).map { i =>
|
||||
context.actorOf(Props(createChild()).withDeploy(Deploy.local), name = i.toString)
|
||||
} toVector
|
||||
|
||||
def receive = {
|
||||
case (idx: Int, job: SimpleJob) if idx < width => indexedChildren(idx).forward((idx, job))
|
||||
}
|
||||
}
|
||||
|
||||
class Leaf extends Actor {
|
||||
def receive = {
|
||||
case (_: Int, job: SimpleJob) => sender() ! Ack(job.id)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for remote death watch testing
|
||||
*/
|
||||
|
|
@ -593,41 +386,6 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
def receive = Actor.emptyBehavior
|
||||
}
|
||||
|
||||
/**
|
||||
* Used for remote supervision testing
|
||||
*/
|
||||
class Supervisor extends Actor {
|
||||
|
||||
var restartCount = 0
|
||||
|
||||
override val supervisorStrategy =
|
||||
OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 minute) {
|
||||
case _: Exception =>
|
||||
restartCount += 1
|
||||
Restart
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case props: Props => context.actorOf(props)
|
||||
case e: Exception => context.children.foreach { _ ! e }
|
||||
case GetChildrenCount => sender() ! ChildrenCount(context.children.size, restartCount)
|
||||
case Reset =>
|
||||
require(
|
||||
context.children.isEmpty,
|
||||
s"ResetChildrenCount not allowed when children exists, [${context.children.size}]")
|
||||
restartCount = 0
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Child of Supervisor for remote supervision testing
|
||||
*/
|
||||
class RemoteChild extends Actor {
|
||||
def receive = {
|
||||
case e: Exception => throw e
|
||||
}
|
||||
}
|
||||
|
||||
case object Begin
|
||||
case object End
|
||||
case object RetryTick
|
||||
|
|
@ -640,22 +398,6 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
}
|
||||
final case class ReportTo(ref: Option[ActorRef])
|
||||
final case class StatsResult(from: Address, stats: CurrentInternalStats)
|
||||
|
||||
type JobId = Int
|
||||
trait Job { def id: JobId }
|
||||
final case class SimpleJob(id: JobId, payload: Any) extends Job
|
||||
final case class TreeJob(id: JobId, payload: Any, idx: Int, levels: Int, width: Int) extends Job
|
||||
final case class Ack(id: JobId)
|
||||
final case class JobState(deadline: Deadline, job: Job)
|
||||
final case class WorkResult(duration: Duration, sendCount: Long, ackCount: Long) {
|
||||
def retryCount: Long = sendCount - ackCount
|
||||
def jobsPerSecond: Double = ackCount * 1000.0 / duration.toMillis
|
||||
}
|
||||
case object SendBatch
|
||||
final case class CreateTree(levels: Int, width: Int)
|
||||
|
||||
case object GetChildrenCount
|
||||
final case class ChildrenCount(numberOfChildren: Int, numberOfChildRestarts: Int)
|
||||
case object Reset
|
||||
|
||||
}
|
||||
|
|
@ -699,13 +441,7 @@ abstract class StressSpec
|
|||
override def muteLog(sys: ActorSystem = system): Unit = {
|
||||
super.muteLog(sys)
|
||||
sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*")))
|
||||
muteDeadLetters(
|
||||
classOf[SimpleJob],
|
||||
classOf[AggregatedClusterResult],
|
||||
SendBatch.getClass,
|
||||
classOf[StatsResult],
|
||||
classOf[PhiResult],
|
||||
RetryTick.getClass)(sys)
|
||||
muteDeadLetters(classOf[AggregatedClusterResult], classOf[StatsResult], classOf[PhiResult], RetryTick.getClass)(sys)
|
||||
}
|
||||
|
||||
override protected def afterTermination(): Unit = {
|
||||
|
|
@ -780,14 +516,9 @@ abstract class StressSpec
|
|||
def latestGossipStats = cluster.readView.latestStats.gossipStats
|
||||
|
||||
override def cluster: Cluster = {
|
||||
createWorker
|
||||
super.cluster
|
||||
}
|
||||
|
||||
// always create one worker when the cluster is started
|
||||
lazy val createWorker: Unit =
|
||||
system.actorOf(Props[Worker](), "worker")
|
||||
|
||||
def createResultAggregator(title: String, expectedResults: Int, includeInHistory: Boolean): Unit = {
|
||||
runOn(roles.head) {
|
||||
val aggregator = system.actorOf(
|
||||
|
|
@ -1038,116 +769,6 @@ abstract class StressSpec
|
|||
|
||||
}
|
||||
|
||||
def masterName: String = "master-" + myself.name
|
||||
|
||||
def master: Option[ActorRef] = {
|
||||
system.actorSelection("/user/" + masterName).tell(Identify("master"), identifyProbe.ref)
|
||||
identifyProbe.expectMsgType[ActorIdentity].ref
|
||||
}
|
||||
|
||||
def exerciseRouters(
|
||||
title: String,
|
||||
duration: FiniteDuration,
|
||||
batchInterval: FiniteDuration,
|
||||
expectDroppedMessages: Boolean,
|
||||
tree: Boolean): Unit =
|
||||
within(duration + 10.seconds) {
|
||||
nbrUsedRoles should ===(totalNumberOfNodes)
|
||||
createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false)
|
||||
|
||||
val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3)
|
||||
runOn(masterRoles: _*) {
|
||||
reportResult {
|
||||
val m = system.actorOf(
|
||||
Props(classOf[Master], settings, batchInterval, tree).withDeploy(Deploy.local),
|
||||
name = masterName)
|
||||
m ! Begin
|
||||
import system.dispatcher
|
||||
system.scheduler.scheduleOnce(duration) {
|
||||
m.tell(End, testActor)
|
||||
}
|
||||
val workResult = awaitWorkResult(m)
|
||||
workResult.sendCount should be > (0L)
|
||||
workResult.ackCount should be > (0L)
|
||||
if (!expectDroppedMessages)
|
||||
workResult.retryCount should ===(0)
|
||||
|
||||
enterBarrier("routers-done-" + step)
|
||||
}
|
||||
}
|
||||
runOn(otherRoles: _*) {
|
||||
reportResult {
|
||||
enterBarrier("routers-done-" + step)
|
||||
}
|
||||
}
|
||||
|
||||
awaitClusterResult()
|
||||
}
|
||||
|
||||
def awaitWorkResult(m: ActorRef): WorkResult = {
|
||||
val workResult = expectMsgType[WorkResult]
|
||||
if (settings.infolog)
|
||||
log.info(
|
||||
"{} result, [{}] jobs/s, retried [{}] of [{}] msg",
|
||||
masterName,
|
||||
workResult.jobsPerSecond.form,
|
||||
workResult.retryCount,
|
||||
workResult.sendCount)
|
||||
watch(m)
|
||||
expectTerminated(m)
|
||||
workResult
|
||||
}
|
||||
|
||||
def exerciseSupervision(title: String, duration: FiniteDuration, oneIteration: Duration): Unit =
|
||||
within(duration + 10.seconds) {
|
||||
val rounds = (duration.toMillis / oneIteration.toMillis).max(1).toInt
|
||||
val supervisor = system.actorOf(Props[Supervisor](), "supervisor")
|
||||
for (_ <- 0 until rounds) {
|
||||
createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false)
|
||||
|
||||
val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3)
|
||||
runOn(masterRoles: _*) {
|
||||
reportResult {
|
||||
roles.take(nbrUsedRoles).foreach { r =>
|
||||
supervisor ! Props[RemoteChild]().withDeploy(Deploy(scope = RemoteScope(address(r))))
|
||||
}
|
||||
supervisor ! GetChildrenCount
|
||||
expectMsgType[ChildrenCount] should ===(ChildrenCount(nbrUsedRoles, 0))
|
||||
|
||||
(1 to 5).foreach { _ =>
|
||||
supervisor ! new RuntimeException("Simulated exception")
|
||||
}
|
||||
awaitAssert {
|
||||
supervisor ! GetChildrenCount
|
||||
val c = expectMsgType[ChildrenCount]
|
||||
c should ===(ChildrenCount(nbrUsedRoles, 5 * nbrUsedRoles))
|
||||
}
|
||||
|
||||
// after 5 restart attempts the children should be stopped
|
||||
supervisor ! new RuntimeException("Simulated exception")
|
||||
awaitAssert {
|
||||
supervisor ! GetChildrenCount
|
||||
val c = expectMsgType[ChildrenCount]
|
||||
// zero children
|
||||
c should ===(ChildrenCount(0, 6 * nbrUsedRoles))
|
||||
}
|
||||
supervisor ! Reset
|
||||
|
||||
}
|
||||
enterBarrier("supervision-done-" + step)
|
||||
}
|
||||
|
||||
runOn(otherRoles: _*) {
|
||||
reportResult {
|
||||
enterBarrier("supervision-done-" + step)
|
||||
}
|
||||
}
|
||||
|
||||
awaitClusterResult()
|
||||
step += 1
|
||||
}
|
||||
}
|
||||
|
||||
def idleGossip(title: String): Unit = {
|
||||
createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = true)
|
||||
reportResult {
|
||||
|
|
@ -1195,14 +816,6 @@ abstract class StressSpec
|
|||
enterBarrier("after-" + step)
|
||||
}
|
||||
|
||||
"start routers that are running while nodes are joining" taggedAs LongRunningTest in {
|
||||
runOn(roles.take(3): _*) {
|
||||
system.actorOf(
|
||||
Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local),
|
||||
name = masterName) ! Begin
|
||||
}
|
||||
}
|
||||
|
||||
"join nodes one-by-one to small cluster" taggedAs LongRunningTest in {
|
||||
joinOneByOne(numberOfNodesJoiningOneByOneSmall)
|
||||
enterBarrier("after-" + step)
|
||||
|
|
@ -1227,99 +840,16 @@ abstract class StressSpec
|
|||
enterBarrier("after-" + step)
|
||||
}
|
||||
|
||||
"end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) {
|
||||
if (exerciseActors) {
|
||||
runOn(roles.take(3): _*) {
|
||||
master match {
|
||||
case Some(m) =>
|
||||
m.tell(End, testActor)
|
||||
val workResult = awaitWorkResult(m)
|
||||
workResult.retryCount should ===(0)
|
||||
workResult.sendCount should be > (0L)
|
||||
workResult.ackCount should be > (0L)
|
||||
case None => fail("master not running")
|
||||
}
|
||||
}
|
||||
}
|
||||
enterBarrier("after-" + step)
|
||||
}
|
||||
|
||||
"use routers with normal throughput" taggedAs LongRunningTest in {
|
||||
if (exerciseActors) {
|
||||
exerciseRouters(
|
||||
"use routers with normal throughput",
|
||||
normalThroughputDuration,
|
||||
batchInterval = workBatchInterval,
|
||||
expectDroppedMessages = false,
|
||||
tree = false)
|
||||
}
|
||||
enterBarrier("after-" + step)
|
||||
}
|
||||
|
||||
"use routers with high throughput" taggedAs LongRunningTest in {
|
||||
if (exerciseActors) {
|
||||
exerciseRouters(
|
||||
"use routers with high throughput",
|
||||
highThroughputDuration,
|
||||
batchInterval = Duration.Zero,
|
||||
expectDroppedMessages = false,
|
||||
tree = false)
|
||||
}
|
||||
enterBarrier("after-" + step)
|
||||
}
|
||||
|
||||
"use many actors with normal throughput" taggedAs LongRunningTest in {
|
||||
if (exerciseActors) {
|
||||
exerciseRouters(
|
||||
"use many actors with normal throughput",
|
||||
normalThroughputDuration,
|
||||
batchInterval = workBatchInterval,
|
||||
expectDroppedMessages = false,
|
||||
tree = true)
|
||||
}
|
||||
enterBarrier("after-" + step)
|
||||
}
|
||||
|
||||
"use many actors with high throughput" taggedAs LongRunningTest in {
|
||||
if (exerciseActors) {
|
||||
exerciseRouters(
|
||||
"use many actors with high throughput",
|
||||
highThroughputDuration,
|
||||
batchInterval = Duration.Zero,
|
||||
expectDroppedMessages = false,
|
||||
tree = true)
|
||||
}
|
||||
enterBarrier("after-" + step)
|
||||
}
|
||||
|
||||
"exercise join/remove/join/remove" taggedAs LongRunningTest in {
|
||||
exerciseJoinRemove("exercise join/remove", joinRemoveDuration)
|
||||
enterBarrier("after-" + step)
|
||||
}
|
||||
|
||||
"exercise supervision" taggedAs LongRunningTest in {
|
||||
if (exerciseActors) {
|
||||
exerciseSupervision("exercise supervision", supervisionDuration, supervisionOneIteration)
|
||||
}
|
||||
enterBarrier("after-" + step)
|
||||
}
|
||||
|
||||
"gossip when idle" taggedAs LongRunningTest in {
|
||||
idleGossip("idle gossip")
|
||||
enterBarrier("after-" + step)
|
||||
}
|
||||
|
||||
"start routers that are running while nodes are removed" taggedAs LongRunningTest in {
|
||||
if (exerciseActors) {
|
||||
runOn(roles.take(3): _*) {
|
||||
system.actorOf(
|
||||
Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local),
|
||||
name = masterName) ! Begin
|
||||
}
|
||||
}
|
||||
enterBarrier("after-" + step)
|
||||
}
|
||||
|
||||
"leave nodes one-by-one from large cluster" taggedAs LongRunningTest in {
|
||||
removeOneByOne(numberOfNodesLeavingOneByOneLarge, shutdown = false)
|
||||
enterBarrier("after-" + step)
|
||||
|
|
@ -1352,22 +882,6 @@ abstract class StressSpec
|
|||
enterBarrier("after-" + step)
|
||||
}
|
||||
|
||||
"end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) {
|
||||
if (exerciseActors) {
|
||||
runOn(roles.take(3): _*) {
|
||||
master match {
|
||||
case Some(m) =>
|
||||
m.tell(End, testActor)
|
||||
val workResult = awaitWorkResult(m)
|
||||
workResult.sendCount should be > (0L)
|
||||
workResult.ackCount should be > (0L)
|
||||
case None => fail("master not running")
|
||||
}
|
||||
}
|
||||
}
|
||||
enterBarrier("after-" + step)
|
||||
}
|
||||
|
||||
"log jvm info" taggedAs LongRunningTest in {
|
||||
if (infolog) {
|
||||
log.info("StressSpec JVM:\n{}", jvmInfo())
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ class GossipSpec extends AnyWordSpec with Matchers {
|
|||
val e1 = TestMember(Address("akka", "sys", "e", 2552), Joining)
|
||||
val e2 = TestMember(e1.address, Up)
|
||||
val e3 = TestMember(e1.address, Down)
|
||||
val f1 = TestMember(Address("akka", "sys", "f", 2552), Joining)
|
||||
|
||||
val dc1a1 = TestMember(Address("akka", "sys", "a", 2552), Up, Set.empty, dataCenter = "dc1")
|
||||
val dc1b1 = TestMember(Address("akka", "sys", "b", 2552), Up, Set.empty, dataCenter = "dc1")
|
||||
|
|
@ -272,6 +273,24 @@ class GossipSpec extends AnyWordSpec with Matchers {
|
|||
state(g2, dc2e1).convergence(Set.empty) should ===(true)
|
||||
}
|
||||
|
||||
"not reach convergence for first member of other data center until all have seen the gossip 2" in {
|
||||
// reproducer test for issue #29486
|
||||
val dc2e1 = TestMember(e1.address, status = Joining, roles = Set.empty, dataCenter = "dc2")
|
||||
val dc2f1 = TestMember(f1.address, status = Joining, roles = Set.empty, dataCenter = "dc2")
|
||||
val g =
|
||||
Gossip(members = SortedSet(dc1a1, dc1b1, dc2e1, dc2f1))
|
||||
.seen(dc1a1.uniqueAddress)
|
||||
.seen(dc1b1.uniqueAddress)
|
||||
.seen(dc2f1.uniqueAddress)
|
||||
|
||||
// dc2 hasn't reached convergence because dc2e1 has not seen it (and that matters even though it is only Joining)
|
||||
state(g, dc2f1).convergence(Set.empty) should ===(false)
|
||||
|
||||
// until all have seen it
|
||||
val g2 = g.seen(dc2e1.uniqueAddress)
|
||||
state(g2, dc2f1).convergence(Set.empty) should ===(true)
|
||||
}
|
||||
|
||||
"reach convergence per data center even if another data center contains unreachable" in {
|
||||
val r1 = Reachability.empty.unreachable(dc2c1.uniqueAddress, dc2d1.uniqueAddress)
|
||||
|
||||
|
|
|
|||
|
|
@ -27,30 +27,30 @@ There are two parts of Akka that need careful consideration when performing an r
|
|||
1. Serialization format of persisted events and snapshots. New nodes must be able to read old data, and
|
||||
during the update old nodes must be able to read data stored by new nodes.
|
||||
|
||||
There are many more application specific aspects for serialization changes during rolling upgrades to consider.
|
||||
There are many more application specific aspects for serialization changes during rolling updates to consider.
|
||||
For example based on the use case and requirements, whether to allow dropped messages or tear down the TCP connection when the manifest is unknown.
|
||||
When some message loss during a rolling upgrade is acceptable versus a full shutdown and restart, assuming the application recovers afterwards
|
||||
When some message loss during a rolling update is acceptable versus a full shutdown and restart, assuming the application recovers afterwards
|
||||
* If a `java.io.NotSerializableException` is thrown in `fromBinary` this is treated as a transient problem, the issue logged and the message is dropped
|
||||
* If other exceptions are thrown it can be an indication of corrupt bytes from the underlying transport, and the connection is broken
|
||||
|
||||
For more zero-impact rolling upgrades, it is important to consider a strategy for serialization that can be evolved.
|
||||
One approach to retiring a serializer without downtime is described in @ref:[two rolling upgrade steps to switch to the new serializer](../serialization.md#rolling-upgrades).
|
||||
For more zero-impact rolling updates, it is important to consider a strategy for serialization that can be evolved.
|
||||
One approach to retiring a serializer without downtime is described in @ref:[two rolling update steps to switch to the new serializer](../serialization.md#rolling-updates).
|
||||
Additionally you can find advice on @ref:[Persistence - Schema Evolution](../persistence-schema-evolution.md) which also applies to remote messages when deploying with rolling updates.
|
||||
|
||||
## Cluster Sharding
|
||||
|
||||
During a rolling upgrade, sharded entities receiving traffic may be moved during @ref:[shard rebalancing](../typed/cluster-sharding-concepts.md#shard-rebalancing),
|
||||
During a rolling update, sharded entities receiving traffic may be moved during @ref:[shard rebalancing](../typed/cluster-sharding-concepts.md#shard-rebalancing),
|
||||
to an old or new node in the cluster, based on the pluggable allocation strategy and settings.
|
||||
When an old node is stopped the shards that were running on it are moved to one of the
|
||||
other old nodes remaining in the cluster. The `ShardCoordinator` is itself a cluster singleton.
|
||||
To minimize downtime of the shard coordinator, see the strategies about @ref[ClusterSingleton](#cluster-singleton) rolling upgrades below.
|
||||
To minimize downtime of the shard coordinator, see the strategies about @ref[ClusterSingleton](#cluster-singleton) rolling updates below.
|
||||
|
||||
A few specific changes to sharding configuration require @ref:[a full cluster restart](#cluster-sharding-configuration-change).
|
||||
|
||||
## Cluster Singleton
|
||||
|
||||
Cluster singletons are always running on the oldest node. To avoid moving cluster singletons more than necessary during a rolling upgrade,
|
||||
it is recommended to upgrade the oldest node last. This way cluster singletons are only moved once during a full rolling upgrade.
|
||||
Cluster singletons are always running on the oldest node. To avoid moving cluster singletons more than necessary during a rolling update,
|
||||
it is recommended to upgrade the oldest node last. This way cluster singletons are only moved once during a full rolling update.
|
||||
Otherwise, in the worst case cluster singletons may be migrated from node to node which requires coordination and initialization
|
||||
overhead several times.
|
||||
|
||||
|
|
@ -160,5 +160,5 @@ Rolling update is not supported when @ref:[changing the remoting transport](../r
|
|||
|
||||
### Migrating from Classic Sharding to Typed Sharding
|
||||
|
||||
If you have been using classic sharding it is possible to do a rolling upgrade to typed sharding using a 3 step procedure.
|
||||
If you have been using classic sharding it is possible to do a rolling update to typed sharding using a 3 step procedure.
|
||||
The steps along with example commits are detailed in [this sample PR](https://github.com/akka/akka-samples/pull/110)
|
||||
|
|
|
|||
|
|
@ -5,9 +5,11 @@
|
|||
Akka offers tiny helpers for use with @scala[@scaladoc[Future](scala.concurrent.Future)s]@java[@javadoc[CompletionStage](java.util.concurrent.CompletionStage)]. These are part of Akka's core module:
|
||||
|
||||
@@dependency[sbt,Maven,Gradle] {
|
||||
symbol1=AkkaVersion
|
||||
value1="$akka.version$"
|
||||
group="com.typesafe.akka"
|
||||
artifact="akka-actor_$scala.binary_version$"
|
||||
version="$akka.version$"
|
||||
artifact="akka-actor_$scala.binary.version$"
|
||||
version=AkkaVersion
|
||||
}
|
||||
|
||||
## After
|
||||
|
|
|
|||
|
|
@ -224,7 +224,7 @@ needs to have an associated code which indicates if it is a window or aisle seat
|
|||
Adding fields is the most common change you'll need to apply to your messages so make sure the serialization format
|
||||
you picked for your payloads can handle it apropriately, i.e. such changes should be *binary compatible*.
|
||||
This is achieved using the right serializer toolkit. In the following examples we will be using protobuf.
|
||||
See also @ref:[how to add fields with Jackson](serialization-jackson.md#add-field).
|
||||
See also @ref:[how to add fields with Jackson](serialization-jackson.md#add-optional-field).
|
||||
|
||||
While being able to read messages with missing fields is half of the solution, you also need to deal with the missing
|
||||
values somehow. This is usually modeled as some kind of default value, or by representing the field as an @scala[`Option[T]`]@java[`Optional<T>`]
|
||||
|
|
|
|||
|
|
@ -19,7 +19,9 @@ reading this migration guide and testing your application thoroughly is recommen
|
|||
|
||||
Rolling updates are possible without shutting down all nodes of the Akka Cluster, but will require
|
||||
configuration adjustments as described in the @ref:[Remoting](#remoting) section of this migration
|
||||
guide.
|
||||
guide. Due to the @ref:[changed serialization of the Cluster messages in Akka 2.6.2](rolling-update.md#2-6-2-clustermessageserializer-manifests-change)
|
||||
a rolling update from 2.5.x must first be made to Akka 2.6.2 and then a second rolling update can change to Akka 2.6.3
|
||||
or later.
|
||||
|
||||
## Scala 2.11 no longer supported
|
||||
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ This means that a rolling update will have to go through at least one of 2.6.2,
|
|||
Issue: [#28918](https://github.com/akka/akka/issues/28918). JacksonCborSerializer was using plain JSON format
|
||||
instead of CBOR.
|
||||
|
||||
If you have `jackson-cbor` in your `serialization-bindings` a rolling upgrade will have to go through 2.6.5 when
|
||||
If you have `jackson-cbor` in your `serialization-bindings` a rolling update will have to go through 2.6.5 when
|
||||
upgrading to 2.6.5 or higher.
|
||||
|
||||
In Akka 2.6.5 the `jackson-cbor` binding will still serialize to JSON format to support rolling update from 2.6.4.
|
||||
|
|
|
|||
|
|
@ -164,11 +164,20 @@ when using polymorphic types.
|
|||
|
||||
### ADT with trait and case object
|
||||
|
||||
In Scala it's common to use a sealed trait and case objects to represent enums. If the values are case classes
|
||||
It's common in Scala to use a sealed trait and case objects to represent enums. If the values are case classes
|
||||
the `@JsonSubTypes` annotation as described above works, but if the values are case objects it will not.
|
||||
The annotation requires a `Class` and there is no way to define that in an annotation for a `case object`.
|
||||
|
||||
This can be solved by implementing a custom serialization for the enums. Annotate the `trait` with
|
||||
The easiest workaround is to define the case objects as case class without any field.
|
||||
|
||||
Alternatively, you can define an intermediate trait for the case object and a custom deserializer for it. The example below builds on the previous `Animal` sample by adding a fictitious, single instance, new animal, an `Unicorn`.
|
||||
|
||||
Scala
|
||||
: @@snip [SerializationDocSpec.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/SerializationDocSpec.scala) { #polymorphism-case-object }
|
||||
|
||||
The case object `Unicorn` can't be used in a `@JsonSubTypes` annotation, but its trait can. When serializing the case object we need to know which type tag to use, hence the `@JsonTypeName` annotation on the object. When deserializing, Jackson will only know about the trait variant therefore we need a custom deserializer that returns the case object.
|
||||
|
||||
On the other hand, if the ADT only has case objects, you can solve it by implementing a custom serialization for the enums. Annotate the `trait` with
|
||||
`@JsonSerialize` and `@JsonDeserialize` and implement the serialization with `StdSerializer` and
|
||||
`StdDeserializer`.
|
||||
|
||||
|
|
@ -205,7 +214,7 @@ We will look at a few scenarios of how the classes may be evolved.
|
|||
Removing a field can be done without any migration code. The Jackson serializer will ignore properties that does
|
||||
not exist in the class.
|
||||
|
||||
### Add Field
|
||||
### Add Optional Field
|
||||
|
||||
Adding an optional field can be done without any migration code. The default value will be @scala[None]@java[`Optional.empty`].
|
||||
|
||||
|
|
@ -226,6 +235,8 @@ Scala
|
|||
Java
|
||||
: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v2a/ItemAdded.java) { #add-optional }
|
||||
|
||||
### Add Mandatory Field
|
||||
|
||||
Let's say we want to have a mandatory `discount` property without default value instead:
|
||||
|
||||
Scala
|
||||
|
|
@ -361,6 +372,63 @@ binding, but it should still be possible to deserialize old data with Jackson.
|
|||
|
||||
It's a list of class names or prefixes of class names.
|
||||
|
||||
## Rolling updates
|
||||
|
||||
When doing a rolling update, for a period of time there are two different binaries running in production. If the schema
|
||||
has evolved requiring a new schema version, the data serialized by the new binary will be unreadable from the old
|
||||
binary. This situation causes transient errors on the processes running the old binary. This service degradation is
|
||||
usually fine since the rolling update will eventually complete and all old processes will be replaced with the new
|
||||
binary. To avoid this service degradation you can also use forward-one support in your schema evolutions.
|
||||
|
||||
To complete a no-degradation rolling update, you need to make two deployments. First, deploy a new binary which can read
|
||||
the new schema but still uses the old schema. Then, deploy a second binary which serializes data using the new schema
|
||||
and drops the downcasting code from the migration.
|
||||
|
||||
Let's take, for example, the case above where we [renamed a field](#rename-field).
|
||||
|
||||
The starting schema is:
|
||||
|
||||
Scala
|
||||
: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala) { #add-optional }
|
||||
|
||||
Java
|
||||
: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java) { #add-optional }
|
||||
|
||||
In a first deployment, we still don't make any change to the event class:
|
||||
|
||||
Scala
|
||||
: @@snip [ItemAdded.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala) { #forward-one-rename }
|
||||
|
||||
Java
|
||||
: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java) { #forward-one-rename }
|
||||
|
||||
but we introduce a migration that can read the newer schema which is versioned `2`:
|
||||
|
||||
Scala
|
||||
: @@snip [ItemAddedMigration.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1withv2/ItemAddedMigration.scala) { #forward-one-rename }
|
||||
|
||||
Java
|
||||
: @@snip [ItemAddedMigration.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1withv2/ItemAddedMigration.java) { #forward-one-rename }
|
||||
|
||||
Once all running nodes have the new migration code which can read version `2` of `ItemAdded` we can proceed with the
|
||||
second step. So, we deploy the updated event:
|
||||
|
||||
Scala
|
||||
: @@snip [ItemAdded.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v2c/ItemAdded.scala) { #rename }
|
||||
|
||||
Java
|
||||
: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v2c/ItemAdded.java) { #rename }
|
||||
|
||||
and the final migration code which no longer needs forward-compatibility code:
|
||||
|
||||
Scala
|
||||
: @@snip [ItemAddedMigration.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v2c/ItemAddedMigration.scala) { #rename }
|
||||
|
||||
Java
|
||||
: @@snip [ItemAddedMigration.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v2c/ItemAddedMigration.java) { #rename }
|
||||
|
||||
|
||||
|
||||
## Jackson Modules
|
||||
|
||||
The following Jackson modules are enabled by default:
|
||||
|
|
|
|||
|
|
@ -180,7 +180,7 @@ should be serialized by it.
|
|||
It's recommended to throw `IllegalArgumentException` or `java.io.NotSerializableException` in
|
||||
`fromBinary` if the manifest is unknown. This makes it possible to introduce new message types and
|
||||
send them to nodes that don't know about them. This is typically needed when performing
|
||||
rolling upgrades, i.e. running a cluster with mixed versions for a while.
|
||||
rolling updates, i.e. running a cluster with mixed versions for a while.
|
||||
Those exceptions are treated as a transient problem in the classic remoting
|
||||
layer. The problem will be logged and the message dropped. Other exceptions will tear down
|
||||
the TCP connection because it can be an indication of corrupt bytes from the underlying
|
||||
|
|
@ -252,24 +252,24 @@ akka.actor.warn-about-java-serializer-usage = off
|
|||
It is not safe to mix major Scala versions when using the Java serialization as Scala does not guarantee compatibility
|
||||
and this could lead to very surprising errors.
|
||||
|
||||
## Rolling upgrades
|
||||
## Rolling updates
|
||||
|
||||
A serialized remote message (or persistent event) consists of serializer-id, the manifest, and the binary payload.
|
||||
When deserializing it is only looking at the serializer-id to pick which `Serializer` to use for `fromBinary`.
|
||||
The message class (the bindings) is not used for deserialization. The manifest is only used within the
|
||||
`Serializer` to decide how to deserialize the payload, so one `Serializer` can handle many classes.
|
||||
|
||||
That means that it is possible to change serialization for a message by performing two rolling upgrade steps to
|
||||
That means that it is possible to change serialization for a message by performing two rolling update steps to
|
||||
switch to the new serializer.
|
||||
|
||||
1. Add the `Serializer` class and define it in `akka.actor.serializers` config section, but not in
|
||||
`akka.actor.serialization-bindings`. Perform a rolling upgrade for this change. This means that the
|
||||
`akka.actor.serialization-bindings`. Perform a rolling update for this change. This means that the
|
||||
serializer class exists on all nodes and is registered, but it is still not used for serializing any
|
||||
messages. That is important because during the rolling upgrade the old nodes still don't know about
|
||||
messages. That is important because during the rolling update the old nodes still don't know about
|
||||
the new serializer and would not be able to deserialize messages with that format.
|
||||
|
||||
1. The second change is to register that the serializer is to be used for certain classes by defining
|
||||
those in the `akka.actor.serialization-bindings` config section. Perform a rolling upgrade for this
|
||||
those in the `akka.actor.serialization-bindings` config section. Perform a rolling update for this
|
||||
change. This means that new nodes will use the new serializer when sending messages and old nodes will
|
||||
be able to deserialize the new format. Old nodes will continue to use the old serializer when sending
|
||||
messages and new nodes will be able to deserialize the old format.
|
||||
|
|
|
|||
|
|
@ -140,7 +140,7 @@ The reason for only using a limited number of nodes is to keep the number of con
|
|||
centers low. The same nodes are also used for the gossip protocol when disseminating the membership
|
||||
information across data centers. Within a data center all nodes are involved in gossip and failure detection.
|
||||
|
||||
This influences how rolling upgrades should be performed. Don't stop all of the oldest that are used for gossip
|
||||
This influences how rolling updates should be performed. Don't stop all of the oldest that are used for gossip
|
||||
at the same time. Stop one or a few at a time so that new nodes can take over the responsibility.
|
||||
It's best to leave the oldest nodes until last.
|
||||
|
||||
|
|
|
|||
|
|
@ -293,7 +293,7 @@ Cluster Sharding uses its own Distributed Data `Replicator` per node.
|
|||
If using roles with sharding there is one `Replicator` per role, which enables a subset of
|
||||
all nodes for some entity types and another subset for other entity types. Each replicator has a name
|
||||
that contains the node role and therefore the role configuration must be the same on all nodes in the
|
||||
cluster, for example you can't change the roles when performing a rolling upgrade.
|
||||
cluster, for example you can't change the roles when performing a rolling update.
|
||||
Changing roles requires @ref:[a full cluster restart](../additional/rolling-updates.md#cluster-sharding-configuration-change).
|
||||
|
||||
The `akka.cluster.sharding.distributed-data` config section configures the settings for Distributed Data.
|
||||
|
|
@ -413,7 +413,7 @@ akka.persistence.cassandra.journal {
|
|||
}
|
||||
```
|
||||
|
||||
Once you have migrated you cannot go back to the old persistence store, a rolling upgrade is therefore not possible.
|
||||
Once you have migrated you cannot go back to the old persistence store, a rolling update is therefore not possible.
|
||||
|
||||
When @ref:[Distributed Data mode](#distributed-data-mode) is used the identifiers of the entities are
|
||||
stored in @ref:[Durable Storage](distributed-data.md#durable-storage) of Distributed Data. You may want to change the
|
||||
|
|
|
|||
|
|
@ -173,7 +173,7 @@ avoid blocking APIs. The following solution explains how to handle blocking
|
|||
operations properly.
|
||||
|
||||
Note that the same hints apply to managing blocking operations anywhere in Akka,
|
||||
including Streams, Http and other reactive libraries built on top of it.
|
||||
including Streams, HTTP and other reactive libraries built on top of it.
|
||||
|
||||
@@@
|
||||
|
||||
|
|
|
|||
|
|
@ -139,7 +139,7 @@ public class LambdaPersistencePluginDocTest {
|
|||
public MyJournalSpecTest() {
|
||||
super(
|
||||
ConfigFactory.parseString(
|
||||
"persistence.journal.plugin = "
|
||||
"akka.persistence.journal.plugin = "
|
||||
+ "\"akka.persistence.journal.leveldb-shared\""));
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@ class ArteryUpdSendConsistencyWithOneLaneSpec
|
|||
|
||||
class ArteryUpdSendConsistencyWithThreeLanesSpec
|
||||
extends AbstractRemoteSendConsistencySpec(ConfigFactory.parseString("""
|
||||
akka.loglevel = DEBUG
|
||||
akka.remote.artery.transport = aeron-udp
|
||||
akka.remote.artery.advanced.outbound-lanes = 3
|
||||
akka.remote.artery.advanced.inbound-lanes = 3
|
||||
|
|
|
|||
|
|
@ -22,11 +22,22 @@ import akka.util.unused
|
|||
abstract class JacksonMigration {
|
||||
|
||||
/**
|
||||
* Define current version. The first version, when no migration was used,
|
||||
* is always 1.
|
||||
* Define current version, that is, the value used when serializing new data. The first version, when no
|
||||
* migration was used, is always 1.
|
||||
*/
|
||||
def currentVersion: Int
|
||||
|
||||
/**
|
||||
* Define the supported forward version this migration can read (must be greater or equal than `currentVersion`).
|
||||
* If this value is different from [[currentVersion]] a [[JacksonMigration]] may be required to downcast
|
||||
* the received payload to the current schema.
|
||||
*/
|
||||
def supportedForwardVersion: Int = currentVersion
|
||||
|
||||
require(
|
||||
currentVersion <= supportedForwardVersion,
|
||||
s"""The "currentVersion" [$currentVersion] of a JacksonMigration must be less or equal to the "supportedForwardVersion" [$supportedForwardVersion].""")
|
||||
|
||||
/**
|
||||
* Override this method if you have changed the class name. Return
|
||||
* current class name.
|
||||
|
|
|
|||
|
|
@ -319,11 +319,16 @@ import akka.util.OptionVal
|
|||
val className = migration match {
|
||||
case Some(transformer) if fromVersion < transformer.currentVersion =>
|
||||
transformer.transformClassName(fromVersion, manifestClassName)
|
||||
case Some(transformer) if fromVersion > transformer.currentVersion =>
|
||||
case Some(transformer) if fromVersion == transformer.currentVersion =>
|
||||
manifestClassName
|
||||
case Some(transformer) if fromVersion <= transformer.supportedForwardVersion =>
|
||||
transformer.transformClassName(fromVersion, manifestClassName)
|
||||
case Some(transformer) if fromVersion > transformer.supportedForwardVersion =>
|
||||
throw new IllegalStateException(
|
||||
s"Migration version ${transformer.currentVersion} is " +
|
||||
s"Migration version ${transformer.supportedForwardVersion} is " +
|
||||
s"behind version $fromVersion of deserialized type [$manifestClassName]")
|
||||
case _ => manifestClassName
|
||||
case None =>
|
||||
manifestClassName
|
||||
}
|
||||
|
||||
if (typeInManifest && (className ne manifestClassName))
|
||||
|
|
@ -359,7 +364,13 @@ import akka.util.OptionVal
|
|||
val jsonTree = objectMapper.readTree(decompressedBytes)
|
||||
val newJsonTree = transformer.transform(fromVersion, jsonTree)
|
||||
objectMapper.treeToValue(newJsonTree, clazz)
|
||||
case _ =>
|
||||
case Some(transformer) if fromVersion == transformer.currentVersion =>
|
||||
objectMapper.readValue(decompressedBytes, clazz)
|
||||
case Some(transformer) if fromVersion <= transformer.supportedForwardVersion =>
|
||||
val jsonTree = objectMapper.readTree(decompressedBytes)
|
||||
val newJsonTree = transformer.transform(fromVersion, jsonTree)
|
||||
objectMapper.treeToValue(newJsonTree, clazz)
|
||||
case None =>
|
||||
objectMapper.readValue(decompressedBytes, clazz)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -9,15 +9,16 @@ import com.fasterxml.jackson.databind.node.IntNode;
|
|||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
|
||||
public class JavaTestEventMigration extends JacksonMigration {
|
||||
public class JavaTestEventMigrationV2 extends JacksonMigration {
|
||||
|
||||
@Override
|
||||
public int currentVersion() {
|
||||
return 3;
|
||||
return 2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String transformClassName(int fromVersion, String className) {
|
||||
// Ignore the incoming manifest and produce the same class name always.
|
||||
return JavaTestMessages.Event2.class.getName();
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,56 @@
|
|||
/*
|
||||
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.serialization.jackson;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.IntNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
|
||||
public class JavaTestEventMigrationV2WithV3 extends JacksonMigration {
|
||||
|
||||
@Override
|
||||
public int currentVersion() {
|
||||
return 2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int supportedForwardVersion() {
|
||||
return 3;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String transformClassName(int fromVersion, String className) {
|
||||
// Always produce the type of the currentVersion. When fromVersion is lower,
|
||||
// transform will lift it. When fromVersion is higher, transform will downcast it.
|
||||
return JavaTestMessages.Event2.class.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public JsonNode transform(int fromVersion, JsonNode json) {
|
||||
ObjectNode root = (ObjectNode) json;
|
||||
if (fromVersion < 2) {
|
||||
root = upcastV1ToV2((ObjectNode) json);
|
||||
}
|
||||
if (fromVersion == 3) {
|
||||
root = downcastV3ToV2((ObjectNode) json);
|
||||
}
|
||||
return root;
|
||||
}
|
||||
|
||||
private ObjectNode upcastV1ToV2(ObjectNode json) {
|
||||
ObjectNode root = json;
|
||||
root.set("field1V2", root.get("field1"));
|
||||
root.remove("field1");
|
||||
root.set("field2", IntNode.valueOf(17));
|
||||
return root;
|
||||
}
|
||||
|
||||
private ObjectNode downcastV3ToV2(ObjectNode json) {
|
||||
ObjectNode root = json;
|
||||
root.set("field2", root.get("field3"));
|
||||
root.remove("field3");
|
||||
return root;
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
/*
|
||||
* Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.serialization.jackson;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.IntNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
|
||||
public class JavaTestEventMigrationV3 extends JacksonMigration {
|
||||
|
||||
@Override
|
||||
public int currentVersion() {
|
||||
return 3;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String transformClassName(int fromVersion, String className) {
|
||||
// Always produce the type of the currentVersion. When fromVersion is lower,
|
||||
// transform will lift it. when fromVersion is higher, transform will adapt it.
|
||||
return JavaTestMessages.Event3.class.getName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public JsonNode transform(int fromVersion, JsonNode json) {
|
||||
ObjectNode root = (ObjectNode) json;
|
||||
if (fromVersion < 2) {
|
||||
root = upcastV1ToV2(root);
|
||||
}
|
||||
if (fromVersion < 3) {
|
||||
root = upcastV2ToV3(root);
|
||||
}
|
||||
return root;
|
||||
}
|
||||
|
||||
private ObjectNode upcastV1ToV2(ObjectNode root) {
|
||||
root.set("field1V2", root.get("field1"));
|
||||
root.remove("field1");
|
||||
root.set("field2", IntNode.valueOf(17));
|
||||
return root;
|
||||
}
|
||||
|
||||
private ObjectNode upcastV2ToV3(ObjectNode root) {
|
||||
root.set("field3", root.get("field2"));
|
||||
root.remove("field2");
|
||||
return root;
|
||||
}
|
||||
}
|
||||
|
|
@ -386,6 +386,42 @@ public interface JavaTestMessages {
|
|||
}
|
||||
}
|
||||
|
||||
public class Event3 implements TestMessage {
|
||||
private final String field1V2; // same as in Event2
|
||||
private final int field3; // renamed field (was field2)
|
||||
|
||||
public Event3(String field1V2, int field3) {
|
||||
this.field1V2 = field1V2;
|
||||
this.field3 = field3;
|
||||
}
|
||||
|
||||
public String getField1V2() {
|
||||
return field1V2;
|
||||
}
|
||||
|
||||
public int getField3() {
|
||||
return field3;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
Event3 event3 = (Event3) o;
|
||||
|
||||
if (field3 != event3.field3) return false;
|
||||
return field1V2 != null ? field1V2.equals(event3.field1V2) : event3.field1V2 == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = field1V2 != null ? field1V2.hashCode() : 0;
|
||||
result = 31 * result + field3;
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
public class Zoo implements TestMessage {
|
||||
public final Animal first;
|
||||
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package jdoc.akka.serialization.jackson.v1;
|
|||
import jdoc.akka.serialization.jackson.MySerializable;
|
||||
|
||||
// #add-optional
|
||||
// #forward-one-rename
|
||||
public class ItemAdded implements MySerializable {
|
||||
public final String shoppingCartId;
|
||||
public final String productId;
|
||||
|
|
@ -18,4 +19,5 @@ public class ItemAdded implements MySerializable {
|
|||
this.quantity = quantity;
|
||||
}
|
||||
}
|
||||
// #forward-one-rename
|
||||
// #add-optional
|
||||
|
|
|
|||
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package jdoc.akka.serialization.jackson.v1withv2;
|
||||
|
||||
// #forward-one-rename
|
||||
|
||||
import akka.serialization.jackson.JacksonMigration;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
|
||||
public class ItemAddedMigration extends JacksonMigration {
|
||||
|
||||
// Data produced in this node is still produced using the version 1 of the schema
|
||||
@Override
|
||||
public int currentVersion() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int supportedForwardVersion() {
|
||||
return 2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public JsonNode transform(int fromVersion, JsonNode json) {
|
||||
ObjectNode root = (ObjectNode) json;
|
||||
if (fromVersion == 2) {
|
||||
// When receiving an event of version 2 we down-cast it to the version 1 of the schema
|
||||
root.set("productId", root.get("itemId"));
|
||||
root.remove("itemId");
|
||||
}
|
||||
return root;
|
||||
}
|
||||
}
|
||||
// #forward-one-rename
|
||||
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
package akka.serialization.jackson
|
||||
|
||||
import java.lang
|
||||
import java.nio.charset.StandardCharsets
|
||||
import java.time.Duration
|
||||
import java.time.Instant
|
||||
|
|
@ -16,8 +17,10 @@ import java.util.UUID
|
|||
import java.util.logging.FileHandler
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo
|
||||
import com.fasterxml.jackson.core.JsonFactory
|
||||
|
|
@ -27,15 +30,14 @@ import com.fasterxml.jackson.core.StreamReadFeature
|
|||
import com.fasterxml.jackson.core.StreamWriteFeature
|
||||
import com.fasterxml.jackson.core.`type`.TypeReference
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import com.fasterxml.jackson.databind.MapperFeature
|
||||
import com.fasterxml.jackson.databind.Module
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.fasterxml.jackson.databind.SerializationFeature
|
||||
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
|
||||
import com.fasterxml.jackson.databind.annotation.JsonSerialize
|
||||
import com.fasterxml.jackson.databind.exc.InvalidTypeIdException
|
||||
import com.fasterxml.jackson.databind.json.JsonMapper
|
||||
import com.fasterxml.jackson.databind.node.IntNode
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode
|
||||
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
|
||||
import com.fasterxml.jackson.module.scala.JsonScalaEnumeration
|
||||
import com.github.ghik.silencer.silent
|
||||
|
|
@ -43,6 +45,7 @@ import com.typesafe.config.ConfigFactory
|
|||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AnyWordSpecLike
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.Address
|
||||
|
|
@ -54,10 +57,8 @@ import akka.actor.typed.scaladsl.Behaviors
|
|||
import akka.serialization.Serialization
|
||||
import akka.serialization.SerializationExtension
|
||||
import akka.serialization.SerializerWithStringManifest
|
||||
import akka.testkit.{ TestActors, TestKit }
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore
|
||||
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
|
||||
import com.fasterxml.jackson.databind.annotation.JsonSerialize
|
||||
import akka.testkit.TestActors
|
||||
import akka.testkit.TestKit
|
||||
|
||||
object ScalaTestMessages {
|
||||
trait TestMessage
|
||||
|
|
@ -86,6 +87,7 @@ object ScalaTestMessages {
|
|||
|
||||
final case class Event1(field1: String) extends TestMessage
|
||||
final case class Event2(field1V2: String, field2: Int) extends TestMessage
|
||||
final case class Event3(field1V2: String, field3: Int) extends TestMessage
|
||||
|
||||
final case class Zoo(first: Animal) extends TestMessage
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
|
|
@ -154,21 +156,6 @@ object ScalaTestMessages {
|
|||
extends TestMessage
|
||||
}
|
||||
|
||||
class ScalaTestEventMigration extends JacksonMigration {
|
||||
override def currentVersion = 3
|
||||
|
||||
override def transformClassName(fromVersion: Int, className: String): String =
|
||||
classOf[ScalaTestMessages.Event2].getName
|
||||
|
||||
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
|
||||
val root = json.asInstanceOf[ObjectNode]
|
||||
root.set[JsonNode]("field1V2", root.get("field1"))
|
||||
root.remove("field1")
|
||||
root.set[JsonNode]("field2", IntNode.valueOf(17))
|
||||
root
|
||||
}
|
||||
}
|
||||
|
||||
class JacksonCborSerializerSpec extends JacksonSerializerSpec("jackson-cbor") {
|
||||
"have compression disabled by default" in {
|
||||
val conf = JacksonObjectMapperProvider.configForBinding("jackson-cbor", system.settings.config)
|
||||
|
|
@ -658,17 +645,8 @@ class JacksonJsonSerializerSpec extends JacksonSerializerSpec("jackson-json") {
|
|||
}
|
||||
}
|
||||
|
||||
abstract class JacksonSerializerSpec(serializerName: String)
|
||||
extends TestKit(
|
||||
ActorSystem(
|
||||
"JacksonJsonSerializerSpec",
|
||||
ConfigFactory.parseString(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event1" = "akka.serialization.jackson.JavaTestEventMigration"
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigration"
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event1" = "akka.serialization.jackson.ScalaTestEventMigration"
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigration"
|
||||
}
|
||||
object JacksonSerializerSpec {
|
||||
def baseConfig(serializerName: String): String = s"""
|
||||
akka.actor {
|
||||
serialization-bindings {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$TestMessage" = $serializerName
|
||||
|
|
@ -676,6 +654,7 @@ abstract class JacksonSerializerSpec(serializerName: String)
|
|||
}
|
||||
}
|
||||
akka.serialization.jackson.allowed-class-prefix = ["akka.serialization.jackson.ScalaTestMessages$$OldCommand"]
|
||||
|
||||
akka.actor {
|
||||
serializers {
|
||||
inner-serializer = "akka.serialization.jackson.ScalaTestMessages$$InnerSerializationSerializer"
|
||||
|
|
@ -684,7 +663,14 @@ abstract class JacksonSerializerSpec(serializerName: String)
|
|||
"akka.serialization.jackson.ScalaTestMessages$$HasAkkaSerializer" = "inner-serializer"
|
||||
}
|
||||
}
|
||||
""")))
|
||||
"""
|
||||
}
|
||||
|
||||
abstract class JacksonSerializerSpec(serializerName: String)
|
||||
extends TestKit(
|
||||
ActorSystem(
|
||||
"JacksonJsonSerializerSpec",
|
||||
ConfigFactory.parseString(JacksonSerializerSpec.baseConfig(serializerName))))
|
||||
with AnyWordSpecLike
|
||||
with Matchers
|
||||
with BeforeAndAfterAll {
|
||||
|
|
@ -817,22 +803,138 @@ abstract class JacksonSerializerSpec(serializerName: String)
|
|||
}
|
||||
}
|
||||
|
||||
"deserialize with migrations" in {
|
||||
// TODO: Consider moving the migrations Specs to a separate Spec
|
||||
"deserialize with migrations" in withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
## Usually the key is a FQCN but we're hacking the name to use multiple migrations for the
|
||||
## same type in a single test.
|
||||
"deserialize-Java.Event1-into-Java.Event3" = "akka.serialization.jackson.JavaTestEventMigrationV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sys =>
|
||||
val event1 = new Event1("a")
|
||||
val serializer = serializerFor(event1)
|
||||
val serializer = serializerFor(event1, sys)
|
||||
val blob = serializer.toBinary(event1)
|
||||
val event2 = serializer.fromBinary(blob, classOf[Event1].getName).asInstanceOf[Event2]
|
||||
event1.getField1 should ===(event2.getField1V2)
|
||||
event2.getField2 should ===(17)
|
||||
|
||||
// Event1 has no migration configured so it uses the default manifest name (with no version)
|
||||
serializer.manifest(event1) should ===(classOf[Event1].getName)
|
||||
|
||||
// Hack the manifest to enforce the use a particular migration when deserializing the blob of Event1
|
||||
val event3 = serializer.fromBinary(blob, "deserialize-Java.Event1-into-Java.Event3").asInstanceOf[Event3]
|
||||
event1.getField1 should ===(event3.getField1V2)
|
||||
event3.getField3 should ===(17)
|
||||
}
|
||||
|
||||
"deserialize with migrations from V2" in {
|
||||
// produce a blob/manifest from an ActorSystem without migrations
|
||||
val event1 = new Event1("a")
|
||||
val serializer = serializerFor(event1)
|
||||
val blob = serializer.toBinary(event1)
|
||||
val event2 = serializer.fromBinary(blob, classOf[Event1].getName + "#2").asInstanceOf[Event2]
|
||||
event1.getField1 should ===(event2.getField1V2)
|
||||
event2.getField2 should ===(17)
|
||||
val manifest = serializer.manifest(event1)
|
||||
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event1" = "akka.serialization.jackson.JavaTestEventMigrationV2"
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
// read the blob/manifest from an ActorSystem with migrations
|
||||
val serializerV2: JacksonSerializer = serializerFor(event1, sysV2)
|
||||
val event2 = serializerV2.fromBinary(blob, manifest).asInstanceOf[Event2]
|
||||
event1.getField1 should ===(event2.getField1V2)
|
||||
event2.getField2 should ===(17)
|
||||
|
||||
// Event2 has a migration configured so it uses a manifest with a version
|
||||
val serializerFor2 = serializerFor(event2, sysV2)
|
||||
serializerFor2.manifest(event2) should ===(classOf[Event2].getName + "#2")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"use the migration's currentVersion on new serializations" in {
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
val event2 = new Event2("a", 17)
|
||||
// Event2 has a migration configured so it uses a manifest with a version
|
||||
val serializer2 = serializerFor(event2, sysV2)
|
||||
serializer2.manifest(event2) should ===(classOf[Event2].getName + "#2")
|
||||
}
|
||||
}
|
||||
|
||||
"use the migration's currentVersion on new serializations when supporting forward versions" in {
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2WithV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
val event2 = new Event2("a", 17)
|
||||
// Event2 has a migration configured so it uses a manifest with a version
|
||||
val serializer2 = serializerFor(event2, sysV2)
|
||||
serializer2.manifest(event2) should ===(classOf[Event2].getName + "#2")
|
||||
}
|
||||
}
|
||||
|
||||
"deserialize a V3 blob into a V2 class (forward-one support) and back" in {
|
||||
|
||||
val blobV3 =
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event3" = "akka.serialization.jackson.JavaTestEventMigrationV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 =>
|
||||
val event3 = new Event3("Steve", 49)
|
||||
val serializer = serializerFor(event3, sysV3)
|
||||
val blob = serializer.toBinary(event3)
|
||||
blob
|
||||
}
|
||||
|
||||
val blobV2 =
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2WithV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2WithV3 =>
|
||||
val serializerForEvent2 =
|
||||
serialization(sysV2WithV3).serializerFor(classOf[Event2]).asInstanceOf[JacksonSerializer]
|
||||
val event2 = serializerForEvent2.fromBinary(blobV3, classOf[Event2].getName + "#3").asInstanceOf[Event2]
|
||||
event2.getField1V2 should ===("Steve")
|
||||
event2.getField2 should ===(49)
|
||||
serializerForEvent2.toBinary(event2)
|
||||
}
|
||||
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event3" = "akka.serialization.jackson.JavaTestEventMigrationV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 =>
|
||||
val serializerForEvent3 = serialization(sysV3).serializerFor(classOf[Event3]).asInstanceOf[JacksonSerializer]
|
||||
val event3 = serializerForEvent3.fromBinary(blobV2, classOf[Event3].getName + "#2").asInstanceOf[Event3]
|
||||
event3.getField1V2 should ===("Steve")
|
||||
event3.getField3 should ===(49)
|
||||
}
|
||||
}
|
||||
|
||||
"deserialize unsupported versions throws an exception" in {
|
||||
intercept[lang.IllegalStateException] {
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event1" = "akka.serialization.jackson.JavaTestEventMigrationV2"
|
||||
"akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
// produce a blob/manifest from an ActorSystem without migrations
|
||||
val event1 = new Event1("a")
|
||||
val serializer = serializerFor(event1)
|
||||
val blob = serializer.toBinary(event1)
|
||||
val manifest = serializer.manifest(event1)
|
||||
// Event1 has no migration configured so it uses the default manifest name (with no version)
|
||||
val serializerV2: JacksonSerializer = serializerFor(event1, sysV2)
|
||||
serializerV2.fromBinary(blob, manifest + "#9").asInstanceOf[Event2]
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -927,22 +1029,125 @@ abstract class JacksonSerializerSpec(serializerName: String)
|
|||
}
|
||||
}
|
||||
|
||||
"deserialize with migrations" in {
|
||||
// TODO: Consider moving the migrations Specs to a separate Spec
|
||||
"deserialize with migrations" in withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
## Usually the key is a FQCN but we're hacking the name to use multiple migrations for the
|
||||
## same type in a single test.
|
||||
"deserialize-Event1-into-Event3" = "akka.serialization.jackson.ScalaTestEventMigrationV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sys =>
|
||||
val event1 = Event1("a")
|
||||
val serializer = serializerFor(event1)
|
||||
val serializer = serializerFor(event1, sys)
|
||||
val blob = serializer.toBinary(event1)
|
||||
val event2 = serializer.fromBinary(blob, classOf[Event1].getName).asInstanceOf[Event2]
|
||||
event1.field1 should ===(event2.field1V2)
|
||||
event2.field2 should ===(17)
|
||||
|
||||
// Event1 has no migration configured so it uses the default manifest name (with no version)
|
||||
serializer.manifest(event1) should ===(classOf[Event1].getName)
|
||||
|
||||
// Hack the manifest to enforce the use a particular migration when deserializing the blob of Event1
|
||||
val event3 = serializer.fromBinary(blob, "deserialize-Event1-into-Event3").asInstanceOf[Event3]
|
||||
event1.field1 should ===(event3.field1V2)
|
||||
event3.field3 should ===(17)
|
||||
}
|
||||
|
||||
"deserialize with migrations from V2" in {
|
||||
// produce a blob/manifest from an ActorSystem without migrations
|
||||
val event1 = Event1("a")
|
||||
val serializer = serializerFor(event1)
|
||||
val blob = serializer.toBinary(event1)
|
||||
val event2 = serializer.fromBinary(blob, classOf[Event1].getName + "#2").asInstanceOf[Event2]
|
||||
event1.field1 should ===(event2.field1V2)
|
||||
event2.field2 should ===(17)
|
||||
val manifest = serializer.manifest(event1)
|
||||
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event1" = "akka.serialization.jackson.ScalaTestEventMigrationV2"
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
// read the blob/manifest from an ActorSystem with migrations
|
||||
val serializerV2: JacksonSerializer = serializerFor(event1, sysV2)
|
||||
val event2 = serializerV2.fromBinary(blob, manifest).asInstanceOf[Event2]
|
||||
event1.field1 should ===(event2.field1V2)
|
||||
event2.field2 should ===(17)
|
||||
|
||||
// Event2 has a migration configured so it uses a manifest with a version
|
||||
val serializerFor2 = serializerFor(event2, sysV2)
|
||||
serializerFor2.manifest(event2) should ===(classOf[Event2].getName + "#2")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"use the migration's currentVersion on new serializations" in {
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
val event2 = new Event2("a", 17)
|
||||
// Event2 has a migration configured so it uses a manifest with a version
|
||||
val serializer2 = serializerFor(event2, sysV2)
|
||||
serializer2.manifest(event2) should ===(classOf[Event2].getName + "#2")
|
||||
}
|
||||
}
|
||||
|
||||
"deserialize a V3 blob into a V2 class (forward-one support) and back" in {
|
||||
|
||||
val blobV3 =
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event3" = "akka.serialization.jackson.ScalaTestEventMigrationV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 =>
|
||||
val event3 = new Event3("Steve", 49)
|
||||
val serializer = serializerFor(event3, sysV3)
|
||||
val blob = serializer.toBinary(event3)
|
||||
blob
|
||||
}
|
||||
|
||||
val blobV2 =
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2WithV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2WithV3 =>
|
||||
val serializerForEvent2 =
|
||||
serialization(sysV2WithV3).serializerFor(classOf[Event2]).asInstanceOf[JacksonSerializer]
|
||||
val event2 = serializerForEvent2.fromBinary(blobV3, classOf[Event2].getName + "#3").asInstanceOf[Event2]
|
||||
event2.field1V2 should ===("Steve")
|
||||
event2.field2 should ===(49)
|
||||
serializerForEvent2.toBinary(event2)
|
||||
}
|
||||
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event3" = "akka.serialization.jackson.ScalaTestEventMigrationV3"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 =>
|
||||
val serializerForEvent3 = serialization(sysV3).serializerFor(classOf[Event3]).asInstanceOf[JacksonSerializer]
|
||||
val event3 = serializerForEvent3.fromBinary(blobV2, classOf[Event3].getName + "#2").asInstanceOf[Event3]
|
||||
event3.field1V2 should ===("Steve")
|
||||
event3.field3 should ===(49)
|
||||
}
|
||||
}
|
||||
|
||||
"deserialize unsupported versions throws an exception" in {
|
||||
intercept[lang.IllegalStateException] {
|
||||
withSystem(s"""
|
||||
akka.serialization.jackson.migrations {
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event1" = "akka.serialization.jackson.ScalaTestEventMigrationV2"
|
||||
"akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2"
|
||||
}
|
||||
""" + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 =>
|
||||
// produce a blob/manifest from an ActorSystem without migrations
|
||||
val event1 = new Event1("a")
|
||||
val serializer = serializerFor(event1)
|
||||
val blob = serializer.toBinary(event1)
|
||||
val manifest = serializer.manifest(event1)
|
||||
// Event1 has no migration configured so it uses the default manifest name (with no version)
|
||||
val serializerV2: JacksonSerializer = serializerFor(event1, sysV2)
|
||||
serializerV2.fromBinary(blob, manifest + "#9").asInstanceOf[Event2]
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
"not allow serialization of deny listed class" in {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,94 @@
|
|||
/*
|
||||
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.serialization.jackson
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import com.fasterxml.jackson.databind.node.IntNode
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode
|
||||
|
||||
object ScalaTestEventMigration {
|
||||
def upcastV1ToV2(root: ObjectNode): ObjectNode = {
|
||||
root.set[JsonNode]("field1V2", root.get("field1"))
|
||||
root.remove("field1")
|
||||
root.set[JsonNode]("field2", IntNode.valueOf(17))
|
||||
root
|
||||
}
|
||||
|
||||
def upcastV2ToV3(root: ObjectNode): ObjectNode = {
|
||||
root.set("field3", root.get("field2"))
|
||||
root.remove("field2")
|
||||
root
|
||||
}
|
||||
|
||||
def downcastV3ToV2(root: ObjectNode) = {
|
||||
// downcast the V3 representation to the V2 representation. A field
|
||||
// is renamed.
|
||||
root.set("field2", root.get("field3"))
|
||||
root.remove("field3")
|
||||
root
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ScalaTestEventMigrationV2 extends JacksonMigration {
|
||||
import ScalaTestEventMigration._
|
||||
|
||||
override def currentVersion = 2
|
||||
|
||||
override def transformClassName(fromVersion: Int, className: String): String =
|
||||
classOf[ScalaTestMessages.Event2].getName
|
||||
|
||||
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
|
||||
val root = json.asInstanceOf[ObjectNode]
|
||||
upcastV1ToV2(root)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ScalaTestEventMigrationV2WithV3 extends JacksonMigration {
|
||||
import ScalaTestEventMigration._
|
||||
|
||||
override def currentVersion = 2
|
||||
|
||||
override def supportedForwardVersion: Int = 3
|
||||
|
||||
// Always produce the type of the currentVersion. When fromVersion is lower,
|
||||
// transform will lift it. When fromVersion is higher, transform will downcast it.
|
||||
override def transformClassName(fromVersion: Int, className: String): String =
|
||||
classOf[ScalaTestMessages.Event2].getName
|
||||
|
||||
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
|
||||
var root = json.asInstanceOf[ObjectNode]
|
||||
if (fromVersion < 2) {
|
||||
root = upcastV1ToV2(root)
|
||||
}
|
||||
if (fromVersion == 3) {
|
||||
root = downcastV3ToV2(root)
|
||||
}
|
||||
root
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class ScalaTestEventMigrationV3 extends JacksonMigration {
|
||||
import ScalaTestEventMigration._
|
||||
|
||||
override def currentVersion = 3
|
||||
|
||||
override def transformClassName(fromVersion: Int, className: String): String =
|
||||
classOf[ScalaTestMessages.Event3].getName
|
||||
|
||||
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
|
||||
var root = json.asInstanceOf[ObjectNode]
|
||||
if (fromVersion < 2) {
|
||||
root = upcastV1ToV2(root)
|
||||
}
|
||||
if (fromVersion < 3) {
|
||||
root = upcastV2ToV3(root)
|
||||
}
|
||||
root
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -12,8 +12,11 @@ import akka.serialization.SerializationExtension
|
|||
import akka.serialization.SerializerWithStringManifest
|
||||
import akka.serialization.Serializers
|
||||
import akka.testkit.TestKit
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo
|
||||
import com.fasterxml.jackson.annotation.{ JsonSubTypes, JsonTypeInfo, JsonTypeName }
|
||||
import com.fasterxml.jackson.core.JsonParser
|
||||
import com.fasterxml.jackson.databind.DeserializationContext
|
||||
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
|
||||
import com.fasterxml.jackson.databind.deser.std.StdDeserializer
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
|
|
@ -126,20 +129,51 @@ object SerializationDocSpec {
|
|||
#//#manifestless
|
||||
"""
|
||||
|
||||
//#polymorphism
|
||||
final case class Zoo(primaryAttraction: Animal) extends MySerializable
|
||||
object Polymorphism {
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes(
|
||||
Array(
|
||||
new JsonSubTypes.Type(value = classOf[Lion], name = "lion"),
|
||||
new JsonSubTypes.Type(value = classOf[Elephant], name = "elephant")))
|
||||
sealed trait Animal
|
||||
//#polymorphism
|
||||
final case class Zoo(primaryAttraction: Animal) extends MySerializable
|
||||
|
||||
final case class Lion(name: String) extends Animal
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes(
|
||||
Array(
|
||||
new JsonSubTypes.Type(value = classOf[Lion], name = "lion"),
|
||||
new JsonSubTypes.Type(value = classOf[Elephant], name = "elephant")))
|
||||
sealed trait Animal
|
||||
|
||||
final case class Elephant(name: String, age: Int) extends Animal
|
||||
//#polymorphism
|
||||
final case class Lion(name: String) extends Animal
|
||||
|
||||
final case class Elephant(name: String, age: Int) extends Animal
|
||||
//#polymorphism
|
||||
}
|
||||
|
||||
object PolymorphismMixedClassObject {
|
||||
|
||||
//#polymorphism-case-object
|
||||
final case class Zoo(primaryAttraction: Animal) extends MySerializable
|
||||
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes(
|
||||
Array(
|
||||
new JsonSubTypes.Type(value = classOf[Lion], name = "lion"),
|
||||
new JsonSubTypes.Type(value = classOf[Elephant], name = "elephant"),
|
||||
new JsonSubTypes.Type(value = classOf[Unicorn], name = "unicorn")))
|
||||
sealed trait Animal
|
||||
|
||||
final case class Lion(name: String) extends Animal
|
||||
final case class Elephant(name: String, age: Int) extends Animal
|
||||
|
||||
@JsonDeserialize(using = classOf[UnicornDeserializer])
|
||||
sealed trait Unicorn extends Animal
|
||||
@JsonTypeName("unicorn")
|
||||
case object Unicorn extends Unicorn
|
||||
|
||||
class UnicornDeserializer extends StdDeserializer[Unicorn](Unicorn.getClass) {
|
||||
// whenever we need to deserialize an instance of Unicorn trait, we return the object Unicorn
|
||||
override def deserialize(p: JsonParser, ctxt: DeserializationContext): Unicorn = Unicorn
|
||||
}
|
||||
//#polymorphism-case-object
|
||||
}
|
||||
|
||||
val configDateTime = """
|
||||
#//#date-time
|
||||
|
|
@ -207,6 +241,19 @@ class SerializationDocSpec
|
|||
private def serializerFor(obj: Any): SerializerWithStringManifest =
|
||||
serialization.serializerFor(obj.getClass).asInstanceOf[SerializerWithStringManifest]
|
||||
|
||||
"serialize trait + case classes" in {
|
||||
import doc.akka.serialization.jackson.SerializationDocSpec.Polymorphism._
|
||||
verifySerialization(Zoo(Lion("Simba"))) should ===(Zoo(Lion("Simba")))
|
||||
verifySerialization(Zoo(Elephant("Dumbo", 1))) should ===(Zoo(Elephant("Dumbo", 1)))
|
||||
}
|
||||
|
||||
"serialize trait + case classes + case object" in {
|
||||
import doc.akka.serialization.jackson.SerializationDocSpec.PolymorphismMixedClassObject._
|
||||
verifySerialization(Zoo(Lion("Simba"))) should ===(Zoo(Lion("Simba")))
|
||||
verifySerialization(Zoo(Elephant("Dumbo", 1))) should ===(Zoo(Elephant("Dumbo", 1)))
|
||||
verifySerialization(Zoo(Unicorn)) should ===(Zoo(Unicorn))
|
||||
}
|
||||
|
||||
"serialize trait + object ADT" in {
|
||||
import CustomAdtSerializer.Compass
|
||||
import CustomAdtSerializer.Direction._
|
||||
|
|
|
|||
|
|
@ -7,5 +7,7 @@ package doc.akka.serialization.jackson.v1
|
|||
import doc.akka.serialization.jackson.MySerializable
|
||||
|
||||
// #add-optional
|
||||
// #forward-one-rename
|
||||
case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int) extends MySerializable
|
||||
// #forward-one-rename
|
||||
// #add-optional
|
||||
|
|
|
|||
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package doc.akka.serialization.jackson.v1withv2
|
||||
|
||||
// #forward-one-rename
|
||||
import akka.serialization.jackson.JacksonMigration
|
||||
import com.fasterxml.jackson.databind.JsonNode
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode
|
||||
|
||||
class ItemAddedMigration extends JacksonMigration {
|
||||
|
||||
// Data produced in this node is still produced using the version 1 of the schema
|
||||
override def currentVersion: Int = 1
|
||||
|
||||
override def supportedForwardVersion: Int = 2
|
||||
|
||||
override def transform(fromVersion: Int, json: JsonNode): JsonNode = {
|
||||
val root = json.asInstanceOf[ObjectNode]
|
||||
if (fromVersion == 2) {
|
||||
// When receiving an event of version 2 we down-cast it to the version 1 of the schema
|
||||
root.set[JsonNode]("productId", root.get("itemId"))
|
||||
root.remove("itemId")
|
||||
}
|
||||
root
|
||||
}
|
||||
}
|
||||
// #forward-one-rename
|
||||
|
|
@ -347,8 +347,10 @@ lazy val protobufV3 = akkaModule("akka-protobuf-v3")
|
|||
exportJars := true, // in dependent projects, use assembled and shaded jar
|
||||
makePomConfiguration := makePomConfiguration.value
|
||||
.withConfigurations(Vector(Compile)), // prevent original dependency to be added to pom as runtime dep
|
||||
packagedArtifact in (Compile, packageBin) := Scoped
|
||||
.mkTuple2((artifact in (Compile, packageBin)).value, OsgiKeys.bundle.value),
|
||||
packagedArtifact in (Compile, packageBin) := Scoped.mkTuple2(
|
||||
(artifact in (Compile, packageBin)).value,
|
||||
ReproducibleBuildsPlugin.postProcessJar(OsgiKeys.bundle.value)
|
||||
),
|
||||
packageBin in Compile := ReproducibleBuildsPlugin
|
||||
.postProcessJar((assembly in Compile).value), // package by running assembly
|
||||
// Prevent cyclic task dependencies, see https://github.com/sbt/sbt-assembly/issues/365
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ object AkkaDisciplinePlugin extends AutoPlugin {
|
|||
"akka-testkit")
|
||||
|
||||
lazy val silencerSettings = {
|
||||
val silencerVersion = "1.7.0"
|
||||
val silencerVersion = "1.7.1"
|
||||
Seq(
|
||||
libraryDependencies ++= Seq(
|
||||
compilerPlugin(("com.github.ghik" %% "silencer-plugin" % silencerVersion).cross(CrossVersion.patch)),
|
||||
|
|
|
|||
|
|
@ -35,17 +35,27 @@ object Dependencies {
|
|||
val scalaTestVersion = "3.1.1"
|
||||
val scalaCheckVersion = "1.14.3"
|
||||
|
||||
val Versions = Seq(
|
||||
crossScalaVersions := Seq(scala212Version, scala213Version),
|
||||
scalaVersion := System.getProperty("akka.build.scalaVersion", crossScalaVersions.value.head),
|
||||
java8CompatVersion := {
|
||||
CrossVersion.partialVersion(scalaVersion.value) match {
|
||||
// java8-compat is only used in a couple of places for 2.13,
|
||||
// it is probably possible to remove the dependency if needed.
|
||||
case Some((2, n)) if n >= 13 => "0.9.0"
|
||||
case _ => "0.8.0"
|
||||
}
|
||||
})
|
||||
val Versions =
|
||||
Seq(
|
||||
crossScalaVersions := Seq(scala212Version, scala213Version),
|
||||
scalaVersion := {
|
||||
// don't allow full override to keep compatible with the version of silencer
|
||||
// don't mandate patch not specified to allow builds to migrate
|
||||
System.getProperty("akka.build.scalaVersion", "default") match {
|
||||
case twoThirteen if twoThirteen.startsWith("2.13") => scala213Version
|
||||
case twoTwelve if twoTwelve.startsWith("2.12") => scala212Version
|
||||
case "default" => crossScalaVersions.value.head
|
||||
case other => throw new IllegalArgumentException(s"Unsupported scala version [$other]. Must be 2.12 or 2.13.")
|
||||
}
|
||||
},
|
||||
java8CompatVersion := {
|
||||
CrossVersion.partialVersion(scalaVersion.value) match {
|
||||
// java8-compat is only used in a couple of places for 2.13,
|
||||
// it is probably possible to remove the dependency if needed.
|
||||
case Some((2, n)) if n >= 13 => "0.9.0"
|
||||
case _ => "0.8.0"
|
||||
}
|
||||
})
|
||||
|
||||
object Compile {
|
||||
// Compile
|
||||
|
|
@ -63,7 +73,7 @@ object Dependencies {
|
|||
|
||||
val sigar = "org.fusesource" % "sigar" % "1.6.4" // ApacheV2
|
||||
|
||||
val jctools = "org.jctools" % "jctools-core" % "3.0.0" // ApacheV2
|
||||
val jctools = "org.jctools" % "jctools-core" % "3.0.1" // ApacheV2
|
||||
|
||||
// reactive streams
|
||||
val reactiveStreams = "org.reactivestreams" % "reactive-streams" % reactiveStreamsVersion // CC0
|
||||
|
|
@ -131,8 +141,8 @@ object Dependencies {
|
|||
val dockerClient = "com.spotify" % "docker-client" % "8.16.0" % "test" // ApacheV2
|
||||
|
||||
// metrics, measurements, perf testing
|
||||
val metrics = "io.dropwizard.metrics" % "metrics-core" % "4.1.11" % "test" // ApacheV2
|
||||
val metricsJvm = "io.dropwizard.metrics" % "metrics-jvm" % "4.1.11" % "test" // ApacheV2
|
||||
val metrics = "io.dropwizard.metrics" % "metrics-core" % "4.1.12.1" % "test" // ApacheV2
|
||||
val metricsJvm = "io.dropwizard.metrics" % "metrics-jvm" % "4.1.12.1" % "test" // ApacheV2
|
||||
val latencyUtils = "org.latencyutils" % "LatencyUtils" % "2.0.3" % "test" // Free BSD
|
||||
val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.12" % "test" // CC0
|
||||
val metricsAll = Seq(metrics, metricsJvm, latencyUtils, hdrHistogram)
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ libraryDependencies += Defaults.sbtPluginExtra(
|
|||
addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
|
||||
//#sbt-multi-jvm
|
||||
|
||||
addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.5.1")
|
||||
addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.6.0")
|
||||
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.3.4")
|
||||
addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.15")
|
||||
// sbt-osgi 0.9.5 is available but breaks including jdk9-only classes
|
||||
|
|
@ -23,6 +23,6 @@ addSbtPlugin("com.lightbend.akka" % "sbt-paradox-akka" % "0.35")
|
|||
addSbtPlugin("com.lightbend" % "sbt-whitesource" % "0.1.18")
|
||||
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.0")
|
||||
addSbtPlugin("com.hpe.sbt" % "sbt-pull-request-validator" % "1.0.0")
|
||||
addSbtPlugin("net.bzzt" % "sbt-reproducible-builds" % "0.24")
|
||||
addSbtPlugin("net.bzzt" % "sbt-reproducible-builds" % "0.25")
|
||||
addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1")
|
||||
addSbtPlugin("com.lightbend.sbt" % "sbt-publish-rsync" % "0.2")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue