From e017aeef0826fc99722b74551f336d1751f0e884 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 10 Feb 2012 16:02:37 +0100 Subject: [PATCH 1/6] Replace akka.actor.timeout with specfic settings. See #1808 * ActorTimeout (akka.actor.timeout) was used to all sorts of things. * TestKit default-timeout * TypedActor timeout for non void methods * Transactor coordinated-timeout * ZeroMQ new-socket-timeout * And in various tests --- .../scala/akka/actor/GlobalActorSystem.scala | 9 +- .../main/scala/akka/dispatch/OldFuture.scala | 4 +- .../main/scala/akka/migration/package.scala | 2 +- .../java/akka/dispatch/JavaFutureTests.java | 31 +- .../scala/akka/actor/TypedActorSpec.scala | 2 +- .../src/test/scala/akka/pattern/AskSpec.scala | 4 +- akka-actor/src/main/resources/reference.conf | 17 +- .../main/scala/akka/actor/ActorSystem.scala | 1 - .../main/scala/akka/actor/TypedActor.scala | 13 +- akka-docs/general/configuration.rst | 18 ++ .../akka/docs/future/FutureDocTestBase.java | 265 +++++++++--------- .../code/akka/docs/jrouting/ParentActor.java | 6 +- .../code/akka/docs/future/FutureDocSpec.scala | 17 +- .../akka/docs/routing/RouterTypeExample.scala | 3 +- .../akka/remote/RemoteCommunicationSpec.scala | 4 +- .../src/main/resources/reference.conf | 3 + .../scala/akka/testkit/TestActorRef.scala | 2 +- .../src/main/scala/akka/testkit/TestKit.scala | 3 +- .../scala/akka/testkit/TestKitExtension.scala | 2 + .../test/scala/akka/testkit/AkkaSpec.scala | 2 +- .../src/main/resources/reference.conf | 13 + .../scala/akka/transactor/Transactor.scala | 4 +- .../akka/transactor/TransactorExtension.scala | 25 ++ .../akka/transactor/UntypedTransactor.scala | 4 +- akka-zeromq/src/main/resources/reference.conf | 9 +- .../akka/zeromq/ConcurrentSocketActor.scala | 5 +- .../scala/akka/zeromq/ZeroMQExtension.scala | 8 +- 27 files changed, 283 insertions(+), 193 deletions(-) create mode 100644 akka-transactor/src/main/resources/reference.conf create mode 100644 akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala diff --git a/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala b/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala index 694dd5d547..e08883c6ed 100644 --- a/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala +++ b/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala @@ -4,14 +4,21 @@ package akka.actor import java.io.File - import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions +import akka.util.Timeout +import akka.util.duration._ @deprecated("use ActorSystem instead", "2.0") object GlobalActorSystem extends ActorSystemImpl("GlobalSystem", OldConfigurationLoader.defaultConfig) { start() + + /** + * Timeout used in `OldFuture.get` and default implicit ask timeout. + * Hard coded since the migration kit is not intended to be used for production anyway. + */ + val AwaitTimeout = Timeout(5 seconds) } /** diff --git a/akka-actor-migration/src/main/scala/akka/dispatch/OldFuture.scala b/akka-actor-migration/src/main/scala/akka/dispatch/OldFuture.scala index 89941cd066..60029e256a 100644 --- a/akka-actor-migration/src/main/scala/akka/dispatch/OldFuture.scala +++ b/akka-actor-migration/src/main/scala/akka/dispatch/OldFuture.scala @@ -18,13 +18,13 @@ class OldFuture[T](future: Future[T]) { @deprecated("use akka.dispatch.Await.result instead", "2.0") def get: T = try { - Await.result(future, GlobalActorSystem.settings.ActorTimeout.duration) + Await.result(future, GlobalActorSystem.AwaitTimeout.duration) } catch { case e: TimeoutException ⇒ throw new FutureTimeoutException(e.getMessage, e) } @deprecated("use akka.dispatch.Await.ready instead", "2.0") - def await: Future[T] = await(GlobalActorSystem.settings.ActorTimeout.duration) + def await: Future[T] = await(GlobalActorSystem.AwaitTimeout.duration) @deprecated("use akka.dispatch.Await.ready instead", "2.0") def await(atMost: Duration) = try { diff --git a/akka-actor-migration/src/main/scala/akka/migration/package.scala b/akka-actor-migration/src/main/scala/akka/migration/package.scala index 6b8cad6fe2..469604e464 100644 --- a/akka-actor-migration/src/main/scala/akka/migration/package.scala +++ b/akka-actor-migration/src/main/scala/akka/migration/package.scala @@ -14,7 +14,7 @@ package object migration { implicit def future2OldFuture[T](future: Future[T]): OldFuture[T] = new OldFuture[T](future) - implicit def askTimeout: Timeout = GlobalActorSystem.settings.ActorTimeout + implicit def askTimeout: Timeout = GlobalActorSystem.AwaitTimeout implicit def defaultDispatcher: MessageDispatcher = GlobalActorSystem.dispatcher diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index f494fd7d81..b03fe3b5fc 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -5,6 +5,7 @@ import akka.actor.ActorSystem; import akka.japi.*; import akka.util.Duration; +import akka.testkit.TestKitExtension; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -28,7 +29,7 @@ public class JavaFutureTests { @BeforeClass public static void beforeAll() { system = ActorSystem.create("JavaFutureTests", AkkaSpec.testConf()); - t = system.settings().ActorTimeout(); + t = TestKitExtension.get(system).DefaultTimeout(); } @AfterClass @@ -61,10 +62,10 @@ public class JavaFutureTests { Promise cf = Futures.promise(system.dispatcher()); Future f = cf; f.onSuccess(new OnSuccess() { - public void onSuccess(String result) { - if (result.equals("foo")) - latch.countDown(); - } + public void onSuccess(String result) { + if (result.equals("foo")) + latch.countDown(); + } }); cf.success("foo"); @@ -78,10 +79,10 @@ public class JavaFutureTests { Promise cf = Futures.promise(system.dispatcher()); Future f = cf; f.onFailure(new OnFailure() { - public void onFailure(Throwable t) { - if (t instanceof NullPointerException) - latch.countDown(); - } + public void onFailure(Throwable t) { + if (t instanceof NullPointerException) + latch.countDown(); + } }); Throwable exception = new NullPointerException(); @@ -296,8 +297,10 @@ public class JavaFutureTests { Promise p = Futures.promise(system.dispatcher()); Future f = p.future().recover(new Recover() { public Object recover(Throwable t) throws Throwable { - if (t == fail) return "foo"; - else throw t; + if (t == fail) + return "foo"; + else + throw t; } }); Duration d = Duration.create(1, TimeUnit.SECONDS); @@ -311,8 +314,10 @@ public class JavaFutureTests { Promise p = Futures.promise(system.dispatcher()); Future f = p.future().recoverWith(new Recover>() { public Future recover(Throwable t) throws Throwable { - if (t == fail) return Futures.successful("foo", system.dispatcher()).future(); - else throw t; + if (t == fail) + return Futures. successful("foo", system.dispatcher()).future(); + else + throw t; } }); Duration d = Duration.create(1, TimeUnit.SECONDS); diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 4e8bc4d7b4..5a9fab6c63 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -113,7 +113,7 @@ object TypedActorSpec { } def futureComposePigdogFrom(foo: Foo): Future[String] = { - implicit val timeout = TypedActor.context.system.settings.ActorTimeout + implicit val timeout = TypedActor(TypedActor.context.system).DefaultReturnTimeout foo.futurePigdog(500).map(_.toUpperCase) } diff --git a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala index ecb9690594..f3c36665e8 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala @@ -5,8 +5,9 @@ package akka.pattern import akka.testkit.AkkaSpec import akka.util.duration._ +import akka.testkit.DefaultTimeout -class AskSpec extends AkkaSpec { +class AskSpec extends AkkaSpec with DefaultTimeout { "The “ask” pattern" must { @@ -22,7 +23,6 @@ class AskSpec extends AkkaSpec { "return broken promises on EmptyLocalActorRefs" in { val empty = system.actorFor("unknown") - implicit val timeout = system.settings.ActorTimeout val f = empty ? 3.14 f.isCompleted must be(true) f.value.get match { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index bc52938a7d..62d49d61cd 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -1,6 +1,6 @@ -############################## -# Akka Reference Config File # -############################## +#################################### +# Akka Actor Reference Config File # +#################################### # This the reference config file has all the default settings. # Make your edits/overrides in your application.conf. @@ -50,12 +50,6 @@ akka { # removed from their parents reaper-interval = 5s - # Default timeout for Future based invocations - # - Actor: ask && ? - # - UntypedActor: ask - # - TypedActor: methods with non-void return type - timeout = 5s - # Serializes and deserializes (non-primitive) messages to ensure immutability, # this is only intended for testing. serialize-messages = off @@ -64,6 +58,11 @@ akka { # this is only intended for testing. serialize-creators = off + typed { + # Default timeout for typed actor methods with non-void return type + timeout = 5s + } + deployment { # deployment id pattern - on the format: /parent/child etc. diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index ac77628c2c..76eea494ca 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -70,7 +70,6 @@ object ActorSystem { final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) final val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS) - final val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS)) final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") final val SerializeAllCreators = getBoolean("akka.actor.serialize-creators") diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 98329203e9..3d1e0c76f3 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -11,7 +11,9 @@ import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } import akka.serialization.{ Serialization, SerializationExtension } import akka.dispatch._ import java.util.concurrent.TimeoutException +import java.util.concurrent.TimeUnit.MILLISECONDS import java.lang.IllegalStateException +import akka.util.Duration trait TypedActorFactory { @@ -502,7 +504,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( /** * @return a new TypedProps that will use the specified Timeout for its non-void-returning methods, - * if null is specified, it will use the default ActorTimeout as specified in the configuration. + * if null is specified, it will use the default timeout as specified in the configuration. * * Java API */ @@ -510,7 +512,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( /** * @return a new TypedProps that will use the specified Timeout for its non-void-returning methods, - * if None is specified, it will use the default ActorTimeout as specified in the configuration. + * if None is specified, it will use the default timeout as specified in the configuration. * * Scala API */ @@ -550,6 +552,11 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory val serialization = SerializationExtension(system) val settings = system.settings + /** + * Default timeout for typed actor methods with non-void return type + */ + final val DefaultReturnTimeout = Timeout(Duration(settings.config.getMilliseconds("akka.actor.typed.timeout"), MILLISECONDS)) + /** * Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found */ @@ -575,7 +582,7 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory new TypedActorInvocationHandler( this, actorVar, - if (props.timeout.isDefined) props.timeout.get else this.settings.ActorTimeout)).asInstanceOf[R] + if (props.timeout.isDefined) props.timeout.get else DefaultReturnTimeout)).asInstanceOf[R] proxyVar match { case null ⇒ diff --git a/akka-docs/general/configuration.rst b/akka-docs/general/configuration.rst index 3e2bca240d..649a6abc04 100644 --- a/akka-docs/general/configuration.rst +++ b/akka-docs/general/configuration.rst @@ -75,6 +75,24 @@ akka-testkit .. literalinclude:: ../../akka-testkit/src/main/resources/reference.conf :language: none +akka-transactor +~~~~~~~~~~~~~~~ + +.. literalinclude:: ../../akka-transactor/src/main/resources/reference.conf + :language: none + +akka-agent +~~~~~~~~~~ + +.. literalinclude:: ../../akka-agent/src/main/resources/reference.conf + :language: none + +akka-zeromq +~~~~~~~~~~~ + +.. literalinclude:: ../../akka-zeromq/src/main/resources/reference.conf + :language: none + akka-beanstalk-mailbox ~~~~~~~~~~~~~~~~~~~~~~ diff --git a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java index 8fc3b29b4e..e541c925c1 100644 --- a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java @@ -78,7 +78,7 @@ public class FutureDocTestBase { ActorRef actor = system.actorOf(new Props(MyActor.class)); String msg = "hello"; //#ask-blocking - Timeout timeout = system.settings().ActorTimeout(); + Timeout timeout = new Timeout(Duration.parse("5 seconds")); Future future = Patterns.ask(actor, msg, timeout); String result = (String) Await.result(future, timeout.duration()); //#ask-blocking @@ -196,19 +196,17 @@ public class FutureDocTestBase { Iterable> listOfFutureInts = source; // now we have a Future[Iterable[Integer]] - Future> futureListOfInts = - sequence(listOfFutureInts, system.dispatcher()); + Future> futureListOfInts = sequence(listOfFutureInts, system.dispatcher()); // Find the sum of the odd numbers - Future futureSum = futureListOfInts.map( - new Mapper, Long>() { - public Long apply(Iterable ints) { - long sum = 0; - for (Integer i : ints) - sum += i; - return sum; - } - }); + Future futureSum = futureListOfInts.map(new Mapper, Long>() { + public Long apply(Iterable ints) { + long sum = 0; + for (Integer i : ints) + sum += i; + return sum; + } + }); long result = Await.result(futureSum, Duration.create(1, SECONDS)); //#sequence @@ -221,20 +219,18 @@ public class FutureDocTestBase { //Just a sequence of Strings Iterable listStrings = Arrays.asList("a", "b", "c"); - Future> futureResult = traverse(listStrings, - new Function>() { - public Future apply(final String r) { - return future(new Callable() { - public String call() { - return r.toUpperCase(); - } - }, system.dispatcher()); - } - }, system.dispatcher()); + Future> futureResult = traverse(listStrings, new Function>() { + public Future apply(final String r) { + return future(new Callable() { + public String call() { + return r.toUpperCase(); + } + }, system.dispatcher()); + } + }, system.dispatcher()); //Returns the sequence of strings as upper case - Iterable result = - Await.result(futureResult, Duration.create(1, SECONDS)); + Iterable result = Await.result(futureResult, Duration.create(1, SECONDS)); assertEquals(Arrays.asList("A", "B", "C"), result); //#traverse } @@ -250,12 +246,11 @@ public class FutureDocTestBase { Iterable> futures = source; //Start value is the empty string - Future resultFuture = fold("", futures, - new Function2() { - public String apply(String r, String t) { - return r + t; //Just concatenate - } - }, system.dispatcher()); + Future resultFuture = fold("", futures, new Function2() { + public String apply(String r, String t) { + return r + t; //Just concatenate + } + }, system.dispatcher()); String result = Await.result(resultFuture, Duration.create(1, SECONDS)); //#fold @@ -272,12 +267,11 @@ public class FutureDocTestBase { //A sequence of Futures, in this case Strings Iterable> futures = source; - Future resultFuture = reduce(futures, - new Function2() { - public Object apply(Object r, String t) { - return r + t; //Just concatenate - } - }, system.dispatcher()); + Future resultFuture = reduce(futures, new Function2() { + public Object apply(Object r, String t) { + return r + t; //Just concatenate + } + }, system.dispatcher()); Object result = Await.result(resultFuture, Duration.create(1, SECONDS)); //#reduce @@ -285,32 +279,35 @@ public class FutureDocTestBase { assertEquals("ab", result); } - @Test public void useSuccessfulAndFailed() { + @Test + public void useSuccessfulAndFailed() { //#successful Future future = Futures.successful("Yay!", system.dispatcher()); //#successful //#failed - Future otherFuture = - Futures.failed(new IllegalArgumentException("Bang!"), system.dispatcher()); + Future otherFuture = Futures.failed(new IllegalArgumentException("Bang!"), system.dispatcher()); //#failed Object result = Await.result(future, Duration.create(1, SECONDS)); - assertEquals("Yay!",result); + assertEquals("Yay!", result); Throwable result2 = Await.result(otherFuture.failed(), Duration.create(1, SECONDS)); - assertEquals("Bang!",result2.getMessage()); + assertEquals("Bang!", result2.getMessage()); } - @Test public void useFilter() { - //#filter + @Test + public void useFilter() { + //#filter Future future1 = Futures.successful(4, system.dispatcher()); - Future successfulFilter = - future1.filter(new Filter() { - public boolean filter(Integer i) { return i % 2 == 0; } - }); + Future successfulFilter = future1.filter(new Filter() { + public boolean filter(Integer i) { + return i % 2 == 0; + } + }); - Future failedFilter = - future1.filter(new Filter() { - public boolean filter(Integer i) { return i % 2 != 0; } - }); + Future failedFilter = future1.filter(new Filter() { + public boolean filter(Integer i) { + return i % 2 != 0; + } + }); //When filter fails, the returned Future will be failed with a scala.MatchError //#filter } @@ -323,138 +320,140 @@ public class FutureDocTestBase { } - @Test public void useAndThen() { + @Test + public void useAndThen() { //#and-then - Future future1 = Futures.successful("value", system.dispatcher()). - andThen(new OnComplete() { - public void onComplete(Throwable failure, String result) { - if (failure != null) sendToIssueTracker(failure); - } + Future future1 = Futures.successful("value", system.dispatcher()).andThen(new OnComplete() { + public void onComplete(Throwable failure, String result) { + if (failure != null) + sendToIssueTracker(failure); + } }).andThen(new OnComplete() { - public void onComplete(Throwable failure, String result) { - if (result != null) sendToTheInternetz(result); - } + public void onComplete(Throwable failure, String result) { + if (result != null) + sendToTheInternetz(result); + } }); //#and-then } - @Test public void useRecover() { + @Test + public void useRecover() { //#recover Future future = future(new Callable() { public Integer call() { return 1 / 0; } }, system.dispatcher()).recover(new Recover() { - public Integer recover(Throwable problem) throws Throwable { - if (problem instanceof ArithmeticException) return 0; - else throw problem; - } + public Integer recover(Throwable problem) throws Throwable { + if (problem instanceof ArithmeticException) + return 0; + else + throw problem; + } }); int result = Await.result(future, Duration.create(1, SECONDS)); assertEquals(result, 0); //#recover } - @Test public void useTryRecover() { + @Test + public void useTryRecover() { //#try-recover Future future = future(new Callable() { public Integer call() { return 1 / 0; } }, system.dispatcher()).recoverWith(new Recover>() { - public Future recover(Throwable problem) throws Throwable { - if (problem instanceof ArithmeticException) { - return future(new Callable() { - public Integer call() { - return 0; - } - }, system.dispatcher()); + public Future recover(Throwable problem) throws Throwable { + if (problem instanceof ArithmeticException) { + return future(new Callable() { + public Integer call() { + return 0; } - else throw problem; - } + }, system.dispatcher()); + } else + throw problem; + } }); int result = Await.result(future, Duration.create(1, SECONDS)); assertEquals(result, 0); //#try-recover } - @Test public void useOnSuccessOnFailureAndOnComplete() { - { + @Test + public void useOnSuccessOnFailureAndOnComplete() { + { Future future = Futures.successful("foo", system.dispatcher()); //#onSuccess future.onSuccess(new OnSuccess() { - public void onSuccess(String result) { - if ("bar" == result) { - //Do something if it resulted in "bar" - } else { - //Do something if it was some other String - } + public void onSuccess(String result) { + if ("bar" == result) { + //Do something if it resulted in "bar" + } else { + //Do something if it was some other String } + } }); //#onSuccess - } - { - Future future = - Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher()); - //#onFailure - future.onFailure( new OnFailure() { + } + { + Future future = Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher()); + //#onFailure + future.onFailure(new OnFailure() { public void onFailure(Throwable failure) { - if (failure instanceof IllegalStateException) { - //Do something if it was this particular failure - } else { - //Do something if it was some other failure - } + if (failure instanceof IllegalStateException) { + //Do something if it was this particular failure + } else { + //Do something if it was some other failure + } } }); //#onFailure - } - { - Future future = Futures.successful("foo", system.dispatcher()); - //#onComplete - future.onComplete(new OnComplete() { - public void onComplete(Throwable failure, String result) { - if (failure != null) { - //We got a failure, handle it here - } else { - // We got a result, do something with it - } - } - }); - //#onComplete - } + } + { + Future future = Futures.successful("foo", system.dispatcher()); + //#onComplete + future.onComplete(new OnComplete() { + public void onComplete(Throwable failure, String result) { + if (failure != null) { + //We got a failure, handle it here + } else { + // We got a result, do something with it + } + } + }); + //#onComplete + } } - @Test public void useOrAndZip(){ + @Test + public void useOrAndZip() { { - //#zip - Future future1 = Futures.successful("foo", system.dispatcher()); - Future future2 = Futures.successful("bar", system.dispatcher()); - Future future3 = - future1.zip(future2).map(new Mapper, String>() { - public String apply(scala.Tuple2 zipped) { - return zipped._1() + " " + zipped._2(); + //#zip + Future future1 = Futures.successful("foo", system.dispatcher()); + Future future2 = Futures.successful("bar", system.dispatcher()); + Future future3 = future1.zip(future2).map(new Mapper, String>() { + public String apply(scala.Tuple2 zipped) { + return zipped._1() + " " + zipped._2(); } - }); + }); - String result = Await.result(future3, Duration.create(1, SECONDS)); - assertEquals("foo bar", result); - //#zip + String result = Await.result(future3, Duration.create(1, SECONDS)); + assertEquals("foo bar", result); + //#zip } { - //#fallback-to - Future future1 = - Futures.failed(new IllegalStateException("OHNOES1"), system.dispatcher()); - Future future2 = - Futures.failed(new IllegalStateException("OHNOES2"), system.dispatcher()); - Future future3 = - Futures.successful("bar", system.dispatcher()); - Future future4 = - future1.fallbackTo(future2).fallbackTo(future3); // Will have "bar" in this case - String result = Await.result(future4, Duration.create(1, SECONDS)); - assertEquals("bar", result); - //#fallback-to + //#fallback-to + Future future1 = Futures.failed(new IllegalStateException("OHNOES1"), system.dispatcher()); + Future future2 = Futures.failed(new IllegalStateException("OHNOES2"), system.dispatcher()); + Future future3 = Futures.successful("bar", system.dispatcher()); + Future future4 = future1.fallbackTo(future2).fallbackTo(future3); // Will have "bar" in this case + String result = Await.result(future4, Duration.create(1, SECONDS)); + assertEquals("bar", result); + //#fallback-to } } diff --git a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java index 1119559489..cf1e2b9cee 100644 --- a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java +++ b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java @@ -54,9 +54,9 @@ public class ParentActor extends UntypedActor { ActorRef scatterGatherFirstCompletedRouter = getContext().actorOf( new Props(FibonacciActor.class).withRouter(new ScatterGatherFirstCompletedRouter(5, Duration .parse("2 seconds"))), "router"); - Timeout timeout = getContext().system().settings().ActorTimeout(); - Future futureResult = akka.pattern.Patterns.ask( - scatterGatherFirstCompletedRouter, new FibonacciActor.FibonacciNumber(10), timeout); + Timeout timeout = new Timeout(Duration.parse("5 seconds")); + Future futureResult = akka.pattern.Patterns.ask(scatterGatherFirstCompletedRouter, + new FibonacciActor.FibonacciNumber(10), timeout); int result = (Integer) Await.result(futureResult, timeout.duration()); //#scatterGatherFirstCompletedRouter System.out.println(String.format("The result of calculating Fibonacci for 10 is %d", result)); diff --git a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala index 098fe873ad..3d1ca946a7 100644 --- a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala @@ -11,6 +11,7 @@ import akka.actor.Props import akka.actor.Status.Failure import akka.dispatch.Future import akka.dispatch.Await +import akka.util.Timeout import akka.util.duration._ import akka.dispatch.Promise import java.lang.IllegalStateException @@ -46,8 +47,10 @@ class FutureDocSpec extends AkkaSpec { //#ask-blocking import akka.dispatch.Await import akka.pattern.ask + import akka.util.Timeout + import akka.util.duration._ - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) val future = actor ? msg // enabled by the “ask” import val result = Await.result(future, timeout.duration).asInstanceOf[String] //#ask-blocking @@ -57,7 +60,7 @@ class FutureDocSpec extends AkkaSpec { "demonstrate usage of mapTo" in { val actor = system.actorOf(Props[MyActor]) val msg = "hello" - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) //#map-to import akka.dispatch.Future import akka.pattern.ask @@ -164,7 +167,7 @@ class FutureDocSpec extends AkkaSpec { val actor3 = system.actorOf(Props[MyActor]) val msg1 = 1 val msg2 = 2 - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) import akka.dispatch.Await import akka.pattern.ask //#composing-wrong @@ -188,7 +191,7 @@ class FutureDocSpec extends AkkaSpec { val actor3 = system.actorOf(Props[MyActor]) val msg1 = 1 val msg2 = 2 - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) import akka.dispatch.Await import akka.pattern.ask //#composing @@ -208,7 +211,7 @@ class FutureDocSpec extends AkkaSpec { } "demonstrate usage of sequence with actors" in { - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) val oddActor = system.actorOf(Props[OddActor]) //#sequence-ask // oddActor returns odd numbers sequentially from 1 as a List[Future[Int]] @@ -256,7 +259,7 @@ class FutureDocSpec extends AkkaSpec { } "demonstrate usage of recover" in { - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) val actor = system.actorOf(Props[MyActor]) val msg1 = -1 //#recover @@ -268,7 +271,7 @@ class FutureDocSpec extends AkkaSpec { } "demonstrate usage of recoverWith" in { - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) val actor = system.actorOf(Props[MyActor]) val msg1 = -1 //#try-recover diff --git a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala index 2f0a1e634c..6ec475a874 100644 --- a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala +++ b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala @@ -7,6 +7,7 @@ import akka.routing.{ ScatterGatherFirstCompletedRouter, BroadcastRouter, Random import annotation.tailrec import akka.actor.{ Props, Actor } import akka.util.duration._ +import akka.util.Timeout import akka.dispatch.Await import akka.pattern.ask import akka.routing.SmallestMailboxRouter @@ -80,7 +81,7 @@ class ParentActor extends Actor { val scatterGatherFirstCompletedRouter = context.actorOf( Props[FibonacciActor].withRouter(ScatterGatherFirstCompletedRouter( nrOfInstances = 5, within = 2 seconds)), "router") - implicit val timeout = context.system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) val futureResult = scatterGatherFirstCompletedRouter ? FibonacciNumber(10) val result = Await.result(futureResult, timeout.duration) //#scatterGatherFirstCompletedRouter diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index 88d80d6d81..73f8a98030 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -44,7 +44,7 @@ akka { /looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345" } } -""") with ImplicitSender { +""") with ImplicitSender with DefaultTimeout { import RemoteCommunicationSpec._ @@ -59,8 +59,6 @@ akka { val here = system.actorFor("akka://remote_sys@localhost:12346/user/echo") - implicit val timeout = system.settings.ActorTimeout - override def atTermination() { other.shutdown() } diff --git a/akka-testkit/src/main/resources/reference.conf b/akka-testkit/src/main/resources/reference.conf index e4ae685f4d..5e70df9403 100644 --- a/akka-testkit/src/main/resources/reference.conf +++ b/akka-testkit/src/main/resources/reference.conf @@ -18,6 +18,9 @@ akka { # duration to wait in expectMsg and friends outside of within() block by default single-expect-default = 3s + # The timeout that is added as an implicit by DefaultTimeout trait + default-timeout = 5s + calling-thread-dispatcher { type = akka.testkit.CallingThreadDispatcherConfigurator } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 8769fdda51..8a2f61bf76 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -68,7 +68,7 @@ class TestActorRef[T <: Actor]( if (isTerminated) throw new IllegalActorStateException("underlying actor is terminated") underlying.actor.asInstanceOf[T] match { case null ⇒ - val t = underlying.system.settings.ActorTimeout + val t = TestKitExtension(_system).DefaultTimeout Await.result(this.?(InternalGetActor)(t), t.duration).asInstanceOf[T] case ref ⇒ ref } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 4155ea662d..bdfab36ede 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -11,6 +11,7 @@ import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque, TimeUnit, atom import atomic.AtomicInteger import scala.annotation.tailrec import akka.actor.ActorSystem +import akka.util.Timeout object TestActor { type Ignore = Option[PartialFunction[AnyRef, Boolean]] @@ -644,5 +645,5 @@ trait ImplicitSender { this: TestKit ⇒ } trait DefaultTimeout { this: TestKit ⇒ - implicit val timeout = system.settings.ActorTimeout + implicit val timeout: Timeout = testKitSettings.DefaultTimeout } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala b/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala index ada5a4fd30..71ba8d0eac 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala @@ -5,6 +5,7 @@ package akka.testkit import com.typesafe.config.Config import akka.util.Duration +import akka.util.Timeout import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.{ ExtensionId, ActorSystem, Extension, ExtendedActorSystem } @@ -20,4 +21,5 @@ class TestKitSettings(val config: Config) extends Extension { val TestTimeFactor = getDouble("akka.test.timefactor") val SingleExpectDefaultTimeout = Duration(getMilliseconds("akka.test.single-expect-default"), MILLISECONDS) val TestEventFilterLeeway = Duration(getMilliseconds("akka.test.filter-leeway"), MILLISECONDS) + val DefaultTimeout = Timeout(Duration(getMilliseconds("akka.test.default-timeout"), MILLISECONDS)) } \ No newline at end of file diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 172bdc230f..95ce267320 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -122,7 +122,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { try { var locker = Seq.empty[DeadLetter] - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = TestKitExtension(system).DefaultTimeout implicit val davyJones = otherSystem.actorOf(Props(new Actor { def receive = { case m: DeadLetter ⇒ locker :+= m diff --git a/akka-transactor/src/main/resources/reference.conf b/akka-transactor/src/main/resources/reference.conf new file mode 100644 index 0000000000..d91a50db33 --- /dev/null +++ b/akka-transactor/src/main/resources/reference.conf @@ -0,0 +1,13 @@ +######################################### +# Akka Transactor Reference Config File # +######################################### + +# This the reference config file has all the default settings. +# Make your edits/overrides in your application.conf. + +akka { + transactor { + # The timeout used for coordinated transactions across actors + coordinated-timeout = 5s + } +} diff --git a/akka-transactor/src/main/scala/akka/transactor/Transactor.scala b/akka-transactor/src/main/scala/akka/transactor/Transactor.scala index 627a5ab249..6e390a6623 100644 --- a/akka-transactor/src/main/scala/akka/transactor/Transactor.scala +++ b/akka-transactor/src/main/scala/akka/transactor/Transactor.scala @@ -93,6 +93,8 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None) * @see [[akka.transactor.Coordinated]] for more information about the underlying mechanism */ trait Transactor extends Actor { + private val settings = TransactorExtension(context.system) + /** * Implement a general pattern for using coordinated transactions. */ @@ -108,7 +110,7 @@ trait Transactor extends Actor { } case message ⇒ { if (normally.isDefinedAt(message)) normally(message) - else receive(Coordinated(message)(context.system.settings.ActorTimeout)) + else receive(Coordinated(message)(settings.CoordinatedTimeout)) } } diff --git a/akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala b/akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala new file mode 100644 index 0000000000..96aea8904c --- /dev/null +++ b/akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.transactor + +import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ExtendedActorSystem } +import akka.actor.Extension +import com.typesafe.config.Config +import akka.util.Timeout +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS + +/** + * TransactorExtension is an Akka Extension to hold settings for transactors. + */ +object TransactorExtension extends ExtensionId[TransactorSettings] with ExtensionIdProvider { + override def get(system: ActorSystem): TransactorSettings = super.get(system) + override def lookup = TransactorExtension + override def createExtension(system: ExtendedActorSystem): TransactorSettings = new TransactorSettings(system.settings.config) +} + +class TransactorSettings(val config: Config) extends Extension { + import config._ + val CoordinatedTimeout = Timeout(Duration(getMilliseconds("akka.transactor.coordinated-timeout"), MILLISECONDS)) +} \ No newline at end of file diff --git a/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala b/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala index 0ffbd5e65c..353695fd73 100644 --- a/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala +++ b/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala @@ -12,6 +12,8 @@ import java.util.{ Set ⇒ JSet } * An UntypedActor version of transactor for using from Java. */ abstract class UntypedTransactor extends UntypedActor { + private val settings = TransactorExtension(context.system) + /** * Implement a general pattern for using coordinated transactions. */ @@ -29,7 +31,7 @@ abstract class UntypedTransactor extends UntypedActor { } case message ⇒ { val normal = normally(message) - if (!normal) onReceive(Coordinated(message)(context.system.settings.ActorTimeout)) + if (!normal) onReceive(Coordinated(message)(settings.CoordinatedTimeout)) } } } diff --git a/akka-zeromq/src/main/resources/reference.conf b/akka-zeromq/src/main/resources/reference.conf index cfb5756156..b94a442c78 100644 --- a/akka-zeromq/src/main/resources/reference.conf +++ b/akka-zeromq/src/main/resources/reference.conf @@ -1,6 +1,6 @@ -############################## -# Akka Reference Config File # -############################## +##################################### +# Akka ZeroMQ Reference Config File # +##################################### # This the reference config file has all the default settings. # Make your edits/overrides in your application.conf. @@ -12,6 +12,9 @@ akka { # The default timeout for a poll on the actual zeromq socket. poll-timeout = 100ms + # Timeout for creating a new socket + new-socket-timeout = 5s + socket-dispatcher { # A zeromq socket needs to be pinned to the thread that created it. # Changing this value results in weird errors and race conditions within zeromq diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala index 254a097d80..82a07d7aa3 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala @@ -187,12 +187,9 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A fromConfig getOrElse context.system.dispatcher } - private val defaultPollTimeout = - Duration(context.system.settings.config.getMilliseconds("akka.zeromq.poll-timeout"), TimeUnit.MILLISECONDS) - private val pollTimeout = { val fromConfig = params collectFirst { case PollTimeoutDuration(duration) ⇒ duration } - fromConfig getOrElse defaultPollTimeout + fromConfig getOrElse ZeroMQExtension(context.system).DefaultPollTimeout } private def newEventLoop: Option[Promise[PollLifeCycle]] = { diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala index 7ae178291f..25b35e4644 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala @@ -7,6 +7,9 @@ import org.zeromq.{ ZMQ ⇒ JZMQ } import akka.actor._ import akka.dispatch.{ Await } import akka.pattern.ask +import akka.util.Duration +import java.util.concurrent.TimeUnit +import akka.util.Timeout /** * A Model to represent a version of the zeromq library @@ -43,6 +46,9 @@ object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProv */ class ZeroMQExtension(system: ActorSystem) extends Extension { + val DefaultPollTimeout = Duration(system.settings.config.getMilliseconds("akka.zeromq.poll-timeout"), TimeUnit.MILLISECONDS) + val NewSocketTimeout = Timeout(Duration(system.settings.config.getMilliseconds("akka.zeromq.new-socket-timeout"), TimeUnit.MILLISECONDS)) + /** * The version of the ZeroMQ library * @return a [[akka.zeromq.ZeroMQVersion]] @@ -136,7 +142,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension { * @return the [[akka.actor.ActorRef]] */ def newSocket(socketParameters: SocketOption*): ActorRef = { - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = NewSocketTimeout val req = (zeromqGuardian ? newSocketProps(socketParameters: _*)).mapTo[ActorRef] Await.result(req, timeout.duration) } From be708012899b9014678b3efc928960b0a5fb07d7 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 10 Feb 2012 17:14:05 +0100 Subject: [PATCH 2/6] DOC: Corrections for the MultiJVM doc. See #1801 --- akka-docs/dev/multi-jvm-testing.rst | 46 ++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/akka-docs/dev/multi-jvm-testing.rst b/akka-docs/dev/multi-jvm-testing.rst index 85b8ae5027..18154b6c27 100644 --- a/akka-docs/dev/multi-jvm-testing.rst +++ b/akka-docs/dev/multi-jvm-testing.rst @@ -18,28 +18,46 @@ The multi-JVM testing is an sbt plugin that you can find here: http://github.com/typesafehub/sbt-multi-jvm -You can add it as a plugin by adding the following to your plugins/build.sbt:: +You can add it as a plugin by adding the following to your project/plugins.sbt:: resolvers += Classpaths.typesafeResolver addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.1.9") -You can then add multi-JVM testing to a project by including the ``MultiJvm`` +You can then add multi-JVM testing to ``project/Build.scala`` by including the ``MultiJvm`` settings and config. For example, here is how the akka-remote project adds multi-JVM testing:: - import MultiJvmPlugin.{ MultiJvm, extraOptions } + import sbt._ + import Keys._ + import com.typesafe.sbtmultijvm.MultiJvmPlugin + import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions } - lazy val cluster = Project( - id = "akka-remote", - base = file("akka-remote"), - settings = defaultSettings ++ MultiJvmPlugin.settings ++ Seq( - extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src => - (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dconfig.file=" + _.absolutePath).toSeq - }, - test in Test <<= (test in Test) dependsOn (test in MultiJvm) + object AkkaBuild extends Build { + + lazy val remote = Project( + id = "akka-remote", + base = file("akka-remote"), + settings = defaultSettings ++ MultiJvmPlugin.settings ++ Seq( + extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src => + (name: String) => (src ** (name + ".conf")).get.headOption.map("-Dconfig.file=" + _.absolutePath).toSeq + }, + test in Test <<= (test in Test) dependsOn (test in MultiJvm) + ) + ) configs (MultiJvm) + + lazy val buildSettings = Defaults.defaultSettings ++ Seq( + organization := "com.typesafe.akka", + version := "2.0-SNAPSHOT", + scalaVersion := "2.9.1", + crossPaths := false ) - ) configs (MultiJvm) + + lazy val defaultSettings = buildSettings ++ Seq( + resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/" + ) + + } You can specify JVM options for the forked JVMs:: @@ -87,8 +105,8 @@ options after the test names and ``--``. For example: Creating application tests ========================== -The tests are discovered, and combined, through a naming convention. A test is -named with the following pattern: +The tests are discovered, and combined, through a naming convention. MultiJvm tests are +located in ``src/multi-jvm/scala`` directory. A test is named with the following pattern: .. code-block:: none From 49f101d1862bc550d8b0114e50bafd0045b646db Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sun, 12 Feb 2012 19:20:00 +0100 Subject: [PATCH 3/6] #1813 - Adding clarification regarding JMM and Futures --- akka-docs/general/jmm.rst | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/akka-docs/general/jmm.rst b/akka-docs/general/jmm.rst index 7d806f9ac8..3fe94d89db 100644 --- a/akka-docs/general/jmm.rst +++ b/akka-docs/general/jmm.rst @@ -45,6 +45,18 @@ To prevent visibility and reordering problems on actors, Akka guarantees the fol Both rules only apply for the same actor instance and are not valid if different actors are used. +Futures and the Java Memory Model +--------------------------------- + +The completion of a Future "happens before" the invocation of any callbacks registered to it are executed. + +We recommend not to close over non-final fields (final in Java and val in Scala), and if you *do* choose to close over +non-final fields, they must be marked *volatile* in order for the current value of the field to be visible to the callback. + +If you close over a reference, you must also ensure that the instance that is referred to is thread safe. +We highly recommend staying away from objects that use locking, since it can introduce performance problems and in the worst case, deadlocks. +Such are the perils of synchronized. + STM and the Java Memory Model ----------------------------- Akka's Software Transactional Memory (STM) also provides a "happens before" rule: From 1e879b6d03b9d4fb0e66f9d9da3c4d21311edebd Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Sun, 12 Feb 2012 23:09:47 +0100 Subject: [PATCH 4/6] Added HTTP docs with a sample application. See #1538 --- akka-docs/modules/code/Global.scala | 6 + .../akka/docs/http/PlayMiniApplication.scala | 128 ++++++++++++ akka-docs/modules/http.rst | 192 +++++++++++++++++- project/AkkaBuild.scala | 4 +- 4 files changed, 326 insertions(+), 4 deletions(-) create mode 100644 akka-docs/modules/code/Global.scala create mode 100644 akka-docs/modules/code/akka/docs/http/PlayMiniApplication.scala diff --git a/akka-docs/modules/code/Global.scala b/akka-docs/modules/code/Global.scala new file mode 100644 index 0000000000..021e30d5b1 --- /dev/null +++ b/akka-docs/modules/code/Global.scala @@ -0,0 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +//#global +object Global extends com.typesafe.play.mini.Setup(akka.docs.http.PlayMiniApplication) +//#global \ No newline at end of file diff --git a/akka-docs/modules/code/akka/docs/http/PlayMiniApplication.scala b/akka-docs/modules/code/akka/docs/http/PlayMiniApplication.scala new file mode 100644 index 0000000000..15c0de0ac5 --- /dev/null +++ b/akka-docs/modules/code/akka/docs/http/PlayMiniApplication.scala @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.docs.http + +//#imports +import com.typesafe.play.mini.{ POST, GET, Path, Application } +import play.api.mvc.{ Action, AsyncResult } +import play.api.mvc.Results._ +import play.api.libs.concurrent._ +import play.api.data._ +import play.api.data.Forms._ +import akka.pattern.ask +import akka.util.Timeout +import akka.util.duration._ +import akka.actor.{ ActorSystem, Props, Actor } +import scala.collection.mutable.{ Map ⇒ MutableMap } +//#imports + +//#playMiniDefinition +object PlayMiniApplication extends Application { + //#playMiniDefinition + private val system = ActorSystem("sample") + //#regexURI + private final val StatementPattern = """/account/statement/(\w+)""".r + //#regexURI + private lazy val accountActor = system.actorOf(Props[AccountActor]) + implicit val timeout = Timeout(1000 milliseconds) + + //#route + def route = { + //#routeLogic + //#simpleGET + case GET(Path("/ping")) ⇒ Action { + Ok("Pong @ " + System.currentTimeMillis) + } + //#simpleGET + //#regexGET + case GET(Path(StatementPattern(accountId))) ⇒ Action { + AsyncResult { + //#innerRegexGET + (accountActor ask Status(accountId)).mapTo[Int].asPromise.map { r ⇒ + if (r >= 0) Ok("Account total: " + r) + else BadRequest("Unknown account: " + accountId) + } + //#innerRegexGET + } + } + //#regexGET + //#asyncDepositPOST + case POST(Path("/account/deposit")) ⇒ Action { implicit request ⇒ + //#formAsyncDepositPOST + val (accountId, amount) = commonForm.bindFromRequest.get + //#formAsyncDepositPOST + AsyncResult { + (accountActor ask Deposit(accountId, amount)).mapTo[Int].asPromise.map { r ⇒ Ok("Updated account total: " + r) } + } + } + //#asyncDepositPOST + //#asyncWithdrawPOST + case POST(Path("/account/withdraw")) ⇒ Action { implicit request ⇒ + val (accountId, amount) = commonForm.bindFromRequest.get + AsyncResult { + (accountActor ask Withdraw(accountId, amount)).mapTo[Int].asPromise.map { r ⇒ + if (r >= 0) Ok("Updated account total: " + r) + else BadRequest("Unknown account or insufficient funds. Get your act together.") + } + } + } + //#asyncWithdrawPOST + //#routeLogic + } + //#route + + //#form + val commonForm = Form( + tuple( + "accountId" -> nonEmptyText, + "amount" -> number(min = 1))) + //#form +} + +//#cases +case class Status(accountId: String) +case class Deposit(accountId: String, amount: Int) +case class Withdraw(accountId: String, amount: Int) +//#cases + +//#actor +class AccountActor extends Actor { + var accounts = MutableMap[String, Int]() + + //#receive + def receive = { + //#senderBang + case Status(accountId) ⇒ sender ! accounts.getOrElse(accountId, -1) + //#senderBang + case Deposit(accountId, amount) ⇒ sender ! deposit(accountId, amount) + case Withdraw(accountId, amount) ⇒ sender ! withdraw(accountId, amount) + } + //#receive + + private def deposit(accountId: String, amount: Int): Int = { + accounts.get(accountId) match { + case Some(value) ⇒ + val newValue = value + amount + accounts += accountId -> newValue + newValue + case None ⇒ + accounts += accountId -> amount + amount + } + } + + private def withdraw(accountId: String, amount: Int): Int = { + accounts.get(accountId) match { + case Some(value) ⇒ + if (value < amount) -1 + else { + val newValue = value - amount + accounts += accountId -> newValue + newValue + } + case None ⇒ -1 + } + } + //#actor +} \ No newline at end of file diff --git a/akka-docs/modules/http.rst b/akka-docs/modules/http.rst index 8388d65702..97978ed5f5 100644 --- a/akka-docs/modules/http.rst +++ b/akka-docs/modules/http.rst @@ -7,8 +7,194 @@ HTTP .. contents:: :local: -Play! ------ +Play2-mini +---------- +The Akka team recommends the `Play2-mini `_ framework when building RESTful +service applications that integrates with Akka. It provides a REST API on top of `Play2 `_. -Akka will recommend using `Play! Mini `_ +Getting started +--------------- + +First you must make your application aware of play-mini. +In SBT you just have to add the following to your _libraryDependencies_:: + + libraryDependencies += "com.typesafe" %% "play-mini" % "2.0-RC1-SNAPSHOT" + +Sample Application +------------------ + +To illustrate how easy it is to wire a RESTful service with Akka we will use a sample application. +The aim of the application is to show how to use play-mini and Akka in combination. Do not put too much +attention on the actual business logic itself, which is a extremely simple bank application, as building a bank +application is a little more complex than what's shown in the sample... + +The application should support the following URL commands: + - GET /ping - returns a Pong message with the time of the server (used to see if the application is up and running) + - GET /account/statement/{accountId} - returns the account statement + - POST /account/deposit - deposits money to an account (and creates a new one if it's not already existing) + - POST /account/withdraw - withdraws money from an account + +Error messages will be returned in case of any misuse of the application, e.g. withdrawing more money than an +account has etc. + +Getting started +--------------- + +To build a play-mini application you first have to make your object extend com.typesafe.play.mini.Application: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: playMiniDefinition + +The next step is to implement the mandatory method ``route``: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: route + :exclude: routeLogic + +It is inside the ``route`` method that all the magic happens. +In the sections below we will show how to set up play-mini to handle both GET and POST HTTP calls. + +Simple GET +---------- + +We start off by creating the simplest method we can - a "ping" method: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: simpleGET + +As you can see in the section above play-mini uses Scala's wonderful pattern matching. +In the snippet we instruct play-mini to reply to all HTTP GET calls with the URI "/ping". +The ``Action`` returned comes from Play! and you can find more information about it `here `_. + +.. _Advanced-GET: + +Advanced GET +------------ + +Let's try something more advanced, retrieving parameters from the URI and also make an asynchronous call to an actor: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: regexGET + +The regular expression looks like this: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: regexURI + +In the snippets above we extract a URI parameter with the help of a simple regular expression and then we pass this +parameter on to the underlying actor system. As you can see ``AsyncResult`` is being used. This means that the call to +the actor will be performed asynchronously, i.e. no blocking. + +The asynchronous call to the actor is being done with a ``ask``, e.g.:: + + (accountActor ask Status(accountId)) + +The actor that receives the message returns the result by using a standard *sender !* +as can be seen here: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: senderBang + +When the result is returned to the calling code we use some mapping code in Play to convert a Akka future to a Play future. +This is shown in this code: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: innerRegexGET + +In this snippet we check the result to decide what type of response we want to send to the calling client. + +Using HTTP POST +--------------- + +Okay, in the sections above we have shown you how to use play-mini for HTTP GET calls. Let's move on to when the user +posts values to the application. + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: asyncDepositPOST + +As you can see the structure is almost the same as for the :ref:`Advanced-GET`. The difference is that we make the +``request`` parameter ``implicit`` and also that the following line of code is used to extract parameters from the POST. + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: formAsyncDepositPOST + +The code snippet used to map the call to parameters looks like this: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: form + +Apart from the mapping of parameters the call to the actor looks is done the same as in :ref:`Advanced-GET`. + +The Complete Code Sample +------------------------ + +Below is the complete application in all its beauty. + +Global.scala (/src/main/scala/Global.scala): + +.. includecode:: code/Global.scala + +PlayMiniApplication.scala (/src/main/scala/akka/docs/http/PlayMiniApplication.scala): + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + +Build.scala (/project/Build.scala): + +.. code-block:: scala + + import sbt._ + import Keys._ + + object PlayMiniApplicationBuild extends Build { + lazy val root = Project(id = "play-mini-application", base = file("."), settings = Project.defaultSettings).settings( + libraryDependencies += "com.typesafe" %% "play-mini" % "2.0-RC1-SNAPSHOT", + mainClass in (Compile, run) := Some("play.core.server.NettyServer")) + } + +Running the Application +----------------------- + +Firstly, start up the application by opening a command terminal and type:: + + > sbt + > run + +Now you should see something similar to this in your terminal window:: + + [info] Running play.core.server.NettyServer + Play server process ID is 2523 + [info] play - Application started (Prod) + [info] play - Listening for HTTP on port 9000... + +In this example we will use the awesome `cURL `_ command to interact with the application. +Fire up a command terminal and try the application out:: + + First we check the status of a couple of accounts: + > curl http://localhost:9000/account/statement/TheDudesAccount + Unknown account: TheDudesAccount + > curl http://localhost:9000/account/statement/MrLebowskisAccount + Unknown account: MrLebowskisAccount + + Now deposit some money to the accounts: + > curl -d "accountId=TheDudesAccount&amount=1000" http://localhost:9000/account/deposit + Updated account total: 1000 + > curl -d "accountId=MrLebowskisAccount&amount=500" http://localhost:9000/account/deposit + Updated account total: 500 + + Next thing is to check the status of the account: + > curl http://localhost:9000/account/statement/TheDudesAccount + Account total: 1000 + > curl http://localhost:9000/account/statement/MrLebowskisAccount + Account total: 500 + + Fair enough, let's try to withdraw some cash shall we: + > curl -d "accountId=TheDudesAccount&amount=999" http://localhost:9000/account/withdraw + Updated account total: 1 + > curl -d "accountId=MrLebowskisAccount&amount=999" http://localhost:9000/account/withdraw + Unknown account or insufficient funds. Get your act together. + > curl -d "accountId=MrLebowskisAccount&amount=500" http://localhost:9000/account/withdraw + Updated account total: 0 + +Yeah, it works! +Now we leave it to the astute reader of this document to take advantage of the power of play-mini and Akka. \ No newline at end of file diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 7ad4be2a9f..3d2b17b63b 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -473,7 +473,7 @@ object Dependencies { val tutorials = Seq(Test.scalatest, Test.junit) - val docs = Seq(Test.scalatest, Test.junit) + val docs = Seq(Test.scalatest, Test.junit, playMini) val zeroMQ = Seq(Test.scalatest, Test.junit, protobuf, Dependency.zeroMQ) } @@ -497,6 +497,7 @@ object Dependency { val Slf4j = "1.6.4" val Spring = "3.0.5.RELEASE" val Zookeeper = "3.4.0" + val PlayMini = "2.0-RC1-SNAPSHOT" } // Compile @@ -533,6 +534,7 @@ object Dependency { val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % V.Zookeeper // ApacheV2 val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % V.Zookeeper // ApacheV2 val zeroMQ = "org.zeromq" %% "zeromq-scala-binding" % "0.0.3" // ApacheV2 + val playMini = "com.typesafe" % "play-mini_2.9.1" % V.PlayMini // Provided From 0cc34fdb3c3e680a7b42e43a7bdb0a85d84aec3c Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Mon, 13 Feb 2012 08:42:57 +0100 Subject: [PATCH 5/6] Added resolver to Typesafe snapshot repo used for play-mini. See #1538 --- project/AkkaBuild.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 3d2b17b63b..905e345dec 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -348,6 +348,7 @@ object AkkaBuild extends Build { lazy val defaultSettings = baseSettings ++ formatSettings ++ Seq( resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/", resolvers += "Twitter Public Repo" at "http://maven.twttr.com", // This will be going away with com.mongodb.async's next release + resolvers += "Typesafe Snapshot Repo" at "http://repo.typesafe.com/typesafe/snapshots/", // Used while play-mini is still on RC // compile options scalacOptions ++= Seq("-encoding", "UTF-8", "-deprecation", "-unchecked") ++ ( From 669a4ff9ca04c2422416435dc33a909ad7c5cc76 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 17:46:14 +0100 Subject: [PATCH 6/6] make unbalanced Address() constructor private, fix parsing, see #1806 --- .../scala/akka/actor/ActorLookupSpec.scala | 1 + .../src/main/scala/akka/actor/Address.scala | 38 ++++++++++--------- .../remoting/RemoteDeploymentDocSpec.scala | 2 +- .../remote/netty/NettyRemoteSupport.scala | 2 +- .../main/scala/akka/remote/netty/Server.scala | 4 +- .../akka/remote/RemoteDeployerSpec.scala | 2 +- .../scala/akka/remote/RemoteRouterSpec.scala | 2 +- 7 files changed, 27 insertions(+), 24 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala index 626e413ec8..299cc16679 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala @@ -305,6 +305,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { intercept[MalformedURLException] { ActorPath.fromString("://hallo") } intercept[MalformedURLException] { ActorPath.fromString("s://dd@:12") } intercept[MalformedURLException] { ActorPath.fromString("s://dd@h:hd") } + intercept[MalformedURLException] { ActorPath.fromString("a://l:1/b") } } } diff --git a/akka-actor/src/main/scala/akka/actor/Address.scala b/akka-actor/src/main/scala/akka/actor/Address.scala index d182a0e3b4..502ee56fdb 100644 --- a/akka-actor/src/main/scala/akka/actor/Address.scala +++ b/akka-actor/src/main/scala/akka/actor/Address.scala @@ -15,7 +15,7 @@ import java.net.MalformedURLException * for example a remote transport would want to associate additional * information with an address, then this must be done externally. */ -final case class Address(protocol: String, system: String, host: Option[String], port: Option[Int]) { +final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) { def this(protocol: String, system: String) = this(protocol, system, None, None) def this(protocol: String, system: String, host: String, port: Int) = this(protocol, system, Option(host), Some(port)) @@ -62,20 +62,25 @@ object RelativeActorPath { * This object serves as extractor for Scala and as address parser for Java. */ object AddressExtractor { - def unapply(addr: String): Option[Address] = { + def unapply(addr: String): Option[Address] = try { val uri = new URI(addr) - if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null)) None - else { - val addr = Address(uri.getScheme, if (uri.getUserInfo != null) uri.getUserInfo else uri.getHost, - if (uri.getUserInfo == null || uri.getHost == null) None else Some(uri.getHost), - if (uri.getPort < 0) None else Some(uri.getPort)) - Some(addr) - } + unapply(uri) } catch { case _: URISyntaxException ⇒ None } - } + + def unapply(uri: URI): Option[Address] = + if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null)) None + else if (uri.getUserInfo == null) { // case 1: “akka://system” + if (uri.getPort != -1) None + else Some(Address(uri.getScheme, uri.getHost)) + } else { // case 2: “akka://system@host:port” + if (uri.getHost == null || uri.getPort == -1) None + else Some( + if (uri.getUserInfo == null) Address(uri.getScheme, uri.getHost) + else Address(uri.getScheme, uri.getUserInfo, uri.getHost, uri.getPort)) + } /** * Try to construct an Address from the given String or throw a java.net.MalformedURLException. @@ -92,18 +97,15 @@ object AddressExtractor { } object ActorPathExtractor { - def unapply(addr: String): Option[(Address, Iterable[String])] = { + def unapply(addr: String): Option[(Address, Iterable[String])] = try { val uri = new URI(addr) - if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null) || uri.getPath == null) None - else { - val addr = Address(uri.getScheme, if (uri.getUserInfo != null) uri.getUserInfo else uri.getHost, - if (uri.getUserInfo == null || uri.getHost == null) None else Some(uri.getHost), - if (uri.getPort < 0) None else Some(uri.getPort)) - Some((addr, ActorPath.split(uri.getPath).drop(1))) + if (uri.getPath == null) None + else AddressExtractor.unapply(uri) match { + case None ⇒ None + case Some(addr) ⇒ Some((addr, ActorPath.split(uri.getPath).drop(1))) } } catch { case _: URISyntaxException ⇒ None } - } } \ No newline at end of file diff --git a/akka-docs/scala/code/akka/docs/remoting/RemoteDeploymentDocSpec.scala b/akka-docs/scala/code/akka/docs/remoting/RemoteDeploymentDocSpec.scala index 0a8785444f..a8d75512c5 100644 --- a/akka-docs/scala/code/akka/docs/remoting/RemoteDeploymentDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/remoting/RemoteDeploymentDocSpec.scala @@ -28,7 +28,7 @@ class RemoteDeploymentDocSpec extends AkkaSpec(""" import RemoteDeploymentDocSpec._ val other = ActorSystem("remote", system.settings.config) - val address = other.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address("akka", "s", Some("host"), Some(1))).get + val address = other.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address("akka", "s", "host", 1)).get override def atTermination() { other.shutdown() } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index a182948dba..8acd33c7fb 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -61,7 +61,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor case sa: InetSocketAddress ⇒ sa case x ⇒ throw new RemoteTransportException("unknown local address type " + x.getClass, null) } - _address.compareAndSet(null, Address("akka", remoteSettings.systemName, Some(settings.Hostname), Some(addr.getPort))) + _address.compareAndSet(null, Address("akka", remoteSettings.systemName, settings.Hostname, addr.getPort)) } def address = _address.get diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index 2c51875e9d..0bafb1c712 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -183,7 +183,7 @@ class RemoteServerHandler( instruction.getCommandType match { case CommandType.CONNECT if settings.UsePassiveConnections ⇒ val origin = instruction.getOrigin - val inbound = Address("akka", origin.getSystem, Some(origin.getHostname), Some(origin.getPort)) + val inbound = Address("akka", origin.getSystem, origin.getHostname, origin.getPort) val client = new PassiveRemoteClient(event.getChannel, netty, inbound) netty.bindClient(inbound, client) case CommandType.SHUTDOWN ⇒ //Will be unbound in channelClosed @@ -203,7 +203,7 @@ class RemoteServerHandler( private def getClientAddress(c: Channel): Option[Address] = c.getRemoteAddress match { - case inet: InetSocketAddress ⇒ Some(Address("akka", "unknown(yet)", Some(inet.getAddress.toString), Some(inet.getPort))) + case inet: InetSocketAddress ⇒ Some(Address("akka", "unknown(yet)", inet.getAddress.toString, inet.getPort)) case _ ⇒ None } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala index 1b250f1ea9..57d240e8d8 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala @@ -42,7 +42,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { service, deployment.get.config, RoundRobinRouter(3), - RemoteScope(Address("akka", "sys", Some("wallace"), Some(2552)))))) + RemoteScope(Address("akka", "sys", "wallace", 2552))))) } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index bcabd85098..bf79caf847 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -160,7 +160,7 @@ akka.actor.deployment { children must have size 2 val parents = children.map(_.parent) parents must have size 1 - parents.head.address must be(Address("akka", "remote_sys", Some("localhost"), Some(12347))) + parents.head.address must be(Address("akka", "remote_sys", "localhost", 12347)) children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347") system.stop(router) }