diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ExtensionsTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ExtensionsTest.java index 99e8766d63..77284ca6d1 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/ExtensionsTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/ExtensionsTest.java @@ -13,8 +13,7 @@ import java.util.Optional; import static junit.framework.TestCase.assertSame; import static org.junit.Assert.assertTrue; -public class ExtensionsTest extends JUnitSuite { - +class ExtensionsTest extends JUnitSuite { public static class MyExtImpl implements Extension { } @@ -46,7 +45,7 @@ public class ExtensionsTest extends JUnitSuite { Behavior.empty(), "loadJavaExtensionsFromConfig", Optional.empty(), - Optional.of(ConfigFactory.parseString("akka.typed.extensions += \"akka.actor.typed.ExtensionsTest$MyExtension\"").resolve()), + Optional.of(ConfigFactory.parseString("akka.typed.extensions += \"akka.typed.ExtensionsTest$MyExtension\"").resolve()), Optional.empty(), Optional.empty() ); diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala index e41dcc82e9..19c0e5583f 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala @@ -285,9 +285,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( implicit def system: ActorSystem[TypedSpec.Command] - private def mySuite: String = - if (system eq nativeSystem) suite + "Native" - else suite + "Adapted" + private def mySuite: String = suite + "Adapted" def setup(name: String, wrapper: Option[Behavior[Command] ⇒ Behavior[Command]] = None, ignorePostStop: Boolean = true)( proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) ⇒ StepWise.Steps[Event, _]): Future[TypedSpec.Status] = @@ -635,7 +633,6 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = subject(ctx.self, ignorePostStop) } - object `An ActorContext (native)` extends Normal with NativeSystem object `An ActorContext (adapted)` extends Normal with AdaptedSystem trait Widened extends Tests { @@ -644,7 +641,6 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = subject(ctx.self, ignorePostStop).widen { case x ⇒ x } } - object `An ActorContext with widened Behavior (native)` extends Widened with NativeSystem object `An ActorContext with widened Behavior (adapted)` extends Widened with AdaptedSystem trait Deferred extends Tests { @@ -652,7 +648,6 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = Actor.deferred(_ ⇒ subject(ctx.self, ignorePostStop)) } - object `An ActorContext with deferred Behavior (native)` extends Deferred with NativeSystem object `An ActorContext with deferred Behavior (adapted)` extends Deferred with AdaptedSystem trait NestedDeferred extends Tests { @@ -660,7 +655,6 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = Actor.deferred(_ ⇒ Actor.deferred(_ ⇒ subject(ctx.self, ignorePostStop))) } - object `An ActorContext with nested deferred Behavior (native)` extends NestedDeferred with NativeSystem object `An ActorContext with nested deferred Behavior (adapted)` extends NestedDeferred with AdaptedSystem trait Tap extends Tests { @@ -668,7 +662,5 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString( override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] = Actor.tap((_, _) ⇒ (), (_, _) ⇒ (), subject(ctx.self, ignorePostStop)) } - object `An ActorContext with Tap (old-native)` extends Tap with NativeSystem object `An ActorContext with Tap (old-adapted)` extends Tap with AdaptedSystem - } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala index d0f0427244..f672ced41f 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala @@ -56,8 +56,6 @@ class AskSpec extends TypedSpec with ScalaFutures { } } - object `Ask pattern (native)` extends Common with NativeSystem - object `Ask pattern (adapted)` extends Common with AdaptedSystem { import AskSpec._ diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala index 12889efe74..9c14edf707 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala @@ -286,7 +286,6 @@ class BehaviorSpec extends TypedSpec { trait FullBehavior extends Messages with BecomeWithLifecycle with Stoppable { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = mkFull(monitor) → null } - object `A Full Behavior (native)` extends FullBehavior with NativeSystem object `A Full Behavior (adapted)` extends FullBehavior with AdaptedSystem trait ImmutableBehavior extends Messages with BecomeWithLifecycle with Stoppable { @@ -320,7 +319,6 @@ class BehaviorSpec extends TypedSpec { } } } - object `A immutable Behavior (native)` extends ImmutableBehavior with NativeSystem object `A immutable Behavior (adapted)` extends ImmutableBehavior with AdaptedSystem trait ImmutableWithSignalScalaBehavior extends Messages with BecomeWithLifecycle with Stoppable { @@ -356,7 +354,6 @@ class BehaviorSpec extends TypedSpec { SActor.same } } - object `A ImmutableWithSignal Behavior (scala,native)` extends ImmutableWithSignalScalaBehavior with NativeSystem object `A ImmutableWithSignal Behavior (scala,adapted)` extends ImmutableWithSignalScalaBehavior with AdaptedSystem trait ImmutableScalaBehavior extends Messages with Become with Stoppable { @@ -387,7 +384,6 @@ class BehaviorSpec extends TypedSpec { } } } - object `A immutable Behavior (scala,native)` extends ImmutableScalaBehavior with NativeSystem object `A immutable Behavior (scala,adapted)` extends ImmutableScalaBehavior with AdaptedSystem trait MutableScalaBehavior extends Messages with Become with Stoppable { @@ -425,7 +421,6 @@ class BehaviorSpec extends TypedSpec { } } } - object `A mutable Behavior (scala,native)` extends MutableScalaBehavior with NativeSystem object `A mutable Behavior (scala,adapted)` extends MutableScalaBehavior with AdaptedSystem trait WidenedScalaBehavior extends ImmutableWithSignalScalaBehavior with Reuse with Siphon { @@ -436,7 +431,6 @@ class BehaviorSpec extends TypedSpec { super.behavior(monitor)._1.widen[Command] { case c ⇒ inbox.ref ! c; c } → inbox } } - object `A widened Behavior (scala,native)` extends WidenedScalaBehavior with NativeSystem object `A widened Behavior (scala,adapted)` extends WidenedScalaBehavior with AdaptedSystem trait DeferredScalaBehavior extends ImmutableWithSignalScalaBehavior { @@ -453,7 +447,6 @@ class BehaviorSpec extends TypedSpec { override def checkAux(signal: Signal, aux: Aux): Unit = aux.receiveAll() should ===(Done :: Nil) } - object `A deferred Behavior (scala,native)` extends DeferredScalaBehavior with NativeSystem object `A deferred Behavior (scala,adapted)` extends DeferredScalaBehavior with AdaptedSystem trait TapScalaBehavior extends ImmutableWithSignalScalaBehavior with Reuse with SignalSiphon { @@ -462,7 +455,6 @@ class BehaviorSpec extends TypedSpec { (SActor.tap((_, msg) ⇒ inbox.ref ! Right(msg), (_, sig) ⇒ inbox.ref ! Left(sig), super.behavior(monitor)._1), inbox) } } - object `A tap Behavior (scala,native)` extends TapScalaBehavior with NativeSystem object `A tap Behavior (scala,adapted)` extends TapScalaBehavior with AdaptedSystem trait RestarterScalaBehavior extends ImmutableWithSignalScalaBehavior with Reuse { @@ -470,7 +462,6 @@ class BehaviorSpec extends TypedSpec { SActor.supervise(super.behavior(monitor)._1).onFailure(SupervisorStrategy.restart) → null } } - object `A restarter Behavior (scala,native)` extends RestarterScalaBehavior with NativeSystem object `A restarter Behavior (scala,adapted)` extends RestarterScalaBehavior with AdaptedSystem /* @@ -536,7 +527,6 @@ class BehaviorSpec extends TypedSpec { SActor.same })) } - object `A ImmutableWithSignal Behavior (java,native)` extends ImmutableWithSignalJavaBehavior with NativeSystem object `A ImmutableWithSignal Behavior (java,adapted)` extends ImmutableWithSignalJavaBehavior with AdaptedSystem trait ImmutableJavaBehavior extends Messages with Become with Stoppable { @@ -568,7 +558,6 @@ class BehaviorSpec extends TypedSpec { }) } } - object `A immutable Behavior (java,native)` extends ImmutableJavaBehavior with NativeSystem object `A immutable Behavior (java,adapted)` extends ImmutableJavaBehavior with AdaptedSystem trait WidenedJavaBehavior extends ImmutableWithSignalJavaBehavior with Reuse with Siphon { @@ -577,7 +566,6 @@ class BehaviorSpec extends TypedSpec { JActor.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x ⇒ { inbox.ref ! x; x })))) → inbox } } - object `A widened Behavior (java,native)` extends WidenedJavaBehavior with NativeSystem object `A widened Behavior (java,adapted)` extends WidenedJavaBehavior with AdaptedSystem trait DeferredJavaBehavior extends ImmutableWithSignalJavaBehavior { @@ -594,7 +582,6 @@ class BehaviorSpec extends TypedSpec { override def checkAux(signal: Signal, aux: Aux): Unit = aux.receiveAll() should ===(Done :: Nil) } - object `A deferred Behavior (java,native)` extends DeferredJavaBehavior with NativeSystem object `A deferred Behavior (java,adapted)` extends DeferredJavaBehavior with AdaptedSystem trait TapJavaBehavior extends ImmutableWithSignalJavaBehavior with Reuse with SignalSiphon { @@ -606,7 +593,6 @@ class BehaviorSpec extends TypedSpec { super.behavior(monitor)._1), inbox) } } - object `A tap Behavior (java,native)` extends TapJavaBehavior with NativeSystem object `A tap Behavior (java,adapted)` extends TapJavaBehavior with AdaptedSystem trait RestarterJavaBehavior extends ImmutableWithSignalJavaBehavior with Reuse { @@ -615,7 +601,6 @@ class BehaviorSpec extends TypedSpec { .onFailure(classOf[Exception], SupervisorStrategy.restart) → null } } - object `A restarter Behavior (java,native)` extends RestarterJavaBehavior with NativeSystem object `A restarter Behavior (java,adapted)` extends RestarterJavaBehavior with AdaptedSystem } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala index e97fac8d34..59cc077522 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/DeferredSpec.scala @@ -158,10 +158,7 @@ class DeferredSpec extends TypedSpec { } - object `A DeferredBehavior (stubbed, native)` extends StubbedTests with NativeSystem object `A DeferredBehavior (stubbed, adapted)` extends StubbedTests with AdaptedSystem - - object `A DeferredBehavior (real, native)` extends RealTests with NativeSystem object `A DeferredBehavior (real, adapted)` extends RealTests with AdaptedSystem } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala index b48b906c1b..d370039867 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ExtensionsSpec.scala @@ -99,6 +99,7 @@ class ExtensionsSpec extends TypedSpecSetup { akka.typed.extensions = ["akka.actor.typed.FailingToLoadExtension$"] """))) } + intercept[RuntimeException] { create() } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PerformanceSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PerformanceSpec.scala index 03823eef0b..870d56bbd3 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PerformanceSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PerformanceSpec.scala @@ -62,7 +62,7 @@ class PerformanceSpec extends TypedSpec( } } - val iterations = nativeSystem.settings.config.getInt("akka.actor.typed.PerformanceSpec.iterations") + val iterations = system.settings.config.getInt("akka.actor.typed.PerformanceSpec.iterations") trait CommonTests { implicit def system: ActorSystem[TypedSpec.Command] @@ -78,8 +78,6 @@ class PerformanceSpec extends TypedSpec( def `09 when using 8 pairs with 10 messages`(): Unit = sync(runTest("09")(behavior(8, 10, iterations, "dispatcher-8"))) } - object `must be fast with native ActorSystem` extends CommonTests with NativeSystem object `must be fast with ActorSystemAdapter` extends CommonTests with AdaptedSystem } - } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PropsSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PropsSpec.scala index d45c23b6e7..4d30466aa6 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PropsSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/PropsSpec.scala @@ -5,35 +5,16 @@ package akka.actor.typed class PropsSpec extends TypedSpecSetup { - val dispatcherFirst = DispatcherDefault(MailboxCapacity(666, DispatcherFromConfig("pool"))) - val mailboxFirst = MailboxCapacity(999) withNext dispatcherFirst + val dispatcherFirst = DispatcherDefault(DispatcherFromConfig("pool")) object `A Props` { def `must get first dispatcher`(): Unit = { dispatcherFirst.firstOrElse[DispatcherSelector](null) should ===(dispatcherFirst) - mailboxFirst.firstOrElse[DispatcherSelector](null) should ===(dispatcherFirst) - } - - def `must get first mailbox`(): Unit = { - dispatcherFirst.firstOrElse[MailboxCapacity](null).capacity should ===(666) - mailboxFirst.firstOrElse[MailboxCapacity](null).capacity should ===(999) - } - - def `must get default value`(): Unit = { - mailboxFirst.firstOrElse[DispatcherFromExecutor](null) should ===(null) - } - - def `must filter out the right things`(): Unit = { - val filtered = mailboxFirst.filterNot[DispatcherSelector] - filtered.firstOrElse[MailboxCapacity](null).capacity should ===(999) - filtered.firstOrElse[DispatcherSelector](null) should ===(null) } def `must yield all configs of some type`(): Unit = { dispatcherFirst.allOf[DispatcherSelector] should ===(DispatcherSelector.default() :: DispatcherSelector.fromConfig("pool") :: Nil) - mailboxFirst.allOf[MailboxCapacity] should ===(List(999, 666).map(MailboxCapacity(_))) } - } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/RestarterSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/RestarterSpec.scala index b2c87a607b..1f36b04e91 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/RestarterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/RestarterSpec.scala @@ -418,10 +418,7 @@ class RestarterSpec extends TypedSpec { } - object `A restarter (stubbed, native)` extends StubbedTests with NativeSystem object `A restarter (stubbed, adapted)` extends StubbedTests with AdaptedSystem - - object `A restarter (real, native)` extends RealTests with NativeSystem object `A restarter (real, adapted)` extends RealTests with AdaptedSystem } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala index 2237124e78..e00408971c 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TimerSpec.scala @@ -223,7 +223,5 @@ class TimerSpec extends TypedSpec(""" } } - object `A Restarter (real, native)` extends RealTests with NativeSystem object `A Restarter (real, adapted)` extends RealTests with AdaptedSystem - } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedSpec.scala index fdba1f7a12..35f4854e0f 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/TypedSpec.scala @@ -48,6 +48,7 @@ class TypedSpecSetup extends RefSpec with Matchers with BeforeAndAfterAll with S * Helper class for writing tests against both ActorSystemImpl and ActorSystemAdapter. */ abstract class TypedSpec(val config: Config) extends TypedSpecSetup { + import TypedSpec._ import AskPattern._ @@ -58,16 +59,8 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup { // extension point def setTimeout: Timeout = Timeout(1.minute) - private var nativeSystemUsed = false - lazy val nativeSystem: ActorSystem[TypedSpec.Command] = { - val sys = ActorSystem(guardian(), AkkaSpec.getCallerName(classOf[TypedSpec]), config = Some(config withFallback AkkaSpec.testConf)) - nativeSystemUsed = true - sys - } - private var adaptedSystemUsed = false lazy val system: ActorSystem[TypedSpec.Command] = { - val sys = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf)) - adaptedSystemUsed = true + val sys = ActorSystem(guardian(), AkkaSpec.getCallerName(classOf[TypedSpec]), config = Some(config withFallback AkkaSpec.testConf)) sys } @@ -85,33 +78,27 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup { } } - trait NativeSystem { - def system: ActorSystem[TypedSpec.Command] = nativeSystem - } - trait AdaptedSystem { def system: ActorSystem[TypedSpec.Command] = TypedSpec.this.system } implicit val timeout = setTimeout - implicit def scheduler = nativeSystem.scheduler + implicit def scheduler = system.scheduler + + lazy val blackhole = await(system ? Create(immutable[Any] { case _ ⇒ same }, "blackhole")) override def afterAll(): Unit = { - if (nativeSystemUsed) - Await.result(nativeSystem.terminate, timeout.duration) - if (adaptedSystemUsed) - Await.result(system.terminate, timeout.duration) + Await.result(system.terminate, timeout.duration) } // TODO remove after basing on ScalaTest 3 with async support import akka.testkit._ - def await[T](f: Future[T]): T = Await.result(f, timeout.duration * 1.1) - lazy val blackhole = await(nativeSystem ? Create(immutable[Any] { case _ ⇒ same }, "blackhole")) + def await[T](f: Future[T]): T = Await.result(f, timeout.duration * 1.1) /** * Run an Actor-based test. The test procedure is most conveniently - * formulated using the [[StepWise$]] behavior type. + * formulated using the [[StepWise]] behavior type. */ def runTest[T: ClassTag](name: String)(behavior: Behavior[T])(implicit system: ActorSystem[Command]): Future[Status] = system ? (RunTest(name, behavior, _, timeout.duration)) @@ -176,6 +163,7 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup { } object TypedSpec { + import akka.{ typed ⇒ t } sealed abstract class Start @@ -246,7 +234,6 @@ class TypedSpecSpec extends TypedSpec { } } - object `when using the native implementation` extends CommonTests with NativeSystem object `when using the adapted implementation` extends CommonTests with AdaptedSystem } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala index 65323e2285..b8cbd7b907 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/WatchSpec.scala @@ -66,6 +66,5 @@ class WatchSpec extends TypedSpec { } } - object `Actor monitoring (native)` extends Tests with NativeSystem object `Actor monitoring (adapted)` extends Tests with AdaptedSystem } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorCellSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorCellSpec.scala deleted file mode 100644 index b9be95f6e8..0000000000 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorCellSpec.scala +++ /dev/null @@ -1,462 +0,0 @@ -/** - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.actor.typed -package internal - -import akka.actor.typed.scaladsl.Actor -import akka.actor.typed.scaladsl.Actor._ -import org.scalactic.ConversionCheckedTripleEquals -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.exceptions.TestFailedException -import org.junit.runner.RunWith -import org.scalatest._ - -@RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with ScalaFutures with ConversionCheckedTripleEquals { - - val sys = new ActorSystemStub("ActorCellSpec") - def ec = sys.controlledExecutor - - object `An ActorCell` { - - def `must be creatable`(): Unit = { - val parent = new DebugRef[String](sys.path / "creatable", true) - val cell = new ActorCell(sys, deferred[String](_ ⇒ { - parent ! "created" - immutable[String] { - case (_, s) ⇒ - parent ! s - same - } - }), ec, 1000, parent) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Right("created") :: Nil) - cell.send("hello") - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Right("hello") :: Nil) - } - } - - def `must be creatable with ???`(): Unit = { - val parent = new DebugRef[String](sys.path / "creatable???", true) - val self = new DebugRef[String](sys.path / "creatableSelf", true) - val ??? = new NotImplementedError - val cell = new ActorCell(sys, deferred[String](_ ⇒ { parent ! "created"; throw ??? }), ec, 1000, parent) - cell.setSelf(self) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Right("created") :: Nil) - // explicitly verify termination via self-signal - self.receiveAll() should ===(Left(Terminate()) :: Nil) - cell.sendSystem(Terminate()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Left(DeathWatchNotification(self, ???)) :: Nil) - } - } - - def `must be able to terminate after construction`(): Unit = { - val parent = new DebugRef[String](sys.path / "terminate", true) - val self = new DebugRef[String](sys.path / "terminateSelf", true) - val cell = new ActorCell(sys, deferred[String](_ ⇒ { parent ! "created"; stopped }), ec, 1000, parent) - cell.setSelf(self) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Right("created") :: Nil) - // explicitly verify termination via self-signal - self.receiveAll() should ===(Left(Terminate()) :: Nil) - cell.sendSystem(Terminate()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Left(DeathWatchNotification(self, null)) :: Nil) - } - } - - def `must be able to terminate after being started`(): Unit = { - val parent = new DebugRef[String](sys.path / "terminate", true) - val self = new DebugRef[String](sys.path / "terminateSelf", true) - val cell = new ActorCell(sys, deferred[String](_ ⇒ { parent ! "created"; stopped }), ec, 1000, parent) - cell.setSelf(self) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Right("created") :: Nil) - // explicitly verify termination via self-signal - self.receiveAll() should ===(Left(Terminate()) :: Nil) - cell.sendSystem(Terminate()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Left(DeathWatchNotification(self, null)) :: Nil) - } - } - - def `must terminate upon failure during processing`(): Unit = { - val parent = new DebugRef[String](sys.path / "terminate", true) - val self = new DebugRef[String](sys.path / "terminateSelf", true) - val ex = new AssertionError - val behavior = deferred[String](_ ⇒ { parent ! "created"; immutable[String] { case (s, _) ⇒ throw ex } }) - val cell = new ActorCell(sys, behavior, ec, 1000, parent) - cell.setSelf(self) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Right("created") :: Nil) - cell.send("") - ec.runOne() - ec.queueSize should ===(0) - // explicitly verify termination via self-signal - self.receiveAll() should ===(Left(Terminate()) :: Nil) - cell.sendSystem(Terminate()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Left(DeathWatchNotification(self, ex)) :: Nil) - } - } - - def `must signal failure when starting behavior is "same"`(): Unit = { - val parent = new DebugRef[String](sys.path / "startSame", true) - val self = new DebugRef[String](sys.path / "startSameSelf", true) - val cell = new ActorCell(sys, deferred[String](_ ⇒ { parent ! "created"; same[String] }), ec, 1000, parent) - cell.setSelf(self) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Right("created") :: Nil) - // explicitly verify termination via self-signal - self.receiveAll() should ===(Left(Terminate()) :: Nil) - cell.sendSystem(Terminate()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() match { - case Left(DeathWatchNotification(`self`, exc)) :: Nil ⇒ - exc should not be null - exc shouldBe an[IllegalArgumentException] - exc.getMessage should include("Same") - case other ⇒ fail(s"$other was not a DeathWatchNotification") - } - } - } - - def `must signal failure when starting behavior is "unhandled"`(): Unit = { - val parent = new DebugRef[String](sys.path / "startSame", true) - val self = new DebugRef[String](sys.path / "startSameSelf", true) - val cell = new ActorCell(sys, deferred[String](_ ⇒ { parent ! "created"; unhandled[String] }), ec, 1000, parent) - cell.setSelf(self) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Right("created") :: Nil) - // explicitly verify termination via self-signal - self.receiveAll() should ===(Left(Terminate()) :: Nil) - cell.sendSystem(Terminate()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() match { - case Left(DeathWatchNotification(`self`, exc)) :: Nil ⇒ - exc should not be null - exc shouldBe an[IllegalArgumentException] - exc.getMessage should include("Unhandled") - case other ⇒ fail(s"$other was not a DeathWatchNotification") - } - } - } - - /* - * also tests: - * - must reschedule for self-message - * - must not reschedule for message when already activated - * - must not reschedule for signal when already activated - */ - def `must not execute more messages than were batched naturally`(): Unit = { - val parent = new DebugRef[String](sys.path / "batching", true) - val cell = new ActorCell(sys, deferred[String] { ctx ⇒ - immutable[String] { - case (_, s) ⇒ - ctx.self ! s - parent ! s - same - } - }, ec, 1000, parent) - val ref = new LocalActorRef(parent.path / "child", cell) - cell.setSelf(ref) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Nil) - cell.send("one") - cell.send("two") - ec.queueSize should ===(1) - ec.runOne() - ec.queueSize should ===(1) - parent.receiveAll() should ===(Right("one") :: Right("two") :: Nil) - ec.runOne() - ec.queueSize should ===(1) - parent.receiveAll() should ===(Right("one") :: Right("two") :: Nil) - cell.send("three") - ec.runOne() - ec.queueSize should ===(1) - parent.receiveAll() should ===(Right("one") :: Right("two") :: Right("three") :: Nil) - cell.sendSystem(Terminate()) - ec.queueSize should ===(1) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil) - } - } - - def `must signal DeathWatch when terminating normally`(): Unit = { - val parent = new DebugRef[String](sys.path / "watchNormal", true) - val client = new DebugRef[String](parent.path / "client", true) - val cell = new ActorCell(sys, Actor.empty[String], ec, 1000, parent) - val ref = new LocalActorRef(parent.path / "child", cell) - cell.setSelf(ref) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Watch(ref, client)) - cell.sendSystem(Terminate()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil) - client.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil) - } - } - - /* - * also tests: - * - must turn a DeathWatchNotification into a Terminated signal while watching - * - must terminate with DeathPactException when not handling a Terminated signal - * - must send a Watch message when watching another actor - */ - def `must signal DeathWatch when terminating abnormally`(): Unit = { - val parent = new DebugRef[String](sys.path / "watchAbnormal", true) - val client = new DebugRef[String](parent.path / "client", true) - val other = new DebugRef[String](parent.path / "other", true) - val cell = new ActorCell(sys, deferred[String] { ctx ⇒ ctx.watch(parent); Actor.empty }, ec, 1000, parent) - val ref = new LocalActorRef(parent.path / "child", cell) - cell.setSelf(ref) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Left(Watch(parent, ref)) :: Nil) - // test that unwatched termination is ignored - cell.sendSystem(DeathWatchNotification(other, null)) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Nil) - // now trigger failure by death pact - cell.sendSystem(Watch(ref, client)) - cell.sendSystem(DeathWatchNotification(parent, null)) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() match { - case Left(DeathWatchNotification(ref, exc)) :: Nil ⇒ - exc should not be null - exc shouldBe a[DeathPactException] - case other ⇒ fail(s"$other was not a DeathWatchNotification") - } - client.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil) - } - } - - def `must signal DeathWatch when watching after termination`(): Unit = { - val parent = new DebugRef[String](sys.path / "watchLate", true) - val client = new DebugRef[String](parent.path / "client", true) - val cell = new ActorCell(sys, stopped[String], ec, 1000, parent) - val ref = new LocalActorRef(parent.path / "child", cell) - cell.setSelf(ref) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil) - cell.sendSystem(Watch(ref, client)) - ec.queueSize should ===(0) - sys.deadLettersInbox.receiveAll() should ===(Left(Watch(ref, client)) :: Nil) - // correct behavior of deadLetters is verified in ActorSystemSpec - } - } - - def `must signal DeathWatch when watching after termination but before deactivation`(): Unit = { - val parent = new DebugRef[String](sys.path / "watchSomewhatLate", true) - val client = new DebugRef[String](parent.path / "client", true) - val cell = new ActorCell(sys, Actor.empty[String], ec, 1000, parent) - val ref = new LocalActorRef(parent.path / "child", cell) - cell.setSelf(ref) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() - ec.queueSize should ===(0) - cell.sendSystem(Terminate()) - cell.sendSystem(Watch(ref, client)) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil) - sys.deadLettersInbox.receiveAll() should ===(Left(Watch(ref, client)) :: Nil) - } - } - - def `must not signal DeathWatch after Unwatch has been processed`(): Unit = { - val parent = new DebugRef[String](sys.path / "watchUnwatch", true) - val client = new DebugRef[String](parent.path / "client", true) - val cell = new ActorCell(sys, Actor.empty[String], ec, 1000, parent) - val ref = new LocalActorRef(parent.path / "child", cell) - cell.setSelf(ref) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Watch(ref, client)) - cell.sendSystem(Unwatch(ref, client)) - cell.sendSystem(Terminate()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil) - client.receiveAll() should ===(Nil) - } - } - - def `must send messages to deadLetters after being terminated`(): Unit = { - val parent = new DebugRef[String](sys.path / "sendDeadLetters", true) - val cell = new ActorCell(sys, stopped[String], ec, 1000, parent) - val ref = new LocalActorRef(parent.path / "child", cell) - cell.setSelf(ref) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil) - cell.send("42") - ec.queueSize should ===(0) - sys.deadLettersInbox.receiveAll() should ===(Right("42") :: Nil) - } - } - - /* - * also tests: - * - child creation - */ - def `must not terminate before children have terminated`(): Unit = { - val parent = new DebugRef[ActorRef[Nothing]](sys.path / "waitForChild", true) - val cell = new ActorCell(sys, deferred[String] { ctx ⇒ - ctx.spawn(deferred[String] { ctx ⇒ parent ! ctx.self; Actor.empty }, "child") - Actor.empty - }, ec, 1000, parent) - val ref = new LocalActorRef(parent.path / "child", cell) - cell.setSelf(ref) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() // creating subject - parent.hasSomething should ===(false) - ec.runOne() // creating child - ec.queueSize should ===(0) - val child = parent.receiveAll() match { - case Right(child) :: Nil ⇒ - child.sorryForNothing.sendSystem(Watch(child, parent)) - child - case other ⇒ fail(s"$other was not List(Right())") - } - ec.runOne() - ec.queueSize should ===(0) - cell.sendSystem(Terminate()) - ec.runOne() // begin subject termination, will initiate child termination - parent.hasSomething should ===(false) - ec.runOne() // terminate child - parent.receiveAll() should ===(Left(DeathWatchNotification(child, null)) :: Nil) - ec.runOne() // terminate subject - parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil) - } - } - - def `must properly terminate if failing while handling Terminated for child actor`(): Unit = { - val parent = new DebugRef[ActorRef[Nothing]](sys.path / "terminateWhenDeathPact", true) - val cell = new ActorCell(sys, deferred[String] { ctx ⇒ - ctx.watch(ctx.spawn(deferred[String] { ctx ⇒ parent ! ctx.self; Actor.empty }, "child")) - Actor.empty - }, ec, 1000, parent) - val ref = new LocalActorRef(parent.path / "child", cell) - cell.setSelf(ref) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() // creating subject - parent.hasSomething should ===(false) - ec.runOne() // creating child - ec.queueSize should ===(0) - val child = parent.receiveAll() match { - case Right(child: ActorRefImpl[Nothing]) :: Nil ⇒ - child.sendSystem(Watch(child, parent)) - child - case other ⇒ fail(s"$other was not List(Right())") - } - ec.runOne() - ec.queueSize should ===(0) - child.sendSystem(Terminate()) - ec.runOne() // child terminates and enqueues DeathWatchNotification - parent.receiveAll() should ===(Left(DeathWatchNotification(child, null)) :: Nil) - ec.runOne() // cell fails during Terminated and terminates with DeathPactException - parent.receiveAll() match { - case Left(DeathWatchNotification(`ref`, ex: DeathPactException)) :: Nil ⇒ - ex.getMessage should include("death pact") - case other ⇒ fail(s"$other was not Left(DeathWatchNotification($ref, DeathPactException))") - } - ec.queueSize should ===(0) - } - } - - def `must not terminate twice if failing in PostStop`(): Unit = { - val parent = new DebugRef[String](sys.path / "terminateProperlyPostStop", true) - val cell = new ActorCell(sys, immutable[String] { - case _ ⇒ unhandled - } onSignal { - case (_, PostStop) ⇒ ??? - }, ec, 1000, parent) - val ref = new LocalActorRef(parent.path / "child", cell) - cell.setSelf(ref) - debugCell(cell) { - ec.queueSize should ===(0) - cell.sendSystem(Create()) - ec.runOne() - ec.queueSize should ===(0) - cell.sendSystem(Terminate()) - ec.runOne() - ec.queueSize should ===(0) - parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil) - } - } - } - - private def debugCell[T, U](cell: ActorCell[T])(block: ⇒ U): U = - try block - catch { - case ex: TestFailedException ⇒ - println(cell) - throw ex - } - -} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala index 1b548a1717..0765c78399 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala @@ -105,46 +105,10 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca } } } - - } - - object `An ActorSystemImpl` extends CommonTests { - def system[T](behavior: Behavior[T], name: String) = ActorSystem(behavior, name) - def suite = "native" - - // this is essential to complete ActorCellSpec, see there - def `must correctly treat Watch dead letters`(): Unit = - withSystem("deadletters", Actor.empty[String]) { sys ⇒ - val client = new DebugRef[Int](sys.path / "debug", true) - sys.deadLetters.sorry.sendSystem(Watch(sys, client)) - client.receiveAll() should ===(Left(DeathWatchNotification(sys, null)) :: Nil) - } - - def `must start system actors and mangle their names`(): Unit = { - withSystem("systemActorOf", Actor.empty[String]) { sys ⇒ - import akka.actor.typed.scaladsl.AskPattern._ - implicit val timeout = Timeout(1.second) - implicit val sched = sys.scheduler - - case class Doner(ref: ActorRef[Done]) - - val ref1, ref2 = sys.systemActorOf(immutable[Doner] { - case (_, doner) ⇒ - doner.ref ! Done - same - }, "empty").futureValue - (ref1 ? Doner).futureValue should ===(Done) - (ref2 ? Doner).futureValue should ===(Done) - val RE = "(\\d+)-empty".r - val RE(num1) = ref1.path.name.toString - val RE(num2) = ref2.path.name.toString - num2.toInt should be > num1.toInt - } - } } object `An ActorSystemAdapter` extends CommonTests { - def system[T](behavior: Behavior[T], name: String) = ActorSystem.adapter(name, behavior) + def system[T](behavior: Behavior[T], name: String) = ActorSystem(behavior, name) def suite = "adapter" } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemStub.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemStub.scala index 837fa879cc..31b78480b7 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemStub.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemStub.scala @@ -10,6 +10,7 @@ import scala.concurrent._ import com.typesafe.config.ConfigFactory import java.util.concurrent.ThreadFactory +import akka.event.Logging import akka.typed.{ BusLogging, DefaultLoggingFilter, EventStream } import akka.util.Timeout @@ -37,7 +38,14 @@ private[typed] class ActorSystemStub(val name: String) } override def dynamicAccess: a.DynamicAccess = new a.ReflectiveDynamicAccess(getClass.getClassLoader) - override def eventStream: EventStream = new EventStreamImpl(true)(settings.untyped.LoggerStartTimeout) + override def eventStream: EventStream = new EventStream { + override def subscribe[T](subscriber: ActorRef[T], to: Class[T]) = false + override def setLogLevel(loglevel: Logging.LogLevel): Unit = {} + override def logLevel = Logging.InfoLevel + override def unsubscribe[T](subscriber: ActorRef[T], from: Class[T]) = false + override def unsubscribe[T](subscriber: ActorRef[T]): Unit = {} + override def publish[T](event: T): Unit = {} + } override def logFilter: e.LoggingFilter = new DefaultLoggingFilter(settings, eventStream) override def log: e.LoggingAdapter = new BusLogging(eventStream, path.parent.toString, getClass, logFilter) override def logConfiguration(): Unit = log.info(settings.toString) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/EventStreamSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/EventStreamSpec.scala deleted file mode 100644 index 63b5321c1f..0000000000 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/EventStreamSpec.scala +++ /dev/null @@ -1,324 +0,0 @@ -/** - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.actor.typed -package internal - -import akka.Done -import akka.event.Logging._ -import akka.actor.typed.scaladsl.Actor._ -import akka.actor.typed.scaladsl.AskPattern._ -import akka.typed.testkit.Inbox -import akka.typed.Logger -import com.typesafe.config.ConfigFactory -import org.scalatest.concurrent.Eventually -import org.scalatest.concurrent.PatienceConfiguration.Timeout - -import scala.concurrent.duration._ - -object EventStreamSpec { - @volatile var logged = Vector.empty[LogEvent] - - class MyLogger extends Logger { - def initialBehavior: Behavior[Logger.Command] = - immutable { - case (ctx, Logger.Initialize(es, replyTo)) ⇒ - val logger = ctx.spawn(immutable[LogEvent] { (_, ev: LogEvent) ⇒ - logged :+= ev - same - }, "logger") - ctx.watch(logger) - replyTo ! logger - empty - } - } - - val config = ConfigFactory.parseString(""" -akka.typed.loggers = ["akka.actor.typed.internal.EventStreamSpec$MyLogger"] -""") - - // class hierarchy for subchannel test - class A - class B2 extends A - class B3 extends A - class C extends B2 - - trait T - trait AT extends T - trait ATT extends AT - trait BT extends T - trait BTT extends BT - class CC - class CCATBT extends CC with ATT with BTT -} - -class EventStreamSpec extends TypedSpec(EventStreamSpec.config) with Eventually { - import EventStreamSpec._ - - object `An EventStreamImpl` { - - val es = nativeSystem.eventStream - val root = nativeSystem.path - - def `must work in full system`(): Unit = { - es.logLevel should ===(WarningLevel) - nativeSystem.log.error("hello world") - nativeSystem.log.debug("should not see this") - es.setLogLevel(DebugLevel) - es.logLevel should ===(DebugLevel) - nativeSystem.log.debug("hello world DEBUG") - nativeSystem.log.info("hello world INFO") - - eventually(logged.map(_.message) should ===(Vector("hello world", "hello world DEBUG", "hello world INFO"))) - logged = Vector.empty - } - - def `must manage subscribers`(): Unit = { - val box = Inbox[AnyRef]("manage") - val ref: ActorRef[String] = box.ref - es.subscribe(ref, classOf[String]) should ===(true) - es.publish("hello") - es.unsubscribe(ref) - es.publish("my") - es.subscribe(ref, classOf[String]) should ===(true) - es.publish("lovely") - es.unsubscribe(ref, classOf[String]) should ===(true) - es.publish("quaint") - es.subscribe(ref, classOf[String]) should ===(true) - es.publish("little") - es.unsubscribe(box.ref, classOf[AnyRef]) should ===(true) - es.publish("grey") - es.subscribe(ref, classOf[String]) should ===(true) - es.publish("world") - box.receiveAll() should ===(Seq[AnyRef]("hello", "lovely", "little", "world")) - } - - def `must care about types`(): Unit = { - val ref = Inbox[String]("types").ref - "es.subscribe(ref, classOf[AnyRef])" shouldNot typeCheck - "es.unsubscribe(ref, classOf[AnyRef])" shouldNot typeCheck - } - - def `must manage subchannels using classes`(): Unit = { - val box = Inbox[A]("subchannelclass") - val a = new A - val b1 = new B2 - val b2 = new B3 - val c = new C - es.subscribe(box.ref, classOf[B3]) should ===(true) - es.publish(c) - es.publish(b2) - box.receiveMsg() should ===(b2) - es.subscribe(box.ref, classOf[A]) should ===(true) - es.publish(c) - box.receiveMsg() should ===(c) - es.publish(b1) - box.receiveMsg() should ===(b1) - es.unsubscribe(box.ref, classOf[B2]) should ===(true) - es.publish(c) - es.publish(b2) - es.publish(a) - box.receiveMsg() should ===(b2) - box.receiveMsg() should ===(a) - box.hasMessages should ===(false) - } - - def `must manage sub-channels using classes and traits (update on subscribe)`(): Unit = { - val tm1 = new CC - val tm2 = new CCATBT - val a1 = Inbox[AT]("subAT") - val a2 = Inbox[BT]("subBT") - val a3 = Inbox[CC]("subCC") - val a4 = Inbox[CCATBT]("subCCATBT") - - es.subscribe(a1.ref, classOf[AT]) should ===(true) - es.subscribe(a2.ref, classOf[BT]) should ===(true) - es.subscribe(a3.ref, classOf[CC]) should ===(true) - es.subscribe(a4.ref, classOf[CCATBT]) should ===(true) - es.publish(tm1) - es.publish(tm2) - a1.receiveMsg() should ===(tm2) - a2.receiveMsg() should ===(tm2) - a3.receiveMsg() should ===(tm1) - a3.receiveMsg() should ===(tm2) - a4.receiveMsg() should ===(tm2) - es.unsubscribe(a1.ref, classOf[AT]) should ===(true) - es.unsubscribe(a2.ref, classOf[BT]) should ===(true) - es.unsubscribe(a3.ref, classOf[CC]) should ===(true) - es.unsubscribe(a4.ref, classOf[CCATBT]) should ===(true) - } - - def `must manage sub-channels using classes and traits (update on unsubscribe)`(): Unit = { - val tm1 = new CC - val tm2 = new CCATBT - val a1 = Inbox[AT]("subAT") - val a2 = Inbox[BT]("subBT") - val a3 = Inbox[CC]("subCC") - val a4 = Inbox[CCATBT]("subCCATBT") - - es.subscribe(a1.ref, classOf[AT]) should ===(true) - es.subscribe(a2.ref, classOf[BT]) should ===(true) - es.subscribe(a3.ref, classOf[CC]) should ===(true) - es.subscribe(a4.ref, classOf[CCATBT]) should ===(true) - es.unsubscribe(a3.ref, classOf[CC]) should ===(true) - es.publish(tm1) - es.publish(tm2) - a1.receiveMsg() should ===(tm2) - a2.receiveMsg() should ===(tm2) - a3.hasMessages should ===(false) - a4.receiveMsg() should ===(tm2) - es.unsubscribe(a1.ref, classOf[AT]) should ===(true) - es.unsubscribe(a2.ref, classOf[BT]) should ===(true) - es.unsubscribe(a4.ref, classOf[CCATBT]) should ===(true) - } - - def `must manage sub-channels using classes and traits (update on unsubscribe all)`(): Unit = { - val tm1 = new CC - val tm2 = new CCATBT - val a1 = Inbox[AT]("subAT") - val a2 = Inbox[BT]("subBT") - val a3 = Inbox[CC]("subCC") - val a4 = Inbox[CCATBT]("subCCATBT") - - es.subscribe(a1.ref, classOf[AT]) should ===(true) - es.subscribe(a2.ref, classOf[BT]) should ===(true) - es.subscribe(a3.ref, classOf[CC]) should ===(true) - es.subscribe(a4.ref, classOf[CCATBT]) should ===(true) - es.unsubscribe(a3.ref) - es.publish(tm1) - es.publish(tm2) - a1.receiveMsg() should ===(tm2) - a2.receiveMsg() should ===(tm2) - a3.hasMessages should ===(false) - a4.receiveMsg() should ===(tm2) - es.unsubscribe(a1.ref, classOf[AT]) should ===(true) - es.unsubscribe(a2.ref, classOf[BT]) should ===(true) - es.unsubscribe(a4.ref, classOf[CCATBT]) should ===(true) - } - - def `must manage sub-channels using classes and traits (update on publish)`(): Unit = { - val tm1 = new CC - val tm2 = new CCATBT - val a1 = Inbox[AT]("subAT") - val a2 = Inbox[BT]("subBT") - - es.subscribe(a1.ref, classOf[AT]) should ===(true) - es.subscribe(a2.ref, classOf[BT]) should ===(true) - es.publish(tm1) - es.publish(tm2) - a1.receiveMsg() should ===(tm2) - a2.receiveMsg() should ===(tm2) - es.unsubscribe(a1.ref, classOf[AT]) should ===(true) - es.unsubscribe(a2.ref, classOf[BT]) should ===(true) - } - - def `must manage sub-channels using classes and traits (unsubscribe classes used with trait)`(): Unit = { - val tm1 = new CC - val tm2 = new CCATBT - val a1 = Inbox[AT]("subAT") - val a2 = Inbox[AnyRef]("subBT") - val a3 = Inbox[CC]("subCC") - - es.subscribe(a1.ref, classOf[AT]) should ===(true) - es.subscribe(a2.ref, classOf[BT]) should ===(true) - es.subscribe(a2.ref, classOf[CC]) should ===(true) - es.subscribe(a3.ref, classOf[CC]) should ===(true) - es.unsubscribe(a2.ref, classOf[CC]) should ===(true) - es.unsubscribe(a3.ref, classOf[CCATBT]) should ===(true) - es.publish(tm1) - es.publish(tm2) - a1.receiveMsg() should ===(tm2) - a2.receiveMsg() should ===(tm2) - a3.receiveMsg() should ===(tm1) - es.unsubscribe(a1.ref, classOf[AT]) should ===(true) - es.unsubscribe(a2.ref, classOf[BT]) should ===(true) - es.unsubscribe(a3.ref, classOf[CC]) should ===(true) - } - - def `must manage sub-channels using classes and traits (subscribe after publish)`(): Unit = { - val tm1 = new CCATBT - val a1 = Inbox[AT]("subAT") - val a2 = Inbox[BTT]("subBTT") - - es.subscribe(a1.ref, classOf[AT]) should ===(true) - es.publish(tm1) - a1.receiveMsg() should ===(tm1) - a2.hasMessages should ===(false) - es.subscribe(a2.ref, classOf[BTT]) should ===(true) - es.publish(tm1) - a1.receiveMsg() should ===(tm1) - a2.receiveMsg() should ===(tm1) - es.unsubscribe(a1.ref, classOf[AT]) should ===(true) - es.unsubscribe(a2.ref, classOf[BTT]) should ===(true) - } - - def `must watch subscribers`(): Unit = { - val ref = new DebugRef[String](root / "watch", true) - es.subscribe(ref, classOf[String]) should ===(true) - es.subscribe(ref, classOf[String]) should ===(false) - eventually(ref.hasSignal should ===(true)) - val unsubscriber = ref.receiveSignal() match { - case Watch(`ref`, watcher) ⇒ watcher - case other ⇒ fail(s"expected Watch(), got $other") - } - ref.hasSomething should ===(false) - unsubscriber.sorryForNothing.sendSystem(DeathWatchNotification(ref, null)) - eventually(es.subscribe(ref, classOf[String]) should ===(true)) - } - - def `must unsubscribe an actor upon termination`(): Unit = { - val ref = nativeSystem ? TypedSpec.Create(immutable[Done] { case _ ⇒ stopped }, "tester") futureValue Timeout(1.second) - es.subscribe(ref, classOf[Done]) should ===(true) - es.subscribe(ref, classOf[Done]) should ===(false) - ref ! Done - eventually(es.subscribe(ref, classOf[Done]) should ===(true)) - } - - def `must unsubscribe the actor, when it subscribes already in terminated state`(): Unit = { - val ref = nativeSystem ? TypedSpec.Create(stopped[Done], "tester") futureValue Timeout(1.second) - val wait = new DebugRef[Done](root / "wait", true) - ref.sorry.sendSystem(Watch(ref, wait)) - eventually(wait.hasSignal should ===(true)) - wait.receiveSignal() should ===(DeathWatchNotification(ref, null)) - es.subscribe(ref, classOf[Done]) should ===(true) - eventually(es.subscribe(ref, classOf[Done]) should ===(true)) - } - - def `must unwatch an actor from unsubscriber when that actor unsubscribes from the stream`(): Unit = { - val ref = new DebugRef[String](root / "watch", true) - es.subscribe(ref, classOf[String]) should ===(true) - es.subscribe(ref, classOf[String]) should ===(false) - eventually(ref.hasSignal should ===(true)) - val unsubscriber = ref.receiveSignal() match { - case Watch(`ref`, watcher) ⇒ watcher - case other ⇒ fail(s"expected Watch(), got $other") - } - ref.hasSomething should ===(false) - es.unsubscribe(ref) - eventually(ref.hasSignal should ===(true)) - ref.receiveSignal() should ===(Unwatch(ref, unsubscriber)) - } - - def `must unwatch an actor from unsubscriber when that actor unsubscribes from channels it subscribed`(): Unit = { - val ref = new DebugRef[AnyRef](root / "watch", true) - es.subscribe(ref, classOf[String]) should ===(true) - es.subscribe(ref, classOf[String]) should ===(false) - es.subscribe(ref, classOf[Integer]) should ===(true) - es.subscribe(ref, classOf[Integer]) should ===(false) - eventually(ref.hasSignal should ===(true)) - val unsubscriber = ref.receiveSignal() match { - case Watch(`ref`, watcher) ⇒ watcher - case other ⇒ fail(s"expected Watch(), got $other") - } - ref.hasSomething should ===(false) - es.unsubscribe(ref, classOf[Integer]) - Thread.sleep(50) - ref.hasSomething should ===(false) - es.unsubscribe(ref, classOf[String]) - eventually(ref.hasSignal should ===(true)) - ref.receiveSignal() should ===(Unwatch(ref, unsubscriber)) - } - - } - -} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/FunctionRefSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/FunctionRefSpec.scala deleted file mode 100644 index 00c2d07017..0000000000 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/FunctionRefSpec.scala +++ /dev/null @@ -1,176 +0,0 @@ -/** - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.actor.typed -package internal - -import akka.actor.InvalidMessageException - -import scala.concurrent.{ Future, Promise } - -class FunctionRefSpec extends TypedSpecSetup { - - object `A FunctionRef` { - - def `must forward messages that are received after getting the ActorRef (completed later)`(): Unit = { - val p = Promise[ActorRef[String]] - val ref = ActorRef(p.future) - val target = new DebugRef[String](ref.path / "target", true) - p.success(target) - ref ! "42" - ref ! "43" - target.receiveAll() should ===(Left(Watch(target, ref)) :: Right("42") :: Right("43") :: Nil) - } - - def `must forward messages that are received after getting the ActorRef (already completed)`(): Unit = { - val target = new DebugRef[String](ActorRef.FuturePath / "target", true) - val f = Future.successful(target) - val ref = ActorRef(f) - ref ! "42" - ref ! "43" - target.receiveAll() should ===(Right("42") :: Right("43") :: Nil) - } - - def `must forward messages that are received before getting the ActorRef`(): Unit = { - val p = Promise[ActorRef[String]] - val ref = ActorRef(p.future) - ref ! "42" - ref ! "43" - val target = new DebugRef[String](ref.path / "target", true) - p.success(target) - target.receiveAll() should ===(Right("42") :: Right("43") :: Left(Watch(target, ref)) :: Nil) - } - - def `must notify watchers when the future fails`(): Unit = { - val p = Promise[ActorRef[String]] - val ref = ActorRef(p.future) - val client1 = new DebugRef(ref.path / "c1", true) - - ref.sorry.sendSystem(Watch(ref, client1)) - client1.hasSomething should ===(false) - - p.failure(new Exception) - client1.receiveSignal() should ===(DeathWatchNotification(ref, null)) - client1.hasSomething should ===(false) - - val client2 = new DebugRef(ref.path / "c2", true) - - ref.sorry.sendSystem(Watch(ref, client2)) - client2.receiveSignal() should ===(DeathWatchNotification(ref, null)) - client2.hasSomething should ===(false) - client1.hasSomething should ===(false) - } - - def `must notify watchers when terminated`(): Unit = { - val p = Promise[ActorRef[String]] - val ref = ActorRef(p.future) - val client1 = new DebugRef(ref.path / "c1", true) - - ref.sorry.sendSystem(Watch(ref, client1)) - client1.hasSomething should ===(false) - - ref.sorry.sendSystem(Terminate()) - client1.receiveSignal() should ===(DeathWatchNotification(ref, null)) - client1.hasSomething should ===(false) - - val client2 = new DebugRef(ref.path / "c2", true) - - ref.sorry.sendSystem(Watch(ref, client2)) - client2.receiveSignal() should ===(DeathWatchNotification(ref, null)) - client2.hasSomething should ===(false) - client1.hasSomething should ===(false) - } - - def `must notify watchers when terminated after receiving the target`(): Unit = { - val p = Promise[ActorRef[String]] - val ref = ActorRef(p.future) - val client1 = new DebugRef(ref.path / "c1", true) - - ref.sorry.sendSystem(Watch(ref, client1)) - client1.hasSomething should ===(false) - - val target = new DebugRef[String](ref.path / "target", true) - p.success(target) - ref ! "42" - ref ! "43" - target.receiveAll() should ===(Left(Watch(target, ref)) :: Right("42") :: Right("43") :: Nil) - - ref.sorry.sendSystem(Terminate()) - client1.receiveSignal() should ===(DeathWatchNotification(ref, null)) - client1.hasSomething should ===(false) - target.receiveAll() should ===(Left(Unwatch(target, ref)) :: Nil) - - val client2 = new DebugRef(ref.path / "c2", true) - - ref.sorry.sendSystem(Watch(ref, client2)) - client2.receiveSignal() should ===(DeathWatchNotification(ref, null)) - client2.hasSomething should ===(false) - client1.hasSomething should ===(false) - } - - def `must notify watchers when receiving the target after terminating`(): Unit = { - val p = Promise[ActorRef[String]] - val ref = ActorRef(p.future) - val client1 = new DebugRef(ref.path / "c1", true) - - ref.sorry.sendSystem(Watch(ref, client1)) - client1.hasSomething should ===(false) - - ref.sorry.sendSystem(Terminate()) - client1.receiveSignal() should ===(DeathWatchNotification(ref, null)) - client1.hasSomething should ===(false) - - val target = new DebugRef[String](ref.path / "target", true) - p.success(target) - ref ! "42" - ref ! "43" - target.hasSomething should ===(false) - - val client2 = new DebugRef(ref.path / "c2", true) - - ref.sorry.sendSystem(Watch(ref, client2)) - client2.receiveSignal() should ===(DeathWatchNotification(ref, null)) - client2.hasSomething should ===(false) - client1.hasSomething should ===(false) - } - - def `must notify watchers when the target ActorRef terminates`(): Unit = { - val p = Promise[ActorRef[String]] - val ref = ActorRef(p.future) - val client1 = new DebugRef(ref.path / "c1", true) - - ref.sorry.sendSystem(Watch(ref, client1)) - client1.hasSomething should ===(false) - - val target = new DebugRef[String](ref.path / "target", true) - p.success(target) - ref ! "42" - ref ! "43" - target.receiveAll() should ===(Left(Watch(target, ref)) :: Right("42") :: Right("43") :: Nil) - - ref.sorry.sendSystem(DeathWatchNotification(target, null)) - client1.receiveSignal() should ===(DeathWatchNotification(ref, null)) - client1.hasSomething should ===(false) - target.hasSomething should ===(false) - - val client2 = new DebugRef(ref.path / "c2", true) - - ref.sorry.sendSystem(Watch(ref, client2)) - client2.receiveSignal() should ===(DeathWatchNotification(ref, null)) - client2.hasSomething should ===(false) - client1.hasSomething should ===(false) - } - - def `must not allow null messages`(): Unit = { - val p = Promise[ActorRef[String]] - val ref = ActorRef(p.future) - - intercept[InvalidMessageException] { - ref ! null - } - - } - - } - -} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala index 424533e9c1..e463f4cd68 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/receptionist/LocalReceptionistSpec.scala @@ -183,7 +183,5 @@ class LocalReceptionistSpec extends TypedSpec with Eventually { } - object `A Receptionist (native)` extends CommonTests with NativeSystem object `A Receptionist (adapted)` extends CommonTests with AdaptedSystem - } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutablePartial.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutablePartial.scala index c8981527d6..26cd878665 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutablePartial.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ImmutablePartial.scala @@ -13,10 +13,6 @@ import scala.concurrent.duration.DurationInt @RunWith(classOf[JUnitRunner]) final class ImmutablePartialSpec extends TypedSpec { - final object `An Actor.immutablePartial behavior (native)` - extends Tests - with NativeSystem - final object `An Actor.immutablePartial behavior (adapted)` extends Tests with AdaptedSystem diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/OnSignalSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/OnSignalSpec.scala index 0d0514f071..85309a88d2 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/OnSignalSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/OnSignalSpec.scala @@ -13,8 +13,6 @@ import org.scalatest.junit.JUnitRunner @RunWith(classOf[JUnitRunner]) final class OnSignalSpec extends TypedSpec { - final object `An Actor.onSignal behavior (native)` extends Tests with NativeSystem - final object `An Actor.onSignal behavior (adapted)` extends Tests with AdaptedSystem trait Tests extends StartSupport { diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala index 5a59361c70..9667b8bed1 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/IntroSpec.scala @@ -1,7 +1,7 @@ /** * Copyright (C) 2014-2017 Lightbend Inc. */ -package docs.akka.actor.typed +package docs.akka.typed //#imports import akka.actor.typed._ @@ -118,22 +118,24 @@ class IntroSpec extends TypedSpec { //#chatroom-gabbler //#chatroom-main - val main: Behavior[akka.NotUsed] = + val main: Behavior[String] = Actor.deferred { ctx ⇒ val chatRoom = ctx.spawn(ChatRoom.behavior, "chatroom") val gabblerRef = ctx.spawn(gabbler, "gabbler") ctx.watch(gabblerRef) - chatRoom ! GetSession("ol’ Gabbler", gabblerRef) - Actor.immutable[akka.NotUsed] { - (_, _) ⇒ Actor.unhandled + Actor.immutablePartial[String] { + case (_, "go") ⇒ + chatRoom ! GetSession("ol’ Gabbler", gabblerRef) + Actor.same } onSignal { - case (ctx, Terminated(ref)) ⇒ + case (_, Terminated(ref)) ⇒ Actor.stopped } } val system = ActorSystem(main, "ChatRoomDemo") + system ! "go" Await.result(system.whenTerminated, 3.seconds) //#chatroom-main } diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MutableIntroSpec.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MutableIntroSpec.scala index b666cd15ae..b596ea5574 100644 --- a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MutableIntroSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/MutableIntroSpec.scala @@ -1,14 +1,14 @@ /** * Copyright (C) 2014-2017 Lightbend Inc. */ -package docs.akka.actor.typed +package docs.akka.typed //#imports +import akka.NotUsed import akka.actor.typed._ import akka.actor.typed.scaladsl.Actor import akka.actor.typed.scaladsl.ActorContext -import akka.actor.typed.scaladsl.AskPattern._ -import scala.concurrent.Future + import scala.concurrent.duration._ import scala.concurrent.Await //#imports @@ -90,22 +90,25 @@ class MutableIntroSpec extends TypedSpec { //#chatroom-gabbler //#chatroom-main - val main: Behavior[akka.NotUsed] = + val main: Behavior[String] = Actor.deferred { ctx ⇒ val chatRoom = ctx.spawn(ChatRoom.behavior(), "chatroom") val gabblerRef = ctx.spawn(gabbler, "gabbler") ctx.watch(gabblerRef) - chatRoom ! GetSession("ol’ Gabbler", gabblerRef) - Actor.immutable[akka.NotUsed] { - (_, _) ⇒ Actor.unhandled + Actor.immutablePartial[String] { + case (_, "go") ⇒ + chatRoom ! GetSession("ol’ Gabbler", gabblerRef) + Actor.same } onSignal { - case (ctx, Terminated(ref)) ⇒ + case (_, Terminated(ref)) ⇒ + println("Stopping guardian") Actor.stopped } } val system = ActorSystem(main, "ChatRoomDemo") + system ! "go" Await.result(system.whenTerminated, 1.second) //#chatroom-main } diff --git a/akka-actor-typed/src/main/resources/reference.conf b/akka-actor-typed/src/main/resources/reference.conf index c620d00512..58bf0119c3 100644 --- a/akka-actor-typed/src/main/resources/reference.conf +++ b/akka-actor-typed/src/main/resources/reference.conf @@ -1,17 +1,4 @@ akka.typed { - - # The loggers to be started during ActorSystem startup. These must name - # classes of type akka.typed.Logger that provide the initial logger behavior. - loggers = ["akka.typed.DefaultLogger"] - - # FQCN of the logging filter that avoids rendering log messages below the current - # main loglevel. - # Must have a constructor for arguments of type (Settings, EventStream). - logging-filter = "akka.typed.DefaultLoggingFilter" - - # Default mailbox capacity for actors where nothing else is configured by - # their parent, see also class akka.typed.MailboxCapacity - mailbox-capacity = 1000 # List FQCN of `akka.typed.ExtensionId`s which shall be loaded at actor system startup. # Should be on the format: 'extensions = ["com.example.MyExtId1", "com.example.MyExtId2"]' etc. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRef.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRef.scala index c67800e72f..d6bad39b62 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRef.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorRef.scala @@ -16,12 +16,10 @@ import scala.util.Success * Actor instance. Sending a message to an Actor that has terminated before * receiving the message will lead to that message being discarded; such * messages are delivered to the [[DeadLetter]] channel of the - * [[EventStream]] on a best effort basis + * [[akka.typed.EventStream]] on a best effort basis * (i.e. this delivery is not reliable). */ trait ActorRef[-T] extends java.lang.Comparable[ActorRef[_]] { - this: internal.ActorRefImpl[T] ⇒ - /** * Send a message to the Actor referenced by this ActorRef using *at-most-once* * messaging semantics. @@ -66,22 +64,10 @@ object ActorRef { * Create an ActorRef from a Future, buffering up to the given number of * messages in while the Future is not fulfilled. */ - def apply[T](f: Future[ActorRef[T]], bufferSize: Int = 1000): ActorRef[T] = + private[akka] def apply[T](f: Future[ActorRef[T]], bufferSize: Int = 1000): ActorRef[T] = f.value match { // an AdaptedActorSystem will always create refs eagerly, so it will take this path case Some(Success(ref)) ⇒ ref - // for other ActorSystem implementations, this might work, it currently doesn't work - // for the adapted system, because the typed FutureRef cannot be watched from untyped - case x ⇒ new internal.FutureRef(FuturePath, bufferSize, f) + case _ ⇒ throw new IllegalStateException("Only expecting completed futures until the native actor system is implemented") } - - /** - * Create an ActorRef by providing a function that is invoked for sending - * messages and a termination callback. - */ - def apply[T](send: (T, internal.FunctionRef[T]) ⇒ Unit, terminate: internal.FunctionRef[T] ⇒ Unit): ActorRef[T] = - new internal.FunctionRef(FunctionPath, send, terminate) - - private[typed] val FuturePath = a.RootActorPath(a.Address("akka.actor.typed.internal", "future")) - private[typed] val FunctionPath = a.RootActorPath(a.Address("akka.actor.typed.internal", "function")) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala index b5d17f01e1..89b421eba9 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala @@ -17,6 +17,7 @@ import akka.annotation.DoNotInherit import akka.annotation.ApiMayChange import java.util.Optional +import akka.actor.BootstrapSetup import akka.actor.typed.receptionist.Receptionist import akka.typed.EventStream @@ -29,8 +30,7 @@ import akka.typed.EventStream */ @DoNotInherit @ApiMayChange -abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: internal.ActorRefImpl[T] ⇒ - +abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { /** * The name of this actor system, used to distinguish multiple ones within * the same JVM & class loader. @@ -139,11 +139,11 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: inter * Ask the system guardian of this system to create an actor from the given * behavior and props and with the given name. The name does not need to * be unique since the guardian will prefix it with a running number when - * creating the child actor. The timeout sets the timeout used for the [[akka.actor.typed.scaladsl.AskPattern$]] + * creating the child actor. The timeout sets the timeout used for the [[akka.actor.typed.scaladsl.AskPattern]] * invocation when asking the guardian. * * The returned Future of [[ActorRef]] may be converted into an [[ActorRef]] - * to which messages can immediately be sent by using the [[ActorRef$.apply[T](s*]] + * to which messages can immediately be sent by using the [[ActorRef.apply[T](s*]] * method. */ def systemActorOf[U](behavior: Behavior[U], name: String, props: Props = Props.empty)(implicit timeout: Timeout): Future[ActorRef[U]] @@ -159,9 +159,7 @@ object ActorSystem { import internal._ /** - * Scala API: Create an ActorSystem implementation that is optimized for running - * Akka Typed [[Behavior]] hierarchies—this system cannot run untyped - * [[akka.actor.Actor]] instances. + * Scala API: Create an ActorSystem */ def apply[T]( guardianBehavior: Behavior[T], @@ -173,13 +171,11 @@ object ActorSystem { Behavior.validateAsInitial(guardianBehavior) val cl = classLoader.getOrElse(akka.actor.ActorSystem.findClassLoader()) val appConfig = config.getOrElse(ConfigFactory.load(cl)) - new ActorSystemImpl(name, appConfig, cl, executionContext, guardianBehavior, guardianProps) + createInternal(name, guardianBehavior, guardianProps, Some(appConfig), classLoader, executionContext) } /** - * Java API: Create an ActorSystem implementation that is optimized for running - * Akka Typed [[Behavior]] hierarchies—this system cannot run untyped - * [[akka.actor.Actor]] instances. + * Java API: Create an ActorSystem */ def create[T]( guardianBehavior: Behavior[T], @@ -193,9 +189,7 @@ object ActorSystem { } /** - * Java API: Create an ActorSystem implementation that is optimized for running - * Akka Typed [[Behavior]] hierarchies—this system cannot run untyped - * [[akka.actor.Actor]] instances. + * Java API: Create an ActorSystem */ def create[T](guardianBehavior: Behavior[T], name: String): ActorSystem[T] = apply(guardianBehavior, name) @@ -205,26 +199,22 @@ object ActorSystem { * which runs Akka Typed [[Behavior]] on an emulation layer. In this * system typed and untyped actors can coexist. */ - def adapter[T](name: String, guardianBehavior: Behavior[T], - guardianProps: Props = Props.empty, - config: Option[Config] = None, - classLoader: Option[ClassLoader] = None, - executionContext: Option[ExecutionContext] = None, - actorSystemSettings: ActorSystemSetup = ActorSystemSetup.empty): ActorSystem[T] = { - - // TODO I'm not sure how useful this mode is for end-users. It has the limitation that untyped top level - // actors can't be created, because we have a custom user guardian. I would imagine that if you have - // a system of both untyped and typed actors (e.g. adding some typed actors to an existing application) - // you would start an untyped.ActorSystem and spawn typed actors from that system or from untyped actors. - // Same thing with `wrap` below. + private def createInternal[T](name: String, guardianBehavior: Behavior[T], + guardianProps: Props = Props.empty, + config: Option[Config] = None, + classLoader: Option[ClassLoader] = None, + executionContext: Option[ExecutionContext] = None): ActorSystem[T] = { Behavior.validateAsInitial(guardianBehavior) val cl = classLoader.getOrElse(akka.actor.ActorSystem.findClassLoader()) val appConfig = config.getOrElse(ConfigFactory.load(cl)) + val setup = ActorSystemSetup(BootstrapSetup(classLoader, config, executionContext)) val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext, - Some(PropsAdapter(() ⇒ guardianBehavior, guardianProps)), actorSystemSettings) + Some(PropsAdapter(() ⇒ guardianBehavior, guardianProps)), setup) untyped.start() - ActorSystemAdapter.AdapterExtension(untyped).adapter + + val adapter: ActorSystemAdapter.AdapterExtension = ActorSystemAdapter.AdapterExtension(untyped) + adapter.adapter } /** @@ -267,10 +257,6 @@ final class Settings(val config: Config, val untyped: a.ActorSystem.Settings, va value } - val Loggers = getSL("Loggers", "akka.typed.loggers") - val LoggingFilter = getS("LoggingFilter", "akka.typed.logging-filter") - val DefaultMailboxCapacity = getI("DefaultMailboxCapacity", "akka.typed.mailbox-capacity") - foundSettings = foundSettings.reverse override def toString: String = s"Settings($name,\n ${foundSettings.mkString("\n ")})" diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Props.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Props.scala index 2b38d76f23..44cf1567ed 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Props.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Props.scala @@ -22,8 +22,8 @@ object Props { /** * Data structure for describing an actor’s props details like which - * executor to run it on. For each type of setting (e.g. [[DispatcherSelector]] - * or [[MailboxCapacity]]) the FIRST occurrence is used when creating the + * executor to run it on. For each type of setting (e.g. [[DispatcherSelector]]) + * the FIRST occurrence is used when creating the * actor; this means that adding configuration using the "with" methods * overrides what was configured previously. * @@ -78,11 +78,6 @@ abstract class Props private[akka] () extends Product with Serializable { */ def withDispatcherFromExecutionContext(ec: ExecutionContext): Props = DispatcherFromExecutionContext(ec, this) - /** - * Prepend the given mailbox capacity configuration to this Props. - */ - def withMailboxCapacity(capacity: Int): Props = MailboxCapacity(capacity, this) - /** * Find the first occurrence of a configuration node of the given type, falling * back to the provided default if none is found. @@ -140,19 +135,6 @@ abstract class Props private[akka] () extends Product with Serializable { } } -/** - * Configure the maximum mailbox capacity for the actor. If more messages are - * enqueued because the actor does not process them quickly enough then further - * messages will be dropped. - * - * The default mailbox capacity that is used when this option is not given is - * taken from the `akka.typed.mailbox-capacity` configuration setting. - */ -@InternalApi -private[akka] final case class MailboxCapacity(capacity: Int, next: Props = Props.empty) extends Props { - private[akka] override def withNext(next: Props): Props = copy(next = next) -} - /** * The empty configuration node, used as a terminator for the internally linked * list of each Props. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorCell.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorCell.scala deleted file mode 100644 index 0627d5c216..0000000000 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorCell.scala +++ /dev/null @@ -1,492 +0,0 @@ -/** - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.actor.typed -package internal - -import akka.actor.{ Cancellable, InvalidActorNameException, InvalidMessageException } -import akka.util.Helpers - -import scala.concurrent.duration.FiniteDuration -import akka.dispatch.ExecutionContexts - -import scala.concurrent.ExecutionContextExecutor -import akka.util.Unsafe.{ instance ⇒ unsafe } -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.Queue - -import scala.annotation.tailrec -import scala.util.control.NonFatal -import scala.util.control.Exception.Catcher -import akka.event.Logging.Error -import akka.event.Logging -import akka.actor.typed.Behavior.StoppedBehavior -import akka.util.OptionVal -import akka.actor.typed.Behavior.UntypedBehavior - -/** - * INTERNAL API - */ -object ActorCell { - /* - * Description of the _status field bit structure: - * - * bit 0-29: activation count (number of (system)messages) - * bit 30: terminating (or terminated) - * bit 31: terminated - * - * Activation count is a bit special: - * 0 means inactive - * 1 means active without normal messages (i.e. only system messages) - * N means active with N-1 normal messages (plus possibly system messages) - */ - final val terminatingShift = 30 - - final val activationMask = (1 << terminatingShift) - 1 - // ensure that if all processors enqueue “the last message” concurrently, there is still no overflow - val maxActivations = activationMask - Runtime.getRuntime.availableProcessors - 1 - - final val terminatingBit = 1 << terminatingShift - final val terminatedBit = 1 << 31 - - def isTerminating(status: Int): Boolean = (status & terminatingBit) != 0 - def isTerminated(status: Int): Boolean = status < 0 - def isActive(status: Int): Boolean = (status & ~activationMask) == 0 - - def activations(status: Int): Int = status & activationMask - def messageCount(status: Int): Int = Math.max(0, activations(status) - 1) - - val statusOffset = unsafe.objectFieldOffset(classOf[ActorCell[_]].getDeclaredField("_status")) - val systemQueueOffset = unsafe.objectFieldOffset(classOf[ActorCell[_]].getDeclaredField("_systemQueue")) - - final val DefaultState = 0 - final val SuspendedState = 1 - final val SuspendedWaitForChildrenState = 2 - - /** compile time constant */ - final val Debug = false -} - -/** - * INTERNAL API - */ -private[typed] class ActorCell[T]( - override val system: ActorSystem[Nothing], - protected val initialBehavior: Behavior[T], - override val executionContext: ExecutionContextExecutor, - override val mailboxCapacity: Int, - val parent: ActorRefImpl[Nothing]) - extends ActorContextImpl[T] with Runnable with SupervisionMechanics[T] with DeathWatch[T] { - import ActorCell._ - - /* - * Implementation of the ActorContext trait. - */ - - protected var childrenMap = Map.empty[String, ActorRefImpl[Nothing]] - protected var terminatingMap = Map.empty[String, ActorRefImpl[Nothing]] - override def children: Iterable[ActorRef[Nothing]] = childrenMap.values - override def child(name: String): Option[ActorRef[Nothing]] = childrenMap.get(name) - protected def removeChild(actor: ActorRefImpl[Nothing]): Unit = { - val n = actor.path.name - childrenMap.get(n) match { - case Some(`actor`) ⇒ childrenMap -= n - case _ ⇒ - terminatingMap.get(n) match { - case Some(`actor`) ⇒ terminatingMap -= n - case _ ⇒ - } - } - } - private[typed] def terminating: Iterable[ActorRef[Nothing]] = terminatingMap.values - - private var _self: ActorRefImpl[T] = _ - private[typed] def setSelf(ref: ActorRefImpl[T]): Unit = _self = ref - override def self: ActorRefImpl[T] = _self - - protected def ctx: ActorContext[T] = this - - override def spawn[U](behavior: Behavior[U], name: String, props: Props): ActorRef[U] = { - if (behavior.isInstanceOf[UntypedBehavior[_]]) - throw new IllegalArgumentException(s"${behavior.getClass.getName} requires untyped ActorSystem") - if (childrenMap contains name) throw InvalidActorNameException(s"actor name [$name] is not unique") - if (terminatingMap contains name) throw InvalidActorNameException(s"actor name [$name] is not yet free") - val dispatcher = props.firstOrElse[DispatcherSelector](DispatcherFromExecutionContext(executionContext)) - val capacity = props.firstOrElse(MailboxCapacity(system.settings.DefaultMailboxCapacity)) - val cell = new ActorCell[U](system, Behavior.validateAsInitial(behavior), system.dispatchers.lookup(dispatcher), capacity.capacity, self) - // TODO uid is still needed - val ref = new LocalActorRef[U](self.path / name, cell) - cell.setSelf(ref) - childrenMap = childrenMap.updated(name, ref) - ref.sendSystem(Create()) - ref - } - - private var nextName = 0L - override def spawnAnonymous[U](behavior: Behavior[U], props: Props): ActorRef[U] = { - val name = Helpers.base64(nextName) - nextName += 1 - spawn(behavior, name, props) - } - - override def stop[U](child: ActorRef[U]): Boolean = { - val name = child.path.name - childrenMap.get(name) match { - case None ⇒ false - case Some(ref) if ref != child ⇒ false - case Some(ref) ⇒ - ref.sendSystem(Terminate()) - childrenMap -= name - terminatingMap = terminatingMap.updated(name, ref) - true - } - } - - protected def stopAll(): Unit = { - childrenMap.valuesIterator.foreach { ref ⇒ - ref.sendSystem(Terminate()) - terminatingMap = terminatingMap.updated(ref.path.name, ref) - } - childrenMap = Map.empty - } - - override def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): Cancellable = - system.scheduler.scheduleOnce(delay)(target ! msg)(ExecutionContexts.sameThreadExecutionContext) - - override private[akka] def internalSpawnAdapter[U](f: U ⇒ T, _name: String): ActorRef[U] = { - val baseName = Helpers.base64(nextName, new java.lang.StringBuilder("$!")) - nextName += 1 - val name = if (_name != "") s"$baseName-${_name}" else baseName - val ref = new FunctionRef[U]( - self.path / name, - (msg, _) ⇒ { val m = f(msg); if (m != null) send(m) }, - (self) ⇒ sendSystem(DeathWatchNotification(self, null))) - childrenMap = childrenMap.updated(name, ref) - ref - } - - private[this] var receiveTimeout: (FiniteDuration, T) = null - override def setReceiveTimeout(d: FiniteDuration, msg: T): Unit = { - if (Debug) println(s"$self setting receive timeout of $d, msg $msg") - receiveTimeout = (d, msg) - } - override def cancelReceiveTimeout(): Unit = { - if (Debug) println(s"$self canceling receive timeout") - receiveTimeout = null - } - - /* - * Implementation of the invocation mechanics. - */ - - // see comment in companion object for details - @volatile private[this] var _status: Int = 0 - protected[typed] def getStatus: Int = _status - private[this] val queue: Queue[T] = new ConcurrentLinkedQueue[T] - private[typed] def peekMessage: T = queue.peek() - private[this] val maxQueue: Int = Math.min(mailboxCapacity, maxActivations) - @volatile private[this] var _systemQueue: LatestFirstSystemMessageList = SystemMessageList.LNil - - protected def maySend: Boolean = !isTerminating - protected def isTerminating: Boolean = ActorCell.isTerminating(_status) - protected def setTerminating(): Unit = if (!ActorCell.isTerminating(_status)) unsafe.getAndAddInt(this, statusOffset, terminatingBit) - protected def setClosed(): Unit = if (!isTerminated(_status)) unsafe.getAndAddInt(this, statusOffset, terminatedBit) - - private def handleException: Catcher[Unit] = { - case e: InterruptedException ⇒ - publish(Error(e, self.path.toString, getClass, "interrupted during message send")) - Thread.currentThread.interrupt() - case NonFatal(e) ⇒ - publish(Error(e, self.path.toString, getClass, "swallowing exception during message send")) - } - - def send(msg: T): Unit = { - if (msg == null) throw new InvalidMessageException("[null] is not an allowed message") - try { - val old = unsafe.getAndAddInt(this, statusOffset, 1) - val oldActivations = activations(old) - // this is not an off-by-one: #msgs is activations-1 if >0 - if (oldActivations > maxQueue) { - if (Debug) println(s"[$thread] $self NOT enqueueing $msg at status $old ($oldActivations > $maxQueue)") - // cannot enqueue, need to give back activation token - unsafe.getAndAddInt(this, statusOffset, -1) - system.eventStream.publish(Dropped(msg, self)) - } else if (ActorCell.isTerminating(old)) { - if (Debug) println(s"[$thread] $self NOT enqueueing $msg at status $old (is terminating)") - unsafe.getAndAddInt(this, statusOffset, -1) - system.deadLetters ! msg - } else { - if (Debug) println(s"[$thread] $self enqueueing $msg at status $old") - // need to enqueue; if the actor sees the token but not the message, it will reschedule - queue.add(msg) - if (oldActivations == 0) { - if (Debug) println(s"[$thread] $self being woken up") - unsafe.getAndAddInt(this, statusOffset, 1) // the first 1 was just the “active” bit, now add 1msg - // if the actor was not yet running, set it in motion; spurious wakeups don’t hurt - executionContext.execute(this) - } - } - } catch handleException - } - - def sendSystem(signal: SystemMessage): Unit = { - @tailrec def needToActivate(): Boolean = { - val currentList = _systemQueue - if (currentList.head == NoMessage) { - system.deadLetters.sorry.sendSystem(signal) - false - } else { - unsafe.compareAndSwapObject(this, systemQueueOffset, currentList.head, (signal :: currentList).head) || { - signal.unlink() - needToActivate() - } - } - } - try { - if (needToActivate()) { - val old = unsafe.getAndAddInt(this, statusOffset, 1) - if (isTerminated(old)) { - // nothing to do - if (Debug) println(s"[$thread] $self NOT enqueueing $signal: terminating") - unsafe.getAndAddInt(this, statusOffset, -1) - } else if (activations(old) == 0) { - // all is good: we signaled the transition to active - if (Debug) println(s"[$thread] $self enqueueing $signal: activating") - executionContext.execute(this) - } else { - // take back that token: we didn’t actually enqueue a normal message and the actor was already active - if (Debug) println(s"[$thread] $self enqueueing $signal: already active") - unsafe.getAndAddInt(this, statusOffset, -1) - } - } else if (Debug) println(s"[$thread] $self NOT enqueueing $signal: terminated") - } catch handleException - } - - /** - * Main entry point into the actor: the ActorCell is a Runnable that is - * enqueued in its Executor whenever it needs to run. The _status field is - * used for coordination such that it is never enqueued more than once at - * any given time, because that would break the Actor Model. - * - * The idea here is to process at most as many messages as were in queued - * upon entry of this method, interleaving each normal message with the - * processing of all system messages that may have accumulated in the - * meantime. If at the end of the processing messages remain in the queue - * then this cell is rescheduled. - * - * All coordination occurs via a single Int field that is only updated in - * wait-free manner (LOCK XADD via unsafe.getAndAddInt), where conflicts are - * resolved by compensating actions. For a description of the bit usage see - * the companion object’s source code. - */ - override final def run(): Unit = { - if (Debug) println(s"[$thread] $self entering run(): interrupted=${Thread.currentThread.isInterrupted}") - val status = _status - val msgs = messageCount(status) - - var processed = 0 - @tailrec def process(): Unit = { - if (processAllSystemMessages() && processed < msgs) { - val msg = queue.poll() - if (msg != null) { - processed += 1 - processMessage(msg) - process() - } - } - } - - try { - unscheduleReceiveTimeout() - if (!isTerminated(status)) { - process() - scheduleReceiveTimeout() - } - } catch { - case NonFatal(ex) ⇒ - fail(ex) - case ie: InterruptedException ⇒ - fail(ie) - if (Debug) println(s"[$thread] $self interrupting due to catching InterruptedException") - Thread.currentThread.interrupt() - } - - // Returns `true` if it should be rescheduled. - // This method shouldn't throw apart from fatal errors. - def postProcess(): Boolean = { - // also remove the general activation token - processed += 1 - val prev = unsafe.getAndAddInt(this, statusOffset, -processed) - val now = prev - processed - if (isTerminated(now)) { - false // we’re finished, don't reschedule - } else if (activations(now) > 0) { - // normal messages pending: reverse the deactivation - unsafe.getAndAddInt(this, statusOffset, 1) - true // ... and reschedule - } else if (_systemQueue.head != null) { - /* - * System message was enqueued after our last processing, we now need to - * race against the other party because the enqueue might have happened - * before the deactivation (above) and hence not scheduled. - * - * If we win, we reschedule; if we lose, we must remove the attempted - * activation token again. - */ - val again = unsafe.getAndAddInt(this, statusOffset, 1) - if (activations(again) == 0) true //reschedule - else { - unsafe.getAndAddInt(this, statusOffset, -1) - false // don't reschedule - } - } else { - false // don't reschedule - } - } - - if (postProcess()) - try executionContext.execute(this) catch { - case NonFatal(e) ⇒ - // we can just hope that the actor will receive another message at some - // point to wake it up again—assuming that the failure to enqueue the cell is transient - fail(e) - } - - if (Debug) println(s"[$thread] $self exiting run(): interrupted=${Thread.currentThread.isInterrupted}") - } - - protected[typed] var behavior: Behavior[T] = _ - - protected def next(b: Behavior[T], msg: Any): Unit = { - if (Behavior.isUnhandled(b)) unhandled(msg) - else { - b match { - case s: StoppedBehavior[T] ⇒ - // use StoppedBehavior with previous behavior or an explicitly given `postStop` behavior - // until Terminate is received, i.e until finishTerminate is invoked, and there PostStop - // will be signaled to the previous/postStop behavior - s.postStop match { - case OptionVal.None ⇒ - // use previous as the postStop behavior - behavior = new Behavior.StoppedBehavior(OptionVal.Some(behavior)) - case OptionVal.Some(postStop) ⇒ - // use the given postStop behavior, but canonicalize it - behavior = new Behavior.StoppedBehavior(OptionVal.Some(Behavior.canonicalize(postStop, behavior, ctx))) - } - self.sendSystem(Terminate()) - case _ ⇒ - behavior = Behavior.canonicalize(b, behavior, ctx) - } - } - } - - private def unhandled(msg: Any): Unit = msg match { - case Terminated(ref) ⇒ fail(DeathPactException(ref)) - case _ ⇒ // nothing to do - } - - private[this] var receiveTimeoutScheduled: Cancellable = null - private def unscheduleReceiveTimeout(): Unit = - if (receiveTimeoutScheduled ne null) { - receiveTimeoutScheduled.cancel() - receiveTimeoutScheduled = null - } - private def scheduleReceiveTimeout(): Unit = - receiveTimeout match { - case (d, msg) ⇒ - receiveTimeoutScheduled = schedule(d, self, msg) - case other ⇒ - // nothing to do - } - - /** - * Process the messages in the mailbox - */ - private def processMessage(msg: T): Unit = { - if (Debug) println(s"[$thread] $self processing message $msg") - next(Behavior.interpretMessage(behavior, this, msg), msg) - if (Thread.interrupted()) - throw new InterruptedException("Interrupted while processing actor messages") - } - - @tailrec - private def systemDrain(next: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = { - val currentList = _systemQueue - if (currentList.head == NoMessage) SystemMessageList.ENil - else if (unsafe.compareAndSwapObject(this, systemQueueOffset, currentList.head, next.head)) currentList.reverse - else systemDrain(next) - } - - /** - * Will at least try to process all queued system messages: in case of - * failure simply drop and go on to the next, because there is nothing to - * restart here (failure is in ActorCell somewhere …). In case the mailbox - * becomes closed (because of processing a Terminate message), dump all - * already dequeued message to deadLetters. - */ - private def processAllSystemMessages(): Boolean = { - var interruption: Throwable = null - var messageList = systemDrain(SystemMessageList.LNil) - var continue = true - while (messageList.nonEmpty && continue) { - val msg = messageList.head - messageList = messageList.tail - msg.unlink() - continue = - try processSignal(msg) - catch { - case ie: InterruptedException ⇒ - fail(ie) - if (Debug) println(s"[$thread] $self interrupting due to catching InterruptedException during system message processing") - Thread.currentThread.interrupt() - true - case ex @ (NonFatal(_) | _: AssertionError) ⇒ - fail(ex) - true - } - /* - * the second part of the condition is necessary to avoid logging an InterruptedException - * from the systemGuardian during shutdown - */ - if (Thread.interrupted() && system.whenTerminated.value.isEmpty) - interruption = new InterruptedException("Interrupted while processing system messages") - // don’t ever execute normal message when system message present! - if (messageList.isEmpty && continue) messageList = systemDrain(SystemMessageList.LNil) - } - /* - * if we closed the mailbox, we must dump the remaining system messages - * to deadLetters (this is essential for DeathWatch) - */ - val dlm = system.deadLetters - if (isTerminated(_status) && messageList.isEmpty) messageList = systemDrain(new LatestFirstSystemMessageList(NoMessage)) - while (messageList.nonEmpty) { - val msg = messageList.head - messageList = messageList.tail - if (Debug) println(s"[$thread] $self dropping dead system message $msg") - msg.unlink() - try dlm.sorry.sendSystem(msg) - catch { - case e: InterruptedException ⇒ interruption = e - case NonFatal(e) ⇒ system.eventStream.publish( - Error(e, self.path.toString, this.getClass, "error while enqueuing " + msg + " to deadLetters: " + e.getMessage)) - } - if (isTerminated(_status) && messageList.isEmpty) messageList = systemDrain(new LatestFirstSystemMessageList(NoMessage)) - } - // if we got an interrupted exception while handling system messages, then rethrow it - if (interruption ne null) { - if (Debug) println(s"[$thread] $self throwing interruption") - Thread.interrupted() // clear interrupted flag before throwing according to java convention - throw interruption - } - continue - } - - // logging is not the main purpose, and if it fails there’s nothing we can do - protected final def publish(e: Logging.LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) ⇒ } - - protected final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass - - private def thread: String = Thread.currentThread.getName - - override def toString: String = f"ActorCell($self, status = ${_status}%08x, queue = $queue)" -} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorRefImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorRefImpl.scala index 05e4420b59..de630f6385 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorRefImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorRefImpl.scala @@ -4,18 +4,6 @@ package akka.actor.typed package internal -import akka.{ actor ⇒ a } -import akka.dispatch.sysmsg._ -import akka.util.Unsafe.{ instance ⇒ unsafe } - -import scala.annotation.tailrec -import scala.util.control.NonFatal -import scala.concurrent.Future -import java.util.ArrayList - -import akka.actor.InvalidMessageException - -import scala.util.{ Failure, Success } import scala.annotation.unchecked.uncheckedVariance /** @@ -24,7 +12,7 @@ import scala.annotation.unchecked.uncheckedVariance * available in the package object, enabling `ref.toImpl` (or `ref.toImplN` * for `ActorRef[Nothing]`—Scala refuses to infer `Nothing` as a type parameter). */ -private[typed] trait ActorRefImpl[-T] extends ActorRef[T] { +private[akka] trait ActorRefImpl[-T] extends ActorRef[T] { def sendSystem(signal: SystemMessage): Unit def isLocal: Boolean @@ -53,185 +41,3 @@ private[typed] trait ActorRefImpl[-T] extends ActorRef[T] { override def toString: String = s"Actor[${path}#${path.uid}]" } - -/** - * A local ActorRef that is backed by an asynchronous [[ActorCell]]. - */ -private[typed] class LocalActorRef[-T](override val path: a.ActorPath, cell: ActorCell[T]) - extends ActorRef[T] with ActorRefImpl[T] { - override def tell(msg: T): Unit = cell.send(msg) - override def sendSystem(signal: SystemMessage): Unit = cell.sendSystem(signal) - final override def isLocal: Boolean = true - private[typed] def getCell: ActorCell[_] = cell -} - -/** - * A local ActorRef that just discards everything that is sent to it. This - * implies that it effectively has an infinite lifecycle, i.e. it never - * terminates (meaning: no Hawking radiation). - */ -private[typed] object BlackholeActorRef - extends ActorRef[Any] with ActorRefImpl[Any] { - override val path: a.ActorPath = a.RootActorPath(a.Address("akka.actor.typed.internal", "blackhole")) - override def tell(msg: Any): Unit = () - override def sendSystem(signal: SystemMessage): Unit = () - final override def isLocal: Boolean = true -} - -/** - * A local synchronous ActorRef that invokes the given function for every message send. - * This reference can be watched and will do the right thing when it receives a [[DeathWatchNotification]]. - * This reference cannot watch other references. - */ -private[akka] final class FunctionRef[-T]( - _path: a.ActorPath, - send: (T, FunctionRef[T]) ⇒ Unit, - _terminate: FunctionRef[T] ⇒ Unit) - extends WatchableRef[T](_path) { - - override def tell(msg: T): Unit = { - if (msg == null) throw new InvalidMessageException("[null] is not an allowed message") - if (isAlive) - try send(msg, this) catch { - case NonFatal(ex) ⇒ // nothing we can do here - } - else () // we don’t have deadLetters available - } - - override def sendSystem(signal: SystemMessage): Unit = signal match { - case Create() ⇒ // nothing to do - case DeathWatchNotification(ref, cause) ⇒ // we’re not watching, and we’re not a parent either - case Terminate() ⇒ doTerminate() - case Watch(watchee, watcher) ⇒ if (watchee == this && watcher != this) addWatcher(watcher.sorryForNothing) - case Unwatch(watchee, watcher) ⇒ if (watchee == this && watcher != this) remWatcher(watcher.sorryForNothing) - case NoMessage ⇒ // nothing to do - } - - override def isLocal = true - - override def terminate(): Unit = _terminate(this) -} - -/** - * The mechanics for synthetic ActorRefs that have a lifecycle and support being watched. - */ -private[typed] abstract class WatchableRef[-T](override val path: a.ActorPath) extends ActorRef[T] with ActorRefImpl[T] { - import WatchableRef._ - - /** - * Callback that is invoked when this ref has terminated. Even if doTerminate() is - * called multiple times, this callback is invoked only once. - */ - protected def terminate(): Unit - - type S = Set[ActorRefImpl[Nothing]] - @volatile private[this] var _watchedBy: S = Set.empty - - protected def isAlive: Boolean = _watchedBy != null - - protected def doTerminate(): Unit = { - val watchedBy = unsafe.getAndSetObject(this, watchedByOffset, null).asInstanceOf[S] - if (watchedBy != null) { - try terminate() catch { case NonFatal(ex) ⇒ } - if (watchedBy.nonEmpty) watchedBy foreach sendTerminated - } - } - - private def sendTerminated(watcher: ActorRefImpl[Nothing]): Unit = - watcher.sendSystem(DeathWatchNotification(this, null)) - - @tailrec final protected def addWatcher(watcher: ActorRefImpl[Nothing]): Unit = - _watchedBy match { - case null ⇒ sendTerminated(watcher) - case watchedBy ⇒ - if (!watchedBy.contains(watcher)) - if (!unsafe.compareAndSwapObject(this, watchedByOffset, watchedBy, watchedBy + watcher)) - addWatcher(watcher) // try again - } - - @tailrec final protected def remWatcher(watcher: ActorRefImpl[Nothing]): Unit = { - _watchedBy match { - case null ⇒ // do nothing... - case watchedBy ⇒ - if (watchedBy.contains(watcher)) - if (!unsafe.compareAndSwapObject(this, watchedByOffset, watchedBy, watchedBy - watcher)) - remWatcher(watcher) // try again - } - } -} - -private[typed] object WatchableRef { - val watchedByOffset = unsafe.objectFieldOffset(classOf[WatchableRef[_]].getDeclaredField("_watchedBy")) -} - -/** - * A Future of an ActorRef can quite easily be wrapped as an ActorRef since no - * promises are made about delivery delays: as long as the Future is not ready - * messages will be queued, afterwards they get sent without waiting. - */ -private[typed] class FutureRef[-T](_path: a.ActorPath, bufferSize: Int, f: Future[ActorRef[T]]) extends WatchableRef[T](_path) { - import FutureRef._ - - // Keep in synch with `targetOffset` in companion (could also change on mixing in a trait). - @volatile private[this] var _target: Either[ArrayList[T], ActorRef[T]] = Left(new ArrayList[T]) - - f.onComplete { - case Success(ref) ⇒ - _target match { - case l @ Left(list) ⇒ - list.synchronized { - val it = list.iterator - while (it.hasNext) ref ! it.next() - if (unsafe.compareAndSwapObject(this, targetOffset, l, Right(ref))) - ref.sorry.sendSystem(Watch(ref, this)) - // if this fails, concurrent termination has won and there is no point in watching - } - case _ ⇒ // already terminated - } - case Failure(ex) ⇒ doTerminate() - }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) - - override def terminate(): Unit = { - val old = unsafe.getAndSetObject(this, targetOffset, Right(BlackholeActorRef)) - old match { - case Right(target: ActorRef[_]) ⇒ target.sorry.sendSystem(Unwatch(target, this)) - case _ ⇒ // nothing to do - } - } - - override def tell(msg: T): Unit = { - if (msg == null) throw new InvalidMessageException("[null] is not an allowed message") - _target match { - case Left(list) ⇒ - list.synchronized { - if (_target.isRight) tell(msg) - else if (list.size < bufferSize) list.add(msg) - } - case Right(ref) ⇒ ref ! msg - } - } - - override def sendSystem(signal: SystemMessage): Unit = signal match { - case Create() ⇒ // nothing to do - case DeathWatchNotification(ref, cause) ⇒ - _target = Right(BlackholeActorRef) // avoid sending Unwatch() in this case - doTerminate() // this can only be the result of watching the target - case Terminate() ⇒ doTerminate() - case Watch(watchee, watcher) ⇒ if (watchee == this && watcher != this) addWatcher(watcher.sorryForNothing) - case Unwatch(watchee, watcher) ⇒ if (watchee == this && watcher != this) remWatcher(watcher.sorryForNothing) - case NoMessage ⇒ // nothing to do - } - - override def isLocal = true -} - -private[typed] object FutureRef { - val targetOffset = { - val fields = classOf[FutureRef[_]].getDeclaredFields.toList - // On Scala 2.12, the field's name is exactly "_target" (and it's private), earlier Scala versions compile the val to a public field that's name mangled to "akka.actor.typed$internal$FutureRef$$_target" - val targetField = fields.find(_.getName.endsWith("_target")) - assert(targetField.nonEmpty, s"Could not find _target field in FutureRef class among fields $fields.") - - unsafe.objectFieldOffset(targetField.get) - } -} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorSystemImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorSystemImpl.scala deleted file mode 100644 index 3e421a1a50..0000000000 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorSystemImpl.scala +++ /dev/null @@ -1,301 +0,0 @@ -/** - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.actor.typed -package internal - -import com.typesafe.config.Config - -import scala.concurrent.ExecutionContext -import java.util.concurrent.ThreadFactory - -import scala.concurrent.{ ExecutionContextExecutor, Future } -import akka.{ actor ⇒ a, dispatch ⇒ d, event ⇒ e } - -import scala.util.control.NonFatal -import scala.util.control.ControlThrowable -import scala.collection.immutable -import akka.actor.typed.Dispatchers - -import scala.concurrent.Promise -import java.util.concurrent.ConcurrentSkipListSet -import java.util.concurrent.atomic.AtomicBoolean - -import scala.collection.JavaConverters._ -import scala.util.Success -import akka.util.Timeout -import java.io.Closeable -import java.util.concurrent.atomic.AtomicInteger - -import akka.actor.typed.receptionist.Receptionist -import akka.actor.typed.scaladsl.AskPattern -import akka.typed.{ BusLogging, EventStream } - -object ActorSystemImpl { - - sealed trait SystemCommand - case class CreateSystemActor[T](behavior: Behavior[T], name: String, props: Props)(val replyTo: ActorRef[ActorRef[T]]) extends SystemCommand - - val systemGuardianBehavior: Behavior[SystemCommand] = { - import scaladsl.Actor - Actor.deferred { _ ⇒ - var i = 1 - Actor.immutable { - case (ctx, create: CreateSystemActor[t]) ⇒ - val name = s"$i-${create.name}" - i += 1 - create.replyTo ! ctx.spawn(create.behavior, name, create.props) - Actor.same - } - } - } -} - -/* - * Actor Ideas: - - • remoting/clustering is just another set of actors/extensions - -Receptionist: - - • should be a new kind of Extension (where lookup yields ActorRef) - • obtaining a reference may either give a single remote one or a dynamic local proxy that routes to available instances—distinguished using a “stableDestination” flag (for read-your-writes semantics) - • perhaps fold sharding into this: how message routing is done should not matter - -Streams: - - • make new implementation of ActorMaterializer that leverages Envelope removal - • all internal actor creation must be asynchronous - • could offer ActorSystem extension for materializer - • remove downcasts to ActorMaterializer in akka-stream package—replace by proper function passing or Materializer APIs where needed (should make Gearpump happier as well) - • add new Sink/Source for ActorRef[] - -Distributed Data: - - • create new Behaviors around the logic - - * - */ - -private[typed] class ActorSystemImpl[-T]( - override val name: String, - _config: Config, - _cl: ClassLoader, - _ec: Option[ExecutionContext], - _userGuardianBehavior: Behavior[T], - _userGuardianProps: Props) - extends ActorSystem[T] with ActorRef[T] with ActorRefImpl[T] with ExtensionsImpl { - - import ActorSystemImpl._ - - if (!name.matches("""^[a-zA-Z0-9][a-zA-Z0-9-_]*$""")) - throw new IllegalArgumentException( - "invalid ActorSystem name [" + name + - "], must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-' or '_')") - - final override val path: a.ActorPath = a.RootActorPath(a.Address("akka", name)) / "user" - - override val settings: Settings = new Settings(_cl, _config, name) - - override def logConfiguration(): Unit = log.info(settings.toString) - - protected def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler = - new Thread.UncaughtExceptionHandler() { - def uncaughtException(thread: Thread, cause: Throwable): Unit = { - cause match { - case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable ⇒ log.error(cause, "Uncaught error from thread [{}]", thread.getName) - case _ ⇒ - if (settings.untyped.JvmExitOnFatalError) { - try { - log.error(cause, "Uncaught error from thread [{}] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled", thread.getName) - import System.err - err.print("Uncaught error from thread [") - err.print(thread.getName) - err.print("] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[") - err.print(name) - err.println("]") - cause.printStackTrace(System.err) - System.err.flush() - } finally { - System.exit(-1) - } - } else { - log.error(cause, "Uncaught fatal error from thread [{}] shutting down ActorSystem [{}]", thread.getName, name) - terminate() - } - } - } - } - - override val threadFactory: d.MonitorableThreadFactory = - d.MonitorableThreadFactory(name, settings.untyped.Daemonicity, Option(_cl), uncaughtExceptionHandler) - - override val dynamicAccess: a.DynamicAccess = new a.ReflectiveDynamicAccess(_cl) - - private val loggerIds = new AtomicInteger - def loggerId(): Int = loggerIds.incrementAndGet() - - // this provides basic logging (to stdout) until .start() is called below - override val eventStream = new EventStreamImpl(settings.untyped.DebugEventStream)(settings.untyped.LoggerStartTimeout) - eventStream.startStdoutLogger(settings) - - override val logFilter: e.LoggingFilter = { - val arguments = Vector(classOf[Settings] → settings, classOf[EventStream] → eventStream) - dynamicAccess.createInstanceFor[e.LoggingFilter](settings.LoggingFilter, arguments).get - } - - override val log: e.LoggingAdapter = new BusLogging(eventStream, getClass.getName + "(" + name + ")", this.getClass, logFilter) - - /** - * Create the scheduler service. This one needs one special behavior: if - * Closeable, it MUST execute all outstanding tasks upon .close() in order - * to properly shutdown all dispatchers. - * - * Furthermore, this timer service MUST throw IllegalStateException if it - * cannot schedule a task. Once scheduled, the task MUST be executed. If - * executed upon close(), the task may execute before its timeout. - */ - protected def createScheduler(): a.Scheduler = - dynamicAccess.createInstanceFor[a.Scheduler](settings.untyped.SchedulerClass, immutable.Seq( - classOf[Config] → settings.config, - classOf[e.LoggingAdapter] → log, - classOf[ThreadFactory] → threadFactory.withName(threadFactory.name + "-scheduler"))).get - - override val scheduler: a.Scheduler = createScheduler() - private def closeScheduler(): Unit = scheduler match { - case x: Closeable ⇒ x.close() - case _ ⇒ - } - - /** - * Stub implementation of untyped EventStream to allow reuse of previous DispatcherConfigurator infrastructure - */ - private object eventStreamStub extends e.EventStream(null, false) { - override def subscribe(ref: a.ActorRef, ch: Class[_]): Boolean = - throw new UnsupportedOperationException("Cannot use this eventstream for subscribing") - override def publish(event: AnyRef): Unit = eventStream.publish(event) - } - /** - * Stub implementation of untyped Mailboxes to allow reuse of previous DispatcherConfigurator infrastructure - */ - private val mailboxesStub = new d.Mailboxes(settings.untyped, eventStreamStub, dynamicAccess, - new a.MinimalActorRef { - override def path = rootPath - override def provider = throw new UnsupportedOperationException("Mailboxes’ deadletter reference does not provide") - }) - - private val dispatcherPrequisites = - d.DefaultDispatcherPrerequisites(threadFactory, eventStreamStub, scheduler, dynamicAccess, settings.untyped, mailboxesStub, _ec) - override val dispatchers: Dispatchers = new DispatchersImpl(settings, log, dispatcherPrequisites) - override val executionContext: ExecutionContextExecutor = dispatchers.lookup(DispatcherDefault()) - - override val startTime: Long = System.currentTimeMillis() - override def uptime: Long = (System.currentTimeMillis() - startTime) / 1000 - - private val terminationPromise: Promise[Terminated] = Promise() - - private val rootPath: a.ActorPath = a.RootActorPath(a.Address("akka", name)) - - private val topLevelActors = new ConcurrentSkipListSet[ActorRefImpl[Nothing]] - private val terminateTriggered = new AtomicBoolean - private val theOneWhoWalksTheBubblesOfSpaceTime: ActorRefImpl[Nothing] = - new ActorRef[Nothing] with ActorRefImpl[Nothing] { - override def path: a.ActorPath = rootPath - override def tell(msg: Nothing): Unit = - throw new UnsupportedOperationException("Cannot send to theOneWhoWalksTheBubblesOfSpaceTime") - override def sendSystem(signal: SystemMessage): Unit = signal match { - case Terminate() ⇒ - if (terminateTriggered.compareAndSet(false, true)) - topLevelActors.asScala.foreach(ref ⇒ ref.sendSystem(Terminate())) - case DeathWatchNotification(ref, _) ⇒ - topLevelActors.remove(ref) - if (topLevelActors.isEmpty) { - if (terminationPromise.tryComplete(Success(Terminated(this)(null)))) { - eventStream.stopDefaultLoggers(ActorSystemImpl.this) - closeScheduler() - dispatchers.shutdown() - } - } else if (terminateTriggered.compareAndSet(false, true)) - topLevelActors.asScala.foreach(ref ⇒ ref.sendSystem(Terminate())) - case _ ⇒ // ignore - } - override def isLocal: Boolean = true - } - - private def createTopLevel[U](behavior: Behavior[U], name: String, props: Props): ActorRefImpl[U] = { - val dispatcher = props.firstOrElse[DispatcherSelector](DispatcherFromExecutionContext(executionContext)) - val capacity = props.firstOrElse(MailboxCapacity(settings.DefaultMailboxCapacity)) - val cell = new ActorCell(this, behavior, dispatchers.lookup(dispatcher), capacity.capacity, theOneWhoWalksTheBubblesOfSpaceTime) - val ref = new LocalActorRef(rootPath / name, cell) - cell.setSelf(ref) - topLevelActors.add(ref) - ref.sendSystem(Create()) - ref - } - - private val systemGuardian: ActorRefImpl[SystemCommand] = createTopLevel(systemGuardianBehavior, "system", EmptyProps) - - private val userGuardian: ActorRefImpl[T] = createTopLevel(_userGuardianBehavior, "user", _userGuardianProps) - - // now we can start up the loggers - eventStream.startUnsubscriber(this) - eventStream.startDefaultLoggers(this) - - loadExtensions() - - override def terminate(): Future[Terminated] = { - theOneWhoWalksTheBubblesOfSpaceTime.sendSystem(Terminate()) - terminationPromise.future - } - override def whenTerminated: Future[Terminated] = terminationPromise.future - - override def deadLetters[U]: ActorRefImpl[U] = - new ActorRef[U] with ActorRefImpl[U] { - override def path: a.ActorPath = rootPath - override def tell(msg: U): Unit = eventStream.publish(DeadLetter(msg)) - override def sendSystem(signal: SystemMessage): Unit = { - signal match { - case Watch(watchee, watcher) ⇒ watcher.sorryForNothing.sendSystem(DeathWatchNotification(watchee, null)) - case _ ⇒ // all good - } - eventStream.publish(DeadLetter(signal)) - } - override def isLocal: Boolean = true - } - - override def tell(msg: T): Unit = userGuardian.tell(msg) - override def sendSystem(msg: SystemMessage): Unit = userGuardian.sendSystem(msg) - override def isLocal: Boolean = true - - def systemActorOf[U](behavior: Behavior[U], name: String, props: Props)(implicit timeout: Timeout): Future[ActorRef[U]] = { - import AskPattern._ - implicit val sched = scheduler - systemGuardian ? CreateSystemActor(behavior, name, props) - } - - def printTree: String = { - def printNode(node: ActorRefImpl[Nothing], indent: String): String = { - node match { - case wc: LocalActorRef[_] ⇒ - val cell = wc.getCell - (if (indent.isEmpty) "-> " else indent.dropRight(1) + "⌊-> ") + - node.path.name + " " + e.Logging.simpleName(node) + " " + - (if (cell.behavior ne null) cell.behavior.getClass else "null") + - " status=" + cell.getStatus + - " nextMsg=" + cell.peekMessage + - (if (cell.children.isEmpty && cell.terminating.isEmpty) "" else "\n") + - ({ - val terminating = cell.terminating.toSeq.sorted.map(r ⇒ printNode(r.sorryForNothing, indent + " T")) - val children = cell.children.toSeq.sorted - val bulk = children.dropRight(1) map (r ⇒ printNode(r.sorryForNothing, indent + " |")) - terminating ++ bulk ++ (children.lastOption map (r ⇒ printNode(r.sorryForNothing, indent + " "))) - } mkString "\n") - case _ ⇒ - indent + node.path.name + " " + e.Logging.simpleName(node) - } - } - printNode(systemGuardian, "") + "\n" + - printNode(userGuardian, "") - } - -} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/DispatchersImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/DispatchersImpl.scala deleted file mode 100644 index 9116388c58..0000000000 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/DispatchersImpl.scala +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.actor.typed -package internal - -import scala.concurrent.ExecutionContextExecutor -import java.util.concurrent.{ Executors, ExecutorService } -import akka.event.LoggingAdapter -import akka.{ actor ⇒ a, dispatch ⇒ d, event ⇒ e } -import java.util.concurrent.ConcurrentHashMap -import akka.ConfigurationException -import com.typesafe.config.{ Config, ConfigFactory } -import akka.dispatch.ExecutionContexts -import java.util.concurrent.ConcurrentSkipListSet -import java.util.Comparator - -class DispatchersImpl(settings: Settings, log: LoggingAdapter, prerequisites: d.DispatcherPrerequisites) extends Dispatchers { - - def lookup(selector: DispatcherSelector): ExecutionContextExecutor = - selector match { - case DispatcherDefault(_) ⇒ defaultGlobalDispatcher - case DispatcherFromConfig(path, _) ⇒ lookup(path) - case DispatcherFromExecutor(ex: ExecutionContextExecutor, _) ⇒ ex - case DispatcherFromExecutor(ex, _) ⇒ d.ExecutionContexts.fromExecutor(ex) - case DispatcherFromExecutionContext(ec: ExecutionContextExecutor, _) ⇒ ec - case DispatcherFromExecutionContext(ec, _) ⇒ throw new UnsupportedOperationException("I thought all ExecutionContexts are also Executors?") // FIXME - } - - def shutdown(): Unit = { - val i = allCreatedServices.iterator() - while (i.hasNext) i.next().shutdown() - allCreatedServices.clear() - } - - import Dispatchers._ - - val cachingConfig = new d.CachingConfig(settings.config) - - val defaultDispatcherConfig: Config = - idConfig(DefaultDispatcherId).withFallback(settings.config.getConfig(DefaultDispatcherId)) - - /** - * The one and only default dispatcher. - */ - def defaultGlobalDispatcher: ExecutionContextExecutor = lookup(DefaultDispatcherId) - - private val dispatcherConfigurators = new ConcurrentHashMap[String, MessageDispatcherConfigurator] - private val allCreatedServices = new ConcurrentSkipListSet[ExecutorService](new Comparator[ExecutorService] { - override def compare(left: ExecutorService, right: ExecutorService): Int = { - val l = if (left == null) 0 else left.hashCode - val r = if (right == null) 0 else right.hashCode - if (l < r) -1 else if (l > r) 1 else 0 - } - }) - - /** - * Returns a dispatcher as specified in configuration. Please note that this - * method _may_ create and return a NEW dispatcher, _every_ call. - * - * Throws ConfigurationException if the specified dispatcher cannot be found in the configuration. - */ - def lookup(id: String): ExecutionContextExecutor = - lookupConfigurator(id).dispatcher() match { - case es: ExecutorService ⇒ - allCreatedServices.add(es) - es - case ece ⇒ ece - } - - /** - * Checks that the configuration provides a section for the given dispatcher. - * This does not guarantee that no ConfigurationException will be thrown when - * using this dispatcher, because the details can only be checked by trying - * to instantiate it, which might be undesirable when just checking. - */ - def hasDispatcher(id: String): Boolean = dispatcherConfigurators.containsKey(id) || cachingConfig.hasPath(id) - - private def lookupConfigurator(id: String): MessageDispatcherConfigurator = { - dispatcherConfigurators.get(id) match { - case null ⇒ - // It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup. - // That shouldn't happen often and in case it does the actual ExecutorService isn't - // created until used, i.e. cheap. - val newConfigurator = - if (cachingConfig.hasPath(id)) configuratorFrom(config(id)) - else throw new ConfigurationException(s"Dispatcher [$id] not configured") - - dispatcherConfigurators.putIfAbsent(id, newConfigurator) match { - case null ⇒ newConfigurator - case existing ⇒ existing - } - - case existing ⇒ existing - } - } - - /** - * INTERNAL API - */ - private[akka] def config(id: String): Config = { - import scala.collection.JavaConverters._ - def simpleName = id.substring(id.lastIndexOf('.') + 1) - idConfig(id) - .withFallback(settings.config.getConfig(id)) - .withFallback(ConfigFactory.parseMap(Map("name" → simpleName).asJava)) - .withFallback(defaultDispatcherConfig) - } - - private def idConfig(id: String): Config = { - import scala.collection.JavaConverters._ - ConfigFactory.parseMap(Map("id" → id).asJava) - } - - /** - * INTERNAL API - * - * Creates a MessageDispatcherConfigurator from a Config. - * - * The Config must also contain a `id` property, which is the identifier of the dispatcher. - * - * Throws: IllegalArgumentException if the value of "type" is not valid - * IllegalArgumentException if it cannot create the MessageDispatcherConfigurator - */ - private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = { - if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render) - - cfg.getString("type") match { - case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites) - case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites) - case fqn ⇒ - val args = List(classOf[Config] → cfg) - prerequisites.dynamicAccess.createInstanceFor[DispatcherConfigurator](fqn, args).recover({ - case exception ⇒ - throw new ConfigurationException( - ("Cannot instantiate DispatcherConfigurator type [%s], defined in [%s], " + - "make sure it has constructor with [com.typesafe.config.Config] and " + - "[akka.dispatch.DispatcherPrerequisites] parameters") - .format(fqn, cfg.getString("id")), exception) - }).get - } - } -} - -/** - * Base class to be used for hooking in new dispatchers into Dispatchers. - */ -abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: d.DispatcherPrerequisites) { - - val config: Config = new d.CachingConfig(_config) - - /** - * Returns an instance of MessageDispatcher given the configuration. - * Depending on the needs the implementation may return a new instance for - * each invocation or return the same instance every time. - */ - def dispatcher(): ExecutionContextExecutor - - def configureExecutor(): d.ExecutorServiceConfigurator = { - def configurator(executor: String): d.ExecutorServiceConfigurator = executor match { - case null | "" | "fork-join-executor" ⇒ new d.ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites) - case "thread-pool-executor" ⇒ new d.ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites) - case fqcn ⇒ - val args = List( - classOf[Config] → config, - classOf[d.DispatcherPrerequisites] → prerequisites) - prerequisites.dynamicAccess.createInstanceFor[d.ExecutorServiceConfigurator](fqcn, args).recover({ - case exception ⇒ throw new IllegalArgumentException( - ("Cannot instantiate ExecutorServiceConfigurator (\"executor = [%s]\"), defined in [%s], " + - "make sure it has an accessible constructor with a [%s,%s] signature") - .format(fqcn, config.getString("id"), classOf[Config], classOf[d.DispatcherPrerequisites]), exception) - }).get - } - - config.getString("executor") match { - case "default-executor" ⇒ new d.DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback"))) - case other ⇒ configurator(other) - } - } -} - -/** - * Configurator for creating [[akka.dispatch.Dispatcher]]. - * Returns the same dispatcher instance for for each invocation - * of the `dispatcher()` method. - */ -class DispatcherConfigurator(config: Config, prerequisites: d.DispatcherPrerequisites) - extends MessageDispatcherConfigurator(config, prerequisites) { - - private val instance = ExecutionContexts.fromExecutorService( - configureExecutor().createExecutorServiceFactory(config.getString("id"), prerequisites.threadFactory) - .createExecutorService) - - /** - * Returns the same dispatcher instance for each invocation - */ - override def dispatcher(): ExecutionContextExecutor = instance -} - -/** - * Configurator for creating [[akka.dispatch.PinnedDispatcher]]. - * Returns new dispatcher instance for for each invocation - * of the `dispatcher()` method. - */ -class PinnedDispatcherConfigurator(config: Config, prerequisites: d.DispatcherPrerequisites) - extends MessageDispatcherConfigurator(config, prerequisites) { - - private val threadPoolConfig: d.ThreadPoolConfig = configureExecutor() match { - case e: d.ThreadPoolExecutorConfigurator ⇒ e.threadPoolConfig - case other ⇒ - prerequisites.eventStream.publish( - e.Logging.Warning( - "PinnedDispatcherConfigurator", - this.getClass, - "PinnedDispatcher [%s] not configured to use ThreadPoolExecutor, falling back to default config.".format( - config.getString("id")))) - d.ThreadPoolConfig() - } - - private val factory = threadPoolConfig.createExecutorServiceFactory(config.getString("id"), prerequisites.threadFactory) - - /** - * Creates new dispatcher for each invocation. - */ - override def dispatcher(): ExecutionContextExecutor = ExecutionContexts.fromExecutorService(factory.createExecutorService) - -} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/EventStreamImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/EventStreamImpl.scala deleted file mode 100644 index dd097015a4..0000000000 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/EventStreamImpl.scala +++ /dev/null @@ -1,339 +0,0 @@ -/** - * Copyright (C) 2016-2017 Lightbend Inc. - */ -package akka.actor.typed -package internal - -import akka.{ actor ⇒ a, event ⇒ e } -import java.util.concurrent.atomic.AtomicReference - -import scala.annotation.tailrec -import scala.concurrent.{ Await, Promise } -import akka.util.{ ReentrantGuard, Subclassification, SubclassifiedIndex } - -import scala.collection.immutable -import java.util.concurrent.TimeoutException - -import akka.util.Timeout -import akka.actor.typed.scaladsl.AskPattern -import akka.typed.{ EventStream, Logger } - -/** - * INTERNAL API - * - * An Akka EventStream is a pub-sub stream of events both system and user generated, - * where subscribers are ActorRefs and the channels are Classes and Events are any java.lang.Object. - * EventStreams employ SubchannelClassification, which means that if you listen to a Class, - * you'll receive any message that is of that type or a subtype. - * - * The debug flag in the constructor toggles if operations on this EventStream should also be published - * as Debug-Events - */ -private[typed] class EventStreamImpl(private val debug: Boolean)(implicit private val timeout: Timeout) extends EventStream { - import e.Logging._ - import EventStreamImpl._ - - private val unsubscriberPromise = Promise[ActorRef[Command]] - private val unsubscriber = ActorRef(unsubscriberPromise.future) - - /** - * ''Must'' be called after actor system is "ready". - * Starts system actor that takes care of unsubscribing subscribers that have terminated. - */ - def startUnsubscriber(sys: ActorSystem[Nothing]): Unit = - unsubscriberPromise.completeWith(sys.systemActorOf(unsubscriberBehavior, "eventStreamUnsubscriber")) - - private val unsubscriberBehavior = { - // TODO avoid depending on dsl here? - import scaladsl.Actor - Actor.deferred[Command] { _ ⇒ - if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"registering unsubscriber with $this")) - Actor.immutable[Command] { (ctx, msg) ⇒ - msg match { - case Register(actor) ⇒ - if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates")) - ctx.watch(actor) - Actor.same - - case UnregisterIfNoMoreSubscribedChannels(actor) if hasSubscriptions(actor) ⇒ Actor.same - // hasSubscriptions can be slow, but it's better for this actor to take the hit than the EventStream - - case UnregisterIfNoMoreSubscribedChannels(actor) ⇒ - if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"unwatching $actor, since has no subscriptions")) - ctx.unwatch(actor) - Actor.same - } - } onSignal { - case (_, Terminated(actor)) ⇒ - if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"unsubscribe $actor from $this, because it was terminated")) - unsubscribe(actor) - Actor.same - } - } - } - - private val guard = new ReentrantGuard - private var loggers = Seq.empty[(ActorRef[Logger.Command], ActorRef[LogEvent])] - @volatile private var _logLevel: LogLevel = _ - - /** - * Query currently set log level. See object Logging for more information. - */ - def logLevel = _logLevel - - /** - * Change log level: default loggers (i.e. from configuration file) are - * subscribed/unsubscribed as necessary so that they listen to all levels - * which are at least as severe as the given one. See object Logging for - * more information. - * - * NOTE: if the StandardOutLogger is configured also as normal logger, it - * will not participate in the automatic management of log level - * subscriptions! - */ - def setLogLevel(level: LogLevel): Unit = guard.withGuard { - val logLvl = _logLevel // saves (2 * AllLogLevel.size - 1) volatile reads (because of the loops below) - for { - l ← AllLogLevels - // subscribe if previously ignored and now requested - if l > logLvl && l <= level - (logger, channel) ← loggers - } subscribe(channel, classFor(l)) - for { - l ← AllLogLevels - // unsubscribe if previously registered and now ignored - if l <= logLvl && l > level - (logger, channel) ← loggers - } unsubscribe(channel, classFor(l)) - _logLevel = level - } - - private def setUpStdoutLogger(settings: Settings) { - val level = levelFor(settings.untyped.StdoutLogLevel) getOrElse { - // only log initialization errors directly with StandardOutLogger.print - StandardOutLogger.print(Error(new LoggerException, simpleName(this), this.getClass, "unknown akka.stdout-loglevel " + settings.untyped.StdoutLogLevel)) - ErrorLevel - } - AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(StandardOutLogger, classFor(l))) - guard.withGuard { - loggers :+= internal.BlackholeActorRef → StandardOutLogger - _logLevel = level - } - } - - /** - * Actor-less logging implementation for synchronous logging to standard - * output. This logger is always attached first in order to be able to log - * failures during application start-up, even before normal logging is - * started. Its log level can be defined by configuration setting - * akka.stdout-loglevel. - */ - private[typed] class StandardOutLogger extends ActorRef[LogEvent] with ActorRefImpl[LogEvent] with StdOutLogger { - - override def path: a.ActorPath = StandardOutLoggerPath - - override def tell(message: LogEvent): Unit = - if (message == null) throw a.InvalidMessageException("Message must not be null") - else print(message) - - def isLocal: Boolean = true - - def sendSystem(signal: SystemMessage): Unit = () - - @throws(classOf[java.io.ObjectStreamException]) - protected def writeReplace(): AnyRef = serializedStandardOutLogger - } - - private val serializedStandardOutLogger = new SerializedStandardOutLogger - - @SerialVersionUID(1L) - private[typed] class SerializedStandardOutLogger extends Serializable { - @throws(classOf[java.io.ObjectStreamException]) - private def readResolve(): AnyRef = StandardOutLogger - } - - private val StandardOutLogger = new StandardOutLogger - - private val UnhandledMessageForwarder = { - // TODO avoid depending on dsl here? - import scaladsl.Actor.{ same, immutable } - immutable[a.UnhandledMessage] { - case (_, a.UnhandledMessage(msg, sender, rcp)) ⇒ - publish(Debug(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg)) - same - } - } - - def startStdoutLogger(settings: Settings) { - setUpStdoutLogger(settings) - publish(Debug(simpleName(this), this.getClass, "StandardOutLogger started")) - } - - def startDefaultLoggers(system: ActorSystemImpl[Nothing]) { - val logName = simpleName(this) + "(" + system + ")" - val level = levelFor(system.settings.untyped.LogLevel) getOrElse { - // only log initialization errors directly with StandardOutLogger.print - StandardOutLogger.print(Error(new LoggerException, logName, this.getClass, "unknown akka.loglevel " + system.settings.untyped.LogLevel)) - ErrorLevel - } - try { - val defaultLoggers = system.settings.Loggers match { - case Nil ⇒ classOf[DefaultLogger].getName :: Nil - case loggers ⇒ loggers - } - val myloggers = - for { - loggerName ← defaultLoggers - if loggerName != StandardOutLogger.getClass.getName - } yield { - system.dynamicAccess.getClassFor[Logger](loggerName).map(addLogger(system, _, level, logName)) - .recover({ - case e ⇒ throw new akka.ConfigurationException( - "Logger specified in config can't be loaded [" + loggerName + - "] due to [" + e.toString + "]", e) - }).get - } - guard.withGuard { - loggers = myloggers - _logLevel = level - } - if (system.settings.untyped.DebugUnhandledMessage) - subscribe( - ActorRef( - system.systemActorOf(UnhandledMessageForwarder, "UnhandledMessageForwarder")), - classOf[a.UnhandledMessage]) - publish(Debug(logName, this.getClass, "Default Loggers started")) - if (!(defaultLoggers contains StandardOutLogger.getClass.getName)) { - unsubscribe(StandardOutLogger) - } - } catch { - case e: Exception ⇒ - if (!system.whenTerminated.isCompleted) { - System.err.println("error while starting up loggers") - e.printStackTrace() - throw new akka.ConfigurationException("Could not start logger due to [" + e.toString + "]") - } - } - } - - def stopDefaultLoggers(system: ActorSystem[Nothing]) { - val level = _logLevel // volatile access before reading loggers - if (!(loggers contains StandardOutLogger)) { - setUpStdoutLogger(system.settings) - publish(Debug(simpleName(this), this.getClass, "shutting down: StandardOutLogger started")) - } - for { - (logger, channel) ← loggers - } { - // this is very necessary, else you get infinite loop with DeadLetter - unsubscribe(channel) - import internal._ - logger.sorry.sendSystem(Terminate()) - } - publish(Debug(simpleName(this), this.getClass, "all default loggers stopped")) - } - - private def addLogger( - system: ActorSystemImpl[Nothing], - clazz: Class[_ <: Logger], - level: LogLevel, - logName: String): (ActorRef[Logger.Command], ActorRef[LogEvent]) = { - val name = "log" + system.loggerId() + "-" + simpleName(clazz) - val logger = clazz.newInstance() - val actor = ActorRef(system.systemActorOf(logger.initialBehavior, name, DispatcherFromConfig(system.settings.untyped.LoggersDispatcher))) - import AskPattern._ - implicit val scheduler = system.scheduler - val logChannel = try Await.result(actor ? (Logger.Initialize(this, _: ActorRef[ActorRef[LogEvent]])), timeout.duration) catch { - case ex: TimeoutException ⇒ - publish(Warning(logName, this.getClass, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) - throw ex - } - AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(logChannel, classFor(l))) - publish(Debug(logName, this.getClass, "logger " + name + " started")) - (actor, logChannel) - } - - private val subscriptions = new SubclassifiedIndex[Class[_], ActorRef[Any]]()(subclassification) - - @volatile - private var cache = Map.empty[Class[_], Set[ActorRef[Any]]] - - override def subscribe[T](subscriber: ActorRef[T], channel: Class[T]): Boolean = { - if (subscriber eq null) throw new IllegalArgumentException("subscriber is null") - if (debug) publish(e.Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel)) - unsubscriber ! Register(subscriber) - subscriptions.synchronized { - val diff = subscriptions.addValue(channel, subscriber.upcast[Any]) - addToCache(diff) - diff.nonEmpty - } - } - - override def unsubscribe[T](subscriber: ActorRef[T], channel: Class[T]): Boolean = { - if (subscriber eq null) throw new IllegalArgumentException("subscriber is null") - val ret = subscriptions.synchronized { - val diff = subscriptions.removeValue(channel, subscriber.upcast[Any]) - // removeValue(K, V) does not return the diff to remove from or add to the cache - // but instead the whole set of keys and values that should be updated in the cache - cache ++= diff - diff.nonEmpty - } - if (debug) publish(e.Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from channel " + channel)) - unsubscriber ! UnregisterIfNoMoreSubscribedChannels(subscriber.upcast[Any]) - ret - } - - override def unsubscribe[T](subscriber: ActorRef[T]) { - if (subscriber eq null) throw new IllegalArgumentException("subscriber is null") - subscriptions.synchronized { - removeFromCache(subscriptions.removeValue(subscriber.upcast[Any])) - } - if (debug) publish(e.Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels")) - unsubscriber ! UnregisterIfNoMoreSubscribedChannels(subscriber.upcast[Any]) - } - - override def publish[T](event: T): Unit = { - val c = event.asInstanceOf[AnyRef].getClass - val recv = - if (cache contains c) cache(c) // c will never be removed from cache - else subscriptions.synchronized { - if (cache contains c) cache(c) - else { - addToCache(subscriptions.addKey(c)) - cache(c) - } - } - recv foreach (_ ! event) - } - - /** - * Expensive call! Avoid calling directly from event bus subscribe / unsubscribe. - */ - private def hasSubscriptions(subscriber: ActorRef[Any]): Boolean = - cache.values exists { _ contains subscriber } - - private def removeFromCache(changes: immutable.Seq[(Class[_], Set[ActorRef[Any]])]): Unit = - cache = (cache /: changes) { - case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[ActorRef[Any]]) diff cs) - } - - private def addToCache(changes: immutable.Seq[(Class[_], Set[ActorRef[Any]])]): Unit = - cache = (cache /: changes) { - case (m, (c, cs)) ⇒ m.updated(c, m.getOrElse(c, Set.empty[ActorRef[Any]]) union cs) - } -} - -private[typed] object EventStreamImpl { - - sealed trait Command - final case class Register(actor: ActorRef[Nothing]) extends Command - final case class UnregisterIfNoMoreSubscribedChannels(actor: ActorRef[Any]) extends Command - - private val subclassification = new Subclassification[Class[_]] { - def isEqual(x: Class[_], y: Class[_]) = x == y - def isSubclass(x: Class[_], y: Class[_]) = y isAssignableFrom x - } - - val StandardOutLoggerPath = a.RootActorPath(a.Address("akka.actor.typed.internal", "StandardOutLogger")) - -} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ExtensionsImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ExtensionsImpl.scala index f6381691b7..505d971239 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ExtensionsImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ExtensionsImpl.scala @@ -25,7 +25,7 @@ trait ExtensionsImpl extends Extensions { self: ActorSystem[_] ⇒ /** * Hook for ActorSystem to load extensions on startup */ - protected final def loadExtensions(): Unit = { + final def loadExtensions(): Unit = { /** * @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility) */ @@ -57,10 +57,6 @@ trait ExtensionsImpl extends Extensions { self: ActorSystem[_] ⇒ } } - // eager initialization of CoordinatedShutdown - // TODO coordinated shutdown for akka.actor.typed - // CoordinatedShutdown(self) - loadExtensions("akka.typed.library-extensions", throwOnLoadFail = true) loadExtensions("akka.typed.extensions", throwOnLoadFail = false) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/SupervisionMechanics.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/SupervisionMechanics.scala index ea795ab3a1..759f86e040 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/SupervisionMechanics.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/SupervisionMechanics.scala @@ -14,8 +14,6 @@ import akka.util.OptionVal * INTERNAL API */ private[typed] trait SupervisionMechanics[T] { - import ActorCell._ - /* * INTERFACE WITH ACTOR CELL */ @@ -46,7 +44,6 @@ private[typed] trait SupervisionMechanics[T] { * Process one system message and return whether further messages shall be processed. */ protected def processSignal(message: SystemMessage): Boolean = { - if (ActorCell.Debug) println(s"[${Thread.currentThread.getName}] $self processing system message $message") message match { case Watch(watchee, watcher) ⇒ { addWatcher(watchee.sorryForNothing, watcher.sorryForNothing); true } case Unwatch(watchee, watcher) ⇒ { remWatcher(watchee.sorryForNothing, watcher.sorryForNothing); true } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/SystemMessage.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/SystemMessage.scala index f5a411f527..1d28fa2ba1 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/SystemMessage.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/SystemMessage.scala @@ -197,25 +197,25 @@ private[akka] sealed trait SystemMessage extends Serializable { * INTERNAL API */ @SerialVersionUID(1L) -private[typed] final case class Create() extends SystemMessage +private[akka] final case class Create() extends SystemMessage /** * INTERNAL API */ @SerialVersionUID(1L) -private[typed] final case class Terminate() extends SystemMessage +private[akka] final case class Terminate() extends SystemMessage /** * INTERNAL API */ @SerialVersionUID(1L) -private[typed] final case class Watch(watchee: ActorRef[Nothing], watcher: ActorRef[Nothing]) extends SystemMessage +private[akka] final case class Watch(watchee: ActorRef[Nothing], watcher: ActorRef[Nothing]) extends SystemMessage /** * INTERNAL API */ @SerialVersionUID(1L) -private[typed] final case class Unwatch(watchee: ActorRef[Nothing], watcher: ActorRef[Nothing]) extends SystemMessage +private[akka] final case class Unwatch(watchee: ActorRef[Nothing], watcher: ActorRef[Nothing]) extends SystemMessage /** * INTERNAL API diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala index acccd800b0..0116257e28 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorRefAdapter.scala @@ -17,6 +17,7 @@ import akka.dispatch.sysmsg extends ActorRef[T] with internal.ActorRefImpl[T] { override def path: a.ActorPath = untyped.path + override def tell(msg: T): Unit = { if (msg == null) throw new InvalidMessageException("[null] is not an allowed message") untyped ! msg diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala index 7f5444679f..045f7f1326 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala @@ -25,15 +25,19 @@ import akka.typed.EventStream @InternalApi private[akka] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl) extends ActorSystem[T] with ActorRef[T] with internal.ActorRefImpl[T] with ExtensionsImpl { + loadExtensions() + import ActorRefAdapter.sendSystemMessage // Members declared in akka.actor.typed.ActorRef override def tell(msg: T): Unit = { - if (msg == null) throw new InvalidMessageException("[null] is not an allowed message") + if (msg == null) throw InvalidMessageException("[null] is not an allowed message") untyped.guardian ! msg } + override def isLocal: Boolean = true override def sendSystem(signal: internal.SystemMessage): Unit = sendSystemMessage(untyped.guardian, signal) + final override val path: a.ActorPath = a.RootActorPath(a.Address("akka", untyped.name)) / "user" override def toString: String = untyped.toString @@ -87,6 +91,7 @@ private[akka] object ActorSystemAdapter { class AdapterExtension(system: a.ExtendedActorSystem) extends a.Extension { val adapter = new ActorSystemAdapter(system.asInstanceOf[a.ActorSystemImpl]) } + object AdapterExtension extends a.ExtensionId[AdapterExtension] with a.ExtensionIdProvider { override def get(system: a.ActorSystem): AdapterExtension = super.get(system) override def lookup = AdapterExtension diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala index 313b4d9009..3b99ac8e25 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AskPattern.scala @@ -9,7 +9,6 @@ import akka.actor.InternalActorRef import akka.pattern.AskTimeoutException import akka.pattern.PromiseActorRef import akka.actor.Scheduler -import akka.actor.typed.internal.FunctionRef import akka.actor.RootActorPath import akka.actor.Address import akka.annotation.InternalApi @@ -64,7 +63,7 @@ object AskPattern { ref match { case a: adapt.ActorRefAdapter[_] ⇒ askUntyped(ref, a.untyped, timeout, f) case a: adapt.ActorSystemAdapter[_] ⇒ askUntyped(ref, a.untyped.guardian, timeout, f) - case _ ⇒ ask(ref, timeout, scheduler, f) + case a ⇒ throw new IllegalStateException("Only expect actor references to be ActorRefAdapter or ActorSystemAdapter until native system is implemented: " + a.getClass) } } @@ -99,24 +98,5 @@ object AskPattern { p.future } - private def ask[T, U](actorRef: ActorRef[T], timeout: Timeout, scheduler: Scheduler, f: ActorRef[U] ⇒ T): Future[U] = { - import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext ⇒ ec } - val p = Promise[U] - val ref = new FunctionRef[U]( - AskPath, - (msg, self) ⇒ { - p.trySuccess(msg) - self.sendSystem(akka.actor.typed.internal.Terminate()) - }, - (self) ⇒ if (!p.isCompleted) p.tryFailure(new NoSuchElementException("ask pattern terminated before value was received"))) - actorRef ! f(ref) - val d = timeout.duration - val c = scheduler.scheduleOnce(d)(p.tryFailure(new AskTimeoutException(s"did not receive message within $d")))(ec) - val future = p.future - future.andThen { - case _ ⇒ c.cancel() - }(ec) - } - private[typed] val AskPath = RootActorPath(Address("akka.actor.typed.internal", "ask")) } diff --git a/akka-actor-typed/src/main/scala/akka/typed/EventStream.scala b/akka-actor-typed/src/main/scala/akka/typed/EventStream.scala index c51739ef40..a1b06e38fb 100644 --- a/akka-actor-typed/src/main/scala/akka/typed/EventStream.scala +++ b/akka-actor-typed/src/main/scala/akka/typed/EventStream.scala @@ -64,34 +64,6 @@ object Logger { // FIXME add Mute/Unmute (i.e. the TestEventListener functionality) } -class DefaultLogger extends Logger with StdOutLogger { - import Logger._ - - val initialBehavior = { - // TODO avoid depending on dsl here? - import akka.actor.typed.scaladsl.Actor._ - deferred[Command] { _ ⇒ - immutable[Command] { - case (ctx, Initialize(eventStream, replyTo)) ⇒ - val log = ctx.spawn(deferred[AnyRef] { childCtx ⇒ - - immutable[AnyRef] { - case (_, event: LogEvent) ⇒ - print(event) - same - case _ ⇒ unhandled - } - }, "logger") - - ctx.watch(log) // sign death pact - replyTo ! log - - empty - } - } - } -} - class DefaultLoggingFilter(settings: Settings, eventStream: EventStream) extends e.DefaultLoggingFilter(() ⇒ eventStream.logLevel) /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 91dd7eee7f..83bf5bbb25 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -973,7 +973,9 @@ private[akka] class ActorSystemImpl( private[this] final val ref = new AtomicReference(done) // onComplete never fires twice so safe to avoid null check - upStreamTerminated onComplete { t ⇒ ref.getAndSet(null).complete(t) } + upStreamTerminated onComplete { + t ⇒ ref.getAndSet(null).complete(t) + } /** * Adds a Runnable that will be executed on ActorSystem termination. diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala index 6c15d5fb95..0b170a72a5 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala @@ -116,7 +116,7 @@ class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config) node1UpProbe.expectMsgType[SelfUp] node2UpProbe.expectMsgType[SelfUp] - val cs1 = ClusterSingleton(system) + val cs1: ClusterSingleton = ClusterSingleton(system) val cs2 = ClusterSingleton(adaptedSystem2) val settings = ClusterSingletonSettings(system).withRole("singleton") diff --git a/akka-testkit-typed/src/main/scala/akka/typed/testkit/Inbox.scala b/akka-testkit-typed/src/main/scala/akka/typed/testkit/Inbox.scala index 0853580103..8e048eb53c 100644 --- a/akka-testkit-typed/src/main/scala/akka/typed/testkit/Inbox.scala +++ b/akka-testkit-typed/src/main/scala/akka/typed/testkit/Inbox.scala @@ -6,7 +6,7 @@ package akka.typed.testkit import java.util.concurrent.{ ConcurrentLinkedQueue, ThreadLocalRandom } import akka.actor.{ Address, RootActorPath } -import akka.actor.typed.{ ActorRef, internal } +import akka.actor.typed.ActorRef import scala.annotation.tailrec import scala.collection.immutable @@ -24,7 +24,7 @@ class Inbox[T](name: String) { val ref: ActorRef[T] = { val uid = ThreadLocalRandom.current().nextInt() val path = RootActorPath(Address("akka.actor.typed.inbox", "anonymous")).child(name).withUid(uid) - new internal.FunctionRef[T](path, (msg, self) ⇒ q.add(msg), (self) ⇒ ()) + new FunctionRef[T](path, (msg, self) ⇒ q.add(msg), (self) ⇒ ()) } def receiveMsg(): T = q.poll() match { diff --git a/akka-testkit-typed/src/main/scala/akka/typed/testkit/StubbedActorContext.scala b/akka-testkit-typed/src/main/scala/akka/typed/testkit/StubbedActorContext.scala index e308c612e9..b8b9934d08 100644 --- a/akka-testkit-typed/src/main/scala/akka/typed/testkit/StubbedActorContext.scala +++ b/akka-testkit-typed/src/main/scala/akka/typed/testkit/StubbedActorContext.scala @@ -1,14 +1,108 @@ package akka.typed.testkit +import akka.actor.InvalidMessageException import akka.{ actor ⇒ untyped } import akka.actor.typed._ import akka.util.Helpers +import akka.{ actor ⇒ a } +import akka.util.Unsafe.{ instance ⇒ unsafe } import scala.collection.immutable.TreeMap import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.FiniteDuration import akka.annotation.InternalApi -import akka.actor.typed.internal.ActorContextImpl +import akka.actor.typed.internal.{ ActorContextImpl, ActorRefImpl } + +import scala.annotation.tailrec +import scala.util.control.NonFatal + +/** + * A local synchronous ActorRef that invokes the given function for every message send. + * This reference can be watched and will do the right thing when it receives a [[akka.actor.typed.internal.DeathWatchNotification]]. + * This reference cannot watch other references. + */ +private[akka] final class FunctionRef[-T]( + _path: a.ActorPath, + send: (T, FunctionRef[T]) ⇒ Unit, + _terminate: FunctionRef[T] ⇒ Unit) + extends WatchableRef[T](_path) { + + override def tell(msg: T): Unit = { + if (msg == null) throw InvalidMessageException("[null] is not an allowed message") + if (isAlive) + try send(msg, this) catch { + case NonFatal(ex) ⇒ // nothing we can do here + } + else () // we don’t have deadLetters available + } + + import internal._ + + override def sendSystem(signal: SystemMessage): Unit = signal match { + case internal.Create() ⇒ // nothing to do + case internal.DeathWatchNotification(ref, cause) ⇒ // we’re not watching, and we’re not a parent either + case internal.Terminate() ⇒ doTerminate() + case internal.Watch(watchee, watcher) ⇒ if (watchee == this && watcher != this) addWatcher(watcher.sorryForNothing) + case internal.Unwatch(watchee, watcher) ⇒ if (watchee == this && watcher != this) remWatcher(watcher.sorryForNothing) + case NoMessage ⇒ // nothing to do + } + + override def isLocal = true + + override def terminate(): Unit = _terminate(this) +} + +/** + * The mechanics for synthetic ActorRefs that have a lifecycle and support being watched. + */ +private[typed] abstract class WatchableRef[-T](override val path: a.ActorPath) extends ActorRef[T] with ActorRefImpl[T] { + import WatchableRef._ + + /** + * Callback that is invoked when this ref has terminated. Even if doTerminate() is + * called multiple times, this callback is invoked only once. + */ + protected def terminate(): Unit + + type S = Set[ActorRefImpl[Nothing]] + @volatile private[this] var _watchedBy: S = Set.empty + + protected def isAlive: Boolean = _watchedBy != null + + protected def doTerminate(): Unit = { + val watchedBy = unsafe.getAndSetObject(this, watchedByOffset, null).asInstanceOf[S] + if (watchedBy != null) { + try terminate() catch { case NonFatal(ex) ⇒ } + if (watchedBy.nonEmpty) watchedBy foreach sendTerminated + } + } + + private def sendTerminated(watcher: ActorRefImpl[Nothing]): Unit = + watcher.sendSystem(internal.DeathWatchNotification(this, null)) + + @tailrec final protected def addWatcher(watcher: ActorRefImpl[Nothing]): Unit = + _watchedBy match { + case null ⇒ sendTerminated(watcher) + case watchedBy ⇒ + if (!watchedBy.contains(watcher)) + if (!unsafe.compareAndSwapObject(this, watchedByOffset, watchedBy, watchedBy + watcher)) + addWatcher(watcher) // try again + } + + @tailrec final protected def remWatcher(watcher: ActorRefImpl[Nothing]): Unit = { + _watchedBy match { + case null ⇒ // do nothing... + case watchedBy ⇒ + if (watchedBy.contains(watcher)) + if (!unsafe.compareAndSwapObject(this, watchedByOffset, watchedBy, watchedBy - watcher)) + remWatcher(watcher) // try again + } + } +} + +private[typed] object WatchableRef { + val watchedByOffset = unsafe.objectFieldOffset(classOf[WatchableRef[_]].getDeclaredField("_watchedBy")) +} /** * An [[ActorContext]] for synchronous execution of a [[Behavior]] that @@ -72,10 +166,12 @@ class StubbedActorContext[T]( * INTERNAL API */ @InternalApi private[akka] def internalSpawnAdapter[U](f: U ⇒ T, name: String): ActorRef[U] = { + val n = if (name != "") s"${childName.next()}-$name" else childName.next() val i = Inbox[U](n) _children += i.ref.path.name → i - new internal.FunctionRef[U]( + + new FunctionRef[U]( self.path / i.ref.path.name, (msg, _) ⇒ { val m = f(msg); if (m != null) { selfInbox.ref ! m; i.ref ! msg } }, (self) ⇒ selfInbox.ref.sorry.sendSystem(internal.DeathWatchNotification(self, null))) diff --git a/akka-testkit-typed/src/test/scala/akka/typed/testkit/EffectfulActorContextSpec.scala b/akka-testkit-typed/src/test/scala/akka/typed/testkit/EffectfulActorContextSpec.scala index 09efd00e13..c8f4afe581 100644 --- a/akka-testkit-typed/src/test/scala/akka/typed/testkit/EffectfulActorContextSpec.scala +++ b/akka-testkit-typed/src/test/scala/akka/typed/testkit/EffectfulActorContextSpec.scala @@ -80,7 +80,7 @@ object EffectfulActorContextSpec { class EffectfulActorContextSpec extends FlatSpec with Matchers { - private val props = Props.empty.withMailboxCapacity(10) + private val props = Props.empty "EffectfulActorContext's spawn" should "create children when no props specified" in { val system = ActorSystem.create(Father.init(), "father-system") diff --git a/build.sbt b/build.sbt index a6e026503f..0f532cf99d 100644 --- a/build.sbt +++ b/build.sbt @@ -445,3 +445,11 @@ def akkaModule(name: String): Project = .settings(akka.AkkaBuild.defaultSettings) .settings(akka.Formatting.formatSettings) .enablePlugins(BootstrapGenjavadoc) + +lazy val typedTests = taskKey[Unit]("Runs all the typed tests") +typedTests := { + (test in(actorTyped, Test)).value + (test in(actorTypedTests, Test)).value + (test in(clusterTyped, Test)).value + (test in(clusterShardingTyped, Test)).value +}