Fix typed cluster singleton cross dc proxies (#24936)

* Fix typed cluster singleton cross dc proxies
* Adds first multi-jvm test for typed cluster
Christopher Batey, 2018-04-27 12:44:44 +01:00 (committed by GitHub)
parent 82e2e2c551
commit 23373565db
13 changed files with 414 additions and 28 deletions
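
The gist of the fix: the typed ClusterSingleton extension cached proxies by singleton name alone, so a proxy requested for another data center was answered from the cache entry for the local one. A minimal standalone sketch of the corrected keying (illustrative names, not the actual Akka internals):

import java.util.concurrent.ConcurrentHashMap

object ProxyCacheSketch {
  // Key the cache by (singleton name, data center) instead of name alone,
  // mirroring the change to `proxies` in AdaptedClusterSingletonImpl below.
  final case class Key(singletonName: String, dataCenter: Option[String])

  private val proxies = new ConcurrentHashMap[Key, String]()

  def proxyFor(name: String, dataCenter: Option[String]): String =
    // computeIfAbsent creates at most one proxy per (name, dc) combination
    proxies.computeIfAbsent(
      Key(name, dataCenter),
      key ⇒ s"singletonProxy${key.singletonName}-${key.dataCenter.getOrElse("no-dc")}")

  def main(args: Array[String]): Unit =
    // Before the fix these two lookups collided on the name "ping".
    assert(proxyFor("ping", None) != proxyFor("ping", Some("dc2")))
}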

@@ -0,0 +1,111 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed
import akka.actor.typed.{ ActorRef, Props }
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.typed.{ MultiDcClusterActors, MultiNodeTypedClusterSpec }
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.testkit.typed.scaladsl.TestProbe
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
object MultiDcClusterShardingSpecConfig extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(
ConfigFactory.parseString(
"""
akka.loglevel = DEBUG
""").withFallback(
MultiNodeTypedClusterSpec.clusterConfig))
nodeConfig(first, second)(ConfigFactory.parseString(
"""
akka.cluster.multi-data-center.self-data-center = "dc1"
"""))
nodeConfig(third, fourth)(ConfigFactory.parseString(
"""
akka.cluster.multi-data-center.self-data-center = "dc2"
"""))
testTransport(on = true)
}
class MultiDcClusterShardingMultiJvmNode1 extends MultiDcClusterShardingSpec
class MultiDcClusterShardingMultiJvmNode2 extends MultiDcClusterShardingSpec
class MultiDcClusterShardingMultiJvmNode3 extends MultiDcClusterShardingSpec
class MultiDcClusterShardingMultiJvmNode4 extends MultiDcClusterShardingSpec
abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterShardingSpecConfig)
with MultiNodeTypedClusterSpec with ScalaFutures {
import MultiDcClusterShardingSpecConfig._
import MultiDcClusterActors._
val typeKey = EntityTypeKey[PingProtocol]("ping")
val entityId = "ping-1"
"Cluster sharding in multi dc cluster" must {
"form cluster" in {
formCluster(first, second, third, fourth)
}
"start sharding" in {
val sharding = ClusterSharding(typedSystem)
val shardRegion: ActorRef[ShardingEnvelope[PingProtocol]] = sharding.spawn(
_ ⇒ multiDcPinger,
Props.empty,
typeKey,
ClusterShardingSettings(typedSystem),
10,
NoMore
)
val probe = TestProbe[Pong]()
shardRegion ! ShardingEnvelope(entityId, Ping(probe.ref))
probe.expectMessage(Pong(cluster.selfMember.dataCenter))
enterBarrier("sharding-started")
}
"be able to message via entity ref" in {
val probe = TestProbe[Pong]()
val entityRef = ClusterSharding(typedSystem).entityRefFor(typeKey, entityId)
entityRef ! Ping(probe.ref)
probe.expectMessage(Pong(cluster.selfMember.dataCenter))
enterBarrier("entity-ref")
}
"be able to ask via entity ref" in {
implicit val timeout = Timeout(remainingOrDefault)
val entityRef = ClusterSharding(typedSystem).entityRefFor(typeKey, entityId)
val response = entityRef ? Ping
response.futureValue shouldEqual Pong(cluster.selfMember.dataCenter)
enterBarrier("ask")
}
"be able to message cross dc via proxy" in {
runOn(first, second) {
val proxy: ActorRef[ShardingEnvelope[PingProtocol]] = ClusterSharding(typedSystem).spawn(
_ ⇒ multiDcPinger,
Props.empty,
typeKey,
ClusterShardingSettings(typedSystem).withDataCenter("dc2"),
10,
NoMore
)
val probe = TestProbe[Pong]()
proxy ! ShardingEnvelope(entityId, Ping(probe.ref))
probe.expectMessage(remainingOrDefault, Pong("dc2"))
}
enterBarrier("done")
}
}
}
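
For context on the `10` passed to `spawn` above: it is the number of shards, and entity ids are mapped onto that shard space with a stable hash. A simplified sketch in the style of Akka's hash-based message extractor (not the exact implementation):

// Simplified shard-id derivation: the same entity id always lands on the
// same shard, and the shard space is bounded by maxNumberOfShards.
def shardId(entityId: String, maxNumberOfShards: Int): String =
  (math.abs(entityId.hashCode) % maxNumberOfShards).toString

// The spec's "ping-1" therefore resolves to the same shard on every node,
// so each region routes ShardingEnvelope("ping-1", msg) consistently.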

@@ -168,7 +168,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
private val regions: ConcurrentHashMap[String, ActorRef] = new ConcurrentHashMap
private val proxies: ConcurrentHashMap[String, ActorRef] = new ConcurrentHashMap
- private lazy val guardian = {
+ private lazy val guardian: ActorRef = {
val guardianName: String =
system.settings.config.getString("akka.cluster.sharding.guardian-name")
val dispatcher = system.settings.config
@@ -610,7 +610,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
context.system.deadLetters
}
- def receive = {
+ def receive: Receive = {
case Start(typeName,
entityProps,
settings,

@@ -323,9 +323,6 @@ object ShardRegion {
*/
final case class StartEntityAck(entityId: EntityId, shardId: ShardRegion.ShardId) extends ClusterShardingSerializable
- private def roleOption(role: String): Option[String] =
-   if (role == "") None else Option(role)
/**
* INTERNAL API. Sends stopMessage (e.g. `PoisonPill`) to the entities and when all of
* them have terminated it replies with `ShardStopped`.
@@ -461,7 +458,7 @@ private[akka] class ShardRegion(
}
}
- def receive = {
+ def receive: Receive = {
case Terminated(ref) ⇒ receiveTerminated(ref)
case ShardInitialized(shardId) ⇒ initializeShard(shardId, sender())
case evt: ClusterDomainEvent ⇒ receiveClusterEvent(evt)
@@ -759,7 +756,7 @@ private[akka] class ShardRegion(
getShard(shardId)
case None ⇒
if (!shardBuffers.contains(shardId)) {
- log.debug("Request shard [{}] home", shardId)
+ log.debug("Request shard [{}] home. Coordinator [{}]", shardId, coordinator)
coordinator.foreach(_ ! GetShardHome(shardId))
}
val buf = shardBuffers.getOrEmpty(shardId)
@@ -788,7 +785,7 @@ private[akka] class ShardRegion(
context.system.deadLetters ! msg
case None ⇒
if (!shardBuffers.contains(shardId)) {
- log.debug("Request shard [{}] home", shardId)
+ log.debug("Request shard [{}] home. Coordinator [{}]", shardId, coordinator)
coordinator.foreach(_ ! GetShardHome(shardId))
}
bufferMessage(shardId, msg, snd)
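
The touched path buffers messages for a shard until the coordinator answers GetShardHome; the extra log field records whether a coordinator reference was even present at that moment. A stripped-down sketch of the buffer-until-home pattern (illustrative, not ShardRegion's actual internals):

import scala.collection.mutable

final class ShardHomeBuffer[Msg](askCoordinator: String ⇒ Unit) {
  private val buffers = mutable.Map.empty[String, Vector[Msg]]

  def deliverOrBuffer(shardId: String, msg: Msg, homeKnown: Boolean)(deliver: Msg ⇒ Unit): Unit =
    if (homeKnown) deliver(msg)
    else {
      // Only the first buffered message per shard triggers the home request,
      // matching the !shardBuffers.contains(shardId) guard in the hunk above.
      if (!buffers.contains(shardId)) askCoordinator(shardId)
      buffers.update(shardId, buffers.getOrElse(shardId, Vector.empty) :+ msg)
    }

  // Once the coordinator reports the shard's home, drain the queue in order.
  def flush(shardId: String)(deliver: Msg ⇒ Unit): Unit =
    buffers.remove(shardId).foreach(_.foreach(deliver))
}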

@@ -123,7 +123,7 @@ abstract class MultiDcClusterShardingSpec extends MultiNodeSpec(MultiDcClusterSh
}, 10.seconds)
}
s"Cluster sharding in multi data center cluster" must {
"Cluster sharding in multi data center cluster" must {
"join cluster" in within(20.seconds) {
join(first, first)
join(second, first)

@@ -97,6 +97,8 @@ final class ClusterSingletonProxySettings(
def withDataCenter(dataCenter: DataCenter): ClusterSingletonProxySettings = copy(dataCenter = Some(dataCenter))
+ def withDataCenter(dataCenter: Option[DataCenter]): ClusterSingletonProxySettings = copy(dataCenter = dataCenter)
def withSingletonIdentificationInterval(singletonIdentificationInterval: FiniteDuration): ClusterSingletonProxySettings =
copy(singletonIdentificationInterval = singletonIdentificationInterval)

@@ -5,29 +5,24 @@
package akka.cluster.singleton
import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.Terminated
import akka.actor.Identify
import akka.actor.ActorIdentity
import akka.actor.ActorSelection
import akka.cluster.MemberStatus
object ClusterSingletonManagerSpec extends MultiNodeConfig {
val controller = role("controller")

@@ -88,17 +88,21 @@ final class ClusterSingletonSettings(
* INTERNAL API:
*/
@InternalApi
- private[akka] def toProxySettings(singletonName: String): ClusterSingletonProxySettings =
+ private[akka] def toProxySettings(singletonName: String): ClusterSingletonProxySettings = {
new ClusterSingletonProxySettings(singletonName, role, singletonIdentificationInterval, bufferSize)
+   .withDataCenter(dataCenter)
+ }
/**
* INTERNAL API:
*/
@InternalApi
- private[akka] def shouldRunManager(cluster: Cluster): Boolean =
+ private[akka] def shouldRunManager(cluster: Cluster): Boolean = {
(role.isEmpty || cluster.selfMember.roles(role.get)) &&
(dataCenter.isEmpty || dataCenter.contains(cluster.selfMember.dataCenter))
+ }
override def toString = s"ClusterSingletonSettings($role, $dataCenter, $singletonIdentificationInterval, $removalMargin, $handOverRetryInterval, $bufferSize)"
}
object ClusterSingleton extends ExtensionId[ClusterSingleton] {
@@ -141,7 +145,6 @@ abstract class ClusterSingleton extends Extension {
settings: ClusterSingletonSettings,
terminationMessage: A
): ActorRef[A]
}
object ClusterSingletonManagerSettings {
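
The shouldRunManager change above is the other half of the fix: the singleton manager starts only on nodes matching both the role and the data-center constraint, while everyone else gets only a proxy. A standalone restatement of the predicate (types simplified for illustration):

// Simplified restatement of shouldRunManager from the hunk above.
def shouldRunManager(
    requiredRole: Option[String],
    requiredDc: Option[String],
    selfRoles: Set[String],
    selfDc: String): Boolean =
  (requiredRole.isEmpty || selfRoles(requiredRole.get)) &&
    (requiredDc.isEmpty || requiredDc.contains(selfDc))

// A dc1 node asked for a dc2-pinned singleton runs only the proxy:
assert(!shouldRunManager(None, Some("dc2"), Set.empty, "dc1"))
// Without a data-center constraint the local node may host the manager:
assert(shouldRunManager(None, None, Set.empty, "dc1"))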

@@ -14,6 +14,7 @@ import akka.actor.typed.Behavior.UntypedPropsBehavior
import akka.cluster.typed.{ Cluster, ClusterSingleton, ClusterSingletonImpl, ClusterSingletonSettings }
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
+ import akka.cluster.ClusterSettings.DataCenter
/**
* INTERNAL API:
@@ -21,20 +22,21 @@ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, Props }
@InternalApi
private[akka] final class AdaptedClusterSingletonImpl(system: ActorSystem[_]) extends ClusterSingleton {
require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted actor systems can be used for the typed cluster singleton")
import ClusterSingletonImpl._
import akka.actor.typed.scaladsl.adapter._
private lazy val cluster = Cluster(system)
private val untypedSystem = system.toUntyped.asInstanceOf[ExtendedActorSystem]
- private val proxies = new ConcurrentHashMap[String, ActorRef[_]]()
+ private val proxies = new ConcurrentHashMap[(String, Option[DataCenter]), ActorRef[_]]()
override def spawn[A](
behavior: Behavior[A],
singletonName: String,
props: Props,
settings: ClusterSingletonSettings,
- terminationMessage: A) = {
+ terminationMessage: A): ActorRef[A] = {
if (settings.shouldRunManager(cluster)) {
val managerName = managerNameFor(singletonName)
@@ -53,15 +55,19 @@
}
}
- val proxyCreator = new JFunction[String, ActorRef[_]] {
-   def apply(singletonName: String): ActorRef[_] = {
-     val proxyName = s"singletonProxy$singletonName"
+ getProxy(singletonName, settings)
}
+ private def getProxy[T](name: String, settings: ClusterSingletonSettings): ActorRef[T] = {
+   val proxyCreator = new JFunction[(String, Option[DataCenter]), ActorRef[_]] {
+     def apply(singletonNameAndDc: (String, Option[DataCenter])): ActorRef[_] = {
+       val (singletonName, _) = singletonNameAndDc
+       val proxyName = s"singletonProxy$singletonName-${settings.dataCenter.getOrElse("no-dc")}"
untypedSystem.systemActorOf(
ClusterSingletonProxy.props(s"/system/${managerNameFor(singletonName)}", settings.toProxySettings(singletonName)),
proxyName)
}
}
- proxies.computeIfAbsent(singletonName, proxyCreator).asInstanceOf[ActorRef[A]]
+ proxies.computeIfAbsent((name, settings.dataCenter), proxyCreator).asInstanceOf[ActorRef[T]]
}
}
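
Note that the data center is folded into the child actor's name as well as into the cache key; otherwise a second proxy for the same singleton name would fail with a duplicate actor name even with the fixed cache. A quick check of the naming scheme (same shape as the proxyName construction above, illustrative values):

def proxyName(singletonName: String, dataCenter: Option[String]): String =
  s"singletonProxy$singletonName-${dataCenter.getOrElse("no-dc")}"

// Distinct names per data center, so systemActorOf cannot collide:
assert(proxyName("ping", None) == "singletonProxyping-no-dc")
assert(proxyName("ping", Some("dc1")) == "singletonProxyping-dc1")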

@@ -0,0 +1,26 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.typed
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.Behaviors
object MultiDcClusterActors {
case class Pong(dc: String)
sealed trait PingProtocol
case class Ping(ref: ActorRef[Pong]) extends PingProtocol
case object NoMore extends PingProtocol
val multiDcPinger = Behaviors.setup[PingProtocol] { ctx ⇒
val cluster = Cluster(ctx.system)
Behaviors.receiveMessage[PingProtocol] {
case Ping(ref) ⇒
ref ! Pong(cluster.selfMember.dataCenter)
Behaviors.same
case NoMore ⇒
Behaviors.stopped
}
}
}

@@ -0,0 +1,126 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.typed
import akka.actor.typed.Props
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.MemberStatus
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
import akka.testkit.typed.scaladsl.TestProbe
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object MultiDcClusterSingletonSpecConfig extends MultiNodeConfig {
val first: RoleName = role("first")
val second = role("second")
val third = role("third")
commonConfig(
ConfigFactory.parseString(
"""
akka.loglevel = DEBUG
""").withFallback(
MultiNodeTypedClusterSpec.clusterConfig))
nodeConfig(first)(ConfigFactory.parseString(
"""
akka.cluster.multi-data-center.self-data-center = "dc1"
"""))
nodeConfig(second, third)(ConfigFactory.parseString(
"""
akka.cluster.multi-data-center.self-data-center = "dc2"
"""))
testTransport(on = true)
}
class MultiDcClusterSingletonMultiJvmNode1 extends MultiDcClusterSingletonSpec
class MultiDcClusterSingletonMultiJvmNode2 extends MultiDcClusterSingletonSpec
class MultiDcClusterSingletonMultiJvmNode3 extends MultiDcClusterSingletonSpec
abstract class MultiDcClusterSingletonSpec extends MultiNodeSpec(MultiDcClusterSingletonSpecConfig)
with MultiNodeTypedClusterSpec {
import MultiDcClusterActors._
import MultiDcClusterSingletonSpecConfig._
"A typed cluster with multiple data centers" must {
"be able to form" in {
runOn(first) {
cluster.manager ! Join(cluster.selfMember.address)
}
runOn(second, third) {
cluster.manager ! Join(first)
}
enterBarrier("form-cluster-join-attempt")
runOn(first, second, third) {
within(20.seconds) {
awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size 3)
}
}
enterBarrier("cluster started")
}
"be able to create and ping singleton in same DC" in {
runOn(first) {
val singleton = ClusterSingleton(typedSystem)
val pinger = singleton.spawn(
multiDcPinger,
"ping",
Props.empty,
ClusterSingletonSettings(typedSystem),
NoMore
)
val probe = TestProbe[Pong]()
pinger ! Ping(probe.ref)
probe.expectMessage(Pong("dc1"))
enterBarrier("singleton-up")
}
runOn(second, third) {
enterBarrier("singleton-up")
}
}
"be able to ping singleton via proxy in another dc" in {
runOn(second) {
val singleton = ClusterSingleton(system.toTyped)
val pinger = singleton.spawn(
multiDcPinger,
"ping",
Props.empty,
ClusterSingletonSettings(typedSystem).withDataCenter("dc1"),
NoMore
)
val probe = TestProbe[Pong]()
pinger ! Ping(probe.ref)
probe.expectMessage(Pong("dc1"))
}
enterBarrier("singleton-pinged")
}
"be able to target singleton with the same name in own dc " in {
runOn(second, third) {
val singleton = ClusterSingleton(typedSystem)
val pinger = singleton.spawn(
multiDcPinger,
"ping",
Props.empty,
ClusterSingletonSettings(typedSystem),
NoMore
)
val probe = TestProbe[Pong]()
pinger ! Ping(probe.ref)
probe.expectMessage(Pong("dc2"))
}
enterBarrier("singleton-pinged-own-dc")
}
}
}
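
The last test is the regression guard for this PR: node second has already created a dc1-pinned proxy for the name "ping", and then asks for a plain local one under the same name. In cache-key terms (a sketch echoing the composite key introduced in this commit):

// The two proxies created on node `second` map to distinct keys under the
// fix; before it, both were stored under just "ping".
val crossDcKey = ("ping", Option("dc1")) // settings.withDataCenter("dc1")
val ownDcKey = ("ping", Option.empty[String]) // plain ClusterSingletonSettings
assert(crossDcKey != ownDcKey)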

@@ -0,0 +1,114 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.typed
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import akka.actor.{ Address, Scheduler }
import akka.actor.typed.ActorSystem
import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeSpec, STMultiNodeSpec }
import akka.testkit.WatchedByCoroner
import org.scalatest.{ Matchers, Suite }
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.{ ClusterEvent, MemberStatus }
import akka.remote.testconductor.RoleName
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.duration._
import scala.language.implicitConversions
object MultiNodeTypedClusterSpec {
def clusterConfig: Config = ConfigFactory.parseString(
s"""
akka.actor.provider = cluster
akka.actor.warn-about-java-serializer-usage = off
akka.cluster {
jmx.enabled = off
gossip-interval = 200 ms
leader-actions-interval = 200 ms
unreachable-nodes-reaper-interval = 500 ms
periodic-tasks-initial-delay = 300 ms
publish-stats-interval = 0 s # always, when it happens
failure-detector.heartbeat-interval = 500 ms
run-coordinated-shutdown-when-down = off
}
akka.loglevel = INFO
akka.log-dead-letters = off
akka.log-dead-letters-during-shutdown = off
akka.remote {
log-remote-lifecycle-events = off
artery.advanced.flight-recorder {
enabled=on
destination=target/flight-recorder-${UUID.randomUUID().toString}.afr
}
}
akka.loggers = ["akka.testkit.TestEventListener"]
akka.test {
single-expect-default = 10 s
}
""")
}
trait MultiNodeTypedClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner with FlightRecordingSupport with Matchers {
self: MultiNodeSpec ⇒
override def initialParticipants: Int = roles.size
implicit def typedSystem: ActorSystem[Nothing] = system.toTyped
implicit def scheduler: Scheduler = system.scheduler
private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]
// TODO: Add support for typed to multi node test kit
def cluster: Cluster = Cluster(system.toTyped)
def clusterView: ClusterEvent.CurrentClusterState = cluster.state
override def expectedTestDuration: FiniteDuration = 60.seconds
/**
* Lookup the Address for the role.
*
* Implicit conversion from RoleName to Address.
*
* It is cached, which has the implication that stopping
* and then restarting a role (jvm) with another address is not
* supported.
*/
implicit def address(role: RoleName): Address = {
cachedAddresses.get(role) match {
case null ⇒
val address = node(role).address
cachedAddresses.put(role, address)
address
case address ⇒ address
}
}
def formCluster(first: RoleName, rest: RoleName*): Unit = {
runOn(first) {
cluster.manager ! Join(cluster.selfMember.address)
awaitAssert(cluster.state.members.exists { m ⇒
m.uniqueAddress == cluster.selfMember.uniqueAddress && m.status == MemberStatus.Up
} should be(true))
}
enterBarrier(first.name + "-joined")
rest foreach { node ⇒
runOn(node) {
cluster.manager ! Join(address(first))
awaitAssert(cluster.state.members.exists { m ⇒
m.uniqueAddress == cluster.selfMember.uniqueAddress && m.status == MemberStatus.Up
} should be(true))
}
}
enterBarrier("all-joined")
}
}

@@ -76,7 +76,7 @@ object MultiNodeClusterSpec {
class EndActor(testActor: ActorRef, target: Option[Address]) extends Actor {
import EndActor._
- def receive = {
+ def receive: Receive = {
case SendEnd ⇒
target foreach { t ⇒
context.actorSelection(RootActorPath(t) / self.path.elements) ! End

@@ -414,26 +414,32 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed")
persistenceTyped % "test->test",
protobuf,
typedTestkit % "test->test",
actorTypedTests % "test->test"
actorTypedTests % "test->test",
remoteTests % "test->test"
)
.settings(AkkaBuild.mayChangeSettings)
.settings(AutomaticModuleName.settings("akka.cluster.typed"))
.disablePlugins(MimaPlugin)
+ .configs(MultiJvm)
+ .enablePlugins(MultiNodeScalaTest)
lazy val clusterShardingTyped = akkaModule("akka-cluster-sharding-typed")
.dependsOn(
- clusterTyped,
+ clusterTyped % "compile->compile;test->test;multi-jvm->multi-jvm",
persistenceTyped,
clusterSharding,
typedTestkit % "test->test",
actorTypedTests % "test->test",
persistenceTyped % "test->test"
persistenceTyped % "test->test",
remoteTests % "test->test"
)
.settings(AkkaBuild.mayChangeSettings)
.settings(AutomaticModuleName.settings("akka.cluster.sharding.typed"))
// To be able to import ContainerFormats.proto
.settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf" ))
.disablePlugins(MimaPlugin)
+ .configs(MultiJvm)
+ .enablePlugins(MultiNodeScalaTest)
lazy val streamTyped = akkaModule("akka-stream-typed")
.dependsOn(