Polish the API/SPI of remoting, see #2827

* Changed TransportAdapterProvider to support java impl
* Verified java impl of AbstractTransportAdapter and
  ActorTransportAdapter
* Privatized things that should not be public api
* Consistent usage of INTERNAL API marker in scaladoc
* Added some missing doc in conf
* Added missing SerialVersionUID
This commit is contained in:
Patrik Nordwall 2013-02-08 13:13:52 +01:00
parent af7ca554c9
commit 323e5c80b5
44 changed files with 340 additions and 125 deletions

View file

@ -98,7 +98,7 @@ akka {
# "scatter-gather", "broadcast"
# - or: Fully qualified class name of the router class.
# The class must extend akka.routing.CustomRouterConfig and
# have a constructor with com.typesafe.config.Config
# have a public constructor with com.typesafe.config.Config
# parameter.
# - default is "from-code";
# Whether or not an actor is transformed to a Router is decided in code
@ -194,7 +194,7 @@ akka {
# Must be one of the following
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are
# of the same type), PinnedDispatcher, or a FQCN to a class inheriting
# MessageDispatcherConfigurator with a constructor with
# MessageDispatcherConfigurator with a public constructor with
# both com.typesafe.config.Config parameter and
# akka.dispatch.DispatcherPrerequisites parameters.
# PinnedDispatcher must be used toghether with executor=thread-pool-executor.
@ -286,7 +286,7 @@ akka {
mailbox-push-timeout-time = 10s
# FQCN of the MailboxType, if not specified the default bounded or unbounded
# mailbox is used. The Class of the FQCN must have a constructor with
# mailbox is used. The Class of the FQCN must have a public constructor with
# (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters.
mailbox-type = ""
@ -388,7 +388,7 @@ akka {
# - akka.actor.LightArrayRevolverScheduler
# (to be benchmarked and evaluated)
# The class given here must implement the akka.actor.Scheduler interface
# and offer a constructor which takes three arguments:
# and offer a public constructor which takes three arguments:
# 1) com.typesafe.config.Config
# 2) akka.event.LoggingAdapter
# 3) java.util.concurrent.ThreadFactory

View file

@ -11,8 +11,9 @@ import java.util.regex.Pattern
import scala.annotation.tailrec
/**
* INTERNAL API
*
* Marker trait to show which Messages are automatically handled by Akka
* Internal use only
*/
private[akka] trait AutoReceivedMessage extends Serializable
@ -28,7 +29,7 @@ trait PossiblyHarmful
trait NoSerializationVerificationNeeded
/**
* Internal use only
* INTERNAL API
*/
@SerialVersionUID(2L)
private[akka] case class Failed(cause: Throwable, uid: Int) extends AutoReceivedMessage with PossiblyHarmful
@ -112,19 +113,19 @@ case object ReceiveTimeout extends ReceiveTimeout {
sealed trait SelectionPath extends AutoReceivedMessage with PossiblyHarmful
/**
* Internal use only
* INTERNAL API
*/
@SerialVersionUID(1L)
private[akka] case class SelectChildName(name: String, next: Any) extends SelectionPath
/**
* Internal use only
* INTERNAL API
*/
@SerialVersionUID(1L)
private[akka] case class SelectChildPattern(pattern: Pattern, next: Any) extends SelectionPath
/**
* Internal use only
* INTERNAL API
*/
@SerialVersionUID(1L)
private[akka] case class SelectParent(next: Any) extends SelectionPath

View file

@ -158,11 +158,11 @@ trait ActorRefProvider {
*/
trait ActorRefFactory {
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
protected def systemImpl: ActorSystemImpl
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
protected def provider: ActorRefProvider
@ -174,12 +174,12 @@ trait ActorRefFactory {
/**
* Father of all children created by this interface.
*
* INTERNAL USE ONLY
* INTERNAL API
*/
protected def guardian: InternalActorRef
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
protected def lookupRoot: InternalActorRef

View file

@ -180,7 +180,7 @@ object ActorSystem {
}
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
private[akka] def findClassLoader(): ClassLoader = {
def findCaller(get: Int Class[_]): ClassLoader =

View file

@ -80,12 +80,12 @@ object FSM {
case object StateTimeout
/**
* Internal API
* INTERNAL API
*/
private case class TimeoutMarker(generation: Long)
/**
* Internal API
* INTERNAL API
*/
private[akka] case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext) {
private var ref: Option[Cancellable] = _
@ -154,7 +154,7 @@ object FSM {
}
/**
* Internal API.
* INTERNAL API.
*/
private[akka] def withStopReason(reason: Reason): State[S, D] = {
copy(stopReason = Some(reason))
@ -390,7 +390,7 @@ trait FSM[S, D] extends Listeners with ActorLogging {
final def setStateTimeout(state: S, timeout: Timeout): Unit = stateTimeouts(state) = timeout
/**
* Internal API, used for testing.
* INTERNAL API, used for testing.
*/
private[akka] final def isStateTimerActive = timeoutFuture.isDefined

View file

@ -167,9 +167,9 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
}
/**
* Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call
* INTERNAL API
*
* INTERNAL USE ONLY
* Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call
*/
private[akka] case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializedParameters: Array[(Int, Class[_], Array[Byte])]) {
@ -240,9 +240,9 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
implicit def dispatcher = context.dispatcher
/**
* Implementation of TypedActor as an Actor
* INTERNAL API
*
* INTERNAL USE ONLY
* Implementation of TypedActor as an Actor
*/
private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: T) extends Actor {
val me = withContext[T](createInstance)
@ -398,7 +398,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
}
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
private[akka] class TypedActorInvocationHandler(@transient val extension: TypedActorExtension, @transient val actorVar: AtomVar[ActorRef], @transient val timeout: Timeout) extends InvocationHandler with Serializable {
@ -412,7 +412,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
implicit val dispatcher = extension.system.dispatcher
import akka.pattern.ask
MethodCall(method, args) match {
case m if m.isOneWay actor ! m; null //Null return value
case m if m.isOneWay
actor ! m; null //Null return value
case m if m.returnsFuture ask(actor, m)(timeout) map {
case NullResponse null
case other other
@ -433,7 +434,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
}
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
private[akka] case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: FiniteDuration) {
@throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = JavaSerializer.currentSystem.value match {
@ -650,7 +651,7 @@ class TypedActorExtension(val system: ExtendedActorSystem) extends TypedActorFac
// Private API
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
private[akka] def createActorRefProxy[R <: AnyRef, T <: R](props: TypedProps[T], proxyVar: AtomVar[R], actorRef: ActorRef): R = {
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
@ -671,7 +672,7 @@ class TypedActorExtension(val system: ExtendedActorSystem) extends TypedActorFac
}
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
private[akka] def invocationHandlerFor(@deprecatedName('typedActor_?) typedActor: AnyRef): TypedActorInvocationHandler =
if ((typedActor ne null) && classOf[Proxy].isAssignableFrom(typedActor.getClass) && Proxy.isProxyClass(typedActor.getClass)) typedActor match {

View file

@ -39,7 +39,7 @@ class BalancingDispatcher(
extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) {
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
private[akka] val team = new ConcurrentSkipListSet[ActorCell](
Helpers.identityHashComparator(new Comparator[ActorCell] {
@ -47,7 +47,7 @@ class BalancingDispatcher(
}))
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
private[akka] val messageQueue: MessageQueue = mailboxType.create(None, None)

View file

@ -47,7 +47,7 @@ class Dispatcher(
protected final def executorService: ExecutorServiceDelegate = executorServiceDelegate
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
val mbox = receiver.mailbox
@ -56,7 +56,7 @@ class Dispatcher(
}
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = {
val mbox = receiver.mailbox
@ -65,7 +65,7 @@ class Dispatcher(
}
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
protected[akka] def executeTask(invocation: TaskInvocation) {
try {
@ -83,13 +83,13 @@ class Dispatcher(
}
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
protected[akka] def createMailbox(actor: akka.actor.Cell): Mailbox =
new Mailbox(mailboxType.create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
protected[akka] def shutdown: Unit = {
val newDelegate = executorServiceDelegate.copy() // Doesn't matter which one we copy
@ -104,7 +104,7 @@ class Dispatcher(
/**
* Returns if it was registered
*
* INTERNAL USE ONLY
* INTERNAL API
*/
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races

View file

@ -24,7 +24,7 @@ trait DispatcherPrerequisites {
}
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
private[akka] case class DefaultDispatcherPrerequisites(
val threadFactory: ThreadFactory,
@ -114,6 +114,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
}
/**
* INTERNAL API
*
* Creates a dispatcher from a Config. Internal test purpose only.
*
* ex: from(config.getConfig(id))
@ -122,22 +124,20 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
*
* Throws: IllegalArgumentException if the value of "type" is not valid
* IllegalArgumentException if it cannot create the MessageDispatcherConfigurator
*
* INTERNAL USE ONLY
*/
private[akka] def from(cfg: Config): MessageDispatcher = configuratorFrom(cfg).dispatcher()
private[akka] def isBalancingDispatcher(id: String): Boolean = settings.config.hasPath(id) && config(id).getString("type") == "BalancingDispatcher"
/**
* INTERNAL API
*
* Creates a MessageDispatcherConfigurator from a Config.
*
* The Config must also contain a `id` property, which is the identifier of the dispatcher.
*
* Throws: IllegalArgumentException if the value of "type" is not valid
* IllegalArgumentException if it cannot create the MessageDispatcherConfigurator
*
* INTERNAL USE ONLY
*/
private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = {
if (!cfg.hasPath("id")) throw new IllegalArgumentException("Missing dispatcher 'id' property in config: " + cfg.root.render)

View file

@ -347,7 +347,7 @@ trait MessageQueue {
}
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
private[akka] trait SystemMessageQueue {
/**
@ -364,7 +364,7 @@ private[akka] trait SystemMessageQueue {
}
/**
* INTERNAL USE ONLY
* INTERNAL API
*/
private[akka] trait DefaultSystemMessageQueue { self: Mailbox

View file

@ -75,7 +75,7 @@ akka {
# FQCN of the failure detector implementation.
# It must implement akka.remote.FailureDetector and have
# a constructor with a com.typesafe.config.Config parameter.
# a public constructor with a com.typesafe.config.Config parameter.
implementation-class = "akka.remote.PhiAccrualFailureDetector"
# How often keep-alive heartbeat messages should be sent to each connection.
@ -136,7 +136,7 @@ akka {
# FQCN of the metrics collector implementation.
# It must implement akka.cluster.cluster.MetricsCollector and
# have constructor with akka.actor.ActorSystem parameter.
# have public constructor with akka.actor.ActorSystem parameter.
# The default SigarMetricsCollector uses JMX and Hyperic SIGAR, if SIGAR
# is on the classpath, otherwise only JMX.
collector-class = "akka.cluster.SigarMetricsCollector"
@ -175,7 +175,7 @@ akka {
# - available: "mix", "heap", "cpu", "load"
# - or: Fully qualified class name of the MetricsSelector class.
# The class must extend akka.cluster.routing.MetricsSelector
# and have a constructor with com.typesafe.config.Config
# and have a public constructor with com.typesafe.config.Config
# parameter.
# - default is "mix"
metrics-selector = mix

View file

@ -42,7 +42,7 @@ import akka.cluster.routing.MetricsSelector
* extension, i.e. the cluster will automatically be started when
* the `ClusterActorRefProvider` is used.
*/
class ClusterActorRefProvider(
private[akka] class ClusterActorRefProvider(
_systemName: String,
_settings: ActorSystem.Settings,
_eventStream: EventStream,

View file

@ -84,7 +84,7 @@ trait ClusterNodeMBean {
}
/**
* Internal API
* INTERNAL API
*/
private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) {

View file

@ -9,7 +9,7 @@ import scala.collection.immutable
import MemberStatus._
/**
* Internal API
* INTERNAL API
*/
private[cluster] object Gossip {
val emptyMembers: immutable.SortedSet[Member] = immutable.SortedSet.empty

View file

@ -20,18 +20,18 @@ import akka.AkkaException
object ClusterSingletonManager {
/**
* Internal API
* INTERNAL API
* public due to the `with FSM` type parameters
*/
sealed trait State
/**
* Internal API
* INTERNAL API
* public due to the `with FSM` type parameters
*/
sealed trait Data
/**
* Internal API
* INTERNAL API
*/
private object Internal {
/**

View file

@ -136,7 +136,7 @@ akka {
# FQCN of the failure detector implementation.
# It must implement akka.remote.FailureDetector and have
# a constructor with a com.typesafe.config.Config parameter.
# a public constructor with a com.typesafe.config.Config parameter.
implementation-class = "akka.remote.PhiAccrualFailureDetector"
# How often keep-alive heartbeat messages should be sent to each connection.
@ -206,7 +206,9 @@ akka {
# name to the applied-adapters setting in the configuration of a
# transport. The available adapters should be configured in this
# section by providing a name, and the fully qualified name of
# their corresponding implementation
# their corresponding implementation. The class given here
# must implement akka.akka.remote.transport.TransportAdapterProvider
# and have public constructor without parameters.
adapters {
gremlin = "akka.remote.transport.FailureInjectorProvider"
trttl = "akka.remote.transport.ThrottlerProvider"
@ -215,6 +217,10 @@ akka {
### Default configuration for the Netty based transport drivers
netty.tcp {
# The class given here must implement the akka.remote.transport.Transport
# interface and offer a public constructor which takes two arguments:
# 1) akka.actor.ExtendedActorSystem
# 2) com.typesafe.config.Config
transport-class = "akka.remote.transport.netty.NettyTransport"
# Transport drivers can be augmented with adapters by adding their

View file

@ -15,11 +15,11 @@ import akka.remote.transport.AssociationHandle._
import akka.remote.transport.{ AkkaPduCodec, Transport, AssociationHandle }
import akka.serialization.Serialization
import akka.util.ByteString
import util.control.{ NoStackTrace, NonFatal }
import scala.util.control.{ NoStackTrace, NonFatal }
import akka.remote.transport.Transport.InvalidAssociationException
/**
* Internal API
* INTERNAL API
*/
private[remote] trait InboundMessageDispatcher {
def dispatch(recipient: InternalActorRef,
@ -28,6 +28,9 @@ private[remote] trait InboundMessageDispatcher {
senderOption: Option[ActorRef]): Unit
}
/**
* INTERNAL API
*/
private[remote] class DefaultMessageDispatcher(private val system: ExtendedActorSystem,
private val provider: RemoteActorRefProvider,
private val log: LoggingAdapter) extends InboundMessageDispatcher {
@ -88,7 +91,7 @@ private[remote] class DefaultMessageDispatcher(private val system: ExtendedActor
}
/**
* Internal API
* INTERNAL API
*/
private[remote] object EndpointWriter {
@ -109,13 +112,23 @@ private[remote] object EndpointWriter {
case object Handoff extends State
}
/**
* INTERNAL API
*/
@SerialVersionUID(1L)
private[remote] class EndpointException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace {
def this(msg: String) = this(msg, null)
}
/**
* INTERNAL API
*/
private[remote] case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable)
extends EndpointException("Invalid address: " + remoteAddress, cause)
/**
* INTERNAL API
*/
private[remote] class EndpointWriter(
handleOrActive: Option[AssociationHandle],
val localAddress: Address,
@ -279,6 +292,9 @@ private[remote] class EndpointWriter(
}
/**
* INTERNAL API
*/
private[remote] class EndpointReader(
val codec: AkkaPduCodec,
val localAddress: Address,

View file

@ -3,7 +3,7 @@
*/
package akka.remote
import java.util.concurrent.TimeUnit._
import java.util.concurrent.TimeUnit.NANOSECONDS
/**
* A failure detector must be a thread-safe mutable construct that registers heartbeat events of a resource and is able to

View file

@ -4,14 +4,14 @@
package akka.remote
import language.existentials
import akka.remote.RemoteProtocol._
import com.google.protobuf.ByteString
import akka.actor.ExtendedActorSystem
import akka.serialization.SerializationExtension
/**
* INTERNAL API
*
* MessageSerializer is a helper for serializing and deserialize messages
*/
private[akka] object MessageSerializer {

View file

@ -144,9 +144,7 @@ class PhiAccrualFailureDetector(
val mean = history.mean
val stdDeviation = ensureValidStdDeviation(history.stdDeviation)
val φ = phi(timeDiff, mean + acceptableHeartbeatPauseMillis, stdDeviation)
φ
phi(timeDiff, mean + acceptableHeartbeatPauseMillis, stdDeviation)
}
}

View file

@ -15,7 +15,10 @@ import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, Registe
import scala.util.control.Exception.Catcher
import scala.concurrent.{ ExecutionContext, Future }
object RemoteActorRefProvider {
/**
* INTERNAL API
*/
private[akka] object RemoteActorRefProvider {
private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef)
sealed trait TerminatorState
@ -88,13 +91,13 @@ object RemoteActorRefProvider {
}
/**
* INTERNAL API
* Depending on this class is not supported, only the [[akka.actor.ActorRefProvider]] interface is supported.
*
* Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it.
*
* INTERNAL API!
*
* Depending on this class is not supported, only the [[ActorRefProvider]] interface is supported.
*/
class RemoteActorRefProvider(
private[akka] class RemoteActorRefProvider(
val systemName: String,
val settings: ActorSystem.Settings,
val eventStream: EventStream,
@ -272,7 +275,7 @@ class RemoteActorRefProvider(
case _ local.actorFor(ref, path)
}
/*
/**
* INTERNAL API
* Called in deserialization of incoming remote messages. In this case the correct local address is known, therefore
* this method is faster than the actorFor above.
@ -317,6 +320,7 @@ private[akka] trait RemoteRef extends ActorRefScope {
}
/**
* INTERNAL API
* Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
* This reference is network-aware (remembers its origin) and immutable.
*/

View file

@ -13,15 +13,23 @@ import akka.actor.ActorRefWithCell
import akka.actor.ActorRefScope
import akka.util.Switch
/**
* INTERNAL API
*/
private[akka] sealed trait DaemonMsg
/**
* INTERNAL API
*/
@SerialVersionUID(1L)
private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg
/**
* INTERNAL API
*
* Internal system "daemon" actor for remote internal communication.
*
* It acts as the brain of the remote that responds to system remote events (messages) and undertakes action.
*
* INTERNAL USE ONLY!
*/
private[akka] class RemoteSystemDaemon(
system: ActorSystemImpl,

View file

@ -15,6 +15,9 @@ case class RemoteScope(node: Address) extends Scope {
def withFallback(other: Scope): Scope = this
}
/**
* INTERNAL API
*/
private[akka] class RemoteDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extends Deployer(_settings, _pm) {
override def parseConfig(path: String, config: Config): Option[Deploy] = {
import scala.collection.JavaConverters._

View file

@ -7,7 +7,7 @@ import com.typesafe.config.Config
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.util.Timeout
import scala.collection.immutable.Seq
import scala.collection.immutable
import akka.japi.Util._
class RemoteSettings(val config: Config) {
@ -44,7 +44,7 @@ class RemoteSettings(val config: Config) {
val CommandAckTimeout: Timeout =
Timeout(Duration(getMilliseconds("akka.remote.command-ack-timeout"), MILLISECONDS))
val Transports: Seq[(String, Seq[String], Config)] = transportNames.map { name
val Transports: immutable.Seq[(String, immutable.Seq[String], Config)] = transportNames.map { name
val transportConfig = transportConfigFor(name)
(transportConfig.getString("transport-class"),
immutableSeq(transportConfig.getStringList("applied-adapters")).reverse,
@ -53,7 +53,7 @@ class RemoteSettings(val config: Config) {
val Adapters: Map[String, String] = configToMap(getConfig("akka.remote.adapters"))
private def transportNames: Seq[String] = immutableSeq(getStringList("akka.remote.enabled-transports"))
private def transportNames: immutable.Seq[String] = immutableSeq(getStringList("akka.remote.enabled-transports"))
private def transportConfigFor(transportName: String): Config = getConfig(transportName)

View file

@ -4,7 +4,6 @@
package akka.remote
import scala.reflect.BeanProperty
import akka.dispatch.SystemMessage
import akka.event.{ LoggingAdapter, Logging }
import akka.AkkaException
@ -18,9 +17,12 @@ import scala.concurrent.Future
* RemoteTransportException represents a general failure within a RemoteTransport,
* such as inability to start, wrong configuration etc.
*/
@SerialVersionUID(1L)
class RemoteTransportException(message: String, cause: Throwable) extends AkkaException(message, cause)
/**
* INTERNAL API
*
* The remote transport is responsible for sending and receiving messages.
* Each transport has an address, which it should provide in
* Serialization.currentTransportAddress (thread-local) while serializing
@ -28,7 +30,7 @@ class RemoteTransportException(message: String, cause: Throwable) extends AkkaEx
* be available (i.e. fully initialized) by the time the first message is
* received or when the start() method returns, whatever happens first.
*/
abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: RemoteActorRefProvider) {
private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: RemoteActorRefProvider) {
/**
* Shuts down the remoting
*/

View file

@ -3,7 +3,6 @@
*/
package akka.remote
import scala.language.postfixOps
import akka.actor.SupervisorStrategy._
import akka.actor._
import akka.event.{ Logging, LoggingAdapter }
@ -23,11 +22,20 @@ import scala.concurrent.{ Promise, Await, Future }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success }
/**
* INTERNAL API
*/
private[remote] object AddressUrlEncoder {
def apply(address: Address): String = URLEncoder.encode(address.toString, "utf-8")
}
/**
* INTERNAL API
*/
private[remote] case class RARP(provider: RemoteActorRefProvider) extends Extension
/**
* INTERNAL API
*/
private[remote] object RARP extends ExtensionId[RARP] with ExtensionIdProvider {
override def lookup() = RARP
@ -35,6 +43,9 @@ private[remote] object RARP extends ExtensionId[RARP] with ExtensionIdProvider {
override def createExtension(system: ExtendedActorSystem) = RARP(system.provider.asInstanceOf[RemoteActorRefProvider])
}
/**
* INTERNAL API
*/
private[remote] object Remoting {
final val EndpointManagerName = "endpointManager"
@ -48,7 +59,7 @@ private[remote] object Remoting {
responsibleTransports.size match {
case 0
throw new RemoteTransportException(
s"No transport is responsible for address: $remote although protocol ${remote.protocol} is available." +
s"No transport is responsible for address: [$remote] although protocol [${remote.protocol}] is available." +
" Make sure at least one transport is configured to be responsible for the address.",
null)
@ -63,7 +74,7 @@ private[remote] object Remoting {
null)
}
case None throw new RemoteTransportException(
s"No transport is loaded for protocol: ${remote.protocol}, available protocols: ${transportMapping.keys.mkString}", null)
s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString}]", null)
}
}
@ -81,6 +92,9 @@ private[remote] object Remoting {
}
/**
* INTERNAL API
*/
private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) {
@volatile private var endpointManager: Option[ActorRef] = None
@ -199,6 +213,9 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
}
/**
* INTERNAL API
*/
private[remote] object EndpointManager {
// Messages between Remoting and EndpointManager
@ -243,7 +260,7 @@ private[remote] object EndpointManager {
def registerWritableEndpoint(address: Address, endpoint: ActorRef): ActorRef = addressToWritable.get(address) match {
case Some(Pass(e))
throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint $e with $endpoint")
throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]")
case _
addressToWritable += address -> Pass(endpoint)
writableToAddress += endpoint -> address
@ -309,6 +326,9 @@ private[remote] object EndpointManager {
}
}
/**
* INTERNAL API
*/
private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Actor {
import EndpointManager._
@ -483,7 +503,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
adapters.map { TransportAdaptersExtension.get(context.system).getAdapterProvider(_) }.foldLeft(driver) {
(t: Transport, provider: TransportAdapterProvider)
// The TransportAdapterProvider will wrap the given Transport and returns with a wrapped one
provider(t, context.system.asInstanceOf[ExtendedActorSystem])
provider.create(t, context.system.asInstanceOf[ExtendedActorSystem])
}
// Apply AkkaProtocolTransport wrapper to the end of the chain

View file

@ -5,14 +5,13 @@ package akka.remote
import akka.event.{ LoggingAdapter, Logging }
import akka.actor.{ ActorSystem, Address }
import scala.beans.BeanProperty
import java.util.{ Set JSet }
import scala.collection.JavaConverters.setAsJavaSetConverter
@SerialVersionUID(1L)
sealed trait RemotingLifecycleEvent extends Serializable {
def logLevel: Logging.LogLevel
}
@SerialVersionUID(1L)
sealed trait AssociationEvent extends RemotingLifecycleEvent {
def localAddress: Address
def remoteAddress: Address
@ -24,55 +23,65 @@ sealed trait AssociationEvent extends RemotingLifecycleEvent {
override def toString: String = s"$eventName [$localAddress]${if (inbound) " <- " else " -> "}[$remoteAddress]"
}
@SerialVersionUID(1L)
final case class AssociatedEvent(
localAddress: Address,
remoteAddress: Address,
inbound: Boolean)
extends AssociationEvent {
protected override val eventName: String = "Associated"
protected override def eventName: String = "Associated"
override def logLevel: Logging.LogLevel = Logging.DebugLevel
}
@SerialVersionUID(1L)
final case class DisassociatedEvent(
localAddress: Address,
remoteAddress: Address,
inbound: Boolean)
extends AssociationEvent {
protected override val eventName: String = "Disassociated"
protected override def eventName: String = "Disassociated"
override def logLevel: Logging.LogLevel = Logging.DebugLevel
}
@SerialVersionUID(1L)
final case class AssociationErrorEvent(
cause: Throwable,
localAddress: Address,
remoteAddress: Address,
inbound: Boolean) extends AssociationEvent {
protected override val eventName: String = "AssociationError"
protected override def eventName: String = "AssociationError"
override def logLevel: Logging.LogLevel = Logging.ErrorLevel
override def toString: String = s"${super.toString}: Error[${Logging.stackTraceFor(cause)}]"
override def toString: String = s"${super.toString}: Error [${cause.getMessage}] [${Logging.stackTraceFor(cause)}]"
def getCause: Throwable = cause
}
@SerialVersionUID(1L)
final case class RemotingListenEvent(listenAddresses: Set[Address]) extends RemotingLifecycleEvent {
def getListenAddresses: JSet[Address] = listenAddresses.asJava
def getListenAddresses: java.util.Set[Address] =
scala.collection.JavaConverters.setAsJavaSetConverter(listenAddresses).asJava
override def logLevel: Logging.LogLevel = Logging.InfoLevel
override def toString: String = "Remoting now listens on addresses: " + listenAddresses.mkString("[", ", ", "]")
}
@SerialVersionUID(1L)
case object RemotingShutdownEvent extends RemotingLifecycleEvent {
override def logLevel: Logging.LogLevel = Logging.InfoLevel
override val toString: String = "Remoting shut down"
}
@SerialVersionUID(1L)
final case class RemotingErrorEvent(cause: Throwable) extends RemotingLifecycleEvent {
def getCause: Throwable = cause
override def logLevel: Logging.LogLevel = Logging.ErrorLevel
override def toString: String = s"Remoting error: [${Logging.stackTraceFor(cause)}]"
override def toString: String = s"Remoting error: [${cause.getMessage}] [${Logging.stackTraceFor(cause)}]"
}
class EventPublisher(system: ActorSystem, log: LoggingAdapter, logEvents: Boolean) {
/**
* INTERNAL API
*/
private[remote] class EventPublisher(system: ActorSystem, log: LoggingAdapter, logEvents: Boolean) {
def notifyListeners(message: RemotingLifecycleEvent): Unit = {
system.eventStream.publish(message)
if (logEvents) log.log(message.logLevel, "{}", message)

View file

@ -7,7 +7,7 @@ import org.uncommons.maths.random.{ AESCounterRNG }
import SeedSize.Seed128
/**
* Internal API
* INTERNAL API
* This class is a wrapper around the 128-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/
* It uses the default seed generator which uses one of the following 3 random seed sources:
* Depending on availability: random.org, /dev/random, and SecureRandom (provided by Java)

View file

@ -7,7 +7,7 @@ import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator }
import SeedSize.Seed128
/**
* Internal API
* INTERNAL API
* This class is a wrapper around the 128-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/
* The only method used by netty ssl is engineNextBytes(bytes)
* This RNG is good to use to prevent startup delay when you don't have Internet access to random.org

View file

@ -7,7 +7,7 @@ import org.uncommons.maths.random.{ AESCounterRNG }
import SeedSize.Seed256
/**
* Internal API
* INTERNAL API
* This class is a wrapper around the 256-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/
* It uses the default seed generator which uses one of the following 3 random seed sources:
* Depending on availability: random.org, /dev/random, and SecureRandom (provided by Java)

View file

@ -7,7 +7,7 @@ import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator }
import SeedSize.Seed256
/**
* Internal API
* INTERNAL API
* This class is a wrapper around the 256-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/
* The only method used by netty ssl is engineNextBytes(bytes)
* This RNG is good to use to prevent startup delay when you don't have Internet access to random.org

View file

@ -19,7 +19,7 @@ import org.uncommons.maths.random.{ SeedGenerator, SeedException, SecureRandomSe
import scala.collection.immutable
/**
* Internal API
* INTERNAL API
* Seed generator that maintains multiple strategies for seed
* generation and will delegate to the best one available for the
* current operating environment.

View file

@ -5,12 +5,12 @@
package akka.remote.security.provider
/**
* Internal API
* INTERNAL API
* From AESCounterRNG API docs:
* Valid values are 16 (128 bits), 24 (192 bits) and 32 (256 bits).
* Any other values will result in an exception from the AES implementation.
*
* Internal API
* INTERNAL API
*/
private[provider] object SeedSize {
val Seed128 = 16

View file

@ -3,12 +3,9 @@
*/
package akka.remote.transport
import scala.language.postfixOps
import akka.actor._
import akka.pattern.{ ask, pipe }
import akka.remote.Remoting.RegisterTransportActor
import akka.remote.transport.ActorTransportAdapter.ListenUnderlying
import akka.remote.transport.ActorTransportAdapter.ListenerRegistered
import akka.remote.transport.Transport._
import akka.remote.RARP
import akka.util.Timeout
@ -17,7 +14,12 @@ import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, Promise, Future }
import scala.util.Success
trait TransportAdapterProvider extends ((Transport, ExtendedActorSystem) Transport)
trait TransportAdapterProvider {
/**
* Create the transport adapter that wraps an underlying transport.
*/
def create(wrappedTransport: Transport, system: ExtendedActorSystem): Transport
}
class TransportAdapters(system: ExtendedActorSystem) extends Extension {
val settings = RARP(system).provider.remoteSettings
@ -123,7 +125,7 @@ object ActorTransportAdapter {
upstreamListener: Future[AssociationEventListener]) extends TransportOperation
case object DisassociateUnderlying extends TransportOperation
implicit val AskTimeout = Timeout(5 seconds)
implicit val AskTimeout = Timeout(5.seconds)
}
abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorSystem)
@ -158,6 +160,8 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS
}
abstract class ActorTransportAdapterManager extends Actor {
import ActorTransportAdapter.{ ListenUnderlying, ListenerRegistered }
private var delayedEvents = immutable.Queue.empty[Any]
protected var associationListener: AssociationEventListener = _

View file

@ -12,10 +12,14 @@ import akka.util.ByteString
import com.google.protobuf.InvalidProtocolBufferException
import com.google.protobuf.{ ByteString PByteString }
class PduCodecException(msg: String, cause: Throwable) extends AkkaException(msg, cause)
/**
* INTERNAL API
*/
@SerialVersionUID(1L)
private[remote] class PduCodecException(msg: String, cause: Throwable) extends AkkaException(msg, cause)
/**
* Internal API
* INTERNAL API
*
* Companion object of the [[akka.remote.transport.AkkaPduCodec]] trait. Contains the representation case classes
* of decoded Akka Protocol Data Units (PDUs).
@ -39,6 +43,8 @@ private[remote] object AkkaPduCodec {
}
/**
* INTERNAL API
*
* A Codec that is able to convert Akka PDUs (Protocol Data Units) from and to [[akka.util.ByteString]]s.
*/
private[remote] trait AkkaPduCodec {
@ -89,6 +95,9 @@ private[remote] trait AkkaPduCodec {
senderOption: Option[ActorRef]): ByteString
}
/**
* INTERNAL API
*/
private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
override def constructMessage(

View file

@ -23,6 +23,7 @@ import scala.collection.immutable
import akka.remote.transport.ActorTransportAdapter._
import akka.ConfigurationException
@SerialVersionUID(1L)
class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace {
def this(msg: String) = this(msg, null)
}

View file

@ -15,27 +15,43 @@ import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.concurrent.{ Future, Promise }
import scala.util.control.NoStackTrace
@SerialVersionUID(1L)
case class FailureInjectorException(msg: String) extends AkkaException(msg) with NoStackTrace
class FailureInjectorProvider extends TransportAdapterProvider {
def apply(wrappedTransport: Transport, system: ExtendedActorSystem): Transport =
override def create(wrappedTransport: Transport, system: ExtendedActorSystem): Transport =
new FailureInjectorTransportAdapter(wrappedTransport, system)
}
/**
* INTERNAL API
*/
private[remote] object FailureInjectorTransportAdapter {
val FailureInjectorSchemeIdentifier = "gremlin"
trait FailureInjectorCommand
@SerialVersionUID(1L)
case class All(mode: GremlinMode)
@SerialVersionUID(1L)
case class One(remoteAddress: Address, mode: GremlinMode)
sealed trait GremlinMode
case object PassThru extends GremlinMode
@SerialVersionUID(1L)
case object PassThru extends GremlinMode {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
@SerialVersionUID(1L)
case class Drop(outboundDropP: Double, inboundDropP: Double) extends GremlinMode
}
/**
* INTERNAL API
*/
private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transport, val extendedSystem: ExtendedActorSystem)
extends AbstractTransportAdapter(wrappedTransport)(extendedSystem.dispatcher) with AssociationEventListener {
@ -112,6 +128,9 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor
}
}
/**
* INTERNAL API
*/
private[remote] case class FailureInjectorHandle(_wrappedHandle: AssociationHandle,
private val gremlinAdapter: FailureInjectorTransportAdapter)
extends AbstractTransportAdapterHandle(_wrappedHandle, FailureInjectorSchemeIdentifier)

View file

@ -9,7 +9,6 @@ import akka.pattern.pipe
import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying
import akka.remote.transport.AkkaPduCodec.Associate
import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload, HandleEventListener }
import akka.remote.transport.ThrottledAssociation._
import akka.remote.transport.ThrottlerManager.Checkin
import akka.remote.transport.ThrottlerTransportAdapter._
import akka.remote.transport.Transport._
@ -27,7 +26,7 @@ import scala.concurrent.duration._
class ThrottlerProvider extends TransportAdapterProvider {
def apply(wrappedTransport: Transport, system: ExtendedActorSystem): Transport =
override def create(wrappedTransport: Transport, system: ExtendedActorSystem): Transport =
new ThrottlerTransportAdapter(wrappedTransport, system)
}
@ -41,32 +40,61 @@ object ThrottlerTransportAdapter {
}
object Direction {
@SerialVersionUID(1L)
case object Send extends Direction {
override def includes(other: Direction): Boolean = other match {
case Send true
case _ false
}
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
@SerialVersionUID(1L)
case object Receive extends Direction {
override def includes(other: Direction): Boolean = other match {
case Receive true
case _ false
}
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
@SerialVersionUID(1L)
case object Both extends Direction {
override def includes(other: Direction): Boolean = true
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
}
object SetThrottle
@SerialVersionUID(1L)
case class SetThrottle(address: Address, direction: Direction, mode: ThrottleMode)
case object SetThrottleAck
@SerialVersionUID(1L)
case object SetThrottleAck {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
sealed trait ThrottleMode {
def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean)
def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration
}
@SerialVersionUID(1L)
case class TokenBucket(capacity: Int, tokensPerSecond: Double, nanoTimeOfLastSend: Long, availableTokens: Int)
extends ThrottleMode {
@ -92,14 +120,27 @@ object ThrottlerTransportAdapter {
(TimeUnit.NANOSECONDS.toMillis(nanoTimeOfSend - nanoTimeOfLastSend) * tokensPerSecond / 1000.0).toInt
}
@SerialVersionUID(1L)
case object Unthrottled extends ThrottleMode {
override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, true)
override def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration = Duration.Zero
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
@SerialVersionUID(1L)
case object Blackhole extends ThrottleMode {
override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, false)
override def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration = Duration.Zero
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
}
@ -122,11 +163,17 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA
}
}
/**
* INTERNAL API
*/
private[transport] object ThrottlerManager {
case class OriginResolved()
case class Checkin(origin: Address, handle: ThrottlerHandle)
}
/**
* INTERNAL API
*/
private[transport] class ThrottlerManager(wrappedTransport: Transport) extends ActorTransportAdapterManager {
import context.dispatcher
@ -215,7 +262,10 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
}
object ThrottledAssociation {
/**
* INTERNAL API
*/
private[transport] object ThrottledAssociation {
private final val DequeueTimerName = "dequeue"
case object Dequeue
@ -247,12 +297,16 @@ object ThrottledAssociation {
case class ExposedHandle(handle: ThrottlerHandle) extends ThrottlerData
}
/**
* INTERNAL API
*/
private[transport] class ThrottledAssociation(
val manager: ActorRef,
val associationHandler: AssociationEventListener,
val originalHandle: AssociationHandle,
val inbound: Boolean)
extends Actor with LoggingFSM[ThrottlerState, ThrottlerData] {
extends Actor with LoggingFSM[ThrottledAssociation.ThrottlerState, ThrottledAssociation.ThrottlerData] {
import ThrottledAssociation._
import context.dispatcher
var inboundThrottleMode: ThrottleMode = _
@ -398,6 +452,9 @@ private[transport] class ThrottledAssociation(
}
/**
* INTERNAL API
*/
private[transport] case class ThrottlerHandle(_wrappedHandle: AssociationHandle, throttlerActor: ActorRef)
extends AbstractTransportAdapterHandle(_wrappedHandle, SchemeIdentifier) {

View file

@ -17,6 +17,7 @@ object Transport {
* Indicates that the association setup request is invalid, and it is impossible to recover (malformed IP address,
* hostname, etc.).
*/
@SerialVersionUID(1L)
case class InvalidAssociationException(msg: String, cause: Throwable) extends AkkaException(msg, cause)
/**

View file

@ -8,6 +8,9 @@ import java.nio.channels.ClosedChannelException
import org.jboss.netty.channel._
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
private[netty] trait NettyHelpers {
protected def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {}
@ -30,6 +33,9 @@ private[netty] trait NettyHelpers {
}
}
/**
* INTERNAL API
*/
private[netty] trait NettyServerHelpers extends SimpleChannelUpstreamHandler with NettyHelpers {
final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
@ -55,6 +61,9 @@ private[netty] trait NettyServerHelpers extends SimpleChannelUpstreamHandler wit
}
}
/**
* INTERNAL API
*/
private[netty] trait NettyClientHelpers extends SimpleChannelHandler with NettyHelpers {
final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = {
super.messageReceived(ctx, e)

View file

@ -15,6 +15,9 @@ import java.security._
import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSLContext }
import org.jboss.netty.handler.ssl.SslHandler
/**
* INTERNAL API
*/
private[akka] class SSLSettings(config: Config) {
import config._
@ -45,8 +48,9 @@ private[akka] class SSLSettings(config: Config) {
}
/**
* INTERNAL API
*
* Used for adding SSL support to Netty pipeline
* Internal use only
*/
private[akka] object NettySSLSupport {

View file

@ -25,7 +25,7 @@ import org.jboss.netty.handler.ssl.SslHandler
import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS }
import scala.concurrent.{ ExecutionContext, Promise, Future, blocking }
import scala.util.{ Failure, Success, Try }
import util.control.{ NoStackTrace, NonFatal }
import scala.util.control.{ NoStackTrace, NonFatal }
object NettyTransportSettings {
sealed trait Mode
@ -60,6 +60,7 @@ object NettyFutureBridge {
}
}
@SerialVersionUID(1L)
class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) with OnlyCauseStackTrace {
def this(msg: String) = this(msg, null)
}
@ -71,7 +72,7 @@ class NettyTransportSettings(config: Config) {
val TransportMode: Mode = getString("transport-protocol") match {
case "tcp" Tcp
case "udp" Udp
case unknown throw new ConfigurationException(s"Unknown transport: $unknown")
case unknown throw new ConfigurationException(s"Unknown transport: [$unknown]")
}
val EnableSsl: Boolean = if (getBoolean("enable-ssl") && TransportMode == Udp)
@ -123,7 +124,10 @@ class NettyTransportSettings(config: Config) {
}
trait CommonHandlers extends NettyHelpers {
/**
* INTERNAL API
*/
private[netty] trait CommonHandlers extends NettyHelpers {
protected val transport: NettyTransport
final override def onOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = transport.channelGroup.add(e.getChannel)
@ -153,7 +157,10 @@ trait CommonHandlers extends NettyHelpers {
}
}
abstract class ServerHandler(protected final val transport: NettyTransport,
/**
* INTERNAL API
*/
private[netty] abstract class ServerHandler(protected final val transport: NettyTransport,
private final val associationListenerFuture: Future[AssociationEventListener])
extends NettyServerHelpers with CommonHandlers {
@ -172,7 +179,10 @@ abstract class ServerHandler(protected final val transport: NettyTransport,
}
abstract class ClientHandler(protected final val transport: NettyTransport, remoteAddress: Address)
/**
* INTERNAL API
*/
private[netty] abstract class ClientHandler(protected final val transport: NettyTransport, remoteAddress: Address)
extends NettyClientHelpers with CommonHandlers {
final protected val statusPromise = Promise[AssociationHandle]()
def statusFuture = statusPromise.future
@ -183,6 +193,9 @@ abstract class ClientHandler(protected final val transport: NettyTransport, remo
}
/**
* INTERNAL API
*/
private[transport] object NettyTransport {
// 4 bytes will be used to represent the frame length. Used by netty LengthFieldPrepender downstream handler.
val FrameLengthFieldLength = 4
@ -209,16 +222,19 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
implicit val executionContext: ExecutionContext = system.dispatcher
override val schemeIdentifier: String = (if (EnableSsl) "ssl." else "") + TransportMode
override val maximumPayloadBytes: Int = 32000 // The number of octets required by the remoting specification
override def maximumPayloadBytes: Int = 32000 // The number of octets required by the remoting specification
private final val isDatagram: Boolean = TransportMode == Udp
private final val isDatagram = TransportMode == Udp
@volatile private var localAddress: Address = _
@volatile private var serverChannel: Channel = _
private val log = Logging(system, this.getClass)
final val udpConnectionTable = new ConcurrentHashMap[SocketAddress, HandleEventListener]()
/**
* INTERNAL API
*/
private[netty] final val udpConnectionTable = new ConcurrentHashMap[SocketAddress, HandleEventListener]()
/*
* Be aware, that the close() method of DefaultChannelGroup is racy, because it uses an iterator over a ConcurrentHashMap.

View file

@ -14,11 +14,17 @@ import org.jboss.netty.channel._
import scala.concurrent.{ Future, Promise }
import scala.util.{ Success, Failure }
/**
* INTERNAL API
*/
private[remote] object ChannelLocalActor extends ChannelLocal[Option[HandleEventListener]] {
override def initialValue(channel: Channel): Option[HandleEventListener] = None
def notifyListener(channel: Channel, msg: HandleEvent): Unit = get(channel) foreach { _ notify msg }
}
/**
* INTERNAL API
*/
private[remote] trait TcpHandlers extends CommonHandlers {
import ChannelLocalActor._
@ -45,6 +51,9 @@ private[remote] trait TcpHandlers extends CommonHandlers {
}
}
/**
* INTERNAL API
*/
private[remote] class TcpServerHandler(_transport: NettyTransport, _associationListenerFuture: Future[AssociationEventListener])
extends ServerHandler(_transport, _associationListenerFuture) with TcpHandlers {
@ -53,6 +62,9 @@ private[remote] class TcpServerHandler(_transport: NettyTransport, _associationL
}
/**
* INTERNAL API
*/
private[remote] class TcpClientHandler(_transport: NettyTransport, remoteAddress: Address)
extends ClientHandler(_transport, remoteAddress) with TcpHandlers {
@ -61,6 +73,9 @@ private[remote] class TcpClientHandler(_transport: NettyTransport, remoteAddress
}
/**
* INTERNAL API
*/
private[remote] class TcpAssociationHandle(val localAddress: Address, val remoteAddress: Address, private val channel: Channel)
extends AssociationHandle {

View file

@ -13,6 +13,9 @@ import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers }
import org.jboss.netty.channel._
import scala.concurrent.{ Future, Promise }
/**
* INTERNAL API
*/
private[remote] trait UdpHandlers extends CommonHandlers {
override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle =
@ -46,6 +49,9 @@ private[remote] trait UdpHandlers extends CommonHandlers {
def initUdp(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit
}
/**
* INTERNAL API
*/
private[remote] class UdpServerHandler(_transport: NettyTransport, _associationListenerFuture: Future[AssociationEventListener])
extends ServerHandler(_transport, _associationListenerFuture) with UdpHandlers {
@ -53,6 +59,9 @@ private[remote] class UdpServerHandler(_transport: NettyTransport, _associationL
initInbound(channel, remoteSocketAddress, msg)
}
/**
* INTERNAL API
*/
private[remote] class UdpClientHandler(_transport: NettyTransport, remoteAddress: Address)
extends ClientHandler(_transport, remoteAddress) with UdpHandlers {
@ -60,6 +69,9 @@ private[remote] class UdpClientHandler(_transport: NettyTransport, remoteAddress
initOutbound(channel, remoteSocketAddress, msg)
}
/**
* INTERNAL API
*/
private[remote] class UdpAssociationHandle(val localAddress: Address,
val remoteAddress: Address,
private val channel: Channel,