2018-10-29 17:19:37 +08:00
|
|
|
/*
|
2020-01-02 07:24:59 -05:00
|
|
|
* Copyright (C) 2015-2020 Lightbend Inc. <https://www.lightbend.com>
|
2015-04-15 11:07:12 +02:00
|
|
|
*/
|
2018-04-24 16:03:55 +01:00
|
|
|
|
2015-04-27 14:48:28 +02:00
|
|
|
package akka.cluster.sharding
|
2015-04-15 11:07:12 +02:00
|
|
|
|
|
|
|
|
import scala.concurrent.duration._
|
Disable Java serialization by default, #22333 (#27285)
* akka.actor.allow-java-serialization = off
* Moved primitive (Long, Int, String, ByteString) serializers
from akka-remote to akka-actor since they had no dependency
and are useful also in local systems, e.g. persistence.
* e.g. needed for persistence-tck
* less allow-java-serialization=on in tests
* CborSerializable in Jackson/test module for ease of use
* JavaSerializable for Java serialization in tests, already in akka-testkit,
but misconfigured
* Made tests pass
* allow-java-serialization=on in akka-persistence
* allow-java-serialization=on in classic remoting tests
* JavaSerializable and CborSerializable in other remoting tests
* Added serialization for
* Boolean
* java.util.concurrent.TimeoutException, AskTimeoutException
* support for testing serialization with the inmem journal
* utility to verifySerialization, in SerializationTestKit
* remove AccountExampleWithCommandHandlersInState because not possible to serialize State when it's not static
* Effect() is factory in EventSourcedBehavior class
* test the account examples
* SharedLeveldbJournal.configToEnableJavaSerializationForTest
* support for exceptions from remote deployed child actors
* fallback to akka.remote.serialization.ThrowableNotSerializableException
if exception is not serializable when wrapped in system messages from
remote deployed child actors and Status.Failure messages
* it's implemented in `WrappedPayloadSupport.payloadBuilder`
* update reference documentation
* serialize-messages=off in most places, separate ticket for
improving or removing that feature
* migration guide, including description of rolling update
* fix 2.13 compiler error
* minor review feedback
2019-07-11 14:04:24 +02:00
|
|
|
|
2020-02-27 12:05:55 -08:00
|
|
|
import akka.actor.{ Actor, ActorRef, Props }
|
Disable Java serialization by default, #22333 (#27285)
* akka.actor.allow-java-serialization = off
* Moved primitive (Long, Int, String, ByteString) serializers
from akka-remote to akka-actor since they had no dependency
and are useful also in local systems, e.g. persistence.
* e.g. needed for persistence-tck
* less allow-java-serialization=on in tests
* CborSerializable in Jackson/test module for ease of use
* JavaSerializable for Java serialization in tests, already in akka-testkit,
but misconfigured
* Made tests pass
* allow-java-serialization=on in akka-persistence
* allow-java-serialization=on in classic remoting tests
* JavaSerializable and CborSerializable in other remoting tests
* Added serialization for
* Boolean
* java.util.concurrent.TimeoutException, AskTimeoutException
* support for testing serialization with the inmem journal
* utility to verifySerialization, in SerializationTestKit
* remove AccountExampleWithCommandHandlersInState because not possible to serialize State when it's not static
* Effect() is factory in EventSourcedBehavior class
* test the account examples
* SharedLeveldbJournal.configToEnableJavaSerializationForTest
* support for exceptions from remote deployed child actors
* fallback to akka.remote.serialization.ThrowableNotSerializableException
if exception is not serializable when wrapped in system messages from
remote deployed child actors and Status.Failure messages
* it's implemented in `WrappedPayloadSupport.payloadBuilder`
* update reference documentation
* serialize-messages=off in most places, separate ticket for
improving or removing that feature
* migration guide, including description of rolling update
* fix 2.13 compiler error
* minor review feedback
2019-07-11 14:04:24 +02:00
|
|
|
import akka.serialization.jackson.CborSerializable
|
2015-04-15 11:07:12 +02:00
|
|
|
import akka.testkit._
|
|
|
|
|
|
2015-08-20 13:24:39 +03:00
|
|
|
/**
 * Messages, actors and shard extractor functions used by the
 * leaving-member cluster sharding specs.
 */
