inject outgoing control messages at lower level

* instead of sending the the control messages (e.g. handshake reply,
  sys msg ack) via the normal message send ingress point they are
  now injected via side channel and will therefore not go through
  higher level stages such as handshake and system message delivery
This commit is contained in:
Patrik Nordwall 2016-05-11 15:55:06 +02:00
parent f3224c0f95
commit 87386e18cf
6 changed files with 144 additions and 45 deletions

View file

@ -23,7 +23,7 @@ import akka.remote.RemoteActorRef
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteTransport
import akka.remote.UniqueAddress
import akka.remote.artery.ReplyJunction.ReplySubject
import akka.remote.artery.InboundReplyJunction.ReplySubject
import akka.remote.transport.AkkaPduCodec
import akka.remote.transport.AkkaPduProtobufCodec
import akka.serialization.Serialization
@ -47,6 +47,7 @@ import io.aeron.exceptions.ConductorServiceTimeoutException
import org.agrona.ErrorHandler
import org.agrona.IoUtil
import java.io.File
import akka.remote.artery.OutboundReplyJunction.OutboundReplyIngress
/**
* INTERNAL API
@ -72,7 +73,7 @@ private[akka] trait InboundContext {
* An inbound stage can send reply message to the origin
* address with this method.
*/
def sendReply(to: Address, message: ControlMessage): Unit
def sendReply(to: Address, message: ControlMessage): Unit // FIXME rename to sendControl
/**
* Lookup the outbound association for a given address.
@ -112,7 +113,7 @@ private[akka] trait OutboundContext {
* An outbound stage can listen to reply messages
* via this observer subject.
*/
def replySubject: ReplySubject
def replySubject: ReplySubject // FIXME rename to controlSubject
// FIXME we should be able to Send without a recipient ActorRef
def dummyRecipient: RemoteActorRef
@ -146,7 +147,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
// TODO support port 0
private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}"
private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
private val systemMessageStreamId = 1
private val systemMessageStreamId = 1 // FIXME rename to controlStreamId
private val ordinaryStreamId = 3
private val taskRunner = new TaskRunner(system)
@ -241,9 +242,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
// InboundContext
override def sendReply(to: Address, message: ControlMessage) = {
send(message, None, association(to).dummyRecipient)
}
override def sendReply(to: Address, message: ControlMessage) =
association(to).outboundReplyIngress.sendControlMessage(message)
override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = {
val cached = recipient.cachedAssociation
@ -282,10 +282,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
.to(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner))
}
def outboundSystemMessage(outboundContext: OutboundContext): Sink[Send, Any] = {
def outboundSystemMessage(outboundContext: OutboundContext): Sink[Send, OutboundReplyIngress] = {
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext))
.via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval))
.viaMat(new OutboundReplyJunction(outboundContext))(Keep.right)
.via(encoder)
.map(_.toArray) // TODO we should use ByteString all the way
.to(new AeronSink(outboundChannel(outboundContext.remoteAddress), systemMessageStreamId, aeron, taskRunner))
@ -338,13 +339,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
Source.maybe[ByteString].via(killSwitch.flow))
}
// FIXME rename to controlFlow
val inboundSystemMessageFlow: Flow[ByteString, ByteString, ReplySubject] = {
Flow.fromSinkAndSourceMat(
decoder
.via(deserializer)
.via(new InboundHandshake(this))
.via(new SystemMessageAcker(this))
.viaMat(new ReplyJunction)(Keep.right)
.viaMat(new InboundReplyJunction)(Keep.right)
.to(messageDispatcherSink),
Source.maybe[ByteString].via(killSwitch.flow))((a, b) a)
}

View file

