diff --git a/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala index 940056cbfb..3a4a436e26 100644 --- a/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala +++ b/akka-persistence/akka-persistence-riak/src/main/scala/RiakStorageBackend.scala @@ -470,16 +470,20 @@ MapStorageBackend[Array[Byte], Array[Byte]] with val default: Int = 0xfffffffb def put(key: Array[Byte], value: Array[Byte]) = { - riakClient.store(new RiakObject(bucket, key, value), new RequestMeta().w(quorum).dw(quorum)) + val objs: Array[RiakObject] = riakClient.fetch(bucket, key, quorum) + objs.size match { + case 0 => riakClient.store(new RiakObject(bucket, key, value), new RequestMeta().w(quorum).dw(quorum)) + case _ => riakClient.store(new RiakObject(objs(0).getVclock, bucket, key, value),new RequestMeta().w(quorum).dw(quorum)) + } } def getValue(key: Array[Byte]): Array[Byte] = { val objs = riakClient.fetch(bucket, key, quorum) objs.size match { case 0 => null; - case _ => objs(0).getValue.isEmpty match { + case _ => objs.last.getValue.isEmpty match { case true => null - case false => objs(0).getValue + case false => objs.last.getValue } } } diff --git a/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala b/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala deleted file mode 100644 index e72cc28ba4..0000000000 --- a/akka-persistence/akka-persistence-riak/src/test/scala/RiakStorageBackendTestIntegration.scala +++ /dev/null @@ -1,53 +0,0 @@ -package se.scalablesolutions.akka.persistence.riak - -import org.scalatest.matchers.ShouldMatchers -import org.junit.runner.RunWith -import org.scalatest.junit.JUnitRunner -import se.scalablesolutions.akka.persistence.riak.RiakStorageBackend._ -import se.scalablesolutions.akka.util.{Logging} -import collection.immutable.TreeSet -import scala.None -import org.scalatest.{Spec, FunSuite} -import com.trifork.riak.{RiakObject, RiakClient} -import collection.JavaConversions - -@RunWith(classOf[JUnitRunner]) -class RiakStorageBackendTestIntegration extends Spec with ShouldMatchers with Logging { - - import se.scalablesolutions.akka.persistence.riak.RiakStorageBackend.RiakAccess._ - - describe("successfuly configuring the riak pb client"){ - it("should connect to riak, if riak is running"){ - val riakClient = new RiakClient("localhost"); - val props = riakClient.getServerInfo - JavaConversions.asMap(props) foreach { - _ match { - case (k,v) => log.info("%s -> %s",k,v) - } - } - val maps = riakClient.getBucketProperties("Maps") - debug(maps) - riakClient.listBuckets should not be (null) - riakClient.store(new RiakObject("Maps", "testkey", "testvalue")) - riakClient.fetch("Maps", "testkey").isEmpty should be (false) - riakClient.fetch("Maps", "testkey")(0).getValue.toStringUtf8 should be("testvalue") - //riakClient.delete("Maps","testkey") - RiakStorageBackend.MapClient.delete("testkey") - RiakStorageBackend.VectorClient.quorum - riakClient.fetch("Maps", "testkey").isEmpty should be (true) - riakClient.store(new RiakObject("Maps", "testkey", "testvalue")) - riakClient.fetch("Maps", "testkey").isEmpty should be (false) - riakClient.fetch("Maps", "testkey")(0).getValue.toStringUtf8 should be("testvalue") - //riakClient.delete("Maps","testkey") - RiakStorageBackend.MapClient.delete("testkey") - - riakClient.fetch("Maps", "testkey").isEmpty should be (true) - } - } - - - def debug(ani:Any){ - log.debug("ani") - } - -} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-riak/src/test/scala/RiakTicket343TestIntegration.scala b/akka-persistence/akka-persistence-riak/src/test/scala/RiakTicket343TestIntegration.scala index 9782730803..5fe32e9424 100644 --- a/akka-persistence/akka-persistence-riak/src/test/scala/RiakTicket343TestIntegration.scala +++ b/akka-persistence/akka-persistence-riak/src/test/scala/RiakTicket343TestIntegration.scala @@ -10,7 +10,7 @@ import org.scalatest.junit.JUnitRunner import se.scalablesolutions.akka.persistence.common._ @RunWith(classOf[JUnitRunner]) -class RiakTicket343Test extends Ticket343Test { +class RiakTicket343TestIntegration extends Ticket343Test { def dropMapsAndVectors: Unit = { RiakStorageBackend.VectorClient.drop RiakStorageBackend.MapClient.drop diff --git a/embedded-repo/com/trifork/riak-java-pb-client/1.0-for-akka-by-ticktock/riak-java-pb-client-1.0-for-akka-by-ticktock.jar b/embedded-repo/com/trifork/riak-java-pb-client/1.0-for-akka-by-ticktock/riak-java-pb-client-1.0-for-akka-by-ticktock.jar new file mode 100644 index 0000000000..053eb397c7 Binary files /dev/null and b/embedded-repo/com/trifork/riak-java-pb-client/1.0-for-akka-by-ticktock/riak-java-pb-client-1.0-for-akka-by-ticktock.jar differ diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 44655aab6b..2fb0b11e26 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -86,7 +86,6 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val casbahModuleConfig = ModuleConfiguration("com.novus", CasbahRepo) lazy val timeModuleConfig = ModuleConfiguration("org.scala-tools", "time", CasbahSnapshotRepo) lazy val voldemortModuleConfig = ModuleConfiguration("voldemort", ClojarsRepo) - lazy val riakPBModuleConfig = ModuleConfiguration("org.clojars.mmcgrana", ClojarsRepo) lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast! // ------------------------------------------------------------------------------------------------------------------- @@ -251,7 +250,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2" % "test" //Riak PB Client - lazy val riak_pb_client = "org.clojars.mmcgrana" % "riak-java-pb-client" % "0.1.0-SNAPSHOT" % "compile" + lazy val riak_pb_client = "com.trifork" % "riak-java-pb-client" % "1.0-for-akka-by-ticktock" % "compile" } // ------------------------------------------------------------------------------------------------------------------- @@ -615,6 +614,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { class AkkaRiakProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { val riak_pb = Dependencies.riak_pb_client + val protobuf = Dependencies.protobuf //testing val scalatest = Dependencies.scalatest