diff --git a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java index e908335666..5bd58e4108 100644 --- a/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor-tests/src/test/java/akka/dispatch/JavaFutureTests.java @@ -5,6 +5,9 @@ import static org.junit.Assert.*; import java.util.concurrent.Callable; import java.util.LinkedList; import java.lang.Iterable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import akka.japi.Function; import akka.japi.Function2; import akka.japi.Procedure; @@ -30,6 +33,119 @@ public class JavaFutureTests { assertEquals("Hello World", f2.get()); } + @Test public void mustBeAbleToExecuteAnOnResultCallback() throws Throwable { + final CountDownLatch latch = new CountDownLatch(1); + Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher()); + Future f = cf; + f.onResult(new Procedure() { + public void apply(String result) { + if(result.equals("foo")) + latch.countDown(); + } + }); + + cf.completeWithResult("foo"); + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + assertEquals(f.get(), "foo"); + } + + @Test public void mustBeAbleToExecuteAnOnExceptionCallback() throws Throwable { + final CountDownLatch latch = new CountDownLatch(1); + Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher()); + Future f = cf; + f.onException(new Procedure() { + public void apply(Throwable t) { + if(t instanceof NullPointerException) + latch.countDown(); + } + }); + + Throwable exception = new NullPointerException(); + cf.completeWithException(exception); + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + assertEquals(f.exception().get(), exception); + } + + @Test public void mustBeAbleToExecuteAnOnTimeoutCallback() throws Throwable { + final CountDownLatch latch = new CountDownLatch(1); + Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher()); + Future f = cf; + f.onTimeout(new Procedure>() { + public void apply(Future future) { + latch.countDown(); + } + }); + + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + assertTrue(f.value().isEmpty()); + } + + @Test public void mustBeAbleToExecuteAnOnCompleteCallback() throws Throwable { + final CountDownLatch latch = new CountDownLatch(1); + Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher()); + Future f = cf; + f.onComplete(new Procedure>() { + public void apply(akka.dispatch.Future future) { + latch.countDown(); + } + }); + + cf.completeWithResult("foo"); + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + assertEquals(f.get(), "foo"); + } + + @Test public void mustBeAbleToForeachAFuture() throws Throwable { + final CountDownLatch latch = new CountDownLatch(1); + Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher()); + Future f = cf; + f.foreach(new Procedure() { + public void apply(String future) { + latch.countDown(); + } + }); + + cf.completeWithResult("foo"); + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + assertEquals(f.get(), "foo"); + } + + @Test public void mustBeAbleToFlatMapAFuture() throws Throwable { + final CountDownLatch latch = new CountDownLatch(1); + Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher()); + cf.completeWithResult("1000"); + Future f = cf; + Future r = f.flatMap(new Function>() { + public Future apply(String r) { + latch.countDown(); + Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher()); + cf.completeWithResult(Integer.parseInt(r)); + return cf; + } + }); + + assertEquals(f.get(), "1000"); + assertEquals(r.get().intValue(), 1000); + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + } + + @Test public void mustBeAbleToFilterAFuture() throws Throwable { + final CountDownLatch latch = new CountDownLatch(1); + Promise cf = new akka.dispatch.DefaultPromise(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher()); + Future f = cf; + Future r = f.filter(new Function() { + public Boolean apply(String r) { + latch.countDown(); + return r.equals("foo"); + } + }); + + cf.completeWithResult("foo"); + assertTrue(latch.await(5000, TimeUnit.MILLISECONDS)); + assertEquals(f.get(), "foo"); + assertEquals(r.get(), "foo"); + } + // TODO: Improve this test, perhaps with an Actor @Test public void mustSequenceAFutureList() { LinkedList> listFutures = new LinkedList>(); diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/ClusterSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/ClusterSpec.scala index 2bec70e6f5..36a0ff76e3 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/ClusterSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/ClusterSpec.scala @@ -38,7 +38,7 @@ class ClusterSpec extends WordSpec with MustMatchers { //akka.cluster.server getInt("akka.cluster.server.port") must equal(Some(2552)) getInt("akka.cluster.server.message-frame-size") must equal(Some(1048576)) - getInt("akka.cluster.server.connection-timeout") must equal(Some(1)) + getInt("akka.cluster.server.connection-timeout") must equal(Some(100)) getBool("akka.cluster.server.require-cookie") must equal(Some(false)) getBool("akka.cluster.server.untrusted-mode") must equal(Some(false)) getInt("akka.cluster.server.backlog") must equal(Some(4096)) diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 96c8f5e252..4a53292512 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -614,17 +614,17 @@ sealed trait Future[+T] extends japi.Future[T] { } package japi { - /* Future Java API */ + /* Java API */ trait Future[+T] { self: akka.dispatch.Future[T] ⇒ - private[japi] final def onComplete[A >: T](proc: Procedure[Future[A]]): this.type = self.onComplete(proc(_)) - + private[japi] final def onTimeout[A >: T](proc: Procedure[akka.dispatch.Future[A]]): this.type = self.onTimeout(proc(_)) + private[japi] final def onResult[A >: T](proc: Procedure[A]): this.type = self.onResult({ case r: A ⇒ proc(r) }: PartialFunction[T, Unit]) + private[japi] final def onException(proc: Procedure[Throwable]): this.type = self.onException({ case t: Throwable ⇒ proc(t) }: PartialFunction[Throwable, Unit]) + private[japi] final def onComplete[A >: T](proc: Procedure[akka.dispatch.Future[A]]): this.type = self.onComplete(proc(_)) private[japi] final def map[A >: T, B](f: JFunc[A, B]): akka.dispatch.Future[B] = self.map(f(_)) - private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]]): akka.dispatch.Future[B] = self.flatMap(f(_)) - private[japi] final def foreach[A >: T](proc: Procedure[A]): Unit = self.foreach(proc(_)) - - private[japi] final def filter(p: JFunc[Any, Boolean]): akka.dispatch.Future[Any] = self.filter(p(_)) + private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] = + self.filter((a: Any) ⇒ p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]] } } diff --git a/akka-cluster/src/main/scala/akka/remote/RemoteConfig.scala b/akka-cluster/src/main/scala/akka/remote/RemoteConfig.scala index 23896fe0e7..367e244b2b 100644 --- a/akka-cluster/src/main/scala/akka/remote/RemoteConfig.scala +++ b/akka-cluster/src/main/scala/akka/remote/RemoteConfig.scala @@ -33,7 +33,7 @@ object RemoteServerSettings { val UNTRUSTED_MODE = config.getBool("akka.cluster.server.untrusted-mode", false) val PORT = config.getInt("akka.cluster.server.port", 2552) - val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.cluster.server.connection-timeout", 1), TIME_UNIT) + val CONNECTION_TIMEOUT = Duration(config.getInt("akka.cluster.server.connection-timeout", 100), TIME_UNIT) val COMPRESSION_SCHEME = config.getString("akka.cluster.compression-scheme", "") val ZLIB_COMPRESSION_LEVEL = { val level = config.getInt("akka.cluster.zlib-compression-level", 6) diff --git a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index b5ebf69062..de2e758985 100644 --- a/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-cluster/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -649,7 +649,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String, bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.keepAlive", true) bootstrap.setOption("child.reuseAddress", true) - bootstrap.setOption("child.connectTimeoutMillis", RemoteServerSettings.CONNECTION_TIMEOUT_MILLIS.toMillis) + bootstrap.setOption("child.connectTimeoutMillis", RemoteServerSettings.CONNECTION_TIMEOUT.toMillis) openChannels.add(bootstrap.bind(address)) serverModule.notifyListeners(RemoteServerStarted(serverModule)) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala index 7958bbd38e..e8ed5f2992 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/reflogic/ClusterActorRefCleanupMultiJvmSpec.scala @@ -78,7 +78,7 @@ class ClusterActorRefCleanupMultiJvmNode1 extends MasterClusterTestNode { case e: RoutingException ⇒ } - //since the call to the node failed, the node must have been removed from the list. + //since the call to the node failed, the node must have been removed from the list. clusteredRef.connectionsSize must be(1) //send a message to this node, diff --git a/config/akka-reference.conf b/config/akka-reference.conf index a33deaf4ec..5adfb2112a 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -211,7 +211,7 @@ akka { server { port = 2552 # The default remote server port clients should connect to. Default is 2552 (AKKA) message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads - connection-timeout = 1 + connection-timeout = 100 # Length in time-unit require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)? untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect. backlog = 4096 # Sets the size of the connection backlog