Merge with master

This commit is contained in:
Viktor Klang 2010-09-21 16:00:47 +02:00
commit a4b3ead976
22 changed files with 1716 additions and 269 deletions

View file

@ -28,6 +28,17 @@ import java.lang.reflect.Field
import scala.reflect.BeanProperty
object ActorRefStatus {
/** LifeCycles for ActorRefs
*/
private[akka] sealed trait StatusType
object UNSTARTED extends StatusType
object RUNNING extends StatusType
object BEING_RESTARTED extends StatusType
object SHUTDOWN extends StatusType
}
/**
* ActorRef is an immutable and serializable handle to an Actor.
* <p/>
@ -68,9 +79,7 @@ trait ActorRef extends
// Only mutable for RemoteServer in order to maintain identity across nodes
@volatile protected[akka] var _uuid = newUuid
@volatile protected[this] var _isRunning = false
@volatile protected[this] var _isShutDown = false
@volatile protected[akka] var _isBeingRestarted = false
@volatile protected[this] var _status: ActorRefStatus.StatusType = ActorRefStatus.UNSTARTED
@volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT)
@volatile protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
@ -229,17 +238,25 @@ trait ActorRef extends
/**
* Is the actor being restarted?
*/
def isBeingRestarted: Boolean = _isBeingRestarted
def isBeingRestarted: Boolean = _status == ActorRefStatus.BEING_RESTARTED
/**
* Is the actor running?
*/
def isRunning: Boolean = _isRunning
def isRunning: Boolean = _status match {
case ActorRefStatus.BEING_RESTARTED | ActorRefStatus.RUNNING => true
case _ => false
}
/**
* Is the actor shut down?
*/
def isShutdown: Boolean = _isShutDown
def isShutdown: Boolean = _status == ActorRefStatus.SHUTDOWN
/**
* Is the actor ever started?
*/
def isUnstarted: Boolean = _status == ActorRefStatus.UNSTARTED
/**
* Is the actor able to handle the message passed in as arguments?
@ -800,7 +817,7 @@ class LocalActorRef private[akka](
if (isTransactor) {
_transactionFactory = Some(TransactionFactory(_transactionConfig, id))
}
_isRunning = true
_status = ActorRefStatus.RUNNING
if (!isInInitialization) initializeActorInstance
else runActorInitialization = true
}
@ -815,8 +832,7 @@ class LocalActorRef private[akka](
cancelReceiveTimeout
dispatcher.unregister(this)
_transactionFactory = None
_isRunning = false
_isShutDown = true
_status = ActorRefStatus.SHUTDOWN
actor.postStop
ActorRegistry.unregister(this)
if (isRemotingEnabled) {
@ -1000,7 +1016,7 @@ class LocalActorRef private[akka](
}
/**
* Callback for the dispatcher. This is the ingle entry point to the user Actor implementation.
* Callback for the dispatcher. This is the single entry point to the user Actor implementation.
*/
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard {
if (isShutdown)
@ -1067,7 +1083,7 @@ class LocalActorRef private[akka](
stop
} else {
_isBeingRestarted = true
_status = ActorRefStatus.BEING_RESTARTED
val failedActor = actorInstance.get
guard.withGuard {
lifeCycle match {
@ -1077,10 +1093,12 @@ class LocalActorRef private[akka](
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
Actor.log.debug("Restarting linked actors for actor [%s].", id)
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason)
else restartActor(failedActor, reason)
_isBeingRestarted = false
else restartActor(failedActor, reason)
_status = ActorRefStatus.RUNNING
}
}
}
@ -1236,7 +1254,7 @@ class LocalActorRef private[akka](
private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = {
Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
_isBeingRestarted = true
_status = ActorRefStatus.BEING_RESTARTED
// abort transaction set
if (isTransactionSetInScope) {
val txSet = getTransactionSetInScope
@ -1376,13 +1394,12 @@ private[akka] case class RemoteActorRef private[akka] (
}
def start: ActorRef = {
_isRunning = true
_status = ActorRefStatus.RUNNING
this
}
def stop: Unit = {
_isRunning = false
_isShutDown = true
_status = ActorRefStatus.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
}

View file

@ -150,4 +150,46 @@ object DataFlow {
def shutdown = in ! Exit
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class DataFlowStream[T <: Any] extends Seq[T] {
private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]]
def <<<(ref: DataFlowVariable[T]) = queue.offer(ref)
def <<<(value: T) = {
val ref = new DataFlowVariable[T]
ref << value
queue.offer(ref)
}
def apply(): T = {
val ref = queue.take
val result = ref()
ref.shutdown
result
}
def take: DataFlowVariable[T] = queue.take
//==== For Seq ====
def length: Int = queue.size
def apply(i: Int): T = {
if (i == 0) apply()
else throw new UnsupportedOperationException(
"Access by index other than '0' is not supported by DataFlowStream")
}
def iterator: Iterator[T] = new Iterator[T] {
private val iter = queue.iterator
def hasNext: Boolean = iter.hasNext
def next: T = { val ref = iter.next; ref() }
}
override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]]
}
}

View file

@ -188,4 +188,4 @@ class ExecutorBasedEventDrivenDispatcher(
config(this)
buildThreadPool
}
}
}

View file

@ -11,6 +11,14 @@ import se.scalablesolutions.akka.config.Config.config
import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue}
import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue}
object ThreadBasedDispatcher {
def oneThread(b: ThreadPoolBuilder) {
b setCorePoolSize 1
b setMaxPoolSize 1
b setAllowCoreThreadTimeout true
}
}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
@ -18,16 +26,14 @@ import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, Lin
*/
class ThreadBasedDispatcher(private val actor: ActorRef,
val mailboxConfig: MailboxConfig
) extends MessageDispatcher {
) extends ExecutorBasedEventDrivenDispatcher(
actor.getClass.getName + ":" + actor.uuid,
Dispatchers.THROUGHPUT,
-1,
mailboxConfig,
ThreadBasedDispatcher.oneThread) {
def this(actor: ActorRef, capacity: Int) = this(actor,MailboxConfig(capacity,None,true))
def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java
private val name = actor.getClass.getName + ":" + actor.uuid
private val threadName = "akka:thread-based:dispatcher:" + name
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(blockDequeue = true)
override def register(actorRef: ActorRef) = {
if(actorRef != actor)
@ -36,35 +42,5 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
super.register(actorRef)
}
def mailbox = actor.mailbox.asInstanceOf[Queue[MessageInvocation] with MessageQueue]
def mailboxSize(a: ActorRef) = mailbox.size
def dispatch(invocation: MessageInvocation) = mailbox enqueue invocation
def start = if (!active) {
log.debug("Starting up %s", toString)
active = true
selectorThread = new Thread(threadName) {
override def run = {
while (active) {
try {
actor.invoke(mailbox.dequeue)
} catch { case e: InterruptedException => active = false }
}
}
}
selectorThread.start
}
def isShutdown = !active
def shutdown = if (active) {
log.debug("Shutting down %s", toString)
active = false
selectorThread.interrupt
uuids.clear
}
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
override def toString = "ThreadBasedDispatcher[" + name + "]"
}

View file

@ -69,5 +69,97 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
result.get should equal (sum(0,ints(0,1000)))
List(x,y,z).foreach(_.shutdown)
}
/*it("should be able to join streams") {
import DataFlow._
ActorRegistry.shutdownAll
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
stream <<< n
ints(n + 1, max, stream)
}
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
out <<< s
sum(in() + s, in, out)
}
val producer = new DataFlowStream[Int]
val consumer = new DataFlowStream[Int]
val latch = new CountDownLatch(1)
val result = new AtomicInteger(0)
val t1 = thread { ints(0, 1000, producer) }
val t2 = thread {
Thread.sleep(1000)
result.set(producer.map(x => x * x).foldLeft(0)(_ + _))
latch.countDown
}
latch.await(3,TimeUnit.SECONDS) should equal (true)
result.get should equal (332833500)
}
it("should be able to sum streams recursively") {
import DataFlow._
def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
stream <<< n
ints(n + 1, max, stream)
}
def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
out <<< s
sum(in() + s, in, out)
}
val result = new AtomicLong(0)
val producer = new DataFlowStream[Int]
val consumer = new DataFlowStream[Int]
val latch = new CountDownLatch(1)
@tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = {
val x = stream()
if(result.addAndGet(x) == 166666500)
latch.countDown
recurseSum(stream)
}
thread { ints(0, 1000, producer) }
thread { sum(0, producer, consumer) }
thread { recurseSum(consumer) }
latch.await(15,TimeUnit.SECONDS) should equal (true)
}*/
/* Test not ready for prime time, causes some sort of deadlock */
/* it("should be able to conditionally set variables") {
import DataFlow._
ActorRegistry.shutdownAll
val latch = new CountDownLatch(1)
val x, y, z, v = new DataFlowVariable[Int]
val main = thread {
x << 1
z << Math.max(x(),y())
latch.countDown
}
val setY = thread {
// Thread.sleep(2000)
y << 2
}
val setV = thread {
v << y
}
List(x,y,z,v) foreach (_.shutdown)
latch.await(2,TimeUnit.SECONDS) should equal (true)
}*/
}
}

View file

