Memcached Storage Backend

This commit is contained in:
ticktock 2010-11-01 20:26:28 -04:00
parent c276c621bf
commit 1af208532a
6 changed files with 231 additions and 0 deletions

View file

@ -0,0 +1,51 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.persistence.memcached
import akka.actor.{newUuid}
import akka.stm._
import akka.persistence.common._
object MemcachedStorage extends Storage {
type ElementType = Array[Byte]
def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString)
def newVector: PersistentVector[ElementType] = newVector(newUuid.toString)
def newRef: PersistentRef[ElementType] = newRef(newUuid.toString)
override def newQueue: PersistentQueue[ElementType] = newQueue(newUuid.toString)
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id)
def newMap(id: String): PersistentMap[ElementType, ElementType] = new MemcachedPersistentMap(id)
def newVector(id: String): PersistentVector[ElementType] = new MemcachedPersistentVector(id)
def newRef(id: String): PersistentRef[ElementType] = new MemcachedPersistentRef(id)
override def newQueue(id:String): PersistentQueue[ElementType] = new MemcachedPersistentQueue(id)
}
class MemcachedPersistentMap(id: String) extends PersistentMapBinary {
val uuid = id
val storage = MemcachedStorageBackend
}
class MemcachedPersistentVector(id: String) extends PersistentVector[Array[Byte]] {
val uuid = id
val storage = MemcachedStorageBackend
}
class MemcachedPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
val uuid = id
val storage = MemcachedStorageBackend
}
class MemcachedPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
val uuid = id
val storage = MemcachedStorageBackend
}

View file

@ -0,0 +1,94 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.persistence.memcached
import akka.persistence.common._
import akka.config.Config.config
import net.spy.memcached._
import net.spy.memcached.transcoders._
import collection.JavaConversions
import java.lang.String
import collection.immutable.{TreeMap, Iterable}
import java.util.concurrent.TimeUnit
private[akka] object MemcachedStorageBackend extends CommonStorageBackend {
import CommonStorageBackendAccess._
import CommonStorageBackend._
import KVStorageBackend._
import org.apache.commons.codec.binary.Base64
val clientAddresses = config.getString("akka.storage.memcached.client.addresses", "localhost:11211")
val factory = new ConnectionFactoryBuilder().setTranscoder(new SerializingTranscoder()).setProtocol(ConnectionFactoryBuilder.Protocol.BINARY).build
val client = new MemcachedClient(factory, AddrUtil.getAddresses(clientAddresses))
val base64 = new Base64(76, Array.empty[Byte], true)
def queueAccess = new MemcachedAccess("que")
def mapAccess = new MemcachedAccess("map")
def vectorAccess = new MemcachedAccess("vec")
def refAccess = new MemcachedAccess("ref")
private[akka] class MemcachedAccess(val accessType: String) extends KVStorageBackendAccess {
val typeBytes = stringToByteArray(accessType)
private def encodeKey(key: Array[Byte]): Array[Byte] = {
val newkey = new Array[Byte](key.length + typeBytes.length)
System.arraycopy(key, 0, newkey, 0, key.length)
System.arraycopy(typeBytes, 0, newkey, key.length, typeBytes.length)
newkey
}
private def keyStr(key: Array[Byte]): String = {
base64.encodeToString(key)
}
override def decodeKey(owner: String, key: Array[Byte]) = {
val newkey = new Array[Byte](key.length - typeBytes.length)
System.arraycopy(key, 0, newkey, 0, newkey.length)
super.decodeKey(owner, newkey)
}
def drop() = client.flush()
def delete(key: Array[Byte]) = {
val deleted = client.delete(keyStr(encodeKey(key))).get(5L, TimeUnit.SECONDS);
()
}
def getAll(keys: Iterable[Array[Byte]]) = {
val jmap = client.getBulk(JavaConversions.asList(keys.map{
k: Array[Byte] =>
keyStr(encodeKey(k))
}.toList))
JavaConversions.asMap(jmap).map{
kv => kv match {
case (key, value) => (base64.decode(key) -> value.asInstanceOf[Array[Byte]])
}
}
}
def getValue(key: Array[Byte], default: Array[Byte]) = {
Option(client.get(keyStr(encodeKey(key)))) match {
case Some(value) => value.asInstanceOf[Array[Byte]]
case None => default
}
}
def getValue(key: Array[Byte]) = getValue(key, null)
def put(key: Array[Byte], value: Array[Byte]) = {
client.set(keyStr(encodeKey(key)), Integer.MAX_VALUE, value).get(5L, TimeUnit.SECONDS);
()
}
}
}

View file

@ -0,0 +1,49 @@
package akka.persistence.memcached
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import akka.persistence.common.{QueueStorageBackendTest, VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest}
@RunWith(classOf[JUnitRunner])
class MemcachedRefStorageBackendTestIntegration extends RefStorageBackendTest {
def dropRefs = {
MemcachedStorageBackend.refAccess.drop
}
def storage = MemcachedStorageBackend
}
@RunWith(classOf[JUnitRunner])
class MemcachedMapStorageBackendTestIntegration extends MapStorageBackendTest {
def dropMaps = {
MemcachedStorageBackend.mapAccess.drop
}
def storage = MemcachedStorageBackend
}
@RunWith(classOf[JUnitRunner])
class MemcachedVectorStorageBackendTestIntegration extends VectorStorageBackendTest {
def dropVectors = {
MemcachedStorageBackend.vectorAccess.drop
}
def storage = MemcachedStorageBackend
}
@RunWith(classOf[JUnitRunner])
class MemcachedQueueStorageBackendTestIntegration extends QueueStorageBackendTest {
def dropQueues = {
MemcachedStorageBackend.queueAccess.drop
}
def storage = MemcachedStorageBackend
}

View file

@ -0,0 +1,23 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.persistence.memcached
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import akka.persistence.common._
@RunWith(classOf[JUnitRunner])
class MemcachedTicket343TestIntegration extends Ticket343Test {
def dropMapsAndVectors: Unit = {
MemcachedStorageBackend.vectorAccess.drop
MemcachedStorageBackend.mapAccess.drop
}
def getVector: (String) => PersistentVector[Array[Byte]] = MemcachedStorage.getVector
def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = MemcachedStorage.getMap
}

Binary file not shown.

View file

@ -279,6 +279,9 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val vold_jetty = "org.mortbay.jetty" % "jetty" % "6.1.18" % "test"
lazy val velocity = "org.apache.velocity" % "velocity" % "1.6.2" % "test"
lazy val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2" % "test"
//memcached
lazy val spymemcached = "spy" % "memcached" % "2.5" % "compile"
}
// -------------------------------------------------------------------------------------------------------------------
@ -540,6 +543,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
new AkkaRiakProject(_), akka_persistence_common)
lazy val akka_persistence_couchdb = project("akka-persistence-couchdb", "akka-persistence-couchdb",
new AkkaCouchDBProject(_), akka_persistence_common)
lazy val akka_persistence_memcached= project("akka-persistence-memcached", "akka-persistence-memcached",
new AkkaMemcachedProject(_), akka_persistence_common)
}
// -------------------------------------------------------------------------------------------------------------------
@ -660,6 +665,15 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
override def testOptions = createTestFilter( _.endsWith("Test"))
}
class AkkaMemcachedProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val memcached = Dependencies.spymemcached
val commons_codec = Dependencies.commons_codec
val scalatest = Dependencies.scalatest
override def testOptions = createTestFilter( _.endsWith("Test"))
}
// -------------------------------------------------------------------------------------------------------------------
// akka-kernel subproject
// -------------------------------------------------------------------------------------------------------------------