// Source: pekko/akka-remote/src/main/scala/akka/remote/artery/Control.scala

/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.ArrayDeque
import scala.concurrent.Future
import scala.concurrent.Promise
import akka.Done
import akka.remote.EndpointManager.Send
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.stage.CallbackWrapper
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.GraphStageWithMaterializedValue
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.remote.UniqueAddress
import akka.util.OptionVal
/**
 * INTERNAL API: Marker trait for reply messages.
 *
 * Every `Reply` is also a [[ControlMessage]].
 */
private[akka] trait Reply extends ControlMessage
/**
 * INTERNAL API
 *
 * Marker trait for control messages that can be sent via the system message sub-channel
 * but don't need full reliable delivery, e.g. `HandshakeReq` and `Reply`.
 *
 * The trait declares no members; it is used purely as a type tag.
 */
private[akka] trait ControlMessage
/**
 * INTERNAL API
 *
 * Control message carrying two `UniqueAddress` values, `from` and `to`.
 *
 * NOTE(review): presumably `from` is the system that performed the quarantine and
 * `to` is the quarantined system -- confirm against the code that sends it.
 */
private[akka] final case class Quarantined(from: UniqueAddress, to: UniqueAddress) extends ControlMessage // FIXME serialization
/**
 * INTERNAL API
 *
 * Types shared between the `InboundControlJunction` stage and the code that
 * observes inbound control messages through its materialized value.
 */
private[akka] object InboundControlJunction {

  /**
   * Observer subject for inbound control messages.
   * Interested observers can attach themselves to the
   * subject to get notification of incoming control
   * messages.
   */
  private[akka] trait ControlMessageSubject {

    /**
     * Registers `observer`. The returned future is completed with `Done`
     * once the registration has been processed by the stage.
     */
    def attach(observer: ControlMessageObserver): Future[Done]

    /** Asks the stage to remove `observer`; no acknowledgement is returned. */
    def detach(observer: ControlMessageObserver): Unit

    /** Completed with `Done` when the stage has stopped. */
    def stopped: Future[Done]
  }

  private[akka] trait ControlMessageObserver {

    /**
     * Notification of incoming control message. The message
     * of the envelope is always a `ControlMessage`.
     */
    def notify(inboundEnvelope: InboundEnvelope): Unit
  }

  // messages for the CallbackWrapper
  private[InboundControlJunction] sealed trait CallbackMessage

  /** Request to add `observer`; `done` is completed by the stage after the observer was added. */
  private[InboundControlJunction] final case class Attach(observer: ControlMessageObserver, done: Promise[Done])
    extends CallbackMessage

  /**
   * Request to remove `observer`.
   * The spelling "Dettach" (sic) is kept because the stage implementation refers to this name.
   */
  private[InboundControlJunction] final case class Dettach(observer: ControlMessageObserver) extends CallbackMessage
}
/**
 * INTERNAL API
 *
 * Flow stage that separates control messages from the ordinary inbound stream.
 *
 * Envelopes whose message is a `ControlMessage` are handed to every attached
 * `ControlMessageObserver` and are NOT emitted downstream; all other envelopes
 * are passed through unchanged. The materialized value is a
 * `ControlMessageSubject` used to attach/detach observers and to learn when
 * the stage has stopped.
 */
private[akka] class InboundControlJunction
  extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], InboundControlJunction.ControlMessageSubject] {
  import InboundControlJunction._

  val in: Inlet[InboundEnvelope] = Inlet("InboundControlJunction.in")
  val out: Outlet[InboundEnvelope] = Outlet("InboundControlJunction.out")
  override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)

  /**
   * Creates the stage logic together with the `ControlMessageSubject` that
   * forwards attach/detach requests into the stage via `CallbackWrapper.invoke`.
   */
  override def createLogicAndMaterializedValue(
    inheritedAttributes: Attributes): (GraphStageLogic, ControlMessageSubject) = {
    // completed in postStop so that `ControlMessageSubject.stopped` can be observed from outside
    val stoppedPromise = Promise[Done]()

    // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way
    val logic = new GraphStageLogic(shape) with CallbackWrapper[CallbackMessage] with InHandler with OutHandler {

      // only read and written from the stage callbacks below
      private var observers: Vector[ControlMessageObserver] = Vector.empty

      private val callback = getAsyncCallback[CallbackMessage] {
        case Attach(observer, done) =>
          observers :+= observer
          // acknowledge only after the observer has actually been added
          done.success(Done)
        case Dettach(observer) =>
          observers = observers.filterNot(_ == observer)
      }

      override def preStart(): Unit = {
        initCallback(callback.invoke)
      }

      override def postStop(): Unit = stoppedPromise.success(Done)

      // InHandler
      override def onPush(): Unit = {
        grab(in) match {
          case env @ InboundEnvelope(_, _, _: ControlMessage, _, _) =>
            observers.foreach(_.notify(env))
            // the control message is consumed here, so ask upstream for the
            // next element to satisfy the still outstanding downstream demand
            pull(in)
          case env =>
            push(out, env)
        }
      }

      // OutHandler
      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }

    // materialized value
    val controlSubject: ControlMessageSubject = new ControlMessageSubject {
      override def attach(observer: ControlMessageObserver): Future[Done] = {
        val p = Promise[Done]()
        logic.invoke(Attach(observer, p))
        p.future
      }

      override def detach(observer: ControlMessageObserver): Unit =
        logic.invoke(Dettach(observer))

      override def stopped: Future[Done] =
        stoppedPromise.future
    }

    (logic, controlSubject)
  }
}
/**
 * INTERNAL API
 *
 * Types shared between the `OutboundControlJunction` stage and the code that
 * wants to send control messages through it.
 */
