From d0053746df1da7a165242d05df82b0fd22b7be6c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 28 Nov 2016 14:30:30 +0100 Subject: [PATCH] fix regression in remember entities, #21892 * regression was introduced by https://github.com/akka/akka/commit/141318e60a3b0802fbf2537829f883f5491cd97a in 2.4.12 --- .../cluster/sharding/ShardCoordinator.scala | 8 +- .../ClusterShardingRememberEntitiesSpec.scala | 180 ++++++++++++++++++ .../scala/akka/stream/javadsl/Source.scala | 2 +- .../scala/akka/stream/scaladsl/Source.scala | 2 +- 4 files changed, 186 insertions(+), 6 deletions(-) create mode 100644 akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 957991b828..aeb8e05c57 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -437,15 +437,15 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti if (isMember(region)) { log.debug("ShardRegion registered: [{}]", region) aliveRegions += region - if (state.regions.contains(region)) + if (state.regions.contains(region)) { region ! RegisterAck(self) - else { + allocateShardHomesForRememberEntities() + } else { gracefulShutdownInProgress -= region update(ShardRegionRegistered(region)) { evt ⇒ state = state.updated(evt) context.watch(region) region ! RegisterAck(self) - allocateShardHomesForRememberEntities() } } @@ -693,7 +693,7 @@ abstract class ShardCoordinator(typeName: String, settings: ClusterShardingSetti } def allocateShardHomesForRememberEntities(): Unit = { - if (settings.rememberEntities) + if (settings.rememberEntities && state.unallocatedShards.nonEmpty) state.unallocatedShards.foreach { self ! GetShardHome(_) } } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala new file mode 100644 index 0000000000..b9c78bc9a9 --- /dev/null +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingRememberEntitiesSpec.scala @@ -0,0 +1,180 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.cluster.sharding + +import scala.concurrent.duration._ +import java.io.File + +import akka.actor._ +import akka.cluster.Cluster +import akka.cluster.sharding.ShardRegion.GracefulShutdown +import akka.persistence.Persistence +import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore } +import akka.remote.testconductor.RoleName +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import org.apache.commons.io.FileUtils + +import scala.concurrent.duration._ +import akka.cluster.sharding.ShardRegion.GetClusterShardingStats +import akka.cluster.sharding.ShardRegion.ClusterShardingStats +import akka.cluster.MemberStatus + +object ClusterShardingRememberEntitiesSpec { + + final case class Started(ref: ActorRef) + + def props(probe: ActorRef): Props = Props(new TestEntity(probe)) + + class TestEntity(probe: ActorRef) extends Actor { + probe ! Started(self) + + def receive = { + case m ⇒ sender() ! m + } + } + + val extractEntityId: ShardRegion.ExtractEntityId = { + case id: Int ⇒ (id.toString, id) + } + + val extractShardId: ShardRegion.ExtractShardId = msg ⇒ msg match { + case id: Int ⇒ id.toString + } + +} + +abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String) extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.cluster.auto-down-unreachable-after = 0s + akka.remote.log-remote-lifecycle-events = off + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared { + timeout = 5s + store { + native = off + dir = "target/journal-ClusterShardingRememberEntitiesSpec" + } + } + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingRememberEntitiesSpec" + akka.cluster.sharding.state-store-mode = "$mode" + """)) +} + +object PersistentClusterShardingRememberEntitiesSpecConfig extends ClusterShardingRememberEntitiesSpecConfig("persistence") +object DDataClusterShardingRememberEntitiesSpecConfig extends ClusterShardingRememberEntitiesSpecConfig("ddata") + +class PersistentClusterShardingRememberEntitiesSpec extends ClusterShardingRememberEntitiesSpec(PersistentClusterShardingRememberEntitiesSpecConfig) + +class PersistentClusterShardingRememberEntitiesMultiJvmNode1 extends PersistentClusterShardingRememberEntitiesSpec +class PersistentClusterShardingRememberEntitiesMultiJvmNode2 extends PersistentClusterShardingRememberEntitiesSpec +class PersistentClusterShardingRememberEntitiesMultiJvmNode3 extends PersistentClusterShardingRememberEntitiesSpec + +abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememberEntitiesSpecConfig) extends MultiNodeSpec(config) with STMultiNodeSpec with ImplicitSender { + import ClusterShardingRememberEntitiesSpec._ + import config._ + + override def initialParticipants = roles.size + + val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + + override protected def atStartup() { + runOn(first) { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) + } + } + + override protected def afterTermination() { + runOn(first) { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) + } + } + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system) join node(to).address + } + enterBarrier(from.name + "-joined") + } + + val cluster = Cluster(system) + + def startSharding(): Unit = { + ClusterSharding(system).start( + typeName = "Entity", + entityProps = ClusterShardingRememberEntitiesSpec.props(testActor), + settings = ClusterShardingSettings(system).withRememberEntities(true), + extractEntityId = extractEntityId, + extractShardId = extractShardId) + } + + lazy val region = ClusterSharding(system).shardRegion("Entity") + + s"Cluster with min-nr-of-members using sharding ($mode)" must { + + "setup shared journal" in { + // start the Persistence extension + Persistence(system) + runOn(first) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("peristence-started") + + runOn(second, third) { + system.actorSelection(node(first) / "user" / "store") ! Identify(None) + val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get + SharedLeveldbJournal.setStore(sharedStore, system) + } + + enterBarrier("after-1") + } + + "start remembered entities when coordinator fail over" in within(30.seconds) { + join(second, second) + runOn(second) { + startSharding() + region ! 1 + expectMsgType[Started] + } + + join(third, second) + runOn(third) { + startSharding() + } + runOn(second, third) { + within(remaining) { + awaitAssert { + cluster.state.members.size should ===(2) + cluster.state.members.map(_.status) should ===(Set(MemberStatus.Up)) + } + } + } + enterBarrier("all-up") + + runOn(first) { + testConductor.exit(second, 0).await + } + enterBarrier("crash-second") + + runOn(third) { + expectMsgType[Started](remaining) + } + + enterBarrier("after-2") + } + + } +} + diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 501e1dec44..aae573eb12 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -227,7 +227,7 @@ object Source { * `create` factory is never called and the materialized `CompletionStage` is failed. */ def lazily[T, M](create: function.Creator[Source[T, M]]): Source[T, CompletionStage[M]] = - scaladsl.Source.lazily[T, M](() => create.create().asScala).mapMaterializedValue(_.toJava).asJava + scaladsl.Source.lazily[T, M](() ⇒ create.create().asScala).mapMaterializedValue(_.toJava).asJava /** * Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index a7adcfb691..34e09ec9f3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -360,7 +360,7 @@ object Source { * the materialized future is completed with its value, if downstream cancels or fails without any demand the * create factory is never called and the materialized `Future` is failed. */ - def lazily[T, M](create: () => Source[T, M]): Source[T, Future[M]] = + def lazily[T, M](create: () ⇒ Source[T, M]): Source[T, Future[M]] = Source.fromGraph(new LazySource[T, M](create)) /**