Multi node test hardening for Sharding #27749 (#28631)

Helena Edelson, 2020-02-27 12:05:55 -08:00, committed by GitHub
parent 8ba9fda183
commit 6fe2f66adc
GPG signature: no known key found for this signature in database (key ID: 4AEE18F83AFDEB23)
21 changed files with 615 additions and 1444 deletions
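Reviewer note: the change repeated across every file below is mechanical — per-spec journal setup, LMDB cleanup, join/startSharding helpers, and config blocks move into shared MultiNodeClusterShardingConfig / MultiNodeClusterShardingSpec base classes. Those base classes are not part of this excerpt, so here is a minimal sketch of their apparent shape, reconstructed purely from the call sites in the diff: the type and member names come from the diff, but every signature, default value, and body below is an assumption.

    package akka.cluster.sharding

    import scala.concurrent.duration._

    import akka.actor.{ Actor, ActorIdentity, ActorRef, ActorSystem, Identify, PoisonPill, Props }
    import akka.cluster.MultiNodeClusterSpec
    import akka.persistence.Persistence
    import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
    import akka.remote.testconductor.RoleName
    import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
    import akka.testkit.ImplicitSender
    import com.typesafe.config.ConfigFactory

    object MultiNodeClusterShardingSpec {
      // Stand-ins for the per-spec Entity / ShardedActor classes the diff deletes.
      object ShardedEntity {
        case object Stop // used as handOffStopMessage
      }
      class ShardedEntity extends Actor {
        def receive = {
          case id: Int            => sender() ! id
          case ShardedEntity.Stop => context.stop(self)
        }
      }

      object PingPongActor {
        final case class Ping(id: Long)
        case object Pong
      }
      class PingPongActor extends Actor {
        import PingPongActor._
        def receive = { case _: Ping => sender() ! Pong }
      }

      // Shared Int-keyed extractors replacing the copies deleted from each spec.
      val intExtractEntityId: ShardRegion.ExtractEntityId = { case id: Int => (id.toString, id) }
      val intExtractShardId: ShardRegion.ExtractShardId = { case id: Int => id.toString }
    }

    // Centralizes state-store-mode plus the cluster test config each spec repeated.
    abstract class MultiNodeClusterShardingConfig(
        val mode: String = ClusterShardingSettings.StateStoreModeDData, // default mode is an assumption
        additionalConfig: String = "")
        extends MultiNodeConfig {
      commonConfig(
        ConfigFactory
          .parseString(additionalConfig)
          .withFallback(ConfigFactory.parseString(s"""akka.cluster.sharding.state-store-mode = "$mode" """))
          .withFallback(MultiNodeClusterSpec.clusterConfig))
    }

    abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterShardingConfig)
        extends MultiNodeSpec(config)
        with MultiNodeClusterSpec
        with STMultiNodeSpec
        with ImplicitSender {

      import MultiNodeClusterShardingSpec._
      import config.mode

      lazy val settings: ClusterShardingSettings = ClusterShardingSettings(system)

      // join(from, to, onJoinedRunOnFrom, assertNodeUp) is sketched after the
      // ClusterShardingLeavingSpec section below; startProxy wraps
      // ClusterSharding.startProxy the same way startSharding does here.

      // Wraps ClusterSharding.start with the defaults the new call sites rely on.
      def startSharding(
          sys: ActorSystem,
          typeName: String,
          entityProps: Props = Props[ShardedEntity],
          settings: ClusterShardingSettings = ClusterShardingSettings(system),
          extractEntityId: ShardRegion.ExtractEntityId = intExtractEntityId,
          extractShardId: ShardRegion.ExtractShardId = intExtractShardId,
          allocationStrategy: ShardCoordinator.ShardAllocationStrategy =
            new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1),
          handOffStopMessage: Any = PoisonPill): ActorRef =
        ClusterSharding(sys).start(
          typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)

      // Replaces the "setup shared journal" block each persistence-mode spec carried.
      def startPersistenceIfNotDdataMode(startOn: RoleName, setStoreOn: Seq[RoleName]): Unit =
        if (mode == ClusterShardingSettings.StateStoreModePersistence) {
          Persistence(system) // start the Persistence extension
          runOn(startOn) {
            system.actorOf(Props[SharedLeveldbStore], "store")
          }
          enterBarrier("persistence-started")
          runOn(setStoreOn: _*) {
            system.actorSelection(node(startOn) / "user" / "store") ! Identify(None)
            val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get
            SharedLeveldbJournal.setStore(sharedStore, system)
          }
          enterBarrier("persistence-set")
        }
    }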


@@ -4,39 +4,18 @@
 package akka.cluster.sharding
-import scala.collection.immutable
-import scala.concurrent.duration._
-import com.typesafe.config.ConfigFactory
 import akka.actor._
-import akka.cluster.{ Cluster, MultiNodeClusterSpec }
-import akka.persistence.Persistence
-import akka.persistence.journal.leveldb.SharedLeveldbJournal
-import akka.persistence.journal.leveldb.SharedLeveldbStore
-import akka.remote.testconductor.RoleName
-import akka.remote.testkit.MultiNodeConfig
-import akka.remote.testkit.MultiNodeSpec
-import akka.remote.testkit.STMultiNodeSpec
-import akka.testkit._
 import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
-import scala.concurrent.Future
-import akka.util.Timeout
 import akka.pattern.ask
+import akka.remote.testconductor.RoleName
+import akka.testkit._
+import akka.util.Timeout
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
 object ClusterShardingCustomShardAllocationSpec {
-  class Entity extends Actor {
-    def receive = {
-      case id: Int => sender() ! id
-    }
-  }
-  val extractEntityId: ShardRegion.ExtractEntityId = {
-    case id: Int => (id.toString, id)
-  }
-  val extractShardId: ShardRegion.ExtractShardId = {
-    case id: Int => id.toString
-  }
   case object AllocateReq
   case class UseRegion(region: ActorRef)
@@ -81,37 +60,23 @@ object ClusterShardingCustomShardAllocationSpec {
 }
-abstract class ClusterShardingCustomShardAllocationSpecConfig(val mode: String) extends MultiNodeConfig {
+abstract class ClusterShardingCustomShardAllocationSpecConfig(mode: String)
+    extends MultiNodeClusterShardingConfig(
+      mode,
+      additionalConfig = s"""
+        akka.cluster.sharding.rebalance-interval = 1 s
+        akka.persistence.journal.leveldb-shared.store.native = off
+        """) {
   val first = role("first")
   val second = role("second")
-  commonConfig(
-    ConfigFactory
-      .parseString(s"""
-        akka.actor.provider = "cluster"
-        akka.remote.log-remote-lifecycle-events = off
-        akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
-        akka.persistence.journal.leveldb-shared {
-          timeout = 5s
-          store {
-            native = off
-            dir = "target/ClusterShardingCustomShardAllocationSpec/journal"
-          }
-        }
-        akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-        akka.persistence.snapshot-store.local.dir = "target/ClusterShardingCustomShardAllocationSpec/snapshots"
-        akka.cluster.sharding.state-store-mode = "$mode"
-        akka.cluster.sharding.rebalance-interval = 1 s
-        #akka.cluster.sharding.retry-interval = 5 s
-        """)
-      .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest)
-      .withFallback(MultiNodeClusterSpec.clusterConfig))
 }
 object PersistentClusterShardingCustomShardAllocationSpecConfig
-    extends ClusterShardingCustomShardAllocationSpecConfig("persistence")
+    extends ClusterShardingCustomShardAllocationSpecConfig(ClusterShardingSettings.StateStoreModePersistence)
 object DDataClusterShardingCustomShardAllocationSpecConfig
-    extends ClusterShardingCustomShardAllocationSpecConfig("ddata")
+    extends ClusterShardingCustomShardAllocationSpecConfig(ClusterShardingSettings.StateStoreModeDData)
 class PersistentClusterShardingCustomShardAllocationSpec
     extends ClusterShardingCustomShardAllocationSpec(PersistentClusterShardingCustomShardAllocationSpecConfig)
@@ -126,62 +91,35 @@ class PersistentClusterShardingCustomShardAllocationMultiJvmNode2
 class DDataClusterShardingCustomShardAllocationMultiJvmNode1 extends DDataClusterShardingCustomShardAllocationSpec
 class DDataClusterShardingCustomShardAllocationMultiJvmNode2 extends DDataClusterShardingCustomShardAllocationSpec
-abstract class ClusterShardingCustomShardAllocationSpec(config: ClusterShardingCustomShardAllocationSpecConfig)
-    extends MultiNodeSpec(config)
-    with STMultiNodeSpec
+abstract class ClusterShardingCustomShardAllocationSpec(multiNodeConfig: ClusterShardingCustomShardAllocationSpecConfig)
+    extends MultiNodeClusterShardingSpec(multiNodeConfig)
     with ImplicitSender {
-  import ClusterShardingCustomShardAllocationSpec._
-  import config._
-  override def initialParticipants = roles.size
+  import ClusterShardingCustomShardAllocationSpec._
+  import multiNodeConfig._
   def join(from: RoleName, to: RoleName): Unit = {
-    runOn(from) {
-      Cluster(system).join(node(to).address)
-      startSharding()
-    }
-    enterBarrier(from.name + "-joined")
-  }
-  def startSharding(): Unit = {
-    ClusterSharding(system).start(
-      typeName = "Entity",
-      entityProps = Props[Entity],
-      settings = ClusterShardingSettings(system),
-      extractEntityId = extractEntityId,
-      extractShardId = extractShardId,
-      allocationStrategy = TestAllocationStrategy(allocator),
-      handOffStopMessage = PoisonPill)
+    join(
+      from,
+      to,
+      startSharding(
+        system,
+        typeName = "Entity",
+        entityProps = TestActors.echoActorProps,
+        extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId,
+        extractShardId = MultiNodeClusterShardingSpec.intExtractShardId,
+        allocationStrategy = TestAllocationStrategy(allocator)))
   }
   lazy val region = ClusterSharding(system).shardRegion("Entity")
   lazy val allocator = system.actorOf(Props[Allocator], "allocator")
-  def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
   s"Cluster sharding ($mode) with custom allocation strategy" must {
-    if (!isDdataMode) {
-      "setup shared journal" in {
-        // start the Persistence extension
-        Persistence(system)
-        runOn(first) {
-          system.actorOf(Props[SharedLeveldbStore], "store")
-        }
-        enterBarrier("persistence-started")
-        runOn(first, second) {
-          system.actorSelection(node(first) / "user" / "store") ! Identify(None)
-          val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get
-          SharedLeveldbJournal.setStore(sharedStore, system)
-        }
-        enterBarrier("after-1")
-      }
-    }
     "use specified region" in within(30.seconds) {
+      startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second))
       join(first, first)
       runOn(first) {
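Note on TestAllocationStrategy(allocator): it is defined in an unchanged part of this spec, so it does not appear in the hunks above. For orientation, a custom strategy implements the two-method ShardCoordinator.ShardAllocationStrategy contract; the trait and method signatures below are the real classic-sharding API, while the trivial strategy itself is only an illustration, not the spec's own:

    import scala.collection.immutable
    import scala.concurrent.Future

    import akka.actor.ActorRef
    import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
    import akka.cluster.sharding.ShardRegion.ShardId

    // Allocates every new shard to the first known region and never rebalances.
    // Assumes at least one region has registered before a shard is allocated.
    class FirstRegionAllocationStrategy extends ShardAllocationStrategy {
      override def allocateShard(
          requester: ActorRef,
          shardId: ShardId,
          currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] =
        Future.successful(currentShardAllocations.keys.head)

      override def rebalance(
          currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
          rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] =
        Future.successful(Set.empty)
    }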


@@ -4,27 +4,16 @@
 package akka.cluster.sharding
-import java.io.File
-import akka.cluster.sharding.ShardRegion.Passivate
-import scala.concurrent.duration._
-import org.apache.commons.io.FileUtils
-import com.typesafe.config.ConfigFactory
 import akka.actor._
-import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec }
-import akka.persistence.Persistence
-import akka.persistence.journal.leveldb.SharedLeveldbJournal
-import akka.persistence.journal.leveldb.SharedLeveldbStore
+import akka.cluster.sharding.ShardRegion.Passivate
 import akka.remote.testconductor.RoleName
-import akka.remote.testkit.MultiNodeConfig
-import akka.remote.testkit.MultiNodeSpec
-import akka.remote.testkit.STMultiNodeSpec
 import akka.remote.transport.ThrottlerTransportAdapter.Direction
 import akka.serialization.jackson.CborSerializable
 import akka.testkit._
 import akka.util.ccompat._
+import scala.concurrent.duration._
 @ccompatUsedUntil213
 object ClusterShardingFailureSpec {
   case class Get(id: String) extends CborSerializable
@@ -49,56 +38,36 @@ object ClusterShardingFailureSpec {
     case Get(id)    => id.charAt(0).toString
     case Add(id, _) => id.charAt(0).toString
   }
 }
-abstract class ClusterShardingFailureSpecConfig(val mode: String) extends MultiNodeConfig {
-  val controller = role("controller")
-  val first = role("first")
-  val second = role("second")
-  commonConfig(
-    ConfigFactory
-      .parseString(s"""
-        akka.loglevel = INFO
-        akka.actor.provider = "cluster"
-        akka.remote.classic.log-remote-lifecycle-events = off
-        akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
-        akka.cluster.testkit.auto-down-unreachable-after = 0s
+abstract class ClusterShardingFailureSpecConfig(override val mode: String)
+    extends MultiNodeClusterShardingConfig(
+      mode,
+      additionalConfig = s"""
         akka.cluster.roles = ["backend"]
-        akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
-        akka.persistence.journal.leveldb-shared {
-          timeout = 5s
-          store {
-            native = off
-            dir = "target/ClusterShardingFailureSpec/journal"
-          }
-        }
-        akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-        akka.persistence.snapshot-store.local.dir = "target/ClusterShardingFailureSpec/snapshots"
         akka.cluster.sharding {
           coordinator-failure-backoff = 3s
           shard-failure-backoff = 3s
-          state-store-mode = "$mode"
-        }
-        akka.cluster.sharding.distributed-data.durable.lmdb {
-          dir = target/ClusterShardingFailureSpec/sharding-ddata
-          map-size = 10 MiB
         }
+        akka.persistence.journal.leveldb-shared.store.native = off
         # using Java serialization for these messages because test is sending them
         # to other nodes, which isn't normal usage.
         akka.actor.serialization-bindings {
           "${classOf[ShardRegion.Passivate].getName}" = java-test
         }
-        """)
-      .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest)
-      .withFallback(MultiNodeClusterSpec.clusterConfig))
+        """) {
+  val controller = role("controller")
+  val first = role("first")
+  val second = role("second")
   testTransport(on = true)
 }
-object PersistentClusterShardingFailureSpecConfig extends ClusterShardingFailureSpecConfig("persistence")
-object DDataClusterShardingFailureSpecConfig extends ClusterShardingFailureSpecConfig("ddata")
+object PersistentClusterShardingFailureSpecConfig
+    extends ClusterShardingFailureSpecConfig(ClusterShardingSettings.StateStoreModePersistence)
+object DDataClusterShardingFailureSpecConfig
+    extends ClusterShardingFailureSpecConfig(ClusterShardingSettings.StateStoreModeDData)
 class PersistentClusterShardingFailureSpec
     extends ClusterShardingFailureSpec(PersistentClusterShardingFailureSpecConfig)
@@ -112,79 +81,31 @@ class DDataClusterShardingFailureMultiJvmNode1 extends DDataClusterShardingFailureSpec
 class DDataClusterShardingFailureMultiJvmNode2 extends DDataClusterShardingFailureSpec
 class DDataClusterShardingFailureMultiJvmNode3 extends DDataClusterShardingFailureSpec
-abstract class ClusterShardingFailureSpec(config: ClusterShardingFailureSpecConfig)
-    extends MultiNodeSpec(config)
-    with STMultiNodeSpec
+abstract class ClusterShardingFailureSpec(multiNodeConfig: ClusterShardingFailureSpecConfig)
+    extends MultiNodeClusterShardingSpec(multiNodeConfig)
     with ImplicitSender {
   import ClusterShardingFailureSpec._
-  import config._
-  override def initialParticipants = roles.size
-  val storageLocations = List(
-    new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)
-  override protected def atStartup(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-    enterBarrier("startup")
-  }
-  override protected def afterTermination(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-  }
-  val cluster = Cluster(system)
+  import multiNodeConfig._
   def join(from: RoleName, to: RoleName): Unit = {
-    runOn(from) {
-      cluster.join(node(to).address)
-      startSharding()
-      within(remaining) {
-        awaitAssert {
-          cluster.state.members.unsorted.map(_.uniqueAddress) should contain(cluster.selfUniqueAddress)
-          cluster.state.members.unsorted.map(_.status) should ===(Set(MemberStatus.Up))
-        }
-      }
-    }
-    enterBarrier(from.name + "-joined")
-  }
-  def startSharding(): Unit = {
-    ClusterSharding(system).start(
-      typeName = "Entity",
-      entityProps = Props[Entity],
-      settings = ClusterShardingSettings(system).withRememberEntities(true),
-      extractEntityId = extractEntityId,
-      extractShardId = extractShardId)
+    join(
+      from,
+      to,
+      startSharding(
+        system,
+        typeName = "Entity",
+        entityProps = Props[Entity],
+        extractEntityId = extractEntityId,
+        extractShardId = extractShardId))
   }
   lazy val region = ClusterSharding(system).shardRegion("Entity")
-  def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
   s"Cluster sharding ($mode) with flaky journal/network" must {
-    if (!isDdataMode) {
-      "setup shared journal" in {
-        // start the Persistence extension
-        Persistence(system)
-        runOn(controller) {
-          system.actorOf(Props[SharedLeveldbStore], "store")
-        }
-        enterBarrier("persistence-started")
-        runOn(first, second) {
-          system.actorSelection(node(controller) / "user" / "store") ! Identify(None)
-          val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get
-          SharedLeveldbJournal.setStore(sharedStore, system)
-        }
-        enterBarrier("after-1")
-      }
-    }
     "join cluster" in within(20.seconds) {
+      startPersistenceIfNotDdataMode(startOn = controller, setStoreOn = Seq(first, second))
      join(first, first)
      join(second, first)
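The "flaky network" phases of this spec drive the testTransport(on = true) switch in the config above through the TestConductor. A fragment of the real conductor API, for orientation (role names as in the config; the surrounding test body is elided):

    runOn(controller) {
      // drop all traffic between first and second, in both directions
      testConductor.blackhole(first, second, Direction.Both).await
    }
    // ... exercise the shard region while the coordinator is unreachable ...
    runOn(controller) {
      // heal the network again
      testConductor.passThrough(first, second, Direction.Both).await
    }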


@@ -4,64 +4,40 @@
 package akka.cluster.sharding
-import akka.actor._
-import akka.cluster.{ Cluster, MultiNodeClusterSpec }
-import akka.cluster.ClusterEvent.CurrentClusterState
-import akka.remote.testconductor.RoleName
-import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
 import scala.concurrent.duration._
-import akka.serialization.jackson.CborSerializable
+import akka.actor._
+import akka.cluster.Cluster
+import akka.cluster.ClusterEvent.CurrentClusterState
+import akka.testkit.TestProbe
+import com.typesafe.config.ConfigFactory
 object ClusterShardingGetStateSpec {
-  case object Stop extends CborSerializable
-  case class Ping(id: Long) extends CborSerializable
-  case object Pong extends CborSerializable
-  class ShardedActor extends Actor with ActorLogging {
-    log.info(self.path.toString)
-    def receive = {
-      case Stop    => context.stop(self)
-      case _: Ping => sender() ! Pong
-    }
-  }
+  import MultiNodeClusterShardingSpec.PingPongActor
   val extractEntityId: ShardRegion.ExtractEntityId = {
-    case msg @ Ping(id) => (id.toString, msg)
+    case msg @ PingPongActor.Ping(id) => (id.toString, msg)
   }
   val numberOfShards = 2
   val extractShardId: ShardRegion.ExtractShardId = {
-    case Ping(id) => (id % numberOfShards).toString
+    case PingPongActor.Ping(id) => (id % numberOfShards).toString
   }
   val shardTypeName = "Ping"
 }
-object ClusterShardingGetStateSpecConfig extends MultiNodeConfig {
-  val controller = role("controller")
-  val first = role("first")
-  val second = role("second")
-  commonConfig(ConfigFactory.parseString(s"""
-    akka.loglevel = INFO
-    akka.actor.provider = "cluster"
-    akka.remote.log-remote-lifecycle-events = off
-    akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
-    akka.cluster.testkit.auto-down-unreachable-after = 0s
+object ClusterShardingGetStateSpecConfig extends MultiNodeClusterShardingConfig(additionalConfig = s"""
     akka.cluster.sharding {
       coordinator-failure-backoff = 3s
       shard-failure-backoff = 3s
-      state-store-mode = "ddata"
     }
-    akka.cluster.sharding.distributed-data.durable.lmdb {
-      dir = target/ClusterShardingGetStateSpec/sharding-ddata
-      map-size = 10 MiB
-    }
-    """).withFallback(MultiNodeClusterSpec.clusterConfig))
+    """) {
+  val controller = role("controller")
+  val first = role("first")
+  val second = role("second")
   nodeConfig(first, second)(ConfigFactory.parseString("""akka.cluster.roles=["shard"]"""))
@@ -71,45 +47,18 @@ class ClusterShardingGetStateSpecMultiJvmNode1 extends ClusterShardingGetStateSpec
 class ClusterShardingGetStateSpecMultiJvmNode2 extends ClusterShardingGetStateSpec
 class ClusterShardingGetStateSpecMultiJvmNode3 extends ClusterShardingGetStateSpec
-abstract class ClusterShardingGetStateSpec
-    extends MultiNodeSpec(ClusterShardingGetStateSpecConfig)
-    with STMultiNodeSpec {
+abstract class ClusterShardingGetStateSpec extends MultiNodeClusterShardingSpec(ClusterShardingGetStateSpecConfig) {
   import ClusterShardingGetStateSpec._
   import ClusterShardingGetStateSpecConfig._
-  def initialParticipants = roles.size
-  def startShard(): ActorRef = {
-    ClusterSharding(system).start(
-      typeName = shardTypeName,
-      entityProps = Props(new ShardedActor),
-      settings = ClusterShardingSettings(system).withRole("shard"),
-      extractEntityId = extractEntityId,
-      extractShardId = extractShardId)
-  }
-  def startProxy(): ActorRef = {
-    ClusterSharding(system).startProxy(
-      typeName = shardTypeName,
-      role = Some("shard"),
-      extractEntityId = extractEntityId,
-      extractShardId = extractShardId)
-  }
-  def join(from: RoleName): Unit = {
-    runOn(from) {
-      Cluster(system).join(node(controller).address)
-    }
-    enterBarrier(from.name + "-joined")
-  }
+  import MultiNodeClusterShardingSpec.PingPongActor
   "Inspecting cluster sharding state" must {
     "join cluster" in {
-      join(controller)
-      join(first)
-      join(second)
+      join(controller, controller)
+      join(first, controller)
+      join(second, controller)
       // make sure all nodes has joined
       awaitAssert {
@@ -118,10 +67,22 @@ abstract class ClusterShardingGetStateSpec
       }
       runOn(controller) {
-        startProxy()
+        startProxy(
+          system,
+          typeName = shardTypeName,
+          role = Some("shard"),
+          extractEntityId = extractEntityId,
+          extractShardId = extractShardId)
       }
       runOn(first, second) {
-        startShard()
+        startSharding(
+          system,
+          typeName = shardTypeName,
+          entityProps = Props(new PingPongActor),
+          settings = settings.withRole("shard"),
+          extractEntityId = extractEntityId,
+          extractShardId = extractShardId)
       }
       enterBarrier("sharding started")
@@ -147,9 +108,9 @@ abstract class ClusterShardingGetStateSpec
       awaitAssert {
         val pingProbe = TestProbe()
         // trigger starting of 4 entities
-        (1 to 4).foreach(n => region.tell(Ping(n), pingProbe.ref))
+        (1 to 4).foreach(n => region.tell(PingPongActor.Ping(n), pingProbe.ref))
         pingProbe.receiveWhile(messages = 4) {
-          case Pong => ()
+          case PingPongActor.Pong => ()
         }
       }
     }
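The assertions elided from the hunks above inspect the running region with the ShardRegion query protocol. GetShardRegionState and CurrentShardRegionState are the real messages; the probe wiring is illustrative:

    import akka.cluster.sharding.ShardRegion
    import akka.testkit.TestProbe

    val probe = TestProbe()
    // ask the local region which shards and entities it currently hosts
    region.tell(ShardRegion.GetShardRegionState, probe.ref)
    val state = probe.expectMsgType[ShardRegion.CurrentShardRegionState]
    state.shards.foreach { shard =>
      println(s"shard ${shard.shardId} hosts ${shard.entityIds.size} entities")
    }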


@@ -7,68 +7,37 @@ package akka.cluster.sharding
 import scala.concurrent.duration._
 import akka.actor._
-import akka.cluster.Cluster
-import akka.cluster.MemberStatus
-import akka.cluster.MultiNodeClusterSpec
-import akka.remote.testconductor.RoleName
-import akka.remote.testkit.MultiNodeConfig
-import akka.remote.testkit.MultiNodeSpec
-import akka.remote.testkit.STMultiNodeSpec
-import akka.serialization.jackson.CborSerializable
-import akka.testkit.TestDuration
-import akka.testkit.TestProbe
+import akka.cluster.{ Cluster, MemberStatus }
+import akka.testkit.{ TestDuration, TestProbe }
 import com.typesafe.config.ConfigFactory
 object ClusterShardingGetStatsSpec {
-  case object Stop extends CborSerializable
-  case class Ping(id: Long) extends CborSerializable
-  case object Pong extends CborSerializable
-  class ShardedActor extends Actor with ActorLogging {
-    log.info(s"entity started {}", self.path)
-    def receive = {
-      case Stop    => context.stop(self)
-      case _: Ping => sender() ! Pong
-    }
-  }
-  val extractEntityId: ShardRegion.ExtractEntityId = {
-    case msg @ Ping(id) => (id.toString, msg)
-  }
+  import MultiNodeClusterShardingSpec.PingPongActor
+  val shardTypeName = "Ping"
   val numberOfShards = 3
-  val extractShardId: ShardRegion.ExtractShardId = {
-    case Ping(id) => (id % numberOfShards).toString
+  val extractEntityId: ShardRegion.ExtractEntityId = {
+    case msg @ PingPongActor.Ping(id) => (id.toString, msg)
+  }
+  val extractShardId: ShardRegion.ExtractShardId = {
+    case PingPongActor.Ping(id) => (id % numberOfShards).toString
   }
-  val shardTypeName = "Ping"
 }
-object ClusterShardingGetStatsSpecConfig extends MultiNodeConfig {
+object ClusterShardingGetStatsSpecConfig
+    extends MultiNodeClusterShardingConfig(additionalConfig = """
+      akka.log-dead-letters-during-shutdown = off
+      akka.cluster.sharding.updating-state-timeout = 2s
+      akka.cluster.sharding.waiting-for-state-timeout = 2s
+      """) {
   val controller = role("controller")
   val first = role("first")
   val second = role("second")
   val third = role("third")
-  commonConfig(ConfigFactory.parseString("""
-    akka.loglevel = INFO
-    akka.actor.provider = "cluster"
-    akka.remote.classic.log-remote-lifecycle-events = off
-    akka.log-dead-letters-during-shutdown = off
-    akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
-    akka.cluster.testkit.auto-down-unreachable-after = 0s
-    akka.cluster.sharding {
-      state-store-mode = "ddata"
-      updating-state-timeout = 2s
-      waiting-for-state-timeout = 2s
-    }
-    akka.cluster.sharding.distributed-data.durable.lmdb {
-      dir = target/ClusterShardingGetStatsSpec/sharding-ddata
-      map-size = 10 MiB
-    }
-    """).withFallback(MultiNodeClusterSpec.clusterConfig))
   nodeConfig(first, second, third)(ConfigFactory.parseString("""akka.cluster.roles=["shard"]"""))
 }
@@ -78,48 +47,30 @@ class ClusterShardingGetStatsSpecMultiJvmNode2 extends ClusterShardingGetStatsSpec
 class ClusterShardingGetStatsSpecMultiJvmNode3 extends ClusterShardingGetStatsSpec
 class ClusterShardingGetStatsSpecMultiJvmNode4 extends ClusterShardingGetStatsSpec
-abstract class ClusterShardingGetStatsSpec
-    extends MultiNodeSpec(ClusterShardingGetStatsSpecConfig)
-    with STMultiNodeSpec {
+abstract class ClusterShardingGetStatsSpec extends MultiNodeClusterShardingSpec(ClusterShardingGetStatsSpecConfig) {
   import ClusterShardingGetStatsSpec._
   import ClusterShardingGetStatsSpecConfig._
-  def initialParticipants = roles.size
+  import MultiNodeClusterShardingSpec.PingPongActor
   def startShard(): ActorRef = {
-    ClusterSharding(system).start(
+    startSharding(
+      system,
       typeName = shardTypeName,
-      entityProps = Props(new ShardedActor),
-      settings = ClusterShardingSettings(system).withRole("shard"),
+      entityProps = Props(new PingPongActor),
+      settings = settings.withRole("shard"),
       extractEntityId = extractEntityId,
       extractShardId = extractShardId)
   }
-  def startProxy(): ActorRef = {
-    ClusterSharding(system).startProxy(
-      typeName = shardTypeName,
-      role = Some("shard"),
-      extractEntityId = extractEntityId,
-      extractShardId = extractShardId)
-  }
-  def join(from: RoleName): Unit = {
-    runOn(from) {
-      Cluster(system).join(node(controller).address)
-    }
-    enterBarrier(from.name + "-joined")
-  }
   lazy val region = ClusterSharding(system).shardRegion(shardTypeName)
   "Inspecting cluster sharding state" must {
     "join cluster" in {
-      join(controller)
-      join(first)
-      join(second)
-      join(third)
+      Seq(controller, first, second, third).foreach { node =>
+        join(from = node, to = controller)
+      }
       // make sure all nodes are up
       within(10.seconds) {
@@ -129,7 +80,12 @@ abstract class ClusterShardingGetStatsSpec
       }
       runOn(controller) {
-        startProxy()
+        startProxy(
+          system,
+          typeName = shardTypeName,
+          role = Some("shard"),
+          extractEntityId = extractEntityId,
+          extractShardId = extractShardId)
       }
       runOn(first, second, third) {
         startShard()
@@ -162,9 +118,9 @@ abstract class ClusterShardingGetStatsSpec
         val pingProbe = TestProbe()
         // trigger starting of 2 entities on first and second node
         // but leave third node without entities
-        List(1, 2, 4, 6).foreach(n => region.tell(Ping(n), pingProbe.ref))
+        List(1, 2, 4, 6).foreach(n => region.tell(PingPongActor.Ping(n), pingProbe.ref))
         pingProbe.receiveWhile(messages = 4) {
-          case Pong => ()
+          case PingPongActor.Pong => ()
         }
       }
     }
@@ -209,9 +165,9 @@ abstract class ClusterShardingGetStatsSpec
       awaitAssert {
         val pingProbe = TestProbe()
         // make sure we have the 4 entities still alive across the fewer nodes
-        List(1, 2, 4, 6).foreach(n => region.tell(Ping(n), pingProbe.ref))
+        List(1, 2, 4, 6).foreach(n => region.tell(PingPongActor.Ping(n), pingProbe.ref))
         pingProbe.receiveWhile(messages = 4) {
-          case Pong => ()
+          case PingPongActor.Pong => ()
         }
       }
     }
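The stats assertions in this spec go through GetClusterShardingStats, which aggregates one ShardRegionStats per region across the cluster. The message protocol is the real API; the printing is illustrative:

    import scala.concurrent.duration._

    import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, GetClusterShardingStats }
    import akka.testkit.TestProbe

    val probe = TestProbe()
    // collect per-region shard -> entity-count maps, with a 10s aggregation timeout
    region.tell(GetClusterShardingStats(10.seconds), probe.ref)
    val stats = probe.expectMsgType[ClusterShardingStats]
    stats.regions.foreach {
      case (address, regionStats) => println(s"$address -> ${regionStats.stats}")
    }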


@@ -4,77 +4,25 @@
 package akka.cluster.sharding
-import scala.concurrent.duration._
-import java.io.File
 import akka.actor._
-import akka.cluster.{ Cluster, MultiNodeClusterSpec }
 import akka.cluster.sharding.ShardRegion.GracefulShutdown
-import akka.persistence.Persistence
-import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
 import akka.remote.testconductor.RoleName
-import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
 import akka.testkit._
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
 import scala.concurrent.duration._
-object ClusterShardingGracefulShutdownSpec {
-  case object StopEntity
-  class Entity extends Actor {
-    def receive = {
-      case id: Int => sender() ! id
-      case StopEntity =>
-        context.stop(self)
-    }
-  }
-  val extractEntityId: ShardRegion.ExtractEntityId = {
-    case id: Int => (id.toString, id)
-  }
-  val extractShardId: ShardRegion.ExtractShardId = msg =>
-    msg match {
-      case id: Int => id.toString
-    }
-}
-abstract class ClusterShardingGracefulShutdownSpecConfig(val mode: String) extends MultiNodeConfig {
+abstract class ClusterShardingGracefulShutdownSpecConfig(mode: String)
+    extends MultiNodeClusterShardingConfig(
+      mode,
+      additionalConfig = "akka.persistence.journal.leveldb-shared.store.native = off") {
   val first = role("first")
   val second = role("second")
-  commonConfig(
-    ConfigFactory
-      .parseString(s"""
-        akka.loglevel = INFO
-        akka.actor.provider = "cluster"
-        akka.remote.log-remote-lifecycle-events = off
-        akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
-        akka.persistence.journal.leveldb-shared {
-          timeout = 5s
-          store {
-            native = off
-            dir = "target/ClusterShardingGracefulShutdownSpec/journal"
-          }
-        }
-        akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-        akka.persistence.snapshot-store.local.dir = "target/ClusterShardingGracefulShutdownSpec/snapshots"
-        akka.cluster.sharding.state-store-mode = "$mode"
-        akka.cluster.sharding.distributed-data.durable.lmdb {
-          dir = target/ClusterShardingGracefulShutdownSpec/sharding-ddata
-          map-size = 10 MiB
-        }
-        """)
-      .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest)
-      .withFallback(MultiNodeClusterSpec.clusterConfig))
 }
 object PersistentClusterShardingGracefulShutdownSpecConfig
-    extends ClusterShardingGracefulShutdownSpecConfig("persistence")
-object DDataClusterShardingGracefulShutdownSpecConfig extends ClusterShardingGracefulShutdownSpecConfig("ddata")
+    extends ClusterShardingGracefulShutdownSpecConfig(ClusterShardingSettings.StateStoreModePersistence)
+object DDataClusterShardingGracefulShutdownSpecConfig
+    extends ClusterShardingGracefulShutdownSpecConfig(ClusterShardingSettings.StateStoreModeDData)
 class PersistentClusterShardingGracefulShutdownSpec
     extends ClusterShardingGracefulShutdownSpec(PersistentClusterShardingGracefulShutdownSpecConfig)
@@ -87,76 +35,43 @@ class PersistentClusterShardingGracefulShutdownMultiJvmNode2 extends PersistentClusterShardingGracefulShutdownSpec
 class DDataClusterShardingGracefulShutdownMultiJvmNode1 extends DDataClusterShardingGracefulShutdownSpec
 class DDataClusterShardingGracefulShutdownMultiJvmNode2 extends DDataClusterShardingGracefulShutdownSpec
-abstract class ClusterShardingGracefulShutdownSpec(config: ClusterShardingGracefulShutdownSpecConfig)
-    extends MultiNodeSpec(config)
-    with STMultiNodeSpec
+abstract class ClusterShardingGracefulShutdownSpec(multiNodeConfig: ClusterShardingGracefulShutdownSpecConfig)
+    extends MultiNodeClusterShardingSpec(multiNodeConfig)
     with ImplicitSender {
-  import ClusterShardingGracefulShutdownSpec._
-  import config._
-  override def initialParticipants = roles.size
-  val storageLocations = List(
-    new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)
-  override protected def atStartup(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-    enterBarrier("startup")
-  }
-  override protected def afterTermination(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-  }
-  def join(from: RoleName, to: RoleName): Unit = {
+  import MultiNodeClusterShardingSpec.ShardedEntity
+  import multiNodeConfig._
+  private val typeName = "Entity"
+  def join(from: RoleName, to: RoleName, typeName: String): Unit = {
+    super.join(from, to)
     runOn(from) {
-      Cluster(system).join(node(to).address)
-      startSharding()
+      startSharding(typeName)
     }
-    enterBarrier(from.name + "-joined")
+    enterBarrier(s"$from-started")
   }
-  def startSharding(): Unit = {
-    val allocationStrategy =
-      new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
-    ClusterSharding(system).start(
-      typeName = "Entity",
-      entityProps = Props[Entity],
-      settings = ClusterShardingSettings(system),
-      extractEntityId = extractEntityId,
-      extractShardId = extractShardId,
-      allocationStrategy,
-      handOffStopMessage = StopEntity)
-  }
+  def startSharding(typeName: String): ActorRef =
+    startSharding(
+      system,
+      typeName,
+      entityProps = Props[ShardedEntity],
+      extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId,
+      extractShardId = MultiNodeClusterShardingSpec.intExtractShardId,
+      allocationStrategy =
+        new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1),
+      handOffStopMessage = ShardedEntity.Stop)
-  lazy val region = ClusterSharding(system).shardRegion("Entity")
-  def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
+  lazy val region = ClusterSharding(system).shardRegion(typeName)
   s"Cluster sharding ($mode)" must {
-    if (!isDdataMode) {
-      "setup shared journal" in {
-        // start the Persistence extension
-        Persistence(system)
-        runOn(first) {
-          system.actorOf(Props[SharedLeveldbStore], "store")
-        }
-        enterBarrier("peristence-started")
-        runOn(first, second) {
-          system.actorSelection(node(first) / "user" / "store") ! Identify(None)
-          val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get
-          SharedLeveldbJournal.setStore(sharedStore, system)
-        }
-        enterBarrier("after-1")
-      }
-    }
     "start some shards in both regions" in within(30.seconds) {
-      join(first, first)
-      join(second, first)
+      startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second))
+      join(first, first, typeName)
+      join(second, first, typeName)
       awaitAssert {
         val p = TestProbe()
@@ -197,16 +112,7 @@ abstract class ClusterShardingGracefulShutdownSpec(config: ClusterShardingGracefulShutdownSpecConfig)
     "gracefully shutdown empty region" in within(30.seconds) {
       runOn(first) {
-        val allocationStrategy =
-          new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
-        val regionEmpty = ClusterSharding(system).start(
-          typeName = "EntityEmpty",
-          entityProps = Props[Entity],
-          settings = ClusterShardingSettings(system),
-          extractEntityId = extractEntityId,
-          extractShardId = extractShardId,
-          allocationStrategy,
-          handOffStopMessage = StopEntity)
+        val regionEmpty = startSharding(typeName = "EntityEmpty")
         watch(regionEmpty)
         regionEmpty ! GracefulShutdown
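For orientation, the graceful hand-off exercised here is plain ShardRegion.GracefulShutdown (the real message): the region asks the coordinator to hand off its shards and then stops, so a test can simply watch for termination:

    watch(region)
    region ! ShardRegion.GracefulShutdown // coordinator rebalances the shards away
    expectTerminated(region, 10.seconds)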


@@ -4,44 +4,23 @@
 package akka.cluster.sharding
-import akka.cluster.MultiNodeClusterSpec
-import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
 import akka.testkit._
-import com.typesafe.config.ConfigFactory
-object ClusterShardingIncorrectSetupSpecConfig extends MultiNodeConfig {
+object ClusterShardingIncorrectSetupSpecConfig
+    extends MultiNodeClusterShardingConfig(additionalConfig = "akka.cluster.sharding.waiting-for-state-timeout = 100ms") {
   val first = role("first")
   val second = role("second")
-  val commonConfig = ConfigFactory.parseString("""
-    akka.loglevel = INFO
-    akka.cluster.sharding {
-      waiting-for-state-timeout = 100ms
-    }
-    """.stripMargin)
-  commonConfig(commonConfig.withFallback(MultiNodeClusterSpec.clusterConfig))
 }
 class ClusterShardingIncorrectSetupMultiJvmNode1 extends ClusterShardingIncorrectSetupSpec
 class ClusterShardingIncorrectSetupMultiJvmNode2 extends ClusterShardingIncorrectSetupSpec
-object ClusterShardingIncorrectSetupSpec {
-  val extractEntityId: ShardRegion.ExtractEntityId = {
-    case id: Int => (id.toString, id)
-  }
-  val extractShardId: ShardRegion.ExtractShardId = {
-    case id: Int => id.toString
-  }
-}
 abstract class ClusterShardingIncorrectSetupSpec
-    extends MultiNodeSpec(ClusterShardingIncorrectSetupSpecConfig)
-    with MultiNodeClusterSpec
+    extends MultiNodeClusterShardingSpec(ClusterShardingIncorrectSetupSpecConfig)
     with ImplicitSender {
-  import ClusterShardingIncorrectSetupSpec._
   import ClusterShardingIncorrectSetupSpecConfig._
   "Cluster sharding" must {
@@ -50,12 +29,7 @@ abstract class ClusterShardingIncorrectSetupSpec
       enterBarrier("cluster-up")
       runOn(first) {
         EventFilter.error(pattern = """Has ClusterSharding been started on all nodes?""").intercept {
-          ClusterSharding(system).start(
-            typeName = "Entity",
-            entityProps = TestActors.echoActorProps,
-            settings = ClusterShardingSettings(system),
-            extractEntityId = extractEntityId,
-            extractShardId = extractShardId)
+          startSharding(system, typeName = "Entity", entityProps = TestActors.echoActorProps)
         }
       }
       enterBarrier("helpful error message logged")


@@ -4,27 +4,11 @@
 package akka.cluster.sharding
-import java.io.File
 import scala.concurrent.duration._
-import akka.actor.Actor
-import akka.actor.ActorIdentity
-import akka.actor.ActorRef
-import akka.actor.Identify
-import akka.actor.Props
-import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec }
-import akka.persistence.Persistence
-import akka.persistence.journal.leveldb.SharedLeveldbJournal
-import akka.persistence.journal.leveldb.SharedLeveldbStore
-import akka.remote.testconductor.RoleName
-import akka.remote.testkit.MultiNodeConfig
-import akka.remote.testkit.MultiNodeSpec
-import akka.remote.testkit.STMultiNodeSpec
+import akka.actor.{ Actor, ActorRef, Props }
 import akka.serialization.jackson.CborSerializable
 import akka.testkit._
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
 object ClusterShardingLeavingSpec {
   case class Ping(id: String) extends CborSerializable
@@ -55,42 +39,18 @@ object ClusterShardingLeavingSpec {
   }
 }
-abstract class ClusterShardingLeavingSpecConfig(val mode: String) extends MultiNodeConfig {
+abstract class ClusterShardingLeavingSpecConfig(mode: String) extends MultiNodeClusterShardingConfig(mode) {
   val first = role("first")
   val second = role("second")
   val third = role("third")
   val fourth = role("fourth")
-  commonConfig(
-    ConfigFactory
-      .parseString(s"""
-        akka.loglevel = INFO
-        akka.actor.provider = "cluster"
-        akka.remote.classic.log-remote-lifecycle-events = off
-        akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
-        akka.cluster.testkit.auto-down-unreachable-after = 0s
-        akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
-        akka.persistence.journal.leveldb-shared {
-          timeout = 5s
-          store {
-            native = off
-            dir = "target/ClusterShardingLeavingSpec/journal"
-          }
-        }
-        akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-        akka.persistence.snapshot-store.local.dir = "target/ClusterShardingLeavingSpec/snapshots"
-        akka.cluster.sharding.state-store-mode = "$mode"
-        akka.cluster.sharding.distributed-data.durable.lmdb {
-          dir = target/ClusterShardingLeavingSpec/sharding-ddata
-          map-size = 10 MiB
-        }
-        """)
-      .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest)
-      .withFallback(MultiNodeClusterSpec.clusterConfig))
 }
-object PersistentClusterShardingLeavingSpecConfig extends ClusterShardingLeavingSpecConfig("persistence")
-object DDataClusterShardingLeavingSpecConfig extends ClusterShardingLeavingSpecConfig("ddata")
+object PersistentClusterShardingLeavingSpecConfig
+    extends ClusterShardingLeavingSpecConfig(ClusterShardingSettings.StateStoreModePersistence)
+object DDataClusterShardingLeavingSpecConfig
+    extends ClusterShardingLeavingSpecConfig(ClusterShardingSettings.StateStoreModeDData)
 class PersistentClusterShardingLeavingSpec
     extends ClusterShardingLeavingSpec(PersistentClusterShardingLeavingSpecConfig)
@@ -106,79 +66,32 @@ class DDataClusterShardingLeavingMultiJvmNode2 extends DDataClusterShardingLeavingSpec
 class DDataClusterShardingLeavingMultiJvmNode3 extends DDataClusterShardingLeavingSpec
 class DDataClusterShardingLeavingMultiJvmNode4 extends DDataClusterShardingLeavingSpec
-abstract class ClusterShardingLeavingSpec(config: ClusterShardingLeavingSpecConfig)
-    extends MultiNodeSpec(config)
-    with STMultiNodeSpec
+abstract class ClusterShardingLeavingSpec(multiNodeConfig: ClusterShardingLeavingSpecConfig)
+    extends MultiNodeClusterShardingSpec(multiNodeConfig)
     with ImplicitSender {
   import ClusterShardingLeavingSpec._
-  import config._
-  override def initialParticipants = roles.size
-  val storageLocations = List(
-    new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)
-  override protected def atStartup(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-    enterBarrier("startup")
-  }
-  override protected def afterTermination(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-  }
-  val cluster = Cluster(system)
-  def join(from: RoleName, to: RoleName): Unit = {
-    runOn(from) {
-      cluster.join(node(to).address)
-      startSharding()
-      within(15.seconds) {
-        awaitAssert(cluster.state.members.exists { m =>
-          m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up
-        } should be(true))
-      }
-    }
-    enterBarrier(from.name + "-joined")
-  }
+  import multiNodeConfig._
   def startSharding(): Unit = {
-    ClusterSharding(system).start(
+    startSharding(
+      system,
       typeName = "Entity",
       entityProps = Props[Entity],
-      settings = ClusterShardingSettings(system),
       extractEntityId = extractEntityId,
       extractShardId = extractShardId)
   }
   lazy val region = ClusterSharding(system).shardRegion("Entity")
-  def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
   s"Cluster sharding ($mode) with leaving member" must {
-    if (!isDdataMode) {
-      "setup shared journal" in {
-        // start the Persistence extension
-        Persistence(system)
-        runOn(first) {
-          system.actorOf(Props[SharedLeveldbStore], "store")
-        }
-        enterBarrier("peristence-started")
-        system.actorSelection(node(first) / "user" / "store") ! Identify(None)
-        val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get
-        SharedLeveldbJournal.setStore(sharedStore, system)
-        enterBarrier("after-1")
-      }
-    }
     "join cluster" in within(20.seconds) {
-      join(first, first)
-      join(second, first)
-      join(third, first)
-      join(fourth, first)
+      startPersistenceIfNotDdataMode(startOn = first, setStoreOn = roles)
+      join(first, first, onJoinedRunOnFrom = startSharding())
+      join(second, first, onJoinedRunOnFrom = startSharding())
+      join(third, first, onJoinedRunOnFrom = startSharding())
+      join(fourth, first, onJoinedRunOnFrom = startSharding())
       enterBarrier("after-2")
     }
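A note on the join(..., onJoinedRunOnFrom = startSharding()) call sites above: this only works if the hook parameter is by-name, so the argument is evaluated solely inside runOn(from) on the joining node. A minimal sketch of that shape (the real helper also handles assertNodeUp and is not shown in this excerpt):

    def join(from: RoleName, to: RoleName, onJoinedRunOnFrom: => Unit = ()): Unit = {
      runOn(from) {
        Cluster(system).join(node(to).address)
        onJoinedRunOnFrom // by-name: runs here, and only on the `from` node
      }
      enterBarrier(from.name + "-joined")
    }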


@@ -4,73 +4,33 @@
 package akka.cluster.sharding
-import java.io.File
 import akka.actor._
-import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec }
-import akka.persistence.Persistence
-import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
-import akka.remote.testconductor.RoleName
-import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
+import akka.cluster.MemberStatus
+import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, GetClusterShardingStats }
 import akka.testkit._
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
-import scala.concurrent.duration._
-import akka.cluster.sharding.ShardRegion.GetClusterShardingStats
-import akka.cluster.sharding.ShardRegion.ClusterShardingStats
 import akka.util.ccompat._
+import scala.concurrent.duration._
 @ccompatUsedUntil213
-object ClusterShardingMinMembersSpec {
-  case object StopEntity
-  val extractEntityId: ShardRegion.ExtractEntityId = {
-    case id: Int => (id.toString, id)
-  }
-  val extractShardId: ShardRegion.ExtractShardId = msg =>
-    msg match {
-      case id: Int => id.toString
-    }
-}
-abstract class ClusterShardingMinMembersSpecConfig(val mode: String) extends MultiNodeConfig {
+abstract class ClusterShardingMinMembersSpecConfig(mode: String)
+    extends MultiNodeClusterShardingConfig(
+      mode,
+      additionalConfig = s"""
+        akka.cluster.sharding.rebalance-interval = 120s #disable rebalance
+        akka.cluster.min-nr-of-members = 3
+        """) {
   val first = role("first")
   val second = role("second")
   val third = role("third")
-  commonConfig(
-    ConfigFactory
-      .parseString(s"""
-        akka.loglevel = INFO
-        akka.actor.provider = "cluster"
-        akka.remote.log-remote-lifecycle-events = off
-        akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
-        akka.persistence.journal.leveldb-shared {
-          timeout = 5s
-          store {
-            native = off
-            dir = "target/ClusterShardingMinMembersSpec/journal"
-          }
-        }
-        akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-        akka.persistence.snapshot-store.local.dir = "target/ClusterShardingMinMembersSpec/snapshots"
-        akka.cluster.sharding.state-store-mode = "$mode"
-        akka.cluster.sharding.rebalance-interval = 120s #disable rebalance
-        akka.cluster.sharding.distributed-data.durable.lmdb {
-          dir = target/ClusterShardingMinMembersSpec/sharding-ddata
-          map-size = 10 MiB
-        }
-        akka.cluster.min-nr-of-members = 3
-        """)
-      .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest)
-      .withFallback(MultiNodeClusterSpec.clusterConfig))
 }
-object PersistentClusterShardingMinMembersSpecConfig extends ClusterShardingMinMembersSpecConfig("persistence")
-object DDataClusterShardingMinMembersSpecConfig extends ClusterShardingMinMembersSpecConfig("ddata")
+object PersistentClusterShardingMinMembersSpecConfig
+    extends ClusterShardingMinMembersSpecConfig(ClusterShardingSettings.StateStoreModePersistence)
+object DDataClusterShardingMinMembersSpecConfig
+    extends ClusterShardingMinMembersSpecConfig(ClusterShardingSettings.StateStoreModeDData)
 class PersistentClusterShardingMinMembersSpec
     extends ClusterShardingMinMembersSpec(PersistentClusterShardingMinMembersSpecConfig)
@@ -84,84 +44,35 @@ class DDataClusterShardingMinMembersMultiJvmNode1 extends DDataClusterShardingMinMembersSpec
 class DDataClusterShardingMinMembersMultiJvmNode2 extends DDataClusterShardingMinMembersSpec
 class DDataClusterShardingMinMembersMultiJvmNode3 extends DDataClusterShardingMinMembersSpec
-abstract class ClusterShardingMinMembersSpec(config: ClusterShardingMinMembersSpecConfig)
-    extends MultiNodeSpec(config)
-    with STMultiNodeSpec
+abstract class ClusterShardingMinMembersSpec(multiNodeConfig: ClusterShardingMinMembersSpecConfig)
+    extends MultiNodeClusterShardingSpec(multiNodeConfig)
     with ImplicitSender {
-  import ClusterShardingMinMembersSpec._
-  import config._
-  override def initialParticipants = roles.size
-  val storageLocations = List(
-    new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)
-  override protected def atStartup(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-    enterBarrier("startup")
-  }
-  override protected def afterTermination(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-  }
-  def join(from: RoleName, to: RoleName): Unit = {
-    runOn(from) {
-      Cluster(system).join(node(to).address)
-    }
-    enterBarrier(from.name + "-joined")
-  }
-  val cluster = Cluster(system)
+  import MultiNodeClusterShardingSpec.ShardedEntity
+  import multiNodeConfig._
   def startSharding(): Unit = {
-    val allocationStrategy =
-      new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
-    ClusterSharding(system).start(
+    startSharding(
+      system,
       typeName = "Entity",
       entityProps = TestActors.echoActorProps,
-      settings = ClusterShardingSettings(system),
-      extractEntityId = extractEntityId,
-      extractShardId = extractShardId,
-      allocationStrategy,
-      handOffStopMessage = StopEntity)
+      extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId,
+      extractShardId = MultiNodeClusterShardingSpec.intExtractShardId,
+      allocationStrategy =
+        new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1),
+      handOffStopMessage = ShardedEntity.Stop)
   }
   lazy val region = ClusterSharding(system).shardRegion("Entity")
-  def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
   s"Cluster with min-nr-of-members using sharding ($mode)" must {
-    if (!isDdataMode) {
-      "setup shared journal" in {
-        // start the Persistence extension
-        Persistence(system)
-        runOn(first) {
-          system.actorOf(Props[SharedLeveldbStore], "store")
-        }
-        enterBarrier("peristence-started")
-        runOn(first, second, third) {
-          system.actorSelection(node(first) / "user" / "store") ! Identify(None)
-          val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get
-          SharedLeveldbJournal.setStore(sharedStore, system)
-        }
-        enterBarrier("after-1")
-      }
-    }
     "use all nodes" in within(30.seconds) {
-      join(first, first)
-      runOn(first) {
-        startSharding()
-      }
-      join(second, first)
-      runOn(second) {
-        startSharding()
-      }
-      join(third, first)
+      startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second, third))
+      // the only test not asserting join status before starting to shard
+      join(first, first, onJoinedRunOnFrom = startSharding(), assertNodeUp = false)
+      join(second, first, onJoinedRunOnFrom = startSharding(), assertNodeUp = false)
+      join(third, first, assertNodeUp = false)
       // wait with starting sharding on third
       within(remaining) {
        awaitAssert {
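Background on the min-nr-of-members gate this spec leans on: no member is moved to Up until akka.cluster.min-nr-of-members = 3 nodes have joined, which is what delays coordinator allocation here. registerOnMemberUp is the real Cluster hook for reacting to that transition (the callback body is illustrative):

    import akka.cluster.Cluster

    Cluster(system).registerOnMemberUp {
      // invoked once this node reaches Up, i.e. only after
      // akka.cluster.min-nr-of-members = 3 members have joined
      println("member is Up; sharding may now allocate shards here")
    }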


@ -4,67 +4,44 @@
package akka.cluster.sharding package akka.cluster.sharding
import akka.actor.{ Actor, ActorLogging, ActorRef, Props } import scala.concurrent.duration._
import akka.cluster.MultiNodeClusterSpec
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec } import akka.actor.Props
import akka.serialization.jackson.CborSerializable
import akka.testkit.TestProbe import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._
object ClusterShardingQueriesSpec { object ClusterShardingQueriesSpec {
case class Ping(id: Long) extends CborSerializable import MultiNodeClusterShardingSpec.PingPongActor
case object Pong extends CborSerializable
class EntityActor extends Actor with ActorLogging {
def receive: Receive = {
case _: Ping => sender() ! Pong
}
}
val extractEntityId: ShardRegion.ExtractEntityId = { val extractEntityId: ShardRegion.ExtractEntityId = {
case msg @ Ping(id) => (id.toString, msg) case msg @ PingPongActor.Ping(id) => (id.toString, msg)
} }
val numberOfShards = 6 val numberOfShards = 6
   val extractShardId: ShardRegion.ExtractShardId = {
-    case Ping(id) => (id % numberOfShards).toString
+    case PingPongActor.Ping(id) => (id % numberOfShards).toString
   }
   val shardTypeName = "DatatypeA"
 }

-object ClusterShardingQueriesSpecConfig extends MultiNodeConfig {
+object ClusterShardingQueriesSpecConfig
+    extends MultiNodeClusterShardingConfig(additionalConfig = """
+      akka.log-dead-letters-during-shutdown = off
+      akka.cluster.sharding {
+        shard-region-query-timeout = 2ms
+        updating-state-timeout = 2s
+        waiting-for-state-timeout = 2s
+      }
+      """) {

   val controller = role("controller")
   val busy = role("busy")
   val second = role("second")
   val third = role("third")

-  commonConfig(
-    debugConfig(on = false)
-      .withFallback(ConfigFactory.parseString("""
-        akka.loglevel = INFO
-        akka.actor.provider = "cluster"
-        akka.remote.classic.log-remote-lifecycle-events = off
-        akka.log-dead-letters-during-shutdown = off
-        akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
-        akka.cluster.testkit.auto-down-unreachable-after = 0s
-        akka.cluster.sharding {
-          state-store-mode = "ddata"
-          shard-region-query-timeout = 2ms
-          updating-state-timeout = 2s
-          waiting-for-state-timeout = 2s
-        }
-        akka.cluster.sharding.distributed-data.durable.lmdb {
-          dir = target/ClusterShardingGetStatsSpec/sharding-ddata
-          map-size = 10 MiB
-        }
-        """).withFallback(MultiNodeClusterSpec.clusterConfig)))

   val shardRoles = ConfigFactory.parseString("""akka.cluster.roles=["shard"]""")

   nodeConfig(busy)(

@@ -79,29 +56,12 @@ class ClusterShardingQueriesSpecMultiJvmNode3 extends ClusterShardingQueriesSpec
 class ClusterShardingQueriesSpecMultiJvmNode4 extends ClusterShardingQueriesSpec

 abstract class ClusterShardingQueriesSpec
-    extends MultiNodeSpec(ClusterShardingQueriesSpecConfig)
-    with MultiNodeClusterSpec
+    extends MultiNodeClusterShardingSpec(ClusterShardingQueriesSpecConfig)
     with ScalaFutures {

   import ClusterShardingQueriesSpec._
   import ClusterShardingQueriesSpecConfig._
+  import MultiNodeClusterShardingSpec.PingPongActor

-  def startShard(): ActorRef = {
-    ClusterSharding(system).start(
-      typeName = shardTypeName,
-      entityProps = Props(new EntityActor),
-      settings = ClusterShardingSettings(system).withRole("shard"),
-      extractEntityId = extractEntityId,
-      extractShardId = extractShardId)
-  }
-
-  def startProxy(): ActorRef = {
-    ClusterSharding(system).startProxy(
-      typeName = shardTypeName,
-      role = Some("shard"),
-      extractEntityId = extractEntityId,
-      extractShardId = extractShardId)
-  }

   lazy val region = ClusterSharding(system).shardRegion(shardTypeName)

@@ -111,11 +71,22 @@ abstract class ClusterShardingQueriesSpec
       awaitClusterUp(controller, busy, second, third)

       runOn(controller) {
-        startProxy()
+        startProxy(
+          system,
+          typeName = shardTypeName,
+          role = Some("shard"),
+          extractEntityId = extractEntityId,
+          extractShardId = extractShardId)
       }

       runOn(busy, second, third) {
-        startShard()
+        startSharding(
+          system,
+          typeName = shardTypeName,
+          entityProps = Props(new PingPongActor),
+          settings = settings.withRole("shard"),
+          extractEntityId = extractEntityId,
+          extractShardId = extractShardId)
       }

       enterBarrier("sharding started")

@@ -126,9 +97,9 @@ abstract class ClusterShardingQueriesSpec
       within(10.seconds) {
         awaitAssert {
           val pingProbe = TestProbe()
-          (0 to 20).foreach(n => region.tell(Ping(n), pingProbe.ref))
+          (0 to 20).foreach(n => region.tell(PingPongActor.Ping(n), pingProbe.ref))
           pingProbe.receiveWhile(messages = 20) {
-            case Pong => ()
+            case PingPongActor.Pong => ()
           }
         }
       }
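Taken together, the pattern after this change is that a spec declares its roles plus any extra config, and starts regions through the inherited helpers. A minimal sketch of a new-style spec; the GuardianSpec name and its config values are illustrative only, not part of this commit:

import akka.actor.Props

object GuardianSpecConfig
    extends MultiNodeClusterShardingConfig(
      additionalConfig = "akka.cluster.sharding.rebalance-interval = 2 s") {
  val first = role("first")
  val second = role("second")
}

abstract class GuardianSpec extends MultiNodeClusterShardingSpec(GuardianSpecConfig) {
  import GuardianSpecConfig._
  import MultiNodeClusterShardingSpec.ShardedEntity

  "Sharding in a new-style spec" must {
    "start a region on each node" in {
      join(first, first)
      join(second, first)
      runOn(first, second) {
        // startSharding comes from MultiNodeClusterShardingSpec; settings and
        // allocation strategy fall back to the base-class defaults
        startSharding(
          system,
          typeName = "Guardian",
          entityProps = Props[ShardedEntity](),
          extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId,
          extractShardId = MultiNodeClusterShardingSpec.intExtractShardId)
      }
      enterBarrier("regions-started")
    }
  }
}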
@@ -4,48 +4,23 @@
 package akka.cluster.sharding

-import scala.concurrent.Future
-import scala.concurrent.duration._
 import akka.Done
 import akka.actor._
-import akka.cluster.Cluster
 import akka.cluster.MemberStatus
-import akka.cluster.MultiNodeClusterSpec
-import akka.remote.testconductor.RoleName
-import akka.remote.testkit.MultiNodeConfig
-import akka.remote.testkit.MultiNodeSpec
-import akka.remote.testkit.STMultiNodeSpec
-import akka.testkit._
-import com.typesafe.config.ConfigFactory
+import akka.testkit.{ ImplicitSender, TestProbe }
+
+import scala.concurrent.Future
+import scala.concurrent.duration._

 /**
  * Test for issue #28416
  */
-object ClusterShardingRegistrationCoordinatedShutdownSpec extends MultiNodeConfig {
+object ClusterShardingRegistrationCoordinatedShutdownSpec extends MultiNodeClusterShardingConfig {
   val first = role("first")
   val second = role("second")
   val third = role("third")

-  commonConfig(ConfigFactory.parseString(s"""
-      akka.loglevel = DEBUG # FIXME
-      """).withFallback(MultiNodeClusterSpec.clusterConfig))
-
-  class Entity extends Actor {
-    def receive = {
-      case id: Int => sender() ! id
-    }
-  }
-
-  val extractEntityId: ShardRegion.ExtractEntityId = {
-    case id: Int => (id.toString, id)
-  }
-
-  val extractShardId: ShardRegion.ExtractShardId = msg =>
-    msg match {
-      case id: Int => id.toString
-    }
 }

 class ClusterShardingRegistrationCoordinatedShutdownMultiJvmNode1

@@ -56,35 +31,14 @@ class ClusterShardingRegistrationCoordinatedShutdownMultiJvmNode3
     extends ClusterShardingRegistrationCoordinatedShutdownSpec

 abstract class ClusterShardingRegistrationCoordinatedShutdownSpec
-    extends MultiNodeSpec(ClusterShardingRegistrationCoordinatedShutdownSpec)
-    with STMultiNodeSpec
+    extends MultiNodeClusterShardingSpec(ClusterShardingRegistrationCoordinatedShutdownSpec)
     with ImplicitSender {
   import ClusterShardingRegistrationCoordinatedShutdownSpec._
+  import MultiNodeClusterShardingSpec.ShardedEntity

-  override def initialParticipants: Int = roles.size
-
-  private val cluster = Cluster(system)
   private lazy val region = ClusterSharding(system).shardRegion("Entity")

-  def join(from: RoleName, to: RoleName): Unit = {
-    runOn(from) {
-      cluster.join(node(to).address)
-    }
-    runOn(from) {
-      cluster.state.members.exists(m => m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up)
-    }
-    enterBarrier(from.name + "-joined")
-  }
-
-  def startSharding(): Unit = {
-    ClusterSharding(system).start(
-      typeName = "Entity",
-      entityProps = Props[Entity],
-      settings = ClusterShardingSettings(system),
-      extractEntityId = extractEntityId,
-      extractShardId = extractShardId)
-  }

   s"Region registration during CoordinatedShutdown" must {
     "try next oldest" in within(30.seconds) {

@@ -109,7 +63,12 @@ abstract class ClusterShardingRegistrationCoordinatedShutdownSpec
       }

-      startSharding()
+      startSharding(
+        system,
+        typeName = "Entity",
+        entityProps = Props[ShardedEntity],
+        extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId,
+        extractShardId = MultiNodeClusterShardingSpec.intExtractShardId)

       enterBarrier("before-shutdown")
@@ -4,17 +4,11 @@
 package akka.cluster.sharding

-import java.io.File
 import akka.actor._
-import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec }
-import akka.persistence.Persistence
-import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
-import akka.remote.testconductor.RoleName
-import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
+import akka.cluster.{ Cluster, MemberStatus }
+import akka.persistence.journal.leveldb.SharedLeveldbJournal
 import akka.testkit._
 import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
 import scala.concurrent.duration._

@@ -52,37 +46,14 @@ object ClusterShardingRememberEntitiesNewExtractorSpec {
 }

-abstract class ClusterShardingRememberEntitiesNewExtractorSpecConfig(val mode: String) extends MultiNodeConfig {
+abstract class ClusterShardingRememberEntitiesNewExtractorSpecConfig(mode: String)
+    extends MultiNodeClusterShardingConfig(
+      mode,
+      additionalConfig = "akka.persistence.journal.leveldb-shared.store.native = off") {

   val first = role("first")
   val second = role("second")
   val third = role("third")

-  commonConfig(
-    ConfigFactory
-      .parseString(s"""
-        akka.actor.provider = "cluster"
-        akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
-        akka.cluster.testkit.auto-down-unreachable-after = 0s
-        akka.remote.classic.log-remote-lifecycle-events = off
-        akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
-        akka.persistence.journal.leveldb-shared {
-          timeout = 5s
-          store {
-            native = off
-            dir = "target/ShardingRememberEntitiesNewExtractorSpec/journal"
-          }
-        }
-        akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-        akka.persistence.snapshot-store.local.dir = "target/ShardingRememberEntitiesNewExtractorSpec/snapshots"
-        akka.cluster.sharding.state-store-mode = "$mode"
-        akka.cluster.sharding.distributed-data.durable.lmdb {
-          dir = target/ShardingRememberEntitiesNewExtractorSpec/sharding-ddata
-          map-size = 10 MiB
-        }
-        """)
-      .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest)
-      .withFallback(MultiNodeClusterSpec.clusterConfig))

   val roleConfig = ConfigFactory.parseString("""
     akka.cluster.roles = [sharding]
     """)

@@ -132,82 +103,41 @@ class DDataClusterShardingRememberEntitiesNewExtractorMultiJvmNode3
     extends DDataClusterShardingRememberEntitiesNewExtractorSpec

 abstract class ClusterShardingRememberEntitiesNewExtractorSpec(
-    config: ClusterShardingRememberEntitiesNewExtractorSpecConfig)
-    extends MultiNodeSpec(config)
-    with STMultiNodeSpec
+    multiNodeConfig: ClusterShardingRememberEntitiesNewExtractorSpecConfig)
+    extends MultiNodeClusterShardingSpec(multiNodeConfig)
     with ImplicitSender {
   import ClusterShardingRememberEntitiesNewExtractorSpec._
-  import config._
+  import multiNodeConfig._

   val typeName = "Entity"

-  override def initialParticipants = roles.size
-
-  val storageLocations = List(
-    new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)
-
-  override protected def atStartup(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-    enterBarrier("startup")
-  }
-
-  override protected def afterTermination(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-  }
-
-  def join(from: RoleName, to: RoleName): Unit = {
-    runOn(from) {
-      Cluster(system).join(node(to).address)
-    }
-    enterBarrier(from.name + "-joined")
-  }
-
-  val cluster = Cluster(system)
-
   def startShardingWithExtractor1(): Unit = {
-    ClusterSharding(system).start(
+    startSharding(
+      system,
       typeName = typeName,
       entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(None),
-      settings = ClusterShardingSettings(system).withRememberEntities(true).withRole("sharding"),
+      settings = settings.withRole("sharding"),
       extractEntityId = extractEntityId,
       extractShardId = extractShardId1)
   }

   def startShardingWithExtractor2(sys: ActorSystem, probe: ActorRef): Unit = {
-    ClusterSharding(sys).start(
+    startSharding(
+      sys,
       typeName = typeName,
       entityProps = ClusterShardingRememberEntitiesNewExtractorSpec.props(Some(probe)),
-      settings = ClusterShardingSettings(system).withRememberEntities(true).withRole("sharding"),
+      settings = ClusterShardingSettings(sys).withRememberEntities(config.rememberEntities).withRole("sharding"),
       extractEntityId = extractEntityId,
       extractShardId = extractShardId2)
   }

   def region(sys: ActorSystem = system) = ClusterSharding(sys).shardRegion(typeName)

-  def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData

   s"Cluster with min-nr-of-members using sharding ($mode)" must {

-    if (!isDdataMode) {
-      "setup shared journal" in {
-        // start the Persistence extension
-        Persistence(system)
-        runOn(first) {
-          system.actorOf(Props[SharedLeveldbStore], "store")
-        }
-        enterBarrier("persistence-started")
-        runOn(second, third) {
-          system.actorSelection(node(first) / "user" / "store") ! Identify(None)
-          val sharedStore = expectMsgType[ActorIdentity](10.seconds).ref.get
-          SharedLeveldbJournal.setStore(sharedStore, system)
-        }
-        enterBarrier("after-1")
-      }
-    }

     "start up first cluster and sharding" in within(15.seconds) {
+      startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(second, third))
       join(first, first)
       join(second, first)
       join(third, first)
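startPersistenceIfNotDdataMode replaces the per-spec "setup shared journal" blocks deleted above. Its exact body lives in MultiNodeClusterShardingSpec, outside this excerpt; reconstructed from the deleted code, it is presumably close to the following sketch (barrier names assumed):

import akka.actor.Props
import akka.persistence.Persistence
import akka.persistence.journal.leveldb.SharedLeveldbStore
import akka.remote.testconductor.RoleName

def startPersistenceIfNotDdataMode(startOn: RoleName, setStoreOn: Seq[RoleName]): Unit =
  if (!isDdataMode) {
    // start the Persistence extension everywhere and the shared LevelDB store on `startOn`
    Persistence(system)
    runOn(startOn) {
      system.actorOf(Props[SharedLeveldbStore], "store")
    }
    enterBarrier("persistence-started")
    // point the journals of the `setStoreOn` nodes at the shared store
    runOn(setStoreOn: _*) {
      setStore(system, storeOn = startOn)
    }
    enterBarrier("after-persistence-setup")
  }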
@@ -4,23 +4,15 @@
 package akka.cluster.sharding

-import java.io.File
 import java.util.concurrent.TimeUnit.NANOSECONDS
-import scala.concurrent.duration._
 import akka.actor._
-import akka.cluster.Cluster
 import akka.cluster.MemberStatus
-import akka.cluster.MultiNodeClusterSpec
-import akka.remote.testconductor.RoleName
-import akka.remote.testkit.MultiNodeConfig
-import akka.remote.testkit.MultiNodeSpec
-import akka.remote.testkit.STMultiNodeSpec
 import akka.testkit._
 import akka.util.ccompat._
 import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
+import scala.concurrent.duration._

 @ccompatUsedUntil213
 object ClusterShardingRememberEntitiesPerfSpec {

@@ -48,32 +40,22 @@ object ClusterShardingRememberEntitiesPerfSpec {
 }

-object ClusterShardingRememberEntitiesPerfSpecConfig extends MultiNodeConfig {
+object ClusterShardingRememberEntitiesPerfSpecConfig extends MultiNodeClusterShardingConfig(additionalConfig = s"""
+    akka.testconductor.barrier-timeout = 3 minutes
+    akka.remote.artery.advanced.outbound-message-queue-size = 10000
+    akka.remote.artery.advanced.maximum-frame-size = 512 KiB
+    # comment next line to enable durable lmdb storage
+    akka.cluster.sharding.distributed-data.durable.keys = []
+    """) {

   val first = role("first")
   val second = role("second")
   val third = role("third")

-  commonConfig(ConfigFactory.parseString(s"""
-    akka.loglevel = INFO
-    akka.actor.provider = "cluster"
-    akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
-    akka.cluster.testkit.auto-down-unreachable-after = 0s
-    akka.remote.log-remote-lifecycle-events = off
-    akka.testconductor.barrier-timeout = 3 minutes
-    akka.remote.artery.advanced.outbound-message-queue-size = 10000
-    akka.remote.artery.advanced.maximum-frame-size = 512 KiB
-    akka.cluster.sharding.state-store-mode = "ddata"
-    akka.cluster.sharding.distributed-data.durable.lmdb {
-      dir = target/ShardingRememberEntitiesPerfSpec/sharding-ddata
-    }
-    # comment next line to enable durable lmdb storage
-    akka.cluster.sharding.distributed-data.durable.keys = []
-    """).withFallback(MultiNodeClusterSpec.clusterConfig))

   nodeConfig(third)(ConfigFactory.parseString(s"""
     akka.cluster.sharding.distributed-data.durable.lmdb {
       # use same directory when starting new node on third (not used at same time)
-      dir = target/ShardingRememberEntitiesSpec/sharding-third
+      dir = "$targetDir/sharding-third"
     }
     """))
 }

@@ -83,41 +65,17 @@ class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode2 extends ClusterShardi
 class ClusterShardingRememberEntitiesPerfSpecMultiJvmNode3 extends ClusterShardingRememberEntitiesPerfSpec

 abstract class ClusterShardingRememberEntitiesPerfSpec
-    extends MultiNodeSpec(ClusterShardingRememberEntitiesPerfSpecConfig)
-    with STMultiNodeSpec
+    extends MultiNodeClusterShardingSpec(ClusterShardingRememberEntitiesPerfSpecConfig)
     with ImplicitSender {
   import ClusterShardingRememberEntitiesPerfSpec._
   import ClusterShardingRememberEntitiesPerfSpecConfig._

-  override def initialParticipants = roles.size
-
-  val storageLocations = List(
-    new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)
-
-  override protected def atStartup(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-    enterBarrier("startup")
-  }
-
-  override protected def afterTermination(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-  }
-
-  def join(from: RoleName, to: RoleName): Unit = {
-    runOn(from) {
-      Cluster(system).join(node(to).address)
-    }
-    enterBarrier(from.name + "-joined")
-  }
-
-  val cluster = Cluster(system)
-
   def startSharding(): Unit = {
     (1 to 3).foreach { n =>
-      ClusterSharding(system).start(
+      startSharding(
+        system,
         typeName = s"Entity$n",
         entityProps = ClusterShardingRememberEntitiesPerfSpec.props(),
-        settings = ClusterShardingSettings(system).withRememberEntities(true),
         extractEntityId = extractEntityId,
         extractShardId = extractShardId)
     }
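Note the spec-local zero-argument startSharding() wrapper above: it differs in arity from the inherited startSharding(sys, typeName, ...), so the inner call resolves to the base-class helper rather than recursing. The regions it starts are afterwards resolved as usual; a brief sketch using the type names from the loop:

// resolving the regions started by the loop above
val region1 = ClusterSharding(system).shardRegion("Entity1")
val region2 = ClusterSharding(system).shardRegion("Entity2")
val region3 = ClusterSharding(system).shardRegion("Entity3")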
@@ -4,36 +4,17 @@
 package akka.cluster.sharding

-import java.io.File
-import scala.concurrent.duration._
+import scala.concurrent.duration._
+
 import akka.actor._
-import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec }
-import akka.persistence.Persistence
-import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
-import akka.remote.testconductor.RoleName
-import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec }
+import akka.cluster.{ Cluster, MemberStatus }
 import akka.testkit._
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.io.FileUtils
 import akka.util.ccompat._
+import com.typesafe.config.ConfigFactory

 @ccompatUsedUntil213
 object ClusterShardingRememberEntitiesSpec {

-  final case class Started(ref: ActorRef)
-
-  def props(probe: ActorRef): Props = Props(new TestEntity(probe))
-
-  class TestEntity(probe: ActorRef) extends Actor {
-    probe ! Started(self)
-
-    def receive = {
-      case m => sender() ! m
-    }
-  }
-
   val extractEntityId: ShardRegion.ExtractEntityId = {
     case id: Int => (id.toString, id)
   }

@@ -46,43 +27,20 @@ object ClusterShardingRememberEntitiesSpec {
 }

-abstract class ClusterShardingRememberEntitiesSpecConfig(val mode: String, val rememberEntities: Boolean)
-    extends MultiNodeConfig {
+abstract class ClusterShardingRememberEntitiesSpecConfig(mode: String, rememberEntities: Boolean)
+    extends MultiNodeClusterShardingConfig(
+      mode,
+      rememberEntities,
+      additionalConfig = s"""
+        akka.testconductor.barrier-timeout = 60 s
+        akka.test.single-expect-default = 60 s
+        akka.persistence.journal.leveldb-shared.store.native = off
+        """) {

   val first = role("first")
   val second = role("second")
   val third = role("third")

-  val targetDir = s"target/ClusterShardingRememberEntitiesSpec-$mode-remember-$rememberEntities"
-
-  val modeConfig =
-    if (mode == ClusterShardingSettings.StateStoreModeDData) ConfigFactory.empty
-    else ConfigFactory.parseString(s"""
-      akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
-      akka.persistence.journal.leveldb-shared.timeout = 5s
-      akka.persistence.journal.leveldb-shared.store.native = off
-      akka.persistence.journal.leveldb-shared.store.dir = "$targetDir/journal"
-      akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-      akka.persistence.snapshot-store.local.dir = "$targetDir/snapshots"
-      """)
-
-  commonConfig(
-    modeConfig
-      .withFallback(ConfigFactory.parseString(s"""
-        akka.actor.provider = "cluster"
-        akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
-        akka.cluster.testkit.auto-down-unreachable-after = 0s
-        akka.remote.log-remote-lifecycle-events = off
-        akka.cluster.sharding.state-store-mode = "$mode"
-        akka.cluster.sharding.distributed-data.durable.lmdb {
-          dir = $targetDir/sharding-ddata
-          map-size = 10 MiB
-        }
-        akka.testconductor.barrier-timeout = 60 s
-        akka.test.single-expect-default = 60 s
-        """))
-      .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest)
-      .withFallback(MultiNodeClusterSpec.clusterConfig))

   nodeConfig(third)(ConfigFactory.parseString(s"""
     akka.cluster.sharding.distributed-data.durable.lmdb {
       # use same directory when starting new node on third (not used at same time)

@@ -99,10 +57,10 @@ class PersistentClusterShardingRememberEntitiesSpecConfig(rememberEntities: Bool
 class DDataClusterShardingRememberEntitiesSpecConfig(rememberEntities: Boolean)
     extends ClusterShardingRememberEntitiesSpecConfig(ClusterShardingSettings.StateStoreModeDData, rememberEntities)

-abstract class PersistentClusterShardingRememberEntitiesSpec(val rememberEntities: Boolean)
+abstract class PersistentClusterShardingRememberEntitiesSpec(rememberEntities: Boolean)
     extends ClusterShardingRememberEntitiesSpec(
       new PersistentClusterShardingRememberEntitiesSpecConfig(rememberEntities))

-abstract class DDataClusterShardingRememberEntitiesSpec(val rememberEntities: Boolean)
+abstract class DDataClusterShardingRememberEntitiesSpec(rememberEntities: Boolean)
     extends ClusterShardingRememberEntitiesSpec(new DDataClusterShardingRememberEntitiesSpecConfig(rememberEntities))

 class PersistentClusterShardingRememberEntitiesEnabledMultiJvmNode1

@@ -127,45 +85,20 @@ class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode1 extends DDataClus
 class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode2 extends DDataClusterShardingRememberEntitiesSpec(false)
 class DDataClusterShardingRememberEntitiesDefaultMultiJvmNode3 extends DDataClusterShardingRememberEntitiesSpec(false)

-abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememberEntitiesSpecConfig)
-    extends MultiNodeSpec(config)
-    with STMultiNodeSpec
+abstract class ClusterShardingRememberEntitiesSpec(multiNodeConfig: ClusterShardingRememberEntitiesSpecConfig)
+    extends MultiNodeClusterShardingSpec(multiNodeConfig)
     with ImplicitSender {
   import ClusterShardingRememberEntitiesSpec._
-  import config._
+  import MultiNodeClusterShardingSpec.EntityActor
+  import multiNodeConfig._

-  override def initialParticipants: Int = roles.size

   val dataType = "Entity"

-  val storageLocations = List(
-    new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)
-
-  override protected def atStartup(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-    enterBarrier("startup")
-  }
-
-  override protected def afterTermination(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-  }
-
-  def join(from: RoleName, to: RoleName): Unit = {
-    runOn(from) {
-      Cluster(system).join(node(to).address)
-      awaitAssert {
-        Cluster(system).state.isMemberUp(node(from).address)
-      }
-    }
-    enterBarrier(from.name + "-joined")
-  }
-
-  val cluster = Cluster(system)
-
   def startSharding(sys: ActorSystem, probe: ActorRef): ActorRef = {
-    ClusterSharding(sys).start(
+    startSharding(
+      sys,
       typeName = dataType,
-      entityProps = ClusterShardingRememberEntitiesSpec.props(probe),
+      entityProps = Props(new EntityActor(probe)),
       settings = ClusterShardingSettings(sys).withRememberEntities(rememberEntities),
       extractEntityId = extractEntityId,
       extractShardId = extractShardId)

@@ -173,45 +106,24 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb
   lazy val region = ClusterSharding(system).shardRegion(dataType)

-  def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
-
-  def expectEntityRestarted(sys: ActorSystem, event: Int, probe: TestProbe, entityProbe: TestProbe): Started = {
+  def expectEntityRestarted(
+      sys: ActorSystem,
+      event: Int,
+      probe: TestProbe,
+      entityProbe: TestProbe): EntityActor.Started = {
     if (!rememberEntities) {
       probe.send(ClusterSharding(sys).shardRegion(dataType), event)
       probe.expectMsg(1)
     }
-    entityProbe.expectMsgType[Started](30.seconds)
+    entityProbe.expectMsgType[EntityActor.Started](30.seconds)
   }

-  def setStoreIfNotDdata(sys: ActorSystem): Unit =
-    if (!isDdataMode) {
-      val probe = TestProbe()(sys)
-      sys.actorSelection(node(first) / "user" / "store").tell(Identify(None), probe.ref)
-      val sharedStore = probe.expectMsgType[ActorIdentity](20.seconds).ref.get
-      SharedLeveldbJournal.setStore(sharedStore, sys)
-    }

   s"Cluster sharding with remember entities ($mode)" must {

-    if (!isDdataMode) {
-      "setup shared journal" in {
-        // start the Persistence extension
-        Persistence(system)
-        runOn(first) {
-          system.actorOf(Props[SharedLeveldbStore], "store")
-        }
-        enterBarrier("persistence-started")
-        runOn(first, second, third) {
-          setStoreIfNotDdata(system)
-        }
-        enterBarrier("after-1")
-      }
-    }

     "start remembered entities when coordinator fail over" in within(30.seconds) {
+      startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second, third))
       val entityProbe = TestProbe()
       val probe = TestProbe()
       join(second, second)

@@ -219,7 +131,7 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb
         startSharding(system, entityProbe.ref)
         probe.send(region, 1)
         probe.expectMsg(1)
-        entityProbe.expectMsgType[Started]
+        entityProbe.expectMsgType[EntityActor.Started]
       }
       enterBarrier("second-started")

@@ -269,7 +181,7 @@ abstract class ClusterShardingRememberEntitiesSpec(config: ClusterShardingRememb
       val entityProbe2 = TestProbe()(sys2)
       val probe2 = TestProbe()(sys2)
-      setStoreIfNotDdata(sys2)
+      if (!isDdataMode) setStore(sys2, storeOn = first)
       Cluster(sys2).join(Cluster(sys2).selfAddress)
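setStore(sys2, storeOn = first) is the generalized form of the deleted setStoreIfNotDdata: same mechanics, but the store node is now a parameter and the mode check stays at the call site. Judging from the deleted body, the base-class helper presumably does roughly this:

import akka.actor.{ ActorIdentity, ActorSystem, Identify }
import akka.persistence.journal.leveldb.SharedLeveldbJournal
import akka.remote.testconductor.RoleName
import akka.testkit.TestProbe
import scala.concurrent.duration._

def setStore(sys: ActorSystem, storeOn: RoleName): Unit = {
  val probe = TestProbe()(sys)
  // locate the shared LevelDB store actor running on the `storeOn` node ...
  sys.actorSelection(node(storeOn) / "user" / "store").tell(Identify(None), probe.ref)
  val sharedStore = probe.expectMsgType[ActorIdentity](20.seconds).ref.get
  // ... and install it as the journal store of the given actor system
  SharedLeveldbJournal.setStore(sharedStore, sys)
}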
@@ -5,10 +5,7 @@
 package akka.cluster.sharding

 import akka.actor._
-import akka.cluster.MultiNodeClusterSpec
 import akka.cluster.sharding.ShardRegion.{ ClusterShardingStats, GetClusterShardingStats }
-import akka.persistence.journal.leveldb.SharedLeveldbJournal
-import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
 import akka.testkit._
 import com.typesafe.config.{ Config, ConfigFactory }

@@ -39,7 +36,7 @@ object E2 {
   }
 }

-abstract class ClusterShardingMinMembersPerRoleConfig extends MultiNodeConfig {
+abstract class ClusterShardingMinMembersPerRoleConfig extends MultiNodeClusterShardingConfig {
   val first = role("first")
   val second = role("second")
   val third = role("third")

@@ -49,19 +46,6 @@ abstract class ClusterShardingMinMembersPerRoleConfig extends MultiNodeConfig {
   val r1Config: Config = ConfigFactory.parseString("""akka.cluster.roles = [ "R1" ]""")
   val r2Config: Config = ConfigFactory.parseString("""akka.cluster.roles = [ "R2" ]""")

-  commonConfig(
-    ConfigFactory
-      .parseString("""
-        akka.actor.provider = "cluster"
-        akka.cluster.sharding.state-store-mode = "ddata"
-        akka.cluster.sharding.distributed-data.durable.lmdb {
-          dir = target/ClusterShardingMinMembersSpec/sharding-
-          map-size = 10 MiB
-        }
-        """)
-      .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest)
-      .withFallback(MultiNodeClusterSpec.clusterConfig))
 }

 object ClusterShardingMinMembersPerRoleNotConfiguredConfig extends ClusterShardingMinMembersPerRoleConfig {

@@ -110,14 +94,11 @@ class ClusterShardingMinMembersPerRoleSpecMultiJvmNode3 extends ClusterShardingM
 class ClusterShardingMinMembersPerRoleSpecMultiJvmNode4 extends ClusterShardingMinMembersPerRoleSpec
 class ClusterShardingMinMembersPerRoleSpecMultiJvmNode5 extends ClusterShardingMinMembersPerRoleSpec

-abstract class ClusterShardingRolePartitioningSpec(config: ClusterShardingMinMembersPerRoleConfig)
-    extends MultiNodeSpec(config)
-    with MultiNodeClusterSpec
+abstract class ClusterShardingRolePartitioningSpec(multiNodeConfig: ClusterShardingMinMembersPerRoleConfig)
+    extends MultiNodeClusterShardingSpec(multiNodeConfig)
     with ImplicitSender {
-  import config._
+  import multiNodeConfig._

-  override def initialParticipants: Int = roles.size

   private val fourthAddress = node(fourth).address
   private val fifthAddress = node(fifth).address

@@ -126,20 +107,22 @@ abstract class ClusterShardingRolePartitioningSpec(config: ClusterShardingMinMem
     "start the cluster, await convergence, init sharding on every node: 2 data types, 'akka.cluster.min-nr-of-members=2', partition shard location by 2 roles" in {
       // start sharding early
-      ClusterSharding(system).start(
+      startSharding(
+        system,
         typeName = E1.TypeKey,
         entityProps = TestActors.echoActorProps,
         // nodes 1,2,3: role R1, shard region E1, proxy region E2
-        settings = ClusterShardingSettings(system).withRole("R1"),
+        settings = settings.withRole("R1"),
         extractEntityId = E1.extractEntityId,
         extractShardId = E1.extractShardId)

       // when run on first, second and third (role R1) proxy region is started
-      ClusterSharding(system).start(
+      startSharding(
+        system,
         typeName = E2.TypeKey,
         entityProps = TestActors.echoActorProps,
         // nodes 4,5: role R2, shard region E2, proxy region E1
-        settings = ClusterShardingSettings(system).withRole("R2"),
+        settings = settings.withRole("R2"),
         extractEntityId = E2.extractEntityId,
         extractShardId = E2.extractShardId)
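The settings value used here is the lazy val added to MultiNodeClusterShardingSpec later in this diff, so for this spec (rememberEntities defaults to off) the two expressions below configure the region identically; the helper just removes the boilerplate:

// inherited: protected lazy val settings =
//   ClusterShardingSettings(system).withRememberEntities(config.rememberEntities)
val viaHelper = settings.withRole("R1")
val spelledOut = ClusterShardingSettings(system).withRememberEntities(false).withRole("R1")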
@@ -4,53 +4,25 @@
 package akka.cluster.sharding

-import scala.concurrent.duration._
 import akka.actor._
-import akka.cluster.Cluster
-import akka.cluster.MultiNodeClusterSpec
 import akka.remote.testconductor.RoleName
-import akka.remote.testkit.MultiNodeConfig
-import akka.remote.testkit.MultiNodeSpec
-import akka.remote.testkit.STMultiNodeSpec
 import akka.remote.transport.ThrottlerTransportAdapter.Direction
 import akka.testkit._
-import com.typesafe.config.ConfigFactory
+import scala.concurrent.duration._

 /**
  * one-to-one mapping between shards and entities is not efficient but some use that anyway
  */
-object ClusterShardingSingleShardPerEntitySpec {
-  class Entity extends Actor {
-    def receive = {
-      case id: Int => sender() ! id
-    }
-  }
-
-  val extractEntityId: ShardRegion.ExtractEntityId = {
-    case id: Int => (id.toString, id)
-  }
-
-  val extractShardId: ShardRegion.ExtractShardId = {
-    case id: Int => id.toString
-  }
-}
-
-object ClusterShardingSingleShardPerEntitySpecConfig extends MultiNodeConfig {
+object ClusterShardingSingleShardPerEntitySpecConfig
+    extends MultiNodeClusterShardingConfig(additionalConfig = "akka.cluster.sharding.updating-state-timeout = 1s") {
   val first = role("first")
   val second = role("second")
   val third = role("third")
   val fourth = role("fourth")
   val fifth = role("fifth")

-  commonConfig(ConfigFactory.parseString(s"""
-    akka.loglevel = INFO
-    akka.actor.provider = "cluster"
-    akka.cluster.sharding.state-store-mode = ddata
-    akka.cluster.sharding.updating-state-timeout = 1s
-    """).withFallback(MultiNodeClusterSpec.clusterConfig))

   testTransport(on = true)
 }

@@ -61,29 +33,21 @@ class ClusterShardingSingleShardPerEntitySpecMultiJvmNode4 extends ClusterShardi
 class ClusterShardingSingleShardPerEntitySpecMultiJvmNode5 extends ClusterShardingSingleShardPerEntitySpec

 abstract class ClusterShardingSingleShardPerEntitySpec
-    extends MultiNodeSpec(ClusterShardingSingleShardPerEntitySpecConfig)
-    with STMultiNodeSpec
+    extends MultiNodeClusterShardingSpec(ClusterShardingSingleShardPerEntitySpecConfig)
     with ImplicitSender {
-  import ClusterShardingSingleShardPerEntitySpec._
   import ClusterShardingSingleShardPerEntitySpecConfig._
+  import MultiNodeClusterShardingSpec.ShardedEntity

-  override def initialParticipants = roles.size

   def join(from: RoleName, to: RoleName): Unit = {
-    runOn(from) {
-      Cluster(system).join(node(to).address)
-      startSharding()
-    }
-    enterBarrier(from.name + "-joined")
-  }
-
-  def startSharding(): Unit = {
-    ClusterSharding(system).start(
-      typeName = "Entity",
-      entityProps = Props[Entity],
-      settings = ClusterShardingSettings(system),
-      extractEntityId = extractEntityId,
-      extractShardId = extractShardId)
+    join(
+      from,
+      to,
+      startSharding(
+        system,
+        typeName = "Entity",
+        entityProps = Props[ShardedEntity],
+        extractEntityId = MultiNodeClusterShardingSpec.intExtractEntityId,
+        extractShardId = MultiNodeClusterShardingSpec.intExtractShardId))
   }

   lazy val region = ClusterSharding(system).shardRegion("Entity")
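The three-argument join(from, to, onJoinedRunOnFrom) used here folds the old runOn/startSharding/enterBarrier sequence into the base class. In terms of the old primitives it behaves roughly like the sketch below (the joinAndRun name is hypothetical; the real helper in MultiNodeClusterShardingSpec may additionally validate membership before the barrier):

import akka.cluster.Cluster
import akka.remote.testconductor.RoleName

def joinAndRun(from: RoleName, to: RoleName)(onJoinedRunOnFrom: => Unit): Unit = {
  runOn(from) {
    Cluster(system).join(node(to).address)
    onJoinedRunOnFrom // e.g. start sharding on the freshly joined node
  }
  enterBarrier(from.name + "-joined")
}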
@@ -4,33 +4,22 @@
 package akka.cluster.sharding

+import akka.actor._
+import akka.cluster.Cluster
 import akka.cluster.ddata.{ Replicator, ReplicatorSettings }
 import akka.cluster.sharding.ShardCoordinator.Internal.{ HandOff, ShardStopped }
-import akka.cluster.sharding.ShardRegion.Passivate
-import akka.cluster.sharding.ShardRegion.GetCurrentRegions
-import akka.cluster.sharding.ShardRegion.CurrentRegions
-import language.postfixOps
-import scala.concurrent.duration._
-import com.typesafe.config.ConfigFactory
-import akka.actor._
-import akka.cluster.{ Cluster, MultiNodeClusterSpec }
-import akka.persistence.PersistentActor
-import akka.persistence.Persistence
-import akka.persistence.journal.leveldb.SharedLeveldbJournal
-import akka.persistence.journal.leveldb.SharedLeveldbStore
-import akka.remote.testconductor.RoleName
-import akka.remote.testkit.MultiNodeConfig
-import akka.remote.testkit.MultiNodeSpec
-import akka.remote.testkit.STMultiNodeSpec
-import akka.testkit._
-import akka.testkit.TestEvent.Mute
-import java.io.File
-import org.apache.commons.io.FileUtils
-import akka.cluster.singleton.ClusterSingletonManager
-import akka.cluster.singleton.ClusterSingletonManagerSettings
+import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GetCurrentRegions, Passivate }
+import akka.cluster.singleton.{ ClusterSingletonManager, ClusterSingletonManagerSettings }
 import akka.pattern.BackoffOpts
+import akka.persistence.{ Persistence, PersistentActor }
+import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
+import akka.remote.testconductor.RoleName
+import akka.testkit.TestEvent.Mute
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+
+import scala.concurrent.duration._
+import scala.language.postfixOps

 object ClusterShardingSpec {
   //#counter-actor

@@ -118,8 +107,8 @@ object ClusterShardingSpec {
 }

-abstract class ClusterShardingSpecConfig(val mode: String, val entityRecoveryStrategy: String = "all")
-    extends MultiNodeConfig {
+abstract class ClusterShardingSpecConfig(mode: String, val entityRecoveryStrategy: String = "all")
+    extends MultiNodeClusterShardingConfig(mode) {

   val controller = role("controller")
   val first = role("first")

@@ -129,30 +118,22 @@ abstract class ClusterShardingSpecConfig(val mode: String, val entityRecoveryStr
   val fifth = role("fifth")
   val sixth = role("sixth")

+  /** This is the only test that creates the shared store regardless of mode,
+   * because it uses a PersistentActor. So unlike all other uses of
+   * `MultiNodeClusterShardingConfig`, we use `MultiNodeConfig.commonConfig` here,
+   * and call `MultiNodeClusterShardingConfig.persistenceConfig` which does not check
+   * mode, then leverage the common config and fallbacks after these specific test configs:
+   */
-  commonConfig(
-    ConfigFactory
-      .parseString(s"""
-        akka.loglevel = INFO
-        akka.actor.provider = "cluster"
-        akka.remote.log-remote-lifecycle-events = off
-        akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
-        akka.cluster.testkit.auto-down-unreachable-after = 0s
+  commonConfig(ConfigFactory.parseString(s"""
     akka.cluster.roles = ["backend"]
     akka.cluster.distributed-data.gossip-interval = 1s
-    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
-    akka.persistence.journal.leveldb-shared.store {
-      native = off
-      dir = "target/ClusterShardingSpec/journal"
-    }
-    akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-    akka.persistence.snapshot-store.local.dir = "target/ClusterShardingSpec/snapshots"
+    akka.persistence.journal.leveldb-shared.timeout = 10s #the original default, base test uses 5s
     akka.cluster.sharding {
       retry-interval = 1 s
       handoff-timeout = 10 s
       shard-start-timeout = 5s
       entity-restart-backoff = 1s
       rebalance-interval = 2 s
-      state-store-mode = "$mode"
       entity-recovery-strategy = "$entityRecoveryStrategy"
       entity-recovery-constant-rate-strategy {
         frequency = 1 ms

@@ -162,10 +143,6 @@ abstract class ClusterShardingSpecConfig(val mode: String, val entityRecoveryStr
         rebalance-threshold = 1
         max-simultaneous-rebalance = 1
       }
-      distributed-data.durable.lmdb {
-        dir = target/ClusterShardingSpec/sharding-ddata
-        map-size = 10 MiB
-      }
     }
     akka.testconductor.barrier-timeout = 70s

@@ -182,9 +159,8 @@ abstract class ClusterShardingSpecConfig(val mode: String, val entityRecoveryStr
     }
-    """)
-      .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest)
-      .withFallback(MultiNodeClusterSpec.clusterConfig))
+    """).withFallback(MultiNodeClusterShardingConfig.persistenceConfig(targetDir)).withFallback(common))

   nodeConfig(sixth) {
     ConfigFactory.parseString("""akka.cluster.roles = ["frontend"]""")
   }

@@ -226,11 +202,13 @@ object ClusterShardingDocCode {
 }

-object PersistentClusterShardingSpecConfig extends ClusterShardingSpecConfig("persistence")
-object DDataClusterShardingSpecConfig extends ClusterShardingSpecConfig("ddata")
+object PersistentClusterShardingSpecConfig
+    extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModePersistence)
+object DDataClusterShardingSpecConfig extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModeDData)
 object PersistentClusterShardingWithEntityRecoverySpecConfig
-    extends ClusterShardingSpecConfig("persistence", "constant")
-object DDataClusterShardingWithEntityRecoverySpecConfig extends ClusterShardingSpecConfig("ddata", "constant")
+    extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModePersistence, "constant")
+object DDataClusterShardingWithEntityRecoverySpecConfig
+    extends ClusterShardingSpecConfig(ClusterShardingSettings.StateStoreModeDData, "constant")

 class PersistentClusterShardingSpec extends ClusterShardingSpec(PersistentClusterShardingSpecConfig)
 class DDataClusterShardingSpec extends ClusterShardingSpec(DDataClusterShardingSpecConfig)

@@ -271,32 +249,14 @@ class DDataClusterShardingWithEntityRecoveryMultiJvmNode5 extends DDataClusterSh
 class DDataClusterShardingWithEntityRecoveryMultiJvmNode6 extends DDataClusterShardingWithEntityRecoverySpec
 class DDataClusterShardingWithEntityRecoveryMultiJvmNode7 extends DDataClusterShardingWithEntityRecoverySpec

-abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig)
-    extends MultiNodeSpec(config)
-    with MultiNodeClusterSpec
-    with STMultiNodeSpec
+abstract class ClusterShardingSpec(multiNodeConfig: ClusterShardingSpecConfig)
+    extends MultiNodeClusterShardingSpec(multiNodeConfig)
     with ImplicitSender {
   import ClusterShardingSpec._
-  import config._
+  import multiNodeConfig._

-  val storageLocations = List(
-    new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)
-
-  override protected def atStartup(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-    enterBarrier("startup")
-  }
-
-  override protected def afterTermination(): Unit = {
-    storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir))
-  }
-
   def join(from: RoleName, to: RoleName): Unit = {
-    runOn(from) {
-      Cluster(system).join(node(to).address)
-      createCoordinator()
-    }
-    enterBarrier(from.name + "-joined")
+    join(from, to, createCoordinator())
   }

   lazy val replicator = system.actorOf(

@@ -381,8 +341,6 @@ abstract class ClusterShardingSpec(config: ClusterShardingSpecConfig)
   lazy val rebalancingPersistentRegion = createRegion("RebalancingRememberCounter", rememberEntities = true)
   lazy val autoMigrateRegion = createRegion("AutoMigrateRememberRegionTest", rememberEntities = true)

-  def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData
-
   s"Cluster sharding ($mode)" must {

     // must be done also in ddata mode since Counter is PersistentActor
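As the new comment explains, this spec must wire the shared journal even in ddata mode because its Counter entity is a PersistentActor. Any config needing the same escape hatch can follow the pattern shown here: bypass the mode-checked persistenceConfig member and splice the object-level helper in directly. A sketch with a hypothetical config name, mirroring the lines above:

import com.typesafe.config.ConfigFactory

object AlwaysSharedJournalSpecConfig
    extends MultiNodeClusterShardingConfig(ClusterShardingSettings.StateStoreModeDData) {
  // targetDir and common are inherited from MultiNodeClusterShardingConfig;
  // persistenceConfig(targetDir) is applied unconditionally, regardless of mode
  commonConfig(ConfigFactory.parseString("""
      akka.cluster.sharding.rebalance-interval = 2 s
      """).withFallback(MultiNodeClusterShardingConfig.persistenceConfig(targetDir)).withFallback(common))
}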
@@ -4,42 +4,24 @@
 package akka.cluster.sharding

-import akka.actor.Actor
-import akka.actor.ActorLogging
-import akka.actor.Address
-import akka.actor.PoisonPill
-import akka.actor.Props
+import akka.actor.{ Actor, ActorLogging, Address, Props }
 import akka.cluster.Cluster
-import akka.cluster.MultiNodeClusterSpec
-import akka.cluster.sharding.ExternalShardAllocationSpec.GiveMeYourHome.Get
-import akka.cluster.sharding.ExternalShardAllocationSpec.GiveMeYourHome.Home
-import akka.cluster.sharding.external.ExternalShardAllocation
-import akka.cluster.sharding.external.ExternalShardAllocationStrategy
-import akka.remote.testkit.MultiNodeConfig
-import akka.remote.testkit.MultiNodeSpec
+import akka.cluster.sharding.ExternalShardAllocationSpec.GiveMeYourHome.{ Get, Home }
+import akka.cluster.sharding.external.{ ExternalShardAllocation, ExternalShardAllocationStrategy }
 import akka.serialization.jackson.CborSerializable
-import akka.testkit.ImplicitSender
-import akka.testkit.TestProbe
-import com.typesafe.config.ConfigFactory
+import akka.testkit.{ ImplicitSender, TestProbe }
 import org.scalatest.concurrent.ScalaFutures

 import scala.concurrent.duration._

-object ExternalShardAllocationSpecConfig extends MultiNodeConfig {
-
-  commonConfig(ConfigFactory.parseString("""
-    akka.loglevel = INFO
-    akka.actor.provider = "cluster"
+object ExternalShardAllocationSpecConfig
+    extends MultiNodeClusterShardingConfig(additionalConfig = """
     akka.cluster.sharding {
-      distributed-data.durable.lmdb {
-        dir = target/ExternalShardAllocationSpec/sharding-ddata
-        map-size = 10 MiB
-      }
       retry-interval = 2000ms
       waiting-for-state-timeout = 2000ms
       rebalance-interval = 1s
     }
-    """).withFallback(MultiNodeClusterSpec.clusterConfig))
+    """) {

   val first = role("first")
   val second = role("second")

@@ -82,14 +64,13 @@ object ExternalShardAllocationSpec {
 }

 abstract class ExternalShardAllocationSpec
-    extends MultiNodeSpec(ExternalShardAllocationSpecConfig)
-    with MultiNodeClusterSpec
+    extends MultiNodeClusterShardingSpec(ExternalShardAllocationSpecConfig)
     with ImplicitSender
     with ScalaFutures {

-  import ExternalShardAllocationSpecConfig._
-  import ExternalShardAllocationSpec._
   import ExternalShardAllocationSpec.GiveMeYourHome._
+  import ExternalShardAllocationSpec._
+  import ExternalShardAllocationSpecConfig._

   override implicit val patienceConfig: PatienceConfig = PatienceConfig(5.second)

@@ -102,16 +83,13 @@ abstract class ExternalShardAllocationSpec
     enterBarrier("cluster-started")
   }

-  lazy val shardRegion = {
-    ClusterSharding(system).start(
-      typeName = typeName,
-      entityProps = Props[GiveMeYourHome],
-      settings = ClusterShardingSettings(system),
-      extractEntityId = extractEntityId,
-      extractShardId = extractShardId,
-      new ExternalShardAllocationStrategy(system, typeName),
-      PoisonPill)
-  }
+  lazy val shardRegion = startSharding(
+    system,
+    typeName = typeName,
+    entityProps = Props[GiveMeYourHome],
+    extractEntityId = extractEntityId,
+    extractShardId = extractShardId,
+    allocationStrategy = new ExternalShardAllocationStrategy(system, typeName))

   "start cluster sharding" in {
     shardRegion
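For reference, shard locations for the ExternalShardAllocationStrategy are fed in through the ExternalShardAllocation extension. A test (or application) pins a shard to a node roughly like this; the shard id is a placeholder and node(second).address is just an example target:

import akka.Done
import scala.concurrent.Future

val client = ExternalShardAllocation(system).clientFor(typeName)
// route shard "some-shard" to the chosen node; the Future completes
// once the updated locations have been written to distributed data
val done: Future[Done] = client.updateShardLocation("some-shard", node(second).address)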
@@ -4,23 +4,16 @@
 package akka.cluster.sharding

-import scala.concurrent.duration._
-
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.Address
-import akka.actor.Props
-import akka.cluster.{ Cluster, MemberStatus, MultiNodeClusterSpec }
-import akka.cluster.sharding.ShardRegion.CurrentRegions
-import akka.cluster.sharding.ShardRegion.GetCurrentRegions
+import akka.actor.{ Actor, ActorRef, Address, Props }
+import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GetCurrentRegions }
+import akka.cluster.{ Cluster, MemberStatus }
 import akka.remote.testconductor.RoleName
-import akka.remote.testkit.MultiNodeConfig
-import akka.remote.testkit.MultiNodeSpec
-import akka.remote.testkit.STMultiNodeSpec
 import akka.serialization.jackson.CborSerializable
 import akka.testkit._
-import com.typesafe.config.ConfigFactory
 import akka.util.ccompat._
+import com.typesafe.config.ConfigFactory
+
+import scala.concurrent.duration._

 @ccompatUsedUntil213
 object MultiDcClusterShardingSpec {

@@ -50,26 +43,23 @@ object MultiDcClusterShardingSpec {
   }
 }

-object MultiDcClusterShardingSpecConfig extends MultiNodeConfig {
+object MultiDcClusterShardingSpecConfig
+    extends MultiNodeClusterShardingConfig(
+      loglevel = "DEBUG", //issue #23741
+      additionalConfig = s"""
+        akka.cluster {
+          debug.verbose-heartbeat-logging = on
+          debug.verbose-gossip-logging = on
+          sharding.retry-interval = 200ms
+        }
+        akka.remote.log-remote-lifecycle-events = on
+        """) {
   val first = role("first")
   val second = role("second")
   val third = role("third")
   val fourth = role("fourth")

-  commonConfig(ConfigFactory.parseString(s"""
-    akka.loglevel = DEBUG # issue #23741
-    akka.cluster {
-      debug.verbose-heartbeat-logging = on
-      debug.verbose-gossip-logging = on
-      downing-provider-class = akka.cluster.testkit.AutoDowning
-      testkit.auto-down-unreachable-after = 0s
-      sharding {
-        retry-interval = 200ms
-      }
-    }
-    akka.remote.log-remote-lifecycle-events = on
-    """).withFallback(MultiNodeClusterSpec.clusterConfig))

   nodeConfig(first, second) {
     ConfigFactory.parseString("akka.cluster.multi-data-center.self-data-center = DC1")
   }

@@ -85,16 +75,15 @@ class MultiDcClusterShardingSpecMultiJvmNode3 extends MultiDcClusterShardingSpec
 class MultiDcClusterShardingSpecMultiJvmNode4 extends MultiDcClusterShardingSpec

 abstract class MultiDcClusterShardingSpec
-    extends MultiNodeSpec(MultiDcClusterShardingSpecConfig)
-    with MultiNodeClusterSpec
-    with STMultiNodeSpec
+    extends MultiNodeClusterShardingSpec(MultiDcClusterShardingSpecConfig)
     with ImplicitSender {
   import MultiDcClusterShardingSpec._
   import MultiDcClusterShardingSpecConfig._

   def join(from: RoleName, to: RoleName): Unit = {
-    runOn(from) {
-      cluster.join(node(to).address)
+    join(
+      from,
+      to, {
         startSharding()
         withClue(
           s"Failed waiting for ${cluster.selfUniqueAddress} to be up. Current state: ${cluster.state}" + cluster.state) {

@@ -104,15 +93,14 @@ abstract class MultiDcClusterShardingSpec
           } should be(true))
         }
       }
-    }
-    enterBarrier(from.name + "-joined")
+      })
   }

   def startSharding(): Unit = {
-    ClusterSharding(system).start(
+    startSharding(
+      system,
       typeName = "Entity",
       entityProps = Props[Entity](),
-      settings = ClusterShardingSettings(system),
       extractEntityId = extractEntityId,
       extractShardId = extractShardId)
   }
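GetCurrentRegions (imported above) gives a quick per-data-center visibility check: in a multi-DC cluster the coordinator is per data center, so the reply should list only the regions of the queried node's DC. A sketch of such an assertion, using the "Entity" type name started above:

// ask a region for the regions known in this node's data center
ClusterSharding(system).shardRegion("Entity") ! GetCurrentRegions
val regions = expectMsgType[CurrentRegions].regions
assert(regions.nonEmpty)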
@@ -12,6 +12,7 @@ import akka.remote.testkit.MultiNodeConfig
 import com.typesafe.config.{ Config, ConfigFactory }

 object MultiNodeClusterShardingConfig {
+
   private[sharding] def testNameFromCallStack(classToStartFrom: Class[_]): String = {

     def isAbstractClass(className: String): Boolean = {

@@ -53,6 +54,21 @@ object MultiNodeClusterShardingConfig {
       .replaceAll("""\$\$?\w+""", "") // drop scala anonymous functions/classes
       .replaceAll("[^a-zA-Z_0-9]", "_")
   }

+  def persistenceConfig(targetDir: String): Config =
+    ConfigFactory.parseString(s"""
+      akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
+      akka.persistence.journal.leveldb-shared {
+        timeout = 5s
+        store {
+          native = off
+          dir = "$targetDir/journal"
+        }
+      }
+      akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
+      akka.persistence.snapshot-store.local.dir = "$targetDir/snapshots"
+      """)
+
 }

 /**
@@ -63,48 +79,42 @@ object MultiNodeClusterShardingConfig {
  *
  * @param mode the state store mode
  * @param rememberEntities defaults to off
- * @param overrides additional config
+ * @param additionalConfig additional config
  * @param loglevel defaults to INFO
  */
 abstract class MultiNodeClusterShardingConfig(
     val mode: String = ClusterShardingSettings.StateStoreModeDData,
     val rememberEntities: Boolean = false,
-    overrides: Config = ConfigFactory.empty,
+    additionalConfig: String = "",
     loglevel: String = "INFO")
     extends MultiNodeConfig {

   import MultiNodeClusterShardingConfig._

   val targetDir =
-    s"target/ClusterSharding${testNameFromCallStack(classOf[MultiNodeClusterShardingConfig])}Spec-$mode-remember-$rememberEntities"
+    s"target/ClusterSharding${testNameFromCallStack(classOf[MultiNodeClusterShardingConfig]).replace("Config", "").replace("_", "")}"

-  val modeConfig =
+  val persistenceConfig: Config =
     if (mode == ClusterShardingSettings.StateStoreModeDData) ConfigFactory.empty
-    else ConfigFactory.parseString(s"""
-      akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
-      akka.persistence.journal.leveldb-shared.timeout = 5s
-      akka.persistence.journal.leveldb-shared.store.native = off
-      akka.persistence.journal.leveldb-shared.store.dir = "$targetDir/journal"
-      akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
-      akka.persistence.snapshot-store.local.dir = "$targetDir/snapshots"
-      """)
+    else MultiNodeClusterShardingConfig.persistenceConfig(targetDir)

-  commonConfig(
-    overrides
-      .withFallback(modeConfig)
-      .withFallback(ConfigFactory.parseString(s"""
-        akka.loglevel = $loglevel
+  val common: Config =
+    ConfigFactory
+      .parseString(s"""
         akka.actor.provider = "cluster"
         akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
         akka.cluster.testkit.auto-down-unreachable-after = 0s
-        akka.remote.log-remote-lifecycle-events = off
         akka.cluster.sharding.state-store-mode = "$mode"
         akka.cluster.sharding.distributed-data.durable.lmdb {
           dir = $targetDir/sharding-ddata
           map-size = 10 MiB
         }
-        """))
+        akka.loglevel = $loglevel
+        akka.remote.log-remote-lifecycle-events = off
+        """)
       .withFallback(SharedLeveldbJournal.configToEnableJavaSerializationForTest)
-      .withFallback(MultiNodeClusterSpec.clusterConfig))
+      .withFallback(MultiNodeClusterSpec.clusterConfig)
+
+  commonConfig(ConfigFactory.parseString(additionalConfig).withFallback(persistenceConfig).withFallback(common))
 }
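With the constructor change, subclasses now pass plain HOCON text instead of a Config object, and it is layered via withFallback ahead of the persistence and common blocks, so any setting in additionalConfig wins over the defaults. A minimal concrete config under the new signature, with an illustrative name and values:

object MyShardingSpecConfig
    extends MultiNodeClusterShardingConfig(
      mode = ClusterShardingSettings.StateStoreModePersistence,
      rememberEntities = true,
      additionalConfig = "akka.cluster.sharding.rebalance-interval = 1 s") {
  val first = role("first")
  val second = role("second")
}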
@ -6,38 +6,66 @@ package akka.cluster.sharding
import java.io.File

import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, ActorSystem, Identify, PoisonPill, Props }
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.persistence.Persistence
import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeSpec
import akka.serialization.jackson.CborSerializable
import akka.testkit.{ TestActors, TestProbe }
import akka.util.ccompat._
import org.apache.commons.io.FileUtils

import scala.concurrent.duration._

@ccompatUsedUntil213
object MultiNodeClusterShardingSpec {

  object EntityActor {
    final case class Started(ref: ActorRef)
  }

  class EntityActor(probe: ActorRef) extends Actor {
    probe ! EntityActor.Started(self)

    def receive: Receive = {
      case m => sender() ! m
    }
  }

  object PingPongActor {
    case object Stop extends CborSerializable
    case class Ping(id: Long) extends CborSerializable
    case object Pong extends CborSerializable
  }

  class PingPongActor extends Actor with ActorLogging {
    import PingPongActor._
    log.info("entity started {}", self.path)

    def receive: Receive = {
      case Stop    => context.stop(self)
      case _: Ping => sender() ! Pong
    }
  }

  object ShardedEntity {
    case object Stop
  }

  class ShardedEntity extends Actor {
    def receive: Receive = {
      case id: Int => sender() ! id
      case ShardedEntity.Stop =>
        context.stop(self)
    }
  }

  val intExtractEntityId: ShardRegion.ExtractEntityId = {
    case id: Int => (id.toString, id)
  }

  val intExtractShardId: ShardRegion.ExtractShardId = msg =>
    msg match {
      case id: Int                     => id.toString
      case ShardRegion.StartEntity(id) => id
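  // For orientation (sketch only, not part of the diff): both Int-based extractors
  // key by the integer's string form, so intExtractEntityId(42) yields ("42", 42)
  // and intExtractShardId(42) yields "42" - each distinct Int maps to its own shard.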
@@ -58,7 +86,12 @@ abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterSharding
  override def initialParticipants: Int = roles.size

  protected lazy val settings = ClusterShardingSettings(system).withRememberEntities(config.rememberEntities)

  private lazy val defaultShardAllocationStrategy =
    ClusterSharding(system).defaultShardAllocationStrategy(settings)

  protected lazy val storageLocations = List(
    new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile)

  override protected def atStartup(): Unit = {
@@ -72,44 +105,92 @@ abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterSharding
    super.afterTermination()
  }

  /**
   * Joins `from` to `to` and optionally runs follow-up work on the joining node.
   *
   * @param from the node the `Cluster.join` is `runOn`
   * @param to the node to join to
   * @param onJoinedRunOnFrom optionally execute a function after the join validation
   *                          succeeds, e.g. start sharding or create a coordinator
   * @param assertNodeUp if false, the joining member's `MemberStatus.Up` and similar
   *                     assertions are skipped, which gives tests that do no assertions
   *                     (e.g. ClusterShardingMinMembersSpec), or that assert after
   *                     `onJoinedRunOnFrom`, more flexibility. Defaults to true.
   */
  protected def join(
      from: RoleName,
      to: RoleName,
      onJoinedRunOnFrom: => Unit = (),
      assertNodeUp: Boolean = true,
      max: FiniteDuration = 20.seconds): Unit = {
    runOn(from) {
      cluster.join(node(to).address)
      if (assertNodeUp) {
        within(max) {
          awaitAssert {
            cluster.state.isMemberUp(node(from).address) should ===(true)
          }
        }
      }
      onJoinedRunOnFrom
    }
    enterBarrier(from.name + "-joined")
  }
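  // Usage sketch (illustrative, not from this commit): join `second` to `first`
  // and start sharding on the joined node in one step:
  //   join(second, first, onJoinedRunOnFrom = startSharding(system, typeName = "Entity"))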
  protected def startSharding(
      sys: ActorSystem,
      typeName: String,
      entityProps: Props = TestActors.echoActorProps,
      settings: ClusterShardingSettings = settings,
      extractEntityId: ShardRegion.ExtractEntityId = intExtractEntityId,
      extractShardId: ShardRegion.ExtractShardId = intExtractShardId,
      allocationStrategy: ShardAllocationStrategy = defaultShardAllocationStrategy,
      handOffStopMessage: Any = PoisonPill): ActorRef = {
    ClusterSharding(sys).start(
      typeName,
      entityProps,
      settings,
      extractEntityId,
      extractShardId,
      allocationStrategy,
      handOffStopMessage)
  }
  protected def startProxy(
      sys: ActorSystem,
      typeName: String,
      role: Option[String],
      extractEntityId: ShardRegion.ExtractEntityId,
      extractShardId: ShardRegion.ExtractShardId): ActorRef = {
    ClusterSharding(sys).startProxy(typeName, role, extractEntityId, extractShardId)
  }
  protected def isDdataMode: Boolean = mode == ClusterShardingSettings.StateStoreModeDData

  protected def setStoreIfNotDdataMode(sys: ActorSystem, storeOn: RoleName): Unit =
    if (!isDdataMode) setStore(sys, storeOn)

  protected def setStore(sys: ActorSystem, storeOn: RoleName): Unit = {
    val probe = TestProbe()(sys)
    sys.actorSelection(node(storeOn) / "user" / "store").tell(Identify(None), probe.ref)
    val sharedStore = probe.expectMsgType[ActorIdentity](20.seconds).ref.get
    SharedLeveldbJournal.setStore(sharedStore, sys)
  }
  /**
   * {{{
   *   startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second, third))
   * }}}
   *
   * @param startOn the node to start the `SharedLeveldbStore` store on
   * @param setStoreOn the nodes to `SharedLeveldbJournal.setStore` on
   */
  protected def startPersistenceIfNotDdataMode(startOn: RoleName, setStoreOn: Seq[RoleName]): Unit =
    if (!isDdataMode) startPersistence(startOn, setStoreOn)
  /**
   * {{{
   *   startPersistence(startOn = first, setStoreOn = Seq(first, second, third))
   * }}}
   *
   * @param startOn the node to start the `SharedLeveldbStore` store on
   * @param setStoreOn the nodes to `SharedLeveldbJournal.setStore` on
   */
  protected def startPersistence(startOn: RoleName, setStoreOn: Seq[RoleName]): Unit = {
    info("Setting up a shared journal.")
    Persistence(system)
    runOn(startOn) {
      system.actorOf(Props[SharedLeveldbStore], "store")
    }
    enterBarrier("persistence-started")

    runOn(setStoreOn: _*) {
      setStore(system, startOn)
    }

    enterBarrier(s"after-${startOn.name}")
  }
}
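Taken together, these helpers shrink a typical multi-node test body to a few calls. A minimal sketch under stated assumptions - `first`/`second` roles from a config like the one sketched earlier, the default echo entity, and a hypothetical type name:

    // Illustrative flow only - names and barriers are not from this commit.
    "ClusterSharding" must {
      "start a region on each node and route messages" in {
        // no-op in ddata mode; otherwise starts and wires the shared LevelDB journal
        startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second))

        join(first, first, onJoinedRunOnFrom = startSharding(system, typeName = "Entity"))
        join(second, first, onJoinedRunOnFrom = startSharding(system, typeName = "Entity"))

        val region = ClusterSharding(system).shardRegion("Entity")
        val probe = TestProbe()
        region.tell(1, probe.ref)
        probe.expectMsg(1) // the default echo entity replies with the message itself
        enterBarrier("after-test")
      }
    }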


@@ -98,7 +98,7 @@ object MultiNodeClusterSpec
trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner {
  self: MultiNodeSpec =>

  override def initialParticipants: Int = roles.size

  private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]