object ClusterShardingLeavingSpec {

  /** Message routed to the sharded entity identified by `id`. */
  case class Ping(id: String) extends CborSerializable

  /** Sharded entity that answers every [[Ping]] with its own `ActorRef`. */
  class Entity extends Actor {
    def receive: Receive = {
      case _: Ping => sender() ! self
    }
  }

  /** Request sent to [[ShardLocations]] to obtain the stored [[Locations]]. */
  case object GetLocations extends CborSerializable

  /** Snapshot associating an entity id with the `ActorRef` that answered for it. */
  case class Locations(locations: Map[String, ActorRef]) extends CborSerializable

  /**
   * Keeps the most recently received [[Locations]] and hands it back to
   * whoever sends [[GetLocations]].
   */
  class ShardLocations extends Actor {
    // Stays at its default (null) until the first Locations message arrives.
    var locations: Locations = _

    def receive: Receive = {
      case snapshot: Locations => locations = snapshot
      case GetLocations        => sender() ! locations
    }
  }

  /** Entity id is the `id` carried by the [[Ping]]; the message is forwarded unchanged. */
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case ping: Ping => (ping.id, ping)
  }

  /** Shard id is the first character of the [[Ping]] id. */
  val extractShardId: ShardRegion.ExtractShardId = {
    case Ping(id: String) => String.valueOf(id.charAt(0))
  }
}
|
2015-04-15 11:07:12 +02:00
|
|
|
|
2020-02-27 12:05:55 -08:00
|
|
|
/**
 * Multi-node configuration for the leaving-member sharding specs.
 *
 * Passes `mode` on to `MultiNodeClusterShardingConfig` and declares the four
 * roles ("first" to "fourth") referenced by the spec, in that order.
 *
 * @param mode state store mode forwarded to the parent configuration
 */
abstract class ClusterShardingLeavingSpecConfig(mode: String) extends MultiNodeClusterShardingConfig(mode) {
  val first = role("first")
  val second = role("second")
  val third = role("third")
  val fourth = role("fourth")
}
|
|
|
|
|
|
2020-02-27 12:05:55 -08:00
|
|
|
/** Configuration using `ClusterShardingSettings.StateStoreModePersistence`. */
object PersistentClusterShardingLeavingSpecConfig
    extends ClusterShardingLeavingSpecConfig(ClusterShardingSettings.StateStoreModePersistence)

/** Configuration using `ClusterShardingSettings.StateStoreModeDData`. */
object DDataClusterShardingLeavingSpecConfig
    extends ClusterShardingLeavingSpecConfig(ClusterShardingSettings.StateStoreModeDData)
|
2015-08-20 13:24:39 +03:00
|
|
|
|
2019-03-11 10:38:24 +01:00
|
|
|
/** The leaving-member spec bound to [[PersistentClusterShardingLeavingSpecConfig]]. */
class PersistentClusterShardingLeavingSpec
    extends ClusterShardingLeavingSpec(PersistentClusterShardingLeavingSpecConfig)

/** The leaving-member spec bound to [[DDataClusterShardingLeavingSpecConfig]]. */
class DDataClusterShardingLeavingSpec extends ClusterShardingLeavingSpec(DDataClusterShardingLeavingSpecConfig)
|
|
|
|
|
|
|
|
|
|
// Concrete classes for the persistence-backed spec, one per node
// (presumably discovered by the multi-JVM test runner through the
// `MultiJvmNodeN` naming convention -- confirm against the build setup).
class PersistentClusterShardingLeavingMultiJvmNode1 extends PersistentClusterShardingLeavingSpec
class PersistentClusterShardingLeavingMultiJvmNode2 extends PersistentClusterShardingLeavingSpec
class PersistentClusterShardingLeavingMultiJvmNode3 extends PersistentClusterShardingLeavingSpec
class PersistentClusterShardingLeavingMultiJvmNode4 extends PersistentClusterShardingLeavingSpec

