first stab at handshake, #20313

* UID exchange with handshake stages
* second iteration of reply side-channel, observable
* InboundContext and OutboundContext to facilitate testing
  without real transport
* collapse ArterySubsystem and Transport into ArteryTransport
* incomplete HandshakeRestartReceiverSpec (origin address missing
  to be able to implement that part
* remove embedded aeron media driver directory on shutdown
This commit is contained in:
Patrik Nordwall 2016-05-09 07:31:41 +02:00
parent f00a95c652
commit 16cf8d4ab6
16 changed files with 1194 additions and 641 deletions

View file

@ -76,6 +76,18 @@ object Address {
* Constructs a new Address with the specified protocol, system name, host and port
*/
def apply(protocol: String, system: String, host: String, port: Int) = new Address(protocol, system, Some(host), Some(port))
/**
* `Address` ordering type class, sorts addresses by protocol, name, host and port.
*/
implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b)
if (a eq b) false
else if (a.protocol != b.protocol) a.system.compareTo(b.protocol) < 0
else if (a.system != b.system) a.system.compareTo(b.system) < 0
else if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0
else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0)
else false
}
}
private[akka] trait PathUtils {

View file

@ -20,6 +20,8 @@ import com.typesafe.config.ConfigFactory
import io.aeron.Aeron
import io.aeron.driver.MediaDriver
import akka.actor.ExtendedActorSystem
import org.agrona.IoUtil
import java.io.File
object AeronStreamConsistencySpec extends MultiNodeConfig {
val first = role("first")
@ -85,6 +87,7 @@ abstract class AeronStreamConsistencySpec
taskRunner.stop()
aeron.close()
driver.close()
IoUtil.delete(new File(driver.aeronDirectoryName), true)
super.afterAll()
}

View file

@ -22,6 +22,7 @@ import io.aeron.driver.MediaDriver
import akka.stream.KillSwitches
import java.io.File
import io.aeron.CncFileDescriptor
import org.agrona.IoUtil
object AeronStreamMaxThroughputSpec extends MultiNodeConfig {
val first = role("first")
@ -127,6 +128,7 @@ abstract class AeronStreamMaxThroughputSpec
taskRunner.stop()
aeron.close()
driver.close()
IoUtil.delete(new File(driver.aeronDirectoryName), true)
runOn(second) {
println(plot.csv(system.name))
}

View file

@ -0,0 +1,135 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.NANOSECONDS
import scala.concurrent.duration._
import akka.actor._
import akka.remote.RemoteActorRefProvider
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import java.net.InetAddress
import scala.concurrent.Await
import akka.remote.RARP
import akka.remote.AddressUidExtension
object HandshakeRestartReceiverSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString(s"""
akka {
loglevel = INFO
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.artery {
enabled = on
}
}
""")))
def aeronPort(roleName: RoleName): Int =
roleName match {
case `first` 20531 // TODO yeah, we should have support for dynamic port assignment
case `second` 20532
}
nodeConfig(first) {
ConfigFactory.parseString(s"""
akka.remote.artery.port = ${aeronPort(first)}
""")
}
nodeConfig(second) {
ConfigFactory.parseString(s"""
akka.remote.artery.port = ${aeronPort(second)}
""")
}
class Subject extends Actor {
def receive = {
case "shutdown" context.system.terminate()
case "identify" sender() ! (AddressUidExtension(context.system).addressUid -> self)
}
}
}
class HandshakeRestartReceiverSpecMultiJvmNode1 extends HandshakeRestartReceiverSpec
class HandshakeRestartReceiverSpecMultiJvmNode2 extends HandshakeRestartReceiverSpec
abstract class HandshakeRestartReceiverSpec
extends MultiNodeSpec(HandshakeRestartReceiverSpec)
with STMultiNodeSpec with ImplicitSender {
import HandshakeRestartReceiverSpec._
override def initialParticipants = roles.size
override def afterAll(): Unit = {
super.afterAll()
}
def identifyWithUid(rootPath: ActorPath, actorName: String): (Int, ActorRef) = {
system.actorSelection(rootPath / "user" / actorName) ! "identify"
expectMsgType[(Int, ActorRef)]
}
"Artery Handshake" must {
"detect restarted receiver and initiate new handshake" in {
runOn(second) {
system.actorOf(Props[Subject], "subject")
}
enterBarrier("subject-started")
runOn(first) {
val secondRootPath = node(second)
val (secondUid, _) = identifyWithUid(secondRootPath, "subject")
val secondAddress = node(second).address
val secondAssociation = RARP(system).provider.transport.asInstanceOf[ArteryTransport].association(secondAddress)
val secondUniqueRemoteAddress = Await.result(secondAssociation.uniqueRemoteAddress, 3.seconds)
secondUniqueRemoteAddress.address should ===(secondAddress)
secondUniqueRemoteAddress.uid should ===(secondUid)
enterBarrier("before-shutdown")
testConductor.shutdown(second).await
within(30.seconds) {
awaitAssert {
within(1.second) {
identifyWithUid(secondRootPath, "subject2")
}
}
}
val (secondUid2, subject2) = identifyWithUid(secondRootPath, "subject2")
secondUid2 should !==(secondUid)
// FIXME verify that UID in association was replaced (not implemented yet)
subject2 ! "shutdown"
}
runOn(second) {
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
enterBarrier("before-shutdown")
Await.result(system.whenTerminated, 10.seconds)
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
akka.remote.artery.port = ${addr.port.get}
""").withFallback(system.settings.config))
freshSystem.actorOf(Props[Subject], "subject2")
Await.result(freshSystem.whenTerminated, 45.seconds)
}
}
}
}

View file

@ -19,7 +19,7 @@ import scala.util.control.Exception.Catcher
import scala.concurrent.Future
import akka.ConfigurationException
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import akka.remote.artery.ArterySubsystem
import akka.remote.artery.ArteryTransport
/**
* INTERNAL API
@ -182,7 +182,7 @@ private[akka] class RemoteActorRefProvider(
d
},
serialization = SerializationExtension(system),
transport = if (remoteSettings.EnableArtery) new ArterySubsystem(system, this) else new Remoting(system, this))
transport = if (remoteSettings.EnableArtery) new ArteryTransport(system, this) else new Remoting(system, this))
_internals = internals
remotingTerminator ! internals

View file

@ -0,0 +1,17 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote
import akka.actor.Address
@SerialVersionUID(1L)
final case class UniqueAddress(address: Address, uid: Int) extends Ordered[UniqueAddress] {
override def hashCode = uid
def compare(that: UniqueAddress): Int = {
val result = Address.addressOrdering.compare(this.address, that.address)
if (result == 0) if (this.uid < that.uid) -1 else if (this.uid == that.uid) 0 else 1
else result
}
}

View file

@ -1,133 +0,0 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.concurrent.ConcurrentHashMap
import akka.actor.{ ActorRef, Address, ExtendedActorSystem }
import akka.event.{ Logging, LoggingAdapter }
import akka.remote.EndpointManager.Send
import akka.remote.transport.AkkaPduProtobufCodec
import akka.remote.{ DefaultMessageDispatcher, RemoteActorRef, RemoteActorRefProvider, RemoteTransport }
import akka.stream.scaladsl.{ Sink, Source, SourceQueueWithComplete, Tcp }
import akka.stream.{ ActorMaterializer, Materializer, OverflowStrategy }
import akka.{ Done, NotUsed }
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
import akka.dispatch.sysmsg.SystemMessage
/**
* INTERNAL API
*/
private[remote] class ArterySubsystem(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) {
import provider.remoteSettings
@volatile private[this] var address: Address = _
@volatile private[this] var transport: Transport = _
@volatile private[this] var tcpBinding: Option[Tcp.ServerBinding] = None
@volatile private[this] var materializer: Materializer = _
override val log: LoggingAdapter = Logging(system.eventStream, getClass.getName)
override def defaultAddress: Address = address
override def addresses: Set[Address] = Set(address)
override def localAddressForRemote(remote: Address): Address = defaultAddress
// FIXME: This does locking on putIfAbsent, we need something smarter
private[this] val associations = new ConcurrentHashMap[Address, Association]()
override def start(): Unit = {
// TODO: Configure materializer properly
// TODO: Have a supervisor actor
address = Address("akka.artery", system.name, remoteSettings.ArteryHostname, remoteSettings.ArteryPort)
materializer = ActorMaterializer()(system)
transport =
new Transport(
address,
system,
materializer,
provider,
AkkaPduProtobufCodec)
transport.start()
}
override def shutdown(): Future[Done] = {
if (transport != null) transport.shutdown()
else Future.successful(Done)
}
override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = {
val cached = recipient.cachedAssociation
val remoteAddress = recipient.path.address
val association =
if (cached ne null) cached
else associate(remoteAddress)
association.send(message, senderOption, recipient)
}
private def associate(remoteAddress: Address): Association = {
val current = associations.get(remoteAddress)
if (current ne null) current
else {
associations.computeIfAbsent(remoteAddress, new java.util.function.Function[Address, Association] {
override def apply(remoteAddress: Address): Association = {
val newAssociation = new Association(materializer, remoteAddress, transport)
newAssociation.associate() // This is a bit costly for this blocking method :(
newAssociation
}
})
}
}
override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = {
???
}
}
/**
* INTERNAL API
*
* Thread-safe, mutable holder for association state. Main entry point for remote destined message to a specific
* remote address.
*/
private[akka] class Association(
val materializer: Materializer,
val remoteAddress: Address,
val transport: Transport) {
@volatile private[this] var queue: SourceQueueWithComplete[Send] = _
@volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _
def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = {
// TODO: lookup subchannel
// FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly
message match {
case _: SystemMessage | _: SystemMessageDelivery.SystemMessageReply
implicit val ec = materializer.executionContext
systemMessageQueue.offer(Send(message, senderOption, recipient, None)).onFailure {
case e
// FIXME proper error handling, and quarantining
println(s"# System message dropped, due to $e") // FIXME
}
case _
queue.offer(Send(message, senderOption, recipient, None))
}
}
def quarantine(uid: Option[Int]): Unit = ()
// Idempotent
def associate(): Unit = {
if (queue eq null)
queue = Source.queue(256, OverflowStrategy.dropBuffer)
.to(transport.outbound(remoteAddress)).run()(materializer)
if (systemMessageQueue eq null)
systemMessageQueue = Source.queue(256, OverflowStrategy.dropBuffer)
.to(transport.outboundSystemMessage(remoteAddress)).run()(materializer)
}
}

View file

@ -0,0 +1,353 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.nio.ByteOrder
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{ Function JFunction }
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.Done
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.actor.InternalActorRef
import akka.event.Logging
import akka.event.LoggingAdapter
import akka.remote.AddressUidExtension
import akka.remote.EndpointManager.Send
import akka.remote.MessageSerializer
import akka.remote.RemoteActorRef
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteTransport
import akka.remote.UniqueAddress
import akka.remote.artery.ReplyJunction.ReplySubject
import akka.remote.transport.AkkaPduCodec
import akka.remote.transport.AkkaPduProtobufCodec
import akka.serialization.Serialization
import akka.stream.ActorMaterializer
import akka.stream.KillSwitches
import akka.stream.Materializer
import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Framing
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import akka.util.ByteStringBuilder
import io.aeron.Aeron
import io.aeron.AvailableImageHandler
import io.aeron.Image
import io.aeron.UnavailableImageHandler
import io.aeron.driver.MediaDriver
import io.aeron.exceptions.ConductorServiceTimeoutException
import org.agrona.ErrorHandler
import org.agrona.IoUtil
import java.io.File
/**
* INTERNAL API
*/
private[akka] final case class InboundEnvelope(
recipient: InternalActorRef,
recipientAddress: Address,
message: AnyRef,
senderOption: Option[ActorRef])
/**
* INTERNAL API
* Inbound API that is used by the stream stages.
* Separate trait to facilitate testing without real transport.
*/
private[akka] trait InboundContext {
/**
* The local inbound address.
*/
def localAddress: UniqueAddress
/**
* An inbound stage can send reply message to the origin
* address with this method.
*/
def sendReply(to: Address, message: ControlMessage): Unit
/**
* Lookup the outbound association for a given address.
*/
def association(remoteAddress: Address): OutboundContext
}
/**
* INTERNAL API
* Outbound association API that is used by the stream stages.
* Separate trait to facilitate testing without real transport.
*/
private[akka] trait OutboundContext {
/**
* The local inbound address.
*/
def localAddress: UniqueAddress
/**
* The outbound address for this association.
*/
def remoteAddress: Address
/**
* Full outbound address with UID for this association.
* Completed when by the handshake.
*/
def uniqueRemoteAddress: Future[UniqueAddress]
/**
* Set the outbound address with UID when the
* handshake is completed.
*/
def completeRemoteAddress(a: UniqueAddress): Unit
/**
* An outbound stage can listen to reply messages
* via this observer subject.
*/
def replySubject: ReplySubject
// FIXME we should be able to Send without a recipient ActorRef
def dummyRecipient: RemoteActorRef
}
/**
* INTERNAL API
*/
private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider)
extends RemoteTransport(_system, _provider) with InboundContext {
import provider.remoteSettings
// these vars are initialized once in the start method
@volatile private[this] var _localAddress: UniqueAddress = _
override def localAddress: UniqueAddress = _localAddress
@volatile private[this] var materializer: Materializer = _
@volatile private[this] var replySubject: ReplySubject = _
@volatile private[this] var messageDispatcher: MessageDispatcher = _
@volatile private[this] var driver: MediaDriver = _
@volatile private[this] var aeron: Aeron = _
override val log: LoggingAdapter = Logging(system.eventStream, getClass.getName)
override def defaultAddress: Address = localAddress.address
override def addresses: Set[Address] = Set(defaultAddress)
override def localAddressForRemote(remote: Address): Address = defaultAddress
private val codec: AkkaPduCodec = AkkaPduProtobufCodec
private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch")
private val systemMessageResendInterval: FiniteDuration = 1.second // FIXME config
// TODO support port 0
private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}"
private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
private val systemMessageStreamId = 1
private val ordinaryStreamId = 3
private val taskRunner = new TaskRunner(system)
// FIXME: This does locking on putIfAbsent, we need something smarter
private[this] val associations = new ConcurrentHashMap[Address, Association]()
override def start(): Unit = {
startMediaDriver()
startAeron()
taskRunner.start()
// TODO: Configure materializer properly
// TODO: Have a supervisor actor
_localAddress = UniqueAddress(
Address("akka.artery", system.name, remoteSettings.ArteryHostname, remoteSettings.ArteryPort),
AddressUidExtension(system).addressUid)
materializer = ActorMaterializer()(system)
messageDispatcher = new MessageDispatcher(system, provider)
runInboundFlows()
}
private def startMediaDriver(): Unit = {
// TODO also support external media driver
val driverContext = new MediaDriver.Context
// FIXME settings from config
driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(10))
driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(10))
driverContext.driverTimeoutMs(SECONDS.toNanos(10))
driver = MediaDriver.launchEmbedded(driverContext)
}
private def startAeron(): Unit = {
val ctx = new Aeron.Context
ctx.availableImageHandler(new AvailableImageHandler {
override def onAvailableImage(img: Image): Unit = {
if (log.isDebugEnabled)
log.debug(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}")
}
})
ctx.unavailableImageHandler(new UnavailableImageHandler {
override def onUnavailableImage(img: Image): Unit = {
if (log.isDebugEnabled)
log.debug(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}")
// FIXME we should call FragmentAssembler.freeSessionBuffer when image is unavailable
}
})
ctx.errorHandler(new ErrorHandler {
override def onError(cause: Throwable): Unit = {
cause match {
case e: ConductorServiceTimeoutException
// Timeout between service calls
log.error(cause, s"Aeron ServiceTimeoutException, ${cause.getMessage}")
case _
log.error(cause, s"Aeron error, ${cause.getMessage}")
}
}
})
ctx.aeronDirectoryName(driver.aeronDirectoryName)
aeron = Aeron.connect(ctx)
}
private def runInboundFlows(): Unit = {
replySubject = Source.fromGraph(new AeronSource(inboundChannel, systemMessageStreamId, aeron, taskRunner))
.async // FIXME measure
.map(ByteString.apply) // TODO we should use ByteString all the way
.viaMat(inboundSystemMessageFlow)(Keep.right)
.to(Sink.ignore)
.run()(materializer)
Source.fromGraph(new AeronSource(inboundChannel, ordinaryStreamId, aeron, taskRunner))
.async // FIXME measure
.map(ByteString.apply) // TODO we should use ByteString all the way
.via(inboundFlow)
.runWith(Sink.ignore)(materializer)
}
override def shutdown(): Future[Done] = {
killSwitch.shutdown()
if (taskRunner != null) taskRunner.stop()
if (aeron != null) aeron.close()
if (driver != null) {
driver.close()
// FIXME only delete files for embedded media driver, and it should also be configurable
IoUtil.delete(new File(driver.aeronDirectoryName), true)
}
Future.successful(Done)
}
// InboundContext
override def sendReply(to: Address, message: ControlMessage) = {
send(message, None, association(to).dummyRecipient)
}
override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = {
val cached = recipient.cachedAssociation
val remoteAddress = recipient.path.address
val a =
if (cached ne null) cached
else association(remoteAddress)
a.send(message, senderOption, recipient)
}
override def association(remoteAddress: Address): Association = {
val current = associations.get(remoteAddress)
if (current ne null) current
else {
associations.computeIfAbsent(remoteAddress, new JFunction[Address, Association] {
override def apply(remoteAddress: Address): Association = {
val newAssociation = new Association(ArteryTransport.this, materializer, remoteAddress, replySubject)
newAssociation.associate() // This is a bit costly for this blocking method :(
newAssociation
}
})
}
}
override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = {
???
}
def outbound(outboundContext: OutboundContext): Sink[Send, Any] = {
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext))
.via(encoder)
.map(_.toArray) // TODO we should use ByteString all the way
.to(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner))
}
def outboundSystemMessage(outboundContext: OutboundContext): Sink[Send, Any] = {
Flow.fromGraph(killSwitch.flow[Send])
.via(new OutboundHandshake(outboundContext))
.via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval))
.via(encoder)
.map(_.toArray) // TODO we should use ByteString all the way
.to(new AeronSink(outboundChannel(outboundContext.remoteAddress), systemMessageStreamId, aeron, taskRunner))
}
// TODO: Try out parallelized serialization (mapAsync) for performance
val encoder: Flow[Send, ByteString, NotUsed] = Flow[Send].map { sendEnvelope
val pdu: ByteString = codec.constructMessage(
sendEnvelope.recipient.localAddressToUse,
sendEnvelope.recipient,
Serialization.currentTransportInformation.withValue(Serialization.Information(localAddress.address, system)) {
MessageSerializer.serialize(system, sendEnvelope.message.asInstanceOf[AnyRef])
},
sendEnvelope.senderOption,
seqOption = None, // FIXME: Acknowledgements will be handled differently I just reused the old codec
ackOption = None)
// TODO: Drop unserializable messages
// TODO: Drop oversized messages
(new ByteStringBuilder).putInt(pdu.size)(ByteOrder.LITTLE_ENDIAN).result() ++ pdu
}
val decoder: Flow[ByteString, AkkaPduCodec.Message, NotUsed] =
Framing.lengthField(4, maximumFrameLength = 256000)
.map { frame
// TODO: Drop unserializable messages
val pdu = codec.decodeMessage(frame.drop(4), provider, localAddress.address)._2.get
pdu
}
val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m
messageDispatcher.dispatch(m.recipient, m.recipientAddress, m.message, m.senderOption)
}
val deserializer: Flow[AkkaPduCodec.Message, InboundEnvelope, NotUsed] =
Flow[AkkaPduCodec.Message].map { m
InboundEnvelope(
m.recipient,
m.recipientAddress,
MessageSerializer.deserialize(system, m.serializedMessage),
m.senderOption)
}
val inboundFlow: Flow[ByteString, ByteString, NotUsed] = {
Flow.fromSinkAndSource(
decoder
.via(deserializer)
.via(new InboundHandshake(this))
.to(messageDispatcherSink),
Source.maybe[ByteString].via(killSwitch.flow))
}
val inboundSystemMessageFlow: Flow[ByteString, ByteString, ReplySubject] = {
Flow.fromSinkAndSourceMat(
decoder
.via(deserializer)
.via(new InboundHandshake(this))
.via(new SystemMessageAcker(this))
.viaMat(new ReplyJunction)(Keep.right)
.to(messageDispatcherSink),
Source.maybe[ByteString].via(killSwitch.flow))((a, b) a)
}
}

View file

@ -0,0 +1,80 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.Future
import scala.concurrent.Promise
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.RootActorPath
import akka.dispatch.sysmsg.SystemMessage
import akka.remote.EndpointManager.Send
import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.ReplyJunction.ReplySubject
import akka.stream.Materializer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.SourceQueueWithComplete
/**
* INTERNAL API
*
* Thread-safe, mutable holder for association state. Main entry point for remote destined message to a specific
* remote address.
*/
private[akka] class Association(
val transport: ArteryTransport,
val materializer: Materializer,
override val remoteAddress: Address,
override val replySubject: ReplySubject) extends OutboundContext {
@volatile private[this] var queue: SourceQueueWithComplete[Send] = _
@volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _
override def localAddress: UniqueAddress = transport.localAddress
// FIXME we also need to be able to switch to new uid
private val _uniqueRemoteAddress = Promise[UniqueAddress]()
override def uniqueRemoteAddress: Future[UniqueAddress] = _uniqueRemoteAddress.future
override def completeRemoteAddress(a: UniqueAddress): Unit = {
require(a.address == remoteAddress, s"Wrong UniqueAddress got [$a.address], expected [$remoteAddress]")
_uniqueRemoteAddress.trySuccess(a)
}
def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = {
// TODO: lookup subchannel
// FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly
message match {
case _: SystemMessage | _: Reply
implicit val ec = materializer.executionContext
systemMessageQueue.offer(Send(message, senderOption, recipient, None)).onFailure {
case e
// FIXME proper error handling, and quarantining
println(s"# System message dropped, due to $e") // FIXME
}
case _
queue.offer(Send(message, senderOption, recipient, None))
}
}
// FIXME we should be able to Send without a recipient ActorRef
override val dummyRecipient: RemoteActorRef =
transport.provider.resolveActorRef(RootActorPath(remoteAddress) / "system" / "dummy").asInstanceOf[RemoteActorRef]
def quarantine(uid: Option[Int]): Unit = ()
// Idempotent
def associate(): Unit = {
// FIXME detect and handle stream failure, e.g. handshake timeout
if (queue eq null)
queue = Source.queue(256, OverflowStrategy.dropBuffer)
.to(transport.outbound(this)).run()(materializer)
if (systemMessageQueue eq null)
systemMessageQueue = Source.queue(256, OverflowStrategy.dropBuffer)
.to(transport.outboundSystemMessage(this)).run()(materializer)
}
}

View file

@ -0,0 +1,182 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import akka.Done
import akka.remote.EndpointManager.Send
import akka.remote.UniqueAddress
import akka.remote.artery.ReplyJunction.ReplyObserver
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.stream.stage.TimerGraphStageLogic
/**
* INTERNAL API
*/
private[akka] object OutboundHandshake {
// FIXME serialization for these messages
final case class HandshakeReq(from: UniqueAddress) extends ControlMessage
final case class HandshakeRsp(from: UniqueAddress) extends Reply
private sealed trait HandshakeState
private case object Start extends HandshakeState
private case object ReplyObserverAttached extends HandshakeState
private case object ReqInProgress extends HandshakeState
private case object Completed extends HandshakeState
private case object HandshakeTimeout
}
/**
* INTERNAL API
*/
private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends GraphStage[FlowShape[Send, Send]] {
val in: Inlet[Send] = Inlet("OutboundHandshake.in")
val out: Outlet[Send] = Outlet("OutboundHandshake.out")
override val shape: FlowShape[Send, Send] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler with ReplyObserver {
import OutboundHandshake._
private val timeout: FiniteDuration = 10.seconds // FIXME config
private var handshakeState: HandshakeState = Start
private def remoteAddress = outboundContext.remoteAddress
override def preStart(): Unit = {
if (outboundContext.uniqueRemoteAddress.isCompleted) {
handshakeState = Completed
} else {
implicit val ec = materializer.executionContext
outboundContext.replySubject.attach(this).foreach {
getAsyncCallback[Done] { _
if (handshakeState != Completed) {
if (isAvailable(out))
pushHandshakeReq()
else
handshakeState = ReplyObserverAttached
}
}.invoke
}
outboundContext.uniqueRemoteAddress.foreach {
getAsyncCallback[UniqueAddress] { a
if (handshakeState != Completed) {
handshakeCompleted()
if (isAvailable(out) && !hasBeenPulled(in))
pull(in)
}
}.invoke
}
scheduleOnce(HandshakeTimeout, timeout)
}
}
override def postStop(): Unit = {
outboundContext.replySubject.detach(this)
}
// InHandler
override def onPush(): Unit = {
if (handshakeState != Completed)
throw new IllegalStateException(s"onPush before handshake completed, was [$handshakeState]")
push(out, grab(in))
}
// OutHandler
override def onPull(): Unit = {
handshakeState match {
case Completed pull(in)
case ReplyObserverAttached
pushHandshakeReq()
case Start // will push HandshakeReq when ReplyObserver is attached
case ReqInProgress // will pull when handshake reply is received
}
}
private def pushHandshakeReq(): Unit = {
handshakeState = ReqInProgress
// FIXME we should be able to Send without recipient ActorRef
push(out, Send(HandshakeReq(outboundContext.localAddress), None, outboundContext.dummyRecipient, None))
}
private def handshakeCompleted(): Unit = {
handshakeState = Completed
cancelTimer(HandshakeTimeout)
outboundContext.replySubject.detach(this)
}
override protected def onTimer(timerKey: Any): Unit =
timerKey match {
case HandshakeTimeout
failStage(new TimeoutException(
s"Handshake with [$remoteAddress] did not complete within ${timeout.toMillis} ms"))
}
// ReplyObserver, external call
override def reply(inboundEnvelope: InboundEnvelope): Unit = {
inboundEnvelope.message match {
case rsp: HandshakeRsp
if (rsp.from.address == remoteAddress) {
getAsyncCallback[HandshakeRsp] { reply
if (handshakeState != Completed) {
handshakeCompleted()
outboundContext.completeRemoteAddress(reply.from)
if (isAvailable(out) && !hasBeenPulled(in))
pull(in)
}
}.invoke(rsp)
}
case _ // not interested
}
}
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
private[akka] class InboundHandshake(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
val in: Inlet[InboundEnvelope] = Inlet("InboundHandshake.in")
val out: Outlet[InboundEnvelope] = Outlet("InboundHandshake.out")
override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
import OutboundHandshake._
// InHandler
override def onPush(): Unit = {
grab(in) match {
case InboundEnvelope(_, _, HandshakeReq(from), _)
inboundContext.association(from.address).completeRemoteAddress(from)
inboundContext.sendReply(from.address, HandshakeRsp(inboundContext.localAddress))
pull(in)
case other
push(out, other)
}
}
// OutHandler
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}

View file

@ -0,0 +1,84 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.actor.ActorRef
import akka.actor.ActorSelection
import akka.actor.ActorSelectionMessage
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.actor.InternalActorRef
import akka.actor.LocalRef
import akka.actor.PossiblyHarmful
import akka.actor.RepointableRef
import akka.dispatch.sysmsg.SystemMessage
import akka.event.Logging
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteRef
/**
* INTERNAL API
*/
private[akka] class MessageDispatcher(
system: ExtendedActorSystem,
provider: RemoteActorRefProvider) {
private val remoteDaemon = provider.remoteDaemon
private val log = Logging(system.eventStream, getClass.getName)
def dispatch(recipient: InternalActorRef,
recipientAddress: Address,
message: AnyRef,
senderOption: Option[ActorRef]): Unit = {
import provider.remoteSettings._
val sender: ActorRef = senderOption.getOrElse(system.deadLetters)
val originalReceiver = recipient.path
def msgLog = s"RemoteMessage: [$message] to [$recipient]<+[$originalReceiver] from [$sender()]"
recipient match {
case `remoteDaemon`
if (UntrustedMode) log.debug("dropping daemon message in untrusted mode")
else {
if (LogReceive) log.debug("received daemon message {}", msgLog)
remoteDaemon ! message
}
case l @ (_: LocalRef | _: RepointableRef) if l.isLocal
if (LogReceive) log.debug("received local message {}", msgLog)
message match {
case sel: ActorSelectionMessage
if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) ||
sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian))
log.debug("operating in UntrustedMode, dropping inbound actor selection to [{}], " +
"allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration",
sel.elements.mkString("/", "/", ""))
else
// run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
ActorSelection.deliverSelection(l, sender, sel)
case msg: PossiblyHarmful if UntrustedMode
log.debug("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]", msg.getClass.getName)
case msg: SystemMessage l.sendSystemMessage(msg)
case msg l.!(msg)(sender)
}
case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode
if (LogReceive) log.debug("received remote-destined message {}", msgLog)
if (provider.transport.addresses(recipientAddress))
// if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
r.!(message)(sender)
else
log.error("dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]",
message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
case r log.error("dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
}
}
}

View file

@ -0,0 +1,97 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.Future
import scala.concurrent.Promise
import akka.Done
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.GraphStageWithMaterializedValue
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
/**
* Marker trait for reply messages
*/
trait Reply extends ControlMessage
/**
* Marker trait for control messages that can be sent via the system message sub-channel
* but don't need full reliable delivery. E.g. `HandshakeReq` and `Reply`.
*/
trait ControlMessage
/**
* INTERNAL API
*/
private[akka] object ReplyJunction {
private[akka] trait ReplySubject {
def attach(observer: ReplyObserver): Future[Done]
def detach(observer: ReplyObserver): Unit
def stopped: Future[Done]
}
private[akka] trait ReplyObserver {
def reply(inboundEnvelope: InboundEnvelope): Unit
}
}
/**
* INTERNAL API
*/
private[akka] class ReplyJunction
extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], ReplyJunction.ReplySubject] {
import ReplyJunction._
val in: Inlet[InboundEnvelope] = Inlet("ReplyJunction.in")
val out: Outlet[InboundEnvelope] = Outlet("ReplyJunction.out")
override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val logic = new GraphStageLogic(shape) with InHandler with OutHandler with ReplySubject {
private var replyObservers: Vector[ReplyObserver] = Vector.empty
private val stoppedPromise = Promise[Done]()
override def postStop(): Unit = stoppedPromise.success(Done)
// InHandler
override def onPush(): Unit = {
grab(in) match {
case env @ InboundEnvelope(_, _, reply: Reply, _)
replyObservers.foreach(_.reply(env))
pull(in)
case env
push(out, env)
}
}
// OutHandler
override def onPull(): Unit = pull(in)
override def attach(observer: ReplyObserver): Future[Done] = {
val p = Promise[Done]()
getAsyncCallback[Unit](_ {
replyObservers :+= observer
p.success(Done)
}).invoke(())
p.future
}
override def detach(observer: ReplyObserver): Unit = {
replyObservers = replyObservers.filterNot(_ == observer)
}
override def stopped: Future[Done] = stoppedPromise.future
setHandlers(in, out, this)
}
(logic, logic)
}
}

View file

@ -6,26 +6,21 @@ package akka.remote.artery
import java.util.ArrayDeque
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.Done
import akka.actor.ActorRef
import akka.actor.Address
import akka.remote.EndpointManager.Send
import akka.remote.artery.Transport.InboundEnvelope
import akka.remote.UniqueAddress
import akka.remote.artery.ReplyJunction.ReplyObserver
import akka.stream.Attributes
import akka.stream.FlowShape
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.stage.AsyncCallback
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.GraphStageWithMaterializedValue
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.stream.stage.TimerGraphStageLogic
@ -35,10 +30,10 @@ import akka.stream.stage.TimerGraphStageLogic
*/
private[akka] object SystemMessageDelivery {
// FIXME serialization of these messages
final case class SystemMessageEnvelope(message: AnyRef, seqNo: Long, ackReplyTo: ActorRef)
sealed trait SystemMessageReply
final case class Ack(seq: Long, from: Address) extends SystemMessageReply
final case class Nack(seq: Long, from: Address) extends SystemMessageReply
// FIXME ackReplyTo should not be needed
final case class SystemMessageEnvelope(message: AnyRef, seqNo: Long, ackReplyTo: UniqueAddress)
final case class Ack(seqNo: Long, from: UniqueAddress) extends Reply
final case class Nack(seqNo: Long, from: UniqueAddress) extends Reply
private case object ResendTick
}
@ -47,49 +42,42 @@ private[akka] object SystemMessageDelivery {
* INTERNAL API
*/
private[akka] class SystemMessageDelivery(
replyJunction: SystemMessageReplyJunction.Junction,
resendInterval: FiniteDuration,
localAddress: Address,
remoteAddress: Address,
ackRecipient: ActorRef)
outboundContext: OutboundContext,
resendInterval: FiniteDuration)
extends GraphStage[FlowShape[Send, Send]] {
import SystemMessageDelivery._
import SystemMessageReplyJunction._
val in: Inlet[Send] = Inlet("SystemMessageDelivery.in")
val out: Outlet[Send] = Outlet("SystemMessageDelivery.out")
override val shape: FlowShape[Send, Send] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
new TimerGraphStageLogic(shape) with InHandler with OutHandler with ReplyObserver {
var registered = false
var seqNo = 0L // sequence number for the first message will be 1
val unacknowledged = new ArrayDeque[Send]
var resending = new ArrayDeque[Send]
var resendingFromSeqNo = -1L
var stopping = false
private var replyObserverAttached = false
private var seqNo = 0L // sequence number for the first message will be 1
private val unacknowledged = new ArrayDeque[Send]
private var resending = new ArrayDeque[Send]
private var resendingFromSeqNo = -1L
private var stopping = false
private def localAddress = outboundContext.localAddress
private def remoteAddress = outboundContext.remoteAddress
override def preStart(): Unit = {
this.schedulePeriodically(ResendTick, resendInterval)
def filter(env: InboundEnvelope): Boolean =
env.message match {
case Ack(_, from) if from == remoteAddress true
case Nack(_, from) if from == remoteAddress true
case _ false
}
implicit val ec = materializer.executionContext
replyJunction.addReplyInterest(filter, ackCallback).foreach {
outboundContext.replySubject.attach(this).foreach {
getAsyncCallback[Done] { _
registered = true
replyObserverAttached = true
if (isAvailable(out))
pull(in) // onPull from downstream already called
}.invoke
}
replyJunction.stopped.onComplete {
outboundContext.replySubject.stopped.onComplete {
getAsyncCallback[Try[Done]] {
// FIXME quarantine
case Success(_) completeStage()
@ -99,7 +87,7 @@ private[akka] class SystemMessageDelivery(
}
override def postStop(): Unit = {
replyJunction.removeReplyInterest(ackCallback)
outboundContext.replySubject.detach(this)
}
override def onUpstreamFinish(): Unit = {
@ -118,17 +106,25 @@ private[akka] class SystemMessageDelivery(
}
}
val ackCallback = getAsyncCallback[SystemMessageReply] { reply
reply match {
case Ack(n, _)
ack(n)
case Nack(n, _)
ack(n)
if (n > resendingFromSeqNo)
// ReplyObserver, external call
override def reply(inboundEnvelope: InboundEnvelope): Unit = {
inboundEnvelope.message match {
case ack: Ack if (ack.from.address == remoteAddress) ackCallback.invoke(ack)
case nack: Nack if (nack.from.address == remoteAddress) nackCallback.invoke(nack)
case _ // not interested
}
}
private val ackCallback = getAsyncCallback[Ack] { reply
ack(reply.seqNo)
}
private val nackCallback = getAsyncCallback[Nack] { reply
ack(reply.seqNo)
if (reply.seqNo > resendingFromSeqNo)
resending = unacknowledged.clone()
tryResend()
}
}
private def ack(n: Long): Unit = {
if (n > seqNo)
@ -155,7 +151,7 @@ private[akka] class SystemMessageDelivery(
// InHandler
override def onPush(): Unit = {
grab(in) match {
case s @ Send(reply: SystemMessageReply, _, _, _)
case s @ Send(reply: ControlMessage, _, _, _)
// pass through
if (isAvailable(out))
push(out, s)
@ -166,7 +162,7 @@ private[akka] class SystemMessageDelivery(
case s @ Send(msg: AnyRef, _, _, _)
seqNo += 1
val sendMsg = s.copy(message = SystemMessageEnvelope(msg, seqNo, ackRecipient))
val sendMsg = s.copy(message = SystemMessageEnvelope(msg, seqNo, localAddress))
// FIXME quarantine if unacknowledged is full
unacknowledged.offer(sendMsg)
if (resending.isEmpty && isAvailable(out))
@ -180,7 +176,7 @@ private[akka] class SystemMessageDelivery(
// OutHandler
override def onPull(): Unit = {
if (registered) { // otherwise it will be pulled after replyJunction.addReplyInterest
if (replyObserverAttached) { // otherwise it will be pulled after attached
if (resending.isEmpty && !hasBeenPulled(in) && !stopping)
pull(in)
else
@ -195,7 +191,7 @@ private[akka] class SystemMessageDelivery(
/**
* INTERNAL API
*/
private[akka] class SystemMessageAcker(localAddress: Address) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
import SystemMessageDelivery._
val in: Inlet[InboundEnvelope] = Inlet("SystemMessageAcker.in")
@ -207,20 +203,22 @@ private[akka] class SystemMessageAcker(localAddress: Address) extends GraphStage
var seqNo = 1L
def localAddress = inboundContext.localAddress
// InHandler
override def onPush(): Unit = {
grab(in) match {
case env @ InboundEnvelope(_, _, sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo), _)
if (n == seqNo) {
ackReplyTo.tell(Ack(n, localAddress), ActorRef.noSender)
inboundContext.sendReply(ackReplyTo.address, Ack(n, localAddress))
seqNo += 1
val unwrapped = env.copy(message = sysEnv.message)
push(out, unwrapped)
} else if (n < seqNo) {
ackReplyTo.tell(Ack(n, localAddress), ActorRef.noSender)
inboundContext.sendReply(ackReplyTo.address, Ack(n, localAddress))
pull(in)
} else {
ackReplyTo.tell(Nack(seqNo - 1, localAddress), ActorRef.noSender)
inboundContext.sendReply(ackReplyTo.address, Nack(seqNo - 1, localAddress))
pull(in)
}
case env
@ -237,74 +235,3 @@ private[akka] class SystemMessageAcker(localAddress: Address) extends GraphStage
}
}
/**
* INTERNAL API
*/
private[akka] object SystemMessageReplyJunction {
import SystemMessageDelivery._
trait Junction {
def addReplyInterest(filter: InboundEnvelope Boolean, replyCallback: AsyncCallback[SystemMessageReply]): Future[Done]
def removeReplyInterest(callback: AsyncCallback[SystemMessageReply]): Unit
def stopped: Future[Done]
}
}
/**
* INTERNAL API
*/
private[akka] class SystemMessageReplyJunction
extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], SystemMessageReplyJunction.Junction] {
import SystemMessageReplyJunction._
import SystemMessageDelivery._
val in: Inlet[InboundEnvelope] = Inlet("SystemMessageReplyJunction.in")
val out: Outlet[InboundEnvelope] = Outlet("SystemMessageReplyJunction.out")
override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val logic = new GraphStageLogic(shape) with InHandler with OutHandler with Junction {
private var replyHandlers: Vector[(InboundEnvelope Boolean, AsyncCallback[SystemMessageReply])] = Vector.empty
private val stoppedPromise = Promise[Done]()
override def postStop(): Unit = stoppedPromise.success(Done)
// InHandler
override def onPush(): Unit = {
grab(in) match {
case env @ InboundEnvelope(_, _, reply: SystemMessageReply, _)
replyHandlers.foreach {
case (f, callback)
if (f(env))
callback.invoke(reply)
}
pull(in)
case env
push(out, env)
}
}
// OutHandler
override def onPull(): Unit = pull(in)
override def addReplyInterest(filter: InboundEnvelope Boolean, replyCallback: AsyncCallback[SystemMessageReply]): Future[Done] = {
val p = Promise[Done]()
getAsyncCallback[Unit](_ {
replyHandlers :+= (filter -> replyCallback)
p.success(Done)
}).invoke(())
p.future
}
override def removeReplyInterest(callback: AsyncCallback[SystemMessageReply]): Unit = {
replyHandlers = replyHandlers.filterNot { case (_, c) c == callback }
}
override def stopped: Future[Done] = stoppedPromise.future
setHandlers(in, out, this)
}
(logic, logic)
}
}

View file

@ -1,281 +0,0 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor.Props
import scala.concurrent.duration._
import java.net.InetSocketAddress
import java.nio.ByteOrder
import akka.NotUsed
import akka.actor.{ Address, ExtendedActorSystem }
import akka.remote.EndpointManager.Send
import akka.remote.{ InboundMessageDispatcher, MessageSerializer, RemoteActorRefProvider }
import akka.remote.transport.AkkaPduCodec
import akka.serialization.Serialization
import akka.stream.{ KillSwitches, SharedKillSwitch }
import akka.stream.scaladsl.{ Flow, Framing, Sink, Source, Tcp }
import akka.util.{ ByteString, ByteStringBuilder }
import scala.concurrent.Future
import akka.Done
import akka.stream.Materializer
import scala.concurrent.Await
import akka.event.LoggingAdapter
import akka.event.Logging
import io.aeron.driver.MediaDriver
import io.aeron.Aeron
import org.agrona.ErrorHandler
import io.aeron.AvailableImageHandler
import io.aeron.Image
import io.aeron.UnavailableImageHandler
import io.aeron.exceptions.ConductorServiceTimeoutException
import akka.actor.LocalRef
import akka.actor.InternalActorRef
import akka.dispatch.sysmsg.SystemMessage
import akka.actor.PossiblyHarmful
import akka.actor.RepointableRef
import akka.actor.ActorSelectionMessage
import akka.remote.RemoteRef
import akka.actor.ActorSelection
import akka.actor.ActorRef
import akka.stream.scaladsl.Keep
/**
* INTERNAL API
*/
private[akka] object Transport {
// FIXME avoid allocating this envelope?
final case class InboundEnvelope(
recipient: InternalActorRef,
recipientAddress: Address,
message: AnyRef,
senderOption: Option[ActorRef])
}
/**
* INTERNAL API
*/
// FIXME: Replace the codec with a custom made, hi-perf one
private[akka] class Transport(
val localAddress: Address,
val system: ExtendedActorSystem,
val materializer: Materializer,
val provider: RemoteActorRefProvider,
val codec: AkkaPduCodec) {
import Transport._
private val log: LoggingAdapter = Logging(system.eventStream, getClass.getName)
private val remoteDaemon = provider.remoteDaemon
private implicit val mat = materializer
// TODO support port 0
private val inboundChannel = s"aeron:udp?endpoint=${localAddress.host.get}:${localAddress.port.get}"
private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
private val systemMessageStreamId = 1
private val ordinaryStreamId = 3
private val systemMessageResendInterval: FiniteDuration = 1.second // FIXME config
private var systemMessageReplyJunction: SystemMessageReplyJunction.Junction = _
// Need an ActorRef that is passed in the `SystemMessageEnvelope.ackReplyTo`.
// Those messages are not actually handled by this actor, but intercepted by the
// SystemMessageReplyJunction stage.
private val systemMessageReplyRecepient = system.systemActorOf(Props.empty, "systemMessageReplyTo")
private val driver = {
// TODO also support external media driver
val driverContext = new MediaDriver.Context
// FIXME settings from config
driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(10))
driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(10))
driverContext.driverTimeoutMs(SECONDS.toNanos(10))
MediaDriver.launchEmbedded(driverContext)
}
private val aeron = {
val ctx = new Aeron.Context
ctx.availableImageHandler(new AvailableImageHandler {
override def onAvailableImage(img: Image): Unit = {
if (log.isDebugEnabled)
log.debug(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}")
}
})
ctx.unavailableImageHandler(new UnavailableImageHandler {
override def onUnavailableImage(img: Image): Unit = {
if (log.isDebugEnabled)
log.debug(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}")
// FIXME we should call FragmentAssembler.freeSessionBuffer when image is unavailable
}
})
ctx.errorHandler(new ErrorHandler {
override def onError(cause: Throwable): Unit = {
cause match {
case e: ConductorServiceTimeoutException
// Timeout between service calls
log.error(cause, s"Aeron ServiceTimeoutException, ${cause.getMessage}")
case _
log.error(cause, s"Aeron error, ${cause.getMessage}")
}
}
})
ctx.aeronDirectoryName(driver.aeronDirectoryName)
Aeron.connect(ctx)
}
private val taskRunner = new TaskRunner(system)
def start(): Unit = {
taskRunner.start()
systemMessageReplyJunction = Source.fromGraph(new AeronSource(inboundChannel, systemMessageStreamId, aeron, taskRunner))
.async // FIXME use dedicated dispatcher for AeronSource
.map(ByteString.apply) // TODO we should use ByteString all the way
.viaMat(inboundSystemMessageFlow)(Keep.right)
.to(Sink.ignore)
.run()
Source.fromGraph(new AeronSource(inboundChannel, ordinaryStreamId, aeron, taskRunner))
.async // FIXME use dedicated dispatcher for AeronSource
.map(ByteString.apply) // TODO we should use ByteString all the way
.via(inboundFlow)
.runWith(Sink.ignore)
}
def shutdown(): Future[Done] = {
// FIXME stop the AeronSource first?
taskRunner.stop()
aeron.close()
driver.close()
Future.successful(Done)
}
val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch")
def outbound(remoteAddress: Address): Sink[Send, Any] = {
Flow.fromGraph(killSwitch.flow[Send])
.via(encoder)
.map(_.toArray) // TODO we should use ByteString all the way
.to(new AeronSink(outboundChannel(remoteAddress), ordinaryStreamId, aeron, taskRunner))
}
def outboundSystemMessage(remoteAddress: Address): Sink[Send, Any] = {
Flow.fromGraph(killSwitch.flow[Send])
.via(new SystemMessageDelivery(systemMessageReplyJunction, systemMessageResendInterval,
localAddress, remoteAddress, systemMessageReplyRecepient))
.via(encoder)
.map(_.toArray) // TODO we should use ByteString all the way
.to(new AeronSink(outboundChannel(remoteAddress), systemMessageStreamId, aeron, taskRunner))
}
// TODO: Try out parallelized serialization (mapAsync) for performance
val encoder: Flow[Send, ByteString, NotUsed] = Flow[Send].map { sendEnvelope
val pdu: ByteString = codec.constructMessage(
sendEnvelope.recipient.localAddressToUse,
sendEnvelope.recipient,
Serialization.currentTransportInformation.withValue(Serialization.Information(localAddress, system)) {
MessageSerializer.serialize(system, sendEnvelope.message.asInstanceOf[AnyRef])
},
sendEnvelope.senderOption,
seqOption = None, // FIXME: Acknowledgements will be handled differently I just reused the old codec
ackOption = None)
// TODO: Drop unserializable messages
// TODO: Drop oversized messages
(new ByteStringBuilder).putInt(pdu.size)(ByteOrder.LITTLE_ENDIAN).result() ++ pdu
}
val decoder: Flow[ByteString, AkkaPduCodec.Message, NotUsed] =
Framing.lengthField(4, maximumFrameLength = 256000)
.map { frame
// TODO: Drop unserializable messages
val pdu = codec.decodeMessage(frame.drop(4), provider, localAddress)._2.get
pdu
}
val messageDispatcher: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m
dispatchInboundMessage(m.recipient, m.recipientAddress, m.message, m.senderOption)
}
val deserializer: Flow[AkkaPduCodec.Message, InboundEnvelope, NotUsed] =
Flow[AkkaPduCodec.Message].map { m
InboundEnvelope(
m.recipient,
m.recipientAddress,
MessageSerializer.deserialize(system, m.serializedMessage),
m.senderOption)
}
val inboundFlow: Flow[ByteString, ByteString, NotUsed] = {
Flow.fromSinkAndSource(
decoder.via(deserializer).to(messageDispatcher),
Source.maybe[ByteString].via(killSwitch.flow))
}
val inboundSystemMessageFlow: Flow[ByteString, ByteString, SystemMessageReplyJunction.Junction] = {
Flow.fromSinkAndSourceMat(
decoder.via(deserializer)
.via(new SystemMessageAcker(localAddress))
.viaMat(new SystemMessageReplyJunction)(Keep.right)
.to(messageDispatcher),
Source.maybe[ByteString].via(killSwitch.flow))((a, b) a)
}
private def dispatchInboundMessage(recipient: InternalActorRef,
recipientAddress: Address,
message: AnyRef,
senderOption: Option[ActorRef]): Unit = {
import provider.remoteSettings._
val sender: ActorRef = senderOption.getOrElse(system.deadLetters)
val originalReceiver = recipient.path
def msgLog = s"RemoteMessage: [$message] to [$recipient]<+[$originalReceiver] from [$sender()]"
recipient match {
case `remoteDaemon`
if (UntrustedMode) log.debug("dropping daemon message in untrusted mode")
else {
if (LogReceive) log.debug("received daemon message {}", msgLog)
remoteDaemon ! message
}
case l @ (_: LocalRef | _: RepointableRef) if l.isLocal
if (LogReceive) log.debug("received local message {}", msgLog)
message match {
case sel: ActorSelectionMessage
if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) ||
sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian))
log.debug("operating in UntrustedMode, dropping inbound actor selection to [{}], " +
"allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration",
sel.elements.mkString("/", "/", ""))
else
// run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
ActorSelection.deliverSelection(l, sender, sel)
case msg: PossiblyHarmful if UntrustedMode
log.debug("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]", msg.getClass.getName)
case msg: SystemMessage l.sendSystemMessage(msg)
case msg l.!(msg)(sender)
}
case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode
if (LogReceive) log.debug("received remote-destined message {}", msgLog)
if (provider.transport.addresses(recipientAddress))
// if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
r.!(message)(sender)
else
log.error("dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]",
message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
case r log.error("dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", "))
}
}
}

View file

@ -3,37 +3,32 @@
*/
package akka.remote.artery
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.Done
import akka.NotUsed
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.actor.Identify
import akka.actor.InternalActorRef
import akka.actor.PoisonPill
import akka.actor.Props
import akka.actor.RootActorPath
import akka.actor.Stash
import akka.remote.AddressUidExtension
import akka.remote.EndpointManager.Send
import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.SystemMessageDelivery._
import akka.remote.artery.Transport.InboundEnvelope
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.stage.AsyncCallback
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
@ -60,86 +55,60 @@ object SystemMessageDeliverySpec {
val configB = ConfigFactory.parseString(s"akka.remote.artery.port = $portB")
.withFallback(commonConfig)
class TestReplyJunction(sendCallbackTo: ActorRef) extends SystemMessageReplyJunction.Junction {
class ManualReplyInboundContext(
replyProbe: ActorRef,
localAddress: UniqueAddress,
replySubject: TestReplySubject) extends TestInboundContext(localAddress, replySubject) {
def addReplyInterest(filter: InboundEnvelope Boolean, replyCallback: AsyncCallback[SystemMessageReply]): Future[Done] = {
sendCallbackTo ! replyCallback
Future.successful(Done)
private var lastReply: Option[(Address, ControlMessage)] = None
override def sendReply(to: Address, message: ControlMessage) = {
lastReply = Some((to, message))
replyProbe ! message
}
override def removeReplyInterest(callback: AsyncCallback[SystemMessageReply]): Unit = ()
override def stopped: Future[Done] = Promise[Done]().future
}
def replyConnectorProps(dropRate: Double): Props =
Props(new ReplyConnector(dropRate))
class ReplyConnector(dropRate: Double) extends Actor with Stash {
override def receive = {
case callback: AsyncCallback[SystemMessageReply] @unchecked
context.become(active(callback))
unstashAll()
case _ stash()
}
def active(callback: AsyncCallback[SystemMessageReply]): Receive = {
case reply: SystemMessageReply
if (ThreadLocalRandom.current().nextDouble() >= dropRate)
callback.invoke(reply)
def deliverLastReply(): Unit = {
lastReply.foreach { case (to, message) super.sendReply(to, message) }
lastReply = None
}
}
}
class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commonConfig) with ImplicitSender {
import SystemMessageDeliverySpec._
val addressA = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val addressA = UniqueAddress(
system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress,
AddressUidExtension(system).addressUid)
val systemB = ActorSystem("systemB", configB)
val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val rootB = RootActorPath(addressB)
val addressB = UniqueAddress(
systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress,
AddressUidExtension(systemB).addressUid)
val rootB = RootActorPath(addressB.address)
val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
implicit val mat = ActorMaterializer(matSettings)(system)
override def afterTermination(): Unit = shutdown(systemB)
def setupManualCallback(ackRecipient: ActorRef, resendInterval: FiniteDuration,
dropSeqNumbers: Vector[Long], sendCount: Int): (TestSubscriber.Probe[String], AsyncCallback[SystemMessageReply]) = {
val callbackProbe = TestProbe()
val replyJunction = new TestReplyJunction(callbackProbe.ref)
val sink =
send(sendCount, resendInterval, replyJunction, ackRecipient)
.via(drop(dropSeqNumbers))
.via(inbound)
.map(_.message.asInstanceOf[String])
.runWith(TestSink.probe)
val callback = callbackProbe.expectMsgType[AsyncCallback[SystemMessageReply]]
(sink, callback)
}
def send(sendCount: Int, resendInterval: FiniteDuration, replyJunction: SystemMessageReplyJunction.Junction,
ackRecipient: ActorRef): Source[Send, NotUsed] = {
private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[Send, NotUsed] = {
val remoteRef = null.asInstanceOf[RemoteActorRef] // not used
Source(1 to sendCount)
.map(n Send("msg-" + n, None, remoteRef, None))
.via(new SystemMessageDelivery(replyJunction, resendInterval, addressA, addressB, ackRecipient))
.via(new SystemMessageDelivery(outboundContext, resendInterval))
}
def inbound: Flow[Send, InboundEnvelope, NotUsed] = {
private def inbound(inboundContext: InboundContext): Flow[Send, InboundEnvelope, NotUsed] = {
val recipient = null.asInstanceOf[InternalActorRef] // not used
Flow[Send]
.map {
case Send(sysEnv: SystemMessageEnvelope, _, _, _)
InboundEnvelope(recipient, addressB, sysEnv, None)
InboundEnvelope(recipient, addressB.address, sysEnv, None)
}
.async
.via(new SystemMessageAcker(addressB))
.via(new SystemMessageAcker(inboundContext))
}
def drop(dropSeqNumbers: Vector[Long]): Flow[Send, Send, NotUsed] = {
private def drop(dropSeqNumbers: Vector[Long]): Flow[Send, Send, NotUsed] = {
Flow[Send]
.statefulMapConcat(() {
var dropping = dropSeqNumbers
@ -156,7 +125,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo
})
}
def randomDrop[T](dropRate: Double): Flow[T, T, NotUsed] = Flow[T].mapConcat { elem
private def randomDrop[T](dropRate: Double): Flow[T, T, NotUsed] = Flow[T].mapConcat { elem
if (ThreadLocalRandom.current().nextDouble() < dropRate) Nil
else List(elem)
}
@ -177,83 +146,108 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo
}
"be resent when some in the middle are lost" in {
val ackRecipient = TestProbe()
val (sink, replyCallback) =
setupManualCallback(ackRecipient.ref, resendInterval = 60.seconds, dropSeqNumbers = Vector(3L, 4L), sendCount = 5)
val replyProbe = TestProbe()
val replySubject = new TestReplySubject
val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject)
val inboundContextA = new TestInboundContext(addressB, replySubject)
val outboundContextA = inboundContextA.association(addressB.address)
val sink = send(sendCount = 5, resendInterval = 60.seconds, outboundContextA)
.via(drop(dropSeqNumbers = Vector(3L, 4L)))
.via(inbound(inboundContextB))
.map(_.message.asInstanceOf[String])
.runWith(TestSink.probe)
sink.request(100)
sink.expectNext("msg-1")
sink.expectNext("msg-2")
ackRecipient.expectMsg(Ack(1L, addressB))
ackRecipient.expectMsg(Ack(2L, addressB))
replyProbe.expectMsg(Ack(1L, addressB))
replyProbe.expectMsg(Ack(2L, addressB))
// 3 and 4 was dropped
ackRecipient.expectMsg(Nack(2L, addressB))
replyProbe.expectMsg(Nack(2L, addressB))
sink.expectNoMsg(100.millis) // 3 was dropped
replyCallback.invoke(Nack(2L, addressB))
inboundContextB.deliverLastReply()
// resending 3, 4, 5
sink.expectNext("msg-3")
ackRecipient.expectMsg(Ack(3L, addressB))
replyProbe.expectMsg(Ack(3L, addressB))
sink.expectNext("msg-4")
ackRecipient.expectMsg(Ack(4L, addressB))
replyProbe.expectMsg(Ack(4L, addressB))
sink.expectNext("msg-5")
ackRecipient.expectMsg(Ack(5L, addressB))
ackRecipient.expectNoMsg(100.millis)
replyCallback.invoke(Ack(5L, addressB))
replyProbe.expectMsg(Ack(5L, addressB))
replyProbe.expectNoMsg(100.millis)
inboundContextB.deliverLastReply()
sink.expectComplete()
}
"be resent when first is lost" in {
val ackRecipient = TestProbe()
val (sink, replyCallback) =
setupManualCallback(ackRecipient.ref, resendInterval = 60.seconds, dropSeqNumbers = Vector(1L), sendCount = 3)
val replyProbe = TestProbe()
val replySubject = new TestReplySubject
val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject)
val inboundContextA = new TestInboundContext(addressB, replySubject)
val outboundContextA = inboundContextA.association(addressB.address)
val sink = send(sendCount = 3, resendInterval = 60.seconds, outboundContextA)
.via(drop(dropSeqNumbers = Vector(1L)))
.via(inbound(inboundContextB))
.map(_.message.asInstanceOf[String])
.runWith(TestSink.probe)
sink.request(100)
ackRecipient.expectMsg(Nack(0L, addressB)) // from receiving 2
ackRecipient.expectMsg(Nack(0L, addressB)) // from receiving 3
replyProbe.expectMsg(Nack(0L, addressB)) // from receiving 2
replyProbe.expectMsg(Nack(0L, addressB)) // from receiving 3
sink.expectNoMsg(100.millis) // 1 was dropped
replyCallback.invoke(Nack(0L, addressB))
replyCallback.invoke(Nack(0L, addressB))
inboundContextB.deliverLastReply() // it's ok to not delivery all nacks
// resending 1, 2, 3
sink.expectNext("msg-1")
ackRecipient.expectMsg(Ack(1L, addressB))
replyProbe.expectMsg(Ack(1L, addressB))
sink.expectNext("msg-2")
ackRecipient.expectMsg(Ack(2L, addressB))
replyProbe.expectMsg(Ack(2L, addressB))
sink.expectNext("msg-3")
ackRecipient.expectMsg(Ack(3L, addressB))
replyCallback.invoke(Ack(3L, addressB))
replyProbe.expectMsg(Ack(3L, addressB))
inboundContextB.deliverLastReply()
sink.expectComplete()
}
"be resent when last is lost" in {
val ackRecipient = TestProbe()
val (sink, replyCallback) =
setupManualCallback(ackRecipient.ref, resendInterval = 1.second, dropSeqNumbers = Vector(3L), sendCount = 3)
val replyProbe = TestProbe()
val replySubject = new TestReplySubject
val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject)
val inboundContextA = new TestInboundContext(addressB, replySubject)
val outboundContextA = inboundContextA.association(addressB.address)
val sink = send(sendCount = 3, resendInterval = 1.seconds, outboundContextA)
.via(drop(dropSeqNumbers = Vector(3L)))
.via(inbound(inboundContextB))
.map(_.message.asInstanceOf[String])
.runWith(TestSink.probe)
sink.request(100)
sink.expectNext("msg-1")
ackRecipient.expectMsg(Ack(1L, addressB))
replyCallback.invoke(Ack(1L, addressB))
replyProbe.expectMsg(Ack(1L, addressB))
inboundContextB.deliverLastReply()
sink.expectNext("msg-2")
ackRecipient.expectMsg(Ack(2L, addressB))
replyCallback.invoke(Ack(2L, addressB))
replyProbe.expectMsg(Ack(2L, addressB))
inboundContextB.deliverLastReply()
sink.expectNoMsg(200.millis) // 3 was dropped
// resending 3 due to timeout
sink.expectNext("msg-3")
ackRecipient.expectMsg(Ack(3L, addressB))
replyCallback.invoke(Ack(3L, addressB))
replyProbe.expectMsg(Ack(3L, addressB))
inboundContextB.deliverLastReply()
sink.expectComplete()
}
"deliver all during stress and random dropping" in {
val N = 10000
val dropRate = 0.1
val replyConnector = system.actorOf(replyConnectorProps(dropRate))
val replyJunction = new TestReplyJunction(replyConnector)
val replySubject = new TestReplySubject
val inboundContextB = new TestInboundContext(addressB, replySubject, replyDropRate = dropRate)
val inboundContextA = new TestInboundContext(addressB, replySubject)
val outboundContextA = inboundContextA.association(addressB.address)
val output =
send(N, 1.second, replyJunction, replyConnector)
send(N, 1.second, outboundContextA)
.via(randomDrop(dropRate))
.via(inbound)
.via(inbound(inboundContextB))
.map(_.message.asInstanceOf[String])
.runWith(Sink.seq)
@ -263,14 +257,16 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo
"deliver all during throttling and random dropping" in {
val N = 500
val dropRate = 0.1
val replyConnector = system.actorOf(replyConnectorProps(dropRate))
val replyJunction = new TestReplyJunction(replyConnector)
val replySubject = new TestReplySubject
val inboundContextB = new TestInboundContext(addressB, replySubject, replyDropRate = dropRate)
val inboundContextA = new TestInboundContext(addressB, replySubject)
val outboundContextA = inboundContextA.association(addressB.address)
val output =
send(N, 1.second, replyJunction, replyConnector)
send(N, 1.second, outboundContextA)
.throttle(200, 1.second, 10, ThrottleMode.shaping)
.via(randomDrop(dropRate))
.via(inbound)
.via(inbound(inboundContextB))
.map(_.message.asInstanceOf[String])
.runWith(Sink.seq)

View file

@ -0,0 +1,79 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import akka.remote.UniqueAddress
import akka.actor.Address
import scala.concurrent.Future
import akka.remote.artery.ReplyJunction.ReplySubject
import akka.remote.RemoteActorRef
import scala.concurrent.Promise
import akka.Done
import akka.remote.artery.ReplyJunction.ReplyObserver
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ThreadLocalRandom
private[akka] class TestInboundContext(
override val localAddress: UniqueAddress,
val replySubject: TestReplySubject = new TestReplySubject,
replyDropRate: Double = 0.0) extends InboundContext {
private val associations = new ConcurrentHashMap[Address, OutboundContext]
def sendReply(to: Address, message: ControlMessage) = {
if (ThreadLocalRandom.current().nextDouble() >= replyDropRate)
replySubject.sendReply(InboundEnvelope(null, to, message, None))
}
def association(remoteAddress: Address): OutboundContext =
associations.get(remoteAddress) match {
case null
val a = new TestOutboundContext(localAddress, remoteAddress, replySubject)
associations.putIfAbsent(remoteAddress, a) match {
case null a
case existing existing
}
case existing existing
}
protected def createAssociation(remoteAddress: Address): OutboundContext =
new TestOutboundContext(localAddress, remoteAddress, replySubject)
}
private[akka] class TestOutboundContext(
override val localAddress: UniqueAddress,
override val remoteAddress: Address,
override val replySubject: TestReplySubject) extends OutboundContext {
private val _uniqueRemoteAddress = Promise[UniqueAddress]()
def uniqueRemoteAddress: Future[UniqueAddress] = _uniqueRemoteAddress.future
def completeRemoteAddress(a: UniqueAddress): Unit = _uniqueRemoteAddress.trySuccess(a)
// FIXME we should be able to Send without a recipient ActorRef
def dummyRecipient: RemoteActorRef = null
}
private[akka] class TestReplySubject extends ReplySubject {
private var replyObservers = new CopyOnWriteArrayList[ReplyObserver]
override def attach(observer: ReplyObserver): Future[Done] = {
replyObservers.add(observer)
Future.successful(Done)
}
override def detach(observer: ReplyObserver): Unit = {
replyObservers.remove(observer)
}
override def stopped: Future[Done] = Promise[Done]().future
def sendReply(env: InboundEnvelope): Unit = {
val iter = replyObservers.iterator()
while (iter.hasNext())
iter.next().reply(env)
}
}