diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 26f510d08a..9440c18fc3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -19,6 +19,7 @@ import akka.dispatch.{ Await, Dispatchers, Future, Promise } import akka.pattern.ask import akka.serialization.JavaSerializer import akka.actor.TypedActor._ +import java.lang.IllegalStateException object TypedActorSpec { @@ -162,20 +163,26 @@ object TypedActorSpec { class LifeCyclesImpl(val latch: CountDownLatch) extends PreStart with PostStop with PreRestart with PostRestart with LifeCycles with Receiver { + private def ensureContextAvailable[T](f: ⇒ T): T = TypedActor.context match { + case null ⇒ throw new IllegalStateException("TypedActor.context is null!") + case some ⇒ f + } + override def crash(): Unit = throw new IllegalStateException("Crash!") - override def preStart(): Unit = latch.countDown() + override def preStart(): Unit = ensureContextAvailable(latch.countDown()) - override def postStop(): Unit = for (i ← 1 to 3) latch.countDown() + override def postStop(): Unit = ensureContextAvailable(for (i ← 1 to 3) latch.countDown()) - override def preRestart(reason: Throwable, message: Option[Any]): Unit = for (i ← 1 to 5) latch.countDown() + override def preRestart(reason: Throwable, message: Option[Any]): Unit = ensureContextAvailable(for (i ← 1 to 5) latch.countDown()) - override def postRestart(reason: Throwable): Unit = for (i ← 1 to 7) latch.countDown() + override def postRestart(reason: Throwable): Unit = ensureContextAvailable(for (i ← 1 to 7) latch.countDown()) override def onReceive(msg: Any, sender: ActorRef): Unit = { - msg match { - case "pigdog" ⇒ sender ! "dogpig" - } + ensureContextAvailable( + msg match { + case "pigdog" ⇒ sender ! "dogpig" + }) } } } diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index add8173085..d2497c4a69 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -5,10 +5,11 @@ package akka.event import akka.testkit.AkkaSpec import akka.util.duration._ -import akka.actor.{ Actor, ActorRef, ActorSystemImpl } +import akka.actor.{ Actor, ActorRef, ActorSystemImpl, ActorSystem, Props, UnhandledMessage } import com.typesafe.config.ConfigFactory import scala.collection.JavaConverters._ -import akka.actor.ActorSystem +import akka.event.Logging.InitializeLogger +import akka.pattern.gracefulStop object EventStreamSpec { @@ -20,6 +21,14 @@ object EventStreamSpec { } """.format(Logging.StandardOutLoggerName)) + val configUnhandled = ConfigFactory.parseString(""" + akka { + stdout-loglevel = WARNING + loglevel = DEBUG + actor.debug.unhandled = on + } + """) + case class M(i: Int) case class SetTarget(ref: ActorRef) @@ -27,9 +36,13 @@ object EventStreamSpec { class MyLog extends Actor { var dst: ActorRef = context.system.deadLetters def receive = { - case Logging.InitializeLogger(bus) ⇒ bus.subscribe(context.self, classOf[SetTarget]); sender ! Logging.LoggerInitialized - case SetTarget(ref) ⇒ dst = ref; dst ! "OK" - case e: Logging.LogEvent ⇒ dst ! e + case Logging.InitializeLogger(bus) ⇒ + bus.subscribe(context.self, classOf[SetTarget]) + bus.subscribe(context.self, classOf[UnhandledMessage]) + sender ! Logging.LoggerInitialized + case SetTarget(ref) ⇒ dst = ref; dst ! "OK" + case e: Logging.LogEvent ⇒ dst ! e + case u: UnhandledMessage ⇒ dst ! u } } @@ -61,6 +74,19 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } } + "be able to log unhandled messages" in { + val sys = ActorSystem("EventStreamSpecUnhandled", configUnhandled) + try { + sys.eventStream.subscribe(testActor, classOf[AnyRef]) + val m = UnhandledMessage(42, sys.deadLetters, sys.deadLetters) + sys.eventStream.publish(m) + expectMsgAllOf(m, Logging.Debug(sys.deadLetters.path.toString, sys.deadLetters.getClass, "unhandled message from " + sys.deadLetters + ": 42")) + sys.eventStream.unsubscribe(testActor) + } finally { + sys.shutdown() + } + } + "manage log levels" in { val bus = new EventStream(false) bus.startDefaultLoggers(impl) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 6e747f8121..742b5e86d2 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -14,6 +14,11 @@ akka { # Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT) event-handlers = ["akka.event.Logging$DefaultLogger"] + + # Event handlers are created and registered synchronously during ActorSystem + # start-up, and since they are actors, this timeout is used to bound the + # waiting time + event-handler-startup-timeout = 5s # Log level used by the configured loggers (see "event-handlers") as soon # as they have been started; before that, see "stdout-loglevel" @@ -275,6 +280,9 @@ akka { # enable DEBUG logging of subscription changes on the eventStream event-stream = off + + # enable DEBUG logging of unhandled messages + unhandled = off } # Entries for pluggable serializers and their bindings. @@ -295,7 +303,7 @@ akka { # Used to set the behavior of the scheduler. # Changing the default values may change the system behavior drastically so make sure - # you know what you're doing! + # you know what you're doing! See the Scheduler section of the Akka documentation for more details. scheduler { # The HashedWheelTimer (HWT) implementation from Netty is used as the default scheduler # in the system. diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index a56e920eb2..4d32c3c0b1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -17,9 +17,9 @@ import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap import java.io.Closeable import akka.dispatch.Await.Awaitable import akka.dispatch.Await.CanAwait -import java.util.concurrent.{ CountDownLatch, TimeoutException, RejectedExecutionException } import akka.util._ import collection.immutable.Stack +import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } object ActorSystem { @@ -125,12 +125,15 @@ object ActorSystem { final val LogLevel = getString("akka.loglevel") final val StdoutLogLevel = getString("akka.stdout-loglevel") final val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala + final val EventHandlerStartTimeout = Timeout(Duration(getMilliseconds("akka.event-handler-startup-timeout"), MILLISECONDS)) final val LogConfigOnStart = config.getBoolean("akka.log-config-on-start") + final val AddLoggingReceive = getBoolean("akka.actor.debug.receive") final val DebugAutoReceive = getBoolean("akka.actor.debug.autoreceive") final val DebugLifecycle = getBoolean("akka.actor.debug.lifecycle") final val FsmDebugEvent = getBoolean("akka.actor.debug.fsm") final val DebugEventStream = getBoolean("akka.actor.debug.event-stream") + final val DebugUnhandledMessage = getBoolean("akka.actor.debug.unhandled") final val Home = config.getString("akka.home") match { case "" ⇒ None @@ -200,7 +203,7 @@ object ActorSystem { * * Where no name is given explicitly, one will be automatically generated. * - * Important Notice: + * Important Notice: * * This class is not meant to be extended by user code. If you want to * actually roll your own Akka, it will probably be better to look into @@ -376,7 +379,7 @@ abstract class ActorSystem extends ActorRefFactory { /** * More powerful interface to the actor system’s implementation which is presented to extensions (see [[akka.actor.Extension]]). * - * Important Notice: + * Important Notice: * * This class is not meant to be extended by user code. If you want to * actually roll your own Akka, beware that you are completely on your own in @@ -404,6 +407,11 @@ abstract class ExtendedActorSystem extends ActorSystem { */ def deathWatch: DeathWatch + /** + * A ThreadFactory that can be used if the transport needs to create any Threads + */ + def threadFactory: ThreadFactory + /** * ClassLoader wrapper which is used for reflective accesses internally. This is set * to use the context class loader, if one is set, or the class loader which diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 319bd10a50..f775042566 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -227,15 +227,19 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi case _ ⇒ super.supervisorStrategy } - override def preStart(): Unit = me match { - case l: PreStart ⇒ l.preStart() - case _ ⇒ super.preStart() + override def preStart(): Unit = withContext { + me match { + case l: PreStart ⇒ l.preStart() + case _ ⇒ super.preStart() + } } override def postStop(): Unit = try { - me match { - case l: PostStop ⇒ l.postStop() - case _ ⇒ super.postStop() + withContext { + me match { + case l: PostStop ⇒ l.postStop() + case _ ⇒ super.postStop() + } } } finally { TypedActor(context.system).invocationHandlerFor(proxyVar.get) match { @@ -246,14 +250,18 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi } } - override def preRestart(reason: Throwable, message: Option[Any]): Unit = me match { - case l: PreRestart ⇒ l.preRestart(reason, message) - case _ ⇒ super.preRestart(reason, message) + override def preRestart(reason: Throwable, message: Option[Any]): Unit = withContext { + me match { + case l: PreRestart ⇒ l.preRestart(reason, message) + case _ ⇒ super.preRestart(reason, message) + } } - override def postRestart(reason: Throwable): Unit = me match { - case l: PostRestart ⇒ l.postRestart(reason) - case _ ⇒ super.postRestart(reason) + override def postRestart(reason: Throwable): Unit = withContext { + me match { + case l: PostRestart ⇒ l.postRestart(reason) + case _ ⇒ super.postRestart(reason) + } } protected def withContext[T](unitOfWork: ⇒ T): T = { diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 3383ea2fee..bf4fc7996d 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.util.control.NoStackTrace import java.util.concurrent.TimeoutException import akka.dispatch.Await +import annotation.implicitNotFound /** * This trait brings log level handling to the EventStream: it reads the log @@ -95,26 +96,40 @@ trait LoggingBus extends ActorEventBus { case Nil ⇒ "akka.event.Logging$DefaultLogger" :: Nil case loggers ⇒ loggers } - val myloggers = for { - loggerName ← defaultLoggers - if loggerName != StandardOutLoggerName - } yield { - try { - system.dynamicAccess.getClassFor[Actor](loggerName) match { - case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName) - case Left(exception) ⇒ throw exception + val myloggers = + for { + loggerName ← defaultLoggers + if loggerName != StandardOutLoggerName + } yield { + try { + system.dynamicAccess.getClassFor[Actor](loggerName) match { + case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName) + case Left(exception) ⇒ throw exception + } + } catch { + case e: Exception ⇒ + throw new ConfigurationException( + "Event Handler specified in config can't be loaded [" + loggerName + + "] due to [" + e.toString + "]", e) } - } catch { - case e: Exception ⇒ - throw new ConfigurationException( - "Event Handler specified in config can't be loaded [" + loggerName + - "] due to [" + e.toString + "]", e) } - } guard.withGuard { loggers = myloggers _logLevel = level } + try { + if (system.settings.DebugUnhandledMessage) + subscribe(system.systemActorOf(Props(new Actor { + println("started" + self) + def receive = { + case UnhandledMessage(msg, sender, rcp) ⇒ + println("got it") + publish(Debug(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg)) + } + }), "UnhandledMessageForwarder"), classOf[UnhandledMessage]) + } catch { + case _: InvalidActorNameException ⇒ // ignore if it is already running + } publish(Debug(logName, this.getClass, "Default Loggers started")) if (!(defaultLoggers contains StandardOutLoggerName)) { unsubscribe(StandardOutLogger) @@ -153,7 +168,7 @@ trait LoggingBus extends ActorEventBus { private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel, logName: String): ActorRef = { val name = "log" + Extension(system).id() + "-" + simpleName(clazz) val actor = system.systemActorOf(Props(clazz), name) - implicit val timeout = Timeout(5 seconds) + implicit def timeout = system.settings.EventHandlerStartTimeout import akka.pattern.ask val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch { case _: TimeoutException ⇒ @@ -211,7 +226,7 @@ trait LoggingBus extends ActorEventBus { * * The default implementation of the second variant will just call the first. */ -trait LogSource[-T] { +@implicitNotFound("Cannot find LogSource for ${T} please see ScalaDoc for LogSource for how to obtain or construct one.") trait LogSource[-T] { def genString(t: T): String def genString(t: T, system: ActorSystem): String = genString(t) def getClazz(t: T): Class[_] = t.getClass diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index 15ece6d3a8..455e9cdca0 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -349,6 +349,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] { val newtemp = new Array[Byte](size) if (_tempLength > 0) Array.copy(_temp, 0, newtemp, 0, _tempLength) _temp = newtemp + _tempCapacity = _temp.length } private def ensureTempSize(size: Int) { diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index 293de6ad39..d42b78911a 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -88,13 +88,13 @@ trait ProducerSupport { this: Actor ⇒ * @see Producer#produce(Any, ExchangePattern) */ protected def produce: Receive = { - case res: MessageResult ⇒ receiveAfterProduce(res.message) - case res: FailureResult ⇒ receiveAfterProduce(res.failure) + case res: MessageResult ⇒ routeResponse(res.message) + case res: FailureResult ⇒ routeResponse(res.failure) case msg ⇒ { if (oneway) - produce(receiveBeforeProduce(msg), ExchangePattern.InOnly) + produce(transformOutgoingMessage(msg), ExchangePattern.InOnly) else - produce(receiveBeforeProduce(msg), ExchangePattern.InOut) + produce(transformOutgoingMessage(msg), ExchangePattern.InOut) } } @@ -103,9 +103,14 @@ trait ProducerSupport { this: Actor ⇒ * message is passed as argument. By default, this method simply returns the argument but may be overridden * by subtraits or subclasses. */ - protected def receiveBeforeProduce: PartialFunction[Any, Any] = { - case msg ⇒ msg - } + protected def transformOutgoingMessage(msg: Any): Any = msg + + /** + * Called before the response message is sent to the original sender. The original + * message is passed as argument. By default, this method simply returns the argument but may be overridden + * by subtraits or subclasses. + */ + protected def transformResponse(msg: Any): Any = msg /** * Called after a response was received from the endpoint specified by endpointUri. The @@ -114,9 +119,8 @@ trait ProducerSupport { this: Actor ⇒ * done. This method may be overridden by subtraits or subclasses (e.g. to forward responses to another * actor). */ - protected def receiveAfterProduce: Receive = { - case msg ⇒ if (!oneway) sender ! msg - } + + protected def routeResponse(msg: Any): Unit = if (!oneway) sender ! transformResponse(msg) } diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala index b947e43d64..c4d0a9c1a0 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala @@ -19,7 +19,14 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport { * message is passed as argument. By default, this method simply returns the argument but may be overridden * by subclasses. */ - def onReceiveBeforeProduce(message: AnyRef): AnyRef = message + def onTransformOutgoingMessage(message: AnyRef): AnyRef = message + + /** + * Called before the response message is sent to original sender. The original + * message is passed as argument. By default, this method simply returns the argument but may be overridden + * by subclasses. + */ + def onTransformResponse(message: AnyRef): AnyRef = message /** * Called after a response was received from the endpoint specified by endpointUri. The @@ -27,15 +34,11 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport { * if oneway is false. If oneway is true, nothing is * done. This method may be overridden by subclasses (e.g. to forward responses to another actor). */ - def onReceiveAfterProduce(message: AnyRef): Unit = super.receiveAfterProduce(message) + def onRouteResponse(message: AnyRef): Unit = super.routeResponse(message) - final override def receiveBeforeProduce = { - case msg: AnyRef ⇒ onReceiveBeforeProduce(msg) - } - - final override def receiveAfterProduce = { - case msg: AnyRef ⇒ onReceiveAfterProduce(msg) - } + final override def transformOutgoingMessage(msg: Any): AnyRef = onTransformOutgoingMessage(msg.asInstanceOf[AnyRef]) + final override def transformResponse(msg: Any): AnyRef = onTransformResponse(msg.asInstanceOf[AnyRef]) + final override def routeResponse(msg: Any): Unit = onRouteResponse(msg.asInstanceOf[AnyRef]) final override def endpointUri = getEndpointUri diff --git a/akka-camel/src/test/java/akka/camel/SampleUntypedForwardingProducer.java b/akka-camel/src/test/java/akka/camel/SampleUntypedForwardingProducer.java index ef0b7465c5..375ef36835 100644 --- a/akka-camel/src/test/java/akka/camel/SampleUntypedForwardingProducer.java +++ b/akka-camel/src/test/java/akka/camel/SampleUntypedForwardingProducer.java @@ -15,7 +15,7 @@ public class SampleUntypedForwardingProducer extends UntypedProducerActor { } @Override - public void onReceiveAfterProduce(Object message) { + public void onRouteResponse(Object message) { CamelMessage msg = (CamelMessage)message; String body = msg.getBodyAs(String.class,getCamelContext()); getProducerTemplate().sendBody("direct:forward-test-1", body); diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 515b776cfe..44ce2540c8 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -266,7 +266,7 @@ object ProducerFeatureTest { class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer { def endpointUri = uri - override protected def receiveBeforeProduce = { + override protected def transformOutgoingMessage(msg: Any) = msg match { case msg: CamelMessage ⇒ if (upper) msg.mapBody { body: String ⇒ body.toUpperCase } @@ -277,9 +277,7 @@ object ProducerFeatureTest { class TestForwarder(uri: String, target: ActorRef) extends Actor with Producer { def endpointUri = uri - override protected def receiveAfterProduce = { - case msg ⇒ target forward msg - } + override protected def routeResponse(msg: Any): Unit = target forward msg } class TestResponder extends Actor { diff --git a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala index d0e4c8da13..de1142b668 100644 --- a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala @@ -157,6 +157,45 @@ class VectorClockSpec extends AkkaSpec { merged1 == merged2 must be(true) } + "correctly merge two disjoint vector clocks" in { + val node1 = Node("1") + val node2 = Node("2") + val node3 = Node("3") + val node4 = Node("4") + + val clock1_1 = VectorClock() + val clock2_1 = clock1_1 + node1 + val clock3_1 = clock2_1 + node2 + val clock4_1 = clock3_1 + node2 + val clock5_1 = clock4_1 + node3 + + val clock1_2 = VectorClock() + val clock2_2 = clock1_2 + node4 + val clock3_2 = clock2_2 + node4 + + val merged1 = clock3_2 merge clock5_1 + merged1.versions.size must be(4) + merged1.versions.contains(node1) must be(true) + merged1.versions.contains(node2) must be(true) + merged1.versions.contains(node3) must be(true) + merged1.versions.contains(node4) must be(true) + + val merged2 = clock5_1 merge clock3_2 + merged2.versions.size must be(4) + merged2.versions.contains(node1) must be(true) + merged2.versions.contains(node2) must be(true) + merged2.versions.contains(node3) must be(true) + merged2.versions.contains(node4) must be(true) + + clock3_2 < merged1 must be(true) + clock5_1 < merged1 must be(true) + + clock3_2 < merged2 must be(true) + clock5_1 < merged2 must be(true) + + merged1 == merged2 must be(true) + } + "pass blank clock incrementing" in { val node1 = Node("1") val node2 = Node("2") diff --git a/akka-docs/_sphinx/static/favicon.ico b/akka-docs/_sphinx/static/favicon.ico index 858e2dbebd..9e8f8e9624 100644 Binary files a/akka-docs/_sphinx/static/favicon.ico and b/akka-docs/_sphinx/static/favicon.ico differ diff --git a/akka-docs/general/actor-systems.rst b/akka-docs/general/actor-systems.rst index d3113d85be..2051f2d845 100644 --- a/akka-docs/general/actor-systems.rst +++ b/akka-docs/general/actor-systems.rst @@ -29,7 +29,7 @@ The quintessential feature of actor systems is that tasks are split up and delegated until they become small enough to be handled in one piece. In doing so, not only is the task itself clearly structured, but the resulting actors can be reasoned about in terms of which messages they should process, how they -should react nominally and how failure should be handled. If one actor does not +should react normally and how failure should be handled. If one actor does not have the means for dealing with a certain situation, it sends a corresponding failure message to its supervisor, asking for help. The recursive structure then allows to handle failure at the right level. @@ -41,7 +41,7 @@ trying to keep everything “under the carpet”. Now, the difficulty in designing such a system is how to decide who should supervise what. There is of course no single best solution, but there are a few -guide lines which might be helpful: +guidelines which might be helpful: - If one actor manages the work another actor is doing, e.g. by passing on sub-tasks, then the manager should supervise the child. The reason is that @@ -101,6 +101,12 @@ Actor Best Practices breaks all the properties which make programming in actors such a nice experience. +#. Top-level actors are the innermost part of your Error Kernel, so create them + sparingly and prefer truly hierarchical systems. This has benefits wrt. + fault-handling (both considering the granularity of configuration and the + performance) and it also reduces the number of blocking calls made, since + the creation of top-level actors involves synchronous messaging. + What you should not concern yourself with ----------------------------------------- diff --git a/akka-docs/general/jmm.rst b/akka-docs/general/jmm.rst index 3fe94d89db..6d088c0c47 100644 --- a/akka-docs/general/jmm.rst +++ b/akka-docs/general/jmm.rst @@ -43,6 +43,12 @@ To prevent visibility and reordering problems on actors, Akka guarantees the fol * **The actor send rule:** the send of the message to an actor happens before the receive of that message by the same actor. * **The actor subsequent processing rule:** processing of one message happens before processing of the next message by the same actor. +.. note:: + + In layman's terms this means that changes to internal fields of the actor is visible when the next message + is processed by that actor. So fields in your actor does not need to be volatile or equivalent. + + Both rules only apply for the same actor instance and are not valid if different actors are used. Futures and the Java Memory Model diff --git a/akka-docs/java/code/akka/docs/actor/TypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/TypedActorDocTestBase.java index 72f950e2e7..30db92ee0f 100644 --- a/akka-docs/java/code/akka/docs/actor/TypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/TypedActorDocTestBase.java @@ -150,4 +150,20 @@ public class TypedActorDocTestBase { //Ignore } } + + @Test public void proxyAnyActorRef() { + try { + //#typed-actor-remote + Squarer typedActor = + TypedActor.get(system). + typedActorOf( + new TypedProps(Squarer.class), + system.actorFor("akka://SomeSystem@somehost:2552/user/some/foobar") + ); + //Use "typedActor" as a FooBar + //#typed-actor-remote + } catch (Exception e) { + //dun care + } + } } diff --git a/akka-docs/java/index.rst b/akka-docs/java/index.rst index 4b7dcb5ebf..981e07f869 100644 --- a/akka-docs/java/index.rst +++ b/akka-docs/java/index.rst @@ -23,3 +23,4 @@ Java API fsm extending-akka zeromq + microkernel diff --git a/akka-docs/java/microkernel.rst b/akka-docs/java/microkernel.rst new file mode 100644 index 0000000000..7416838537 --- /dev/null +++ b/akka-docs/java/microkernel.rst @@ -0,0 +1,67 @@ + +.. _microkernel: + +############# + Microkernel (Java) +############# + +The Akka Microkernel is included in the Akka download found at `downloads`_. + +.. _downloads: http://akka.io/downloads + +To run an application with the microkernel you need to create a Bootable class +that handles the startup and shutdown the application. An example is included below. + +Put your application jar in the ``deploy`` directory to have it automatically +loaded. + +To start the kernel use the scripts in the ``bin`` directory, passing the boot +classes for your application. + +There is a simple example of an application setup for running with the +microkernel included in the akka download. This can be run with the following +command (on a unix-based system): + +.. code-block:: none + + bin/akka sample.kernel.hello.HelloKernel + +Use ``Ctrl-C`` to interrupt and exit the microkernel. + +On a Windows machine you can also use the bin/akka.bat script. + +The code for the Hello Kernel example (see the ``HelloKernel`` class for an example +of creating a Bootable): + +.. includecode:: ../../akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java + + +Distribution of microkernel application +--------------------------------------- + +To make a distribution package of the microkernel and your application the ``akka-sbt-plugin`` provides +``AkkaKernelPlugin``. It creates the directory structure, with jar files, configuration files and +start scripts. + +To use the sbt plugin you define it in your ``project/plugins.sbt``: + +.. includecode:: ../../akka-sbt-plugin/sample/project/plugins.sbt + +Then you add it to the settings of your ``project/Build.scala``. It is also important that you add the ``akka-kernel`` dependency. +This is an example of a complete sbt build file: + +.. includecode:: ../../akka-sbt-plugin/sample/project/Build.scala + +Run the plugin with sbt:: + + > dist + > dist:clean + +There are several settings that can be defined: + +* ``outputDirectory`` - destination directory of the package, default ``target/dist`` +* ``distJvmOptions`` - JVM parameters to be used in the start script +* ``configSourceDirs`` - Configuration files are copied from these directories, default ``src/config``, ``src/main/config``, ``src/main/resources`` +* ``distMainClass`` - Kernel main class to use in start script +* ``libFilter`` - Filter of dependency jar files +* ``additionalLibs`` - Additional dependency jar files diff --git a/akka-docs/java/scheduler.rst b/akka-docs/java/scheduler.rst index 3dde1345a6..faff8d9fe0 100644 --- a/akka-docs/java/scheduler.rst +++ b/akka-docs/java/scheduler.rst @@ -15,6 +15,13 @@ You can schedule sending of messages to actors and execution of tasks (functions You will get a ``Cancellable`` back that you can call :meth:`cancel` on to cancel the execution of the scheduled operation. +.. warning:: + + The default implementation of ``Scheduler`` used by Akka is based on the Netty ``HashedWheelTimer``. + It does not execute tasks at the exact time, but on every tick, it will run everything that is overdue. + The accuracy of the default Scheduler can be modified by the "ticks-per-wheel" and "tick-duration" configuration + properties. For more information, see: `HashedWheelTimers `_. + Some examples ------------- diff --git a/akka-docs/java/typed-actors.rst b/akka-docs/java/typed-actors.rst index c1de57c396..b2d7a9bfae 100644 --- a/akka-docs/java/typed-actors.rst +++ b/akka-docs/java/typed-actors.rst @@ -198,3 +198,10 @@ Proxying You can use the ``typedActorOf`` that takes a TypedProps and an ActorRef to proxy the given ActorRef as a TypedActor. This is usable if you want to communicate remotely with TypedActors on other machines, just look them up with ``actorFor`` and pass the ``ActorRef`` to ``typedActorOf``. + +Lookup & Remoting +----------------- + +Since ``TypedActors`` are backed by ``Akka Actors``, you can use ``actorFor`` together with ``typedActorOf`` to proxy ``ActorRefs`` potentially residing on remote nodes. + +.. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java#typed-actor-remote \ No newline at end of file diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index 4ba40a8f1f..89553f091a 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -82,6 +82,13 @@ that is used in log messages and for identifying actors. The name must not be em or start with ``$``. If the given name is already in use by another child to the same parent actor an `InvalidActorNameException` is thrown. +.. warning:: + + Creating top-level actors with ``system.actorOf`` is a blocking operation, + hence it may dead-lock due to starvation if the default dispatcher is + overloaded. To avoid problems, do not call this method from within actors or + futures which run on the default dispatcher. + Actors are automatically started asynchronously when created. When you create the ``UntypedActor`` then it will automatically call the ``preStart`` callback method on the ``UntypedActor`` class. This is an excellent place to diff --git a/akka-docs/modules/index.rst b/akka-docs/modules/index.rst index 603eeb2084..1abb0b7a0e 100644 --- a/akka-docs/modules/index.rst +++ b/akka-docs/modules/index.rst @@ -6,6 +6,5 @@ Modules durable-mailbox http - microkernel camel spring diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 6c819facda..fae84c080f 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -76,6 +76,13 @@ that is used in log messages and for identifying actors. The name must not be em or start with ``$``. If the given name is already in use by another child to the same parent actor an `InvalidActorNameException` is thrown. +.. warning:: + + Creating top-level actors with ``system.actorOf`` is a blocking operation, + hence it may dead-lock due to starvation if the default dispatcher is + overloaded. To avoid problems, do not call this method from within actors or + futures which run on the default dispatcher. + Actors are automatically started asynchronously when created. When you create the ``Actor`` then it will automatically call the ``preStart`` callback method on the ``Actor`` trait. This is an excellent place to @@ -143,7 +150,9 @@ The :class:`Actor` trait defines only one abstract method, the above mentioned If the current actor behavior does not match a received message, :meth:`unhandled` is called, which by default publishes an ``akka.actor.UnhandledMessage(message, sender, recipient)`` on the actor -system’s event stream. +system’s event stream (set configuration item +``akka.event-handler-startup-timeout`` to ``true`` to have them converted into +actual Debug messages) In addition, it offers: diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst new file mode 100644 index 0000000000..5475eb0a27 --- /dev/null +++ b/akka-docs/scala/camel.rst @@ -0,0 +1,84 @@ + +.. _camel-scala: + +####### + Camel +####### + +For an introduction to akka-camel 2, see also the Peter Gabryanczyk's talk `Migrating akka-camel module to Akka 2.x`_. + +For an introduction to akka-camel 1, see also the `Appendix E - Akka and Camel`_ +(pdf) of the book `Camel in Action`_. + +.. _Appendix E - Akka and Camel: http://www.manning.com/ibsen/appEsample.pdf +.. _Camel in Action: http://www.manning.com/ibsen/ +.. _Migrating akka-camel module to Akka 2.x: http://skillsmatter.com/podcast/scala/akka-2-x + +Other, more advanced external articles (for version 1) are: + +* `Akka Consumer Actors: New Features and Best Practices `_ +* `Akka Producer Actors: New Features and Best Practices `_ + + + +Introduction +============ + +The akka-camel module allows actors to receive +and send messages over a great variety of protocols and APIs. This section gives +a brief overview of the general ideas behind the akka-camel module, the +remaining sections go into the details. In addition to the native Scala and Java +actor API, actors can now exchange messages with other systems over large number +of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a +few. At the moment, approximately 80 protocols and APIs are supported. + +The akka-camel module is based on `Apache Camel`_, a powerful and leight-weight +integration framework for the JVM. For an introduction to Apache Camel you may +want to read this `Apache Camel article`_. Camel comes with a +large number of `components`_ that provide bindings to different protocols and +APIs. The `camel-extra`_ project provides further components. + +.. _Apache Camel: http://camel.apache.org/ +.. _Apache Camel article: http://architects.dzone.com/articles/apache-camel-integration +.. _components: http://camel.apache.org/components.html +.. _camel-extra: http://code.google.com/p/camel-extra/ + +Usage of Camel's integration components in Akka is essentially a +one-liner. Here's an example. + +.. includecode:: code/akka/docs/camel/Introduction.scala#Consumer-mina + +The above example exposes an actor over a tcp endpoint on port 6200 via Apache +Camel's `Mina component`_. The actor implements the endpointUri method to define +an endpoint from which it can receive messages. After starting the actor, tcp +clients can immediately send messages to and receive responses from that +actor. If the message exchange should go over HTTP (via Camel's `Jetty +component`_), only the actor's endpointUri method must be changed. + +.. _Mina component: http://camel.apache.org/mina.html +.. _Jetty component: http://camel.apache.org/jetty.html + +.. includecode:: code/akka/docs/camel/Introduction.scala#Consumer + +Actors can also trigger message exchanges with external systems i.e. produce to +Camel endpoints. + +.. includecode:: code/akka/docs/camel/Introduction.scala#Producer + +In the above example, any message sent to this actor will be added (produced) to +the example JMS queue. Producer actors may choose from the same set of Camel +components as Consumer actors do. + +The number of Camel components is constantly increasing. The akka-camel module +can support these in a plug-and-play manner. Just add them to your application's +classpath, define a component-specific endpoint URI and use it to exchange +messages over the component-specific protocols or APIs. This is possible because +Camel components bind protocol-specific message formats to a Camel-specific +`normalized message format`__. The normalized message format hides +protocol-specific details from Akka and makes it therefore very easy to support +a large number of protocols through a uniform Camel component interface. The +akka-camel module further converts mutable Camel messages into immutable +representations which are used by Consumer and Producer actors for pattern +matching, transformation, serialization or storage. + +__ https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Message.java diff --git a/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala index b1fcc9224c..f7c5fa9bf7 100644 --- a/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala @@ -140,6 +140,17 @@ class TypedActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { //#typed-actor-poisonpill } + "proxy any ActorRef" in { + //#typed-actor-remote + val typedActor: Foo with Bar = + TypedActor(system). + typedActorOf( + TypedProps[FooBar], + system.actorFor("akka://SomeSystem@somehost:2552/user/some/foobar")) + //Use "typedActor" as a FooBar + //#typed-actor-remote + } + "supercharge" in { //#typed-actor-supercharge-usage val awesomeFooBar: Foo with Bar = TypedActor(system).typedActorOf(TypedProps[FooBar]()) diff --git a/akka-docs/scala/code/akka/docs/camel/Introduction.scala b/akka-docs/scala/code/akka/docs/camel/Introduction.scala new file mode 100644 index 0000000000..12a29ef72c --- /dev/null +++ b/akka-docs/scala/code/akka/docs/camel/Introduction.scala @@ -0,0 +1,44 @@ +package akka.docs.camel + +import akka.actor._ +import akka.camel._ + +//#Consumer-mina +import akka.actor.Actor +import akka.actor.Actor._ +import akka.camel.{CamelMessage, Consumer} + +class MyActor extends Consumer { + def endpointUri = "mina:tcp://localhost:6200?textline=true" + + def receive = { + case msg: CamelMessage => { /* ... */} + case _ => { /* ... */} + } +} + +// start and expose actor via tcp +val sys = ActorSystem("camel") +val myActor = sys.actorOf(Props[MyActor]) +//#Consumer-mina + + +//#Consumer +class MyActor extends Consumer { + def endpointUri = "jetty:http://localhost:8877/example" + + def receive = { + case msg: CamelMessage => { /* ... */} + case _ => { /* ... */} + } +} +//#Consumer + +//#Producer +import akka.actor.Actor +import akka.camel.{Producer, Oneway} + +class MyActor extends Actor with Producer with Oneway { + def endpointUri = "jms:queue:example" +} +//#Producer \ No newline at end of file diff --git a/akka-docs/scala/index.rst b/akka-docs/scala/index.rst index 46a84fe064..fc1b619e26 100644 --- a/akka-docs/scala/index.rst +++ b/akka-docs/scala/index.rst @@ -26,3 +26,5 @@ Scala API testing extending-akka zeromq + microkernel + camel diff --git a/akka-docs/modules/microkernel.rst b/akka-docs/scala/microkernel.rst similarity index 99% rename from akka-docs/modules/microkernel.rst rename to akka-docs/scala/microkernel.rst index 7600e1ebd2..236149964c 100644 --- a/akka-docs/modules/microkernel.rst +++ b/akka-docs/scala/microkernel.rst @@ -2,7 +2,7 @@ .. _microkernel: ############# - Microkernel + Microkernel (Scala) ############# The Akka Microkernel is included in the Akka download found at `downloads`_. diff --git a/akka-docs/scala/scheduler.rst b/akka-docs/scala/scheduler.rst index 6089630625..a98f0f563c 100644 --- a/akka-docs/scala/scheduler.rst +++ b/akka-docs/scala/scheduler.rst @@ -15,6 +15,13 @@ You can schedule sending of messages to actors and execution of tasks (functions You will get a ``Cancellable`` back that you can call :meth:`cancel` on to cancel the execution of the scheduled operation. +.. warning:: + + The default implementation of ``Scheduler`` used by Akka is based on the Netty ``HashedWheelTimer``. + It does not execute tasks at the exact time, but on every tick, it will run everything that is overdue. + The accuracy of the default Scheduler can be modified by the "ticks-per-wheel" and "tick-duration" configuration + properties. For more information, see: `HashedWheelTimers `_. + Some examples ------------- diff --git a/akka-docs/scala/typed-actors.rst b/akka-docs/scala/typed-actors.rst index 8db250fec1..fc570e60a7 100644 --- a/akka-docs/scala/typed-actors.rst +++ b/akka-docs/scala/typed-actors.rst @@ -203,6 +203,13 @@ This is usable if you want to communicate remotely with TypedActors on other mac The ActorRef needs to accept ``MethodCall`` messages. +Lookup & Remoting +----------------- + +Since ``TypedActors`` are backed by ``Akka Actors``, you can use ``actorFor`` together with ``typedActorOf`` to proxy ``ActorRefs`` potentially residing on remote nodes. + +.. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala#typed-actor-remote + Supercharging ------------- diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5870af9f95..a4d9a8d0c6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -78,8 +78,7 @@ class RemoteActorRefProvider( _transport = { val fqn = remoteSettings.RemoteTransport val args = Seq( - classOf[RemoteSettings] -> remoteSettings, - classOf[ActorSystemImpl] -> system, + classOf[ExtendedActorSystem] -> system, classOf[RemoteActorRefProvider] -> this) system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args) match { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 77bc8320c0..3bade97460 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -5,13 +5,13 @@ package akka.remote import scala.reflect.BeanProperty -import akka.actor.{ Terminated, LocalRef, InternalActorRef, AutoReceivedMessage, AddressFromURIString, Address, ActorSystemImpl, ActorSystem, ActorRef } import akka.dispatch.SystemMessage import akka.event.{ LoggingAdapter, Logging } import akka.AkkaException import akka.serialization.Serialization import akka.remote.RemoteProtocol._ import akka.dispatch.ChildTerminated +import akka.actor._ /** * Remote life-cycle events. @@ -152,7 +152,7 @@ class RemoteTransportException(message: String, cause: Throwable) extends AkkaEx * be available (i.e. fully initialized) by the time the first message is * received or when the start() method returns, whatever happens first. */ -abstract class RemoteTransport { +abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: RemoteActorRefProvider) { /** * Shuts down the remoting */ @@ -163,11 +163,6 @@ abstract class RemoteTransport { */ def address: Address - /** - * The actor system, for which this transport is instantiated. Will publish to its eventStream. - */ - def system: ActorSystem - /** * Start up the transport, i.e. enable incoming connections. */ @@ -197,7 +192,7 @@ abstract class RemoteTransport { override def toString = address.toString } -class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl) { +class RemoteMessage(input: RemoteMessageProtocol, system: ExtendedActorSystem) { def originalReceiver = input.getRecipient.getPath @@ -216,7 +211,7 @@ trait RemoteMarshallingOps { def log: LoggingAdapter - def system: ActorSystemImpl + def system: ExtendedActorSystem def provider: RemoteActorRefProvider @@ -288,9 +283,9 @@ trait RemoteMarshallingOps { case AddressFromURIString(address) if address == provider.transport.address ⇒ // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed) r.!(remoteMessage.payload)(remoteMessage.sender) - case r ⇒ log.error("dropping message {} for non-local recipient {} at {} local is {}", remoteMessage.payload, r, address, provider.transport.address) + case r ⇒ log.error("dropping message {} for non-local recipient {} arriving at {} inbound address is {}", remoteMessage.payload, r, address, provider.transport.address) } - case r ⇒ log.error("dropping message {} for non-local recipient {} of type {}", remoteMessage.payload, r, if (r ne null) r.getClass else "null") + case r ⇒ log.error("dropping message {} for non-local recipient {} arriving at {} inbound address is {}", remoteMessage.payload, r, address, provider.transport.address) } } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index a0e91398fc..7baf3011ee 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -58,7 +58,7 @@ abstract class RemoteClient private[akka] ( * Converts the message to the wireprotocol and sends the message across the wire */ def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { - if (netty.remoteSettings.LogSend) log.debug("Sending message {} from {} to {}", message, senderOption, recipient) + if (netty.provider.remoteSettings.LogSend) log.debug("Sending message {} from {} to {}", message, senderOption, recipient) send((message, senderOption, recipient)) } else { val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", netty, remoteAddress) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 42b319e4e5..cf859c3db2 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -16,18 +16,19 @@ import org.jboss.netty.channel.{ ChannelHandlerContext, Channel } import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder } import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor import org.jboss.netty.util.HashedWheelTimer -import akka.actor.{ Address, ActorSystemImpl, ActorRef } import akka.dispatch.MonitorableThreadFactory import akka.event.Logging import akka.remote.RemoteProtocol.AkkaRemoteProtocol import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteSettings, RemoteMarshallingOps, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted } import akka.util.NonFatal +import akka.actor.{ ExtendedActorSystem, Address, ActorRef } /** * Provides the implementation of the Netty remote support */ -class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: ActorSystemImpl, val provider: RemoteActorRefProvider) - extends RemoteTransport with RemoteMarshallingOps { +class NettyRemoteTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) with RemoteMarshallingOps { + + import provider.remoteSettings val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName) @@ -66,7 +67,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor def address = _address.get - val log = Logging(system.eventStream, "NettyRemoteTransport(" + address + ")") + lazy val log = Logging(system.eventStream, "NettyRemoteTransport(" + address + ")") def start(): Unit = { server.start() diff --git a/akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java b/akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java new file mode 100644 index 0000000000..d0ccc4ad79 --- /dev/null +++ b/akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package sample.kernel.hello.java; + +import akka.actor.ActorRef; +import akka.actor.UntypedActor; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.kernel.Bootable; + +public class HelloKernel implements Bootable { + final ActorSystem system = ActorSystem.create("hellokernel"); + + static class HelloActor extends UntypedActor { + final ActorRef worldActor = + getContext().actorOf(new Props(WorldActor.class)); + + public void onReceive(Object message) { + if (message == "start") + worldActor.tell("Hello"); + else if (message instanceof String) + System.out.println("Received message '%s'".format((String)message)); + else unhandled(message); + } +} + +static class WorldActor extends UntypedActor { + public void onReceive(Object message) { + if (message instanceof String) + getSender().tell(((String)message).toUpperCase() + " world!"); + else unhandled(message); + } +} + + public void startup() { + system.actorOf(new Props(HelloActor.class)).tell("start"); + } + + public void shutdown() { + system.shutdown(); + } +} diff --git a/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jEventHandler.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jEventHandler.scala index 72593d4f76..966f57b938 100644 --- a/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jEventHandler.scala +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jEventHandler.scala @@ -43,25 +43,19 @@ class Slf4jEventHandler extends Actor with SLF4JLogging { case event @ Error(cause, logSource, logClass, message) ⇒ withMdc(logSource, event.thread.getName) { cause match { - case Error.NoCause ⇒ Logger(logClass, logSource).error(message.toString) - case _ ⇒ Logger(logClass, logSource).error(message.toString, cause) + case Error.NoCause | null ⇒ Logger(logClass, logSource).error(if (message != null) message.toString else null) + case cause ⇒ Logger(logClass, logSource).error(if (message != null) message.toString else cause.getLocalizedMessage, cause) } } case event @ Warning(logSource, logClass, message) ⇒ - withMdc(logSource, event.thread.getName) { - Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef]) - } + withMdc(logSource, event.thread.getName) { Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef]) } case event @ Info(logSource, logClass, message) ⇒ - withMdc(logSource, event.thread.getName) { - Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef]) - } + withMdc(logSource, event.thread.getName) { Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef]) } case event @ Debug(logSource, logClass, message) ⇒ - withMdc(logSource, event.thread.getName) { - Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef]) - } + withMdc(logSource, event.thread.getName) { Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef]) } case InitializeLogger(_) ⇒ log.info("Slf4jEventHandler started") diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 56ced17370..97fe6e99aa 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -4,7 +4,7 @@ package akka.testkit import scala.util.matching.Regex -import akka.actor.{ DeadLetter, ActorSystem, Terminated } +import akka.actor.{ DeadLetter, ActorSystem, Terminated, UnhandledMessage } import akka.dispatch.{ SystemMessage, Terminate } import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug, LoggerInitialized } import akka.event.Logging @@ -447,7 +447,7 @@ class TestEventListener extends Logging.DefaultLogger { override def receive = { case InitializeLogger(bus) ⇒ - Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter]) foreach (bus.subscribe(context.self, _)) + Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter], classOf[UnhandledMessage]) foreach (bus.subscribe(context.self, _)) sender ! LoggerInitialized case Mute(filters) ⇒ filters foreach addFilter case UnMute(filters) ⇒ filters foreach removeFilter @@ -462,6 +462,9 @@ class TestEventListener extends Logging.DefaultLogger { val event = Warning(rcp.path.toString, rcp.getClass, "received dead letter from " + snd + ": " + msg) if (!filter(event)) print(event) } + case UnhandledMessage(msg, sender, rcp) ⇒ + val event = Warning(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg) + if (!filter(event)) print(event) case m ⇒ print(Debug(context.system.name, this.getClass, m)) } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 3a0f02c79a..fd763e6bad 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -92,6 +92,18 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { "An AkkaSpec" must { + "warn about unhandled messages" in { + implicit val system = ActorSystem("AkkaSpec0", AkkaSpec.testConf) + try { + val a = system.actorOf(Props.empty) + EventFilter.warning(start = "unhandled message", occurrences = 1) intercept { + a ! 42 + } + } finally { + system.shutdown() + } + } + "terminate all actors" in { // verbose config just for demonstration purposes, please leave in in case of debugging import scala.collection.JavaConverters._ diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala index 19fa23544c..94fb83bbd3 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala @@ -49,7 +49,7 @@ object Pi extends App { var pi: Double = _ var nrOfResults: Int = _ - val start: Long = System.currentTimeMillis + val start: Long = System.currentTimeMillis() //#create-router val workerRouter = context.actorOf( @@ -66,7 +66,7 @@ object Pi extends App { nrOfResults += 1 if (nrOfResults == nrOfMessages) { // Send the result to the listener - listener ! PiApproximation(pi, duration = (System.currentTimeMillis - start).millis) + listener ! PiApproximation(pi, duration = (System.currentTimeMillis() - start).millis) // Stops this actor and all its supervised children context.stop(self) } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index b4c9c7527d..c21ca76d73 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -19,7 +19,7 @@ object AkkaBuild extends Build { lazy val buildSettings = Seq( organization := "com.typesafe.akka", version := "2.1-SNAPSHOT", - scalaVersion := "2.9.1-1" + scalaVersion := "2.9.2" ) lazy val akka = Project( @@ -332,7 +332,8 @@ object AkkaBuild extends Build { override lazy val settings = super.settings ++ buildSettings ++ Seq( resolvers += "Sonatype Snapshot Repo" at "https://oss.sonatype.org/content/repositories/snapshots/", - resolvers += "Twitter Public Repo" at "http://maven.twttr.com" // This will be going away with com.mongodb.async's next release + resolvers += "Twitter Public Repo" at "http://maven.twttr.com", // This will be going away with com.mongodb.async's next release + shellPrompt := { s => Project.extract(s).currentProject.id + " > " } ) lazy val baseSettings = Defaults.defaultSettings ++ Publish.settings @@ -423,7 +424,7 @@ object Dependencies { val actorTests = Seq( Test.junit, Test.scalatest, Test.commonsMath, Test.mockito, - Test.scalacheck, protobuf, jacksonMapper + Test.scalacheck, protobuf ) val remote = Seq( @@ -486,60 +487,41 @@ object Dependency { // Versions object V { - val Camel = "2.8.0" - val Jackson = "1.8.0" - val Jetty = "7.4.0.v20110414" - val Logback = "0.9.28" - val Netty = "3.3.0.Final" - val Protobuf = "2.4.1" - val Rabbit = "2.3.1" - val ScalaStm = "0.5" - val Scalatest = "1.6.1" - val Slf4j = "1.6.4" - val Spring = "3.0.5.RELEASE" - val Zookeeper = "3.4.0" + val Camel = "2.8.0" + val Logback = "0.9.28" + val Netty = "3.3.0.Final" + val Protobuf = "2.4.1" + val Rabbit = "2.3.1" + val ScalaStm = "0.5" + val Scalatest = "1.6.1" + val Slf4j = "1.6.4" + val Spring = "3.0.5.RELEASE" + val Zookeeper = "3.4.0" } // Compile - val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" // New BSD - val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % V.Zookeeper // ApacheV2 - val camelCore = "org.apache.camel" % "camel-core" % V.Camel // ApacheV2 - val camelSpring = "org.apache.camel" % "camel-spring" % V.Camel // ApacheV2 - val commonsCodec = "commons-codec" % "commons-codec" % "1.4" // ApacheV2 - val commonsIo = "commons-io" % "commons-io" % "2.0.1" // ApacheV2 - val commonsPool = "commons-pool" % "commons-pool" % "1.5.6" // ApacheV2 - val guice = "org.guiceyfruit" % "guice-all" % "2.0" // ApacheV2 - val jacksonCore = "org.codehaus.jackson" % "jackson-core-asl" % V.Jackson // ApacheV2 - val jacksonMapper = "org.codehaus.jackson" % "jackson-mapper-asl" % V.Jackson // ApacheV2 - val jettyUtil = "org.eclipse.jetty" % "jetty-util" % V.Jetty // Eclipse license - val jettyXml = "org.eclipse.jetty" % "jetty-xml" % V.Jetty // Eclipse license - val jettyServlet = "org.eclipse.jetty" % "jetty-servlet" % V.Jetty // Eclipse license - val jmxClient = "cmdline-jmxclient" % "cmdline-jmxclient" % "0.10.3" // LGPL - val log4j = "log4j" % "log4j" % "1.2.14" // ApacheV2 - val mongoAsync = "com.mongodb.async" % "mongo-driver_2.9.0-1" % "0.2.9-1" // ApacheV2 - val netty = "io.netty" % "netty" % V.Netty // ApacheV2 - val osgi = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2 - val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD - val rabbit = "com.rabbitmq" % "amqp-client" % V.Rabbit // Mozilla Public License - val redis = "net.debasishg" % "redisclient_2.9.1" % "2.4.0" // ApacheV2 - val scalaStm = "org.scala-tools" % "scala-stm_2.9.1" % V.ScalaStm // Modified BSD (Scala) - val slf4jApi = "org.slf4j" % "slf4j-api" % V.Slf4j // MIT - val springBeans = "org.springframework" % "spring-beans" % V.Spring // ApacheV2 - val springContext = "org.springframework" % "spring-context" % V.Spring // ApacheV2 - val staxApi = "javax.xml.stream" % "stax-api" % "1.0-2" // ApacheV2 - val twttrUtilCore = "com.twitter" % "util-core" % "1.8.1" // ApacheV2 - val zkClient = "zkclient" % "zkclient" % "0.3" // ApacheV2 - val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % V.Zookeeper // ApacheV2 - val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % V.Zookeeper // ApacheV2 - val zeroMQ = "org.zeromq" % "zeromq-scala-binding_2.9.1" % "0.0.5" // ApacheV2 - - // Provided - - object Provided { - val javaxServlet = "org.apache.geronimo.specs" % "geronimo-servlet_3.0_spec" % "1.0" % "provided" // CDDL v1 - val jetty = "org.eclipse.jetty" % "jetty-server" % V.Jetty % "provided" // Eclipse license - } + val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" // New BSD + val camelCore = "org.apache.camel" % "camel-core" % V.Camel // ApacheV2 + val camelSpring = "org.apache.camel" % "camel-spring" % V.Camel // ApacheV2 + val commonsCodec = "commons-codec" % "commons-codec" % "1.4" // ApacheV2 + val commonsIo = "commons-io" % "commons-io" % "2.0.1" // ApacheV2 + val commonsPool = "commons-pool" % "commons-pool" % "1.5.6" // ApacheV2 + val jmxClient = "cmdline-jmxclient" % "cmdline-jmxclient" % "0.10.3" // LGPL + val mongoAsync = "com.mongodb.async" % "mongo-driver_2.9.0-1" % "0.2.9-1" // ApacheV2 + val netty = "io.netty" % "netty" % V.Netty // ApacheV2 + val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD + val rabbit = "com.rabbitmq" % "amqp-client" % V.Rabbit // Mozilla Public License + val redis = "net.debasishg" % "redisclient_2.9.1" % "2.4.0" // ApacheV2 + val scalaStm = "org.scala-tools" % "scala-stm_2.9.1" % V.ScalaStm // Modified BSD (Scala) + val slf4jApi = "org.slf4j" % "slf4j-api" % V.Slf4j // MIT + val springBeans = "org.springframework" % "spring-beans" % V.Spring // ApacheV2 + val springContext = "org.springframework" % "spring-context" % V.Spring // ApacheV2 + val twttrUtilCore = "com.twitter" % "util-core" % "1.8.1" // ApacheV2 + val zkClient = "zkclient" % "zkclient" % "0.3" // ApacheV2 + val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % V.Zookeeper // ApacheV2 + val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % V.Zookeeper // ApacheV2 + val zeroMQ = "org.zeromq" % "zeromq-scala-binding_2.9.1" % "0.0.5" // ApacheV2 // Runtime @@ -556,8 +538,6 @@ object Dependency { object Test { val commonsColl = "commons-collections" % "commons-collections" % "3.2.1" % "test" // ApacheV2 val commonsMath = "org.apache.commons" % "commons-math" % "2.1" % "test" // ApacheV2 - val jetty = "org.eclipse.jetty" % "jetty-server" % V.Jetty % "test" // Eclipse license - val jettyWebapp = "org.eclipse.jetty" % "jetty-webapp" % V.Jetty % "test" // Eclipse license val junit = "junit" % "junit" % "4.5" % "test" // Common Public License 1.0 val logback = "ch.qos.logback" % "logback-classic" % V.Logback % "test" // EPL 1.0 / LGPL 2.1 val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" // MIT