Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2011-03-14 10:45:55 +01:00
commit c8f1d10c62
14 changed files with 714 additions and 169 deletions

View file

@ -115,13 +115,21 @@ class ExecutorBasedEventDrivenDispatcher(
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match {
case UnboundedMailbox(blocking) => new DefaultUnboundedMessageQueue(blocking) with ExecutableMailbox {
def dispatcher = ExecutorBasedEventDrivenDispatcher.this
case b: UnboundedMailbox if b.blocking =>
new DefaultUnboundedMessageQueue(true) with ExecutableMailbox {
final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
}
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking) with ExecutableMailbox {
def dispatcher = ExecutorBasedEventDrivenDispatcher.this
case b: UnboundedMailbox if !b.blocking => //If we have an unbounded, non-blocking mailbox, we can go lockless
new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox {
final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
final def enqueue(m: MessageInvocation) = this.add(m)
final def dequeue(): MessageInvocation = this.poll()
}
case b: BoundedMailbox =>
new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut, b.blocking) with ExecutableMailbox {
final def dispatcher = ExecutorBasedEventDrivenDispatcher.this
}
}
@ -222,3 +230,47 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
}
}
/**
* A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox,
* prioritized according to the supplied comparator.
*/
class PriorityExecutorBasedEventDrivenDispatcher(
name: String,
val comparator: java.util.Comparator[MessageInvocation],
throughput: Int = Dispatchers.THROUGHPUT,
throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS,
mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE,
config: ThreadPoolConfig = ThreadPoolConfig()
) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox {
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: UnboundedMailbox) =
this(name, comparator, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: UnboundedMailbox) =
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage
def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) =
this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) =
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config)
def this(name: String, comparator: java.util.Comparator[MessageInvocation]) =
this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage
}
trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher =>
def comparator: java.util.Comparator[MessageInvocation]
override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match {
case b: UnboundedMailbox =>
new UnboundedPriorityMessageQueue(b.blocking, comparator) with ExecutableMailbox {
final def dispatcher = self
}
case b: BoundedMailbox =>
new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, b.blocking, comparator) with ExecutableMailbox {
final def dispatcher = self
}
}
}

View file

@ -7,7 +7,7 @@ package akka.dispatch
import akka.AkkaException
import akka.actor.{Actor, EventHandler}
import akka.routing.Dispatcher
import akka.japi.Procedure
import akka.japi.{ Procedure, Function => JFunc }
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit, Callable}
@ -144,7 +144,7 @@ object Future {
}
}
sealed trait Future[T] {
sealed trait Future[+T] {
/**
* Blocks the current thread until the Future has been completed or the
* timeout has expired. In the case of the timeout expiring a
@ -240,6 +240,37 @@ sealed trait Future[T] {
}
}
/**
* Creates a new Future by applying a PartialFunction to the successful
* result of this Future if a match is found, or else return a MatchError.
* If this Future is completed with an exception then the new Future will
* also contain this exception.
*/
final def collect[A](pf: PartialFunction[Any, A]): Future[A] = {
val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS)
onComplete { ft =>
val optv = ft.value
if (optv.isDefined) {
val v = optv.get
fa complete {
if (v.isLeft) v.asInstanceOf[Either[Throwable, A]]
else {
try {
val r = v.right.get
if (pf isDefinedAt r) Right(pf(r))
else Left(new MatchError(r))
} catch {
case e: Exception =>
EventHandler notifyListeners EventHandler.Error(e, this)
Left(e)
}
}
}
}
}
fa
}
/**
* Creates a new Future by applying a function to the successful result of
* this Future. If this Future is completed with an exception then the new
@ -338,7 +369,15 @@ sealed trait Future[T] {
}
/* Java API */
final def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(proc(_))
final def onComplete[A >: T](proc: Procedure[Future[A]]): Future[T] = onComplete(proc(_))
final def map[A >: T, B](f: JFunc[A,B]): Future[B] = map(f(_))
final def flatMap[A >: T, B](f: JFunc[A,Future[B]]): Future[B] = flatMap(f(_))
final def foreach[A >: T](proc: Procedure[A]): Unit = foreach(proc(_))
final def filter[A >: T](p: JFunc[A,Boolean]): Future[T] = filter(p(_))
}

