diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index f8eb5e0347..732d3fabcd 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -90,7 +90,7 @@ object Actor extends Logging { */ def actor(body: PartialFunction[Any, Unit]): Actor = new Actor() { start - def receive = body + def receive: PartialFunction[Any, Unit] = body } /** @@ -108,8 +108,8 @@ object Actor extends Logging { * * */ - def actor[A](body: => Unit) = { - def handler[A](body: => Unit) = new { + def actor(body: => Unit) = { + def handler(body: => Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = new Actor() { start body @@ -141,8 +141,7 @@ object Actor extends Logging { start send(Spawn) def receive = { - case Spawn => body - case _ => throw new IllegalArgumentException("Actors created with 'actor(body: => Unit)' do not respond to messages.") + case Spawn => body; stop } } } @@ -537,19 +536,13 @@ trait Actor extends TransactionManagement { */ def !![T](message: Any): Option[T] = !![T](message, timeout) - - /* - //FIXME 2.8 def !!!(message: Any)(implicit sender: AnyRef = None): FutureResult = { - def !!!(message: Any)(implicit sender: AnyRef): FutureResult = { + def !!!(message: Any): FutureResult = { if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isRunning) { - val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor]) - else None - postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, from) + postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, None) } else throw new IllegalStateException( "Actor has not been started, you need to invoke 'actor.start' before using it") } - */ /** * This method is evil and has been removed. Use '!!' with a timeout instead. diff --git a/akka-core/src/main/scala/dispatch/Future.scala b/akka-core/src/main/scala/dispatch/Future.scala index a18ac14372..c1e61695b8 100644 --- a/akka-core/src/main/scala/dispatch/Future.scala +++ b/akka-core/src/main/scala/dispatch/Future.scala @@ -8,10 +8,52 @@ package se.scalablesolutions.akka.dispatch import java.util.concurrent.locks.ReentrantLock -import java.util.concurrent.TimeUnit + +import java.util.concurrent.{SynchronousQueue, TimeUnit} class FutureTimeoutException(message: String) extends RuntimeException(message) +object Futures { + def awaitAll(futures: List[FutureResult]): Unit = futures.foreach(_.await) + + def awaitOne(futures: List[FutureResult]): FutureResult = { + var future: Option[FutureResult] = None + do { + future = futures.find(_.isCompleted) + } while (future.isEmpty) + future.get + } + + /* + def awaitEither(f1: FutureResult, f2: FutureResult): Option[Any] = { + import Actor.Sender.Self + import Actor.{spawn, actor} + + case class Result(res: Option[Any]) + val handOff = new SynchronousQueue[Option[Any]] + spawn { + try { + println("f1 await") + f1.await + println("f1 offer") + handOff.offer(f1.result) + } catch {case _ => {}} + } + spawn { + try { + println("f2 await") + f2.await + println("f2 offer") + println("f2 offer: " + f2.result) + handOff.offer(f2.result) + } catch {case _ => {}} + } + Thread.sleep(100) + handOff.take + } +*/ +} + sealed trait FutureResult { def await def awaitBlocking @@ -46,7 +88,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes var start = currentTimeInNanos try { wait = _signal.awaitNanos(wait) - if (wait <= 0) throw new FutureTimeoutException("Future timed out after [" + timeout + "] milliseconds") + if (wait <= 0) throw new FutureTimeoutException("Futures timed out after [" + timeout + "] milliseconds") } catch { case e: InterruptedException => wait = wait - (currentTimeInNanos - start) diff --git a/akka-core/src/test/scala/FutureTest.scala b/akka-core/src/test/scala/FutureTest.scala new file mode 100644 index 0000000000..d073a92557 --- /dev/null +++ b/akka-core/src/test/scala/FutureTest.scala @@ -0,0 +1,111 @@ +package se.scalablesolutions.akka.actor + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import se.scalablesolutions.akka.dispatch.Futures + +class FutureTest extends JUnitSuite { + class TestActor extends Actor { + def receive = { + case "Hello" => + reply("World") + case "NoReply" => {} + case "Failure" => + throw new RuntimeException("expected") + } + } + + @Test def shouldActorReplyResultThroughExplicitFuture = { + val actor = new TestActor + actor.start + val future = actor !!! "Hello" + future.await + assert(future.result.isDefined) + assert("World" === future.result.get) + actor.stop + } + + @Test def shouldActorReplyExceptionThroughExplicitFuture = { + val actor = new TestActor + actor.start + val future = actor !!! "Failure" + future.await + assert(future.exception.isDefined) + assert("expected" === future.exception.get._2.getMessage) + actor.stop + } + + /* + @Test def shouldFutureAwaitEitherLeft = { + val actor1 = new TestActor + actor1.start + val actor2 = new TestActor + actor2.start + val future1 = actor1 !!! "Hello" + val future2 = actor2 !!! "NoReply" + val result = Futures.awaitEither(future1, future2) + assert(result.isDefined) + assert("World" === result.get) + actor1.stop + actor2.stop + } + + @Test def shouldFutureAwaitEitherRight = { + val actor1 = new TestActor + actor1.start + val actor2 = new TestActor + actor2.start + val future1 = actor1 !!! "NoReply" + val future2 = actor2 !!! "Hello" + val result = Futures.awaitEither(future1, future2) + assert(result.isDefined) + assert("World" === result.get) + actor1.stop + actor2.stop + } + */ + @Test def shouldFutureAwaitOneLeft = { + val actor1 = new TestActor + actor1.start + val actor2 = new TestActor + actor2.start + val future1 = actor1 !!! "NoReply" + val future2 = actor2 !!! "Hello" + val result = Futures.awaitOne(List(future1, future2)) + assert(result.result.isDefined) + assert("World" === result.result.get) + actor1.stop + actor2.stop + } + + @Test def shouldFutureAwaitOneRight = { + val actor1 = new TestActor + actor1.start + val actor2 = new TestActor + actor2.start + val future1 = actor1 !!! "Hello" + val future2 = actor2 !!! "NoReply" + val result = Futures.awaitOne(List(future1, future2)) + assert(result.result.isDefined) + assert("World" === result.result.get) + actor1.stop + actor2.stop + } + + @Test def shouldFutureAwaitAll = { + val actor1 = new TestActor + actor1.start + val actor2 = new TestActor + actor2.start + val future1 = actor1 !!! "Hello" + val future2 = actor2 !!! "Hello" + Futures.awaitAll(List(future1, future2)) + assert(future1.result.isDefined) + assert("World" === future1.result.get) + assert(future2.result.isDefined) + assert("World" === future2.result.get) + actor1.stop + actor2.stop + } + +} diff --git a/akka-patterns/src/main/scala/Patterns.scala b/akka-patterns/src/main/scala/Patterns.scala index cc2825145a..b967c07df7 100644 --- a/akka-patterns/src/main/scala/Patterns.scala +++ b/akka-patterns/src/main/scala/Patterns.scala @@ -28,11 +28,6 @@ object Patterns { val seq = actors } - //FIXME 2.8, use default params with CyclicIterator - /*def loadBalancerActor(actors : () => List[Actor]) : Actor = loadBalancerActor( - new CyclicIterator(actors()) - ) */ - def dispatcherActor(routing : PF[Any,Actor], msgTransformer : (Any) => Any) : Actor = new Actor with Dispatcher { override def transform(msg : Any) = msgTransformer(msg) def routes = routing @@ -81,21 +76,4 @@ class CyclicIterator[T](items : List[T]) extends InfiniteIterator[T] { current = nc.tail nc.head } -} - -//Agent -/* -val a = agent(startValue) -a.set(_ + 5) -a.get -a.foreach println(_) -*/ -object Agent { - sealed trait AgentMessage - case class FunMessage[T](f : (T) => T) extends AgentMessage - case class ProcMessage[T](f : (T) => Unit) extends AgentMessage - case class ValMessage[T](t : T) extends AgentMessage -} -sealed private[akka] class Agent[T] { - } \ No newline at end of file diff --git a/akka-persistence/akka-persistence-cassandra/pom.xml b/akka-persistence/akka-persistence-cassandra/pom.xml index 4bca9ffbac..d8490382d5 100644 --- a/akka-persistence/akka-persistence-cassandra/pom.xml +++ b/akka-persistence/akka-persistence-cassandra/pom.xml @@ -18,6 +18,12 @@ akka-persistence-common ${project.groupId} ${project.version} + + + com.google.code.google-collections + google-collect + + @@ -26,6 +32,50 @@ cassandra 0.5.0 + + org.apache.cassandra + high-scale-lib + 0.5.0 + test + + + org.apache.cassandra + clhm-production + 0.5.0 + test + + + com.google.collections + google-collections + 1.0-rc1 + test + + + commons-collections + commons-collections + 3.2.1 + test + + + commons-lang + commons-lang + 2.4 + test + + + org.slf4j + slf4j-api + 1.5.8 + test + + + org.slf4j + slf4j-log4j12 + 1.5.8 + test + + + log4j log4j diff --git a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraSession.scala b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraSession.scala index 0cbf9fa04b..0b0c5ca43a 100644 --- a/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraSession.scala +++ b/akka-persistence/akka-persistence-cassandra/src/main/scala/CassandraSession.scala @@ -104,6 +104,42 @@ trait CassandraSession extends Closeable with Flushable { // ==================================== // ====== Java-style API names // ==================================== + + def getSlice(key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int) = / (key, columnParent, start, end, ascending, count, consistencyLevel) + + def getSlice(key: String, columnParent: ColumnParent, start: Array[Byte], end: Array[Byte], ascending: Boolean, count: Int, consistencyLevel: Int) = / (key, columnParent, start, end, ascending, count, consistencyLevel) + + def getSlice(key: String, columnParent: ColumnParent, slicePredicate: SlicePredicate) = / (key, columnParent, slicePredicate) + + def getSlice(key: String, columnParent: ColumnParent, slicePredicate: SlicePredicate, consistencyLevel: Int) = / (key, columnParent, slicePredicate, consistencyLevel) + + + def get(key: String, colPath: ColumnPath) = |(key, colPath) + + def get(key: String, colPath: ColumnPath, consistencyLevel: Int) = |(key, colPath, consistencyLevel) + + def getCount(key: String, columnParent: ColumnParent)= |#(key, columnParent) + + def getCount(key: String, columnParent: ColumnParent, consistencyLevel: Int) = |#(key, columnParent, consistencyLevel) + + + def insert(key: String, colPath: ColumnPath, value: Array[Byte]): Unit = ++|(key, colPath, value) + + def insert(key: String, colPath: ColumnPath, value: Array[Byte], consistencyLevel: Int): Unit = ++|(key, colPath, value, consistencyLevel) + + def insert(key: String, colPath: ColumnPath, value: Array[Byte], timestamp: Long): Unit = ++|(key, colPath, value, timestamp) + + def insert(key: String, colPath: ColumnPath, value: Array[Byte], timestamp: Long, consistencyLevel: Int) = ++|(key, colPath, value, timestamp, consistencyLevel) + + + def insert(key: String, batch: Map[String, List[ColumnOrSuperColumn]]): Unit = ++|(key, batch) + + def insert(key: String, batch: Map[String, List[ColumnOrSuperColumn]], consistencyLevel: Int): Unit = ++|(key, batch, consistencyLevel) + + def remove(key: String, columnPath: ColumnPath, timestamp: Long): Unit = --(key, columnPath, timestamp) + + def remove(key: String, columnPath: ColumnPath, timestamp: Long, consistencyLevel: Int): Unit = --(key, columnPath, timestamp, consistencyLevel) + } class CassandraSessionPool[T <: TTransport]( diff --git a/akka-persistence/akka-persistence-cassandra/src/test/resources/log4j.properties b/akka-persistence/akka-persistence-cassandra/src/test/resources/log4j.properties new file mode 100644 index 0000000000..3c8738fdc3 --- /dev/null +++ b/akka-persistence/akka-persistence-cassandra/src/test/resources/log4j.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +log4j.rootLogger=DEBUG,R + +# rolling log file ("system.log +log4j.appender.R=org.apache.log4j.DailyRollingFileAppender +log4j.appender.R.DatePattern='.'yyyy-MM-dd-HH +log4j.appender.R.layout=org.apache.log4j.PatternLayout +log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n +log4j.appender.R.File=target/logs/system.log diff --git a/akka-persistence/akka-persistence-cassandra/src/test/resources/storage-conf.xml b/akka-persistence/akka-persistence-cassandra/src/test/resources/storage-conf.xml new file mode 100644 index 0000000000..13df178689 --- /dev/null +++ b/akka-persistence/akka-persistence-cassandra/src/test/resources/storage-conf.xml @@ -0,0 +1,337 @@ + + + + + + + + akka + + + false + + + + + + 0.01 + + + + + + + + + + + + org.apache.cassandra.dht.RandomPartitioner + + + + + + org.apache.cassandra.locator.EndPointSnitch + + + org.apache.cassandra.locator.RackUnawareStrategy + + + 1 + + + target/cassandra/commitlog + + target/cassandra/data + + target/cassandra/callouts + target/cassandra/staging + + + + + 127.0.0.1 + + + + + + + 5000 + + 128 + + + + + + localhost + + 7000 + + 7001 + + + localhost + + 9160 + + false + + + + + + + + 64 + + + 32 + 8 + + + 64 + + + 64 + + 0.1 + + 60 + + + 8 + 32 + + + periodic + + 10000 + + + + + 864000 + + + 256 + + diff --git a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala index 2142311f76..690c69625e 100644 --- a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala @@ -6,6 +6,9 @@ import junit.framework.TestCase import org.junit.Test import org.junit.Assert._ +import org.apache.cassandra.service.CassandraDaemon +import org.junit.BeforeClass +import org.junit.Before case class GetMapState(key: String) case object GetVectorState @@ -70,7 +73,10 @@ class CassandraPersistentActor extends Actor { } } -class CassandraPersistentActorSpec extends TestCase { +class CassandraPersistentActorTest { + + @Before + def startCassandra = EmbeddedCassandraService.start @Test def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { @@ -144,4 +150,28 @@ class CassandraPersistentActorSpec extends TestCase { val result: Array[Byte] = (stateful !! GetRefState).get assertEquals("init", new String(result, 0, result.length, "UTF-8")) // check that state is == init state } + } + +import org.apache.cassandra.service.CassandraDaemon +object EmbeddedCassandraService { + + System.setProperty("storage-config", "src/test/resources"); + + val cassandra = new Runnable { + + val cassandraDaemon = new CassandraDaemon + cassandraDaemon.init(null) + + def run = cassandraDaemon.start + + } + + // spawn cassandra in a new thread + val t = new Thread(cassandra) + t.setDaemon(true) + t.start + + def start: Unit = {} + +} \ No newline at end of file diff --git a/embedded-repo/org/apache/cassandra/clhm-production/0.5.0/clhm-production-0.5.0.jar b/embedded-repo/org/apache/cassandra/clhm-production/0.5.0/clhm-production-0.5.0.jar new file mode 100644 index 0000000000..028f505bb9 Binary files /dev/null and b/embedded-repo/org/apache/cassandra/clhm-production/0.5.0/clhm-production-0.5.0.jar differ diff --git a/embedded-repo/org/apache/cassandra/high-scale-lib/0.5.0/high-scale-lib-0.5.0.jar b/embedded-repo/org/apache/cassandra/high-scale-lib/0.5.0/high-scale-lib-0.5.0.jar new file mode 100644 index 0000000000..421a436eed Binary files /dev/null and b/embedded-repo/org/apache/cassandra/high-scale-lib/0.5.0/high-scale-lib-0.5.0.jar differ