diff --git a/akka-actor/src/main/scala/actor/Actor.scala b/akka-actor/src/main/scala/actor/Actor.scala
index bbb51aff34..46297d32d0 100644
--- a/akka-actor/src/main/scala/actor/Actor.scala
+++ b/akka-actor/src/main/scala/actor/Actor.scala
@@ -76,6 +76,14 @@ class ActorKilledException private[akka](message: String) extends AkkaException(
class ActorInitializationException private[akka](message: String) extends AkkaException(message)
class ActorTimeoutException private[akka](message: String) extends AkkaException(message)
+/**
+ * This message is thrown by default when an Actors behavior doesn't match a message
+ */
+case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception {
+ override def getMessage() = "Actor %s does not handle [%s]".format(ref,msg)
+ override def fillInStackTrace() = this //Don't waste cycles generating stack trace
+}
+
/**
* Actor factory module with factory methods for creating various kinds of Actors.
*
@@ -387,6 +395,16 @@ trait Actor extends Logging {
*/
def postRestart(reason: Throwable) {}
+ /**
+ * User overridable callback.
+ *
+ * Is called when a message isn't handled by the current behavior of the actor
+ * by default it throws an UnhandledMessageException
+ */
+ def unhandled(msg: Any){
+ throw new UnhandledMessageException(msg,self)
+ }
+
/**
* Is the actor able to handle the message passed in as arguments?
*/
@@ -407,8 +425,9 @@ trait Actor extends Logging {
// ==== INTERNAL IMPLEMENTATION DETAILS ====
// =========================================
- private[akka] def apply(msg: Any) = processingBehavior(msg)
+ private[akka] def apply(msg: Any) = fullBehavior(msg)
+ /*Processingbehavior and fullBehavior are duplicates so make sure changes are done to both */
private lazy val processingBehavior: Receive = {
lazy val defaultBehavior = receive
val actorBehavior: Receive = {
@@ -426,6 +445,25 @@ trait Actor extends Logging {
}
actorBehavior
}
+
+ private lazy val fullBehavior: Receive = {
+ lazy val defaultBehavior = receive
+ val actorBehavior: Receive = {
+ case HotSwap(code) => become(code)
+ case RevertHotSwap => unbecome
+ case Exit(dead, reason) => self.handleTrapExit(dead, reason)
+ case Link(child) => self.link(child)
+ case Unlink(child) => self.unlink(child)
+ case UnlinkAndStop(child) => self.unlink(child); child.stop
+ case Restart(reason) => throw reason
+ case msg if !self.hotswap.isEmpty &&
+ self.hotswap.head.isDefinedAt(msg) => self.hotswap.head.apply(msg)
+ case msg if self.hotswap.isEmpty &&
+ defaultBehavior.isDefinedAt(msg) => defaultBehavior.apply(msg)
+ case unknown => unhandled(unknown) //This is the only line that differs from processingbehavior
+ }
+ actorBehavior
+ }
}
private[actor] class AnyOptionAsTypedOption(anyOption: Option[Any]) {
diff --git a/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala
index bd0f359398..6cd57c15fe 100644
--- a/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala
+++ b/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala
@@ -12,6 +12,7 @@ import akka.actor.Actor._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit}
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
+import akka.util.Duration
object ActorModelSpec {
@@ -164,6 +165,18 @@ object ActorModelSpec {
assert(stats.restarts.get() === restarts, "Restarts")
}
+ def await(condition: => Boolean)(withinMs: Long, intervalMs: Long = 25): Boolean = try {
+ val until = System.currentTimeMillis() + withinMs
+ while(System.currentTimeMillis() <= until) {
+ try {
+ if (condition) return true
+
+ Thread.sleep(intervalMs)
+ } catch { case e: InterruptedException => }
+ }
+ false
+ }
+
def newTestActor(implicit d: MessageDispatcherInterceptor) = actorOf(new DispatcherActor(d))
}
@@ -179,7 +192,7 @@ abstract class ActorModelSpec extends JUnitSuite {
a.start
assertDispatcher(dispatcher)(starts = 1, stops = 0)
a.stop
- Thread.sleep(dispatcher.timeoutMs + 100)
+ await(dispatcher.stops.get == 1)(withinMs = 10000)
assertDispatcher(dispatcher)(starts = 1, stops = 1)
assertRef(a,dispatcher)(
suspensions = 0,
@@ -279,7 +292,7 @@ abstract class ActorModelSpec extends JUnitSuite {
}
for(run <- 1 to 3) {
flood(10000)
- Thread.sleep(dispatcher.timeoutMs * 2)
+ await(dispatcher.stops.get == run)(withinMs = 10000)
assertDispatcher(dispatcher)(starts = run, stops = run)
}
}
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala b/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala
index 17502903f0..2dd9147d74 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/CommonStorageBackend.scala
@@ -14,9 +14,11 @@ import collection.immutable._
private[akka] trait CommonStorageBackendAccess {
+
import CommonStorageBackend._
/*abstract*/
+
def getValue(owner: String, key: Array[Byte], default: Array[Byte]): Array[Byte]
def put(owner: String, key: Array[Byte], value: Array[Byte]): Unit
@@ -28,6 +30,7 @@ private[akka] trait CommonStorageBackendAccess {
def drop(): Unit
/*concrete*/
+
def decodeKey(owner: String, key: Array[Byte]) = key
def delete(owner: String, index: Int): Unit = delete(owner, IntSerializer.toBytes(index))
@@ -40,6 +43,7 @@ private[akka] trait CommonStorageBackendAccess {
}
private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess {
+
import CommonStorageBackend._
import KVStorageBackend._
@@ -85,7 +89,7 @@ private[akka] trait KVStorageBackendAccess extends CommonStorageBackendAccess {
override def getAll(owner: String, keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = {
- getAll(keys.map {
+ getAll(keys.map{
getKey(owner, _)
})
}
@@ -109,8 +113,25 @@ private[akka] object CommonStorageBackend {
val nullMapValueHeader = 0x00.byteValue
val nullMapValue: Array[Byte] = Array(nullMapValueHeader)
val notNullMapValueHeader: Byte = 0xff.byteValue
+ val mapKeySetKeyHeader = 0x00.byteValue
+ val mapKeyHeader = 0xff.byteValue
+ val mapKeysIndex: Array[Byte] = new Array[Byte](1).padTo(1, mapKeySetKeyHeader)
+ val mapKeysWrapperPad: Array[Byte] = new Array[Byte](1).padTo(1, mapKeyHeader)
- def getStoredMapValue(value: Array[Byte]): Array[Byte] = {
+ def wrapMapKey(key: Array[Byte]): Array[Byte] = {
+ val wrapped = new Array[Byte](key.length + mapKeysWrapperPad.length)
+ System.arraycopy(mapKeysWrapperPad, 0, wrapped, 0, mapKeysWrapperPad.length)
+ System.arraycopy(key, 0, wrapped, mapKeysWrapperPad.length, key.length)
+ wrapped
+ }
+
+ def unwrapMapKey(key: Array[Byte]): Array[Byte] = {
+ val unwrapped = new Array[Byte](key.length - mapKeysWrapperPad.length)
+ System.arraycopy(key, mapKeysWrapperPad.length, unwrapped, 0, unwrapped.length)
+ unwrapped
+ }
+
+ def getStoredMapValue(value: Array[Byte]): Array[Byte] = {
value match {
case null => nullMapValue
case value => {
@@ -190,7 +211,9 @@ private[akka] object CommonStorageBackend {
}
private[akka] object KVStorageBackend {
- import CommonStorageBackend._
+
+ import CommonStorageBackend._
+
/**
* Concat the ownerlenght+owner+key+ of owner so owned data will be colocated
* Store the length of owner as first byte to work around the rare case
@@ -214,8 +237,9 @@ private[akka] object KVStorageBackend {
}
private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte], Array[Byte]] with VectorStorageBackend[Array[Byte]] with RefStorageBackend[Array[Byte]] with QueueStorageBackend[Array[Byte]] with Logging {
+
import CommonStorageBackend._
- val mapKeysIndex = IntSerializer.toBytes(-1)
+
val vectorHeadIndex = IntSerializer.toBytes(-1)
val vectorTailIndex = IntSerializer.toBytes(-2)
val queueHeadIndex = IntSerializer.toBytes(-1)
@@ -261,15 +285,15 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
private def getKeyValues(name: String, keys: SortedSet[Array[Byte]]): List[(Array[Byte], Array[Byte])] = {
val all: Map[Array[Byte], Array[Byte]] =
- mapAccess.getAll(name, keys)
+ mapAccess.getAll(name, keys)
var returned = new TreeMap[Array[Byte], Array[Byte]]()(ordering)
- all.foreach {
+ all.foreach{
(entry) => {
entry match {
case (namePlusKey: Array[Byte], value: Array[Byte]) => {
//need to fix here
- returned += mapAccess.decodeKey(name, namePlusKey) -> getMapValueFromStored(value)
+ returned += mapAccess.decodeKey(name, unwrapMapKey(namePlusKey)) -> getMapValueFromStored(value)
}
}
}
@@ -283,7 +307,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
}
def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = {
- val result: Array[Byte] = mapAccess.getValue(name, key)
+ val result: Array[Byte] = mapAccess.getValue(name, wrapMapKey(key))
result match {
case null => None
case _ => Some(getMapValueFromStored(result))
@@ -291,15 +315,16 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
}
def removeMapStorageFor(name: String, key: Array[Byte]) = {
+ val wrapped = wrapMapKey(key)
var keys = getMapKeys(name)
- keys -= key
+ keys -= wrapped
putMapKeys(name, keys)
- mapAccess.delete(name, key)
+ mapAccess.delete(name, wrapped)
}
def removeMapStorageFor(name: String) = {
val keys = getMapKeys(name)
- keys.foreach {
+ keys.foreach{
key =>
mapAccess.delete(name, key)
log.debug("deleted key %s for %s", key, name)
@@ -308,17 +333,19 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
}
def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) = {
- mapAccess.put(name, key, getStoredMapValue(value))
+ val wrapped = wrapMapKey(key)
+ mapAccess.put(name, wrapped, getStoredMapValue(value))
var keys = getMapKeys(name)
- keys += key
+ keys += wrapped
putMapKeys(name, keys)
}
def insertMapStorageEntriesFor(name: String, entries: List[(Array[Byte], Array[Byte])]) = {
- val newKeys = entries.map {
+ val newKeys = entries.map{
case (key, value) => {
- mapAccess.put(name, key, getStoredMapValue(value))
- key
+ val wrapped = wrapMapKey(key)
+ mapAccess.put(name, wrapped, getStoredMapValue(value))
+ wrapped
}
}
var keys = getMapKeys(name)
@@ -334,6 +361,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
SortedSetSerializer.fromBytes(mapAccess.getValue(name, mapKeysIndex, Array.empty[Byte]))
}
+
def getVectorStorageSizeFor(name: String): Int = {
getVectorMetadata(name).size
}
@@ -343,12 +371,12 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
val st = start.getOrElse(0)
var cnt =
- if (finish.isDefined) {
- val f = finish.get
- if (f >= st) (f - st) else count
- } else {
- count
- }
+ if (finish.isDefined) {
+ val f = finish.get
+ if (f >= st) (f - st) else count
+ } else {
+ count
+ }
if (cnt > (mdata.size - st)) {
cnt = mdata.size - st
}
@@ -384,7 +412,7 @@ private[akka] trait CommonStorageBackend extends MapStorageBackend[Array[Byte],
}
def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) = {
- elements.foreach {
+ elements.foreach{
insertVectorStorageEntryFor(name, _)
}
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
index b9e7c3195d..7d74496936 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
@@ -446,7 +446,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
case class LogEntry(index: Option[Int], value: Option[T], op: Op)
// need to override in subclasses e.g. "sameElements" for Array[Byte]
- def equal(v1: T, v2: T): Boolean = v1 == v2
+ // def equal(v1: T, v2: T): Boolean = v1 == v2
val storage: VectorStorageBackend[T]
@@ -614,90 +614,67 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
//Import Ops
import PersistentQueue._
- import scala.collection.immutable.Queue
+
+ case class LogEntry(value: Option[A], op: QueueOp)
// current trail that will be played on commit to the underlying store
- protected val enqueuedNDequeuedEntries = TransactionalVector[(Option[A], QueueOp)]()
- protected val shouldClearOnCommit = Ref[Boolean]()
-
- // local queue that will record all enqueues and dequeues in the current txn
- protected val localQ = Ref[Queue[A]]()
-
- // keeps a pointer to the underlying storage for the enxt candidate to be dequeued
- protected val pickMeForDQ = Ref[Int]()
-
- localQ.swap(Queue.empty)
- pickMeForDQ.swap(0)
+ protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
// to be concretized in subclasses
val storage: QueueStorageBackend[A]
- def commit = {
- enqueuedNDequeuedEntries.toList.foreach {
- e =>
- e._2 match {
- case ENQ => storage.enqueue(uuid, e._1.get)
- case DEQ => storage.dequeue(uuid)
- }
+ def commit = synchronized {
+ for (entry <- appendOnlyTxLog) {
+ (entry: @unchecked) match {
+ case LogEntry(Some(v), ENQ) => storage.enqueue(uuid, v)
+ case LogEntry(_, DEQ) => storage.dequeue(uuid)
+ }
}
- if (shouldClearOnCommit.isDefined && shouldClearOnCommit.get) {
- storage.remove(uuid)
+ appendOnlyTxLog.clear
+ }
+
+ def abort = synchronized {
+ appendOnlyTxLog.clear
+ }
+
+ override def toList = replay
+
+ override def enqueue(elems: A*) = synchronized {
+ register
+ elems.foreach(e => appendOnlyTxLog.add(LogEntry(Some(e), ENQ)))
+ }
+
+ private def replay: List[A] = synchronized {
+ import scala.collection.mutable.ListBuffer
+ var elemsStorage = ListBuffer(storage.peek(uuid, 0, storage.size(uuid)): _*)
+
+ for (entry <- appendOnlyTxLog) {
+ (entry: @unchecked) match {
+ case LogEntry(Some(v), ENQ) => elemsStorage += v
+ case LogEntry(_, DEQ) => elemsStorage = elemsStorage.drop(1)
+ }
}
- enqueuedNDequeuedEntries.clear
- localQ.swap(Queue.empty)
- pickMeForDQ.swap(0)
- shouldClearOnCommit.swap(false)
+ elemsStorage.toList
}
- def abort = {
- enqueuedNDequeuedEntries.clear
- shouldClearOnCommit.swap(false)
- localQ.swap(Queue.empty)
- pickMeForDQ.swap(0)
- }
-
-
- override def enqueue(elems: A*) {
+ override def dequeue: A = synchronized {
register
- elems.foreach(e => {
- enqueuedNDequeuedEntries.add((Some(e), ENQ))
- localQ.get.enqueue(e)
- })
+ val l = replay
+ if (l.isEmpty) throw new NoSuchElementException("trying to dequeue from empty queue")
+ appendOnlyTxLog.add(LogEntry(None, DEQ))
+ l.head
}
- override def dequeue: A = {
+ override def clear = synchronized {
register
- // record for later playback
- enqueuedNDequeuedEntries.add((None, DEQ))
-
- val i = pickMeForDQ.get
- if (i < storage.size(uuid)) {
- // still we can DQ from storage
- pickMeForDQ.swap(i + 1)
- storage.peek(uuid, i, 1)(0)
- } else {
- // check we have transient candidates in localQ for DQ
- if (!localQ.get.isEmpty) {
- val (a, q) = localQ.get.dequeue
- localQ.swap(q)
- a
- } else throw new NoSuchElementException("trying to dequeue from empty queue")
- }
- }
-
- override def clear = {
- register
- shouldClearOnCommit.swap(true)
- localQ.swap(Queue.empty)
- pickMeForDQ.swap(0)
+ appendOnlyTxLog.clear
}
override def size: Int = try {
- storage.size(uuid) + localQ.get.length
+ replay.size
} catch {case e: Exception => 0}
- override def isEmpty: Boolean =
- size == 0
+ override def isEmpty: Boolean = size == 0
override def +=(elem: A) = {
enqueue(elem)
diff --git a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala
index 51f3de40d0..4900ea7695 100644
--- a/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala
+++ b/akka-persistence/akka-persistence-common/src/test/scala/MapStorageBackendTest.scala
@@ -69,7 +69,7 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
it("should insert multiple map storage elements properly") {
val mapName = "insertMultipleTest"
val rand = new Random(3).nextInt(100)
- val entries = (1 to rand).toList.map {
+ val entries = (1 to rand).toList.map{
index =>
(("insertMultipleTestKey" + index).getBytes -> ("insertMutlipleTestValue" + index).getBytes)
}
@@ -97,7 +97,7 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
it("should accurately track the number of key value pairs in a map") {
val mapName = "sizeTest"
val rand = new Random(3).nextInt(100)
- val entries = (1 to rand).toList.map {
+ val entries = (1 to rand).toList.map{
index =>
(("sizeTestKey" + index).getBytes -> ("sizeTestValue" + index).getBytes)
}
@@ -112,7 +112,7 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
val mapName = "allTest"
val rand = new Random(3).nextInt(100)
var entries = new TreeMap[Array[Byte], Array[Byte]]()(ArrayOrdering)
- (1 to rand).foreach {
+ (1 to rand).foreach{
index =>
entries += (("allTestKey" + index).getBytes -> ("allTestValue" + index).getBytes)
}
@@ -124,12 +124,20 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
- val entryMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}}
- val retrievedMap = new HashMap[String, String] ++ entries.map {_ match {case (k, v) => (new String(k), new String(v))}}
+ val entryMap = new HashMap[String, String] ++ entries.map{
+ _ match {
+ case (k, v) => (new String(k), new String(v))
+ }
+ }
+ val retrievedMap = new HashMap[String, String] ++ entries.map{
+ _ match {
+ case (k, v) => (new String(k), new String(v))
+ }
+ }
entryMap should equal(retrievedMap)
- (0 until rand).foreach {
+ (0 until rand).foreach{
i: Int => {
new String(entries.toList(i)._1) should be(new String(retrieved(i)._1))
}
@@ -155,6 +163,14 @@ trait MapStorageBackendTest extends Spec with ShouldMatchers with BeforeAndAfter
storage.getMapStorageSizeFor("nonExistent") should be(0)
}
+ it("should not stomp on the map keyset when a map key of 0xff is used") {
+ val mapName = "keySetStomp"
+ val key = CommonStorageBackend.mapKeysIndex
+ storage.insertMapStorageEntryFor(mapName, key, key)
+ storage.getMapStorageSizeFor(mapName) should be(1)
+ storage.getMapStorageEntryFor(mapName,key).get should be (key)
+ }
+
}
diff --git a/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorage.scala b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorage.scala
new file mode 100644
index 0000000000..3289a33f12
--- /dev/null
+++ b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorage.scala
@@ -0,0 +1,51 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package akka.persistence.memcached
+
+import akka.actor.{newUuid}
+import akka.stm._
+import akka.persistence.common._
+
+
+object MemcachedStorage extends Storage {
+
+ type ElementType = Array[Byte]
+ def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString)
+ def newVector: PersistentVector[ElementType] = newVector(newUuid.toString)
+ def newRef: PersistentRef[ElementType] = newRef(newUuid.toString)
+ override def newQueue: PersistentQueue[ElementType] = newQueue(newUuid.toString)
+
+ def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
+ def getVector(id: String): PersistentVector[ElementType] = newVector(id)
+ def getRef(id: String): PersistentRef[ElementType] = newRef(id)
+ override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id)
+
+ def newMap(id: String): PersistentMap[ElementType, ElementType] = new MemcachedPersistentMap(id)
+ def newVector(id: String): PersistentVector[ElementType] = new MemcachedPersistentVector(id)
+ def newRef(id: String): PersistentRef[ElementType] = new MemcachedPersistentRef(id)
+ override def newQueue(id:String): PersistentQueue[ElementType] = new MemcachedPersistentQueue(id)
+}
+
+
+class MemcachedPersistentMap(id: String) extends PersistentMapBinary {
+ val uuid = id
+ val storage = MemcachedStorageBackend
+}
+
+
+class MemcachedPersistentVector(id: String) extends PersistentVector[Array[Byte]] {
+ val uuid = id
+ val storage = MemcachedStorageBackend
+}
+
+class MemcachedPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
+ val uuid = id
+ val storage = MemcachedStorageBackend
+}
+
+class MemcachedPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
+ val uuid = id
+ val storage = MemcachedStorageBackend
+}
diff --git a/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala
new file mode 100644
index 0000000000..b84b99adbd
--- /dev/null
+++ b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala
@@ -0,0 +1,117 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package akka.persistence.memcached
+
+import akka.persistence.common._
+import akka.config.Config.config
+import net.spy.memcached._
+import net.spy.memcached.transcoders._
+import collection.JavaConversions
+import java.lang.String
+import collection.immutable.{TreeMap, Iterable}
+import java.util.concurrent.{TimeoutException, Future, TimeUnit}
+
+private[akka] object MemcachedStorageBackend extends CommonStorageBackend {
+
+ import CommonStorageBackendAccess._
+ import CommonStorageBackend._
+ import KVStorageBackend._
+ import org.apache.commons.codec.binary.Base64
+
+ val clientAddresses = config.getString("akka.storage.memcached.client.addresses", "localhost:11211")
+ val factory = new KetamaConnectionFactory
+ val client = new MemcachedClient(factory, AddrUtil.getAddresses(clientAddresses))
+ val base64 = new Base64(76, Array.empty[Byte], true)
+
+ def queueAccess = new MemcachedAccess("Q")
+
+ def mapAccess = new MemcachedAccess("M")
+
+ def vectorAccess = new MemcachedAccess("V")
+
+ def refAccess = new MemcachedAccess("R")
+
+ private[akka] class MemcachedAccess(val accessType: String) extends KVStorageBackendAccess {
+
+ val typeBytes = stringToByteArray(accessType)
+
+ private def encodeKey(key: Array[Byte]): Array[Byte] = {
+ val newkey = new Array[Byte](key.length + typeBytes.length)
+ System.arraycopy(key, 0, newkey, 0, key.length)
+ System.arraycopy(typeBytes, 0, newkey, key.length, typeBytes.length)
+ newkey
+ }
+
+ private def keyStr(key: Array[Byte]): String = {
+ base64.encodeToString(key)
+ }
+
+ override def decodeKey(owner: String, key: Array[Byte]) = {
+ val newkey = new Array[Byte](key.length - typeBytes.length)
+ System.arraycopy(key, 0, newkey, 0, newkey.length)
+ super.decodeKey(owner, newkey)
+ }
+
+ def drop() = client.flush()
+
+ def delete(key: Array[Byte]) = {
+ retry(5, (1L, TimeUnit.SECONDS), false) {
+ client.delete(keyStr(encodeKey(key)))
+ }
+ }
+
+ def getAll(keys: Iterable[Array[Byte]]) = {
+ val jmap = client.getBulk(JavaConversions.asList(keys.map{
+ k: Array[Byte] =>
+ keyStr(encodeKey(k))
+ }.toList))
+ JavaConversions.asMap(jmap).map{
+ kv => kv match {
+ case (key, value) => (base64.decode(key) -> value.asInstanceOf[Array[Byte]])
+ }
+ }
+ }
+
+ def getValue(key: Array[Byte], default: Array[Byte]) = {
+ Option(client.get(keyStr(encodeKey(key)))) match {
+ case Some(value) => value.asInstanceOf[Array[Byte]]
+ case None => default
+ }
+ }
+
+ def getValue(key: Array[Byte]) = getValue(key, null)
+
+
+ def put(key: Array[Byte], value: Array[Byte]) = {
+ retry(5, (1L, TimeUnit.SECONDS), true) {
+ client.set(keyStr(encodeKey(key)), Integer.MAX_VALUE, value)
+ }
+
+ }
+
+ private def retry(tries: Int, waitFor: (Long, TimeUnit), tillTrue: Boolean)(action: => Future[java.lang.Boolean]): Unit = {
+ if (tries == 0) {
+ throw new TimeoutException("Exahusted all retries performing an operation on memcached")
+ } else {
+ val future = action
+ try
+ {
+ if (future.get(waitFor._1, waitFor._2).equals(false) && tillTrue) {
+ log.debug("memcached future returned false, operation failed. retrying")
+ retry(tries - 1, waitFor, tillTrue)(action)
+ }
+ } catch {
+ case te: TimeoutException => {
+ log.debug("memcached future timed out. retrying")
+ retry(tries - 1, waitFor, tillTrue)(action)
+ }
+ }
+ }
+ }
+
+ }
+
+
+}
\ No newline at end of file
diff --git a/akka-persistence/akka-persistence-memcached/src/test/scala/MemcachedStorageBackendCompatibilityTest.scala b/akka-persistence/akka-persistence-memcached/src/test/scala/MemcachedStorageBackendCompatibilityTest.scala
new file mode 100644
index 0000000000..6881d25c20
--- /dev/null
+++ b/akka-persistence/akka-persistence-memcached/src/test/scala/MemcachedStorageBackendCompatibilityTest.scala
@@ -0,0 +1,49 @@
+package akka.persistence.memcached
+
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import akka.persistence.common.{QueueStorageBackendTest, VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest}
+
+@RunWith(classOf[JUnitRunner])
+class MemcachedRefStorageBackendTestIntegration extends RefStorageBackendTest {
+ def dropRefs = {
+ MemcachedStorageBackend.refAccess.drop
+ }
+
+
+ def storage = MemcachedStorageBackend
+}
+
+@RunWith(classOf[JUnitRunner])
+class MemcachedMapStorageBackendTestIntegration extends MapStorageBackendTest {
+ def dropMaps = {
+ MemcachedStorageBackend.mapAccess.drop
+ }
+
+
+ def storage = MemcachedStorageBackend
+}
+
+@RunWith(classOf[JUnitRunner])
+class MemcachedVectorStorageBackendTestIntegration extends VectorStorageBackendTest {
+ def dropVectors = {
+ MemcachedStorageBackend.vectorAccess.drop
+ }
+
+
+ def storage = MemcachedStorageBackend
+}
+
+
+@RunWith(classOf[JUnitRunner])
+class MemcachedQueueStorageBackendTestIntegration extends QueueStorageBackendTest {
+ def dropQueues = {
+ MemcachedStorageBackend.queueAccess.drop
+ }
+
+
+ def storage = MemcachedStorageBackend
+}
+
+
diff --git a/akka-persistence/akka-persistence-memcached/src/test/scala/MemcachedTicket343TestIntegration.scala b/akka-persistence/akka-persistence-memcached/src/test/scala/MemcachedTicket343TestIntegration.scala
new file mode 100644
index 0000000000..3a5da2241f
--- /dev/null
+++ b/akka-persistence/akka-persistence-memcached/src/test/scala/MemcachedTicket343TestIntegration.scala
@@ -0,0 +1,23 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package akka.persistence.memcached
+
+
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import akka.persistence.common._
+
+@RunWith(classOf[JUnitRunner])
+class MemcachedTicket343TestIntegration extends Ticket343Test {
+ def dropMapsAndVectors: Unit = {
+ MemcachedStorageBackend.vectorAccess.drop
+ MemcachedStorageBackend.mapAccess.drop
+ }
+
+ def getVector: (String) => PersistentVector[Array[Byte]] = MemcachedStorage.getVector
+
+ def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = MemcachedStorage.getMap
+
+}
diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
index e0ab63abe8..c487741ea6 100644
--- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
+++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala
@@ -22,74 +22,61 @@ import akka.actor.Actor._
*/
case class Balance(accountNo: String)
-case class Debit(accountNo: String, amount: BigInt, failer: ActorRef)
-case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: ActorRef)
-case class Credit(accountNo: String, amount: BigInt)
+case class Debit(accountNo: String, amount: Int, failer: ActorRef)
+case class MultiDebit(accountNo: String, amounts: List[Int], failer: ActorRef)
+case class Credit(accountNo: String, amount: Int)
case object LogSize
class AccountActor extends Transactor {
- import self._
private lazy val accountState = RedisStorage.newMap
- private lazy val txnLog = RedisStorage.newVector
- //timeout = 5000
+ private val txnLog = RedisStorage.newVector
+ self.timeout = 100000
def receive = {
// check balance
case Balance(accountNo) =>
txnLog.add("Balance:%s".format(accountNo).getBytes)
- reply(BigInt(new String(accountState.get(accountNo.getBytes).get)))
+ self.reply(new String(accountState.get(accountNo.getBytes).get).toInt)
// debit amount: can fail
case Debit(accountNo, amount, failer) =>
txnLog.add("Debit:%s %s".format(accountNo, amount.toString).getBytes)
- val m: BigInt =
- accountState.get(accountNo.getBytes) match {
- case Some(bytes) => BigInt(new String(bytes))
- case None => 0
- }
+ val Some(m) = accountState.get(accountNo.getBytes).map(x => (new String(x)).toInt) orElse Some(0)
accountState.put(accountNo.getBytes, (m - amount).toString.getBytes)
if (amount > m)
failer !! "Failure"
- reply(m - amount)
+ self.reply(m - amount)
// many debits: can fail
// demonstrates true rollback even if multiple puts have been done
case MultiDebit(accountNo, amounts, failer) =>
txnLog.add("MultiDebit:%s %s".format(accountNo, amounts.map(_.intValue).foldLeft(0)(_ + _).toString).getBytes)
- val m: BigInt =
- accountState.get(accountNo.getBytes) match {
- case Some(bytes) => BigInt(new String(bytes))
- case None => 0
- }
- var bal: BigInt = 0
+ val Some(m) = accountState.get(accountNo.getBytes).map(x => (new String(x)).toInt) orElse Some(0)
+ var bal = 0
amounts.foreach {amount =>
bal = bal + amount
accountState.put(accountNo.getBytes, (m - bal).toString.getBytes)
}
if (bal > m) failer !! "Failure"
- reply(m - bal)
+ self.reply(m - bal)
// credit amount
case Credit(accountNo, amount) =>
txnLog.add("Credit:%s %s".format(accountNo, amount.toString).getBytes)
- val m: BigInt =
- accountState.get(accountNo.getBytes) match {
- case Some(bytes) => BigInt(new String(bytes))
- case None => 0
- }
+ val Some(m) = accountState.get(accountNo.getBytes).map(x => (new String(x)).toInt) orElse Some(0)
accountState.put(accountNo.getBytes, (m + amount).toString.getBytes)
- reply(m + amount)
+ self.reply(m + amount)
case LogSize =>
- reply(txnLog.length.asInstanceOf[AnyRef])
+ self.reply(txnLog.length.asInstanceOf[AnyRef])
}
}
@serializable class PersistentFailerActor extends Transactor {
- // timeout = 5000
+ self.timeout = 5000
def receive = {
case "Failure" =>
throw new RuntimeException("Expected exception; to test fault-tolerance")
@@ -99,19 +86,19 @@ class AccountActor extends Transactor {
class RedisPersistentActorSpec extends JUnitSuite {
@Test
def testSuccessfulDebit = {
- val bactor = actorOf[AccountActor]
+ val bactor = actorOf(new AccountActor)
bactor.start
- val failer = actorOf[PersistentFailerActor]
+ val failer = actorOf(new PersistentFailerActor)
failer.start
bactor !! Credit("a-123", 5000)
bactor !! Debit("a-123", 3000, failer)
- assertEquals(BigInt(2000), (bactor !! Balance("a-123")).get)
+ assertEquals(2000, (bactor !! Balance("a-123")).get)
bactor !! Credit("a-123", 7000)
- assertEquals(BigInt(9000), (bactor !! Balance("a-123")).get)
+ assertEquals(9000, (bactor !! Balance("a-123")).get)
bactor !! Debit("a-123", 8000, failer)
- assertEquals(BigInt(1000), (bactor !! Balance("a-123")).get)
+ assertEquals(1000, (bactor !! Balance("a-123")).get)
val c = (bactor !! LogSize).as[Int].get
assertTrue(7 == c)
@@ -122,7 +109,7 @@ class RedisPersistentActorSpec extends JUnitSuite {
val bactor = actorOf[AccountActor]
bactor.start
bactor !! Credit("a-123", 5000)
- assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
+ assertEquals(5000, (bactor !! Balance("a-123")).get)
val failer = actorOf[PersistentFailerActor]
failer.start
@@ -131,7 +118,7 @@ class RedisPersistentActorSpec extends JUnitSuite {
fail("should throw exception")
} catch { case e: RuntimeException => {}}
- assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
+ assertEquals(5000, (bactor !! Balance("a-123")).get)
// should not count the failed one
val c = (bactor !! LogSize).as[Int].get
@@ -144,7 +131,7 @@ class RedisPersistentActorSpec extends JUnitSuite {
bactor.start
bactor !! Credit("a-123", 5000)
- assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
+ assertEquals(5000, (bactor !! (Balance("a-123"), 5000)).get)
val failer = actorOf[PersistentFailerActor]
failer.start
@@ -153,7 +140,7 @@ class RedisPersistentActorSpec extends JUnitSuite {
fail("should throw exception")
} catch { case e: RuntimeException => {}}
- assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)
+ assertEquals(5000, (bactor !! (Balance("a-123"), 5000)).get)
// should not count the failed one
val c = (bactor !! LogSize).as[Int].get
diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala
index 828cae0f6a..c85f2913e0 100644
--- a/akka-remote/src/main/scala/remote/RemoteServer.scala
+++ b/akka-remote/src/main/scala/remote/RemoteServer.scala
@@ -68,14 +68,10 @@ object RemoteNode extends RemoteServer
object RemoteServer {
val UUID_PREFIX = "uuid:"
- val SECURE_COOKIE: Option[String] = {
- val cookie = config.getString("akka.remote.secure-cookie", "")
- if (cookie == "") None
- else Some(cookie)
- }
- val REQUIRE_COOKIE = {
+ val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie")
+ val REQUIRE_COOKIE = {
val requireCookie = config.getBool("akka.remote.server.require-cookie", true)
- if (RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException(
+ if (requireCookie && RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException(
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
requireCookie
}
diff --git a/akka-sbt-plugin/project/build.properties b/akka-sbt-plugin/project/build.properties
index 984cdaa83b..fcf840b5f1 100644
--- a/akka-sbt-plugin/project/build.properties
+++ b/akka-sbt-plugin/project/build.properties
@@ -1,5 +1,6 @@
project.name=Akka SBT Plugin
-project.organization=akka
+# need full domain name for publishing to scala-tools
+project.organization=se.scalablesolutions.akka
# mirrors akka version
project.version=1.0-SNAPSHOT
sbt.version=0.7.4
diff --git a/akka-sbt-plugin/src/main/scala/AkkaProject.scala b/akka-sbt-plugin/src/main/scala/AkkaProject.scala
index b89b159c41..2bdc47f2a8 100644
--- a/akka-sbt-plugin/src/main/scala/AkkaProject.scala
+++ b/akka-sbt-plugin/src/main/scala/AkkaProject.scala
@@ -1,13 +1,17 @@
import sbt._
object AkkaRepositories {
- val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
- val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org")
- val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
- val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/")
- val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
- val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
- val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
+ val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository")
+ val CasbahRepo = MavenRepository("Casbah Repo", "http://repo.bumnetworks.com/releases")
+ val CasbahSnapshotRepo = MavenRepository("Casbah Snapshots", "http://repo.bumnetworks.com/snapshots")
+ val ClojarsRepo = MavenRepository("Clojars Repo", "http://clojars.org/repo")
+ val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org")
+ val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/")
+ val JBossRepo = MavenRepository("JBoss Repo", "http://repository.jboss.org/nexus/content/groups/public/")
+ val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2")
+ val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases")
+ val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo")
+ val ZookeeperRepo = MavenRepository("Zookeeper Repo", "http://lilycms.org/maven/maven2/deploy/")
}
trait AkkaBaseProject extends BasicScalaProject {
@@ -21,16 +25,20 @@ trait AkkaBaseProject extends BasicScalaProject {
val aspectwerkzModuleConfig = ModuleConfiguration("org.codehaus.aspectwerkz", AkkaRepo)
val cassandraModuleConfig = ModuleConfiguration("org.apache.cassandra", AkkaRepo)
+ val eaioModuleConfig = ModuleConfiguration("com.eaio", AkkaRepo)
val facebookModuleConfig = ModuleConfiguration("com.facebook", AkkaRepo)
+ val h2lzfModuleConfig = ModuleConfiguration("voldemort.store.compress", AkkaRepo)
+ val hbaseModuleConfig = ModuleConfiguration("org.apache.hbase", AkkaRepo)
val jsr166xModuleConfig = ModuleConfiguration("jsr166x", AkkaRepo)
val netLagModuleConfig = ModuleConfiguration("net.lag", AkkaRepo)
val redisModuleConfig = ModuleConfiguration("com.redis", AkkaRepo)
val sbinaryModuleConfig = ModuleConfiguration("sbinary", AkkaRepo)
val sjsonModuleConfig = ModuleConfiguration("sjson.json", AkkaRepo)
- val voldemortModuleConfig = ModuleConfiguration("voldemort.store.compress", AkkaRepo)
+ val triforkModuleConfig = ModuleConfiguration("com.trifork", AkkaRepo)
val vscaladocModuleConfig = ModuleConfiguration("org.scala-tools", "vscaladoc", "1.1-md-3", AkkaRepo)
val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo)
+ val casbahModuleConfig = ModuleConfiguration("com.novus", CasbahRepo)
val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", JavaNetRepo)
val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo)
val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo)
@@ -42,6 +50,9 @@ trait AkkaBaseProject extends BasicScalaProject {
val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo)
val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausRepo)
val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
+ val timeModuleConfig = ModuleConfiguration("org.scala-tools", "time", CasbahSnapshotRepo)
+ val voldemortModuleConfig = ModuleConfiguration("voldemort", ClojarsRepo)
+ val zookeeperModuleConfig = ModuleConfiguration("org.apache.hadoop.zookeeper", ZookeeperRepo)
}
trait AkkaProject extends AkkaBaseProject {
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index d05382bfa8..16aecf6bc9 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -206,6 +206,12 @@ akka {
port = 8087 #Default Riak Protobuf port
}
}
+
+ memcached {
+ client{
+ addresses = "localhost:11211" #Formatted according to spymemcached "localhost:11211 otherhost:11211" etc..
+ }
+ }
}
camel {
diff --git a/embedded-repo/spy/memcached/2.5/memcached-2.5.jar b/embedded-repo/spy/memcached/2.5/memcached-2.5.jar
new file mode 100644
index 0000000000..87072eaaa0
Binary files /dev/null and b/embedded-repo/spy/memcached/2.5/memcached-2.5.jar differ
diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala
index 44796fc157..19bb49654c 100644
--- a/project/build/AkkaProject.scala
+++ b/project/build/AkkaProject.scala
@@ -276,10 +276,13 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val jetty_mortbay = "org.mortbay.jetty" % "jetty" % "6.1.14" % "test"
//voldemort testing
- lazy val jdom = "org.jdom" % "jdom" % "1.1" % "test"
- lazy val vold_jetty = "org.mortbay.jetty" % "jetty" % "6.1.18" % "test"
- lazy val velocity = "org.apache.velocity" % "velocity" % "1.6.2" % "test"
- lazy val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2" % "test"
+ lazy val jdom = "org.jdom" % "jdom" % "1.1" % "test"
+ lazy val vold_jetty = "org.mortbay.jetty" % "jetty" % "6.1.18" % "test"
+ lazy val velocity = "org.apache.velocity" % "velocity" % "1.6.2" % "test"
+ lazy val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2" % "test"
+
+ //memcached
+ lazy val spymemcached = "spy" % "memcached" % "2.5" % "compile"
}
// -------------------------------------------------------------------------------------------------------------------
@@ -541,6 +544,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
new AkkaRiakProject(_), akka_persistence_common)
lazy val akka_persistence_couchdb = project("akka-persistence-couchdb", "akka-persistence-couchdb",
new AkkaCouchDBProject(_), akka_persistence_common)
+ lazy val akka_persistence_memcached= project("akka-persistence-memcached", "akka-persistence-memcached",
+ new AkkaMemcachedProject(_), akka_persistence_common)
}
// -------------------------------------------------------------------------------------------------------------------
@@ -661,6 +666,15 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
override def testOptions = createTestFilter( _.endsWith("Test"))
}
+ class AkkaMemcachedProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
+ val memcached = Dependencies.spymemcached
+ val commons_codec = Dependencies.commons_codec
+
+ val scalatest = Dependencies.scalatest
+
+ override def testOptions = createTestFilter( _.endsWith("Test"))
+ }
+
// -------------------------------------------------------------------------------------------------------------------
// akka-kernel subproject
// -------------------------------------------------------------------------------------------------------------------