Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
6a5bd2c594
11 changed files with 1914 additions and 976 deletions
|
|
@ -23,15 +23,13 @@ import java.net.{InetAddress, UnknownHostException}
|
|||
import AkkaException._
|
||||
val exceptionName = getClass.getName
|
||||
|
||||
lazy val uuid = "%s_%s".format(hostname, newUuid)
|
||||
val uuid = "%s_%s".format(hostname, newUuid)
|
||||
|
||||
override lazy val toString = "%s\n\t[%s]\n\t%s\n\t%s".format(exceptionName, uuid, message, stackTrace)
|
||||
|
||||
lazy val stackTrace = {
|
||||
override val toString = "%s\n\t[%s]\n\t%s\n\t%s".format(exceptionName, uuid, message, {
|
||||
val sw = new StringWriter
|
||||
printStackTrace(new PrintWriter(sw))
|
||||
sw.toString
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
object AkkaException {
|
||||
|
|
|
|||
|
|
@ -7,11 +7,10 @@ package akka.dispatch
|
|||
import akka.actor.{Actor, ActorRef}
|
||||
import akka.actor.newUuid
|
||||
import akka.config.Config._
|
||||
import akka.util.{Duration}
|
||||
import akka.util.{Duration,ReflectiveAccess}
|
||||
|
||||
import akka.config.ConfigMap
|
||||
|
||||
import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy}
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
|
|
@ -165,6 +164,7 @@ object Dispatchers {
|
|||
* type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
|
||||
* # (ExecutorBasedEventDrivenWorkStealing), ExecutorBasedEventDriven,
|
||||
* # GlobalExecutorBasedEventDriven
|
||||
* # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
|
||||
* keep-alive-time = 60 # Keep alive time for threads
|
||||
* core-pool-size-factor = 1.0 # No of core threads ... ceil(available processors * factor)
|
||||
* max-pool-size-factor = 4.0 # Max no of threads ... ceil(available processors * factor)
|
||||
|
|
@ -178,54 +178,52 @@ object Dispatchers {
|
|||
* Gotcha: Only configures the dispatcher if possible
|
||||
* Returns: None if "type" isn't specified in the config
|
||||
* Throws: IllegalArgumentException if the value of "type" is not valid
|
||||
* IllegalArgumentException if it cannot
|
||||
*/
|
||||
def from(cfg: ConfigMap): Option[MessageDispatcher] = {
|
||||
lazy val name = cfg.getString("name", newUuid.toString)
|
||||
|
||||
def configureThreadPool(createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
|
||||
import ThreadPoolConfigDispatcherBuilder.conf_?
|
||||
|
||||
//Apply the following options to the config if they are present in the cfg
|
||||
ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure(
|
||||
conf_?(cfg getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
|
||||
conf_?(cfg getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)),
|
||||
conf_?(cfg getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)),
|
||||
conf_?(cfg getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)),
|
||||
conf_?(cfg getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)),
|
||||
conf_?(cfg getString "rejection-policy" map {
|
||||
case "abort" => new AbortPolicy()
|
||||
case "caller-runs" => new CallerRunsPolicy()
|
||||
case "discard-oldest" => new DiscardOldestPolicy()
|
||||
case "discard" => new DiscardPolicy()
|
||||
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
|
||||
})(policy => _.setRejectionPolicy(policy)))
|
||||
}
|
||||
|
||||
lazy val mailboxType: MailboxType = {
|
||||
val capacity = cfg.getInt("mailbox-capacity", MAILBOX_CAPACITY)
|
||||
// FIXME how do we read in isBlocking for mailbox? Now set to 'false'.
|
||||
if (capacity < 0) UnboundedMailbox()
|
||||
else BoundedMailbox(false, capacity, Duration(cfg.getInt("mailbox-push-timeout-time", MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT))
|
||||
}
|
||||
|
||||
cfg.getString("type") map {
|
||||
case "ExecutorBasedEventDriven" =>
|
||||
configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenDispatcher(
|
||||
name,
|
||||
cfg.getInt("throughput", THROUGHPUT),
|
||||
cfg.getInt("throughput-deadline-time", THROUGHPUT_DEADLINE_TIME_MILLIS),
|
||||
mailboxType,
|
||||
threadPoolConfig)).build
|
||||
|
||||
case "ExecutorBasedEventDrivenWorkStealing" =>
|
||||
configureThreadPool(threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher(
|
||||
name,
|
||||
cfg.getInt("throughput", THROUGHPUT),
|
||||
cfg.getInt("throughput-deadline-time", THROUGHPUT_DEADLINE_TIME_MILLIS),
|
||||
mailboxType,
|
||||
threadPoolConfig)).build
|
||||
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
|
||||
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
|
||||
}
|
||||
cfg.getString("type") map {
|
||||
case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcherConfigurator()
|
||||
case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator()
|
||||
case "GlobalExecutorBasedEventDriven" => GlobalExecutorBasedEventDrivenDispatcherConfigurator
|
||||
case fqn =>
|
||||
ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
|
||||
case Some(clazz) =>
|
||||
val instance = ReflectiveAccess.createInstance[MessageDispatcherConfigurator](clazz, Array[Class[_]](), Array[AnyRef]())
|
||||
if (instance.isEmpty)
|
||||
throw new IllegalArgumentException("Cannot instantiate MessageDispatcherConfigurator type [%s], make sure it has a default no-args constructor" format fqn)
|
||||
else
|
||||
instance.get
|
||||
case None =>
|
||||
throw new IllegalArgumentException("Unknown MessageDispatcherConfigurator type [%s]" format fqn)
|
||||
}
|
||||
} map {
|
||||
_ configure cfg
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object GlobalExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
|
||||
def configure(config: ConfigMap): MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
|
||||
}
|
||||
|
||||
class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator {
|
||||
def configure(config: ConfigMap): MessageDispatcher = {
|
||||
configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenDispatcher(
|
||||
config.getString("name", newUuid.toString),
|
||||
config.getInt("throughput", Dispatchers.THROUGHPUT),
|
||||
config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
|
||||
mailboxType(config),
|
||||
threadPoolConfig)).build
|
||||
}
|
||||
}
|
||||
|
||||
class ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator extends MessageDispatcherConfigurator {
|
||||
def configure(config: ConfigMap): MessageDispatcher = {
|
||||
configureThreadPool(config, threadPoolConfig => new ExecutorBasedEventDrivenWorkStealingDispatcher(
|
||||
config.getString("name", newUuid.toString),
|
||||
config.getInt("throughput", Dispatchers.THROUGHPUT),
|
||||
config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS),
|
||||
mailboxType(config),
|
||||
threadPoolConfig)).build
|
||||
}
|
||||
}
|
||||
|
|
@ -6,8 +6,10 @@ package akka.dispatch
|
|||
|
||||
import java.util.concurrent._
|
||||
import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong}
|
||||
|
||||
import akka.util.{Switch, ReentrantGuard, HashCode, ReflectiveAccess}
|
||||
import akka.config.ConfigMap
|
||||
import akka.config.Config.TIME_UNIT
|
||||
import akka.util.{Duration, Switch, ReentrantGuard, HashCode, ReflectiveAccess}
|
||||
import java.util.concurrent.ThreadPoolExecutor.{AbortPolicy, CallerRunsPolicy, DiscardOldestPolicy, DiscardPolicy}
|
||||
import akka.actor._
|
||||
|
||||
/**
|
||||
|
|
@ -162,3 +164,36 @@ trait MessageDispatcher {
|
|||
*/
|
||||
def mailboxSize(actorRef: ActorRef): Int
|
||||
}
|
||||
|
||||
/**
|
||||
* Trait to be used for hooking in new dispatchers into Dispatchers.fromConfig
|
||||
*/
|
||||
abstract class MessageDispatcherConfigurator {
|
||||
def configure(config: ConfigMap): MessageDispatcher
|
||||
|
||||
def mailboxType(config: ConfigMap): MailboxType = {
|
||||
val capacity = config.getInt("mailbox-capacity", Dispatchers.MAILBOX_CAPACITY)
|
||||
// FIXME how do we read in isBlocking for mailbox? Now set to 'false'.
|
||||
if (capacity < 0) UnboundedMailbox()
|
||||
else BoundedMailbox(false, capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT))
|
||||
}
|
||||
|
||||
def configureThreadPool(config: ConfigMap, createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
|
||||
import ThreadPoolConfigDispatcherBuilder.conf_?
|
||||
|
||||
//Apply the following options to the config if they are present in the config
|
||||
ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure(
|
||||
conf_?(config getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
|
||||
conf_?(config getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)),
|
||||
conf_?(config getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)),
|
||||
conf_?(config getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)),
|
||||
conf_?(config getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)),
|
||||
conf_?(config getString "rejection-policy" map {
|
||||
case "abort" => new AbortPolicy()
|
||||
case "caller-runs" => new CallerRunsPolicy()
|
||||
case "discard-oldest" => new DiscardOldestPolicy()
|
||||
case "discard" => new DiscardPolicy()
|
||||
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
|
||||
})(policy => _.setRejectionPolicy(policy)))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
File diff suppressed because it is too large
Load diff
|
|
@ -11,6 +11,11 @@ option optimize_for = SPEED;
|
|||
protoc RemoteProtocol.proto --java_out ../java
|
||||
*******************************************/
|
||||
|
||||
message AkkaRemoteProtocol {
|
||||
optional RemoteMessageProtocol message = 1;
|
||||
optional RemoteControlProtocol instruction = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a remote message.
|
||||
*/
|
||||
|
|
@ -26,6 +31,21 @@ message RemoteMessageProtocol {
|
|||
optional string cookie = 9;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines some control messages for the remoting
|
||||
*/
|
||||
message RemoteControlProtocol {
|
||||
optional string cookie = 1;
|
||||
required CommandType commandType = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines the type of the RemoteControlProtocol command type
|
||||
*/
|
||||
enum CommandType {
|
||||
SHUTDOWN = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a remote ActorRef that "remembers" and uses its original Actor instance
|
||||
* on the original node.
|
||||
|
|
|
|||
|
|
@ -13,10 +13,9 @@ import akka.serialization.RemoteActorSerialization._
|
|||
import akka.japi.Creator
|
||||
import akka.config.Config._
|
||||
import akka.remoteinterface._
|
||||
import akka.actor. {Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
|
||||
import akka.actor.{EventHandler, Index, ActorInitializationException, LocalActorRef, newUuid, ActorRegistry, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, Exit, LifeCycleMessage, ActorType => AkkaActorType}
|
||||
import akka.AkkaException
|
||||
import akka.actor.Actor._
|
||||
import akka.actor.{EventHandler}
|
||||
import akka.util._
|
||||
import akka.remote.{MessageSerializer, RemoteClientSettings, RemoteServerSettings}
|
||||
|
||||
|
|
@ -39,6 +38,20 @@ import scala.reflect.BeanProperty
|
|||
import java.lang.reflect.InvocationTargetException
|
||||
import java.util.concurrent.atomic. {AtomicReference, AtomicLong, AtomicBoolean}
|
||||
|
||||
object RemoteEncoder {
|
||||
def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
|
||||
val arp = AkkaRemoteProtocol.newBuilder
|
||||
arp.setMessage(rmp)
|
||||
arp.build
|
||||
}
|
||||
|
||||
def encode(rcp: RemoteControlProtocol): AkkaRemoteProtocol = {
|
||||
val arp = AkkaRemoteProtocol.newBuilder
|
||||
arp.setInstruction(rcp)
|
||||
arp.build
|
||||
}
|
||||
}
|
||||
|
||||
trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement =>
|
||||
private val remoteClients = new HashMap[Address, RemoteClient]
|
||||
private val remoteActors = new Index[Address, Uuid]
|
||||
|
|
@ -197,7 +210,7 @@ abstract class RemoteClient private[akka] (
|
|||
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
|
||||
if (isRunning) {
|
||||
if (request.getOneWay) {
|
||||
currentChannel.write(request).addListener(new ChannelFutureListener {
|
||||
currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener {
|
||||
def operationComplete(future: ChannelFuture) {
|
||||
if (future.isCancelled) {
|
||||
//We don't care about that right now
|
||||
|
|
@ -212,7 +225,7 @@ abstract class RemoteClient private[akka] (
|
|||
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
|
||||
val futureUuid = uuidFrom(request.getUuid.getHigh, request.getUuid.getLow)
|
||||
futures.put(futureUuid, futureResult) //Add this prematurely, remove it if write fails
|
||||
currentChannel.write(request).addListener(new ChannelFutureListener {
|
||||
currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener {
|
||||
def operationComplete(future: ChannelFuture) {
|
||||
if (future.isCancelled) {
|
||||
futures.remove(futureUuid) //Clean this up
|
||||
|
|
@ -280,6 +293,7 @@ class ActiveRemoteClient private[akka] (
|
|||
notifyListeners(RemoteClientError(connection.getCause, module, remoteAddress))
|
||||
false
|
||||
} else {
|
||||
//Add a task that does GCing of expired Futures
|
||||
timer.newTimeout(new TimerTask() {
|
||||
def run(timeout: Timeout) = {
|
||||
if(isRunning) {
|
||||
|
|
@ -327,10 +341,7 @@ class ActiveRemoteClient private[akka] (
|
|||
reconnectionTimeWindowStart = System.currentTimeMillis
|
||||
true
|
||||
} else {
|
||||
val timeLeft = RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)
|
||||
if (timeLeft > 0) {
|
||||
true
|
||||
} else false
|
||||
/*Time left > 0*/ (RECONNECTION_TIME_WINDOW - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -363,7 +374,7 @@ class ActiveRemoteClientPipelineFactory(
|
|||
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt)
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance)
|
||||
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
|
||||
|
|
@ -400,7 +411,13 @@ class ActiveRemoteClientHandler(
|
|||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
|
||||
try {
|
||||
event.getMessage match {
|
||||
case reply: RemoteMessageProtocol =>
|
||||
case arp: AkkaRemoteProtocol if arp.hasInstruction =>
|
||||
val rcp = arp.getInstruction
|
||||
rcp.getCommandType match {
|
||||
case CommandType.SHUTDOWN => spawn { client.shutdown }
|
||||
}
|
||||
case arp: AkkaRemoteProtocol if arp.hasMessage =>
|
||||
val reply = arp.getMessage
|
||||
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
|
||||
val future = futures.remove(replyUuid).asInstanceOf[CompletableFuture[Any]]
|
||||
|
||||
|
|
@ -423,7 +440,6 @@ class ActiveRemoteClientHandler(
|
|||
|
||||
future.completeWithException(exception)
|
||||
}
|
||||
|
||||
case other =>
|
||||
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.module, client.remoteAddress)
|
||||
}
|
||||
|
|
@ -552,6 +568,14 @@ class NettyRemoteServer(serverModule: NettyRemoteServerModule, val host: String,
|
|||
|
||||
def shutdown {
|
||||
try {
|
||||
val shutdownSignal = {
|
||||
val b = RemoteControlProtocol.newBuilder
|
||||
if (RemoteClientSettings.SECURE_COOKIE.nonEmpty)
|
||||
b.setCookie(RemoteClientSettings.SECURE_COOKIE.get)
|
||||
b.setCommandType(CommandType.SHUTDOWN)
|
||||
b.build
|
||||
}
|
||||
openChannels.write(RemoteEncoder.encode(shutdownSignal)).awaitUninterruptibly
|
||||
openChannels.disconnect
|
||||
openChannels.close.awaitUninterruptibly
|
||||
bootstrap.releaseExternalResources
|
||||
|
|
@ -765,7 +789,7 @@ class RemoteServerPipelineFactory(
|
|||
val ssl = if(SECURE) join(new SslHandler(engine)) else join()
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance)
|
||||
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
val (enc, dec) = COMPRESSION_SCHEME match {
|
||||
case "zlib" => (join(new ZlibEncoder(ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
|
||||
|
|
@ -796,8 +820,8 @@ class RemoteServerHandler(
|
|||
val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]]()
|
||||
|
||||
//Writes the specified message to the specified channel and propagates write errors to listeners
|
||||
private def write(channel: Channel, message: AnyRef): Unit = {
|
||||
channel.write(message).addListener(
|
||||
private def write(channel: Channel, payload: AkkaRemoteProtocol): Unit = {
|
||||
channel.write(payload).addListener(
|
||||
new ChannelFutureListener {
|
||||
def operationComplete(future: ChannelFuture): Unit = {
|
||||
if (future.isCancelled) {
|
||||
|
|
@ -807,7 +831,7 @@ class RemoteServerHandler(
|
|||
case i: InetSocketAddress => Some(i)
|
||||
case _ => None
|
||||
}
|
||||
server.notifyListeners(RemoteServerWriteFailed(message, future.getCause, server, socketAddress))
|
||||
server.notifyListeners(RemoteServerWriteFailed(payload, future.getCause, server, socketAddress))
|
||||
}
|
||||
}
|
||||
})
|
||||
|
|
@ -871,7 +895,9 @@ class RemoteServerHandler(
|
|||
|
||||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = event.getMessage match {
|
||||
case null => throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
|
||||
case requestProtocol: RemoteMessageProtocol =>
|
||||
//case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasInstruction => RemoteServer cannot receive control messages (yet)
|
||||
case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasMessage =>
|
||||
val requestProtocol = remoteProtocol.getMessage
|
||||
if (REQUIRE_COOKIE) authenticateRemoteClient(requestProtocol, ctx)
|
||||
handleRemoteMessageProtocol(requestProtocol, event.getChannel)
|
||||
case _ => //ignore
|
||||
|
|
@ -952,7 +978,7 @@ class RemoteServerHandler(
|
|||
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
|
||||
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
|
||||
write(channel, messageBuilder.build)
|
||||
write(channel, RemoteEncoder.encode(messageBuilder.build))
|
||||
}
|
||||
}
|
||||
)
|
||||
|
|
@ -988,7 +1014,7 @@ class RemoteServerHandler(
|
|||
None)
|
||||
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
|
||||
write(channel, messageBuilder.build)
|
||||
write(channel, RemoteEncoder.encode(messageBuilder.build))
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
EventHandler notifyListeners EventHandler.Error(e, this)
|
||||
|
|
@ -1157,7 +1183,7 @@ class RemoteServerHandler(
|
|||
}
|
||||
}
|
||||
|
||||
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): RemoteMessageProtocol = {
|
||||
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol, actorType: AkkaActorType): AkkaRemoteProtocol = {
|
||||
val actorInfo = request.getActorInfo
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
None,
|
||||
|
|
@ -1172,7 +1198,7 @@ class RemoteServerHandler(
|
|||
actorType,
|
||||
None)
|
||||
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
messageBuilder.build
|
||||
RemoteEncoder.encode(messageBuilder.build)
|
||||
}
|
||||
|
||||
private def authenticateRemoteClient(request: RemoteMessageProtocol, ctx: ChannelHandlerContext) = {
|
||||
|
|
@ -1212,4 +1238,4 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
|
|||
throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -47,7 +47,6 @@ class AkkaRemoteTest extends
|
|||
|
||||
override def beforeEach {
|
||||
remote.start(host,port)
|
||||
Thread.sleep(2000)
|
||||
super.beforeEach
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -445,9 +445,8 @@ object TypedActor {
|
|||
* @param intfClass interface the typed actor implements
|
||||
* @param targetClass implementation class of the typed actor
|
||||
*/
|
||||
def newInstance[T](intfClass: Class[T], targetClass: Class[_]): T = {
|
||||
def newInstance[T](intfClass: Class[T], targetClass: Class[_]): T =
|
||||
newInstance(intfClass, targetClass, TypedActorConfiguration())
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory method for typed actor.
|
||||
|
|
@ -759,7 +758,6 @@ object TypedActor {
|
|||
val typedActor =
|
||||
if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
|
||||
else throw new IllegalArgumentException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'")
|
||||
typedActor.preStart
|
||||
typedActor
|
||||
}
|
||||
|
||||
|
|
@ -768,7 +766,6 @@ object TypedActor {
|
|||
val typedActor =
|
||||
if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
|
||||
else throw new IllegalArgumentException("Actor [" + instance.getClass.getName + "] is not a sub class of 'TypedActor'")
|
||||
typedActor.preStart
|
||||
typedActor
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,51 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.actor
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.Assertions
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterEach
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import Issue675Spec._
|
||||
|
||||
|
||||
object Issue675Spec {
|
||||
var l = collection.mutable.ListBuffer.empty[String]
|
||||
|
||||
trait RegistrationService {
|
||||
def register(user: String, cred: String): Unit
|
||||
}
|
||||
|
||||
class RegistrationServiceImpl extends TypedActor with RegistrationService {
|
||||
def register(user: String, cred: String): Unit = {}
|
||||
|
||||
override def preStart() {
|
||||
l += "RegistrationServiceImpl.preStart() called"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class Issue675Spec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterEach {
|
||||
|
||||
override def afterEach() {
|
||||
Actor.registry.shutdownAll
|
||||
}
|
||||
|
||||
describe("TypedActor preStart method") {
|
||||
it("should be invoked once") {
|
||||
import Issue675Spec._
|
||||
val simplePojo = TypedActor.newInstance(classOf[RegistrationService], classOf[RegistrationServiceImpl])
|
||||
l.size should equal(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -12,7 +12,7 @@ akka {
|
|||
|
||||
time-unit = "seconds" # Time unit for all timeout properties throughout the config
|
||||
|
||||
event-handlers = ["akka.actor.EventHandler$DefaultListener"] # register the default event handlers (the EventHandler.DefaultListener listener logs errors to STDOUT)
|
||||
event-handlers = ["akka.actor.EventHandler$DefaultListener"] # event handlers to register at boot time (EventHandler$DefaultListener logs to STDOUT)
|
||||
|
||||
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
|
||||
# Can be used to bootstrap your application(s)
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo)
|
||||
lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausRepo)
|
||||
lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
|
||||
lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshotRepo)
|
||||
lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsRelRepo)
|
||||
lazy val spdeModuleConfig = ModuleConfiguration("us.technically.spde", DatabinderRepo)
|
||||
lazy val processingModuleConfig = ModuleConfiguration("org.processing", DatabinderRepo)
|
||||
lazy val scalaModuleConfig = ModuleConfiguration("org.scala-lang", ScalaToolsSnapshotRepo)
|
||||
|
|
@ -297,6 +297,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
val junit = Dependencies.junit
|
||||
val scalatest = Dependencies.scalatest
|
||||
val multiverse_test = Dependencies.multiverse_test // StandardLatch
|
||||
|
||||
override def bndExportPackage = super.bndExportPackage ++ Seq("com.eaio.*;version=3.2")
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue