diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala index 232aa80585..39bc19c097 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala @@ -4,14 +4,12 @@ package akka.cluster.sharding -import akka.actor.{ Actor, ExtendedActorSystem, NoSerializationVerificationNeeded, PoisonPill, Props } +import akka.actor.{ Actor, ActorRef, ExtendedActorSystem, NoSerializationVerificationNeeded, PoisonPill, Props } +import akka.cluster.ClusterSettings.DataCenter import akka.cluster.sharding.ShardCoordinator.Internal.ShardStopped -import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy -import akka.cluster.sharding.ShardRegion.HandOffStopper +import akka.cluster.sharding.ShardCoordinator.LeastShardAllocationStrategy +import akka.cluster.sharding.ShardRegion.{ ExtractEntityId, ExtractShardId, HandOffStopper, Msg } import akka.testkit.{ AkkaSpec, TestProbe } -import org.mockito.ArgumentMatchers -import org.mockito.Mockito._ -import org.scalatestplus.mockito.MockitoSugar import scala.concurrent.duration._ @@ -32,10 +30,29 @@ class ClusterShardingInternalsSpec extends AkkaSpec(""" |akka.actor.provider = cluster |akka.remote.classic.netty.tcp.port = 0 |akka.remote.artery.canonical.port = 0 - |""".stripMargin) with MockitoSugar { + |""".stripMargin) { import ClusterShardingInternalsSpec._ - val clusterSharding = spy(new ClusterSharding(system.asInstanceOf[ExtendedActorSystem])) + case class StartingProxy( + typeName: String, + role: Option[String], + dataCenter: Option[DataCenter], + extractEntityId: ExtractEntityId, + extractShardId: ExtractShardId) + + val probe = TestProbe() + + val clusterSharding = new ClusterSharding(system.asInstanceOf[ExtendedActorSystem]) { + override def startProxy( + typeName: String, + role: Option[String], + dataCenter: Option[DataCenter], + extractEntityId: ExtractEntityId, + extractShardId: ExtractShardId): ActorRef = { + probe.ref ! StartingProxy(typeName, role, dataCenter, extractEntityId, extractShardId) + ActorRef.noSender + } + } "ClusterSharding" must { @@ -43,8 +60,8 @@ class ClusterShardingInternalsSpec extends AkkaSpec(""" val settingsWithRole = ClusterShardingSettings(system).withRole("nonExistingRole") val typeName = "typeName" - val extractEntityId = mock[ShardRegion.ExtractEntityId] - val extractShardId = mock[ShardRegion.ExtractShardId] + val extractEntityId: ExtractEntityId = { case msg: Msg => ("42", msg) } + val extractShardId: ExtractShardId = _ => "37" clusterSharding.start( typeName = typeName, @@ -52,15 +69,10 @@ class ClusterShardingInternalsSpec extends AkkaSpec(""" settings = settingsWithRole, extractEntityId = extractEntityId, extractShardId = extractShardId, - allocationStrategy = mock[ShardAllocationStrategy], + allocationStrategy = new LeastShardAllocationStrategy(3, 4), handOffStopMessage = PoisonPill) - verify(clusterSharding).startProxy( - ArgumentMatchers.eq(typeName), - ArgumentMatchers.eq(settingsWithRole.role), - ArgumentMatchers.eq(None), - ArgumentMatchers.eq(extractEntityId), - ArgumentMatchers.eq(extractShardId)) + probe.expectMsg(StartingProxy(typeName, settingsWithRole.role, None, extractEntityId, extractShardId)) } "stop entities from HandOffStopper even if the entity doesn't handle handOffStopMessage" in { diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/fold.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/fold.md index ff38d53cfa..a12b4ecf4b 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/fold.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/fold.md @@ -19,6 +19,7 @@ completes, the current value is emitted downstream. Note that the `zero` value must be immutable. +## Reactive Streams semantics @@@div { .callout } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 1bd1987062..ae6ae9f0e5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -208,8 +208,7 @@ object Dependencies { Provided.levelDBNative, Test.junit, Test.scalatest.value, - Test.commonsIo, - Test.mockito) + Test.commonsIo) val clusterMetrics = l ++= Seq(Provided.sigarLoader, Test.slf4jJul, Test.slf4jLog4j, Test.logback, Test.mockito)