Merge branch 'master' into derekjw-future-dispatch

This commit is contained in:
Derek Williams 2011-03-07 18:57:08 -07:00
commit eee9445be8
17 changed files with 1968 additions and 1110 deletions

View file

@ -23,15 +23,13 @@ import java.net.{InetAddress, UnknownHostException}
import AkkaException._ import AkkaException._
val exceptionName = getClass.getName val exceptionName = getClass.getName
lazy val uuid = "%s_%s".format(hostname, newUuid) val uuid = "%s_%s".format(hostname, newUuid)
override lazy val toString = "%s\n\t[%s]\n\t%s\n\t%s".format(exceptionName, uuid, message, stackTrace) override val toString = "%s\n\t[%s]\n\t%s\n\t%s".format(exceptionName, uuid, message, {
lazy val stackTrace = {
val sw = new StringWriter val sw = new StringWriter
printStackTrace(new PrintWriter(sw)) printStackTrace(new PrintWriter(sw))
sw.toString sw.toString
} })
} }
object AkkaException { object AkkaException {

View file

@ -7,6 +7,7 @@ package akka.actor
import akka.dispatch._ import akka.dispatch._
import akka.config.Config._ import akka.config.Config._
import akka.config.Supervision._ import akka.config.Supervision._
import akka.config.ConfigurationException
import akka.util.Helpers.{narrow, narrowSilently} import akka.util.Helpers.{narrow, narrowSilently}
import akka.util.ListenerManagement import akka.util.ListenerManagement
import akka.AkkaException import akka.AkkaException
@ -78,16 +79,16 @@ class ActorTimeoutException private[akka](message: String) extends AkkaEx
* *
* Create, add and remove a listener: * Create, add and remove a listener:
* <pre> * <pre>
* val errorHandlerEventListener = new Actor { * val errorHandlerEventListener = Actor.actorOf(new Actor {
* self.dispatcher = EventHandler.EventHandlerDispatcher * self.dispatcher = EventHandler.EventHandlerDispatcher
* *
* def receive = { * def receive = {
* case EventHandler.Error(cause, instance, message) => ... * case EventHandler.Error(cause, instance, message) => ...
* case EventHandler.Warning(cause, instance, message) => ... * case EventHandler.Warning(instance, message) => ...
* case EventHandler.Info(instance, message) => ... * case EventHandler.Info(instance, message) => ...
* case EventHandler.Debug(instance, message) => ... * case EventHandler.Debug(instance, message) => ...
* } * }
* } * })
* *
* EventHandler.addListener(errorHandlerEventListener) * EventHandler.addListener(errorHandlerEventListener)
* ... * ...
@ -96,7 +97,7 @@ class ActorTimeoutException private[akka](message: String) extends AkkaEx
* *
* Log an error event: * Log an error event:
* <pre> * <pre>
* EventHandler notifyListeners EventHandler.Error(exception, this, message.toString) * EventHandler.notifyListeners(EventHandler.Error(exception, this, message.toString))
* </pre> * </pre>
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
@ -110,12 +111,12 @@ object EventHandler extends ListenerManagement {
val thread: Thread = Thread.currentThread val thread: Thread = Thread.currentThread
} }
case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event case class Error(cause: Throwable, instance: AnyRef, message: String = "") extends Event
case class Warning(cause: Throwable, instance: AnyRef, message: String = "") extends Event case class Warning(instance: AnyRef, message: String = "") extends Event
case class Info(instance: AnyRef, message: String = "") extends Event case class Info(instance: AnyRef, message: String = "") extends Event
case class Debug(instance: AnyRef, message: String = "") extends Event case class Debug(instance: AnyRef, message: String = "") extends Event
val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern val error = "[ERROR] [%s] [%s] [%s] %s\n%s".intern
val warning = "[WARN] [%s] [%s] [%s] %s\n%s".intern val warning = "[WARN] [%s] [%s] [%s] %s".intern
val info = "[INFO] [%s] [%s] [%s] %s".intern val info = "[INFO] [%s] [%s] [%s] %s".intern
val debug = "[DEBUG] [%s] [%s] [%s] %s".intern val debug = "[DEBUG] [%s] [%s] [%s] %s".intern
val ID = "default:error:handler".intern val ID = "default:error:handler".intern
@ -143,13 +144,12 @@ object EventHandler extends ListenerManagement {
instance.getClass.getSimpleName, instance.getClass.getSimpleName,
message, message,
stackTraceFor(cause))) stackTraceFor(cause)))
case event @ Warning(cause, instance, message) => case event @ Warning(instance, message) =>
println(warning.format( println(warning.format(
formattedTimestamp, formattedTimestamp,
event.thread.getName, event.thread.getName,
instance.getClass.getSimpleName, instance.getClass.getSimpleName,
message, message))
stackTraceFor(cause)))
case event @ Info(instance, message) => case event @ Info(instance, message) =>
println(info.format( println(info.format(
formattedTimestamp, formattedTimestamp,
@ -165,9 +165,19 @@ object EventHandler extends ListenerManagement {
case _ => {} case _ => {}
} }
} }
if (config.getBool("akka.default-error-handler", true)) config.getList("akka.event-handlers") foreach { listenerName =>
addListener(Actor.actorOf[DefaultListener].start) // FIXME configurable in config (on/off) try {
val clazz = Thread.currentThread.getContextClassLoader.loadClass(listenerName).asInstanceOf[Class[_]]
addListener(Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]).start)
} catch {
case e: Exception =>
e.printStackTrace
new ConfigurationException(
"Event Handler specified in config can't be loaded [" + listenerName +
"] due to [" + e.toString + "]")
}
}
} }
/** /**

View file

@ -7,11 +7,10 @@ package akka.dispatch
import akka.actor.{Actor, ActorRef} import akka.actor.{Actor, ActorRef}
import akka.actor.newUuid import akka.actor.newUuid
import akka.config.Config._ import akka.config.Config._
import akka.util.{Duration} import akka.util.{Duration,ReflectiveAccess}
import akka.config.ConfigMap import akka.config.ConfigMap
import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy}
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
/** /**
@ -165,6 +164,7 @@ object Dispatchers {
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable * type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
* # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven, * # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
* # GlobalExecutorBasedEventDriven * # GlobalExecutorBasedEventDriven
* # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
* keep-alive-time = 60 # Keep alive time for threads * keep-alive-time = 60 # Keep alive time for threads
* core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor) * core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
* max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor) * max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
@ -178,54 +178,52 @@ object Dispatchers {
* Gotcha: Only configures the dispatcher if possible * Gotcha: Only configures the dispatcher if possible
* Returns: None if "type" isn't specified in the config * Returns: None if "type" isn't specified in the config
* Throws: IllegalArgumentException if the value of "type" is not valid * Throws: IllegalArgumentException if the value of "type" is not valid
* IllegalArgumentException if it cannot
*/ */
def from(cfg: ConfigMap): Option[MessageDispatcher] = { def from(cfg: ConfigMap): Option[MessageDispatcher] = {
lazy val name = cfg.getString("name", newUuid.toString) cfg.getString("type") map {
case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcherConfigurator()
def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = { case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator()
import ThreadPoolConfigDispatcherBuilder.conf_? case "GlobalExecutorBasedEventDriven" => GlobalExecutorBasedEventDrivenDispatcherConfigurator
case fqn =>
//Apply the following options to the config if they are present in the cfg ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure( case Some(clazz) =>
conf_?(cfg getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))), val instance = ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]())
conf_?(cfg getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)), if (instance.isEmpty)
conf_?(cfg getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)), throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn)
conf_?(cfg getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)), else
conf_?(cfg getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)), instance.get
conf_?(cfg getString "rejection-policy" map { case None =>
case "abort" => new AbortPolicy() throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn)
case "caller-runs" => new CallerRunsPolicy() }
case "discard-oldest" => new DiscardOldestPolicy() } map {
case "discard" => new DiscardPolicy() _ configure cfg
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) }
})(policy => _.setRejectionPolicy(policy)))
}
lazy val mailboxType: MailboxType = {
val capacity = cfg.getInt("mailbox-capacity", MAILBOX_CAPACITY)
// FIXME how do we read in isBlocking for mailbox? Now set to 'false'.
if (capacity < 0) UnboundedMailbox()
else BoundedMailbox(false, capacity, Duration(cfg.getInt("mailbox-push-timeout-time", MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT))
}
cfg.getString("type") map {
case "ExecutorBasedEventDriven" =>
configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenDispatcher(
name,
cfg.getInt("throughput", THROUGHPUT),
cfg.getInt("throughput-deadline-time", THROUGHPUT_DEADLINE_TIME_MILLIS),
mailboxType,
threadPoolConfig)).build
case "ExecutorBasedEventDrivenWorkStealing" =>
configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher(
name,
cfg.getInt("throughput", THROUGHPUT),
cfg.getInt("throughput-deadline-time", THROUGHPUT_DEADLINE_TIME_MILLIS),
mailboxType,
threadPoolConfig)).build
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
}
} }
} }
object GlobalExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
def configure(config: ConfigMap): MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
}
class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
def configure(config: ConfigMap): MessageDispatcher = {
configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenDispatcher(
config.getString("name", newUuid.toString),
config.getInt("throughput", Dispatchers.THROUGHPUT),
config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
mailboxType(config),
threadPoolConfig)).build
}
}
class ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator extends MessageDispatcherConfigurator {
def configure(config: ConfigMap): MessageDispatcher = {
configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher(
config.getString("name", newUuid.toString),
config.getInt("throughput", Dispatchers.THROUGHPUT),
config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
mailboxType(config),
threadPoolConfig)).build
}
}

