Add @InternalApi annotations and private markers on internal apis (#22563)

* #22506 Mark materializer extension as @InternalApi

* #22506 Added missing annotations on internal apis in the fusing package

* #22506 Missing annotations in io package

* #22506 Add internal api annotations in impl package

* #22563 more hiding of the classes in the impl package

* #22563 Formatting fixes

* #22506 Fix private access in stream tcl tests
This commit is contained in:
Martynas Mickevičius 2017-03-16 21:04:07 +02:00 committed by Patrik Nordwall
parent 6434cbe868
commit 2a9c0370e0
60 changed files with 568 additions and 316 deletions

View file

@ -91,7 +91,7 @@ API stability annotations and comments
Akka gives a very strong binary compatibility promise to end-users. However some parts of Akka are excluded
from these rules, for example internal or known evolving APIs may be marked as such and shipped as part of
an overall stable module. As general rule any breakage is avoided and handled via deprecation and additional method,
an overall stable module. As general rule any breakage is avoided and handled via deprecation and method addition,
however certain APIs which are known to not yet be fully frozen (or are fully internal) are marked as such and subject
to change at any time (even if best-effort is taken to keep them compatible).
@ -101,12 +101,12 @@ When browsing the source code and/or looking for methods available to be called,
have as rich of an access protection system as Scala has, you may sometimes find methods or classes annotated with
the ``/** INTERNAL API */`` comment or the ``@akka.annotation.InternalApi`` annotation.
No compatibility guarantees are given about these classes, they may change or even disapear in minor versions,
and user code is not supposed to be calling (or even touching) them.
No compatibility guarantees are given about these classes. They may change or even dissappear in minor versions,
and user code is not supposed to call them.
Side-note on JVM representation details of the Scala ``private[akka]`` pattern that Akka is using extensively in
it's internals: Such methods or classes, which act as "accessible only from the given package" in Scala, are compiled
down to ``public`` (!) in raw Java bytecode, and the access restriction, that Scala understands is carried along
down to ``public`` (!) in raw Java bytecode. The access restriction, that Scala understands is carried along
as metadata stored in the classfile. Thus, such methods are safely guarded from being accessed from Scala,
however Java users will not be warned about this fact by the ``javac`` compiler. Please be aware of this and do not call
into Internal APIs, as they are subject to change without any warning.
@ -117,7 +117,7 @@ The ``@DoNotInherit`` and ``@ApiMayChange`` markers
In addition to the special internal API marker two annotations exist in Akka and specifically address the following use cases:
- ``@ApiMayChange`` which marks APIs which are known to be not fully stable yet. Read more in :ref:`may-change`
- ``@DoNotInherit`` which marks APIs that are designed under an closed-world assumption, and thus must not be
- ``@DoNotInherit`` which marks APIs that are designed under a closed-world assumption, and thus must not be
extended outside Akka itself (or such code will risk facing binary incompatibilities). E.g. an interface may be
marked using this annotation, and while the type is public, it is not meant for extension by user-code. This allows
adding new methods to these interfaces without risking to break client code. Examples of such API are the ``FlowOps``

View file

@ -18,7 +18,7 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] {
implicit val materializer = ActorMaterializer(settings)(system)
// withAttributes "wraps" the underlying identity and protects it from automatic removal
Flow[Int].via(GraphStages.Identity.asInstanceOf[Graph[FlowShape[Int, Int], NotUsed]]).named("identity").toProcessor.run()
Flow[Int].via(GraphStages.identity.asInstanceOf[Graph[FlowShape[Int, Int], NotUsed]]).named("identity").toProcessor.run()
}
override def createElement(element: Int): Int = element

View file