@ -5,20 +5,20 @@ package akka.remote.artery
import scala.concurrent.Future
import scala.concurrent.Promise
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.RootActorPath
import akka.dispatch.sysmsg.SystemMessage
import akka.remote.EndpointManager.Send
import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.ReplyJunction.ReplySubject
import akka.remote.artery.InboundReplyJunction.ReplySubject
import akka.stream.Materializer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.SourceQueueWithComplete
import akka.remote.artery.OutboundReplyJunction.OutboundReplyIngress
import akka.stream.scaladsl.Keep
/**
* INTERNAL API
@ -34,6 +34,13 @@ private[akka] class Association(
@volatile private[this] var queue: SourceQueueWithComplete[Send] = _
@volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _
@volatile private[this] var _outboundReplyIngress: OutboundReplyIngress = _
def outboundReplyIngress: OutboundReplyIngress = {
if (_outboundReplyIngress eq null)
throw new IllegalStateException("outboundReplyIngress not initialized yet")
_outboundReplyIngress
}
override def localAddress: UniqueAddress = transport.localAddress
@ -49,7 +56,7 @@ private[akka] class Association(
// TODO: lookup subchannel
// FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly
message match {
case _: SystemMessage | _: Reply
case _: SystemMessage
implicit val ec = materializer.executionContext
systemMessageQueue.offer(Send(message, senderOption, recipient, None)).onFailure {
case e
@ -73,8 +80,12 @@ private[akka] class Association(
if (queue eq null)
queue = Source.queue(256, OverflowStrategy.dropBuffer)
.to(transport.outbound(this)).run()(materializer)
if (systemMessageQueue eq null)
systemMessageQueue = Source.queue(256, OverflowStrategy.dropBuffer)
.to(transport.outboundSystemMessage(this)).run()(materializer)
if (systemMessageQueue eq null) {
val (q, control) = Source.queue(256, OverflowStrategy.dropBuffer)
.toMat(transport.outboundSystemMessage(this))(Keep.both)
.run()(materializer)
systemMessageQueue = q
_outboundReplyIngress = control
}
}
}

View file

@ -8,7 +8,7 @@ import scala.concurrent.duration._
import akka.Done
import akka.remote.EndpointManager.Send
import akka.remote.UniqueAddress
import akka.remote.artery.ReplyJunction.ReplyObserver
import akka.remote.artery.InboundReplyJunction.ReplyObserver
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet

View file

@ -14,6 +14,9 @@ import akka.stream.stage.GraphStageLogic
import akka.stream.stage.GraphStageWithMaterializedValue
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.remote.EndpointManager.Send
import java.util.ArrayDeque
import akka.stream.stage.CallbackWrapper
/**
* Marker trait for reply messages
@ -29,7 +32,9 @@ trait ControlMessage
/**
* INTERNAL API
*/
private[akka] object ReplyJunction {
private[akka] object InboundReplyJunction {
// FIXME rename all Reply stuff to Control or ControlMessage
private[akka] trait ReplySubject {
def attach(observer: ReplyObserver): Future[Done]
@ -40,24 +45,42 @@ private[akka] object ReplyJunction {
private[akka] trait ReplyObserver {
def reply(inboundEnvelope: InboundEnvelope): Unit
}
private[InboundReplyJunction] sealed trait CallbackMessage
private[InboundReplyJunction] final case class Attach(observer: ReplyObserver, done: Promise[Done])
extends CallbackMessage
private[InboundReplyJunction] final case class Dettach(observer: ReplyObserver) extends CallbackMessage
}
/**
* INTERNAL API
*/
private[akka] class ReplyJunction
extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], ReplyJunction.ReplySubject] {
import ReplyJunction._
private[akka] class InboundReplyJunction
extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], InboundReplyJunction.ReplySubject] {
import InboundReplyJunction._
val in: Inlet[InboundEnvelope] = Inlet("ReplyJunction.in")
val out: Outlet[InboundEnvelope] = Outlet("ReplyJunction.out")
val in: Inlet[InboundEnvelope] = Inlet("InboundReplyJunction.in")
val out: Outlet[InboundEnvelope] = Outlet("InboundReplyJunction.out")
override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val logic = new GraphStageLogic(shape) with InHandler with OutHandler with ReplySubject {
val stoppedPromise = Promise[Done]()
// FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way
val logic = new GraphStageLogic(shape) with CallbackWrapper[CallbackMessage] with InHandler with OutHandler {
private var replyObservers: Vector[ReplyObserver] = Vector.empty
private val stoppedPromise = Promise[Done]()
private val callback = getAsyncCallback[CallbackMessage] {
case Attach(observer, done)
replyObservers :+= observer
done.success(Done)
case Dettach(observer)
replyObservers = replyObservers.filterNot(_ == observer)
}
override def preStart(): Unit = {
initCallback(callback.invoke)
}
override def postStop(): Unit = stoppedPromise.success(Done)
@ -75,23 +98,95 @@ private[akka] class ReplyJunction
// OutHandler
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
// materialized value
val replySubject: ReplySubject = new ReplySubject {
override def attach(observer: ReplyObserver): Future[Done] = {
val p = Promise[Done]()
getAsyncCallback[Unit](_ {
replyObservers :+= observer
p.success(Done)
}).invoke(())
logic.invoke(Attach(observer, p))
p.future
}
override def detach(observer: ReplyObserver): Unit = {
replyObservers = replyObservers.filterNot(_ == observer)
override def detach(observer: ReplyObserver): Unit =
logic.invoke(Dettach(observer))
override def stopped: Future[Done] =
stoppedPromise.future
}
(logic, replySubject)
}
}
/**
* INTERNAL API
*/
private[akka] object OutboundReplyJunction {
trait OutboundReplyIngress {
def sendControlMessage(message: ControlMessage): Unit
}
}
/**
* INTERNAL API
*/
private[akka] class OutboundReplyJunction(outboundContext: OutboundContext)
extends GraphStageWithMaterializedValue[FlowShape[Send, Send], OutboundReplyJunction.OutboundReplyIngress] {
import OutboundReplyJunction._
val in: Inlet[Send] = Inlet("OutboundReplyJunction.in")
val out: Outlet[Send] = Outlet("OutboundReplyJunction.out")
override val shape: FlowShape[Send, Send] = FlowShape(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
// FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way
val logic = new GraphStageLogic(shape) with CallbackWrapper[ControlMessage] with InHandler with OutHandler {
import OutboundReplyJunction._
private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage)
private val buffer = new ArrayDeque[Send]
override def preStart(): Unit = {
initCallback(sendControlMessageCallback.invoke)
}
override def stopped: Future[Done] = stoppedPromise.future
// InHandler
override def onPush(): Unit = {
if (buffer.isEmpty && isAvailable(out))
push(out, grab(in))
else
buffer.offer(grab(in))
}
// OutHandler
override def onPull(): Unit = {
if (buffer.isEmpty && !hasBeenPulled(in))
pull(in)
else if (!buffer.isEmpty)
push(out, buffer.poll())
}
private def internalSendControlMessage(message: ControlMessage): Unit = {
if (buffer.isEmpty && isAvailable(out))
push(out, wrap(message))
else
buffer.offer(wrap(message))
}
private def wrap(message: ControlMessage): Send =
Send(message, None, outboundContext.dummyRecipient, None)
setHandlers(in, out, this)
}
(logic, logic)
// materialized value
val outboundReplyIngress = new OutboundReplyIngress {
override def sendControlMessage(message: ControlMessage): Unit =
logic.invoke(message)
}
(logic, outboundReplyIngress)
}
}

View file

@ -14,7 +14,7 @@ import scala.util.Try
import akka.Done
import akka.remote.EndpointManager.Send
import akka.remote.UniqueAddress
import akka.remote.artery.ReplyJunction.ReplyObserver
import akka.remote.artery.InboundReplyJunction.ReplyObserver
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
@ -151,15 +151,6 @@ private[akka] class SystemMessageDelivery(
// InHandler
override def onPush(): Unit = {
grab(in) match {
case s @ Send(reply: ControlMessage, _, _, _)
// pass through
if (isAvailable(out))
push(out, s)
else {
// it's ok to drop the replies, but we can try
resending.offer(s)
}
case s @ Send(msg: AnyRef, _, _, _)
seqNo += 1
val sendMsg = s.copy(message = SystemMessageEnvelope(msg, seqNo, localAddress))

View file

@ -6,11 +6,11 @@ package akka.remote.artery
import akka.remote.UniqueAddress
import akka.actor.Address
import scala.concurrent.Future
import akka.remote.artery.ReplyJunction.ReplySubject
import akka.remote.artery.InboundReplyJunction.ReplySubject
import akka.remote.RemoteActorRef
import scala.concurrent.Promise
import akka.Done
import akka.remote.artery.ReplyJunction.ReplyObserver
import akka.remote.artery.InboundReplyJunction.ReplyObserver
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ThreadLocalRandom