View file

@ -7,9 +7,8 @@ package akka.dispatch
import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException}
import akka.AkkaException
import java.util.{Queue, List}
import java.util.{Queue, List, Comparator, PriorityQueue}
import java.util.concurrent._
import concurrent.forkjoin.LinkedTransferQueue
import akka.util._
class MessageQueueAppendFailedException(message: String) extends AkkaException(message)
@ -40,8 +39,8 @@ case class BoundedMailbox(
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
}
class DefaultUnboundedMessageQueue(blockDequeue: Boolean)
extends LinkedBlockingQueue[MessageInvocation] with MessageQueue {
trait UnboundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] =>
def blockDequeue: Boolean
final def enqueue(handle: MessageInvocation) {
this add handle
@ -53,11 +52,12 @@ class DefaultUnboundedMessageQueue(blockDequeue: Boolean)
}
}
class DefaultBoundedMessageQueue(capacity: Int, pushTimeOut: Duration, blockDequeue: Boolean)
extends LinkedBlockingQueue[MessageInvocation](capacity) with MessageQueue {
trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] =>
def blockDequeue: Boolean
def pushTimeOut: Duration
final def enqueue(handle: MessageInvocation) {
if (pushTimeOut.toMillis > 0) {
if (pushTimeOut.length > 0 && pushTimeOut.toMillis > 0) {
if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit))
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
} else this put handle
@ -67,3 +67,19 @@ class DefaultBoundedMessageQueue(capacity: Int, pushTimeOut: Duration, blockDequ
if (blockDequeue) this.take()
else this.poll()
}
class DefaultUnboundedMessageQueue(val blockDequeue: Boolean) extends
LinkedBlockingQueue[MessageInvocation] with
UnboundedMessageQueueSemantics
class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean) extends
LinkedBlockingQueue[MessageInvocation](capacity) with
BoundedMessageQueueSemantics
class UnboundedPriorityMessageQueue(val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends
PriorityBlockingQueue[MessageInvocation](11, cmp) with
UnboundedMessageQueueSemantics
class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends
BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with
BoundedMessageQueueSemantics

View file

@ -148,6 +148,7 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
* val actor = actorOf(classOf[MyActor],"www.akka.io", 2552).start
* </pre>
*/
@deprecated("Will be removed after 1.1")
def actorOf(factory: => Actor, host: String, port: Int): ActorRef =
Actor.remote.clientManagedActorOf(() => factory, host, port)
@ -166,6 +167,7 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
* </pre>
*/
@deprecated("Will be removed after 1.1")
def actorOf(clazz: Class[_ <: Actor], host: String, port: Int): ActorRef = {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
clientManagedActorOf(() =>
@ -193,6 +195,7 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
* val actor = actorOf[MyActor]("www.akka.io",2552).start
* </pre>
*/
@deprecated("Will be removed after 1.1")
def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
clientManagedActorOf(() =>
@ -423,6 +426,7 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule =>
def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T =
typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader))
@deprecated("Will be removed after 1.1")
def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef
@ -462,7 +466,9 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule =>
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef
@deprecated("Will be removed after 1.1")
private[akka] def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit
@deprecated("Will be removed after 1.1")
private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit
}

View file

@ -0,0 +1,322 @@
package akka.util
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ TimeUnit, BlockingQueue }
import java.util.{ AbstractQueue, Queue, Collection, Iterator }
class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] {
backing match {
case null => throw new IllegalArgumentException("Backing Queue may not be null")
case b: BlockingQueue[_] =>
require(maxCapacity > 0)
require(b.size() == 0)
require(b.remainingCapacity >= maxCapacity)
case b: Queue[_] =>
require(b.size() == 0)
require(maxCapacity > 0)
}
protected val lock = new ReentrantLock(true)
private val notEmpty = lock.newCondition()
private val notFull = lock.newCondition()
def put(e: E): Unit = { //Blocks until not full
if (e eq null) throw new NullPointerException
lock.lock()
try {
while (backing.size() == maxCapacity)
notFull.await()
require(backing.offer(e))
notEmpty.signal()
} finally {
lock.unlock()
}
}
def take(): E = { //Blocks until not empty
lock.lockInterruptibly()
try {
while (backing.size() == 0)
notEmpty.await()
val e = backing.poll()
require(e ne null)
notFull.signal()
e
} finally {
lock.unlock()
}
}
def offer(e: E): Boolean = { //Tries to do it immediately, if fail return false
if (e eq null) throw new NullPointerException
lock.lock()
try {
if (backing.size() == maxCapacity) false
else {
require(backing.offer(e)) //Should never fail
notEmpty.signal()
true
}
} finally {
lock.unlock()
}
}
def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { //Tries to do it within the timeout, return false if fail
if (e eq null) throw new NullPointerException
var nanos = unit.toNanos(timeout)
lock.lockInterruptibly()
try {
while(backing.size() == maxCapacity) {
if (nanos <= 0)
return false
else
nanos = notFull.awaitNanos(nanos)
}
require(backing.offer(e)) //Should never fail
notEmpty.signal()
true
} finally {
lock.unlock()
}
}
def poll(timeout: Long, unit: TimeUnit): E = { //Tries to do it within the timeout, returns null if fail
var nanos = unit.toNanos(timeout)
lock.lockInterruptibly()
try {
var result: E = null.asInstanceOf[E]
var hasResult = false
while(!hasResult) {
hasResult = backing.poll() match {
case null if nanos <= 0 =>
result = null.asInstanceOf[E]
true
case null =>
try {
nanos = notEmpty.awaitNanos(nanos)
} catch {
case ie: InterruptedException =>
notEmpty.signal()
throw ie
}
false
case e =>
notFull.signal()
result = e
true
}
}
result
} finally {
lock.unlock()
}
}
def poll(): E = { //Tries to remove the head of the queue immediately, if fail, return null
lock.lock()
try {
backing.poll() match {
case null => null.asInstanceOf[E]
case e =>
notFull.signal()
e
}
} finally {
lock.unlock
}
}
override def remove(e: AnyRef): Boolean = { //Tries to do it immediately, if fail, return false
if (e eq null) throw new NullPointerException
lock.lock()
try {
if (backing remove e) {
notFull.signal()
true
} else false
} finally {
lock.unlock()
}
}
override def contains(e: AnyRef): Boolean = {
if (e eq null) throw new NullPointerException
lock.lock()
try {
backing contains e
} finally {
lock.unlock()
}
}
override def clear(): Unit = {
lock.lock()
try {
backing.clear
} finally {
lock.unlock()
}
}
def remainingCapacity(): Int = {
lock.lock()
try {
maxCapacity - backing.size()
} finally {
lock.unlock()
}
}
def size(): Int = {
lock.lock()
try {
backing.size()
} finally {
lock.unlock()
}
}
def peek(): E = {
lock.lock()
try {
backing.peek()
} finally {
lock.unlock()
}
}
def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue)
def drainTo(c: Collection[_ >: E], maxElements: Int): Int = {
if (c eq null) throw new NullPointerException
if (c eq this) throw new IllegalArgumentException
if (maxElements <= 0) 0
else {
lock.lock()
try {
var n = 0
var e: E = null.asInstanceOf[E]
while(n < maxElements) {
backing.poll() match {
case null => return n
case e =>
c add e
n += 1
}
}
n
} finally {
lock.unlock()
}
}
}
override def containsAll(c: Collection[_]): Boolean = {
lock.lock()
try {
backing containsAll c
} finally {
lock.unlock()
}
}
override def removeAll(c: Collection[_]): Boolean = {
lock.lock()
try {
if (backing.removeAll(c)) {
val sz = backing.size()
if (sz < maxCapacity) notFull.signal()
if (sz > 0) notEmpty.signal() //FIXME needed?
true
} else false
} finally {
lock.unlock()
}
}
override def retainAll(c: Collection[_]): Boolean = {
lock.lock()
try {
if (backing.retainAll(c)) {
val sz = backing.size()
if (sz < maxCapacity) notFull.signal() //FIXME needed?
if (sz > 0) notEmpty.signal()
true
} else false
} finally {
lock.unlock()
}
}
def iterator(): Iterator[E] = {
lock.lock
try {
val elements = backing.toArray
new Iterator[E] {
var at = 0
var last = -1
def hasNext(): Boolean = at < elements.length
def next(): E = {
if (at >= elements.length) throw new NoSuchElementException
last = at
at += 1
elements(last).asInstanceOf[E]
}
def remove(): Unit = {
if (last < 0) throw new IllegalStateException
val target = elements(last)
last = -1 //To avoid 2 subsequent removes without a next in between
lock.lock()
try {
val i = backing.iterator()
while(i.hasNext) {
if (i.next eq target) {
i.remove()
notFull.signal()
return ()
}
}
} finally {
lock.unlock()
}
}
}
} finally {
lock.unlock
}
}
override def toArray(): Array[AnyRef] = {
lock.lock()
try {
backing.toArray
} finally {
lock.unlock()
}
}
override def isEmpty(): Boolean = {
lock.lock()
try {
backing.isEmpty()
} finally {
lock.unlock()
}
}
override def toArray[X](a: Array[X with AnyRef]) = {
lock.lock()
try {
backing.toArray[X](a)
} finally {
lock.unlock()
}
}
}

