cleanup some systemActor stuff (#25606)

* remove the ActorRef.apply(Future) since that is uncertain, see ticket #25305
This commit is contained in:
Patrik Nordwall 2018-09-14 15:18:47 +02:00 committed by Christopher Batey
parent d8857e8d2a
commit 5371aa17e4
7 changed files with 36 additions and 45 deletions

View file

@ -123,12 +123,4 @@ trait ActorTestKit {
final def spawn[T](behavior: Behavior[T], name: String, props: Props): ActorRef[T] =
Await.result(internalSystem ? (ActorTestKitGuardian.SpawnActor(name, behavior, _, props)), timeout.duration)
// FIXME needed for Akka internal tests but, users shouldn't spawn system actors?
@InternalApi
private[akka] def systemActor[T](behavior: Behavior[T], name: String): ActorRef[T] =
Await.result(system.systemActorOf(behavior, name), timeout.duration)
@InternalApi
private[akka] def systemActor[T](behavior: Behavior[T]): ActorRef[T] =
Await.result(system.systemActorOf(behavior, childName.next()), timeout.duration)
}

View file

@ -48,11 +48,11 @@ class WatchSpec extends ActorTestKit
import WatchSpec._
class WatchSetup {
val terminator = systemActor(terminatorBehavior)
val terminator = spawn(terminatorBehavior)
val receivedTerminationSignal: Promise[Terminated] = Promise()
val watchProbe = TestProbe[Done]()
val watcher = systemActor(
val watcher = spawn(
Behaviors.supervise(
Behaviors.receive[StartWatching] {
case (ctx, StartWatching(watchee))
@ -155,11 +155,11 @@ class WatchSpec extends ActorTestKit
}
class WatchWithSetup {
val terminator = systemActor(terminatorBehavior)
val terminator = spawn(terminatorBehavior)
val receivedTerminationSignal: Promise[Message] = Promise()
val watchProbe = TestProbe[Done]()
val watcher = systemActor(
val watcher = spawn(
Behaviors.supervise(
Behaviors.receive[Message] {
case (ctx, StartWatchingWith(watchee, msg))
@ -191,11 +191,11 @@ class WatchSpec extends ActorTestKit
}
"allow watch message definition after watch using unwatch" in {
val terminator = systemActor(terminatorBehavior)
val terminator = spawn(terminatorBehavior)
val receivedTerminationSignal: Promise[Message] = Promise()
val watchProbe = TestProbe[Done]()
val watcher = systemActor(
val watcher = spawn(
Behaviors.supervise(
Behaviors.receive[Message] {
case (ctx, StartWatching(watchee))
@ -221,11 +221,11 @@ class WatchSpec extends ActorTestKit
}
"allow watch message redefinition using unwatch" in {
val terminator = systemActor(terminatorBehavior)
val terminator = spawn(terminatorBehavior)
val receivedTerminationSignal: Promise[Message] = Promise()
val watchProbe = TestProbe[Done]()
val watcher = systemActor(
val watcher = spawn(
Behaviors.supervise(
Behaviors.receive[Message] {
case (ctx, StartWatchingWith(watchee, msg))
@ -248,10 +248,10 @@ class WatchSpec extends ActorTestKit
}
class ErrorTestSetup {
val terminator = systemActor(terminatorBehavior)
val terminator = spawn(terminatorBehavior)
private val stopProbe = TestProbe[Done]()
val watcher = systemActor(
val watcher = spawn(
Behaviors.supervise(
Behaviors.receive[Message] {
case (ctx, StartWatchingWith(watchee, msg))

View file

@ -61,17 +61,6 @@ object ActorRef {
def !(msg: T): Unit = ref.tell(msg)
}
/**
* INTERNAL API
*
* FIXME, this isn't really used since we removed the native actor system
*/
@InternalApi private[akka] def apply[T](f: Future[ActorRef[T]], bufferSize: Int = 1000): ActorRef[T] =
f.value match {
// an AdaptedActorSystem will always create refs eagerly, so it will take this path
case Some(Success(ref)) ref
case _ throw new IllegalStateException("Only expecting completed futures until the native actor system is implemented")
}
}
/**

View file

@ -12,6 +12,7 @@ import scala.concurrent.duration._
import scala.reflect.ClassTag
import akka.actor.typed.ExtensionSetup
import akka.actor.typed.Props
import akka.annotation.InternalApi
/**
@ -47,11 +48,8 @@ abstract class Receptionist extends Extension {
}.get
} else LocalReceptionist
ActorRef(
system.systemActorOf(provider.behavior, "receptionist")(
// FIXME: where should that timeout be configured? Shouldn't there be a better `Extension`
// implementation that does this dance for us?
10.seconds))
import akka.actor.typed.scaladsl.adapter._
system.internalSystemActorOf(provider.behavior, "receptionist", Props.empty)
}
}

View file

@ -5,7 +5,9 @@
package akka.actor.typed
package scaladsl
import akka.actor.ExtendedActorSystem
import akka.actor.typed.internal.adapter._
import akka.annotation.InternalApi
/**
* Scala API: Adapters between typed and untyped actors and actor systems.
@ -52,6 +54,13 @@ package object adapter {
*/
implicit class TypedActorSystemOps(val sys: ActorSystem[_]) extends AnyVal {
def toUntyped: akka.actor.ActorSystem = ActorSystemAdapter.toUntyped(sys)
/**
* INTERNAL API
*/
@InternalApi private[akka] def internalSystemActorOf[U](behavior: Behavior[U], name: String, props: Props): ActorRef[U] = {
toUntyped.asInstanceOf[ExtendedActorSystem].systemActorOf(PropsAdapter(behavior, props), name)
}
}
/**

View file

@ -9,6 +9,7 @@ import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.actor.typed.ActorRef
import akka.actor.ExtendedActorSystem
import akka.actor.typed.Props
object DistributedData extends ExtensionId[DistributedData] {
def get(system: ActorSystem[_]): DistributedData = apply(system)
@ -43,7 +44,7 @@ class DistributedData(system: ActorSystem[_]) extends Extension {
val underlyingReplicator = akka.cluster.ddata.DistributedData(untypedSystem).replicator
val replicatorBehavior = Replicator.behavior(settings, underlyingReplicator)
untypedSystem.systemActorOf(PropsAdapter(replicatorBehavior), name)
system.internalSystemActorOf(replicatorBehavior, name, Props.empty)
}
}

View file

@ -5,6 +5,7 @@
package akka.cluster.typed.internal
import akka.actor.ExtendedActorSystem
import akka.actor.typed.Props
import akka.annotation.InternalApi
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.{ ClusterEvent, MemberStatus }
@ -13,6 +14,7 @@ import akka.cluster.typed._
import akka.actor.typed.internal.adapter.ActorSystemAdapter
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.Member
/**
* INTERNAL API:
@ -136,19 +138,19 @@ private[akka] final class AdapterClusterImpl(system: ActorSystem[_]) extends Clu
import AdapterClusterImpl._
require(system.isInstanceOf[ActorSystemAdapter[_]], "only adapted actor systems can be used for cluster features")
private val untypedSystem = system.toUntyped
private def extendedUntyped = untypedSystem.asInstanceOf[ExtendedActorSystem]
private val untypedCluster = akka.cluster.Cluster(untypedSystem)
private val untypedCluster = akka.cluster.Cluster(system.toUntyped)
override def selfMember = untypedCluster.selfMember
override def isTerminated = untypedCluster.isTerminated
override def state = untypedCluster.state
override def selfMember: Member = untypedCluster.selfMember
override def isTerminated: Boolean = untypedCluster.isTerminated
override def state: ClusterEvent.CurrentClusterState = untypedCluster.state
// must not be lazy as it also updates the cached selfMember
override val subscriptions: ActorRef[ClusterStateSubscription] = extendedUntyped.systemActorOf(
PropsAdapter(subscriptionsBehavior(untypedCluster)), "clusterStateSubscriptions")
override val subscriptions: ActorRef[ClusterStateSubscription] =
system.internalSystemActorOf(
subscriptionsBehavior(untypedCluster), "clusterStateSubscriptions", Props.empty)
override lazy val manager: ActorRef[ClusterCommand] = extendedUntyped.systemActorOf(
PropsAdapter(managerBehavior(untypedCluster)), "clusterCommandManager")
override lazy val manager: ActorRef[ClusterCommand] =
system.internalSystemActorOf(
managerBehavior(untypedCluster), "clusterCommandManager", Props.empty)
}