#22030 Lazy sink wait for init

This commit is contained in:
Alexander Golubev 2017-02-16 03:19:25 -05:00 committed by Johan Andrén
parent 021829e21e
commit 552481b511
2 changed files with 35 additions and 14 deletions

View file

@ -6,21 +6,20 @@ package akka.stream.io
import java.nio.file.{ Files, Path, StandardOpenOption } import java.nio.file.{ Files, Path, StandardOpenOption }
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.dispatch.ExecutionContexts
import akka.stream.impl.ActorMaterializerImpl import akka.stream.impl.ActorMaterializerImpl
import akka.stream.impl.StreamSupervisor import akka.stream.impl.StreamSupervisor
import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.StreamSupervisor.Children
import akka.stream.scaladsl.{ FileIO, Source } import akka.stream.scaladsl.{ FileIO, Sink, Source }
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.ActorMaterializer import akka.stream.{ ActorAttributes, ActorMaterializer, ActorMaterializerSettings, IOResult }
import akka.stream.ActorMaterializerSettings
import akka.stream.ActorAttributes
import akka.util.{ ByteString, Timeout } import akka.util.{ ByteString, Timeout }
import com.google.common.jimfs.{ Configuration, Jimfs } import com.google.common.jimfs.{ Configuration, Jimfs }
import org.scalatest.BeforeAndAfterAll import org.scalatest.BeforeAndAfterAll
import scala.collection.mutable.ListBuffer import scala.collection.mutable.ListBuffer
import scala.concurrent.Await import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._ import scala.concurrent.duration._
class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) { class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
@ -138,6 +137,20 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) {
} }
} }
"write single line to a file from lazy sink" in assertAllStagesStopped {
//LazySink must wait for result of initialization even if got upstreamComplete
targetFile { f
val completion = Source(List(TestByteStrings.head))
.runWith(Sink.lazyInit[ByteString, Future[IOResult]](
_ Future.successful(FileIO.toPath(f)), () Future.successful(IOResult.createSuccessful(0)))
.mapMaterializedValue(_.flatMap(identity)(ExecutionContexts.sameThreadExecutionContext)))
Await.result(completion, 3.seconds)
checkFileContents(f, TestLines.head)
}
}
} }
private def targetFile(block: Path Unit, create: Boolean = true) { private def targetFile(block: Path Unit, create: Boolean = true) {

View file

@ -446,6 +446,7 @@ final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]]
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(stoppingDecider) lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(stoppingDecider)
var completed = false
val promise = Promise[M]() val promise = Promise[M]()
val stageLogic = new GraphStageLogic(shape) with InHandler { val stageLogic = new GraphStageLogic(shape) with InHandler {
override def preStart(): Unit = pull(in) override def preStart(): Unit = pull(in)
@ -453,11 +454,17 @@ final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]]
override def onPush(): Unit = { override def onPush(): Unit = {
try { try {
val element = grab(in) val element = grab(in)
val cb: AsyncCallback[Try[Sink[T, M]]] = getAsyncCallback { val cb: AsyncCallback[Try[Sink[T, M]]] =
getAsyncCallback {
case Success(sink) initInternalSource(sink, element) case Success(sink) initInternalSource(sink, element)
case Failure(e) failure(e) case Failure(e) failure(e)
} }
sinkFactory(element).onComplete { cb.invoke }(ExecutionContexts.sameThreadExecutionContext) sinkFactory(element).onComplete { cb.invoke }(ExecutionContexts.sameThreadExecutionContext)
setHandler(in, new InHandler {
override def onPush(): Unit = ()
override def onUpstreamFinish(): Unit = gotCompletionEvent()
override def onUpstreamFailure(ex: Throwable): Unit = failure(ex)
})
} catch { } catch {
case NonFatal(e) decider(e) match { case NonFatal(e) decider(e) match {
case Supervision.Stop failure(e) case Supervision.Stop failure(e)
@ -478,9 +485,13 @@ final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]]
override def onUpstreamFailure(ex: Throwable): Unit = failure(ex) override def onUpstreamFailure(ex: Throwable): Unit = failure(ex)
setHandler(in, this) setHandler(in, this)
private def gotCompletionEvent(): Unit = {
setKeepGoing(true)
completed = true
}
private def initInternalSource(sink: Sink[T, M], firstElement: T): Unit = { private def initInternalSource(sink: Sink[T, M], firstElement: T): Unit = {
val sourceOut = new SubSourceOutlet[T]("LazySink") val sourceOut = new SubSourceOutlet[T]("LazySink")
var completed = false
def switchToFirstElementHandlers(): Unit = { def switchToFirstElementHandlers(): Unit = {
sourceOut.setHandler(new OutHandler { sourceOut.setHandler(new OutHandler {
@ -493,10 +504,7 @@ final private[stream] class LazySink[T, M](sinkFactory: T ⇒ Future[Sink[T, M]]
setHandler(in, new InHandler { setHandler(in, new InHandler {
override def onPush(): Unit = sourceOut.push(grab(in)) override def onPush(): Unit = sourceOut.push(grab(in))
override def onUpstreamFinish(): Unit = { override def onUpstreamFinish(): Unit = gotCompletionEvent()
setKeepGoing(true)
completed = true
}
override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex) override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex)
}) })
} }