WIP: first compiling base

This commit is contained in:
Nikolay Botev 2011-12-31 17:01:19 -08:00
parent 877075cdac
commit a342bb93ea
6 changed files with 115 additions and 96 deletions

View file

@ -520,7 +520,7 @@ class LocalActorRefProvider(
// case other result.success(other)
// }
//}), systemGuardian, systemGuardian.path / "promise" / tempName(), false, None)
//val ff = system.scheduler.scheduleOnce(t.duration) { b.stop() }
//val ff = system.scheduler.scheduleOnce(t.duration) { result.failure(new AskTimeoutException("Timed out")) }
//result onComplete { _
// b.stop()
// ff.cancel()

View file

@ -7,6 +7,7 @@ import akka.config.ConfigurationException
import akka.actor._
import akka.event._
import akka.dispatch._
import akka.patterns.ask
import akka.util.duration._
import akka.util.Timeout
import akka.util.Timeout._
@ -287,13 +288,9 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
protected def systemImpl = this
@inline private def askAndAwait(actorRef: ActorRef, message: Any)(implicit timeout: akka.util.Timeout): Any = {
Await.result(Futures.ask(actorRef, message), timeout.duration)
}
private[akka] def systemActorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout
askAndAwait(systemGuardian, CreateChild(props, name)) match {
Await.result(systemGuardian ? CreateChild(props, name), timeout.duration) match {
case ref: ActorRef ref
case ex: Exception throw ex
}
@ -301,7 +298,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
def actorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout
askAndAwait(guardian, CreateChild(props, name)) match {
Await.result(guardian ? CreateChild(props, name), timeout.duration) match {
case ref: ActorRef ref
case ex: Exception throw ex
}
@ -309,7 +306,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
def actorOf(props: Props): ActorRef = {
implicit val timeout = settings.CreationTimeout
askAndAwait(guardian, CreateRandomNameChild(props)) match {
Await.result(guardian ? CreateRandomNameChild(props), timeout.duration) match {
case ref: ActorRef ref
case ex: Exception throw ex
}
@ -321,8 +318,8 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
val guard = guardian.path
val sys = systemGuardian.path
path.parent match {
case `guard` askAndAwait(guardian, StopChild(actor))
case `sys` askAndAwait(systemGuardian, StopChild(actor))
case `guard` Await.result(guardian ? StopChild(actor), timeout.duration)
case `sys` Await.result(systemGuardian ? StopChild(actor), timeout.duration)
case _ actor.asInstanceOf[InternalActorRef].stop()
}
}

View file

@ -396,17 +396,18 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
case "equals" (args.length == 1 && (proxy eq args(0)) || actor == extension.getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean
case "hashCode" actor.hashCode.asInstanceOf[AnyRef]
case _
import akka.patterns.ask
MethodCall(method, args) match {
case m if m.isOneWay actor ! m; null //Null return value
case m if m.returnsFuture_? Futures.ask(actor, m)(timeout)
case m if m.returnsFuture_? actor.?(m, timeout)
case m if m.returnsJOption_? || m.returnsOption_?
val f = Futures.ask(actor, m)(timeout)
val f = actor.?(m, timeout)
(try { Await.ready(f, timeout.duration).value } catch { case _: TimeoutException None }) match {
case None | Some(Right(null)) if (m.returnsJOption_?) JOption.none[Any] else None
case Some(Right(joption: AnyRef)) joption
case Some(Left(ex)) throw ex
}
case m Await.result(Futures.ask(actor, m)(timeout), timeout.duration).asInstanceOf[AnyRef]
case m Await.result(actor.?(m, timeout), timeout.duration).asInstanceOf[AnyRef]
}
}
}

View file

@ -8,9 +8,6 @@ package object actor {
implicit def actorRef2Scala(ref: ActorRef): ScalaActorRef = ref.asInstanceOf[ScalaActorRef]
implicit def scala2ActorRef(ref: ScalaActorRef): ActorRef = ref.asInstanceOf[ActorRef]
implicit def actorRef2Askable(actorRef: ActorRef) = new dispatch.AskableActorRef(actorRef)
implicit def askable2ActorRef(askable: dispatch.AskableActorRef) = askable.actorRef
type Uuid = com.eaio.uuid.UUID
def newUuid(): Uuid = new Uuid()
@ -41,7 +38,17 @@ package object actor {
}
}
// Implicit for converting a Promise to an actor.
}
package object patterns {
import akka.actor.{ ActorRef, InternalActorRef }
import akka.dispatch.Promise
import akka.util.Timeout
implicit def ask(actorRef: ActorRef): AskableActorRef = new AskableActorRef()(actorRef)
// Implicit for converting a Promise to an ActorRef.
// Symmetric to the future2actor conversion, which allows
// piping a Future result (read side) to an Actor's mailbox, this
// conversion allows using an Actor to complete a Promise (write side)
@ -49,13 +56,13 @@ package object actor {
// Future.ask / actor ? message is now a trivial implementation that can
// also be done in user code (assuming actorRef, timeout and dispatcher implicits):
//
// Future.ask(actor, message) = {
// Patterns.ask(actor, message) = {
// val promise = Promise[Any]()
// actor ! (message, promise)
// promise
// }
@inline implicit def promise2actor(promise: akka.dispatch.Promise[Any])(implicit actorRef: ActorRef, timeout: akka.util.Timeout) = {
@inline implicit def promise2actorRef(promise: Promise[Any])(implicit actorRef: ActorRef, timeout: Timeout): ActorRef = {
val provider = actorRef.asInstanceOf[InternalActorRef].provider
provider.ask(promise, timeout) match {
case Some(ref) ref
@ -64,3 +71,91 @@ package object actor {
}
}
package patterns {
import akka.actor.{ ActorRef, InternalActorRef }
import akka.dispatch.{ Future, Promise }
import akka.util.Timeout
final class AskableActorRef(implicit val actorRef: ActorRef) {
/**
* Akka Java API.
*
* Sends a message asynchronously returns a future holding the eventual reply message.
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
* timeout has expired.
*
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'tell' together with the sender
* parameter to implement non-blocking request/response message exchanges.
*
* If you are sending messages using <code>ask</code> and using blocking operations on the Future, such as
* 'get', then you <b>have to</b> use <code>getContext().sender().tell(...)</code>
* in the target actor to send a reply message to the original sender, and thereby completing the Future,
* otherwise the sender will block until the timeout expires.
*
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors reference, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*/
def ask(message: AnyRef, timeout: Timeout): Future[AnyRef] = ?(message, timeout).asInstanceOf[Future[AnyRef]]
def ask(message: AnyRef, timeoutMillis: Long): Future[AnyRef] = ask(message, new Timeout(timeoutMillis))
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
* timeout has expired.
*
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use '!' together with implicit or explicit
* sender parameter to implement non-blocking request/response message exchanges.
*
* If you are sending messages using <code>ask</code> and using blocking operations on the Future, such as
* 'get', then you <b>have to</b> use <code>getContext().sender().tell(...)</code>
* in the target actor to send a reply message to the original sender, and thereby completing the Future,
* otherwise the sender will block until the timeout expires.
*
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors reference, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*/
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
implicit val dispatcher = actorRef.asInstanceOf[InternalActorRef].provider.dispatcher
val promise = Promise[Any]()
actorRef.!(message)(promise)
promise
}
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
* The implicit parameter with the default value is just there to disambiguate it from the version that takes the
* implicit timeout
*/
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
}
}
object Patterns {
import akka.actor.ActorRef
import akka.dispatch.Future
import akka.patterns.{ ask => actorRef2Askable }
import akka.util.Timeout
def ask(actor: ActorRef, message: Any, timeout: Timeout): Future[Any] =
actorRef2Askable(actor).?(message)(timeout)
def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[Any] =
actorRef2Askable(actor).?(message)(new Timeout(timeoutMillis))
}

View file

@ -24,7 +24,6 @@ import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger,
import akka.dispatch.Await.CanAwait
import java.util.concurrent._
import akka.actor.ActorSystem
import akka.actor.{ ActorRef, InternalActorRef }
object Await {
sealed trait CanAwait
@ -54,20 +53,6 @@ object Await {
*/
object Futures {
def ask(actor: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = {
implicit val dispatcher = actor.asInstanceOf[InternalActorRef].provider.dispatcher
implicit val actorRefContext = actor // for promise2actor implicit conversion
val promise = Promise[Any]()
actor.!(message)(promise)
promise
}
def ask(actor: ActorRef, message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] =
ask(actor, message)(timeout)
def ask(actor: ActorRef, message: Any, timeoutMillis: Long): Future[Any] =
ask(actor, message)(new Timeout(timeoutMillis))
/**
* Java API, equivalent to Future.apply
*/
@ -149,66 +134,6 @@ object Futures {
}
}
final class AskableActorRef(val actorRef: ActorRef) {
/**
* Akka Java API.
*
* Sends a message asynchronously returns a future holding the eventual reply message.
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
* timeout has expired.
*
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use 'tell' together with the sender
* parameter to implement non-blocking request/response message exchanges.
*
* If you are sending messages using <code>ask</code> and using blocking operations on the Future, such as
* 'get', then you <b>have to</b> use <code>getContext().sender().tell(...)</code>
* in the target actor to send a reply message to the original sender, and thereby completing the Future,
* otherwise the sender will block until the timeout expires.
*
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors reference, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*/
def ask(message: AnyRef, timeout: Timeout): Future[AnyRef] = ?(message, timeout).asInstanceOf[Future[AnyRef]]
def ask(message: AnyRef, timeoutMillis: Long): Future[AnyRef] = ask(message, new Timeout(timeoutMillis))
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
* The Future will be completed with an [[akka.actor.AskTimeoutException]] after the given
* timeout has expired.
*
* <b>NOTE:</b>
* Use this method with care. In most cases it is better to use '!' together with implicit or explicit
* sender parameter to implement non-blocking request/response message exchanges.
*
* If you are sending messages using <code>ask</code> and using blocking operations on the Future, such as
* 'get', then you <b>have to</b> use <code>getContext().sender().tell(...)</code>
* in the target actor to send a reply message to the original sender, and thereby completing the Future,
* otherwise the sender will block until the timeout expires.
*
* When using future callbacks, inside actors you need to carefully avoid closing over
* the containing actors reference, i.e. do not call methods or access mutable state
* on the enclosing actor from within the callback. This would break the actor
* encapsulation and may introduce synchronization bugs and race conditions because
* the callback will be scheduled concurrently to the enclosing actor. Unfortunately
* there is not yet a way to detect these illegal accesses at compile time.
*/
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = Futures.ask(actorRef, message)
/**
* Sends a message asynchronously, returning a future which may eventually hold the reply.
* The implicit parameter with the default value is just there to disambiguate it from the version that takes the
* implicit timeout
*/
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
}
object Future {
/**

View file

@ -14,7 +14,7 @@ import akka.util.Timeout
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NoStackTrace
import java.util.concurrent.TimeoutException
import akka.dispatch.{ Await, Futures }
import akka.dispatch.Await
object LoggingBus {
implicit def fromActorSystem(system: ActorSystem): LoggingBus = system.eventStream
@ -158,7 +158,8 @@ trait LoggingBus extends ActorEventBus {
val name = "log" + Extension(system).id() + "-" + simpleName(clazz)
val actor = system.systemActorOf(Props(clazz), name)
implicit val timeout = Timeout(3 seconds)
val response = try Await.result(Futures.ask(actor, InitializeLogger(this)), timeout.duration) catch {
import akka.patterns.ask
val response = try Await.result(actor ? InitializeLogger(this), timeout.duration) catch {
case _: TimeoutException
publish(Warning(simpleName(this), "Logger " + name + " did not respond within " + timeout + " to InitializeLogger(bus)"))
}