View file

@ -3,7 +3,8 @@ package akka.dispatch;
import org.junit.Test;
import static org.junit.Assert.*;
import java.util.concurrent.Callable;
import scala.runtime.AbstractFunction1;
import akka.japi.Function;
import akka.japi.Procedure;
import scala.Some;
import scala.Right;
import static akka.dispatch.Futures.future;
@ -17,7 +18,7 @@ import static akka.dispatch.Futures.future;
}
});
Future f2 = f1.map(new AbstractFunction1<String, String>() {
Future f2 = f1.map(new Function<String, String>() {
public String apply(String s) {
return s + " World";
}

View file

@ -59,8 +59,21 @@ class FutureSpec extends JUnitSuite {
@Test def shouldFutureCompose {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf(new Actor { def receive = { case s: String => self reply s.toUpperCase } } ).start
val future1 = actor1.!!![Any]("Hello") flatMap { case s: String => actor2.!!![Any](s) }
val future2 = actor1.!!![Any]("Hello") flatMap { case s: Int => actor2.!!![Any](s) }
val future1 = actor1 !!! "Hello" flatMap ((s: String) => actor2 !!! s)
val future2 = actor1 !!! "Hello" flatMap (actor2 !!! (_: String))
val future3 = actor1 !!! "Hello" flatMap (actor2 !!! (_: Int))
assert(Some(Right("WORLD")) === future1.await.value)
assert(Some(Right("WORLD")) === future2.await.value)
intercept[ClassCastException] { future3.await.resultOrException }
actor1.stop
actor2.stop
}
@Test def shouldFutureComposePatternMatch {
val actor1 = actorOf[TestActor].start
val actor2 = actorOf(new Actor { def receive = { case s: String => self reply s.toUpperCase } } ).start
val future1 = actor1 !!! "Hello" collect { case (s: String) => s } flatMap (actor2 !!! _)
val future2 = actor1 !!! "Hello" collect { case (n: Int) => n } flatMap (actor2 !!! _)
assert(Some(Right("WORLD")) === future1.await.value)
intercept[MatchError] { future2.await.resultOrException }
actor1.stop
@ -103,15 +116,15 @@ class FutureSpec extends JUnitSuite {
}).start
val future1 = for {
Res(a: Int) <- (actor !!! Req("Hello")): Future[Any]
Res(b: String) <- (actor !!! Req(a)): Future[Any]
Res(c: String) <- (actor !!! Req(7)): Future[Any]
a <- actor !!! Req("Hello") collect { case Res(x: Int) => x }
b <- actor !!! Req(a) collect { case Res(x: String) => x }
c <- actor !!! Req(7) collect { case Res(x: String) => x }
} yield b + "-" + c
val future2 = for {
Res(a: Int) <- (actor !!! Req("Hello")): Future[Any]
Res(b: Int) <- (actor !!! Req(a)): Future[Any]
Res(c: String) <- (actor !!! Req(7)): Future[Any]
a <- actor !!! Req("Hello") collect { case Res(x: Int) => x }
b <- actor !!! Req(a) collect { case Res(x: Int) => x }
c <- actor !!! Req(7) collect { case Res(x: String) => x }
} yield b + "-" + c
assert(Some(Right("10-14")) === future1.await.value)

View file

@ -1,54 +1,188 @@
package akka.actor.dispatch
package akka.dispatch
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import akka.actor. {Actor, ActorRegistry}
import akka.actor.Actor.{actorOf}
import java.util.concurrent. {TimeUnit, CountDownLatch, BlockingQueue}
import java.util.{Queue}
import akka.util._
import akka.util.Duration._
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@RunWith(classOf[JUnitRunner])
abstract class MailboxSpec extends
WordSpec with
MustMatchers with
BeforeAndAfterAll with
BeforeAndAfterEach {
def name: String
import akka.actor.Actor
import akka.util.Duration
import akka.dispatch._
import Actor._
def factory: MailboxType => MessageQueue
import java.util.concurrent.{BlockingQueue, CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
name should {
"create a !blockDequeue && unbounded mailbox" in {
val config = UnboundedMailbox(false)
val q = factory(config)
ensureInitialMailboxState(config, q)
class MailboxTypeSpec extends JUnitSuite {
@Test def shouldDoNothing = assert(true)
implicit val within = Duration(1,TimeUnit.SECONDS)
/*
private val unit = TimeUnit.MILLISECONDS
@Test def shouldCreateUnboundedQueue = {
val m = UnboundedMailbox(false)
assert(m.newMailbox("uuid").asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === Integer.MAX_VALUE)
val f = spawn {
q.dequeue
}
@Test def shouldCreateBoundedQueue = {
val m = BoundedMailbox(blocking = false, capacity = 1)
assert(m.newMailbox("uuid").asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === 1)
f.await.resultOrException must be === Some(null)
}
@Test(expected = classOf[MessageQueueAppendFailedException]) def shouldThrowMessageQueueAppendFailedExceptionWhenTimeOutEnqueue = {
val m = BoundedMailbox(false, 1, Duration(1, unit))
val testActor = actorOf( new Actor { def receive = { case _ => }} )
val mbox = m.newMailbox("uuid")
(1 to 10000) foreach { i => mbox.enqueue(new MessageInvocation(testActor, i, None, None, None)) }
"create a !blockDequeue and bounded mailbox with 10 capacity and with push timeout" in {
val config = BoundedMailbox(false, 10, Duration(10,TimeUnit.MILLISECONDS))
val q = factory(config)
ensureInitialMailboxState(config, q)
val exampleMessage = createMessageInvocation("test")
for(i <- 1 to config.capacity) q.enqueue(exampleMessage)
q.size must be === config.capacity
q.isEmpty must be === false
intercept[MessageQueueAppendFailedException] {
q.enqueue(exampleMessage)
}
q.dequeue must be === exampleMessage
q.size must be (config.capacity - 1)
q.isEmpty must be === false
}
@Test def shouldBeAbleToDequeueUnblocking = {
val m = BoundedMailbox(false, 1, Duration(1, unit))
val mbox = m.newMailbox("uuid")
val latch = new CountDownLatch(1)
val t = new Thread { override def run = {
mbox.dequeue
latch.countDown
}}
"dequeue what was enqueued properly for unbounded mailboxes" in {
testEnqueueDequeue(UnboundedMailbox(false))
}
"dequeue what was enqueued properly for bounded mailboxes" in {
testEnqueueDequeue(BoundedMailbox(false, 10000, Duration(-1, TimeUnit.MILLISECONDS)))
}
"dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in {
testEnqueueDequeue(BoundedMailbox(false, 10000, Duration(100, TimeUnit.MILLISECONDS)))
}
/** FIXME Adapt test so it works with the last dequeue
"dequeue what was enqueued properly for unbounded mailboxes with blockDeque" in {
testEnqueueDequeue(UnboundedMailbox(true))
}
"dequeue what was enqueued properly for bounded mailboxes with blockDeque" in {
testEnqueueDequeue(BoundedMailbox(true, 1000, Duration(-1, TimeUnit.MILLISECONDS)))
}
"dequeue what was enqueued properly for bounded mailboxes with blockDeque and pushTimeout" in {
testEnqueueDequeue(BoundedMailbox(true, 1000, Duration(100, TimeUnit.MILLISECONDS)))
}*/
}
//CANDIDATE FOR TESTKIT
def spawn[T <: AnyRef](fun: => T)(implicit within: Duration): Future[T] = {
val result = new DefaultCompletableFuture[T](within.length, within.unit)
val t = new Thread(new Runnable {
def run = try {
result.completeWithResult(fun)
} catch {
case e: Throwable => result.completeWithException(e)
}
})
t.start
val result = latch.await(5000,unit)
if (!result)
t.interrupt
assert(result === true)
result
}
def createMessageInvocation(msg: Any): MessageInvocation = {
new MessageInvocation(
actorOf(new Actor { //Dummy actor
def receive = { case _ => }
}), msg, None, None)
}
def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) {
q must not be null
q match {
case aQueue: BlockingQueue[_] =>
config match {
case BoundedMailbox(_,capacity,_) => aQueue.remainingCapacity must be === capacity
case UnboundedMailbox(_) => aQueue.remainingCapacity must be === Int.MaxValue
}
case _ =>
}
q.size must be === 0
q.isEmpty must be === true
}
def testEnqueueDequeue(config: MailboxType) {
implicit val within = Duration(10,TimeUnit.SECONDS)
val q = factory(config)
ensureInitialMailboxState(config, q)
def createProducer(fromNum: Int, toNum: Int): Future[Vector[MessageInvocation]] = spawn {
val messages = Vector() ++ (for(i <- fromNum to toNum) yield createMessageInvocation(i))
for(i <- messages) q.enqueue(i)
messages
}
val totalMessages = 10000
val step = 500
val producers = for(i <- (1 to totalMessages by step).toList) yield createProducer(i,i+step-1)
def createConsumer: Future[Vector[MessageInvocation]] = spawn {
var r = Vector[MessageInvocation]()
while(producers.exists(_.isCompleted == false) || !q.isEmpty) {
q.dequeue match {
case null =>
case message => r = r :+ message
}
}
r
}
val consumers = for(i <- (1 to 4).toList) yield createConsumer
val ps = producers.map(_.await.resultOrException.get)
val cs = consumers.map(_.await.resultOrException.get)
ps.map(_.size).sum must be === totalMessages //Must have produced 1000 messages
cs.map(_.size).sum must be === totalMessages //Must have consumed all produced messages
//No message is allowed to be consumed by more than one consumer
cs.flatten.distinct.size must be === totalMessages
//All produced messages should have been consumed
(cs.flatten diff ps.flatten).size must be === 0
(ps.flatten diff cs.flatten).size must be === 0
}
}
class DefaultMailboxSpec extends MailboxSpec {
lazy val name = "The default mailbox implementation"
def factory = {
case UnboundedMailbox(blockDequeue) =>
new DefaultUnboundedMessageQueue(blockDequeue)
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking)
}
}
class PriorityMailboxSpec extends MailboxSpec {
val comparator = new java.util.Comparator[MessageInvocation] {
def compare(a: MessageInvocation, b: MessageInvocation): Int = {
a.## - b.##
}
}
lazy val name = "The priority mailbox implementation"
def factory = {
case UnboundedMailbox(blockDequeue) =>
new UnboundedPriorityMessageQueue(blockDequeue, comparator)
case BoundedMailbox(blocking, capacity, pushTimeOut) =>
new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator)
}
*/
}

View file

@ -44,26 +44,4 @@ object RemoteServerSettings {
}
val BACKLOG = config.getInt("akka.remote.server.backlog", 4096)
val SECURE = {
/*if (config.getBool("akka.remote.ssl.service",false)) {
val properties = List(
("key-store-type" , "keyStoreType"),
("key-store" , "keyStore"),
("key-store-pass" , "keyStorePassword"),
("trust-store-type", "trustStoreType"),
("trust-store" , "trustStore"),
("trust-store-pass", "trustStorePassword")
).map(x => ("akka.remote.ssl." + x._1, "javax.net.ssl." + x._2))
// If property is not set, and we have a value from our akka.conf, use that value
for {
p <- properties if System.getProperty(p._2) eq null
c <- config.getString(p._1)
} System.setProperty(p._2, c)
if (config.getBool("akka.remote.ssl.debug", false)) System.setProperty("javax.net.debug","ssl")
true
} else */false
}
}

View file

@ -361,28 +361,18 @@ class ActiveRemoteClientPipelineFactory(
client: ActiveRemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*)
lazy val engine = {
val e = RemoteServerSslContext.client.createSSLEngine()
e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible?
e.setUseClientMode(true)
e
}
val ssl = if (RemoteServerSettings.SECURE) join(new SslHandler(engine)) else join()
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
case _ => (join(), join())
case "zlib" => (new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ => (Nil,Nil)
}
val remoteClient = new ActiveRemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)
val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient)
val stages: List[ChannelHandler] = timeout :: dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteClient :: Nil
new StaticChannelPipeline(stages: _*)
}
}
@ -465,22 +455,10 @@ class ActiveRemoteClientHandler(
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
def connect = {
client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress))
client.resetReconnectionTimeWindow
}
if (RemoteServerSettings.SECURE) {
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
sslHandler.handshake.addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) connect
else throw new RemoteClientException("Could not establish SSL handshake", client.module, client.remoteAddress)
}
})
} else connect
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.notifyListeners(RemoteClientDisconnected(client.module, client.remoteAddress))
}
@ -533,7 +511,7 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
if (optimizeLocalScoped_?) {
val home = this.address
if ((host == home.getHostName || host == home.getAddress.getHostAddress) && port == home.getPort)//TODO: switch to InetSocketAddress.equals?
if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort)//TODO: switch to InetSocketAddress.equals?
return new LocalActorRef(factory, None) // Code is much simpler with return
}
@ -751,21 +729,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
if (_isRunning.isOn) typedActorsFactories.remove(id)
}
object RemoteServerSslContext {
import javax.net.ssl.SSLContext
val (client, server) = {
val protocol = "TLS"
//val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509")
//val store = KeyStore.getInstance("JKS")
val s = SSLContext.getInstance(protocol)
s.init(null, null, null)
val c = SSLContext.getInstance(protocol)
c.init(null, null, null)
(c, s)
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -777,27 +740,17 @@ class RemoteServerPipelineFactory(
import RemoteServerSettings._
def getPipeline: ChannelPipeline = {
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*)
lazy val engine = {
val e = RemoteServerSslContext.server.createSSLEngine()
e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible?
e.setUseClientMode(false)
e
}
val ssl = if(SECURE) join(new SslHandler(engine)) else join()
val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val (enc, dec) = COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
case _ => (join(), join())
case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
case _ => (Nil, Nil)
}
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer)
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteServer :: Nil
new StaticChannelPipeline(stages: _*)
}
}
@ -847,20 +800,7 @@ class RemoteServerHandler(
val clientAddress = getClientAddress(ctx)
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]())
if (SECURE) {
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
// Begin handshake.
sslHandler.handshake().addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) {
openChannels.add(future.getChannel)
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
} else future.getChannel.close
}
})
} else {
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
}
if (REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication
}

