Remove akka typed actor system implementation

The end goal for akka is to have a new runtime that can be optomoized
for typed actors. However to get the API production ready so it can
start getting adopted  it has been decided to only have adapted actor
systems initially.

Further discussion here: https://github.com/akka/akka/issues/24149
This commit is contained in:
Christopher Batey 2017-12-18 12:09:59 +00:00
parent f7b3b483a8
commit c394ee7aaa
44 changed files with 194 additions and 2810 deletions

View file

@ -13,8 +13,7 @@ import java.util.Optional;
import static junit.framework.TestCase.assertSame;
import static org.junit.Assert.assertTrue;
public class ExtensionsTest extends JUnitSuite {
class ExtensionsTest extends JUnitSuite {
public static class MyExtImpl implements Extension {
}
@ -46,7 +45,7 @@ public class ExtensionsTest extends JUnitSuite {
Behavior.empty(),
"loadJavaExtensionsFromConfig",
Optional.empty(),
Optional.of(ConfigFactory.parseString("akka.typed.extensions += \"akka.actor.typed.ExtensionsTest$MyExtension\"").resolve()),
Optional.of(ConfigFactory.parseString("akka.typed.extensions += \"akka.typed.ExtensionsTest$MyExtension\"").resolve()),
Optional.empty(),
Optional.empty()
);

View file

@ -285,9 +285,7 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
implicit def system: ActorSystem[TypedSpec.Command]
private def mySuite: String =
if (system eq nativeSystem) suite + "Native"
else suite + "Adapted"
private def mySuite: String = suite + "Adapted"
def setup(name: String, wrapper: Option[Behavior[Command] Behavior[Command]] = None, ignorePostStop: Boolean = true)(
proc: (scaladsl.ActorContext[Event], StepWise.Steps[Event, ActorRef[Command]]) StepWise.Steps[Event, _]): Future[TypedSpec.Status] =
@ -635,7 +633,6 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
subject(ctx.self, ignorePostStop)
}
object `An ActorContext (native)` extends Normal with NativeSystem
object `An ActorContext (adapted)` extends Normal with AdaptedSystem
trait Widened extends Tests {
@ -644,7 +641,6 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
subject(ctx.self, ignorePostStop).widen { case x x }
}
object `An ActorContext with widened Behavior (native)` extends Widened with NativeSystem
object `An ActorContext with widened Behavior (adapted)` extends Widened with AdaptedSystem
trait Deferred extends Tests {
@ -652,7 +648,6 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
Actor.deferred(_ subject(ctx.self, ignorePostStop))
}
object `An ActorContext with deferred Behavior (native)` extends Deferred with NativeSystem
object `An ActorContext with deferred Behavior (adapted)` extends Deferred with AdaptedSystem
trait NestedDeferred extends Tests {
@ -660,7 +655,6 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
Actor.deferred(_ Actor.deferred(_ subject(ctx.self, ignorePostStop)))
}
object `An ActorContext with nested deferred Behavior (native)` extends NestedDeferred with NativeSystem
object `An ActorContext with nested deferred Behavior (adapted)` extends NestedDeferred with AdaptedSystem
trait Tap extends Tests {
@ -668,7 +662,5 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
override def behavior(ctx: scaladsl.ActorContext[Event], ignorePostStop: Boolean): Behavior[Command] =
Actor.tap((_, _) (), (_, _) (), subject(ctx.self, ignorePostStop))
}
object `An ActorContext with Tap (old-native)` extends Tap with NativeSystem
object `An ActorContext with Tap (old-adapted)` extends Tap with AdaptedSystem
}

View file

@ -56,8 +56,6 @@ class AskSpec extends TypedSpec with ScalaFutures {
}
}
object `Ask pattern (native)` extends Common with NativeSystem
object `Ask pattern (adapted)` extends Common with AdaptedSystem {
import AskSpec._

View file

@ -286,7 +286,6 @@ class BehaviorSpec extends TypedSpec {
trait FullBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = mkFull(monitor) null
}
object `A Full Behavior (native)` extends FullBehavior with NativeSystem
object `A Full Behavior (adapted)` extends FullBehavior with AdaptedSystem
trait ImmutableBehavior extends Messages with BecomeWithLifecycle with Stoppable {
@ -320,7 +319,6 @@ class BehaviorSpec extends TypedSpec {
}
}
}
object `A immutable Behavior (native)` extends ImmutableBehavior with NativeSystem
object `A immutable Behavior (adapted)` extends ImmutableBehavior with AdaptedSystem
trait ImmutableWithSignalScalaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
@ -356,7 +354,6 @@ class BehaviorSpec extends TypedSpec {
SActor.same
}
}
object `A ImmutableWithSignal Behavior (scala,native)` extends ImmutableWithSignalScalaBehavior with NativeSystem
object `A ImmutableWithSignal Behavior (scala,adapted)` extends ImmutableWithSignalScalaBehavior with AdaptedSystem
trait ImmutableScalaBehavior extends Messages with Become with Stoppable {
@ -387,7 +384,6 @@ class BehaviorSpec extends TypedSpec {
}
}
}
object `A immutable Behavior (scala,native)` extends ImmutableScalaBehavior with NativeSystem
object `A immutable Behavior (scala,adapted)` extends ImmutableScalaBehavior with AdaptedSystem
trait MutableScalaBehavior extends Messages with Become with Stoppable {
@ -425,7 +421,6 @@ class BehaviorSpec extends TypedSpec {
}
}
}
object `A mutable Behavior (scala,native)` extends MutableScalaBehavior with NativeSystem
object `A mutable Behavior (scala,adapted)` extends MutableScalaBehavior with AdaptedSystem
trait WidenedScalaBehavior extends ImmutableWithSignalScalaBehavior with Reuse with Siphon {
@ -436,7 +431,6 @@ class BehaviorSpec extends TypedSpec {
super.behavior(monitor)._1.widen[Command] { case c inbox.ref ! c; c } inbox
}
}
object `A widened Behavior (scala,native)` extends WidenedScalaBehavior with NativeSystem
object `A widened Behavior (scala,adapted)` extends WidenedScalaBehavior with AdaptedSystem
trait DeferredScalaBehavior extends ImmutableWithSignalScalaBehavior {
@ -453,7 +447,6 @@ class BehaviorSpec extends TypedSpec {
override def checkAux(signal: Signal, aux: Aux): Unit =
aux.receiveAll() should ===(Done :: Nil)
}
object `A deferred Behavior (scala,native)` extends DeferredScalaBehavior with NativeSystem
object `A deferred Behavior (scala,adapted)` extends DeferredScalaBehavior with AdaptedSystem
trait TapScalaBehavior extends ImmutableWithSignalScalaBehavior with Reuse with SignalSiphon {
@ -462,7 +455,6 @@ class BehaviorSpec extends TypedSpec {
(SActor.tap((_, msg) inbox.ref ! Right(msg), (_, sig) inbox.ref ! Left(sig), super.behavior(monitor)._1), inbox)
}
}
object `A tap Behavior (scala,native)` extends TapScalaBehavior with NativeSystem
object `A tap Behavior (scala,adapted)` extends TapScalaBehavior with AdaptedSystem
trait RestarterScalaBehavior extends ImmutableWithSignalScalaBehavior with Reuse {
@ -470,7 +462,6 @@ class BehaviorSpec extends TypedSpec {
SActor.supervise(super.behavior(monitor)._1).onFailure(SupervisorStrategy.restart) null
}
}
object `A restarter Behavior (scala,native)` extends RestarterScalaBehavior with NativeSystem
object `A restarter Behavior (scala,adapted)` extends RestarterScalaBehavior with AdaptedSystem
/*
@ -536,7 +527,6 @@ class BehaviorSpec extends TypedSpec {
SActor.same
}))
}
object `A ImmutableWithSignal Behavior (java,native)` extends ImmutableWithSignalJavaBehavior with NativeSystem
object `A ImmutableWithSignal Behavior (java,adapted)` extends ImmutableWithSignalJavaBehavior with AdaptedSystem
trait ImmutableJavaBehavior extends Messages with Become with Stoppable {
@ -568,7 +558,6 @@ class BehaviorSpec extends TypedSpec {
})
}
}
object `A immutable Behavior (java,native)` extends ImmutableJavaBehavior with NativeSystem
object `A immutable Behavior (java,adapted)` extends ImmutableJavaBehavior with AdaptedSystem
trait WidenedJavaBehavior extends ImmutableWithSignalJavaBehavior with Reuse with Siphon {
@ -577,7 +566,6 @@ class BehaviorSpec extends TypedSpec {
JActor.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x { inbox.ref ! x; x })))) inbox
}
}
object `A widened Behavior (java,native)` extends WidenedJavaBehavior with NativeSystem
object `A widened Behavior (java,adapted)` extends WidenedJavaBehavior with AdaptedSystem
trait DeferredJavaBehavior extends ImmutableWithSignalJavaBehavior {
@ -594,7 +582,6 @@ class BehaviorSpec extends TypedSpec {
override def checkAux(signal: Signal, aux: Aux): Unit =
aux.receiveAll() should ===(Done :: Nil)
}
object `A deferred Behavior (java,native)` extends DeferredJavaBehavior with NativeSystem
object `A deferred Behavior (java,adapted)` extends DeferredJavaBehavior with AdaptedSystem
trait TapJavaBehavior extends ImmutableWithSignalJavaBehavior with Reuse with SignalSiphon {
@ -606,7 +593,6 @@ class BehaviorSpec extends TypedSpec {
super.behavior(monitor)._1), inbox)
}
}
object `A tap Behavior (java,native)` extends TapJavaBehavior with NativeSystem
object `A tap Behavior (java,adapted)` extends TapJavaBehavior with AdaptedSystem
trait RestarterJavaBehavior extends ImmutableWithSignalJavaBehavior with Reuse {
@ -615,7 +601,6 @@ class BehaviorSpec extends TypedSpec {
.onFailure(classOf[Exception], SupervisorStrategy.restart) null
}
}
object `A restarter Behavior (java,native)` extends RestarterJavaBehavior with NativeSystem
object `A restarter Behavior (java,adapted)` extends RestarterJavaBehavior with AdaptedSystem
}

View file

@ -158,10 +158,7 @@ class DeferredSpec extends TypedSpec {
}
object `A DeferredBehavior (stubbed, native)` extends StubbedTests with NativeSystem
object `A DeferredBehavior (stubbed, adapted)` extends StubbedTests with AdaptedSystem
object `A DeferredBehavior (real, native)` extends RealTests with NativeSystem
object `A DeferredBehavior (real, adapted)` extends RealTests with AdaptedSystem
}

View file

@ -99,6 +99,7 @@ class ExtensionsSpec extends TypedSpecSetup {
akka.typed.extensions = ["akka.actor.typed.FailingToLoadExtension$"]
""")))
}
intercept[RuntimeException] {
create()
}

View file

@ -62,7 +62,7 @@ class PerformanceSpec extends TypedSpec(
}
}
val iterations = nativeSystem.settings.config.getInt("akka.actor.typed.PerformanceSpec.iterations")
val iterations = system.settings.config.getInt("akka.actor.typed.PerformanceSpec.iterations")
trait CommonTests {
implicit def system: ActorSystem[TypedSpec.Command]
@ -78,8 +78,6 @@ class PerformanceSpec extends TypedSpec(
def `09 when using 8 pairs with 10 messages`(): Unit = sync(runTest("09")(behavior(8, 10, iterations, "dispatcher-8")))
}
object `must be fast with native ActorSystem` extends CommonTests with NativeSystem
object `must be fast with ActorSystemAdapter` extends CommonTests with AdaptedSystem
}
}

View file

@ -5,35 +5,16 @@ package akka.actor.typed
class PropsSpec extends TypedSpecSetup {
val dispatcherFirst = DispatcherDefault(MailboxCapacity(666, DispatcherFromConfig("pool")))
val mailboxFirst = MailboxCapacity(999) withNext dispatcherFirst
val dispatcherFirst = DispatcherDefault(DispatcherFromConfig("pool"))
object `A Props` {
def `must get first dispatcher`(): Unit = {
dispatcherFirst.firstOrElse[DispatcherSelector](null) should ===(dispatcherFirst)
mailboxFirst.firstOrElse[DispatcherSelector](null) should ===(dispatcherFirst)
}
def `must get first mailbox`(): Unit = {
dispatcherFirst.firstOrElse[MailboxCapacity](null).capacity should ===(666)
mailboxFirst.firstOrElse[MailboxCapacity](null).capacity should ===(999)
}
def `must get default value`(): Unit = {
mailboxFirst.firstOrElse[DispatcherFromExecutor](null) should ===(null)
}
def `must filter out the right things`(): Unit = {
val filtered = mailboxFirst.filterNot[DispatcherSelector]
filtered.firstOrElse[MailboxCapacity](null).capacity should ===(999)
filtered.firstOrElse[DispatcherSelector](null) should ===(null)
}
def `must yield all configs of some type`(): Unit = {
dispatcherFirst.allOf[DispatcherSelector] should ===(DispatcherSelector.default() :: DispatcherSelector.fromConfig("pool") :: Nil)
mailboxFirst.allOf[MailboxCapacity] should ===(List(999, 666).map(MailboxCapacity(_)))
}
}
}

View file

@ -418,10 +418,7 @@ class RestarterSpec extends TypedSpec {
}
object `A restarter (stubbed, native)` extends StubbedTests with NativeSystem
object `A restarter (stubbed, adapted)` extends StubbedTests with AdaptedSystem
object `A restarter (real, native)` extends RealTests with NativeSystem
object `A restarter (real, adapted)` extends RealTests with AdaptedSystem
}

View file

@ -223,7 +223,5 @@ class TimerSpec extends TypedSpec("""
}
}
object `A Restarter (real, native)` extends RealTests with NativeSystem
object `A Restarter (real, adapted)` extends RealTests with AdaptedSystem
}

View file

@ -48,6 +48,7 @@ class TypedSpecSetup extends RefSpec with Matchers with BeforeAndAfterAll with S
* Helper class for writing tests against both ActorSystemImpl and ActorSystemAdapter.
*/
abstract class TypedSpec(val config: Config) extends TypedSpecSetup {
import TypedSpec._
import AskPattern._
@ -58,16 +59,8 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup {
// extension point
def setTimeout: Timeout = Timeout(1.minute)
private var nativeSystemUsed = false
lazy val nativeSystem: ActorSystem[TypedSpec.Command] = {
val sys = ActorSystem(guardian(), AkkaSpec.getCallerName(classOf[TypedSpec]), config = Some(config withFallback AkkaSpec.testConf))
nativeSystemUsed = true
sys
}
private var adaptedSystemUsed = false
lazy val system: ActorSystem[TypedSpec.Command] = {
val sys = ActorSystem.adapter(AkkaSpec.getCallerName(classOf[TypedSpec]), guardian(), config = Some(config withFallback AkkaSpec.testConf))
adaptedSystemUsed = true
val sys = ActorSystem(guardian(), AkkaSpec.getCallerName(classOf[TypedSpec]), config = Some(config withFallback AkkaSpec.testConf))
sys
}
@ -85,33 +78,27 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup {
}
}
trait NativeSystem {
def system: ActorSystem[TypedSpec.Command] = nativeSystem
}
trait AdaptedSystem {
def system: ActorSystem[TypedSpec.Command] = TypedSpec.this.system
}
implicit val timeout = setTimeout
implicit def scheduler = nativeSystem.scheduler
implicit def scheduler = system.scheduler
lazy val blackhole = await(system ? Create(immutable[Any] { case _ same }, "blackhole"))
override def afterAll(): Unit = {
if (nativeSystemUsed)
Await.result(nativeSystem.terminate, timeout.duration)
if (adaptedSystemUsed)
Await.result(system.terminate, timeout.duration)
Await.result(system.terminate, timeout.duration)
}
// TODO remove after basing on ScalaTest 3 with async support
import akka.testkit._
def await[T](f: Future[T]): T = Await.result(f, timeout.duration * 1.1)
lazy val blackhole = await(nativeSystem ? Create(immutable[Any] { case _ same }, "blackhole"))
def await[T](f: Future[T]): T = Await.result(f, timeout.duration * 1.1)
/**
* Run an Actor-based test. The test procedure is most conveniently
* formulated using the [[StepWise$]] behavior type.
* formulated using the [[StepWise]] behavior type.
*/
def runTest[T: ClassTag](name: String)(behavior: Behavior[T])(implicit system: ActorSystem[Command]): Future[Status] =
system ? (RunTest(name, behavior, _, timeout.duration))
@ -176,6 +163,7 @@ abstract class TypedSpec(val config: Config) extends TypedSpecSetup {
}
object TypedSpec {
import akka.{ typed t }
sealed abstract class Start
@ -246,7 +234,6 @@ class TypedSpecSpec extends TypedSpec {
}
}
object `when using the native implementation` extends CommonTests with NativeSystem
object `when using the adapted implementation` extends CommonTests with AdaptedSystem
}
}

View file

@ -66,6 +66,5 @@ class WatchSpec extends TypedSpec {
}
}
object `Actor monitoring (native)` extends Tests with NativeSystem
object `Actor monitoring (adapted)` extends Tests with AdaptedSystem
}

View file

@ -1,462 +0,0 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.actor.typed
package internal
import akka.actor.typed.scaladsl.Actor
import akka.actor.typed.scaladsl.Actor._
import org.scalactic.ConversionCheckedTripleEquals
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.exceptions.TestFailedException
import org.junit.runner.RunWith
import org.scalatest._
@RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorCellSpec extends Spec with Matchers with BeforeAndAfterAll with ScalaFutures with ConversionCheckedTripleEquals {
val sys = new ActorSystemStub("ActorCellSpec")
def ec = sys.controlledExecutor
object `An ActorCell` {
def `must be creatable`(): Unit = {
val parent = new DebugRef[String](sys.path / "creatable", true)
val cell = new ActorCell(sys, deferred[String](_ {
parent ! "created"
immutable[String] {
case (_, s)
parent ! s
same
}
}), ec, 1000, parent)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Right("created") :: Nil)
cell.send("hello")
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Right("hello") :: Nil)
}
}
def `must be creatable with ???`(): Unit = {
val parent = new DebugRef[String](sys.path / "creatable???", true)
val self = new DebugRef[String](sys.path / "creatableSelf", true)
val ??? = new NotImplementedError
val cell = new ActorCell(sys, deferred[String](_ { parent ! "created"; throw ??? }), ec, 1000, parent)
cell.setSelf(self)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Right("created") :: Nil)
// explicitly verify termination via self-signal
self.receiveAll() should ===(Left(Terminate()) :: Nil)
cell.sendSystem(Terminate())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Left(DeathWatchNotification(self, ???)) :: Nil)
}
}
def `must be able to terminate after construction`(): Unit = {
val parent = new DebugRef[String](sys.path / "terminate", true)
val self = new DebugRef[String](sys.path / "terminateSelf", true)
val cell = new ActorCell(sys, deferred[String](_ { parent ! "created"; stopped }), ec, 1000, parent)
cell.setSelf(self)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Right("created") :: Nil)
// explicitly verify termination via self-signal
self.receiveAll() should ===(Left(Terminate()) :: Nil)
cell.sendSystem(Terminate())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Left(DeathWatchNotification(self, null)) :: Nil)
}
}
def `must be able to terminate after being started`(): Unit = {
val parent = new DebugRef[String](sys.path / "terminate", true)
val self = new DebugRef[String](sys.path / "terminateSelf", true)
val cell = new ActorCell(sys, deferred[String](_ { parent ! "created"; stopped }), ec, 1000, parent)
cell.setSelf(self)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Right("created") :: Nil)
// explicitly verify termination via self-signal
self.receiveAll() should ===(Left(Terminate()) :: Nil)
cell.sendSystem(Terminate())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Left(DeathWatchNotification(self, null)) :: Nil)
}
}
def `must terminate upon failure during processing`(): Unit = {
val parent = new DebugRef[String](sys.path / "terminate", true)
val self = new DebugRef[String](sys.path / "terminateSelf", true)
val ex = new AssertionError
val behavior = deferred[String](_ { parent ! "created"; immutable[String] { case (s, _) throw ex } })
val cell = new ActorCell(sys, behavior, ec, 1000, parent)
cell.setSelf(self)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Right("created") :: Nil)
cell.send("")
ec.runOne()
ec.queueSize should ===(0)
// explicitly verify termination via self-signal
self.receiveAll() should ===(Left(Terminate()) :: Nil)
cell.sendSystem(Terminate())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Left(DeathWatchNotification(self, ex)) :: Nil)
}
}
def `must signal failure when starting behavior is "same"`(): Unit = {
val parent = new DebugRef[String](sys.path / "startSame", true)
val self = new DebugRef[String](sys.path / "startSameSelf", true)
val cell = new ActorCell(sys, deferred[String](_ { parent ! "created"; same[String] }), ec, 1000, parent)
cell.setSelf(self)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Right("created") :: Nil)
// explicitly verify termination via self-signal
self.receiveAll() should ===(Left(Terminate()) :: Nil)
cell.sendSystem(Terminate())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() match {
case Left(DeathWatchNotification(`self`, exc)) :: Nil
exc should not be null
exc shouldBe an[IllegalArgumentException]
exc.getMessage should include("Same")
case other fail(s"$other was not a DeathWatchNotification")
}
}
}
def `must signal failure when starting behavior is "unhandled"`(): Unit = {
val parent = new DebugRef[String](sys.path / "startSame", true)
val self = new DebugRef[String](sys.path / "startSameSelf", true)
val cell = new ActorCell(sys, deferred[String](_ { parent ! "created"; unhandled[String] }), ec, 1000, parent)
cell.setSelf(self)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Right("created") :: Nil)
// explicitly verify termination via self-signal
self.receiveAll() should ===(Left(Terminate()) :: Nil)
cell.sendSystem(Terminate())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() match {
case Left(DeathWatchNotification(`self`, exc)) :: Nil
exc should not be null
exc shouldBe an[IllegalArgumentException]
exc.getMessage should include("Unhandled")
case other fail(s"$other was not a DeathWatchNotification")
}
}
}
/*
* also tests:
* - must reschedule for self-message
* - must not reschedule for message when already activated
* - must not reschedule for signal when already activated
*/
def `must not execute more messages than were batched naturally`(): Unit = {
val parent = new DebugRef[String](sys.path / "batching", true)
val cell = new ActorCell(sys, deferred[String] { ctx
immutable[String] {
case (_, s)
ctx.self ! s
parent ! s
same
}
}, ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Nil)
cell.send("one")
cell.send("two")
ec.queueSize should ===(1)
ec.runOne()
ec.queueSize should ===(1)
parent.receiveAll() should ===(Right("one") :: Right("two") :: Nil)
ec.runOne()
ec.queueSize should ===(1)
parent.receiveAll() should ===(Right("one") :: Right("two") :: Nil)
cell.send("three")
ec.runOne()
ec.queueSize should ===(1)
parent.receiveAll() should ===(Right("one") :: Right("two") :: Right("three") :: Nil)
cell.sendSystem(Terminate())
ec.queueSize should ===(1)
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil)
}
}
def `must signal DeathWatch when terminating normally`(): Unit = {
val parent = new DebugRef[String](sys.path / "watchNormal", true)
val client = new DebugRef[String](parent.path / "client", true)
val cell = new ActorCell(sys, Actor.empty[String], ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Watch(ref, client))
cell.sendSystem(Terminate())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil)
client.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil)
}
}
/*
* also tests:
* - must turn a DeathWatchNotification into a Terminated signal while watching
* - must terminate with DeathPactException when not handling a Terminated signal
* - must send a Watch message when watching another actor
*/
def `must signal DeathWatch when terminating abnormally`(): Unit = {
val parent = new DebugRef[String](sys.path / "watchAbnormal", true)
val client = new DebugRef[String](parent.path / "client", true)
val other = new DebugRef[String](parent.path / "other", true)
val cell = new ActorCell(sys, deferred[String] { ctx ctx.watch(parent); Actor.empty }, ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Left(Watch(parent, ref)) :: Nil)
// test that unwatched termination is ignored
cell.sendSystem(DeathWatchNotification(other, null))
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Nil)
// now trigger failure by death pact
cell.sendSystem(Watch(ref, client))
cell.sendSystem(DeathWatchNotification(parent, null))
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() match {
case Left(DeathWatchNotification(ref, exc)) :: Nil
exc should not be null
exc shouldBe a[DeathPactException]
case other fail(s"$other was not a DeathWatchNotification")
}
client.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil)
}
}
def `must signal DeathWatch when watching after termination`(): Unit = {
val parent = new DebugRef[String](sys.path / "watchLate", true)
val client = new DebugRef[String](parent.path / "client", true)
val cell = new ActorCell(sys, stopped[String], ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil)
cell.sendSystem(Watch(ref, client))
ec.queueSize should ===(0)
sys.deadLettersInbox.receiveAll() should ===(Left(Watch(ref, client)) :: Nil)
// correct behavior of deadLetters is verified in ActorSystemSpec
}
}
def `must signal DeathWatch when watching after termination but before deactivation`(): Unit = {
val parent = new DebugRef[String](sys.path / "watchSomewhatLate", true)
val client = new DebugRef[String](parent.path / "client", true)
val cell = new ActorCell(sys, Actor.empty[String], ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne()
ec.queueSize should ===(0)
cell.sendSystem(Terminate())
cell.sendSystem(Watch(ref, client))
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil)
sys.deadLettersInbox.receiveAll() should ===(Left(Watch(ref, client)) :: Nil)
}
}
def `must not signal DeathWatch after Unwatch has been processed`(): Unit = {
val parent = new DebugRef[String](sys.path / "watchUnwatch", true)
val client = new DebugRef[String](parent.path / "client", true)
val cell = new ActorCell(sys, Actor.empty[String], ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Watch(ref, client))
cell.sendSystem(Unwatch(ref, client))
cell.sendSystem(Terminate())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil)
client.receiveAll() should ===(Nil)
}
}
def `must send messages to deadLetters after being terminated`(): Unit = {
val parent = new DebugRef[String](sys.path / "sendDeadLetters", true)
val cell = new ActorCell(sys, stopped[String], ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil)
cell.send("42")
ec.queueSize should ===(0)
sys.deadLettersInbox.receiveAll() should ===(Right("42") :: Nil)
}
}
/*
* also tests:
* - child creation
*/
def `must not terminate before children have terminated`(): Unit = {
val parent = new DebugRef[ActorRef[Nothing]](sys.path / "waitForChild", true)
val cell = new ActorCell(sys, deferred[String] { ctx
ctx.spawn(deferred[String] { ctx parent ! ctx.self; Actor.empty }, "child")
Actor.empty
}, ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne() // creating subject
parent.hasSomething should ===(false)
ec.runOne() // creating child
ec.queueSize should ===(0)
val child = parent.receiveAll() match {
case Right(child) :: Nil
child.sorryForNothing.sendSystem(Watch(child, parent))
child
case other fail(s"$other was not List(Right(<child>))")
}
ec.runOne()
ec.queueSize should ===(0)
cell.sendSystem(Terminate())
ec.runOne() // begin subject termination, will initiate child termination
parent.hasSomething should ===(false)
ec.runOne() // terminate child
parent.receiveAll() should ===(Left(DeathWatchNotification(child, null)) :: Nil)
ec.runOne() // terminate subject
parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil)
}
}
def `must properly terminate if failing while handling Terminated for child actor`(): Unit = {
val parent = new DebugRef[ActorRef[Nothing]](sys.path / "terminateWhenDeathPact", true)
val cell = new ActorCell(sys, deferred[String] { ctx
ctx.watch(ctx.spawn(deferred[String] { ctx parent ! ctx.self; Actor.empty }, "child"))
Actor.empty
}, ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne() // creating subject
parent.hasSomething should ===(false)
ec.runOne() // creating child
ec.queueSize should ===(0)
val child = parent.receiveAll() match {
case Right(child: ActorRefImpl[Nothing]) :: Nil
child.sendSystem(Watch(child, parent))
child
case other fail(s"$other was not List(Right(<child>))")
}
ec.runOne()
ec.queueSize should ===(0)
child.sendSystem(Terminate())
ec.runOne() // child terminates and enqueues DeathWatchNotification
parent.receiveAll() should ===(Left(DeathWatchNotification(child, null)) :: Nil)
ec.runOne() // cell fails during Terminated and terminates with DeathPactException
parent.receiveAll() match {
case Left(DeathWatchNotification(`ref`, ex: DeathPactException)) :: Nil
ex.getMessage should include("death pact")
case other fail(s"$other was not Left(DeathWatchNotification($ref, DeathPactException))")
}
ec.queueSize should ===(0)
}
}
def `must not terminate twice if failing in PostStop`(): Unit = {
val parent = new DebugRef[String](sys.path / "terminateProperlyPostStop", true)
val cell = new ActorCell(sys, immutable[String] {
case _ unhandled
} onSignal {
case (_, PostStop) ???
}, ec, 1000, parent)
val ref = new LocalActorRef(parent.path / "child", cell)
cell.setSelf(ref)
debugCell(cell) {
ec.queueSize should ===(0)
cell.sendSystem(Create())
ec.runOne()
ec.queueSize should ===(0)
cell.sendSystem(Terminate())
ec.runOne()
ec.queueSize should ===(0)
parent.receiveAll() should ===(Left(DeathWatchNotification(ref, null)) :: Nil)
}
}
}
private def debugCell[T, U](cell: ActorCell[T])(block: U): U =
try block
catch {
case ex: TestFailedException
println(cell)
throw ex
}
}

View file

@ -105,46 +105,10 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
}
}
}
}
object `An ActorSystemImpl` extends CommonTests {
def system[T](behavior: Behavior[T], name: String) = ActorSystem(behavior, name)
def suite = "native"
// this is essential to complete ActorCellSpec, see there
def `must correctly treat Watch dead letters`(): Unit =
withSystem("deadletters", Actor.empty[String]) { sys
val client = new DebugRef[Int](sys.path / "debug", true)
sys.deadLetters.sorry.sendSystem(Watch(sys, client))
client.receiveAll() should ===(Left(DeathWatchNotification(sys, null)) :: Nil)
}
def `must start system actors and mangle their names`(): Unit = {
withSystem("systemActorOf", Actor.empty[String]) { sys
import akka.actor.typed.scaladsl.AskPattern._
implicit val timeout = Timeout(1.second)
implicit val sched = sys.scheduler
case class Doner(ref: ActorRef[Done])
val ref1, ref2 = sys.systemActorOf(immutable[Doner] {
case (_, doner)
doner.ref ! Done
same
}, "empty").futureValue
(ref1 ? Doner).futureValue should ===(Done)
(ref2 ? Doner).futureValue should ===(Done)
val RE = "(\\d+)-empty".r
val RE(num1) = ref1.path.name.toString
val RE(num2) = ref2.path.name.toString
num2.toInt should be > num1.toInt
}
}
}
object `An ActorSystemAdapter` extends CommonTests {
def system[T](behavior: Behavior[T], name: String) = ActorSystem.adapter(name, behavior)
def system[T](behavior: Behavior[T], name: String) = ActorSystem(behavior, name)
def suite = "adapter"
}
}

View file

@ -10,6 +10,7 @@ import scala.concurrent._
import com.typesafe.config.ConfigFactory
import java.util.concurrent.ThreadFactory
import akka.event.Logging
import akka.typed.{ BusLogging, DefaultLoggingFilter, EventStream }
import akka.util.Timeout
@ -37,7 +38,14 @@ private[typed] class ActorSystemStub(val name: String)
}
override def dynamicAccess: a.DynamicAccess = new a.ReflectiveDynamicAccess(getClass.getClassLoader)
override def eventStream: EventStream = new EventStreamImpl(true)(settings.untyped.LoggerStartTimeout)
override def eventStream: EventStream = new EventStream {
override def subscribe[T](subscriber: ActorRef[T], to: Class[T]) = false
override def setLogLevel(loglevel: Logging.LogLevel): Unit = {}
override def logLevel = Logging.InfoLevel
override def unsubscribe[T](subscriber: ActorRef[T], from: Class[T]) = false
override def unsubscribe[T](subscriber: ActorRef[T]): Unit = {}
override def publish[T](event: T): Unit = {}
}
override def logFilter: e.LoggingFilter = new DefaultLoggingFilter(settings, eventStream)
override def log: e.LoggingAdapter = new BusLogging(eventStream, path.parent.toString, getClass, logFilter)
override def logConfiguration(): Unit = log.info(settings.toString)

View file