// Concrete classes for the ddata-backed spec, one per node.
class DDataClusterShardingLeavingMultiJvmNode1 extends DDataClusterShardingLeavingSpec
class DDataClusterShardingLeavingMultiJvmNode2 extends DDataClusterShardingLeavingSpec
class DDataClusterShardingLeavingMultiJvmNode3 extends DDataClusterShardingLeavingSpec
class DDataClusterShardingLeavingMultiJvmNode4 extends DDataClusterShardingLeavingSpec
|
2015-04-15 11:07:12 +02:00
|
|
|
|
2020-02-27 12:05:55 -08:00
|
|
|
/**
 * Multi-node scenario in which the `first` node leaves the cluster while
 * sharded entities are running.
 *
 * Steps, each ending with a barrier:
 *  1. all four roles join via `first` and start sharding for type "Entity";
 *  2. `first` pings entity ids "1" to "10" and stores the replying `ActorRef`s
 *     in a `ShardLocations` actor named "shardLocations";
 *  3. `third` asks `first` to leave; the remaining nodes then verify that
 *     entities whose recorded ref lived on `first` answer from a different
 *     ref, while all other entities still answer from the recorded ref.
 *
 * @param multiNodeConfig roles and state store mode for the concrete spec
 */
abstract class ClusterShardingLeavingSpec(multiNodeConfig: ClusterShardingLeavingSpecConfig)
    extends MultiNodeClusterShardingSpec(multiNodeConfig)
    with ImplicitSender {
  import ClusterShardingLeavingSpec._
  import multiNodeConfig._

  /** Starts sharding for type "Entity" on this node's actor system. */
  def startSharding(): Unit = {
    startSharding(
      system,
      typeName = "Entity",
      // Explicit `()`: relying on auto-application of the empty-paren
      // `Props.apply[T]()` is deprecated since Scala 2.13.3 and rejected by Scala 3.
      entityProps = Props[Entity](),
      extractEntityId = extractEntityId,
      extractShardId = extractShardId)
  }

  /** Shard region for type "Entity"; lazy so it is only looked up on first use. */
  lazy val region: ActorRef = ClusterSharding(system).shardRegion("Entity")

  s"Cluster sharding ($mode) with leaving member" must {

    "join cluster" in within(20.seconds) {
      // Inherited helper -- presumably a no-op for modes that need no journal; confirm in the base spec.
      startPersistenceIfNeeded(startOn = first, setStoreOn = roles)

      join(first, first, onJoinedRunOnFrom = startSharding())
      join(second, first, onJoinedRunOnFrom = startSharding())
      join(third, first, onJoinedRunOnFrom = startSharding())
      join(fourth, first, onJoinedRunOnFrom = startSharding())

      enterBarrier("after-2")
    }

    "initialize shards" in {
      runOn(first) {
        val shardLocations = system.actorOf(Props[ShardLocations](), "shardLocations")
        // Ping each entity once and record which ActorRef answered for it.
        val locations = (for (n <- 1 to 10) yield {
          val id = n.toString
          region ! Ping(id)
          id -> expectMsgType[ActorRef]
        }).toMap
        shardLocations ! Locations(locations)
      }
      enterBarrier("after-3")
    }

    "recover after leaving coordinator node" in {
      // Every node fetches the locations that were recorded on `first`.
      system.actorSelection(node(first) / "user" / "shardLocations") ! GetLocations
      val Locations(originalLocations) = expectMsgType[Locations]
      val firstAddress = node(first).address

      runOn(third) {
        cluster.leave(firstAddress)
      }

      runOn(first) {
        // The local region is expected to terminate once `first` has left.
        watch(region)
        expectTerminated(region, 15.seconds)
      }
      enterBarrier("stopped")

      runOn(second, third, fourth) {
        within(15.seconds) {
          // Retried until it passes or the enclosing `within` deadline expires.
          awaitAssert {
            val probe = TestProbe()
            originalLocations.foreach {
              case (id, ref) =>
                region.tell(Ping(id), probe.ref)
                if (ref.path.address == firstAddress)
                  // Entity lived on the node that left: a different ref must answer.
                  probe.expectMsgType[ActorRef](1.second) should not be (ref)
                else
                  probe.expectMsg(1.second, ref) // should not move
            }
          }
        }
      }

      enterBarrier("after-4")
    }

  }
}
|