introduce AkkaApplication

- remove global Config
- pull everything which depended on it into new AkkaApplication
- leave EventHandler alone for the moment: that evil sucker gets his
  very own AkkaApplication("akka-reference.conf") until we have settled
  on an acceptable logging API without globals
- make akka-actor and akka-testkit compile
- TestKit uses implicit AkkaApplication passing for maximum convenience
- Actor object nearly completely removed, actor creation possible via
  ActorRefFactory interface which is implemented by AkkaApplication and
  ActorContext
- serialization of ActorRef is probably broken, and so is the reflective
  RemoteSupport (now needs AkkaApplication constructor arg)
- everything else is still broken, including akka-actor-tests, so this
  is of course all not runtime-tested
This commit is contained in:
Roland 2011-10-06 21:19:46 +02:00
parent ccb429df13
commit 2381ec54d0
46 changed files with 734 additions and 1066 deletions

View file

@ -16,103 +16,6 @@ import com.eaio.uuid.UUID;
* - locating actors
*/
public class Actors {
/**
*
* @return The actor registry
*/
public static ActorRegistry registry() {
return Actor$.MODULE$.registry();
}
/**
*
* @return
* @throws UnsupportedOperationException If remoting isn't configured
* @throws ModuleNotAvailableException If the class for the remote support cannot be loaded
*/
public static RemoteSupport remote() {
return Actor$.MODULE$.remote();
}
/**
* NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the
* UntypedActor instance directly, but only through its 'ActorRef' wrapper reference.
* <p/>
* Creates an ActorRef out of the Actor. Allows you to pass in the instance for the UntypedActor.
* Only use this method when you need to pass in constructor arguments into the 'UntypedActor'.
* <p/>
* You use it by implementing the UntypedActorFactory interface.
* Example in Java:
* <pre>
* ActorRef actor = Actors.actorOf(new UntypedActorFactory() {
* public UntypedActor create() {
* return new MyUntypedActor("service:name", 5);
* }
* }, "my-actor-address");
* actor.tell(message, context);
* actor.stop();
* </pre>
*/
public static ActorRef actorOf(final Creator<Actor> factory, final String address) {
return Actor$.MODULE$.actorOf(factory, address);
}
/**
* NOTE: Use this convenience method with care, do NOT make it possible to get a reference to the
* UntypedActor instance directly, but only through its 'ActorRef' wrapper reference.
* <p/>
* Creates an ActorRef out of the Actor. Allows you to pass in the instance for the UntypedActor.
* Only use this method when you need to pass in constructor arguments into the 'UntypedActor'.
* <p/>
* You use it by implementing the UntypedActorFactory interface.
* Example in Java:
* <pre>
* ActorRef actor = Actors.actorOf(new UntypedActorFactory() {
* public UntypedActor create() {
* return new MyUntypedActor("service:name", 5);
* }
* });
* actor.tell(message, context);
* actor.stop();
* </pre>
*/
public static ActorRef actorOf(final Creator<Actor> factory) {
return Actor$.MODULE$.actorOf(factory, new UUID().toString());
}
/**
* Creates an ActorRef out of the Actor type represented by the class provided.
* Example in Java:
* <pre>
* ActorRef actor = Actors.actorOf(MyUntypedActor.class, "my-actor-address");
* actor.tell(message, context);
* actor.stop();
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = Actors.actorOf(MyActor.class, "my-actor-address");
* </pre>
*/
public static ActorRef actorOf(final Class<? extends Actor> type, final String address) {
return Actor$.MODULE$.actorOf(type, address);
}
/**
* Creates an ActorRef out of the Actor type represented by the class provided.
* Example in Java:
* <pre>
* ActorRef actor = Actors.actorOf(MyUntypedActor.class, "my-actor-address");
* actor.tell(message, context);
* actor.stop();
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = Actors.actorOf(MyActor.class);
* </pre>
*/
public static ActorRef actorOf(final Class<? extends Actor> type) {
return Actor$.MODULE$.actorOf(type, new UUID().toString());
}
/**
* The message that is sent when an Actor gets a receive timeout.

View file

@ -4,8 +4,6 @@
package akka.dispatch;
import sun.tools.tree.FinallyStatement;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
abstract class AbstractPromise {

View file

@ -0,0 +1,146 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka
import akka.config._
import akka.actor._
import java.net.InetAddress
import com.eaio.uuid.UUID
import dispatch.{ Dispatcher, Dispatchers }
import akka.util.Duration
import util.ReflectiveAccess
import java.util.concurrent.TimeUnit
import akka.dispatch.BoundedMailbox
import akka.dispatch.UnboundedMailbox
import akka.routing.Routing
import remote.RemoteSupport
import akka.serialization.Serialization
object AkkaApplication {
val VERSION = "2.0-SNAPSHOT"
val GLOBAL_HOME = systemHome orElse envHome
val envHome = System.getenv("AKKA_HOME") match {
case null | "" | "." None
case value Some(value)
}
val systemHome = System.getProperty("akka.home") match {
case null | "" None
case value Some(value)
}
val envConf = System.getenv("AKKA_MODE") match {
case null | "" None
case value Some(value)
}
val systemConf = System.getProperty("akka.mode") match {
case null | "" None
case value Some(value)
}
val defaultLocation = (systemConf orElse envConf).map("akka." + _ + ".conf").getOrElse("akka.conf")
val fromProperties = try {
Some(Configuration.fromFile(System.getProperty("akka.config", "")))
} catch { case _ None }
val fromClasspath = try {
Some(Configuration.fromResource(defaultLocation))
} catch { case _ None }
val fromHome = try {
Some(Configuration.fromFile(GLOBAL_HOME.get + "/config/" + defaultLocation))
} catch { case _ None }
val emptyConfig = Configuration.fromString("akka { version = \"" + VERSION + "\" }")
def apply(name: String): AkkaApplication = new AkkaApplication(name, fromProperties orElse fromClasspath orElse fromHome getOrElse emptyConfig)
def apply(): AkkaApplication = apply("default")
}
class AkkaApplication(val name: String, val config: Configuration) extends ActorRefFactory {
import AkkaApplication._
object AkkaConfig {
import config._
val CONFIG_VERSION = getString("akka.version", VERSION)
val TIME_UNIT = getString("akka.time-unit", "seconds")
val TIMEOUT = Timeout(Duration(getInt("akka.actor.timeout", 5), TIME_UNIT))
val TimeoutMillis = TIMEOUT.duration.toMillis
val SERIALIZE_MESSAGES = getBool("akka.actor.serialize-messages", false)
val LogLevel = getString("akka.event-handler-level", "INFO")
val EventHandlers = getList("akka.event-handlers")
val ADD_LOGGING_RECEIVE = getBool("akka.actor.debug.receive", false)
val DEBUG_AUTO_RECEIVE = getBool("akka.actor.debug.autoreceive", false)
val DEBUG_LIFECYCLE = getBool("akka.actor.debug.lifecycle", false)
val FsmDebugEvent = getBool("akka.actor.debug.fsm", false)
val DispatcherThroughput = getInt("akka.actor.throughput", 5)
val DispatcherDefaultShutdown = getLong("akka.actor.dispatcher-shutdown-timeout").
map(time Duration(time, TIME_UNIT)).
getOrElse(Duration(1000, TimeUnit.MILLISECONDS))
val MailboxCapacity = getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
val MailboxPushTimeout = Duration(getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT)
val ThroughputDeadlineTime = Duration(getInt("akka.actor.throughput-deadline-time", -1), TIME_UNIT)
val HOME = getString("akka.home")
val BOOT_CLASSES = getList("akka.boot")
val ENABLED_MODULES = getList("akka.enabled-modules")
val CLUSTER_ENABLED = ENABLED_MODULES exists (_ == "cluster")
val ClusterName = getString("akka.cluster.name", "default")
val REMOTE_TRANSPORT = getString("akka.remote.layer", "akka.remote.netty.NettyRemoteSupport")
val REMOTE_SERVER_PORT = getInt("akka.remote.server.port", 2552)
}
import AkkaConfig._
if (CONFIG_VERSION != VERSION)
throw new ConfigurationException("Akka JAR version [" + VERSION +
"] does not match the provided config version [" + CONFIG_VERSION + "]")
// TODO correctly pull its config from the config
val dispatcherFactory = new Dispatchers(this)
implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher
// TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor
val provider: ActorRefProvider = new LocalActorRefProvider(this)
/**
* Handle to the ActorRegistry.
* TODO: delete me!
*/
val registry = new ActorRegistry
// TODO check memory consistency issues
val reflective = new ReflectiveAccess(this)
val routing = new Routing(this)
val serialization = new Serialization(this)
val startTime = System.currentTimeMillis
def uptime = (System.currentTimeMillis - startTime) / 1000
val nodename = System.getProperty("akka.cluster.nodename") match {
case null | "" new UUID().toString
case value value
}
val hostname = System.getProperty("akka.remote.hostname") match {
case null | "" InetAddress.getLocalHost.getHostName
case value value
}
}

View file

