diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index 69a4498a06..7fc7b6021c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -6,21 +6,20 @@ package akka.stream.io import java.nio.file.{ Files, Path, StandardOpenOption } import akka.actor.ActorSystem +import akka.dispatch.ExecutionContexts import akka.stream.impl.ActorMaterializerImpl import akka.stream.impl.StreamSupervisor import akka.stream.impl.StreamSupervisor.Children -import akka.stream.scaladsl.{ FileIO, Source } +import akka.stream.scaladsl.{ FileIO, Sink, Source } import akka.stream.testkit._ import akka.stream.testkit.Utils._ -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings -import akka.stream.ActorAttributes +import akka.stream.{ ActorAttributes, ActorMaterializer, ActorMaterializerSettings, IOResult } import akka.util.{ ByteString, Timeout } import com.google.common.jimfs.{ Configuration, Jimfs } import org.scalatest.BeforeAndAfterAll import scala.collection.mutable.ListBuffer -import scala.concurrent.Await +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) { @@ -138,6 +137,20 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) { } } + "write single line to a file from lazy sink" in assertAllStagesStopped { + //LazySink must wait for result of initialization even if got upstreamComplete + targetFile { f ⇒ + val completion = Source(List(TestByteStrings.head)) + .runWith(Sink.lazyInit[ByteString, Future[IOResult]]( + _ ⇒ Future.successful(FileIO.toPath(f)), () ⇒ Future.successful(IOResult.createSuccessful(0))) + .mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.sameThreadExecutionContext))) + + Await.result(completion, 3.seconds) + + checkFileContents(f, TestLines.head) + } + } + } private def targetFile(block: Path ⇒ Unit, create: Boolean = true) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 9a911bd296..9eba8b717f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -446,6 +446,7 @@ final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]] override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(stoppingDecider) + var completed = false val promise = Promise[M]() val stageLogic = new GraphStageLogic(shape) with InHandler { override def preStart(): Unit = pull(in) @@ -453,11 +454,17 @@ final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]] override def onPush(): Unit = { try { val element = grab(in) - val cb: AsyncCallback[Try[Sink[T, M]]] = getAsyncCallback { - case Success(sink) ⇒ initInternalSource(sink, element) - case Failure(e) ⇒ failure(e) - } + val cb: AsyncCallback[Try[Sink[T, M]]] = + getAsyncCallback { + case Success(sink) ⇒ initInternalSource(sink, element) + case Failure(e) ⇒ failure(e) + } sinkFactory(element).onComplete { cb.invoke }(ExecutionContexts.sameThreadExecutionContext) + setHandler(in, new InHandler { + override def onPush(): Unit = () + override def onUpstreamFinish(): Unit = gotCompletionEvent() + override def onUpstreamFailure(ex: Throwable): Unit = failure(ex) + }) } catch { case NonFatal(e) ⇒ decider(e) match { case Supervision.Stop ⇒ failure(e) @@ -478,9 +485,13 @@ final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]] override def onUpstreamFailure(ex: Throwable): Unit = failure(ex) setHandler(in, this) + private def gotCompletionEvent(): Unit = { + setKeepGoing(true) + completed = true + } + private def initInternalSource(sink: Sink[T, M], firstElement: T): Unit = { val sourceOut = new SubSourceOutlet[T]("LazySink") - var completed = false def switchToFirstElementHandlers(): Unit = { sourceOut.setHandler(new OutHandler { @@ -493,10 +504,7 @@ final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]] setHandler(in, new InHandler { override def onPush(): Unit = sourceOut.push(grab(in)) - override def onUpstreamFinish(): Unit = { - setKeepGoing(true) - completed = true - } + override def onUpstreamFinish(): Unit = gotCompletionEvent() override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex) }) }