View file

@ -141,7 +141,7 @@ class ExecutorBasedEventDrivenDispatcher(
executorService.get() execute mbox executorService.get() execute mbox
} catch { } catch {
case e: RejectedExecutionException => case e: RejectedExecutionException =>
EventHandler notifyListeners EventHandler.Warning(e, this, _name) EventHandler notifyListeners EventHandler.Warning(this, e.toString)
mbox.dispatcherLock.unlock() mbox.dispatcherLock.unlock()
throw e throw e
} }

View file

@ -6,8 +6,10 @@ package akka.dispatch
import java.util.concurrent._ import java.util.concurrent._
import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong} import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong}
import akka.config.ConfigMap
import akka.util.{Switch, ReentrantGuard, HashCode, ReflectiveAccess} import akka.config.Config.TIME_UNIT
import akka.util.{Duration, Switch, ReentrantGuard, HashCode, ReflectiveAccess}
import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy}
import akka.actor._ import akka.actor._
/** /**
@ -202,3 +204,36 @@ trait MessageDispatcher {
*/ */
def mailboxSize(actorRef: ActorRef): Int def mailboxSize(actorRef: ActorRef): Int
} }
/**
* Trait to be used for hooking in new dispatchers into Dispatchers.fromConfig
*/
abstract class MessageDispatcherConfigurator {
def configure(config: ConfigMap): MessageDispatcher
def mailboxType(config: ConfigMap): MailboxType = {
val capacity = config.getInt("mailbox-capacity", Dispatchers.MAILBOX_CAPACITY)
// FIXME how do we read in isBlocking for mailbox? Now set to 'false'.
if (capacity < 0) UnboundedMailbox()
else BoundedMailbox(false, capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT))
}
def configureThreadPool(config: ConfigMap, createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
import ThreadPoolConfigDispatcherBuilder.conf_?
//Apply the following options to the config if they are present in the config
ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure(
conf_?(config getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
conf_?(config getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)),
conf_?(config getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)),
conf_?(config getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)),
conf_?(config getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)),
conf_?(config getString "rejection-policy" map {
case "abort" => new AbortPolicy()
case "caller-runs" => new CallerRunsPolicy()
case "discard-oldest" => new DiscardOldestPolicy()
case "discard" => new DiscardPolicy()
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
})(policy => _.setRejectionPolicy(policy)))
}
}

