=str rewrite Sink internal impl as GraphStage

This commit is contained in:
Nafer Sanabria 2016-08-09 21:08:31 -05:00
parent 22d669f7f0
commit bed69d1002
4 changed files with 32 additions and 26 deletions

View file

@ -11,7 +11,6 @@ import java.util.stream.{ Collector, Collectors }
import akka.stream._
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import org.scalactic.ConversionCheckedTripleEquals
import akka.testkit.DefaultTimeout
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.{ Await, Future }
@ -139,7 +138,6 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
}
"Java collector Sink" must {
import scala.compat.java8.FunctionConverters._
class TestCollector(
_supplier: () Supplier[Array[Int]],

View file

@ -3,10 +3,9 @@
*/
package akka.stream.scaladsl
import akka.event.{ Logging, LoggingAdapter }
import akka.event.LoggingAdapter
import akka.stream._
import akka.Done
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._
import akka.stream.impl.fusing._
@ -1831,7 +1830,7 @@ trait FlowOps[+Out, +Mat] {
/**
* Put an asynchronous boundary around this `Flow`.
*
*
* If this is a `SubFlow` (created e.g. by `groupBy`), this creates an
* asynchronous boundary around each materialized sub-flow, not the
* super-flow. That way, the super-flow will communicate with sub-flows

View file

@ -3,23 +3,20 @@
*/
package akka.stream.scaladsl
import java.util.{ Spliterators, Spliterator }
import java.util.stream.StreamSupport
import akka.{ Done, NotUsed }
import akka.dispatch.ExecutionContexts
import akka.actor.{ Status, ActorRef, Props }
import akka.actor.{ ActorRef, Props, Status }
import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._
import akka.stream.stage.{ Context, PushStage, SyncDirective, TerminationDirective }
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, InHandler }
import akka.stream.{ javadsl, _ }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration.Duration.Inf
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
/**
@ -265,23 +262,35 @@ object Sink {
*/
def onComplete[T](callback: Try[Done] Unit): Sink[T, NotUsed] = {
def newOnCompleteStage(): PushStage[T, NotUsed] = {
new PushStage[T, NotUsed] {
override def onPush(elem: T, ctx: Context[NotUsed]): SyncDirective = ctx.pull()
def newOnCompleteStage(): GraphStage[FlowShape[T, NotUsed]] = {
new GraphStage[FlowShape[T, NotUsed]] {
override def onUpstreamFailure(cause: Throwable, ctx: Context[NotUsed]): TerminationDirective = {
callback(Failure(cause))
ctx.fail(cause)
}
val in = Inlet[T]("in")
val out = Outlet[NotUsed]("out")
override val shape = FlowShape.of(in, out)
override def onUpstreamFinish(ctx: Context[NotUsed]): TerminationDirective = {
callback(Success(Done))
ctx.finish()
}
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = pull(in)
override def onPull(): Unit = pull(in)
override def onUpstreamFailure(cause: Throwable): Unit = {
callback(Failure(cause))
failStage(cause)
}
override def onUpstreamFinish(): Unit = {
callback(Success(Done))
completeStage()
}
setHandlers(in, out, this)
}
}
}
Flow[T].transform(newOnCompleteStage).to(Sink.ignore).named("onCompleteSink")
Flow[T].via(newOnCompleteStage()).to(Sink.ignore).named("onCompleteSink")
}
/**

View file

@ -16,7 +16,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.duration.{ FiniteDuration }
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise }
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._