View file

@ -7,12 +7,13 @@ package akka.agent
import akka.stm._
import akka.actor.Actor
import akka.japi.{Function => JFunc, Procedure => JProc}
import akka.dispatch.Dispatchers
import akka.dispatch.{Dispatchers, Future}
/**
* Used internally to send functions.
*/
private[akka] case class Update[T](function: T => T)
private[akka] case object Get
/**
* Factory method for creating an Agent.
@ -139,6 +140,17 @@ class Agent[T](initialValue: T) {
value
})
/**
* A future to the current value that will be completed after any currently
* queued updates.
*/
def future(): Future[T] = (updater !!! Get).asInstanceOf[Future[T]]
/**
* Gets this agent's value after all currently queued updates have completed.
*/
def await(): T = future.await.result.get
/**
* Map this agent to a new agent, applying the function to the internal state.
* Does not change the value of this agent.
@ -221,6 +233,7 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor {
def receive = {
case update: Update[T] =>
atomic(txFactory) { agent.ref alter update.function }
case Get => self reply agent.get
case _ => ()
}
}

View file

@ -113,6 +113,28 @@ class AgentSpec extends WordSpec with MustMatchers {
agent.close
}
"be able to return a 'queued' future" in {
val agent = Agent("a")
agent send (_ + "b")
agent send (_ + "c")
val future = agent.future
future.await.result.get must be ("abc")
agent.close
}
"be able to await the value after updates have completed" in {
val agent = Agent("a")
agent send (_ + "b")
agent send (_ + "c")
agent.await must be ("abc")
agent.close
}
"be able to be mapped" in {
val agent1 = Agent(5)
val agent2 = agent1 map (_ * 2)

View file

@ -114,7 +114,7 @@ import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy}
abstract class TypedActor extends Actor with Proxyable {
val DELEGATE_FIELD_NAME = "DELEGATE_0".intern
@volatile private[actor] var proxy: AnyRef = _
@volatile private[akka] var proxy: AnyRef = _
@volatile private var proxyDelegate: Field = _
/**
@ -362,10 +362,12 @@ object TypedActorConfiguration {
new TypedActorConfiguration().timeout(Duration(timeout, "millis"))
}
@deprecated("Will be removed after 1.1")
def apply(host: String, port: Int) : TypedActorConfiguration = {
new TypedActorConfiguration().makeRemote(host, port)
}
@deprecated("Will be removed after 1.1")
def apply(host: String, port: Int, timeout: Long) : TypedActorConfiguration = {
new TypedActorConfiguration().makeRemote(host, port).timeout(Duration(timeout, "millis"))
}
@ -395,8 +397,10 @@ final class TypedActorConfiguration {
this
}
@deprecated("Will be removed after 1.1")
def makeRemote(hostname: String, port: Int): TypedActorConfiguration = makeRemote(new InetSocketAddress(hostname, port))
@deprecated("Will be removed after 1.1")
def makeRemote(remoteAddress: InetSocketAddress): TypedActorConfiguration = {
_host = Some(remoteAddress)
this
@ -507,6 +511,7 @@ object TypedActor {
* @param host hostanme of the remote server
* @param port port of the remote server
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long, hostname: String, port: Int): T = {
newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port, timeout))
}
@ -519,6 +524,7 @@ object TypedActor {
* @param host hostanme of the remote server
* @param port port of the remote server
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, timeout: Long, hostname: String, port: Int): T = {
newInstance(intfClass, factory, TypedActorConfiguration(hostname, port, timeout))
}
@ -535,6 +541,7 @@ object TypedActor {
/**
* Creates an ActorRef, can be local only or client-managed-remote
*/
@deprecated("Will be removed after 1.1")
private[akka] def createActorRef(typedActor: => TypedActor, config: TypedActorConfiguration): ActorRef = {
config match {
case null => actorOf(typedActor)
@ -610,12 +617,14 @@ object TypedActor {
/**
* Java API.
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, hostname: String, port: Int) : T =
newRemoteInstance(intfClass, factory.create, hostname, port)
/**
* Java API.
*/
@deprecated("Will be removed after 1.1")
def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, timeout: Long, hostname: String, port: Int) : T =
newRemoteInstance(intfClass, factory.create, timeout, hostname, port)