Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
0134829d8a
2 changed files with 17 additions and 8 deletions
|
|
@ -19,7 +19,8 @@ class BoundedTransferQueue[E <: AnyRef](
|
|||
require(pushTimeUnit ne null)
|
||||
|
||||
protected val guard = new Semaphore(capacity)
|
||||
|
||||
|
||||
//Enqueue an item within the push timeout (acquire Semaphore)
|
||||
protected def enq(f: => Boolean): Boolean = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
val result = try {
|
||||
|
|
@ -35,10 +36,10 @@ class BoundedTransferQueue[E <: AnyRef](
|
|||
false
|
||||
}
|
||||
|
||||
protected def deq(f: => E): E = {
|
||||
val result: E = f
|
||||
if (result ne null) guard.release
|
||||
result
|
||||
//Dequeue an item (release Semaphore)
|
||||
protected def deq(e: E): E = {
|
||||
if (e ne null) guard.release //Signal removal of item
|
||||
e
|
||||
}
|
||||
|
||||
override def take(): E = deq(super.take)
|
||||
|
|
@ -46,9 +47,15 @@ class BoundedTransferQueue[E <: AnyRef](
|
|||
override def poll(timeout: Long, unit: TimeUnit): E = deq(super.poll(timeout,unit))
|
||||
|
||||
override def remainingCapacity = guard.availablePermits
|
||||
override def remove(o: AnyRef): Boolean = deq({
|
||||
(if (o.isInstanceOf[E] && super.remove(o)) o else null).asInstanceOf[E]
|
||||
}) ne null
|
||||
|
||||
override def remove(o: AnyRef): Boolean = {
|
||||
if (super.remove(o)) {
|
||||
guard.release
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
override def offer(e: E): Boolean =
|
||||
enq(super.offer(e))
|
||||
|
|
|
|||
|
|
@ -410,6 +410,8 @@ class RemoteServerHandler(
|
|||
} else future.getChannel.close
|
||||
}
|
||||
})
|
||||
} else {
|
||||
server.foreachListener(_ ! RemoteServerClientConnected(server))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue