This commit is contained in:
Nikolay Botev 2011-12-19 15:54:26 -08:00
parent 0558e11aff
commit 877075cdac
6 changed files with 46 additions and 21 deletions

View file

@ -287,9 +287,13 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
protected def systemImpl = this
@inline private def askAndAwait(actorRef: ActorRef, message: Any)(implicit timeout: akka.util.Timeout): Any = {
Await.result(Futures.ask(actorRef, message), timeout.duration)
}
private[akka] def systemActorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout
Await.result(Futures.ask(systemGuardian, CreateChild(props, name)), timeout.duration) match {
askAndAwait(systemGuardian, CreateChild(props, name)) match {
case ref: ActorRef ref
case ex: Exception throw ex
}
@ -297,7 +301,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
def actorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout
Await.result(Futures.ask(guardian, CreateChild(props, name)), timeout.duration) match {
askAndAwait(guardian, CreateChild(props, name)) match {
case ref: ActorRef ref
case ex: Exception throw ex
}
@ -305,7 +309,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
def actorOf(props: Props): ActorRef = {
implicit val timeout = settings.CreationTimeout
Await.result(Futures.ask(guardian, CreateRandomNameChild(props)), timeout.duration) match {
askAndAwait(guardian, CreateRandomNameChild(props)) match {
case ref: ActorRef ref
case ex: Exception throw ex
}
@ -317,8 +321,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
val guard = guardian.path
val sys = systemGuardian.path
path.parent match {
case `guard` Await.result(Futures.ask(guardian, StopChild(actor)), timeout.duration)
case `sys` Await.result(Futures.ask(systemGuardian, StopChild(actor)), timeout.duration)
case `guard` askAndAwait(guardian, StopChild(actor))
case `sys` askAndAwait(systemGuardian, StopChild(actor))
case _ actor.asInstanceOf[InternalActorRef].stop()
}
}

View file

@ -41,4 +41,26 @@ package object actor {
}
}
// Implicit for converting a Promise to an actor.
// Symmetric to the future2actor conversion, which allows
// piping a Future result (read side) to an Actor's mailbox, this
// conversion allows using an Actor to complete a Promise (write side)
//
// Future.ask / actor ? message is now a trivial implementation that can
// also be done in user code (assuming actorRef, timeout and dispatcher implicits):
//
// Future.ask(actor, message) = {
// val promise = Promise[Any]()
// actor ! (message, promise)
// promise
// }
@inline implicit def promise2actor(promise: akka.dispatch.Promise[Any])(implicit actorRef: ActorRef, timeout: akka.util.Timeout) = {
val provider = actorRef.asInstanceOf[InternalActorRef].provider
provider.ask(promise, timeout) match {
case Some(ref) ref
case None null
}
}
}

View file

@ -55,14 +55,10 @@ object Await {
object Futures {
def ask(actor: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = {
val provider = actor.asInstanceOf[InternalActorRef].provider
val promise = Promise[Any]()(provider.dispatcher)
provider.ask(promise, timeout) match {
case Some(a)
actor.!(message)(a)
case None
actor.!(message)(null)
}
implicit val dispatcher = actor.asInstanceOf[InternalActorRef].provider.dispatcher
implicit val actorRefContext = actor // for promise2actor implicit conversion
val promise = Promise[Any]()
actor.!(message)(promise)
promise
}

View file

@ -10,6 +10,7 @@ import org.junit.Test;
//#imports
import akka.actor.*;
import akka.dispatch.Await;
import static akka.dispatch.Futures.ask;
import akka.transactor.Coordinated;
import akka.util.Duration;
import akka.util.Timeout;
@ -30,7 +31,7 @@ public class TransactorDocTest {
counter1.tell(new Coordinated(new Increment(counter2), timeout));
Integer count = (Integer) Await.result(counter1.ask("GetCount", timeout), timeout.duration());
Integer count = (Integer) Await.result(ask(counter1, "GetCount", timeout), timeout.duration());
//#coordinated-example
assertEquals(count, new Integer(1));
@ -71,7 +72,7 @@ public class TransactorDocTest {
counter.tell(coordinated.coordinate(new Increment()));
coordinated.await();
Integer count = (Integer) Await.result(counter.ask("GetCount", timeout), timeout.duration());
Integer count = (Integer) Await.result(ask(counter, "GetCount", timeout), timeout.duration());
assertEquals(count, new Integer(1));
system.shutdown();
@ -88,10 +89,10 @@ public class TransactorDocTest {
friendlyCounter.tell(coordinated.coordinate(new Increment(friend)));
coordinated.await();
Integer count1 = (Integer) Await.result(friendlyCounter.ask("GetCount", timeout), timeout.duration());
Integer count1 = (Integer) Await.result(ask(friendlyCounter, "GetCount", timeout), timeout.duration());
assertEquals(count1, new Integer(1));
Integer count2 = (Integer) Await.result(friend.ask("GetCount", timeout), timeout.duration());
Integer count2 = (Integer) Await.result(ask(friend, "GetCount", timeout), timeout.duration());
assertEquals(count2, new Integer(1));
system.shutdown();

View file

@ -18,6 +18,7 @@ import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.dispatch.Await;
import akka.dispatch.Future;
import static akka.dispatch.Futures.ask;
import akka.testkit.AkkaSpec;
import akka.testkit.EventFilter;
import akka.testkit.ErrorFilter;
@ -80,7 +81,7 @@ public class UntypedCoordinatedIncrementTest {
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future<Object> future = counter.ask("GetCount", timeout);
Future<Object> future = ask(counter, "GetCount", timeout);
int count = (Integer) Await.result(future, timeout.duration());
assertEquals(1, count);
}
@ -102,7 +103,7 @@ public class UntypedCoordinatedIncrementTest {
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future<Object>future = counter.ask("GetCount", timeout);
Future<Object>future = ask(counter, "GetCount", timeout);
int count = (Integer) Await.result(future, timeout.duration());
assertEquals(0, count);
}

View file

@ -18,6 +18,7 @@ import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.dispatch.Await;
import akka.dispatch.Future;
import static akka.dispatch.Futures.ask;
import akka.testkit.AkkaSpec;
import akka.testkit.EventFilter;
import akka.testkit.ErrorFilter;
@ -81,7 +82,7 @@ public class UntypedTransactorTest {
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future<Object> future = counter.ask("GetCount", timeout);
Future<Object> future = ask(counter, "GetCount", timeout);
int count = (Integer) Await.result(future, timeout.duration());
assertEquals(1, count);
}
@ -103,7 +104,7 @@ public class UntypedTransactorTest {
} catch (InterruptedException exception) {
}
for (ActorRef counter : counters) {
Future<Object> future = counter.ask("GetCount", timeout);
Future<Object> future = ask(counter, "GetCount", timeout);
int count = (Integer) Await.result(future, timeout.duration());
assertEquals(0, count);
}