merge master of jboner/akka

ticktock 2010-09-21 10:50:46 -04:00
commit fed341d5bc
57 changed files with 2173 additions and 5648 deletions

View file

@ -60,8 +60,8 @@ case object ReceiveTimeout extends LifeCycleMessage
case class MaximumNumberOfRestartsWithinTimeRangeReached(
@BeanProperty val victim: ActorRef,
@BeanProperty val maxNrOfRetries: Int,
@BeanProperty val withinTimeRange: Int,
@BeanProperty val maxNrOfRetries: Option[Int],
@BeanProperty val withinTimeRange: Option[Int],
@BeanProperty val lastExceptionCausingRestart: Throwable) extends LifeCycleMessage
// Exceptions for Actors
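The retry bounds are now Option[Int], so a supervisor listening for this notification has to handle the unbounded (None) case. A minimal sketch of such a handler, assuming the standard Actor/receive pattern used elsewhere in this commit; the class name and output are placeholders:

class RestartWatcher extends Actor {
  def receive = {
    case MaximumNumberOfRestartsWithinTimeRangeReached(victim, maxRetries, withinRange, cause) =>
      // both bounds may be None, meaning "no limit"
      val retries = maxRetries.map(_.toString).getOrElse("unbounded")
      val range   = withinRange.map(_ + " ms").getOrElse("unbounded")
      println("Giving up on " + victim.id + " after " + retries + " restarts within " + range + ": " + cause.getMessage)
  }
}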

View file

@ -28,6 +28,17 @@ import java.lang.reflect.Field
import scala.reflect.BeanProperty
object ActorRefStatus {
/** LifeCycles for ActorRefs
*/
private[akka] sealed trait StatusType
object UNSTARTED extends StatusType
object RUNNING extends StatusType
object BEING_RESTARTED extends StatusType
object SHUTDOWN extends StatusType
}
/**
* ActorRef is an immutable and serializable handle to an Actor.
* <p/>
@ -68,9 +79,7 @@ trait ActorRef extends
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile protected[akka] var _uuid = UUID.newUuid.toString
@volatile protected[this] var _isRunning = false
@volatile protected[this] var _isShutDown = false
@volatile protected[akka] var _isBeingRestarted = false
@volatile protected[this] var _status: ActorRefStatus.StatusType = ActorRefStatus.UNSTARTED
@volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT)
@volatile protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
@ -229,17 +238,25 @@ trait ActorRef extends
/**
* Is the actor being restarted?
*/
def isBeingRestarted: Boolean = _isBeingRestarted
def isBeingRestarted: Boolean = _status == ActorRefStatus.BEING_RESTARTED
/**
* Is the actor running?
*/
def isRunning: Boolean = _isRunning
def isRunning: Boolean = _status match {
case ActorRefStatus.BEING_RESTARTED | ActorRefStatus.RUNNING => true
case _ => false
}
/**
* Is the actor shut down?
*/
def isShutdown: Boolean = _isShutDown
def isShutdown: Boolean = _status == ActorRefStatus.SHUTDOWN
/**
* Is the actor ever started?
*/
def isUnstarted: Boolean = _status == ActorRefStatus.UNSTARTED
/**
* Is the actor able to handle the message passed in as arguments?
@ -601,9 +618,9 @@ trait ActorRef extends
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit
protected[akka] def registerSupervisorAsRemoteActor: Option[String]
@ -800,7 +817,7 @@ class LocalActorRef private[akka](
if (isTransactor) {
_transactionFactory = Some(TransactionFactory(_transactionConfig, id))
}
_isRunning = true
_status = ActorRefStatus.RUNNING
if (!isInInitialization) initializeActorInstance
else runActorInitialization = true
}
@ -815,8 +832,7 @@ class LocalActorRef private[akka](
cancelReceiveTimeout
dispatcher.unregister(this)
_transactionFactory = None
_isRunning = false
_isShutDown = true
_status = ActorRefStatus.SHUTDOWN
actor.postStop
ActorRegistry.unregister(this)
if (isRemotingEnabled) {
@ -864,11 +880,11 @@ class LocalActorRef private[akka](
* <p/>
* To be invoked from within the actor itself.
*/
def startLink(actorRef: ActorRef) = guard.withGuard {
def startLink(actorRef: ActorRef):Unit = guard.withGuard {
try {
actorRef.start
} finally {
link(actorRef)
} finally {
actorRef.start
}
}
@ -877,13 +893,13 @@ class LocalActorRef private[akka](
* <p/>
* To be invoked from within the actor itself.
*/
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int) = guard.withGuard {
def startLinkRemote(actorRef: ActorRef, hostname: String, port: Int): Unit = guard.withGuard {
ensureRemotingEnabled
try {
actorRef.makeRemote(hostname, port)
actorRef.start
} finally {
link(actorRef)
} finally {
actorRef.start
}
}
@ -893,9 +909,7 @@ class LocalActorRef private[akka](
* To be invoked from within the actor itself.
*/
def spawn(clazz: Class[_ <: Actor]): ActorRef = guard.withGuard {
val actorRef = spawnButDoNotStart(clazz)
actorRef.start
actorRef
spawnButDoNotStart(clazz).start
}
/**
@ -919,9 +933,9 @@ class LocalActorRef private[akka](
def spawnLink(clazz: Class[_ <: Actor]): ActorRef = guard.withGuard {
val actor = spawnButDoNotStart(clazz)
try {
actor.start
} finally {
link(actor)
} finally {
actor.start
}
actor
}
@ -936,10 +950,11 @@ class LocalActorRef private[akka](
val actor = spawnButDoNotStart(clazz)
try {
actor.makeRemote(hostname, port)
actor.start
} finally {
link(actor)
} finally {
actor.start
}
actor
}
/**
@ -1001,7 +1016,7 @@ class LocalActorRef private[akka](
}
/**
* Callback for the dispatcher. This is the ingle entry point to the user Actor implementation.
* Callback for the dispatcher. This is the single entry point to the user Actor implementation.
*/
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard {
if (isShutdown)
@ -1038,12 +1053,18 @@ class LocalActorRef private[akka](
}
}
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = {
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = {
if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis // first time around
maxNrOfRetriesCount += 1
val tooManyRestarts = if (maxNrOfRetries.isDefined) {
maxNrOfRetriesCount += 1
maxNrOfRetriesCount > maxNrOfRetries.get
} else false
val restartingHasExpired = if (withinTimeRange.isDefined)
(System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange.get
else false
val tooManyRestarts = maxNrOfRetriesCount > maxNrOfRetries
val restartingHasExpired = (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange
if (tooManyRestarts || restartingHasExpired) {
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
Actor.log.warning(
@ -1062,7 +1083,7 @@ class LocalActorRef private[akka](
stop
} else {
_isBeingRestarted = true
_status = ActorRefStatus.BEING_RESTARTED
val failedActor = actorInstance.get
guard.withGuard {
lifeCycle match {
@ -1072,16 +1093,18 @@ class LocalActorRef private[akka](
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
Actor.log.debug("Restarting linked actors for actor [%s].", id)
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason)
else restartActor(failedActor, reason)
_isBeingRestarted = false
else restartActor(failedActor, reason)
_status = ActorRefStatus.RUNNING
}
}
}
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int) = {
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = {
linkedActorsAsList.foreach { actorRef =>
actorRef.lifeCycle match {
// either permanent or none where default is permanent
@ -1131,11 +1154,7 @@ class LocalActorRef private[akka](
freshActor.postRestart(reason)
}
private def spawnButDoNotStart(clazz: Class[_ <: Actor]): ActorRef = guard.withGuard {
val actorRef = Actor.actorOf(clazz.newInstance)
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) actorRef.dispatcher = dispatcher
actorRef
}
private def spawnButDoNotStart(clazz: Class[_ <: Actor]): ActorRef = Actor.actorOf(clazz.newInstance)
private[this] def newActor: Actor = {
Actor.actorRefInCreation.withValue(Some(this)){
@ -1235,7 +1254,7 @@ class LocalActorRef private[akka](
private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = {
Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
_isBeingRestarted = true
_status = ActorRefStatus.BEING_RESTARTED
// abort transaction set
if (isTransactionSetInScope) {
val txSet = getTransactionSetInScope
@ -1375,13 +1394,12 @@ private[akka] case class RemoteActorRef private[akka] (
}
def start: ActorRef = {
_isRunning = true
_status = ActorRefStatus.RUNNING
this
}
def stop: Unit = {
_isRunning = false
_isShutDown = true
_status = ActorRefStatus.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
}
@ -1417,8 +1435,8 @@ private[akka] case class RemoteActorRef private[akka] (
protected[akka] def mailbox: AnyRef = unsupported
protected[akka] def mailbox_=(value: AnyRef):AnyRef = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
protected[akka] def linkedActors: JMap[String, ActorRef] = unsupported
protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported
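The former trio of boolean flags collapses into one status field, and the predicates introduced above define how it is read: an actor still counts as running while it is being restarted, and SHUTDOWN is terminal. A small sketch of the externally visible lifecycle, with MyActor standing in for any actor class:

val ref = Actor.actorOf[MyActor]      // MyActor is a placeholder
assert(ref.isUnstarted)               // UNSTARTED until start is called
ref.start
assert(ref.isRunning && !ref.isShutdown)
ref.stop                              // SHUTDOWN is terminal
assert(ref.isShutdown && !ref.isRunning)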

View file

@ -8,8 +8,19 @@ import se.scalablesolutions.akka.actor.{ActorRef}
import se.scalablesolutions.akka.dispatch.MessageDispatcher
sealed abstract class FaultHandlingStrategy
case class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy
case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends FaultHandlingStrategy
object AllForOneStrategy {
def apply(maxNrOfRetries: Int, withinTimeRange: Int): AllForOneStrategy =
AllForOneStrategy(if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
}
case class AllForOneStrategy(maxNrOfRetries: Option[Int] = None, withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy
object OneForOneStrategy {
def apply(maxNrOfRetries: Int, withinTimeRange: Int): OneForOneStrategy =
this(if (maxNrOfRetries < 0) None else Some(maxNrOfRetries),
if (withinTimeRange < 0) None else Some(withinTimeRange))
}
case class OneForOneStrategy(maxNrOfRetries: Option[Int] = None, withinTimeRange: Option[Int] = None) extends FaultHandlingStrategy
/**
* Configuration classes - not to be used as messages.
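The companion apply methods keep the old Int-based call sites compiling while mapping negative values to None, i.e. no bound. A usage sketch; the faultHandler line mirrors the restart-strategy test added later in this commit:

val bounded   = OneForOneStrategy(3, 5000)    // at most 3 restarts within 5000 ms
val unbounded = OneForOneStrategy(-1, -1)     // negative maps to None: restart forever
// equivalent to the Option form used in RestartStrategySpec below:
// self.faultHandler = Some(OneForOneStrategy(None, None))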

View file

@ -11,6 +11,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.dispatch.CompletableFuture
import se.scalablesolutions.akka.AkkaException
import se.scalablesolutions.akka.util.{ Function, SideEffect }
/**
* Implements Oz-style dataflow (single assignment) variables.
@ -27,9 +28,22 @@ object DataFlow {
*/
def thread(body: => Unit): Unit = spawn(body)
/** Executes the supplied SideEffect in another thread
* JavaAPI
*/
def thread(body: SideEffect): Unit = spawn(body.apply)
/** Executes the supplied function in another thread
*/
def thread[A <: AnyRef, R <: AnyRef](body: A => R) =
actorOf(new ReactiveEventBasedThread(body)).start
/** Executes the supplied Function in another thread
* JavaAPI
*/
def thread[A <: AnyRef, R <: AnyRef](body: Function[A,R]) =
actorOf(new ReactiveEventBasedThread(body.apply)).start
private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A => T)
extends Actor {
def receive = {
@ -91,6 +105,11 @@ object DataFlow {
"Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])")
}
/** Sets the value of this variable (if unset) with the value of the supplied variable
* JavaAPI
*/
def set(ref: DataFlowVariable[T]) { this << ref }
/** Sets the value of this variable (if unset)
*/
def <<(value: T) {
@ -99,6 +118,16 @@ object DataFlow {
"Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])")
}
/** Sets the value of this variable (if unset) with the value of the supplied variable
* JavaAPI
*/
def set(value: T) { this << value }
/** Retrieves the value of variable
* throws a DataFlowVariableException if it times out
*/
def get(): T = this()
/** Retrieves the value of variable
* throws a DataFlowVariableException if it times out
*/
@ -121,4 +150,46 @@ object DataFlow {
def shutdown = in ! Exit
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class DataFlowStream[T <: Any] extends Seq[T] {
private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]]
def <<<(ref: DataFlowVariable[T]) = queue.offer(ref)
def <<<(value: T) = {
val ref = new DataFlowVariable[T]
ref << value
queue.offer(ref)
}
def apply(): T = {
val ref = queue.take
val result = ref()
ref.shutdown
result
}
def take: DataFlowVariable[T] = queue.take
//==== For Seq ====
def length: Int = queue.size
def apply(i: Int): T = {
if (i == 0) apply()
else throw new UnsupportedOperationException(
"Access by index other than '0' is not supported by DataFlowStream")
}
def iterator: Iterator[T] = new Iterator[T] {
private val iter = queue.iterator
def hasNext: Boolean = iter.hasNext
def next: T = { val ref = iter.next; ref() }
}
override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]]
}
}
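The new set/get methods and the SideEffect/Function overloads are plain aliases for the existing Scala operators, aimed at Java callers. A tiny Scala sketch, assuming DataFlow._ is imported as in the DataFlowTest further down:

import DataFlow._

val x = new DataFlowVariable[Int]
thread { x << 42 }            // bind the variable     (Java API: x.set(42))
thread { println(x()) }       // read, blocks until bound (Java API: x.get())
// when done with it: x.shutdown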

View file

@ -44,9 +44,10 @@ import se.scalablesolutions.akka.util.{Duration, Logging, UUID}
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Dispatchers extends Logging {
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
val MAILBOX_CONFIG = MailboxConfig(
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
val THROUGHPUT_DEADLINE_MS = config.getInt("akka.actor.throughput-deadline-ms",-1)
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
val MAILBOX_CONFIG = MailboxConfig(
capacity = Dispatchers.MAILBOX_CAPACITY,
pushTimeOut = config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)),
blockingDequeue = false
@ -58,7 +59,7 @@ object Dispatchers extends Logging {
object globalHawtDispatcher extends HawtDispatcher
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,MAILBOX_CONFIG) {
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,THROUGHPUT_DEADLINE_MS,MAILBOX_CONFIG) {
override def register(actor: ActorRef) = {
if (isShutdown) init
super.register(actor)
@ -116,14 +117,14 @@ object Dispatchers extends Logging {
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxCapacity: Int) = new ExecutorBasedEventDrivenDispatcher(name, throughput, mailboxCapacity)
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxCapacity: Int) = new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxCapacity)
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
* <p/>
* Has a fluent builder interface for configuring its semantics.
*/
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxCapacity: Int, pushTimeOut: Duration) = new ExecutorBasedEventDrivenDispatcher(name, throughput, MailboxConfig(mailboxCapacity,Some(pushTimeOut),false))
def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxCapacity: Int, pushTimeOut: Duration) = new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, MailboxConfig(mailboxCapacity,Some(pushTimeOut),false))
/**
@ -198,13 +199,28 @@ object Dispatchers extends Logging {
}
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
case "ExecutorBasedEventDrivenWorkStealing" => new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig)
case "ExecutorBasedEventDriven" => new ExecutorBasedEventDrivenDispatcher(name, cfg.getInt("throughput",THROUGHPUT),mailboxBounds,threadPoolConfig)
case "Hawt" => new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
case "GlobalExecutorBasedEventDriven" => globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" => globalHawtDispatcher
case "ExecutorBasedEventDrivenWorkStealing" =>
new ExecutorBasedEventDrivenWorkStealingDispatcher(name,MAILBOX_CAPACITY,threadPoolConfig)
case "ExecutorBasedEventDriven" =>
new ExecutorBasedEventDrivenDispatcher(
name,
cfg.getInt("throughput",THROUGHPUT),
cfg.getInt("throughput-deadline-ms",THROUGHPUT_DEADLINE_MS),
mailboxBounds,
threadPoolConfig)
case unknown => throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
case "Hawt" =>
new HawtDispatcher(cfg.getBool("aggregate").getOrElse(true))
case "GlobalExecutorBasedEventDriven" =>
globalExecutorBasedEventDrivenDispatcher
case "GlobalHawt" =>
globalHawtDispatcher
case unknown =>
throw new IllegalArgumentException("Unknown dispatcher type [%s]" format unknown)
}
dispatcher
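Besides per-run message throughput, a dispatcher run can now also be bounded by wall-clock time via throughput-deadline-ms, read from akka.conf as above or passed programmatically. A sketch using the extended factory method; the name is a placeholder:

// Hand the thread back after 5 messages or 10 ms, whichever comes first;
// a deadline <= 0 disables the time bound.
val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(
  "my-dispatcher",               // name (placeholder)
  5,                             // throughput
  10,                            // throughputDeadlineMs
  Dispatchers.MAILBOX_CAPACITY)  // mailboxCapacity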

View file

@ -65,12 +65,13 @@ import java.util.concurrent.{RejectedExecutionException, ConcurrentLinkedQueue,
class ExecutorBasedEventDrivenDispatcher(
_name: String,
val throughput: Int = Dispatchers.THROUGHPUT,
val throughputDeadlineMs: Int = Dispatchers.THROUGHPUT_DEADLINE_MS,
mailboxConfig: MailboxConfig = Dispatchers.MAILBOX_CONFIG,
config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,MailboxConfig(capacity,None,false))
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String, throughput: Int, throughputDeadlineMs: Int, capacity: Int) = this(_name,throughput,throughputDeadlineMs,MailboxConfig(capacity,None,false))
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.THROUGHPUT_DEADLINE_MS, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_MS,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
//FIXME remove this from ThreadPoolBuilder
mailboxCapacity = mailboxConfig.capacity
@ -102,24 +103,28 @@ class ExecutorBasedEventDrivenDispatcher(
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
*/
final def processMailbox(): Boolean = {
val throttle = throughput > 0
var processedMessages = 0
var nextMessage = self.dequeue
if (nextMessage ne null) {
do {
nextMessage.invoke
var nextMessage = self.dequeue
if (nextMessage ne null) {
val throttle = throughput > 0
var processedMessages = 0
val isDeadlineEnabled = throttle && throughputDeadlineMs > 0
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
if(throttle) { //Will be elided when false
processedMessages += 1
if (processedMessages >= throughput) //If we're throttled, break out
return !self.isEmpty
}
nextMessage = self.dequeue
}
while (nextMessage ne null)
}
do {
nextMessage.invoke
false
if(throttle) { //Will be elided when false
processedMessages += 1
if ((processedMessages >= throughput)
|| (isDeadlineEnabled && (System.currentTimeMillis - started) >= throughputDeadlineMs)) //If we're throttled, break out
return !self.isEmpty
}
nextMessage = self.dequeue
}
while (nextMessage ne null)
}
false
}
}
@ -183,4 +188,4 @@ class ExecutorBasedEventDrivenDispatcher(
config(this)
buildThreadPool
}
}
}

View file

@ -160,6 +160,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
if (!_completed) {
_completed = true
_result = Some(result)
onComplete(result)
}
} finally {
_signal.signalAll
@ -171,6 +172,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
if (!_completed) {
_completed = true
_exception = Some(exception)
onCompleteException(exception)
}
} finally {
_signal.signalAll
@ -178,4 +180,6 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
}
private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis)
protected def onComplete(result: T) {}
protected def onCompleteException(exception: Throwable) {}
}
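Both completion paths now invoke protected hooks, so completion can be observed without polling; the remote server handler later in this commit relies on exactly this. A minimal sketch of overriding them:

// The future is completed elsewhere (e.g. when a reply arrives);
// the overrides run on the completing thread.
val future = new DefaultCompletableFuture[String](5000) {
  override def onComplete(result: String) = println("got: " + result)
  override def onCompleteException(exception: Throwable) = exception.printStackTrace
}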

View file

@ -11,6 +11,14 @@ import se.scalablesolutions.akka.config.Config.config
import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue}
import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue}
object ThreadBasedDispatcher {
def oneThread(b: ThreadPoolBuilder) {
b setCorePoolSize 1
b setMaxPoolSize 1
b setAllowCoreThreadTimeout true
}
}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
@ -18,16 +26,14 @@ import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, Lin
*/
class ThreadBasedDispatcher(private val actor: ActorRef,
val mailboxConfig: MailboxConfig
) extends MessageDispatcher {
) extends ExecutorBasedEventDrivenDispatcher(
actor.getClass.getName + ":" + actor.uuid,
Dispatchers.THROUGHPUT,
-1,
mailboxConfig,
ThreadBasedDispatcher.oneThread) {
def this(actor: ActorRef, capacity: Int) = this(actor,MailboxConfig(capacity,None,true))
def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java
private val name = actor.getClass.getName + ":" + actor.uuid
private val threadName = "akka:thread-based:dispatcher:" + name
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(blockDequeue = true)
override def register(actorRef: ActorRef) = {
if(actorRef != actor)
@ -36,35 +42,5 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
super.register(actorRef)
}
def mailbox = actor.mailbox.asInstanceOf[Queue[MessageInvocation] with MessageQueue]
def mailboxSize(a: ActorRef) = mailbox.size
def dispatch(invocation: MessageInvocation) = mailbox enqueue invocation
def start = if (!active) {
log.debug("Starting up %s", toString)
active = true
selectorThread = new Thread(threadName) {
override def run = {
while (active) {
try {
actor.invoke(mailbox.dequeue)
} catch { case e: InterruptedException => active = false }
}
}
}
selectorThread.start
}
def isShutdown = !active
def shutdown = if (active) {
log.debug("Shutting down %s", toString)
active = false
selectorThread.interrupt
uuids.clear
}
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
override def toString = "ThreadBasedDispatcher[" + name + "]"
}

View file

@ -13,4 +13,11 @@ trait Function[T,R] {
*/
trait Procedure[T] {
def apply(param: T): Unit
}
}
/**
* An executable piece of code that takes no parameters and doesn't return any value
*/
trait SideEffect {
def apply: Unit
}
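SideEffect is the zero-argument counterpart of Function and Procedure for Java callers; from Scala it can be implemented inline wherever an API such as DataFlow.thread above accepts it. A one-line sketch:

val effect = new SideEffect { def apply = println("ran") }   // () => Unit for Java callers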

View file

@ -111,7 +111,7 @@ class Logger(val logger: SLFLogger) {
warning(message(fmt,arg,argN:_*))
}
def warn(fmt: => String, arg: Any, argN: Any*) = warning(fmt, arg, argN)
def warn(fmt: => String, arg: Any, argN: Any*) = warning(fmt, arg, argN:_*)
def warning(msg: => String) {
if (warning_?) logger warn msg

View file

@ -70,5 +70,32 @@ class RestartStrategySpec extends JUnitSuite {
}
assert(exceptionLatch.tryAwait(1, TimeUnit.SECONDS))
}
@Test
def slaveShouldBeImmortalWithoutMaxRestarts = {
val boss = actorOf(new Actor{
self.trapExit = List(classOf[Throwable])
self.faultHandler = Some(OneForOneStrategy(None, None))
protected def receive = { case _ => () }
}).start
val countDownLatch = new CountDownLatch(100)
val slave = actorOf(new Actor{
protected def receive = {
case Crash => throw new Exception("Crashing...")
}
override def postRestart(reason: Throwable) = {
countDownLatch.countDown
}
})
boss.startLink(slave)
(1 to 100) foreach { _ => slave ! Crash }
assert(countDownLatch.await(120, TimeUnit.SECONDS))
}
}

View file

@ -69,5 +69,97 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
result.get should equal (sum(0,ints(0,1000)))
List(x,y,z).foreach(_.shutdown)
}
/*it("should be able to join streams") {
import DataFlow._
ActorRegistry.shutdownAll
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
stream <<< n
ints(n + 1, max, stream)
}
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
out <<< s
sum(in() + s, in, out)
}
val producer = new DataFlowStream[Int]
val consumer = new DataFlowStream[Int]
val latch = new CountDownLatch(1)
val result = new AtomicInteger(0)
val t1 = thread { ints(0, 1000, producer) }
val t2 = thread {
Thread.sleep(1000)
result.set(producer.map(x => x * x).foldLeft(0)(_ + _))
latch.countDown
}
latch.await(3,TimeUnit.SECONDS) should equal (true)
result.get should equal (332833500)
}
it("should be able to sum streams recursively") {
import DataFlow._
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
stream <<< n
ints(n + 1, max, stream)
}
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
out <<< s
sum(in() + s, in, out)
}
val result = new AtomicLong(0)
val producer = new DataFlowStream[Int]
val consumer = new DataFlowStream[Int]
val latch = new CountDownLatch(1)
@tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = {
val x = stream()
if(result.addAndGet(x) == 166666500)
latch.countDown
recurseSum(stream)
}
thread { ints(0, 1000, producer) }
thread { sum(0, producer, consumer) }
thread { recurseSum(consumer) }
latch.await(15,TimeUnit.SECONDS) should equal (true)
}*/
/* Test not ready for prime time, causes some sort of deadlock */
/* it("should be able to conditionally set variables") {
import DataFlow._
ActorRegistry.shutdownAll
val latch = new CountDownLatch(1)
val x, y, z, v = new DataFlowVariable[Int]
val main = thread {
x << 1
z << Math.max(x(),y())
latch.countDown
}
val setY = thread {
// Thread.sleep(2000)
y << 2
}
val setV = thread {
v << y
}
List(x,y,z,v) foreach (_.shutdown)
latch.await(2,TimeUnit.SECONDS) should equal (true)
}*/
}
}

View file

@ -3,9 +3,10 @@ package se.scalablesolutions.akka.actor.dispatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import se.scalablesolutions.akka.dispatch.{Dispatchers,ExecutorBasedEventDrivenDispatcher}
import se.scalablesolutions.akka.actor.Actor
import Actor._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
object ExecutorBasedEventDrivenDispatcherActorSpec {
class TestActor extends Actor {
@ -65,4 +66,73 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite {
}
actor.stop
}
@Test def shouldRespectThroughput {
val throughputDispatcher = new ExecutorBasedEventDrivenDispatcher("THROUGHPUT",101,0,Dispatchers.MAILBOX_CONFIG, (e) => {
e.setCorePoolSize(1)
})
val works = new AtomicBoolean(true)
val latch = new CountDownLatch(100)
val start = new CountDownLatch(1)
val fastOne = actorOf(
new Actor {
self.dispatcher = throughputDispatcher
def receive = { case "sabotage" => works.set(false) }
}).start
val slowOne = actorOf(
new Actor {
self.dispatcher = throughputDispatcher
def receive = {
case "hogexecutor" => start.await
case "ping" => if (works.get) latch.countDown
}
}).start
slowOne ! "hogexecutor"
(1 to 100) foreach { _ => slowOne ! "ping"}
fastOne ! "sabotage"
start.countDown
val result = latch.await(3,TimeUnit.SECONDS)
fastOne.stop
slowOne.stop
throughputDispatcher.shutdown
assert(result === true)
}
@Test def shouldRespectThroughputDeadline {
val deadlineMs = 100
val throughputDispatcher = new ExecutorBasedEventDrivenDispatcher("THROUGHPUT",2,deadlineMs,Dispatchers.MAILBOX_CONFIG, (e) => {
e.setCorePoolSize(1)
})
val works = new AtomicBoolean(true)
val latch = new CountDownLatch(1)
val start = new CountDownLatch(1)
val ready = new CountDownLatch(1)
val fastOne = actorOf(
new Actor {
self.dispatcher = throughputDispatcher
def receive = { case "ping" => if(works.get) latch.countDown; self.stop }
}).start
val slowOne = actorOf(
new Actor {
self.dispatcher = throughputDispatcher
def receive = {
case "hogexecutor" => ready.countDown; start.await
case "ping" => works.set(false); self.stop
}
}).start
slowOne ! "hogexecutor"
slowOne ! "ping"
fastOne ! "ping"
assert(ready.await(5,TimeUnit.SECONDS) === true)
Thread.sleep(deadlineMs)
start.countDown
assert(latch.await(10,TimeUnit.SECONDS) === true)
}
}

View file

@ -146,7 +146,7 @@ object HawtDispatcherEchoServer {
read_source.setEventHandler(^{ read })
read_source.setCancelHandler(^{ close })
write_source = createSource(channel, SelectionKey.OP_READ, HawtDispatcher.queue(self));
write_source = createSource(channel, SelectionKey.OP_WRITE, HawtDispatcher.queue(self));
write_source.setEventHandler(^{ write })
write_source.setCancelHandler(^{ close })

View file

@ -18,10 +18,10 @@ import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message
import CamelMessageConversion.toExchangeAdapter
import se.scalablesolutions.akka.dispatch.{CompletableFuture, MessageInvocation, MessageDispatcher}
import se.scalablesolutions.akka.stm.TransactionConfig
import se.scalablesolutions.akka.actor.{ScalaActorRef, ActorRegistry, Actor, ActorRef}
import se.scalablesolutions.akka.AkkaException
import scala.reflect.BeanProperty
import se.scalablesolutions.akka.actor._
/**
* Camel component for sending messages to and receiving replies from (untyped) actors.
@ -199,13 +199,12 @@ private[akka] object AsyncCallbackAdapter {
private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef with ScalaActorRef {
def start = {
_isRunning = true
_status = ActorRefStatus.RUNNING
this
}
def stop() = {
_isRunning = false
_isShutDown = true
_status = ActorRefStatus.SHUTDOWN
}
/**
@ -247,8 +246,8 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](message: Any, timeout: Long, senderOption: Option[ActorRef], senderFuture: Option[CompletableFuture[T]]) = unsupported
protected[akka] def mailbox: AnyRef = unsupported
protected[akka] def mailbox_=(msg: AnyRef):AnyRef = unsupported
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
protected[akka] def linkedActors: JavaMap[String, ActorRef] = unsupported
protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported

View file

@ -82,7 +82,6 @@ trait Storage {
*/
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
with Transactional with Committable with Abortable with Logging {
protected val shouldClearOnCommit = Ref[Boolean]()
// operations on the Map
trait Op
@ -90,11 +89,12 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
case object PUT extends Op
case object REM extends Op
case object UPD extends Op
case object CLR extends Op
// append only log: records all mutating operations
protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
case class LogEntry(key: K, value: Option[V], op: Op)
case class LogEntry(key: Option[K], value: Option[V], op: Op)
// need to override in subclasses e.g. "sameElements" for Array[Byte]
def equal(k1: K, k2: K): Boolean = k1 == k2
@ -114,7 +114,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
protected def clearDistinctKeys = keysInCurrentTx.clear
protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] =
appendOnlyTxLog filter(e => equal(e.key, key))
appendOnlyTxLog filter(e => e.key.map(equal(_, key)).getOrElse(true))
// need to get current value considering the underlying storage as well as the transaction log
protected def getCurrentValue(key: K): Option[V] = {
@ -128,7 +128,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
} catch { case e: Exception => None }
if (txEntries.isEmpty) underlying
else replay(txEntries, key, underlying)
else txEntries.last match {
case LogEntry(_, _, CLR) => None
case _ => replay(txEntries, key, underlying)
}
}
// replay all tx entries for key k with seed = initial
@ -140,9 +143,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
case Some(v) => Map((key, v))
}
txEntries.foreach {case LogEntry(k, v, o) => o match {
case PUT => m.put(k, v.get)
case REM => m -= k
case UPD => m.update(k, v.get)
case PUT => m.put(k.get, v.get)
case REM => m -= k.get
case UPD => m.update(k.get, v.get)
case CLR => Map.empty[K, V]
}}
m get key
}
@ -151,12 +155,11 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
val storage: MapStorageBackend[K, V]
def commit = {
// if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) storage.removeMapStorageFor(uuid)
appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match {
case PUT => storage.insertMapStorageEntryFor(uuid, k, v.get)
case UPD => storage.insertMapStorageEntryFor(uuid, k, v.get)
case REM => storage.removeMapStorageFor(uuid, k)
case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
case REM => storage.removeMapStorageFor(uuid, k.get)
case CLR => storage.removeMapStorageFor(uuid)
}}
appendOnlyTxLog.clear
@ -166,7 +169,6 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
def abort = {
appendOnlyTxLog.clear
clearDistinctKeys
shouldClearOnCommit.swap(false)
}
def -=(key: K) = {
@ -187,7 +189,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def put(key: K, value: V): Option[V] = {
register
val curr = getCurrentValue(key)
appendOnlyTxLog add LogEntry(key, Some(value), PUT)
appendOnlyTxLog add LogEntry(Some(key), Some(value), PUT)
addToListOfKeysInTx(key)
curr
}
@ -195,7 +197,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def update(key: K, value: V) = {
register
val curr = getCurrentValue(key)
appendOnlyTxLog add LogEntry(key, Some(value), UPD)
appendOnlyTxLog add LogEntry(Some(key), Some(value), UPD)
addToListOfKeysInTx(key)
curr
}
@ -203,7 +205,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def remove(key: K) = {
register
val curr = getCurrentValue(key)
appendOnlyTxLog add LogEntry(key, None, REM)
appendOnlyTxLog add LogEntry(Some(key), None, REM)
addToListOfKeysInTx(key)
curr
}
@ -215,9 +217,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def clear = {
register
appendOnlyTxLog.clear
appendOnlyTxLog add LogEntry(None, None, CLR)
clearDistinctKeys
shouldClearOnCommit.swap(true)
}
override def contains(key: K): Boolean = try {
@ -225,7 +226,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
case Seq() => // current tx doesn't use this
storage.getMapStorageEntryFor(uuid, key).isDefined // check storage
case txs => // present in log
txs.last.op != REM // last entry cannot be a REM
val lastOp = txs.last.op
lastOp != REM && lastOp != CLR // last entry cannot be a REM
}
} catch { case e: Exception => false }
@ -366,11 +368,6 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable with Abortable {
protected val newElems = TransactionalVector[T]()
protected val updatedElems = TransactionalMap[Int, T]()
protected val removedElems = TransactionalVector[T]()
protected val shouldClearOnCommit = Ref[Boolean]()
// operations on the Vector
trait Op
case object ADD extends Op
@ -400,7 +397,6 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
def abort = {
appendOnlyTxLog.clear
shouldClearOnCommit.swap(false)
}
private def replay: List[T] = {
@ -466,14 +462,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
override def first: T = get(0)
override def last: T = {
if (newElems.length != 0) newElems.last
else {
val len = length
if (len == 0) throw new NoSuchElementException("Vector is empty")
get(len - 1)
}
}
override def last: T = replay.last
def length: Int = replay.length
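Clearing is now an ordinary entry in the append-only transaction log instead of a separate shouldClearOnCommit flag, which is what drives the ticket-343 CLEAR_AFTER_PUT expectation below to 0. An illustration of the log contents, assuming a persistent map m with String keys and values:

// Inside one transaction: m += ("a" -> "1"); m += ("b" -> "2"); m.clear; m += ("d" -> "4")
// clear() wipes the earlier entries from appendOnlyTxLog and records CLR, leaving:
//   LogEntry(None,      None,      CLR)
//   LogEntry(Some("d"), Some("4"), PUT)
// so commit issues removeMapStorageFor(uuid) followed by a single insert,
// and contains/replay/size all see only d -> "4".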

View file

@ -9,7 +9,6 @@ import se.scalablesolutions.akka.persistence.common._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config.config
import java.util.NoSuchElementException
import com.novus.casbah.mongodb.Imports._
/**

View file

@ -238,7 +238,7 @@ class MongoTicket343Spec extends
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
(proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(0)
proc.stop
}
}

View file

@ -359,7 +359,6 @@ private [akka] object RedisStorageBackend extends
case e: java.lang.NullPointerException =>
throw new StorageException("Could not connect to Redis server")
case e =>
e.printStackTrace
throw new StorageException("Error in Redis: " + e.getMessage)
}
}

View file

@ -32,6 +32,10 @@ case class VUPD(i: Int, v: String)
case class VUPD_AND_ABORT(i: Int, v: String)
case class VGET(i: Int)
case object VSIZE
case object VLAST
case object VFIRST
case class VLAST_AFTER_ADD(vsToAdd: List[String])
case class VFIRST_AFTER_ADD(vsToAdd: List[String])
case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int])
case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
@ -175,6 +179,30 @@ object Storage {
fooVector.slice(Some(s), None, c)
}
self.reply(l.map(new String(_)))
case VLAST =>
val l = atomic { fooVector last }
self.reply(l)
case VFIRST =>
val l = atomic { fooVector first }
self.reply(l)
case VLAST_AFTER_ADD(vs) =>
val l =
atomic {
vs.foreach(fooVector + _.getBytes)
fooVector last
}
self.reply(l)
case VFIRST_AFTER_ADD(vs) =>
val l =
atomic {
vs.foreach(fooVector + _.getBytes)
fooVector first
}
self.reply(l)
}
}
}
@ -243,7 +271,7 @@ class RedisTicket343Spec extends
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
(proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(0)
proc.stop
}
}
@ -344,7 +372,26 @@ class RedisTicket343Spec extends
(proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish"))
// slice with new elements added in current transaction
(proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a"))
(proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 4)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a", "nilanjan", "ramanendu"))
proc.stop
}
}
describe("Miscellaneous vector ops") {
it("vector slice() should not ignore elements added in current transaction") {
val proc = actorOf[RedisSampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
new String((proc !! VLAST).getOrElse("VLAST failed").asInstanceOf[Array[Byte]]) should equal("debasish")
new String((proc !! VFIRST).getOrElse("VFIRST failed").asInstanceOf[Array[Byte]]) should equal("nilanjan")
new String((proc !! VLAST_AFTER_ADD(List("kausik", "tarun"))).getOrElse("VLAST_AFTER_ADD failed").asInstanceOf[Array[Byte]]) should equal("debasish")
new String((proc !! VFIRST_AFTER_ADD(List("kausik", "tarun"))).getOrElse("VFIRST_AFTER_ADD failed").asInstanceOf[Array[Byte]]) should equal("tarun")
proc.stop
}
}

View file

@ -22,6 +22,15 @@ message RemoteActorRefProtocol {
optional uint64 timeout = 4;
}
/**
* Defines a remote ActorRef that "remembers" and uses its original typed Actor instance
* on the original node.
*/
message RemoteTypedActorRefProtocol {
required RemoteActorRefProtocol actorRef = 1;
required string interfaceName = 2;
}
/**
* Defines a fully serialized remote ActorRef (with serialized Actor instance)
* that is about to be instantiated on the remote node. It is fully disconnected
@ -43,6 +52,16 @@ message SerializedActorRefProtocol {
repeated RemoteRequestProtocol messages = 13;
}
/**
* Defines a fully serialized remote ActorRef (with serialized typed actor instance)
* that is about to be instantiated on the remote node. It is fully disconnected
* from its original host.
*/
message SerializedTypedActorRefProtocol {
required SerializedActorRefProtocol actorRef = 1;
required string interfaceName = 2;
}
/**
* Defines a message.
*/
@ -61,6 +80,7 @@ message ActorInfoProtocol {
required uint64 timeout = 3;
required ActorType actorType = 4;
optional TypedActorInfoProtocol typedActorInfo = 5;
optional string id = 6;
}
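The actor id now travels in its own optional field rather than being packed into the uuid string and split on the receiving side. A sketch of filling it in, assuming the usual generated protobuf builder methods:

val actorInfo = ActorInfoProtocol.newBuilder
  .setUuid(actorRef.uuid)
  .setId(actorRef.id)    // new optional field 6; previously "uuid:id" was split server-side
  // ... remaining required fields (target, timeout, actorType) set as before
  .build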
/**

View file

@ -30,6 +30,7 @@ import org.jboss.netty.handler.ssl.SslHandler
import scala.collection.mutable.Map
import scala.reflect.BeanProperty
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
/**
* Use this object if you need a single remote server on a specific node.
@ -66,6 +67,7 @@ object RemoteNode extends RemoteServer
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteServer {
val UUID_PREFIX = "uuid:"
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
@ -122,18 +124,20 @@ object RemoteServer {
private class RemoteActorSet {
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
private[RemoteServer] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef]
private[RemoteServer] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef]
}
private val guard = new ReadWriteGuard
private val remoteActorSets = Map[Address, RemoteActorSet]()
private val remoteServers = Map[Address, RemoteServer]()
private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
private[akka] def registerActorByUuid(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actorsByUuid.put(uuid, actor)
}
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard {
private[akka] def registerTypedActorByUuid(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor)
}
@ -191,6 +195,7 @@ case class RemoteServerClientDisconnected(@BeanProperty val server: RemoteServer
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServer extends Logging with ListenerManagement {
import RemoteServer._
def name = "RemoteServer@" + hostname + ":" + port
private[akka] var address = RemoteServer.Address(RemoteServer.HOSTNAME,RemoteServer.PORT)
@ -282,10 +287,11 @@ class RemoteServer extends Logging with ListenerManagement {
* @param typedActor typed actor to register
*/
def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
if (!typedActors.contains(id)) {
log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port)
typedActors.put(id, typedActor)
log.debug("Registering server side remote typed actor [%s] with id [%s]", typedActor.getClass.getName, id)
if (id.startsWith(UUID_PREFIX)) {
registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid())
} else {
registerTypedActor(id, typedActor, typedActors())
}
}
@ -300,12 +306,27 @@ class RemoteServer extends Logging with ListenerManagement {
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def register(id: String, actorRef: ActorRef): Unit = synchronized {
log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
if (id.startsWith(UUID_PREFIX)) {
register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid())
} else {
register(id, actorRef, actors())
}
}
private def register(id: String, actorRef: ActorRef, registry: ConcurrentHashMap[String, ActorRef]) {
if (_isRunning) {
val actorMap = actors()
if (!actorMap.contains(id)) {
if (!registry.contains(id)) {
if (!actorRef.isRunning) actorRef.start
log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
actorMap.put(id, actorRef)
registry.put(id, actorRef)
}
}
}
private def registerTypedActor(id: String, typedActor: AnyRef, registry: ConcurrentHashMap[String, AnyRef]) {
if (_isRunning) {
if (!registry.contains(id)) {
registry.put(id, typedActor)
}
}
}
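Registration by UUID is now spelled with an explicit "uuid:" prefix on the id, so the id and uuid registries can no longer collide. A usage sketch, assuming a started RemoteServer instance named server and an actor reference echoActorRef:

// Register the same actor by application id and by uuid
server.register("my-service", echoActorRef)                                   // actors() registry
server.register(RemoteServer.UUID_PREFIX + echoActorRef.uuid, echoActorRef)   // actorsByUuid() registry

// Unregistering follows the same convention
server.unregister(RemoteServer.UUID_PREFIX + echoActorRef.uuid)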
@ -318,7 +339,7 @@ class RemoteServer extends Logging with ListenerManagement {
log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid)
val actorMap = actors()
actorMap remove actorRef.id
if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid
if (actorRef.registeredInRemoteNodeDuringSerialization) actorsByUuid() remove actorRef.uuid
}
}
@ -330,10 +351,15 @@ class RemoteServer extends Logging with ListenerManagement {
def unregister(id: String):Unit = synchronized {
if (_isRunning) {
log.info("Unregistering server side remote actor with id [%s]", id)
val actorMap = actors()
val actorRef = actorMap get id
actorMap remove id
if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid
if (id.startsWith(UUID_PREFIX)) {
actorsByUuid().remove(id.substring(UUID_PREFIX.length))
} else {
val actorRef = actors().get(id)
if (actorRef.registeredInRemoteNodeDuringSerialization) {
actorsByUuid() remove actorRef.uuid
}
actors() remove id
}
}
}
@ -345,8 +371,11 @@ class RemoteServer extends Logging with ListenerManagement {
def unregisterTypedActor(id: String):Unit = synchronized {
if (_isRunning) {
log.info("Unregistering server side remote typed actor with id [%s]", id)
val registeredTypedActors = typedActors()
registeredTypedActors.remove(id)
if (id.startsWith(UUID_PREFIX)) {
typedActorsByUuid().remove(id.substring(UUID_PREFIX.length))
} else {
typedActors().remove(id)
}
}
}
@ -354,8 +383,10 @@ class RemoteServer extends Logging with ListenerManagement {
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] def actors() = RemoteServer.actorsFor(address).actors
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
private[akka] def actors() = RemoteServer.actorsFor(address).actors
private[akka] def actorsByUuid() = RemoteServer.actorsFor(address).actorsByUuid
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
private[akka] def typedActorsByUuid() = RemoteServer.actorsFor(address).typedActorsByUuid
}
object RemoteServerSslContext {
@ -418,6 +449,7 @@ class RemoteServerHandler(
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging {
import RemoteServer._
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
applicationLoader.foreach(MessageSerializer.setClassLoader(_))
@ -476,11 +508,12 @@ class RemoteServerHandler(
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
log.debug("Received RemoteRequestProtocol[\n%s]", request.toString)
val actorType = request.getActorInfo.getActorType
if (actorType == SCALA_ACTOR) dispatchToActor(request, channel)
else if (actorType == JAVA_ACTOR) throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported")
else if (actorType == TYPED_ACTOR) dispatchToTypedActor(request, channel)
else throw new IllegalActorStateException("Unknown ActorType [" + actorType + "]")
request.getActorInfo.getActorType match {
case SCALA_ACTOR => dispatchToActor(request, channel)
case TYPED_ACTOR => dispatchToTypedActor(request, channel)
case JAVA_ACTOR => throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported")
case other => throw new IllegalActorStateException("Unknown ActorType [" + other + "]")
}
}
private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = {
@ -498,27 +531,36 @@ class RemoteServerHandler(
case RemoteActorSystemMessage.Stop => actorRef.stop
case _ => // then match on user defined messages
if (request.getIsOneWay) actorRef.!(message)(sender)
else {
try {
val resultOrNone = (actorRef.!!(message)(sender)).as[AnyRef]
val result = if (resultOrNone.isDefined) resultOrNone.get else null
else actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message,request.getActorInfo.getTimeout,None,Some(
new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){
override def onComplete(result: AnyRef) {
log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setMessage(MessageSerializer.serialize(result))
.setIsSuccessful(true)
.setIsActor(true)
log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setId(request.getId)
.setMessage(MessageSerializer.serialize(result))
.setIsSuccessful(true)
.setIsActor(true)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
try {
channel.write(replyBuilder.build)
} catch {
case e: Throwable =>
server.notifyListeners(RemoteServerError(e, server))
}
}
} catch {
case e: Throwable =>
channel.write(createErrorReplyMessage(e, request, true))
server.notifyListeners(RemoteServerError(e, server))
}
override def onCompleteException(exception: Throwable) {
try {
channel.write(createErrorReplyMessage(exception, request, true))
} catch {
case e: Throwable =>
server.notifyListeners(RemoteServerError(e, server))
}
}
}
))
}
}
@ -555,32 +597,23 @@ class RemoteServerHandler(
}
}
/**
* Find a registered actor by ID (default) or UUID.
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
*/
private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = {
val registeredActors = server.actors()
var actorRefOrNull = registeredActors get id
if (actorRefOrNull eq null) {
actorRefOrNull = registeredActors get uuid
}
actorRefOrNull
private def findActorById(id: String) : ActorRef = {
server.actors().get(id)
}
/**
* Find a registered typed actor by ID (default) or UUID.
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
*/
private def findTypedActorByIdOrUUid(id: String, uuid: String) : AnyRef = {
val registeredActors = server.typedActors()
var actorRefOrNull = registeredActors get id
if (actorRefOrNull eq null) {
actorRefOrNull = registeredActors get uuid
}
actorRefOrNull
private def findActorByUuid(uuid: String) : ActorRef = {
server.actorsByUuid().get(uuid)
}
private def findTypedActorById(id: String) : AnyRef = {
server.typedActors().get(id)
}
private def findTypedActorByUuid(uuid: String) : AnyRef = {
server.typedActorsByUuid().get(uuid)
}
/**
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
*
@ -589,15 +622,18 @@ class RemoteServerHandler(
* Does not start the actor.
*/
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
val ids = actorInfo.getUuid.split(':')
val uuid = ids(0)
val id = ids(1)
val uuid = actorInfo.getUuid
val id = actorInfo.getId
val name = actorInfo.getTarget
val timeout = actorInfo.getTimeout
val actorRefOrNull = findActorByIdOrUuid(id, uuid)
val actorRefOrNull = if (id.startsWith(UUID_PREFIX)) {
findActorByUuid(id.substring(UUID_PREFIX.length))
} else {
findActorById(id)
}
if (actorRefOrNull eq null) {
try {
log.info("Creating a new remote actor [%s:%s]", name, uuid)
@ -620,11 +656,14 @@ class RemoteServerHandler(
}
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
val ids = actorInfo.getUuid.split(':')
val uuid = ids(0)
val id = ids(1)
val uuid = actorInfo.getUuid
val id = actorInfo.getId
val typedActorOrNull = findTypedActorByIdOrUUid(id, uuid)
val typedActorOrNull = if (id.startsWith(UUID_PREFIX)) {
findTypedActorByUuid(id.substring(UUID_PREFIX.length))
} else {
findTypedActorById(id)
}
if (typedActorOrNull eq null) {
val typedActorInfo = actorInfo.getTypedActorInfo

View file

@ -4,10 +4,10 @@
package se.scalablesolutions.akka.serialization
import se.scalablesolutions.akka.actor.{Actor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, ActorType}
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.dispatch.MessageInvocation
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteRequestProtocolIdFactory, MessageSerializer}
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _}
import ActorTypeProtocol._
@ -15,6 +15,7 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F
import se.scalablesolutions.akka.config.ScalaConfig._
import com.google.protobuf.ByteString
import se.scalablesolutions.akka.actor._
/**
* Type class definition for Actor Serialization
@ -36,13 +37,14 @@ trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T]
* Create a Format object with the client actor as the implementation of the type class
*
* <pre>
* object BinaryFormatMyStatelessActor {
* object BinaryFormatMyStatelessActor {
* implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
* }
* </pre>
*/
trait StatelessActorFormat[T <: Actor] extends Format[T] {
def fromBinary(bytes: Array[Byte], act: T) = act
def toBinary(ac: T) = Array.empty[Byte]
}
@ -53,16 +55,18 @@ trait StatelessActorFormat[T <: Actor] extends Format[T] {
* a serializer object
*
* <pre>
* object BinaryFormatMyJavaSerializableActor {
* implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
* object BinaryFormatMyJavaSerializableActor {
* implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
* val serializer = Serializer.Java
* }
* }
* }
* </pre>
*/
trait SerializerBasedActorFormat[T <: Actor] extends Format[T] {
val serializer: Serializer
def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T]
def toBinary(ac: T) = serializer.toBinary(ac)
}
@ -70,23 +74,22 @@ trait SerializerBasedActorFormat[T <: Actor] extends Format[T] {
* Module for local actor serialization.
*/
object ActorSerialization {
def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef =
fromBinaryToLocalActorRef(bytes, format)
def toBinary[T <: Actor](a: ActorRef)(implicit format: Format[T]): Array[Byte] =
toSerializedActorRefProtocol(a, format).toByteArray
def toBinary[T <: Actor](a: ActorRef, serializeMailBox: Boolean = true)(implicit format: Format[T]): Array[Byte] =
toSerializedActorRefProtocol(a, format, serializeMailBox).toByteArray
// wrapper for implicits to be used by Java
def fromBinaryJ[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef =
fromBinary(bytes)(format)
// wrapper for implicits to be used by Java
def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T]): Array[Byte] =
toBinary(a)(format)
def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] =
toBinary(a, srlMailBox)(format)
private def toSerializedActorRefProtocol[T <: Actor](
actorRef: ActorRef, format: Format[T]): SerializedActorRefProtocol = {
private[akka] def toSerializedActorRefProtocol[T <: Actor](
actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = {
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
@ -102,17 +105,40 @@ object ActorSerialization {
}
val originalAddress = AddressProtocol.newBuilder
.setHostname(actorRef.homeAddress.getHostName)
.setPort(actorRef.homeAddress.getPort)
.build
.setHostname(actorRef.homeAddress.getHostName)
.setPort(actorRef.homeAddress.getPort)
.build
val builder = SerializedActorRefProtocol.newBuilder
.setUuid(actorRef.uuid)
.setId(actorRef.id)
.setActorClassname(actorRef.actorClass.getName)
.setOriginalAddress(originalAddress)
.setIsTransactor(actorRef.isTransactor)
.setTimeout(actorRef.timeout)
.setUuid(actorRef.uuid)
.setId(actorRef.id)
.setActorClassname(actorRef.actorClass.getName)
.setOriginalAddress(originalAddress)
.setIsTransactor(actorRef.isTransactor)
.setTimeout(actorRef.timeout)
if (serializeMailBox == true) {
val messages =
actorRef.mailbox match {
case q: java.util.Queue[MessageInvocation] =>
val l = new scala.collection.mutable.ListBuffer[MessageInvocation]
val it = q.iterator
while (it.hasNext == true) l += it.next
l
}
val requestProtocols =
messages.map(m =>
RemoteActorSerialization.createRemoteRequestProtocolBuilder(
actorRef,
m.message,
false,
actorRef.getSender,
None,
ActorType.ScalaActor).build)
requestProtocols.foreach(rp => builder.addMessages(rp))
}
actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_))
builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T])))
@ -126,33 +152,33 @@ object ActorSerialization {
private def fromBinaryToLocalActorRef[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef =
fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None)
private def fromProtobufToLocalActorRef[T <: Actor](
protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = {
private[akka] def fromProtobufToLocalActorRef[T <: Actor](
protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = {
Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol)
val serializer =
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer)
else None
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer)
else None
val lifeCycle =
if (protocol.hasLifeCycle) {
val lifeCycleProtocol = protocol.getLifeCycle
Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent)
else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary)
else throw new IllegalActorStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
} else None
if (protocol.hasLifeCycle) {
val lifeCycleProtocol = protocol.getLifeCycle
Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent)
else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary)
else throw new IllegalActorStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
} else None
val supervisor =
if (protocol.hasSupervisor)
Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
else None
if (protocol.hasSupervisor)
Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
else None
val hotswap =
if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get
if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]]))
.asInstanceOf[PartialFunction[Any, Unit]])
else None
else None
val classLoader = loader.getOrElse(getClass.getClassLoader)
@ -194,9 +220,9 @@ object RemoteActorSerialization {
def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef =
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
/**
 * Deserializes a byte array (Array[Byte]) into a RemoteActorRef instance.
*/
/**
 * Deserializes a byte array (Array[Byte]) into a RemoteActorRef instance.
*/
def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
@ -225,39 +251,41 @@ object RemoteActorSerialization {
if (!registeredInRemoteNodeDuringSerialization) {
Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
RemoteServer.getOrCreateServer(homeAddress)
RemoteServer.registerActor(homeAddress, uuid, ar)
RemoteServer.registerActorByUuid(homeAddress, uuid, ar)
registeredInRemoteNodeDuringSerialization = true
}
RemoteActorRefProtocol.newBuilder
.setUuid(uuid + ":" + id)
.setActorClassname(actorClass.getName)
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
.setTimeout(timeout)
.build
.setUuid(uuid)
.setActorClassname(actorClass.getName)
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
.setTimeout(timeout)
.build
}
def createRemoteRequestProtocolBuilder(
actorRef: ActorRef,
message: Any,
isOneWay: Boolean,
senderOption: Option[ActorRef],
typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType):
RemoteRequestProtocol.Builder = {
actorRef: ActorRef,
message: Any,
isOneWay: Boolean,
senderOption: Option[ActorRef],
typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType):
RemoteRequestProtocol.Builder = {
import actorRef._
val actorInfoBuilder = ActorInfoProtocol.newBuilder
.setUuid(uuid + ":" + actorRef.id)
.setUuid(uuid)
.setId(actorRef.id)
.setTarget(actorClassName)
.setTimeout(timeout)
typedActorInfo.foreach { typedActor =>
actorInfoBuilder.setTypedActorInfo(
TypedActorInfoProtocol.newBuilder
.setInterface(typedActor._1)
.setMethod(typedActor._2)
.build)
typedActorInfo.foreach {
typedActor =>
actorInfoBuilder.setTypedActorInfo(
TypedActorInfoProtocol.newBuilder
.setInterface(typedActor._1)
.setMethod(typedActor._2)
.build)
}
actorType match {
@ -275,10 +303,110 @@ object RemoteActorSerialization {
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
senderOption.foreach { sender =>
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
requestBuilder.setSender(toRemoteActorRefProtocol(sender))
senderOption.foreach {
sender =>
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender)
requestBuilder.setSender(toRemoteActorRefProtocol(sender))
}
requestBuilder
}
}
/**
* Module for local typed actor serialization.
*/
object TypedActorSerialization {
def fromBinary[T <: Actor, U <: AnyRef](bytes: Array[Byte])(implicit format: Format[T]): U =
fromBinaryToLocalTypedActorRef(bytes, format)
def toBinary[T <: Actor](proxy: AnyRef)(implicit format: Format[T]): Array[Byte] = {
toSerializedTypedActorRefProtocol(proxy, format).toByteArray
}
// wrapper for implicits to be used by Java
def fromBinaryJ[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Format[T]): U =
fromBinary(bytes)(format)
// wrapper for implicits to be used by Java
def toBinaryJ[T <: Actor](a: AnyRef, format: Format[T]): Array[Byte] =
toBinary(a)(format)
private def toSerializedTypedActorRefProtocol[T <: Actor](
proxy: AnyRef, format: Format[T]): SerializedTypedActorRefProtocol = {
val init = AspectInitRegistry.initFor(proxy)
if (init == null) throw new IllegalArgumentException("Proxy for typed actor could not be found in AspectInitRegistry.")
SerializedTypedActorRefProtocol.newBuilder
.setActorRef(ActorSerialization.toSerializedActorRefProtocol(init.actorRef, format))
.setInterfaceName(init.interfaceClass.getName)
.build
}
private def fromBinaryToLocalTypedActorRef[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Format[T]): U =
fromProtobufToLocalTypedActorRef(SerializedTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None)
private def fromProtobufToLocalTypedActorRef[T <: Actor, U <: AnyRef](
protocol: SerializedTypedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): U = {
Actor.log.debug("Deserializing SerializedTypedActorRefProtocol to LocalActorRef:\n" + protocol)
val actorRef = ActorSerialization.fromProtobufToLocalActorRef(protocol.getActorRef, format, loader)
val intfClass = toClass(loader, protocol.getInterfaceName)
TypedActor.newInstance(intfClass, actorRef).asInstanceOf[U]
}
private[akka] def toClass[U <: AnyRef](loader: Option[ClassLoader], name: String): Class[U] = {
val classLoader = loader.getOrElse(getClass.getClassLoader)
val clazz = classLoader.loadClass(name)
clazz.asInstanceOf[Class[U]]
}
}
/**
* Module for remote typed actor serialization.
*/
object RemoteTypedActorSerialization {
/**
 * Deserializes a byte array (Array[Byte]) into an AW RemoteActorRef proxy.
*/
def fromBinaryToRemoteTypedActorRef[T <: AnyRef](bytes: Array[Byte]): T =
fromProtobufToRemoteTypedActorRef(RemoteTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
/**
 * Deserializes a byte array (Array[Byte]) into an AW RemoteActorRef proxy.
*/
def fromBinaryToRemoteTypedActorRef[T <: AnyRef](bytes: Array[Byte], loader: ClassLoader): T =
fromProtobufToRemoteTypedActorRef(RemoteTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
/**
* Serialize as AW RemoteActorRef proxy.
*/
def toBinary[T <: Actor](proxy: AnyRef): Array[Byte] = {
toRemoteTypedActorRefProtocol(proxy).toByteArray
}
/**
 * Deserializes a RemoteTypedActorRefProtocol Protocol Buffers (protobuf) Message into an AW RemoteActorRef proxy.
*/
private[akka] def fromProtobufToRemoteTypedActorRef[T](protocol: RemoteTypedActorRefProtocol, loader: Option[ClassLoader]): T = {
Actor.log.debug("Deserializing RemoteTypedActorRefProtocol to AW RemoteActorRef proxy:\n" + protocol)
val actorRef = RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getActorRef, loader)
val intfClass = TypedActorSerialization.toClass(loader, protocol.getInterfaceName)
TypedActor.createProxyForRemoteActorRef(intfClass, actorRef).asInstanceOf[T]
}
/**
* Serializes the AW TypedActor proxy into a Protocol Buffers (protobuf) Message.
*/
def toRemoteTypedActorRefProtocol(proxy: AnyRef): RemoteTypedActorRefProtocol = {
val init = AspectInitRegistry.initFor(proxy)
RemoteTypedActorRefProtocol.newBuilder
.setActorRef(RemoteActorSerialization.toRemoteActorRefProtocol(init.actorRef))
.setInterfaceName(init.interfaceClass.getName)
.build
}
}
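
Taken together, the serialization entry points added above can be exercised roughly as follows. This is only a sketch: MyActor, MyTypedActor, proxy and MyTypedActorFormat are placeholders for the concrete classes and Format instances defined in the specs below, and an implicit Format for MyActor (e.g. a StatelessActorFormat) is assumed to be in scope.

    import se.scalablesolutions.akka.serialization.ActorSerialization._
    import se.scalablesolutions.akka.actor.Actor._

    val ref = actorOf[MyActor].start
    val withMailbox    = toBinary(ref)            // serializeMailBox defaults to true
    val withoutMailbox = toBinary(ref, false)     // pending messages are not serialized
    val restored       = fromBinary(withMailbox)  // mailbox contents are restored as well

    // Typed actors: serialize the proxy itself, or hand out a remote ref to it.
    val bytes = TypedActorSerialization.toBinaryJ(proxy, MyTypedActorFormat)
    val local: MyTypedActor = TypedActorSerialization.fromBinaryJ(bytes, MyTypedActorFormat)
    val remoteBytes = RemoteTypedActorSerialization.toBinary(proxy)
    val remote: MyTypedActor = RemoteTypedActorSerialization.fromBinaryToRemoteTypedActorRef(remoteBytes)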

View file

@ -79,7 +79,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
}
}
@Test
def shouldSendWithBang {
val actor = RemoteClient.actorFor(
@ -178,5 +177,41 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
assert(actor2.id == actor3.id)
}
@Test
def shouldFindActorByUuid {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
val actor2 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("uuid:" + actor1.uuid, actor1)
server.register("my-service", actor2)
val ref1 = RemoteClient.actorFor("uuid:" + actor1.uuid, HOSTNAME, PORT)
val ref2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT)
ref1 ! "OneWay"
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
ref1.stop
ref2 ! "OneWay"
ref2.stop
}
@Test
def shouldRegisterAndUnregister {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("my-service-1", actor1)
assert(server.actors().get("my-service-1") != null, "actor registered")
server.unregister("my-service-1")
assert(server.actors().get("my-service-1") == null, "actor unregistered")
}
@Test
def shouldRegisterAndUnregisterByUuid {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("uuid:" + actor1.uuid, actor1)
assert(server.actorsByUuid().get(actor1.uuid) != null, "actor registered")
server.unregister("uuid:" + actor1.uuid)
assert(server.actorsByUuid().get(actor1.uuid) == null, "actor unregistered")
}
}

View file

@ -103,9 +103,34 @@ class ServerInitiatedRemoteTypedActorSpec extends
it("should register and unregister typed actors") {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
server.registerTypedActor("my-test-service", typedActor)
assert(server.typedActors().get("my-test-service") != null)
assert(server.typedActors().get("my-test-service") != null, "typed actor registered")
server.unregisterTypedActor("my-test-service")
assert(server.typedActors().get("my-test-service") == null)
assert(server.typedActors().get("my-test-service") == null, "typed actor unregistered")
}
it("should register and unregister typed actors by uuid") {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
val init = AspectInitRegistry.initFor(typedActor)
val uuid = "uuid:" + init.actorRef.uuid
server.registerTypedActor(uuid, typedActor)
assert(server.typedActorsByUuid().get(init.actorRef.uuid) != null, "typed actor registered")
server.unregisterTypedActor(uuid)
assert(server.typedActorsByUuid().get(init.actorRef.uuid) == null, "typed actor unregistered")
}
it("should find typed actors by uuid") {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
val init = AspectInitRegistry.initFor(typedActor)
val uuid = "uuid:" + init.actorRef.uuid
server.registerTypedActor(uuid, typedActor)
assert(server.typedActorsByUuid().get(init.actorRef.uuid) != null, "typed actor registered")
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], uuid, HOSTNAME, PORT)
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
}
}
}

View file

@ -127,9 +127,16 @@ class SerializableTypeClassActorSpec extends
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
actor1.mailboxSize should be > (0)
val actor2 = fromBinary(toBinary(actor1))
Thread.sleep(1000)
actor2.mailboxSize should be > (0)
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
val actor3 = fromBinary(toBinary(actor1, false))
Thread.sleep(1000)
actor3.mailboxSize should equal(0)
(actor3 !! "hello-reply").getOrElse("_") should equal("world")
}
}
}

View file

@ -0,0 +1,126 @@
package se.scalablesolutions.akka.actor.serialization
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.serialization._
import se.scalablesolutions.akka.actor._
import ActorSerialization._
import Actor._
@RunWith(classOf[JUnitRunner])
class Ticket435Spec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
object BinaryFormatMyStatefulActor {
implicit object MyStatefulActorFormat extends Format[MyStatefulActor] {
def fromBinary(bytes: Array[Byte], act: MyStatefulActor) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
act.count = p.getCount
act
}
def toBinary(ac: MyStatefulActor) =
ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray
}
}
object BinaryFormatMyStatelessActorWithMessagesInMailbox {
implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActorWithMessagesInMailbox]
}
describe("Serializable actor") {
it("should be able to serialize and deserialize a stateless actor with messages in mailbox") {
import BinaryFormatMyStatelessActorWithMessagesInMailbox._
val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
actor1.mailboxSize should be > (0)
val actor2 = fromBinary(toBinary(actor1))
Thread.sleep(1000)
actor2.mailboxSize should be > (0)
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
val actor3 = fromBinary(toBinary(actor1, false))
Thread.sleep(1000)
actor3.mailboxSize should equal(0)
(actor3 !! "hello-reply").getOrElse("_") should equal("world")
}
it("should serialize the mailbox optionally") {
import BinaryFormatMyStatelessActorWithMessagesInMailbox._
val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
actor1.mailboxSize should be > (0)
val actor2 = fromBinary(toBinary(actor1, false))
Thread.sleep(1000)
actor2.mailboxSize should equal(0)
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
}
it("should be able to serialize and deserialize a stateful actor with messages in mailbox") {
import BinaryFormatMyStatefulActor._
val actor1 = actorOf[MyStatefulActor].start
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
actor1.mailboxSize should be > (0)
val actor2 = fromBinary(toBinary(actor1))
Thread.sleep(1000)
actor2.mailboxSize should be > (0)
(actor2 !! "hello").getOrElse("_") should equal("world 1")
val actor3 = fromBinary(toBinary(actor1, false))
Thread.sleep(1000)
actor3.mailboxSize should equal(0)
(actor3 !! "hello").getOrElse("_") should equal("world 1")
}
}
}
class MyStatefulActor extends Actor {
var count = 0
def receive = {
case "hi" =>
println("# messages in mailbox " + self.mailboxSize)
Thread.sleep(500)
case "hello" =>
count = count + 1
self.reply("world " + count)
}
}

View file

@ -0,0 +1,49 @@
package se.scalablesolutions.akka.actor.serialization
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.serialization.Serializable.ScalaJSON
import scala.reflect._
import scala.annotation.target._
import sjson.json.JSONTypeHint
@BeanInfo class MyJsonObject(val key: String,
@(JSONTypeHint @field)(value = classOf[Int])
val map: Map[String, Int],
val standAloneInt: Int) extends ScalaJSON {
private def this() = this(null, null, -1)
override def toString(): String = try {
val mapValue: Int = map.getOrElse(key, -1)
println("Map value: %s".format(mapValue.asInstanceOf[AnyRef].getClass))
"Key: %s, Map value: %d, Stand Alone Int: %d".format(key, mapValue, standAloneInt)
} catch {
case e: ClassCastException => e.getMessage
case _ => "Unknown error"
}
}
@RunWith(classOf[JUnitRunner])
class Ticket436Spec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
describe("Serialization of Maps containing Int") {
it("should be able to serialize and de-serialize preserving the data types of the Map") {
val key: String = "myKey"
val value: Int = 123
val standAloneInt: Int = 35
val message = new MyJsonObject(key, Map(key -> value), standAloneInt)
val json = message.toJSON
val copy = Serializer.ScalaJSON.fromJSON[MyJsonObject](json)
copy.asInstanceOf[MyJsonObject].map.get("myKey").get.isInstanceOf[Int] should equal(true)
}
}
}

View file

@ -0,0 +1,166 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor.serialization
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.serialization._
import se.scalablesolutions.akka.actor._
import TypedActorSerialization._
import Actor._
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
import se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec.RemoteActorSpecActorUnidirectional
@RunWith(classOf[JUnitRunner])
class TypedActorSerializationSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
var server1: RemoteServer = null
var typedActor: MyTypedActor = null
override def beforeAll = {
server1 = new RemoteServer().start("localhost", 9991)
typedActor = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000)
server1.registerTypedActor("typed-actor-service", typedActor)
Thread.sleep(1000)
}
// make sure the servers shutdown cleanly after the test has finished
override def afterAll = {
try {
TypedActor.stop(typedActor)
server1.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
} catch {
case e => ()
}
}
object MyTypedStatelessActorFormat extends StatelessActorFormat[MyStatelessTypedActorImpl]
class MyTypedActorFormat extends Format[MyTypedActorImpl] {
def fromBinary(bytes: Array[Byte], act: MyTypedActorImpl) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
act.count = p.getCount
act
}
def toBinary(ac: MyTypedActorImpl) =
ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray
}
class MyTypedActorWithDualCounterFormat extends Format[MyTypedActorWithDualCounter] {
def fromBinary(bytes: Array[Byte], act: MyTypedActorWithDualCounter) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter]
act.count1 = p.getCount1
act.count2 = p.getCount2
act
}
def toBinary(ac: MyTypedActorWithDualCounter) =
ProtobufProtocol.DualCounter.newBuilder.setCount1(ac.count1).setCount2(ac.count2).build.toByteArray
}
describe("Serializable typed actor") {
it("should be able to serialize and de-serialize a stateless typed actor") {
val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000)
typedActor1.requestReply("hello") should equal("world")
typedActor1.requestReply("hello") should equal("world")
val bytes = toBinaryJ(typedActor1, MyTypedStatelessActorFormat)
val typedActor2: MyTypedActor = fromBinaryJ(bytes, MyTypedStatelessActorFormat)
typedActor2.requestReply("hello") should equal("world")
}
it("should be able to serialize and de-serialize a stateful typed actor") {
val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000)
typedActor1.requestReply("hello") should equal("world 1")
typedActor1.requestReply("scala") should equal("hello scala 2")
val f = new MyTypedActorFormat
val bytes = toBinaryJ(typedActor1, f)
val typedActor2: MyTypedActor = fromBinaryJ(bytes, f)
typedActor2.requestReply("hello") should equal("world 3")
}
it("should be able to serialize and de-serialize a stateful typed actor with compound state") {
val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorWithDualCounter], 1000)
typedActor1.requestReply("hello") should equal("world 1 1")
typedActor1.requestReply("hello") should equal("world 2 2")
val f = new MyTypedActorWithDualCounterFormat
val bytes = toBinaryJ(typedActor1, f)
val typedActor2: MyTypedActor = fromBinaryJ(bytes, f)
typedActor2.requestReply("hello") should equal("world 3 3")
}
it("should be able to serialize a local yped actor ref to a remote typed actor ref proxy") {
val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000)
typedActor1.requestReply("hello") should equal("world")
typedActor1.requestReply("hello") should equal("world")
val bytes = RemoteTypedActorSerialization.toBinary(typedActor1)
val typedActor2: MyTypedActor = RemoteTypedActorSerialization.fromBinaryToRemoteTypedActorRef(bytes)
typedActor2.requestReply("hello") should equal("world")
}
}
}
trait MyTypedActor {
def requestReply(s: String) : String
def oneWay() : Unit
}
class MyTypedActorImpl extends TypedActor with MyTypedActor {
var count = 0
override def oneWay() {
println("got oneWay message")
}
override def requestReply(message: String) : String = {
count = count + 1
if (message == "hello") {
"world " + count
} else ("hello " + message + " " + count)
}
}
class MyTypedActorWithDualCounter extends TypedActor with MyTypedActor {
var count1 = 0
var count2 = 0
override def oneWay() {
println("got oneWay message")
}
override def requestReply(message: String) : String = {
count1 = count1 + 1
count2 = count2 + 1
if (message == "hello") {
"world " + count1 + " " + count2
} else ("hello " + message + " " + count1 + " " + count2)
}
}
class MyStatelessTypedActorImpl extends TypedActor with MyTypedActor {
override def oneWay() {
println("got oneWay message")
}
override def requestReply(message: String) : String = {
if (message == "hello") "world" else ("hello " + message)
}
}

View file

@ -0,0 +1,46 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor.ticket
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec.RemoteActorSpecActorUnidirectional
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
class Ticket434Spec extends Spec with ShouldMatchers {
describe("A server managed remote actor") {
it("should possible be use a custom service name containing ':'") {
val server = new RemoteServer().start("localhost", 9999)
server.register("my:service", actorOf[RemoteActorSpecActorUnidirectional])
val actor = RemoteClient.actorFor("my:service", 5000L, "localhost", 9999)
actor ! "OneWay"
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
actor.stop
server.shutdown
RemoteClient.shutdownAll
}
}
describe("The ActorInfoProtocol") {
it("should be possible to set the acor id and uuuid") {
val actorInfoBuilder = ActorInfoProtocol.newBuilder
.setUuid("unique-id")
.setId("some-id")
.setTarget("actorClassName")
.setTimeout(5000L)
.setActorType(ActorType.SCALA_ACTOR)
val actorInfo = actorInfoBuilder.build
assert(actorInfo.getUuid === "unique-id")
assert(actorInfo.getId === "some-id")
}
}
}

View file

@ -1,8 +1,5 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.actor.TypedActor;
/**
@ -10,7 +7,7 @@ import se.scalablesolutions.akka.actor.TypedActor;
*/
public class RemoteTypedConsumer1Impl extends TypedActor implements RemoteTypedConsumer1 {
public String foo(@Body String body, @Header("name") String header) {
public String foo(String body, String header) {
return String.format("remote1: body=%s header=%s", body, header);
}
}

View file

@ -0,0 +1,15 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.camel.consume;
/**
* @author Martin Krasser
*/
public interface RemoteTypedConsumer2 {
@consume("jetty:http://localhost:6644/camel/remote-typed-actor-2")
public String foo(@Body String body, @Header("name") String header);
}

View file

@ -1,16 +1,13 @@
package sample.camel;
import org.apache.camel.Body;
import org.apache.camel.Header;
import se.scalablesolutions.akka.camel.consume;
import se.scalablesolutions.akka.actor.TypedActor;
/**
* @author Martin Krasser
*/
public class RemoteTypedConsumer2Impl {
public class RemoteTypedConsumer2Impl extends TypedActor implements RemoteTypedConsumer2 {
@consume("jetty:http://localhost:6644/camel/remote-typed-actor-2")
public String foo(@Body String body, @Header("name") String header) {
public String foo(String body, String header) {
return String.format("remote2: body=%s header=%s", body, header);
}

View file

@ -1,7 +1,7 @@
package sample.camel
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.actor.{TypedActor, Actor, ActorRef}
import se.scalablesolutions.akka.actor.TypedActor
import se.scalablesolutions.akka.camel.Message
import se.scalablesolutions.akka.remote.RemoteClient
@ -10,22 +10,20 @@ import se.scalablesolutions.akka.remote.RemoteClient
*/
object ClientApplication extends Application {
//
// TODO: completion of example
//
val actor1 = actorOf[RemoteActor1]
val actor1 = actorOf[RemoteActor1].start
val actor2 = RemoteClient.actorFor("remote2", "localhost", 7777)
val actobj1 = TypedActor.newRemoteInstance(
classOf[RemoteTypedConsumer1], classOf[RemoteTypedConsumer1Impl], "localhost", 7777)
//val actobj2 = TODO: create reference to server-managed typed actor (RemoteTypedConsumer2Impl)
val typedActor1 = TypedActor.newRemoteInstance(
classOf[RemoteTypedConsumer1],
classOf[RemoteTypedConsumer1Impl], "localhost", 7777)
actor1.start
val typedActor2 = RemoteClient.typedActorFor(
classOf[RemoteTypedConsumer2], "remote3", "localhost", 7777)
println(actor1 !! Message("actor1")) // activates and publishes actor remotely
println(actor2 !! Message("actor2")) // actor already activated and published remotely
println(actobj1.foo("x", "y")) // activates and publishes typed actor methods remotely
// ...
println(typedActor1.foo("x1", "y1")) // activates and publishes typed actor methods remotely
println(typedActor2.foo("x2", "y2")) // typed actor methods already activated and published remotely
}

View file

@ -3,6 +3,7 @@ package sample.camel
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.remote.RemoteNode
import se.scalablesolutions.akka.camel.CamelServiceManager
import se.scalablesolutions.akka.actor.TypedActor
/**
* @author Martin Krasser
@ -10,11 +11,14 @@ import se.scalablesolutions.akka.camel.CamelServiceManager
object ServerApplication extends Application {
import CamelServiceManager._
//
// TODO: completion of example
//
startCamelService
val ua = actorOf[RemoteActor2].start
val ta = TypedActor.newInstance(
classOf[RemoteTypedConsumer2],
classOf[RemoteTypedConsumer2Impl], 2000)
RemoteNode.start("localhost", 7777)
RemoteNode.register("remote2", actorOf[RemoteActor2].start)
RemoteNode.register("remote2", ua)
RemoteNode.registerTypedActor("remote3", ta)
}

View file

@ -390,11 +390,22 @@ object TypedActor extends Logging {
if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get
if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef)
if (config._host.isDefined) actorRef.makeRemote(config._host.get)
actorRef.timeout = config.timeout
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, config.timeout))
actorRef.start
proxy.asInstanceOf[T]
}
private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = {
if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor")
val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor]
val proxy = Proxy.newInstance(Array(intfClass), Array(typedActor), true, false)
typedActor.initialize(proxy)
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.remoteAddress, actorRef.timeout))
actorRef.start
proxy.asInstanceOf[T]
}
private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_],
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val actorRef = actorOf(newTypedActor(targetClass))

View file

@ -87,7 +87,7 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
SamplePojoImpl.reset
val pojo = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
val supervisor = TypedActor.newInstance(classOf[SimpleJavaPojo], classOf[SimpleJavaPojoImpl])
link(supervisor, pojo, new OneForOneStrategy(3, 2000), Array(classOf[Throwable]))
link(supervisor, pojo, OneForOneStrategy(3, 2000), Array(classOf[Throwable]))
pojo.throwException
Thread.sleep(500)
SimpleJavaPojoImpl._pre should be(true)

View file

@ -25,6 +25,7 @@ akka {
# - TypedActor: methods with non-void return type
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
throughput-deadline-ms = -1 # Default throughput deadline for all ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline
default-dispatcher {
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
@ -44,6 +45,7 @@ akka {
allow-core-timeout = on # Allow core threads to time out
rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard
throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
throughput-deadline-ms = -1 # Throughput deadline for ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline
aggregate = off # Aggregate on/off for HawtDispatchers
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property
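
As an illustration, a dispatcher section tuning both knobs might read as follows (hypothetical values; the keys are the ones introduced above):

    default-dispatcher {
      throughput = 20                # batch up to 20 messages per actor before rescheduling
      throughput-deadline-ms = 100   # but hand the thread back after at most 100 ms
    }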

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.scala-tools</groupId>
<artifactId>time</artifactId>
<version>2.8.0-0.2-SNAPSHOT</version>
<packaging>jar</packaging>
</project>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>sjson.json</groupId>
<artifactId>sjson</artifactId>
<version>0.5-SNAPSHOT-2.8.Beta1</version>
<packaging>jar</packaging>
</project>

View file

@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>sjson.json</groupId>
<artifactId>sjson</artifactId>
<version>0.5-SNAPSHOT-2.8.RC2</version>
<description>POM was created from install:install-file</description>
</project>

View file

@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>sjson.json</groupId>
<artifactId>sjson</artifactId>
<version>0.6-SNAPSHOT-2.8.RC3</version>
<description>POM was created from install:install-file</description>
</project>

View file

@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>sjson.json</groupId>
<artifactId>sjson</artifactId>
<version>0.7-2.8.0</version>
<description>POM was created from install:install-file</description>
</project>

View file

@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>sjson.json</groupId>
<artifactId>sjson</artifactId>
<version>0.7-SNAPSHOT-2.8.0</version>
<description>POM was created from install:install-file</description>
</project>

View file

@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>sjson.json</groupId>
<artifactId>sjson</artifactId>
<version>0.7-SNAPSHOT-2.8.RC7</version>
<description>POM was created from install:install-file</description>
</project>

View file

@ -3,6 +3,6 @@
<modelVersion>4.0.0</modelVersion>
<groupId>sjson.json</groupId>
<artifactId>sjson</artifactId>
<version>0.4</version>
<version>0.8-2.8.0</version>
<packaging>jar</packaging>
</project>

View file

@ -1,4 +1,4 @@
/*---------------------------------------------------------------------------\
/*---------------------------------------------------------------------------\
| Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se> |
\---------------------------------------------------------------------------*/
@ -41,6 +41,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
object Repositories {
lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
lazy val CasbahRepo = MavenRepository("Casbah Repo", "http://repo.bumnetworks.com/releases")
lazy val CasbahSnapshotRepo = MavenRepository("Casbah Snapshots", "http://repo.bumnetworks.com/snapshots")
lazy val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org")
lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString)
lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots")
@ -49,7 +51,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
lazy val CasbahRepoReleases = MavenRepository("Casbah Release Repo", "http://repo.bumnetworks.com/releases")
lazy val ClojarsRepo = MavenRepository("Clojars Repo", "http://clojars.org/repo")
lazy val OracleRepo = MavenRepository("Oracle Repo", "http://download.oracle.com/maven")
}
@ -78,7 +79,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository)
lazy val atomikosModuleConfig = ModuleConfiguration("com.atomikos",sbt.DefaultMavenRepository)
lazy val casbahRelease = ModuleConfiguration("com.novus",CasbahRepoReleases)
lazy val casbahModuleConfig = ModuleConfiguration("com.novus", CasbahRepo)
lazy val timeModuleConfig = ModuleConfiguration("org.scala-tools", "time", CasbahSnapshotRepo)
lazy val voldemortModuleConfig = ModuleConfiguration("voldemort", ClojarsRepo)
lazy val sleepycatModuleConfig = ModuleConfiguration("com.sleepycat", OracleRepo)
lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast!
@ -174,8 +176,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val casbah = "com.novus" % "casbah_2.8.0" % "1.0.8.5" % "compile"
lazy val time = "org.scala-tools" % "time" % "2.8.0-SNAPSHOT-0.2-SNAPSHOT" % "compile"
lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" intransitive
lazy val netty = "org.jboss.netty" % "netty" % "3.2.2.Final" % "compile"
@ -190,7 +190,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile"
lazy val sjson = "sjson.json" % "sjson" % "0.8-SNAPSHOT-2.8.0" % "compile"
lazy val sjson = "sjson.json" % "sjson" % "0.8-2.8.0" % "compile"
lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION % "compile"