diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index ae0b3d3f9b..5ee900a222 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -226,4 +226,4 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator extends Message mailboxType(config), threadPoolConfig)).build } -} +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 692a5003d5..2061e558e6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -115,13 +115,21 @@ class ExecutorBasedEventDrivenDispatcher( override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size def createMailbox(actorRef: ActorRef): AnyRef = mailboxType match { - case UnboundedMailbox(blocking) => new DefaultUnboundedMessageQueue(blocking) with ExecutableMailbox { - def dispatcher = ExecutorBasedEventDrivenDispatcher.this - } + case b: UnboundedMailbox if b.blocking => + new DefaultUnboundedMessageQueue(true) with ExecutableMailbox { + final def dispatcher = ExecutorBasedEventDrivenDispatcher.this + } - case BoundedMailbox(blocking, capacity, pushTimeOut) => - new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking) with ExecutableMailbox { - def dispatcher = ExecutorBasedEventDrivenDispatcher.this + case b: UnboundedMailbox if !b.blocking => //If we have an unbounded, non-blocking mailbox, we can go lockless + new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox { + final def dispatcher = ExecutorBasedEventDrivenDispatcher.this + final def enqueue(m: MessageInvocation) = this.add(m) + final def dequeue(): MessageInvocation = this.poll() + } + + case b: BoundedMailbox => + new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut, b.blocking) with ExecutableMailbox { + final def dispatcher = ExecutorBasedEventDrivenDispatcher.this } } @@ -222,3 +230,47 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue => } } +/** + * A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox, + * prioritized according to the supplied comparator. + */ +class PriorityExecutorBasedEventDrivenDispatcher( + name: String, + val comparator: java.util.Comparator[MessageInvocation], + throughput: Int = Dispatchers.THROUGHPUT, + throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, + mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, + config: ThreadPoolConfig = ThreadPoolConfig() + ) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox { + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: UnboundedMailbox) = + this(name, comparator, throughput, throughputDeadlineTime, mailboxType,ThreadPoolConfig()) // Needed for Java API usage + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, mailboxType: UnboundedMailbox) = + this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType) // Needed for Java API usage + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int) = + this(name, comparator, throughput, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage + + def this(name: String, comparator: java.util.Comparator[MessageInvocation], config: ThreadPoolConfig) = + this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE, config) + + def this(name: String, comparator: java.util.Comparator[MessageInvocation]) = + this(name, comparator, Dispatchers.THROUGHPUT, Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, Dispatchers.MAILBOX_TYPE) // Needed for Java API usage +} + +trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher => + def comparator: java.util.Comparator[MessageInvocation] + + override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match { + case b: UnboundedMailbox => + new UnboundedPriorityMessageQueue(b.blocking, comparator) with ExecutableMailbox { + final def dispatcher = self + } + + case b: BoundedMailbox => + new BoundedPriorityMessageQueue(b.capacity, b.pushTimeOut, b.blocking, comparator) with ExecutableMailbox { + final def dispatcher = self + } + } +} diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 719f266ad0..c22acba2cc 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -7,7 +7,7 @@ package akka.dispatch import akka.AkkaException import akka.actor.{Actor, EventHandler} import akka.routing.Dispatcher -import akka.japi.Procedure +import akka.japi.{ Procedure, Function => JFunc } import java.util.concurrent.locks.ReentrantLock import java.util.concurrent. {ConcurrentLinkedQueue, TimeUnit, Callable} @@ -144,7 +144,7 @@ object Future { } } -sealed trait Future[T] { +sealed trait Future[+T] { /** * Blocks the current thread until the Future has been completed or the * timeout has expired. In the case of the timeout expiring a @@ -240,6 +240,37 @@ sealed trait Future[T] { } } + /** + * Creates a new Future by applying a PartialFunction to the successful + * result of this Future if a match is found, or else return a MatchError. + * If this Future is completed with an exception then the new Future will + * also contain this exception. + */ + final def collect[A](pf: PartialFunction[Any, A]): Future[A] = { + val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + onComplete { ft => + val optv = ft.value + if (optv.isDefined) { + val v = optv.get + fa complete { + if (v.isLeft) v.asInstanceOf[Either[Throwable, A]] + else { + try { + val r = v.right.get + if (pf isDefinedAt r) Right(pf(r)) + else Left(new MatchError(r)) + } catch { + case e: Exception => + EventHandler notifyListeners EventHandler.Error(e, this) + Left(e) + } + } + } + } + } + fa + } + /** * Creates a new Future by applying a function to the successful result of * this Future. If this Future is completed with an exception then the new @@ -338,7 +369,15 @@ sealed trait Future[T] { } /* Java API */ - final def onComplete(proc: Procedure[Future[T]]): Future[T] = onComplete(proc(_)) + final def onComplete[A >: T](proc: Procedure[Future[A]]): Future[T] = onComplete(proc(_)) + + final def map[A >: T, B](f: JFunc[A,B]): Future[B] = map(f(_)) + + final def flatMap[A >: T, B](f: JFunc[A,Future[B]]): Future[B] = flatMap(f(_)) + + final def foreach[A >: T](proc: Procedure[A]): Unit = foreach(proc(_)) + + final def filter[A >: T](p: JFunc[A,Boolean]): Future[T] = filter(p(_)) } diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index 8fcf688d55..e0586a40a7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -7,9 +7,8 @@ package akka.dispatch import akka.actor.{Actor, ActorType, ActorRef, ActorInitializationException} import akka.AkkaException -import java.util.{Queue, List} +import java.util.{Queue, List, Comparator, PriorityQueue} import java.util.concurrent._ -import concurrent.forkjoin.LinkedTransferQueue import akka.util._ class MessageQueueAppendFailedException(message: String) extends AkkaException(message) @@ -40,8 +39,8 @@ case class BoundedMailbox( if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") } -class DefaultUnboundedMessageQueue(blockDequeue: Boolean) - extends LinkedBlockingQueue[MessageInvocation] with MessageQueue { +trait UnboundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] => + def blockDequeue: Boolean final def enqueue(handle: MessageInvocation) { this add handle @@ -53,11 +52,12 @@ class DefaultUnboundedMessageQueue(blockDequeue: Boolean) } } -class DefaultBoundedMessageQueue(capacity: Int, pushTimeOut: Duration, blockDequeue: Boolean) - extends LinkedBlockingQueue[MessageInvocation](capacity) with MessageQueue { +trait BoundedMessageQueueSemantics extends MessageQueue { self: BlockingQueue[MessageInvocation] => + def blockDequeue: Boolean + def pushTimeOut: Duration final def enqueue(handle: MessageInvocation) { - if (pushTimeOut.toMillis > 0) { + if (pushTimeOut.length > 0 && pushTimeOut.toMillis > 0) { if (!this.offer(handle, pushTimeOut.length, pushTimeOut.unit)) throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) } else this put handle @@ -66,4 +66,20 @@ class DefaultBoundedMessageQueue(capacity: Int, pushTimeOut: Duration, blockDequ final def dequeue(): MessageInvocation = if (blockDequeue) this.take() else this.poll() -} \ No newline at end of file +} + +class DefaultUnboundedMessageQueue(val blockDequeue: Boolean) extends + LinkedBlockingQueue[MessageInvocation] with + UnboundedMessageQueueSemantics + +class DefaultBoundedMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean) extends + LinkedBlockingQueue[MessageInvocation](capacity) with + BoundedMessageQueueSemantics + +class UnboundedPriorityMessageQueue(val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends + PriorityBlockingQueue[MessageInvocation](11, cmp) with + UnboundedMessageQueueSemantics + +class BoundedPriorityMessageQueue(capacity: Int, val pushTimeOut: Duration, val blockDequeue: Boolean, cmp: Comparator[MessageInvocation]) extends + BoundedBlockingQueue[MessageInvocation](capacity, new PriorityQueue[MessageInvocation](11, cmp)) with + BoundedMessageQueueSemantics diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index f32aac0261..62dcc422ee 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -148,6 +148,7 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule * val actor = actorOf(classOf[MyActor],"www.akka.io", 2552).start * */ + @deprecated("Will be removed after 1.1") def actorOf(factory: => Actor, host: String, port: Int): ActorRef = Actor.remote.clientManagedActorOf(() => factory, host, port) @@ -166,6 +167,7 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule * val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start * */ + @deprecated("Will be removed after 1.1") def actorOf(clazz: Class[_ <: Actor], host: String, port: Int): ActorRef = { import ReflectiveAccess.{ createInstance, noParams, noArgs } clientManagedActorOf(() => @@ -193,6 +195,7 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule * val actor = actorOf[MyActor]("www.akka.io",2552).start * */ + @deprecated("Will be removed after 1.1") def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef = { import ReflectiveAccess.{ createInstance, noParams, noArgs } clientManagedActorOf(() => @@ -423,6 +426,7 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule => def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader)) + @deprecated("Will be removed after 1.1") def clientManagedActorOf(factory: () => Actor, host: String, port: Int): ActorRef @@ -462,7 +466,9 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule => private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef + @deprecated("Will be removed after 1.1") private[akka] def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit + @deprecated("Will be removed after 1.1") private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala new file mode 100644 index 0000000000..2a7452b172 --- /dev/null +++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala @@ -0,0 +1,322 @@ +package akka.util + +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{ TimeUnit, BlockingQueue } +import java.util.{ AbstractQueue, Queue, Collection, Iterator } + +class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backing: Queue[E]) extends AbstractQueue[E] with BlockingQueue[E] { + + backing match { + case null => throw new IllegalArgumentException("Backing Queue may not be null") + case b: BlockingQueue[_] => + require(maxCapacity > 0) + require(b.size() == 0) + require(b.remainingCapacity >= maxCapacity) + case b: Queue[_] => + require(b.size() == 0) + require(maxCapacity > 0) + } + + protected val lock = new ReentrantLock(true) + + private val notEmpty = lock.newCondition() + private val notFull = lock.newCondition() + + def put(e: E): Unit = { //Blocks until not full + if (e eq null) throw new NullPointerException + lock.lock() + try { + while (backing.size() == maxCapacity) + notFull.await() + require(backing.offer(e)) + notEmpty.signal() + } finally { + lock.unlock() + } + } + + def take(): E = { //Blocks until not empty + lock.lockInterruptibly() + try { + while (backing.size() == 0) + notEmpty.await() + val e = backing.poll() + require(e ne null) + notFull.signal() + e + } finally { + lock.unlock() + } + } + + def offer(e: E): Boolean = { //Tries to do it immediately, if fail return false + if (e eq null) throw new NullPointerException + lock.lock() + try { + if (backing.size() == maxCapacity) false + else { + require(backing.offer(e)) //Should never fail + notEmpty.signal() + true + } + } finally { + lock.unlock() + } + } + + def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = { //Tries to do it within the timeout, return false if fail + if (e eq null) throw new NullPointerException + var nanos = unit.toNanos(timeout) + lock.lockInterruptibly() + try { + while(backing.size() == maxCapacity) { + if (nanos <= 0) + return false + else + nanos = notFull.awaitNanos(nanos) + } + require(backing.offer(e)) //Should never fail + notEmpty.signal() + true + } finally { + lock.unlock() + } + } + + def poll(timeout: Long, unit: TimeUnit): E = { //Tries to do it within the timeout, returns null if fail + var nanos = unit.toNanos(timeout) + lock.lockInterruptibly() + try { + var result: E = null.asInstanceOf[E] + var hasResult = false + while(!hasResult) { + hasResult = backing.poll() match { + case null if nanos <= 0 => + result = null.asInstanceOf[E] + true + case null => + try { + nanos = notEmpty.awaitNanos(nanos) + } catch { + case ie: InterruptedException => + notEmpty.signal() + throw ie + } + false + case e => + notFull.signal() + result = e + true + } + } + result + } finally { + lock.unlock() + } + } + + def poll(): E = { //Tries to remove the head of the queue immediately, if fail, return null + lock.lock() + try { + backing.poll() match { + case null => null.asInstanceOf[E] + case e => + notFull.signal() + e + } + } finally { + lock.unlock + } + } + + override def remove(e: AnyRef): Boolean = { //Tries to do it immediately, if fail, return false + if (e eq null) throw new NullPointerException + lock.lock() + try { + if (backing remove e) { + notFull.signal() + true + } else false + } finally { + lock.unlock() + } + } + + override def contains(e: AnyRef): Boolean = { + if (e eq null) throw new NullPointerException + lock.lock() + try { + backing contains e + } finally { + lock.unlock() + } + } + + override def clear(): Unit = { + lock.lock() + try { + backing.clear + } finally { + lock.unlock() + } + } + + def remainingCapacity(): Int = { + lock.lock() + try { + maxCapacity - backing.size() + } finally { + lock.unlock() + } + } + + def size(): Int = { + lock.lock() + try { + backing.size() + } finally { + lock.unlock() + } + } + + def peek(): E = { + lock.lock() + try { + backing.peek() + } finally { + lock.unlock() + } + } + + def drainTo(c: Collection[_ >: E]): Int = drainTo(c, Int.MaxValue) + + def drainTo(c: Collection[_ >: E], maxElements: Int): Int = { + if (c eq null) throw new NullPointerException + if (c eq this) throw new IllegalArgumentException + if (maxElements <= 0) 0 + else { + lock.lock() + try { + var n = 0 + var e: E = null.asInstanceOf[E] + while(n < maxElements) { + backing.poll() match { + case null => return n + case e => + c add e + n += 1 + } + } + n + } finally { + lock.unlock() + } + } + } + + override def containsAll(c: Collection[_]): Boolean = { + lock.lock() + try { + backing containsAll c + } finally { + lock.unlock() + } + } + + override def removeAll(c: Collection[_]): Boolean = { + lock.lock() + try { + if (backing.removeAll(c)) { + val sz = backing.size() + if (sz < maxCapacity) notFull.signal() + if (sz > 0) notEmpty.signal() //FIXME needed? + true + } else false + } finally { + lock.unlock() + } + } + + override def retainAll(c: Collection[_]): Boolean = { + lock.lock() + try { + if (backing.retainAll(c)) { + val sz = backing.size() + if (sz < maxCapacity) notFull.signal() //FIXME needed? + if (sz > 0) notEmpty.signal() + true + } else false + } finally { + lock.unlock() + } + } + + def iterator(): Iterator[E] = { + lock.lock + try { + val elements = backing.toArray + new Iterator[E] { + var at = 0 + var last = -1 + + def hasNext(): Boolean = at < elements.length + + def next(): E = { + if (at >= elements.length) throw new NoSuchElementException + last = at + at += 1 + elements(last).asInstanceOf[E] + } + + def remove(): Unit = { + if (last < 0) throw new IllegalStateException + val target = elements(last) + last = -1 //To avoid 2 subsequent removes without a next in between + lock.lock() + try { + val i = backing.iterator() + while(i.hasNext) { + if (i.next eq target) { + i.remove() + notFull.signal() + return () + } + } + } finally { + lock.unlock() + } + } + } + } finally { + lock.unlock + } + } + + override def toArray(): Array[AnyRef] = { + lock.lock() + try { + backing.toArray + } finally { + lock.unlock() + } + } + + override def isEmpty(): Boolean = { + lock.lock() + try { + backing.isEmpty() + } finally { + lock.unlock() + } + } + + override def toArray[X](a: Array[X with AnyRef]) = { + lock.lock() + try { + backing.toArray[X](a) + } finally { + lock.unlock() + } + } +} \ No newline at end of file diff --git a/akka-actor/src/test/java/akka/dispatch/JavaFutureTests.java b/akka-actor/src/test/java/akka/dispatch/JavaFutureTests.java index b4d01ce575..d35946ce60 100644 --- a/akka-actor/src/test/java/akka/dispatch/JavaFutureTests.java +++ b/akka-actor/src/test/java/akka/dispatch/JavaFutureTests.java @@ -3,7 +3,8 @@ package akka.dispatch; import org.junit.Test; import static org.junit.Assert.*; import java.util.concurrent.Callable; -import scala.runtime.AbstractFunction1; +import akka.japi.Function; +import akka.japi.Procedure; import scala.Some; import scala.Right; import static akka.dispatch.Futures.future; @@ -17,7 +18,7 @@ import static akka.dispatch.Futures.future; } }); - Future f2 = f1.map(new AbstractFunction1() { + Future f2 = f1.map(new Function() { public String apply(String s) { return s + " World"; } diff --git a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala index b99cf68e40..3a9a3b258e 100644 --- a/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/FutureSpec.scala @@ -59,8 +59,21 @@ class FutureSpec extends JUnitSuite { @Test def shouldFutureCompose { val actor1 = actorOf[TestActor].start val actor2 = actorOf(new Actor { def receive = { case s: String => self reply s.toUpperCase } } ).start - val future1 = actor1.!!![Any]("Hello") flatMap { case s: String => actor2.!!![Any](s) } - val future2 = actor1.!!![Any]("Hello") flatMap { case s: Int => actor2.!!![Any](s) } + val future1 = actor1 !!! "Hello" flatMap ((s: String) => actor2 !!! s) + val future2 = actor1 !!! "Hello" flatMap (actor2 !!! (_: String)) + val future3 = actor1 !!! "Hello" flatMap (actor2 !!! (_: Int)) + assert(Some(Right("WORLD")) === future1.await.value) + assert(Some(Right("WORLD")) === future2.await.value) + intercept[ClassCastException] { future3.await.resultOrException } + actor1.stop + actor2.stop + } + + @Test def shouldFutureComposePatternMatch { + val actor1 = actorOf[TestActor].start + val actor2 = actorOf(new Actor { def receive = { case s: String => self reply s.toUpperCase } } ).start + val future1 = actor1 !!! "Hello" collect { case (s: String) => s } flatMap (actor2 !!! _) + val future2 = actor1 !!! "Hello" collect { case (n: Int) => n } flatMap (actor2 !!! _) assert(Some(Right("WORLD")) === future1.await.value) intercept[MatchError] { future2.await.resultOrException } actor1.stop @@ -103,15 +116,15 @@ class FutureSpec extends JUnitSuite { }).start val future1 = for { - Res(a: Int) <- (actor !!! Req("Hello")): Future[Any] - Res(b: String) <- (actor !!! Req(a)): Future[Any] - Res(c: String) <- (actor !!! Req(7)): Future[Any] + a <- actor !!! Req("Hello") collect { case Res(x: Int) => x } + b <- actor !!! Req(a) collect { case Res(x: String) => x } + c <- actor !!! Req(7) collect { case Res(x: String) => x } } yield b + "-" + c val future2 = for { - Res(a: Int) <- (actor !!! Req("Hello")): Future[Any] - Res(b: Int) <- (actor !!! Req(a)): Future[Any] - Res(c: String) <- (actor !!! Req(7)): Future[Any] + a <- actor !!! Req("Hello") collect { case Res(x: Int) => x } + b <- actor !!! Req(a) collect { case Res(x: Int) => x } + c <- actor !!! Req(7) collect { case Res(x: String) => x } } yield b + "-" + c assert(Some(Right("10-14")) === future1.await.value) diff --git a/akka-actor/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 5dd0dfbe6d..7a469868a4 100644 --- a/akka-actor/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -1,54 +1,188 @@ -package akka.actor.dispatch +package akka.dispatch +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} +import org.scalatest.junit.JUnitRunner +import org.junit.runner.RunWith +import akka.actor. {Actor, ActorRegistry} +import akka.actor.Actor.{actorOf} +import java.util.concurrent. {TimeUnit, CountDownLatch, BlockingQueue} +import java.util.{Queue} +import akka.util._ +import akka.util.Duration._ -import org.scalatest.junit.JUnitSuite -import org.junit.Test +@RunWith(classOf[JUnitRunner]) +abstract class MailboxSpec extends + WordSpec with + MustMatchers with + BeforeAndAfterAll with + BeforeAndAfterEach { + def name: String -import akka.actor.Actor -import akka.util.Duration -import akka.dispatch._ -import Actor._ + def factory: MailboxType => MessageQueue -import java.util.concurrent.{BlockingQueue, CountDownLatch, TimeUnit} -import java.util.concurrent.atomic.AtomicReference + name should { + "create a !blockDequeue && unbounded mailbox" in { + val config = UnboundedMailbox(false) + val q = factory(config) + ensureInitialMailboxState(config, q) -class MailboxTypeSpec extends JUnitSuite { - @Test def shouldDoNothing = assert(true) + implicit val within = Duration(1,TimeUnit.SECONDS) -/* - private val unit = TimeUnit.MILLISECONDS + val f = spawn { + q.dequeue + } - @Test def shouldCreateUnboundedQueue = { - val m = UnboundedMailbox(false) - assert(m.newMailbox("uuid").asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === Integer.MAX_VALUE) + f.await.resultOrException must be === Some(null) + } + + "create a !blockDequeue and bounded mailbox with 10 capacity and with push timeout" in { + val config = BoundedMailbox(false, 10, Duration(10,TimeUnit.MILLISECONDS)) + val q = factory(config) + ensureInitialMailboxState(config, q) + + val exampleMessage = createMessageInvocation("test") + + for(i <- 1 to config.capacity) q.enqueue(exampleMessage) + + q.size must be === config.capacity + q.isEmpty must be === false + + intercept[MessageQueueAppendFailedException] { + q.enqueue(exampleMessage) + } + + q.dequeue must be === exampleMessage + q.size must be (config.capacity - 1) + q.isEmpty must be === false + } + + "dequeue what was enqueued properly for unbounded mailboxes" in { + testEnqueueDequeue(UnboundedMailbox(false)) + } + + "dequeue what was enqueued properly for bounded mailboxes" in { + testEnqueueDequeue(BoundedMailbox(false, 10000, Duration(-1, TimeUnit.MILLISECONDS))) + } + + "dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in { + testEnqueueDequeue(BoundedMailbox(false, 10000, Duration(100, TimeUnit.MILLISECONDS))) + } + + /** FIXME Adapt test so it works with the last dequeue + + "dequeue what was enqueued properly for unbounded mailboxes with blockDeque" in { + testEnqueueDequeue(UnboundedMailbox(true)) + } + + "dequeue what was enqueued properly for bounded mailboxes with blockDeque" in { + testEnqueueDequeue(BoundedMailbox(true, 1000, Duration(-1, TimeUnit.MILLISECONDS))) + } + + "dequeue what was enqueued properly for bounded mailboxes with blockDeque and pushTimeout" in { + testEnqueueDequeue(BoundedMailbox(true, 1000, Duration(100, TimeUnit.MILLISECONDS))) + }*/ + } + + //CANDIDATE FOR TESTKIT + def spawn[T <: AnyRef](fun: => T)(implicit within: Duration): Future[T] = { + val result = new DefaultCompletableFuture[T](within.length, within.unit) + val t = new Thread(new Runnable { + def run = try { + result.completeWithResult(fun) + } catch { + case e: Throwable => result.completeWithException(e) + } + }) + t.start + result + } + + def createMessageInvocation(msg: Any): MessageInvocation = { + new MessageInvocation( + actorOf(new Actor { //Dummy actor + def receive = { case _ => } + }), msg, None, None) + } + + def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) { + q must not be null + q match { + case aQueue: BlockingQueue[_] => + config match { + case BoundedMailbox(_,capacity,_) => aQueue.remainingCapacity must be === capacity + case UnboundedMailbox(_) => aQueue.remainingCapacity must be === Int.MaxValue + } + case _ => + } + q.size must be === 0 + q.isEmpty must be === true + } + + def testEnqueueDequeue(config: MailboxType) { + implicit val within = Duration(10,TimeUnit.SECONDS) + val q = factory(config) + ensureInitialMailboxState(config, q) + + def createProducer(fromNum: Int, toNum: Int): Future[Vector[MessageInvocation]] = spawn { + val messages = Vector() ++ (for(i <- fromNum to toNum) yield createMessageInvocation(i)) + for(i <- messages) q.enqueue(i) + messages + } + + val totalMessages = 10000 + val step = 500 + + val producers = for(i <- (1 to totalMessages by step).toList) yield createProducer(i,i+step-1) + + def createConsumer: Future[Vector[MessageInvocation]] = spawn { + var r = Vector[MessageInvocation]() + while(producers.exists(_.isCompleted == false) || !q.isEmpty) { + q.dequeue match { + case null => + case message => r = r :+ message + } + } + r + } + + val consumers = for(i <- (1 to 4).toList) yield createConsumer + + val ps = producers.map(_.await.resultOrException.get) + val cs = consumers.map(_.await.resultOrException.get) + + ps.map(_.size).sum must be === totalMessages //Must have produced 1000 messages + cs.map(_.size).sum must be === totalMessages //Must have consumed all produced messages + //No message is allowed to be consumed by more than one consumer + cs.flatten.distinct.size must be === totalMessages + //All produced messages should have been consumed + (cs.flatten diff ps.flatten).size must be === 0 + (ps.flatten diff cs.flatten).size must be === 0 + } } - @Test def shouldCreateBoundedQueue = { - val m = BoundedMailbox(blocking = false, capacity = 1) - assert(m.newMailbox("uuid").asInstanceOf[BlockingQueue[MessageInvocation]].remainingCapacity === 1) +class DefaultMailboxSpec extends MailboxSpec { + lazy val name = "The default mailbox implementation" + def factory = { + case UnboundedMailbox(blockDequeue) => + new DefaultUnboundedMessageQueue(blockDequeue) + case BoundedMailbox(blocking, capacity, pushTimeOut) => + new DefaultBoundedMessageQueue(capacity, pushTimeOut, blocking) } - - @Test(expected = classOf[MessageQueueAppendFailedException]) def shouldThrowMessageQueueAppendFailedExceptionWhenTimeOutEnqueue = { - val m = BoundedMailbox(false, 1, Duration(1, unit)) - val testActor = actorOf( new Actor { def receive = { case _ => }} ) - val mbox = m.newMailbox("uuid") - (1 to 10000) foreach { i => mbox.enqueue(new MessageInvocation(testActor, i, None, None, None)) } - } - - - @Test def shouldBeAbleToDequeueUnblocking = { - val m = BoundedMailbox(false, 1, Duration(1, unit)) - val mbox = m.newMailbox("uuid") - val latch = new CountDownLatch(1) - val t = new Thread { override def run = { - mbox.dequeue - latch.countDown - }} - t.start - val result = latch.await(5000,unit) - if (!result) - t.interrupt - assert(result === true) - } - */ } + +class PriorityMailboxSpec extends MailboxSpec { + val comparator = new java.util.Comparator[MessageInvocation] { + def compare(a: MessageInvocation, b: MessageInvocation): Int = { + a.## - b.## + } + } + lazy val name = "The priority mailbox implementation" + def factory = { + case UnboundedMailbox(blockDequeue) => + new UnboundedPriorityMessageQueue(blockDequeue, comparator) + case BoundedMailbox(blocking, capacity, pushTimeOut) => + new BoundedPriorityMessageQueue(capacity, pushTimeOut, blocking, comparator) + } +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala index 0ae5971866..ee4e5cf809 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala @@ -44,26 +44,4 @@ object RemoteServerSettings { } val BACKLOG = config.getInt("akka.remote.server.backlog", 4096) - - val SECURE = { - /*if (config.getBool("akka.remote.ssl.service",false)) { - val properties = List( - ("key-store-type" , "keyStoreType"), - ("key-store" , "keyStore"), - ("key-store-pass" , "keyStorePassword"), - ("trust-store-type", "trustStoreType"), - ("trust-store" , "trustStore"), - ("trust-store-pass", "trustStorePassword") - ).map(x => ("akka.remote.ssl." + x._1, "javax.net.ssl." + x._2)) - - // If property is not set, and we have a value from our akka.conf, use that value - for { - p <- properties if System.getProperty(p._2) eq null - c <- config.getString(p._1) - } System.setProperty(p._2, c) - - if (config.getBool("akka.remote.ssl.debug", false)) System.setProperty("javax.net.debug","ssl") - true - } else */false - } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 698e18a9c3..9c71387a1a 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -361,28 +361,18 @@ class ActiveRemoteClientPipelineFactory( client: ActiveRemoteClient) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { - def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*) - - lazy val engine = { - val e = RemoteServerSslContext.client.createSSLEngine() - e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible? - e.setUseClientMode(true) - e - } - - val ssl = if (RemoteServerSettings.SECURE) join(new SslHandler(engine)) else join() - val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt) - val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) - val lenPrep = new LengthFieldPrepender(4) + val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt) + val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) + val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match { - case "zlib" => (join(new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) - case _ => (join(), join()) + val (enc, dec) = RemoteServerSettings.COMPRESSION_SCHEME match { + case "zlib" => (new ZlibEncoder(RemoteServerSettings.ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) + case _ => (Nil,Nil) } val remoteClient = new ActiveRemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client) - val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient) + val stages: List[ChannelHandler] = timeout :: dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteClient :: Nil new StaticChannelPipeline(stages: _*) } } @@ -465,20 +455,8 @@ class ActiveRemoteClientHandler( } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { - def connect = { - client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) - client.resetReconnectionTimeWindow - } - - if (RemoteServerSettings.SECURE) { - val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) - sslHandler.handshake.addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture): Unit = { - if (future.isSuccess) connect - else throw new RemoteClientException("Could not establish SSL handshake", client.module, client.remoteAddress) - } - }) - } else connect + client.notifyListeners(RemoteClientConnected(client.module, client.remoteAddress)) + client.resetReconnectionTimeWindow } override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { @@ -533,7 +511,7 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with if (optimizeLocalScoped_?) { val home = this.address - if ((host == home.getHostName || host == home.getAddress.getHostAddress) && port == home.getPort)//TODO: switch to InetSocketAddress.equals? + if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort)//TODO: switch to InetSocketAddress.equals? return new LocalActorRef(factory, None) // Code is much simpler with return } @@ -751,21 +729,6 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => if (_isRunning.isOn) typedActorsFactories.remove(id) } -object RemoteServerSslContext { - import javax.net.ssl.SSLContext - - val (client, server) = { - val protocol = "TLS" - //val algorithm = Option(Security.getProperty("ssl.KeyManagerFactory.algorithm")).getOrElse("SunX509") - //val store = KeyStore.getInstance("JKS") - val s = SSLContext.getInstance(protocol) - s.init(null, null, null) - val c = SSLContext.getInstance(protocol) - c.init(null, null, null) - (c, s) - } -} - /** * @author Jonas Bonér */ @@ -777,27 +740,17 @@ class RemoteServerPipelineFactory( import RemoteServerSettings._ def getPipeline: ChannelPipeline = { - def join(ch: ChannelHandler*) = Array[ChannelHandler](ch:_*) - - lazy val engine = { - val e = RemoteServerSslContext.server.createSSLEngine() - e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible? - e.setUseClientMode(false) - e - } - - val ssl = if(SECURE) join(new SslHandler(engine)) else join() val lenDec = new LengthFieldBasedFrameDecoder(MESSAGE_FRAME_SIZE, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder val (enc, dec) = COMPRESSION_SCHEME match { - case "zlib" => (join(new ZlibEncoder(ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder)) - case _ => (join(), join()) + case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) + case _ => (Nil, Nil) } val remoteServer = new RemoteServerHandler(name, openChannels, loader, server) - val stages = ssl ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteServer) + val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteServer :: Nil new StaticChannelPipeline(stages: _*) } } @@ -847,20 +800,7 @@ class RemoteServerHandler( val clientAddress = getClientAddress(ctx) sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]()) typedSessionActors.set(event.getChannel(), new ConcurrentHashMap[String, AnyRef]()) - if (SECURE) { - val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler]) - // Begin handshake. - sslHandler.handshake().addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture): Unit = { - if (future.isSuccess) { - openChannels.add(future.getChannel) - server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) - } else future.getChannel.close - } - }) - } else { - server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) - } + server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) if (REQUIRE_COOKIE) ctx.setAttachment(CHANNEL_INIT) // signal that this is channel initialization, which will need authentication } diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index 708c79d128..2332f28b13 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -7,12 +7,13 @@ package akka.agent import akka.stm._ import akka.actor.Actor import akka.japi.{Function => JFunc, Procedure => JProc} -import akka.dispatch.Dispatchers +import akka.dispatch.{Dispatchers, Future} /** * Used internally to send functions. */ private[akka] case class Update[T](function: T => T) +private[akka] case object Get /** * Factory method for creating an Agent. @@ -139,6 +140,17 @@ class Agent[T](initialValue: T) { value }) + /** + * A future to the current value that will be completed after any currently + * queued updates. + */ + def future(): Future[T] = (updater !!! Get).asInstanceOf[Future[T]] + + /** + * Gets this agent's value after all currently queued updates have completed. + */ + def await(): T = future.await.result.get + /** * Map this agent to a new agent, applying the function to the internal state. * Does not change the value of this agent. @@ -221,6 +233,7 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor { def receive = { case update: Update[T] => atomic(txFactory) { agent.ref alter update.function } + case Get => self reply agent.get case _ => () } } diff --git a/akka-stm/src/test/scala/agent/AgentSpec.scala b/akka-stm/src/test/scala/agent/AgentSpec.scala index 7f99f24664..6a9c36dbe0 100644 --- a/akka-stm/src/test/scala/agent/AgentSpec.scala +++ b/akka-stm/src/test/scala/agent/AgentSpec.scala @@ -113,6 +113,28 @@ class AgentSpec extends WordSpec with MustMatchers { agent.close } + "be able to return a 'queued' future" in { + val agent = Agent("a") + agent send (_ + "b") + agent send (_ + "c") + + val future = agent.future + + future.await.result.get must be ("abc") + + agent.close + } + + "be able to await the value after updates have completed" in { + val agent = Agent("a") + agent send (_ + "b") + agent send (_ + "c") + + agent.await must be ("abc") + + agent.close + } + "be able to be mapped" in { val agent1 = Agent(5) val agent2 = agent1 map (_ * 2) diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index ea072db84f..956856cab6 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -114,7 +114,7 @@ import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy} abstract class TypedActor extends Actor with Proxyable { val DELEGATE_FIELD_NAME = "DELEGATE_0".intern - @volatile private[actor] var proxy: AnyRef = _ + @volatile private[akka] var proxy: AnyRef = _ @volatile private var proxyDelegate: Field = _ /** @@ -362,10 +362,12 @@ object TypedActorConfiguration { new TypedActorConfiguration().timeout(Duration(timeout, "millis")) } + @deprecated("Will be removed after 1.1") def apply(host: String, port: Int) : TypedActorConfiguration = { new TypedActorConfiguration().makeRemote(host, port) } + @deprecated("Will be removed after 1.1") def apply(host: String, port: Int, timeout: Long) : TypedActorConfiguration = { new TypedActorConfiguration().makeRemote(host, port).timeout(Duration(timeout, "millis")) } @@ -395,8 +397,10 @@ final class TypedActorConfiguration { this } + @deprecated("Will be removed after 1.1") def makeRemote(hostname: String, port: Int): TypedActorConfiguration = makeRemote(new InetSocketAddress(hostname, port)) + @deprecated("Will be removed after 1.1") def makeRemote(remoteAddress: InetSocketAddress): TypedActorConfiguration = { _host = Some(remoteAddress) this @@ -507,6 +511,7 @@ object TypedActor { * @param host hostanme of the remote server * @param port port of the remote server */ + @deprecated("Will be removed after 1.1") def newRemoteInstance[T](intfClass: Class[T], targetClass: Class[_], timeout: Long, hostname: String, port: Int): T = { newInstance(intfClass, targetClass, TypedActorConfiguration(hostname, port, timeout)) } @@ -519,6 +524,7 @@ object TypedActor { * @param host hostanme of the remote server * @param port port of the remote server */ + @deprecated("Will be removed after 1.1") def newRemoteInstance[T](intfClass: Class[T], factory: => AnyRef, timeout: Long, hostname: String, port: Int): T = { newInstance(intfClass, factory, TypedActorConfiguration(hostname, port, timeout)) } @@ -535,6 +541,7 @@ object TypedActor { /** * Creates an ActorRef, can be local only or client-managed-remote */ + @deprecated("Will be removed after 1.1") private[akka] def createActorRef(typedActor: => TypedActor, config: TypedActorConfiguration): ActorRef = { config match { case null => actorOf(typedActor) @@ -610,12 +617,14 @@ object TypedActor { /** * Java API. */ + @deprecated("Will be removed after 1.1") def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, hostname: String, port: Int) : T = newRemoteInstance(intfClass, factory.create, hostname, port) /** * Java API. */ + @deprecated("Will be removed after 1.1") def newRemoteInstance[T](intfClass: Class[T], factory: TypedActorFactory, timeout: Long, hostname: String, port: Int) : T = newRemoteInstance(intfClass, factory.create, timeout, hostname, port)