Merge branch 'master' of git-proxy:jboner/akka
This commit is contained in:
commit
bc7746493c
52 changed files with 1276 additions and 396 deletions
|
|
@ -413,8 +413,14 @@ trait Actor extends Logging {
|
|||
/**
|
||||
* Changes tha Actor's behavior to become the new 'Receive' (PartialFunction[Any, Unit]) handler.
|
||||
* Puts the behavior on top of the hotswap stack.
|
||||
* If "discardOld" is true, an unbecome will be issued prior to pushing the new behavior to the stack
|
||||
*/
|
||||
def become(behavior: Receive): Unit = self.hotswap = self.hotswap.push(behavior)
|
||||
def become(behavior: Receive, discardOld: Boolean = false) {
|
||||
if (discardOld)
|
||||
unbecome
|
||||
|
||||
self.hotswap = self.hotswap.push(behavior)
|
||||
}
|
||||
|
||||
/**
|
||||
* Reverts the Actor behavior to the previous one in the hotswap stack.
|
||||
|
|
|
|||
|
|
@ -72,7 +72,13 @@ abstract class UntypedActor extends Actor {
|
|||
/**
|
||||
* Java API for become
|
||||
*/
|
||||
def become(behavior: Procedure[Any]): Unit = super.become { case msg => behavior.apply(msg) }
|
||||
def become(behavior: Procedure[Any]):Unit = become(behavior,false)
|
||||
|
||||
/*
|
||||
* Java API for become with optional discardOld
|
||||
*/
|
||||
def become(behavior: Procedure[Any], discardOld: Boolean): Unit =
|
||||
super.become({ case msg => behavior.apply(msg) }, discardOld)
|
||||
|
||||
@throws(classOf[Exception])
|
||||
def onReceive(message: Any): Unit
|
||||
|
|
|
|||
|
|
@ -48,6 +48,9 @@ import java.util.concurrent.TimeUnit
|
|||
*/
|
||||
object Dispatchers extends Logging {
|
||||
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
|
||||
val DEFAULT_SHUTDOWN_TIMEOUT = config.getLong("akka.actor.dispatcher-shutdown-timeout").
|
||||
map(time => Duration(time, TIME_UNIT)).
|
||||
getOrElse(Duration(1000,TimeUnit.MILLISECONDS))
|
||||
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
|
||||
val MAILBOX_PUSH_TIME_OUT = Duration(config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT)
|
||||
val THROUGHPUT_DEADLINE_TIME = Duration(config.getInt("akka.actor.throughput-deadline-time",-1), TIME_UNIT)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.dispatch
|
||||
|
|
@ -10,6 +10,7 @@ import akka.routing.Dispatcher
|
|||
|
||||
import java.util.concurrent.locks.ReentrantLock
|
||||
import java.util.concurrent.TimeUnit
|
||||
import akka.japi.Procedure
|
||||
|
||||
class FutureTimeoutException(message: String) extends AkkaException(message)
|
||||
|
||||
|
|
@ -34,19 +35,24 @@ object Futures {
|
|||
f
|
||||
}
|
||||
|
||||
/**
|
||||
* (Blocking!)
|
||||
*/
|
||||
def awaitAll(futures: List[Future[_]]): Unit = futures.foreach(_.await)
|
||||
|
||||
/**
|
||||
* Returns the First Future that is completed
|
||||
* if no Future is completed, awaitOne optionally sleeps "sleepMs" millis and then re-scans
|
||||
* Returns the First Future that is completed (blocking!)
|
||||
*/
|
||||
def awaitOne(futures: List[Future[_]], sleepMs: Long = 0): Future[_] = {
|
||||
var future: Option[Future[_]] = None
|
||||
do {
|
||||
future = futures.find(_.isCompleted)
|
||||
if (sleepMs > 0 && future.isEmpty) Thread.sleep(sleepMs)
|
||||
} while (future.isEmpty)
|
||||
future.get
|
||||
def awaitOne(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = firstCompletedOf(futures).await
|
||||
|
||||
/**
|
||||
* Returns a Future to the result of the first future in the list that is completed
|
||||
*/
|
||||
def firstCompletedOf(futures: List[Future[_]], timeout: Long = Long.MaxValue): Future[_] = {
|
||||
val futureResult = new DefaultCompletableFuture[Any](timeout)
|
||||
val fun = (f: Future[_]) => futureResult completeWith f.asInstanceOf[Future[Any]]
|
||||
for(f <- futures) f onComplete fun
|
||||
futureResult
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -55,34 +61,10 @@ object Futures {
|
|||
def awaitMap[A,B](in: Traversable[Future[A]])(fun: (Future[A]) => B): Traversable[B] =
|
||||
in map { f => fun(f.await) }
|
||||
|
||||
/*
|
||||
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = {
|
||||
import Actor.Sender.Self
|
||||
import Actor.{spawn, actor}
|
||||
|
||||
case class Result(res: Option[T])
|
||||
val handOff = new SynchronousQueue[Option[T]]
|
||||
spawn {
|
||||
try {
|
||||
println("f1 await")
|
||||
f1.await
|
||||
println("f1 offer")
|
||||
handOff.offer(f1.result)
|
||||
} catch {case _ => {}}
|
||||
}
|
||||
spawn {
|
||||
try {
|
||||
println("f2 await")
|
||||
f2.await
|
||||
println("f2 offer")
|
||||
println("f2 offer: " + f2.result)
|
||||
handOff.offer(f2.result)
|
||||
} catch {case _ => {}}
|
||||
}
|
||||
Thread.sleep(100)
|
||||
handOff.take
|
||||
}
|
||||
*/
|
||||
/**
|
||||
* Returns Future.resultOrException of the first completed of the 2 Futures provided (blocking!)
|
||||
*/
|
||||
def awaitEither[T](f1: Future[T], f2: Future[T]): Option[T] = awaitOne(List(f1,f2)).asInstanceOf[Future[T]].resultOrException
|
||||
}
|
||||
|
||||
sealed trait Future[T] {
|
||||
|
|
@ -100,6 +82,24 @@ sealed trait Future[T] {
|
|||
|
||||
def exception: Option[Throwable]
|
||||
|
||||
def onComplete(func: Future[T] => Unit): Future[T]
|
||||
|
||||
/**
|
||||
* Returns the current result, throws the exception is one has been raised, else returns None
|
||||
*/
|
||||
def resultOrException: Option[T] = {
|
||||
val r = result
|
||||
if (r.isDefined) result
|
||||
else {
|
||||
val problem = exception
|
||||
if (problem.isDefined) throw problem.get
|
||||
else None
|
||||
}
|
||||
}
|
||||
|
||||
/* Java API */
|
||||
def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(f => proc(f))
|
||||
|
||||
def map[O](f: (T) => O): Future[O] = {
|
||||
val wrapped = this
|
||||
new Future[O] {
|
||||
|
|
@ -110,14 +110,21 @@ sealed trait Future[T] {
|
|||
def timeoutInNanos = wrapped.timeoutInNanos
|
||||
def result: Option[O] = { wrapped.result map f }
|
||||
def exception: Option[Throwable] = wrapped.exception
|
||||
def onComplete(func: Future[O] => Unit): Future[O] = { wrapped.onComplete(_ => func(this)); this }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
trait CompletableFuture[T] extends Future[T] {
|
||||
def completeWithResult(result: T)
|
||||
|
||||
def completeWithException(exception: Throwable)
|
||||
def completeWith(other: Future[T]) {
|
||||
val result = other.result
|
||||
val exception = other.exception
|
||||
if (result.isDefined) completeWithResult(result.get)
|
||||
else if (exception.isDefined) completeWithException(exception.get)
|
||||
//else TODO how to handle this case?
|
||||
}
|
||||
}
|
||||
|
||||
// Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
|
||||
|
|
@ -133,6 +140,7 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
|
|||
private var _completed: Boolean = _
|
||||
private var _result: Option[T] = None
|
||||
private var _exception: Option[Throwable] = None
|
||||
private var _listeners: List[Future[T] => Unit] = Nil
|
||||
|
||||
def await = try {
|
||||
_lock.lock
|
||||
|
|
@ -190,33 +198,67 @@ class DefaultCompletableFuture[T](timeout: Long) extends CompletableFuture[T] {
|
|||
_lock.unlock
|
||||
}
|
||||
|
||||
def completeWithResult(result: T) = try {
|
||||
_lock.lock
|
||||
if (!_completed) {
|
||||
_completed = true
|
||||
_result = Some(result)
|
||||
onComplete(result)
|
||||
def completeWithResult(result: T) {
|
||||
val notify = try {
|
||||
_lock.lock
|
||||
if (!_completed) {
|
||||
_completed = true
|
||||
_result = Some(result)
|
||||
true
|
||||
} else false
|
||||
} finally {
|
||||
_signal.signalAll
|
||||
_lock.unlock
|
||||
}
|
||||
} finally {
|
||||
_signal.signalAll
|
||||
_lock.unlock
|
||||
|
||||
if (notify)
|
||||
notifyListeners
|
||||
}
|
||||
|
||||
def completeWithException(exception: Throwable) = try {
|
||||
_lock.lock
|
||||
if (!_completed) {
|
||||
_completed = true
|
||||
_exception = Some(exception)
|
||||
onCompleteException(exception)
|
||||
def completeWithException(exception: Throwable) {
|
||||
val notify = try {
|
||||
_lock.lock
|
||||
if (!_completed) {
|
||||
_completed = true
|
||||
_exception = Some(exception)
|
||||
true
|
||||
} else false
|
||||
} finally {
|
||||
_signal.signalAll
|
||||
_lock.unlock
|
||||
}
|
||||
} finally {
|
||||
_signal.signalAll
|
||||
_lock.unlock
|
||||
|
||||
if (notify)
|
||||
notifyListeners
|
||||
}
|
||||
|
||||
protected def onComplete(result: T) {}
|
||||
def onComplete(func: Future[T] => Unit): CompletableFuture[T] = {
|
||||
val notifyNow = try {
|
||||
_lock.lock
|
||||
if (!_completed) {
|
||||
_listeners ::= func
|
||||
false
|
||||
}
|
||||
else
|
||||
true
|
||||
} finally {
|
||||
_lock.unlock
|
||||
}
|
||||
|
||||
protected def onCompleteException(exception: Throwable) {}
|
||||
if (notifyNow)
|
||||
notifyListener(func)
|
||||
|
||||
this
|
||||
}
|
||||
|
||||
private def notifyListeners() {
|
||||
for(l <- _listeners)
|
||||
notifyListener(l)
|
||||
}
|
||||
|
||||
private def notifyListener(func: Future[T] => Unit) {
|
||||
func(this)
|
||||
}
|
||||
|
||||
private def currentTimeInNanos: Long = TIME_UNIT.toNanos(System.currentTimeMillis)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -145,7 +145,11 @@ trait MessageDispatcher extends MailboxFactory with Logging {
|
|||
}
|
||||
}
|
||||
|
||||
private[akka] def timeoutMs: Long = 1000
|
||||
/**
|
||||
* When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms
|
||||
* defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second
|
||||
*/
|
||||
private[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis
|
||||
|
||||
/**
|
||||
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
|
||||
|
|
|
|||
|
|
@ -259,7 +259,7 @@ object ReflectiveAccess extends Logging {
|
|||
Some(ctor.newInstance(args: _*).asInstanceOf[T])
|
||||
} catch {
|
||||
case e =>
|
||||
log.debug("Could not instantiate class [%s] due to [%s]", clazz.getName, e)
|
||||
log.warning("Could not instantiate class [%s] due to [%s]", clazz.getName, e.getCause)
|
||||
None
|
||||
}
|
||||
|
||||
|
|
@ -276,7 +276,7 @@ object ReflectiveAccess extends Logging {
|
|||
Some(ctor.newInstance(args: _*).asInstanceOf[T])
|
||||
} catch {
|
||||
case e =>
|
||||
log.debug("Could not instantiate class [%s] due to [%s]", fqn, e)
|
||||
log.warning("Could not instantiate class [%s] due to [%s]", fqn, e.getCause)
|
||||
None
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import akka.actor.Actor._
|
|||
import java.util.concurrent.atomic.AtomicLong
|
||||
import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit}
|
||||
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
|
||||
import akka.util.Duration
|
||||
|
||||
object ActorModelSpec {
|
||||
|
||||
|
|
@ -164,6 +165,18 @@ object ActorModelSpec {
|
|||
assert(stats.restarts.get() === restarts, "Restarts")
|
||||
}
|
||||
|
||||
def await(condition: => Boolean)(withinMs: Long, intervalMs: Long = 25): Boolean = try {
|
||||
val until = System.currentTimeMillis() + withinMs
|
||||
while(System.currentTimeMillis() <= until) {
|
||||
try {
|
||||
if (condition) return true
|
||||
|
||||
Thread.sleep(intervalMs)
|
||||
} catch { case e: InterruptedException => }
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
def newTestActor(implicit d: MessageDispatcherInterceptor) = actorOf(new DispatcherActor(d))
|
||||
}
|
||||
|
||||
|
|
@ -179,7 +192,7 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
a.start
|
||||
assertDispatcher(dispatcher)(starts = 1, stops = 0)
|
||||
a.stop
|
||||
Thread.sleep(dispatcher.timeoutMs + 100)
|
||||
await(dispatcher.stops.get == 1)(withinMs = dispatcher.timeoutMs * 5)
|
||||
assertDispatcher(dispatcher)(starts = 1, stops = 1)
|
||||
assertRef(a,dispatcher)(
|
||||
suspensions = 0,
|
||||
|
|
@ -279,7 +292,7 @@ abstract class ActorModelSpec extends JUnitSuite {
|
|||
}
|
||||
for(run <- 1 to 3) {
|
||||
flood(10000)
|
||||
Thread.sleep(dispatcher.timeoutMs * 2)
|
||||
await(dispatcher.stops.get == run)(withinMs = 10000)
|
||||
assertDispatcher(dispatcher)(starts = run, stops = run)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import org.junit.Test
|
|||
import akka.dispatch.Futures
|
||||
import Actor._
|
||||
import org.multiverse.api.latches.StandardLatch
|
||||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
object FutureSpec {
|
||||
class TestActor extends Actor {
|
||||
|
|
@ -53,7 +54,6 @@ class FutureSpec extends JUnitSuite {
|
|||
actor.stop
|
||||
}
|
||||
|
||||
/*
|
||||
// FIXME: implement Futures.awaitEither, and uncomment these two tests
|
||||
@Test def shouldFutureAwaitEitherLeft = {
|
||||
val actor1 = actorOf[TestActor].start
|
||||
|
|
@ -78,7 +78,7 @@ class FutureSpec extends JUnitSuite {
|
|||
actor1.stop
|
||||
actor2.stop
|
||||
}
|
||||
*/
|
||||
|
||||
@Test def shouldFutureAwaitOneLeft = {
|
||||
val actor1 = actorOf[TestActor].start
|
||||
val actor2 = actorOf[TestActor].start
|
||||
|
|
|
|||
|
|
@ -24,4 +24,11 @@ public @interface consume {
|
|||
*/
|
||||
public abstract String value();
|
||||
|
||||
/**
|
||||
* Route definition handler class for customizing route to annotated method.
|
||||
* The handler class must have a default constructor.
|
||||
*/
|
||||
public abstract Class<? extends RouteDefinitionHandler> routeDefinitionHandler()
|
||||
default RouteDefinitionIdentity.class;
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,16 +4,27 @@
|
|||
|
||||
package akka.camel
|
||||
|
||||
import akka.actor._
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
import org.apache.camel.{Exchange, Processor}
|
||||
import org.apache.camel.model.{RouteDefinition, ProcessorDefinition}
|
||||
|
||||
import akka.actor._
|
||||
import akka.japi.{Function => JFunction}
|
||||
|
||||
/**
|
||||
* Mixed in by Actor implementations that consume message from Camel endpoints.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
trait Consumer { self: Actor =>
|
||||
import RouteDefinitionHandler._
|
||||
|
||||
/**
|
||||
* The default route definition handler is the identity function
|
||||
*/
|
||||
private[camel] var routeDefinitionHandler: RouteDefinitionHandler = identity
|
||||
|
||||
/**
|
||||
* Returns the Camel endpoint URI to consume messages from.
|
||||
*/
|
||||
|
|
@ -25,6 +36,18 @@ trait Consumer { self: Actor =>
|
|||
* doesn't have any effect on one-way communications (they'll never block).
|
||||
*/
|
||||
def blocking = false
|
||||
|
||||
/**
|
||||
* Sets the route definition handler for creating a custom route to this consumer instance.
|
||||
*/
|
||||
def onRouteDefinition(h: RouteDefinition => ProcessorDefinition[_]): Unit = onRouteDefinition(from(h))
|
||||
|
||||
/**
|
||||
* Sets the route definition handler for creating a custom route to this consumer instance.
|
||||
* <p>
|
||||
* Java API.
|
||||
*/
|
||||
def onRouteDefinition(h: RouteDefinitionHandler): Unit = routeDefinitionHandler = h
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -73,6 +96,42 @@ abstract class RemoteUntypedConsumerActor(address: InetSocketAddress) extends Re
|
|||
def this(host: String, port: Int) = this(new InetSocketAddress(host, port))
|
||||
}
|
||||
|
||||
/**
|
||||
* A callback handler for route definitions to consumer actors.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
trait RouteDefinitionHandler {
|
||||
def onRouteDefinition(rd: RouteDefinition): ProcessorDefinition[_]
|
||||
}
|
||||
|
||||
/**
|
||||
* The identity route definition handler.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*
|
||||
*/
|
||||
class RouteDefinitionIdentity extends RouteDefinitionHandler {
|
||||
def onRouteDefinition(rd: RouteDefinition) = rd
|
||||
}
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
object RouteDefinitionHandler {
|
||||
/**
|
||||
* Returns the identity route definition handler
|
||||
*/
|
||||
val identity = new RouteDefinitionIdentity
|
||||
|
||||
/**
|
||||
* Created a route definition handler from the given function.
|
||||
*/
|
||||
def from(f: RouteDefinition => ProcessorDefinition[_]) = new RouteDefinitionHandler {
|
||||
def onRouteDefinition(rd: RouteDefinition) = f(rd)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import java.lang.reflect.Method
|
|||
import java.util.concurrent.CountDownLatch
|
||||
|
||||
import org.apache.camel.builder.RouteBuilder
|
||||
import org.apache.camel.model.{ProcessorDefinition, RouteDefinition}
|
||||
|
||||
import akka.actor._
|
||||
import akka.camel.component.TypedActorComponent
|
||||
|
|
@ -22,41 +23,35 @@ private[camel] object ConsumerPublisher extends Logging {
|
|||
/**
|
||||
* Creates a route to the registered consumer actor.
|
||||
*/
|
||||
def handleConsumerRegistered(event: ConsumerRegistered) {
|
||||
CamelContextManager.mandatoryContext.addRoutes(new ConsumerActorRoute(event.uri, event.uuid, event.blocking))
|
||||
log.info("published actor %s at endpoint %s" format (event.actorRef, event.uri))
|
||||
def handleConsumerActorRegistered(event: ConsumerActorRegistered) {
|
||||
CamelContextManager.mandatoryContext.addRoutes(new ConsumerActorRouteBuilder(event))
|
||||
log.info("published actor %s at endpoint %s" format (event.actorRef, event.endpointUri))
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the route to the already un-registered consumer actor.
|
||||
*/
|
||||
def handleConsumerUnregistered(event: ConsumerUnregistered) {
|
||||
CamelContextManager.mandatoryContext.stopRoute(event.uuid.toString)
|
||||
log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.uri))
|
||||
def handleConsumerActorUnregistered(event: ConsumerActorUnregistered) {
|
||||
CamelContextManager.mandatoryContext.stopRoute(event.uuid)
|
||||
log.info("unpublished actor %s from endpoint %s" format (event.actorRef, event.endpointUri))
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a route to an typed actor method.
|
||||
*/
|
||||
def handleConsumerMethodRegistered(event: ConsumerMethodRegistered) {
|
||||
val targetMethod = event.method.getName
|
||||
val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
|
||||
|
||||
CamelContextManager.typedActorRegistry.put(objectId, event.typedActor)
|
||||
CamelContextManager.mandatoryContext.addRoutes(new ConsumerMethodRoute(event.uri, objectId, targetMethod))
|
||||
log.info("published method %s of %s at endpoint %s" format (targetMethod, event.typedActor, event.uri))
|
||||
CamelContextManager.typedActorRegistry.put(event.methodUuid, event.typedActor)
|
||||
CamelContextManager.mandatoryContext.addRoutes(new ConsumerMethodRouteBuilder(event))
|
||||
log.info("published method %s of %s at endpoint %s" format (event.methodName, event.typedActor, event.endpointUri))
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the route to the already un-registered consumer actor method.
|
||||
*/
|
||||
def handleConsumerMethodUnregistered(event: ConsumerMethodUnregistered) {
|
||||
val targetMethod = event.method.getName
|
||||
val objectId = "%s_%s" format (event.init.actorRef.uuid, targetMethod)
|
||||
|
||||
CamelContextManager.typedActorRegistry.remove(objectId)
|
||||
CamelContextManager.mandatoryContext.stopRoute(objectId)
|
||||
log.info("unpublished method %s of %s from endpoint %s" format (targetMethod, event.typedActor, event.uri))
|
||||
CamelContextManager.typedActorRegistry.remove(event.methodUuid)
|
||||
CamelContextManager.mandatoryContext.stopRoute(event.methodUuid)
|
||||
log.info("unpublished method %s of %s from endpoint %s" format (event.methodName, event.typedActor, event.endpointUri))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -64,8 +59,8 @@ private[camel] object ConsumerPublisher extends Logging {
|
|||
* Actor that publishes consumer actors and typed actor methods at Camel endpoints.
|
||||
* The Camel context used for publishing is obtained via CamelContextManager.context.
|
||||
* This actor accepts messages of type
|
||||
* akka.camel.ConsumerRegistered,
|
||||
* akka.camel.ConsumerUnregistered,
|
||||
* akka.camel.ConsumerActorRegistered,
|
||||
* akka.camel.ConsumerActorUnregistered,
|
||||
* akka.camel.ConsumerMethodRegistered and
|
||||
* akka.camel.ConsumerMethodUnregistered.
|
||||
*
|
||||
|
|
@ -78,12 +73,12 @@ private[camel] class ConsumerPublisher extends Actor {
|
|||
@volatile private var unregistrationLatch = new CountDownLatch(0)
|
||||
|
||||
protected def receive = {
|
||||
case r: ConsumerRegistered => {
|
||||
handleConsumerRegistered(r)
|
||||
case r: ConsumerActorRegistered => {
|
||||
handleConsumerActorRegistered(r)
|
||||
registrationLatch.countDown
|
||||
}
|
||||
case u: ConsumerUnregistered => {
|
||||
handleConsumerUnregistered(u)
|
||||
case u: ConsumerActorUnregistered => {
|
||||
handleConsumerActorUnregistered(u)
|
||||
unregistrationLatch.countDown
|
||||
}
|
||||
case mr: ConsumerMethodRegistered => {
|
||||
|
|
@ -117,7 +112,7 @@ private[camel] case class SetExpectedUnregistrationCount(num: Int)
|
|||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
private[camel] abstract class ConsumerRoute(endpointUri: String, id: String) extends RouteBuilder {
|
||||
private[camel] abstract class ConsumerRouteBuilder(endpointUri: String, id: String) extends RouteBuilder {
|
||||
// TODO: make conversions configurable
|
||||
private val bodyConversions = Map(
|
||||
"file" -> classOf[InputStream]
|
||||
|
|
@ -125,39 +120,39 @@ private[camel] abstract class ConsumerRoute(endpointUri: String, id: String) ext
|
|||
|
||||
def configure = {
|
||||
val schema = endpointUri take endpointUri.indexOf(":") // e.g. "http" from "http://whatever/..."
|
||||
bodyConversions.get(schema) match {
|
||||
case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz).to(targetUri)
|
||||
case None => from(endpointUri).routeId(id).to(targetUri)
|
||||
}
|
||||
val cnvopt = bodyConversions.get(schema)
|
||||
|
||||
onRouteDefinition(startRouteDefinition(cnvopt)).to(targetUri)
|
||||
}
|
||||
|
||||
protected def routeDefinitionHandler: RouteDefinitionHandler
|
||||
protected def targetUri: String
|
||||
|
||||
private def onRouteDefinition(rd: RouteDefinition) = routeDefinitionHandler.onRouteDefinition(rd)
|
||||
private def startRouteDefinition(bodyConversion: Option[Class[_]]): RouteDefinition = bodyConversion match {
|
||||
case Some(clazz) => from(endpointUri).routeId(id).convertBodyTo(clazz)
|
||||
case None => from(endpointUri).routeId(id)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines the route to a (untyped) consumer actor.
|
||||
*
|
||||
* @param endpointUri endpoint URI of the (untyped) consumer actor
|
||||
* @param uuid actor uuid
|
||||
* @param blocking true for blocking in-out exchanges, false otherwise
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
private[camel] class ConsumerActorRoute(endpointUri: String, uuid: Uuid, blocking: Boolean) extends ConsumerRoute(endpointUri, uuid.toString) {
|
||||
protected override def targetUri = "actor:uuid:%s?blocking=%s" format (uuid, blocking)
|
||||
private[camel] class ConsumerActorRouteBuilder(event: ConsumerActorRegistered) extends ConsumerRouteBuilder(event.endpointUri, event.uuid) {
|
||||
protected def routeDefinitionHandler: RouteDefinitionHandler = event.routeDefinitionHandler
|
||||
protected def targetUri = "actor:uuid:%s?blocking=%s" format (event.uuid, event.blocking)
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines the route to a typed actor method.
|
||||
*
|
||||
* @param endpointUri endpoint URI of the consumer actor method
|
||||
* @param id typed actor identifier
|
||||
* @param method name of the method to invoke.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
private[camel] class ConsumerMethodRoute(val endpointUri: String, id: String, method: String) extends ConsumerRoute(endpointUri, id) {
|
||||
protected override def targetUri = "%s:%s?method=%s" format (TypedActorComponent.InternalSchema, id, method)
|
||||
private[camel] class ConsumerMethodRouteBuilder(event: ConsumerMethodRegistered) extends ConsumerRouteBuilder(event.endpointUri, event.methodUuid) {
|
||||
protected def routeDefinitionHandler: RouteDefinitionHandler = event.routeDefinitionHandler
|
||||
protected def targetUri = "%s:%s?method=%s" format (TypedActorComponent.InternalSchema, event.methodUuid, event.methodName)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -179,9 +174,9 @@ private[camel] class PublishRequestor extends Actor {
|
|||
|
||||
protected def receive = {
|
||||
case ActorRegistered(actor) =>
|
||||
for (event <- ConsumerRegistered.forConsumer(actor)) deliverCurrentEvent(event)
|
||||
for (event <- ConsumerActorRegistered.forConsumer(actor)) deliverCurrentEvent(event)
|
||||
case ActorUnregistered(actor) =>
|
||||
for (event <- ConsumerUnregistered.forConsumer(actor)) deliverCurrentEvent(event)
|
||||
for (event <- ConsumerActorUnregistered.forConsumer(actor)) deliverCurrentEvent(event)
|
||||
case AspectInitRegistered(proxy, init) =>
|
||||
for (event <- ConsumerMethodRegistered.forConsumer(proxy, init)) deliverCurrentEvent(event)
|
||||
case AspectInitUnregistered(proxy, init) =>
|
||||
|
|
@ -214,71 +209,72 @@ private[camel] case class PublishRequestorInit(consumerPublisher: ActorRef)
|
|||
|
||||
/**
|
||||
* A consumer (un)registration event.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
private[camel] sealed trait ConsumerEvent
|
||||
|
||||
/**
|
||||
* Event indicating that a consumer actor has been registered at the actor registry.
|
||||
*
|
||||
* @param actorRef actor reference
|
||||
* @param uri endpoint URI of the consumer actor
|
||||
* @param uuid actor uuid
|
||||
* @param blocking true for blocking in-out exchanges, false otherwise
|
||||
*
|
||||
* @author Martin Krasser
|
||||
* A consumer actor (un)registration event.
|
||||
*/
|
||||
private[camel] case class ConsumerRegistered(actorRef: ActorRef, uri: String, uuid: Uuid, blocking: Boolean) extends ConsumerEvent
|
||||
private[camel] trait ConsumerActorEvent extends ConsumerEvent {
|
||||
val actorRef: ActorRef
|
||||
val actor: Consumer
|
||||
|
||||
val uuid = actorRef.uuid.toString
|
||||
val endpointUri = actor.endpointUri
|
||||
val blocking = actor.blocking
|
||||
val routeDefinitionHandler = actor.routeDefinitionHandler
|
||||
}
|
||||
|
||||
/**
|
||||
* A consumer method (un)registration event.
|
||||
*/
|
||||
private[camel] trait ConsumerMethodEvent extends ConsumerEvent {
|
||||
val typedActor: AnyRef
|
||||
val init: AspectInit
|
||||
val method: Method
|
||||
|
||||
val uuid = init.actorRef.uuid.toString
|
||||
val methodName = method.getName
|
||||
val methodUuid = "%s_%s" format (uuid, methodName)
|
||||
|
||||
lazy val routeDefinitionHandler = consumeAnnotation.routeDefinitionHandler.newInstance
|
||||
lazy val consumeAnnotation = method.getAnnotation(classOf[consume])
|
||||
lazy val endpointUri = consumeAnnotation.value
|
||||
}
|
||||
|
||||
/**
|
||||
* Event indicating that a consumer actor has been registered at the actor registry.
|
||||
*/
|
||||
private[camel] case class ConsumerActorRegistered(actorRef: ActorRef, actor: Consumer) extends ConsumerActorEvent
|
||||
|
||||
/**
|
||||
* Event indicating that a consumer actor has been unregistered from the actor registry.
|
||||
*
|
||||
* @param actorRef actor reference
|
||||
* @param uri endpoint URI of the consumer actor
|
||||
* @param uuid actor uuid
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
private[camel] case class ConsumerUnregistered(actorRef: ActorRef, uri: String, uuid: Uuid) extends ConsumerEvent
|
||||
private[camel] case class ConsumerActorUnregistered(actorRef: ActorRef, actor: Consumer) extends ConsumerActorEvent
|
||||
|
||||
/**
|
||||
* Event indicating that an typed actor proxy has been created for a typed actor. For each <code>@consume</code>
|
||||
* annotated typed actor method a separate instance of this class is created.
|
||||
*
|
||||
* @param typedActor typed actor (proxy).
|
||||
* @param init
|
||||
* @param uri endpoint URI of the typed actor method
|
||||
* @param method method to be published.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
private[camel] case class ConsumerMethodRegistered(typedActor: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
|
||||
private[camel] case class ConsumerMethodRegistered(typedActor: AnyRef, init: AspectInit, method: Method) extends ConsumerMethodEvent
|
||||
|
||||
/**
|
||||
* Event indicating that an typed actor has been stopped. For each <code>@consume</code>
|
||||
* annotated typed object method a separate instance of this class is created.
|
||||
*
|
||||
* @param typedActor typed actor (proxy).
|
||||
* @param init
|
||||
* @param uri endpoint URI of the typed actor method
|
||||
* @param method method to be un-published.
|
||||
*
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
private[camel] case class ConsumerMethodUnregistered(typedActor: AnyRef, init: AspectInit, uri: String, method: Method) extends ConsumerEvent
|
||||
private[camel] case class ConsumerMethodUnregistered(typedActor: AnyRef, init: AspectInit, method: Method) extends ConsumerMethodEvent
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
private[camel] object ConsumerRegistered {
|
||||
private[camel] object ConsumerActorRegistered {
|
||||
/**
|
||||
* Creates an ConsumerRegistered event message for a consumer actor or None if
|
||||
* Creates an ConsumerActorRegistered event message for a consumer actor or None if
|
||||
* <code>actorRef</code> is not a consumer actor.
|
||||
*/
|
||||
def forConsumer(actorRef: ActorRef): Option[ConsumerRegistered] = {
|
||||
Consumer.forConsumer[ConsumerRegistered](actorRef) {
|
||||
target => ConsumerRegistered(actorRef, target.endpointUri, actorRef.uuid, target.blocking)
|
||||
def forConsumer(actorRef: ActorRef): Option[ConsumerActorRegistered] = {
|
||||
Consumer.forConsumer[ConsumerActorRegistered](actorRef) {
|
||||
actor => ConsumerActorRegistered(actorRef, actor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -286,14 +282,14 @@ private[camel] object ConsumerRegistered {
|
|||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
private[camel] object ConsumerUnregistered {
|
||||
private[camel] object ConsumerActorUnregistered {
|
||||
/**
|
||||
* Creates an ConsumerUnregistered event message for a consumer actor or None if
|
||||
* Creates an ConsumerActorUnregistered event message for a consumer actor or None if
|
||||
* <code>actorRef</code> is not a consumer actor.
|
||||
*/
|
||||
def forConsumer(actorRef: ActorRef): Option[ConsumerUnregistered] = {
|
||||
Consumer.forConsumer[ConsumerUnregistered](actorRef) {
|
||||
target => ConsumerUnregistered(actorRef, target.endpointUri, actorRef.uuid)
|
||||
def forConsumer(actorRef: ActorRef): Option[ConsumerActorUnregistered] = {
|
||||
Consumer.forConsumer[ConsumerActorUnregistered](actorRef) {
|
||||
actor => ConsumerActorUnregistered(actorRef, actor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -333,7 +329,7 @@ private[camel] object ConsumerMethodRegistered {
|
|||
*/
|
||||
def forConsumer(typedActor: AnyRef, init: AspectInit): List[ConsumerMethodRegistered] = {
|
||||
ConsumerMethod.forConsumer(typedActor, init) {
|
||||
m => ConsumerMethodRegistered(typedActor, init, m.getAnnotation(classOf[consume]).value, m)
|
||||
m => ConsumerMethodRegistered(typedActor, init, m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -349,7 +345,7 @@ private[camel] object ConsumerMethodUnregistered {
|
|||
*/
|
||||
def forConsumer(typedActor: AnyRef, init: AspectInit): List[ConsumerMethodUnregistered] = {
|
||||
ConsumerMethod.forConsumer(typedActor, init) {
|
||||
m => ConsumerMethodUnregistered(typedActor, init, m.getAnnotation(classOf[consume]).value, m)
|
||||
m => ConsumerMethodUnregistered(typedActor, init, m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,59 @@
|
|||
package akka.camel;
|
||||
|
||||
import akka.actor.ActorRegistry;
|
||||
import akka.actor.TypedActor;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.japi.SideEffect;
|
||||
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import static akka.camel.CamelContextManager.*;
|
||||
import static akka.camel.CamelServiceManager.*;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public class ConsumerJavaTestBase {
|
||||
|
||||
private SampleErrorHandlingTypedConsumer consumer;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() {
|
||||
startCamelService();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() {
|
||||
stopCamelService();
|
||||
ActorRegistry.shutdownAll();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldHandleExceptionThrownByActorAndGenerateCustomResponse() {
|
||||
getMandatoryService().awaitEndpointActivation(1, new SideEffect() {
|
||||
public void apply() {
|
||||
UntypedActor.actorOf(SampleErrorHandlingConsumer.class).start();
|
||||
}
|
||||
});
|
||||
String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java", "hello", String.class);
|
||||
assertEquals("error: hello", result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void shouldHandleExceptionThrownByTypedActorAndGenerateCustomResponse() {
|
||||
getMandatoryService().awaitEndpointActivation(1, new SideEffect() {
|
||||
public void apply() {
|
||||
consumer = TypedActor.newInstance(
|
||||
SampleErrorHandlingTypedConsumer.class,
|
||||
SampleErrorHandlingTypedConsumerImpl.class);
|
||||
}
|
||||
});
|
||||
String result = getMandatoryTemplate().requestBody("direct:error-handler-test-java-typed", "hello", String.class);
|
||||
assertEquals("error: hello", result);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
package akka.camel;
|
||||
|
||||
import org.apache.camel.builder.Builder;
|
||||
import org.apache.camel.model.ProcessorDefinition;
|
||||
import org.apache.camel.model.RouteDefinition;
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public class SampleErrorHandlingConsumer extends UntypedConsumerActor {
|
||||
|
||||
public String getEndpointUri() {
|
||||
return "direct:error-handler-test-java";
|
||||
}
|
||||
|
||||
public boolean isBlocking() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public void preStart() {
|
||||
onRouteDefinition(new RouteDefinitionHandler() {
|
||||
public ProcessorDefinition<?> onRouteDefinition(RouteDefinition rd) {
|
||||
return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void onReceive(Object message) throws Exception {
|
||||
Message msg = (Message)message;
|
||||
String body = msg.getBodyAs(String.class);
|
||||
throw new Exception(String.format("error: %s", body));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
package akka.camel;
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public interface SampleErrorHandlingTypedConsumer {
|
||||
|
||||
@consume(value="direct:error-handler-test-java-typed", routeDefinitionHandler=SampleRouteDefinitionHandler.class)
|
||||
String willFail(String s);
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package akka.camel;
|
||||
|
||||
import akka.actor.TypedActor;
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public class SampleErrorHandlingTypedConsumerImpl extends TypedActor implements SampleErrorHandlingTypedConsumer {
|
||||
|
||||
public String willFail(String s) {
|
||||
throw new RuntimeException(String.format("error: %s", s));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
package akka.camel;
|
||||
|
||||
import org.apache.camel.builder.Builder;
|
||||
import org.apache.camel.model.ProcessorDefinition;
|
||||
import org.apache.camel.model.RouteDefinition;
|
||||
|
||||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
public class SampleRouteDefinitionHandler implements RouteDefinitionHandler {
|
||||
public ProcessorDefinition<?> onRouteDefinition(RouteDefinition rd) {
|
||||
return rd.onException(Exception.class).handled(true).transform(Builder.exceptionMessage()).end();
|
||||
}
|
||||
}
|
||||
5
akka-camel/src/test/scala/ConsumerJavaTest.scala
Normal file
5
akka-camel/src/test/scala/ConsumerJavaTest.scala
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
package akka.camel
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
|
||||
class ConsumerJavaTest extends ConsumerJavaTestBase with JUnitSuite
|
||||
|
|
@ -2,46 +2,47 @@ package akka.camel
|
|||
|
||||
import org.junit.Test
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
|
||||
import akka.actor.{Actor, UntypedActor}
|
||||
import akka.actor.{ActorRef, Actor, UntypedActor}
|
||||
|
||||
class ConsumerRegisteredTest extends JUnitSuite {
|
||||
import ConsumerRegisteredTest._
|
||||
|
||||
@Test def shouldCreateSomeNonBlockingPublishRequestFromConsumer = {
|
||||
val c = Actor.actorOf[ConsumerActor1]
|
||||
val event = ConsumerRegistered.forConsumer(c)
|
||||
assert(event === Some(ConsumerRegistered(c, "mock:test1", c.uuid, false)))
|
||||
val event = ConsumerActorRegistered.forConsumer(c)
|
||||
assert(event === Some(ConsumerActorRegistered(c, consumerOf(c))))
|
||||
}
|
||||
|
||||
@Test def shouldCreateSomeBlockingPublishRequestFromConsumer = {
|
||||
val c = Actor.actorOf[ConsumerActor2]
|
||||
val event = ConsumerRegistered.forConsumer(c)
|
||||
assert(event === Some(ConsumerRegistered(c, "mock:test2", c.uuid, true)))
|
||||
val event = ConsumerActorRegistered.forConsumer(c)
|
||||
assert(event === Some(ConsumerActorRegistered(c, consumerOf(c))))
|
||||
}
|
||||
|
||||
@Test def shouldCreateNoneFromConsumer = {
|
||||
val event = ConsumerRegistered.forConsumer(Actor.actorOf[PlainActor])
|
||||
val event = ConsumerActorRegistered.forConsumer(Actor.actorOf[PlainActor])
|
||||
assert(event === None)
|
||||
}
|
||||
|
||||
@Test def shouldCreateSomeNonBlockingPublishRequestFromUntypedConsumer = {
|
||||
val uc = UntypedActor.actorOf(classOf[SampleUntypedConsumer])
|
||||
val event = ConsumerRegistered.forConsumer(uc)
|
||||
assert(event === Some(ConsumerRegistered(uc, "direct:test-untyped-consumer", uc.uuid, false)))
|
||||
val event = ConsumerActorRegistered.forConsumer(uc)
|
||||
assert(event === Some(ConsumerActorRegistered(uc, consumerOf(uc))))
|
||||
}
|
||||
|
||||
@Test def shouldCreateSomeBlockingPublishRequestFromUntypedConsumer = {
|
||||
val uc = UntypedActor.actorOf(classOf[SampleUntypedConsumerBlocking])
|
||||
val event = ConsumerRegistered.forConsumer(uc)
|
||||
assert(event === Some(ConsumerRegistered(uc, "direct:test-untyped-consumer-blocking", uc.uuid, true)))
|
||||
val event = ConsumerActorRegistered.forConsumer(uc)
|
||||
assert(event === Some(ConsumerActorRegistered(uc, consumerOf(uc))))
|
||||
}
|
||||
|
||||
@Test def shouldCreateNoneFromUntypedConsumer = {
|
||||
val a = UntypedActor.actorOf(classOf[SampleUntypedActor])
|
||||
val event = ConsumerRegistered.forConsumer(a)
|
||||
val event = ConsumerActorRegistered.forConsumer(a)
|
||||
assert(event === None)
|
||||
}
|
||||
|
||||
private def consumerOf(ref: ActorRef) = ref.actor.asInstanceOf[Consumer]
|
||||
}
|
||||
|
||||
object ConsumerRegisteredTest {
|
||||
|
|
|
|||
|
|
@ -3,7 +3,8 @@ package akka.camel
|
|||
import java.util.concurrent.{TimeoutException, CountDownLatch, TimeUnit}
|
||||
|
||||
import org.apache.camel.CamelExecutionException
|
||||
import org.apache.camel.builder.RouteBuilder
|
||||
import org.apache.camel.builder.Builder
|
||||
import org.apache.camel.model.RouteDefinition
|
||||
import org.scalatest.{BeforeAndAfterAll, WordSpec}
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
|
||||
|
|
@ -13,9 +14,9 @@ import akka.actor._
|
|||
/**
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
||||
class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
||||
import CamelContextManager.mandatoryTemplate
|
||||
import ConsumerTest._
|
||||
import ConsumerScalaTest._
|
||||
|
||||
var service: CamelService = _
|
||||
|
||||
|
|
@ -171,9 +172,34 @@ class ConsumerTest extends WordSpec with BeforeAndAfterAll with MustMatchers {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
"A responding, blocking consumer" when {
|
||||
"activated with a custom error handler" must {
|
||||
"handle thrown exceptions by generating a custom response" in {
|
||||
service.awaitEndpointActivation(1) {
|
||||
actorOf[ErrorHandlingConsumer].start
|
||||
} must be (true)
|
||||
mandatoryTemplate.requestBody("direct:error-handler-test", "hello") must equal ("error: hello")
|
||||
|
||||
}
|
||||
}
|
||||
"activated with a custom redelivery handler" must {
|
||||
"handle thrown exceptions by redelivering the initial message" in {
|
||||
service.awaitEndpointActivation(1) {
|
||||
actorOf[RedeliveringConsumer].start
|
||||
} must be (true)
|
||||
mandatoryTemplate.requestBody("direct:redelivery-test", "hello") must equal ("accepted: hello")
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
object ConsumerTest {
|
||||
object ConsumerScalaTest {
|
||||
trait BlockingConsumer extends Consumer { self: Actor =>
|
||||
override def blocking = true
|
||||
}
|
||||
|
||||
class TestConsumer(uri: String) extends Actor with Consumer {
|
||||
def endpointUri = uri
|
||||
protected def receive = {
|
||||
|
|
@ -181,6 +207,53 @@ object ConsumerTest {
|
|||
}
|
||||
}
|
||||
|
||||
class TestBlocker(uri: String) extends Actor with BlockingConsumer {
|
||||
self.timeout = 1000
|
||||
def endpointUri = uri
|
||||
protected def receive = {
|
||||
case msg: Message => { /* do not reply */ }
|
||||
}
|
||||
}
|
||||
|
||||
class ErrorHandlingConsumer extends Actor with BlockingConsumer {
|
||||
def endpointUri = "direct:error-handler-test"
|
||||
|
||||
onRouteDefinition {rd: RouteDefinition =>
|
||||
rd.onException(classOf[Exception]).handled(true).transform(Builder.exceptionMessage).end
|
||||
}
|
||||
|
||||
protected def receive = {
|
||||
case msg: Message => throw new Exception("error: %s" format msg.body)
|
||||
}
|
||||
}
|
||||
|
||||
class RedeliveringConsumer extends Actor with BlockingConsumer {
|
||||
def endpointUri = "direct:redelivery-test"
|
||||
|
||||
onRouteDefinition {rd: RouteDefinition =>
|
||||
rd.onException(classOf[Exception]).maximumRedeliveries(1).end
|
||||
}
|
||||
|
||||
//
|
||||
// first message to this actor is not valid and will be rejected
|
||||
//
|
||||
|
||||
var valid = false
|
||||
|
||||
protected def receive = {
|
||||
case msg: Message => try {
|
||||
respondTo(msg)
|
||||
} finally {
|
||||
valid = true
|
||||
}
|
||||
}
|
||||
|
||||
private def respondTo(msg: Message) =
|
||||
if (valid) self.reply("accepted: %s" format msg.body)
|
||||
else throw new Exception("rejected: %s" format msg.body)
|
||||
|
||||
}
|
||||
|
||||
trait TestTypedConsumer {
|
||||
@consume("direct:publish-test-3")
|
||||
def foo(s: String): String
|
||||
|
|
@ -193,12 +266,6 @@ object ConsumerTest {
|
|||
def bar(s: String) = "bar: %s" format s
|
||||
}
|
||||
|
||||
class TestBlocker(uri: String) extends Actor with Consumer {
|
||||
self.timeout = 1000
|
||||
def endpointUri = uri
|
||||
override def blocking = true
|
||||
protected def receive = {
|
||||
case msg: Message => { /* do not reply */ }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -40,9 +40,9 @@ class PublishRequestorTest extends JUnitSuite {
|
|||
val obj = TypedActor.newInstance(classOf[SampleTypedSingleConsumer], classOf[SampleTypedSingleConsumerImpl])
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
val event = (publisher !! GetRetainedMessage).as[ConsumerMethodRegistered].get
|
||||
assert(event.uri === "direct:foo")
|
||||
assert(event.endpointUri === "direct:foo")
|
||||
assert(event.typedActor === obj)
|
||||
assert(event.method.getName === "foo")
|
||||
assert(event.methodName === "foo")
|
||||
}
|
||||
|
||||
@Test def shouldReceiveOneConsumerMethodUnregisteredEvent = {
|
||||
|
|
@ -52,9 +52,9 @@ class PublishRequestorTest extends JUnitSuite {
|
|||
TypedActor.stop(obj)
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
val event = (publisher !! GetRetainedMessage).as[ConsumerMethodUnregistered].get
|
||||
assert(event.uri === "direct:foo")
|
||||
assert(event.endpointUri === "direct:foo")
|
||||
assert(event.typedActor === obj)
|
||||
assert(event.method.getName === "foo")
|
||||
assert(event.methodName === "foo")
|
||||
}
|
||||
|
||||
@Test def shouldReceiveThreeConsumerMethodRegisteredEvents = {
|
||||
|
|
@ -83,7 +83,7 @@ class PublishRequestorTest extends JUnitSuite {
|
|||
requestor ! ActorRegistered(consumer)
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
assert((publisher !! GetRetainedMessage) ===
|
||||
Some(ConsumerRegistered(consumer, "mock:test", consumer.uuid, false)))
|
||||
Some(ConsumerActorRegistered(consumer, consumer.actor.asInstanceOf[Consumer])))
|
||||
}
|
||||
|
||||
@Test def shouldReceiveOneConsumerUnregisteredEvent = {
|
||||
|
|
@ -91,7 +91,7 @@ class PublishRequestorTest extends JUnitSuite {
|
|||
requestor ! ActorUnregistered(consumer)
|
||||
assert(latch.await(5000, TimeUnit.MILLISECONDS))
|
||||
assert((publisher !! GetRetainedMessage) ===
|
||||
Some(ConsumerUnregistered(consumer, "mock:test", consumer.uuid)))
|
||||
Some(ConsumerActorUnregistered(consumer, consumer.actor.asInstanceOf[Consumer])))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -700,6 +700,13 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
|
|||
}
|
||||
}
|
||||
|
||||
private[akka] object PersistentSortedSet {
|
||||
// operations on the SortedSet
|
||||
sealed trait Op
|
||||
case object ADD extends Op
|
||||
case object REM extends Op
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements a template for a concrete persistent transactional sorted set based storage.
|
||||
* <p/>
|
||||
|
|
@ -734,61 +741,45 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
|
|||
* @author <a href="http://debasishg.blogspot.com"</a>
|
||||
*/
|
||||
trait PersistentSortedSet[A] extends Transactional with Committable with Abortable {
|
||||
protected val newElems = TransactionalMap[A, Float]()
|
||||
protected val removedElems = TransactionalVector[A]()
|
||||
//Import Ops
|
||||
import PersistentSortedSet._
|
||||
|
||||
// append only log: records all mutating operations
|
||||
protected val appendOnlyTxLog = TransactionalVector[LogEntry]()
|
||||
|
||||
// need to override in subclasses e.g. "sameElements" for Array[Byte]
|
||||
def equal(v1: A, v2: A): Boolean = v1 == v2
|
||||
|
||||
case class LogEntry(value: A, score: Option[Float], op: Op)
|
||||
|
||||
val storage: SortedSetStorageBackend[A]
|
||||
|
||||
def commit = {
|
||||
for ((element, score) <- newElems) storage.zadd(uuid, String.valueOf(score), element)
|
||||
for (element <- removedElems) storage.zrem(uuid, element)
|
||||
newElems.clear
|
||||
removedElems.clear
|
||||
for (entry <- appendOnlyTxLog) {
|
||||
(entry: @unchecked) match {
|
||||
case LogEntry(e, Some(s), ADD) => storage.zadd(uuid, String.valueOf(s), e)
|
||||
case LogEntry(e, _, REM) => storage.zrem(uuid, e)
|
||||
}
|
||||
}
|
||||
appendOnlyTxLog.clear
|
||||
}
|
||||
|
||||
def abort = {
|
||||
newElems.clear
|
||||
removedElems.clear
|
||||
appendOnlyTxLog.clear
|
||||
}
|
||||
|
||||
def +(elem: A, score: Float) = add(elem, score)
|
||||
|
||||
def add(elem: A, score: Float) = {
|
||||
register
|
||||
newElems.put(elem, score)
|
||||
appendOnlyTxLog.add(LogEntry(elem, Some(score), ADD))
|
||||
}
|
||||
|
||||
def -(elem: A) = remove(elem)
|
||||
|
||||
def remove(elem: A) = {
|
||||
register
|
||||
removedElems.add(elem)
|
||||
}
|
||||
|
||||
private def inStorage(elem: A): Option[Float] = storage.zscore(uuid, elem) match {
|
||||
case Some(s) => Some(s.toFloat)
|
||||
case None => None
|
||||
}
|
||||
|
||||
def contains(elem: A): Boolean = {
|
||||
if (newElems contains elem) true
|
||||
else {
|
||||
inStorage(elem) match {
|
||||
case Some(f) => true
|
||||
case None => false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def size: Int = newElems.size + storage.zcard(uuid) - removedElems.size
|
||||
|
||||
def zscore(elem: A): Float = {
|
||||
if (newElems contains elem) newElems.get(elem).get
|
||||
inStorage(elem) match {
|
||||
case Some(f) => f
|
||||
case None =>
|
||||
throw new NoSuchElementException(elem + " not present")
|
||||
}
|
||||
appendOnlyTxLog.add(LogEntry(elem, None, REM))
|
||||
}
|
||||
|
||||
implicit def order(x: (A, Float)) = new Ordered[(A, Float)] {
|
||||
|
|
@ -799,11 +790,27 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
|
|||
def compare(x: (A, Float), y: (A, Float)) = x._2 compare y._2
|
||||
}
|
||||
|
||||
protected def replay: List[(A, Float)] = {
|
||||
val es = collection.mutable.Map() ++ storage.zrangeWithScore(uuid, 0, -1)
|
||||
|
||||
for (entry <- appendOnlyTxLog) {
|
||||
(entry: @unchecked) match {
|
||||
case LogEntry(v, Some(s), ADD) => es += ((v, s))
|
||||
case LogEntry(v, _, REM) => es -= v
|
||||
}
|
||||
}
|
||||
es.toList
|
||||
}
|
||||
|
||||
def contains(elem: A): Boolean = replay.map(_._1).contains(elem)
|
||||
|
||||
def size: Int = replay size
|
||||
|
||||
def zscore(elem: A): Float = replay.filter { case (e, s) => equal(e, elem) }.map(_._2).head
|
||||
|
||||
def zrange(start: Int, end: Int): List[(A, Float)] = {
|
||||
// need to operate on the whole range
|
||||
// get all from the underlying storage
|
||||
val fromStore = storage.zrangeWithScore(uuid, 0, -1)
|
||||
val ts = scala.collection.immutable.TreeSet(fromStore: _*) ++ newElems.toList
|
||||
import PersistentSortedSet._
|
||||
val ts = collection.immutable.TreeSet(replay: _*)
|
||||
val l = ts.size
|
||||
|
||||
// -1 means the last element, -2 means the second last
|
||||
|
|
@ -821,3 +828,21 @@ trait PersistentSortedSet[A] extends Transactional with Committable with Abortab
|
|||
transaction.get.get.register(uuid, this)
|
||||
}
|
||||
}
|
||||
|
||||
trait PersistentSortedSetBinary extends PersistentSortedSet[Array[Byte]] {
|
||||
import PersistentSortedSet._
|
||||
|
||||
override def equal(k1: Array[Byte], k2: Array[Byte]): Boolean = k1 sameElements k2
|
||||
|
||||
override protected def replay: List[(Array[Byte], Float)] = {
|
||||
val es = collection.mutable.Map() ++ storage.zrangeWithScore(uuid, 0, -1).map { case (k, v) => (ArraySeq(k: _*), v) }
|
||||
|
||||
for (entry <- appendOnlyTxLog) {
|
||||
(entry: @unchecked) match {
|
||||
case LogEntry(v, Some(s), ADD) => es += ((ArraySeq(v: _*), s))
|
||||
case LogEntry(v, _, REM) => es -= ArraySeq(v: _*)
|
||||
}
|
||||
}
|
||||
es.toList.map { case (k, v) => (k.toArray, v) }
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -74,7 +74,7 @@ class RedisPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
|
|||
*
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
class RedisPersistentSortedSet(id: String) extends PersistentSortedSet[Array[Byte]] {
|
||||
class RedisPersistentSortedSet(id: String) extends PersistentSortedSetBinary {
|
||||
val uuid = id
|
||||
val storage = RedisStorageBackend
|
||||
}
|
||||
|
|
|
|||
|
|
@ -72,7 +72,7 @@ class SortedSetActor extends Transactor {
|
|||
hackers.+(h.name.getBytes, h.zscore)
|
||||
}
|
||||
try {
|
||||
r.foreach{ h =>
|
||||
r.foreach { h =>
|
||||
if (hackers.size <= 3)
|
||||
throw new SetThresholdViolationException
|
||||
hackers.-(h.name.getBytes)
|
||||
|
|
@ -184,11 +184,10 @@ class RedisPersistentSortedSetSpec extends
|
|||
val add1 = List(h5, h6)
|
||||
|
||||
// remove 3
|
||||
val rem1 = List(h1, h3, h4)
|
||||
val rem1 = List(h1, h3, h4, h5)
|
||||
try {
|
||||
qa !! MULTI(add1, rem1, failer)
|
||||
} catch { case e: Exception => {}
|
||||
}
|
||||
} catch { case e: RuntimeException => {} }
|
||||
(qa !! SIZE).get.asInstanceOf[Int] should equal(3)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,67 @@
|
|||
package akka.persistence.redis
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.Assertions
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import akka.actor.{Actor, ActorRef, Transactor}
|
||||
import Actor._
|
||||
|
||||
/**
|
||||
* A persistent actor based on Redis sortedset storage.
|
||||
* <p/>
|
||||
* Needs a running Redis server.
|
||||
* @author <a href="http://debasishg.blogspot.com">Debasish Ghosh</a>
|
||||
*/
|
||||
|
||||
case class AddEmail(email: String, value: String)
|
||||
case class GetAll(email: String)
|
||||
|
||||
class MySortedSet extends Transactor {
|
||||
def receive = {
|
||||
case AddEmail(userEmail, value) => {
|
||||
val registryId = "userValues:%s".format(userEmail)
|
||||
val storageSet = RedisStorage.getSortedSet(registryId)
|
||||
storageSet.add(value.getBytes, System.nanoTime.toFloat)
|
||||
self.reply(storageSet.size)
|
||||
}
|
||||
case GetAll(userEmail) => {
|
||||
val registryId = "userValues:%s".format(userEmail)
|
||||
val storageSet = RedisStorage.getSortedSet(registryId)
|
||||
self.reply(storageSet.zrange(0, -1))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
import RedisStorageBackend._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class RedisTicket513Spec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
|
||||
override def beforeAll {
|
||||
flushDB
|
||||
println("** destroyed database")
|
||||
}
|
||||
|
||||
override def afterAll {
|
||||
flushDB
|
||||
println("** destroyed database")
|
||||
}
|
||||
|
||||
describe("insert into user specific set") {
|
||||
val a = actorOf[MySortedSet]
|
||||
a.start
|
||||
it("should work with transactors") {
|
||||
(a !! AddEmail("test.user@gmail.com", "foo")).get should equal(1)
|
||||
Thread.sleep(10)
|
||||
(a !! AddEmail("test.user@gmail.com", "bar")).get should equal(2)
|
||||
(a !! GetAll("test.user@gmail.com")).get.asInstanceOf[List[_]].size should equal(2)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,51 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.persistence.simpledb
|
||||
|
||||
import akka.actor.{newUuid}
|
||||
import akka.stm._
|
||||
import akka.persistence.common._
|
||||
|
||||
|
||||
object SimpledbStorage extends Storage {
|
||||
|
||||
type ElementType = Array[Byte]
|
||||
def newMap: PersistentMap[ElementType, ElementType] = newMap(newUuid.toString)
|
||||
def newVector: PersistentVector[ElementType] = newVector(newUuid.toString)
|
||||
def newRef: PersistentRef[ElementType] = newRef(newUuid.toString)
|
||||
override def newQueue: PersistentQueue[ElementType] = newQueue(newUuid.toString)
|
||||
|
||||
def getMap(id: String): PersistentMap[ElementType, ElementType] = newMap(id)
|
||||
def getVector(id: String): PersistentVector[ElementType] = newVector(id)
|
||||
def getRef(id: String): PersistentRef[ElementType] = newRef(id)
|
||||
override def getQueue(id: String): PersistentQueue[ElementType] = newQueue(id)
|
||||
|
||||
def newMap(id: String): PersistentMap[ElementType, ElementType] = new SimpledbPersistentMap(id)
|
||||
def newVector(id: String): PersistentVector[ElementType] = new SimpledbPersistentVector(id)
|
||||
def newRef(id: String): PersistentRef[ElementType] = new SimpledbPersistentRef(id)
|
||||
override def newQueue(id:String): PersistentQueue[ElementType] = new SimpledbPersistentQueue(id)
|
||||
}
|
||||
|
||||
|
||||
class SimpledbPersistentMap(id: String) extends PersistentMapBinary {
|
||||
val uuid = id
|
||||
val storage = SimpledbStorageBackend
|
||||
}
|
||||
|
||||
|
||||
class SimpledbPersistentVector(id: String) extends PersistentVector[Array[Byte]] {
|
||||
val uuid = id
|
||||
val storage = SimpledbStorageBackend
|
||||
}
|
||||
|
||||
class SimpledbPersistentRef(id: String) extends PersistentRef[Array[Byte]] {
|
||||
val uuid = id
|
||||
val storage = SimpledbStorageBackend
|
||||
}
|
||||
|
||||
class SimpledbPersistentQueue(id: String) extends PersistentQueue[Array[Byte]] {
|
||||
val uuid = id
|
||||
val storage = SimpledbStorageBackend
|
||||
}
|
||||
|
|
@ -0,0 +1,151 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.persistence.simpledb
|
||||
|
||||
import akka.persistence.common._
|
||||
import akka.config.Config.config
|
||||
import java.lang.String
|
||||
import java.util.{List => JList, ArrayList => JAList}
|
||||
|
||||
import collection.immutable.{HashMap, Iterable}
|
||||
import collection.mutable.{HashMap => MMap}
|
||||
|
||||
import com.amazonaws.auth.BasicAWSCredentials
|
||||
import com.amazonaws.services.simpledb.AmazonSimpleDBClient
|
||||
import com.amazonaws.services.simpledb.model._
|
||||
import collection.{JavaConversions, Map}
|
||||
|
||||
private[akka] object SimpledbStorageBackend extends CommonStorageBackend {
|
||||
import org.apache.commons.codec.binary.Base64
|
||||
|
||||
val seperator = "\r\n"
|
||||
val seperatorBytes = seperator.getBytes("UTF-8")
|
||||
val sizeAtt = "size"
|
||||
val base64 = new Base64(1024, seperatorBytes, true)
|
||||
val base64key = new Base64(1024, Array.empty[Byte], true)
|
||||
val id = config.getString("akka.storage.simpledb.account.id", "YOU NEED TO PROVIDE AN AWS ID")
|
||||
val secretKey = config.getString("akka.storage.simpledb.account.secretKey", "YOU NEED TO PROVIDE AN AWS SECRET KEY")
|
||||
val refDomain = config.getString("akka.storage.simpledb.domain.ref", "ref")
|
||||
val mapDomain = config.getString("akka.storage.simpledb.domain.map", "map")
|
||||
val queueDomain = config.getString("akka.storage.simpledb.domain.queue", "queue")
|
||||
val vectorDomain = config.getString("akka.storage.simpledb.domain.vector", "vector")
|
||||
val credentials = new BasicAWSCredentials(id, secretKey);
|
||||
val client = new AmazonSimpleDBClient(credentials)
|
||||
|
||||
def queueAccess = queue
|
||||
|
||||
def mapAccess = map
|
||||
|
||||
def vectorAccess = vector
|
||||
|
||||
def refAccess = ref
|
||||
|
||||
val queue = new SimpledbAccess(queueDomain)
|
||||
|
||||
val map = new SimpledbAccess(mapDomain)
|
||||
|
||||
val vector = new SimpledbAccess(vectorDomain)
|
||||
|
||||
val ref = new SimpledbAccess(refDomain)
|
||||
|
||||
private[akka] class SimpledbAccess(val domainName: String) extends KVStorageBackendAccess {
|
||||
var created = false
|
||||
|
||||
def getClient(): AmazonSimpleDBClient = {
|
||||
if (!created) {
|
||||
client.createDomain(new CreateDomainRequest(domainName))
|
||||
created = true
|
||||
}
|
||||
client
|
||||
}
|
||||
|
||||
|
||||
def drop(): Unit = {
|
||||
created = false
|
||||
client.deleteDomain(new DeleteDomainRequest(domainName))
|
||||
}
|
||||
|
||||
def delete(key: Array[Byte]): Unit = getClient.deleteAttributes(new DeleteAttributesRequest(domainName, encodeAndValidateKey(key)))
|
||||
|
||||
def getAll(keys: Iterable[Array[Byte]]): Map[Array[Byte], Array[Byte]] = {
|
||||
keys.foldLeft(new HashMap[Array[Byte], Array[Byte]]) {
|
||||
(map, key) => {
|
||||
val value = getValue(key)
|
||||
if (value != null) {
|
||||
map + (key -> getValue(key))
|
||||
} else {
|
||||
map
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def getValue(key: Array[Byte], default: Array[Byte]): Array[Byte] = {
|
||||
val req = new GetAttributesRequest(domainName, encodeAndValidateKey(key)).withConsistentRead(true)
|
||||
val resp = getClient.getAttributes(req)
|
||||
recomposeValue(resp.getAttributes) match {
|
||||
case Some(value) => value
|
||||
case None => default
|
||||
}
|
||||
}
|
||||
|
||||
def getValue(key: Array[Byte]): Array[Byte] = getValue(key, null)
|
||||
|
||||
def put(key: Array[Byte], value: Array[Byte]): Unit = {
|
||||
val req = new PutAttributesRequest(domainName, encodeAndValidateKey(key), decomposeValue(value))
|
||||
getClient.putAttributes(req)
|
||||
}
|
||||
|
||||
def encodeAndValidateKey(key: Array[Byte]): String = {
|
||||
val keystr = base64key.encodeToString(key)
|
||||
if (keystr.size > 1024) {
|
||||
throw new IllegalArgumentException("encoded key was longer than 1024 bytes (or 768 bytes unencoded)")
|
||||
}
|
||||
keystr
|
||||
}
|
||||
|
||||
def decomposeValue(value: Array[Byte]): JList[ReplaceableAttribute] = {
|
||||
val encoded = base64.encodeToString(value)
|
||||
val strings = encoded.split(seperator)
|
||||
if (strings.size > 255) {
|
||||
throw new IllegalArgumentException("The decomposed value is larger than 255K (or 195840 bytes unencoded)")
|
||||
}
|
||||
|
||||
val list: JAList[ReplaceableAttribute] = strings.zipWithIndex.foldLeft(new JAList[ReplaceableAttribute]) {
|
||||
(list, zip) => {
|
||||
zip match {
|
||||
case (encode, index) => {
|
||||
list.add(new ReplaceableAttribute(index.toString, encode, true))
|
||||
list
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
list.add(new ReplaceableAttribute(sizeAtt, list.size.toString, true))
|
||||
list
|
||||
}
|
||||
|
||||
def recomposeValue(atts: JList[Attribute]): Option[Array[Byte]] = {
|
||||
val itemSnapshot = JavaConversions.asIterable(atts).foldLeft(new MMap[String, String]) {
|
||||
(map, att) => {
|
||||
map += (att.getName -> att.getValue)
|
||||
}
|
||||
}
|
||||
itemSnapshot.get(sizeAtt) match {
|
||||
case Some(strSize) => {
|
||||
val size = Integer.parseInt(strSize)
|
||||
val encoded = (0 until size).map(_.toString).map(itemSnapshot.get(_).get).reduceLeft[String] {
|
||||
(acc, str) => acc + seperator + str
|
||||
}
|
||||
Some(base64.decode(encoded))
|
||||
}
|
||||
case None => None
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,49 @@
|
|||
package akka.persistence.simpledb
|
||||
|
||||
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import akka.persistence.common.{QueueStorageBackendTest, VectorStorageBackendTest, MapStorageBackendTest, RefStorageBackendTest}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class SimpledbRefStorageBackendTestIntegration extends RefStorageBackendTest {
|
||||
def dropRefs = {
|
||||
SimpledbStorageBackend.refAccess.drop
|
||||
}
|
||||
|
||||
|
||||
def storage = SimpledbStorageBackend
|
||||
}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class SimpledbMapStorageBackendTestIntegration extends MapStorageBackendTest {
|
||||
def dropMaps = {
|
||||
SimpledbStorageBackend.mapAccess.drop
|
||||
}
|
||||
|
||||
|
||||
def storage = SimpledbStorageBackend
|
||||
}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class SimpledbVectorStorageBackendTestIntegration extends VectorStorageBackendTest {
|
||||
def dropVectors = {
|
||||
SimpledbStorageBackend.vectorAccess.drop
|
||||
}
|
||||
|
||||
|
||||
def storage = SimpledbStorageBackend
|
||||
}
|
||||
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class SimpledbQueueStorageBackendTestIntegration extends QueueStorageBackendTest {
|
||||
def dropQueues = {
|
||||
SimpledbStorageBackend.queueAccess.drop
|
||||
}
|
||||
|
||||
|
||||
def storage = SimpledbStorageBackend
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,52 @@
|
|||
package akka.persistence.simpledb
|
||||
|
||||
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.{BeforeAndAfterEach, Spec}
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class SimpledbTestIntegration extends Spec with ShouldMatchers with BeforeAndAfterEach {
|
||||
import SimpledbStorageBackend._
|
||||
|
||||
|
||||
describe("the limitations of the simpledb storage backend") {
|
||||
it("should store up to 255K per item base 64 encoded with a name+key length <= 1024 bytes base64 encoded") {
|
||||
val name = "123456"
|
||||
val keysize: Int = 758
|
||||
log.info("key:" + keysize)
|
||||
val key = new Array[Byte](keysize)
|
||||
val valsize: Int = 195840
|
||||
log.info("value:" + valsize)
|
||||
|
||||
val value = new Array[Byte](valsize)
|
||||
mapAccess.put(name, key, value)
|
||||
val result = mapAccess.getValue(name, key, Array.empty[Byte])
|
||||
result.size should be(value.size)
|
||||
result should be(value)
|
||||
}
|
||||
|
||||
it("should not accept a name+key longer that 1024 bytes base64 encoded") {
|
||||
val name = "fail"
|
||||
val key = new Array[Byte](2048)
|
||||
val value = new Array[Byte](1)
|
||||
evaluating {
|
||||
mapAccess.put(name, key, value)
|
||||
} should produce[IllegalArgumentException]
|
||||
}
|
||||
|
||||
it("should not accept a value larger than 255K base 64 encoded") {
|
||||
val name = "failValue"
|
||||
val key = "failKey".getBytes
|
||||
val value = new Array[Byte](1024 * 512)
|
||||
evaluating {
|
||||
mapAccess.put(name, key, value)
|
||||
} should produce[IllegalArgumentException]
|
||||
}
|
||||
}
|
||||
|
||||
override protected def beforeEach(): Unit = {
|
||||
mapAccess.drop
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,23 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.persistence.simpledb
|
||||
|
||||
|
||||
import org.junit.runner.RunWith
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import akka.persistence.common._
|
||||
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class SimpledbTicket343TestIntegration extends Ticket343Test {
|
||||
def dropMapsAndVectors: Unit = {
|
||||
SimpledbStorageBackend.vectorAccess.drop
|
||||
SimpledbStorageBackend.mapAccess.drop
|
||||
}
|
||||
|
||||
def getVector: (String) => PersistentVector[Array[Byte]] = SimpledbStorage.getVector
|
||||
|
||||
def getMap: (String) => PersistentMap[Array[Byte], Array[Byte]] = SimpledbStorage.getMap
|
||||
|
||||
}
|
||||
|
|
@ -11,6 +11,21 @@ option optimize_for = SPEED;
|
|||
protoc RemoteProtocol.proto --java_out ../java
|
||||
*******************************************/
|
||||
|
||||
/**
|
||||
* Defines a remote message.
|
||||
*/
|
||||
message RemoteMessageProtocol {
|
||||
required UuidProtocol uuid = 1;
|
||||
required ActorInfoProtocol actorInfo = 2;
|
||||
required bool oneWay = 3;
|
||||
optional MessageProtocol message = 4;
|
||||
optional ExceptionProtocol exception = 5;
|
||||
optional UuidProtocol supervisorUuid = 6;
|
||||
optional RemoteActorRefProtocol sender = 7;
|
||||
repeated MetadataEntryProtocol metadata = 8;
|
||||
optional string cookie = 9;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a remote ActorRef that "remembers" and uses its original Actor instance
|
||||
* on the original node.
|
||||
|
|
@ -91,21 +106,6 @@ message TypedActorInfoProtocol {
|
|||
required string method = 2;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a remote message.
|
||||
*/
|
||||
message RemoteMessageProtocol {
|
||||
required UuidProtocol uuid = 1;
|
||||
required ActorInfoProtocol actorInfo = 2;
|
||||
required bool oneWay = 3;
|
||||
optional MessageProtocol message = 4;
|
||||
optional ExceptionProtocol exception = 5;
|
||||
optional UuidProtocol supervisorUuid = 6;
|
||||
optional RemoteActorRefProtocol sender = 7;
|
||||
repeated MetadataEntryProtocol metadata = 8;
|
||||
optional string cookie = 9;
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines a UUID.
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -310,13 +310,11 @@ class RemoteClient private[akka] (
|
|||
connection.getChannel.write(request)
|
||||
None
|
||||
} else {
|
||||
futures.synchronized {
|
||||
val futureResult = if (senderFuture.isDefined) senderFuture.get
|
||||
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
|
||||
futures.put(uuidFrom(request.getUuid.getHigh, request.getUuid.getLow), futureResult)
|
||||
connection.getChannel.write(request)
|
||||
Some(futureResult)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
val exception = new RemoteClientException(
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ import akka.remote.protocol.RemoteProtocol._
|
|||
import akka.remote.protocol.RemoteProtocol.ActorType._
|
||||
import akka.config.Config._
|
||||
import akka.config.ConfigurationException
|
||||
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
|
||||
import akka.serialization.RemoteActorSerialization
|
||||
import akka.serialization.RemoteActorSerialization._
|
||||
|
||||
|
|
@ -31,6 +30,7 @@ import org.jboss.netty.handler.ssl.SslHandler
|
|||
|
||||
import scala.collection.mutable.Map
|
||||
import scala.reflect.BeanProperty
|
||||
import akka.dispatch. {Future, DefaultCompletableFuture, CompletableFuture}
|
||||
|
||||
/**
|
||||
* Use this object if you need a single remote server on a specific node.
|
||||
|
|
@ -66,10 +66,10 @@ object RemoteNode extends RemoteServer
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object RemoteServer {
|
||||
val UUID_PREFIX = "uuid:"
|
||||
|
||||
val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie")
|
||||
val REQUIRE_COOKIE = {
|
||||
val UUID_PREFIX = "uuid:"
|
||||
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576)
|
||||
val SECURE_COOKIE = config.getString("akka.remote.secure-cookie")
|
||||
val REQUIRE_COOKIE = {
|
||||
val requireCookie = config.getBool("akka.remote.server.require-cookie", true)
|
||||
if (requireCookie && RemoteServer.SECURE_COOKIE.isEmpty) throw new ConfigurationException(
|
||||
"Configuration option 'akka.remote.server.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.secure-cookie'.")
|
||||
|
|
@ -400,7 +400,7 @@ class RemoteServerPipelineFactory(
|
|||
}
|
||||
|
||||
val ssl = if(RemoteServer.SECURE) join(new SslHandler(engine)) else join()
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(RemoteServer.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance)
|
||||
val protobufEnc = new ProtobufEncoder
|
||||
|
|
@ -497,7 +497,7 @@ class RemoteServerHandler(
|
|||
}
|
||||
|
||||
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
|
||||
log.debug("Received RemoteMessageProtocol[\n%s]", request.toString)
|
||||
log.debug("Received RemoteMessageProtocol[\n%s]".format(request.toString))
|
||||
request.getActorInfo.getActorType match {
|
||||
case SCALA_ACTOR => dispatchToActor(request, channel)
|
||||
case TYPED_ACTOR => dispatchToTypedActor(request, channel)
|
||||
|
|
@ -538,41 +538,46 @@ class RemoteServerHandler(
|
|||
message,
|
||||
request.getActorInfo.getTimeout,
|
||||
None,
|
||||
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout){
|
||||
override def onComplete(result: AnyRef) {
|
||||
log.debug("Returning result from actor invocation [%s]", result)
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
Some(actorRef),
|
||||
Right(request.getUuid),
|
||||
actorInfo.getId,
|
||||
actorInfo.getTarget,
|
||||
actorInfo.getTimeout,
|
||||
Left(result),
|
||||
true,
|
||||
Some(actorRef),
|
||||
None,
|
||||
AkkaActorType.ScalaActor,
|
||||
None)
|
||||
Some(new DefaultCompletableFuture[AnyRef](request.getActorInfo.getTimeout).
|
||||
onComplete(f => {
|
||||
val result = f.result
|
||||
val exception = f.exception
|
||||
|
||||
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
|
||||
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
if (exception.isDefined) {
|
||||
log.debug("Returning exception from actor invocation [%s]".format(exception.get))
|
||||
try {
|
||||
channel.write(createErrorReplyMessage(exception.get, request, AkkaActorType.ScalaActor))
|
||||
} catch {
|
||||
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
}
|
||||
else if (result.isDefined) {
|
||||
log.debug("Returning result from actor invocation [%s]".format(result.get))
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
Some(actorRef),
|
||||
Right(request.getUuid),
|
||||
actorInfo.getId,
|
||||
actorInfo.getTarget,
|
||||
actorInfo.getTimeout,
|
||||
Left(result.get),
|
||||
true,
|
||||
Some(actorRef),
|
||||
None,
|
||||
AkkaActorType.ScalaActor,
|
||||
None)
|
||||
|
||||
try {
|
||||
channel.write(messageBuilder.build)
|
||||
} catch {
|
||||
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
|
||||
// FIXME lift in the supervisor uuid management into toh createRemoteMessageProtocolBuilder method
|
||||
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
|
||||
|
||||
try {
|
||||
channel.write(messageBuilder.build)
|
||||
} catch {
|
||||
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def onCompleteException(exception: Throwable) {
|
||||
try {
|
||||
channel.write(createErrorReplyMessage(exception, request, AkkaActorType.ScalaActor))
|
||||
} catch {
|
||||
case e: Throwable => server.notifyListeners(RemoteServerError(e, server))
|
||||
}
|
||||
}
|
||||
}
|
||||
))
|
||||
)
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -589,7 +594,10 @@ class RemoteServerHandler(
|
|||
val messageReceiver = typedActor.getClass.getDeclaredMethod(typedActorInfo.getMethod, argClasses: _*)
|
||||
if (request.getOneWay) messageReceiver.invoke(typedActor, args: _*)
|
||||
else {
|
||||
val result = messageReceiver.invoke(typedActor, args: _*)
|
||||
val result = messageReceiver.invoke(typedActor, args: _*) match {
|
||||
case f: Future[_] => f.await.result.get
|
||||
case other => other
|
||||
}
|
||||
log.debug("Returning result from remote typed actor invocation [%s]", result)
|
||||
|
||||
val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder(
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ class RemoteTypedActorSpec extends
|
|||
}
|
||||
|
||||
describe("Remote Typed Actor ") {
|
||||
/*
|
||||
|
||||
it("should receive one-way message") {
|
||||
clearMessageLogs
|
||||
val ta = conf.getInstance(classOf[RemoteTypedActorOne])
|
||||
|
|
@ -102,7 +102,7 @@ class RemoteTypedActorSpec extends
|
|||
ta.requestReply("ping")
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
it("should be restarted on failure") {
|
||||
clearMessageLogs
|
||||
val ta = conf.getInstance(classOf[RemoteTypedActorOne])
|
||||
|
|
@ -112,7 +112,7 @@ class RemoteTypedActorSpec extends
|
|||
}
|
||||
messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
/*
|
||||
|
||||
it("should restart linked friends on failure") {
|
||||
clearMessageLogs
|
||||
val ta1 = conf.getInstance(classOf[RemoteTypedActorOne])
|
||||
|
|
@ -124,5 +124,5 @@ class RemoteTypedActorSpec extends
|
|||
messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance")
|
||||
messageLog.poll(5, TimeUnit.SECONDS) should equal ("Expected exception; to test fault-tolerance")
|
||||
}
|
||||
*/ }
|
||||
}
|
||||
}
|
||||
|
|
|
|||
30
akka-remote/src/test/scala/ticket/Ticket519Spec.scala
Normal file
30
akka-remote/src/test/scala/ticket/Ticket519Spec.scala
Normal file
|
|
@ -0,0 +1,30 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
package akka.actor.ticket
|
||||
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import akka.remote.{RemoteClient, RemoteServer}
|
||||
import akka.actor._
|
||||
|
||||
|
||||
class Ticket519Spec extends Spec with ShouldMatchers {
|
||||
|
||||
val HOSTNAME = "localhost"
|
||||
val PORT = 6666
|
||||
|
||||
describe("A remote TypedActor") {
|
||||
it("should handle remote future replies") {
|
||||
import akka.remote._
|
||||
|
||||
val server = { val s = new RemoteServer; s.start(HOSTNAME,PORT); s}
|
||||
val actor = TypedActor.newRemoteInstance(classOf[SamplePojo], classOf[SamplePojoImpl],7000,HOSTNAME,PORT)
|
||||
val r = actor.someFutureString
|
||||
|
||||
r.await.result.get should equal ("foo")
|
||||
TypedActor.stop(actor)
|
||||
server.shutdown
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -145,13 +145,20 @@
|
|||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="implementation" type="xsd:string" use="required">
|
||||
<xsd:attribute name="implementation" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Name of the implementation class.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="ref" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Bean instance behind the actor
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="timeout" type="xsd:string" use="required">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
|
@ -191,13 +198,20 @@
|
|||
<xsd:element ref="beans:property" minOccurs="0" maxOccurs="unbounded"/>
|
||||
</xsd:sequence>
|
||||
<xsd:attribute name="id" type="xsd:ID"/>
|
||||
<xsd:attribute name="implementation" type="xsd:string" use="required">
|
||||
<xsd:attribute name="implementation" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Name of the implementation class.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="ref" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Bean instance behind the actor
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="timeout" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
|
|||
@BeanProperty var typed: String = ""
|
||||
@BeanProperty var interface: String = ""
|
||||
@BeanProperty var implementation: String = ""
|
||||
@BeanProperty var beanRef: String = null
|
||||
@BeanProperty var timeoutStr: String = ""
|
||||
@BeanProperty var transactional: Boolean = false
|
||||
@BeanProperty var host: String = ""
|
||||
|
|
@ -102,10 +103,18 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
|
|||
private[akka] def createTypedInstance() : AnyRef = {
|
||||
if ((interface eq null) || interface == "") throw new AkkaBeansException(
|
||||
"The 'interface' part of the 'akka:actor' element in the Spring config file can't be null or empty string")
|
||||
if ((implementation eq null) || implementation == "") throw new AkkaBeansException(
|
||||
"The 'implementation' part of the 'akka:typed-actor' element in the Spring config file can't be null or empty string")
|
||||
if (((implementation eq null) || implementation == "") && (beanRef eq null)) throw new AkkaBeansException(
|
||||
"Either 'implementation' or 'ref' must be specified as attribute of the 'akka:typed-actor' element in the Spring config file ")
|
||||
|
||||
val typedActor: AnyRef = if (beanRef eq null ) {
|
||||
TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig)
|
||||
}
|
||||
else
|
||||
{
|
||||
TypedActor.newInstance(interface.toClass, getBeanFactory().getBean(beanRef), createConfig)
|
||||
}
|
||||
|
||||
|
||||
val typedActor: AnyRef = TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig)
|
||||
if (isRemote && serverManaged) {
|
||||
val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port.toInt))
|
||||
if (serviceName.isEmpty) {
|
||||
|
|
@ -121,9 +130,13 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
|
|||
* Create an UntypedActor.
|
||||
*/
|
||||
private[akka] def createUntypedInstance() : ActorRef = {
|
||||
if ((implementation eq null) || implementation == "") throw new AkkaBeansException(
|
||||
"The 'implementation' part of the 'akka:untyped-actor' element in the Spring config file can't be null or empty string")
|
||||
val actorRef = Actor.actorOf(implementation.toClass)
|
||||
if (((implementation eq null) || implementation == "") && (beanRef eq null)) throw new AkkaBeansException(
|
||||
"Either 'implementation' or 'ref' must be specified as attribute of the 'akka:untyped-actor' element in the Spring config file ")
|
||||
val actorRef = if (beanRef eq null )
|
||||
Actor.actorOf(implementation.toClass)
|
||||
else
|
||||
Actor.actorOf(getBeanFactory().getBean(beanRef).asInstanceOf[Actor])
|
||||
|
||||
if (timeout > 0) {
|
||||
actorRef.setTimeout(timeout)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -53,7 +53,14 @@ trait ActorParser extends BeanParser with DispatcherParser {
|
|||
}
|
||||
|
||||
objectProperties.timeoutStr = element.getAttribute(TIMEOUT)
|
||||
objectProperties.target = mandatory(element, IMPLEMENTATION)
|
||||
objectProperties.target = if (element.getAttribute(IMPLEMENTATION).isEmpty) null else element.getAttribute(IMPLEMENTATION)
|
||||
objectProperties.beanRef = if (element.getAttribute(BEANREF).isEmpty) null else element.getAttribute(BEANREF)
|
||||
|
||||
if (objectProperties.target == null && objectProperties.beanRef == null) {
|
||||
throw new IllegalArgumentException("Mandatory attribute missing, you need to provide either implementation or ref ")
|
||||
}
|
||||
|
||||
|
||||
objectProperties.transactional = if (element.getAttribute(TRANSACTIONAL).isEmpty) false else element.getAttribute(TRANSACTIONAL).toBoolean
|
||||
|
||||
if (element.hasAttribute(INTERFACE)) {
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ import AkkaSpringConfigurationTags._
|
|||
class ActorProperties {
|
||||
var typed: String = ""
|
||||
var target: String = ""
|
||||
var beanRef: String = ""
|
||||
var timeoutStr: String = ""
|
||||
var interface: String = ""
|
||||
var transactional: Boolean = false
|
||||
|
|
@ -40,6 +41,7 @@ class ActorProperties {
|
|||
builder.addPropertyValue("serviceName", serviceName)
|
||||
builder.addPropertyValue("timeoutStr", timeoutStr)
|
||||
builder.addPropertyValue(IMPLEMENTATION, target)
|
||||
builder.addPropertyValue("beanRef", beanRef)
|
||||
builder.addPropertyValue(INTERFACE, interface)
|
||||
builder.addPropertyValue(TRANSACTIONAL, transactional)
|
||||
builder.addPropertyValue(LIFECYCLE, lifecycle)
|
||||
|
|
|
|||
|
|
@ -43,6 +43,7 @@ object AkkaSpringConfigurationTags {
|
|||
// actor attributes
|
||||
val TIMEOUT = "timeout"
|
||||
val IMPLEMENTATION = "implementation"
|
||||
val BEANREF = "ref"
|
||||
val INTERFACE = "interface"
|
||||
val TRANSACTIONAL = "transactional"
|
||||
val HOST = "host"
|
||||
|
|
|
|||
|
|
@ -14,6 +14,14 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd">
|
|||
implementation="akka.spring.foo.MyPojo"
|
||||
timeout="1000"/>
|
||||
|
||||
<bean id="myPojoBean" class="akka.spring.foo.MyPojo" scope="prototype"/>
|
||||
|
||||
<akka:typed-actor id="simple-typed-actor-of-bean"
|
||||
interface="akka.spring.foo.IMyPojo"
|
||||
ref="myPojoBean"
|
||||
timeout="1000"/>
|
||||
|
||||
|
||||
<akka:typed-actor id="simple-typed-actor-long-timeout"
|
||||
interface="akka.spring.foo.IMyPojo"
|
||||
implementation="akka.spring.foo.MyPojo"
|
||||
|
|
|
|||
|
|
@ -12,6 +12,11 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd">
|
|||
<akka:untyped-actor id="simple-untyped-actor"
|
||||
implementation="akka.spring.foo.PingActor"/>
|
||||
|
||||
<bean id="pingActorBean" class="akka.spring.foo.PingActor" scope="prototype"/>
|
||||
|
||||
<akka:untyped-actor id="simple-untyped-actor-of-bean"
|
||||
ref="pingActorBean"/>
|
||||
|
||||
<akka:untyped-actor id="simple-untyped-actor-long-timeout"
|
||||
implementation="akka.spring.foo.PingActor"
|
||||
timeout="10000"/>
|
||||
|
|
|
|||
|
|
@ -81,6 +81,14 @@ class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with B
|
|||
assert(MyPojo.lastOneWayMessage === "hello 1")
|
||||
}
|
||||
|
||||
scenario("get a typed actor of bean") {
|
||||
val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor-of-bean")
|
||||
assert(myPojo.getFoo() === "foo")
|
||||
myPojo.oneWay("hello 1")
|
||||
MyPojo.latch.await
|
||||
assert(MyPojo.lastOneWayMessage === "hello 1")
|
||||
}
|
||||
|
||||
scenario("FutureTimeoutException when timed out") {
|
||||
val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor")
|
||||
evaluating {myPojo.longRunning()} should produce[FutureTimeoutException]
|
||||
|
|
|
|||
|
|
@ -67,6 +67,14 @@ class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with
|
|||
assert(myactor.isDefinedAt("some string message"))
|
||||
}
|
||||
|
||||
scenario("untyped-actor of provided bean") {
|
||||
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor-of-bean")
|
||||
myactor.sendOneWay("Hello")
|
||||
PingActor.latch.await
|
||||
assert(PingActor.lastMessage === "Hello")
|
||||
assert(myactor.isDefinedAt("some string message"))
|
||||
}
|
||||
|
||||
scenario("untyped-actor with timeout") {
|
||||
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor-long-timeout")
|
||||
assert(myactor.getTimeout() === 10000)
|
||||
|
|
|
|||
|
|
@ -181,7 +181,6 @@ abstract class TypedActor extends Actor with Proxyable {
|
|||
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
|
||||
if (TypedActor.isOneWay(joinPoint)) joinPoint.proceed
|
||||
else self.reply(joinPoint.proceed)
|
||||
|
||||
case Link(proxy) => self.link(proxy)
|
||||
case Unlink(proxy) => self.unlink(proxy)
|
||||
case unexpected => throw new IllegalActorStateException(
|
||||
|
|
@ -851,6 +850,7 @@ private[akka] abstract class ActorAspect {
|
|||
ActorType.TypedActor)
|
||||
|
||||
if (isOneWay) null // for void methods
|
||||
else if (TypedActor.returnsFuture_?(methodRtti)) future.get
|
||||
else {
|
||||
if (future.isDefined) {
|
||||
future.get.await
|
||||
|
|
|
|||
|
|
@ -1,8 +1,9 @@
|
|||
package akka.actor;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import akka.dispatch.Future;
|
||||
|
||||
public interface SamplePojo {
|
||||
public String greet(String s);
|
||||
public String fail();
|
||||
public Future<String> someFutureString();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,10 @@ public class SamplePojoImpl extends TypedActor implements SamplePojo {
|
|||
throw new RuntimeException("expected");
|
||||
}
|
||||
|
||||
public akka.dispatch.Future<String> someFutureString() {
|
||||
return future("foo");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preRestart(Throwable e) {
|
||||
_pre = true;
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ akka {
|
|||
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
|
||||
throughput = 5 # Default throughput for all ExecutorBasedEventDrivenDispatcher, set to 1 for complete fairness
|
||||
throughput-deadline-time = -1 # Default throughput deadline for all ExecutorBasedEventDrivenDispatcher, set to 0 or negative for no deadline
|
||||
dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down
|
||||
|
||||
default-dispatcher {
|
||||
type = "GlobalExecutorBasedEventDriven" # Must be one of the following, all "Global*" are non-configurable
|
||||
|
|
@ -135,6 +136,7 @@ akka {
|
|||
service = on
|
||||
hostname = "localhost" # The hostname or IP that clients should connect to
|
||||
port = 2552 # The port clients should connect to
|
||||
message-frame-size = 1048576
|
||||
connection-timeout = 1
|
||||
require-cookie = on
|
||||
untrusted-mode = off
|
||||
|
|
|
|||
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
|
@ -1,4 +1,4 @@
|
|||
project.organization=akka
|
||||
project.organization=se.scalablesolutions.akka
|
||||
project.name=akka
|
||||
project.version=1.0-SNAPSHOT
|
||||
scala.version=2.8.0
|
||||
|
|
|
|||
|
|
@ -140,149 +140,150 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
object Dependencies {
|
||||
|
||||
// Compile
|
||||
lazy val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile"
|
||||
lazy val commonsHttpClient = "commons-httpclient" % "commons-httpclient" % "3.1" % "compile" //ApacheV2
|
||||
|
||||
lazy val annotation = "javax.annotation" % "jsr250-api" % "1.0" % "compile"
|
||||
lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile" //Public domain
|
||||
|
||||
lazy val aopalliance = "aopalliance" % "aopalliance" % "1.0" % "compile"
|
||||
lazy val atmo = "org.atmosphere" % "atmosphere-annotations" % ATMO_VERSION % "compile" //CDDL v1
|
||||
lazy val atmo_jbossweb = "org.atmosphere" % "atmosphere-compat-jbossweb" % ATMO_VERSION % "compile" //CDDL v1
|
||||
lazy val atmo_jersey = "org.atmosphere" % "atmosphere-jersey" % ATMO_VERSION % "compile" //CDDL v1
|
||||
lazy val atmo_runtime = "org.atmosphere" % "atmosphere-runtime" % ATMO_VERSION % "compile" //CDDL v1
|
||||
lazy val atmo_tomcat = "org.atmosphere" % "atmosphere-compat-tomcat" % ATMO_VERSION % "compile" //CDDL v1
|
||||
lazy val atmo_weblogic = "org.atmosphere" % "atmosphere-compat-weblogic" % ATMO_VERSION % "compile" //CDDL v1
|
||||
|
||||
lazy val atmo = "org.atmosphere" % "atmosphere-annotations" % ATMO_VERSION % "compile"
|
||||
lazy val atmo_jbossweb = "org.atmosphere" % "atmosphere-compat-jbossweb" % ATMO_VERSION % "compile"
|
||||
lazy val atmo_jersey = "org.atmosphere" % "atmosphere-jersey" % ATMO_VERSION % "compile"
|
||||
lazy val atmo_runtime = "org.atmosphere" % "atmosphere-runtime" % ATMO_VERSION % "compile"
|
||||
lazy val atmo_tomcat = "org.atmosphere" % "atmosphere-compat-tomcat" % ATMO_VERSION % "compile"
|
||||
lazy val atmo_weblogic = "org.atmosphere" % "atmosphere-compat-weblogic" % ATMO_VERSION % "compile"
|
||||
lazy val atomikos_transactions = "com.atomikos" % "transactions" % "3.2.3" % "compile" //ApacheV2
|
||||
lazy val atomikos_transactions_api = "com.atomikos" % "transactions-api" % "3.2.3" % "compile" //ApacheV2
|
||||
lazy val atomikos_transactions_jta = "com.atomikos" % "transactions-jta" % "3.2.3" % "compile" //ApacheV2
|
||||
|
||||
lazy val atomikos_transactions = "com.atomikos" % "transactions" % "3.2.3" % "compile"
|
||||
lazy val atomikos_transactions_api = "com.atomikos" % "transactions-api" % "3.2.3" % "compile"
|
||||
lazy val atomikos_transactions_jta = "com.atomikos" % "transactions-jta" % "3.2.3" % "compile"
|
||||
lazy val camel_core = "org.apache.camel" % "camel-core" % CAMEL_VERSION % "compile" //ApacheV2
|
||||
|
||||
lazy val camel_core = "org.apache.camel" % "camel-core" % CAMEL_VERSION % "compile"
|
||||
lazy val cassandra = "org.apache.cassandra" % "cassandra" % CASSANDRA_VERSION % "compile" //ApacheV2
|
||||
|
||||
lazy val cassandra = "org.apache.cassandra" % "cassandra" % CASSANDRA_VERSION % "compile"
|
||||
lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile" //ApacheV2
|
||||
|
||||
lazy val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile"
|
||||
lazy val commons_io = "commons-io" % "commons-io" % "1.4" % "compile" //ApacheV2
|
||||
|
||||
lazy val commons_io = "commons-io" % "commons-io" % "1.4" % "compile"
|
||||
lazy val commons_pool = "commons-pool" % "commons-pool" % "1.5.4" % "compile" //ApacheV2
|
||||
|
||||
lazy val commons_pool = "commons-pool" % "commons-pool" % "1.5.4" % "compile"
|
||||
lazy val configgy = "net.lag" % "configgy" % "2.8.0-1.5.5" % "compile" //ApacheV2
|
||||
|
||||
lazy val configgy = "net.lag" % "configgy" % "2.8.0-1.5.5" % "compile"
|
||||
lazy val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2
|
||||
lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile" //LGPL v2
|
||||
|
||||
lazy val dispatch_http = "net.databinder" % "dispatch-http_2.8.0" % DISPATCH_VERSION % "compile"
|
||||
lazy val dispatch_json = "net.databinder" % "dispatch-json_2.8.0" % DISPATCH_VERSION % "compile"
|
||||
lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile" //Eclipse license
|
||||
lazy val jetty_util = "org.eclipse.jetty" % "jetty-util" % JETTY_VERSION % "compile" //Eclipse license
|
||||
lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile" //Eclipse license
|
||||
lazy val jetty_servlet = "org.eclipse.jetty" % "jetty-servlet" % JETTY_VERSION % "compile" //Eclipse license
|
||||
|
||||
lazy val jetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "compile"
|
||||
lazy val jetty_util = "org.eclipse.jetty" % "jetty-util" % JETTY_VERSION % "compile"
|
||||
lazy val jetty_xml = "org.eclipse.jetty" % "jetty-xml" % JETTY_VERSION % "compile"
|
||||
lazy val jetty_servlet = "org.eclipse.jetty" % "jetty-servlet" % JETTY_VERSION % "compile"
|
||||
lazy val uuid = "com.eaio" % "uuid" % "3.2" % "compile" //MIT license
|
||||
|
||||
lazy val uuid = "com.eaio" % "uuid" % "3.2" % "compile"
|
||||
lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile" //ApacheV2
|
||||
|
||||
lazy val guicey = "org.guiceyfruit" % "guice-all" % "2.0" % "compile"
|
||||
lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile" //ApacheV2
|
||||
|
||||
lazy val h2_lzf = "voldemort.store.compress" % "h2-lzf" % "1.0" % "compile"
|
||||
lazy val hawtdispatch = "org.fusesource.hawtdispatch" % "hawtdispatch-scala" % HAWT_DISPATCH_VERSION % "compile" //ApacheV2
|
||||
|
||||
lazy val hawtdispatch = "org.fusesource.hawtdispatch" % "hawtdispatch-scala" % HAWT_DISPATCH_VERSION % "compile"
|
||||
lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile" //ApacheV2
|
||||
lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile" //ApacheV2
|
||||
|
||||
lazy val jackson = "org.codehaus.jackson" % "jackson-mapper-asl" % JACKSON_VERSION % "compile"
|
||||
lazy val jackson_core = "org.codehaus.jackson" % "jackson-core-asl" % JACKSON_VERSION % "compile"
|
||||
lazy val jersey = "com.sun.jersey" % "jersey-core" % JERSEY_VERSION % "compile" //CDDL v1
|
||||
lazy val jersey_json = "com.sun.jersey" % "jersey-json" % JERSEY_VERSION % "compile" //CDDL v1
|
||||
lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "compile" //CDDL v1
|
||||
lazy val jersey_contrib = "com.sun.jersey.contribs" % "jersey-scala" % JERSEY_VERSION % "compile" //CDDL v1
|
||||
|
||||
lazy val jersey = "com.sun.jersey" % "jersey-core" % JERSEY_VERSION % "compile"
|
||||
lazy val jersey_json = "com.sun.jersey" % "jersey-json" % JERSEY_VERSION % "compile"
|
||||
lazy val jersey_server = "com.sun.jersey" % "jersey-server" % JERSEY_VERSION % "compile"
|
||||
lazy val jersey_contrib = "com.sun.jersey.contribs" % "jersey-scala" % JERSEY_VERSION % "compile"
|
||||
lazy val jgroups = "jgroups" % "jgroups" % "2.9.0.GA" % "compile" //LGPL 2.1
|
||||
|
||||
lazy val jgroups = "jgroups" % "jgroups" % "2.9.0.GA" % "compile"
|
||||
lazy val jsr166x = "jsr166x" % "jsr166x" % "1.0" % "compile" //CC Public Domain
|
||||
|
||||
lazy val jsr166x = "jsr166x" % "jsr166x" % "1.0" % "compile"
|
||||
lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile" //CDDL v1
|
||||
|
||||
lazy val jsr250 = "javax.annotation" % "jsr250-api" % "1.0" % "compile"
|
||||
lazy val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile" //CDDL v1
|
||||
|
||||
lazy val jsr311 = "javax.ws.rs" % "jsr311-api" % "1.1" % "compile"
|
||||
lazy val jta_1_1 = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile" intransitive //ApacheV2
|
||||
|
||||
lazy val jta_1_1 = "org.apache.geronimo.specs" % "geronimo-jta_1.1_spec" % "1.1.1" % "compile" intransitive
|
||||
lazy val mongo = "org.mongodb" % "mongo-java-driver" % "2.0" % "compile" //ApacheV2
|
||||
|
||||
lazy val mongo = "org.mongodb" % "mongo-java-driver" % "2.0" % "compile"
|
||||
lazy val casbah = "com.novus" % "casbah_2.8.0" % "1.0.8.5" % "compile" //ApacheV2
|
||||
|
||||
lazy val casbah = "com.novus" % "casbah_2.8.0" % "1.0.8.5" % "compile"
|
||||
lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" intransitive //ApacheV2
|
||||
|
||||
lazy val multiverse = "org.multiverse" % "multiverse-alpha" % MULTIVERSE_VERSION % "compile" intransitive
|
||||
lazy val netty = "org.jboss.netty" % "netty" % "3.2.3.Final" % "compile" //ApacheV2
|
||||
|
||||
lazy val netty = "org.jboss.netty" % "netty" % "3.2.3.Final" % "compile"
|
||||
lazy val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile" //New BSD
|
||||
|
||||
lazy val protobuf = "com.google.protobuf" % "protobuf-java" % "2.3.0" % "compile"
|
||||
lazy val osgi_core = "org.osgi" % "org.osgi.core" % "4.2.0" //ApacheV2
|
||||
|
||||
lazy val osgi_core = "org.osgi" % "org.osgi.core" % "4.2.0"
|
||||
lazy val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.1" % "compile" //Mozilla public license
|
||||
|
||||
lazy val rabbit = "com.rabbitmq" % "amqp-client" % "1.8.1" % "compile"
|
||||
lazy val redis = "com.redis" % "redisclient" % "2.8.0-2.0.3" % "compile" //ApacheV2
|
||||
|
||||
lazy val redis = "com.redis" % "redisclient" % "2.8.0-2.0.3" % "compile"
|
||||
lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile" //MIT
|
||||
|
||||
lazy val sbinary = "sbinary" % "sbinary" % "2.8.0-0.3.1" % "compile"
|
||||
lazy val sjson = "sjson.json" % "sjson" % "0.8-2.8.0" % "compile" //ApacheV2
|
||||
lazy val sjson_test = "sjson.json" % "sjson" % "0.8-2.8.0" % "test" //ApacheV2
|
||||
|
||||
lazy val sjson = "sjson.json" % "sjson" % "0.8-2.8.0" % "compile"
|
||||
lazy val sjson_test = "sjson.json" % "sjson" % "0.8-2.8.0" % "test"
|
||||
lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION % "compile" //MIT
|
||||
|
||||
lazy val slf4j = "org.slf4j" % "slf4j-api" % SLF4J_VERSION % "compile"
|
||||
lazy val logback = "ch.qos.logback" % "logback-classic" % LOGBACK_VERSION % "compile" //LGPL 2.1
|
||||
lazy val logback_core = "ch.qos.logback" % "logback-core" % LOGBACK_VERSION % "compile" //LGPL 2.1
|
||||
|
||||
lazy val logback = "ch.qos.logback" % "logback-classic" % LOGBACK_VERSION % "compile"
|
||||
lazy val logback_core = "ch.qos.logback" % "logback-core" % LOGBACK_VERSION % "compile"
|
||||
lazy val spring_beans = "org.springframework" % "spring-beans" % SPRING_VERSION % "compile" //ApacheV2
|
||||
lazy val spring_context = "org.springframework" % "spring-context" % SPRING_VERSION % "compile" //ApacheV2
|
||||
|
||||
lazy val spring_beans = "org.springframework" % "spring-beans" % SPRING_VERSION % "compile"
|
||||
lazy val spring_context = "org.springframework" % "spring-context" % SPRING_VERSION % "compile"
|
||||
lazy val stax_api = "javax.xml.stream" % "stax-api" % "1.0-2" % "compile" //ApacheV2
|
||||
|
||||
lazy val stax_api = "javax.xml.stream" % "stax-api" % "1.0-2" % "compile"
|
||||
lazy val thrift = "com.facebook" % "thrift" % "r917130" % "compile" //ApacheV2
|
||||
|
||||
lazy val thrift = "com.facebook" % "thrift" % "r917130" % "compile"
|
||||
lazy val voldemort = "voldemort" % "voldemort" % "0.81" % "compile" //ApacheV2
|
||||
lazy val voldemort_contrib = "voldemort" % "voldemort-contrib" % "0.81" % "compile" //ApacheV2
|
||||
lazy val voldemort_needs_log4j = "org.slf4j" % "log4j-over-slf4j" % SLF4J_VERSION % "compile" //MIT
|
||||
|
||||
lazy val voldemort = "voldemort" % "voldemort" % "0.81" % "compile"
|
||||
lazy val voldemort_contrib = "voldemort" % "voldemort-contrib" % "0.81" % "compile"
|
||||
lazy val voldemort_needs_log4j = "org.slf4j" % "log4j-over-slf4j" % SLF4J_VERSION % "compile"
|
||||
lazy val werkz = "org.codehaus.aspectwerkz" % "aspectwerkz-nodeps-jdk5" % ASPECTWERKZ_VERSION % "compile" //LGPL 2.1
|
||||
lazy val werkz_core = "org.codehaus.aspectwerkz" % "aspectwerkz-jdk5" % ASPECTWERKZ_VERSION % "compile" //LGPL 2.1
|
||||
|
||||
lazy val werkz = "org.codehaus.aspectwerkz" % "aspectwerkz-nodeps-jdk5" % ASPECTWERKZ_VERSION % "compile"
|
||||
lazy val werkz_core = "org.codehaus.aspectwerkz" % "aspectwerkz-jdk5" % ASPECTWERKZ_VERSION % "compile"
|
||||
lazy val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % "3.2.2" % "compile" //ApacheV2
|
||||
|
||||
lazy val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % "3.2.2" % "compile"
|
||||
lazy val hadoop_core = "org.apache.hadoop" % "hadoop-core" % "0.20.2" % "compile" //ApacheV2
|
||||
|
||||
lazy val hadoop_core = "org.apache.hadoop" % "hadoop-core" % "0.20.2" % "compile"
|
||||
lazy val hbase_core = "org.apache.hbase" % "hbase-core" % "0.20.6" % "compile" //ApacheV2
|
||||
|
||||
lazy val hbase_core = "org.apache.hbase" % "hbase-core" % "0.20.6" % "compile"
|
||||
|
||||
lazy val google_coll = "com.google.collections" % "google-collections" % "1.0" % "compile"
|
||||
lazy val google_coll = "com.google.collections" % "google-collections" % "1.0" % "compile" //ApacheV2
|
||||
|
||||
//Riak PB Client
|
||||
lazy val riak_pb_client = "com.trifork" % "riak-java-pb-client" % "1.0-for-akka-by-ticktock" % "compile"
|
||||
lazy val riak_pb_client = "com.trifork" % "riak-java-pb-client" % "1.0-for-akka-by-ticktock" % "compile" //ApacheV2
|
||||
|
||||
// Test
|
||||
|
||||
lazy val camel_spring = "org.apache.camel" % "camel-spring" % CAMEL_VERSION % "test"
|
||||
lazy val cassandra_clhm = "org.apache.cassandra" % "clhm-production" % CASSANDRA_VERSION % "test"
|
||||
lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test"
|
||||
lazy val camel_spring = "org.apache.camel" % "camel-spring" % CAMEL_VERSION % "test" //ApacheV2
|
||||
lazy val cassandra_clhm = "org.apache.cassandra" % "clhm-production" % CASSANDRA_VERSION % "test" //ApacheV2
|
||||
lazy val commons_coll = "commons-collections" % "commons-collections" % "3.2.1" % "test" //ApacheV2
|
||||
|
||||
lazy val high_scale = "org.apache.cassandra" % "high-scale-lib" % CASSANDRA_VERSION % "test"
|
||||
lazy val testJetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "test"
|
||||
lazy val testJettyWebApp= "org.eclipse.jetty" % "jetty-webapp" % JETTY_VERSION % "test"
|
||||
lazy val high_scale = "org.apache.cassandra" % "high-scale-lib" % CASSANDRA_VERSION % "test" //ApacheV2
|
||||
lazy val testJetty = "org.eclipse.jetty" % "jetty-server" % JETTY_VERSION % "test" //Eclipse license
|
||||
lazy val testJettyWebApp= "org.eclipse.jetty" % "jetty-webapp" % JETTY_VERSION % "test" //Eclipse license
|
||||
|
||||
lazy val junit = "junit" % "junit" % "4.5" % "test"
|
||||
lazy val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test"
|
||||
lazy val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test"
|
||||
lazy val specs = "org.scala-tools.testing" %% "specs" % "1.6.5" % "test"
|
||||
lazy val junit = "junit" % "junit" % "4.5" % "test" //Common Public License 1.0
|
||||
lazy val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" //MIT
|
||||
lazy val scalatest = "org.scalatest" % "scalatest" % SCALATEST_VERSION % "test" //ApacheV2
|
||||
lazy val specs = "org.scala-tools.testing" %% "specs" % "1.6.5" % "test" //MIT
|
||||
|
||||
//HBase testing
|
||||
lazy val hadoop_test = "org.apache.hadoop" % "hadoop-test" % "0.20.2" % "test"
|
||||
lazy val hbase_test = "org.apache.hbase" % "hbase-test" % "0.20.6" % "test"
|
||||
lazy val log4j = "log4j" % "log4j" % "1.2.15" % "test"
|
||||
lazy val jetty_mortbay = "org.mortbay.jetty" % "jetty" % "6.1.14" % "test"
|
||||
lazy val hadoop_test = "org.apache.hadoop" % "hadoop-test" % "0.20.2" % "test" //ApacheV2
|
||||
lazy val hbase_test = "org.apache.hbase" % "hbase-test" % "0.20.6" % "test" //ApacheV2
|
||||
lazy val log4j = "log4j" % "log4j" % "1.2.15" % "test" //ApacheV2
|
||||
lazy val jetty_mortbay = "org.mortbay.jetty" % "jetty" % "6.1.14" % "test" //Eclipse license
|
||||
|
||||
//voldemort testing
|
||||
lazy val jdom = "org.jdom" % "jdom" % "1.1" % "test"
|
||||
lazy val vold_jetty = "org.mortbay.jetty" % "jetty" % "6.1.18" % "test"
|
||||
lazy val velocity = "org.apache.velocity" % "velocity" % "1.6.2" % "test"
|
||||
lazy val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2" % "test"
|
||||
lazy val jdom = "org.jdom" % "jdom" % "1.1" % "test" //JDOM license: ApacheV2 - acknowledgement
|
||||
lazy val vold_jetty = "org.mortbay.jetty" % "jetty" % "6.1.18" % "test" //ApacheV2
|
||||
lazy val velocity = "org.apache.velocity" % "velocity" % "1.6.2" % "test" //ApacheV2
|
||||
lazy val dbcp = "commons-dbcp" % "commons-dbcp" % "1.2.2" % "test" //ApacheV2
|
||||
|
||||
//memcached
|
||||
lazy val spymemcached = "spy" % "memcached" % "2.5" % "compile"
|
||||
|
||||
//simpledb
|
||||
lazy val simpledb = "com.amazonaws" % "aws-java-sdk" % "1.0.14" % "compile"
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
|
@ -313,8 +314,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
manifestClassPath.map(cp => ManifestAttributes(
|
||||
(Attributes.Name.CLASS_PATH, cp),
|
||||
(IMPLEMENTATION_TITLE, "Akka"),
|
||||
(IMPLEMENTATION_URL, "http://akkasource.org"),
|
||||
(IMPLEMENTATION_VENDOR, "The Akka Project")
|
||||
(IMPLEMENTATION_URL, "http://akka.io"),
|
||||
(IMPLEMENTATION_VENDOR, "Scalable Solutions AB")
|
||||
)).toList :::
|
||||
getMainClass(false).map(MainClass(_)).toList
|
||||
|
||||
|
|
@ -396,7 +397,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
val artifactRE(path, artifactId, artifactVersion) = absPath
|
||||
val command = "mvn install:install-file" +
|
||||
" -Dfile=" + absPath +
|
||||
" -DgroupId=akka" +
|
||||
" -DgroupId=se.scalablesolutions.akka" +
|
||||
" -DartifactId=" + artifactId +
|
||||
" -Dversion=" + version +
|
||||
" -Dpackaging=jar -DgeneratePom=true"
|
||||
|
|
@ -488,7 +489,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
||||
class AkkaHttpProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
|
||||
val annotation = Dependencies.annotation
|
||||
val jsr250 = Dependencies.jsr250
|
||||
val atmo = Dependencies.atmo
|
||||
val atmo_jbossweb = Dependencies.atmo_jbossweb
|
||||
val atmo_jersey = Dependencies.atmo_jersey
|
||||
|
|
@ -546,6 +547,8 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
new AkkaCouchDBProject(_), akka_persistence_common)
|
||||
lazy val akka_persistence_memcached= project("akka-persistence-memcached", "akka-persistence-memcached",
|
||||
new AkkaMemcachedProject(_), akka_persistence_common)
|
||||
lazy val akka_persistence_simpledb= project("akka-persistence-simpledb", "akka-persistence-simpledb",
|
||||
new AkkaSimpledbProject(_), akka_persistence_common)
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
|
@ -675,6 +678,16 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
|
|||
override def testOptions = createTestFilter( _.endsWith("Test"))
|
||||
}
|
||||
|
||||
class AkkaSimpledbProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
|
||||
val memcached = Dependencies.simpledb
|
||||
val commons_codec = Dependencies.commons_codec
|
||||
val http = Dependencies.commonsHttpClient
|
||||
|
||||
val scalatest = Dependencies.scalatest
|
||||
|
||||
override def testOptions = createTestFilter( _.endsWith("Test"))
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
// akka-kernel subproject
|
||||
// -------------------------------------------------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue