!str #19156 Source/Flow/Sink-Stage removed

This commit is contained in:
Johan Andrén 2015-12-13 17:02:58 +01:00
parent b16b1692e3
commit bcc3525bd9
6 changed files with 38 additions and 40 deletions

View file

@ -93,8 +93,12 @@ private[http] object HttpServerBluePrint {
.via(requestStartOrRunIgnore(settings))) .via(requestStartOrRunIgnore(settings)))
def requestStartOrRunIgnore(settings: ServerSettings)(implicit mat: Materializer): Flow[(ParserOutput.RequestOutput, Source[ParserOutput.RequestOutput, Unit]), HttpRequest, Unit] = def requestStartOrRunIgnore(settings: ServerSettings)(implicit mat: Materializer): Flow[(ParserOutput.RequestOutput, Source[ParserOutput.RequestOutput, Unit]), HttpRequest, Unit] =
Flow.fromGraph(new FlowStage[(RequestOutput, Source[RequestOutput, Unit]), HttpRequest, Unit]("RequestStartThenRunIgnore") { Flow.fromGraph(new GraphStage[FlowShape[(RequestOutput, Source[RequestOutput, Unit]), HttpRequest]] {
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = (new GraphStageLogic(shape) { val in = Inlet[(RequestOutput, Source[RequestOutput, Unit])]("RequestStartThenRunIgnore.in")
val out = Outlet[HttpRequest]("RequestStartThenRunIgnore.out")
override val shape: FlowShape[(RequestOutput, Source[RequestOutput, Unit]), HttpRequest] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
val remoteAddress = inheritedAttributes.get[HttpAttributes.RemoteAddress].flatMap(_.address) val remoteAddress = inheritedAttributes.get[HttpAttributes.RemoteAddress].flatMap(_.address)
setHandler(in, new InHandler { setHandler(in, new InHandler {
@ -118,7 +122,7 @@ private[http] object HttpServerBluePrint {
setHandler(out, new OutHandler { setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in) override def onPull(): Unit = pull(in)
}) })
}, ()) }
}) })
def parsing(settings: ServerSettings, log: LoggingAdapter): Flow[ByteString, RequestOutput, Unit] = { def parsing(settings: ServerSettings, log: LoggingAdapter): Flow[ByteString, RequestOutput, Unit] = {

View file

@ -7,7 +7,7 @@ import akka.actor.{ Kill, PoisonPill, NoSerializationVerificationNeeded, ActorRe
import akka.event.Logging import akka.event.Logging
import akka.stream._ import akka.stream._
import akka.stream.stage.GraphStageLogic.StageActorRef import akka.stream.stage.GraphStageLogic.StageActorRef
import akka.stream.stage.{ GraphStageLogic, InHandler, SinkStage } import akka.stream.stage.{ GraphStageWithMaterializedValue, GraphStage, GraphStageLogic, InHandler }
import akka.stream.testkit.AkkaSpec import akka.stream.testkit.AkkaSpec
import akka.testkit.{ TestProbe, TestEvent, EventFilter, ImplicitSender } import akka.testkit.{ TestProbe, TestEvent, EventFilter, ImplicitSender }
import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.concurrent.PatienceConfiguration.Timeout
@ -178,7 +178,10 @@ object StageActorRefSpec {
import ControlProtocol._ import ControlProtocol._
case class SumTestStage(probe: ActorRef) extends SinkStage[Int, Future[Int]]("IntSum") { case class SumTestStage(probe: ActorRef) extends GraphStageWithMaterializedValue[SinkShape[Int], Future[Int]] {
val in = Inlet[Int]("IntSum.in")
override val shape: SinkShape[Int] = SinkShape.of(in)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Int]) = {
val p: Promise[Int] = Promise() val p: Promise[Int] = Promise()

View file

@ -7,7 +7,7 @@ import akka.actor.{ ActorRef, Props }
import akka.stream.actor.ActorPublisherMessage.Request import akka.stream.actor.ActorPublisherMessage.Request
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import akka.stream._ import akka.stream._
import akka.stream.stage.{ InHandler, GraphStageLogic, SinkStage } import akka.stream.stage.{ GraphStageWithMaterializedValue, InHandler, GraphStageLogic }
import akka.util.Timeout import akka.util.Timeout
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
@ -193,7 +193,12 @@ private[akka] final class AcknowledgeSink[In](bufferSize: Int, val attributes: A
override def toString: String = "AcknowledgeSink" override def toString: String = "AcknowledgeSink"
} }
private[akka] final class LastOptionStage[T] extends SinkStage[T, Future[Option[T]]]("lastOption") { private[akka] final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
val in = Inlet[T]("lastOption.in")
override val shape: SinkShape[T] = SinkShape.of(in)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val p: Promise[Option[T]] = Promise() val p: Promise[Option[T]] = Promise()
(new GraphStageLogic(shape) { (new GraphStageLogic(shape) {
@ -223,7 +228,12 @@ private[akka] final class LastOptionStage[T] extends SinkStage[T, Future[Option[
} }
} }
private[akka] final class HeadOptionStage[T] extends SinkStage[T, Future[Option[T]]]("headOption") { private[akka] final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {
val in = Inlet[T]("headOption.in")
override val shape: SinkShape[T] = SinkShape.of(in)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val p: Promise[Option[T]] = Promise() val p: Promise[Option[T]] = Promise()
(new GraphStageLogic(shape) { (new GraphStageLogic(shape) {

View file

@ -12,7 +12,7 @@ import akka.util.ByteString
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import akka.stream.Attributes import akka.stream.{ Inlet, SinkShape, Attributes }
private[akka] object InputStreamSinkStage { private[akka] object InputStreamSinkStage {
@ -33,7 +33,12 @@ private[akka] object InputStreamSinkStage {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends SinkStage[ByteString, InputStream]("InputStreamSink") { private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SinkShape[ByteString], InputStream] {
val in = Inlet[ByteString]("InputStreamSink.in")
override val shape: SinkShape[ByteString] = SinkShape.of(in)
// has to be in this order as module depends on shape
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0") require(maxBuffer > 0, "Buffer size must be greater than 0")

View file

@ -7,7 +7,7 @@ import java.io.{ IOException, OutputStream }
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue } import java.util.concurrent.{ BlockingQueue, LinkedBlockingQueue }
import akka.stream.Attributes import akka.stream.{ Outlet, SourceShape, Attributes }
import akka.stream.Attributes.InputBuffer import akka.stream.Attributes.InputBuffer
import akka.stream.impl.io.OutputStreamSourceStage._ import akka.stream.impl.io.OutputStreamSourceStage._
import akka.stream.stage._ import akka.stream.stage._
@ -32,7 +32,11 @@ private[akka] object OutputStreamSourceStage {
} }
} }
private[akka] class OutputStreamSourceStage(writeTimeout: FiniteDuration) extends SourceStage[ByteString, OutputStream]("OutputStreamSource") { private[akka] class OutputStreamSourceStage(writeTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SourceShape[ByteString], OutputStream] {
val out = Outlet[ByteString]("OutputStreamSource.out")
override val shape: SourceShape[ByteString] = SourceShape.of(out)
// has to be in this order as module depends on shape
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0") require(maxBuffer > 0, "Buffer size must be greater than 0")

View file

@ -53,34 +53,6 @@ abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S,
def createLogic(inheritedAttributes: Attributes): GraphStageLogic def createLogic(inheritedAttributes: Attributes): GraphStageLogic
} }
/**
* A SourceStage represents a reusable graph stream processing stage. A SourceStage consists of a [[akka.stream.Shape]] which describes
* its output port.
*/
abstract class SourceStage[Out, M](name: String) extends GraphStageWithMaterializedValue[SourceShape[Out], M] {
val out: Outlet[Out] = Outlet[Out](name + ".out")
override val shape: SourceShape[Out] = SourceShape(out)
}
/**
* A SinkStage represents a reusable graph stream processing stage. A SinkStage consists of a [[akka.stream.Shape]] which describes
* its input port.
*/
abstract class SinkStage[In, M](name: String) extends GraphStageWithMaterializedValue[SinkShape[In], M] {
val in: Inlet[In] = Inlet[In](name + ".in")
override val shape: SinkShape[In] = SinkShape(in)
}
/**
* A FlowStage represents a reusable graph stream processing stage. A FlowStage consists of a [[akka.stream.Shape]] which describes
* its input and output ports.
*/
abstract class FlowStage[In, Out, M](name: String) extends GraphStageWithMaterializedValue[FlowShape[In, Out], M] {
val in: Inlet[In] = Inlet[In](name + ".in")
val out: Outlet[Out] = Outlet[Out](name + ".out")
override val shape: FlowShape[In, Out] = FlowShape(in, out)
}
private object TimerMessages { private object TimerMessages {
final case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) extends DeadLetterSuppression final case class Scheduled(timerKey: Any, timerId: Int, repeating: Boolean) extends DeadLetterSuppression
final case class Timer(id: Int, task: Cancellable) final case class Timer(id: Int, task: Cancellable)