@ -146,7 +146,7 @@ object HawtDispatcherEchoServer {
read_source.setEventHandler(^{ read })
read_source.setCancelHandler(^{ close })
write_source = createSource(channel, SelectionKey.OP_READ, HawtDispatcher.queue(self));
write_source = createSource(channel, SelectionKey.OP_WRITE, HawtDispatcher.queue(self));
write_source.setEventHandler(^{ write })
write_source.setCancelHandler(^{ close })

View file

@ -19,9 +19,11 @@ import CamelMessageConversion.toExchangeAdapter
import se.scalablesolutions.akka.dispatch.{CompletableFuture, MessageInvocation, MessageDispatcher}
import se.scalablesolutions.akka.stm.TransactionConfig
import se.scalablesolutions.akka.actor.{ScalaActorRef, ActorRegistry, Actor, ActorRef, Uuid, uuidFrom}
import se.scalablesolutions.akka.AkkaException
import scala.reflect.BeanProperty
import se.scalablesolutions.akka.actor._
/**
* Camel component for sending messages to and receiving replies from (untyped) actors.
@ -196,13 +198,12 @@ private[akka] object AsyncCallbackAdapter {
private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef with ScalaActorRef {
def start = {
_isRunning = true
_status = ActorRefStatus.RUNNING
this
}
def stop() = {
_isRunning = false
_isShutDown = true
_status = ActorRefStatus.SHUTDOWN
}
/**

View file

@ -82,7 +82,6 @@ trait Storage {
*/
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
with Transactional with Committable with Abortable with Logging {
protected val shouldClearOnCommit = Ref[Boolean]()
// operations on the Map
trait Op
@ -90,11 +89,12 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
case object PUT extends Op
case object REM extends Op
case object UPD extends Op
case object CLR extends Op
// append only log: records all mutating operations
protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
case class LogEntry(key: K, value: Option[V], op: Op)
case class LogEntry(key: Option[K], value: Option[V], op: Op)
// need to override in subclasses e.g. "sameElements" for Array[Byte]
def equal(k1: K, k2: K): Boolean = k1 == k2
@ -114,7 +114,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
protected def clearDistinctKeys = keysInCurrentTx.clear
protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] =
appendOnlyTxLog filter(e => equal(e.key, key))
appendOnlyTxLog filter(e => e.key.map(equal(_, key)).getOrElse(true))
// need to get current value considering the underlying storage as well as the transaction log
protected def getCurrentValue(key: K): Option[V] = {
@ -128,7 +128,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
} catch { case e: Exception => None }
if (txEntries.isEmpty) underlying
else replay(txEntries, key, underlying)
else txEntries.last match {
case LogEntry(_, _, CLR) => None
case _ => replay(txEntries, key, underlying)
}
}
// replay all tx entries for key k with seed = initial
@ -140,9 +143,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
case Some(v) => Map((key, v))
}
txEntries.foreach {case LogEntry(k, v, o) => o match {
case PUT => m.put(k, v.get)
case REM => m -= k
case UPD => m.update(k, v.get)
case PUT => m.put(k.get, v.get)
case REM => m -= k.get
case UPD => m.update(k.get, v.get)
case CLR => Map.empty[K, V]
}}
m get key
}
@ -151,12 +155,11 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
val storage: MapStorageBackend[K, V]
def commit = {
// if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) storage.removeMapStorageFor(uuid)
appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match {
case PUT => storage.insertMapStorageEntryFor(uuid, k, v.get)
case UPD => storage.insertMapStorageEntryFor(uuid, k, v.get)
case REM => storage.removeMapStorageFor(uuid, k)
case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get)
case REM => storage.removeMapStorageFor(uuid, k.get)
case CLR => storage.removeMapStorageFor(uuid)
}}
appendOnlyTxLog.clear
@ -166,7 +169,6 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
def abort = {
appendOnlyTxLog.clear
clearDistinctKeys
shouldClearOnCommit.swap(false)
}
def -=(key: K) = {
@ -187,7 +189,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def put(key: K, value: V): Option[V] = {
register
val curr = getCurrentValue(key)
appendOnlyTxLog add LogEntry(key, Some(value), PUT)
appendOnlyTxLog add LogEntry(Some(key), Some(value), PUT)
addToListOfKeysInTx(key)
curr
}
@ -195,7 +197,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def update(key: K, value: V) = {
register
val curr = getCurrentValue(key)
appendOnlyTxLog add LogEntry(key, Some(value), UPD)
appendOnlyTxLog add LogEntry(Some(key), Some(value), UPD)
addToListOfKeysInTx(key)
curr
}
@ -203,7 +205,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def remove(key: K) = {
register
val curr = getCurrentValue(key)
appendOnlyTxLog add LogEntry(key, None, REM)
appendOnlyTxLog add LogEntry(Some(key), None, REM)
addToListOfKeysInTx(key)
curr
}
@ -215,9 +217,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
override def clear = {
register
appendOnlyTxLog.clear
appendOnlyTxLog add LogEntry(None, None, CLR)
clearDistinctKeys
shouldClearOnCommit.swap(true)
}
override def contains(key: K): Boolean = try {
@ -225,7 +226,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
case Seq() => // current tx doesn't use this
storage.getMapStorageEntryFor(uuid, key).isDefined // check storage
case txs => // present in log
txs.last.op != REM // last entry cannot be a REM
val lastOp = txs.last.op
lastOp != REM && lastOp != CLR // last entry cannot be a REM
}
} catch { case e: Exception => false }
@ -366,11 +368,6 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable with Abortable {
protected val newElems = TransactionalVector[T]()
protected val updatedElems = TransactionalMap[Int, T]()
protected val removedElems = TransactionalVector[T]()
protected val shouldClearOnCommit = Ref[Boolean]()
// operations on the Vector
trait Op
case object ADD extends Op
@ -400,7 +397,6 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
def abort = {
appendOnlyTxLog.clear
shouldClearOnCommit.swap(false)
}
private def replay: List[T] = {
@ -466,14 +462,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
override def first: T = get(0)
override def last: T = {
if (newElems.length != 0) newElems.last
else {
val len = length
if (len == 0) throw new NoSuchElementException("Vector is empty")
get(len - 1)
}
}
override def last: T = replay.last
def length: Int = replay.length

View file

@ -9,7 +9,6 @@ import se.scalablesolutions.akka.persistence.common._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config.config
import java.util.NoSuchElementException
import com.novus.casbah.mongodb.Imports._
/**

View file

@ -238,7 +238,7 @@ class MongoTicket343Spec extends
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
(proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(0)
proc.stop
}
}

View file

@ -359,7 +359,6 @@ private [akka] object RedisStorageBackend extends
case e: java.lang.NullPointerException =>
throw new StorageException("Could not connect to Redis server")
case e =>
e.printStackTrace
throw new StorageException("Error in Redis: " + e.getMessage)
}
}

View file

@ -32,6 +32,10 @@ case class VUPD(i: Int, v: String)
case class VUPD_AND_ABORT(i: Int, v: String)
case class VGET(i: Int)
case object VSIZE
case object VLAST
case object VFIRST
case class VLAST_AFTER_ADD(vsToAdd: List[String])
case class VFIRST_AFTER_ADD(vsToAdd: List[String])
case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int])
case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int)
@ -175,6 +179,30 @@ object Storage {
fooVector.slice(Some(s), None, c)
}
self.reply(l.map(new String(_)))
case VLAST =>
val l = atomic { fooVector last }
self.reply(l)
case VFIRST =>
val l = atomic { fooVector first }
self.reply(l)
case VLAST_AFTER_ADD(vs) =>
val l =
atomic {
vs.foreach(fooVector + _.getBytes)
fooVector last
}
self.reply(l)
case VFIRST_AFTER_ADD(vs) =>
val l =
atomic {
vs.foreach(fooVector + _.getBytes)
fooVector first
}
self.reply(l)
}
}
}
@ -243,7 +271,7 @@ class RedisTicket343Spec extends
val add = List(("a", "1"), ("b", "2"), ("c", "3"))
(proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(1)
(proc !! MAP_SIZE).getOrElse("Size failed") should equal(0)
proc.stop
}
}
@ -344,7 +372,26 @@ class RedisTicket343Spec extends
(proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish"))
// slice with new elements added in current transaction
(proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a"))
(proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 4)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a", "nilanjan", "ramanendu"))
proc.stop
}
}
describe("Miscellaneous vector ops") {
it("vector slice() should not ignore elements added in current transaction") {
val proc = actorOf[RedisSampleVectorStorage]
proc.start
// add 4 elements in separate transactions
(proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1)
(proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2)
(proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3)
(proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4)
new String((proc !! VLAST).getOrElse("VLAST failed").asInstanceOf[Array[Byte]]) should equal("debasish")
new String((proc !! VFIRST).getOrElse("VFIRST failed").asInstanceOf[Array[Byte]]) should equal("nilanjan")
new String((proc !! VLAST_AFTER_ADD(List("kausik", "tarun"))).getOrElse("VLAST_AFTER_ADD failed").asInstanceOf[Array[Byte]]) should equal("debasish")
new String((proc !! VFIRST_AFTER_ADD(List("kausik", "tarun"))).getOrElse("VFIRST_AFTER_ADD failed").asInstanceOf[Array[Byte]]) should equal("tarun")
proc.stop
}
}

View file

@ -653,6 +653,360 @@ public final class RemoteProtocol {
// @@protoc_insertion_point(class_scope:RemoteActorRefProtocol)
}
public static final class RemoteTypedActorRefProtocol extends
com.google.protobuf.GeneratedMessage {
// Use RemoteTypedActorRefProtocol.newBuilder() to construct.
private RemoteTypedActorRefProtocol() {
initFields();
}
private RemoteTypedActorRefProtocol(boolean noInit) {}
private static final RemoteTypedActorRefProtocol defaultInstance;
public static RemoteTypedActorRefProtocol getDefaultInstance() {
return defaultInstance;
}
public RemoteTypedActorRefProtocol getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_RemoteTypedActorRefProtocol_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_RemoteTypedActorRefProtocol_fieldAccessorTable;
}
// required .RemoteActorRefProtocol actorRef = 1;
public static final int ACTORREF_FIELD_NUMBER = 1;
private boolean hasActorRef;
private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol actorRef_;
public boolean hasActorRef() { return hasActorRef; }
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol getActorRef() { return actorRef_; }
// required string interfaceName = 2;
public static final int INTERFACENAME_FIELD_NUMBER = 2;
private boolean hasInterfaceName;
private java.lang.String interfaceName_ = "";
public boolean hasInterfaceName() { return hasInterfaceName; }
public java.lang.String getInterfaceName() { return interfaceName_; }
private void initFields() {
actorRef_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.getDefaultInstance();
}
public final boolean isInitialized() {
if (!hasActorRef) return false;
if (!hasInterfaceName) return false;
if (!getActorRef().isInitialized()) return false;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (hasActorRef()) {
output.writeMessage(1, getActorRef());
}
if (hasInterfaceName()) {
output.writeString(2, getInterfaceName());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (hasActorRef()) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(1, getActorRef());
}
if (hasInterfaceName()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(2, getInterfaceName());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder> {
private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol result;
// Construct using se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.newBuilder()
private Builder() {}
private static Builder create() {
Builder builder = new Builder();
builder.result = new se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol();
return builder;
}
protected se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol internalGetResult() {
return result;
}
public Builder clear() {
if (result == null) {
throw new IllegalStateException(
"Cannot call clear() after build().");
}
result = new se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol();
return this;
}
public Builder clone() {
return create().mergeFrom(result);
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.getDescriptor();
}
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol getDefaultInstanceForType() {
return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.getDefaultInstance();
}
public boolean isInitialized() {
return result.isInitialized();
}
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol build() {
if (result != null && !isInitialized()) {
throw newUninitializedMessageException(result);
}
return buildPartial();
}
private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
if (!isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return buildPartial();
}
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol buildPartial() {
if (result == null) {
throw new IllegalStateException(
"build() has already been called on this Builder.");
}
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol returnMe = result;
result = null;
return returnMe;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol) {
return mergeFrom((se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol other) {
if (other == se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.getDefaultInstance()) return this;
if (other.hasActorRef()) {
mergeActorRef(other.getActorRef());
}
if (other.hasInterfaceName()) {
setInterfaceName(other.getInterfaceName());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
return this;
}
break;
}
case 10: {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.newBuilder();
if (hasActorRef()) {
subBuilder.mergeFrom(getActorRef());
}
input.readMessage(subBuilder, extensionRegistry);
setActorRef(subBuilder.buildPartial());
break;
}
case 18: {
setInterfaceName(input.readString());
break;
}
}
}
}
// required .RemoteActorRefProtocol actorRef = 1;
public boolean hasActorRef() {
return result.hasActorRef();
}
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol getActorRef() {
return result.getActorRef();
}
public Builder setActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol value) {
if (value == null) {
throw new NullPointerException();
}
result.hasActorRef = true;
result.actorRef_ = value;
return this;
}
public Builder setActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder builderForValue) {
result.hasActorRef = true;
result.actorRef_ = builderForValue.build();
return this;
}
public Builder mergeActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol value) {
if (result.hasActorRef() &&
result.actorRef_ != se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.getDefaultInstance()) {
result.actorRef_ =
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.newBuilder(result.actorRef_).mergeFrom(value).buildPartial();
} else {
result.actorRef_ = value;
}
result.hasActorRef = true;
return this;
}
public Builder clearActorRef() {
result.hasActorRef = false;
result.actorRef_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.getDefaultInstance();
return this;
}
// required string interfaceName = 2;
public boolean hasInterfaceName() {
return result.hasInterfaceName();
}
public java.lang.String getInterfaceName() {
return result.getInterfaceName();
}
public Builder setInterfaceName(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
result.hasInterfaceName = true;
result.interfaceName_ = value;
return this;
}
public Builder clearInterfaceName() {
result.hasInterfaceName = false;
result.interfaceName_ = getDefaultInstance().getInterfaceName();
return this;
}
// @@protoc_insertion_point(builder_scope:RemoteTypedActorRefProtocol)
}
static {
defaultInstance = new RemoteTypedActorRefProtocol(true);
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internalForceInit();
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:RemoteTypedActorRefProtocol)
}
public static final class SerializedActorRefProtocol extends
com.google.protobuf.GeneratedMessage {
// Use SerializedActorRefProtocol.newBuilder() to construct.
@ -1582,6 +1936,360 @@ public final class RemoteProtocol {
// @@protoc_insertion_point(class_scope:SerializedActorRefProtocol)
}
public static final class SerializedTypedActorRefProtocol extends
com.google.protobuf.GeneratedMessage {
// Use SerializedTypedActorRefProtocol.newBuilder() to construct.
private SerializedTypedActorRefProtocol() {
initFields();
}
private SerializedTypedActorRefProtocol(boolean noInit) {}
private static final SerializedTypedActorRefProtocol defaultInstance;
public static SerializedTypedActorRefProtocol getDefaultInstance() {
return defaultInstance;
}
public SerializedTypedActorRefProtocol getDefaultInstanceForType() {
return defaultInstance;
}
public static final com.google.protobuf.Descriptors.Descriptor
getDescriptor() {
return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_SerializedTypedActorRefProtocol_descriptor;
}
protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_SerializedTypedActorRefProtocol_fieldAccessorTable;
}
// required .SerializedActorRefProtocol actorRef = 1;
public static final int ACTORREF_FIELD_NUMBER = 1;
private boolean hasActorRef;
private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol actorRef_;
public boolean hasActorRef() { return hasActorRef; }
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol getActorRef() { return actorRef_; }
// required string interfaceName = 2;
public static final int INTERFACENAME_FIELD_NUMBER = 2;
private boolean hasInterfaceName;
private java.lang.String interfaceName_ = "";
public boolean hasInterfaceName() { return hasInterfaceName; }
public java.lang.String getInterfaceName() { return interfaceName_; }
private void initFields() {
actorRef_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.getDefaultInstance();
}
public final boolean isInitialized() {
if (!hasActorRef) return false;
if (!hasInterfaceName) return false;
if (!getActorRef().isInitialized()) return false;
return true;
}
public void writeTo(com.google.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (hasActorRef()) {
output.writeMessage(1, getActorRef());
}
if (hasInterfaceName()) {
output.writeString(2, getInterfaceName());
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (hasActorRef()) {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(1, getActorRef());
}
if (hasInterfaceName()) {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(2, getInterfaceName());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom(
com.google.protobuf.ByteString data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom(
com.google.protobuf.ByteString data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom(byte[] data)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data).buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom(
byte[] data,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws com.google.protobuf.InvalidProtocolBufferException {
return newBuilder().mergeFrom(data, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom(java.io.InputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input)) {
return builder.buildParsed();
} else {
return null;
}
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseDelimitedFrom(
java.io.InputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
Builder builder = newBuilder();
if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
return builder.buildParsed();
} else {
return null;
}
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom(
com.google.protobuf.CodedInputStream input)
throws java.io.IOException {
return newBuilder().mergeFrom(input).buildParsed();
}
public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return newBuilder().mergeFrom(input, extensionRegistry)
.buildParsed();
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
public static final class Builder extends
com.google.protobuf.GeneratedMessage.Builder<Builder> {
private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol result;
// Construct using se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.newBuilder()
private Builder() {}
private static Builder create() {
Builder builder = new Builder();
builder.result = new se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol();
return builder;
}
protected se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol internalGetResult() {
return result;
}
public Builder clear() {
if (result == null) {
throw new IllegalStateException(
"Cannot call clear() after build().");
}
result = new se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol();
return this;
}
public Builder clone() {
return create().mergeFrom(result);
}
public com.google.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.getDescriptor();
}
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol getDefaultInstanceForType() {
return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.getDefaultInstance();
}
public boolean isInitialized() {
return result.isInitialized();
}
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol build() {
if (result != null && !isInitialized()) {
throw newUninitializedMessageException(result);
}
return buildPartial();
}
private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol buildParsed()
throws com.google.protobuf.InvalidProtocolBufferException {
if (!isInitialized()) {
throw newUninitializedMessageException(
result).asInvalidProtocolBufferException();
}
return buildPartial();
}
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol buildPartial() {
if (result == null) {
throw new IllegalStateException(
"build() has already been called on this Builder.");
}
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol returnMe = result;
result = null;
return returnMe;
}
public Builder mergeFrom(com.google.protobuf.Message other) {
if (other instanceof se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol) {
return mergeFrom((se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol other) {
if (other == se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.getDefaultInstance()) return this;
if (other.hasActorRef()) {
mergeActorRef(other.getActorRef());
}
if (other.hasInterfaceName()) {
setInterfaceName(other.getInterfaceName());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public Builder mergeFrom(
com.google.protobuf.CodedInputStream input,
com.google.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
com.google.protobuf.UnknownFieldSet.Builder unknownFields =
com.google.protobuf.UnknownFieldSet.newBuilder(
this.getUnknownFields());
while (true) {
int tag = input.readTag();
switch (tag) {
case 0:
this.setUnknownFields(unknownFields.build());
return this;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
this.setUnknownFields(unknownFields.build());
return this;
}
break;
}
case 10: {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.newBuilder();
if (hasActorRef()) {
subBuilder.mergeFrom(getActorRef());
}
input.readMessage(subBuilder, extensionRegistry);
setActorRef(subBuilder.buildPartial());
break;
}
case 18: {
setInterfaceName(input.readString());
break;
}
}
}
}
// required .SerializedActorRefProtocol actorRef = 1;
public boolean hasActorRef() {
return result.hasActorRef();
}
public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol getActorRef() {
return result.getActorRef();
}
public Builder setActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol value) {
if (value == null) {
throw new NullPointerException();
}
result.hasActorRef = true;
result.actorRef_ = value;
return this;
}
public Builder setActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder builderForValue) {
result.hasActorRef = true;
result.actorRef_ = builderForValue.build();
return this;
}
public Builder mergeActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol value) {
if (result.hasActorRef() &&
result.actorRef_ != se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.getDefaultInstance()) {
result.actorRef_ =
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.newBuilder(result.actorRef_).mergeFrom(value).buildPartial();
} else {
result.actorRef_ = value;
}
result.hasActorRef = true;
return this;
}
public Builder clearActorRef() {
result.hasActorRef = false;
result.actorRef_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.getDefaultInstance();
return this;
}
// required string interfaceName = 2;
public boolean hasInterfaceName() {
return result.hasInterfaceName();
}
public java.lang.String getInterfaceName() {
return result.getInterfaceName();
}
public Builder setInterfaceName(java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
result.hasInterfaceName = true;
result.interfaceName_ = value;
return this;
}
public Builder clearInterfaceName() {
result.hasInterfaceName = false;
result.interfaceName_ = getDefaultInstance().getInterfaceName();
return this;
}
// @@protoc_insertion_point(builder_scope:SerializedTypedActorRefProtocol)
}
static {
defaultInstance = new SerializedTypedActorRefProtocol(true);
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internalForceInit();
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:SerializedTypedActorRefProtocol)
}
public static final class MessageProtocol extends
com.google.protobuf.GeneratedMessage {
// Use MessageProtocol.newBuilder() to construct.
@ -5848,11 +6556,21 @@ public final class RemoteProtocol {
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_RemoteActorRefProtocol_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_RemoteTypedActorRefProtocol_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_RemoteTypedActorRefProtocol_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_SerializedActorRefProtocol_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_SerializedActorRefProtocol_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_SerializedTypedActorRefProtocol_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_SerializedTypedActorRefProtocol_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
internal_static_MessageProtocol_descriptor;
private static
@ -5915,52 +6633,57 @@ public final class RemoteProtocol {
"\n\024RemoteProtocol.proto\"\204\001\n\026RemoteActorRe" +
"fProtocol\022\032\n\022classOrServiceName\030\001 \002(\t\022\026\n" +
"\016actorClassname\030\002 \002(\t\022%\n\013homeAddress\030\003 \002" +
"(\0132\020.AddressProtocol\022\017\n\007timeout\030\004 \001(\004\"\217\003" +
"\n\032SerializedActorRefProtocol\022\033\n\004uuid\030\001 \002" +
"(\0132\r.UuidProtocol\022\n\n\002id\030\002 \002(\t\022\026\n\016actorCl" +
"assname\030\003 \002(\t\022)\n\017originalAddress\030\004 \002(\0132\020" +
".AddressProtocol\022\025\n\ractorInstance\030\005 \001(\014\022" +
"\033\n\023serializerClassname\030\006 \001(\t\022\024\n\014isTransa" +
"ctor\030\007 \001(\010\022\017\n\007timeout\030\010 \001(\004\022\026\n\016receiveTi",
"meout\030\t \001(\004\022%\n\tlifeCycle\030\n \001(\0132\022.LifeCyc" +
"leProtocol\022+\n\nsupervisor\030\013 \001(\0132\027.RemoteA" +
"ctorRefProtocol\022\024\n\014hotswapStack\030\014 \001(\014\022(\n" +
"\010messages\030\r \003(\0132\026.RemoteRequestProtocol\"" +
"r\n\017MessageProtocol\0225\n\023serializationSchem" +
"e\030\001 \002(\0162\030.SerializationSchemeType\022\017\n\007mes" +
"sage\030\002 \002(\014\022\027\n\017messageManifest\030\003 \001(\014\"\255\001\n\021" +
"ActorInfoProtocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidPr" +
"otocol\022\016\n\006target\030\002 \002(\t\022\017\n\007timeout\030\003 \002(\004\022" +
"\035\n\tactorType\030\004 \002(\0162\n.ActorType\022/\n\016typedA",
"ctorInfo\030\005 \001(\0132\027.TypedActorInfoProtocol\022" +
"\n\n\002id\030\006 \001(\t\";\n\026TypedActorInfoProtocol\022\021\n" +
"\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\212\002\n\025Rem" +
"oteRequestProtocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidP" +
"rotocol\022!\n\007message\030\002 \002(\0132\020.MessageProtoc" +
"ol\022%\n\tactorInfo\030\003 \002(\0132\022.ActorInfoProtoco" +
"l\022\020\n\010isOneWay\030\004 \002(\010\022%\n\016supervisorUuid\030\005 " +
"\001(\0132\r.UuidProtocol\022\'\n\006sender\030\006 \001(\0132\027.Rem" +
"oteActorRefProtocol\022(\n\010metadata\030\007 \003(\0132\026." +
"MetadataEntryProtocol\"\364\001\n\023RemoteReplyPro",
"tocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022!\n\007me" +
"ssage\030\002 \001(\0132\020.MessageProtocol\022%\n\texcepti" +
"on\030\003 \001(\0132\022.ExceptionProtocol\022%\n\016supervis" +
"orUuid\030\004 \001(\0132\r.UuidProtocol\022\017\n\007isActor\030\005" +
" \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\022(\n\010metadata\030\007" +
" \003(\0132\026.MetadataEntryProtocol\")\n\014UuidProt" +
"ocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025Metad" +
"ataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002" +
" \002(\014\"6\n\021LifeCycleProtocol\022!\n\tlifeCycle\030\001" +
" \002(\0162\016.LifeCycleType\"1\n\017AddressProtocol\022",
"\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021Excep" +
"tionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007messa" +
"ge\030\002 \002(\t*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001\022\016" +
"\n\nJAVA_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Seria" +
"lizationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020" +
"\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROT" +
"OBUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001\022\r" +
"\n\tTEMPORARY\020\002B-\n)se.scalablesolutions.ak" +
"ka.remote.protocolH\001"
"(\0132\020.AddressProtocol\022\017\n\007timeout\030\004 \001(\004\"_\n" +
"\033RemoteTypedActorRefProtocol\022)\n\010actorRef" +
"\030\001 \002(\0132\027.RemoteActorRefProtocol\022\025\n\rinter" +
"faceName\030\002 \002(\t\"\217\003\n\032SerializedActorRefPro" +
"tocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022\n\n\002id" +
"\030\002 \002(\t\022\026\n\016actorClassname\030\003 \002(\t\022)\n\017origin" +
"alAddress\030\004 \002(\0132\020.AddressProtocol\022\025\n\ract",
"orInstance\030\005 \001(\014\022\033\n\023serializerClassname\030" +
"\006 \001(\t\022\024\n\014isTransactor\030\007 \001(\010\022\017\n\007timeout\030\010" +
" \001(\004\022\026\n\016receiveTimeout\030\t \001(\004\022%\n\tlifeCycl" +
"e\030\n \001(\0132\022.LifeCycleProtocol\022+\n\nsuperviso" +
"r\030\013 \001(\0132\027.RemoteActorRefProtocol\022\024\n\014hots" +
"wapStack\030\014 \001(\014\022(\n\010messages\030\r \003(\0132\026.Remot" +
"eRequestProtocol\"g\n\037SerializedTypedActor" +
"RefProtocol\022-\n\010actorRef\030\001 \002(\0132\033.Serializ" +
"edActorRefProtocol\022\025\n\rinterfaceName\030\002 \002(" +
"\t\"r\n\017MessageProtocol\0225\n\023serializationSch",
"eme\030\001 \002(\0162\030.SerializationSchemeType\022\017\n\007m" +
"essage\030\002 \002(\014\022\027\n\017messageManifest\030\003 \001(\014\"\255\001" +
"\n\021ActorInfoProtocol\022\033\n\004uuid\030\001 \002(\0132\r.Uuid" +
"Protocol\022\016\n\006target\030\002 \002(\t\022\017\n\007timeout\030\003 \002(" +
"\004\022\035\n\tactorType\030\004 \002(\0162\n.ActorType\022/\n\016type" +
"dActorInfo\030\005 \001(\0132\027.TypedActorInfoProtoco" +
"l\022\n\n\002id\030\006 \001(\t\";\n\026TypedActorInfoProtocol\022" +
"\021\n\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\212\002\n\025R" +
"emoteRequestProtocol\022\033\n\004uuid\030\001 \002(\0132\r.Uui" +
"dProtocol\022!\n\007message\030\002 \002(\0132\020.MessageProt",
"ocol\022%\n\tactorInfo\030\003 \002(\0132\022.ActorInfoProto" +
"col\022\020\n\010isOneWay\030\004 \002(\010\022%\n\016supervisorUuid\030" +
"\005 \001(\0132\r.UuidProtocol\022\'\n\006sender\030\006 \001(\0132\027.R" +
"emoteActorRefProtocol\022(\n\010metadata\030\007 \003(\0132" +
"\026.MetadataEntryProtocol\"\364\001\n\023RemoteReplyP" +
"rotocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022!\n\007" +
"message\030\002 \001(\0132\020.MessageProtocol\022%\n\texcep" +
"tion\030\003 \001(\0132\022.ExceptionProtocol\022%\n\016superv" +
"isorUuid\030\004 \001(\0132\r.UuidProtocol\022\017\n\007isActor" +
"\030\005 \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\022(\n\010metadata",
"\030\007 \003(\0132\026.MetadataEntryProtocol\")\n\014UuidPr" +
"otocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025Met" +
"adataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value" +
"\030\002 \002(\014\"6\n\021LifeCycleProtocol\022!\n\tlifeCycle" +
"\030\001 \002(\0162\016.LifeCycleType\"1\n\017AddressProtoco" +
"l\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021Exc" +
"eptionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007mes" +
"sage\030\002 \002(\t*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001" +
"\022\016\n\nJAVA_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Ser" +
"ializationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINAR",
"Y\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PR" +
"OTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001" +
"\022\r\n\tTEMPORARY\020\002B-\n)se.scalablesolutions." +
"akka.remote.protocolH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -5975,16 +6698,32 @@ public final class RemoteProtocol {
new java.lang.String[] { "ClassOrServiceName", "ActorClassname", "HomeAddress", "Timeout", },
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder.class);
internal_static_SerializedActorRefProtocol_descriptor =
internal_static_RemoteTypedActorRefProtocol_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_RemoteTypedActorRefProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RemoteTypedActorRefProtocol_descriptor,
new java.lang.String[] { "ActorRef", "InterfaceName", },
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.Builder.class);
internal_static_SerializedActorRefProtocol_descriptor =
getDescriptor().getMessageTypes().get(2);
internal_static_SerializedActorRefProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SerializedActorRefProtocol_descriptor,
new java.lang.String[] { "Uuid", "Id", "ActorClassname", "OriginalAddress", "ActorInstance", "SerializerClassname", "IsTransactor", "Timeout", "ReceiveTimeout", "LifeCycle", "Supervisor", "HotswapStack", "Messages", },
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder.class);
internal_static_SerializedTypedActorRefProtocol_descriptor =
getDescriptor().getMessageTypes().get(3);
internal_static_SerializedTypedActorRefProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_SerializedTypedActorRefProtocol_descriptor,
new java.lang.String[] { "ActorRef", "InterfaceName", },
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.Builder.class);
internal_static_MessageProtocol_descriptor =
getDescriptor().getMessageTypes().get(2);
getDescriptor().getMessageTypes().get(4);
internal_static_MessageProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MessageProtocol_descriptor,
@ -5992,7 +6731,7 @@ public final class RemoteProtocol {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MessageProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MessageProtocol.Builder.class);
internal_static_ActorInfoProtocol_descriptor =
getDescriptor().getMessageTypes().get(3);
getDescriptor().getMessageTypes().get(5);
internal_static_ActorInfoProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ActorInfoProtocol_descriptor,
@ -6000,7 +6739,7 @@ public final class RemoteProtocol {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.ActorInfoProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.ActorInfoProtocol.Builder.class);
internal_static_TypedActorInfoProtocol_descriptor =
getDescriptor().getMessageTypes().get(4);
getDescriptor().getMessageTypes().get(6);
internal_static_TypedActorInfoProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_TypedActorInfoProtocol_descriptor,
@ -6008,7 +6747,7 @@ public final class RemoteProtocol {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.TypedActorInfoProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.TypedActorInfoProtocol.Builder.class);
internal_static_RemoteRequestProtocol_descriptor =
getDescriptor().getMessageTypes().get(5);
getDescriptor().getMessageTypes().get(7);
internal_static_RemoteRequestProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RemoteRequestProtocol_descriptor,
@ -6016,7 +6755,7 @@ public final class RemoteProtocol {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.Builder.class);
internal_static_RemoteReplyProtocol_descriptor =
getDescriptor().getMessageTypes().get(6);
getDescriptor().getMessageTypes().get(8);
internal_static_RemoteReplyProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RemoteReplyProtocol_descriptor,
@ -6024,7 +6763,7 @@ public final class RemoteProtocol {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.Builder.class);
internal_static_UuidProtocol_descriptor =
getDescriptor().getMessageTypes().get(7);
getDescriptor().getMessageTypes().get(9);
internal_static_UuidProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_UuidProtocol_descriptor,
@ -6032,7 +6771,7 @@ public final class RemoteProtocol {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.UuidProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.UuidProtocol.Builder.class);
internal_static_MetadataEntryProtocol_descriptor =
getDescriptor().getMessageTypes().get(8);
getDescriptor().getMessageTypes().get(10);
internal_static_MetadataEntryProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_MetadataEntryProtocol_descriptor,
@ -6040,7 +6779,7 @@ public final class RemoteProtocol {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol.Builder.class);
internal_static_LifeCycleProtocol_descriptor =
getDescriptor().getMessageTypes().get(9);
getDescriptor().getMessageTypes().get(11);
internal_static_LifeCycleProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_LifeCycleProtocol_descriptor,
@ -6048,7 +6787,7 @@ public final class RemoteProtocol {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.Builder.class);
internal_static_AddressProtocol_descriptor =
getDescriptor().getMessageTypes().get(10);
getDescriptor().getMessageTypes().get(12);
internal_static_AddressProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_AddressProtocol_descriptor,
@ -6056,7 +6795,7 @@ public final class RemoteProtocol {
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol.class,
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol.Builder.class);
internal_static_ExceptionProtocol_descriptor =
getDescriptor().getMessageTypes().get(11);
getDescriptor().getMessageTypes().get(13);
internal_static_ExceptionProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_ExceptionProtocol_descriptor,

View file

@ -22,6 +22,15 @@ message RemoteActorRefProtocol {
optional uint64 timeout = 4;
}
/**
* Defines a remote ActorRef that "remembers" and uses its original typed Actor instance
* on the original node.
*/
message RemoteTypedActorRefProtocol {
required RemoteActorRefProtocol actorRef = 1;
required string interfaceName = 2;
}
/**
* Defines a fully serialized remote ActorRef (with serialized Actor instance)
* that is about to be instantiated on the remote node. It is fully disconnected
@ -43,6 +52,16 @@ message SerializedActorRefProtocol {
repeated RemoteRequestProtocol messages = 13;
}
/**
* Defines a fully serialized remote ActorRef (with serialized typed actor instance)
* that is about to be instantiated on the remote node. It is fully disconnected
* from its original host.
*/
message SerializedTypedActorRefProtocol {
required SerializedActorRefProtocol actorRef = 1;
required string interfaceName = 2;
}
/**
* Defines a message.
*/

View file

@ -10,8 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.{Map => JMap}
import se.scalablesolutions.akka.actor.{
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage}
import se.scalablesolutions.akka.actor.{Uuid,uuidFrom}
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage,uuidFrom,Uuid}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.util._
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
@ -68,6 +67,7 @@ object RemoteNode extends RemoteServer
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteServer {
val UUID_PREFIX = "uuid:"
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
@ -124,19 +124,21 @@ object RemoteServer {
private class RemoteActorSet {
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
private[RemoteServer] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef]
private[RemoteServer] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef]
}
private val guard = new ReadWriteGuard
private val remoteActorSets = Map[Address, RemoteActorSet]()
private val remoteServers = Map[Address, RemoteServer]()
private[akka] def registerActor(address: InetSocketAddress, uuid: Uuid, actor: ActorRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid.toString, actor)
private[akka] def registerActorByUuid(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actorsByUuid.put(uuid, actor)
}
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: Uuid, typedActor: AnyRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid.toString, typedActor)
private[akka] def registerTypedActorByUuid(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor)
}
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
@ -193,6 +195,7 @@ case class RemoteServerClientDisconnected(@BeanProperty val server: RemoteServer
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServer extends Logging with ListenerManagement {
import RemoteServer._
def name = "RemoteServer@" + hostname + ":" + port
private[akka] var address = RemoteServer.Address(RemoteServer.HOSTNAME,RemoteServer.PORT)
@ -284,10 +287,11 @@ class RemoteServer extends Logging with ListenerManagement {
* @param typedActor typed actor to register
*/
def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
if (!typedActors.contains(id)) {
log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port)
typedActors.put(id, typedActor)
log.debug("Registering server side remote typed actor [%s] with id [%s]", typedActor.getClass.getName, id)
if (id.startsWith(UUID_PREFIX)) {
registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid())
} else {
registerTypedActor(id, typedActor, typedActors())
}
}
@ -302,12 +306,27 @@ class RemoteServer extends Logging with ListenerManagement {
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def register(id: String, actorRef: ActorRef): Unit = synchronized {
log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
if (id.startsWith(UUID_PREFIX)) {
register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid())
} else {
register(id, actorRef, actors())
}
}
private def register(id: String, actorRef: ActorRef, registry: ConcurrentHashMap[String, ActorRef]) {
if (_isRunning) {
val actorMap = actors()
if (!actorMap.contains(id)) {
if (!registry.contains(id)) {
if (!actorRef.isRunning) actorRef.start
log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
actorMap.put(id, actorRef)
registry.put(id, actorRef)
}
}
}
private def registerTypedActor(id: String, typedActor: AnyRef, registry: ConcurrentHashMap[String, AnyRef]) {
if (_isRunning) {
if (!registry.contains(id)) {
registry.put(id, typedActor)
}
}
}
@ -320,7 +339,7 @@ class RemoteServer extends Logging with ListenerManagement {
log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid)
val actorMap = actors()
actorMap remove actorRef.id
if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid
if (actorRef.registeredInRemoteNodeDuringSerialization) actorsByUuid() remove actorRef.uuid
}
}
@ -332,10 +351,15 @@ class RemoteServer extends Logging with ListenerManagement {
def unregister(id: String):Unit = synchronized {
if (_isRunning) {
log.info("Unregistering server side remote actor with id [%s]", id)
val actorMap = actors()
val actorRef = actorMap get id
actorMap remove id
if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid
if (id.startsWith(UUID_PREFIX)) {
actorsByUuid().remove(id.substring(UUID_PREFIX.length))
} else {
val actorRef = actors().get(id)
if (actorRef.registeredInRemoteNodeDuringSerialization) {
actorsByUuid() remove actorRef.uuid
}
actors() remove id
}
}
}
@ -347,8 +371,11 @@ class RemoteServer extends Logging with ListenerManagement {
def unregisterTypedActor(id: String):Unit = synchronized {
if (_isRunning) {
log.info("Unregistering server side remote typed actor with id [%s]", id)
val registeredTypedActors = typedActors()
registeredTypedActors.remove(id)
if (id.startsWith(UUID_PREFIX)) {
typedActorsByUuid().remove(id.substring(UUID_PREFIX.length))
} else {
typedActors().remove(id)
}
}
}
@ -356,8 +383,10 @@ class RemoteServer extends Logging with ListenerManagement {
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] def actors() = RemoteServer.actorsFor(address).actors
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
private[akka] def actors() = RemoteServer.actorsFor(address).actors
private[akka] def actorsByUuid() = RemoteServer.actorsFor(address).actorsByUuid
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
private[akka] def typedActorsByUuid() = RemoteServer.actorsFor(address).typedActorsByUuid
}
object RemoteServerSslContext {
@ -420,6 +449,7 @@ class RemoteServerHandler(
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging {
import RemoteServer._
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
applicationLoader.foreach(MessageSerializer.setClassLoader(_))
@ -478,11 +508,12 @@ class RemoteServerHandler(
private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = {
log.debug("Received RemoteRequestProtocol[\n%s]", request.toString)
val actorType = request.getActorInfo.getActorType
if (actorType == SCALA_ACTOR) dispatchToActor(request, channel)
else if (actorType == JAVA_ACTOR) throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported")
else if (actorType == TYPED_ACTOR) dispatchToTypedActor(request, channel)
else throw new IllegalActorStateException("Unknown ActorType [" + actorType + "]")
request.getActorInfo.getActorType match {
case SCALA_ACTOR => dispatchToActor(request, channel)
case TYPED_ACTOR => dispatchToTypedActor(request, channel)
case JAVA_ACTOR => throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported")
case other => throw new IllegalActorStateException("Unknown ActorType [" + other + "]")
}
}
private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = {
@ -505,10 +536,10 @@ class RemoteServerHandler(
override def onComplete(result: AnyRef) {
log.debug("Returning result from actor invocation [%s]", result)
val replyBuilder = RemoteReplyProtocol.newBuilder
.setUuid(request.getUuid)
.setMessage(MessageSerializer.serialize(result))
.setIsSuccessful(true)
.setIsActor(true)
.setUuid(request.getUuid)
.setMessage(MessageSerializer.serialize(result))
.setIsSuccessful(true)
.setIsActor(true)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
@ -566,32 +597,23 @@ class RemoteServerHandler(
}
}
/**
* Find a registered actor by ID (default) or UUID.
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
*/
private def findActorByIdOrUuid(id: String, uuid: Uuid) : ActorRef = {
val registeredActors = server.actors()
var actorRefOrNull = registeredActors get id
if (actorRefOrNull eq null) {
actorRefOrNull = registeredActors get uuid
}
actorRefOrNull
private def findActorById(id: String) : ActorRef = {
server.actors().get(id)
}
/**
* Find a registered typed actor by ID (default) or UUID.
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
*/
private def findTypedActorByIdOrUUid(id: String, uuid: Uuid) : AnyRef = {
val registeredActors = server.typedActors()
var actorRefOrNull = registeredActors get id
if (actorRefOrNull eq null) {
actorRefOrNull = registeredActors get uuid
}
actorRefOrNull
private def findActorByUuid(uuid: String) : ActorRef = {
server.actorsByUuid().get(uuid)
}
private def findTypedActorById(id: String) : AnyRef = {
server.typedActors().get(id)
}
private def findTypedActorByUuid(uuid: String) : AnyRef = {
server.typedActorsByUuid().get(uuid)
}
/**
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
*
@ -600,21 +622,25 @@ class RemoteServerHandler(
* Does not start the actor.
*/
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
val uuid = uuidFrom(actorInfo.getUuid.getHigh,actorInfo.getUuid.getLow)
val uuid = actorInfo.getUuid
val id = actorInfo.getId
val name = actorInfo.getTarget
val timeout = actorInfo.getTimeout
val actorRefOrNull = findActorByIdOrUuid(id, uuid)
val actorRefOrNull = if (id.startsWith(UUID_PREFIX)) {
findActorByUuid(id.substring(UUID_PREFIX.length))
} else {
findActorById(id)
}
if (actorRefOrNull eq null) {
try {
log.info("Creating a new remote actor [%s:%s]", name, uuid)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name)
val actorRef = Actor.actorOf(clazz.newInstance.asInstanceOf[Actor])
actorRef.uuid = uuid
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
actorRef.id = id
actorRef.timeout = timeout
actorRef.remoteAddress = None
@ -630,10 +656,14 @@ class RemoteServerHandler(
}
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
val uuid = uuidFrom(actorInfo.getUuid.getHigh,actorInfo.getUuid.getLow)
val uuid = actorInfo.getUuid
val id = actorInfo.getId
val typedActorOrNull = findTypedActorByIdOrUUid(id, uuid)
val typedActorOrNull = if (id.startsWith(UUID_PREFIX)) {
findTypedActorByUuid(id.substring(UUID_PREFIX.length))
} else {
findTypedActorById(id)
}
if (typedActorOrNull eq null) {
val typedActorInfo = actorInfo.getTypedActorInfo

View file

@ -4,11 +4,10 @@
package se.scalablesolutions.akka.serialization
import se.scalablesolutions.akka.actor.{Actor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, ActorType}
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import se.scalablesolutions.akka.dispatch.MessageInvocation
import se.scalablesolutions.akka.remote.{RemoteServer, MessageSerializer}
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _}
import ActorTypeProtocol._
@ -16,6 +15,7 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.actor.{uuidFrom,newUuid}
import com.google.protobuf.ByteString
import se.scalablesolutions.akka.actor._
/**
* Type class definition for Actor Serialization
@ -37,13 +37,14 @@ trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T]
* Create a Format object with the client actor as the implementation of the type class
*
* <pre>
* object BinaryFormatMyStatelessActor {
* object BinaryFormatMyStatelessActor {
* implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
* }
* </pre>
*/
trait StatelessActorFormat[T <: Actor] extends Format[T] {
def fromBinary(bytes: Array[Byte], act: T) = act
def toBinary(ac: T) = Array.empty[Byte]
}
@ -54,16 +55,18 @@ trait StatelessActorFormat[T <: Actor] extends Format[T] {
* a serializer object
*
* <pre>
* object BinaryFormatMyJavaSerializableActor {
* implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
* object BinaryFormatMyJavaSerializableActor {
* implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
* val serializer = Serializer.Java
* }
* }
* }
* </pre>
*/
trait SerializerBasedActorFormat[T <: Actor] extends Format[T] {
val serializer: Serializer
def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T]
def toBinary(ac: T) = serializer.toBinary(ac)
}
@ -71,23 +74,22 @@ trait SerializerBasedActorFormat[T <: Actor] extends Format[T] {
* Module for local actor serialization.
*/
object ActorSerialization {
def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef =
fromBinaryToLocalActorRef(bytes, format)
def toBinary[T <: Actor](a: ActorRef)(implicit format: Format[T]): Array[Byte] =
toSerializedActorRefProtocol(a, format).toByteArray
def toBinary[T <: Actor](a: ActorRef, serializeMailBox: Boolean = true)(implicit format: Format[T]): Array[Byte] =
toSerializedActorRefProtocol(a, format, serializeMailBox).toByteArray
// wrapper for implicits to be used by Java
def fromBinaryJ[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef =
fromBinary(bytes)(format)
// wrapper for implicits to be used by Java
def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T]): Array[Byte] =
toBinary(a)(format)
def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] =
toBinary(a, srlMailBox)(format)
private def toSerializedActorRefProtocol[T <: Actor](
actorRef: ActorRef, format: Format[T]): SerializedActorRefProtocol = {
private[akka] def toSerializedActorRefProtocol[T <: Actor](
actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = {
val lifeCycleProtocol: Option[LifeCycleProtocol] = {
def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match {
case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT)
@ -103,9 +105,9 @@ object ActorSerialization {
}
val originalAddress = AddressProtocol.newBuilder
.setHostname(actorRef.homeAddress.getHostName)
.setPort(actorRef.homeAddress.getPort)
.build
.setHostname(actorRef.homeAddress.getHostName)
.setPort(actorRef.homeAddress.getPort)
.build
val builder = SerializedActorRefProtocol.newBuilder
.setUuid(UuidProtocol.newBuilder.setHigh(actorRef.uuid.getTime).setLow(actorRef.uuid.getClockSeqAndNode).build)
@ -115,6 +117,30 @@ object ActorSerialization {
.setIsTransactor(actorRef.isTransactor)
.setTimeout(actorRef.timeout)
if (serializeMailBox == true) {
val messages =
actorRef.mailbox match {
case q: java.util.Queue[MessageInvocation] =>
val l = new scala.collection.mutable.ListBuffer[MessageInvocation]
val it = q.iterator
while (it.hasNext == true) l += it.next
l
}
val requestProtocols =
messages.map(m =>
RemoteActorSerialization.createRemoteRequestProtocolBuilder(
actorRef,
m.message,
false,
actorRef.getSender,
None,
ActorType.ScalaActor).build)
requestProtocols.foreach(rp => builder.addMessages(rp))
}
actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_))
builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T])))
lifeCycleProtocol.foreach(builder.setLifeCycle(_))
@ -127,33 +153,33 @@ object ActorSerialization {
private def fromBinaryToLocalActorRef[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef =
fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None)
private def fromProtobufToLocalActorRef[T <: Actor](
protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = {
private[akka] def fromProtobufToLocalActorRef[T <: Actor](
protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = {
Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol)
val serializer =
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer)
else None
if (format.isInstanceOf[SerializerBasedActorFormat[_]])
Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer)
else None
val lifeCycle =
if (protocol.hasLifeCycle) {
val lifeCycleProtocol = protocol.getLifeCycle
Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent)
else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary)
else throw new IllegalActorStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
} else None
if (protocol.hasLifeCycle) {
val lifeCycleProtocol = protocol.getLifeCycle
Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent)
else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary)
else throw new IllegalActorStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle))
} else None
val supervisor =
if (protocol.hasSupervisor)
Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
else None
if (protocol.hasSupervisor)
Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader))
else None
val hotswap =
if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get
if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get
.fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]]))
.asInstanceOf[PartialFunction[Any, Unit]])
else None
else None
val classLoader = loader.getOrElse(getClass.getClassLoader)
@ -195,9 +221,9 @@ object RemoteActorSerialization {
def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef =
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
/**
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
*/
/**
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
*/
def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
@ -226,7 +252,7 @@ object RemoteActorSerialization {
if (!registeredInRemoteNodeDuringSerialization) {
Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port)
RemoteServer.getOrCreateServer(homeAddress)
RemoteServer.registerActor(homeAddress, uuid, ar)
RemoteServer.registerActorByUuid(homeAddress, uuid.toString, ar)
registeredInRemoteNodeDuringSerialization = true
}
@ -239,13 +265,13 @@ object RemoteActorSerialization {
}
def createRemoteRequestProtocolBuilder(
actorRef: ActorRef,
message: Any,
isOneWay: Boolean,
senderOption: Option[ActorRef],
typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType):
RemoteRequestProtocol.Builder = {
actorRef: ActorRef,
message: Any,
isOneWay: Boolean,
senderOption: Option[ActorRef],
typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType):
RemoteRequestProtocol.Builder = {
import actorRef._
val actorInfoBuilder = ActorInfoProtocol.newBuilder
@ -254,12 +280,13 @@ object RemoteActorSerialization {
.setTarget(actorClassName)
.setTimeout(timeout)
typedActorInfo.foreach { typedActor =>
actorInfoBuilder.setTypedActorInfo(
TypedActorInfoProtocol.newBuilder
.setInterface(typedActor._1)
.setMethod(typedActor._2)
.build)
typedActorInfo.foreach {
typedActor =>
actorInfoBuilder.setTypedActorInfo(
TypedActorInfoProtocol.newBuilder
.setInterface(typedActor._1)
.setMethod(typedActor._2)
.build)
}
actorType match {
@ -280,7 +307,107 @@ object RemoteActorSerialization {
senderOption.foreach { sender =>
RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid.toString, sender)
requestBuilder.setSender(toRemoteActorRefProtocol(sender))
}
requestBuilder
}
}
/**
* Module for local typed actor serialization.
*/
object TypedActorSerialization {
def fromBinary[T <: Actor, U <: AnyRef](bytes: Array[Byte])(implicit format: Format[T]): U =
fromBinaryToLocalTypedActorRef(bytes, format)
def toBinary[T <: Actor](proxy: AnyRef)(implicit format: Format[T]): Array[Byte] = {
toSerializedTypedActorRefProtocol(proxy, format).toByteArray
}
// wrapper for implicits to be used by Java
def fromBinaryJ[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Format[T]): U =
fromBinary(bytes)(format)
// wrapper for implicits to be used by Java
def toBinaryJ[T <: Actor](a: AnyRef, format: Format[T]): Array[Byte] =
toBinary(a)(format)
private def toSerializedTypedActorRefProtocol[T <: Actor](
proxy: AnyRef, format: Format[T]): SerializedTypedActorRefProtocol = {
val init = AspectInitRegistry.initFor(proxy)
if (init == null) throw new IllegalArgumentException("Proxy for typed actor could not be found in AspectInitRegistry.")
SerializedTypedActorRefProtocol.newBuilder
.setActorRef(ActorSerialization.toSerializedActorRefProtocol(init.actorRef, format))
.setInterfaceName(init.interfaceClass.getName)
.build
}
private def fromBinaryToLocalTypedActorRef[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Format[T]): U =
fromProtobufToLocalTypedActorRef(SerializedTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None)
private def fromProtobufToLocalTypedActorRef[T <: Actor, U <: AnyRef](
protocol: SerializedTypedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): U = {
Actor.log.debug("Deserializing SerializedTypedActorRefProtocol to LocalActorRef:\n" + protocol)
val actorRef = ActorSerialization.fromProtobufToLocalActorRef(protocol.getActorRef, format, loader)
val intfClass = toClass(loader, protocol.getInterfaceName)
TypedActor.newInstance(intfClass, actorRef).asInstanceOf[U]
}
private[akka] def toClass[U <: AnyRef](loader: Option[ClassLoader], name: String): Class[U] = {
val classLoader = loader.getOrElse(getClass.getClassLoader)
val clazz = classLoader.loadClass(name)
clazz.asInstanceOf[Class[U]]
}
}
/**
* Module for remote typed actor serialization.
*/
object RemoteTypedActorSerialization {
/**
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
*/
def fromBinaryToRemoteTypedActorRef[T <: AnyRef](bytes: Array[Byte]): T =
fromProtobufToRemoteTypedActorRef(RemoteTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
/**
* Deserializes a byte array (Array[Byte]) into a AW RemoteActorRef proxy.
*/
def fromBinaryToRemoteTypedActorRef[T <: AnyRef](bytes: Array[Byte], loader: ClassLoader): T =
fromProtobufToRemoteTypedActorRef(RemoteTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
/**
* Serialize as AW RemoteActorRef proxy.
*/
def toBinary[T <: Actor](proxy: AnyRef): Array[Byte] = {
toRemoteTypedActorRefProtocol(proxy).toByteArray
}
/**
* Deserializes a RemoteTypedActorRefProtocol Protocol Buffers (protobuf) Message into AW RemoteActorRef proxy.
*/
private[akka] def fromProtobufToRemoteTypedActorRef[T](protocol: RemoteTypedActorRefProtocol, loader: Option[ClassLoader]): T = {
Actor.log.debug("Deserializing RemoteTypedActorRefProtocol to AW RemoteActorRef proxy:\n" + protocol)
val actorRef = RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getActorRef, loader)
val intfClass = TypedActorSerialization.toClass(loader, protocol.getInterfaceName)
TypedActor.createProxyForRemoteActorRef(intfClass, actorRef).asInstanceOf[T]
}
/**
* Serializes the AW TypedActor proxy into a Protocol Buffers (protobuf) Message.
*/
def toRemoteTypedActorRefProtocol(proxy: AnyRef): RemoteTypedActorRefProtocol = {
val init = AspectInitRegistry.initFor(proxy)
RemoteTypedActorRefProtocol.newBuilder
.setActorRef(RemoteActorSerialization.toRemoteActorRefProtocol(init.actorRef))
.setInterfaceName(init.interfaceClass.getName)
.build
}
}

View file

@ -79,7 +79,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
}
}
@Test
def shouldSendWithBang {
val actor = RemoteClient.actorFor(
@ -178,5 +177,41 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
assert(actor2.id == actor3.id)
}
@Test
def shouldFindActorByUuid {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
val actor2 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("uuid:" + actor1.uuid, actor1)
server.register("my-service", actor2)
val ref1 = RemoteClient.actorFor("uuid:" + actor1.uuid, HOSTNAME, PORT)
val ref2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT)
ref1 ! "OneWay"
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
ref1.stop
ref2 ! "OneWay"
ref2.stop
}
@Test
def shouldRegisterAndUnregister {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("my-service-1", actor1)
assert(server.actors().get("my-service-1") != null, "actor registered")
server.unregister("my-service-1")
assert(server.actors().get("my-service-1") == null, "actor unregistered")
}
@Test
def shouldRegisterAndUnregisterByUuid {
val actor1 = actorOf[RemoteActorSpecActorUnidirectional]
server.register("uuid:" + actor1.uuid, actor1)
assert(server.actorsByUuid().get(actor1.uuid.toString) != null, "actor registered")
server.unregister("uuid:" + actor1.uuid)
assert(server.actorsByUuid().get(actor1.uuid) == null, "actor unregistered")
}
}

View file

@ -103,9 +103,34 @@ class ServerInitiatedRemoteTypedActorSpec extends
it("should register and unregister typed actors") {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
server.registerTypedActor("my-test-service", typedActor)
assert(server.typedActors().get("my-test-service") != null)
assert(server.typedActors().get("my-test-service") ne null, "typed actor registered")
server.unregisterTypedActor("my-test-service")
assert(server.typedActors().get("my-test-service") == null)
assert(server.typedActors().get("my-test-service") eq null, "typed actor unregistered")
}
it("should register and unregister typed actors by uuid") {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
val init = AspectInitRegistry.initFor(typedActor)
val uuid = "uuid:" + init.actorRef.uuid
server.registerTypedActor(uuid, typedActor)
assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) ne null, "typed actor registered")
server.unregisterTypedActor(uuid)
assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) eq null, "typed actor unregistered")
}
it("should find typed actors by uuid") {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
val init = AspectInitRegistry.initFor(typedActor)
val uuid = "uuid:" + init.actorRef.uuid
server.registerTypedActor(uuid, typedActor)
assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) ne null, "typed actor registered")
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], uuid, HOSTNAME, PORT)
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
}
}
}

