diff --git a/akka-docs/rst/common/binary-compatibility-rules.rst b/akka-docs/rst/common/binary-compatibility-rules.rst index 1450b83b5b..af0a19283a 100644 --- a/akka-docs/rst/common/binary-compatibility-rules.rst +++ b/akka-docs/rst/common/binary-compatibility-rules.rst @@ -91,7 +91,7 @@ API stability annotations and comments Akka gives a very strong binary compatibility promise to end-users. However some parts of Akka are excluded from these rules, for example internal or known evolving APIs may be marked as such and shipped as part of -an overall stable module. As general rule any breakage is avoided and handled via deprecation and additional method, +an overall stable module. As general rule any breakage is avoided and handled via deprecation and method addition, however certain APIs which are known to not yet be fully frozen (or are fully internal) are marked as such and subject to change at any time (even if best-effort is taken to keep them compatible). @@ -101,12 +101,12 @@ When browsing the source code and/or looking for methods available to be called, have as rich of an access protection system as Scala has, you may sometimes find methods or classes annotated with the ``/** INTERNAL API */`` comment or the ``@akka.annotation.InternalApi`` annotation. -No compatibility guarantees are given about these classes, they may change or even disapear in minor versions, -and user code is not supposed to be calling (or even touching) them. +No compatibility guarantees are given about these classes. They may change or even dissappear in minor versions, +and user code is not supposed to call them. Side-note on JVM representation details of the Scala ``private[akka]`` pattern that Akka is using extensively in it's internals: Such methods or classes, which act as "accessible only from the given package" in Scala, are compiled -down to ``public`` (!) in raw Java bytecode, and the access restriction, that Scala understands is carried along +down to ``public`` (!) in raw Java bytecode. The access restriction, that Scala understands is carried along as metadata stored in the classfile. Thus, such methods are safely guarded from being accessed from Scala, however Java users will not be warned about this fact by the ``javac`` compiler. Please be aware of this and do not call into Internal APIs, as they are subject to change without any warning. @@ -117,7 +117,7 @@ The ``@DoNotInherit`` and ``@ApiMayChange`` markers In addition to the special internal API marker two annotations exist in Akka and specifically address the following use cases: - ``@ApiMayChange`` – which marks APIs which are known to be not fully stable yet. Read more in :ref:`may-change` -- ``@DoNotInherit`` – which marks APIs that are designed under an closed-world assumption, and thus must not be +- ``@DoNotInherit`` – which marks APIs that are designed under a closed-world assumption, and thus must not be extended outside Akka itself (or such code will risk facing binary incompatibilities). E.g. an interface may be marked using this annotation, and while the type is public, it is not meant for extension by user-code. This allows adding new methods to these interfaces without risking to break client code. Examples of such API are the ``FlowOps`` diff --git a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala index 89ff8a9aed..4aba731323 100644 --- a/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala +++ b/akka-stream-tests-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala @@ -18,7 +18,7 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] { implicit val materializer = ActorMaterializer(settings)(system) // withAttributes "wraps" the underlying identity and protects it from automatic removal - Flow[Int].via(GraphStages.Identity.asInstanceOf[Graph[FlowShape[Int, Int], NotUsed]]).named("identity").toProcessor.run() + Flow[Int].via(GraphStages.identity.asInstanceOf[Graph[FlowShape[Int, Int], NotUsed]]).named("identity").toProcessor.run() } override def createElement(element: Int): Int = element diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala index fffa0b7c6b..794ea27e73 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala @@ -365,7 +365,7 @@ class GraphDSLCompileSpec extends StreamSpec { import akka.stream.Attributes._ val ga = GraphDSL.create() { implicit b ⇒ import GraphDSL.Implicits._ - val id = b.add(GraphStages.Identity) + val id = b.add(GraphStages.identity[Any]) FlowShape(id.in, id.out) }.async.addAttributes(none).named("useless") diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 061ee56cc6..ba38630603 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -6,7 +6,7 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicBoolean import akka.actor._ -import akka.annotation.InternalApi +import akka.annotation.{ DoNotInherit, InternalApi } import akka.event.LoggingAdapter import akka.pattern.ask import akka.stream._ @@ -19,7 +19,7 @@ import scala.concurrent.{ Await, ExecutionContextExecutor } /** * ExtendedActorMaterializer used by subtypes which materializer using GraphInterpreterShell */ -abstract class ExtendedActorMaterializer extends ActorMaterializer { +@DoNotInherit private[akka] abstract class ExtendedActorMaterializer extends ActorMaterializer { override def withNamePrefix(name: String): ExtendedActorMaterializer @@ -41,7 +41,7 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer { /** * INTERNAL API */ - override def actorOf(context: MaterializationContext, props: Props): ActorRef = { + @InternalApi private[akka] override def actorOf(context: MaterializationContext, props: Props): ActorRef = { val dispatcher = if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) effectiveSettings(context.effectiveAttributes).dispatcher else props.dispatcher @@ -51,7 +51,7 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer { /** * INTERNAL API */ - def actorOf(props: Props, name: String): ActorRef = { + @InternalApi private[akka] def actorOf(props: Props, name: String): ActorRef = { supervisor match { case ref: LocalActorRef ⇒ ref.underlying.attachChild(props, name, systemService = false) @@ -71,12 +71,12 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer { /** * INTERNAL API */ - override def logger: LoggingAdapter + @InternalApi private[akka] override def logger: LoggingAdapter /** * INTERNAL API */ - override def supervisor: ActorRef + @InternalApi private[akka] override def supervisor: ActorRef } @@ -117,7 +117,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa /** * INTERNAL API */ -object FlowNames extends ExtensionId[FlowNames] with ExtensionIdProvider { +@InternalApi private[akka] object FlowNames extends ExtensionId[FlowNames] with ExtensionIdProvider { override def get(system: ActorSystem): FlowNames = super.get(system) override def lookup() = FlowNames override def createExtension(system: ExtendedActorSystem): FlowNames = new FlowNames @@ -126,14 +126,14 @@ object FlowNames extends ExtensionId[FlowNames] with ExtensionIdProvider { /** * INTERNAL API */ -class FlowNames extends Extension { +@InternalApi private[akka] class FlowNames extends Extension { val name = SeqActorName("Flow") } /** * INTERNAL API */ -object StreamSupervisor { +@InternalApi private[akka] object StreamSupervisor { def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props = Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local) private[stream] val baseName = "StreamSupervisor" @@ -155,7 +155,10 @@ object StreamSupervisor { case object PrintDebugDump } -class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor { +/** + * INTERNAL API + */ +@InternalApi private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor { import akka.stream.impl.StreamSupervisor._ override def supervisorStrategy = SupervisorStrategy.stoppingStrategy diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index e04897a15d..c6e2aa4ad0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -4,16 +4,17 @@ package akka.stream.impl import akka.actor._ +import akka.annotation.InternalApi import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings } import akka.stream.actor.ActorSubscriber.OnSubscribe -import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete, OnError } -import org.reactivestreams.{ Subscriber, Subscription, Processor } +import akka.stream.actor.ActorSubscriberMessage.{ OnComplete, OnError, OnNext } +import org.reactivestreams.{ Processor, Subscriber, Subscription } import akka.event.Logging /** * INTERNAL API */ -private[akka] object ActorProcessor { +@InternalApi private[akka] object ActorProcessor { def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = { val p = new ActorProcessor[I, O](impl) @@ -26,7 +27,7 @@ private[akka] object ActorProcessor { /** * INTERNAL API */ -private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[O](impl) +@InternalApi private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[O](impl) with Processor[I, O] { override def onSubscribe(s: Subscription): Unit = { ReactiveStreamsCompliance.requireNonNullSubscription(s) @@ -46,7 +47,7 @@ private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[ /** * INTERNAL API */ -private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) extends DefaultInputTransferStates { +@InternalApi private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) extends DefaultInputTransferStates { if (size < 1) throw new IllegalArgumentException(s"buffer size must be positive (was: $size)") if ((size & (size - 1)) != 0) throw new IllegalArgumentException(s"buffer size must be a power of two (was: $size)") @@ -158,7 +159,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) /** * INTERNAL API */ -private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends DefaultOutputTransferStates { +@InternalApi private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends DefaultOutputTransferStates { import ReactiveStreamsCompliance._ protected var exposedPublisher: ActorPublisher[Any] = _ @@ -247,7 +248,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D /** * INTERNAL API */ -private[akka] abstract class ActorProcessorImpl(val settings: ActorMaterializerSettings) +@InternalApi private[akka] abstract class ActorProcessorImpl(val settings: ActorMaterializerSettings) extends Actor with ActorLogging with Pump { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index 5d5ac6a345..3fefa72b71 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -4,17 +4,19 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference + import scala.annotation.tailrec import scala.collection.immutable -import scala.util.control.{ NoStackTrace } +import scala.util.control.NoStackTrace import akka.actor.{ Actor, ActorRef, Terminated } +import akka.annotation.InternalApi import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.Subscription /** * INTERNAL API */ -object ActorPublisher { +@InternalApi private[akka] object ActorPublisher { val NormalShutdownReasonMessage = "Cannot subscribe to shut-down Publisher" class NormalShutdownException extends IllegalStateException(NormalShutdownReasonMessage) with NoStackTrace val NormalShutdownReason: Throwable = new NormalShutdownException @@ -35,7 +37,7 @@ object ActorPublisher { * When you instantiate this class, or its subclasses, you MUST send an ExposedPublisher message to the wrapped * ActorRef! If you don't need to subclass, prefer the apply() method on the companion object which takes care of this. */ -class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { +@InternalApi private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { import ReactiveStreamsCompliance._ // The subscriber of an subscription attempt is first placed in this list of pending subscribers. @@ -97,7 +99,7 @@ class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { /** * INTERNAL API */ -private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends Subscription { +@InternalApi private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends Subscription { override def request(elements: Long): Unit = impl ! RequestMore(this, elements) override def cancel(): Unit = impl ! Cancel(this) } @@ -105,13 +107,13 @@ private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val su /** * INTERNAL API */ -private[akka] class ActorSubscriptionWithCursor[T](_impl: ActorRef, _subscriber: Subscriber[_ >: T]) +@InternalApi private[akka] class ActorSubscriptionWithCursor[T](_impl: ActorRef, _subscriber: Subscriber[_ >: T]) extends ActorSubscription[T](_impl, _subscriber) with SubscriptionWithCursor[T] /** * INTERNAL API */ -private[akka] trait SoftShutdown { this: Actor ⇒ +@InternalApi private[akka] trait SoftShutdown { this: Actor ⇒ def softShutdown(): Unit = { val children = context.children if (children.isEmpty) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala index c23c0a63e4..bdb8ba9cd2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala @@ -6,18 +6,19 @@ package akka.stream.impl import java.util import akka.actor._ +import akka.annotation.InternalApi import akka.stream.impl.Stages.DefaultAttributes -import akka.stream.{ Inlet, SinkShape, Attributes } +import akka.stream.{ Attributes, Inlet, SinkShape } import akka.stream.Attributes.InputBuffer import akka.stream.stage._ /** * INTERNAL API */ -private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessage: Any, - ackMessage: Any, - onCompleteMessage: Any, - onFailureMessage: (Throwable) ⇒ Any) +@InternalApi private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessage: Any, + ackMessage: Any, + onCompleteMessage: Any, + onFailureMessage: (Throwable) ⇒ Any) extends GraphStage[SinkShape[In]] { val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in") override def initialAttributes = DefaultAttributes.actorRefWithAck diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala index 6752eef299..93ad580c59 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSinkActor.scala @@ -10,11 +10,12 @@ import akka.actor.Status import akka.stream.actor.WatermarkRequestStrategy import akka.actor.Props import akka.actor.Terminated +import akka.annotation.InternalApi /** * INTERNAL API */ -private[akka] object ActorRefSinkActor { +@InternalApi private[akka] object ActorRefSinkActor { def props(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any): Props = Props(new ActorRefSinkActor(ref, highWatermark, onCompleteMessage)) } @@ -22,7 +23,7 @@ private[akka] object ActorRefSinkActor { /** * INTERNAL API */ -private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any) extends ActorSubscriber { +@InternalApi private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any) extends ActorSubscriber { import ActorSubscriberMessage._ override val requestStrategy = WatermarkRequestStrategy(highWatermark) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala index cc03cf2f2c..0fd5206df3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala @@ -6,14 +6,15 @@ package akka.stream.impl import akka.actor.ActorLogging import akka.actor.Props import akka.actor.Status +import akka.annotation.InternalApi import akka.stream.OverflowStrategies._ -import akka.stream.{ BufferOverflowException, OverflowStrategy, OverflowStrategies } +import akka.stream.{ BufferOverflowException, OverflowStrategies, OverflowStrategy } import akka.stream.ActorMaterializerSettings /** * INTERNAL API */ -private[akka] object ActorRefSourceActor { +@InternalApi private[akka] object ActorRefSourceActor { def props(bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings) = { require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported") val maxFixedBufferSize = settings.maxFixedBufferSize @@ -24,7 +25,7 @@ private[akka] object ActorRefSourceActor { /** * INTERNAL API */ -private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int) +@InternalApi private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int) extends akka.stream.actor.ActorPublisher[Any] with ActorLogging { import akka.stream.actor.ActorPublisherMessage._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala b/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala index 5dea3fb303..1b0a8bff36 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala @@ -4,12 +4,14 @@ package akka.stream.impl import java.{ util ⇒ ju } + +import akka.annotation.InternalApi import akka.stream._ /** * INTERNAL API */ -private[akka] trait Buffer[T] { +@InternalApi private[akka] trait Buffer[T] { def capacity: Int def used: Int def isFull: Boolean @@ -46,7 +48,7 @@ private[akka] object Buffer { /** * INTERNAL API */ -private[akka] object FixedSizeBuffer { +@InternalApi private[akka] object FixedSizeBuffer { /** * INTERNAL API @@ -57,7 +59,7 @@ private[akka] object FixedSizeBuffer { * * Returns a specialized instance for power-of-two sized buffers. */ - def apply[T](size: Int): FixedSizeBuffer[T] = + @InternalApi private[akka] def apply[T](size: Int): FixedSizeBuffer[T] = if (size < 1) throw new IllegalArgumentException("size must be positive") else if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size) else new ModuloFixedSizeBuffer(size) @@ -140,7 +142,7 @@ private[akka] object FixedSizeBuffer { /** * INTERNAL API */ -private[akka] final class BoundedBuffer[T](val capacity: Int) extends Buffer[T] { +@InternalApi private[akka] final class BoundedBuffer[T](val capacity: Int) extends Buffer[T] { def used: Int = q.used def isFull: Boolean = q.isFull diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 585a43617e..2af88c8663 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -11,8 +11,7 @@ import scala.concurrent.{ ExecutionContext, Promise } /** * INTERNAL API */ -@InternalApi -private[akka] case object EmptyPublisher extends Publisher[Nothing] { +@InternalApi private[akka] case object EmptyPublisher extends Publisher[Nothing] { import ReactiveStreamsCompliance._ override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = try { @@ -29,7 +28,7 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] { /** * INTERNAL API */ -private[akka] final case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] { +@InternalApi private[akka] final case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] { ReactiveStreamsCompliance.requireNonNullElement(t) import ReactiveStreamsCompliance._ @@ -48,7 +47,7 @@ private[akka] final case class ErrorPublisher(t: Throwable, name: String) extend /** * INTERNAL API */ -private[akka] final case class MaybePublisher[T]( +@InternalApi private[akka] final case class MaybePublisher[T]( promise: Promise[Option[T]], name: String)(implicit ec: ExecutionContext) extends Publisher[T] { import ReactiveStreamsCompliance._ @@ -95,12 +94,15 @@ private[akka] final case class MaybePublisher[T]( * This is only a legal subscription when it is immediately followed by * a termination signal (onComplete, onError). */ -private[akka] case object CancelledSubscription extends Subscription { +@InternalApi private[akka] case object CancelledSubscription extends Subscription { override def request(elements: Long): Unit = () override def cancel(): Unit = () } -private[akka] final class CancellingSubscriber[T] extends Subscriber[T] { +/** + * INTERNAL API + */ +@InternalApi private[akka] final class CancellingSubscriber[T] extends Subscriber[T] { override def onError(t: Throwable): Unit = () override def onSubscribe(s: Subscription): Unit = s.cancel() override def onComplete(): Unit = () @@ -110,7 +112,7 @@ private[akka] final class CancellingSubscriber[T] extends Subscriber[T] { /** * INTERNAL API */ -private[akka] case object RejectAdditionalSubscribers extends Publisher[Nothing] { +@InternalApi private[akka] case object RejectAdditionalSubscribers extends Publisher[Nothing] { import ReactiveStreamsCompliance._ override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = try rejectAdditionalSubscriber(subscriber, "Publisher") catch { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala index fbf54b9f55..256a2860c5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ConstantFun.scala @@ -3,10 +3,14 @@ */ package akka.stream.impl +import akka.annotation.InternalApi import akka.japi.function.{ Function ⇒ JFun, Function2 ⇒ JFun2 } import akka.japi.{ Pair ⇒ JPair } -private[akka] object ConstantFun { +/** + * INTERNAL API + */ +@InternalApi private[akka] object ConstantFun { private[this] val JavaIdentityFunction = new JFun[Any, Any] { @throws(classOf[Exception]) override def apply(param: Any): Any = param } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ExposedPublisherReceive.scala b/akka-stream/src/main/scala/akka/stream/impl/ExposedPublisherReceive.scala index 899f91859f..b1c350c4d9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ExposedPublisherReceive.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ExposedPublisherReceive.scala @@ -4,11 +4,12 @@ package akka.stream.impl import akka.actor.Actor +import akka.annotation.InternalApi /** * INTERNAL API */ -private[akka] abstract class ExposedPublisherReceive(activeReceive: Actor.Receive, unhandled: Any ⇒ Unit) extends Actor.Receive { +@InternalApi private[akka] abstract class ExposedPublisherReceive(activeReceive: Actor.Receive, unhandled: Any ⇒ Unit) extends Actor.Receive { private var stash = List.empty[Any] def isDefinedAt(o: Any): Boolean = true diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index d07219b643..91655ab9b8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -4,14 +4,15 @@ package akka.stream.impl import akka.actor._ +import akka.annotation.{ DoNotInherit, InternalApi } import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings } -import akka.stream.actor.{ ActorSubscriberMessage, ActorSubscriber } -import org.reactivestreams.{ Subscription, Subscriber } +import akka.stream.actor.{ ActorSubscriber, ActorSubscriberMessage } +import org.reactivestreams.{ Subscriber, Subscription } /** * INTERNAL API */ -object FanIn { +@InternalApi private[akka] object FanIn { final case class OnError(id: Int, cause: Throwable) extends DeadLetterSuppression with NoSerializationVerificationNeeded final case class OnComplete(id: Int) extends DeadLetterSuppression with NoSerializationVerificationNeeded @@ -252,7 +253,7 @@ object FanIn { /** * INTERNAL API */ -abstract class FanIn(val settings: ActorMaterializerSettings, val inputCount: Int) extends Actor with ActorLogging with Pump { +@DoNotInherit private[akka] class FanIn(val settings: ActorMaterializerSettings, val inputCount: Int) extends Actor with ActorLogging with Pump { import FanIn._ protected val primaryOutputs: Outputs = new SimpleOutputs(self, this) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index 6d0306e003..0c415361bc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -7,12 +7,13 @@ import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings } import scala.collection.immutable import akka.actor._ +import akka.annotation.{ DoNotInherit, InternalApi } import org.reactivestreams.Subscription /** * INTERNAL API */ -object FanOut { +@InternalApi private[akka] object FanOut { final case class SubstreamRequestMore(id: Int, demand: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded final case class SubstreamCancel(id: Int) extends DeadLetterSuppression with NoSerializationVerificationNeeded @@ -247,7 +248,7 @@ object FanOut { /** * INTERNAL API */ -abstract class FanOut(val settings: ActorMaterializerSettings, val outputCount: Int) extends Actor with ActorLogging with Pump { +@DoNotInherit private[akka] abstract class FanOut(val settings: ActorMaterializerSettings, val outputCount: Int) extends Actor with ActorLogging with Pump { import FanOut._ protected val outputBunch = new OutputBunch(outputCount, self, this) @@ -287,7 +288,7 @@ abstract class FanOut(val settings: ActorMaterializerSettings, val outputCount: /** * INTERNAL API */ -private[akka] object Unzip { +@InternalApi private[akka] object Unzip { def props(settings: ActorMaterializerSettings): Props = Props(new Unzip(settings)).withDeploy(Deploy.local) } @@ -295,7 +296,7 @@ private[akka] object Unzip { /** * INTERNAL API */ -private[akka] class Unzip(_settings: ActorMaterializerSettings) extends FanOut(_settings, outputCount = 2) { +@InternalApi private[akka] class Unzip(_settings: ActorMaterializerSettings) extends FanOut(_settings, outputCount = 2) { outputBunch.markAllOutputs() initialPhase(1, TransferPhase(primaryInputs.NeedsInput && outputBunch.AllOfMarkedOutputs) { () ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index 908dc63246..0711fe3b37 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -1,13 +1,14 @@ package akka.stream.impl -import akka.actor.{ Deploy, Props, Actor, ActorRef } +import akka.actor.{ Actor, ActorRef, Deploy, Props } +import akka.annotation.{ DoNotInherit, InternalApi } import akka.stream.ActorMaterializerSettings import org.reactivestreams.Subscriber /** * INTERNAL API */ -private[akka] abstract class FanoutOutputs( +@DoNotInherit private[akka] abstract class FanoutOutputs( val maxBufferSize: Int, val initialBufferSize: Int, self: ActorRef, @@ -92,14 +93,17 @@ private[akka] abstract class FanoutOutputs( } -private[akka] object FanoutProcessorImpl { +/** + * INTERNAL API + */ +@InternalApi private[akka] object FanoutProcessorImpl { def props(actorMaterializerSettings: ActorMaterializerSettings): Props = Props(new FanoutProcessorImpl(actorMaterializerSettings)).withDeploy(Deploy.local) } /** * INTERNAL API */ -private[akka] class FanoutProcessorImpl(_settings: ActorMaterializerSettings) +@InternalApi private[akka] class FanoutProcessorImpl(_settings: ActorMaterializerSettings) extends ActorProcessorImpl(_settings) { override val primaryOutputs: FanoutOutputs = diff --git a/akka-stream/src/main/scala/akka/stream/impl/JavaStreamSource.scala b/akka-stream/src/main/scala/akka/stream/impl/JavaStreamSource.scala index 6293828e18..d4c6aadec4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/JavaStreamSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/JavaStreamSource.scala @@ -4,9 +4,8 @@ import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } import akka.annotation.InternalApi -/** Internal API */ -@InternalApi -private[stream] final class JavaStreamSource[T, S <: java.util.stream.BaseStream[T, S]](open: () ⇒ java.util.stream.BaseStream[T, S]) +/** INTERNAL API */ +@InternalApi private[stream] final class JavaStreamSource[T, S <: java.util.stream.BaseStream[T, S]](open: () ⇒ java.util.stream.BaseStream[T, S]) extends GraphStage[SourceShape[T]] { val out: Outlet[T] = Outlet("JavaStreamSource") diff --git a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala index 11b1655938..c8320c57d3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/JsonObjectParser.scala @@ -3,6 +3,7 @@ */ package akka.stream.impl +import akka.annotation.InternalApi import akka.stream.scaladsl.Framing.FramingException import akka.util.ByteString @@ -11,7 +12,7 @@ import scala.annotation.switch /** * INTERNAL API: Use [[akka.stream.scaladsl.JsonFraming]] instead. */ -private[akka] object JsonObjectParser { +@InternalApi private[akka] object JsonObjectParser { final val SquareBraceStart = '['.toByte final val SquareBraceEnd = ']'.toByte @@ -42,7 +43,7 @@ private[akka] object JsonObjectParser { * * Leading whitespace between elements will be trimmed. */ -private[akka] class JsonObjectParser(maximumObjectLength: Int = Int.MaxValue) { +@InternalApi private[akka] class JsonObjectParser(maximumObjectLength: Int = Int.MaxValue) { import JsonObjectParser._ private var buffer: ByteString = ByteString.empty diff --git a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala index a328985356..038ea5bc78 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala @@ -4,28 +4,29 @@ package akka.stream.impl import language.existentials -import akka.actor.{ NoSerializationVerificationNeeded, DeadLetterSuppression } +import akka.actor.{ DeadLetterSuppression, NoSerializationVerificationNeeded } +import akka.annotation.InternalApi /** * INTERNAL API */ -private[akka] case object SubscribePending extends DeadLetterSuppression with NoSerializationVerificationNeeded +@InternalApi private[akka] case object SubscribePending extends DeadLetterSuppression with NoSerializationVerificationNeeded /** * INTERNAL API */ -private[akka] final case class RequestMore(subscription: ActorSubscription[_], demand: Long) +@InternalApi private[akka] final case class RequestMore(subscription: ActorSubscription[_], demand: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded /** * INTERNAL API */ -private[akka] final case class Cancel(subscription: ActorSubscription[_]) +@InternalApi private[akka] final case class Cancel(subscription: ActorSubscription[_]) extends DeadLetterSuppression with NoSerializationVerificationNeeded /** * INTERNAL API */ -private[akka] final case class ExposedPublisher(publisher: ActorPublisher[Any]) +@InternalApi private[akka] final case class ExposedPublisher(publisher: ActorPublisher[Any]) extends DeadLetterSuppression with NoSerializationVerificationNeeded diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index cf6d853bb3..bcef0cf7f9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -5,6 +5,7 @@ package akka.stream.impl import akka.NotUsed import akka.actor._ +import akka.annotation.{ DoNotInherit, InternalApi } import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule import org.reactivestreams._ @@ -17,7 +18,7 @@ import akka.util.OptionVal /** * INTERNAL API */ -abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends AtomicModule[SourceShape[Out], Mat] { +@DoNotInherit private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends AtomicModule[SourceShape[Out], Mat] { protected def label: String = Logging.simpleName(this) final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]" @@ -48,7 +49,7 @@ abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends Ato * Holds a `Subscriber` representing the input side of the flow. * The `Subscriber` can later be connected to an upstream `Publisher`. */ -final class SubscriberSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Subscriber[Out]](shape) { +@InternalApi private[akka] final class SubscriberSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Subscriber[Out]](shape) { override def create(context: MaterializationContext): (Publisher[Out], Subscriber[Out]) = { val processor = new VirtualProcessor[Out] @@ -66,7 +67,7 @@ final class SubscriberSource[Out](val attributes: Attributes, shape: SourceShape * that mediate the flow of elements downstream and the propagation of * back-pressure upstream. */ -final class PublisherSource[Out](p: Publisher[Out], val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, NotUsed](shape) { +@InternalApi private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, NotUsed](shape) { override protected def label: String = s"PublisherSource($p)" @@ -79,7 +80,7 @@ final class PublisherSource[Out](p: Publisher[Out], val attributes: Attributes, /** * INTERNAL API */ -final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Option[Out]]](shape) { +@InternalApi private[akka] final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Option[Out]]](shape) { override def create(context: MaterializationContext) = { val p = Promise[Option[Out]]() @@ -94,7 +95,7 @@ final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out] * Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`, * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]]. */ -final class ActorPublisherSource[Out](props: Props, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { +@InternalApi private[akka] final class ActorPublisherSource[Out](props: Props, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { override def create(context: MaterializationContext) = { val publisherRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props) @@ -109,7 +110,7 @@ final class ActorPublisherSource[Out](props: Props, val attributes: Attributes, /** * INTERNAL API */ -final class ActorRefSource[Out]( +@InternalApi private[akka] final class ActorRefSource[Out]( bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index 0fa66814e6..2d51c492e3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -9,6 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean import akka.NotUsed import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Deploy, ExtendedActorSystem, PoisonPill } +import akka.annotation.{ DoNotInherit, InternalApi } import akka.dispatch.Dispatchers import akka.event.{ Logging, LoggingAdapter } import akka.stream.Attributes.InputBuffer @@ -19,18 +20,18 @@ import akka.stream.impl.fusing.GraphInterpreter.Connection import akka.stream.impl.fusing._ import akka.stream.impl.io.{ TLSActor, TlsModule } import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler } -import akka.util.OptionVal -import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } +import org.reactivestreams.{ Processor, Publisher, Subscriber } import scala.collection.immutable.Map import scala.concurrent.duration.FiniteDuration import scala.concurrent.ExecutionContextExecutor import scala.annotation.tailrec -import akka.stream.impl.fusing.GraphInterpreter.DownstreamBoundaryStageLogic -import akka.stream.impl.fusing.GraphInterpreter.UpstreamBoundaryStageLogic import akka.util.OptionVal -object PhasedFusingActorMaterializer { +/** + * INTERNAL API + */ +@InternalApi private[akka] object PhasedFusingActorMaterializer { val Debug = false @@ -61,7 +62,7 @@ object PhasedFusingActorMaterializer { }, GraphStageTag → DefaultPhase) - def apply(settings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = { + @InternalApi private[akka] def apply(settings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = { val haveShutDown = new AtomicBoolean(false) val system = actorSystemOf(context) val materializerSettings = ActorMaterializerSettings(system) @@ -121,7 +122,7 @@ private final case class ForwardWire( private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOffset: Int, skippedSlots: Int, phase: PhaseIsland[Any]) -class IslandTracking( +@InternalApi private[akka] class IslandTracking( val phases: Map[IslandTag, Phase[Any]], val settings: ActorMaterializerSettings, defaultPhase: Phase[Any], @@ -151,8 +152,8 @@ class IslandTracking( private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, materializer, nextIslandName()) - def getCurrentPhase: PhaseIsland[Any] = currentPhase - def getCurrentOffset: Int = currentGlobalOffset + @InternalApi private[akka] def getCurrentPhase: PhaseIsland[Any] = currentPhase + @InternalApi private[akka] def getCurrentOffset: Int = currentGlobalOffset private def completeSegment(): Unit = { val length = currentGlobalOffset - currentSegmentGlobalOffset @@ -181,7 +182,7 @@ class IslandTracking( } } - def enterIsland(tag: IslandTag, attributes: Attributes): Unit = { + @InternalApi private[akka] def enterIsland(tag: IslandTag, attributes: Attributes): Unit = { completeSegment() val previousPhase = currentPhase val previousIslandOffset = currentIslandGlobalOffset @@ -200,7 +201,7 @@ class IslandTracking( if (Debug) println(s"Entering island starting at offset = $currentIslandGlobalOffset phase = $currentPhase") } - def exitIsland(): Unit = { + @InternalApi private[akka] def exitIsland(): Unit = { val parentIsland = islandStateStack.remove(islandStateStack.size() - 1) val previousSegmentLength = completeSegment() @@ -215,7 +216,7 @@ class IslandTracking( if (Debug) println(s"Exited to island starting at offset = $currentIslandGlobalOffset phase = $currentPhase") } - def wireIn(in: InPort, logic: Any): Unit = { + @InternalApi private[akka] def wireIn(in: InPort, logic: Any): Unit = { // The slot for this InPort always belong to the current segment, so resolving its local // offset/slot is simple val localInSlot = currentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippetSlots @@ -258,7 +259,7 @@ class IslandTracking( currentGlobalOffset += 1 } - def wireOut(out: OutPort, absoluteOffset: Int, logic: Any): Unit = { + @InternalApi private[akka] def wireOut(out: OutPort, absoluteOffset: Int, logic: Any): Unit = { if (Debug) println(s" wiring $out to absolute = $absoluteOffset") // First check if we are wiring backwards. This is important since we can only do resolution for backward wires. @@ -319,7 +320,7 @@ class IslandTracking( } - def allNestedIslandsReady(): Unit = { + @InternalApi private[akka] def allNestedIslandsReady(): Unit = { if (activePhases ne null) { var i = 0 while (i < activePhases.size()) { @@ -331,7 +332,10 @@ class IslandTracking( } -case class PhasedFusingActorMaterializer( +/** + * INTERNAL API + */ +@InternalApi private[akka] case class PhasedFusingActorMaterializer( system: ActorSystem, override val settings: ActorMaterializerSettings, dispatchers: Dispatchers, @@ -547,16 +551,25 @@ case class PhasedFusingActorMaterializer( } -trait IslandTag +/** + * INTERNAL API + */ +@DoNotInherit private[akka] trait IslandTag -trait Phase[M] { +/** + * INTERNAL API + */ +@DoNotInherit private[akka] trait Phase[M] { def apply( effectiveSettings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String): PhaseIsland[M] } -trait PhaseIsland[M] { +/** + * INTERNAL API + */ +@DoNotInherit private[akka] trait PhaseIsland[M] { def name: String @@ -574,9 +587,15 @@ trait PhaseIsland[M] { } -object GraphStageTag extends IslandTag +/** + * INTERNAL API + */ +@InternalApi private[akka] object GraphStageTag extends IslandTag -final class GraphStageIsland( +/** + * INTERNAL API + */ +@InternalApi private[akka] final class GraphStageIsland( effectiveSettings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String, @@ -739,9 +758,15 @@ final class GraphStageIsland( override def toString: String = "GraphStagePhase" } -object SourceModuleIslandTag extends IslandTag +/** + * INTERNAL API + */ +@InternalApi private[akka] object SourceModuleIslandTag extends IslandTag -final class SourceModulePhase( +/** + * INTERNAL API + */ +@InternalApi private[akka] final class SourceModulePhase( materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[Publisher[Any]] { override def name: String = s"SourceModule phase" @@ -763,9 +788,15 @@ final class SourceModulePhase( override def onIslandReady(): Unit = () } -object SinkModuleIslandTag extends IslandTag +/** + * INTERNAL API + */ +@InternalApi private[akka] object SinkModuleIslandTag extends IslandTag -final class SinkModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String) +/** + * INTERNAL API + */ +@InternalApi private[akka] final class SinkModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[AnyRef] { override def name: String = s"SourceModule phase" var subscriberOrVirtualPublisher: AnyRef = _ @@ -797,9 +828,15 @@ final class SinkModulePhase(materializer: PhasedFusingActorMaterializer, islandN override def onIslandReady(): Unit = () } -object ProcessorModuleIslandTag extends IslandTag +/** + * INTERNAL API + */ +@InternalApi private[akka] object ProcessorModuleIslandTag extends IslandTag -final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String) +/** + * INTERNAL API + */ +@InternalApi private[akka] final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[Processor[Any, Any]] { override def name: String = "ProcessorModulePhase" private[this] var processor: Processor[Any, Any] = _ @@ -819,9 +856,15 @@ final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer, is override def onIslandReady(): Unit = () } -object TlsModuleIslandTag extends IslandTag +/** + * INTERNAL API + */ +@InternalApi private[akka] object TlsModuleIslandTag extends IslandTag -final class TlsModulePhase(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[NotUsed] { +/** + * INTERNAL API + */ +@InternalApi private[akka] final class TlsModulePhase(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[NotUsed] { def name: String = "TlsModulePhase" var tlsActor: ActorRef = _ diff --git a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala index 7ce2fa6065..e3f3946579 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala @@ -3,13 +3,15 @@ */ package akka.stream.impl +import akka.annotation.InternalApi + import scala.util.control.NonFatal import org.reactivestreams.{ Subscriber, Subscription } /** * INTERNAL API */ -private[stream] object ReactiveStreamsCompliance { +@InternalApi private[stream] object ReactiveStreamsCompliance { final val CanNotSubscribeTheSameSubscriberMultipleTimes = "can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)" diff --git a/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala b/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala index 7a979c5277..e33f10d838 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala @@ -6,6 +6,7 @@ package akka.stream.impl import scala.annotation.tailrec import scala.util.control.NoStackTrace import ResizableMultiReaderRingBuffer._ +import akka.annotation.InternalApi /** * INTERNAL API @@ -13,7 +14,7 @@ import ResizableMultiReaderRingBuffer._ * Contrary to many other ring buffer implementations this one does not automatically overwrite the oldest * elements, rather, if full, the buffer tries to grow and rejects further writes if max capacity is reached. */ -private[akka] class ResizableMultiReaderRingBuffer[T]( +@InternalApi private[akka] class ResizableMultiReaderRingBuffer[T]( initialSize: Int, // constructor param, not field maxSize: Int, // constructor param, not field val cursors: Cursors) { @@ -142,7 +143,7 @@ private[akka] class ResizableMultiReaderRingBuffer[T]( /** * INTERNAL API */ -private[akka] object ResizableMultiReaderRingBuffer { +@InternalApi private[akka] object ResizableMultiReaderRingBuffer { object NothingToReadException extends RuntimeException with NoStackTrace trait Cursors { diff --git a/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala b/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala index 49db1b9774..a45034cf79 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SeqActorName.scala @@ -6,6 +6,8 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicLong +import akka.annotation.{ DoNotInherit, InternalApi } + /** * INTERNAL API * As discussed in https://github.com/akka/akka/issues/16613 @@ -13,15 +15,22 @@ import java.util.concurrent.atomic.AtomicLong * Generator of sequentially numbered actor names. * Pulled out from HTTP internals, most often used used by streams which materialize actors directly */ -abstract class SeqActorName { +@DoNotInherit private[akka] abstract class SeqActorName { def next(): String def copy(name: String): SeqActorName } -object SeqActorName { + +/** + * INTERNAL API + */ +@InternalApi private[akka] object SeqActorName { def apply(prefix: String) = new SeqActorNameImpl(prefix, new AtomicLong(0)) } -private[akka] final class SeqActorNameImpl(val prefix: String, counter: AtomicLong) extends SeqActorName { +/** + * INTERNAL API + */ +@InternalApi private[akka] final class SeqActorNameImpl(val prefix: String, counter: AtomicLong) extends SeqActorName { def next(): String = prefix + '-' + counter.getAndIncrement() def copy(newPrefix: String): SeqActorName = new SeqActorNameImpl(newPrefix, counter) diff --git a/akka-stream/src/main/scala/akka/stream/impl/SinkholeSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/SinkholeSubscriber.scala index 381fbbd38e..6dbb2fbc5d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SinkholeSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SinkholeSubscriber.scala @@ -4,6 +4,7 @@ package akka.stream.impl import akka.Done +import akka.annotation.InternalApi import scala.concurrent.Promise import org.reactivestreams.{ Subscriber, Subscription } @@ -11,8 +12,7 @@ import org.reactivestreams.{ Subscriber, Subscription } /** * INTERNAL API */ - -private[akka] final class SinkholeSubscriber[T](whenComplete: Promise[Done]) extends Subscriber[T] { +@InternalApi private[akka] final class SinkholeSubscriber[T](whenComplete: Promise[Done]) extends Subscriber[T] { private[this] var running: Boolean = false override def onSubscribe(sub: Subscription): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 1d7fca3d27..0a43c76706 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -8,7 +8,7 @@ import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream.Supervision.{ Stop, stoppingDecider } import akka.stream.impl.QueueSink.{ Output, Pull } import akka.stream.impl.fusing.GraphInterpreter -import akka.{ Done, NotUsed } +import akka.{ Done, NotUsed, annotation } import akka.actor.{ Actor, ActorRef, Props } import akka.stream.Attributes.InputBuffer import akka.stream._ @@ -36,13 +36,14 @@ import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ import java.util.Optional +import akka.annotation.{ DoNotInherit, InternalApi } import akka.event.Logging import akka.util.OptionVal /** * INTERNAL API */ -abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends AtomicModule[SinkShape[In], Mat] { +@DoNotInherit private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends AtomicModule[SinkShape[In], Mat] { /** * Create the Subscriber or VirtualPublisher that consumes the incoming @@ -82,7 +83,7 @@ abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends AtomicModu * elements to fill the internal buffers it will assert back-pressure until * a subscriber connects and creates demand for elements to be emitted. */ -private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) { +@InternalApi private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) { /* * This method is the reason why SinkModule.create may return something that is @@ -101,7 +102,7 @@ private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkSha /** * INTERNAL API */ -private[akka] final class FanoutPublisherSink[In]( +@InternalApi private[akka] final class FanoutPublisherSink[In]( val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) { @@ -128,7 +129,7 @@ private[akka] final class FanoutPublisherSink[In]( * INTERNAL API * Attaches a subscriber to this stream. */ -final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) { +@InternalApi private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) { override def create(context: MaterializationContext) = (subscriber, NotUsed) @@ -140,7 +141,7 @@ final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: Attri * INTERNAL API * A sink that immediately cancels its upstream upon materialization. */ -final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, NotUsed](shape) { +@InternalApi private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, NotUsed](shape) { override def create(context: MaterializationContext): (Subscriber[Any], NotUsed) = (new CancellingSubscriber[Any], NotUsed) override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, NotUsed] = new CancelSink(attributes, shape) override def withAttributes(attr: Attributes): SinkModule[Any, NotUsed] = new CancelSink(attr, amendShape(attr)) @@ -151,7 +152,7 @@ final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extend * Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`, * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]]. */ -final class ActorSubscriberSink[In](props: Props, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) { +@InternalApi private[akka] final class ActorSubscriberSink[In](props: Props, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) { override def create(context: MaterializationContext) = { val subscriberRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props) @@ -165,9 +166,9 @@ final class ActorSubscriberSink[In](props: Props, val attributes: Attributes, sh /** * INTERNAL API */ -final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any, - val attributes: Attributes, - shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) { +@InternalApi private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any, + val attributes: Attributes, + shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) { override def create(context: MaterializationContext) = { val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer) @@ -184,7 +185,10 @@ final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any, new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr)) } -final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { +/** + * INTERNAL API + */ +@InternalApi private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { val in: Inlet[T] = Inlet("lastOption.in") @@ -222,7 +226,10 @@ final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape override def toString: String = "LastOptionStage" } -final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { +/** + * INTERNAL API + */ +@InternalApi private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { val in: Inlet[T] = Inlet("headOption.in") @@ -255,7 +262,10 @@ final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape override def toString: String = "HeadOptionStage" } -final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] { +/** + * INTERNAL API + */ +@InternalApi private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] { val in = Inlet[T]("seq.in") override def toString: String = "SeqStage" @@ -294,7 +304,10 @@ final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Fu } } -private[stream] object QueueSink { +/** + * INTERNAL API + */ +@InternalApi private[akka] object QueueSink { sealed trait Output[+T] final case class Pull[T](promise: Promise[Option[T]]) extends Output[T] case object Cancel extends Output[Nothing] @@ -303,7 +316,7 @@ private[stream] object QueueSink { /** * INTERNAL API */ -final class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] { +@InternalApi private[akka] final class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] { type Requested[E] = Promise[Option[E]] val in = Inlet[T]("queueSink.in") @@ -395,7 +408,10 @@ final class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], } } -final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T]) extends akka.stream.javadsl.SinkQueueWithCancel[T] { +/** + * INTERNAL API + */ +@InternalApi private[akka] final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T]) extends akka.stream.javadsl.SinkQueueWithCancel[T] { import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext ⇒ same } def pull(): CompletionStage[Optional[T]] = delegate.pull().map(_.asJava)(same).toJava def cancel(): Unit = delegate.cancel() @@ -407,7 +423,7 @@ final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T]) extends akka.s * * Helper class to be able to express collection as a fold using mutable data */ -private[akka] final class CollectorState[T, R](val collector: java.util.stream.Collector[T, Any, R]) { +@InternalApi private[akka] final class CollectorState[T, R](val collector: java.util.stream.Collector[T, Any, R]) { lazy val accumulated = collector.supplier().get() private lazy val accumulator = collector.accumulator() @@ -424,7 +440,7 @@ private[akka] final class CollectorState[T, R](val collector: java.util.stream.C * * Helper class to be able to express reduce as a fold for parallel collector */ -private[akka] final class ReducerState[T, R](val collector: java.util.stream.Collector[T, Any, R]) { +@InternalApi private[akka] final class ReducerState[T, R](val collector: java.util.stream.Collector[T, Any, R]) { private var reduced: Any = null.asInstanceOf[Any] private lazy val combiner = collector.combiner() @@ -440,7 +456,7 @@ private[akka] final class ReducerState[T, R](val collector: java.util.stream.Col /** * INTERNAL API */ -final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]], zeroMat: () ⇒ M) extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { +@InternalApi final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]], zeroMat: () ⇒ M) extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] { val in = Inlet[T]("lazySink.in") override def initialAttributes = DefaultAttributes.lazySink override val shape: SinkShape[T] = SinkShape.of(in) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index 3db2be2bf1..89082d92c3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -25,7 +25,7 @@ import scala.util.control.NonFatal /** * INTERNAL API */ -private[stream] object QueueSource { +@InternalApi private[akka] object QueueSource { sealed trait Input[+T] final case class Offer[+T](elem: T, promise: Promise[QueueOfferResult]) extends Input[T] case object Completion extends Input[Nothing] @@ -35,7 +35,7 @@ private[stream] object QueueSource { /** * INTERNAL API */ -final class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueueWithComplete[T]] { +@InternalApi private[akka] final class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueueWithComplete[T]] { import QueueSource._ val out = Outlet[T]("queueSource.out") @@ -189,7 +189,10 @@ final class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) e } } -final class SourceQueueAdapter[T](delegate: SourceQueueWithComplete[T]) extends akka.stream.javadsl.SourceQueueWithComplete[T] { +/** + * INTERNAL API + */ +@InternalApi private[akka] final class SourceQueueAdapter[T](delegate: SourceQueueWithComplete[T]) extends akka.stream.javadsl.SourceQueueWithComplete[T] { def offer(elem: T): CompletionStage[QueueOfferResult] = delegate.offer(elem).toJava def watchCompletion(): CompletionStage[Done] = delegate.watchCompletion().toJava def complete(): Unit = delegate.complete() @@ -199,7 +202,7 @@ final class SourceQueueAdapter[T](delegate: SourceQueueWithComplete[T]) extends /** * INTERNAL API */ -final class UnfoldResourceSource[T, S]( +@InternalApi private[akka] final class UnfoldResourceSource[T, S]( create: () ⇒ S, readData: (S) ⇒ Option[T], close: (S) ⇒ Unit) extends GraphStage[SourceShape[T]] { @@ -256,7 +259,10 @@ final class UnfoldResourceSource[T, S]( override def toString = "UnfoldResourceSource" } -final class UnfoldResourceSourceAsync[T, S]( +/** + * INTERNAL API + */ +@InternalApi private[akka] final class UnfoldResourceSourceAsync[T, S]( create: () ⇒ Future[S], readData: (S) ⇒ Future[Option[T]], close: (S) ⇒ Future[Done]) extends GraphStage[SourceShape[T]] { @@ -338,11 +344,17 @@ final class UnfoldResourceSourceAsync[T, S]( } -object LazySource { +/** + * INTERNAL API + */ +@InternalApi private[akka] object LazySource { def apply[T, M](sourceFactory: () ⇒ Source[T, M]) = new LazySource[T, M](sourceFactory) } -final class LazySource[T, M](sourceFactory: () ⇒ Source[T, M]) extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { +/** + * INTERNAL API + */ +@InternalApi private[akka] final class LazySource[T, M](sourceFactory: () ⇒ Source[T, M]) extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { val out = Outlet[T]("LazySource.out") override val shape = SourceShape(out) @@ -398,9 +410,10 @@ final class LazySource[T, M](sourceFactory: () ⇒ Source[T, M]) extends GraphSt override def toString = "LazySource" } -/** INTERNAL API */ -@InternalApi -final object EmptySource extends GraphStage[SourceShape[Nothing]] { +/** + * INTERNAL API + */ +@InternalApi private[akka] final object EmptySource extends GraphStage[SourceShape[Nothing]] { val out = Outlet[Nothing]("EmptySource.out") override val shape = SourceShape(out) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 8a1438b8f7..0924e2a7a0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -3,6 +3,7 @@ */ package akka.stream.impl +import akka.annotation.InternalApi import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream.Attributes._ import akka.stream.Supervision.Decider @@ -11,7 +12,7 @@ import akka.stream._ /** * INTERNAL API */ -object Stages { +@InternalApi private[akka] object Stages { object DefaultAttributes { val IODispatcher = ActorAttributes.IODispatcher diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 0c9fc7f3b7..292ee6a346 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -5,6 +5,7 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference +import akka.annotation.InternalApi import akka.stream._ import akka.stream.impl.Stages.DefaultAttributes import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } @@ -15,7 +16,7 @@ import scala.util.control.NonFatal /** * INTERNAL API */ -object StreamLayout { +@InternalApi private[stream] object StreamLayout { // compile-time constant final val Debug = false @@ -31,7 +32,7 @@ object StreamLayout { /** * INTERNAL API */ -private[stream] object VirtualProcessor { +@InternalApi private[stream] object VirtualProcessor { case object Inert { val subscriber = new CancellingSubscriber[Any] } @@ -91,7 +92,7 @@ private[stream] object VirtualProcessor { * Publisher if things go wrong (like `request(0)` coming in from downstream) and * it must ensure that we drop the Subscriber reference when `cancel` is invoked. */ -private[stream] final class VirtualProcessor[T] extends AtomicReference[AnyRef] with Processor[T, T] { +@InternalApi private[stream] final class VirtualProcessor[T] extends AtomicReference[AnyRef] with Processor[T, T] { import ReactiveStreamsCompliance._ import VirtualProcessor._ @@ -312,7 +313,7 @@ private[stream] final class VirtualProcessor[T] extends AtomicReference[AnyRef] * to the `Subscriber` after having hooked it up with the real `Publisher`, hence * the use of `Inert.subscriber` as a tombstone. */ -private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Publisher[T] { +@InternalApi private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Publisher[T] { import ReactiveStreamsCompliance._ import VirtualProcessor.Inert @@ -346,7 +347,7 @@ private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Pub /** * INTERNAL API */ -final case class ProcessorModule[In, Out, Mat]( +@InternalApi private[akka] final case class ProcessorModule[In, Out, Mat]( val createProcessor: () ⇒ (Processor[In, Out], Mat), attributes: Attributes = DefaultAttributes.processor) extends StreamLayout.AtomicModule[FlowShape[In, Out], Mat] { val inPort = Inlet[In]("ProcessorModule.in") diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala index 8279d4d914..54dc41e3c6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala @@ -4,13 +4,18 @@ package akka.stream.impl import akka.actor._ +import akka.annotation.InternalApi import akka.stream.StreamSubscriptionTimeoutTerminationMode.{ CancelTermination, NoopTermination, WarnTermination } import akka.stream.StreamSubscriptionTimeoutSettings import org.reactivestreams._ + import scala.concurrent.duration.FiniteDuration import scala.util.control.NoStackTrace -object StreamSubscriptionTimeoutSupport { +/** + * INTERNAL API + */ +@InternalApi private[akka] object StreamSubscriptionTimeoutSupport { /** * A subscriber who calls `cancel` directly from `onSubscribe` and ignores all other callbacks. @@ -37,7 +42,7 @@ object StreamSubscriptionTimeoutSupport { * Subscription timeout which does not start any scheduled events and always returns `true`. * This specialized implementation is to be used for "noop" timeout mode. */ - case object NoopSubscriptionTimeout extends Cancellable { + @InternalApi private[akka] case object NoopSubscriptionTimeout extends Cancellable { override def cancel() = true override def isCancelled = true } @@ -50,7 +55,7 @@ object StreamSubscriptionTimeoutSupport { * * See `akka.stream.materializer.subscription-timeout` for configuration options. */ -private[akka] trait StreamSubscriptionTimeoutSupport { +@InternalApi private[akka] trait StreamSubscriptionTimeoutSupport { this: Actor with ActorLogging ⇒ import StreamSubscriptionTimeoutSupport._ @@ -112,4 +117,4 @@ private[akka] trait StreamSubscriptionTimeoutSupport { /** * INTERNAL API */ -private[akka] class SubscriptionTimeoutException(msg: String) extends RuntimeException(msg) +@InternalApi private[akka] class SubscriptionTimeoutException(msg: String) extends RuntimeException(msg) diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala index 2a6ef2f6a7..3282f91403 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubFlowImpl.scala @@ -4,17 +4,25 @@ package akka.stream.impl import akka.NotUsed +import akka.annotation.InternalApi import akka.stream._ import akka.stream.scaladsl._ + import language.higherKinds -object SubFlowImpl { +/** + * INTERNAL API + */ +@InternalApi private[akka] object SubFlowImpl { trait MergeBack[In, F[+_]] { def apply[T](f: Flow[In, T, NotUsed], breadth: Int): F[T] } } -class SubFlowImpl[In, Out, Mat, F[+_], C]( +/** + * INTERNAL API + */ +@InternalApi private[akka] class SubFlowImpl[In, Out, Mat, F[+_], C]( val subFlow: Flow[In, Out, NotUsed], mergeBackFunction: SubFlowImpl.MergeBack[In, F], finishFunction: Sink[In, NotUsed] ⇒ C) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala index 15a5fcc4ab..ee7bfb1c7c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala @@ -3,6 +3,7 @@ */ package akka.stream.impl +import akka.annotation.InternalApi import akka.stream.ThrottleMode.{ Enforcing, Shaping } import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.stage._ @@ -14,7 +15,7 @@ import scala.concurrent.duration.{ FiniteDuration, _ } /** * INTERNAL API */ -class Throttle[T]( +@InternalApi private[akka] class Throttle[T]( val cost: Int, val per: FiniteDuration, val maximumBurst: Int, diff --git a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala index db65d5ee86..27514717e3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala @@ -5,6 +5,7 @@ package akka.stream.impl import java.util.concurrent.{ TimeUnit, TimeoutException } +import akka.annotation.InternalApi import akka.stream._ import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage @@ -23,7 +24,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration } * - if the timer fires before the event happens, these stages all fail the stream * - otherwise, these streams do not interfere with the element flow, ordinary completion or failure */ -object Timers { +@InternalApi private[akka] object Timers { private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = { import scala.concurrent.duration._ FiniteDuration( diff --git a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala index b10d2fe564..7c9eb71491 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala @@ -4,12 +4,13 @@ package akka.stream.impl import scala.util.control.NonFatal -import akka.actor.{ Actor } +import akka.actor.Actor +import akka.annotation.InternalApi /** * INTERNAL API */ -private[akka] class SubReceive(initial: Actor.Receive) extends Actor.Receive { +@InternalApi private[akka] class SubReceive(initial: Actor.Receive) extends Actor.Receive { private var currentReceive = initial override def isDefinedAt(msg: Any): Boolean = currentReceive.isDefinedAt(msg) @@ -23,7 +24,7 @@ private[akka] class SubReceive(initial: Actor.Receive) extends Actor.Receive { /** * INTERNAL API */ -private[akka] trait Inputs { +@InternalApi private[akka] trait Inputs { def NeedsInput: TransferState def NeedsInputOrComplete: TransferState @@ -42,7 +43,7 @@ private[akka] trait Inputs { /** * INTERNAL API */ -private[akka] trait DefaultInputTransferStates extends Inputs { +@InternalApi private[akka] trait DefaultInputTransferStates extends Inputs { override val NeedsInput: TransferState = new TransferState { def isReady = inputsAvailable def isCompleted = inputsDepleted @@ -56,7 +57,7 @@ private[akka] trait DefaultInputTransferStates extends Inputs { /** * INTERNAL API */ -private[akka] trait Outputs { +@InternalApi private[akka] trait Outputs { def NeedsDemand: TransferState def NeedsDemandOrCancel: TransferState @@ -78,7 +79,7 @@ private[akka] trait Outputs { /** * INTERNAL API */ -private[akka] trait DefaultOutputTransferStates extends Outputs { +@InternalApi private[akka] trait DefaultOutputTransferStates extends Outputs { override val NeedsDemand: TransferState = new TransferState { def isReady = demandAvailable def isCompleted = isClosed @@ -93,7 +94,7 @@ private[akka] trait DefaultOutputTransferStates extends Outputs { /** * INTERNAL API */ -private[akka] trait TransferState { +@InternalApi private[akka] trait TransferState { def isReady: Boolean def isCompleted: Boolean def isExecutable = isReady && !isCompleted @@ -112,7 +113,7 @@ private[akka] trait TransferState { /** * INTERNAL API */ -private[akka] object Completed extends TransferState { +@InternalApi private[akka] object Completed extends TransferState { def isReady = false def isCompleted = true } @@ -120,7 +121,7 @@ private[akka] object Completed extends TransferState { /** * INTERNAL API */ -private[akka] object NotInitialized extends TransferState { +@InternalApi private[akka] object NotInitialized extends TransferState { def isReady = false def isCompleted = false } @@ -128,7 +129,7 @@ private[akka] object NotInitialized extends TransferState { /** * INTERNAL API */ -private[akka] case class WaitingForUpstreamSubscription(remaining: Int, andThen: TransferPhase) extends TransferState { +@InternalApi private[akka] case class WaitingForUpstreamSubscription(remaining: Int, andThen: TransferPhase) extends TransferState { def isReady = false def isCompleted = false } @@ -136,7 +137,7 @@ private[akka] case class WaitingForUpstreamSubscription(remaining: Int, andThen: /** * INTERNAL API */ -private[akka] object Always extends TransferState { +@InternalApi private[akka] object Always extends TransferState { def isReady = true def isCompleted = false } @@ -144,12 +145,12 @@ private[akka] object Always extends TransferState { /** * INTERNAL API */ -private[akka] final case class TransferPhase(precondition: TransferState)(val action: () ⇒ Unit) +@InternalApi private[akka] final case class TransferPhase(precondition: TransferState)(val action: () ⇒ Unit) /** * INTERNAL API */ -private[akka] trait Pump { +@InternalApi private[akka] trait Pump { private var transferState: TransferState = NotInitialized private var currentAction: () ⇒ Unit = () ⇒ throw new IllegalStateException("Pump has been not initialized with a phase") diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 79387280ff..b7d5c2d16e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -4,15 +4,19 @@ package akka.stream.impl +import akka.annotation.{ DoNotInherit, InternalApi } import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 } import akka.stream.scaladsl.Keep import akka.util.OptionVal + import scala.language.existentials import scala.collection.immutable.Map.Map1 /** + * INTERNAL API + * * Graphs to be materialized are defined by their traversal. There is no explicit graph information tracked, instead * a sequence of steps required to "reconstruct" the graph. * @@ -31,7 +35,7 @@ import scala.collection.immutable.Map.Map1 * be encoded somehow. The two imports don't need any special treatment as they are at different positions in * the traversal. See [[MaterializeAtomic]] for more details. */ -sealed trait Traversal { +@InternalApi private[akka] sealed trait Traversal { /** * Concatenates two traversals building a new Traversal which traverses both. @@ -43,7 +47,10 @@ sealed trait Traversal { def rewireFirstTo(relativeOffset: Int): Traversal = null } -object Concat { +/** + * INTERNAL API + */ +@InternalApi private[akka] object Concat { def normalizeConcat(first: Traversal, second: Traversal): Traversal = { if (second eq EmptyTraversal) first @@ -70,9 +77,11 @@ object Concat { } /** + * INTERNAL API + * * A Traversal that consists of two traversals. The linked traversals must be traversed in first, next order. */ -final case class Concat(first: Traversal, next: Traversal) extends Traversal { +@InternalApi private[akka] final case class Concat(first: Traversal, next: Traversal) extends Traversal { override def rewireFirstTo(relativeOffset: Int): Traversal = { val firstResult = first.rewireFirstTo(relativeOffset) if (firstResult ne null) @@ -84,6 +93,8 @@ final case class Concat(first: Traversal, next: Traversal) extends Traversal { } /** + * INTERNAL API + * * Arriving at this step means that an atomic module needs to be materialized (or any other activity which * assigns something to wired output-input port pairs). * @@ -101,28 +112,47 @@ final case class Concat(first: Traversal, next: Traversal) extends Traversal { * * See the `TraversalTestUtils` class and the `testMaterialize` method for a simple example. */ -final case class MaterializeAtomic(module: AtomicModule[Shape, Any], outToSlots: Array[Int]) extends Traversal { +@InternalApi private[akka] final case class MaterializeAtomic(module: AtomicModule[Shape, Any], outToSlots: Array[Int]) extends Traversal { override def toString: String = s"MaterializeAtomic($module, ${outToSlots.mkString("[", ", ", "]")})" override def rewireFirstTo(relativeOffset: Int): Traversal = copy(outToSlots = Array(relativeOffset)) } /** + * INTERNAL API + * * Traversal with no steps. */ -object EmptyTraversal extends Traversal { +@InternalApi private[akka] object EmptyTraversal extends Traversal { override def concat(that: Traversal): Traversal = that } -sealed trait MaterializedValueOp extends Traversal +/** + * INTERNAL API + */ +@InternalApi private[akka] sealed trait MaterializedValueOp extends Traversal -case object Pop extends MaterializedValueOp -case object PushNotUsed extends MaterializedValueOp -final case class Transform(mapper: AnyFunction1) extends MaterializedValueOp { +/** + * INTERNAL API + */ +@InternalApi private[akka] case object Pop extends MaterializedValueOp + +/** + * INTERNAL API + */ +@InternalApi private[akka] case object PushNotUsed extends MaterializedValueOp + +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class Transform(mapper: AnyFunction1) extends MaterializedValueOp { def apply(arg: Any): Any = mapper.asInstanceOf[Any ⇒ Any](arg) } -final case class Compose(composer: AnyFunction2, reverse: Boolean = false) extends MaterializedValueOp { +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class Compose(composer: AnyFunction2, reverse: Boolean = false) extends MaterializedValueOp { def apply(arg1: Any, arg2: Any): Any = { if (reverse) composer.asInstanceOf[(Any, Any) ⇒ Any](arg2, arg1) @@ -131,13 +161,30 @@ final case class Compose(composer: AnyFunction2, reverse: Boolean = false) exten } } -final case class PushAttributes(attributes: Attributes) extends Traversal -final case object PopAttributes extends Traversal +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class PushAttributes(attributes: Attributes) extends Traversal -final case class EnterIsland(islandTag: IslandTag) extends Traversal -final case object ExitIsland extends Traversal +/** + * INTERNAL API + */ +@InternalApi private[akka] final case object PopAttributes extends Traversal -object TraversalBuilder { +/** + * INTERNAL API + */ +@InternalApi private[akka] final case class EnterIsland(islandTag: IslandTag) extends Traversal + +/** + * INTERNAL API + */ +@InternalApi private[akka] final case object ExitIsland extends Traversal + +/** + * INTERNAL API + */ +@InternalApi private[akka] object TraversalBuilder { // The most generic function1 and function2 (also completely useless, as we have thrown away all types) // needs to be casted once to be useful (pending runtime exception in cases of bugs). type AnyFunction1 = Nothing ⇒ Any @@ -146,10 +193,12 @@ object TraversalBuilder { private val cachedEmptyCompleted = CompletedTraversalBuilder(PushNotUsed, 0, Map.empty, Attributes.none) /** + * INTERNAL API + * * Assign ports their id, which is their position inside the Shape. This is used both by the GraphInterpreter * and the layout system here. */ - private[impl] def initShape(shape: Shape): Unit = { + @InternalApi private[impl] def initShape(shape: Shape): Unit = { // Initialize port IDs val inlets = shape.inlets if (inlets.nonEmpty) { @@ -180,15 +229,20 @@ object TraversalBuilder { } } - def empty(attributes: Attributes = Attributes.none): TraversalBuilder = { + /** + * INTERNAL API + */ + @InternalApi private[akka] def empty(attributes: Attributes = Attributes.none): TraversalBuilder = { if (attributes eq Attributes.none) cachedEmptyCompleted else CompletedTraversalBuilder(PushNotUsed, 0, Map.empty, attributes) } /** + * INTERNAL API + * * Create a generic traversal builder starting from an atomic module. */ - def atomic(module: AtomicModule[Shape, Any], attributes: Attributes): TraversalBuilder = { + @InternalApi private[akka] def atomic(module: AtomicModule[Shape, Any], attributes: Attributes): TraversalBuilder = { initShape(module.shape) val builder = @@ -210,7 +264,10 @@ object TraversalBuilder { builder.setAttributes(attributes) } - def printTraversal(t: Traversal, indent: Int = 0): Unit = { + /** + * INTERNAL API + */ + @InternalApi private[impl] def printTraversal(t: Traversal, indent: Int = 0): Unit = { var current: Traversal = t var slot = 0 @@ -240,7 +297,10 @@ object TraversalBuilder { } } - def printWiring(t: Traversal, baseSlot: Int = 0): Int = { + /** + * INTERNAL API + */ + @InternalApi private[impl] def printWiring(t: Traversal, baseSlot: Int = 0): Int = { var current: Traversal = t var slot = baseSlot @@ -271,6 +331,8 @@ object TraversalBuilder { } /** + * INTERNAL API + * * A builder for a Traversal. The purpose of subclasses of this trait is to eventually build a Traversal that * describes the graph. Depending on whether the graph is linear or generic different approaches can be used but * they still result in a Traversal. @@ -279,7 +341,7 @@ object TraversalBuilder { * wired). The Traversal may be accessed earlier, depending on the type of the builder and certain conditions. * See [[CompositeTraversalBuilder]] and [[LinearTraversalBuilder]]. */ -sealed trait TraversalBuilder { +@DoNotInherit private[akka] sealed trait TraversalBuilder { /** * Adds a module to the builder. It is possible to add a module with a different Shape (import), in this @@ -377,9 +439,11 @@ sealed trait TraversalBuilder { } /** + * INTERNAL API + * * Returned by [[CompositeTraversalBuilder]] once all output ports of a subgraph has been wired. */ -final case class CompletedTraversalBuilder( +@InternalApi private[akka] final case class CompletedTraversalBuilder( traversalSoFar: Traversal, inSlots: Int, inToOffset: Map[InPort, Int], @@ -438,10 +502,12 @@ final case class CompletedTraversalBuilder( } /** + * INTERNAL API + * * Represents a builder that contains a single atomic module. Its primary purpose is to track and build the * outToSlot array which will be then embedded in a [[MaterializeAtomic]] Traversal step. */ -final case class AtomicTraversalBuilder( +@InternalApi private[akka] final case class AtomicTraversalBuilder( module: AtomicModule[Shape, Any], outToSlot: Array[Int], unwiredOuts: Int, @@ -499,7 +565,10 @@ final case class AtomicTraversalBuilder( TraversalBuilder.empty().add(this, module.shape, Keep.right).makeIsland(islandTag) } -object LinearTraversalBuilder { +/** + * INTERNAL API + */ +@InternalApi private[akka] object LinearTraversalBuilder { // TODO: Remove private val cachedEmptyLinear = LinearTraversalBuilder(OptionVal.None, OptionVal.None, 0, 0, PushNotUsed, OptionVal.None, Attributes.none) @@ -594,6 +663,8 @@ object LinearTraversalBuilder { } /** + * INTERNAL API + * * Traversal builder that is optimized for linear graphs (those that contain modules with at most one input and * at most one output port). The Traversal is simply built up in reverse order and output ports are automatically * assigned to -1 due to the nature of the graph. The only exception is when composites created by @@ -601,7 +672,7 @@ object LinearTraversalBuilder { * in a fixed location, therefore the last step of the Traversal might need to be changed in those cases from the * -1 relative offset to something else (see rewireLastOutTo). */ -final case class LinearTraversalBuilder( +@InternalApi private[akka] final case class LinearTraversalBuilder( inPort: OptionVal[InPort], outPort: OptionVal[OutPort], inOffset: Int, @@ -978,19 +1049,31 @@ final case class LinearTraversalBuilder( } } -sealed trait TraversalBuildStep /** + * INTERNAL API + */ +@DoNotInherit private[akka] sealed trait TraversalBuildStep + +/** + * INTERNAL API + * * Helper class that is only used to identify a [[TraversalBuilder]] in a [[CompositeTraversalBuilder]]. The * reason why this is needed is because the builder is referenced at various places, while it needs to be mutated. * In an immutable data structure this is best done with an indirection, i.e. places refer to this immutable key and * look up the current state in an extra Map. */ -final class BuilderKey extends TraversalBuildStep { +@InternalApi private[akka] final class BuilderKey extends TraversalBuildStep { override def toString = s"K:$hashCode" } -final case class AppendTraversal(traversal: Traversal) extends TraversalBuildStep /** + * INTERNAL API + */ +@InternalApi private[akka] final case class AppendTraversal(traversal: Traversal) extends TraversalBuildStep + +/** + * INTERNAL API + * * A generic builder that builds a traversal for graphs of arbitrary shape. The memory retained by this class * usually decreases as ports are wired since auxiliary data is only maintained for ports that are unwired. * @@ -1011,7 +1094,7 @@ final case class AppendTraversal(traversal: Traversal) extends TraversalBuildSte * @param outOwners Map of output ports to their parent builders (actually the BuilderKey) * @param unwiredOuts Number of output ports that have not yet been wired/assigned */ -final case class CompositeTraversalBuilder( +@InternalApi private[akka] final case class CompositeTraversalBuilder( finalSteps: Traversal = EmptyTraversal, reverseBuildSteps: List[TraversalBuildStep] = AppendTraversal(PushNotUsed) :: Nil, inSlots: Int = 0, diff --git a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala index 1045d488d7..d007def2af 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala @@ -3,8 +3,9 @@ */ package akka.stream.impl +import akka.annotation.InternalApi import akka.stream.impl.Stages.DefaultAttributes -import akka.stream.stage.{ OutHandler, GraphStageLogic, GraphStage } +import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } import akka.stream._ import scala.concurrent.Future @@ -13,7 +14,7 @@ import scala.util.{ Failure, Success, Try } /** * INTERNAL API */ -final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphStage[SourceShape[E]] { +@InternalApi private[akka] final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphStage[SourceShape[E]] { val out: Outlet[E] = Outlet("Unfold.out") override val shape: SourceShape[E] = SourceShape(out) override def initialAttributes: Attributes = DefaultAttributes.unfold @@ -36,7 +37,7 @@ final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphStage[Sourc /** * INTERNAL API */ -final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] { +@InternalApi private[akka] final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] { val out: Outlet[E] = Outlet("UnfoldAsync.out") override val shape: SourceShape[E] = SourceShape(out) override def initialAttributes: Attributes = DefaultAttributes.unfoldAsync diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 2b15e549dc..750381c55b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -8,6 +8,7 @@ import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicReference import akka.actor._ +import akka.annotation.InternalApi import akka.event.Logging import akka.stream._ import akka.stream.impl.ReactiveStreamsCompliance._ @@ -24,7 +25,7 @@ import scala.util.control.NonFatal /** * INTERNAL API */ -object ActorGraphInterpreter { +@InternalApi private[akka] object ActorGraphInterpreter { object Resume extends DeadLetterSuppression with NoSerializationVerificationNeeded @@ -435,7 +436,7 @@ object ActorGraphInterpreter { /** * INTERNAL API */ -final class GraphInterpreterShell( +@InternalApi private[akka] final class GraphInterpreterShell( var connections: Array[Connection], var logics: Array[GraphStageLogic], settings: ActorMaterializerSettings, @@ -646,7 +647,7 @@ final class GraphInterpreterShell( /** * INTERNAL API */ -final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor with ActorLogging { +@InternalApi private[akka] final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor with ActorLogging { import ActorGraphInterpreter._ var activeInterpreters = Set.empty[GraphInterpreterShell] diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 978a728692..18bf034607 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -9,6 +9,8 @@ import akka.stream.stage._ import akka.stream._ import java.util.concurrent.ThreadLocalRandom +import akka.annotation.InternalApi + import scala.util.control.NonFatal /** @@ -16,7 +18,7 @@ import scala.util.control.NonFatal * * (See the class for the documentation of the internals) */ -object GraphInterpreter { +@InternalApi private[akka] object GraphInterpreter { /** * Compile time constant, enable it for debug logging to the console. */ @@ -84,9 +86,6 @@ object GraphInterpreter { else s"Connection($id, $portState, $slot, $inHandler, $outHandler)" } - /** - * INTERNAL API - */ private val _currentInterpreter = new ThreadLocal[Array[AnyRef]] { /* * Using an Object-array avoids holding on to the GraphInterpreter class @@ -187,7 +186,7 @@ object GraphInterpreter { * be modeled, or even dissolved (if preempted and a "stealing" external event is injected; for example the non-cycle * edge of a balance is pulled, dissolving the original cycle). */ -final class GraphInterpreter( +@InternalApi private[akka] final class GraphInterpreter( val materializer: Materializer, val log: LoggingAdapter, val logics: Array[GraphStageLogic], // Array of stage logics @@ -203,7 +202,7 @@ final class GraphInterpreter( /** * INTERNAL API */ - private[stream] var activeStage: GraphStageLogic = _ + @InternalApi private[stream] var activeStage: GraphStageLogic = _ // The number of currently running stages. Once this counter reaches zero, the interpreter is considered to be // completed @@ -245,7 +244,7 @@ final class GraphInterpreter( /** * INTERNAL API */ - private[stream] def nonNull: GraphInterpreter = this + @InternalApi private[stream] def nonNull: GraphInterpreter = this /** * Dynamic handler changes are communicated from a GraphStageLogic by this method. diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index e55a36d9a4..56cf394aa0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -7,6 +7,7 @@ import akka.Done import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } import akka.actor.Cancellable +import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts import akka.event.Logging import akka.stream.FlowMonitorState._ @@ -27,7 +28,7 @@ import scala.util.Try * INTERNAL API */ // TODO: Fix variance issues -final case class GraphStageModule[+S <: Shape @uncheckedVariance, +M]( +@InternalApi private[akka] final case class GraphStageModule[+S <: Shape @uncheckedVariance, +M]( shape: S, attributes: Attributes, stage: GraphStageWithMaterializedValue[S, M]) extends AtomicModule[S, M] { @@ -44,18 +45,18 @@ final case class GraphStageModule[+S <: Shape @uncheckedVariance, +M]( /** * INTERNAL API */ -object GraphStages { +@InternalApi private[akka] object GraphStages { /** * INTERNAL API */ - abstract class SimpleLinearGraphStage[T] extends GraphStage[FlowShape[T, T]] { + @InternalApi private[akka] abstract class SimpleLinearGraphStage[T] extends GraphStage[FlowShape[T, T]] { val in = Inlet[T](Logging.simpleName(this) + ".in") val out = Outlet[T](Logging.simpleName(this) + ".out") override val shape = FlowShape(in, out) } - object Identity extends SimpleLinearGraphStage[Any] { + private object Identity extends SimpleLinearGraphStage[Any] { override def initialAttributes = DefaultAttributes.identityOp override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -74,7 +75,7 @@ object GraphStages { /** * INTERNAL API */ - final class Detacher[T] extends SimpleLinearGraphStage[T] { + @InternalApi private[akka] final class Detacher[T] extends SimpleLinearGraphStage[T] { override def initialAttributes = DefaultAttributes.detacher override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -302,7 +303,7 @@ object GraphStages { * INTERNAL API * Discards all received elements. */ - object IgnoreSink extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Done]] { + @InternalApi private[akka] object IgnoreSink extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Done]] { val in = Inlet[Any]("Ignore.in") val shape = SinkShape(in) @@ -344,7 +345,7 @@ object GraphStages { * This can either be implemented inside the stage itself, or this method can be used, * which adds a detacher stage to every input. */ - private[stream] def withDetachedInputs[T](stage: GraphStage[UniformFanInShape[T, T]]) = + @InternalApi private[stream] def withDetachedInputs[T](stage: GraphStage[UniformFanInShape[T, T]]) = GraphDSL.create() { implicit builder ⇒ import GraphDSL.Implicits._ val concat = builder.add(stage) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 4efc12d796..567c175188 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -4,15 +4,18 @@ package akka.stream.impl.fusing import java.util.concurrent.TimeUnit.NANOSECONDS + +import akka.annotation.{ DoNotInherit, InternalApi } import akka.event.Logging.LogLevel import akka.event.{ LogSource, Logging, LoggingAdapter } import akka.stream.Attributes.{ InputBuffer, LogLevels } import akka.stream.OverflowStrategies._ import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage -import akka.stream.impl.{ Buffer ⇒ BufferImpl, Stages, ReactiveStreamsCompliance } -import akka.stream.scaladsl.{ SourceQueue, Source } +import akka.stream.impl.{ ReactiveStreamsCompliance, Stages, Buffer ⇒ BufferImpl } +import akka.stream.scaladsl.{ Source, SourceQueue } import akka.stream.stage._ import akka.stream.{ Supervision, _ } + import scala.annotation.tailrec import scala.collection.immutable import scala.collection.immutable.VectorBuilder @@ -20,13 +23,14 @@ import scala.concurrent.Future import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } import akka.stream.ActorAttributes.SupervisionStrategy + import scala.concurrent.duration.{ FiniteDuration, _ } import akka.stream.impl.Stages.DefaultAttributes /** * INTERNAL API */ -final case class Map[In, Out](f: In ⇒ Out) extends GraphStage[FlowShape[In, Out]] { +@InternalApi private[akka] final case class Map[In, Out](f: In ⇒ Out) extends GraphStage[FlowShape[In, Out]] { val in = Inlet[In]("Map.in") val out = Outlet[Out]("Map.out") override val shape = FlowShape(in, out) @@ -58,7 +62,7 @@ final case class Map[In, Out](f: In ⇒ Out) extends GraphStage[FlowShape[In, Ou /** * INTERNAL API */ -final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { +@InternalApi private[akka] final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.filter override def toString: String = "Filter" @@ -92,7 +96,7 @@ final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { /** * INTERNAL API */ -final case class TakeWhile[T](p: T ⇒ Boolean, inclusive: Boolean = false) extends SimpleLinearGraphStage[T] { +@InternalApi private[akka] final case class TakeWhile[T](p: T ⇒ Boolean, inclusive: Boolean = false) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.takeWhile override def toString: String = "TakeWhile" @@ -129,7 +133,7 @@ final case class TakeWhile[T](p: T ⇒ Boolean, inclusive: Boolean = false) exte /** * INTERNAL API */ -final case class DropWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { +@InternalApi private[akka] final case class DropWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.dropWhile def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler { @@ -161,7 +165,7 @@ final case class DropWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T /** * INTERNAL API */ -abstract class SupervisedGraphStageLogic(inheritedAttributes: Attributes, shape: Shape) extends GraphStageLogic(shape) { +@DoNotInherit private[akka] abstract class SupervisedGraphStageLogic(inheritedAttributes: Attributes, shape: Shape) extends GraphStageLogic(shape) { private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) def withSupervision[T](f: () ⇒ T): Option[T] = @@ -194,7 +198,7 @@ private[stream] object Collect { /** * INTERNAL API */ -final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] { +@InternalApi private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] { val in = Inlet[In]("Collect.in") val out = Outlet[Out]("Collect.out") override val shape = FlowShape(in, out) @@ -228,7 +232,7 @@ final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphSta /** * INTERNAL API */ -final case class Recover[T](pf: PartialFunction[Throwable, T]) extends SimpleLinearGraphStage[T] { +@InternalApi private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) extends SimpleLinearGraphStage[T] { override protected def initialAttributes: Attributes = DefaultAttributes.recover override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -276,7 +280,7 @@ final case class Recover[T](pf: PartialFunction[Throwable, T]) extends SimpleLin * it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover * would log the `t2` error. */ -final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends SimpleLinearGraphStage[T] { +@InternalApi private[akka] final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends SimpleLinearGraphStage[T] { override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { override def onPush(): Unit = push(out, grab(in)) @@ -294,7 +298,7 @@ final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends S /** * INTERNAL API */ -final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] { +@InternalApi private[akka] final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.take override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -322,7 +326,7 @@ final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] { /** * INTERNAL API */ -final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] { +@InternalApi private[akka] final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.drop override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -346,7 +350,7 @@ final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] { /** * INTERNAL API */ -final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] { +@InternalApi private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] { override val shape = FlowShape[In, Out](Inlet("Scan.in"), Outlet("Scan.out")) override def initialAttributes: Attributes = DefaultAttributes.scan @@ -404,7 +408,7 @@ final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta /** * INTERNAL API */ -final case class ScanAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) extends GraphStage[FlowShape[In, Out]] { +@InternalApi private[akka] final case class ScanAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) extends GraphStage[FlowShape[In, Out]] { import akka.dispatch.ExecutionContexts @@ -512,7 +516,7 @@ final case class ScanAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) ext /** * INTERNAL API */ -final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] { +@InternalApi private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] { val in = Inlet[In]("Fold.in") val out = Outlet[Out]("Fold.out") @@ -567,7 +571,7 @@ final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta /** * INTERNAL API */ -final class FoldAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) extends GraphStage[FlowShape[In, Out]] { +@InternalApi private[akka] final class FoldAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) extends GraphStage[FlowShape[In, Out]] { import akka.dispatch.ExecutionContexts @@ -662,7 +666,7 @@ final class FoldAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) extends /** * INTERNAL API */ -final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends SimpleLinearGraphStage[T] { +@InternalApi private[akka] final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends SimpleLinearGraphStage[T] { ReactiveStreamsCompliance.requireNonNullElement(inject) if (start.isDefined) ReactiveStreamsCompliance.requireNonNullElement(start.get) if (end.isDefined) ReactiveStreamsCompliance.requireNonNullElement(end.get) @@ -701,7 +705,7 @@ final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) ext /** * INTERNAL API */ -final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { +@InternalApi private[akka] final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { require(n > 0, "n must be greater than 0") val in = Inlet[T]("Grouped.in") @@ -755,7 +759,7 @@ final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Se /** * INTERNAL API */ -final case class LimitWeighted[T](val n: Long, val costFn: T ⇒ Long) extends SimpleLinearGraphStage[T] { +@InternalApi private[akka] final case class LimitWeighted[T](val n: Long, val costFn: T ⇒ Long) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.limitWeighted def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler { @@ -789,7 +793,7 @@ final case class LimitWeighted[T](val n: Long, val costFn: T ⇒ Long) extends S /** * INTERNAL API */ -final case class Sliding[T](val n: Int, val step: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { +@InternalApi private[akka] final case class Sliding[T](val n: Int, val step: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { require(n > 0, "n must be greater than 0") require(step > 0, "step must be greater than 0") @@ -847,7 +851,7 @@ final case class Sliding[T](val n: Int, val step: Int) extends GraphStage[FlowSh /** * INTERNAL API */ -final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends SimpleLinearGraphStage[T] { +@InternalApi private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -920,7 +924,7 @@ final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extend /** * INTERNAL API */ -final case class Batch[In, Out](val max: Long, val costFn: In ⇒ Long, val seed: In ⇒ Out, val aggregate: (Out, In) ⇒ Out) +@InternalApi private[akka] final case class Batch[In, Out](val max: Long, val costFn: In ⇒ Long, val seed: In ⇒ Out, val aggregate: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] { val in = Inlet[In]("Batch.in") @@ -1042,7 +1046,7 @@ final case class Batch[In, Out](val max: Long, val costFn: In ⇒ Long, val seed /** * INTERNAL API */ -final class Expand[In, Out](val extrapolate: In ⇒ Iterator[Out]) extends GraphStage[FlowShape[In, Out]] { +@InternalApi private[akka] final class Expand[In, Out](val extrapolate: In ⇒ Iterator[Out]) extends GraphStage[FlowShape[In, Out]] { private val in = Inlet[In]("expand.in") private val out = Outlet[Out]("expand.out") @@ -1096,7 +1100,7 @@ final class Expand[In, Out](val extrapolate: In ⇒ Iterator[Out]) extends Graph /** * INTERNAL API */ -private[akka] object MapAsync { +@InternalApi private[akka] object MapAsync { final class Holder[T](var elem: Try[T], val cb: AsyncCallback[Holder[T]]) extends (Try[T] ⇒ Unit) { def setElem(t: Try[T]): Unit = @@ -1117,7 +1121,7 @@ private[akka] object MapAsync { /** * INTERNAL API */ -final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out]) +@InternalApi private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out]) extends GraphStage[FlowShape[In, Out]] { import MapAsync._ @@ -1195,7 +1199,7 @@ final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out]) /** * INTERNAL API */ -final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[Out]) +@InternalApi private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[Out]) extends GraphStage[FlowShape[In, Out]] { private val in = Inlet[In]("MapAsyncUnordered.in") @@ -1273,7 +1277,7 @@ final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[O /** * INTERNAL API */ -final case class Log[T]( +@InternalApi private[akka] final case class Log[T]( name: String, extract: T ⇒ Any, logAdapter: Option[LoggingAdapter]) extends SimpleLinearGraphStage[T] { @@ -1357,7 +1361,7 @@ final case class Log[T]( /** * INTERNAL API */ -private[akka] object Log { +@InternalApi private[akka] object Log { /** * Must be located here to be visible for implicit resolution, when [[Materializer]] is passed to [[Logging]] @@ -1385,7 +1389,7 @@ private[akka] object Log { /** * INTERNAL API */ -private[stream] object TimerKeys { +@InternalApi private[stream] object TimerKeys { case object TakeWithinTimerKey @@ -1395,7 +1399,10 @@ private[stream] object TimerKeys { } -final class GroupedWithin[T](val n: Int, val d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { +/** + * INTERNAL API + */ +@InternalApi private[akka] final class GroupedWithin[T](val n: Int, val d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { require(n > 0, "n must be greater than 0") require(d > Duration.Zero) @@ -1473,7 +1480,10 @@ final class GroupedWithin[T](val n: Int, val d: FiniteDuration) extends GraphSta } } -final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { +/** + * INTERNAL API + */ +@InternalApi private[akka] final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { private[this] def timerName = "DelayedTimer" override def initialAttributes: Attributes = DefaultAttributes.delay @@ -1587,7 +1597,10 @@ final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) override def toString = "Delay" } -final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { +/** + * INTERNAL API + */ +@InternalApi private[akka] final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { def onPush(): Unit = push(out, grab(in)) @@ -1604,7 +1617,10 @@ final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraph override def toString = "TakeWithin" } -final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { +/** + * INTERNAL API + */ +@InternalApi private[akka] final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { private val startNanoTime = System.nanoTime() @@ -1634,7 +1650,7 @@ final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraph /** * INTERNAL API */ -final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { +@InternalApi private[akka] final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.reduce override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -1695,11 +1711,11 @@ final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { /** * INTERNAL API */ -private[stream] object RecoverWith { +@InternalApi private[stream] object RecoverWith { val InfiniteRetries = -1 } -final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] { +@InternalApi private[akka] final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] { require(maximumRetries >= -1, "number of retries must be non-negative or equal to -1") override def initialAttributes = DefaultAttributes.recoverWith @@ -1753,7 +1769,7 @@ final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[T /** * INTERNAL API */ -final class StatefulMapConcat[In, Out](val f: () ⇒ In ⇒ immutable.Iterable[Out]) extends GraphStage[FlowShape[In, Out]] { +@InternalApi private[akka] final class StatefulMapConcat[In, Out](val f: () ⇒ In ⇒ immutable.Iterable[Out]) extends GraphStage[FlowShape[In, Out]] { val in = Inlet[In]("StatefulMapConcat.in") val out = Outlet[Out]("StatefulMapConcat.out") override val shape = FlowShape(in, out) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 34f7b4dd71..d1ed5bd749 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -4,7 +4,9 @@ package akka.stream.impl.fusing import java.util.concurrent.atomic.AtomicReference + import akka.NotUsed +import akka.annotation.InternalApi import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream._ import akka.stream.impl.Stages.DefaultAttributes @@ -12,19 +14,21 @@ import akka.stream.impl.SubscriptionTimeoutException import akka.stream.stage._ import akka.stream.scaladsl._ import akka.stream.actor.ActorSubscriberMessage -import scala.collection.{ mutable, immutable } + +import scala.collection.{ immutable, mutable } import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal import scala.annotation.tailrec import akka.stream.impl.PublisherSource import akka.stream.impl.CancellingSubscriber import akka.stream.impl.{ Buffer ⇒ BufferImpl } + import scala.collection.JavaConversions._ /** * INTERNAL API */ -final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Graph[SourceShape[T], M], T]] { +@InternalApi private[akka] final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Graph[SourceShape[T], M], T]] { private val in = Inlet[Graph[SourceShape[T], M]]("flatten.in") private val out = Outlet[T]("flatten.out") @@ -103,7 +107,7 @@ final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Gr /** * INTERNAL API */ -final class PrefixAndTail[T](val n: Int) extends GraphStage[FlowShape[T, (immutable.Seq[T], Source[T, NotUsed])]] { +@InternalApi private[akka] final class PrefixAndTail[T](val n: Int) extends GraphStage[FlowShape[T, (immutable.Seq[T], Source[T, NotUsed])]] { val in: Inlet[T] = Inlet("PrefixAndTail.in") val out: Outlet[(immutable.Seq[T], Source[T, NotUsed])] = Outlet("PrefixAndTail.out") override val shape: FlowShape[T, (immutable.Seq[T], Source[T, NotUsed])] = FlowShape(in, out) @@ -211,7 +215,7 @@ final class PrefixAndTail[T](val n: Int) extends GraphStage[FlowShape[T, (immuta /** * INTERNAL API */ -final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T ⇒ K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { +@InternalApi private[akka] final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T ⇒ K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { val in: Inlet[T] = Inlet("GroupBy.in") val out: Outlet[Source[T, NotUsed]] = Outlet("GroupBy.out") override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out) @@ -384,7 +388,7 @@ final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T ⇒ K) extends G /** * INTERNAL API */ -object Split { +@InternalApi private[akka] object Split { sealed abstract class SplitDecision /** Splits before the current element. The current element will be the first element in the new substream. */ @@ -403,7 +407,7 @@ object Split { /** * INTERNAL API */ -final class Split[T](val decision: Split.SplitDecision, val p: T ⇒ Boolean, val substreamCancelStrategy: SubstreamCancelStrategy) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { +@InternalApi private[akka] final class Split[T](val decision: Split.SplitDecision, val p: T ⇒ Boolean, val substreamCancelStrategy: SubstreamCancelStrategy) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { val in: Inlet[T] = Inlet("Split.in") val out: Outlet[Source[T, NotUsed]] = Outlet("Split.out") override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out) @@ -572,7 +576,7 @@ final class Split[T](val decision: Split.SplitDecision, val p: T ⇒ Boolean, va /** * INTERNAL API */ -private[stream] object SubSink { +@InternalApi private[stream] object SubSink { sealed trait State /** Not yet materialized and no command has been scheduled */ case object Uninitialized extends State @@ -598,7 +602,7 @@ private[stream] object SubSink { /** * INTERNAL API */ -private[stream] final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage ⇒ Unit) +@InternalApi private[stream] final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage ⇒ Unit) extends GraphStage[SinkShape[T]] { import SubSink._ @@ -668,7 +672,7 @@ private[stream] final class SubSink[T](name: String, externalCallback: ActorSubs /** * INTERNAL API */ -final class SubSource[T](name: String, private[fusing] val externalCallback: AsyncCallback[SubSink.Command]) +@InternalApi private[akka] final class SubSource[T](name: String, private[fusing] val externalCallback: AsyncCallback[SubSink.Command]) extends GraphStage[SourceShape[T]] { import SubSink._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala b/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala index 65d4d0c09f..1c9730626f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala @@ -3,6 +3,7 @@ */ package akka.stream.impl.io +import akka.annotation.InternalApi import akka.stream._ import akka.stream.stage._ import akka.util.ByteString @@ -13,7 +14,7 @@ import scala.util.control.{ NoStackTrace, NonFatal } /** * INTERNAL API */ -private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[ByteString, T]] { +@InternalApi private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[ByteString, T]] { import ByteStringParser._ private val bytesIn = Inlet[ByteString]("bytesIn") @@ -139,7 +140,7 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By /** * INTERNAL API */ -private[akka] object ByteStringParser { +@InternalApi private[akka] object ByteStringParser { val CompactionThreshold = 16 diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala index acf2dece17..572d0c6e4b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FilePublisher.scala @@ -8,7 +8,8 @@ import java.nio.channels.FileChannel import java.nio.file.Path import akka.Done -import akka.actor.{ Deploy, ActorLogging, DeadLetterSuppression, Props } +import akka.actor.{ ActorLogging, DeadLetterSuppression, Deploy, Props } +import akka.annotation.InternalApi import akka.stream.actor.ActorPublisherMessage import akka.stream.IOResult import akka.util.ByteString @@ -19,7 +20,7 @@ import scala.util.{ Failure, Success } import scala.util.control.NonFatal /** INTERNAL API */ -private[akka] object FilePublisher { +@InternalApi private[akka] object FilePublisher { def props(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) = { require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)") require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)") @@ -35,7 +36,7 @@ private[akka] object FilePublisher { } /** INTERNAL API */ -private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) +@InternalApi private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) extends akka.stream.actor.ActorPublisher[ByteString] with ActorLogging { import FilePublisher._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala index 73546d14d5..620967cce8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/FileSubscriber.scala @@ -7,7 +7,8 @@ import java.nio.channels.FileChannel import java.nio.file.{ Path, StandardOpenOption } import akka.Done -import akka.actor.{ Deploy, ActorLogging, Props } +import akka.actor.{ ActorLogging, Deploy, Props } +import akka.annotation.InternalApi import akka.stream.IOResult import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } import akka.util.ByteString @@ -17,7 +18,7 @@ import scala.concurrent.Promise import scala.util.{ Failure, Success } /** INTERNAL API */ -private[akka] object FileSubscriber { +@InternalApi private[akka] object FileSubscriber { def props(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption]) = { require(bufSize > 0, "buffer size must be > 0") Props(classOf[FileSubscriber], f, completionPromise, bufSize, openOptions).withDeploy(Deploy.local) @@ -25,7 +26,7 @@ private[akka] object FileSubscriber { } /** INTERNAL API */ -private[akka] class FileSubscriber(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption]) +@InternalApi private[akka] class FileSubscriber(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption]) extends akka.stream.actor.ActorSubscriber with ActorLogging { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala index 7e256cb3e9..c1ace9c4d6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSinks.scala @@ -6,6 +6,7 @@ package akka.stream.impl.io import java.io.OutputStream import java.nio.file.{ Path, StandardOpenOption } +import akka.annotation.InternalApi import akka.stream._ import akka.stream.impl.SinkModule import akka.stream.impl.Stages.DefaultAttributes.IODispatcher @@ -19,7 +20,7 @@ import scala.concurrent.{ Future, Promise } * Creates simple synchronous Sink which writes all incoming elements to the given file * (creating it before hand if necessary). */ -private[akka] final class FileSink(f: Path, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString]) +@InternalApi private[akka] final class FileSink(f: Path, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString]) extends SinkModule[ByteString, Future[IOResult]](shape) { override protected def label: String = s"FileSink($f, $options)" @@ -47,7 +48,7 @@ private[akka] final class FileSink(f: Path, options: Set[StandardOpenOption], va * INTERNAL API * Creates simple synchronous Sink which writes all incoming elements to the output stream. */ -private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, val attributes: Attributes, shape: SinkShape[ByteString], autoFlush: Boolean) +@InternalApi private[akka] final class OutputStreamSink(createOutput: () ⇒ OutputStream, val attributes: Attributes, shape: SinkShape[ByteString], autoFlush: Boolean) extends SinkModule[ByteString, Future[IOResult]](shape) { override def create(context: MaterializationContext) = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala index 2c9698e626..342330095f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/IOSources.scala @@ -6,6 +6,7 @@ package akka.stream.impl.io import java.io.InputStream import java.nio.file.Path +import akka.annotation.InternalApi import akka.stream._ import akka.stream.ActorAttributes.Dispatcher import akka.stream.IOResult @@ -13,13 +14,14 @@ import akka.stream.impl.Stages.DefaultAttributes.IODispatcher import akka.stream.impl.{ ErrorPublisher, SourceModule } import akka.util.ByteString import org.reactivestreams._ + import scala.concurrent.{ Future, Promise } /** * INTERNAL API * Creates simple synchronous Source backed by the given file. */ -private[akka] final class FileSource(f: Path, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString]) +@InternalApi private[akka] final class FileSource(f: Path, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString]) extends SourceModule[ByteString, Future[IOResult]](shape) { require(chunkSize > 0, "chunkSize must be greater than 0") override def create(context: MaterializationContext) = { @@ -49,7 +51,7 @@ private[akka] final class FileSource(f: Path, chunkSize: Int, val attributes: At * INTERNAL API * Source backed by the given input stream. */ -private[akka] final class InputStreamSource(createInputStream: () ⇒ InputStream, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString]) +@InternalApi private[akka] final class InputStreamSource(createInputStream: () ⇒ InputStream, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString]) extends SourceModule[ByteString, Future[IOResult]](shape) { override def create(context: MaterializationContext) = { val materializer = ActorMaterializerHelper.downcast(context.materializer) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala index 9ceef29268..50c530052f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamPublisher.scala @@ -6,7 +6,8 @@ package akka.stream.impl.io import java.io.InputStream import akka.Done -import akka.actor.{ Deploy, ActorLogging, DeadLetterSuppression, Props } +import akka.actor.{ ActorLogging, DeadLetterSuppression, Deploy, Props } +import akka.annotation.InternalApi import akka.stream.actor.ActorPublisherMessage import akka.stream.IOResult import akka.util.ByteString @@ -15,7 +16,7 @@ import scala.concurrent.Promise import scala.util.{ Failure, Success } /** INTERNAL API */ -private[akka] object InputStreamPublisher { +@InternalApi private[akka] object InputStreamPublisher { def props(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int): Props = { require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)") @@ -27,7 +28,7 @@ private[akka] object InputStreamPublisher { } /** INTERNAL API */ -private[akka] class InputStreamPublisher(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int) +@InternalApi private[akka] class InputStreamPublisher(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int) extends akka.stream.actor.ActorPublisher[ByteString] with ActorLogging { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index 867d87e9b4..e0752e6f7b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -6,6 +6,7 @@ package akka.stream.impl.io import java.io.{ IOException, InputStream } import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit } +import akka.annotation.InternalApi import akka.stream.Attributes.InputBuffer import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.io.InputStreamSinkStage._ @@ -36,7 +37,7 @@ private[stream] object InputStreamSinkStage { /** * INTERNAL API */ -final private[stream] class InputStreamSinkStage(readTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SinkShape[ByteString], InputStream] { +@InternalApi final private[stream] class InputStreamSinkStage(readTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SinkShape[ByteString], InputStream] { val in = Inlet[ByteString]("InputStreamSink.in") override def initialAttributes: Attributes = DefaultAttributes.inputStreamSink @@ -95,7 +96,7 @@ final private[stream] class InputStreamSinkStage(readTimeout: FiniteDuration) ex * INTERNAL API * InputStreamAdapter that interacts with InputStreamSinkStage */ -private[akka] class InputStreamAdapter( +@InternalApi private[akka] class InputStreamAdapter( sharedBuffer: BlockingQueue[StreamToAdapterMessage], sendToStage: (AdapterToStageMessage) ⇒ Unit, readTimeout: FiniteDuration) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala index 498b9562d5..3f3bcdd0ac 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSubscriber.scala @@ -6,7 +6,8 @@ package akka.stream.impl.io import java.io.OutputStream import akka.Done -import akka.actor.{ Deploy, ActorLogging, Props } +import akka.actor.{ ActorLogging, Deploy, Props } +import akka.annotation.InternalApi import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy } import akka.stream.IOResult import akka.util.ByteString @@ -15,7 +16,7 @@ import scala.concurrent.Promise import scala.util.{ Failure, Success } /** INTERNAL API */ -private[akka] object OutputStreamSubscriber { +@InternalApi private[akka] object OutputStreamSubscriber { def props(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int, autoFlush: Boolean) = { require(bufSize > 0, "buffer size must be > 0") Props(classOf[OutputStreamSubscriber], os, completionPromise, bufSize, autoFlush).withDeploy(Deploy.local) @@ -24,7 +25,7 @@ private[akka] object OutputStreamSubscriber { } /** INTERNAL API */ -private[akka] class OutputStreamSubscriber(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int, autoFlush: Boolean) +@InternalApi private[akka] class OutputStreamSubscriber(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int, autoFlush: Boolean) extends akka.stream.actor.ActorSubscriber with ActorLogging { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala index 3fa9625290..a5386fe769 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala @@ -10,6 +10,7 @@ import javax.net.ssl.SSLEngineResult.Status._ import javax.net.ssl._ import akka.actor._ +import akka.annotation.InternalApi import akka.stream._ import akka.stream.impl.FanIn.InputBunch import akka.stream.impl.FanOut.OutputBunch @@ -25,7 +26,7 @@ import scala.util.{ Failure, Success, Try } /** * INTERNAL API. */ -private[stream] object TLSActor { +@InternalApi private[stream] object TLSActor { def props( settings: ActorMaterializerSettings, @@ -45,7 +46,7 @@ private[stream] object TLSActor { /** * INTERNAL API. */ -private[stream] class TLSActor( +@InternalApi private[stream] class TLSActor( settings: ActorMaterializerSettings, createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 @@ -455,7 +456,7 @@ private[stream] class TLSActor( /** * INTERNAL API */ -private[stream] object TlsUtils { +@InternalApi private[stream] object TlsUtils { def applySessionParameters(engine: SSLEngine, sessionParameters: NegotiateNewSession): Unit = { sessionParameters.enabledCipherSuites foreach (cs ⇒ engine.setEnabledCipherSuites(cs.toArray)) sessionParameters.enabledProtocols foreach (p ⇒ engine.setEnabledProtocols(p.toArray)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index f0f19edc8f..63799abcb7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -9,6 +9,7 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } import akka.NotUsed import akka.actor.{ ActorRef, Terminated } +import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts import akka.io.Inet.SocketOption import akka.io.Tcp @@ -29,7 +30,7 @@ import scala.util.Try /** * INTERNAL API */ -private[stream] class ConnectionSourceStage( +@InternalApi private[stream] class ConnectionSourceStage( val tcpManager: ActorRef, val endpoint: InetSocketAddress, val backlog: Int, @@ -167,7 +168,7 @@ private[stream] object ConnectionSourceStage { /** * INTERNAL API */ -private[stream] object TcpConnectionStage { +@InternalApi private[stream] object TcpConnectionStage { case object WriteAck extends Tcp.Event trait TcpRole { @@ -309,7 +310,7 @@ private[stream] object TcpConnectionStage { /** * INTERNAL API */ -class IncomingConnectionStage(connection: ActorRef, remoteAddress: InetSocketAddress, halfClose: Boolean) +@InternalApi private[akka] class IncomingConnectionStage(connection: ActorRef, remoteAddress: InetSocketAddress, halfClose: Boolean) extends GraphStage[FlowShape[ByteString, ByteString]] { import TcpConnectionStage._ @@ -333,7 +334,7 @@ class IncomingConnectionStage(connection: ActorRef, remoteAddress: InetSocketAdd /** * INTERNAL API */ -private[stream] class OutgoingConnectionStage( +@InternalApi private[stream] class OutgoingConnectionStage( manager: ActorRef, remoteAddress: InetSocketAddress, localAddress: Option[InetSocketAddress] = None, @@ -371,7 +372,7 @@ private[stream] class OutgoingConnectionStage( } /** INTERNAL API */ -private[akka] object TcpIdleTimeout { +@InternalApi private[akka] object TcpIdleTimeout { def apply(idleTimeout: FiniteDuration, remoteAddress: Option[InetSocketAddress]): BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = { val connectionToString = remoteAddress match { case Some(addr) ⇒ s" on connection to [$addr]" diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala index 3817a0fe1a..f836a5382d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala @@ -4,6 +4,7 @@ import javax.net.ssl.{ SSLEngine, SSLSession } import akka.NotUsed import akka.actor.ActorSystem +import akka.annotation.InternalApi import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.TLSProtocol._ @@ -15,13 +16,13 @@ import scala.util.Try /** * INTERNAL API. */ -private[stream] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound], - cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString], - shape: BidiShape[SslTlsOutbound, ByteString, ByteString, SslTlsInbound], - attributes: Attributes, - createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 - verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 - closing: TLSClosing) +@InternalApi private[stream] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound], + cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString], + shape: BidiShape[SslTlsOutbound, ByteString, ByteString, SslTlsInbound], + attributes: Attributes, + createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + verifySession: (ActorSystem, SSLSession) ⇒ Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 + closing: TLSClosing) extends AtomicModule[BidiShape[SslTlsOutbound, ByteString, ByteString, SslTlsInbound], NotUsed] { override def withAttributes(att: Attributes): TlsModule = copy(attributes = att) @@ -34,7 +35,7 @@ private[stream] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plain /** * INTERNAL API. */ -private[stream] object TlsModule { +@InternalApi private[stream] object TlsModule { def apply( attributes: Attributes, createSSLEngine: ActorSystem ⇒ SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753 diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/CompressionUtils.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/CompressionUtils.scala index 94ba98ef28..d8da78b7da 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/CompressionUtils.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/CompressionUtils.scala @@ -4,6 +4,7 @@ package akka.stream.impl.io.compression import akka.NotUsed +import akka.annotation.InternalApi import akka.stream.{ Attributes, FlowShape } import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.scaladsl.Flow @@ -11,7 +12,7 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import akka.util.ByteString /** INTERNAL API */ -private[stream] object CompressionUtils { +@InternalApi private[stream] object CompressionUtils { /** * Creates a flow from a compressor constructor. */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/Compressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/Compressor.scala index 17021da8a1..fce19ef7c8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/Compressor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/Compressor.scala @@ -3,6 +3,7 @@ */ package akka.stream.impl.io.compression +import akka.annotation.InternalApi import akka.util.ByteString /** @@ -10,7 +11,7 @@ import akka.util.ByteString * * A stateful object representing ongoing compression. */ -private[akka] abstract class Compressor { +@InternalApi private[akka] abstract class Compressor { /** * Compresses the given input and returns compressed data. The implementation * can and will choose to buffer output data to improve compression. Use diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala index 93f405ea74..91e5539cac 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala @@ -5,12 +5,13 @@ package akka.stream.impl.io.compression import java.util.zip.Deflater +import akka.annotation.InternalApi import akka.util.{ ByteString, ByteStringBuilder } import scala.annotation.tailrec /** INTERNAL API */ -private[akka] class DeflateCompressor extends Compressor { +@InternalApi private[akka] class DeflateCompressor extends Compressor { import DeflateCompressor._ protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, false) @@ -62,7 +63,7 @@ private[akka] class DeflateCompressor extends Compressor { } /** INTERNAL API */ -private[akka] object DeflateCompressor { +@InternalApi private[akka] object DeflateCompressor { val MinBufferSize = 1024 @tailrec diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala index 95e4d630a2..cf022ecd6a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressor.scala @@ -5,10 +5,11 @@ package akka.stream.impl.io.compression import java.util.zip.Inflater +import akka.annotation.InternalApi import akka.stream.Attributes /** INTERNAL API */ -private[akka] class DeflateDecompressor(maxBytesPerChunk: Int) +@InternalApi private[akka] class DeflateDecompressor(maxBytesPerChunk: Int) extends DeflateDecompressorBase(maxBytesPerChunk) { override def createLogic(attr: Attributes) = new DecompressorParsingLogic { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala index 7ddb49d97d..09aebff556 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala @@ -5,12 +5,13 @@ package akka.stream.impl.io.compression import java.util.zip.Inflater +import akka.annotation.InternalApi import akka.stream.impl.io.ByteStringParser import akka.stream.impl.io.ByteStringParser.{ ParseResult, ParseStep } import akka.util.ByteString /** INTERNAL API */ -private[akka] abstract class DeflateDecompressorBase(maxBytesPerChunk: Int) +@InternalApi private[akka] abstract class DeflateDecompressorBase(maxBytesPerChunk: Int) extends ByteStringParser[ByteString] { abstract class DecompressorParsingLogic extends ParsingLogic { @@ -45,4 +46,4 @@ private[akka] abstract class DeflateDecompressorBase(maxBytesPerChunk: Int) } /** INTERNAL API */ -private[akka] object DeflateDecompressorBase +@InternalApi private[akka] object DeflateDecompressorBase diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala index 18fe9a13e8..27fb04711e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala @@ -5,10 +5,11 @@ package akka.stream.impl.io.compression import java.util.zip.{ CRC32, Deflater } +import akka.annotation.InternalApi import akka.util.ByteString /** INTERNAL API */ -private[akka] class GzipCompressor extends DeflateCompressor { +@InternalApi private[akka] class GzipCompressor extends DeflateCompressor { override protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, true) private val checkSum = new CRC32 // CRC32 of uncompressed data private var headerSent = false diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala index 0b3d494dbf..c9b0cf6b28 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala @@ -5,13 +5,14 @@ package akka.stream.impl.io.compression import java.util.zip.{ CRC32, Inflater, ZipException } +import akka.annotation.InternalApi import akka.stream.Attributes import akka.stream.impl.io.ByteStringParser import akka.stream.impl.io.ByteStringParser.{ ParseResult, ParseStep } import akka.util.ByteString /** INTERNAL API */ -private[akka] class GzipDecompressor(maxBytesPerChunk: Int) +@InternalApi private[akka] class GzipDecompressor(maxBytesPerChunk: Int) extends DeflateDecompressorBase(maxBytesPerChunk) { override def createLogic(attr: Attributes) = new DecompressorParsingLogic { @@ -66,7 +67,7 @@ private[akka] class GzipDecompressor(maxBytesPerChunk: Int) } /** INTERNAL API */ -private[akka] object GzipDecompressor { +@InternalApi private[akka] object GzipDecompressor { // RFC 1952: http://tools.ietf.org/html/rfc1952 section 2.2 private[impl] val Header = ByteString( 0x1F, // ID1 diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 7592a2d0f7..e973df6b77 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -268,11 +268,11 @@ final class Flow[-In, +Out, +Mat]( object Flow { private[stream] val identityTraversalBuilder = - LinearTraversalBuilder.fromBuilder(GraphStages.Identity.traversalBuilder, GraphStages.Identity.shape, Keep.right) + LinearTraversalBuilder.fromBuilder(GraphStages.identity.traversalBuilder, GraphStages.identity.shape, Keep.right) private[this] val identity: Flow[Any, Any, NotUsed] = new Flow[Any, Any, NotUsed]( identityTraversalBuilder, - GraphStages.Identity.shape) + GraphStages.identity.shape) /** * Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]]