View file

@ -208,7 +208,7 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
}) })
} catch { } catch {
case e: RejectedExecutionException => case e: RejectedExecutionException =>
EventHandler notifyListeners EventHandler.Warning(e, this) EventHandler notifyListeners EventHandler.Warning(this, e.toString)
semaphore.release semaphore.release
case e: Throwable => case e: Throwable =>
EventHandler notifyListeners EventHandler.Error(e, this) EventHandler notifyListeners EventHandler.Error(e, this)

View file

@ -216,7 +216,33 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
private[akka] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef] private[akka] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef]
def clear { def clear {
List(actors,actorsByUuid,actorsFactories,typedActors,typedActorsByUuid,typedActorsFactories) foreach (_.clear) def clearActorMap(map: ConcurrentHashMap[String, ActorRef]) {
val i = map.values.iterator
while (i.hasNext) {
i.next match {
case ref: LocalActorRef => try { ref.stop } catch { case e: Exception => }
case _ =>
}
}
map.clear
}
def clearTypedActorMap(map: ConcurrentHashMap[String, AnyRef]) {
ReflectiveAccess.TypedActorModule.typedActorObjectInstance foreach {
case typedActor =>
val i = map.values.iterator
//FIXME Only stop local TypedActor?
while (i.hasNext) { try { typedActor.stop(i.next) } catch { case e: Exception => } }
}
map.clear
}
clearActorMap(actors)
clearActorMap(actorsByUuid)
clearTypedActorMap(typedActors)
clearTypedActorMap(typedActorsByUuid)
actorsFactories.clear
typedActorsFactories.clear
} }
} }