View file

@ -127,9 +127,16 @@ class SerializableTypeClassActorSpec extends
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
actor1.mailboxSize should be > (0)
val actor2 = fromBinary(toBinary(actor1))
Thread.sleep(1000)
actor2.mailboxSize should be > (0)
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
val actor3 = fromBinary(toBinary(actor1, false))
Thread.sleep(1000)
actor3.mailboxSize should equal(0)
(actor3 !! "hello-reply").getOrElse("_") should equal("world")
}
}
}

View file

@ -0,0 +1,126 @@
package se.scalablesolutions.akka.actor.serialization
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.serialization._
import se.scalablesolutions.akka.actor._
import ActorSerialization._
import Actor._
@RunWith(classOf[JUnitRunner])
class Ticket435Spec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
object BinaryFormatMyStatefulActor {
implicit object MyStatefulActorFormat extends Format[MyStatefulActor] {
def fromBinary(bytes: Array[Byte], act: MyStatefulActor) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
act.count = p.getCount
act
}
def toBinary(ac: MyStatefulActor) =
ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray
}
}
object BinaryFormatMyStatelessActorWithMessagesInMailbox {
implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActorWithMessagesInMailbox]
}
describe("Serializable actor") {
it("should be able to serialize and deserialize a stateless actor with messages in mailbox") {
import BinaryFormatMyStatelessActorWithMessagesInMailbox._
val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
actor1.mailboxSize should be > (0)
val actor2 = fromBinary(toBinary(actor1))
Thread.sleep(1000)
actor2.mailboxSize should be > (0)
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
val actor3 = fromBinary(toBinary(actor1, false))
Thread.sleep(1000)
actor3.mailboxSize should equal(0)
(actor3 !! "hello-reply").getOrElse("_") should equal("world")
}
it("should serialize the mailbox optionally") {
import BinaryFormatMyStatelessActorWithMessagesInMailbox._
val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
(actor1 ! "hello")
actor1.mailboxSize should be > (0)
val actor2 = fromBinary(toBinary(actor1, false))
Thread.sleep(1000)
actor2.mailboxSize should equal(0)
(actor2 !! "hello-reply").getOrElse("_") should equal("world")
}
it("should be able to serialize and deserialize a stateful actor with messages in mailbox") {
import BinaryFormatMyStatefulActor._
val actor1 = actorOf[MyStatefulActor].start
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
(actor1 ! "hi")
actor1.mailboxSize should be > (0)
val actor2 = fromBinary(toBinary(actor1))
Thread.sleep(1000)
actor2.mailboxSize should be > (0)
(actor2 !! "hello").getOrElse("_") should equal("world 1")
val actor3 = fromBinary(toBinary(actor1, false))
Thread.sleep(1000)
actor3.mailboxSize should equal(0)
(actor3 !! "hello").getOrElse("_") should equal("world 1")
}
}
}
class MyStatefulActor extends Actor {
var count = 0
def receive = {
case "hi" =>
println("# messages in mailbox " + self.mailboxSize)
Thread.sleep(500)
case "hello" =>
count = count + 1
self.reply("world " + count)
}
}

View file

@ -0,0 +1,166 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor.serialization
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import se.scalablesolutions.akka.serialization._
import se.scalablesolutions.akka.actor._
import TypedActorSerialization._
import Actor._
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
import se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec.RemoteActorSpecActorUnidirectional
@RunWith(classOf[JUnitRunner])
class TypedActorSerializationSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
var server1: RemoteServer = null
var typedActor: MyTypedActor = null
override def beforeAll = {
server1 = new RemoteServer().start("localhost", 9991)
typedActor = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000)
server1.registerTypedActor("typed-actor-service", typedActor)
Thread.sleep(1000)
}
// make sure the servers shutdown cleanly after the test has finished
override def afterAll = {
try {
TypedActor.stop(typedActor)
server1.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
} catch {
case e => ()
}
}
object MyTypedStatelessActorFormat extends StatelessActorFormat[MyStatelessTypedActorImpl]
class MyTypedActorFormat extends Format[MyTypedActorImpl] {
def fromBinary(bytes: Array[Byte], act: MyTypedActorImpl) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter]
act.count = p.getCount
act
}
def toBinary(ac: MyTypedActorImpl) =
ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray
}
class MyTypedActorWithDualCounterFormat extends Format[MyTypedActorWithDualCounter] {
def fromBinary(bytes: Array[Byte], act: MyTypedActorWithDualCounter) = {
val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter]
act.count1 = p.getCount1
act.count2 = p.getCount2
act
}
def toBinary(ac: MyTypedActorWithDualCounter) =
ProtobufProtocol.DualCounter.newBuilder.setCount1(ac.count1).setCount2(ac.count2).build.toByteArray
}
describe("Serializable typed actor") {
it("should be able to serialize and de-serialize a stateless typed actor") {
val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000)
typedActor1.requestReply("hello") should equal("world")
typedActor1.requestReply("hello") should equal("world")
val bytes = toBinaryJ(typedActor1, MyTypedStatelessActorFormat)
val typedActor2: MyTypedActor = fromBinaryJ(bytes, MyTypedStatelessActorFormat)
typedActor2.requestReply("hello") should equal("world")
}
it("should be able to serialize and de-serialize a stateful typed actor") {
val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000)
typedActor1.requestReply("hello") should equal("world 1")
typedActor1.requestReply("scala") should equal("hello scala 2")
val f = new MyTypedActorFormat
val bytes = toBinaryJ(typedActor1, f)
val typedActor2: MyTypedActor = fromBinaryJ(bytes, f)
typedActor2.requestReply("hello") should equal("world 3")
}
it("should be able to serialize and de-serialize a stateful typed actor with compound state") {
val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorWithDualCounter], 1000)
typedActor1.requestReply("hello") should equal("world 1 1")
typedActor1.requestReply("hello") should equal("world 2 2")
val f = new MyTypedActorWithDualCounterFormat
val bytes = toBinaryJ(typedActor1, f)
val typedActor2: MyTypedActor = fromBinaryJ(bytes, f)
typedActor2.requestReply("hello") should equal("world 3 3")
}
it("should be able to serialize a local yped actor ref to a remote typed actor ref proxy") {
val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000)
typedActor1.requestReply("hello") should equal("world")
typedActor1.requestReply("hello") should equal("world")
val bytes = RemoteTypedActorSerialization.toBinary(typedActor1)
val typedActor2: MyTypedActor = RemoteTypedActorSerialization.fromBinaryToRemoteTypedActorRef(bytes)
typedActor1.requestReply("hello") should equal("world")
}
}
}
trait MyTypedActor {
def requestReply(s: String) : String
def oneWay() : Unit
}
class MyTypedActorImpl extends TypedActor with MyTypedActor {
var count = 0
override def oneWay() {
println("got oneWay message")
}
override def requestReply(message: String) : String = {
count = count + 1
if (message == "hello") {
"world " + count
} else ("hello " + message + " " + count)
}
}
class MyTypedActorWithDualCounter extends TypedActor with MyTypedActor {
var count1 = 0
var count2 = 0
override def oneWay() {
println("got oneWay message")
}
override def requestReply(message: String) : String = {
count1 = count1 + 1
count2 = count2 + 1
if (message == "hello") {
"world " + count1 + " " + count2
} else ("hello " + message + " " + count1 + " " + count2)
}
}
class MyStatelessTypedActorImpl extends TypedActor with MyTypedActor {
override def oneWay() {
println("got oneWay message")
}
override def requestReply(message: String) : String = {
if (message == "hello") "world" else ("hello " + message)
}
}

View file

@ -390,11 +390,22 @@ object TypedActor extends Logging {
if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get
if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef)
if (config._host.isDefined) actorRef.makeRemote(config._host.get)
actorRef.timeout = config.timeout
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, config.timeout))
actorRef.start
proxy.asInstanceOf[T]
}
private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = {
if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor")
val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor]
val proxy = Proxy.newInstance(Array(intfClass), Array(typedActor), true, false)
typedActor.initialize(proxy)
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.remoteAddress, actorRef.timeout))
actorRef.start
proxy.asInstanceOf[T]
}
private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_],
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val actorRef = actorOf(newTypedActor(targetClass))