diff --git a/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorage.scala b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorage.scala new file mode 100644 index 0000000000..3289a33f12 --- /dev/null +++ b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorage.scala @@ -0,0 +1,51 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.persistence.memcached + +import akka.actor.{newUuid} +import akka.stm._ +import akka.persistence.common._ + + +object MemcachedStorage extends Storage { + + type ElementType = Array[Byte] + def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString) + def newVector: PersistentVector[ElementType] = newVector(newUuid.toString) + def newRef: PersistentRef[ElementType] = newRef(newUuid.toString) + override def newQueue: PersistentQueue[ElementType] = newQueue(newUuid.toString) + + def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) + def getVector(id: String): PersistentVector[ElementType] = newVector(id) + def getRef(id: String): PersistentRef[ElementType] = newRef(id) + override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id) + + def newMap(id: String): PersistentMap[ElementType, ElementType] = new MemcachedPersistentMap(id) + def newVector(id: String): PersistentVector[ElementType] = new MemcachedPersistentVector(id) + def newRef(id: String): PersistentRef[ElementType] = new MemcachedPersistentRef(id) + override def newQueue(id:String): PersistentQueue[ElementType] = new MemcachedPersistentQueue(id) +} + + +class MemcachedPersistentMap(id: String) extends PersistentMapBinary { + val uuid = id + val storage = MemcachedStorageBackend +} + + +class MemcachedPersistentVector(id: String) extends PersistentVector[Array[Byte]] { + val uuid = id + val storage = MemcachedStorageBackend +} + +class MemcachedPersistentRef(id: String) extends PersistentRef[Array[Byte]] { + val uuid = id + val storage = MemcachedStorageBackend +} + +class MemcachedPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] { + val uuid = id + val storage = MemcachedStorageBackend +} diff --git a/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala new file mode 100644 index 0000000000..c05f710e8c --- /dev/null +++ b/akka-persistence/akka-persistence-memcached/src/main/scala/MemcachedStorageBackend.scala @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.persistence.memcached + +import akka.persistence.common._ +import akka.config.Config.config +import net.spy.memcached._ +import net.spy.memcached.transcoders._ +import collection.JavaConversions +import java.lang.String +import collection.immutable.{TreeMap, Iterable} +import java.util.concurrent.TimeUnit + +private[akka] object MemcachedStorageBackend extends CommonStorageBackend { + + import CommonStorageBackendAccess._ + import CommonStorageBackend._ + import KVStorageBackend._ + import org.apache.commons.codec.binary.Base64 + + val clientAddresses = config.getString("akka.storage.memcached.client.addresses", "localhost:11211") + val factory = new ConnectionFactoryBuilder().setTranscoder(new SerializingTranscoder()).setProtocol(ConnectionFactoryBuilder.Protocol.BINARY).build + val client = new MemcachedClient(factory, AddrUtil.getAddresses(clientAddresses)) + val base64 = new Base64(76, Array.empty[Byte], true) + + def queueAccess = new MemcachedAccess("que") + + def mapAccess = new MemcachedAccess("map") + + def vectorAccess = new MemcachedAccess("vec") + + def refAccess = new MemcachedAccess("ref") + + private[akka] class MemcachedAccess(val accessType: String) extends KVStorageBackendAccess { + + val typeBytes = stringToByteArray(accessType) + + private def encodeKey(key: Array[Byte]): Array[Byte] = { + val newkey = new Array[Byte](key.length + typeBytes.length) + System.arraycopy(key, 0, newkey, 0, key.length) + System.arraycopy(typeBytes, 0, newkey, key.length, typeBytes.length) + newkey + } + + private def keyStr(key: Array[Byte]): String = { + base64.encodeToString(key) + } + + override def decodeKey(owner: String, key: Array[Byte]) = { + val newkey = new Array[Byte](key.length - typeBytes.length) + System.arraycopy(key, 0, newkey, 0, newkey.length) + super.decodeKey(owner, newkey) + } + + def drop() = client.flush() + + def delete(key: Array[Byte]) = { + val deleted = client.delete(keyStr(encodeKey(key))).get(5L, TimeUnit.SECONDS); + () + } + + def getAll(keys: Iterable[Array[Byte]]) = { + val jmap = client.getBulk(JavaConversions.asList(keys.map{ + k: Array[Byte] => + keyStr(encodeKey(k)) + }.toList)) + JavaConversions.asMap(jmap).map{ + kv => kv match { + case (key, value) => (base64.decode(key) -> value.asInstanceOf[Array[Byte]]) + } + } + } + + def getValue(key: Array[Byte], default: Array[Byte]) = { + Option(client.get(keyStr(encodeKey(key)))) match { + case Some(value) => value.asInstanceOf[Array[Byte]] + case None => default + } + } + + def getValue(key: Array[Byte]) = getValue(key, null) + + + def put(key: Array[Byte], value: Array[Byte]) = { + client.set(keyStr(encodeKey(key)), Integer.MAX_VALUE, value).get(5L, TimeUnit.SECONDS); + () + } + + } + + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-memcached/src/test/scala/MemcachedStorageBackendCompatibilityTest.scala b/akka-persistence/akka-persistence-memcached/src/test/scala/MemcachedStorageBackendCompatibilityTest.scala new file mode 100644 index 0000000000..6881d25c20 --- /dev/null +++ b/akka-persistence/akka-persistence-memcached/src/test/scala/MemcachedStorageBackendCompatibilityTest.scala @@ -0,0 +1,49 @@ +package akka.persistence.memcached + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import akka.persistence.common.{QueueStorageBackendTest, VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest} + +@RunWith(classOf[JUnitRunner]) +class MemcachedRefStorageBackendTestIntegration extends RefStorageBackendTest { + def dropRefs = { + MemcachedStorageBackend.refAccess.drop + } + + + def storage = MemcachedStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class MemcachedMapStorageBackendTestIntegration extends MapStorageBackendTest { + def dropMaps = { + MemcachedStorageBackend.mapAccess.drop + } + + + def storage = MemcachedStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class MemcachedVectorStorageBackendTestIntegration extends VectorStorageBackendTest { + def dropVectors = { + MemcachedStorageBackend.vectorAccess.drop + } + + + def storage = MemcachedStorageBackend +} + + +@RunWith(classOf[JUnitRunner]) +class MemcachedQueueStorageBackendTestIntegration extends QueueStorageBackendTest { + def dropQueues = { + MemcachedStorageBackend.queueAccess.drop + } + + + def storage = MemcachedStorageBackend +} + + diff --git a/akka-persistence/akka-persistence-memcached/src/test/scala/MemcachedTicket343TestIntegration.scala b/akka-persistence/akka-persistence-memcached/src/test/scala/MemcachedTicket343TestIntegration.scala new file mode 100644 index 0000000000..3a5da2241f --- /dev/null +++ b/akka-persistence/akka-persistence-memcached/src/test/scala/MemcachedTicket343TestIntegration.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.persistence.memcached + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import akka.persistence.common._ + +@RunWith(classOf[JUnitRunner]) +class MemcachedTicket343TestIntegration extends Ticket343Test { + def dropMapsAndVectors: Unit = { + MemcachedStorageBackend.vectorAccess.drop + MemcachedStorageBackend.mapAccess.drop + } + + def getVector: (String) => PersistentVector[Array[Byte]] = MemcachedStorage.getVector + + def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = MemcachedStorage.getMap + +} diff --git a/embedded-repo/spy/memcached/2.5/memcached-2.5.jar b/embedded-repo/spy/memcached/2.5/memcached-2.5.jar new file mode 100644 index 0000000000..87072eaaa0 Binary files /dev/null and b/embedded-repo/spy/memcached/2.5/memcached-2.5.jar differ diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 960d1ac892..44bfaf1197 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -279,6 +279,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val vold_jetty = "org.mortbay.jetty" % "jetty" % "6.1.18" % "test" lazy val velocity = "org.apache.velocity" % "velocity" % "1.6.2" % "test" lazy val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2" % "test" + + //memcached + lazy val spymemcached = "spy" % "memcached" % "2.5" % "compile" } // ------------------------------------------------------------------------------------------------------------------- @@ -540,6 +543,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { new AkkaRiakProject(_), akka_persistence_common) lazy val akka_persistence_couchdb = project("akka-persistence-couchdb", "akka-persistence-couchdb", new AkkaCouchDBProject(_), akka_persistence_common) + lazy val akka_persistence_memcached= project("akka-persistence-memcached", "akka-persistence-memcached", + new AkkaMemcachedProject(_), akka_persistence_common) } // ------------------------------------------------------------------------------------------------------------------- @@ -660,6 +665,15 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { override def testOptions = createTestFilter( _.endsWith("Test")) } + class AkkaMemcachedProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { + val memcached = Dependencies.spymemcached + val commons_codec = Dependencies.commons_codec + + val scalatest = Dependencies.scalatest + + override def testOptions = createTestFilter( _.endsWith("Test")) + } + // ------------------------------------------------------------------------------------------------------------------- // akka-kernel subproject // -------------------------------------------------------------------------------------------------------------------