View file

@ -1,25 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.http
import akka.config.Config
import akka.util.{Bootable}
import akka.remote.BootableRemoteActorService
import akka.actor.BootableActorLoaderService
import akka.servlet.AkkaLoader
class DefaultAkkaLoader extends AkkaLoader {
def boot(): Unit = boot(true, new EmbeddedAppServer with BootableActorLoaderService with BootableRemoteActorService)
}
/**
* Can be used to boot Akka
*
* java -cp ... akka.http.Main
*/
object Main extends DefaultAkkaLoader {
def main(args: Array[String]) = boot
}

View file

@ -1,73 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.http
import javax.ws.rs.core.UriBuilder
import javax.servlet.ServletConfig
import java.io.File
import akka.actor.BootableActorLoaderService
import akka.util.Bootable
import org.eclipse.jetty.xml.XmlConfiguration
import org.eclipse.jetty.server.{Handler, Server}
import org.eclipse.jetty.server.handler.{HandlerList, HandlerCollection, ContextHandler}
import java.net.URL
import akka.AkkaException
/**
* Handles the Akka Comet Support (load/unload)
*/
trait EmbeddedAppServer extends Bootable {
self: BootableActorLoaderService =>
import akka.config.Config._
val REST_HOSTNAME = config.getString("akka.http.hostname", "localhost")
val REST_PORT = config.getInt("akka.http.port", 9998)
val isRestEnabled = config.getList("akka.enabled-modules").exists(_ == "http")
protected var server: Option[Server] = None
protected def findJettyConfigXML: Option[URL] =
Option(applicationLoader.getOrElse(this.getClass.getClassLoader).getResource("microkernel-server.xml")) orElse
HOME.map(home => new File(home + "/config/microkernel-server.xml").toURI.toURL)
abstract override def onLoad = {
super.onLoad
if (isRestEnabled) {
val configuration = new XmlConfiguration(findJettyConfigXML.getOrElse(error("microkernel-server.xml not found!")))
System.setProperty("jetty.port", REST_PORT.toString)
System.setProperty("jetty.host", REST_HOSTNAME)
HOME.foreach( home => System.setProperty("jetty.home", home + "/deploy/root") )
server = Option(configuration.configure.asInstanceOf[Server]) map { s => //Set the correct classloader to our contexts
applicationLoader foreach { loader =>
//We need to provide the correct classloader to the servlets
def setClassLoader(handlers: Seq[Handler]): Unit = {
handlers foreach {
case c: ContextHandler => c.setClassLoader(loader)
case c: HandlerCollection => setClassLoader(c.getHandlers)
case _ =>
}
}
setClassLoader(s.getHandlers)
}
//Start the server
s.start()
s
}
}
}
abstract override def onUnload = {
super.onUnload
server foreach { _.stop() }
}
}

