From bb0a7c09dd75c1ea1bbdee1879ef08fda2e4c575 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 10 Aug 2020 10:36:05 +0200 Subject: [PATCH 01/29] fix write consistency timeout in Sharding, #29344 --- .../main/scala/akka/cluster/sharding/ShardCoordinator.scala | 4 ++-- .../sharding/internal/DDataRememberEntitiesShardStore.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 4b4adb3e19..090b882020 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -1150,8 +1150,8 @@ private[akka] class DDataShardCoordinator( case additional => ReadMajorityPlus(settings.tuningParameters.waitingForStateTimeout, majorityMinCap, additional) } private val stateWriteConsistency = settings.tuningParameters.coordinatorStateWriteMajorityPlus match { - case Int.MaxValue => WriteAll(settings.tuningParameters.waitingForStateTimeout) - case additional => WriteMajorityPlus(settings.tuningParameters.waitingForStateTimeout, majorityMinCap, additional) + case Int.MaxValue => WriteAll(settings.tuningParameters.updatingStateTimeout) + case additional => WriteMajorityPlus(settings.tuningParameters.updatingStateTimeout, majorityMinCap, additional) } implicit val node: Cluster = Cluster(context.system) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala index 6766f8d5db..16ebc573cd 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/DDataRememberEntitiesShardStore.scala @@ -95,7 +95,7 @@ private[akka] final class DDataRememberEntitiesShardStore( if (log.isDebugEnabled) { log.debug( - "Starting up DDataRememberEntitiesStore, write timeout: [{}], read timeout: [{}], majority min cap: [{}]", + "Starting up DDataRememberEntitiesStore, read timeout: [{}], write timeout: [{}], majority min cap: [{}]", settings.tuningParameters.waitingForStateTimeout.pretty, settings.tuningParameters.updatingStateTimeout.pretty, majorityMinCap) From 688d46a91f0b1747995e89262fc13182e2e5ae4d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 10 Aug 2020 14:36:17 +0200 Subject: [PATCH 02/29] Harden BackoffSupervisorSpec, #29467 * order not defined since Terminated sent from two different actors --- .../akka/pattern/BackoffSupervisorSpec.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala index dcccd99b1b..9df54c0ce5 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/BackoffSupervisorSpec.scala @@ -283,6 +283,7 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually } "stop restarting the child after reaching maxNrOfRetries limit (Backoff.onStop)" in { + val supervisorWatcher = TestProbe() val supervisor = create(onStopOptions(maxNrOfRetries = 2)) def waitForChild: Option[ActorRef] = { eventually(timeout(1.second), interval(50.millis)) { @@ -295,7 +296,7 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually expectMsgType[BackoffSupervisor.CurrentChild].ref } - watch(supervisor) + supervisorWatcher.watch(supervisor) supervisor ! BackoffSupervisor.GetRestartCount expectMsg(BackoffSupervisor.RestartCount(0)) @@ -327,11 +328,12 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually watch(c3) c3 ! PoisonPill expectTerminated(c3) - expectTerminated(supervisor) + supervisorWatcher.expectTerminated(supervisor) } "stop restarting the child after reaching maxNrOfRetries limit (Backoff.onFailure)" in { filterException[TestException] { + val supervisorWatcher = TestProbe() val supervisor = create(onFailureOptions(maxNrOfRetries = 2)) def waitForChild: Option[ActorRef] = { @@ -345,7 +347,7 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually expectMsgType[BackoffSupervisor.CurrentChild].ref } - watch(supervisor) + supervisorWatcher.watch(supervisor) supervisor ! BackoffSupervisor.GetRestartCount expectMsg(BackoffSupervisor.RestartCount(0)) @@ -377,26 +379,25 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually awaitAssert(c3 should !==(c2)) watch(c3) c3 ! "boom" - withClue("Expected child and supervisor to terminate") { - Set(expectMsgType[Terminated].actor, expectMsgType[Terminated].actor) shouldEqual Set(c3, supervisor) - } - + expectTerminated(c3) + supervisorWatcher.expectTerminated(supervisor) } } "stop restarting the child if final stop message received (Backoff.onStop)" in { val stopMessage = "stop" + val supervisorWatcher = TestProbe() val supervisor: ActorRef = create(onStopOptions(maxNrOfRetries = 100).withFinalStopMessage(_ == stopMessage)) supervisor ! BackoffSupervisor.GetCurrentChild val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get watch(c1) - watch(supervisor) + supervisorWatcher.watch(supervisor) supervisor ! stopMessage expectMsg("stop") c1 ! PoisonPill expectTerminated(c1) - expectTerminated(supervisor) + supervisorWatcher.expectTerminated(supervisor) } "supervisor must not stop when final stop message has not been received" in { @@ -406,7 +407,6 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually supervisor ! BackoffSupervisor.GetCurrentChild val c1 = expectMsgType[BackoffSupervisor.CurrentChild].ref.get watch(c1) - watch(supervisor) supervisorWatcher.watch(supervisor) c1 ! PoisonPill @@ -417,7 +417,7 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender with Eventually supervisorWatcher.expectNoMessage(20.millis) // supervisor must not terminate supervisor ! stopMessage - expectTerminated(supervisor) + supervisorWatcher.expectTerminated(supervisor) } } } From fc44a02ae2b574287d4497ab57a95911c97f3cd5 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 10 Aug 2020 14:58:26 +0200 Subject: [PATCH 03/29] harden ClusterSingletonManagerLeaseSpec, #29423 * default test timeout too short for the join --- .../ExternalShardAllocationSpec.scala | 2 +- .../ClusterSingletonManagerLeaseSpec.scala | 30 +++++++++++-------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala index 89ee7a38ee..8502495c75 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ExternalShardAllocationSpec.scala @@ -135,7 +135,7 @@ abstract class ExternalShardAllocationSpec } enterBarrier("allocated-to-new-node") runOn(forth) { - joinWithin(first) + joinWithin(first, max = 10.seconds) } enterBarrier("forth-node-joined") runOn(first, second, third) { diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaseSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaseSpec.scala index 4ee6fbb708..78d87fd130 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaseSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaseSpec.scala @@ -94,24 +94,30 @@ class ClusterSingletonManagerLeaseSpec awaitClusterUp(controller, first) enterBarrier("initial-up") runOn(second) { - joinWithin(first) - awaitAssert({ - cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up) - }, 10.seconds) + within(10.seconds) { + joinWithin(first) + awaitAssert { + cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up) + } + } } enterBarrier("second-up") runOn(third) { - joinWithin(first) - awaitAssert({ - cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up) - }, 10.seconds) + within(10.seconds) { + joinWithin(first) + awaitAssert { + cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up) + } + } } enterBarrier("third-up") runOn(fourth) { - joinWithin(first) - awaitAssert({ - cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up, Up) - }, 10.seconds) + within(10.seconds) { + joinWithin(first) + awaitAssert { + cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up, Up) + } + } } enterBarrier("fourth-up") } From 4be0b3c776a37b476b0aa86bca484424bc7c340e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 10 Aug 2020 12:23:40 +0200 Subject: [PATCH 04/29] reduce combinations in RandomizedBrainResolverIntegrationSpec, #29185 * when the test generated scenarios that continued "instability" it could result in that node wasn't removed before new became active --- ...ndomizedBrainResolverIntegrationSpec.scala | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sbr/RandomizedBrainResolverIntegrationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sbr/RandomizedBrainResolverIntegrationSpec.scala index f03e3ec557..a3aa3da7e7 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sbr/RandomizedBrainResolverIntegrationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sbr/RandomizedBrainResolverIntegrationSpec.scala @@ -284,8 +284,9 @@ class RandomizedSplitBrainResolverIntegrationSpec val side1 = nodes.take(1 + random.nextInt(nodes.size - 1)) val side2 = nodes.drop(side1.size) - val numberOfFlaky = random.nextInt(5) - val healLastFlay = numberOfFlaky > 0 && random.nextBoolean() + // The test is limited to one flaky step, see issue #29185. + val numberOfFlaky = if (cleanSplit) 0 else 1 + val healLastFlaky = numberOfFlaky > 0 && random.nextBoolean() val flaky: Map[Int, (RoleName, List[RoleName])] = (0 until numberOfFlaky).map { i => val from = nodes(random.nextInt(nodes.size)) @@ -296,11 +297,15 @@ class RandomizedSplitBrainResolverIntegrationSpec val delays = (0 until 10).map(_ => 2 + random.nextInt(13)) - log.info(s"Generated $scenario with random seed [$randomSeed] in round [$c]: " + - s"cleanSplit [$cleanSplit], healCleanSplit [$healCleanSplit] " + - (if (cleanSplit) s"side1 [${side1.map(_.name).mkString(", ")}], side2 [${side2.map(_.name).mkString(", ")}] ") + - s"flaky [${flaky.map { case (_, (from, to)) => from.name -> to.map(_.name).mkString("(", ", ", ")") }.mkString("; ")}] " + - s"delays [${delays.mkString(", ")}]") + log.info( + s"Generated $scenario with random seed [$randomSeed] in round [$c]: " + + s"cleanSplit [$cleanSplit], healCleanSplit [$healCleanSplit] " + + (if (cleanSplit) + s"side1 [${side1.map(_.name).mkString(", ")}], side2 [${side2.map(_.name).mkString(", ")}] " + else " ") + + s", flaky [${flaky.map { case (_, (from, to)) => from.name -> to.map(_.name).mkString("(", ", ", ")") }.mkString("; ")}] " + + s", healLastFlaky [$healLastFlaky] " + + s", delays [${delays.mkString(", ")}]") var delayIndex = 0 def nextDelay(): Unit = { @@ -330,7 +335,7 @@ class RandomizedSplitBrainResolverIntegrationSpec nextDelay() } - if (healLastFlay) { + if (healLastFlaky) { val (prevFrom, prevTo) = flaky(flaky.size - 1) for (n <- prevTo) passThrough(prevFrom, n) From b57c34fbbfc3ea0d3b1b703795551ae097caad77 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 17 Aug 2020 13:06:04 +0200 Subject: [PATCH 05/29] debug logging of ArteryUpdSendConsistencyWithThreeLanesSpec, #29465 --- .../scala/akka/remote/artery/RemoteSendConsistencySpec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index e98c2b6a66..664aeadf3d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -31,6 +31,7 @@ class ArteryUpdSendConsistencyWithOneLaneSpec class ArteryUpdSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec(ConfigFactory.parseString(""" + akka.loglevel = DEBUG akka.remote.artery.transport = aeron-udp akka.remote.artery.advanced.outbound-lanes = 3 akka.remote.artery.advanced.inbound-lanes = 3 From cb51646d8de3224f109cadad7b9a259f1477a2e2 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 17 Aug 2020 13:28:38 +0200 Subject: [PATCH 06/29] convenience for ShardedDaemonProcess role, #29490 --- ...e-29490-ShardedDaemonProcess-role.excludes | 2 ++ .../typed/ShardedDaemonProcessSettings.scala | 26 +++++++++++++++---- .../internal/ShardedDaemonProcessImpl.scala | 4 +-- .../typed/ShardedDaemonProcessSpec.scala | 4 +-- .../scaladsl/ShardedDaemonProcessSpec.scala | 4 +-- 5 files changed, 28 insertions(+), 12 deletions(-) create mode 100644 akka-cluster-sharding-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-29490-ShardedDaemonProcess-role.excludes diff --git a/akka-cluster-sharding-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-29490-ShardedDaemonProcess-role.excludes b/akka-cluster-sharding-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-29490-ShardedDaemonProcess-role.excludes new file mode 100644 index 0000000000..d3932216bb --- /dev/null +++ b/akka-cluster-sharding-typed/src/main/mima-filters/2.6.8.backwards.excludes/issue-29490-ShardedDaemonProcess-role.excludes @@ -0,0 +1,2 @@ +# #29490 Add withRole to ShardedDaemonProcessSettings, internal constructor +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.typed.ShardedDaemonProcessSettings.this") diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSettings.scala index cb69c9189b..4ff792b716 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSettings.scala @@ -33,7 +33,7 @@ object ShardedDaemonProcessSettings { def fromConfig(config: Config): ShardedDaemonProcessSettings = { val keepAliveInterval = config.getDuration("keep-alive-interval").asScala - new ShardedDaemonProcessSettings(keepAliveInterval, None) + new ShardedDaemonProcessSettings(keepAliveInterval, None, None) } } @@ -44,7 +44,8 @@ object ShardedDaemonProcessSettings { @ApiMayChange final class ShardedDaemonProcessSettings @InternalApi private[akka] ( val keepAliveInterval: FiniteDuration, - val shardingSettings: Option[ClusterShardingSettings]) { + val shardingSettings: Option[ClusterShardingSettings], + val role: Option[String]) { /** * Scala API: The interval each parent of the sharded set is pinged from each node in the cluster. @@ -52,7 +53,7 @@ final class ShardedDaemonProcessSettings @InternalApi private[akka] ( * Note: How the sharded set is kept alive may change in the future meaning this setting may go away. */ def withKeepAliveInterval(keepAliveInterval: FiniteDuration): ShardedDaemonProcessSettings = - new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings) + copy(keepAliveInterval = keepAliveInterval) /** * Java API: The interval each parent of the sharded set is pinged from each node in the cluster. @@ -60,7 +61,7 @@ final class ShardedDaemonProcessSettings @InternalApi private[akka] ( * Note: How the sharded set is kept alive may change in the future meaning this setting may go away. */ def withKeepAliveInterval(keepAliveInterval: Duration): ShardedDaemonProcessSettings = - new ShardedDaemonProcessSettings(keepAliveInterval.asScala, shardingSettings) + copy(keepAliveInterval = keepAliveInterval.asScala) /** * Specify sharding settings that should be used for the sharded daemon process instead of loading from config. @@ -68,5 +69,20 @@ final class ShardedDaemonProcessSettings @InternalApi private[akka] ( * changing those settings will be ignored. */ def withShardingSettings(shardingSettings: ClusterShardingSettings): ShardedDaemonProcessSettings = - new ShardedDaemonProcessSettings(keepAliveInterval, Some(shardingSettings)) + copy(shardingSettings = Option(shardingSettings)) + + /** + * Specifies that the ShardedDaemonProcess should run on nodes with a specific role. + * If the role is not specified all nodes in the cluster are used. If the given role does + * not match the role of the current node the the ShardedDaemonProcess will not be started. + */ + def withRole(role: String): ShardedDaemonProcessSettings = + copy(role = Option(role)) + + private def copy( + keepAliveInterval: FiniteDuration = keepAliveInterval, + shardingSettings: Option[ClusterShardingSettings] = shardingSettings, + role: Option[String] = role): ShardedDaemonProcessSettings = + new ShardedDaemonProcessSettings(keepAliveInterval, shardingSettings, role) + } diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala index a4341c678d..fa6df2ab0b 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala @@ -116,7 +116,7 @@ private[akka] final class ShardedDaemonProcessImpl(system: ActorSystem[_]) val shardingBaseSettings = settings.shardingSettings match { case None => - // defaults in akka.cluster.sharding but allow overrides specifically for actor-set + // defaults in akka.cluster.sharding but allow overrides specifically for sharded-daemon-process ClusterShardingSettings.fromConfig( system.settings.config.getConfig("akka.cluster.sharded-daemon-process.sharding")) case Some(shardingSettings) => shardingSettings @@ -124,7 +124,7 @@ private[akka] final class ShardedDaemonProcessImpl(system: ActorSystem[_]) new ClusterShardingSettings( numberOfShards, - shardingBaseSettings.role, + if (settings.role.isDefined) settings.role else shardingBaseSettings.role, shardingBaseSettings.dataCenter, false, // remember entities disabled "", diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSpec.scala index 5f1181b509..b81e92048c 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ShardedDaemonProcessSpec.scala @@ -92,7 +92,7 @@ abstract class ShardedDaemonProcessSpec "init actor set" in { ShardedDaemonProcess(typedSystem).init("the-fearless", 4, id => ProcessActor(id)) - enterBarrier("actor-set-initialized") + enterBarrier("sharded-daemon-process-initialized") runOn(first) { val startedIds = (0 to 3).map { _ => val event = probe.expectMessageType[ProcessActorEvent](5.seconds) @@ -101,7 +101,7 @@ abstract class ShardedDaemonProcessSpec }.toSet startedIds.size should ===(4) } - enterBarrier("actor-set-started") + enterBarrier("sharded-daemon-process-started") } // FIXME test removing one cluster node and verify all are alive (how do we do that?) diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala index 519c0a2953..98c67cc78c 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ShardedDaemonProcessSpec.scala @@ -15,7 +15,6 @@ import akka.actor.typed.ActorRef import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors import akka.cluster.MemberStatus -import akka.cluster.sharding.typed.ClusterShardingSettings import akka.cluster.sharding.typed.ShardedDaemonProcessSettings import akka.cluster.typed.Cluster import akka.cluster.typed.Join @@ -96,8 +95,7 @@ class ShardedDaemonProcessSpec "not run if the role does not match node role" in { val probe = createTestProbe[Any]() - val settings = - ShardedDaemonProcessSettings(system).withShardingSettings(ClusterShardingSettings(system).withRole("workers")) + val settings = ShardedDaemonProcessSettings(system).withRole("workers") ShardedDaemonProcess(system).init("roles", 3, id => MyActor(id, probe.ref), settings, None) probe.expectNoMessage() From 207b89224ff6dbaa68d354f4b2c931d46d95dd27 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Tue, 18 Aug 2020 07:46:06 +0100 Subject: [PATCH 07/29] Fix patch versin for scala when overriding To remain compatible with silencer, chop off the patch to not break downstream builds --- project/Dependencies.scala | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b3cc7cb359..1119d6d3f4 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -35,17 +35,27 @@ object Dependencies { val scalaTestVersion = "3.1.1" val scalaCheckVersion = "1.14.3" - val Versions = Seq( - crossScalaVersions := Seq(scala212Version, scala213Version), - scalaVersion := System.getProperty("akka.build.scalaVersion", crossScalaVersions.value.head), - java8CompatVersion := { - CrossVersion.partialVersion(scalaVersion.value) match { - // java8-compat is only used in a couple of places for 2.13, - // it is probably possible to remove the dependency if needed. - case Some((2, n)) if n >= 13 => "0.9.0" - case _ => "0.8.0" - } - }) + val Versions = + Seq( + crossScalaVersions := Seq(scala212Version, scala213Version), + scalaVersion := { + // don't allow full override to keep compatible with the version of silencer + // don't mandate patch not specified to allow builds to migrate + System.getProperty("akka.build.scalaVersion", "default") match { + case twoThirteen if twoThirteen.startsWith("2.13") => scala213Version + case twoTwelve if twoTwelve.startsWith("2.12") => scala212Version + case "default" => crossScalaVersions.value.head + case other => throw new IllegalArgumentException(s"Unsupported scala version [$other]. Must be 2.12 or 2.13.") + } + }, + java8CompatVersion := { + CrossVersion.partialVersion(scalaVersion.value) match { + // java8-compat is only used in a couple of places for 2.13, + // it is probably possible to remove the dependency if needed. + case Some((2, n)) if n >= 13 => "0.9.0" + case _ => "0.8.0" + } + }) object Compile { // Compile From f8c7a118be9a1e9639168d26e306d2d5d1dbb117 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 18 Aug 2020 11:08:07 +0200 Subject: [PATCH 08/29] Reduce scope of cluster.StressSpec, #23511 (#29472) * to only exercise membership * remote deployed routers and supervision of remote deployed actors are not priority, and that is what is sometimes failing --- .../scala/akka/cluster/StressSpec.scala | 510 +----------------- 1 file changed, 12 insertions(+), 498 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 14591ebf83..cba721ed4d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -5,15 +5,14 @@ package akka.cluster import java.lang.management.ManagementFactory -import java.util.concurrent.ThreadLocalRandom import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration._ +import scala.language.postfixOps import com.typesafe.config.Config import com.typesafe.config.ConfigFactory -import language.postfixOps import org.scalatest.BeforeAndAfterEach import akka.actor.Actor @@ -24,10 +23,8 @@ import akka.actor.ActorSystem import akka.actor.Address import akka.actor.Deploy import akka.actor.Identify -import akka.actor.OneForOneStrategy import akka.actor.Props import akka.actor.RootActorPath -import akka.actor.SupervisorStrategy._ import akka.actor.Terminated import akka.cluster.ClusterEvent.CurrentClusterState import akka.cluster.ClusterEvent.CurrentInternalStats @@ -35,31 +32,24 @@ import akka.cluster.ClusterEvent.MemberEvent import akka.remote.DefaultFailureDetectorRegistry import akka.remote.PhiAccrualFailureDetector import akka.remote.RARP -import akka.remote.RemoteScope import akka.remote.artery.ArterySettings.AeronUpd import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec -import akka.routing.FromConfig -import akka.testkit._ import akka.testkit.TestEvent._ +import akka.testkit._ import akka.util.Helpers.ConfigOps import akka.util.Helpers.Requiring /** * This test is intended to be used as long running stress test - * of cluster related features. Number of nodes and duration of + * of cluster membership features. Number of nodes and duration of * the test steps can be configured. The test scenario is organized as * follows: * 1. join nodes in various ways up to the configured total number of nodes - * 2 while nodes are joining a few cluster aware routers are also working - * 3. exercise concurrent joining and shutdown of nodes repeatedly - * 4. exercise cluster aware routers, including high throughput - * 5. exercise many actors in a tree structure - * 6. exercise remote supervision - * 7. gossip without any changes to the membership - * 8. leave and shutdown nodes in various ways - * 9. while nodes are removed remote death watch is also exercised - * 10. while nodes are removed a few cluster aware routers are also working + * 2. exercise concurrent joining and shutdown of nodes repeatedly + * 3. gossip without any changes to the membership + * 4. leave and shutdown nodes in various ways + * 5. while nodes are removed remote death watch is also exercised * * By default it uses 13 nodes. * Example of sbt command line parameters to double that: @@ -99,32 +89,19 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { # scale the *-duration settings with this factor duration-factor = 1 join-remove-duration = 90s - work-batch-size = 100 - work-batch-interval = 2s - payload-size = 1000 - normal-throughput-duration = 30s - high-throughput-duration = 10s - supervision-duration = 10s - supervision-one-iteration = 2.5s idle-gossip-duration = 10s expected-test-duration = 600s - # actors are created in a tree structure defined - # by tree-width (number of children for each actor) and - # tree-levels, total number of actors can be calculated by - # (width * math.pow(width, levels) - 1) / (width - 1) - tree-width = 4 - tree-levels = 4 # scale convergence within timeouts with this factor convergence-within-factor = 1.0 - # set to off to only test cluster membership - exercise-actors = on } akka.actor.provider = cluster akka.cluster { failure-detector.acceptable-heartbeat-pause = 10s - downing-provider-class = akka.cluster.testkit.AutoDowning - testkit.auto-down-unreachable-after = 1s + downing-provider-class = akka.cluster.sbr.SplitBrainResolverProvider + split-brain-resolver { + stable-after = 5s + } publish-stats-interval = 1s } akka.loggers = ["akka.testkit.TestEventListener"] @@ -135,32 +112,6 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { parallelism-max = 8 } - akka.actor.deployment { - /master-node-1/workers { - router = round-robin-pool - cluster { - enabled = on - max-nr-of-instances-per-node = 1 - allow-local-routees = on - } - } - /master-node-2/workers { - router = round-robin-group - routees.paths = ["/user/worker"] - cluster { - enabled = on - allow-local-routees = on - } - } - /master-node-3/workers = { - router = round-robin-pool - cluster { - enabled = on - max-nr-of-instances-per-node = 1 - allow-local-routees = on - } - } - } # test is using Java serialization and not priority to rewrite akka.actor.allow-java-serialization = on akka.actor.warn-about-java-serializer-usage = off @@ -190,21 +141,11 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { val numberOfNodesShutdown = getInt("nr-of-nodes-shutdown") * nFactor val numberOfNodesJoinRemove = getInt("nr-of-nodes-join-remove") // not scaled by nodes factor - val workBatchSize = getInt("work-batch-size") - val workBatchInterval = testConfig.getMillisDuration("work-batch-interval") - val payloadSize = getInt("payload-size") val dFactor = getInt("duration-factor") val joinRemoveDuration = testConfig.getMillisDuration("join-remove-duration") * dFactor - val normalThroughputDuration = testConfig.getMillisDuration("normal-throughput-duration") * dFactor - val highThroughputDuration = testConfig.getMillisDuration("high-throughput-duration") * dFactor - val supervisionDuration = testConfig.getMillisDuration("supervision-duration") * dFactor - val supervisionOneIteration = testConfig.getMillisDuration("supervision-one-iteration") * dFactor val idleGossipDuration = testConfig.getMillisDuration("idle-gossip-duration") * dFactor val expectedTestDuration = testConfig.getMillisDuration("expected-test-duration") * dFactor - val treeWidth = getInt("tree-width") - val treeLevels = getInt("tree-levels") val convergenceWithinFactor = getDouble("convergence-within-factor") - val exerciseActors = getBoolean("exercise-actors") require( numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially + numberOfNodesJoiningOneByOneSmall + @@ -438,154 +379,6 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { } } - /** - * Master of routers - * - * Flow control, to not flood the consumers, is handled by scheduling a - * batch of messages to be sent to the router when half of the number - * of outstanding messages remains. - * - * It uses a simple message retry mechanism. If an ack of a sent message - * is not received within a timeout, that message will be resent to the router, - * infinite number of times. - * - * When it receives the `End` command it will stop sending messages to the router, - * resends continuous, until all outstanding acks have been received, and then - * finally it replies with `WorkResult` to the sender of the `End` command, and stops - * itself. - */ - class Master(settings: StressMultiJvmSpec.Settings, batchInterval: FiniteDuration, tree: Boolean) extends Actor { - val workers = context.actorOf(FromConfig.props(Props[Worker]()), "workers") - val payload = Array.fill(settings.payloadSize)(ThreadLocalRandom.current.nextInt(127).toByte) - val retryTimeout = 5.seconds.dilated(context.system) - val idCounter = Iterator.from(0) - var sendCounter = 0L - var ackCounter = 0L - var outstanding = Map.empty[JobId, JobState] - var startTime = 0L - - import context.dispatcher - val resendTask = context.system.scheduler.scheduleWithFixedDelay(3.seconds, 3.seconds, self, RetryTick) - - override def postStop(): Unit = { - resendTask.cancel() - super.postStop() - } - - def receive = { - case Begin => - startTime = System.nanoTime - self ! SendBatch - context.become(working) - case RetryTick => - } - - def working: Receive = { - case Ack(id) => - outstanding -= id - ackCounter += 1 - if (outstanding.size == settings.workBatchSize / 2) - if (batchInterval == Duration.Zero) self ! SendBatch - else context.system.scheduler.scheduleOnce(batchInterval, self, SendBatch) - case SendBatch => sendJobs() - case RetryTick => resend() - case End => - done(sender()) - context.become(ending(sender())) - } - - def ending(replyTo: ActorRef): Receive = { - case Ack(id) => - outstanding -= id - ackCounter += 1 - done(replyTo) - case SendBatch => - case RetryTick => resend() - } - - def done(replyTo: ActorRef): Unit = - if (outstanding.isEmpty) { - val duration = (System.nanoTime - startTime).nanos - replyTo ! WorkResult(duration, sendCounter, ackCounter) - context.stop(self) - } - - def sendJobs(): Unit = { - (0 until settings.workBatchSize).foreach { _ => - send(createJob()) - } - } - - def createJob(): Job = { - if (tree) - TreeJob( - idCounter.next(), - payload, - ThreadLocalRandom.current.nextInt(settings.treeWidth), - settings.treeLevels, - settings.treeWidth) - else SimpleJob(idCounter.next(), payload) - } - - def resend(): Unit = { - outstanding.values.foreach { jobState => - if (jobState.deadline.isOverdue()) - send(jobState.job) - } - } - - def send(job: Job): Unit = { - outstanding += job.id -> JobState(Deadline.now + retryTimeout, job) - sendCounter += 1 - workers ! job - } - } - - /** - * Used by Master as routee - */ - class Worker extends Actor with ActorLogging { - def receive = { - case SimpleJob(id, _) => sender() ! Ack(id) - case TreeJob(id, payload, idx, levels, width) => - // create the actors when first TreeJob message is received - val totalActors = ((width * math.pow(width, levels) - 1) / (width - 1)).toInt - log.debug( - "Creating [{}] actors in a tree structure of [{}] levels and each actor has [{}] children", - totalActors, - levels, - width) - val tree = context.actorOf(Props(classOf[TreeNode], levels, width), "tree") - tree.forward((idx, SimpleJob(id, payload))) - context.become(treeWorker(tree)) - } - - def treeWorker(tree: ActorRef): Receive = { - case SimpleJob(id, _) => sender() ! Ack(id) - case TreeJob(id, payload, idx, _, _) => - tree.forward((idx, SimpleJob(id, payload))) - } - } - - class TreeNode(level: Int, width: Int) extends Actor { - require(level >= 1) - def createChild(): Actor = if (level == 1) new Leaf else new TreeNode(level - 1, width) - val indexedChildren = - (0 until width).map { i => - context.actorOf(Props(createChild()).withDeploy(Deploy.local), name = i.toString) - } toVector - - def receive = { - case (idx: Int, job: SimpleJob) if idx < width => indexedChildren(idx).forward((idx, job)) - } - } - - class Leaf extends Actor { - def receive = { - case (_: Int, job: SimpleJob) => sender() ! Ack(job.id) - } - } - /** * Used for remote death watch testing */ @@ -593,41 +386,6 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { def receive = Actor.emptyBehavior } - /** - * Used for remote supervision testing - */ - class Supervisor extends Actor { - - var restartCount = 0 - - override val supervisorStrategy = - OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 minute) { - case _: Exception => - restartCount += 1 - Restart - } - - def receive = { - case props: Props => context.actorOf(props) - case e: Exception => context.children.foreach { _ ! e } - case GetChildrenCount => sender() ! ChildrenCount(context.children.size, restartCount) - case Reset => - require( - context.children.isEmpty, - s"ResetChildrenCount not allowed when children exists, [${context.children.size}]") - restartCount = 0 - } - } - - /** - * Child of Supervisor for remote supervision testing - */ - class RemoteChild extends Actor { - def receive = { - case e: Exception => throw e - } - } - case object Begin case object End case object RetryTick @@ -640,22 +398,6 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { } final case class ReportTo(ref: Option[ActorRef]) final case class StatsResult(from: Address, stats: CurrentInternalStats) - - type JobId = Int - trait Job { def id: JobId } - final case class SimpleJob(id: JobId, payload: Any) extends Job - final case class TreeJob(id: JobId, payload: Any, idx: Int, levels: Int, width: Int) extends Job - final case class Ack(id: JobId) - final case class JobState(deadline: Deadline, job: Job) - final case class WorkResult(duration: Duration, sendCount: Long, ackCount: Long) { - def retryCount: Long = sendCount - ackCount - def jobsPerSecond: Double = ackCount * 1000.0 / duration.toMillis - } - case object SendBatch - final case class CreateTree(levels: Int, width: Int) - - case object GetChildrenCount - final case class ChildrenCount(numberOfChildren: Int, numberOfChildRestarts: Int) case object Reset } @@ -699,13 +441,7 @@ abstract class StressSpec override def muteLog(sys: ActorSystem = system): Unit = { super.muteLog(sys) sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*"))) - muteDeadLetters( - classOf[SimpleJob], - classOf[AggregatedClusterResult], - SendBatch.getClass, - classOf[StatsResult], - classOf[PhiResult], - RetryTick.getClass)(sys) + muteDeadLetters(classOf[AggregatedClusterResult], classOf[StatsResult], classOf[PhiResult], RetryTick.getClass)(sys) } override protected def afterTermination(): Unit = { @@ -780,14 +516,9 @@ abstract class StressSpec def latestGossipStats = cluster.readView.latestStats.gossipStats override def cluster: Cluster = { - createWorker super.cluster } - // always create one worker when the cluster is started - lazy val createWorker: Unit = - system.actorOf(Props[Worker](), "worker") - def createResultAggregator(title: String, expectedResults: Int, includeInHistory: Boolean): Unit = { runOn(roles.head) { val aggregator = system.actorOf( @@ -1038,116 +769,6 @@ abstract class StressSpec } - def masterName: String = "master-" + myself.name - - def master: Option[ActorRef] = { - system.actorSelection("/user/" + masterName).tell(Identify("master"), identifyProbe.ref) - identifyProbe.expectMsgType[ActorIdentity].ref - } - - def exerciseRouters( - title: String, - duration: FiniteDuration, - batchInterval: FiniteDuration, - expectDroppedMessages: Boolean, - tree: Boolean): Unit = - within(duration + 10.seconds) { - nbrUsedRoles should ===(totalNumberOfNodes) - createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false) - - val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3) - runOn(masterRoles: _*) { - reportResult { - val m = system.actorOf( - Props(classOf[Master], settings, batchInterval, tree).withDeploy(Deploy.local), - name = masterName) - m ! Begin - import system.dispatcher - system.scheduler.scheduleOnce(duration) { - m.tell(End, testActor) - } - val workResult = awaitWorkResult(m) - workResult.sendCount should be > (0L) - workResult.ackCount should be > (0L) - if (!expectDroppedMessages) - workResult.retryCount should ===(0) - - enterBarrier("routers-done-" + step) - } - } - runOn(otherRoles: _*) { - reportResult { - enterBarrier("routers-done-" + step) - } - } - - awaitClusterResult() - } - - def awaitWorkResult(m: ActorRef): WorkResult = { - val workResult = expectMsgType[WorkResult] - if (settings.infolog) - log.info( - "{} result, [{}] jobs/s, retried [{}] of [{}] msg", - masterName, - workResult.jobsPerSecond.form, - workResult.retryCount, - workResult.sendCount) - watch(m) - expectTerminated(m) - workResult - } - - def exerciseSupervision(title: String, duration: FiniteDuration, oneIteration: Duration): Unit = - within(duration + 10.seconds) { - val rounds = (duration.toMillis / oneIteration.toMillis).max(1).toInt - val supervisor = system.actorOf(Props[Supervisor](), "supervisor") - for (_ <- 0 until rounds) { - createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false) - - val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3) - runOn(masterRoles: _*) { - reportResult { - roles.take(nbrUsedRoles).foreach { r => - supervisor ! Props[RemoteChild]().withDeploy(Deploy(scope = RemoteScope(address(r)))) - } - supervisor ! GetChildrenCount - expectMsgType[ChildrenCount] should ===(ChildrenCount(nbrUsedRoles, 0)) - - (1 to 5).foreach { _ => - supervisor ! new RuntimeException("Simulated exception") - } - awaitAssert { - supervisor ! GetChildrenCount - val c = expectMsgType[ChildrenCount] - c should ===(ChildrenCount(nbrUsedRoles, 5 * nbrUsedRoles)) - } - - // after 5 restart attempts the children should be stopped - supervisor ! new RuntimeException("Simulated exception") - awaitAssert { - supervisor ! GetChildrenCount - val c = expectMsgType[ChildrenCount] - // zero children - c should ===(ChildrenCount(0, 6 * nbrUsedRoles)) - } - supervisor ! Reset - - } - enterBarrier("supervision-done-" + step) - } - - runOn(otherRoles: _*) { - reportResult { - enterBarrier("supervision-done-" + step) - } - } - - awaitClusterResult() - step += 1 - } - } - def idleGossip(title: String): Unit = { createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = true) reportResult { @@ -1195,14 +816,6 @@ abstract class StressSpec enterBarrier("after-" + step) } - "start routers that are running while nodes are joining" taggedAs LongRunningTest in { - runOn(roles.take(3): _*) { - system.actorOf( - Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local), - name = masterName) ! Begin - } - } - "join nodes one-by-one to small cluster" taggedAs LongRunningTest in { joinOneByOne(numberOfNodesJoiningOneByOneSmall) enterBarrier("after-" + step) @@ -1227,99 +840,16 @@ abstract class StressSpec enterBarrier("after-" + step) } - "end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) { - if (exerciseActors) { - runOn(roles.take(3): _*) { - master match { - case Some(m) => - m.tell(End, testActor) - val workResult = awaitWorkResult(m) - workResult.retryCount should ===(0) - workResult.sendCount should be > (0L) - workResult.ackCount should be > (0L) - case None => fail("master not running") - } - } - } - enterBarrier("after-" + step) - } - - "use routers with normal throughput" taggedAs LongRunningTest in { - if (exerciseActors) { - exerciseRouters( - "use routers with normal throughput", - normalThroughputDuration, - batchInterval = workBatchInterval, - expectDroppedMessages = false, - tree = false) - } - enterBarrier("after-" + step) - } - - "use routers with high throughput" taggedAs LongRunningTest in { - if (exerciseActors) { - exerciseRouters( - "use routers with high throughput", - highThroughputDuration, - batchInterval = Duration.Zero, - expectDroppedMessages = false, - tree = false) - } - enterBarrier("after-" + step) - } - - "use many actors with normal throughput" taggedAs LongRunningTest in { - if (exerciseActors) { - exerciseRouters( - "use many actors with normal throughput", - normalThroughputDuration, - batchInterval = workBatchInterval, - expectDroppedMessages = false, - tree = true) - } - enterBarrier("after-" + step) - } - - "use many actors with high throughput" taggedAs LongRunningTest in { - if (exerciseActors) { - exerciseRouters( - "use many actors with high throughput", - highThroughputDuration, - batchInterval = Duration.Zero, - expectDroppedMessages = false, - tree = true) - } - enterBarrier("after-" + step) - } - "exercise join/remove/join/remove" taggedAs LongRunningTest in { exerciseJoinRemove("exercise join/remove", joinRemoveDuration) enterBarrier("after-" + step) } - "exercise supervision" taggedAs LongRunningTest in { - if (exerciseActors) { - exerciseSupervision("exercise supervision", supervisionDuration, supervisionOneIteration) - } - enterBarrier("after-" + step) - } - "gossip when idle" taggedAs LongRunningTest in { idleGossip("idle gossip") enterBarrier("after-" + step) } - "start routers that are running while nodes are removed" taggedAs LongRunningTest in { - if (exerciseActors) { - runOn(roles.take(3): _*) { - system.actorOf( - Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local), - name = masterName) ! Begin - } - } - enterBarrier("after-" + step) - } - "leave nodes one-by-one from large cluster" taggedAs LongRunningTest in { removeOneByOne(numberOfNodesLeavingOneByOneLarge, shutdown = false) enterBarrier("after-" + step) @@ -1352,22 +882,6 @@ abstract class StressSpec enterBarrier("after-" + step) } - "end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) { - if (exerciseActors) { - runOn(roles.take(3): _*) { - master match { - case Some(m) => - m.tell(End, testActor) - val workResult = awaitWorkResult(m) - workResult.sendCount should be > (0L) - workResult.ackCount should be > (0L) - case None => fail("master not running") - } - } - } - enterBarrier("after-" + step) - } - "log jvm info" taggedAs LongRunningTest in { if (infolog) { log.info("StressSpec JVM:\n{}", jvmInfo()) From 2db9fb0b59a4e9bf507ec05804e335ecae933142 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Fri, 10 Jul 2020 16:20:38 +0200 Subject: [PATCH 09/29] Make akka-protobuf-v3_2.12-2.6.7 reproducible again --- build.sbt | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/build.sbt b/build.sbt index a53103bdac..1abf2356e0 100644 --- a/build.sbt +++ b/build.sbt @@ -338,8 +338,10 @@ lazy val protobufV3 = akkaModule("akka-protobuf-v3") exportJars := true, // in dependent projects, use assembled and shaded jar makePomConfiguration := makePomConfiguration.value .withConfigurations(Vector(Compile)), // prevent original dependency to be added to pom as runtime dep - packagedArtifact in (Compile, packageBin) := Scoped - .mkTuple2((artifact in (Compile, packageBin)).value, OsgiKeys.bundle.value), + packagedArtifact in (Compile, packageBin) := Scoped.mkTuple2( + (artifact in (Compile, packageBin)).value, + ReproducibleBuildsPlugin.postProcessJar(OsgiKeys.bundle.value) + ), packageBin in Compile := ReproducibleBuildsPlugin .postProcessJar((assembly in Compile).value), // package by running assembly // Prevent cyclic task dependencies, see https://github.com/sbt/sbt-assembly/issues/365 From c934511c427309a9de551793f9e7d3883d0e1a17 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Tue, 18 Aug 2020 10:25:01 +0100 Subject: [PATCH 10/29] Forward terminated from ShardCoordinator to RebalanceWorker (#29463) * Forward terminated from ShardCoordinator to RebalanceWorker Avoiding the need for rebalance workers to watch shard regions which is expensive as there is one rebalance worker per shard * Review feedback --- .../akka/cluster/sharding/ShardCoordinator.scala | 16 +++++++++++----- .../sharding/ClusterShardingLeavingSpec.scala | 5 +++-- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 4b4adb3e19..ec2e278005 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -448,6 +448,10 @@ object ShardCoordinator { */ private final case class RebalanceResult(shards: Set[ShardId]) + private[akka] object RebalanceWorker { + final case class ShardRegionTerminated(region: ActorRef) + } + /** * INTERNAL API. Rebalancing process is performed by this actor. * It sends `BeginHandOff` to all `ShardRegion` actors followed by @@ -467,7 +471,6 @@ object ShardCoordinator { import Internal._ regions.foreach { region => - context.watch(region) region ! BeginHandOff(shard) } var remaining = regions @@ -480,7 +483,7 @@ object ShardCoordinator { case BeginHandOffAck(`shard`) => log.debug("BeginHandOffAck for shard [{}] received from [{}].", shard, sender()) acked(sender()) - case Terminated(shardRegion) => + case ShardRegionTerminated(shardRegion) => log.debug("ShardRegion [{}] terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard) acked(shardRegion) case ReceiveTimeout => @@ -489,7 +492,6 @@ object ShardCoordinator { } private def acked(shardRegion: ActorRef) = { - context.unwatch(shardRegion) remaining -= shardRegion if (remaining.isEmpty) { log.debug("All shard regions acked, handing off shard [{}].", shard) @@ -547,6 +549,7 @@ abstract class ShardCoordinator( var state = State.empty.withRememberEntities(settings.rememberEntities) // rebalanceInProgress for the ShardId keys, pending GetShardHome requests by the ActorRef values var rebalanceInProgress = Map.empty[ShardId, Set[ActorRef]] + var rebalanceWorkers: Set[ActorRef] = Set.empty var unAckedHostShards = Map.empty[ShardId, Cancellable] // regions that have requested handoff, for graceful shutdown var gracefulShutdownInProgress = Set.empty[ActorRef] @@ -687,6 +690,7 @@ abstract class ShardCoordinator( continueRebalance(shards) case RebalanceDone(shard, ok) => + rebalanceWorkers -= sender() if (ok) log.debug("Rebalance shard [{}] completed successfully.", shard) else @@ -887,7 +891,8 @@ abstract class ShardCoordinator( } } - def regionTerminated(ref: ActorRef): Unit = + def regionTerminated(ref: ActorRef): Unit = { + rebalanceWorkers.foreach(_ ! RebalanceWorker.ShardRegionTerminated(ref)) if (state.regions.contains(ref)) { log.debug("ShardRegion terminated: [{}]", ref) regionTerminationInProgress += ref @@ -903,6 +908,7 @@ abstract class ShardCoordinator( allocateShardHomesForRememberEntities() } } + } def regionProxyTerminated(ref: ActorRef): Unit = if (state.regionProxies.contains(ref)) { @@ -974,7 +980,7 @@ abstract class ShardCoordinator( case Some(rebalanceFromRegion) => rebalanceInProgress = rebalanceInProgress.updated(shard, Set.empty) log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion) - context.actorOf( + rebalanceWorkers += context.actorOf( rebalanceWorkerProps( shard, rebalanceFromRegion, diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala index a43f1a9eb3..fbab890a84 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala @@ -46,9 +46,10 @@ abstract class ClusterShardingLeavingSpecConfig(mode: String) extends MultiNodeClusterShardingConfig( mode, loglevel = "DEBUG", - additionalConfig = """ + additionalConfig = + """ akka.cluster.sharding.verbose-debug-logging = on - akka.cluster.sharding.rebalance-interval = 120 s + akka.cluster.sharding.rebalance-interval = 1s # make rebalancing more likely to happen to test for https://github.com/akka/akka/issues/29093 akka.cluster.sharding.distributed-data.majority-min-cap = 1 akka.cluster.sharding.coordinator-state.write-majority-plus = 1 akka.cluster.sharding.coordinator-state.read-majority-plus = 1 From da404071dc0697e1a4783407c12f9f533c4bf151 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 18 Aug 2020 11:49:19 +0200 Subject: [PATCH 11/29] full convergence also for joining nodes for first multi-dc join, #29486 (#29499) --- .../scala/akka/cluster/MembershipState.scala | 12 ++++++++---- .../test/scala/akka/cluster/GossipSpec.scala | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala index 6c4a326d42..d2443066fb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala +++ b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala @@ -60,14 +60,18 @@ import akka.util.ccompat._ !members.exists(member => member.dataCenter == selfDc && convergenceMemberStatus(member.status)) // If another member in the data center that is UP or LEAVING and has not seen this gossip or is exiting - // convergence cannot be reached. For the first member in a secondary DC all members must have seen - // the gossip state. - def memberHinderingConvergenceExists = + // convergence cannot be reached. For the first member in a secondary DC all Joining, WeaklyUp, Up or Leaving + // members must have seen the gossip state. The reason for the stronger requirement for a first member in a + // secondary DC is that first member should only be moved to Up once to ensure that the first upNumber is + // only assigned once. + def memberHinderingConvergenceExists = { + val memberStatus = if (firstMemberInDc) convergenceMemberStatus + Joining + WeaklyUp else convergenceMemberStatus members.exists( member => (firstMemberInDc || member.dataCenter == selfDc) && - convergenceMemberStatus(member.status) && + memberStatus(member.status) && !(latestGossip.seenByNode(member.uniqueAddress) || exitingConfirmed(member.uniqueAddress))) + } // Find cluster members in the data center that are unreachable from other members of the data center // excluding observations from members outside of the data center, that have status DOWN or is passed in as confirmed exiting. diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index c57ec39849..01213ce250 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -28,6 +28,7 @@ class GossipSpec extends AnyWordSpec with Matchers { val e1 = TestMember(Address("akka", "sys", "e", 2552), Joining) val e2 = TestMember(e1.address, Up) val e3 = TestMember(e1.address, Down) + val f1 = TestMember(Address("akka", "sys", "f", 2552), Joining) val dc1a1 = TestMember(Address("akka", "sys", "a", 2552), Up, Set.empty, dataCenter = "dc1") val dc1b1 = TestMember(Address("akka", "sys", "b", 2552), Up, Set.empty, dataCenter = "dc1") @@ -272,6 +273,24 @@ class GossipSpec extends AnyWordSpec with Matchers { state(g2, dc2e1).convergence(Set.empty) should ===(true) } + "not reach convergence for first member of other data center until all have seen the gossip 2" in { + // reproducer test for issue #29486 + val dc2e1 = TestMember(e1.address, status = Joining, roles = Set.empty, dataCenter = "dc2") + val dc2f1 = TestMember(f1.address, status = Joining, roles = Set.empty, dataCenter = "dc2") + val g = + Gossip(members = SortedSet(dc1a1, dc1b1, dc2e1, dc2f1)) + .seen(dc1a1.uniqueAddress) + .seen(dc1b1.uniqueAddress) + .seen(dc2f1.uniqueAddress) + + // dc2 hasn't reached convergence because dc2e1 has not seen it (and that matters even though it is only Joining) + state(g, dc2f1).convergence(Set.empty) should ===(false) + + // until all have seen it + val g2 = g.seen(dc2e1.uniqueAddress) + state(g2, dc2f1).convergence(Set.empty) should ===(true) + } + "reach convergence per data center even if another data center contains unreachable" in { val r1 = Reachability.empty.unreachable(dc2c1.uniqueAddress, dc2d1.uniqueAddress) From efe02935cdcefd0055f9c74d9af0b705afdd156b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 18 Aug 2020 15:16:44 +0200 Subject: [PATCH 12/29] doc: rolling update from 2.5.x to 2.6.2 (#29501) --- .../src/main/paradox/project/migration-guide-2.5.x-2.6.x.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 6413f37f0f..3d2e5874ee 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -19,7 +19,9 @@ reading this migration guide and testing your application thoroughly is recommen Rolling updates are possible without shutting down all nodes of the Akka Cluster, but will require configuration adjustments as described in the @ref:[Remoting](#remoting) section of this migration -guide. +guide. Due to the @ref:[changed serialization of the Cluster messages in Akka 2.6.2](rolling-update.md#2-6-2-clustermessageserializer-manifests-change) +a rolling update from 2.5.x must first be made to Akka 2.6.2 and then a second rolling update can change to Akka 2.6.3 +or later. ## Scala 2.11 no longer supported From ff9b8f44ea6df6f31fb8850d11a29f5461240db5 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 19 Aug 2020 09:30:13 +0200 Subject: [PATCH 13/29] increase timeout in MultiDcJoin2Spec, #29505 --- .../src/multi-jvm/scala/akka/cluster/MultiDcJoin2Spec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcJoin2Spec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcJoin2Spec.scala index ea9135eb3f..14b8d38b02 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcJoin2Spec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiDcJoin2Spec.scala @@ -94,7 +94,7 @@ abstract class MultiDcJoin2Spec extends MultiNodeSpec(MultiDcJoin2MultiJvmSpec) // at the same time join fifth, which is the difference compared to MultiDcJoinSpec runOn(fifth) { Cluster(system).join(second) - within(10.seconds) { + within(20.seconds) { awaitAssert { Cluster(system).state.members .exists(m => m.address == address(fifth) && m.status == MemberStatus.Up) should ===(true) From 8f5c200e1e567507ab4a9e7fdc900bd7d73e26e1 Mon Sep 17 00:00:00 2001 From: Scala Steward Date: Mon, 24 Aug 2020 02:03:30 +0200 Subject: [PATCH 14/29] Update sbt-java-formatter to 0.6.0 --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index af2935a13f..0b78693a68 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -8,7 +8,7 @@ libraryDependencies += Defaults.sbtPluginExtra( addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0") //#sbt-multi-jvm -addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.5.1") +addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.6.0") addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.3.4") addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.15") // sbt-osgi 0.9.5 is available but breaks including jdk9-only classes From b90ed1c2638cb1dc06f1bf948a18042e869b0ec3 Mon Sep 17 00:00:00 2001 From: Scala Steward <43047562+scala-steward@users.noreply.github.com> Date: Mon, 24 Aug 2020 09:44:03 +0200 Subject: [PATCH 15/29] Update silencer-lib, silencer-plugin to 1.7.1 (#29516) --- project/AkkaDisciplinePlugin.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/AkkaDisciplinePlugin.scala b/project/AkkaDisciplinePlugin.scala index 9e3a9e1ce9..7222af7938 100644 --- a/project/AkkaDisciplinePlugin.scala +++ b/project/AkkaDisciplinePlugin.scala @@ -62,7 +62,7 @@ object AkkaDisciplinePlugin extends AutoPlugin { "akka-testkit") lazy val silencerSettings = { - val silencerVersion = "1.7.0" + val silencerVersion = "1.7.1" Seq( libraryDependencies ++= Seq( compilerPlugin(("com.github.ghik" %% "silencer-plugin" % silencerVersion).cross(CrossVersion.patch)), From 988e240368c5ce4441d9f6895a8c45dd67ab4767 Mon Sep 17 00:00:00 2001 From: Scala Steward <43047562+scala-steward@users.noreply.github.com> Date: Mon, 24 Aug 2020 10:06:17 +0200 Subject: [PATCH 16/29] Update metrics-core, metrics-jvm to 4.1.12.1 (#29518) --- project/Dependencies.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1119d6d3f4..a7b5bb2a3f 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -141,8 +141,8 @@ object Dependencies { val dockerClient = "com.spotify" % "docker-client" % "8.16.0" % "test" // ApacheV2 // metrics, measurements, perf testing - val metrics = "io.dropwizard.metrics" % "metrics-core" % "4.1.11" % "test" // ApacheV2 - val metricsJvm = "io.dropwizard.metrics" % "metrics-jvm" % "4.1.11" % "test" // ApacheV2 + val metrics = "io.dropwizard.metrics" % "metrics-core" % "4.1.12.1" % "test" // ApacheV2 + val metricsJvm = "io.dropwizard.metrics" % "metrics-jvm" % "4.1.12.1" % "test" // ApacheV2 val latencyUtils = "org.latencyutils" % "LatencyUtils" % "2.0.3" % "test" // Free BSD val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "2.1.12" % "test" // CC0 val metricsAll = Seq(metrics, metricsJvm, latencyUtils, hdrHistogram) From 165990ba01fdc8ef631457d82b7cca7995baefaa Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Mon, 24 Aug 2020 10:28:47 +0100 Subject: [PATCH 17/29] Pin jackson to 2.10. (#29521) --- .scala-steward.conf | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.scala-steward.conf b/.scala-steward.conf index 4cc938e48a..5d73734dca 100644 --- a/.scala-steward.conf +++ b/.scala-steward.conf @@ -1,5 +1,9 @@ pullRequests.frequency = "@monthly" +updates.pin = [ + { groupId = "com.fasterxml.jackson.core", artifactId = "jackson-databind", version = "2.10." } +] + updates.ignore = [ { groupId = "com.google.protobuf", artifactId = "protobuf-java" }, { groupId = "org.scalameta", artifactId = "scalafmt-core" }, From cfbbf74b77f4773771d5274beb4811bcab109271 Mon Sep 17 00:00:00 2001 From: Scala Steward <43047562+scala-steward@users.noreply.github.com> Date: Mon, 24 Aug 2020 11:29:31 +0200 Subject: [PATCH 18/29] Update jctools-core to 3.0.1 (#29519) --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a7b5bb2a3f..d5aa1d87be 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -73,7 +73,7 @@ object Dependencies { val sigar = "org.fusesource" % "sigar" % "1.6.4" // ApacheV2 - val jctools = "org.jctools" % "jctools-core" % "3.0.0" // ApacheV2 + val jctools = "org.jctools" % "jctools-core" % "3.0.1" // ApacheV2 // reactive streams val reactiveStreams = "org.reactivestreams" % "reactive-streams" % reactiveStreamsVersion // CC0 From d9c610ecebcf09ed1b019e6f8c6d30d947448c64 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Mon, 24 Aug 2020 11:45:27 +0200 Subject: [PATCH 19/29] docs: render depenency on Futures page correctly (#29513) --- akka-docs/src/main/paradox/futures.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/akka-docs/src/main/paradox/futures.md b/akka-docs/src/main/paradox/futures.md index a8fcc6a297..389a2d46b7 100644 --- a/akka-docs/src/main/paradox/futures.md +++ b/akka-docs/src/main/paradox/futures.md @@ -5,9 +5,11 @@ Akka offers tiny helpers for use with @scala[@scaladoc[Future](scala.concurrent.Future)s]@java[@javadoc[CompletionStage](java.util.concurrent.CompletionStage)]. These are part of Akka's core module: @@dependency[sbt,Maven,Gradle] { + symbol1=AkkaVersion + value1="$akka.version$" group="com.typesafe.akka" - artifact="akka-actor_$scala.binary_version$" - version="$akka.version$" + artifact="akka-actor_$scala.binary.version$" + version=AkkaVersion } ## After From 1ee9a322b6c97280cfe6fd377a264b7b823f9d46 Mon Sep 17 00:00:00 2001 From: Scala Steward <43047562+scala-steward@users.noreply.github.com> Date: Mon, 24 Aug 2020 13:20:24 +0200 Subject: [PATCH 20/29] Update sbt-reproducible-builds to 0.25 (#29305) --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index af2935a13f..675ee99930 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -23,6 +23,6 @@ addSbtPlugin("com.lightbend.akka" % "sbt-paradox-akka" % "0.35") addSbtPlugin("com.lightbend" % "sbt-whitesource" % "0.1.18") addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.0") addSbtPlugin("com.hpe.sbt" % "sbt-pull-request-validator" % "1.0.0") -addSbtPlugin("net.bzzt" % "sbt-reproducible-builds" % "0.24") +addSbtPlugin("net.bzzt" % "sbt-reproducible-builds" % "0.25") addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1") addSbtPlugin("com.lightbend.sbt" % "sbt-publish-rsync" % "0.2") From 728dda874ef3b4357314b6ec1d66ff570fc8cb19 Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Tue, 25 Aug 2020 11:10:33 +0200 Subject: [PATCH 21/29] Event migration improvements (#29514) --- .../paradox/persistence-schema-evolution.md | 2 +- .../src/main/paradox/serialization-jackson.md | 61 +++- .../jackson/JacksonMigration.scala | 15 +- .../jackson/JacksonSerializer.scala | 19 +- ...ion.java => JavaTestEventMigrationV2.java} | 5 +- .../JavaTestEventMigrationV2WithV3.java | 56 ++++ .../jackson/JavaTestEventMigrationV3.java | 49 +++ .../jackson/JavaTestMessages.java | 36 ++ .../serialization/jackson/v1/ItemAdded.java | 2 + .../jackson/v1withv2/ItemAddedMigration.java | 37 +++ .../jackson/JacksonSerializerSpec.scala | 307 +++++++++++++++--- .../jackson/ScalaTestEventMigration.scala | 94 ++++++ .../serialization/jackson/v1/ItemAdded.scala | 2 + .../jackson/v1withv2/ItemAddedMigration.scala | 29 ++ 14 files changed, 651 insertions(+), 63 deletions(-) rename akka-serialization-jackson/src/test/java/akka/serialization/jackson/{JavaTestEventMigration.java => JavaTestEventMigrationV2.java} (82%) create mode 100644 akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV2WithV3.java create mode 100644 akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV3.java create mode 100644 akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1withv2/ItemAddedMigration.java create mode 100644 akka-serialization-jackson/src/test/scala/akka/serialization/jackson/ScalaTestEventMigration.scala create mode 100644 akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1withv2/ItemAddedMigration.scala diff --git a/akka-docs/src/main/paradox/persistence-schema-evolution.md b/akka-docs/src/main/paradox/persistence-schema-evolution.md index c3627ca796..5cfcfa22d2 100644 --- a/akka-docs/src/main/paradox/persistence-schema-evolution.md +++ b/akka-docs/src/main/paradox/persistence-schema-evolution.md @@ -224,7 +224,7 @@ needs to have an associated code which indicates if it is a window or aisle seat Adding fields is the most common change you'll need to apply to your messages so make sure the serialization format you picked for your payloads can handle it apropriately, i.e. such changes should be *binary compatible*. This is achieved using the right serializer toolkit. In the following examples we will be using protobuf. -See also @ref:[how to add fields with Jackson](serialization-jackson.md#add-field). +See also @ref:[how to add fields with Jackson](serialization-jackson.md#add-optional-field). While being able to read messages with missing fields is half of the solution, you also need to deal with the missing values somehow. This is usually modeled as some kind of default value, or by representing the field as an @scala[`Option[T]`]@java[`Optional`] diff --git a/akka-docs/src/main/paradox/serialization-jackson.md b/akka-docs/src/main/paradox/serialization-jackson.md index 83298aa788..1870b5bf30 100644 --- a/akka-docs/src/main/paradox/serialization-jackson.md +++ b/akka-docs/src/main/paradox/serialization-jackson.md @@ -205,7 +205,7 @@ We will look at a few scenarios of how the classes may be evolved. Removing a field can be done without any migration code. The Jackson serializer will ignore properties that does not exist in the class. -### Add Field +### Add Optional Field Adding an optional field can be done without any migration code. The default value will be @scala[None]@java[`Optional.empty`]. @@ -226,6 +226,8 @@ Scala Java : @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v2a/ItemAdded.java) { #add-optional } +### Add Mandatory Field + Let's say we want to have a mandatory `discount` property without default value instead: Scala @@ -361,6 +363,63 @@ binding, but it should still be possible to deserialize old data with Jackson. It's a list of class names or prefixes of class names. +## Rolling updates + +When doing a rolling update, for a period of time there are two different binaries running in production. If the schema +has evolved requiring a new schema version, the data serialized by the new binary will be unreadable from the old +binary. This situation causes transient errors on the processes running the old binary. This service degradation is +usually fine since the rolling update will eventually complete and all old processes will be replaced with the new +binary. To avoid this service degradation you can also use forward-one support in your schema evolutions. + +To complete a no-degradation rolling update, you need to make two deployments. First, deploy a new binary which can read +the new schema but still uses the old schema. Then, deploy a second binary which serializes data using the new schema +and drops the downcasting code from the migration. + +Let's take, for example, the case above where we [renamed a field](#rename-field). + +The starting schema is: + +Scala +: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala) { #add-optional } + +Java +: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java) { #add-optional } + +In a first deployment, we still don't make any change to the event class: + +Scala +: @@snip [ItemAdded.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala) { #forward-one-rename } + +Java +: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java) { #forward-one-rename } + +but we introduce a migration that can read the newer schema which is versioned `2`: + +Scala +: @@snip [ItemAddedMigration.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1withv2/ItemAddedMigration.scala) { #forward-one-rename } + +Java +: @@snip [ItemAddedMigration.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1withv2/ItemAddedMigration.java) { #forward-one-rename } + +Once all running nodes have the new migration code which can read version `2` of `ItemAdded` we can proceed with the +second step. So, we deploy the updated event: + +Scala +: @@snip [ItemAdded.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v2c/ItemAdded.scala) { #rename } + +Java +: @@snip [ItemAdded.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v2c/ItemAdded.java) { #rename } + +and the final migration code which no longer needs forward-compatibility code: + +Scala +: @@snip [ItemAddedMigration.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v2c/ItemAddedMigration.scala) { #rename } + +Java +: @@snip [ItemAddedMigration.java](/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v2c/ItemAddedMigration.java) { #rename } + + + ## Jackson Modules The following Jackson modules are enabled by default: diff --git a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonMigration.scala b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonMigration.scala index c39b2a17e0..58f61173cc 100644 --- a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonMigration.scala +++ b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonMigration.scala @@ -22,11 +22,22 @@ import akka.util.unused abstract class JacksonMigration { /** - * Define current version. The first version, when no migration was used, - * is always 1. + * Define current version, that is, the value used when serializing new data. The first version, when no + * migration was used, is always 1. */ def currentVersion: Int + /** + * Define the supported forward version this migration can read (must be greater or equal than `currentVersion`). + * If this value is different from [[currentVersion]] a [[JacksonMigration]] may be required to downcast + * the received payload to the current schema. + */ + def supportedForwardVersion: Int = currentVersion + + require( + currentVersion <= supportedForwardVersion, + s"""The "currentVersion" [$currentVersion] of a JacksonMigration must be less or equal to the "supportedForwardVersion" [$supportedForwardVersion].""") + /** * Override this method if you have changed the class name. Return * current class name. diff --git a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala index c3f5084205..8c1cf9f4c1 100644 --- a/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala +++ b/akka-serialization-jackson/src/main/scala/akka/serialization/jackson/JacksonSerializer.scala @@ -319,11 +319,16 @@ import akka.util.OptionVal val className = migration match { case Some(transformer) if fromVersion < transformer.currentVersion => transformer.transformClassName(fromVersion, manifestClassName) - case Some(transformer) if fromVersion > transformer.currentVersion => + case Some(transformer) if fromVersion == transformer.currentVersion => + manifestClassName + case Some(transformer) if fromVersion <= transformer.supportedForwardVersion => + transformer.transformClassName(fromVersion, manifestClassName) + case Some(transformer) if fromVersion > transformer.supportedForwardVersion => throw new IllegalStateException( - s"Migration version ${transformer.currentVersion} is " + + s"Migration version ${transformer.supportedForwardVersion} is " + s"behind version $fromVersion of deserialized type [$manifestClassName]") - case _ => manifestClassName + case None => + manifestClassName } if (typeInManifest && (className ne manifestClassName)) @@ -359,7 +364,13 @@ import akka.util.OptionVal val jsonTree = objectMapper.readTree(decompressedBytes) val newJsonTree = transformer.transform(fromVersion, jsonTree) objectMapper.treeToValue(newJsonTree, clazz) - case _ => + case Some(transformer) if fromVersion == transformer.currentVersion => + objectMapper.readValue(decompressedBytes, clazz) + case Some(transformer) if fromVersion <= transformer.supportedForwardVersion => + val jsonTree = objectMapper.readTree(decompressedBytes) + val newJsonTree = transformer.transform(fromVersion, jsonTree) + objectMapper.treeToValue(newJsonTree, clazz) + case None => objectMapper.readValue(decompressedBytes, clazz) } diff --git a/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigration.java b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV2.java similarity index 82% rename from akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigration.java rename to akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV2.java index c66716b54b..0a0fa12cb4 100644 --- a/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigration.java +++ b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV2.java @@ -9,15 +9,16 @@ import com.fasterxml.jackson.databind.node.IntNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.databind.JsonNode; -public class JavaTestEventMigration extends JacksonMigration { +public class JavaTestEventMigrationV2 extends JacksonMigration { @Override public int currentVersion() { - return 3; + return 2; } @Override public String transformClassName(int fromVersion, String className) { + // Ignore the incoming manifest and produce the same class name always. return JavaTestMessages.Event2.class.getName(); } diff --git a/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV2WithV3.java b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV2WithV3.java new file mode 100644 index 0000000000..b2ef413583 --- /dev/null +++ b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV2WithV3.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2016-2020 Lightbend Inc. + */ + +package akka.serialization.jackson; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class JavaTestEventMigrationV2WithV3 extends JacksonMigration { + + @Override + public int currentVersion() { + return 2; + } + + @Override + public int supportedForwardVersion() { + return 3; + } + + @Override + public String transformClassName(int fromVersion, String className) { + // Always produce the type of the currentVersion. When fromVersion is lower, + // transform will lift it. When fromVersion is higher, transform will downcast it. + return JavaTestMessages.Event2.class.getName(); + } + + @Override + public JsonNode transform(int fromVersion, JsonNode json) { + ObjectNode root = (ObjectNode) json; + if (fromVersion < 2) { + root = upcastV1ToV2((ObjectNode) json); + } + if (fromVersion == 3) { + root = downcastV3ToV2((ObjectNode) json); + } + return root; + } + + private ObjectNode upcastV1ToV2(ObjectNode json) { + ObjectNode root = json; + root.set("field1V2", root.get("field1")); + root.remove("field1"); + root.set("field2", IntNode.valueOf(17)); + return root; + } + + private ObjectNode downcastV3ToV2(ObjectNode json) { + ObjectNode root = json; + root.set("field2", root.get("field3")); + root.remove("field3"); + return root; + } +} diff --git a/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV3.java b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV3.java new file mode 100644 index 0000000000..09fbea9b74 --- /dev/null +++ b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestEventMigrationV3.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2016-2020 Lightbend Inc. + */ + +package akka.serialization.jackson; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class JavaTestEventMigrationV3 extends JacksonMigration { + + @Override + public int currentVersion() { + return 3; + } + + @Override + public String transformClassName(int fromVersion, String className) { + // Always produce the type of the currentVersion. When fromVersion is lower, + // transform will lift it. when fromVersion is higher, transform will adapt it. + return JavaTestMessages.Event3.class.getName(); + } + + @Override + public JsonNode transform(int fromVersion, JsonNode json) { + ObjectNode root = (ObjectNode) json; + if (fromVersion < 2) { + root = upcastV1ToV2(root); + } + if (fromVersion < 3) { + root = upcastV2ToV3(root); + } + return root; + } + + private ObjectNode upcastV1ToV2(ObjectNode root) { + root.set("field1V2", root.get("field1")); + root.remove("field1"); + root.set("field2", IntNode.valueOf(17)); + return root; + } + + private ObjectNode upcastV2ToV3(ObjectNode root) { + root.set("field3", root.get("field2")); + root.remove("field2"); + return root; + } +} diff --git a/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestMessages.java b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestMessages.java index 4a5e0e674b..3bc178a6cc 100644 --- a/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestMessages.java +++ b/akka-serialization-jackson/src/test/java/akka/serialization/jackson/JavaTestMessages.java @@ -386,6 +386,42 @@ public interface JavaTestMessages { } } + public class Event3 implements TestMessage { + private final String field1V2; // same as in Event2 + private final int field3; // renamed field (was field2) + + public Event3(String field1V2, int field3) { + this.field1V2 = field1V2; + this.field3 = field3; + } + + public String getField1V2() { + return field1V2; + } + + public int getField3() { + return field3; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Event3 event3 = (Event3) o; + + if (field3 != event3.field3) return false; + return field1V2 != null ? field1V2.equals(event3.field1V2) : event3.field1V2 == null; + } + + @Override + public int hashCode() { + int result = field1V2 != null ? field1V2.hashCode() : 0; + result = 31 * result + field3; + return result; + } + } + public class Zoo implements TestMessage { public final Animal first; diff --git a/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java b/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java index 6a520ade67..1cc1e1aa79 100644 --- a/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java +++ b/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1/ItemAdded.java @@ -7,6 +7,7 @@ package jdoc.akka.serialization.jackson.v1; import jdoc.akka.serialization.jackson.MySerializable; // #add-optional +// #forward-one-rename public class ItemAdded implements MySerializable { public final String shoppingCartId; public final String productId; @@ -18,4 +19,5 @@ public class ItemAdded implements MySerializable { this.quantity = quantity; } } +// #forward-one-rename // #add-optional diff --git a/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1withv2/ItemAddedMigration.java b/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1withv2/ItemAddedMigration.java new file mode 100644 index 0000000000..dbdb847505 --- /dev/null +++ b/akka-serialization-jackson/src/test/java/jdoc/akka/serialization/jackson/v1withv2/ItemAddedMigration.java @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package jdoc.akka.serialization.jackson.v1withv2; + +// #forward-one-rename + +import akka.serialization.jackson.JacksonMigration; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; + +public class ItemAddedMigration extends JacksonMigration { + + // Data produced in this node is still produced using the version 1 of the schema + @Override + public int currentVersion() { + return 1; + } + + @Override + public int supportedForwardVersion() { + return 2; + } + + @Override + public JsonNode transform(int fromVersion, JsonNode json) { + ObjectNode root = (ObjectNode) json; + if (fromVersion == 2) { + // When receiving an event of version 2 we down-cast it to the version 1 of the schema + root.set("productId", root.get("itemId")); + root.remove("itemId"); + } + return root; + } +} +// #forward-one-rename diff --git a/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala index c82bf11a45..29229a0f8b 100644 --- a/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala +++ b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/JacksonSerializerSpec.scala @@ -4,6 +4,7 @@ package akka.serialization.jackson +import java.lang import java.nio.charset.StandardCharsets import java.time.Duration import java.time.Instant @@ -13,12 +14,16 @@ import java.util.Arrays import java.util.Locale import java.util.Optional import java.util.UUID + import java.util.logging.FileHandler import scala.collection.immutable import scala.concurrent.duration._ -import scala.concurrent.duration.FiniteDuration - +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.BootstrapSetup +import akka.actor.ExtendedActorSystem import com.fasterxml.jackson.annotation.JsonSubTypes import com.fasterxml.jackson.annotation.JsonTypeInfo import com.fasterxml.jackson.core.JsonFactory @@ -28,15 +33,12 @@ import com.fasterxml.jackson.core.StreamReadFeature import com.fasterxml.jackson.core.StreamWriteFeature import com.fasterxml.jackson.core.`type`.TypeReference import com.fasterxml.jackson.databind.DeserializationFeature -import com.fasterxml.jackson.databind.JsonNode import com.fasterxml.jackson.databind.MapperFeature import com.fasterxml.jackson.databind.Module import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.SerializationFeature import com.fasterxml.jackson.databind.exc.InvalidTypeIdException import com.fasterxml.jackson.databind.json.JsonMapper -import com.fasterxml.jackson.databind.node.IntNode -import com.fasterxml.jackson.databind.node.ObjectNode import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule import com.fasterxml.jackson.module.scala.JsonScalaEnumeration import com.github.ghik.silencer.silent @@ -45,11 +47,7 @@ import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.actor.Address -import akka.actor.BootstrapSetup -import akka.actor.ExtendedActorSystem +import scala.concurrent.duration.FiniteDuration import akka.actor.Status import akka.actor.setup.ActorSystemSetup import akka.actor.typed.scaladsl.Behaviors @@ -85,6 +83,7 @@ object ScalaTestMessages { final case class Event1(field1: String) extends TestMessage final case class Event2(field1V2: String, field2: Int) extends TestMessage + final case class Event3(field1V2: String, field3: Int) extends TestMessage final case class Zoo(first: Animal) extends TestMessage @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @@ -117,21 +116,6 @@ object ScalaTestMessages { } -class ScalaTestEventMigration extends JacksonMigration { - override def currentVersion = 3 - - override def transformClassName(fromVersion: Int, className: String): String = - classOf[ScalaTestMessages.Event2].getName - - override def transform(fromVersion: Int, json: JsonNode): JsonNode = { - val root = json.asInstanceOf[ObjectNode] - root.set[JsonNode]("field1V2", root.get("field1")) - root.remove("field1") - root.set[JsonNode]("field2", IntNode.valueOf(17)) - root - } -} - class JacksonCborSerializerSpec extends JacksonSerializerSpec("jackson-cbor") { "have compression disabled by default" in { val conf = JacksonObjectMapperProvider.configForBinding("jackson-cbor", system.settings.config) @@ -621,17 +605,8 @@ class JacksonJsonSerializerSpec extends JacksonSerializerSpec("jackson-json") { } } -abstract class JacksonSerializerSpec(serializerName: String) - extends TestKit( - ActorSystem( - "JacksonJsonSerializerSpec", - ConfigFactory.parseString(s""" - akka.serialization.jackson.migrations { - "akka.serialization.jackson.JavaTestMessages$$Event1" = "akka.serialization.jackson.JavaTestEventMigration" - "akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigration" - "akka.serialization.jackson.ScalaTestMessages$$Event1" = "akka.serialization.jackson.ScalaTestEventMigration" - "akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigration" - } +object JacksonSerializerSpec { + def baseConfig(serializerName: String): String = s""" akka.actor { serialization-bindings { "akka.serialization.jackson.ScalaTestMessages$$TestMessage" = $serializerName @@ -639,7 +614,14 @@ abstract class JacksonSerializerSpec(serializerName: String) } } akka.serialization.jackson.allowed-class-prefix = ["akka.serialization.jackson.ScalaTestMessages$$OldCommand"] - """))) + """ +} + +abstract class JacksonSerializerSpec(serializerName: String) + extends TestKit( + ActorSystem( + "JacksonJsonSerializerSpec", + ConfigFactory.parseString(JacksonSerializerSpec.baseConfig(serializerName)))) with AnyWordSpecLike with Matchers with BeforeAndAfterAll { @@ -772,22 +754,138 @@ abstract class JacksonSerializerSpec(serializerName: String) } } - "deserialize with migrations" in { + // TODO: Consider moving the migrations Specs to a separate Spec + "deserialize with migrations" in withSystem(s""" + akka.serialization.jackson.migrations { + ## Usually the key is a FQCN but we're hacking the name to use multiple migrations for the + ## same type in a single test. + "deserialize-Java.Event1-into-Java.Event3" = "akka.serialization.jackson.JavaTestEventMigrationV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sys => val event1 = new Event1("a") - val serializer = serializerFor(event1) + val serializer = serializerFor(event1, sys) val blob = serializer.toBinary(event1) - val event2 = serializer.fromBinary(blob, classOf[Event1].getName).asInstanceOf[Event2] - event1.getField1 should ===(event2.getField1V2) - event2.getField2 should ===(17) + + // Event1 has no migration configured so it uses the default manifest name (with no version) + serializer.manifest(event1) should ===(classOf[Event1].getName) + + // Hack the manifest to enforce the use a particular migration when deserializing the blob of Event1 + val event3 = serializer.fromBinary(blob, "deserialize-Java.Event1-into-Java.Event3").asInstanceOf[Event3] + event1.getField1 should ===(event3.getField1V2) + event3.getField3 should ===(17) } "deserialize with migrations from V2" in { + // produce a blob/manifest from an ActorSystem without migrations val event1 = new Event1("a") val serializer = serializerFor(event1) val blob = serializer.toBinary(event1) - val event2 = serializer.fromBinary(blob, classOf[Event1].getName + "#2").asInstanceOf[Event2] - event1.getField1 should ===(event2.getField1V2) - event2.getField2 should ===(17) + val manifest = serializer.manifest(event1) + + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event1" = "akka.serialization.jackson.JavaTestEventMigrationV2" + "akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + // read the blob/manifest from an ActorSystem with migrations + val serializerV2: JacksonSerializer = serializerFor(event1, sysV2) + val event2 = serializerV2.fromBinary(blob, manifest).asInstanceOf[Event2] + event1.getField1 should ===(event2.getField1V2) + event2.getField2 should ===(17) + + // Event2 has a migration configured so it uses a manifest with a version + val serializerFor2 = serializerFor(event2, sysV2) + serializerFor2.manifest(event2) should ===(classOf[Event2].getName + "#2") + } + + } + + "use the migration's currentVersion on new serializations" in { + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + val event2 = new Event2("a", 17) + // Event2 has a migration configured so it uses a manifest with a version + val serializer2 = serializerFor(event2, sysV2) + serializer2.manifest(event2) should ===(classOf[Event2].getName + "#2") + } + } + + "use the migration's currentVersion on new serializations when supporting forward versions" in { + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2WithV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + val event2 = new Event2("a", 17) + // Event2 has a migration configured so it uses a manifest with a version + val serializer2 = serializerFor(event2, sysV2) + serializer2.manifest(event2) should ===(classOf[Event2].getName + "#2") + } + } + + "deserialize a V3 blob into a V2 class (forward-one support) and back" in { + + val blobV3 = + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event3" = "akka.serialization.jackson.JavaTestEventMigrationV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 => + val event3 = new Event3("Steve", 49) + val serializer = serializerFor(event3, sysV3) + val blob = serializer.toBinary(event3) + blob + } + + val blobV2 = + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2WithV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2WithV3 => + val serializerForEvent2 = + serialization(sysV2WithV3).serializerFor(classOf[Event2]).asInstanceOf[JacksonSerializer] + val event2 = serializerForEvent2.fromBinary(blobV3, classOf[Event2].getName + "#3").asInstanceOf[Event2] + event2.getField1V2 should ===("Steve") + event2.getField2 should ===(49) + serializerForEvent2.toBinary(event2) + } + + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event3" = "akka.serialization.jackson.JavaTestEventMigrationV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 => + val serializerForEvent3 = serialization(sysV3).serializerFor(classOf[Event3]).asInstanceOf[JacksonSerializer] + val event3 = serializerForEvent3.fromBinary(blobV2, classOf[Event3].getName + "#2").asInstanceOf[Event3] + event3.getField1V2 should ===("Steve") + event3.getField3 should ===(49) + } + } + + "deserialize unsupported versions throws an exception" in { + intercept[lang.IllegalStateException] { + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.JavaTestMessages$$Event1" = "akka.serialization.jackson.JavaTestEventMigrationV2" + "akka.serialization.jackson.JavaTestMessages$$Event2" = "akka.serialization.jackson.JavaTestEventMigrationV2" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + // produce a blob/manifest from an ActorSystem without migrations + val event1 = new Event1("a") + val serializer = serializerFor(event1) + val blob = serializer.toBinary(event1) + val manifest = serializer.manifest(event1) + // Event1 has no migration configured so it uses the default manifest name (with no version) + val serializerV2: JacksonSerializer = serializerFor(event1, sysV2) + serializerV2.fromBinary(blob, manifest + "#9").asInstanceOf[Event2] + } + + } } } @@ -882,22 +980,125 @@ abstract class JacksonSerializerSpec(serializerName: String) } } - "deserialize with migrations" in { + // TODO: Consider moving the migrations Specs to a separate Spec + "deserialize with migrations" in withSystem(s""" + akka.serialization.jackson.migrations { + ## Usually the key is a FQCN but we're hacking the name to use multiple migrations for the + ## same type in a single test. + "deserialize-Event1-into-Event3" = "akka.serialization.jackson.ScalaTestEventMigrationV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sys => val event1 = Event1("a") - val serializer = serializerFor(event1) + val serializer = serializerFor(event1, sys) val blob = serializer.toBinary(event1) - val event2 = serializer.fromBinary(blob, classOf[Event1].getName).asInstanceOf[Event2] - event1.field1 should ===(event2.field1V2) - event2.field2 should ===(17) + + // Event1 has no migration configured so it uses the default manifest name (with no version) + serializer.manifest(event1) should ===(classOf[Event1].getName) + + // Hack the manifest to enforce the use a particular migration when deserializing the blob of Event1 + val event3 = serializer.fromBinary(blob, "deserialize-Event1-into-Event3").asInstanceOf[Event3] + event1.field1 should ===(event3.field1V2) + event3.field3 should ===(17) } "deserialize with migrations from V2" in { + // produce a blob/manifest from an ActorSystem without migrations val event1 = Event1("a") val serializer = serializerFor(event1) val blob = serializer.toBinary(event1) - val event2 = serializer.fromBinary(blob, classOf[Event1].getName + "#2").asInstanceOf[Event2] - event1.field1 should ===(event2.field1V2) - event2.field2 should ===(17) + val manifest = serializer.manifest(event1) + + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.ScalaTestMessages$$Event1" = "akka.serialization.jackson.ScalaTestEventMigrationV2" + "akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + // read the blob/manifest from an ActorSystem with migrations + val serializerV2: JacksonSerializer = serializerFor(event1, sysV2) + val event2 = serializerV2.fromBinary(blob, manifest).asInstanceOf[Event2] + event1.field1 should ===(event2.field1V2) + event2.field2 should ===(17) + + // Event2 has a migration configured so it uses a manifest with a version + val serializerFor2 = serializerFor(event2, sysV2) + serializerFor2.manifest(event2) should ===(classOf[Event2].getName + "#2") + } + + } + + "use the migration's currentVersion on new serializations" in { + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + val event2 = new Event2("a", 17) + // Event2 has a migration configured so it uses a manifest with a version + val serializer2 = serializerFor(event2, sysV2) + serializer2.manifest(event2) should ===(classOf[Event2].getName + "#2") + } + } + + "deserialize a V3 blob into a V2 class (forward-one support) and back" in { + + val blobV3 = + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.ScalaTestMessages$$Event3" = "akka.serialization.jackson.ScalaTestEventMigrationV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 => + val event3 = new Event3("Steve", 49) + val serializer = serializerFor(event3, sysV3) + val blob = serializer.toBinary(event3) + blob + } + + val blobV2 = + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2WithV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2WithV3 => + val serializerForEvent2 = + serialization(sysV2WithV3).serializerFor(classOf[Event2]).asInstanceOf[JacksonSerializer] + val event2 = serializerForEvent2.fromBinary(blobV3, classOf[Event2].getName + "#3").asInstanceOf[Event2] + event2.field1V2 should ===("Steve") + event2.field2 should ===(49) + serializerForEvent2.toBinary(event2) + } + + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.ScalaTestMessages$$Event3" = "akka.serialization.jackson.ScalaTestEventMigrationV3" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV3 => + val serializerForEvent3 = serialization(sysV3).serializerFor(classOf[Event3]).asInstanceOf[JacksonSerializer] + val event3 = serializerForEvent3.fromBinary(blobV2, classOf[Event3].getName + "#2").asInstanceOf[Event3] + event3.field1V2 should ===("Steve") + event3.field3 should ===(49) + } + } + + "deserialize unsupported versions throws an exception" in { + intercept[lang.IllegalStateException] { + withSystem(s""" + akka.serialization.jackson.migrations { + "akka.serialization.jackson.ScalaTestMessages$$Event1" = "akka.serialization.jackson.ScalaTestEventMigrationV2" + "akka.serialization.jackson.ScalaTestMessages$$Event2" = "akka.serialization.jackson.ScalaTestEventMigrationV2" + } + """ + JacksonSerializerSpec.baseConfig(serializerName)) { sysV2 => + // produce a blob/manifest from an ActorSystem without migrations + val event1 = new Event1("a") + val serializer = serializerFor(event1) + val blob = serializer.toBinary(event1) + val manifest = serializer.manifest(event1) + // Event1 has no migration configured so it uses the default manifest name (with no version) + val serializerV2: JacksonSerializer = serializerFor(event1, sysV2) + serializerV2.fromBinary(blob, manifest + "#9").asInstanceOf[Event2] + } + + } } "not allow serialization of deny listed class" in { diff --git a/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/ScalaTestEventMigration.scala b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/ScalaTestEventMigration.scala new file mode 100644 index 0000000000..b50dddb273 --- /dev/null +++ b/akka-serialization-jackson/src/test/scala/akka/serialization/jackson/ScalaTestEventMigration.scala @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.serialization.jackson + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.IntNode +import com.fasterxml.jackson.databind.node.ObjectNode + +object ScalaTestEventMigration { + def upcastV1ToV2(root: ObjectNode): ObjectNode = { + root.set[JsonNode]("field1V2", root.get("field1")) + root.remove("field1") + root.set[JsonNode]("field2", IntNode.valueOf(17)) + root + } + + def upcastV2ToV3(root: ObjectNode): ObjectNode = { + root.set("field3", root.get("field2")) + root.remove("field2") + root + } + + def downcastV3ToV2(root: ObjectNode) = { + // downcast the V3 representation to the V2 representation. A field + // is renamed. + root.set("field2", root.get("field3")) + root.remove("field3") + root + } + +} + +class ScalaTestEventMigrationV2 extends JacksonMigration { + import ScalaTestEventMigration._ + + override def currentVersion = 2 + + override def transformClassName(fromVersion: Int, className: String): String = + classOf[ScalaTestMessages.Event2].getName + + override def transform(fromVersion: Int, json: JsonNode): JsonNode = { + val root = json.asInstanceOf[ObjectNode] + upcastV1ToV2(root) + } + +} + +class ScalaTestEventMigrationV2WithV3 extends JacksonMigration { + import ScalaTestEventMigration._ + + override def currentVersion = 2 + + override def supportedForwardVersion: Int = 3 + + // Always produce the type of the currentVersion. When fromVersion is lower, + // transform will lift it. When fromVersion is higher, transform will downcast it. + override def transformClassName(fromVersion: Int, className: String): String = + classOf[ScalaTestMessages.Event2].getName + + override def transform(fromVersion: Int, json: JsonNode): JsonNode = { + var root = json.asInstanceOf[ObjectNode] + if (fromVersion < 2) { + root = upcastV1ToV2(root) + } + if (fromVersion == 3) { + root = downcastV3ToV2(root) + } + root + } + +} + +class ScalaTestEventMigrationV3 extends JacksonMigration { + import ScalaTestEventMigration._ + + override def currentVersion = 3 + + override def transformClassName(fromVersion: Int, className: String): String = + classOf[ScalaTestMessages.Event3].getName + + override def transform(fromVersion: Int, json: JsonNode): JsonNode = { + var root = json.asInstanceOf[ObjectNode] + if (fromVersion < 2) { + root = upcastV1ToV2(root) + } + if (fromVersion < 3) { + root = upcastV2ToV3(root) + } + root + } + +} diff --git a/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala b/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala index 2fa81b2360..86cc7134ef 100644 --- a/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala +++ b/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1/ItemAdded.scala @@ -7,5 +7,7 @@ package doc.akka.serialization.jackson.v1 import doc.akka.serialization.jackson.MySerializable // #add-optional +// #forward-one-rename case class ItemAdded(shoppingCartId: String, productId: String, quantity: Int) extends MySerializable +// #forward-one-rename // #add-optional diff --git a/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1withv2/ItemAddedMigration.scala b/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1withv2/ItemAddedMigration.scala new file mode 100644 index 0000000000..a2aa09cc35 --- /dev/null +++ b/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/v1withv2/ItemAddedMigration.scala @@ -0,0 +1,29 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package doc.akka.serialization.jackson.v1withv2 + +// #forward-one-rename +import akka.serialization.jackson.JacksonMigration +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode + +class ItemAddedMigration extends JacksonMigration { + + // Data produced in this node is still produced using the version 1 of the schema + override def currentVersion: Int = 1 + + override def supportedForwardVersion: Int = 2 + + override def transform(fromVersion: Int, json: JsonNode): JsonNode = { + val root = json.asInstanceOf[ObjectNode] + if (fromVersion == 2) { + // When receiving an event of version 2 we down-cast it to the version 1 of the schema + root.set[JsonNode]("productId", root.get("itemId")) + root.remove("itemId") + } + root + } +} +// #forward-one-rename From 418d6d3ec0a8c16f3bc3f54b770ba60dfd0c5b65 Mon Sep 17 00:00:00 2001 From: Ignasi Marimon-Clos Date: Tue, 25 Aug 2020 11:11:06 +0200 Subject: [PATCH 22/29] Prefer "update" over "upgrade" when rolling (#29523) --- .../main/paradox/additional/rolling-updates.md | 18 +++++++++--------- .../src/main/paradox/project/rolling-update.md | 2 +- akka-docs/src/main/paradox/serialization.md | 12 ++++++------ akka-docs/src/main/paradox/typed/cluster-dc.md | 2 +- .../src/main/paradox/typed/cluster-sharding.md | 4 ++-- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/akka-docs/src/main/paradox/additional/rolling-updates.md b/akka-docs/src/main/paradox/additional/rolling-updates.md index 225cc47428..8c4b60f117 100644 --- a/akka-docs/src/main/paradox/additional/rolling-updates.md +++ b/akka-docs/src/main/paradox/additional/rolling-updates.md @@ -27,30 +27,30 @@ There are two parts of Akka that need careful consideration when performing an r 1. Serialization format of persisted events and snapshots. New nodes must be able to read old data, and during the update old nodes must be able to read data stored by new nodes. -There are many more application specific aspects for serialization changes during rolling upgrades to consider. +There are many more application specific aspects for serialization changes during rolling updates to consider. For example based on the use case and requirements, whether to allow dropped messages or tear down the TCP connection when the manifest is unknown. -When some message loss during a rolling upgrade is acceptable versus a full shutdown and restart, assuming the application recovers afterwards +When some message loss during a rolling update is acceptable versus a full shutdown and restart, assuming the application recovers afterwards * If a `java.io.NotSerializableException` is thrown in `fromBinary` this is treated as a transient problem, the issue logged and the message is dropped * If other exceptions are thrown it can be an indication of corrupt bytes from the underlying transport, and the connection is broken -For more zero-impact rolling upgrades, it is important to consider a strategy for serialization that can be evolved. -One approach to retiring a serializer without downtime is described in @ref:[two rolling upgrade steps to switch to the new serializer](../serialization.md#rolling-upgrades). +For more zero-impact rolling updates, it is important to consider a strategy for serialization that can be evolved. +One approach to retiring a serializer without downtime is described in @ref:[two rolling update steps to switch to the new serializer](../serialization.md#rolling-updates). Additionally you can find advice on @ref:[Persistence - Schema Evolution](../persistence-schema-evolution.md) which also applies to remote messages when deploying with rolling updates. ## Cluster Sharding -During a rolling upgrade, sharded entities receiving traffic may be moved during @ref:[shard rebalancing](../typed/cluster-sharding-concepts.md#shard-rebalancing), +During a rolling update, sharded entities receiving traffic may be moved during @ref:[shard rebalancing](../typed/cluster-sharding-concepts.md#shard-rebalancing), to an old or new node in the cluster, based on the pluggable allocation strategy and settings. When an old node is stopped the shards that were running on it are moved to one of the other old nodes remaining in the cluster. The `ShardCoordinator` is itself a cluster singleton. -To minimize downtime of the shard coordinator, see the strategies about @ref[ClusterSingleton](#cluster-singleton) rolling upgrades below. +To minimize downtime of the shard coordinator, see the strategies about @ref[ClusterSingleton](#cluster-singleton) rolling updates below. A few specific changes to sharding configuration require @ref:[a full cluster restart](#cluster-sharding-configuration-change). ## Cluster Singleton -Cluster singletons are always running on the oldest node. To avoid moving cluster singletons more than necessary during a rolling upgrade, -it is recommended to upgrade the oldest node last. This way cluster singletons are only moved once during a full rolling upgrade. +Cluster singletons are always running on the oldest node. To avoid moving cluster singletons more than necessary during a rolling update, +it is recommended to upgrade the oldest node last. This way cluster singletons are only moved once during a full rolling update. Otherwise, in the worst case cluster singletons may be migrated from node to node which requires coordination and initialization overhead several times. @@ -160,5 +160,5 @@ Rolling update is not supported when @ref:[changing the remoting transport](../r ### Migrating from Classic Sharding to Typed Sharding -If you have been using classic sharding it is possible to do a rolling upgrade to typed sharding using a 3 step procedure. +If you have been using classic sharding it is possible to do a rolling update to typed sharding using a 3 step procedure. The steps along with example commits are detailed in [this sample PR](https://github.com/akka/akka-samples/pull/110) diff --git a/akka-docs/src/main/paradox/project/rolling-update.md b/akka-docs/src/main/paradox/project/rolling-update.md index 8c5cbbe5c8..361facd122 100644 --- a/akka-docs/src/main/paradox/project/rolling-update.md +++ b/akka-docs/src/main/paradox/project/rolling-update.md @@ -92,7 +92,7 @@ This means that a rolling update will have to go through at least one of 2.6.2, Issue: [#28918](https://github.com/akka/akka/issues/28918). JacksonCborSerializer was using plain JSON format instead of CBOR. -If you have `jackson-cbor` in your `serialization-bindings` a rolling upgrade will have to go through 2.6.5 when +If you have `jackson-cbor` in your `serialization-bindings` a rolling update will have to go through 2.6.5 when upgrading to 2.6.5 or higher. In Akka 2.6.5 the `jackson-cbor` binding will still serialize to JSON format to support rolling update from 2.6.4. diff --git a/akka-docs/src/main/paradox/serialization.md b/akka-docs/src/main/paradox/serialization.md index 64cde0b0b8..ee0d91da07 100644 --- a/akka-docs/src/main/paradox/serialization.md +++ b/akka-docs/src/main/paradox/serialization.md @@ -180,7 +180,7 @@ should be serialized by it. It's recommended to throw `IllegalArgumentException` or `java.io.NotSerializableException` in `fromBinary` if the manifest is unknown. This makes it possible to introduce new message types and send them to nodes that don't know about them. This is typically needed when performing -rolling upgrades, i.e. running a cluster with mixed versions for a while. +rolling updates, i.e. running a cluster with mixed versions for a while. Those exceptions are treated as a transient problem in the classic remoting layer. The problem will be logged and the message dropped. Other exceptions will tear down the TCP connection because it can be an indication of corrupt bytes from the underlying @@ -252,24 +252,24 @@ akka.actor.warn-about-java-serializer-usage = off It is not safe to mix major Scala versions when using the Java serialization as Scala does not guarantee compatibility and this could lead to very surprising errors. -## Rolling upgrades +## Rolling updates A serialized remote message (or persistent event) consists of serializer-id, the manifest, and the binary payload. When deserializing it is only looking at the serializer-id to pick which `Serializer` to use for `fromBinary`. The message class (the bindings) is not used for deserialization. The manifest is only used within the `Serializer` to decide how to deserialize the payload, so one `Serializer` can handle many classes. -That means that it is possible to change serialization for a message by performing two rolling upgrade steps to +That means that it is possible to change serialization for a message by performing two rolling update steps to switch to the new serializer. 1. Add the `Serializer` class and define it in `akka.actor.serializers` config section, but not in - `akka.actor.serialization-bindings`. Perform a rolling upgrade for this change. This means that the + `akka.actor.serialization-bindings`. Perform a rolling update for this change. This means that the serializer class exists on all nodes and is registered, but it is still not used for serializing any - messages. That is important because during the rolling upgrade the old nodes still don't know about + messages. That is important because during the rolling update the old nodes still don't know about the new serializer and would not be able to deserialize messages with that format. 1. The second change is to register that the serializer is to be used for certain classes by defining - those in the `akka.actor.serialization-bindings` config section. Perform a rolling upgrade for this + those in the `akka.actor.serialization-bindings` config section. Perform a rolling update for this change. This means that new nodes will use the new serializer when sending messages and old nodes will be able to deserialize the new format. Old nodes will continue to use the old serializer when sending messages and new nodes will be able to deserialize the old format. diff --git a/akka-docs/src/main/paradox/typed/cluster-dc.md b/akka-docs/src/main/paradox/typed/cluster-dc.md index 8d512cb7af..ebe05ed07e 100644 --- a/akka-docs/src/main/paradox/typed/cluster-dc.md +++ b/akka-docs/src/main/paradox/typed/cluster-dc.md @@ -140,7 +140,7 @@ The reason for only using a limited number of nodes is to keep the number of con centers low. The same nodes are also used for the gossip protocol when disseminating the membership information across data centers. Within a data center all nodes are involved in gossip and failure detection. -This influences how rolling upgrades should be performed. Don't stop all of the oldest that are used for gossip +This influences how rolling updates should be performed. Don't stop all of the oldest that are used for gossip at the same time. Stop one or a few at a time so that new nodes can take over the responsibility. It's best to leave the oldest nodes until last. diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 83a007d8c7..5975e6fa94 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -293,7 +293,7 @@ Cluster Sharding uses its own Distributed Data `Replicator` per node. If using roles with sharding there is one `Replicator` per role, which enables a subset of all nodes for some entity types and another subset for other entity types. Each replicator has a name that contains the node role and therefore the role configuration must be the same on all nodes in the -cluster, for example you can't change the roles when performing a rolling upgrade. +cluster, for example you can't change the roles when performing a rolling update. Changing roles requires @ref:[a full cluster restart](../additional/rolling-updates.md#cluster-sharding-configuration-change). The `akka.cluster.sharding.distributed-data` config section configures the settings for Distributed Data. @@ -413,7 +413,7 @@ akka.persistence.cassandra.journal { } ``` -Once you have migrated you cannot go back to the old persistence store, a rolling upgrade is therefore not possible. +Once you have migrated you cannot go back to the old persistence store, a rolling update is therefore not possible. When @ref:[Distributed Data mode](#distributed-data-mode) is used the identifiers of the entities are stored in @ref:[Durable Storage](distributed-data.md#durable-storage) of Distributed Data. You may want to change the From 1180b8364eda38e4f39085303622f03a23e76ac2 Mon Sep 17 00:00:00 2001 From: Josep Prat Date: Wed, 26 Aug 2020 12:03:01 +0200 Subject: [PATCH 23/29] Use the correct name for Akka HTTP (#29527) Fixes the typo in the name of Akka HTTP, it's written in lower case but the project name is in upper case. --- akka-docs/src/main/paradox/typed/dispatchers.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-docs/src/main/paradox/typed/dispatchers.md b/akka-docs/src/main/paradox/typed/dispatchers.md index 47ecdb4deb..deeb12d35b 100644 --- a/akka-docs/src/main/paradox/typed/dispatchers.md +++ b/akka-docs/src/main/paradox/typed/dispatchers.md @@ -173,7 +173,7 @@ avoid blocking APIs. The following solution explains how to handle blocking operations properly. Note that the same hints apply to managing blocking operations anywhere in Akka, -including Streams, Http and other reactive libraries built on top of it. +including Streams, HTTP and other reactive libraries built on top of it. @@@ From 7368e134582f0324437991c75619368eae4f24c5 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Fri, 28 Aug 2020 14:37:04 +0100 Subject: [PATCH 24/29] Start ShardedDaemon pinging once member is up (#29526) --- .../internal/ShardedDaemonProcessImpl.scala | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala index fa6df2ab0b..7274927f74 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/internal/ShardedDaemonProcessImpl.scala @@ -29,6 +29,8 @@ import akka.cluster.sharding.typed.scaladsl.Entity import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.cluster.sharding.typed.scaladsl.StartEntity import akka.cluster.typed.Cluster +import akka.cluster.typed.SelfUp +import akka.cluster.typed.Subscribe import akka.util.PrettyDuration /** @@ -39,7 +41,8 @@ private[akka] object ShardedDaemonProcessImpl { object KeepAlivePinger { sealed trait Event - case object Tick extends Event + private case object Tick extends Event + private case object StartTick extends Event def apply[T]( settings: ShardedDaemonProcessSettings, @@ -47,19 +50,22 @@ private[akka] object ShardedDaemonProcessImpl { identities: Set[EntityId], shardingRef: ActorRef[ShardingEnvelope[T]]): Behavior[Event] = Behaviors.setup { context => + Cluster(context.system).subscriptions ! Subscribe( + context.messageAdapter[SelfUp](_ => StartTick), + classOf[SelfUp]) Behaviors.withTimers { timers => def triggerStartAll(): Unit = { identities.foreach(id => shardingRef ! StartEntity(id)) } - - context.log.debug2( - s"Starting Sharded Daemon Process KeepAlivePinger for [{}], with ping interval [{}]", - name, - PrettyDuration.format(settings.keepAliveInterval)) - timers.startTimerWithFixedDelay(Tick, settings.keepAliveInterval) - triggerStartAll() - Behaviors.receiveMessage { + case StartTick => + triggerStartAll() + context.log.debug2( + s"Starting Sharded Daemon Process KeepAlivePinger for [{}], with ping interval [{}]", + name, + PrettyDuration.format(settings.keepAliveInterval)) + timers.startTimerWithFixedDelay(Tick, settings.keepAliveInterval) + Behaviors.same case Tick => triggerStartAll() context.log.debug("Periodic ping sent to [{}] processes", identities.size) From 99f21dba3acd4e28d29ff665c783903b288d2955 Mon Sep 17 00:00:00 2001 From: Patrick Altaie Date: Tue, 1 Sep 2020 08:42:25 +0100 Subject: [PATCH 25/29] Add "akka" prefix to TCK code sample (#29529) The current code sample for trying out the TCK misses the "akka" prefix which means that the TCK test will fail with an error regarding not specifying a journal plugin (because the journal plugin property is wrong in the sample) Fixes #29528 --- .../java/jdocs/persistence/LambdaPersistencePluginDocTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java b/akka-docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java index e7c4fe8306..415cdb6201 100644 --- a/akka-docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java +++ b/akka-docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java @@ -139,7 +139,7 @@ public class LambdaPersistencePluginDocTest { public MyJournalSpecTest() { super( ConfigFactory.parseString( - "persistence.journal.plugin = " + "akka.persistence.journal.plugin = " + "\"akka.persistence.journal.leveldb-shared\"")); } From 4249b02c404440bf6ff3c1ddbb0e6b1a74e78b7d Mon Sep 17 00:00:00 2001 From: yiksanchan Date: Tue, 1 Sep 2020 01:57:15 -0700 Subject: [PATCH 26/29] Fix grammar in exception message (#29533) --- .../main/scala/akka/actor/typed/internal/ActorContextImpl.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index 70dfcc2b4c..8950a299f8 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -318,7 +318,7 @@ import scala.util.Success case OptionVal.Some(t) => throw new IllegalStateException( s"Invalid access by thread from the outside of $self. " + - s"Current message is processed by $t, but also accessed from from ${Thread.currentThread()}.") + s"Current message is processed by $t, but also accessed from ${Thread.currentThread()}.") } } From 9b709df2d07962c0711ee84360ca759f4e4a677b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 2 Sep 2020 12:48:52 +0200 Subject: [PATCH 27/29] Fix acceptable-heartbeat-pause in cluster.StressSpec, #29512 (#29541) --- akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index cba721ed4d..83138b3746 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -97,7 +97,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { akka.actor.provider = cluster akka.cluster { - failure-detector.acceptable-heartbeat-pause = 10s + failure-detector.acceptable-heartbeat-pause = 3s downing-provider-class = akka.cluster.sbr.SplitBrainResolverProvider split-brain-resolver { stable-after = 5s From 192be028a07c4f12df816d84588c6215cd6a4d0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 2 Sep 2020 14:18:44 +0200 Subject: [PATCH 28/29] Sharding request-to-start but already started logging (#29545) --- .../src/main/scala/akka/cluster/sharding/Shard.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 925da2e7f3..2805517796 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -772,7 +772,8 @@ private[akka] class Shard( private def startEntity(entityId: EntityId, ackTo: Option[ActorRef]): Unit = { entities.entityState(entityId) match { case Active(_) => - log.debug("Request to start entity [{}] (Already started)", entityId) + if (verboseDebug) + log.debug("Request to start entity [{}] (Already started)", entityId) touchLastMessageTimestamp(entityId) ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId)) case _: RememberingStart => From 32ec0428d367c7dd4c330547a440a34318f88283 Mon Sep 17 00:00:00 2001 From: Renato Cavalcanti Date: Wed, 2 Sep 2020 14:19:13 +0200 Subject: [PATCH 29/29] Document workaround for jackson serialiaztion of scala case objects (#29531) * workaround for jackson serialiaztion of scala case objects * improved sentence --- .../src/main/paradox/serialization-jackson.md | 13 +++- .../jackson/SerializationDocSpec.scala | 73 +++++++++++++++---- 2 files changed, 71 insertions(+), 15 deletions(-) diff --git a/akka-docs/src/main/paradox/serialization-jackson.md b/akka-docs/src/main/paradox/serialization-jackson.md index 1870b5bf30..21b3aa1496 100644 --- a/akka-docs/src/main/paradox/serialization-jackson.md +++ b/akka-docs/src/main/paradox/serialization-jackson.md @@ -164,11 +164,20 @@ when using polymorphic types. ### ADT with trait and case object -In Scala it's common to use a sealed trait and case objects to represent enums. If the values are case classes +It's common in Scala to use a sealed trait and case objects to represent enums. If the values are case classes the `@JsonSubTypes` annotation as described above works, but if the values are case objects it will not. The annotation requires a `Class` and there is no way to define that in an annotation for a `case object`. -This can be solved by implementing a custom serialization for the enums. Annotate the `trait` with +The easiest workaround is to define the case objects as case class without any field. + +Alternatively, you can define an intermediate trait for the case object and a custom deserializer for it. The example below builds on the previous `Animal` sample by adding a fictitious, single instance, new animal, an `Unicorn`. + +Scala +: @@snip [SerializationDocSpec.scala](/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/SerializationDocSpec.scala) { #polymorphism-case-object } + +The case object `Unicorn` can't be used in a `@JsonSubTypes` annotation, but its trait can. When serializing the case object we need to know which type tag to use, hence the `@JsonTypeName` annotation on the object. When deserializing, Jackson will only know about the trait variant therefore we need a custom deserializer that returns the case object. + +On the other hand, if the ADT only has case objects, you can solve it by implementing a custom serialization for the enums. Annotate the `trait` with `@JsonSerialize` and `@JsonDeserialize` and implement the serialization with `StdSerializer` and `StdDeserializer`. diff --git a/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/SerializationDocSpec.scala b/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/SerializationDocSpec.scala index 29d19f1101..1ce3b2e2e9 100644 --- a/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/SerializationDocSpec.scala +++ b/akka-serialization-jackson/src/test/scala/doc/akka/serialization/jackson/SerializationDocSpec.scala @@ -12,8 +12,11 @@ import akka.serialization.SerializationExtension import akka.serialization.SerializerWithStringManifest import akka.serialization.Serializers import akka.testkit.TestKit -import com.fasterxml.jackson.annotation.JsonSubTypes -import com.fasterxml.jackson.annotation.JsonTypeInfo +import com.fasterxml.jackson.annotation.{ JsonSubTypes, JsonTypeInfo, JsonTypeName } +import com.fasterxml.jackson.core.JsonParser +import com.fasterxml.jackson.databind.DeserializationContext +import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import com.fasterxml.jackson.databind.deser.std.StdDeserializer import com.typesafe.config.ConfigFactory import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers @@ -126,20 +129,51 @@ object SerializationDocSpec { #//#manifestless """ - //#polymorphism - final case class Zoo(primaryAttraction: Animal) extends MySerializable + object Polymorphism { - @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") - @JsonSubTypes( - Array( - new JsonSubTypes.Type(value = classOf[Lion], name = "lion"), - new JsonSubTypes.Type(value = classOf[Elephant], name = "elephant"))) - sealed trait Animal + //#polymorphism + final case class Zoo(primaryAttraction: Animal) extends MySerializable - final case class Lion(name: String) extends Animal + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") + @JsonSubTypes( + Array( + new JsonSubTypes.Type(value = classOf[Lion], name = "lion"), + new JsonSubTypes.Type(value = classOf[Elephant], name = "elephant"))) + sealed trait Animal - final case class Elephant(name: String, age: Int) extends Animal - //#polymorphism + final case class Lion(name: String) extends Animal + + final case class Elephant(name: String, age: Int) extends Animal + //#polymorphism + } + + object PolymorphismMixedClassObject { + + //#polymorphism-case-object + final case class Zoo(primaryAttraction: Animal) extends MySerializable + + @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") + @JsonSubTypes( + Array( + new JsonSubTypes.Type(value = classOf[Lion], name = "lion"), + new JsonSubTypes.Type(value = classOf[Elephant], name = "elephant"), + new JsonSubTypes.Type(value = classOf[Unicorn], name = "unicorn"))) + sealed trait Animal + + final case class Lion(name: String) extends Animal + final case class Elephant(name: String, age: Int) extends Animal + + @JsonDeserialize(using = classOf[UnicornDeserializer]) + sealed trait Unicorn extends Animal + @JsonTypeName("unicorn") + case object Unicorn extends Unicorn + + class UnicornDeserializer extends StdDeserializer[Unicorn](Unicorn.getClass) { + // whenever we need to deserialize an instance of Unicorn trait, we return the object Unicorn + override def deserialize(p: JsonParser, ctxt: DeserializationContext): Unicorn = Unicorn + } + //#polymorphism-case-object + } val configDateTime = """ #//#date-time @@ -207,6 +241,19 @@ class SerializationDocSpec private def serializerFor(obj: Any): SerializerWithStringManifest = serialization.serializerFor(obj.getClass).asInstanceOf[SerializerWithStringManifest] + "serialize trait + case classes" in { + import doc.akka.serialization.jackson.SerializationDocSpec.Polymorphism._ + verifySerialization(Zoo(Lion("Simba"))) should ===(Zoo(Lion("Simba"))) + verifySerialization(Zoo(Elephant("Dumbo", 1))) should ===(Zoo(Elephant("Dumbo", 1))) + } + + "serialize trait + case classes + case object" in { + import doc.akka.serialization.jackson.SerializationDocSpec.PolymorphismMixedClassObject._ + verifySerialization(Zoo(Lion("Simba"))) should ===(Zoo(Lion("Simba"))) + verifySerialization(Zoo(Elephant("Dumbo", 1))) should ===(Zoo(Elephant("Dumbo", 1))) + verifySerialization(Zoo(Unicorn)) should ===(Zoo(Unicorn)) + } + "serialize trait + object ADT" in { import CustomAdtSerializer.Compass import CustomAdtSerializer.Direction._