2016-05-09 07:31:41 +02:00
/* *
* Copyright ( C ) 2016 Lightbend Inc . < http : //www.lightbend.com>
*/
package akka.remote.artery
2016-06-02 07:21:32 +02:00
import java.util.concurrent.CopyOnWriteArrayList
2016-06-10 13:04:23 +02:00
import java.util.concurrent.TimeUnit
2016-06-23 11:58:54 +02:00
import akka.remote.artery.compress.CompressionProtocol.CompressionMessage
2016-06-02 07:21:32 +02:00
import scala.collection.JavaConverters._
2016-05-09 07:31:41 +02:00
import scala.concurrent.Future
2016-05-13 08:06:13 +02:00
import scala.concurrent.Promise
2016-05-09 07:31:41 +02:00
import scala.concurrent.duration._
2016-05-13 08:06:13 +02:00
import scala.util.Failure
import scala.util.Success
2016-05-17 17:34:57 +02:00
import scala.util.Try
2016-05-09 07:31:41 +02:00
import akka.Done
import akka.NotUsed
2016-06-23 11:58:54 +02:00
import akka.actor._
2016-06-02 07:21:32 +02:00
import akka.actor.Cancellable
2016-05-09 07:31:41 +02:00
import akka.event.Logging
import akka.event.LoggingAdapter
import akka.remote.AddressUidExtension
import akka.remote.EndpointManager.Send
2016-05-13 15:34:37 +02:00
import akka.remote.EventPublisher
2016-05-09 07:31:41 +02:00
import akka.remote.RemoteActorRef
import akka.remote.RemoteActorRefProvider
2016-06-02 07:21:32 +02:00
import akka.remote.RemoteSettings
2016-05-09 07:31:41 +02:00
import akka.remote.RemoteTransport
2016-05-13 15:34:37 +02:00
import akka.remote.RemotingLifecycleEvent
import akka.remote.ThisActorSystemQuarantinedEvent
2016-05-09 07:31:41 +02:00
import akka.remote.UniqueAddress
2016-05-13 15:34:37 +02:00
import akka.remote.artery.InboundControlJunction.ControlMessageObserver
2016-05-12 08:56:28 +02:00
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
2016-05-09 07:31:41 +02:00
import akka.remote.transport.AkkaPduCodec
import akka.remote.transport.AkkaPduProtobufCodec
2016-07-01 11:54:57 +02:00
import akka.remote.artery.compress. { InboundCompressionsImpl , CompressionProtocol }
2016-05-17 17:34:57 +02:00
import akka.stream.AbruptTerminationException
2016-05-09 07:31:41 +02:00
import akka.stream.ActorMaterializer
import akka.stream.KillSwitches
import akka.stream.Materializer
import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
2016-05-17 17:34:57 +02:00
import akka.util.Helpers.ConfigOps
import akka.util.Helpers.Requiring
2016-06-02 07:21:32 +02:00
import akka.util.WildcardTree
2016-05-09 07:31:41 +02:00
import io.aeron.Aeron
import io.aeron.AvailableImageHandler
import io.aeron.Image
import io.aeron.UnavailableImageHandler
import io.aeron.driver.MediaDriver
import io.aeron.exceptions.ConductorServiceTimeoutException
import org.agrona.ErrorHandler
import org.agrona.IoUtil
import java.io.File
2016-05-17 14:17:21 +02:00
import java.net.InetSocketAddress
2016-06-06 13:36:05 +02:00
import java.nio.channels. { DatagramChannel , FileChannel }
2016-05-12 08:56:28 +02:00
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
2016-05-18 13:34:51 +02:00
import io.aeron.CncFileDescriptor
import java.util.concurrent.atomic.AtomicLong
2016-05-20 12:40:56 +02:00
import scala.collection.JavaConverters._
2016-05-29 19:41:09 +02:00
import akka.stream.ActorMaterializerSettings
2016-06-04 21:53:27 +02:00
import scala.annotation.tailrec
2016-06-05 15:40:06 +02:00
import akka.util.OptionVal
2016-06-10 07:41:36 +02:00
import io.aeron.driver.ThreadingMode
import org.agrona.concurrent.BackoffIdleStrategy
import org.agrona.concurrent.BusySpinIdleStrategy
2016-06-08 18:38:54 +02:00
import scala.util.control.NonFatal
2016-06-23 18:11:56 +02:00
import akka.actor.Props
import akka.actor.Actor
2016-06-06 08:26:15 +02:00
/* *
* INTERNAL API
*/
private [ akka ] object InboundEnvelope {
def apply (
2016-06-09 09:16:44 +02:00
recipient : OptionVal [ InternalActorRef ] ,
2016-06-06 08:26:15 +02:00
recipientAddress : Address ,
message : AnyRef ,
2016-06-09 09:16:44 +02:00
sender : OptionVal [ ActorRef ] ,
2016-06-10 13:04:23 +02:00
originUid : Long ,
association : OptionVal [ OutboundContext ] ) : InboundEnvelope = {
2016-06-06 08:26:15 +02:00
val env = new ReusableInboundEnvelope
2016-06-09 09:16:44 +02:00
env . init ( recipient , recipientAddress , message , sender , originUid , association )
2016-06-06 08:26:15 +02:00
env
}
}
/* *
* INTERNAL API
*/
private [ akka ] trait InboundEnvelope {
2016-06-09 09:16:44 +02:00
def recipient : OptionVal [ InternalActorRef ]
2016-06-06 08:26:15 +02:00
def recipientAddress : Address
def message : AnyRef
2016-06-09 09:16:44 +02:00
def sender : OptionVal [ ActorRef ]
2016-06-06 08:26:15 +02:00
def originUid : Long
2016-06-10 13:04:23 +02:00
def association : OptionVal [ OutboundContext ]
2016-06-06 08:26:15 +02:00
def withMessage ( message : AnyRef ) : InboundEnvelope
2016-06-09 09:16:44 +02:00
def withRecipient ( ref : InternalActorRef ) : InboundEnvelope
2016-06-06 08:26:15 +02:00
}
2016-05-09 07:31:41 +02:00
/* *
* INTERNAL API
*/
2016-06-06 08:26:15 +02:00
private [ akka ] final class ReusableInboundEnvelope extends InboundEnvelope {
2016-06-09 09:16:44 +02:00
private var _recipient : OptionVal [ InternalActorRef ] = OptionVal . None
2016-06-06 08:26:15 +02:00
private var _recipientAddress : Address = null
private var _message : AnyRef = null
2016-06-09 09:16:44 +02:00
private var _sender : OptionVal [ ActorRef ] = OptionVal . None
2016-06-06 08:26:15 +02:00
private var _originUid : Long = 0L
2016-06-10 13:04:23 +02:00
private var _association : OptionVal [ OutboundContext ] = OptionVal . None
2016-06-06 08:26:15 +02:00
2016-06-09 09:16:44 +02:00
override def recipient : OptionVal [ InternalActorRef ] = _recipient
2016-06-06 08:26:15 +02:00
override def recipientAddress : Address = _recipientAddress
override def message : AnyRef = _message
2016-06-09 09:16:44 +02:00
override def sender : OptionVal [ ActorRef ] = _sender
2016-06-06 08:26:15 +02:00
override def originUid : Long = _originUid
2016-06-10 13:04:23 +02:00
override def association : OptionVal [ OutboundContext ] = _association
2016-06-06 08:26:15 +02:00
override def withMessage ( message : AnyRef ) : InboundEnvelope = {
_message = message
this
}
2016-06-09 09:16:44 +02:00
def withRecipient ( ref : InternalActorRef ) : InboundEnvelope = {
_recipient = OptionVal ( ref )
this
}
2016-06-06 08:26:15 +02:00
def clear ( ) : Unit = {
2016-06-09 09:16:44 +02:00
_recipient = OptionVal . None
2016-06-06 08:26:15 +02:00
_recipientAddress = null
_message = null
2016-06-09 09:16:44 +02:00
_sender = OptionVal . None
2016-06-06 08:26:15 +02:00
_originUid = 0L
2016-06-10 13:04:23 +02:00
_association = OptionVal . None
2016-06-06 08:26:15 +02:00
}
def init (
2016-06-09 09:16:44 +02:00
recipient : OptionVal [ InternalActorRef ] ,
2016-06-06 08:26:15 +02:00
recipientAddress : Address ,
message : AnyRef ,
2016-06-09 09:16:44 +02:00
sender : OptionVal [ ActorRef ] ,
2016-06-10 13:04:23 +02:00
originUid : Long ,
association : OptionVal [ OutboundContext ] ) : Unit = {
2016-06-06 08:26:15 +02:00
_recipient = recipient
_recipientAddress = recipientAddress
_message = message
2016-06-09 09:16:44 +02:00
_sender = sender
2016-06-06 08:26:15 +02:00
_originUid = originUid
2016-06-10 13:04:23 +02:00
_association = association
2016-06-06 08:26:15 +02:00
}
override def toString : String =
2016-06-09 09:16:44 +02:00
s" InboundEnvelope( $recipient , $recipientAddress , $message , $sender , $originUid , $association ) "
2016-06-06 08:26:15 +02:00
}
2016-05-09 07:31:41 +02:00
/* *
* INTERNAL API
* Inbound API that is used by the stream stages .
* Separate trait to facilitate testing without real transport .
*/
private [ akka ] trait InboundContext {
/* *
* The local inbound address .
*/
def localAddress : UniqueAddress
/* *
2016-05-12 08:56:28 +02:00
* An inbound stage can send control message , e . g . a reply , to the origin
2016-05-13 08:06:13 +02:00
* address with this method . It will be sent over the control sub - channel .
2016-05-09 07:31:41 +02:00
*/
2016-05-12 08:56:28 +02:00
def sendControl ( to : Address , message : ControlMessage ) : Unit
2016-05-09 07:31:41 +02:00
/* *
* Lookup the outbound association for a given address .
*/
def association ( remoteAddress : Address ) : OutboundContext
2016-05-13 15:34:37 +02:00
2016-05-25 12:28:44 +02:00
/* *
* Lookup the outbound association for a given UID .
2016-06-05 15:40:06 +02:00
* Will return `OptionVal.None` if the UID is unknown , i . e .
* handshake not completed .
2016-05-25 12:28:44 +02:00
*/
2016-06-05 15:40:06 +02:00
def association ( uid : Long ) : OptionVal [ OutboundContext ]
2016-05-25 12:28:44 +02:00
def completeHandshake ( peer : UniqueAddress ) : Unit
2016-05-13 15:34:37 +02:00
}
/* *
* INTERNAL API
*/
private [ akka ] object AssociationState {
def apply ( ) : AssociationState =
2016-06-23 11:58:54 +02:00
new AssociationState (
incarnation = 1 ,
uniqueRemoteAddressPromise = Promise ( ) ,
quarantined = ImmutableLongMap . empty [ QuarantinedTimestamp ] ,
2016-07-01 11:54:57 +02:00
outboundCompression = NoOutboundCompressions )
2016-06-04 21:53:27 +02:00
2016-06-10 13:04:23 +02:00
final case class QuarantinedTimestamp ( nanoTime : Long ) {
override def toString : String =
s" Quarantined ${ TimeUnit . NANOSECONDS . toSeconds ( System . nanoTime ( ) - nanoTime ) } seconds ago "
2016-06-04 21:53:27 +02:00
}
2016-05-09 07:31:41 +02:00
}
2016-05-13 15:34:37 +02:00
/* *
* INTERNAL API
*/
private [ akka ] final class AssociationState (
2016-06-03 11:59:00 +02:00
val incarnation : Int ,
2016-05-13 15:34:37 +02:00
val uniqueRemoteAddressPromise : Promise [ UniqueAddress ] ,
2016-06-23 11:58:54 +02:00
val quarantined : ImmutableLongMap [ AssociationState . QuarantinedTimestamp ] ,
2016-07-01 11:54:57 +02:00
val outboundCompression : OutboundCompressions ) {
2016-06-10 13:04:23 +02:00
import AssociationState.QuarantinedTimestamp
2016-05-13 08:06:13 +02:00
2016-06-04 22:14:28 +02:00
// doesn't have to be volatile since it's only a cache changed once
private var uniqueRemoteAddressValueCache : Option [ UniqueAddress ] = null
2016-05-13 08:06:13 +02:00
/* *
* Full outbound address with UID for this association .
* Completed when by the handshake .
*/
def uniqueRemoteAddress : Future [ UniqueAddress ] = uniqueRemoteAddressPromise . future
2016-06-04 22:14:28 +02:00
def uniqueRemoteAddressValue ( ) : Option [ UniqueAddress ] = {
if ( uniqueRemoteAddressValueCache ne null )
uniqueRemoteAddressValueCache
else {
uniqueRemoteAddress . value match {
case Some ( Success ( peer ) ) ⇒
uniqueRemoteAddressValueCache = Some ( peer )
uniqueRemoteAddressValueCache
case _ ⇒ None
}
}
2016-05-13 15:34:37 +02:00
}
2016-07-01 11:54:57 +02:00
def newIncarnation ( remoteAddressPromise : Promise [ UniqueAddress ] , compression : OutboundCompressions ) : AssociationState =
2016-06-23 11:58:54 +02:00
new AssociationState ( incarnation + 1 , remoteAddressPromise , quarantined , compression )
2016-05-13 15:34:37 +02:00
def newQuarantined ( ) : AssociationState =
uniqueRemoteAddressPromise . future . value match {
case Some ( Success ( a ) ) ⇒
2016-06-23 11:58:54 +02:00
new AssociationState (
incarnation ,
uniqueRemoteAddressPromise ,
quarantined = quarantined . updated ( a . uid , QuarantinedTimestamp ( System . nanoTime ( ) ) ) ,
2016-07-01 11:54:57 +02:00
outboundCompression = NoOutboundCompressions ) // after quarantine no compression needed anymore, drop it
2016-05-13 15:34:37 +02:00
case _ ⇒ this
}
def isQuarantined ( ) : Boolean = {
uniqueRemoteAddressValue match {
2016-06-04 22:14:28 +02:00
case Some ( a ) ⇒ isQuarantined ( a . uid )
case _ ⇒ false // handshake not completed yet
2016-05-13 15:34:37 +02:00
}
}
2016-06-10 13:04:23 +02:00
def isQuarantined ( uid : Long ) : Boolean = quarantined . contains ( uid )
2016-05-13 15:34:37 +02:00
2016-05-13 08:06:13 +02:00
override def toString ( ) : String = {
val a = uniqueRemoteAddressPromise . future . value match {
case Some ( Success ( a ) ) ⇒ a
case Some ( Failure ( e ) ) ⇒ s" Failure( ${ e . getMessage } ) "
case None ⇒ "unknown"
}
s" AssociationState( $incarnation , $a ) "
}
2016-05-17 17:34:57 +02:00
2016-05-13 08:06:13 +02:00
}
2016-05-09 07:31:41 +02:00
/* *
* INTERNAL API
* Outbound association API that is used by the stream stages .
* Separate trait to facilitate testing without real transport .
*/
private [ akka ] trait OutboundContext {
/* *
* The local inbound address .
*/
def localAddress : UniqueAddress
/* *
* The outbound address for this association .
*/
def remoteAddress : Address
2016-05-13 08:06:13 +02:00
def associationState : AssociationState
2016-05-13 15:34:37 +02:00
def quarantine ( reason : String ) : Unit
2016-05-09 07:31:41 +02:00
/* *
2016-05-13 08:06:13 +02:00
* An inbound stage can send control message , e . g . a HandshakeReq , to the remote
* address of this association . It will be sent over the control sub - channel .
2016-05-09 07:31:41 +02:00
*/
2016-05-13 08:06:13 +02:00
def sendControl ( message : ControlMessage ) : Unit
2016-05-09 07:31:41 +02:00
/* *
2016-05-12 08:56:28 +02:00
* An outbound stage can listen to control messages
2016-05-09 07:31:41 +02:00
* via this observer subject .
*/
2016-05-12 08:56:28 +02:00
def controlSubject : ControlMessageSubject
2016-05-09 07:31:41 +02:00
// FIXME we should be able to Send without a recipient ActorRef
def dummyRecipient : RemoteActorRef
}
2016-06-23 18:11:56 +02:00
/* *
* INTERNAL API
*/
private [ remote ] object FlushOnShutdown {
def props ( done : Promise [ Done ] , timeout : FiniteDuration ,
inboundContext : InboundContext , associations : Set [ Association ] ) : Props = {
require ( associations . nonEmpty )
Props ( new FlushOnShutdown ( done , timeout , inboundContext , associations ) )
}
case object Timeout
}
/* *
* INTERNAL API
*/
private [ remote ] class FlushOnShutdown ( done : Promise [ Done ] , timeout : FiniteDuration ,
inboundContext : InboundContext , associations : Set [ Association ] ) extends Actor {
var remaining = associations . flatMap ( _ . associationState . uniqueRemoteAddressValue )
val timeoutTask = context . system . scheduler . scheduleOnce ( timeout , self , FlushOnShutdown . Timeout ) ( context . dispatcher )
override def preStart ( ) : Unit = {
val msg = ActorSystemTerminating ( inboundContext . localAddress )
associations . foreach { a ⇒ a . send ( msg , OptionVal . Some ( self ) , a . dummyRecipient ) }
}
override def postStop ( ) : Unit =
timeoutTask . cancel ( )
def receive = {
case ActorSystemTerminatingAck ( from ) ⇒
remaining -= from
if ( remaining . isEmpty ) {
done . trySuccess ( Done )
context . stop ( self )
}
case FlushOnShutdown . Timeout ⇒
done . trySuccess ( Done )
context . stop ( self )
}
}
2016-05-09 07:31:41 +02:00
/* *
* INTERNAL API
*/
private [ remote ] class ArteryTransport ( _system : ExtendedActorSystem , _provider : RemoteActorRefProvider )
extends RemoteTransport ( _system , _provider ) with InboundContext {
2016-06-06 13:36:05 +02:00
import FlightRecorderEvents._
2016-05-09 07:31:41 +02:00
// these vars are initialized once in the start method
@volatile private [ this ] var _localAddress : UniqueAddress = _
2016-05-27 16:45:48 +02:00
@volatile private [ this ] var _addresses : Set [ Address ] = _
2016-05-09 07:31:41 +02:00
@volatile private [ this ] var materializer : Materializer = _
2016-05-12 08:56:28 +02:00
@volatile private [ this ] var controlSubject : ControlMessageSubject = _
2016-05-09 07:31:41 +02:00
@volatile private [ this ] var messageDispatcher : MessageDispatcher = _
2016-06-01 11:56:18 +02:00
@volatile private [ this ] var mediaDriver : Option [ MediaDriver ] = None
2016-05-09 07:31:41 +02:00
@volatile private [ this ] var aeron : Aeron = _
2016-05-18 13:34:51 +02:00
@volatile private [ this ] var aeronErrorLogTask : Cancellable = _
2016-05-09 07:31:41 +02:00
2016-05-27 16:45:48 +02:00
override def localAddress : UniqueAddress = _localAddress
2016-05-09 07:31:41 +02:00
override def defaultAddress : Address = localAddress . address
2016-05-27 16:45:48 +02:00
override def addresses : Set [ Address ] = _addresses
2016-05-09 07:31:41 +02:00
override def localAddressForRemote ( remote : Address ) : Address = defaultAddress
2016-05-13 15:34:37 +02:00
override val log : LoggingAdapter = Logging ( system , getClass . getName )
2016-06-08 12:40:40 +02:00
val eventPublisher = new EventPublisher ( system , log , remoteSettings . RemoteLifecycleEventsLogLevel )
2016-05-09 07:31:41 +02:00
private val codec : AkkaPduCodec = AkkaPduProtobufCodec
private val killSwitch : SharedKillSwitch = KillSwitches . shared ( "transportKillSwitch" )
2016-05-17 17:34:57 +02:00
@volatile private [ this ] var _shutdown = false
2016-05-12 11:42:09 +02:00
2016-06-02 07:21:32 +02:00
private val testStages : CopyOnWriteArrayList [ TestManagementApi ] = new CopyOnWriteArrayList
2016-05-12 11:42:09 +02:00
// FIXME config
private val systemMessageResendInterval : FiniteDuration = 1. second
2016-05-17 17:34:57 +02:00
private val handshakeRetryInterval : FiniteDuration = 1. second
private val handshakeTimeout : FiniteDuration =
2016-06-03 11:59:00 +02:00
system . settings . config . getMillisDuration ( "akka.remote.handshake-timeout" ) . requiring (
_ > Duration . Zero ,
2016-05-17 17:34:57 +02:00
"handshake-timeout must be > 0" )
2016-05-25 12:28:44 +02:00
private val injectHandshakeInterval : FiniteDuration = 1. second
2016-05-19 08:24:27 +02:00
private val giveUpSendAfter : FiniteDuration = 60. seconds
2016-06-23 18:11:56 +02:00
private val shutdownFlushTimeout = 1. second
private val remoteDispatcher = system . dispatchers . lookup ( remoteSettings . Dispatcher )
2016-05-09 07:31:41 +02:00
2016-05-20 12:40:56 +02:00
private val largeMessageDestinations =
system . settings . config . getStringList ( "akka.remote.artery.large-message-destinations" ) . asScala . foldLeft ( WildcardTree [ NotUsed ] ( ) ) { ( tree , entry ) ⇒
val segments = entry . split ( '/' ) . tail
tree . insert ( segments . iterator , NotUsed )
}
private val largeMessageDestinationsEnabled = largeMessageDestinations . children . nonEmpty
2016-05-09 07:31:41 +02:00
private def inboundChannel = s" aeron:udp?endpoint= ${ localAddress . address . host . get } : ${ localAddress . address . port . get } "
private def outboundChannel ( a : Address ) = s" aeron:udp?endpoint= ${ a . host . get } : ${ a . port . get } "
2016-05-12 08:56:28 +02:00
private val controlStreamId = 1
2016-05-09 07:31:41 +02:00
private val ordinaryStreamId = 3
2016-05-20 12:40:56 +02:00
private val largeStreamId = 4
2016-06-10 07:41:36 +02:00
private val taskRunner = new TaskRunner ( system , remoteSettings . IdleCpuLevel )
2016-05-09 07:31:41 +02:00
2016-05-17 17:34:57 +02:00
private val restartTimeout : FiniteDuration = 5. seconds // FIXME config
private val maxRestarts = 5 // FIXME config
private val restartCounter = new RestartCounter ( maxRestarts , restartTimeout )
2016-06-06 08:26:15 +02:00
private val envelopePool = new EnvelopeBufferPool ( ArteryTransport . MaximumFrameSize , ArteryTransport . MaximumPooledBuffers )
private val largeEnvelopePool = new EnvelopeBufferPool ( ArteryTransport . MaximumLargeFrameSize , ArteryTransport . MaximumPooledBuffers )
private val inboundEnvelopePool = new ObjectPool [ InboundEnvelope ] (
16 ,
create = ( ) ⇒ new ReusableInboundEnvelope , clear = inEnvelope ⇒ inEnvelope . asInstanceOf [ ReusableInboundEnvelope ] . clear ( ) )
2016-05-20 12:40:56 +02:00
2016-06-29 12:36:17 +02:00
val ( afrFileChannel , afrFlie , flightRecorder ) = initializeFlightRecorder ( ) match {
case None ⇒ ( None , None , None )
case Some ( ( c , f , r ) ) ⇒ ( Some ( c ) , Some ( f ) , Some ( r ) )
}
def createFlightRecorderEventSink ( synchr : Boolean = false ) : EventSink = {
flightRecorder match {
case Some ( f ) ⇒
val eventSink = f . createEventSink ( )
if ( synchr ) new SynchronizedEventSink ( eventSink )
else eventSink
case None ⇒
IgnoreEventSink
}
2016-06-23 18:11:56 +02:00
}
2016-06-29 12:36:17 +02:00
private val topLevelFREvents =
createFlightRecorderEventSink ( synchr = true )
2016-06-06 13:36:05 +02:00
2016-05-25 12:28:44 +02:00
private val associationRegistry = new AssociationRegistry (
remoteAddress ⇒ new Association ( this , materializer , remoteAddress , controlSubject , largeMessageDestinations ) )
2016-06-02 07:21:32 +02:00
def remoteSettings : RemoteSettings = provider . remoteSettings
2016-05-09 07:31:41 +02:00
override def start ( ) : Unit = {
startMediaDriver ( )
startAeron ( )
2016-06-06 13:36:05 +02:00
topLevelFREvents . loFreq ( Transport_AeronStarted , NoMetaData )
2016-05-18 13:34:51 +02:00
startAeronErrorLog ( )
2016-06-06 13:36:05 +02:00
topLevelFREvents . loFreq ( Transport_AeronErrorLogStarted , NoMetaData )
2016-05-09 07:31:41 +02:00
taskRunner . start ( )
2016-06-06 13:36:05 +02:00
topLevelFREvents . loFreq ( Transport_TaskRunnerStarted , NoMetaData )
2016-05-09 07:31:41 +02:00
2016-05-17 14:17:21 +02:00
val port =
if ( remoteSettings . ArteryPort == 0 ) ArteryTransport . autoSelectPort ( remoteSettings . ArteryHostname )
else remoteSettings . ArteryPort
2016-05-09 07:31:41 +02:00
// TODO: Configure materializer properly
// TODO: Have a supervisor actor
_localAddress = UniqueAddress (
2016-05-27 16:45:48 +02:00
Address ( ArteryTransport . ProtocolName , system . name , remoteSettings . ArteryHostname , port ) ,
2016-05-25 12:28:44 +02:00
AddressUidExtension ( system ) . longAddressUid )
2016-05-27 16:45:48 +02:00
_addresses = Set ( _localAddress . address )
2016-05-29 19:41:09 +02:00
2016-06-06 13:36:05 +02:00
// TODO: This probably needs to be a global value instead of an event as events might rotate out of the log
topLevelFREvents . loFreq ( Transport_UniqueAddressSet , _localAddress . toString ( ) . getBytes ( "US-ASCII" ) )
2016-05-29 19:41:09 +02:00
val materializerSettings = ActorMaterializerSettings (
remoteSettings . config . getConfig ( "akka.remote.artery.advanced.materializer" ) )
2016-06-23 18:11:56 +02:00
materializer = ActorMaterializer . systemMaterializer ( materializerSettings , "remote" , system )
2016-05-09 07:31:41 +02:00
messageDispatcher = new MessageDispatcher ( system , provider )
2016-06-06 13:36:05 +02:00
topLevelFREvents . loFreq ( Transport_MaterializerStarted , NoMetaData )
2016-05-09 07:31:41 +02:00
2016-05-17 17:34:57 +02:00
runInboundStreams ( )
2016-06-06 13:36:05 +02:00
topLevelFREvents . loFreq ( Transport_StartupFinished , NoMetaData )
2016-05-18 09:22:22 +02:00
log . info ( "Remoting started; listening on address: {}" , defaultAddress )
2016-05-09 07:31:41 +02:00
}
2016-06-08 18:38:54 +02:00
private lazy val stopMediaDriverShutdownHook = new Thread {
override def run ( ) : Unit = stopMediaDriver ( )
}
2016-05-09 07:31:41 +02:00
private def startMediaDriver ( ) : Unit = {
2016-06-01 11:56:18 +02:00
if ( remoteSettings . EmbeddedMediaDriver ) {
val driverContext = new MediaDriver . Context
if ( remoteSettings . AeronDirectoryName . nonEmpty )
driverContext . aeronDirectoryName ( remoteSettings . AeronDirectoryName )
// FIXME settings from config
2016-06-10 07:41:36 +02:00
driverContext . conductorIdleStrategy ( )
2016-06-01 11:56:18 +02:00
driverContext . clientLivenessTimeoutNs ( SECONDS . toNanos ( 20 ) )
driverContext . imageLivenessTimeoutNs ( SECONDS . toNanos ( 20 ) )
driverContext . driverTimeoutMs ( SECONDS . toNanos ( 20 ) )
2016-06-10 07:41:36 +02:00
if ( remoteSettings . IdleCpuLevel == 10 ) {
driverContext
. threadingMode ( ThreadingMode . DEDICATED )
. conductorIdleStrategy ( new BackoffIdleStrategy ( 1 , 1 , 1 , 1 ) )
. receiverIdleStrategy ( new BusySpinIdleStrategy )
. senderIdleStrategy ( new BusySpinIdleStrategy ) ;
} else if ( remoteSettings . IdleCpuLevel == 1 ) {
driverContext
. threadingMode ( ThreadingMode . SHARED )
//FIXME measure: .sharedIdleStrategy(new BackoffIdleStrategy(20, 50, 1, 200))
} else if ( remoteSettings . IdleCpuLevel <= 5 ) {
driverContext
. threadingMode ( ThreadingMode . SHARED_NETWORK )
//FIXME measure: .sharedNetworkIdleStrategy(new BackoffIdleStrategy(20, 50, 1, 20 * (11 - remoteSettings.IdleCpuLevel)))
}
2016-06-01 11:56:18 +02:00
val driver = MediaDriver . launchEmbedded ( driverContext )
log . debug ( "Started embedded media driver in directory [{}]" , driver . aeronDirectoryName )
2016-06-06 13:36:05 +02:00
topLevelFREvents . loFreq ( Transport_MediaDriverStarted , driver . aeronDirectoryName ( ) . getBytes ( "US-ASCII" ) )
2016-06-08 18:38:54 +02:00
Runtime . getRuntime . addShutdownHook ( stopMediaDriverShutdownHook )
2016-06-01 11:56:18 +02:00
mediaDriver = Some ( driver )
}
}
private def aeronDir : String = mediaDriver match {
case Some ( driver ) ⇒ driver . aeronDirectoryName
case None ⇒ remoteSettings . AeronDirectoryName
2016-05-09 07:31:41 +02:00
}
2016-06-08 18:38:54 +02:00
private def stopMediaDriver ( ) : Unit = {
mediaDriver . foreach { driver ⇒
// this is only for embedded media driver
driver . close ( )
try {
// FIXME it should also be configurable to not delete dir
IoUtil . delete ( new File ( driver . aeronDirectoryName ) , false )
} catch {
case NonFatal ( e ) ⇒
log . warning (
"Couldn't delete Aeron embedded media driver files in [{}] due to [{}]" ,
driver . aeronDirectoryName , e . getMessage )
}
}
Try ( Runtime . getRuntime . removeShutdownHook ( stopMediaDriverShutdownHook ) )
}
2016-06-06 13:36:05 +02:00
// TODO: Add FR events
2016-05-09 07:31:41 +02:00
private def startAeron ( ) : Unit = {
val ctx = new Aeron . Context
ctx . availableImageHandler ( new AvailableImageHandler {
override def onAvailableImage ( img : Image ) : Unit = {
if ( log . isDebugEnabled )
log . debug ( s" onAvailableImage from ${ img . sourceIdentity } session ${ img . sessionId } " )
}
} )
ctx . unavailableImageHandler ( new UnavailableImageHandler {
override def onUnavailableImage ( img : Image ) : Unit = {
if ( log . isDebugEnabled )
log . debug ( s" onUnavailableImage from ${ img . sourceIdentity } session ${ img . sessionId } " )
// FIXME we should call FragmentAssembler.freeSessionBuffer when image is unavailable
}
} )
ctx . errorHandler ( new ErrorHandler {
override def onError ( cause : Throwable ) : Unit = {
cause match {
case e : ConductorServiceTimeoutException ⇒
// Timeout between service calls
log . error ( cause , s" Aeron ServiceTimeoutException, ${ cause . getMessage } " )
case _ ⇒
log . error ( cause , s" Aeron error, ${ cause . getMessage } " )
}
}
} )
2016-06-01 11:56:18 +02:00
ctx . aeronDirectoryName ( aeronDir )
2016-05-09 07:31:41 +02:00
aeron = Aeron . connect ( ctx )
}
2016-06-06 13:36:05 +02:00
// TODO Add FR Events
2016-05-18 13:34:51 +02:00
private def startAeronErrorLog ( ) : Unit = {
2016-06-01 11:56:18 +02:00
val errorLog = new AeronErrorLog ( new File ( aeronDir , CncFileDescriptor . CNC_FILE ) )
2016-05-18 13:34:51 +02:00
val lastTimestamp = new AtomicLong ( 0L )
import system.dispatcher // FIXME perhaps use another dispatcher for this
aeronErrorLogTask = system . scheduler . schedule ( 3. seconds , 5. seconds ) {
if ( ! isShutdown ) {
val newLastTimestamp = errorLog . logErrors ( log , lastTimestamp . get )
lastTimestamp . set ( newLastTimestamp + 1 )
}
}
}
2016-05-17 17:34:57 +02:00
private def runInboundStreams ( ) : Unit = {
2016-07-01 11:54:57 +02:00
val noCompressions = new NoInboundCompressions ( system ) // TODO possibly enable on other channels too https://github.com/akka/akka/pull/20546/files#r68074082
val compressions = createInboundCompressions ( this )
2016-06-23 11:58:54 +02:00
2016-07-01 11:54:57 +02:00
runInboundControlStream ( noCompressions )
runInboundOrdinaryMessagesStream ( compressions )
2016-05-20 12:40:56 +02:00
if ( largeMessageDestinationsEnabled ) {
runInboundLargeMessagesStream ( )
}
2016-05-17 17:34:57 +02:00
}
2016-07-01 11:54:57 +02:00
private def runInboundControlStream ( compression : InboundCompressions ) : Unit = {
2016-06-02 07:21:32 +02:00
val ( ctrl , completed ) =
if ( remoteSettings . TestMode ) {
val ( mgmt , ( ctrl , completed ) ) =
aeronSource ( controlStreamId , envelopePool )
2016-06-23 11:58:54 +02:00
. via ( inboundFlow ( compression ) )
2016-06-02 07:21:32 +02:00
. viaMat ( inboundTestFlow ) ( Keep . right )
. toMat ( inboundControlSink ) ( Keep . both )
. run ( ) ( materializer )
testStages . add ( mgmt )
( ctrl , completed )
} else {
aeronSource ( controlStreamId , envelopePool )
2016-06-23 11:58:54 +02:00
. via ( inboundFlow ( compression ) )
2016-06-02 07:21:32 +02:00
. toMat ( inboundControlSink ) ( Keep . right )
. run ( ) ( materializer )
}
controlSubject = ctrl
2016-05-09 07:31:41 +02:00
2016-05-13 15:34:37 +02:00
controlSubject . attach ( new ControlMessageObserver {
override def notify ( inboundEnvelope : InboundEnvelope ) : Unit = {
2016-05-17 17:34:57 +02:00
inboundEnvelope . message match {
2016-06-23 11:58:54 +02:00
case m : CompressionMessage ⇒
m match {
2016-07-01 11:54:57 +02:00
case CompressionProtocol . ActorRefCompressionAdvertisement ( from , table ) ⇒
log . debug ( "Incoming ActorRef compression advertisement from [{}], table: [{}]" , from , table )
association ( from . address ) . compression . applyActorRefCompressionTable ( table )
system . eventStream . publish ( CompressionProtocol . Events . ReceivedCompressionTable ( from , table ) )
case CompressionProtocol . ClassManifestCompressionAdvertisement ( from , table ) ⇒
log . debug ( "Incoming Class Manifest compression advertisement from [{}], table: [{}]" , from , table )
association ( from . address ) . compression . applyClassManifestCompressionTable ( table )
system . eventStream . publish ( CompressionProtocol . Events . ReceivedCompressionTable ( from , table ) )
2016-06-23 11:58:54 +02:00
}
2016-06-23 18:11:56 +02:00
case Quarantined ( from , to ) if to == localAddress ⇒
val lifecycleEvent = ThisActorSystemQuarantinedEvent ( localAddress . address , from . address )
publishLifecycleEvent ( lifecycleEvent )
// quarantine the other system from here
association ( from . address ) . quarantine ( lifecycleEvent . toString , Some ( from . uid ) )
case _ : ActorSystemTerminating ⇒
inboundEnvelope . sender match {
case OptionVal . Some ( snd ) ⇒ snd . tell ( ActorSystemTerminatingAck ( localAddress ) , ActorRef . noSender )
case OptionVal . None ⇒ log . error ( "Expected sender for ActorSystemTerminating message" )
}
case _ ⇒ // not interesting
2016-05-17 17:34:57 +02:00
}
2016-06-23 18:11:56 +02:00
}
2016-05-17 17:34:57 +02:00
} )
2016-06-23 11:58:54 +02:00
attachStreamRestart ( "Inbound control stream" , completed , ( ) ⇒ runInboundControlStream ( compression ) )
2016-05-17 17:34:57 +02:00
}
2016-07-01 11:54:57 +02:00
private def runInboundOrdinaryMessagesStream ( compression : InboundCompressions ) : Unit = {
2016-06-02 07:21:32 +02:00
val completed =
if ( remoteSettings . TestMode ) {
val ( mgmt , c ) = aeronSource ( ordinaryStreamId , envelopePool )
2016-06-23 11:58:54 +02:00
. via ( inboundFlow ( compression ) )
2016-06-02 07:21:32 +02:00
. viaMat ( inboundTestFlow ) ( Keep . right )
. toMat ( inboundSink ) ( Keep . both )
. run ( ) ( materializer )
testStages . add ( mgmt )
c
} else {
aeronSource ( ordinaryStreamId , envelopePool )
2016-06-23 11:58:54 +02:00
. via ( inboundFlow ( compression ) )
2016-06-02 07:21:32 +02:00
. toMat ( inboundSink ) ( Keep . right )
. run ( ) ( materializer )
}
2016-05-17 17:34:57 +02:00
2016-06-23 11:58:54 +02:00
attachStreamRestart ( "Inbound message stream" , completed , ( ) ⇒ runInboundOrdinaryMessagesStream ( compression ) )
2016-05-17 17:34:57 +02:00
}
2016-05-20 12:40:56 +02:00
private def runInboundLargeMessagesStream ( ) : Unit = {
2016-07-01 11:54:57 +02:00
val compression = new NoInboundCompressions ( system ) // no compression on large message stream for now
2016-06-23 11:58:54 +02:00
2016-06-02 07:21:32 +02:00
val completed =
if ( remoteSettings . TestMode ) {
val ( mgmt , c ) = aeronSource ( largeStreamId , largeEnvelopePool )
2016-06-23 11:58:54 +02:00
. via ( inboundLargeFlow ( compression ) )
2016-06-02 07:21:32 +02:00
. viaMat ( inboundTestFlow ) ( Keep . right )
. toMat ( inboundSink ) ( Keep . both )
. run ( ) ( materializer )
testStages . add ( mgmt )
c
} else {
aeronSource ( largeStreamId , largeEnvelopePool )
2016-06-23 11:58:54 +02:00
. via ( inboundLargeFlow ( compression ) )
2016-06-02 07:21:32 +02:00
. toMat ( inboundSink ) ( Keep . right )
. run ( ) ( materializer )
}
2016-05-26 10:42:08 +02:00
attachStreamRestart ( "Inbound large message stream" , completed , ( ) ⇒ runInboundLargeMessagesStream ( ) )
2016-05-20 12:40:56 +02:00
}
2016-05-17 17:34:57 +02:00
private def attachStreamRestart ( streamName : String , streamCompleted : Future [ Done ] , restart : ( ) ⇒ Unit ) : Unit = {
implicit val ec = materializer . executionContext
streamCompleted . onFailure {
2016-05-19 08:24:27 +02:00
case _ if isShutdown ⇒ // don't restart after shutdown
2016-05-17 17:34:57 +02:00
case _ : AbruptTerminationException ⇒ // ActorSystem shutdown
case cause ⇒
2016-05-19 08:24:27 +02:00
if ( restartCounter . restart ( ) ) {
log . error ( cause , "{} failed. Restarting it. {}" , streamName , cause . getMessage )
restart ( )
} else {
log . error ( cause , "{} failed and restarted {} times within {} seconds. Terminating system. {}" ,
streamName , maxRestarts , restartTimeout . toSeconds , cause . getMessage )
system . terminate ( )
}
2016-05-17 17:34:57 +02:00
}
2016-05-09 07:31:41 +02:00
}
override def shutdown ( ) : Future [ Done ] = {
2016-05-17 17:34:57 +02:00
_shutdown = true
2016-06-23 18:11:56 +02:00
val allAssociations = associationRegistry . allAssociations
val flushing : Future [ Done ] =
if ( allAssociations . isEmpty ) Future . successful ( Done )
else {
val flushingPromise = Promise [ Done ] ( )
system . systemActorOf ( FlushOnShutdown . props ( flushingPromise , shutdownFlushTimeout ,
this , allAssociations ) . withDispatcher ( remoteSettings . Dispatcher ) , "remoteFlushOnShutdown" )
flushingPromise . future
}
implicit val ec = remoteDispatcher
flushing . recover { case _ ⇒ Done } . map { _ ⇒
killSwitch . shutdown ( )
topLevelFREvents . loFreq ( Transport_KillSwitchPulled , NoMetaData )
if ( taskRunner != null ) {
taskRunner . stop ( )
topLevelFREvents . loFreq ( Transport_Stopped , NoMetaData )
}
if ( aeronErrorLogTask != null ) {
aeronErrorLogTask . cancel ( )
topLevelFREvents . loFreq ( Transport_AeronErrorLogTaskStopped , NoMetaData )
}
if ( aeron != null ) aeron . close ( )
if ( mediaDriver . isDefined ) {
stopMediaDriver ( )
topLevelFREvents . loFreq ( Transport_MediaFileDeleted , NoMetaData )
}
topLevelFREvents . loFreq ( Transport_FlightRecorderClose , NoMetaData )
2016-06-29 12:36:17 +02:00
flightRecorder . foreach ( _ . close ( ) )
afrFileChannel . foreach ( _ . force ( true ) )
afrFileChannel . foreach ( _ . close ( ) )
2016-06-23 18:11:56 +02:00
// TODO: Be smarter about this in tests and make it always-on-for prod
2016-06-29 12:36:17 +02:00
afrFlie . foreach ( _ . delete ( ) )
2016-06-23 18:11:56 +02:00
Done
2016-05-09 07:31:41 +02:00
}
}
2016-06-23 11:58:54 +02:00
private [ remote ] def isShutdown : Boolean = _shutdown
2016-05-17 17:34:57 +02:00
2016-06-02 07:21:32 +02:00
override def managementCommand ( cmd : Any ) : Future [ Boolean ] = {
if ( testStages . isEmpty )
Future . successful ( false )
else {
import scala.collection.JavaConverters._
import system.dispatcher
val allTestStages = testStages . asScala . toVector ++ associationRegistry . allAssociations . flatMap ( _ . testStages )
Future . sequence ( allTestStages . map ( _ . send ( cmd ) ) ) . map ( _ ⇒ true )
}
}
2016-05-09 07:31:41 +02:00
// InboundContext
2016-05-12 08:56:28 +02:00
override def sendControl ( to : Address , message : ControlMessage ) =
2016-05-29 22:15:48 +02:00
association ( to ) . sendControl ( message )
2016-05-09 07:31:41 +02:00
2016-06-09 09:16:44 +02:00
override def send ( message : Any , sender : OptionVal [ ActorRef ] , recipient : RemoteActorRef ) : Unit = {
2016-05-09 07:31:41 +02:00
val cached = recipient . cachedAssociation
val a =
if ( cached ne null ) cached
2016-05-27 16:45:48 +02:00
else {
val a2 = association ( recipient . path . address )
recipient . cachedAssociation = a2
a2
}
2016-05-09 07:31:41 +02:00
2016-06-09 09:16:44 +02:00
a . send ( message , sender , recipient )
2016-05-09 07:31:41 +02:00
}
2016-06-23 11:58:54 +02:00
override def association ( remoteAddress : Address ) : Association = {
require ( remoteAddress != localAddress . address , "Attemted association with self address!" )
2016-05-25 12:28:44 +02:00
associationRegistry . association ( remoteAddress )
2016-06-23 11:58:54 +02:00
}
2016-05-25 12:28:44 +02:00
2016-06-05 15:40:06 +02:00
override def association ( uid : Long ) : OptionVal [ Association ] =
2016-05-25 12:28:44 +02:00
associationRegistry . association ( uid )
override def completeHandshake ( peer : UniqueAddress ) : Unit = {
val a = associationRegistry . setUID ( peer )
a . completeHandshake ( peer )
2016-05-09 07:31:41 +02:00
}
2016-05-13 15:34:37 +02:00
private def publishLifecycleEvent ( event : RemotingLifecycleEvent ) : Unit =
eventPublisher . notifyListeners ( event )
2016-05-25 12:28:44 +02:00
override def quarantine ( remoteAddress : Address , uid : Option [ Int ] ) : Unit = {
// FIXME change the method signature (old remoting) to include reason and use Long uid?
association ( remoteAddress ) . quarantine ( reason = "" , uid . map ( _ . toLong ) )
}
2016-05-09 07:31:41 +02:00
2016-07-01 11:54:57 +02:00
def outbound ( outboundContext : OutboundContext , compression : OutboundCompressions ) : Sink [ Send , Future [ Done ] ] = {
2016-05-09 07:31:41 +02:00
Flow . fromGraph ( killSwitch . flow [ Send ] )
2016-06-23 11:58:54 +02:00
. via ( new OutboundHandshake ( system , outboundContext , handshakeTimeout , handshakeRetryInterval , injectHandshakeInterval ) )
. via ( encoder ( compression ) )
2016-05-19 08:24:27 +02:00
. toMat ( new AeronSink ( outboundChannel ( outboundContext . remoteAddress ) , ordinaryStreamId , aeron , taskRunner ,
2016-06-23 18:11:56 +02:00
envelopePool , giveUpSendAfter , createFlightRecorderEventSink ( ) ) ) ( Keep . right )
2016-05-09 07:31:41 +02:00
}
2016-07-01 11:54:57 +02:00
def outboundLarge ( outboundContext : OutboundContext , compression : OutboundCompressions ) : Sink [ Send , Future [ Done ] ] = {
2016-05-26 10:42:08 +02:00
Flow . fromGraph ( killSwitch . flow [ Send ] )
2016-06-23 11:58:54 +02:00
. via ( new OutboundHandshake ( system , outboundContext , handshakeTimeout , handshakeRetryInterval , injectHandshakeInterval ) )
. via ( createEncoder ( largeEnvelopePool , compression ) )
2016-05-26 10:42:08 +02:00
. toMat ( new AeronSink ( outboundChannel ( outboundContext . remoteAddress ) , largeStreamId , aeron , taskRunner ,
2016-06-23 18:11:56 +02:00
envelopePool , giveUpSendAfter , createFlightRecorderEventSink ( ) ) ) ( Keep . right )
2016-05-20 12:40:56 +02:00
}
2016-07-01 11:54:57 +02:00
def outboundControl ( outboundContext : OutboundContext , compression : OutboundCompressions ) : Sink [ Send , ( OutboundControlIngress , Future [ Done ] ) ] = {
2016-05-09 07:31:41 +02:00
Flow . fromGraph ( killSwitch . flow [ Send ] )
2016-06-23 11:58:54 +02:00
. via ( new OutboundHandshake ( system , outboundContext , handshakeTimeout , handshakeRetryInterval , injectHandshakeInterval ) )
2016-06-08 12:40:40 +02:00
. via ( new SystemMessageDelivery ( outboundContext , system . deadLetters , systemMessageResendInterval ,
remoteSettings . SysMsgBufferSize ) )
2016-05-12 08:56:28 +02:00
. viaMat ( new OutboundControlJunction ( outboundContext ) ) ( Keep . right )
2016-06-23 11:58:54 +02:00
. via ( encoder ( compression ) )
2016-05-19 08:24:27 +02:00
. toMat ( new AeronSink ( outboundChannel ( outboundContext . remoteAddress ) , controlStreamId , aeron , taskRunner ,
2016-06-23 18:11:56 +02:00
envelopePool , Duration . Inf , createFlightRecorderEventSink ( ) ) ) ( Keep . both )
2016-05-13 15:34:37 +02:00
// FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages
2016-05-09 07:31:41 +02:00
}
2016-07-01 11:54:57 +02:00
def createEncoder ( compression : OutboundCompressions , bufferPool : EnvelopeBufferPool ) : Flow [ Send , EnvelopeBuffer , NotUsed ] =
2016-06-06 08:26:15 +02:00
Flow . fromGraph ( new Encoder ( localAddress , system , compression , bufferPool ) )
2016-05-26 10:42:08 +02:00
2016-07-01 11:54:57 +02:00
private def createInboundCompressions ( inboundContext : InboundContext ) : InboundCompressions =
if ( remoteSettings . ArteryCompressionSettings . enabled ) new InboundCompressionsImpl ( system , inboundContext )
else new NoInboundCompressions ( system )
2016-06-23 11:58:54 +02:00
2016-07-01 11:54:57 +02:00
def createEncoder ( pool : EnvelopeBufferPool , compression : OutboundCompressions ) : Flow [ Send , EnvelopeBuffer , NotUsed ] =
2016-06-23 11:58:54 +02:00
Flow . fromGraph ( new Encoder ( localAddress , system , compression , pool ) )
2016-07-01 11:54:57 +02:00
def encoder ( compression : OutboundCompressions ) : Flow [ Send , EnvelopeBuffer , NotUsed ] = createEncoder ( envelopePool , compression )
2016-05-09 07:31:41 +02:00
2016-06-02 07:21:32 +02:00
def aeronSource ( streamId : Int , pool : EnvelopeBufferPool ) : Source [ EnvelopeBuffer , NotUsed ] =
Source . fromGraph ( new AeronSource ( inboundChannel , streamId , aeron , taskRunner , pool ,
2016-06-23 18:11:56 +02:00
createFlightRecorderEventSink ( ) ) )
2016-06-02 07:21:32 +02:00
2016-05-09 07:31:41 +02:00
val messageDispatcherSink : Sink [ InboundEnvelope , Future [ Done ] ] = Sink . foreach [ InboundEnvelope ] { m ⇒
2016-06-09 09:16:44 +02:00
messageDispatcher . dispatch ( m . recipient . get , m . recipientAddress , m . message , m . sender )
2016-06-06 08:26:15 +02:00
inboundEnvelopePool . release ( m )
2016-05-09 07:31:41 +02:00
}
2016-07-01 11:54:57 +02:00
def createDecoder ( compression : InboundCompressions , bufferPool : EnvelopeBufferPool ) : Flow [ EnvelopeBuffer , InboundEnvelope , NotUsed ] = {
2016-05-26 10:42:08 +02:00
val resolveActorRefWithLocalAddress : String ⇒ InternalActorRef =
recipient ⇒ provider . resolveActorRefWithLocalAddress ( recipient , localAddress . address )
2016-06-10 13:04:23 +02:00
Flow . fromGraph ( new Decoder ( this , system , resolveActorRefWithLocalAddress , compression , bufferPool ,
2016-06-06 08:26:15 +02:00
inboundEnvelopePool ) )
2016-05-26 10:42:08 +02:00
}
2016-05-09 07:31:41 +02:00
2016-07-01 11:54:57 +02:00
def decoder ( compression : InboundCompressions ) : Flow [ EnvelopeBuffer , InboundEnvelope , NotUsed ] =
2016-06-23 11:58:54 +02:00
createDecoder ( compression , envelopePool )
2016-05-26 10:42:08 +02:00
2016-06-02 07:21:32 +02:00
def inboundSink : Sink [ InboundEnvelope , Future [ Done ] ] =
2016-05-26 10:42:08 +02:00
Flow [ InboundEnvelope ]
. via ( new InboundHandshake ( this , inControlStream = false ) )
. via ( new InboundQuarantineCheck ( this ) )
2016-06-02 07:21:32 +02:00
. toMat ( messageDispatcherSink ) ( Keep . right )
2016-05-26 10:42:08 +02:00
2016-07-01 11:54:57 +02:00
def inboundFlow ( compression : InboundCompressions ) : Flow [ EnvelopeBuffer , InboundEnvelope , NotUsed ] = {
2016-06-02 07:21:32 +02:00
Flow [ EnvelopeBuffer ]
. via ( killSwitch . flow )
2016-06-23 11:58:54 +02:00
. via ( decoder ( compression ) )
2016-05-26 10:42:08 +02:00
}
2016-07-01 11:54:57 +02:00
def inboundLargeFlow ( compression : InboundCompressions ) : Flow [ EnvelopeBuffer , InboundEnvelope , NotUsed ] = {
2016-06-02 07:21:32 +02:00
Flow [ EnvelopeBuffer ]
. via ( killSwitch . flow )
2016-06-23 11:58:54 +02:00
. via ( createDecoder ( compression , largeEnvelopePool ) )
2016-05-09 07:31:41 +02:00
}
2016-06-02 07:21:32 +02:00
def inboundControlSink : Sink [ InboundEnvelope , ( ControlMessageSubject , Future [ Done ] ) ] = {
Flow [ InboundEnvelope ]
. via ( new InboundHandshake ( this , inControlStream = true ) )
. via ( new InboundQuarantineCheck ( this ) )
. viaMat ( new InboundControlJunction ) ( Keep . right )
. via ( new SystemMessageAcker ( this ) )
. toMat ( messageDispatcherSink ) ( Keep . both )
2016-05-09 07:31:41 +02:00
}
2016-06-29 12:36:17 +02:00
private def initializeFlightRecorder ( ) : Option [ ( FileChannel , File , FlightRecorder ) ] = {
if ( remoteSettings . FlightRecorderEnabled ) {
// TODO: Figure out where to put it, currently using temporary files
val afrFile = File . createTempFile ( "artery" , ".afr" )
afrFile . deleteOnExit ( )
2016-06-06 13:36:05 +02:00
2016-06-29 12:36:17 +02:00
val fileChannel = FlightRecorder . prepareFileForFlightRecorder ( afrFile )
Some ( ( fileChannel , afrFile , new FlightRecorder ( fileChannel ) ) )
} else
None
2016-06-06 13:36:05 +02:00
}
2016-06-02 07:21:32 +02:00
def inboundTestFlow : Flow [ InboundEnvelope , InboundEnvelope , TestManagementApi ] =
Flow . fromGraph ( new InboundTestStage ( this ) )
def outboundTestFlow ( association : Association ) : Flow [ Send , Send , TestManagementApi ] =
Flow . fromGraph ( new OutboundTestStage ( association ) )
2016-05-09 07:31:41 +02:00
}
2016-05-05 14:38:48 +02:00
/* *
* INTERNAL API
*/
private [ remote ] object ArteryTransport {
2016-05-27 16:45:48 +02:00
val ProtocolName = "artery"
2016-05-05 14:38:48 +02:00
val Version = 0
val MaximumFrameSize = 1024 * 1024
val MaximumPooledBuffers = 256
2016-05-20 12:40:56 +02:00
val MaximumLargeFrameSize = MaximumFrameSize * 5
2016-05-17 14:17:21 +02:00
/* *
* Internal API
2016-05-05 14:38:48 +02:00
*
2016-05-17 14:17:21 +02:00
* @return A port that is hopefully available
*/
private [ remote ] def autoSelectPort ( hostname : String ) : Int = {
val socket = DatagramChannel . open ( ) . socket ( )
socket . bind ( new InetSocketAddress ( hostname , 0 ) )
val port = socket . getLocalPort
socket . close ( )
port
}
}