View file

@ -11,6 +11,11 @@ option optimize_for = SPEED;
protoc RemoteProtocol.proto --java_out ../java protoc RemoteProtocol.proto --java_out ../java
*******************************************/ *******************************************/
message AkkaRemoteProtocol {
optional RemoteMessageProtocol message = 1;
optional RemoteControlProtocol instruction = 2;
}
/** /**
* Defines a remote message. * Defines a remote message.
*/ */
@ -26,6 +31,21 @@ message RemoteMessageProtocol {
optional string cookie = 9; optional string cookie = 9;
} }
/**
* Defines some control messages for the remoting
*/
message RemoteControlProtocol {
optional string cookie = 1;
required CommandType commandType = 2;
}
/**
* Defines the type of the RemoteControlProtocol command type
*/
enum CommandType {
SHUTDOWN = 1;
}
/** /**
* Defines a remote ActorRef that "remembers" and uses its original Actor instance * Defines a remote ActorRef that "remembers" and uses its original Actor instance
* on the original node. * on the original node.

View file

@ -13,10 +13,9 @@ import akka.serialization.RemoteActorSerialization._
import akka.japi.Creator import akka.japi.Creator
import akka.config.Config._ import akka.config.Config._
import akka.remoteinterface._ import akka.remoteinterface._
import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType} import akka.actor.{EventHandler, Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
import akka.AkkaException import akka.AkkaException
import akka.actor.Actor._ import akka.actor.Actor._
import akka.actor.{EventHandler}
import akka.util._ import akka.util._
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings} import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
@ -39,6 +38,20 @@ import scala.reflect.BeanProperty
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean} import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean}
object RemoteEncoder {
def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
val arp = AkkaRemoteProtocol.newBuilder
arp.setMessage(rmp)
arp.build
}
def encode(rcp: RemoteControlProtocol): AkkaRemoteProtocol = {
val arp = AkkaRemoteProtocol.newBuilder
arp.setInstruction(rcp)
arp.build
}
}
trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement => trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement =>
private val remoteClients = new HashMap[Address, RemoteClient] private val remoteClients = new HashMap[Address, RemoteClient]
private val remoteActors = new Index[Address, Uuid] private val remoteActors = new Index[Address, Uuid]
@ -197,7 +210,7 @@ abstract class RemoteClient private[akka] (
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
if (isRunning) { if (isRunning) {
if (request.getOneWay) { if (request.getOneWay) {
currentChannel.write(request).addListener(new ChannelFutureListener { currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture) { def operationComplete(future: ChannelFuture) {
if (future.isCancelled) { if (future.isCancelled) {
//We don't care about that right now //We don't care about that right now
@ -212,7 +225,7 @@ abstract class RemoteClient private[akka] (
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow) val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
futures.put(futureUuid, futureResult) //Add this prematurely, remove it if write fails futures.put(futureUuid, futureResult) //Add this prematurely, remove it if write fails
currentChannel.write(request).addListener(new ChannelFutureListener { currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture) { def operationComplete(future: ChannelFuture) {
if (future.isCancelled) { if (future.isCancelled) {
futures.remove(futureUuid) //Clean this up futures.remove(futureUuid) //Clean this up
@ -280,6 +293,7 @@ class ActiveRemoteClient private[akka] (
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress)) notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
false false
} else { } else {
//Add a task that does GCing of expired Futures
timer.newTimeout(new TimerTask() { timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = { def run(timeout: Timeout) = {
if(isRunning) { if(isRunning) {
@ -327,10 +341,7 @@ class ActiveRemoteClient private[akka] (
reconnectionTimeWindowStart = System.currentTimeMillis reconnectionTimeWindowStart = System.currentTimeMillis
true true
} else { } else {
val timeLeft = RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart) /*Time left > 0*/ (RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0
if (timeLeft > 0) {
true
} else false
} }
} }
@ -363,7 +374,7 @@ class ActiveRemoteClientPipelineFactory(
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt) val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4) val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance) val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder val protobufEnc = new ProtobufEncoder
val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match { val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) case "zlib" => (join(new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
@ -400,7 +411,13 @@ class ActiveRemoteClientHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
try { try {
event.getMessage match { event.getMessage match {
case reply: RemoteMessageProtocol => case arp: AkkaRemoteProtocol if arp.hasInstruction =>
val rcp = arp.getInstruction
rcp.getCommandType match {
case CommandType.SHUTDOWN => spawn { client.shutdown }
}
case arp: AkkaRemoteProtocol if arp.hasMessage =>
val reply = arp.getMessage
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow) val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]] val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]]
@ -423,7 +440,6 @@ class ActiveRemoteClientHandler(
future.completeWithException(exception) future.completeWithException(exception)
} }
case other => case other =>
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress) throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress)
} }
@ -552,6 +568,14 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
def shutdown { def shutdown {
try { try {
val shutdownSignal = {
val b = RemoteControlProtocol.newBuilder
if (RemoteClientSettings.SECURE_COOKIE.nonEmpty)
b.setCookie(RemoteClientSettings.SECURE_COOKIE.get)
b.setCommandType(CommandType.SHUTDOWN)
b.build
}
openChannels.write(RemoteEncoder.encode(shutdownSignal)).awaitUninterruptibly
openChannels.disconnect openChannels.disconnect
openChannels.close.awaitUninterruptibly openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources bootstrap.releaseExternalResources
@ -765,7 +789,7 @@ class RemoteServerPipelineFactory(
val ssl = if(SECURE) join(new SslHandler(engine)) else join() val ssl = if(SECURE) join(new SslHandler(engine)) else join()
val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4) val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4) val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance) val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder val protobufEnc = new ProtobufEncoder
val (enc, dec) = COMPRESSION_SCHEME match { val (enc, dec) = COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) case "zlib" => (join(new ZlibEncoder(ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
@ -796,8 +820,8 @@ class RemoteServerHandler(
val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]]() val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]]()
//Writes the specified message to the specified channel and propagates write errors to listeners //Writes the specified message to the specified channel and propagates write errors to listeners
private def write(channel: Channel, message: AnyRef): Unit = { private def write(channel: Channel, payload: AkkaRemoteProtocol): Unit = {
channel.write(message).addListener( channel.write(payload).addListener(
new ChannelFutureListener { new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = { def operationComplete(future: ChannelFuture): Unit = {
if (future.isCancelled) { if (future.isCancelled) {
@ -807,7 +831,7 @@ class RemoteServerHandler(
case i: InetSocketAddress => Some(i) case i: InetSocketAddress => Some(i)
case _ => None case _ => None
} }
server.notifyListeners(RemoteServerWriteFailed(message, future.getCause, server, socketAddress)) server.notifyListeners(RemoteServerWriteFailed(payload, future.getCause, server, socketAddress))
} }
} }
}) })
@ -871,7 +895,9 @@ class RemoteServerHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = event.getMessage match { override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = event.getMessage match {
case null => throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event) case null => throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
case requestProtocol: RemoteMessageProtocol => //case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasInstruction => RemoteServer cannot receive control messages (yet)
case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasMessage =>
val requestProtocol = remoteProtocol.getMessage
if (REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx) if (REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx)
handleRemoteMessageProtocol(requestProtocol, event.getChannel) handleRemoteMessageProtocol(requestProtocol, event.getChannel)
case _ => //ignore case _ => //ignore
@ -952,7 +978,7 @@ class RemoteServerHandler(
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
write(channel, messageBuilder.build) write(channel, RemoteEncoder.encode(messageBuilder.build))
} }
} }
) )
@ -988,7 +1014,7 @@ class RemoteServerHandler(
None) None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
write(channel, messageBuilder.build) write(channel, RemoteEncoder.encode(messageBuilder.build))
} catch { } catch {
case e: Exception => case e: Exception =>
EventHandler notifyListeners EventHandler.Error(e, this) EventHandler notifyListeners EventHandler.Error(e, this)
@ -1157,7 +1183,7 @@ class RemoteServerHandler(
} }
} }
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): RemoteMessageProtocol = { private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): AkkaRemoteProtocol = {
val actorInfo = request.getActorInfo val actorInfo = request.getActorInfo
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
None, None,
@ -1172,7 +1198,7 @@ class RemoteServerHandler(
actorType, actorType,
None) None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
messageBuilder.build RemoteEncoder.encode(messageBuilder.build)
} }
private def authenticateRemoteClient(request: RemoteMessageProtocol, ctx: ChannelHandlerContext) = { private def authenticateRemoteClient(request: RemoteMessageProtocol, ctx: ChannelHandlerContext) = {
@ -1212,4 +1238,4 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
throw new IllegalStateException("ChannelGroup already closed, cannot add new channel") throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
} }
} }
} }

