diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 71c39ff451..da49f2c5b9 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -93,8 +93,12 @@ private[http] object HttpServerBluePrint { .via(requestStartOrRunIgnore(settings))) def requestStartOrRunIgnore(settings: ServerSettings)(implicit mat: Materializer): Flow[(ParserOutput.RequestOutput, Source[ParserOutput.RequestOutput, Unit]), HttpRequest, Unit] = - Flow.fromGraph(new FlowStage[(RequestOutput, Source[RequestOutput, Unit]), HttpRequest, Unit]("RequestStartThenRunIgnore") { - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = (new GraphStageLogic(shape) { + Flow.fromGraph(new GraphStage[FlowShape[(RequestOutput, Source[RequestOutput, Unit]), HttpRequest]] { + val in = Inlet[(RequestOutput, Source[RequestOutput, Unit])]("RequestStartThenRunIgnore.in") + val out = Outlet[HttpRequest]("RequestStartThenRunIgnore.out") + override val shape: FlowShape[(RequestOutput, Source[RequestOutput, Unit]), HttpRequest] = FlowShape.of(in, out) + + override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { val remoteAddress = inheritedAttributes.get[HttpAttributes.RemoteAddress].flatMap(_.address) setHandler(in, new InHandler { @@ -118,7 +122,7 @@ private[http] object HttpServerBluePrint { setHandler(out, new OutHandler { override def onPull(): Unit = pull(in) }) - }, ()) + } }) def parsing(settings: ServerSettings, log: LoggingAdapter): Flow[ByteString, RequestOutput, Unit] = { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StageActorRefSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StageActorRefSpec.scala index 44f83281a3..4bfbe57ee7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StageActorRefSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StageActorRefSpec.scala @@ -7,7 +7,7 @@ import akka.actor.{ Kill, PoisonPill, NoSerializationVerificationNeeded, ActorRe import akka.event.Logging import akka.stream._ import akka.stream.stage.GraphStageLogic.StageActorRef -import akka.stream.stage.{ GraphStageLogic, InHandler, SinkStage } +import akka.stream.stage.{ GraphStageWithMaterializedValue, GraphStage, GraphStageLogic, InHandler } import akka.stream.testkit.AkkaSpec import akka.testkit.{ TestProbe, TestEvent, EventFilter, ImplicitSender } import org.scalatest.concurrent.PatienceConfiguration.Timeout @@ -178,7 +178,10 @@ object StageActorRefSpec { import ControlProtocol._ - case class SumTestStage(probe: ActorRef) extends SinkStage[Int, Future[Int]]("IntSum") { + case class SumTestStage(probe: ActorRef) extends GraphStageWithMaterializedValue[SinkShape[Int], Future[Int]] { + val in = Inlet[Int]("IntSum.in") + override val shape: SinkShape[Int] = SinkShape.of(in) + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = { val p: Promise[Int] = Promise() diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 74dadc2ed9..69c277fd53 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -7,7 +7,7 @@ import akka.actor.{ ActorRef, Props } import akka.stream.actor.ActorPublisherMessage.Request import akka.stream.impl.StreamLayout.Module import akka.stream._ -import akka.stream.stage.{ InHandler, GraphStageLogic, SinkStage } +import akka.stream.stage.{ GraphStageWithMaterializedValue, InHandler, GraphStageLogic } import akka.util.Timeout import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance @@ -193,7 +193,12 @@ private[akka] final class AcknowledgeSink[In](bufferSize: Int, val attributes: A override def toString: String = "AcknowledgeSink" } -private[akka] final class LastOptionStage[T] extends SinkStage[T, Future[Option[T]]]("lastOption") { +private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { + + val in = Inlet[T]("lastOption.in") + + override val shape: SinkShape[T] = SinkShape.of(in) + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { val p: Promise[Option[T]] = Promise() (new GraphStageLogic(shape) { @@ -223,7 +228,12 @@ private[akka] final class LastOptionStage[T] extends SinkStage[T, Future[Option[ } } -private[akka] final class HeadOptionStage[T] extends SinkStage[T, Future[Option[T]]]("headOption") { +private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] { + + val in = Inlet[T]("headOption.in") + + override val shape: SinkShape[T] = SinkShape.of(in) + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { val p: Promise[Option[T]] = Promise() (new GraphStageLogic(shape) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index 109e181018..76b20008c3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -12,7 +12,7 @@ import akka.util.ByteString import scala.annotation.tailrec import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration -import akka.stream.Attributes +import akka.stream.{ Inlet, SinkShape, Attributes } private[akka] object InputStreamSinkStage { @@ -33,7 +33,12 @@ private[akka] object InputStreamSinkStage { /** * INTERNAL API */ -private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends SinkStage[ByteString, InputStream]("InputStreamSink") { +private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SinkShape[ByteString], InputStream] { + + val in = Inlet[ByteString]("InputStreamSink.in") + override val shape: SinkShape[ByteString] = SinkShape.of(in) + + // has to be in this order as module depends on shape val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max require(maxBuffer > 0, "Buffer size must be greater than 0") diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala index 1fa1cf555c..c4c1a82157 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala @@ -7,7 +7,7 @@ import java.io.{ IOException, OutputStream } import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue } -import akka.stream.Attributes +import akka.stream.{ Outlet, SourceShape, Attributes } import akka.stream.Attributes.InputBuffer import akka.stream.impl.io.OutputStreamSourceStage._ import akka.stream.stage._ @@ -32,7 +32,11 @@ private[akka] object OutputStreamSourceStage { } } -private[akka] class OutputStreamSourceStage(writeTimeout: FiniteDuration) extends SourceStage[ByteString, OutputStream]("OutputStreamSource") { +private[akka] class OutputStreamSourceStage(writeTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SourceShape[ByteString], OutputStream] { + val out = Outlet[ByteString]("OutputStreamSource.out") + override val shape: SourceShape[ByteString] = SourceShape.of(out) + + // has to be in this order as module depends on shape val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max require(maxBuffer > 0, "Buffer size must be greater than 0") diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 2925a62ba6..a1a424e792 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -53,34 +53,6 @@ abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, def createLogic(inheritedAttributes: Attributes): GraphStageLogic } -/** - * A SourceStage represents a reusable graph stream processing stage. A SourceStage consists of a [[akka.stream.Shape]] which describes - * its output port. - */ -abstract class SourceStage[Out, M](name: String) extends GraphStageWithMaterializedValue[SourceShape[Out], M] { - val out: Outlet[Out] = Outlet[Out](name + ".out") - override val shape: SourceShape[Out] = SourceShape(out) -} - -/** - * A SinkStage represents a reusable graph stream processing stage. A SinkStage consists of a [[akka.stream.Shape]] which describes - * its input port. - */ -abstract class SinkStage[In, M](name: String) extends GraphStageWithMaterializedValue[SinkShape[In], M] { - val in: Inlet[In] = Inlet[In](name + ".in") - override val shape: SinkShape[In] = SinkShape(in) -} - -/** - * A FlowStage represents a reusable graph stream processing stage. A FlowStage consists of a [[akka.stream.Shape]] which describes - * its input and output ports. - */ -abstract class FlowStage[In, Out, M](name: String) extends GraphStageWithMaterializedValue[FlowShape[In, Out], M] { - val in: Inlet[In] = Inlet[In](name + ".in") - val out: Outlet[Out] = Outlet[Out](name + ".out") - override val shape: FlowShape[In, Out] = FlowShape(in, out) -} - private object TimerMessages { final case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) extends DeadLetterSuppression final case class Timer(id: Int, task: Cancellable)