diff --git a/akka-persistence/akka-persistence-cassandra/pom.xml b/akka-persistence/akka-persistence-cassandra/pom.xml index 4bca9ffbac..d8490382d5 100644 --- a/akka-persistence/akka-persistence-cassandra/pom.xml +++ b/akka-persistence/akka-persistence-cassandra/pom.xml @@ -18,6 +18,12 @@ akka-persistence-common ${project.groupId} ${project.version} + + + com.google.code.google-collections + google-collect + + @@ -26,6 +32,50 @@ cassandra 0.5.0 + + org.apache.cassandra + high-scale-lib + 0.5.0 + test + + + org.apache.cassandra + clhm-production + 0.5.0 + test + + + com.google.collections + google-collections + 1.0-rc1 + test + + + commons-collections + commons-collections + 3.2.1 + test + + + commons-lang + commons-lang + 2.4 + test + + + org.slf4j + slf4j-api + 1.5.8 + test + + + org.slf4j + slf4j-log4j12 + 1.5.8 + test + + + log4j log4j diff --git a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraSession.scala b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraSession.scala index 0cbf9fa04b..0b0c5ca43a 100644 --- a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraSession.scala +++ b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraSession.scala @@ -104,6 +104,42 @@ trait CassandraSession extends Closeable with Flushable { // ==================================== // ====== Java-style API names // ==================================== + + def getSlice(key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int) = / (key, columnParent, start, end, ascending, count, consistencyLevel) + + def getSlice(key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int, consistencyLevel: Int) = / (key, columnParent, start, end, ascending, count, consistencyLevel) + + def getSlice(key: String, columnParent: ColumnParent, slicePredicate: SlicePredicate) = / (key, columnParent, slicePredicate) + + def getSlice(key: String, columnParent: ColumnParent, slicePredicate: SlicePredicate, consistencyLevel: Int) = / (key, columnParent, slicePredicate, consistencyLevel) + + + def get(key: String, colPath: ColumnPath) = |(key, colPath) + + def get(key: String, colPath: ColumnPath, consistencyLevel: Int) = |(key, colPath, consistencyLevel) + + def getCount(key: String, columnParent: ColumnParent)= |#(key, columnParent) + + def getCount(key: String, columnParent: ColumnParent, consistencyLevel: Int) = |#(key, columnParent, consistencyLevel) + + + def insert(key: String, colPath: ColumnPath, value: Array[Byte]): Unit = ++|(key, colPath, value) + + def insert(key: String, colPath: ColumnPath, value: Array[Byte], consistencyLevel: Int): Unit = ++|(key, colPath, value, consistencyLevel) + + def insert(key: String, colPath: ColumnPath, value: Array[Byte], timestamp: Long): Unit = ++|(key, colPath, value, timestamp) + + def insert(key: String, colPath: ColumnPath, value: Array[Byte], timestamp: Long, consistencyLevel: Int) = ++|(key, colPath, value, timestamp, consistencyLevel) + + + def insert(key: String, batch: Map[String, List[ColumnOrSuperColumn]]): Unit = ++|(key, batch) + + def insert(key: String, batch: Map[String, List[ColumnOrSuperColumn]], consistencyLevel: Int): Unit = ++|(key, batch, consistencyLevel) + + def remove(key: String, columnPath: ColumnPath, timestamp: Long): Unit = --(key, columnPath, timestamp) + + def remove(key: String, columnPath: ColumnPath, timestamp: Long, consistencyLevel: Int): Unit = --(key, columnPath, timestamp, consistencyLevel) + } class CassandraSessionPool[T <: TTransport]( diff --git a/akka-persistence/akka-persistence-cassandra/src/test/resources/log4j.properties b/akka-persistence/akka-persistence-cassandra/src/test/resources/log4j.properties new file mode 100644 index 0000000000..3c8738fdc3 --- /dev/null +++ b/akka-persistence/akka-persistence-cassandra/src/test/resources/log4j.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +log4j.rootLogger=DEBUG,R + +# rolling log file ("system.log +log4j.appender.R=org.apache.log4j.DailyRollingFileAppender +log4j.appender.R.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.R.layout=org.apache.log4j.PatternLayout +log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n +log4j.appender.R.File=target/logs/system.log diff --git a/akka-persistence/akka-persistence-cassandra/src/test/resources/storage-conf.xml b/akka-persistence/akka-persistence-cassandra/src/test/resources/storage-conf.xml new file mode 100644 index 0000000000..13df178689 --- /dev/null +++ b/akka-persistence/akka-persistence-cassandra/src/test/resources/storage-conf.xml @@ -0,0 +1,337 @@ + + + + + + + + akka + + + false + + + + + + 0.01 + + + + + + + + + + + + org.apache.cassandra.dht.RandomPartitioner + + + + + + org.apache.cassandra.locator.EndPointSnitch + + + org.apache.cassandra.locator.RackUnawareStrategy + + + 1 + + + target/cassandra/commitlog + + target/cassandra/data + + target/cassandra/callouts + target/cassandra/staging + + + + + 127.0.0.1 + + + + + + + 5000 + + 128 + + + + + + localhost + + 7000 + + 7001 + + + localhost + + 9160 + + false + + + + + + + + 64 + + + 32 + 8 + + + 64 + + + 64 + + 0.1 + + 60 + + + 8 + 32 + + + periodic + + 10000 + + + + + 864000 + + + 256 + + diff --git a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala index 2142311f76..690c69625e 100644 --- a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala @@ -6,6 +6,9 @@ import junit.framework.TestCase import org.junit.Test import org.junit.Assert._ +import org.apache.cassandra.service.CassandraDaemon +import org.junit.BeforeClass +import org.junit.Before case class GetMapState(key: String) case object GetVectorState @@ -70,7 +73,10 @@ class CassandraPersistentActor extends Actor { } } -class CassandraPersistentActorSpec extends TestCase { +class CassandraPersistentActorTest { + + @Before + def startCassandra = EmbeddedCassandraService.start @Test def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { @@ -144,4 +150,28 @@ class CassandraPersistentActorSpec extends TestCase { val result: Array[Byte] = (stateful !! GetRefState).get assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state } + } + +import org.apache.cassandra.service.CassandraDaemon +object EmbeddedCassandraService { + + System.setProperty("storage-config", "src/test/resources"); + + val cassandra = new Runnable { + + val cassandraDaemon = new CassandraDaemon + cassandraDaemon.init(null) + + def run = cassandraDaemon.start + + } + + // spawn cassandra in a new thread + val t = new Thread(cassandra) + t.setDaemon(true) + t.start + + def start: Unit = {} + +} \ No newline at end of file diff --git a/embedded-repo/org/apache/cassandra/clhm-production/0.5.0/clhm-production-0.5.0.jar b/embedded-repo/org/apache/cassandra/clhm-production/0.5.0/clhm-production-0.5.0.jar new file mode 100644 index 0000000000..028f505bb9 Binary files /dev/null and b/embedded-repo/org/apache/cassandra/clhm-production/0.5.0/clhm-production-0.5.0.jar differ diff --git a/embedded-repo/org/apache/cassandra/high-scale-lib/0.5.0/high-scale-lib-0.5.0.jar b/embedded-repo/org/apache/cassandra/high-scale-lib/0.5.0/high-scale-lib-0.5.0.jar new file mode 100644 index 0000000000..421a436eed Binary files /dev/null and b/embedded-repo/org/apache/cassandra/high-scale-lib/0.5.0/high-scale-lib-0.5.0.jar differ