Merge branch 'master' into wip-1141-config-patriknw

Conflicts:
	akka-actor/src/main/scala/akka/actor/ActorSystem.scala
This commit is contained in:
Patrik Nordwall 2011-11-19 09:18:57 +01:00
commit a9217cec7b
44 changed files with 360 additions and 389 deletions

View file

@ -14,13 +14,7 @@ import java.util.concurrent.atomic._
object ActorLifeCycleSpec {
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender {
import ActorLifeCycleSpec._
class LifeCycleTestActor(id: String, generationProvider: AtomicInteger) extends Actor {
class LifeCycleTestActor(testActor: ActorRef, id: String, generationProvider: AtomicInteger) extends Actor {
def report(msg: Any) = testActor ! message(msg)
def message(msg: Any): Tuple3[Any, String, Int] = (msg, id, currentGen)
val currentGen = generationProvider.getAndIncrement()
@ -29,6 +23,12 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
def receive = { case "status" sender ! message("OK") }
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender {
import ActorLifeCycleSpec._
"An Actor" must {
"invoke preRestart, preStart, postRestart when using OneForOneStrategy" in {
@ -36,7 +36,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
val id = newUuid().toString
val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
val gen = new AtomicInteger(0)
val restarterProps = Props(new LifeCycleTestActor(id, gen) {
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen) {
override def preRestart(reason: Throwable, message: Option[Any]) { report("preRestart") }
override def postRestart(reason: Throwable) { report("postRestart") }
})
@ -70,7 +70,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
val id = newUuid().toString
val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
val gen = new AtomicInteger(0)
val restarterProps = Props(new LifeCycleTestActor(id, gen))
val restarterProps = Props(new LifeCycleTestActor(testActor, id, gen))
val restarter = (supervisor ? restarterProps).as[ActorRef].get
expectMsg(("preStart", id, 0))
@ -100,7 +100,7 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
val id = newUuid().toString
val supervisor = actorOf(Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(3))))
val gen = new AtomicInteger(0)
val props = Props(new LifeCycleTestActor(id, gen))
val props = Props(new LifeCycleTestActor(testActor, id, gen))
val a = (supervisor ? props).as[ActorRef].get
expectMsg(("preStart", id, 0))
a ! "status"

View file

@ -164,11 +164,12 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
case Ev("go") goto(2)
}
})
val name = fsm.toString
filterException[Logging.EventHandlerException] {
system.eventStream.subscribe(testActor, classOf[Logging.Error])
fsm ! "go"
expectMsgPF(1 second, hint = "Next state 2 does not exist") {
case Logging.Error(_, `fsm`, "Next state 2 does not exist") true
case Logging.Error(_, `name`, "Next state 2 does not exist") true
}
system.eventStream.unsubscribe(testActor)
}
@ -212,18 +213,19 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
case StopEvent(r, _, _) testActor ! r
}
})
val name = fsm.toString
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
fsm ! "go"
expectMsgPF(1 second, hint = "processing Event(go,null)") {
case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(go,null) from Actor[") true
case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(go,null) from Actor[") true
}
expectMsg(1 second, Logging.Debug(fsm, "setting timer 't'/1500 milliseconds: Shutdown"))
expectMsg(1 second, Logging.Debug(fsm, "transition 1 -> 2"))
expectMsg(1 second, Logging.Debug(name, "setting timer 't'/1500 milliseconds: Shutdown"))
expectMsg(1 second, Logging.Debug(name, "transition 1 -> 2"))
fsm ! "stop"
expectMsgPF(1 second, hint = "processing Event(stop,null)") {
case Logging.Debug(`fsm`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") true
case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") true
}
expectMsgAllOf(1 second, Logging.Debug(fsm, "canceling timer 't'"), Normal)
expectMsgAllOf(1 second, Logging.Debug(name, "canceling timer 't'"), Normal)
expectNoMsg(1 second)
system.eventStream.unsubscribe(testActor)
}

View file

@ -106,8 +106,8 @@ class FSMTimingSpec extends AkkaSpec with ImplicitSender {
}
"notify unhandled messages" taggedAs TimingTest in {
filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm, occurrences = 1),
EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm, occurrences = 1)) {
filterEvents(EventFilter.warning("unhandled event Tick in state TestUnhandled", source = fsm.toString, occurrences = 1),
EventFilter.warning("unhandled event Unhandled(test) in state TestUnhandled", source = fsm.toString, occurrences = 1)) {
fsm ! TestUnhandled
within(1 second) {
fsm ! Tick

View file

@ -42,7 +42,7 @@ object IOActorSpec {
class SimpleEchoClient(host: String, port: Int, ioManager: ActorRef) extends Actor with IO {
lazy val socket: SocketHandle = connect(ioManager, host, port, reader)
lazy val socket: SocketHandle = connect(ioManager, host, port)(reader)
lazy val reader: ActorRef = context.actorOf {
new Actor with IO {
def receiveIO = {

View file

@ -58,9 +58,9 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val r: Actor.Receive = {
case null
}
val log = Actor.LoggingReceive(this, r)
val log = Actor.LoggingReceive("funky", r)
log.isDefinedAt("hallo")
expectMsg(1 second, Logging.Debug(this, "received unhandled message hallo"))
expectMsg(1 second, Logging.Debug("funky", "received unhandled message hallo"))
}
}
@ -75,9 +75,10 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
sender ! "x"
}
})
val name = actor.toString
actor ! "buh"
within(1 second) {
expectMsg(Logging.Debug(actor.underlyingActor, "received handled message buh"))
expectMsg(Logging.Debug(name, "received handled message buh"))
expectMsg("x")
}
val r: Actor.Receive = {
@ -88,7 +89,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
within(500 millis) {
actor ! "bah"
expectMsgPF() {
case Logging.Error(_: UnhandledMessageException, `actor`, _) true
case Logging.Error(_: UnhandledMessageException, `name`, _) true
}
}
}
@ -105,7 +106,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
})
actor ! "buh"
within(1 second) {
expectMsg(Logging.Debug(actor.underlyingActor, "received handled message buh"))
expectMsg(Logging.Debug(actor.toString, "received handled message buh"))
expectMsg("x")
}
}
@ -123,9 +124,10 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
case _
}
})
val name = actor.toString
actor ! PoisonPill
expectMsgPF() {
case Logging.Debug(`actor`, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" true
case Logging.Debug(`name`, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" true
}
awaitCond(actor.isShutdown, 100 millis)
}
@ -143,20 +145,23 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
system.eventStream.subscribe(testActor, classOf[Logging.Error])
within(3 seconds) {
val lifecycleGuardian = appLifecycle.asInstanceOf[ActorSystemImpl].guardian
val lname = lifecycleGuardian.toString
val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000)))
val sname = supervisor.toString
val supervisorSet = receiveWhile(messages = 2) {
case Logging.Debug(`lifecycleGuardian`, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`supervisor`, msg: String) if msg startsWith "started" 2
case Logging.Debug(`lname`, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`sname`, msg: String) if msg startsWith "started" 2
}.toSet
expectNoMsg(Duration.Zero)
assert(supervisorSet == Set(1, 2), supervisorSet + " was not Set(1, 2)")
val actor = TestActorRef[TestLogActor](Props[TestLogActor], supervisor, "none")
val aname = actor.toString
val set = receiveWhile(messages = 2) {
case Logging.Debug(`supervisor`, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`actor`, msg: String) if msg startsWith "started" 2
case Logging.Debug(`sname`, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`aname`, msg: String) if msg startsWith "started" 2
}.toSet
expectNoMsg(Duration.Zero)
assert(set == Set(1, 2), set + " was not Set(1, 2)")
@ -176,18 +181,18 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
filterException[ActorKilledException] {
actor ! Kill
val set = receiveWhile(messages = 3) {
case Logging.Error(_: ActorKilledException, `actor`, "Kill") 1
case Logging.Debug(`actor`, "restarting") 2
case Logging.Debug(`actor`, "restarted") 3
case Logging.Error(_: ActorKilledException, `aname`, "Kill") 1
case Logging.Debug(`aname`, "restarting") 2
case Logging.Debug(`aname`, "restarted") 3
}.toSet
expectNoMsg(Duration.Zero)
assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)")
}
supervisor.stop()
expectMsg(Logging.Debug(supervisor, "stopping"))
expectMsg(Logging.Debug(actor, "stopped"))
expectMsg(Logging.Debug(supervisor, "stopped"))
expectMsg(Logging.Debug(sname, "stopping"))
expectMsg(Logging.Debug(aname, "stopped"))
expectMsg(Logging.Debug(sname, "stopped"))
}
}
}

