From f85645e46dbe3be651e1590afce41c77755c0764 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 10 Feb 2012 10:36:35 +0100 Subject: [PATCH 01/19] Removing the old work redistribution --- .../akka/dispatch/BalancingDispatcher.scala | 22 +------------------ 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 8542ac69c8..a6042046ff 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -36,7 +36,6 @@ class BalancingDispatcher( extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) - val rebalance = new AtomicBoolean(false) val messageQueue: MessageQueue = mailboxType match { case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { @@ -84,30 +83,11 @@ class BalancingDispatcher( protected[akka] override def unregister(actor: ActorCell) = { buddies.remove(actor) super.unregister(actor) - intoTheFray(except = actor) //When someone leaves, he tosses a friend into the fray } - def intoTheFray(except: ActorCell): Unit = - if (rebalance.compareAndSet(false, true)) { - try { - val i = buddies.iterator() - - @tailrec - def throwIn(): Unit = { - val n = if (i.hasNext) i.next() else null - if (n eq null) () - else if ((n ne except) && registerForExecution(n.mailbox, false, false)) () - else throwIn() - } - throwIn() - } finally { - rebalance.set(false) - } - } - override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) registerForExecution(receiver.mailbox, false, false) - intoTheFray(except = receiver) + //Somewhere around here we have to make sure that not only the intended actor is kept busy } } From e017aeef0826fc99722b74551f336d1751f0e884 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 10 Feb 2012 16:02:37 +0100 Subject: [PATCH 02/19] Replace akka.actor.timeout with specfic settings. See #1808 * ActorTimeout (akka.actor.timeout) was used to all sorts of things. * TestKit default-timeout * TypedActor timeout for non void methods * Transactor coordinated-timeout * ZeroMQ new-socket-timeout * And in various tests --- .../scala/akka/actor/GlobalActorSystem.scala | 9 +- .../main/scala/akka/dispatch/OldFuture.scala | 4 +- .../main/scala/akka/migration/package.scala | 2 +- .../java/akka/dispatch/JavaFutureTests.java | 31 +- .../scala/akka/actor/TypedActorSpec.scala | 2 +- .../src/test/scala/akka/pattern/AskSpec.scala | 4 +- akka-actor/src/main/resources/reference.conf | 17 +- .../main/scala/akka/actor/ActorSystem.scala | 1 - .../main/scala/akka/actor/TypedActor.scala | 13 +- akka-docs/general/configuration.rst | 18 ++ .../akka/docs/future/FutureDocTestBase.java | 265 +++++++++--------- .../code/akka/docs/jrouting/ParentActor.java | 6 +- .../code/akka/docs/future/FutureDocSpec.scala | 17 +- .../akka/docs/routing/RouterTypeExample.scala | 3 +- .../akka/remote/RemoteCommunicationSpec.scala | 4 +- .../src/main/resources/reference.conf | 3 + .../scala/akka/testkit/TestActorRef.scala | 2 +- .../src/main/scala/akka/testkit/TestKit.scala | 3 +- .../scala/akka/testkit/TestKitExtension.scala | 2 + .../test/scala/akka/testkit/AkkaSpec.scala | 2 +- .../src/main/resources/reference.conf | 13 + .../scala/akka/transactor/Transactor.scala | 4 +- .../akka/transactor/TransactorExtension.scala | 25 ++ .../akka/transactor/UntypedTransactor.scala | 4 +- akka-zeromq/src/main/resources/reference.conf | 9 +- .../akka/zeromq/ConcurrentSocketActor.scala | 5 +- .../scala/akka/zeromq/ZeroMQExtension.scala | 8 +- 27 files changed, 283 insertions(+), 193 deletions(-) create mode 100644 akka-transactor/src/main/resources/reference.conf create mode 100644 akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala diff --git a/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala b/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala index 694dd5d547..e08883c6ed 100644 --- a/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala +++ b/akka-actor-migration/src/main/scala/akka/actor/GlobalActorSystem.scala @@ -4,14 +4,21 @@ package akka.actor import java.io.File - import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions +import akka.util.Timeout +import akka.util.duration._ @deprecated("use ActorSystem instead", "2.0") object GlobalActorSystem extends ActorSystemImpl("GlobalSystem", OldConfigurationLoader.defaultConfig) { start() + + /** + * Timeout used in `OldFuture.get` and default implicit ask timeout. + * Hard coded since the migration kit is not intended to be used for production anyway. + */ + val AwaitTimeout = Timeout(5 seconds) } /** diff --git a/akka-actor-migration/src/main/scala/akka/dispatch/OldFuture.scala b/akka-actor-migration/src/main/scala/akka/dispatch/OldFuture.scala index 89941cd066..60029e256a 100644 --- a/akka-actor-migration/src/main/scala/akka/dispatch/OldFuture.scala +++ b/akka-actor-migration/src/main/scala/akka/dispatch/OldFuture.scala @@ -18,13 +18,13 @@ class OldFuture[T](future: Future[T]) { @deprecated("use akka.dispatch.Await.result instead", "2.0") def get: T = try { - Await.result(future, GlobalActorSystem.settings.ActorTimeout.duration) + Await.result(future, GlobalActorSystem.AwaitTimeout.duration) } catch { case e: TimeoutException ⇒ throw new FutureTimeoutException(e.getMessage, e) } @deprecated("use akka.dispatch.Await.ready instead", "2.0") - def await: Future[T] = await(GlobalActorSystem.settings.ActorTimeout.duration) + def await: Future[T] = await(GlobalActorSystem.AwaitTimeout.duration) @deprecated("use akka.dispatch.Await.ready instead", "2.0") def await(atMost: Duration) = try { diff --git a/akka-actor-migration/src/main/scala/akka/migration/package.scala b/akka-actor-migration/src/main/scala/akka/migration/package.scala index 6b8cad6fe2..469604e464 100644 --- a/akka-actor-migration/src/main/scala/akka/migration/package.scala +++ b/akka-actor-migration/src/main/scala/akka/migration/package.scala @@ -14,7 +14,7 @@ package object migration { implicit def future2OldFuture[T](future: Future[T]): OldFuture[T] = new OldFuture[T](future) - implicit def askTimeout: Timeout = GlobalActorSystem.settings.ActorTimeout + implicit def askTimeout: Timeout = GlobalActorSystem.AwaitTimeout implicit def defaultDispatcher: MessageDispatcher = GlobalActorSystem.dispatcher diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index f494fd7d81..b03fe3b5fc 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -5,6 +5,7 @@ import akka.actor.ActorSystem; import akka.japi.*; import akka.util.Duration; +import akka.testkit.TestKitExtension; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -28,7 +29,7 @@ public class JavaFutureTests { @BeforeClass public static void beforeAll() { system = ActorSystem.create("JavaFutureTests", AkkaSpec.testConf()); - t = system.settings().ActorTimeout(); + t = TestKitExtension.get(system).DefaultTimeout(); } @AfterClass @@ -61,10 +62,10 @@ public class JavaFutureTests { Promise cf = Futures.promise(system.dispatcher()); Future f = cf; f.onSuccess(new OnSuccess() { - public void onSuccess(String result) { - if (result.equals("foo")) - latch.countDown(); - } + public void onSuccess(String result) { + if (result.equals("foo")) + latch.countDown(); + } }); cf.success("foo"); @@ -78,10 +79,10 @@ public class JavaFutureTests { Promise cf = Futures.promise(system.dispatcher()); Future f = cf; f.onFailure(new OnFailure() { - public void onFailure(Throwable t) { - if (t instanceof NullPointerException) - latch.countDown(); - } + public void onFailure(Throwable t) { + if (t instanceof NullPointerException) + latch.countDown(); + } }); Throwable exception = new NullPointerException(); @@ -296,8 +297,10 @@ public class JavaFutureTests { Promise p = Futures.promise(system.dispatcher()); Future f = p.future().recover(new Recover() { public Object recover(Throwable t) throws Throwable { - if (t == fail) return "foo"; - else throw t; + if (t == fail) + return "foo"; + else + throw t; } }); Duration d = Duration.create(1, TimeUnit.SECONDS); @@ -311,8 +314,10 @@ public class JavaFutureTests { Promise p = Futures.promise(system.dispatcher()); Future f = p.future().recoverWith(new Recover>() { public Future recover(Throwable t) throws Throwable { - if (t == fail) return Futures.successful("foo", system.dispatcher()).future(); - else throw t; + if (t == fail) + return Futures. successful("foo", system.dispatcher()).future(); + else + throw t; } }); Duration d = Duration.create(1, TimeUnit.SECONDS); diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 4e8bc4d7b4..5a9fab6c63 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -113,7 +113,7 @@ object TypedActorSpec { } def futureComposePigdogFrom(foo: Foo): Future[String] = { - implicit val timeout = TypedActor.context.system.settings.ActorTimeout + implicit val timeout = TypedActor(TypedActor.context.system).DefaultReturnTimeout foo.futurePigdog(500).map(_.toUpperCase) } diff --git a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala index ecb9690594..f3c36665e8 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala @@ -5,8 +5,9 @@ package akka.pattern import akka.testkit.AkkaSpec import akka.util.duration._ +import akka.testkit.DefaultTimeout -class AskSpec extends AkkaSpec { +class AskSpec extends AkkaSpec with DefaultTimeout { "The “ask” pattern" must { @@ -22,7 +23,6 @@ class AskSpec extends AkkaSpec { "return broken promises on EmptyLocalActorRefs" in { val empty = system.actorFor("unknown") - implicit val timeout = system.settings.ActorTimeout val f = empty ? 3.14 f.isCompleted must be(true) f.value.get match { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index bc52938a7d..62d49d61cd 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -1,6 +1,6 @@ -############################## -# Akka Reference Config File # -############################## +#################################### +# Akka Actor Reference Config File # +#################################### # This the reference config file has all the default settings. # Make your edits/overrides in your application.conf. @@ -50,12 +50,6 @@ akka { # removed from their parents reaper-interval = 5s - # Default timeout for Future based invocations - # - Actor: ask && ? - # - UntypedActor: ask - # - TypedActor: methods with non-void return type - timeout = 5s - # Serializes and deserializes (non-primitive) messages to ensure immutability, # this is only intended for testing. serialize-messages = off @@ -64,6 +58,11 @@ akka { # this is only intended for testing. serialize-creators = off + typed { + # Default timeout for typed actor methods with non-void return type + timeout = 5s + } + deployment { # deployment id pattern - on the format: /parent/child etc. diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index ac77628c2c..76eea494ca 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -70,7 +70,6 @@ object ActorSystem { final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) final val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS) - final val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS)) final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") final val SerializeAllCreators = getBoolean("akka.actor.serialize-creators") diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 98329203e9..3d1e0c76f3 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -11,7 +11,9 @@ import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } import akka.serialization.{ Serialization, SerializationExtension } import akka.dispatch._ import java.util.concurrent.TimeoutException +import java.util.concurrent.TimeUnit.MILLISECONDS import java.lang.IllegalStateException +import akka.util.Duration trait TypedActorFactory { @@ -502,7 +504,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( /** * @return a new TypedProps that will use the specified Timeout for its non-void-returning methods, - * if null is specified, it will use the default ActorTimeout as specified in the configuration. + * if null is specified, it will use the default timeout as specified in the configuration. * * Java API */ @@ -510,7 +512,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( /** * @return a new TypedProps that will use the specified Timeout for its non-void-returning methods, - * if None is specified, it will use the default ActorTimeout as specified in the configuration. + * if None is specified, it will use the default timeout as specified in the configuration. * * Scala API */ @@ -550,6 +552,11 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory val serialization = SerializationExtension(system) val settings = system.settings + /** + * Default timeout for typed actor methods with non-void return type + */ + final val DefaultReturnTimeout = Timeout(Duration(settings.config.getMilliseconds("akka.actor.typed.timeout"), MILLISECONDS)) + /** * Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found */ @@ -575,7 +582,7 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory new TypedActorInvocationHandler( this, actorVar, - if (props.timeout.isDefined) props.timeout.get else this.settings.ActorTimeout)).asInstanceOf[R] + if (props.timeout.isDefined) props.timeout.get else DefaultReturnTimeout)).asInstanceOf[R] proxyVar match { case null ⇒ diff --git a/akka-docs/general/configuration.rst b/akka-docs/general/configuration.rst index 3e2bca240d..649a6abc04 100644 --- a/akka-docs/general/configuration.rst +++ b/akka-docs/general/configuration.rst @@ -75,6 +75,24 @@ akka-testkit .. literalinclude:: ../../akka-testkit/src/main/resources/reference.conf :language: none +akka-transactor +~~~~~~~~~~~~~~~ + +.. literalinclude:: ../../akka-transactor/src/main/resources/reference.conf + :language: none + +akka-agent +~~~~~~~~~~ + +.. literalinclude:: ../../akka-agent/src/main/resources/reference.conf + :language: none + +akka-zeromq +~~~~~~~~~~~ + +.. literalinclude:: ../../akka-zeromq/src/main/resources/reference.conf + :language: none + akka-beanstalk-mailbox ~~~~~~~~~~~~~~~~~~~~~~ diff --git a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java index 8fc3b29b4e..e541c925c1 100644 --- a/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java +++ b/akka-docs/java/code/akka/docs/future/FutureDocTestBase.java @@ -78,7 +78,7 @@ public class FutureDocTestBase { ActorRef actor = system.actorOf(new Props(MyActor.class)); String msg = "hello"; //#ask-blocking - Timeout timeout = system.settings().ActorTimeout(); + Timeout timeout = new Timeout(Duration.parse("5 seconds")); Future future = Patterns.ask(actor, msg, timeout); String result = (String) Await.result(future, timeout.duration()); //#ask-blocking @@ -196,19 +196,17 @@ public class FutureDocTestBase { Iterable> listOfFutureInts = source; // now we have a Future[Iterable[Integer]] - Future> futureListOfInts = - sequence(listOfFutureInts, system.dispatcher()); + Future> futureListOfInts = sequence(listOfFutureInts, system.dispatcher()); // Find the sum of the odd numbers - Future futureSum = futureListOfInts.map( - new Mapper, Long>() { - public Long apply(Iterable ints) { - long sum = 0; - for (Integer i : ints) - sum += i; - return sum; - } - }); + Future futureSum = futureListOfInts.map(new Mapper, Long>() { + public Long apply(Iterable ints) { + long sum = 0; + for (Integer i : ints) + sum += i; + return sum; + } + }); long result = Await.result(futureSum, Duration.create(1, SECONDS)); //#sequence @@ -221,20 +219,18 @@ public class FutureDocTestBase { //Just a sequence of Strings Iterable listStrings = Arrays.asList("a", "b", "c"); - Future> futureResult = traverse(listStrings, - new Function>() { - public Future apply(final String r) { - return future(new Callable() { - public String call() { - return r.toUpperCase(); - } - }, system.dispatcher()); - } - }, system.dispatcher()); + Future> futureResult = traverse(listStrings, new Function>() { + public Future apply(final String r) { + return future(new Callable() { + public String call() { + return r.toUpperCase(); + } + }, system.dispatcher()); + } + }, system.dispatcher()); //Returns the sequence of strings as upper case - Iterable result = - Await.result(futureResult, Duration.create(1, SECONDS)); + Iterable result = Await.result(futureResult, Duration.create(1, SECONDS)); assertEquals(Arrays.asList("A", "B", "C"), result); //#traverse } @@ -250,12 +246,11 @@ public class FutureDocTestBase { Iterable> futures = source; //Start value is the empty string - Future resultFuture = fold("", futures, - new Function2() { - public String apply(String r, String t) { - return r + t; //Just concatenate - } - }, system.dispatcher()); + Future resultFuture = fold("", futures, new Function2() { + public String apply(String r, String t) { + return r + t; //Just concatenate + } + }, system.dispatcher()); String result = Await.result(resultFuture, Duration.create(1, SECONDS)); //#fold @@ -272,12 +267,11 @@ public class FutureDocTestBase { //A sequence of Futures, in this case Strings Iterable> futures = source; - Future resultFuture = reduce(futures, - new Function2() { - public Object apply(Object r, String t) { - return r + t; //Just concatenate - } - }, system.dispatcher()); + Future resultFuture = reduce(futures, new Function2() { + public Object apply(Object r, String t) { + return r + t; //Just concatenate + } + }, system.dispatcher()); Object result = Await.result(resultFuture, Duration.create(1, SECONDS)); //#reduce @@ -285,32 +279,35 @@ public class FutureDocTestBase { assertEquals("ab", result); } - @Test public void useSuccessfulAndFailed() { + @Test + public void useSuccessfulAndFailed() { //#successful Future future = Futures.successful("Yay!", system.dispatcher()); //#successful //#failed - Future otherFuture = - Futures.failed(new IllegalArgumentException("Bang!"), system.dispatcher()); + Future otherFuture = Futures.failed(new IllegalArgumentException("Bang!"), system.dispatcher()); //#failed Object result = Await.result(future, Duration.create(1, SECONDS)); - assertEquals("Yay!",result); + assertEquals("Yay!", result); Throwable result2 = Await.result(otherFuture.failed(), Duration.create(1, SECONDS)); - assertEquals("Bang!",result2.getMessage()); + assertEquals("Bang!", result2.getMessage()); } - @Test public void useFilter() { - //#filter + @Test + public void useFilter() { + //#filter Future future1 = Futures.successful(4, system.dispatcher()); - Future successfulFilter = - future1.filter(new Filter() { - public boolean filter(Integer i) { return i % 2 == 0; } - }); + Future successfulFilter = future1.filter(new Filter() { + public boolean filter(Integer i) { + return i % 2 == 0; + } + }); - Future failedFilter = - future1.filter(new Filter() { - public boolean filter(Integer i) { return i % 2 != 0; } - }); + Future failedFilter = future1.filter(new Filter() { + public boolean filter(Integer i) { + return i % 2 != 0; + } + }); //When filter fails, the returned Future will be failed with a scala.MatchError //#filter } @@ -323,138 +320,140 @@ public class FutureDocTestBase { } - @Test public void useAndThen() { + @Test + public void useAndThen() { //#and-then - Future future1 = Futures.successful("value", system.dispatcher()). - andThen(new OnComplete() { - public void onComplete(Throwable failure, String result) { - if (failure != null) sendToIssueTracker(failure); - } + Future future1 = Futures.successful("value", system.dispatcher()).andThen(new OnComplete() { + public void onComplete(Throwable failure, String result) { + if (failure != null) + sendToIssueTracker(failure); + } }).andThen(new OnComplete() { - public void onComplete(Throwable failure, String result) { - if (result != null) sendToTheInternetz(result); - } + public void onComplete(Throwable failure, String result) { + if (result != null) + sendToTheInternetz(result); + } }); //#and-then } - @Test public void useRecover() { + @Test + public void useRecover() { //#recover Future future = future(new Callable() { public Integer call() { return 1 / 0; } }, system.dispatcher()).recover(new Recover() { - public Integer recover(Throwable problem) throws Throwable { - if (problem instanceof ArithmeticException) return 0; - else throw problem; - } + public Integer recover(Throwable problem) throws Throwable { + if (problem instanceof ArithmeticException) + return 0; + else + throw problem; + } }); int result = Await.result(future, Duration.create(1, SECONDS)); assertEquals(result, 0); //#recover } - @Test public void useTryRecover() { + @Test + public void useTryRecover() { //#try-recover Future future = future(new Callable() { public Integer call() { return 1 / 0; } }, system.dispatcher()).recoverWith(new Recover>() { - public Future recover(Throwable problem) throws Throwable { - if (problem instanceof ArithmeticException) { - return future(new Callable() { - public Integer call() { - return 0; - } - }, system.dispatcher()); + public Future recover(Throwable problem) throws Throwable { + if (problem instanceof ArithmeticException) { + return future(new Callable() { + public Integer call() { + return 0; } - else throw problem; - } + }, system.dispatcher()); + } else + throw problem; + } }); int result = Await.result(future, Duration.create(1, SECONDS)); assertEquals(result, 0); //#try-recover } - @Test public void useOnSuccessOnFailureAndOnComplete() { - { + @Test + public void useOnSuccessOnFailureAndOnComplete() { + { Future future = Futures.successful("foo", system.dispatcher()); //#onSuccess future.onSuccess(new OnSuccess() { - public void onSuccess(String result) { - if ("bar" == result) { - //Do something if it resulted in "bar" - } else { - //Do something if it was some other String - } + public void onSuccess(String result) { + if ("bar" == result) { + //Do something if it resulted in "bar" + } else { + //Do something if it was some other String } + } }); //#onSuccess - } - { - Future future = - Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher()); - //#onFailure - future.onFailure( new OnFailure() { + } + { + Future future = Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher()); + //#onFailure + future.onFailure(new OnFailure() { public void onFailure(Throwable failure) { - if (failure instanceof IllegalStateException) { - //Do something if it was this particular failure - } else { - //Do something if it was some other failure - } + if (failure instanceof IllegalStateException) { + //Do something if it was this particular failure + } else { + //Do something if it was some other failure + } } }); //#onFailure - } - { - Future future = Futures.successful("foo", system.dispatcher()); - //#onComplete - future.onComplete(new OnComplete() { - public void onComplete(Throwable failure, String result) { - if (failure != null) { - //We got a failure, handle it here - } else { - // We got a result, do something with it - } - } - }); - //#onComplete - } + } + { + Future future = Futures.successful("foo", system.dispatcher()); + //#onComplete + future.onComplete(new OnComplete() { + public void onComplete(Throwable failure, String result) { + if (failure != null) { + //We got a failure, handle it here + } else { + // We got a result, do something with it + } + } + }); + //#onComplete + } } - @Test public void useOrAndZip(){ + @Test + public void useOrAndZip() { { - //#zip - Future future1 = Futures.successful("foo", system.dispatcher()); - Future future2 = Futures.successful("bar", system.dispatcher()); - Future future3 = - future1.zip(future2).map(new Mapper, String>() { - public String apply(scala.Tuple2 zipped) { - return zipped._1() + " " + zipped._2(); + //#zip + Future future1 = Futures.successful("foo", system.dispatcher()); + Future future2 = Futures.successful("bar", system.dispatcher()); + Future future3 = future1.zip(future2).map(new Mapper, String>() { + public String apply(scala.Tuple2 zipped) { + return zipped._1() + " " + zipped._2(); } - }); + }); - String result = Await.result(future3, Duration.create(1, SECONDS)); - assertEquals("foo bar", result); - //#zip + String result = Await.result(future3, Duration.create(1, SECONDS)); + assertEquals("foo bar", result); + //#zip } { - //#fallback-to - Future future1 = - Futures.failed(new IllegalStateException("OHNOES1"), system.dispatcher()); - Future future2 = - Futures.failed(new IllegalStateException("OHNOES2"), system.dispatcher()); - Future future3 = - Futures.successful("bar", system.dispatcher()); - Future future4 = - future1.fallbackTo(future2).fallbackTo(future3); // Will have "bar" in this case - String result = Await.result(future4, Duration.create(1, SECONDS)); - assertEquals("bar", result); - //#fallback-to + //#fallback-to + Future future1 = Futures.failed(new IllegalStateException("OHNOES1"), system.dispatcher()); + Future future2 = Futures.failed(new IllegalStateException("OHNOES2"), system.dispatcher()); + Future future3 = Futures.successful("bar", system.dispatcher()); + Future future4 = future1.fallbackTo(future2).fallbackTo(future3); // Will have "bar" in this case + String result = Await.result(future4, Duration.create(1, SECONDS)); + assertEquals("bar", result); + //#fallback-to } } diff --git a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java index 1119559489..cf1e2b9cee 100644 --- a/akka-docs/java/code/akka/docs/jrouting/ParentActor.java +++ b/akka-docs/java/code/akka/docs/jrouting/ParentActor.java @@ -54,9 +54,9 @@ public class ParentActor extends UntypedActor { ActorRef scatterGatherFirstCompletedRouter = getContext().actorOf( new Props(FibonacciActor.class).withRouter(new ScatterGatherFirstCompletedRouter(5, Duration .parse("2 seconds"))), "router"); - Timeout timeout = getContext().system().settings().ActorTimeout(); - Future futureResult = akka.pattern.Patterns.ask( - scatterGatherFirstCompletedRouter, new FibonacciActor.FibonacciNumber(10), timeout); + Timeout timeout = new Timeout(Duration.parse("5 seconds")); + Future futureResult = akka.pattern.Patterns.ask(scatterGatherFirstCompletedRouter, + new FibonacciActor.FibonacciNumber(10), timeout); int result = (Integer) Await.result(futureResult, timeout.duration()); //#scatterGatherFirstCompletedRouter System.out.println(String.format("The result of calculating Fibonacci for 10 is %d", result)); diff --git a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala index 098fe873ad..3d1ca946a7 100644 --- a/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/future/FutureDocSpec.scala @@ -11,6 +11,7 @@ import akka.actor.Props import akka.actor.Status.Failure import akka.dispatch.Future import akka.dispatch.Await +import akka.util.Timeout import akka.util.duration._ import akka.dispatch.Promise import java.lang.IllegalStateException @@ -46,8 +47,10 @@ class FutureDocSpec extends AkkaSpec { //#ask-blocking import akka.dispatch.Await import akka.pattern.ask + import akka.util.Timeout + import akka.util.duration._ - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) val future = actor ? msg // enabled by the “ask” import val result = Await.result(future, timeout.duration).asInstanceOf[String] //#ask-blocking @@ -57,7 +60,7 @@ class FutureDocSpec extends AkkaSpec { "demonstrate usage of mapTo" in { val actor = system.actorOf(Props[MyActor]) val msg = "hello" - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) //#map-to import akka.dispatch.Future import akka.pattern.ask @@ -164,7 +167,7 @@ class FutureDocSpec extends AkkaSpec { val actor3 = system.actorOf(Props[MyActor]) val msg1 = 1 val msg2 = 2 - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) import akka.dispatch.Await import akka.pattern.ask //#composing-wrong @@ -188,7 +191,7 @@ class FutureDocSpec extends AkkaSpec { val actor3 = system.actorOf(Props[MyActor]) val msg1 = 1 val msg2 = 2 - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) import akka.dispatch.Await import akka.pattern.ask //#composing @@ -208,7 +211,7 @@ class FutureDocSpec extends AkkaSpec { } "demonstrate usage of sequence with actors" in { - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) val oddActor = system.actorOf(Props[OddActor]) //#sequence-ask // oddActor returns odd numbers sequentially from 1 as a List[Future[Int]] @@ -256,7 +259,7 @@ class FutureDocSpec extends AkkaSpec { } "demonstrate usage of recover" in { - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) val actor = system.actorOf(Props[MyActor]) val msg1 = -1 //#recover @@ -268,7 +271,7 @@ class FutureDocSpec extends AkkaSpec { } "demonstrate usage of recoverWith" in { - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) val actor = system.actorOf(Props[MyActor]) val msg1 = -1 //#try-recover diff --git a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala index 2f0a1e634c..6ec475a874 100644 --- a/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala +++ b/akka-docs/scala/code/akka/docs/routing/RouterTypeExample.scala @@ -7,6 +7,7 @@ import akka.routing.{ ScatterGatherFirstCompletedRouter, BroadcastRouter, Random import annotation.tailrec import akka.actor.{ Props, Actor } import akka.util.duration._ +import akka.util.Timeout import akka.dispatch.Await import akka.pattern.ask import akka.routing.SmallestMailboxRouter @@ -80,7 +81,7 @@ class ParentActor extends Actor { val scatterGatherFirstCompletedRouter = context.actorOf( Props[FibonacciActor].withRouter(ScatterGatherFirstCompletedRouter( nrOfInstances = 5, within = 2 seconds)), "router") - implicit val timeout = context.system.settings.ActorTimeout + implicit val timeout = Timeout(5 seconds) val futureResult = scatterGatherFirstCompletedRouter ? FibonacciNumber(10) val result = Await.result(futureResult, timeout.duration) //#scatterGatherFirstCompletedRouter diff --git a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala index 88d80d6d81..73f8a98030 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteCommunicationSpec.scala @@ -44,7 +44,7 @@ akka { /looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345" } } -""") with ImplicitSender { +""") with ImplicitSender with DefaultTimeout { import RemoteCommunicationSpec._ @@ -59,8 +59,6 @@ akka { val here = system.actorFor("akka://remote_sys@localhost:12346/user/echo") - implicit val timeout = system.settings.ActorTimeout - override def atTermination() { other.shutdown() } diff --git a/akka-testkit/src/main/resources/reference.conf b/akka-testkit/src/main/resources/reference.conf index e4ae685f4d..5e70df9403 100644 --- a/akka-testkit/src/main/resources/reference.conf +++ b/akka-testkit/src/main/resources/reference.conf @@ -18,6 +18,9 @@ akka { # duration to wait in expectMsg and friends outside of within() block by default single-expect-default = 3s + # The timeout that is added as an implicit by DefaultTimeout trait + default-timeout = 5s + calling-thread-dispatcher { type = akka.testkit.CallingThreadDispatcherConfigurator } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 8769fdda51..8a2f61bf76 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -68,7 +68,7 @@ class TestActorRef[T <: Actor]( if (isTerminated) throw new IllegalActorStateException("underlying actor is terminated") underlying.actor.asInstanceOf[T] match { case null ⇒ - val t = underlying.system.settings.ActorTimeout + val t = TestKitExtension(_system).DefaultTimeout Await.result(this.?(InternalGetActor)(t), t.duration).asInstanceOf[T] case ref ⇒ ref } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 4155ea662d..bdfab36ede 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -11,6 +11,7 @@ import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque, TimeUnit, atom import atomic.AtomicInteger import scala.annotation.tailrec import akka.actor.ActorSystem +import akka.util.Timeout object TestActor { type Ignore = Option[PartialFunction[AnyRef, Boolean]] @@ -644,5 +645,5 @@ trait ImplicitSender { this: TestKit ⇒ } trait DefaultTimeout { this: TestKit ⇒ - implicit val timeout = system.settings.ActorTimeout + implicit val timeout: Timeout = testKitSettings.DefaultTimeout } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala b/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala index ada5a4fd30..71ba8d0eac 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKitExtension.scala @@ -5,6 +5,7 @@ package akka.testkit import com.typesafe.config.Config import akka.util.Duration +import akka.util.Timeout import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.{ ExtensionId, ActorSystem, Extension, ExtendedActorSystem } @@ -20,4 +21,5 @@ class TestKitSettings(val config: Config) extends Extension { val TestTimeFactor = getDouble("akka.test.timefactor") val SingleExpectDefaultTimeout = Duration(getMilliseconds("akka.test.single-expect-default"), MILLISECONDS) val TestEventFilterLeeway = Duration(getMilliseconds("akka.test.filter-leeway"), MILLISECONDS) + val DefaultTimeout = Timeout(Duration(getMilliseconds("akka.test.default-timeout"), MILLISECONDS)) } \ No newline at end of file diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 172bdc230f..95ce267320 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -122,7 +122,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { try { var locker = Seq.empty[DeadLetter] - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = TestKitExtension(system).DefaultTimeout implicit val davyJones = otherSystem.actorOf(Props(new Actor { def receive = { case m: DeadLetter ⇒ locker :+= m diff --git a/akka-transactor/src/main/resources/reference.conf b/akka-transactor/src/main/resources/reference.conf new file mode 100644 index 0000000000..d91a50db33 --- /dev/null +++ b/akka-transactor/src/main/resources/reference.conf @@ -0,0 +1,13 @@ +######################################### +# Akka Transactor Reference Config File # +######################################### + +# This the reference config file has all the default settings. +# Make your edits/overrides in your application.conf. + +akka { + transactor { + # The timeout used for coordinated transactions across actors + coordinated-timeout = 5s + } +} diff --git a/akka-transactor/src/main/scala/akka/transactor/Transactor.scala b/akka-transactor/src/main/scala/akka/transactor/Transactor.scala index 627a5ab249..6e390a6623 100644 --- a/akka-transactor/src/main/scala/akka/transactor/Transactor.scala +++ b/akka-transactor/src/main/scala/akka/transactor/Transactor.scala @@ -93,6 +93,8 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None) * @see [[akka.transactor.Coordinated]] for more information about the underlying mechanism */ trait Transactor extends Actor { + private val settings = TransactorExtension(context.system) + /** * Implement a general pattern for using coordinated transactions. */ @@ -108,7 +110,7 @@ trait Transactor extends Actor { } case message ⇒ { if (normally.isDefinedAt(message)) normally(message) - else receive(Coordinated(message)(context.system.settings.ActorTimeout)) + else receive(Coordinated(message)(settings.CoordinatedTimeout)) } } diff --git a/akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala b/akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala new file mode 100644 index 0000000000..96aea8904c --- /dev/null +++ b/akka-transactor/src/main/scala/akka/transactor/TransactorExtension.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.transactor + +import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ExtendedActorSystem } +import akka.actor.Extension +import com.typesafe.config.Config +import akka.util.Timeout +import akka.util.Duration +import java.util.concurrent.TimeUnit.MILLISECONDS + +/** + * TransactorExtension is an Akka Extension to hold settings for transactors. + */ +object TransactorExtension extends ExtensionId[TransactorSettings] with ExtensionIdProvider { + override def get(system: ActorSystem): TransactorSettings = super.get(system) + override def lookup = TransactorExtension + override def createExtension(system: ExtendedActorSystem): TransactorSettings = new TransactorSettings(system.settings.config) +} + +class TransactorSettings(val config: Config) extends Extension { + import config._ + val CoordinatedTimeout = Timeout(Duration(getMilliseconds("akka.transactor.coordinated-timeout"), MILLISECONDS)) +} \ No newline at end of file diff --git a/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala b/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala index 0ffbd5e65c..353695fd73 100644 --- a/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala +++ b/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala @@ -12,6 +12,8 @@ import java.util.{ Set ⇒ JSet } * An UntypedActor version of transactor for using from Java. */ abstract class UntypedTransactor extends UntypedActor { + private val settings = TransactorExtension(context.system) + /** * Implement a general pattern for using coordinated transactions. */ @@ -29,7 +31,7 @@ abstract class UntypedTransactor extends UntypedActor { } case message ⇒ { val normal = normally(message) - if (!normal) onReceive(Coordinated(message)(context.system.settings.ActorTimeout)) + if (!normal) onReceive(Coordinated(message)(settings.CoordinatedTimeout)) } } } diff --git a/akka-zeromq/src/main/resources/reference.conf b/akka-zeromq/src/main/resources/reference.conf index cfb5756156..b94a442c78 100644 --- a/akka-zeromq/src/main/resources/reference.conf +++ b/akka-zeromq/src/main/resources/reference.conf @@ -1,6 +1,6 @@ -############################## -# Akka Reference Config File # -############################## +##################################### +# Akka ZeroMQ Reference Config File # +##################################### # This the reference config file has all the default settings. # Make your edits/overrides in your application.conf. @@ -12,6 +12,9 @@ akka { # The default timeout for a poll on the actual zeromq socket. poll-timeout = 100ms + # Timeout for creating a new socket + new-socket-timeout = 5s + socket-dispatcher { # A zeromq socket needs to be pinned to the thread that created it. # Changing this value results in weird errors and race conditions within zeromq diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala index 254a097d80..82a07d7aa3 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala @@ -187,12 +187,9 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A fromConfig getOrElse context.system.dispatcher } - private val defaultPollTimeout = - Duration(context.system.settings.config.getMilliseconds("akka.zeromq.poll-timeout"), TimeUnit.MILLISECONDS) - private val pollTimeout = { val fromConfig = params collectFirst { case PollTimeoutDuration(duration) ⇒ duration } - fromConfig getOrElse defaultPollTimeout + fromConfig getOrElse ZeroMQExtension(context.system).DefaultPollTimeout } private def newEventLoop: Option[Promise[PollLifeCycle]] = { diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala index 7ae178291f..25b35e4644 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala @@ -7,6 +7,9 @@ import org.zeromq.{ ZMQ ⇒ JZMQ } import akka.actor._ import akka.dispatch.{ Await } import akka.pattern.ask +import akka.util.Duration +import java.util.concurrent.TimeUnit +import akka.util.Timeout /** * A Model to represent a version of the zeromq library @@ -43,6 +46,9 @@ object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProv */ class ZeroMQExtension(system: ActorSystem) extends Extension { + val DefaultPollTimeout = Duration(system.settings.config.getMilliseconds("akka.zeromq.poll-timeout"), TimeUnit.MILLISECONDS) + val NewSocketTimeout = Timeout(Duration(system.settings.config.getMilliseconds("akka.zeromq.new-socket-timeout"), TimeUnit.MILLISECONDS)) + /** * The version of the ZeroMQ library * @return a [[akka.zeromq.ZeroMQVersion]] @@ -136,7 +142,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension { * @return the [[akka.actor.ActorRef]] */ def newSocket(socketParameters: SocketOption*): ActorRef = { - implicit val timeout = system.settings.ActorTimeout + implicit val timeout = NewSocketTimeout val req = (zeromqGuardian ? newSocketProps(socketParameters: _*)).mapTo[ActorRef] Await.result(req, timeout.duration) } From 43913b0490a88235748f631896217b38f11a2ca6 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 10 Feb 2012 20:47:59 +0100 Subject: [PATCH 03/19] change IdentityHashComparator to fall back to a real one MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit System.identityHashCode is not guaranteed to be consistent with equals() (cannot be, just imagine more than 2^32 objects); fix it by checking equals in case 0 would be returned and fall back to a real Comparator in case that’s needed. --- .../scala/akka/dispatch/BalancingDispatcher.scala | 10 +++++++--- akka-actor/src/main/scala/akka/util/Helpers.scala | 14 ++++++++++++-- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index a6042046ff..d8274d810a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -9,7 +9,8 @@ import akka.actor.{ ActorCell, ActorRef } import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } import annotation.tailrec import java.util.concurrent.atomic.AtomicBoolean -import akka.util.Duration +import akka.util.{ Duration, Helpers } +import java.util.Comparator /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -35,10 +36,13 @@ class BalancingDispatcher( _shutdownTimeout: Duration) extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { - val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) + val buddies = new ConcurrentSkipListSet[ActorCell]( + Helpers.identityHashComparator(new Comparator[ActorCell] { + def compare(l: ActorCell, r: ActorCell) = l.self.path compareTo r.self.path + })) val messageQueue: MessageQueue = mailboxType match { - case u: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + case _: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope] } case BoundedMailbox(cap, timeout) ⇒ new QueueBasedMessageQueue with BoundedMessageQueueSemantics { diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index 60e6be8b65..25cb279f2e 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -21,8 +21,18 @@ object Helpers { if (diff > 0) 1 else if (diff < 0) -1 else 0 } - val IdentityHashComparator = new Comparator[AnyRef] { - def compare(a: AnyRef, b: AnyRef): Int = compareIdentityHash(a, b) + /** + * Create a comparator which will efficiently use `System.identityHashCode`, + * unless that happens to be the same for two non-equals objects, in which + * case the supplied “real” comparator is used; the comparator must be + * consistent with equals, otherwise it would not be an enhancement over + * the identityHashCode. + */ + def identityHashComparator[T <: AnyRef](comp: Comparator[T]): Comparator[T] = new Comparator[T] { + def compare(a: T, b: T): Int = compareIdentityHash(a, b) match { + case 0 if a != b ⇒ comp.compare(a, b) + case x ⇒ x + } } final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+~" From a2ef3eed7eb0e1764a32c3c018250406c126f8c0 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 10 Feb 2012 20:56:52 +0100 Subject: [PATCH 04/19] scaffolding: make debug printout more useful, add assertions --- .../akka/actor/dispatch/ActorModelSpec.scala | 22 ++++++++++++------- .../akka/dispatch/AbstractDispatcher.scala | 2 ++ .../akka/dispatch/BalancingDispatcher.scala | 4 ++-- .../main/scala/akka/dispatch/Mailbox.scala | 5 ++++- 4 files changed, 22 insertions(+), 11 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 46bf609c7a..4635fc4749 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -110,8 +110,9 @@ object ActorModelSpec { val stops = new AtomicLong(0) def getStats(actorRef: ActorRef) = { - stats.putIfAbsent(actorRef, new InterceptorStats) match { - case null ⇒ stats.get(actorRef) + val is = new InterceptorStats + stats.putIfAbsent(actorRef, is) match { + case null ⇒ is case other ⇒ other } } @@ -127,12 +128,12 @@ object ActorModelSpec { } protected[akka] abstract override def register(actor: ActorCell) { - getStats(actor.self).registers.incrementAndGet() + assert(getStats(actor.self).registers.incrementAndGet() == 1) super.register(actor) } protected[akka] abstract override def unregister(actor: ActorCell) { - getStats(actor.self).unregisters.incrementAndGet() + assert(getStats(actor.self).unregisters.incrementAndGet() == 1) super.unregister(actor) } @@ -351,7 +352,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa def flood(num: Int) { val cachedMessage = CountDownNStop(new CountDownLatch(num)) val stopLatch = new CountDownLatch(num) - val waitTime = (30 seconds).dilated.toMillis + val waitTime = (20 seconds).dilated.toMillis val boss = system.actorOf(Props(new Actor { def receive = { case "run" ⇒ for (_ ← 1 to num) (context.watch(context.actorOf(props))) ! cachedMessage @@ -368,13 +369,18 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa val buddies = dispatcher.buddies val mq = dispatcher.messageQueue - System.err.println("Buddies left: ") - buddies.toArray foreach { + System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhab) + buddies.toArray sorted new Ordering[AnyRef] { + def compare(l: AnyRef, r: AnyRef) = (l, r) match { + case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path.toString.compareTo(rr.self.path.toString) + } + } foreach { case cell: ActorCell ⇒ System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) } - System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages + " ") + System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages) + Iterator.continually(mq.dequeue) takeWhile (_ ne null) foreach System.err.println case _ ⇒ } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 1b31be630c..da0e4fdc6e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -260,6 +260,8 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext mailBox.cleanUp() } + def inhab = inhabitantsUpdater.get(this) + private val shutdownAction = new Runnable { @tailrec final def run() { diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index d8274d810a..4195d0ec61 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -81,11 +81,11 @@ class BalancingDispatcher( protected[akka] override def register(actor: ActorCell) = { super.register(actor) - buddies.add(actor) + assert(buddies.add(actor)) } protected[akka] override def unregister(actor: ActorCell) = { - buddies.remove(actor) + assert(buddies.remove(actor)) super.unregister(actor) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index cc15ae2173..32fce8564e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -190,7 +190,10 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue var nextMessage = systemDrain() try { while ((nextMessage ne null) && !isClosed) { - if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs) + if (debug) println(actor.self + " processing system message " + nextMessage + " with " + + (if (actor.childrenRefs.isEmpty) "no children" + else if (actor.childrenRefs.size > 20) actor.childrenRefs.size + " children" + else actor.childrenRefs.mkString("children:\n ", "\n ", ""))) actor systemInvoke nextMessage nextMessage = nextMessage.next // don’t ever execute normal message when system message present! From 880f09be2215c9e8ecacb6cc5e020af570bd2ac9 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 10 Feb 2012 21:29:11 +0100 Subject: [PATCH 05/19] special start-up sequence for actors on BalancingDispatcher Normally the ActorCell would register the actor with the dispatcher (yeah, I moved it into the logical order, because the other one was specifically done for BD but does not work out) and then dispatch the Create() message. This does not work for BD, because then the actor could potentiall process a message before Create() is enqueued, so override systemDispatch() to drop Create() and insert that during register() (which is called from attach()), making sure to achieve the following order: - enqueue Create() - register with dispatcher - add to buddies - schedule mailbox --- akka-actor/src/main/scala/akka/actor/ActorCell.scala | 4 ++-- .../scala/akka/dispatch/BalancingDispatcher.scala | 11 +++++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index aa718e12c8..f4112e5d37 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -290,10 +290,10 @@ private[akka] class ActorCell( // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ parent.sendSystemMessage(akka.dispatch.Supervise(self)) + dispatcher.attach(this) + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ dispatcher.systemDispatch(this, Create()) - - dispatcher.attach(this) } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 4195d0ec61..4ef7607016 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -79,9 +79,20 @@ class BalancingDispatcher( } } + protected[akka] override def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = + invocation match { + case Create() ⇒ + case x ⇒ super.systemDispatch(receiver, invocation) + } + protected[akka] override def register(actor: ActorCell) = { + val mbox = actor.mailbox + mbox.systemEnqueue(actor.self, Create()) + // must make sure that Create() is the first message enqueued in this mailbox super.register(actor) assert(buddies.add(actor)) + // must make sure that buddy-add is executed before the actor has had a chance to die + registerForExecution(mbox, false, true) } protected[akka] override def unregister(actor: ActorCell) = { From 5a9ec45d01b4cf4554cad9420d45091542ca7611 Mon Sep 17 00:00:00 2001 From: Roland Date: Fri, 10 Feb 2012 22:16:32 +0100 Subject: [PATCH 06/19] first stab at balancing algorithm: it passes the tests - add new config item "buddy-wakeup-threshold" which defaults to 5 - if BWT>=0, then check mailbox.numberOfMessages in case the target actor was not scheduled during dispatch and schedule a buddie if that is found >=BWT (BWT is a getfield) - if during unregister() there are messages in the queue, schedule a buddie This way people can tune which behavior they want, knowing full well that numberOfMessages is O(n). --- .../scala/akka/actor/dispatch/ActorModelSpec.scala | 3 ++- .../src/test/scala/akka/config/ConfigSpec.scala | 1 + akka-actor/src/main/resources/reference.conf | 7 +++++++ .../scala/akka/dispatch/BalancingDispatcher.scala | 14 ++++++++++---- .../src/main/scala/akka/dispatch/Dispatchers.scala | 3 ++- 5 files changed, 22 insertions(+), 6 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 4635fc4749..a735e7298b 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -546,7 +546,8 @@ object BalancingDispatcherModelSpec { Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, configureExecutor(), - Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), + config.getInt("buddy-wakeup-threshold")) with MessageDispatcherInterceptor override def dispatcher(): MessageDispatcher = instance } diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index dd5149ad8e..13bb3b4f27 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -53,6 +53,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { c.getMilliseconds("shutdown-timeout") must equal(1 * 1000) c.getInt("throughput") must equal(5) c.getMilliseconds("throughput-deadline-time") must equal(0) + c.getInt("buddy-wakeup-threshold") must equal(5) } //Fork join executor config diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index bc52938a7d..b7e0563339 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -246,6 +246,13 @@ akka { # mailbox is used. The Class of the FQCN must have a constructor with a # com.typesafe.config.Config parameter. mailbox-type = "" + + # For BalancingDispatcher: if during message enqueuing the target actor is + # already busy and at least this number of messages is currently in the queue, + # then wake up another actor from the same dispatcher at random. + # Set to -1 to disable (which will also skip the possibly expensive check; + # obtaining the mailbox size is O(n) for the default mailboxes). + buddy-wakeup-threshold = 5 } debug { diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 4ef7607016..63fafef7d1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -10,7 +10,7 @@ import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, Concur import annotation.tailrec import java.util.concurrent.atomic.AtomicBoolean import akka.util.{ Duration, Helpers } -import java.util.Comparator +import java.util.{ Comparator, Iterator } /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -33,7 +33,8 @@ class BalancingDispatcher( throughputDeadlineTime: Duration, mailboxType: MailboxType, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider, - _shutdownTimeout: Duration) + _shutdownTimeout: Duration, + buddyWakeupThreshold: Int) extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { val buddies = new ConcurrentSkipListSet[ActorCell]( @@ -98,11 +99,16 @@ class BalancingDispatcher( protected[akka] override def unregister(actor: ActorCell) = { assert(buddies.remove(actor)) super.unregister(actor) + if (messageQueue.hasMessages) registerOne() } override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) - registerForExecution(receiver.mailbox, false, false) - //Somewhere around here we have to make sure that not only the intended actor is kept busy + if (!registerForExecution(receiver.mailbox, false, false) && + buddyWakeupThreshold >= 0 && + messageQueue.numberOfMessages >= buddyWakeupThreshold) registerOne() } + + @tailrec private def registerOne(i: Iterator[ActorCell] = buddies.iterator): Unit = + if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) registerOne(i) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 8e99e05b06..b9fd3f784b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -189,7 +189,8 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP config.getInt("throughput"), Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, configureExecutor(), - Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), + config.getInt("buddy-wakeup-threshold")) /** * Returns the same dispatcher instance for each invocation From 1e879b6d03b9d4fb0e66f9d9da3c4d21311edebd Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Sun, 12 Feb 2012 23:09:47 +0100 Subject: [PATCH 07/19] Added HTTP docs with a sample application. See #1538 --- akka-docs/modules/code/Global.scala | 6 + .../akka/docs/http/PlayMiniApplication.scala | 128 ++++++++++++ akka-docs/modules/http.rst | 192 +++++++++++++++++- project/AkkaBuild.scala | 4 +- 4 files changed, 326 insertions(+), 4 deletions(-) create mode 100644 akka-docs/modules/code/Global.scala create mode 100644 akka-docs/modules/code/akka/docs/http/PlayMiniApplication.scala diff --git a/akka-docs/modules/code/Global.scala b/akka-docs/modules/code/Global.scala new file mode 100644 index 0000000000..021e30d5b1 --- /dev/null +++ b/akka-docs/modules/code/Global.scala @@ -0,0 +1,6 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +//#global +object Global extends com.typesafe.play.mini.Setup(akka.docs.http.PlayMiniApplication) +//#global \ No newline at end of file diff --git a/akka-docs/modules/code/akka/docs/http/PlayMiniApplication.scala b/akka-docs/modules/code/akka/docs/http/PlayMiniApplication.scala new file mode 100644 index 0000000000..15c0de0ac5 --- /dev/null +++ b/akka-docs/modules/code/akka/docs/http/PlayMiniApplication.scala @@ -0,0 +1,128 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.docs.http + +//#imports +import com.typesafe.play.mini.{ POST, GET, Path, Application } +import play.api.mvc.{ Action, AsyncResult } +import play.api.mvc.Results._ +import play.api.libs.concurrent._ +import play.api.data._ +import play.api.data.Forms._ +import akka.pattern.ask +import akka.util.Timeout +import akka.util.duration._ +import akka.actor.{ ActorSystem, Props, Actor } +import scala.collection.mutable.{ Map ⇒ MutableMap } +//#imports + +//#playMiniDefinition +object PlayMiniApplication extends Application { + //#playMiniDefinition + private val system = ActorSystem("sample") + //#regexURI + private final val StatementPattern = """/account/statement/(\w+)""".r + //#regexURI + private lazy val accountActor = system.actorOf(Props[AccountActor]) + implicit val timeout = Timeout(1000 milliseconds) + + //#route + def route = { + //#routeLogic + //#simpleGET + case GET(Path("/ping")) ⇒ Action { + Ok("Pong @ " + System.currentTimeMillis) + } + //#simpleGET + //#regexGET + case GET(Path(StatementPattern(accountId))) ⇒ Action { + AsyncResult { + //#innerRegexGET + (accountActor ask Status(accountId)).mapTo[Int].asPromise.map { r ⇒ + if (r >= 0) Ok("Account total: " + r) + else BadRequest("Unknown account: " + accountId) + } + //#innerRegexGET + } + } + //#regexGET + //#asyncDepositPOST + case POST(Path("/account/deposit")) ⇒ Action { implicit request ⇒ + //#formAsyncDepositPOST + val (accountId, amount) = commonForm.bindFromRequest.get + //#formAsyncDepositPOST + AsyncResult { + (accountActor ask Deposit(accountId, amount)).mapTo[Int].asPromise.map { r ⇒ Ok("Updated account total: " + r) } + } + } + //#asyncDepositPOST + //#asyncWithdrawPOST + case POST(Path("/account/withdraw")) ⇒ Action { implicit request ⇒ + val (accountId, amount) = commonForm.bindFromRequest.get + AsyncResult { + (accountActor ask Withdraw(accountId, amount)).mapTo[Int].asPromise.map { r ⇒ + if (r >= 0) Ok("Updated account total: " + r) + else BadRequest("Unknown account or insufficient funds. Get your act together.") + } + } + } + //#asyncWithdrawPOST + //#routeLogic + } + //#route + + //#form + val commonForm = Form( + tuple( + "accountId" -> nonEmptyText, + "amount" -> number(min = 1))) + //#form +} + +//#cases +case class Status(accountId: String) +case class Deposit(accountId: String, amount: Int) +case class Withdraw(accountId: String, amount: Int) +//#cases + +//#actor +class AccountActor extends Actor { + var accounts = MutableMap[String, Int]() + + //#receive + def receive = { + //#senderBang + case Status(accountId) ⇒ sender ! accounts.getOrElse(accountId, -1) + //#senderBang + case Deposit(accountId, amount) ⇒ sender ! deposit(accountId, amount) + case Withdraw(accountId, amount) ⇒ sender ! withdraw(accountId, amount) + } + //#receive + + private def deposit(accountId: String, amount: Int): Int = { + accounts.get(accountId) match { + case Some(value) ⇒ + val newValue = value + amount + accounts += accountId -> newValue + newValue + case None ⇒ + accounts += accountId -> amount + amount + } + } + + private def withdraw(accountId: String, amount: Int): Int = { + accounts.get(accountId) match { + case Some(value) ⇒ + if (value < amount) -1 + else { + val newValue = value - amount + accounts += accountId -> newValue + newValue + } + case None ⇒ -1 + } + } + //#actor +} \ No newline at end of file diff --git a/akka-docs/modules/http.rst b/akka-docs/modules/http.rst index 8388d65702..97978ed5f5 100644 --- a/akka-docs/modules/http.rst +++ b/akka-docs/modules/http.rst @@ -7,8 +7,194 @@ HTTP .. contents:: :local: -Play! ------ +Play2-mini +---------- +The Akka team recommends the `Play2-mini `_ framework when building RESTful +service applications that integrates with Akka. It provides a REST API on top of `Play2 `_. -Akka will recommend using `Play! Mini `_ +Getting started +--------------- + +First you must make your application aware of play-mini. +In SBT you just have to add the following to your _libraryDependencies_:: + + libraryDependencies += "com.typesafe" %% "play-mini" % "2.0-RC1-SNAPSHOT" + +Sample Application +------------------ + +To illustrate how easy it is to wire a RESTful service with Akka we will use a sample application. +The aim of the application is to show how to use play-mini and Akka in combination. Do not put too much +attention on the actual business logic itself, which is a extremely simple bank application, as building a bank +application is a little more complex than what's shown in the sample... + +The application should support the following URL commands: + - GET /ping - returns a Pong message with the time of the server (used to see if the application is up and running) + - GET /account/statement/{accountId} - returns the account statement + - POST /account/deposit - deposits money to an account (and creates a new one if it's not already existing) + - POST /account/withdraw - withdraws money from an account + +Error messages will be returned in case of any misuse of the application, e.g. withdrawing more money than an +account has etc. + +Getting started +--------------- + +To build a play-mini application you first have to make your object extend com.typesafe.play.mini.Application: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: playMiniDefinition + +The next step is to implement the mandatory method ``route``: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: route + :exclude: routeLogic + +It is inside the ``route`` method that all the magic happens. +In the sections below we will show how to set up play-mini to handle both GET and POST HTTP calls. + +Simple GET +---------- + +We start off by creating the simplest method we can - a "ping" method: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: simpleGET + +As you can see in the section above play-mini uses Scala's wonderful pattern matching. +In the snippet we instruct play-mini to reply to all HTTP GET calls with the URI "/ping". +The ``Action`` returned comes from Play! and you can find more information about it `here `_. + +.. _Advanced-GET: + +Advanced GET +------------ + +Let's try something more advanced, retrieving parameters from the URI and also make an asynchronous call to an actor: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: regexGET + +The regular expression looks like this: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: regexURI + +In the snippets above we extract a URI parameter with the help of a simple regular expression and then we pass this +parameter on to the underlying actor system. As you can see ``AsyncResult`` is being used. This means that the call to +the actor will be performed asynchronously, i.e. no blocking. + +The asynchronous call to the actor is being done with a ``ask``, e.g.:: + + (accountActor ask Status(accountId)) + +The actor that receives the message returns the result by using a standard *sender !* +as can be seen here: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: senderBang + +When the result is returned to the calling code we use some mapping code in Play to convert a Akka future to a Play future. +This is shown in this code: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: innerRegexGET + +In this snippet we check the result to decide what type of response we want to send to the calling client. + +Using HTTP POST +--------------- + +Okay, in the sections above we have shown you how to use play-mini for HTTP GET calls. Let's move on to when the user +posts values to the application. + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: asyncDepositPOST + +As you can see the structure is almost the same as for the :ref:`Advanced-GET`. The difference is that we make the +``request`` parameter ``implicit`` and also that the following line of code is used to extract parameters from the POST. + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: formAsyncDepositPOST + +The code snippet used to map the call to parameters looks like this: + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + :include: form + +Apart from the mapping of parameters the call to the actor looks is done the same as in :ref:`Advanced-GET`. + +The Complete Code Sample +------------------------ + +Below is the complete application in all its beauty. + +Global.scala (/src/main/scala/Global.scala): + +.. includecode:: code/Global.scala + +PlayMiniApplication.scala (/src/main/scala/akka/docs/http/PlayMiniApplication.scala): + +.. includecode:: code/akka/docs/http/PlayMiniApplication.scala + +Build.scala (/project/Build.scala): + +.. code-block:: scala + + import sbt._ + import Keys._ + + object PlayMiniApplicationBuild extends Build { + lazy val root = Project(id = "play-mini-application", base = file("."), settings = Project.defaultSettings).settings( + libraryDependencies += "com.typesafe" %% "play-mini" % "2.0-RC1-SNAPSHOT", + mainClass in (Compile, run) := Some("play.core.server.NettyServer")) + } + +Running the Application +----------------------- + +Firstly, start up the application by opening a command terminal and type:: + + > sbt + > run + +Now you should see something similar to this in your terminal window:: + + [info] Running play.core.server.NettyServer + Play server process ID is 2523 + [info] play - Application started (Prod) + [info] play - Listening for HTTP on port 9000... + +In this example we will use the awesome `cURL `_ command to interact with the application. +Fire up a command terminal and try the application out:: + + First we check the status of a couple of accounts: + > curl http://localhost:9000/account/statement/TheDudesAccount + Unknown account: TheDudesAccount + > curl http://localhost:9000/account/statement/MrLebowskisAccount + Unknown account: MrLebowskisAccount + + Now deposit some money to the accounts: + > curl -d "accountId=TheDudesAccount&amount=1000" http://localhost:9000/account/deposit + Updated account total: 1000 + > curl -d "accountId=MrLebowskisAccount&amount=500" http://localhost:9000/account/deposit + Updated account total: 500 + + Next thing is to check the status of the account: + > curl http://localhost:9000/account/statement/TheDudesAccount + Account total: 1000 + > curl http://localhost:9000/account/statement/MrLebowskisAccount + Account total: 500 + + Fair enough, let's try to withdraw some cash shall we: + > curl -d "accountId=TheDudesAccount&amount=999" http://localhost:9000/account/withdraw + Updated account total: 1 + > curl -d "accountId=MrLebowskisAccount&amount=999" http://localhost:9000/account/withdraw + Unknown account or insufficient funds. Get your act together. + > curl -d "accountId=MrLebowskisAccount&amount=500" http://localhost:9000/account/withdraw + Updated account total: 0 + +Yeah, it works! +Now we leave it to the astute reader of this document to take advantage of the power of play-mini and Akka. \ No newline at end of file diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 7ad4be2a9f..3d2b17b63b 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -473,7 +473,7 @@ object Dependencies { val tutorials = Seq(Test.scalatest, Test.junit) - val docs = Seq(Test.scalatest, Test.junit) + val docs = Seq(Test.scalatest, Test.junit, playMini) val zeroMQ = Seq(Test.scalatest, Test.junit, protobuf, Dependency.zeroMQ) } @@ -497,6 +497,7 @@ object Dependency { val Slf4j = "1.6.4" val Spring = "3.0.5.RELEASE" val Zookeeper = "3.4.0" + val PlayMini = "2.0-RC1-SNAPSHOT" } // Compile @@ -533,6 +534,7 @@ object Dependency { val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % V.Zookeeper // ApacheV2 val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % V.Zookeeper // ApacheV2 val zeroMQ = "org.zeromq" %% "zeromq-scala-binding" % "0.0.3" // ApacheV2 + val playMini = "com.typesafe" % "play-mini_2.9.1" % V.PlayMini // Provided From 0cc34fdb3c3e680a7b42e43a7bdb0a85d84aec3c Mon Sep 17 00:00:00 2001 From: Henrik Engstrom Date: Mon, 13 Feb 2012 08:42:57 +0100 Subject: [PATCH 08/19] Added resolver to Typesafe snapshot repo used for play-mini. See #1538 --- project/AkkaBuild.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 3d2b17b63b..905e345dec 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -348,6 +348,7 @@ object AkkaBuild extends Build { lazy val defaultSettings = baseSettings ++ formatSettings ++ Seq( resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/", resolvers += "Twitter Public Repo" at "http://maven.twttr.com", // This will be going away with com.mongodb.async's next release + resolvers += "Typesafe Snapshot Repo" at "http://repo.typesafe.com/typesafe/snapshots/", // Used while play-mini is still on RC // compile options scalacOptions ++= Seq("-encoding", "UTF-8", "-deprecation", "-unchecked") ++ ( From 251a7cc7e399778d4e2765742ea7dcb69086e64b Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 12:38:59 +0100 Subject: [PATCH 09/19] clean up BalancingDispatcher: - change from messageQueue.numberOfMessages to maintaining an AtomicLong for performance reasons - add comments/scaladoc where missing - remove some assert()s - fix ResiserSpec to employ buddy-wakeup-threshold --- .../akka/actor/dispatch/ActorModelSpec.scala | 4 +- .../test/scala/akka/routing/ResizerSpec.scala | 1 + .../src/main/scala/akka/actor/ActorCell.scala | 5 ++ .../akka/dispatch/AbstractDispatcher.scala | 2 +- .../akka/dispatch/BalancingDispatcher.scala | 55 +++++++++++++++---- .../main/scala/akka/dispatch/Mailbox.scala | 29 +++++++--- 6 files changed, 74 insertions(+), 22 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index a735e7298b..15886973b2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -352,7 +352,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa def flood(num: Int) { val cachedMessage = CountDownNStop(new CountDownLatch(num)) val stopLatch = new CountDownLatch(num) - val waitTime = (20 seconds).dilated.toMillis + val waitTime = (30 seconds).dilated.toMillis val boss = system.actorOf(Props(new Actor { def receive = { case "run" ⇒ for (_ ← 1 to num) (context.watch(context.actorOf(props))) ! cachedMessage @@ -369,7 +369,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa val buddies = dispatcher.buddies val mq = dispatcher.messageQueue - System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhab) + System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhabitants) buddies.toArray sorted new Ordering[AnyRef] { def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path.toString.compareTo(rr.self.path.toString) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 2130afe107..26b5021c18 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -26,6 +26,7 @@ object ResizerSpec { } bal-disp { type = BalancingDispatcher + buddy-wakeup-threshold = 1 } """ diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index f4112e5d37..9268406086 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -290,6 +290,11 @@ private[akka] class ActorCell( // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ parent.sendSystemMessage(akka.dispatch.Supervise(self)) + /* + * attach before submitting the mailbox for the first time, because + * otherwise the actor could already be dead before the dispatcher is + * informed of its existence (with reversed attach/detach sequence). + */ dispatcher.attach(this) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index da0e4fdc6e..9d1575c4ec 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -260,7 +260,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext mailBox.cleanUp() } - def inhab = inhabitantsUpdater.get(this) + def inhabitants: Long = inhabitantsUpdater.get(this) private val shutdownAction = new Runnable { @tailrec diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 63fafef7d1..70101578f0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -11,6 +11,8 @@ import annotation.tailrec import java.util.concurrent.atomic.AtomicBoolean import akka.util.{ Duration, Helpers } import java.util.{ Comparator, Iterator } +import akka.util.Unsafe +import java.util.concurrent.atomic.AtomicLong /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -43,18 +45,46 @@ class BalancingDispatcher( })) val messageQueue: MessageQueue = mailboxType match { - case _: UnboundedMailbox ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { - final val queue = new ConcurrentLinkedQueue[Envelope] - } - case BoundedMailbox(cap, timeout) ⇒ new QueueBasedMessageQueue with BoundedMessageQueueSemantics { - final val queue = new LinkedBlockingQueue[Envelope](cap) - final val pushTimeOut = timeout - } + case UnboundedMailbox() ⇒ + new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { + final val queue = new ConcurrentLinkedQueue[Envelope] + + override def enqueue(receiver: ActorRef, handle: Envelope) = { + super.enqueue(receiver, handle) + _pressure.getAndIncrement() + } + + override def dequeue(): Envelope = + super.dequeue() match { + case null ⇒ null + case x ⇒ _pressure.getAndDecrement(); x + } + } + + case BoundedMailbox(cap, timeout) ⇒ + new QueueBasedMessageQueue with BoundedMessageQueueSemantics { + final val queue = new LinkedBlockingQueue[Envelope](cap) + final val pushTimeOut = timeout + + override def enqueue(receiver: ActorRef, handle: Envelope) = { + super.enqueue(receiver, handle) + _pressure.getAndIncrement() + } + + override def dequeue(): Envelope = + super.dequeue() match { + case null ⇒ null + case x ⇒ _pressure.getAndDecrement(); x + } + } + case other ⇒ throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]") } protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor) + private val _pressure = new AtomicLong + class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue { final def enqueue(receiver: ActorRef, handle: Envelope) = messageQueue.enqueue(receiver, handle) @@ -81,6 +111,11 @@ class BalancingDispatcher( } protected[akka] override def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = + /* + * need to filter out Create() messages here because BalancingDispatcher + * already enqueues this within register(), which is called first by the + * ActorCell. + */ invocation match { case Create() ⇒ case x ⇒ super.systemDispatch(receiver, invocation) @@ -91,13 +126,13 @@ class BalancingDispatcher( mbox.systemEnqueue(actor.self, Create()) // must make sure that Create() is the first message enqueued in this mailbox super.register(actor) - assert(buddies.add(actor)) + buddies.add(actor) // must make sure that buddy-add is executed before the actor has had a chance to die registerForExecution(mbox, false, true) } protected[akka] override def unregister(actor: ActorCell) = { - assert(buddies.remove(actor)) + buddies.remove(actor) super.unregister(actor) if (messageQueue.hasMessages) registerOne() } @@ -106,7 +141,7 @@ class BalancingDispatcher( messageQueue.enqueue(receiver.self, invocation) if (!registerForExecution(receiver.mailbox, false, false) && buddyWakeupThreshold >= 0 && - messageQueue.numberOfMessages >= buddyWakeupThreshold) registerOne() + _pressure.get >= buddyWakeupThreshold) registerOne() } @tailrec private def registerOne(i: Iterator[ActorCell] = buddies.iterator): Unit = diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 32fce8564e..4c50cb5c8d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -239,15 +239,26 @@ private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue } trait MessageQueue { - /* - * These method need to be implemented in subclasses; they should not rely on the internal stuff above. + /** + * Try to enqueue the message to this queue, or throw an exception. */ - def enqueue(receiver: ActorRef, handle: Envelope) + def enqueue(receiver: ActorRef, handle: Envelope): Unit // NOTE: receiver is used only in two places, but cannot be removed + /** + * Try to dequeue the next message from this queue, return null failing that. + */ def dequeue(): Envelope + /** + * Should return the current number of messages held in this queue; may + * always return 0 if no other value is available efficiently. Do not use + * this for testing for presence of messages, use `hasMessages` instead. + */ def numberOfMessages: Int + /** + * Indicates whether this queue is non-empty. + */ def hasMessages: Boolean } @@ -295,15 +306,15 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ } trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue { - final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle - final def dequeue(): Envelope = queue.poll() + def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle + def dequeue(): Envelope = queue.poll() } trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { def pushTimeOut: Duration override def queue: BlockingQueue[Envelope] - final def enqueue(receiver: ActorRef, handle: Envelope) { + def enqueue(receiver: ActorRef, handle: Envelope) { if (pushTimeOut.length > 0) { queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || { throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + receiver) @@ -311,13 +322,13 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { } else queue put handle } - final def dequeue(): Envelope = queue.poll() + def dequeue(): Envelope = queue.poll() } trait QueueBasedMessageQueue extends MessageQueue { def queue: Queue[Envelope] - final def numberOfMessages = queue.size - final def hasMessages = !queue.isEmpty + def numberOfMessages = queue.size + def hasMessages = !queue.isEmpty } /** From bb40c1ae307bdb5147b4de03efb7f239f8d2d424 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 14:43:20 +0100 Subject: [PATCH 10/19] tweak ResizerSpec to work better with async Resize(), see #1814 - previously relied on resize() being invoked before enqueueing to the mailbox, which is not at all guaranteed any longer. --- .../test/scala/akka/routing/ResizerSpec.scala | 52 +++++++++---------- .../src/main/scala/akka/routing/Routing.scala | 3 +- 2 files changed, 26 insertions(+), 29 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index 26b5021c18..b9765c8e92 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -11,6 +11,7 @@ import akka.util.duration._ import akka.actor.ActorRef import java.util.concurrent.atomic.AtomicInteger import akka.pattern.ask +import akka.util.Duration object ResizerSpec { @@ -161,53 +162,48 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with // as influenced by the backlog of blocking pooled actors val resizer = DefaultResizer( - lowerBound = 2, - upperBound = 4, + lowerBound = 3, + upperBound = 5, rampupRate = 0.1, + backoffRate = 0.0, pressureThreshold = 1, messagesPerResize = 1, backoffThreshold = 0.0) val router = system.actorOf(Props(new Actor { def receive = { - case (n: Int, latch: TestLatch, count: AtomicInteger) ⇒ - (n millis).dilated.sleep - count.incrementAndGet - latch.countDown() + case d: Duration ⇒ d.dilated.sleep; sender ! "done" + case "echo" ⇒ sender ! "reply" } }).withRouter(RoundRobinRouter(resizer = Some(resizer)))) // first message should create the minimum number of routees - router ! 1 + router ! "echo" + expectMsg("reply") - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2) + def routees(r: ActorRef): Int = { + r ! CurrentRoutees + expectMsgType[RouterRoutees].routees.size + } - def loop(loops: Int, t: Int, latch: TestLatch, count: AtomicInteger) = { - (100 millis).dilated.sleep - for (m ← 0 until loops) { - router.!((t, latch, count)) - (100 millis).dilated.sleep - } + routees(router) must be(3) + + def loop(loops: Int, d: Duration) = { + for (m ← 0 until loops) router ! d + for (m ← 0 until loops) expectMsg(d * 2, "done") } // 2 more should go thru without triggering more - val count1 = new AtomicInteger - val latch1 = TestLatch(2) - loop(2, 200, latch1, count1) - Await.ready(latch1, TestLatch.DefaultTimeout) - count1.get must be(2) + loop(2, 200 millis) - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(2) + routees(router) must be(3) // a whole bunch should max it out - val count2 = new AtomicInteger - val latch2 = TestLatch(10) - loop(10, 500, latch2, count2) - Await.ready(latch2, TestLatch.DefaultTimeout) - count2.get must be(10) - - Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(4) + loop(4, 500 millis) + awaitCond(routees(router) == 4) + loop(10, 500 millis) + awaitCond(routees(router) == 5) } "backoff" in { @@ -240,7 +236,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with (300 millis).dilated.sleep // let it cool down - for (m ← 0 to 3) { + for (m ← 0 to 5) { router ! 1 (500 millis).dilated.sleep } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index da2c81d1a7..b050a21b53 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -1028,7 +1028,8 @@ case class DefaultResizer( */ def capacity(routees: IndexedSeq[ActorRef]): Int = { val currentSize = routees.size - val delta = filter(pressure(routees), currentSize) + val press = pressure(routees) + val delta = filter(press, currentSize) val proposed = currentSize + delta if (proposed < lowerBound) delta + (lowerBound - proposed) From 7c57a9d60e6f0f9fb014b61d649f3ab802b5fd89 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 15:33:31 +0100 Subject: [PATCH 11/19] final touch to actor start-up sequence split systemDispatch(Create()) into systemEnqueue(Create()) directly after createMailbox and registerForExecution from within Dispatcher.attach() (resp. CallingThreadDispatcher.register() does its own thing) --- .../src/main/scala/akka/actor/ActorCell.scala | 15 ++++++------ .../akka/dispatch/AbstractDispatcher.scala | 11 ++++++--- .../akka/dispatch/BalancingDispatcher.scala | 24 ++++--------------- .../testkit/CallingThreadDispatcher.scala | 13 +++++++++- 4 files changed, 31 insertions(+), 32 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 9268406086..c22529b2c8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -285,20 +285,19 @@ private[akka] class ActorCell( final def isTerminated: Boolean = mailbox.isClosed final def start(): Unit = { + /* + * Create the mailbox and enqueue the Create() message to ensure that + * this is processed before anything else. + */ mailbox = dispatcher.createMailbox(this) + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + mailbox.systemEnqueue(self, Create()) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ parent.sendSystemMessage(akka.dispatch.Supervise(self)) - /* - * attach before submitting the mailbox for the first time, because - * otherwise the actor could already be dead before the dispatcher is - * informed of its existence (with reversed attach/detach sequence). - */ + // This call is expected to start off the actor by scheduling its mailbox. dispatcher.attach(this) - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - dispatcher.systemDispatch(this, Create()) } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 9d1575c4ec..22eadb55d5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -185,9 +185,14 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext def id: String /** - * Attaches the specified actor instance to this dispatcher + * Attaches the specified actor instance to this dispatcher, which includes + * scheduling it to run for the first time (Create() is expected to have + * been enqueued by the ActorCell upon mailbox creation). */ - final def attach(actor: ActorCell): Unit = register(actor) + final def attach(actor: ActorCell): Unit = { + register(actor) + registerForExecution(actor.mailbox, false, true) + } /** * Detaches the specified actor instance from this dispatcher @@ -243,7 +248,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext () ⇒ if (inhabitantsUpdater.decrementAndGet(this) == 0) ifSensibleToDoSoThenScheduleShutdown() /** - * If you override it, you must call it. But only ever once. See "attach" for only invocation + * If you override it, you must call it. But only ever once. See "attach" for only invocation. */ protected[akka] def register(actor: ActorCell) { inhabitantsUpdater.incrementAndGet(this) diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 70101578f0..61ac773aa0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -110,40 +110,24 @@ class BalancingDispatcher( } } - protected[akka] override def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = - /* - * need to filter out Create() messages here because BalancingDispatcher - * already enqueues this within register(), which is called first by the - * ActorCell. - */ - invocation match { - case Create() ⇒ - case x ⇒ super.systemDispatch(receiver, invocation) - } - protected[akka] override def register(actor: ActorCell) = { - val mbox = actor.mailbox - mbox.systemEnqueue(actor.self, Create()) - // must make sure that Create() is the first message enqueued in this mailbox super.register(actor) buddies.add(actor) - // must make sure that buddy-add is executed before the actor has had a chance to die - registerForExecution(mbox, false, true) } protected[akka] override def unregister(actor: ActorCell) = { buddies.remove(actor) super.unregister(actor) - if (messageQueue.hasMessages) registerOne() + if (messageQueue.hasMessages) scheduleOne() } override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) if (!registerForExecution(receiver.mailbox, false, false) && buddyWakeupThreshold >= 0 && - _pressure.get >= buddyWakeupThreshold) registerOne() + _pressure.get >= buddyWakeupThreshold) scheduleOne() } - @tailrec private def registerOne(i: Iterator[ActorCell] = buddies.iterator): Unit = - if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) registerOne(i) + @tailrec private def scheduleOne(i: Iterator[ActorCell] = buddies.iterator): Unit = + if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) scheduleOne(i) } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 8282ee58f5..8b2d15a079 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -8,7 +8,7 @@ import java.util.concurrent.locks.ReentrantLock import java.util.LinkedList import scala.annotation.tailrec import com.typesafe.config.Config -import akka.actor.{ ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell } +import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell } import akka.dispatch.{ TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue } import akka.util.duration.intToDurationInt import akka.util.{ Switch, Duration } @@ -132,6 +132,17 @@ class CallingThreadDispatcher( protected[akka] override def shutdownTimeout = 1 second + protected[akka] override def register(actor: ActorCell): Unit = { + super.register(actor) + actor.mailbox match { + case mbox: CallingThreadMailbox ⇒ + val queue = mbox.queue + queue.enter + runQueue(mbox, queue) + case x ⇒ throw new ActorInitializationException("expected CallingThreadMailbox, got " + x.getClass) + } + } + override def suspend(actor: ActorCell) { actor.mailbox match { case m: CallingThreadMailbox ⇒ m.suspendSwitch.switchOn From 669a4ff9ca04c2422416435dc33a909ad7c5cc76 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 17:46:14 +0100 Subject: [PATCH 12/19] make unbalanced Address() constructor private, fix parsing, see #1806 --- .../scala/akka/actor/ActorLookupSpec.scala | 1 + .../src/main/scala/akka/actor/Address.scala | 38 ++++++++++--------- .../remoting/RemoteDeploymentDocSpec.scala | 2 +- .../remote/netty/NettyRemoteSupport.scala | 2 +- .../main/scala/akka/remote/netty/Server.scala | 4 +- .../akka/remote/RemoteDeployerSpec.scala | 2 +- .../scala/akka/remote/RemoteRouterSpec.scala | 2 +- 7 files changed, 27 insertions(+), 24 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala index 626e413ec8..299cc16679 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala @@ -305,6 +305,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { intercept[MalformedURLException] { ActorPath.fromString("://hallo") } intercept[MalformedURLException] { ActorPath.fromString("s://dd@:12") } intercept[MalformedURLException] { ActorPath.fromString("s://dd@h:hd") } + intercept[MalformedURLException] { ActorPath.fromString("a://l:1/b") } } } diff --git a/akka-actor/src/main/scala/akka/actor/Address.scala b/akka-actor/src/main/scala/akka/actor/Address.scala index d182a0e3b4..502ee56fdb 100644 --- a/akka-actor/src/main/scala/akka/actor/Address.scala +++ b/akka-actor/src/main/scala/akka/actor/Address.scala @@ -15,7 +15,7 @@ import java.net.MalformedURLException * for example a remote transport would want to associate additional * information with an address, then this must be done externally. */ -final case class Address(protocol: String, system: String, host: Option[String], port: Option[Int]) { +final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) { def this(protocol: String, system: String) = this(protocol, system, None, None) def this(protocol: String, system: String, host: String, port: Int) = this(protocol, system, Option(host), Some(port)) @@ -62,20 +62,25 @@ object RelativeActorPath { * This object serves as extractor for Scala and as address parser for Java. */ object AddressExtractor { - def unapply(addr: String): Option[Address] = { + def unapply(addr: String): Option[Address] = try { val uri = new URI(addr) - if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null)) None - else { - val addr = Address(uri.getScheme, if (uri.getUserInfo != null) uri.getUserInfo else uri.getHost, - if (uri.getUserInfo == null || uri.getHost == null) None else Some(uri.getHost), - if (uri.getPort < 0) None else Some(uri.getPort)) - Some(addr) - } + unapply(uri) } catch { case _: URISyntaxException ⇒ None } - } + + def unapply(uri: URI): Option[Address] = + if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null)) None + else if (uri.getUserInfo == null) { // case 1: “akka://system” + if (uri.getPort != -1) None + else Some(Address(uri.getScheme, uri.getHost)) + } else { // case 2: “akka://system@host:port” + if (uri.getHost == null || uri.getPort == -1) None + else Some( + if (uri.getUserInfo == null) Address(uri.getScheme, uri.getHost) + else Address(uri.getScheme, uri.getUserInfo, uri.getHost, uri.getPort)) + } /** * Try to construct an Address from the given String or throw a java.net.MalformedURLException. @@ -92,18 +97,15 @@ object AddressExtractor { } object ActorPathExtractor { - def unapply(addr: String): Option[(Address, Iterable[String])] = { + def unapply(addr: String): Option[(Address, Iterable[String])] = try { val uri = new URI(addr) - if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null) || uri.getPath == null) None - else { - val addr = Address(uri.getScheme, if (uri.getUserInfo != null) uri.getUserInfo else uri.getHost, - if (uri.getUserInfo == null || uri.getHost == null) None else Some(uri.getHost), - if (uri.getPort < 0) None else Some(uri.getPort)) - Some((addr, ActorPath.split(uri.getPath).drop(1))) + if (uri.getPath == null) None + else AddressExtractor.unapply(uri) match { + case None ⇒ None + case Some(addr) ⇒ Some((addr, ActorPath.split(uri.getPath).drop(1))) } } catch { case _: URISyntaxException ⇒ None } - } } \ No newline at end of file diff --git a/akka-docs/scala/code/akka/docs/remoting/RemoteDeploymentDocSpec.scala b/akka-docs/scala/code/akka/docs/remoting/RemoteDeploymentDocSpec.scala index 0a8785444f..a8d75512c5 100644 --- a/akka-docs/scala/code/akka/docs/remoting/RemoteDeploymentDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/remoting/RemoteDeploymentDocSpec.scala @@ -28,7 +28,7 @@ class RemoteDeploymentDocSpec extends AkkaSpec(""" import RemoteDeploymentDocSpec._ val other = ActorSystem("remote", system.settings.config) - val address = other.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address("akka", "s", Some("host"), Some(1))).get + val address = other.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address("akka", "s", "host", 1)).get override def atTermination() { other.shutdown() } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index a182948dba..8acd33c7fb 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -61,7 +61,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor case sa: InetSocketAddress ⇒ sa case x ⇒ throw new RemoteTransportException("unknown local address type " + x.getClass, null) } - _address.compareAndSet(null, Address("akka", remoteSettings.systemName, Some(settings.Hostname), Some(addr.getPort))) + _address.compareAndSet(null, Address("akka", remoteSettings.systemName, settings.Hostname, addr.getPort)) } def address = _address.get diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index 2c51875e9d..0bafb1c712 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -183,7 +183,7 @@ class RemoteServerHandler( instruction.getCommandType match { case CommandType.CONNECT if settings.UsePassiveConnections ⇒ val origin = instruction.getOrigin - val inbound = Address("akka", origin.getSystem, Some(origin.getHostname), Some(origin.getPort)) + val inbound = Address("akka", origin.getSystem, origin.getHostname, origin.getPort) val client = new PassiveRemoteClient(event.getChannel, netty, inbound) netty.bindClient(inbound, client) case CommandType.SHUTDOWN ⇒ //Will be unbound in channelClosed @@ -203,7 +203,7 @@ class RemoteServerHandler( private def getClientAddress(c: Channel): Option[Address] = c.getRemoteAddress match { - case inet: InetSocketAddress ⇒ Some(Address("akka", "unknown(yet)", Some(inet.getAddress.toString), Some(inet.getPort))) + case inet: InetSocketAddress ⇒ Some(Address("akka", "unknown(yet)", inet.getAddress.toString, inet.getPort)) case _ ⇒ None } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala index 1b250f1ea9..57d240e8d8 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeployerSpec.scala @@ -42,7 +42,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { service, deployment.get.config, RoundRobinRouter(3), - RemoteScope(Address("akka", "sys", Some("wallace"), Some(2552)))))) + RemoteScope(Address("akka", "sys", "wallace", 2552))))) } } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index bcabd85098..bf79caf847 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -160,7 +160,7 @@ akka.actor.deployment { children must have size 2 val parents = children.map(_.parent) parents must have size 1 - parents.head.address must be(Address("akka", "remote_sys", Some("localhost"), Some(12347))) + parents.head.address must be(Address("akka", "remote_sys", "localhost", 12347)) children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347") system.stop(router) } From 607ec4c2cf6b7a4a57665dace3252c0562c35d22 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 13 Feb 2012 18:14:35 +0100 Subject: [PATCH 13/19] Switching approaches to check for max throttle --- .../akka/actor/dispatch/ActorModelSpec.scala | 2 +- akka-actor/src/main/resources/reference.conf | 10 ++--- .../akka/dispatch/AbstractDispatcher.scala | 7 ++- .../akka/dispatch/BalancingDispatcher.scala | 44 +++++-------------- .../main/scala/akka/dispatch/Dispatcher.scala | 7 ++- .../scala/akka/dispatch/Dispatchers.scala | 2 +- .../akka/dispatch/ThreadPoolBuilder.scala | 8 ++-- 7 files changed, 30 insertions(+), 50 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 15886973b2..deafb9cdc1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -547,7 +547,7 @@ object BalancingDispatcherModelSpec { mailboxType, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), - config.getInt("buddy-wakeup-threshold")) with MessageDispatcherInterceptor + config.getBoolean("attempt-teamwork")) with MessageDispatcherInterceptor override def dispatcher(): MessageDispatcher = instance } diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index b7e0563339..9e0ee70ff1 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -247,12 +247,10 @@ akka { # com.typesafe.config.Config parameter. mailbox-type = "" - # For BalancingDispatcher: if during message enqueuing the target actor is - # already busy and at least this number of messages is currently in the queue, - # then wake up another actor from the same dispatcher at random. - # Set to -1 to disable (which will also skip the possibly expensive check; - # obtaining the mailbox size is O(n) for the default mailboxes). - buddy-wakeup-threshold = 5 + # For BalancingDispatcher: If the balancing dispatcher should attempt to + # schedule idle actors using the same dispatcher when a message comes in, + # and the dispatchers ExecutorService is not fully busy already. + attempt-teamwork = on } debug { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 22eadb55d5..6046e249af 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -156,7 +156,10 @@ trait ExecutionContext { * log the problem or whatever is appropriate for the implementation. */ def reportFailure(t: Throwable): Unit +} +private[akka] trait LoadMetrics { self: Executor ⇒ + def atFullThrottle(): Boolean } object MessageDispatcher { @@ -447,11 +450,13 @@ object ForkJoinExecutorConfigurator { final class AkkaForkJoinPool(parallelism: Int, threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, unhandledExceptionHandler: Thread.UncaughtExceptionHandler) - extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) { + extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics { override def execute(r: Runnable): Unit = r match { case m: Mailbox ⇒ super.execute(new MailboxExecutionTask(m)) case other ⇒ super.execute(other) } + + def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism() } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 61ac773aa0..d2d978341c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -4,15 +4,11 @@ package akka.dispatch -import util.DynamicVariable import akka.actor.{ ActorCell, ActorRef } -import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } import annotation.tailrec -import java.util.concurrent.atomic.AtomicBoolean import akka.util.{ Duration, Helpers } import java.util.{ Comparator, Iterator } -import akka.util.Unsafe -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{ Executor, LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -36,7 +32,7 @@ class BalancingDispatcher( mailboxType: MailboxType, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider, _shutdownTimeout: Duration, - buddyWakeupThreshold: Int) + attemptTeamWork: Boolean) extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { val buddies = new ConcurrentSkipListSet[ActorCell]( @@ -48,34 +44,12 @@ class BalancingDispatcher( case UnboundedMailbox() ⇒ new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope] - - override def enqueue(receiver: ActorRef, handle: Envelope) = { - super.enqueue(receiver, handle) - _pressure.getAndIncrement() - } - - override def dequeue(): Envelope = - super.dequeue() match { - case null ⇒ null - case x ⇒ _pressure.getAndDecrement(); x - } } case BoundedMailbox(cap, timeout) ⇒ new QueueBasedMessageQueue with BoundedMessageQueueSemantics { final val queue = new LinkedBlockingQueue[Envelope](cap) final val pushTimeOut = timeout - - override def enqueue(receiver: ActorRef, handle: Envelope) = { - super.enqueue(receiver, handle) - _pressure.getAndIncrement() - } - - override def dequeue(): Envelope = - super.dequeue() match { - case null ⇒ null - case x ⇒ _pressure.getAndDecrement(); x - } } case other ⇒ throw new IllegalArgumentException("Only handles BoundedMailbox and UnboundedMailbox, but you specified [" + other + "]") @@ -83,8 +57,6 @@ class BalancingDispatcher( protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor) - private val _pressure = new AtomicLong - class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue { final def enqueue(receiver: ActorRef, handle: Envelope) = messageQueue.enqueue(receiver, handle) @@ -123,11 +95,15 @@ class BalancingDispatcher( override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) - if (!registerForExecution(receiver.mailbox, false, false) && - buddyWakeupThreshold >= 0 && - _pressure.get >= buddyWakeupThreshold) scheduleOne() + if (!registerForExecution(receiver.mailbox, false, false) && doTeamWork) scheduleOne() } + protected def doTeamWork(): Boolean = + attemptTeamWork && (executorService.get().executor match { + case lm: LoadMetrics ⇒ lm.atFullThrottle == false + case other ⇒ true + }) + @tailrec private def scheduleOne(i: Iterator[ActorCell] = buddies.iterator): Unit = if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) scheduleOne(i) -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index a735ea367e..2046f02286 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -32,12 +32,11 @@ class Dispatcher( val shutdownTimeout: Duration) extends MessageDispatcher(_prerequisites) { - protected[akka] val executorServiceFactory: ExecutorServiceFactory = + protected val executorServiceFactory: ExecutorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(id, prerequisites.threadFactory) - protected[akka] val executorService = new AtomicReference[ExecutorService](new ExecutorServiceDelegate { - lazy val executor = executorServiceFactory.createExecutorService - }) + protected val executorService = new AtomicReference[ExecutorServiceDelegate]( + new ExecutorServiceDelegate { lazy val executor = executorServiceFactory.createExecutorService }) protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { val mbox = receiver.mailbox diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index b9fd3f784b..5f4528146d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -190,7 +190,7 @@ class BalancingDispatcherConfigurator(config: Config, prerequisites: DispatcherP Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType, configureExecutor(), Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), - config.getInt("buddy-wakeup-threshold")) + config.getBoolean("attempt-teamwork")) /** * Returns the same dispatcher instance for each invocation diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index 1c63831013..b6fd432296 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -81,14 +81,16 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def extends ExecutorServiceFactoryProvider { class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory { def createExecutorService: ExecutorService = { - val service = new ThreadPoolExecutor( + val service: ThreadPoolExecutor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, - rejectionPolicy) + rejectionPolicy) with LoadMetrics { + def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize + } service.allowCoreThreadTimeOut(allowCorePoolTimeout) service } @@ -182,7 +184,7 @@ case class MonitorableThreadFactory(name: String, protected def wire[T <: Thread](t: T): T = { t.setUncaughtExceptionHandler(exceptionHandler) t.setDaemon(daemonic) - contextClassLoader foreach (t.setContextClassLoader(_)) + contextClassLoader foreach t.setContextClassLoader t } } From 14d180c92ad2cf97b2f1d56f28bcea9d84fdc0df Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 20:16:54 +0100 Subject: [PATCH 14/19] =?UTF-8?q?unborking=20compile=20in=20akka-cluster?= =?UTF-8?q?=20(sorry=20=E2=80=A6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../test/scala/akka/cluster/AccrualFailureDetectorSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index f611fc9812..4aab105273 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -9,7 +9,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" """) { "An AccrualFailureDetector" must { - val conn = Address("akka", "", Some("localhost"), Some(2552)) + val conn = Address("akka", "", "localhost", 2552) "mark node as available after a series of successful heartbeats" in { val fd = new AccrualFailureDetector(system) From 89cf7aa2f003740d0c53e2c30f3b0fcc149a4ae9 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 20:16:54 +0100 Subject: [PATCH 15/19] =?UTF-8?q?unborking=20compile=20in=20akka-cluster?= =?UTF-8?q?=20(sorry=20=E2=80=A6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../test/scala/akka/cluster/AccrualFailureDetectorSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index f611fc9812..4aab105273 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -9,7 +9,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" """) { "An AccrualFailureDetector" must { - val conn = Address("akka", "", Some("localhost"), Some(2552)) + val conn = Address("akka", "", "localhost", 2552) "mark node as available after a series of successful heartbeats" in { val fd = new AccrualFailureDetector(system) From 11f067abfc8817cad13a6ed407b1b310381bff6d Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 13 Feb 2012 20:42:14 +0100 Subject: [PATCH 16/19] fix DefaultResizer.pressure, make ResizerSpec less flaky MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - also clean up left-over reference to “buddy-wakeup-threshold” --- .../src/test/scala/akka/config/ConfigSpec.scala | 2 +- .../src/test/scala/akka/routing/ResizerSpec.scala | 7 +++---- akka-actor/src/main/scala/akka/routing/Routing.scala | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 13bb3b4f27..127907412e 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -53,7 +53,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) { c.getMilliseconds("shutdown-timeout") must equal(1 * 1000) c.getInt("throughput") must equal(5) c.getMilliseconds("throughput-deadline-time") must equal(0) - c.getInt("buddy-wakeup-threshold") must equal(5) + c.getBoolean("attempt-teamwork") must equal(true) } //Fork join executor config diff --git a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala index b9765c8e92..1f78c64edf 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ResizerSpec.scala @@ -27,7 +27,6 @@ object ResizerSpec { } bal-disp { type = BalancingDispatcher - buddy-wakeup-threshold = 1 } """ @@ -190,7 +189,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with def loop(loops: Int, d: Duration) = { for (m ← 0 until loops) router ! d - for (m ← 0 until loops) expectMsg(d * 2, "done") + for (m ← 0 until loops) expectMsg(d * 3, "done") } // 2 more should go thru without triggering more @@ -199,8 +198,8 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with routees(router) must be(3) // a whole bunch should max it out - loop(4, 500 millis) - awaitCond(routees(router) == 4) + loop(10, 500 millis) + awaitCond(routees(router) > 3) loop(10, 500 millis) awaitCond(routees(router) == 5) diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index b050a21b53..4ff6609255 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -1059,7 +1059,7 @@ case class DefaultResizer( case a: LocalActorRef ⇒ val cell = a.underlying pressureThreshold match { - case 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null + case 1 ⇒ cell.mailbox.isScheduled && cell.mailbox.hasMessages case i if i < 1 ⇒ cell.mailbox.isScheduled && cell.currentMessage != null case threshold ⇒ cell.mailbox.numberOfMessages >= threshold } From 0f314582b852a84e7a3dda4976a8351ff0245052 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 13 Feb 2012 21:37:30 +0100 Subject: [PATCH 17/19] Sprinkling magic dust and webscale sauce --- .../akka/dispatch/BalancingDispatcher.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index d2d978341c..7ac06da5ad 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -90,20 +90,23 @@ class BalancingDispatcher( protected[akka] override def unregister(actor: ActorCell) = { buddies.remove(actor) super.unregister(actor) - if (messageQueue.hasMessages) scheduleOne() + scheduleOne() } override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) - if (!registerForExecution(receiver.mailbox, false, false) && doTeamWork) scheduleOne() + registerForExecution(receiver.mailbox, false, false) + scheduleOne() } - protected def doTeamWork(): Boolean = - attemptTeamWork && (executorService.get().executor match { - case lm: LoadMetrics ⇒ lm.atFullThrottle == false - case other ⇒ true - }) - @tailrec private def scheduleOne(i: Iterator[ActorCell] = buddies.iterator): Unit = - if (i.hasNext && !registerForExecution(i.next.mailbox, false, false)) scheduleOne(i) + if (attemptTeamWork + && messageQueue.hasMessages + && i.hasNext + && (executorService.get().executor match { + case lm: LoadMetrics ⇒ lm.atFullThrottle == false + case other ⇒ true + }) + && !registerForExecution(i.next.mailbox, false, false)) + scheduleOne(i) } \ No newline at end of file From 498011815980828415eac11ff19e09a41a40d91f Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 13 Feb 2012 22:04:39 +0100 Subject: [PATCH 18/19] Renaming buddies to team and then optimized usage of the teamwork --- .../akka/actor/dispatch/ActorModelSpec.scala | 10 +++--- .../akka/dispatch/BalancingDispatcher.scala | 35 ++++++++++--------- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index deafb9cdc1..88358e9f16 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -366,14 +366,12 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa case e ⇒ dispatcher match { case dispatcher: BalancingDispatcher ⇒ - val buddies = dispatcher.buddies + val team = dispatcher.team val mq = dispatcher.messageQueue - System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhabitants) - buddies.toArray sorted new Ordering[AnyRef] { - def compare(l: AnyRef, r: AnyRef) = (l, r) match { - case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path.toString.compareTo(rr.self.path.toString) - } + System.err.println("Teammates left: " + team.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhabitants) + team.toArray sorted new Ordering[AnyRef] { + def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path compareTo rr.self.path } } foreach { case cell: ActorCell ⇒ System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain())) diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 7ac06da5ad..46701848c5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -35,7 +35,7 @@ class BalancingDispatcher( attemptTeamWork: Boolean) extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { - val buddies = new ConcurrentSkipListSet[ActorCell]( + val team = new ConcurrentSkipListSet[ActorCell]( Helpers.identityHashComparator(new Comparator[ActorCell] { def compare(l: ActorCell, r: ActorCell) = l.self.path compareTo r.self.path })) @@ -82,31 +82,32 @@ class BalancingDispatcher( } } - protected[akka] override def register(actor: ActorCell) = { + protected[akka] override def register(actor: ActorCell): Unit = { super.register(actor) - buddies.add(actor) + team.add(actor) } - protected[akka] override def unregister(actor: ActorCell) = { - buddies.remove(actor) + protected[akka] override def unregister(actor: ActorCell): Unit = { + team.remove(actor) super.unregister(actor) - scheduleOne() + teamWork() } override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue.enqueue(receiver.self, invocation) registerForExecution(receiver.mailbox, false, false) - scheduleOne() + teamWork() } - @tailrec private def scheduleOne(i: Iterator[ActorCell] = buddies.iterator): Unit = - if (attemptTeamWork - && messageQueue.hasMessages - && i.hasNext - && (executorService.get().executor match { - case lm: LoadMetrics ⇒ lm.atFullThrottle == false - case other ⇒ true - }) - && !registerForExecution(i.next.mailbox, false, false)) - scheduleOne(i) + protected def teamWork(): Unit = if (attemptTeamWork) { + @tailrec def scheduleOne(i: Iterator[ActorCell] = team.iterator): Unit = + if (messageQueue.hasMessages + && i.hasNext + && (executorService.get().executor match { + case lm: LoadMetrics ⇒ lm.atFullThrottle == false + case other ⇒ true + }) + && !registerForExecution(i.next.mailbox, false, false)) + scheduleOne(i) + } } \ No newline at end of file From 060633331a1f8e58c9f0258a50bf5145ebe9d900 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 13 Feb 2012 22:07:12 +0100 Subject: [PATCH 19/19] Restoring the order of the universe --- .../src/main/scala/akka/dispatch/BalancingDispatcher.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 46701848c5..ec675f7b5b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -109,5 +109,7 @@ class BalancingDispatcher( }) && !registerForExecution(i.next.mailbox, false, false)) scheduleOne(i) + + scheduleOne() } } \ No newline at end of file