add/change private visibility
This commit is contained in:
parent
b116810578
commit
74df8226de
23 changed files with 90 additions and 58 deletions
|
|
@ -27,7 +27,10 @@ import io.aeron.Publication
|
||||||
import org.agrona.concurrent.UnsafeBuffer
|
import org.agrona.concurrent.UnsafeBuffer
|
||||||
import org.agrona.hints.ThreadHints
|
import org.agrona.hints.ThreadHints
|
||||||
|
|
||||||
object AeronSink {
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[remote] object AeronSink {
|
||||||
|
|
||||||
final class GaveUpMessageException(msg: String) extends RuntimeException(msg) with NoStackTrace
|
final class GaveUpMessageException(msg: String) extends RuntimeException(msg) with NoStackTrace
|
||||||
|
|
||||||
|
|
@ -73,9 +76,10 @@ object AeronSink {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* INTERNAL API
|
||||||
* @param channel eg. "aeron:udp?endpoint=localhost:40123"
|
* @param channel eg. "aeron:udp?endpoint=localhost:40123"
|
||||||
*/
|
*/
|
||||||
class AeronSink(
|
private[remote] class AeronSink(
|
||||||
channel: String,
|
channel: String,
|
||||||
streamId: Int,
|
streamId: Int,
|
||||||
aeron: Aeron,
|
aeron: Aeron,
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,10 @@ import org.agrona.hints.ThreadHints
|
||||||
import akka.stream.stage.GraphStageWithMaterializedValue
|
import akka.stream.stage.GraphStageWithMaterializedValue
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
|
|
||||||
object AeronSource {
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[remote] object AeronSource {
|
||||||
|
|
||||||
private def pollTask(sub: Subscription, handler: MessageHandler, onMessage: AsyncCallback[EnvelopeBuffer]): () ⇒ Boolean = {
|
private def pollTask(sub: Subscription, handler: MessageHandler, onMessage: AsyncCallback[EnvelopeBuffer]): () ⇒ Boolean = {
|
||||||
() ⇒
|
() ⇒
|
||||||
|
|
@ -64,9 +67,10 @@ object AeronSource {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* INTERNAL API
|
||||||
* @param channel eg. "aeron:udp?endpoint=localhost:40123"
|
* @param channel eg. "aeron:udp?endpoint=localhost:40123"
|
||||||
*/
|
*/
|
||||||
class AeronSource(
|
private[remote] class AeronSource(
|
||||||
channel: String,
|
channel: String,
|
||||||
streamId: Int,
|
streamId: Int,
|
||||||
aeron: Aeron,
|
aeron: Aeron,
|
||||||
|
|
|
||||||
|
|
@ -148,7 +148,7 @@ private[akka] object ArterySettings {
|
||||||
def apply(config: Config) = new ArterySettings(config)
|
def apply(config: Config) = new ArterySettings(config)
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
private[akka] final class Compression private[ArterySettings] (config: Config) {
|
private[remote] final class Compression private[ArterySettings] (config: Config) {
|
||||||
import config._
|
import config._
|
||||||
|
|
||||||
final val Enabled = true
|
final val Enabled = true
|
||||||
|
|
|
||||||
|
|
@ -72,7 +72,7 @@ import akka.remote.artery.Association.OutboundStreamMatValues
|
||||||
* Inbound API that is used by the stream stages.
|
* Inbound API that is used by the stream stages.
|
||||||
* Separate trait to facilitate testing without real transport.
|
* Separate trait to facilitate testing without real transport.
|
||||||
*/
|
*/
|
||||||
private[akka] trait InboundContext {
|
private[remote] trait InboundContext {
|
||||||
/**
|
/**
|
||||||
* The local inbound address.
|
* The local inbound address.
|
||||||
*/
|
*/
|
||||||
|
|
@ -105,7 +105,7 @@ private[akka] trait InboundContext {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] object AssociationState {
|
private[remote] object AssociationState {
|
||||||
def apply(): AssociationState =
|
def apply(): AssociationState =
|
||||||
new AssociationState(
|
new AssociationState(
|
||||||
incarnation = 1,
|
incarnation = 1,
|
||||||
|
|
@ -121,7 +121,7 @@ private[akka] object AssociationState {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] final class AssociationState(
|
private[remote] final class AssociationState(
|
||||||
val incarnation: Int,
|
val incarnation: Int,
|
||||||
val uniqueRemoteAddressPromise: Promise[UniqueAddress],
|
val uniqueRemoteAddressPromise: Promise[UniqueAddress],
|
||||||
val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) {
|
val quarantined: ImmutableLongMap[AssociationState.QuarantinedTimestamp]) {
|
||||||
|
|
@ -188,7 +188,7 @@ private[akka] final class AssociationState(
|
||||||
* Outbound association API that is used by the stream stages.
|
* Outbound association API that is used by the stream stages.
|
||||||
* Separate trait to facilitate testing without real transport.
|
* Separate trait to facilitate testing without real transport.
|
||||||
*/
|
*/
|
||||||
private[akka] trait OutboundContext {
|
private[remote] trait OutboundContext {
|
||||||
/**
|
/**
|
||||||
* The local inbound address.
|
* The local inbound address.
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -157,7 +157,7 @@ private[remote] class Association(
|
||||||
// in case there is a restart at the same time as a compression table update
|
// in case there is a restart at the same time as a compression table update
|
||||||
private val changeCompressionTimeout = 5.seconds
|
private val changeCompressionTimeout = 5.seconds
|
||||||
|
|
||||||
private[artery] def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = {
|
private[remote] def changeActorRefCompression(table: CompressionTable[ActorRef]): Future[Done] = {
|
||||||
import transport.system.dispatcher
|
import transport.system.dispatcher
|
||||||
val c = changeOutboundCompression
|
val c = changeOutboundCompression
|
||||||
val result =
|
val result =
|
||||||
|
|
@ -170,7 +170,7 @@ private[remote] class Association(
|
||||||
private[this] val streamMatValues = new AtomicReference(Map.empty[Int, OutboundStreamMatValues])
|
private[this] val streamMatValues = new AtomicReference(Map.empty[Int, OutboundStreamMatValues])
|
||||||
private[this] val idle = new AtomicReference[Option[Cancellable]](None)
|
private[this] val idle = new AtomicReference[Option[Cancellable]](None)
|
||||||
|
|
||||||
private[artery] def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = {
|
private[remote] def changeClassManifestCompression(table: CompressionTable[String]): Future[Done] = {
|
||||||
import transport.system.dispatcher
|
import transport.system.dispatcher
|
||||||
val c = changeOutboundCompression
|
val c = changeOutboundCompression
|
||||||
val result =
|
val result =
|
||||||
|
|
|
||||||
|
|
@ -52,7 +52,10 @@ private[remote] final class ByteFlag(val mask: Byte) extends AnyVal {
|
||||||
def isEnabled(byteFlags: Byte): Boolean = (byteFlags.toInt & mask) != 0
|
def isEnabled(byteFlags: Byte): Boolean = (byteFlags.toInt & mask) != 0
|
||||||
override def toString = s"ByteFlag(${ByteFlag.binaryLeftPad(mask)})"
|
override def toString = s"ByteFlag(${ByteFlag.binaryLeftPad(mask)})"
|
||||||
}
|
}
|
||||||
object ByteFlag {
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[remote] object ByteFlag {
|
||||||
def binaryLeftPad(byte: Byte): String = {
|
def binaryLeftPad(byte: Byte): String = {
|
||||||
val string = Integer.toBinaryString(byte)
|
val string = Integer.toBinaryString(byte)
|
||||||
val pad = "0" * (8 - string.length) // leftPad
|
val pad = "0" * (8 - string.length) // leftPad
|
||||||
|
|
|
||||||
|
|
@ -255,7 +255,7 @@ private[remote] object Decoder {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] final class ActorRefResolveCacheWithAddress(provider: RemoteActorRefProvider, localAddress: UniqueAddress)
|
private[remote] final class ActorRefResolveCacheWithAddress(provider: RemoteActorRefProvider, localAddress: UniqueAddress)
|
||||||
extends LruBoundedCache[String, InternalActorRef](capacity = 1024, evictAgeThreshold = 600) {
|
extends LruBoundedCache[String, InternalActorRef](capacity = 1024, evictAgeThreshold = 600) {
|
||||||
|
|
||||||
override protected def compute(k: String): InternalActorRef =
|
override protected def compute(k: String): InternalActorRef =
|
||||||
|
|
|
||||||
|
|
@ -21,39 +21,39 @@ import akka.util.OptionVal
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
|
|
||||||
/** INTERNAL API: marker trait for protobuf-serializable artery messages */
|
/** INTERNAL API: marker trait for protobuf-serializable artery messages */
|
||||||
private[akka] trait ArteryMessage extends Serializable
|
private[remote] trait ArteryMessage extends Serializable
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API: Marker trait for reply messages
|
* INTERNAL API: Marker trait for reply messages
|
||||||
*/
|
*/
|
||||||
private[akka] trait Reply extends ControlMessage
|
private[remote] trait Reply extends ControlMessage
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
* Marker trait for control messages that can be sent via the system message sub-channel
|
* Marker trait for control messages that can be sent via the system message sub-channel
|
||||||
* but don't need full reliable delivery. E.g. `HandshakeReq` and `Reply`.
|
* but don't need full reliable delivery. E.g. `HandshakeReq` and `Reply`.
|
||||||
*/
|
*/
|
||||||
private[akka] trait ControlMessage extends ArteryMessage
|
private[remote] trait ControlMessage extends ArteryMessage
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] final case class Quarantined(from: UniqueAddress, to: UniqueAddress) extends ControlMessage
|
private[remote] final case class Quarantined(from: UniqueAddress, to: UniqueAddress) extends ControlMessage
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] case class ActorSystemTerminating(from: UniqueAddress) extends ControlMessage
|
private[remote] case class ActorSystemTerminating(from: UniqueAddress) extends ControlMessage
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] case class ActorSystemTerminatingAck(from: UniqueAddress) extends ArteryMessage
|
private[remote] case class ActorSystemTerminatingAck(from: UniqueAddress) extends ArteryMessage
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] object InboundControlJunction {
|
private[remote] object InboundControlJunction {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Observer subject for inbound control messages.
|
* Observer subject for inbound control messages.
|
||||||
|
|
@ -86,7 +86,7 @@ private[akka] object InboundControlJunction {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] class InboundControlJunction
|
private[remote] class InboundControlJunction
|
||||||
extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], InboundControlJunction.ControlMessageSubject] {
|
extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], InboundControlJunction.ControlMessageSubject] {
|
||||||
import InboundControlJunction._
|
import InboundControlJunction._
|
||||||
|
|
||||||
|
|
@ -154,8 +154,8 @@ private[akka] class InboundControlJunction
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] object OutboundControlJunction {
|
private[remote] object OutboundControlJunction {
|
||||||
private[akka] trait OutboundControlIngress {
|
private[remote] trait OutboundControlIngress {
|
||||||
def sendControlMessage(message: ControlMessage): Unit
|
def sendControlMessage(message: ControlMessage): Unit
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -163,7 +163,7 @@ private[akka] object OutboundControlJunction {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] class OutboundControlJunction(
|
private[remote] class OutboundControlJunction(
|
||||||
outboundContext: OutboundContext, outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope])
|
outboundContext: OutboundContext, outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope])
|
||||||
extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, OutboundEnvelope], OutboundControlJunction.OutboundControlIngress] {
|
extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, OutboundEnvelope], OutboundControlJunction.OutboundControlIngress] {
|
||||||
import OutboundControlJunction._
|
import OutboundControlJunction._
|
||||||
|
|
|
||||||
|
|
@ -263,15 +263,15 @@ private[remote] object FlightRecorder {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] sealed trait FlightRecorderStatus
|
private[remote] sealed trait FlightRecorderStatus
|
||||||
case object Running extends FlightRecorderStatus
|
private[remote] case object Running extends FlightRecorderStatus
|
||||||
case object ShutDown extends FlightRecorderStatus
|
private[remote] case object ShutDown extends FlightRecorderStatus
|
||||||
final case class SnapshotInProgress(latch: CountDownLatch) extends FlightRecorderStatus
|
private[remote] final case class SnapshotInProgress(latch: CountDownLatch) extends FlightRecorderStatus
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] class FlightRecorder(val fileChannel: FileChannel) extends AtomicReference[FlightRecorderStatus](Running) {
|
private[remote] class FlightRecorder(val fileChannel: FileChannel) extends AtomicReference[FlightRecorderStatus](Running) {
|
||||||
import FlightRecorder._
|
import FlightRecorder._
|
||||||
|
|
||||||
private[this] val globalSection = new MappedResizeableBuffer(fileChannel, 0, GlobalSectionSize)
|
private[this] val globalSection = new MappedResizeableBuffer(fileChannel, 0, GlobalSectionSize)
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,9 @@
|
||||||
package akka.remote.artery
|
package akka.remote.artery
|
||||||
|
|
||||||
object FlightRecorderEvents {
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[remote] object FlightRecorderEvents {
|
||||||
|
|
||||||
// Note: Remember to update dictionary when adding new events!
|
// Note: Remember to update dictionary when adding new events!
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ import akka.actor.Address
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] object OutboundHandshake {
|
private[remote] object OutboundHandshake {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stream is failed with this exception if the handshake is not completed
|
* Stream is failed with this exception if the handshake is not completed
|
||||||
|
|
@ -50,7 +50,7 @@ private[akka] object OutboundHandshake {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] class OutboundHandshake(
|
private[remote] class OutboundHandshake(
|
||||||
system: ActorSystem,
|
system: ActorSystem,
|
||||||
outboundContext: OutboundContext,
|
outboundContext: OutboundContext,
|
||||||
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope],
|
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope],
|
||||||
|
|
@ -162,7 +162,7 @@ private[akka] class OutboundHandshake(
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] class InboundHandshake(inboundContext: InboundContext, inControlStream: Boolean) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
|
private[remote] class InboundHandshake(inboundContext: InboundContext, inControlStream: Boolean) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
|
||||||
val in: Inlet[InboundEnvelope] = Inlet("InboundHandshake.in")
|
val in: Inlet[InboundEnvelope] = Inlet("InboundHandshake.in")
|
||||||
val out: Outlet[InboundEnvelope] = Outlet("InboundHandshake.out")
|
val out: Outlet[InboundEnvelope] = Outlet("InboundHandshake.out")
|
||||||
override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
|
override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
|
||||||
|
|
|
||||||
|
|
@ -64,7 +64,7 @@ private[remote] object ReusableInboundEnvelope {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] final class ReusableInboundEnvelope extends InboundEnvelope {
|
private[remote] final class ReusableInboundEnvelope extends InboundEnvelope {
|
||||||
private var _recipient: OptionVal[InternalActorRef] = OptionVal.None
|
private var _recipient: OptionVal[InternalActorRef] = OptionVal.None
|
||||||
private var _sender: OptionVal[ActorRef] = OptionVal.None
|
private var _sender: OptionVal[ActorRef] = OptionVal.None
|
||||||
private var _originUid: Long = 0L
|
private var _originUid: Long = 0L
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ import akka.actor.ActorSelectionMessage
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] class InboundQuarantineCheck(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
|
private[remote] class InboundQuarantineCheck(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
|
||||||
val in: Inlet[InboundEnvelope] = Inlet("InboundQuarantineCheck.in")
|
val in: Inlet[InboundEnvelope] = Inlet("InboundQuarantineCheck.in")
|
||||||
val out: Outlet[InboundEnvelope] = Outlet("InboundQuarantineCheck.out")
|
val out: Outlet[InboundEnvelope] = Outlet("InboundQuarantineCheck.out")
|
||||||
override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
|
override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out)
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,10 @@ import akka.util.{ OptionVal, Unsafe }
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
object FastHash {
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[remote] object FastHash {
|
||||||
|
|
||||||
// Fast hash based on the 128 bit Xorshift128+ PRNG. Mixes in character bits into the random generator state.
|
// Fast hash based on the 128 bit Xorshift128+ PRNG. Mixes in character bits into the random generator state.
|
||||||
def ofString(s: String): Int = {
|
def ofString(s: String): Int = {
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ import akka.event.LoggingReceive
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] class MessageDispatcher(
|
private[remote] class MessageDispatcher(
|
||||||
system: ExtendedActorSystem,
|
system: ExtendedActorSystem,
|
||||||
provider: RemoteActorRefProvider) {
|
provider: RemoteActorRefProvider) {
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ import akka.util.OptionVal
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] object OutboundEnvelope {
|
private[remote] object OutboundEnvelope {
|
||||||
def apply(
|
def apply(
|
||||||
recipient: OptionVal[RemoteActorRef],
|
recipient: OptionVal[RemoteActorRef],
|
||||||
message: AnyRef,
|
message: AnyRef,
|
||||||
|
|
@ -24,7 +24,7 @@ private[akka] object OutboundEnvelope {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] trait OutboundEnvelope {
|
private[remote] trait OutboundEnvelope {
|
||||||
def recipient: OptionVal[RemoteActorRef]
|
def recipient: OptionVal[RemoteActorRef]
|
||||||
def message: AnyRef
|
def message: AnyRef
|
||||||
def sender: OptionVal[ActorRef]
|
def sender: OptionVal[ActorRef]
|
||||||
|
|
@ -37,7 +37,7 @@ private[akka] trait OutboundEnvelope {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] object ReusableOutboundEnvelope {
|
private[remote] object ReusableOutboundEnvelope {
|
||||||
def createObjectPool(capacity: Int) = new ObjectPool[ReusableOutboundEnvelope](
|
def createObjectPool(capacity: Int) = new ObjectPool[ReusableOutboundEnvelope](
|
||||||
capacity,
|
capacity,
|
||||||
create = () ⇒ new ReusableOutboundEnvelope, clear = outEnvelope ⇒ outEnvelope.clear())
|
create = () ⇒ new ReusableOutboundEnvelope, clear = outEnvelope ⇒ outEnvelope.clear())
|
||||||
|
|
@ -46,7 +46,7 @@ private[akka] object ReusableOutboundEnvelope {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] final class ReusableOutboundEnvelope extends OutboundEnvelope {
|
private[remote] final class ReusableOutboundEnvelope extends OutboundEnvelope {
|
||||||
private var _recipient: OptionVal[RemoteActorRef] = OptionVal.None
|
private var _recipient: OptionVal[RemoteActorRef] = OptionVal.None
|
||||||
private var _message: AnyRef = null
|
private var _message: AnyRef = null
|
||||||
private var _sender: OptionVal[ActorRef] = OptionVal.None
|
private var _sender: OptionVal[ActorRef] = OptionVal.None
|
||||||
|
|
|
||||||
|
|
@ -184,6 +184,9 @@ private[remote] final class MetadataMap[T >: Null] {
|
||||||
else s"MetadataMap(${backing.toList.mkString("[", ",", "]")})"
|
else s"MetadataMap(${backing.toList.mkString("[", ",", "]")})"
|
||||||
}
|
}
|
||||||
|
|
||||||
object MetadataMap {
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[remote] object MetadataMap {
|
||||||
def apply[T >: Null]() = new MetadataMap[T]
|
def apply[T >: Null]() = new MetadataMap[T]
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,14 +11,14 @@ import scala.annotation.tailrec
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] object RestartCounter {
|
private[remote] object RestartCounter {
|
||||||
final case class State(count: Int, deadline: Deadline)
|
final case class State(count: Int, deadline: Deadline)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API: Thread safe "restarts with duration" counter
|
* INTERNAL API: Thread safe "restarts with duration" counter
|
||||||
*/
|
*/
|
||||||
private[akka] class RestartCounter(maxRestarts: Int, restartTimeout: FiniteDuration) {
|
private[remote] class RestartCounter(maxRestarts: Int, restartTimeout: FiniteDuration) {
|
||||||
import RestartCounter._
|
import RestartCounter._
|
||||||
|
|
||||||
private val state = new AtomicReference[State](State(0, Deadline.now + restartTimeout))
|
private val state = new AtomicReference[State](State(0, Deadline.now + restartTimeout))
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ import scala.util.control.NoStackTrace
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] object SystemMessageDelivery {
|
private[remote] object SystemMessageDelivery {
|
||||||
// FIXME serialization of these messages
|
// FIXME serialization of these messages
|
||||||
final case class SystemMessageEnvelope(message: AnyRef, seqNo: Long, ackReplyTo: UniqueAddress) extends ArteryMessage
|
final case class SystemMessageEnvelope(message: AnyRef, seqNo: Long, ackReplyTo: UniqueAddress) extends ArteryMessage
|
||||||
final case class Ack(seqNo: Long, from: UniqueAddress) extends Reply
|
final case class Ack(seqNo: Long, from: UniqueAddress) extends Reply
|
||||||
|
|
@ -51,7 +51,7 @@ private[akka] object SystemMessageDelivery {
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] class SystemMessageDelivery(
|
private[remote] class SystemMessageDelivery(
|
||||||
outboundContext: OutboundContext,
|
outboundContext: OutboundContext,
|
||||||
deadLetters: ActorRef,
|
deadLetters: ActorRef,
|
||||||
resendInterval: FiniteDuration,
|
resendInterval: FiniteDuration,
|
||||||
|
|
@ -260,7 +260,7 @@ private[akka] class SystemMessageDelivery(
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
|
private[remote] class SystemMessageAcker(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] {
|
||||||
import SystemMessageDelivery._
|
import SystemMessageDelivery._
|
||||||
|
|
||||||
val in: Inlet[InboundEnvelope] = Inlet("SystemMessageAcker.in")
|
val in: Inlet[InboundEnvelope] = Inlet("SystemMessageAcker.in")
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,10 @@ import akka.remote.artery.ControlMessage
|
||||||
|
|
||||||
// FIXME serialization
|
// FIXME serialization
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
object CompressionProtocol {
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[remote] object CompressionProtocol {
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
sealed trait CompressionMessage
|
sealed trait CompressionMessage
|
||||||
|
|
@ -56,9 +59,9 @@ object CompressionProtocol {
|
||||||
extends ControlMessage with CompressionMessage
|
extends ControlMessage with CompressionMessage
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
private[akka] object Events {
|
private[remote] object Events {
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
private[akka] sealed trait Event
|
private[remote] sealed trait Event
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
final case class HeavyHitterDetected(key: Any, id: Int, count: Long) extends Event
|
final case class HeavyHitterDetected(key: Any, id: Int, count: Long) extends Event
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@
|
||||||
package akka.remote.artery.compress
|
package akka.remote.artery.compress
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
private[artery] final case class DecompressionTable[T](originUid: Long, version: Byte, table: Array[T]) {
|
private[remote] final case class DecompressionTable[T](originUid: Long, version: Byte, table: Array[T]) {
|
||||||
|
|
||||||
private[this] val length = table.length
|
private[this] val length = table.length
|
||||||
|
|
||||||
|
|
@ -26,7 +26,7 @@ private[artery] final case class DecompressionTable[T](originUid: Long, version:
|
||||||
}
|
}
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
private[artery] object DecompressionTable {
|
private[remote] object DecompressionTable {
|
||||||
private[this] val _empty = DecompressionTable(0, 0, Array.empty)
|
private[this] val _empty = DecompressionTable(0, 0, Array.empty)
|
||||||
def empty[T] = _empty.asInstanceOf[DecompressionTable[T]]
|
def empty[T] = _empty.asInstanceOf[DecompressionTable[T]]
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -201,7 +201,10 @@ private[remote] final class InboundActorRefCompression(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final class InboundManifestCompression(
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[remote] final class InboundManifestCompression(
|
||||||
system: ActorSystem,
|
system: ActorSystem,
|
||||||
settings: ArterySettings.Compression,
|
settings: ArterySettings.Compression,
|
||||||
originUid: Long,
|
originUid: Long,
|
||||||
|
|
@ -452,8 +455,8 @@ private[remote] abstract class InboundCompression[T >: Null](
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Must be implementeed by extending classes in order to send a [[akka.remote.artery.ControlMessage]]
|
* Must be implemented by extending classes in order to send a `ControlMessage`
|
||||||
* of apropriate type to the remote system in order to advertise the compression table to it.
|
* of appropriate type to the remote system in order to advertise the compression table to it.
|
||||||
*/
|
*/
|
||||||
protected def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[T]): Unit
|
protected def advertiseCompressionTable(association: OutboundContext, table: CompressionTable[T]): Unit
|
||||||
|
|
||||||
|
|
@ -467,7 +470,10 @@ private[remote] abstract class InboundCompression[T >: Null](
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final class UnknownCompressedIdException(id: Long)
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[akka] final class UnknownCompressedIdException(id: Long)
|
||||||
extends RuntimeException(
|
extends RuntimeException(
|
||||||
s"Attempted de-compress unknown id [$id]! " +
|
s"Attempted de-compress unknown id [$id]! " +
|
||||||
s"This could happen if this node has started a new ActorSystem bound to the same address as previously, " +
|
s"This could happen if this node has started a new ActorSystem bound to the same address as previously, " +
|
||||||
|
|
@ -479,7 +485,7 @@ final class UnknownCompressedIdException(id: Long)
|
||||||
*
|
*
|
||||||
* Literarily, no compression!
|
* Literarily, no compression!
|
||||||
*/
|
*/
|
||||||
case object NoInboundCompressions extends InboundCompressions {
|
private[remote] case object NoInboundCompressions extends InboundCompressions {
|
||||||
override def hitActorRef(originUid: Long, remote: Address, ref: ActorRef, n: Int): Unit = ()
|
override def hitActorRef(originUid: Long, remote: Address, ref: ActorRef, n: Int): Unit = ()
|
||||||
override def decompressActorRef(originUid: Long, tableVersion: Byte, idx: Int): OptionVal[ActorRef] =
|
override def decompressActorRef(originUid: Long, tableVersion: Byte, idx: Int): OptionVal[ActorRef] =
|
||||||
if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1")
|
if (idx == -1) throw new IllegalArgumentException("Attemted decompression of illegal compression id: -1")
|
||||||
|
|
|
||||||
|
|
@ -285,7 +285,10 @@ private[remote] final class TopHeavyHitters[T >: Null](val max: Int)(implicit cl
|
||||||
s"${getClass.getSimpleName}(max:$max)"
|
s"${getClass.getSimpleName}(max:$max)"
|
||||||
}
|
}
|
||||||
|
|
||||||
object TopHeavyHitters {
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
private[remote] object TopHeavyHitters {
|
||||||
|
|
||||||
/** Value class to avoid mixing up count and hashCode in APIs. */
|
/** Value class to avoid mixing up count and hashCode in APIs. */
|
||||||
private[compress] final class HashCodeVal(val get: Int) extends AnyVal {
|
private[compress] final class HashCodeVal(val get: Int) extends AnyVal {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue