Lease API + use in cluster singleton and sharding, #26480 (#26629)

* lease api
* Cluster singleton manager with lease
* Refactor OldestData to use option for actor reference
* Sharding with lease
* Docs for singleton and sharding lease + config for sharding lease
* Have ddata shard wait until lease is acquired before getting state
This commit is contained in:
Christopher Batey 2019-03-28 13:24:46 +01:00 committed by Patrik Nordwall
parent 777173f988
commit 65ccada280
41 changed files with 2389 additions and 159 deletions
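Before the diffs: the changes below add the new akka-coordination module with a Lease API and wire it into cluster sharding and the cluster singleton manager. As a rough orientation, here is an editor's sketch of the user-facing surface as it appears in these diffs; the "docs-lease" config path, the lease name, and the owner are placeholders, not part of this commit.

import akka.actor.ActorSystem
import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider }
import scala.concurrent.Future

// Obtain a lease by (lease name, config path of the implementation, owner name) and acquire it.
// The callback passed to acquire is invoked if the lease is later lost.
def acquireSketch(system: ActorSystem): Future[Boolean] = {
  val lease: Lease = LeaseProvider(system).getLease(
    s"${system.name}-my-singleton", // lease name (placeholder)
    "docs-lease",                   // config path of the lease implementation (placeholder)
    "node-1")                       // owner name (placeholder)
  lease.acquire(reason => system.log.warning("Lease lost: {}", reason))
}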

View file

@ -878,6 +878,7 @@ private[akka] class ActorSystemImpl(
"akka-cluster-sharding-typed",
"akka-cluster-tools",
"akka-cluster-typed",
"akka-coordination",
"akka-discovery",
"akka-distributed-data",
"akka-multi-node-testkit",

View file

@ -0,0 +1,4 @@
# Lease API #26468
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.DDataShard.initialized")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.PersistentShard.initialized")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard.initialized")

View file

@ -133,6 +133,7 @@ akka.cluster.sharding {
# Settings for the coordinator singleton. Same layout as akka.cluster.singleton.
# The "role" of the singleton configuration is not used. The singleton role will
# be the same as "akka.cluster.sharding.role".
# A lease can be configured in these settings for the coordinator singleton
coordinator-singleton = ${akka.cluster.singleton}
# Settings for the Distributed Data replicator.
@ -161,6 +162,14 @@ akka.cluster.sharding {
# This dispatcher for the entity actors is defined by the user provided
# Props, i.e. this dispatcher is not used for the entity actors.
use-dispatcher = ""
# Config path of the lease that each shard must acquire before starting entity actors
# default is no lease
# A lease can also be used for the singleton coordinator by setting it in the coordinator-singleton properties
use-lease = ""
# The interval between retries for acquiring the lease
lease-retry-interval = 5s
}
# //#sharding-ext-config
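For context, a sketch (not part of this commit) of an application config that enables these new settings; "docs-lease" and com.example.DocsLease are hypothetical, and the keys in the lease block mirror the test-lease blocks used in the specs further down.

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object ShardingLeaseConfigSketch extends App {
  val config = ConfigFactory.parseString("""
    akka.actor.provider = cluster
    akka.cluster.sharding {
      use-lease = "docs-lease"      # config path of the lease (placeholder)
      lease-retry-interval = 5s
    }
    docs-lease {
      lease-class = "com.example.DocsLease"  # hypothetical Lease implementation
      heartbeat-interval = 1s
      heartbeat-timeout = 120s
      lease-operation-timeout = 3s
    }
  """).withFallback(ConfigFactory.load())

  val system = ActorSystem("sharding-lease-sketch", config)
}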

View file

@ -8,9 +8,9 @@ import scala.concurrent.duration._
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import akka.actor.NoSerializationVerificationNeeded
import akka.annotation.InternalApi
import akka.annotation.{ ApiMayChange, InternalApi }
import com.typesafe.config.Config
import akka.cluster.Cluster
import akka.cluster.{ Cluster, ClusterLeaseSettings }
import akka.cluster.singleton.ClusterSingletonManagerSettings
import akka.util.JavaDurationConverters._
@ -59,6 +59,11 @@ object ClusterShardingSettings {
if (config.getString("passivate-idle-entity-after").toLowerCase == "off") Duration.Zero
else config.getDuration("passivate-idle-entity-after", MILLISECONDS).millis
val lease = config.getString("use-lease") match {
case s if s.isEmpty => None
case other => Some(new ClusterLeaseSettings(other, config.getDuration("lease-retry-interval").asScala))
}
new ClusterShardingSettings(
role = roleOption(config.getString("role")),
rememberEntities = config.getBoolean("remember-entities"),
@ -67,7 +72,8 @@ object ClusterShardingSettings {
stateStoreMode = config.getString("state-store-mode"),
passivateIdleEntityAfter = passivateIdleAfter,
tuningParameters,
coordinatorSingletonSettings)
coordinatorSingletonSettings,
lease)
}
/**
@ -213,9 +219,31 @@ final class ClusterShardingSettings(
val stateStoreMode: String,
val passivateIdleEntityAfter: FiniteDuration,
val tuningParameters: ClusterShardingSettings.TuningParameters,
val coordinatorSingletonSettings: ClusterSingletonManagerSettings)
val coordinatorSingletonSettings: ClusterSingletonManagerSettings,
val leaseSettings: Option[ClusterLeaseSettings])
extends NoSerializationVerificationNeeded {
// bin compat for 2.5.21
def this(
role: Option[String],
rememberEntities: Boolean,
journalPluginId: String,
snapshotPluginId: String,
stateStoreMode: String,
passivateIdleEntityAfter: FiniteDuration,
tuningParameters: ClusterShardingSettings.TuningParameters,
coordinatorSingletonSettings: ClusterSingletonManagerSettings) =
this(
role,
rememberEntities,
journalPluginId,
snapshotPluginId,
stateStoreMode,
passivateIdleEntityAfter,
tuningParameters,
coordinatorSingletonSettings,
None)
// included for binary compatibility reasons
@deprecated(
"Use the ClusterShardingSettings factory methods or the constructor including passivateIdleEntityAfter instead",
@ -273,6 +301,10 @@ final class ClusterShardingSettings(
def withPassivateIdleAfter(duration: java.time.Duration): ClusterShardingSettings =
copy(passivateIdleAfter = duration.asScala)
@ApiMayChange
def withLeaseSettings(leaseSettings: ClusterLeaseSettings): ClusterShardingSettings =
copy(leaseSettings = Some(leaseSettings))
/**
* The `role` of the `ClusterSingletonManagerSettings` is not used. The `role` of the
* coordinator singleton will be the same as the `role` of `ClusterShardingSettings`.
@ -289,8 +321,8 @@ final class ClusterShardingSettings(
stateStoreMode: String = stateStoreMode,
passivateIdleAfter: FiniteDuration = passivateIdleEntityAfter,
tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters,
coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings)
: ClusterShardingSettings =
coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings,
leaseSettings: Option[ClusterLeaseSettings] = leaseSettings): ClusterShardingSettings =
new ClusterShardingSettings(
role,
rememberEntities,
@ -299,5 +331,6 @@ final class ClusterShardingSettings(
stateStoreMode,
passivateIdleAfter,
tuningParameters,
coordinatorSingletonSettings)
coordinatorSingletonSettings,
leaseSettings)
}
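With use-lease set in configuration, ClusterShardingSettings(system) now carries leaseSettings, and every shard of a region started with those settings acquires a lease before starting entity actors. A minimal sketch, mirroring the spec further down (the entity Props and extractors are placeholders):

import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings, ShardRegion }

def startRegionWithLease(
    system: ActorSystem,
    entityProps: Props,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId): ActorRef =
  ClusterSharding(system).start(
    typeName = "echo",
    entityProps = entityProps,
    settings = ClusterShardingSettings(system), // leaseSettings read from use-lease
    extractEntityId = extractEntityId,
    extractShardId = extractShardId)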

View file

@ -6,28 +6,37 @@ package akka.cluster.sharding
import java.net.URLEncoder
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Deploy
import akka.actor.Props
import akka.actor.Terminated
import akka.actor.Actor
import akka.actor.{
Actor,
ActorLogging,
ActorRef,
ActorSystem,
DeadLetterSuppression,
Deploy,
NoSerializationVerificationNeeded,
Props,
Stash,
Terminated,
Timers
}
import akka.util.{ ConstantFun, MessageBufferMap }
import scala.concurrent.Future
import akka.cluster.Cluster
import akka.cluster.ddata.ORSet
import akka.cluster.ddata.ORSetKey
import akka.cluster.ddata.Replicator._
import akka.actor.Stash
import akka.persistence._
import akka.actor.NoSerializationVerificationNeeded
import akka.util.PrettyDuration._
import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider }
import akka.pattern.pipe
import scala.concurrent.duration._
import akka.cluster.sharding.ShardRegion.ShardInitialized
/**
* INTERNAL API
*
* @see [[ClusterSharding$ ClusterSharding extension]]
*/
private[akka] object Shard {
@ -81,6 +90,12 @@ private[akka] object Shard {
@SerialVersionUID(1L) final case class ShardStats(shardId: ShardRegion.ShardId, entityCount: Int)
extends ClusterShardingSerializable
final case class LeaseAcquireResult(acquired: Boolean, reason: Option[Throwable]) extends DeadLetterSuppression
final case class LeaseLost(reason: Option[Throwable]) extends DeadLetterSuppression
final case object LeaseRetry extends DeadLetterSuppression
private val LeaseRetryTimer = "lease-retry"
object State {
val Empty = State()
}
@ -154,7 +169,8 @@ private[akka] class Shard(
extractShardId: ShardRegion.ExtractShardId,
handOffStopMessage: Any)
extends Actor
with ActorLogging {
with ActorLogging
with Timers {
import ShardRegion.{ handOffStopperProps, EntityId, Msg, Passivate, ShardInitialized }
import ShardCoordinator.Internal.{ HandOff, ShardStopped }
@ -180,15 +196,82 @@ private[akka] class Shard(
None
}
initialized()
val lease = settings.leaseSettings.map(
ls =>
LeaseProvider(context.system).getLease(
s"${context.system.name}-shard-$typeName-$shardId",
ls.leaseImplementation,
Cluster(context.system).selfAddress.hostPort))
def initialized(): Unit = context.parent ! ShardInitialized(shardId)
val leaseRetryInterval = settings.leaseSettings match {
case Some(l) => l.leaseRetryInterval
case None => 5.seconds // not used
}
override def preStart(): Unit = {
acquireLeaseIfNeeded()
}
/**
* Will call onLeaseAcquired when completed, also when lease isn't used
*/
def acquireLeaseIfNeeded(): Unit = {
lease match {
case Some(l) =>
tryGetLease(l)
context.become(awaitingLease())
case None =>
onLeaseAcquired()
}
}
// Override to execute logic once the lease has been acquired
// Will be called on the actor thread
def onLeaseAcquired(): Unit = {
log.debug("Shard initialized")
context.parent ! ShardInitialized(shardId)
context.become(receiveCommand)
}
private def tryGetLease(l: Lease) = {
log.info("Acquiring lease {}", l.settings)
pipe(l.acquire(reason => self ! LeaseLost(reason)).map(r => LeaseAcquireResult(r, None)).recover {
case t => LeaseAcquireResult(acquired = false, Some(t))
}).to(self)
}
def processChange[E <: StateChange](event: E)(handler: E => Unit): Unit =
handler(event)
def receive = receiveCommand
// Don't send back ShardInitialized so that messages are buffered in the ShardRegion
// while awaiting the lease
def awaitingLease(): Receive = {
case LeaseAcquireResult(true, _) =>
log.debug("Acquired lease")
onLeaseAcquired()
case LeaseAcquireResult(false, None) =>
log.error(
"Failed to get lease for shard type [{}] id [{}]. Retry in {}",
typeName,
shardId,
leaseRetryInterval.pretty)
timers.startSingleTimer(LeaseRetryTimer, LeaseRetry, leaseRetryInterval)
case LeaseAcquireResult(false, Some(t)) =>
log.error(
t,
"Failed to get lease for shard type [{}] id [{}]. Retry in {}",
typeName,
shardId,
leaseRetryInterval)
timers.startSingleTimer(LeaseRetryTimer, LeaseRetry, leaseRetryInterval)
case LeaseRetry =>
tryGetLease(lease.get)
case ll: LeaseLost =>
receiveLeaseLost(ll)
}
def receiveCommand: Receive = {
case Terminated(ref) => receiveTerminated(ref)
case msg: CoordinatorMessage => receiveCoordinatorMessage(msg)
@ -198,9 +281,17 @@ private[akka] class Shard(
case msg: ShardRegionCommand => receiveShardRegionCommand(msg)
case msg: ShardQuery => receiveShardQuery(msg)
case PassivateIdleTick => passivateIdleEntities()
case msg: LeaseLost => receiveLeaseLost(msg)
case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender())
}
def receiveLeaseLost(msg: LeaseLost): Unit = {
// The shard region will re-create this when it receives a message for this shard
log.error("Shard type [{}] id [{}] lease lost. Reason: {}", typeName, shardId, msg.reason)
// Stop entities ASAP rather than send termination message
context.stop(self)
}
def receiveShardCommand(msg: ShardCommand): Unit = msg match {
case RestartEntity(id) => getOrCreateEntity(id)
case RestartEntities(ids) => restartEntities(ids)
@ -558,15 +649,16 @@ private[akka] class PersistentShard(
import Shard._
import settings.tuningParameters._
override def preStart(): Unit = {
// override to not acquire the lease on start up, acquire after persistent recovery
}
override def persistenceId = s"/sharding/${typeName}Shard/$shardId"
override def journalPluginId: String = settings.journalPluginId
override def snapshotPluginId: String = settings.snapshotPluginId
// would be initialized after recovery completed
override def initialized(): Unit = {}
override def receive = receiveCommand
override def processChange[E <: StateChange](event: E)(handler: E => Unit): Unit = {
@ -586,11 +678,18 @@ private[akka] class PersistentShard(
case EntityStopped(id) => state = state.copy(state.entities - id)
case SnapshotOffer(_, snapshot: State) => state = snapshot
case RecoveryCompleted =>
restartRememberedEntities()
super.initialized()
acquireLeaseIfNeeded() // onLeaseAcquired called when completed
log.debug("PersistentShard recovery completed shard [{}] with [{}] entities", shardId, state.entities.size)
}
override def onLeaseAcquired(): Unit = {
log.debug("Shard initialized")
context.parent ! ShardInitialized(shardId)
context.become(receiveCommand)
restartRememberedEntities()
unstashAll()
}
override def receiveCommand: Receive =
({
case e: SaveSnapshotSuccess =>
@ -672,8 +771,11 @@ private[akka] class DDataShard(
stateKeys(i)
}
// get initial state from ddata replicator
override def onLeaseAcquired(): Unit = {
log.info("Lease Acquired. Getting state from DData")
getState()
context.become(waitingForState(Set.empty))
}
private def getState(): Unit = {
(0 until numberOfKeys).map { i =>
@ -681,18 +783,15 @@ private[akka] class DDataShard(
}
}
// would be initialized after recovery completed
override def initialized(): Unit = {}
override def receive = waitingForState(Set.empty)
// This state will stash all commands
private def waitingForState(gotKeys: Set[Int]): Receive = {
def receiveOne(i: Int): Unit = {
val newGotKeys = gotKeys + i
if (newGotKeys.size == numberOfKeys)
if (newGotKeys.size == numberOfKeys) {
recoveryCompleted()
else
} else
context.become(waitingForState(newGotKeys))
}
@ -718,11 +817,11 @@ private[akka] class DDataShard(
}
private def recoveryCompleted(): Unit = {
restartRememberedEntities()
super.initialized()
log.debug("DDataShard recovery completed shard [{}] with [{}] entities", shardId, state.entities.size)
unstashAll()
context.parent ! ShardInitialized(shardId)
context.become(receiveCommand)
restartRememberedEntities()
unstashAll()
}
override def processChange[E <: StateChange](event: E)(handler: E => Unit): Unit = {
@ -775,6 +874,7 @@ private[akka] class DDataShard(
evt)
throw cause
// TODO what can this actually be? We're uninitialized in the ShardRegion
case _ => stash()
}
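The lease handling above follows an acquire-then-become pattern: preStart pipes the acquire future to self, the actor stays in awaitingLease (retrying on a timer) until LeaseAcquireResult(true, _) arrives, and a LeaseLost in the command state stops the shard so that the ShardRegion can re-create it. Below is a standalone editor's sketch of that pattern outside of sharding; the message types are local to the example and the lease name, config path and owner are constructor parameters.

import akka.actor.{ Actor, ActorLogging, Timers }
import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider }
import akka.pattern.pipe
import scala.concurrent.duration._
import scala.util.control.NonFatal

object LeaseGuardedActor {
  final case class LeaseAcquireResult(acquired: Boolean, reason: Option[Throwable])
  final case class LeaseLost(reason: Option[Throwable])
  case object LeaseRetry
  private val LeaseRetryTimer = "lease-retry"
}

class LeaseGuardedActor(leaseName: String, leaseConfigPath: String, owner: String)
    extends Actor
    with ActorLogging
    with Timers {
  import LeaseGuardedActor._
  import context.dispatcher

  private val lease: Lease =
    LeaseProvider(context.system).getLease(leaseName, leaseConfigPath, owner)

  override def preStart(): Unit = tryAcquire()

  // pipe the acquire result back to self as a message, like Shard.tryGetLease above
  private def tryAcquire(): Unit =
    pipe(
      lease
        .acquire(reason => self ! LeaseLost(reason))
        .map(acquired => LeaseAcquireResult(acquired, None))
        .recover { case NonFatal(t) => LeaseAcquireResult(acquired = false, Some(t)) }).to(self)

  // Buffering of incoming messages while waiting is omitted here; Shard relies on the
  // ShardRegion buffering until it replies with ShardInitialized.
  def receive: Receive = {
    case LeaseAcquireResult(true, _) =>
      log.debug("Lease acquired, becoming active")
      context.become(active)
    case LeaseAcquireResult(false, reason) =>
      log.warning("Could not acquire lease ({}), retrying", reason)
      timers.startSingleTimer(LeaseRetryTimer, LeaseRetry, 5.seconds)
    case LeaseRetry =>
      tryAcquire()
  }

  def active: Receive = {
    case LeaseLost(reason) =>
      log.error("Lease lost ({}), stopping", reason)
      context.stop(self)
    case msg =>
      log.debug("Active, handling {}", msg)
  }
}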

View file

@ -423,8 +423,9 @@ object ShardRegion {
/**
* INTERNAL API
*
* This actor creates child entity actors on demand for the shards that it is told to be
* responsible for. It delegates messages targeted to other shards to the responsible
* This actor creates child shard actors on demand for the shards that it is told to be responsible for.
* The shard actors in turn create entity actors on demand.
* It delegates messages targeted to other shards to the responsible
* `ShardRegion` actor on other nodes.
*
* @see [[ClusterSharding$ ClusterSharding extension]]

View file

@ -0,0 +1,135 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.actor.Props
import akka.cluster.{ Cluster, MemberStatus, TestLease, TestLeaseExt }
import akka.testkit.TestActors.EchoActor
import akka.testkit.{ AkkaSpec, ImplicitSender }
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Success
import scala.util.control.NoStackTrace
object ClusterShardingLeaseSpec {
val config = ConfigFactory.parseString("""
akka.loglevel = DEBUG
#akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.cluster.sharding {
use-lease = "test-lease"
lease-retry-interval = 200ms
distributed-data.durable {
keys = []
}
}
""").withFallback(TestLease.config)
val persistenceConfig = ConfigFactory.parseString("""
akka.cluster.sharding {
state-store-mode = persistence
journal-plugin-id = "akka.persistence.journal.inmem"
}
""")
val ddataConfig = ConfigFactory.parseString("""
akka.cluster.sharding {
state-store-mode = ddata
}
""")
val extractEntityId: ShardRegion.ExtractEntityId = {
case msg: Int => (msg.toString, msg)
}
val extractShardId: ShardRegion.ExtractShardId = {
case msg: Int => (msg % 10).toString
}
case class LeaseFailed(msg: String) extends RuntimeException(msg) with NoStackTrace
}
class PersistenceClusterShardingLeaseSpec
extends ClusterShardingLeaseSpec(ClusterShardingLeaseSpec.persistenceConfig, true)
class DDataClusterShardingLeaseSpec extends ClusterShardingLeaseSpec(ClusterShardingLeaseSpec.ddataConfig, true)
class ClusterShardingLeaseSpec(config: Config, rememberEntities: Boolean)
extends AkkaSpec(config.withFallback(ClusterShardingLeaseSpec.config))
with ImplicitSender {
import ClusterShardingLeaseSpec._
def this() = this(ConfigFactory.empty(), false)
val shortDuration = 200.millis
val cluster = Cluster(system)
val leaseOwner = cluster.selfMember.address.hostPort
val testLeaseExt = TestLeaseExt(system)
override protected def atStartup(): Unit = {
cluster.join(cluster.selfAddress)
awaitAssert {
cluster.selfMember.status shouldEqual MemberStatus.Up
}
ClusterSharding(system).start(
typeName = typeName,
entityProps = Props[EchoActor],
settings = ClusterShardingSettings(system).withRememberEntities(rememberEntities),
extractEntityId = extractEntityId,
extractShardId = extractShardId)
}
def region = ClusterSharding(system).shardRegion(typeName)
val typeName = "echo"
def leaseForShard(shardId: Int) = awaitAssert {
testLeaseExt.getTestLease(leaseNameFor(shardId))
}
def leaseNameFor(shardId: Int, typeName: String = typeName): String =
s"${system.name}-shard-${typeName}-${shardId}"
"Cluster sharding with lease" should {
"not start until lease is acquired" in {
region ! 1
expectNoMessage(shortDuration)
val testLease = leaseForShard(1)
testLease.initialPromise.complete(Success(true))
expectMsg(1)
}
"retry if initial acquire is false" in {
region ! 2
expectNoMessage(shortDuration)
val testLease = leaseForShard(2)
testLease.initialPromise.complete(Success(false))
expectNoMessage(shortDuration)
testLease.setNextAcquireResult(Future.successful(true))
expectMsg(2)
}
"retry if initial acquire fails" in {
region ! 3
expectNoMessage(shortDuration)
val testLease = leaseForShard(3)
testLease.initialPromise.failure(LeaseFailed("oh no"))
expectNoMessage(shortDuration)
testLease.setNextAcquireResult(Future.successful(true))
expectMsg(3)
}
"recover if lease lost" in {
region ! 4
expectNoMessage(shortDuration)
val testLease = leaseForShard(4)
testLease.initialPromise.complete(Success(true))
expectMsg(4)
testLease.getCurrentCallback()(Option(LeaseFailed("oh dear")))
awaitAssert({
region ! 4
expectMsg(4)
}, max = 5.seconds)
}
}
}

View file

@ -0,0 +1,122 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{ Actor, ActorLogging, PoisonPill, Props }
import akka.cluster.{ ClusterLeaseSettings, TestLeaseExt }
import akka.cluster.sharding.ShardRegion.ShardInitialized
import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe }
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Success
import scala.util.control.NoStackTrace
object ShardSpec {
val config =
"""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
test-lease {
lease-class = akka.cluster.TestLease
heartbeat-interval = 1s
heartbeat-timeout = 120s
lease-operation-timeout = 3s
}
"""
class EntityActor extends Actor with ActorLogging {
override def receive: Receive = {
case msg =>
log.info("Msg {}", msg)
sender() ! s"ack ${msg}"
}
}
val numberOfShards = 5
case class EntityEnvelope(entityId: Int, msg: Any)
val extractEntityId: ShardRegion.ExtractEntityId = {
case EntityEnvelope(id, payload) => (id.toString, payload)
}
val extractShardId: ShardRegion.ExtractShardId = {
case EntityEnvelope(id, _) => (id % numberOfShards).toString
}
case class BadLease(msg: String) extends RuntimeException(msg) with NoStackTrace
}
class ShardSpec extends AkkaSpec(ShardSpec.config) with ImplicitSender {
import ShardSpec._
val shortDuration = 100.millis
val testLeaseExt = TestLeaseExt(system)
def leaseNameForShard(typeName: String, shardId: String) = s"${system.name}-shard-${typeName}-${shardId}"
"A Cluster Shard" should {
"not initialize the shard until the lease is acquired" in new Setup {
parent.expectNoMessage(shortDuration)
lease.initialPromise.complete(Success(true))
parent.expectMsg(ShardInitialized(shardId))
}
"retry if lease acquire returns false" in new Setup {
lease.initialPromise.complete(Success(false))
parent.expectNoMessage(shortDuration)
lease.setNextAcquireResult(Future.successful(true))
parent.expectMsg(ShardInitialized(shardId))
}
"retry if the lease acquire fails" in new Setup {
lease.initialPromise.failure(BadLease("no lease for you"))
parent.expectNoMessage(shortDuration)
lease.setNextAcquireResult(Future.successful(true))
parent.expectMsg(ShardInitialized(shardId))
}
"shutdown if lease is lost" in new Setup {
val probe = TestProbe()
probe.watch(shard)
lease.initialPromise.complete(Success(true))
parent.expectMsg(ShardInitialized(shardId))
lease.getCurrentCallback().apply(Some(BadLease("bye bye lease")))
probe.expectTerminated(shard)
}
}
val shardIds = new AtomicInteger(0)
def nextShardId = s"${shardIds.getAndIncrement()}"
trait Setup {
val shardId = nextShardId
val parent = TestProbe()
val settings = ClusterShardingSettings(system).withLeaseSettings(new ClusterLeaseSettings("test-lease", 2.seconds))
def lease = awaitAssert {
testLeaseExt.getTestLease(leaseNameForShard(typeName, shardId))
}
val typeName = "type1"
val shard = parent.childActorOf(
Shard.props(
typeName,
shardId,
_ => Props(new EntityActor()),
settings,
extractEntityId,
extractShardId,
PoisonPill,
system.deadLetters,
1))
}
}

View file

@ -0,0 +1,5 @@
# Lease API #26468
ProblemFilters.exclude[Problem]("akka.cluster.singleton.ClusterSingletonManager#Internal*")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.singleton.ClusterSingletonManager.gotoOldest")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.singleton.ClusterSingletonManager.gotoHandingOver")
ProblemFilters.exclude[Problem]("akka.cluster.singleton.ClusterSingletonManager$Internal$*")

View file

@ -186,6 +186,14 @@ akka.cluster.singleton {
# it will not be a quicker hand over by reducing this value, but in extreme failure scenarios
# the recovery might be faster.
min-number-of-hand-over-retries = 15
# Config path of the lease to be taken before creating the singleton actor
# If the lease is lost then the actor is restarted and it will need to re-acquire the lease.
# The default is no lease
use-lease = ""
# The interval between retries for acquiring the lease
lease-retry-interval = 5s
}
# //#singleton-config
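A sketch of enabling this for the singleton (not part of this commit); "docs-lease" refers to the same hypothetical lease config block shown in the sharding sketch above.

import com.typesafe.config.ConfigFactory

object SingletonLeaseConfigSketch {
  val config = ConfigFactory.parseString("""
    akka.cluster.singleton {
      use-lease = "docs-lease"   # hypothetical lease config path
      lease-retry-interval = 5s
    }
  """)
}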

View file

@ -0,0 +1,18 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import akka.annotation.ApiMayChange
import scala.concurrent.duration.FiniteDuration
import akka.util.JavaDurationConverters._
import akka.util.PrettyDuration._
@ApiMayChange
class ClusterLeaseSettings private[akka] (val leaseImplementation: String, val leaseRetryInterval: FiniteDuration) {
def getLeaseRetryInterval(): java.time.Duration = leaseRetryInterval.asJava
override def toString = s"ClusterLeaseSettings($leaseImplementation, ${leaseRetryInterval.pretty})"
}

View file

@ -5,10 +5,10 @@
package akka.cluster.singleton
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.collection.immutable
import scala.concurrent.Future
import akka.actor.Actor
import akka.actor.Deploy
import akka.actor.ActorSystem
@ -19,22 +19,22 @@ import akka.actor.DeadLetterSuppression
import akka.actor.FSM
import akka.actor.Props
import akka.actor.Terminated
import akka.cluster.Cluster
import akka.cluster._
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.AkkaException
import akka.actor.NoSerializationVerificationNeeded
import akka.cluster.UniqueAddress
import akka.cluster.ClusterEvent
import scala.concurrent.Promise
import akka.pattern.pipe
import akka.util.JavaDurationConverters._
import scala.concurrent.Promise
import akka.Done
import akka.actor.CoordinatedShutdown
import akka.annotation.DoNotInherit
import akka.annotation.{ ApiMayChange, DoNotInherit }
import akka.pattern.ask
import akka.util.Timeout
import akka.cluster.ClusterSettings
import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider }
import scala.util.control.NonFatal
object ClusterSingletonManagerSettings {
@ -52,12 +52,19 @@ object ClusterSingletonManagerSettings {
* Create settings from a configuration with the same layout as
* the default configuration `akka.cluster.singleton`.
*/
def apply(config: Config): ClusterSingletonManagerSettings =
def apply(config: Config): ClusterSingletonManagerSettings = {
val lease = config.getString("use-lease") match {
case s if s.isEmpty => None
case leaseConfigPath =>
Some(new ClusterLeaseSettings(leaseConfigPath, config.getDuration("lease-retry-interval").asScala))
}
new ClusterSingletonManagerSettings(
singletonName = config.getString("singleton-name"),
role = roleOption(config.getString("role")),
removalMargin = Duration.Zero, // defaults to ClusterSettins.DownRemovalMargin
handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis)
removalMargin = Duration.Zero, // defaults to ClusterSettings.DownRemovalMargin
handOverRetryInterval = config.getDuration("hand-over-retry-interval", MILLISECONDS).millis,
lease)
}
/**
* Java API: Create settings from the default configuration
@ -98,14 +105,25 @@ object ClusterSingletonManagerSettings {
* retried with this interval until the previous oldest confirms that the hand
* over has started or the previous oldest member is removed from the cluster
* (+ `removalMargin`).
*
* @param leaseSettings LeaseSettings for the lease that must be acquired before creating the singleton actor
*/
final class ClusterSingletonManagerSettings(
val singletonName: String,
val role: Option[String],
val removalMargin: FiniteDuration,
val handOverRetryInterval: FiniteDuration)
val handOverRetryInterval: FiniteDuration,
val leaseSettings: Option[ClusterLeaseSettings])
extends NoSerializationVerificationNeeded {
// bin compat for akka 2.5.21
def this(
singletonName: String,
role: Option[String],
removalMargin: FiniteDuration,
handOverRetryInterval: FiniteDuration) =
this(singletonName, role, removalMargin, handOverRetryInterval, None)
def withSingletonName(name: String): ClusterSingletonManagerSettings = copy(singletonName = name)
def withRole(role: String): ClusterSingletonManagerSettings =
@ -119,12 +137,16 @@ final class ClusterSingletonManagerSettings(
def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings =
copy(handOverRetryInterval = retryInterval)
def withLeaseSettings(leaseSettings: ClusterLeaseSettings): ClusterSingletonManagerSettings =
copy(leaseSettings = Some(leaseSettings))
private def copy(
singletonName: String = singletonName,
role: Option[String] = role,
removalMargin: FiniteDuration = removalMargin,
handOverRetryInterval: FiniteDuration = handOverRetryInterval): ClusterSingletonManagerSettings =
new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval)
handOverRetryInterval: FiniteDuration = handOverRetryInterval,
leaseSettings: Option[ClusterLeaseSettings] = leaseSettings): ClusterSingletonManagerSettings =
new ClusterSingletonManagerSettings(singletonName, role, removalMargin, handOverRetryInterval, leaseSettings)
}
/**
@ -189,10 +211,12 @@ object ClusterSingletonManager {
final case class HandOverRetry(count: Int)
final case class TakeOverRetry(count: Int)
final case object LeaseRetry
case object Cleanup
case object StartOldestChangedBuffer
case object Start extends State
case object AcquiringLease extends State
case object Oldest extends State
case object Younger extends State
case object BecomingOldest extends State
@ -205,21 +229,19 @@ object ClusterSingletonManager {
case object Uninitialized extends Data
final case class YoungerData(oldestOption: Option[UniqueAddress]) extends Data
final case class BecomingOldestData(previousOldestOption: Option[UniqueAddress]) extends Data
final case class OldestData(singleton: ActorRef, singletonTerminated: Boolean = false) extends Data
final case class WasOldestData(
singleton: ActorRef,
singletonTerminated: Boolean,
newOldestOption: Option[UniqueAddress])
extends Data
final case class OldestData(singleton: Option[ActorRef]) extends Data
final case class WasOldestData(singleton: Option[ActorRef], newOldestOption: Option[UniqueAddress]) extends Data
final case class HandingOverData(singleton: ActorRef, handOverTo: Option[ActorRef]) extends Data
final case class StoppingData(singleton: ActorRef) extends Data
case object EndData extends Data
final case class DelayedMemberRemoved(member: Member)
case object SelfExiting
case class AcquiringLeaseData(leaseRequestInProgress: Boolean, singleton: Option[ActorRef]) extends Data
val HandOverRetryTimer = "hand-over-retry"
val TakeOverRetryTimer = "take-over-retry"
val CleanupTimer = "cleanup"
val LeaseRetryTimer = "lease-retry"
object OldestChangedBuffer {
@ -236,8 +258,14 @@ object ClusterSingletonManager {
final case class OldestChanged(oldest: Option[UniqueAddress])
}
final case class AcquireLeaseResult(holdingLease: Boolean) extends DeadLetterSuppression
final case class ReleaseLeaseResult(released: Boolean) extends DeadLetterSuppression
final case class AcquireLeaseFailure(t: Throwable) extends DeadLetterSuppression
final case class ReleaseLeaseFailure(t: Throwable) extends DeadLetterSuppression
final case class LeaseLost(reason: Option[Throwable]) extends DeadLetterSuppression
/**
* Notifications of member events that track oldest member is tunneled
* Notifications of member events that track oldest member are tunneled
* via this actor (child of ClusterSingletonManager) to be able to deliver
* one change at a time. Avoiding simultaneous changes simplifies
* the process in ClusterSingletonManager. ClusterSingletonManager requests
@ -457,6 +485,17 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
role.forall(cluster.selfRoles.contains),
s"This cluster member [${cluster.selfAddress}] doesn't have the role [$role]")
private val singletonLeaseName = s"${context.system.name}-singleton-${self.path}"
val lease: Option[Lease] = settings.leaseSettings.map(
settings =>
LeaseProvider(context.system)
.getLease(singletonLeaseName, settings.leaseImplementation, cluster.selfAddress.hostPort))
val leaseRetryInterval: FiniteDuration = settings.leaseSettings match {
case Some(s) => s.leaseRetryInterval
case None => 5.seconds // won't be used
}
val removalMargin =
if (settings.removalMargin <= Duration.Zero) cluster.downingProvider.downRemovalMargin
else settings.removalMargin
@ -515,6 +554,9 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
def logInfo(template: String, arg1: Any, arg2: Any): Unit =
if (LogInfo) log.info(template, arg1, arg2)
def logInfo(template: String, arg1: Any, arg2: Any, arg3: Any): Unit =
if (LogInfo) log.info(template, arg1, arg2, arg3)
override def preStart(): Unit = {
super.preStart()
require(!cluster.isTerminated, "Cluster node must not be terminated")
@ -557,7 +599,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
oldestChangedReceived = true
if (oldestOption == selfUniqueAddressOption && safeToBeOldest)
// oldest immediately
gotoOldest()
tryGoToOldest()
else if (oldestOption == selfUniqueAddressOption)
goto(BecomingOldest).using(BecomingOldestData(None))
else
@ -570,8 +612,8 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
if (oldestOption == selfUniqueAddressOption) {
logInfo("Younger observed OldestChanged: [{} -> myself]", previousOldestOption.map(_.address))
previousOldestOption match {
case None => gotoOldest()
case Some(prev) if removed.contains(prev) => gotoOldest()
case None => tryGoToOldest()
case Some(prev) if removed.contains(prev) => tryGoToOldest()
case Some(prev) =>
peer(prev.address) ! HandOverToMe
goto(BecomingOldest).using(BecomingOldestData(previousOldestOption))
@ -620,7 +662,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
case Event(HandOverDone, BecomingOldestData(Some(previousOldest))) =>
if (sender().path.address == previousOldest.address)
gotoOldest()
tryGoToOldest()
else {
logInfo(
"Ignoring HandOverDone in BecomingOldest from [{}]. Expected previous oldest [{}]",
@ -645,7 +687,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
if m.uniqueAddress == previousOldest =>
logInfo("Previous oldest [{}] removed", previousOldest.address)
addRemoved(m.uniqueAddress)
gotoOldest()
tryGoToOldest()
case Event(TakeOverFromMe, BecomingOldestData(previousOldestOption)) =>
val senderAddress = sender().path.address
@ -682,7 +724,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
// can't send HandOverToMe, previousOldest unknown for new node (or restart)
// previous oldest might be down or removed, so no TakeOverFromMe message is received
logInfo("Timeout in BecomingOldest. Previous oldest unknown, removed and no TakeOver request.")
gotoOldest()
tryGoToOldest()
} else if (cluster.isTerminated)
stop()
else
@ -698,16 +740,75 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
self ! DelayedMemberRemoved(m)
}
def gotoOldest(): State = {
val singleton = context.watch(context.actorOf(singletonProps, singletonName))
logInfo("Singleton manager starting singleton actor [{}]", singleton.path)
goto(Oldest).using(OldestData(singleton))
def tryAcquireLease() = {
import context.dispatcher
pipe(lease.get.acquire(reason => self ! LeaseLost(reason)).map[Any](AcquireLeaseResult).recover {
case NonFatal(t) => AcquireLeaseFailure(t)
}).to(self)
goto(AcquiringLease).using(AcquiringLeaseData(leaseRequestInProgress = true, None))
}
when(Oldest) {
case Event(OldestChanged(oldestOption), OldestData(singleton, singletonTerminated)) =>
// Try and go to oldest, taking the lease if needed
def tryGoToOldest(): State = {
// check if a lease is configured
lease match {
case None =>
goToOldest()
case Some(_) =>
logInfo("Trying to acquire lease before starting singleton")
tryAcquireLease()
}
}
when(AcquiringLease) {
case Event(AcquireLeaseResult(result), _) =>
logInfo("Acquire lease result {}", result)
if (result) {
goToOldest()
} else {
setTimer(LeaseRetryTimer, LeaseRetry, leaseRetryInterval)
stay.using(AcquiringLeaseData(leaseRequestInProgress = false, None))
}
case Event(Terminated(ref), AcquiringLeaseData(_, Some(singleton))) if ref == singleton =>
logInfo("Singleton actor terminated. Trying to acquire lease again before re-creating.")
// tryAcquireLease resets the singleton actor reference in the state data to None
tryAcquireLease()
case Event(AcquireLeaseFailure(t), _) =>
log.error(t, "failed to get lease (will be retried)")
setTimer(LeaseRetryTimer, LeaseRetry, leaseRetryInterval)
stay.using(AcquiringLeaseData(leaseRequestInProgress = false, None))
case Event(LeaseRetry, _) =>
// If the lease was lost (i.e. the previous state was Oldest) we don't try to get the lease
// again until the old singleton instance has been terminated, so we know there isn't a
// running instance at this point
tryAcquireLease()
case Event(OldestChanged(oldestOption), AcquiringLeaseData(_, singleton)) =>
handleOldestChanged(singleton, oldestOption)
case Event(HandOverToMe, AcquiringLeaseData(_, singleton)) =>
gotoHandingOver(singleton, Some(sender()))
case Event(TakeOverFromMe, _) =>
// already oldest, so confirm and continue like that
sender() ! HandOverToMe
stay
case Event(SelfExiting, _) =>
selfMemberExited()
// complete memberExitingProgress when handOverDone
sender() ! Done // reply to ask
stay
case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
logInfo("Self downed, stopping ClusterSingletonManager")
stop()
}
def goToOldest(): State = {
val singleton = context.watch(context.actorOf(singletonProps, singletonName))
logInfo("Singleton manager starting singleton actor [{}]", singleton.path)
goto(Oldest).using(OldestData(Some(singleton)))
}
def handleOldestChanged(singleton: Option[ActorRef], oldestOption: Option[UniqueAddress]) = {
oldestChangedReceived = true
logInfo("Oldest observed OldestChanged: [{} -> {}]", cluster.selfAddress, oldestOption.map(_.address))
logInfo("{} observed OldestChanged: [{} -> {}]", stateName, cluster.selfAddress, oldestOption.map(_.address))
oldestOption match {
case Some(a) if a == cluster.selfUniqueAddress =>
// already oldest
@ -716,29 +817,32 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
// The member removal was not completed and the old removed node is considered
// oldest again. Safest is to terminate the singleton instance and goto Younger.
// This node will become oldest again when the other is removed again.
gotoHandingOver(singleton, singletonTerminated, None)
gotoHandingOver(singleton, None)
case Some(a) =>
// send TakeOver request in case the new oldest doesn't know previous oldest
peer(a.address) ! TakeOverFromMe
setTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval, repeat = false)
goto(WasOldest).using(WasOldestData(singleton, singletonTerminated, newOldestOption = Some(a)))
goto(WasOldest).using(WasOldestData(singleton, newOldestOption = Some(a)))
case None =>
// new oldest will initiate the hand-over
setTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval, repeat = false)
goto(WasOldest).using(WasOldestData(singleton, singletonTerminated, newOldestOption = None))
goto(WasOldest).using(WasOldestData(singleton, newOldestOption = None))
}
}
case Event(HandOverToMe, OldestData(singleton, singletonTerminated)) =>
gotoHandingOver(singleton, singletonTerminated, Some(sender()))
when(Oldest) {
case Event(OldestChanged(oldestOption), OldestData(singleton)) =>
handleOldestChanged(singleton, oldestOption)
case Event(HandOverToMe, OldestData(singleton)) =>
gotoHandingOver(singleton, Some(sender()))
case Event(TakeOverFromMe, _) =>
// already oldest, so confirm and continue like that
sender() ! HandOverToMe
stay
case Event(Terminated(ref), d @ OldestData(singleton, _)) if ref == singleton =>
case Event(Terminated(ref), d @ OldestData(Some(singleton))) if ref == singleton =>
logInfo("Singleton actor [{}] was terminated", singleton.path)
stay.using(d.copy(singletonTerminated = true))
stay.using(d.copy(singleton = None))
case Event(SelfExiting, _) =>
selfMemberExited()
@ -746,22 +850,34 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
sender() ! Done // reply to ask
stay
case Event(MemberDowned(m), OldestData(singleton, singletonTerminated))
if m.uniqueAddress == cluster.selfUniqueAddress =>
if (singletonTerminated) {
case Event(MemberDowned(m), OldestData(singleton)) if m.uniqueAddress == cluster.selfUniqueAddress =>
singleton match {
case Some(s) =>
logInfo("Self downed, stopping")
gotoStopping(s)
case None =>
logInfo("Self downed, stopping ClusterSingletonManager")
stop()
} else {
logInfo("Self downed, stopping")
gotoStopping(singleton)
}
case Event(LeaseLost(reason), OldestData(singleton)) =>
log.warning("Lease has been lost. Reason: {}. Terminating singleton and trying to re-acquire lease", reason)
singleton match {
case Some(s) =>
s ! terminationMessage
goto(AcquiringLease).using(AcquiringLeaseData(leaseRequestInProgress = false, singleton))
case None =>
tryAcquireLease()
}
}
when(WasOldest) {
case Event(TakeOverRetry(count), WasOldestData(singleton, singletonTerminated, newOldestOption)) =>
case Event(TakeOverRetry(count), WasOldestData(singleton, newOldestOption)) =>
if ((cluster.isTerminated || selfExited) && (newOldestOption.isEmpty || count > maxTakeOverRetries)) {
if (singletonTerminated) stop()
else gotoStopping(singleton)
singleton match {
case Some(s) => gotoStopping(s)
case None => stop()
}
} else if (count <= maxTakeOverRetries) {
if (maxTakeOverRetries - count <= 3)
logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address))
@ -773,21 +889,20 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
} else
throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [$newOldestOption] never occurred")
case Event(HandOverToMe, WasOldestData(singleton, singletonTerminated, _)) =>
gotoHandingOver(singleton, singletonTerminated, Some(sender()))
case Event(HandOverToMe, WasOldestData(singleton, _)) =>
gotoHandingOver(singleton, Some(sender()))
case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress && !selfExited =>
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
case Event(MemberRemoved(m, _), WasOldestData(singleton, singletonTerminated, Some(newOldest)))
case Event(MemberRemoved(m, _), WasOldestData(singleton, Some(newOldest)))
if !selfExited && m.uniqueAddress == newOldest =>
addRemoved(m.uniqueAddress)
gotoHandingOver(singleton, singletonTerminated, None)
gotoHandingOver(singleton, None)
case Event(Terminated(ref), d @ WasOldestData(singleton, _, _)) if ref == singleton =>
logInfo("Singleton actor [{}] was terminated", singleton.path)
stay.using(d.copy(singletonTerminated = true))
case Event(Terminated(ref), d @ WasOldestData(singleton, _)) if singleton.contains(ref) =>
logInfo("Singleton actor [{}] was terminated", ref.path)
stay.using(d.copy(singleton = None))
case Event(SelfExiting, _) =>
selfMemberExited()
@ -795,34 +910,34 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
sender() ! Done // reply to ask
stay
case Event(MemberDowned(m), OldestData(singleton, singletonTerminated))
if m.uniqueAddress == cluster.selfUniqueAddress =>
if (singletonTerminated) {
case Event(MemberDowned(m), WasOldestData(singleton, _)) if m.uniqueAddress == cluster.selfUniqueAddress =>
singleton match {
case None =>
logInfo("Self downed, stopping ClusterSingletonManager")
stop()
} else {
case Some(s) =>
logInfo("Self downed, stopping")
gotoStopping(singleton)
gotoStopping(s)
}
}
}
def gotoHandingOver(singleton: ActorRef, singletonTerminated: Boolean, handOverTo: Option[ActorRef]): State = {
if (singletonTerminated) {
def gotoHandingOver(singleton: Option[ActorRef], handOverTo: Option[ActorRef]): State = {
singleton match {
case None =>
handOverDone(handOverTo)
} else {
case Some(s) =>
handOverTo.foreach { _ ! HandOverInProgress }
logInfo("Singleton manager stopping singleton actor [{}]", singleton.path)
singleton ! terminationMessage
goto(HandingOver).using(HandingOverData(singleton, handOverTo))
logInfo("Singleton manager stopping singleton actor [{}]", s.path)
s ! terminationMessage
goto(HandingOver).using(HandingOverData(s, handOverTo))
}
}
when(HandingOver) {
case (Event(Terminated(ref), HandingOverData(singleton, handOverTo))) if ref == singleton =>
case Event(Terminated(ref), HandingOverData(singleton, handOverTo)) if ref == singleton =>
handOverDone(handOverTo)
case Event(HandOverToMe, HandingOverData(singleton, handOverTo)) if handOverTo == Some(sender()) =>
case Event(HandOverToMe, HandingOverData(_, handOverTo)) if handOverTo.contains(sender()) =>
// retry
sender() ! HandOverInProgress
stay
@ -855,7 +970,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
}
when(Stopping) {
case (Event(Terminated(ref), StoppingData(singleton))) if ref == singleton =>
case Event(Terminated(ref), StoppingData(singleton)) if ref == singleton =>
logInfo("Singleton actor [{}] was terminated", singleton.path)
stop()
}
@ -901,6 +1016,20 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
if (m.uniqueAddress == cluster.selfUniqueAddress)
logInfo("Self downed, waiting for removal")
stay
case Event(ReleaseLeaseFailure(t), _) =>
log.error(
t,
"Failed to release lease. Singleton may not be able to run on another node until lease timeout occurs")
stay
case Event(ReleaseLeaseResult(released), _) =>
if (released) {
logInfo("Lease released")
} else {
// TODO we could retry
log.error(
"Failed to release lease. Singleton may not be able to run on another node until lease timeout occurs")
}
stay
}
onTransition {
@ -916,6 +1045,29 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
case WasOldest -> _ => cancelTimer(TakeOverRetryTimer)
}
onTransition {
case (AcquiringLease, to) if to != Oldest =>
stateData match {
case AcquiringLeaseData(true, _) =>
logInfo("Releasing lease as leaving AcquiringLease going to [{}]", to)
import context.dispatcher
lease.foreach(l =>
pipe(l.release().map[Any](ReleaseLeaseResult).recover {
case t => ReleaseLeaseFailure(t)
}).to(self))
case _ =>
}
}
onTransition {
case Oldest -> _ =>
lease.foreach { l =>
logInfo("Releasing lease as leaving Oldest")
import context.dispatcher
pipe(l.release().map(ReleaseLeaseResult)).to(self)
}
}
onTransition {
case _ -> (Younger | Oldest) => getNextOldestChanged()
}
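Tying the manager changes together: with use-lease configured, the manager passes through the new AcquiringLease state before creating the singleton actor and releases the lease when leaving Oldest. A usage sketch mirroring the specs below (the actor names and singletonProps are placeholders):

import akka.actor.{ ActorSystem, PoisonPill, Props }
import akka.cluster.singleton.{
  ClusterSingletonManager,
  ClusterSingletonManagerSettings,
  ClusterSingletonProxy,
  ClusterSingletonProxySettings
}

def startLeaseProtectedSingleton(system: ActorSystem, singletonProps: Props): Unit = {
  // picks up use-lease / lease-retry-interval from akka.cluster.singleton
  val settings = ClusterSingletonManagerSettings(system)
  system.actorOf(ClusterSingletonManager.props(singletonProps, PoisonPill, settings), "important")

  // the proxy buffers messages until the lease is acquired and the singleton actor is started
  system.actorOf(
    ClusterSingletonProxy.props(
      singletonManagerPath = "/user/important",
      settings = ClusterSingletonProxySettings(system)),
    "importantProxy")
}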

View file

@ -0,0 +1,116 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.Props
import akka.cluster.TestLeaseActor.{ Acquire, Create, Release }
import akka.event.Logging
import akka.coordination.lease.LeaseSettings
import akka.coordination.lease.scaladsl.Lease
import akka.pattern.ask
import akka.util.Timeout
object TestLeaseActor {
def props(probe: ActorRef): Props =
Props(new TestLeaseActor(probe))
sealed trait LeaseRequest
final case class Acquire(owner: String) extends LeaseRequest
final case class Release(owner: String) extends LeaseRequest
final case class Create(leaseName: String, ownerName: String)
final case object GetRequests
final case class LeaseRequests(requests: List[LeaseRequest])
final case class ActionRequest(request: LeaseRequest, result: Any) // Boolean or Failure
}
class TestLeaseActor(probe: ActorRef) extends Actor with ActorLogging {
import TestLeaseActor._
var requests: List[(ActorRef, LeaseRequest)] = Nil
override def receive = {
case c: Create =>
log.info("Lease created with name {} ownerName {}", c.leaseName, c.ownerName)
case request: LeaseRequest =>
log.info("Lease request {} from {}", request, sender())
requests = (sender(), request) :: requests
case GetRequests =>
sender() ! LeaseRequests(requests.map(_._2))
case ActionRequest(request, result) =>
requests.find(_._2 == request) match {
case Some((snd, req)) =>
log.info("Actioning request {} to {}", req, result)
snd ! result
requests = requests.filterNot(_._2 == request)
case None =>
throw new RuntimeException(s"unknown request to action: ${request}. Requests: ${requests}")
}
}
}
object TestLeaseActorClientExt extends ExtensionId[TestLeaseActorClientExt] with ExtensionIdProvider {
override def get(system: ActorSystem): TestLeaseActorClientExt = super.get(system)
override def lookup = TestLeaseActorClientExt
override def createExtension(system: ExtendedActorSystem): TestLeaseActorClientExt =
new TestLeaseActorClientExt(system)
}
class TestLeaseActorClientExt(val system: ExtendedActorSystem) extends Extension {
private val leaseActor = new AtomicReference[ActorRef]()
def getLeaseActor(): ActorRef = {
val lease = leaseActor.get
if (lease == null) throw new IllegalStateException("LeaseActorRef must be set first")
lease
}
def setActorLease(client: ActorRef): Unit =
leaseActor.set(client)
}
class TestLeaseActorClient(settings: LeaseSettings, system: ExtendedActorSystem) extends Lease(settings) {
private val log = Logging(system, getClass)
val leaseActor = TestLeaseActorClientExt(system).getLeaseActor()
log.info("lease created {}", settings)
leaseActor ! Create(settings.leaseName, settings.ownerName)
private implicit val timeout = Timeout(100.seconds)
override def acquire(): Future[Boolean] = {
(leaseActor ? Acquire(settings.ownerName)).mapTo[Boolean]
}
override def release(): Future[Boolean] = {
(leaseActor ? Release(settings.ownerName)).mapTo[Boolean]
}
override def checkLease(): Boolean = false
override def acquire(callback: Option[Throwable] => Unit): Future[Boolean] =
(leaseActor ? Acquire(settings.ownerName)).mapTo[Boolean]
}

View file

@ -0,0 +1,216 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.singleton
import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, Address, Identify, PoisonPill, Props }
import akka.cluster.MemberStatus.Up
import akka.cluster.TestLeaseActor._
import akka.cluster.singleton.ClusterSingletonManagerLeaseSpec.ImportantSingleton.Response
import akka.cluster._
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import com.typesafe.config.ConfigFactory
import scala.language.postfixOps
import scala.concurrent.duration._
object ClusterSingletonManagerLeaseSpec extends MultiNodeConfig {
val controller = role("controller")
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
testTransport(true)
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
test-lease {
lease-class = akka.cluster.TestLeaseActorClient
heartbeat-interval = 1s
heartbeat-timeout = 120s
lease-operation-timeout = 3s
}
akka.cluster.singleton {
use-lease = "test-lease"
}
"""))
nodeConfig(first, second, third)(ConfigFactory.parseString("akka.cluster.roles = [worker]"))
object ImportantSingleton {
case class Response(msg: Any, address: Address)
def props(): Props = Props(new ImportantSingleton())
}
class ImportantSingleton extends Actor with ActorLogging {
val selfAddress = Cluster(context.system).selfAddress
override def preStart(): Unit = {
log.info("Singleton starting")
}
override def postStop(): Unit = {
log.info("Singleton stopping")
}
override def receive: Receive = {
case msg =>
sender() ! Response(msg, selfAddress)
}
}
}
class ClusterSingletonManagerLeaseMultiJvmNode1 extends ClusterSingletonManagerLeaseSpec
class ClusterSingletonManagerLeaseMultiJvmNode2 extends ClusterSingletonManagerLeaseSpec
class ClusterSingletonManagerLeaseMultiJvmNode3 extends ClusterSingletonManagerLeaseSpec
class ClusterSingletonManagerLeaseMultiJvmNode4 extends ClusterSingletonManagerLeaseSpec
class ClusterSingletonManagerLeaseMultiJvmNode5 extends ClusterSingletonManagerLeaseSpec
class ClusterSingletonManagerLeaseSpec
extends MultiNodeSpec(ClusterSingletonManagerLeaseSpec)
with STMultiNodeSpec
with ImplicitSender
with MultiNodeClusterSpec {
import ClusterSingletonManagerLeaseSpec.ImportantSingleton._
import ClusterSingletonManagerLeaseSpec._
override def initialParticipants = roles.size
// used on the controller
val leaseProbe = TestProbe()
"Cluster singleton manager with lease" should {
"form a cluster" in {
awaitClusterUp(controller, first)
enterBarrier("initial-up")
runOn(second) {
joinWithin(first)
awaitAssert({
cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up)
}, 10.seconds)
}
enterBarrier("second-up")
runOn(third) {
joinWithin(first)
awaitAssert({
cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up)
}, 10.seconds)
}
enterBarrier("third-up")
runOn(fourth) {
joinWithin(first)
awaitAssert({
cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up, Up)
}, 10.seconds)
}
enterBarrier("fourth-up")
}
"start test lease" in {
runOn(controller) {
system.actorOf(TestLeaseActor.props(leaseProbe.ref), s"lease-${system.name}")
}
enterBarrier("lease-actor-started")
}
"find the lease on every node" in {
system.actorSelection(node(controller) / "user" / s"lease-${system.name}") ! Identify(None)
val leaseRef: ActorRef = expectMsgType[ActorIdentity].ref.get
TestLeaseActorClientExt(system).setActorLease(leaseRef)
enterBarrier("singleton-started")
}
"Start singleton and ping from all nodes" in {
runOn(first, second, third, fourth) {
system.actorOf(
ClusterSingletonManager
.props(props(), PoisonPill, ClusterSingletonManagerSettings(system).withRole("worker")),
"important")
}
enterBarrier("singleton-started")
val proxy = system.actorOf(
ClusterSingletonProxy.props(
singletonManagerPath = "/user/important",
settings = ClusterSingletonProxySettings(system).withRole("worker")))
runOn(first, second, third, fourth) {
proxy ! "Ping"
// lease has not been granted so not allowed to come up
expectNoMessage(2.seconds)
}
enterBarrier("singleton-pending")
runOn(controller) {
TestLeaseActorClientExt(system).getLeaseActor() ! GetRequests
expectMsg(LeaseRequests(List(Acquire(address(first).hostPort))))
TestLeaseActorClientExt(system).getLeaseActor() ! ActionRequest(Acquire(address(first).hostPort), true)
}
enterBarrier("lease-acquired")
runOn(first, second, third, fourth) {
expectMsg(Response("Ping", address(first)))
}
enterBarrier("pinged")
}
"Move singleton when oldest node downed" in {
cluster.state.members.size shouldEqual 5
runOn(controller) {
cluster.down(address(first))
awaitAssert({
cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up)
}, 20.seconds)
val requests = awaitAssert({
TestLeaseActorClientExt(system).getLeaseActor() ! GetRequests
val msg = expectMsgType[LeaseRequests]
withClue("Requests: " + msg) {
msg.requests.size shouldEqual 2
}
msg
}, 10.seconds)
requests.requests should contain(Release(address(first).hostPort))
requests.requests should contain(Acquire(address(second).hostPort))
}
runOn(second, third, fourth) {
awaitAssert({
cluster.state.members.toList.map(_.status) shouldEqual List(Up, Up, Up, Up)
}, 20.seconds)
}
enterBarrier("first node downed")
val proxy = system.actorOf(
ClusterSingletonProxy.props(
singletonManagerPath = "/user/important",
settings = ClusterSingletonProxySettings(system).withRole("worker")))
runOn(second, third, fourth) {
proxy ! "Ping"
// lease has not been granted so not allowed to come up
expectNoMessage(2.seconds)
}
enterBarrier("singleton-not-migrated")
runOn(controller) {
TestLeaseActorClientExt(system).getLeaseActor() ! ActionRequest(Acquire(address(second).hostPort), true)
}
enterBarrier("singleton-moved-to-second")
runOn(second, third, fourth) {
proxy ! "Ping"
expectMsg(Response("Ping", address(second)))
}
enterBarrier("finished")
}
}
}

View file

@ -0,0 +1,96 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicReference
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.coordination.lease.LeaseSettings
import akka.coordination.lease.scaladsl.Lease
import akka.event.Logging
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import scala.concurrent.{ Future, Promise }
import scala.collection.JavaConverters._
object TestLeaseExt extends ExtensionId[TestLeaseExt] with ExtensionIdProvider {
override def get(system: ActorSystem): TestLeaseExt = super.get(system)
override def lookup = TestLeaseExt
override def createExtension(system: ExtendedActorSystem): TestLeaseExt = new TestLeaseExt(system)
}
class TestLeaseExt(val system: ExtendedActorSystem) extends Extension {
private val testLeases = new ConcurrentHashMap[String, TestLease]()
def getTestLease(name: String): TestLease = {
val lease = testLeases.get(name)
if (lease == null)
throw new IllegalStateException(
s"Test lease $name has not been set yet. Current leases ${testLeases.keys().asScala.toList}")
lease
}
def setTestLease(name: String, lease: TestLease): Unit =
testLeases.put(name, lease)
}
object TestLease {
final case class AcquireReq(owner: String)
final case class ReleaseReq(owner: String)
val config = ConfigFactory.parseString("""
test-lease {
lease-class = akka.cluster.TestLease
}
""".stripMargin)
}
class TestLease(settings: LeaseSettings, system: ExtendedActorSystem) extends Lease(settings) {
import TestLease._
val log = Logging(system, getClass)
val probe = TestProbe()(system)
log.info("Creating lease {}", settings)
TestLeaseExt(system).setTestLease(settings.leaseName, this)
val initialPromise = Promise[Boolean]
private val nextAcquireResult = new AtomicReference[Future[Boolean]](initialPromise.future)
private val nextCheckLeaseResult = new AtomicReference[Boolean](false)
private val currentCallBack = new AtomicReference[Option[Throwable] => Unit](_ => ())
def setNextAcquireResult(next: Future[Boolean]): Unit =
nextAcquireResult.set(next)
def setNextCheckLeaseResult(value: Boolean): Unit =
nextCheckLeaseResult.set(value)
def getCurrentCallback(): Option[Throwable] => Unit = currentCallBack.get()
override def acquire(): Future[Boolean] = {
log.info("acquire, current response " + nextAcquireResult)
probe.ref ! AcquireReq(settings.ownerName)
nextAcquireResult.get()
}
override def release(): Future[Boolean] = {
probe.ref ! ReleaseReq(settings.ownerName)
Future.successful(true)
}
override def checkLease(): Boolean = nextCheckLeaseResult.get
override def acquire(callback: Option[Throwable] => Unit): Future[Boolean] = {
currentCallBack.set(callback)
acquire()
}
}
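Implementing the SPI that TestLease exercises amounts to extending Lease with the same constructor shape and overriding acquire/release/checkLease. An editor's sketch of a trivial single-JVM implementation follows; a real lease would coordinate through an external system, and it would be referenced from config via lease-class, here with the hypothetical name com.example.InMemoryLease.

import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Future
import akka.actor.ExtendedActorSystem
import akka.coordination.lease.LeaseSettings
import akka.coordination.lease.scaladsl.Lease

// Same (LeaseSettings, ExtendedActorSystem) constructor shape as the test leases above.
class InMemoryLease(settings: LeaseSettings, system: ExtendedActorSystem) extends Lease(settings) {
  private val held = new AtomicBoolean(false)

  override def acquire(): Future[Boolean] = {
    held.set(true) // toy semantics: always granted within this JVM
    Future.successful(true)
  }

  override def acquire(leaseLostCallback: Option[Throwable] => Unit): Future[Boolean] =
    acquire() // this toy lease is never lost, so the callback is never invoked

  override def release(): Future[Boolean] = {
    held.set(false)
    Future.successful(true)
  }

  override def checkLease(): Boolean = held.get()
}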

View file

@ -0,0 +1,195 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.singleton
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{ Actor, ActorLogging, ActorRef, ExtendedActorSystem, PoisonPill, Props }
import akka.cluster.TestLease.{ AcquireReq, ReleaseReq }
import akka.cluster.{ Cluster, MemberStatus, TestLease, TestLeaseExt }
import akka.testkit.{ AkkaSpec, TestProbe }
import com.typesafe.config.ConfigFactory
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Success
class ImportantSingleton(lifeCycleProbe: ActorRef) extends Actor with ActorLogging {
override def preStart(): Unit = {
log.info("Important Singleton Starting")
lifeCycleProbe ! "preStart"
}
override def postStop(): Unit = {
log.info("Important Singleton Stopping")
lifeCycleProbe ! "postStop"
}
override def receive: Receive = {
case msg =>
sender() ! msg
}
}
class ClusterSingletonLeaseSpec extends AkkaSpec(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = cluster
akka.cluster.singleton {
use-lease = "test-lease"
lease-retry-interval = 2000ms
}
""").withFallback(TestLease.config)) {
val cluster = Cluster(system)
val testLeaseExt = TestLeaseExt(system)
override protected def atStartup(): Unit = {
cluster.join(cluster.selfAddress)
awaitAssert {
cluster.selfMember.status shouldEqual MemberStatus.Up
}
}
def extSystem: ExtendedActorSystem = system.asInstanceOf[ExtendedActorSystem]
val counter = new AtomicInteger()
def nextName() = s"important-${counter.getAndIncrement()}"
val shortDuration = 50.millis
val leaseOwner = cluster.selfMember.address.hostPort
def nextSettings() = ClusterSingletonManagerSettings(system).withSingletonName(nextName())
def leaseNameFor(settings: ClusterSingletonManagerSettings): String =
s"ClusterSingletonLeaseSpec-singleton-akka://ClusterSingletonLeaseSpec/user/${settings.singletonName}"
"A singleton with lease" should {
"not start until lease is available" in {
val probe = TestProbe()
val settings = nextSettings()
system.actorOf(
ClusterSingletonManager.props(Props(new ImportantSingleton(probe.ref)), PoisonPill, settings),
settings.singletonName)
val testLease = awaitAssert {
testLeaseExt.getTestLease(leaseNameFor(settings))
} // allow singleton manager to create the lease
probe.expectNoMessage(shortDuration)
testLease.initialPromise.complete(Success(true))
probe.expectMsg("preStart")
}
"do not start if lease acquire returns false" in {
val probe = TestProbe()
val settings = nextSettings()
system.actorOf(
ClusterSingletonManager.props(Props(new ImportantSingleton(probe.ref)), PoisonPill, settings),
settings.singletonName)
val testLease = awaitAssert {
testLeaseExt.getTestLease(leaseNameFor(settings))
} // allow singleton manager to create the lease
probe.expectNoMessage(shortDuration)
testLease.initialPromise.complete(Success(false))
probe.expectNoMessage(shortDuration)
}
"retry trying to get lease if acquire returns false" in {
val singletonProbe = TestProbe()
val settings = nextSettings()
system.actorOf(
ClusterSingletonManager.props(Props(new ImportantSingleton(singletonProbe.ref)), PoisonPill, settings),
settings.singletonName)
val testLease = awaitAssert {
testLeaseExt.getTestLease(leaseNameFor(settings))
} // allow singleton manager to create the lease
testLease.probe.expectMsg(AcquireReq(leaseOwner))
singletonProbe.expectNoMessage(shortDuration)
val nextResponse = Promise[Boolean]
testLease.setNextAcquireResult(nextResponse.future)
testLease.initialPromise.complete(Success(false))
testLease.probe.expectMsg(AcquireReq(leaseOwner))
singletonProbe.expectNoMessage(shortDuration)
nextResponse.complete(Success(true))
singletonProbe.expectMsg("preStart")
}
"do not start if lease acquire fails" in {
val probe = TestProbe()
val settings = nextSettings()
system.actorOf(
ClusterSingletonManager.props(Props(new ImportantSingleton(probe.ref)), PoisonPill, settings),
settings.singletonName)
val testLease = awaitAssert {
testLeaseExt.getTestLease(leaseNameFor(settings))
} // allow singleton manager to create the lease
probe.expectNoMessage(shortDuration)
testLease.initialPromise.failure(new RuntimeException("no lease for you"))
probe.expectNoMessage(shortDuration)
}
"retry trying to get lease if acquire returns fails" in {
val singletonProbe = TestProbe()
val settings = nextSettings()
system.actorOf(
ClusterSingletonManager.props(Props(new ImportantSingleton(singletonProbe.ref)), PoisonPill, settings),
settings.singletonName)
val testLease = awaitAssert {
testLeaseExt.getTestLease(leaseNameFor(settings))
} // allow singleton manager to create the lease
testLease.probe.expectMsg(AcquireReq(leaseOwner))
singletonProbe.expectNoMessage(shortDuration)
val nextResponse = Promise[Boolean]
testLease.setNextAcquireResult(nextResponse.future)
testLease.initialPromise.failure(new RuntimeException("no lease for you"))
testLease.probe.expectMsg(AcquireReq(leaseOwner))
singletonProbe.expectNoMessage(shortDuration)
nextResponse.complete(Success(true))
singletonProbe.expectMsg("preStart")
}
"stop singleton if the lease fails periodic check" in {
val lifecycleProbe = TestProbe()
val settings = nextSettings()
system.actorOf(
ClusterSingletonManager.props(Props(new ImportantSingleton(lifecycleProbe.ref)), PoisonPill, settings),
settings.singletonName)
val testLease = awaitAssert {
testLeaseExt.getTestLease(leaseNameFor(settings))
}
testLease.probe.expectMsg(AcquireReq(leaseOwner))
testLease.initialPromise.complete(Success(true))
lifecycleProbe.expectMsg("preStart")
val callback = testLease.getCurrentCallback()
callback(None)
lifecycleProbe.expectMsg("postStop")
testLease.probe.expectMsg(ReleaseReq(leaseOwner))
// should try and reacquire lease
testLease.probe.expectMsg(AcquireReq(leaseOwner))
lifecycleProbe.expectMsg("preStart")
}
"release lease when leaving oldest" in {
val singletonProbe = TestProbe()
val settings = nextSettings()
system.actorOf(
ClusterSingletonManager.props(Props(new ImportantSingleton(singletonProbe.ref)), PoisonPill, settings),
settings.singletonName)
val testLease = awaitAssert {
testLeaseExt.getTestLease(leaseNameFor(settings))
} // allow singleton manager to create the lease
singletonProbe.expectNoMessage(shortDuration)
testLease.probe.expectMsg(AcquireReq(leaseOwner))
testLease.initialPromise.complete(Success(true))
singletonProbe.expectMsg("preStart")
cluster.leave(cluster.selfAddress)
testLease.probe.expectMsg(ReleaseReq(leaseOwner))
}
}
}

View file

@ -251,7 +251,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
awaitCond(
{
clusterView.refreshCurrentState()
if (memberInState(joinNode, List(MemberStatus.up)) &&
if (memberInState(joinNode, List(MemberStatus.Up)) &&
memberInState(myself, List(MemberStatus.Joining, MemberStatus.Up)))
true
else {

View file

@ -0,0 +1,22 @@
akka.coordination {
# Defaults for any lease implementation that doesn't include these properties
lease {
# FQCN of the implementation of the Lease
lease-class = ""
#defaults
# if the node that acquired the lease crashes, how long the lease should be held before another owner can get it
heartbeat-timeout = 120s
# interval for communicating with the third party to confirm the lease is still held
heartbeat-interval = 12s
# lease implementations are expected to time out acquire and release calls or document
# that they do not implement an operation timeout
lease-operation-timeout = 5s
#defaults
}
}
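# For illustration, an application-level lease configuration block might look like the
# following sketch; the path name "my-lease" and the class name are hypothetical
# placeholders, and any property not set here falls back to akka.coordination.lease:
#
# my-lease {
#   lease-class = "com.example.MyLease"
#   heartbeat-timeout = 60s
# }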

View file

@ -0,0 +1,13 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.coordination.lease
import akka.annotation.ApiMayChange
@ApiMayChange
class LeaseException(message: String) extends RuntimeException(message)
@ApiMayChange
final class LeaseTimeoutException(message: String) extends LeaseException(message)

View file

@ -0,0 +1,28 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.coordination.lease
import akka.annotation.ApiMayChange
import com.typesafe.config.Config
object LeaseSettings {
@ApiMayChange
def apply(config: Config, leaseName: String, ownerName: String): LeaseSettings = {
new LeaseSettings(leaseName, ownerName, TimeoutSettings(config), config)
}
}
@ApiMayChange
final class LeaseSettings(
val leaseName: String,
val ownerName: String,
val timeoutSettings: TimeoutSettings,
val leaseConfig: Config) {
def withTimeoutSettings(timeoutSettings: TimeoutSettings): LeaseSettings =
new LeaseSettings(leaseName, ownerName, timeoutSettings, leaseConfig)
override def toString = s"LeaseSettings($leaseName, $ownerName, $timeoutSettings)"
}

View file

@ -0,0 +1,87 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.coordination.lease
import akka.annotation.ApiMayChange
import com.typesafe.config.{ Config, ConfigValueType }
import akka.util.JavaDurationConverters._
import scala.concurrent.duration._
object TimeoutSettings {
@ApiMayChange
def apply(config: Config): TimeoutSettings = {
val heartBeatTimeout = config.getDuration("heartbeat-timeout").asScala
val heartBeatInterval = config.getValue("heartbeat-interval").valueType() match {
case ConfigValueType.STRING if config.getString("heartbeat-interval").isEmpty =>
(heartBeatTimeout / 10).max(5.seconds)
case _ => config.getDuration("heartbeat-interval").asScala
}
require(heartBeatInterval < (heartBeatTimeout / 2), "heartbeat-interval must be less than half heartbeat-timeout")
new TimeoutSettings(heartBeatInterval, heartBeatTimeout, config.getDuration("lease-operation-timeout").asScala)
}
}
@ApiMayChange
final class TimeoutSettings(
val heartbeatInterval: FiniteDuration,
val heartbeatTimeout: FiniteDuration,
val operationTimeout: FiniteDuration) {
/**
* Java API
*/
def getHeartbeatInterval(): java.time.Duration = heartbeatInterval.asJava
/**
* Java API
*/
def getHeartbeatTimeout(): java.time.Duration = heartbeatTimeout.asJava
/**
* Java API
*/
def getOperationTimeout(): java.time.Duration = operationTimeout.asJava
/**
* Java API
*/
def withHeartbeatInterval(heartbeatInterval: java.time.Duration): TimeoutSettings = {
copy(heartbeatInterval = heartbeatInterval.asScala)
}
/**
* Java API
*/
def withHeartbeatTimeout(heartbeatTimeout: java.time.Duration): TimeoutSettings = {
copy(heartbeatTimeout = heartbeatTimeout.asScala)
}
/**
* Java API
*/
def withOperationTimeout(operationTimeout: java.time.Duration): TimeoutSettings = {
copy(operationTimeout = operationTimeout.asScala)
}
def withHeartbeatInterval(heartbeatInterval: FiniteDuration): TimeoutSettings = {
copy(heartbeatInterval = heartbeatInterval)
}
def withHeartbeatTimeout(heartbeatTimeout: FiniteDuration): TimeoutSettings = {
copy(heartbeatTimeout = heartbeatTimeout)
}
def withOperationTimeout(operationTimeout: FiniteDuration): TimeoutSettings = {
copy(operationTimeout = operationTimeout)
}
private def copy(
heartbeatInterval: FiniteDuration = heartbeatInterval,
heartbeatTimeout: FiniteDuration = heartbeatTimeout,
operationTimeout: FiniteDuration = operationTimeout): TimeoutSettings = {
new TimeoutSettings(heartbeatInterval, heartbeatTimeout, operationTimeout)
}
override def toString = s"TimeoutSettings($heartbeatInterval, $heartbeatTimeout, $operationTimeout)"
}

View file

@ -0,0 +1,35 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.coordination.lease.internal
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Consumer
import akka.annotation.InternalApi
import akka.coordination.lease.LeaseSettings
import akka.coordination.lease.javadsl.Lease
import akka.coordination.lease.scaladsl.{ Lease => ScalaLease }
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.ExecutionContext
/**
* INTERNAL API
*/
@InternalApi
final private[akka] class LeaseAdapter(delegate: ScalaLease)(implicit val ec: ExecutionContext) extends Lease {
override def acquire(): CompletionStage[java.lang.Boolean] = delegate.acquire().map(Boolean.box).toJava
override def acquire(leaseLostCallback: Consumer[Optional[Throwable]]): CompletionStage[java.lang.Boolean] = {
delegate.acquire(o => leaseLostCallback.accept(o.asJava)).map(Boolean.box).toJava
}
override def release(): CompletionStage[java.lang.Boolean] = delegate.release().map(Boolean.box).toJava
override def checkLease(): Boolean = delegate.checkLease()
override def getSettings(): LeaseSettings = delegate.settings
}

View file

@ -0,0 +1,58 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.coordination.lease.javadsl
import java.util.Optional
import java.util.concurrent.CompletionStage
import akka.annotation.ApiMayChange
import akka.coordination.lease.LeaseSettings
@ApiMayChange
abstract class Lease() {
def getSettings(): LeaseSettings
/**
* Try to acquire the lease. The returned `CompletionStage` will be completed with `true`
* if the lease could be acquired, i.e. no other owner is holding the lease.
*
* The returned `CompletionStage` will be completed with `false` if the lease for certain couldn't be
* acquired, e.g. because some other owner is holding it. It's completed with [[akka.coordination.lease.LeaseException]]
* failure if it might not have been able to acquire the lease, e.g. communication timeout
* with the lease resource.
*
* The lease will be held by the [[LeaseSettings.ownerName]] until it is released
* with [[Lease.release]]. A Lease implementation will typically also lose the ownership
* if it can't maintain its authority, e.g. if it crashes or is partitioned from the lease
* resource for too long.
*
* [[Lease.checkLease]] can be used to verify that the owner still has the lease.
*/
def acquire(): CompletionStage[java.lang.Boolean]
/**
* Same as acquire with an additional callback
* that is called if the lease is lost. The lease can be lost due to being unable
* to communicate with the lease provider.
* Implementations should not call leaseLostCallback until after the returned
* `CompletionStage` has been completed.
*/
def acquire(leaseLostCallback: java.util.function.Consumer[Optional[Throwable]]): CompletionStage[java.lang.Boolean]
/**
* Release the lease so some other owner can acquire it.
*/
def release(): CompletionStage[java.lang.Boolean]
/**
* Check if the owner still holds the lease.
* `true` means that it certainly holds the lease.
* `false` means that it might not hold the lease, but it could, and for a more certain
* response you would have to use [[Lease#acquire()*]] or [[Lease#release]].
*/
def checkLease(): Boolean
}

View file

@ -0,0 +1,41 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.coordination.lease.javadsl
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.annotation.ApiMayChange
import akka.coordination.lease.internal.LeaseAdapter
import akka.coordination.lease.scaladsl.{ LeaseProvider => ScalaLeaseProvider }
@ApiMayChange
object LeaseProvider extends ExtensionId[LeaseProvider] with ExtensionIdProvider {
override def get(system: ActorSystem): LeaseProvider = super.get(system)
override def lookup = LeaseProvider
override def createExtension(system: ExtendedActorSystem): LeaseProvider = new LeaseProvider(system)
private final case class LeaseKey(leaseName: String, configPath: String, clientName: String)
}
@ApiMayChange
class LeaseProvider(system: ExtendedActorSystem) extends Extension {
private val delegate = ScalaLeaseProvider(system)
/**
* The configuration defined at `configPath` must have a property `lease-class` that defines
* the fully qualified class name of the Lease implementation.
* The class must implement [[Lease]] and have a constructor with a [[akka.coordination.lease.LeaseSettings]] parameter and
* optionally an ActorSystem parameter.
*
* @param leaseName the name of the lease resource
* @param configPath the path of configuration for the lease
* @param ownerName the owner that will `acquire` the lease, e.g. hostname and port of the ActorSystem
*/
def getLease(leaseName: String, configPath: String, ownerName: String): Lease = {
val scalaLease = delegate.getLease(leaseName, configPath, ownerName)
new LeaseAdapter(scalaLease)(system.dispatcher)
}
}

View file

@ -0,0 +1,55 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.coordination.lease.scaladsl
import akka.annotation.ApiMayChange
import akka.coordination.lease.LeaseSettings
import scala.concurrent.Future
@ApiMayChange
abstract class Lease(val settings: LeaseSettings) {
/**
* Try to acquire the lease. The returned `Future` will be completed with `true`
* if the lease could be acquired, i.e. no other owner is holding the lease.
*
* The returned `Future` will be completed with `false` if the lease for certain couldn't be
* acquired, e.g. because some other owner is holding it. It's completed with [[akka.coordination.lease.LeaseException]]
* failure if it might not have been able to acquire the lease, e.g. communication timeout
* with the lease resource.
*
* The lease will be held by the [[akka.coordination.lease.LeaseSettings.ownerName]] until it is released
* with [[Lease.release]]. A Lease implementation will typically also lose the ownership
* if it can't maintain its authority, e.g. if it crashes or is partitioned from the lease
* resource for too long.
*
* [[Lease.checkLease]] can be used to verify that the owner still has the lease.
*/
def acquire(): Future[Boolean]
/**
* Same as acquire with an additional callback
* that is called if the lease is lost. The lease can be lost due to being unable
* to communicate with the lease provider.
* Implementations should not call leaseLostCallback until after the returned future
* has been completed.
*/
def acquire(leaseLostCallback: Option[Throwable] => Unit): Future[Boolean]
/**
* Release the lease so some other owner can acquire it.
*/
def release(): Future[Boolean]
/**
* Check if the owner still holds the lease.
* `true` means that it certainly holds the lease.
* `false` means that it might not hold the lease, but it could, and for a more certain
* response you would have to use [[Lease#acquire()*]] or [[Lease#release]].
*/
def checkLease(): Boolean
}

View file

@ -0,0 +1,92 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.coordination.lease.scaladsl
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{ Function => JFunction }
import scala.collection.immutable
import scala.util.Failure
import scala.util.Success
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.annotation.ApiMayChange
import akka.event.Logging
import akka.coordination.lease.LeaseSettings
@ApiMayChange
object LeaseProvider extends ExtensionId[LeaseProvider] with ExtensionIdProvider {
override def get(system: ActorSystem): LeaseProvider = super.get(system)
override def lookup = LeaseProvider
override def createExtension(system: ExtendedActorSystem): LeaseProvider = new LeaseProvider(system)
private final case class LeaseKey(leaseName: String, configPath: String, clientName: String)
}
@ApiMayChange
class LeaseProvider(system: ExtendedActorSystem) extends Extension {
import LeaseProvider.LeaseKey
private val log = Logging(system, getClass)
private val leases = new ConcurrentHashMap[LeaseKey, Lease]()
/**
* The configuration defined at `configPath` must have a property `lease-class` that defines
* the fully qualified class name of the Lease implementation.
* The class must implement [[Lease]] and have a constructor with a [[akka.coordination.lease.LeaseSettings]] parameter and
* optionally an ActorSystem parameter.
*
* @param leaseName the name of the lease resource
* @param configPath the path of configuration for the lease
* @param ownerName the owner that will `acquire` the lease, e.g. hostname and port of the ActorSystem
*/
def getLease(leaseName: String, configPath: String, ownerName: String): Lease = {
val leaseKey = LeaseKey(leaseName, configPath, ownerName)
leases.computeIfAbsent(
leaseKey,
new JFunction[LeaseKey, Lease] {
override def apply(t: LeaseKey): Lease = {
val leaseConfig = system.settings.config
.getConfig(configPath)
.withFallback(system.settings.config.getConfig("akka.coordination.lease"))
loadLease(LeaseSettings(leaseConfig, leaseName, ownerName), configPath)
}
})
}
private def loadLease(leaseSettings: LeaseSettings, configPath: String): Lease = {
val fqcn = leaseSettings.leaseConfig.getString("lease-class")
require(fqcn.nonEmpty, "lease-class must not be empty")
val dynamicAccess = system.dynamicAccess
dynamicAccess
.createInstanceFor[Lease](
fqcn,
immutable.Seq((classOf[LeaseSettings], leaseSettings), (classOf[ExtendedActorSystem], system)))
.recoverWith {
case _: NoSuchMethodException =>
dynamicAccess.createInstanceFor[Lease](fqcn, immutable.Seq((classOf[LeaseSettings], leaseSettings)))
} match {
case Success(value) => value
case Failure(e) =>
log.error(
e,
"Invalid lease configuration for leaseName [{}], configPath [{}] lease-class [{}]. " +
"The class must implement Lease and have constructor with LeaseSettings parameter and " +
"optionally ActorSystem parameter.",
leaseSettings.leaseName,
configPath,
fqcn)
throw e
}
}
// TODO how to clean up a lease? Not important for this use case as we'll only have one lease
}

View file

@ -0,0 +1,42 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.coordination.lease.javadsl;
import akka.actor.ActorSystem;
import akka.coordination.lease.scaladsl.LeaseProviderSpec;
import akka.testkit.AkkaJUnitActorSystemResource;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class LeaseProviderTest {
@Rule
public AkkaJUnitActorSystemResource actorSystemResource =
new AkkaJUnitActorSystemResource("LoggingAdapterTest", LeaseProviderSpec.config());
private ActorSystem system = null;
@Before
public void before() {
system = actorSystemResource.getSystem();
}
@Test
public void loadLeaseImpl() {
Lease leaseA = LeaseProvider.get(system).getLease("a", "lease-a", "owner1");
assertEquals(leaseA.getSettings().leaseName(), "a");
assertEquals(leaseA.getSettings().ownerName(), "owner1");
assertEquals(leaseA.getSettings().leaseConfig().getString("key1"), "value1");
Lease leaseB = LeaseProvider.get(system).getLease("b", "lease-b", "owner2");
assertEquals(leaseB.getSettings().leaseName(), "b");
assertEquals(leaseB.getSettings().ownerName(), "owner2");
assertEquals(leaseB.getSettings().leaseConfig().getString("key2"), "value2");
}
}

View file

@ -0,0 +1,94 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.coordination.lease;
import akka.actor.ActorSystem;
import akka.cluster.Cluster;
import akka.coordination.lease.LeaseSettings;
import akka.coordination.lease.javadsl.Lease;
import akka.coordination.lease.javadsl.LeaseProvider;
import akka.testkit.javadsl.TestKit;
import docs.akka.coordination.LeaseDocSpec;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
@SuppressWarnings("unused")
public class LeaseDocTest {
// #lease-example
static class SampleLease extends Lease {
private LeaseSettings settings;
public SampleLease(LeaseSettings settings) {
this.settings = settings;
}
@Override
public LeaseSettings getSettings() {
return settings;
}
@Override
public CompletionStage<Boolean> acquire() {
return null;
}
@Override
public CompletionStage<Boolean> acquire(Consumer<Optional<Throwable>> leaseLostCallback) {
return null;
}
@Override
public CompletionStage<Boolean> release() {
return null;
}
@Override
public boolean checkLease() {
return false;
}
}
// #lease-example
private static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("LeaseDocTest", LeaseDocSpec.config());
}
@AfterClass
public static void teardown() {
TestKit.shutdownActorSystem(system);
system = null;
}
private void doSomethingImportant(Optional<Throwable> leaseLostReason) {}
@Test
public void beLoadable() {
// #lease-usage
Lease lease =
LeaseProvider.get(system).getLease("<name of the lease>", "docs-lease", "<owner name>");
CompletionStage<Boolean> acquired = lease.acquire();
boolean stillAcquired = lease.checkLease();
CompletionStage<Boolean> released = lease.release();
// #lease-usage
// #lost-callback
lease.acquire(this::doSomethingImportant);
// #lost-callback
// #cluster-owner
String owner = Cluster.get(system).selfAddress().hostPort();
// #cluster-owner
}
}

View file

@ -0,0 +1,54 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.coordination.lease
import com.typesafe.config.ConfigFactory
import org.scalatest.{ Matchers, WordSpec }
import scala.concurrent.duration._
class TimeoutSettingsSpec extends WordSpec with Matchers {
private def conf(overrides: String): TimeoutSettings = {
val c = ConfigFactory.parseString(overrides).withFallback(ConfigFactory.load())
TimeoutSettings(c)
}
"TimeoutSettings" should {
"default heartbeat-interval to heartbeat-timeout / 10" in {
conf("""
heartbeat-timeout=100s
heartbeat-interval=""
lease-operation-timeout=5s
""").heartbeatInterval shouldEqual 10.second
}
"have a min of 5s for heartbeat-interval" in {
conf("""
heartbeat-timeout=40s
heartbeat-interval=""
lease-operation-timeout=5s
""").heartbeatInterval shouldEqual 5.second
}
"allow overriding of heartbeat-interval" in {
conf("""
heartbeat-timeout=100s
heartbeat-interval=20s
lease-operation-timeout=5s
""").heartbeatInterval shouldEqual 20.second
}
"not allow interval to be greater or equal to half the interval" in {
intercept[IllegalArgumentException] {
conf("""
heartbeat-timeout=100s
heartbeat-interval=50s
lease-operation-timeout=5s
""")
}.getMessage shouldEqual "requirement failed: heartbeat-interval must be less than half heartbeat-timeout"
}
}
}

View file

@ -0,0 +1,128 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.coordination.lease.scaladsl
import scala.concurrent.Future
import akka.actor.ExtendedActorSystem
import akka.coordination.lease.LeaseSettings
import akka.testkit.{ AkkaSpec, EventFilter }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object LeaseProviderSpec {
class LeaseA(settings: LeaseSettings) extends Lease(settings) {
override def acquire(): Future[Boolean] = Future.successful(false)
override def release(): Future[Boolean] = Future.successful(false)
override def checkLease(): Boolean = false
override def acquire(callback: Option[Throwable] => Unit): Future[Boolean] = Future.successful(false)
}
class LeaseB(settings: LeaseSettings, system: ExtendedActorSystem) extends Lease(settings) {
system.name // touch the system so the constructor parameter is used (avoids an unused warning)
override def acquire(): Future[Boolean] = Future.successful(false)
override def release(): Future[Boolean] = Future.successful(false)
override def checkLease(): Boolean = false
override def acquire(callback: Option[Throwable] => Unit): Future[Boolean] = Future.successful(false)
}
val config = ConfigFactory.parseString(s"""
lease-a {
lease-class = "${classOf[LeaseProviderSpec.LeaseA].getName}"
key1 = value1
heartbeat-timeout = 100s
heartbeat-interval = 1s
lease-operation-timeout = 2s
}
lease-b {
lease-class = "${classOf[LeaseProviderSpec.LeaseB].getName}"
key2 = value2
heartbeat-timeout = 120s
heartbeat-interval = 1s
lease-operation-timeout = 2s
}
lease-missing {
}
lease-unknown {
lease-class = "foo.wrong.ClassName"
heartbeat-timeout = 120s
heartbeat-interval = 1s
lease-operation-timeout = 2s
}
lease-fallback-to-defaults {
lease-class = "${classOf[LeaseProviderSpec.LeaseA].getName}"
}
""")
}
class LeaseProviderSpec extends AkkaSpec(LeaseProviderSpec.config) {
import LeaseProviderSpec._
"LeaseProvider" must {
"load lease implementation" in {
val leaseA = LeaseProvider(system).getLease("a", "lease-a", "owner1")
leaseA.getClass should ===(classOf[LeaseA])
leaseA.settings.leaseName should ===("a")
leaseA.settings.ownerName should ===("owner1")
leaseA.settings.leaseConfig.getString("key1") should ===("value1")
leaseA.settings.timeoutSettings.heartbeatTimeout should ===(100.seconds)
leaseA.settings.timeoutSettings.heartbeatInterval should ===(1.seconds)
leaseA.settings.timeoutSettings.operationTimeout should ===(2.seconds)
val leaseB = LeaseProvider(system).getLease("b", "lease-b", "owner2")
leaseB.getClass should ===(classOf[LeaseB])
leaseB.settings.leaseName should ===("b")
leaseB.settings.ownerName should ===("owner2")
leaseB.settings.leaseConfig.getString("key2") should ===("value2")
}
"load defaults for timeouts if not specified" in {
val defaults = LeaseProvider(system).getLease("a", "lease-fallback-to-defaults", "owner1")
defaults.settings.timeoutSettings.operationTimeout should ===(5.seconds)
defaults.settings.timeoutSettings.heartbeatTimeout should ===(120.seconds)
defaults.settings.timeoutSettings.heartbeatInterval should ===(12.seconds)
}
"return same instance for same leaseName, configPath and owner" in {
val leaseA1 = LeaseProvider(system).getLease("a2", "lease-a", "owner1")
val leaseA2 = LeaseProvider(system).getLease("a2", "lease-a", "owner1")
leaseA1 shouldBe theSameInstanceAs(leaseA2)
}
"return different instance for different leaseName" in {
val leaseA1 = LeaseProvider(system).getLease("a3", "lease-a", "owner1")
val leaseA2 = LeaseProvider(system).getLease("a3b", "lease-a", "owner1")
leaseA1 should not be theSameInstanceAs(leaseA2)
}
"return different instance for different ownerName" in {
val leaseA1 = LeaseProvider(system).getLease("a4", "lease-a", "owner1")
val leaseA2 = LeaseProvider(system).getLease("a4", "lease-a", "owner2")
leaseA1 should not be theSameInstanceAs(leaseA2)
}
"throw if missing lease-class config" in {
intercept[IllegalArgumentException] {
LeaseProvider(system).getLease("x", "lease-missing", "owner1")
}.getMessage should include("lease-class must not be empty")
}
"throw if unknown lease-class config" in {
intercept[ClassNotFoundException] {
EventFilter[ClassNotFoundException](occurrences = 1).intercept {
LeaseProvider(system).getLease("x", "lease-unknown", "owner1")
}
}
}
}
}

View file

@ -0,0 +1,88 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.coordination
import akka.cluster.Cluster
import akka.coordination.lease.LeaseSettings
import akka.coordination.lease.scaladsl.{ Lease, LeaseProvider }
import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
//#lease-example
class SampleLease(settings: LeaseSettings) extends Lease(settings) {
override def acquire(): Future[Boolean] = {
Future.successful(true)
}
override def acquire(leaseLostCallback: Option[Throwable] => Unit): Future[Boolean] = {
Future.successful(true)
}
override def release(): Future[Boolean] = {
Future.successful(true)
}
override def checkLease(): Boolean = {
true
}
}
//#lease-example
object LeaseDocSpec {
val config = ConfigFactory.parseString("""
#lease-config
akka.actor.provider = cluster
docs-lease {
lease-class = "docs.akka.coordination.SampleLease"
heartbeat-timeout = 100s
heartbeat-interval = 1s
lease-operation-timeout = 1s
# Any lease specific configuration
}
#lease-config
""".stripMargin)
def blackhole(stuff: Any*): Unit = {
stuff.toString
()
}
def doSomethingImportant(leaseLostReason: Option[Throwable]): Unit = {
leaseLostReason.map(_.toString)
()
}
}
class LeaseDocSpec extends AkkaSpec(LeaseDocSpec.config) {
import LeaseDocSpec._
"A docs lease" should {
"be loadable" in {
//#lease-usage
val lease = LeaseProvider(system).getLease("<name of the lease>", "docs-lease", "owner")
val acquired: Future[Boolean] = lease.acquire()
val stillAcquired: Boolean = lease.checkLease()
val released: Future[Boolean] = lease.release()
//#lease-usage
//#lost-callback
lease.acquire(leaseLostReason => doSomethingImportant(leaseLostReason))
//#lost-callback
//#cluster-owner
val owner = Cluster(system).selfAddress.hostPort
//#cluster-owner
// remove compiler warnings
blackhole(acquired, stillAcquired, released, owner)
}
}
}

View file

@ -517,3 +517,24 @@ When doing rolling upgrades special care must be taken to not change any of the
* the persistence mode
If any one of these needs a change it will require a full cluster restart.
## Lease
A @ref[lease](coordination.md) can be used as an additional safety measure to ensure a shard
does not run on two nodes.
Reasons this can happen include:
* Network partitions without an appropriate downing provider
* Mistakes in the deployment process leading to two separate Akka Clusters
* Timing issues between removing members from the Cluster on one side of a network partition and shutting them down on the other side
A lease can act as a final backup, meaning that each shard won't create child entity actors unless it holds the lease.
To use a lease for sharding set `akka.cluster.sharding.use-lease` to the configuration location
of the lease to use. Each shard will try to acquire a lease with the name `<actor system name>-shard-<type name>-<shard id>` and
the owner is set to `Cluster(system).selfAddress.hostPort`.
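As a minimal sketch, the relevant configuration could look like the following, where `docs-lease` stands in for the config path of whichever lease implementation is used:

```
akka.cluster.sharding {
  # config path of the lease implementation
  use-lease = "docs-lease"
  # how often a shard retries acquiring the lease if it is not granted
  lease-retry-interval = 5s
}
```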
If a shard can't acquire a lease it will remain uninitialized, so messages for entities it owns will
be buffered in the `ShardRegion`. If the lease is lost after initialization the Shard will be terminated.

View file

@ -184,3 +184,23 @@ Scala
Java
: @@snip [ClusterSingletonSupervision.java](/akka-docs/src/test/java/jdocs/cluster/singleton/ClusterSingletonSupervision.java) { #singleton-supervisor-actor-usage-imports }
@@snip [ClusterSingletonSupervision.java](/akka-docs/src/test/java/jdocs/cluster/singleton/ClusterSingletonSupervision.java) { #singleton-supervisor-actor-usage }
## Lease
A @ref[lease](coordination.md) can be used as an additional safety measure to ensure that two singletons
don't run at the same time. Reasons this can happen include:
* Network partitions without an appropriate downing provider
* Mistakes in the deployment process leading to two separate Akka Clusters
* Timing issues between removing members from the Cluster on one side of a network partition and shutting them down on the other side
A lease can act as a final backup, meaning that the singleton actor won't be created unless
the lease can be acquired.
To use a lease for the singleton set `akka.cluster.singleton.use-lease` to the configuration location
of the lease to use. A lease with the name `<actor system name>-singleton-<singleton actor path>` is used and
the owner is set to @scala[`Cluster(system).selfAddress.hostPort`]@java[`Cluster.get(system).selfAddress().hostPort()`].
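As a minimal sketch, the relevant configuration could look like the following, where `docs-lease` stands in for the config path of whichever lease implementation is used:

```
akka.cluster.singleton {
  # config path of the lease implementation
  use-lease = "docs-lease"
  # how often the oldest node retries acquiring the lease
  lease-retry-interval = 5s
}
```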
If the cluster singleton manager can't acquire the lease it will keep retrying while it is the oldest node in the cluster.
If the lease is lost the singleton actor will be terminated and then acquiring the lease will be retried.

View file

@ -0,0 +1,112 @@
# Coordination
@@@ warning
This module is currently marked as @ref:[may change](common/may-change.md). It is ready to be used
in production but the API may change without warning or a deprecation period.
@@@
Akka Coordination is a set of tools for distributed coordination.
## Dependency
@@dependency[sbt,Gradle,Maven] {
group="com.typesafe.akka"
artifact="akka-coordination_$scala.binary_version$"
version="$akka.version$"
}
## Lease
The lease is a pluggable API for a distributed lock.
## Using a lease
Leases are loaded with:
* Lease name
* Config location to indicate which implementation should be loaded
* Owner name
Any lease implementation should provide the following guarantees:
* A lease with the same name loaded multiple times, even on different nodes, is the same lease
* Only one owner can acquire the lease at a time
To acquire a lease:
Scala
: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #lease-usage }
Java
: @@snip [LeaseDocTest.java](/akka-coordination/src/test/java/jdocs/akka/coordination/lease/LeaseDocTest.java) { #lease-usage }
Acquiring a lease returns a @scala[Future]@java[CompletionStage] as lease implementations are typically backed by
a third party system such as the Kubernetes API server or ZooKeeper.
Once a lease is acquired `checkLease` can be called to ensure that the lease is still acquired. As lease implementations
are based on other distributed systems a lease can be lost due to a timeout with the third party system. This operation is
not asynchronous so it can be called before performing any action for which having the lease is important.
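For example, a minimal Scala sketch that only performs work once the lease has actually been granted; the lease name, the `docs-lease` config path and the owner are placeholder values, and the config path must point to a lease configuration with a `lease-class` as described under "Implementing a lease" below:

```scala
import akka.actor.ActorSystem
import akka.coordination.lease.scaladsl.LeaseProvider

import scala.concurrent.ExecutionContext

object LeaseGateExample extends App {
  val system = ActorSystem("example")
  implicit val ec: ExecutionContext = system.dispatcher

  val lease = LeaseProvider(system).getLease("<name of the lease>", "docs-lease", "<owner name>")

  lease.acquire().foreach { granted =>
    // only proceed when the lease was granted; otherwise another owner holds it
    if (granted) {
      // ... perform the work that requires the lease ...
    }
  }
}
```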
A lease has an owner. If the same owner tries to acquire the lease multiple times it will succeed, i.e. leases are reentrant.
It is important to pick a lease name that will be unique for your use case. If a lease needs to be unique for each node
in a Cluster the cluster host port can be used:
Scala
: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #cluster-owner }
Java
: @@snip [LeaseDocTest.java](/akka-coordination/src/test/java/jdocs/akka/coordination/lease/LeaseDocTest.java) { #cluster-owner }
For use cases where multiple different leases are used on the same node, something unique must be added to the name. For example,
when a lease is used with Cluster Sharding the shard id is included in the lease name for each shard.
## Usages in other Akka modules
Leases can be used for @ref[Cluster Singletons](cluster-singleton.md#lease) and @ref[Cluster Sharding](cluster-sharding.md#lease).
## Lease implementations
* [Kubernetes API](https://developer.lightbend.com/docs/akka-commercial-addons/current/kubernetes-lease.html)
## Implementing a lease
Implementations should extend
the @scala[`akka.coordination.lease.scaladsl.Lease`]@java[`akka.coordination.lease.javadsl.Lease`]
Scala
: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #lease-example }
Java
: @@snip [LeaseDocTest.java](/akka-coordination/src/test/java/jdocs/akka/coordination/lease/LeaseDocTest.java) { #lease-example }
The methods should provide the following guarantees:
* `acquire` should complete with: `true` if the lease has been acquired, `false` if the lease is taken by another owner, or fail if it can't communicate with the third party system implementing the lease.
* `release` should complete with: `true` if the lease has definitely been released, `false` if the lease has definitely not been released, or fail if it is unknown if the lease has been released.
* `checkLease` should return `false` until an `acquire` @scala[Future]@java[CompletionStage] has completed and should return `false` if the lease is lost due to an error communicating with the third party. `checkLease` should also not block.
* The `acquire` lease lost callback should only be called after an `acquire` @scala[Future]@java[CompletionStage] has completed and should be called if the lease is lost, e.g. due to losing communication with the third party system.
In addition it is expected that a lease implementation will include a time to live mechanism, meaning that a lease won't be held forever if the node that acquired it crashes.
If a user prefers to have outside intervention in this case for maximum safety then the time to live can be set to infinite.
The configuration must define the `lease-class` property for the FQCN of the lease implementation.
The lease implementation should have support for the following properties where the defaults come from `akka.coordination.lease`:
@@snip [reference.conf](/akka-coordination/src/main/resources/reference.conf) { #defaults }
This configuration location is passed into `getLease`.
Scala
: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #lease-config }
Java
: @@snip [LeaseDocSpec.scala](/akka-coordination/src/test/scala/docs/akka/coordination/LeaseDocSpec.scala) { #lease-config }

View file

@ -13,6 +13,7 @@
* [stream/index](stream/index.md)
* [index-network](index-network.md)
* [discovery](discovery/index.md)
* [coordination](coordination.md)
* [index-futures](index-futures.md)
* [index-utilities](index-utilities.md)
* [common/other-modules](common/other-modules.md)

View file

@ -580,7 +580,7 @@ private[akka] class BarrierCoordinator
with LoggingFSM[BarrierCoordinator.State, BarrierCoordinator.Data] {
import BarrierCoordinator._
import Controller._
import FSM.`->`
import FSM._
// this shall be set to true if all subsequent barriers shall fail
var failed = false

View file

@ -6,8 +6,9 @@ enablePlugins(UnidocRoot, TimeStampede, UnidocWithPrValidation, NoPublish, Copyr
JavaFormatterPlugin)
disablePlugins(MimaPlugin)
addCommandAlias(
name ="fixall",
name = "fixall",
value = ";scalafixEnable;compile:scalafix;test:scalafix;multi-jvm:scalafix;test:compile;reload")
import akka.AkkaBuild._
import akka.{AkkaBuild, Dependencies, GitHub, OSGi, Protobuf, SigarLoader, VersionGenerator}
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm
@ -48,7 +49,8 @@ lazy val aggregatedProjects: Seq[ProjectReference] = Seq(
clusterTyped, clusterShardingTyped,
benchJmhTyped,
streamTyped,
discovery
discovery,
coordination
)
lazy val root = Project(
@ -134,7 +136,7 @@ lazy val camel = akkaModule("akka-camel")
.settings(OSGi.camel)
lazy val cluster = akkaModule("akka-cluster")
.dependsOn(remote, remoteTests % "test->test" , testkit % "test->test")
.dependsOn(remote, remoteTests % "test->test", testkit % "test->test")
.settings(Dependencies.cluster)
.settings(AutomaticModuleName.settings("akka.cluster"))
.settings(OSGi.cluster)
@ -168,7 +170,7 @@ lazy val clusterSharding = akkaModule("akka-cluster-sharding")
cluster % "compile->compile;test->test;multi-jvm->multi-jvm",
distributedData,
persistence % "compile->compile",
clusterTools
clusterTools % "compile->compile;test->test"
)
.settings(Dependencies.clusterSharding)
.settings(AutomaticModuleName.settings("akka.cluster.sharding"))
@ -178,7 +180,10 @@ lazy val clusterSharding = akkaModule("akka-cluster-sharding")
.enablePlugins(MultiNode, ScaladocNoVerificationOfDiagrams)
lazy val clusterTools = akkaModule("akka-cluster-tools")
.dependsOn(cluster % "compile->compile;test->test;multi-jvm->multi-jvm")
.dependsOn(
cluster % "compile->compile;test->test;multi-jvm->multi-jvm",
coordination
)
.settings(Dependencies.clusterTools)
.settings(AutomaticModuleName.settings("akka.cluster.tools"))
.settings(OSGi.clusterTools)
@ -192,7 +197,8 @@ lazy val contrib = akkaModule("akka-contrib")
.settings(AutomaticModuleName.settings("akka.contrib"))
.settings(OSGi.contrib)
.settings(
description := """|
description :=
"""|
|This subproject provides a home to modules contributed by external
|developers which may or may not move into the officially supported code
|base over time. A module in this subproject doesn't have to obey the rule
@ -234,7 +240,7 @@ lazy val docs = akkaModule("akka-docs")
)
.settings(Dependencies.docs)
.settings(
name in (Compile, paradox) := "Akka",
name in(Compile, paradox) := "Akka",
Compile / paradoxProperties ++= Map(
"canonical.base_url" -> "https://doc.akka.io/docs/akka/current",
"github.base_url" -> GitHub.url(version.value), // for links like this: @github[#1](#1) or @github[83986f9](83986f9)
@ -325,7 +331,7 @@ lazy val persistenceTck = akkaModule("akka-persistence-tck")
.dependsOn(persistence % "compile->compile;test->test", testkit % "compile->compile;test->test")
.settings(Dependencies.persistenceTck)
.settings(AutomaticModuleName.settings("akka.persistence.tck"))
//.settings(OSGi.persistenceTck) TODO: we do need to export this as OSGi bundle too?
//.settings(OSGi.persistenceTck) TODO: we do need to export this as OSGi bundle too?
.settings(
fork in Test := true
)
@ -414,7 +420,8 @@ lazy val actorTyped = akkaModule("akka-actor-typed")
.settings(OSGi.actorTyped)
.settings(AkkaBuild.noScala211)
.settings(
initialCommands := """
initialCommands :=
"""
import akka.actor.typed._
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent._
@ -474,7 +481,7 @@ lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
.settings(AkkaBuild.noScala211)
.settings(AutomaticModuleName.settings("akka.cluster.sharding.typed"))
// To be able to import ContainerFormats.proto
.settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf" ))
.settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf"))
.disablePlugins(MimaPlugin)
.configs(MultiJvm)
.enablePlugins(MultiNodeScalaTest)
@ -520,6 +527,20 @@ lazy val discovery = akkaModule("akka-discovery")
.settings(AutomaticModuleName.settings("akka.discovery"))
.settings(OSGi.discovery)
lazy val coordination = akkaModule("akka-coordination")
.dependsOn(
actor,
testkit % "test->test",
actorTests % "test->test",
cluster % "test->test"
)
.settings(Dependencies.coordination)
.settings(AutomaticModuleName.settings("akka.coordination"))
.settings(OSGi.coordination)
.settings(AkkaBuild.mayChangeSettings)
.disablePlugins(MimaPlugin)
def akkaModule(name: String): Project =
Project(id = name, base = file(name))
.enablePlugins(ReproducibleBuildsPlugin)
@ -541,7 +562,6 @@ addCommandAlias("allActor", commandValue(actor, Some(actorTests)))
addCommandAlias("allRemote", commandValue(remote, Some(remoteTests)))
addCommandAlias("allClusterCore", commandValue(cluster))
addCommandAlias("allClusterMetrics", commandValue(clusterMetrics))
addCommandAlias("allDistributedData", commandValue(distributedData))
addCommandAlias("allClusterSharding", commandValue(clusterSharding))
addCommandAlias("allClusterTools", commandValue(clusterTools))
addCommandAlias("allCluster", Seq(
@ -549,6 +569,8 @@ addCommandAlias("allCluster", Seq(
commandValue(distributedData),
commandValue(clusterSharding),
commandValue(clusterTools)).mkString)
addCommandAlias("allCoordination", commandValue(coordination))
addCommandAlias("allDistributedData", commandValue(distributedData))
addCommandAlias("allPersistence", commandValue(persistence))
addCommandAlias("allStream", commandValue(stream, Some(streamTests)))
addCommandAlias("allDiscovery", commandValue(discovery))

View file

@ -24,12 +24,14 @@ object AkkaDisciplinePlugin extends AutoPlugin with ScalafixSupport {
val fatalWarningsFor = Set(
"akka-discovery",
"akka-distributed-data",
"akka-protobuf",
"akka-coordination",
"akka-protobuf"
)
val strictProjects = Set(
"akka-discovery",
"akka-protobuf",
"akka-coordination"
)
lazy val scalaFixSettings = Seq(

View file

@ -150,6 +150,8 @@ object Dependencies {
val discovery = l ++= Seq(Test.junit, Test.scalatest.value)
val coordination = l ++= Seq(Test.junit, Test.scalatest.value)
val testkit = l ++= Seq(Test.junit, Test.scalatest.value) ++ Test.metricsAll
val actorTests = l ++= Seq(

View file

@ -133,6 +133,8 @@ object OSGi {
val discovery = exports(Seq("akka.discovery.*"))
val coordination = exports(Seq("akka.coordination.*"))
val osgiOptionalImports = Seq(
// needed because testkit is normally not used in the application bundle,
// but it should still be included as transitive dependency and used by BundleDelegatingClassLoader