Merge branch 'master' into wip-1800-balancing-è

This commit is contained in:
Viktor Klang 2012-02-13 18:18:29 +01:00
commit aef41fbef3
40 changed files with 681 additions and 235 deletions

View file

@ -4,14 +4,21 @@
package akka.actor
import java.io.File
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import akka.util.Timeout
import akka.util.duration._
@deprecated("use ActorSystem instead", "2.0")
object GlobalActorSystem extends ActorSystemImpl("GlobalSystem", OldConfigurationLoader.defaultConfig) {
start()
/**
* Timeout used in `OldFuture.get` and default implicit ask timeout.
* Hard coded since the migration kit is not intended to be used for production anyway.
*/
val AwaitTimeout = Timeout(5 seconds)
}
/**

View file

@ -18,13 +18,13 @@ class OldFuture[T](future: Future[T]) {
@deprecated("use akka.dispatch.Await.result instead", "2.0")
def get: T = try {
Await.result(future, GlobalActorSystem.settings.ActorTimeout.duration)
Await.result(future, GlobalActorSystem.AwaitTimeout.duration)
} catch {
case e: TimeoutException throw new FutureTimeoutException(e.getMessage, e)
}
@deprecated("use akka.dispatch.Await.ready instead", "2.0")
def await: Future[T] = await(GlobalActorSystem.settings.ActorTimeout.duration)
def await: Future[T] = await(GlobalActorSystem.AwaitTimeout.duration)
@deprecated("use akka.dispatch.Await.ready instead", "2.0")
def await(atMost: Duration) = try {

View file

@ -14,7 +14,7 @@ package object migration {
implicit def future2OldFuture[T](future: Future[T]): OldFuture[T] = new OldFuture[T](future)
implicit def askTimeout: Timeout = GlobalActorSystem.settings.ActorTimeout
implicit def askTimeout: Timeout = GlobalActorSystem.AwaitTimeout
implicit def defaultDispatcher: MessageDispatcher = GlobalActorSystem.dispatcher

View file

@ -5,6 +5,7 @@ import akka.actor.ActorSystem;
import akka.japi.*;
import akka.util.Duration;
import akka.testkit.TestKitExtension;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -28,7 +29,7 @@ public class JavaFutureTests {
@BeforeClass
public static void beforeAll() {
system = ActorSystem.create("JavaFutureTests", AkkaSpec.testConf());
t = system.settings().ActorTimeout();
t = TestKitExtension.get(system).DefaultTimeout();
}
@AfterClass
@ -61,10 +62,10 @@ public class JavaFutureTests {
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
f.onSuccess(new OnSuccess<String>() {
public void onSuccess(String result) {
if (result.equals("foo"))
latch.countDown();
}
public void onSuccess(String result) {
if (result.equals("foo"))
latch.countDown();
}
});
cf.success("foo");
@ -78,10 +79,10 @@ public class JavaFutureTests {
Promise<String> cf = Futures.promise(system.dispatcher());
Future<String> f = cf;
f.onFailure(new OnFailure() {
public void onFailure(Throwable t) {
if (t instanceof NullPointerException)
latch.countDown();
}
public void onFailure(Throwable t) {
if (t instanceof NullPointerException)
latch.countDown();
}
});
Throwable exception = new NullPointerException();
@ -296,8 +297,10 @@ public class JavaFutureTests {
Promise<Object> p = Futures.promise(system.dispatcher());
Future<Object> f = p.future().recover(new Recover<Object>() {
public Object recover(Throwable t) throws Throwable {
if (t == fail) return "foo";
else throw t;
if (t == fail)
return "foo";
else
throw t;
}
});
Duration d = Duration.create(1, TimeUnit.SECONDS);
@ -311,8 +314,10 @@ public class JavaFutureTests {
Promise<Object> p = Futures.promise(system.dispatcher());
Future<Object> f = p.future().recoverWith(new Recover<Future<Object>>() {
public Future<Object> recover(Throwable t) throws Throwable {
if (t == fail) return Futures.<Object>successful("foo", system.dispatcher()).future();
else throw t;
if (t == fail)
return Futures.<Object> successful("foo", system.dispatcher()).future();
else
throw t;
}
});
Duration d = Duration.create(1, TimeUnit.SECONDS);

View file

@ -305,6 +305,7 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
intercept[MalformedURLException] { ActorPath.fromString("://hallo") }
intercept[MalformedURLException] { ActorPath.fromString("s://dd@:12") }
intercept[MalformedURLException] { ActorPath.fromString("s://dd@h:hd") }
intercept[MalformedURLException] { ActorPath.fromString("a://l:1/b") }
}
}

View file

@ -113,7 +113,7 @@ object TypedActorSpec {
}
def futureComposePigdogFrom(foo: Foo): Future[String] = {
implicit val timeout = TypedActor.context.system.settings.ActorTimeout
implicit val timeout = TypedActor(TypedActor.context.system).DefaultReturnTimeout
foo.futurePigdog(500).map(_.toUpperCase)
}

View file

@ -5,8 +5,9 @@ package akka.pattern
import akka.testkit.AkkaSpec
import akka.util.duration._
import akka.testkit.DefaultTimeout
class AskSpec extends AkkaSpec {
class AskSpec extends AkkaSpec with DefaultTimeout {
"The “ask” pattern" must {
@ -22,7 +23,6 @@ class AskSpec extends AkkaSpec {
"return broken promises on EmptyLocalActorRefs" in {
val empty = system.actorFor("unknown")
implicit val timeout = system.settings.ActorTimeout
val f = empty ? 3.14
f.isCompleted must be(true)
f.value.get match {

View file

@ -1,6 +1,6 @@
##############################
# Akka Reference Config File #
##############################
####################################
# Akka Actor Reference Config File #
####################################
# This the reference config file has all the default settings.
# Make your edits/overrides in your application.conf.
@ -50,12 +50,6 @@ akka {
# removed from their parents
reaper-interval = 5s
# Default timeout for Future based invocations
# - Actor: ask && ?
# - UntypedActor: ask
# - TypedActor: methods with non-void return type
timeout = 5s
# Serializes and deserializes (non-primitive) messages to ensure immutability,
# this is only intended for testing.
serialize-messages = off
@ -64,6 +58,11 @@ akka {
# this is only intended for testing.
serialize-creators = off
typed {
# Default timeout for typed actor methods with non-void return type
timeout = 5s
}
deployment {
# deployment id pattern - on the format: /parent/child etc.

View file

@ -70,7 +70,6 @@ object ActorSystem {
final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS))
final val ReaperInterval = Duration(getMilliseconds("akka.actor.reaper-interval"), MILLISECONDS)
final val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS))
final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages")
final val SerializeAllCreators = getBoolean("akka.actor.serialize-creators")

View file

@ -15,7 +15,7 @@ import java.net.MalformedURLException
* for example a remote transport would want to associate additional
* information with an address, then this must be done externally.
*/
final case class Address(protocol: String, system: String, host: Option[String], port: Option[Int]) {
final case class Address private (protocol: String, system: String, host: Option[String], port: Option[Int]) {
def this(protocol: String, system: String) = this(protocol, system, None, None)
def this(protocol: String, system: String, host: String, port: Int) = this(protocol, system, Option(host), Some(port))
@ -62,20 +62,25 @@ object RelativeActorPath {
* This object serves as extractor for Scala and as address parser for Java.
*/
object AddressExtractor {
def unapply(addr: String): Option[Address] = {
def unapply(addr: String): Option[Address] =
try {
val uri = new URI(addr)
if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null)) None
else {
val addr = Address(uri.getScheme, if (uri.getUserInfo != null) uri.getUserInfo else uri.getHost,
if (uri.getUserInfo == null || uri.getHost == null) None else Some(uri.getHost),
if (uri.getPort < 0) None else Some(uri.getPort))
Some(addr)
}
unapply(uri)
} catch {
case _: URISyntaxException None
}
}
def unapply(uri: URI): Option[Address] =
if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null)) None
else if (uri.getUserInfo == null) { // case 1: akka://system
if (uri.getPort != -1) None
else Some(Address(uri.getScheme, uri.getHost))
} else { // case 2: akka://system@host:port
if (uri.getHost == null || uri.getPort == -1) None
else Some(
if (uri.getUserInfo == null) Address(uri.getScheme, uri.getHost)
else Address(uri.getScheme, uri.getUserInfo, uri.getHost, uri.getPort))
}
/**
* Try to construct an Address from the given String or throw a java.net.MalformedURLException.
@ -92,18 +97,15 @@ object AddressExtractor {
}
object ActorPathExtractor {
def unapply(addr: String): Option[(Address, Iterable[String])] = {
def unapply(addr: String): Option[(Address, Iterable[String])] =
try {
val uri = new URI(addr)
if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null) || uri.getPath == null) None
else {
val addr = Address(uri.getScheme, if (uri.getUserInfo != null) uri.getUserInfo else uri.getHost,
if (uri.getUserInfo == null || uri.getHost == null) None else Some(uri.getHost),
if (uri.getPort < 0) None else Some(uri.getPort))
Some((addr, ActorPath.split(uri.getPath).drop(1)))
if (uri.getPath == null) None
else AddressExtractor.unapply(uri) match {
case None None
case Some(addr) Some((addr, ActorPath.split(uri.getPath).drop(1)))
}
} catch {
case _: URISyntaxException None
}
}
}

View file

@ -11,7 +11,9 @@ import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar }
import akka.serialization.{ Serialization, SerializationExtension }
import akka.dispatch._
import java.util.concurrent.TimeoutException
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.lang.IllegalStateException
import akka.util.Duration
trait TypedActorFactory {
@ -502,7 +504,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
/**
* @return a new TypedProps that will use the specified Timeout for its non-void-returning methods,
* if null is specified, it will use the default ActorTimeout as specified in the configuration.
* if null is specified, it will use the default timeout as specified in the configuration.
*
* Java API
*/
@ -510,7 +512,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
/**
* @return a new TypedProps that will use the specified Timeout for its non-void-returning methods,
* if None is specified, it will use the default ActorTimeout as specified in the configuration.
* if None is specified, it will use the default timeout as specified in the configuration.
*
* Scala API
*/
@ -550,6 +552,11 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory
val serialization = SerializationExtension(system)
val settings = system.settings
/**
* Default timeout for typed actor methods with non-void return type
*/
final val DefaultReturnTimeout = Timeout(Duration(settings.config.getMilliseconds("akka.actor.typed.timeout"), MILLISECONDS))
/**
* Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found
*/
@ -575,7 +582,7 @@ class TypedActorExtension(system: ExtendedActorSystem) extends TypedActorFactory
new TypedActorInvocationHandler(
this,
actorVar,
if (props.timeout.isDefined) props.timeout.get else this.settings.ActorTimeout)).asInstanceOf[R]
if (props.timeout.isDefined) props.timeout.get else DefaultReturnTimeout)).asInstanceOf[R]
proxyVar match {
case null

View file

@ -18,28 +18,46 @@ The multi-JVM testing is an sbt plugin that you can find here:
http://github.com/typesafehub/sbt-multi-jvm
You can add it as a plugin by adding the following to your plugins/build.sbt::
You can add it as a plugin by adding the following to your project/plugins.sbt::
resolvers += Classpaths.typesafeResolver
addSbtPlugin("com.typesafe.sbtmultijvm" % "sbt-multi-jvm" % "0.1.9")
You can then add multi-JVM testing to a project by including the ``MultiJvm``
You can then add multi-JVM testing to ``project/Build.scala`` by including the ``MultiJvm``
settings and config. For example, here is how the akka-remote project adds
multi-JVM testing::
import MultiJvmPlugin.{ MultiJvm, extraOptions }
import sbt._
import Keys._
import com.typesafe.sbtmultijvm.MultiJvmPlugin
import com.typesafe.sbtmultijvm.MultiJvmPlugin.{ MultiJvm, extraOptions }
lazy val cluster = Project(
id = "akka-remote",
base = file("akka-remote"),
settings = defaultSettings ++ MultiJvmPlugin.settings ++ Seq(
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dconfig.file=" + _.absolutePath).toSeq
},
test in Test <<= (test in Test) dependsOn (test in MultiJvm)
object AkkaBuild extends Build {
lazy val remote = Project(
id = "akka-remote",
base = file("akka-remote"),
settings = defaultSettings ++ MultiJvmPlugin.settings ++ Seq(
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
(name: String) => (src ** (name + ".conf")).get.headOption.map("-Dconfig.file=" + _.absolutePath).toSeq
},
test in Test <<= (test in Test) dependsOn (test in MultiJvm)
)
) configs (MultiJvm)
lazy val buildSettings = Defaults.defaultSettings ++ Seq(
organization := "com.typesafe.akka",
version := "2.0-SNAPSHOT",
scalaVersion := "2.9.1",
crossPaths := false
)
) configs (MultiJvm)
lazy val defaultSettings = buildSettings ++ Seq(
resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/"
)
}
You can specify JVM options for the forked JVMs::
@ -87,8 +105,8 @@ options after the test names and ``--``. For example:
Creating application tests
==========================
The tests are discovered, and combined, through a naming convention. A test is
named with the following pattern:
The tests are discovered, and combined, through a naming convention. MultiJvm tests are
located in ``src/multi-jvm/scala`` directory. A test is named with the following pattern:
.. code-block:: none