@ -8,16 +8,14 @@ import DeploymentConfig._
import akka.dispatch._
import akka.config._
import akka.routing._
import Config._
import akka.util.{ ReflectiveAccess, Duration }
import ReflectiveAccess._
import akka.util.Duration
import akka.remote.RemoteSupport
import akka.cluster.ClusterNode
import akka.japi.{ Creator, Procedure }
import akka.serialization.{ Serializer, Serialization }
import akka.event.EventHandler
import akka.experimental
import akka.AkkaException
import akka.{ AkkaApplication, AkkaException }
import scala.reflect.BeanProperty
@ -123,12 +121,6 @@ case class Timeout(duration: Duration) {
}
object Timeout {
/**
* The default timeout, based on the config setting 'akka.actor.timeout'
*/
@BeanProperty
implicit val default = new Timeout(Actor.TIMEOUT)
/**
* A timeout with zero duration, will cause most requests to always timeout.
*/
@ -149,217 +141,10 @@ object Timeout {
implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
}
/**
* Actor factory module with factory methods for creating various kinds of Actors.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Actor {
/**
* A Receive is a convenience type that defines actor message behavior currently modeled as
* a PartialFunction[Any, Unit].
*/
type Receive = PartialFunction[Any, Unit]
private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis
private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
/**
* Handle to the ActorRefProviders for looking up and creating ActorRefs.
*/
val provider = new ActorRefProviders
/**
* Handle to the ActorRegistry.
*/
val registry = new ActorRegistry
/**
* Handle to the ClusterNode. API for the cluster client.
*/
// lazy val cluster: ClusterNode = ClusterModule.node
/**
* Handle to the RemoteSupport. API for the remote client/server.
* Only for internal use.
*/
private[akka] lazy val remote: RemoteSupport = RemoteModule.remoteService.server
/**
* This decorator adds invocation logging to a Receive function.
*/
class LoggingReceive(source: AnyRef, r: Receive) extends Receive {
def isDefinedAt(o: Any) = {
val handled = r.isDefinedAt(o)
EventHandler.debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o)
handled
}
def apply(o: Any): Unit = r(o)
}
object LoggingReceive {
def apply(source: AnyRef, r: Receive): Receive = r match {
case _: LoggingReceive r
case _ new LoggingReceive(source, r)
}
}
/**
* Wrap a Receive partial function in a logging enclosure, which sends a
* debug message to the EventHandler each time before a message is matched.
* This includes messages which are not handled.
*
* <pre><code>
* def receive = loggable {
* case x => ...
* }
* </code></pre>
*
* This method does NOT modify the given Receive unless
* akka.actor.debug.receive is set within akka.conf.
*/
def loggable(self: AnyRef)(r: Receive): Receive = if (addLoggingReceive) LoggingReceive(self, r) else r
private[akka] val addLoggingReceive = config.getBool("akka.actor.debug.receive", false)
private[akka] val debugAutoReceive = config.getBool("akka.actor.debug.autoreceive", false)
private[akka] val debugLifecycle = config.getBool("akka.actor.debug.lifecycle", false)
/**
* Creates an ActorRef out of the Actor with type T.
* <pre>
* import Actor._
* val actor = actorOf[MyActor]
* actor ! message
* actor.stop()
* </pre>
*/
def actorOf[T <: Actor: Manifest](address: String): ActorRef =
actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], address)
/**
* Creates an ActorRef out of the Actor with type T.
* Uses generated address.
* <pre>
* import Actor._
* val actor = actorOf[MyActor]
* actor ! message
* actor.stop
* </pre>
*/
def actorOf[T <: Actor: Manifest]: ActorRef =
actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], new UUID().toString)
/**
* Creates an ActorRef out of the Actor of the specified Class.
* Uses generated address.
* <pre>
* import Actor._
* val actor = actorOf(classOf[MyActor])
* actor ! message
* actor.stop()
* </pre>
*/
def actorOf[T <: Actor](clazz: Class[T]): ActorRef = actorOf(clazz, new UUID().toString)
/**
* Creates an ActorRef out of the Actor of the specified Class.
* <pre>
* import Actor._
* val actor = actorOf(classOf[MyActor])
* actor ! message
* actor.stop
* </pre>
*/
def actorOf[T <: Actor](clazz: Class[T], address: String): ActorRef = actorOf(Props(clazz), address)
/**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function
* that creates the Actor. Please note that this function can be invoked multiple
* times if for example the Actor is supervised and needs to be restarted.
* Uses generated address.
* <p/>
* <pre>
* import Actor._
* val actor = actorOf(new MyActor)
* actor ! message
* actor.stop()
* </pre>
*/
def actorOf[T <: Actor](factory: T): ActorRef = actorOf(factory, newUuid().toString)
/**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function
* that creates the Actor. Please note that this function can be invoked multiple
* times if for example the Actor is supervised and needs to be restarted.
* <p/>
* This function should <b>NOT</b> be used for remote actors.
* <pre>
* import Actor._
* val actor = actorOf(new MyActor)
* actor ! message
* actor.stop
* </pre>
*/
def actorOf[T <: Actor](creator: T, address: String): ActorRef = actorOf(Props(creator), address)
/**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator<Actor>)
* that creates the Actor. Please note that this function can be invoked multiple
* times if for example the Actor is supervised and needs to be restarted.
* Uses generated address.
* <p/>
* JAVA API
*/
def actorOf[T <: Actor](creator: Creator[T]): ActorRef = actorOf(Props(creator), newUuid().toString)
/**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator<Actor>)
* that creates the Actor. Please note that this function can be invoked multiple
* times if for example the Actor is supervised and needs to be restarted.
* <p/>
* This function should <b>NOT</b> be used for remote actors.
* JAVA API
*/
def actorOf[T <: Actor](creator: Creator[T], address: String): ActorRef = actorOf(Props(creator), address)
/**
* Creates an ActorRef out of the Actor.
* <p/>
* <pre>
* FIXME document
* </pre>
*/
def actorOf(props: Props): ActorRef = actorOf(props, newUuid.toString)
/**
* Creates an ActorRef out of the Actor.
* <p/>
* <pre>
* FIXME document
* </pre>
*/
def actorOf(props: Props, address: String): ActorRef = provider.actorOf(props, address)
/**
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when
* the block has been executed.
* <p/>
* Only to be used from Scala code.
* <p/>
* NOTE: If used from within an Actor then has to be qualified with 'Actor.spawn' since
* there is a method 'spawn[ActorType]' in the Actor trait already.
* Example:
* <pre>
* import Actor.spawn
*
* spawn {
* ... // do stuff
* }
* </pre>
*/
def spawn(body: Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher) {
actorOf(Props(context { case "go" try { body } finally { context.self.stop() } }).withDispatcher(dispatcher)) ! "go"
}
}
/**
@ -379,11 +164,6 @@ object Actor {
*/
trait Actor {
import Actor.{ addLoggingReceive, debugAutoReceive, LoggingReceive }
/**
* Type alias because traits cannot have companion objects.
*/
type Receive = Actor.Receive
/**
@ -408,6 +188,50 @@ trait Actor {
context
}
implicit def application = context.application
private def config = application.AkkaConfig
/**
* The default timeout, based on the config setting 'akka.actor.timeout'
*/
implicit val defaultTimeout = config.TIMEOUT
/**
* This decorator adds invocation logging to a Receive function.
*/
class LoggingReceive(source: AnyRef, r: Receive) extends Receive {
def isDefinedAt(o: Any) = {
val handled = r.isDefinedAt(o)
EventHandler.debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o)
handled
}
def apply(o: Any): Unit = r(o)
}
object LoggingReceive {
def apply(source: AnyRef, r: Receive): Receive = r match {
case _: LoggingReceive r
case _ new LoggingReceive(source, r)
}
}
/**
* Wrap a Receive partial function in a logging enclosure, which sends a
* debug message to the EventHandler each time before a message is matched.
* This includes messages which are not handled.
*
* <pre><code>
* def receive = loggable {
* case x => ...
* }
* </code></pre>
*
* This method does NOT modify the given Receive unless
* akka.actor.debug.receive is set within akka.conf.
*/
def loggable(self: AnyRef)(r: Receive): Receive = if (config.ADD_LOGGING_RECEIVE) LoggingReceive(self, r) else r
/**
* Some[ActorRef] representation of the 'self' ActorRef reference.
* <p/>
@ -596,7 +420,7 @@ trait Actor {
throw new InvalidMessageException("Message from [" + channel + "] to [" + self.toString + "] is null")
def autoReceiveMessage(msg: AutoReceivedMessage) {
if (debugAutoReceive) EventHandler.debug(this, "received AutoReceiveMessage " + msg)
if (config.DEBUG_AUTO_RECEIVE) EventHandler.debug(this, "received AutoReceiveMessage " + msg)
msg match {
case HotSwap(code, discardOld) become(code(self), discardOld)

View file

@ -12,13 +12,14 @@ import scala.collection.JavaConverters
import akka.event.{ InVMMonitoring, EventHandler }
import java.util.concurrent.{ ScheduledFuture, TimeUnit }
import java.util.{ Collection JCollection, Collections JCollections }
import akka.AkkaApplication
/**
* The actor context - the view of the actor cell from the actor.
* Exposes contextual information for the actor and the current message.
* TODO: everything here for current compatibility - could be limited more
*/
private[akka] trait ActorContext {
private[akka] trait ActorContext extends ActorRefFactory {
def self: ActorRef with ScalaActorRef
@ -47,6 +48,8 @@ private[akka] trait ActorContext {
def handleFailure(fail: Failed): Unit
def handleChildTerminated(child: ActorRef): Unit
def application: AkkaApplication
}
case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) {
@ -204,6 +207,7 @@ private[akka] object ActorCell {
}
private[akka] class ActorCell(
val application: AkkaApplication,
val self: ActorRef with ScalaActorRef,
val props: Props,
@volatile var receiveTimeout: Option[Long],
@ -211,6 +215,8 @@ private[akka] class ActorCell(
import ActorCell._
def provider = application.provider
@volatile
var futureTimeout: Option[ScheduledFuture[AnyRef]] = None //FIXME TODO Doesn't need to be volatile either, since it will only ever be accessed when a message is processed
@ -245,7 +251,7 @@ private[akka] class ActorCell(
}
}
Actor.registry.register(self)
application.registry.register(self)
dispatcher.attach(this)
}
@ -331,7 +337,7 @@ private[akka] class ActorCell(
actor = created
created.preStart()
checkReceiveTimeout
if (Actor.debugLifecycle) EventHandler.debug(created, "started")
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(created, "started")
} catch {
case e try {
EventHandler.error(e, this, "error while creating actor")
@ -345,7 +351,7 @@ private[akka] class ActorCell(
def recreate(cause: Throwable): Unit = try {
val failedActor = actor
if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting")
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(failedActor, "restarting")
val freshActor = newActor()
if (failedActor ne null) {
val c = currentMessage //One read only plz
@ -359,7 +365,7 @@ private[akka] class ActorCell(
}
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
freshActor.postRestart(cause)
if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted")
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(freshActor, "restarted")
dispatcher.resume(this) //FIXME should this be moved down?
@ -382,13 +388,13 @@ private[akka] class ActorCell(
def terminate() {
receiveTimeout = None
cancelReceiveTimeout
Actor.provider.evict(self.address)
Actor.registry.unregister(self)
application.provider.evict(self.address)
application.registry.unregister(self)
dispatcher.detach(this)
try {
val a = actor
if (Actor.debugLifecycle) EventHandler.debug(a, "stopping")
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(a, "stopping")
if (a ne null) a.postStop()
//Stop supervised actors
@ -410,7 +416,7 @@ private[akka] class ActorCell(
val links = _linkedActors
if (!links.contains(child)) {
_linkedActors = new ChildRestartStats(child) :: links
if (Actor.debugLifecycle) EventHandler.debug(actor, "now supervising " + child)
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(actor, "now supervising " + child)
} else EventHandler.warning(actor, "Already supervising " + child)
}
@ -422,10 +428,10 @@ private[akka] class ActorCell(
case Recreate(cause) recreate(cause)
case Link(subject)
akka.event.InVMMonitoring.link(self, subject)
if (Actor.debugLifecycle) EventHandler.debug(actor, "now monitoring " + subject)
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(actor, "now monitoring " + subject)
case Unlink(subject)
akka.event.InVMMonitoring.unlink(self, subject)
if (Actor.debugLifecycle) EventHandler.debug(actor, "stopped monitoring " + subject)
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(actor, "stopped monitoring " + subject)
case Suspend suspend()
case Resume resume()
case Terminate terminate()

View file

@ -7,11 +7,12 @@ package akka.actor
import akka.dispatch._
import akka.util._
import akka.serialization.{ Serializer, Serialization }
import ReflectiveAccess._
import ClusterModule._
import java.net.InetSocketAddress
import scala.collection.immutable.Stack
import java.lang.{ UnsupportedOperationException, IllegalStateException }
import akka.AkkaApplication
import akka.remote.RemoteSupport
import scala.util.DynamicVariable
/**
* ActorRef is an immutable and serializable handle to an Actor.
@ -56,15 +57,6 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
*/
def compareTo(other: ActorRef) = this.address compareTo other.address
protected[akka] def timeout: Long = Props.defaultTimeout.duration.toMillis //TODO Remove me if possible
/**
* Akka Java API. <p/>
* @see ask(message: AnyRef, sender: ActorRef): Future[_]
* Uses the Actors default timeout (setTimeout()) and omits the sender
*/
def ask(message: AnyRef): Future[AnyRef] = ask(message, timeout, null)
/**
* Akka Java API. <p/>
* @see ask(message: AnyRef, sender: ActorRef): Future[_]
@ -72,13 +64,6 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
*/
def ask(message: AnyRef, timeout: Long): Future[Any] = ask(message, timeout, null)
/**
* Akka Java API. <p/>
* @see ask(message: AnyRef, sender: ActorRef): Future[_]
* Uses the Actors default timeout (setTimeout())
*/
def ask(message: AnyRef, sender: ActorRef): Future[AnyRef] = ask(message, timeout, sender)
/**
* Akka Java API. <p/>
* Sends a message asynchronously returns a future holding the eventual reply message.
@ -162,6 +147,7 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class LocalActorRef private[akka] (
application: AkkaApplication,
private[this] val props: Props,
val address: String,
val systemService: Boolean = false,
@ -170,20 +156,7 @@ class LocalActorRef private[akka] (
hotswap: Stack[PartialFunction[Any, Unit]] = Stack.empty)
extends ActorRef with ScalaActorRef {
// used only for deserialization
private[akka] def this(
__uuid: Uuid,
__address: String,
__props: Props,
__receiveTimeout: Option[Long],
__hotswap: Stack[PartialFunction[Any, Unit]]) = {
this(__props, __address, false, __uuid, __receiveTimeout, __hotswap)
actorCell.setActorContext(actorCell) // this is needed for deserialization - why?
}
private[this] val actorCell = new ActorCell(this, props, receiveTimeout, hotswap)
private[this] val actorCell = new ActorCell(application, this, props, receiveTimeout, hotswap)
actorCell.start()
/**
@ -250,8 +223,6 @@ class LocalActorRef private[akka] (
instance
}
protected[akka] override def timeout: Long = props.timeout.duration.toMillis // TODO: remove this if possible
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit =
actorCell.postMessageToMailbox(message, channel)
@ -270,10 +241,9 @@ class LocalActorRef private[akka] (
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = {
val inetaddr =
if (ReflectiveAccess.RemoteModule.isEnabled) Actor.remote.address
else ReflectiveAccess.RemoteModule.configDefaultAddress
SerializedActorRef(uuid, address, inetaddr.getAddress.getHostAddress, inetaddr.getPort, timeout)
// TODO: this was used to really send LocalActorRef across the network, which is broken now
val inetaddr = application.reflective.RemoteModule.configDefaultAddress
SerializedActorRef(uuid, address, inetaddr.getAddress.getHostAddress, inetaddr.getPort)
}
}
@ -293,9 +263,10 @@ object RemoteActorSystemMessage {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] case class RemoteActorRef private[akka] (
val application: AkkaApplication,
val remote: RemoteSupport,
val remoteAddress: InetSocketAddress,
val address: String,
_timeout: Long,
loader: Option[ClassLoader])
extends ActorRef with ScalaActorRef {
@ -304,13 +275,11 @@ private[akka] case class RemoteActorRef private[akka] (
def isShutdown: Boolean = !running
RemoteModule.ensureEnabled()
protected[akka] override def timeout: Long = _timeout
application.reflective.RemoteModule.ensureEnabled()
def postMessageToMailbox(message: Any, channel: UntypedChannel) {
val chSender = if (channel.isInstanceOf[ActorRef]) Some(channel.asInstanceOf[ActorRef]) else None
Actor.remote.send[Any](message, chSender, None, remoteAddress, timeout, true, this, loader)
remote.send[Any](message, chSender, None, remoteAddress, true, this, loader)
}
def postMessageToMailboxAndCreateFutureResultWithTimeout(
@ -320,9 +289,9 @@ private[akka] case class RemoteActorRef private[akka] (
val chSender = if (channel.isInstanceOf[ActorRef]) Some(channel.asInstanceOf[ActorRef]) else None
val chFuture = if (channel.isInstanceOf[Promise[_]]) Some(channel.asInstanceOf[Promise[Any]]) else None
val future = Actor.remote.send[Any](message, chSender, chFuture, remoteAddress, timeout.duration.toMillis, false, this, loader)
val future = remote.send[Any](message, chSender, chFuture, remoteAddress, false, this, loader)
if (future.isDefined) ActorPromise(future.get)
if (future.isDefined) ActorPromise(future.get)(timeout)
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
}
@ -341,7 +310,7 @@ private[akka] case class RemoteActorRef private[akka] (
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = {
SerializedActorRef(uuid, address, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort, timeout)
SerializedActorRef(uuid, address, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort)
}
def link(actorRef: ActorRef): ActorRef = unsupported
@ -415,15 +384,17 @@ trait ScalaActorRef extends ActorRefShared with ReplyChannel[Any] { ref: ActorRe
case class SerializedActorRef(uuid: Uuid,
address: String,
hostname: String,
port: Int,
timeout: Long) {
port: Int) {
import akka.serialization.Serialization._
@throws(classOf[java.io.ObjectStreamException])
def readResolve(): AnyRef = Actor.registry.local.actorFor(uuid) match {
def readResolve(): AnyRef = application.value.registry.local.actorFor(uuid) match {
case Some(actor) actor
case None
//TODO FIXME Add case for when hostname+port == remote.address.hostname+port, should return a DeadActorRef or something
if (ReflectiveAccess.RemoteModule.isEnabled)
RemoteActorRef(new InetSocketAddress(hostname, port), address, timeout, None)
val remote = application.value.reflective.RemoteModule
if (remote.isEnabled)
RemoteActorRef(application.value, remote.defaultRemoteSupport.get(), new InetSocketAddress(hostname, port), address, None)
else
throw new IllegalStateException(
"Trying to deserialize ActorRef [" + this +

View file

@ -8,6 +8,11 @@ import DeploymentConfig._
import akka.event.EventHandler
import akka.AkkaException
import akka.routing._
import akka.AkkaApplication
import akka.dispatch.MessageDispatcher
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
import com.eaio.uuid.UUID
/**
* Interface for all ActorRef providers to implement.
@ -21,6 +26,35 @@ trait ActorRefProvider {
private[akka] def evict(address: String): Boolean
}
/**
* Interface implemented by AkkaApplication and AkkaContext, the only two places from which you can get fresh actors
*/
trait ActorRefFactory {
def provider: ActorRefProvider
def dispatcher: MessageDispatcher
def createActor(props: Props): ActorRef = createActor(props, new UUID().toString)
/*
* TODO this will have to go at some point, because creating two actors with
* the same address can race on the cluster, and then you never know which
* implementation wins
*/
def createActor(props: Props, address: String): ActorRef = {
val p =
if (props.dispatcher == Props.defaultDispatcher)
props.copy(dispatcher = dispatcher)
else
props
provider.actorOf(p, address).get
}
def findActor(address: String): Option[ActorRef] = provider.findActorRef(address)
}
class ActorRefProviderException(message: String) extends AkkaException(message)
object ActorRefProvider {
@ -30,108 +64,19 @@ object ActorRefProvider {
object ClusterProvider extends ProviderType
}
/**
* Container for all ActorRef providers.
*/
private[akka] class ActorRefProviders(
@volatile private var localProvider: Option[ActorRefProvider] = Some(new LocalActorRefProvider),
@volatile private var remoteProvider: Option[ActorRefProvider] = None,
@volatile private var clusterProvider: Option[ActorRefProvider] = None) {
import ActorRefProvider._
def register(providerType: ProviderType, provider: ActorRefProvider) = {
EventHandler.info(this, "Registering ActorRefProvider [%s]".format(provider.getClass.getName))
providerType match {
case LocalProvider localProvider = Option(provider)
case RemoteProvider remoteProvider = Option(provider)
case ClusterProvider clusterProvider = Option(provider)
}
}
//FIXME Implement support for configuring by deployment ID etc
//FIXME If address matches an already created actor (Ahead-of-time deployed) return that actor
//FIXME If address exists in config, it will override the specified Props (should we attempt to merge?)
def actorOf(props: Props, address: String): ActorRef = {
@annotation.tailrec
def actorOf(props: Props, address: String, providers: List[ActorRefProvider]): Option[ActorRef] = {
providers match {
case Nil None
case provider :: rest
provider.actorOf(props, address) match {
case None actorOf(props, address, rest) // recur
case ref ref
}
}
}
actorOf(props, address, providersAsList).getOrElse(throw new ActorRefProviderException(
"Actor [" +
address +
"] could not be found in or created by any of the registered 'ActorRefProvider's [" +
providersAsList.map(_.getClass.getName).mkString(", ") + "]"))
}
def findActorRef(address: String): Option[ActorRef] = {
@annotation.tailrec
def findActorRef(address: String, providers: List[ActorRefProvider]): Option[ActorRef] = {
providers match {
case Nil None
case provider :: rest
provider.findActorRef(address) match {
case None findActorRef(address, rest) // recur
case ref ref
}
}
}
findActorRef(address, providersAsList)
}
/**
* Returns true if the actor was in the provider's cache and evicted successfully, else false.
*/
private[akka] def evict(address: String): Boolean = {
@annotation.tailrec
def evict(address: String, providers: List[ActorRefProvider]): Boolean = {
providers match {
case Nil false
case provider :: rest
if (provider.evict(address)) true // done
else evict(address, rest) // recur
}
}
evict(address, providersAsList)
}
private[akka] def systemActorOf(props: Props, address: String): Option[ActorRef] = {
localProvider
.getOrElse(throw new IllegalStateException("No LocalActorRefProvider available"))
.asInstanceOf[LocalActorRefProvider]
.actorOf(props, address, true)
}
private def providersAsList = List(localProvider, remoteProvider, clusterProvider).flatten
}
/**
* Local ActorRef provider.
*/
class LocalActorRefProvider extends ActorRefProvider {
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
import com.eaio.uuid.UUID
class LocalActorRefProvider(application: AkkaApplication) extends ActorRefProvider {
import application.dispatcher
private val actors = new ConcurrentHashMap[String, Promise[Option[ActorRef]]]
private val deployer = new Deployer(application)
def actorOf(props: Props, address: String): Option[ActorRef] = actorOf(props, address, false)
def findActorRef(address: String): Option[ActorRef] = Actor.registry.local.actorFor(address)
def findActorRef(address: String): Option[ActorRef] = application.registry.local.actorFor(address)
/**
* Returns true if the actor was in the provider's cache and evicted successfully, else false.
@ -147,11 +92,11 @@ class LocalActorRefProvider extends ActorRefProvider {
if (oldFuture eq null) { // we won the race -- create the actor and resolve the future
val actor = try {
Deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor
deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor
// create a local actor
case None | Some(Deploy(_, _, Direct, _, _, LocalScope))
Some(new LocalActorRef(props, address, systemService)) // create a local actor
Some(new LocalActorRef(application, props, address, systemService)) // create a local actor
// create a routed actor ref
case deploy @ Some(Deploy(_, _, router, nrOfInstances, _, LocalScope))
@ -168,10 +113,10 @@ class LocalActorRefProvider extends ActorRefProvider {
}
val connections: Iterable[ActorRef] =
if (nrOfInstances.factor > 0)
Vector.fill(nrOfInstances.factor)(new LocalActorRef(props, new UUID().toString, systemService))
Vector.fill(nrOfInstances.factor)(new LocalActorRef(application, props, new UUID().toString, systemService))
else Nil
Some(Routing.actorOf(RoutedProps(
Some(application.routing.actorOf(RoutedProps(
routerFactory = routerFactory,
connections = connections)))
@ -183,7 +128,7 @@ class LocalActorRefProvider extends ActorRefProvider {
throw e
}
actor foreach Actor.registry.register // only for ActorRegistry backward compat, will be removed later
actor foreach application.registry.register // only for ActorRegistry backward compat, will be removed later
newFuture completeWithResult actor
actor

View file

@ -28,7 +28,7 @@ case class TypedActorUnregistered(@BeanProperty address: String, @BeanProperty a
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[actor] final class ActorRegistry private[actor] () extends ListenerManagement {
private[akka] final class ActorRegistry private[akka] () extends ListenerManagement {
private val actorsByAddress = new ConcurrentHashMap[String, ActorRef]
private val actorsByUuid = new ConcurrentHashMap[Uuid, ActorRef]
private val typedActorsByUuid = new ConcurrentHashMap[Uuid, AnyRef]

View file

@ -7,21 +7,17 @@ package akka.actor
import java.io.File
import java.net.{ URL, URLClassLoader }
import java.util.jar.JarFile
import akka.util.{ Bootable }
import akka.config.Config._
import akka.util.Bootable
import akka.AkkaApplication
/**
* Handles all modules in the deploy directory (load and unload)
*/
trait BootableActorLoaderService extends Bootable {
val BOOT_CLASSES = config.getList("akka.boot")
lazy val applicationLoader: Option[ClassLoader] = createApplicationClassLoader
protected def createApplicationClassLoader: Option[ClassLoader] = Some({
if (HOME.isDefined) {
val DEPLOY = HOME.get + "/deploy"
protected def createApplicationClassLoader(application: AkkaApplication): Option[ClassLoader] = Some({
if (application.AkkaConfig.HOME.isDefined) {
val DEPLOY = application.AkkaConfig.HOME.get + "/deploy"
val DEPLOY_DIR = new File(DEPLOY)
if (!DEPLOY_DIR.exists) {
System.exit(-1)
@ -45,8 +41,11 @@ trait BootableActorLoaderService extends Bootable {
} else Thread.currentThread.getContextClassLoader
})
abstract override def onLoad = {
super.onLoad
abstract override def onLoad(application: AkkaApplication) = {
super.onLoad(application)
val BOOT_CLASSES = application.AkkaConfig.BOOT_CLASSES
val applicationLoader = createApplicationClassLoader(application)
applicationLoader foreach Thread.currentThread.setContextClassLoader
@ -55,9 +54,9 @@ trait BootableActorLoaderService extends Bootable {
}
}
abstract override def onUnload = {
super.onUnload
Actor.registry.local.shutdownAll
abstract override def onUnload(application: AkkaApplication) = {
super.onUnload(application)
application.registry.local.shutdownAll
}
}

View file

@ -10,9 +10,8 @@ import java.util.concurrent.ConcurrentHashMap
import akka.event.EventHandler
import akka.actor.DeploymentConfig._
import akka.util.ReflectiveAccess._
import akka.AkkaException
import akka.config.{ Configuration, ConfigurationException, Config }
import akka.{ AkkaException, AkkaApplication }
import akka.config.{ Configuration, ConfigurationException }
trait ActorDeployer {
private[akka] def init(deployments: Seq[Deploy]): Unit
@ -27,12 +26,18 @@ trait ActorDeployer {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Deployer extends ActorDeployer {
class Deployer(val application: AkkaApplication) extends ActorDeployer {
val deploymentConfig = new DeploymentConfig(application)
// val defaultAddress = Node(Config.nodename)
lazy val instance: ActorDeployer = {
val deployer = if (ClusterModule.isEnabled) ClusterModule.clusterDeployer else LocalDeployer
val deployer = if (application.reflective.ClusterModule.isEnabled) {
application.reflective.ClusterModule.clusterDeployer
} else {
LocalDeployer
}
deployer.init(deploymentsInConfig)
deployer
}
@ -101,7 +106,7 @@ object Deployer extends ActorDeployer {
private[akka] def addressesInConfig: List[String] = {
val deploymentPath = "akka.actor.deployment"
Config.config.getSection(deploymentPath) match {
application.config.getSection(deploymentPath) match {
case None Nil
case Some(addressConfig)
addressConfig.map.keySet
@ -113,7 +118,7 @@ object Deployer extends ActorDeployer {
/**
* Lookup deployment in 'akka.conf' configuration file.
*/
private[akka] def lookupInConfig(address: String, configuration: Configuration = Config.config): Option[Deploy] = {
private[akka] def lookupInConfig(address: String, configuration: Configuration = application.config): Option[Deploy] = {
import akka.util.ReflectiveAccess.{ createInstance, emptyArguments, emptyParams, getClassFor }
// --------------------------------
@ -234,7 +239,7 @@ object Deployer extends ActorDeployer {
val hostname = remoteConfig.getString("hostname", "localhost")
val port = remoteConfig.getInt("port", 2552)
Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, RemoteScope(hostname, port)))
Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, deploymentConfig.RemoteScope(hostname, port)))
case None // check for 'cluster' config section
@ -280,7 +285,7 @@ object Deployer extends ActorDeployer {
// --------------------------------
clusterConfig.getSection("replication") match {
case None
Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, ClusterScope(preferredNodes, Transient)))
Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, deploymentConfig.ClusterScope(preferredNodes, Transient)))
case Some(replicationConfig)
val storage = replicationConfig.getString("storage", "transaction-log") match {
@ -299,7 +304,7 @@ object Deployer extends ActorDeployer {
".cluster.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" +
unknown + "]")
}
Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, ClusterScope(preferredNodes, Replication(storage, strategy))))
Some(Deploy(address, recipe, router, nrOfInstances, failureDetector, deploymentConfig.ClusterScope(preferredNodes, Replication(storage, strategy))))
}
}
}

View file

@ -4,16 +4,9 @@
package akka.actor
import akka.config.Config
import akka.routing.{ RouterType, FailureDetectorType }
import akka.AkkaApplication
/**
* Module holding the programmatic deployment configuration classes.
* Defines the deployment specification.
* Most values have defaults and can be left out.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object DeploymentConfig {
// --------------------------------
@ -75,13 +68,6 @@ object DeploymentConfig {
// --- Scope
// --------------------------------
sealed trait Scope
case class ClusterScope(
preferredNodes: Iterable[Home] = Vector(Node(Config.nodename)),
replication: ReplicationScheme = Transient) extends Scope
case class RemoteScope(
hostname: String = "localhost",
port: Int = 2552) extends Scope
// For Java API
case class LocalScope() extends Scope
@ -177,8 +163,6 @@ object DeploymentConfig {
// case IP(address) throw new UnsupportedOperationException("Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
}
def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home nodeNameFor(home) == Config.nodename)
def failureDetectorTypeFor(failureDetector: FailureDetector): FailureDetectorType = failureDetector match {
case BannagePeriodFailureDetector(timeToBan) FailureDetectorType.BannagePeriodFailureDetector(timeToBan)
case RemoveConnectionOnFirstFailureLocalFailureDetector FailureDetectorType.RemoveConnectionOnFirstFailureLocalFailureDetector
@ -205,16 +189,6 @@ object DeploymentConfig {
case c: CustomRouter throw new UnsupportedOperationException("Unknown Router [" + c + "]")
}
def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match {
case Deploy(_, _, _, _, _, ClusterScope(_, replicationScheme)) Some(replicationScheme)
case _ None
}
def isReplicated(deployment: Deploy): Boolean = replicationSchemeFor(deployment) match {
case Some(replicationScheme) isReplicated(replicationScheme)
case _ false
}
def isReplicated(replicationScheme: ReplicationScheme): Boolean =
isReplicatedWithTransactionLog(replicationScheme) ||
isReplicatedWithDataGrid(replicationScheme)
@ -254,4 +228,38 @@ object DeploymentConfig {
case _: DataGrid | DataGrid throw new UnsupportedOperationException("ReplicationStorage 'DataGrid' is no supported yet")
}
}
}
/**
* Module holding the programmatic deployment configuration classes.
* Defines the deployment specification.
* Most values have defaults and can be left out.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class DeploymentConfig(val application: AkkaApplication) {
import DeploymentConfig._
case class ClusterScope(
preferredNodes: Iterable[Home] = Vector(Node(application.nodename)),
replication: ReplicationScheme = Transient) extends Scope
case class RemoteScope(
hostname: String = "localhost",
port: Int = application.AkkaConfig.REMOTE_SERVER_PORT) extends Scope
def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home nodeNameFor(home) == application.nodename)
def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match {
case Deploy(_, _, _, _, _, ClusterScope(_, replicationScheme)) Some(replicationScheme)
case _ None
}
def isReplicated(deployment: Deploy): Boolean = replicationSchemeFor(deployment) match {
case Some(replicationScheme) DeploymentConfig.isReplicated(replicationScheme)
case _ false
}
}

View file

@ -5,7 +5,6 @@ package akka.actor
import akka.util._
import akka.event.EventHandler
import akka.config.Config.config
import scala.collection.mutable
import java.util.concurrent.ScheduledFuture
@ -64,8 +63,6 @@ object FSM {
*/
implicit def d2od(d: Duration): Option[Duration] = Some(d)
val debugEvent = config.getBool("akka.actor.debug.fsm", false)
case class LogEntry[S, D](stateName: S, stateData: D, event: Any)
case class State[S, D](stateName: S, stateData: D, timeout: Option[Duration] = None, stopReason: Option[Reason] = None, replies: List[Any] = Nil) {
@ -571,6 +568,8 @@ trait LoggingFSM[S, D] extends FSM[S, D] { this: Actor ⇒
def logDepth: Int = 0
private val debugEvent = context.application.AkkaConfig.FsmDebugEvent
private val events = new Array[Event](logDepth)
private val states = new Array[AnyRef](logDepth)
private var pos = 0

View file

@ -16,8 +16,8 @@ import akka.util._
*/
object Props {
final val defaultCreator: () Actor = () throw new UnsupportedOperationException("No actor creator specified!")
final val defaultDispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
final val defaultTimeout: Timeout = Timeout(Duration(Actor.TIMEOUT, "millis"))
final val defaultDispatcher: MessageDispatcher = null
final val defaultTimeout: Timeout = Timeout(Duration.MinusInf)
final val defaultFaultHandler: FaultHandlingStrategy = OneForOneStrategy(classOf[Exception] :: Nil, None, None)
final val defaultSupervisor: Option[ActorRef] = None

View file

@ -28,16 +28,12 @@ object Scheduler {
private[akka] val service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = {
receiver match {
case local: LocalActorRef
val uuid = local.uuid
new Runnable {
def run = Actor.registry.local.actorFor(uuid) match {
case None if (throwWhenReceiverExpired) throw new RuntimeException("Receiver not found, unregistered")
case Some(actor) actor ! message
}
}
case other new Runnable { def run = other ! message }
def run =
if (receiver.isShutdown && throwWhenReceiverExpired)
throw new RuntimeException("Receiver not found, unregistered")
else
receiver ! message
}
}

View file

@ -1,20 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
object Supervisor {
class Supervisor(terminationHandling: (ActorContext, Terminated) Unit) extends Actor {
def receive = {
case t: Terminated terminationHandling(context, t)
}
}
private val doNothing: (ActorContext, Terminated) Unit = (_, _) ()
def apply(faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler, supervisor: ActorRef = null,
terminationHandling: (ActorContext, Terminated) Unit = doNothing): ActorRef =
Actor.actorOf(Props(new Supervisor(terminationHandling)).withSupervisor(supervisor).withFaultHandler(faultHandler))
}

View file

@ -5,12 +5,12 @@ package akka.actor
*/
import akka.japi.{ Creator, Option JOption }
import akka.actor.Actor._
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.{ Duration }
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import akka.serialization.{ Serializer, Serialization }
import akka.dispatch._
import akka.AkkaApplication
//TODO Document this class, not only in Scaladoc, but also in a dedicated typed-actor.rst, for both java and scala
/**
@ -29,7 +29,7 @@ import akka.dispatch._
*
* TypedActors needs, just like Actors, to be Stopped when they are no longer needed, use TypedActor.stop(proxy)
*/
object TypedActor {
class TypedActor(val application: AkkaApplication) {
private val selfReference = new ThreadLocal[AnyRef]
/**
@ -83,7 +83,7 @@ object TypedActor {
case null SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null, null)
case ps if ps.length == 0 SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array[Serializer.Identifier](), Array[Array[Byte]]())
case ps
val serializers: Array[Serializer] = ps map Serialization.findSerializerFor
val serializers: Array[Serializer] = ps map application.serialization.findSerializerFor
val serializedParameters: Array[Array[Byte]] = Array.ofDim[Array[Byte]](serializers.length)
for (i 0 until serializers.length)
serializedParameters(i) = serializers(i) toBinary parameters(i) //Mutable for the sake of sanity
@ -105,7 +105,7 @@ object TypedActor {
case a
val deserializedParameters: Array[AnyRef] = Array.ofDim[AnyRef](a.length) //Mutable for the sake of sanity
for (i 0 until a.length)
deserializedParameters(i) = Serialization.serializerByIdentity(serializerIdentifiers(i)).fromBinary(serializedParameters(i))
deserializedParameters(i) = application.serialization.serializerByIdentity(serializerIdentifiers(i)).fromBinary(serializedParameters(i))
deserializedParameters
})
@ -221,9 +221,13 @@ object TypedActor {
private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, loader: ClassLoader): T = {
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
val actorVar = new AtomVar[ActorRef](null)
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar)(props.timeout)).asInstanceOf[T]
val timeout = props.timeout match {
case Timeout(Duration.MinusInf) => application.AkkaConfig.TIMEOUT
case x => x
}
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar)(timeout)).asInstanceOf[T]
proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
val ref = actorOf(props)
val ref = application.createActor(props)
actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet
proxyVar.get
}
@ -232,8 +236,8 @@ object TypedActor {
private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: T) extends Actor {
override def preStart = Actor.registry.registerTypedActor(self, proxyVar.get) //Make sure actor registry knows about this actor
override def postStop = Actor.registry.unregisterTypedActor(self, proxyVar.get)
override def preStart = application.registry.registerTypedActor(self, proxyVar.get) //Make sure actor registry knows about this actor
override def postStop = application.registry.unregisterTypedActor(self, proxyVar.get)
val me = createInstance
def receive = {

View file

@ -7,13 +7,12 @@ package akka.cluster
import akka.actor._
import DeploymentConfig._
import akka.dispatch.Future
import akka.config.Config
import akka.routing._
import akka.serialization.Serializer
import akka.cluster.metrics._
import akka.util.Duration
import akka.util.duration._
import akka.AkkaException
import akka.{ AkkaException, AkkaApplication }
import com.eaio.uuid.UUID
@ -103,7 +102,8 @@ class NodeAddress(val clusterName: String, val nodeName: String) {
* NodeAddress companion object and factory.
*/
object NodeAddress {
def apply(clusterName: String = Config.clusterName, nodeName: String = Config.nodename): NodeAddress = new NodeAddress(clusterName, nodeName)
def apply(clusterName: String, nodeName: String): NodeAddress = new NodeAddress(clusterName, nodeName)
def apply(application: AkkaApplication): NodeAddress = new NodeAddress(application.AkkaConfig.ClusterName, application.nodename)
def unapply(other: Any) = other match {
case address: NodeAddress Some((address.clusterName, address.nodeName))

View file

@ -6,10 +6,6 @@ package akka.config
import akka.AkkaException
import java.net.InetAddress
import com.eaio.uuid.UUID
class ConfigurationException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null);
}
@ -17,113 +13,3 @@ class ConfigurationException(message: String, cause: Throwable = null) extends A
class ModuleNotAvailableException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null);
}
/**
* Loads up the configuration (from the akka.conf file).
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Config {
val VERSION = "2.0-SNAPSHOT"
val HOME = {
val envHome = System.getenv("AKKA_HOME") match {
case null | "" | "." None
case value Some(value)
}
val systemHome = System.getProperty("akka.home") match {
case null | "" None
case value Some(value)
}
systemHome orElse envHome
}
val config: Configuration = {
val confName = {
val envConf = System.getenv("AKKA_MODE") match {
case null | "" None
case value Some(value)
}
val systemConf = System.getProperty("akka.mode") match {
case null | "" None
case value Some(value)
}
(systemConf orElse envConf).map("akka." + _ + ".conf").getOrElse("akka.conf")
}
val (newInstance, source) =
if (System.getProperty("akka.config", "") != "") {
val configFile = System.getProperty("akka.config", "")
(() Configuration.fromFile(configFile), "Loading config from -Dakka.config=" + configFile)
} else if (getClass.getClassLoader.getResource(confName) ne null) {
(() Configuration.fromResource(confName, getClass.getClassLoader), "Loading config [" + confName + "] from the application classpath.")
} else if (HOME.isDefined) {
val configFile = HOME.get + "/config/" + confName
(() Configuration.fromFile(configFile), "AKKA_HOME is defined as [" + HOME.get + "], loading config from [" + configFile + "].")
} else {
(() Configuration.fromString("akka {}"), // default empty config
"\nCan't load '" + confName + "'." +
"\nOne of the three ways of locating the '" + confName + "' file needs to be defined:" +
"\n\t1. Define the '-Dakka.config=...' system property option." +
"\n\t2. Put the '" + confName + "' file on the classpath." +
"\n\t3. Define 'AKKA_HOME' environment variable pointing to the root of the Akka distribution." +
"\nI have no way of finding the '" + confName + "' configuration file." +
"\nUsing default values everywhere.")
}
try {
val i = newInstance()
val configVersion = i.getString("akka.version", VERSION)
if (configVersion != VERSION)
throw new ConfigurationException(
"Akka JAR version [" + VERSION + "] is different than the provided config version [" + configVersion + "]")
if (Configuration.outputConfigSources)
System.out.println(source)
i
} catch {
case e
System.err.println("Couldn't parse config, fatal error.")
System.err.println("Config source: " + source)
e.printStackTrace(System.err)
System.exit(-1)
throw e
}
}
val CONFIG_VERSION = config.getString("akka.version", VERSION)
val TIME_UNIT = config.getString("akka.time-unit", "seconds")
val isClusterEnabled = config.getList("akka.enabled-modules").exists(_ == "cluster")
val clusterName = config.getString("akka.cluster.name", "default")
val nodename = System.getProperty("akka.cluster.nodename") match {
case null | "" new UUID().toString
case value value
}
val hostname = System.getProperty("akka.remote.hostname") match {
case null | "" InetAddress.getLocalHost.getHostName
case value value
}
val remoteServerPort = System.getProperty("akka.remote.port") match {
case null | ""
System.getProperty("akka.remote.server.port") match {
case null | "" config.getInt("akka.remote.server.port", 2552)
case value value.toInt
}
case value value.toInt
}
val startTime = System.currentTimeMillis
def uptime = (System.currentTimeMillis - startTime) / 1000
}

View file

@ -8,10 +8,10 @@ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicLong
import akka.event.EventHandler
import akka.config.Configuration
import akka.config.Config.TIME_UNIT
import akka.util.{ Duration, Switch, ReentrantGuard }
import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy }
import akka.actor._
import akka.AkkaApplication
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -60,8 +60,6 @@ object MessageDispatcher {
val UNSCHEDULED = 0
val SCHEDULED = 1
val RESCHEDULED = 2
implicit def defaultGlobalDispatcher = Dispatchers.defaultGlobalDispatcher
}
/**
@ -238,7 +236,7 @@ abstract class MessageDispatcher extends Serializable {
* When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms
* defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second
*/
protected[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis
protected[akka] def timeoutMs: Long
/**
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
@ -305,19 +303,19 @@ abstract class MessageDispatcher extends Serializable {
/**
* Trait to be used for hooking in new dispatchers into Dispatchers.fromConfig
*/
abstract class MessageDispatcherConfigurator {
abstract class MessageDispatcherConfigurator(val application: AkkaApplication) {
/**
* Returns an instance of MessageDispatcher given a Configuration
*/
def configure(config: Configuration): MessageDispatcher
def mailboxType(config: Configuration): MailboxType = {
val capacity = config.getInt("mailbox-capacity", Dispatchers.MAILBOX_CAPACITY)
val capacity = config.getInt("mailbox-capacity", application.AkkaConfig.MailboxCapacity)
if (capacity < 1) UnboundedMailbox()
else {
val duration = Duration(
config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt),
TIME_UNIT)
config.getInt("mailbox-push-timeout-time", application.AkkaConfig.MailboxPushTimeout.toMillis.toInt),
application.AkkaConfig.TIME_UNIT)
BoundedMailbox(capacity, duration)
}
}
@ -327,7 +325,7 @@ abstract class MessageDispatcherConfigurator {
//Apply the following options to the config if they are present in the config
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure(
conf_?(config getInt "keep-alive-time")(time _.setKeepAliveTime(Duration(time, TIME_UNIT))),
conf_?(config getInt "keep-alive-time")(time _.setKeepAliveTime(Duration(time, application.AkkaConfig.TIME_UNIT))),
conf_?(config getDouble "core-pool-size-factor")(factor _.setCorePoolSizeFromFactor(factor)),
conf_?(config getDouble "max-pool-size-factor")(factor _.setMaxPoolSizeFromFactor(factor)),
conf_?(config getInt "executor-bounds")(bounds _.setExecutorBounds(bounds)),

View file

@ -28,29 +28,12 @@ import annotation.tailrec
*/
class BalancingDispatcher(
_name: String,
throughput: Int = Dispatchers.THROUGHPUT,
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
config: ThreadPoolConfig = ThreadPoolConfig())
extends Dispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) {
def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
def this(_name: String, throughput: Int, mailboxType: MailboxType) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
def this(_name: String, throughput: Int) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(_name: String, _config: ThreadPoolConfig) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _config)
def this(_name: String, memberType: Class[_ <: Actor]) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(_name: String, mailboxType: MailboxType) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
throughput: Int,
throughputDeadlineTime: Int,
mailboxType: MailboxType,
config: ThreadPoolConfig,
_timeoutMs: Long)
extends Dispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) {
private val buddies = new ConcurrentSkipListSet[ActorCell](new Comparator[ActorCell] { def compare(a: ActorCell, b: ActorCell) = a.uuid.compareTo(b.uuid) }) //new ConcurrentLinkedQueue[ActorCell]()

View file

@ -64,27 +64,13 @@ import akka.actor.{ ActorCell, ActorKilledException }
*/
class Dispatcher(
_name: String,
val throughput: Int = Dispatchers.THROUGHPUT,
val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
val mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
executorServiceFactoryProvider: ExecutorServiceFactoryProvider = ThreadPoolConfig())
val throughput: Int,
val throughputDeadlineTime: Int,
val mailboxType: MailboxType,
executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
val timeoutMs: Long)
extends MessageDispatcher {
def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) =
this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage
def this(_name: String, throughput: Int, mailboxType: MailboxType) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
def this(_name: String, throughput: Int) =
this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(_name: String, _executorServiceFactoryProvider: ExecutorServiceFactoryProvider) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, _executorServiceFactoryProvider)
def this(_name: String) =
this(_name, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
val name = "akka:event-driven:dispatcher:" + _name
protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)

View file

@ -6,12 +6,10 @@ package akka.dispatch
import akka.actor.LocalActorRef
import akka.actor.newUuid
import akka.config.Config._
import akka.util.{ Duration, ReflectiveAccess }
import akka.config.Configuration
import java.util.concurrent.TimeUnit
import akka.AkkaApplication
/**
* Scala API. Dispatcher factory.
@ -45,19 +43,15 @@ import java.util.concurrent.TimeUnit
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Dispatchers {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout").
map(time Duration(time, TIME_UNIT)).
getOrElse(Duration(1000, TimeUnit.MILLISECONDS))
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT)
val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time", -1), TIME_UNIT)
val THROUGHPUT_DEADLINE_TIME_MILLIS = THROUGHPUT_DEADLINE_TIME.toMillis.toInt
val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 1) UnboundedMailbox() else BoundedMailbox()
class Dispatchers(val application: AkkaApplication) {
val THROUGHPUT_DEADLINE_TIME_MILLIS = application.AkkaConfig.ThroughputDeadlineTime.toMillis.toInt
val MAILBOX_TYPE: MailboxType =
if (application.AkkaConfig.MailboxCapacity < 1) UnboundedMailbox()
else BoundedMailbox(application.AkkaConfig.MailboxCapacity, application.AkkaConfig.MailboxPushTimeout)
val DISPATCHER_SHUTDOWN_TIMEOUT = application.AkkaConfig.DispatcherDefaultShutdown.toMillis
lazy val defaultGlobalDispatcher =
config.getSection("akka.actor.default-dispatcher").flatMap(from) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MAILBOX_TYPE).build
application.config.getSection("akka.actor.default-dispatcher").flatMap(from) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MAILBOX_TYPE).build
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
@ -66,8 +60,8 @@ object Dispatchers {
* E.g. each actor consumes its own thread.
*/
def newPinnedDispatcher(actor: LocalActorRef) = actor match {
case null new PinnedDispatcher()
case some new PinnedDispatcher(some.underlying)
case null new PinnedDispatcher(null, "anon", MAILBOX_TYPE, DISPATCHER_SHUTDOWN_TIMEOUT)
case some new PinnedDispatcher(some.underlying, some.underlying.uuid.toString, MAILBOX_TYPE, DISPATCHER_SHUTDOWN_TIMEOUT)
}
/**
@ -77,8 +71,8 @@ object Dispatchers {
* E.g. each actor consumes its own thread.
*/
def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match {
case null new PinnedDispatcher(mailboxType)
case some new PinnedDispatcher(some.underlying, mailboxType)
case null new PinnedDispatcher(null, "anon", mailboxType, DISPATCHER_SHUTDOWN_TIMEOUT)
case some new PinnedDispatcher(some.underlying, some.underlying.uuid.toString, mailboxType, DISPATCHER_SHUTDOWN_TIMEOUT)
}
/**
@ -87,7 +81,7 @@ object Dispatchers {
* E.g. each actor consumes its own thread.
*/
def newPinnedDispatcher(name: String, mailboxType: MailboxType) =
new PinnedDispatcher(name, mailboxType)
new PinnedDispatcher(null, name, mailboxType, DISPATCHER_SHUTDOWN_TIMEOUT)
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
@ -95,7 +89,7 @@ object Dispatchers {
* E.g. each actor consumes its own thread.
*/
def newPinnedDispatcher(name: String) =
new PinnedDispatcher(name)
new PinnedDispatcher(null, name, MAILBOX_TYPE, DISPATCHER_SHUTDOWN_TIMEOUT)
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
@ -103,7 +97,8 @@ object Dispatchers {
* Has a fluent builder interface for configuring its semantics.
*/
def newDispatcher(name: String) =
ThreadPoolConfigDispatcherBuilder(config new Dispatcher(name, config), ThreadPoolConfig())
ThreadPoolConfigDispatcherBuilder(config new Dispatcher(name, application.AkkaConfig.DispatcherThroughput,
THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig())
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
@ -112,7 +107,7 @@ object Dispatchers {
*/
def newDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig())
new Dispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig())
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
@ -121,7 +116,7 @@ object Dispatchers {
*/
def newDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig())
new Dispatcher(name, throughput, throughputDeadlineMs, mailboxType, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig())
/**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
@ -129,7 +124,8 @@ object Dispatchers {
* Has a fluent builder interface for configuring its semantics.
*/
def newBalancingDispatcher(name: String) =
ThreadPoolConfigDispatcherBuilder(config new BalancingDispatcher(name, config), ThreadPoolConfig())
ThreadPoolConfigDispatcherBuilder(config new BalancingDispatcher(name, application.AkkaConfig.DispatcherThroughput,
THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig())
/**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
@ -138,7 +134,7 @@ object Dispatchers {
*/
def newBalancingDispatcher(name: String, throughput: Int) =
ThreadPoolConfigDispatcherBuilder(config
new BalancingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config), ThreadPoolConfig())
new BalancingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig())
/**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
@ -147,7 +143,7 @@ object Dispatchers {
*/
def newBalancingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config
new BalancingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig())
new BalancingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig())
/**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
@ -156,13 +152,13 @@ object Dispatchers {
*/
def newBalancingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config
new BalancingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig())
new BalancingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig())
/**
* Utility function that tries to load the specified dispatcher config from the akka.conf
* or else use the supplied default dispatcher
*/
def fromConfig(key: String, default: MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher =
config getSection key flatMap from getOrElse default
application.config getSection key flatMap from getOrElse default
/*
* Creates of obtains a dispatcher from a ConfigMap according to the format below
@ -189,8 +185,8 @@ object Dispatchers {
*/
def from(cfg: Configuration): Option[MessageDispatcher] = {
cfg.getString("type") flatMap {
case "Dispatcher" Some(new DispatcherConfigurator())
case "BalancingDispatcher" Some(new BalancingDispatcherConfigurator())
case "Dispatcher" Some(new DispatcherConfigurator(application))
case "BalancingDispatcher" Some(new BalancingDispatcherConfigurator(application))
case "GlobalDispatcher" None //TODO FIXME remove this
case fqn
ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
@ -210,24 +206,26 @@ object Dispatchers {
}
}
class DispatcherConfigurator extends MessageDispatcherConfigurator {
class DispatcherConfigurator(application: AkkaApplication) extends MessageDispatcherConfigurator(application) {
def configure(config: Configuration): MessageDispatcher = {
configureThreadPool(config, threadPoolConfig new Dispatcher(
config.getString("name", newUuid.toString),
config.getInt("throughput", Dispatchers.THROUGHPUT),
config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
config.getInt("throughput", application.AkkaConfig.DispatcherThroughput),
config.getInt("throughput-deadline-time", application.AkkaConfig.ThroughputDeadlineTime.toMillis.toInt),
mailboxType(config),
threadPoolConfig)).build
threadPoolConfig,
application.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build
}
}
class BalancingDispatcherConfigurator extends MessageDispatcherConfigurator {
class BalancingDispatcherConfigurator(application: AkkaApplication) extends MessageDispatcherConfigurator(application) {
def configure(config: Configuration): MessageDispatcher = {
configureThreadPool(config, threadPoolConfig new BalancingDispatcher(
config.getString("name", newUuid.toString),
config.getInt("throughput", Dispatchers.THROUGHPUT),
config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
config.getInt("throughput", application.AkkaConfig.DispatcherThroughput),
config.getInt("throughput-deadline-time", application.AkkaConfig.ThroughputDeadlineTime.toMillis.toInt),
mailboxType(config),
threadPoolConfig)).build
threadPoolConfig,
application.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build
}
}

View file

@ -27,13 +27,16 @@ class FutureTimeoutException(message: String, cause: Throwable = null) extends A
def this(message: String) = this(message, null)
}
object Futures {
class FutureFactory(dispatcher: MessageDispatcher, timeout: Timeout) {
// TODO: remove me ASAP !!!
implicit val _dispatcher = dispatcher
/**
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T]): Future[T] =
Future(body.call)
Future(body.call, timeout)
/**
* Java API, equivalent to Future.apply
@ -51,7 +54,7 @@ object Futures {
* Java API, equivalent to Future.apply
*/
def future[T](body: Callable[T], dispatcher: MessageDispatcher): Future[T] =
Future(body.call)(dispatcher)
Future(body.call)(dispatcher, timeout)
/**
* Java API, equivalent to Future.apply
@ -71,9 +74,11 @@ object Futures {
*/
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean], timeout: Timeout): Future[JOption[T]] = {
val pred: T Boolean = predicate.apply(_)
Future.find[T](pred, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures)).map(JOption.fromScalaOption(_))
Future.find[T]((scala.collection.JavaConversions.iterableAsScalaIterable(futures)), timeout)(pred).map(JOption.fromScalaOption(_))(timeout)
}
def find[T <: AnyRef](futures: JIterable[Future[T]], predicate: JFunc[T, java.lang.Boolean]): Future[JOption[T]] = find(futures, predicate, timeout)
/**
* Java API.
* Returns a Future to the result of the first future in the list that is completed
@ -81,6 +86,8 @@ object Futures {
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], timeout: Timeout): Future[T] =
Future.firstCompletedOf(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]]): Future[T] = firstCompletedOf(futures, timeout)
/**
* Java API
* A non-blocking fold over the specified futures.
@ -89,11 +96,11 @@ object Futures {
* or the result of the fold.
*/
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Timeout, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] =
Future.fold(zero, timeout)(scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply _)
Future.fold(scala.collection.JavaConversions.iterableAsScalaIterable(futures), timeout)(zero)(fun.apply _)
def fold[T <: AnyRef, R <: AnyRef](zero: R, timeout: Long, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout: Timeout, futures, fun)
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, Timeout.default, futures, fun)
def fold[T <: AnyRef, R <: AnyRef](zero: R, futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, R]): Future[R] = fold(zero, timeout, futures, fun)
/**
* Java API.
@ -104,24 +111,23 @@ object Futures {
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], timeout: Long, fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout: Timeout, fun)
def reduce[T <: AnyRef, R >: T](futures: java.lang.Iterable[Future[T]], fun: akka.japi.Function2[R, T, T]): Future[R] = reduce(futures, timeout, fun)
/**
* Java API.
* Simple version of Future.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] =
def sequence[A](in: JIterable[Future[A]], timeout: Timeout): Future[JIterable[A]] = {
implicit val t = timeout
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[A]()))((fr, fa)
for (r fr; a fa) yield {
r add a
r
})
}
/**
* Java API.
* Simple version of Futures.traverse. Transforms a java.lang.Iterable[Future[A]] into a Future[java.lang.Iterable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = sequence(in, Timeout.default)
def sequence[A](in: JIterable[Future[A]]): Future[JIterable[A]] = sequence(in, timeout)
/**
* Java API.
@ -129,7 +135,8 @@ object Futures {
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel.
*/
def traverse[A, B](in: JIterable[A], timeout: Timeout, fn: JFunc[A, Future[B]]): Future[JIterable[B]] =
def traverse[A, B](in: JIterable[A], timeout: Timeout, fn: JFunc[A, Future[B]]): Future[JIterable[B]] = {
implicit val t = timeout
scala.collection.JavaConversions.iterableAsScalaIterable(in).foldLeft(Future(new JLinkedList[B]())) { (fr, a)
val fb = fn(a)
for (r fr; b fb) yield {
@ -137,14 +144,10 @@ object Futures {
r
}
}
}
def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = traverse(in, timeout, fn)
/**
* Java API.
* Transforms a java.lang.Iterable[A] into a Future[java.lang.Iterable[B]] using the provided Function A Future[B].
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
* in parallel.
*/
def traverse[A, B](in: JIterable[A], fn: JFunc[A, Future[B]]): Future[JIterable[B]] = traverse(in, Timeout.default, fn)
}
object Future {
@ -153,7 +156,7 @@ object Future {
* This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body
* The execution is performed by the specified Dispatcher.
*/
def apply[T](body: T)(implicit dispatcher: MessageDispatcher, timeout: Timeout = implicitly): Future[T] = {
def apply[T](body: T)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = {
val promise = new DefaultPromise[T](timeout)
dispatcher dispatchTask { ()
promise complete {
@ -183,16 +186,16 @@ object Future {
* Simple version of Futures.traverse. Transforms a Traversable[Future[A]] into a Future[Traversable[A]].
* Useful for reducing many Futures into a single Future.
*/
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], timeout: Timeout): Future[M[A]] =
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], timeout: Timeout, dispatcher: MessageDispatcher): Future[M[A]] =
in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[A, M[A]]])((fr, fa) for (r fr; a fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result)
def sequence[A, M[_] <: Traversable[_]](timeout: Timeout)(in: M[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] =
sequence(in)(cbf, timeout)
def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Timeout)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], dispatcher: MessageDispatcher): Future[M[A]] =
sequence(in)(cbf, timeout, dispatcher)
/**
* Returns a Future to the result of the first future in the list that is completed
*/
def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.never): Future[T] = {
def firstCompletedOf[T](futures: Iterable[Future[T]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[T] = {
val futureResult = new DefaultPromise[T](timeout)
val completeFirst: Future[T] Unit = _.value.foreach(futureResult complete _)
@ -201,10 +204,13 @@ object Future {
futureResult
}
def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Timeout)(implicit dispatcher: MessageDispatcher): Future[T] =
firstCompletedOf(futures)(dispatcher, timeout)
/**
* Returns a Future that will hold the optional result of the first Future with a result that matches the predicate
*/
def find[T](predicate: T Boolean, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]]): Future[Option[T]] = {
def find[T](futures: Iterable[Future[T]])(predicate: T Boolean)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[Option[T]] = {
if (futures.isEmpty) new KeptPromise[Option[T]](Right(None))
else {
val result = new DefaultPromise[Option[T]](timeout)
@ -221,6 +227,9 @@ object Future {
}
}
def find[T](futures: Iterable[Future[T]], timeout:Timeout)(predicate: T Boolean)(implicit dispatcher: MessageDispatcher): Future[Option[T]] =
find(futures)(predicate)(dispatcher, timeout)
/**
* A non-blocking fold over the specified futures.
* The fold is performed on the thread where the last future is completed,
@ -231,7 +240,7 @@ object Future {
* val result = Futures.fold(0)(futures)(_ + _).await.result
* </pre>
*/
def fold[T, R](zero: R, timeout: Timeout = Timeout.default)(futures: Iterable[Future[T]])(foldFun: (R, T) R): Future[R] = {
def fold[T, R](futures: Iterable[Future[T]])(zero: R)(foldFun: (R, T) R)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[R] = {
if (futures.isEmpty) {
new KeptPromise[R](Right(zero))
} else {
@ -273,6 +282,9 @@ object Future {
}
}
def fold[T, R](futures: Iterable[Future[T]], timeout: Timeout)(zero: R)(foldFun: (R, T) R)(implicit dispatcher: MessageDispatcher): Future[R] =
fold(futures)(zero)(foldFun)(dispatcher, timeout)
/**
* Initiates a fold over the supplied futures where the fold-zero is the result value of the Future that's completed first
* Example:
@ -280,7 +292,7 @@ object Future {
* val result = Futures.reduce(futures)(_ + _).await.result
* </pre>
*/
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout = Timeout.default)(op: (R, T) T): Future[R] = {
def reduce[T, R >: T](futures: Iterable[Future[T]])(op: (R, T) T)(implicit dispatcher: MessageDispatcher, timeout: Timeout): Future[R] = {
if (futures.isEmpty)
new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left")))
else {
@ -289,7 +301,7 @@ object Future {
val seedFold: Future[T] Unit = f {
if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold
f.value.get match {
case Right(value) result.completeWith(fold(value, timeout)(futures.filterNot(_ eq f))(op))
case Right(value) result.completeWith(fold(futures.filterNot(_ eq f))(value)(op))
case Left(exception) result.completeWithException(exception)
}
}
@ -299,6 +311,9 @@ object Future {
}
}
def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Timeout)(op: (R, T) T)(implicit dispatcher: MessageDispatcher): Future[R] =
reduce(futures)(op)(dispatcher, timeout)
/**
* Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A Future[B].
* This is useful for performing a parallel map. For example, to apply a function to all items of a list
@ -307,14 +322,14 @@ object Future {
* val myFutureList = Futures.traverse(myList)(x Future(myFunc(x)))
* </pre>
*/
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], timeout: Timeout): Future[M[B]] =
def traverse[A, B, M[_] <: Traversable[_]](in: M[A])(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], timeout: Timeout, dispatcher: MessageDispatcher): Future[M[B]] =
in.foldLeft(new KeptPromise(Right(cbf(in))): Future[Builder[B, M[B]]]) { (fr, a)
val fb = fn(a.asInstanceOf[A])
for (r fr; b fb) yield (r += b)
}.map(_.result)
def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout)(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] =
traverse(in)(fn)(cbf, timeout)
def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Timeout)(fn: A Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], dispatcher: MessageDispatcher): Future[M[B]] =
traverse(in)(fn)(cbf, timeout, dispatcher)
/**
* Captures a block that will be transformed into 'Continuation Passing Style' using Scala's Delimited
@ -342,6 +357,8 @@ object Future {
future
}
// TODO make variant of flow(timeout)(body) which does NOT break type inference
private val _taskStack = new ThreadLocal[Option[Stack[() Unit]]]() {
override def initialValue = None
}
@ -689,12 +706,20 @@ package japi {
private[japi] final def onResult[A >: T](proc: Procedure[A]): this.type = self.onResult({ case r proc(r.asInstanceOf[A]) }: PartialFunction[T, Unit])
private[japi] final def onException(proc: Procedure[Throwable]): this.type = self.onException({ case t: Throwable proc(t) }: PartialFunction[Throwable, Unit])
private[japi] final def onComplete[A >: T](proc: Procedure[akka.dispatch.Future[A]]): this.type = self.onComplete(proc(_))
private[japi] final def map[A >: T, B](f: JFunc[A, B]): akka.dispatch.Future[B] = self.map(f(_))
private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]]): akka.dispatch.Future[B] = self.flatMap(f(_))
private[japi] final def map[A >: T, B](f: JFunc[A, B], timeout: Timeout): akka.dispatch.Future[B] = {
implicit val t = timeout
self.map(f(_))
}
private[japi] final def flatMap[A >: T, B](f: JFunc[A, akka.dispatch.Future[B]], timeout: Timeout): akka.dispatch.Future[B] = {
implicit val t = timeout
self.flatMap(f(_))
}
private[japi] final def foreach[A >: T](proc: Procedure[A]): Unit = self.foreach(proc(_))
private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean]): akka.dispatch.Future[A] =
private[japi] final def filter[A >: T](p: JFunc[A, java.lang.Boolean], timeout: Timeout): akka.dispatch.Future[A] = {
implicit val t = timeout
self.filter((a: Any) p(a.asInstanceOf[A])).asInstanceOf[akka.dispatch.Future[A]]
}
}
}
object Promise {
@ -707,12 +732,12 @@ object Promise {
/**
* Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf)
*/
def apply[A]()(implicit dispatcher: MessageDispatcher): Promise[A] = apply(Timeout.default)
def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): Promise[A] = apply(timeout)
/**
* Construct a completable channel
*/
def channel(timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): ActorPromise = new ActorPromise(timeout)
def channel(timeout: Long)(implicit dispatcher: MessageDispatcher): ActorPromise = new ActorPromise(timeout)
}
/**
@ -795,7 +820,7 @@ private[akka] case class FState[T](value: Option[Either[Throwable, T]] = None, l
class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDispatcher) extends AbstractPromise with Promise[T] {
self
def this()(implicit dispatcher: MessageDispatcher) = this(Timeout.default)
def this()(implicit dispatcher: MessageDispatcher, timeout: Timeout) = this(timeout)
def this(timeout: Long)(implicit dispatcher: MessageDispatcher) = this(Timeout(timeout))
@ -914,7 +939,7 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
if (timeout.duration.isFinite) {
value match {
case Some(_) this
case _ if isExpired Future[A](fallback)
case _ if isExpired Future[A](fallback, timeout)
case _
val promise = new DefaultPromise[A](Timeout.never) //TODO FIXME We can't have infinite timeout here, doesn't make sense.
promise completeWith this
@ -958,8 +983,8 @@ class ActorPromise(timeout: Timeout)(implicit dispatcher: MessageDispatcher) ext
}
object ActorPromise {
def apply(f: Promise[Any]): ActorPromise =
new ActorPromise(f.timeout)(f.dispatcher) {
def apply(f: Promise[Any])(timeout: Timeout = f.timeout): ActorPromise =
new ActorPromise(timeout)(f.dispatcher) {
completeWith(f)
override def !(message: Any)(implicit channel: UntypedChannel) = f completeWithResult message
override def sendException(ex: Throwable) = {

View file

@ -270,9 +270,7 @@ case class UnboundedMailbox() extends MailboxType {
}
}
case class BoundedMailbox(
val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
case class BoundedMailbox(val capacity: Int, val pushTimeOut: Duration) extends MailboxType {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
@ -291,10 +289,7 @@ case class UnboundedPriorityMailbox(cmp: Comparator[Envelope]) extends MailboxTy
}
}
case class BoundedPriorityMailbox(
val cmp: Comparator[Envelope],
val capacity: Int = { if (Dispatchers.MAILBOX_CAPACITY < 0) Int.MaxValue else Dispatchers.MAILBOX_CAPACITY },
val pushTimeOut: Duration = Dispatchers.MAILBOX_PUSH_TIME_OUT) extends MailboxType {
case class BoundedPriorityMailbox(val cmp: Comparator[Envelope], val capacity: Int, val pushTimeOut: Duration) extends MailboxType {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")

View file

@ -12,23 +12,8 @@ import akka.actor.ActorCell
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class PinnedDispatcher(_actor: ActorCell, _name: String, _mailboxType: MailboxType)
extends Dispatcher(
_name, Dispatchers.THROUGHPUT, -1, _mailboxType, PinnedDispatcher.oneThread) {
def this(_name: String, _mailboxType: MailboxType) = this(null, _name, _mailboxType)
def this(_actor: ActorCell, _name: String) = this(_actor, _name, Dispatchers.MAILBOX_TYPE)
def this(_name: String) = this(null, _name, Dispatchers.MAILBOX_TYPE)
def this(_mailboxType: MailboxType) = this(null, "anon", _mailboxType)
def this(_actor: ActorCell, _mailboxType: MailboxType) = this(_actor, _actor.uuid.toString, _mailboxType)
def this(_actor: ActorCell) = this(_actor, _actor.uuid.toString, Dispatchers.MAILBOX_TYPE)
def this() = this(Dispatchers.MAILBOX_TYPE)
class PinnedDispatcher(_actor: ActorCell, _name: String, _mailboxType: MailboxType, _timeoutMs: Long)
extends Dispatcher(_name, Int.MaxValue, -1, _mailboxType, PinnedDispatcher.oneThread, _timeoutMs) {
protected[akka] val owner = new AtomicReference[ActorCell](_actor)

View file

@ -4,13 +4,13 @@
package akka.dispatch
import akka.actor.Actor.TIMEOUT
import java.util.concurrent.atomic.AtomicReference
import scala.util.continuations._
import scala.annotation.{ tailrec }
import akka.actor.Timeout
object PromiseStream {
def apply[A](timeout: Long = TIMEOUT): PromiseStream[A] = new PromiseStream[A](timeout)
def apply[A](implicit dispatcher: MessageDispatcher, timeout: Timeout): PromiseStream[A] = new PromiseStream[A]
private sealed trait State
private case object Normal extends State
@ -29,7 +29,7 @@ trait PromiseStreamOut[A] {
def apply(promise: Promise[A]): A @cps[Future[Any]]
final def map[B](f: (A) B): PromiseStreamOut[B] = new PromiseStreamOut[B] {
final def map[B](f: (A) B)(implicit timeout: Timeout): PromiseStreamOut[B] = new PromiseStreamOut[B] {
def dequeue(): Future[B] = self.dequeue().map(f)
@ -102,11 +102,9 @@ trait PromiseStreamIn[A] {
}
class PromiseStream[A](timeout: Long) extends PromiseStreamOut[A] with PromiseStreamIn[A] {
class PromiseStream[A](implicit val dispatcher: MessageDispatcher, val timeout: Timeout) extends PromiseStreamOut[A] with PromiseStreamIn[A] {
import PromiseStream.{ State, Normal, Pending, Busy }
def this() = this(TIMEOUT)
private val _elemOut: AtomicReference[List[A]] = new AtomicReference(Nil)
private val _elemIn: AtomicReference[List[A]] = new AtomicReference(Nil)
private val _pendOut: AtomicReference[List[Promise[A]]] = new AtomicReference(null)
@ -122,9 +120,9 @@ class PromiseStream[A](timeout: Long) extends PromiseStreamOut[A] with PromiseSt
if (eo.nonEmpty) {
if (_elemOut.compareAndSet(eo, eo.tail)) shift { cont: (A Future[Any]) cont(eo.head) }
else apply()
} else apply(Promise[A](timeout))
} else apply(Promise[A])
}
} else apply(Promise[A](timeout))
} else apply(Promise[A])
final def apply(promise: Promise[A]): A @cps[Future[Any]] =
shift { cont: (A Future[Any]) dequeue(promise) flatMap cont }

View file

@ -6,11 +6,11 @@ package akka.event
import akka.actor._
import akka.dispatch.Dispatchers
import akka.config.Config._
import akka.config.ConfigurationException
import akka.util.{ ListenerManagement, ReflectiveAccess }
import akka.serialization._
import akka.AkkaException
import akka.AkkaApplication
/**
* Event handler.
@ -54,6 +54,10 @@ import akka.AkkaException
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object EventHandler extends ListenerManagement {
// TODO remove this EVIL thing!
private val appl = AkkaApplication("akka-reference.conf")
val synchronousLogging: Boolean = System.getProperty("akka.event.force-sync") match {
case null | "" false
case _ true
@ -97,12 +101,12 @@ object EventHandler extends ListenerManagement {
lazy val StandardOutLogger = new StandardOutLogger {}
lazy val EventHandlerDispatcher =
Dispatchers.fromConfig("akka.event-handler-dispatcher", Dispatchers.newDispatcher("event-handler-dispatcher").setCorePoolSize(2).build)
appl.dispatcherFactory.fromConfig("akka.event-handler-dispatcher", appl.dispatcherFactory.newDispatcher("event-handler-dispatcher").setCorePoolSize(2).build)
implicit object defaultListenerFormat extends StatelessActorFormat[DefaultListener]
@volatile
var level: Int = config.getString("akka.event-handler-level", "INFO") match {
var level: Int = appl.AkkaConfig.LogLevel match {
case "ERROR" | "error" ErrorLevel
case "WARNING" | "warning" WarningLevel
case "INFO" | "info" InfoLevel
@ -113,14 +117,14 @@ object EventHandler extends ListenerManagement {
def start() {
try {
val defaultListeners = config.getList("akka.event-handlers") match {
val defaultListeners = appl.AkkaConfig.EventHandlers match {
case Nil "akka.event.EventHandler$DefaultListener" :: Nil
case listeners listeners
}
defaultListeners foreach { listenerName
try {
ReflectiveAccess.getClassFor[Actor](listenerName) match {
case Right(actorClass) addListener(new LocalActorRef(Props(actorClass).withDispatcher(EventHandlerDispatcher), newUuid.toString, systemService = true))
case Right(actorClass) addListener(new LocalActorRef(appl, Props(actorClass).withDispatcher(EventHandlerDispatcher), newUuid.toString, systemService = true))
case Left(exception) throw exception
}
} catch {

View file

@ -10,7 +10,7 @@ import DeploymentConfig._
import akka.util._
import akka.dispatch.Promise
import akka.serialization._
import akka.AkkaException
import akka.{ AkkaException, AkkaApplication }
import scala.reflect.BeanProperty
@ -185,12 +185,12 @@ case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorExcept
override def printStackTrace(printWriter: PrintWriter) = cause.printStackTrace(printWriter)
}
abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule {
abstract class RemoteSupport(val application: AkkaApplication) extends ListenerManagement with RemoteServerModule with RemoteClientModule {
val eventHandler: ActorRef = {
implicit object format extends StatelessActorFormat[RemoteEventHandler]
val clazz = classOf[RemoteEventHandler]
val handler = new LocalActorRef(Props(clazz), clazz.getName, true)
val handler = new LocalActorRef(application, Props(clazz), clazz.getName, true)
// add the remote client and server listener that pipes the events to the event handler system
addListener(handler)
handler
@ -221,7 +221,7 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
/**
* This is the interface for the RemoteServer functionality, it's used in Actor.remote
*/
trait RemoteServerModule extends RemoteModule {
trait RemoteServerModule extends RemoteModule { this: RemoteSupport =>
protected val guard = new ReentrantGuard
/**
@ -243,16 +243,16 @@ trait RemoteServerModule extends RemoteModule {
* Starts the server up
*/
def start(): RemoteServerModule =
start(ReflectiveAccess.RemoteModule.configDefaultAddress.getAddress.getHostAddress,
ReflectiveAccess.RemoteModule.configDefaultAddress.getPort,
start(application.reflective.RemoteModule.configDefaultAddress.getAddress.getHostAddress,
application.reflective.RemoteModule.configDefaultAddress.getPort,
None)
/**
* Starts the server up
*/
def start(loader: ClassLoader): RemoteServerModule =
start(ReflectiveAccess.RemoteModule.configDefaultAddress.getAddress.getHostAddress,
ReflectiveAccess.RemoteModule.configDefaultAddress.getPort,
start(application.reflective.RemoteModule.configDefaultAddress.getAddress.getHostAddress,
application.reflective.RemoteModule.configDefaultAddress.getPort,
Option(loader))
/**
@ -330,13 +330,13 @@ trait RemoteServerModule extends RemoteModule {
def unregisterPerSession(address: String): Unit
}
trait RemoteClientModule extends RemoteModule { self: RemoteModule
trait RemoteClientModule extends RemoteModule { self: RemoteSupport
def actorFor(address: String, hostname: String, port: Int): ActorRef =
actorFor(address, Actor.TIMEOUT, hostname, port, None)
actorFor(address, application.AkkaConfig.TimeoutMillis, hostname, port, None)
def actorFor(address: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
actorFor(address, Actor.TIMEOUT, hostname, port, Some(loader))
actorFor(address, application.AkkaConfig.TimeoutMillis, hostname, port, Some(loader))
def actorFor(address: String, timeout: Long, hostname: String, port: Int): ActorRef =
actorFor(address, timeout, hostname, port, None)
@ -367,7 +367,6 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule ⇒
senderOption: Option[ActorRef],
senderFuture: Option[Promise[T]],
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef,
loader: Option[ClassLoader]): Option[Promise[T]]

View file

@ -9,7 +9,7 @@ import akka.actor._
import akka.event.EventHandler
import akka.config.ConfigurationException
import akka.actor.UntypedChannel._
import akka.dispatch.{ Future, Futures }
import akka.dispatch.Future
import akka.util.ReflectiveAccess
import java.net.InetSocketAddress

View file

@ -27,7 +27,7 @@ case class WithListeners(f: (ActorRef) ⇒ Unit) extends ListenerMessage
trait Listeners { self: Actor
private val listeners = new ConcurrentSkipListSet[ActorRef]
protected def listenerManagement: Receive = {
protected def listenerManagement: Actor.Receive = {
case Listen(l) listeners add l
case Deafen(l) listeners remove l
case WithListeners(f) listeners foreach f

View file

@ -91,7 +91,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒
_delegates foreach { _ ! PoisonPill }
}
protected def _route(): Receive = {
protected def _route(): Actor.Receive = {
// for testing...
case Stat
tryReply(Stats(_delegates length))

View file

@ -4,6 +4,7 @@
package akka.routing
import akka.util.Duration
import akka.actor._
import akka.util.ReflectiveAccess
@ -58,9 +59,9 @@ object RouterType {
object RoutedProps {
final val defaultTimeout = Actor.TIMEOUT
final val defaultTimeout = Timeout(Duration.MinusInf)
final val defaultRouterFactory = () new RoundRobinRouter
final val defaultLocalOnly = !ReflectiveAccess.ClusterModule.isEnabled
final val defaultLocalOnly = false
final val defaultFailureDetectorFactory = (connections: Map[InetSocketAddress, ActorRef]) new RemoveConnectionOnFirstFailureLocalFailureDetector(connections.values)
/**

View file

@ -8,9 +8,8 @@ import akka.AkkaException
import akka.actor._
import akka.event.EventHandler
import akka.config.ConfigurationException
import akka.actor.UntypedChannel._
import akka.dispatch.{ Future, Futures }
import akka.util.ReflectiveAccess
import akka.dispatch.{ Future, MessageDispatcher }
import akka.AkkaApplication
import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
@ -126,15 +125,19 @@ class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector
}
}
/**
* A Helper class to create actor references that use routing.
*/
object Routing {
sealed trait RoutingMessage
case class Broadcast(message: Any) extends RoutingMessage
}
/**
* A Helper class to create actor references that use routing.
*/
class Routing(val application: AkkaApplication) {
/**
* FIXME: will very likely be moved to the ActorRef.
*/
@ -144,11 +147,10 @@ object Routing {
//TODO If address exists in config, it will override the specified Props (should we attempt to merge?)
//TODO If the actor deployed uses a different config, then ignore or throw exception?
val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled
val localOnly = props.localOnly
val clusteringEnabled = application.reflective.ClusterModule.isEnabled
if (clusteringEnabled && !props.localOnly)
ReflectiveAccess.ClusterModule.newClusteredActorRef(props)
application.reflective.ClusterModule.newClusteredActorRef(props)
else {
if (props.connections.isEmpty) //FIXME Shouldn't this be checked when instance is created so that it works with linking instead of barfing?
throw new IllegalArgumentException("A routed actorRef can't have an empty connection set")
@ -496,7 +498,7 @@ trait ScatterGatherRouter extends BasicRouter with Serializable {
* (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages sent in a fire-forget
* mode, the router would behave as {@link RoundRobinRouter}
*/
class ScatterGatherFirstCompletedRouter extends RoundRobinRouter with ScatterGatherRouter {
class ScatterGatherFirstCompletedRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends RoundRobinRouter with ScatterGatherRouter {
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Future.firstCompletedOf(results)
}

View file

@ -4,10 +4,11 @@
package akka.serialization
import akka.util.ReflectiveAccess._
import akka.config.Config._
import akka.AkkaException
import akka.util.ReflectiveAccess
import akka.AkkaApplication
import scala.util.DynamicVariable
import akka.remote.RemoteSupport
case class NoSerializerFoundException(m: String) extends AkkaException(m)
@ -15,7 +16,7 @@ case class NoSerializerFoundException(m: String) extends AkkaException(m)
* Serialization module. Contains methods for serialization and deserialization as well as
* locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file.
*/
object Serialization {
class Serialization(val application: AkkaApplication) {
//TODO document me
def serialize(o: AnyRef): Either[Exception, Array[Byte]] =
@ -26,7 +27,11 @@ object Serialization {
bytes: Array[Byte],
clazz: Class[_],
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
try { Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader)) } catch { case e: Exception Left(e) }
try {
Serialization.application.withValue(application) {
Right(serializerFor(clazz).fromBinary(bytes, Some(clazz), classLoader))
}
} catch { case e: Exception Left(e) }
def findSerializerFor(o: AnyRef): Serializer = o match {
case null NullSerializer
@ -41,7 +46,7 @@ object Serialization {
* Tries to load the specified Serializer by the FQN
*/
def serializerOf(serializerFQN: String): Either[Exception, Serializer] =
createInstance(serializerFQN, ReflectiveAccess.emptyParams, ReflectiveAccess.emptyArguments)
ReflectiveAccess.createInstance(serializerFQN, ReflectiveAccess.emptyParams, ReflectiveAccess.emptyArguments)
private def serializerForBestMatchClass(cl: Class[_]): Either[Exception, Serializer] = {
if (bindings.isEmpty)
@ -49,7 +54,7 @@ object Serialization {
else {
bindings find {
case (clazzName, _)
getClassFor(clazzName) match {
ReflectiveAccess.getClassFor(clazzName) match {
case Right(clazz) clazz.isAssignableFrom(cl)
case _ false
}
@ -65,7 +70,7 @@ object Serialization {
* But "default" can be overridden in config
*/
val serializers: Map[String, Serializer] =
config.getSection("akka.actor.serializers")
application.config.getSection("akka.actor.serializers")
.map(_.map)
.getOrElse(Map())
.foldLeft(Map[String, Serializer]("default" -> akka.serialization.JavaSerializer)) {
@ -76,7 +81,7 @@ object Serialization {
/**
* bindings is a Map whose keys = FQN of class that is serializable and values = the alias of the serializer to be used
*/
val bindings: Map[String, String] = config.getSection("akka.actor.serialization-bindings") map {
val bindings: Map[String, String] = application.config.getSection("akka.actor.serialization-bindings") map {
_.map.foldLeft(Map[String, String]()) {
case (result, (k: String, vs: List[_])) result ++ (vs collect { case v: String (v, k) }) //All keys which are lists, take the Strings from them and Map them
case (result, _) result //For any other values, just skip them, TODO: print out warnings?
@ -94,3 +99,9 @@ object Serialization {
val serializerByIdentity: Map[Serializer.Identifier, Serializer] =
Map(NullSerializer.identifier -> NullSerializer) ++ serializers map { case (_, v) (v.identifier, v) }
}
object Serialization {
// TODO ensure that these are always set (i.e. withValue()) when doing deserialization
val application = new DynamicVariable[AkkaApplication](null)
}

View file

@ -3,13 +3,12 @@
*/
package akka.util
import akka.config.Config
import akka.AkkaApplication
/*
* This class is responsible for booting up a stack of bundles and then shutting them down
*/
class AkkaLoader {
class AkkaLoader(application: AkkaApplication) {
private val hasBooted = new Switch(false)
@volatile
@ -23,7 +22,7 @@ class AkkaLoader {
def boot(withBanner: Boolean, b: Bootable): Unit = hasBooted switchOn {
if (withBanner) printBanner()
println("Starting Akka...")
b.onLoad
b.onLoad(application)
Thread.currentThread.setContextClassLoader(getClass.getClassLoader)
_bundles = Some(b)
println("Akka started successfully")
@ -35,7 +34,7 @@ class AkkaLoader {
def shutdown() {
hasBooted switchOff {
println("Shutting down Akka...")
_bundles.foreach(_.onUnload)
_bundles.foreach(_.onUnload(application))
_bundles = None
println("Akka succesfully shut down")
}
@ -87,6 +86,6 @@ class AkkaLoader {
==============================================================================
Running version %s
==============================================================================
""".format(Config.VERSION))
""".format(AkkaApplication.VERSION))
}
}

View file

@ -3,8 +3,9 @@
*/
package akka.util
import akka.AkkaApplication
trait Bootable {
def onLoad() {}
def onUnload() {}
def onLoad(application: AkkaApplication) {}
def onUnload(application: AkkaApplication) {}
}

View file

@ -4,27 +4,128 @@
package akka.util
import akka.dispatch.Envelope
import akka.config.{ Config, ModuleNotAvailableException }
import akka.config.ModuleNotAvailableException
import akka.actor._
import DeploymentConfig.ReplicationScheme
import akka.config.{ Config, ModuleNotAvailableException }
import akka.config.ModuleNotAvailableException
import akka.event.EventHandler
import akka.cluster.ClusterNode
import akka.remote.{ RemoteSupport, RemoteService }
import akka.routing.{ RoutedProps, Router }
import java.net.InetSocketAddress
import akka.AkkaApplication
object ReflectiveAccess {
val loader = getClass.getClassLoader
val emptyParams: Array[Class[_]] = Array()
val emptyArguments: Array[AnyRef] = Array()
val noParams = Array[Class[_]]()
val noArgs = Array[AnyRef]()
def createInstance[T](clazz: Class[_],
params: Array[Class[_]],
args: Array[AnyRef]): Either[Exception, T] = try {
assert(clazz ne null)
assert(params ne null)
assert(args ne null)
val ctor = clazz.getDeclaredConstructor(params: _*)
ctor.setAccessible(true)
Right(ctor.newInstance(args: _*).asInstanceOf[T])
} catch {
case e: java.lang.reflect.InvocationTargetException
EventHandler.debug(this, e.getCause.toString)
Left(e)
case e: Exception
EventHandler.debug(this, e.toString)
Left(e)
}
def createInstance[T](fqn: String,
params: Array[Class[_]],
args: Array[AnyRef],
classloader: ClassLoader = loader): Either[Exception, T] = try {
assert(params ne null)
assert(args ne null)
getClassFor(fqn, classloader) match {
case Right(value)
val ctor = value.getDeclaredConstructor(params: _*)
ctor.setAccessible(true)
Right(ctor.newInstance(args: _*).asInstanceOf[T])
case Left(exception) Left(exception) //We could just cast this to Either[Exception, T] but it's ugly
}
} catch {
case e: Exception
Left(e)
}
//Obtains a reference to fqn.MODULE$
def getObjectFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, T] = try {
getClassFor(fqn, classloader) match {
case Right(value)
val instance = value.getDeclaredField("MODULE$")
instance.setAccessible(true)
val obj = instance.get(null)
if (obj eq null) Left(new NullPointerException) else Right(obj.asInstanceOf[T])
case Left(exception) Left(exception) //We could just cast this to Either[Exception, T] but it's ugly
}
} catch {
case e: Exception
Left(e)
}
def getClassFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, Class[T]] = try {
assert(fqn ne null)
// First, use the specified CL
val first = try {
Right(classloader.loadClass(fqn).asInstanceOf[Class[T]])
} catch {
case c: ClassNotFoundException Left(c)
}
if (first.isRight) first
else {
// Second option is to use the ContextClassLoader
val second = try {
Right(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]])
} catch {
case c: ClassNotFoundException Left(c)
}
if (second.isRight) second
else {
val third = try {
if (classloader ne loader) Right(loader.loadClass(fqn).asInstanceOf[Class[T]]) else Left(null) //Horrid
} catch {
case c: ClassNotFoundException Left(c)
}
if (third.isRight) third
else {
try {
Right(Class.forName(fqn).asInstanceOf[Class[T]]) // Last option is Class.forName
} catch {
case c: ClassNotFoundException Left(c)
}
}
}
}
} catch {
case e: Exception Left(e)
}
}
/**
* Helper class for reflective access to different modules in order to allow optional loading of modules.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ReflectiveAccess {
class ReflectiveAccess(val application: AkkaApplication) {
val loader = getClass.getClassLoader
val emptyParams: Array[Class[_]] = Array()
val emptyArguments: Array[AnyRef] = Array()
import ReflectiveAccess._
/**
* Reflective access to the Cluster module.
@ -32,7 +133,7 @@ object ReflectiveAccess {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ClusterModule {
lazy val isEnabled = Config.isClusterEnabled //&& clusterInstance.isDefined
lazy val isEnabled = application.AkkaConfig.CLUSTER_ENABLED //&& clusterInstance.isDefined
lazy val clusterRefClass: Class[_] = getClassFor("akka.cluster.ClusterActorRef") match {
case Left(e) throw e
@ -138,9 +239,9 @@ object ReflectiveAccess {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteModule {
val TRANSPORT = Config.config.getString("akka.remote.layer", "akka.remote.netty.NettyRemoteSupport")
val TRANSPORT = application.AkkaConfig.REMOTE_TRANSPORT
val configDefaultAddress = new InetSocketAddress(Config.hostname, Config.remoteServerPort)
val configDefaultAddress = new InetSocketAddress(application.hostname, application.AkkaConfig.REMOTE_SERVER_PORT)
lazy val isEnabled = remoteSupportClass.isDefined
@ -188,98 +289,4 @@ object ReflectiveAccess {
}
}
val noParams = Array[Class[_]]()
val noArgs = Array[AnyRef]()
def createInstance[T](clazz: Class[_],
params: Array[Class[_]],
args: Array[AnyRef]): Either[Exception, T] = try {
assert(clazz ne null)
assert(params ne null)
assert(args ne null)
val ctor = clazz.getDeclaredConstructor(params: _*)
ctor.setAccessible(true)
Right(ctor.newInstance(args: _*).asInstanceOf[T])
} catch {
case e: java.lang.reflect.InvocationTargetException
EventHandler.debug(this, e.getCause.toString)
Left(e)
case e: Exception
EventHandler.debug(this, e.toString)
Left(e)
}
def createInstance[T](fqn: String,
params: Array[Class[_]],
args: Array[AnyRef],
classloader: ClassLoader = loader): Either[Exception, T] = try {
assert(params ne null)
assert(args ne null)
getClassFor(fqn, classloader) match {
case Right(value)
val ctor = value.getDeclaredConstructor(params: _*)
ctor.setAccessible(true)
Right(ctor.newInstance(args: _*).asInstanceOf[T])
case Left(exception) Left(exception) //We could just cast this to Either[Exception, T] but it's ugly
}
} catch {
case e: Exception
Left(e)
}
//Obtains a reference to fqn.MODULE$
def getObjectFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, T] = try {
getClassFor(fqn, classloader) match {
case Right(value)
val instance = value.getDeclaredField("MODULE$")
instance.setAccessible(true)
val obj = instance.get(null)
if (obj eq null) Left(new NullPointerException) else Right(obj.asInstanceOf[T])
case Left(exception) Left(exception) //We could just cast this to Either[Exception, T] but it's ugly
}
} catch {
case e: Exception
Left(e)
}
def getClassFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, Class[T]] = try {
assert(fqn ne null)
// First, use the specified CL
val first = try {
Right(classloader.loadClass(fqn).asInstanceOf[Class[T]])
} catch {
case c: ClassNotFoundException Left(c)
}
if (first.isRight) first
else {
// Second option is to use the ContextClassLoader
val second = try {
Right(Thread.currentThread.getContextClassLoader.loadClass(fqn).asInstanceOf[Class[T]])
} catch {
case c: ClassNotFoundException Left(c)
}
if (second.isRight) second
else {
val third = try {
if (classloader ne loader) Right(loader.loadClass(fqn).asInstanceOf[Class[T]]) else Left(null) //Horrid
} catch {
case c: ClassNotFoundException Left(c)
}
if (third.isRight) third
else {
try {
Right(Class.forName(fqn).asInstanceOf[Class[T]]) // Last option is Class.forName
} catch {
case c: ClassNotFoundException Left(c)
}
}
}
}
} catch {
case e: Exception Left(e)
}
}

View file

@ -1,17 +1,19 @@
package akka.util
import scala.util.continuations._
import akka.dispatch.MessageDispatcher
import akka.actor.Timeout
package object cps {
def matchC[A, B, C, D](in: A)(pf: PartialFunction[A, B @cpsParam[C, D]]): B @cpsParam[C, D] = pf(in)
def loopC[A, U](block: U @cps[A])(implicit loop: CPSLoop[A]): Unit @cps[A] =
def loopC[A, U](block: U @cps[A])(implicit loop: CPSLoop[A], dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] =
loop.loopC(block)
def whileC[A, U](test: Boolean)(block: U @cps[A])(implicit loop: CPSLoop[A]): Unit @cps[A] =
def whileC[A, U](test: Boolean)(block: U @cps[A])(implicit loop: CPSLoop[A], dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] =
loop.whileC(test)(block)
def repeatC[A, U](times: Int)(block: U @cps[A])(implicit loop: CPSLoop[A]): Unit @cps[A] =
def repeatC[A, U](times: Int)(block: U @cps[A])(implicit loop: CPSLoop[A], dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] =
loop.repeatC(times)(block)
}
@ -22,20 +24,20 @@ package cps {
}
trait CPSLoop[A] {
def loopC[U](block: U @cps[A]): Unit @cps[A]
def whileC[U](test: Boolean)(block: U @cps[A]): Unit @cps[A]
def repeatC[U](times: Int)(block: U @cps[A]): Unit @cps[A]
def loopC[U](block: U @cps[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A]
def whileC[U](test: Boolean)(block: U @cps[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A]
def repeatC[U](times: Int)(block: U @cps[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A]
}
import akka.dispatch.{ Future, Promise }
class FutureCPSLoop extends CPSLoop[Future[Any]] {
def loopC[U](block: U @cps[Future[Any]]): Unit @cps[Future[Any]] =
def loopC[U](block: U @cps[Future[Any]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[Future[Any]] =
shift { c: (Unit Future[Any])
Future(reify(block) flatMap (_ reify(loopC(block))) foreach c)
}
def whileC[U](test: Boolean)(block: U @cps[Future[Any]]): Unit @cps[Future[Any]] =
def whileC[U](test: Boolean)(block: U @cps[Future[Any]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[Future[Any]] =
shift { c: (Unit Future[Any])
if (test)
Future(reify(block) flatMap (_ reify(whileC(test)(block))) foreach c)
@ -43,7 +45,7 @@ package cps {
Promise() completeWithResult (shiftUnitR[Unit, Future[Any]](()) foreach c)
}
def repeatC[U](times: Int)(block: U @cps[Future[Any]]): Unit @cps[Future[Any]] =
def repeatC[U](times: Int)(block: U @cps[Future[Any]])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[Future[Any]] =
shift { c: (Unit Future[Any])
if (times > 0)
Future(reify(block) flatMap (_ reify(repeatC(times - 1)(block))) foreach c)
@ -55,19 +57,19 @@ package cps {
trait DefaultCPSLoop {
implicit def defaultCPSLoop[A] = new CPSLoop[A] {
def loopC[U](block: U @cps[A]): Unit @cps[A] = {
def loopC[U](block: U @cps[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] = {
block
loopC(block)
}
def whileC[U](test: Boolean)(block: U @cps[A]): Unit @cps[A] = {
def whileC[U](test: Boolean)(block: U @cps[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] = {
if (test) {
block
whileC(test)(block)
}
}
def repeatC[U](times: Int)(block: U @cps[A]): Unit @cps[A] = {
def repeatC[U](times: Int)(block: U @cps[A])(implicit dispatcher: MessageDispatcher, timeout: Timeout): Unit @cps[A] = {
if (times > 0) {
block
repeatC(times - 1)(block)

View file

@ -7,9 +7,9 @@ package akka.testkit
import akka.actor._
import akka.util.ReflectiveAccess
import akka.event.EventHandler
import com.eaio.uuid.UUID
import akka.actor.Props._
import akka.AkkaApplication
/**
* This special ActorRef is exclusively for use during unit testing in a single-threaded environment. Therefore, it
@ -19,7 +19,8 @@ import akka.actor.Props._
* @author Roland Kuhn
* @since 1.1
*/
class TestActorRef[T <: Actor](props: Props, address: String) extends LocalActorRef(props.withDispatcher(CallingThreadDispatcher.global), address, false) {
class TestActorRef[T <: Actor](application: AkkaApplication, props: Props, address: String)
extends LocalActorRef(application, props.withDispatcher(CallingThreadDispatcher.global), address, false) {
/**
* Directly inject messages into actor receive behavior. Any exceptions
* thrown will be available to you, while still being able to use
@ -41,19 +42,19 @@ class TestActorRef[T <: Actor](props: Props, address: String) extends LocalActor
object TestActorRef {
def apply[T <: Actor](factory: T): TestActorRef[T] = apply[T](Props(factory), new UUID().toString)
def apply[T <: Actor](factory: T)(implicit application: AkkaApplication): TestActorRef[T] = apply[T](Props(factory), new UUID().toString)
def apply[T <: Actor](factory: T, address: String): TestActorRef[T] = apply[T](Props(factory), address)
def apply[T <: Actor](factory: T, address: String)(implicit application: AkkaApplication): TestActorRef[T] = apply[T](Props(factory), address)
def apply[T <: Actor](props: Props): TestActorRef[T] = apply[T](props, new UUID().toString)
def apply[T <: Actor](props: Props)(implicit application: AkkaApplication): TestActorRef[T] = apply[T](props, new UUID().toString)
def apply[T <: Actor](props: Props, address: String): TestActorRef[T] = new TestActorRef(props, address)
def apply[T <: Actor](props: Props, address: String)(implicit application: AkkaApplication): TestActorRef[T] = new TestActorRef(application, props, address)
def apply[T <: Actor: Manifest]: TestActorRef[T] = apply[T](new UUID().toString)
def apply[T <: Actor](implicit m: Manifest[T], application: AkkaApplication): TestActorRef[T] = apply[T](new UUID().toString)
def apply[T <: Actor: Manifest](address: String): TestActorRef[T] = apply[T](Props({
def apply[T <: Actor](address: String)(implicit m: Manifest[T], application: AkkaApplication): TestActorRef[T] = apply[T](Props({
import ReflectiveAccess.{ createInstance, noParams, noArgs }
createInstance[T](manifest[T].erasure, noParams, noArgs) match {
createInstance[T](m.erasure, noParams, noArgs) match {
case Right(value) value
case Left(exception) throw new ActorInitializationException(
"Could not instantiate Actor" +

View file

@ -85,12 +85,12 @@ class TestEventListener extends EventHandler.DefaultListener {
var filters: List[EventFilter] = Nil
override def receive: Receive = ({
override def receive: Actor.Receive = ({
case Mute(filters) filters foreach addFilter
case UnMute(filters) filters foreach removeFilter
case UnMuteAll filters = Nil
case event: Event if filter(event)
}: Receive) orElse super.receive
}: Actor.Receive) orElse super.receive
def filter(event: Event): Boolean = filters exists (f try { f(event) } catch { case e: Exception false })

View file

@ -6,8 +6,8 @@ package akka.testkit
import akka.actor._
import akka.util._
import com.eaio.uuid.UUID
import akka.AkkaApplication
/**
* This is a specialised form of the TestActorRef with support for querying and
@ -34,7 +34,8 @@ import com.eaio.uuid.UUID
* @author Roland Kuhn
* @since 1.2
*/
class TestFSMRef[S, D, T <: Actor](props: Props, address: String)(implicit ev: T <:< FSM[S, D]) extends TestActorRef(props, address) {
class TestFSMRef[S, D, T <: Actor](application: AkkaApplication, props: Props, address: String)(implicit ev: T <:< FSM[S, D])
extends TestActorRef(application, props, address) {
private def fsm: T = underlyingActor
@ -79,8 +80,10 @@ class TestFSMRef[S, D, T <: Actor](props: Props, address: String)(implicit ev: T
object TestFSMRef {
def apply[S, D, T <: Actor](factory: T)(implicit ev: T <:< FSM[S, D]): TestFSMRef[S, D, T] = new TestFSMRef(Props(creator = () factory), new UUID().toString)
def apply[S, D, T <: Actor](factory: T)(implicit ev: T <:< FSM[S, D], application: AkkaApplication): TestFSMRef[S, D, T] =
new TestFSMRef(application, Props(creator = () factory), new UUID().toString)
def apply[S, D, T <: Actor](factory: T, address: String)(implicit ev: T <:< FSM[S, D]): TestFSMRef[S, D, T] = new TestFSMRef(Props(creator = () factory), address)
def apply[S, D, T <: Actor](factory: T, address: String)(implicit ev: T <:< FSM[S, D], application: AkkaApplication): TestFSMRef[S, D, T] =
new TestFSMRef(application, Props(creator = () factory), address)
}

View file

@ -7,11 +7,10 @@ import akka.actor._
import Actor._
import akka.util.Duration
import akka.util.duration._
import java.util.concurrent.{ BlockingDeque, LinkedBlockingDeque, TimeUnit, atomic }
import atomic.AtomicInteger
import scala.annotation.tailrec
import akka.AkkaApplication
object TestActor {
type Ignore = Option[PartialFunction[AnyRef, Boolean]]
@ -87,10 +86,14 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor with FSM[
* @author Roland Kuhn
* @since 1.1
*/
trait TestKitLight {
class TestKit(_app: AkkaApplication = AkkaApplication()) {
import TestActor.{ Message, RealMessage, NullMessage }
implicit val application = _app
implicit val defaultFutureTimeout = _app.AkkaConfig.TIMEOUT
implicit val defaultFutureDispatcher = _app.dispatcher
private val queue = new LinkedBlockingDeque[Message]()
private[akka] var lastMessage: Message = NullMessage
@ -98,7 +101,7 @@ trait TestKitLight {
* ActorRef of the test actor. Access is provided to enable e.g.
* registration as message target.
*/
val testActor = new LocalActorRef(Props(new TestActor(queue)).copy(dispatcher = CallingThreadDispatcher.global), "testActor" + TestKit.testActorId.incrementAndGet(), true)
val testActor = new LocalActorRef(application, Props(new TestActor(queue)).copy(dispatcher = CallingThreadDispatcher.global), "testActor" + TestKit.testActorId.incrementAndGet(), true)
/**
* Implicit sender reference so that replies are possible for messages sent
@ -548,14 +551,10 @@ object TestKit {
private[testkit] val testActorId = new AtomicInteger(0)
}
trait TestKit extends TestKitLight {
implicit val self = testActor
}
/**
* TestKit-based probe which allows sending, reception and reply.
*/
class TestProbe extends TestKit {
class TestProbe(_application: AkkaApplication) extends TestKit(_application) {
/**
* Shorthand to get the testActor.
@ -586,5 +585,5 @@ class TestProbe extends TestKit {
}
object TestProbe {
def apply() = new TestProbe
def apply()(implicit application: AkkaApplication) = new TestProbe(application)
}

View file

@ -8,6 +8,7 @@ import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import akka.event.EventHandler
import akka.dispatch.{ Future, Promise }
import akka.AkkaApplication
/**
* Test whether TestActorRef behaves as an ActorRef should, besides its own spec.
@ -89,7 +90,7 @@ object TestActorRefSpec {
}
class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
class TestActorRefSpec extends TestKit with WordSpec with MustMatchers with BeforeAndAfterEach {
import TestActorRefSpec._
@ -120,7 +121,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
"used with ActorRef" in {
val a = TestActorRef(Props(new Actor {
val nested = Actor.actorOf(Props(self { case _ }))
val nested = context.createActor(Props(self { case _ }))
def receive = { case _ reply(nested) }
}))
a must not be (null)
@ -225,7 +226,7 @@ class TestActorRefSpec extends WordSpec with MustMatchers with BeforeAndAfterEac
"proxy apply for the underlying actor" in {
val ref = TestActorRef[WorkerActor]
intercept[IllegalActorStateException] { ref("work") }
val ch = Promise.channel()
val ch = Promise.channel(5000)
ref ! ch
ch must be('completed)
ch.get must be("complexReply")

View file

@ -9,7 +9,7 @@ import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.actor._
import akka.util.duration._
class TestFSMRefSpec extends WordSpec with MustMatchers with TestKit {
class TestFSMRefSpec extends TestKit with WordSpec with MustMatchers {
import FSM._

View file

@ -8,7 +8,7 @@ import akka.event.EventHandler
import akka.dispatch.Future
import akka.util.duration._
class TestProbeSpec extends WordSpec with MustMatchers {
class TestProbeSpec extends TestKit with WordSpec with MustMatchers {
"A TestProbe" must {

View file

@ -4,7 +4,7 @@ import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterEach, WordSpec }
import akka.util.Duration
class TestTimeSpec extends WordSpec with MustMatchers with BeforeAndAfterEach {
class TestTimeSpec extends TestKit with WordSpec with MustMatchers with BeforeAndAfterEach {
val tf = Duration.timeFactor