diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index 086c8a5331..e78cdd356e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -11,7 +11,6 @@ import java.util.stream.{ Collector, Collectors } import akka.stream._ import akka.stream.testkit.Utils._ import akka.stream.testkit._ -import org.scalactic.ConversionCheckedTripleEquals import akka.testkit.DefaultTimeout import org.scalatest.concurrent.ScalaFutures import scala.concurrent.{ Await, Future } @@ -139,7 +138,6 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } "Java collector Sink" must { - import scala.compat.java8.FunctionConverters._ class TestCollector( _supplier: () ⇒ Supplier[Array[Int]], diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 8b588e7e9b..1a8b18ac8c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -3,10 +3,9 @@ */ package akka.stream.scaladsl -import akka.event.{ Logging, LoggingAdapter } +import akka.event.LoggingAdapter import akka.stream._ import akka.Done -import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ import akka.stream.impl.fusing._ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 55cfb7d8cc..d44e243de8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -3,23 +3,20 @@ */ package akka.stream.scaladsl -import java.util.{ Spliterators, Spliterator } -import java.util.stream.StreamSupport - import akka.{ Done, NotUsed } import akka.dispatch.ExecutionContexts -import akka.actor.{ Status, ActorRef, Props } +import akka.actor.{ ActorRef, Props, Status } import akka.stream.actor.ActorSubscriber import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ -import akka.stream.stage.{ Context, PushStage, SyncDirective, TerminationDirective } +import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, InHandler } import akka.stream.{ javadsl, _ } import org.reactivestreams.{ Publisher, Subscriber } + import scala.annotation.tailrec import scala.collection.immutable -import scala.concurrent.duration.Duration.Inf -import scala.concurrent.{ Await, ExecutionContext, Future } +import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success, Try } /** @@ -283,23 +280,35 @@ object Sink { */ def onComplete[T](callback: Try[Done] ⇒ Unit): Sink[T, NotUsed] = { - def newOnCompleteStage(): PushStage[T, NotUsed] = { - new PushStage[T, NotUsed] { - override def onPush(elem: T, ctx: Context[NotUsed]): SyncDirective = ctx.pull() + def newOnCompleteStage(): GraphStage[FlowShape[T, NotUsed]] = { + new GraphStage[FlowShape[T, NotUsed]] { - override def onUpstreamFailure(cause: Throwable, ctx: Context[NotUsed]): TerminationDirective = { - callback(Failure(cause)) - ctx.fail(cause) - } + val in = Inlet[T]("in") + val out = Outlet[NotUsed]("out") + override val shape = FlowShape.of(in, out) - override def onUpstreamFinish(ctx: Context[NotUsed]): TerminationDirective = { - callback(Success(Done)) - ctx.finish() - } + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + + override def onPush(): Unit = pull(in) + + override def onPull(): Unit = pull(in) + + override def onUpstreamFailure(cause: Throwable): Unit = { + callback(Failure(cause)) + failStage(cause) + } + + override def onUpstreamFinish(): Unit = { + callback(Success(Done)) + completeStage() + } + + setHandlers(in, out, this) + } } } - - Flow[T].transform(newOnCompleteStage).to(Sink.ignore).named("onCompleteSink") + Flow[T].via(newOnCompleteStage()).to(Sink.ignore).named("onCompleteSink") } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 56d1a836a4..dba0c809e0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -16,7 +16,7 @@ import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.tailrec import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable -import scala.concurrent.duration.{ FiniteDuration } +import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._