View file

@ -75,6 +75,24 @@ akka-testkit
.. literalinclude:: ../../akka-testkit/src/main/resources/reference.conf
:language: none
akka-transactor
~~~~~~~~~~~~~~~
.. literalinclude:: ../../akka-transactor/src/main/resources/reference.conf
:language: none
akka-agent
~~~~~~~~~~
.. literalinclude:: ../../akka-agent/src/main/resources/reference.conf
:language: none
akka-zeromq
~~~~~~~~~~~
.. literalinclude:: ../../akka-zeromq/src/main/resources/reference.conf
:language: none
akka-beanstalk-mailbox
~~~~~~~~~~~~~~~~~~~~~~

View file

@ -45,6 +45,18 @@ To prevent visibility and reordering problems on actors, Akka guarantees the fol
Both rules only apply for the same actor instance and are not valid if different actors are used.
Futures and the Java Memory Model
---------------------------------
The completion of a Future "happens before" the invocation of any callbacks registered to it are executed.
We recommend not to close over non-final fields (final in Java and val in Scala), and if you *do* choose to close over
non-final fields, they must be marked *volatile* in order for the current value of the field to be visible to the callback.
If you close over a reference, you must also ensure that the instance that is referred to is thread safe.
We highly recommend staying away from objects that use locking, since it can introduce performance problems and in the worst case, deadlocks.
Such are the perils of synchronized.
STM and the Java Memory Model
-----------------------------
Akka's Software Transactional Memory (STM) also provides a "happens before" rule:

View file

@ -78,7 +78,7 @@ public class FutureDocTestBase {
ActorRef actor = system.actorOf(new Props(MyActor.class));
String msg = "hello";
//#ask-blocking
Timeout timeout = system.settings().ActorTimeout();
Timeout timeout = new Timeout(Duration.parse("5 seconds"));
Future<Object> future = Patterns.ask(actor, msg, timeout);
String result = (String) Await.result(future, timeout.duration());
//#ask-blocking
@ -196,19 +196,17 @@ public class FutureDocTestBase {
Iterable<Future<Integer>> listOfFutureInts = source;
// now we have a Future[Iterable[Integer]]
Future<Iterable<Integer>> futureListOfInts =
sequence(listOfFutureInts, system.dispatcher());
Future<Iterable<Integer>> futureListOfInts = sequence(listOfFutureInts, system.dispatcher());
// Find the sum of the odd numbers
Future<Long> futureSum = futureListOfInts.map(
new Mapper<Iterable<Integer>, Long>() {
public Long apply(Iterable<Integer> ints) {
long sum = 0;
for (Integer i : ints)
sum += i;
return sum;
}
});
Future<Long> futureSum = futureListOfInts.map(new Mapper<Iterable<Integer>, Long>() {
public Long apply(Iterable<Integer> ints) {
long sum = 0;
for (Integer i : ints)
sum += i;
return sum;
}
});
long result = Await.result(futureSum, Duration.create(1, SECONDS));
//#sequence
@ -221,20 +219,18 @@ public class FutureDocTestBase {
//Just a sequence of Strings
Iterable<String> listStrings = Arrays.asList("a", "b", "c");
Future<Iterable<String>> futureResult = traverse(listStrings,
new Function<String, Future<String>>() {
public Future<String> apply(final String r) {
return future(new Callable<String>() {
public String call() {
return r.toUpperCase();
}
}, system.dispatcher());
}
}, system.dispatcher());
Future<Iterable<String>> futureResult = traverse(listStrings, new Function<String, Future<String>>() {
public Future<String> apply(final String r) {
return future(new Callable<String>() {
public String call() {
return r.toUpperCase();
}
}, system.dispatcher());
}
}, system.dispatcher());
//Returns the sequence of strings as upper case
Iterable<String> result =
Await.result(futureResult, Duration.create(1, SECONDS));
Iterable<String> result = Await.result(futureResult, Duration.create(1, SECONDS));
assertEquals(Arrays.asList("A", "B", "C"), result);
//#traverse
}
@ -250,12 +246,11 @@ public class FutureDocTestBase {
Iterable<Future<String>> futures = source;
//Start value is the empty string
Future<String> resultFuture = fold("", futures,
new Function2<String, String, String>() {
public String apply(String r, String t) {
return r + t; //Just concatenate
}
}, system.dispatcher());
Future<String> resultFuture = fold("", futures, new Function2<String, String, String>() {
public String apply(String r, String t) {
return r + t; //Just concatenate
}
}, system.dispatcher());
String result = Await.result(resultFuture, Duration.create(1, SECONDS));
//#fold
@ -272,12 +267,11 @@ public class FutureDocTestBase {
//A sequence of Futures, in this case Strings
Iterable<Future<String>> futures = source;
Future<Object> resultFuture = reduce(futures,
new Function2<Object, String, Object>() {
public Object apply(Object r, String t) {
return r + t; //Just concatenate
}
}, system.dispatcher());
Future<Object> resultFuture = reduce(futures, new Function2<Object, String, Object>() {
public Object apply(Object r, String t) {
return r + t; //Just concatenate
}
}, system.dispatcher());
Object result = Await.result(resultFuture, Duration.create(1, SECONDS));
//#reduce
@ -285,32 +279,35 @@ public class FutureDocTestBase {
assertEquals("ab", result);
}
@Test public void useSuccessfulAndFailed() {
@Test
public void useSuccessfulAndFailed() {
//#successful
Future<String> future = Futures.successful("Yay!", system.dispatcher());
//#successful
//#failed
Future<String> otherFuture =
Futures.failed(new IllegalArgumentException("Bang!"), system.dispatcher());
Future<String> otherFuture = Futures.failed(new IllegalArgumentException("Bang!"), system.dispatcher());
//#failed
Object result = Await.result(future, Duration.create(1, SECONDS));
assertEquals("Yay!",result);
assertEquals("Yay!", result);
Throwable result2 = Await.result(otherFuture.failed(), Duration.create(1, SECONDS));
assertEquals("Bang!",result2.getMessage());
assertEquals("Bang!", result2.getMessage());
}
@Test public void useFilter() {
//#filter
@Test
public void useFilter() {
//#filter
Future<Integer> future1 = Futures.successful(4, system.dispatcher());
Future<Integer> successfulFilter =
future1.filter(new Filter<Integer>() {
public boolean filter(Integer i) { return i % 2 == 0; }
});
Future<Integer> successfulFilter = future1.filter(new Filter<Integer>() {
public boolean filter(Integer i) {
return i % 2 == 0;
}
});
Future<Integer> failedFilter =
future1.filter(new Filter<Integer>() {
public boolean filter(Integer i) { return i % 2 != 0; }
});
Future<Integer> failedFilter = future1.filter(new Filter<Integer>() {
public boolean filter(Integer i) {
return i % 2 != 0;
}
});
//When filter fails, the returned Future will be failed with a scala.MatchError
//#filter
}
@ -323,138 +320,140 @@ public class FutureDocTestBase {
}
@Test public void useAndThen() {
@Test
public void useAndThen() {
//#and-then
Future<String> future1 = Futures.successful("value", system.dispatcher()).
andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null) sendToIssueTracker(failure);
}
Future<String> future1 = Futures.successful("value", system.dispatcher()).andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null)
sendToIssueTracker(failure);
}
}).andThen(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (result != null) sendToTheInternetz(result);
}
public void onComplete(Throwable failure, String result) {
if (result != null)
sendToTheInternetz(result);
}
});
//#and-then
}
@Test public void useRecover() {
@Test
public void useRecover() {
//#recover
Future<Integer> future = future(new Callable<Integer>() {
public Integer call() {
return 1 / 0;
}
}, system.dispatcher()).recover(new Recover<Integer>() {
public Integer recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException) return 0;
else throw problem;
}
public Integer recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException)
return 0;
else
throw problem;
}
});
int result = Await.result(future, Duration.create(1, SECONDS));
assertEquals(result, 0);
//#recover
}
@Test public void useTryRecover() {
@Test
public void useTryRecover() {
//#try-recover
Future<Integer> future = future(new Callable<Integer>() {
public Integer call() {
return 1 / 0;
}
}, system.dispatcher()).recoverWith(new Recover<Future<Integer>>() {
public Future<Integer> recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException) {
return future(new Callable<Integer>() {
public Integer call() {
return 0;
}
}, system.dispatcher());
public Future<Integer> recover(Throwable problem) throws Throwable {
if (problem instanceof ArithmeticException) {
return future(new Callable<Integer>() {
public Integer call() {
return 0;
}
else throw problem;
}
}, system.dispatcher());
} else
throw problem;
}
});
int result = Await.result(future, Duration.create(1, SECONDS));
assertEquals(result, 0);
//#try-recover
}
@Test public void useOnSuccessOnFailureAndOnComplete() {
{
@Test
public void useOnSuccessOnFailureAndOnComplete() {
{
Future<String> future = Futures.successful("foo", system.dispatcher());
//#onSuccess
future.onSuccess(new OnSuccess<String>() {
public void onSuccess(String result) {
if ("bar" == result) {
//Do something if it resulted in "bar"
} else {
//Do something if it was some other String
}
public void onSuccess(String result) {
if ("bar" == result) {
//Do something if it resulted in "bar"
} else {
//Do something if it was some other String
}
}
});
//#onSuccess
}
{
Future<String> future =
Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher());
//#onFailure
future.onFailure( new OnFailure() {
}
{
Future<String> future = Futures.failed(new IllegalStateException("OHNOES"), system.dispatcher());
//#onFailure
future.onFailure(new OnFailure() {
public void onFailure(Throwable failure) {
if (failure instanceof IllegalStateException) {
//Do something if it was this particular failure
} else {
//Do something if it was some other failure
}
if (failure instanceof IllegalStateException) {
//Do something if it was this particular failure
} else {
//Do something if it was some other failure
}
}
});
//#onFailure
}
{
Future<String> future = Futures.successful("foo", system.dispatcher());
//#onComplete
future.onComplete(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null) {
//We got a failure, handle it here
} else {
// We got a result, do something with it
}
}
});
//#onComplete
}
}
{
Future<String> future = Futures.successful("foo", system.dispatcher());
//#onComplete
future.onComplete(new OnComplete<String>() {
public void onComplete(Throwable failure, String result) {
if (failure != null) {
//We got a failure, handle it here
} else {
// We got a result, do something with it
}
}
});
//#onComplete
}
}
@Test public void useOrAndZip(){
@Test
public void useOrAndZip() {
{
//#zip
Future<String> future1 = Futures.successful("foo", system.dispatcher());
Future<String> future2 = Futures.successful("bar", system.dispatcher());
Future<String> future3 =
future1.zip(future2).map(new Mapper<scala.Tuple2<String,String>, String>() {
public String apply(scala.Tuple2<String,String> zipped) {
return zipped._1() + " " + zipped._2();
//#zip
Future<String> future1 = Futures.successful("foo", system.dispatcher());
Future<String> future2 = Futures.successful("bar", system.dispatcher());
Future<String> future3 = future1.zip(future2).map(new Mapper<scala.Tuple2<String, String>, String>() {
public String apply(scala.Tuple2<String, String> zipped) {
return zipped._1() + " " + zipped._2();
}
});
});
String result = Await.result(future3, Duration.create(1, SECONDS));
assertEquals("foo bar", result);
//#zip
String result = Await.result(future3, Duration.create(1, SECONDS));
assertEquals("foo bar", result);
//#zip
}
{
//#fallback-to
Future<String> future1 =
Futures.failed(new IllegalStateException("OHNOES1"), system.dispatcher());
Future<String> future2 =
Futures.failed(new IllegalStateException("OHNOES2"), system.dispatcher());
Future<String> future3 =
Futures.successful("bar", system.dispatcher());
Future<String> future4 =
future1.fallbackTo(future2).fallbackTo(future3); // Will have "bar" in this case
String result = Await.result(future4, Duration.create(1, SECONDS));
assertEquals("bar", result);
//#fallback-to
//#fallback-to
Future<String> future1 = Futures.failed(new IllegalStateException("OHNOES1"), system.dispatcher());
Future<String> future2 = Futures.failed(new IllegalStateException("OHNOES2"), system.dispatcher());
Future<String> future3 = Futures.successful("bar", system.dispatcher());
Future<String> future4 = future1.fallbackTo(future2).fallbackTo(future3); // Will have "bar" in this case
String result = Await.result(future4, Duration.create(1, SECONDS));
assertEquals("bar", result);
//#fallback-to
}
}

View file

@ -54,9 +54,9 @@ public class ParentActor extends UntypedActor {
ActorRef scatterGatherFirstCompletedRouter = getContext().actorOf(
new Props(FibonacciActor.class).withRouter(new ScatterGatherFirstCompletedRouter(5, Duration
.parse("2 seconds"))), "router");
Timeout timeout = getContext().system().settings().ActorTimeout();
Future<Object> futureResult = akka.pattern.Patterns.ask(
scatterGatherFirstCompletedRouter, new FibonacciActor.FibonacciNumber(10), timeout);
Timeout timeout = new Timeout(Duration.parse("5 seconds"));
Future<Object> futureResult = akka.pattern.Patterns.ask(scatterGatherFirstCompletedRouter,
new FibonacciActor.FibonacciNumber(10), timeout);
int result = (Integer) Await.result(futureResult, timeout.duration());
//#scatterGatherFirstCompletedRouter
System.out.println(String.format("The result of calculating Fibonacci for 10 is %d", result));

View file

@ -0,0 +1,6 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
//#global
object Global extends com.typesafe.play.mini.Setup(akka.docs.http.PlayMiniApplication)
//#global

View file

@ -0,0 +1,128 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.docs.http
//#imports
import com.typesafe.play.mini.{ POST, GET, Path, Application }
import play.api.mvc.{ Action, AsyncResult }
import play.api.mvc.Results._
import play.api.libs.concurrent._
import play.api.data._
import play.api.data.Forms._
import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._
import akka.actor.{ ActorSystem, Props, Actor }
import scala.collection.mutable.{ Map MutableMap }
//#imports
//#playMiniDefinition
object PlayMiniApplication extends Application {
//#playMiniDefinition
private val system = ActorSystem("sample")
//#regexURI
private final val StatementPattern = """/account/statement/(\w+)""".r
//#regexURI
private lazy val accountActor = system.actorOf(Props[AccountActor])
implicit val timeout = Timeout(1000 milliseconds)
//#route
def route = {
//#routeLogic
//#simpleGET
case GET(Path("/ping")) Action {
Ok("Pong @ " + System.currentTimeMillis)
}
//#simpleGET
//#regexGET
case GET(Path(StatementPattern(accountId))) Action {
AsyncResult {
//#innerRegexGET
(accountActor ask Status(accountId)).mapTo[Int].asPromise.map { r
if (r >= 0) Ok("Account total: " + r)
else BadRequest("Unknown account: " + accountId)
}
//#innerRegexGET
}
}
//#regexGET
//#asyncDepositPOST
case POST(Path("/account/deposit")) Action { implicit request
//#formAsyncDepositPOST
val (accountId, amount) = commonForm.bindFromRequest.get
//#formAsyncDepositPOST
AsyncResult {
(accountActor ask Deposit(accountId, amount)).mapTo[Int].asPromise.map { r Ok("Updated account total: " + r) }
}
}
//#asyncDepositPOST
//#asyncWithdrawPOST
case POST(Path("/account/withdraw")) Action { implicit request
val (accountId, amount) = commonForm.bindFromRequest.get
AsyncResult {
(accountActor ask Withdraw(accountId, amount)).mapTo[Int].asPromise.map { r
if (r >= 0) Ok("Updated account total: " + r)
else BadRequest("Unknown account or insufficient funds. Get your act together.")
}
}
}
//#asyncWithdrawPOST
//#routeLogic
}
//#route
//#form
val commonForm = Form(
tuple(
"accountId" -> nonEmptyText,
"amount" -> number(min = 1)))
//#form
}
//#cases
case class Status(accountId: String)
case class Deposit(accountId: String, amount: Int)
case class Withdraw(accountId: String, amount: Int)
//#cases
//#actor
class AccountActor extends Actor {
var accounts = MutableMap[String, Int]()
//#receive
def receive = {
//#senderBang
case Status(accountId) sender ! accounts.getOrElse(accountId, -1)
//#senderBang
case Deposit(accountId, amount) sender ! deposit(accountId, amount)
case Withdraw(accountId, amount) sender ! withdraw(accountId, amount)
}
//#receive
private def deposit(accountId: String, amount: Int): Int = {
accounts.get(accountId) match {
case Some(value)
val newValue = value + amount
accounts += accountId -> newValue
newValue
case None
accounts += accountId -> amount
amount
}
}
private def withdraw(accountId: String, amount: Int): Int = {
accounts.get(accountId) match {
case Some(value)
if (value < amount) -1
else {
val newValue = value - amount
accounts += accountId -> newValue
newValue
}
case None -1
}
}
//#actor
}

View file

@ -7,8 +7,194 @@ HTTP
.. contents:: :local:
Play!
-----
Play2-mini
----------
The Akka team recommends the `Play2-mini <https://github.com/typesafehub/play2-mini>`_ framework when building RESTful
service applications that integrates with Akka. It provides a REST API on top of `Play2 <https://github.com/playframework/Play20/>`_.
Akka will recommend using `Play! Mini <https://github.com/typesafehub/play2-mini>`_
Getting started
---------------
First you must make your application aware of play-mini.
In SBT you just have to add the following to your _libraryDependencies_::
libraryDependencies += "com.typesafe" %% "play-mini" % "2.0-RC1-SNAPSHOT"
Sample Application
------------------
To illustrate how easy it is to wire a RESTful service with Akka we will use a sample application.
The aim of the application is to show how to use play-mini and Akka in combination. Do not put too much
attention on the actual business logic itself, which is a extremely simple bank application, as building a bank
application is a little more complex than what's shown in the sample...
The application should support the following URL commands:
- GET /ping - returns a Pong message with the time of the server (used to see if the application is up and running)
- GET /account/statement/{accountId} - returns the account statement
- POST /account/deposit - deposits money to an account (and creates a new one if it's not already existing)
- POST /account/withdraw - withdraws money from an account
Error messages will be returned in case of any misuse of the application, e.g. withdrawing more money than an
account has etc.
Getting started
---------------
To build a play-mini application you first have to make your object extend com.typesafe.play.mini.Application:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: playMiniDefinition
The next step is to implement the mandatory method ``route``:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: route
:exclude: routeLogic
It is inside the ``route`` method that all the magic happens.
In the sections below we will show how to set up play-mini to handle both GET and POST HTTP calls.
Simple GET
----------
We start off by creating the simplest method we can - a "ping" method:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: simpleGET
As you can see in the section above play-mini uses Scala's wonderful pattern matching.
In the snippet we instruct play-mini to reply to all HTTP GET calls with the URI "/ping".
The ``Action`` returned comes from Play! and you can find more information about it `here <https://github.com/playframework/Play20/wiki/ScalaActions>`_.
.. _Advanced-GET:
Advanced GET
------------
Let's try something more advanced, retrieving parameters from the URI and also make an asynchronous call to an actor:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: regexGET
The regular expression looks like this:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: regexURI
In the snippets above we extract a URI parameter with the help of a simple regular expression and then we pass this
parameter on to the underlying actor system. As you can see ``AsyncResult`` is being used. This means that the call to
the actor will be performed asynchronously, i.e. no blocking.
The asynchronous call to the actor is being done with a ``ask``, e.g.::
(accountActor ask Status(accountId))
The actor that receives the message returns the result by using a standard *sender !*
as can be seen here:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: senderBang
When the result is returned to the calling code we use some mapping code in Play to convert a Akka future to a Play future.
This is shown in this code:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: innerRegexGET
In this snippet we check the result to decide what type of response we want to send to the calling client.
Using HTTP POST
---------------
Okay, in the sections above we have shown you how to use play-mini for HTTP GET calls. Let's move on to when the user
posts values to the application.
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: asyncDepositPOST
As you can see the structure is almost the same as for the :ref:`Advanced-GET`. The difference is that we make the
``request`` parameter ``implicit`` and also that the following line of code is used to extract parameters from the POST.
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: formAsyncDepositPOST
The code snippet used to map the call to parameters looks like this:
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
:include: form
Apart from the mapping of parameters the call to the actor looks is done the same as in :ref:`Advanced-GET`.
The Complete Code Sample
------------------------
Below is the complete application in all its beauty.
Global.scala (<yourApp>/src/main/scala/Global.scala):
.. includecode:: code/Global.scala
PlayMiniApplication.scala (<yourApp>/src/main/scala/akka/docs/http/PlayMiniApplication.scala):
.. includecode:: code/akka/docs/http/PlayMiniApplication.scala
Build.scala (<yourApp>/project/Build.scala):
.. code-block:: scala
import sbt._
import Keys._
object PlayMiniApplicationBuild extends Build {
lazy val root = Project(id = "play-mini-application", base = file("."), settings = Project.defaultSettings).settings(
libraryDependencies += "com.typesafe" %% "play-mini" % "2.0-RC1-SNAPSHOT",
mainClass in (Compile, run) := Some("play.core.server.NettyServer"))
}
Running the Application
-----------------------
Firstly, start up the application by opening a command terminal and type::
> sbt
> run
Now you should see something similar to this in your terminal window::
[info] Running play.core.server.NettyServer
Play server process ID is 2523
[info] play - Application started (Prod)
[info] play - Listening for HTTP on port 9000...
In this example we will use the awesome `cURL <http://en.wikipedia.org/wiki/CURL>`_ command to interact with the application.
Fire up a command terminal and try the application out::
First we check the status of a couple of accounts:
> curl http://localhost:9000/account/statement/TheDudesAccount
Unknown account: TheDudesAccount
> curl http://localhost:9000/account/statement/MrLebowskisAccount
Unknown account: MrLebowskisAccount
Now deposit some money to the accounts:
> curl -d "accountId=TheDudesAccount&amount=1000" http://localhost:9000/account/deposit
Updated account total: 1000
> curl -d "accountId=MrLebowskisAccount&amount=500" http://localhost:9000/account/deposit
Updated account total: 500
Next thing is to check the status of the account:
> curl http://localhost:9000/account/statement/TheDudesAccount
Account total: 1000
> curl http://localhost:9000/account/statement/MrLebowskisAccount
Account total: 500
Fair enough, let's try to withdraw some cash shall we:
> curl -d "accountId=TheDudesAccount&amount=999" http://localhost:9000/account/withdraw
Updated account total: 1
> curl -d "accountId=MrLebowskisAccount&amount=999" http://localhost:9000/account/withdraw
Unknown account or insufficient funds. Get your act together.
> curl -d "accountId=MrLebowskisAccount&amount=500" http://localhost:9000/account/withdraw
Updated account total: 0
Yeah, it works!
Now we leave it to the astute reader of this document to take advantage of the power of play-mini and Akka.

View file

@ -11,6 +11,7 @@ import akka.actor.Props
import akka.actor.Status.Failure
import akka.dispatch.Future
import akka.dispatch.Await
import akka.util.Timeout
import akka.util.duration._
import akka.dispatch.Promise
import java.lang.IllegalStateException
@ -46,8 +47,10 @@ class FutureDocSpec extends AkkaSpec {
//#ask-blocking
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
val future = actor ? msg // enabled by the ask import
val result = Await.result(future, timeout.duration).asInstanceOf[String]
//#ask-blocking
@ -57,7 +60,7 @@ class FutureDocSpec extends AkkaSpec {
"demonstrate usage of mapTo" in {
val actor = system.actorOf(Props[MyActor])
val msg = "hello"
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
//#map-to
import akka.dispatch.Future
import akka.pattern.ask
@ -164,7 +167,7 @@ class FutureDocSpec extends AkkaSpec {
val actor3 = system.actorOf(Props[MyActor])
val msg1 = 1
val msg2 = 2
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
import akka.dispatch.Await
import akka.pattern.ask
//#composing-wrong
@ -188,7 +191,7 @@ class FutureDocSpec extends AkkaSpec {
val actor3 = system.actorOf(Props[MyActor])
val msg1 = 1
val msg2 = 2
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
import akka.dispatch.Await
import akka.pattern.ask
//#composing
@ -208,7 +211,7 @@ class FutureDocSpec extends AkkaSpec {
}
"demonstrate usage of sequence with actors" in {
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
val oddActor = system.actorOf(Props[OddActor])
//#sequence-ask
// oddActor returns odd numbers sequentially from 1 as a List[Future[Int]]
@ -256,7 +259,7 @@ class FutureDocSpec extends AkkaSpec {
}
"demonstrate usage of recover" in {
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
val actor = system.actorOf(Props[MyActor])
val msg1 = -1
//#recover
@ -268,7 +271,7 @@ class FutureDocSpec extends AkkaSpec {
}
"demonstrate usage of recoverWith" in {
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
val actor = system.actorOf(Props[MyActor])
val msg1 = -1
//#try-recover

View file

@ -28,7 +28,7 @@ class RemoteDeploymentDocSpec extends AkkaSpec("""
import RemoteDeploymentDocSpec._
val other = ActorSystem("remote", system.settings.config)
val address = other.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address("akka", "s", Some("host"), Some(1))).get
val address = other.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address("akka", "s", "host", 1)).get
override def atTermination() { other.shutdown() }

View file

@ -7,6 +7,7 @@ import akka.routing.{ ScatterGatherFirstCompletedRouter, BroadcastRouter, Random
import annotation.tailrec
import akka.actor.{ Props, Actor }
import akka.util.duration._
import akka.util.Timeout
import akka.dispatch.Await
import akka.pattern.ask
import akka.routing.SmallestMailboxRouter
@ -80,7 +81,7 @@ class ParentActor extends Actor {
val scatterGatherFirstCompletedRouter = context.actorOf(
Props[FibonacciActor].withRouter(ScatterGatherFirstCompletedRouter(
nrOfInstances = 5, within = 2 seconds)), "router")
implicit val timeout = context.system.settings.ActorTimeout
implicit val timeout = Timeout(5 seconds)
val futureResult = scatterGatherFirstCompletedRouter ? FibonacciNumber(10)
val result = Await.result(futureResult, timeout.duration)
//#scatterGatherFirstCompletedRouter

View file

@ -61,7 +61,7 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor
case sa: InetSocketAddress sa
case x throw new RemoteTransportException("unknown local address type " + x.getClass, null)
}
_address.compareAndSet(null, Address("akka", remoteSettings.systemName, Some(settings.Hostname), Some(addr.getPort)))
_address.compareAndSet(null, Address("akka", remoteSettings.systemName, settings.Hostname, addr.getPort))
}
def address = _address.get

View file

@ -183,7 +183,7 @@ class RemoteServerHandler(
instruction.getCommandType match {
case CommandType.CONNECT if settings.UsePassiveConnections
val origin = instruction.getOrigin
val inbound = Address("akka", origin.getSystem, Some(origin.getHostname), Some(origin.getPort))
val inbound = Address("akka", origin.getSystem, origin.getHostname, origin.getPort)
val client = new PassiveRemoteClient(event.getChannel, netty, inbound)
netty.bindClient(inbound, client)
case CommandType.SHUTDOWN //Will be unbound in channelClosed
@ -203,7 +203,7 @@ class RemoteServerHandler(
private def getClientAddress(c: Channel): Option[Address] =
c.getRemoteAddress match {
case inet: InetSocketAddress Some(Address("akka", "unknown(yet)", Some(inet.getAddress.toString), Some(inet.getPort)))
case inet: InetSocketAddress Some(Address("akka", "unknown(yet)", inet.getAddress.toString, inet.getPort))
case _ None
}
}

View file

@ -44,7 +44,7 @@ akka {
/looker/child/grandchild.remote = "akka://RemoteCommunicationSpec@localhost:12345"
}
}
""") with ImplicitSender {
""") with ImplicitSender with DefaultTimeout {
import RemoteCommunicationSpec._
@ -59,8 +59,6 @@ akka {
val here = system.actorFor("akka://remote_sys@localhost:12346/user/echo")
implicit val timeout = system.settings.ActorTimeout
override def atTermination() {
other.shutdown()
}

View file

@ -42,7 +42,7 @@ class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) {
service,
deployment.get.config,
RoundRobinRouter(3),
RemoteScope(Address("akka", "sys", Some("wallace"), Some(2552))))))
RemoteScope(Address("akka", "sys", "wallace", 2552)))))
}
}

View file

@ -160,7 +160,7 @@ akka.actor.deployment {
children must have size 2
val parents = children.map(_.parent)
parents must have size 1
parents.head.address must be(Address("akka", "remote_sys", Some("localhost"), Some(12347)))
parents.head.address must be(Address("akka", "remote_sys", "localhost", 12347))
children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347")
system.stop(router)
}

View file

@ -18,6 +18,9 @@ akka {
# duration to wait in expectMsg and friends outside of within() block by default
single-expect-default = 3s
# The timeout that is added as an implicit by DefaultTimeout trait
default-timeout = 5s
calling-thread-dispatcher {
type = akka.testkit.CallingThreadDispatcherConfigurator
}

View file

@ -68,7 +68,7 @@ class TestActorRef[T <: Actor](
if (isTerminated) throw new IllegalActorStateException("underlying actor is terminated")
underlying.actor.asInstanceOf[T] match {
case null
val t = underlying.system.settings.ActorTimeout
val t = TestKitExtension(_system).DefaultTimeout
Await.result(this.?(InternalGetActor)(t), t.duration).asInstanceOf[T]
case ref ref
}

View file

@ -11,6 +11,7 @@ import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque, TimeUnit, atom
import atomic.AtomicInteger
import scala.annotation.tailrec
import akka.actor.ActorSystem
import akka.util.Timeout
object TestActor {
type Ignore = Option[PartialFunction[AnyRef, Boolean]]
@ -644,5 +645,5 @@ trait ImplicitSender { this: TestKit ⇒
}
trait DefaultTimeout { this: TestKit
implicit val timeout = system.settings.ActorTimeout
implicit val timeout: Timeout = testKitSettings.DefaultTimeout
}

View file

@ -5,6 +5,7 @@ package akka.testkit
import com.typesafe.config.Config
import akka.util.Duration
import akka.util.Timeout
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.{ ExtensionId, ActorSystem, Extension, ExtendedActorSystem }
@ -20,4 +21,5 @@ class TestKitSettings(val config: Config) extends Extension {
val TestTimeFactor = getDouble("akka.test.timefactor")
val SingleExpectDefaultTimeout = Duration(getMilliseconds("akka.test.single-expect-default"), MILLISECONDS)
val TestEventFilterLeeway = Duration(getMilliseconds("akka.test.filter-leeway"), MILLISECONDS)
val DefaultTimeout = Timeout(Duration(getMilliseconds("akka.test.default-timeout"), MILLISECONDS))
}

View file

@ -122,7 +122,7 @@ class AkkaSpecSpec extends WordSpec with MustMatchers {
try {
var locker = Seq.empty[DeadLetter]
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = TestKitExtension(system).DefaultTimeout
implicit val davyJones = otherSystem.actorOf(Props(new Actor {
def receive = {
case m: DeadLetter locker :+= m

View file

@ -0,0 +1,13 @@
#########################################
# Akka Transactor Reference Config File #
#########################################
# This the reference config file has all the default settings.
# Make your edits/overrides in your application.conf.
akka {
transactor {
# The timeout used for coordinated transactions across actors
coordinated-timeout = 5s
}
}

View file

@ -93,6 +93,8 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None)
* @see [[akka.transactor.Coordinated]] for more information about the underlying mechanism
*/
trait Transactor extends Actor {
private val settings = TransactorExtension(context.system)
/**
* Implement a general pattern for using coordinated transactions.
*/
@ -108,7 +110,7 @@ trait Transactor extends Actor {
}
case message {
if (normally.isDefinedAt(message)) normally(message)
else receive(Coordinated(message)(context.system.settings.ActorTimeout))
else receive(Coordinated(message)(settings.CoordinatedTimeout))
}
}

View file

@ -0,0 +1,25 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import akka.actor.{ ActorSystem, ExtensionId, ExtensionIdProvider, ExtendedActorSystem }
import akka.actor.Extension
import com.typesafe.config.Config
import akka.util.Timeout
import akka.util.Duration
import java.util.concurrent.TimeUnit.MILLISECONDS
/**
* TransactorExtension is an Akka Extension to hold settings for transactors.
*/
object TransactorExtension extends ExtensionId[TransactorSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): TransactorSettings = super.get(system)
override def lookup = TransactorExtension
override def createExtension(system: ExtendedActorSystem): TransactorSettings = new TransactorSettings(system.settings.config)
}
class TransactorSettings(val config: Config) extends Extension {
import config._
val CoordinatedTimeout = Timeout(Duration(getMilliseconds("akka.transactor.coordinated-timeout"), MILLISECONDS))
}

View file

@ -12,6 +12,8 @@ import java.util.{ Set ⇒ JSet }
* An UntypedActor version of transactor for using from Java.
*/
abstract class UntypedTransactor extends UntypedActor {
private val settings = TransactorExtension(context.system)
/**
* Implement a general pattern for using coordinated transactions.
*/
@ -29,7 +31,7 @@ abstract class UntypedTransactor extends UntypedActor {
}
case message {
val normal = normally(message)
if (!normal) onReceive(Coordinated(message)(context.system.settings.ActorTimeout))
if (!normal) onReceive(Coordinated(message)(settings.CoordinatedTimeout))
}
}
}

View file

@ -1,6 +1,6 @@
##############################
# Akka Reference Config File #
##############################
#####################################
# Akka ZeroMQ Reference Config File #
#####################################
# This the reference config file has all the default settings.
# Make your edits/overrides in your application.conf.
@ -12,6 +12,9 @@ akka {
# The default timeout for a poll on the actual zeromq socket.
poll-timeout = 100ms
# Timeout for creating a new socket
new-socket-timeout = 5s
socket-dispatcher {
# A zeromq socket needs to be pinned to the thread that created it.
# Changing this value results in weird errors and race conditions within zeromq

View file

@ -187,12 +187,9 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
fromConfig getOrElse context.system.dispatcher
}
private val defaultPollTimeout =
Duration(context.system.settings.config.getMilliseconds("akka.zeromq.poll-timeout"), TimeUnit.MILLISECONDS)
private val pollTimeout = {
val fromConfig = params collectFirst { case PollTimeoutDuration(duration) duration }
fromConfig getOrElse defaultPollTimeout
fromConfig getOrElse ZeroMQExtension(context.system).DefaultPollTimeout
}
private def newEventLoop: Option[Promise[PollLifeCycle]] = {

View file

@ -7,6 +7,9 @@ import org.zeromq.{ ZMQ ⇒ JZMQ }
import akka.actor._
import akka.dispatch.{ Await }
import akka.pattern.ask
import akka.util.Duration
import java.util.concurrent.TimeUnit
import akka.util.Timeout
/**
* A Model to represent a version of the zeromq library
@ -43,6 +46,9 @@ object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProv
*/
class ZeroMQExtension(system: ActorSystem) extends Extension {
val DefaultPollTimeout = Duration(system.settings.config.getMilliseconds("akka.zeromq.poll-timeout"), TimeUnit.MILLISECONDS)
val NewSocketTimeout = Timeout(Duration(system.settings.config.getMilliseconds("akka.zeromq.new-socket-timeout"), TimeUnit.MILLISECONDS))
/**
* The version of the ZeroMQ library
* @return a [[akka.zeromq.ZeroMQVersion]]
@ -136,7 +142,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension {
* @return the [[akka.actor.ActorRef]]
*/
def newSocket(socketParameters: SocketOption*): ActorRef = {
implicit val timeout = system.settings.ActorTimeout
implicit val timeout = NewSocketTimeout
val req = (zeromqGuardian ? newSocketProps(socketParameters: _*)).mapTo[ActorRef]
Await.result(req, timeout.duration)
}

View file

@ -348,6 +348,7 @@ object AkkaBuild extends Build {
lazy val defaultSettings = baseSettings ++ formatSettings ++ Seq(
resolvers += "Typesafe Repo" at "http://repo.typesafe.com/typesafe/releases/",
resolvers += "Twitter Public Repo" at "http://maven.twttr.com", // This will be going away with com.mongodb.async's next release
resolvers += "Typesafe Snapshot Repo" at "http://repo.typesafe.com/typesafe/snapshots/", // Used while play-mini is still on RC
// compile options
scalacOptions ++= Seq("-encoding", "UTF-8", "-deprecation", "-unchecked") ++ (
@ -473,7 +474,7 @@ object Dependencies {
val tutorials = Seq(Test.scalatest, Test.junit)
val docs = Seq(Test.scalatest, Test.junit)
val docs = Seq(Test.scalatest, Test.junit, playMini)
val zeroMQ = Seq(Test.scalatest, Test.junit, protobuf, Dependency.zeroMQ)
}
@ -497,6 +498,7 @@ object Dependency {
val Slf4j = "1.6.4"
val Spring = "3.0.5.RELEASE"
val Zookeeper = "3.4.0"
val PlayMini = "2.0-RC1-SNAPSHOT"
}
// Compile
@ -533,6 +535,7 @@ object Dependency {
val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % V.Zookeeper // ApacheV2
val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % V.Zookeeper // ApacheV2
val zeroMQ = "org.zeromq" %% "zeromq-scala-binding" % "0.0.3" // ApacheV2
val playMini = "com.typesafe" % "play-mini_2.9.1" % V.PlayMini
// Provided