Changing default connection timeout to 100 seconds and adding Future Java API with tests

This commit is contained in:
Viktor Klang 2011-08-12 17:23:34 +02:00
parent 5b2b463ce0
commit 37a6844d46
7 changed files with 128 additions and 12 deletions

View file

@ -5,6 +5,9 @@ import static org.junit.Assert.*;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.LinkedList; import java.util.LinkedList;
import java.lang.Iterable; import java.lang.Iterable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import akka.japi.Function; import akka.japi.Function;
import akka.japi.Function2; import akka.japi.Function2;
import akka.japi.Procedure; import akka.japi.Procedure;
@ -30,6 +33,119 @@ public class JavaFutureTests {
assertEquals("Hello World", f2.get()); assertEquals("Hello World", f2.get());
} }
@Test public void mustBeAbleToExecuteAnOnResultCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher());
Future<String> f = cf;
f.onResult(new Procedure<String>() {
public void apply(String result) {
if(result.equals("foo"))
latch.countDown();
}
});
cf.completeWithResult("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.get(), "foo");
}
@Test public void mustBeAbleToExecuteAnOnExceptionCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher());
Future<String> f = cf;
f.onException(new Procedure<Throwable>() {
public void apply(Throwable t) {
if(t instanceof NullPointerException)
latch.countDown();
}
});
Throwable exception = new NullPointerException();
cf.completeWithException(exception);
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.exception().get(), exception);
}
@Test public void mustBeAbleToExecuteAnOnTimeoutCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher());
Future<String> f = cf;
f.onTimeout(new Procedure<Future<String>>() {
public void apply(Future<String> future) {
latch.countDown();
}
});
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertTrue(f.value().isEmpty());
}
@Test public void mustBeAbleToExecuteAnOnCompleteCallback() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher());
Future<String> f = cf;
f.onComplete(new Procedure<Future<String>>() {
public void apply(akka.dispatch.Future<String> future) {
latch.countDown();
}
});
cf.completeWithResult("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.get(), "foo");
}
@Test public void mustBeAbleToForeachAFuture() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher());
Future<String> f = cf;
f.foreach(new Procedure<String>() {
public void apply(String future) {
latch.countDown();
}
});
cf.completeWithResult("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.get(), "foo");
}
@Test public void mustBeAbleToFlatMapAFuture() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher());
cf.completeWithResult("1000");
Future<String> f = cf;
Future<Integer> r = f.flatMap(new Function<String, Future<Integer>>() {
public Future<Integer> apply(String r) {
latch.countDown();
Promise<Integer> cf = new akka.dispatch.DefaultPromise<Integer>(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher());
cf.completeWithResult(Integer.parseInt(r));
return cf;
}
});
assertEquals(f.get(), "1000");
assertEquals(r.get().intValue(), 1000);
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
}
@Test public void mustBeAbleToFilterAFuture() throws Throwable {
final CountDownLatch latch = new CountDownLatch(1);
Promise<String> cf = new akka.dispatch.DefaultPromise<String>(1000, TimeUnit.MILLISECONDS, Dispatchers.defaultGlobalDispatcher());
Future<String> f = cf;
Future<String> r = f.filter(new Function<String, Boolean>() {
public Boolean apply(String r) {
latch.countDown();
return r.equals("foo");
}
});
cf.completeWithResult("foo");
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
assertEquals(f.get(), "foo");
assertEquals(r.get(), "foo");
}
// TODO: Improve this test, perhaps with an Actor // TODO: Improve this test, perhaps with an Actor
@Test public void mustSequenceAFutureList() { @Test public void mustSequenceAFutureList() {
LinkedList<Future<String>> listFutures = new LinkedList<Future<String>>(); LinkedList<Future<String>> listFutures = new LinkedList<Future<String>>();

View file

@ -38,7 +38,7 @@ class ClusterSpec extends WordSpec with MustMatchers {
//akka.cluster.server //akka.cluster.server
getInt("akka.cluster.server.port") must equal(Some(2552)) getInt("akka.cluster.server.port") must equal(Some(2552))
getInt("akka.cluster.server.message-frame-size") must equal(Some(1048576)) getInt("akka.cluster.server.message-frame-size") must equal(Some(1048576))
getInt("akka.cluster.server.connection-timeout") must equal(Some(1)) getInt("akka.cluster.server.connection-timeout") must equal(Some(100))
getBool("akka.cluster.server.require-cookie") must equal(Some(false)) getBool("akka.cluster.server.require-cookie") must equal(Some(false))
getBool("akka.cluster.server.untrusted-mode") must equal(Some(false)) getBool("akka.cluster.server.untrusted-mode") must equal(Some(false))
getInt("akka.cluster.server.backlog") must equal(Some(4096)) getInt("akka.cluster.server.backlog") must equal(Some(4096))

View file

@ -614,17 +614,17 @@ sealed trait Future[+T] extends japi.Future[T] {
} }
package japi { package japi {
/* Future Java API */ /* Java API */
trait Future[+T] { self: akka.dispatch.Future[T] trait Future[+T] { self: akka.dispatch.Future[T]
private[japi] final def onComplete[A >: T](proc: Procedure[Future[A]]): this.type = self.onComplete(proc(_)) private[japi] final def onTimeout[A >: T](proc: Procedure[akka.dispatch.Future[A]]): this.type = self.onTimeout(proc(_))
private[japi] final def onResult[A >: T](proc: Procedure[A]): this.type = self.onResult({ case r: A proc(r) }: PartialFunction[T, Unit])
private[japi] final def onException(proc: Procedure[Throwable]): this.type = self.onException({ case t: Throwable proc(t) }: PartialFunction[Throwable, Unit])
private[japi] final def onComplete[A >: T](proc: Procedure[akka.dispatch.Future[A]]): this.type = self.onComplete(proc(_))
private[japi] final def map[A >: T, B](f: JFunc[A, B]): akka.dispatch.Future[B] = self.map(f(_)) private[japi] final def map[A >: T, B](f: JFunc[A, B]): akka.dispatch.Future[B] = self.map(f(_))
private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]]): akka.dispatch.Future[B] = self.flatMap(f(_)) private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]]): akka.dispatch.Future[B] = self.flatMap(f(_))
private[japi] final def foreach[A >: T](proc: Procedure[A]): Unit = self.foreach(proc(_)) private[japi] final def foreach[A >: T](proc: Procedure[A]): Unit = self.foreach(proc(_))
private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] =
private[japi] final def filter(p: JFunc[Any, Boolean]): akka.dispatch.Future[Any] = self.filter(p(_)) self.filter((a: Any) p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]]
} }
} }

