diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index 086c8a5331..e78cdd356e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -11,7 +11,6 @@ import java.util.stream.{ Collector, Collectors } import akka.stream._ import akka.stream.testkit.Utils._ import akka.stream.testkit._ -import org.scalactic.ConversionCheckedTripleEquals import akka.testkit.DefaultTimeout import org.scalatest.concurrent.ScalaFutures import scala.concurrent.{ Await, Future } @@ -139,7 +138,6 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } "Java collector Sink" must { - import scala.compat.java8.FunctionConverters._ class TestCollector( _supplier: () ⇒ Supplier[Array[Int]], diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index eb5b12a91a..0e215391cd 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -3,10 +3,9 @@ */ package akka.stream.scaladsl -import akka.event.{ Logging, LoggingAdapter } +import akka.event.LoggingAdapter import akka.stream._ import akka.Done -import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ import akka.stream.impl.fusing._ @@ -1831,7 +1830,7 @@ trait FlowOps[+Out, +Mat] { /** * Put an asynchronous boundary around this `Flow`. - * + * * If this is a `SubFlow` (created e.g. by `groupBy`), this creates an * asynchronous boundary around each materialized sub-flow, not the * super-flow. That way, the super-flow will communicate with sub-flows diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index e2964f727e..a2b9216c6a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -3,23 +3,20 @@ */ package akka.stream.scaladsl -import java.util.{ Spliterators, Spliterator } -import java.util.stream.StreamSupport - import akka.{ Done, NotUsed } import akka.dispatch.ExecutionContexts -import akka.actor.{ Status, ActorRef, Props } +import akka.actor.{ ActorRef, Props, Status } import akka.stream.actor.ActorSubscriber import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ -import akka.stream.stage.{ Context, PushStage, SyncDirective, TerminationDirective } +import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, InHandler } import akka.stream.{ javadsl, _ } import org.reactivestreams.{ Publisher, Subscriber } + import scala.annotation.tailrec import scala.collection.immutable -import scala.concurrent.duration.Duration.Inf -import scala.concurrent.{ Await, ExecutionContext, Future } +import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success, Try } /** @@ -265,23 +262,35 @@ object Sink { */ def onComplete[T](callback: Try[Done] ⇒ Unit): Sink[T, NotUsed] = { - def newOnCompleteStage(): PushStage[T, NotUsed] = { - new PushStage[T, NotUsed] { - override def onPush(elem: T, ctx: Context[NotUsed]): SyncDirective = ctx.pull() + def newOnCompleteStage(): GraphStage[FlowShape[T, NotUsed]] = { + new GraphStage[FlowShape[T, NotUsed]] { - override def onUpstreamFailure(cause: Throwable, ctx: Context[NotUsed]): TerminationDirective = { - callback(Failure(cause)) - ctx.fail(cause) - } + val in = Inlet[T]("in") + val out = Outlet[NotUsed]("out") + override val shape = FlowShape.of(in, out) - override def onUpstreamFinish(ctx: Context[NotUsed]): TerminationDirective = { - callback(Success(Done)) - ctx.finish() - } + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + + override def onPush(): Unit = pull(in) + + override def onPull(): Unit = pull(in) + + override def onUpstreamFailure(cause: Throwable): Unit = { + callback(Failure(cause)) + failStage(cause) + } + + override def onUpstreamFinish(): Unit = { + callback(Success(Done)) + completeStage() + } + + setHandlers(in, out, this) + } } } - - Flow[T].transform(newOnCompleteStage).to(Sink.ignore).named("onCompleteSink") + Flow[T].via(newOnCompleteStage()).to(Sink.ignore).named("onCompleteSink") } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 3ce9ddc387..4fcf2f75f7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -16,7 +16,7 @@ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable -import scala.concurrent.duration.{ FiniteDuration } +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._