View file

@ -47,7 +47,6 @@ class AkkaRemoteTest extends
override def beforeEach { override def beforeEach {
remote.start(host,port) remote.start(host,port)
Thread.sleep(2000)
super.beforeEach super.beforeEach
} }

View file

@ -445,9 +445,8 @@ object TypedActor {
* @param intfClass interface the typed actor implements * @param intfClass interface the typed actor implements
* @param targetClass implementation class of the typed actor * @param targetClass implementation class of the typed actor
*/ */
def newInstance[T](intfClass: Class[T], targetClass: Class[_]): T = { def newInstance[T](intfClass: Class[T], targetClass: Class[_]): T =
newInstance(intfClass, targetClass, TypedActorConfiguration()) newInstance(intfClass, targetClass, TypedActorConfiguration())
}
/** /**
* Factory method for typed actor. * Factory method for typed actor.
@ -759,7 +758,6 @@ object TypedActor {
val typedActor = val typedActor =
if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor] if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
else throw new IllegalArgumentException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'") else throw new IllegalArgumentException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'")
typedActor.preStart
typedActor typedActor
} }
@ -768,7 +766,6 @@ object TypedActor {
val typedActor = val typedActor =
if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor] if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
else throw new IllegalArgumentException("Actor [" + instance.getClass.getName + "] is not a sub class of 'TypedActor'") else throw new IllegalArgumentException("Actor [" + instance.getClass.getName + "] is not a sub class of 'TypedActor'")
typedActor.preStart
typedActor typedActor
} }

View file

@ -0,0 +1,51 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.actor
import org.scalatest.Spec
import org.scalatest.Assertions
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterEach
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import Issue675Spec._
object Issue675Spec {
var l = collection.mutable.ListBuffer.empty[String]
trait RegistrationService {
def register(user: String, cred: String): Unit
}
class RegistrationServiceImpl extends TypedActor with RegistrationService {
def register(user: String, cred: String): Unit = {}
override def preStart() {
l += "RegistrationServiceImpl.preStart() called"
}
}
}
@RunWith(classOf[JUnitRunner])
class Issue675Spec extends
Spec with
ShouldMatchers with
BeforeAndAfterEach {
override def afterEach() {
Actor.registry.shutdownAll
}
describe("TypedActor preStart method") {
it("should be invoked once") {
import Issue675Spec._
val simplePojo = TypedActor.newInstance(classOf[RegistrationService], classOf[RegistrationServiceImpl])
l.size should equal(1)
}
}
}

View file

@ -12,7 +12,7 @@ akka {
time-unit = "seconds" # Time unit for all timeout properties throughout the config time-unit = "seconds" # Time unit for all timeout properties throughout the config
default-error-handler = on # register the default error handler listener which logs errors to STDOUT event-handlers = ["akka.actor.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT)
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up # These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
# Can be used to bootstrap your application(s) # Can be used to bootstrap your application(s)

View file

@ -98,7 +98,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo) lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
lazy val glassfishModuleConfig = ModuleConfiguration("org.glassfish", GlassfishRepo) lazy val glassfishModuleConfig = ModuleConfiguration("org.glassfish", GlassfishRepo)
lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo) lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo)
lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo) lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausRepo) lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausRepo)
lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
@ -135,11 +134,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1 lazy val javax_servlet_30 = "org.glassfish" % "javax.servlet" % JAVAX_SERVLET_VERSION % "provided" //CDDL v1
lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile" //Eclipse license lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "provided" //Eclipse license
lazy val jetty_util = "org.eclipse.jetty" % "jetty-util" % JETTY_VERSION % "compile" //Eclipse license
lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile" //Eclipse license
lazy val jetty_servlet = "org.eclipse.jetty" % "jetty-servlet" % JETTY_VERSION % "compile" //Eclipse license
lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" //ApacheV2 lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" //ApacheV2
lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2 lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2
@ -147,10 +142,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2 lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2
lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2 lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2
lazy val jersey = "com.sun.jersey" % "jersey-core" % JERSEY_VERSION % "compile" //CDDL v1 lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "provided" //CDDL v1
lazy val jersey_json = "com.sun.jersey" % "jersey-json" % JERSEY_VERSION % "compile" //CDDL v1
lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "compile" //CDDL v1
lazy val jersey_contrib = "com.sun.jersey.contribs" % "jersey-scala" % JERSEY_VERSION % "compile" //CDDL v1
lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile" //CDDL v1 lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile" //CDDL v1
@ -168,8 +160,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "compile" //ApacheV2 lazy val sjson = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "compile" //ApacheV2
lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "test" //ApacheV2 lazy val sjson_test = "net.debasishg" % "sjson_2.8.1" % "0.9.1" % "test" //ApacheV2
lazy val stax_api = "javax.xml.stream" % "stax-api" % "1.0-2" % "compile" //ApacheV2
// Test // Test
lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test" //ApacheV2 lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test" //ApacheV2
@ -297,6 +287,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val junit = Dependencies.junit val junit = Dependencies.junit
val scalatest = Dependencies.scalatest val scalatest = Dependencies.scalatest
val multiverse_test = Dependencies.multiverse_test // StandardLatch val multiverse_test = Dependencies.multiverse_test // StandardLatch
override def bndExportPackage = super.bndExportPackage ++ Seq("com.eaio.*;version=3.2")
} }
// ------------------------------------------------------------------------------------------------------------------- // -------------------------------------------------------------------------------------------------------------------
@ -356,16 +348,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val jsr250 = Dependencies.jsr250 val jsr250 = Dependencies.jsr250
val javax_servlet30 = Dependencies.javax_servlet_30 val javax_servlet30 = Dependencies.javax_servlet_30
val jetty = Dependencies.jetty val jetty = Dependencies.jetty
val jetty_util = Dependencies.jetty_util val jersey = Dependencies.jersey_server
val jetty_xml = Dependencies.jetty_xml
val jetty_servlet = Dependencies.jetty_servlet
val jackson_core = Dependencies.jackson_core
val jersey = Dependencies.jersey
val jersey_contrib = Dependencies.jersey_contrib
val jersey_json = Dependencies.jersey_json
val jersey_server = Dependencies.jersey_server
val jsr311 = Dependencies.jsr311 val jsr311 = Dependencies.jsr311
val stax_api = Dependencies.stax_api
// testing // testing
val junit = Dependencies.junit val junit = Dependencies.junit