diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 2d97a5cca6..158cc2cc25 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -28,6 +28,17 @@ import java.lang.reflect.Field import scala.reflect.BeanProperty + +object ActorRefStatus { + /** LifeCycles for ActorRefs + */ + private[akka] sealed trait StatusType + object UNSTARTED extends StatusType + object RUNNING extends StatusType + object BEING_RESTARTED extends StatusType + object SHUTDOWN extends StatusType +} + /** * ActorRef is an immutable and serializable handle to an Actor. *

@@ -68,9 +79,7 @@ trait ActorRef extends // Only mutable for RemoteServer in order to maintain identity across nodes @volatile protected[akka] var _uuid = UUID.newUuid.toString - @volatile protected[this] var _isRunning = false - @volatile protected[this] var _isShutDown = false - @volatile protected[akka] var _isBeingRestarted = false + @volatile protected[this] var _status: ActorRefStatus.StatusType = ActorRefStatus.UNSTARTED @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServerModule.HOSTNAME, RemoteServerModule.PORT) @volatile protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false @@ -229,17 +238,25 @@ trait ActorRef extends /** * Is the actor being restarted? */ - def isBeingRestarted: Boolean = _isBeingRestarted + def isBeingRestarted: Boolean = _status == ActorRefStatus.BEING_RESTARTED /** * Is the actor running? */ - def isRunning: Boolean = _isRunning + def isRunning: Boolean = _status match { + case ActorRefStatus.BEING_RESTARTED | ActorRefStatus.RUNNING => true + case _ => false + } /** * Is the actor shut down? */ - def isShutdown: Boolean = _isShutDown + def isShutdown: Boolean = _status == ActorRefStatus.SHUTDOWN + + /** + * Is the actor ever started? + */ + def isUnstarted: Boolean = _status == ActorRefStatus.UNSTARTED /** * Is the actor able to handle the message passed in as arguments? @@ -800,7 +817,7 @@ class LocalActorRef private[akka]( if (isTransactor) { _transactionFactory = Some(TransactionFactory(_transactionConfig, id)) } - _isRunning = true + _status = ActorRefStatus.RUNNING if (!isInInitialization) initializeActorInstance else runActorInitialization = true } @@ -815,8 +832,7 @@ class LocalActorRef private[akka]( cancelReceiveTimeout dispatcher.unregister(this) _transactionFactory = None - _isRunning = false - _isShutDown = true + _status = ActorRefStatus.SHUTDOWN actor.postStop ActorRegistry.unregister(this) if (isRemotingEnabled) { @@ -1000,7 +1016,7 @@ class LocalActorRef private[akka]( } /** - * Callback for the dispatcher. This is the ingle entry point to the user Actor implementation. + * Callback for the dispatcher. This is the single entry point to the user Actor implementation. */ protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard { if (isShutdown) @@ -1067,7 +1083,7 @@ class LocalActorRef private[akka]( stop } else { - _isBeingRestarted = true + _status = ActorRefStatus.BEING_RESTARTED val failedActor = actorInstance.get guard.withGuard { lifeCycle match { @@ -1077,10 +1093,12 @@ class LocalActorRef private[akka]( Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) Actor.log.debug("Restarting linked actors for actor [%s].", id) restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) + Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) if (isProxyableDispatcher(failedActor)) restartProxyableDispatcher(failedActor, reason) - else restartActor(failedActor, reason) - _isBeingRestarted = false + else restartActor(failedActor, reason) + + _status = ActorRefStatus.RUNNING } } } @@ -1236,7 +1254,7 @@ class LocalActorRef private[akka]( private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = { Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) - _isBeingRestarted = true + _status = ActorRefStatus.BEING_RESTARTED // abort transaction set if (isTransactionSetInScope) { val txSet = getTransactionSetInScope @@ -1376,13 +1394,12 @@ private[akka] case class RemoteActorRef private[akka] ( } def start: ActorRef = { - _isRunning = true + _status = ActorRefStatus.RUNNING this } def stop: Unit = { - _isRunning = false - _isShutDown = true + _status = ActorRefStatus.SHUTDOWN postMessageToMailbox(RemoteActorSystemMessage.Stop, None) } diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 35c46852ec..de4512c094 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -187,4 +187,4 @@ class ExecutorBasedEventDrivenDispatcher( config(this) buildThreadPool } -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 71fe06eb38..f3f8494219 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -16,7 +16,13 @@ import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, Lin * * @author Jonas Bonér */ -class ThreadBasedDispatcher(private val actor: ActorRef, _mailboxType: MailboxType) extends MessageDispatcher { +class ThreadBasedDispatcher(private val actor: ActorRef, _mailboxType: MailboxType) + extends ExecutorBasedEventDrivenDispatcher( + actor.getClass.getName + ":" + actor.uuid, + Dispatchers.THROUGHPUT, + -1, + _mailboxType, + ThreadBasedDispatcher.oneThread) { def this(actor: ActorRef) = this(actor, BoundedMailbox(true)) // For Java API @@ -24,54 +30,19 @@ class ThreadBasedDispatcher(private val actor: ActorRef, _mailboxType: MailboxTy def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = this(actor, BoundedMailbox(true, capacity, pushTimeOut)) - val mailboxType = Some(_mailboxType) - - private val name = actor.getClass.getName + ":" + actor.uuid - private val threadName = "akka:thread-based:dispatcher:" + name - private var selectorThread: Thread = _ - @volatile private var active: Boolean = false - - def createTransientMailbox(actorRef: ActorRef, mailboxType: TransientMailboxType): AnyRef = mailboxType match { - case UnboundedMailbox(blocking) => - new DefaultUnboundedMessageQueue(blocking) - case BoundedMailbox(blocking, capacity, pushTimeOut) => - new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking) - } - override def register(actorRef: ActorRef) = { if (actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) super.register(actorRef) } - def mailbox = actor.mailbox.asInstanceOf[Queue[MessageInvocation] with MessageQueue] + override def toString = "ThreadBasedDispatcher[" + name + "]" +} - def mailboxSize(a: ActorRef) = mailbox.size - - def dispatch(invocation: MessageInvocation) = mailbox enqueue invocation - - def start = if (!active) { - log.debug("Starting up %s", toString) - active = true - selectorThread = new Thread(threadName) { - override def run = { - while (active) { - try { - actor.invoke(mailbox.dequeue) - } catch { case e: InterruptedException => active = false } - } - } - } - selectorThread.start +object ThreadBasedDispatcher { + def oneThread(b: ThreadPoolBuilder) { + b setCorePoolSize 1 + b setMaxPoolSize 1 + b setAllowCoreThreadTimeout true } +} - def isShutdown = !active - - def shutdown = if (active) { - log.debug("Shutting down %s", toString) - active = false - selectorThread.interrupt - uuids.clear - } - - override def toString = "ThreadBasedDispatcher[" + threadName + "]" -} \ No newline at end of file diff --git a/akka-actor/src/main/scala/util/Logging.scala b/akka-actor/src/main/scala/util/Logging.scala index b6ddaaa16a..8d2e64be58 100644 --- a/akka-actor/src/main/scala/util/Logging.scala +++ b/akka-actor/src/main/scala/util/Logging.scala @@ -111,7 +111,7 @@ class Logger(val logger: SLFLogger) { warning(message(fmt,arg,argN:_*)) } - def warn(fmt: => String, arg: Any, argN: Any*) = warning(fmt, arg, argN) + def warn(fmt: => String, arg: Any, argN: Any*) = warning(fmt, arg, argN:_*) def warning(msg: => String) { if (warning_?) logger warn msg diff --git a/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala b/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala index 2997715452..d1f663e9f4 100644 --- a/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala +++ b/akka-actor/src/test/scala/dataflow/DataFlowSpec.scala @@ -69,5 +69,97 @@ class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll { result.get should equal (sum(0,ints(0,1000))) List(x,y,z).foreach(_.shutdown) } + + /*it("should be able to join streams") { + import DataFlow._ + ActorRegistry.shutdownAll + + def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { + stream <<< n + ints(n + 1, max, stream) + } + + def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { + out <<< s + sum(in() + s, in, out) + } + + val producer = new DataFlowStream[Int] + val consumer = new DataFlowStream[Int] + val latch = new CountDownLatch(1) + val result = new AtomicInteger(0) + + val t1 = thread { ints(0, 1000, producer) } + val t2 = thread { + Thread.sleep(1000) + result.set(producer.map(x => x * x).foldLeft(0)(_ + _)) + latch.countDown + } + + latch.await(3,TimeUnit.SECONDS) should equal (true) + result.get should equal (332833500) } + + it("should be able to sum streams recursively") { + import DataFlow._ + + def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { + stream <<< n + ints(n + 1, max, stream) + } + + def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = { + out <<< s + sum(in() + s, in, out) + } + + val result = new AtomicLong(0) + + val producer = new DataFlowStream[Int] + val consumer = new DataFlowStream[Int] + val latch = new CountDownLatch(1) + + @tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = { + val x = stream() + + if(result.addAndGet(x) == 166666500) + latch.countDown + + recurseSum(stream) + } + + thread { ints(0, 1000, producer) } + thread { sum(0, producer, consumer) } + thread { recurseSum(consumer) } + + latch.await(15,TimeUnit.SECONDS) should equal (true) + }*/ + + /* Test not ready for prime time, causes some sort of deadlock */ + /* it("should be able to conditionally set variables") { + + import DataFlow._ + ActorRegistry.shutdownAll + + val latch = new CountDownLatch(1) + val x, y, z, v = new DataFlowVariable[Int] + + val main = thread { + x << 1 + z << Math.max(x(),y()) + latch.countDown + } + + val setY = thread { + // Thread.sleep(2000) + y << 2 + } + + val setV = thread { + v << y + } + List(x,y,z,v) foreach (_.shutdown) + latch.await(2,TimeUnit.SECONDS) should equal (true) + }*/ + } } \ No newline at end of file diff --git a/akka-camel/src/main/scala/component/ActorComponent.scala b/akka-camel/src/main/scala/component/ActorComponent.scala index 6c1c5902fa..89cc0d4d3e 100644 --- a/akka-camel/src/main/scala/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/component/ActorComponent.scala @@ -18,10 +18,10 @@ import se.scalablesolutions.akka.camel.{Failure, CamelMessageConversion, Message import CamelMessageConversion.toExchangeAdapter import se.scalablesolutions.akka.dispatch.{CompletableFuture, MessageInvocation, MessageDispatcher} import se.scalablesolutions.akka.stm.TransactionConfig -import se.scalablesolutions.akka.actor.{ScalaActorRef, ActorRegistry, Actor, ActorRef} import se.scalablesolutions.akka.AkkaException import scala.reflect.BeanProperty +import se.scalablesolutions.akka.actor._ /** * Camel component for sending messages to and receiving replies from (untyped) actors. @@ -199,13 +199,12 @@ private[akka] object AsyncCallbackAdapter { private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCallback) extends ActorRef with ScalaActorRef { def start = { - _isRunning = true + _status = ActorRefStatus.RUNNING this } def stop() = { - _isRunning = false - _isShutDown = true + _status = ActorRefStatus.SHUTDOWN } /** diff --git a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorage.scala b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorage.scala new file mode 100644 index 0000000000..1c3abdff4e --- /dev/null +++ b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorage.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.hbase + +import se.scalablesolutions.akka.util.UUID +import se.scalablesolutions.akka.stm._ +import se.scalablesolutions.akka.persistence.common._ + +object HbaseStorage extends Storage { + type ElementType = Array[Byte] + + def newMap: PersistentMap[ElementType, ElementType] = newMap(UUID.newUuid.toString) + def newVector: PersistentVector[ElementType] = newVector(UUID.newUuid.toString) + def newRef: PersistentRef[ElementType] = newRef(UUID.newUuid.toString) + + def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) + def getVector(id: String): PersistentVector[ElementType] = newVector(id) + def getRef(id: String): PersistentRef[ElementType] = newRef(id) + + def newMap(id: String): PersistentMap[ElementType, ElementType] = new HbasePersistentMap(id) + def newVector(id: String): PersistentVector[ElementType] = new HbasePersistentVector(id) + def newRef(id: String): PersistentRef[ElementType] = new HbasePersistentRef(id) +} + +/** + * Implements a persistent transactional map based on Hbase. + * + * @author David Greco + */ +class HbasePersistentMap(id: String) extends PersistentMapBinary { + val uuid = id + val storage = HbaseStorageBackend +} + +/** + * Implements a persistent transactional vector based on Hbase. + * + * @author David Greco + */ +class HbasePersistentVector(id: String) extends PersistentVector[Array[Byte]] { + val uuid = id + val storage = HbaseStorageBackend +} + +class HbasePersistentRef(id: String) extends PersistentRef[Array[Byte]] { + val uuid = id + val storage = HbaseStorageBackend +} diff --git a/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala new file mode 100644 index 0000000000..4b887add31 --- /dev/null +++ b/akka-persistence/akka-persistence-hbase/src/main/scala/HbaseStorageBackend.scala @@ -0,0 +1,255 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.persistence.hbase + +import scala.collection.mutable.ListBuffer +import se.scalablesolutions.akka.stm._ +import se.scalablesolutions.akka.persistence.common._ +import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.util.Helpers._ +import se.scalablesolutions.akka.config.Config.config +import org.apache.hadoop.hbase.HBaseConfiguration +import org.apache.hadoop.hbase.HColumnDescriptor +import org.apache.hadoop.hbase.HTableDescriptor +import org.apache.hadoop.hbase.client.HBaseAdmin +import org.apache.hadoop.hbase.client.HTable +import org.apache.hadoop.hbase.client.Put +import org.apache.hadoop.hbase.client.Get +import org.apache.hadoop.hbase.client.Delete +import org.apache.hadoop.hbase.util.Bytes + +/** + * @author David Greco + */ +private[akka] object HbaseStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with Logging { + + val EMPTY_BYTE_ARRAY = new Array[Byte](0) + val HBASE_ZOOKEEPER_QUORUM = config.getString("akka.storage.hbase.zookeeper.quorum", "localhost") + val CONFIGURATION = new HBaseConfiguration + val REF_TABLE_NAME = "__REF_TABLE" + val VECTOR_TABLE_NAME = "__VECTOR_TABLE" + val VECTOR_ELEMENT_COLUMN_FAMILY_NAME = "__VECTOR_ELEMENT" + val MAP_ELEMENT_COLUMN_FAMILY_NAME = "__MAP_ELEMENT" + val MAP_TABLE_NAME = "__MAP_TABLE" + var REF_TABLE: HTable = _ + var VECTOR_TABLE: HTable = _ + var MAP_TABLE: HTable = _ + + CONFIGURATION.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM) + + init + + def init { + val ADMIN = new HBaseAdmin(CONFIGURATION) + + if (!ADMIN.tableExists(REF_TABLE_NAME)) { + ADMIN.createTable(new HTableDescriptor(REF_TABLE_NAME)) + ADMIN.disableTable(REF_TABLE_NAME) + ADMIN.addColumn(REF_TABLE_NAME, new HColumnDescriptor("element")) + ADMIN.enableTable(REF_TABLE_NAME) + } + REF_TABLE = new HTable(CONFIGURATION, REF_TABLE_NAME); + + if (!ADMIN.tableExists(VECTOR_TABLE_NAME)) { + ADMIN.createTable(new HTableDescriptor(VECTOR_TABLE_NAME)) + ADMIN.disableTable(VECTOR_TABLE_NAME) + ADMIN.addColumn(VECTOR_TABLE_NAME, new HColumnDescriptor(VECTOR_ELEMENT_COLUMN_FAMILY_NAME)) + ADMIN.enableTable(VECTOR_TABLE_NAME); + } + VECTOR_TABLE = new HTable(CONFIGURATION, VECTOR_TABLE_NAME) + + if (!ADMIN.tableExists(MAP_TABLE_NAME)) { + ADMIN.createTable(new HTableDescriptor(MAP_TABLE_NAME)) + ADMIN.disableTable(MAP_TABLE_NAME) + ADMIN.addColumn(MAP_TABLE_NAME, new HColumnDescriptor(MAP_ELEMENT_COLUMN_FAMILY_NAME)) + ADMIN.enableTable(MAP_TABLE_NAME); + } + MAP_TABLE = new HTable(CONFIGURATION, MAP_TABLE_NAME) + } + + def drop { + val ADMIN = new HBaseAdmin(CONFIGURATION) + + if (ADMIN.tableExists(REF_TABLE_NAME)) { + ADMIN.disableTable(REF_TABLE_NAME) + ADMIN.deleteTable(REF_TABLE_NAME) + } + if (ADMIN.tableExists(VECTOR_TABLE_NAME)) { + ADMIN.disableTable(VECTOR_TABLE_NAME) + ADMIN.deleteTable(VECTOR_TABLE_NAME) + } + if (ADMIN.tableExists(MAP_TABLE_NAME)) { + ADMIN.disableTable(MAP_TABLE_NAME) + ADMIN.deleteTable(MAP_TABLE_NAME) + } + init + } + + // =============================================================== + // For Ref + // =============================================================== + + def insertRefStorageFor(name: String, element: Array[Byte]) = { + val row = new Put(Bytes.toBytes(name)) + row.add(Bytes.toBytes("element"), Bytes.toBytes("element"), element) + REF_TABLE.put(row) + } + + def getRefStorageFor(name: String): Option[Array[Byte]] = { + val row = new Get(Bytes.toBytes(name)) + val result = REF_TABLE.get(row) + + if (result.isEmpty()) + None + else + Some(result.getValue(Bytes.toBytes("element"), Bytes.toBytes("element"))) + } + + // =============================================================== + // For Vector + // =============================================================== + + def insertVectorStorageEntryFor(name: String, element: Array[Byte]) = { + val row = new Put(Bytes.toBytes(name)) + val size = getVectorStorageSizeFor(name) + row.add(Bytes.toBytes(VECTOR_ELEMENT_COLUMN_FAMILY_NAME), Bytes.toBytes(size), element) + VECTOR_TABLE.put(row) + } + + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = elements.reverse.foreach(insertVectorStorageEntryFor(name, _)) + + def updateVectorStorageEntryFor(name: String, index: Int, element: Array[Byte]) = { + val row = new Put(Bytes.toBytes(name)) + row.add(Bytes.toBytes(VECTOR_ELEMENT_COLUMN_FAMILY_NAME), Bytes.toBytes(index), element) + VECTOR_TABLE.put(row) + } + + def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { + val row = new Get(Bytes.toBytes(name)) + val result = VECTOR_TABLE.get(row) + val size = result.size + val colnum = size - index - 1 + + result.getValue(Bytes.toBytes(VECTOR_ELEMENT_COLUMN_FAMILY_NAME),Bytes.toBytes(colnum)) + } + + /** + * if start and finish both are defined, ignore count and + * report the range [start, finish) + * if start is not defined, assume start = 0 + * if start == 0 and finish == 0, return an empty collection + */ + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { + + import scala.math._ + + val row = new Get(Bytes.toBytes(name)) + val result = VECTOR_TABLE.get(row) + val size = result.size + var listBuffer = new ListBuffer[Array[Byte]] + var b = 0 + var e = 0 + + if(start.isDefined && finish.isDefined) { + b = start.get + e = finish.get - 1 + } else { + b = start.getOrElse(0) + e = finish.getOrElse(min(b + count - 1, size - 1)) + } + for(i <- b to e) { + val colnum = size - i - 1 + listBuffer += result.getValue(Bytes.toBytes(VECTOR_ELEMENT_COLUMN_FAMILY_NAME),Bytes.toBytes(colnum)) + } + listBuffer.toList + } + + def getVectorStorageSizeFor(name: String): Int = { + val row = new Get(Bytes.toBytes(name)) + val result = VECTOR_TABLE.get(row) + + if (result.isEmpty) + 0 + else + result.size + } + + // =============================================================== + // For Map + // =============================================================== + + def insertMapStorageEntryFor(name: String, key: Array[Byte], element: Array[Byte]) = { + val row = new Put(Bytes.toBytes(name)) + row.add(Bytes.toBytes(MAP_ELEMENT_COLUMN_FAMILY_NAME), key, element) + MAP_TABLE.put(row) + } + + def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) = entries.foreach((x:Tuple2[Array[Byte], Array[Byte]]) => insertMapStorageEntryFor(name, x._1, x._2)) + + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = { + val row = new Get(Bytes.toBytes(name)) + val result = MAP_TABLE.get(row) + + Option(result.getValue(Bytes.toBytes(MAP_ELEMENT_COLUMN_FAMILY_NAME), key)) + } + + def getMapStorageFor(name: String): List[Tuple2[Array[Byte], Array[Byte]]] = { + val row = new Get(Bytes.toBytes(name)) + val result = MAP_TABLE.get(row) + val raw = result.getFamilyMap(Bytes.toBytes(MAP_ELEMENT_COLUMN_FAMILY_NAME)).entrySet.toArray + val listBuffer = new ListBuffer[Tuple2[Array[Byte], Array[Byte]]] + + for(i <- Range(raw.size-1, -1, -1)) { + listBuffer += Tuple2(raw.apply(i).asInstanceOf[java.util.Map.Entry[Array[Byte], Array[Byte]]].getKey, raw.apply(i).asInstanceOf[java.util.Map.Entry[Array[Byte],Array[Byte]]].getValue) + } + listBuffer.toList + } + + def getMapStorageSizeFor(name: String): Int = { + val row = new Get(Bytes.toBytes(name)) + val result = MAP_TABLE.get(row) + + if (result.isEmpty) + 0 + else + result.size + } + + def removeMapStorageFor(name: String): Unit = { + val row = new Delete(Bytes.toBytes(name)) + MAP_TABLE.delete(row) + } + + def removeMapStorageFor(name: String, key: Array[Byte]): Unit = { + val row = new Delete(Bytes.toBytes(name)) + row.deleteColumns(Bytes.toBytes(MAP_ELEMENT_COLUMN_FAMILY_NAME), key) + MAP_TABLE.delete(row) + } + + def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], count: Int): List[Tuple2[Array[Byte], Array[Byte]]] = { + val row = new Get(Bytes.toBytes(name)) + val result = MAP_TABLE.get(row) + val map = result.getFamilyMap(Bytes.toBytes(MAP_ELEMENT_COLUMN_FAMILY_NAME)) + + val startBytes = if (start.isDefined) start.get else map.firstEntry.getKey + val finishBytes = if (finish.isDefined) finish.get else map.lastEntry.getKey + val submap = map.subMap(startBytes, true, finishBytes, true) + + val iterator = submap.entrySet.iterator + val listBuffer = new ListBuffer[Tuple2[Array[Byte], Array[Byte]]] + val size = submap.size + + val cnt = if(count > size) size else count + var i: Int = 0 + while(iterator.hasNext && i < cnt) { + iterator.next match { + case entry: java.util.Map.Entry[Array[Byte], Array[Byte]] => listBuffer += ((entry.getKey,entry.getValue)) + case _ => + } + i = i+1 + } + listBuffer.toList + } +} diff --git a/akka-persistence/akka-persistence-hbase/src/test/resources/log4j.properties b/akka-persistence/akka-persistence-hbase/src/test/resources/log4j.properties new file mode 100644 index 0000000000..3c8738fdc3 --- /dev/null +++ b/akka-persistence/akka-persistence-hbase/src/test/resources/log4j.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +log4j.rootLogger=DEBUG,R + +# rolling log file ("system.log +log4j.appender.R=org.apache.log4j.DailyRollingFileAppender +log4j.appender.R.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.R.layout=org.apache.log4j.PatternLayout +log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n +log4j.appender.R.File=target/logs/system.log diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpec.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpec.scala new file mode 100644 index 0000000000..468cd800ce --- /dev/null +++ b/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpec.scala @@ -0,0 +1,174 @@ +package se.scalablesolutions.akka.persistence.hbase + +import se.scalablesolutions.akka.actor.{ Actor, ActorRef, Transactor } +import Actor._ + +import org.junit.Test +import org.junit.Assert._ +import org.junit.BeforeClass +import org.junit.Before +import org.junit.AfterClass +import org.junit.After + +import org.scalatest.junit.JUnitSuite +import org.scalatest.BeforeAndAfterAll +import org.apache.hadoop.hbase.HBaseTestingUtility + +case class GetMapState(key: String) +case object GetVectorState +case object GetVectorSize +case object GetRefState + +case class SetMapState(key: String, value: String) +case class SetVectorState(key: String) +case class SetRefState(key: String) +case class Success(key: String, value: String) +case class Failure(key: String, value: String, failer: ActorRef) + +case class SetMapStateOneWay(key: String, value: String) +case class SetVectorStateOneWay(key: String) +case class SetRefStateOneWay(key: String) +case class SuccessOneWay(key: String, value: String) +case class FailureOneWay(key: String, value: String, failer: ActorRef) + +class HbasePersistentActor extends Transactor { + self.timeout = 100000 + + private lazy val mapState = HbaseStorage.newMap + private lazy val vectorState = HbaseStorage.newVector + private lazy val refState = HbaseStorage.newRef + + def receive = { + case GetMapState(key) => + self.reply(mapState.get(key.getBytes("UTF-8")).get) + case GetVectorSize => + self.reply(vectorState.length.asInstanceOf[AnyRef]) + case GetRefState => + self.reply(refState.get.get) + case SetMapState(key, msg) => + mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) + self.reply(msg) + case SetVectorState(msg) => + vectorState.add(msg.getBytes("UTF-8")) + self.reply(msg) + case SetRefState(msg) => + refState.swap(msg.getBytes("UTF-8")) + self.reply(msg) + case Success(key, msg) => + mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) + vectorState.add(msg.getBytes("UTF-8")) + refState.swap(msg.getBytes("UTF-8")) + self.reply(msg) + case Failure(key, msg, failer) => + mapState.put(key.getBytes("UTF-8"), msg.getBytes("UTF-8")) + vectorState.add(msg.getBytes("UTF-8")) + refState.swap(msg.getBytes("UTF-8")) + failer !! "Failure" + self.reply(msg) + } +} + +@serializable +class PersistentFailerActor extends Transactor { + def receive = { + case "Failure" => + throw new RuntimeException("Expected exception; to test fault-tolerance") + } +} + +class HbasePersistentActorSpec extends JUnitSuite with BeforeAndAfterAll { + + val testUtil = new HBaseTestingUtility + + override def beforeAll { + testUtil.startMiniCluster + } + + override def afterAll { + testUtil.shutdownMiniCluster + } + + @Before + def beforeEach { + HbaseStorageBackend.drop + } + + @After + def afterEach { + HbaseStorageBackend.drop + } + + @Test + def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf[HbasePersistentActor] + stateful.start + stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state + stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + val result = (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).as[Array[Byte]].get + assertEquals("new state", new String(result, 0, result.length, "UTF-8")) + } + + @Test + def testMapShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf[HbasePersistentActor] + stateful.start + stateful !! SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state + val failer = actorOf[PersistentFailerActor] + failer.start + try { + stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + fail("should have thrown an exception") + } catch { case e: RuntimeException => {} } + val result = (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).as[Array[Byte]].get + assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state + } + + @Test + def testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf[HbasePersistentActor] + stateful.start + stateful !! SetVectorState("init") // set init state + stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + assertEquals(2, (stateful !! GetVectorSize).get.asInstanceOf[java.lang.Integer].intValue) + } + + @Test + def testVectorShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf[HbasePersistentActor] + stateful.start + stateful !! SetVectorState("init") // set init state + val failer = actorOf[PersistentFailerActor] + failer.start + try { + stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + fail("should have thrown an exception") + } catch { case e: RuntimeException => {} } + assertEquals(1, (stateful !! GetVectorSize).get.asInstanceOf[java.lang.Integer].intValue) + } + + @Test + def testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { + val stateful = actorOf[HbasePersistentActor] + stateful.start + stateful !! SetRefState("init") // set init state + stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired + val result = (stateful !! GetRefState).as[Array[Byte]].get + assertEquals("new state", new String(result, 0, result.length, "UTF-8")) + } + + @Test + def testRefShouldRollbackStateForStatefulServerInCaseOfFailure = { + val stateful = actorOf[HbasePersistentActor] + stateful.start + stateful !! SetRefState("init") // set init state + val failer = actorOf[PersistentFailerActor] + failer.start + try { + stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method + fail("should have thrown an exception") + } catch { case e: RuntimeException => {} } + val result = (stateful !! GetRefState).as[Array[Byte]].get + assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state + } + +} diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpec.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpec.scala new file mode 100644 index 0000000000..1bad777675 --- /dev/null +++ b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseStorageSpec.scala @@ -0,0 +1,177 @@ +package se.scalablesolutions.akka.persistence.hbase + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.BeforeAndAfterEach + +class HbaseStorageSpec extends +Spec with +ShouldMatchers with +BeforeAndAfterAll with +BeforeAndAfterEach { + + import org.apache.hadoop.hbase.HBaseTestingUtility + + val testUtil = new HBaseTestingUtility + + override def beforeAll { + testUtil.startMiniCluster + } + + override def afterAll { + testUtil.shutdownMiniCluster + } + + override def beforeEach { + HbaseStorageBackend.drop + } + + override def afterEach { + HbaseStorageBackend.drop + } + + describe("persistent maps") { + it("should insert with single key and value") { + import HbaseStorageBackend._ + + insertMapStorageEntryFor("t1", "odersky".getBytes, "scala".getBytes) + insertMapStorageEntryFor("t1", "gosling".getBytes, "java".getBytes) + insertMapStorageEntryFor("t1", "stroustrup".getBytes, "c++".getBytes) + getMapStorageSizeFor("t1") should equal(3) + new String(getMapStorageEntryFor("t1", "odersky".getBytes).get) should equal("scala") + new String(getMapStorageEntryFor("t1", "gosling".getBytes).get) should equal("java") + new String(getMapStorageEntryFor("t1", "stroustrup".getBytes).get) should equal("c++") + getMapStorageEntryFor("t1", "torvalds".getBytes) should equal(None) + } + + it("should insert with multiple keys and values") { + import HbaseStorageBackend._ + + val l = List(("stroustrup", "c++"), ("odersky", "scala"), ("gosling", "java")) + insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) }) + getMapStorageSizeFor("t1") should equal(3) + new String(getMapStorageEntryFor("t1", "stroustrup".getBytes).get) should equal("c++") + new String(getMapStorageEntryFor("t1", "gosling".getBytes).get) should equal("java") + new String(getMapStorageEntryFor("t1", "odersky".getBytes).get) should equal("scala") + getMapStorageEntryFor("t1", "torvalds".getBytes) should equal(None) + + getMapStorageEntryFor("t2", "torvalds".getBytes) should equal(None) + + getMapStorageFor("t1").map { case (k, v) => (new String(k), new String(v)) } should equal (l) + + removeMapStorageFor("t1", "gosling".getBytes) + getMapStorageSizeFor("t1") should equal(2) + + removeMapStorageFor("t1") + getMapStorageSizeFor("t1") should equal(0) + } + + it("should do proper range queries") { + import HbaseStorageBackend._ + val l = List( + ("bjarne stroustrup", "c++"), + ("martin odersky", "scala"), + ("james gosling", "java"), + ("yukihiro matsumoto", "ruby"), + ("slava pestov", "factor"), + ("rich hickey", "clojure"), + ("ola bini", "ioke"), + ("dennis ritchie", "c"), + ("larry wall", "perl"), + ("guido van rossum", "python"), + ("james strachan", "groovy")) + val rl = List( + ("james gosling", "java"), + ("james strachan", "groovy"), + ("larry wall", "perl"), + ("martin odersky", "scala"), + ("ola bini", "ioke"), ("rich hickey", "clojure"), + ("slava pestov", "factor")) + insertMapStorageEntriesFor("t1", l.map { case (k, v) => (k.getBytes, v.getBytes) }) + getMapStorageSizeFor("t1") should equal(l.size) + getMapStorageRangeFor("t1", None, None, 100).map { case (k, v) => (new String(k), new String(v)) } should equal(l.sortWith(_._1 < _._1)) + getMapStorageRangeFor("t1", Option("james gosling".getBytes), Option("slava pestov".getBytes), 100).map { case (k, v) => (new String(k), new String(v)) } should equal(rl.sortWith(_._1 < _._1)) + getMapStorageRangeFor("t1", None, None, 5).map { case (k, v) => (new String(k), new String(v)) }.size should equal(5) + } + + } + + describe("persistent vectors") { + it("should insert a single value") { + import HbaseStorageBackend._ + + insertVectorStorageEntryFor("t1", "martin odersky".getBytes) + insertVectorStorageEntryFor("t1", "james gosling".getBytes) + new String(getVectorStorageEntryFor("t1", 0)) should equal("james gosling") + new String(getVectorStorageEntryFor("t1", 1)) should equal("martin odersky") + } + + it("should insert multiple values") { + import HbaseStorageBackend._ + + insertVectorStorageEntryFor("t1", "martin odersky".getBytes) + insertVectorStorageEntryFor("t1", "james gosling".getBytes) + insertVectorStorageEntriesFor("t1", List("ola bini".getBytes, "james strachan".getBytes, "dennis ritchie".getBytes)) + new String(getVectorStorageEntryFor("t1", 0)) should equal("ola bini") + new String(getVectorStorageEntryFor("t1", 1)) should equal("james strachan") + new String(getVectorStorageEntryFor("t1", 2)) should equal("dennis ritchie") + new String(getVectorStorageEntryFor("t1", 3)) should equal("james gosling") + new String(getVectorStorageEntryFor("t1", 4)) should equal("martin odersky") + } + + it("should fetch a range of values") { + import HbaseStorageBackend._ + + insertVectorStorageEntryFor("t1", "martin odersky".getBytes) + insertVectorStorageEntryFor("t1", "james gosling".getBytes) + getVectorStorageSizeFor("t1") should equal(2) + insertVectorStorageEntriesFor("t1", List("ola bini".getBytes, "james strachan".getBytes, "dennis ritchie".getBytes)) + getVectorStorageRangeFor("t1", None, None, 100).map(new String(_)) should equal(List("ola bini", "james strachan", "dennis ritchie", "james gosling", "martin odersky")) + getVectorStorageRangeFor("t1", Some(0), Some(5), 100).map(new String(_)) should equal(List("ola bini", "james strachan", "dennis ritchie", "james gosling", "martin odersky")) + getVectorStorageRangeFor("t1", Some(2), Some(5), 100).map(new String(_)) should equal(List("dennis ritchie", "james gosling", "martin odersky")) + getVectorStorageRangeFor("t1", Some(0), Some(0), 100).size should equal(0) + getVectorStorageSizeFor("t1") should equal(5) + } + + it("should insert and query complex structures") { + import HbaseStorageBackend._ + import sjson.json.DefaultProtocol._ + import sjson.json.JsonSerialization._ + + // a list[AnyRef] should be added successfully + val l = List("ola bini".getBytes, tobinary(List(100, 200, 300)), tobinary(List(1, 2, 3))) + + // for id = t1 + insertVectorStorageEntriesFor("t1", l) + new String(getVectorStorageEntryFor("t1", 0)) should equal("ola bini") + frombinary[List[Int]](getVectorStorageEntryFor("t1", 1)) should equal(List(100, 200, 300)) + frombinary[List[Int]](getVectorStorageEntryFor("t1", 2)) should equal(List(1, 2, 3)) + + getVectorStorageSizeFor("t1") should equal(3) + + // some more for id = t1 + val m = List(tobinary(Map(1 -> "dg", 2 -> "mc", 3 -> "nd")), tobinary(List("martin odersky", "james gosling"))) + insertVectorStorageEntriesFor("t1", m) + + // size should add up + getVectorStorageSizeFor("t1") should equal(5) + + // now for a diff id + insertVectorStorageEntriesFor("t2", l) + getVectorStorageSizeFor("t2") should equal(3) + } + } + + describe("persistent refs") { + it("should insert a ref") { + import HbaseStorageBackend._ + + insertRefStorageFor("t1", "martin odersky".getBytes) + new String(getRefStorageFor("t1").get) should equal("martin odersky") + insertRefStorageFor("t1", "james gosling".getBytes) + new String(getRefStorageFor("t1").get) should equal("james gosling") + getRefStorageFor("t2") should equal(None) + } + } +} diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343Spec.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343Spec.scala new file mode 100644 index 0000000000..d61b82fa87 --- /dev/null +++ b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343Spec.scala @@ -0,0 +1,347 @@ +package se.scalablesolutions.akka.persistence.hbase + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.actor.{Actor, ActorRef} +import se.scalablesolutions.akka.config.OneForOneStrategy +import Actor._ +import se.scalablesolutions.akka.stm.global._ +import se.scalablesolutions.akka.config.ScalaConfig._ +import se.scalablesolutions.akka.util.Logging + +import HbaseStorageBackend._ + +case class GET(k: String) +case class SET(k: String, v: String) +case class REM(k: String) +case class CONTAINS(k: String) +case object MAP_SIZE +case class MSET(kvs: List[(String, String)]) +case class REMOVE_AFTER_PUT(kvsToAdd: List[(String, String)], ksToRem: List[String]) +case class CLEAR_AFTER_PUT(kvsToAdd: List[(String, String)]) +case class PUT_WITH_SLICE(kvsToAdd: List[(String, String)], start: String, cnt: Int) +case class PUT_REM_WITH_SLICE(kvsToAdd: List[(String, String)], ksToRem: List[String], start: String, cnt: Int) + +case class VADD(v: String) +case class VUPD(i: Int, v: String) +case class VUPD_AND_ABORT(i: Int, v: String) +case class VGET(i: Int) +case object VSIZE +case class VGET_AFTER_VADD(vsToAdd: List[String], isToFetch: List[Int]) +case class VADD_WITH_SLICE(vsToAdd: List[String], start: Int, cnt: Int) + +object Storage { + class HbaseSampleMapStorage extends Actor { + self.lifeCycle = Some(LifeCycle(Permanent)) + val FOO_MAP = "akka.sample.map" + + private var fooMap = atomic { HbaseStorage.getMap(FOO_MAP) } + + def receive = { + case SET(k, v) => + atomic { + fooMap += (k.getBytes, v.getBytes) + } + self.reply((k, v)) + + case GET(k) => + val v = atomic { + fooMap.get(k.getBytes).map(new String(_)).getOrElse(k + " Not found") + } + self.reply(v) + + case REM(k) => + val v = atomic { + fooMap -= k.getBytes + } + self.reply(k) + + case CONTAINS(k) => + val v = atomic { + fooMap contains k.getBytes + } + self.reply(v) + + case MAP_SIZE => + val v = atomic { + fooMap.size + } + self.reply(v) + + case MSET(kvs) => atomic { + kvs.foreach {kv => fooMap += (kv._1.getBytes, kv._2.getBytes) } + } + self.reply(kvs.size) + + case REMOVE_AFTER_PUT(kvs2add, ks2rem) => atomic { + kvs2add.foreach {kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + + ks2rem.foreach {k => + fooMap -= k.getBytes + }} + self.reply(fooMap.size) + + case CLEAR_AFTER_PUT(kvs2add) => atomic { + kvs2add.foreach {kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + fooMap.clear + } + self.reply(true) + + case PUT_WITH_SLICE(kvs2add, from, cnt) => + val v = atomic { + kvs2add.foreach {kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + fooMap.slice(Some(from.getBytes), cnt) + } + self.reply(v: List[(Array[Byte], Array[Byte])]) + + case PUT_REM_WITH_SLICE(kvs2add, ks2rem, from, cnt) => + val v = atomic { + kvs2add.foreach {kv => + fooMap += (kv._1.getBytes, kv._2.getBytes) + } + ks2rem.foreach {k => + fooMap -= k.getBytes + } + fooMap.slice(Some(from.getBytes), cnt) + } + self.reply(v: List[(Array[Byte], Array[Byte])]) + } + } + + class HbaseSampleVectorStorage extends Actor { + self.lifeCycle = Some(LifeCycle(Permanent)) + val FOO_VECTOR = "akka.sample.vector" + + private var fooVector = atomic { HbaseStorage.getVector(FOO_VECTOR) } + + def receive = { + case VADD(v) => + val size = + atomic { + fooVector + v.getBytes + fooVector length + } + self.reply(size) + + case VGET(index) => + val ind = + atomic { + fooVector get index + } + self.reply(ind) + + case VGET_AFTER_VADD(vs, is) => + val els = + atomic { + vs.foreach(fooVector + _.getBytes) + (is.foldRight(List[Array[Byte]]())(fooVector.get(_) :: _)).map(new String(_)) + } + self.reply(els) + + case VUPD_AND_ABORT(index, value) => + val l = + atomic { + fooVector.update(index, value.getBytes) + // force fail + fooVector get 100 + } + self.reply(index) + + case VADD_WITH_SLICE(vs, s, c) => + val l = + atomic { + vs.foreach(fooVector + _.getBytes) + fooVector.slice(Some(s), None, c) + } + self.reply(l.map(new String(_))) + } + } +} + +import Storage._ + +@RunWith(classOf[JUnitRunner]) +class HbaseTicket343Spec extends Spec with ShouldMatchers with BeforeAndAfterAll with BeforeAndAfterEach { + + import org.apache.hadoop.hbase.HBaseTestingUtility + + val testUtil = new HBaseTestingUtility + + override def beforeAll { + testUtil.startMiniCluster + } + + override def afterAll { + testUtil.shutdownMiniCluster + } + + override def beforeEach { + HbaseStorageBackend.drop + } + + override def afterEach { + HbaseStorageBackend.drop + } + + describe("Ticket 343 Issue #1") { + it("remove after put should work within the same transaction") { + val proc = actorOf[HbaseSampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3) + + (proc !! GET("dg")).getOrElse("Get failed") should equal("1") + (proc !! GET("mc")).getOrElse("Get failed") should equal("2") + (proc !! GET("nd")).getOrElse("Get failed") should equal("3") + + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4) + + val add = List(("a", "1"), ("b", "2"), ("c", "3")) + val rem = List("a", "debasish") + (proc !! REMOVE_AFTER_PUT(add, rem)).getOrElse("REMOVE_AFTER_PUT failed") should equal(5) + + (proc !! GET("debasish")).getOrElse("debasish not found") should equal("debasish Not found") + (proc !! GET("a")).getOrElse("a not found") should equal("a Not found") + + (proc !! GET("b")).getOrElse("b not found") should equal("2") + + (proc !! CONTAINS("b")).getOrElse("b not found") should equal(true) + (proc !! CONTAINS("debasish")).getOrElse("debasish not found") should equal(false) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(5) + proc.stop + } + } + + describe("Ticket 343 Issue #2") { + it("clear after put should work within the same transaction") { + val proc = actorOf[HbaseSampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + val add = List(("a", "1"), ("b", "2"), ("c", "3")) + (proc !! CLEAR_AFTER_PUT(add)).getOrElse("CLEAR_AFTER_PUT failed") should equal(true) + + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(0) + proc.stop + } + } + + describe("Ticket 343 Issue #3") { + it("map size should change after the transaction") { + val proc = actorOf[HbaseSampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4) + + (proc !! GET("dg")).getOrElse("Get failed") should equal("1") + (proc !! GET("mc")).getOrElse("Get failed") should equal("2") + (proc !! GET("nd")).getOrElse("Get failed") should equal("3") + proc.stop + } + } + + describe("slice test") { + it("should pass") { + val proc = actorOf[HbaseSampleMapStorage] + proc.start + + (proc !! SET("debasish", "anshinsoft")).getOrElse("Set failed") should equal(("debasish", "anshinsoft")) + (proc !! GET("debasish")).getOrElse("Get failed") should equal("anshinsoft") + // (proc !! MAP_SIZE).getOrElse("Size failed") should equal(1) + + (proc !! MSET(List(("dg", "1"), ("mc", "2"), ("nd", "3")))).getOrElse("Mset failed") should equal(3) + (proc !! MAP_SIZE).getOrElse("Size failed") should equal(4) + + (proc !! PUT_WITH_SLICE(List(("ec", "1"), ("tb", "2"), ("mc", "10")), "dg", 3)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("mc", "10"))) + + (proc !! PUT_REM_WITH_SLICE(List(("fc", "1"), ("gb", "2"), ("xy", "10")), List("tb", "fc"), "dg", 5)).get.asInstanceOf[List[(Array[Byte], Array[Byte])]].map { case (k, v) => (new String(k), new String(v)) } should equal(List(("dg", "1"), ("ec", "1"), ("gb", "2"), ("mc", "10"), ("nd", "3"))) + proc.stop + } + } + + describe("Ticket 343 Issue #4") { + it("vector get should not ignore elements that were in vector before transaction") { + + val proc = actorOf[HbaseSampleVectorStorage] + proc.start + + // add 4 elements in separate transactions + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) + (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + + new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan") + new String((proc !! VGET(1)).get.asInstanceOf[Array[Byte]] ) should equal("ramanendu") + new String((proc !! VGET(2)).get.asInstanceOf[Array[Byte]] ) should equal("maulindu") + new String((proc !! VGET(3)).get.asInstanceOf[Array[Byte]] ) should equal("debasish") + + // now add 3 more and do gets in the same transaction + (proc !! VGET_AFTER_VADD(List("a", "b", "c"), List(0, 2, 4))).get.asInstanceOf[List[String]] should equal(List("c", "a", "ramanendu")) + proc.stop + } + } + + describe("Ticket 343 Issue #6") { + it("vector update should not ignore transaction") { + val proc = actorOf[HbaseSampleVectorStorage] + proc.start + + // add 4 elements in separate transactions + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) + (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + + evaluating { + (proc !! VUPD_AND_ABORT(0, "virat")).getOrElse("VUPD_AND_ABORT failed") + } should produce [Exception] + + // update aborts and hence values will remain unchanged + new String((proc !! VGET(0)).get.asInstanceOf[Array[Byte]] ) should equal("nilanjan") + proc.stop + } + } + + describe("Ticket 343 Issue #5") { + it("vector slice() should not ignore elements added in current transaction") { + val proc = actorOf[HbaseSampleVectorStorage] + proc.start + + // add 4 elements in separate transactions + (proc !! VADD("debasish")).getOrElse("VADD failed") should equal(1) + (proc !! VADD("maulindu")).getOrElse("VADD failed") should equal(2) + (proc !! VADD("ramanendu")).getOrElse("VADD failed") should equal(3) + (proc !! VADD("nilanjan")).getOrElse("VADD failed") should equal(4) + + // slice with no new elements added in current transaction + (proc !! VADD_WITH_SLICE(List(), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("maulindu", "debasish")) + + // slice with new elements added in current transaction + (proc !! VADD_WITH_SLICE(List("a", "b", "c", "d"), 2, 2)).getOrElse("VADD_WITH_SLICE failed") should equal(Vector("b", "a")) + proc.stop + } + } +} diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/SimpleHbaseTest.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/SimpleHbaseTest.scala new file mode 100644 index 0000000000..82e8f32533 --- /dev/null +++ b/akka-persistence/akka-persistence-hbase/src/test/scala/SimpleHbaseTest.scala @@ -0,0 +1,62 @@ +package se.scalablesolutions.akka.persistence.hbase + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith +import org.junit.Test + +import org.apache.hadoop.hbase.HBaseTestingUtility + +@RunWith(classOf[JUnitRunner]) +class PersistenceSpec extends Spec with BeforeAndAfterAll with ShouldMatchers { + + import org.apache.hadoop.hbase.HBaseTestingUtility + + val testUtil = new HBaseTestingUtility + + override def beforeAll { + testUtil.startMiniCluster + } + + override def afterAll { + testUtil.shutdownMiniCluster + } + + describe("simple hbase persistence test") { + it("should create a table") { + import org.apache.hadoop.hbase.util.Bytes + import org.apache.hadoop.hbase.HTableDescriptor + import org.apache.hadoop.hbase.HColumnDescriptor + import org.apache.hadoop.hbase.client.HBaseAdmin + import org.apache.hadoop.hbase.client.HTable + + val descriptor = new HTableDescriptor(Bytes.toBytes("ATable")) + descriptor.addFamily(new HColumnDescriptor(Bytes.toBytes("Family1"))) + descriptor.addFamily(new HColumnDescriptor(Bytes.toBytes("Family2"))) + val admin = new HBaseAdmin(testUtil.getConfiguration) + admin.createTable(descriptor) + val table = new HTable(testUtil.getConfiguration, Bytes.toBytes("ATable")) + + table should not equal (null) + } + + it("should use the quorum read from the akka configuration and access the table") { + import se.scalablesolutions.akka.config.Config.config + import org.apache.hadoop.hbase.HBaseConfiguration + import org.apache.hadoop.hbase.client.HBaseAdmin + import org.apache.hadoop.hbase.client.HTable + + val HBASE_ZOOKEEPER_QUORUM = config.getString("akka.storage.hbase.zookeeper.quorum", "0") + HBASE_ZOOKEEPER_QUORUM should not equal ("0") + HBASE_ZOOKEEPER_QUORUM should equal("localhost") + + val configuration = new HBaseConfiguration + configuration.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM) + val admin = new HBaseAdmin(configuration) + admin.tableExists("ATable") should equal(true) + } + } + +} diff --git a/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java b/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java index ab05937f61..31ae9650d4 100644 --- a/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java +++ b/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java @@ -653,6 +653,360 @@ public final class RemoteProtocol { // @@protoc_insertion_point(class_scope:RemoteActorRefProtocol) } + public static final class RemoteTypedActorRefProtocol extends + com.google.protobuf.GeneratedMessage { + // Use RemoteTypedActorRefProtocol.newBuilder() to construct. + private RemoteTypedActorRefProtocol() { + initFields(); + } + private RemoteTypedActorRefProtocol(boolean noInit) {} + + private static final RemoteTypedActorRefProtocol defaultInstance; + public static RemoteTypedActorRefProtocol getDefaultInstance() { + return defaultInstance; + } + + public RemoteTypedActorRefProtocol getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_RemoteTypedActorRefProtocol_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_RemoteTypedActorRefProtocol_fieldAccessorTable; + } + + // required .RemoteActorRefProtocol actorRef = 1; + public static final int ACTORREF_FIELD_NUMBER = 1; + private boolean hasActorRef; + private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol actorRef_; + public boolean hasActorRef() { return hasActorRef; } + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol getActorRef() { return actorRef_; } + + // required string interfaceName = 2; + public static final int INTERFACENAME_FIELD_NUMBER = 2; + private boolean hasInterfaceName; + private java.lang.String interfaceName_ = ""; + public boolean hasInterfaceName() { return hasInterfaceName; } + public java.lang.String getInterfaceName() { return interfaceName_; } + + private void initFields() { + actorRef_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.getDefaultInstance(); + } + public final boolean isInitialized() { + if (!hasActorRef) return false; + if (!hasInterfaceName) return false; + if (!getActorRef().isInitialized()) return false; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (hasActorRef()) { + output.writeMessage(1, getActorRef()); + } + if (hasInterfaceName()) { + output.writeString(2, getInterfaceName()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasActorRef()) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(1, getActorRef()); + } + if (hasInterfaceName()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(2, getInterfaceName()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder { + private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol result; + + // Construct using se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.newBuilder() + private Builder() {} + + private static Builder create() { + Builder builder = new Builder(); + builder.result = new se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol(); + return builder; + } + + protected se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol internalGetResult() { + return result; + } + + public Builder clear() { + if (result == null) { + throw new IllegalStateException( + "Cannot call clear() after build()."); + } + result = new se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol(); + return this; + } + + public Builder clone() { + return create().mergeFrom(result); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.getDescriptor(); + } + + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.getDefaultInstance(); + } + + public boolean isInitialized() { + return result.isInitialized(); + } + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol build() { + if (result != null && !isInitialized()) { + throw newUninitializedMessageException(result); + } + return buildPartial(); + } + + private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + if (!isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return buildPartial(); + } + + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol buildPartial() { + if (result == null) { + throw new IllegalStateException( + "build() has already been called on this Builder."); + } + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol returnMe = result; + result = null; + return returnMe; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol) { + return mergeFrom((se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol other) { + if (other == se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.getDefaultInstance()) return this; + if (other.hasActorRef()) { + mergeActorRef(other.getActorRef()); + } + if (other.hasInterfaceName()) { + setInterfaceName(other.getInterfaceName()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + return this; + } + break; + } + case 10: { + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.newBuilder(); + if (hasActorRef()) { + subBuilder.mergeFrom(getActorRef()); + } + input.readMessage(subBuilder, extensionRegistry); + setActorRef(subBuilder.buildPartial()); + break; + } + case 18: { + setInterfaceName(input.readString()); + break; + } + } + } + } + + + // required .RemoteActorRefProtocol actorRef = 1; + public boolean hasActorRef() { + return result.hasActorRef(); + } + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol getActorRef() { + return result.getActorRef(); + } + public Builder setActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasActorRef = true; + result.actorRef_ = value; + return this; + } + public Builder setActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder builderForValue) { + result.hasActorRef = true; + result.actorRef_ = builderForValue.build(); + return this; + } + public Builder mergeActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol value) { + if (result.hasActorRef() && + result.actorRef_ != se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.getDefaultInstance()) { + result.actorRef_ = + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.newBuilder(result.actorRef_).mergeFrom(value).buildPartial(); + } else { + result.actorRef_ = value; + } + result.hasActorRef = true; + return this; + } + public Builder clearActorRef() { + result.hasActorRef = false; + result.actorRef_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.getDefaultInstance(); + return this; + } + + // required string interfaceName = 2; + public boolean hasInterfaceName() { + return result.hasInterfaceName(); + } + public java.lang.String getInterfaceName() { + return result.getInterfaceName(); + } + public Builder setInterfaceName(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasInterfaceName = true; + result.interfaceName_ = value; + return this; + } + public Builder clearInterfaceName() { + result.hasInterfaceName = false; + result.interfaceName_ = getDefaultInstance().getInterfaceName(); + return this; + } + + // @@protoc_insertion_point(builder_scope:RemoteTypedActorRefProtocol) + } + + static { + defaultInstance = new RemoteTypedActorRefProtocol(true); + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internalForceInit(); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:RemoteTypedActorRefProtocol) + } + public static final class SerializedActorRefProtocol extends com.google.protobuf.GeneratedMessage { // Use SerializedActorRefProtocol.newBuilder() to construct. @@ -1559,6 +1913,360 @@ public final class RemoteProtocol { // @@protoc_insertion_point(class_scope:SerializedActorRefProtocol) } + public static final class SerializedTypedActorRefProtocol extends + com.google.protobuf.GeneratedMessage { + // Use SerializedTypedActorRefProtocol.newBuilder() to construct. + private SerializedTypedActorRefProtocol() { + initFields(); + } + private SerializedTypedActorRefProtocol(boolean noInit) {} + + private static final SerializedTypedActorRefProtocol defaultInstance; + public static SerializedTypedActorRefProtocol getDefaultInstance() { + return defaultInstance; + } + + public SerializedTypedActorRefProtocol getDefaultInstanceForType() { + return defaultInstance; + } + + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_SerializedTypedActorRefProtocol_descriptor; + } + + protected com.google.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internal_static_SerializedTypedActorRefProtocol_fieldAccessorTable; + } + + // required .SerializedActorRefProtocol actorRef = 1; + public static final int ACTORREF_FIELD_NUMBER = 1; + private boolean hasActorRef; + private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol actorRef_; + public boolean hasActorRef() { return hasActorRef; } + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol getActorRef() { return actorRef_; } + + // required string interfaceName = 2; + public static final int INTERFACENAME_FIELD_NUMBER = 2; + private boolean hasInterfaceName; + private java.lang.String interfaceName_ = ""; + public boolean hasInterfaceName() { return hasInterfaceName; } + public java.lang.String getInterfaceName() { return interfaceName_; } + + private void initFields() { + actorRef_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.getDefaultInstance(); + } + public final boolean isInitialized() { + if (!hasActorRef) return false; + if (!hasInterfaceName) return false; + if (!getActorRef().isInitialized()) return false; + return true; + } + + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (hasActorRef()) { + output.writeMessage(1, getActorRef()); + } + if (hasInterfaceName()) { + output.writeString(2, getInterfaceName()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (hasActorRef()) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(1, getActorRef()); + } + if (hasInterfaceName()) { + size += com.google.protobuf.CodedOutputStream + .computeStringSize(2, getInterfaceName()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + com.google.protobuf.GeneratedMessage.Builder { + private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol result; + + // Construct using se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.newBuilder() + private Builder() {} + + private static Builder create() { + Builder builder = new Builder(); + builder.result = new se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol(); + return builder; + } + + protected se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol internalGetResult() { + return result; + } + + public Builder clear() { + if (result == null) { + throw new IllegalStateException( + "Cannot call clear() after build()."); + } + result = new se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol(); + return this; + } + + public Builder clone() { + return create().mergeFrom(result); + } + + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.getDescriptor(); + } + + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol getDefaultInstanceForType() { + return se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.getDefaultInstance(); + } + + public boolean isInitialized() { + return result.isInitialized(); + } + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol build() { + if (result != null && !isInitialized()) { + throw newUninitializedMessageException(result); + } + return buildPartial(); + } + + private se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol buildParsed() + throws com.google.protobuf.InvalidProtocolBufferException { + if (!isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return buildPartial(); + } + + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol buildPartial() { + if (result == null) { + throw new IllegalStateException( + "build() has already been called on this Builder."); + } + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol returnMe = result; + result = null; + return returnMe; + } + + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol) { + return mergeFrom((se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol other) { + if (other == se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.getDefaultInstance()) return this; + if (other.hasActorRef()) { + mergeActorRef(other.getActorRef()); + } + if (other.hasInterfaceName()) { + setInterfaceName(other.getInterfaceName()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder( + this.getUnknownFields()); + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + this.setUnknownFields(unknownFields.build()); + return this; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + this.setUnknownFields(unknownFields.build()); + return this; + } + break; + } + case 10: { + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder subBuilder = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.newBuilder(); + if (hasActorRef()) { + subBuilder.mergeFrom(getActorRef()); + } + input.readMessage(subBuilder, extensionRegistry); + setActorRef(subBuilder.buildPartial()); + break; + } + case 18: { + setInterfaceName(input.readString()); + break; + } + } + } + } + + + // required .SerializedActorRefProtocol actorRef = 1; + public boolean hasActorRef() { + return result.hasActorRef(); + } + public se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol getActorRef() { + return result.getActorRef(); + } + public Builder setActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasActorRef = true; + result.actorRef_ = value; + return this; + } + public Builder setActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder builderForValue) { + result.hasActorRef = true; + result.actorRef_ = builderForValue.build(); + return this; + } + public Builder mergeActorRef(se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol value) { + if (result.hasActorRef() && + result.actorRef_ != se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.getDefaultInstance()) { + result.actorRef_ = + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.newBuilder(result.actorRef_).mergeFrom(value).buildPartial(); + } else { + result.actorRef_ = value; + } + result.hasActorRef = true; + return this; + } + public Builder clearActorRef() { + result.hasActorRef = false; + result.actorRef_ = se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.getDefaultInstance(); + return this; + } + + // required string interfaceName = 2; + public boolean hasInterfaceName() { + return result.hasInterfaceName(); + } + public java.lang.String getInterfaceName() { + return result.getInterfaceName(); + } + public Builder setInterfaceName(java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasInterfaceName = true; + result.interfaceName_ = value; + return this; + } + public Builder clearInterfaceName() { + result.hasInterfaceName = false; + result.interfaceName_ = getDefaultInstance().getInterfaceName(); + return this; + } + + // @@protoc_insertion_point(builder_scope:SerializedTypedActorRefProtocol) + } + + static { + defaultInstance = new SerializedTypedActorRefProtocol(true); + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.internalForceInit(); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:SerializedTypedActorRefProtocol) + } + public static final class MessageProtocol extends com.google.protobuf.GeneratedMessage { // Use MessageProtocol.newBuilder() to construct. @@ -5700,11 +6408,21 @@ public final class RemoteProtocol { private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_RemoteActorRefProtocol_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_RemoteTypedActorRefProtocol_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_RemoteTypedActorRefProtocol_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_SerializedActorRefProtocol_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_SerializedActorRefProtocol_fieldAccessorTable; + private static com.google.protobuf.Descriptors.Descriptor + internal_static_SerializedTypedActorRefProtocol_descriptor; + private static + com.google.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_SerializedTypedActorRefProtocol_fieldAccessorTable; private static com.google.protobuf.Descriptors.Descriptor internal_static_MessageProtocol_descriptor; private static @@ -5767,49 +6485,54 @@ public final class RemoteProtocol { "\n\024RemoteProtocol.proto\"v\n\026RemoteActorRef" + "Protocol\022\014\n\004uuid\030\001 \002(\t\022\026\n\016actorClassname" + "\030\002 \002(\t\022%\n\013homeAddress\030\003 \002(\0132\020.AddressPro" + - "tocol\022\017\n\007timeout\030\004 \001(\004\"\200\003\n\032SerializedAct" + - "orRefProtocol\022\014\n\004uuid\030\001 \002(\t\022\n\n\002id\030\002 \002(\t\022" + - "\026\n\016actorClassname\030\003 \002(\t\022)\n\017originalAddre" + - "ss\030\004 \002(\0132\020.AddressProtocol\022\025\n\ractorInsta" + - "nce\030\005 \001(\014\022\033\n\023serializerClassname\030\006 \001(\t\022\024" + - "\n\014isTransactor\030\007 \001(\010\022\017\n\007timeout\030\010 \001(\004\022\026\n" + - "\016receiveTimeout\030\t \001(\004\022%\n\tlifeCycle\030\n \001(\013", - "2\022.LifeCycleProtocol\022+\n\nsupervisor\030\013 \001(\013" + - "2\027.RemoteActorRefProtocol\022\024\n\014hotswapStac" + - "k\030\014 \001(\014\022(\n\010messages\030\r \003(\0132\026.RemoteReques" + - "tProtocol\"r\n\017MessageProtocol\0225\n\023serializ" + - "ationScheme\030\001 \002(\0162\030.SerializationSchemeT" + - "ype\022\017\n\007message\030\002 \002(\014\022\027\n\017messageManifest\030" + - "\003 \001(\014\"\236\001\n\021ActorInfoProtocol\022\014\n\004uuid\030\001 \002(" + - "\t\022\016\n\006target\030\002 \002(\t\022\017\n\007timeout\030\003 \002(\004\022\035\n\tac" + - "torType\030\004 \002(\0162\n.ActorType\022/\n\016typedActorI" + - "nfo\030\005 \001(\0132\027.TypedActorInfoProtocol\022\n\n\002id", - "\030\006 \001(\t\";\n\026TypedActorInfoProtocol\022\021\n\tinte" + - "rface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\352\001\n\025RemoteRe" + - "questProtocol\022\n\n\002id\030\001 \002(\004\022!\n\007message\030\002 \002" + - "(\0132\020.MessageProtocol\022%\n\tactorInfo\030\003 \002(\0132" + - "\022.ActorInfoProtocol\022\020\n\010isOneWay\030\004 \002(\010\022\026\n" + - "\016supervisorUuid\030\005 \001(\t\022\'\n\006sender\030\006 \001(\0132\027." + - "RemoteActorRefProtocol\022(\n\010metadata\030\007 \003(\013" + - "2\026.MetadataEntryProtocol\"\324\001\n\023RemoteReply" + - "Protocol\022\n\n\002id\030\001 \002(\004\022!\n\007message\030\002 \001(\0132\020." + - "MessageProtocol\022%\n\texception\030\003 \001(\0132\022.Exc", - "eptionProtocol\022\026\n\016supervisorUuid\030\004 \001(\t\022\017" + - "\n\007isActor\030\005 \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\022(\n" + - "\010metadata\030\007 \003(\0132\026.MetadataEntryProtocol\"" + - ")\n\014UuidProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002 \002" + - "(\004\"3\n\025MetadataEntryProtocol\022\013\n\003key\030\001 \002(\t" + - "\022\r\n\005value\030\002 \002(\014\"6\n\021LifeCycleProtocol\022!\n\t" + - "lifeCycle\030\001 \002(\0162\016.LifeCycleType\"1\n\017Addre" + - "ssProtocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002 \002" + - "(\r\"7\n\021ExceptionProtocol\022\021\n\tclassname\030\001 \002" + - "(\t\022\017\n\007message\030\002 \002(\t*=\n\tActorType\022\017\n\013SCAL", - "A_ACTOR\020\001\022\016\n\nJAVA_ACTOR\020\002\022\017\n\013TYPED_ACTOR" + - "\020\003*]\n\027SerializationSchemeType\022\010\n\004JAVA\020\001\022" + - "\013\n\007SBINARY\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSO" + - "N\020\004\022\014\n\010PROTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPE" + - "RMANENT\020\001\022\r\n\tTEMPORARY\020\002B-\n)se.scalables" + - "olutions.akka.remote.protocolH\001" + "tocol\022\017\n\007timeout\030\004 \001(\004\"_\n\033RemoteTypedAct" + + "orRefProtocol\022)\n\010actorRef\030\001 \002(\0132\027.Remote" + + "ActorRefProtocol\022\025\n\rinterfaceName\030\002 \002(\t\"" + + "\200\003\n\032SerializedActorRefProtocol\022\014\n\004uuid\030\001" + + " \002(\t\022\n\n\002id\030\002 \002(\t\022\026\n\016actorClassname\030\003 \002(\t" + + "\022)\n\017originalAddress\030\004 \002(\0132\020.AddressProto" + + "col\022\025\n\ractorInstance\030\005 \001(\014\022\033\n\023serializer", + "Classname\030\006 \001(\t\022\024\n\014isTransactor\030\007 \001(\010\022\017\n" + + "\007timeout\030\010 \001(\004\022\026\n\016receiveTimeout\030\t \001(\004\022%" + + "\n\tlifeCycle\030\n \001(\0132\022.LifeCycleProtocol\022+\n" + + "\nsupervisor\030\013 \001(\0132\027.RemoteActorRefProtoc" + + "ol\022\024\n\014hotswapStack\030\014 \001(\014\022(\n\010messages\030\r \003" + + "(\0132\026.RemoteRequestProtocol\"g\n\037Serialized" + + "TypedActorRefProtocol\022-\n\010actorRef\030\001 \002(\0132" + + "\033.SerializedActorRefProtocol\022\025\n\rinterfac" + + "eName\030\002 \002(\t\"r\n\017MessageProtocol\0225\n\023serial" + + "izationScheme\030\001 \002(\0162\030.SerializationSchem", + "eType\022\017\n\007message\030\002 \002(\014\022\027\n\017messageManifes" + + "t\030\003 \001(\014\"\236\001\n\021ActorInfoProtocol\022\014\n\004uuid\030\001 " + + "\002(\t\022\016\n\006target\030\002 \002(\t\022\017\n\007timeout\030\003 \002(\004\022\035\n\t" + + "actorType\030\004 \002(\0162\n.ActorType\022/\n\016typedActo" + + "rInfo\030\005 \001(\0132\027.TypedActorInfoProtocol\022\n\n\002" + + "id\030\006 \001(\t\";\n\026TypedActorInfoProtocol\022\021\n\tin" + + "terface\030\001 \002(\t\022\016\n\006method\030\002 \002(\t\"\352\001\n\025Remote" + + "RequestProtocol\022\n\n\002id\030\001 \002(\004\022!\n\007message\030\002" + + " \002(\0132\020.MessageProtocol\022%\n\tactorInfo\030\003 \002(" + + "\0132\022.ActorInfoProtocol\022\020\n\010isOneWay\030\004 \002(\010\022", + "\026\n\016supervisorUuid\030\005 \001(\t\022\'\n\006sender\030\006 \001(\0132" + + "\027.RemoteActorRefProtocol\022(\n\010metadata\030\007 \003" + + "(\0132\026.MetadataEntryProtocol\"\324\001\n\023RemoteRep" + + "lyProtocol\022\n\n\002id\030\001 \002(\004\022!\n\007message\030\002 \001(\0132" + + "\020.MessageProtocol\022%\n\texception\030\003 \001(\0132\022.E" + + "xceptionProtocol\022\026\n\016supervisorUuid\030\004 \001(\t" + + "\022\017\n\007isActor\030\005 \002(\010\022\024\n\014isSuccessful\030\006 \002(\010\022" + + "(\n\010metadata\030\007 \003(\0132\026.MetadataEntryProtoco" + + "l\")\n\014UuidProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003low\030\002" + + " \002(\004\"3\n\025MetadataEntryProtocol\022\013\n\003key\030\001 \002", + "(\t\022\r\n\005value\030\002 \002(\014\"6\n\021LifeCycleProtocol\022!" + + "\n\tlifeCycle\030\001 \002(\0162\016.LifeCycleType\"1\n\017Add" + + "ressProtocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004port\030\002" + + " \002(\r\"7\n\021ExceptionProtocol\022\021\n\tclassname\030\001" + + " \002(\t\022\017\n\007message\030\002 \002(\t*=\n\tActorType\022\017\n\013SC" + + "ALA_ACTOR\020\001\022\016\n\nJAVA_ACTOR\020\002\022\017\n\013TYPED_ACT" + + "OR\020\003*]\n\027SerializationSchemeType\022\010\n\004JAVA\020" + + "\001\022\013\n\007SBINARY\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_J" + + "SON\020\004\022\014\n\010PROTOBUF\020\005*-\n\rLifeCycleType\022\r\n\t" + + "PERMANENT\020\001\022\r\n\tTEMPORARY\020\002B-\n)se.scalabl", + "esolutions.akka.remote.protocolH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5824,16 +6547,32 @@ public final class RemoteProtocol { new java.lang.String[] { "Uuid", "ActorClassname", "HomeAddress", "Timeout", }, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder.class); - internal_static_SerializedActorRefProtocol_descriptor = + internal_static_RemoteTypedActorRefProtocol_descriptor = getDescriptor().getMessageTypes().get(1); + internal_static_RemoteTypedActorRefProtocol_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_RemoteTypedActorRefProtocol_descriptor, + new java.lang.String[] { "ActorRef", "InterfaceName", }, + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.class, + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteTypedActorRefProtocol.Builder.class); + internal_static_SerializedActorRefProtocol_descriptor = + getDescriptor().getMessageTypes().get(2); internal_static_SerializedActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_SerializedActorRefProtocol_descriptor, new java.lang.String[] { "Uuid", "Id", "ActorClassname", "OriginalAddress", "ActorInstance", "SerializerClassname", "IsTransactor", "Timeout", "ReceiveTimeout", "LifeCycle", "Supervisor", "HotswapStack", "Messages", }, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedActorRefProtocol.Builder.class); + internal_static_SerializedTypedActorRefProtocol_descriptor = + getDescriptor().getMessageTypes().get(3); + internal_static_SerializedTypedActorRefProtocol_fieldAccessorTable = new + com.google.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_SerializedTypedActorRefProtocol_descriptor, + new java.lang.String[] { "ActorRef", "InterfaceName", }, + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.class, + se.scalablesolutions.akka.remote.protocol.RemoteProtocol.SerializedTypedActorRefProtocol.Builder.class); internal_static_MessageProtocol_descriptor = - getDescriptor().getMessageTypes().get(2); + getDescriptor().getMessageTypes().get(4); internal_static_MessageProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MessageProtocol_descriptor, @@ -5841,7 +6580,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MessageProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MessageProtocol.Builder.class); internal_static_ActorInfoProtocol_descriptor = - getDescriptor().getMessageTypes().get(3); + getDescriptor().getMessageTypes().get(5); internal_static_ActorInfoProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ActorInfoProtocol_descriptor, @@ -5849,7 +6588,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.ActorInfoProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.ActorInfoProtocol.Builder.class); internal_static_TypedActorInfoProtocol_descriptor = - getDescriptor().getMessageTypes().get(4); + getDescriptor().getMessageTypes().get(6); internal_static_TypedActorInfoProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_TypedActorInfoProtocol_descriptor, @@ -5857,7 +6596,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.TypedActorInfoProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.TypedActorInfoProtocol.Builder.class); internal_static_RemoteRequestProtocol_descriptor = - getDescriptor().getMessageTypes().get(5); + getDescriptor().getMessageTypes().get(7); internal_static_RemoteRequestProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteRequestProtocol_descriptor, @@ -5865,7 +6604,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteRequestProtocol.Builder.class); internal_static_RemoteReplyProtocol_descriptor = - getDescriptor().getMessageTypes().get(6); + getDescriptor().getMessageTypes().get(8); internal_static_RemoteReplyProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteReplyProtocol_descriptor, @@ -5873,7 +6612,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.RemoteReplyProtocol.Builder.class); internal_static_UuidProtocol_descriptor = - getDescriptor().getMessageTypes().get(7); + getDescriptor().getMessageTypes().get(9); internal_static_UuidProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_UuidProtocol_descriptor, @@ -5881,7 +6620,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.UuidProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.UuidProtocol.Builder.class); internal_static_MetadataEntryProtocol_descriptor = - getDescriptor().getMessageTypes().get(8); + getDescriptor().getMessageTypes().get(10); internal_static_MetadataEntryProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_MetadataEntryProtocol_descriptor, @@ -5889,7 +6628,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.MetadataEntryProtocol.Builder.class); internal_static_LifeCycleProtocol_descriptor = - getDescriptor().getMessageTypes().get(9); + getDescriptor().getMessageTypes().get(11); internal_static_LifeCycleProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_LifeCycleProtocol_descriptor, @@ -5897,7 +6636,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.LifeCycleProtocol.Builder.class); internal_static_AddressProtocol_descriptor = - getDescriptor().getMessageTypes().get(10); + getDescriptor().getMessageTypes().get(12); internal_static_AddressProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_AddressProtocol_descriptor, @@ -5905,7 +6644,7 @@ public final class RemoteProtocol { se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol.class, se.scalablesolutions.akka.remote.protocol.RemoteProtocol.AddressProtocol.Builder.class); internal_static_ExceptionProtocol_descriptor = - getDescriptor().getMessageTypes().get(11); + getDescriptor().getMessageTypes().get(13); internal_static_ExceptionProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_ExceptionProtocol_descriptor, diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 4343fecf7f..e84ae9f18e 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -22,6 +22,15 @@ message RemoteActorRefProtocol { optional uint64 timeout = 4; } +/** + * Defines a remote ActorRef that "remembers" and uses its original typed Actor instance + * on the original node. + */ +message RemoteTypedActorRefProtocol { + required RemoteActorRefProtocol actorRef = 1; + required string interfaceName = 2; +} + /** * Defines a fully serialized remote ActorRef (with serialized Actor instance) * that is about to be instantiated on the remote node. It is fully disconnected @@ -43,6 +52,16 @@ message SerializedActorRefProtocol { repeated RemoteRequestProtocol messages = 13; } +/** + * Defines a fully serialized remote ActorRef (with serialized typed actor instance) + * that is about to be instantiated on the remote node. It is fully disconnected + * from its original host. + */ +message SerializedTypedActorRefProtocol { + required SerializedActorRefProtocol actorRef = 1; + required string interfaceName = 2; +} + /** * Defines a message. */ diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index c1f25b6d4f..b258c4867d 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -67,6 +67,7 @@ object RemoteNode extends RemoteServer * @author Jonas Bonér */ object RemoteServer { + val UUID_PREFIX = "uuid:" val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") val PORT = config.getInt("akka.remote.server.port", 9999) @@ -123,18 +124,20 @@ object RemoteServer { private class RemoteActorSet { private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef] + private[RemoteServer] val actorsByUuid = new ConcurrentHashMap[String, ActorRef] private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef] + private[RemoteServer] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef] } private val guard = new ReadWriteGuard private val remoteActorSets = Map[Address, RemoteActorSet]() private val remoteServers = Map[Address, RemoteServer]() - private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard { - actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor) + private[akka] def registerActorByUuid(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard { + actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actorsByUuid.put(uuid, actor) } - private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard { + private[akka] def registerTypedActorByUuid(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard { actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor) } @@ -192,6 +195,7 @@ case class RemoteServerClientDisconnected(@BeanProperty val server: RemoteServer * @author Jonas Bonér */ class RemoteServer extends Logging with ListenerManagement { + import RemoteServer._ def name = "RemoteServer@" + hostname + ":" + port private[akka] var address = RemoteServer.Address(RemoteServer.HOSTNAME,RemoteServer.PORT) @@ -283,10 +287,11 @@ class RemoteServer extends Logging with ListenerManagement { * @param typedActor typed actor to register */ def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized { - val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors - if (!typedActors.contains(id)) { - log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port) - typedActors.put(id, typedActor) + log.debug("Registering server side remote typed actor [%s] with id [%s]", typedActor.getClass.getName, id) + if (id.startsWith(UUID_PREFIX)) { + registerTypedActor(id.substring(UUID_PREFIX.length), typedActor, typedActorsByUuid()) + } else { + registerTypedActor(id, typedActor, typedActors()) } } @@ -301,12 +306,27 @@ class RemoteServer extends Logging with ListenerManagement { * NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself. */ def register(id: String, actorRef: ActorRef): Unit = synchronized { + log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) + if (id.startsWith(UUID_PREFIX)) { + register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid()) + } else { + register(id, actorRef, actors()) + } + } + + private def register(id: String, actorRef: ActorRef, registry: ConcurrentHashMap[String, ActorRef]) { if (_isRunning) { - val actorMap = actors() - if (!actorMap.contains(id)) { + if (!registry.contains(id)) { if (!actorRef.isRunning) actorRef.start - log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id) - actorMap.put(id, actorRef) + registry.put(id, actorRef) + } + } + } + + private def registerTypedActor(id: String, typedActor: AnyRef, registry: ConcurrentHashMap[String, AnyRef]) { + if (_isRunning) { + if (!registry.contains(id)) { + registry.put(id, typedActor) } } } @@ -319,7 +339,7 @@ class RemoteServer extends Logging with ListenerManagement { log.debug("Unregistering server side remote actor [%s] with id [%s:%s]", actorRef.actorClass.getName, actorRef.id, actorRef.uuid) val actorMap = actors() actorMap remove actorRef.id - if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid + if (actorRef.registeredInRemoteNodeDuringSerialization) actorsByUuid() remove actorRef.uuid } } @@ -331,10 +351,15 @@ class RemoteServer extends Logging with ListenerManagement { def unregister(id: String):Unit = synchronized { if (_isRunning) { log.info("Unregistering server side remote actor with id [%s]", id) - val actorMap = actors() - val actorRef = actorMap get id - actorMap remove id - if (actorRef.registeredInRemoteNodeDuringSerialization) actorMap remove actorRef.uuid + if (id.startsWith(UUID_PREFIX)) { + actorsByUuid().remove(id.substring(UUID_PREFIX.length)) + } else { + val actorRef = actors().get(id) + if (actorRef.registeredInRemoteNodeDuringSerialization) { + actorsByUuid() remove actorRef.uuid + } + actors() remove id + } } } @@ -346,8 +371,11 @@ class RemoteServer extends Logging with ListenerManagement { def unregisterTypedActor(id: String):Unit = synchronized { if (_isRunning) { log.info("Unregistering server side remote typed actor with id [%s]", id) - val registeredTypedActors = typedActors() - registeredTypedActors.remove(id) + if (id.startsWith(UUID_PREFIX)) { + typedActorsByUuid().remove(id.substring(UUID_PREFIX.length)) + } else { + typedActors().remove(id) + } } } @@ -355,8 +383,10 @@ class RemoteServer extends Logging with ListenerManagement { protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message) - private[akka] def actors() = RemoteServer.actorsFor(address).actors - private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors + private[akka] def actors() = RemoteServer.actorsFor(address).actors + private[akka] def actorsByUuid() = RemoteServer.actorsFor(address).actorsByUuid + private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors + private[akka] def typedActorsByUuid() = RemoteServer.actorsFor(address).typedActorsByUuid } object RemoteServerSslContext { @@ -419,6 +449,7 @@ class RemoteServerHandler( val openChannels: ChannelGroup, val applicationLoader: Option[ClassLoader], val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging { + import RemoteServer._ val AW_PROXY_PREFIX = "$$ProxiedByAW".intern applicationLoader.foreach(MessageSerializer.setClassLoader(_)) @@ -477,11 +508,12 @@ class RemoteServerHandler( private def handleRemoteRequestProtocol(request: RemoteRequestProtocol, channel: Channel) = { log.debug("Received RemoteRequestProtocol[\n%s]", request.toString) - val actorType = request.getActorInfo.getActorType - if (actorType == SCALA_ACTOR) dispatchToActor(request, channel) - else if (actorType == JAVA_ACTOR) throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported") - else if (actorType == TYPED_ACTOR) dispatchToTypedActor(request, channel) - else throw new IllegalActorStateException("Unknown ActorType [" + actorType + "]") + request.getActorInfo.getActorType match { + case SCALA_ACTOR => dispatchToActor(request, channel) + case TYPED_ACTOR => dispatchToTypedActor(request, channel) + case JAVA_ACTOR => throw new IllegalActorStateException("ActorType JAVA_ACTOR is currently not supported") + case other => throw new IllegalActorStateException("Unknown ActorType [" + other + "]") + } } private def dispatchToActor(request: RemoteRequestProtocol, channel: Channel) = { @@ -565,32 +597,23 @@ class RemoteServerHandler( } } - /** - * Find a registered actor by ID (default) or UUID. - * Actors are registered by id apart from registering during serialization see SerializationProtocol. - */ - private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = { - val registeredActors = server.actors() - var actorRefOrNull = registeredActors get id - if (actorRefOrNull eq null) { - actorRefOrNull = registeredActors get uuid - } - actorRefOrNull + private def findActorById(id: String) : ActorRef = { + server.actors().get(id) } - /** - * Find a registered typed actor by ID (default) or UUID. - * Actors are registered by id apart from registering during serialization see SerializationProtocol. - */ - private def findTypedActorByIdOrUUid(id: String, uuid: String) : AnyRef = { - val registeredActors = server.typedActors() - var actorRefOrNull = registeredActors get id - if (actorRefOrNull eq null) { - actorRefOrNull = registeredActors get uuid - } - actorRefOrNull + private def findActorByUuid(uuid: String) : ActorRef = { + server.actorsByUuid().get(uuid) } + private def findTypedActorById(id: String) : AnyRef = { + server.typedActors().get(id) + } + + private def findTypedActorByUuid(uuid: String) : AnyRef = { + server.typedActorsByUuid().get(uuid) + } + + /** * Creates a new instance of the actor with name, uuid and timeout specified as arguments. * @@ -605,8 +628,12 @@ class RemoteServerHandler( val name = actorInfo.getTarget val timeout = actorInfo.getTimeout - val actorRefOrNull = findActorByIdOrUuid(id, uuid) - + val actorRefOrNull = if (id.startsWith(UUID_PREFIX)) { + findActorByUuid(id.substring(UUID_PREFIX.length)) + } else { + findActorById(id) + } + if (actorRefOrNull eq null) { try { log.info("Creating a new remote actor [%s:%s]", name, uuid) @@ -632,7 +659,11 @@ class RemoteServerHandler( val uuid = actorInfo.getUuid val id = actorInfo.getId - val typedActorOrNull = findTypedActorByIdOrUUid(id, uuid) + val typedActorOrNull = if (id.startsWith(UUID_PREFIX)) { + findTypedActorByUuid(id.substring(UUID_PREFIX.length)) + } else { + findTypedActorById(id) + } if (typedActorOrNull eq null) { val typedActorInfo = actorInfo.getTypedActorInfo diff --git a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala index 1577b62e13..c9b443f4ec 100644 --- a/akka-remote/src/main/scala/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/serialization/SerializationProtocol.scala @@ -4,7 +4,6 @@ package se.scalablesolutions.akka.serialization -import se.scalablesolutions.akka.actor.{Actor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, ActorType} import se.scalablesolutions.akka.stm.global._ import se.scalablesolutions.akka.stm.TransactionManagement._ import se.scalablesolutions.akka.stm.TransactionManagement @@ -16,6 +15,7 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F import se.scalablesolutions.akka.config.ScalaConfig._ import com.google.protobuf.ByteString +import se.scalablesolutions.akka.actor._ /** * Type class definition for Actor Serialization @@ -37,13 +37,14 @@ trait Format[T <: Actor] extends FromBinary[T] with ToBinary[T] * Create a Format object with the client actor as the implementation of the type class * *

- * object BinaryFormatMyStatelessActor {
+ * object BinaryFormatMyStatelessActor  {
  *   implicit object MyStatelessActorFormat extends StatelessActorFormat[MyStatelessActor]
  * }
  * 
*/ trait StatelessActorFormat[T <: Actor] extends Format[T] { def fromBinary(bytes: Array[Byte], act: T) = act + def toBinary(ac: T) = Array.empty[Byte] } @@ -54,16 +55,18 @@ trait StatelessActorFormat[T <: Actor] extends Format[T] { * a serializer object * *
- * object BinaryFormatMyJavaSerializableActor {
- *   implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor] {
+ * object BinaryFormatMyJavaSerializableActor  {
+ *   implicit object MyJavaSerializableActorFormat extends SerializerBasedActorFormat[MyJavaSerializableActor]  {
  *     val serializer = Serializer.Java
- *   }
+ * }
  * }
  * 
*/ trait SerializerBasedActorFormat[T <: Actor] extends Format[T] { val serializer: Serializer + def fromBinary(bytes: Array[Byte], act: T) = serializer.fromBinary(bytes, Some(act.self.actorClass)).asInstanceOf[T] + def toBinary(ac: T) = serializer.toBinary(ac) } @@ -71,7 +74,6 @@ trait SerializerBasedActorFormat[T <: Actor] extends Format[T] { * Module for local actor serialization. */ object ActorSerialization { - def fromBinary[T <: Actor](bytes: Array[Byte])(implicit format: Format[T]): ActorRef = fromBinaryToLocalActorRef(bytes, format) @@ -86,7 +88,7 @@ object ActorSerialization { def toBinaryJ[T <: Actor](a: ActorRef, format: Format[T], srlMailBox: Boolean = true): Array[Byte] = toBinary(a, srlMailBox)(format) - private def toSerializedActorRefProtocol[T <: Actor]( + private[akka] def toSerializedActorRefProtocol[T <: Actor]( actorRef: ActorRef, format: Format[T], serializeMailBox: Boolean = true): SerializedActorRefProtocol = { val lifeCycleProtocol: Option[LifeCycleProtocol] = { def setScope(builder: LifeCycleProtocol.Builder, scope: Scope) = scope match { @@ -103,17 +105,17 @@ object ActorSerialization { } val originalAddress = AddressProtocol.newBuilder - .setHostname(actorRef.homeAddress.getHostName) - .setPort(actorRef.homeAddress.getPort) - .build + .setHostname(actorRef.homeAddress.getHostName) + .setPort(actorRef.homeAddress.getPort) + .build val builder = SerializedActorRefProtocol.newBuilder - .setUuid(actorRef.uuid) - .setId(actorRef.id) - .setActorClassname(actorRef.actorClass.getName) - .setOriginalAddress(originalAddress) - .setIsTransactor(actorRef.isTransactor) - .setTimeout(actorRef.timeout) + .setUuid(actorRef.uuid) + .setId(actorRef.id) + .setActorClassname(actorRef.actorClass.getName) + .setOriginalAddress(originalAddress) + .setIsTransactor(actorRef.isTransactor) + .setTimeout(actorRef.timeout) if (serializeMailBox == true) { val messages = @@ -150,33 +152,33 @@ object ActorSerialization { private def fromBinaryToLocalActorRef[T <: Actor](bytes: Array[Byte], format: Format[T]): ActorRef = fromProtobufToLocalActorRef(SerializedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None) - private def fromProtobufToLocalActorRef[T <: Actor]( - protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = { + private[akka] def fromProtobufToLocalActorRef[T <: Actor]( + protocol: SerializedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): ActorRef = { Actor.log.debug("Deserializing SerializedActorRefProtocol to LocalActorRef:\n" + protocol) val serializer = - if (format.isInstanceOf[SerializerBasedActorFormat[_]]) - Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer) - else None + if (format.isInstanceOf[SerializerBasedActorFormat[_]]) + Some(format.asInstanceOf[SerializerBasedActorFormat[_]].serializer) + else None val lifeCycle = - if (protocol.hasLifeCycle) { - val lifeCycleProtocol = protocol.getLifeCycle - Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent) - else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary) - else throw new IllegalActorStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle)) - } else None + if (protocol.hasLifeCycle) { + val lifeCycleProtocol = protocol.getLifeCycle + Some(if (lifeCycleProtocol.getLifeCycle == LifeCycleType.PERMANENT) LifeCycle(Permanent) + else if (lifeCycleProtocol.getLifeCycle == LifeCycleType.TEMPORARY) LifeCycle(Temporary) + else throw new IllegalActorStateException("LifeCycle type is not valid: " + lifeCycleProtocol.getLifeCycle)) + } else None val supervisor = - if (protocol.hasSupervisor) - Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) - else None + if (protocol.hasSupervisor) + Some(RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getSupervisor, loader)) + else None val hotswap = - if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get + if (serializer.isDefined && protocol.hasHotswapStack) Some(serializer.get .fromBinary(protocol.getHotswapStack.toByteArray, Some(classOf[PartialFunction[Any, Unit]])) .asInstanceOf[PartialFunction[Any, Unit]]) - else None + else None val classLoader = loader.getOrElse(getClass.getClassLoader) @@ -218,9 +220,9 @@ object RemoteActorSerialization { def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef = fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) - /** - * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. - */ + /** + * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. + */ def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef = fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) @@ -249,26 +251,26 @@ object RemoteActorSerialization { if (!registeredInRemoteNodeDuringSerialization) { Actor.log.debug("Register serialized Actor [%s] as remote @ [%s:%s]", actorClass.getName, host, port) RemoteServer.getOrCreateServer(homeAddress) - RemoteServer.registerActor(homeAddress, uuid, ar) + RemoteServer.registerActorByUuid(homeAddress, uuid, ar) registeredInRemoteNodeDuringSerialization = true } RemoteActorRefProtocol.newBuilder - .setUuid(uuid) - .setActorClassname(actorClass.getName) - .setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build) - .setTimeout(timeout) - .build + .setUuid(uuid) + .setActorClassname(actorClass.getName) + .setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build) + .setTimeout(timeout) + .build } def createRemoteRequestProtocolBuilder( - actorRef: ActorRef, - message: Any, - isOneWay: Boolean, - senderOption: Option[ActorRef], - typedActorInfo: Option[Tuple2[String, String]], - actorType: ActorType): - RemoteRequestProtocol.Builder = { + actorRef: ActorRef, + message: Any, + isOneWay: Boolean, + senderOption: Option[ActorRef], + typedActorInfo: Option[Tuple2[String, String]], + actorType: ActorType): + RemoteRequestProtocol.Builder = { import actorRef._ val actorInfoBuilder = ActorInfoProtocol.newBuilder @@ -277,12 +279,13 @@ object RemoteActorSerialization { .setTarget(actorClassName) .setTimeout(timeout) - typedActorInfo.foreach { typedActor => - actorInfoBuilder.setTypedActorInfo( - TypedActorInfoProtocol.newBuilder - .setInterface(typedActor._1) - .setMethod(typedActor._2) - .build) + typedActorInfo.foreach { + typedActor => + actorInfoBuilder.setTypedActorInfo( + TypedActorInfoProtocol.newBuilder + .setInterface(typedActor._1) + .setMethod(typedActor._2) + .build) } actorType match { @@ -300,10 +303,110 @@ object RemoteActorSerialization { val id = registerSupervisorAsRemoteActor if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) - senderOption.foreach { sender => - RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender) - requestBuilder.setSender(toRemoteActorRefProtocol(sender)) + senderOption.foreach { + sender => + RemoteServer.getOrCreateServer(sender.homeAddress).register(sender.uuid, sender) + requestBuilder.setSender(toRemoteActorRefProtocol(sender)) } requestBuilder } + + +} + + +/** + * Module for local typed actor serialization. + */ +object TypedActorSerialization { + + def fromBinary[T <: Actor, U <: AnyRef](bytes: Array[Byte])(implicit format: Format[T]): U = + fromBinaryToLocalTypedActorRef(bytes, format) + + def toBinary[T <: Actor](proxy: AnyRef)(implicit format: Format[T]): Array[Byte] = { + toSerializedTypedActorRefProtocol(proxy, format).toByteArray + } + + // wrapper for implicits to be used by Java + def fromBinaryJ[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Format[T]): U = + fromBinary(bytes)(format) + + // wrapper for implicits to be used by Java + def toBinaryJ[T <: Actor](a: AnyRef, format: Format[T]): Array[Byte] = + toBinary(a)(format) + + private def toSerializedTypedActorRefProtocol[T <: Actor]( + proxy: AnyRef, format: Format[T]): SerializedTypedActorRefProtocol = { + + val init = AspectInitRegistry.initFor(proxy) + if (init == null) throw new IllegalArgumentException("Proxy for typed actor could not be found in AspectInitRegistry.") + + SerializedTypedActorRefProtocol.newBuilder + .setActorRef(ActorSerialization.toSerializedActorRefProtocol(init.actorRef, format)) + .setInterfaceName(init.interfaceClass.getName) + .build + } + + private def fromBinaryToLocalTypedActorRef[T <: Actor, U <: AnyRef](bytes: Array[Byte], format: Format[T]): U = + fromProtobufToLocalTypedActorRef(SerializedTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, format, None) + + private def fromProtobufToLocalTypedActorRef[T <: Actor, U <: AnyRef]( + protocol: SerializedTypedActorRefProtocol, format: Format[T], loader: Option[ClassLoader]): U = { + Actor.log.debug("Deserializing SerializedTypedActorRefProtocol to LocalActorRef:\n" + protocol) + val actorRef = ActorSerialization.fromProtobufToLocalActorRef(protocol.getActorRef, format, loader) + val intfClass = toClass(loader, protocol.getInterfaceName) + TypedActor.newInstance(intfClass, actorRef).asInstanceOf[U] + } + + private[akka] def toClass[U <: AnyRef](loader: Option[ClassLoader], name: String): Class[U] = { + val classLoader = loader.getOrElse(getClass.getClassLoader) + val clazz = classLoader.loadClass(name) + clazz.asInstanceOf[Class[U]] + } +} + +/** + * Module for remote typed actor serialization. + */ +object RemoteTypedActorSerialization { + /** + * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. + */ + def fromBinaryToRemoteTypedActorRef[T <: AnyRef](bytes: Array[Byte]): T = + fromProtobufToRemoteTypedActorRef(RemoteTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, None) + + /** + * Deserializes a byte array (Array[Byte]) into a AW RemoteActorRef proxy. + */ + def fromBinaryToRemoteTypedActorRef[T <: AnyRef](bytes: Array[Byte], loader: ClassLoader): T = + fromProtobufToRemoteTypedActorRef(RemoteTypedActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader)) + + /** + * Serialize as AW RemoteActorRef proxy. + */ + def toBinary[T <: Actor](proxy: AnyRef): Array[Byte] = { + toRemoteTypedActorRefProtocol(proxy).toByteArray + } + + /** + * Deserializes a RemoteTypedActorRefProtocol Protocol Buffers (protobuf) Message into AW RemoteActorRef proxy. + */ + private[akka] def fromProtobufToRemoteTypedActorRef[T](protocol: RemoteTypedActorRefProtocol, loader: Option[ClassLoader]): T = { + Actor.log.debug("Deserializing RemoteTypedActorRefProtocol to AW RemoteActorRef proxy:\n" + protocol) + val actorRef = RemoteActorSerialization.fromProtobufToRemoteActorRef(protocol.getActorRef, loader) + val intfClass = TypedActorSerialization.toClass(loader, protocol.getInterfaceName) + TypedActor.createProxyForRemoteActorRef(intfClass, actorRef).asInstanceOf[T] + } + + /** + * Serializes the AW TypedActor proxy into a Protocol Buffers (protobuf) Message. + */ + def toRemoteTypedActorRefProtocol(proxy: AnyRef): RemoteTypedActorRefProtocol = { + val init = AspectInitRegistry.initFor(proxy) + RemoteTypedActorRefProtocol.newBuilder + .setActorRef(RemoteActorSerialization.toRemoteActorRefProtocol(init.actorRef)) + .setInterfaceName(init.interfaceClass.getName) + .build + } + } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala index 59f122c656..fbf723ece5 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteActorSpec.scala @@ -79,7 +79,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { } } - @Test def shouldSendWithBang { val actor = RemoteClient.actorFor( @@ -178,5 +177,41 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite { assert(actor2.id == actor3.id) } + @Test + def shouldFindActorByUuid { + val actor1 = actorOf[RemoteActorSpecActorUnidirectional] + val actor2 = actorOf[RemoteActorSpecActorUnidirectional] + server.register("uuid:" + actor1.uuid, actor1) + server.register("my-service", actor2) + + val ref1 = RemoteClient.actorFor("uuid:" + actor1.uuid, HOSTNAME, PORT) + val ref2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT) + + ref1 ! "OneWay" + assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS)) + ref1.stop + ref2 ! "OneWay" + ref2.stop + + } + + @Test + def shouldRegisterAndUnregister { + val actor1 = actorOf[RemoteActorSpecActorUnidirectional] + server.register("my-service-1", actor1) + assert(server.actors().get("my-service-1") != null, "actor registered") + server.unregister("my-service-1") + assert(server.actors().get("my-service-1") == null, "actor unregistered") + } + + @Test + def shouldRegisterAndUnregisterByUuid { + val actor1 = actorOf[RemoteActorSpecActorUnidirectional] + server.register("uuid:" + actor1.uuid, actor1) + assert(server.actorsByUuid().get(actor1.uuid) != null, "actor registered") + server.unregister("uuid:" + actor1.uuid) + assert(server.actorsByUuid().get(actor1.uuid) == null, "actor unregistered") + } + } diff --git a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala index b800fbf2c3..f50c3e6652 100644 --- a/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/ServerInitiatedRemoteTypedActorSpec.scala @@ -103,9 +103,34 @@ class ServerInitiatedRemoteTypedActorSpec extends it("should register and unregister typed actors") { val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) server.registerTypedActor("my-test-service", typedActor) - assert(server.typedActors().get("my-test-service") != null) + assert(server.typedActors().get("my-test-service") != null, "typed actor registered") server.unregisterTypedActor("my-test-service") - assert(server.typedActors().get("my-test-service") == null) + assert(server.typedActors().get("my-test-service") == null, "typed actor unregistered") + } + + it("should register and unregister typed actors by uuid") { + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + val init = AspectInitRegistry.initFor(typedActor) + val uuid = "uuid:" + init.actorRef.uuid + server.registerTypedActor(uuid, typedActor) + assert(server.typedActorsByUuid().get(init.actorRef.uuid) != null, "typed actor registered") + server.unregisterTypedActor(uuid) + assert(server.typedActorsByUuid().get(init.actorRef.uuid) == null, "typed actor unregistered") + } + + it("should find typed actors by uuid") { + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + val init = AspectInitRegistry.initFor(typedActor) + val uuid = "uuid:" + init.actorRef.uuid + server.registerTypedActor(uuid, typedActor) + assert(server.typedActorsByUuid().get(init.actorRef.uuid) != null, "typed actor registered") + + val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], uuid, HOSTNAME, PORT) + expect("oneway") { + actor.oneWay + oneWayLog.poll(5, TimeUnit.SECONDS) + } + } } } diff --git a/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala b/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala new file mode 100644 index 0000000000..ccf4d05f7f --- /dev/null +++ b/akka-remote/src/test/scala/serialization/TypedActorSerializationSpec.scala @@ -0,0 +1,166 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.actor.serialization + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import se.scalablesolutions.akka.serialization._ +import se.scalablesolutions.akka.actor._ + +import TypedActorSerialization._ +import Actor._ +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer} +import se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec.RemoteActorSpecActorUnidirectional + +@RunWith(classOf[JUnitRunner]) +class TypedActorSerializationSpec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + var server1: RemoteServer = null + var typedActor: MyTypedActor = null + + override def beforeAll = { + server1 = new RemoteServer().start("localhost", 9991) + typedActor = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000) + server1.registerTypedActor("typed-actor-service", typedActor) + Thread.sleep(1000) + } + + // make sure the servers shutdown cleanly after the test has finished + override def afterAll = { + try { + TypedActor.stop(typedActor) + server1.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + object MyTypedStatelessActorFormat extends StatelessActorFormat[MyStatelessTypedActorImpl] + + class MyTypedActorFormat extends Format[MyTypedActorImpl] { + def fromBinary(bytes: Array[Byte], act: MyTypedActorImpl) = { + val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.Counter])).asInstanceOf[ProtobufProtocol.Counter] + act.count = p.getCount + act + } + def toBinary(ac: MyTypedActorImpl) = + ProtobufProtocol.Counter.newBuilder.setCount(ac.count).build.toByteArray + } + + class MyTypedActorWithDualCounterFormat extends Format[MyTypedActorWithDualCounter] { + def fromBinary(bytes: Array[Byte], act: MyTypedActorWithDualCounter) = { + val p = Serializer.Protobuf.fromBinary(bytes, Some(classOf[ProtobufProtocol.DualCounter])).asInstanceOf[ProtobufProtocol.DualCounter] + act.count1 = p.getCount1 + act.count2 = p.getCount2 + act + } + def toBinary(ac: MyTypedActorWithDualCounter) = + ProtobufProtocol.DualCounter.newBuilder.setCount1(ac.count1).setCount2(ac.count2).build.toByteArray + } + + + describe("Serializable typed actor") { + + it("should be able to serialize and de-serialize a stateless typed actor") { + val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000) + typedActor1.requestReply("hello") should equal("world") + typedActor1.requestReply("hello") should equal("world") + + val bytes = toBinaryJ(typedActor1, MyTypedStatelessActorFormat) + val typedActor2: MyTypedActor = fromBinaryJ(bytes, MyTypedStatelessActorFormat) + typedActor2.requestReply("hello") should equal("world") + } + + it("should be able to serialize and de-serialize a stateful typed actor") { + val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorImpl], 1000) + typedActor1.requestReply("hello") should equal("world 1") + typedActor1.requestReply("scala") should equal("hello scala 2") + + val f = new MyTypedActorFormat + val bytes = toBinaryJ(typedActor1, f) + val typedActor2: MyTypedActor = fromBinaryJ(bytes, f) + typedActor2.requestReply("hello") should equal("world 3") + } + + it("should be able to serialize and de-serialize a stateful typed actor with compound state") { + val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyTypedActorWithDualCounter], 1000) + typedActor1.requestReply("hello") should equal("world 1 1") + typedActor1.requestReply("hello") should equal("world 2 2") + + val f = new MyTypedActorWithDualCounterFormat + val bytes = toBinaryJ(typedActor1, f) + val typedActor2: MyTypedActor = fromBinaryJ(bytes, f) + typedActor2.requestReply("hello") should equal("world 3 3") + } + + it("should be able to serialize a local yped actor ref to a remote typed actor ref proxy") { + val typedActor1 = TypedActor.newInstance(classOf[MyTypedActor], classOf[MyStatelessTypedActorImpl], 1000) + typedActor1.requestReply("hello") should equal("world") + typedActor1.requestReply("hello") should equal("world") + + val bytes = RemoteTypedActorSerialization.toBinary(typedActor1) + val typedActor2: MyTypedActor = RemoteTypedActorSerialization.fromBinaryToRemoteTypedActorRef(bytes) + typedActor1.requestReply("hello") should equal("world") + } + } +} + + +trait MyTypedActor { + def requestReply(s: String) : String + def oneWay() : Unit +} + +class MyTypedActorImpl extends TypedActor with MyTypedActor { + var count = 0 + + override def oneWay() { + println("got oneWay message") + } + + override def requestReply(message: String) : String = { + count = count + 1 + if (message == "hello") { + "world " + count + } else ("hello " + message + " " + count) + } +} + +class MyTypedActorWithDualCounter extends TypedActor with MyTypedActor { + var count1 = 0 + var count2 = 0 + + override def oneWay() { + println("got oneWay message") + } + + override def requestReply(message: String) : String = { + count1 = count1 + 1 + count2 = count2 + 1 + + if (message == "hello") { + "world " + count1 + " " + count2 + } else ("hello " + message + " " + count1 + " " + count2) + } +} + +class MyStatelessTypedActorImpl extends TypedActor with MyTypedActor { + + override def oneWay() { + println("got oneWay message") + } + + override def requestReply(message: String) : String = { + if (message == "hello") "world" else ("hello " + message) + } +} diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index c3457cb43b..f6cf2e2337 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -390,11 +390,22 @@ object TypedActor extends Logging { if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef) if (config._host.isDefined) actorRef.makeRemote(config._host.get) + actorRef.timeout = config.timeout AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, config.timeout)) actorRef.start proxy.asInstanceOf[T] } + private[akka] def newInstance[T](intfClass: Class[T], actorRef: ActorRef): T = { + if (!actorRef.actorInstance.get.isInstanceOf[TypedActor]) throw new IllegalArgumentException("ActorRef is not a ref to a typed actor") + val typedActor = actorRef.actorInstance.get.asInstanceOf[TypedActor] + val proxy = Proxy.newInstance(Array(intfClass), Array(typedActor), true, false) + typedActor.initialize(proxy) + AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, actorRef.remoteAddress, actorRef.timeout)) + actorRef.start + proxy.asInstanceOf[T] + } + private[akka] def newInstance[T](intfClass: Class[T], targetClass: Class[_], remoteAddress: Option[InetSocketAddress], timeout: Long): T = { val actorRef = actorOf(newTypedActor(targetClass)) diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 859425a6c7..ff7b57d9d5 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -165,5 +165,9 @@ akka { hostname = "127.0.0.1" # IP address or hostname of the Redis instance port = 6379 # Port to Redis } + + hbase { + zookeeper.quorum = "localhost" + } } } diff --git a/embedded-repo/org/apache/hbase/hbase-core/0.20.6/hbase-core-0.20.6.jar b/embedded-repo/org/apache/hbase/hbase-core/0.20.6/hbase-core-0.20.6.jar new file mode 100644 index 0000000000..e74cf9017e Binary files /dev/null and b/embedded-repo/org/apache/hbase/hbase-core/0.20.6/hbase-core-0.20.6.jar differ diff --git a/embedded-repo/org/apache/hbase/hbase-test/0.20.6/hbase-test-0.20.6.jar b/embedded-repo/org/apache/hbase/hbase-test/0.20.6/hbase-test-0.20.6.jar new file mode 100644 index 0000000000..34a65f908e Binary files /dev/null and b/embedded-repo/org/apache/hbase/hbase-test/0.20.6/hbase-test-0.20.6.jar differ diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 90fc49afc2..bb468c7b9f 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -51,6 +51,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") + lazy val CasbahRepoReleases = MavenRepository("Casbah Release Repo", "http://repo.bumnetworks.com/releases") + lazy val ZookeeperRepo = MavenRepository("Zookeeper Repo", "http://lilycms.org/maven/maven2/deploy/") } // ------------------------------------------------------------------------------------------------------------------- @@ -77,6 +79,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository) lazy val atomikosModuleConfig = ModuleConfiguration("com.atomikos",sbt.DefaultMavenRepository) + lazy val casbahRelease = ModuleConfiguration("com.novus",CasbahRepoReleases) + lazy val zookeeperRelease = ModuleConfiguration("org.apache.hadoop.zookeeper",ZookeeperRepo) lazy val casbahModuleConfig = ModuleConfiguration("com.novus", CasbahRepo) lazy val timeModuleConfig = ModuleConfiguration("org.scala-tools", "time", CasbahSnapshotRepo) lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast! @@ -203,6 +207,12 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val werkz = "org.codehaus.aspectwerkz" % "aspectwerkz-nodeps-jdk5" % ASPECTWERKZ_VERSION % "compile" lazy val werkz_core = "org.codehaus.aspectwerkz" % "aspectwerkz-jdk5" % ASPECTWERKZ_VERSION % "compile" + lazy val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % "3.2.2" % "compile" + + lazy val hadoop_core = "org.apache.hadoop" % "hadoop-core" % "0.20.2" % "compile" + + lazy val hbase_core = "org.apache.hbase" % "hbase-core" % "0.20.6" % "compile" + // Test lazy val camel_spring = "org.apache.camel" % "camel-spring" % CAMEL_VERSION % "test" @@ -216,6 +226,10 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val junit = "junit" % "junit" % "4.5" % "test" lazy val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" lazy val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test" + lazy val hadoop_test = "org.apache.hadoop" % "hadoop-test" % "0.20.2" % "test" + lazy val hbase_test = "org.apache.hbase" % "hbase-test" % "0.20.6" % "test" + lazy val log4j = "log4j" % "log4j" % "1.2.15" % "test" + lazy val jett_mortbay = "org.mortbay.jetty" % "jetty" % "6.1.14" % "test" } // ------------------------------------------------------------------------------------------------------------------- @@ -461,6 +475,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { new AkkaMongoProject(_), akka_persistence_common) lazy val akka_persistence_cassandra = project("akka-persistence-cassandra", "akka-persistence-cassandra", new AkkaCassandraProject(_), akka_persistence_common) + lazy val akka_persistence_hbase = project("akka-persistence-hbase", "akka-persistence-hbase", + new AkkaHbaseProject(_), akka_persistence_common) } // ------------------------------------------------------------------------------------------------------------------- @@ -510,6 +526,22 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { override def testOptions = TestFilter((name: String) => name.endsWith("Test")) :: Nil } + // ------------------------------------------------------------------------------------------------------------------- + // akka-persistence-hbase subproject + // ------------------------------------------------------------------------------------------------------------------- + + class AkkaHbaseProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { + val zookeeper = Dependencies.zookeeper + val hadoop_core = Dependencies.hadoop_core + val hbase_core = Dependencies.hbase_core + + // testing + val hadoop_test = Dependencies.hadoop_test + val hbase_test = Dependencies.hbase_test + val jetty = Dependencies.jett_mortbay + val log4j = Dependencies.log4j + } + // ------------------------------------------------------------------------------------------------------------------- // akka-kernel subproject // -------------------------------------------------------------------------------------------------------------------