Merge pull request #323 from jboner/wip-1808-timeout-patriknw

Replace akka.actor.timeout with specfic settings. See #1808
This commit is contained in:
viktorklang 2012-02-13 04:52:30 -08:00
commit 216513933a
27 changed files with 283 additions and 193 deletions

View file

@ -4,14 +4,21 @@
package akka.actor
import java.io.File
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import akka.util.Timeout
import akka.util.duration._
@deprecated("use ActorSystem instead", "2.0")
object GlobalActorSystem extends ActorSystemImpl("GlobalSystem", OldConfigurationLoader.defaultConfig) {
start()
/**
* Timeout used in `OldFuture.get` and default implicit ask timeout.
* Hard coded since the migration kit is not intended to be used for production anyway.
*/
val AwaitTimeout = Timeout(5 seconds)
}
/**

View file

@ -18,13 +18,13 @@ class OldFuture[T](future: Future[T]) {
@deprecated("use akka.dispatch.Await.result instead", "2.0")
def get: T = try {
Await.result(future, GlobalActorSystem.settings.ActorTimeout.duration)
Await.result(future, GlobalActorSystem.AwaitTimeout.duration)
} catch {
case e: TimeoutException throw new FutureTimeoutException(e.getMessage, e)
}
@deprecated("use akka.dispatch.Await.ready instead", "2.0")
def await: Future[T] = await(GlobalActorSystem.settings.ActorTimeout.duration)
def await: Future[T] = await(GlobalActorSystem.AwaitTimeout.duration)
@deprecated("use akka.dispatch.Await.ready instead", "2.0")
def await(atMost: Duration) = try {

View file

@ -14,7 +14,7 @@ package object migration {
implicit def future2OldFuture[T](future: Future[T]): OldFuture[T] = new OldFuture[T](future)
implicit def askTimeout: Timeout = GlobalActorSystem.settings.ActorTimeout
implicit def askTimeout: Timeout = GlobalActorSystem.AwaitTimeout
implicit def defaultDispatcher: MessageDispatcher = GlobalActorSystem.dispatcher

View file

@ -5,6 +5,7 @@ import akka.actor.ActorSystem;
import akka.japi.*;
import akka.util.Duration;
import akka.testkit.TestKitExtension;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -28,7 +29,7 @@ public class JavaFutureTests {
@BeforeClass
public static void beforeAll() {
system = ActorSystem.create("JavaFutureTests", AkkaSpec.testConf());
t = system.settings().ActorTimeout();
t = TestKitExtension.get(system).DefaultTimeout();
}
@AfterClass
@ -61,10 +62,10 @@ public class JavaFutureTests {
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
f.onSuccess(new OnSuccess<String>() {
public void onSuccess(String result) {
if (result.equals("foo"))
latch.countDown();
}
public void onSuccess(String result) {
if (result.equals("foo"))
latch.countDown();
}
});
cf.success("foo");
@ -78,10 +79,10 @@ public class JavaFutureTests {
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
f.onFailure(new OnFailure() {
public void onFailure(Throwable t) {
if (t instanceof NullPointerException)
latch.countDown();
}
public void onFailure(Throwable t) {
if (t instanceof NullPointerException)
latch.countDown();
}
});
Throwable exception = new NullPointerException();
@ -296,8 +297,10 @@ public class JavaFutureTests {
Promise<Object> p = Futures.promise(system.dispatcher());
Future<Object> f = p.future().recover(new Recover<Object>() {
public Object recover(Throwable t) throws Throwable {
if (t == fail) return "foo";
else throw t;
if (t == fail)
return "foo";
else
throw t;
}
});
Duration d = Duration.create(1, TimeUnit.SECONDS);
@ -311,8 +314,10 @@ public class JavaFutureTests {
Promise<Object> p = Futures.promise(system.dispatcher());
Future<Object> f = p.future().recoverWith(new Recover<Future<Object>>() {
public Future<Object> recover(Throwable t) throws Throwable {
if (t == fail) return Futures.<Object>successful("foo", system.dispatcher()).future();
else throw t;
if (t == fail)
return Futures.<Object> successful("foo", system.dispatcher()).future();
else
throw t;
}
});
Duration d = Duration.create(1, TimeUnit.SECONDS);

View file

@ -113,7 +113,7 @@ object TypedActorSpec {
}
def futureComposePigdogFrom(foo: Foo): Future[String] = {
implicit val timeout = TypedActor.context.system.settings.ActorTimeout
implicit val timeout = TypedActor(TypedActor.context.system).DefaultReturnTimeout
foo.futurePigdog(500).map(_.toUpperCase)
}

View file

@ -5,8 +5,9 @@ package akka.pattern
import akka.testkit.AkkaSpec
import akka.util.duration._
import akka.testkit.DefaultTimeout
class AskSpec extends AkkaSpec {
class AskSpec extends AkkaSpec with DefaultTimeout {
"The “ask” pattern" must {
@ -22,7 +23,6 @@ class AskSpec extends AkkaSpec {
"return broken promises on EmptyLocalActorRefs" in {
val empty = system.actorFor("unknown")
implicit val timeout = system.settings.ActorTimeout
val f = empty ? 3.14
f.isCompleted must be(true)
f.value.get match {

View file

@ -1,6 +1,6 @@
##############################
# Akka Reference Config File #
##############################
####################################
# Akka Actor Reference Config File #
####################################
# This the reference config file has all the default settings.
# Make your edits/overrides in your application.conf.
@ -50,12 +50,6 @@ akka {
# removed from their parents
reaper-interval = 5s
# Default timeout for Future based invocations
# - Actor: ask && ?
# - UntypedActor: ask
# - TypedActor: methods with non-void return type
timeout = 5s
# Serializes and deserializes (non-primitive) messages to ensure immutability,
# this is only intended for testing.
serialize-messages = off
@ -64,6 +58,11 @@ akka {
# this is only intended for testing.
serialize-creators = off
typed {
# Default timeout for typed actor methods with non-void return type
timeout = 5s
}
deployment {
# deployment id pattern - on the format: /parent/child etc.

View file

@ -70,7 +70,6 @@ object ActorSystem {
final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS))
final val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS)
final val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS))
final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages")
final val SerializeAllCreators = getBoolean("akka.actor.serialize-creators")

View file

@ -11,7 +11,9 @@ import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar }
import akka.serialization.{ Serialization, SerializationExtension }
import akka.dispatch._
import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.lang.IllegalStateException
import akka.util.Duration
trait TypedActorFactory {
@ -502,7 +504,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
/**
* @return a new TypedProps that will use the specified Timeout for its non-void-returning methods,
* if null is specified, it will use the default ActorTimeout as specified in the configuration.
* if null is specified, it will use the default timeout as specified in the configuration.
*
* Java API
*/
@ -510,7 +512,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
/**
* @return a new TypedProps that will use the specified Timeout for its non-void-returning methods,
* if None is specified, it will use the default ActorTimeout as specified in the configuration.
* if None is specified, it will use the default timeout as specified in the configuration.
*
* Scala API
*/
@ -550,6 +552,11 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory
val serialization = SerializationExtension(system)
val settings = system.settings
/**
* Default timeout for typed actor methods with non-void return type
*/
final val DefaultReturnTimeout = Timeout(Duration(settings.config.getMilliseconds("akka.actor.typed.timeout"), MILLISECONDS))
/**
* Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found
*/
@ -575,7 +582,7 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory
new TypedActorInvocationHandler(
this,
actorVar,
if (props.timeout.isDefined) props.timeout.get else this.settings.ActorTimeout)).asInstanceOf[R]
if (props.timeout.isDefined) props.timeout.get else DefaultReturnTimeout)).asInstanceOf[R]
proxyVar match {
case null

View file

@ -75,6 +75,24 @@ akka-testkit
.. literalinclude:: ../../akka-testkit/src/main/resources/reference.conf
:language: none
akka-transactor
~~~~~~~~~~~~~~~
.. literalinclude:: ../../akka-transactor/src/main/resources/reference.conf
:language: none
akka-agent
~~~~~~~~~~
.. literalinclude:: ../../akka-agent/src/main/resources/reference.conf
:language: none
akka-zeromq
~~~~~~~~~~~
.. literalinclude:: ../../akka-zeromq/src/main/resources/reference.conf
:language: none
akka-beanstalk-mailbox
~~~~~~~~~~~~~~~~~~~~~~

View file

@ -78,7 +78,7 @@ public class FutureDocTestBase {
ActorRef actor = system.actorOf(new Props(MyActor.class));
String msg = "hello";
//#ask-blocking
Timeout timeout = system.settings().ActorTimeout();
Timeout timeout = new Timeout(Duration.parse("5 seconds"));
Future<Object> future = Patterns.ask(actor, msg, timeout);
String result = (String) Await.result(future, timeout.duration());
//#ask-blocking
@ -196,19 +196,17 @@ public class FutureDocTestBase {
Iterable<Future<Integer>> listOfFutureInts = source;
// now we have a Future[Iterable[Integer]]
Future<Iterable<Integer>> futureListOfInts =
sequence(listOfFutureInts, system.dispatcher());
Future<Iterable<Integer>> futureListOfInts = sequence(listOfFutureInts, system.dispatcher());
// Find the sum of the odd numbers
Future<Long> futureSum = futureListOfInts.map(
new Mapper<Iterable<Integer>, Long>() {
public Long apply(Iterable<Integer> ints) {
long sum = 0;
for (Integer i : ints)
sum += i;
return sum;
}
});
Future<Long> futureSum = futureListOfInts.map(new Mapper<Iterable<Integer>, Long>() {
public Long apply(Iterable<Integer> ints) {
long sum = 0;
for (Integer i : ints)
sum += i;
return sum;
}
});
long result = Await.result(futureSum, Duration.create(1, SECONDS));
//#sequence
@ -221,20 +219,18 @@ public class FutureDocTestBase {
//Just a sequence of Strings
Iterable<String> listStrings = Arrays.asList("a", "b", "c");
Future<Iterable<String>> futureResult = traverse(listStrings,
new Function<String, Future<String>>() {
public Future<String> apply(final String r) {
return future(new Callable<String>() {
public String call() {
return r.toUpperCase();
}
}, system.dispatcher());
}
}, system.dispatcher());
Future<Iterable<String>> futureResult = traverse(listStrings, new Function<String, Future<String>>() {
public Future<String> apply(final String r) {
return future(new Callable<String>() {
public String call() {
return r.toUpperCase();
}
}, system.dispatcher());
}
}, system.dispatcher());
//Returns the sequence of strings as upper case
Iterable<String> result =
Await.result(futureResult, Duration.create(1, SECONDS));
Iterable<String> result = Await.result(futureResult, Duration.create(1, SECONDS));
assertEquals(Arrays.asList("A", "B", "C"), result);
//#traverse
}
@ -250,12 +246,11 @@ public class FutureDocTestBase {
Iterable<Future<String>> futures = source;
//Start value is the empty string
Future<String> resultFuture = fold("", futures,
new Function2<String, String, String>() {
public String apply(String r, String t) {
return r + t; //Just concatenate
}
}, system.dispatcher());
Future<String> resultFuture = fold("", futures, new Function2<String, String, String>() {
public String apply(String r, String t) {
return r + t; //Just concatenate
}
}, system.dispatcher());
String result = Await.result(resultFuture, Duration.create(1, SECONDS));
//#fold
@ -272,12 +267,11 @@ public class FutureDocTestBase {
//A sequence of Futures, in this case Strings
Iterable<Future<String>> futures = source;
Future<Object> resultFuture = reduce(futures,
new Function2<Object, String, Object>() {
public Object apply(Object r, String t) {
return r + t; //Just concatenate
}
}, system.dispatcher());
Future<Object> resultFuture = reduce(futures, new Function2<Object, String, Object>() {
public Object apply(Object r, String t) {
return r + t; //Just concatenate
}
}, system.dispatcher());
Object result = Await.result(resultFuture, Duration.create(1, SECONDS));
//#reduce
@ -285,32 +279,35 @@ public class FutureDocTestBase {
assertEquals("ab", result);
}
@Test public void useSuccessfulAndFailed() {
@Test
public void useSuccessfulAndFailed() {
//#successful
Future<String> future = Futures.successful("Yay!", system.dispatcher());
//#successful
//#failed
Future<String> otherFuture =
Futures.failed(new IllegalArgumentException("Bang!"), system.dispatcher());
Future<String> otherFuture = Futures.failed(new IllegalArgumentException("Bang!"), system.dispatcher());
//#failed
Object result = Await.result(future, Duration.create(1, SECONDS));
assertEquals("Yay!",result);
assertEquals("Yay!", result);
Throwable result2 = Await.result(otherFuture.failed(), Duration.create(1, SECONDS));
assertEquals("Bang!",result2.getMessage());
assertEquals("Bang!", result2.getMessage());
}
@Test public void useFilter() {
//#filter
@Test
public void useFilter() {
//#filter
Future<Integer> future1 = Futures.successful(4, system.dispatcher());
Future<Integer> successfulFilter =
future1.filter(new Filter<Integer>() {
public boolean filter(Integer i) { return i % 2 == 0; }
});
Future<Integer> successfulFilter = future1.filter(new Filter<Integer>() {
public boolean filter(Integer i) {
return i % 2 == 0;
}
});
Future<Integer> failedFilter =
future1.filter(new Filter<Integer>() {
public boolean filter(Integer i) { return i % 2 != 0; }
});
Future<Integer> failedFilter = future1.filter(new Filter<Integer>() {
public boolean filter(Integer i) {
return i % 2 != 0;
}
});
//When filter fails, the returned Future will be failed with a scala.MatchError
//#filter
}
@ -323,138 +320,140 @@ public class FutureDocTestBase {
}
@Test public void useAndThen() {
@Test
public void useAndThen() {
//#and-then
Future<String> future1 = Futures.successful("value", system.dispatcher()).
andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null) sendToIssueTracker(failure);
}
Future<String> future1 = Futures.successful("value", system.dispatcher()).andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null)
sendToIssueTracker(failure);
}
}).andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (result != null) sendToTheInternetz(result);
}
public void onComplete(Throwable failure, String result) {
if (result != null)
sendToTheInternetz(result);
}
});
//#and-then
}
@Test public void useRecover() {
@Test
public void useRecover() {
//#recover
Future<Integer> future = future(new Callable<Integer>() {
public Integer call() {
return 1 / 0;
}
}, system.dispatcher()).recover(new Recover<Integer>() {
public Integer recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException) return 0;
else throw problem;
}
public Integer recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException)
return 0;
else
throw problem;
}
});
int result = Await.result(future, Duration.create(1, SECONDS));
assertEquals(result, 0);
//#recover
}
@Test public void useTryRecover() {
@Test
public void useTryRecover() {
//#try-recover
Future<Integer> future = future(new Callable<Integer>() {
public Integer call() {
return 1 / 0;
}
}, system.dispatcher()).recoverWith(new Recover<Future<Integer>>() {
public Future<Integer> recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException) {
return future(new Callable<Integer>() {
public Integer call() {
return 0;
}
}, system.dispatcher());
public Future<Integer> recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException) {
return future(new Callable<Integer>() {
public Integer call() {
return 0;
}
else throw problem;
}
}, system.dispatcher());
} else
throw problem;
}
});
int result = Await.result(future, Duration.create(1, SECONDS));
assertEquals(result, 0);
//#try-recover
}
@Test public void useOnSuccessOnFailureAndOnComplete() {
{
@Test
public void useOnSuccessOnFailureAndOnComplete() {
{
Future<String> future = Futures.successful("foo", system.dispatcher());
//#onSuccess
future.onSuccess(new OnSuccess<String>() {
public void onSuccess(String result) {
if ("bar" == result) {
//Do something if it resulted in "bar"
} else {
//Do something if it was some other String
}
public void onSuccess(String result) {
if ("bar" == result) {
//Do something if it resulted in "bar"
} else {
//Do something if it was some other String
}
}
});
//#onSuccess
}
{
Future<String> future =
Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher());
//#onFailure
future.onFailure( new OnFailure() {
}
{
Future<String> future = Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher());
//#onFailure
future.onFailure(new OnFailure() {
public void onFailure(Throwable failure) {
if (failure instanceof IllegalStateException) {
//Do something if it was this particular failure
} else {
//Do something if it was some other failure
}
if (failure instanceof IllegalStateException) {
//Do something if it was this particular failure
} else {
//Do something if it was some other failure
}
}
});
//#onFailure
}
{
Future<String> future = Futures.successful("foo", system.dispatcher());
//#onComplete
future.onComplete(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null) {
//We got a failure, handle it here
} else {
// We got a result, do something with it
}
}
});
//#onComplete
}
}
{
Future<String> future = Futures.successful("foo", system.dispatcher());
//#onComplete
future.onComplete(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null) {
//We got a failure, handle it here
} else {
// We got a result, do something with it
}
}
});
//#onComplete
}
}
@Test public void useOrAndZip(){
@Test
public void useOrAndZip() {
{
//#zip
Future<String> future1 = Futures.successful("foo", system.dispatcher());
Future<String> future2 = Futures.successful("bar", system.dispatcher());
Future<String> future3 =
future1.zip(future2).map(new Mapper<scala.Tuple2<String,String>, String>() {
public String apply(scala.Tuple2<String,String> zipped) {
return zipped._1() + " " + zipped._2();
//#zip
Future<String> future1 = Futures.successful("foo", system.dispatcher());
Future<String> future2 = Futures.successful("bar", system.dispatcher());
Future<String> future3 = future1.zip(future2).map(new Mapper<scala.Tuple2<String, String>, String>() {
public String apply(scala.Tuple2<String, String> zipped) {
return zipped._1() + " " + zipped._2();
}
});
});
String result = Await.result(future3, Duration.create(1, SECONDS));
assertEquals("foo bar", result);
//#zip
String result = Await.result(future3, Duration.create(1, SECONDS));
assertEquals("foo bar", result);
//#zip
}
{
//#fallback-to
Future<String> future1 =
Futures.failed(new IllegalStateException("OHNOES1"), system.dispatcher());
Future<String> future2 =
Futures.failed(new IllegalStateException("OHNOES2"), system.dispatcher());
Future<String> future3 =
Futures.successful("bar", system.dispatcher());
Future<String> future4 =
future1.fallbackTo(future2).fallbackTo(future3); // Will have "bar" in this case
String result = Await.result(future4, Duration.create(1, SECONDS));
assertEquals("bar", result);
//#fallback-to
//#fallback-to
Future<String> future1 = Futures.failed(new IllegalStateException("OHNOES1"), system.dispatcher());
Future<String> future2 = Futures.failed(new IllegalStateException("OHNOES2"), system.dispatcher());
Future<String> future3 = Futures.successful("bar", system.dispatcher());
Future<String> future4 = future1.fallbackTo(future2).fallbackTo(future3); // Will have "bar" in this case
String result = Await.result(future4, Duration.create(1, SECONDS));
assertEquals("bar", result);
//#fallback-to
}
}

View file

@ -54,9 +54,9 @@ public class ParentActor extends UntypedActor {
ActorRef scatterGatherFirstCompletedRouter = getContext().actorOf(
new Props(FibonacciActor.class).withRouter(new ScatterGatherFirstCompletedRouter(5, Duration
.parse("2 seconds"))), "router");
Timeout timeout = getContext().system().settings().ActorTimeout();
Future<Object> futureResult = akka.pattern.Patterns.ask(
scatterGatherFirstCompletedRouter, new FibonacciActor.FibonacciNumber(10), timeout);
Timeout timeout = new Timeout(Duration.parse("5 seconds"));
Future<Object> futureResult = akka.pattern.Patterns.ask(scatterGatherFirstCompletedRouter,
new FibonacciActor.FibonacciNumber(10), timeout);
int result = (Integer) Await.result(futureResult, timeout.duration());
//#scatterGatherFirstCompletedRouter
System.out.println(String.format("The result of calculating Fibonacci for 10 is %d", result));

View file

@ -11,6 +11,7 @@ import akka.actor.Props
import akka.actor.Status.Failure
import akka.dispatch.Future
import akka.dispatch.Await
import akka.util.Timeout
import akka.util.duration._
import akka.dispatch.Promise
import java.lang.IllegalStateException
@ -46,8 +47,10 @@ class FutureDocSpec extends AkkaSpec {
//#ask-blocking
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
val future = actor ? msg // enabled by the ask import
val result = Await.result(future, timeout.duration).asInstanceOf[String]
//#ask-blocking
@ -57,7 +60,7 @@ class FutureDocSpec extends AkkaSpec {
"demonstrate usage of mapTo" in {
val actor = system.actorOf(Props[MyActor])
val msg = "hello"
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
//#map-to
import akka.dispatch.Future
import akka.pattern.ask
@ -164,7 +167,7 @@ class FutureDocSpec extends AkkaSpec {
val actor3 = system.actorOf(Props[MyActor])
val msg1 = 1
val msg2 = 2
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
import akka.dispatch.Await
import akka.pattern.ask
//#composing-wrong
@ -188,7 +191,7 @@ class FutureDocSpec extends AkkaSpec {
val actor3 = system.actorOf(Props[MyActor])
val msg1 = 1
val msg2 = 2
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
import akka.dispatch.Await
import akka.pattern.ask
//#composing
@ -208,7 +211,7 @@ class FutureDocSpec extends AkkaSpec {
}
"demonstrate usage of sequence with actors" in {
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
val oddActor = system.actorOf(Props[OddActor])
//#sequence-ask
// oddActor returns odd numbers sequentially from 1 as a List[Future[Int]]
@ -256,7 +259,7 @@ class FutureDocSpec extends AkkaSpec {
}
"demonstrate usage of recover" in {
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
val actor = system.actorOf(Props[MyActor])
val msg1 = -1
//#recover
@ -268,7 +271,7 @@ class FutureDocSpec extends AkkaSpec {
}
"demonstrate usage of recoverWith" in {
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
val actor = system.actorOf(Props[MyActor])
val msg1 = -1
//#try-recover

View file

@ -7,6 +7,7 @@ import akka.routing.{ ScatterGatherFirstCompletedRouter, BroadcastRouter, Random
import annotation.tailrec
import akka.actor.{ Props, Actor }
import akka.util.duration._
import akka.util.Timeout
import akka.dispatch.Await
import akka.pattern.ask
import akka.routing.SmallestMailboxRouter
@ -80,7 +81,7 @@ class ParentActor extends Actor {
val scatterGatherFirstCompletedRouter = context.actorOf(
Props[FibonacciActor].withRouter(ScatterGatherFirstCompletedRouter(
nrOfInstances = 5, within = 2 seconds)), "router")
implicit val timeout = context.system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
val futureResult = scatterGatherFirstCompletedRouter ? FibonacciNumber(10)
val result = Await.result(futureResult, timeout.duration)
//#scatterGatherFirstCompletedRouter

View file

@ -44,7 +44,7 @@ akka {
/looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345"
}
}
""") with ImplicitSender {
""") with ImplicitSender with DefaultTimeout {
import RemoteCommunicationSpec._
@ -59,8 +59,6 @@ akka {
val here = system.actorFor("akka://remote_sys@localhost:12346/user/echo")
implicit val timeout = system.settings.ActorTimeout
override def atTermination() {
other.shutdown()
}

View file

@ -18,6 +18,9 @@ akka {
# duration to wait in expectMsg and friends outside of within() block by default
single-expect-default = 3s
# The timeout that is added as an implicit by DefaultTimeout trait
default-timeout = 5s
calling-thread-dispatcher {
type = akka.testkit.CallingThreadDispatcherConfigurator
}

View file

@ -68,7 +68,7 @@ class TestActorRef[T <: Actor](
if (isTerminated) throw new IllegalActorStateException("underlying actor is terminated")
underlying.actor.asInstanceOf[T] match {
case null
val t = underlying.system.settings.ActorTimeout
val t = TestKitExtension(_system).DefaultTimeout
Await.result(this.?(InternalGetActor)(t), t.duration).asInstanceOf[T]
case ref ref
}

View file

@ -11,6 +11,7 @@ import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque, TimeUnit, atom
import atomic.AtomicInteger
import scala.annotation.tailrec
import akka.actor.ActorSystem
import akka.util.Timeout
object TestActor {
type Ignore = Option[PartialFunction[AnyRef, Boolean]]
@ -644,5 +645,5 @@ trait ImplicitSender { this: TestKit ⇒
}
trait DefaultTimeout { this: TestKit
implicit val timeout = system.settings.ActorTimeout
implicit val timeout: Timeout = testKitSettings.DefaultTimeout
}

View file

@ -5,6 +5,7 @@ package akka.testkit
import com.typesafe.config.Config
import akka.util.Duration
import akka.util.Timeout
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.{ ExtensionId, ActorSystem, Extension, ExtendedActorSystem }
@ -20,4 +21,5 @@ class TestKitSettings(val config: Config) extends Extension {
val TestTimeFactor = getDouble("akka.test.timefactor")
val SingleExpectDefaultTimeout = Duration(getMilliseconds("akka.test.single-expect-default"), MILLISECONDS)
val TestEventFilterLeeway = Duration(getMilliseconds("akka.test.filter-leeway"), MILLISECONDS)
val DefaultTimeout = Timeout(Duration(getMilliseconds("akka.test.default-timeout"), MILLISECONDS))
}

View file

@ -122,7 +122,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
try {
var locker = Seq.empty[DeadLetter]
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = TestKitExtension(system).DefaultTimeout
implicit val davyJones = otherSystem.actorOf(Props(new Actor {
def receive = {
case m: DeadLetter locker :+= m

View file

@ -0,0 +1,13 @@
#########################################
# Akka Transactor Reference Config File #
#########################################
# This the reference config file has all the default settings.
# Make your edits/overrides in your application.conf.
akka {
transactor {
# The timeout used for coordinated transactions across actors
coordinated-timeout = 5s
}
}

View file

@ -93,6 +93,8 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None)
* @see [[akka.transactor.Coordinated]] for more information about the underlying mechanism
*/
trait Transactor extends Actor {
private val settings = TransactorExtension(context.system)
/**
* Implement a general pattern for using coordinated transactions.
*/
@ -108,7 +110,7 @@ trait Transactor extends Actor {
}
case message {
if (normally.isDefinedAt(message)) normally(message)
else receive(Coordinated(message)(context.system.settings.ActorTimeout))
else receive(Coordinated(message)(settings.CoordinatedTimeout))
}
}

View file

@ -0,0 +1,25 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ExtendedActorSystem }
import akka.actor.Extension
import com.typesafe.config.Config
import akka.util.Timeout
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
/**
* TransactorExtension is an Akka Extension to hold settings for transactors.
*/
object TransactorExtension extends ExtensionId[TransactorSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): TransactorSettings = super.get(system)
override def lookup = TransactorExtension
override def createExtension(system: ExtendedActorSystem): TransactorSettings = new TransactorSettings(system.settings.config)
}
class TransactorSettings(val config: Config) extends Extension {
import config._
val CoordinatedTimeout = Timeout(Duration(getMilliseconds("akka.transactor.coordinated-timeout"), MILLISECONDS))
}

View file

@ -12,6 +12,8 @@ import java.util.{ Set ⇒ JSet }
* An UntypedActor version of transactor for using from Java.
*/
abstract class UntypedTransactor extends UntypedActor {
private val settings = TransactorExtension(context.system)
/**
* Implement a general pattern for using coordinated transactions.
*/
@ -29,7 +31,7 @@ abstract class UntypedTransactor extends UntypedActor {
}
case message {
val normal = normally(message)
if (!normal) onReceive(Coordinated(message)(context.system.settings.ActorTimeout))
if (!normal) onReceive(Coordinated(message)(settings.CoordinatedTimeout))
}
}
}

View file

@ -1,6 +1,6 @@
##############################
# Akka Reference Config File #
##############################
#####################################
# Akka ZeroMQ Reference Config File #
#####################################
# This the reference config file has all the default settings.
# Make your edits/overrides in your application.conf.
@ -12,6 +12,9 @@ akka {
# The default timeout for a poll on the actual zeromq socket.
poll-timeout = 100ms
# Timeout for creating a new socket
new-socket-timeout = 5s
socket-dispatcher {
# A zeromq socket needs to be pinned to the thread that created it.
# Changing this value results in weird errors and race conditions within zeromq

View file

@ -187,12 +187,9 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
fromConfig getOrElse context.system.dispatcher
}
private val defaultPollTimeout =
Duration(context.system.settings.config.getMilliseconds("akka.zeromq.poll-timeout"), TimeUnit.MILLISECONDS)
private val pollTimeout = {
val fromConfig = params collectFirst { case PollTimeoutDuration(duration) duration }
fromConfig getOrElse defaultPollTimeout
fromConfig getOrElse ZeroMQExtension(context.system).DefaultPollTimeout
}
private def newEventLoop: Option[Promise[PollLifeCycle]] = {

View file

@ -7,6 +7,9 @@ import org.zeromq.{ ZMQ ⇒ JZMQ }
import akka.actor._
import akka.dispatch.{ Await }
import akka.pattern.ask
import akka.util.Duration
import java.util.concurrent.TimeUnit
import akka.util.Timeout
/**
* A Model to represent a version of the zeromq library
@ -43,6 +46,9 @@ object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProv
*/
class ZeroMQExtension(system: ActorSystem) extends Extension {
val DefaultPollTimeout = Duration(system.settings.config.getMilliseconds("akka.zeromq.poll-timeout"), TimeUnit.MILLISECONDS)
val NewSocketTimeout = Timeout(Duration(system.settings.config.getMilliseconds("akka.zeromq.new-socket-timeout"), TimeUnit.MILLISECONDS))
/**
* The version of the ZeroMQ library
* @return a [[akka.zeromq.ZeroMQVersion]]
@ -136,7 +142,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension {
* @return the [[akka.actor.ActorRef]]
*/
def newSocket(socketParameters: SocketOption*): ActorRef = {
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = NewSocketTimeout
val req = (zeromqGuardian ? newSocketProps(socketParameters: _*)).mapTo[ActorRef]
Await.result(req, timeout.duration)
}