View file

@ -33,7 +33,7 @@ object RemoteServerSettings {
val UNTRUSTED_MODE = config.getBool("akka.cluster.server.untrusted-mode", false) val UNTRUSTED_MODE = config.getBool("akka.cluster.server.untrusted-mode", false)
val PORT = config.getInt("akka.cluster.server.port", 2552) val PORT = config.getInt("akka.cluster.server.port", 2552)
val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.cluster.server.connection-timeout", 1), TIME_UNIT) val CONNECTION_TIMEOUT = Duration(config.getInt("akka.cluster.server.connection-timeout", 100), TIME_UNIT)
val COMPRESSION_SCHEME = config.getString("akka.cluster.compression-scheme", "") val COMPRESSION_SCHEME = config.getString("akka.cluster.compression-scheme", "")
val ZLIB_COMPRESSION_LEVEL = { val ZLIB_COMPRESSION_LEVEL = {
val level = config.getInt("akka.cluster.zlib-compression-level", 6) val level = config.getInt("akka.cluster.zlib-compression-level", 6)

View file

@ -649,7 +649,7 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true) bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true) bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServerSettings.CONNECTION_TIMEOUT_MILLIS.toMillis) bootstrap.setOption("child.connectTimeoutMillis", RemoteServerSettings.CONNECTION_TIMEOUT.toMillis)
openChannels.add(bootstrap.bind(address)) openChannels.add(bootstrap.bind(address))
serverModule.notifyListeners(RemoteServerStarted(serverModule)) serverModule.notifyListeners(RemoteServerStarted(serverModule))

View file

@ -78,7 +78,7 @@ class ClusterActorRefCleanupMultiJvmNode1 extends MasterClusterTestNode {
case e: RoutingException case e: RoutingException
} }
//since the call to the node failed, the node must have been removed from the list. //since the call to the node failed, the node must have been removed from the list.
clusteredRef.connectionsSize must be(1) clusteredRef.connectionsSize must be(1)
//send a message to this node, //send a message to this node,

View file

@ -211,7 +211,7 @@ akka {
server { server {
port = 2552 # The default remote server port clients should connect to. Default is 2552 (AKKA) port = 2552 # The default remote server port clients should connect to. Default is 2552 (AKKA)
message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads
connection-timeout = 1 connection-timeout = 100 # Length in time-unit
require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)? require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)?
untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect. untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect.
backlog = 4096 # Sets the size of the connection backlog backlog = 4096 # Sets the size of the connection backlog