diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 37febd4956..69d6e42dee 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -878,6 +878,7 @@ private[akka] class ActorSystemImpl( "akka-cluster-sharding-typed", "akka-cluster-tools", "akka-cluster-typed", + "akka-coordination", "akka-discovery", "akka-distributed-data", "akka-multi-node-testkit", diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.21.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.21.backwards.excludes new file mode 100644 index 0000000000..772258f753 --- /dev/null +++ b/akka-cluster-sharding/src/main/mima-filters/2.5.21.backwards.excludes @@ -0,0 +1,4 @@ +# Lease API #26468 +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShard.initialized") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.PersistentShard.initialized") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard.initialized") diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 61140bd22a..a19a97b818 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -133,6 +133,7 @@ akka.cluster.sharding { # Settings for the coordinator singleton. Same layout as akka.cluster.singleton. # The "role" of the singleton configuration is not used. The singleton role will # be the same as "akka.cluster.sharding.role". + # A lease can be configured in these settings for the coordinator singleton coordinator-singleton = ${akka.cluster.singleton} # Settings for the Distributed Data replicator. @@ -161,6 +162,14 @@ akka.cluster.sharding { # This dispatcher for the entity actors is defined by the user provided # Props, i.e. this dispatcher is not used for the entity actors. 
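For reviewers, a sketch (not part of this diff) of what opting in to the new sharding lease support could look like in an application.conf, using the use-lease and lease-retry-interval keys added just below and the coordinator-singleton lease mentioned in the comment above. The "my-lease" path and the implementation class are illustrative assumptions; the nested keys mirror the test-lease blocks defined in the specs further down.

akka.cluster.sharding {
  # each shard must acquire this lease before starting its entity actors
  use-lease = "my-lease"
  lease-retry-interval = 5s
  # the coordinator singleton can be guarded the same way via its singleton-layout settings
  coordinator-singleton.use-lease = "my-lease"
}
my-lease {
  lease-class = "com.example.MyLease"   # hypothetical Lease implementation
  heartbeat-interval = 1s
  heartbeat-timeout = 120s
  lease-operation-timeout = 3s
}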
use-dispatcher = "" + + # Config path of the lease that each shard must acquire before starting entity actors + # default is no lease + # A lease can also be used for the singleton coordinator by settings it in the coordinator-singleton properties + use-lease = "" + + # The interval between retries for acquiring the lease + lease-retry-interval = 5s } # //#sharding-ext-config diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index 8ba17954b6..af47f52b6c 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -8,9 +8,9 @@ import scala.concurrent.duration._ import scala.concurrent.duration.FiniteDuration import akka.actor.ActorSystem import akka.actor.NoSerializationVerificationNeeded -import akka.annotation.InternalApi +import akka.annotation.{ ApiMayChange, InternalApi } import com.typesafe.config.Config -import akka.cluster.Cluster +import akka.cluster.{ Cluster, ClusterLeaseSettings } import akka.cluster.singleton.ClusterSingletonManagerSettings import akka.util.JavaDurationConverters._ @@ -59,6 +59,11 @@ object ClusterShardingSettings { if (config.getString("passivate-idle-entity-after").toLowerCase == "off") Duration.Zero else config.getDuration("passivate-idle-entity-after", MILLISECONDS).millis + val lease = config.getString("use-lease") match { + case s if s.isEmpty ⇒ None + case other ⇒ Some(new ClusterLeaseSettings(other, config.getDuration("lease-retry-interval").asScala)) + } + new ClusterShardingSettings( role = roleOption(config.getString("role")), rememberEntities = config.getBoolean("remember-entities"), @@ -67,7 +72,8 @@ object ClusterShardingSettings { stateStoreMode = config.getString("state-store-mode"), passivateIdleEntityAfter = passivateIdleAfter, tuningParameters, - coordinatorSingletonSettings) + coordinatorSingletonSettings, + lease) } /** @@ -213,9 +219,31 @@ final class ClusterShardingSettings( val stateStoreMode: String, val passivateIdleEntityAfter: FiniteDuration, val tuningParameters: ClusterShardingSettings.TuningParameters, - val coordinatorSingletonSettings: ClusterSingletonManagerSettings) + val coordinatorSingletonSettings: ClusterSingletonManagerSettings, + val leaseSettings: Option[ClusterLeaseSettings]) extends NoSerializationVerificationNeeded { + // bin compat for 2.5.21 + def this( + role: Option[String], + rememberEntities: Boolean, + journalPluginId: String, + snapshotPluginId: String, + stateStoreMode: String, + passivateIdleEntityAfter: FiniteDuration, + tuningParameters: ClusterShardingSettings.TuningParameters, + coordinatorSingletonSettings: ClusterSingletonManagerSettings) = + this( + role, + rememberEntities, + journalPluginId, + snapshotPluginId, + stateStoreMode, + passivateIdleEntityAfter, + tuningParameters, + coordinatorSingletonSettings, + None) + // included for binary compatibility reasons @deprecated( "Use the ClusterShardingSettings factory methods or the constructor including passivateIdleEntityAfter instead", @@ -273,6 +301,10 @@ final class ClusterShardingSettings( def withPassivateIdleAfter(duration: java.time.Duration): ClusterShardingSettings = copy(passivateIdleAfter = duration.asScala) + @ApiMayChange + def withLeaseSettings(leaseSettings: ClusterLeaseSettings): ClusterShardingSettings = + copy(leaseSettings = Some(leaseSettings)) + /** * 
The `role` of the `ClusterSingletonManagerSettings` is not used. The `role` of the * coordinator singleton will be the same as the `role` of `ClusterShardingSettings`. @@ -289,8 +321,8 @@ final class ClusterShardingSettings( stateStoreMode: String = stateStoreMode, passivateIdleAfter: FiniteDuration = passivateIdleEntityAfter, tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters, - coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings) - : ClusterShardingSettings = + coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings, + leaseSettings: Option[ClusterLeaseSettings] = leaseSettings): ClusterShardingSettings = new ClusterShardingSettings( role, rememberEntities, @@ -299,5 +331,6 @@ final class ClusterShardingSettings( stateStoreMode, passivateIdleAfter, tuningParameters, - coordinatorSingletonSettings) + coordinatorSingletonSettings, + leaseSettings) } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 9a2ad232fe..209e2877b4 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -6,28 +6,37 @@ package akka.cluster.sharding import java.net.URLEncoder -import akka.actor.ActorLogging -import akka.actor.ActorRef -import akka.actor.ActorSystem -import akka.actor.Deploy -import akka.actor.Props -import akka.actor.Terminated -import akka.actor.Actor +import akka.actor.{ + Actor, + ActorLogging, + ActorRef, + ActorSystem, + DeadLetterSuppression, + Deploy, + NoSerializationVerificationNeeded, + Props, + Stash, + Terminated, + Timers +} import akka.util.{ ConstantFun, MessageBufferMap } - import scala.concurrent.Future + import akka.cluster.Cluster import akka.cluster.ddata.ORSet import akka.cluster.ddata.ORSetKey import akka.cluster.ddata.Replicator._ -import akka.actor.Stash import akka.persistence._ -import akka.actor.NoSerializationVerificationNeeded - +import akka.util.PrettyDuration._ +import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider } +import akka.pattern.pipe import scala.concurrent.duration._ +import akka.cluster.sharding.ShardRegion.ShardInitialized + /** * INTERNAL API + * * @see [[ClusterSharding$ ClusterSharding extension]] */ private[akka] object Shard { @@ -81,6 +90,12 @@ private[akka] object Shard { @SerialVersionUID(1L) final case class ShardStats(shardId: ShardRegion.ShardId, entityCount: Int) extends ClusterShardingSerializable + final case class LeaseAcquireResult(acquired: Boolean, reason: Option[Throwable]) extends DeadLetterSuppression + final case class LeaseLost(reason: Option[Throwable]) extends DeadLetterSuppression + + final case object LeaseRetry extends DeadLetterSuppression + private val LeaseRetryTimer = "lease-retry" + object State { val Empty = State() } @@ -154,7 +169,8 @@ private[akka] class Shard( extractShardId: ShardRegion.ExtractShardId, handOffStopMessage: Any) extends Actor - with ActorLogging { + with ActorLogging + with Timers { import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate, ShardInitialized } import ShardCoordinator.Internal.{ HandOff, ShardStopped } @@ -180,15 +196,82 @@ private[akka] class Shard( None } - initialized() + val lease = settings.leaseSettings.map( + ls => + LeaseProvider(context.system).getLease( + s"${context.system.name}-shard-$typeName-$shardId", + ls.leaseImplementation, + 
Cluster(context.system).selfAddress.hostPort)) - def initialized(): Unit = context.parent ! ShardInitialized(shardId) + val leaseRetryInterval = settings.leaseSettings match { + case Some(l) => l.leaseRetryInterval + case None => 5.seconds // not used + } + + override def preStart(): Unit = { + acquireLeaseIfNeeded() + } + + /** + * Will call onLeaseAcquired when completed, also when lease isn't used + */ + def acquireLeaseIfNeeded(): Unit = { + lease match { + case Some(l) => + tryGetLease(l) + context.become(awaitingLease()) + case None => + onLeaseAcquired() + } + } + + // Override to execute logic once the lease has been acquired + // Will be called on the actor thread + def onLeaseAcquired(): Unit = { + log.debug("Shard initialized") + context.parent ! ShardInitialized(shardId) + context.become(receiveCommand) + } + + private def tryGetLease(l: Lease) = { + log.info("Acquiring lease {}", l.settings) + pipe(l.acquire(reason => self ! LeaseLost(reason)).map(r => LeaseAcquireResult(r, None)).recover { + case t => LeaseAcquireResult(acquired = false, Some(t)) + }).to(self) + } def processChange[E <: StateChange](event: E)(handler: E => Unit): Unit = handler(event) def receive = receiveCommand + // Don't send back ShardInitialized so that messages are buffered in the ShardRegion + // while awaiting the lease + def awaitingLease(): Receive = { + case LeaseAcquireResult(true, _) => + log.debug("Acquired lease") + onLeaseAcquired() + case LeaseAcquireResult(false, None) => + log.error( + "Failed to get lease for shard type [{}] id [{}]. Retry in {}", + typeName, + shardId, + leaseRetryInterval.pretty) + timers.startSingleTimer(LeaseRetryTimer, LeaseRetry, leaseRetryInterval) + case LeaseAcquireResult(false, Some(t)) => + log.error( + t, + "Failed to get lease for shard type [{}] id [{}]. Retry in {}", + typeName, + shardId, + leaseRetryInterval) + timers.startSingleTimer(LeaseRetryTimer, LeaseRetry, leaseRetryInterval) + case LeaseRetry => + tryGetLease(lease.get) + case ll: LeaseLost => + receiveLeaseLost(ll) + } + def receiveCommand: Receive = { case Terminated(ref) => receiveTerminated(ref) case msg: CoordinatorMessage => receiveCoordinatorMessage(msg) @@ -198,9 +281,17 @@ private[akka] class Shard( case msg: ShardRegionCommand => receiveShardRegionCommand(msg) case msg: ShardQuery => receiveShardQuery(msg) case PassivateIdleTick => passivateIdleEntities() + case msg: LeaseLost => receiveLeaseLost(msg) case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender()) } + def receiveLeaseLost(msg: LeaseLost): Unit = { + // The shard region will re-create this when it receives a message for this shard + log.error("Shard type [{}] id [{}] lease lost. 
Reason: {}", typeName, shardId, msg.reason) + // Stop entities ASAP rather than send termination message + context.stop(self) + + } def receiveShardCommand(msg: ShardCommand): Unit = msg match { case RestartEntity(id) => getOrCreateEntity(id) case RestartEntities(ids) => restartEntities(ids) @@ -558,15 +649,16 @@ private[akka] class PersistentShard( import Shard._ import settings.tuningParameters._ + override def preStart(): Unit = { + // override to not acquire the lease on start up, acquire after persistent recovery + } + override def persistenceId = s"/sharding/${typeName}Shard/$shardId" override def journalPluginId: String = settings.journalPluginId override def snapshotPluginId: String = settings.snapshotPluginId - // would be initialized after recovery completed - override def initialized(): Unit = {} - override def receive = receiveCommand override def processChange[E <: StateChange](event: E)(handler: E => Unit): Unit = { @@ -586,11 +678,18 @@ private[akka] class PersistentShard( case EntityStopped(id) => state = state.copy(state.entities - id) case SnapshotOffer(_, snapshot: State) => state = snapshot case RecoveryCompleted => - restartRememberedEntities() - super.initialized() + acquireLeaseIfNeeded() // onLeaseAcquired called when completed log.debug("PersistentShard recovery completed shard [{}] with [{}] entities", shardId, state.entities.size) } + override def onLeaseAcquired(): Unit = { + log.debug("Shard initialized") + context.parent ! ShardInitialized(shardId) + context.become(receiveCommand) + restartRememberedEntities() + unstashAll() + } + override def receiveCommand: Receive = ({ case e: SaveSnapshotSuccess => @@ -672,8 +771,11 @@ private[akka] class DDataShard( stateKeys(i) } - // get initial state from ddata replicator - getState() + override def onLeaseAcquired(): Unit = { + log.info("Lease Acquired. Getting state from DData") + getState() + context.become(waitingForState(Set.empty)) + } private def getState(): Unit = { (0 until numberOfKeys).map { i => @@ -681,18 +783,15 @@ private[akka] class DDataShard( } } - // would be initialized after recovery completed - override def initialized(): Unit = {} - override def receive = waitingForState(Set.empty) // This state will stash all commands private def waitingForState(gotKeys: Set[Int]): Receive = { def receiveOne(i: Int): Unit = { val newGotKeys = gotKeys + i - if (newGotKeys.size == numberOfKeys) + if (newGotKeys.size == numberOfKeys) { recoveryCompleted() - else + } else context.become(waitingForState(newGotKeys)) } @@ -718,11 +817,11 @@ private[akka] class DDataShard( } private def recoveryCompleted(): Unit = { - restartRememberedEntities() - super.initialized() log.debug("DDataShard recovery completed shard [{}] with [{}] entities", shardId, state.entities.size) - unstashAll() + context.parent ! ShardInitialized(shardId) context.become(receiveCommand) + restartRememberedEntities() + unstashAll() } override def processChange[E <: StateChange](event: E)(handler: E => Unit): Unit = { @@ -775,6 +874,7 @@ private[akka] class DDataShard( evt) throw cause + // TODO what can this actually be? 
We're unitialized in the ShardRegion case _ => stash() } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 6aaebb3cc7..efe7b65dd6 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -423,8 +423,9 @@ object ShardRegion { /** * INTERNAL API * - * This actor creates child entity actors on demand for the shards that it is told to be - * responsible for. It delegates messages targeted to other shards to the responsible + * This actor creates children shard actors on demand that it is told to be responsible for. + * The shard actors in turn create entity actors on demand. + * It delegates messages targeted to other shards to the responsible * `ShardRegion` actor on other nodes. * * @see [[ClusterSharding$ ClusterSharding extension]] diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingLeaseSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingLeaseSpec.scala new file mode 100644 index 0000000000..29bad859eb --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingLeaseSpec.scala @@ -0,0 +1,135 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.cluster.sharding +import akka.actor.Props +import akka.cluster.{ Cluster, MemberStatus, TestLease, TestLeaseExt } +import akka.testkit.TestActors.EchoActor +import akka.testkit.{ AkkaSpec, ImplicitSender } +import com.typesafe.config.{ Config, ConfigFactory } + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.Success +import scala.util.control.NoStackTrace + +object ClusterShardingLeaseSpec { + val config = ConfigFactory.parseString(""" + akka.loglevel = DEBUG + #akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] + akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.cluster.sharding { + use-lease = "test-lease" + lease-retry-interval = 200ms + distributed-data.durable { + keys = [] + } + } + """).withFallback(TestLease.config) + + val persistenceConfig = ConfigFactory.parseString(""" + akka.cluster.sharding { + state-store-mode = persistence + journal-plugin-id = "akka.persistence.journal.inmem" + } + """) + + val ddataConfig = ConfigFactory.parseString(""" + akka.cluster.sharding { + state-store-mode = ddata + } + """) + + val extractEntityId: ShardRegion.ExtractEntityId = { + case msg: Int => (msg.toString, msg) + } + + val extractShardId: ShardRegion.ExtractShardId = { + case msg: Int => (msg % 10).toString + } + case class LeaseFailed(msg: String) extends RuntimeException(msg) with NoStackTrace +} + +class PersistenceClusterShardingLeaseSpec + extends ClusterShardingLeaseSpec(ClusterShardingLeaseSpec.persistenceConfig, true) +class DDataClusterShardingLeaseSpec extends ClusterShardingLeaseSpec(ClusterShardingLeaseSpec.ddataConfig, true) + +class ClusterShardingLeaseSpec(config: Config, rememberEntities: Boolean) + extends AkkaSpec(config.withFallback(ClusterShardingLeaseSpec.config)) + with ImplicitSender { + import ClusterShardingLeaseSpec._ + + def this() = this(ConfigFactory.empty(), false) + + val shortDuration = 200.millis + val cluster = Cluster(system) + val leaseOwner = cluster.selfMember.address.hostPort + val testLeaseExt = TestLeaseExt(system) + + override protected def 
atStartup(): Unit = { + cluster.join(cluster.selfAddress) + awaitAssert { + cluster.selfMember.status shouldEqual MemberStatus.Up + } + ClusterSharding(system).start( + typeName = typeName, + entityProps = Props[EchoActor], + settings = ClusterShardingSettings(system).withRememberEntities(rememberEntities), + extractEntityId = extractEntityId, + extractShardId = extractShardId) + } + + def region = ClusterSharding(system).shardRegion(typeName) + + val typeName = "echo" + + def leaseForShard(shardId: Int) = awaitAssert { + testLeaseExt.getTestLease(leaseNameFor(shardId)) + } + + def leaseNameFor(shardId: Int, typeName: String = typeName): String = + s"${system.name}-shard-${typeName}-${shardId}" + + "Cluster sharding with lease" should { + "not start until lease is acquired" in { + region ! 1 + expectNoMessage(shortDuration) + val testLease = leaseForShard(1) + testLease.initialPromise.complete(Success(true)) + expectMsg(1) + } + "retry if initial acquire is false" in { + region ! 2 + expectNoMessage(shortDuration) + val testLease = leaseForShard(2) + testLease.initialPromise.complete(Success(false)) + expectNoMessage(shortDuration) + testLease.setNextAcquireResult(Future.successful(true)) + expectMsg(2) + } + "retry if initial acquire fails" in { + region ! 3 + expectNoMessage(shortDuration) + val testLease = leaseForShard(3) + testLease.initialPromise.failure(LeaseFailed("oh no")) + expectNoMessage(shortDuration) + testLease.setNextAcquireResult(Future.successful(true)) + expectMsg(3) + } + "recover if lease lost" in { + region ! 4 + expectNoMessage(shortDuration) + val testLease = leaseForShard(4) + testLease.initialPromise.complete(Success(true)) + expectMsg(4) + testLease.getCurrentCallback()(Option(LeaseFailed("oh dear"))) + awaitAssert({ + region ! 4 + expectMsg(4) + }, max = 5.seconds) + } + } +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardSpec.scala new file mode 100644 index 0000000000..a8a046aa71 --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardSpec.scala @@ -0,0 +1,122 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.cluster.sharding + +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.{ Actor, ActorLogging, PoisonPill, Props } +import akka.cluster.{ ClusterLeaseSettings, TestLeaseExt } +import akka.cluster.sharding.ShardRegion.ShardInitialized +import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe } + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.Success +import scala.util.control.NoStackTrace + +object ShardSpec { + val config = + """ + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + test-lease { + lease-class = akka.cluster.TestLease + heartbeat-interval = 1s + heartbeat-timeout = 120s + lease-operation-timeout = 3s + } + """ + + class EntityActor extends Actor with ActorLogging { + override def receive: Receive = { + case msg ⇒ + log.info("Msg {}", msg) + sender() ! 
s"ack ${msg}" + } + } + + val numberOfShards = 5 + + case class EntityEnvelope(entityId: Int, msg: Any) + + val extractEntityId: ShardRegion.ExtractEntityId = { + case EntityEnvelope(id, payload) ⇒ (id.toString, payload) + } + + val extractShardId: ShardRegion.ExtractShardId = { + case EntityEnvelope(id, _) ⇒ (id % numberOfShards).toString + } + + case class BadLease(msg: String) extends RuntimeException(msg) with NoStackTrace +} + +class ShardSpec extends AkkaSpec(ShardSpec.config) with ImplicitSender { + + import ShardSpec._ + + val shortDuration = 100.millis + val testLeaseExt = TestLeaseExt(system) + + def leaseNameForShard(typeName: String, shardId: String) = s"${system.name}-shard-${typeName}-${shardId}" + + "A Cluster Shard" should { + "not initialize the shard until the lease is acquired" in new Setup { + parent.expectNoMessage(shortDuration) + lease.initialPromise.complete(Success(true)) + parent.expectMsg(ShardInitialized(shardId)) + } + + "retry if lease acquire returns false" in new Setup { + lease.initialPromise.complete(Success(false)) + parent.expectNoMessage(shortDuration) + lease.setNextAcquireResult(Future.successful(true)) + parent.expectMsg(ShardInitialized(shardId)) + } + + "retry if the lease acquire fails" in new Setup { + lease.initialPromise.failure(BadLease("no lease for you")) + parent.expectNoMessage(shortDuration) + lease.setNextAcquireResult(Future.successful(true)) + parent.expectMsg(ShardInitialized(shardId)) + } + + "shutdown if lease is lost" in new Setup { + val probe = TestProbe() + probe.watch(shard) + lease.initialPromise.complete(Success(true)) + parent.expectMsg(ShardInitialized(shardId)) + lease.getCurrentCallback().apply(Some(BadLease("bye bye lease"))) + probe.expectTerminated(shard) + } + } + + val shardIds = new AtomicInteger(0) + def nextShardId = s"${shardIds.getAndIncrement()}" + + trait Setup { + val shardId = nextShardId + val parent = TestProbe() + val settings = ClusterShardingSettings(system).withLeaseSettings(new ClusterLeaseSettings("test-lease", 2.seconds)) + def lease = awaitAssert { + testLeaseExt.getTestLease(leaseNameForShard(typeName, shardId)) + } + + val typeName = "type1" + val shard = parent.childActorOf( + Shard.props( + typeName, + shardId, + _ ⇒ Props(new EntityActor()), + settings, + extractEntityId, + extractShardId, + PoisonPill, + system.deadLetters, + 1)) + } + +} diff --git a/akka-cluster-tools/src/main/mima-filters/2.5.21.backwards.excludes b/akka-cluster-tools/src/main/mima-filters/2.5.21.backwards.excludes new file mode 100644 index 0000000000..58038bc644 --- /dev/null +++ b/akka-cluster-tools/src/main/mima-filters/2.5.21.backwards.excludes @@ -0,0 +1,5 @@ +# Lease API #26468 +ProblemFilters.exclude[Problem]("akka.cluster.singleton.ClusterSingletonManager#Internal*") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.singleton.ClusterSingletonManager.gotoOldest") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.singleton.ClusterSingletonManager.gotoHandingOver") +ProblemFilters.exclude[Problem]("akka.cluster.singleton.ClusterSingletonManager$Internal$*") diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf index 1de04b2a75..7f774c8cec 100644 --- a/akka-cluster-tools/src/main/resources/reference.conf +++ b/akka-cluster-tools/src/main/resources/reference.conf @@ -186,6 +186,14 @@ akka.cluster.singleton { # it will not be a quicker hand over by reducing this value, but in extreme failure scenarios # the recovery might 
be faster. min-number-of-hand-over-retries = 15 + + # Config path of the lease to be taken before creating the singleton actor + # if the lease is lost then the actor is restarted and it will need to re-acquire the lease + # the default is no lease + use-lease = "" + + # The interval between retries for acquiring the lease + lease-retry-interval = 5s } # //#singleton-config diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/ClusterLeaseSettings.scala b/akka-cluster-tools/src/main/scala/akka/cluster/ClusterLeaseSettings.scala new file mode 100644 index 0000000000..cd14778eed --- /dev/null +++ b/akka-cluster-tools/src/main/scala/akka/cluster/ClusterLeaseSettings.scala @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.cluster + +import akka.annotation.ApiMayChange + +import scala.concurrent.duration.FiniteDuration +import akka.util.JavaDurationConverters._ +import akka.util.PrettyDuration._ + +@ApiMayChange +class ClusterLeaseSettings private[akka] (val leaseImplementation: String, val leaseRetryInterval: FiniteDuration) { + def getLeaseRetryInterval(): java.time.Duration = leaseRetryInterval.asJava + + override def toString = s"ClusterLeaseSettings($leaseImplementation, ${leaseRetryInterval.pretty})" +} diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 17eaf2ab05..3d799aa331 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -5,10 +5,10 @@ package akka.cluster.singleton import com.typesafe.config.Config + import scala.concurrent.duration._ import scala.collection.immutable import scala.concurrent.Future - import akka.actor.Actor import akka.actor.Deploy import akka.actor.ActorSystem @@ -19,22 +19,22 @@ import akka.actor.DeadLetterSuppression import akka.actor.FSM import akka.actor.Props import akka.actor.Terminated -import akka.cluster.Cluster +import akka.cluster._ import akka.cluster.ClusterEvent._ -import akka.cluster.Member -import akka.cluster.MemberStatus import akka.AkkaException import akka.actor.NoSerializationVerificationNeeded -import akka.cluster.UniqueAddress -import akka.cluster.ClusterEvent -import scala.concurrent.Promise +import akka.pattern.pipe +import akka.util.JavaDurationConverters._ +import scala.concurrent.Promise import akka.Done import akka.actor.CoordinatedShutdown -import akka.annotation.DoNotInherit +import akka.annotation.{ ApiMayChange, DoNotInherit } import akka.pattern.ask import akka.util.Timeout -import akka.cluster.ClusterSettings +import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider } + +import scala.util.control.NonFatal object ClusterSingletonManagerSettings { @@ -52,12 +52,19 @@ object ClusterSingletonManagerSettings { * Create settings from a configuration with the same layout as * the default configuration `akka.cluster.singleton`. 
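The leaseImplementation value carried by ClusterLeaseSettings is a config path whose lease-class entry names an akka.coordination.lease.scaladsl.Lease implementation, as the test-lease blocks in the specs below illustrate. A minimal sketch of such an implementation, mirroring the shape of TestLease / TestLeaseActorClient in this PR; the package, class name and trivially-granting single-node behaviour are illustrative assumptions, a real lease would coordinate through an external system:

package com.example

import scala.concurrent.Future

import akka.actor.ExtendedActorSystem
import akka.coordination.lease.LeaseSettings
import akka.coordination.lease.scaladsl.Lease

class MyLease(settings: LeaseSettings, system: ExtendedActorSystem) extends Lease(settings) {

  @volatile private var held = false

  // No-callback variant of acquire
  override def acquire(): Future[Boolean] = {
    held = true
    Future.successful(true)
  }

  // The shard and singleton manager use this variant; the callback is how the
  // lease implementation reports losing the lease after a successful acquire
  override def acquire(leaseLostCallback: Option[Throwable] => Unit): Future[Boolean] =
    acquire()

  override def release(): Future[Boolean] = {
    held = false
    Future.successful(true)
  }

  override def checkLease(): Boolean = held
}

Such an implementation is registered under a config path (e.g. my-lease { lease-class = "com.example.MyLease" }) and referenced from use-lease.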
*/ - def apply(config: Config): ClusterSingletonManagerSettings = + def apply(config: Config): ClusterSingletonManagerSettings = { + val lease = config.getString("use-lease") match { + case s if s.isEmpty ⇒ None + case leaseConfigPath => + Some(new ClusterLeaseSettings(leaseConfigPath, config.getDuration("lease-retry-interval").asScala)) + } new ClusterSingletonManagerSettings( singletonName = config.getString("singleton-name"), role = roleOption(config.getString("role")), - removalMargin = Duration.Zero, // defaults to ClusterSettins.DownRemovalMargin - handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis) + removalMargin = Duration.Zero, // defaults to ClusterSettings.DownRemovalMargin + handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis, + lease) + } /** * Java API: Create settings from the default configuration @@ -98,14 +105,25 @@ object ClusterSingletonManagerSettings { * retried with this interval until the previous oldest confirms that the hand * over has started or the previous oldest member is removed from the cluster * (+ `removalMargin`). + * + * @param leaseSettings LeaseSettings for acquiring before creating the singleton actor */ final class ClusterSingletonManagerSettings( val singletonName: String, val role: Option[String], val removalMargin: FiniteDuration, - val handOverRetryInterval: FiniteDuration) + val handOverRetryInterval: FiniteDuration, + val leaseSettings: Option[ClusterLeaseSettings]) extends NoSerializationVerificationNeeded { + // bin compat for akka 2.5.21 + def this( + singletonName: String, + role: Option[String], + removalMargin: FiniteDuration, + handOverRetryInterval: FiniteDuration) = + this(singletonName, role, removalMargin, handOverRetryInterval, None) + def withSingletonName(name: String): ClusterSingletonManagerSettings = copy(singletonName = name) def withRole(role: String): ClusterSingletonManagerSettings = @@ -119,12 +137,16 @@ final class ClusterSingletonManagerSettings( def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings = copy(handOverRetryInterval = retryInterval) + def withLeaseSettings(leaseSettings: ClusterLeaseSettings): ClusterSingletonManagerSettings = + copy(leaseSettings = Some(leaseSettings)) + private def copy( singletonName: String = singletonName, role: Option[String] = role, removalMargin: FiniteDuration = removalMargin, - handOverRetryInterval: FiniteDuration = handOverRetryInterval): ClusterSingletonManagerSettings = - new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval) + handOverRetryInterval: FiniteDuration = handOverRetryInterval, + leaseSettings: Option[ClusterLeaseSettings] = leaseSettings): ClusterSingletonManagerSettings = + new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval, leaseSettings) } /** @@ -189,10 +211,12 @@ object ClusterSingletonManager { final case class HandOverRetry(count: Int) final case class TakeOverRetry(count: Int) + final case object LeaseRetry case object Cleanup case object StartOldestChangedBuffer case object Start extends State + case object AcquiringLease extends State case object Oldest extends State case object Younger extends State case object BecomingOldest extends State @@ -205,21 +229,19 @@ object ClusterSingletonManager { case object Uninitialized extends Data final case class YoungerData(oldestOption: Option[UniqueAddress]) extends Data final case class 
BecomingOldestData(previousOldestOption: Option[UniqueAddress]) extends Data - final case class OldestData(singleton: ActorRef, singletonTerminated: Boolean = false) extends Data - final case class WasOldestData( - singleton: ActorRef, - singletonTerminated: Boolean, - newOldestOption: Option[UniqueAddress]) - extends Data + final case class OldestData(singleton: Option[ActorRef]) extends Data + final case class WasOldestData(singleton: Option[ActorRef], newOldestOption: Option[UniqueAddress]) extends Data final case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data final case class StoppingData(singleton: ActorRef) extends Data case object EndData extends Data final case class DelayedMemberRemoved(member: Member) case object SelfExiting + case class AcquiringLeaseData(leaseRequestInProgress: Boolean, singleton: Option[ActorRef]) extends Data val HandOverRetryTimer = "hand-over-retry" val TakeOverRetryTimer = "take-over-retry" val CleanupTimer = "cleanup" + val LeaseRetryTimer = "lease-retry" object OldestChangedBuffer { @@ -236,8 +258,14 @@ object ClusterSingletonManager { final case class OldestChanged(oldest: Option[UniqueAddress]) } + final case class AcquireLeaseResult(holdingLease: Boolean) extends DeadLetterSuppression + final case class ReleaseLeaseResult(released: Boolean) extends DeadLetterSuppression + final case class AcquireLeaseFailure(t: Throwable) extends DeadLetterSuppression + final case class ReleaseLeaseFailure(t: Throwable) extends DeadLetterSuppression + final case class LeaseLost(reason: Option[Throwable]) extends DeadLetterSuppression + /** - * Notifications of member events that track oldest member is tunneled + * Notifications of member events that track oldest member are tunneled * via this actor (child of ClusterSingletonManager) to be able to deliver * one change at a time. Avoiding simultaneous changes simplifies * the process in ClusterSingletonManager. 
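Because the ClusterLeaseSettings constructor is private[akka], the new withLeaseSettings method is only directly reachable from code in an akka package (which is how the specs in this PR use it); applications would typically go through the use-lease config path instead. A minimal sketch of the programmatic route under that constraint; the package placement, the "my-lease" path and the actor name are illustrative assumptions:

package akka.cluster.singleton

import scala.concurrent.duration._

import akka.actor.{ ActorSystem, PoisonPill, Props }
import akka.cluster.ClusterLeaseSettings

object LeaseGuardedSingleton {
  // Starts a ClusterSingletonManager whose singleton actor is only created
  // once the lease registered at config path "my-lease" has been acquired.
  def start(system: ActorSystem, singletonProps: Props): Unit = {
    val settings = ClusterSingletonManagerSettings(system)
      .withLeaseSettings(new ClusterLeaseSettings("my-lease", 5.seconds))
    system.actorOf(
      ClusterSingletonManager.props(singletonProps, terminationMessage = PoisonPill, settings),
      name = "example-singleton")
  }
}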
ClusterSingletonManager requests @@ -457,6 +485,17 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se role.forall(cluster.selfRoles.contains), s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]") + private val singletonLeaseName = s"${context.system.name}-singleton-${self.path}" + + val lease: Option[Lease] = settings.leaseSettings.map( + settings => + LeaseProvider(context.system) + .getLease(singletonLeaseName, settings.leaseImplementation, cluster.selfAddress.hostPort)) + val leaseRetryInterval: FiniteDuration = settings.leaseSettings match { + case Some(s) => s.leaseRetryInterval + case None => 5.seconds // won't be used + } + val removalMargin = if (settings.removalMargin <= Duration.Zero) cluster.downingProvider.downRemovalMargin else settings.removalMargin @@ -515,6 +554,9 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se def logInfo(template: String, arg1: Any, arg2: Any): Unit = if (LogInfo) log.info(template, arg1, arg2) + def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit = + if (LogInfo) log.info(template, arg1, arg2, arg3) + override def preStart(): Unit = { super.preStart() require(!cluster.isTerminated, "Cluster node must not be terminated") @@ -557,7 +599,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se oldestChangedReceived = true if (oldestOption == selfUniqueAddressOption && safeToBeOldest) // oldest immediately - gotoOldest() + tryGoToOldest() else if (oldestOption == selfUniqueAddressOption) goto(BecomingOldest).using(BecomingOldestData(None)) else @@ -570,8 +612,8 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se if (oldestOption == selfUniqueAddressOption) { logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldestOption.map(_.address)) previousOldestOption match { - case None => gotoOldest() - case Some(prev) if removed.contains(prev) => gotoOldest() + case None => tryGoToOldest() + case Some(prev) if removed.contains(prev) => tryGoToOldest() case Some(prev) => peer(prev.address) ! HandOverToMe goto(BecomingOldest).using(BecomingOldestData(previousOldestOption)) @@ -620,7 +662,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se case Event(HandOverDone, BecomingOldestData(Some(previousOldest))) => if (sender().path.address == previousOldest.address) - gotoOldest() + tryGoToOldest() else { logInfo( "Ignoring HandOverDone in BecomingOldest from [{}]. Expected previous oldest [{}]", @@ -645,7 +687,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se if m.uniqueAddress == previousOldest => logInfo("Previous oldest [{}] removed", previousOldest.address) addRemoved(m.uniqueAddress) - gotoOldest() + tryGoToOldest() case Event(TakeOverFromMe, BecomingOldestData(previousOldestOption)) => val senderAddress = sender().path.address @@ -682,7 +724,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se // can't send HandOverToMe, previousOldest unknown for new node (or restart) // previous oldest might be down or removed, so no TakeOverFromMe message is received logInfo("Timeout in BecomingOldest. Previous oldest unknown, removed and no TakeOver request.") - gotoOldest() + tryGoToOldest() } else if (cluster.isTerminated) stop() else @@ -698,47 +740,109 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se self ! 
DelayedMemberRemoved(m) } - def gotoOldest(): State = { + def tryAcquireLease() = { + import context.dispatcher + pipe(lease.get.acquire(reason => self ! LeaseLost(reason)).map[Any](AcquireLeaseResult).recover { + case NonFatal(t) => AcquireLeaseFailure(t) + }).to(self) + goto(AcquiringLease).using(AcquiringLeaseData(leaseRequestInProgress = true, None)) + } + + // Try and go to oldest, taking the lease if needed + def tryGoToOldest(): State = { + // check if lease + lease match { + case None => + goToOldest() + case Some(_) => + logInfo("Trying to acquire lease before starting singleton") + tryAcquireLease() + } + } + + when(AcquiringLease) { + case Event(AcquireLeaseResult(result), _) => + logInfo("Acquire lease result {}", result) + if (result) { + goToOldest() + } else { + setTimer(LeaseRetryTimer, LeaseRetry, leaseRetryInterval) + stay.using(AcquiringLeaseData(leaseRequestInProgress = false, None)) + } + case Event(Terminated(ref), AcquiringLeaseData(_, Some(singleton))) if ref == singleton => + logInfo("Singleton actor terminated. Trying to acquire lease again before re-creating.") + // tryAcquireLease sets the state to None for singleton actor + tryAcquireLease() + case Event(AcquireLeaseFailure(t), _) => + log.error(t, "failed to get lease (will be retried)") + setTimer(LeaseRetryTimer, LeaseRetry, leaseRetryInterval) + stay.using(AcquiringLeaseData(leaseRequestInProgress = false, None)) + case Event(LeaseRetry, _) => + // If lease was lost (so previous state was oldest) then we don't try and get the lease + // until the old singleton instance has been terminated so we know there isn't an + // instance in this case + tryAcquireLease() + case Event(OldestChanged(oldestOption), AcquiringLeaseData(_, singleton)) => + handleOldestChanged(singleton, oldestOption) + case Event(HandOverToMe, AcquiringLeaseData(_, singleton)) => + gotoHandingOver(singleton, Some(sender())) + case Event(TakeOverFromMe, _) => + // already oldest, so confirm and continue like that + sender() ! HandOverToMe + stay + case Event(SelfExiting, _) => + selfMemberExited() + // complete memberExitingProgress when handOverDone + sender() ! Done // reply to ask + stay + case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress => + logInfo("Self downed, stopping ClusterSingletonManager") + stop() + } + + def goToOldest(): State = { val singleton = context.watch(context.actorOf(singletonProps, singletonName)) logInfo("Singleton manager starting singleton actor [{}]", singleton.path) - goto(Oldest).using(OldestData(singleton)) + goto(Oldest).using(OldestData(Some(singleton))) + } + + def handleOldestChanged(singleton: Option[ActorRef], oldestOption: Option[UniqueAddress]) = { + oldestChangedReceived = true + logInfo("{} observed OldestChanged: [{} -> {}]", stateName, cluster.selfAddress, oldestOption.map(_.address)) + oldestOption match { + case Some(a) if a == cluster.selfUniqueAddress => + // already oldest + stay + case Some(a) if !selfExited && removed.contains(a) => + // The member removal was not completed and the old removed node is considered + // oldest again. Safest is to terminate the singleton instance and goto Younger. + // This node will become oldest again when the other is removed again. + gotoHandingOver(singleton, None) + case Some(a) => + // send TakeOver request in case the new oldest doesn't know previous oldest + peer(a.address) ! 
TakeOverFromMe + setTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval, repeat = false) + goto(WasOldest).using(WasOldestData(singleton, newOldestOption = Some(a))) + case None => + // new oldest will initiate the hand-over + setTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval, repeat = false) + goto(WasOldest).using(WasOldestData(singleton, newOldestOption = None)) + } } when(Oldest) { - case Event(OldestChanged(oldestOption), OldestData(singleton, singletonTerminated)) => - oldestChangedReceived = true - logInfo("Oldest observed OldestChanged: [{} -> {}]", cluster.selfAddress, oldestOption.map(_.address)) - oldestOption match { - case Some(a) if a == cluster.selfUniqueAddress => - // already oldest - stay - case Some(a) if !selfExited && removed.contains(a) => - // The member removal was not completed and the old removed node is considered - // oldest again. Safest is to terminate the singleton instance and goto Younger. - // This node will become oldest again when the other is removed again. - gotoHandingOver(singleton, singletonTerminated, None) - case Some(a) => - // send TakeOver request in case the new oldest doesn't know previous oldest - peer(a.address) ! TakeOverFromMe - setTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval, repeat = false) - goto(WasOldest).using(WasOldestData(singleton, singletonTerminated, newOldestOption = Some(a))) - case None => - // new oldest will initiate the hand-over - setTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval, repeat = false) - goto(WasOldest).using(WasOldestData(singleton, singletonTerminated, newOldestOption = None)) - } - - case Event(HandOverToMe, OldestData(singleton, singletonTerminated)) => - gotoHandingOver(singleton, singletonTerminated, Some(sender())) - + case Event(OldestChanged(oldestOption), OldestData(singleton)) => + handleOldestChanged(singleton, oldestOption) + case Event(HandOverToMe, OldestData(singleton)) => + gotoHandingOver(singleton, Some(sender())) case Event(TakeOverFromMe, _) => // already oldest, so confirm and continue like that sender() ! HandOverToMe stay - case Event(Terminated(ref), d @ OldestData(singleton, _)) if ref == singleton => + case Event(Terminated(ref), d @ OldestData(Some(singleton))) if ref == singleton => logInfo("Singleton actor [{}] was terminated", singleton.path) - stay.using(d.copy(singletonTerminated = true)) + stay.using(d.copy(singleton = None)) case Event(SelfExiting, _) => selfMemberExited() @@ -746,22 +850,34 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se sender() ! Done // reply to ask stay - case Event(MemberDowned(m), OldestData(singleton, singletonTerminated)) - if m.uniqueAddress == cluster.selfUniqueAddress => - if (singletonTerminated) { - logInfo("Self downed, stopping ClusterSingletonManager") - stop() - } else { - logInfo("Self downed, stopping") - gotoStopping(singleton) + case Event(MemberDowned(m), OldestData(singleton)) if m.uniqueAddress == cluster.selfUniqueAddress => + singleton match { + case Some(s) => + logInfo("Self downed, stopping") + gotoStopping(s) + case None => + logInfo("Self downed, stopping ClusterSingletonManager") + stop() + } + + case Event(LeaseLost(reason), OldestData(singleton)) => + log.warning("Lease has been lost. Reason: {}. Terminating singleton and trying to re-acquire lease", reason) + singleton match { + case Some(s) => + s ! 
terminationMessage + goto(AcquiringLease).using(AcquiringLeaseData(leaseRequestInProgress = false, singleton)) + case None => + tryAcquireLease() } } when(WasOldest) { - case Event(TakeOverRetry(count), WasOldestData(singleton, singletonTerminated, newOldestOption)) => + case Event(TakeOverRetry(count), WasOldestData(singleton, newOldestOption)) => if ((cluster.isTerminated || selfExited) && (newOldestOption.isEmpty || count > maxTakeOverRetries)) { - if (singletonTerminated) stop() - else gotoStopping(singleton) + singleton match { + case Some(s) => gotoStopping(s) + case None => stop() + } } else if (count <= maxTakeOverRetries) { if (maxTakeOverRetries - count <= 3) logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address)) @@ -773,21 +889,20 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se } else throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [$newOldestOption] never occurred") - case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, _)) => - gotoHandingOver(singleton, singletonTerminated, Some(sender())) - + case Event(HandOverToMe, WasOldestData(singleton, _)) => + gotoHandingOver(singleton, Some(sender())) case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress && !selfExited => logInfo("Self removed, stopping ClusterSingletonManager") stop() - case Event(MemberRemoved(m, _), WasOldestData(singleton, singletonTerminated, Some(newOldest))) + case Event(MemberRemoved(m, _), WasOldestData(singleton, Some(newOldest))) if !selfExited && m.uniqueAddress == newOldest => addRemoved(m.uniqueAddress) - gotoHandingOver(singleton, singletonTerminated, None) + gotoHandingOver(singleton, None) - case Event(Terminated(ref), d @ WasOldestData(singleton, _, _)) if ref == singleton => - logInfo("Singleton actor [{}] was terminated", singleton.path) - stay.using(d.copy(singletonTerminated = true)) + case Event(Terminated(ref), d @ WasOldestData(singleton, _)) if singleton.contains(ref) => + logInfo("Singleton actor [{}] was terminated", ref.path) + stay.using(d.copy(singleton = None)) case Event(SelfExiting, _) => selfMemberExited() @@ -795,34 +910,34 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se sender() ! Done // reply to ask stay - case Event(MemberDowned(m), OldestData(singleton, singletonTerminated)) - if m.uniqueAddress == cluster.selfUniqueAddress => - if (singletonTerminated) { - logInfo("Self downed, stopping ClusterSingletonManager") - stop() - } else { - logInfo("Self downed, stopping") - gotoStopping(singleton) + case Event(MemberDowned(m), WasOldestData(singleton, _)) if m.uniqueAddress == cluster.selfUniqueAddress => + singleton match { + case None => + logInfo("Self downed, stopping ClusterSingletonManager") + stop() + case Some(s) => + logInfo("Self downed, stopping") + gotoStopping(s) } - } - def gotoHandingOver(singleton: ActorRef, singletonTerminated: Boolean, handOverTo: Option[ActorRef]): State = { - if (singletonTerminated) { - handOverDone(handOverTo) - } else { - handOverTo.foreach { _ ! HandOverInProgress } - logInfo("Singleton manager stopping singleton actor [{}]", singleton.path) - singleton ! terminationMessage - goto(HandingOver).using(HandingOverData(singleton, handOverTo)) + def gotoHandingOver(singleton: Option[ActorRef], handOverTo: Option[ActorRef]): State = { + singleton match { + case None => + handOverDone(handOverTo) + case Some(s) => + handOverTo.foreach { _ ! 
HandOverInProgress } + logInfo("Singleton manager stopping singleton actor [{}]", s.path) + s ! terminationMessage + goto(HandingOver).using(HandingOverData(s, handOverTo)) } } when(HandingOver) { - case (Event(Terminated(ref), HandingOverData(singleton, handOverTo))) if ref == singleton => + case Event(Terminated(ref), HandingOverData(singleton, handOverTo)) if ref == singleton => handOverDone(handOverTo) - case Event(HandOverToMe, HandingOverData(singleton, handOverTo)) if handOverTo == Some(sender()) => + case Event(HandOverToMe, HandingOverData(_, handOverTo)) if handOverTo.contains(sender()) => // retry sender() ! HandOverInProgress stay @@ -855,7 +970,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se } when(Stopping) { - case (Event(Terminated(ref), StoppingData(singleton))) if ref == singleton => + case Event(Terminated(ref), StoppingData(singleton)) if ref == singleton => logInfo("Singleton actor [{}] was terminated", singleton.path) stop() } @@ -901,6 +1016,20 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se if (m.uniqueAddress == cluster.selfUniqueAddress) logInfo("Self downed, waiting for removal") stay + case Event(ReleaseLeaseFailure(t), _) => + log.error( + t, + "Failed to release lease. Singleton may not be able to run on another node until lease timeout occurs") + stay + case Event(ReleaseLeaseResult(released), _) => + if (released) { + logInfo("Lease released") + } else { + // TODO we could retry + log.error( + "Failed to release lease. Singleton may not be able to run on another node until lease timeout occurs") + } + stay } onTransition { @@ -916,6 +1045,29 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se case WasOldest -> _ => cancelTimer(TakeOverRetryTimer) } + onTransition { + case (AcquiringLease, to) if to != Oldest => + stateData match { + case AcquiringLeaseData(true, _) => + logInfo("Releasing lease as leaving AcquiringLease going to [{}]", to) + import context.dispatcher + lease.foreach(l => + pipe(l.release().map[Any](ReleaseLeaseResult).recover { + case t => ReleaseLeaseFailure(t) + }).to(self)) + case _ => + } + } + + onTransition { + case Oldest -> _ => + lease.foreach { l => + logInfo("Releasing lease as leaving Oldest") + import context.dispatcher + pipe(l.release().map(ReleaseLeaseResult)).to(self) + } + } + onTransition { case _ -> (Younger | Oldest) => getNextOldestChanged() } diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/TestLeaseActor.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/TestLeaseActor.scala new file mode 100644 index 0000000000..f22640eb3c --- /dev/null +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/TestLeaseActor.scala @@ -0,0 +1,116 @@ +/* + * Copyright (C) 2019 Lightbend Inc. 
+ */ + +package akka.cluster + +import java.util.concurrent.atomic.AtomicReference + +import scala.concurrent.Future +import scala.concurrent.duration._ +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.actor.Props +import akka.cluster.TestLeaseActor.{ Acquire, Create, Release } +import akka.event.Logging +import akka.coordination.lease.LeaseSettings +import akka.coordination.lease.scaladsl.Lease +import akka.pattern.ask +import akka.util.Timeout + +object TestLeaseActor { + def props(probe: ActorRef): Props = + Props(new TestLeaseActor(probe)) + + sealed trait LeaseRequest + final case class Acquire(owner: String) extends LeaseRequest + final case class Release(owner: String) extends LeaseRequest + final case class Create(leaseName: String, ownerName: String) + + final case object GetRequests + final case class LeaseRequests(requests: List[LeaseRequest]) + final case class ActionRequest(request: LeaseRequest, result: Any) // boolean of Failure +} + +class TestLeaseActor(probe: ActorRef) extends Actor with ActorLogging { + import TestLeaseActor._ + + var requests: List[(ActorRef, LeaseRequest)] = Nil + + override def receive = { + + case c: Create ⇒ + log.info("Lease created with name {} ownerName {}", c.leaseName, c.ownerName) + + case request: LeaseRequest ⇒ + log.info("Lease request {} from {}", request, sender()) + requests = (sender(), request) :: requests + + case GetRequests ⇒ + sender() ! LeaseRequests(requests.map(_._2)) + + case ActionRequest(request, result) ⇒ + requests.find(_._2 == request) match { + case Some((snd, req)) ⇒ + log.info("Actioning request {} to {}", req, result) + snd ! result + requests = requests.filterNot(_._2 == request) + case None ⇒ + throw new RuntimeException(s"unknown request to action: ${request}. Requests: ${requests}") + } + + } + +} + +object TestLeaseActorClientExt extends ExtensionId[TestLeaseActorClientExt] with ExtensionIdProvider { + override def get(system: ActorSystem): TestLeaseActorClientExt = super.get(system) + override def lookup = TestLeaseActorClientExt + override def createExtension(system: ExtendedActorSystem): TestLeaseActorClientExt = + new TestLeaseActorClientExt(system) +} + +class TestLeaseActorClientExt(val system: ExtendedActorSystem) extends Extension { + + private val leaseActor = new AtomicReference[ActorRef]() + + def getLeaseActor(): ActorRef = { + val lease = leaseActor.get + if (lease == null) throw new IllegalStateException("LeaseActorRef must be set first") + lease + } + + def setActorLease(client: ActorRef): Unit = + leaseActor.set(client) + +} + +class TestLeaseActorClient(settings: LeaseSettings, system: ExtendedActorSystem) extends Lease(settings) { + + private val log = Logging(system, getClass) + val leaseActor = TestLeaseActorClientExt(system).getLeaseActor() + + log.info("lease created {}", settings) + leaseActor ! Create(settings.leaseName, settings.ownerName) + + private implicit val timeout = Timeout(100.seconds) + + override def acquire(): Future[Boolean] = { + (leaseActor ? Acquire(settings.ownerName)).mapTo[Boolean] + } + + override def release(): Future[Boolean] = { + (leaseActor ? Release(settings.ownerName)).mapTo[Boolean] + } + + override def checkLease(): Boolean = false + + override def acquire(callback: Option[Throwable] ⇒ Unit): Future[Boolean] = + (leaseActor ? 
Acquire(settings.ownerName)).mapTo[Boolean] +} diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaseSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaseSpec.scala new file mode 100644 index 0000000000..436f37801c --- /dev/null +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaseSpec.scala @@ -0,0 +1,216 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.cluster.singleton + +import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, Address, Identify, PoisonPill, Props } +import akka.cluster.MemberStatus.Up +import akka.cluster.TestLeaseActor._ +import akka.cluster.singleton.ClusterSingletonManagerLeaseSpec.ImportantSingleton.Response +import akka.cluster._ +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +import scala.language.postfixOps +import scala.concurrent.duration._ + +object ClusterSingletonManagerLeaseSpec extends MultiNodeConfig { + val controller = role("controller") + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + testTransport(true) + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-down-unreachable-after = 0s + test-lease { + lease-class = akka.cluster.TestLeaseActorClient + heartbeat-interval = 1s + heartbeat-timeout = 120s + lease-operation-timeout = 3s + } + akka.cluster.singleton { + use-lease = "test-lease" + } + """)) + + nodeConfig(first, second, third)(ConfigFactory.parseString("akka.cluster.roles = [worker]")) + + object ImportantSingleton { + case class Response(msg: Any, address: Address) + + def props(): Props = Props(new ImportantSingleton()) + } + + class ImportantSingleton extends Actor with ActorLogging { + val selfAddress = Cluster(context.system).selfAddress + override def preStart(): Unit = { + log.info("Singleton starting") + } + override def postStop(): Unit = { + log.info("Singleton stopping") + } + override def receive: Receive = { + case msg ⇒ + sender() ! 
Response(msg, selfAddress) + } + } +} + +class ClusterSingletonManagerLeaseMultiJvmNode1 extends ClusterSingletonManagerLeaseSpec +class ClusterSingletonManagerLeaseMultiJvmNode2 extends ClusterSingletonManagerLeaseSpec +class ClusterSingletonManagerLeaseMultiJvmNode3 extends ClusterSingletonManagerLeaseSpec +class ClusterSingletonManagerLeaseMultiJvmNode4 extends ClusterSingletonManagerLeaseSpec +class ClusterSingletonManagerLeaseMultiJvmNode5 extends ClusterSingletonManagerLeaseSpec + +class ClusterSingletonManagerLeaseSpec + extends MultiNodeSpec(ClusterSingletonManagerLeaseSpec) + with STMultiNodeSpec + with ImplicitSender + with MultiNodeClusterSpec { + + import ClusterSingletonManagerLeaseSpec.ImportantSingleton._ + import ClusterSingletonManagerLeaseSpec._ + + override def initialParticipants = roles.size + + // used on the controller + val leaseProbe = TestProbe() + + "Cluster singleton manager with lease" should { + + "form a cluster" in { + awaitClusterUp(controller, first) + enterBarrier("initial-up") + runOn(second) { + joinWithin(first) + awaitAssert({ + cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up) + }, 10.seconds) + } + enterBarrier("second-up") + runOn(third) { + joinWithin(first) + awaitAssert({ + cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up) + }, 10.seconds) + } + enterBarrier("third-up") + runOn(fourth) { + joinWithin(first) + awaitAssert({ + cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up, Up) + }, 10.seconds) + } + enterBarrier("fourth-up") + } + + "start test lease" in { + runOn(controller) { + system.actorOf(TestLeaseActor.props(leaseProbe.ref), s"lease-${system.name}") + } + enterBarrier("lease-actor-started") + } + + "find the lease on every node" in { + system.actorSelection(node(controller) / "user" / s"lease-${system.name}") ! Identify(None) + val leaseRef: ActorRef = expectMsgType[ActorIdentity].ref.get + TestLeaseActorClientExt(system).setActorLease(leaseRef) + enterBarrier("singleton-started") + } + + "Start singleton and ping from all nodes" in { + runOn(first, second, third, fourth) { + system.actorOf( + ClusterSingletonManager + .props(props(), PoisonPill, ClusterSingletonManagerSettings(system).withRole("worker")), + "important") + } + enterBarrier("singleton-started") + + val proxy = system.actorOf( + ClusterSingletonProxy.props( + singletonManagerPath = "/user/important", + settings = ClusterSingletonProxySettings(system).withRole("worker"))) + + runOn(first, second, third, fourth) { + proxy ! "Ping" + // lease has not been granted so now allowed to come up + expectNoMessage(2.seconds) + } + + enterBarrier("singleton-pending") + + runOn(controller) { + TestLeaseActorClientExt(system).getLeaseActor() ! GetRequests + expectMsg(LeaseRequests(List(Acquire(address(first).hostPort)))) + TestLeaseActorClientExt(system).getLeaseActor() ! ActionRequest(Acquire(address(first).hostPort), true) + } + enterBarrier("lease-acquired") + + runOn(first, second, third, fourth) { + expectMsg(Response("Ping", address(first))) + } + enterBarrier("pinged") + } + + "Move singleton when oldest node downed" in { + + cluster.state.members.size shouldEqual 5 + runOn(controller) { + cluster.down(address(first)) + awaitAssert({ + cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up) + }, 20.seconds) + val requests = awaitAssert({ + TestLeaseActorClientExt(system).getLeaseActor() ! 
GetRequests + val msg = expectMsgType[LeaseRequests] + withClue("Requests: " + msg) { + msg.requests.size shouldEqual 2 + } + msg + }, 10.seconds) + + requests.requests should contain(Release(address(first).hostPort)) + requests.requests should contain(Acquire(address(second).hostPort)) + } + runOn(second, third, fourth) { + awaitAssert({ + cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up) + }, 20.seconds) + } + enterBarrier("first node downed") + val proxy = system.actorOf( + ClusterSingletonProxy.props( + singletonManagerPath = "/user/important", + settings = ClusterSingletonProxySettings(system).withRole("worker"))) + + runOn(second, third, fourth) { + proxy ! "Ping" + // lease has not been granted so now allowed to come up + expectNoMessage(2.seconds) + } + enterBarrier("singleton-not-migrated") + + runOn(controller) { + TestLeaseActorClientExt(system).getLeaseActor() ! ActionRequest(Acquire(address(second).hostPort), true) + } + + enterBarrier("singleton-moved-to-second") + + runOn(second, third, fourth) { + proxy ! "Ping" + expectMsg(Response("Ping", address(second))) + } + enterBarrier("finished") + } + } +} diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/TestLease.scala b/akka-cluster-tools/src/test/scala/akka/cluster/TestLease.scala new file mode 100644 index 0000000000..1a26d28943 --- /dev/null +++ b/akka-cluster-tools/src/test/scala/akka/cluster/TestLease.scala @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.cluster + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } +import akka.coordination.lease.LeaseSettings +import akka.coordination.lease.scaladsl.Lease +import akka.event.Logging +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory + +import scala.concurrent.{ Future, Promise } +import scala.collection.JavaConverters._ + +object TestLeaseExt extends ExtensionId[TestLeaseExt] with ExtensionIdProvider { + override def get(system: ActorSystem): TestLeaseExt = super.get(system) + override def lookup = TestLeaseExt + override def createExtension(system: ExtendedActorSystem): TestLeaseExt = new TestLeaseExt(system) +} + +class TestLeaseExt(val system: ExtendedActorSystem) extends Extension { + + private val testLeases = new ConcurrentHashMap[String, TestLease]() + + def getTestLease(name: String): TestLease = { + val lease = testLeases.get(name) + if (lease == null) + throw new IllegalStateException( + s"Test lease $name has not been set yet. 
Current leases ${testLeases.keys().asScala.toList}") + lease + } + + def setTestLease(name: String, lease: TestLease): Unit = + testLeases.put(name, lease) + +} + +object TestLease { + final case class AcquireReq(owner: String) + final case class ReleaseReq(owner: String) + + val config = ConfigFactory.parseString(""" + test-lease { + lease-class = akka.cluster.TestLease + } + """.stripMargin) +} + +class TestLease(settings: LeaseSettings, system: ExtendedActorSystem) extends Lease(settings) { + import TestLease._ + + val log = Logging(system, getClass) + val probe = TestProbe()(system) + + log.info("Creating lease {}", settings) + + TestLeaseExt(system).setTestLease(settings.leaseName, this) + + val initialPromise = Promise[Boolean] + + private val nextAcquireResult = new AtomicReference[Future[Boolean]](initialPromise.future) + private val nextCheckLeaseResult = new AtomicReference[Boolean](false) + private val currentCallBack = new AtomicReference[Option[Throwable] ⇒ Unit](_ ⇒ ()) + + def setNextAcquireResult(next: Future[Boolean]): Unit = + nextAcquireResult.set(next) + + def setNextCheckLeaseResult(value: Boolean): Unit = + nextCheckLeaseResult.set(value) + + def getCurrentCallback(): Option[Throwable] ⇒ Unit = currentCallBack.get() + + override def acquire(): Future[Boolean] = { + log.info("acquire, current response " + nextAcquireResult) + probe.ref ! AcquireReq(settings.ownerName) + nextAcquireResult.get() + } + + override def release(): Future[Boolean] = { + probe.ref ! ReleaseReq(settings.ownerName) + Future.successful(true) + } + + override def checkLease(): Boolean = nextCheckLeaseResult.get + + override def acquire(callback: Option[Throwable] ⇒ Unit): Future[Boolean] = { + currentCallBack.set(callback) + acquire() + } + +} diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonLeaseSpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonLeaseSpec.scala new file mode 100644 index 0000000000..1cb36f9754 --- /dev/null +++ b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonLeaseSpec.scala @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.cluster.singleton + +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.{ Actor, ActorLogging, ActorRef, ExtendedActorSystem, PoisonPill, Props } +import akka.cluster.TestLease.{ AcquireReq, ReleaseReq } +import akka.cluster.{ Cluster, MemberStatus, TestLease, TestLeaseExt } +import akka.testkit.{ AkkaSpec, TestProbe } +import com.typesafe.config.ConfigFactory + +import scala.concurrent.Promise +import scala.concurrent.duration._ +import scala.util.Success + +class ImportantSingleton(lifeCycleProbe: ActorRef) extends Actor with ActorLogging { + + override def preStart(): Unit = { + log.info("Important Singleton Starting") + lifeCycleProbe ! "preStart" + } + + override def postStop(): Unit = { + log.info("Important Singleton Stopping") + lifeCycleProbe ! "postStop" + } + + override def receive: Receive = { + case msg ⇒ + sender() ! 
msg + } +} + +class ClusterSingletonLeaseSpec extends AkkaSpec(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = cluster + + akka.cluster.singleton { + use-lease = "test-lease" + lease-retry-interval = 2000ms + } + """).withFallback(TestLease.config)) { + + val cluster = Cluster(system) + val testLeaseExt = TestLeaseExt(system) + + override protected def atStartup(): Unit = { + cluster.join(cluster.selfAddress) + awaitAssert { + cluster.selfMember.status shouldEqual MemberStatus.Up + } + } + + def extSystem: ExtendedActorSystem = system.asInstanceOf[ExtendedActorSystem] + + val counter = new AtomicInteger() + + def nextName() = s"important-${counter.getAndIncrement()}" + + val shortDuration = 50.millis + + val leaseOwner = cluster.selfMember.address.hostPort + + def nextSettings() = ClusterSingletonManagerSettings(system).withSingletonName(nextName()) + + def leaseNameFor(settings: ClusterSingletonManagerSettings): String = + s"ClusterSingletonLeaseSpec-singleton-akka://ClusterSingletonLeaseSpec/user/${settings.singletonName}" + + "A singleton with lease" should { + + "not start until lease is available" in { + val probe = TestProbe() + val settings = nextSettings() + system.actorOf( + ClusterSingletonManager.props(Props(new ImportantSingleton(probe.ref)), PoisonPill, settings), + settings.singletonName) + val testLease = awaitAssert { + testLeaseExt.getTestLease(leaseNameFor(settings)) + } // allow singleton manager to create the lease + probe.expectNoMessage(shortDuration) + testLease.initialPromise.complete(Success(true)) + probe.expectMsg("preStart") + } + + "do not start if lease acquire returns false" in { + val probe = TestProbe() + val settings = nextSettings() + system.actorOf( + ClusterSingletonManager.props(Props(new ImportantSingleton(probe.ref)), PoisonPill, settings), + settings.singletonName) + val testLease = awaitAssert { + testLeaseExt.getTestLease(leaseNameFor(settings)) + } // allow singleton manager to create the lease + probe.expectNoMessage(shortDuration) + testLease.initialPromise.complete(Success(false)) + probe.expectNoMessage(shortDuration) + } + + "retry trying to get lease if acquire returns false" in { + val singletonProbe = TestProbe() + val settings = nextSettings() + system.actorOf( + ClusterSingletonManager.props(Props(new ImportantSingleton(singletonProbe.ref)), PoisonPill, settings), + settings.singletonName) + val testLease = awaitAssert { + testLeaseExt.getTestLease(leaseNameFor(settings)) + } // allow singleton manager to create the lease + testLease.probe.expectMsg(AcquireReq(leaseOwner)) + singletonProbe.expectNoMessage(shortDuration) + val nextResponse = Promise[Boolean] + testLease.setNextAcquireResult(nextResponse.future) + testLease.initialPromise.complete(Success(false)) + testLease.probe.expectMsg(AcquireReq(leaseOwner)) + singletonProbe.expectNoMessage(shortDuration) + nextResponse.complete(Success(true)) + singletonProbe.expectMsg("preStart") + } + + "do not start if lease acquire fails" in { + val probe = TestProbe() + val settings = nextSettings() + system.actorOf( + ClusterSingletonManager.props(Props(new ImportantSingleton(probe.ref)), PoisonPill, settings), + settings.singletonName) + val testLease = awaitAssert { + testLeaseExt.getTestLease(leaseNameFor(settings)) + } // allow singleton manager to create the lease + probe.expectNoMessage(shortDuration) + testLease.initialPromise.failure(new RuntimeException("no lease for you")) + probe.expectNoMessage(shortDuration) + } + + "retry trying to get lease if 
acquire returns fails" in { + val singletonProbe = TestProbe() + val settings = nextSettings() + system.actorOf( + ClusterSingletonManager.props(Props(new ImportantSingleton(singletonProbe.ref)), PoisonPill, settings), + settings.singletonName) + val testLease = awaitAssert { + testLeaseExt.getTestLease(leaseNameFor(settings)) + } // allow singleton manager to create the lease + testLease.probe.expectMsg(AcquireReq(leaseOwner)) + singletonProbe.expectNoMessage(shortDuration) + val nextResponse = Promise[Boolean] + testLease.setNextAcquireResult(nextResponse.future) + testLease.initialPromise.failure(new RuntimeException("no lease for you")) + testLease.probe.expectMsg(AcquireReq(leaseOwner)) + singletonProbe.expectNoMessage(shortDuration) + nextResponse.complete(Success(true)) + singletonProbe.expectMsg("preStart") + } + + "stop singleton if the lease fails periodic check" in { + val lifecycleProbe = TestProbe() + val settings = nextSettings() + system.actorOf( + ClusterSingletonManager.props(Props(new ImportantSingleton(lifecycleProbe.ref)), PoisonPill, settings), + settings.singletonName) + val testLease = awaitAssert { + testLeaseExt.getTestLease(leaseNameFor(settings)) + } + testLease.probe.expectMsg(AcquireReq(leaseOwner)) + testLease.initialPromise.complete(Success(true)) + lifecycleProbe.expectMsg("preStart") + val callback = testLease.getCurrentCallback() + callback(None) + lifecycleProbe.expectMsg("postStop") + testLease.probe.expectMsg(ReleaseReq(leaseOwner)) + + // should try and reacquire lease + testLease.probe.expectMsg(AcquireReq(leaseOwner)) + lifecycleProbe.expectMsg("preStart") + } + + "release lease when leaving oldest" in { + val singletonProbe = TestProbe() + val settings = nextSettings() + system.actorOf( + ClusterSingletonManager.props(Props(new ImportantSingleton(singletonProbe.ref)), PoisonPill, settings), + settings.singletonName) + val testLease = awaitAssert { + testLeaseExt.getTestLease(leaseNameFor(settings)) + } // allow singleton manager to create the lease + singletonProbe.expectNoMessage(shortDuration) + testLease.probe.expectMsg(AcquireReq(leaseOwner)) + testLease.initialPromise.complete(Success(true)) + singletonProbe.expectMsg("preStart") + cluster.leave(cluster.selfAddress) + testLease.probe.expectMsg(ReleaseReq(leaseOwner)) + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 1a0e73fe0c..e89ed9fabf 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -251,7 +251,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro awaitCond( { clusterView.refreshCurrentState() - if (memberInState(joinNode, List(MemberStatus.up)) && + if (memberInState(joinNode, List(MemberStatus.Up)) && memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up))) true else { diff --git a/akka-coordination/src/main/resources/reference.conf b/akka-coordination/src/main/resources/reference.conf new file mode 100644 index 0000000000..310ab52099 --- /dev/null +++ b/akka-coordination/src/main/resources/reference.conf @@ -0,0 +1,22 @@ +akka.coordination { + + # Defaults for any lease implementation that doesn't include these properties + lease { + + # FQCN of the implementation of the Lease + lease-class = "" + + #defaults + # if the node that acquired the leases crashes, how long should the lease be held before 
another owner can get it + heartbeat-timeout = 120s + + # interval for communicating with the third party to confirm the lease is still held + heartbeat-interval = 12s + + # lease implementations are expected to time out acquire and release calls or document + # that they do not implement an operation timeout + lease-operation-timeout = 5s + + #defaults + } +} diff --git a/akka-coordination/src/main/scala/akka/coordination/lease/LeaseException.scala b/akka-coordination/src/main/scala/akka/coordination/lease/LeaseException.scala new file mode 100644 index 0000000000..7ab4de5101 --- /dev/null +++ b/akka-coordination/src/main/scala/akka/coordination/lease/LeaseException.scala @@ -0,0 +1,13 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.coordination.lease + +import akka.annotation.ApiMayChange + +@ApiMayChange +class LeaseException(message: String) extends RuntimeException(message) + +@ApiMayChange +final class LeaseTimeoutException(message: String) extends LeaseException(message) diff --git a/akka-coordination/src/main/scala/akka/coordination/lease/LeaseSettings.scala b/akka-coordination/src/main/scala/akka/coordination/lease/LeaseSettings.scala new file mode 100644 index 0000000000..813012618f --- /dev/null +++ b/akka-coordination/src/main/scala/akka/coordination/lease/LeaseSettings.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.coordination.lease + +import akka.annotation.ApiMayChange +import com.typesafe.config.Config + +object LeaseSettings { + @ApiMayChange + def apply(config: Config, leaseName: String, ownerName: String): LeaseSettings = { + new LeaseSettings(leaseName, ownerName, TimeoutSettings(config), config) + } +} + +@ApiMayChange +final class LeaseSettings( + val leaseName: String, + val ownerName: String, + val timeoutSettings: TimeoutSettings, + val leaseConfig: Config) { + + def withTimeoutSettings(timeoutSettings: TimeoutSettings): LeaseSettings = + new LeaseSettings(leaseName, ownerName, timeoutSettings, leaseConfig) + + override def toString = s"LeaseSettings($leaseName, $ownerName, $timeoutSettings)" +} diff --git a/akka-coordination/src/main/scala/akka/coordination/lease/TimeoutSettings.scala b/akka-coordination/src/main/scala/akka/coordination/lease/TimeoutSettings.scala new file mode 100644 index 0000000000..6c20a9e5a1 --- /dev/null +++ b/akka-coordination/src/main/scala/akka/coordination/lease/TimeoutSettings.scala @@ -0,0 +1,87 @@ +/* + * Copyright (C) 2019 Lightbend Inc. 
+ */ + +package akka.coordination.lease + +import akka.annotation.ApiMayChange +import com.typesafe.config.{ Config, ConfigValueType } +import akka.util.JavaDurationConverters._ +import scala.concurrent.duration._ + +object TimeoutSettings { + @ApiMayChange + def apply(config: Config): TimeoutSettings = { + val heartBeatTimeout = config.getDuration("heartbeat-timeout").asScala + val heartBeatInterval = config.getValue("heartbeat-interval").valueType() match { + case ConfigValueType.STRING if config.getString("heartbeat-interval").isEmpty ⇒ + (heartBeatTimeout / 10).max(5.seconds) + case _ ⇒ config.getDuration("heartbeat-interval").asScala + } + require(heartBeatInterval < (heartBeatTimeout / 2), "heartbeat-interval must be less than half heartbeat-timeout") + new TimeoutSettings(heartBeatInterval, heartBeatTimeout, config.getDuration("lease-operation-timeout").asScala) + } + +} + +@ApiMayChange +final class TimeoutSettings( + val heartbeatInterval: FiniteDuration, + val heartbeatTimeout: FiniteDuration, + val operationTimeout: FiniteDuration) { + + /** + * Java API + */ + def getHeartbeatInterval(): java.time.Duration = heartbeatInterval.asJava + + /** + * Java API + */ + def getHeartbeatTimeout(): java.time.Duration = heartbeatTimeout.asJava + + /** + * Java API + */ + def getOperationTimeout(): java.time.Duration = operationTimeout.asJava + + /** + * Java API + */ + def withHeartbeatInterval(heartbeatInterval: java.time.Duration): TimeoutSettings = { + copy(heartbeatInterval = heartbeatInterval.asScala) + } + + /** + * Java API + */ + def withHeartbeatTimeout(heartbeatTimeout: java.time.Duration): TimeoutSettings = { + copy(heartbeatTimeout = heartbeatTimeout.asScala) + } + + /** + * Java API + */ + def withOperationTimeout(operationTimeout: java.time.Duration): TimeoutSettings = { + copy(operationTimeout = operationTimeout.asScala) + } + + def withHeartbeatInterval(heartbeatInterval: FiniteDuration): TimeoutSettings = { + copy(heartbeatInterval = heartbeatInterval) + } + def withHeartbeatTimeout(heartbeatTimeout: FiniteDuration): TimeoutSettings = { + copy(heartbeatTimeout = heartbeatTimeout) + } + def withOperationTimeout(operationTimeout: FiniteDuration): TimeoutSettings = { + copy(operationTimeout = operationTimeout) + } + + private def copy( + heartbeatInterval: FiniteDuration = heartbeatInterval, + heartbeatTimeout: FiniteDuration = heartbeatTimeout, + operationTimeout: FiniteDuration = operationTimeout): TimeoutSettings = { + new TimeoutSettings(heartbeatInterval, heartbeatTimeout, operationTimeout) + } + + override def toString = s"TimeoutSettings($heartbeatInterval, $heartbeatTimeout, $operationTimeout)" +} diff --git a/akka-coordination/src/main/scala/akka/coordination/lease/internal/LeaseAdapter.scala b/akka-coordination/src/main/scala/akka/coordination/lease/internal/LeaseAdapter.scala new file mode 100644 index 0000000000..5c54558939 --- /dev/null +++ b/akka-coordination/src/main/scala/akka/coordination/lease/internal/LeaseAdapter.scala @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2019 Lightbend Inc. 
+ */ + +package akka.coordination.lease.internal + +import java.util.Optional +import java.util.concurrent.CompletionStage +import java.util.function.Consumer + +import akka.annotation.InternalApi +import akka.coordination.lease.LeaseSettings +import akka.coordination.lease.javadsl.Lease +import akka.coordination.lease.scaladsl.{ Lease => ScalaLease } + +import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters._ +import scala.concurrent.ExecutionContext + +/** + * INTERNAL API + */ +@InternalApi +final private[akka] class LeaseAdapter(delegate: ScalaLease)(implicit val ec: ExecutionContext) extends Lease { + + override def acquire(): CompletionStage[java.lang.Boolean] = delegate.acquire().map(Boolean.box).toJava + + override def acquire(leaseLostCallback: Consumer[Optional[Throwable]]): CompletionStage[java.lang.Boolean] = { + delegate.acquire(o ⇒ leaseLostCallback.accept(o.asJava)).map(Boolean.box).toJava + } + + override def release(): CompletionStage[java.lang.Boolean] = delegate.release().map(Boolean.box).toJava + override def checkLease(): Boolean = delegate.checkLease() + override def getSettings(): LeaseSettings = delegate.settings +} diff --git a/akka-coordination/src/main/scala/akka/coordination/lease/javadsl/Lease.scala b/akka-coordination/src/main/scala/akka/coordination/lease/javadsl/Lease.scala new file mode 100644 index 0000000000..39d5c67483 --- /dev/null +++ b/akka-coordination/src/main/scala/akka/coordination/lease/javadsl/Lease.scala @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.coordination.lease.javadsl + +import java.util.Optional +import java.util.concurrent.CompletionStage + +import akka.annotation.ApiMayChange +import akka.coordination.lease.LeaseSettings + +@ApiMayChange +abstract class Lease() { + + def getSettings(): LeaseSettings + + /** + * Try to acquire the lease. The returned `CompletionStage` will be completed with `true` + * if the lease could be acquired, i.e. no other owner is holding the lease. + * + * The returned `Future` will be completed with `false` if the lease for certain couldn't be + * acquired, e.g. because some other owner is holding it. It's completed with [[akka.coordination.lease.LeaseException]] + * failure if it might not have been able to acquire the lease, e.g. communication timeout + * with the lease resource. + * + * The lease will be held by the [[LeaseSettings.ownerName]] until it is released + * with [[Lease.release]]. A Lease implementation will typically also loose the ownership + * if it can't maintain its authority, e.g. if it crashes or is partitioned from the lease + * resource for too long. + * + * [[Lease.checkLease]] can be used to verify that the owner still has the lease. + */ + def acquire(): CompletionStage[java.lang.Boolean] + + /** + * Same as acquire with an additional callback + * that is called if the lease is lost. The lease can be lose due to being unable + * to communicate with the lease provider. + * Implementations should not call leaseLostCallback until after the returned future + * has been completed + */ + def acquire(leaseLostCallback: java.util.function.Consumer[Optional[Throwable]]): CompletionStage[java.lang.Boolean] + + /** + * Release the lease so some other owner can acquire it. + */ + def release(): CompletionStage[java.lang.Boolean] + + /** + * Check if the owner still holds the lease. + * `true` means that it certainly holds the lease. 
+ * `false` means that it might not hold the lease, but it could, and for more certain + * response you would have to use [[Lease#acquire()*]] or [[Lease#release]]. + */ + def checkLease(): Boolean + +} diff --git a/akka-coordination/src/main/scala/akka/coordination/lease/javadsl/LeaseProvider.scala b/akka-coordination/src/main/scala/akka/coordination/lease/javadsl/LeaseProvider.scala new file mode 100644 index 0000000000..e2c78f8f67 --- /dev/null +++ b/akka-coordination/src/main/scala/akka/coordination/lease/javadsl/LeaseProvider.scala @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.coordination.lease.javadsl + +import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } +import akka.annotation.ApiMayChange +import akka.coordination.lease.internal.LeaseAdapter +import akka.coordination.lease.scaladsl.{ LeaseProvider ⇒ ScalaLeaseProvider } + +@ApiMayChange +object LeaseProvider extends ExtensionId[LeaseProvider] with ExtensionIdProvider { + override def get(system: ActorSystem): LeaseProvider = super.get(system) + + override def lookup = LeaseProvider + + override def createExtension(system: ExtendedActorSystem): LeaseProvider = new LeaseProvider(system) + + private final case class LeaseKey(leaseName: String, configPath: String, clientName: String) +} + +@ApiMayChange +class LeaseProvider(system: ExtendedActorSystem) extends Extension { + private val delegate = ScalaLeaseProvider(system) + + /** + * The configuration define at `configPath` must have a property `lease-class` that defines + * the fully qualified class name of the Lease implementation. + * The class must implement [[Lease]] and have constructor with [[akka.coordination.lease.LeaseSettings]] parameter and + * optionally ActorSystem parameter. + * + * @param leaseName the name of the lease resource + * @param configPath the path of configuration for the lease + * @param ownerName the owner that will `acquire` the lease, e.g. hostname and port of the ActorSystem + */ + def getLease(leaseName: String, configPath: String, ownerName: String): Lease = { + val scalaLease = delegate.getLease(leaseName, configPath, ownerName) + new LeaseAdapter(scalaLease)(system.dispatcher) + } +} diff --git a/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/Lease.scala b/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/Lease.scala new file mode 100644 index 0000000000..693bd3f138 --- /dev/null +++ b/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/Lease.scala @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.coordination.lease.scaladsl + +import akka.annotation.ApiMayChange +import akka.coordination.lease.LeaseSettings + +import scala.concurrent.Future + +@ApiMayChange +abstract class Lease(val settings: LeaseSettings) { + + /** + * Try to acquire the lease. The returned `Future` will be completed with `true` + * if the lease could be acquired, i.e. no other owner is holding the lease. + * + * The returned `Future` will be completed with `false` if the lease for certain couldn't be + * acquired, e.g. because some other owner is holding it. It's completed with [[akka.coordination.lease.LeaseException]] + * failure if it might not have been able to acquire the lease, e.g. communication timeout + * with the lease resource. + * + * The lease will be held by the [[akka.coordination.lease.LeaseSettings.ownerName]] until it is released + * with [[Lease.release]]. 
A Lease implementation will typically also lose the ownership + * if it can't maintain its authority, e.g. if it crashes or is partitioned from the lease + * resource for too long. + * + * [[Lease.checkLease]] can be used to verify that the owner still has the lease. + */ + def acquire(): Future[Boolean] + + /** + * Same as acquire with an additional callback + * that is called if the lease is lost. The lease can be lose due to being unable + * to communicate with the lease provider. + * Implementations should not call leaseLostCallback until after the returned future + * has been completed + */ + def acquire(leaseLostCallback: Option[Throwable] ⇒ Unit): Future[Boolean] + + /** + * Release the lease so some other owner can acquire it. + */ + def release(): Future[Boolean] + + /** + * Check if the owner still holds the lease. + * `true` means that it certainly holds the lease. + * `false` means that it might not hold the lease, but it could, and for more certain + * response you would have to use [[Lease#acquire()*]] or [[Lease#release]]. + */ + def checkLease(): Boolean + +} diff --git a/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/LeaseProvider.scala b/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/LeaseProvider.scala new file mode 100644 index 0000000000..2ad301b477 --- /dev/null +++ b/akka-coordination/src/main/scala/akka/coordination/lease/scaladsl/LeaseProvider.scala @@ -0,0 +1,92 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.coordination.lease.scaladsl + +import java.util.concurrent.ConcurrentHashMap +import java.util.function.{ Function ⇒ JFunction } + +import scala.collection.immutable +import scala.util.Failure +import scala.util.Success +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.annotation.ApiMayChange +import akka.event.Logging +import akka.coordination.lease.LeaseSettings + +@ApiMayChange +object LeaseProvider extends ExtensionId[LeaseProvider] with ExtensionIdProvider { + override def get(system: ActorSystem): LeaseProvider = super.get(system) + + override def lookup = LeaseProvider + + override def createExtension(system: ExtendedActorSystem): LeaseProvider = new LeaseProvider(system) + + private final case class LeaseKey(leaseName: String, configPath: String, clientName: String) +} + +@ApiMayChange +class LeaseProvider(system: ExtendedActorSystem) extends Extension { + import LeaseProvider.LeaseKey + + private val log = Logging(system, getClass) + private val leases = new ConcurrentHashMap[LeaseKey, Lease]() + + /** + * The configuration define at `configPath` must have a property `lease-class` that defines + * the fully qualified class name of the Lease implementation. + * The class must implement [[Lease]] and have constructor with [[akka.coordination.lease.LeaseSettings]] parameter and + * optionally ActorSystem parameter. + * + * @param leaseName the name of the lease resource + * @param configPath the path of configuration for the lease + * @param ownerName the owner that will `acquire` the lease, e.g. 
hostname and port of the ActorSystem + */ + def getLease(leaseName: String, configPath: String, ownerName: String): Lease = { + val leaseKey = LeaseKey(leaseName, configPath, ownerName) + leases.computeIfAbsent( + leaseKey, + new JFunction[LeaseKey, Lease] { + override def apply(t: LeaseKey): Lease = { + val leaseConfig = system.settings.config + .getConfig(configPath) + .withFallback(system.settings.config.getConfig("akka.coordination.lease")) + loadLease(LeaseSettings(leaseConfig, leaseName, ownerName), configPath) + } + }) + } + + private def loadLease(leaseSettings: LeaseSettings, configPath: String): Lease = { + val fqcn = leaseSettings.leaseConfig.getString("lease-class") + require(fqcn.nonEmpty, "lease-class must not be empty") + val dynamicAccess = system.dynamicAccess + dynamicAccess + .createInstanceFor[Lease]( + fqcn, + immutable.Seq((classOf[LeaseSettings], leaseSettings), (classOf[ExtendedActorSystem], system))) + .recoverWith { + case _: NoSuchMethodException ⇒ + dynamicAccess.createInstanceFor[Lease](fqcn, immutable.Seq((classOf[LeaseSettings], leaseSettings))) + + } match { + case Success(value) ⇒ value + case Failure(e) ⇒ + log.error( + e, + "Invalid lease configuration for leaseName [{}], configPath [{}] lease-class [{}]. " + + "The class must implement Lease and have constructor with LeaseSettings parameter and " + + "optionally ActorSystem parameter.", + leaseSettings.leaseName, + configPath, + fqcn) + throw e + } + } + + // TODO how to clean up a lease? Not important for this use case as we'll only have one lease +} diff --git a/akka-coordination/src/test/java/akka/coordination/lease/javadsl/LeaseProviderTest.java b/akka-coordination/src/test/java/akka/coordination/lease/javadsl/LeaseProviderTest.java new file mode 100644 index 0000000000..67641ac6b2 --- /dev/null +++ b/akka-coordination/src/test/java/akka/coordination/lease/javadsl/LeaseProviderTest.java @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2019 Lightbend Inc. 
+ */ + +package akka.coordination.lease.javadsl; + +import akka.actor.ActorSystem; +import akka.coordination.lease.scaladsl.LeaseProviderSpec; +import akka.testkit.AkkaJUnitActorSystemResource; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class LeaseProviderTest { + @Rule + public AkkaJUnitActorSystemResource actorSystemResource = + new AkkaJUnitActorSystemResource("LoggingAdapterTest", LeaseProviderSpec.config()); + + private ActorSystem system = null; + + @Before + public void before() { + system = actorSystemResource.getSystem(); + } + + @Test + public void loadLeaseImpl() { + Lease leaseA = LeaseProvider.get(system).getLease("a", "lease-a", "owner1"); + + assertEquals(leaseA.getSettings().leaseName(), "a"); + assertEquals(leaseA.getSettings().ownerName(), "owner1"); + assertEquals(leaseA.getSettings().leaseConfig().getString("key1"), "value1"); + + Lease leaseB = LeaseProvider.get(system).getLease("b", "lease-b", "owner2"); + + assertEquals(leaseB.getSettings().leaseName(), "b"); + assertEquals(leaseB.getSettings().ownerName(), "owner2"); + assertEquals(leaseB.getSettings().leaseConfig().getString("key2"), "value2"); + } +} diff --git a/akka-coordination/src/test/java/jdocs/akka/coordination/lease/LeaseDocTest.java b/akka-coordination/src/test/java/jdocs/akka/coordination/lease/LeaseDocTest.java new file mode 100644 index 0000000000..900bbe33ac --- /dev/null +++ b/akka-coordination/src/test/java/jdocs/akka/coordination/lease/LeaseDocTest.java @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package jdocs.akka.coordination.lease; + +import akka.actor.ActorSystem; +import akka.cluster.Cluster; +import akka.coordination.lease.LeaseSettings; +import akka.coordination.lease.javadsl.Lease; +import akka.coordination.lease.javadsl.LeaseProvider; +import akka.testkit.javadsl.TestKit; +import docs.akka.coordination.LeaseDocSpec; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Optional; +import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; + +@SuppressWarnings("unused") +public class LeaseDocTest { + // #lease-example + static class SampleLease extends Lease { + + private LeaseSettings settings; + + public SampleLease(LeaseSettings settings) { + this.settings = settings; + } + + @Override + public LeaseSettings getSettings() { + return settings; + } + + @Override + public CompletionStage acquire() { + return null; + } + + @Override + public CompletionStage acquire(Consumer> leaseLostCallback) { + return null; + } + + @Override + public CompletionStage release() { + return null; + } + + @Override + public boolean checkLease() { + return false; + } + } + // #lease-example + + private static ActorSystem system; + + @BeforeClass + public static void setup() { + system = ActorSystem.create("LeaseDocTest", LeaseDocSpec.config()); + } + + @AfterClass + public static void teardown() { + TestKit.shutdownActorSystem(system); + system = null; + } + + private void doSomethingImportant(Optional leaseLostReason) {} + + @Test + public void beLoadable() { + // #lease-usage + Lease lease = + LeaseProvider.get(system).getLease("", "docs-lease", ""); + CompletionStage acquired = lease.acquire(); + boolean stillAcquired = lease.checkLease(); + CompletionStage released = lease.release(); + // #lease-usage + + // #lost-callback + lease.acquire(this::doSomethingImportant); + // #lost-callback + + // #cluster-owner + String owner = 
Cluster.get(system).selfAddress().hostPort(); + // #cluster-owner + + } +} diff --git a/akka-coordination/src/test/scala/akka/coordination/lease/TimeoutSettingsSpec.scala b/akka-coordination/src/test/scala/akka/coordination/lease/TimeoutSettingsSpec.scala new file mode 100644 index 0000000000..58235b4b6b --- /dev/null +++ b/akka-coordination/src/test/scala/akka/coordination/lease/TimeoutSettingsSpec.scala @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.coordination.lease + +import com.typesafe.config.ConfigFactory +import org.scalatest.{ Matchers, WordSpec } + +import scala.concurrent.duration._ + +class TimeoutSettingsSpec extends WordSpec with Matchers { + private def conf(overrides: String): TimeoutSettings = { + val c = ConfigFactory.parseString(overrides).withFallback(ConfigFactory.load()) + TimeoutSettings(c) + } + "TimeoutSettings" should { + "default heartbeat-interval to heartbeat-timeout / 10" in { + conf(""" + heartbeat-timeout=100s + heartbeat-interval="" + lease-operation-timeout=5s + """).heartbeatInterval shouldEqual 10.second + } + + "have a min of 5s for heartbeat-interval" in { + conf(""" + heartbeat-timeout=40s + heartbeat-interval="" + lease-operation-timeout=5s + """).heartbeatInterval shouldEqual 5.second + } + + "allow overriding of heartbeat-interval" in { + conf(""" + heartbeat-timeout=100s + heartbeat-interval=20s + lease-operation-timeout=5s + """).heartbeatInterval shouldEqual 20.second + } + + "not allow interval to be greater or equal to half the interval" in { + intercept[IllegalArgumentException] { + conf(""" + heartbeat-timeout=100s + heartbeat-interval=50s + lease-operation-timeout=5s + """) + }.getMessage shouldEqual "requirement failed: heartbeat-interval must be less than half heartbeat-timeout" + + } + } + +} diff --git a/akka-coordination/src/test/scala/akka/coordination/lease/scaladsl/LeaseProviderSpec.scala b/akka-coordination/src/test/scala/akka/coordination/lease/scaladsl/LeaseProviderSpec.scala new file mode 100644 index 0000000000..e5ba51e447 --- /dev/null +++ b/akka-coordination/src/test/scala/akka/coordination/lease/scaladsl/LeaseProviderSpec.scala @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2019 Lightbend Inc. 
+ */ + +package akka.coordination.lease.scaladsl + +import scala.concurrent.Future +import akka.actor.ExtendedActorSystem +import akka.coordination.lease.LeaseSettings +import akka.testkit.{ AkkaSpec, EventFilter } +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +object LeaseProviderSpec { + class LeaseA(settings: LeaseSettings) extends Lease(settings) { + override def acquire(): Future[Boolean] = Future.successful(false) + override def release(): Future[Boolean] = Future.successful(false) + override def checkLease(): Boolean = false + override def acquire(callback: Option[Throwable] ⇒ Unit): Future[Boolean] = Future.successful(false) + } + + class LeaseB(settings: LeaseSettings, system: ExtendedActorSystem) extends Lease(settings) { + system.name // warning + override def acquire(): Future[Boolean] = Future.successful(false) + override def release(): Future[Boolean] = Future.successful(false) + override def checkLease(): Boolean = false + override def acquire(callback: Option[Throwable] ⇒ Unit): Future[Boolean] = Future.successful(false) + } + + val config = ConfigFactory.parseString(s""" + lease-a { + lease-class = "${classOf[LeaseProviderSpec.LeaseA].getName}" + key1 = value1 + heartbeat-timeout = 100s + heartbeat-interval = 1s + lease-operation-timeout = 2s + } + + lease-b { + lease-class = "${classOf[LeaseProviderSpec.LeaseB].getName}" + key2 = value2 + heartbeat-timeout = 120s + heartbeat-interval = 1s + lease-operation-timeout = 2s + } + + lease-missing { + } + + lease-unknown { + lease-class = "foo.wrong.ClassName" + heartbeat-timeout = 120s + heartbeat-interval = 1s + lease-operation-timeout = 2s + } + + lease-fallback-to-defaults { + lease-class = "${classOf[LeaseProviderSpec.LeaseA].getName}" + } + + """) +} + +class LeaseProviderSpec extends AkkaSpec(LeaseProviderSpec.config) { + import LeaseProviderSpec._ + + "LeaseProvider" must { + + "load lease implementation" in { + val leaseA = LeaseProvider(system).getLease("a", "lease-a", "owner1") + leaseA.getClass should ===(classOf[LeaseA]) + leaseA.settings.leaseName should ===("a") + leaseA.settings.ownerName should ===("owner1") + leaseA.settings.leaseConfig.getString("key1") should ===("value1") + leaseA.settings.timeoutSettings.heartbeatTimeout should ===(100.seconds) + leaseA.settings.timeoutSettings.heartbeatInterval should ===(1.seconds) + leaseA.settings.timeoutSettings.operationTimeout should ===(2.seconds) + + val leaseB = LeaseProvider(system).getLease("b", "lease-b", "owner2") + leaseB.getClass should ===(classOf[LeaseB]) + leaseB.settings.leaseName should ===("b") + leaseB.settings.ownerName should ===("owner2") + leaseB.settings.leaseConfig.getString("key2") should ===("value2") + } + + "load defaults for timeouts if not specified" in { + val defaults = LeaseProvider(system).getLease("a", "lease-fallback-to-defaults", "owner1") + defaults.settings.timeoutSettings.operationTimeout should ===(5.seconds) + defaults.settings.timeoutSettings.heartbeatTimeout should ===(120.seconds) + defaults.settings.timeoutSettings.heartbeatInterval should ===(12.seconds) + } + + "return same instance for same leaseName, configPath and owner" in { + val leaseA1 = LeaseProvider(system).getLease("a2", "lease-a", "owner1") + val leaseA2 = LeaseProvider(system).getLease("a2", "lease-a", "owner1") + leaseA1 shouldBe theSameInstanceAs(leaseA2) + } + + "return different instance for different leaseName" in { + val leaseA1 = LeaseProvider(system).getLease("a3", "lease-a", "owner1") + val leaseA2 = 
LeaseProvider(system).getLease("a3b", "lease-a", "owner1") + leaseA1 should not be theSameInstanceAs(leaseA2) + } + + "return different instance for different ownerName" in { + val leaseA1 = LeaseProvider(system).getLease("a4", "lease-a", "owner1") + val leaseA2 = LeaseProvider(system).getLease("a4", "lease-a", "owner2") + leaseA1 should not be theSameInstanceAs(leaseA2) + } + + "throw if missing lease-class config" in { + intercept[IllegalArgumentException] { + LeaseProvider(system).getLease("x", "lease-missing", "owner1") + }.getMessage should include("lease-class must not be empty") + } + + "throw if unknown lease-class config" in { + intercept[ClassNotFoundException] { + EventFilter[ClassNotFoundException](occurrences = 1).intercept { + LeaseProvider(system).getLease("x", "lease-unknown", "owner1") + } + } + } + + } + +} diff --git a/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala b/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala new file mode 100644 index 0000000000..201f940f39 --- /dev/null +++ b/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package docs.akka.coordination + +import akka.cluster.Cluster +import akka.coordination.lease.LeaseSettings +import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider } +import akka.testkit.AkkaSpec +import com.typesafe.config.ConfigFactory + +import scala.concurrent.Future + +//#lease-example +class SampleLease(settings: LeaseSettings) extends Lease(settings) { + + override def acquire(): Future[Boolean] = { + Future.successful(true) + } + + override def acquire(leaseLostCallback: Option[Throwable] => Unit): Future[Boolean] = { + Future.successful(true) + } + + override def release(): Future[Boolean] = { + Future.successful(true) + } + + override def checkLease(): Boolean = { + true + } +} +//#lease-example + +object LeaseDocSpec { + + val config = ConfigFactory.parseString(""" + #lease-config + akka.actor.provider = cluster + docs-lease { + lease-class = "docs.akka.coordination.SampleLease" + heartbeat-timeout = 100s + heartbeat-interval = 1s + lease-operation-timeout = 1s + # Any lease specific configuration + } + #lease-config + """.stripMargin) + + def blackhole(stuff: Any*): Unit = { + stuff.toString + () + } + def doSomethingImportant(leaseLostReason: Option[Throwable]): Unit = { + leaseLostReason.map(_.toString) + () + } +} + +class LeaseDocSpec extends AkkaSpec(LeaseDocSpec.config) { + import LeaseDocSpec._ + + "A docs lease" should { + "be loadable" in { + + //#lease-usage + val lease = LeaseProvider(system).getLease("", "docs-lease", "owner") + val acquired: Future[Boolean] = lease.acquire() + val stillAcquired: Boolean = lease.checkLease() + val released: Future[Boolean] = lease.release() + //#lease-usage + + //#lost-callback + lease.acquire(leaseLostReason => doSomethingImportant(leaseLostReason)) + //#lost-callback + + //#cluster-owner + val owner = Cluster(system).selfAddress.hostPort + //#cluster-owner + + // remove compiler warnings + blackhole(acquired, stillAcquired, released, owner) + + } + } + +} diff --git a/akka-docs/src/main/paradox/cluster-sharding.md b/akka-docs/src/main/paradox/cluster-sharding.md index bc34ceaff5..e6270401f4 100644 --- a/akka-docs/src/main/paradox/cluster-sharding.md +++ b/akka-docs/src/main/paradox/cluster-sharding.md @@ -517,3 +517,24 @@ When doing rolling upgrades special care must be taken to not change any of the * the persistence mode If any 
one of these needs a change it will require a full cluster restart.
+
+
+## Lease
+
+A @ref[lease](coordination.md) can be used as an additional safety measure to ensure a shard
+does not run on two nodes.
+
+Reasons this can happen:
+
+* Network partitions without an appropriate downing provider
+* Mistakes in the deployment process leading to two separate Akka Clusters
+* Timing issues between removing members from the Cluster on one side of a network partition and shutting them down on the other side
+
+A lease can act as a final backup, meaning each shard won't create its child entity actors unless it holds the lease.
+
+To use a lease for sharding, set `akka.cluster.sharding.use-lease` to the configuration location
+of the lease to use. Each shard will try to acquire a lease with the name `-shard--` and
+the owner is set to `Cluster(system).selfAddress.hostPort`.
+
+If a shard can't acquire a lease it will remain uninitialized, so messages for the entities it owns will
+be buffered in the `ShardRegion`. If the lease is lost after initialization the Shard will be terminated.
diff --git a/akka-docs/src/main/paradox/cluster-singleton.md b/akka-docs/src/main/paradox/cluster-singleton.md
index 50189f0488..e5b43c564c 100644
--- a/akka-docs/src/main/paradox/cluster-singleton.md
+++ b/akka-docs/src/main/paradox/cluster-singleton.md
@@ -184,3 +184,23 @@ Scala
 Java
 : @@snip [ClusterSingletonSupervision.java](/akka-docs/src/test/java/jdocs/cluster/singleton/ClusterSingletonSupervision.java) { #singleton-supervisor-actor-usage-imports }
   @@snip [ClusterSingletonSupervision.java](/akka-docs/src/test/java/jdocs/cluster/singleton/ClusterSingletonSupervision.java) { #singleton-supervisor-actor-usage }
+
+## Lease
+
+A @ref[lease](coordination.md) can be used as an additional safety measure to ensure that two singletons
+don't run at the same time. Reasons this can happen:
+
+* Network partitions without an appropriate downing provider
+* Mistakes in the deployment process leading to two separate Akka Clusters
+* Timing issues between removing members from the Cluster on one side of a network partition and shutting them down on the other side
+
+A lease can act as a final backup, meaning the singleton actor won't be created unless
+the lease can be acquired.
+
+To use a lease for the singleton, set `akka.cluster.singleton.use-lease` to the configuration location
+of the lease to use. A lease with the name `-singleton-` is used and
+the owner is set to the @scala[`Cluster(system).selfAddress.hostPort`]@java[`Cluster.get(system).selfAddress().hostPort()`].
+
+If the cluster singleton manager can't acquire the lease it will keep retrying while it is the oldest node in the cluster.
+If the lease is lost, the singleton actor will be terminated and then acquiring the lease will be retried.
+
diff --git a/akka-docs/src/main/paradox/coordination.md b/akka-docs/src/main/paradox/coordination.md
new file mode 100644
index 0000000000..0ba40cbc6e
--- /dev/null
+++ b/akka-docs/src/main/paradox/coordination.md
@@ -0,0 +1,112 @@
+# Coordination
+
+@@@ warning
+
+This module is currently marked as @ref:[may change](common/may-change.md). It is ready to be used
+in production but the API may change without warning or a deprecation period.
+
+@@@
+
+Akka Coordination is a set of tools for distributed coordination.
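+
+At its core the module provides a pluggable lease. As a rough sketch of how a lease is loaded and used
+(the object name and the `"my-lease"`, `"docs-lease"` and `"node-1"` values are illustrative placeholders,
+not part of the module):
+
+```scala
+import akka.actor.ActorSystem
+import akka.coordination.lease.scaladsl.LeaseProvider
+import scala.concurrent.Future
+
+object LeaseSketch {
+  def example(system: ActorSystem): Unit = {
+    // The provider loads the implementation named by `lease-class` at the given config path.
+    val lease = LeaseProvider(system).getLease(
+      leaseName = "my-lease",    // the same name always refers to the same lease, even across nodes
+      configPath = "docs-lease", // config block whose lease-class selects the implementation
+      ownerName = "node-1")      // only one owner can hold the lease at a time
+
+    val acquired: Future[Boolean] = lease.acquire() // asynchronous, backed by a third party system
+    val stillHeld: Boolean = lease.checkLease()     // synchronous check that the lease is still held
+    val released: Future[Boolean] = lease.release()
+  }
+}
+```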
+
+## Dependency
+
+@@dependency[sbt,Gradle,Maven] {
+  group="com.typesafe.akka"
+  artifact="akka-coordination_$scala.binary_version$"
+  version="$akka.version$"
+}
+
+## Lease
+
+The lease is a pluggable API for a distributed lock.
+
+## Using a lease
+
+Leases are loaded with:
+
+* Lease name
+* Config location to indicate which implementation should be loaded
+* Owner name
+
+Any lease implementation should provide the following guarantees:
+
+* A lease with the same name loaded multiple times, even on different nodes, is the same lease
+* Only one owner can acquire the lease at a time
+
+To acquire a lease:
+
+Scala
+: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #lease-usage }
+
+Java
+: @@snip [LeaseDocTest.java](/akka-coordination/src/test/java/jdocs/akka/coordination/lease/LeaseDocTest.java) { #lease-usage }
+
+Acquiring a lease returns a @scala[Future]@java[CompletionStage] as lease implementations are typically backed
+by a third party system such as the Kubernetes API server or ZooKeeper.
+
+Once a lease is acquired, `checkLease` can be called to verify that the lease is still held. As lease implementations
+are backed by other distributed systems, a lease can be lost due to a timeout with the third party system. This operation is
+not asynchronous, so it can be called before performing any action for which holding the lease is important.
+
+A lease has an owner. If the same owner tries to acquire the lease multiple times it will succeed, i.e. leases are reentrant.
+
+It is important to pick a lease name that will be unique for your use case. If a lease needs to be unique for each node
+in a Cluster, the cluster host and port can be used:
+
+Scala
+: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #cluster-owner }
+
+Java
+: @@snip [LeaseDocTest.java](/akka-coordination/src/test/java/jdocs/akka/coordination/lease/LeaseDocTest.java) { #cluster-owner }
+
+For use cases where multiple different leases are needed on the same node, something unique must be added to the name. For example,
+a lease can be used with Cluster Sharding, in which case the shard id is included in the lease name for each shard.
+
+## Usages in other Akka modules
+
+Leases can be used for @ref[Cluster Singletons](cluster-singleton.md#lease) and @ref[Cluster Sharding](cluster-sharding.md#lease).
+
+## Lease implementations
+
+* [Kubernetes API](https://developer.lightbend.com/docs/akka-commercial-addons/current/kubernetes-lease.html)
+
+## Implementing a lease
+
+Implementations should extend
+the @scala[`akka.coordination.lease.scaladsl.Lease`]@java[`akka.coordination.lease.javadsl.Lease`] class:
+
+Scala
+: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #lease-example }
+
+Java
+: @@snip [LeaseDocTest.java](/akka-coordination/src/test/java/jdocs/akka/coordination/lease/LeaseDocTest.java) { #lease-example }
+
+The methods should provide the following guarantees:
+
+* `acquire` should complete with: `true` if the lease has been acquired, `false` if the lease is taken by another owner, or fail if it can't communicate with the third party system implementing the lease.
+* `release` should complete with: `true` if the lease has definitely been released, `false` if the lease has definitely not been released, or fail if it is unknown if the lease has been released.
+* `checkLease` should return false until an `acquire` @scala[Future]@java[CompletionStage] has completed and should return `false` if the lease is lost due to an error communicating with the third party. Check lease should also not block. +* The `acquire` lease lost callback should only be called after an `aquire` @scala[Future]@java[CompletionStage] has completed and should be called if the lease is lose e.g. due to losing communication with the third party system. + +In addition it is expected that a lease implementation will include a time to live mechanism meaning that a lease won't be held for ever in case the node crashes. +If a user prefers to have outside intervention in this case for maximum safety then the time to live can be set to infinite. + +The configuration must define the `lease-class` property for the FQCN of the lease implementation. + +The lease implementation should have support for the following properties where the defaults come from `akka.coordination.lease`: + +@@snip [reference.conf](/akka-coordination/src/main/resources/reference.conf) { #defaults } + +This configuration location is passed into `getLease`. + +Scala +: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #lease-config } + +Java +: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #lease-config } + + + + + diff --git a/akka-docs/src/main/paradox/index.md b/akka-docs/src/main/paradox/index.md index 2868099797..3896f0162c 100644 --- a/akka-docs/src/main/paradox/index.md +++ b/akka-docs/src/main/paradox/index.md @@ -13,6 +13,7 @@ * [stream/index](stream/index.md) * [index-network](index-network.md) * [discovery](discovery/index.md) +* [coordination](coordination.md) * [index-futures](index-futures.md) * [index-utilities](index-utilities.md) * [common/other-modules](common/other-modules.md) diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala index 05ac416faa..47539b4c8b 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -580,7 +580,7 @@ private[akka] class BarrierCoordinator with LoggingFSM[BarrierCoordinator.State, BarrierCoordinator.Data] { import BarrierCoordinator._ import Controller._ - import FSM.`->` + import FSM._ // this shall be set to true if all subsequent barriers shall fail var failed = false diff --git a/build.sbt b/build.sbt index 54e3060d50..c8e4609e59 100644 --- a/build.sbt +++ b/build.sbt @@ -6,8 +6,9 @@ enablePlugins(UnidocRoot, TimeStampede, UnidocWithPrValidation, NoPublish, Copyr JavaFormatterPlugin) disablePlugins(MimaPlugin) addCommandAlias( - name ="fixall", + name = "fixall", value = ";scalafixEnable;compile:scalafix;test:scalafix;multi-jvm:scalafix;test:compile;reload") + import akka.AkkaBuild._ import akka.{AkkaBuild, Dependencies, GitHub, OSGi, Protobuf, SigarLoader, VersionGenerator} import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm @@ -48,7 +49,8 @@ lazy val aggregatedProjects: Seq[ProjectReference] = Seq( clusterTyped, clusterShardingTyped, benchJmhTyped, streamTyped, - discovery + discovery, + coordination ) lazy val root = Project( @@ -134,7 +136,7 @@ lazy val camel = akkaModule("akka-camel") .settings(OSGi.camel) lazy val cluster = akkaModule("akka-cluster") - .dependsOn(remote, remoteTests % 
"test->test" , testkit % "test->test") + .dependsOn(remote, remoteTests % "test->test", testkit % "test->test") .settings(Dependencies.cluster) .settings(AutomaticModuleName.settings("akka.cluster")) .settings(OSGi.cluster) @@ -168,7 +170,7 @@ lazy val clusterSharding = akkaModule("akka-cluster-sharding") cluster % "compile->compile;test->test;multi-jvm->multi-jvm", distributedData, persistence % "compile->compile", - clusterTools + clusterTools % "compile->compile;test->test" ) .settings(Dependencies.clusterSharding) .settings(AutomaticModuleName.settings("akka.cluster.sharding")) @@ -178,7 +180,10 @@ lazy val clusterSharding = akkaModule("akka-cluster-sharding") .enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams) lazy val clusterTools = akkaModule("akka-cluster-tools") - .dependsOn(cluster % "compile->compile;test->test;multi-jvm->multi-jvm") + .dependsOn( + cluster % "compile->compile;test->test;multi-jvm->multi-jvm", + coordination + ) .settings(Dependencies.clusterTools) .settings(AutomaticModuleName.settings("akka.cluster.tools")) .settings(OSGi.clusterTools) @@ -192,16 +197,17 @@ lazy val contrib = akkaModule("akka-contrib") .settings(AutomaticModuleName.settings("akka.contrib")) .settings(OSGi.contrib) .settings( - description := """| - |This subproject provides a home to modules contributed by external - |developers which may or may not move into the officially supported code - |base over time. A module in this subproject doesn't have to obey the rule - |of staying binary compatible between minor releases. Breaking API changes - |may be introduced in minor releases without notice as we refine and - |simplify based on your feedback. A module may be dropped in any release - |without prior deprecation. The Lightbend subscription does not cover - |support for these modules. - |""".stripMargin + description := + """| + |This subproject provides a home to modules contributed by external + |developers which may or may not move into the officially supported code + |base over time. A module in this subproject doesn't have to obey the rule + |of staying binary compatible between minor releases. Breaking API changes + |may be introduced in minor releases without notice as we refine and + |simplify based on your feedback. A module may be dropped in any release + |without prior deprecation. The Lightbend subscription does not cover + |support for these modules. + |""".stripMargin ) .configs(MultiJvm) .enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams) @@ -234,7 +240,7 @@ lazy val docs = akkaModule("akka-docs") ) .settings(Dependencies.docs) .settings( - name in (Compile, paradox) := "Akka", + name in(Compile, paradox) := "Akka", Compile / paradoxProperties ++= Map( "canonical.base_url" -> "https://doc.akka.io/docs/akka/current", "github.base_url" -> GitHub.url(version.value), // for links like this: @github[#1](#1) or @github[83986f9](83986f9) @@ -325,7 +331,7 @@ lazy val persistenceTck = akkaModule("akka-persistence-tck") .dependsOn(persistence % "compile->compile;test->test", testkit % "compile->compile;test->test") .settings(Dependencies.persistenceTck) .settings(AutomaticModuleName.settings("akka.persistence.tck")) -//.settings(OSGi.persistenceTck) TODO: we do need to export this as OSGi bundle too? + //.settings(OSGi.persistenceTck) TODO: we do need to export this as OSGi bundle too? 
.settings( fork in Test := true ) @@ -414,7 +420,8 @@ lazy val actorTyped = akkaModule("akka-actor-typed") .settings(OSGi.actorTyped) .settings(AkkaBuild.noScala211) .settings( - initialCommands := """ + initialCommands := + """ import akka.actor.typed._ import akka.actor.typed.scaladsl.Behaviors import scala.concurrent._ @@ -474,7 +481,7 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed") .settings(AkkaBuild.noScala211) .settings(AutomaticModuleName.settings("akka.cluster.sharding.typed")) // To be able to import ContainerFormats.proto - .settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf" )) + .settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf")) .disablePlugins(MimaPlugin) .configs(MultiJvm) .enablePlugins(MultiNodeScalaTest) @@ -520,6 +527,20 @@ lazy val discovery = akkaModule("akka-discovery") .settings(AutomaticModuleName.settings("akka.discovery")) .settings(OSGi.discovery) +lazy val coordination = akkaModule("akka-coordination") + .dependsOn( + actor, + testkit % "test->test", + actorTests % "test->test", + cluster % "test->test" + ) + .settings(Dependencies.coordination) + .settings(AutomaticModuleName.settings("akka.coordination")) + .settings(OSGi.coordination) + .settings(AkkaBuild.mayChangeSettings) + .disablePlugins(MimaPlugin) + + def akkaModule(name: String): Project = Project(id = name, base = file(name)) .enablePlugins(ReproducibleBuildsPlugin) @@ -541,7 +562,6 @@ addCommandAlias("allActor", commandValue(actor, Some(actorTests))) addCommandAlias("allRemote", commandValue(remote, Some(remoteTests))) addCommandAlias("allClusterCore", commandValue(cluster)) addCommandAlias("allClusterMetrics", commandValue(clusterMetrics)) -addCommandAlias("allDistributedData", commandValue(distributedData)) addCommandAlias("allClusterSharding", commandValue(clusterSharding)) addCommandAlias("allClusterTools", commandValue(clusterTools)) addCommandAlias("allCluster", Seq( @@ -549,6 +569,8 @@ addCommandAlias("allCluster", Seq( commandValue(distributedData), commandValue(clusterSharding), commandValue(clusterTools)).mkString) +addCommandAlias("allCoordination", commandValue(coordination)) +addCommandAlias("allDistributedData", commandValue(distributedData)) addCommandAlias("allPersistence", commandValue(persistence)) addCommandAlias("allStream", commandValue(stream, Some(streamTests))) addCommandAlias("allDiscovery", commandValue(discovery)) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 4df8d13466..cbfc449c55 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -30,7 +30,7 @@ object AkkaBuild { parallelExecution in GlobalScope := System.getProperty("akka.parallelExecution", parallelExecutionByDefault.toString).toBoolean, version in ThisBuild := "2.5-SNAPSHOT" ) - + lazy val mayChangeSettings = Seq( description := """|This module of Akka is marked as |'may change', which means that it is in early diff --git a/project/AkkaDisciplinePlugin.scala b/project/AkkaDisciplinePlugin.scala index 8760bbec03..d4c9bf20fb 100644 --- a/project/AkkaDisciplinePlugin.scala +++ b/project/AkkaDisciplinePlugin.scala @@ -24,12 +24,14 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport { val fatalWarningsFor = Set( "akka-discovery", "akka-distributed-data", - "akka-protobuf", + "akka-coordination", + "akka-protobuf" ) val strictProjects = Set( "akka-discovery", "akka-protobuf", + "akka-coordination" ) lazy val 
scalaFixSettings = Seq( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 12aee07fad..0778ec7482 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -150,6 +150,8 @@ object Dependencies { val discovery = l ++= Seq(Test.junit, Test.scalatest.value) + val coordination = l ++= Seq(Test.junit, Test.scalatest.value) + val testkit = l ++= Seq(Test.junit, Test.scalatest.value) ++ Test.metricsAll val actorTests = l ++= Seq( diff --git a/project/OSGi.scala b/project/OSGi.scala index 95b3a292af..cf6e6b0c2a 100644 --- a/project/OSGi.scala +++ b/project/OSGi.scala @@ -133,6 +133,8 @@ object OSGi { val discovery = exports(Seq("akka.discovery.*")) + val coordination = exports(Seq("akka.coordination.*")) + val osgiOptionalImports = Seq( // needed because testkit is normally not used in the application bundle, // but it should still be included as transitive dependency and used by BundleDelegatingClassLoader
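
To make the lease contract described in the coordination documentation above more concrete, here is a minimal, in-memory sketch of a custom lease implementation. It assumes a `Lease` base class and `LeaseSettings` along the lines of `akka.coordination.lease.scaladsl.Lease` and `akka.coordination.lease.LeaseSettings`, with a single `LeaseSettings` constructor parameter; the exact package names, constructor shape, and method signatures may differ in the final API, and a real implementation would coordinate through a third-party system rather than a local flag.

```scala
package docs

import java.util.concurrent.atomic.AtomicBoolean

import scala.concurrent.Future

// Assumed API locations; adjust to the actual akka-coordination packages.
import akka.coordination.lease.LeaseSettings
import akka.coordination.lease.scaladsl.Lease

// Illustrative only: a single-JVM "lease" backed by a local flag.
class SampleLease(settings: LeaseSettings) extends Lease(settings) {

  // Whether this owner currently believes it holds the lease.
  private val held = new AtomicBoolean(false)

  // Acquiring is idempotent for the same owner; a real implementation would
  // contend with other owners via an external system and may not succeed.
  override def acquire(): Future[Boolean] = {
    held.set(true)
    Future.successful(true)
  }

  // The callback should only be invoked after an acquire has completed, and
  // only if the lease is subsequently lost (e.g. lost connectivity to the
  // third-party system). It is never invoked by this in-memory sketch.
  override def acquire(leaseLostCallback: Option[Throwable] => Unit): Future[Boolean] =
    acquire()

  override def release(): Future[Boolean] = {
    held.set(false)
    Future.successful(true)
  }

  // Must not block, and must return false until an acquire has completed
  // or once the lease has been lost.
  override def checkLease(): Boolean = held.get()
}
```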
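
A hedged usage sketch for loading and acquiring such a lease through the provider extension follows. The config path (`docs-lease`), the property names under it, and a `LeaseProvider(system).getLease(leaseName, configPath, ownerName)` signature are assumptions based on the documentation above, not a definitive API.

```scala
import akka.actor.ActorSystem
import akka.coordination.lease.scaladsl.LeaseProvider // assumed location
import com.typesafe.config.ConfigFactory

object LeaseUsage extends App {
  // Hypothetical config block: lease-class points at the sketch above; the
  // remaining properties override the assumed akka.coordination.lease defaults.
  val config = ConfigFactory.parseString("""
    docs-lease {
      lease-class = "docs.SampleLease"
      heartbeat-timeout = 120s
      heartbeat-interval = 12s
      lease-operation-timeout = 5s
    }
    """).withFallback(ConfigFactory.load())

  implicit val system: ActorSystem = ActorSystem("lease-docs", config)
  import system.dispatcher

  // "docs-lease" is the configuration location that is passed into getLease.
  val lease = LeaseProvider(system).getLease("my-resource", "docs-lease", "example-owner")

  lease.acquire().foreach { granted =>
    // checkLease should keep returning true for as long as the lease is held.
    if (granted && lease.checkLease())
      system.log.info("Lease acquired; safe to start guarded work")
    else
      system.log.info("Lease not granted")
  }
}
```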