/**
 * Copyright (C) 2016-2018 Lightbend Inc. <https://www.lightbend.com>
 */
|
|
|
|
|
package akka.remote.artery
|
|
|
|
|
|
|
|
|
|
import java.util.Queue
|
|
|
|
|
import akka.stream.stage.GraphStage
|
|
|
|
|
import akka.stream.stage.OutHandler
|
|
|
|
|
import akka.stream.Attributes
|
|
|
|
|
import akka.stream.Outlet
|
|
|
|
|
import akka.stream.SourceShape
|
|
|
|
|
import akka.stream.stage.GraphStageLogic
|
|
|
|
|
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
|
|
|
|
|
import akka.stream.stage.GraphStageWithMaterializedValue
|
|
|
|
|
import org.agrona.concurrent.ManyToOneConcurrentLinkedQueueTail
|
|
|
|
|
import org.agrona.concurrent.ManyToOneConcurrentLinkedQueue
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger
|
|
|
|
|
import scala.annotation.tailrec
|
|
|
|
|
import scala.concurrent.Promise
|
|
|
|
|
import scala.util.Try
|
|
|
|
|
import scala.util.Success
|
|
|
|
|
import scala.util.Failure
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Contracts shared between the `SendQueue` stage and the code that feeds it.
 */
private[remote] object SendQueue {

  /**
   * Callback used to tell the stage that it should look at the queue again.
   * Only visible to the companion `SendQueue` class.
   */
  private trait WakeupSignal {
    def wakeup(): Unit
  }

  /**
   * Producer-side view of the queue.
   *
   * @tparam T type of the enqueued messages
   */
  trait ProducerApi[T] {

    /**
     * Try to enqueue `message`.
     *
     * @return the outcome of the enqueue attempt as reported by the implementation
     */
    def offer(message: T): Boolean

    /**
     * Whether this producer is enabled. The implementation materialized by the
     * `SendQueue` stage in this file always answers `true`.
     */
    def isEnabled: Boolean
  }

  /**
   * Materialized value of the `SendQueue` stage: a [[ProducerApi]] that must first be
   * given the actual queue instance via [[QueueValue#inject]].
   *
   * @tparam T type of the enqueued messages
   */
  trait QueueValue[T] extends ProducerApi[T] {

    /** Supply the queue that producers offer to and the stage polls from. */
    def inject(queue: Queue[T]): Unit
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Source stage whose elements come from a `java.util.Queue` that is handed over after
 * materialization through the materialized [[SendQueue.QueueValue]]. Producers enqueue
 * with `offer`; the stage polls the queue whenever downstream demand is available and is
 * woken up through an async callback when an element arrives while it is idle.
 *
 * @tparam T type of the elements emitted by the stage
 */
private[remote] final class SendQueue[T]
  extends GraphStageWithMaterializedValue[SourceShape[T], SendQueue.QueueValue[T]] {
  import SendQueue._

  val out: Outlet[T] = Outlet("SendQueue.out")
  override val shape: SourceShape[T] = SourceShape(out)

  /**
   * Creates the stage logic together with the [[SendQueue.QueueValue]] used by producers.
   * Both share the `needWakeup` flag and the promise that carries the injected queue.
   */
  override def createLogicAndMaterializedValue(
    inheritedAttributes: Attributes): (GraphStageLogic, QueueValue[T]) = {

    // Raised by the stage once the queue has been received and whenever a poll came back
    // empty; lowered by the stage when it pushes an element and by a producer right
    // before it signals the stage.
    @volatile var needWakeup = false
    // Completed by `inject`, observed by the stage in `preStart`.
    val queuePromise = Promise[Queue[T]]()

    val logic = new GraphStageLogic(shape) with OutHandler with WakeupSignal {

      // The consumer side keeps its own plain (non-volatile) reference to the queue so
      // that polling does not pay for a volatile read.
      private var consumerQueue: Queue[T] = null

      // Runs inside the stage when a producer has signalled new data.
      private val wakeupCallback = getAsyncCallback[Unit] { _ =>
        if (isAvailable(out))
          tryPush()
      }

      /**
       * Registers for completion of the queue promise. On success the queue is stored,
       * wakeups are requested and pending demand is served; on failure the stage fails.
       */
      override def preStart(): Unit = {
        val onQueueResolved = getAsyncCallback[Try[Queue[T]]] {
          case Success(injected) =>
            consumerQueue = injected
            needWakeup = true
            if (isAvailable(out))
              tryPush()
          case Failure(cause) =>
            failStage(cause)
        }
        queuePromise.future.onComplete(onQueueResolved.invoke)(materializer.executionContext)
      }

      /** Demand is only served once the queue has been injected. */
      override def onPull(): Unit =
        if (consumerQueue ne null)
          tryPush()

      /**
       * Polls the queue once and pushes the element if there is one. When the queue is
       * empty the wakeup flag is raised and, on the first attempt only, the poll is
       * repeated.
       */
      @tailrec private def tryPush(firstAttempt: Boolean = true): Unit =
        consumerQueue.poll() match {
          case null =>
            needWakeup = true
            // poll once more to pick up any element that might have missed the
            // needWakeup flag because it was enqueued just after the flag was set
            if (firstAttempt)
              tryPush(firstAttempt = false)
          case elem =>
            needWakeup = false // there will be another onPull
            push(out, elem)
        }

      // external call
      override def wakeup(): Unit =
        wakeupCallback.invoke(())

      /** Discards whatever is still in the queue (if one was injected) on stop. */
      override def postStop(): Unit = {
        // TODO quarantine will currently always be done when control stream is terminated, see issue #21359
        if (consumerQueue ne null)
          consumerQueue.clear()
        super.postStop()
      }

      setHandler(out, this)
    }

    val queueValue = new QueueValue[T] {
      // Written by `inject`, read by every `offer`.
      @volatile private var producerQueue: Queue[T] = null

      /** Publishes the queue to producers first, then hands it to the stage. */
      override def inject(queue: Queue[T]): Unit = {
        producerQueue = queue
        queuePromise.success(queue)
      }

      /**
       * Enqueues `message` and signals the stage if it asked to be woken up.
       *
       * @return the result of `Queue.offer` on the injected queue
       * @throws IllegalStateException if called before a queue has been injected
       */
      override def offer(message: T): Boolean = {
        val target = producerQueue
        if (target eq null) throw new IllegalStateException("offer not allowed before injecting the queue")
        val accepted = target.offer(message)
        if (accepted && needWakeup) {
          needWakeup = false
          logic.wakeup()
        }
        accepted
      }

      override def isEnabled: Boolean = true
    }

    (logic, queueValue)
  }
}
|