diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index 23373a8af6..5d3358dc6f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -14,13 +14,7 @@ import java.util.concurrent.atomic._ object ActorLifeCycleSpec { -} - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender { - import ActorLifeCycleSpec._ - - class LifeCycleTestActor(id: String, generationProvider: AtomicInteger) extends Actor { + class LifeCycleTestActor(testActor: ActorRef, id: String, generationProvider: AtomicInteger) extends Actor { def report(msg: Any) = testActor ! message(msg) def message(msg: Any): Tuple3[Any, String, Int] = (msg, id, currentGen) val currentGen = generationProvider.getAndIncrement() @@ -29,6 +23,12 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS def receive = { case "status" ⇒ sender ! message("OK") } } +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender { + import ActorLifeCycleSpec._ + "An Actor" must { "invoke preRestart, preStart, postRestart when using OneForOneStrategy" in { @@ -36,7 +36,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS val id = newUuid().toString val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) val gen = new AtomicInteger(0) - val restarterProps = Props(new LifeCycleTestActor(id, gen) { + val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) { override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") } override def postRestart(reason: Throwable) { report("postRestart") } }) @@ -70,7 +70,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS val id = newUuid().toString val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) val gen = new AtomicInteger(0) - val restarterProps = Props(new LifeCycleTestActor(id, gen)) + val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen)) val restarter = (supervisor ? restarterProps).as[ActorRef].get expectMsg(("preStart", id, 0)) @@ -100,7 +100,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS val id = newUuid().toString val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3)))) val gen = new AtomicInteger(0) - val props = Props(new LifeCycleTestActor(id, gen)) + val props = Props(new LifeCycleTestActor(testActor, id, gen)) val a = (supervisor ? props).as[ActorRef].get expectMsg(("preStart", id, 0)) a ! "status" diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala index d9e888731c..7d829ec622 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala @@ -164,11 +164,12 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im case Ev("go") ⇒ goto(2) } }) + val name = fsm.toString filterException[Logging.EventHandlerException] { system.eventStream.subscribe(testActor, classOf[Logging.Error]) fsm ! "go" expectMsgPF(1 second, hint = "Next state 2 does not exist") { - case Logging.Error(_, `fsm`, "Next state 2 does not exist") ⇒ true + case Logging.Error(_, `name`, "Next state 2 does not exist") ⇒ true } system.eventStream.unsubscribe(testActor) } @@ -212,18 +213,19 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im case StopEvent(r, _, _) ⇒ testActor ! r } }) + val name = fsm.toString system.eventStream.subscribe(testActor, classOf[Logging.Debug]) fsm ! "go" expectMsgPF(1 second, hint = "processing Event(go,null)") { - case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true + case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(go,null) from Actor[") ⇒ true } - expectMsg(1 second, Logging.Debug(fsm, "setting timer 't'/1500 milliseconds: Shutdown")) - expectMsg(1 second, Logging.Debug(fsm, "transition 1 -> 2")) + expectMsg(1 second, Logging.Debug(name, "setting timer 't'/1500 milliseconds: Shutdown")) + expectMsg(1 second, Logging.Debug(name, "transition 1 -> 2")) fsm ! "stop" expectMsgPF(1 second, hint = "processing Event(stop,null)") { - case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true + case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") ⇒ true } - expectMsgAllOf(1 second, Logging.Debug(fsm, "canceling timer 't'"), Normal) + expectMsgAllOf(1 second, Logging.Debug(name, "canceling timer 't'"), Normal) expectNoMsg(1 second) system.eventStream.unsubscribe(testActor) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index 6e243fc020..e915b69ecd 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -106,8 +106,8 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender { } "notify unhandled messages" taggedAs TimingTest in { - filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm, occurrences = 1), - EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm, occurrences = 1)) { + filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm.toString, occurrences = 1), + EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm.toString, occurrences = 1)) { fsm ! TestUnhandled within(1 second) { fsm ! Tick diff --git a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala index d4f08e40c2..3765ad5b6c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/IOActor.scala @@ -42,7 +42,7 @@ object IOActorSpec { class SimpleEchoClient(host: String, port: Int, ioManager: ActorRef) extends Actor with IO { - lazy val socket: SocketHandle = connect(ioManager, host, port, reader) + lazy val socket: SocketHandle = connect(ioManager, host, port)(reader) lazy val reader: ActorRef = context.actorOf { new Actor with IO { def receiveIO = { diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala index fc9e5812e7..48c8bc223a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala @@ -58,9 +58,9 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd val r: Actor.Receive = { case null ⇒ } - val log = Actor.LoggingReceive(this, r) + val log = Actor.LoggingReceive("funky", r) log.isDefinedAt("hallo") - expectMsg(1 second, Logging.Debug(this, "received unhandled message hallo")) + expectMsg(1 second, Logging.Debug("funky", "received unhandled message hallo")) } } @@ -75,9 +75,10 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd sender ! "x" } }) + val name = actor.toString actor ! "buh" within(1 second) { - expectMsg(Logging.Debug(actor.underlyingActor, "received handled message buh")) + expectMsg(Logging.Debug(name, "received handled message buh")) expectMsg("x") } val r: Actor.Receive = { @@ -88,7 +89,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd within(500 millis) { actor ! "bah" expectMsgPF() { - case Logging.Error(_: UnhandledMessageException, `actor`, _) ⇒ true + case Logging.Error(_: UnhandledMessageException, `name`, _) ⇒ true } } } @@ -105,7 +106,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd }) actor ! "buh" within(1 second) { - expectMsg(Logging.Debug(actor.underlyingActor, "received handled message buh")) + expectMsg(Logging.Debug(actor.toString, "received handled message buh")) expectMsg("x") } } @@ -123,9 +124,10 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd case _ ⇒ } }) + val name = actor.toString actor ! PoisonPill expectMsgPF() { - case Logging.Debug(`actor`, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" ⇒ true + case Logging.Debug(`name`, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" ⇒ true } awaitCond(actor.isShutdown, 100 millis) } @@ -143,20 +145,23 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd system.eventStream.subscribe(testActor, classOf[Logging.Error]) within(3 seconds) { val lifecycleGuardian = appLifecycle.asInstanceOf[ActorSystemImpl].guardian + val lname = lifecycleGuardian.toString val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000))) + val sname = supervisor.toString val supervisorSet = receiveWhile(messages = 2) { - case Logging.Debug(`lifecycleGuardian`, msg: String) if msg startsWith "now supervising" ⇒ 1 - case Logging.Debug(`supervisor`, msg: String) if msg startsWith "started" ⇒ 2 + case Logging.Debug(`lname`, msg: String) if msg startsWith "now supervising" ⇒ 1 + case Logging.Debug(`sname`, msg: String) if msg startsWith "started" ⇒ 2 }.toSet expectNoMsg(Duration.Zero) assert(supervisorSet == Set(1, 2), supervisorSet + " was not Set(1, 2)") val actor = TestActorRef[TestLogActor](Props[TestLogActor], supervisor, "none") + val aname = actor.toString val set = receiveWhile(messages = 2) { - case Logging.Debug(`supervisor`, msg: String) if msg startsWith "now supervising" ⇒ 1 - case Logging.Debug(`actor`, msg: String) if msg startsWith "started" ⇒ 2 + case Logging.Debug(`sname`, msg: String) if msg startsWith "now supervising" ⇒ 1 + case Logging.Debug(`aname`, msg: String) if msg startsWith "started" ⇒ 2 }.toSet expectNoMsg(Duration.Zero) assert(set == Set(1, 2), set + " was not Set(1, 2)") @@ -176,18 +181,18 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd filterException[ActorKilledException] { actor ! Kill val set = receiveWhile(messages = 3) { - case Logging.Error(_: ActorKilledException, `actor`, "Kill") ⇒ 1 - case Logging.Debug(`actor`, "restarting") ⇒ 2 - case Logging.Debug(`actor`, "restarted") ⇒ 3 + case Logging.Error(_: ActorKilledException, `aname`, "Kill") ⇒ 1 + case Logging.Debug(`aname`, "restarting") ⇒ 2 + case Logging.Debug(`aname`, "restarted") ⇒ 3 }.toSet expectNoMsg(Duration.Zero) assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)") } supervisor.stop() - expectMsg(Logging.Debug(supervisor, "stopping")) - expectMsg(Logging.Debug(actor, "stopped")) - expectMsg(Logging.Debug(supervisor, "stopped")) + expectMsg(Logging.Debug(sname, "stopping")) + expectMsg(Logging.Debug(aname, "stopped")) + expectMsg(Logging.Debug(sname, "stopped")) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 33cea49284..40013a1c80 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -147,7 +147,7 @@ object ActorModelSpec { await(deadline)(stops == dispatcher.stops.get) } catch { case e ⇒ - system.eventStream.publish(Error(e, dispatcher, "actual: stops=" + dispatcher.stops.get + + system.eventStream.publish(Error(e, dispatcher.toString, "actual: stops=" + dispatcher.stops.get + " required: stops=" + stops)) throw e } @@ -204,7 +204,7 @@ object ActorModelSpec { await(deadline)(stats.restarts.get() == restarts) } catch { case e ⇒ - system.eventStream.publish(Error(e, dispatcher, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + + system.eventStream.publish(Error(e, dispatcher.toString, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions + ",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters + ",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts)) throw e @@ -232,9 +232,6 @@ abstract class ActorModelSpec extends AkkaSpec { protected def newInterceptedDispatcher: MessageDispatcherInterceptor protected def dispatcherType: String - // BalancingDispatcher of course does not work when another actor is in the pool, so overridden below - protected def wavesSupervisorDispatcher(dispatcher: MessageDispatcher) = dispatcher - "A " + dispatcherType must { "must dynamically handle its own life cycle" in { @@ -310,7 +307,7 @@ abstract class ActorModelSpec extends AkkaSpec { try { f } catch { - case e ⇒ system.eventStream.publish(Error(e, this, "error in spawned thread")) + case e ⇒ system.eventStream.publish(Error(e, "spawn", "error in spawned thread")) } } } @@ -347,9 +344,25 @@ abstract class ActorModelSpec extends AkkaSpec { val boss = actorOf(Props(context ⇒ { case "run" ⇒ for (_ ← 1 to num) (context.self startsWatching context.actorOf(props)) ! cachedMessage case Terminated(child) ⇒ stopLatch.countDown() - }).withDispatcher(wavesSupervisorDispatcher(dispatcher))) + }).withDispatcher(system.dispatcherFactory.newPinnedDispatcher("boss"))) boss ! "run" - assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num) + try { + assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num) + } catch { + case e ⇒ + val buddies = dispatcher.asInstanceOf[BalancingDispatcher].buddies + val mq = dispatcher.asInstanceOf[BalancingDispatcher].messageQueue + + System.err.println("Buddies left: ") + buddies.toArray foreach { + case cell: ActorCell ⇒ + System.err.println(" - " + cell.self.path + " " + cell.isShutdown + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) + } + + System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages + " ") + + throw e + } assertCountDown(stopLatch, waitTime, "Expected all children to stop") boss.stop() } @@ -451,8 +464,6 @@ class BalancingDispatcherModelSpec extends ActorModelSpec { def dispatcherType = "Balancing Dispatcher" - override def wavesSupervisorDispatcher(dispatcher: MessageDispatcher) = system.dispatcher - "A " + dispatcherType must { "process messages in parallel" in { implicit val dispatcher = newInterceptedDispatcher diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index dbca040c08..14158e7454 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -112,7 +112,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) { import Logging._ - val allmsg = Seq(Debug(this, "debug"), Info(this, "info"), Warning(this, "warning"), Error(this, "error")) + val allmsg = Seq(Debug("", "debug"), Info("", "info"), Warning("", "warning"), Error("", "error")) val msg = allmsg filter (_.level <= level) allmsg foreach bus.publish msg foreach (x ⇒ expectMsg(x)) diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala index a69aefb013..764afefe3c 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/Report.scala @@ -13,7 +13,7 @@ class Report( compareResultWith: Option[String] = None) { private def doLog = System.getProperty("benchmark.logResult", "true").toBoolean - val log = Logging(system, this) + val log = Logging(system, "Report") val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm") diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 4224eff8e6..264fc7b791 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -13,6 +13,7 @@ import akka.cluster.ClusterNode import akka.japi.{ Creator, Procedure } import akka.serialization.{ Serializer, Serialization } import akka.event.Logging.Debug +import akka.event.LogSource import akka.experimental import akka.AkkaException @@ -166,7 +167,7 @@ object Actor { class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive { def isDefinedAt(o: Any) = { val handled = r.isDefinedAt(o) - system.eventStream.publish(Debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o)) + system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o)) handled } def apply(o: Any): Unit = r(o) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 2685f6ca85..f43c8fd9f0 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -17,7 +17,7 @@ import akka.event.Logging.{ Debug, Warning, Error } */ trait ActorContext extends ActorRefFactory with TypedActorFactory { - def self: ActorRef with ScalaActorRef + def self: ActorRef def hasMessages: Boolean @@ -167,6 +167,7 @@ private[akka] class ActorCell( } } + //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status final def systemInvoke(message: SystemMessage) { def create(): Unit = try { @@ -174,11 +175,11 @@ private[akka] class ActorCell( actor = created created.preStart() checkReceiveTimeout - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "started (" + actor + ")")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "started (" + actor + ")")) } catch { case e ⇒ try { - system.eventStream.publish(Error(e, self, "error while creating actor")) + system.eventStream.publish(Error(e, self.toString, "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -188,7 +189,7 @@ private[akka] class ActorCell( def recreate(cause: Throwable): Unit = try { val failedActor = actor - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "restarting")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "restarting")) val freshActor = newActor() if (failedActor ne null) { val c = currentMessage //One read only plz @@ -202,14 +203,14 @@ private[akka] class ActorCell( } actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call freshActor.postRestart(cause) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "restarted")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "restarted")) dispatcher.resume(this) //FIXME should this be moved down? props.faultHandler.handleSupervisorRestarted(cause, self, children) } catch { case e ⇒ try { - system.eventStream.publish(Error(e, self, "error while creating actor")) + system.eventStream.publish(Error(e, self.toString, "error while creating actor")) // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { @@ -228,7 +229,7 @@ private[akka] class ActorCell( val c = children if (c.isEmpty) doTerminate() else { - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopping")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopping")) for (child ← c) child.stop() stopping = true } @@ -239,84 +240,79 @@ private[akka] class ActorCell( if (!stats.contains(child)) { childrenRefs = childrenRefs.updated(child.name, child) childrenStats = childrenStats.updated(child, ChildRestartStats()) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "now supervising " + child)) - } else system.eventStream.publish(Warning(self, "Already supervising " + child)) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "now supervising " + child)) + } else system.eventStream.publish(Warning(self.toString, "Already supervising " + child)) } try { - val isClosed = mailbox.isClosed //Fence plus volatile read - if (!isClosed) { - if (stopping) message match { - case Terminate() ⇒ terminate() // to allow retry - case _ ⇒ - } - else message match { - case Create() ⇒ create() - case Recreate(cause) ⇒ recreate(cause) - case Link(subject) ⇒ - system.deathWatch.subscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "now monitoring " + subject)) - case Unlink(subject) ⇒ - system.deathWatch.unsubscribe(self, subject) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopped monitoring " + subject)) - case Suspend() ⇒ suspend() - case Resume() ⇒ resume() - case Terminate() ⇒ terminate() - case Supervise(child) ⇒ supervise(child) - } + if (stopping) message match { + case Terminate() ⇒ terminate() // to allow retry + case _ ⇒ + } + else message match { + case Create() ⇒ create() + case Recreate(cause) ⇒ recreate(cause) + case Link(subject) ⇒ + system.deathWatch.subscribe(self, subject) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "now monitoring " + subject)) + case Unlink(subject) ⇒ + system.deathWatch.unsubscribe(self, subject) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopped monitoring " + subject)) + case Suspend() ⇒ suspend() + case Resume() ⇒ resume() + case Terminate() ⇒ terminate() + case Supervise(child) ⇒ supervise(child) } } catch { case e ⇒ //Should we really catch everything here? - system.eventStream.publish(Error(e, self, "error while processing " + message)) + system.eventStream.publish(Error(e, self.toString, "error while processing " + message)) //TODO FIXME How should problems here be handled? throw e } } + //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status final def invoke(messageHandle: Envelope) { try { - val isClosed = mailbox.isClosed //Fence plus volatile read - if (!isClosed) { - currentMessage = messageHandle + currentMessage = messageHandle + try { try { - try { - cancelReceiveTimeout() // FIXME: leave this here? - messageHandle.message match { - case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) - case msg ⇒ - if (stopping) { - // receiving Terminated in response to stopping children is too common to generate noise - if (!msg.isInstanceOf[Terminated]) system.deadLetterMailbox.enqueue(self, messageHandle) - } else { - actor(msg) - } - } - currentMessage = null // reset current message after successful invocation - } catch { - case e ⇒ - system.eventStream.publish(Error(e, self, e.getMessage)) - - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - - // make sure that InterruptedException does not leave this thread - if (e.isInstanceOf[InterruptedException]) { - val ex = ActorInterruptedException(e) - props.faultHandler.handleSupervisorFailing(self, children) - parent.tell(Failed(ex), self) - throw e //Re-throw InterruptedExceptions as expected + cancelReceiveTimeout() // FIXME: leave this here? + messageHandle.message match { + case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) + case msg ⇒ + if (stopping) { + // receiving Terminated in response to stopping children is too common to generate noise + if (!msg.isInstanceOf[Terminated]) system.deadLetterMailbox.enqueue(self, messageHandle) } else { - props.faultHandler.handleSupervisorFailing(self, children) - parent.tell(Failed(e), self) + actor(msg) } - } finally { - checkReceiveTimeout // Reschedule receive timeout } + currentMessage = null // reset current message after successful invocation } catch { case e ⇒ - system.eventStream.publish(Error(e, self, e.getMessage)) - throw e + system.eventStream.publish(Error(e, self.toString, e.getMessage)) + + // prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + + // make sure that InterruptedException does not leave this thread + if (e.isInstanceOf[InterruptedException]) { + val ex = ActorInterruptedException(e) + props.faultHandler.handleSupervisorFailing(self, children) + parent.tell(Failed(ex), self) + throw e //Re-throw InterruptedExceptions as expected + } else { + props.faultHandler.handleSupervisorFailing(self, children) + parent.tell(Failed(e), self) + } + } finally { + checkReceiveTimeout // Reschedule receive timeout } + } catch { + case e ⇒ + system.eventStream.publish(Error(e, self.toString, e.getMessage)) + throw e } } } @@ -332,7 +328,7 @@ private[akka] class ActorCell( } def autoReceiveMessage(msg: Envelope) { - if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self, "received AutoReceiveMessage " + msg)) + if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self.toString, "received AutoReceiveMessage " + msg)) if (stopping) msg.message match { case ChildTerminated ⇒ handleChildTerminated(sender) @@ -350,7 +346,7 @@ private[akka] class ActorCell( private def doTerminate() { if (!system.provider.evict(self.path.toString)) - system.eventStream.publish(Warning(self, "evict of " + self.path.toString + " failed")) + system.eventStream.publish(Warning(self.toString, "evict of " + self.path.toString + " failed")) dispatcher.detach(this) @@ -361,7 +357,7 @@ private[akka] class ActorCell( try { parent.tell(ChildTerminated, self) system.deathWatch.publish(Terminated(self)) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopped")) + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopped")) } finally { currentMessage = null clearActorFields() @@ -371,7 +367,7 @@ private[akka] class ActorCell( final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenStats.get(child) match { case Some(stats) ⇒ if (!props.faultHandler.handleFailure(child, cause, stats, childrenStats)) throw cause - case None ⇒ system.eventStream.publish(Warning(self, "dropping Failed(" + cause + ") from unknown child")) + case None ⇒ system.eventStream.publish(Warning(self.toString, "dropping Failed(" + cause + ") from unknown child")) } final def handleChildTerminated(child: ActorRef): Unit = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 5770a5959f..d28a111ec2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -13,9 +13,9 @@ import akka.config.ConfigurationException import akka.dispatch.{ SystemMessage, Supervise, Promise, MessageDispatcher, Future, DefaultPromise, Dispatcher, Mailbox, Envelope } import akka.event.{ Logging, DeathWatch, ActorClassification, EventStream } import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter } -import akka.util.Helpers import akka.AkkaException import com.eaio.uuid.UUID +import akka.util.{ Switch, Helpers } /** * Interface for all ActorRef providers to implement. @@ -75,7 +75,7 @@ trait ActorRefProvider { * This Future is completed upon termination of this ActorRefProvider, which * is usually initiated by stopping the guardian via ActorSystem.stop(). */ - private[akka] def terminationFuture: Future[ActorSystem.ExitStatus] + private[akka] def terminationFuture: Future[Unit] } /** @@ -144,7 +144,7 @@ class LocalActorRefProvider( val dispatcher: MessageDispatcher, val scheduler: Scheduler) extends ActorRefProvider { - val log = Logging(eventStream, this) + val log = Logging(eventStream, "LocalActorRefProvider") // FIXME remove/replave (clustering shall not leak into akka-actor) val nodename: String = System.getProperty("akka.cluster.nodename") match { @@ -154,7 +154,7 @@ class LocalActorRefProvider( private[akka] val deployer: Deployer = new Deployer(settings, eventStream, nodename) - val terminationFuture = new DefaultPromise[ActorSystem.ExitStatus](Timeout.never)(dispatcher) + val terminationFuture = new DefaultPromise[Unit](Timeout.never)(dispatcher) /* * generate name for temporary actor refs @@ -173,8 +173,10 @@ class LocalActorRefProvider( * receive only Supervise/ChildTerminated system messages or Failure message. */ private[akka] val theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = new MinimalActorRef { + val stopped = new Switch(false) + @volatile - var stopped = false + var causeOfTermination: Option[Throwable] = None override val name = "bubble-walker" @@ -185,17 +187,17 @@ class LocalActorRefProvider( override def toString = name - override def stop() = stopped = true + override def stop() = stopped switchOn { terminationFuture.complete(causeOfTermination.toLeft(())) } - override def isShutdown = stopped + override def isShutdown = stopped.isOn - override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { - case Failed(ex) ⇒ sender.stop() - case ChildTerminated ⇒ terminationFuture.completeWithResult(ActorSystem.Stopped) + override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match { + case Failed(ex) ⇒ causeOfTermination = Some(ex); sender.stop() + case ChildTerminated ⇒ stop() case _ ⇒ log.error(this + " received unexpected message " + message) - } + }) - protected[akka] override def sendSystemMessage(message: SystemMessage) { + protected[akka] override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff { message match { case Supervise(child) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead case _ ⇒ log.error(this + " received unexpected system message " + message) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index cd2f0ba5c2..b603b5604e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -47,10 +47,6 @@ object ActorSystem { def create(): ActorSystem = apply() def apply(): ActorSystem = apply("default") - sealed trait ExitStatus - case object Stopped extends ExitStatus - case class Failed(cause: Throwable) extends ExitStatus - class Settings(cfg: Config) { val config: ConfigRoot = ConfigFactory.emptyRoot("akka").withFallback(cfg).withFallback(DefaultConfigurationLoader.referenceConfig).resolve() @@ -209,7 +205,7 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem { // this provides basic logging (to stdout) until .start() is called below val eventStream = new EventStream(DebugEventStream) eventStream.startStdoutLogger(settings) - val log = new BusLogging(eventStream, this) // “this” used only for .getClass in tagging messages + val log = new BusLogging(eventStream, "ActorSystem") // “this” used only for .getClass in tagging messages /** * The root actor path for this application. @@ -256,8 +252,8 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem { case Right(p) ⇒ p } } - - def terminationFuture: Future[ExitStatus] = provider.terminationFuture + //FIXME Set this to a Failure when things bubble to the top + def terminationFuture: Future[Unit] = provider.terminationFuture def guardian: ActorRef = provider.guardian def systemGuardian: ActorRef = provider.systemGuardian def deathWatch: DeathWatch = provider.deathWatch diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 9e4579bc48..ad274a3ba2 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -37,7 +37,7 @@ trait ActorDeployer { class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, val nodename: String) extends ActorDeployer { val deploymentConfig = new DeploymentConfig(nodename) - val log = Logging(eventStream, this) + val log = Logging(eventStream, "Deployer") val instance: ActorDeployer = { val deployer = new LocalDeployer() diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index 65e325bfe8..aeb4e53573 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -71,13 +71,11 @@ object IO { case class ServerHandle(owner: ActorRef, ioManager: ActorRef, uuid: UUID = new UUID()) extends Handle { override def asServer = this - def accept(socketOwner: ActorRef): SocketHandle = { + def accept()(implicit socketOwner: ActorRef): SocketHandle = { val socket = SocketHandle(socketOwner, ioManager) ioManager ! Accept(socket, this) socket } - - def accept()(implicit socketOwner: ScalaActorRef): SocketHandle = accept(socketOwner) } sealed trait IOMessage @@ -91,35 +89,23 @@ object IO { case class Read(handle: ReadHandle, bytes: ByteString) extends IOMessage case class Write(handle: WriteHandle, bytes: ByteString) extends IOMessage - def listen(ioManager: ActorRef, address: InetSocketAddress, owner: ActorRef): ServerHandle = { + def listen(ioManager: ActorRef, address: InetSocketAddress)(implicit owner: ActorRef): ServerHandle = { val server = ServerHandle(owner, ioManager) ioManager ! Listen(server, address) server } - def listen(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: ScalaActorRef): ServerHandle = - listen(ioManager, address, sender) + def listen(ioManager: ActorRef, host: String, port: Int)(implicit owner: ActorRef): ServerHandle = + listen(ioManager, new InetSocketAddress(host, port)) - def listen(ioManager: ActorRef, host: String, port: Int, owner: ActorRef): ServerHandle = - listen(ioManager, new InetSocketAddress(host, port), owner) - - def listen(ioManager: ActorRef, host: String, port: Int)(implicit sender: ScalaActorRef): ServerHandle = - listen(ioManager, new InetSocketAddress(host, port), sender) - - def connect(ioManager: ActorRef, address: InetSocketAddress, owner: ActorRef): SocketHandle = { + def connect(ioManager: ActorRef, address: InetSocketAddress)(implicit owner: ActorRef): SocketHandle = { val socket = SocketHandle(owner, ioManager) ioManager ! Connect(socket, address) socket } - def connect(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: ScalaActorRef): SocketHandle = - connect(ioManager, address, sender) - - def connect(ioManager: ActorRef, host: String, port: Int, owner: ActorRef): SocketHandle = - connect(ioManager, new InetSocketAddress(host, port), owner) - - def connect(ioManager: ActorRef, host: String, port: Int)(implicit sender: ScalaActorRef): SocketHandle = - connect(ioManager, new InetSocketAddress(host, port), sender) + def connect(ioManager: ActorRef, host: String, port: Int)(implicit sender: ActorRef): SocketHandle = + connect(ioManager, new InetSocketAddress(host, port)) private class HandleState(var readBytes: ByteString, var connected: Boolean) { def this() = this(ByteString.empty, false) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 107a86daaa..06ca082f15 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -69,7 +69,7 @@ final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit, try { function() } catch { - case e ⇒ eventStream.publish(Error(e, this, e.getMessage)) + case e ⇒ eventStream.publish(Error(e, "TaskInvocation", e.getMessage)) } finally { cleanup() } @@ -236,10 +236,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext */ def resume(actor: ActorCell): Unit = { val mbox = actor.mailbox - if (mbox.dispatcher eq this) { - mbox.becomeOpen() + if ((mbox.dispatcher eq this) && mbox.becomeOpen()) registerForExecution(mbox, false, false) - } } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 1266dcde96..b30de4a102 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -12,6 +12,7 @@ import annotation.tailrec import akka.actor.ActorSystem import akka.event.EventStream import akka.actor.Scheduler +import java.util.concurrent.atomic.AtomicBoolean /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -39,9 +40,10 @@ class BalancingDispatcher( _timeoutMs: Long) extends Dispatcher(_prerequisites, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) { - private val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) + val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) + val rebalance = new AtomicBoolean(false) - protected val messageQueue: MessageQueue = mailboxType match { + val messageQueue: MessageQueue = mailboxType match { case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope] } @@ -66,13 +68,13 @@ class BalancingDispatcher( protected[akka] override def register(actor: ActorCell) = { super.register(actor) - registerForExecution(actor.mailbox, false, false) //Allow newcomers to be productive from the first moment + buddies.add(actor) } protected[akka] override def unregister(actor: ActorCell) = { - super.unregister(actor) buddies.remove(actor) - intoTheFray(except = actor) + super.unregister(actor) + intoTheFray(except = actor) //When someone leaves, he tosses a friend into the fray } protected override def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) { @@ -88,29 +90,27 @@ class BalancingDispatcher( } } - protected[akka] override def registerForExecution(mbox: Mailbox, hasMessagesHint: Boolean, hasSystemMessagesHint: Boolean): Boolean = { - if (!super.registerForExecution(mbox, hasMessagesHint, hasSystemMessagesHint)) { - mbox match { - case share: SharingMailbox if !share.isClosed ⇒ buddies.add(share.actor); false - case _ ⇒ false - } - } else true - } + def intoTheFray(except: ActorCell): Unit = + if (rebalance.compareAndSet(false, true)) { + try { + val i = buddies.iterator() - def intoTheFray(except: ActorCell): Unit = { - var buddy = buddies.pollFirst() - while (buddy ne null) { - val mbox = buddy.mailbox - buddy = if ((buddy eq except) || (!registerForExecution(mbox, false, false) && mbox.isClosed)) buddies.pollFirst() else null + @tailrec + def throwIn(): Unit = { + val n = if (i.hasNext) i.next() else null + if (n eq null) () + else if ((n ne except) && registerForExecution(n.mailbox, false, false)) () + else throwIn() + } + throwIn() + } finally { + rebalance.set(false) + } } - } override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) - + registerForExecution(receiver.mailbox, false, false) intoTheFray(except = receiver) - - if (!registerForExecution(receiver.mailbox, false, false)) - intoTheFray(except = receiver) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 586586b7d5..5bda68850e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -99,7 +99,7 @@ class Dispatcher( executorService.get() execute invocation } catch { case e2: RejectedExecutionException ⇒ - prerequisites.eventStream.publish(Warning(this, e2.toString)) + prerequisites.eventStream.publish(Warning("Dispatcher", e2.toString)) throw e2 } } @@ -107,20 +107,16 @@ class Dispatcher( protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(actor) - protected[akka] def shutdown { - executorService.getAndSet(new ExecutorServiceDelegate { + protected[akka] def shutdown: Unit = + Option(executorService.getAndSet(new ExecutorServiceDelegate { lazy val executor = executorServiceFactory.createExecutorService - }) match { - case null ⇒ - case some ⇒ some.shutdown() - } - } + })) foreach { _.shutdown() } /** * Returns if it was registered */ protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = { - if (mbox.shouldBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races + if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races if (mbox.setAsScheduled()) { try { executorService.get() execute mbox diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 2160c406cd..b57ff39512 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -262,7 +262,7 @@ object Future { result completeWithResult currentValue } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.prerequisites.eventStream.publish(Error(e, "Future.fold", e.getMessage)) result completeWithException e } finally { results.clear @@ -631,7 +631,7 @@ sealed trait Future[+T] extends japi.Future[T] { Right(f(res)) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.prerequisites.eventStream.publish(Error(e, "Future.map", e.getMessage)) Left(e) }) } @@ -683,7 +683,7 @@ sealed trait Future[+T] extends japi.Future[T] { future.completeWith(f(r)) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.prerequisites.eventStream.publish(Error(e, "Future.flatMap", e.getMessage)) future complete Left(e) } } @@ -716,7 +716,7 @@ sealed trait Future[+T] extends japi.Future[T] { if (p(res)) r else Left(new MatchError(res)) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage)) Left(e) }) } @@ -788,7 +788,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(f) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage)) fr completeWithException e } } @@ -802,7 +802,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(f) } catch { case e: Exception ⇒ - dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage)) fr completeWithException e } } @@ -994,7 +994,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi } else this private def notifyCompleted(func: Future[T] ⇒ Unit) { - try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, this, "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really? + try { func(this) } catch { case e ⇒ dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really? } @inline diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index b752653f1d..6f268f7e9a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -128,15 +128,20 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag protected final def systemQueueGet: SystemMessage = AbstractMailbox.systemQueueUpdater.get(this) protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = AbstractMailbox.systemQueueUpdater.compareAndSet(this, _old, _new) - def shouldBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { + final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages case Closed ⇒ false case _ ⇒ hasSystemMessageHint || hasSystemMessages } final def run = { - try processMailbox() finally { - setAsIdle() + try { + if (!isClosed) { //Volatile read, needed here + processAllSystemMessages() //First, deal with any system messages + processMailbox() //Then deal with messages + } + } finally { + setAsIdle() //Volatile write, needed here dispatcher.registerForExecution(this, false, false) } } @@ -146,9 +151,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag * * @return true if the processing finished before the mailbox was empty, due to the throughput constraint */ - final def processMailbox() { - processAllSystemMessages() //First, process all system messages - + private final def processMailbox() { if (shouldProcessMessage) { var nextMessage = dequeue() if (nextMessage ne null) { //If we have a message @@ -175,7 +178,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag } } - def processAllSystemMessages() { + final def processAllSystemMessages() { var nextMessage = systemDrain() try { while (nextMessage ne null) { @@ -187,7 +190,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag } } catch { case e ⇒ - actor.system.eventStream.publish(Error(e, actor.self, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) + actor.system.eventStream.publish(Error(e, actor.self.toString, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!")) throw e } } diff --git a/akka-actor/src/main/scala/akka/event/EventStream.scala b/akka-actor/src/main/scala/akka/event/EventStream.scala index c2be45d81e..3906d2cb04 100644 --- a/akka-actor/src/main/scala/akka/event/EventStream.scala +++ b/akka-actor/src/main/scala/akka/event/EventStream.scala @@ -3,9 +3,16 @@ */ package akka.event -import akka.actor.{ ActorRef, Actor, Props, ActorSystemImpl, Terminated } +import akka.actor.{ ActorRef, Actor, Props, ActorSystemImpl, Terminated, ActorSystem, simpleName } import akka.util.Subclassification +object EventStream { + implicit def fromActorSystem(system: ActorSystem) = system.eventStream +} + +class A(x: Int = 0) extends Exception("x=" + x) +class B extends A + class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClassification { type Event = AnyRef @@ -24,18 +31,18 @@ class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClas protected def publish(event: AnyRef, subscriber: ActorRef) = subscriber ! event override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { - if (debug) publish(Logging.Debug(this, "subscribing " + subscriber + " to channel " + channel)) + if (debug) publish(Logging.Debug(simpleName(this), "subscribing " + subscriber + " to channel " + channel)) if (reaper ne null) reaper ! subscriber super.subscribe(subscriber, channel) } override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { - if (debug) publish(Logging.Debug(this, "unsubscribing " + subscriber + " from channel " + channel)) + if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from channel " + channel)) super.unsubscribe(subscriber, channel) } override def unsubscribe(subscriber: ActorRef) { - if (debug) publish(Logging.Debug(this, "unsubscribing " + subscriber + " from all channels")) + if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from all channels")) super.unsubscribe(subscriber) } diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 62ee06381d..5b3ae4b801 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -15,6 +15,10 @@ import akka.dispatch.FutureTimeoutException import java.util.concurrent.atomic.AtomicInteger import akka.actor.ActorRefProvider +object LoggingBus { + implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream +} + /** * This trait brings log level handling to the EventStream: it reads the log * levels for the initial logging (StandardOutLogger) and the loggers&level @@ -68,7 +72,7 @@ trait LoggingBus extends ActorEventBus { private[akka] def startStdoutLogger(config: Settings) { val level = levelFor(config.StdoutLogLevel) getOrElse { - StandardOutLogger.print(Error(new EventHandlerException, this, "unknown akka.stdout-loglevel " + config.StdoutLogLevel)) + StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + config.StdoutLogLevel)) ErrorLevel } AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(StandardOutLogger, classFor(l))) @@ -76,12 +80,12 @@ trait LoggingBus extends ActorEventBus { loggers = Seq(StandardOutLogger) _logLevel = level } - publish(Info(this, "StandardOutLogger started")) + publish(Info(simpleName(this), "StandardOutLogger started")) } private[akka] def startDefaultLoggers(system: ActorSystemImpl) { val level = levelFor(system.settings.LogLevel) getOrElse { - StandardOutLogger.print(Error(new EventHandlerException, this, "unknown akka.stdout-loglevel " + system.settings.LogLevel)) + StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + system.settings.LogLevel)) ErrorLevel } try { @@ -109,7 +113,7 @@ trait LoggingBus extends ActorEventBus { loggers = myloggers _logLevel = level } - publish(Info(this, "Default Loggers started")) + publish(Info(simpleName(this), "Default Loggers started")) if (!(defaultLoggers contains StandardOutLoggerName)) { unsubscribe(StandardOutLogger) } @@ -125,7 +129,7 @@ trait LoggingBus extends ActorEventBus { val level = _logLevel // volatile access before reading loggers if (!(loggers contains StandardOutLogger)) { AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(StandardOutLogger, classFor(l))) - publish(Info(this, "shutting down: StandardOutLogger started")) + publish(Info(simpleName(this), "shutting down: StandardOutLogger started")) } for { logger ← loggers @@ -135,7 +139,7 @@ trait LoggingBus extends ActorEventBus { unsubscribe(logger) logger.stop() } - publish(Info(this, "all default loggers stopped")) + publish(Info(simpleName(this), "all default loggers stopped")) } private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = { @@ -144,17 +148,52 @@ trait LoggingBus extends ActorEventBus { implicit val timeout = Timeout(3 seconds) val response = try actor ? InitializeLogger(this) get catch { case _: FutureTimeoutException ⇒ - publish(Warning(this, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) + publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)")) } if (response != LoggerInitialized) throw new LoggerInitializationException("Logger " + name + " did not respond with LoggerInitialized, sent instead " + response) AllLogLevels filter (level >= _) foreach (l ⇒ subscribe(actor, classFor(l))) - publish(Info(this, "logger " + name + " started")) + publish(Info(simpleName(this), "logger " + name + " started")) actor } } +trait LogSource[-T] { + def genString(t: T): String +} + +object LogSource { + implicit val fromString: LogSource[String] = new LogSource[String] { + def genString(s: String) = s + } + + implicit val fromActor: LogSource[Actor] = new LogSource[Actor] { + def genString(a: Actor) = a.self.toString + } + + implicit val fromActorRef: LogSource[ActorRef] = new LogSource[ActorRef] { + def genString(a: ActorRef) = a.toString + } + + // this one unfortunately does not work as implicit, because existential types have some weird behavior + val fromClass: LogSource[Class[_]] = new LogSource[Class[_]] { + def genString(c: Class[_]) = simpleName(c) + } + implicit def fromAnyClass[T]: LogSource[Class[T]] = fromClass.asInstanceOf[LogSource[Class[T]]] + + def apply[T: LogSource](o: T) = implicitly[LogSource[T]].genString(o) + + def fromAnyRef(o: AnyRef): String = + o match { + case c: Class[_] ⇒ fromClass.genString(c) + case a: Actor ⇒ fromActor.genString(a) + case a: ActorRef ⇒ fromActorRef.genString(a) + case s: String ⇒ s + case x ⇒ simpleName(x) + } +} + /** * Main entry point for Akka logging: log levels and message types (aka * channels) defined for the main transport medium, the main event bus. The @@ -235,24 +274,26 @@ object Logging { /** * Obtain LoggingAdapter for the given application and source object. The - * source object is used to identify the source of this logging channel. + * source is used to identify the source of this logging channel and must have + * a corresponding LogSource[T] instance in scope; by default these are + * provided for Class[_], Actor, ActorRef and String types. */ - def apply(system: ActorSystem, source: AnyRef): LoggingAdapter = new BusLogging(system.eventStream, source) + def apply[T: LogSource](eventStream: LoggingBus, logSource: T): LoggingAdapter = + new BusLogging(eventStream, implicitly[LogSource[T]].genString(logSource)) + /** * Java API: Obtain LoggingAdapter for the given application and source object. The - * source object is used to identify the source of this logging channel. + * source object is used to identify the source of this logging channel; if it is + * an Actor or ActorRef, its address is used, in case of a class an approximation of + * its simpleName and in all other cases the simpleName of its class. */ - def getLogger(system: ActorSystem, source: AnyRef): LoggingAdapter = apply(system, source) - /** - * Obtain LoggingAdapter for the given event bus and source object. The - * source object is used to identify the source of this logging channel. - */ - def apply(bus: LoggingBus, source: AnyRef): LoggingAdapter = new BusLogging(bus, source) + def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = apply(system.eventStream, LogSource.fromAnyRef(logSource)) + /** * Java API: Obtain LoggingAdapter for the given event bus and source object. The * source object is used to identify the source of this logging channel. */ - def getLogger(bus: LoggingBus, source: AnyRef): LoggingAdapter = apply(bus, source) + def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = apply(bus, LogSource.fromAnyRef(logSource)) /** * Artificial exception injected into Error events if no Throwable is @@ -266,22 +307,22 @@ object Logging { def level: LogLevel } - case class Error(cause: Throwable, instance: AnyRef, message: Any = "") extends LogEvent { + case class Error(cause: Throwable, logSource: String, message: Any = "") extends LogEvent { def level = ErrorLevel } object Error { - def apply(instance: AnyRef, message: Any) = new Error(new EventHandlerException, instance, message) + def apply(logSource: String, message: Any) = new Error(new EventHandlerException, logSource, message) } - case class Warning(instance: AnyRef, message: Any = "") extends LogEvent { + case class Warning(logSource: String, message: Any = "") extends LogEvent { def level = WarningLevel } - case class Info(instance: AnyRef, message: Any = "") extends LogEvent { + case class Info(logSource: String, message: Any = "") extends LogEvent { def level = InfoLevel } - case class Debug(instance: AnyRef, message: Any = "") extends LogEvent { + case class Debug(logSource: String, message: Any = "") extends LogEvent { def level = DebugLevel } @@ -318,7 +359,7 @@ object Logging { case e: Warning ⇒ warning(e) case e: Info ⇒ info(e) case e: Debug ⇒ debug(e) - case e ⇒ warning(Warning(this, "received unexpected event of class " + e.getClass + ": " + e)) + case e ⇒ warning(Warning(simpleName(this), "received unexpected event of class " + e.getClass + ": " + e)) } } @@ -326,7 +367,7 @@ object Logging { println(errorFormat.format( timestamp, event.thread.getName, - instanceName(event.instance), + event.logSource, event.message, stackTraceFor(event.cause))) @@ -334,21 +375,21 @@ object Logging { println(warningFormat.format( timestamp, event.thread.getName, - instanceName(event.instance), + event.logSource, event.message)) def info(event: Info) = println(infoFormat.format( timestamp, event.thread.getName, - instanceName(event.instance), + event.logSource, event.message)) def debug(event: Debug) = println(debugFormat.format( timestamp, event.thread.getName, - instanceName(event.instance), + event.logSource, event.message)) def instanceName(instance: AnyRef): String = instance match { @@ -491,7 +532,7 @@ trait LoggingAdapter { } } -class BusLogging(val bus: LoggingBus, val loggingInstance: AnyRef) extends LoggingAdapter { +class BusLogging(val bus: LoggingBus, val logSource: String) extends LoggingAdapter { import Logging._ @@ -500,14 +541,14 @@ class BusLogging(val bus: LoggingBus, val loggingInstance: AnyRef) extends Loggi def isInfoEnabled = bus.logLevel >= InfoLevel def isDebugEnabled = bus.logLevel >= DebugLevel - protected def notifyError(message: String) { bus.publish(Error(loggingInstance, message)) } + protected def notifyError(message: String) { bus.publish(Error(logSource, message)) } - protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, loggingInstance, message)) } + protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, message)) } - protected def notifyWarning(message: String) { bus.publish(Warning(loggingInstance, message)) } + protected def notifyWarning(message: String) { bus.publish(Warning(logSource, message)) } - protected def notifyInfo(message: String) { bus.publish(Info(loggingInstance, message)) } + protected def notifyInfo(message: String) { bus.publish(Info(logSource, message)) } - protected def notifyDebug(message: String) { bus.publish(Debug(loggingInstance, message)) } + protected def notifyDebug(message: String) { bus.publish(Debug(logSource, message)) } } diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index 67a77aa150..2b9f59d757 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -12,8 +12,6 @@ import scala.annotation.tailrec */ object Helpers { - implicit def null2Option[T](t: T): Option[T] = Option(t) - def compareIdentityHash(a: AnyRef, b: AnyRef): Int = { /* * make sure that there is no overflow or underflow in comparisons, so @@ -28,19 +26,6 @@ object Helpers { def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b) } - def intToBytes(value: Int): Array[Byte] = { - val bytes = new Array[Byte](4) - bytes(0) = (value >>> 24).asInstanceOf[Byte] - bytes(1) = (value >>> 16).asInstanceOf[Byte] - bytes(2) = (value >>> 8).asInstanceOf[Byte] - bytes(3) = value.asInstanceOf[Byte] - bytes - } - - def bytesToInt(bytes: Array[Byte], offset: Int): Int = { - (0 until 4).foldLeft(0)((value, index) ⇒ value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8))) - } - final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789*?" @tailrec @@ -72,77 +57,4 @@ object Helpers { throw e } } - - /** - * Convenience helper to cast the given Option of Any to an Option of the given type. Will throw a ClassCastException - * if the actual type is not assignable from the given one. - */ - def narrow[T](o: Option[Any]): Option[T] = { - require((o ne null), "Option to be narrowed must not be null!") - o.asInstanceOf[Option[T]] - } - - /** - * Convenience helper to cast the given Option of Any to an Option of the given type. Will swallow a possible - * ClassCastException and return None in that case. - */ - def narrowSilently[T: Manifest](o: Option[Any]): Option[T] = - try { - narrow(o) - } catch { - case e: ClassCastException ⇒ - None - } - - /** - * Reference that can hold either a typed value or an exception. - * - * Usage: - *
- * scala> ResultOrError(1)
- * res0: ResultOrError[Int] = ResultOrError@a96606
- *
- * scala> res0()
- * res1: Int = 1
- *
- * scala> res0() = 3
- *
- * scala> res0()
- * res3: Int = 3
- *
- * scala> res0() = { println("Hello world"); 3}
- * Hello world
- *
- * scala> res0()
- * res5: Int = 3
- *
- * scala> res0() = error("Lets see what happens here...")
- *
- * scala> res0()
- * java.lang.RuntimeException: Lets see what happens here...
- * at ResultOrError.apply(Helper.scala:11)
- * at .(:6)
- * at .()
- * at Re...
- *
- */
- class ResultOrError[R](result: R) {
- private[this] var contents: Either[R, Throwable] = Left(result)
-
- def update(value: ⇒ R) {
- contents = try {
- Left(value)
- } catch {
- case (error: Throwable) ⇒ Right(error)
- }
- }
-
- def apply() = contents match {
- case Left(result) ⇒ result
- case Right(error) ⇒ throw error.fillInStackTrace
- }
- }
- object ResultOrError {
- def apply[R](result: R) = new ResultOrError(result)
- }
}
diff --git a/akka-actor/src/main/scala/akka/util/JMX.scala b/akka-actor/src/main/scala/akka/util/JMX.scala
index 2bf8545210..2c87524843 100644
--- a/akka-actor/src/main/scala/akka/util/JMX.scala
+++ b/akka-actor/src/main/scala/akka/util/JMX.scala
@@ -24,7 +24,7 @@ object JMX {
case e: InstanceAlreadyExistsException ⇒
Some(mbeanServer.getObjectInstance(name))
case e: Exception ⇒
- system.eventStream.publish(Error(e, this, "Error when registering mbean [%s]".format(mbean)))
+ system.eventStream.publish(Error(e, "JMX", "Error when registering mbean [%s]".format(mbean)))
None
}
@@ -32,6 +32,6 @@ object JMX {
mbeanServer.unregisterMBean(mbean)
} catch {
case e: InstanceNotFoundException ⇒ {}
- case e: Exception ⇒ system.eventStream.publish(Error(e, this, "Error while unregistering mbean [%s]".format(mbean)))
+ case e: Exception ⇒ system.eventStream.publish(Error(e, "JMX", "Error while unregistering mbean [%s]".format(mbean)))
}
}
diff --git a/akka-docs/dev/building-akka.rst b/akka-docs/dev/building-akka.rst
index 3e46a2698e..a9db8fe7d2 100644
--- a/akka-docs/dev/building-akka.rst
+++ b/akka-docs/dev/building-akka.rst
@@ -128,7 +128,7 @@ Dependencies
You can look at the Ivy dependency resolution information that is created on
``sbt update`` and found in ``~/.ivy2/cache``. For example, the
-``.ivy2/cache/se.scalablesolutions.akka-akka-cluster-compile.xml`` file contains
+``~/.ivy2/cache/com.typesafe.akka-akka-remote-compile.xml`` file contains
the resolution information for the akka-cluster module compile dependencies. If
you open this file in a web browser you will get an easy to navigate view of
dependencies.
diff --git a/akka-docs/intro/deployment-scenarios.rst b/akka-docs/intro/deployment-scenarios.rst
index 94047f242c..829d93829e 100644
--- a/akka-docs/intro/deployment-scenarios.rst
+++ b/akka-docs/intro/deployment-scenarios.rst
@@ -54,7 +54,7 @@ To use the plugin, first add a plugin definition to your sbt project by creating
resolvers += Classpaths.typesafeResolver
- addSbtPlugin("se.scalablesolutions.akka" % "akka-sbt-plugin" % "2.0-SNAPSHOT")
+ addSbtPlugin("com.typesafe.akka" % "akka-sbt-plugin" % "2.0-SNAPSHOT")
Then use the AkkaKernelPlugin settings. In a 'light' configuration (build.sbt)::
@@ -75,7 +75,7 @@ Or in a 'full' configuration (Build.scala). For example::
version := "0.1",
scalaVersion := "2.9.1"
resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/",
- libraryDependencies += "se.scalablesolutions.akka" % "akka-kernel" % "2.0-SNAPSHOT"
+ libraryDependencies += "com.typesafe.akka" % "akka-kernel" % "2.0-SNAPSHOT"
)
)
}
diff --git a/akka-docs/intro/getting-started-first-java.rst b/akka-docs/intro/getting-started-first-java.rst
index be79581416..ee890d723d 100644
--- a/akka-docs/intro/getting-started-first-java.rst
+++ b/akka-docs/intro/getting-started-first-java.rst
@@ -180,7 +180,7 @@ It should now look something like this:
Left("").
*/
case class WarningFilter(
- override val source: Option[AnyRef],
+ override val source: Option[String],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
@@ -321,7 +315,7 @@ case class WarningFilter(
* @param complete
* whether the event’s message must match the given message string or pattern completely
*/
- def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
+ def this(source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
@@ -338,7 +332,7 @@ case class WarningFilter(
* If you want to match all Info events, the most efficient is to use Left("").
*/
case class InfoFilter(
- override val source: Option[AnyRef],
+ override val source: Option[String],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
@@ -364,7 +358,7 @@ case class InfoFilter(
* @param complete
* whether the event’s message must match the given message string or pattern completely
*/
- def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
+ def this(source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
@@ -381,7 +375,7 @@ case class InfoFilter(
* If you want to match all Debug events, the most efficient is to use Left("").
*/
case class DebugFilter(
- override val source: Option[AnyRef],
+ override val source: Option[String],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
@@ -407,7 +401,7 @@ case class DebugFilter(
* @param complete
* whether the event’s message must match the given message string or pattern completely
*/
- def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
+ def this(source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
@@ -452,12 +446,12 @@ class TestEventListener extends Logging.DefaultLogger {
case event: LogEvent ⇒ if (!filter(event)) print(event)
case DeadLetter(msg: SystemMessage, _, rcp) ⇒
if (!msg.isInstanceOf[Terminate]) {
- val event = Warning(rcp, "received dead system message: " + msg)
+ val event = Warning(rcp.toString, "received dead system message: " + msg)
if (!filter(event)) print(event)
}
case DeadLetter(msg, snd, rcp) ⇒
if (!msg.isInstanceOf[Terminated]) {
- val event = Warning(rcp, "received dead letter from " + snd + ": " + msg)
+ val event = Warning(rcp.toString, "received dead letter from " + snd + ": " + msg)
if (!filter(event)) print(event)
}
}
diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala
index 857d9f22ee..9f225a07cc 100644
--- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala
+++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala
@@ -602,5 +602,5 @@ object TestProbe {
}
trait ImplicitSender { this: TestKit ⇒
- implicit def implicitSenderTestActor = testActor
+ implicit def self = testActor
}
diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
index a130503822..def5ebfe96 100644
--- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
+++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala
@@ -43,7 +43,7 @@ object AkkaSpec {
abstract class AkkaSpec(_application: ActorSystem = ActorSystem(getClass.getSimpleName, AkkaSpec.testConf))
extends TestKit(_application) with WordSpec with MustMatchers with BeforeAndAfterAll {
- val log: LoggingAdapter = Logging(system, this)
+ val log: LoggingAdapter = Logging(system, this.getClass)
final override def beforeAll {
atStartup()