diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index de824c4ddb..09b0c9e9c2 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -98,7 +98,7 @@ akka { # "scatter-gather", "broadcast" # - or: Fully qualified class name of the router class. # The class must extend akka.routing.CustomRouterConfig and - # have a constructor with com.typesafe.config.Config + # have a public constructor with com.typesafe.config.Config # parameter. # - default is "from-code"; # Whether or not an actor is transformed to a Router is decided in code @@ -194,7 +194,7 @@ akka { # Must be one of the following # Dispatcher, (BalancingDispatcher, only valid when all actors using it are # of the same type), PinnedDispatcher, or a FQCN to a class inheriting - # MessageDispatcherConfigurator with a constructor with + # MessageDispatcherConfigurator with a public constructor with # both com.typesafe.config.Config parameter and # akka.dispatch.DispatcherPrerequisites parameters. # PinnedDispatcher must be used toghether with executor=thread-pool-executor. @@ -286,7 +286,7 @@ akka { mailbox-push-timeout-time = 10s # FQCN of the MailboxType, if not specified the default bounded or unbounded - # mailbox is used. The Class of the FQCN must have a constructor with + # mailbox is used. The Class of the FQCN must have a public constructor with # (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters. mailbox-type = "" @@ -388,7 +388,7 @@ akka { # - akka.actor.LightArrayRevolverScheduler # (to be benchmarked and evaluated) # The class given here must implement the akka.actor.Scheduler interface - # and offer a constructor which takes three arguments: + # and offer a public constructor which takes three arguments: # 1) com.typesafe.config.Config # 2) akka.event.LoggingAdapter # 3) java.util.concurrent.ThreadFactory diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index a665d1029b..acd93b21f9 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -11,8 +11,9 @@ import java.util.regex.Pattern import scala.annotation.tailrec /** + * INTERNAL API + * * Marker trait to show which Messages are automatically handled by Akka - * Internal use only */ private[akka] trait AutoReceivedMessage extends Serializable @@ -28,7 +29,7 @@ trait PossiblyHarmful trait NoSerializationVerificationNeeded /** - * Internal use only + * INTERNAL API */ @SerialVersionUID(2L) private[akka] case class Failed(cause: Throwable, uid: Int) extends AutoReceivedMessage with PossiblyHarmful @@ -112,19 +113,19 @@ case object ReceiveTimeout extends ReceiveTimeout { sealed trait SelectionPath extends AutoReceivedMessage with PossiblyHarmful /** - * Internal use only + * INTERNAL API */ @SerialVersionUID(1L) private[akka] case class SelectChildName(name: String, next: Any) extends SelectionPath /** - * Internal use only + * INTERNAL API */ @SerialVersionUID(1L) private[akka] case class SelectChildPattern(pattern: Pattern, next: Any) extends SelectionPath /** - * Internal use only + * INTERNAL API */ @SerialVersionUID(1L) private[akka] case class SelectParent(next: Any) extends SelectionPath diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 689d1443f5..7853832b87 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -158,11 +158,11 @@ trait ActorRefProvider { */ trait ActorRefFactory { /** - * INTERNAL USE ONLY + * INTERNAL API */ protected def systemImpl: ActorSystemImpl /** - * INTERNAL USE ONLY + * INTERNAL API */ protected def provider: ActorRefProvider @@ -174,12 +174,12 @@ trait ActorRefFactory { /** * Father of all children created by this interface. * - * INTERNAL USE ONLY + * INTERNAL API */ protected def guardian: InternalActorRef /** - * INTERNAL USE ONLY + * INTERNAL API */ protected def lookupRoot: InternalActorRef diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 8d1c67385f..0285cec26a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -180,7 +180,7 @@ object ActorSystem { } /** - * INTERNAL USE ONLY + * INTERNAL API */ private[akka] def findClassLoader(): ClassLoader = { def findCaller(get: Int ⇒ Class[_]): ClassLoader = diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index d6f6ac488f..ad70be0a0f 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -80,12 +80,12 @@ object FSM { case object StateTimeout /** - * Internal API + * INTERNAL API */ private case class TimeoutMarker(generation: Long) /** - * Internal API + * INTERNAL API */ private[akka] case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(context: ActorContext) { private var ref: Option[Cancellable] = _ @@ -154,7 +154,7 @@ object FSM { } /** - * Internal API. + * INTERNAL API. */ private[akka] def withStopReason(reason: Reason): State[S, D] = { copy(stopReason = Some(reason)) @@ -390,7 +390,7 @@ trait FSM[S, D] extends Listeners with ActorLogging { final def setStateTimeout(state: S, timeout: Timeout): Unit = stateTimeouts(state) = timeout /** - * Internal API, used for testing. + * INTERNAL API, used for testing. */ private[akka] final def isStateTimerActive = timeoutFuture.isDefined diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 263a1c309d..e159ff96fe 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -167,9 +167,9 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi } /** - * Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call + * INTERNAL API * - * INTERNAL USE ONLY + * Represents the serialized form of a MethodCall, uses readResolve and writeReplace to marshall the call */ private[akka] case class SerializedMethodCall(ownerType: Class[_], methodName: String, parameterTypes: Array[Class[_]], serializedParameters: Array[(Int, Class[_], Array[Byte])]) { @@ -240,9 +240,9 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi implicit def dispatcher = context.dispatcher /** - * Implementation of TypedActor as an Actor + * INTERNAL API * - * INTERNAL USE ONLY + * Implementation of TypedActor as an Actor */ private[akka] class TypedActor[R <: AnyRef, T <: R](val proxyVar: AtomVar[R], createInstance: ⇒ T) extends Actor { val me = withContext[T](createInstance) @@ -398,7 +398,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi } /** - * INTERNAL USE ONLY + * INTERNAL API */ private[akka] class TypedActorInvocationHandler(@transient val extension: TypedActorExtension, @transient val actorVar: AtomVar[ActorRef], @transient val timeout: Timeout) extends InvocationHandler with Serializable { @@ -412,7 +412,8 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi implicit val dispatcher = extension.system.dispatcher import akka.pattern.ask MethodCall(method, args) match { - case m if m.isOneWay ⇒ actor ! m; null //Null return value + case m if m.isOneWay ⇒ + actor ! m; null //Null return value case m if m.returnsFuture ⇒ ask(actor, m)(timeout) map { case NullResponse ⇒ null case other ⇒ other @@ -433,7 +434,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi } /** - * INTERNAL USE ONLY + * INTERNAL API */ private[akka] case class SerializedTypedActorInvocationHandler(val actor: ActorRef, val timeout: FiniteDuration) { @throws(classOf[ObjectStreamException]) private def readResolve(): AnyRef = JavaSerializer.currentSystem.value match { @@ -650,7 +651,7 @@ class TypedActorExtension(val system: ExtendedActorSystem) extends TypedActorFac // Private API /** - * INTERNAL USE ONLY + * INTERNAL API */ private[akka] def createActorRefProxy[R <: AnyRef, T <: R](props: TypedProps[T], proxyVar: AtomVar[R], actorRef: ⇒ ActorRef): R = { //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling @@ -671,7 +672,7 @@ class TypedActorExtension(val system: ExtendedActorSystem) extends TypedActorFac } /** - * INTERNAL USE ONLY + * INTERNAL API */ private[akka] def invocationHandlerFor(@deprecatedName('typedActor_?) typedActor: AnyRef): TypedActorInvocationHandler = if ((typedActor ne null) && classOf[Proxy].isAssignableFrom(typedActor.getClass) && Proxy.isProxyClass(typedActor.getClass)) typedActor match { diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 895d48a0e7..841a359b87 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -39,7 +39,7 @@ class BalancingDispatcher( extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) { /** - * INTERNAL USE ONLY + * INTERNAL API */ private[akka] val team = new ConcurrentSkipListSet[ActorCell]( Helpers.identityHashComparator(new Comparator[ActorCell] { @@ -47,7 +47,7 @@ class BalancingDispatcher( })) /** - * INTERNAL USE ONLY + * INTERNAL API */ private[akka] val messageQueue: MessageQueue = mailboxType.create(None, None) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index b1f4557815..cc101e6311 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -47,7 +47,7 @@ class Dispatcher( protected final def executorService: ExecutorServiceDelegate = executorServiceDelegate /** - * INTERNAL USE ONLY + * INTERNAL API */ protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = { val mbox = receiver.mailbox @@ -56,7 +56,7 @@ class Dispatcher( } /** - * INTERNAL USE ONLY + * INTERNAL API */ protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit = { val mbox = receiver.mailbox @@ -65,7 +65,7 @@ class Dispatcher( } /** - * INTERNAL USE ONLY + * INTERNAL API */ protected[akka] def executeTask(invocation: TaskInvocation) { try { @@ -83,13 +83,13 @@ class Dispatcher( } /** - * INTERNAL USE ONLY + * INTERNAL API */ protected[akka] def createMailbox(actor: akka.actor.Cell): Mailbox = new Mailbox(mailboxType.create(Some(actor.self), Some(actor.system))) with DefaultSystemMessageQueue /** - * INTERNAL USE ONLY + * INTERNAL API */ protected[akka] def shutdown: Unit = { val newDelegate = executorServiceDelegate.copy() // Doesn't matter which one we copy @@ -104,7 +104,7 @@ class Dispatcher( /** * Returns if it was registered * - * INTERNAL USE ONLY + * INTERNAL API */ protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = { if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 3758bd6df0..b5a91618be 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -24,7 +24,7 @@ trait DispatcherPrerequisites { } /** - * INTERNAL USE ONLY + * INTERNAL API */ private[akka] case class DefaultDispatcherPrerequisites( val threadFactory: ThreadFactory, @@ -114,6 +114,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc } /** + * INTERNAL API + * * Creates a dispatcher from a Config. Internal test purpose only. * * ex: from(config.getConfig(id)) @@ -122,22 +124,20 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc * * Throws: IllegalArgumentException if the value of "type" is not valid * IllegalArgumentException if it cannot create the MessageDispatcherConfigurator - * - * INTERNAL USE ONLY */ private[akka] def from(cfg: Config): MessageDispatcher = configuratorFrom(cfg).dispatcher() private[akka] def isBalancingDispatcher(id: String): Boolean = settings.config.hasPath(id) && config(id).getString("type") == "BalancingDispatcher" /** + * INTERNAL API + * * Creates a MessageDispatcherConfigurator from a Config. * * The Config must also contain a `id` property, which is the identifier of the dispatcher. * * Throws: IllegalArgumentException if the value of "type" is not valid * IllegalArgumentException if it cannot create the MessageDispatcherConfigurator - * - * INTERNAL USE ONLY */ private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = { if (!cfg.hasPath("id")) throw new IllegalArgumentException("Missing dispatcher 'id' property in config: " + cfg.root.render) diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 8f5f376788..e3576a1184 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -347,7 +347,7 @@ trait MessageQueue { } /** - * INTERNAL USE ONLY + * INTERNAL API */ private[akka] trait SystemMessageQueue { /** @@ -364,7 +364,7 @@ private[akka] trait SystemMessageQueue { } /** - * INTERNAL USE ONLY + * INTERNAL API */ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒ diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index f9e1f203b4..5458285245 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -75,7 +75,7 @@ akka { # FQCN of the failure detector implementation. # It must implement akka.remote.FailureDetector and have - # a constructor with a com.typesafe.config.Config parameter. + # a public constructor with a com.typesafe.config.Config parameter. implementation-class = "akka.remote.PhiAccrualFailureDetector" # How often keep-alive heartbeat messages should be sent to each connection. @@ -136,7 +136,7 @@ akka { # FQCN of the metrics collector implementation. # It must implement akka.cluster.cluster.MetricsCollector and - # have constructor with akka.actor.ActorSystem parameter. + # have public constructor with akka.actor.ActorSystem parameter. # The default SigarMetricsCollector uses JMX and Hyperic SIGAR, if SIGAR # is on the classpath, otherwise only JMX. collector-class = "akka.cluster.SigarMetricsCollector" @@ -173,10 +173,10 @@ akka { actor.deployment.default { # MetricsSelector to use # - available: "mix", "heap", "cpu", "load" - # - or: Fully qualified class name of the MetricsSelector class. - # The class must extend akka.cluster.routing.MetricsSelector - # and have a constructor with com.typesafe.config.Config - # parameter. + # - or: Fully qualified class name of the MetricsSelector class. + # The class must extend akka.cluster.routing.MetricsSelector + # and have a public constructor with com.typesafe.config.Config + # parameter. # - default is "mix" metrics-selector = mix } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index b1ed2e7c38..5fb8376eea 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -42,7 +42,7 @@ import akka.cluster.routing.MetricsSelector * extension, i.e. the cluster will automatically be started when * the `ClusterActorRefProvider` is used. */ -class ClusterActorRefProvider( +private[akka] class ClusterActorRefProvider( _systemName: String, _settings: ActorSystem.Settings, _eventStream: EventStream, diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala index 2e735b89d1..ef74909b48 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala @@ -84,7 +84,7 @@ trait ClusterNodeMBean { } /** - * Internal API + * INTERNAL API */ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) { diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 876ca93aae..66a0b7d623 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -9,7 +9,7 @@ import scala.collection.immutable import MemberStatus._ /** - * Internal API + * INTERNAL API */ private[cluster] object Gossip { val emptyMembers: immutable.SortedSet[Member] = immutable.SortedSet.empty diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala index 71ed4d4d17..f042a0606b 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSingletonManager.scala @@ -20,18 +20,18 @@ import akka.AkkaException object ClusterSingletonManager { /** - * Internal API + * INTERNAL API * public due to the `with FSM` type parameters */ sealed trait State /** - * Internal API + * INTERNAL API * public due to the `with FSM` type parameters */ sealed trait Data /** - * Internal API + * INTERNAL API */ private object Internal { /** diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index dbef02b9c0..00050a3967 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -136,7 +136,7 @@ akka { # FQCN of the failure detector implementation. # It must implement akka.remote.FailureDetector and have - # a constructor with a com.typesafe.config.Config parameter. + # a public constructor with a com.typesafe.config.Config parameter. implementation-class = "akka.remote.PhiAccrualFailureDetector" # How often keep-alive heartbeat messages should be sent to each connection. @@ -206,7 +206,9 @@ akka { # name to the applied-adapters setting in the configuration of a # transport. The available adapters should be configured in this # section by providing a name, and the fully qualified name of - # their corresponding implementation + # their corresponding implementation. The class given here + # must implement akka.akka.remote.transport.TransportAdapterProvider + # and have public constructor without parameters. adapters { gremlin = "akka.remote.transport.FailureInjectorProvider" trttl = "akka.remote.transport.ThrottlerProvider" @@ -215,6 +217,10 @@ akka { ### Default configuration for the Netty based transport drivers netty.tcp { + # The class given here must implement the akka.remote.transport.Transport + # interface and offer a public constructor which takes two arguments: + # 1) akka.actor.ExtendedActorSystem + # 2) com.typesafe.config.Config transport-class = "akka.remote.transport.netty.NettyTransport" # Transport drivers can be augmented with adapters by adding their diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 89ffe0ac11..15bf4a8011 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -15,11 +15,11 @@ import akka.remote.transport.AssociationHandle._ import akka.remote.transport.{ AkkaPduCodec, Transport, AssociationHandle } import akka.serialization.Serialization import akka.util.ByteString -import util.control.{ NoStackTrace, NonFatal } +import scala.util.control.{ NoStackTrace, NonFatal } import akka.remote.transport.Transport.InvalidAssociationException /** - * Internal API + * INTERNAL API */ private[remote] trait InboundMessageDispatcher { def dispatch(recipient: InternalActorRef, @@ -28,6 +28,9 @@ private[remote] trait InboundMessageDispatcher { senderOption: Option[ActorRef]): Unit } +/** + * INTERNAL API + */ private[remote] class DefaultMessageDispatcher(private val system: ExtendedActorSystem, private val provider: RemoteActorRefProvider, private val log: LoggingAdapter) extends InboundMessageDispatcher { @@ -88,7 +91,7 @@ private[remote] class DefaultMessageDispatcher(private val system: ExtendedActor } /** - * Internal API + * INTERNAL API */ private[remote] object EndpointWriter { @@ -109,13 +112,23 @@ private[remote] object EndpointWriter { case object Handoff extends State } +/** + * INTERNAL API + */ +@SerialVersionUID(1L) private[remote] class EndpointException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace { def this(msg: String) = this(msg, null) } +/** + * INTERNAL API + */ private[remote] case class InvalidAssociation(localAddress: Address, remoteAddress: Address, cause: Throwable) extends EndpointException("Invalid address: " + remoteAddress, cause) +/** + * INTERNAL API + */ private[remote] class EndpointWriter( handleOrActive: Option[AssociationHandle], val localAddress: Address, @@ -279,6 +292,9 @@ private[remote] class EndpointWriter( } +/** + * INTERNAL API + */ private[remote] class EndpointReader( val codec: AkkaPduCodec, val localAddress: Address, diff --git a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala index fc2cd61acc..ae4367f0f0 100644 --- a/akka-remote/src/main/scala/akka/remote/FailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/FailureDetector.scala @@ -3,7 +3,7 @@ */ package akka.remote -import java.util.concurrent.TimeUnit._ +import java.util.concurrent.TimeUnit.NANOSECONDS /** * A failure detector must be a thread-safe mutable construct that registers heartbeat events of a resource and is able to diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index fec24e3eef..7d26131f6d 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -4,14 +4,14 @@ package akka.remote -import language.existentials - import akka.remote.RemoteProtocol._ import com.google.protobuf.ByteString import akka.actor.ExtendedActorSystem import akka.serialization.SerializationExtension /** + * INTERNAL API + * * MessageSerializer is a helper for serializing and deserialize messages */ private[akka] object MessageSerializer { diff --git a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala index c0d418da65..c4b6a6c8bb 100644 --- a/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala +++ b/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala @@ -144,9 +144,7 @@ class PhiAccrualFailureDetector( val mean = history.mean val stdDeviation = ensureValidStdDeviation(history.stdDeviation) - val φ = phi(timeDiff, mean + acceptableHeartbeatPauseMillis, stdDeviation) - - φ + phi(timeDiff, mean + acceptableHeartbeatPauseMillis, stdDeviation) } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 7cf1c07f37..c138b9f2d0 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -15,7 +15,10 @@ import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, Registe import scala.util.control.Exception.Catcher import scala.concurrent.{ ExecutionContext, Future } -object RemoteActorRefProvider { +/** + * INTERNAL API + */ +private[akka] object RemoteActorRefProvider { private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef) sealed trait TerminatorState @@ -88,13 +91,13 @@ object RemoteActorRefProvider { } /** + * INTERNAL API + * Depending on this class is not supported, only the [[akka.actor.ActorRefProvider]] interface is supported. + * * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. * - * INTERNAL API! - * - * Depending on this class is not supported, only the [[ActorRefProvider]] interface is supported. */ -class RemoteActorRefProvider( +private[akka] class RemoteActorRefProvider( val systemName: String, val settings: ActorSystem.Settings, val eventStream: EventStream, @@ -272,7 +275,7 @@ class RemoteActorRefProvider( case _ ⇒ local.actorFor(ref, path) } - /* + /** * INTERNAL API * Called in deserialization of incoming remote messages. In this case the correct local address is known, therefore * this method is faster than the actorFor above. @@ -317,6 +320,7 @@ private[akka] trait RemoteRef extends ActorRefScope { } /** + * INTERNAL API * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. * This reference is network-aware (remembers its origin) and immutable. */ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index a6e8f762d9..45160e718f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -13,15 +13,23 @@ import akka.actor.ActorRefWithCell import akka.actor.ActorRefScope import akka.util.Switch +/** + * INTERNAL API + */ private[akka] sealed trait DaemonMsg + +/** + * INTERNAL API + */ +@SerialVersionUID(1L) private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg /** + * INTERNAL API + * * Internal system "daemon" actor for remote internal communication. * * It acts as the brain of the remote that responds to system remote events (messages) and undertakes action. - * - * INTERNAL USE ONLY! */ private[akka] class RemoteSystemDaemon( system: ActorSystemImpl, diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index e714e453ea..640f5c38bb 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -15,6 +15,9 @@ case class RemoteScope(node: Address) extends Scope { def withFallback(other: Scope): Scope = this } +/** + * INTERNAL API + */ private[akka] class RemoteDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extends Deployer(_settings, _pm) { override def parseConfig(path: String, config: Config): Option[Deploy] = { import scala.collection.JavaConverters._ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 2dbcea4e41..2a2a64e4f6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -7,7 +7,7 @@ import com.typesafe.config.Config import scala.concurrent.duration._ import java.util.concurrent.TimeUnit.MILLISECONDS import akka.util.Timeout -import scala.collection.immutable.Seq +import scala.collection.immutable import akka.japi.Util._ class RemoteSettings(val config: Config) { @@ -44,7 +44,7 @@ class RemoteSettings(val config: Config) { val CommandAckTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.remote.command-ack-timeout"), MILLISECONDS)) - val Transports: Seq[(String, Seq[String], Config)] = transportNames.map { name ⇒ + val Transports: immutable.Seq[(String, immutable.Seq[String], Config)] = transportNames.map { name ⇒ val transportConfig = transportConfigFor(name) (transportConfig.getString("transport-class"), immutableSeq(transportConfig.getStringList("applied-adapters")).reverse, @@ -53,7 +53,7 @@ class RemoteSettings(val config: Config) { val Adapters: Map[String, String] = configToMap(getConfig("akka.remote.adapters")) - private def transportNames: Seq[String] = immutableSeq(getStringList("akka.remote.enabled-transports")) + private def transportNames: immutable.Seq[String] = immutableSeq(getStringList("akka.remote.enabled-transports")) private def transportConfigFor(transportName: String): Config = getConfig(transportName) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 0331402148..923428c1d8 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -4,7 +4,6 @@ package akka.remote -import scala.reflect.BeanProperty import akka.dispatch.SystemMessage import akka.event.{ LoggingAdapter, Logging } import akka.AkkaException @@ -18,9 +17,12 @@ import scala.concurrent.Future * RemoteTransportException represents a general failure within a RemoteTransport, * such as inability to start, wrong configuration etc. */ +@SerialVersionUID(1L) class RemoteTransportException(message: String, cause: Throwable) extends AkkaException(message, cause) /** + * INTERNAL API + * * The remote transport is responsible for sending and receiving messages. * Each transport has an address, which it should provide in * Serialization.currentTransportAddress (thread-local) while serializing @@ -28,7 +30,7 @@ class RemoteTransportException(message: String, cause: Throwable) extends AkkaEx * be available (i.e. fully initialized) by the time the first message is * received or when the start() method returns, whatever happens first. */ -abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: RemoteActorRefProvider) { +private[akka] abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: RemoteActorRefProvider) { /** * Shuts down the remoting */ diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 1cc08772aa..a26ac79733 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -3,7 +3,6 @@ */ package akka.remote -import scala.language.postfixOps import akka.actor.SupervisorStrategy._ import akka.actor._ import akka.event.{ Logging, LoggingAdapter } @@ -23,11 +22,20 @@ import scala.concurrent.{ Promise, Await, Future } import scala.util.control.NonFatal import scala.util.{ Failure, Success } +/** + * INTERNAL API + */ private[remote] object AddressUrlEncoder { def apply(address: Address): String = URLEncoder.encode(address.toString, "utf-8") } +/** + * INTERNAL API + */ private[remote] case class RARP(provider: RemoteActorRefProvider) extends Extension +/** + * INTERNAL API + */ private[remote] object RARP extends ExtensionId[RARP] with ExtensionIdProvider { override def lookup() = RARP @@ -35,6 +43,9 @@ private[remote] object RARP extends ExtensionId[RARP] with ExtensionIdProvider { override def createExtension(system: ExtendedActorSystem) = RARP(system.provider.asInstanceOf[RemoteActorRefProvider]) } +/** + * INTERNAL API + */ private[remote] object Remoting { final val EndpointManagerName = "endpointManager" @@ -48,7 +59,7 @@ private[remote] object Remoting { responsibleTransports.size match { case 0 ⇒ throw new RemoteTransportException( - s"No transport is responsible for address: $remote although protocol ${remote.protocol} is available." + + s"No transport is responsible for address: [$remote] although protocol [${remote.protocol}] is available." + " Make sure at least one transport is configured to be responsible for the address.", null) @@ -63,7 +74,7 @@ private[remote] object Remoting { null) } case None ⇒ throw new RemoteTransportException( - s"No transport is loaded for protocol: ${remote.protocol}, available protocols: ${transportMapping.keys.mkString}", null) + s"No transport is loaded for protocol: [${remote.protocol}], available protocols: [${transportMapping.keys.mkString}]", null) } } @@ -81,6 +92,9 @@ private[remote] object Remoting { } +/** + * INTERNAL API + */ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) { @volatile private var endpointManager: Option[ActorRef] = None @@ -199,6 +213,9 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc } +/** + * INTERNAL API + */ private[remote] object EndpointManager { // Messages between Remoting and EndpointManager @@ -243,7 +260,7 @@ private[remote] object EndpointManager { def registerWritableEndpoint(address: Address, endpoint: ActorRef): ActorRef = addressToWritable.get(address) match { case Some(Pass(e)) ⇒ - throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint $e with $endpoint") + throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]") case _ ⇒ addressToWritable += address -> Pass(endpoint) writableToAddress += endpoint -> address @@ -309,6 +326,9 @@ private[remote] object EndpointManager { } } +/** + * INTERNAL API + */ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Actor { import EndpointManager._ @@ -483,7 +503,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends adapters.map { TransportAdaptersExtension.get(context.system).getAdapterProvider(_) }.foldLeft(driver) { (t: Transport, provider: TransportAdapterProvider) ⇒ // The TransportAdapterProvider will wrap the given Transport and returns with a wrapped one - provider(t, context.system.asInstanceOf[ExtendedActorSystem]) + provider.create(t, context.system.asInstanceOf[ExtendedActorSystem]) } // Apply AkkaProtocolTransport wrapper to the end of the chain diff --git a/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala similarity index 72% rename from akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala rename to akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala index 39977afafd..42c788e5d7 100644 --- a/akka-remote/src/main/scala/akka/remote/RemotingLifecycle.scala +++ b/akka-remote/src/main/scala/akka/remote/RemotingLifecycleEvent.scala @@ -5,14 +5,13 @@ package akka.remote import akka.event.{ LoggingAdapter, Logging } import akka.actor.{ ActorSystem, Address } -import scala.beans.BeanProperty -import java.util.{ Set ⇒ JSet } -import scala.collection.JavaConverters.setAsJavaSetConverter +@SerialVersionUID(1L) sealed trait RemotingLifecycleEvent extends Serializable { def logLevel: Logging.LogLevel } +@SerialVersionUID(1L) sealed trait AssociationEvent extends RemotingLifecycleEvent { def localAddress: Address def remoteAddress: Address @@ -24,55 +23,65 @@ sealed trait AssociationEvent extends RemotingLifecycleEvent { override def toString: String = s"$eventName [$localAddress]${if (inbound) " <- " else " -> "}[$remoteAddress]" } +@SerialVersionUID(1L) final case class AssociatedEvent( localAddress: Address, remoteAddress: Address, inbound: Boolean) extends AssociationEvent { - protected override val eventName: String = "Associated" + protected override def eventName: String = "Associated" override def logLevel: Logging.LogLevel = Logging.DebugLevel } +@SerialVersionUID(1L) final case class DisassociatedEvent( localAddress: Address, remoteAddress: Address, inbound: Boolean) extends AssociationEvent { - protected override val eventName: String = "Disassociated" + protected override def eventName: String = "Disassociated" override def logLevel: Logging.LogLevel = Logging.DebugLevel } +@SerialVersionUID(1L) final case class AssociationErrorEvent( cause: Throwable, localAddress: Address, remoteAddress: Address, inbound: Boolean) extends AssociationEvent { - protected override val eventName: String = "AssociationError" + protected override def eventName: String = "AssociationError" override def logLevel: Logging.LogLevel = Logging.ErrorLevel - override def toString: String = s"${super.toString}: Error[${Logging.stackTraceFor(cause)}]" + override def toString: String = s"${super.toString}: Error [${cause.getMessage}] [${Logging.stackTraceFor(cause)}]" def getCause: Throwable = cause } +@SerialVersionUID(1L) final case class RemotingListenEvent(listenAddresses: Set[Address]) extends RemotingLifecycleEvent { - def getListenAddresses: JSet[Address] = listenAddresses.asJava + def getListenAddresses: java.util.Set[Address] = + scala.collection.JavaConverters.setAsJavaSetConverter(listenAddresses).asJava override def logLevel: Logging.LogLevel = Logging.InfoLevel override def toString: String = "Remoting now listens on addresses: " + listenAddresses.mkString("[", ", ", "]") } +@SerialVersionUID(1L) case object RemotingShutdownEvent extends RemotingLifecycleEvent { override def logLevel: Logging.LogLevel = Logging.InfoLevel override val toString: String = "Remoting shut down" } +@SerialVersionUID(1L) final case class RemotingErrorEvent(cause: Throwable) extends RemotingLifecycleEvent { def getCause: Throwable = cause override def logLevel: Logging.LogLevel = Logging.ErrorLevel - override def toString: String = s"Remoting error: [${Logging.stackTraceFor(cause)}]" + override def toString: String = s"Remoting error: [${cause.getMessage}] [${Logging.stackTraceFor(cause)}]" } -class EventPublisher(system: ActorSystem, log: LoggingAdapter, logEvents: Boolean) { +/** + * INTERNAL API + */ +private[remote] class EventPublisher(system: ActorSystem, log: LoggingAdapter, logEvents: Boolean) { def notifyListeners(message: RemotingLifecycleEvent): Unit = { system.eventStream.publish(message) if (logEvents) log.log(message.logLevel, "{}", message) diff --git a/akka-remote/src/main/scala/akka/remote/security/provider/AES128CounterInetRNG.scala b/akka-remote/src/main/scala/akka/remote/security/provider/AES128CounterInetRNG.scala index cb1722c41c..ddcc986f40 100644 --- a/akka-remote/src/main/scala/akka/remote/security/provider/AES128CounterInetRNG.scala +++ b/akka-remote/src/main/scala/akka/remote/security/provider/AES128CounterInetRNG.scala @@ -7,7 +7,7 @@ import org.uncommons.maths.random.{ AESCounterRNG } import SeedSize.Seed128 /** - * Internal API + * INTERNAL API * This class is a wrapper around the 128-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/ * It uses the default seed generator which uses one of the following 3 random seed sources: * Depending on availability: random.org, /dev/random, and SecureRandom (provided by Java) diff --git a/akka-remote/src/main/scala/akka/remote/security/provider/AES128CounterSecureRNG.scala b/akka-remote/src/main/scala/akka/remote/security/provider/AES128CounterSecureRNG.scala index feae55de02..eb6c1505c4 100644 --- a/akka-remote/src/main/scala/akka/remote/security/provider/AES128CounterSecureRNG.scala +++ b/akka-remote/src/main/scala/akka/remote/security/provider/AES128CounterSecureRNG.scala @@ -7,7 +7,7 @@ import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator } import SeedSize.Seed128 /** - * Internal API + * INTERNAL API * This class is a wrapper around the 128-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/ * The only method used by netty ssl is engineNextBytes(bytes) * This RNG is good to use to prevent startup delay when you don't have Internet access to random.org diff --git a/akka-remote/src/main/scala/akka/remote/security/provider/AES256CounterInetRNG.scala b/akka-remote/src/main/scala/akka/remote/security/provider/AES256CounterInetRNG.scala index 321b66200c..a7b7cdd985 100644 --- a/akka-remote/src/main/scala/akka/remote/security/provider/AES256CounterInetRNG.scala +++ b/akka-remote/src/main/scala/akka/remote/security/provider/AES256CounterInetRNG.scala @@ -7,7 +7,7 @@ import org.uncommons.maths.random.{ AESCounterRNG } import SeedSize.Seed256 /** - * Internal API + * INTERNAL API * This class is a wrapper around the 256-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/ * It uses the default seed generator which uses one of the following 3 random seed sources: * Depending on availability: random.org, /dev/random, and SecureRandom (provided by Java) diff --git a/akka-remote/src/main/scala/akka/remote/security/provider/AES256CounterSecureRNG.scala b/akka-remote/src/main/scala/akka/remote/security/provider/AES256CounterSecureRNG.scala index 833ca32109..d49bb50b3d 100644 --- a/akka-remote/src/main/scala/akka/remote/security/provider/AES256CounterSecureRNG.scala +++ b/akka-remote/src/main/scala/akka/remote/security/provider/AES256CounterSecureRNG.scala @@ -7,7 +7,7 @@ import org.uncommons.maths.random.{ AESCounterRNG, SecureRandomSeedGenerator } import SeedSize.Seed256 /** - * Internal API + * INTERNAL API * This class is a wrapper around the 256-bit AESCounterRNG algorithm provided by http://maths.uncommons.org/ * The only method used by netty ssl is engineNextBytes(bytes) * This RNG is good to use to prevent startup delay when you don't have Internet access to random.org diff --git a/akka-remote/src/main/scala/akka/remote/security/provider/InternetSeedGenerator.scala b/akka-remote/src/main/scala/akka/remote/security/provider/InternetSeedGenerator.scala index b274c4c0b6..28495070df 100644 --- a/akka-remote/src/main/scala/akka/remote/security/provider/InternetSeedGenerator.scala +++ b/akka-remote/src/main/scala/akka/remote/security/provider/InternetSeedGenerator.scala @@ -19,7 +19,7 @@ import org.uncommons.maths.random.{ SeedGenerator, SeedException, SecureRandomSe import scala.collection.immutable /** - * Internal API + * INTERNAL API * Seed generator that maintains multiple strategies for seed * generation and will delegate to the best one available for the * current operating environment. diff --git a/akka-remote/src/main/scala/akka/remote/security/provider/SeedSize.scala b/akka-remote/src/main/scala/akka/remote/security/provider/SeedSize.scala index c55e39ab99..0eb1bba58b 100644 --- a/akka-remote/src/main/scala/akka/remote/security/provider/SeedSize.scala +++ b/akka-remote/src/main/scala/akka/remote/security/provider/SeedSize.scala @@ -5,12 +5,12 @@ package akka.remote.security.provider /** - * Internal API + * INTERNAL API * From AESCounterRNG API docs: * Valid values are 16 (128 bits), 24 (192 bits) and 32 (256 bits). * Any other values will result in an exception from the AES implementation. * - * Internal API + * INTERNAL API */ private[provider] object SeedSize { val Seed128 = 16 diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index c00492d109..b6042dc611 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -3,12 +3,9 @@ */ package akka.remote.transport -import scala.language.postfixOps import akka.actor._ import akka.pattern.{ ask, pipe } import akka.remote.Remoting.RegisterTransportActor -import akka.remote.transport.ActorTransportAdapter.ListenUnderlying -import akka.remote.transport.ActorTransportAdapter.ListenerRegistered import akka.remote.transport.Transport._ import akka.remote.RARP import akka.util.Timeout @@ -17,7 +14,12 @@ import scala.concurrent.duration._ import scala.concurrent.{ ExecutionContext, Promise, Future } import scala.util.Success -trait TransportAdapterProvider extends ((Transport, ExtendedActorSystem) ⇒ Transport) +trait TransportAdapterProvider { + /** + * Create the transport adapter that wraps an underlying transport. + */ + def create(wrappedTransport: Transport, system: ExtendedActorSystem): Transport +} class TransportAdapters(system: ExtendedActorSystem) extends Extension { val settings = RARP(system).provider.remoteSettings @@ -123,7 +125,7 @@ object ActorTransportAdapter { upstreamListener: Future[AssociationEventListener]) extends TransportOperation case object DisassociateUnderlying extends TransportOperation - implicit val AskTimeout = Timeout(5 seconds) + implicit val AskTimeout = Timeout(5.seconds) } abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorSystem) @@ -158,6 +160,8 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS } abstract class ActorTransportAdapterManager extends Actor { + import ActorTransportAdapter.{ ListenUnderlying, ListenerRegistered } + private var delayedEvents = immutable.Queue.empty[Any] protected var associationListener: AssociationEventListener = _ diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala index 26d0028c60..a64d011404 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala @@ -12,10 +12,14 @@ import akka.util.ByteString import com.google.protobuf.InvalidProtocolBufferException import com.google.protobuf.{ ByteString ⇒ PByteString } -class PduCodecException(msg: String, cause: Throwable) extends AkkaException(msg, cause) +/** + * INTERNAL API + */ +@SerialVersionUID(1L) +private[remote] class PduCodecException(msg: String, cause: Throwable) extends AkkaException(msg, cause) /** - * Internal API + * INTERNAL API * * Companion object of the [[akka.remote.transport.AkkaPduCodec]] trait. Contains the representation case classes * of decoded Akka Protocol Data Units (PDUs). @@ -39,6 +43,8 @@ private[remote] object AkkaPduCodec { } /** + * INTERNAL API + * * A Codec that is able to convert Akka PDUs (Protocol Data Units) from and to [[akka.util.ByteString]]s. */ private[remote] trait AkkaPduCodec { @@ -89,6 +95,9 @@ private[remote] trait AkkaPduCodec { senderOption: Option[ActorRef]): ByteString } +/** + * INTERNAL API + */ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { override def constructMessage( diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index cdb14055ba..f89c3929ac 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -23,6 +23,7 @@ import scala.collection.immutable import akka.remote.transport.ActorTransportAdapter._ import akka.ConfigurationException +@SerialVersionUID(1L) class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace { def this(msg: String) = this(msg, null) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala index 1efd89c2fc..ed1846dd47 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -15,27 +15,43 @@ import scala.concurrent.forkjoin.ThreadLocalRandom import scala.concurrent.{ Future, Promise } import scala.util.control.NoStackTrace +@SerialVersionUID(1L) case class FailureInjectorException(msg: String) extends AkkaException(msg) with NoStackTrace class FailureInjectorProvider extends TransportAdapterProvider { - def apply(wrappedTransport: Transport, system: ExtendedActorSystem): Transport = + override def create(wrappedTransport: Transport, system: ExtendedActorSystem): Transport = new FailureInjectorTransportAdapter(wrappedTransport, system) } +/** + * INTERNAL API + */ private[remote] object FailureInjectorTransportAdapter { val FailureInjectorSchemeIdentifier = "gremlin" trait FailureInjectorCommand + @SerialVersionUID(1L) case class All(mode: GremlinMode) + @SerialVersionUID(1L) case class One(remoteAddress: Address, mode: GremlinMode) sealed trait GremlinMode - case object PassThru extends GremlinMode + @SerialVersionUID(1L) + case object PassThru extends GremlinMode { + /** + * Java API: get the singleton instance + */ + def getInstance = this + } + @SerialVersionUID(1L) case class Drop(outboundDropP: Double, inboundDropP: Double) extends GremlinMode } +/** + * INTERNAL API + */ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transport, val extendedSystem: ExtendedActorSystem) extends AbstractTransportAdapter(wrappedTransport)(extendedSystem.dispatcher) with AssociationEventListener { @@ -112,6 +128,9 @@ private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transpor } } +/** + * INTERNAL API + */ private[remote] case class FailureInjectorHandle(_wrappedHandle: AssociationHandle, private val gremlinAdapter: FailureInjectorTransportAdapter) extends AbstractTransportAdapterHandle(_wrappedHandle, FailureInjectorSchemeIdentifier) diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index cf6d8c313d..51fa995975 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -9,7 +9,6 @@ import akka.pattern.pipe import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying import akka.remote.transport.AkkaPduCodec.Associate import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload, HandleEventListener } -import akka.remote.transport.ThrottledAssociation._ import akka.remote.transport.ThrottlerManager.Checkin import akka.remote.transport.ThrottlerTransportAdapter._ import akka.remote.transport.Transport._ @@ -27,7 +26,7 @@ import scala.concurrent.duration._ class ThrottlerProvider extends TransportAdapterProvider { - def apply(wrappedTransport: Transport, system: ExtendedActorSystem): Transport = + override def create(wrappedTransport: Transport, system: ExtendedActorSystem): Transport = new ThrottlerTransportAdapter(wrappedTransport, system) } @@ -41,32 +40,61 @@ object ThrottlerTransportAdapter { } object Direction { + + @SerialVersionUID(1L) case object Send extends Direction { override def includes(other: Direction): Boolean = other match { case Send ⇒ true case _ ⇒ false } + + /** + * Java API: get the singleton instance + */ + def getInstance = this } + + @SerialVersionUID(1L) case object Receive extends Direction { override def includes(other: Direction): Boolean = other match { case Receive ⇒ true case _ ⇒ false } + + /** + * Java API: get the singleton instance + */ + def getInstance = this } + + @SerialVersionUID(1L) case object Both extends Direction { override def includes(other: Direction): Boolean = true + + /** + * Java API: get the singleton instance + */ + def getInstance = this } } - object SetThrottle + @SerialVersionUID(1L) case class SetThrottle(address: Address, direction: Direction, mode: ThrottleMode) - case object SetThrottleAck + + @SerialVersionUID(1L) + case object SetThrottleAck { + /** + * Java API: get the singleton instance + */ + def getInstance = this + } sealed trait ThrottleMode { def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration } + @SerialVersionUID(1L) case class TokenBucket(capacity: Int, tokensPerSecond: Double, nanoTimeOfLastSend: Long, availableTokens: Int) extends ThrottleMode { @@ -92,14 +120,27 @@ object ThrottlerTransportAdapter { (TimeUnit.NANOSECONDS.toMillis(nanoTimeOfSend - nanoTimeOfLastSend) * tokensPerSecond / 1000.0).toInt } + @SerialVersionUID(1L) case object Unthrottled extends ThrottleMode { override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, true) override def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration = Duration.Zero + + /** + * Java API: get the singleton instance + */ + def getInstance = this + } + @SerialVersionUID(1L) case object Blackhole extends ThrottleMode { override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, false) override def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration = Duration.Zero + + /** + * Java API: get the singleton instance + */ + def getInstance = this } } @@ -122,11 +163,17 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA } } +/** + * INTERNAL API + */ private[transport] object ThrottlerManager { case class OriginResolved() case class Checkin(origin: Address, handle: ThrottlerHandle) } +/** + * INTERNAL API + */ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends ActorTransportAdapterManager { import context.dispatcher @@ -215,7 +262,10 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A } -object ThrottledAssociation { +/** + * INTERNAL API + */ +private[transport] object ThrottledAssociation { private final val DequeueTimerName = "dequeue" case object Dequeue @@ -247,12 +297,16 @@ object ThrottledAssociation { case class ExposedHandle(handle: ThrottlerHandle) extends ThrottlerData } +/** + * INTERNAL API + */ private[transport] class ThrottledAssociation( val manager: ActorRef, val associationHandler: AssociationEventListener, val originalHandle: AssociationHandle, val inbound: Boolean) - extends Actor with LoggingFSM[ThrottlerState, ThrottlerData] { + extends Actor with LoggingFSM[ThrottledAssociation.ThrottlerState, ThrottledAssociation.ThrottlerData] { + import ThrottledAssociation._ import context.dispatcher var inboundThrottleMode: ThrottleMode = _ @@ -398,6 +452,9 @@ private[transport] class ThrottledAssociation( } +/** + * INTERNAL API + */ private[transport] case class ThrottlerHandle(_wrappedHandle: AssociationHandle, throttlerActor: ActorRef) extends AbstractTransportAdapterHandle(_wrappedHandle, SchemeIdentifier) { diff --git a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala index 82f361dfa0..88510caa31 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/Transport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/Transport.scala @@ -17,6 +17,7 @@ object Transport { * Indicates that the association setup request is invalid, and it is impossible to recover (malformed IP address, * hostname, etc.). */ + @SerialVersionUID(1L) case class InvalidAssociationException(msg: String, cause: Throwable) extends AkkaException(msg, cause) /** diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala index 94d586908b..e3d793f9e5 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyHelpers.scala @@ -8,6 +8,9 @@ import java.nio.channels.ClosedChannelException import org.jboss.netty.channel._ import scala.util.control.NonFatal +/** + * INTERNAL API + */ private[netty] trait NettyHelpers { protected def onConnect(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = {} @@ -30,6 +33,9 @@ private[netty] trait NettyHelpers { } } +/** + * INTERNAL API + */ private[netty] trait NettyServerHelpers extends SimpleChannelUpstreamHandler with NettyHelpers { final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { @@ -55,6 +61,9 @@ private[netty] trait NettyServerHelpers extends SimpleChannelUpstreamHandler wit } } +/** + * INTERNAL API + */ private[netty] trait NettyClientHelpers extends SimpleChannelHandler with NettyHelpers { final override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent): Unit = { super.messageReceived(ctx, e) diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala index 949ef235fb..df12d723a3 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettySSLSupport.scala @@ -15,6 +15,9 @@ import java.security._ import javax.net.ssl.{ KeyManagerFactory, TrustManager, TrustManagerFactory, SSLContext } import org.jboss.netty.handler.ssl.SslHandler +/** + * INTERNAL API + */ private[akka] class SSLSettings(config: Config) { import config._ @@ -45,8 +48,9 @@ private[akka] class SSLSettings(config: Config) { } /** + * INTERNAL API + * * Used for adding SSL support to Netty pipeline - * Internal use only */ private[akka] object NettySSLSupport { diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 9a4362d837..b85cf8a0a7 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -25,7 +25,7 @@ import org.jboss.netty.handler.ssl.SslHandler import scala.concurrent.duration.{ Duration, FiniteDuration, MILLISECONDS } import scala.concurrent.{ ExecutionContext, Promise, Future, blocking } import scala.util.{ Failure, Success, Try } -import util.control.{ NoStackTrace, NonFatal } +import scala.util.control.{ NoStackTrace, NonFatal } object NettyTransportSettings { sealed trait Mode @@ -60,6 +60,7 @@ object NettyFutureBridge { } } +@SerialVersionUID(1L) class NettyTransportException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) with OnlyCauseStackTrace { def this(msg: String) = this(msg, null) } @@ -71,7 +72,7 @@ class NettyTransportSettings(config: Config) { val TransportMode: Mode = getString("transport-protocol") match { case "tcp" ⇒ Tcp case "udp" ⇒ Udp - case unknown ⇒ throw new ConfigurationException(s"Unknown transport: $unknown") + case unknown ⇒ throw new ConfigurationException(s"Unknown transport: [$unknown]") } val EnableSsl: Boolean = if (getBoolean("enable-ssl") && TransportMode == Udp) @@ -123,7 +124,10 @@ class NettyTransportSettings(config: Config) { } -trait CommonHandlers extends NettyHelpers { +/** + * INTERNAL API + */ +private[netty] trait CommonHandlers extends NettyHelpers { protected val transport: NettyTransport final override def onOpen(ctx: ChannelHandlerContext, e: ChannelStateEvent): Unit = transport.channelGroup.add(e.getChannel) @@ -153,8 +157,11 @@ trait CommonHandlers extends NettyHelpers { } } -abstract class ServerHandler(protected final val transport: NettyTransport, - private final val associationListenerFuture: Future[AssociationEventListener]) +/** + * INTERNAL API + */ +private[netty] abstract class ServerHandler(protected final val transport: NettyTransport, + private final val associationListenerFuture: Future[AssociationEventListener]) extends NettyServerHelpers with CommonHandlers { import transport.executionContext @@ -172,7 +179,10 @@ abstract class ServerHandler(protected final val transport: NettyTransport, } -abstract class ClientHandler(protected final val transport: NettyTransport, remoteAddress: Address) +/** + * INTERNAL API + */ +private[netty] abstract class ClientHandler(protected final val transport: NettyTransport, remoteAddress: Address) extends NettyClientHelpers with CommonHandlers { final protected val statusPromise = Promise[AssociationHandle]() def statusFuture = statusPromise.future @@ -183,6 +193,9 @@ abstract class ClientHandler(protected final val transport: NettyTransport, remo } +/** + * INTERNAL API + */ private[transport] object NettyTransport { // 4 bytes will be used to represent the frame length. Used by netty LengthFieldPrepender downstream handler. val FrameLengthFieldLength = 4 @@ -209,16 +222,19 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA implicit val executionContext: ExecutionContext = system.dispatcher override val schemeIdentifier: String = (if (EnableSsl) "ssl." else "") + TransportMode - override val maximumPayloadBytes: Int = 32000 // The number of octets required by the remoting specification + override def maximumPayloadBytes: Int = 32000 // The number of octets required by the remoting specification - private final val isDatagram: Boolean = TransportMode == Udp + private final val isDatagram = TransportMode == Udp @volatile private var localAddress: Address = _ @volatile private var serverChannel: Channel = _ private val log = Logging(system, this.getClass) - final val udpConnectionTable = new ConcurrentHashMap[SocketAddress, HandleEventListener]() + /** + * INTERNAL API + */ + private[netty] final val udpConnectionTable = new ConcurrentHashMap[SocketAddress, HandleEventListener]() /* * Be aware, that the close() method of DefaultChannelGroup is racy, because it uses an iterator over a ConcurrentHashMap. diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala index 879564f57c..fb40b7d966 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala @@ -14,11 +14,17 @@ import org.jboss.netty.channel._ import scala.concurrent.{ Future, Promise } import scala.util.{ Success, Failure } +/** + * INTERNAL API + */ private[remote] object ChannelLocalActor extends ChannelLocal[Option[HandleEventListener]] { override def initialValue(channel: Channel): Option[HandleEventListener] = None def notifyListener(channel: Channel, msg: HandleEvent): Unit = get(channel) foreach { _ notify msg } } +/** + * INTERNAL API + */ private[remote] trait TcpHandlers extends CommonHandlers { import ChannelLocalActor._ @@ -45,6 +51,9 @@ private[remote] trait TcpHandlers extends CommonHandlers { } } +/** + * INTERNAL API + */ private[remote] class TcpServerHandler(_transport: NettyTransport, _associationListenerFuture: Future[AssociationEventListener]) extends ServerHandler(_transport, _associationListenerFuture) with TcpHandlers { @@ -53,6 +62,9 @@ private[remote] class TcpServerHandler(_transport: NettyTransport, _associationL } +/** + * INTERNAL API + */ private[remote] class TcpClientHandler(_transport: NettyTransport, remoteAddress: Address) extends ClientHandler(_transport, remoteAddress) with TcpHandlers { @@ -61,6 +73,9 @@ private[remote] class TcpClientHandler(_transport: NettyTransport, remoteAddress } +/** + * INTERNAL API + */ private[remote] class TcpAssociationHandle(val localAddress: Address, val remoteAddress: Address, private val channel: Channel) extends AssociationHandle { diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala index 5630079749..cf9b3cc4ae 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/UdpSupport.scala @@ -13,6 +13,9 @@ import org.jboss.netty.buffer.{ ChannelBuffer, ChannelBuffers } import org.jboss.netty.channel._ import scala.concurrent.{ Future, Promise } +/** + * INTERNAL API + */ private[remote] trait UdpHandlers extends CommonHandlers { override def createHandle(channel: Channel, localAddress: Address, remoteAddress: Address): AssociationHandle = @@ -46,6 +49,9 @@ private[remote] trait UdpHandlers extends CommonHandlers { def initUdp(channel: Channel, remoteSocketAddress: SocketAddress, msg: ChannelBuffer): Unit } +/** + * INTERNAL API + */ private[remote] class UdpServerHandler(_transport: NettyTransport, _associationListenerFuture: Future[AssociationEventListener]) extends ServerHandler(_transport, _associationListenerFuture) with UdpHandlers { @@ -53,6 +59,9 @@ private[remote] class UdpServerHandler(_transport: NettyTransport, _associationL initInbound(channel, remoteSocketAddress, msg) } +/** + * INTERNAL API + */ private[remote] class UdpClientHandler(_transport: NettyTransport, remoteAddress: Address) extends ClientHandler(_transport, remoteAddress) with UdpHandlers { @@ -60,6 +69,9 @@ private[remote] class UdpClientHandler(_transport: NettyTransport, remoteAddress initOutbound(channel, remoteSocketAddress, msg) } +/** + * INTERNAL API + */ private[remote] class UdpAssociationHandle(val localAddress: Address, val remoteAddress: Address, private val channel: Channel,