View file

@ -147,7 +147,7 @@ object ActorModelSpec {
await(deadline)(stops == dispatcher.stops.get)
} catch {
case e
system.eventStream.publish(Error(e, dispatcher, "actual: stops=" + dispatcher.stops.get +
system.eventStream.publish(Error(e, dispatcher.toString, "actual: stops=" + dispatcher.stops.get +
" required: stops=" + stops))
throw e
}
@ -204,7 +204,7 @@ object ActorModelSpec {
await(deadline)(stats.restarts.get() == restarts)
} catch {
case e
system.eventStream.publish(Error(e, dispatcher, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions +
system.eventStream.publish(Error(e, dispatcher.toString, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions +
",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters +
",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts))
throw e
@ -232,9 +232,6 @@ abstract class ActorModelSpec extends AkkaSpec {
protected def newInterceptedDispatcher: MessageDispatcherInterceptor
protected def dispatcherType: String
// BalancingDispatcher of course does not work when another actor is in the pool, so overridden below
protected def wavesSupervisorDispatcher(dispatcher: MessageDispatcher) = dispatcher
"A " + dispatcherType must {
"must dynamically handle its own life cycle" in {
@ -310,7 +307,7 @@ abstract class ActorModelSpec extends AkkaSpec {
try {
f
} catch {
case e system.eventStream.publish(Error(e, this, "error in spawned thread"))
case e system.eventStream.publish(Error(e, "spawn", "error in spawned thread"))
}
}
}
@ -347,9 +344,25 @@ abstract class ActorModelSpec extends AkkaSpec {
val boss = actorOf(Props(context {
case "run" for (_ 1 to num) (context.self startsWatching context.actorOf(props)) ! cachedMessage
case Terminated(child) stopLatch.countDown()
}).withDispatcher(wavesSupervisorDispatcher(dispatcher)))
}).withDispatcher(system.dispatcherFactory.newPinnedDispatcher("boss")))
boss ! "run"
assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num)
try {
assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num)
} catch {
case e
val buddies = dispatcher.asInstanceOf[BalancingDispatcher].buddies
val mq = dispatcher.asInstanceOf[BalancingDispatcher].messageQueue
System.err.println("Buddies left: ")
buddies.toArray foreach {
case cell: ActorCell
System.err.println(" - " + cell.self.path + " " + cell.isShutdown + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain()))
}
System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages + " ")
throw e
}
assertCountDown(stopLatch, waitTime, "Expected all children to stop")
boss.stop()
}
@ -451,8 +464,6 @@ class BalancingDispatcherModelSpec extends ActorModelSpec {
def dispatcherType = "Balancing Dispatcher"
override def wavesSupervisorDispatcher(dispatcher: MessageDispatcher) = system.dispatcher
"A " + dispatcherType must {
"process messages in parallel" in {
implicit val dispatcher = newInterceptedDispatcher

View file

@ -112,7 +112,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {
private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) {
import Logging._
val allmsg = Seq(Debug(this, "debug"), Info(this, "info"), Warning(this, "warning"), Error(this, "error"))
val allmsg = Seq(Debug("", "debug"), Info("", "info"), Warning("", "warning"), Error("", "error"))
val msg = allmsg filter (_.level <= level)
allmsg foreach bus.publish
msg foreach (x expectMsg(x))

View file

@ -13,7 +13,7 @@ class Report(
compareResultWith: Option[String] = None) {
private def doLog = System.getProperty("benchmark.logResult", "true").toBoolean
val log = Logging(system, this)
val log = Logging(system, "Report")
val dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")
val legendTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm")

View file

@ -13,6 +13,7 @@ import akka.cluster.ClusterNode
import akka.japi.{ Creator, Procedure }
import akka.serialization.{ Serializer, Serialization }
import akka.event.Logging.Debug
import akka.event.LogSource
import akka.experimental
import akka.AkkaException
@ -166,7 +167,7 @@ object Actor {
class LoggingReceive(source: AnyRef, r: Receive)(implicit system: ActorSystem) extends Receive {
def isDefinedAt(o: Any) = {
val handled = r.isDefinedAt(o)
system.eventStream.publish(Debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o))
system.eventStream.publish(Debug(LogSource.fromAnyRef(source), "received " + (if (handled) "handled" else "unhandled") + " message " + o))
handled
}
def apply(o: Any): Unit = r(o)

View file

@ -17,7 +17,7 @@ import akka.event.Logging.{ Debug, Warning, Error }
*/
trait ActorContext extends ActorRefFactory with TypedActorFactory {
def self: ActorRef with ScalaActorRef
def self: ActorRef
def hasMessages: Boolean
@ -167,6 +167,7 @@ private[akka] class ActorCell(
}
}
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def systemInvoke(message: SystemMessage) {
def create(): Unit = try {
@ -174,11 +175,11 @@ private[akka] class ActorCell(
actor = created
created.preStart()
checkReceiveTimeout
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "started (" + actor + ")"))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "started (" + actor + ")"))
} catch {
case e
try {
system.eventStream.publish(Error(e, self, "error while creating actor"))
system.eventStream.publish(Error(e, self.toString, "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
@ -188,7 +189,7 @@ private[akka] class ActorCell(
def recreate(cause: Throwable): Unit = try {
val failedActor = actor
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "restarting"))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "restarting"))
val freshActor = newActor()
if (failedActor ne null) {
val c = currentMessage //One read only plz
@ -202,14 +203,14 @@ private[akka] class ActorCell(
}
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "restarted"))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "restarted"))
dispatcher.resume(this) //FIXME should this be moved down?
props.faultHandler.handleSupervisorRestarted(cause, self, children)
} catch {
case e try {
system.eventStream.publish(Error(e, self, "error while creating actor"))
system.eventStream.publish(Error(e, self.toString, "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
@ -228,7 +229,7 @@ private[akka] class ActorCell(
val c = children
if (c.isEmpty) doTerminate()
else {
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopping"))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopping"))
for (child c) child.stop()
stopping = true
}
@ -239,84 +240,79 @@ private[akka] class ActorCell(
if (!stats.contains(child)) {
childrenRefs = childrenRefs.updated(child.name, child)
childrenStats = childrenStats.updated(child, ChildRestartStats())
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "now supervising " + child))
} else system.eventStream.publish(Warning(self, "Already supervising " + child))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "now supervising " + child))
} else system.eventStream.publish(Warning(self.toString, "Already supervising " + child))
}
try {
val isClosed = mailbox.isClosed //Fence plus volatile read
if (!isClosed) {
if (stopping) message match {
case Terminate() terminate() // to allow retry
case _
}
else message match {
case Create() create()
case Recreate(cause) recreate(cause)
case Link(subject)
system.deathWatch.subscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "now monitoring " + subject))
case Unlink(subject)
system.deathWatch.unsubscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopped monitoring " + subject))
case Suspend() suspend()
case Resume() resume()
case Terminate() terminate()
case Supervise(child) supervise(child)
}
if (stopping) message match {
case Terminate() terminate() // to allow retry
case _
}
else message match {
case Create() create()
case Recreate(cause) recreate(cause)
case Link(subject)
system.deathWatch.subscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "now monitoring " + subject))
case Unlink(subject)
system.deathWatch.unsubscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopped monitoring " + subject))
case Suspend() suspend()
case Resume() resume()
case Terminate() terminate()
case Supervise(child) supervise(child)
}
} catch {
case e //Should we really catch everything here?
system.eventStream.publish(Error(e, self, "error while processing " + message))
system.eventStream.publish(Error(e, self.toString, "error while processing " + message))
//TODO FIXME How should problems here be handled?
throw e
}
}
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def invoke(messageHandle: Envelope) {
try {
val isClosed = mailbox.isClosed //Fence plus volatile read
if (!isClosed) {
currentMessage = messageHandle
currentMessage = messageHandle
try {
try {
try {
cancelReceiveTimeout() // FIXME: leave this here?
messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
case msg
if (stopping) {
// receiving Terminated in response to stopping children is too common to generate noise
if (!msg.isInstanceOf[Terminated]) system.deadLetterMailbox.enqueue(self, messageHandle)
} else {
actor(msg)
}
}
currentMessage = null // reset current message after successful invocation
} catch {
case e
system.eventStream.publish(Error(e, self, e.getMessage))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
// make sure that InterruptedException does not leave this thread
if (e.isInstanceOf[InterruptedException]) {
val ex = ActorInterruptedException(e)
props.faultHandler.handleSupervisorFailing(self, children)
parent.tell(Failed(ex), self)
throw e //Re-throw InterruptedExceptions as expected
cancelReceiveTimeout() // FIXME: leave this here?
messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
case msg
if (stopping) {
// receiving Terminated in response to stopping children is too common to generate noise
if (!msg.isInstanceOf[Terminated]) system.deadLetterMailbox.enqueue(self, messageHandle)
} else {
props.faultHandler.handleSupervisorFailing(self, children)
parent.tell(Failed(e), self)
actor(msg)
}
} finally {
checkReceiveTimeout // Reschedule receive timeout
}
currentMessage = null // reset current message after successful invocation
} catch {
case e
system.eventStream.publish(Error(e, self, e.getMessage))
throw e
system.eventStream.publish(Error(e, self.toString, e.getMessage))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
// make sure that InterruptedException does not leave this thread
if (e.isInstanceOf[InterruptedException]) {
val ex = ActorInterruptedException(e)
props.faultHandler.handleSupervisorFailing(self, children)
parent.tell(Failed(ex), self)
throw e //Re-throw InterruptedExceptions as expected
} else {
props.faultHandler.handleSupervisorFailing(self, children)
parent.tell(Failed(e), self)
}
} finally {
checkReceiveTimeout // Reschedule receive timeout
}
} catch {
case e
system.eventStream.publish(Error(e, self.toString, e.getMessage))
throw e
}
}
}
@ -332,7 +328,7 @@ private[akka] class ActorCell(
}
def autoReceiveMessage(msg: Envelope) {
if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self, "received AutoReceiveMessage " + msg))
if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self.toString, "received AutoReceiveMessage " + msg))
if (stopping) msg.message match {
case ChildTerminated handleChildTerminated(sender)
@ -350,7 +346,7 @@ private[akka] class ActorCell(
private def doTerminate() {
if (!system.provider.evict(self.path.toString))
system.eventStream.publish(Warning(self, "evict of " + self.path.toString + " failed"))
system.eventStream.publish(Warning(self.toString, "evict of " + self.path.toString + " failed"))
dispatcher.detach(this)
@ -361,7 +357,7 @@ private[akka] class ActorCell(
try {
parent.tell(ChildTerminated, self)
system.deathWatch.publish(Terminated(self))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self, "stopped"))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.toString, "stopped"))
} finally {
currentMessage = null
clearActorFields()
@ -371,7 +367,7 @@ private[akka] class ActorCell(
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenStats.get(child) match {
case Some(stats) if (!props.faultHandler.handleFailure(child, cause, stats, childrenStats)) throw cause
case None system.eventStream.publish(Warning(self, "dropping Failed(" + cause + ") from unknown child"))
case None system.eventStream.publish(Warning(self.toString, "dropping Failed(" + cause + ") from unknown child"))
}
final def handleChildTerminated(child: ActorRef): Unit = {

View file

@ -13,9 +13,9 @@ import akka.config.ConfigurationException
import akka.dispatch.{ SystemMessage, Supervise, Promise, MessageDispatcher, Future, DefaultPromise, Dispatcher, Mailbox, Envelope }
import akka.event.{ Logging, DeathWatch, ActorClassification, EventStream }
import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter }
import akka.util.Helpers
import akka.AkkaException
import com.eaio.uuid.UUID
import akka.util.{ Switch, Helpers }
/**
* Interface for all ActorRef providers to implement.
@ -75,7 +75,7 @@ trait ActorRefProvider {
* This Future is completed upon termination of this ActorRefProvider, which
* is usually initiated by stopping the guardian via ActorSystem.stop().
*/
private[akka] def terminationFuture: Future[ActorSystem.ExitStatus]
private[akka] def terminationFuture: Future[Unit]
}
/**
@ -144,7 +144,7 @@ class LocalActorRefProvider(
val dispatcher: MessageDispatcher,
val scheduler: Scheduler) extends ActorRefProvider {
val log = Logging(eventStream, this)
val log = Logging(eventStream, "LocalActorRefProvider")
// FIXME remove/replave (clustering shall not leak into akka-actor)
val nodename: String = System.getProperty("akka.cluster.nodename") match {
@ -154,7 +154,7 @@ class LocalActorRefProvider(
private[akka] val deployer: Deployer = new Deployer(settings, eventStream, nodename)
val terminationFuture = new DefaultPromise[ActorSystem.ExitStatus](Timeout.never)(dispatcher)
val terminationFuture = new DefaultPromise[Unit](Timeout.never)(dispatcher)
/*
* generate name for temporary actor refs
@ -173,8 +173,10 @@ class LocalActorRefProvider(
* receive only Supervise/ChildTerminated system messages or Failure message.
*/
private[akka] val theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = new MinimalActorRef {
val stopped = new Switch(false)
@volatile
var stopped = false
var causeOfTermination: Option[Throwable] = None
override val name = "bubble-walker"
@ -185,17 +187,17 @@ class LocalActorRefProvider(
override def toString = name
override def stop() = stopped = true
override def stop() = stopped switchOn { terminationFuture.complete(causeOfTermination.toLeft(())) }
override def isShutdown = stopped
override def isShutdown = stopped.isOn
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
case Failed(ex) sender.stop()
case ChildTerminated terminationFuture.completeWithResult(ActorSystem.Stopped)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match {
case Failed(ex) causeOfTermination = Some(ex); sender.stop()
case ChildTerminated stop()
case _ log.error(this + " received unexpected message " + message)
}
})
protected[akka] override def sendSystemMessage(message: SystemMessage) {
protected[akka] override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff {
message match {
case Supervise(child) // TODO register child in some map to keep track of it and enable shutdown after all dead
case _ log.error(this + " received unexpected system message " + message)

View file

@ -47,10 +47,6 @@ object ActorSystem {
def create(): ActorSystem = apply()
def apply(): ActorSystem = apply("default")
sealed trait ExitStatus
case object Stopped extends ExitStatus
case class Failed(cause: Throwable) extends ExitStatus
class Settings(cfg: Config) {
val config: ConfigRoot = ConfigFactory.emptyRoot("akka").withFallback(cfg).withFallback(DefaultConfigurationLoader.referenceConfig).resolve()
@ -209,7 +205,7 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem {
// this provides basic logging (to stdout) until .start() is called below
val eventStream = new EventStream(DebugEventStream)
eventStream.startStdoutLogger(settings)
val log = new BusLogging(eventStream, this) // this used only for .getClass in tagging messages
val log = new BusLogging(eventStream, "ActorSystem") // this used only for .getClass in tagging messages
/**
* The root actor path for this application.
@ -256,8 +252,8 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem {
case Right(p) p
}
}
def terminationFuture: Future[ExitStatus] = provider.terminationFuture
//FIXME Set this to a Failure when things bubble to the top
def terminationFuture: Future[Unit] = provider.terminationFuture
def guardian: ActorRef = provider.guardian
def systemGuardian: ActorRef = provider.systemGuardian
def deathWatch: DeathWatch = provider.deathWatch

View file

@ -37,7 +37,7 @@ trait ActorDeployer {
class Deployer(val settings: ActorSystem.Settings, val eventStream: EventStream, val nodename: String) extends ActorDeployer {
val deploymentConfig = new DeploymentConfig(nodename)
val log = Logging(eventStream, this)
val log = Logging(eventStream, "Deployer")
val instance: ActorDeployer = {
val deployer = new LocalDeployer()

View file

@ -71,13 +71,11 @@ object IO {
case class ServerHandle(owner: ActorRef, ioManager: ActorRef, uuid: UUID = new UUID()) extends Handle {
override def asServer = this
def accept(socketOwner: ActorRef): SocketHandle = {
def accept()(implicit socketOwner: ActorRef): SocketHandle = {
val socket = SocketHandle(socketOwner, ioManager)
ioManager ! Accept(socket, this)
socket
}
def accept()(implicit socketOwner: ScalaActorRef): SocketHandle = accept(socketOwner)
}
sealed trait IOMessage
@ -91,35 +89,23 @@ object IO {
case class Read(handle: ReadHandle, bytes: ByteString) extends IOMessage
case class Write(handle: WriteHandle, bytes: ByteString) extends IOMessage
def listen(ioManager: ActorRef, address: InetSocketAddress, owner: ActorRef): ServerHandle = {
def listen(ioManager: ActorRef, address: InetSocketAddress)(implicit owner: ActorRef): ServerHandle = {
val server = ServerHandle(owner, ioManager)
ioManager ! Listen(server, address)
server
}
def listen(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: ScalaActorRef): ServerHandle =
listen(ioManager, address, sender)
def listen(ioManager: ActorRef, host: String, port: Int)(implicit owner: ActorRef): ServerHandle =
listen(ioManager, new InetSocketAddress(host, port))
def listen(ioManager: ActorRef, host: String, port: Int, owner: ActorRef): ServerHandle =
listen(ioManager, new InetSocketAddress(host, port), owner)
def listen(ioManager: ActorRef, host: String, port: Int)(implicit sender: ScalaActorRef): ServerHandle =
listen(ioManager, new InetSocketAddress(host, port), sender)
def connect(ioManager: ActorRef, address: InetSocketAddress, owner: ActorRef): SocketHandle = {
def connect(ioManager: ActorRef, address: InetSocketAddress)(implicit owner: ActorRef): SocketHandle = {
val socket = SocketHandle(owner, ioManager)
ioManager ! Connect(socket, address)
socket
}
def connect(ioManager: ActorRef, address: InetSocketAddress)(implicit sender: ScalaActorRef): SocketHandle =
connect(ioManager, address, sender)
def connect(ioManager: ActorRef, host: String, port: Int, owner: ActorRef): SocketHandle =
connect(ioManager, new InetSocketAddress(host, port), owner)
def connect(ioManager: ActorRef, host: String, port: Int)(implicit sender: ScalaActorRef): SocketHandle =
connect(ioManager, new InetSocketAddress(host, port), sender)
def connect(ioManager: ActorRef, host: String, port: Int)(implicit sender: ActorRef): SocketHandle =
connect(ioManager, new InetSocketAddress(host, port))
private class HandleState(var readBytes: ByteString, var connected: Boolean) {
def this() = this(ByteString.empty, false)

View file

@ -69,7 +69,7 @@ final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit,
try {
function()
} catch {
case e eventStream.publish(Error(e, this, e.getMessage))
case e eventStream.publish(Error(e, "TaskInvocation", e.getMessage))
} finally {
cleanup()
}
@ -236,10 +236,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
*/
def resume(actor: ActorCell): Unit = {
val mbox = actor.mailbox
if (mbox.dispatcher eq this) {
mbox.becomeOpen()
if ((mbox.dispatcher eq this) && mbox.becomeOpen())
registerForExecution(mbox, false, false)
}
}
/**

View file

@ -12,6 +12,7 @@ import annotation.tailrec
import akka.actor.ActorSystem
import akka.event.EventStream
import akka.actor.Scheduler
import java.util.concurrent.atomic.AtomicBoolean
/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -39,9 +40,10 @@ class BalancingDispatcher(
_timeoutMs: Long)
extends Dispatcher(_prerequisites, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) {
private val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator)
val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator)
val rebalance = new AtomicBoolean(false)
protected val messageQueue: MessageQueue = mailboxType match {
val messageQueue: MessageQueue = mailboxType match {
case u: UnboundedMailbox new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final val queue = new ConcurrentLinkedQueue[Envelope]
}
@ -66,13 +68,13 @@ class BalancingDispatcher(
protected[akka] override def register(actor: ActorCell) = {
super.register(actor)
registerForExecution(actor.mailbox, false, false) //Allow newcomers to be productive from the first moment
buddies.add(actor)
}
protected[akka] override def unregister(actor: ActorCell) = {
super.unregister(actor)
buddies.remove(actor)
intoTheFray(except = actor)
super.unregister(actor)
intoTheFray(except = actor) //When someone leaves, he tosses a friend into the fray
}
protected override def cleanUpMailboxFor(actor: ActorCell, mailBox: Mailbox) {
@ -88,29 +90,27 @@ class BalancingDispatcher(
}
}
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessagesHint: Boolean, hasSystemMessagesHint: Boolean): Boolean = {
if (!super.registerForExecution(mbox, hasMessagesHint, hasSystemMessagesHint)) {
mbox match {
case share: SharingMailbox if !share.isClosed buddies.add(share.actor); false
case _ false
}
} else true
}
def intoTheFray(except: ActorCell): Unit =
if (rebalance.compareAndSet(false, true)) {
try {
val i = buddies.iterator()
def intoTheFray(except: ActorCell): Unit = {
var buddy = buddies.pollFirst()
while (buddy ne null) {
val mbox = buddy.mailbox
buddy = if ((buddy eq except) || (!registerForExecution(mbox, false, false) && mbox.isClosed)) buddies.pollFirst() else null
@tailrec
def throwIn(): Unit = {
val n = if (i.hasNext) i.next() else null
if (n eq null) ()
else if ((n ne except) && registerForExecution(n.mailbox, false, false)) ()
else throwIn()
}
throwIn()
} finally {
rebalance.set(false)
}
}
}
override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
messageQueue.enqueue(receiver.self, invocation)
registerForExecution(receiver.mailbox, false, false)
intoTheFray(except = receiver)
if (!registerForExecution(receiver.mailbox, false, false))
intoTheFray(except = receiver)
}
}

View file

@ -99,7 +99,7 @@ class Dispatcher(
executorService.get() execute invocation
} catch {
case e2: RejectedExecutionException
prerequisites.eventStream.publish(Warning(this, e2.toString))
prerequisites.eventStream.publish(Warning("Dispatcher", e2.toString))
throw e2
}
}
@ -107,20 +107,16 @@ class Dispatcher(
protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(actor)
protected[akka] def shutdown {
executorService.getAndSet(new ExecutorServiceDelegate {
protected[akka] def shutdown: Unit =
Option(executorService.getAndSet(new ExecutorServiceDelegate {
lazy val executor = executorServiceFactory.createExecutorService
}) match {
case null
case some some.shutdown()
}
}
})) foreach { _.shutdown() }
/**
* Returns if it was registered
*/
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
if (mbox.shouldBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
if (mbox.setAsScheduled()) {
try {
executorService.get() execute mbox

View file

@ -262,7 +262,7 @@ object Future {
result completeWithResult currentValue
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage))
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.fold", e.getMessage))
result completeWithException e
} finally {
results.clear
@ -631,7 +631,7 @@ sealed trait Future[+T] extends japi.Future[T] {
Right(f(res))
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage))
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.map", e.getMessage))
Left(e)
})
}
@ -683,7 +683,7 @@ sealed trait Future[+T] extends japi.Future[T] {
future.completeWith(f(r))
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage))
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.flatMap", e.getMessage))
future complete Left(e)
}
}
@ -716,7 +716,7 @@ sealed trait Future[+T] extends japi.Future[T] {
if (p(res)) r else Left(new MatchError(res))
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage))
dispatcher.prerequisites.eventStream.publish(Error(e, "Future.filter", e.getMessage))
Left(e)
})
}
@ -788,7 +788,7 @@ trait Promise[T] extends Future[T] {
fr completeWith cont(f)
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage))
dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
fr completeWithException e
}
}
@ -802,7 +802,7 @@ trait Promise[T] extends Future[T] {
fr completeWith cont(f)
} catch {
case e: Exception
dispatcher.prerequisites.eventStream.publish(Error(e, this, e.getMessage))
dispatcher.prerequisites.eventStream.publish(Error(e, "Promise.completeWith", e.getMessage))
fr completeWithException e
}
}
@ -994,7 +994,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
} else this
private def notifyCompleted(func: Future[T] Unit) {
try { func(this) } catch { case e dispatcher.prerequisites.eventStream.publish(Error(e, this, "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really?
try { func(this) } catch { case e dispatcher.prerequisites.eventStream.publish(Error(e, "Future", "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really?
}
@inline

View file

@ -128,15 +128,20 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
protected final def systemQueueGet: SystemMessage = AbstractMailbox.systemQueueUpdater.get(this)
protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = AbstractMailbox.systemQueueUpdater.compareAndSet(this, _old, _new)
def shouldBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
case Open | Scheduled hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
case Closed false
case _ hasSystemMessageHint || hasSystemMessages
}
final def run = {
try processMailbox() finally {
setAsIdle()
try {
if (!isClosed) { //Volatile read, needed here
processAllSystemMessages() //First, deal with any system messages
processMailbox() //Then deal with messages
}
} finally {
setAsIdle() //Volatile write, needed here
dispatcher.registerForExecution(this, false, false)
}
}
@ -146,9 +151,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
*
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox() {
processAllSystemMessages() //First, process all system messages
private final def processMailbox() {
if (shouldProcessMessage) {
var nextMessage = dequeue()
if (nextMessage ne null) { //If we have a message
@ -175,7 +178,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
}
}
def processAllSystemMessages() {
final def processAllSystemMessages() {
var nextMessage = systemDrain()
try {
while (nextMessage ne null) {
@ -187,7 +190,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
}
} catch {
case e
actor.system.eventStream.publish(Error(e, actor.self, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
actor.system.eventStream.publish(Error(e, actor.self.toString, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
throw e
}
}

View file

@ -3,9 +3,16 @@
*/
package akka.event
import akka.actor.{ ActorRef, Actor, Props, ActorSystemImpl, Terminated }
import akka.actor.{ ActorRef, Actor, Props, ActorSystemImpl, Terminated, ActorSystem, simpleName }
import akka.util.Subclassification
object EventStream {
implicit def fromActorSystem(system: ActorSystem) = system.eventStream
}
class A(x: Int = 0) extends Exception("x=" + x)
class B extends A
class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClassification {
type Event = AnyRef
@ -24,18 +31,18 @@ class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClas
protected def publish(event: AnyRef, subscriber: ActorRef) = subscriber ! event
override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
if (debug) publish(Logging.Debug(this, "subscribing " + subscriber + " to channel " + channel))
if (debug) publish(Logging.Debug(simpleName(this), "subscribing " + subscriber + " to channel " + channel))
if (reaper ne null) reaper ! subscriber
super.subscribe(subscriber, channel)
}
override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = {
if (debug) publish(Logging.Debug(this, "unsubscribing " + subscriber + " from channel " + channel))
if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from channel " + channel))
super.unsubscribe(subscriber, channel)
}
override def unsubscribe(subscriber: ActorRef) {
if (debug) publish(Logging.Debug(this, "unsubscribing " + subscriber + " from all channels"))
if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from all channels"))
super.unsubscribe(subscriber)
}

View file

@ -15,6 +15,10 @@ import akka.dispatch.FutureTimeoutException
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.ActorRefProvider
object LoggingBus {
implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream
}
/**
* This trait brings log level handling to the EventStream: it reads the log
* levels for the initial logging (StandardOutLogger) and the loggers&level
@ -68,7 +72,7 @@ trait LoggingBus extends ActorEventBus {
private[akka] def startStdoutLogger(config: Settings) {
val level = levelFor(config.StdoutLogLevel) getOrElse {
StandardOutLogger.print(Error(new EventHandlerException, this, "unknown akka.stdout-loglevel " + config.StdoutLogLevel))
StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + config.StdoutLogLevel))
ErrorLevel
}
AllLogLevels filter (level >= _) foreach (l subscribe(StandardOutLogger, classFor(l)))
@ -76,12 +80,12 @@ trait LoggingBus extends ActorEventBus {
loggers = Seq(StandardOutLogger)
_logLevel = level
}
publish(Info(this, "StandardOutLogger started"))
publish(Info(simpleName(this), "StandardOutLogger started"))
}
private[akka] def startDefaultLoggers(system: ActorSystemImpl) {
val level = levelFor(system.settings.LogLevel) getOrElse {
StandardOutLogger.print(Error(new EventHandlerException, this, "unknown akka.stdout-loglevel " + system.settings.LogLevel))
StandardOutLogger.print(Error(new EventHandlerException, simpleName(this), "unknown akka.stdout-loglevel " + system.settings.LogLevel))
ErrorLevel
}
try {
@ -109,7 +113,7 @@ trait LoggingBus extends ActorEventBus {
loggers = myloggers
_logLevel = level
}
publish(Info(this, "Default Loggers started"))
publish(Info(simpleName(this), "Default Loggers started"))
if (!(defaultLoggers contains StandardOutLoggerName)) {
unsubscribe(StandardOutLogger)
}
@ -125,7 +129,7 @@ trait LoggingBus extends ActorEventBus {
val level = _logLevel // volatile access before reading loggers
if (!(loggers contains StandardOutLogger)) {
AllLogLevels filter (level >= _) foreach (l subscribe(StandardOutLogger, classFor(l)))
publish(Info(this, "shutting down: StandardOutLogger started"))
publish(Info(simpleName(this), "shutting down: StandardOutLogger started"))
}
for {
logger loggers
@ -135,7 +139,7 @@ trait LoggingBus extends ActorEventBus {
unsubscribe(logger)
logger.stop()
}
publish(Info(this, "all default loggers stopped"))
publish(Info(simpleName(this), "all default loggers stopped"))
}
private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel): ActorRef = {
@ -144,17 +148,52 @@ trait LoggingBus extends ActorEventBus {
implicit val timeout = Timeout(3 seconds)
val response = try actor ? InitializeLogger(this) get catch {
case _: FutureTimeoutException
publish(Warning(this, "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)"))
publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)"))
}
if (response != LoggerInitialized)
throw new LoggerInitializationException("Logger " + name + " did not respond with LoggerInitialized, sent instead " + response)
AllLogLevels filter (level >= _) foreach (l subscribe(actor, classFor(l)))
publish(Info(this, "logger " + name + " started"))
publish(Info(simpleName(this), "logger " + name + " started"))
actor
}
}
trait LogSource[-T] {
def genString(t: T): String
}
object LogSource {
implicit val fromString: LogSource[String] = new LogSource[String] {
def genString(s: String) = s
}
implicit val fromActor: LogSource[Actor] = new LogSource[Actor] {
def genString(a: Actor) = a.self.toString
}
implicit val fromActorRef: LogSource[ActorRef] = new LogSource[ActorRef] {
def genString(a: ActorRef) = a.toString
}
// this one unfortunately does not work as implicit, because existential types have some weird behavior
val fromClass: LogSource[Class[_]] = new LogSource[Class[_]] {
def genString(c: Class[_]) = simpleName(c)
}
implicit def fromAnyClass[T]: LogSource[Class[T]] = fromClass.asInstanceOf[LogSource[Class[T]]]
def apply[T: LogSource](o: T) = implicitly[LogSource[T]].genString(o)
def fromAnyRef(o: AnyRef): String =
o match {
case c: Class[_] fromClass.genString(c)
case a: Actor fromActor.genString(a)
case a: ActorRef fromActorRef.genString(a)
case s: String s
case x simpleName(x)
}
}
/**
* Main entry point for Akka logging: log levels and message types (aka
* channels) defined for the main transport medium, the main event bus. The
@ -235,24 +274,26 @@ object Logging {
/**
* Obtain LoggingAdapter for the given application and source object. The
* source object is used to identify the source of this logging channel.
* source is used to identify the source of this logging channel and must have
* a corresponding LogSource[T] instance in scope; by default these are
* provided for Class[_], Actor, ActorRef and String types.
*/
def apply(system: ActorSystem, source: AnyRef): LoggingAdapter = new BusLogging(system.eventStream, source)
def apply[T: LogSource](eventStream: LoggingBus, logSource: T): LoggingAdapter =
new BusLogging(eventStream, implicitly[LogSource[T]].genString(logSource))
/**
* Java API: Obtain LoggingAdapter for the given application and source object. The
* source object is used to identify the source of this logging channel.
* source object is used to identify the source of this logging channel; if it is
* an Actor or ActorRef, its address is used, in case of a class an approximation of
* its simpleName and in all other cases the simpleName of its class.
*/
def getLogger(system: ActorSystem, source: AnyRef): LoggingAdapter = apply(system, source)
/**
* Obtain LoggingAdapter for the given event bus and source object. The
* source object is used to identify the source of this logging channel.
*/
def apply(bus: LoggingBus, source: AnyRef): LoggingAdapter = new BusLogging(bus, source)
def getLogger(system: ActorSystem, logSource: AnyRef): LoggingAdapter = apply(system.eventStream, LogSource.fromAnyRef(logSource))
/**
* Java API: Obtain LoggingAdapter for the given event bus and source object. The
* source object is used to identify the source of this logging channel.
*/
def getLogger(bus: LoggingBus, source: AnyRef): LoggingAdapter = apply(bus, source)
def getLogger(bus: LoggingBus, logSource: AnyRef): LoggingAdapter = apply(bus, LogSource.fromAnyRef(logSource))
/**
* Artificial exception injected into Error events if no Throwable is
@ -266,22 +307,22 @@ object Logging {
def level: LogLevel
}
case class Error(cause: Throwable, instance: AnyRef, message: Any = "") extends LogEvent {
case class Error(cause: Throwable, logSource: String, message: Any = "") extends LogEvent {
def level = ErrorLevel
}
object Error {
def apply(instance: AnyRef, message: Any) = new Error(new EventHandlerException, instance, message)
def apply(logSource: String, message: Any) = new Error(new EventHandlerException, logSource, message)
}
case class Warning(instance: AnyRef, message: Any = "") extends LogEvent {
case class Warning(logSource: String, message: Any = "") extends LogEvent {
def level = WarningLevel
}
case class Info(instance: AnyRef, message: Any = "") extends LogEvent {
case class Info(logSource: String, message: Any = "") extends LogEvent {
def level = InfoLevel
}
case class Debug(instance: AnyRef, message: Any = "") extends LogEvent {
case class Debug(logSource: String, message: Any = "") extends LogEvent {
def level = DebugLevel
}
@ -318,7 +359,7 @@ object Logging {
case e: Warning warning(e)
case e: Info info(e)
case e: Debug debug(e)
case e warning(Warning(this, "received unexpected event of class " + e.getClass + ": " + e))
case e warning(Warning(simpleName(this), "received unexpected event of class " + e.getClass + ": " + e))
}
}
@ -326,7 +367,7 @@ object Logging {
println(errorFormat.format(
timestamp,
event.thread.getName,
instanceName(event.instance),
event.logSource,
event.message,
stackTraceFor(event.cause)))
@ -334,21 +375,21 @@ object Logging {
println(warningFormat.format(
timestamp,
event.thread.getName,
instanceName(event.instance),
event.logSource,
event.message))
def info(event: Info) =
println(infoFormat.format(
timestamp,
event.thread.getName,
instanceName(event.instance),
event.logSource,
event.message))
def debug(event: Debug) =
println(debugFormat.format(
timestamp,
event.thread.getName,
instanceName(event.instance),
event.logSource,
event.message))
def instanceName(instance: AnyRef): String = instance match {
@ -491,7 +532,7 @@ trait LoggingAdapter {
}
}
class BusLogging(val bus: LoggingBus, val loggingInstance: AnyRef) extends LoggingAdapter {
class BusLogging(val bus: LoggingBus, val logSource: String) extends LoggingAdapter {
import Logging._
@ -500,14 +541,14 @@ class BusLogging(val bus: LoggingBus, val loggingInstance: AnyRef) extends Loggi
def isInfoEnabled = bus.logLevel >= InfoLevel
def isDebugEnabled = bus.logLevel >= DebugLevel
protected def notifyError(message: String) { bus.publish(Error(loggingInstance, message)) }
protected def notifyError(message: String) { bus.publish(Error(logSource, message)) }
protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, loggingInstance, message)) }
protected def notifyError(cause: Throwable, message: String) { bus.publish(Error(cause, logSource, message)) }
protected def notifyWarning(message: String) { bus.publish(Warning(loggingInstance, message)) }
protected def notifyWarning(message: String) { bus.publish(Warning(logSource, message)) }
protected def notifyInfo(message: String) { bus.publish(Info(loggingInstance, message)) }
protected def notifyInfo(message: String) { bus.publish(Info(logSource, message)) }
protected def notifyDebug(message: String) { bus.publish(Debug(loggingInstance, message)) }
protected def notifyDebug(message: String) { bus.publish(Debug(logSource, message)) }
}

View file

@ -12,8 +12,6 @@ import scala.annotation.tailrec
*/
object Helpers {
implicit def null2Option[T](t: T): Option[T] = Option(t)
def compareIdentityHash(a: AnyRef, b: AnyRef): Int = {
/*
* make sure that there is no overflow or underflow in comparisons, so
@ -28,19 +26,6 @@ object Helpers {
def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b)
}
def intToBytes(value: Int): Array[Byte] = {
val bytes = new Array[Byte](4)
bytes(0) = (value >>> 24).asInstanceOf[Byte]
bytes(1) = (value >>> 16).asInstanceOf[Byte]
bytes(2) = (value >>> 8).asInstanceOf[Byte]
bytes(3) = value.asInstanceOf[Byte]
bytes
}
def bytesToInt(bytes: Array[Byte], offset: Int): Int = {
(0 until 4).foldLeft(0)((value, index) value + ((bytes(index + offset) & 0x000000FF) << ((4 - 1 - index) * 8)))
}
final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789*?"
@tailrec
@ -72,77 +57,4 @@ object Helpers {
throw e
}
}
/**
* Convenience helper to cast the given Option of Any to an Option of the given type. Will throw a ClassCastException
* if the actual type is not assignable from the given one.
*/
def narrow[T](o: Option[Any]): Option[T] = {
require((o ne null), "Option to be narrowed must not be null!")
o.asInstanceOf[Option[T]]
}
/**
* Convenience helper to cast the given Option of Any to an Option of the given type. Will swallow a possible
* ClassCastException and return None in that case.
*/
def narrowSilently[T: Manifest](o: Option[Any]): Option[T] =
try {
narrow(o)
} catch {
case e: ClassCastException
None
}
/**
* Reference that can hold either a typed value or an exception.
*
* Usage:
* <pre>
* scala> ResultOrError(1)
* res0: ResultOrError[Int] = ResultOrError@a96606
*
* scala> res0()
* res1: Int = 1
*
* scala> res0() = 3
*
* scala> res0()
* res3: Int = 3
*
* scala> res0() = { println("Hello world"); 3}
* Hello world
*
* scala> res0()
* res5: Int = 3
*
* scala> res0() = error("Lets see what happens here...")
*
* scala> res0()
* java.lang.RuntimeException: Lets see what happens here...
* at ResultOrError.apply(Helper.scala:11)
* at .<init>(<console>:6)
* at .<clinit>(<console>)
* at Re...
* </pre>
*/
class ResultOrError[R](result: R) {
private[this] var contents: Either[R, Throwable] = Left(result)
def update(value: R) {
contents = try {
Left(value)
} catch {
case (error: Throwable) Right(error)
}
}
def apply() = contents match {
case Left(result) result
case Right(error) throw error.fillInStackTrace
}
}
object ResultOrError {
def apply[R](result: R) = new ResultOrError(result)
}
}

View file

@ -24,7 +24,7 @@ object JMX {
case e: InstanceAlreadyExistsException
Some(mbeanServer.getObjectInstance(name))
case e: Exception
system.eventStream.publish(Error(e, this, "Error when registering mbean [%s]".format(mbean)))
system.eventStream.publish(Error(e, "JMX", "Error when registering mbean [%s]".format(mbean)))
None
}
@ -32,6 +32,6 @@ object JMX {
mbeanServer.unregisterMBean(mbean)
} catch {
case e: InstanceNotFoundException {}
case e: Exception system.eventStream.publish(Error(e, this, "Error while unregistering mbean [%s]".format(mbean)))
case e: Exception system.eventStream.publish(Error(e, "JMX", "Error while unregistering mbean [%s]".format(mbean)))
}
}

View file

@ -128,7 +128,7 @@ Dependencies
You can look at the Ivy dependency resolution information that is created on
``sbt update`` and found in ``~/.ivy2/cache``. For example, the
``.ivy2/cache/se.scalablesolutions.akka-akka-cluster-compile.xml`` file contains
``~/.ivy2/cache/com.typesafe.akka-akka-remote-compile.xml`` file contains
the resolution information for the akka-cluster module compile dependencies. If
you open this file in a web browser you will get an easy to navigate view of
dependencies.

View file

@ -54,7 +54,7 @@ To use the plugin, first add a plugin definition to your sbt project by creating
resolvers += Classpaths.typesafeResolver
addSbtPlugin("se.scalablesolutions.akka" % "akka-sbt-plugin" % "2.0-SNAPSHOT")
addSbtPlugin("com.typesafe.akka" % "akka-sbt-plugin" % "2.0-SNAPSHOT")
Then use the AkkaKernelPlugin settings. In a 'light' configuration (build.sbt)::
@ -75,7 +75,7 @@ Or in a 'full' configuration (Build.scala). For example::
version := "0.1",
scalaVersion := "2.9.1"
resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/",
libraryDependencies += "se.scalablesolutions.akka" % "akka-kernel" % "2.0-SNAPSHOT"
libraryDependencies += "com.typesafe.akka" % "akka-kernel" % "2.0-SNAPSHOT"
)
)
}

View file

@ -180,7 +180,7 @@ It should now look something like this:
<dependencies>
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor</artifactId>
<version>2.0-SNAPSHOT</version>
</dependency>

View file

@ -192,7 +192,7 @@ in the directory you want to create your project in::
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
libraryDependencies += "se.scalablesolutions.akka" % "akka-actor" % "2.0-SNAPSHOT"
libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0-SNAPSHOT"
Create a directory ``src/main/scala`` in which you will store the Scala source
files.
@ -202,11 +202,11 @@ modules beyond ``akka-actor``, you can add these as ``libraryDependencies`` in
``build.sbt``. Note that there must be a blank line between each. Here is an
example adding ``akka-remote`` and ``akka-stm``::
libraryDependencies += "se.scalablesolutions.akka" % "akka-actor" % "2.0-SNAPSHOT"
libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0-SNAPSHOT"
libraryDependencies += "se.scalablesolutions.akka" % "akka-remote" % "2.0-SNAPSHOT"
libraryDependencies += "com.typesafe.akka" % "akka-remote" % "2.0-SNAPSHOT"
libraryDependencies += "se.scalablesolutions.akka" % "akka-stm" % "2.0-SNAPSHOT"
libraryDependencies += "com.typesafe.akka" % "akka-stm" % "2.0-SNAPSHOT"
So, now we are all set.

View file

@ -117,7 +117,7 @@ Summary of the essential parts for using Akka with Maven:
.. code-block:: xml
<dependency>
<groupId>se.scalablesolutions.akka</groupId>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor</artifactId>
<version>2.0-SNAPSHOT</version>
</dependency>
@ -145,7 +145,7 @@ SBT installation instructions on `https://github.com/harrah/xsbt/wiki/Setup <htt
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
libraryDependencies += "se.scalablesolutions.akka" % "akka-actor" % "2.0-SNAPSHOT"
libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0-SNAPSHOT"
Using Akka with Eclipse

View file

@ -47,4 +47,22 @@ directly.
* http://download.java.net/maven/glassfish
* http://databinder.net/repo
SNAPSHOT Versions
=================
Nightly builds are available in `<http://repo.typesafe.com/typesafe/maven-timestamps/>`_ repository as
timestamped snapshot versions. Pick a timestamp from
`<http://repo.typesafe.com/typesafe/maven-timestamps/com/typesafe/akka/akka-actor/>`_.
All Akka modules that belong to the same build have the same timestamp.
Make sure that you add the repository to the sbt resolvers or maven repositories::
resolvers += "Typesafe Timestamp Repo" at "http://repo.typesafe.com/typesafe/maven-timestamps/"
Define the library dependencies with the timestamp as version::
libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0-20111118-000627"
libraryDependencies += "com.typesafe.akka" % "akka-remote" % "2.0-20111118-000627"

View file

@ -99,7 +99,7 @@ If you want to use jetty-run in SBT you need to exclude the version of Jetty tha
override def ivyXML =
<dependencies>
<dependency org="se.scalablesolutions.akka" name="akka-http" rev="AKKA_VERSION_GOES_HERE">
<dependency org="com.typesafe.akka" name="akka-http" rev="AKKA_VERSION_GOES_HERE">
<exclude module="jetty"/>
</dependency>
</dependencies>

View file

@ -28,7 +28,7 @@ class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner)
val messageSubmitTimeout = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.message-submit-timeout"), defaultTimeUnit).toSeconds.toInt
val messageTimeToLive = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.message-time-to-live"), defaultTimeUnit).toSeconds.toInt
val log = Logging(system, this)
val log = Logging(system, "BeanstalkBasedMailbox")
private val queue = new ThreadLocal[Client] { override def initialValue = connect(name) }

View file

@ -19,7 +19,7 @@ object FileBasedMailbox {
class FileBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
val log = Logging(system, this)
val log = Logging(system, "FileBasedMailbox")
val queuePath = FileBasedMailbox.queuePath(owner.system.settings.config)

View file

@ -39,7 +39,7 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
val writeTimeout = config.getInt(WRITE_TIMEOUT_KEY)
val readTimeout = config.getInt(READ_TIMEOUT_KEY)
val log = Logging(system, this)
val log = Logging(system, "MongoBasedMailbox")
@volatile
private var mongo = connect()

View file

@ -20,7 +20,7 @@ class RedisBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with
@volatile
private var clients = connect() // returns a RedisClientPool for multiple asynchronous message handling
val log = Logging(system, this)
val log = Logging(system, "RedisBasedMailbox")
def enqueue(receiver: ActorRef, envelope: Envelope) {
log.debug("ENQUEUING message in redis-based mailbox [%s]".format(envelope))

View file

@ -30,7 +30,7 @@ class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner)
val queueNode = "/queues"
val queuePathTemplate = queueNode + "/%s"
val log = Logging(system, this)
val log = Logging(system, "ZooKeeperBasedMailbox")
private val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout)
private val queue = new ZooKeeperQueue[Array[Byte]](zkClient, queuePathTemplate.format(name), blockingQueue)

View file

@ -102,7 +102,7 @@ class Gossiper(remote: Remote) {
nodeMembershipChangeListeners: Set[NodeMembershipChangeListener] = Set.empty[NodeMembershipChangeListener])
private val system = remote.system
private val log = Logging(system, this)
private val log = Logging(system, "Gossiper")
private val failureDetector = remote.failureDetector
private val connectionManager = new RemoteConnectionManager(system, remote, Map.empty[RemoteAddress, ActorRef])
private val seeds = Set(address) // FIXME read in list of seeds from config

View file

@ -28,7 +28,7 @@ import java.util.concurrent.atomic.AtomicLong
*/
class Remote(val system: ActorSystemImpl, val nodename: String) {
val log = Logging(system, this)
val log = Logging(system, "Remote")
import system._
import settings._
@ -264,7 +264,7 @@ class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLo
.newInstance(exception.getMessage).asInstanceOf[Throwable]
} catch {
case problem: Exception
remote.system.eventStream.publish(Logging.Error(problem, remote, problem.getMessage))
remote.system.eventStream.publish(Logging.Error(problem, "RemoteMessage", problem.getMessage))
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
}
}

View file

@ -34,7 +34,8 @@ class RemoteActorRefProvider(
val dispatcher: MessageDispatcher,
val scheduler: Scheduler) extends ActorRefProvider {
val log = Logging(eventStream, this)
val log = Logging(eventStream, "RemoteActorRefProvider")
val local = new LocalActorRefProvider(settings, rootPath, eventStream, dispatcher, scheduler)
def deathWatch = local.deathWatch

View file

@ -25,7 +25,7 @@ class RemoteConnectionManager(
initialConnections: Map[RemoteAddress, ActorRef] = Map.empty[RemoteAddress, ActorRef])
extends ConnectionManager {
val log = Logging(system, this)
val log = Logging(system, "RemoteConnectionManager")
// FIXME is this VersionedIterable really needed? It is not used I think. Complicates API. See 'def connections' etc.
case class State(version: Long, connections: Map[RemoteAddress, ActorRef])

View file

@ -39,7 +39,7 @@ abstract class RemoteClient private[akka] (
val remoteSupport: NettyRemoteSupport,
val remoteAddress: RemoteAddress) {
val log = Logging(remoteSupport.system, this)
val log = Logging(remoteSupport.system, "RemoteClient")
val name = simpleName(this) + "@" + remoteAddress
@ -351,7 +351,7 @@ class ActiveRemoteClientHandler(
* Provides the implementation of the Netty remote support
*/
class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps {
val log = Logging(system, this)
val log = Logging(system, "NettyRemoteSupport")
val serverSettings = new RemoteServerSettings(system.settings.config, system.settings.DefaultTimeUnit)
val clientSettings = new RemoteClientSettings(system.settings.config, system.settings.DefaultTimeUnit)
@ -481,7 +481,7 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi
}
class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Option[ClassLoader]) {
val log = Logging(remoteSupport.system, this)
val log = Logging(remoteSupport.system, "NettyRemoteServer")
import remoteSupport.serverSettings._
val address = remoteSupport.system.rootPath.remoteAddress
@ -586,7 +586,7 @@ class RemoteServerHandler(
val applicationLoader: Option[ClassLoader],
val remoteSupport: NettyRemoteSupport) extends SimpleChannelUpstreamHandler {
val log = Logging(remoteSupport.system, this)
val log = Logging(remoteSupport.system, "RemoteServerHandler")
import remoteSupport.serverSettings._

View file

@ -110,6 +110,8 @@ class CallingThreadDispatcher(
val name: String = "calling-thread") extends MessageDispatcher(_prerequisites) {
import CallingThreadDispatcher._
val log = akka.event.Logging(prerequisites.eventStream, "CallingThreadDispatcher")
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor)
private def getMailbox(actor: ActorCell): Option[CallingThreadMailbox] = actor.mailbox match {
@ -215,12 +217,12 @@ class CallingThreadDispatcher(
true
} catch {
case ie: InterruptedException
prerequisites.eventStream.publish(Error(this, ie))
log.error(ie, "Interrupted during message processing")
Thread.currentThread().interrupt()
intex = ie
true
case e
prerequisites.eventStream.publish(Error(this, e))
log.error(e, "Error during message processing")
queue.leave
false
}

View file

@ -95,26 +95,20 @@ abstract class EventFilter(occurrences: Int) {
/*
* these default values are just there for easier subclassing
*/
protected val source: Option[AnyRef] = None
protected val source: Option[String] = None
protected val message: Either[String, Regex] = Left("")
protected val complete: Boolean = false
/**
* internal implementation helper, no guaranteed API
*/
protected def doMatch(src: AnyRef, msg: Any) = {
protected def doMatch(src: String, msg: Any) = {
val msgstr = if (msg != null) msg.toString else "null"
(source.isDefined && sourceMatch(src) || source.isEmpty) &&
(source.isDefined && source.get == src || source.isEmpty) &&
(message match {
case Left(s) if (complete) msgstr == s else msgstr.startsWith(s)
case Right(p) p.findFirstIn(msgstr).isDefined
})
}
private def sourceMatch(src: AnyRef) = {
source.get match {
case c: Class[_] c isInstance src
case s src == s
}
}
}
/**
@ -151,7 +145,7 @@ object EventFilter {
* `null` does NOT work (passing `null` disables the
* source filter).''
*/
def apply[A <: Throwable: Manifest](message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
def apply[A <: Throwable: Manifest](message: String = null, source: String = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
ErrorFilter(manifest[A].erasure, Option(source),
if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start,
message ne null)(occurrences)
@ -170,7 +164,7 @@ object EventFilter {
* `null` does NOT work (passing `null` disables the
* source filter).''
*/
def warning(message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
def warning(message: String = null, source: String = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
WarningFilter(Option(source),
if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start,
message ne null)(occurrences)
@ -189,7 +183,7 @@ object EventFilter {
* `null` does NOT work (passing `null` disables the
* source filter).''
*/
def info(message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
def info(message: String = null, source: String = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
InfoFilter(Option(source),
if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start,
message ne null)(occurrences)
@ -208,7 +202,7 @@ object EventFilter {
* `null` does NOT work (passing `null` disables the
* source filter).''
*/
def debug(message: String = null, source: AnyRef = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
def debug(message: String = null, source: String = null, start: String = "", pattern: String = null, occurrences: Int = Int.MaxValue): EventFilter =
DebugFilter(Option(source),
if (message ne null) Left(message) else Option(pattern) map (new Regex(_)) toRight start,
message ne null)(occurrences)
@ -244,7 +238,7 @@ object EventFilter {
*/
case class ErrorFilter(
throwable: Class[_],
override val source: Option[AnyRef],
override val source: Option[String],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
@ -272,7 +266,7 @@ case class ErrorFilter(
* @param complete
* whether the events message must match the given message string or pattern completely
*/
def this(throwable: Class[_], source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
def this(throwable: Class[_], source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(throwable, Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
@ -295,7 +289,7 @@ case class ErrorFilter(
* If you want to match all Warning events, the most efficient is to use <code>Left("")</code>.
*/
case class WarningFilter(
override val source: Option[AnyRef],
override val source: Option[String],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
@ -321,7 +315,7 @@ case class WarningFilter(
* @param complete
* whether the events message must match the given message string or pattern completely
*/
def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
def this(source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
@ -338,7 +332,7 @@ case class WarningFilter(
* If you want to match all Info events, the most efficient is to use <code>Left("")</code>.
*/
case class InfoFilter(
override val source: Option[AnyRef],
override val source: Option[String],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
@ -364,7 +358,7 @@ case class InfoFilter(
* @param complete
* whether the events message must match the given message string or pattern completely
*/
def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
def this(source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
@ -381,7 +375,7 @@ case class InfoFilter(
* If you want to match all Debug events, the most efficient is to use <code>Left("")</code>.
*/
case class DebugFilter(
override val source: Option[AnyRef],
override val source: Option[String],
override val message: Either[String, Regex],
override val complete: Boolean)(occurrences: Int) extends EventFilter(occurrences) {
@ -407,7 +401,7 @@ case class DebugFilter(
* @param complete
* whether the events message must match the given message string or pattern completely
*/
def this(source: AnyRef, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
def this(source: String, message: String, pattern: Boolean, complete: Boolean, occurrences: Int) =
this(Option(source),
if (message eq null) Left("")
else if (pattern) Right(new Regex(message))
@ -452,12 +446,12 @@ class TestEventListener extends Logging.DefaultLogger {
case event: LogEvent if (!filter(event)) print(event)
case DeadLetter(msg: SystemMessage, _, rcp)
if (!msg.isInstanceOf[Terminate]) {
val event = Warning(rcp, "received dead system message: " + msg)
val event = Warning(rcp.toString, "received dead system message: " + msg)
if (!filter(event)) print(event)
}
case DeadLetter(msg, snd, rcp)
if (!msg.isInstanceOf[Terminated]) {
val event = Warning(rcp, "received dead letter from " + snd + ": " + msg)
val event = Warning(rcp.toString, "received dead letter from " + snd + ": " + msg)
if (!filter(event)) print(event)
}
}

View file

@ -602,5 +602,5 @@ object TestProbe {
}
trait ImplicitSender { this: TestKit
implicit def implicitSenderTestActor = testActor
implicit def self = testActor
}

View file

@ -43,7 +43,7 @@ object AkkaSpec {
abstract class AkkaSpec(_application: ActorSystem = ActorSystem(getClass.getSimpleName, AkkaSpec.testConf))
extends TestKit(_application) with WordSpec with MustMatchers with BeforeAndAfterAll {
val log: LoggingAdapter = Logging(system, this)
val log: LoggingAdapter = Logging(system, this.getClass)
final override def beforeAll {
atStartup()