private[akka] object OutboundControlJunction {

  /** Entry point for injecting control messages into the outbound stream. */
  private[akka] trait OutboundControlIngress {

    /** Hands `message` over for sending; nothing is returned to the caller. */
    def sendControlMessage(message: ControlMessage): Unit
  }
}
/**
 * INTERNAL API
 *
 * Flow stage that merges control messages into the ordinary outbound stream of
 * `Send` elements.
 *
 * Elements arriving on `in` are forwarded to `out`. Control messages submitted
 * through the materialized `OutboundControlIngress` are wrapped in a `Send`
 * addressed to `outboundContext.dummyRecipient` and emitted on `out` as well.
 * Elements that cannot be pushed immediately are kept in an internal FIFO
 * buffer; control messages are dropped (and logged at debug level) once that
 * buffer holds `maxControlMessageBufferSize` elements.
 */
private[akka] class OutboundControlJunction(outboundContext: OutboundContext)
  extends GraphStageWithMaterializedValue[FlowShape[Send, Send], OutboundControlJunction.OutboundControlIngress] {
  import OutboundControlJunction._

  val in: Inlet[Send] = Inlet("OutboundControlJunction.in")
  val out: Outlet[Send] = Outlet("OutboundControlJunction.out")
  override val shape: FlowShape[Send, Send] = FlowShape(in, out)

  /**
   * Creates the stage logic together with the `OutboundControlIngress` that
   * forwards control messages into the stage via `CallbackWrapper.invoke`.
   */
  override def createLogicAndMaterializedValue(
    inheritedAttributes: Attributes): (GraphStageLogic, OutboundControlIngress) = {
    // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way
    val logic = new GraphStageLogic(shape) with CallbackWrapper[ControlMessage] with InHandler with OutHandler with StageLogging {

      private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage)
      private val maxControlMessageBufferSize: Int = 1024 // FIXME config
      // holds both wrapped control messages and elements grabbed from `in`
      // while `out` was not available
      private val buffer = new ArrayDeque[Send]

      override def preStart(): Unit = {
        initCallback(sendControlMessageCallback.invoke)
      }

      // InHandler
      override def onPush(): Unit = {
        if (buffer.isEmpty && isAvailable(out))
          push(out, grab(in))
        else
          // `out` may already have been used for a control message, or older
          // elements are waiting; enqueue to keep FIFO order
          buffer.offer(grab(in))
      }

      // OutHandler
      override def onPull(): Unit = {
        if (buffer.isEmpty && !hasBeenPulled(in))
          pull(in)
        else if (!buffer.isEmpty)
          push(out, buffer.poll())
      }

      // runs inside the stage, invoked through the async callback
      private def internalSendControlMessage(message: ControlMessage): Unit = {
        if (buffer.isEmpty && isAvailable(out))
          push(out, wrap(message))
        else if (buffer.size < maxControlMessageBufferSize)
          buffer.offer(wrap(message))
        else {
          // it's alright to drop control messages
          log.debug("Dropping control message [{}] due to full buffer.", message.getClass.getName)
        }
      }

      private def wrap(message: ControlMessage): Send =
        Send(message, OptionVal.None, outboundContext.dummyRecipient, None)

      setHandlers(in, out, this)
    }

    // materialized value
    val outboundControlIngress = new OutboundControlIngress {
      override def sendControlMessage(message: ControlMessage): Unit =
        logic.invoke(message)
    }

    (logic, outboundControlIngress)
  }
}