diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 597e6bb1d1..e680f13c58 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -28,6 +28,17 @@ import java.lang.reflect.Field import scala.reflect.BeanProperty + +object ActorRefStatus { + /** Lifecycle states for an ActorRef. + */ + private[akka] sealed trait StatusType + object UNSTARTED extends StatusType + object RUNNING extends StatusType + object BEING_RESTARTED extends StatusType + object SHUTDOWN extends StatusType +} + /** * ActorRef is an immutable and serializable handle to an Actor. *

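A minimal standalone sketch of the idea behind the hunk above: one volatile field of a sealed status type replaces the _isRunning/_isShutDown/_isBeingRestarted flags, and the boolean queries are derived from it. The names here are illustrative only, not the actual ActorRef members:

object StatusSketch {
  sealed trait Status
  case object Unstarted extends Status
  case object Running extends Status
  case object BeingRestarted extends Status
  case object Shutdown extends Status

  class Ref {
    // a single volatile field stands in for the three removed boolean flags
    @volatile private var status: Status = Unstarted

    def start(): Unit = { status = Running }
    def markRestarting(): Unit = { status = BeingRestarted }
    def stop(): Unit = { status = Shutdown }

    // predicates derived from the one field; as in the patch,
    // a ref that is being restarted still counts as running
    def isUnstarted: Boolean = status == Unstarted
    def isBeingRestarted: Boolean = status == BeingRestarted
    def isRunning: Boolean = status == Running || status == BeingRestarted
    def isShutdown: Boolean = status == Shutdown
  }

  def main(args: Array[String]): Unit = {
    val ref = new Ref
    assert(ref.isUnstarted)
    ref.start()
    assert(ref.isRunning && !ref.isShutdown)
    ref.stop()
    assert(ref.isShutdown && !ref.isRunning)
  }
}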
@@ -68,9 +79,7 @@ trait ActorRef extends // Only mutable for RemoteServer in order to maintain identity across nodes @volatile protected[akka] var _uuid = newUuid - @volatile protected[this] var _isRunning = false - @volatile protected[this] var _isShutDown = false - @volatile protected[akka] var _isBeingRestarted = false + @volatile protected[this] var _status: ActorRefStatus.StatusType = ActorRefStatus.UNSTARTED @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT) @volatile protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false @@ -229,17 +238,25 @@ trait ActorRef extends /** * Is the actor being restarted? */ - def isBeingRestarted: Boolean = _isBeingRestarted + def isBeingRestarted: Boolean = _status == ActorRefStatus.BEING_RESTARTED /** * Is the actor running? */ - def isRunning: Boolean = _isRunning + def isRunning: Boolean = _status match { + case ActorRefStatus.BEING_RESTARTED | ActorRefStatus.RUNNING => true + case _ => false + } /** * Is the actor shut down? */ - def isShutdown: Boolean = _isShutDown + def isShutdown: Boolean = _status == ActorRefStatus.SHUTDOWN + + /** + * Has the actor not yet been started? + */ + def isUnstarted: Boolean = _status == ActorRefStatus.UNSTARTED /** * Is the actor able to handle the message passed in as arguments? @@ -800,7 +817,7 @@ class LocalActorRef private[akka]( if (isTransactor) { _transactionFactory = Some(TransactionFactory(_transactionConfig, id)) } - _isRunning = true + _status = ActorRefStatus.RUNNING if (!isInInitialization) initializeActorInstance else runActorInitialization = true } @@ -815,8 +832,7 @@ class LocalActorRef private[akka]( cancelReceiveTimeout dispatcher.unregister(this) _transactionFactory = None - _isRunning = false - _isShutDown = true + _status = ActorRefStatus.SHUTDOWN actor.postStop ActorRegistry.unregister(this) if (isRemotingEnabled) { @@ -1000,7 +1016,7 @@ class LocalActorRef private[akka]( } /** - * Callback for the dispatcher. This is the ingle entry point to the user Actor implementation. + * Callback for the dispatcher. This is the single entry point to the user Actor implementation. 
*/ protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard { if (isShutdown) @@ -1067,7 +1083,7 @@ class LocalActorRef private[akka]( stop } else { - _isBeingRestarted = true + _status = ActorRefStatus.BEING_RESTARTED val failedActor = actorInstance.get guard.withGuard { lifeCycle match { @@ -1077,10 +1093,12 @@ class LocalActorRef private[akka]( Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) Actor.log.debug("Restarting linked actors for actor [%s].", id) restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) + Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason) - else restartActor(failedActor, reason) - _isBeingRestarted = false + else restartActor(failedActor, reason) + + _status = ActorRefStatus.RUNNING } } } @@ -1236,7 +1254,7 @@ class LocalActorRef private[akka]( private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = { Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) - _isBeingRestarted = true + _status = ActorRefStatus.BEING_RESTARTED // abort transaction set if (isTransactionSetInScope) { val txSet = getTransactionSetInScope @@ -1376,13 +1394,12 @@ private[akka] case class RemoteActorRef private[akka] ( } def start: ActorRef = { - _isRunning = true + _status = ActorRefStatus.RUNNING this } def stop: Unit = { - _isRunning = false - _isShutDown = true + _status = ActorRefStatus.SHUTDOWN postMessageToMailbox(RemoteActorSystemMessage.Stop, None) } diff --git a/akka-actor/src/main/scala/dataflow/DataFlowVariable.scala b/akka-actor/src/main/scala/dataflow/DataFlowVariable.scala index 6608f6075b..329682de52 100644 --- a/akka-actor/src/main/scala/dataflow/DataFlowVariable.scala +++ b/akka-actor/src/main/scala/dataflow/DataFlowVariable.scala @@ -150,4 +150,46 @@ object DataFlow { def shutdown = in ! 
Exit } -} \ No newline at end of file + + /** + * @author Jonas Bonér + */ + class DataFlowStream[T <: Any] extends Seq[T] { + private[this] val queue = new LinkedBlockingQueue[DataFlowVariable[T]] + + def <<<(ref: DataFlowVariable[T]) = queue.offer(ref) + + def <<<(value: T) = { + val ref = new DataFlowVariable[T] + ref << value + queue.offer(ref) + } + + def apply(): T = { + val ref = queue.take + val result = ref() + ref.shutdown + result + } + + def take: DataFlowVariable[T] = queue.take + + //==== For Seq ==== + + def length: Int = queue.size + + def apply(i: Int): T = { + if (i == 0) apply() + else throw new UnsupportedOperationException( + "Access by index other than '0' is not supported by DataFlowStream") + } + + def iterator: Iterator[T] = new Iterator[T] { + private val iter = queue.iterator + def hasNext: Boolean = iter.hasNext + def next: T = { val ref = iter.next; ref() } + } + + override def toList: List[T] = queue.toArray.toList.asInstanceOf[List[T]] + } +} diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 63ce310848..19e9cd38e7 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -188,4 +188,4 @@ class ExecutorBasedEventDrivenDispatcher( config(this) buildThreadPool } -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index eda5a86a9e..090be85cee 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -11,6 +11,14 @@ import se.scalablesolutions.akka.config.Config.config import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue} import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue} +object ThreadBasedDispatcher { + def oneThread(b: ThreadPoolBuilder) { + b setCorePoolSize 1 + b setMaxPoolSize 1 + b setAllowCoreThreadTimeout true + } +} + /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. 
* @@ -18,16 +26,14 @@ import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, Lin */ class ThreadBasedDispatcher(private val actor: ActorRef, val mailboxConfig: MailboxConfig - ) extends MessageDispatcher { + ) extends ExecutorBasedEventDrivenDispatcher( + actor.getClass.getName + ":" + actor.uuid, + Dispatchers.THROUGHPUT, + -1, + mailboxConfig, + ThreadBasedDispatcher.oneThread) { def this(actor: ActorRef, capacity: Int) = this(actor,MailboxConfig(capacity,None,true)) def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java - - private val name = actor.getClass.getName + ":" + actor.uuid - private val threadName = "akka:thread-based:dispatcher:" + name - private var selectorThread: Thread = _ - @volatile private var active: Boolean = false - - override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(blockDequeue = true) override def register(actorRef: ActorRef) = { if(actorRef != actor) @@ -36,35 +42,5 @@ class ThreadBasedDispatcher(private val actor: ActorRef, super.register(actorRef) } - def mailbox = actor.mailbox.asInstanceOf[Queue[MessageInvocation] with MessageQueue] - - def mailboxSize(a: ActorRef) = mailbox.size - - def dispatch(invocation: MessageInvocation) = mailbox enqueue invocation - - def start = if (!active) { - log.debug("Starting up %s", toString) - active = true - selectorThread = new Thread(threadName) { - override def run = { - while (active) { - try { - actor.invoke(mailbox.dequeue) - } catch { case e: InterruptedException => active = false } - } - } - } - selectorThread.start - } - - def isShutdown = !active - - def shutdown = if (active) { - log.debug("Shutting down %s", toString) - active = false - selectorThread.interrupt - uuids.clear - } - - override def toString = "ThreadBasedDispatcher[" + threadName + "]" + override def toString = "ThreadBasedDispatcher[" + name + "]" } \ No newline at end of file diff --git a/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala b/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala index 2997715452..d1f663e9f4 100644 --- a/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala +++ b/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala @@ -69,5 +69,97 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { result.get should equal (sum(0,ints(0,1000))) List(x,y,z).foreach(_.shutdown) } + + /*it("should be able to join streams") { + import DataFlow._ + ActorRegistry.shutdownAll + + def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { + stream <<< n + ints(n + 1, max, stream) + } + + def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { + out <<< s + sum(in() + s, in, out) + } + + val producer = new DataFlowStream[Int] + val consumer = new DataFlowStream[Int] + val latch = new CountDownLatch(1) + val result = new AtomicInteger(0) + + val t1 = thread { ints(0, 1000, producer) } + val t2 = thread { + Thread.sleep(1000) + result.set(producer.map(x => x * x).foldLeft(0)(_ + _)) + latch.countDown + } + + latch.await(3,TimeUnit.SECONDS) should equal (true) + result.get should equal (332833500) } + + it("should be able to sum streams recursively") { + import DataFlow._ + + def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { + stream <<< n + ints(n + 1, max, stream) + } + + def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { + out <<< s + sum(in() + s, in, out) + } + + val result = new AtomicLong(0) + + val producer = new 
DataFlowStream[Int] + val consumer = new DataFlowStream[Int] + val latch = new CountDownLatch(1) + + @tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = { + val x = stream() + + if(result.addAndGet(x) == 166666500) + latch.countDown + + recurseSum(stream) + } + + thread { ints(0, 1000, producer) } + thread { sum(0, producer, consumer) } + thread { recurseSum(consumer) } + + latch.await(15,TimeUnit.SECONDS) should equal (true) + }*/ + + /* Test not ready for prime time, causes some sort of deadlock */ + /* it("should be able to conditionally set variables") { + + import DataFlow._ + ActorRegistry.shutdownAll + + val latch = new CountDownLatch(1) + val x, y, z, v = new DataFlowVariable[Int] + + val main = thread { + x << 1 + z << Math.max(x(),y()) + latch.countDown + } + + val setY = thread { + // Thread.sleep(2000) + y << 2 + } + + val setV = thread { + v << y + } + List(x,y,z,v) foreach (_.shutdown) + latch.await(2,TimeUnit.SECONDS) should equal (true) + }*/ + } } \ No newline at end of file diff --git a/akka-actor/src/test/scala/dispatch/HawtDispatcherEchoServer.scala b/akka-actor/src/test/scala/dispatch/HawtDispatcherEchoServer.scala index 8d4c8dedc1..6196a13490 100644 --- a/akka-actor/src/test/scala/dispatch/HawtDispatcherEchoServer.scala +++ b/akka-actor/src/test/scala/dispatch/HawtDispatcherEchoServer.scala @@ -146,7 +146,7 @@ object HawtDispatcherEchoServer { read_source.setEventHandler(^{ read }) read_source.setCancelHandler(^{ close }) - write_source = createSource(channel, SelectionKey.OP_READ, HawtDispatcher.queue(self)); + write_source = createSource(channel, SelectionKey.OP_WRITE, HawtDispatcher.queue(self)); write_source.setEventHandler(^{ write }) write_source.setCancelHandler(^{ close }) diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index e0a70e255c..a9c96eebb9 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -19,9 +19,11 @@ import CamelMessageConversion.toExchangeAdapter import se.scalablesolutions.akka.dispatch.{CompletableFuture, MessageInvocation, MessageDispatcher} import se.scalablesolutions.akka.stm.TransactionConfig import se.scalablesolutions.akka.actor.{ScalaActorRef, ActorRegistry, Actor, ActorRef, Uuid, uuidFrom} + import se.scalablesolutions.akka.AkkaException import scala.reflect.BeanProperty +import se.scalablesolutions.akka.actor._ /** * Camel component for sending messages to and receiving replies from (untyped) actors. 
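For reference, a small usage sketch of the DataFlowStream added to DataFlowVariable.scala above, in the style of the new spec: one thread appends values with <<<, another blocks on apply() and reads them in order. It assumes the DataFlow._ helpers (thread, DataFlowStream) from akka-actor's dataflow package; the object name and import path are illustrative.

import se.scalablesolutions.akka.dataflow.DataFlow._

object DataFlowStreamSketch {
  def main(args: Array[String]): Unit = {
    val stream = new DataFlowStream[Int]

    // producer: <<< wraps each value in a fresh DataFlowVariable and enqueues it
    thread { (1 to 5).foreach(n => stream <<< n) }

    // consumer: apply() takes the head variable, blocks until it is bound,
    // shuts it down and returns the value
    thread {
      val sum = (1 to 5).map(_ => stream()).sum
      println("sum = " + sum) // expected: 15
    }
  }
}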
@@ -196,13 +198,12 @@ private[akka] object AsyncCallbackAdapter { private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef with ScalaActorRef { def start = { - _isRunning = true + _status = ActorRefStatus.RUNNING this } def stop() = { - _isRunning = false - _isShutDown = true + _status = ActorRefStatus.SHUTDOWN } /** diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index 4d9ff48a60..088c0b8ff4 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -82,7 +82,6 @@ trait Storage { */ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] with Transactional with Committable with Abortable with Logging { - protected val shouldClearOnCommit = Ref[Boolean]() // operations on the Map trait Op @@ -90,11 +89,12 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] case object PUT extends Op case object REM extends Op case object UPD extends Op + case object CLR extends Op // append only log: records all mutating operations protected val appendOnlyTxLog = TransactionalVector[LogEntry]() - case class LogEntry(key: K, value: Option[V], op: Op) + case class LogEntry(key: Option[K], value: Option[V], op: Op) // need to override in subclasses e.g. "sameElements" for Array[Byte] def equal(k1: K, k2: K): Boolean = k1 == k2 @@ -114,7 +114,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] protected def clearDistinctKeys = keysInCurrentTx.clear protected def filterTxLogByKey(key: K): IndexedSeq[LogEntry] = - appendOnlyTxLog filter(e => equal(e.key, key)) + appendOnlyTxLog filter(e => e.key.map(equal(_, key)).getOrElse(true)) // need to get current value considering the underlying storage as well as the transaction log protected def getCurrentValue(key: K): Option[V] = { @@ -128,7 +128,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] } catch { case e: Exception => None } if (txEntries.isEmpty) underlying - else replay(txEntries, key, underlying) + else txEntries.last match { + case LogEntry(_, _, CLR) => None + case _ => replay(txEntries, key, underlying) + } } // replay all tx entries for key k with seed = initial @@ -140,9 +143,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] case Some(v) => Map((key, v)) } txEntries.foreach {case LogEntry(k, v, o) => o match { - case PUT => m.put(k, v.get) - case REM => m -= k - case UPD => m.update(k, v.get) + case PUT => m.put(k.get, v.get) + case REM => m -= k.get + case UPD => m.update(k.get, v.get) + case CLR => Map.empty[K, V] }} m get key } @@ -151,12 +155,11 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] val storage: MapStorageBackend[K, V] def commit = { - // if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) storage.removeMapStorageFor(uuid) - appendOnlyTxLog.foreach { case LogEntry(k, v, o) => o match { - case PUT => storage.insertMapStorageEntryFor(uuid, k, v.get) - case UPD => storage.insertMapStorageEntryFor(uuid, k, v.get) - case REM => storage.removeMapStorageFor(uuid, k) + case PUT => storage.insertMapStorageEntryFor(uuid, k.get, v.get) + case UPD => storage.insertMapStorageEntryFor(uuid, k.get, v.get) + case REM => storage.removeMapStorageFor(uuid, k.get) + case CLR => storage.removeMapStorageFor(uuid) }} appendOnlyTxLog.clear @@ -166,7 +169,6 @@ trait 
PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] def abort = { appendOnlyTxLog.clear clearDistinctKeys - shouldClearOnCommit.swap(false) } def -=(key: K) = { @@ -187,7 +189,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] override def put(key: K, value: V): Option[V] = { register val curr = getCurrentValue(key) - appendOnlyTxLog add LogEntry(key, Some(value), PUT) + appendOnlyTxLog add LogEntry(Some(key), Some(value), PUT) addToListOfKeysInTx(key) curr } @@ -195,7 +197,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] override def update(key: K, value: V) = { register val curr = getCurrentValue(key) - appendOnlyTxLog add LogEntry(key, Some(value), UPD) + appendOnlyTxLog add LogEntry(Some(key), Some(value), UPD) addToListOfKeysInTx(key) curr } @@ -203,7 +205,7 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] override def remove(key: K) = { register val curr = getCurrentValue(key) - appendOnlyTxLog add LogEntry(key, None, REM) + appendOnlyTxLog add LogEntry(Some(key), None, REM) addToListOfKeysInTx(key) curr } @@ -215,9 +217,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] override def clear = { register - appendOnlyTxLog.clear + appendOnlyTxLog add LogEntry(None, None, CLR) clearDistinctKeys - shouldClearOnCommit.swap(true) } override def contains(key: K): Boolean = try { @@ -225,7 +226,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V] case Seq() => // current tx doesn't use this storage.getMapStorageEntryFor(uuid, key).isDefined // check storage case txs => // present in log - txs.last.op != REM // last entry cannot be a REM + val lastOp = txs.last.op + lastOp != REM && lastOp != CLR // last entry cannot be a REM } } catch { case e: Exception => false } @@ -366,11 +368,6 @@ trait PersistentMapBinary extends PersistentMap[Array[Byte], Array[Byte]] { * @author Jonas Bonér */ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable with Abortable { - protected val newElems = TransactionalVector[T]() - protected val updatedElems = TransactionalMap[Int, T]() - protected val removedElems = TransactionalVector[T]() - protected val shouldClearOnCommit = Ref[Boolean]() - // operations on the Vector trait Op case object ADD extends Op @@ -400,7 +397,6 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa def abort = { appendOnlyTxLog.clear - shouldClearOnCommit.swap(false) } private def replay: List[T] = { @@ -466,14 +462,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa override def first: T = get(0) - override def last: T = { - if (newElems.length != 0) newElems.last - else { - val len = length - if (len == 0) throw new NoSuchElementException("Vector is empty") - get(len - 1) - } - } + override def last: T = replay.last def length: Int = replay.length diff --git a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala index 01d8ababce..6573100422 100644 --- a/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala +++ b/akka-persistence/akka-persistence-mongo/src/main/scala/MongoStorageBackend.scala @@ -9,7 +9,6 @@ import se.scalablesolutions.akka.persistence.common._ import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.config.Config.config -import java.util.NoSuchElementException import 
com.novus.casbah.mongodb.Imports._ /** diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala index 3b160c8c50..413be5d860 100644 --- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala +++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala @@ -238,7 +238,7 @@ class MongoTicket343Spec extends val add = List(("a", "1"), ("b", "2"), ("c", "3")) (proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true) - (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(0) proc.stop } } diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index 9fd3142019..6f2052f0bd 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -359,7 +359,6 @@ private [akka] object RedisStorageBackend extends case e: java.lang.NullPointerException => throw new StorageException("Could not connect to Redis server") case e => - e.printStackTrace throw new StorageException("Error in Redis: " + e.getMessage) } } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala index de236b9a5a..2b06b17270 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala @@ -32,6 +32,10 @@ case class VUPD(i: Int, v: String) case class VUPD_AND_ABORT(i: Int, v: String) case class VGET(i: Int) case object VSIZE +case object VLAST +case object VFIRST +case class VLAST_AFTER_ADD(vsToAdd: List[String]) +case class VFIRST_AFTER_ADD(vsToAdd: List[String]) case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int]) case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int) @@ -175,6 +179,30 @@ object Storage { fooVector.slice(Some(s), None, c) } self.reply(l.map(new String(_))) + + case VLAST => + val l = atomic { fooVector last } + self.reply(l) + + case VFIRST => + val l = atomic { fooVector first } + self.reply(l) + + case VLAST_AFTER_ADD(vs) => + val l = + atomic { + vs.foreach(fooVector + _.getBytes) + fooVector last + } + self.reply(l) + + case VFIRST_AFTER_ADD(vs) => + val l = + atomic { + vs.foreach(fooVector + _.getBytes) + fooVector first + } + self.reply(l) } } } @@ -243,7 +271,7 @@ class RedisTicket343Spec extends val add = List(("a", "1"), ("b", "2"), ("c", "3")) (proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true) - (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(0) proc.stop } } @@ -344,7 +372,26 @@ class RedisTicket343Spec extends (proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish")) // slice with new elements added in current transaction - (proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a")) + (proc !! 
VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 4)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a", "nilanjan", "ramanendu")) + proc.stop + } + } + + describe("Miscellaneous vector ops") { + it("vector first and last should consider elements added in current transaction") { + val proc = actorOf[RedisSampleVectorStorage] + proc.start + + // add 4 elements in separate transactions + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) + (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + + new String((proc !! VLAST).getOrElse("VLAST failed").asInstanceOf[Array[Byte]]) should equal("debasish") + new String((proc !! VFIRST).getOrElse("VFIRST failed").asInstanceOf[Array[Byte]]) should equal("nilanjan") + new String((proc !! VLAST_AFTER_ADD(List("kausik", "tarun"))).getOrElse("VLAST_AFTER_ADD failed").asInstanceOf[Array[Byte]]) should equal("debasish") + new String((proc !! VFIRST_AFTER_ADD(List("kausik", "tarun"))).getOrElse("VFIRST_AFTER_ADD failed").asInstanceOf[Array[Byte]]) should equal("tarun") + proc.stop + } + } diff --git a/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java b/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java index bc7e21a870..e5265ea396 100644 --- a/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java +++ b/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java @@ -653,6 +653,360 @@ public final class RemoteProtocol { // @@protoc_insertion_point(class_scope:RemoteActorRefProtocol) } + public static final class RemoteTypedActorRefProtocol extends + com.google.protobuf.GeneratedMessage { + // Use RemoteTypedActorRefProtocol.newBuilder() to construct. 
+ private RemoteTypedActorRefProtocol() { + initFields(); + } + private RemoteTypedActorRefProtocol(boolean noInit) {} + + private static final RemoteTypedActorRefProtocol defaultInstance; + public static RemoteTypedActorRefProtocol getDefaultInstance() { + return defaultInstance; + } + + public RemoteTypedActorRefProtocol getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_RemoteTypedActorRefProtocol_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_RemoteTypedActorRefProtocol_fieldAccessorTable; + } + + // required .RemoteActorRefProtocol actorRef = 1; + public static final int ACTORREF_FIELD_NUMBER = 1; + private boolean hasActorRef; + private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol actorRef_; + public boolean hasActorRef() { return hasActorRef; } + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol getActorRef() { return actorRef_; } + + // required string interfaceName = 2; + public static final int INTERFACENAME_FIELD_NUMBER = 2; + private boolean hasInterfaceName; + private java.lang.String interfaceName_ = ""; + public boolean hasInterfaceName() { return hasInterfaceName; } + public java.lang.String getInterfaceName() { return interfaceName_; } + + private void initFields() { + actorRef_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.getDefaultInstance(); + } + public final boolean isInitialized() { + if (!hasActorRef) return false; + if (!hasInterfaceName) return false; + if (!getActorRef().isInitialized()) return false; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (hasActorRef()) { + output.writeMessage(1, getActorRef()); + } + if (hasInterfaceName()) { + output.writeString(2, getInterfaceName()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasActorRef()) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(1, getActorRef()); + } + if (hasInterfaceName()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(2, getInterfaceName()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return 
newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder { + private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol result; + + // Construct using se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.newBuilder() + private Builder() {} + + private static Builder create() { + Builder builder = new Builder(); + builder.result = new se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol(); + return builder; + } + + protected se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol internalGetResult() { + return result; + } + + public Builder clear() { + if (result == null) { + throw new IllegalStateException( + "Cannot call clear() after build()."); + } + result = new se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol(); + return this; + } + + public 
Builder clone() { + return create().mergeFrom(result); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.getDescriptor(); + } + + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.getDefaultInstance(); + } + + public boolean isInitialized() { + return result.isInitialized(); + } + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol build() { + if (result != null && !isInitialized()) { + throw newUninitializedMessageException(result); + } + return buildPartial(); + } + + private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + if (!isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return buildPartial(); + } + + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol buildPartial() { + if (result == null) { + throw new IllegalStateException( + "build() has already been called on this Builder."); + } + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol returnMe = result; + result = null; + return returnMe; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol) { + return mergeFrom((se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol other) { + if (other == se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.getDefaultInstance()) return this; + if (other.hasActorRef()) { + mergeActorRef(other.getActorRef()); + } + if (other.hasInterfaceName()) { + setInterfaceName(other.getInterfaceName()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + return this; + } + break; + } + case 10: { + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.newBuilder(); + if (hasActorRef()) { + subBuilder.mergeFrom(getActorRef()); + } + input.readMessage(subBuilder, extensionRegistry); + setActorRef(subBuilder.buildPartial()); + break; + } + case 18: { + setInterfaceName(input.readString()); + break; + } + } + } + } + + + // required .RemoteActorRefProtocol actorRef = 1; + public boolean hasActorRef() { + return result.hasActorRef(); + } + public 
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol getActorRef() { + return result.getActorRef(); + } + public Builder setActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasActorRef = true; + result.actorRef_ = value; + return this; + } + public Builder setActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder builderForValue) { + result.hasActorRef = true; + result.actorRef_ = builderForValue.build(); + return this; + } + public Builder mergeActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol value) { + if (result.hasActorRef() && + result.actorRef_ != se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.getDefaultInstance()) { + result.actorRef_ = + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.newBuilder(result.actorRef_).mergeFrom(value).buildPartial(); + } else { + result.actorRef_ = value; + } + result.hasActorRef = true; + return this; + } + public Builder clearActorRef() { + result.hasActorRef = false; + result.actorRef_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.getDefaultInstance(); + return this; + } + + // required string interfaceName = 2; + public boolean hasInterfaceName() { + return result.hasInterfaceName(); + } + public java.lang.String getInterfaceName() { + return result.getInterfaceName(); + } + public Builder setInterfaceName(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasInterfaceName = true; + result.interfaceName_ = value; + return this; + } + public Builder clearInterfaceName() { + result.hasInterfaceName = false; + result.interfaceName_ = getDefaultInstance().getInterfaceName(); + return this; + } + + // @@protoc_insertion_point(builder_scope:RemoteTypedActorRefProtocol) + } + + static { + defaultInstance = new RemoteTypedActorRefProtocol(true); + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internalForceInit(); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:RemoteTypedActorRefProtocol) + } + public static final class SerializedActorRefProtocol extends com.google.protobuf.GeneratedMessage { // Use SerializedActorRefProtocol.newBuilder() to construct. @@ -1582,6 +1936,360 @@ public final class RemoteProtocol { // @@protoc_insertion_point(class_scope:SerializedActorRefProtocol) } + public static final class SerializedTypedActorRefProtocol extends + com.google.protobuf.GeneratedMessage { + // Use SerializedTypedActorRefProtocol.newBuilder() to construct. 
+ private SerializedTypedActorRefProtocol() { + initFields(); + } + private SerializedTypedActorRefProtocol(boolean noInit) {} + + private static final SerializedTypedActorRefProtocol defaultInstance; + public static SerializedTypedActorRefProtocol getDefaultInstance() { + return defaultInstance; + } + + public SerializedTypedActorRefProtocol getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_SerializedTypedActorRefProtocol_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_SerializedTypedActorRefProtocol_fieldAccessorTable; + } + + // required .SerializedActorRefProtocol actorRef = 1; + public static final int ACTORREF_FIELD_NUMBER = 1; + private boolean hasActorRef; + private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol actorRef_; + public boolean hasActorRef() { return hasActorRef; } + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol getActorRef() { return actorRef_; } + + // required string interfaceName = 2; + public static final int INTERFACENAME_FIELD_NUMBER = 2; + private boolean hasInterfaceName; + private java.lang.String interfaceName_ = ""; + public boolean hasInterfaceName() { return hasInterfaceName; } + public java.lang.String getInterfaceName() { return interfaceName_; } + + private void initFields() { + actorRef_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.getDefaultInstance(); + } + public final boolean isInitialized() { + if (!hasActorRef) return false; + if (!hasInterfaceName) return false; + if (!getActorRef().isInitialized()) return false; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (hasActorRef()) { + output.writeMessage(1, getActorRef()); + } + if (hasInterfaceName()) { + output.writeString(2, getInterfaceName()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasActorRef()) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(1, getActorRef()); + } + if (hasInterfaceName()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(2, getInterfaceName()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom(byte[] data) + throws 
com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder { + private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol result; + + // Construct using se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.newBuilder() + private Builder() {} + + private static Builder create() { + Builder builder = new Builder(); + builder.result = new se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol(); + return builder; + } + + protected se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol internalGetResult() { + return result; + } + + public Builder clear() { + if (result == null) { + throw new IllegalStateException( + "Cannot call clear() after build()."); + } + result = new 
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol(); + return this; + } + + public Builder clone() { + return create().mergeFrom(result); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.getDescriptor(); + } + + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.getDefaultInstance(); + } + + public boolean isInitialized() { + return result.isInitialized(); + } + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol build() { + if (result != null && !isInitialized()) { + throw newUninitializedMessageException(result); + } + return buildPartial(); + } + + private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + if (!isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return buildPartial(); + } + + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol buildPartial() { + if (result == null) { + throw new IllegalStateException( + "build() has already been called on this Builder."); + } + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol returnMe = result; + result = null; + return returnMe; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol) { + return mergeFrom((se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol other) { + if (other == se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.getDefaultInstance()) return this; + if (other.hasActorRef()) { + mergeActorRef(other.getActorRef()); + } + if (other.hasInterfaceName()) { + setInterfaceName(other.getInterfaceName()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + return this; + } + break; + } + case 10: { + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.newBuilder(); + if (hasActorRef()) { + subBuilder.mergeFrom(getActorRef()); + } + input.readMessage(subBuilder, extensionRegistry); + setActorRef(subBuilder.buildPartial()); + break; + } + case 18: { + 
setInterfaceName(input.readString()); + break; + } + } + } + } + + + // required .SerializedActorRefProtocol actorRef = 1; + public boolean hasActorRef() { + return result.hasActorRef(); + } + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol getActorRef() { + return result.getActorRef(); + } + public Builder setActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasActorRef = true; + result.actorRef_ = value; + return this; + } + public Builder setActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder builderForValue) { + result.hasActorRef = true; + result.actorRef_ = builderForValue.build(); + return this; + } + public Builder mergeActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol value) { + if (result.hasActorRef() && + result.actorRef_ != se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.getDefaultInstance()) { + result.actorRef_ = + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.newBuilder(result.actorRef_).mergeFrom(value).buildPartial(); + } else { + result.actorRef_ = value; + } + result.hasActorRef = true; + return this; + } + public Builder clearActorRef() { + result.hasActorRef = false; + result.actorRef_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.getDefaultInstance(); + return this; + } + + // required string interfaceName = 2; + public boolean hasInterfaceName() { + return result.hasInterfaceName(); + } + public java.lang.String getInterfaceName() { + return result.getInterfaceName(); + } + public Builder setInterfaceName(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasInterfaceName = true; + result.interfaceName_ = value; + return this; + } + public Builder clearInterfaceName() { + result.hasInterfaceName = false; + result.interfaceName_ = getDefaultInstance().getInterfaceName(); + return this; + } + + // @@protoc_insertion_point(builder_scope:SerializedTypedActorRefProtocol) + } + + static { + defaultInstance = new SerializedTypedActorRefProtocol(true); + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internalForceInit(); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:SerializedTypedActorRefProtocol) + } + public static final class MessageProtocol extends com.google.protobuf.GeneratedMessage { // Use MessageProtocol.newBuilder() to construct. 
@@ -5848,11 +6556,21 @@ public final class RemoteProtocol { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_RemoteActorRefProtocol_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_RemoteTypedActorRefProtocol_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_RemoteTypedActorRefProtocol_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_SerializedActorRefProtocol_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_SerializedActorRefProtocol_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_SerializedTypedActorRefProtocol_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_SerializedTypedActorRefProtocol_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_MessageProtocol_descriptor; private static @@ -5915,52 +6633,57 @@ public final class RemoteProtocol { "\n\024RemoteProtocol.proto\"\204\001\n\026RemoteActorRe" + "fProtocol\022\032\n\022classOrServiceName\030\001 \002(\t\022\026\n" + "\016actorClassname\030\002 \002(\t\022%\n\013homeAddress\030\003 \002" + - "(\0132\020.AddressProtocol\022\017\n\007timeout\030\004 \001(\004\"\217\003" + - "\n\032SerializedActorRefProtocol\022\033\n\004uuid\030\001 \002" + - "(\0132\r.UuidProtocol\022\n\n\002id\030\002 \002(\t\022\026\n\016actorCl" + - "assname\030\003 \002(\t\022)\n\017originalAddress\030\004 \002(\0132\020" + - ".AddressProtocol\022\025\n\ractorInstance\030\005 \001(\014\022" + - "\033\n\023serializerClassname\030\006 \001(\t\022\024\n\014isTransa" + - "ctor\030\007 \001(\010\022\017\n\007timeout\030\010 \001(\004\022\026\n\016receiveTi", - "meout\030\t \001(\004\022%\n\tlifeCycle\030\n \001(\0132\022.LifeCyc" + - "leProtocol\022+\n\nsupervisor\030\013 \001(\0132\027.RemoteA" + - "ctorRefProtocol\022\024\n\014hotswapStack\030\014 \001(\014\022(\n" + - "\010messages\030\r \003(\0132\026.RemoteRequestProtocol\"" + - "r\n\017MessageProtocol\0225\n\023serializationSchem" + - "e\030\001 \002(\0162\030.SerializationSchemeType\022\017\n\007mes" + - "sage\030\002 \002(\014\022\027\n\017messageManifest\030\003 \001(\014\"\255\001\n\021" + - "ActorInfoProtocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidPr" + - "otocol\022\016\n\006target\030\002 \002(\t\022\017\n\007timeout\030\003 \002(\004\022" + - "\035\n\tactorType\030\004 \002(\0162\n.ActorType\022/\n\016typedA", - "ctorInfo\030\005 \001(\0132\027.TypedActorInfoProtocol\022" + - "\n\n\002id\030\006 \001(\t\";\n\026TypedActorInfoProtocol\022\021\n" + - "\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\212\002\n\025Rem" + - "oteRequestProtocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidP" + - "rotocol\022!\n\007message\030\002 \002(\0132\020.MessageProtoc" + - "ol\022%\n\tactorInfo\030\003 \002(\0132\022.ActorInfoProtoco" + - "l\022\020\n\010isOneWay\030\004 \002(\010\022%\n\016supervisorUuid\030\005 " + - "\001(\0132\r.UuidProtocol\022\'\n\006sender\030\006 \001(\0132\027.Rem" + - "oteActorRefProtocol\022(\n\010metadata\030\007 \003(\0132\026." 
+ - "MetadataEntryProtocol\"\364\001\n\023RemoteReplyPro", - "tocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022!\n\007me" + - "ssage\030\002 \001(\0132\020.MessageProtocol\022%\n\texcepti" + - "on\030\003 \001(\0132\022.ExceptionProtocol\022%\n\016supervis" + - "orUuid\030\004 \001(\0132\r.UuidProtocol\022\017\n\007isActor\030\005" + - " \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\022(\n\010metadata\030\007" + - " \003(\0132\026.MetadataEntryProtocol\")\n\014UuidProt" + - "ocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025Metad" + - "ataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002" + - " \002(\014\"6\n\021LifeCycleProtocol\022!\n\tlifeCycle\030\001" + - " \002(\0162\016.LifeCycleType\"1\n\017AddressProtocol\022", - "\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021Excep" + - "tionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007messa" + - "ge\030\002 \002(\t*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001\022\016" + - "\n\nJAVA_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Seria" + - "lizationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020" + - "\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROT" + - "OBUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001\022\r" + - "\n\tTEMPORARY\020\002B-\n)se.scalablesolutions.ak" + - "ka.remote.protocolH\001" + "(\0132\020.AddressProtocol\022\017\n\007timeout\030\004 \001(\004\"_\n" + + "\033RemoteTypedActorRefProtocol\022)\n\010actorRef" + + "\030\001 \002(\0132\027.RemoteActorRefProtocol\022\025\n\rinter" + + "faceName\030\002 \002(\t\"\217\003\n\032SerializedActorRefPro" + + "tocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022\n\n\002id" + + "\030\002 \002(\t\022\026\n\016actorClassname\030\003 \002(\t\022)\n\017origin" + + "alAddress\030\004 \002(\0132\020.AddressProtocol\022\025\n\ract", + "orInstance\030\005 \001(\014\022\033\n\023serializerClassname\030" + + "\006 \001(\t\022\024\n\014isTransactor\030\007 \001(\010\022\017\n\007timeout\030\010" + + " \001(\004\022\026\n\016receiveTimeout\030\t \001(\004\022%\n\tlifeCycl" + + "e\030\n \001(\0132\022.LifeCycleProtocol\022+\n\nsuperviso" + + "r\030\013 \001(\0132\027.RemoteActorRefProtocol\022\024\n\014hots" + + "wapStack\030\014 \001(\014\022(\n\010messages\030\r \003(\0132\026.Remot" + + "eRequestProtocol\"g\n\037SerializedTypedActor" + + "RefProtocol\022-\n\010actorRef\030\001 \002(\0132\033.Serializ" + + "edActorRefProtocol\022\025\n\rinterfaceName\030\002 \002(" + + "\t\"r\n\017MessageProtocol\0225\n\023serializationSch", + "eme\030\001 \002(\0162\030.SerializationSchemeType\022\017\n\007m" + + "essage\030\002 \002(\014\022\027\n\017messageManifest\030\003 \001(\014\"\255\001" + + "\n\021ActorInfoProtocol\022\033\n\004uuid\030\001 \002(\0132\r.Uuid" + + "Protocol\022\016\n\006target\030\002 \002(\t\022\017\n\007timeout\030\003 \002(" + + "\004\022\035\n\tactorType\030\004 \002(\0162\n.ActorType\022/\n\016type" + + "dActorInfo\030\005 \001(\0132\027.TypedActorInfoProtoco" + + "l\022\n\n\002id\030\006 \001(\t\";\n\026TypedActorInfoProtocol\022" + + "\021\n\tinterface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\212\002\n\025R" + + "emoteRequestProtocol\022\033\n\004uuid\030\001 \002(\0132\r.Uui" + + "dProtocol\022!\n\007message\030\002 \002(\0132\020.MessageProt", + "ocol\022%\n\tactorInfo\030\003 \002(\0132\022.ActorInfoProto" + + "col\022\020\n\010isOneWay\030\004 
\002(\010\022%\n\016supervisorUuid\030" + + "\005 \001(\0132\r.UuidProtocol\022\'\n\006sender\030\006 \001(\0132\027.R" + + "emoteActorRefProtocol\022(\n\010metadata\030\007 \003(\0132" + + "\026.MetadataEntryProtocol\"\364\001\n\023RemoteReplyP" + + "rotocol\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022!\n\007" + + "message\030\002 \001(\0132\020.MessageProtocol\022%\n\texcep" + + "tion\030\003 \001(\0132\022.ExceptionProtocol\022%\n\016superv" + + "isorUuid\030\004 \001(\0132\r.UuidProtocol\022\017\n\007isActor" + + "\030\005 \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\022(\n\010metadata", + "\030\007 \003(\0132\026.MetadataEntryProtocol\")\n\014UuidPr" + + "otocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025Met" + + "adataEntryProtocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value" + + "\030\002 \002(\014\"6\n\021LifeCycleProtocol\022!\n\tlifeCycle" + + "\030\001 \002(\0162\016.LifeCycleType\"1\n\017AddressProtoco" + + "l\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021Exc" + + "eptionProtocol\022\021\n\tclassname\030\001 \002(\t\022\017\n\007mes" + + "sage\030\002 \002(\t*=\n\tActorType\022\017\n\013SCALA_ACTOR\020\001" + + "\022\016\n\nJAVA_ACTOR\020\002\022\017\n\013TYPED_ACTOR\020\003*]\n\027Ser" + + "ializationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINAR", + "Y\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PR" + + "OTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001" + + "\022\r\n\tTEMPORARY\020\002B-\n)se.scalablesolutions." + + "akka.remote.protocolH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5975,16 +6698,32 @@ public final class RemoteProtocol { new java.lang.String[] { "ClassOrServiceName", "ActorClassname", "HomeAddress", "Timeout", }, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder.class); - internal_static_SerializedActorRefProtocol_descriptor = + internal_static_RemoteTypedActorRefProtocol_descriptor = getDescriptor().getMessageTypes().get(1); + internal_static_RemoteTypedActorRefProtocol_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_RemoteTypedActorRefProtocol_descriptor, + new java.lang.String[] { "ActorRef", "InterfaceName", }, + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.class, + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.Builder.class); + internal_static_SerializedActorRefProtocol_descriptor = + getDescriptor().getMessageTypes().get(2); internal_static_SerializedActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SerializedActorRefProtocol_descriptor, new java.lang.String[] { "Uuid", "Id", "ActorClassname", "OriginalAddress", "ActorInstance", "SerializerClassname", "IsTransactor", "Timeout", "ReceiveTimeout", "LifeCycle", "Supervisor", "HotswapStack", "Messages", }, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder.class); + internal_static_SerializedTypedActorRefProtocol_descriptor = + getDescriptor().getMessageTypes().get(3); + 
internal_static_SerializedTypedActorRefProtocol_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_SerializedTypedActorRefProtocol_descriptor, + new java.lang.String[] { "ActorRef", "InterfaceName", }, + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.class, + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.Builder.class); internal_static_MessageProtocol_descriptor = - getDescriptor().getMessageTypes().get(2); + getDescriptor().getMessageTypes().get(4); internal_static_MessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MessageProtocol_descriptor, @@ -5992,7 +6731,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MessageProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MessageProtocol.Builder.class); internal_static_ActorInfoProtocol_descriptor = - getDescriptor().getMessageTypes().get(3); + getDescriptor().getMessageTypes().get(5); internal_static_ActorInfoProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ActorInfoProtocol_descriptor, @@ -6000,7 +6739,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.ActorInfoProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.ActorInfoProtocol.Builder.class); internal_static_TypedActorInfoProtocol_descriptor = - getDescriptor().getMessageTypes().get(4); + getDescriptor().getMessageTypes().get(6); internal_static_TypedActorInfoProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_TypedActorInfoProtocol_descriptor, @@ -6008,7 +6747,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.TypedActorInfoProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.TypedActorInfoProtocol.Builder.class); internal_static_RemoteRequestProtocol_descriptor = - getDescriptor().getMessageTypes().get(5); + getDescriptor().getMessageTypes().get(7); internal_static_RemoteRequestProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteRequestProtocol_descriptor, @@ -6016,7 +6755,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.Builder.class); internal_static_RemoteReplyProtocol_descriptor = - getDescriptor().getMessageTypes().get(6); + getDescriptor().getMessageTypes().get(8); internal_static_RemoteReplyProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteReplyProtocol_descriptor, @@ -6024,7 +6763,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.Builder.class); internal_static_UuidProtocol_descriptor = - getDescriptor().getMessageTypes().get(7); + getDescriptor().getMessageTypes().get(9); internal_static_UuidProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_UuidProtocol_descriptor, @@ -6032,7 +6771,7 @@ public final class RemoteProtocol { 
se.scalablesolutions.akka.remote.protocol.RemoteProtocol.UuidProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.UuidProtocol.Builder.class); internal_static_MetadataEntryProtocol_descriptor = - getDescriptor().getMessageTypes().get(8); + getDescriptor().getMessageTypes().get(10); internal_static_MetadataEntryProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MetadataEntryProtocol_descriptor, @@ -6040,7 +6779,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol.Builder.class); internal_static_LifeCycleProtocol_descriptor = - getDescriptor().getMessageTypes().get(9); + getDescriptor().getMessageTypes().get(11); internal_static_LifeCycleProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_LifeCycleProtocol_descriptor, @@ -6048,7 +6787,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.Builder.class); internal_static_AddressProtocol_descriptor = - getDescriptor().getMessageTypes().get(10); + getDescriptor().getMessageTypes().get(12); internal_static_AddressProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_AddressProtocol_descriptor, @@ -6056,7 +6795,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol.Builder.class); internal_static_ExceptionProtocol_descriptor = - getDescriptor().getMessageTypes().get(11); + getDescriptor().getMessageTypes().get(13); internal_static_ExceptionProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ExceptionProtocol_descriptor, diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 55c3ae35fd..1ea9d8f986 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -22,6 +22,15 @@ message RemoteActorRefProtocol { optional uint64 timeout = 4; } +/** + * Defines a remote ActorRef that "remembers" and uses its original typed Actor instance + * on the original node. + */ +message RemoteTypedActorRefProtocol { + required RemoteActorRefProtocol actorRef = 1; + required string interfaceName = 2; +} + /** * Defines a fully serialized remote ActorRef (with serialized Actor instance) * that is about to be instantiated on the remote node. It is fully disconnected @@ -43,6 +52,16 @@ message SerializedActorRefProtocol { repeated RemoteRequestProtocol messages = 13; } +/** + * Defines a fully serialized remote ActorRef (with serialized typed actor instance) + * that is about to be instantiated on the remote node. It is fully disconnected + * from its original host. + */ +message SerializedTypedActorRefProtocol { + required SerializedActorRefProtocol actorRef = 1; + required string interfaceName = 2; +} + /** * Defines a message. 
*/ diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index bacaf22546..8784a8c81f 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -10,8 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors} import java.util.{Map => JMap} import se.scalablesolutions.akka.actor.{ - Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage} -import se.scalablesolutions.akka.actor.{Uuid,uuidFrom} + Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage,uuidFrom,Uuid} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.util._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ @@ -68,6 +67,7 @@ object RemoteNode extends RemoteServer * @author Jonas Bonér */ object RemoteServer { + val UUID_PREFIX = "uuid:" val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") val PORT = config.getInt("akka.remote.server.port", 9999) @@ -124,19 +124,21 @@ object RemoteServer { private class RemoteActorSet { private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] + private[RemoteServer] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] + private[RemoteServer] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] } private val guard = new ReadWriteGuard private val remoteActorSets = Map[Address, RemoteActorSet]() private val remoteServers = Map[Address, RemoteServer]() - private[akka] def registerActor(address: InetSocketAddress, uuid: Uuid, actor: ActorRef) = guard.withWriteGuard { - actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid.toString, actor) + private[akka] def registerActorByUuid(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard { + actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actorsByUuid.put(uuid, actor) } - private[akka] def registerTypedActor(address: InetSocketAddress, uuid: Uuid, typedActor: AnyRef) = guard.withWriteGuard { - actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid.toString, typedActor) + private[akka] def registerTypedActorByUuid(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard { + actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor) } private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard { @@ -193,6 +195,7 @@ case class RemoteServerClientDisconnected(@BeanProperty val server: RemoteServer * @author Jonas Bonér */ class RemoteServer extends Logging with ListenerManagement { + import RemoteServer._ def name = "RemoteServer@" + hostname + ":" + port private[akka] var address = RemoteServer.Address(RemoteServer.HOSTNAME,RemoteServer.PORT) @@ -284,10 +287,11 @@ class RemoteServer extends Logging with ListenerManagement { * @param typedActor typed actor to register */ def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized { - val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors - if (!typedActors.contains(id)) { - log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port) - typedActors.put(id, typedActor) + log.debug("Registering 
server side remote typed actor [%s] with id [%s]", typedActor.getClass.getName, id) + if (id.startsWith(UUID_PREFIX)) { + registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid()) + } else { + registerTypedActor(id, typedActor, typedActors()) } } @@ -302,12 +306,27 @@ class RemoteServer extends Logging with ListenerManagement { * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. */ def register(id: String, actorRef: ActorRef): Unit = synchronized { + log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) + if (id.startsWith(UUID_PREFIX)) { + register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid()) + } else { + register(id, actorRef, actors()) + } + } + + private def register(id: String, actorRef: ActorRef, registry: ConcurrentHashMap[String, ActorRef]) { if (_isRunning) { - val actorMap = actors() - if (!actorMap.contains(id)) { + if (!registry.contains(id)) { if (!actorRef.isRunning) actorRef.start - log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) - actorMap.put(id, actorRef) + registry.put(id, actorRef) + } + } + } + + private def registerTypedActor(id: String, typedActor: AnyRef, registry: ConcurrentHashMap[String, AnyRef]) { + if (_isRunning) { + if (!registry.contains(id)) { + registry.put(id, typedActor) } } } @@ -320,7 +339,7 @@ class RemoteServer extends Logging with ListenerManagement { log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid) val actorMap = actors() actorMap remove actorRef.id - if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid + if (actorRef.registeredInRemoteNodeDuringSerialization) actorsByUuid() remove actorRef.uuid } } @@ -332,10 +351,15 @@ class RemoteServer extends Logging with ListenerManagement { def unregister(id: String):Unit = synchronized { if (_isRunning) { log.info("Unregistering server side remote actor with id [%s]", id) - val actorMap = actors() - val actorRef = actorMap get id - actorMap remove id - if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid + if (id.startsWith(UUID_PREFIX)) { + actorsByUuid().remove(id.substring(UUID_PREFIX.length)) + } else { + val actorRef = actors().get(id) + if (actorRef.registeredInRemoteNodeDuringSerialization) { + actorsByUuid() remove actorRef.uuid + } + actors() remove id + } } } @@ -347,8 +371,11 @@ class RemoteServer extends Logging with ListenerManagement { def unregisterTypedActor(id: String):Unit = synchronized { if (_isRunning) { log.info("Unregistering server side remote typed actor with id [%s]", id) - val registeredTypedActors = typedActors() - registeredTypedActors.remove(id) + if (id.startsWith(UUID_PREFIX)) { + typedActorsByUuid().remove(id.substring(UUID_PREFIX.length)) + } else { + typedActors().remove(id) + } } } @@ -356,8 +383,10 @@ class RemoteServer extends Logging with ListenerManagement { protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) - private[akka] def actors() = RemoteServer.actorsFor(address).actors - private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors + private[akka] def actors() = RemoteServer.actorsFor(address).actors + private[akka] def actorsByUuid() = RemoteServer.actorsFor(address).actorsByUuid + private[akka] def typedActors() = 
RemoteServer.actorsFor(address).typedActors + private[akka] def typedActorsByUuid() = RemoteServer.actorsFor(address).typedActorsByUuid } object RemoteServerSslContext { @@ -420,6 +449,7 @@ class RemoteServerHandler( val openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader], val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging { + import RemoteServer._ val AW_PROXY_PREFIX = "$$ProxiedByAW".intern applicationLoader.foreach(MessageSerializer.setClassLoader(_)) @@ -478,11 +508,12 @@ class RemoteServerHandler( private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { log.debug("Received RemoteRequestProtocol[\n%s]", request.toString) - val actorType = request.getActorInfo.getActorType - if (actorType == SCALA_ACTOR) dispatchToActor(request, channel) - else if (actorType == JAVA_ACTOR) throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported") - else if (actorType == TYPED_ACTOR) dispatchToTypedActor(request, channel) - else throw new IllegalActorStateException("Unknown ActorType [" + actorType + "]") + request.getActorInfo.getActorType match { + case SCALA_ACTOR => dispatchToActor(request, channel) + case TYPED_ACTOR => dispatchToTypedActor(request, channel) + case JAVA_ACTOR => throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported") + case other => throw new IllegalActorStateException("Unknown ActorType [" + other + "]") + } } private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = { @@ -505,10 +536,10 @@ class RemoteServerHandler( override def onComplete(result: AnyRef) { log.debug("Returning result from actor invocation [%s]", result) val replyBuilder = RemoteReplyProtocol.newBuilder - .setUuid(request.getUuid) - .setMessage(MessageSerializer.serialize(result)) - .setIsSuccessful(true) - .setIsActor(true) + .setUuid(request.getUuid) + .setMessage(MessageSerializer.serialize(result)) + .setIsSuccessful(true) + .setIsActor(true) if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) @@ -566,32 +597,23 @@ class RemoteServerHandler( } } - /** - * Find a registered actor by ID (default) or UUID. - * Actors are registered by id apart from registering during serialization see SerializationProtocol. - */ - private def findActorByIdOrUuid(id: String, uuid: Uuid) : ActorRef = { - val registeredActors = server.actors() - var actorRefOrNull = registeredActors get id - if (actorRefOrNull eq null) { - actorRefOrNull = registeredActors get uuid - } - actorRefOrNull + private def findActorById(id: String) : ActorRef = { + server.actors().get(id) } - /** - * Find a registered typed actor by ID (default) or UUID. - * Actors are registered by id apart from registering during serialization see SerializationProtocol. - */ - private def findTypedActorByIdOrUUid(id: String, uuid: Uuid) : AnyRef = { - val registeredActors = server.typedActors() - var actorRefOrNull = registeredActors get id - if (actorRefOrNull eq null) { - actorRefOrNull = registeredActors get uuid - } - actorRefOrNull + private def findActorByUuid(uuid: String) : ActorRef = { + server.actorsByUuid().get(uuid) } + private def findTypedActorById(id: String) : AnyRef = { + server.typedActors().get(id) + } + + private def findTypedActorByUuid(uuid: String) : AnyRef = { + server.typedActorsByUuid().get(uuid) + } + + /** * Creates a new instance of the actor with name, uuid and timeout specified as arguments. 
* @@ -600,21 +622,25 @@ class RemoteServerHandler( * Does not start the actor. */ private def createActor(actorInfo: ActorInfoProtocol): ActorRef = { - val uuid = uuidFrom(actorInfo.getUuid.getHigh,actorInfo.getUuid.getLow) + val uuid = actorInfo.getUuid val id = actorInfo.getId val name = actorInfo.getTarget val timeout = actorInfo.getTimeout - val actorRefOrNull = findActorByIdOrUuid(id, uuid) - + val actorRefOrNull = if (id.startsWith(UUID_PREFIX)) { + findActorByUuid(id.substring(UUID_PREFIX.length)) + } else { + findActorById(id) + } + if (actorRefOrNull eq null) { try { log.info("Creating a new remote actor [%s:%s]", name, uuid) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) else Class.forName(name) val actorRef = Actor.actorOf(clazz.newInstance.asInstanceOf[Actor]) - actorRef.uuid = uuid + actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) actorRef.id = id actorRef.timeout = timeout actorRef.remoteAddress = None @@ -630,10 +656,14 @@ class RemoteServerHandler( } private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = { - val uuid = uuidFrom(actorInfo.getUuid.getHigh,actorInfo.getUuid.getLow) + val uuid = actorInfo.getUuid val id = actorInfo.getId - val typedActorOrNull = findTypedActorByIdOrUUid(id, uuid) + val typedActorOrNull = if (id.startsWith(UUID_PREFIX)) { + findTypedActorByUuid(id.substring(UUID_PREFIX.length)) + } else { + findTypedActorById(id) + } if (typedActorOrNull eq null) { val typedActorInfo = actorInfo.getTypedActorInfo diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index 7997be128b..0e7f5ce732 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -4,11 +4,10 @@ package se.scalablesolutions.akka.serialization -import se.scalablesolutions.akka.actor.{Actor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, ActorType} import se.scalablesolutions.akka.stm.global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement -import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ +import se.scalablesolutions.akka.dispatch.MessageInvocation import se.scalablesolutions.akka.remote.{RemoteServer, MessageSerializer} import se.scalablesolutions.akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _} import ActorTypeProtocol._ @@ -16,6 +15,7 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.actor.{uuidFrom,newUuid} import com.google.protobuf.ByteString +import se.scalablesolutions.akka.actor._ /** * Type class definition for Actor Serialization @@ -37,13 +37,14 @@ trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] * Create a Format object with the client actor as the implementation of the type class * *

- * object BinaryFormatMyStatelessActor {
+ * object BinaryFormatMyStatelessActor  {
  *   implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
  * }
  * </pre>
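  *
  * A usage sketch (not part of this commit; the actor class and the actorOf call are assumed
  * for illustration) showing how such a Format is picked up implicitly by ActorSerialization:
  *
  * <pre>
  * import BinaryFormatMyStatelessActor._
  * val ref   = Actor.actorOf[MyStatelessActor].start
  * val bytes = ActorSerialization.toBinary(ref)      // MyStatelessActorFormat resolved implicitly
  * val copy  = ActorSerialization.fromBinary(bytes)  // rebuilds a LocalActorRef from the bytes
  * </pre>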
*/ trait StatelessActorFormat[T <: Actor] extends Format[T] { def fromBinary(bytes: Array[Byte], act: T) = act + def toBinary(ac: T) = Array.empty[Byte] } @@ -54,16 +55,18 @@ trait StatelessActorFormat[T <: Actor] extends Format[T] { * a serializer object * *
- * object BinaryFormatMyJavaSerializableActor {
- *   implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
+ * object BinaryFormatMyJavaSerializableActor  {
+ *   implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor]  {
  *     val serializer = Serializer.Java
- *   }
+ * }
  * }
  * </pre>
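  *
  * A rough sketch (assuming `actorRef` holds a started MyJavaSerializableActor): the whole
  * actor instance is run through the configured Serializer, and the mailbox can be skipped:
  *
  * <pre>
  * import BinaryFormatMyJavaSerializableActor._
  * val bytes = ActorSerialization.toBinary(actorRef, serializeMailBox = false)
  * val fresh = ActorSerialization.fromBinary(bytes)
  * </pre>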
*/ trait SerializerBasedActorFormat[T <: Actor] extends Format[T] { val serializer: Serializer + def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T] + def toBinary(ac: T) = serializer.toBinary(ac) } @@ -71,23 +74,22 @@ trait SerializerBasedActorFormat[T <: Actor] extends Format[T] { * Module for local actor serialization. */ object ActorSerialization { - def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef = fromBinaryToLocalActorRef(bytes, format) - def toBinary[T <: Actor](a: ActorRef)(implicit format: Format[T]): Array[Byte] = - toSerializedActorRefProtocol(a, format).toByteArray + def toBinary[T <: Actor](a: ActorRef, serializeMailBox: Boolean = true)(implicit format: Format[T]): Array[Byte] = + toSerializedActorRefProtocol(a, format, serializeMailBox).toByteArray // wrapper for implicits to be used by Java def fromBinaryJ[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef = fromBinary(bytes)(format) // wrapper for implicits to be used by Java - def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T]): Array[Byte] = - toBinary(a)(format) + def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] = + toBinary(a, srlMailBox)(format) - private def toSerializedActorRefProtocol[T <: Actor]( - actorRef: ActorRef, format: Format[T]): SerializedActorRefProtocol = { + private[akka] def toSerializedActorRefProtocol[T <: Actor]( + actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = { val lifeCycleProtocol: Option[LifeCycleProtocol] = { def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match { case Permanent => builder.setLifeCycle(LifeCycleType.PERMANENT) @@ -103,9 +105,9 @@ object ActorSerialization { } val originalAddress = AddressProtocol.newBuilder - .setHostname(actorRef.homeAddress.getHostName) - .setPort(actorRef.homeAddress.getPort) - .build + .setHostname(actorRef.homeAddress.getHostName) + .setPort(actorRef.homeAddress.getPort) + .build val builder = SerializedActorRefProtocol.newBuilder .setUuid(UuidProtocol.newBuilder.setHigh(actorRef.uuid.getTime).setLow(actorRef.uuid.getClockSeqAndNode).build) @@ -115,6 +117,30 @@ object ActorSerialization { .setIsTransactor(actorRef.isTransactor) .setTimeout(actorRef.timeout) + + if (serializeMailBox == true) { + val messages = + actorRef.mailbox match { + case q: java.util.Queue[MessageInvocation] => + val l = new scala.collection.mutable.ListBuffer[MessageInvocation] + val it = q.iterator + while (it.hasNext == true) l += it.next + l + } + + val requestProtocols = + messages.map(m => + RemoteActorSerialization.createRemoteRequestProtocolBuilder( + actorRef, + m.message, + false, + actorRef.getSender, + None, + ActorType.ScalaActor).build) + + requestProtocols.foreach(rp => builder.addMessages(rp)) + } + actorRef.receiveTimeout.foreach(builder.setReceiveTimeout(_)) builder.setActorInstance(ByteString.copyFrom(format.toBinary(actorRef.actor.asInstanceOf[T]))) lifeCycleProtocol.foreach(builder.setLifeCycle(_)) @@ -127,33 +153,33 @@ object ActorSerialization { private def fromBinaryToLocalActorRef[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef = fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None) - private def fromProtobufToLocalActorRef[T <: Actor]( - protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = { + private[akka] 
def fromProtobufToLocalActorRef[T <: Actor]( + protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = { Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol) val serializer = - if (format.isInstanceOf[SerializerBasedActorFormat[_]]) - Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer) - else None + if (format.isInstanceOf[SerializerBasedActorFormat[_]]) + Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer) + else None val lifeCycle = - if (protocol.hasLifeCycle) { - val lifeCycleProtocol = protocol.getLifeCycle - Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent) - else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary) - else throw new IllegalActorStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle)) - } else None + if (protocol.hasLifeCycle) { + val lifeCycleProtocol = protocol.getLifeCycle + Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent) + else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary) + else throw new IllegalActorStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle)) + } else None val supervisor = - if (protocol.hasSupervisor) - Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) - else None + if (protocol.hasSupervisor) + Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) + else None val hotswap = - if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get + if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]])) .asInstanceOf[PartialFunction[Any, Unit]]) - else None + else None val classLoader = loader.getOrElse(getClass.getClassLoader) @@ -195,9 +221,9 @@ object RemoteActorSerialization { def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef = fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) - /** - * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. - */ + /** + * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. 
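+   *
+   * Sketch (assuming `bytes` and `loader` are supplied by the caller):
+   * <pre>
+   * val ref: ActorRef = RemoteActorSerialization.fromBinaryToRemoteActorRef(bytes, loader)
+   * </pre>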
+ */ def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef = fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) @@ -226,7 +252,7 @@ object RemoteActorSerialization { if (!registeredInRemoteNodeDuringSerialization) { Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) RemoteServer.getOrCreateServer(homeAddress) - RemoteServer.registerActor(homeAddress, uuid, ar) + RemoteServer.registerActorByUuid(homeAddress, uuid.toString, ar) registeredInRemoteNodeDuringSerialization = true } @@ -239,13 +265,13 @@ object RemoteActorSerialization { } def createRemoteRequestProtocolBuilder( - actorRef: ActorRef, - message: Any, - isOneWay: Boolean, - senderOption: Option[ActorRef], - typedActorInfo: Option[Tuple2[String, String]], - actorType: ActorType): - RemoteRequestProtocol.Builder = { + actorRef: ActorRef, + message: Any, + isOneWay: Boolean, + senderOption: Option[ActorRef], + typedActorInfo: Option[Tuple2[String, String]], + actorType: ActorType): + RemoteRequestProtocol.Builder = { import actorRef._ val actorInfoBuilder = ActorInfoProtocol.newBuilder @@ -254,12 +280,13 @@ object RemoteActorSerialization { .setTarget(actorClassName) .setTimeout(timeout) - typedActorInfo.foreach { typedActor => - actorInfoBuilder.setTypedActorInfo( - TypedActorInfoProtocol.newBuilder - .setInterface(typedActor._1) - .setMethod(typedActor._2) - .build) + typedActorInfo.foreach { + typedActor => + actorInfoBuilder.setTypedActorInfo( + TypedActorInfoProtocol.newBuilder + .setInterface(typedActor._1) + .setMethod(typedActor._2) + .build) } actorType match { @@ -280,7 +307,107 @@ object RemoteActorSerialization { senderOption.foreach { sender => RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid.toString, sender) requestBuilder.setSender(toRemoteActorRefProtocol(sender)) + } requestBuilder } + + +} + + +/** + * Module for local typed actor serialization. 
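+ *
+ * Usage sketch (type and format names are borrowed from TypedActorSerializationSpec below,
+ * not defined in this module): the proxy's underlying ActorRef and interface name are
+ * serialized through the Format type class and a working proxy is rebuilt from the bytes:
+ *
+ * <pre>
+ * val proxy = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000)
+ * val bytes = TypedActorSerialization.toBinaryJ(proxy, new MyTypedActorFormat)
+ * val back: MyTypedActor = TypedActorSerialization.fromBinaryJ(bytes, new MyTypedActorFormat)
+ * </pre>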
+ */ +object TypedActorSerialization { + + def fromBinary[T <: Actor, U <: AnyRef](bytes: Array[Byte])(implicit format: Format[T]): U = + fromBinaryToLocalTypedActorRef(bytes, format) + + def toBinary[T <: Actor](proxy: AnyRef)(implicit format: Format[T]): Array[Byte] = { + toSerializedTypedActorRefProtocol(proxy, format).toByteArray + } + + // wrapper for implicits to be used by Java + def fromBinaryJ[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Format[T]): U = + fromBinary(bytes)(format) + + // wrapper for implicits to be used by Java + def toBinaryJ[T <: Actor](a: AnyRef, format: Format[T]): Array[Byte] = + toBinary(a)(format) + + private def toSerializedTypedActorRefProtocol[T <: Actor]( + proxy: AnyRef, format: Format[T]): SerializedTypedActorRefProtocol = { + + val init = AspectInitRegistry.initFor(proxy) + if (init == null) throw new IllegalArgumentException("Proxy for typed actor could not be found in AspectInitRegistry.") + + SerializedTypedActorRefProtocol.newBuilder + .setActorRef(ActorSerialization.toSerializedActorRefProtocol(init.actorRef, format)) + .setInterfaceName(init.interfaceClass.getName) + .build + } + + private def fromBinaryToLocalTypedActorRef[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Format[T]): U = + fromProtobufToLocalTypedActorRef(SerializedTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None) + + private def fromProtobufToLocalTypedActorRef[T <: Actor, U <: AnyRef]( + protocol: SerializedTypedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): U = { + Actor.log.debug("Deserializing SerializedTypedActorRefProtocol to LocalActorRef:\n" + protocol) + val actorRef = ActorSerialization.fromProtobufToLocalActorRef(protocol.getActorRef, format, loader) + val intfClass = toClass(loader, protocol.getInterfaceName) + TypedActor.newInstance(intfClass, actorRef).asInstanceOf[U] + } + + private[akka] def toClass[U <: AnyRef](loader: Option[ClassLoader], name: String): Class[U] = { + val classLoader = loader.getOrElse(getClass.getClassLoader) + val clazz = classLoader.loadClass(name) + clazz.asInstanceOf[Class[U]] + } +} + +/** + * Module for remote typed actor serialization. + */ +object RemoteTypedActorSerialization { + /** + * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. + */ + def fromBinaryToRemoteTypedActorRef[T <: AnyRef](bytes: Array[Byte]): T = + fromProtobufToRemoteTypedActorRef(RemoteTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) + + /** + * Deserializes a byte array (Array[Byte]) into a AW RemoteActorRef proxy. + */ + def fromBinaryToRemoteTypedActorRef[T <: AnyRef](bytes: Array[Byte], loader: ClassLoader): T = + fromProtobufToRemoteTypedActorRef(RemoteTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) + + /** + * Serialize as AW RemoteActorRef proxy. + */ + def toBinary[T <: Actor](proxy: AnyRef): Array[Byte] = { + toRemoteTypedActorRefProtocol(proxy).toByteArray + } + + /** + * Deserializes a RemoteTypedActorRefProtocol Protocol Buffers (protobuf) Message into AW RemoteActorRef proxy. 
+ */ + private[akka] def fromProtobufToRemoteTypedActorRef[T](protocol: RemoteTypedActorRefProtocol, loader: Option[ClassLoader]): T = { + Actor.log.debug("Deserializing RemoteTypedActorRefProtocol to AW RemoteActorRef proxy:\n" + protocol) + val actorRef = RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getActorRef, loader) + val intfClass = TypedActorSerialization.toClass(loader, protocol.getInterfaceName) + TypedActor.createProxyForRemoteActorRef(intfClass, actorRef).asInstanceOf[T] + } + + /** + * Serializes the AW TypedActor proxy into a Protocol Buffers (protobuf) Message. + */ + def toRemoteTypedActorRefProtocol(proxy: AnyRef): RemoteTypedActorRefProtocol = { + val init = AspectInitRegistry.initFor(proxy) + RemoteTypedActorRefProtocol.newBuilder + .setActorRef(RemoteActorSerialization.toRemoteActorRefProtocol(init.actorRef)) + .setInterfaceName(init.interfaceClass.getName) + .build + } + } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 59f122c656..e40d44ad6c 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -79,7 +79,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } } - @Test def shouldSendWithBang { val actor = RemoteClient.actorFor( @@ -178,5 +177,41 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { assert(actor2.id == actor3.id) } + @Test + def shouldFindActorByUuid { + val actor1 = actorOf[RemoteActorSpecActorUnidirectional] + val actor2 = actorOf[RemoteActorSpecActorUnidirectional] + server.register("uuid:" + actor1.uuid, actor1) + server.register("my-service", actor2) + + val ref1 = RemoteClient.actorFor("uuid:" + actor1.uuid, HOSTNAME, PORT) + val ref2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT) + + ref1 ! "OneWay" + assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) + ref1.stop + ref2 ! 
"OneWay" + ref2.stop + + } + + @Test + def shouldRegisterAndUnregister { + val actor1 = actorOf[RemoteActorSpecActorUnidirectional] + server.register("my-service-1", actor1) + assert(server.actors().get("my-service-1") != null, "actor registered") + server.unregister("my-service-1") + assert(server.actors().get("my-service-1") == null, "actor unregistered") + } + + @Test + def shouldRegisterAndUnregisterByUuid { + val actor1 = actorOf[RemoteActorSpecActorUnidirectional] + server.register("uuid:" + actor1.uuid, actor1) + assert(server.actorsByUuid().get(actor1.uuid.toString) != null, "actor registered") + server.unregister("uuid:" + actor1.uuid) + assert(server.actorsByUuid().get(actor1.uuid) == null, "actor unregistered") + } + } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala index b800fbf2c3..71ece9792e 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala @@ -103,9 +103,34 @@ class ServerInitiatedRemoteTypedActorSpec extends it("should register and unregister typed actors") { val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) server.registerTypedActor("my-test-service", typedActor) - assert(server.typedActors().get("my-test-service") != null) + assert(server.typedActors().get("my-test-service") ne null, "typed actor registered") server.unregisterTypedActor("my-test-service") - assert(server.typedActors().get("my-test-service") == null) + assert(server.typedActors().get("my-test-service") eq null, "typed actor unregistered") + } + + it("should register and unregister typed actors by uuid") { + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + val init = AspectInitRegistry.initFor(typedActor) + val uuid = "uuid:" + init.actorRef.uuid + server.registerTypedActor(uuid, typedActor) + assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) ne null, "typed actor registered") + server.unregisterTypedActor(uuid) + assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) eq null, "typed actor unregistered") + } + + it("should find typed actors by uuid") { + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + val init = AspectInitRegistry.initFor(typedActor) + val uuid = "uuid:" + init.actorRef.uuid + server.registerTypedActor(uuid, typedActor) + assert(server.typedActorsByUuid().get(init.actorRef.uuid.toString) ne null, "typed actor registered") + + val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], uuid, HOSTNAME, PORT) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + } } } diff --git a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala index 7e8babe168..832a655c22 100644 --- a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala +++ b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala @@ -127,9 +127,16 @@ class SerializableTypeClassActorSpec extends (actor1 ! "hello") (actor1 ! "hello") (actor1 ! "hello") + actor1.mailboxSize should be > (0) val actor2 = fromBinary(toBinary(actor1)) Thread.sleep(1000) + actor2.mailboxSize should be > (0) (actor2 !! 
"hello-reply").getOrElse("_") should equal("world") + + val actor3 = fromBinary(toBinary(actor1, false)) + Thread.sleep(1000) + actor3.mailboxSize should equal(0) + (actor3 !! "hello-reply").getOrElse("_") should equal("world") } } } diff --git a/akka-remote/src/test/scala/serialization/Ticket435Spec.scala b/akka-remote/src/test/scala/serialization/Ticket435Spec.scala new file mode 100644 index 0000000000..ed175ea0ad --- /dev/null +++ b/akka-remote/src/test/scala/serialization/Ticket435Spec.scala @@ -0,0 +1,126 @@ +package se.scalablesolutions.akka.actor.serialization + + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.serialization._ +import se.scalablesolutions.akka.actor._ +import ActorSerialization._ +import Actor._ + +@RunWith(classOf[JUnitRunner]) +class Ticket435Spec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + object BinaryFormatMyStatefulActor { + implicit object MyStatefulActorFormat extends Format[MyStatefulActor] { + def fromBinary(bytes: Array[Byte], act: MyStatefulActor) = { + val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] + act.count = p.getCount + act + } + def toBinary(ac: MyStatefulActor) = + ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray + } + } + + object BinaryFormatMyStatelessActorWithMessagesInMailbox { + implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActorWithMessagesInMailbox] + } + + describe("Serializable actor") { + + it("should be able to serialize and deserialize a stateless actor with messages in mailbox") { + import BinaryFormatMyStatelessActorWithMessagesInMailbox._ + + val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + actor1.mailboxSize should be > (0) + val actor2 = fromBinary(toBinary(actor1)) + Thread.sleep(1000) + actor2.mailboxSize should be > (0) + (actor2 !! "hello-reply").getOrElse("_") should equal("world") + + val actor3 = fromBinary(toBinary(actor1, false)) + Thread.sleep(1000) + actor3.mailboxSize should equal(0) + (actor3 !! "hello-reply").getOrElse("_") should equal("world") + } + + it("should serialize the mailbox optionally") { + import BinaryFormatMyStatelessActorWithMessagesInMailbox._ + + val actor1 = actorOf[MyStatelessActorWithMessagesInMailbox].start + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + (actor1 ! "hello") + actor1.mailboxSize should be > (0) + + val actor2 = fromBinary(toBinary(actor1, false)) + Thread.sleep(1000) + actor2.mailboxSize should equal(0) + (actor2 !! "hello-reply").getOrElse("_") should equal("world") + } + + it("should be able to serialize and deserialize a stateful actor with messages in mailbox") { + import BinaryFormatMyStatefulActor._ + + val actor1 = actorOf[MyStatefulActor].start + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! "hi") + (actor1 ! 
"hi") + actor1.mailboxSize should be > (0) + val actor2 = fromBinary(toBinary(actor1)) + Thread.sleep(1000) + actor2.mailboxSize should be > (0) + (actor2 !! "hello").getOrElse("_") should equal("world 1") + + val actor3 = fromBinary(toBinary(actor1, false)) + Thread.sleep(1000) + actor3.mailboxSize should equal(0) + (actor3 !! "hello").getOrElse("_") should equal("world 1") + } + } +} + +class MyStatefulActor extends Actor { + var count = 0 + + def receive = { + case "hi" => + println("# messages in mailbox " + self.mailboxSize) + Thread.sleep(500) + case "hello" => + count = count + 1 + self.reply("world " + count) + } +} diff --git a/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala b/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala new file mode 100644 index 0000000000..ccf4d05f7f --- /dev/null +++ b/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala @@ -0,0 +1,166 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.actor.serialization + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.serialization._ +import se.scalablesolutions.akka.actor._ + +import TypedActorSerialization._ +import Actor._ +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer} +import se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec.RemoteActorSpecActorUnidirectional + +@RunWith(classOf[JUnitRunner]) +class TypedActorSerializationSpec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + var server1: RemoteServer = null + var typedActor: MyTypedActor = null + + override def beforeAll = { + server1 = new RemoteServer().start("localhost", 9991) + typedActor = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000) + server1.registerTypedActor("typed-actor-service", typedActor) + Thread.sleep(1000) + } + + // make sure the servers shutdown cleanly after the test has finished + override def afterAll = { + try { + TypedActor.stop(typedActor) + server1.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + object MyTypedStatelessActorFormat extends StatelessActorFormat[MyStatelessTypedActorImpl] + + class MyTypedActorFormat extends Format[MyTypedActorImpl] { + def fromBinary(bytes: Array[Byte], act: MyTypedActorImpl) = { + val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] + act.count = p.getCount + act + } + def toBinary(ac: MyTypedActorImpl) = + ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray + } + + class MyTypedActorWithDualCounterFormat extends Format[MyTypedActorWithDualCounter] { + def fromBinary(bytes: Array[Byte], act: MyTypedActorWithDualCounter) = { + val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter] + act.count1 = p.getCount1 + act.count2 = p.getCount2 + act + } + def toBinary(ac: MyTypedActorWithDualCounter) = + ProtobufProtocol.DualCounter.newBuilder.setCount1(ac.count1).setCount2(ac.count2).build.toByteArray + } + + + describe("Serializable typed actor") { + + it("should be able to serialize and de-serialize a stateless typed actor") { + val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], 
classOf[MyStatelessTypedActorImpl], 1000) + typedActor1.requestReply("hello") should equal("world") + typedActor1.requestReply("hello") should equal("world") + + val bytes = toBinaryJ(typedActor1, MyTypedStatelessActorFormat) + val typedActor2: MyTypedActor = fromBinaryJ(bytes, MyTypedStatelessActorFormat) + typedActor2.requestReply("hello") should equal("world") + } + + it("should be able to serialize and de-serialize a stateful typed actor") { + val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000) + typedActor1.requestReply("hello") should equal("world 1") + typedActor1.requestReply("scala") should equal("hello scala 2") + + val f = new MyTypedActorFormat + val bytes = toBinaryJ(typedActor1, f) + val typedActor2: MyTypedActor = fromBinaryJ(bytes, f) + typedActor2.requestReply("hello") should equal("world 3") + } + + it("should be able to serialize and de-serialize a stateful typed actor with compound state") { + val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorWithDualCounter], 1000) + typedActor1.requestReply("hello") should equal("world 1 1") + typedActor1.requestReply("hello") should equal("world 2 2") + + val f = new MyTypedActorWithDualCounterFormat + val bytes = toBinaryJ(typedActor1, f) + val typedActor2: MyTypedActor = fromBinaryJ(bytes, f) + typedActor2.requestReply("hello") should equal("world 3 3") + } + + it("should be able to serialize a local yped actor ref to a remote typed actor ref proxy") { + val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000) + typedActor1.requestReply("hello") should equal("world") + typedActor1.requestReply("hello") should equal("world") + + val bytes = RemoteTypedActorSerialization.toBinary(typedActor1) + val typedActor2: MyTypedActor = RemoteTypedActorSerialization.fromBinaryToRemoteTypedActorRef(bytes) + typedActor1.requestReply("hello") should equal("world") + } + } +} + + +trait MyTypedActor { + def requestReply(s: String) : String + def oneWay() : Unit +} + +class MyTypedActorImpl extends TypedActor with MyTypedActor { + var count = 0 + + override def oneWay() { + println("got oneWay message") + } + + override def requestReply(message: String) : String = { + count = count + 1 + if (message == "hello") { + "world " + count + } else ("hello " + message + " " + count) + } +} + +class MyTypedActorWithDualCounter extends TypedActor with MyTypedActor { + var count1 = 0 + var count2 = 0 + + override def oneWay() { + println("got oneWay message") + } + + override def requestReply(message: String) : String = { + count1 = count1 + 1 + count2 = count2 + 1 + + if (message == "hello") { + "world " + count1 + " " + count2 + } else ("hello " + message + " " + count1 + " " + count2) + } +} + +class MyStatelessTypedActorImpl extends TypedActor with MyTypedActor { + + override def oneWay() { + println("got oneWay message") + } + + override def requestReply(message: String) : String = { + if (message == "hello") "world" else ("hello " + message) + } +} diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index 2ae01a5670..a4c7ddada1 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -390,11 +390,22 @@ object TypedActor extends Logging { if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get if (config._threadBasedDispatcher.isDefined) 
actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef) if (config._host.isDefined) actorRef.makeRemote(config._host.get) + actorRef.timeout = config.timeout AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, config.timeout)) actorRef.start proxy.asInstanceOf[T] } + private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = { + if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor") + val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor] + val proxy = Proxy.newInstance(Array(intfClass), Array(typedActor), true, false) + typedActor.initialize(proxy) + AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.remoteAddress, actorRef.timeout)) + actorRef.start + proxy.asInstanceOf[T] + } + private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_], remoteAddress: Option[InetSocketAddress], timeout: Long): T = { val actorRef = actorOf(newTypedActor(targetClass))
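    // Usage sketch (not part of this commit): the remote counterpart introduced above,
    // RemoteTypedActorSerialization, serializes only a handle (underlying ActorRef plus
    // interface name), so deserializing on another node yields an AW proxy backed by a
    // RemoteActorRef. `proxy` is a typed actor proxy as in the TypedActorSerialization sketch,
    // and `server`/`someActorRef` are assumed for illustration:
    //
    //   val refBytes = RemoteTypedActorSerialization.toBinary(proxy)
    //   val remote: MyTypedActor =
    //     RemoteTypedActorSerialization.fromBinaryToRemoteTypedActorRef(refBytes)
    //
    //   // registration by uuid uses the "uuid:" prefix convention added to RemoteServer:
    //   server.register("uuid:" + someActorRef.uuid, someActorRef)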