@ -1,324 +0,0 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.actor.typed
package internal
import akka.Done
import akka.event.Logging._
import akka.actor.typed.scaladsl.Actor._
import akka.actor.typed.scaladsl.AskPattern._
import akka.typed.testkit.Inbox
import akka.typed.Logger
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import scala.concurrent.duration._
object EventStreamSpec {
@volatile var logged = Vector.empty[LogEvent]
class MyLogger extends Logger {
def initialBehavior: Behavior[Logger.Command] =
immutable {
case (ctx, Logger.Initialize(es, replyTo))
val logger = ctx.spawn(immutable[LogEvent] { (_, ev: LogEvent)
logged :+= ev
same
}, "logger")
ctx.watch(logger)
replyTo ! logger
empty
}
}
val config = ConfigFactory.parseString("""
akka.typed.loggers = ["akka.actor.typed.internal.EventStreamSpec$MyLogger"]
""")
// class hierarchy for subchannel test
class A
class B2 extends A
class B3 extends A
class C extends B2
trait T
trait AT extends T
trait ATT extends AT
trait BT extends T
trait BTT extends BT
class CC
class CCATBT extends CC with ATT with BTT
}
class EventStreamSpec extends TypedSpec(EventStreamSpec.config) with Eventually {
import EventStreamSpec._
object `An EventStreamImpl` {
val es = nativeSystem.eventStream
val root = nativeSystem.path
def `must work in full system`(): Unit = {
es.logLevel should ===(WarningLevel)
nativeSystem.log.error("hello world")
nativeSystem.log.debug("should not see this")
es.setLogLevel(DebugLevel)
es.logLevel should ===(DebugLevel)
nativeSystem.log.debug("hello world DEBUG")
nativeSystem.log.info("hello world INFO")
eventually(logged.map(_.message) should ===(Vector("hello world", "hello world DEBUG", "hello world INFO")))
logged = Vector.empty
}
def `must manage subscribers`(): Unit = {
val box = Inbox[AnyRef]("manage")
val ref: ActorRef[String] = box.ref
es.subscribe(ref, classOf[String]) should ===(true)
es.publish("hello")
es.unsubscribe(ref)
es.publish("my")
es.subscribe(ref, classOf[String]) should ===(true)
es.publish("lovely")
es.unsubscribe(ref, classOf[String]) should ===(true)
es.publish("quaint")
es.subscribe(ref, classOf[String]) should ===(true)
es.publish("little")
es.unsubscribe(box.ref, classOf[AnyRef]) should ===(true)
es.publish("grey")
es.subscribe(ref, classOf[String]) should ===(true)
es.publish("world")
box.receiveAll() should ===(Seq[AnyRef]("hello", "lovely", "little", "world"))
}
def `must care about types`(): Unit = {
val ref = Inbox[String]("types").ref
"es.subscribe(ref, classOf[AnyRef])" shouldNot typeCheck
"es.unsubscribe(ref, classOf[AnyRef])" shouldNot typeCheck
}
def `must manage subchannels using classes`(): Unit = {
val box = Inbox[A]("subchannelclass")
val a = new A
val b1 = new B2
val b2 = new B3
val c = new C
es.subscribe(box.ref, classOf[B3]) should ===(true)
es.publish(c)
es.publish(b2)
box.receiveMsg() should ===(b2)
es.subscribe(box.ref, classOf[A]) should ===(true)
es.publish(c)
box.receiveMsg() should ===(c)
es.publish(b1)
box.receiveMsg() should ===(b1)
es.unsubscribe(box.ref, classOf[B2]) should ===(true)
es.publish(c)
es.publish(b2)
es.publish(a)
box.receiveMsg() should ===(b2)
box.receiveMsg() should ===(a)
box.hasMessages should ===(false)
}
def `must manage sub-channels using classes and traits (update on subscribe)`(): Unit = {
val tm1 = new CC
val tm2 = new CCATBT
val a1 = Inbox[AT]("subAT")
val a2 = Inbox[BT]("subBT")
val a3 = Inbox[CC]("subCC")
val a4 = Inbox[CCATBT]("subCCATBT")
es.subscribe(a1.ref, classOf[AT]) should ===(true)
es.subscribe(a2.ref, classOf[BT]) should ===(true)
es.subscribe(a3.ref, classOf[CC]) should ===(true)
es.subscribe(a4.ref, classOf[CCATBT]) should ===(true)
es.publish(tm1)
es.publish(tm2)
a1.receiveMsg() should ===(tm2)
a2.receiveMsg() should ===(tm2)
a3.receiveMsg() should ===(tm1)
a3.receiveMsg() should ===(tm2)
a4.receiveMsg() should ===(tm2)
es.unsubscribe(a1.ref, classOf[AT]) should ===(true)
es.unsubscribe(a2.ref, classOf[BT]) should ===(true)
es.unsubscribe(a3.ref, classOf[CC]) should ===(true)
es.unsubscribe(a4.ref, classOf[CCATBT]) should ===(true)
}
def `must manage sub-channels using classes and traits (update on unsubscribe)`(): Unit = {
val tm1 = new CC
val tm2 = new CCATBT
val a1 = Inbox[AT]("subAT")
val a2 = Inbox[BT]("subBT")
val a3 = Inbox[CC]("subCC")
val a4 = Inbox[CCATBT]("subCCATBT")
es.subscribe(a1.ref, classOf[AT]) should ===(true)
es.subscribe(a2.ref, classOf[BT]) should ===(true)
es.subscribe(a3.ref, classOf[CC]) should ===(true)
es.subscribe(a4.ref, classOf[CCATBT]) should ===(true)
es.unsubscribe(a3.ref, classOf[CC]) should ===(true)
es.publish(tm1)
es.publish(tm2)
a1.receiveMsg() should ===(tm2)
a2.receiveMsg() should ===(tm2)
a3.hasMessages should ===(false)
a4.receiveMsg() should ===(tm2)
es.unsubscribe(a1.ref, classOf[AT]) should ===(true)
es.unsubscribe(a2.ref, classOf[BT]) should ===(true)
es.unsubscribe(a4.ref, classOf[CCATBT]) should ===(true)
}
def `must manage sub-channels using classes and traits (update on unsubscribe all)`(): Unit = {
val tm1 = new CC
val tm2 = new CCATBT
val a1 = Inbox[AT]("subAT")
val a2 = Inbox[BT]("subBT")
val a3 = Inbox[CC]("subCC")
val a4 = Inbox[CCATBT]("subCCATBT")
es.subscribe(a1.ref, classOf[AT]) should ===(true)
es.subscribe(a2.ref, classOf[BT]) should ===(true)
es.subscribe(a3.ref, classOf[CC]) should ===(true)
es.subscribe(a4.ref, classOf[CCATBT]) should ===(true)
es.unsubscribe(a3.ref)
es.publish(tm1)
es.publish(tm2)
a1.receiveMsg() should ===(tm2)
a2.receiveMsg() should ===(tm2)
a3.hasMessages should ===(false)
a4.receiveMsg() should ===(tm2)
es.unsubscribe(a1.ref, classOf[AT]) should ===(true)
es.unsubscribe(a2.ref, classOf[BT]) should ===(true)
es.unsubscribe(a4.ref, classOf[CCATBT]) should ===(true)
}
def `must manage sub-channels using classes and traits (update on publish)`(): Unit = {
val tm1 = new CC
val tm2 = new CCATBT
val a1 = Inbox[AT]("subAT")
val a2 = Inbox[BT]("subBT")
es.subscribe(a1.ref, classOf[AT]) should ===(true)
es.subscribe(a2.ref, classOf[BT]) should ===(true)
es.publish(tm1)
es.publish(tm2)
a1.receiveMsg() should ===(tm2)
a2.receiveMsg() should ===(tm2)
es.unsubscribe(a1.ref, classOf[AT]) should ===(true)
es.unsubscribe(a2.ref, classOf[BT]) should ===(true)
}
def `must manage sub-channels using classes and traits (unsubscribe classes used with trait)`(): Unit = {
val tm1 = new CC
val tm2 = new CCATBT
val a1 = Inbox[AT]("subAT")
val a2 = Inbox[AnyRef]("subBT")
val a3 = Inbox[CC]("subCC")
es.subscribe(a1.ref, classOf[AT]) should ===(true)
es.subscribe(a2.ref, classOf[BT]) should ===(true)
es.subscribe(a2.ref, classOf[CC]) should ===(true)
es.subscribe(a3.ref, classOf[CC]) should ===(true)
es.unsubscribe(a2.ref, classOf[CC]) should ===(true)
es.unsubscribe(a3.ref, classOf[CCATBT]) should ===(true)
es.publish(tm1)
es.publish(tm2)
a1.receiveMsg() should ===(tm2)
a2.receiveMsg() should ===(tm2)
a3.receiveMsg() should ===(tm1)
es.unsubscribe(a1.ref, classOf[AT]) should ===(true)
es.unsubscribe(a2.ref, classOf[BT]) should ===(true)
es.unsubscribe(a3.ref, classOf[CC]) should ===(true)
}
def `must manage sub-channels using classes and traits (subscribe after publish)`(): Unit = {
val tm1 = new CCATBT
val a1 = Inbox[AT]("subAT")
val a2 = Inbox[BTT]("subBTT")
es.subscribe(a1.ref, classOf[AT]) should ===(true)
es.publish(tm1)
a1.receiveMsg() should ===(tm1)
a2.hasMessages should ===(false)
es.subscribe(a2.ref, classOf[BTT]) should ===(true)
es.publish(tm1)
a1.receiveMsg() should ===(tm1)
a2.receiveMsg() should ===(tm1)
es.unsubscribe(a1.ref, classOf[AT]) should ===(true)
es.unsubscribe(a2.ref, classOf[BTT]) should ===(true)
}
def `must watch subscribers`(): Unit = {
val ref = new DebugRef[String](root / "watch", true)
es.subscribe(ref, classOf[String]) should ===(true)
es.subscribe(ref, classOf[String]) should ===(false)
eventually(ref.hasSignal should ===(true))
val unsubscriber = ref.receiveSignal() match {
case Watch(`ref`, watcher) watcher
case other fail(s"expected Watch(), got $other")
}
ref.hasSomething should ===(false)
unsubscriber.sorryForNothing.sendSystem(DeathWatchNotification(ref, null))
eventually(es.subscribe(ref, classOf[String]) should ===(true))
}
def `must unsubscribe an actor upon termination`(): Unit = {
val ref = nativeSystem ? TypedSpec.Create(immutable[Done] { case _ stopped }, "tester") futureValue Timeout(1.second)
es.subscribe(ref, classOf[Done]) should ===(true)
es.subscribe(ref, classOf[Done]) should ===(false)
ref ! Done
eventually(es.subscribe(ref, classOf[Done]) should ===(true))
}
def `must unsubscribe the actor, when it subscribes already in terminated state`(): Unit = {
val ref = nativeSystem ? TypedSpec.Create(stopped[Done], "tester") futureValue Timeout(1.second)
val wait = new DebugRef[Done](root / "wait", true)
ref.sorry.sendSystem(Watch(ref, wait))
eventually(wait.hasSignal should ===(true))
wait.receiveSignal() should ===(DeathWatchNotification(ref, null))
es.subscribe(ref, classOf[Done]) should ===(true)
eventually(es.subscribe(ref, classOf[Done]) should ===(true))
}
def `must unwatch an actor from unsubscriber when that actor unsubscribes from the stream`(): Unit = {
val ref = new DebugRef[String](root / "watch", true)
es.subscribe(ref, classOf[String]) should ===(true)
es.subscribe(ref, classOf[String]) should ===(false)
eventually(ref.hasSignal should ===(true))
val unsubscriber = ref.receiveSignal() match {
case Watch(`ref`, watcher) watcher
case other fail(s"expected Watch(), got $other")
}
ref.hasSomething should ===(false)
es.unsubscribe(ref)
eventually(ref.hasSignal should ===(true))
ref.receiveSignal() should ===(Unwatch(ref, unsubscriber))
}
def `must unwatch an actor from unsubscriber when that actor unsubscribes from channels it subscribed`(): Unit = {
val ref = new DebugRef[AnyRef](root / "watch", true)
es.subscribe(ref, classOf[String]) should ===(true)
es.subscribe(ref, classOf[String]) should ===(false)
es.subscribe(ref, classOf[Integer]) should ===(true)
es.subscribe(ref, classOf[Integer]) should ===(false)
eventually(ref.hasSignal should ===(true))
val unsubscriber = ref.receiveSignal() match {
case Watch(`ref`, watcher) watcher
case other fail(s"expected Watch(), got $other")
}
ref.hasSomething should ===(false)
es.unsubscribe(ref, classOf[Integer])
Thread.sleep(50)
ref.hasSomething should ===(false)
es.unsubscribe(ref, classOf[String])
eventually(ref.hasSignal should ===(true))
ref.receiveSignal() should ===(Unwatch(ref, unsubscriber))
}
}
}

View file

@ -1,176 +0,0 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.actor.typed
package internal
import akka.actor.InvalidMessageException
import scala.concurrent.{ Future, Promise }
class FunctionRefSpec extends TypedSpecSetup {
object `A FunctionRef` {
def `must forward messages that are received after getting the ActorRef (completed later)`(): Unit = {
val p = Promise[ActorRef[String]]
val ref = ActorRef(p.future)
val target = new DebugRef[String](ref.path / "target", true)
p.success(target)
ref ! "42"
ref ! "43"
target.receiveAll() should ===(Left(Watch(target, ref)) :: Right("42") :: Right("43") :: Nil)
}
def `must forward messages that are received after getting the ActorRef (already completed)`(): Unit = {
val target = new DebugRef[String](ActorRef.FuturePath / "target", true)
val f = Future.successful(target)
val ref = ActorRef(f)
ref ! "42"
ref ! "43"
target.receiveAll() should ===(Right("42") :: Right("43") :: Nil)
}
def `must forward messages that are received before getting the ActorRef`(): Unit = {
val p = Promise[ActorRef[String]]
val ref = ActorRef(p.future)
ref ! "42"
ref ! "43"
val target = new DebugRef[String](ref.path / "target", true)
p.success(target)
target.receiveAll() should ===(Right("42") :: Right("43") :: Left(Watch(target, ref)) :: Nil)
}
def `must notify watchers when the future fails`(): Unit = {
val p = Promise[ActorRef[String]]
val ref = ActorRef(p.future)
val client1 = new DebugRef(ref.path / "c1", true)
ref.sorry.sendSystem(Watch(ref, client1))
client1.hasSomething should ===(false)
p.failure(new Exception)
client1.receiveSignal() should ===(DeathWatchNotification(ref, null))
client1.hasSomething should ===(false)
val client2 = new DebugRef(ref.path / "c2", true)
ref.sorry.sendSystem(Watch(ref, client2))
client2.receiveSignal() should ===(DeathWatchNotification(ref, null))
client2.hasSomething should ===(false)
client1.hasSomething should ===(false)
}
def `must notify watchers when terminated`(): Unit = {
val p = Promise[ActorRef[String]]
val ref = ActorRef(p.future)
val client1 = new DebugRef(ref.path / "c1", true)
ref.sorry.sendSystem(Watch(ref, client1))
client1.hasSomething should ===(false)
ref.sorry.sendSystem(Terminate())
client1.receiveSignal() should ===(DeathWatchNotification(ref, null))
client1.hasSomething should ===(false)
val client2 = new DebugRef(ref.path / "c2", true)
ref.sorry.sendSystem(Watch(ref, client2))
client2.receiveSignal() should ===(DeathWatchNotification(ref, null))
client2.hasSomething should ===(false)
client1.hasSomething should ===(false)
}
def `must notify watchers when terminated after receiving the target`(): Unit = {
val p = Promise[ActorRef[String]]
val ref = ActorRef(p.future)
val client1 = new DebugRef(ref.path / "c1", true)
ref.sorry.sendSystem(Watch(ref, client1))
client1.hasSomething should ===(false)
val target = new DebugRef[String](ref.path / "target", true)
p.success(target)
ref ! "42"
ref ! "43"
target.receiveAll() should ===(Left(Watch(target, ref)) :: Right("42") :: Right("43") :: Nil)
ref.sorry.sendSystem(Terminate())
client1.receiveSignal() should ===(DeathWatchNotification(ref, null))
client1.hasSomething should ===(false)
target.receiveAll() should ===(Left(Unwatch(target, ref)) :: Nil)
val client2 = new DebugRef(ref.path / "c2", true)
ref.sorry.sendSystem(Watch(ref, client2))
client2.receiveSignal() should ===(DeathWatchNotification(ref, null))
client2.hasSomething should ===(false)
client1.hasSomething should ===(false)
}
def `must notify watchers when receiving the target after terminating`(): Unit = {
val p = Promise[ActorRef[String]]
val ref = ActorRef(p.future)
val client1 = new DebugRef(ref.path / "c1", true)
ref.sorry.sendSystem(Watch(ref, client1))
client1.hasSomething should ===(false)
ref.sorry.sendSystem(Terminate())
client1.receiveSignal() should ===(DeathWatchNotification(ref, null))
client1.hasSomething should ===(false)
val target = new DebugRef[String](ref.path / "target", true)
p.success(target)
ref ! "42"
ref ! "43"
target.hasSomething should ===(false)
val client2 = new DebugRef(ref.path / "c2", true)
ref.sorry.sendSystem(Watch(ref, client2))
client2.receiveSignal() should ===(DeathWatchNotification(ref, null))
client2.hasSomething should ===(false)
client1.hasSomething should ===(false)
}
def `must notify watchers when the target ActorRef terminates`(): Unit = {
val p = Promise[ActorRef[String]]
val ref = ActorRef(p.future)
val client1 = new DebugRef(ref.path / "c1", true)
ref.sorry.sendSystem(Watch(ref, client1))
client1.hasSomething should ===(false)
val target = new DebugRef[String](ref.path / "target", true)
p.success(target)
ref ! "42"
ref ! "43"
target.receiveAll() should ===(Left(Watch(target, ref)) :: Right("42") :: Right("43") :: Nil)
ref.sorry.sendSystem(DeathWatchNotification(target, null))
client1.receiveSignal() should ===(DeathWatchNotification(ref, null))
client1.hasSomething should ===(false)
target.hasSomething should ===(false)
val client2 = new DebugRef(ref.path / "c2", true)
ref.sorry.sendSystem(Watch(ref, client2))
client2.receiveSignal() should ===(DeathWatchNotification(ref, null))
client2.hasSomething should ===(false)
client1.hasSomething should ===(false)
}
def `must not allow null messages`(): Unit = {
val p = Promise[ActorRef[String]]
val ref = ActorRef(p.future)
intercept[InvalidMessageException] {
ref ! null
}
}
}
}

