From 8138186a44a6bbb45564253cbf12a2e6c0f9c673 Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Fri, 30 Mar 2012 11:08:00 +0100 Subject: [PATCH 01/36] Made message conversions on producer more user friendly by renaming it and adding outgonig message converter. --- .../src/main/scala/akka/camel/Producer.scala | 24 +++++++++++++------ .../camel/javaapi/UntypedProducerActor.scala | 6 ++--- .../akka/camel/ProducerFeatureTest.scala | 4 ++-- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index 293de6ad39..e7ac1fb194 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -88,13 +88,13 @@ trait ProducerSupport { this: Actor ⇒ * @see Producer#produce(Any, ExchangePattern) */ protected def produce: Receive = { - case res: MessageResult ⇒ receiveAfterProduce(res.message) - case res: FailureResult ⇒ receiveAfterProduce(res.failure) + case res: MessageResult ⇒ routeResponse(res.message) + case res: FailureResult ⇒ routeResponse(res.failure) case msg ⇒ { if (oneway) - produce(receiveBeforeProduce(msg), ExchangePattern.InOnly) + produce(transformOutgoingMessage(msg), ExchangePattern.InOnly) else - produce(receiveBeforeProduce(msg), ExchangePattern.InOut) + produce(transformOutgoingMessage(msg), ExchangePattern.InOut) } } @@ -103,7 +103,16 @@ trait ProducerSupport { this: Actor ⇒ * message is passed as argument. By default, this method simply returns the argument but may be overridden * by subtraits or subclasses. */ - protected def receiveBeforeProduce: PartialFunction[Any, Any] = { + protected def transformOutgoingMessage: PartialFunction[Any, Any] = { + case msg ⇒ msg + } + + /** + * Called before the response message is sent to the original sender. The original + * message is passed as argument. By default, this method simply returns the argument but may be overridden + * by subtraits or subclasses. + */ + protected def transformResponse: PartialFunction[Any, Any] = { case msg ⇒ msg } @@ -114,8 +123,9 @@ trait ProducerSupport { this: Actor ⇒ * done. This method may be overridden by subtraits or subclasses (e.g. to forward responses to another * actor). */ - protected def receiveAfterProduce: Receive = { - case msg ⇒ if (!oneway) sender ! msg + + protected def routeResponse: Receive = { + case msg ⇒ if (!oneway) sender ! transformResponse(msg) } } diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala index b947e43d64..d615c64244 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala @@ -27,13 +27,13 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport { * if oneway is false. If oneway is true, nothing is * done. This method may be overridden by subclasses (e.g. to forward responses to another actor). */ - def onReceiveAfterProduce(message: AnyRef): Unit = super.receiveAfterProduce(message) + def onReceiveAfterProduce(message: AnyRef): Unit = super.routeResponse(message) - final override def receiveBeforeProduce = { + final override def transformOutgoingMessage = { case msg: AnyRef ⇒ onReceiveBeforeProduce(msg) } - final override def receiveAfterProduce = { + final override def routeResponse = { case msg: AnyRef ⇒ onReceiveAfterProduce(msg) } diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 515b776cfe..6de826bb8f 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -266,7 +266,7 @@ object ProducerFeatureTest { class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer { def endpointUri = uri - override protected def receiveBeforeProduce = { + override protected def transformOutgoingMessage = { case msg: CamelMessage ⇒ if (upper) msg.mapBody { body: String ⇒ body.toUpperCase } @@ -277,7 +277,7 @@ object ProducerFeatureTest { class TestForwarder(uri: String, target: ActorRef) extends Actor with Producer { def endpointUri = uri - override protected def receiveAfterProduce = { + override protected def routeResponse = { case msg ⇒ target forward msg } } From 7da8fa132a9f7b019ba628ffe037c001586f5b21 Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Fri, 30 Mar 2012 11:18:07 +0100 Subject: [PATCH 02/36] Made message conversions on producer more user friendly by renaming it and adding outgonig message converter. Part II --- .../camel/javaapi/UntypedProducerActor.scala | 19 +++++++++++++++---- .../SampleUntypedForwardingProducer.java | 2 +- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala index d615c64244..e67c4fe3cd 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala @@ -19,7 +19,14 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport { * message is passed as argument. By default, this method simply returns the argument but may be overridden * by subclasses. */ - def onReceiveBeforeProduce(message: AnyRef): AnyRef = message + def onTransformOutgoingMessage(message: AnyRef): AnyRef = message + + /** + * Called before the response message is sent to original sender. The original + * message is passed as argument. By default, this method simply returns the argument but may be overridden + * by subclasses. + */ + def onTransformResponse(message: AnyRef): AnyRef = message /** * Called after a response was received from the endpoint specified by endpointUri. The @@ -27,14 +34,18 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport { * if oneway is false. If oneway is true, nothing is * done. This method may be overridden by subclasses (e.g. to forward responses to another actor). */ - def onReceiveAfterProduce(message: AnyRef): Unit = super.routeResponse(message) + def onRouteResponse(message: AnyRef): Unit = super.routeResponse(message) final override def transformOutgoingMessage = { - case msg: AnyRef ⇒ onReceiveBeforeProduce(msg) + case msg: AnyRef ⇒ onTransformOutgoingMessage(msg) + } + + final override def transformResponse = { + case msg: AnyRef ⇒ onTransformResponse(msg) } final override def routeResponse = { - case msg: AnyRef ⇒ onReceiveAfterProduce(msg) + case msg: AnyRef ⇒ onRouteResponse(msg) } final override def endpointUri = getEndpointUri diff --git a/akka-camel/src/test/java/akka/camel/SampleUntypedForwardingProducer.java b/akka-camel/src/test/java/akka/camel/SampleUntypedForwardingProducer.java index ef0b7465c5..375ef36835 100644 --- a/akka-camel/src/test/java/akka/camel/SampleUntypedForwardingProducer.java +++ b/akka-camel/src/test/java/akka/camel/SampleUntypedForwardingProducer.java @@ -15,7 +15,7 @@ public class SampleUntypedForwardingProducer extends UntypedProducerActor { } @Override - public void onReceiveAfterProduce(Object message) { + public void onRouteResponse(Object message) { CamelMessage msg = (CamelMessage)message; String body = msg.getBodyAs(String.class,getCamelContext()); getProducerTemplate().sendBody("direct:forward-test-1", body); From 44a19aff8e057573da74d26ccaa9afd387549b9b Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Fri, 6 Apr 2012 11:13:59 +0100 Subject: [PATCH 03/36] Partial functions replaced with regular functions for message converters --- akka-camel/src/main/scala/akka/camel/Producer.scala | 8 ++------ .../scala/akka/camel/javaapi/UntypedProducerActor.scala | 4 ++-- .../src/test/scala/akka/camel/ProducerFeatureTest.scala | 2 +- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index e7ac1fb194..679bfbd495 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -103,18 +103,14 @@ trait ProducerSupport { this: Actor ⇒ * message is passed as argument. By default, this method simply returns the argument but may be overridden * by subtraits or subclasses. */ - protected def transformOutgoingMessage: PartialFunction[Any, Any] = { - case msg ⇒ msg - } + protected def transformOutgoingMessage(msg: Any): Any = msg /** * Called before the response message is sent to the original sender. The original * message is passed as argument. By default, this method simply returns the argument but may be overridden * by subtraits or subclasses. */ - protected def transformResponse: PartialFunction[Any, Any] = { - case msg ⇒ msg - } + protected def transformResponse(msg: Any): Any = msg /** * Called after a response was received from the endpoint specified by endpointUri. The diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala index e67c4fe3cd..18055223d6 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala @@ -36,11 +36,11 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport { */ def onRouteResponse(message: AnyRef): Unit = super.routeResponse(message) - final override def transformOutgoingMessage = { + final override def transformOutgoingMessage(msg: Any) = msg match { case msg: AnyRef ⇒ onTransformOutgoingMessage(msg) } - final override def transformResponse = { + final override def transformResponse(msg: Any) = msg match { case msg: AnyRef ⇒ onTransformResponse(msg) } diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 6de826bb8f..a8f138e44a 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -266,7 +266,7 @@ object ProducerFeatureTest { class TestProducer(uri: String, upper: Boolean = false) extends Actor with Producer { def endpointUri = uri - override protected def transformOutgoingMessage = { + override protected def transformOutgoingMessage(msg: Any) = msg match { case msg: CamelMessage ⇒ if (upper) msg.mapBody { body: String ⇒ body.toUpperCase } From 1b254f9f61c731700953a0c1b115f54ce54f77f2 Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Mon, 9 Apr 2012 07:44:47 +0100 Subject: [PATCH 04/36] adding return types --- .../scala/akka/camel/javaapi/UntypedProducerActor.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala index 18055223d6..a9138654d9 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala @@ -36,15 +36,15 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport { */ def onRouteResponse(message: AnyRef): Unit = super.routeResponse(message) - final override def transformOutgoingMessage(msg: Any) = msg match { + final override def transformOutgoingMessage(msg: Any) : AnyRef = msg match { case msg: AnyRef ⇒ onTransformOutgoingMessage(msg) } - final override def transformResponse(msg: Any) = msg match { + final override def transformResponse(msg: Any) : AnyRef = msg match { case msg: AnyRef ⇒ onTransformResponse(msg) } - final override def routeResponse = { + final override def routeResponse : AnyRef = { case msg: AnyRef ⇒ onRouteResponse(msg) } From c457360dbee05ebb8b7ee21e178711004a256329 Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Mon, 9 Apr 2012 07:59:03 +0100 Subject: [PATCH 05/36] adding return types --- akka-camel/src/main/scala/akka/camel/Producer.scala | 4 ++-- .../scala/akka/camel/javaapi/UntypedProducerActor.scala | 6 +++--- .../src/test/scala/akka/camel/ProducerFeatureTest.scala | 4 +--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index 679bfbd495..4fd879355c 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -120,8 +120,8 @@ trait ProducerSupport { this: Actor ⇒ * actor). */ - protected def routeResponse: Receive = { - case msg ⇒ if (!oneway) sender ! transformResponse(msg) + protected def routeResponse(msg: Any): Any = { + if (!oneway) sender ! transformResponse(msg) } } diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala index a9138654d9..33a7934e81 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala @@ -36,15 +36,15 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport { */ def onRouteResponse(message: AnyRef): Unit = super.routeResponse(message) - final override def transformOutgoingMessage(msg: Any) : AnyRef = msg match { + final override def transformOutgoingMessage(msg: Any): AnyRef = msg match { case msg: AnyRef ⇒ onTransformOutgoingMessage(msg) } - final override def transformResponse(msg: Any) : AnyRef = msg match { + final override def transformResponse(msg: Any): AnyRef = msg match { case msg: AnyRef ⇒ onTransformResponse(msg) } - final override def routeResponse : AnyRef = { + final override def routeResponse(msg: Any): Any = msg match { case msg: AnyRef ⇒ onRouteResponse(msg) } diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index a8f138e44a..4db2af577b 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -277,9 +277,7 @@ object ProducerFeatureTest { class TestForwarder(uri: String, target: ActorRef) extends Actor with Producer { def endpointUri = uri - override protected def routeResponse = { - case msg ⇒ target forward msg - } + override protected def routeResponse(msg: Any): Any = { target forward msg } } class TestResponder extends Actor { From 5851ffb1158514ff22ef2dd97e69b08e64bffbe1 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 13 Apr 2012 18:31:52 +0200 Subject: [PATCH 06/36] update to scala 2.9.2 --- project/AkkaBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 6fd920c5f3..dd8dd47a82 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -19,7 +19,7 @@ object AkkaBuild extends Build { lazy val buildSettings = Seq( organization := "com.typesafe.akka", version := "2.1-SNAPSHOT", - scalaVersion := "2.9.1-1" + scalaVersion := "2.9.2" ) lazy val akka = Project( From 6bd4ea17108b2e959ed25a390fa3a740b173c157 Mon Sep 17 00:00:00 2001 From: viktorklang Date: Wed, 18 Apr 2012 12:46:58 +0300 Subject: [PATCH 07/36] Clarifying actor jmm rules --- akka-docs/general/jmm.rst | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/akka-docs/general/jmm.rst b/akka-docs/general/jmm.rst index 3fe94d89db..6d088c0c47 100644 --- a/akka-docs/general/jmm.rst +++ b/akka-docs/general/jmm.rst @@ -43,6 +43,12 @@ To prevent visibility and reordering problems on actors, Akka guarantees the fol * **The actor send rule:** the send of the message to an actor happens before the receive of that message by the same actor. * **The actor subsequent processing rule:** processing of one message happens before processing of the next message by the same actor. +.. note:: + + In layman's terms this means that changes to internal fields of the actor is visible when the next message + is processed by that actor. So fields in your actor does not need to be volatile or equivalent. + + Both rules only apply for the same actor instance and are not valid if different actors are used. Futures and the Java Memory Model From 984d53a623c832eee023d52d6dc913af946f1ad8 Mon Sep 17 00:00:00 2001 From: viktorklang Date: Thu, 19 Apr 2012 11:13:35 +0300 Subject: [PATCH 08/36] Stopping the bleeding of ActorSystem ScalaDoc --- akka-actor/src/main/scala/akka/actor/ActorSystem.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index a56e920eb2..4112905711 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -200,7 +200,7 @@ object ActorSystem { * * Where no name is given explicitly, one will be automatically generated. * - * Important Notice: + * Important Notice: * * This class is not meant to be extended by user code. If you want to * actually roll your own Akka, it will probably be better to look into @@ -376,7 +376,7 @@ abstract class ActorSystem extends ActorRefFactory { /** * More powerful interface to the actor system’s implementation which is presented to extensions (see [[akka.actor.Extension]]). * - * Important Notice: + * Important Notice: * * This class is not meant to be extended by user code. If you want to * actually roll your own Akka, beware that you are completely on your own in From b9ea45716197aabd28a3c99769575c18c90bc5bb Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 19 Apr 2012 15:09:20 +0200 Subject: [PATCH 09/36] Removing unused dependencies --- project/AkkaBuild.scala | 23 +---------------------- 1 file changed, 1 insertion(+), 22 deletions(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index dd8dd47a82..9e1c895966 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -430,7 +430,7 @@ object Dependencies { val actorTests = Seq( Test.junit, Test.scalatest, Test.commonsMath, Test.mockito, - Test.scalacheck, protobuf, jacksonMapper + Test.scalacheck, protobuf ) val remote = Seq( @@ -494,8 +494,6 @@ object Dependency { object V { val Camel = "2.8.0" - val Jackson = "1.8.0" - val Jetty = "7.4.0.v20110414" val Logback = "0.9.28" val Netty = "3.3.0.Final" val Protobuf = "2.4.1" @@ -510,22 +508,13 @@ object Dependency { // Compile val beanstalk = "beanstalk" % "beanstalk_client" % "1.4.5" // New BSD - val bookkeeper = "org.apache.hadoop.zookeeper" % "bookkeeper" % V.Zookeeper // ApacheV2 val camelCore = "org.apache.camel" % "camel-core" % V.Camel // ApacheV2 val camelSpring = "org.apache.camel" % "camel-spring" % V.Camel // ApacheV2 val commonsCodec = "commons-codec" % "commons-codec" % "1.4" // ApacheV2 val commonsIo = "commons-io" % "commons-io" % "2.0.1" // ApacheV2 val commonsPool = "commons-pool" % "commons-pool" % "1.5.6" // ApacheV2 - val guice = "org.guiceyfruit" % "guice-all" % "2.0" // ApacheV2 - val jacksonCore = "org.codehaus.jackson" % "jackson-core-asl" % V.Jackson // ApacheV2 - val jacksonMapper = "org.codehaus.jackson" % "jackson-mapper-asl" % V.Jackson // ApacheV2 - val jettyUtil = "org.eclipse.jetty" % "jetty-util" % V.Jetty // Eclipse license - val jettyXml = "org.eclipse.jetty" % "jetty-xml" % V.Jetty // Eclipse license - val jettyServlet = "org.eclipse.jetty" % "jetty-servlet" % V.Jetty // Eclipse license - val log4j = "log4j" % "log4j" % "1.2.14" // ApacheV2 val mongoAsync = "com.mongodb.async" % "mongo-driver_2.9.0-1" % "0.2.9-1" // ApacheV2 val netty = "io.netty" % "netty" % V.Netty // ApacheV2 - val osgi = "org.osgi" % "org.osgi.core" % "4.2.0" // ApacheV2 val protobuf = "com.google.protobuf" % "protobuf-java" % V.Protobuf // New BSD val rabbit = "com.rabbitmq" % "amqp-client" % V.Rabbit // Mozilla Public License val redis = "net.debasishg" % "redisclient_2.9.1" % "2.4.0" // ApacheV2 @@ -533,20 +522,12 @@ object Dependency { val slf4jApi = "org.slf4j" % "slf4j-api" % V.Slf4j // MIT val springBeans = "org.springframework" % "spring-beans" % V.Spring // ApacheV2 val springContext = "org.springframework" % "spring-context" % V.Spring // ApacheV2 - val staxApi = "javax.xml.stream" % "stax-api" % "1.0-2" // ApacheV2 val twttrUtilCore = "com.twitter" % "util-core" % "1.8.1" // ApacheV2 val zkClient = "zkclient" % "zkclient" % "0.3" // ApacheV2 val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % V.Zookeeper // ApacheV2 val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % V.Zookeeper // ApacheV2 val zeroMQ = "org.zeromq" % "zeromq-scala-binding_2.9.1" % "0.0.5" // ApacheV2 - // Provided - - object Provided { - val javaxServlet = "org.apache.geronimo.specs" % "geronimo-servlet_3.0_spec" % "1.0" % "provided" // CDDL v1 - val jetty = "org.eclipse.jetty" % "jetty-server" % V.Jetty % "provided" // Eclipse license - } - // Runtime object Runtime { @@ -562,8 +543,6 @@ object Dependency { object Test { val commonsColl = "commons-collections" % "commons-collections" % "3.2.1" % "test" // ApacheV2 val commonsMath = "org.apache.commons" % "commons-math" % "2.1" % "test" // ApacheV2 - val jetty = "org.eclipse.jetty" % "jetty-server" % V.Jetty % "test" // Eclipse license - val jettyWebapp = "org.eclipse.jetty" % "jetty-webapp" % V.Jetty % "test" // Eclipse license val junit = "junit" % "junit" % "4.5" % "test" // Common Public License 1.0 val logback = "ch.qos.logback" % "logback-classic" % V.Logback % "test" // EPL 1.0 / LGPL 2.1 val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" // MIT From 1453170c965c3127ea623c280a5c59e58a0d2c66 Mon Sep 17 00:00:00 2001 From: Oliver Schulz Date: Wed, 18 Apr 2012 21:57:58 +0200 Subject: [PATCH 10/36] Fixed ByteStringBuilder.resizeTemp resizeTemp didn't update _tempCapacity, causing ByteStringBuilder to create a new _temp array on each call of +=. --- akka-actor/src/main/scala/akka/util/ByteString.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-actor/src/main/scala/akka/util/ByteString.scala b/akka-actor/src/main/scala/akka/util/ByteString.scala index 15ece6d3a8..455e9cdca0 100644 --- a/akka-actor/src/main/scala/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala/akka/util/ByteString.scala @@ -349,6 +349,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] { val newtemp = new Array[Byte](size) if (_tempLength > 0) Array.copy(_temp, 0, newtemp, 0, _tempLength) _temp = newtemp + _tempCapacity = _temp.length } private def ensureTempSize(size: Int) { From b7fc3f6da0d8acbb8922bd4432006bd5a18a95f9 Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Fri, 20 Apr 2012 17:07:42 +0100 Subject: [PATCH 11/36] casting to AnyRef instead --- .../akka/camel/javaapi/UntypedProducerActor.scala | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala index 33a7934e81..d87d005c43 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala @@ -36,17 +36,9 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport { */ def onRouteResponse(message: AnyRef): Unit = super.routeResponse(message) - final override def transformOutgoingMessage(msg: Any): AnyRef = msg match { - case msg: AnyRef ⇒ onTransformOutgoingMessage(msg) - } - - final override def transformResponse(msg: Any): AnyRef = msg match { - case msg: AnyRef ⇒ onTransformResponse(msg) - } - - final override def routeResponse(msg: Any): Any = msg match { - case msg: AnyRef ⇒ onRouteResponse(msg) - } + final override def transformOutgoingMessage(msg: Any): AnyRef = onTransformOutgoingMessage(msg.asInstanceOf[AnyRef]) + final override def transformResponse(msg: Any): AnyRef = onTransformResponse(msg.asInstanceOf[AnyRef]) + final override def routeResponse(msg: Any): Any = onRouteResponse(msg.asInstanceOf[AnyRef]) final override def endpointUri = getEndpointUri From d715c470e7ee42eba63fab252dea8540e163149c Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Sat, 21 Apr 2012 14:52:43 +0100 Subject: [PATCH 12/36] Initial doc import. Adding just an introduction as this is my first commit to docs, so this is just to set the scene. --- akka-docs/scala/index.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-docs/scala/index.rst b/akka-docs/scala/index.rst index 46a84fe064..9c631af6db 100644 --- a/akka-docs/scala/index.rst +++ b/akka-docs/scala/index.rst @@ -26,3 +26,4 @@ Scala API testing extending-akka zeromq + camel From 602c47d6a1d4cb9838e16ea0ef2940a093e2a391 Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Sat, 21 Apr 2012 14:53:01 +0100 Subject: [PATCH 13/36] Initial doc import. Adding just an introduction as this is my first commit to docs, so this is just to set the scene. --- akka-docs/scala/camel.rst | 115 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 akka-docs/scala/camel.rst diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst new file mode 100644 index 0000000000..a173f542c0 --- /dev/null +++ b/akka-docs/scala/camel.rst @@ -0,0 +1,115 @@ + +.. _camel-scala: + +####### + Camel +####### + +For an introduction to akka-camel 2, see also the Peter Gabryanczyk's talk `Migrating akka-camel module to Akka 2.x`_. + +For an introduction to akka-camel 1, see also the `Appendix E - Akka and Camel`_ +(pdf) of the book `Camel in Action`_. + +.. _Appendix E - Akka and Camel: http://www.manning.com/ibsen/appEsample.pdf +.. _Camel in Action: http://www.manning.com/ibsen/ +.. _Migrating akka-camel module to Akka 2.x: http://skillsmatter.com/podcast/scala/akka-2-x + +Other, more advanced external articles (for version 1) are: + +* `Akka Consumer Actors: New Features and Best Practices `_ +* `Akka Producer Actors: New Features and Best Practices `_ + + + +Introduction +============ + +The akka-camel module allows actors, untyped actors, and typed actors to receive +and send messages over a great variety of protocols and APIs. This section gives +a brief overview of the general ideas behind the akka-camel module, the +remaining sections go into the details. In addition to the native Scala and Java +actor API, actors can now exchange messages with other systems over large number +of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a +few. At the moment, approximately 80 protocols and APIs are supported. + +The akka-camel module is based on `Apache Camel`_, a powerful and leight-weight +integration framework for the JVM. For an introduction to Apache Camel you may +want to read this `Apache Camel article`_. Camel comes with a +large number of `components`_ that provide bindings to different protocols and +APIs. The `camel-extra`_ project provides further components. + +.. _Apache Camel: http://camel.apache.org/ +.. _Apache Camel article: http://architects.dzone.com/articles/apache-camel-integration +.. _components: http://camel.apache.org/components.html +.. _camel-extra: http://code.google.com/p/camel-extra/ + +Usage of Camel's integration components in Akka is essentially a +one-liner. Here's an example. + +.. code-block:: scala + + import akka.actor.Actor + import akka.actor.Actor._ + import akka.camel.{CamelMessage, Consumer} + + class MyActor extends Consumer { + def endpointUri = "mina:tcp://localhost:6200?textline=true" + + def receive = { + case msg: CamelMessage => { /* ... */} + case _ => { /* ... */} + } + } + + // start and expose actor via tcp + val sys = ActorSystem("camel") + val myActor = sys.actorOf(Props[MyActor]) + +The above example exposes an actor over a tcp endpoint on port 6200 via Apache +Camel's `Mina component`_. The actor implements the endpointUri method to define +an endpoint from which it can receive messages. After starting the actor, tcp +clients can immediately send messages to and receive responses from that +actor. If the message exchange should go over HTTP (via Camel's `Jetty +component`_), only the actor's endpointUri method must be changed. + +.. _Mina component: http://camel.apache.org/mina.html +.. _Jetty component: http://camel.apache.org/jetty.html + +.. code-block:: scala + + class MyActor extends Consumer { + def endpointUri = "jetty:http://localhost:8877/example" + + def receive = { + case msg: CamelMessage => { /* ... */} + case _ => { /* ... */} + } + } + +Actors can also trigger message exchanges with external systems i.e. produce to +Camel endpoints. + +.. code-block:: scala + + import akka.actor.Actor + import akka.camel.{Producer, Oneway} + + class MyActor extends Actor with Producer with Oneway { + def endpointUri = "jms:queue:example" + } + +In the above example, any message sent to this actor will be added (produced) to +the example JMS queue. Producer actors may choose from the same set of Camel +components as Consumer actors do. + +The number of Camel components is constantly increasing. The akka-camel module +can support these in a plug-and-play manner. Just add them to your application's +classpath, define a component-specific endpoint URI and use it to exchange +messages over the component-specific protocols or APIs. This is possible because +Camel components bind protocol-specific message formats to a Camel-specific +`normalized message format`__. The normalized message format hides +protocol-specific details from Akka and makes it therefore very easy to support +a large number of protocols through a uniform Camel component interface. The +akka-camel module further converts mutable Camel messages into `immutable +representations`__ which are used by Consumer and Producer actors for pattern +matching, transformation, serialization or storage. \ No newline at end of file From a9fac726e84cd9d344688b4596cf2b828d2ee378 Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Sun, 22 Apr 2012 17:02:06 +0100 Subject: [PATCH 14/36] Extracting code to scala files --- akka-docs/scala/camel.rst | 47 ++++--------------- .../code/akka/docs/camel/Introduction.scala | 44 +++++++++++++++++ 2 files changed, 52 insertions(+), 39 deletions(-) create mode 100644 akka-docs/scala/code/akka/docs/camel/Introduction.scala diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst index a173f542c0..a728ab186e 100644 --- a/akka-docs/scala/camel.rst +++ b/akka-docs/scala/camel.rst @@ -46,24 +46,7 @@ APIs. The `camel-extra`_ project provides further components. Usage of Camel's integration components in Akka is essentially a one-liner. Here's an example. -.. code-block:: scala - - import akka.actor.Actor - import akka.actor.Actor._ - import akka.camel.{CamelMessage, Consumer} - - class MyActor extends Consumer { - def endpointUri = "mina:tcp://localhost:6200?textline=true" - - def receive = { - case msg: CamelMessage => { /* ... */} - case _ => { /* ... */} - } - } - - // start and expose actor via tcp - val sys = ActorSystem("camel") - val myActor = sys.actorOf(Props[MyActor]) +.. includecode:: code/akka/docs/camel/Introduction.scala#Consumer-mina The above example exposes an actor over a tcp endpoint on port 6200 via Apache Camel's `Mina component`_. The actor implements the endpointUri method to define @@ -75,28 +58,12 @@ component`_), only the actor's endpointUri method must be changed. .. _Mina component: http://camel.apache.org/mina.html .. _Jetty component: http://camel.apache.org/jetty.html -.. code-block:: scala - - class MyActor extends Consumer { - def endpointUri = "jetty:http://localhost:8877/example" - - def receive = { - case msg: CamelMessage => { /* ... */} - case _ => { /* ... */} - } - } +.. includecode:: code/akka/docs/camel/Introduction.scala#Consumer Actors can also trigger message exchanges with external systems i.e. produce to Camel endpoints. -.. code-block:: scala - - import akka.actor.Actor - import akka.camel.{Producer, Oneway} - - class MyActor extends Actor with Producer with Oneway { - def endpointUri = "jms:queue:example" - } +.. includecode:: code/akka/docs/camel/Introduction.scala#Producer In the above example, any message sent to this actor will be added (produced) to the example JMS queue. Producer actors may choose from the same set of Camel @@ -110,6 +77,8 @@ Camel components bind protocol-specific message formats to a Camel-specific `normalized message format`__. The normalized message format hides protocol-specific details from Akka and makes it therefore very easy to support a large number of protocols through a uniform Camel component interface. The -akka-camel module further converts mutable Camel messages into `immutable -representations`__ which are used by Consumer and Producer actors for pattern -matching, transformation, serialization or storage. \ No newline at end of file +akka-camel module further converts mutable Camel messages into immutable +representations which are used by Consumer and Producer actors for pattern +matching, transformation, serialization or storage. + +__ https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Message.java diff --git a/akka-docs/scala/code/akka/docs/camel/Introduction.scala b/akka-docs/scala/code/akka/docs/camel/Introduction.scala new file mode 100644 index 0000000000..12a29ef72c --- /dev/null +++ b/akka-docs/scala/code/akka/docs/camel/Introduction.scala @@ -0,0 +1,44 @@ +package akka.docs.camel + +import akka.actor._ +import akka.camel._ + +//#Consumer-mina +import akka.actor.Actor +import akka.actor.Actor._ +import akka.camel.{CamelMessage, Consumer} + +class MyActor extends Consumer { + def endpointUri = "mina:tcp://localhost:6200?textline=true" + + def receive = { + case msg: CamelMessage => { /* ... */} + case _ => { /* ... */} + } +} + +// start and expose actor via tcp +val sys = ActorSystem("camel") +val myActor = sys.actorOf(Props[MyActor]) +//#Consumer-mina + + +//#Consumer +class MyActor extends Consumer { + def endpointUri = "jetty:http://localhost:8877/example" + + def receive = { + case msg: CamelMessage => { /* ... */} + case _ => { /* ... */} + } +} +//#Consumer + +//#Producer +import akka.actor.Actor +import akka.camel.{Producer, Oneway} + +class MyActor extends Actor with Producer with Oneway { + def endpointUri = "jms:queue:example" +} +//#Producer \ No newline at end of file From c0ca516dd45ac8fa91bc006bf0738f5a7718c1c5 Mon Sep 17 00:00:00 2001 From: Amir Moulavi Date: Mon, 23 Apr 2012 10:28:19 +0200 Subject: [PATCH 15/36] Akka prompt is added to SBT --- project/AkkaBuild.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 9e1c895966..13314f1324 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -340,7 +340,9 @@ object AkkaBuild extends Build { override lazy val settings = super.settings ++ buildSettings ++ Seq( resolvers += "Sonatype Snapshot Repo" at "https://oss.sonatype.org/content/repositories/snapshots/", resolvers += "Twitter Public Repo" at "http://maven.twttr.com" // This will be going away with com.mongodb.async's next release - ) + ) :+ { + shellPrompt := { s => Project.extract(s).currentProject.id + " > " } + } lazy val baseSettings = Defaults.defaultSettings ++ Publish.settings From d93143042bdbeb879a3e2725937ec791ef0de175 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 Apr 2012 10:45:59 +0200 Subject: [PATCH 16/36] Adding TypedActor.context for the lifecycle methods --- .../scala/akka/actor/TypedActorSpec.scala | 21 ++++++++---- .../main/scala/akka/actor/TypedActor.scala | 32 ++++++++++++------- 2 files changed, 34 insertions(+), 19 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 26f510d08a..9440c18fc3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -19,6 +19,7 @@ import akka.dispatch.{ Await, Dispatchers, Future, Promise } import akka.pattern.ask import akka.serialization.JavaSerializer import akka.actor.TypedActor._ +import java.lang.IllegalStateException object TypedActorSpec { @@ -162,20 +163,26 @@ object TypedActorSpec { class LifeCyclesImpl(val latch: CountDownLatch) extends PreStart with PostStop with PreRestart with PostRestart with LifeCycles with Receiver { + private def ensureContextAvailable[T](f: ⇒ T): T = TypedActor.context match { + case null ⇒ throw new IllegalStateException("TypedActor.context is null!") + case some ⇒ f + } + override def crash(): Unit = throw new IllegalStateException("Crash!") - override def preStart(): Unit = latch.countDown() + override def preStart(): Unit = ensureContextAvailable(latch.countDown()) - override def postStop(): Unit = for (i ← 1 to 3) latch.countDown() + override def postStop(): Unit = ensureContextAvailable(for (i ← 1 to 3) latch.countDown()) - override def preRestart(reason: Throwable, message: Option[Any]): Unit = for (i ← 1 to 5) latch.countDown() + override def preRestart(reason: Throwable, message: Option[Any]): Unit = ensureContextAvailable(for (i ← 1 to 5) latch.countDown()) - override def postRestart(reason: Throwable): Unit = for (i ← 1 to 7) latch.countDown() + override def postRestart(reason: Throwable): Unit = ensureContextAvailable(for (i ← 1 to 7) latch.countDown()) override def onReceive(msg: Any, sender: ActorRef): Unit = { - msg match { - case "pigdog" ⇒ sender ! "dogpig" - } + ensureContextAvailable( + msg match { + case "pigdog" ⇒ sender ! "dogpig" + }) } } } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 319bd10a50..f775042566 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -227,15 +227,19 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi case _ ⇒ super.supervisorStrategy } - override def preStart(): Unit = me match { - case l: PreStart ⇒ l.preStart() - case _ ⇒ super.preStart() + override def preStart(): Unit = withContext { + me match { + case l: PreStart ⇒ l.preStart() + case _ ⇒ super.preStart() + } } override def postStop(): Unit = try { - me match { - case l: PostStop ⇒ l.postStop() - case _ ⇒ super.postStop() + withContext { + me match { + case l: PostStop ⇒ l.postStop() + case _ ⇒ super.postStop() + } } } finally { TypedActor(context.system).invocationHandlerFor(proxyVar.get) match { @@ -246,14 +250,18 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi } } - override def preRestart(reason: Throwable, message: Option[Any]): Unit = me match { - case l: PreRestart ⇒ l.preRestart(reason, message) - case _ ⇒ super.preRestart(reason, message) + override def preRestart(reason: Throwable, message: Option[Any]): Unit = withContext { + me match { + case l: PreRestart ⇒ l.preRestart(reason, message) + case _ ⇒ super.preRestart(reason, message) + } } - override def postRestart(reason: Throwable): Unit = me match { - case l: PostRestart ⇒ l.postRestart(reason) - case _ ⇒ super.postRestart(reason) + override def postRestart(reason: Throwable): Unit = withContext { + me match { + case l: PostRestart ⇒ l.postRestart(reason) + case _ ⇒ super.postRestart(reason) + } } protected def withContext[T](unitOfWork: ⇒ T): T = { From 6592311c37c1d886401efd39511a25daa0836abc Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 Apr 2012 11:14:03 +0200 Subject: [PATCH 17/36] Making sure that null-messages don't mess up SLF4J handler --- .../akka/event/slf4j/Slf4jEventHandler.scala | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jEventHandler.scala b/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jEventHandler.scala index 72593d4f76..966f57b938 100644 --- a/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jEventHandler.scala +++ b/akka-slf4j/src/main/scala/akka/event/slf4j/Slf4jEventHandler.scala @@ -43,25 +43,19 @@ class Slf4jEventHandler extends Actor with SLF4JLogging { case event @ Error(cause, logSource, logClass, message) ⇒ withMdc(logSource, event.thread.getName) { cause match { - case Error.NoCause ⇒ Logger(logClass, logSource).error(message.toString) - case _ ⇒ Logger(logClass, logSource).error(message.toString, cause) + case Error.NoCause | null ⇒ Logger(logClass, logSource).error(if (message != null) message.toString else null) + case cause ⇒ Logger(logClass, logSource).error(if (message != null) message.toString else cause.getLocalizedMessage, cause) } } case event @ Warning(logSource, logClass, message) ⇒ - withMdc(logSource, event.thread.getName) { - Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef]) - } + withMdc(logSource, event.thread.getName) { Logger(logClass, logSource).warn("{}", message.asInstanceOf[AnyRef]) } case event @ Info(logSource, logClass, message) ⇒ - withMdc(logSource, event.thread.getName) { - Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef]) - } + withMdc(logSource, event.thread.getName) { Logger(logClass, logSource).info("{}", message.asInstanceOf[AnyRef]) } case event @ Debug(logSource, logClass, message) ⇒ - withMdc(logSource, event.thread.getName) { - Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef]) - } + withMdc(logSource, event.thread.getName) { Logger(logClass, logSource).debug("{}", message.asInstanceOf[AnyRef]) } case InitializeLogger(_) ⇒ log.info("Slf4jEventHandler started") From f247749e8f28593da6987531951dc66221ea9cd3 Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Mon, 23 Apr 2012 10:18:03 +0100 Subject: [PATCH 18/36] removed info about typed actors --- akka-docs/scala/camel.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst index a728ab186e..5475eb0a27 100644 --- a/akka-docs/scala/camel.rst +++ b/akka-docs/scala/camel.rst @@ -24,7 +24,7 @@ Other, more advanced external articles (for version 1) are: Introduction ============ -The akka-camel module allows actors, untyped actors, and typed actors to receive +The akka-camel module allows actors to receive and send messages over a great variety of protocols and APIs. This section gives a brief overview of the general ideas behind the akka-camel module, the remaining sections go into the details. In addition to the native Scala and Java From ec0f04a741b8eba9d465ce9435dce3e48ebbb0e4 Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Mon, 23 Apr 2012 10:19:31 +0100 Subject: [PATCH 19/36] routeResponse returns Unit --- akka-camel/src/main/scala/akka/camel/Producer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index 4fd879355c..34062ef536 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -120,7 +120,7 @@ trait ProducerSupport { this: Actor ⇒ * actor). */ - protected def routeResponse(msg: Any): Any = { + protected def routeResponse(msg: Any) { if (!oneway) sender ! transformResponse(msg) } From 6286e493a6c7c8662a7cc75675820475602f0d2f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 Apr 2012 11:39:52 +0200 Subject: [PATCH 20/36] Fixing signature of routeResponse to Unit instead of Any --- akka-camel/src/main/scala/akka/camel/Producer.scala | 4 +--- .../main/scala/akka/camel/javaapi/UntypedProducerActor.scala | 2 +- .../src/test/scala/akka/camel/ProducerFeatureTest.scala | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/akka-camel/src/main/scala/akka/camel/Producer.scala b/akka-camel/src/main/scala/akka/camel/Producer.scala index 34062ef536..d42b78911a 100644 --- a/akka-camel/src/main/scala/akka/camel/Producer.scala +++ b/akka-camel/src/main/scala/akka/camel/Producer.scala @@ -120,9 +120,7 @@ trait ProducerSupport { this: Actor ⇒ * actor). */ - protected def routeResponse(msg: Any) { - if (!oneway) sender ! transformResponse(msg) - } + protected def routeResponse(msg: Any): Unit = if (!oneway) sender ! transformResponse(msg) } diff --git a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala index d87d005c43..c4d0a9c1a0 100644 --- a/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala +++ b/akka-camel/src/main/scala/akka/camel/javaapi/UntypedProducerActor.scala @@ -38,7 +38,7 @@ abstract class UntypedProducerActor extends UntypedActor with ProducerSupport { final override def transformOutgoingMessage(msg: Any): AnyRef = onTransformOutgoingMessage(msg.asInstanceOf[AnyRef]) final override def transformResponse(msg: Any): AnyRef = onTransformResponse(msg.asInstanceOf[AnyRef]) - final override def routeResponse(msg: Any): Any = onRouteResponse(msg.asInstanceOf[AnyRef]) + final override def routeResponse(msg: Any): Unit = onRouteResponse(msg.asInstanceOf[AnyRef]) final override def endpointUri = getEndpointUri diff --git a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala index 4db2af577b..44ce2540c8 100644 --- a/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ProducerFeatureTest.scala @@ -277,7 +277,7 @@ object ProducerFeatureTest { class TestForwarder(uri: String, target: ActorRef) extends Actor with Producer { def endpointUri = uri - override protected def routeResponse(msg: Any): Any = { target forward msg } + override protected def routeResponse(msg: Any): Unit = target forward msg } class TestResponder extends Actor { From 4204b317597dd6fe7790887c69d0b1f253041663 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 Apr 2012 11:57:13 +0200 Subject: [PATCH 21/36] Adding Amir's cool prompt to sbt --- project/AkkaBuild.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 9e1c895966..19248d3a79 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -339,7 +339,8 @@ object AkkaBuild extends Build { override lazy val settings = super.settings ++ buildSettings ++ Seq( resolvers += "Sonatype Snapshot Repo" at "https://oss.sonatype.org/content/repositories/snapshots/", - resolvers += "Twitter Public Repo" at "http://maven.twttr.com" // This will be going away with com.mongodb.async's next release + resolvers += "Twitter Public Repo" at "http://maven.twttr.com", // This will be going away with com.mongodb.async's next release + shellPrompt := { s => Project.extract(s).currentProject.id + " > " } ) lazy val baseSettings = Defaults.defaultSettings ++ Publish.settings From 0c4b2a11ae6cd2edf7dfbd3bacf51abcd37fa5d1 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 23 Apr 2012 12:25:33 +0200 Subject: [PATCH 22/36] document blocking nature of system.actorOf, see #2007 --- akka-docs/general/actor-systems.rst | 6 ++++++ akka-docs/java/untyped-actors.rst | 7 +++++++ akka-docs/scala/actors.rst | 7 +++++++ 3 files changed, 20 insertions(+) diff --git a/akka-docs/general/actor-systems.rst b/akka-docs/general/actor-systems.rst index d3113d85be..ef922293a8 100644 --- a/akka-docs/general/actor-systems.rst +++ b/akka-docs/general/actor-systems.rst @@ -101,6 +101,12 @@ Actor Best Practices breaks all the properties which make programming in actors such a nice experience. +#. Top-level actors are the innermost part of your Error Kernel, so create them + sparingly and prefer truly hierarchical systems. This has benefits wrt. + fault-handling (both considering the granularity of configuration and the + performance) and it also reduces the number of blocking calls made, since + the creation of top-level actors involves synchronous messaging. + What you should not concern yourself with ----------------------------------------- diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index 4ba40a8f1f..89553f091a 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -82,6 +82,13 @@ that is used in log messages and for identifying actors. The name must not be em or start with ``$``. If the given name is already in use by another child to the same parent actor an `InvalidActorNameException` is thrown. +.. warning:: + + Creating top-level actors with ``system.actorOf`` is a blocking operation, + hence it may dead-lock due to starvation if the default dispatcher is + overloaded. To avoid problems, do not call this method from within actors or + futures which run on the default dispatcher. + Actors are automatically started asynchronously when created. When you create the ``UntypedActor`` then it will automatically call the ``preStart`` callback method on the ``UntypedActor`` class. This is an excellent place to diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 6c819facda..0c98960db1 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -76,6 +76,13 @@ that is used in log messages and for identifying actors. The name must not be em or start with ``$``. If the given name is already in use by another child to the same parent actor an `InvalidActorNameException` is thrown. +.. warning:: + + Creating top-level actors with ``system.actorOf`` is a blocking operation, + hence it may dead-lock due to starvation if the default dispatcher is + overloaded. To avoid problems, do not call this method from within actors or + futures which run on the default dispatcher. + Actors are automatically started asynchronously when created. When you create the ``Actor`` then it will automatically call the ``preStart`` callback method on the ``Actor`` trait. This is an excellent place to From 8df2314bb3944e1702812f4082ac2b642619f668 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 Apr 2012 14:30:28 +0200 Subject: [PATCH 23/36] #1946 - Adding clarification in docs about the HashedWheelTimer semantics --- akka-actor/src/main/resources/reference.conf | 2 +- akka-docs/java/scheduler.rst | 7 +++++++ akka-docs/scala/scheduler.rst | 7 +++++++ 3 files changed, 15 insertions(+), 1 deletion(-) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 6e747f8121..3f38cfeca0 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -295,7 +295,7 @@ akka { # Used to set the behavior of the scheduler. # Changing the default values may change the system behavior drastically so make sure - # you know what you're doing! + # you know what you're doing! See the Scheduler section of the Akka documentation for more details. scheduler { # The HashedWheelTimer (HWT) implementation from Netty is used as the default scheduler # in the system. diff --git a/akka-docs/java/scheduler.rst b/akka-docs/java/scheduler.rst index 3dde1345a6..faff8d9fe0 100644 --- a/akka-docs/java/scheduler.rst +++ b/akka-docs/java/scheduler.rst @@ -15,6 +15,13 @@ You can schedule sending of messages to actors and execution of tasks (functions You will get a ``Cancellable`` back that you can call :meth:`cancel` on to cancel the execution of the scheduled operation. +.. warning:: + + The default implementation of ``Scheduler`` used by Akka is based on the Netty ``HashedWheelTimer``. + It does not execute tasks at the exact time, but on every tick, it will run everything that is overdue. + The accuracy of the default Scheduler can be modified by the "ticks-per-wheel" and "tick-duration" configuration + properties. For more information, see: `HashedWheelTimers `_. + Some examples ------------- diff --git a/akka-docs/scala/scheduler.rst b/akka-docs/scala/scheduler.rst index 6089630625..a98f0f563c 100644 --- a/akka-docs/scala/scheduler.rst +++ b/akka-docs/scala/scheduler.rst @@ -15,6 +15,13 @@ You can schedule sending of messages to actors and execution of tasks (functions You will get a ``Cancellable`` back that you can call :meth:`cancel` on to cancel the execution of the scheduled operation. +.. warning:: + + The default implementation of ``Scheduler`` used by Akka is based on the Netty ``HashedWheelTimer``. + It does not execute tasks at the exact time, but on every tick, it will run everything that is overdue. + The accuracy of the default Scheduler can be modified by the "ticks-per-wheel" and "tick-duration" configuration + properties. For more information, see: `HashedWheelTimers `_. + Some examples ------------- From b5960253a22eb79a537f076d79ad48c2cb249d90 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 Apr 2012 14:50:26 +0200 Subject: [PATCH 24/36] #1990 - Adding Java and Scala version of the microkernel docs --- akka-docs/java/index.rst | 1 + akka-docs/java/microkernel.rst | 67 +++++++++++++++++++ akka-docs/modules/index.rst | 1 - akka-docs/scala/index.rst | 1 + akka-docs/{modules => scala}/microkernel.rst | 2 +- .../sample/kernel/hello/java/HelloKernel.java | 43 ++++++++++++ 6 files changed, 113 insertions(+), 2 deletions(-) create mode 100644 akka-docs/java/microkernel.rst rename akka-docs/{modules => scala}/microkernel.rst (99%) create mode 100644 akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java diff --git a/akka-docs/java/index.rst b/akka-docs/java/index.rst index 4b7dcb5ebf..981e07f869 100644 --- a/akka-docs/java/index.rst +++ b/akka-docs/java/index.rst @@ -23,3 +23,4 @@ Java API fsm extending-akka zeromq + microkernel diff --git a/akka-docs/java/microkernel.rst b/akka-docs/java/microkernel.rst new file mode 100644 index 0000000000..7416838537 --- /dev/null +++ b/akka-docs/java/microkernel.rst @@ -0,0 +1,67 @@ + +.. _microkernel: + +############# + Microkernel (Java) +############# + +The Akka Microkernel is included in the Akka download found at `downloads`_. + +.. _downloads: http://akka.io/downloads + +To run an application with the microkernel you need to create a Bootable class +that handles the startup and shutdown the application. An example is included below. + +Put your application jar in the ``deploy`` directory to have it automatically +loaded. + +To start the kernel use the scripts in the ``bin`` directory, passing the boot +classes for your application. + +There is a simple example of an application setup for running with the +microkernel included in the akka download. This can be run with the following +command (on a unix-based system): + +.. code-block:: none + + bin/akka sample.kernel.hello.HelloKernel + +Use ``Ctrl-C`` to interrupt and exit the microkernel. + +On a Windows machine you can also use the bin/akka.bat script. + +The code for the Hello Kernel example (see the ``HelloKernel`` class for an example +of creating a Bootable): + +.. includecode:: ../../akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java + + +Distribution of microkernel application +--------------------------------------- + +To make a distribution package of the microkernel and your application the ``akka-sbt-plugin`` provides +``AkkaKernelPlugin``. It creates the directory structure, with jar files, configuration files and +start scripts. + +To use the sbt plugin you define it in your ``project/plugins.sbt``: + +.. includecode:: ../../akka-sbt-plugin/sample/project/plugins.sbt + +Then you add it to the settings of your ``project/Build.scala``. It is also important that you add the ``akka-kernel`` dependency. +This is an example of a complete sbt build file: + +.. includecode:: ../../akka-sbt-plugin/sample/project/Build.scala + +Run the plugin with sbt:: + + > dist + > dist:clean + +There are several settings that can be defined: + +* ``outputDirectory`` - destination directory of the package, default ``target/dist`` +* ``distJvmOptions`` - JVM parameters to be used in the start script +* ``configSourceDirs`` - Configuration files are copied from these directories, default ``src/config``, ``src/main/config``, ``src/main/resources`` +* ``distMainClass`` - Kernel main class to use in start script +* ``libFilter`` - Filter of dependency jar files +* ``additionalLibs`` - Additional dependency jar files diff --git a/akka-docs/modules/index.rst b/akka-docs/modules/index.rst index 603eeb2084..1abb0b7a0e 100644 --- a/akka-docs/modules/index.rst +++ b/akka-docs/modules/index.rst @@ -6,6 +6,5 @@ Modules durable-mailbox http - microkernel camel spring diff --git a/akka-docs/scala/index.rst b/akka-docs/scala/index.rst index 46a84fe064..0019271e2d 100644 --- a/akka-docs/scala/index.rst +++ b/akka-docs/scala/index.rst @@ -26,3 +26,4 @@ Scala API testing extending-akka zeromq + microkernel diff --git a/akka-docs/modules/microkernel.rst b/akka-docs/scala/microkernel.rst similarity index 99% rename from akka-docs/modules/microkernel.rst rename to akka-docs/scala/microkernel.rst index 7600e1ebd2..236149964c 100644 --- a/akka-docs/modules/microkernel.rst +++ b/akka-docs/scala/microkernel.rst @@ -2,7 +2,7 @@ .. _microkernel: ############# - Microkernel + Microkernel (Scala) ############# The Akka Microkernel is included in the Akka download found at `downloads`_. diff --git a/akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java b/akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java new file mode 100644 index 0000000000..d0ccc4ad79 --- /dev/null +++ b/akka-samples/akka-sample-hello-kernel/src/main/java/sample/kernel/hello/java/HelloKernel.java @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package sample.kernel.hello.java; + +import akka.actor.ActorRef; +import akka.actor.UntypedActor; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.kernel.Bootable; + +public class HelloKernel implements Bootable { + final ActorSystem system = ActorSystem.create("hellokernel"); + + static class HelloActor extends UntypedActor { + final ActorRef worldActor = + getContext().actorOf(new Props(WorldActor.class)); + + public void onReceive(Object message) { + if (message == "start") + worldActor.tell("Hello"); + else if (message instanceof String) + System.out.println("Received message '%s'".format((String)message)); + else unhandled(message); + } +} + +static class WorldActor extends UntypedActor { + public void onReceive(Object message) { + if (message instanceof String) + getSender().tell(((String)message).toUpperCase() + " world!"); + else unhandled(message); + } +} + + public void startup() { + system.actorOf(new Props(HelloActor.class)).tell("start"); + } + + public void shutdown() { + system.shutdown(); + } +} From 84c08e8a63afd2a539d9a0e5d884e0b1104f9713 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 23 Apr 2012 15:40:21 +0200 Subject: [PATCH 25/36] add option to log UnhandledMessage, see #1999 - LoggingBus.startDefaultLoggers will register a forwarder actor for UnhandledMessage, which republishes them as Debug message - add tests and docs - TestEventListener logs UnhandledMessage as Warning --- .../scala/akka/event/EventStreamSpec.scala | 36 ++++++++++++--- akka-actor/src/main/resources/reference.conf | 8 ++++ .../main/scala/akka/actor/ActorSystem.scala | 3 ++ .../src/main/scala/akka/event/Logging.scala | 44 ++++++++++++------- akka-docs/scala/actors.rst | 4 +- .../akka/testkit/TestEventListener.scala | 7 ++- .../test/scala/akka/testkit/AkkaSpec.scala | 12 +++++ 7 files changed, 91 insertions(+), 23 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index add8173085..d2497c4a69 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -5,10 +5,11 @@ package akka.event import akka.testkit.AkkaSpec import akka.util.duration._ -import akka.actor.{ Actor, ActorRef, ActorSystemImpl } +import akka.actor.{ Actor, ActorRef, ActorSystemImpl, ActorSystem, Props, UnhandledMessage } import com.typesafe.config.ConfigFactory import scala.collection.JavaConverters._ -import akka.actor.ActorSystem +import akka.event.Logging.InitializeLogger +import akka.pattern.gracefulStop object EventStreamSpec { @@ -20,6 +21,14 @@ object EventStreamSpec { } """.format(Logging.StandardOutLoggerName)) + val configUnhandled = ConfigFactory.parseString(""" + akka { + stdout-loglevel = WARNING + loglevel = DEBUG + actor.debug.unhandled = on + } + """) + case class M(i: Int) case class SetTarget(ref: ActorRef) @@ -27,9 +36,13 @@ object EventStreamSpec { class MyLog extends Actor { var dst: ActorRef = context.system.deadLetters def receive = { - case Logging.InitializeLogger(bus) ⇒ bus.subscribe(context.self, classOf[SetTarget]); sender ! Logging.LoggerInitialized - case SetTarget(ref) ⇒ dst = ref; dst ! "OK" - case e: Logging.LogEvent ⇒ dst ! e + case Logging.InitializeLogger(bus) ⇒ + bus.subscribe(context.self, classOf[SetTarget]) + bus.subscribe(context.self, classOf[UnhandledMessage]) + sender ! Logging.LoggerInitialized + case SetTarget(ref) ⇒ dst = ref; dst ! "OK" + case e: Logging.LogEvent ⇒ dst ! e + case u: UnhandledMessage ⇒ dst ! u } } @@ -61,6 +74,19 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { } } + "be able to log unhandled messages" in { + val sys = ActorSystem("EventStreamSpecUnhandled", configUnhandled) + try { + sys.eventStream.subscribe(testActor, classOf[AnyRef]) + val m = UnhandledMessage(42, sys.deadLetters, sys.deadLetters) + sys.eventStream.publish(m) + expectMsgAllOf(m, Logging.Debug(sys.deadLetters.path.toString, sys.deadLetters.getClass, "unhandled message from " + sys.deadLetters + ": 42")) + sys.eventStream.unsubscribe(testActor) + } finally { + sys.shutdown() + } + } + "manage log levels" in { val bus = new EventStream(false) bus.startDefaultLoggers(impl) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 6e747f8121..4417fa450d 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -14,6 +14,11 @@ akka { # Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT) event-handlers = ["akka.event.Logging$DefaultLogger"] + + # Event handlers are created and registered synchronously during ActorSystem + # start-up, and since they are actors, this timeout is used to bound the + # waiting time + event-handler-startup-timeout = 5s # Log level used by the configured loggers (see "event-handlers") as soon # as they have been started; before that, see "stdout-loglevel" @@ -275,6 +280,9 @@ akka { # enable DEBUG logging of subscription changes on the eventStream event-stream = off + + # enable DEBUG logging of unhandled messages + unhandled = off } # Entries for pluggable serializers and their bindings. diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 4112905711..3520093c7e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -125,12 +125,15 @@ object ActorSystem { final val LogLevel = getString("akka.loglevel") final val StdoutLogLevel = getString("akka.stdout-loglevel") final val EventHandlers: Seq[String] = getStringList("akka.event-handlers").asScala + final val EventHandlerStartTimeout = Timeout(Duration(getMilliseconds("akka.event-handler-startup-timeout"), MILLISECONDS)) final val LogConfigOnStart = config.getBoolean("akka.log-config-on-start") + final val AddLoggingReceive = getBoolean("akka.actor.debug.receive") final val DebugAutoReceive = getBoolean("akka.actor.debug.autoreceive") final val DebugLifecycle = getBoolean("akka.actor.debug.lifecycle") final val FsmDebugEvent = getBoolean("akka.actor.debug.fsm") final val DebugEventStream = getBoolean("akka.actor.debug.event-stream") + final val DebugUnhandledMessage = getBoolean("akka.actor.debug.unhandled") final val Home = config.getString("akka.home") match { case "" ⇒ None diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 3383ea2fee..11f2aec201 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -95,26 +95,40 @@ trait LoggingBus extends ActorEventBus { case Nil ⇒ "akka.event.Logging$DefaultLogger" :: Nil case loggers ⇒ loggers } - val myloggers = for { - loggerName ← defaultLoggers - if loggerName != StandardOutLoggerName - } yield { - try { - system.dynamicAccess.getClassFor[Actor](loggerName) match { - case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName) - case Left(exception) ⇒ throw exception + val myloggers = + for { + loggerName ← defaultLoggers + if loggerName != StandardOutLoggerName + } yield { + try { + system.dynamicAccess.getClassFor[Actor](loggerName) match { + case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName) + case Left(exception) ⇒ throw exception + } + } catch { + case e: Exception ⇒ + throw new ConfigurationException( + "Event Handler specified in config can't be loaded [" + loggerName + + "] due to [" + e.toString + "]", e) } - } catch { - case e: Exception ⇒ - throw new ConfigurationException( - "Event Handler specified in config can't be loaded [" + loggerName + - "] due to [" + e.toString + "]", e) } - } guard.withGuard { loggers = myloggers _logLevel = level } + try { + if (system.settings.DebugUnhandledMessage) + subscribe(system.systemActorOf(Props(new Actor { + println("started" + self) + def receive = { + case UnhandledMessage(msg, sender, rcp) ⇒ + println("got it") + publish(Debug(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg)) + } + }), "UnhandledMessageForwarder"), classOf[UnhandledMessage]) + } catch { + case _: InvalidActorNameException ⇒ // ignore if it is already running + } publish(Debug(logName, this.getClass, "Default Loggers started")) if (!(defaultLoggers contains StandardOutLoggerName)) { unsubscribe(StandardOutLogger) @@ -153,7 +167,7 @@ trait LoggingBus extends ActorEventBus { private def addLogger(system: ActorSystemImpl, clazz: Class[_ <: Actor], level: LogLevel, logName: String): ActorRef = { val name = "log" + Extension(system).id() + "-" + simpleName(clazz) val actor = system.systemActorOf(Props(clazz), name) - implicit val timeout = Timeout(5 seconds) + implicit def timeout = system.settings.EventHandlerStartTimeout import akka.pattern.ask val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch { case _: TimeoutException ⇒ diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 0c98960db1..fae84c080f 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -150,7 +150,9 @@ The :class:`Actor` trait defines only one abstract method, the above mentioned If the current actor behavior does not match a received message, :meth:`unhandled` is called, which by default publishes an ``akka.actor.UnhandledMessage(message, sender, recipient)`` on the actor -system’s event stream. +system’s event stream (set configuration item +``akka.event-handler-startup-timeout`` to ``true`` to have them converted into +actual Debug messages) In addition, it offers: diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 56ced17370..97fe6e99aa 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -4,7 +4,7 @@ package akka.testkit import scala.util.matching.Regex -import akka.actor.{ DeadLetter, ActorSystem, Terminated } +import akka.actor.{ DeadLetter, ActorSystem, Terminated, UnhandledMessage } import akka.dispatch.{ SystemMessage, Terminate } import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug, LoggerInitialized } import akka.event.Logging @@ -447,7 +447,7 @@ class TestEventListener extends Logging.DefaultLogger { override def receive = { case InitializeLogger(bus) ⇒ - Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter]) foreach (bus.subscribe(context.self, _)) + Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter], classOf[UnhandledMessage]) foreach (bus.subscribe(context.self, _)) sender ! LoggerInitialized case Mute(filters) ⇒ filters foreach addFilter case UnMute(filters) ⇒ filters foreach removeFilter @@ -462,6 +462,9 @@ class TestEventListener extends Logging.DefaultLogger { val event = Warning(rcp.path.toString, rcp.getClass, "received dead letter from " + snd + ": " + msg) if (!filter(event)) print(event) } + case UnhandledMessage(msg, sender, rcp) ⇒ + val event = Warning(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg) + if (!filter(event)) print(event) case m ⇒ print(Debug(context.system.name, this.getClass, m)) } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 3a0f02c79a..fd763e6bad 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -92,6 +92,18 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { "An AkkaSpec" must { + "warn about unhandled messages" in { + implicit val system = ActorSystem("AkkaSpec0", AkkaSpec.testConf) + try { + val a = system.actorOf(Props.empty) + EventFilter.warning(start = "unhandled message", occurrences = 1) intercept { + a ! 42 + } + } finally { + system.shutdown() + } + } + "terminate all actors" in { // verbose config just for demonstration purposes, please leave in in case of debugging import scala.collection.JavaConverters._ From 8ac7ac849985ec297316bd64b65b86b586c1ea55 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 Apr 2012 16:19:58 +0200 Subject: [PATCH 26/36] Adding docs section on how to proxy ActorRefs with TypedActors --- .../akka/docs/actor/TypedActorDocTestBase.java | 16 ++++++++++++++++ akka-docs/java/typed-actors.rst | 7 +++++++ .../code/akka/docs/actor/TypedActorDocSpec.scala | 11 +++++++++++ akka-docs/scala/typed-actors.rst | 7 +++++++ 4 files changed, 41 insertions(+) diff --git a/akka-docs/java/code/akka/docs/actor/TypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/TypedActorDocTestBase.java index 72f950e2e7..30db92ee0f 100644 --- a/akka-docs/java/code/akka/docs/actor/TypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/TypedActorDocTestBase.java @@ -150,4 +150,20 @@ public class TypedActorDocTestBase { //Ignore } } + + @Test public void proxyAnyActorRef() { + try { + //#typed-actor-remote + Squarer typedActor = + TypedActor.get(system). + typedActorOf( + new TypedProps(Squarer.class), + system.actorFor("akka://SomeSystem@somehost:2552/user/some/foobar") + ); + //Use "typedActor" as a FooBar + //#typed-actor-remote + } catch (Exception e) { + //dun care + } + } } diff --git a/akka-docs/java/typed-actors.rst b/akka-docs/java/typed-actors.rst index c1de57c396..b2d7a9bfae 100644 --- a/akka-docs/java/typed-actors.rst +++ b/akka-docs/java/typed-actors.rst @@ -198,3 +198,10 @@ Proxying You can use the ``typedActorOf`` that takes a TypedProps and an ActorRef to proxy the given ActorRef as a TypedActor. This is usable if you want to communicate remotely with TypedActors on other machines, just look them up with ``actorFor`` and pass the ``ActorRef`` to ``typedActorOf``. + +Lookup & Remoting +----------------- + +Since ``TypedActors`` are backed by ``Akka Actors``, you can use ``actorFor`` together with ``typedActorOf`` to proxy ``ActorRefs`` potentially residing on remote nodes. + +.. includecode:: code/akka/docs/actor/TypedActorDocTestBase.java#typed-actor-remote \ No newline at end of file diff --git a/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala index b1fcc9224c..f7c5fa9bf7 100644 --- a/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala @@ -140,6 +140,17 @@ class TypedActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { //#typed-actor-poisonpill } + "proxy any ActorRef" in { + //#typed-actor-remote + val typedActor: Foo with Bar = + TypedActor(system). + typedActorOf( + TypedProps[FooBar], + system.actorFor("akka://SomeSystem@somehost:2552/user/some/foobar")) + //Use "typedActor" as a FooBar + //#typed-actor-remote + } + "supercharge" in { //#typed-actor-supercharge-usage val awesomeFooBar: Foo with Bar = TypedActor(system).typedActorOf(TypedProps[FooBar]()) diff --git a/akka-docs/scala/typed-actors.rst b/akka-docs/scala/typed-actors.rst index 8db250fec1..fc570e60a7 100644 --- a/akka-docs/scala/typed-actors.rst +++ b/akka-docs/scala/typed-actors.rst @@ -203,6 +203,13 @@ This is usable if you want to communicate remotely with TypedActors on other mac The ActorRef needs to accept ``MethodCall`` messages. +Lookup & Remoting +----------------- + +Since ``TypedActors`` are backed by ``Akka Actors``, you can use ``actorFor`` together with ``typedActorOf`` to proxy ``ActorRefs`` potentially residing on remote nodes. + +.. includecode:: code/akka/docs/actor/TypedActorDocSpec.scala#typed-actor-remote + Supercharging ------------- From 7cc4c03018addabe046b05754a83dddb7d8d1a25 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 Apr 2012 16:38:22 +0200 Subject: [PATCH 27/36] #1962 - Clarifying the error logging when dropping inbound messages and lazy-fying the logger --- akka-remote/src/main/scala/akka/remote/RemoteTransport.scala | 4 ++-- .../src/main/scala/akka/remote/netty/NettyRemoteSupport.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 77bc8320c0..703803163a 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -288,9 +288,9 @@ trait RemoteMarshallingOps { case AddressFromURIString(address) if address == provider.transport.address ⇒ // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed) r.!(remoteMessage.payload)(remoteMessage.sender) - case r ⇒ log.error("dropping message {} for non-local recipient {} at {} local is {}", remoteMessage.payload, r, address, provider.transport.address) + case r ⇒ log.error("dropping message {} for non-local recipient {} arriving at {} inbound address is {}", remoteMessage.payload, r, address, provider.transport.address) } - case r ⇒ log.error("dropping message {} for non-local recipient {} of type {}", remoteMessage.payload, r, if (r ne null) r.getClass else "null") + case r ⇒ log.error("dropping message {} for non-local recipient {} arriving at {} inbound address is {}", remoteMessage.payload, r, address, provider.transport.address) } } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 42b319e4e5..6062321841 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -66,7 +66,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor def address = _address.get - val log = Logging(system.eventStream, "NettyRemoteTransport(" + address + ")") + lazy val log = Logging(system.eventStream, "NettyRemoteTransport(" + address + ")") def start(): Unit = { server.start() From 5f2b23c0c8bf26287d3bec743bc27de719d72fee Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 Apr 2012 18:59:49 +0200 Subject: [PATCH 28/36] #2005 - Putting the required fields into RemoteTransport and took the opportunity to clean up use of ActorSystemImpl --- .../src/main/scala/akka/actor/ActorSystem.scala | 7 ++++++- .../scala/akka/remote/RemoteActorRefProvider.scala | 3 +-- .../main/scala/akka/remote/RemoteTransport.scala | 13 ++++--------- .../src/main/scala/akka/remote/netty/Client.scala | 2 +- .../akka/remote/netty/NettyRemoteSupport.scala | 7 ++++--- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 4112905711..486dbe3ae5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -17,9 +17,9 @@ import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap import java.io.Closeable import akka.dispatch.Await.Awaitable import akka.dispatch.Await.CanAwait -import java.util.concurrent.{ CountDownLatch, TimeoutException, RejectedExecutionException } import akka.util._ import collection.immutable.Stack +import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } object ActorSystem { @@ -404,6 +404,11 @@ abstract class ExtendedActorSystem extends ActorSystem { */ def deathWatch: DeathWatch + /** + * A ThreadFactory that can be used if the transport needs to create any Threads + */ + def threadFactory: ThreadFactory + /** * ClassLoader wrapper which is used for reflective accesses internally. This is set * to use the context class loader, if one is set, or the class loader which diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5870af9f95..a4d9a8d0c6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -78,8 +78,7 @@ class RemoteActorRefProvider( _transport = { val fqn = remoteSettings.RemoteTransport val args = Seq( - classOf[RemoteSettings] -> remoteSettings, - classOf[ActorSystemImpl] -> system, + classOf[ExtendedActorSystem] -> system, classOf[RemoteActorRefProvider] -> this) system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args) match { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 703803163a..3bade97460 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -5,13 +5,13 @@ package akka.remote import scala.reflect.BeanProperty -import akka.actor.{ Terminated, LocalRef, InternalActorRef, AutoReceivedMessage, AddressFromURIString, Address, ActorSystemImpl, ActorSystem, ActorRef } import akka.dispatch.SystemMessage import akka.event.{ LoggingAdapter, Logging } import akka.AkkaException import akka.serialization.Serialization import akka.remote.RemoteProtocol._ import akka.dispatch.ChildTerminated +import akka.actor._ /** * Remote life-cycle events. @@ -152,7 +152,7 @@ class RemoteTransportException(message: String, cause: Throwable) extends AkkaEx * be available (i.e. fully initialized) by the time the first message is * received or when the start() method returns, whatever happens first. */ -abstract class RemoteTransport { +abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: RemoteActorRefProvider) { /** * Shuts down the remoting */ @@ -163,11 +163,6 @@ abstract class RemoteTransport { */ def address: Address - /** - * The actor system, for which this transport is instantiated. Will publish to its eventStream. - */ - def system: ActorSystem - /** * Start up the transport, i.e. enable incoming connections. */ @@ -197,7 +192,7 @@ abstract class RemoteTransport { override def toString = address.toString } -class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl) { +class RemoteMessage(input: RemoteMessageProtocol, system: ExtendedActorSystem) { def originalReceiver = input.getRecipient.getPath @@ -216,7 +211,7 @@ trait RemoteMarshallingOps { def log: LoggingAdapter - def system: ActorSystemImpl + def system: ExtendedActorSystem def provider: RemoteActorRefProvider diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index a0e91398fc..7baf3011ee 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -58,7 +58,7 @@ abstract class RemoteClient private[akka] ( * Converts the message to the wireprotocol and sends the message across the wire */ def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { - if (netty.remoteSettings.LogSend) log.debug("Sending message {} from {} to {}", message, senderOption, recipient) + if (netty.provider.remoteSettings.LogSend) log.debug("Sending message {} from {} to {}", message, senderOption, recipient) send((message, senderOption, recipient)) } else { val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", netty, remoteAddress) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 6062321841..cf859c3db2 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -16,18 +16,19 @@ import org.jboss.netty.channel.{ ChannelHandlerContext, Channel } import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder } import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor import org.jboss.netty.util.HashedWheelTimer -import akka.actor.{ Address, ActorSystemImpl, ActorRef } import akka.dispatch.MonitorableThreadFactory import akka.event.Logging import akka.remote.RemoteProtocol.AkkaRemoteProtocol import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteSettings, RemoteMarshallingOps, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted } import akka.util.NonFatal +import akka.actor.{ ExtendedActorSystem, Address, ActorRef } /** * Provides the implementation of the Netty remote support */ -class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: ActorSystemImpl, val provider: RemoteActorRefProvider) - extends RemoteTransport with RemoteMarshallingOps { +class NettyRemoteTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) with RemoteMarshallingOps { + + import provider.remoteSettings val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName) From 670bb396f9a03023b94acaec1793ea06f25ae99d Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 Apr 2012 19:07:27 +0200 Subject: [PATCH 29/36] Adding @implicitNotFound annotation on LogSource --- akka-actor/src/main/scala/akka/event/Logging.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 3383ea2fee..1a2cb2c520 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -14,6 +14,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.util.control.NoStackTrace import java.util.concurrent.TimeoutException import akka.dispatch.Await +import annotation.implicitNotFound /** * This trait brings log level handling to the EventStream: it reads the log @@ -211,7 +212,7 @@ trait LoggingBus extends ActorEventBus { * * The default implementation of the second variant will just call the first. */ -trait LogSource[-T] { +@implicitNotFound("Cannot find LogSource for ${T} please see ScalaDoc for LogSource for how to obtain or construct one.") trait LogSource[-T] { def genString(t: T): String def genString(t: T, system: ActorSystem): String = genString(t) def getClazz(t: T): Class[_] = t.getClass From 223f86be8fbf0a357099c206dfd9900f1d3153c2 Mon Sep 17 00:00:00 2001 From: Amir Moulavi Date: Mon, 23 Apr 2012 20:24:54 +0200 Subject: [PATCH 30/36] akka-cluster: vector clocks test - merge two disjoint vector clocks --- .../scala/akka/cluster/VectorClockSpec.scala | 39 +++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala index d0e4c8da13..de1142b668 100644 --- a/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/VectorClockSpec.scala @@ -157,6 +157,45 @@ class VectorClockSpec extends AkkaSpec { merged1 == merged2 must be(true) } + "correctly merge two disjoint vector clocks" in { + val node1 = Node("1") + val node2 = Node("2") + val node3 = Node("3") + val node4 = Node("4") + + val clock1_1 = VectorClock() + val clock2_1 = clock1_1 + node1 + val clock3_1 = clock2_1 + node2 + val clock4_1 = clock3_1 + node2 + val clock5_1 = clock4_1 + node3 + + val clock1_2 = VectorClock() + val clock2_2 = clock1_2 + node4 + val clock3_2 = clock2_2 + node4 + + val merged1 = clock3_2 merge clock5_1 + merged1.versions.size must be(4) + merged1.versions.contains(node1) must be(true) + merged1.versions.contains(node2) must be(true) + merged1.versions.contains(node3) must be(true) + merged1.versions.contains(node4) must be(true) + + val merged2 = clock5_1 merge clock3_2 + merged2.versions.size must be(4) + merged2.versions.contains(node1) must be(true) + merged2.versions.contains(node2) must be(true) + merged2.versions.contains(node3) must be(true) + merged2.versions.contains(node4) must be(true) + + clock3_2 < merged1 must be(true) + clock5_1 < merged1 must be(true) + + clock3_2 < merged2 must be(true) + clock5_1 < merged2 must be(true) + + merged1 == merged2 must be(true) + } + "pass blank clock incrementing" in { val node1 = Node("1") val node2 = Node("2") From d852e6245aaf92d25766785c4fe75f99619ce259 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 24 Apr 2012 00:57:50 +0200 Subject: [PATCH 31/36] #1930 - use new favicon for akka docs --- akka-docs/_sphinx/static/favicon.ico | Bin 1203 -> 658 bytes 1 file changed, 0 insertions(+), 0 deletions(-) diff --git a/akka-docs/_sphinx/static/favicon.ico b/akka-docs/_sphinx/static/favicon.ico index 858e2dbebda17d0c25eef4cbefdc74bae71bbad1..9e8f8e96246bc38e97630e8f2042877269c558ef 100644 GIT binary patch delta 634 zcmdnYIf-?GayEaktacj=xXphT@67BQf?{`i!J(MM)yB5?p2SoMo=xVTbu4t1r@X_)< zvrfeD&4>T{)=j#^7j0kh;Hm8$+w~qd9-U%1$8fS`x2x&EfH zi(cUrP1ly756l83^%bjUSf`t9PIKF!oGlx<^lFcPOla(McV9KEtLwqiZoh0=^&rSm6lEL>G4>hmruh<)`@r#V-)8xcKiy zC(3)n-`n{KKS(LImR#w%W{dhP*P61E8Grs{obLSyR&8KKZfJ_#oGmQRxSMh*TB_u-~KF((vy?#HLiDyj#;zq{fhb1 z^X)&Z_-Jr&ALF*rIZQ4dcHEC=%~ifQ?Sw{tCgb_v>??D5T0ZkLJFU@r!x;2)e@UiS q;1ZEo``HpqyMpcnbAA8L$Z-FpX8w^hUS0+U1_n=8KbLh*2~7a79~a#K delta 1184 zcmbQlx|wr=ay_GVrn7TEW^ytEL&dGB{d>I*Cy2Do*XIyxjWXpT5^U;;f&MiEAOV59qGyU+xJqu=r z9nQ;Am2mSai`w>Z$2`OTUX89{D~c}gUOJMsy`N!`1=r8x=h^F-9^@X+y5XCatmv_X zNzGxY_%cQgVTqX4E{zh*yX-wgI@}$mr_Z(ONMCT{{2y73Uo-2OTwm^x>&SO_Z@rE^ zit)nVGuIhoTg|x^$f+w=UbGK)J8I&4_|?(6j5&QlqI|)R4Y-nK9@(_|klcj7><5ar z-N?LoENy4l?fbJfeh-nkUT8tAbq9*UkugjfQ`_2C>{n*r^u3@L=ytj7` z{Wa6M`}C%pq}T2bCjVFuuMfz7%!$LycG-`ZlVJ}+a^wz3_5;q{Lay$**a;O>?z&*37)n$j6uEc;=Gy zJ`1tu2h`_oiPE(>uPni{Y5#}%sQXv9<(p1yvco}Qe{!^ z61Xyiqy4Z&U$f5X>KVWFH|uo&{8?EN>%OnzrO?6vj)%W1=D90POnCJzSI)lb$YFMV z0jC5BHhX{HjXQRns8KttZ){w`!qn5-yU;|ccgp+9BaaFX&2n2CmtJ2}Gbd@|iO(^8 z8 Date: Tue, 24 Apr 2012 07:59:12 +0200 Subject: [PATCH 32/36] closes #2013: Typos in the docs, using parens for arity-0 methods which are not referentially transparent --- akka-docs/general/actor-systems.rst | 4 ++-- .../src/main/scala/akka/tutorial/first/scala/Pi.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/akka-docs/general/actor-systems.rst b/akka-docs/general/actor-systems.rst index ef922293a8..2051f2d845 100644 --- a/akka-docs/general/actor-systems.rst +++ b/akka-docs/general/actor-systems.rst @@ -29,7 +29,7 @@ The quintessential feature of actor systems is that tasks are split up and delegated until they become small enough to be handled in one piece. In doing so, not only is the task itself clearly structured, but the resulting actors can be reasoned about in terms of which messages they should process, how they -should react nominally and how failure should be handled. If one actor does not +should react normally and how failure should be handled. If one actor does not have the means for dealing with a certain situation, it sends a corresponding failure message to its supervisor, asking for help. The recursive structure then allows to handle failure at the right level. @@ -41,7 +41,7 @@ trying to keep everything “under the carpet”. Now, the difficulty in designing such a system is how to decide who should supervise what. There is of course no single best solution, but there are a few -guide lines which might be helpful: +guidelines which might be helpful: - If one actor manages the work another actor is doing, e.g. by passing on sub-tasks, then the manager should supervise the child. The reason is that diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala index 19fa23544c..94fb83bbd3 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/akka/tutorial/first/scala/Pi.scala @@ -49,7 +49,7 @@ object Pi extends App { var pi: Double = _ var nrOfResults: Int = _ - val start: Long = System.currentTimeMillis + val start: Long = System.currentTimeMillis() //#create-router val workerRouter = context.actorOf( @@ -66,7 +66,7 @@ object Pi extends App { nrOfResults += 1 if (nrOfResults == nrOfMessages) { // Send the result to the listener - listener ! PiApproximation(pi, duration = (System.currentTimeMillis - start).millis) + listener ! PiApproximation(pi, duration = (System.currentTimeMillis() - start).millis) // Stops this actor and all its supervised children context.stop(self) } From 456223dc13545286bf81b93ba0233f4ce6d1d7ff Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Sat, 21 Apr 2012 14:52:43 +0100 Subject: [PATCH 33/36] Initial doc import. Adding just an introduction as this is my first commit to docs, so this is just to set the scene. --- akka-docs/scala/index.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/akka-docs/scala/index.rst b/akka-docs/scala/index.rst index 0019271e2d..fc1b619e26 100644 --- a/akka-docs/scala/index.rst +++ b/akka-docs/scala/index.rst @@ -27,3 +27,4 @@ Scala API extending-akka zeromq microkernel + camel From b9a4fda27f21210b4650ee582099f0c4272447e2 Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Sat, 21 Apr 2012 14:53:01 +0100 Subject: [PATCH 34/36] Initial doc import. Adding just an introduction as this is my first commit to docs, so this is just to set the scene. --- akka-docs/scala/camel.rst | 115 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 akka-docs/scala/camel.rst diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst new file mode 100644 index 0000000000..a173f542c0 --- /dev/null +++ b/akka-docs/scala/camel.rst @@ -0,0 +1,115 @@ + +.. _camel-scala: + +####### + Camel +####### + +For an introduction to akka-camel 2, see also the Peter Gabryanczyk's talk `Migrating akka-camel module to Akka 2.x`_. + +For an introduction to akka-camel 1, see also the `Appendix E - Akka and Camel`_ +(pdf) of the book `Camel in Action`_. + +.. _Appendix E - Akka and Camel: http://www.manning.com/ibsen/appEsample.pdf +.. _Camel in Action: http://www.manning.com/ibsen/ +.. _Migrating akka-camel module to Akka 2.x: http://skillsmatter.com/podcast/scala/akka-2-x + +Other, more advanced external articles (for version 1) are: + +* `Akka Consumer Actors: New Features and Best Practices `_ +* `Akka Producer Actors: New Features and Best Practices `_ + + + +Introduction +============ + +The akka-camel module allows actors, untyped actors, and typed actors to receive +and send messages over a great variety of protocols and APIs. This section gives +a brief overview of the general ideas behind the akka-camel module, the +remaining sections go into the details. In addition to the native Scala and Java +actor API, actors can now exchange messages with other systems over large number +of protocols and APIs such as HTTP, SOAP, TCP, FTP, SMTP or JMS, to mention a +few. At the moment, approximately 80 protocols and APIs are supported. + +The akka-camel module is based on `Apache Camel`_, a powerful and leight-weight +integration framework for the JVM. For an introduction to Apache Camel you may +want to read this `Apache Camel article`_. Camel comes with a +large number of `components`_ that provide bindings to different protocols and +APIs. The `camel-extra`_ project provides further components. + +.. _Apache Camel: http://camel.apache.org/ +.. _Apache Camel article: http://architects.dzone.com/articles/apache-camel-integration +.. _components: http://camel.apache.org/components.html +.. _camel-extra: http://code.google.com/p/camel-extra/ + +Usage of Camel's integration components in Akka is essentially a +one-liner. Here's an example. + +.. code-block:: scala + + import akka.actor.Actor + import akka.actor.Actor._ + import akka.camel.{CamelMessage, Consumer} + + class MyActor extends Consumer { + def endpointUri = "mina:tcp://localhost:6200?textline=true" + + def receive = { + case msg: CamelMessage => { /* ... */} + case _ => { /* ... */} + } + } + + // start and expose actor via tcp + val sys = ActorSystem("camel") + val myActor = sys.actorOf(Props[MyActor]) + +The above example exposes an actor over a tcp endpoint on port 6200 via Apache +Camel's `Mina component`_. The actor implements the endpointUri method to define +an endpoint from which it can receive messages. After starting the actor, tcp +clients can immediately send messages to and receive responses from that +actor. If the message exchange should go over HTTP (via Camel's `Jetty +component`_), only the actor's endpointUri method must be changed. + +.. _Mina component: http://camel.apache.org/mina.html +.. _Jetty component: http://camel.apache.org/jetty.html + +.. code-block:: scala + + class MyActor extends Consumer { + def endpointUri = "jetty:http://localhost:8877/example" + + def receive = { + case msg: CamelMessage => { /* ... */} + case _ => { /* ... */} + } + } + +Actors can also trigger message exchanges with external systems i.e. produce to +Camel endpoints. + +.. code-block:: scala + + import akka.actor.Actor + import akka.camel.{Producer, Oneway} + + class MyActor extends Actor with Producer with Oneway { + def endpointUri = "jms:queue:example" + } + +In the above example, any message sent to this actor will be added (produced) to +the example JMS queue. Producer actors may choose from the same set of Camel +components as Consumer actors do. + +The number of Camel components is constantly increasing. The akka-camel module +can support these in a plug-and-play manner. Just add them to your application's +classpath, define a component-specific endpoint URI and use it to exchange +messages over the component-specific protocols or APIs. This is possible because +Camel components bind protocol-specific message formats to a Camel-specific +`normalized message format`__. The normalized message format hides +protocol-specific details from Akka and makes it therefore very easy to support +a large number of protocols through a uniform Camel component interface. The +akka-camel module further converts mutable Camel messages into `immutable +representations`__ which are used by Consumer and Producer actors for pattern +matching, transformation, serialization or storage. \ No newline at end of file From f132d193f182f5c5d9918826986eb1cdc5ed48d3 Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Sun, 22 Apr 2012 17:02:06 +0100 Subject: [PATCH 35/36] Extracting code to scala files --- akka-docs/scala/camel.rst | 47 ++++--------------- .../code/akka/docs/camel/Introduction.scala | 44 +++++++++++++++++ 2 files changed, 52 insertions(+), 39 deletions(-) create mode 100644 akka-docs/scala/code/akka/docs/camel/Introduction.scala diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst index a173f542c0..a728ab186e 100644 --- a/akka-docs/scala/camel.rst +++ b/akka-docs/scala/camel.rst @@ -46,24 +46,7 @@ APIs. The `camel-extra`_ project provides further components. Usage of Camel's integration components in Akka is essentially a one-liner. Here's an example. -.. code-block:: scala - - import akka.actor.Actor - import akka.actor.Actor._ - import akka.camel.{CamelMessage, Consumer} - - class MyActor extends Consumer { - def endpointUri = "mina:tcp://localhost:6200?textline=true" - - def receive = { - case msg: CamelMessage => { /* ... */} - case _ => { /* ... */} - } - } - - // start and expose actor via tcp - val sys = ActorSystem("camel") - val myActor = sys.actorOf(Props[MyActor]) +.. includecode:: code/akka/docs/camel/Introduction.scala#Consumer-mina The above example exposes an actor over a tcp endpoint on port 6200 via Apache Camel's `Mina component`_. The actor implements the endpointUri method to define @@ -75,28 +58,12 @@ component`_), only the actor's endpointUri method must be changed. .. _Mina component: http://camel.apache.org/mina.html .. _Jetty component: http://camel.apache.org/jetty.html -.. code-block:: scala - - class MyActor extends Consumer { - def endpointUri = "jetty:http://localhost:8877/example" - - def receive = { - case msg: CamelMessage => { /* ... */} - case _ => { /* ... */} - } - } +.. includecode:: code/akka/docs/camel/Introduction.scala#Consumer Actors can also trigger message exchanges with external systems i.e. produce to Camel endpoints. -.. code-block:: scala - - import akka.actor.Actor - import akka.camel.{Producer, Oneway} - - class MyActor extends Actor with Producer with Oneway { - def endpointUri = "jms:queue:example" - } +.. includecode:: code/akka/docs/camel/Introduction.scala#Producer In the above example, any message sent to this actor will be added (produced) to the example JMS queue. Producer actors may choose from the same set of Camel @@ -110,6 +77,8 @@ Camel components bind protocol-specific message formats to a Camel-specific `normalized message format`__. The normalized message format hides protocol-specific details from Akka and makes it therefore very easy to support a large number of protocols through a uniform Camel component interface. The -akka-camel module further converts mutable Camel messages into `immutable -representations`__ which are used by Consumer and Producer actors for pattern -matching, transformation, serialization or storage. \ No newline at end of file +akka-camel module further converts mutable Camel messages into immutable +representations which are used by Consumer and Producer actors for pattern +matching, transformation, serialization or storage. + +__ https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/main/java/org/apache/camel/Message.java diff --git a/akka-docs/scala/code/akka/docs/camel/Introduction.scala b/akka-docs/scala/code/akka/docs/camel/Introduction.scala new file mode 100644 index 0000000000..12a29ef72c --- /dev/null +++ b/akka-docs/scala/code/akka/docs/camel/Introduction.scala @@ -0,0 +1,44 @@ +package akka.docs.camel + +import akka.actor._ +import akka.camel._ + +//#Consumer-mina +import akka.actor.Actor +import akka.actor.Actor._ +import akka.camel.{CamelMessage, Consumer} + +class MyActor extends Consumer { + def endpointUri = "mina:tcp://localhost:6200?textline=true" + + def receive = { + case msg: CamelMessage => { /* ... */} + case _ => { /* ... */} + } +} + +// start and expose actor via tcp +val sys = ActorSystem("camel") +val myActor = sys.actorOf(Props[MyActor]) +//#Consumer-mina + + +//#Consumer +class MyActor extends Consumer { + def endpointUri = "jetty:http://localhost:8877/example" + + def receive = { + case msg: CamelMessage => { /* ... */} + case _ => { /* ... */} + } +} +//#Consumer + +//#Producer +import akka.actor.Actor +import akka.camel.{Producer, Oneway} + +class MyActor extends Actor with Producer with Oneway { + def endpointUri = "jms:queue:example" +} +//#Producer \ No newline at end of file From 4b1e6cdfcebf5980b38a83f963c777e1c2f711b6 Mon Sep 17 00:00:00 2001 From: Piotr Gabryanczyk Date: Mon, 23 Apr 2012 10:18:03 +0100 Subject: [PATCH 36/36] removed info about typed actors --- akka-docs/scala/camel.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-docs/scala/camel.rst b/akka-docs/scala/camel.rst index a728ab186e..5475eb0a27 100644 --- a/akka-docs/scala/camel.rst +++ b/akka-docs/scala/camel.rst @@ -24,7 +24,7 @@ Other, more advanced external articles (for version 1) are: Introduction ============ -The akka-camel module allows actors, untyped actors, and typed actors to receive +The akka-camel module allows actors to receive and send messages over a great variety of protocols and APIs. This section gives a brief overview of the general ideas behind the akka-camel module, the remaining sections go into the details. In addition to the native Scala and Java