diff --git a/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorageBackend.scala b/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorageBackend.scala index 0ede9979d9..6c6b5eebdc 100644 --- a/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorageBackend.scala +++ b/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorageBackend.scala @@ -6,55 +6,132 @@ package akka.persistence.simpledb import akka.persistence.common._ import akka.config.Config.config -import net.spy.memcached._ -import net.spy.memcached.transcoders._ -import collection.JavaConversions import java.lang.String -import collection.immutable.{TreeMap, Iterable} -import java.util.concurrent.{TimeoutException, Future, TimeUnit} -import org.sublime.amazon.simpleDB.api.SimpleDBAccount +import java.util.{List => JList, ArrayList => JAList} + +import collection.immutable.{HashMap, Iterable} +import collection.mutable.{HashMap => MMap} + +import com.amazonaws.auth.BasicAWSCredentials +import com.amazonaws.services.simpledb.AmazonSimpleDBClient +import com.amazonaws.services.simpledb.model._ +import collection.{JavaConversions, Map} private[akka] object SimpledbStorageBackend extends CommonStorageBackend { - - import CommonStorageBackendAccess._ - import CommonStorageBackend._ - import KVStorageBackend._ import org.apache.commons.codec.binary.Base64 - - val base64 = new Base64(76, Array.empty[Byte], true) + val seperator = "\r\n" + val seperatorBytes = seperator.getBytes("UTF-8") + val base64 = new Base64(1024, seperatorBytes, true) + val base64key = new Base64(1024, Array.empty[Byte], true) val key = config.getString("akka.storage.simpledb.account.key", "foo") val keyId = config.getString("akka.storage.simpledb.account.keyId", "bar") val refDomain = config.getString("akka.storage.simpledb.domain.ref", "ref") val mapDomain = config.getString("akka.storage.simpledb.domain.map", "map") val queueDomain = config.getString("akka.storage.simpledb.domain.queue", "queue") val vectorDomain = config.getString("akka.storage.simpledb.domain.vector", "vector") + val credentials = new BasicAWSCredentials(key, keyId) + val client = new AmazonSimpleDBClient(credentials) - val account = new SimpleDBAccount(key,keyId) - def queueAccess = new SimpledbAccess(queueDomain) + def queueAccess = queue - def mapAccess = new SimpledbAccess(mapDomain) + def mapAccess = map - def vectorAccess = new SimpledbAccess(vectorDomain) + def vectorAccess = vector - def refAccess = new SimpledbAccess(refDomain) + def refAccess = ref - private[akka] class SimpledbAccess(val domainName: String) extends CommonStorageBackendAccess { + val queue = new SimpledbAccess(queueDomain) - val domain = account domain(domainName) - domain create + val map = new SimpledbAccess(mapDomain) - def drop() = domain delete + val vector = new SimpledbAccess(vectorDomain) - def getAll(owner: String, keys: Iterable[Array[Byte]]) = null + val ref = new SimpledbAccess(refDomain) - def delete(owner: String, key: Array[Byte]) = null + private[akka] class SimpledbAccess(val domainName: String) extends KVStorageBackendAccess { + client.createDomain(new CreateDomainRequest(domainName)) - def put(owner: String, key: Array[Byte], value: Array[Byte]) = null + def drop(): Unit = client.deleteDomain(new DeleteDomainRequest(domainName)) - def getValue(owner: String, key: Array[Byte], default: Array[Byte]) = { - domain.item(owner) + def delete(key: Array[Byte]): Unit = client.deleteAttributes(new DeleteAttributesRequest(domainName, encodeAndValidateKey(key))) + + def getAll(keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = { + keys.foldLeft(new HashMap[Array[Byte], Array[Byte]]) { + (map, key) => { + val value = getValue(key) + if (value != null) { + map + (key -> getValue(key)) + } else { + map + } + } + } } + + def getValue(key: Array[Byte], default: Array[Byte]): Array[Byte] = { + val req = new GetAttributesRequest(domainName, encodeAndValidateKey(key)).withConsistentRead(true) + val resp = client.getAttributes(req) + recomposeValue(resp.getAttributes) match { + case Some(value) => value + case None => default + } + } + + def getValue(key: Array[Byte]): Array[Byte] = getValue(key, null) + + def put(key: Array[Byte], value: Array[Byte]): Unit = { + val req = new PutAttributesRequest(domainName, encodeAndValidateKey(key), decomposeValue(value)) + client.putAttributes(req) + } + + def encodeAndValidateKey(key: Array[Byte]): String = { + val keystr = base64key.encodeToString(key) + if (keystr.length > 1024) { + throw new IllegalArgumentException("encoded key was longer than 1024 bytes") + } + keystr + } + + def decomposeValue(value: Array[Byte]): JList[ReplaceableAttribute] = { + val encoded = base64.encodeToString(value) + val strings = encoded.split(seperator) + if (strings.size > 255) { + throw new IllegalArgumentException("The decomposed value is larger than 255K") + } + + val list: JAList[ReplaceableAttribute] = strings.zipWithIndex.foldLeft(new JAList[ReplaceableAttribute]) { + (list, zip) => { + zip match { + case (encode, index) => { + list.add(new ReplaceableAttribute(index.toString, encode, true)) + list + } + } + } + } + list.add(new ReplaceableAttribute("size", list.size.toString, true)) + list + } + + def recomposeValue(atts: JList[Attribute]): Option[Array[Byte]] = { + val itemSnapshot = JavaConversions.asIterable(atts).foldLeft(new MMap[String, String]) { + (map, att) => { + map += (att.getName -> att.getValue) + } + } + itemSnapshot.get("size") match { + case Some(strSize) => { + val size = Integer.parseInt(strSize) + val encoded = (0 until size).map(_.toString).map(itemSnapshot.get(_).get).reduceLeft[String] { + (acc, str) => acc + seperator + str + } + Some(base64.decode(encoded)) + } + case None => None + } + } + } diff --git a/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbStorageBackendCompatibilityTest.scala b/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbStorageBackendCompatibilityTest.scala index 44f1e27f3d..3e2df27160 100644 --- a/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbStorageBackendCompatibilityTest.scala +++ b/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbStorageBackendCompatibilityTest.scala @@ -1,12 +1,49 @@ -package +package akka.persistence.simpledb -/** - * Created by IntelliJ IDEA. - * User: ticktock - * Date: 11/7/10 - * Time: 7:48 PM - * To change this template use File | Settings | File Templates. - */ +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import akka.persistence.common.{QueueStorageBackendTest, VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest} + +@RunWith(classOf[JUnitRunner]) +class SimpledbRefStorageBackendTestIntegration extends RefStorageBackendTest { + def dropRefs = { + SimpledbStorageBackend.refAccess.drop + } + + + def storage = SimpledbStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class SimpledbMapStorageBackendTestIntegration extends MapStorageBackendTest { + def dropMaps = { + SimpledbStorageBackend.mapAccess.drop + } + + + def storage = SimpledbStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class SimpledbVectorStorageBackendTestIntegration extends VectorStorageBackendTest { + def dropVectors = { + SimpledbStorageBackend.vectorAccess.drop + } + + + def storage = SimpledbStorageBackend +} + + +@RunWith(classOf[JUnitRunner]) +class SimpledbQueueStorageBackendTestIntegration extends QueueStorageBackendTest { + def dropQueues = { + SimpledbStorageBackend.queueAccess.drop + } + + + def storage = SimpledbStorageBackend +} + -class SimpledbStorageBackendCompatibilityTest \ No newline at end of file diff --git a/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTicket343TestIntegration.scala b/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTicket343TestIntegration.scala index f8210d507f..6fc1e75fd3 100644 --- a/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTicket343TestIntegration.scala +++ b/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTicket343TestIntegration.scala @@ -1,12 +1,23 @@ -package - - /** - * Created by IntelliJ IDEA. - * User: ticktock - * Date: 11/7/10 - * Time: 7:48 PM - * To change this template use File | Settings | File Templates. + * Copyright (C) 2009-2010 Scalable Solutions AB */ -class SimpledbTicket343TestIntegration \ No newline at end of file +package akka.persistence.simpledb + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import akka.persistence.common._ + +@RunWith(classOf[JUnitRunner]) +class SimpledbTicket343TestIntegration extends Ticket343Test { + def dropMapsAndVectors: Unit = { + SimpledbStorageBackend.vectorAccess.drop + SimpledbStorageBackend.mapAccess.drop + } + + def getVector: (String) => PersistentVector[Array[Byte]] = SimpledbStorage.getVector + + def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = SimpledbStorage.getMap + +} diff --git a/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14-javadoc.jar b/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14-javadoc.jar new file mode 100644 index 0000000000..6ab90957c2 Binary files /dev/null and b/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14-javadoc.jar differ diff --git a/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14-sources.jar b/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14-sources.jar new file mode 100644 index 0000000000..0a97b3fa7b Binary files /dev/null and b/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14-sources.jar differ diff --git a/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14.jar b/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14.jar new file mode 100644 index 0000000000..a11205d066 Binary files /dev/null and b/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14.jar differ diff --git a/embedded-repo/org/sublime/amazon-simpledb/0.9-SNAPSHOT/amazon-simpledb-0.9-SNAPSHOT.jar b/embedded-repo/org/sublime/amazon-simpledb/0.9-SNAPSHOT/amazon-simpledb-0.9-SNAPSHOT.jar deleted file mode 100644 index 971d386f3f..0000000000 Binary files a/embedded-repo/org/sublime/amazon-simpledb/0.9-SNAPSHOT/amazon-simpledb-0.9-SNAPSHOT.jar and /dev/null differ diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 7b7eb227e3..38482ea37f 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -285,7 +285,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val spymemcached = "spy" % "memcached" % "2.5" % "compile" //simpledb - lazy val simpledb = "org.sublime" % "amazon-simpledb" % "0.9-SNAPSHOT" % "compile" + lazy val simpledb = "com.amazonaws" % "aws-java-sdk" % "1.0.14" % "compile" } // -------------------------------------------------------------------------------------------------------------------