diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template index 327b7f04b3..0f811c7f09 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/GraphApply.scala.template @@ -58,8 +58,8 @@ trait GraphApply { /** * INTERNAL API */ -private[stream] object GraphApply { - final class GraphImpl[S <: Shape, Mat](override val shape: S, private[stream] override val module: StreamLayout.Module) +object GraphApply { + final class GraphImpl[S <: Shape, Mat](override val shape: S, override val module: StreamLayout.Module) extends Graph[S, Mat] { override def toString: String = s"Graph($shape, $module)" @@ -70,3 +70,15 @@ private[stream] object GraphApply { override def named(name: String): Graph[S, Mat] = withAttributes(Attributes.name(name)) } } + +/** + * INTERNAL API + */ +object ModuleExtractor { + def unapply[S <: Shape, Mat](graph: Graph[S, Mat]): Option[Module] = graph match { + case module: Module => + Some(module) + case _ => + None + } +} diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template index c53b30f20c..b4824c26d3 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template @@ -23,7 +23,7 @@ trait ZipWithApply { } [2..20#/** `ZipWith` specialized for 1 inputs */ -class ZipWith1[[#A1#], O] (zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape1[[#A1#], O]] { +class ZipWith1[[#A1#], O] (val zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape1[[#A1#], O]] { override def initialAttributes = Attributes.name("ZipWith1") override val shape: FanInShape1[[#A1#], O] = new FanInShape1[[#A1#], O]("ZipWith1") def out: Outlet[O] = shape.out diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index e46a53c356..7f1c7d192f 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -143,7 +143,7 @@ final case class Attributes(attributeList: List[Attributes.Attribute] = Nil) { /** * INTERNAL API */ - private[akka] def nameOrDefault(default: String = "unknown-operation"): String = { + def nameOrDefault(default: String = "unknown-operation"): String = { @tailrec def concatNames(i: Iterator[Attribute], first: String, buf: StringBuilder): String = if (i.hasNext) i.next() match { @@ -187,7 +187,7 @@ object Attributes { /** * INTERNAL API */ - private[akka] def apply(attribute: Attribute): Attributes = + def apply(attribute: Attribute): Attributes = apply(List(attribute)) val none: Attributes = Attributes() diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index a3eb77d2f1..8bcda5503c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -42,7 +42,32 @@ abstract class ExtendedActorMaterializer extends ActorMaterializer { /** * INTERNAL API */ - def actorOf(context: MaterializationContext, props: Props): ActorRef + override def actorOf(context: MaterializationContext, props: Props): ActorRef = { + val dispatcher = + if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) effectiveSettings(context.effectiveAttributes).dispatcher + else props.dispatcher + actorOf(props, context.stageName, dispatcher) + } + + /** + * INTERNAL API + */ + protected def actorOf(props: Props, name: String, dispatcher: String): ActorRef = { + supervisor match { + case ref: LocalActorRef ⇒ + ref.underlying.attachChild(props.withDispatcher(dispatcher), name, systemService = false) + case ref: RepointableActorRef ⇒ + if (ref.isStarted) + ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher), name, systemService = false) + else { + implicit val timeout = ref.system.settings.CreationTimeout + val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(dispatcher), name)).mapTo[ActorRef] + Await.result(f, timeout.duration) + } + case unknown ⇒ + throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") + } + } /** * INTERNAL API @@ -220,30 +245,6 @@ private[akka] case class ActorMaterializerImpl( case other ⇒ other }) - override def actorOf(context: MaterializationContext, props: Props): ActorRef = { - val dispatcher = - if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) effectiveSettings(context.effectiveAttributes).dispatcher - else props.dispatcher - actorOf(props, context.stageName, dispatcher) - } - - private[akka] def actorOf(props: Props, name: String, dispatcher: String): ActorRef = { - supervisor match { - case ref: LocalActorRef ⇒ - ref.underlying.attachChild(props.withDispatcher(dispatcher), name, systemService = false) - case ref: RepointableActorRef ⇒ - if (ref.isStarted) - ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher), name, systemService = false) - else { - implicit val timeout = ref.system.settings.CreationTimeout - val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(dispatcher), name)).mapTo[ActorRef] - Await.result(f, timeout.duration) - } - case unknown ⇒ - throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") - } - } - } private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMaterializer, registerShell: GraphInterpreterShell ⇒ ActorRef) extends Materializer { @@ -263,7 +264,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMa /** * INTERNAL API */ -private[akka] object FlowNames extends ExtensionId[FlowNames] with ExtensionIdProvider { +object FlowNames extends ExtensionId[FlowNames] with ExtensionIdProvider { override def get(system: ActorSystem): FlowNames = super.get(system) override def lookup() = FlowNames override def createExtension(system: ExtendedActorSystem): FlowNames = new FlowNames @@ -272,14 +273,14 @@ private[akka] object FlowNames extends ExtensionId[FlowNames] with ExtensionIdPr /** * INTERNAL API */ -private[akka] class FlowNames extends Extension { +class FlowNames extends Extension { val name = SeqActorName("Flow") } /** * INTERNAL API */ -private[akka] object StreamSupervisor { +object StreamSupervisor { def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props = Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local) @@ -301,7 +302,7 @@ private[akka] object StreamSupervisor { case object PrintDebugDump } -private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor { +class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor { import akka.stream.impl.StreamSupervisor._ override def supervisorStrategy = SupervisorStrategy.stoppingStrategy diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index 42d90c4d1c..65b5675b36 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -14,7 +14,7 @@ import org.reactivestreams.Subscription /** * INTERNAL API */ -private[akka] object ActorPublisher { +object ActorPublisher { val NormalShutdownReasonMessage = "Cannot subscribe to shut-down Publisher" class NormalShutdownException extends IllegalStateException(NormalShutdownReasonMessage) with NoStackTrace val NormalShutdownReason: Throwable = new NormalShutdownException @@ -35,7 +35,7 @@ private[akka] object ActorPublisher { * When you instantiate this class, or its subclasses, you MUST send an ExposedPublisher message to the wrapped * ActorRef! If you don't need to subclass, prefer the apply() method on the companion object which takes care of this. */ -private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { +class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { import ReactiveStreamsCompliance._ // The subscriber of an subscription attempt is first placed in this list of pending subscribers. diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 12d89f459e..822c1e6b13 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -11,14 +11,14 @@ import org.reactivestreams.{ Subscription, Subscriber } /** * INTERNAL API */ -private[akka] object FanIn { +object FanIn { final case class OnError(id: Int, cause: Throwable) extends DeadLetterSuppression with NoSerializationVerificationNeeded final case class OnComplete(id: Int) extends DeadLetterSuppression with NoSerializationVerificationNeeded final case class OnNext(id: Int, e: Any) extends DeadLetterSuppression with NoSerializationVerificationNeeded final case class OnSubscribe(id: Int, subscription: Subscription) extends DeadLetterSuppression with NoSerializationVerificationNeeded - private[akka] final case class SubInput[T](impl: ActorRef, id: Int) extends Subscriber[T] { + final case class SubInput[T](impl: ActorRef, id: Int) extends Subscriber[T] { override def onError(cause: Throwable): Unit = { ReactiveStreamsCompliance.requireNonNullException(cause) impl ! OnError(id, cause) @@ -252,7 +252,7 @@ private[akka] object FanIn { /** * INTERNAL API */ -private[akka] abstract class FanIn(val settings: ActorMaterializerSettings, val inputCount: Int) extends Actor with ActorLogging with Pump { +abstract class FanIn(val settings: ActorMaterializerSettings, val inputCount: Int) extends Actor with ActorLogging with Pump { import FanIn._ protected val primaryOutputs: Outputs = new SimpleOutputs(self, this) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index 82b5e57f8c..b61e82d0f1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -12,7 +12,7 @@ import org.reactivestreams.Subscription /** * INTERNAL API */ -private[akka] object FanOut { +object FanOut { final case class SubstreamRequestMore(id: Int, demand: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded final case class SubstreamCancel(id: Int) extends DeadLetterSuppression with NoSerializationVerificationNeeded @@ -247,7 +247,7 @@ private[akka] object FanOut { /** * INTERNAL API */ -private[akka] abstract class FanOut(val settings: ActorMaterializerSettings, val outputCount: Int) extends Actor with ActorLogging with Pump { +abstract class FanOut(val settings: ActorMaterializerSettings, val outputCount: Int) extends Actor with ActorLogging with Pump { import FanOut._ protected val outputBunch = new OutputBunch(outputCount, self, this) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index 21afc53aa4..76eefcffb6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -15,7 +15,7 @@ import akka.event.Logging /** * INTERNAL API */ -private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends AtomicModule { +abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out]) extends AtomicModule { protected def label: String = Logging.simpleName(this) final override def toString: String = f"$label [${System.identityHashCode(this)}%08x]" @@ -45,7 +45,7 @@ private[akka] abstract class SourceModule[+Out, +Mat](val shape: SourceShape[Out * Holds a `Subscriber` representing the input side of the flow. * The `Subscriber` can later be connected to an upstream `Publisher`. */ -private[akka] final class SubscriberSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Subscriber[Out]](shape) { +final class SubscriberSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Subscriber[Out]](shape) { override def create(context: MaterializationContext): (Publisher[Out], Subscriber[Out]) = { val processor = new VirtualProcessor[Out] @@ -63,7 +63,7 @@ private[akka] final class SubscriberSource[Out](val attributes: Attributes, shap * that mediate the flow of elements downstream and the propagation of * back-pressure upstream. */ -private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, NotUsed](shape) { +final class PublisherSource[Out](p: Publisher[Out], val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, NotUsed](shape) { override protected def label: String = s"PublisherSource($p)" @@ -76,7 +76,7 @@ private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes /** * INTERNAL API */ -private[akka] final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Option[Out]]](shape) { +final class MaybeSource[Out](val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, Promise[Option[Out]]](shape) { override def create(context: MaterializationContext) = { val p = Promise[Option[Out]]() @@ -91,7 +91,7 @@ private[akka] final class MaybeSource[Out](val attributes: Attributes, shape: So * Creates and wraps an actor into [[org.reactivestreams.Publisher]] from the given `props`, * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorPublisher]]. */ -private[akka] final class ActorPublisherSource[Out](props: Props, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { +final class ActorPublisherSource[Out](props: Props, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { override def create(context: MaterializationContext) = { val publisherRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props) @@ -106,7 +106,7 @@ private[akka] final class ActorPublisherSource[Out](props: Props, val attributes /** * INTERNAL API */ -private[akka] final class ActorRefSource[Out]( +final class ActorRefSource[Out]( bufferSize: Int, overflowStrategy: OverflowStrategy, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 60fcee909b..d2f62f25bc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -38,7 +38,7 @@ import akka.event.Logging /** * INTERNAL API */ -private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends AtomicModule { +abstract class SinkModule[-In, Mat](val shape: SinkShape[In]) extends AtomicModule { /** * Create the Subscriber or VirtualPublisher that consumes the incoming @@ -125,7 +125,7 @@ private[akka] final class FanoutPublisherSink[In]( * Attaches a subscriber to this stream which will just discard all received * elements. */ -private[akka] final class SinkholeSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Future[Done]](shape) { +final class SinkholeSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Future[Done]](shape) { override def create(context: MaterializationContext) = { val effectiveSettings = ActorMaterializerHelper.downcast(context.materializer).effectiveSettings(context.effectiveAttributes) @@ -141,7 +141,7 @@ private[akka] final class SinkholeSink(val attributes: Attributes, shape: SinkSh * INTERNAL API * Attaches a subscriber to this stream. */ -private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) { +final class SubscriberSink[In](subscriber: Subscriber[In], val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) { override def create(context: MaterializationContext) = (subscriber, NotUsed) @@ -153,7 +153,7 @@ private[akka] final class SubscriberSink[In](subscriber: Subscriber[In], val att * INTERNAL API * A sink that immediately cancels its upstream upon materialization. */ -private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, NotUsed](shape) { +final class CancelSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, NotUsed](shape) { override def create(context: MaterializationContext): (Subscriber[Any], NotUsed) = (new CancellingSubscriber[Any], NotUsed) override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, NotUsed] = new CancelSink(attributes, shape) override def withAttributes(attr: Attributes): AtomicModule = new CancelSink(attr, amendShape(attr)) @@ -164,7 +164,7 @@ private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShap * Creates and wraps an actor into [[org.reactivestreams.Subscriber]] from the given `props`, * which should be [[akka.actor.Props]] for an [[akka.stream.actor.ActorSubscriber]]. */ -private[akka] final class ActorSubscriberSink[In](props: Props, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) { +final class ActorSubscriberSink[In](props: Props, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) { override def create(context: MaterializationContext) = { val subscriberRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props) @@ -178,9 +178,9 @@ private[akka] final class ActorSubscriberSink[In](props: Props, val attributes: /** * INTERNAL API */ -private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any, - val attributes: Attributes, - shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) { +final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any, + val attributes: Attributes, + shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) { override def create(context: MaterializationContext) = { val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer) @@ -197,7 +197,7 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any new ActorRefSink[In](ref, onCompleteMessage, attr, amendShape(attr)) } -private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { +final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { val in: Inlet[T] = Inlet("lastOption.in") @@ -234,7 +234,7 @@ private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedV override def toString: String = "LastOptionStage" } -private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { +final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { val in: Inlet[T] = Inlet("headOption.in") @@ -266,7 +266,7 @@ private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedV override def toString: String = "HeadOptionStage" } -private[akka] final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] { +final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] { val in = Inlet[T]("seq.in") override def toString: String = "SeqStage" @@ -315,7 +315,7 @@ private[stream] object QueueSink { /** * INTERNAL API */ -final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] { +final class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] { type Requested[E] = Promise[Option[E]] val in = Inlet[T]("queueSink.in") @@ -406,7 +406,7 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal } } -private[akka] final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T]) extends akka.stream.javadsl.SinkQueueWithCancel[T] { +final class SinkQueueAdapter[T](delegate: SinkQueueWithCancel[T]) extends akka.stream.javadsl.SinkQueueWithCancel[T] { import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext ⇒ same } def pull(): CompletionStage[Optional[T]] = delegate.pull().map(_.asJava)(same).toJava def cancel(): Unit = delegate.cancel() diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index 326d6f283f..c8a36c2323 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -31,7 +31,7 @@ private[stream] object QueueSource { /** * INTERNAL API */ -final private[stream] class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueueWithComplete[T]] { +final class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueueWithComplete[T]] { import QueueSource._ val out = Outlet[T]("queueSource.out") @@ -185,7 +185,7 @@ final private[stream] class QueueSource[T](maxBuffer: Int, overflowStrategy: Ove } } -private[akka] final class SourceQueueAdapter[T](delegate: SourceQueueWithComplete[T]) extends akka.stream.javadsl.SourceQueueWithComplete[T] { +final class SourceQueueAdapter[T](delegate: SourceQueueWithComplete[T]) extends akka.stream.javadsl.SourceQueueWithComplete[T] { def offer(elem: T): CompletionStage[QueueOfferResult] = delegate.offer(elem).toJava def watchCompletion(): CompletionStage[Done] = delegate.watchCompletion().toJava def complete(): Unit = delegate.complete() @@ -195,7 +195,7 @@ private[akka] final class SourceQueueAdapter[T](delegate: SourceQueueWithComplet /** * INTERNAL API */ -private[stream] final class UnfoldResourceSource[T, S]( +final class UnfoldResourceSource[T, S]( create: () ⇒ S, readData: (S) ⇒ Option[T], close: (S) ⇒ Unit) extends GraphStage[SourceShape[T]] { @@ -252,7 +252,7 @@ private[stream] final class UnfoldResourceSource[T, S]( override def toString = "UnfoldResourceSource" } -private[stream] final class UnfoldResourceSourceAsync[T, S]( +final class UnfoldResourceSourceAsync[T, S]( create: () ⇒ Future[S], readData: (S) ⇒ Future[Option[T]], close: (S) ⇒ Future[Done]) extends GraphStage[SourceShape[T]] { @@ -332,4 +332,4 @@ private[stream] final class UnfoldResourceSourceAsync[T, S]( } override def toString = "UnfoldResourceSourceAsync" -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 4ae0ab8bd4..a3d9ed72da 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -18,7 +18,7 @@ import scala.collection.immutable /** * INTERNAL API */ -private[stream] object Stages { +object Stages { object DefaultAttributes { val IODispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher") diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 41128e3ad5..80ba1c2bae 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -1004,7 +1004,7 @@ abstract class MaterializerSession(val topLevel: StreamLayout.Module, val initia /** * INTERNAL API */ -private[akka] final case class ProcessorModule[In, Out, Mat]( +final case class ProcessorModule[In, Out, Mat]( val createProcessor: () ⇒ (Processor[In, Out], Mat), attributes: Attributes = DefaultAttributes.processor) extends StreamLayout.AtomicModule { val inPort = Inlet[In]("ProcessorModule.in") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala index 54a740cff4..577ea899e0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala @@ -14,7 +14,7 @@ import scala.concurrent.duration.{ FiniteDuration, _ } /** * INTERNAL API */ -private[stream] class Throttle[T]( +class Throttle[T]( cost: Int, per: FiniteDuration, maximumBurst: Int, @@ -84,4 +84,4 @@ private[stream] class Throttle[T]( } override def toString = "Throttle" -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala index 8f6ad1081a..daa0fea687 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala @@ -23,7 +23,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration } * - if the timer fires before the event happens, these stages all fail the stream * - otherwise, these streams do not interfere with the element flow, ordinary completion or failure */ -private[stream] object Timers { +object Timers { private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = { import scala.concurrent.duration._ FiniteDuration( diff --git a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala index dd5358cbd8..e60c810079 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala @@ -13,7 +13,7 @@ import scala.util.{ Failure, Success, Try } /** * INTERNAL API */ -private[akka] final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphStage[SourceShape[E]] { +final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphStage[SourceShape[E]] { val out: Outlet[E] = Outlet("Unfold.out") override val shape: SourceShape[E] = SourceShape(out) override def initialAttributes: Attributes = DefaultAttributes.unfold @@ -36,7 +36,7 @@ private[akka] final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends Gr /** * INTERNAL API */ -private[akka] final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] { +final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] { val out: Outlet[E] = Outlet("UnfoldAsync.out") override val shape: SourceShape[E] = SourceShape(out) override def initialAttributes: Attributes = DefaultAttributes.unfoldAsync diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index d50078b098..3b14652c29 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -22,8 +22,8 @@ import scala.util.control.NonFatal /** * INTERNAL API */ -private[stream] final case class GraphModule(assembly: GraphAssembly, shape: Shape, attributes: Attributes, - matValIDs: Array[Module]) extends AtomicModule { +final case class GraphModule(assembly: GraphAssembly, shape: Shape, attributes: Attributes, + matValIDs: Array[Module]) extends AtomicModule { override def withAttributes(newAttr: Attributes): Module = copy(attributes = newAttr) @@ -44,7 +44,7 @@ private[stream] final case class GraphModule(assembly: GraphAssembly, shape: Sha /** * INTERNAL API */ -private[stream] object ActorGraphInterpreter { +object ActorGraphInterpreter { trait BoundaryEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded { def shell: GraphInterpreterShell } @@ -526,7 +526,7 @@ final class GraphInterpreterShell( /** * INTERNAL API */ -private[stream] class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor with ActorLogging { +class ActorGraphInterpreter(_initial: GraphInterpreterShell) extends Actor with ActorLogging { import ActorGraphInterpreter._ var activeInterpreters = Set.empty[GraphInterpreterShell] diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 217f50c2fe..da05746a19 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -21,7 +21,7 @@ import akka.stream.impl.fusing.GraphStages.MaterializedValueSource * * (See the class for the documentation of the internals) */ -private[akka] object GraphInterpreter { +object GraphInterpreter { /** * Compile time constant, enable it for debug logging to the console. */ @@ -346,7 +346,7 @@ private[akka] object GraphInterpreter { * be modeled, or even dissolved (if preempted and a "stealing" external event is injected; for example the non-cycle * edge of a balance is pulled, dissolving the original cycle). */ -private[stream] final class GraphInterpreter( +final class GraphInterpreter( private val assembly: GraphInterpreter.GraphAssembly, val materializer: Materializer, val log: LoggingAdapter, diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index aa5d0bd6f4..4e2fe7e682 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -25,7 +25,7 @@ import scala.util.Try /** * INTERNAL API */ -private[akka] final case class GraphStageModule( +final case class GraphStageModule( shape: Shape, attributes: Attributes, stage: GraphStageWithMaterializedValue[Shape, Any]) extends AtomicModule { @@ -50,7 +50,7 @@ object GraphStages { /** * INTERNAL API */ - private[akka] abstract class SimpleLinearGraphStage[T] extends GraphStage[FlowShape[T, T]] { + abstract class SimpleLinearGraphStage[T] extends GraphStage[FlowShape[T, T]] { val in = Inlet[T](Logging.simpleName(this) + ".in") val out = Outlet[T](Logging.simpleName(this) + ".out") override val shape = FlowShape(in, out) @@ -77,7 +77,7 @@ object GraphStages { /** * INTERNAL API */ - private[stream] final class Detacher[T] extends GraphStage[FlowShape[T, T]] { + final class Detacher[T] extends GraphStage[FlowShape[T, T]] { val in = Inlet[T]("Detacher.in") val out = Outlet[T]("Detacher.out") override def initialAttributes = DefaultAttributes.detacher @@ -288,7 +288,7 @@ object GraphStages { override def toString: String = s"MaterializedValueSource($computation)" } - private[stream] final class SingleSource[T](val elem: T) extends GraphStage[SourceShape[T]] { + final class SingleSource[T](val elem: T) extends GraphStage[SourceShape[T]] { override def initialAttributes: Attributes = DefaultAttributes.singleSource ReactiveStreamsCompliance.requireNonNullElement(elem) val out = Outlet[T]("single.out") @@ -304,7 +304,7 @@ object GraphStages { override def toString: String = s"SingleSource($elem)" } - private[stream] final class FutureSource[T](val future: Future[T]) extends GraphStage[SourceShape[T]] { + final class FutureSource[T](val future: Future[T]) extends GraphStage[SourceShape[T]] { ReactiveStreamsCompliance.requireNonNullElement(future) val shape = SourceShape(Outlet[T]("future.out")) val out = shape.out diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 30cd433fb5..32e64cc9d1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -25,7 +25,7 @@ import akka.stream.impl.Stages.DefaultAttributes /** * INTERNAL API */ -private[akka] final case class Map[In, Out](f: In ⇒ Out, decider: Supervision.Decider) extends PushStage[In, Out] { +final case class Map[In, Out](f: In ⇒ Out, decider: Supervision.Decider) extends PushStage[In, Out] { override def onPush(elem: In, ctx: Context[Out]): SyncDirective = ctx.push(f(elem)) override def decide(t: Throwable): Supervision.Directive = decider(t) @@ -34,7 +34,7 @@ private[akka] final case class Map[In, Out](f: In ⇒ Out, decider: Supervision. /** * INTERNAL API */ -private[akka] final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { +final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.filter override def toString: String = "Filter" @@ -68,7 +68,7 @@ private[akka] final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearG /** * INTERNAL API */ -private[akka] final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { +final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.takeWhile override def toString: String = "TakeWhile" @@ -104,7 +104,7 @@ private[akka] final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLine /** * INTERNAL API */ -private[stream] final case class DropWhile[T](p: T ⇒ Boolean) extends GraphStage[FlowShape[T, T]] { +final case class DropWhile[T](p: T ⇒ Boolean) extends GraphStage[FlowShape[T, T]] { val in = Inlet[T]("DropWhile.in") val out = Outlet[T]("DropWhile.out") override val shape = FlowShape(in, out) @@ -136,7 +136,7 @@ private[stream] final case class DropWhile[T](p: T ⇒ Boolean) extends GraphSta /** * INTERNAL API */ -abstract private[stream] class SupervisedGraphStageLogic(inheritedAttributes: Attributes, shape: Shape) extends GraphStageLogic(shape) { +abstract class SupervisedGraphStageLogic(inheritedAttributes: Attributes, shape: Shape) extends GraphStageLogic(shape) { private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) def withSupervision[T](f: () ⇒ T): Option[T] = try { Some(f()) } catch { @@ -164,7 +164,7 @@ private[stream] object Collect { /** * INTERNAL API */ -private[stream] final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] { +final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] { val in = Inlet[In]("Collect.in") val out = Outlet[Out]("Collect.out") override val shape = FlowShape(in, out) @@ -192,7 +192,7 @@ private[stream] final case class Collect[In, Out](pf: PartialFunction[In, Out]) /** * INTERNAL API */ -private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) extends GraphStage[FlowShape[T, T]] { +final case class Recover[T](pf: PartialFunction[Throwable, T]) extends GraphStage[FlowShape[T, T]] { val in = Inlet[T]("Recover.in") val out = Outlet[T]("Recover.out") override val shape: FlowShape[T, T] = FlowShape(in, out) @@ -239,7 +239,7 @@ private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) ext /** * INTERNAL API */ -private[akka] final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] { +final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.take override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -268,7 +268,7 @@ private[akka] final case class Take[T](count: Long) extends SimpleLinearGraphSta /** * INTERNAL API */ -private[akka] final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] { +final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.drop override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -292,7 +292,7 @@ private[akka] final case class Drop[T](count: Long) extends SimpleLinearGraphSta /** * INTERNAL API */ -private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] { +final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] { override val shape = FlowShape[In, Out](Inlet("Scan.in"), Outlet("Scan.out")) override def initialAttributes: Attributes = DefaultAttributes.scan @@ -338,7 +338,7 @@ private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) ex /** * INTERNAL API */ -private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, decider: Supervision.Decider) extends PushPullStage[In, Out] { +final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, decider: Supervision.Decider) extends PushPullStage[In, Out] { private[this] var aggregator: Out = zero override def onPush(elem: In, ctx: Context[Out]): SyncDirective = { @@ -406,7 +406,7 @@ final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) ext /** * INTERNAL API */ -private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immutable.Seq[T]] { +final case class Grouped[T](n: Int) extends PushPullStage[T, immutable.Seq[T]] { private val buf = { val b = Vector.newBuilder[T] b.sizeHint(n) @@ -441,7 +441,7 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut /** * INTERNAL API */ -private[stream] final case class LimitWeighted[T](n: Long, costFn: T ⇒ Long) extends GraphStage[FlowShape[T, T]] { +final case class LimitWeighted[T](n: Long, costFn: T ⇒ Long) extends GraphStage[FlowShape[T, T]] { val in = Inlet[T]("LimitWeighted.in") val out = Outlet[T]("LimitWeighted.out") override val shape = FlowShape(in, out) @@ -473,7 +473,7 @@ private[stream] final case class LimitWeighted[T](n: Long, costFn: T ⇒ Long) e /** * INTERNAL API */ -private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullStage[T, immutable.Seq[T]] { +final case class Sliding[T](n: Int, step: Int) extends PushPullStage[T, immutable.Seq[T]] { private var buf = Vector.empty[T] override def onPush(elem: T, ctx: Context[immutable.Seq[T]]): SyncDirective = { @@ -506,7 +506,7 @@ private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullSta /** * INTERNAL API */ -private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] { +final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] { private var buffer: BufferImpl[T] = _ @@ -566,7 +566,7 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt /** * INTERNAL API */ -private[akka] final case class Batch[In, Out](max: Long, costFn: In ⇒ Long, seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) +final case class Batch[In, Out](max: Long, costFn: In ⇒ Long, seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] { val in = Inlet[In]("Batch.in") @@ -692,7 +692,7 @@ private[akka] final case class Batch[In, Out](max: Long, costFn: In ⇒ Long, se /** * INTERNAL API */ -private[akka] final class Expand[In, Out](extrapolate: In ⇒ Iterator[Out]) extends GraphStage[FlowShape[In, Out]] { +final class Expand[In, Out](extrapolate: In ⇒ Iterator[Out]) extends GraphStage[FlowShape[In, Out]] { private val in = Inlet[In]("expand.in") private val out = Outlet[Out]("expand.out") @@ -761,7 +761,7 @@ private[akka] object MapAsync { /** * INTERNAL API */ -private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out]) +final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out]) extends GraphStage[FlowShape[In, Out]] { import MapAsync._ @@ -832,7 +832,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut /** * INTERNAL API */ -private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[Out]) +final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[Out]) extends GraphStage[FlowShape[In, Out]] { private val in = Inlet[In]("MapAsyncUnordered.in") @@ -904,7 +904,7 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I /** * INTERNAL API */ -private[akka] final case class Log[T]( +final case class Log[T]( name: String, extract: T ⇒ Any, logAdapter: Option[LoggingAdapter]) extends SimpleLinearGraphStage[T] { @@ -1021,7 +1021,7 @@ private[stream] object TimerKeys { case object GroupedWithinTimerKey } -private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { +final class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { require(n > 0, "n must be greater than 0") require(d > Duration.Zero) @@ -1097,7 +1097,7 @@ private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends } } -private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { +final class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { private[this] def timerName = "DelayedTimer" override def initialAttributes: Attributes = DefaultAttributes.delay override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { @@ -1183,7 +1183,7 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS override def toString = "Delay" } -private[stream] final class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { +final class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { setHandler(in, new InHandler { @@ -1203,7 +1203,7 @@ private[stream] final class TakeWithin[T](timeout: FiniteDuration) extends Simpl override def toString = "TakeWithin" } -private[stream] final class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { +final class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { private var allow = false @@ -1229,7 +1229,7 @@ private[stream] final class DropWithin[T](timeout: FiniteDuration) extends Simpl /** * INTERNAL API */ -private[stream] final class Reduce[T](f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { +final class Reduce[T](f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.reduce override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { self ⇒ @@ -1273,7 +1273,7 @@ private[stream] object RecoverWith { val InfiniteRetries = -1 } -private[stream] final class RecoverWith[T, M](maximumRetries: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] { +final class RecoverWith[T, M](maximumRetries: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] { require(maximumRetries >= -1, "number of retries must be non-negative or equal to -1") override def initialAttributes = DefaultAttributes.recoverWith @@ -1331,7 +1331,7 @@ private[stream] final class RecoverWith[T, M](maximumRetries: Int, pf: PartialFu /** * INTERNAL API */ -private[stream] final class StatefulMapConcat[In, Out](f: () ⇒ In ⇒ immutable.Iterable[Out]) extends GraphStage[FlowShape[In, Out]] { +final class StatefulMapConcat[In, Out](val f: () ⇒ In ⇒ immutable.Iterable[Out]) extends GraphStage[FlowShape[In, Out]] { val in = Inlet[In]("StatefulMapConcat.in") val out = Outlet[Out]("StatefulMapConcat.out") override val shape = FlowShape(in, out) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala index 9ae2505744..9a2a4c3cd9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala @@ -23,7 +23,7 @@ import akka.stream.TLSProtocol._ /** * INTERNAL API. */ -private[akka] object TLSActor { +object TLSActor { def props( settings: ActorMaterializerSettings, @@ -46,7 +46,7 @@ private[akka] object TLSActor { /** * INTERNAL API. */ -private[akka] class TLSActor( +class TLSActor( settings: ActorMaterializerSettings, sslContext: SSLContext, externalSslConfig: Option[AkkaSSLConfig], diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index b93b385597..01a3692c87 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -295,7 +295,7 @@ private[stream] object TcpConnectionStage { /** * INTERNAL API */ -private[stream] class IncomingConnectionStage(connection: ActorRef, remoteAddress: InetSocketAddress, halfClose: Boolean) +class IncomingConnectionStage(connection: ActorRef, remoteAddress: InetSocketAddress, halfClose: Boolean) extends GraphStage[FlowShape[ByteString, ByteString]] { import TcpConnectionStage._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala index e5b31bd6bf..4abaa2e9de 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TlsModule.scala @@ -11,13 +11,13 @@ import com.typesafe.sslconfig.akka.AkkaSSLConfig /** * INTERNAL API. */ -private[akka] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound], - cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString], - shape: Shape, attributes: Attributes, - sslContext: SSLContext, - sslConfig: Option[AkkaSSLConfig], - firstSession: NegotiateNewSession, - role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]) extends AtomicModule { +final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOut: Outlet[SslTlsInbound], + cipherIn: Inlet[ByteString], cipherOut: Outlet[ByteString], + shape: Shape, attributes: Attributes, + sslContext: SSLContext, + sslConfig: Option[AkkaSSLConfig], + firstSession: NegotiateNewSession, + role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]) extends AtomicModule { override def withAttributes(att: Attributes): TlsModule = copy(attributes = att) override def carbonCopy: TlsModule = @@ -35,7 +35,7 @@ private[akka] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plainOu /** * INTERNAL API. */ -private[akka] object TlsModule { +object TlsModule { def apply(attributes: Attributes, sslContext: SSLContext, sslConfig: Option[AkkaSSLConfig], firstSession: NegotiateNewSession, role: TLSRole, closing: TLSClosing, hostInfo: Option[(String, Int)]): TlsModule = { val name = attributes.nameOrDefault(s"StreamTls($role)") val cipherIn = Inlet[ByteString](s"$name.cipherIn") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala index 88ba60b061..b49042c447 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/BidiFlow.scala @@ -96,7 +96,7 @@ object BidiFlow { } final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O1, I2, O2, Mat]) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { - private[stream] override def module = delegate.module + override def module = delegate.module override def shape = delegate.shape def asScala: scaladsl.BidiFlow[I1, O1, I2, O2, Mat] = delegate diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 5ec142d2bb..70e500dadc 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -73,7 +73,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends import scala.collection.JavaConverters._ override def shape: FlowShape[In, Out] = delegate.shape - private[stream] def module: StreamLayout.Module = delegate.module + def module: StreamLayout.Module = delegate.module override def toString: String = delegate.toString diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 0aa1df808d..97ea1214aa 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -272,7 +272,7 @@ object Sink { final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[In], Mat] { override def shape: SinkShape[In] = delegate.shape - private[stream] def module: StreamLayout.Module = delegate.module + def module: StreamLayout.Module = delegate.module override def toString: String = delegate.toString diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 2362794819..94b31c0fd3 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -414,7 +414,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap override def shape: SourceShape[Out] = delegate.shape - private[stream] def module: StreamLayout.Module = delegate.module + def module: StreamLayout.Module = delegate.module override def toString: String = delegate.toString diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index 3bc954ebd4..b27d817df7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -10,7 +10,7 @@ import akka.stream.impl.Timers import scala.concurrent.duration.FiniteDuration -final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val module: Module) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { +final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](override val module: Module) extends Graph[BidiShape[I1, O1, I2, O2], Mat] { override def shape = module.shape.asInstanceOf[BidiShape[I1, O1, I2, O2]] def asJava: javadsl.BidiFlow[I1, O1, I2, O2, Mat] = new javadsl.BidiFlow(this) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index a2636d4bcb..5e0aad72f8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -25,7 +25,7 @@ import akka.NotUsed /** * A `Flow` is a set of stream processing steps that has one open input and one open output. */ -final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) +final class Flow[-In, +Out, +Mat](override val module: Module) extends FlowOpsMat[Out, Mat] with Graph[FlowShape[In, Out], Mat] { override val shape: FlowShape[In, Out] = module.shape.asInstanceOf[FlowShape[In, Out]] @@ -335,7 +335,7 @@ object RunnableGraph { /** * Flow with attached input and output, can be executed. */ -final case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Module) extends Graph[ClosedShape, Mat] { +final case class RunnableGraph[+Mat](val module: StreamLayout.Module) extends Graph[ClosedShape, Mat] { require(module.isRunnable) override def shape = ClosedShape diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index a77e18f13d..e2964f727e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -26,7 +26,7 @@ import scala.util.{ Failure, Success, Try } * A `Sink` is a set of stream processing steps that has one open input and an attached output. * Can be used as a `Subscriber` */ -final class Sink[-In, +Mat](private[stream] override val module: Module) +final class Sink[-In, +Mat](override val module: Module) extends Graph[SinkShape[In], Mat] { override val shape: SinkShape[In] = module.shape.asInstanceOf[SinkShape[In]] @@ -89,7 +89,7 @@ final class Sink[-In, +Mat](private[stream] override val module: Module) object Sink { /** INTERNAL API */ - private[stream] def shape[T](name: String): SinkShape[T] = SinkShape(Inlet(name + ".in")) + def shape[T](name: String): SinkShape[T] = SinkShape(Inlet(name + ".in")) /** * A graph with the shape of a sink logically is a sink, this method makes diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 62cba7bba1..3ce9ddc387 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -27,7 +27,7 @@ import scala.compat.java8.FutureConverters._ * an “atomic” source, e.g. from a collection or a file. Materialization turns a Source into * a Reactive Streams `Publisher` (at least conceptually). */ -final class Source[+Out, +Mat](private[stream] override val module: Module) +final class Source[+Out, +Mat](override val module: Module) extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] { override type Repr[+O] = Source[O, Mat @uncheckedVariance] @@ -170,7 +170,7 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) object Source { /** INTERNAL API */ - private[stream] def shape[T](name: String): SourceShape[T] = SourceShape(Outlet(name + ".out")) + def shape[T](name: String): SourceShape[T] = SourceShape(Outlet(name + ".out")) /** * Helper to create [[Source]] from `Publisher`. diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index cab6d14a26..493335fa9c 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -23,11 +23,11 @@ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S, protected def initialAttributes: Attributes = Attributes.none - final override private[stream] lazy val module: Module = GraphStageModule(shape, initialAttributes, this) + final override lazy val module: Module = GraphStageModule(shape, initialAttributes, this) final override def withAttributes(attr: Attributes): Graph[S, M] = new Graph[S, M] { override def shape = GraphStageWithMaterializedValue.this.shape - override private[stream] def module = GraphStageWithMaterializedValue.this.module.withAttributes(attr) + override def module = GraphStageWithMaterializedValue.this.module.withAttributes(attr) override def withAttributes(attr: Attributes) = GraphStageWithMaterializedValue.this.withAttributes(attr) } diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index 1ddb08d392..aa44f6859d 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -34,7 +34,7 @@ sealed trait Stage[-In, +Out] /** * INTERNAL API */ -private[stream] object AbstractStage { +object AbstractStage { private class PushPullGraphLogic[In, Out]( private val shape: FlowShape[In, Out], diff --git a/project/MiMa.scala b/project/MiMa.scala index 53e0ff285f..43bd291a6f 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -900,7 +900,10 @@ object MiMa extends AutoPlugin { // #20846 change of internal Status message ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.pubsub.protobuf.msg.DistributedPubSubMessages#StatusOrBuilder.getReplyToStatus"), - ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.pubsub.protobuf.msg.DistributedPubSubMessages#StatusOrBuilder.hasReplyToStatus") + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.pubsub.protobuf.msg.DistributedPubSubMessages#StatusOrBuilder.hasReplyToStatus"), + + // #20543 GraphStage subtypes should not be private to akka + ProblemFilters.exclude[DirectAbstractMethodProblem]("akka.stream.ActorMaterializer.actorOf") ) ) }