diff --git a/docs/scaladocs-akka-actors/_highlighter/SyntaxHighlighter.css b/docs/scaladocs-akka-actors/_highlighter/SyntaxHighlighter.css new file mode 100644 index 0000000000..f7b31dae3c --- /dev/null +++ b/docs/scaladocs-akka-actors/_highlighter/SyntaxHighlighter.css @@ -0,0 +1,35 @@ +.dp-highlighter{font-family:"Consolas","Courier New",Courier,mono,serif;font-size:12px;background-color:#E7E5DC;width:99%;overflow:auto;margin:18px 0 18px 0!important;padding-top:1px;} +.dp-highlighter ol,.dp-highlighter ol li,.dp-highlighter ol li span{margin:0;padding:0;border:none;} +.dp-highlighter a,.dp-highlighter a:hover{background:none;border:none;padding:0;margin:0;} +.dp-highlighter .bar{padding-left:45px;} +.dp-highlighter.collapsed .bar,.dp-highlighter.nogutter .bar{padding-left:0;} +.dp-highlighter ol{list-style:decimal;background-color:#fff;margin:0 0 1px 45px!important;padding:0;color:#5C5C5C;} +.dp-highlighter.nogutter ol,.dp-highlighter.nogutter ol li{list-style:none!important;margin-left:0!important;} +.dp-highlighter ol li,.dp-highlighter .columns div{list-style:decimal-leading-zero;list-style-position:outside!important;border-left:3px solid #6CE26C;background-color:#F8F8F8;color:#5C5C5C;padding:0 3px 0 10px!important;margin:0!important;line-height:14px;} +.dp-highlighter.nogutter ol li,.dp-highlighter.nogutter .columns div{border:0;} +.dp-highlighter .columns{background-color:#F8F8F8;color:gray;overflow:hidden;width:100%;} +.dp-highlighter .columns div{padding-bottom:5px;} +.dp-highlighter ol li.alt{background-color:#FFF;color:inherit;} +.dp-highlighter ol li span{color:black;background-color:inherit;} +.dp-highlighter.collapsed ol{margin:0;} +.dp-highlighter.collapsed ol li{display:none;} +.dp-highlighter.printing{border:none;} +.dp-highlighter.printing .tools{display:none!important;} +.dp-highlighter.printing li{display:list-item!important;} +.dp-highlighter .tools{padding:3px 8px 3px 10px;font:9px Verdana,Geneva,Arial,Helvetica,sans-serif;color:silver;background-color:#f8f8f8;padding-bottom:10px;border-left:3px solid #6CE26C;} +.dp-highlighter.nogutter .tools{border-left:0;} +.dp-highlighter.collapsed .tools{border-bottom:0;} +.dp-highlighter .tools a{font-size:9px;color:#a0a0a0;background-color:inherit;text-decoration:none;margin-right:10px;} +.dp-highlighter .tools a:hover{color:red;background-color:inherit;text-decoration:underline;} +.dp-about{background-color:#fff;color:#333;margin:0;padding:0;} +.dp-about table{width:100%;height:100%;font-size:11px;font-family:Tahoma,Verdana,Arial,sans-serif!important;} +.dp-about td{padding:10px;vertical-align:top;} +.dp-about .copy{border-bottom:1px solid #ACA899;height:95%;} +.dp-about .title{color:red;background-color:inherit;font-weight:bold;} +.dp-about .para{margin:0 0 4px 0;} +.dp-about .footer{background-color:#ECEADB;color:#333;border-top:1px solid #fff;text-align:right;} +.dp-about .close{font-size:11px;font-family:Tahoma,Verdana,Arial,sans-serif!important;background-color:#ECEADB;color:#333;width:60px;height:22px;} +.dp-highlighter .comment,.dp-highlighter .comments{color:#008200;background-color:inherit;} +.dp-highlighter .string{color:blue;background-color:inherit;} +.dp-highlighter .keyword{color:#069;font-weight:bold;background-color:inherit;} +.dp-highlighter .preprocessor{color:gray;background-color:inherit;} \ No newline at end of file diff --git a/docs/scaladocs-akka-actors/_highlighter/clipboard.swf b/docs/scaladocs-akka-actors/_highlighter/clipboard.swf new file mode 100644 index 0000000000..2cfe37185b Binary files /dev/null and b/docs/scaladocs-akka-actors/_highlighter/clipboard.swf differ diff --git a/docs/scaladocs-akka-actors/_highlighter/shAll.js b/docs/scaladocs-akka-actors/_highlighter/shAll.js new file mode 100644 index 0000000000..76ce6a1945 --- /dev/null +++ b/docs/scaladocs-akka-actors/_highlighter/shAll.js @@ -0,0 +1,350 @@ +var dp={sh:{Toolbar:{},Utils:{},RegexLib:{},Brushes:{},Strings:{AboutDialog:'
dp.SyntaxHighlighter Version: {V}©2004-2007 Alex Gorbatchev. |
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.actor
+
+import java.net.InetSocketAddress
+
+import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult}
+import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest
+import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
+import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.util._
+
+import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
+import org.codehaus.aspectwerkz.proxy.Proxy
+import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
+import se.scalablesolutions.akka.serialization.Serializer
+import java.lang.reflect.{InvocationTargetException, Method}
+
+object Annotations {
+ import se.scalablesolutions.akka.annotation._
+ val oneway = classOf[oneway]
+ val transactionrequired = classOf[transactionrequired]
+ val prerestart = classOf[prerestart]
+ val postrestart = classOf[postrestart]
+ val immutable = classOf[immutable]
+ val inittransactionalstate = classOf[inittransactionalstate]
+}
+
+/**
+ *
+ * @author <a href="http://jonasboner.com">Jonas Bonér</a>
+ */
+object ActiveObject {
+ val AKKA_CAMEL_ROUTING_SCHEME = "akka"
+
+ def newInstance[T](target: Class[T], timeout: Long): T =
+ newInstance(target, new Dispatcher(None), None, timeout)
+
+ def newInstance[T](target: Class[T], timeout: Long, restartCallbacks: Option[RestartCallbacks]): T =
+ newInstance(target, new Dispatcher(restartCallbacks), None, timeout)
+
+ def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T =
+ newInstance(intf, target, new Dispatcher(None), None, timeout)
+
+ def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, restartCallbacks: Option[RestartCallbacks]): T =
+ newInstance(intf, target, new Dispatcher(restartCallbacks), None, timeout)
+
+ def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T =
+ newInstance(target, new Dispatcher(None), Some(new InetSocketAddress(hostname, port)), timeout)
+
+ def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T =
+ newInstance(target, new Dispatcher(restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
+
+ def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T =
+ newInstance(intf, target, new Dispatcher(None), Some(new InetSocketAddress(hostname, port)), timeout)
+
+ def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T =
+ newInstance(intf, target, new Dispatcher(restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
+
+ def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = {
+ val actor = new Dispatcher(None)
+ actor.messageDispatcher = dispatcher
+ newInstance(target, actor, None, timeout)
+ }
+
+ def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
+ val actor = new Dispatcher(restartCallbacks)
+ actor.messageDispatcher = dispatcher
+ newInstance(target, actor, None, timeout)
+ }
+
+ def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = {
+ val actor = new Dispatcher(None)
+ actor.messageDispatcher = dispatcher
+ newInstance(intf, target, actor, None, timeout)
+ }
+
+ def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
+ val actor = new Dispatcher(restartCallbacks)
+ actor.messageDispatcher = dispatcher
+ newInstance(intf, target, actor, None, timeout)
+ }
+
+ def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
+ val actor = new Dispatcher(None)
+ actor.messageDispatcher = dispatcher
+ newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
+ }
+
+ def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
+ val actor = new Dispatcher(restartCallbacks)
+ actor.messageDispatcher = dispatcher
+ newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
+ }
+
+ def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
+ val actor = new Dispatcher(None)
+ actor.messageDispatcher = dispatcher
+ newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
+ }
+
+ def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
+ val actor = new Dispatcher(restartCallbacks)
+ actor.messageDispatcher = dispatcher
+ newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
+ }
+
+ private[akka] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
+ val proxy = Proxy.newInstance(target, false, true)
+ actor.initialize(target, proxy)
+ actor.timeout = timeout
+ actor.start
+ AspectInitRegistry.register(proxy, AspectInit(target, actor, remoteAddress, timeout))
+ proxy.asInstanceOf[T]
+ }
+
+ private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
+ val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
+ actor.initialize(target.getClass, target)
+ actor.timeout = timeout
+ actor.start
+ AspectInitRegistry.register(proxy, AspectInit(intf, actor, remoteAddress, timeout))
+ proxy.asInstanceOf[T]
+ }
+
+
+ private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = {
+ object factory extends SupervisorFactory {
+ override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
+ }
+ val supervisor = factory.newSupervisor
+ supervisor ! StartSupervisor
+ supervisor
+ }
+}
+
+private[akka] object AspectInitRegistry {
+ private val inits = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit]
+
+ def initFor(target: AnyRef) = {
+ val init = inits.get(target)
+ inits.remove(target)
+ init
+ }
+
+ def register(target: AnyRef, init: AspectInit) = inits.put(target, init)
+}
+
+private[akka] sealed case class AspectInit(
+ val target: Class[_],
+ val actor: Dispatcher,
+ val remoteAddress: Option[InetSocketAddress],
+ val timeout: Long)
+
+/**
+ * AspectWerkz Aspect that is turning POJOs into Active Object.
+ * Is deployed on a 'per-instance' basis.
+ *
+ * @author <a href="http://jonasboner.com">Jonas Bonér</a>
+ */
+@Aspect("perInstance")
+private[akka] sealed class ActiveObjectAspect {
+ @volatile var isInitialized = false
+ var target: Class[_] = _
+ var actor: Dispatcher = _
+ var remoteAddress: Option[InetSocketAddress] = _
+ var timeout: Long = _
+
+ @Around("execution(* *.*(..))")
+ def invoke(joinPoint: JoinPoint): AnyRef = {
+ if (!isInitialized) {
+ val init = AspectInitRegistry.initFor(joinPoint.getThis)
+ target = init.target
+ actor = init.actor
+ remoteAddress = init.remoteAddress
+ timeout = init.timeout
+ isInitialized = true
+ }
+ dispatch(joinPoint)
+ }
+
+ private def dispatch(joinPoint: JoinPoint) = {
+ if (remoteAddress.isDefined) remoteDispatch(joinPoint)
+ else localDispatch(joinPoint)
+ }
+
+ private def localDispatch(joinPoint: JoinPoint): AnyRef = {
+ val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
+ if (isOneWay(rtti)) actor ! Invocation(joinPoint, true, true)
+ else {
+ val result = actor !! Invocation(joinPoint, false, isVoid(rtti))
+ if (result.isDefined) result.get
+ else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]")
+ }
+ }
+
+ private def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
+ val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
+ val oneWay_? = isOneWay(rtti)
+ val (message: Array[AnyRef], isEscaped) = escapeArguments(rtti.getParameterValues)
+ val requestBuilder = RemoteRequest.newBuilder
+ .setId(RemoteRequestIdFactory.nextId)
+ .setMethod(rtti.getMethod.getName)
+ .setTarget(target.getName)
+ .setTimeout(timeout)
+ .setIsActor(false)
+ .setIsOneWay(oneWay_?)
+ .setIsEscaped(false)
+ RemoteProtocolBuilder.setMessage(message, requestBuilder)
+ val id = actor.registerSupervisorAsRemoteActor
+ if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
+ val remoteMessage = requestBuilder.build
+ val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage)
+ if (oneWay_?) null // for void methods
+ else {
+ if (future.isDefined) {
+ future.get.await
+ val result = getResultOrThrowException(future.get)
+ if (result.isDefined) result.get
+ else throw new IllegalStateException("No result returned from call to [" + joinPoint + "]")
+ } else throw new IllegalStateException("No future returned from call to [" + joinPoint + "]")
+ }
+ }
+
+ private def getResultOrThrowException[T](future: FutureResult): Option[T] =
+ if (future.exception.isDefined) {
+ val (_, cause) = future.exception.get
+ throw cause
+ } else future.result.asInstanceOf[Option[T]]
+
+ private def isOneWay(rtti: MethodRtti) = rtti.getMethod.isAnnotationPresent(Annotations.oneway)
+
+ private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE
+
+ private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = {
+ var isEscaped = false
+ val escapedArgs = for (arg <- args) yield {
+ val clazz = arg.getClass
+ if (clazz.getName.contains("$$ProxiedByAW")) {
+ isEscaped = true
+ "$$ProxiedByAW" + clazz.getSuperclass.getName
+ } else arg
+ }
+ (escapedArgs, isEscaped)
+ }
+}
+
+/**
+ * Represents a snapshot of the current invocation.
+ *
+ * @author <a href="http://jonasboner.com">Jonas Bonér</a>
+ */
+@serializable private[akka] case class Invocation(joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean) {
+
+ override def toString: String = synchronized {
+ "Invocation [joinPoint: " + joinPoint.toString + ", isOneWay: " + isOneWay + ", isVoid: " + isVoid + "]"
+ }
+
+ override def hashCode: Int = synchronized {
+ var result = HashCode.SEED
+ result = HashCode.hash(result, joinPoint)
+ result = HashCode.hash(result, isOneWay)
+ result = HashCode.hash(result, isVoid)
+ result
+ }
+
+ override def equals(that: Any): Boolean = synchronized {
+ that != null &&
+ that.isInstanceOf[Invocation] &&
+ that.asInstanceOf[Invocation].joinPoint == joinPoint &&
+ that.asInstanceOf[Invocation].isOneWay == isOneWay &&
+ that.asInstanceOf[Invocation].isVoid == isVoid
+ }
+}
+
+/**
+ * Generic Actor managing Invocation dispatch, transaction and error management.
+ *
+ * @author <a href="http://jonasboner.com">Jonas Bonér</a>
+ */
+private[akka] class Dispatcher(val callbacks: Option[RestartCallbacks]) extends Actor {
+ private val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()
+ private val ZERO_ITEM_OBJECT_ARRAY = Array[Object[_]]()
+
+ private[actor] var target: Option[AnyRef] = None
+ private var preRestart: Option[Method] = None
+ private var postRestart: Option[Method] = None
+ private var initTxState: Option[Method] = None
+
+ private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef) = {
+ if (targetClass.isAnnotationPresent(Annotations.transactionrequired)) makeTransactionRequired
+ id = targetClass.getName
+ target = Some(targetInstance)
+ val methods = targetInstance.getClass.getDeclaredMethods.toList
+
+ // See if we have any config define restart callbacks
+ callbacks match {
+ case None => {}
+ case Some(RestartCallbacks(pre, post)) =>
+ preRestart = Some(try {
+ targetInstance.getClass.getDeclaredMethod(pre, ZERO_ITEM_CLASS_ARRAY: _*)
+ } catch { case e => throw new IllegalStateException("Could not find pre restart method [" + pre + "] in [" + targetClass.getName + "]. It must have a zero argument definition.") })
+ postRestart = Some(try {
+ targetInstance.getClass.getDeclaredMethod(post, ZERO_ITEM_CLASS_ARRAY: _*)
+ } catch { case e => throw new IllegalStateException("Could not find post restart method [" + post + "] in [" + targetClass.getName + "]. It must have a zero argument definition.") })
+ }
+
+ // See if we have any annotation defined restart callbacks
+ if (!preRestart.isDefined) preRestart = methods.find(m => m.isAnnotationPresent(Annotations.prerestart))
+ if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart))
+
+ if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0)
+ throw new IllegalStateException("Method annotated with @prerestart or defined as a restart callback in [" + targetClass.getName + "] must have a zero argument definition")
+ if (postRestart.isDefined && postRestart.get.getParameterTypes.length != 0)
+ throw new IllegalStateException("Method annotated with @postrestart or defined as a restart callback in [" + targetClass.getName + "] must have a zero argument definition")
+
+ if (preRestart.isDefined) preRestart.get.setAccessible(true)
+ if (postRestart.isDefined) postRestart.get.setAccessible(true)
+
+ // see if we have a method annotated with @inittransactionalstate, if so invoke it
+ //initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate))
+ //if (initTxState.isDefined && initTxState.get.getParameterTypes.length != 0) throw new IllegalStateException("Method annotated with @inittransactionalstate must have a zero argument definition")
+ //if (initTxState.isDefined) initTxState.get.setAccessible(true)
+ }
+
+ override def receive: PartialFunction[Any, Unit] = {
+ case Invocation(joinPoint, isOneWay, _) =>
+ if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
+ if (isOneWay) joinPoint.proceed
+ else reply(joinPoint.proceed)
+ case unexpected =>
+ throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
+ }
+
+ override protected def preRestart(reason: AnyRef, config: Option[AnyRef]) {
+ try {
+ if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
+ } catch { case e: InvocationTargetException => throw e.getCause }
+ }
+
+ override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {
+ try {
+ if (postRestart.isDefined) postRestart.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
+ } catch { case e: InvocationTargetException => throw e.getCause }
+ }
+
+ //override protected def initTransactionalState = {
+ // try {
+ // if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
+ // } catch { case e: InvocationTargetException => throw e.getCause }
+ //}
+
+ private def serializeArguments(joinPoint: JoinPoint) = {
+ val args = joinPoint.getRtti.asInstanceOf[MethodRtti].getParameterValues
+ var unserializable = false
+ var hasMutableArgument = false
+ for (arg <- args.toList) {
+ if (!arg.isInstanceOf[String] &&
+ !arg.isInstanceOf[Byte] &&
+ !arg.isInstanceOf[Int] &&
+ !arg.isInstanceOf[Long] &&
+ !arg.isInstanceOf[Float] &&
+ !arg.isInstanceOf[Double] &&
+ !arg.isInstanceOf[Boolean] &&
+ !arg.isInstanceOf[Char] &&
+ !arg.isInstanceOf[java.lang.Byte] &&
+ !arg.isInstanceOf[java.lang.Integer] &&
+ !arg.isInstanceOf[java.lang.Long] &&
+ !arg.isInstanceOf[java.lang.Float] &&
+ !arg.isInstanceOf[java.lang.Double] &&
+ !arg.isInstanceOf[java.lang.Boolean] &&
+ !arg.isInstanceOf[java.lang.Character] &&
+ !arg.getClass.isAnnotationPresent(Annotations.immutable)) {
+ hasMutableArgument = true
+ }
+ if (arg.getClass.getName.contains("$$ProxiedByAWSubclassing$$")) unserializable = true
+ }
+ if (!unserializable && hasMutableArgument) {
+ // FIXME: can we have another default deep cloner?
+ val copyOfArgs = Serializer.Java.deepClone(args)
+ joinPoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]])
+ }
+ }
+}
+
+/*
+ublic class CamelInvocationHandler implements InvocationHandler {
+ private final Endpoint endpoint;
+ private final Producer producer;
+ private final MethodInfoCache methodInfoCache;
+
+ public CamelInvocationHandler(Endpoint endpoint, Producer producer, MethodInfoCache methodInfoCache) {
+ this.endpoint = endpoint;
+ this.producer = producer;
+ this.methodInfoCache = methodInfoCache;
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ BeanInvocation invocation = new BeanInvocation(method, args);
+ ExchangePattern pattern = ExchangePattern.InOut;
+ MethodInfo methodInfo = methodInfoCache.getMethodInfo(method);
+ if (methodInfo != null) {
+ pattern = methodInfo.getPattern();
+ }
+ Exchange exchange = new DefaultExchange(endpoint, pattern);
+ exchange.getIn().setBody(invocation);
+
+ producer.process(exchange);
+ Throwable fault = exchange.getException();
+ if (fault != null) {
+ throw new InvocationTargetException(fault);
+ }
+ if (pattern.isOutCapable()) {
+ return exchange.getOut().getBody();
+ } else {
+ return null;
+ }
+ }
+}
+
+ if (joinpoint.target.isInstanceOf[MessageDriven] &&
+ joinpoint.method.getName == "onMessage") {
+ val m = joinpoint.method
+
+ val endpointName = m.getDeclaringClass.getName + "." + m.getName
+ val activeObjectName = m.getDeclaringClass.getName
+ val endpoint = conf.getRoutingEndpoint(conf.lookupUriFor(m))
+ val producer = endpoint.createProducer
+ val exchange = endpoint.createExchange
+ exchange.getIn().setBody(joinpoint)
+ producer.process(exchange)
+ val fault = exchange.getException();
+ if (fault != null) throw new InvocationTargetException(fault)
+
+ // FIXME: need some timeout and future here...
+ exchange.getOut.getBody
+
+ } else
+*/
+
+
+
+
diff --git a/docs/scaladocs-akka-actors/actor/Actor.scala.html b/docs/scaladocs-akka-actors/actor/Actor.scala.html
new file mode 100644
index 0000000000..e6dcc18edb
--- /dev/null
+++ b/docs/scaladocs-akka-actors/actor/Actor.scala.html
@@ -0,0 +1,684 @@
+
+
+
+
+
+
+
+
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.actor
+
+import java.net.InetSocketAddress
+import java.util.HashSet
+
+import se.scalablesolutions.akka.Config._
+import se.scalablesolutions.akka.dispatch._
+import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.stm.Transaction._
+import se.scalablesolutions.akka.stm.TransactionManagement._
+import se.scalablesolutions.akka.stm.{StmException, TransactionManagement}
+import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest
+import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
+import se.scalablesolutions.akka.serialization.Serializer
+import se.scalablesolutions.akka.util.Helpers.ReadWriteLock
+import se.scalablesolutions.akka.util.Logging
+
+import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
+
+import org.multiverse.utils.TransactionThreadLocal._
+
+sealed abstract class LifecycleMessage
+case class Init(config: AnyRef) extends LifecycleMessage
+//case object TransactionalInit extends LifecycleMessage
+case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifecycleMessage
+case class Restart(reason: AnyRef) extends LifecycleMessage
+case class Exit(dead: Actor, killer: Throwable) extends LifecycleMessage
+
+sealed abstract class DispatcherType
+object DispatcherType {
+ case object EventBasedThreadPooledProxyInvokingDispatcher extends DispatcherType
+ case object EventBasedSingleThreadDispatcher extends DispatcherType
+ case object EventBasedThreadPoolDispatcher extends DispatcherType
+ case object ThreadBasedDispatcher extends DispatcherType
+}
+
+/**
+ * @author <a href="http://jonasboner.com">Jonas Bonér</a>
+ */
+class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
+ def invoke(handle: MessageInvocation) = actor.invoke(handle)
+}
+
+/**
+ * @author <a href="http://jonasboner.com">Jonas Bonér</a>
+ */
+object Actor {
+ val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
+ val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
+}
+
+/**
+ * @author <a href="http://jonasboner.com">Jonas Bonér</a>
+ */
+trait Actor extends Logging with TransactionManagement {
+ ActorRegistry.register(this)
+
+ @volatile private[this] var isRunning: Boolean = false
+ private[this] val remoteFlagLock = new ReadWriteLock
+ private[this] val transactionalFlagLock = new ReadWriteLock
+
+ private var hotswap: Option[PartialFunction[Any, Unit]] = None
+ private var config: Option[AnyRef] = None
+
+ @volatile protected[this] var isTransactionRequiresNew = false
+ @volatile protected[this] var remoteAddress: Option[InetSocketAddress] = None
+ @volatile protected[akka] var supervisor: Option[Actor] = None
+
+ protected[akka] var mailbox: MessageQueue = _
+ protected[this] var senderFuture: Option[CompletableFutureResult] = None
+ protected[this] val linkedActors = new HashSet[Actor]
+ protected[actor] var lifeCycleConfig: Option[LifeCycle] = None
+
+ val name = this.getClass.getName
+
+ // ====================================
+ // ==== USER CALLBACKS TO OVERRIDE ====
+ // ====================================
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Defines the default timeout for '!!' invocations, e.g. the timeout for the future returned by the call to '!!'.
+ */
+ @volatile var timeout: Long = Actor.TIMEOUT
+
+ /**
+ * User overridable callback/setting.
+ *
+ * User can (and is encouraged to) override the default configuration so it fits the specific use-case that the actor is used for.
+ * <p/>
+ * It is beneficial to have actors share the same dispatcher, easily +100 actors can share the same.
+ * <br/>
+ * But if you are running many many actors then it can be a good idea to have split them up in terms of dispatcher sharing.
+ * <br/>
+ * Default is that all actors that are created and spawned from within this actor is sharing the same dispatcher as its creator.
+ * <pre>
+ * dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
+ * .withNewThreadPoolWithBoundedBlockingQueue(100)
+ * .setCorePoolSize(16)
+ * .setMaxPoolSize(128)
+ * .setKeepAliveTimeInMillis(60000)
+ * .setRejectionPolicy(new CallerRunsPolicy)
+ * .buildThreadPool
+ * </pre>
+ */
+ protected[akka] var messageDispatcher: MessageDispatcher = {
+ val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher(getClass.getName)
+ mailbox = dispatcher.messageQueue
+ dispatcher.registerHandler(this, new ActorMessageInvoker(this))
+ dispatcher
+ }
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Identifier for actor, does not have to be a unique one. Simply the one used in logging etc.
+ */
+ protected[this] var id: String = this.getClass.toString
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Set trapExit to true if actor should be able to trap linked actors exit messages.
+ */
+ protected[this] var trapExit: Boolean = false
+
+ /**
+ * User overridable callback/setting.
+ *
+ * If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
+ * Can be one of:
+ * <pre/>
+ * AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
+ *
+ * OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
+ * </pre>
+ */
+ protected var faultHandler: Option[FaultHandlingStrategy] = None
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Partial function implementing the server logic.
+ * To be implemented by subclassing server.
+ * <p/>
+ * Example code:
+ * <pre>
+ * def receive: PartialFunction[Any, Unit] = {
+ * case Ping =>
+ * println("got a ping")
+ * reply("pong")
+ *
+ * case OneWay =>
+ * println("got a oneway")
+ *
+ * case _ =>
+ * println("unknown message, ignoring")
+ * }
+ * </pre>
+ */
+ protected def receive: PartialFunction[Any, Unit]
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Optional callback method that is called during initialization.
+ * To be implemented by subclassing actor.
+ */
+ protected def init(config: AnyRef) = {}
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Mandatory callback method that is called during restart and reinitialization after a server crash.
+ * To be implemented by subclassing actor.
+ */
+ protected def preRestart(reason: AnyRef, config: Option[AnyRef]) = {}
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Mandatory callback method that is called during restart and reinitialization after a server crash.
+ * To be implemented by subclassing actor.
+ */
+ protected def postRestart(reason: AnyRef, config: Option[AnyRef]) = {}
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Optional callback method that is called during termination.
+ * To be implemented by subclassing actor.
+ */
+ protected def initTransactionalState() = {}
+
+ /**
+ * User overridable callback/setting.
+ *
+ * Optional callback method that is called during termination.
+ * To be implemented by subclassing actor.
+ */
+ protected def shutdown {}
+
+ // =============
+ // ==== API ====
+ // =============
+
+ /**
+ * Starts up the actor and its message queue.
+ */
+ def start = synchronized {
+ if (!isRunning) {
+ messageDispatcher.start
+ isRunning = true
+ //if (isTransactional) this !! TransactionalInit
+ }
+ log.info("[%s] has started", toString)
+ }
+
+ /**
+ * Stops the actor and its message queue.
+ */
+ def stop = synchronized {
+ if (isRunning) {
+ dispatcher.unregisterHandler(this)
+ if (dispatcher.isInstanceOf[ThreadBasedDispatcher]) dispatcher.shutdown
+ // FIXME: Need to do reference count to know if EventBasedThreadPoolDispatcher and EventBasedSingleThreadDispatcher can be shut down
+ isRunning = false
+ shutdown
+ } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+ }
+
+ /**
+ * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
+ */
+ def !(message: AnyRef) =
+ if (isRunning) postMessageToMailbox(message)
+ else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+
+ /**
+ * Sends a message asynchronously and waits on a future for a reply message.
+ * <p/>
+ * It waits on the reply either until it receives it (in the form of <code>Some(replyMessage)</code>)
+ * or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
+ * <p/>
+ * <b>NOTE:</b>
+ * If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>reply(..)</code>
+ * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
+ */
+ def !: Option[T] = if (isRunning) {
+ val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
+ val isActiveObject = message.isInstanceOf[Invocation]
+ if (isActiveObject && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None)
+ try {
+ future.await
+ } catch {
+ case e: FutureTimeoutException =>
+ if (isActiveObject) throw e
+ else None
+ }
+ getResultOrThrowException(future)
+ } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+
+ /**
+ * Sends a message asynchronously and waits on a future for a reply message.
+ * <p/>
+ * It waits on the reply either until it receives it (in the form of <code>Some(replyMessage)</code>)
+ * or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
+ * <p/>
+ * <b>NOTE:</b>
+ * If you are sending messages using <code>!!</code> then you <b>have to</b> use <code>reply(..)</code>
+ * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
+ */
+ def !: Option[T] = !
+
+ /**
+ * Sends a message asynchronously, but waits on a future indefinitely. E.g. emulates a synchronous call.
+ * <p/>
+ * <b>NOTE:</b>
+ * Should be used with care (almost never), since very dangerous (will block a thread indefinitely if no reply).
+ */
+ def !?[T](message: AnyRef): T = if (isRunning) {
+ val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
+ future.awaitBlocking
+ getResultOrThrowException(future).get
+ } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+
+ /**
+ * Use <code>reply(..)</code> to reply with a message to the original sender of the message currently
+ * being processed.
+ * <p/>
+ * <b>NOTE:</b>
+ * Does only work together with the actor <code>!!</code> method and/or active objects not annotated
+ * with <code>@oneway</code>.
+ */
+ protected[this] def reply(message: AnyRef) = senderFuture match {
+ case None => throw new IllegalStateException(
+ "\n\tNo sender in scope, can't reply. " +
+ "\n\tHave you used the '!' message send or the '@oneway' active object annotation? " +
+ "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future that will be bound by the argument passed to 'reply'." )
+ case Some(future) => future.completeWithResult(message)
+ }
+
+ def dispatcher = messageDispatcher
+
+ /**
+ * Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
+ */
+ def dispatcher_=(dispatcher: MessageDispatcher): Unit = synchronized {
+ if (!isRunning) {
+ messageDispatcher = dispatcher
+ mailbox = messageDispatcher.messageQueue
+ messageDispatcher.registerHandler(this, new ActorMessageInvoker(this))
+ } else throw new IllegalArgumentException("Can not swap dispatcher for " + toString + " after it has been started")
+ }
+
+ /**
+ * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
+ */
+ def makeRemote(hostname: String, port: Int): Unit = remoteFlagLock.withWriteLock {
+ makeRemote(new InetSocketAddress(hostname, port))
+ }
+
+ /**
+ * Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
+ */
+ def makeRemote(address: InetSocketAddress): Unit = remoteFlagLock.withWriteLock {
+ remoteAddress = Some(address)
+ }
+
+ /**
+ * Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
+ * However, it will always participate in an existing transaction.
+ * If transactionality want to be completely turned off then do it by invoking:
+ * <pre/>
+ * TransactionManagement.disableTransactions
+ * </pre>
+ */
+ def makeTransactionRequired = synchronized {
+ if (isRunning) throw new IllegalArgumentException("Can not make actor transaction required after it has been started")
+ else isTransactionRequiresNew = true
+ }
+
+ /**
+ * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will receive a notification nif the linked actor has crashed.
+ * If the 'trapExit' flag has been set then it will 'trap' the failure and automatically restart the linked actors according to the restart strategy defined by the 'faultHandler'.
+ * <p/>
+ * To be invoked from within the actor itself.
+ */
+ protected[this] def link(actor: Actor) = {
+ if (isRunning) {
+ linkedActors.add(actor)
+ if (actor.supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
+ actor.supervisor = Some(this)
+ log.debug("Linking actor [%s] to actor [%s]", actor, this)
+ } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+ }
+
+ /**
+ * Unlink the actor.
+ * <p/>
+ * To be invoked from within the actor itself.
+ */
+ protected[this] def unlink(actor: Actor) = {
+ if (isRunning) {
+ if (!linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink")
+ linkedActors.remove(actor)
+ actor.supervisor = None
+ log.debug("Unlinking actor [%s] from actor [%s]", actor, this)
+ } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
+ }
+
+ /**
+ * Atomically start and link an actor.
+ * <p/>
+ * To be invoked from within the actor itself.
+ */
+ protected[this] def startLink(actor: Actor) = {
+ actor.start
+ link(actor)
+ }
+
+ /**
+ * Atomically start, link and make an actor remote.
+ * <p/>
+ * To be invoked from within the actor itself.
+ */
+ protected[this] def startLinkRemote(actor: Actor, hostname: String, port: Int) = {
+ actor.makeRemote(hostname, port)
+ actor.start
+ link(actor)
+ }
+
+ /**
+ * Atomically create (from actor class) and start an actor.
+ * <p/>
+ * To be invoked from within the actor itself.
+ */
+ protected[this] def spawn[T <: Actor](actorClass: Class[T]): T = {
+ val actor = actorClass.newInstance.asInstanceOf[T]
+ if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
+ actor.dispatcher = dispatcher
+ actor.mailbox = mailbox
+ }
+ actor.start
+ actor
+ }
+
+ /**
+ * Atomically create (from actor class), start and make an actor remote.
+ * <p/>
+ * To be invoked from within the actor itself.
+ */
+ protected[this] def spawnRemote[T <: Actor](actorClass: Class[T], hostname: String, port: Int): T = {
+ val actor = actorClass.newInstance.asInstanceOf[T]
+ actor.makeRemote(hostname, port)
+ if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
+ actor.dispatcher = dispatcher
+ actor.mailbox = mailbox
+ }
+ actor.start
+ actor
+ }
+
+ /**
+ * Atomically create (from actor class), start and link an actor.
+ * <p/>
+ * To be invoked from within the actor itself.
+ */
+ protected[this] def spawnLink[T <: Actor](actorClass: Class[T]): T = {
+ val actor = spawn[T](actorClass)
+ link(actor)
+ actor
+ }
+
+ /**
+ * Atomically create (from actor class), start, link and make an actor remote.
+ * <p/>
+ * To be invoked from within the actor itself.
+ */
+ protected[this] def spawnLinkRemote[T <: Actor](actorClass: Class[T], hostname: String, port: Int): T = {
+ val actor = spawn[T](actorClass)
+ actor.makeRemote(hostname, port)
+ link(actor)
+ actor
+ }
+
+ // ================================
+ // ==== IMPLEMENTATION DETAILS ====
+ // ================================
+
+ private def postMessageToMailbox(message: AnyRef): Unit = remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
+ if (remoteAddress.isDefined) {
+ val requestBuilder = RemoteRequest.newBuilder
+ .setId(RemoteRequestIdFactory.nextId)
+ .setTarget(this.getClass.getName)
+ .setTimeout(timeout)
+ .setIsActor(true)
+ .setIsOneWay(true)
+ .setIsEscaped(false)
+ val id = registerSupervisorAsRemoteActor
+ if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
+ RemoteProtocolBuilder.setMessage(message, requestBuilder)
+ RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build)
+ } else {
+ val handle = new MessageInvocation(this, message, None, currentTransaction.get)
+ handle.send
+ }
+ }
+
+ private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
+ if (remoteAddress.isDefined) {
+ val requestBuilder = RemoteRequest.newBuilder
+ .setId(RemoteRequestIdFactory.nextId)
+ .setTarget(this.getClass.getName)
+ .setTimeout(timeout)
+ .setIsActor(true)
+ .setIsOneWay(false)
+ .setIsEscaped(false)
+ RemoteProtocolBuilder.setMessage(message, requestBuilder)
+ val id = registerSupervisorAsRemoteActor
+ if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
+ val future = RemoteClient.clientFor(remoteAddress.get).send(requestBuilder.build)
+ if (future.isDefined) future.get
+ else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
+ } else {
+ val future = new DefaultCompletableFutureResult(timeout)
+ val handle = new MessageInvocation(this, message, Some(future), currentTransaction.get)
+ handle.send
+ future
+ }
+ }
+
+ /**
+ * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods
+ */
+ private[akka] def invoke(messageHandle: MessageInvocation) = synchronized {
+ if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
+ else dispatch(messageHandle)
+ }
+
+ private def dispatch[T](messageHandle: MessageInvocation) = {
+ setTransaction(messageHandle.tx)
+
+ val message = messageHandle.message //serializeMessage(messageHandle.message)
+ val future = messageHandle.future
+ try {
+ senderFuture = future
+ if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
+ else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
+ } catch {
+ case e =>
+ // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
+ if (supervisor.isDefined) supervisor.get ! Exit(this, e)
+ if (future.isDefined) future.get.completeWithException(this, e)
+ else e.printStackTrace
+ } finally {
+ clearTransaction
+ }
+ }
+
+ private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
+ setTransaction(messageHandle.tx)
+
+ val message = messageHandle.message //serializeMessage(messageHandle.message)
+ val future = messageHandle.future
+
+ def proceed = {
+ try {
+ incrementTransaction
+ if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
+ else throw new IllegalArgumentException("Actor " + toString + " could not process message [" + message + "] since no matching 'case' clause in its 'receive' method could be found")
+ } finally {
+ decrementTransaction
+ }
+ }
+
+ try {
+ senderFuture = future
+ if (isTransactionRequiresNew && !isTransactionInScope) {
+ if (senderFuture.isEmpty) throw new StmException(
+ "\n\tCan't continue transaction in a one-way fire-forget message send" +
+ "\n\tE.g. using Actor '!' method or Active Object 'void' method" +
+ "\n\tPlease use the Actor '!!', '!?' methods or Active Object method with non-void return type")
+ atomic {
+ proceed
+ }
+ } else proceed
+ } catch {
+ case e =>
+ e.printStackTrace
+
+ if (future.isDefined) future.get.completeWithException(this, e)
+ else e.printStackTrace
+
+ clearTransaction // need to clear currentTransaction before call to supervisor
+
+ // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
+ if (supervisor.isDefined) supervisor.get ! Exit(this, e)
+ } finally {
+ clearTransaction
+ }
+ }
+
+ private def getResultOrThrowException[T](future: FutureResult): Option[T] =
+ if (future.exception.isDefined) throw future.exception.get._2
+ else future.result.asInstanceOf[Option[T]]
+
+ private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive)
+
+ private val lifeCycle: PartialFunction[Any, Unit] = {
+ case Init(config) => init(config)
+ case HotSwap(code) => hotswap = code
+ case Restart(reason) => restart(reason)
+ case Exit(dead, reason) => handleTrapExit(dead, reason)
+// case TransactionalInit => initTransactionalState
+ }
+
+ private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
+ if (trapExit) {
+ if (faultHandler.isDefined) {
+ faultHandler.get match {
+ // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
+ case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason)
+ case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason)
+ }
+ } else throw new IllegalStateException("No 'faultHandler' defined for actor with the 'trapExit' flag set to true - can't proceed " + toString)
+ } else {
+ if (supervisor.isDefined) supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on
+ }
+ }
+
+ private[this] def restartLinkedActors(reason: AnyRef) =
+ linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason))
+
+ private[Actor] def restart(reason: AnyRef) = synchronized {
+ lifeCycleConfig match {
+ case None => throw new IllegalStateException("Actor [" + id + "] does not have a life-cycle defined.")
+
+ // FIXME implement support for shutdown time
+ case Some(LifeCycle(scope, shutdownTime, _)) => {
+ scope match {
+ case Permanent => {
+ preRestart(reason, config)
+ log.info("Restarting actor [%s] configured as PERMANENT.", id)
+ postRestart(reason, config)
+ }
+
+ case Temporary =>
+ // FIXME handle temporary actors correctly - restart if exited normally
+// if (reason == 'normal) {
+// log.debug("Restarting actor [%s] configured as TEMPORARY (since exited naturally).", id)
+// scheduleRestart
+// } else
+ log.info("Actor [%s] configured as TEMPORARY will not be restarted (received unnatural exit message).", id)
+
+ case Transient =>
+ log.info("Actor [%s] configured as TRANSIENT will not be restarted.", id)
+ }
+ }
+ }
+ }
+
+ private[akka] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
+ if (supervisor.isDefined) {
+ RemoteClient.clientFor(remoteAddress.get).registerSupervisorForActor(this)
+ Some(supervisor.get.uuid)
+ } else None
+ }
+
+
+ private[akka] def swapDispatcher(disp: MessageDispatcher) = synchronized {
+ messageDispatcher = disp
+ mailbox = messageDispatcher.messageQueue
+ messageDispatcher.registerHandler(this, new ActorMessageInvoker(this))
+ }
+
+ private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
+ if (!message.isInstanceOf[String] &&
+ !message.isInstanceOf[Byte] &&
+ !message.isInstanceOf[Int] &&
+ !message.isInstanceOf[Long] &&
+ !message.isInstanceOf[Float] &&
+ !message.isInstanceOf[Double] &&
+ !message.isInstanceOf[Boolean] &&
+ !message.isInstanceOf[Char] &&
+ !message.isInstanceOf[Tuple2[_,_]] &&
+ !message.isInstanceOf[Tuple3[_,_,_]] &&
+ !message.isInstanceOf[Tuple4[_,_,_,_]] &&
+ !message.isInstanceOf[Tuple5[_,_,_,_,_]] &&
+ !message.isInstanceOf[Tuple6[_,_,_,_,_,_]] &&
+ !message.isInstanceOf[Tuple7[_,_,_,_,_,_,_]] &&
+ !message.isInstanceOf[Tuple8[_,_,_,_,_,_,_,_]] &&
+ !message.getClass.isArray &&
+ !message.isInstanceOf[List[_]] &&
+ !message.isInstanceOf[scala.collection.immutable.Map[_,_]] &&
+ !message.isInstanceOf[scala.collection.immutable.Set[_]] &&
+ !message.isInstanceOf[scala.collection.immutable.Tree[_,_]] &&
+ !message.getClass.isAnnotationPresent(Annotations.immutable)) {
+ Serializer.Java.deepClone(message)
+ } else message
+ } else message
+
+ override def toString(): String = "Actor[" + uuid + ":" + id + "]"
+}
+
+
+
+
diff --git a/docs/scaladocs-akka-actors/actor/ActorRegistry.scala.html b/docs/scaladocs-akka-actors/actor/ActorRegistry.scala.html
new file mode 100644
index 0000000000..ef16dc7115
--- /dev/null
+++ b/docs/scaladocs-akka-actors/actor/ActorRegistry.scala.html
@@ -0,0 +1,50 @@
+
+
+
+
+
+
+
+
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.actor
+
+import se.scalablesolutions.akka.util.Logging
+
+import scala.collection.mutable.HashMap
+
+/**
+ * Registry holding all actor instances, mapped by class.
+ *
+ * @author <a href="http://jonasboner.com">Jonas Bonér</a>
+ */
+object ActorRegistry extends Logging {
+ private val actors = new HashMap[String, List[Actor]]
+
+ def actorsFor(clazz: Class[_]): List[Actor] = actorsFor(clazz.getName)
+
+ def actorsFor(fqn : String): List[Actor] = synchronized {
+ actors.get(fqn) match {
+ case None => Nil
+ case Some(instances) => instances
+ }
+ }
+
+ def register(actor: Actor) = synchronized {
+ val name = actor.getClass.getName
+ actors.get(name) match {
+ case Some(instances) => actors + (name -> (actor :: instances))
+ case None => actors + (name -> (actor :: Nil))
+ }
+ }
+}
+
+
+
+
diff --git a/docs/scaladocs-akka-actors/actor/Scheduler.scala.html b/docs/scaladocs-akka-actors/actor/Scheduler.scala.html
new file mode 100644
index 0000000000..84db8c17dd
--- /dev/null
+++ b/docs/scaladocs-akka-actors/actor/Scheduler.scala.html
@@ -0,0 +1,104 @@
+
+
+
+
+
+
+
+
+/*
+ * Copyright 2007 WorldWide Conferencing, LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package se.scalablesolutions.akka.actor
+
+import java.util.concurrent._
+import config.ScalaConfig._
+import _root_.se.scalablesolutions.akka.util.{Logging}
+
+
+import org.scala_tools.javautils.Imports._
+
+case object UnSchedule
+case class SchedulerException(msg: String, e: Throwable) extends RuntimeException(msg, e)
+
+/**
+ * Rework of David Pollak's ActorPing class in the Lift Project
+ * which is licensed under the Apache 2 License.
+ */
+class ScheduleActor(val receiver: Actor, val future: ScheduledFuture[AnyRef]) extends Actor with Logging {
+ lifeCycleConfig = Some(LifeCycle(Permanent, 100))
+
+ def receive: PartialFunction[Any, Unit] = {
+ case UnSchedule =>
+ Scheduler.stopSupervising(this)
+ future.cancel(true)
+ stop
+ }
+}
+
+object Scheduler extends Actor {
+ private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
+ private val schedulers = new ConcurrentHashMap[Actor, Actor]
+ faultHandler = Some(OneForOneStrategy(5, 5000))
+ trapExit = true
+ start
+
+ def schedule(receiver: Actor, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = {
+ try {
+ startLink(new ScheduleActor(
+ receiver,
+ service.scheduleAtFixedRate(new java.lang.Runnable {
+ def run = receiver ! message;
+ }, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]))
+ } catch {
+ case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e)
+ }
+ }
+
+ def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
+
+ def stopSupervising(actor: Actor) = {
+ unlink(actor)
+ schedulers.remove(actor)
+ }
+
+ override def shutdown = {
+ schedulers.values.asScala.foreach(_ ! UnSchedule)
+ service.shutdown
+ }
+
+ def receive: PartialFunction[Any, Unit] = {
+ case _ => {} // ignore all messages
+ }
+}
+
+private object SchedulerThreadFactory extends ThreadFactory {
+ private var count = 0
+ val threadFactory = Executors.defaultThreadFactory()
+
+ def newThread(r: Runnable): Thread = {
+ val thread = threadFactory.newThread(r)
+ thread.setName("Scheduler-" + count)
+ thread.setDaemon(true)
+ thread
+ }
+}
+
+
+
+
+
+
diff --git a/docs/scaladocs-akka-actors/actor/Supervisor.scala.html b/docs/scaladocs-akka-actors/actor/Supervisor.scala.html
new file mode 100644
index 0000000000..2c23d063ff
--- /dev/null
+++ b/docs/scaladocs-akka-actors/actor/Supervisor.scala.html
@@ -0,0 +1,168 @@
+
+
+
+
+
+
+
+
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.actor
+
+import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.config.{ConfiguratorRepository, Configurator}
+import se.scalablesolutions.akka.util.Helpers._
+import se.scalablesolutions.akka.util.Logging
+import se.scalablesolutions.akka.dispatch.Dispatchers
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.mutable.HashMap
+
+/**
+ * Messages that the supervisor responds to and returns.
+ *
+ * @author <a href="http://jonasboner.com">Jonas Bonér</a>
+ */
+sealed abstract class SupervisorMessage
+case object StartSupervisor extends SupervisorMessage
+case object StopSupervisor extends SupervisorMessage
+case class ConfigureSupervisor(config: SupervisorConfig, factory: SupervisorFactory) extends SupervisorMessage
+case object ConfigSupervisorSuccess extends SupervisorMessage
+
+sealed abstract class FaultHandlingStrategy
+case class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy
+case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy
+
+/**
+ * Abstract base class for all supervisor factories.
+ * <p>
+ * Example usage:
+ * <pre>
+ * class MySupervisorFactory extends SupervisorFactory {
+ *
+ * override protected def getSupervisorConfig: SupervisorConfig = {
+ * SupervisorConfig(
+ * RestartStrategy(OneForOne, 3, 10),
+ * Supervise(
+ * myFirstActor,
+ * LifeCycle(Permanent, 1000))
+ * ::
+ * Supervise(
+ * mySecondActor,
+ * LifeCycle(Permanent, 1000))
+ * :: Nil)
+ * }
+ * }
+ * </pre>
+ *
+ * Then create a concrete factory in which we mix in support for the specific implementation of the Service we want to use.
+ *
+ * <pre>
+ * object factory extends MySupervisorFactory
+ * </pre>
+ *
+ * Then create a new Supervisor tree with the concrete Services we have defined.
+ *
+ * <pre>
+ * val supervisor = factory.newSupervisor
+ * supervisor ! Start // start up all managed servers
+ * </pre>
+ *
+ * @author <a href="http://jonasboner.com">Jonas Bonér</a>
+ */
+abstract class SupervisorFactory extends Logging {
+ def newSupervisor: Supervisor = newSupervisorFor(getSupervisorConfig)
+
+ def newSupervisorFor(config: SupervisorConfig): Supervisor = config match {
+ case SupervisorConfig(restartStrategy, _) =>
+ val supervisor = create(restartStrategy)
+ supervisor.start
+ supervisor.configure(config, this)
+ supervisor
+ }
+
+ /**
+ * To be overridden by concrete factory.
+ * Should return the SupervisorConfig for the supervisor.
+ */
+ protected def getSupervisorConfig: SupervisorConfig
+
+ protected def create(strategy: RestartStrategy): Supervisor = strategy match {
+ case RestartStrategy(scheme, maxNrOfRetries, timeRange) =>
+ scheme match {
+ case AllForOne => new Supervisor(AllForOneStrategy(maxNrOfRetries, timeRange))
+ case OneForOne => new Supervisor(OneForOneStrategy(maxNrOfRetries, timeRange))
+ }
+ }
+}
+
+/**
+ * <b>NOTE:</b>
+ * <p/>
+ * The supervisor class is only used for the configuration system when configuring supervisor hierarchies declaratively.
+ * Should not be used in development. Instead wire the actors together using 'link', 'spawnLink' etc. and set the 'trapExit'
+ * flag in the actors that should trap error signals and trigger restart.
+ * <p/>
+ * See the ScalaDoc for the SupervisorFactory for an example on how to declaratively wire up actors.
+ *
+ * @author <a href="http://jonasboner.com">Jonas Bonér</a>
+ */
+class Supervisor private[akka] (handler: FaultHandlingStrategy) extends Actor with Logging with Configurator {
+ trapExit = true
+ faultHandler = Some(handler)
+ //dispatcher = Dispatchers.newThreadBasedDispatcher(this)
+
+ val actors = new ConcurrentHashMap[String, Actor]
+
+ def getInstance[T](clazz: Class[T]) = actors.get(clazz.getName).asInstanceOf[T]
+
+ def getComponentInterfaces: List[Class[_]] = actors.values.toArray.toList.map(_.getClass)
+
+ def isDefined(clazz: Class[_]): Boolean = actors.containsKey(clazz.getName)
+
+ def startSupervisor = {
+ ConfiguratorRepository.registerConfigurator(this)
+ actors.values.toArray.toList.foreach(println)
+ start
+ this ! StartSupervisor
+ }
+
+ def stopSupervisor = this ! StopSupervisor
+
+ protected def receive: PartialFunction[Any, Unit] = {
+ case StartSupervisor =>
+ linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.start; log.info("Starting actor: %s", actor) }
+
+ case StopSupervisor =>
+ linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach { actor => actor.stop; log.info("Stopping actor: %s", actor) }
+ log.info("Stopping supervisor: %s", this)
+ stop
+ }
+
+ def configure(config: SupervisorConfig, factory: SupervisorFactory) = config match {
+ case SupervisorConfig(_, servers) =>
+ servers.map(server =>
+ server match {
+ case Supervise(actor, lifecycle) =>
+ actors.put(actor.getClass.getName, actor)
+ actor.lifeCycleConfig = Some(lifecycle)
+ startLink(actor)
+
+ case SupervisorConfig(_, _) => // recursive configuration
+ val supervisor = factory.newSupervisorFor(server.asInstanceOf[SupervisorConfig])
+ supervisor ! StartSupervisor
+ // FIXME what to do with recursively supervisors?
+ })
+ }
+}
+
+
+
+
diff --git a/docs/scaladocs-akka-actors/all-classes.css b/docs/scaladocs-akka-actors/all-classes.css
new file mode 100644
index 0000000000..e25638b37d
--- /dev/null
+++ b/docs/scaladocs-akka-actors/all-classes.css
@@ -0,0 +1,13 @@
+body{font-size:10pt;font-family:sans-serif;}
+h2{background-color:#EEE;border:1px solid #999;color:#900;font-family:sans-serif;font-weight:bold;padding:.3em;}
+a{text-decoration:none;}
+div.ctrl{text-align:center;}
+select#packagesFilter,input#nameFilter{width:100%;}
+#classes{margin-left:0;padding-left:0;list-style:none;}
+.trait,.object,.class{padding-left:17px;background-repeat:no-repeat;background-position:0 0;}
+.trait{color:#5C4AA0;background-image:url(_images/trait.png);font-style:italic;}
+.class{color:#33814B;background-image:url(_images/class.png);}
+.object{color:#892020;background-image:url(_images/object.png);}
+#kindFilters *{font-size:75%;font-weight:bold;font-style:normal;}
+#classes a:active,#classes a:hover{color:#900;text-decoration:underline;}
+#classes a:link,#classes a:visited{color:#009;font-family:sans-serif;text-decoration:none;}
\ No newline at end of file
diff --git a/docs/scaladocs-akka-actors/all-classes.html b/docs/scaladocs-akka-actors/all-classes.html
new file mode 100644
index 0000000000..6bc73b5105
--- /dev/null
+++ b/docs/scaladocs-akka-actors/all-classes.html
@@ -0,0 +1,42 @@
+
+
+
+
+
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.config
+
+import JavaConfig._
+
+import com.google.inject._
+
+import java.util._
+//import org.apache.camel.impl.{JndiRegistry, DefaultCamelContext}
+//import org.apache.camel.{Endpoint, Routes}
+
+/**
+ * Configurator for the Active Objects. Used to do declarative configuration of supervision.
+ * It also doing dependency injection with and into Active Objects using dependency injection
+ * frameworks such as Google Guice or Spring.
+ * <p/>
+ * If you don't want declarative configuration then you should use the <code>ActiveObject</code>
+ * factory methods.
+ *
+ * @author <a href="http://jonasboner.com">Jonas Bonér</a>
+ */
+class ActiveObjectConfigurator {
+ // TODO: make pluggable once we have f.e a SpringConfigurator
+ private val INSTANCE = new ActiveObjectGuiceConfigurator
+
+ /**
+ * Returns the active abject that has been put under supervision for the class specified.
+ *
+ * @param clazz the class for the active object
+ * @return the active object for the class
+ */
+ def getInstance[T](clazz: Class[T]): T = INSTANCE.getInstance(clazz)
+
+ def configure(restartStrategy: RestartStrategy, components: Array[Component]): ActiveObjectConfigurator = {
+ INSTANCE.configure(
+ restartStrategy.transform,
+ components.toList.asInstanceOf[scala.List[Component]].map(_.transform))
+ this
+ }
+
+ def inject: ActiveObjectConfigurator = {
+ INSTANCE.inject
+ this
+ }
+
+ def supervise: ActiveObjectConfigurator = {
+ INSTANCE.supervise
+ this
+ }
+
+ def addExternalGuiceModule(module: Module): ActiveObjectConfigurator = {
+ INSTANCE.addExternalGuiceModule(module)
+ this
+ }
+
+ //def addRoutes(routes: Routes): ActiveObjectConfigurator = {
+ // INSTANCE.addRoutes(routes)
+ // this
+ // }
+
+
+ def getComponentInterfaces: List[Class[_]] = {
+ val al = new ArrayList[Class[_]]
+ for (c <- INSTANCE.getComponentInterfaces) al.add(c)
+ al
+ }
+
+ def getExternalDependency[T](clazz: Class[T]): T = INSTANCE.getExternalDependency(clazz)
+
+ //def getRoutingEndpoint(uri: String): Endpoint = INSTANCE.getRoutingEndpoint(uri)
+
+ //def getRoutingEndpoints: java.util.Collection[Endpoint] = INSTANCE.getRoutingEndpoints
+
+ //def getRoutingEndpoints(uri: String): java.util.Collection[Endpoint] = INSTANCE.getRoutingEndpoints(uri)
+
+ // TODO: should this be exposed?
+ def getGuiceModules: List[Module] = INSTANCE.getGuiceModules
+
+ def reset = INSTANCE.reset
+
+ def stop = INSTANCE.stop
+}
+
+
+
+