// pekko/akka-remote/src/main/scala/akka/remote/artery/Association.scala

/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import scala.concurrent.Future
import scala.concurrent.Promise
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.RootActorPath
import akka.dispatch.sysmsg.SystemMessage
import akka.remote.EndpointManager.Send
import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.stream.Materializer
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.SourceQueueWithComplete
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
import akka.stream.scaladsl.Keep
/**
 * INTERNAL API
 *
 * Thread-safe, mutable holder for association state. Main entry point for remote destined message to a specific
 * remote address.
 *
 * The outbound streams are not created by the constructor; they are materialized by [[Association#associate]],
 * which assigns the internal queues and the control ingress.
 *
 * @param transport      transport used to build the outbound streams and to resolve the local address
 * @param materializer   materializer used to run the outbound streams; its execution context is also used
 *                       for the failure callback on system message offers
 * @param remoteAddress  address of the remote system this association targets
 * @param controlSubject control message subject exposed through [[OutboundContext]]
 */
private[akka] class Association(
  val transport: ArteryTransport,
  val materializer: Materializer,
  override val remoteAddress: Address,
  override val controlSubject: ControlMessageSubject) extends OutboundContext {

  // All three fields stay null until associate() has run.
  @volatile private[this] var queue: SourceQueueWithComplete[Send] = _
  @volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _
  @volatile private[this] var _outboundControlIngress: OutboundControlIngress = _

  /**
   * The control ingress materialized together with the system message stream.
   *
   * @throws IllegalStateException if it has not been assigned yet, i.e. `associate()` has not completed
   */
  def outboundControlIngress: OutboundControlIngress = {
    if (_outboundControlIngress eq null)
      throw new IllegalStateException("outboundControlIngress not initialized yet")
    _outboundControlIngress
  }

  override def localAddress: UniqueAddress = transport.localAddress

  // FIXME we also need to be able to switch to new uid
  private val _uniqueRemoteAddress = Promise[UniqueAddress]()

  /** Future completed by the first successful call to `completeRemoteAddress`. */
  override def uniqueRemoteAddress: Future[UniqueAddress] = _uniqueRemoteAddress.future

  /**
   * Completes [[Association#uniqueRemoteAddress]] with `a`. Uses `trySuccess`, so calls made after the
   * promise is already completed leave the first value in place.
   *
   * @throws IllegalArgumentException if the address part of `a` differs from `remoteAddress`
   */
  override def completeRemoteAddress(a: UniqueAddress): Unit = {
    // `${a.address}` is required here: a bare `$a.address` would interpolate `a` and append the text ".address"
    require(a.address == remoteAddress, s"Wrong UniqueAddress got [${a.address}], expected [$remoteAddress]")
    _uniqueRemoteAddress.trySuccess(a)
  }

  /**
   * Offers `message`, wrapped in a `Send`, to one of the outbound queues: system messages go to the
   * system message queue, everything else to the ordinary queue.
   *
   * NOTE(review): both queues are null until `associate()` has been called, so calling this first
   * would fail with a NullPointerException - confirm that callers always associate beforehand.
   */
  def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = {
    // TODO: lookup subchannel
    // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly
    message match {
      case _: SystemMessage =>
        implicit val ec = materializer.executionContext
        systemMessageQueue.offer(Send(message, senderOption, recipient, None)).onFailure {
          case e =>
            // FIXME proper error handling, and quarantining
            println(s"# System message dropped, due to $e") // FIXME
        }
      case _ =>
        // The result of this offer is not inspected.
        queue.offer(Send(message, senderOption, recipient, None))
    }
  }

  // FIXME we should be able to Send without a recipient ActorRef
  override val dummyRecipient: RemoteActorRef =
    transport.provider.resolveActorRef(RootActorPath(remoteAddress) / "system" / "dummy").asInstanceOf[RemoteActorRef]

  /** Currently a no-op. */
  def quarantine(uid: Option[Int]): Unit = ()

  /**
   * Materializes the outbound streams that have not been created yet and stores their queues
   * (and, for the system message stream, the control ingress).
   *
   * Idempotent: a stream whose queue is already assigned is not materialized again.
   *
   * NOTE(review): the null checks and the assignments are not performed atomically; whether concurrent
   * first calls can happen depends on the callers - TODO confirm.
   */
  def associate(): Unit = {
    // FIXME detect and handle stream failure, e.g. handshake timeout
    if (queue eq null)
      queue = Source.queue(256, OverflowStrategy.dropBuffer)
        .to(transport.outbound(this)).run()(materializer)
    if (systemMessageQueue eq null) {
      // Keep.both retains the source queue and the value materialized by the control sink
      val (q, control) = Source.queue(256, OverflowStrategy.dropBuffer)
        .toMat(transport.outboundControl(this))(Keep.both)
        .run()(materializer)
      systemMessageQueue = q
      _outboundControlIngress = control
    }
  }
}