@ -365,7 +365,7 @@ class GraphDSLCompileSpec extends StreamSpec {
import akka.stream.Attributes._
val ga = GraphDSL.create() { implicit b
import GraphDSL.Implicits._
val id = b.add(GraphStages.Identity)
val id = b.add(GraphStages.identity[Any])
FlowShape(id.in, id.out)
}.async.addAttributes(none).named("useless")

View file

@ -6,7 +6,7 @@ package akka.stream.impl
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor._
import akka.annotation.InternalApi
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.event.LoggingAdapter
import akka.pattern.ask
import akka.stream._
@ -19,7 +19,7 @@ import scala.concurrent.{ Await, ExecutionContextExecutor }
/**
* ExtendedActorMaterializer used by subtypes which materializer using GraphInterpreterShell
*/
abstract class ExtendedActorMaterializer extends ActorMaterializer {
@DoNotInherit private[akka] abstract class ExtendedActorMaterializer extends ActorMaterializer {
override def withNamePrefix(name: String): ExtendedActorMaterializer
@ -41,7 +41,7 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer {
/**
* INTERNAL API
*/
override def actorOf(context: MaterializationContext, props: Props): ActorRef = {
@InternalApi private[akka] override def actorOf(context: MaterializationContext, props: Props): ActorRef = {
val dispatcher =
if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) effectiveSettings(context.effectiveAttributes).dispatcher
else props.dispatcher
@ -51,7 +51,7 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer {
/**
* INTERNAL API
*/
def actorOf(props: Props, name: String): ActorRef = {
@InternalApi private[akka] def actorOf(props: Props, name: String): ActorRef = {
supervisor match {
case ref: LocalActorRef
ref.underlying.attachChild(props, name, systemService = false)
@ -71,12 +71,12 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer {
/**
* INTERNAL API
*/
override def logger: LoggingAdapter
@InternalApi private[akka] override def logger: LoggingAdapter
/**
* INTERNAL API
*/
override def supervisor: ActorRef
@InternalApi private[akka] override def supervisor: ActorRef
}
@ -117,7 +117,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa
/**
* INTERNAL API
*/
object FlowNames extends ExtensionId[FlowNames] with ExtensionIdProvider {
@InternalApi private[akka] object FlowNames extends ExtensionId[FlowNames] with ExtensionIdProvider {
override def get(system: ActorSystem): FlowNames = super.get(system)
override def lookup() = FlowNames
override def createExtension(system: ExtendedActorSystem): FlowNames = new FlowNames
@ -126,14 +126,14 @@ object FlowNames extends ExtensionId[FlowNames] with ExtensionIdProvider {
/**
* INTERNAL API
*/
class FlowNames extends Extension {
@InternalApi private[akka] class FlowNames extends Extension {
val name = SeqActorName("Flow")
}
/**
* INTERNAL API
*/
object StreamSupervisor {
@InternalApi private[akka] object StreamSupervisor {
def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props =
Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local)
private[stream] val baseName = "StreamSupervisor"
@ -155,7 +155,10 @@ object StreamSupervisor {
case object PrintDebugDump
}
class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor {
/**
* INTERNAL API
*/
@InternalApi private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor {
import akka.stream.impl.StreamSupervisor._
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy

View file

@ -4,16 +4,17 @@
package akka.stream.impl
import akka.actor._
import akka.annotation.InternalApi
import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings }
import akka.stream.actor.ActorSubscriber.OnSubscribe
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete, OnError }
import org.reactivestreams.{ Subscriber, Subscription, Processor }
import akka.stream.actor.ActorSubscriberMessage.{ OnComplete, OnError, OnNext }
import org.reactivestreams.{ Processor, Subscriber, Subscription }
import akka.event.Logging
/**
* INTERNAL API
*/
private[akka] object ActorProcessor {
@InternalApi private[akka] object ActorProcessor {
def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = {
val p = new ActorProcessor[I, O](impl)
@ -26,7 +27,7 @@ private[akka] object ActorProcessor {
/**
* INTERNAL API
*/
private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[O](impl)
@InternalApi private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[O](impl)
with Processor[I, O] {
override def onSubscribe(s: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(s)
@ -46,7 +47,7 @@ private[akka] class ActorProcessor[I, O](impl: ActorRef) extends ActorPublisher[
/**
* INTERNAL API
*/
private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) extends DefaultInputTransferStates {
@InternalApi private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) extends DefaultInputTransferStates {
if (size < 1) throw new IllegalArgumentException(s"buffer size must be positive (was: $size)")
if ((size & (size - 1)) != 0) throw new IllegalArgumentException(s"buffer size must be a power of two (was: $size)")
@ -158,7 +159,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump)
/**
* INTERNAL API
*/
private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends DefaultOutputTransferStates {
@InternalApi private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends DefaultOutputTransferStates {
import ReactiveStreamsCompliance._
protected var exposedPublisher: ActorPublisher[Any] = _
@ -247,7 +248,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
/**
* INTERNAL API
*/
private[akka] abstract class ActorProcessorImpl(val settings: ActorMaterializerSettings)
@InternalApi private[akka] abstract class ActorProcessorImpl(val settings: ActorMaterializerSettings)
extends Actor
with ActorLogging
with Pump {

View file

@ -4,17 +4,19 @@
package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.control.{ NoStackTrace }
import scala.util.control.NoStackTrace
import akka.actor.{ Actor, ActorRef, Terminated }
import akka.annotation.InternalApi
import org.reactivestreams.{ Publisher, Subscriber }
import org.reactivestreams.Subscription
/**
* INTERNAL API
*/
object ActorPublisher {
@InternalApi private[akka] object ActorPublisher {
val NormalShutdownReasonMessage = "Cannot subscribe to shut-down Publisher"
class NormalShutdownException extends IllegalStateException(NormalShutdownReasonMessage) with NoStackTrace
val NormalShutdownReason: Throwable = new NormalShutdownException
@ -35,7 +37,7 @@ object ActorPublisher {
* When you instantiate this class, or its subclasses, you MUST send an ExposedPublisher message to the wrapped
* ActorRef! If you don't need to subclass, prefer the apply() method on the companion object which takes care of this.
*/
class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
@InternalApi private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
import ReactiveStreamsCompliance._
// The subscriber of an subscription attempt is first placed in this list of pending subscribers.
@ -97,7 +99,7 @@ class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
/**
* INTERNAL API
*/
private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends Subscription {
@InternalApi private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends Subscription {
override def request(elements: Long): Unit = impl ! RequestMore(this, elements)
override def cancel(): Unit = impl ! Cancel(this)
}
@ -105,13 +107,13 @@ private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val su
/**
* INTERNAL API
*/
private[akka] class ActorSubscriptionWithCursor[T](_impl: ActorRef, _subscriber: Subscriber[_ >: T])
@InternalApi private[akka] class ActorSubscriptionWithCursor[T](_impl: ActorRef, _subscriber: Subscriber[_ >: T])
extends ActorSubscription[T](_impl, _subscriber) with SubscriptionWithCursor[T]
/**
* INTERNAL API
*/
private[akka] trait SoftShutdown { this: Actor
@InternalApi private[akka] trait SoftShutdown { this: Actor
def softShutdown(): Unit = {
val children = context.children
if (children.isEmpty) {

View file

@ -6,15 +6,16 @@ package akka.stream.impl
import java.util
import akka.actor._
import akka.annotation.InternalApi
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.{ Inlet, SinkShape, Attributes }
import akka.stream.{ Attributes, Inlet, SinkShape }
import akka.stream.Attributes.InputBuffer
import akka.stream.stage._
/**
* INTERNAL API
*/
private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessage: Any,
@InternalApi private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessage: Any,
ackMessage: Any,
onCompleteMessage: Any,
onFailureMessage: (Throwable) Any)

View file

@ -10,11 +10,12 @@ import akka.actor.Status
import akka.stream.actor.WatermarkRequestStrategy
import akka.actor.Props
import akka.actor.Terminated
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
private[akka] object ActorRefSinkActor {
@InternalApi private[akka] object ActorRefSinkActor {
def props(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any): Props =
Props(new ActorRefSinkActor(ref, highWatermark, onCompleteMessage))
}
@ -22,7 +23,7 @@ private[akka] object ActorRefSinkActor {
/**
* INTERNAL API
*/
private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any) extends ActorSubscriber {
@InternalApi private[akka] class ActorRefSinkActor(ref: ActorRef, highWatermark: Int, onCompleteMessage: Any) extends ActorSubscriber {
import ActorSubscriberMessage._
override val requestStrategy = WatermarkRequestStrategy(highWatermark)

View file

@ -6,14 +6,15 @@ package akka.stream.impl
import akka.actor.ActorLogging
import akka.actor.Props
import akka.actor.Status
import akka.annotation.InternalApi
import akka.stream.OverflowStrategies._
import akka.stream.{ BufferOverflowException, OverflowStrategy, OverflowStrategies }
import akka.stream.{ BufferOverflowException, OverflowStrategies, OverflowStrategy }
import akka.stream.ActorMaterializerSettings
/**
* INTERNAL API
*/
private[akka] object ActorRefSourceActor {
@InternalApi private[akka] object ActorRefSourceActor {
def props(bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings) = {
require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")
val maxFixedBufferSize = settings.maxFixedBufferSize
@ -24,7 +25,7 @@ private[akka] object ActorRefSourceActor {
/**
* INTERNAL API
*/
private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int)
@InternalApi private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int)
extends akka.stream.actor.ActorPublisher[Any] with ActorLogging {
import akka.stream.actor.ActorPublisherMessage._

View file

@ -4,12 +4,14 @@
package akka.stream.impl
import java.{ util ju }
import akka.annotation.InternalApi
import akka.stream._
/**
* INTERNAL API
*/
private[akka] trait Buffer[T] {
@InternalApi private[akka] trait Buffer[T] {
def capacity: Int
def used: Int
def isFull: Boolean
@ -46,7 +48,7 @@ private[akka] object Buffer {
/**
* INTERNAL API
*/
private[akka] object FixedSizeBuffer {
@InternalApi private[akka] object FixedSizeBuffer {
/**
* INTERNAL API
@ -57,7 +59,7 @@ private[akka] object FixedSizeBuffer {
*
* Returns a specialized instance for power-of-two sized buffers.
*/
def apply[T](size: Int): FixedSizeBuffer[T] =
@InternalApi private[akka] def apply[T](size: Int): FixedSizeBuffer[T] =
if (size < 1) throw new IllegalArgumentException("size must be positive")
else if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size)
else new ModuloFixedSizeBuffer(size)
@ -140,7 +142,7 @@ private[akka] object FixedSizeBuffer {
/**
* INTERNAL API
*/
private[akka] final class BoundedBuffer[T](val capacity: Int) extends Buffer[T] {
@InternalApi private[akka] final class BoundedBuffer[T](val capacity: Int) extends Buffer[T] {
def used: Int = q.used
def isFull: Boolean = q.isFull

View file

@ -11,8 +11,7 @@ import scala.concurrent.{ ExecutionContext, Promise }
/**
* INTERNAL API
*/
@InternalApi
private[akka] case object EmptyPublisher extends Publisher[Nothing] {
@InternalApi private[akka] case object EmptyPublisher extends Publisher[Nothing] {
import ReactiveStreamsCompliance._
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit =
try {
@ -29,7 +28,7 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] {
/**
* INTERNAL API
*/
private[akka] final case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] {
@InternalApi private[akka] final case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] {
ReactiveStreamsCompliance.requireNonNullElement(t)
import ReactiveStreamsCompliance._
@ -48,7 +47,7 @@ private[akka] final case class ErrorPublisher(t: Throwable, name: String) extend
/**
* INTERNAL API
*/
private[akka] final case class MaybePublisher[T](
@InternalApi private[akka] final case class MaybePublisher[T](
promise: Promise[Option[T]],
name: String)(implicit ec: ExecutionContext) extends Publisher[T] {
import ReactiveStreamsCompliance._
@ -95,12 +94,15 @@ private[akka] final case class MaybePublisher[T](
* This is only a legal subscription when it is immediately followed by
* a termination signal (onComplete, onError).
*/
private[akka] case object CancelledSubscription extends Subscription {
@InternalApi private[akka] case object CancelledSubscription extends Subscription {
override def request(elements: Long): Unit = ()
override def cancel(): Unit = ()
}
private[akka] final class CancellingSubscriber[T] extends Subscriber[T] {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class CancellingSubscriber[T] extends Subscriber[T] {
override def onError(t: Throwable): Unit = ()
override def onSubscribe(s: Subscription): Unit = s.cancel()
override def onComplete(): Unit = ()
@ -110,7 +112,7 @@ private[akka] final class CancellingSubscriber[T] extends Subscriber[T] {
/**
* INTERNAL API
*/
private[akka] case object RejectAdditionalSubscribers extends Publisher[Nothing] {
@InternalApi private[akka] case object RejectAdditionalSubscribers extends Publisher[Nothing] {
import ReactiveStreamsCompliance._
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit =
try rejectAdditionalSubscriber(subscriber, "Publisher") catch {

View file

@ -3,10 +3,14 @@
*/
package akka.stream.impl
import akka.annotation.InternalApi
import akka.japi.function.{ Function JFun, Function2 JFun2 }
import akka.japi.{ Pair JPair }
private[akka] object ConstantFun {
/**
* INTERNAL API
*/
@InternalApi private[akka] object ConstantFun {
private[this] val JavaIdentityFunction = new JFun[Any, Any] {
@throws(classOf[Exception]) override def apply(param: Any): Any = param
}

View file

@ -4,11 +4,12 @@
package akka.stream.impl
import akka.actor.Actor
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
private[akka] abstract class ExposedPublisherReceive(activeReceive: Actor.Receive, unhandled: Any Unit) extends Actor.Receive {
@InternalApi private[akka] abstract class ExposedPublisherReceive(activeReceive: Actor.Receive, unhandled: Any Unit) extends Actor.Receive {
private var stash = List.empty[Any]
def isDefinedAt(o: Any): Boolean = true

View file

@ -4,14 +4,15 @@
package akka.stream.impl
import akka.actor._
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings }
import akka.stream.actor.{ ActorSubscriberMessage, ActorSubscriber }
import org.reactivestreams.{ Subscription, Subscriber }
import akka.stream.actor.{ ActorSubscriber, ActorSubscriberMessage }
import org.reactivestreams.{ Subscriber, Subscription }
/**
* INTERNAL API
*/
object FanIn {
@InternalApi private[akka] object FanIn {
final case class OnError(id: Int, cause: Throwable) extends DeadLetterSuppression with NoSerializationVerificationNeeded
final case class OnComplete(id: Int) extends DeadLetterSuppression with NoSerializationVerificationNeeded
@ -252,7 +253,7 @@ object FanIn {
/**
* INTERNAL API
*/
abstract class FanIn(val settings: ActorMaterializerSettings, val inputCount: Int) extends Actor with ActorLogging with Pump {
@DoNotInherit private[akka] class FanIn(val settings: ActorMaterializerSettings, val inputCount: Int) extends Actor with ActorLogging with Pump {
import FanIn._
protected val primaryOutputs: Outputs = new SimpleOutputs(self, this)

View file

@ -7,12 +7,13 @@ import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings }
import scala.collection.immutable
import akka.actor._
import akka.annotation.{ DoNotInherit, InternalApi }
import org.reactivestreams.Subscription
/**
* INTERNAL API
*/
object FanOut {
@InternalApi private[akka] object FanOut {
final case class SubstreamRequestMore(id: Int, demand: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded
final case class SubstreamCancel(id: Int) extends DeadLetterSuppression with NoSerializationVerificationNeeded
@ -247,7 +248,7 @@ object FanOut {
/**
* INTERNAL API
*/
abstract class FanOut(val settings: ActorMaterializerSettings, val outputCount: Int) extends Actor with ActorLogging with Pump {
@DoNotInherit private[akka] abstract class FanOut(val settings: ActorMaterializerSettings, val outputCount: Int) extends Actor with ActorLogging with Pump {
import FanOut._
protected val outputBunch = new OutputBunch(outputCount, self, this)
@ -287,7 +288,7 @@ abstract class FanOut(val settings: ActorMaterializerSettings, val outputCount:
/**
* INTERNAL API
*/
private[akka] object Unzip {
@InternalApi private[akka] object Unzip {
def props(settings: ActorMaterializerSettings): Props =
Props(new Unzip(settings)).withDeploy(Deploy.local)
}
@ -295,7 +296,7 @@ private[akka] object Unzip {
/**
* INTERNAL API
*/
private[akka] class Unzip(_settings: ActorMaterializerSettings) extends FanOut(_settings, outputCount = 2) {
@InternalApi private[akka] class Unzip(_settings: ActorMaterializerSettings) extends FanOut(_settings, outputCount = 2) {
outputBunch.markAllOutputs()
initialPhase(1, TransferPhase(primaryInputs.NeedsInput && outputBunch.AllOfMarkedOutputs) { ()

View file

@ -1,13 +1,14 @@
package akka.stream.impl
import akka.actor.{ Deploy, Props, Actor, ActorRef }
import akka.actor.{ Actor, ActorRef, Deploy, Props }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.stream.ActorMaterializerSettings
import org.reactivestreams.Subscriber
/**
* INTERNAL API
*/
private[akka] abstract class FanoutOutputs(
@DoNotInherit private[akka] abstract class FanoutOutputs(
val maxBufferSize: Int,
val initialBufferSize: Int,
self: ActorRef,
@ -92,14 +93,17 @@ private[akka] abstract class FanoutOutputs(
}
private[akka] object FanoutProcessorImpl {
/**
* INTERNAL API
*/
@InternalApi private[akka] object FanoutProcessorImpl {
def props(actorMaterializerSettings: ActorMaterializerSettings): Props =
Props(new FanoutProcessorImpl(actorMaterializerSettings)).withDeploy(Deploy.local)
}
/**
* INTERNAL API
*/
private[akka] class FanoutProcessorImpl(_settings: ActorMaterializerSettings)
@InternalApi private[akka] class FanoutProcessorImpl(_settings: ActorMaterializerSettings)
extends ActorProcessorImpl(_settings) {
override val primaryOutputs: FanoutOutputs =

View file

@ -4,9 +4,8 @@ import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
import akka.annotation.InternalApi
/** Internal API */
@InternalApi
private[stream] final class JavaStreamSource[T, S <: java.util.stream.BaseStream[T, S]](open: () java.util.stream.BaseStream[T, S])
/** INTERNAL API */
@InternalApi private[stream] final class JavaStreamSource[T, S <: java.util.stream.BaseStream[T, S]](open: () java.util.stream.BaseStream[T, S])
extends GraphStage[SourceShape[T]] {
val out: Outlet[T] = Outlet("JavaStreamSource")

View file

@ -3,6 +3,7 @@
*/
package akka.stream.impl
import akka.annotation.InternalApi
import akka.stream.scaladsl.Framing.FramingException
import akka.util.ByteString
@ -11,7 +12,7 @@ import scala.annotation.switch
/**
* INTERNAL API: Use [[akka.stream.scaladsl.JsonFraming]] instead.
*/
private[akka] object JsonObjectParser {
@InternalApi private[akka] object JsonObjectParser {
final val SquareBraceStart = '['.toByte
final val SquareBraceEnd = ']'.toByte
@ -42,7 +43,7 @@ private[akka] object JsonObjectParser {
*
* Leading whitespace between elements will be trimmed.
*/
private[akka] class JsonObjectParser(maximumObjectLength: Int = Int.MaxValue) {
@InternalApi private[akka] class JsonObjectParser(maximumObjectLength: Int = Int.MaxValue) {
import JsonObjectParser._
private var buffer: ByteString = ByteString.empty

View file

@ -4,28 +4,29 @@
package akka.stream.impl
import language.existentials
import akka.actor.{ NoSerializationVerificationNeeded, DeadLetterSuppression }
import akka.actor.{ DeadLetterSuppression, NoSerializationVerificationNeeded }
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
private[akka] case object SubscribePending extends DeadLetterSuppression with NoSerializationVerificationNeeded
@InternalApi private[akka] case object SubscribePending extends DeadLetterSuppression with NoSerializationVerificationNeeded
/**
* INTERNAL API
*/
private[akka] final case class RequestMore(subscription: ActorSubscription[_], demand: Long)
@InternalApi private[akka] final case class RequestMore(subscription: ActorSubscription[_], demand: Long)
extends DeadLetterSuppression with NoSerializationVerificationNeeded
/**
* INTERNAL API
*/
private[akka] final case class Cancel(subscription: ActorSubscription[_])
@InternalApi private[akka] final case class Cancel(subscription: ActorSubscription[_])
extends DeadLetterSuppression with NoSerializationVerificationNeeded
/**
* INTERNAL API
*/
private[akka] final case class ExposedPublisher(publisher: ActorPublisher[Any])
@InternalApi private[akka] final case class ExposedPublisher(publisher: ActorPublisher[Any])
extends DeadLetterSuppression with NoSerializationVerificationNeeded

View file

@ -5,6 +5,7 @@ package akka.stream.impl
import akka.NotUsed
import akka.actor._
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.stream._
import akka.stream.impl.StreamLayout.AtomicModule
import org.reactivestreams._
@ -17,7 +18,7 @@ import akka.util.OptionVal
/**
* INTERNAL API
*/
abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends AtomicModule[SourceShape[Out], Mat] {
@DoNotInherit private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends AtomicModule[SourceShape[Out], Mat] {
protected def label: String = Logging.simpleName(this)
final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]"
@ -48,7 +49,7 @@ abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends Ato
* Holds a `Subscriber` representing the input side of the flow.
* The `Subscriber` can later be connected to an upstream `Publisher`.
*/
final class SubscriberSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Subscriber[Out]](shape) {
@InternalApi private[akka] final class SubscriberSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Subscriber[Out]](shape) {
override def create(context: MaterializationContext): (Publisher[Out], Subscriber[Out]) = {
val processor = new VirtualProcessor[Out]
@ -66,7 +67,7 @@ final class SubscriberSource[Out](val attributes: Attributes, shape: SourceShape
* that mediate the flow of elements downstream and the propagation of
* back-pressure upstream.
*/
final class PublisherSource[Out](p: Publisher[Out], val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, NotUsed](shape) {
@InternalApi private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, NotUsed](shape) {
override protected def label: String = s"PublisherSource($p)"
@ -79,7 +80,7 @@ final class PublisherSource[Out](p: Publisher[Out], val attributes: Attributes,
/**
* INTERNAL API
*/
final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Option[Out]]](shape) {
@InternalApi private[akka] final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Option[Out]]](shape) {
override def create(context: MaterializationContext) = {
val p = Promise[Option[Out]]()
@ -94,7 +95,7 @@ final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]
* Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]].
*/
final class ActorPublisherSource[Out](props: Props, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) {
@InternalApi private[akka] final class ActorPublisherSource[Out](props: Props, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) {
override def create(context: MaterializationContext) = {
val publisherRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props)
@ -109,7 +110,7 @@ final class ActorPublisherSource[Out](props: Props, val attributes: Attributes,
/**
* INTERNAL API
*/
final class ActorRefSource[Out](
@InternalApi private[akka] final class ActorRefSource[Out](
bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out])
extends SourceModule[Out, ActorRef](shape) {

View file

@ -9,6 +9,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import akka.NotUsed
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, Deploy, ExtendedActorSystem, PoisonPill }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.dispatch.Dispatchers
import akka.event.{ Logging, LoggingAdapter }
import akka.stream.Attributes.InputBuffer
@ -19,18 +20,18 @@ import akka.stream.impl.fusing.GraphInterpreter.Connection
import akka.stream.impl.fusing._
import akka.stream.impl.io.{ TLSActor, TlsModule }
import akka.stream.stage.{ GraphStageLogic, InHandler, OutHandler }
import akka.util.OptionVal
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import org.reactivestreams.{ Processor, Publisher, Subscriber }
import scala.collection.immutable.Map
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContextExecutor
import scala.annotation.tailrec
import akka.stream.impl.fusing.GraphInterpreter.DownstreamBoundaryStageLogic
import akka.stream.impl.fusing.GraphInterpreter.UpstreamBoundaryStageLogic
import akka.util.OptionVal
object PhasedFusingActorMaterializer {
/**
* INTERNAL API
*/
@InternalApi private[akka] object PhasedFusingActorMaterializer {
val Debug = false
@ -61,7 +62,7 @@ object PhasedFusingActorMaterializer {
},
GraphStageTag DefaultPhase)
def apply(settings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = {
@InternalApi private[akka] def apply(settings: ActorMaterializerSettings)(implicit context: ActorRefFactory): ActorMaterializer = {
val haveShutDown = new AtomicBoolean(false)
val system = actorSystemOf(context)
val materializerSettings = ActorMaterializerSettings(system)
@ -121,7 +122,7 @@ private final case class ForwardWire(
private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOffset: Int, skippedSlots: Int, phase: PhaseIsland[Any])
class IslandTracking(
@InternalApi private[akka] class IslandTracking(
val phases: Map[IslandTag, Phase[Any]],
val settings: ActorMaterializerSettings,
defaultPhase: Phase[Any],
@ -151,8 +152,8 @@ class IslandTracking(
private var currentPhase: PhaseIsland[Any] = defaultPhase.apply(settings, materializer, nextIslandName())
def getCurrentPhase: PhaseIsland[Any] = currentPhase
def getCurrentOffset: Int = currentGlobalOffset
@InternalApi private[akka] def getCurrentPhase: PhaseIsland[Any] = currentPhase
@InternalApi private[akka] def getCurrentOffset: Int = currentGlobalOffset
private def completeSegment(): Unit = {
val length = currentGlobalOffset - currentSegmentGlobalOffset
@ -181,7 +182,7 @@ class IslandTracking(
}
}
def enterIsland(tag: IslandTag, attributes: Attributes): Unit = {
@InternalApi private[akka] def enterIsland(tag: IslandTag, attributes: Attributes): Unit = {
completeSegment()
val previousPhase = currentPhase
val previousIslandOffset = currentIslandGlobalOffset
@ -200,7 +201,7 @@ class IslandTracking(
if (Debug) println(s"Entering island starting at offset = $currentIslandGlobalOffset phase = $currentPhase")
}
def exitIsland(): Unit = {
@InternalApi private[akka] def exitIsland(): Unit = {
val parentIsland = islandStateStack.remove(islandStateStack.size() - 1)
val previousSegmentLength = completeSegment()
@ -215,7 +216,7 @@ class IslandTracking(
if (Debug) println(s"Exited to island starting at offset = $currentIslandGlobalOffset phase = $currentPhase")
}
def wireIn(in: InPort, logic: Any): Unit = {
@InternalApi private[akka] def wireIn(in: InPort, logic: Any): Unit = {
// The slot for this InPort always belong to the current segment, so resolving its local
// offset/slot is simple
val localInSlot = currentGlobalOffset - currentIslandGlobalOffset - currentIslandSkippetSlots
@ -258,7 +259,7 @@ class IslandTracking(
currentGlobalOffset += 1
}
def wireOut(out: OutPort, absoluteOffset: Int, logic: Any): Unit = {
@InternalApi private[akka] def wireOut(out: OutPort, absoluteOffset: Int, logic: Any): Unit = {
if (Debug) println(s" wiring $out to absolute = $absoluteOffset")
// First check if we are wiring backwards. This is important since we can only do resolution for backward wires.
@ -319,7 +320,7 @@ class IslandTracking(
}
def allNestedIslandsReady(): Unit = {
@InternalApi private[akka] def allNestedIslandsReady(): Unit = {
if (activePhases ne null) {
var i = 0
while (i < activePhases.size()) {
@ -331,7 +332,10 @@ class IslandTracking(
}
case class PhasedFusingActorMaterializer(
/**
* INTERNAL API
*/
@InternalApi private[akka] case class PhasedFusingActorMaterializer(
system: ActorSystem,
override val settings: ActorMaterializerSettings,
dispatchers: Dispatchers,
@ -547,16 +551,25 @@ case class PhasedFusingActorMaterializer(
}
trait IslandTag
/**
* INTERNAL API
*/
@DoNotInherit private[akka] trait IslandTag
trait Phase[M] {
/**
* INTERNAL API
*/
@DoNotInherit private[akka] trait Phase[M] {
def apply(
effectiveSettings: ActorMaterializerSettings,
materializer: PhasedFusingActorMaterializer,
islandName: String): PhaseIsland[M]
}
trait PhaseIsland[M] {
/**
* INTERNAL API
*/
@DoNotInherit private[akka] trait PhaseIsland[M] {
def name: String
@ -574,9 +587,15 @@ trait PhaseIsland[M] {
}
object GraphStageTag extends IslandTag
/**
* INTERNAL API
*/
@InternalApi private[akka] object GraphStageTag extends IslandTag
final class GraphStageIsland(
/**
* INTERNAL API
*/
@InternalApi private[akka] final class GraphStageIsland(
effectiveSettings: ActorMaterializerSettings,
materializer: PhasedFusingActorMaterializer,
islandName: String,
@ -739,9 +758,15 @@ final class GraphStageIsland(
override def toString: String = "GraphStagePhase"
}
object SourceModuleIslandTag extends IslandTag
/**
* INTERNAL API
*/
@InternalApi private[akka] object SourceModuleIslandTag extends IslandTag
final class SourceModulePhase(
/**
* INTERNAL API
*/
@InternalApi private[akka] final class SourceModulePhase(
materializer: PhasedFusingActorMaterializer,
islandName: String) extends PhaseIsland[Publisher[Any]] {
override def name: String = s"SourceModule phase"
@ -763,9 +788,15 @@ final class SourceModulePhase(
override def onIslandReady(): Unit = ()
}
object SinkModuleIslandTag extends IslandTag
/**
* INTERNAL API
*/
@InternalApi private[akka] object SinkModuleIslandTag extends IslandTag
final class SinkModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String)
/**
* INTERNAL API
*/
@InternalApi private[akka] final class SinkModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String)
extends PhaseIsland[AnyRef] {
override def name: String = s"SourceModule phase"
var subscriberOrVirtualPublisher: AnyRef = _
@ -797,9 +828,15 @@ final class SinkModulePhase(materializer: PhasedFusingActorMaterializer, islandN
override def onIslandReady(): Unit = ()
}
object ProcessorModuleIslandTag extends IslandTag
/**
* INTERNAL API
*/
@InternalApi private[akka] object ProcessorModuleIslandTag extends IslandTag
final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String)
/**
* INTERNAL API
*/
@InternalApi private[akka] final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String)
extends PhaseIsland[Processor[Any, Any]] {
override def name: String = "ProcessorModulePhase"
private[this] var processor: Processor[Any, Any] = _
@ -819,9 +856,15 @@ final class ProcessorModulePhase(materializer: PhasedFusingActorMaterializer, is
override def onIslandReady(): Unit = ()
}
object TlsModuleIslandTag extends IslandTag
/**
* INTERNAL API
*/
@InternalApi private[akka] object TlsModuleIslandTag extends IslandTag
final class TlsModulePhase(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[NotUsed] {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class TlsModulePhase(settings: ActorMaterializerSettings, materializer: PhasedFusingActorMaterializer, islandName: String) extends PhaseIsland[NotUsed] {
def name: String = "TlsModulePhase"
var tlsActor: ActorRef = _

View file

@ -3,13 +3,15 @@
*/
package akka.stream.impl
import akka.annotation.InternalApi
import scala.util.control.NonFatal
import org.reactivestreams.{ Subscriber, Subscription }
/**
* INTERNAL API
*/
private[stream] object ReactiveStreamsCompliance {
@InternalApi private[stream] object ReactiveStreamsCompliance {
final val CanNotSubscribeTheSameSubscriberMultipleTimes =
"can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)"

View file

@ -6,6 +6,7 @@ package akka.stream.impl
import scala.annotation.tailrec
import scala.util.control.NoStackTrace
import ResizableMultiReaderRingBuffer._
import akka.annotation.InternalApi
/**
* INTERNAL API
@ -13,7 +14,7 @@ import ResizableMultiReaderRingBuffer._
* Contrary to many other ring buffer implementations this one does not automatically overwrite the oldest
* elements, rather, if full, the buffer tries to grow and rejects further writes if max capacity is reached.
*/
private[akka] class ResizableMultiReaderRingBuffer[T](
@InternalApi private[akka] class ResizableMultiReaderRingBuffer[T](
initialSize: Int, // constructor param, not field
maxSize: Int, // constructor param, not field
val cursors: Cursors) {
@ -142,7 +143,7 @@ private[akka] class ResizableMultiReaderRingBuffer[T](
/**
* INTERNAL API
*/
private[akka] object ResizableMultiReaderRingBuffer {
@InternalApi private[akka] object ResizableMultiReaderRingBuffer {
object NothingToReadException extends RuntimeException with NoStackTrace
trait Cursors {

View file

@ -6,6 +6,8 @@ package akka.stream.impl
import java.util.concurrent.atomic.AtomicLong
import akka.annotation.{ DoNotInherit, InternalApi }
/**
* INTERNAL API
* As discussed in https://github.com/akka/akka/issues/16613
@ -13,15 +15,22 @@ import java.util.concurrent.atomic.AtomicLong
* Generator of sequentially numbered actor names.
* Pulled out from HTTP internals, most often used used by streams which materialize actors directly
*/
abstract class SeqActorName {
@DoNotInherit private[akka] abstract class SeqActorName {
def next(): String
def copy(name: String): SeqActorName
}
object SeqActorName {
/**
* INTERNAL API
*/
@InternalApi private[akka] object SeqActorName {
def apply(prefix: String) = new SeqActorNameImpl(prefix, new AtomicLong(0))
}
private[akka] final class SeqActorNameImpl(val prefix: String, counter: AtomicLong) extends SeqActorName {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class SeqActorNameImpl(val prefix: String, counter: AtomicLong) extends SeqActorName {
def next(): String = prefix + '-' + counter.getAndIncrement()
def copy(newPrefix: String): SeqActorName = new SeqActorNameImpl(newPrefix, counter)

View file

@ -4,6 +4,7 @@
package akka.stream.impl
import akka.Done
import akka.annotation.InternalApi
import scala.concurrent.Promise
import org.reactivestreams.{ Subscriber, Subscription }
@ -11,8 +12,7 @@ import org.reactivestreams.{ Subscriber, Subscription }
/**
* INTERNAL API
*/
private[akka] final class SinkholeSubscriber[T](whenComplete: Promise[Done]) extends Subscriber[T] {
@InternalApi private[akka] final class SinkholeSubscriber[T](whenComplete: Promise[Done]) extends Subscriber[T] {
private[this] var running: Boolean = false
override def onSubscribe(sub: Subscription): Unit = {

View file

@ -8,7 +8,7 @@ import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Supervision.{ Stop, stoppingDecider }
import akka.stream.impl.QueueSink.{ Output, Pull }
import akka.stream.impl.fusing.GraphInterpreter
import akka.{ Done, NotUsed }
import akka.{ Done, NotUsed, annotation }
import akka.actor.{ Actor, ActorRef, Props }
import akka.stream.Attributes.InputBuffer
import akka.stream._
@ -36,13 +36,14 @@ import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import java.util.Optional
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.event.Logging
import akka.util.OptionVal
/**
* INTERNAL API
*/
abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends AtomicModule[SinkShape[In], Mat] {
@DoNotInherit private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends AtomicModule[SinkShape[In], Mat] {
/**
* Create the Subscriber or VirtualPublisher that consumes the incoming
@ -82,7 +83,7 @@ abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends AtomicModu
* elements to fill the internal buffers it will assert back-pressure until
* a subscriber connects and creates demand for elements to be emitted.
*/
private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) {
@InternalApi private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Publisher[In]](shape) {
/*
* This method is the reason why SinkModule.create may return something that is
@ -101,7 +102,7 @@ private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkSha
/**
* INTERNAL API
*/
private[akka] final class FanoutPublisherSink[In](
@InternalApi private[akka] final class FanoutPublisherSink[In](
val attributes: Attributes,
shape: SinkShape[In])
extends SinkModule[In, Publisher[In]](shape) {
@ -128,7 +129,7 @@ private[akka] final class FanoutPublisherSink[In](
* INTERNAL API
* Attaches a subscriber to this stream.
*/
final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) {
@InternalApi private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) {
override def create(context: MaterializationContext) = (subscriber, NotUsed)
@ -140,7 +141,7 @@ final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: Attri
* INTERNAL API
* A sink that immediately cancels its upstream upon materialization.
*/
final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, NotUsed](shape) {
@InternalApi private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, NotUsed](shape) {
override def create(context: MaterializationContext): (Subscriber[Any], NotUsed) = (new CancellingSubscriber[Any], NotUsed)
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, NotUsed] = new CancelSink(attributes, shape)
override def withAttributes(attr: Attributes): SinkModule[Any, NotUsed] = new CancelSink(attr, amendShape(attr))
@ -151,7 +152,7 @@ final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extend
* Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`,
* which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]].
*/
final class ActorSubscriberSink[In](props: Props, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) {
@InternalApi private[akka] final class ActorSubscriberSink[In](props: Props, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) {
override def create(context: MaterializationContext) = {
val subscriberRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props)
@ -165,7 +166,7 @@ final class ActorSubscriberSink[In](props: Props, val attributes: Attributes, sh
/**
* INTERNAL API
*/
final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any,
@InternalApi private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any,
val attributes: Attributes,
shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) {
@ -184,7 +185,10 @@ final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any,
new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr))
}
final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
val in: Inlet[T] = Inlet("lastOption.in")
@ -222,7 +226,10 @@ final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape
override def toString: String = "LastOptionStage"
}
final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
val in: Inlet[T] = Inlet("headOption.in")
@ -255,7 +262,10 @@ final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape
override def toString: String = "HeadOptionStage"
}
final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] {
val in = Inlet[T]("seq.in")
override def toString: String = "SeqStage"
@ -294,7 +304,10 @@ final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Fu
}
}
private[stream] object QueueSink {
/**
* INTERNAL API
*/
@InternalApi private[akka] object QueueSink {
sealed trait Output[+T]
final case class Pull[T](promise: Promise[Option[T]]) extends Output[T]
case object Cancel extends Output[Nothing]
@ -303,7 +316,7 @@ private[stream] object QueueSink {
/**
* INTERNAL API
*/
final class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] {
@InternalApi private[akka] final class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] {
type Requested[E] = Promise[Option[E]]
val in = Inlet[T]("queueSink.in")
@ -395,7 +408,10 @@ final class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T],
}
}
final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T]) extends akka.stream.javadsl.SinkQueueWithCancel[T] {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T]) extends akka.stream.javadsl.SinkQueueWithCancel[T] {
import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext same }
def pull(): CompletionStage[Optional[T]] = delegate.pull().map(_.asJava)(same).toJava
def cancel(): Unit = delegate.cancel()
@ -407,7 +423,7 @@ final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T]) extends akka.s
*
* Helper class to be able to express collection as a fold using mutable data
*/
private[akka] final class CollectorState[T, R](val collector: java.util.stream.Collector[T, Any, R]) {
@InternalApi private[akka] final class CollectorState[T, R](val collector: java.util.stream.Collector[T, Any, R]) {
lazy val accumulated = collector.supplier().get()
private lazy val accumulator = collector.accumulator()
@ -424,7 +440,7 @@ private[akka] final class CollectorState[T, R](val collector: java.util.stream.C
*
* Helper class to be able to express reduce as a fold for parallel collector
*/
private[akka] final class ReducerState[T, R](val collector: java.util.stream.Collector[T, Any, R]) {
@InternalApi private[akka] final class ReducerState[T, R](val collector: java.util.stream.Collector[T, Any, R]) {
private var reduced: Any = null.asInstanceOf[Any]
private lazy val combiner = collector.combiner()
@ -440,7 +456,7 @@ private[akka] final class ReducerState[T, R](val collector: java.util.stream.Col
/**
* INTERNAL API
*/
final private[stream] class LazySink[T, M](sinkFactory: T Future[Sink[T, M]], zeroMat: () M) extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
@InternalApi final private[stream] class LazySink[T, M](sinkFactory: T Future[Sink[T, M]], zeroMat: () M) extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
val in = Inlet[T]("lazySink.in")
override def initialAttributes = DefaultAttributes.lazySink
override val shape: SinkShape[T] = SinkShape.of(in)

View file

@ -25,7 +25,7 @@ import scala.util.control.NonFatal
/**
* INTERNAL API
*/
private[stream] object QueueSource {
@InternalApi private[akka] object QueueSource {
sealed trait Input[+T]
final case class Offer[+T](elem: T, promise: Promise[QueueOfferResult]) extends Input[T]
case object Completion extends Input[Nothing]
@ -35,7 +35,7 @@ private[stream] object QueueSource {
/**
* INTERNAL API
*/
final class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueueWithComplete[T]] {
@InternalApi private[akka] final class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueueWithComplete[T]] {
import QueueSource._
val out = Outlet[T]("queueSource.out")
@ -189,7 +189,10 @@ final class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) e
}
}
final class SourceQueueAdapter[T](delegate: SourceQueueWithComplete[T]) extends akka.stream.javadsl.SourceQueueWithComplete[T] {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class SourceQueueAdapter[T](delegate: SourceQueueWithComplete[T]) extends akka.stream.javadsl.SourceQueueWithComplete[T] {
def offer(elem: T): CompletionStage[QueueOfferResult] = delegate.offer(elem).toJava
def watchCompletion(): CompletionStage[Done] = delegate.watchCompletion().toJava
def complete(): Unit = delegate.complete()
@ -199,7 +202,7 @@ final class SourceQueueAdapter[T](delegate: SourceQueueWithComplete[T]) extends
/**
* INTERNAL API
*/
final class UnfoldResourceSource[T, S](
@InternalApi private[akka] final class UnfoldResourceSource[T, S](
create: () S,
readData: (S) Option[T],
close: (S) Unit) extends GraphStage[SourceShape[T]] {
@ -256,7 +259,10 @@ final class UnfoldResourceSource[T, S](
override def toString = "UnfoldResourceSource"
}
final class UnfoldResourceSourceAsync[T, S](
/**
* INTERNAL API
*/
@InternalApi private[akka] final class UnfoldResourceSourceAsync[T, S](
create: () Future[S],
readData: (S) Future[Option[T]],
close: (S) Future[Done]) extends GraphStage[SourceShape[T]] {
@ -338,11 +344,17 @@ final class UnfoldResourceSourceAsync[T, S](
}
object LazySource {
/**
* INTERNAL API
*/
@InternalApi private[akka] object LazySource {
def apply[T, M](sourceFactory: () Source[T, M]) = new LazySource[T, M](sourceFactory)
}
final class LazySource[T, M](sourceFactory: () Source[T, M]) extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class LazySource[T, M](sourceFactory: () Source[T, M]) extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] {
val out = Outlet[T]("LazySource.out")
override val shape = SourceShape(out)
@ -398,9 +410,10 @@ final class LazySource[T, M](sourceFactory: () ⇒ Source[T, M]) extends GraphSt
override def toString = "LazySource"
}
/** INTERNAL API */
@InternalApi
final object EmptySource extends GraphStage[SourceShape[Nothing]] {
/**
* INTERNAL API
*/
@InternalApi private[akka] final object EmptySource extends GraphStage[SourceShape[Nothing]] {
val out = Outlet[Nothing]("EmptySource.out")
override val shape = SourceShape(out)

View file

@ -3,6 +3,7 @@
*/
package akka.stream.impl
import akka.annotation.InternalApi
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Attributes._
import akka.stream.Supervision.Decider
@ -11,7 +12,7 @@ import akka.stream._
/**
* INTERNAL API
*/
object Stages {
@InternalApi private[akka] object Stages {
object DefaultAttributes {
val IODispatcher = ActorAttributes.IODispatcher

View file

@ -5,6 +5,7 @@ package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference
import akka.annotation.InternalApi
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
@ -15,7 +16,7 @@ import scala.util.control.NonFatal
/**
* INTERNAL API
*/
object StreamLayout {
@InternalApi private[stream] object StreamLayout {
// compile-time constant
final val Debug = false
@ -31,7 +32,7 @@ object StreamLayout {
/**
* INTERNAL API
*/
private[stream] object VirtualProcessor {
@InternalApi private[stream] object VirtualProcessor {
case object Inert {
val subscriber = new CancellingSubscriber[Any]
}
@ -91,7 +92,7 @@ private[stream] object VirtualProcessor {
* Publisher if things go wrong (like `request(0)` coming in from downstream) and
* it must ensure that we drop the Subscriber reference when `cancel` is invoked.
*/
private[stream] final class VirtualProcessor[T] extends AtomicReference[AnyRef] with Processor[T, T] {
@InternalApi private[stream] final class VirtualProcessor[T] extends AtomicReference[AnyRef] with Processor[T, T] {
import ReactiveStreamsCompliance._
import VirtualProcessor._
@ -312,7 +313,7 @@ private[stream] final class VirtualProcessor[T] extends AtomicReference[AnyRef]
* to the `Subscriber` after having hooked it up with the real `Publisher`, hence
* the use of `Inert.subscriber` as a tombstone.
*/
private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Publisher[T] {
@InternalApi private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Publisher[T] {
import ReactiveStreamsCompliance._
import VirtualProcessor.Inert
@ -346,7 +347,7 @@ private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Pub
/**
* INTERNAL API
*/
final case class ProcessorModule[In, Out, Mat](
@InternalApi private[akka] final case class ProcessorModule[In, Out, Mat](
val createProcessor: () (Processor[In, Out], Mat),
attributes: Attributes = DefaultAttributes.processor) extends StreamLayout.AtomicModule[FlowShape[In, Out], Mat] {
val inPort = Inlet[In]("ProcessorModule.in")

View file

@ -4,13 +4,18 @@
package akka.stream.impl
import akka.actor._
import akka.annotation.InternalApi
import akka.stream.StreamSubscriptionTimeoutTerminationMode.{ CancelTermination, NoopTermination, WarnTermination }
import akka.stream.StreamSubscriptionTimeoutSettings
import org.reactivestreams._
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NoStackTrace
object StreamSubscriptionTimeoutSupport {
/**
* INTERNAL API
*/
@InternalApi private[akka] object StreamSubscriptionTimeoutSupport {
/**
* A subscriber who calls `cancel` directly from `onSubscribe` and ignores all other callbacks.
@ -37,7 +42,7 @@ object StreamSubscriptionTimeoutSupport {
* Subscription timeout which does not start any scheduled events and always returns `true`.
* This specialized implementation is to be used for "noop" timeout mode.
*/
case object NoopSubscriptionTimeout extends Cancellable {
@InternalApi private[akka] case object NoopSubscriptionTimeout extends Cancellable {
override def cancel() = true
override def isCancelled = true
}
@ -50,7 +55,7 @@ object StreamSubscriptionTimeoutSupport {
*
* See `akka.stream.materializer.subscription-timeout` for configuration options.
*/
private[akka] trait StreamSubscriptionTimeoutSupport {
@InternalApi private[akka] trait StreamSubscriptionTimeoutSupport {
this: Actor with ActorLogging
import StreamSubscriptionTimeoutSupport._
@ -112,4 +117,4 @@ private[akka] trait StreamSubscriptionTimeoutSupport {
/**
* INTERNAL API
*/
private[akka] class SubscriptionTimeoutException(msg: String) extends RuntimeException(msg)
@InternalApi private[akka] class SubscriptionTimeoutException(msg: String) extends RuntimeException(msg)

View file

@ -4,17 +4,25 @@
package akka.stream.impl
import akka.NotUsed
import akka.annotation.InternalApi
import akka.stream._
import akka.stream.scaladsl._
import language.higherKinds
object SubFlowImpl {
/**
* INTERNAL API
*/
@InternalApi private[akka] object SubFlowImpl {
trait MergeBack[In, F[+_]] {
def apply[T](f: Flow[In, T, NotUsed], breadth: Int): F[T]
}
}
class SubFlowImpl[In, Out, Mat, F[+_], C](
/**
* INTERNAL API
*/
@InternalApi private[akka] class SubFlowImpl[In, Out, Mat, F[+_], C](
val subFlow: Flow[In, Out, NotUsed],
mergeBackFunction: SubFlowImpl.MergeBack[In, F],
finishFunction: Sink[In, NotUsed] C)

View file

@ -3,6 +3,7 @@
*/
package akka.stream.impl
import akka.annotation.InternalApi
import akka.stream.ThrottleMode.{ Enforcing, Shaping }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.stage._
@ -14,7 +15,7 @@ import scala.concurrent.duration.{ FiniteDuration, _ }
/**
* INTERNAL API
*/
class Throttle[T](
@InternalApi private[akka] class Throttle[T](
val cost: Int,
val per: FiniteDuration,
val maximumBurst: Int,

View file

@ -5,6 +5,7 @@ package akka.stream.impl
import java.util.concurrent.{ TimeUnit, TimeoutException }
import akka.annotation.InternalApi
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
@ -23,7 +24,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
* - if the timer fires before the event happens, these stages all fail the stream
* - otherwise, these streams do not interfere with the element flow, ordinary completion or failure
*/
object Timers {
@InternalApi private[akka] object Timers {
private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = {
import scala.concurrent.duration._
FiniteDuration(

View file

@ -4,12 +4,13 @@
package akka.stream.impl
import scala.util.control.NonFatal
import akka.actor.{ Actor }
import akka.actor.Actor
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
private[akka] class SubReceive(initial: Actor.Receive) extends Actor.Receive {
@InternalApi private[akka] class SubReceive(initial: Actor.Receive) extends Actor.Receive {
private var currentReceive = initial
override def isDefinedAt(msg: Any): Boolean = currentReceive.isDefinedAt(msg)
@ -23,7 +24,7 @@ private[akka] class SubReceive(initial: Actor.Receive) extends Actor.Receive {
/**
* INTERNAL API
*/
private[akka] trait Inputs {
@InternalApi private[akka] trait Inputs {
def NeedsInput: TransferState
def NeedsInputOrComplete: TransferState
@ -42,7 +43,7 @@ private[akka] trait Inputs {
/**
* INTERNAL API
*/
private[akka] trait DefaultInputTransferStates extends Inputs {
@InternalApi private[akka] trait DefaultInputTransferStates extends Inputs {
override val NeedsInput: TransferState = new TransferState {
def isReady = inputsAvailable
def isCompleted = inputsDepleted
@ -56,7 +57,7 @@ private[akka] trait DefaultInputTransferStates extends Inputs {
/**
* INTERNAL API
*/
private[akka] trait Outputs {
@InternalApi private[akka] trait Outputs {
def NeedsDemand: TransferState
def NeedsDemandOrCancel: TransferState
@ -78,7 +79,7 @@ private[akka] trait Outputs {
/**
* INTERNAL API
*/
private[akka] trait DefaultOutputTransferStates extends Outputs {
@InternalApi private[akka] trait DefaultOutputTransferStates extends Outputs {
override val NeedsDemand: TransferState = new TransferState {
def isReady = demandAvailable
def isCompleted = isClosed
@ -93,7 +94,7 @@ private[akka] trait DefaultOutputTransferStates extends Outputs {
/**
* INTERNAL API
*/
private[akka] trait TransferState {
@InternalApi private[akka] trait TransferState {
def isReady: Boolean
def isCompleted: Boolean
def isExecutable = isReady && !isCompleted
@ -112,7 +113,7 @@ private[akka] trait TransferState {
/**
* INTERNAL API
*/
private[akka] object Completed extends TransferState {
@InternalApi private[akka] object Completed extends TransferState {
def isReady = false
def isCompleted = true
}
@ -120,7 +121,7 @@ private[akka] object Completed extends TransferState {
/**
* INTERNAL API
*/
private[akka] object NotInitialized extends TransferState {
@InternalApi private[akka] object NotInitialized extends TransferState {
def isReady = false
def isCompleted = false
}
@ -128,7 +129,7 @@ private[akka] object NotInitialized extends TransferState {
/**
* INTERNAL API
*/
private[akka] case class WaitingForUpstreamSubscription(remaining: Int, andThen: TransferPhase) extends TransferState {
@InternalApi private[akka] case class WaitingForUpstreamSubscription(remaining: Int, andThen: TransferPhase) extends TransferState {
def isReady = false
def isCompleted = false
}
@ -136,7 +137,7 @@ private[akka] case class WaitingForUpstreamSubscription(remaining: Int, andThen:
/**
* INTERNAL API
*/
private[akka] object Always extends TransferState {
@InternalApi private[akka] object Always extends TransferState {
def isReady = true
def isCompleted = false
}
@ -144,12 +145,12 @@ private[akka] object Always extends TransferState {
/**
* INTERNAL API
*/
private[akka] final case class TransferPhase(precondition: TransferState)(val action: () Unit)
@InternalApi private[akka] final case class TransferPhase(precondition: TransferState)(val action: () Unit)
/**
* INTERNAL API
*/
private[akka] trait Pump {
@InternalApi private[akka] trait Pump {
private var transferState: TransferState = NotInitialized
private var currentAction: () Unit =
() throw new IllegalStateException("Pump has been not initialized with a phase")

View file

@ -4,15 +4,19 @@
package akka.stream.impl
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.stream._
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 }
import akka.stream.scaladsl.Keep
import akka.util.OptionVal
import scala.language.existentials
import scala.collection.immutable.Map.Map1
/**
* INTERNAL API
*
* Graphs to be materialized are defined by their traversal. There is no explicit graph information tracked, instead
* a sequence of steps required to "reconstruct" the graph.
*
@ -31,7 +35,7 @@ import scala.collection.immutable.Map.Map1
* be encoded somehow. The two imports don't need any special treatment as they are at different positions in
* the traversal. See [[MaterializeAtomic]] for more details.
*/
sealed trait Traversal {
@InternalApi private[akka] sealed trait Traversal {
/**
* Concatenates two traversals building a new Traversal which traverses both.
@ -43,7 +47,10 @@ sealed trait Traversal {
def rewireFirstTo(relativeOffset: Int): Traversal = null
}
object Concat {
/**
* INTERNAL API
*/
@InternalApi private[akka] object Concat {
def normalizeConcat(first: Traversal, second: Traversal): Traversal = {
if (second eq EmptyTraversal) first
@ -70,9 +77,11 @@ object Concat {
}
/**
* INTERNAL API
*
* A Traversal that consists of two traversals. The linked traversals must be traversed in first, next order.
*/
final case class Concat(first: Traversal, next: Traversal) extends Traversal {
@InternalApi private[akka] final case class Concat(first: Traversal, next: Traversal) extends Traversal {
override def rewireFirstTo(relativeOffset: Int): Traversal = {
val firstResult = first.rewireFirstTo(relativeOffset)
if (firstResult ne null)
@ -84,6 +93,8 @@ final case class Concat(first: Traversal, next: Traversal) extends Traversal {
}
/**
* INTERNAL API
*
* Arriving at this step means that an atomic module needs to be materialized (or any other activity which
* assigns something to wired output-input port pairs).
*
@ -101,28 +112,47 @@ final case class Concat(first: Traversal, next: Traversal) extends Traversal {
*
* See the `TraversalTestUtils` class and the `testMaterialize` method for a simple example.
*/
final case class MaterializeAtomic(module: AtomicModule[Shape, Any], outToSlots: Array[Int]) extends Traversal {
@InternalApi private[akka] final case class MaterializeAtomic(module: AtomicModule[Shape, Any], outToSlots: Array[Int]) extends Traversal {
override def toString: String = s"MaterializeAtomic($module, ${outToSlots.mkString("[", ", ", "]")})"
override def rewireFirstTo(relativeOffset: Int): Traversal = copy(outToSlots = Array(relativeOffset))
}
/**
* INTERNAL API
*
* Traversal with no steps.
*/
object EmptyTraversal extends Traversal {
@InternalApi private[akka] object EmptyTraversal extends Traversal {
override def concat(that: Traversal): Traversal = that
}
sealed trait MaterializedValueOp extends Traversal
/**
* INTERNAL API
*/
@InternalApi private[akka] sealed trait MaterializedValueOp extends Traversal
case object Pop extends MaterializedValueOp
case object PushNotUsed extends MaterializedValueOp
final case class Transform(mapper: AnyFunction1) extends MaterializedValueOp {
/**
* INTERNAL API
*/
@InternalApi private[akka] case object Pop extends MaterializedValueOp
/**
* INTERNAL API
*/
@InternalApi private[akka] case object PushNotUsed extends MaterializedValueOp
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Transform(mapper: AnyFunction1) extends MaterializedValueOp {
def apply(arg: Any): Any = mapper.asInstanceOf[Any Any](arg)
}
final case class Compose(composer: AnyFunction2, reverse: Boolean = false) extends MaterializedValueOp {
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Compose(composer: AnyFunction2, reverse: Boolean = false) extends MaterializedValueOp {
def apply(arg1: Any, arg2: Any): Any = {
if (reverse)
composer.asInstanceOf[(Any, Any) Any](arg2, arg1)
@ -131,13 +161,30 @@ final case class Compose(composer: AnyFunction2, reverse: Boolean = false) exten
}
}
final case class PushAttributes(attributes: Attributes) extends Traversal
final case object PopAttributes extends Traversal
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class PushAttributes(attributes: Attributes) extends Traversal
final case class EnterIsland(islandTag: IslandTag) extends Traversal
final case object ExitIsland extends Traversal
/**
* INTERNAL API
*/
@InternalApi private[akka] final case object PopAttributes extends Traversal
object TraversalBuilder {
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class EnterIsland(islandTag: IslandTag) extends Traversal
/**
* INTERNAL API
*/
@InternalApi private[akka] final case object ExitIsland extends Traversal
/**
* INTERNAL API
*/
@InternalApi private[akka] object TraversalBuilder {
// The most generic function1 and function2 (also completely useless, as we have thrown away all types)
// needs to be casted once to be useful (pending runtime exception in cases of bugs).
type AnyFunction1 = Nothing Any
@ -146,10 +193,12 @@ object TraversalBuilder {
private val cachedEmptyCompleted = CompletedTraversalBuilder(PushNotUsed, 0, Map.empty, Attributes.none)
/**
* INTERNAL API
*
* Assign ports their id, which is their position inside the Shape. This is used both by the GraphInterpreter
* and the layout system here.
*/
private[impl] def initShape(shape: Shape): Unit = {
@InternalApi private[impl] def initShape(shape: Shape): Unit = {
// Initialize port IDs
val inlets = shape.inlets
if (inlets.nonEmpty) {
@ -180,15 +229,20 @@ object TraversalBuilder {
}
}
def empty(attributes: Attributes = Attributes.none): TraversalBuilder = {
/**
* INTERNAL API
*/
@InternalApi private[akka] def empty(attributes: Attributes = Attributes.none): TraversalBuilder = {
if (attributes eq Attributes.none) cachedEmptyCompleted
else CompletedTraversalBuilder(PushNotUsed, 0, Map.empty, attributes)
}
/**
* INTERNAL API
*
* Create a generic traversal builder starting from an atomic module.
*/
def atomic(module: AtomicModule[Shape, Any], attributes: Attributes): TraversalBuilder = {
@InternalApi private[akka] def atomic(module: AtomicModule[Shape, Any], attributes: Attributes): TraversalBuilder = {
initShape(module.shape)
val builder =
@ -210,7 +264,10 @@ object TraversalBuilder {
builder.setAttributes(attributes)
}
def printTraversal(t: Traversal, indent: Int = 0): Unit = {
/**
* INTERNAL API
*/
@InternalApi private[impl] def printTraversal(t: Traversal, indent: Int = 0): Unit = {
var current: Traversal = t
var slot = 0
@ -240,7 +297,10 @@ object TraversalBuilder {
}
}
def printWiring(t: Traversal, baseSlot: Int = 0): Int = {
/**
* INTERNAL API
*/
@InternalApi private[impl] def printWiring(t: Traversal, baseSlot: Int = 0): Int = {
var current: Traversal = t
var slot = baseSlot
@ -271,6 +331,8 @@ object TraversalBuilder {
}
/**
* INTERNAL API
*
* A builder for a Traversal. The purpose of subclasses of this trait is to eventually build a Traversal that
* describes the graph. Depending on whether the graph is linear or generic different approaches can be used but
* they still result in a Traversal.
@ -279,7 +341,7 @@ object TraversalBuilder {
* wired). The Traversal may be accessed earlier, depending on the type of the builder and certain conditions.
* See [[CompositeTraversalBuilder]] and [[LinearTraversalBuilder]].
*/
sealed trait TraversalBuilder {
@DoNotInherit private[akka] sealed trait TraversalBuilder {
/**
* Adds a module to the builder. It is possible to add a module with a different Shape (import), in this
@ -377,9 +439,11 @@ sealed trait TraversalBuilder {
}
/**
* INTERNAL API
*
* Returned by [[CompositeTraversalBuilder]] once all output ports of a subgraph has been wired.
*/
final case class CompletedTraversalBuilder(
@InternalApi private[akka] final case class CompletedTraversalBuilder(
traversalSoFar: Traversal,
inSlots: Int,
inToOffset: Map[InPort, Int],
@ -438,10 +502,12 @@ final case class CompletedTraversalBuilder(
}
/**
* INTERNAL API
*
* Represents a builder that contains a single atomic module. Its primary purpose is to track and build the
* outToSlot array which will be then embedded in a [[MaterializeAtomic]] Traversal step.
*/
final case class AtomicTraversalBuilder(
@InternalApi private[akka] final case class AtomicTraversalBuilder(
module: AtomicModule[Shape, Any],
outToSlot: Array[Int],
unwiredOuts: Int,
@ -499,7 +565,10 @@ final case class AtomicTraversalBuilder(
TraversalBuilder.empty().add(this, module.shape, Keep.right).makeIsland(islandTag)
}
object LinearTraversalBuilder {
/**
* INTERNAL API
*/
@InternalApi private[akka] object LinearTraversalBuilder {
// TODO: Remove
private val cachedEmptyLinear = LinearTraversalBuilder(OptionVal.None, OptionVal.None, 0, 0, PushNotUsed, OptionVal.None, Attributes.none)
@ -594,6 +663,8 @@ object LinearTraversalBuilder {
}
/**
* INTERNAL API
*
* Traversal builder that is optimized for linear graphs (those that contain modules with at most one input and
* at most one output port). The Traversal is simply built up in reverse order and output ports are automatically
* assigned to -1 due to the nature of the graph. The only exception is when composites created by
@ -601,7 +672,7 @@ object LinearTraversalBuilder {
* in a fixed location, therefore the last step of the Traversal might need to be changed in those cases from the
* -1 relative offset to something else (see rewireLastOutTo).
*/
final case class LinearTraversalBuilder(
@InternalApi private[akka] final case class LinearTraversalBuilder(
inPort: OptionVal[InPort],
outPort: OptionVal[OutPort],
inOffset: Int,
@ -978,19 +1049,31 @@ final case class LinearTraversalBuilder(
}
}
sealed trait TraversalBuildStep
/**
* INTERNAL API
*/
@DoNotInherit private[akka] sealed trait TraversalBuildStep
/**
* INTERNAL API
*
* Helper class that is only used to identify a [[TraversalBuilder]] in a [[CompositeTraversalBuilder]]. The
* reason why this is needed is because the builder is referenced at various places, while it needs to be mutated.
* In an immutable data structure this is best done with an indirection, i.e. places refer to this immutable key and
* look up the current state in an extra Map.
*/
final class BuilderKey extends TraversalBuildStep {
@InternalApi private[akka] final class BuilderKey extends TraversalBuildStep {
override def toString = s"K:$hashCode"
}
final case class AppendTraversal(traversal: Traversal) extends TraversalBuildStep
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class AppendTraversal(traversal: Traversal) extends TraversalBuildStep
/**
* INTERNAL API
*
* A generic builder that builds a traversal for graphs of arbitrary shape. The memory retained by this class
* usually decreases as ports are wired since auxiliary data is only maintained for ports that are unwired.
*
@ -1011,7 +1094,7 @@ final case class AppendTraversal(traversal: Traversal) extends TraversalBuildSte
* @param outOwners Map of output ports to their parent builders (actually the BuilderKey)
* @param unwiredOuts Number of output ports that have not yet been wired/assigned
*/
final case class CompositeTraversalBuilder(
@InternalApi private[akka] final case class CompositeTraversalBuilder(
finalSteps: Traversal = EmptyTraversal,
reverseBuildSteps: List[TraversalBuildStep] = AppendTraversal(PushNotUsed) :: Nil,
inSlots: Int = 0,

View file

@ -3,8 +3,9 @@
*/
package akka.stream.impl
import akka.annotation.InternalApi
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.stage.{ OutHandler, GraphStageLogic, GraphStage }
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }
import akka.stream._
import scala.concurrent.Future
@ -13,7 +14,7 @@ import scala.util.{ Failure, Success, Try }
/**
* INTERNAL API
*/
final class Unfold[S, E](s: S, f: S Option[(S, E)]) extends GraphStage[SourceShape[E]] {
@InternalApi private[akka] final class Unfold[S, E](s: S, f: S Option[(S, E)]) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("Unfold.out")
override val shape: SourceShape[E] = SourceShape(out)
override def initialAttributes: Attributes = DefaultAttributes.unfold
@ -36,7 +37,7 @@ final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphStage[Sourc
/**
* INTERNAL API
*/
final class UnfoldAsync[S, E](s: S, f: S Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] {
@InternalApi private[akka] final class UnfoldAsync[S, E](s: S, f: S Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("UnfoldAsync.out")
override val shape: SourceShape[E] = SourceShape(out)
override def initialAttributes: Attributes = DefaultAttributes.unfoldAsync

View file

@ -8,6 +8,7 @@ import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicReference
import akka.actor._
import akka.annotation.InternalApi
import akka.event.Logging
import akka.stream._
import akka.stream.impl.ReactiveStreamsCompliance._
@ -24,7 +25,7 @@ import scala.util.control.NonFatal
/**
* INTERNAL API
*/
object ActorGraphInterpreter {
@InternalApi private[akka] object ActorGraphInterpreter {
object Resume extends DeadLetterSuppression with NoSerializationVerificationNeeded
@ -435,7 +436,7 @@ object ActorGraphInterpreter {
/**
* INTERNAL API
*/
final class GraphInterpreterShell(
@InternalApi private[akka] final class GraphInterpreterShell(
var connections: Array[Connection],
var logics: Array[GraphStageLogic],
settings: ActorMaterializerSettings,
@ -646,7 +647,7 @@ final class GraphInterpreterShell(
/**
* INTERNAL API
*/
final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor with ActorLogging {
@InternalApi private[akka] final class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor with ActorLogging {
import ActorGraphInterpreter._
var activeInterpreters = Set.empty[GraphInterpreterShell]

View file

@ -9,6 +9,8 @@ import akka.stream.stage._
import akka.stream._
import java.util.concurrent.ThreadLocalRandom
import akka.annotation.InternalApi
import scala.util.control.NonFatal
/**
@ -16,7 +18,7 @@ import scala.util.control.NonFatal
*
* (See the class for the documentation of the internals)
*/
object GraphInterpreter {
@InternalApi private[akka] object GraphInterpreter {
/**
* Compile time constant, enable it for debug logging to the console.
*/
@ -84,9 +86,6 @@ object GraphInterpreter {
else s"Connection($id, $portState, $slot, $inHandler, $outHandler)"
}
/**
* INTERNAL API
*/
private val _currentInterpreter = new ThreadLocal[Array[AnyRef]] {
/*
* Using an Object-array avoids holding on to the GraphInterpreter class
@ -187,7 +186,7 @@ object GraphInterpreter {
* be modeled, or even dissolved (if preempted and a "stealing" external event is injected; for example the non-cycle
* edge of a balance is pulled, dissolving the original cycle).
*/
final class GraphInterpreter(
@InternalApi private[akka] final class GraphInterpreter(
val materializer: Materializer,
val log: LoggingAdapter,
val logics: Array[GraphStageLogic], // Array of stage logics
@ -203,7 +202,7 @@ final class GraphInterpreter(
/**
* INTERNAL API
*/
private[stream] var activeStage: GraphStageLogic = _
@InternalApi private[stream] var activeStage: GraphStageLogic = _
// The number of currently running stages. Once this counter reaches zero, the interpreter is considered to be
// completed
@ -245,7 +244,7 @@ final class GraphInterpreter(
/**
* INTERNAL API
*/
private[stream] def nonNull: GraphInterpreter = this
@InternalApi private[stream] def nonNull: GraphInterpreter = this
/**
* Dynamic handler changes are communicated from a GraphStageLogic by this method.

View file

@ -7,6 +7,7 @@ import akka.Done
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import akka.actor.Cancellable
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.Logging
import akka.stream.FlowMonitorState._
@ -27,7 +28,7 @@ import scala.util.Try
* INTERNAL API
*/
// TODO: Fix variance issues
final case class GraphStageModule[+S <: Shape @uncheckedVariance, +M](
@InternalApi private[akka] final case class GraphStageModule[+S <: Shape @uncheckedVariance, +M](
shape: S,
attributes: Attributes,
stage: GraphStageWithMaterializedValue[S, M]) extends AtomicModule[S, M] {
@ -44,18 +45,18 @@ final case class GraphStageModule[+S <: Shape @uncheckedVariance, +M](
/**
* INTERNAL API
*/
object GraphStages {
@InternalApi private[akka] object GraphStages {
/**
* INTERNAL API
*/
abstract class SimpleLinearGraphStage[T] extends GraphStage[FlowShape[T, T]] {
@InternalApi private[akka] abstract class SimpleLinearGraphStage[T] extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T](Logging.simpleName(this) + ".in")
val out = Outlet[T](Logging.simpleName(this) + ".out")
override val shape = FlowShape(in, out)
}
object Identity extends SimpleLinearGraphStage[Any] {
private object Identity extends SimpleLinearGraphStage[Any] {
override def initialAttributes = DefaultAttributes.identityOp
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
@ -74,7 +75,7 @@ object GraphStages {
/**
* INTERNAL API
*/
final class Detacher[T] extends SimpleLinearGraphStage[T] {
@InternalApi private[akka] final class Detacher[T] extends SimpleLinearGraphStage[T] {
override def initialAttributes = DefaultAttributes.detacher
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
@ -302,7 +303,7 @@ object GraphStages {
* INTERNAL API
* Discards all received elements.
*/
object IgnoreSink extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Done]] {
@InternalApi private[akka] object IgnoreSink extends GraphStageWithMaterializedValue[SinkShape[Any], Future[Done]] {
val in = Inlet[Any]("Ignore.in")
val shape = SinkShape(in)
@ -344,7 +345,7 @@ object GraphStages {
* This can either be implemented inside the stage itself, or this method can be used,
* which adds a detacher stage to every input.
*/
private[stream] def withDetachedInputs[T](stage: GraphStage[UniformFanInShape[T, T]]) =
@InternalApi private[stream] def withDetachedInputs[T](stage: GraphStage[UniformFanInShape[T, T]]) =
GraphDSL.create() { implicit builder
import GraphDSL.Implicits._
val concat = builder.add(stage)

View file

@ -4,15 +4,18 @@
package akka.stream.impl.fusing
import java.util.concurrent.TimeUnit.NANOSECONDS
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ Buffer BufferImpl, Stages, ReactiveStreamsCompliance }
import akka.stream.scaladsl.{ SourceQueue, Source }
import akka.stream.impl.{ ReactiveStreamsCompliance, Stages, Buffer BufferImpl }
import akka.stream.scaladsl.{ Source, SourceQueue }
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
@ -20,13 +23,14 @@ import scala.concurrent.Future
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import akka.stream.ActorAttributes.SupervisionStrategy
import scala.concurrent.duration.{ FiniteDuration, _ }
import akka.stream.impl.Stages.DefaultAttributes
/**
* INTERNAL API
*/
final case class Map[In, Out](f: In Out) extends GraphStage[FlowShape[In, Out]] {
@InternalApi private[akka] final case class Map[In, Out](f: In Out) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Map.in")
val out = Outlet[Out]("Map.out")
override val shape = FlowShape(in, out)
@ -58,7 +62,7 @@ final case class Map[In, Out](f: In ⇒ Out) extends GraphStage[FlowShape[In, Ou
/**
* INTERNAL API
*/
final case class Filter[T](p: T Boolean) extends SimpleLinearGraphStage[T] {
@InternalApi private[akka] final case class Filter[T](p: T Boolean) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.filter
override def toString: String = "Filter"
@ -92,7 +96,7 @@ final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] {
/**
* INTERNAL API
*/
final case class TakeWhile[T](p: T Boolean, inclusive: Boolean = false) extends SimpleLinearGraphStage[T] {
@InternalApi private[akka] final case class TakeWhile[T](p: T Boolean, inclusive: Boolean = false) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.takeWhile
override def toString: String = "TakeWhile"
@ -129,7 +133,7 @@ final case class TakeWhile[T](p: T ⇒ Boolean, inclusive: Boolean = false) exte
/**
* INTERNAL API
*/
final case class DropWhile[T](p: T Boolean) extends SimpleLinearGraphStage[T] {
@InternalApi private[akka] final case class DropWhile[T](p: T Boolean) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.dropWhile
def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
@ -161,7 +165,7 @@ final case class DropWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T
/**
* INTERNAL API
*/
abstract class SupervisedGraphStageLogic(inheritedAttributes: Attributes, shape: Shape) extends GraphStageLogic(shape) {
@DoNotInherit private[akka] abstract class SupervisedGraphStageLogic(inheritedAttributes: Attributes, shape: Shape) extends GraphStageLogic(shape) {
private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
def withSupervision[T](f: () T): Option[T] =
@ -194,7 +198,7 @@ private[stream] object Collect {
/**
* INTERNAL API
*/
final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] {
@InternalApi private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Collect.in")
val out = Outlet[Out]("Collect.out")
override val shape = FlowShape(in, out)
@ -228,7 +232,7 @@ final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphSta
/**
* INTERNAL API
*/
final case class Recover[T](pf: PartialFunction[Throwable, T]) extends SimpleLinearGraphStage[T] {
@InternalApi private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) extends SimpleLinearGraphStage[T] {
override protected def initialAttributes: Attributes = DefaultAttributes.recover
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
@ -276,7 +280,7 @@ final case class Recover[T](pf: PartialFunction[Throwable, T]) extends SimpleLin
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
* would log the `t2` error.
*/
final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends SimpleLinearGraphStage[T] {
@InternalApi private[akka] final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends SimpleLinearGraphStage[T] {
override def createLogic(attr: Attributes) =
new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = push(out, grab(in))
@ -294,7 +298,7 @@ final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends S
/**
* INTERNAL API
*/
final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] {
@InternalApi private[akka] final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.take
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
@ -322,7 +326,7 @@ final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] {
/**
* INTERNAL API
*/
final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] {
@InternalApi private[akka] final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.drop
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
@ -346,7 +350,7 @@ final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] {
/**
* INTERNAL API
*/
final case class Scan[In, Out](zero: Out, f: (Out, In) Out) extends GraphStage[FlowShape[In, Out]] {
@InternalApi private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) Out) extends GraphStage[FlowShape[In, Out]] {
override val shape = FlowShape[In, Out](Inlet("Scan.in"), Outlet("Scan.out"))
override def initialAttributes: Attributes = DefaultAttributes.scan
@ -404,7 +408,7 @@ final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta
/**
* INTERNAL API
*/
final case class ScanAsync[In, Out](zero: Out, f: (Out, In) Future[Out]) extends GraphStage[FlowShape[In, Out]] {
@InternalApi private[akka] final case class ScanAsync[In, Out](zero: Out, f: (Out, In) Future[Out]) extends GraphStage[FlowShape[In, Out]] {
import akka.dispatch.ExecutionContexts
@ -512,7 +516,7 @@ final case class ScanAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) ext
/**
* INTERNAL API
*/
final case class Fold[In, Out](zero: Out, f: (Out, In) Out) extends GraphStage[FlowShape[In, Out]] {
@InternalApi private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) Out) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Fold.in")
val out = Outlet[Out]("Fold.out")
@ -567,7 +571,7 @@ final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphSta
/**
* INTERNAL API
*/
final class FoldAsync[In, Out](zero: Out, f: (Out, In) Future[Out]) extends GraphStage[FlowShape[In, Out]] {
@InternalApi private[akka] final class FoldAsync[In, Out](zero: Out, f: (Out, In) Future[Out]) extends GraphStage[FlowShape[In, Out]] {
import akka.dispatch.ExecutionContexts
@ -662,7 +666,7 @@ final class FoldAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) extends
/**
* INTERNAL API
*/
final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends SimpleLinearGraphStage[T] {
@InternalApi private[akka] final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends SimpleLinearGraphStage[T] {
ReactiveStreamsCompliance.requireNonNullElement(inject)
if (start.isDefined) ReactiveStreamsCompliance.requireNonNullElement(start.get)
if (end.isDefined) ReactiveStreamsCompliance.requireNonNullElement(end.get)
@ -701,7 +705,7 @@ final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) ext
/**
* INTERNAL API
*/
final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
@InternalApi private[akka] final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
require(n > 0, "n must be greater than 0")
val in = Inlet[T]("Grouped.in")
@ -755,7 +759,7 @@ final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Se
/**
* INTERNAL API
*/
final case class LimitWeighted[T](val n: Long, val costFn: T Long) extends SimpleLinearGraphStage[T] {
@InternalApi private[akka] final case class LimitWeighted[T](val n: Long, val costFn: T Long) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.limitWeighted
def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
@ -789,7 +793,7 @@ final case class LimitWeighted[T](val n: Long, val costFn: T ⇒ Long) extends S
/**
* INTERNAL API
*/
final case class Sliding[T](val n: Int, val step: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
@InternalApi private[akka] final case class Sliding[T](val n: Int, val step: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
require(n > 0, "n must be greater than 0")
require(step > 0, "step must be greater than 0")
@ -847,7 +851,7 @@ final case class Sliding[T](val n: Int, val step: Int) extends GraphStage[FlowSh
/**
* INTERNAL API
*/
final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends SimpleLinearGraphStage[T] {
@InternalApi private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
@ -920,7 +924,7 @@ final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extend
/**
* INTERNAL API
*/
final case class Batch[In, Out](val max: Long, val costFn: In Long, val seed: In Out, val aggregate: (Out, In) Out)
@InternalApi private[akka] final case class Batch[In, Out](val max: Long, val costFn: In Long, val seed: In Out, val aggregate: (Out, In) Out)
extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Batch.in")
@ -1042,7 +1046,7 @@ final case class Batch[In, Out](val max: Long, val costFn: In ⇒ Long, val seed
/**
* INTERNAL API
*/
final class Expand[In, Out](val extrapolate: In Iterator[Out]) extends GraphStage[FlowShape[In, Out]] {
@InternalApi private[akka] final class Expand[In, Out](val extrapolate: In Iterator[Out]) extends GraphStage[FlowShape[In, Out]] {
private val in = Inlet[In]("expand.in")
private val out = Outlet[Out]("expand.out")
@ -1096,7 +1100,7 @@ final class Expand[In, Out](val extrapolate: In ⇒ Iterator[Out]) extends Graph
/**
* INTERNAL API
*/
private[akka] object MapAsync {
@InternalApi private[akka] object MapAsync {
final class Holder[T](var elem: Try[T], val cb: AsyncCallback[Holder[T]]) extends (Try[T] Unit) {
def setElem(t: Try[T]): Unit =
@ -1117,7 +1121,7 @@ private[akka] object MapAsync {
/**
* INTERNAL API
*/
final case class MapAsync[In, Out](parallelism: Int, f: In Future[Out])
@InternalApi private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In Future[Out])
extends GraphStage[FlowShape[In, Out]] {
import MapAsync._
@ -1195,7 +1199,7 @@ final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out])
/**
* INTERNAL API
*/
final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In Future[Out])
@InternalApi private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In Future[Out])
extends GraphStage[FlowShape[In, Out]] {
private val in = Inlet[In]("MapAsyncUnordered.in")
@ -1273,7 +1277,7 @@ final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[O
/**
* INTERNAL API
*/
final case class Log[T](
@InternalApi private[akka] final case class Log[T](
name: String,
extract: T Any,
logAdapter: Option[LoggingAdapter]) extends SimpleLinearGraphStage[T] {
@ -1357,7 +1361,7 @@ final case class Log[T](
/**
* INTERNAL API
*/
private[akka] object Log {
@InternalApi private[akka] object Log {
/**
* Must be located here to be visible for implicit resolution, when [[Materializer]] is passed to [[Logging]]
@ -1385,7 +1389,7 @@ private[akka] object Log {
/**
* INTERNAL API
*/
private[stream] object TimerKeys {
@InternalApi private[stream] object TimerKeys {
case object TakeWithinTimerKey
@ -1395,7 +1399,10 @@ private[stream] object TimerKeys {
}
final class GroupedWithin[T](val n: Int, val d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class GroupedWithin[T](val n: Int, val d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
require(n > 0, "n must be greater than 0")
require(d > Duration.Zero)
@ -1473,7 +1480,10 @@ final class GroupedWithin[T](val n: Int, val d: FiniteDuration) extends GraphSta
}
}
final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
private[this] def timerName = "DelayedTimer"
override def initialAttributes: Attributes = DefaultAttributes.delay
@ -1587,7 +1597,10 @@ final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy)
override def toString = "Delay"
}
final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
def onPush(): Unit = push(out, grab(in))
@ -1604,7 +1617,10 @@ final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraph
override def toString = "TakeWithin"
}
final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
/**
* INTERNAL API
*/
@InternalApi private[akka] final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private val startNanoTime = System.nanoTime()
@ -1634,7 +1650,7 @@ final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraph
/**
* INTERNAL API
*/
final class Reduce[T](val f: (T, T) T) extends SimpleLinearGraphStage[T] {
@InternalApi private[akka] final class Reduce[T](val f: (T, T) T) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.reduce
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
@ -1695,11 +1711,11 @@ final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] {
/**
* INTERNAL API
*/
private[stream] object RecoverWith {
@InternalApi private[stream] object RecoverWith {
val InfiniteRetries = -1
}
final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
@InternalApi private[akka] final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
require(maximumRetries >= -1, "number of retries must be non-negative or equal to -1")
override def initialAttributes = DefaultAttributes.recoverWith
@ -1753,7 +1769,7 @@ final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[T
/**
* INTERNAL API
*/
final class StatefulMapConcat[In, Out](val f: () In immutable.Iterable[Out]) extends GraphStage[FlowShape[In, Out]] {
@InternalApi private[akka] final class StatefulMapConcat[In, Out](val f: () In immutable.Iterable[Out]) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("StatefulMapConcat.in")
val out = Outlet[Out]("StatefulMapConcat.out")
override val shape = FlowShape(in, out)

View file

@ -4,7 +4,9 @@
package akka.stream.impl.fusing
import java.util.concurrent.atomic.AtomicReference
import akka.NotUsed
import akka.annotation.InternalApi
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
@ -12,19 +14,21 @@ import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.stage._
import akka.stream.scaladsl._
import akka.stream.actor.ActorSubscriberMessage
import scala.collection.{ mutable, immutable }
import scala.collection.{ immutable, mutable }
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import scala.annotation.tailrec
import akka.stream.impl.PublisherSource
import akka.stream.impl.CancellingSubscriber
import akka.stream.impl.{ Buffer BufferImpl }
import scala.collection.JavaConversions._
/**
* INTERNAL API
*/
final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Graph[SourceShape[T], M], T]] {
@InternalApi private[akka] final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Graph[SourceShape[T], M], T]] {
private val in = Inlet[Graph[SourceShape[T], M]]("flatten.in")
private val out = Outlet[T]("flatten.out")
@ -103,7 +107,7 @@ final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Gr
/**
* INTERNAL API
*/
final class PrefixAndTail[T](val n: Int) extends GraphStage[FlowShape[T, (immutable.Seq[T], Source[T, NotUsed])]] {
@InternalApi private[akka] final class PrefixAndTail[T](val n: Int) extends GraphStage[FlowShape[T, (immutable.Seq[T], Source[T, NotUsed])]] {
val in: Inlet[T] = Inlet("PrefixAndTail.in")
val out: Outlet[(immutable.Seq[T], Source[T, NotUsed])] = Outlet("PrefixAndTail.out")
override val shape: FlowShape[T, (immutable.Seq[T], Source[T, NotUsed])] = FlowShape(in, out)
@ -211,7 +215,7 @@ final class PrefixAndTail[T](val n: Int) extends GraphStage[FlowShape[T, (immuta
/**
* INTERNAL API
*/
final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
@InternalApi private[akka] final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
val in: Inlet[T] = Inlet("GroupBy.in")
val out: Outlet[Source[T, NotUsed]] = Outlet("GroupBy.out")
override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)
@ -384,7 +388,7 @@ final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T ⇒ K) extends G
/**
* INTERNAL API
*/
object Split {
@InternalApi private[akka] object Split {
sealed abstract class SplitDecision
/** Splits before the current element. The current element will be the first element in the new substream. */
@ -403,7 +407,7 @@ object Split {
/**
* INTERNAL API
*/
final class Split[T](val decision: Split.SplitDecision, val p: T Boolean, val substreamCancelStrategy: SubstreamCancelStrategy) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
@InternalApi private[akka] final class Split[T](val decision: Split.SplitDecision, val p: T Boolean, val substreamCancelStrategy: SubstreamCancelStrategy) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
val in: Inlet[T] = Inlet("Split.in")
val out: Outlet[Source[T, NotUsed]] = Outlet("Split.out")
override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)
@ -572,7 +576,7 @@ final class Split[T](val decision: Split.SplitDecision, val p: T ⇒ Boolean, va
/**
* INTERNAL API
*/
private[stream] object SubSink {
@InternalApi private[stream] object SubSink {
sealed trait State
/** Not yet materialized and no command has been scheduled */
case object Uninitialized extends State
@ -598,7 +602,7 @@ private[stream] object SubSink {
/**
* INTERNAL API
*/
private[stream] final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage Unit)
@InternalApi private[stream] final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage Unit)
extends GraphStage[SinkShape[T]] {
import SubSink._
@ -668,7 +672,7 @@ private[stream] final class SubSink[T](name: String, externalCallback: ActorSubs
/**
* INTERNAL API
*/
final class SubSource[T](name: String, private[fusing] val externalCallback: AsyncCallback[SubSink.Command])
@InternalApi private[akka] final class SubSource[T](name: String, private[fusing] val externalCallback: AsyncCallback[SubSink.Command])
extends GraphStage[SourceShape[T]] {
import SubSink._

View file

@ -3,6 +3,7 @@
*/
package akka.stream.impl.io
import akka.annotation.InternalApi
import akka.stream._
import akka.stream.stage._
import akka.util.ByteString
@ -13,7 +14,7 @@ import scala.util.control.{ NoStackTrace, NonFatal }
/**
* INTERNAL API
*/
private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[ByteString, T]] {
@InternalApi private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[ByteString, T]] {
import ByteStringParser._
private val bytesIn = Inlet[ByteString]("bytesIn")
@ -139,7 +140,7 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By
/**
* INTERNAL API
*/
private[akka] object ByteStringParser {
@InternalApi private[akka] object ByteStringParser {
val CompactionThreshold = 16

View file

@ -8,7 +8,8 @@ import java.nio.channels.FileChannel
import java.nio.file.Path
import akka.Done
import akka.actor.{ Deploy, ActorLogging, DeadLetterSuppression, Props }
import akka.actor.{ ActorLogging, DeadLetterSuppression, Deploy, Props }
import akka.annotation.InternalApi
import akka.stream.actor.ActorPublisherMessage
import akka.stream.IOResult
import akka.util.ByteString
@ -19,7 +20,7 @@ import scala.util.{ Failure, Success }
import scala.util.control.NonFatal
/** INTERNAL API */
private[akka] object FilePublisher {
@InternalApi private[akka] object FilePublisher {
def props(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int) = {
require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)")
require(initialBuffer > 0, s"initialBuffer must be > 0 (was $initialBuffer)")
@ -35,7 +36,7 @@ private[akka] object FilePublisher {
}
/** INTERNAL API */
private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int)
@InternalApi private[akka] final class FilePublisher(f: Path, completionPromise: Promise[IOResult], chunkSize: Int, initialBuffer: Int, maxBuffer: Int)
extends akka.stream.actor.ActorPublisher[ByteString] with ActorLogging {
import FilePublisher._

View file

@ -7,7 +7,8 @@ import java.nio.channels.FileChannel
import java.nio.file.{ Path, StandardOpenOption }
import akka.Done
import akka.actor.{ Deploy, ActorLogging, Props }
import akka.actor.{ ActorLogging, Deploy, Props }
import akka.annotation.InternalApi
import akka.stream.IOResult
import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy }
import akka.util.ByteString
@ -17,7 +18,7 @@ import scala.concurrent.Promise
import scala.util.{ Failure, Success }
/** INTERNAL API */
private[akka] object FileSubscriber {
@InternalApi private[akka] object FileSubscriber {
def props(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption]) = {
require(bufSize > 0, "buffer size must be > 0")
Props(classOf[FileSubscriber], f, completionPromise, bufSize, openOptions).withDeploy(Deploy.local)
@ -25,7 +26,7 @@ private[akka] object FileSubscriber {
}
/** INTERNAL API */
private[akka] class FileSubscriber(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption])
@InternalApi private[akka] class FileSubscriber(f: Path, completionPromise: Promise[IOResult], bufSize: Int, openOptions: Set[StandardOpenOption])
extends akka.stream.actor.ActorSubscriber
with ActorLogging {

View file

@ -6,6 +6,7 @@ package akka.stream.impl.io
import java.io.OutputStream
import java.nio.file.{ Path, StandardOpenOption }
import akka.annotation.InternalApi
import akka.stream._
import akka.stream.impl.SinkModule
import akka.stream.impl.Stages.DefaultAttributes.IODispatcher
@ -19,7 +20,7 @@ import scala.concurrent.{ Future, Promise }
* Creates simple synchronous Sink which writes all incoming elements to the given file
* (creating it before hand if necessary).
*/
private[akka] final class FileSink(f: Path, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString])
@InternalApi private[akka] final class FileSink(f: Path, options: Set[StandardOpenOption], val attributes: Attributes, shape: SinkShape[ByteString])
extends SinkModule[ByteString, Future[IOResult]](shape) {
override protected def label: String = s"FileSink($f, $options)"
@ -47,7 +48,7 @@ private[akka] final class FileSink(f: Path, options: Set[StandardOpenOption], va
* INTERNAL API
* Creates simple synchronous Sink which writes all incoming elements to the output stream.
*/
private[akka] final class OutputStreamSink(createOutput: () OutputStream, val attributes: Attributes, shape: SinkShape[ByteString], autoFlush: Boolean)
@InternalApi private[akka] final class OutputStreamSink(createOutput: () OutputStream, val attributes: Attributes, shape: SinkShape[ByteString], autoFlush: Boolean)
extends SinkModule[ByteString, Future[IOResult]](shape) {
override def create(context: MaterializationContext) = {

View file

@ -6,6 +6,7 @@ package akka.stream.impl.io
import java.io.InputStream
import java.nio.file.Path
import akka.annotation.InternalApi
import akka.stream._
import akka.stream.ActorAttributes.Dispatcher
import akka.stream.IOResult
@ -13,13 +14,14 @@ import akka.stream.impl.Stages.DefaultAttributes.IODispatcher
import akka.stream.impl.{ ErrorPublisher, SourceModule }
import akka.util.ByteString
import org.reactivestreams._
import scala.concurrent.{ Future, Promise }
/**
* INTERNAL API
* Creates simple synchronous Source backed by the given file.
*/
private[akka] final class FileSource(f: Path, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString])
@InternalApi private[akka] final class FileSource(f: Path, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString])
extends SourceModule[ByteString, Future[IOResult]](shape) {
require(chunkSize > 0, "chunkSize must be greater than 0")
override def create(context: MaterializationContext) = {
@ -49,7 +51,7 @@ private[akka] final class FileSource(f: Path, chunkSize: Int, val attributes: At
* INTERNAL API
* Source backed by the given input stream.
*/
private[akka] final class InputStreamSource(createInputStream: () InputStream, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString])
@InternalApi private[akka] final class InputStreamSource(createInputStream: () InputStream, chunkSize: Int, val attributes: Attributes, shape: SourceShape[ByteString])
extends SourceModule[ByteString, Future[IOResult]](shape) {
override def create(context: MaterializationContext) = {
val materializer = ActorMaterializerHelper.downcast(context.materializer)

View file

@ -6,7 +6,8 @@ package akka.stream.impl.io
import java.io.InputStream
import akka.Done
import akka.actor.{ Deploy, ActorLogging, DeadLetterSuppression, Props }
import akka.actor.{ ActorLogging, DeadLetterSuppression, Deploy, Props }
import akka.annotation.InternalApi
import akka.stream.actor.ActorPublisherMessage
import akka.stream.IOResult
import akka.util.ByteString
@ -15,7 +16,7 @@ import scala.concurrent.Promise
import scala.util.{ Failure, Success }
/** INTERNAL API */
private[akka] object InputStreamPublisher {
@InternalApi private[akka] object InputStreamPublisher {
def props(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int): Props = {
require(chunkSize > 0, s"chunkSize must be > 0 (was $chunkSize)")
@ -27,7 +28,7 @@ private[akka] object InputStreamPublisher {
}
/** INTERNAL API */
private[akka] class InputStreamPublisher(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int)
@InternalApi private[akka] class InputStreamPublisher(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int)
extends akka.stream.actor.ActorPublisher[ByteString]
with ActorLogging {

View file

@ -6,6 +6,7 @@ package akka.stream.impl.io
import java.io.{ IOException, InputStream }
import java.util.concurrent.{ BlockingQueue, LinkedBlockingDeque, TimeUnit }
import akka.annotation.InternalApi
import akka.stream.Attributes.InputBuffer
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.io.InputStreamSinkStage._
@ -36,7 +37,7 @@ private[stream] object InputStreamSinkStage {
/**
* INTERNAL API
*/
final private[stream] class InputStreamSinkStage(readTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SinkShape[ByteString], InputStream] {
@InternalApi final private[stream] class InputStreamSinkStage(readTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SinkShape[ByteString], InputStream] {
val in = Inlet[ByteString]("InputStreamSink.in")
override def initialAttributes: Attributes = DefaultAttributes.inputStreamSink
@ -95,7 +96,7 @@ final private[stream] class InputStreamSinkStage(readTimeout: FiniteDuration) ex
* INTERNAL API
* InputStreamAdapter that interacts with InputStreamSinkStage
*/
private[akka] class InputStreamAdapter(
@InternalApi private[akka] class InputStreamAdapter(
sharedBuffer: BlockingQueue[StreamToAdapterMessage],
sendToStage: (AdapterToStageMessage) Unit,
readTimeout: FiniteDuration)

View file

@ -6,7 +6,8 @@ package akka.stream.impl.io
import java.io.OutputStream
import akka.Done
import akka.actor.{ Deploy, ActorLogging, Props }
import akka.actor.{ ActorLogging, Deploy, Props }
import akka.annotation.InternalApi
import akka.stream.actor.{ ActorSubscriberMessage, WatermarkRequestStrategy }
import akka.stream.IOResult
import akka.util.ByteString
@ -15,7 +16,7 @@ import scala.concurrent.Promise
import scala.util.{ Failure, Success }
/** INTERNAL API */
private[akka] object OutputStreamSubscriber {
@InternalApi private[akka] object OutputStreamSubscriber {
def props(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int, autoFlush: Boolean) = {
require(bufSize > 0, "buffer size must be > 0")
Props(classOf[OutputStreamSubscriber], os, completionPromise, bufSize, autoFlush).withDeploy(Deploy.local)
@ -24,7 +25,7 @@ private[akka] object OutputStreamSubscriber {
}
/** INTERNAL API */
private[akka] class OutputStreamSubscriber(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int, autoFlush: Boolean)
@InternalApi private[akka] class OutputStreamSubscriber(os: OutputStream, completionPromise: Promise[IOResult], bufSize: Int, autoFlush: Boolean)
extends akka.stream.actor.ActorSubscriber
with ActorLogging {

View file

@ -10,6 +10,7 @@ import javax.net.ssl.SSLEngineResult.Status._
import javax.net.ssl._
import akka.actor._
import akka.annotation.InternalApi
import akka.stream._
import akka.stream.impl.FanIn.InputBunch
import akka.stream.impl.FanOut.OutputBunch
@ -25,7 +26,7 @@ import scala.util.{ Failure, Success, Try }
/**
* INTERNAL API.
*/
private[stream] object TLSActor {
@InternalApi private[stream] object TLSActor {
def props(
settings: ActorMaterializerSettings,
@ -45,7 +46,7 @@ private[stream] object TLSActor {
/**
* INTERNAL API.
*/
private[stream] class TLSActor(
@InternalApi private[stream] class TLSActor(
settings: ActorMaterializerSettings,
createSSLEngine: ActorSystem SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
verifySession: (ActorSystem, SSLSession) Try[Unit], // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753
@ -455,7 +456,7 @@ private[stream] class TLSActor(
/**
* INTERNAL API
*/
private[stream] object TlsUtils {
@InternalApi private[stream] object TlsUtils {
def applySessionParameters(engine: SSLEngine, sessionParameters: NegotiateNewSession): Unit = {
sessionParameters.enabledCipherSuites foreach (cs engine.setEnabledCipherSuites(cs.toArray))
sessionParameters.enabledProtocols foreach (p engine.setEnabledProtocols(p.toArray))

View file

@ -9,6 +9,7 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
import akka.NotUsed
import akka.actor.{ ActorRef, Terminated }
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.io.Inet.SocketOption
import akka.io.Tcp
@ -29,7 +30,7 @@ import scala.util.Try
/**
* INTERNAL API
*/
private[stream] class ConnectionSourceStage(
@InternalApi private[stream] class ConnectionSourceStage(
val tcpManager: ActorRef,
val endpoint: InetSocketAddress,
val backlog: Int,
@ -167,7 +168,7 @@ private[stream] object ConnectionSourceStage {
/**
* INTERNAL API
*/
private[stream] object TcpConnectionStage {
@InternalApi private[stream] object TcpConnectionStage {
case object WriteAck extends Tcp.Event
trait TcpRole {
@ -309,7 +310,7 @@ private[stream] object TcpConnectionStage {
/**
* INTERNAL API
*/
class IncomingConnectionStage(connection: ActorRef, remoteAddress: InetSocketAddress, halfClose: Boolean)
@InternalApi private[akka] class IncomingConnectionStage(connection: ActorRef, remoteAddress: InetSocketAddress, halfClose: Boolean)
extends GraphStage[FlowShape[ByteString, ByteString]] {
import TcpConnectionStage._
@ -333,7 +334,7 @@ class IncomingConnectionStage(connection: ActorRef, remoteAddress: InetSocketAdd
/**
* INTERNAL API
*/
private[stream] class OutgoingConnectionStage(
@InternalApi private[stream] class OutgoingConnectionStage(
manager: ActorRef,
remoteAddress: InetSocketAddress,
localAddress: Option[InetSocketAddress] = None,
@ -371,7 +372,7 @@ private[stream] class OutgoingConnectionStage(
}
/** INTERNAL API */
private[akka] object TcpIdleTimeout {
@InternalApi private[akka] object TcpIdleTimeout {
def apply(idleTimeout: FiniteDuration, remoteAddress: Option[InetSocketAddress]): BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = {
val connectionToString = remoteAddress match {
case Some(addr) s" on connection to [$addr]"

View file

@ -4,6 +4,7 @@ import javax.net.ssl.{ SSLEngine, SSLSession }
import akka.NotUsed
import akka.actor.ActorSystem
import akka.annotation.InternalApi
import akka.stream._
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.TLSProtocol._
@ -15,7 +16,7 @@ import scala.util.Try
/**
* INTERNAL API.
*/
private[stream] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound],
@InternalApi private[stream] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound],
cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString],
shape: BidiShape[SslTlsOutbound, ByteString, ByteString, SslTlsInbound],
attributes: Attributes,
@ -34,7 +35,7 @@ private[stream] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plain
/**
* INTERNAL API.
*/
private[stream] object TlsModule {
@InternalApi private[stream] object TlsModule {
def apply(
attributes: Attributes,
createSSLEngine: ActorSystem SSLEngine, // ActorSystem is only needed to support the AkkaSSLConfig legacy, see #21753

View file

@ -4,6 +4,7 @@
package akka.stream.impl.io.compression
import akka.NotUsed
import akka.annotation.InternalApi
import akka.stream.{ Attributes, FlowShape }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.scaladsl.Flow
@ -11,7 +12,7 @@ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.util.ByteString
/** INTERNAL API */
private[stream] object CompressionUtils {
@InternalApi private[stream] object CompressionUtils {
/**
* Creates a flow from a compressor constructor.
*/

View file

@ -3,6 +3,7 @@
*/
package akka.stream.impl.io.compression
import akka.annotation.InternalApi
import akka.util.ByteString
/**
@ -10,7 +11,7 @@ import akka.util.ByteString
*
* A stateful object representing ongoing compression.
*/
private[akka] abstract class Compressor {
@InternalApi private[akka] abstract class Compressor {
/**
* Compresses the given input and returns compressed data. The implementation
* can and will choose to buffer output data to improve compression. Use

View file

@ -5,12 +5,13 @@ package akka.stream.impl.io.compression
import java.util.zip.Deflater
import akka.annotation.InternalApi
import akka.util.{ ByteString, ByteStringBuilder }
import scala.annotation.tailrec
/** INTERNAL API */
private[akka] class DeflateCompressor extends Compressor {
@InternalApi private[akka] class DeflateCompressor extends Compressor {
import DeflateCompressor._
protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, false)
@ -62,7 +63,7 @@ private[akka] class DeflateCompressor extends Compressor {
}
/** INTERNAL API */
private[akka] object DeflateCompressor {
@InternalApi private[akka] object DeflateCompressor {
val MinBufferSize = 1024
@tailrec

View file

@ -5,10 +5,11 @@ package akka.stream.impl.io.compression
import java.util.zip.Inflater
import akka.annotation.InternalApi
import akka.stream.Attributes
/** INTERNAL API */
private[akka] class DeflateDecompressor(maxBytesPerChunk: Int)
@InternalApi private[akka] class DeflateDecompressor(maxBytesPerChunk: Int)
extends DeflateDecompressorBase(maxBytesPerChunk) {
override def createLogic(attr: Attributes) = new DecompressorParsingLogic {

View file

@ -5,12 +5,13 @@ package akka.stream.impl.io.compression
import java.util.zip.Inflater
import akka.annotation.InternalApi
import akka.stream.impl.io.ByteStringParser
import akka.stream.impl.io.ByteStringParser.{ ParseResult, ParseStep }
import akka.util.ByteString
/** INTERNAL API */
private[akka] abstract class DeflateDecompressorBase(maxBytesPerChunk: Int)
@InternalApi private[akka] abstract class DeflateDecompressorBase(maxBytesPerChunk: Int)
extends ByteStringParser[ByteString] {
abstract class DecompressorParsingLogic extends ParsingLogic {
@ -45,4 +46,4 @@ private[akka] abstract class DeflateDecompressorBase(maxBytesPerChunk: Int)
}
/** INTERNAL API */
private[akka] object DeflateDecompressorBase
@InternalApi private[akka] object DeflateDecompressorBase

View file

@ -5,10 +5,11 @@ package akka.stream.impl.io.compression
import java.util.zip.{ CRC32, Deflater }
import akka.annotation.InternalApi
import akka.util.ByteString
/** INTERNAL API */
private[akka] class GzipCompressor extends DeflateCompressor {
@InternalApi private[akka] class GzipCompressor extends DeflateCompressor {
override protected lazy val deflater = new Deflater(Deflater.BEST_COMPRESSION, true)
private val checkSum = new CRC32 // CRC32 of uncompressed data
private var headerSent = false

View file

@ -5,13 +5,14 @@ package akka.stream.impl.io.compression
import java.util.zip.{ CRC32, Inflater, ZipException }
import akka.annotation.InternalApi
import akka.stream.Attributes
import akka.stream.impl.io.ByteStringParser
import akka.stream.impl.io.ByteStringParser.{ ParseResult, ParseStep }
import akka.util.ByteString
/** INTERNAL API */
private[akka] class GzipDecompressor(maxBytesPerChunk: Int)
@InternalApi private[akka] class GzipDecompressor(maxBytesPerChunk: Int)
extends DeflateDecompressorBase(maxBytesPerChunk) {
override def createLogic(attr: Attributes) = new DecompressorParsingLogic {
@ -66,7 +67,7 @@ private[akka] class GzipDecompressor(maxBytesPerChunk: Int)
}
/** INTERNAL API */
private[akka] object GzipDecompressor {
@InternalApi private[akka] object GzipDecompressor {
// RFC 1952: http://tools.ietf.org/html/rfc1952 section 2.2
private[impl] val Header = ByteString(
0x1F, // ID1

View file

@ -268,11 +268,11 @@ final class Flow[-In, +Out, +Mat](
object Flow {
private[stream] val identityTraversalBuilder =
LinearTraversalBuilder.fromBuilder(GraphStages.Identity.traversalBuilder, GraphStages.Identity.shape, Keep.right)
LinearTraversalBuilder.fromBuilder(GraphStages.identity.traversalBuilder, GraphStages.identity.shape, Keep.right)
private[this] val identity: Flow[Any, Any, NotUsed] = new Flow[Any, Any, NotUsed](
identityTraversalBuilder,
GraphStages.Identity.shape)
GraphStages.identity.shape)
/**
* Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]]