View file

@ -183,7 +183,5 @@ class LocalReceptionistSpec extends TypedSpec with Eventually {
}
object `A Receptionist (native)` extends CommonTests with NativeSystem
object `A Receptionist (adapted)` extends CommonTests with AdaptedSystem
}

View file

@ -13,10 +13,6 @@ import scala.concurrent.duration.DurationInt
@RunWith(classOf[JUnitRunner])
final class ImmutablePartialSpec extends TypedSpec {
final object `An Actor.immutablePartial behavior (native)`
extends Tests
with NativeSystem
final object `An Actor.immutablePartial behavior (adapted)`
extends Tests
with AdaptedSystem

View file

@ -13,8 +13,6 @@ import org.scalatest.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
final class OnSignalSpec extends TypedSpec {
final object `An Actor.onSignal behavior (native)` extends Tests with NativeSystem
final object `An Actor.onSignal behavior (adapted)` extends Tests with AdaptedSystem
trait Tests extends StartSupport {

View file

@ -1,7 +1,7 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.akka.actor.typed
package docs.akka.typed
//#imports
import akka.actor.typed._
@ -118,22 +118,24 @@ class IntroSpec extends TypedSpec {
//#chatroom-gabbler
//#chatroom-main
val main: Behavior[akka.NotUsed] =
val main: Behavior[String] =
Actor.deferred { ctx
val chatRoom = ctx.spawn(ChatRoom.behavior, "chatroom")
val gabblerRef = ctx.spawn(gabbler, "gabbler")
ctx.watch(gabblerRef)
chatRoom ! GetSession("ol Gabbler", gabblerRef)
Actor.immutable[akka.NotUsed] {
(_, _) Actor.unhandled
Actor.immutablePartial[String] {
case (_, "go")
chatRoom ! GetSession("ol Gabbler", gabblerRef)
Actor.same
} onSignal {
case (ctx, Terminated(ref))
case (_, Terminated(ref))
Actor.stopped
}
}
val system = ActorSystem(main, "ChatRoomDemo")
system ! "go"
Await.result(system.whenTerminated, 3.seconds)
//#chatroom-main
}

View file

@ -1,14 +1,14 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.akka.actor.typed
package docs.akka.typed
//#imports
import akka.NotUsed
import akka.actor.typed._
import akka.actor.typed.scaladsl.Actor
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.AskPattern._
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.Await
//#imports
@ -90,22 +90,25 @@ class MutableIntroSpec extends TypedSpec {
//#chatroom-gabbler
//#chatroom-main
val main: Behavior[akka.NotUsed] =
val main: Behavior[String] =
Actor.deferred { ctx
val chatRoom = ctx.spawn(ChatRoom.behavior(), "chatroom")
val gabblerRef = ctx.spawn(gabbler, "gabbler")
ctx.watch(gabblerRef)
chatRoom ! GetSession("ol Gabbler", gabblerRef)
Actor.immutable[akka.NotUsed] {
(_, _) Actor.unhandled
Actor.immutablePartial[String] {
case (_, "go")
chatRoom ! GetSession("ol Gabbler", gabblerRef)
Actor.same
} onSignal {
case (ctx, Terminated(ref))
case (_, Terminated(ref))
println("Stopping guardian")
Actor.stopped
}
}
val system = ActorSystem(main, "ChatRoomDemo")
system ! "go"
Await.result(system.whenTerminated, 1.second)
//#chatroom-main
}

View file

@ -1,17 +1,4 @@
akka.typed {
# The loggers to be started during ActorSystem startup. These must name
# classes of type akka.typed.Logger that provide the initial logger behavior.
loggers = ["akka.typed.DefaultLogger"]
# FQCN of the logging filter that avoids rendering log messages below the current
# main loglevel.
# Must have a constructor for arguments of type (Settings, EventStream).
logging-filter = "akka.typed.DefaultLoggingFilter"
# Default mailbox capacity for actors where nothing else is configured by
# their parent, see also class akka.typed.MailboxCapacity
mailbox-capacity = 1000
# List FQCN of `akka.typed.ExtensionId`s which shall be loaded at actor system startup.
# Should be on the format: 'extensions = ["com.example.MyExtId1", "com.example.MyExtId2"]' etc.

View file

@ -16,12 +16,10 @@ import scala.util.Success
* Actor instance. Sending a message to an Actor that has terminated before
* receiving the message will lead to that message being discarded; such
* messages are delivered to the [[DeadLetter]] channel of the
* [[EventStream]] on a best effort basis
* [[akka.typed.EventStream]] on a best effort basis
* (i.e. this delivery is not reliable).
*/
trait ActorRef[-T] extends java.lang.Comparable[ActorRef[_]] {
this: internal.ActorRefImpl[T]
/**
* Send a message to the Actor referenced by this ActorRef using *at-most-once*
* messaging semantics.
@ -66,22 +64,10 @@ object ActorRef {
* Create an ActorRef from a Future, buffering up to the given number of
* messages in while the Future is not fulfilled.
*/
def apply[T](f: Future[ActorRef[T]], bufferSize: Int = 1000): ActorRef[T] =
private[akka] def apply[T](f: Future[ActorRef[T]], bufferSize: Int = 1000): ActorRef[T] =
f.value match {
// an AdaptedActorSystem will always create refs eagerly, so it will take this path
case Some(Success(ref)) ref
// for other ActorSystem implementations, this might work, it currently doesn't work
// for the adapted system, because the typed FutureRef cannot be watched from untyped
case x new internal.FutureRef(FuturePath, bufferSize, f)
case _ throw new IllegalStateException("Only expecting completed futures until the native actor system is implemented")
}
/**
* Create an ActorRef by providing a function that is invoked for sending
* messages and a termination callback.
*/
def apply[T](send: (T, internal.FunctionRef[T]) Unit, terminate: internal.FunctionRef[T] Unit): ActorRef[T] =
new internal.FunctionRef(FunctionPath, send, terminate)
private[typed] val FuturePath = a.RootActorPath(a.Address("akka.actor.typed.internal", "future"))
private[typed] val FunctionPath = a.RootActorPath(a.Address("akka.actor.typed.internal", "function"))
}

View file

@ -17,6 +17,7 @@ import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
import java.util.Optional
import akka.actor.BootstrapSetup
import akka.actor.typed.receptionist.Receptionist
import akka.typed.EventStream
@ -29,8 +30,7 @@ import akka.typed.EventStream
*/
@DoNotInherit
@ApiMayChange
abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: internal.ActorRefImpl[T]
abstract class ActorSystem[-T] extends ActorRef[T] with Extensions {
/**
* The name of this actor system, used to distinguish multiple ones within
* the same JVM & class loader.
@ -139,11 +139,11 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: inter
* Ask the system guardian of this system to create an actor from the given
* behavior and props and with the given name. The name does not need to
* be unique since the guardian will prefix it with a running number when
* creating the child actor. The timeout sets the timeout used for the [[akka.actor.typed.scaladsl.AskPattern$]]
* creating the child actor. The timeout sets the timeout used for the [[akka.actor.typed.scaladsl.AskPattern]]
* invocation when asking the guardian.
*
* The returned Future of [[ActorRef]] may be converted into an [[ActorRef]]
* to which messages can immediately be sent by using the [[ActorRef$.apply[T](s*]]
* to which messages can immediately be sent by using the [[ActorRef.apply[T](s*]]
* method.
*/
def systemActorOf[U](behavior: Behavior[U], name: String, props: Props = Props.empty)(implicit timeout: Timeout): Future[ActorRef[U]]
@ -159,9 +159,7 @@ object ActorSystem {
import internal._
/**
* Scala API: Create an ActorSystem implementation that is optimized for running
* Akka Typed [[Behavior]] hierarchiesthis system cannot run untyped
* [[akka.actor.Actor]] instances.
* Scala API: Create an ActorSystem
*/
def apply[T](
guardianBehavior: Behavior[T],
@ -173,13 +171,11 @@ object ActorSystem {
Behavior.validateAsInitial(guardianBehavior)
val cl = classLoader.getOrElse(akka.actor.ActorSystem.findClassLoader())
val appConfig = config.getOrElse(ConfigFactory.load(cl))
new ActorSystemImpl(name, appConfig, cl, executionContext, guardianBehavior, guardianProps)
createInternal(name, guardianBehavior, guardianProps, Some(appConfig), classLoader, executionContext)
}
/**
* Java API: Create an ActorSystem implementation that is optimized for running
* Akka Typed [[Behavior]] hierarchiesthis system cannot run untyped
* [[akka.actor.Actor]] instances.
* Java API: Create an ActorSystem
*/
def create[T](
guardianBehavior: Behavior[T],
@ -193,9 +189,7 @@ object ActorSystem {
}
/**
* Java API: Create an ActorSystem implementation that is optimized for running
* Akka Typed [[Behavior]] hierarchiesthis system cannot run untyped
* [[akka.actor.Actor]] instances.
* Java API: Create an ActorSystem
*/
def create[T](guardianBehavior: Behavior[T], name: String): ActorSystem[T] =
apply(guardianBehavior, name)
@ -205,26 +199,22 @@ object ActorSystem {
* which runs Akka Typed [[Behavior]] on an emulation layer. In this
* system typed and untyped actors can coexist.
*/
def adapter[T](name: String, guardianBehavior: Behavior[T],
guardianProps: Props = Props.empty,
config: Option[Config] = None,
classLoader: Option[ClassLoader] = None,
executionContext: Option[ExecutionContext] = None,
actorSystemSettings: ActorSystemSetup = ActorSystemSetup.empty): ActorSystem[T] = {
// TODO I'm not sure how useful this mode is for end-users. It has the limitation that untyped top level
// actors can't be created, because we have a custom user guardian. I would imagine that if you have
// a system of both untyped and typed actors (e.g. adding some typed actors to an existing application)
// you would start an untyped.ActorSystem and spawn typed actors from that system or from untyped actors.
// Same thing with `wrap` below.
private def createInternal[T](name: String, guardianBehavior: Behavior[T],
guardianProps: Props = Props.empty,
config: Option[Config] = None,
classLoader: Option[ClassLoader] = None,
executionContext: Option[ExecutionContext] = None): ActorSystem[T] = {
Behavior.validateAsInitial(guardianBehavior)
val cl = classLoader.getOrElse(akka.actor.ActorSystem.findClassLoader())
val appConfig = config.getOrElse(ConfigFactory.load(cl))
val setup = ActorSystemSetup(BootstrapSetup(classLoader, config, executionContext))
val untyped = new a.ActorSystemImpl(name, appConfig, cl, executionContext,
Some(PropsAdapter(() guardianBehavior, guardianProps)), actorSystemSettings)
Some(PropsAdapter(() guardianBehavior, guardianProps)), setup)
untyped.start()
ActorSystemAdapter.AdapterExtension(untyped).adapter
val adapter: ActorSystemAdapter.AdapterExtension = ActorSystemAdapter.AdapterExtension(untyped)
adapter.adapter
}
/**
@ -267,10 +257,6 @@ final class Settings(val config: Config, val untyped: a.ActorSystem.Settings, va
value
}
val Loggers = getSL("Loggers", "akka.typed.loggers")
val LoggingFilter = getS("LoggingFilter", "akka.typed.logging-filter")
val DefaultMailboxCapacity = getI("DefaultMailboxCapacity", "akka.typed.mailbox-capacity")
foundSettings = foundSettings.reverse
override def toString: String = s"Settings($name,\n ${foundSettings.mkString("\n ")})"

View file

@ -22,8 +22,8 @@ object Props {
/**
* Data structure for describing an actors props details like which
* executor to run it on. For each type of setting (e.g. [[DispatcherSelector]]
* or [[MailboxCapacity]]) the FIRST occurrence is used when creating the
* executor to run it on. For each type of setting (e.g. [[DispatcherSelector]])
* the FIRST occurrence is used when creating the
* actor; this means that adding configuration using the "with" methods
* overrides what was configured previously.
*
@ -78,11 +78,6 @@ abstract class Props private[akka] () extends Product with Serializable {
*/
def withDispatcherFromExecutionContext(ec: ExecutionContext): Props = DispatcherFromExecutionContext(ec, this)
/**
* Prepend the given mailbox capacity configuration to this Props.
*/
def withMailboxCapacity(capacity: Int): Props = MailboxCapacity(capacity, this)
/**
* Find the first occurrence of a configuration node of the given type, falling
* back to the provided default if none is found.
@ -140,19 +135,6 @@ abstract class Props private[akka] () extends Product with Serializable {
}
}
/**
* Configure the maximum mailbox capacity for the actor. If more messages are
* enqueued because the actor does not process them quickly enough then further
* messages will be dropped.
*
* The default mailbox capacity that is used when this option is not given is
* taken from the `akka.typed.mailbox-capacity` configuration setting.
*/
@InternalApi
private[akka] final case class MailboxCapacity(capacity: Int, next: Props = Props.empty) extends Props {
private[akka] override def withNext(next: Props): Props = copy(next = next)
}
/**
* The empty configuration node, used as a terminator for the internally linked
* list of each Props.

View file

@ -1,492 +0,0 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.actor.typed
package internal
import akka.actor.{ Cancellable, InvalidActorNameException, InvalidMessageException }
import akka.util.Helpers
import scala.concurrent.duration.FiniteDuration
import akka.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContextExecutor
import akka.util.Unsafe.{ instance unsafe }
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.Queue
import scala.annotation.tailrec
import scala.util.control.NonFatal
import scala.util.control.Exception.Catcher
import akka.event.Logging.Error
import akka.event.Logging
import akka.actor.typed.Behavior.StoppedBehavior
import akka.util.OptionVal
import akka.actor.typed.Behavior.UntypedBehavior
/**
* INTERNAL API
*/
object ActorCell {
/*
* Description of the _status field bit structure:
*
* bit 0-29: activation count (number of (system)messages)
* bit 30: terminating (or terminated)
* bit 31: terminated
*
* Activation count is a bit special:
* 0 means inactive
* 1 means active without normal messages (i.e. only system messages)
* N means active with N-1 normal messages (plus possibly system messages)
*/
final val terminatingShift = 30
final val activationMask = (1 << terminatingShift) - 1
// ensure that if all processors enqueue the last message concurrently, there is still no overflow
val maxActivations = activationMask - Runtime.getRuntime.availableProcessors - 1
final val terminatingBit = 1 << terminatingShift
final val terminatedBit = 1 << 31
def isTerminating(status: Int): Boolean = (status & terminatingBit) != 0
def isTerminated(status: Int): Boolean = status < 0
def isActive(status: Int): Boolean = (status & ~activationMask) == 0
def activations(status: Int): Int = status & activationMask
def messageCount(status: Int): Int = Math.max(0, activations(status) - 1)
val statusOffset = unsafe.objectFieldOffset(classOf[ActorCell[_]].getDeclaredField("_status"))
val systemQueueOffset = unsafe.objectFieldOffset(classOf[ActorCell[_]].getDeclaredField("_systemQueue"))
final val DefaultState = 0
final val SuspendedState = 1
final val SuspendedWaitForChildrenState = 2
/** compile time constant */
final val Debug = false
}
/**
* INTERNAL API
*/
private[typed] class ActorCell[T](
override val system: ActorSystem[Nothing],
protected val initialBehavior: Behavior[T],
override val executionContext: ExecutionContextExecutor,
override val mailboxCapacity: Int,
val parent: ActorRefImpl[Nothing])
extends ActorContextImpl[T] with Runnable with SupervisionMechanics[T] with DeathWatch[T] {
import ActorCell._
/*
* Implementation of the ActorContext trait.
*/
protected var childrenMap = Map.empty[String, ActorRefImpl[Nothing]]
protected var terminatingMap = Map.empty[String, ActorRefImpl[Nothing]]
override def children: Iterable[ActorRef[Nothing]] = childrenMap.values
override def child(name: String): Option[ActorRef[Nothing]] = childrenMap.get(name)
protected def removeChild(actor: ActorRefImpl[Nothing]): Unit = {
val n = actor.path.name
childrenMap.get(n) match {
case Some(`actor`) childrenMap -= n
case _
terminatingMap.get(n) match {
case Some(`actor`) terminatingMap -= n
case _
}
}
}
private[typed] def terminating: Iterable[ActorRef[Nothing]] = terminatingMap.values
private var _self: ActorRefImpl[T] = _
private[typed] def setSelf(ref: ActorRefImpl[T]): Unit = _self = ref
override def self: ActorRefImpl[T] = _self
protected def ctx: ActorContext[T] = this
override def spawn[U](behavior: Behavior[U], name: String, props: Props): ActorRef[U] = {
if (behavior.isInstanceOf[UntypedBehavior[_]])
throw new IllegalArgumentException(s"${behavior.getClass.getName} requires untyped ActorSystem")
if (childrenMap contains name) throw InvalidActorNameException(s"actor name [$name] is not unique")
if (terminatingMap contains name) throw InvalidActorNameException(s"actor name [$name] is not yet free")
val dispatcher = props.firstOrElse[DispatcherSelector](DispatcherFromExecutionContext(executionContext))
val capacity = props.firstOrElse(MailboxCapacity(system.settings.DefaultMailboxCapacity))
val cell = new ActorCell[U](system, Behavior.validateAsInitial(behavior), system.dispatchers.lookup(dispatcher), capacity.capacity, self)
// TODO uid is still needed
val ref = new LocalActorRef[U](self.path / name, cell)
cell.setSelf(ref)
childrenMap = childrenMap.updated(name, ref)
ref.sendSystem(Create())
ref
}
private var nextName = 0L
override def spawnAnonymous[U](behavior: Behavior[U], props: Props): ActorRef[U] = {
val name = Helpers.base64(nextName)
nextName += 1
spawn(behavior, name, props)
}
override def stop[U](child: ActorRef[U]): Boolean = {
val name = child.path.name
childrenMap.get(name) match {
case None false
case Some(ref) if ref != child false
case Some(ref)
ref.sendSystem(Terminate())
childrenMap -= name
terminatingMap = terminatingMap.updated(name, ref)
true
}
}
protected def stopAll(): Unit = {
childrenMap.valuesIterator.foreach { ref
ref.sendSystem(Terminate())
terminatingMap = terminatingMap.updated(ref.path.name, ref)
}
childrenMap = Map.empty
}
override def schedule[U](delay: FiniteDuration, target: ActorRef[U], msg: U): Cancellable =
system.scheduler.scheduleOnce(delay)(target ! msg)(ExecutionContexts.sameThreadExecutionContext)
override private[akka] def internalSpawnAdapter[U](f: U T, _name: String): ActorRef[U] = {
val baseName = Helpers.base64(nextName, new java.lang.StringBuilder("$!"))
nextName += 1
val name = if (_name != "") s"$baseName-${_name}" else baseName
val ref = new FunctionRef[U](
self.path / name,
(msg, _) { val m = f(msg); if (m != null) send(m) },
(self) sendSystem(DeathWatchNotification(self, null)))
childrenMap = childrenMap.updated(name, ref)
ref
}
private[this] var receiveTimeout: (FiniteDuration, T) = null
override def setReceiveTimeout(d: FiniteDuration, msg: T): Unit = {
if (Debug) println(s"$self setting receive timeout of $d, msg $msg")
receiveTimeout = (d, msg)
}
override def cancelReceiveTimeout(): Unit = {
if (Debug) println(s"$self canceling receive timeout")
receiveTimeout = null
}
/*
* Implementation of the invocation mechanics.
*/
// see comment in companion object for details
@volatile private[this] var _status: Int = 0
protected[typed] def getStatus: Int = _status
private[this] val queue: Queue[T] = new ConcurrentLinkedQueue[T]
private[typed] def peekMessage: T = queue.peek()
private[this] val maxQueue: Int = Math.min(mailboxCapacity, maxActivations)
@volatile private[this] var _systemQueue: LatestFirstSystemMessageList = SystemMessageList.LNil
protected def maySend: Boolean = !isTerminating
protected def isTerminating: Boolean = ActorCell.isTerminating(_status)
protected def setTerminating(): Unit = if (!ActorCell.isTerminating(_status)) unsafe.getAndAddInt(this, statusOffset, terminatingBit)
protected def setClosed(): Unit = if (!isTerminated(_status)) unsafe.getAndAddInt(this, statusOffset, terminatedBit)
private def handleException: Catcher[Unit] = {
case e: InterruptedException
publish(Error(e, self.path.toString, getClass, "interrupted during message send"))
Thread.currentThread.interrupt()
case NonFatal(e)
publish(Error(e, self.path.toString, getClass, "swallowing exception during message send"))
}
def send(msg: T): Unit = {
if (msg == null) throw new InvalidMessageException("[null] is not an allowed message")
try {
val old = unsafe.getAndAddInt(this, statusOffset, 1)
val oldActivations = activations(old)
// this is not an off-by-one: #msgs is activations-1 if >0
if (oldActivations > maxQueue) {
if (Debug) println(s"[$thread] $self NOT enqueueing $msg at status $old ($oldActivations > $maxQueue)")
// cannot enqueue, need to give back activation token
unsafe.getAndAddInt(this, statusOffset, -1)
system.eventStream.publish(Dropped(msg, self))
} else if (ActorCell.isTerminating(old)) {
if (Debug) println(s"[$thread] $self NOT enqueueing $msg at status $old (is terminating)")
unsafe.getAndAddInt(this, statusOffset, -1)
system.deadLetters ! msg
} else {
if (Debug) println(s"[$thread] $self enqueueing $msg at status $old")
// need to enqueue; if the actor sees the token but not the message, it will reschedule
queue.add(msg)
if (oldActivations == 0) {
if (Debug) println(s"[$thread] $self being woken up")
unsafe.getAndAddInt(this, statusOffset, 1) // the first 1 was just the active bit, now add 1msg
// if the actor was not yet running, set it in motion; spurious wakeups dont hurt
executionContext.execute(this)
}
}
} catch handleException
}
def sendSystem(signal: SystemMessage): Unit = {
@tailrec def needToActivate(): Boolean = {
val currentList = _systemQueue
if (currentList.head == NoMessage) {
system.deadLetters.sorry.sendSystem(signal)
false
} else {
unsafe.compareAndSwapObject(this, systemQueueOffset, currentList.head, (signal :: currentList).head) || {
signal.unlink()
needToActivate()
}
}
}
try {
if (needToActivate()) {
val old = unsafe.getAndAddInt(this, statusOffset, 1)
if (isTerminated(old)) {
// nothing to do
if (Debug) println(s"[$thread] $self NOT enqueueing $signal: terminating")
unsafe.getAndAddInt(this, statusOffset, -1)
} else if (activations(old) == 0) {
// all is good: we signaled the transition to active
if (Debug) println(s"[$thread] $self enqueueing $signal: activating")
executionContext.execute(this)
} else {
// take back that token: we didnt actually enqueue a normal message and the actor was already active
if (Debug) println(s"[$thread] $self enqueueing $signal: already active")
unsafe.getAndAddInt(this, statusOffset, -1)
}
} else if (Debug) println(s"[$thread] $self NOT enqueueing $signal: terminated")
} catch handleException
}
/**
* Main entry point into the actor: the ActorCell is a Runnable that is
* enqueued in its Executor whenever it needs to run. The _status field is
* used for coordination such that it is never enqueued more than once at
* any given time, because that would break the Actor Model.
*
* The idea here is to process at most as many messages as were in queued
* upon entry of this method, interleaving each normal message with the
* processing of all system messages that may have accumulated in the
* meantime. If at the end of the processing messages remain in the queue
* then this cell is rescheduled.
*
* All coordination occurs via a single Int field that is only updated in
* wait-free manner (LOCK XADD via unsafe.getAndAddInt), where conflicts are
* resolved by compensating actions. For a description of the bit usage see
* the companion objects source code.
*/
override final def run(): Unit = {
if (Debug) println(s"[$thread] $self entering run(): interrupted=${Thread.currentThread.isInterrupted}")
val status = _status
val msgs = messageCount(status)
var processed = 0
@tailrec def process(): Unit = {
if (processAllSystemMessages() && processed < msgs) {
val msg = queue.poll()
if (msg != null) {
processed += 1
processMessage(msg)
process()
}
}
}
try {
unscheduleReceiveTimeout()
if (!isTerminated(status)) {
process()
scheduleReceiveTimeout()
}
} catch {
case NonFatal(ex)
fail(ex)
case ie: InterruptedException
fail(ie)
if (Debug) println(s"[$thread] $self interrupting due to catching InterruptedException")
Thread.currentThread.interrupt()
}
// Returns `true` if it should be rescheduled.
// This method shouldn't throw apart from fatal errors.
def postProcess(): Boolean = {
// also remove the general activation token
processed += 1
val prev = unsafe.getAndAddInt(this, statusOffset, -processed)
val now = prev - processed
if (isTerminated(now)) {
false // were finished, don't reschedule
} else if (activations(now) > 0) {
// normal messages pending: reverse the deactivation
unsafe.getAndAddInt(this, statusOffset, 1)
true // ... and reschedule
} else if (_systemQueue.head != null) {
/*
* System message was enqueued after our last processing, we now need to
* race against the other party because the enqueue might have happened
* before the deactivation (above) and hence not scheduled.
*
* If we win, we reschedule; if we lose, we must remove the attempted
* activation token again.
*/
val again = unsafe.getAndAddInt(this, statusOffset, 1)
if (activations(again) == 0) true //reschedule
else {
unsafe.getAndAddInt(this, statusOffset, -1)
false // don't reschedule
}
} else {
false // don't reschedule
}
}
if (postProcess())
try executionContext.execute(this) catch {
case NonFatal(e)
// we can just hope that the actor will receive another message at some
// point to wake it up againassuming that the failure to enqueue the cell is transient
fail(e)
}
if (Debug) println(s"[$thread] $self exiting run(): interrupted=${Thread.currentThread.isInterrupted}")
}
protected[typed] var behavior: Behavior[T] = _
protected def next(b: Behavior[T], msg: Any): Unit = {
if (Behavior.isUnhandled(b)) unhandled(msg)
else {
b match {
case s: StoppedBehavior[T]
// use StoppedBehavior with previous behavior or an explicitly given `postStop` behavior
// until Terminate is received, i.e until finishTerminate is invoked, and there PostStop
// will be signaled to the previous/postStop behavior
s.postStop match {
case OptionVal.None
// use previous as the postStop behavior
behavior = new Behavior.StoppedBehavior(OptionVal.Some(behavior))
case OptionVal.Some(postStop)
// use the given postStop behavior, but canonicalize it
behavior = new Behavior.StoppedBehavior(OptionVal.Some(Behavior.canonicalize(postStop, behavior, ctx)))
}
self.sendSystem(Terminate())
case _
behavior = Behavior.canonicalize(b, behavior, ctx)
}
}
}
private def unhandled(msg: Any): Unit = msg match {
case Terminated(ref) fail(DeathPactException(ref))
case _ // nothing to do
}
private[this] var receiveTimeoutScheduled: Cancellable = null
private def unscheduleReceiveTimeout(): Unit =
if (receiveTimeoutScheduled ne null) {
receiveTimeoutScheduled.cancel()
receiveTimeoutScheduled = null
}
private def scheduleReceiveTimeout(): Unit =
receiveTimeout match {
case (d, msg)
receiveTimeoutScheduled = schedule(d, self, msg)
case other
// nothing to do
}
/**
* Process the messages in the mailbox
*/
private def processMessage(msg: T): Unit = {
if (Debug) println(s"[$thread] $self processing message $msg")
next(Behavior.interpretMessage(behavior, this, msg), msg)
if (Thread.interrupted())
throw new InterruptedException("Interrupted while processing actor messages")
}
@tailrec
private def systemDrain(next: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = {
val currentList = _systemQueue
if (currentList.head == NoMessage) SystemMessageList.ENil
else if (unsafe.compareAndSwapObject(this, systemQueueOffset, currentList.head, next.head)) currentList.reverse
else systemDrain(next)
}
/**
* Will at least try to process all queued system messages: in case of
* failure simply drop and go on to the next, because there is nothing to
* restart here (failure is in ActorCell somewhere ). In case the mailbox
* becomes closed (because of processing a Terminate message), dump all
* already dequeued message to deadLetters.
*/
private def processAllSystemMessages(): Boolean = {
var interruption: Throwable = null
var messageList = systemDrain(SystemMessageList.LNil)
var continue = true
while (messageList.nonEmpty && continue) {
val msg = messageList.head
messageList = messageList.tail
msg.unlink()
continue =
try processSignal(msg)
catch {
case ie: InterruptedException
fail(ie)
if (Debug) println(s"[$thread] $self interrupting due to catching InterruptedException during system message processing")
Thread.currentThread.interrupt()
true
case ex @ (NonFatal(_) | _: AssertionError)
fail(ex)
true
}
/*
* the second part of the condition is necessary to avoid logging an InterruptedException
* from the systemGuardian during shutdown
*/
if (Thread.interrupted() && system.whenTerminated.value.isEmpty)
interruption = new InterruptedException("Interrupted while processing system messages")
// dont ever execute normal message when system message present!
if (messageList.isEmpty && continue) messageList = systemDrain(SystemMessageList.LNil)
}
/*
* if we closed the mailbox, we must dump the remaining system messages
* to deadLetters (this is essential for DeathWatch)
*/
val dlm = system.deadLetters
if (isTerminated(_status) && messageList.isEmpty) messageList = systemDrain(new LatestFirstSystemMessageList(NoMessage))
while (messageList.nonEmpty) {
val msg = messageList.head
messageList = messageList.tail
if (Debug) println(s"[$thread] $self dropping dead system message $msg")
msg.unlink()
try dlm.sorry.sendSystem(msg)
catch {
case e: InterruptedException interruption = e
case NonFatal(e) system.eventStream.publish(
Error(e, self.path.toString, this.getClass, "error while enqueuing " + msg + " to deadLetters: " + e.getMessage))
}
if (isTerminated(_status) && messageList.isEmpty) messageList = systemDrain(new LatestFirstSystemMessageList(NoMessage))
}
// if we got an interrupted exception while handling system messages, then rethrow it
if (interruption ne null) {
if (Debug) println(s"[$thread] $self throwing interruption")
Thread.interrupted() // clear interrupted flag before throwing according to java convention
throw interruption
}
continue
}
// logging is not the main purpose, and if it fails theres nothing we can do
protected final def publish(e: Logging.LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) }
protected final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass
private def thread: String = Thread.currentThread.getName
override def toString: String = f"ActorCell($self, status = ${_status}%08x, queue = $queue)"
}

View file

@ -4,18 +4,6 @@
package akka.actor.typed
package internal
import akka.{ actor a }
import akka.dispatch.sysmsg._
import akka.util.Unsafe.{ instance unsafe }
import scala.annotation.tailrec
import scala.util.control.NonFatal
import scala.concurrent.Future
import java.util.ArrayList
import akka.actor.InvalidMessageException
import scala.util.{ Failure, Success }
import scala.annotation.unchecked.uncheckedVariance
/**
@ -24,7 +12,7 @@ import scala.annotation.unchecked.uncheckedVariance
* available in the package object, enabling `ref.toImpl` (or `ref.toImplN`
* for `ActorRef[Nothing]`Scala refuses to infer `Nothing` as a type parameter).
*/
private[typed] trait ActorRefImpl[-T] extends ActorRef[T] {
private[akka] trait ActorRefImpl[-T] extends ActorRef[T] {
def sendSystem(signal: SystemMessage): Unit
def isLocal: Boolean
@ -53,185 +41,3 @@ private[typed] trait ActorRefImpl[-T] extends ActorRef[T] {
override def toString: String = s"Actor[${path}#${path.uid}]"
}
/**
* A local ActorRef that is backed by an asynchronous [[ActorCell]].
*/
private[typed] class LocalActorRef[-T](override val path: a.ActorPath, cell: ActorCell[T])
extends ActorRef[T] with ActorRefImpl[T] {
override def tell(msg: T): Unit = cell.send(msg)
override def sendSystem(signal: SystemMessage): Unit = cell.sendSystem(signal)
final override def isLocal: Boolean = true
private[typed] def getCell: ActorCell[_] = cell
}
/**
* A local ActorRef that just discards everything that is sent to it. This
* implies that it effectively has an infinite lifecycle, i.e. it never
* terminates (meaning: no Hawking radiation).
*/
private[typed] object BlackholeActorRef
extends ActorRef[Any] with ActorRefImpl[Any] {
override val path: a.ActorPath = a.RootActorPath(a.Address("akka.actor.typed.internal", "blackhole"))
override def tell(msg: Any): Unit = ()
override def sendSystem(signal: SystemMessage): Unit = ()
final override def isLocal: Boolean = true
}
/**
* A local synchronous ActorRef that invokes the given function for every message send.
* This reference can be watched and will do the right thing when it receives a [[DeathWatchNotification]].
* This reference cannot watch other references.
*/
private[akka] final class FunctionRef[-T](
_path: a.ActorPath,
send: (T, FunctionRef[T]) Unit,
_terminate: FunctionRef[T] Unit)
extends WatchableRef[T](_path) {
override def tell(msg: T): Unit = {
if (msg == null) throw new InvalidMessageException("[null] is not an allowed message")
if (isAlive)
try send(msg, this) catch {
case NonFatal(ex) // nothing we can do here
}
else () // we dont have deadLetters available
}
override def sendSystem(signal: SystemMessage): Unit = signal match {
case Create() // nothing to do
case DeathWatchNotification(ref, cause) // were not watching, and were not a parent either
case Terminate() doTerminate()
case Watch(watchee, watcher) if (watchee == this && watcher != this) addWatcher(watcher.sorryForNothing)
case Unwatch(watchee, watcher) if (watchee == this && watcher != this) remWatcher(watcher.sorryForNothing)
case NoMessage // nothing to do
}
override def isLocal = true
override def terminate(): Unit = _terminate(this)
}
/**
* The mechanics for synthetic ActorRefs that have a lifecycle and support being watched.
*/
private[typed] abstract class WatchableRef[-T](override val path: a.ActorPath) extends ActorRef[T] with ActorRefImpl[T] {
import WatchableRef._
/**
* Callback that is invoked when this ref has terminated. Even if doTerminate() is
* called multiple times, this callback is invoked only once.
*/
protected def terminate(): Unit
type S = Set[ActorRefImpl[Nothing]]
@volatile private[this] var _watchedBy: S = Set.empty
protected def isAlive: Boolean = _watchedBy != null
protected def doTerminate(): Unit = {
val watchedBy = unsafe.getAndSetObject(this, watchedByOffset, null).asInstanceOf[S]
if (watchedBy != null) {
try terminate() catch { case NonFatal(ex) }
if (watchedBy.nonEmpty) watchedBy foreach sendTerminated
}
}
private def sendTerminated(watcher: ActorRefImpl[Nothing]): Unit =
watcher.sendSystem(DeathWatchNotification(this, null))
@tailrec final protected def addWatcher(watcher: ActorRefImpl[Nothing]): Unit =
_watchedBy match {
case null sendTerminated(watcher)
case watchedBy
if (!watchedBy.contains(watcher))
if (!unsafe.compareAndSwapObject(this, watchedByOffset, watchedBy, watchedBy + watcher))
addWatcher(watcher) // try again
}
@tailrec final protected def remWatcher(watcher: ActorRefImpl[Nothing]): Unit = {
_watchedBy match {
case null // do nothing...
case watchedBy
if (watchedBy.contains(watcher))
if (!unsafe.compareAndSwapObject(this, watchedByOffset, watchedBy, watchedBy - watcher))
remWatcher(watcher) // try again
}
}
}
private[typed] object WatchableRef {
val watchedByOffset = unsafe.objectFieldOffset(classOf[WatchableRef[_]].getDeclaredField("_watchedBy"))
}
/**
* A Future of an ActorRef can quite easily be wrapped as an ActorRef since no
* promises are made about delivery delays: as long as the Future is not ready
* messages will be queued, afterwards they get sent without waiting.
*/
private[typed] class FutureRef[-T](_path: a.ActorPath, bufferSize: Int, f: Future[ActorRef[T]]) extends WatchableRef[T](_path) {
import FutureRef._
// Keep in synch with `targetOffset` in companion (could also change on mixing in a trait).
@volatile private[this] var _target: Either[ArrayList[T], ActorRef[T]] = Left(new ArrayList[T])
f.onComplete {
case Success(ref)
_target match {
case l @ Left(list)
list.synchronized {
val it = list.iterator
while (it.hasNext) ref ! it.next()
if (unsafe.compareAndSwapObject(this, targetOffset, l, Right(ref)))
ref.sorry.sendSystem(Watch(ref, this))
// if this fails, concurrent termination has won and there is no point in watching
}
case _ // already terminated
}
case Failure(ex) doTerminate()
}(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
override def terminate(): Unit = {
val old = unsafe.getAndSetObject(this, targetOffset, Right(BlackholeActorRef))
old match {
case Right(target: ActorRef[_]) target.sorry.sendSystem(Unwatch(target, this))
case _ // nothing to do
}
}
override def tell(msg: T): Unit = {
if (msg == null) throw new InvalidMessageException("[null] is not an allowed message")
_target match {
case Left(list)
list.synchronized {
if (_target.isRight) tell(msg)
else if (list.size < bufferSize) list.add(msg)
}
case Right(ref) ref ! msg
}
}
override def sendSystem(signal: SystemMessage): Unit = signal match {
case Create() // nothing to do
case DeathWatchNotification(ref, cause)
_target = Right(BlackholeActorRef) // avoid sending Unwatch() in this case
doTerminate() // this can only be the result of watching the target
case Terminate() doTerminate()
case Watch(watchee, watcher) if (watchee == this && watcher != this) addWatcher(watcher.sorryForNothing)
case Unwatch(watchee, watcher) if (watchee == this && watcher != this) remWatcher(watcher.sorryForNothing)
case NoMessage // nothing to do
}
override def isLocal = true
}
private[typed] object FutureRef {
val targetOffset = {
val fields = classOf[FutureRef[_]].getDeclaredFields.toList
// On Scala 2.12, the field's name is exactly "_target" (and it's private), earlier Scala versions compile the val to a public field that's name mangled to "akka.actor.typed$internal$FutureRef$$_target"
val targetField = fields.find(_.getName.endsWith("_target"))
assert(targetField.nonEmpty, s"Could not find _target field in FutureRef class among fields $fields.")
unsafe.objectFieldOffset(targetField.get)
}
}

View file

@ -1,301 +0,0 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.actor.typed
package internal
import com.typesafe.config.Config
import scala.concurrent.ExecutionContext
import java.util.concurrent.ThreadFactory
import scala.concurrent.{ ExecutionContextExecutor, Future }
import akka.{ actor a, dispatch d, event e }
import scala.util.control.NonFatal
import scala.util.control.ControlThrowable
import scala.collection.immutable
import akka.actor.typed.Dispatchers
import scala.concurrent.Promise
import java.util.concurrent.ConcurrentSkipListSet
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
import scala.util.Success
import akka.util.Timeout
import java.io.Closeable
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.scaladsl.AskPattern
import akka.typed.{ BusLogging, EventStream }
object ActorSystemImpl {
sealed trait SystemCommand
case class CreateSystemActor[T](behavior: Behavior[T], name: String, props: Props)(val replyTo: ActorRef[ActorRef[T]]) extends SystemCommand
val systemGuardianBehavior: Behavior[SystemCommand] = {
import scaladsl.Actor
Actor.deferred { _
var i = 1
Actor.immutable {
case (ctx, create: CreateSystemActor[t])
val name = s"$i-${create.name}"
i += 1
create.replyTo ! ctx.spawn(create.behavior, name, create.props)
Actor.same
}
}
}
}
/*
* Actor Ideas:
remoting/clustering is just another set of actors/extensions
Receptionist:
should be a new kind of Extension (where lookup yields ActorRef)
obtaining a reference may either give a single remote one or a dynamic local proxy that routes to available instancesdistinguished using a stableDestination flag (for read-your-writes semantics)
perhaps fold sharding into this: how message routing is done should not matter
Streams:
make new implementation of ActorMaterializer that leverages Envelope removal
all internal actor creation must be asynchronous
could offer ActorSystem extension for materializer
remove downcasts to ActorMaterializer in akka-stream packagereplace by proper function passing or Materializer APIs where needed (should make Gearpump happier as well)
add new Sink/Source for ActorRef[]
Distributed Data:
create new Behaviors around the logic
*
*/
private[typed] class ActorSystemImpl[-T](
override val name: String,
_config: Config,
_cl: ClassLoader,
_ec: Option[ExecutionContext],
_userGuardianBehavior: Behavior[T],
_userGuardianProps: Props)
extends ActorSystem[T] with ActorRef[T] with ActorRefImpl[T] with ExtensionsImpl {
import ActorSystemImpl._
if (!name.matches("""^[a-zA-Z0-9][a-zA-Z0-9-_]*$"""))
throw new IllegalArgumentException(
"invalid ActorSystem name [" + name +
"], must contain only word characters (i.e. [a-zA-Z0-9] plus non-leading '-' or '_')")
final override val path: a.ActorPath = a.RootActorPath(a.Address("akka", name)) / "user"
override val settings: Settings = new Settings(_cl, _config, name)
override def logConfiguration(): Unit = log.info(settings.toString)
protected def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
new Thread.UncaughtExceptionHandler() {
def uncaughtException(thread: Thread, cause: Throwable): Unit = {
cause match {
case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable log.error(cause, "Uncaught error from thread [{}]", thread.getName)
case _
if (settings.untyped.JvmExitOnFatalError) {
try {
log.error(cause, "Uncaught error from thread [{}] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled", thread.getName)
import System.err
err.print("Uncaught error from thread [")
err.print(thread.getName)
err.print("] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[")
err.print(name)
err.println("]")
cause.printStackTrace(System.err)
System.err.flush()
} finally {
System.exit(-1)
}
} else {
log.error(cause, "Uncaught fatal error from thread [{}] shutting down ActorSystem [{}]", thread.getName, name)
terminate()
}
}
}
}
override val threadFactory: d.MonitorableThreadFactory =
d.MonitorableThreadFactory(name, settings.untyped.Daemonicity, Option(_cl), uncaughtExceptionHandler)
override val dynamicAccess: a.DynamicAccess = new a.ReflectiveDynamicAccess(_cl)
private val loggerIds = new AtomicInteger
def loggerId(): Int = loggerIds.incrementAndGet()
// this provides basic logging (to stdout) until .start() is called below
override val eventStream = new EventStreamImpl(settings.untyped.DebugEventStream)(settings.untyped.LoggerStartTimeout)
eventStream.startStdoutLogger(settings)
override val logFilter: e.LoggingFilter = {
val arguments = Vector(classOf[Settings] settings, classOf[EventStream] eventStream)
dynamicAccess.createInstanceFor[e.LoggingFilter](settings.LoggingFilter, arguments).get
}
override val log: e.LoggingAdapter = new BusLogging(eventStream, getClass.getName + "(" + name + ")", this.getClass, logFilter)
/**
* Create the scheduler service. This one needs one special behavior: if
* Closeable, it MUST execute all outstanding tasks upon .close() in order
* to properly shutdown all dispatchers.
*
* Furthermore, this timer service MUST throw IllegalStateException if it
* cannot schedule a task. Once scheduled, the task MUST be executed. If
* executed upon close(), the task may execute before its timeout.
*/
protected def createScheduler(): a.Scheduler =
dynamicAccess.createInstanceFor[a.Scheduler](settings.untyped.SchedulerClass, immutable.Seq(
classOf[Config] settings.config,
classOf[e.LoggingAdapter] log,
classOf[ThreadFactory] threadFactory.withName(threadFactory.name + "-scheduler"))).get
override val scheduler: a.Scheduler = createScheduler()
private def closeScheduler(): Unit = scheduler match {
case x: Closeable x.close()
case _
}
/**
* Stub implementation of untyped EventStream to allow reuse of previous DispatcherConfigurator infrastructure
*/
private object eventStreamStub extends e.EventStream(null, false) {
override def subscribe(ref: a.ActorRef, ch: Class[_]): Boolean =
throw new UnsupportedOperationException("Cannot use this eventstream for subscribing")
override def publish(event: AnyRef): Unit = eventStream.publish(event)
}
/**
* Stub implementation of untyped Mailboxes to allow reuse of previous DispatcherConfigurator infrastructure
*/
private val mailboxesStub = new d.Mailboxes(settings.untyped, eventStreamStub, dynamicAccess,
new a.MinimalActorRef {
override def path = rootPath
override def provider = throw new UnsupportedOperationException("Mailboxes deadletter reference does not provide")
})
private val dispatcherPrequisites =
d.DefaultDispatcherPrerequisites(threadFactory, eventStreamStub, scheduler, dynamicAccess, settings.untyped, mailboxesStub, _ec)
override val dispatchers: Dispatchers = new DispatchersImpl(settings, log, dispatcherPrequisites)
override val executionContext: ExecutionContextExecutor = dispatchers.lookup(DispatcherDefault())
override val startTime: Long = System.currentTimeMillis()
override def uptime: Long = (System.currentTimeMillis() - startTime) / 1000
private val terminationPromise: Promise[Terminated] = Promise()
private val rootPath: a.ActorPath = a.RootActorPath(a.Address("akka", name))
private val topLevelActors = new ConcurrentSkipListSet[ActorRefImpl[Nothing]]
private val terminateTriggered = new AtomicBoolean
private val theOneWhoWalksTheBubblesOfSpaceTime: ActorRefImpl[Nothing] =
new ActorRef[Nothing] with ActorRefImpl[Nothing] {
override def path: a.ActorPath = rootPath
override def tell(msg: Nothing): Unit =
throw new UnsupportedOperationException("Cannot send to theOneWhoWalksTheBubblesOfSpaceTime")
override def sendSystem(signal: SystemMessage): Unit = signal match {
case Terminate()
if (terminateTriggered.compareAndSet(false, true))
topLevelActors.asScala.foreach(ref ref.sendSystem(Terminate()))
case DeathWatchNotification(ref, _)
topLevelActors.remove(ref)
if (topLevelActors.isEmpty) {
if (terminationPromise.tryComplete(Success(Terminated(this)(null)))) {
eventStream.stopDefaultLoggers(ActorSystemImpl.this)
closeScheduler()
dispatchers.shutdown()
}
} else if (terminateTriggered.compareAndSet(false, true))
topLevelActors.asScala.foreach(ref ref.sendSystem(Terminate()))
case _ // ignore
}
override def isLocal: Boolean = true
}
private def createTopLevel[U](behavior: Behavior[U], name: String, props: Props): ActorRefImpl[U] = {
val dispatcher = props.firstOrElse[DispatcherSelector](DispatcherFromExecutionContext(executionContext))
val capacity = props.firstOrElse(MailboxCapacity(settings.DefaultMailboxCapacity))
val cell = new ActorCell(this, behavior, dispatchers.lookup(dispatcher), capacity.capacity, theOneWhoWalksTheBubblesOfSpaceTime)
val ref = new LocalActorRef(rootPath / name, cell)
cell.setSelf(ref)
topLevelActors.add(ref)
ref.sendSystem(Create())
ref
}
private val systemGuardian: ActorRefImpl[SystemCommand] = createTopLevel(systemGuardianBehavior, "system", EmptyProps)
private val userGuardian: ActorRefImpl[T] = createTopLevel(_userGuardianBehavior, "user", _userGuardianProps)
// now we can start up the loggers
eventStream.startUnsubscriber(this)
eventStream.startDefaultLoggers(this)
loadExtensions()
override def terminate(): Future[Terminated] = {
theOneWhoWalksTheBubblesOfSpaceTime.sendSystem(Terminate())
terminationPromise.future
}
override def whenTerminated: Future[Terminated] = terminationPromise.future
override def deadLetters[U]: ActorRefImpl[U] =
new ActorRef[U] with ActorRefImpl[U] {
override def path: a.ActorPath = rootPath
override def tell(msg: U): Unit = eventStream.publish(DeadLetter(msg))
override def sendSystem(signal: SystemMessage): Unit = {
signal match {
case Watch(watchee, watcher) watcher.sorryForNothing.sendSystem(DeathWatchNotification(watchee, null))
case _ // all good
}
eventStream.publish(DeadLetter(signal))
}
override def isLocal: Boolean = true
}
override def tell(msg: T): Unit = userGuardian.tell(msg)
override def sendSystem(msg: SystemMessage): Unit = userGuardian.sendSystem(msg)
override def isLocal: Boolean = true
def systemActorOf[U](behavior: Behavior[U], name: String, props: Props)(implicit timeout: Timeout): Future[ActorRef[U]] = {
import AskPattern._
implicit val sched = scheduler
systemGuardian ? CreateSystemActor(behavior, name, props)
}
def printTree: String = {
def printNode(node: ActorRefImpl[Nothing], indent: String): String = {
node match {
case wc: LocalActorRef[_]
val cell = wc.getCell
(if (indent.isEmpty) "-> " else indent.dropRight(1) + "⌊-> ") +
node.path.name + " " + e.Logging.simpleName(node) + " " +
(if (cell.behavior ne null) cell.behavior.getClass else "null") +
" status=" + cell.getStatus +
" nextMsg=" + cell.peekMessage +
(if (cell.children.isEmpty && cell.terminating.isEmpty) "" else "\n") +
({
val terminating = cell.terminating.toSeq.sorted.map(r printNode(r.sorryForNothing, indent + " T"))
val children = cell.children.toSeq.sorted
val bulk = children.dropRight(1) map (r printNode(r.sorryForNothing, indent + " |"))
terminating ++ bulk ++ (children.lastOption map (r printNode(r.sorryForNothing, indent + " ")))
} mkString "\n")
case _
indent + node.path.name + " " + e.Logging.simpleName(node)
}
}
printNode(systemGuardian, "") + "\n" +
printNode(userGuardian, "")
}
}

View file

@ -1,227 +0,0 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.actor.typed
package internal
import scala.concurrent.ExecutionContextExecutor
import java.util.concurrent.{ Executors, ExecutorService }
import akka.event.LoggingAdapter
import akka.{ actor a, dispatch d, event e }
import java.util.concurrent.ConcurrentHashMap
import akka.ConfigurationException
import com.typesafe.config.{ Config, ConfigFactory }
import akka.dispatch.ExecutionContexts
import java.util.concurrent.ConcurrentSkipListSet
import java.util.Comparator
class DispatchersImpl(settings: Settings, log: LoggingAdapter, prerequisites: d.DispatcherPrerequisites) extends Dispatchers {
def lookup(selector: DispatcherSelector): ExecutionContextExecutor =
selector match {
case DispatcherDefault(_) defaultGlobalDispatcher
case DispatcherFromConfig(path, _) lookup(path)
case DispatcherFromExecutor(ex: ExecutionContextExecutor, _) ex
case DispatcherFromExecutor(ex, _) d.ExecutionContexts.fromExecutor(ex)
case DispatcherFromExecutionContext(ec: ExecutionContextExecutor, _) ec
case DispatcherFromExecutionContext(ec, _) throw new UnsupportedOperationException("I thought all ExecutionContexts are also Executors?") // FIXME
}
def shutdown(): Unit = {
val i = allCreatedServices.iterator()
while (i.hasNext) i.next().shutdown()
allCreatedServices.clear()
}
import Dispatchers._
val cachingConfig = new d.CachingConfig(settings.config)
val defaultDispatcherConfig: Config =
idConfig(DefaultDispatcherId).withFallback(settings.config.getConfig(DefaultDispatcherId))
/**
* The one and only default dispatcher.
*/
def defaultGlobalDispatcher: ExecutionContextExecutor = lookup(DefaultDispatcherId)
private val dispatcherConfigurators = new ConcurrentHashMap[String, MessageDispatcherConfigurator]
private val allCreatedServices = new ConcurrentSkipListSet[ExecutorService](new Comparator[ExecutorService] {
override def compare(left: ExecutorService, right: ExecutorService): Int = {
val l = if (left == null) 0 else left.hashCode
val r = if (right == null) 0 else right.hashCode
if (l < r) -1 else if (l > r) 1 else 0
}
})
/**
* Returns a dispatcher as specified in configuration. Please note that this
* method _may_ create and return a NEW dispatcher, _every_ call.
*
* Throws ConfigurationException if the specified dispatcher cannot be found in the configuration.
*/
def lookup(id: String): ExecutionContextExecutor =
lookupConfigurator(id).dispatcher() match {
case es: ExecutorService
allCreatedServices.add(es)
es
case ece ece
}
/**
* Checks that the configuration provides a section for the given dispatcher.
* This does not guarantee that no ConfigurationException will be thrown when
* using this dispatcher, because the details can only be checked by trying
* to instantiate it, which might be undesirable when just checking.
*/
def hasDispatcher(id: String): Boolean = dispatcherConfigurators.containsKey(id) || cachingConfig.hasPath(id)
private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
dispatcherConfigurators.get(id) match {
case null
// It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup.
// That shouldn't happen often and in case it does the actual ExecutorService isn't
// created until used, i.e. cheap.
val newConfigurator =
if (cachingConfig.hasPath(id)) configuratorFrom(config(id))
else throw new ConfigurationException(s"Dispatcher [$id] not configured")
dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
case null newConfigurator
case existing existing
}
case existing existing
}
}
/**
* INTERNAL API
*/
private[akka] def config(id: String): Config = {
import scala.collection.JavaConverters._
def simpleName = id.substring(id.lastIndexOf('.') + 1)
idConfig(id)
.withFallback(settings.config.getConfig(id))
.withFallback(ConfigFactory.parseMap(Map("name" simpleName).asJava))
.withFallback(defaultDispatcherConfig)
}
private def idConfig(id: String): Config = {
import scala.collection.JavaConverters._
ConfigFactory.parseMap(Map("id" id).asJava)
}
/**
* INTERNAL API
*
* Creates a MessageDispatcherConfigurator from a Config.
*
* The Config must also contain a `id` property, which is the identifier of the dispatcher.
*
* Throws: IllegalArgumentException if the value of "type" is not valid
* IllegalArgumentException if it cannot create the MessageDispatcherConfigurator
*/
private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = {
if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render)
cfg.getString("type") match {
case "Dispatcher" new DispatcherConfigurator(cfg, prerequisites)
case "PinnedDispatcher" new PinnedDispatcherConfigurator(cfg, prerequisites)
case fqn
val args = List(classOf[Config] cfg)
prerequisites.dynamicAccess.createInstanceFor[DispatcherConfigurator](fqn, args).recover({
case exception
throw new ConfigurationException(
("Cannot instantiate DispatcherConfigurator type [%s], defined in [%s], " +
"make sure it has constructor with [com.typesafe.config.Config] and " +
"[akka.dispatch.DispatcherPrerequisites] parameters")
.format(fqn, cfg.getString("id")), exception)
}).get
}
}
}
/**
* Base class to be used for hooking in new dispatchers into Dispatchers.
*/
abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: d.DispatcherPrerequisites) {
val config: Config = new d.CachingConfig(_config)
/**
* Returns an instance of MessageDispatcher given the configuration.
* Depending on the needs the implementation may return a new instance for
* each invocation or return the same instance every time.
*/
def dispatcher(): ExecutionContextExecutor
def configureExecutor(): d.ExecutorServiceConfigurator = {
def configurator(executor: String): d.ExecutorServiceConfigurator = executor match {
case null | "" | "fork-join-executor" new d.ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
case "thread-pool-executor" new d.ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
case fqcn
val args = List(
classOf[Config] config,
classOf[d.DispatcherPrerequisites] prerequisites)
prerequisites.dynamicAccess.createInstanceFor[d.ExecutorServiceConfigurator](fqcn, args).recover({
case exception throw new IllegalArgumentException(
("Cannot instantiate ExecutorServiceConfigurator (\"executor = [%s]\"), defined in [%s], " +
"make sure it has an accessible constructor with a [%s,%s] signature")
.format(fqcn, config.getString("id"), classOf[Config], classOf[d.DispatcherPrerequisites]), exception)
}).get
}
config.getString("executor") match {
case "default-executor" new d.DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback")))
case other configurator(other)
}
}
}
/**
* Configurator for creating [[akka.dispatch.Dispatcher]].
* Returns the same dispatcher instance for for each invocation
* of the `dispatcher()` method.
*/
class DispatcherConfigurator(config: Config, prerequisites: d.DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance = ExecutionContexts.fromExecutorService(
configureExecutor().createExecutorServiceFactory(config.getString("id"), prerequisites.threadFactory)
.createExecutorService)
/**
* Returns the same dispatcher instance for each invocation
*/
override def dispatcher(): ExecutionContextExecutor = instance
}
/**
* Configurator for creating [[akka.dispatch.PinnedDispatcher]].
* Returns new dispatcher instance for for each invocation
* of the `dispatcher()` method.
*/
class PinnedDispatcherConfigurator(config: Config, prerequisites: d.DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
private val threadPoolConfig: d.ThreadPoolConfig = configureExecutor() match {
case e: d.ThreadPoolExecutorConfigurator e.threadPoolConfig
case other
prerequisites.eventStream.publish(
e.Logging.Warning(
"PinnedDispatcherConfigurator",
this.getClass,
"PinnedDispatcher [%s] not configured to use ThreadPoolExecutor, falling back to default config.".format(
config.getString("id"))))
d.ThreadPoolConfig()
}
private val factory = threadPoolConfig.createExecutorServiceFactory(config.getString("id"), prerequisites.threadFactory)
/**
* Creates new dispatcher for each invocation.
*/
override def dispatcher(): ExecutionContextExecutor = ExecutionContexts.fromExecutorService(factory.createExecutorService)
}

View file

@ -1,339 +0,0 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package akka.actor.typed
package internal
import akka.{ actor a, event e }
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.{ Await, Promise }
import akka.util.{ ReentrantGuard, Subclassification, SubclassifiedIndex }
import scala.collection.immutable
import java.util.concurrent.TimeoutException
import akka.util.Timeout
import akka.actor.typed.scaladsl.AskPattern
import akka.typed.{ EventStream, Logger }
/**
* INTERNAL API
*
* An Akka EventStream is a pub-sub stream of events both system and user generated,
* where subscribers are ActorRefs and the channels are Classes and Events are any java.lang.Object.
* EventStreams employ SubchannelClassification, which means that if you listen to a Class,
* you'll receive any message that is of that type or a subtype.
*
* The debug flag in the constructor toggles if operations on this EventStream should also be published
* as Debug-Events
*/
private[typed] class EventStreamImpl(private val debug: Boolean)(implicit private val timeout: Timeout) extends EventStream {
import e.Logging._
import EventStreamImpl._
private val unsubscriberPromise = Promise[ActorRef[Command]]
private val unsubscriber = ActorRef(unsubscriberPromise.future)
/**
* ''Must'' be called after actor system is "ready".
* Starts system actor that takes care of unsubscribing subscribers that have terminated.
*/
def startUnsubscriber(sys: ActorSystem[Nothing]): Unit =
unsubscriberPromise.completeWith(sys.systemActorOf(unsubscriberBehavior, "eventStreamUnsubscriber"))
private val unsubscriberBehavior = {
// TODO avoid depending on dsl here?
import scaladsl.Actor
Actor.deferred[Command] { _
if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"registering unsubscriber with $this"))
Actor.immutable[Command] { (ctx, msg)
msg match {
case Register(actor)
if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"watching $actor in order to unsubscribe from EventStream when it terminates"))
ctx.watch(actor)
Actor.same
case UnregisterIfNoMoreSubscribedChannels(actor) if hasSubscriptions(actor) Actor.same
// hasSubscriptions can be slow, but it's better for this actor to take the hit than the EventStream
case UnregisterIfNoMoreSubscribedChannels(actor)
if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"unwatching $actor, since has no subscriptions"))
ctx.unwatch(actor)
Actor.same
}
} onSignal {
case (_, Terminated(actor))
if (debug) publish(e.Logging.Debug(simpleName(getClass), getClass, s"unsubscribe $actor from $this, because it was terminated"))
unsubscribe(actor)
Actor.same
}
}
}
private val guard = new ReentrantGuard
private var loggers = Seq.empty[(ActorRef[Logger.Command], ActorRef[LogEvent])]
@volatile private var _logLevel: LogLevel = _
/**
* Query currently set log level. See object Logging for more information.
*/
def logLevel = _logLevel
/**
* Change log level: default loggers (i.e. from configuration file) are
* subscribed/unsubscribed as necessary so that they listen to all levels
* which are at least as severe as the given one. See object Logging for
* more information.
*
* NOTE: if the StandardOutLogger is configured also as normal logger, it
* will not participate in the automatic management of log level
* subscriptions!
*/
def setLogLevel(level: LogLevel): Unit = guard.withGuard {
val logLvl = _logLevel // saves (2 * AllLogLevel.size - 1) volatile reads (because of the loops below)
for {
l AllLogLevels
// subscribe if previously ignored and now requested
if l > logLvl && l <= level
(logger, channel) loggers
} subscribe(channel, classFor(l))
for {
l AllLogLevels
// unsubscribe if previously registered and now ignored
if l <= logLvl && l > level
(logger, channel) loggers
} unsubscribe(channel, classFor(l))
_logLevel = level
}
private def setUpStdoutLogger(settings: Settings) {
val level = levelFor(settings.untyped.StdoutLogLevel) getOrElse {
// only log initialization errors directly with StandardOutLogger.print
StandardOutLogger.print(Error(new LoggerException, simpleName(this), this.getClass, "unknown akka.stdout-loglevel " + settings.untyped.StdoutLogLevel))
ErrorLevel
}
AllLogLevels filter (level >= _) foreach (l subscribe(StandardOutLogger, classFor(l)))
guard.withGuard {
loggers :+= internal.BlackholeActorRef StandardOutLogger
_logLevel = level
}
}
/**
* Actor-less logging implementation for synchronous logging to standard
* output. This logger is always attached first in order to be able to log
* failures during application start-up, even before normal logging is
* started. Its log level can be defined by configuration setting
* <code>akka.stdout-loglevel</code>.
*/
private[typed] class StandardOutLogger extends ActorRef[LogEvent] with ActorRefImpl[LogEvent] with StdOutLogger {
override def path: a.ActorPath = StandardOutLoggerPath
override def tell(message: LogEvent): Unit =
if (message == null) throw a.InvalidMessageException("Message must not be null")
else print(message)
def isLocal: Boolean = true
def sendSystem(signal: SystemMessage): Unit = ()
@throws(classOf[java.io.ObjectStreamException])
protected def writeReplace(): AnyRef = serializedStandardOutLogger
}
private val serializedStandardOutLogger = new SerializedStandardOutLogger
@SerialVersionUID(1L)
private[typed] class SerializedStandardOutLogger extends Serializable {
@throws(classOf[java.io.ObjectStreamException])
private def readResolve(): AnyRef = StandardOutLogger
}
private val StandardOutLogger = new StandardOutLogger
private val UnhandledMessageForwarder = {
// TODO avoid depending on dsl here?
import scaladsl.Actor.{ same, immutable }
immutable[a.UnhandledMessage] {
case (_, a.UnhandledMessage(msg, sender, rcp))
publish(Debug(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg))
same
}
}
def startStdoutLogger(settings: Settings) {
setUpStdoutLogger(settings)
publish(Debug(simpleName(this), this.getClass, "StandardOutLogger started"))
}
def startDefaultLoggers(system: ActorSystemImpl[Nothing]) {
val logName = simpleName(this) + "(" + system + ")"
val level = levelFor(system.settings.untyped.LogLevel) getOrElse {
// only log initialization errors directly with StandardOutLogger.print
StandardOutLogger.print(Error(new LoggerException, logName, this.getClass, "unknown akka.loglevel " + system.settings.untyped.LogLevel))
ErrorLevel
}
try {
val defaultLoggers = system.settings.Loggers match {
case Nil classOf[DefaultLogger].getName :: Nil
case loggers loggers
}
val myloggers =
for {
loggerName defaultLoggers
if loggerName != StandardOutLogger.getClass.getName
} yield {
system.dynamicAccess.getClassFor[Logger](loggerName).map(addLogger(system, _, level, logName))
.recover({
case e throw new akka.ConfigurationException(
"Logger specified in config can't be loaded [" + loggerName +
"] due to [" + e.toString + "]", e)
}).get
}
guard.withGuard {
loggers = myloggers
_logLevel = level
}
if (system.settings.untyped.DebugUnhandledMessage)
subscribe(
ActorRef(
system.systemActorOf(UnhandledMessageForwarder, "UnhandledMessageForwarder")),
classOf[a.UnhandledMessage])
publish(Debug(logName, this.getClass, "Default Loggers started"))
if (!(defaultLoggers contains StandardOutLogger.getClass.getName)) {
unsubscribe(StandardOutLogger)
}
} catch {
case e: Exception
if (!system.whenTerminated.isCompleted) {
System.err.println("error while starting up loggers")
e.printStackTrace()
throw new akka.ConfigurationException("Could not start logger due to [" + e.toString + "]")
}
}
}
def stopDefaultLoggers(system: ActorSystem[Nothing]) {
val level = _logLevel // volatile access before reading loggers
if (!(loggers contains StandardOutLogger)) {
setUpStdoutLogger(system.settings)
publish(Debug(simpleName(this), this.getClass, "shutting down: StandardOutLogger started"))
}
for {
(logger, channel) loggers
} {
// this is very necessary, else you get infinite loop with DeadLetter
unsubscribe(channel)
import internal._
logger.sorry.sendSystem(Terminate())
}
publish(Debug(simpleName(this), this.getClass, "all default loggers stopped"))
}
private def addLogger(
system: ActorSystemImpl[Nothing],
clazz: Class[_ <: Logger],
level: LogLevel,
logName: String): (ActorRef[Logger.Command], ActorRef[LogEvent]) = {
val name = "log" + system.loggerId() + "-" + simpleName(clazz)
val logger = clazz.newInstance()
val actor = ActorRef(system.systemActorOf(logger.initialBehavior, name, DispatcherFromConfig(system.settings.untyped.LoggersDispatcher)))
import AskPattern._
implicit val scheduler = system.scheduler
val logChannel = try Await.result(actor ? (Logger.Initialize(this, _: ActorRef[ActorRef[LogEvent]])), timeout.duration) catch {
case ex: TimeoutException
publish(Warning(logName, this.getClass, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)"))
throw ex
}
AllLogLevels filter (level >= _) foreach (l subscribe(logChannel, classFor(l)))
publish(Debug(logName, this.getClass, "logger " + name + " started"))
(actor, logChannel)
}
private val subscriptions = new SubclassifiedIndex[Class[_], ActorRef[Any]]()(subclassification)
@volatile
private var cache = Map.empty[Class[_], Set[ActorRef[Any]]]
override def subscribe[T](subscriber: ActorRef[T], channel: Class[T]): Boolean = {
if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
if (debug) publish(e.Logging.Debug(simpleName(this), this.getClass, "subscribing " + subscriber + " to channel " + channel))
unsubscriber ! Register(subscriber)
subscriptions.synchronized {
val diff = subscriptions.addValue(channel, subscriber.upcast[Any])
addToCache(diff)
diff.nonEmpty
}
}
override def unsubscribe[T](subscriber: ActorRef[T], channel: Class[T]): Boolean = {
if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
val ret = subscriptions.synchronized {
val diff = subscriptions.removeValue(channel, subscriber.upcast[Any])
// removeValue(K, V) does not return the diff to remove from or add to the cache
// but instead the whole set of keys and values that should be updated in the cache
cache ++= diff
diff.nonEmpty
}
if (debug) publish(e.Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from channel " + channel))
unsubscriber ! UnregisterIfNoMoreSubscribedChannels(subscriber.upcast[Any])
ret
}
override def unsubscribe[T](subscriber: ActorRef[T]) {
if (subscriber eq null) throw new IllegalArgumentException("subscriber is null")
subscriptions.synchronized {
removeFromCache(subscriptions.removeValue(subscriber.upcast[Any]))
}
if (debug) publish(e.Logging.Debug(simpleName(this), this.getClass, "unsubscribing " + subscriber + " from all channels"))
unsubscriber ! UnregisterIfNoMoreSubscribedChannels(subscriber.upcast[Any])
}
override def publish[T](event: T): Unit = {
val c = event.asInstanceOf[AnyRef].getClass
val recv =
if (cache contains c) cache(c) // c will never be removed from cache
else subscriptions.synchronized {
if (cache contains c) cache(c)
else {
addToCache(subscriptions.addKey(c))
cache(c)
}
}
recv foreach (_ ! event)
}
/**
* Expensive call! Avoid calling directly from event bus subscribe / unsubscribe.
*/
private def hasSubscriptions(subscriber: ActorRef[Any]): Boolean =
cache.values exists { _ contains subscriber }
private def removeFromCache(changes: immutable.Seq[(Class[_], Set[ActorRef[Any]])]): Unit =
cache = (cache /: changes) {
case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[ActorRef[Any]]) diff cs)
}
private def addToCache(changes: immutable.Seq[(Class[_], Set[ActorRef[Any]])]): Unit =
cache = (cache /: changes) {
case (m, (c, cs)) m.updated(c, m.getOrElse(c, Set.empty[ActorRef[Any]]) union cs)
}
}
private[typed] object EventStreamImpl {
sealed trait Command
final case class Register(actor: ActorRef[Nothing]) extends Command
final case class UnregisterIfNoMoreSubscribedChannels(actor: ActorRef[Any]) extends Command
private val subclassification = new Subclassification[Class[_]] {
def isEqual(x: Class[_], y: Class[_]) = x == y
def isSubclass(x: Class[_], y: Class[_]) = y isAssignableFrom x
}
val StandardOutLoggerPath = a.RootActorPath(a.Address("akka.actor.typed.internal", "StandardOutLogger"))
}

View file

@ -25,7 +25,7 @@ trait ExtensionsImpl extends Extensions { self: ActorSystem[_] ⇒
/**
* Hook for ActorSystem to load extensions on startup
*/
protected final def loadExtensions(): Unit = {
final def loadExtensions(): Unit = {
/**
* @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility)
*/
@ -57,10 +57,6 @@ trait ExtensionsImpl extends Extensions { self: ActorSystem[_] ⇒
}
}
// eager initialization of CoordinatedShutdown
// TODO coordinated shutdown for akka.actor.typed
// CoordinatedShutdown(self)
loadExtensions("akka.typed.library-extensions", throwOnLoadFail = true)
loadExtensions("akka.typed.extensions", throwOnLoadFail = false)
}

View file

@ -14,8 +14,6 @@ import akka.util.OptionVal
* INTERNAL API
*/
private[typed] trait SupervisionMechanics[T] {
import ActorCell._
/*
* INTERFACE WITH ACTOR CELL
*/
@ -46,7 +44,6 @@ private[typed] trait SupervisionMechanics[T] {
* Process one system message and return whether further messages shall be processed.
*/
protected def processSignal(message: SystemMessage): Boolean = {
if (ActorCell.Debug) println(s"[${Thread.currentThread.getName}] $self processing system message $message")
message match {
case Watch(watchee, watcher) { addWatcher(watchee.sorryForNothing, watcher.sorryForNothing); true }
case Unwatch(watchee, watcher) { remWatcher(watchee.sorryForNothing, watcher.sorryForNothing); true }

View file

@ -197,25 +197,25 @@ private[akka] sealed trait SystemMessage extends Serializable {
* INTERNAL API
*/
@SerialVersionUID(1L)
private[typed] final case class Create() extends SystemMessage
private[akka] final case class Create() extends SystemMessage
/**
* INTERNAL API
*/
@SerialVersionUID(1L)
private[typed] final case class Terminate() extends SystemMessage
private[akka] final case class Terminate() extends SystemMessage
/**
* INTERNAL API
*/
@SerialVersionUID(1L)
private[typed] final case class Watch(watchee: ActorRef[Nothing], watcher: ActorRef[Nothing]) extends SystemMessage
private[akka] final case class Watch(watchee: ActorRef[Nothing], watcher: ActorRef[Nothing]) extends SystemMessage
/**
* INTERNAL API
*/
@SerialVersionUID(1L)
private[typed] final case class Unwatch(watchee: ActorRef[Nothing], watcher: ActorRef[Nothing]) extends SystemMessage
private[akka] final case class Unwatch(watchee: ActorRef[Nothing], watcher: ActorRef[Nothing]) extends SystemMessage
/**
* INTERNAL API

View file

@ -17,6 +17,7 @@ import akka.dispatch.sysmsg
extends ActorRef[T] with internal.ActorRefImpl[T] {
override def path: a.ActorPath = untyped.path
override def tell(msg: T): Unit = {
if (msg == null) throw new InvalidMessageException("[null] is not an allowed message")
untyped ! msg

View file

@ -25,15 +25,19 @@ import akka.typed.EventStream
@InternalApi private[akka] class ActorSystemAdapter[-T](val untyped: a.ActorSystemImpl)
extends ActorSystem[T] with ActorRef[T] with internal.ActorRefImpl[T] with ExtensionsImpl {
loadExtensions()
import ActorRefAdapter.sendSystemMessage
// Members declared in akka.actor.typed.ActorRef
override def tell(msg: T): Unit = {
if (msg == null) throw new InvalidMessageException("[null] is not an allowed message")
if (msg == null) throw InvalidMessageException("[null] is not an allowed message")
untyped.guardian ! msg
}
override def isLocal: Boolean = true
override def sendSystem(signal: internal.SystemMessage): Unit = sendSystemMessage(untyped.guardian, signal)
final override val path: a.ActorPath = a.RootActorPath(a.Address("akka", untyped.name)) / "user"
override def toString: String = untyped.toString
@ -87,6 +91,7 @@ private[akka] object ActorSystemAdapter {
class AdapterExtension(system: a.ExtendedActorSystem) extends a.Extension {
val adapter = new ActorSystemAdapter(system.asInstanceOf[a.ActorSystemImpl])
}
object AdapterExtension extends a.ExtensionId[AdapterExtension] with a.ExtensionIdProvider {
override def get(system: a.ActorSystem): AdapterExtension = super.get(system)
override def lookup = AdapterExtension

View file

@ -9,7 +9,6 @@ import akka.actor.InternalActorRef
import akka.pattern.AskTimeoutException
import akka.pattern.PromiseActorRef
import akka.actor.Scheduler
import akka.actor.typed.internal.FunctionRef
import akka.actor.RootActorPath
import akka.actor.Address
import akka.annotation.InternalApi
@ -64,7 +63,7 @@ object AskPattern {
ref match {
case a: adapt.ActorRefAdapter[_] askUntyped(ref, a.untyped, timeout, f)
case a: adapt.ActorSystemAdapter[_] askUntyped(ref, a.untyped.guardian, timeout, f)
case _ ask(ref, timeout, scheduler, f)
case a throw new IllegalStateException("Only expect actor references to be ActorRefAdapter or ActorSystemAdapter until native system is implemented: " + a.getClass)
}
}
@ -99,24 +98,5 @@ object AskPattern {
p.future
}
private def ask[T, U](actorRef: ActorRef[T], timeout: Timeout, scheduler: Scheduler, f: ActorRef[U] T): Future[U] = {
import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext ec }
val p = Promise[U]
val ref = new FunctionRef[U](
AskPath,
(msg, self) {
p.trySuccess(msg)
self.sendSystem(akka.actor.typed.internal.Terminate())
},
(self) if (!p.isCompleted) p.tryFailure(new NoSuchElementException("ask pattern terminated before value was received")))
actorRef ! f(ref)
val d = timeout.duration
val c = scheduler.scheduleOnce(d)(p.tryFailure(new AskTimeoutException(s"did not receive message within $d")))(ec)
val future = p.future
future.andThen {
case _ c.cancel()
}(ec)
}
private[typed] val AskPath = RootActorPath(Address("akka.actor.typed.internal", "ask"))
}

View file

@ -64,34 +64,6 @@ object Logger {
// FIXME add Mute/Unmute (i.e. the TestEventListener functionality)
}
class DefaultLogger extends Logger with StdOutLogger {
import Logger._
val initialBehavior = {
// TODO avoid depending on dsl here?
import akka.actor.typed.scaladsl.Actor._
deferred[Command] { _
immutable[Command] {
case (ctx, Initialize(eventStream, replyTo))
val log = ctx.spawn(deferred[AnyRef] { childCtx
immutable[AnyRef] {
case (_, event: LogEvent)
print(event)
same
case _ unhandled
}
}, "logger")
ctx.watch(log) // sign death pact
replyTo ! log
empty
}
}
}
}
class DefaultLoggingFilter(settings: Settings, eventStream: EventStream) extends e.DefaultLoggingFilter(() eventStream.logLevel)
/**

View file

@ -973,7 +973,9 @@ private[akka] class ActorSystemImpl(
private[this] final val ref = new AtomicReference(done)
// onComplete never fires twice so safe to avoid null check
upStreamTerminated onComplete { t ref.getAndSet(null).complete(t) }
upStreamTerminated onComplete {
t ref.getAndSet(null).complete(t)
}
/**
* Adds a Runnable that will be executed on ActorSystem termination.

View file

@ -116,7 +116,7 @@ class ClusterSingletonApiSpec extends TypedSpec(ClusterSingletonApiSpec.config)
node1UpProbe.expectMsgType[SelfUp]
node2UpProbe.expectMsgType[SelfUp]
val cs1 = ClusterSingleton(system)
val cs1: ClusterSingleton = ClusterSingleton(system)
val cs2 = ClusterSingleton(adaptedSystem2)
val settings = ClusterSingletonSettings(system).withRole("singleton")

View file

@ -6,7 +6,7 @@ package akka.typed.testkit
import java.util.concurrent.{ ConcurrentLinkedQueue, ThreadLocalRandom }
import akka.actor.{ Address, RootActorPath }
import akka.actor.typed.{ ActorRef, internal }
import akka.actor.typed.ActorRef
import scala.annotation.tailrec
import scala.collection.immutable
@ -24,7 +24,7 @@ class Inbox[T](name: String) {
val ref: ActorRef[T] = {
val uid = ThreadLocalRandom.current().nextInt()
val path = RootActorPath(Address("akka.actor.typed.inbox", "anonymous")).child(name).withUid(uid)
new internal.FunctionRef[T](path, (msg, self) q.add(msg), (self) ())
new FunctionRef[T](path, (msg, self) q.add(msg), (self) ())
}
def receiveMsg(): T = q.poll() match {

View file

@ -1,14 +1,108 @@
package akka.typed.testkit
import akka.actor.InvalidMessageException
import akka.{ actor untyped }
import akka.actor.typed._
import akka.util.Helpers
import akka.{ actor a }
import akka.util.Unsafe.{ instance unsafe }
import scala.collection.immutable.TreeMap
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.FiniteDuration
import akka.annotation.InternalApi
import akka.actor.typed.internal.ActorContextImpl
import akka.actor.typed.internal.{ ActorContextImpl, ActorRefImpl }
import scala.annotation.tailrec
import scala.util.control.NonFatal
/**
* A local synchronous ActorRef that invokes the given function for every message send.
* This reference can be watched and will do the right thing when it receives a [[akka.actor.typed.internal.DeathWatchNotification]].
* This reference cannot watch other references.
*/
private[akka] final class FunctionRef[-T](
_path: a.ActorPath,
send: (T, FunctionRef[T]) Unit,
_terminate: FunctionRef[T] Unit)
extends WatchableRef[T](_path) {
override def tell(msg: T): Unit = {
if (msg == null) throw InvalidMessageException("[null] is not an allowed message")
if (isAlive)
try send(msg, this) catch {
case NonFatal(ex) // nothing we can do here
}
else () // we dont have deadLetters available
}
import internal._
override def sendSystem(signal: SystemMessage): Unit = signal match {
case internal.Create() // nothing to do
case internal.DeathWatchNotification(ref, cause) // were not watching, and were not a parent either
case internal.Terminate() doTerminate()
case internal.Watch(watchee, watcher) if (watchee == this && watcher != this) addWatcher(watcher.sorryForNothing)
case internal.Unwatch(watchee, watcher) if (watchee == this && watcher != this) remWatcher(watcher.sorryForNothing)
case NoMessage // nothing to do
}
override def isLocal = true
override def terminate(): Unit = _terminate(this)
}
/**
* The mechanics for synthetic ActorRefs that have a lifecycle and support being watched.
*/
private[typed] abstract class WatchableRef[-T](override val path: a.ActorPath) extends ActorRef[T] with ActorRefImpl[T] {
import WatchableRef._
/**
* Callback that is invoked when this ref has terminated. Even if doTerminate() is
* called multiple times, this callback is invoked only once.
*/
protected def terminate(): Unit
type S = Set[ActorRefImpl[Nothing]]
@volatile private[this] var _watchedBy: S = Set.empty
protected def isAlive: Boolean = _watchedBy != null
protected def doTerminate(): Unit = {
val watchedBy = unsafe.getAndSetObject(this, watchedByOffset, null).asInstanceOf[S]
if (watchedBy != null) {
try terminate() catch { case NonFatal(ex) }
if (watchedBy.nonEmpty) watchedBy foreach sendTerminated
}
}
private def sendTerminated(watcher: ActorRefImpl[Nothing]): Unit =
watcher.sendSystem(internal.DeathWatchNotification(this, null))
@tailrec final protected def addWatcher(watcher: ActorRefImpl[Nothing]): Unit =
_watchedBy match {
case null sendTerminated(watcher)
case watchedBy
if (!watchedBy.contains(watcher))
if (!unsafe.compareAndSwapObject(this, watchedByOffset, watchedBy, watchedBy + watcher))
addWatcher(watcher) // try again
}
@tailrec final protected def remWatcher(watcher: ActorRefImpl[Nothing]): Unit = {
_watchedBy match {
case null // do nothing...
case watchedBy
if (watchedBy.contains(watcher))
if (!unsafe.compareAndSwapObject(this, watchedByOffset, watchedBy, watchedBy - watcher))
remWatcher(watcher) // try again
}
}
}
private[typed] object WatchableRef {
val watchedByOffset = unsafe.objectFieldOffset(classOf[WatchableRef[_]].getDeclaredField("_watchedBy"))
}
/**
* An [[ActorContext]] for synchronous execution of a [[Behavior]] that
@ -72,10 +166,12 @@ class StubbedActorContext[T](
* INTERNAL API
*/
@InternalApi private[akka] def internalSpawnAdapter[U](f: U T, name: String): ActorRef[U] = {
val n = if (name != "") s"${childName.next()}-$name" else childName.next()
val i = Inbox[U](n)
_children += i.ref.path.name i
new internal.FunctionRef[U](
new FunctionRef[U](
self.path / i.ref.path.name,
(msg, _) { val m = f(msg); if (m != null) { selfInbox.ref ! m; i.ref ! msg } },
(self) selfInbox.ref.sorry.sendSystem(internal.DeathWatchNotification(self, null)))

View file

@ -80,7 +80,7 @@ object EffectfulActorContextSpec {
class EffectfulActorContextSpec extends FlatSpec with Matchers {
private val props = Props.empty.withMailboxCapacity(10)
private val props = Props.empty
"EffectfulActorContext's spawn" should "create children when no props specified" in {
val system = ActorSystem.create(Father.init(), "father-system")

View file

@ -445,3 +445,11 @@ def akkaModule(name: String): Project =
.settings(akka.AkkaBuild.defaultSettings)
.settings(akka.Formatting.formatSettings)
.enablePlugins(BootstrapGenjavadoc)
lazy val typedTests = taskKey[Unit]("Runs all the typed tests")
typedTests := {
(test in(actorTyped, Test)).value
(test in(actorTypedTests, Test)).value
(test in(clusterTyped, Test)).value
(test in(clusterShardingTyped, Test)).value
}