diff --git a/akka-actor/src/main/scala/actor/Actor.scala b/akka-actor/src/main/scala/actor/Actor.scala index 46297d32d0..dc74028964 100644 --- a/akka-actor/src/main/scala/actor/Actor.scala +++ b/akka-actor/src/main/scala/actor/Actor.scala @@ -413,8 +413,14 @@ trait Actor extends Logging { /** * Changes tha Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler. * Puts the behavior on top of the hotswap stack. + * If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack */ - def become(behavior: Receive): Unit = self.hotswap = self.hotswap.push(behavior) + def become(behavior: Receive, discardOld: Boolean = false) { + if (discardOld) + unbecome + + self.hotswap = self.hotswap.push(behavior) + } /** * Reverts the Actor behavior to the previous one in the hotswap stack. diff --git a/akka-actor/src/main/scala/actor/UntypedActor.scala b/akka-actor/src/main/scala/actor/UntypedActor.scala index e36a7837b6..b31077c8f3 100644 --- a/akka-actor/src/main/scala/actor/UntypedActor.scala +++ b/akka-actor/src/main/scala/actor/UntypedActor.scala @@ -72,7 +72,13 @@ abstract class UntypedActor extends Actor { /** * Java API for become */ - def become(behavior: Procedure[Any]): Unit = super.become { case msg => behavior.apply(msg) } + def become(behavior: Procedure[Any]):Unit = become(behavior,false) + + /* + * Java API for become with optional discardOld + */ + def become(behavior: Procedure[Any], discardOld: Boolean): Unit = + super.become({ case msg => behavior.apply(msg) }, discardOld) @throws(classOf[Exception]) def onReceive(message: Any): Unit diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index cb7f8fab94..4d33bf03ce 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -48,6 +48,9 @@ import java.util.concurrent.TimeUnit */ object Dispatchers extends Logging { val THROUGHPUT = config.getInt("akka.actor.throughput", 5) + val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout"). + map(time => Duration(time, TIME_UNIT)). + getOrElse(Duration(1000,TimeUnit.MILLISECONDS)) val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT) val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time",-1), TIME_UNIT) diff --git a/akka-actor/src/main/scala/dispatch/Future.scala b/akka-actor/src/main/scala/dispatch/Future.scala index 68a3ce4399..2ac412d36d 100644 --- a/akka-actor/src/main/scala/dispatch/Future.scala +++ b/akka-actor/src/main/scala/dispatch/Future.scala @@ -1,5 +1,5 @@ /** - * Copyright (C) 2009-2010 Scalable Solutions AB + * Copyright (C) 2009-2010 Scalable Solutions AB */ package akka.dispatch @@ -10,6 +10,7 @@ import akka.routing.Dispatcher import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.TimeUnit +import akka.japi.Procedure class FutureTimeoutException(message: String) extends AkkaException(message) @@ -34,19 +35,24 @@ object Futures { f } + /** + * (Blocking!) + */ def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await) /** - * Returns the First Future that is completed - * if no Future is completed, awaitOne optionally sleeps "sleepMs" millis and then re-scans + * Returns the First Future that is completed (blocking!) */ - def awaitOne(futures: List[Future[_]], sleepMs: Long = 0): Future[_] = { - var future: Option[Future[_]] = None - do { - future = futures.find(_.isCompleted) - if (sleepMs > 0 && future.isEmpty) Thread.sleep(sleepMs) - } while (future.isEmpty) - future.get + def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf(futures).await + + /** + * Returns a Future to the result of the first future in the list that is completed + */ + def firstCompletedOf(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = { + val futureResult = new DefaultCompletableFuture[Any](timeout) + val fun = (f: Future[_]) => futureResult completeWith f.asInstanceOf[Future[Any]] + for(f <- futures) f onComplete fun + futureResult } /** @@ -55,34 +61,10 @@ object Futures { def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] = in map { f => fun(f.await) } - /* - def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = { - import Actor.Sender.Self - import Actor.{spawn, actor} - - case class Result(res: Option[T]) - val handOff = new SynchronousQueue[Option[T]] - spawn { - try { - println("f1 await") - f1.await - println("f1 offer") - handOff.offer(f1.result) - } catch {case _ => {}} - } - spawn { - try { - println("f2 await") - f2.await - println("f2 offer") - println("f2 offer: " + f2.result) - handOff.offer(f2.result) - } catch {case _ => {}} - } - Thread.sleep(100) - handOff.take - } -*/ + /** + * Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!) + */ + def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = awaitOne(List(f1,f2)).asInstanceOf[Future[T]].resultOrException } sealed trait Future[T] { @@ -100,6 +82,24 @@ sealed trait Future[T] { def exception: Option[Throwable] + def onComplete(func: Future[T] => Unit): Future[T] + + /** + * Returns the current result, throws the exception is one has been raised, else returns None + */ + def resultOrException: Option[T] = { + val r = result + if (r.isDefined) result + else { + val problem = exception + if (problem.isDefined) throw problem.get + else None + } + } + + /* Java API */ + def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(f => proc(f)) + def map[O](f: (T) => O): Future[O] = { val wrapped = this new Future[O] { @@ -110,14 +110,21 @@ sealed trait Future[T] { def timeoutInNanos = wrapped.timeoutInNanos def result: Option[O] = { wrapped.result map f } def exception: Option[Throwable] = wrapped.exception + def onComplete(func: Future[O] => Unit): Future[O] = { wrapped.onComplete(_ => func(this)); this } } } } trait CompletableFuture[T] extends Future[T] { def completeWithResult(result: T) - def completeWithException(exception: Throwable) + def completeWith(other: Future[T]) { + val result = other.result + val exception = other.exception + if (result.isDefined) completeWithResult(result.get) + else if (exception.isDefined) completeWithException(exception.get) + //else TODO how to handle this case? + } } // Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. @@ -133,6 +140,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { private var _completed: Boolean = _ private var _result: Option[T] = None private var _exception: Option[Throwable] = None + private var _listeners: List[Future[T] => Unit] = Nil def await = try { _lock.lock @@ -190,33 +198,67 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] { _lock.unlock } - def completeWithResult(result: T) = try { - _lock.lock - if (!_completed) { - _completed = true - _result = Some(result) - onComplete(result) + def completeWithResult(result: T) { + val notify = try { + _lock.lock + if (!_completed) { + _completed = true + _result = Some(result) + true + } else false + } finally { + _signal.signalAll + _lock.unlock } - } finally { - _signal.signalAll - _lock.unlock + + if (notify) + notifyListeners } - def completeWithException(exception: Throwable) = try { - _lock.lock - if (!_completed) { - _completed = true - _exception = Some(exception) - onCompleteException(exception) + def completeWithException(exception: Throwable) { + val notify = try { + _lock.lock + if (!_completed) { + _completed = true + _exception = Some(exception) + true + } else false + } finally { + _signal.signalAll + _lock.unlock } - } finally { - _signal.signalAll - _lock.unlock + + if (notify) + notifyListeners } - protected def onComplete(result: T) {} + def onComplete(func: Future[T] => Unit): CompletableFuture[T] = { + val notifyNow = try { + _lock.lock + if (!_completed) { + _listeners ::= func + false + } + else + true + } finally { + _lock.unlock + } - protected def onCompleteException(exception: Throwable) {} + if (notifyNow) + notifyListener(func) + + this + } + + private def notifyListeners() { + for(l <- _listeners) + notifyListener(l) + } + + private def notifyListener(func: Future[T] => Unit) { + func(this) + } private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis) } diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index f7eb0ca170..33a7a62af3 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -145,7 +145,11 @@ trait MessageDispatcher extends MailboxFactory with Logging { } } - private[akka] def timeoutMs: Long = 1000 + /** + * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms + * defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second + */ + private[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis /** * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference diff --git a/akka-actor/src/main/scala/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/util/ReflectiveAccess.scala index 51c895728e..3320522ed6 100644 --- a/akka-actor/src/main/scala/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/util/ReflectiveAccess.scala @@ -259,7 +259,7 @@ object ReflectiveAccess extends Logging { Some(ctor.newInstance(args: _*).asInstanceOf[T]) } catch { case e => - log.debug("Could not instantiate class [%s] due to [%s]", clazz.getName, e) + log.warning("Could not instantiate class [%s] due to [%s]", clazz.getName, e.getCause) None } @@ -276,7 +276,7 @@ object ReflectiveAccess extends Logging { Some(ctor.newInstance(args: _*).asInstanceOf[T]) } catch { case e => - log.debug("Could not instantiate class [%s] due to [%s]", fqn, e) + log.warning("Could not instantiate class [%s] due to [%s]", fqn, e.getCause) None } diff --git a/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala index bd0f359398..df088ce89c 100644 --- a/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/dispatch/ActorModelSpec.scala @@ -12,6 +12,7 @@ import akka.actor.Actor._ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit} import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor +import akka.util.Duration object ActorModelSpec { @@ -164,6 +165,18 @@ object ActorModelSpec { assert(stats.restarts.get() === restarts, "Restarts") } + def await(condition: => Boolean)(withinMs: Long, intervalMs: Long = 25): Boolean = try { + val until = System.currentTimeMillis() + withinMs + while(System.currentTimeMillis() <= until) { + try { + if (condition) return true + + Thread.sleep(intervalMs) + } catch { case e: InterruptedException => } + } + false + } + def newTestActor(implicit d: MessageDispatcherInterceptor) = actorOf(new DispatcherActor(d)) } @@ -179,7 +192,7 @@ abstract class ActorModelSpec extends JUnitSuite { a.start assertDispatcher(dispatcher)(starts = 1, stops = 0) a.stop - Thread.sleep(dispatcher.timeoutMs + 100) + await(dispatcher.stops.get == 1)(withinMs = dispatcher.timeoutMs * 5) assertDispatcher(dispatcher)(starts = 1, stops = 1) assertRef(a,dispatcher)( suspensions = 0, @@ -279,7 +292,7 @@ abstract class ActorModelSpec extends JUnitSuite { } for(run <- 1 to 3) { flood(10000) - Thread.sleep(dispatcher.timeoutMs * 2) + await(dispatcher.stops.get == run)(withinMs = 10000) assertDispatcher(dispatcher)(starts = run, stops = run) } } diff --git a/akka-actor/src/test/scala/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/dispatch/FutureSpec.scala index d8b03bb0da..2b456a7cd3 100644 --- a/akka-actor/src/test/scala/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/dispatch/FutureSpec.scala @@ -5,6 +5,7 @@ import org.junit.Test import akka.dispatch.Futures import Actor._ import org.multiverse.api.latches.StandardLatch +import java.util.concurrent.CountDownLatch object FutureSpec { class TestActor extends Actor { @@ -53,7 +54,6 @@ class FutureSpec extends JUnitSuite { actor.stop } - /* // FIXME: implement Futures.awaitEither, and uncomment these two tests @Test def shouldFutureAwaitEitherLeft = { val actor1 = actorOf[TestActor].start @@ -78,7 +78,7 @@ class FutureSpec extends JUnitSuite { actor1.stop actor2.stop } - */ + @Test def shouldFutureAwaitOneLeft = { val actor1 = actorOf[TestActor].start val actor2 = actorOf[TestActor].start diff --git a/akka-camel/src/main/java/akka/camel/consume.java b/akka-camel/src/main/java/akka/camel/consume.java index 90faa14372..ebcc2efd29 100644 --- a/akka-camel/src/main/java/akka/camel/consume.java +++ b/akka-camel/src/main/java/akka/camel/consume.java @@ -24,4 +24,11 @@ public @interface consume { */ public abstract String value(); + /** + * Route definition handler class for customizing route to annotated method. + * The handler class must have a default constructor. + */ + public abstract Class routeDefinitionHandler() + default RouteDefinitionIdentity.class; + } diff --git a/akka-camel/src/main/scala/Consumer.scala b/akka-camel/src/main/scala/Consumer.scala index 336b83e527..8720f34a39 100644 --- a/akka-camel/src/main/scala/Consumer.scala +++ b/akka-camel/src/main/scala/Consumer.scala @@ -4,16 +4,27 @@ package akka.camel -import akka.actor._ - import java.net.InetSocketAddress +import org.apache.camel.{Exchange, Processor} +import org.apache.camel.model.{RouteDefinition, ProcessorDefinition} + +import akka.actor._ +import akka.japi.{Function => JFunction} + /** * Mixed in by Actor implementations that consume message from Camel endpoints. * * @author Martin Krasser */ trait Consumer { self: Actor => + import RouteDefinitionHandler._ + + /** + * The default route definition handler is the identity function + */ + private[camel] var routeDefinitionHandler: RouteDefinitionHandler = identity + /** * Returns the Camel endpoint URI to consume messages from. */ @@ -25,6 +36,18 @@ trait Consumer { self: Actor => * doesn't have any effect on one-way communications (they'll never block). */ def blocking = false + + /** + * Sets the route definition handler for creating a custom route to this consumer instance. + */ + def onRouteDefinition(h: RouteDefinition => ProcessorDefinition[_]): Unit = onRouteDefinition(from(h)) + + /** + * Sets the route definition handler for creating a custom route to this consumer instance. + *

+ * Java API. + */ + def onRouteDefinition(h: RouteDefinitionHandler): Unit = routeDefinitionHandler = h } /** @@ -73,6 +96,42 @@ abstract class RemoteUntypedConsumerActor(address: InetSocketAddress) extends Re def this(host: String, port: Int) = this(new InetSocketAddress(host, port)) } +/** + * A callback handler for route definitions to consumer actors. + * + * @author Martin Krasser + */ +trait RouteDefinitionHandler { + def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_] +} + +/** + * The identity route definition handler. + * + * @author Martin Krasser + * + */ +class RouteDefinitionIdentity extends RouteDefinitionHandler { + def onRouteDefinition(rd: RouteDefinition) = rd +} + +/** + * @author Martin Krasser + */ +object RouteDefinitionHandler { + /** + * Returns the identity route definition handler + */ + val identity = new RouteDefinitionIdentity + + /** + * Created a route definition handler from the given function. + */ + def from(f: RouteDefinition => ProcessorDefinition[_]) = new RouteDefinitionHandler { + def onRouteDefinition(rd: RouteDefinition) = f(rd) + } +} + /** * @author Martin Krasser */ diff --git a/akka-camel/src/main/scala/ConsumerPublisher.scala b/akka-camel/src/main/scala/ConsumerPublisher.scala index 7b7ac491a9..39c4e0bb2f 100644 --- a/akka-camel/src/main/scala/ConsumerPublisher.scala +++ b/akka-camel/src/main/scala/ConsumerPublisher.scala @@ -10,6 +10,7 @@ import java.lang.reflect.Method import java.util.concurrent.CountDownLatch import org.apache.camel.builder.RouteBuilder +import org.apache.camel.model.{ProcessorDefinition, RouteDefinition} import akka.actor._ import akka.camel.component.TypedActorComponent @@ -22,41 +23,35 @@ private[camel] object ConsumerPublisher extends Logging { /** * Creates a route to the registered consumer actor. */ - def handleConsumerRegistered(event: ConsumerRegistered) { - CamelContextManager.mandatoryContext.addRoutes(new ConsumerActorRoute(event.uri, event.uuid, event.blocking)) - log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri)) + def handleConsumerActorRegistered(event: ConsumerActorRegistered) { + CamelContextManager.mandatoryContext.addRoutes(new ConsumerActorRouteBuilder(event)) + log.info("published actor %s at endpoint %s" format (event.actorRef, event.endpointUri)) } /** * Stops the route to the already un-registered consumer actor. */ - def handleConsumerUnregistered(event: ConsumerUnregistered) { - CamelContextManager.mandatoryContext.stopRoute(event.uuid.toString) - log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri)) + def handleConsumerActorUnregistered(event: ConsumerActorUnregistered) { + CamelContextManager.mandatoryContext.stopRoute(event.uuid) + log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.endpointUri)) } /** * Creates a route to an typed actor method. */ def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) { - val targetMethod = event.method.getName - val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod) - - CamelContextManager.typedActorRegistry.put(objectId, event.typedActor) - CamelContextManager.mandatoryContext.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod)) - log.info("published method %s of %s at endpoint %s" format (targetMethod, event.typedActor, event.uri)) + CamelContextManager.typedActorRegistry.put(event.methodUuid, event.typedActor) + CamelContextManager.mandatoryContext.addRoutes(new ConsumerMethodRouteBuilder(event)) + log.info("published method %s of %s at endpoint %s" format (event.methodName, event.typedActor, event.endpointUri)) } /** * Stops the route to the already un-registered consumer actor method. */ def handleConsumerMethodUnregistered(event: ConsumerMethodUnregistered) { - val targetMethod = event.method.getName - val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod) - - CamelContextManager.typedActorRegistry.remove(objectId) - CamelContextManager.mandatoryContext.stopRoute(objectId) - log.info("unpublished method %s of %s from endpoint %s" format (targetMethod, event.typedActor, event.uri)) + CamelContextManager.typedActorRegistry.remove(event.methodUuid) + CamelContextManager.mandatoryContext.stopRoute(event.methodUuid) + log.info("unpublished method %s of %s from endpoint %s" format (event.methodName, event.typedActor, event.endpointUri)) } } @@ -64,8 +59,8 @@ private[camel] object ConsumerPublisher extends Logging { * Actor that publishes consumer actors and typed actor methods at Camel endpoints. * The Camel context used for publishing is obtained via CamelContextManager.context. * This actor accepts messages of type - * akka.camel.ConsumerRegistered, - * akka.camel.ConsumerUnregistered, + * akka.camel.ConsumerActorRegistered, + * akka.camel.ConsumerActorUnregistered, * akka.camel.ConsumerMethodRegistered and * akka.camel.ConsumerMethodUnregistered. * @@ -78,12 +73,12 @@ private[camel] class ConsumerPublisher extends Actor { @volatile private var unregistrationLatch = new CountDownLatch(0) protected def receive = { - case r: ConsumerRegistered => { - handleConsumerRegistered(r) + case r: ConsumerActorRegistered => { + handleConsumerActorRegistered(r) registrationLatch.countDown } - case u: ConsumerUnregistered => { - handleConsumerUnregistered(u) + case u: ConsumerActorUnregistered => { + handleConsumerActorUnregistered(u) unregistrationLatch.countDown } case mr: ConsumerMethodRegistered => { @@ -117,7 +112,7 @@ private[camel] case class SetExpectedUnregistrationCount(num: Int) * * @author Martin Krasser */ -private[camel] abstract class ConsumerRoute(endpointUri: String, id: String) extends RouteBuilder { +private[camel] abstract class ConsumerRouteBuilder(endpointUri: String, id: String) extends RouteBuilder { // TODO: make conversions configurable private val bodyConversions = Map( "file" -> classOf[InputStream] @@ -125,39 +120,39 @@ private[camel] abstract class ConsumerRoute(endpointUri: String, id: String) ext def configure = { val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..." - bodyConversions.get(schema) match { - case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(targetUri) - case None => from(endpointUri).routeId(id).to(targetUri) - } + val cnvopt = bodyConversions.get(schema) + + onRouteDefinition(startRouteDefinition(cnvopt)).to(targetUri) } + protected def routeDefinitionHandler: RouteDefinitionHandler protected def targetUri: String + + private def onRouteDefinition(rd: RouteDefinition) = routeDefinitionHandler.onRouteDefinition(rd) + private def startRouteDefinition(bodyConversion: Option[Class[_]]): RouteDefinition = bodyConversion match { + case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz) + case None => from(endpointUri).routeId(id) + } } /** * Defines the route to a (untyped) consumer actor. * - * @param endpointUri endpoint URI of the (untyped) consumer actor - * @param uuid actor uuid - * @param blocking true for blocking in-out exchanges, false otherwise - * * @author Martin Krasser */ -private[camel] class ConsumerActorRoute(endpointUri: String, uuid: Uuid, blocking: Boolean) extends ConsumerRoute(endpointUri, uuid.toString) { - protected override def targetUri = "actor:uuid:%s?blocking=%s" format (uuid, blocking) +private[camel] class ConsumerActorRouteBuilder(event: ConsumerActorRegistered) extends ConsumerRouteBuilder(event.endpointUri, event.uuid) { + protected def routeDefinitionHandler: RouteDefinitionHandler = event.routeDefinitionHandler + protected def targetUri = "actor:uuid:%s?blocking=%s" format (event.uuid, event.blocking) } /** * Defines the route to a typed actor method. * - * @param endpointUri endpoint URI of the consumer actor method - * @param id typed actor identifier - * @param method name of the method to invoke. - * * @author Martin Krasser */ -private[camel] class ConsumerMethodRoute(val endpointUri: String, id: String, method: String) extends ConsumerRoute(endpointUri, id) { - protected override def targetUri = "%s:%s?method=%s" format (TypedActorComponent.InternalSchema, id, method) +private[camel] class ConsumerMethodRouteBuilder(event: ConsumerMethodRegistered) extends ConsumerRouteBuilder(event.endpointUri, event.methodUuid) { + protected def routeDefinitionHandler: RouteDefinitionHandler = event.routeDefinitionHandler + protected def targetUri = "%s:%s?method=%s" format (TypedActorComponent.InternalSchema, event.methodUuid, event.methodName) } /** @@ -179,9 +174,9 @@ private[camel] class PublishRequestor extends Actor { protected def receive = { case ActorRegistered(actor) => - for (event <- ConsumerRegistered.forConsumer(actor)) deliverCurrentEvent(event) + for (event <- ConsumerActorRegistered.forConsumer(actor)) deliverCurrentEvent(event) case ActorUnregistered(actor) => - for (event <- ConsumerUnregistered.forConsumer(actor)) deliverCurrentEvent(event) + for (event <- ConsumerActorUnregistered.forConsumer(actor)) deliverCurrentEvent(event) case AspectInitRegistered(proxy, init) => for (event <- ConsumerMethodRegistered.forConsumer(proxy, init)) deliverCurrentEvent(event) case AspectInitUnregistered(proxy, init) => @@ -214,71 +209,72 @@ private[camel] case class PublishRequestorInit(consumerPublisher: ActorRef) /** * A consumer (un)registration event. - * - * @author Martin Krasser */ private[camel] sealed trait ConsumerEvent /** - * Event indicating that a consumer actor has been registered at the actor registry. - * - * @param actorRef actor reference - * @param uri endpoint URI of the consumer actor - * @param uuid actor uuid - * @param blocking true for blocking in-out exchanges, false otherwise - * - * @author Martin Krasser + * A consumer actor (un)registration event. */ -private[camel] case class ConsumerRegistered(actorRef: ActorRef, uri: String, uuid: Uuid, blocking: Boolean) extends ConsumerEvent +private[camel] trait ConsumerActorEvent extends ConsumerEvent { + val actorRef: ActorRef + val actor: Consumer + + val uuid = actorRef.uuid.toString + val endpointUri = actor.endpointUri + val blocking = actor.blocking + val routeDefinitionHandler = actor.routeDefinitionHandler +} + +/** + * A consumer method (un)registration event. + */ +private[camel] trait ConsumerMethodEvent extends ConsumerEvent { + val typedActor: AnyRef + val init: AspectInit + val method: Method + + val uuid = init.actorRef.uuid.toString + val methodName = method.getName + val methodUuid = "%s_%s" format (uuid, methodName) + + lazy val routeDefinitionHandler = consumeAnnotation.routeDefinitionHandler.newInstance + lazy val consumeAnnotation = method.getAnnotation(classOf[consume]) + lazy val endpointUri = consumeAnnotation.value +} + +/** + * Event indicating that a consumer actor has been registered at the actor registry. + */ +private[camel] case class ConsumerActorRegistered(actorRef: ActorRef, actor: Consumer) extends ConsumerActorEvent /** * Event indicating that a consumer actor has been unregistered from the actor registry. - * - * @param actorRef actor reference - * @param uri endpoint URI of the consumer actor - * @param uuid actor uuid - * - * @author Martin Krasser */ -private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String, uuid: Uuid) extends ConsumerEvent +private[camel] case class ConsumerActorUnregistered(actorRef: ActorRef, actor: Consumer) extends ConsumerActorEvent /** * Event indicating that an typed actor proxy has been created for a typed actor. For each @consume * annotated typed actor method a separate instance of this class is created. - * - * @param typedActor typed actor (proxy). - * @param init - * @param uri endpoint URI of the typed actor method - * @param method method to be published. - * - * @author Martin Krasser */ -private[camel] case class ConsumerMethodRegistered(typedActor: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent +private[camel] case class ConsumerMethodRegistered(typedActor: AnyRef, init: AspectInit, method: Method) extends ConsumerMethodEvent /** * Event indicating that an typed actor has been stopped. For each @consume * annotated typed object method a separate instance of this class is created. - * - * @param typedActor typed actor (proxy). - * @param init - * @param uri endpoint URI of the typed actor method - * @param method method to be un-published. - * - * @author Martin Krasser */ -private[camel] case class ConsumerMethodUnregistered(typedActor: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent +private[camel] case class ConsumerMethodUnregistered(typedActor: AnyRef, init: AspectInit, method: Method) extends ConsumerMethodEvent /** * @author Martin Krasser */ -private[camel] object ConsumerRegistered { +private[camel] object ConsumerActorRegistered { /** - * Creates an ConsumerRegistered event message for a consumer actor or None if + * Creates an ConsumerActorRegistered event message for a consumer actor or None if * actorRef is not a consumer actor. */ - def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = { - Consumer.forConsumer[ConsumerRegistered](actorRef) { - target => ConsumerRegistered(actorRef, target.endpointUri, actorRef.uuid, target.blocking) + def forConsumer(actorRef: ActorRef): Option[ConsumerActorRegistered] = { + Consumer.forConsumer[ConsumerActorRegistered](actorRef) { + actor => ConsumerActorRegistered(actorRef, actor) } } } @@ -286,14 +282,14 @@ private[camel] object ConsumerRegistered { /** * @author Martin Krasser */ -private[camel] object ConsumerUnregistered { +private[camel] object ConsumerActorUnregistered { /** - * Creates an ConsumerUnregistered event message for a consumer actor or None if + * Creates an ConsumerActorUnregistered event message for a consumer actor or None if * actorRef is not a consumer actor. */ - def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = { - Consumer.forConsumer[ConsumerUnregistered](actorRef) { - target => ConsumerUnregistered(actorRef, target.endpointUri, actorRef.uuid) + def forConsumer(actorRef: ActorRef): Option[ConsumerActorUnregistered] = { + Consumer.forConsumer[ConsumerActorUnregistered](actorRef) { + actor => ConsumerActorUnregistered(actorRef, actor) } } } @@ -333,7 +329,7 @@ private[camel] object ConsumerMethodRegistered { */ def forConsumer(typedActor: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = { ConsumerMethod.forConsumer(typedActor, init) { - m => ConsumerMethodRegistered(typedActor, init, m.getAnnotation(classOf[consume]).value, m) + m => ConsumerMethodRegistered(typedActor, init, m) } } } @@ -349,7 +345,7 @@ private[camel] object ConsumerMethodUnregistered { */ def forConsumer(typedActor: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = { ConsumerMethod.forConsumer(typedActor, init) { - m => ConsumerMethodUnregistered(typedActor, init, m.getAnnotation(classOf[consume]).value, m) + m => ConsumerMethodUnregistered(typedActor, init, m) } } } diff --git a/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java new file mode 100644 index 0000000000..c34ce0cc2e --- /dev/null +++ b/akka-camel/src/test/java/akka/camel/ConsumerJavaTestBase.java @@ -0,0 +1,59 @@ +package akka.camel; + +import akka.actor.ActorRegistry; +import akka.actor.TypedActor; +import akka.actor.UntypedActor; +import akka.japi.SideEffect; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static akka.camel.CamelContextManager.*; +import static akka.camel.CamelServiceManager.*; + +import static org.junit.Assert.*; + +/** + * @author Martin Krasser + */ +public class ConsumerJavaTestBase { + + private SampleErrorHandlingTypedConsumer consumer; + + @BeforeClass + public static void setUpBeforeClass() { + startCamelService(); + } + + @AfterClass + public static void tearDownAfterClass() { + stopCamelService(); + ActorRegistry.shutdownAll(); + } + + @Test + public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() { + getMandatoryService().awaitEndpointActivation(1, new SideEffect() { + public void apply() { + UntypedActor.actorOf(SampleErrorHandlingConsumer.class).start(); + } + }); + String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java", "hello", String.class); + assertEquals("error: hello", result); + } + + @Test + public void shouldHandleExceptionThrownByTypedActorAndGenerateCustomResponse() { + getMandatoryService().awaitEndpointActivation(1, new SideEffect() { + public void apply() { + consumer = TypedActor.newInstance( + SampleErrorHandlingTypedConsumer.class, + SampleErrorHandlingTypedConsumerImpl.class); + } + }); + String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java-typed", "hello", String.class); + assertEquals("error: hello", result); + } + +} diff --git a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java new file mode 100644 index 0000000000..4e35d4e6ab --- /dev/null +++ b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingConsumer.java @@ -0,0 +1,34 @@ +package akka.camel; + +import org.apache.camel.builder.Builder; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.RouteDefinition; + +/** + * @author Martin Krasser + */ +public class SampleErrorHandlingConsumer extends UntypedConsumerActor { + + public String getEndpointUri() { + return "direct:error-handler-test-java"; + } + + public boolean isBlocking() { + return true; + } + + public void preStart() { + onRouteDefinition(new RouteDefinitionHandler() { + public ProcessorDefinition onRouteDefinition(RouteDefinition rd) { + return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end(); + } + }); + } + + public void onReceive(Object message) throws Exception { + Message msg = (Message)message; + String body = msg.getBodyAs(String.class); + throw new Exception(String.format("error: %s", body)); + } + +} diff --git a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingTypedConsumer.java b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingTypedConsumer.java new file mode 100644 index 0000000000..d8a8c79440 --- /dev/null +++ b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingTypedConsumer.java @@ -0,0 +1,11 @@ +package akka.camel; + +/** + * @author Martin Krasser + */ +public interface SampleErrorHandlingTypedConsumer { + + @consume(value="direct:error-handler-test-java-typed", routeDefinitionHandler=SampleRouteDefinitionHandler.class) + String willFail(String s); + +} diff --git a/akka-camel/src/test/java/akka/camel/SampleErrorHandlingTypedConsumerImpl.java b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingTypedConsumerImpl.java new file mode 100644 index 0000000000..cfa42a7521 --- /dev/null +++ b/akka-camel/src/test/java/akka/camel/SampleErrorHandlingTypedConsumerImpl.java @@ -0,0 +1,14 @@ +package akka.camel; + +import akka.actor.TypedActor; + +/** + * @author Martin Krasser + */ +public class SampleErrorHandlingTypedConsumerImpl extends TypedActor implements SampleErrorHandlingTypedConsumer { + + public String willFail(String s) { + throw new RuntimeException(String.format("error: %s", s)); + } + +} diff --git a/akka-camel/src/test/java/akka/camel/SampleRouteDefinitionHandler.java b/akka-camel/src/test/java/akka/camel/SampleRouteDefinitionHandler.java new file mode 100644 index 0000000000..f1a99aa7d4 --- /dev/null +++ b/akka-camel/src/test/java/akka/camel/SampleRouteDefinitionHandler.java @@ -0,0 +1,14 @@ +package akka.camel; + +import org.apache.camel.builder.Builder; +import org.apache.camel.model.ProcessorDefinition; +import org.apache.camel.model.RouteDefinition; + +/** + * @author Martin Krasser + */ +public class SampleRouteDefinitionHandler implements RouteDefinitionHandler { + public ProcessorDefinition onRouteDefinition(RouteDefinition rd) { + return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end(); + } +} diff --git a/akka-camel/src/test/scala/ConsumerJavaTest.scala b/akka-camel/src/test/scala/ConsumerJavaTest.scala new file mode 100644 index 0000000000..48741dda96 --- /dev/null +++ b/akka-camel/src/test/scala/ConsumerJavaTest.scala @@ -0,0 +1,5 @@ +package akka.camel + +import org.scalatest.junit.JUnitSuite + +class ConsumerJavaTest extends ConsumerJavaTestBase with JUnitSuite \ No newline at end of file diff --git a/akka-camel/src/test/scala/ConsumerRegisteredTest.scala b/akka-camel/src/test/scala/ConsumerRegisteredTest.scala index d7bf5fc2c3..e85c5f905c 100644 --- a/akka-camel/src/test/scala/ConsumerRegisteredTest.scala +++ b/akka-camel/src/test/scala/ConsumerRegisteredTest.scala @@ -2,46 +2,47 @@ package akka.camel import org.junit.Test import org.scalatest.junit.JUnitSuite - -import akka.actor.{Actor, UntypedActor} +import akka.actor.{ActorRef, Actor, UntypedActor} class ConsumerRegisteredTest extends JUnitSuite { import ConsumerRegisteredTest._ @Test def shouldCreateSomeNonBlockingPublishRequestFromConsumer = { val c = Actor.actorOf[ConsumerActor1] - val event = ConsumerRegistered.forConsumer(c) - assert(event === Some(ConsumerRegistered(c, "mock:test1", c.uuid, false))) + val event = ConsumerActorRegistered.forConsumer(c) + assert(event === Some(ConsumerActorRegistered(c, consumerOf(c)))) } @Test def shouldCreateSomeBlockingPublishRequestFromConsumer = { val c = Actor.actorOf[ConsumerActor2] - val event = ConsumerRegistered.forConsumer(c) - assert(event === Some(ConsumerRegistered(c, "mock:test2", c.uuid, true))) + val event = ConsumerActorRegistered.forConsumer(c) + assert(event === Some(ConsumerActorRegistered(c, consumerOf(c)))) } @Test def shouldCreateNoneFromConsumer = { - val event = ConsumerRegistered.forConsumer(Actor.actorOf[PlainActor]) + val event = ConsumerActorRegistered.forConsumer(Actor.actorOf[PlainActor]) assert(event === None) } @Test def shouldCreateSomeNonBlockingPublishRequestFromUntypedConsumer = { val uc = UntypedActor.actorOf(classOf[SampleUntypedConsumer]) - val event = ConsumerRegistered.forConsumer(uc) - assert(event === Some(ConsumerRegistered(uc, "direct:test-untyped-consumer", uc.uuid, false))) + val event = ConsumerActorRegistered.forConsumer(uc) + assert(event === Some(ConsumerActorRegistered(uc, consumerOf(uc)))) } @Test def shouldCreateSomeBlockingPublishRequestFromUntypedConsumer = { val uc = UntypedActor.actorOf(classOf[SampleUntypedConsumerBlocking]) - val event = ConsumerRegistered.forConsumer(uc) - assert(event === Some(ConsumerRegistered(uc, "direct:test-untyped-consumer-blocking", uc.uuid, true))) + val event = ConsumerActorRegistered.forConsumer(uc) + assert(event === Some(ConsumerActorRegistered(uc, consumerOf(uc)))) } @Test def shouldCreateNoneFromUntypedConsumer = { val a = UntypedActor.actorOf(classOf[SampleUntypedActor]) - val event = ConsumerRegistered.forConsumer(a) + val event = ConsumerActorRegistered.forConsumer(a) assert(event === None) } + + private def consumerOf(ref: ActorRef) = ref.actor.asInstanceOf[Consumer] } object ConsumerRegisteredTest { diff --git a/akka-camel/src/test/scala/ConsumerTest.scala b/akka-camel/src/test/scala/ConsumerScalaTest.scala similarity index 75% rename from akka-camel/src/test/scala/ConsumerTest.scala rename to akka-camel/src/test/scala/ConsumerScalaTest.scala index 29a2697ff5..ddbe757a3f 100644 --- a/akka-camel/src/test/scala/ConsumerTest.scala +++ b/akka-camel/src/test/scala/ConsumerScalaTest.scala @@ -3,7 +3,8 @@ package akka.camel import java.util.concurrent.{TimeoutException, CountDownLatch, TimeUnit} import org.apache.camel.CamelExecutionException -import org.apache.camel.builder.RouteBuilder +import org.apache.camel.builder.Builder +import org.apache.camel.model.RouteDefinition import org.scalatest.{BeforeAndAfterAll, WordSpec} import org.scalatest.matchers.MustMatchers @@ -13,9 +14,9 @@ import akka.actor._ /** * @author Martin Krasser */ -class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { +class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatchers { import CamelContextManager.mandatoryTemplate - import ConsumerTest._ + import ConsumerScalaTest._ var service: CamelService = _ @@ -171,9 +172,34 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers { } } } + + "A responding, blocking consumer" when { + "activated with a custom error handler" must { + "handle thrown exceptions by generating a custom response" in { + service.awaitEndpointActivation(1) { + actorOf[ErrorHandlingConsumer].start + } must be (true) + mandatoryTemplate.requestBody("direct:error-handler-test", "hello") must equal ("error: hello") + + } + } + "activated with a custom redelivery handler" must { + "handle thrown exceptions by redelivering the initial message" in { + service.awaitEndpointActivation(1) { + actorOf[RedeliveringConsumer].start + } must be (true) + mandatoryTemplate.requestBody("direct:redelivery-test", "hello") must equal ("accepted: hello") + + } + } + } } -object ConsumerTest { +object ConsumerScalaTest { + trait BlockingConsumer extends Consumer { self: Actor => + override def blocking = true + } + class TestConsumer(uri: String) extends Actor with Consumer { def endpointUri = uri protected def receive = { @@ -181,6 +207,53 @@ object ConsumerTest { } } + class TestBlocker(uri: String) extends Actor with BlockingConsumer { + self.timeout = 1000 + def endpointUri = uri + protected def receive = { + case msg: Message => { /* do not reply */ } + } + } + + class ErrorHandlingConsumer extends Actor with BlockingConsumer { + def endpointUri = "direct:error-handler-test" + + onRouteDefinition {rd: RouteDefinition => + rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end + } + + protected def receive = { + case msg: Message => throw new Exception("error: %s" format msg.body) + } + } + + class RedeliveringConsumer extends Actor with BlockingConsumer { + def endpointUri = "direct:redelivery-test" + + onRouteDefinition {rd: RouteDefinition => + rd.onException(classOf[Exception]).maximumRedeliveries(1).end + } + + // + // first message to this actor is not valid and will be rejected + // + + var valid = false + + protected def receive = { + case msg: Message => try { + respondTo(msg) + } finally { + valid = true + } + } + + private def respondTo(msg: Message) = + if (valid) self.reply("accepted: %s" format msg.body) + else throw new Exception("rejected: %s" format msg.body) + + } + trait TestTypedConsumer { @consume("direct:publish-test-3") def foo(s: String): String @@ -193,12 +266,6 @@ object ConsumerTest { def bar(s: String) = "bar: %s" format s } - class TestBlocker(uri: String) extends Actor with Consumer { - self.timeout = 1000 - def endpointUri = uri - override def blocking = true - protected def receive = { - case msg: Message => { /* do not reply */ } - } - } + + } diff --git a/akka-camel/src/test/scala/PublishRequestorTest.scala b/akka-camel/src/test/scala/PublishRequestorTest.scala index 30e2782132..8578abef60 100644 --- a/akka-camel/src/test/scala/PublishRequestorTest.scala +++ b/akka-camel/src/test/scala/PublishRequestorTest.scala @@ -40,9 +40,9 @@ class PublishRequestorTest extends JUnitSuite { val obj = TypedActor.newInstance(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl]) assert(latch.await(5000, TimeUnit.MILLISECONDS)) val event = (publisher !! GetRetainedMessage).as[ConsumerMethodRegistered].get - assert(event.uri === "direct:foo") + assert(event.endpointUri === "direct:foo") assert(event.typedActor === obj) - assert(event.method.getName === "foo") + assert(event.methodName === "foo") } @Test def shouldReceiveOneConsumerMethodUnregisteredEvent = { @@ -52,9 +52,9 @@ class PublishRequestorTest extends JUnitSuite { TypedActor.stop(obj) assert(latch.await(5000, TimeUnit.MILLISECONDS)) val event = (publisher !! GetRetainedMessage).as[ConsumerMethodUnregistered].get - assert(event.uri === "direct:foo") + assert(event.endpointUri === "direct:foo") assert(event.typedActor === obj) - assert(event.method.getName === "foo") + assert(event.methodName === "foo") } @Test def shouldReceiveThreeConsumerMethodRegisteredEvents = { @@ -83,7 +83,7 @@ class PublishRequestorTest extends JUnitSuite { requestor ! ActorRegistered(consumer) assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert((publisher !! GetRetainedMessage) === - Some(ConsumerRegistered(consumer, "mock:test", consumer.uuid, false))) + Some(ConsumerActorRegistered(consumer, consumer.actor.asInstanceOf[Consumer]))) } @Test def shouldReceiveOneConsumerUnregisteredEvent = { @@ -91,7 +91,7 @@ class PublishRequestorTest extends JUnitSuite { requestor ! ActorUnregistered(consumer) assert(latch.await(5000, TimeUnit.MILLISECONDS)) assert((publisher !! GetRetainedMessage) === - Some(ConsumerUnregistered(consumer, "mock:test", consumer.uuid))) + Some(ConsumerActorUnregistered(consumer, consumer.actor.asInstanceOf[Consumer]))) } } diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index 7d74496936..4e7f1d1cfe 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -700,6 +700,13 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] } } +private[akka] object PersistentSortedSet { + // operations on the SortedSet + sealed trait Op + case object ADD extends Op + case object REM extends Op +} + /** * Implements a template for a concrete persistent transactional sorted set based storage. *

@@ -734,61 +741,45 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A] * @author */ trait PersistentSortedSet[A] extends Transactional with Committable with Abortable { - protected val newElems = TransactionalMap[A, Float]() - protected val removedElems = TransactionalVector[A]() + //Import Ops + import PersistentSortedSet._ + + // append only log: records all mutating operations + protected val appendOnlyTxLog = TransactionalVector[LogEntry]() + + // need to override in subclasses e.g. "sameElements" for Array[Byte] + def equal(v1: A, v2: A): Boolean = v1 == v2 + + case class LogEntry(value: A, score: Option[Float], op: Op) val storage: SortedSetStorageBackend[A] def commit = { - for ((element, score) <- newElems) storage.zadd(uuid, String.valueOf(score), element) - for (element <- removedElems) storage.zrem(uuid, element) - newElems.clear - removedElems.clear + for (entry <- appendOnlyTxLog) { + (entry: @unchecked) match { + case LogEntry(e, Some(s), ADD) => storage.zadd(uuid, String.valueOf(s), e) + case LogEntry(e, _, REM) => storage.zrem(uuid, e) + } + } + appendOnlyTxLog.clear } def abort = { - newElems.clear - removedElems.clear + appendOnlyTxLog.clear } def +(elem: A, score: Float) = add(elem, score) def add(elem: A, score: Float) = { register - newElems.put(elem, score) + appendOnlyTxLog.add(LogEntry(elem, Some(score), ADD)) } def -(elem: A) = remove(elem) def remove(elem: A) = { register - removedElems.add(elem) - } - - private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match { - case Some(s) => Some(s.toFloat) - case None => None - } - - def contains(elem: A): Boolean = { - if (newElems contains elem) true - else { - inStorage(elem) match { - case Some(f) => true - case None => false - } - } - } - - def size: Int = newElems.size + storage.zcard(uuid) - removedElems.size - - def zscore(elem: A): Float = { - if (newElems contains elem) newElems.get(elem).get - inStorage(elem) match { - case Some(f) => f - case None => - throw new NoSuchElementException(elem + " not present") - } + appendOnlyTxLog.add(LogEntry(elem, None, REM)) } implicit def order(x: (A, Float)) = new Ordered[(A, Float)] { @@ -799,11 +790,27 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab def compare(x: (A, Float), y: (A, Float)) = x._2 compare y._2 } + protected def replay: List[(A, Float)] = { + val es = collection.mutable.Map() ++ storage.zrangeWithScore(uuid, 0, -1) + + for (entry <- appendOnlyTxLog) { + (entry: @unchecked) match { + case LogEntry(v, Some(s), ADD) => es += ((v, s)) + case LogEntry(v, _, REM) => es -= v + } + } + es.toList + } + + def contains(elem: A): Boolean = replay.map(_._1).contains(elem) + + def size: Int = replay size + + def zscore(elem: A): Float = replay.filter { case (e, s) => equal(e, elem) }.map(_._2).head + def zrange(start: Int, end: Int): List[(A, Float)] = { - // need to operate on the whole range - // get all from the underlying storage - val fromStore = storage.zrangeWithScore(uuid, 0, -1) - val ts = scala.collection.immutable.TreeSet(fromStore: _*) ++ newElems.toList + import PersistentSortedSet._ + val ts = collection.immutable.TreeSet(replay: _*) val l = ts.size // -1 means the last element, -2 means the second last @@ -821,3 +828,21 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab transaction.get.get.register(uuid, this) } } + +trait PersistentSortedSetBinary extends PersistentSortedSet[Array[Byte]] { + import PersistentSortedSet._ + + override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2 + + override protected def replay: List[(Array[Byte], Float)] = { + val es = collection.mutable.Map() ++ storage.zrangeWithScore(uuid, 0, -1).map { case (k, v) => (ArraySeq(k: _*), v) } + + for (entry <- appendOnlyTxLog) { + (entry: @unchecked) match { + case LogEntry(v, Some(s), ADD) => es += ((ArraySeq(v: _*), s)) + case LogEntry(v, _, REM) => es -= ArraySeq(v: _*) + } + } + es.toList.map { case (k, v) => (k.toArray, v) } + } +} diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala index 9dfc37770a..591d337af9 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorage.scala @@ -74,7 +74,7 @@ class RedisPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] { * * @author Debasish Ghosh */ -class RedisPersistentSortedSet(id: String) extends PersistentSortedSet[Array[Byte]] { +class RedisPersistentSortedSet(id: String) extends PersistentSortedSetBinary { val uuid = id val storage = RedisStorageBackend } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala index 8e25dbf4d6..02e3c03bab 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala @@ -72,7 +72,7 @@ class SortedSetActor extends Transactor { hackers.+(h.name.getBytes, h.zscore) } try { - r.foreach{ h => + r.foreach { h => if (hackers.size <= 3) throw new SetThresholdViolationException hackers.-(h.name.getBytes) @@ -184,11 +184,10 @@ class RedisPersistentSortedSetSpec extends val add1 = List(h5, h6) // remove 3 - val rem1 = List(h1, h3, h4) + val rem1 = List(h1, h3, h4, h5) try { qa !! MULTI(add1, rem1, failer) - } catch { case e: Exception => {} - } + } catch { case e: RuntimeException => {} } (qa !! SIZE).get.asInstanceOf[Int] should equal(3) } } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket513Spec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket513Spec.scala new file mode 100644 index 0000000000..0a58592045 --- /dev/null +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket513Spec.scala @@ -0,0 +1,67 @@ +package akka.persistence.redis + +import org.scalatest.Spec +import org.scalatest.Assertions +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.BeforeAndAfterAll +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith + +import akka.actor.{Actor, ActorRef, Transactor} +import Actor._ + +/** + * A persistent actor based on Redis sortedset storage. + *

+ * Needs a running Redis server. + * @author Debasish Ghosh + */ + +case class AddEmail(email: String, value: String) +case class GetAll(email: String) + +class MySortedSet extends Transactor { + def receive = { + case AddEmail(userEmail, value) => { + val registryId = "userValues:%s".format(userEmail) + val storageSet = RedisStorage.getSortedSet(registryId) + storageSet.add(value.getBytes, System.nanoTime.toFloat) + self.reply(storageSet.size) + } + case GetAll(userEmail) => { + val registryId = "userValues:%s".format(userEmail) + val storageSet = RedisStorage.getSortedSet(registryId) + self.reply(storageSet.zrange(0, -1)) + } + } +} + +import RedisStorageBackend._ + +@RunWith(classOf[JUnitRunner]) +class RedisTicket513Spec extends + Spec with + ShouldMatchers with + BeforeAndAfterAll { + + override def beforeAll { + flushDB + println("** destroyed database") + } + + override def afterAll { + flushDB + println("** destroyed database") + } + + describe("insert into user specific set") { + val a = actorOf[MySortedSet] + a.start + it("should work with transactors") { + (a !! AddEmail("test.user@gmail.com", "foo")).get should equal(1) + Thread.sleep(10) + (a !! AddEmail("test.user@gmail.com", "bar")).get should equal(2) + (a !! GetAll("test.user@gmail.com")).get.asInstanceOf[List[_]].size should equal(2) + } + } +} diff --git a/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorage.scala b/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorage.scala new file mode 100644 index 0000000000..c8ff43d633 --- /dev/null +++ b/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorage.scala @@ -0,0 +1,51 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.persistence.simpledb + +import akka.actor.{newUuid} +import akka.stm._ +import akka.persistence.common._ + + +object SimpledbStorage extends Storage { + + type ElementType = Array[Byte] + def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString) + def newVector: PersistentVector[ElementType] = newVector(newUuid.toString) + def newRef: PersistentRef[ElementType] = newRef(newUuid.toString) + override def newQueue: PersistentQueue[ElementType] = newQueue(newUuid.toString) + + def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id) + def getVector(id: String): PersistentVector[ElementType] = newVector(id) + def getRef(id: String): PersistentRef[ElementType] = newRef(id) + override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id) + + def newMap(id: String): PersistentMap[ElementType, ElementType] = new SimpledbPersistentMap(id) + def newVector(id: String): PersistentVector[ElementType] = new SimpledbPersistentVector(id) + def newRef(id: String): PersistentRef[ElementType] = new SimpledbPersistentRef(id) + override def newQueue(id:String): PersistentQueue[ElementType] = new SimpledbPersistentQueue(id) +} + + +class SimpledbPersistentMap(id: String) extends PersistentMapBinary { + val uuid = id + val storage = SimpledbStorageBackend +} + + +class SimpledbPersistentVector(id: String) extends PersistentVector[Array[Byte]] { + val uuid = id + val storage = SimpledbStorageBackend +} + +class SimpledbPersistentRef(id: String) extends PersistentRef[Array[Byte]] { + val uuid = id + val storage = SimpledbStorageBackend +} + +class SimpledbPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] { + val uuid = id + val storage = SimpledbStorageBackend +} diff --git a/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorageBackend.scala b/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorageBackend.scala new file mode 100644 index 0000000000..ed9fcf9f1e --- /dev/null +++ b/akka-persistence/akka-persistence-simpledb/src/main/scala/SimpledbStorageBackend.scala @@ -0,0 +1,151 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.persistence.simpledb + +import akka.persistence.common._ +import akka.config.Config.config +import java.lang.String +import java.util.{List => JList, ArrayList => JAList} + +import collection.immutable.{HashMap, Iterable} +import collection.mutable.{HashMap => MMap} + +import com.amazonaws.auth.BasicAWSCredentials +import com.amazonaws.services.simpledb.AmazonSimpleDBClient +import com.amazonaws.services.simpledb.model._ +import collection.{JavaConversions, Map} + +private[akka] object SimpledbStorageBackend extends CommonStorageBackend { + import org.apache.commons.codec.binary.Base64 + + val seperator = "\r\n" + val seperatorBytes = seperator.getBytes("UTF-8") + val sizeAtt = "size" + val base64 = new Base64(1024, seperatorBytes, true) + val base64key = new Base64(1024, Array.empty[Byte], true) + val id = config.getString("akka.storage.simpledb.account.id", "YOU NEED TO PROVIDE AN AWS ID") + val secretKey = config.getString("akka.storage.simpledb.account.secretKey", "YOU NEED TO PROVIDE AN AWS SECRET KEY") + val refDomain = config.getString("akka.storage.simpledb.domain.ref", "ref") + val mapDomain = config.getString("akka.storage.simpledb.domain.map", "map") + val queueDomain = config.getString("akka.storage.simpledb.domain.queue", "queue") + val vectorDomain = config.getString("akka.storage.simpledb.domain.vector", "vector") + val credentials = new BasicAWSCredentials(id, secretKey); + val client = new AmazonSimpleDBClient(credentials) + + def queueAccess = queue + + def mapAccess = map + + def vectorAccess = vector + + def refAccess = ref + + val queue = new SimpledbAccess(queueDomain) + + val map = new SimpledbAccess(mapDomain) + + val vector = new SimpledbAccess(vectorDomain) + + val ref = new SimpledbAccess(refDomain) + + private[akka] class SimpledbAccess(val domainName: String) extends KVStorageBackendAccess { + var created = false + + def getClient(): AmazonSimpleDBClient = { + if (!created) { + client.createDomain(new CreateDomainRequest(domainName)) + created = true + } + client + } + + + def drop(): Unit = { + created = false + client.deleteDomain(new DeleteDomainRequest(domainName)) + } + + def delete(key: Array[Byte]): Unit = getClient.deleteAttributes(new DeleteAttributesRequest(domainName, encodeAndValidateKey(key))) + + def getAll(keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = { + keys.foldLeft(new HashMap[Array[Byte], Array[Byte]]) { + (map, key) => { + val value = getValue(key) + if (value != null) { + map + (key -> getValue(key)) + } else { + map + } + } + } + } + + def getValue(key: Array[Byte], default: Array[Byte]): Array[Byte] = { + val req = new GetAttributesRequest(domainName, encodeAndValidateKey(key)).withConsistentRead(true) + val resp = getClient.getAttributes(req) + recomposeValue(resp.getAttributes) match { + case Some(value) => value + case None => default + } + } + + def getValue(key: Array[Byte]): Array[Byte] = getValue(key, null) + + def put(key: Array[Byte], value: Array[Byte]): Unit = { + val req = new PutAttributesRequest(domainName, encodeAndValidateKey(key), decomposeValue(value)) + getClient.putAttributes(req) + } + + def encodeAndValidateKey(key: Array[Byte]): String = { + val keystr = base64key.encodeToString(key) + if (keystr.size > 1024) { + throw new IllegalArgumentException("encoded key was longer than 1024 bytes (or 768 bytes unencoded)") + } + keystr + } + + def decomposeValue(value: Array[Byte]): JList[ReplaceableAttribute] = { + val encoded = base64.encodeToString(value) + val strings = encoded.split(seperator) + if (strings.size > 255) { + throw new IllegalArgumentException("The decomposed value is larger than 255K (or 195840 bytes unencoded)") + } + + val list: JAList[ReplaceableAttribute] = strings.zipWithIndex.foldLeft(new JAList[ReplaceableAttribute]) { + (list, zip) => { + zip match { + case (encode, index) => { + list.add(new ReplaceableAttribute(index.toString, encode, true)) + list + } + } + } + } + list.add(new ReplaceableAttribute(sizeAtt, list.size.toString, true)) + list + } + + def recomposeValue(atts: JList[Attribute]): Option[Array[Byte]] = { + val itemSnapshot = JavaConversions.asIterable(atts).foldLeft(new MMap[String, String]) { + (map, att) => { + map += (att.getName -> att.getValue) + } + } + itemSnapshot.get(sizeAtt) match { + case Some(strSize) => { + val size = Integer.parseInt(strSize) + val encoded = (0 until size).map(_.toString).map(itemSnapshot.get(_).get).reduceLeft[String] { + (acc, str) => acc + seperator + str + } + Some(base64.decode(encoded)) + } + case None => None + } + } + + } + + +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbStorageBackendCompatibilityTest.scala b/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbStorageBackendCompatibilityTest.scala new file mode 100644 index 0000000000..3e2df27160 --- /dev/null +++ b/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbStorageBackendCompatibilityTest.scala @@ -0,0 +1,49 @@ +package akka.persistence.simpledb + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import akka.persistence.common.{QueueStorageBackendTest, VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest} + +@RunWith(classOf[JUnitRunner]) +class SimpledbRefStorageBackendTestIntegration extends RefStorageBackendTest { + def dropRefs = { + SimpledbStorageBackend.refAccess.drop + } + + + def storage = SimpledbStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class SimpledbMapStorageBackendTestIntegration extends MapStorageBackendTest { + def dropMaps = { + SimpledbStorageBackend.mapAccess.drop + } + + + def storage = SimpledbStorageBackend +} + +@RunWith(classOf[JUnitRunner]) +class SimpledbVectorStorageBackendTestIntegration extends VectorStorageBackendTest { + def dropVectors = { + SimpledbStorageBackend.vectorAccess.drop + } + + + def storage = SimpledbStorageBackend +} + + +@RunWith(classOf[JUnitRunner]) +class SimpledbQueueStorageBackendTestIntegration extends QueueStorageBackendTest { + def dropQueues = { + SimpledbStorageBackend.queueAccess.drop + } + + + def storage = SimpledbStorageBackend +} + + diff --git a/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTestIntegration.scala b/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTestIntegration.scala new file mode 100644 index 0000000000..5e0452b8be --- /dev/null +++ b/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTestIntegration.scala @@ -0,0 +1,52 @@ +package akka.persistence.simpledb + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.{BeforeAndAfterEach, Spec} + +@RunWith(classOf[JUnitRunner]) +class SimpledbTestIntegration extends Spec with ShouldMatchers with BeforeAndAfterEach { + import SimpledbStorageBackend._ + + + describe("the limitations of the simpledb storage backend") { + it("should store up to 255K per item base 64 encoded with a name+key length <= 1024 bytes base64 encoded") { + val name = "123456" + val keysize: Int = 758 + log.info("key:" + keysize) + val key = new Array[Byte](keysize) + val valsize: Int = 195840 + log.info("value:" + valsize) + + val value = new Array[Byte](valsize) + mapAccess.put(name, key, value) + val result = mapAccess.getValue(name, key, Array.empty[Byte]) + result.size should be(value.size) + result should be(value) + } + + it("should not accept a name+key longer that 1024 bytes base64 encoded") { + val name = "fail" + val key = new Array[Byte](2048) + val value = new Array[Byte](1) + evaluating { + mapAccess.put(name, key, value) + } should produce[IllegalArgumentException] + } + + it("should not accept a value larger than 255K base 64 encoded") { + val name = "failValue" + val key = "failKey".getBytes + val value = new Array[Byte](1024 * 512) + evaluating { + mapAccess.put(name, key, value) + } should produce[IllegalArgumentException] + } + } + + override protected def beforeEach(): Unit = { + mapAccess.drop + } +} \ No newline at end of file diff --git a/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTicket343TestIntegration.scala b/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTicket343TestIntegration.scala new file mode 100644 index 0000000000..6fc1e75fd3 --- /dev/null +++ b/akka-persistence/akka-persistence-simpledb/src/test/scala/SimpledbTicket343TestIntegration.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.persistence.simpledb + + +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import akka.persistence.common._ + +@RunWith(classOf[JUnitRunner]) +class SimpledbTicket343TestIntegration extends Ticket343Test { + def dropMapsAndVectors: Unit = { + SimpledbStorageBackend.vectorAccess.drop + SimpledbStorageBackend.mapAccess.drop + } + + def getVector: (String) => PersistentVector[Array[Byte]] = SimpledbStorage.getVector + + def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = SimpledbStorage.getMap + +} diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index fe55c115e7..19282c53be 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -11,6 +11,21 @@ option optimize_for = SPEED; protoc RemoteProtocol.proto --java_out ../java *******************************************/ +/** + * Defines a remote message. + */ +message RemoteMessageProtocol { + required UuidProtocol uuid = 1; + required ActorInfoProtocol actorInfo = 2; + required bool oneWay = 3; + optional MessageProtocol message = 4; + optional ExceptionProtocol exception = 5; + optional UuidProtocol supervisorUuid = 6; + optional RemoteActorRefProtocol sender = 7; + repeated MetadataEntryProtocol metadata = 8; + optional string cookie = 9; +} + /** * Defines a remote ActorRef that "remembers" and uses its original Actor instance * on the original node. @@ -91,21 +106,6 @@ message TypedActorInfoProtocol { required string method = 2; } -/** - * Defines a remote message. - */ -message RemoteMessageProtocol { - required UuidProtocol uuid = 1; - required ActorInfoProtocol actorInfo = 2; - required bool oneWay = 3; - optional MessageProtocol message = 4; - optional ExceptionProtocol exception = 5; - optional UuidProtocol supervisorUuid = 6; - optional RemoteActorRefProtocol sender = 7; - repeated MetadataEntryProtocol metadata = 8; - optional string cookie = 9; -} - /** * Defines a UUID. */ diff --git a/akka-remote/src/main/scala/remote/RemoteClient.scala b/akka-remote/src/main/scala/remote/RemoteClient.scala index a1b01a078f..e80c7936c2 100644 --- a/akka-remote/src/main/scala/remote/RemoteClient.scala +++ b/akka-remote/src/main/scala/remote/RemoteClient.scala @@ -310,13 +310,11 @@ class RemoteClient private[akka] ( connection.getChannel.write(request) None } else { - futures.synchronized { val futureResult = if (senderFuture.isDefined) senderFuture.get else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout) futures.put(uuidFrom(request.getUuid.getHigh, request.getUuid.getLow), futureResult) connection.getChannel.write(request) Some(futureResult) - } } } else { val exception = new RemoteClientException( diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index c85f2913e0..bc12477970 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -16,7 +16,6 @@ import akka.remote.protocol.RemoteProtocol._ import akka.remote.protocol.RemoteProtocol.ActorType._ import akka.config.Config._ import akka.config.ConfigurationException -import akka.dispatch.{DefaultCompletableFuture, CompletableFuture} import akka.serialization.RemoteActorSerialization import akka.serialization.RemoteActorSerialization._ @@ -31,6 +30,7 @@ import org.jboss.netty.handler.ssl.SslHandler import scala.collection.mutable.Map import scala.reflect.BeanProperty +import akka.dispatch. {Future, DefaultCompletableFuture, CompletableFuture} /** * Use this object if you need a single remote server on a specific node. @@ -66,10 +66,10 @@ object RemoteNode extends RemoteServer * @author Jonas Bonér */ object RemoteServer { - val UUID_PREFIX = "uuid:" - - val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie") - val REQUIRE_COOKIE = { + val UUID_PREFIX = "uuid:" + val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576) + val SECURE_COOKIE = config.getString("akka.remote.secure-cookie") + val REQUIRE_COOKIE = { val requireCookie = config.getBool("akka.remote.server.require-cookie", true) if (requireCookie && RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException( "Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.") @@ -400,7 +400,7 @@ class RemoteServerPipelineFactory( } val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join() - val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) + val lenDec = new LengthFieldBasedFrameDecoder(RemoteServer.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder @@ -497,7 +497,7 @@ class RemoteServerHandler( } private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = { - log.debug("Received RemoteMessageProtocol[\n%s]", request.toString) + log.debug("Received RemoteMessageProtocol[\n%s]".format(request.toString)) request.getActorInfo.getActorType match { case SCALA_ACTOR => dispatchToActor(request, channel) case TYPED_ACTOR => dispatchToTypedActor(request, channel) @@ -538,41 +538,46 @@ class RemoteServerHandler( message, request.getActorInfo.getTimeout, None, - Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){ - override def onComplete(result: AnyRef) { - log.debug("Returning result from actor invocation [%s]", result) - val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( - Some(actorRef), - Right(request.getUuid), - actorInfo.getId, - actorInfo.getTarget, - actorInfo.getTimeout, - Left(result), - true, - Some(actorRef), - None, - AkkaActorType.ScalaActor, - None) + Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout). + onComplete(f => { + val result = f.result + val exception = f.exception - // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method - if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) + if (exception.isDefined) { + log.debug("Returning exception from actor invocation [%s]".format(exception.get)) + try { + channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor)) + } catch { + case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) + } + } + else if (result.isDefined) { + log.debug("Returning result from actor invocation [%s]".format(result.get)) + val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( + Some(actorRef), + Right(request.getUuid), + actorInfo.getId, + actorInfo.getTarget, + actorInfo.getTimeout, + Left(result.get), + true, + Some(actorRef), + None, + AkkaActorType.ScalaActor, + None) - try { - channel.write(messageBuilder.build) - } catch { - case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) + // FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method + if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid) + + try { + channel.write(messageBuilder.build) + } catch { + case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) + } } } - - override def onCompleteException(exception: Throwable) { - try { - channel.write(createErrorReplyMessage(exception, request, AkkaActorType.ScalaActor)) - } catch { - case e: Throwable => server.notifyListeners(RemoteServerError(e, server)) - } - } - } - )) + ) + )) } } @@ -589,7 +594,10 @@ class RemoteServerHandler( val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*) if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*) else { - val result = messageReceiver.invoke(typedActor, args: _*) + val result = messageReceiver.invoke(typedActor, args: _*) match { + case f: Future[_] => f.await.result.get + case other => other + } log.debug("Returning result from remote typed actor invocation [%s]", result) val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( diff --git a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala index 064bce95b0..431c633102 100644 --- a/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala +++ b/akka-remote/src/test/scala/remote/RemoteTypedActorSpec.scala @@ -83,7 +83,7 @@ class RemoteTypedActorSpec extends } describe("Remote Typed Actor ") { -/* + it("should receive one-way message") { clearMessageLogs val ta = conf.getInstance(classOf[RemoteTypedActorOne]) @@ -102,7 +102,7 @@ class RemoteTypedActorSpec extends ta.requestReply("ping") } } -*/ + it("should be restarted on failure") { clearMessageLogs val ta = conf.getInstance(classOf[RemoteTypedActorOne]) @@ -112,7 +112,7 @@ class RemoteTypedActorSpec extends } messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") } -/* + it("should restart linked friends on failure") { clearMessageLogs val ta1 = conf.getInstance(classOf[RemoteTypedActorOne]) @@ -124,5 +124,5 @@ class RemoteTypedActorSpec extends messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance") } -*/ } + } } diff --git a/akka-remote/src/test/scala/ticket/Ticket519Spec.scala b/akka-remote/src/test/scala/ticket/Ticket519Spec.scala new file mode 100644 index 0000000000..8457f10f45 --- /dev/null +++ b/akka-remote/src/test/scala/ticket/Ticket519Spec.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package akka.actor.ticket + +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import akka.remote.{RemoteClient, RemoteServer} +import akka.actor._ + + +class Ticket519Spec extends Spec with ShouldMatchers { + + val HOSTNAME = "localhost" + val PORT = 6666 + + describe("A remote TypedActor") { + it("should handle remote future replies") { + import akka.remote._ + + val server = { val s = new RemoteServer; s.start(HOSTNAME,PORT); s} + val actor = TypedActor.newRemoteInstance(classOf[SamplePojo], classOf[SamplePojoImpl],7000,HOSTNAME,PORT) + val r = actor.someFutureString + + r.await.result.get should equal ("foo") + TypedActor.stop(actor) + server.shutdown + } + } +} diff --git a/akka-spring/src/main/resources/akka/spring/akka-1.0-SNAPSHOT.xsd b/akka-spring/src/main/resources/akka/spring/akka-1.0-SNAPSHOT.xsd index 1014e7e592..ea438ebc88 100644 --- a/akka-spring/src/main/resources/akka/spring/akka-1.0-SNAPSHOT.xsd +++ b/akka-spring/src/main/resources/akka/spring/akka-1.0-SNAPSHOT.xsd @@ -145,13 +145,20 @@ - + Name of the implementation class. + + + + Bean instance behind the actor + + + @@ -191,13 +198,20 @@ - + Name of the implementation class. + + + + Bean instance behind the actor + + + diff --git a/akka-spring/src/main/scala/ActorFactoryBean.scala b/akka-spring/src/main/scala/ActorFactoryBean.scala index d0fac73596..c5014d6e18 100644 --- a/akka-spring/src/main/scala/ActorFactoryBean.scala +++ b/akka-spring/src/main/scala/ActorFactoryBean.scala @@ -40,6 +40,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App @BeanProperty var typed: String = "" @BeanProperty var interface: String = "" @BeanProperty var implementation: String = "" + @BeanProperty var beanRef: String = null @BeanProperty var timeoutStr: String = "" @BeanProperty var transactional: Boolean = false @BeanProperty var host: String = "" @@ -102,10 +103,18 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App private[akka] def createTypedInstance() : AnyRef = { if ((interface eq null) || interface == "") throw new AkkaBeansException( "The 'interface' part of the 'akka:actor' element in the Spring config file can't be null or empty string") - if ((implementation eq null) || implementation == "") throw new AkkaBeansException( - "The 'implementation' part of the 'akka:typed-actor' element in the Spring config file can't be null or empty string") + if (((implementation eq null) || implementation == "") && (beanRef eq null)) throw new AkkaBeansException( + "Either 'implementation' or 'ref' must be specified as attribute of the 'akka:typed-actor' element in the Spring config file ") + + val typedActor: AnyRef = if (beanRef eq null ) { + TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig) + } + else + { + TypedActor.newInstance(interface.toClass, getBeanFactory().getBean(beanRef), createConfig) + } + - val typedActor: AnyRef = TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig) if (isRemote && serverManaged) { val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port.toInt)) if (serviceName.isEmpty) { @@ -121,9 +130,13 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App * Create an UntypedActor. */ private[akka] def createUntypedInstance() : ActorRef = { - if ((implementation eq null) || implementation == "") throw new AkkaBeansException( - "The 'implementation' part of the 'akka:untyped-actor' element in the Spring config file can't be null or empty string") - val actorRef = Actor.actorOf(implementation.toClass) + if (((implementation eq null) || implementation == "") && (beanRef eq null)) throw new AkkaBeansException( + "Either 'implementation' or 'ref' must be specified as attribute of the 'akka:untyped-actor' element in the Spring config file ") + val actorRef = if (beanRef eq null ) + Actor.actorOf(implementation.toClass) + else + Actor.actorOf(getBeanFactory().getBean(beanRef).asInstanceOf[Actor]) + if (timeout > 0) { actorRef.setTimeout(timeout) } diff --git a/akka-spring/src/main/scala/ActorParser.scala b/akka-spring/src/main/scala/ActorParser.scala index 3a0c756f20..8466995dc2 100644 --- a/akka-spring/src/main/scala/ActorParser.scala +++ b/akka-spring/src/main/scala/ActorParser.scala @@ -53,7 +53,14 @@ trait ActorParser extends BeanParser with DispatcherParser { } objectProperties.timeoutStr = element.getAttribute(TIMEOUT) - objectProperties.target = mandatory(element, IMPLEMENTATION) + objectProperties.target = if (element.getAttribute(IMPLEMENTATION).isEmpty) null else element.getAttribute(IMPLEMENTATION) + objectProperties.beanRef = if (element.getAttribute(BEANREF).isEmpty) null else element.getAttribute(BEANREF) + + if (objectProperties.target == null && objectProperties.beanRef == null) { + throw new IllegalArgumentException("Mandatory attribute missing, you need to provide either implementation or ref ") + } + + objectProperties.transactional = if (element.getAttribute(TRANSACTIONAL).isEmpty) false else element.getAttribute(TRANSACTIONAL).toBoolean if (element.hasAttribute(INTERFACE)) { diff --git a/akka-spring/src/main/scala/ActorProperties.scala b/akka-spring/src/main/scala/ActorProperties.scala index 487c3530da..c403de8da4 100644 --- a/akka-spring/src/main/scala/ActorProperties.scala +++ b/akka-spring/src/main/scala/ActorProperties.scala @@ -15,6 +15,7 @@ import AkkaSpringConfigurationTags._ class ActorProperties { var typed: String = "" var target: String = "" + var beanRef: String = "" var timeoutStr: String = "" var interface: String = "" var transactional: Boolean = false @@ -40,6 +41,7 @@ class ActorProperties { builder.addPropertyValue("serviceName", serviceName) builder.addPropertyValue("timeoutStr", timeoutStr) builder.addPropertyValue(IMPLEMENTATION, target) + builder.addPropertyValue("beanRef", beanRef) builder.addPropertyValue(INTERFACE, interface) builder.addPropertyValue(TRANSACTIONAL, transactional) builder.addPropertyValue(LIFECYCLE, lifecycle) diff --git a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala index 0871797810..67ca06d8fb 100644 --- a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala +++ b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala @@ -43,6 +43,7 @@ object AkkaSpringConfigurationTags { // actor attributes val TIMEOUT = "timeout" val IMPLEMENTATION = "implementation" + val BEANREF = "ref" val INTERFACE = "interface" val TRANSACTIONAL = "transactional" val HOST = "host" diff --git a/akka-spring/src/test/resources/typed-actor-config.xml b/akka-spring/src/test/resources/typed-actor-config.xml index fa0e81eeae..695be2f31d 100644 --- a/akka-spring/src/test/resources/typed-actor-config.xml +++ b/akka-spring/src/test/resources/typed-actor-config.xml @@ -14,6 +14,14 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd"> implementation="akka.spring.foo.MyPojo" timeout="1000"/> + + + + + + + + + diff --git a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala index faa307db0b..d9f9526b95 100644 --- a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala @@ -81,6 +81,14 @@ class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with B assert(MyPojo.lastOneWayMessage === "hello 1") } + scenario("get a typed actor of bean") { + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor-of-bean") + assert(myPojo.getFoo() === "foo") + myPojo.oneWay("hello 1") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello 1") + } + scenario("FutureTimeoutException when timed out") { val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor") evaluating {myPojo.longRunning()} should produce[FutureTimeoutException] diff --git a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala index 3ae3f09b70..b2aaed7c1f 100644 --- a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala @@ -67,6 +67,14 @@ class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with assert(myactor.isDefinedAt("some string message")) } + scenario("untyped-actor of provided bean") { + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor-of-bean") + myactor.sendOneWay("Hello") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello") + assert(myactor.isDefinedAt("some string message")) + } + scenario("untyped-actor with timeout") { val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor-long-timeout") assert(myactor.getTimeout() === 10000) diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index 98d9bcdea3..3ad5468369 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -181,7 +181,6 @@ abstract class TypedActor extends Actor with Proxyable { if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint) if (TypedActor.isOneWay(joinPoint)) joinPoint.proceed else self.reply(joinPoint.proceed) - case Link(proxy) => self.link(proxy) case Unlink(proxy) => self.unlink(proxy) case unexpected => throw new IllegalActorStateException( @@ -851,6 +850,7 @@ private[akka] abstract class ActorAspect { ActorType.TypedActor) if (isOneWay) null // for void methods + else if (TypedActor.returnsFuture_?(methodRtti)) future.get else { if (future.isDefined) { future.get.await diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java b/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java index 743a189bf6..5b9de51d44 100644 --- a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java +++ b/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java @@ -1,8 +1,9 @@ package akka.actor; -import java.util.concurrent.CountDownLatch; +import akka.dispatch.Future; public interface SamplePojo { public String greet(String s); public String fail(); + public Future someFutureString(); } diff --git a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java b/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java index 093904e5e1..b61fc2f366 100644 --- a/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java +++ b/akka-typed-actor/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java @@ -25,6 +25,10 @@ public class SamplePojoImpl extends TypedActor implements SamplePojo { throw new RuntimeException("expected"); } + public akka.dispatch.Future someFutureString() { + return future("foo"); + } + @Override public void preRestart(Throwable e) { _pre = true; diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 16aecf6bc9..8bb6131524 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -26,6 +26,7 @@ akka { serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness throughput-deadline-time = -1 # Default throughput deadline for all ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline + dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down default-dispatcher { type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable @@ -135,6 +136,7 @@ akka { service = on hostname = "localhost" # The hostname or IP that clients should connect to port = 2552 # The port clients should connect to + message-frame-size = 1048576 connection-timeout = 1 require-cookie = on untrusted-mode = off diff --git a/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14-javadoc.jar b/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14-javadoc.jar new file mode 100644 index 0000000000..6ab90957c2 Binary files /dev/null and b/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14-javadoc.jar differ diff --git a/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14-sources.jar b/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14-sources.jar new file mode 100644 index 0000000000..0a97b3fa7b Binary files /dev/null and b/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14-sources.jar differ diff --git a/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14.jar b/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14.jar new file mode 100644 index 0000000000..a11205d066 Binary files /dev/null and b/embedded-repo/com/amazonaws/aws-java-sdk/1.0.14/aws-java-sdk-1.0.14.jar differ diff --git a/project/build.properties b/project/build.properties index a0dd769ee9..4b70e4a190 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1,4 +1,4 @@ -project.organization=akka +project.organization=se.scalablesolutions.akka project.name=akka project.version=1.0-SNAPSHOT scala.version=2.8.0 diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 8a74ba9f1b..35476f0497 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -140,149 +140,150 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { object Dependencies { // Compile - lazy val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile" + lazy val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile" //ApacheV2 - lazy val annotation = "javax.annotation" % "jsr250-api" % "1.0" % "compile" + lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain - lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" + lazy val atmo = "org.atmosphere" % "atmosphere-annotations" % ATMO_VERSION % "compile" //CDDL v1 + lazy val atmo_jbossweb = "org.atmosphere" % "atmosphere-compat-jbossweb" % ATMO_VERSION % "compile" //CDDL v1 + lazy val atmo_jersey = "org.atmosphere" % "atmosphere-jersey" % ATMO_VERSION % "compile" //CDDL v1 + lazy val atmo_runtime = "org.atmosphere" % "atmosphere-runtime" % ATMO_VERSION % "compile" //CDDL v1 + lazy val atmo_tomcat = "org.atmosphere" % "atmosphere-compat-tomcat" % ATMO_VERSION % "compile" //CDDL v1 + lazy val atmo_weblogic = "org.atmosphere" % "atmosphere-compat-weblogic" % ATMO_VERSION % "compile" //CDDL v1 - lazy val atmo = "org.atmosphere" % "atmosphere-annotations" % ATMO_VERSION % "compile" - lazy val atmo_jbossweb = "org.atmosphere" % "atmosphere-compat-jbossweb" % ATMO_VERSION % "compile" - lazy val atmo_jersey = "org.atmosphere" % "atmosphere-jersey" % ATMO_VERSION % "compile" - lazy val atmo_runtime = "org.atmosphere" % "atmosphere-runtime" % ATMO_VERSION % "compile" - lazy val atmo_tomcat = "org.atmosphere" % "atmosphere-compat-tomcat" % ATMO_VERSION % "compile" - lazy val atmo_weblogic = "org.atmosphere" % "atmosphere-compat-weblogic" % ATMO_VERSION % "compile" + lazy val atomikos_transactions = "com.atomikos" % "transactions" % "3.2.3" % "compile" //ApacheV2 + lazy val atomikos_transactions_api = "com.atomikos" % "transactions-api" % "3.2.3" % "compile" //ApacheV2 + lazy val atomikos_transactions_jta = "com.atomikos" % "transactions-jta" % "3.2.3" % "compile" //ApacheV2 - lazy val atomikos_transactions = "com.atomikos" % "transactions" % "3.2.3" % "compile" - lazy val atomikos_transactions_api = "com.atomikos" % "transactions-api" % "3.2.3" % "compile" - lazy val atomikos_transactions_jta = "com.atomikos" % "transactions-jta" % "3.2.3" % "compile" + lazy val camel_core = "org.apache.camel" % "camel-core" % CAMEL_VERSION % "compile" //ApacheV2 - lazy val camel_core = "org.apache.camel" % "camel-core" % CAMEL_VERSION % "compile" + lazy val cassandra = "org.apache.cassandra" % "cassandra" % CASSANDRA_VERSION % "compile" //ApacheV2 - lazy val cassandra = "org.apache.cassandra" % "cassandra" % CASSANDRA_VERSION % "compile" + lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2 - lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" + lazy val commons_io = "commons-io" % "commons-io" % "1.4" % "compile" //ApacheV2 - lazy val commons_io = "commons-io" % "commons-io" % "1.4" % "compile" + lazy val commons_pool = "commons-pool" % "commons-pool" % "1.5.4" % "compile" //ApacheV2 - lazy val commons_pool = "commons-pool" % "commons-pool" % "1.5.4" % "compile" + lazy val configgy = "net.lag" % "configgy" % "2.8.0-1.5.5" % "compile" //ApacheV2 - lazy val configgy = "net.lag" % "configgy" % "2.8.0-1.5.5" % "compile" + lazy val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2 + lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2 - lazy val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % DISPATCH_VERSION % "compile" - lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile" + lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile" //Eclipse license + lazy val jetty_util = "org.eclipse.jetty" % "jetty-util" % JETTY_VERSION % "compile" //Eclipse license + lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile" //Eclipse license + lazy val jetty_servlet = "org.eclipse.jetty" % "jetty-servlet" % JETTY_VERSION % "compile" //Eclipse license - lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile" - lazy val jetty_util = "org.eclipse.jetty" % "jetty-util" % JETTY_VERSION % "compile" - lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile" - lazy val jetty_servlet = "org.eclipse.jetty" % "jetty-servlet" % JETTY_VERSION % "compile" + lazy val uuid = "com.eaio" % "uuid" % "3.2" % "compile" //MIT license - lazy val uuid = "com.eaio" % "uuid" % "3.2" % "compile" + lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" //ApacheV2 - lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" + lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2 - lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" + lazy val hawtdispatch = "org.fusesource.hawtdispatch" % "hawtdispatch-scala" % HAWT_DISPATCH_VERSION % "compile" //ApacheV2 - lazy val hawtdispatch = "org.fusesource.hawtdispatch" % "hawtdispatch-scala" % HAWT_DISPATCH_VERSION % "compile" + lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2 + lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2 - lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" - lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" + lazy val jersey = "com.sun.jersey" % "jersey-core" % JERSEY_VERSION % "compile" //CDDL v1 + lazy val jersey_json = "com.sun.jersey" % "jersey-json" % JERSEY_VERSION % "compile" //CDDL v1 + lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "compile" //CDDL v1 + lazy val jersey_contrib = "com.sun.jersey.contribs" % "jersey-scala" % JERSEY_VERSION % "compile" //CDDL v1 - lazy val jersey = "com.sun.jersey" % "jersey-core" % JERSEY_VERSION % "compile" - lazy val jersey_json = "com.sun.jersey" % "jersey-json" % JERSEY_VERSION % "compile" - lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "compile" - lazy val jersey_contrib = "com.sun.jersey.contribs" % "jersey-scala" % JERSEY_VERSION % "compile" + lazy val jgroups = "jgroups" % "jgroups" % "2.9.0.GA" % "compile" //LGPL 2.1 - lazy val jgroups = "jgroups" % "jgroups" % "2.9.0.GA" % "compile" + lazy val jsr166x = "jsr166x" % "jsr166x" % "1.0" % "compile" //CC Public Domain - lazy val jsr166x = "jsr166x" % "jsr166x" % "1.0" % "compile" + lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile" //CDDL v1 - lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile" + lazy val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile" //CDDL v1 - lazy val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile" + lazy val jta_1_1 = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile" intransitive //ApacheV2 - lazy val jta_1_1 = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile" intransitive + lazy val mongo = "org.mongodb" % "mongo-java-driver" % "2.0" % "compile" //ApacheV2 - lazy val mongo = "org.mongodb" % "mongo-java-driver" % "2.0" % "compile" + lazy val casbah = "com.novus" % "casbah_2.8.0" % "1.0.8.5" % "compile" //ApacheV2 - lazy val casbah = "com.novus" % "casbah_2.8.0" % "1.0.8.5" % "compile" + lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" intransitive //ApacheV2 - lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" intransitive + lazy val netty = "org.jboss.netty" % "netty" % "3.2.3.Final" % "compile" //ApacheV2 - lazy val netty = "org.jboss.netty" % "netty" % "3.2.3.Final" % "compile" + lazy val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile" //New BSD - lazy val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile" + lazy val osgi_core = "org.osgi" % "org.osgi.core" % "4.2.0" //ApacheV2 - lazy val osgi_core = "org.osgi" % "org.osgi.core" % "4.2.0" + lazy val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.1" % "compile" //Mozilla public license - lazy val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.1" % "compile" + lazy val redis = "com.redis" % "redisclient" % "2.8.0-2.0.3" % "compile" //ApacheV2 - lazy val redis = "com.redis" % "redisclient" % "2.8.0-2.0.3" % "compile" + lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile" //MIT - lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile" + lazy val sjson = "sjson.json" % "sjson" % "0.8-2.8.0" % "compile" //ApacheV2 + lazy val sjson_test = "sjson.json" % "sjson" % "0.8-2.8.0" % "test" //ApacheV2 - lazy val sjson = "sjson.json" % "sjson" % "0.8-2.8.0" % "compile" - lazy val sjson_test = "sjson.json" % "sjson" % "0.8-2.8.0" % "test" + lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION % "compile" //MIT - lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION % "compile" + lazy val logback = "ch.qos.logback" % "logback-classic" % LOGBACK_VERSION % "compile" //LGPL 2.1 + lazy val logback_core = "ch.qos.logback" % "logback-core" % LOGBACK_VERSION % "compile" //LGPL 2.1 - lazy val logback = "ch.qos.logback" % "logback-classic" % LOGBACK_VERSION % "compile" - lazy val logback_core = "ch.qos.logback" % "logback-core" % LOGBACK_VERSION % "compile" + lazy val spring_beans = "org.springframework" % "spring-beans" % SPRING_VERSION % "compile" //ApacheV2 + lazy val spring_context = "org.springframework" % "spring-context" % SPRING_VERSION % "compile" //ApacheV2 - lazy val spring_beans = "org.springframework" % "spring-beans" % SPRING_VERSION % "compile" - lazy val spring_context = "org.springframework" % "spring-context" % SPRING_VERSION % "compile" + lazy val stax_api = "javax.xml.stream" % "stax-api" % "1.0-2" % "compile" //ApacheV2 - lazy val stax_api = "javax.xml.stream" % "stax-api" % "1.0-2" % "compile" + lazy val thrift = "com.facebook" % "thrift" % "r917130" % "compile" //ApacheV2 - lazy val thrift = "com.facebook" % "thrift" % "r917130" % "compile" + lazy val voldemort = "voldemort" % "voldemort" % "0.81" % "compile" //ApacheV2 + lazy val voldemort_contrib = "voldemort" % "voldemort-contrib" % "0.81" % "compile" //ApacheV2 + lazy val voldemort_needs_log4j = "org.slf4j" % "log4j-over-slf4j" % SLF4J_VERSION % "compile" //MIT - lazy val voldemort = "voldemort" % "voldemort" % "0.81" % "compile" - lazy val voldemort_contrib = "voldemort" % "voldemort-contrib" % "0.81" % "compile" - lazy val voldemort_needs_log4j = "org.slf4j" % "log4j-over-slf4j" % SLF4J_VERSION % "compile" + lazy val werkz = "org.codehaus.aspectwerkz" % "aspectwerkz-nodeps-jdk5" % ASPECTWERKZ_VERSION % "compile" //LGPL 2.1 + lazy val werkz_core = "org.codehaus.aspectwerkz" % "aspectwerkz-jdk5" % ASPECTWERKZ_VERSION % "compile" //LGPL 2.1 - lazy val werkz = "org.codehaus.aspectwerkz" % "aspectwerkz-nodeps-jdk5" % ASPECTWERKZ_VERSION % "compile" - lazy val werkz_core = "org.codehaus.aspectwerkz" % "aspectwerkz-jdk5" % ASPECTWERKZ_VERSION % "compile" + lazy val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % "3.2.2" % "compile" //ApacheV2 - lazy val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % "3.2.2" % "compile" + lazy val hadoop_core = "org.apache.hadoop" % "hadoop-core" % "0.20.2" % "compile" //ApacheV2 - lazy val hadoop_core = "org.apache.hadoop" % "hadoop-core" % "0.20.2" % "compile" + lazy val hbase_core = "org.apache.hbase" % "hbase-core" % "0.20.6" % "compile" //ApacheV2 - lazy val hbase_core = "org.apache.hbase" % "hbase-core" % "0.20.6" % "compile" - - lazy val google_coll = "com.google.collections" % "google-collections" % "1.0" % "compile" + lazy val google_coll = "com.google.collections" % "google-collections" % "1.0" % "compile" //ApacheV2 //Riak PB Client - lazy val riak_pb_client = "com.trifork" % "riak-java-pb-client" % "1.0-for-akka-by-ticktock" % "compile" + lazy val riak_pb_client = "com.trifork" % "riak-java-pb-client" % "1.0-for-akka-by-ticktock" % "compile" //ApacheV2 // Test - lazy val camel_spring = "org.apache.camel" % "camel-spring" % CAMEL_VERSION % "test" - lazy val cassandra_clhm = "org.apache.cassandra" % "clhm-production" % CASSANDRA_VERSION % "test" - lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test" + lazy val camel_spring = "org.apache.camel" % "camel-spring" % CAMEL_VERSION % "test" //ApacheV2 + lazy val cassandra_clhm = "org.apache.cassandra" % "clhm-production" % CASSANDRA_VERSION % "test" //ApacheV2 + lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test" //ApacheV2 - lazy val high_scale = "org.apache.cassandra" % "high-scale-lib" % CASSANDRA_VERSION % "test" - lazy val testJetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "test" - lazy val testJettyWebApp= "org.eclipse.jetty" % "jetty-webapp" % JETTY_VERSION % "test" + lazy val high_scale = "org.apache.cassandra" % "high-scale-lib" % CASSANDRA_VERSION % "test" //ApacheV2 + lazy val testJetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "test" //Eclipse license + lazy val testJettyWebApp= "org.eclipse.jetty" % "jetty-webapp" % JETTY_VERSION % "test" //Eclipse license - lazy val junit = "junit" % "junit" % "4.5" % "test" - lazy val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" - lazy val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test" - lazy val specs = "org.scala-tools.testing" %% "specs" % "1.6.5" % "test" + lazy val junit = "junit" % "junit" % "4.5" % "test" //Common Public License 1.0 + lazy val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" //MIT + lazy val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test" //ApacheV2 + lazy val specs = "org.scala-tools.testing" %% "specs" % "1.6.5" % "test" //MIT //HBase testing - lazy val hadoop_test = "org.apache.hadoop" % "hadoop-test" % "0.20.2" % "test" - lazy val hbase_test = "org.apache.hbase" % "hbase-test" % "0.20.6" % "test" - lazy val log4j = "log4j" % "log4j" % "1.2.15" % "test" - lazy val jetty_mortbay = "org.mortbay.jetty" % "jetty" % "6.1.14" % "test" + lazy val hadoop_test = "org.apache.hadoop" % "hadoop-test" % "0.20.2" % "test" //ApacheV2 + lazy val hbase_test = "org.apache.hbase" % "hbase-test" % "0.20.6" % "test" //ApacheV2 + lazy val log4j = "log4j" % "log4j" % "1.2.15" % "test" //ApacheV2 + lazy val jetty_mortbay = "org.mortbay.jetty" % "jetty" % "6.1.14" % "test" //Eclipse license //voldemort testing - lazy val jdom = "org.jdom" % "jdom" % "1.1" % "test" - lazy val vold_jetty = "org.mortbay.jetty" % "jetty" % "6.1.18" % "test" - lazy val velocity = "org.apache.velocity" % "velocity" % "1.6.2" % "test" - lazy val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2" % "test" + lazy val jdom = "org.jdom" % "jdom" % "1.1" % "test" //JDOM license: ApacheV2 - acknowledgement + lazy val vold_jetty = "org.mortbay.jetty" % "jetty" % "6.1.18" % "test" //ApacheV2 + lazy val velocity = "org.apache.velocity" % "velocity" % "1.6.2" % "test" //ApacheV2 + lazy val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2" % "test" //ApacheV2 //memcached lazy val spymemcached = "spy" % "memcached" % "2.5" % "compile" + + //simpledb + lazy val simpledb = "com.amazonaws" % "aws-java-sdk" % "1.0.14" % "compile" } // ------------------------------------------------------------------------------------------------------------------- @@ -313,8 +314,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { manifestClassPath.map(cp => ManifestAttributes( (Attributes.Name.CLASS_PATH, cp), (IMPLEMENTATION_TITLE, "Akka"), - (IMPLEMENTATION_URL, "http://akkasource.org"), - (IMPLEMENTATION_VENDOR, "The Akka Project") + (IMPLEMENTATION_URL, "http://akka.io"), + (IMPLEMENTATION_VENDOR, "Scalable Solutions AB") )).toList ::: getMainClass(false).map(MainClass(_)).toList @@ -396,7 +397,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val artifactRE(path, artifactId, artifactVersion) = absPath val command = "mvn install:install-file" + " -Dfile=" + absPath + - " -DgroupId=akka" + + " -DgroupId=se.scalablesolutions.akka" + " -DartifactId=" + artifactId + " -Dversion=" + version + " -Dpackaging=jar -DgeneratePom=true" @@ -488,7 +489,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- class AkkaHttpProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { - val annotation = Dependencies.annotation + val jsr250 = Dependencies.jsr250 val atmo = Dependencies.atmo val atmo_jbossweb = Dependencies.atmo_jbossweb val atmo_jersey = Dependencies.atmo_jersey @@ -546,6 +547,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { new AkkaCouchDBProject(_), akka_persistence_common) lazy val akka_persistence_memcached= project("akka-persistence-memcached", "akka-persistence-memcached", new AkkaMemcachedProject(_), akka_persistence_common) + lazy val akka_persistence_simpledb= project("akka-persistence-simpledb", "akka-persistence-simpledb", + new AkkaSimpledbProject(_), akka_persistence_common) } // ------------------------------------------------------------------------------------------------------------------- @@ -675,6 +678,16 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { override def testOptions = createTestFilter( _.endsWith("Test")) } + class AkkaSimpledbProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) { + val memcached = Dependencies.simpledb + val commons_codec = Dependencies.commons_codec + val http = Dependencies.commonsHttpClient + + val scalatest = Dependencies.scalatest + + override def testOptions = createTestFilter( _.endsWith("Test")) + } + // ------------------------------------------------------------------------------------------------------------------- // akka-kernel subproject // -------------------------------------------------------------------------------------------------------------------