diff --git a/akka-core/src/main/scala/dispatch/Queues.scala b/akka-core/src/main/scala/dispatch/Queues.scala index e12f22a526..7f16f7aa57 100644 --- a/akka-core/src/main/scala/dispatch/Queues.scala +++ b/akka-core/src/main/scala/dispatch/Queues.scala @@ -19,7 +19,8 @@ class BoundedTransferQueue[E <: AnyRef]( require(pushTimeUnit ne null) protected val guard = new Semaphore(capacity) - + + //Enqueue an item within the push timeout (acquire Semaphore) protected def enq(f: => Boolean): Boolean = { if (guard.tryAcquire(pushTimeout,pushTimeUnit)) { val result = try { @@ -35,10 +36,10 @@ class BoundedTransferQueue[E <: AnyRef]( false } - protected def deq(f: => E): E = { - val result: E = f - if (result ne null) guard.release - result + //Dequeue an item (release Semaphore) + protected def deq(e: E): E = { + if (e ne null) guard.release //Signal removal of item + e } override def take(): E = deq(super.take) @@ -46,9 +47,15 @@ class BoundedTransferQueue[E <: AnyRef]( override def poll(timeout: Long, unit: TimeUnit): E = deq(super.poll(timeout,unit)) override def remainingCapacity = guard.availablePermits - override def remove(o: AnyRef): Boolean = deq({ - (if (o.isInstanceOf[E] && super.remove(o)) o else null).asInstanceOf[E] - }) ne null + + override def remove(o: AnyRef): Boolean = { + if (super.remove(o)) { + guard.release + true + } else { + false + } + } override def offer(e: E): Boolean = enq(super.offer(e)) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 5f3c12d5a4..7c9f73258c 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -410,6 +410,8 @@ class RemoteServerHandler( } else future.getChannel.close } }) + } else { + server.foreachListener(_ ! RemoteServerClientConnected(server)) } }