Merge pull request #21166 from naferx/sink-graphstage

=str rewrite Sink internal impl as GraphStage
This commit is contained in:
Patrik Nordwall 2016-08-30 10:37:30 +02:00 committed by GitHub
commit 359a087674
4 changed files with 31 additions and 25 deletions

View file

@ -11,7 +11,6 @@ import java.util.stream.{ Collector, Collectors }
import akka.stream._
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import org.scalactic.ConversionCheckedTripleEquals
import akka.testkit.DefaultTimeout
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.{ Await, Future }
@ -139,7 +138,6 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
}
"Java collector Sink" must {
import scala.compat.java8.FunctionConverters._
class TestCollector(
_supplier: () Supplier[Array[Int]],

View file

@ -3,10 +3,9 @@
*/
package akka.stream.scaladsl
import akka.event.{ Logging, LoggingAdapter }
import akka.event.LoggingAdapter
import akka.stream._
import akka.Done
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._
import akka.stream.impl.fusing._

View file

@ -3,23 +3,20 @@
*/
package akka.stream.scaladsl
import java.util.{ Spliterators, Spliterator }
import java.util.stream.StreamSupport
import akka.{ Done, NotUsed }
import akka.dispatch.ExecutionContexts
import akka.actor.{ Status, ActorRef, Props }
import akka.actor.{ ActorRef, Props, Status }
import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._
import akka.stream.stage.{ Context, PushStage, SyncDirective, TerminationDirective }
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, InHandler }
import akka.stream.{ javadsl, _ }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration.Duration.Inf
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
/**
@ -283,23 +280,35 @@ object Sink {
*/
def onComplete[T](callback: Try[Done] Unit): Sink[T, NotUsed] = {
def newOnCompleteStage(): PushStage[T, NotUsed] = {
new PushStage[T, NotUsed] {
override def onPush(elem: T, ctx: Context[NotUsed]): SyncDirective = ctx.pull()
def newOnCompleteStage(): GraphStage[FlowShape[T, NotUsed]] = {
new GraphStage[FlowShape[T, NotUsed]] {
override def onUpstreamFailure(cause: Throwable, ctx: Context[NotUsed]): TerminationDirective = {
callback(Failure(cause))
ctx.fail(cause)
}
val in = Inlet[T]("in")
val out = Outlet[NotUsed]("out")
override val shape = FlowShape.of(in, out)
override def onUpstreamFinish(ctx: Context[NotUsed]): TerminationDirective = {
callback(Success(Done))
ctx.finish()
}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = pull(in)
override def onPull(): Unit = pull(in)
override def onUpstreamFailure(cause: Throwable): Unit = {
callback(Failure(cause))
failStage(cause)
}
override def onUpstreamFinish(): Unit = {
callback(Success(Done))
completeStage()
}
setHandlers(in, out, this)
}
}
}
Flow[T].transform(newOnCompleteStage).to(Sink.ignore).named("onCompleteSink")
Flow[T].via(newOnCompleteStage()).to(Sink.ignore).named("onCompleteSink")
}
/**

View file

@ -16,7 +16,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.duration.{ FiniteDuration }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise }
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._