diff --git a/akka-stream/src/main/scala/akka/stream/impl/EmptySource.scala b/akka-stream/src/main/scala/akka/stream/impl/EmptySource.scala new file mode 100644 index 0000000000..676458b91f --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/EmptySource.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package akka.stream.impl + +import akka.annotation.InternalApi +import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.stage._ + +/** + * INTERNAL API + */ +@InternalApi private[akka] final object EmptySource extends GraphStage[SourceShape[Nothing]] { + val out = Outlet[Nothing]("EmptySource.out") + override val shape = SourceShape(out) + + override protected def initialAttributes = DefaultAttributes.lazySource + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + override def preStart(): Unit = completeStage() + override def onPull(): Unit = completeStage() + + setHandler(out, this) + } + + override def toString = "EmptySource" +} + diff --git a/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala b/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala new file mode 100644 index 0000000000..ca9e39ce55 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/LazySource.scala @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package akka.stream.impl + +import akka.annotation.InternalApi +import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.scaladsl.{ Keep, Source } +import akka.stream.stage._ + +import scala.concurrent.{ Future, Promise } +import scala.util.control.NonFatal + +/** + * INTERNAL API + */ +@InternalApi private[akka] object LazySource { + def apply[T, M](sourceFactory: () ⇒ Source[T, M]) = new LazySource[T, M](sourceFactory) +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final class LazySource[T, M](sourceFactory: () ⇒ Source[T, M]) extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { + val out = Outlet[T]("LazySource.out") + override val shape = SourceShape(out) + + override protected def initialAttributes = DefaultAttributes.lazySource + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { + val matPromise = Promise[M]() + val logic = new GraphStageLogic(shape) with OutHandler { + + override def onDownstreamFinish(): Unit = { + matPromise.failure(new RuntimeException("Downstream canceled without triggering lazy source materialization")) + completeStage() + } + + override def onPull(): Unit = { + val source = sourceFactory() + val subSink = new SubSinkInlet[T]("LazySource") + subSink.pull() + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + subSink.pull() + } + + override def onDownstreamFinish(): Unit = { + subSink.cancel() + completeStage() + } + }) + + subSink.setHandler(new InHandler { + override def onPush(): Unit = { + push(out, subSink.grab()) + } + }) + + try { + val matVal = subFusingMaterializer.materialize(source.toMat(subSink.sink)(Keep.left), inheritedAttributes) + matPromise.trySuccess(matVal) + } catch { + case NonFatal(ex) ⇒ + subSink.cancel() + failStage(ex) + matPromise.tryFailure(ex) + } + } + + setHandler(out, this) + + override def postStop() = { + matPromise.tryFailure(new RuntimeException("LazySource stopped without completing the materialized future")) + } + } + + (logic, matPromise.future) + } + + override def toString = "LazySource" +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala new file mode 100644 index 0000000000..db6e986d45 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala @@ -0,0 +1,201 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package akka.stream.impl + +import akka.dispatch.ExecutionContexts +import akka.stream.ActorAttributes.SupervisionStrategy +import akka.stream.OverflowStrategies._ +import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.stage._ +import akka.stream.scaladsl.{ Keep, Source, SourceQueueWithComplete } + +import scala.annotation.tailrec +import scala.concurrent.{ Future, Promise } +import akka.Done +import java.util.concurrent.CompletionStage + +import akka.annotation.InternalApi +import akka.util.OptionVal + +import scala.compat.java8.FutureConverters._ +import scala.util.Try +import scala.util.control.NonFatal + +/** + * INTERNAL API + */ +@InternalApi private[akka] object QueueSource { + sealed trait Input[+T] + final case class Offer[+T](elem: T, promise: Promise[QueueOfferResult]) extends Input[T] + case object Completion extends Input[Nothing] + final case class Failure(ex: Throwable) extends Input[Nothing] +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueueWithComplete[T]] { + import QueueSource._ + + val out = Outlet[T]("queueSource.out") + override val shape: SourceShape[T] = SourceShape.of(out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + val completion = Promise[Done] + val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Input[T]] with OutHandler { + var buffer: Buffer[T] = _ + var pendingOffer: Option[Offer[T]] = None + var terminating = false + + override def preStart(): Unit = { + if (maxBuffer > 0) buffer = Buffer(maxBuffer, materializer) + initCallback(callback.invoke) + } + override def postStop(): Unit = stopCallback { + case Offer(elem, promise) ⇒ promise.failure(new IllegalStateException("Stream is terminated. SourceQueue is detached")) + case _ ⇒ // ignore + } + + private def enqueueAndSuccess(offer: Offer[T]): Unit = { + buffer.enqueue(offer.elem) + offer.promise.success(QueueOfferResult.Enqueued) + } + + private def bufferElem(offer: Offer[T]): Unit = { + if (!buffer.isFull) { + enqueueAndSuccess(offer) + } else overflowStrategy match { + case DropHead ⇒ + buffer.dropHead() + enqueueAndSuccess(offer) + case DropTail ⇒ + buffer.dropTail() + enqueueAndSuccess(offer) + case DropBuffer ⇒ + buffer.clear() + enqueueAndSuccess(offer) + case DropNew ⇒ + offer.promise.success(QueueOfferResult.Dropped) + case Fail ⇒ + val bufferOverflowException = new BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!") + offer.promise.success(QueueOfferResult.Failure(bufferOverflowException)) + completion.failure(bufferOverflowException) + failStage(bufferOverflowException) + case Backpressure ⇒ + pendingOffer match { + case Some(_) ⇒ + offer.promise.failure(new IllegalStateException("You have to wait for previous offer to be resolved to send another request")) + case None ⇒ + pendingOffer = Some(offer) + } + } + } + + private val callback: AsyncCallback[Input[T]] = getAsyncCallback { + + case offer @ Offer(elem, promise) ⇒ + if (maxBuffer != 0) { + bufferElem(offer) + if (isAvailable(out)) push(out, buffer.dequeue()) + } else if (isAvailable(out)) { + push(out, elem) + promise.success(QueueOfferResult.Enqueued) + } else if (pendingOffer.isEmpty) + pendingOffer = Some(offer) + else overflowStrategy match { + case DropHead | DropBuffer ⇒ + pendingOffer.get.promise.success(QueueOfferResult.Dropped) + pendingOffer = Some(offer) + case DropTail | DropNew ⇒ + promise.success(QueueOfferResult.Dropped) + case Fail ⇒ + val bufferOverflowException = new BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!") + promise.success(QueueOfferResult.Failure(bufferOverflowException)) + completion.failure(bufferOverflowException) + failStage(bufferOverflowException) + case Backpressure ⇒ + promise.failure(new IllegalStateException("You have to wait for previous offer to be resolved to send another request")) + } + + case Completion ⇒ + if (maxBuffer != 0 && buffer.nonEmpty || pendingOffer.nonEmpty) terminating = true + else { + completion.success(Done) + completeStage() + } + + case Failure(ex) ⇒ + completion.failure(ex) + failStage(ex) + } + + setHandler(out, this) + + override def onDownstreamFinish(): Unit = { + pendingOffer match { + case Some(Offer(elem, promise)) ⇒ + promise.success(QueueOfferResult.QueueClosed) + pendingOffer = None + case None ⇒ // do nothing + } + completion.success(Done) + completeStage() + } + + override def onPull(): Unit = { + if (maxBuffer == 0) { + pendingOffer match { + case Some(Offer(elem, promise)) ⇒ + push(out, elem) + promise.success(QueueOfferResult.Enqueued) + pendingOffer = None + if (terminating) { + completion.success(Done) + completeStage() + } + case None ⇒ + } + } else if (buffer.nonEmpty) { + push(out, buffer.dequeue()) + pendingOffer match { + case Some(offer) ⇒ + enqueueAndSuccess(offer) + pendingOffer = None + case None ⇒ //do nothing + } + if (terminating && buffer.isEmpty) { + completion.success(Done) + completeStage() + } + } + } + } + + (stageLogic, new SourceQueueWithComplete[T] { + override def watchCompletion() = completion.future + override def offer(element: T): Future[QueueOfferResult] = { + val p = Promise[QueueOfferResult] + stageLogic.invoke(Offer(element, p)) + p.future + } + override def complete(): Unit = { + stageLogic.invoke(Completion) + } + override def fail(ex: Throwable): Unit = { + stageLogic.invoke(Failure(ex)) + } + }) + } +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] final class SourceQueueAdapter[T](delegate: SourceQueueWithComplete[T]) extends akka.stream.javadsl.SourceQueueWithComplete[T] { + def offer(elem: T): CompletionStage[QueueOfferResult] = delegate.offer(elem).toJava + def watchCompletion(): CompletionStage[Done] = delegate.watchCompletion().toJava + def complete(): Unit = delegate.complete() + def fail(ex: Throwable): Unit = delegate.fail(ex) +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala deleted file mode 100644 index 30b8ff475b..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ /dev/null @@ -1,457 +0,0 @@ -/** - * Copyright (C) 2015-2017 Lightbend Inc. - */ -package akka.stream.impl - -import akka.dispatch.ExecutionContexts -import akka.stream.ActorAttributes.SupervisionStrategy -import akka.stream.OverflowStrategies._ -import akka.stream._ -import akka.stream.impl.Stages.DefaultAttributes -import akka.stream.stage._ -import akka.stream.scaladsl.{ Keep, Source, SourceQueueWithComplete } - -import scala.annotation.tailrec -import scala.concurrent.{ Future, Promise } -import akka.Done -import java.util.concurrent.CompletionStage - -import akka.annotation.InternalApi -import akka.util.OptionVal - -import scala.compat.java8.FutureConverters._ -import scala.util.Try -import scala.util.control.NonFatal - -/** - * INTERNAL API - */ -@InternalApi private[akka] object QueueSource { - sealed trait Input[+T] - final case class Offer[+T](elem: T, promise: Promise[QueueOfferResult]) extends Input[T] - case object Completion extends Input[Nothing] - final case class Failure(ex: Throwable) extends Input[Nothing] -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] final class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStrategy) extends GraphStageWithMaterializedValue[SourceShape[T], SourceQueueWithComplete[T]] { - import QueueSource._ - - val out = Outlet[T]("queueSource.out") - override val shape: SourceShape[T] = SourceShape.of(out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val completion = Promise[Done] - val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Input[T]] with OutHandler { - var buffer: Buffer[T] = _ - var pendingOffer: Option[Offer[T]] = None - var terminating = false - - override def preStart(): Unit = { - if (maxBuffer > 0) buffer = Buffer(maxBuffer, materializer) - initCallback(callback.invoke) - } - override def postStop(): Unit = stopCallback { - case Offer(elem, promise) ⇒ promise.failure(new IllegalStateException("Stream is terminated. SourceQueue is detached")) - case _ ⇒ // ignore - } - - private def enqueueAndSuccess(offer: Offer[T]): Unit = { - buffer.enqueue(offer.elem) - offer.promise.success(QueueOfferResult.Enqueued) - } - - private def bufferElem(offer: Offer[T]): Unit = { - if (!buffer.isFull) { - enqueueAndSuccess(offer) - } else overflowStrategy match { - case DropHead ⇒ - buffer.dropHead() - enqueueAndSuccess(offer) - case DropTail ⇒ - buffer.dropTail() - enqueueAndSuccess(offer) - case DropBuffer ⇒ - buffer.clear() - enqueueAndSuccess(offer) - case DropNew ⇒ - offer.promise.success(QueueOfferResult.Dropped) - case Fail ⇒ - val bufferOverflowException = new BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!") - offer.promise.success(QueueOfferResult.Failure(bufferOverflowException)) - completion.failure(bufferOverflowException) - failStage(bufferOverflowException) - case Backpressure ⇒ - pendingOffer match { - case Some(_) ⇒ - offer.promise.failure(new IllegalStateException("You have to wait for previous offer to be resolved to send another request")) - case None ⇒ - pendingOffer = Some(offer) - } - } - } - - private val callback: AsyncCallback[Input[T]] = getAsyncCallback { - - case offer @ Offer(elem, promise) ⇒ - if (maxBuffer != 0) { - bufferElem(offer) - if (isAvailable(out)) push(out, buffer.dequeue()) - } else if (isAvailable(out)) { - push(out, elem) - promise.success(QueueOfferResult.Enqueued) - } else if (pendingOffer.isEmpty) - pendingOffer = Some(offer) - else overflowStrategy match { - case DropHead | DropBuffer ⇒ - pendingOffer.get.promise.success(QueueOfferResult.Dropped) - pendingOffer = Some(offer) - case DropTail | DropNew ⇒ - promise.success(QueueOfferResult.Dropped) - case Fail ⇒ - val bufferOverflowException = new BufferOverflowException(s"Buffer overflow (max capacity was: $maxBuffer)!") - promise.success(QueueOfferResult.Failure(bufferOverflowException)) - completion.failure(bufferOverflowException) - failStage(bufferOverflowException) - case Backpressure ⇒ - promise.failure(new IllegalStateException("You have to wait for previous offer to be resolved to send another request")) - } - - case Completion ⇒ - if (maxBuffer != 0 && buffer.nonEmpty || pendingOffer.nonEmpty) terminating = true - else { - completion.success(Done) - completeStage() - } - - case Failure(ex) ⇒ - completion.failure(ex) - failStage(ex) - } - - setHandler(out, this) - - override def onDownstreamFinish(): Unit = { - pendingOffer match { - case Some(Offer(elem, promise)) ⇒ - promise.success(QueueOfferResult.QueueClosed) - pendingOffer = None - case None ⇒ // do nothing - } - completion.success(Done) - completeStage() - } - - override def onPull(): Unit = { - if (maxBuffer == 0) { - pendingOffer match { - case Some(Offer(elem, promise)) ⇒ - push(out, elem) - promise.success(QueueOfferResult.Enqueued) - pendingOffer = None - if (terminating) { - completion.success(Done) - completeStage() - } - case None ⇒ - } - } else if (buffer.nonEmpty) { - push(out, buffer.dequeue()) - pendingOffer match { - case Some(offer) ⇒ - enqueueAndSuccess(offer) - pendingOffer = None - case None ⇒ //do nothing - } - if (terminating && buffer.isEmpty) { - completion.success(Done) - completeStage() - } - } - } - } - - (stageLogic, new SourceQueueWithComplete[T] { - override def watchCompletion() = completion.future - override def offer(element: T): Future[QueueOfferResult] = { - val p = Promise[QueueOfferResult] - stageLogic.invoke(Offer(element, p)) - p.future - } - override def complete(): Unit = { - stageLogic.invoke(Completion) - } - override def fail(ex: Throwable): Unit = { - stageLogic.invoke(Failure(ex)) - } - }) - } -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] final class SourceQueueAdapter[T](delegate: SourceQueueWithComplete[T]) extends akka.stream.javadsl.SourceQueueWithComplete[T] { - def offer(elem: T): CompletionStage[QueueOfferResult] = delegate.offer(elem).toJava - def watchCompletion(): CompletionStage[Done] = delegate.watchCompletion().toJava - def complete(): Unit = delegate.complete() - def fail(ex: Throwable): Unit = delegate.fail(ex) -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] final class UnfoldResourceSource[T, S]( - create: () ⇒ S, - readData: (S) ⇒ Option[T], - close: (S) ⇒ Unit) extends GraphStage[SourceShape[T]] { - val out = Outlet[T]("UnfoldResourceSource.out") - override val shape = SourceShape(out) - override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSource - - def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { - lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) - var open = false - var blockingStream: S = _ - setHandler(out, this) - - override def preStart(): Unit = { - blockingStream = create() - open = true - } - - @tailrec - final override def onPull(): Unit = { - var resumingMode = false - try { - readData(blockingStream) match { - case Some(data) ⇒ push(out, data) - case None ⇒ closeStage() - } - } catch { - case NonFatal(ex) ⇒ decider(ex) match { - case Supervision.Stop ⇒ - close(blockingStream) - failStage(ex) - case Supervision.Restart ⇒ - restartState() - resumingMode = true - case Supervision.Resume ⇒ - resumingMode = true - } - } - if (resumingMode) onPull() - } - - override def onDownstreamFinish(): Unit = closeStage() - - private def restartState(): Unit = { - close(blockingStream) - blockingStream = create() - open = true - } - - private def closeStage(): Unit = - try { - close(blockingStream) - open = false - completeStage() - } catch { - case NonFatal(ex) ⇒ failStage(ex) - } - - override def postStop(): Unit = { - if (open) close(blockingStream) - } - - } - override def toString = "UnfoldResourceSource" -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] final class UnfoldResourceSourceAsync[T, S]( - create: () ⇒ Future[S], - readData: (S) ⇒ Future[Option[T]], - close: (S) ⇒ Future[Done]) extends GraphStage[SourceShape[T]] { - val out = Outlet[T]("UnfoldResourceSourceAsync.out") - override val shape = SourceShape(out) - override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync - - def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { - lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) - var resource = Promise[S]() - var open = false - implicit val context = ExecutionContexts.sameThreadExecutionContext - - setHandler(out, this) - - override def preStart(): Unit = createStream(false) - - private def createStream(withPull: Boolean): Unit = { - val createdCallback = getAsyncCallback[Try[S]] { - case scala.util.Success(res) ⇒ - open = true - resource.success(res) - if (withPull) onPull() - case scala.util.Failure(t) ⇒ failStage(t) - } - try { - create().onComplete(createdCallback.invoke) - } catch { - case NonFatal(ex) ⇒ failStage(ex) - } - } - - private def onResourceReady(f: (S) ⇒ Unit): Unit = resource.future.foreach(f) - - val errorHandler: PartialFunction[Throwable, Unit] = { - case NonFatal(ex) ⇒ decider(ex) match { - case Supervision.Stop ⇒ - onResourceReady(close(_)) - failStage(ex) - case Supervision.Restart ⇒ restartState() - case Supervision.Resume ⇒ onPull() - } - } - - val readCallback = getAsyncCallback[Try[Option[T]]] { - case scala.util.Success(data) ⇒ data match { - case Some(d) ⇒ push(out, d) - case None ⇒ closeStage() - } - case scala.util.Failure(t) ⇒ errorHandler(t) - }.invoke _ - - final override def onPull(): Unit = - onResourceReady { resource ⇒ - try { readData(resource).onComplete(readCallback) } catch errorHandler - } - - override def onDownstreamFinish(): Unit = closeStage() - - private def closeAndThen(f: () ⇒ Unit): Unit = { - setKeepGoing(true) - val closedCallback = getAsyncCallback[Try[Done]] { - case scala.util.Success(_) ⇒ - open = false - f() - case scala.util.Failure(t) ⇒ - open = false - failStage(t) - } - - onResourceReady(res ⇒ - try { close(res).onComplete(closedCallback.invoke) } catch { - case NonFatal(ex) ⇒ failStage(ex) - }) - } - private def restartState(): Unit = closeAndThen(() ⇒ { - resource = Promise[S]() - createStream(true) - }) - private def closeStage(): Unit = closeAndThen(completeStage) - - override def postStop(): Unit = { - if (open) closeStage() - } - - } - override def toString = "UnfoldResourceSourceAsync" - -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] object LazySource { - def apply[T, M](sourceFactory: () ⇒ Source[T, M]) = new LazySource[T, M](sourceFactory) -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] final class LazySource[T, M](sourceFactory: () ⇒ Source[T, M]) extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] { - val out = Outlet[T]("LazySource.out") - override val shape = SourceShape(out) - - override protected def initialAttributes = DefaultAttributes.lazySource - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = { - val matPromise = Promise[M]() - val logic = new GraphStageLogic(shape) with OutHandler { - - override def onDownstreamFinish(): Unit = { - matPromise.failure(new RuntimeException("Downstream canceled without triggering lazy source materialization")) - completeStage() - } - - override def onPull(): Unit = { - val source = sourceFactory() - val subSink = new SubSinkInlet[T]("LazySource") - subSink.pull() - - setHandler(out, new OutHandler { - override def onPull(): Unit = { - subSink.pull() - } - - override def onDownstreamFinish(): Unit = { - subSink.cancel() - completeStage() - } - }) - - subSink.setHandler(new InHandler { - override def onPush(): Unit = { - push(out, subSink.grab()) - } - }) - - try { - val matVal = subFusingMaterializer.materialize(source.toMat(subSink.sink)(Keep.left), inheritedAttributes) - matPromise.trySuccess(matVal) - } catch { - case NonFatal(ex) ⇒ - subSink.cancel() - failStage(ex) - matPromise.tryFailure(ex) - } - } - - setHandler(out, this) - - override def postStop() = { - matPromise.tryFailure(new RuntimeException("LazySource stopped without completing the materialized future")) - } - } - - (logic, matPromise.future) - } - - override def toString = "LazySource" -} - -/** - * INTERNAL API - */ -@InternalApi private[akka] final object EmptySource extends GraphStage[SourceShape[Nothing]] { - val out = Outlet[Nothing]("EmptySource.out") - override val shape = SourceShape(out) - - override protected def initialAttributes = DefaultAttributes.lazySource - - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with OutHandler { - override def preStart(): Unit = completeStage() - override def onPull(): Unit = completeStage() - - setHandler(out, this) - } - - override def toString = "EmptySource" -} - diff --git a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala new file mode 100644 index 0000000000..ca5d6d0d1e --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSource.scala @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package akka.stream.impl + +import akka.annotation.InternalApi +import akka.stream.ActorAttributes.SupervisionStrategy +import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.stage._ + +import scala.annotation.tailrec +import scala.util.control.NonFatal + +/** + * INTERNAL API + */ +@InternalApi private[akka] final class UnfoldResourceSource[T, S]( + create: () ⇒ S, + readData: (S) ⇒ Option[T], + close: (S) ⇒ Unit) extends GraphStage[SourceShape[T]] { + val out = Outlet[T]("UnfoldResourceSource.out") + override val shape = SourceShape(out) + override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSource + + def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { + lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + var open = false + var blockingStream: S = _ + setHandler(out, this) + + override def preStart(): Unit = { + blockingStream = create() + open = true + } + + @tailrec + final override def onPull(): Unit = { + var resumingMode = false + try { + readData(blockingStream) match { + case Some(data) ⇒ push(out, data) + case None ⇒ closeStage() + } + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ + close(blockingStream) + failStage(ex) + case Supervision.Restart ⇒ + restartState() + resumingMode = true + case Supervision.Resume ⇒ + resumingMode = true + } + } + if (resumingMode) onPull() + } + + override def onDownstreamFinish(): Unit = closeStage() + + private def restartState(): Unit = { + close(blockingStream) + blockingStream = create() + open = true + } + + private def closeStage(): Unit = + try { + close(blockingStream) + open = false + completeStage() + } catch { + case NonFatal(ex) ⇒ failStage(ex) + } + + override def postStop(): Unit = { + if (open) close(blockingStream) + } + + } + override def toString = "UnfoldResourceSource" +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala new file mode 100644 index 0000000000..1451aaff1b --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2015-2017 Lightbend Inc. + */ +package akka.stream.impl + +import akka.Done +import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts +import akka.stream.ActorAttributes.SupervisionStrategy +import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.stage._ + +import scala.concurrent.{ Future, Promise } +import scala.util.Try +import scala.util.control.NonFatal + +/** + * INTERNAL API + */ +@InternalApi private[akka] final class UnfoldResourceSourceAsync[T, S]( + create: () ⇒ Future[S], + readData: (S) ⇒ Future[Option[T]], + close: (S) ⇒ Future[Done]) extends GraphStage[SourceShape[T]] { + val out = Outlet[T]("UnfoldResourceSourceAsync.out") + override val shape = SourceShape(out) + override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync + + def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { + lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + var resource = Promise[S]() + var open = false + implicit val context = ExecutionContexts.sameThreadExecutionContext + + setHandler(out, this) + + override def preStart(): Unit = createStream(false) + + private def createStream(withPull: Boolean): Unit = { + val createdCallback = getAsyncCallback[Try[S]] { + case scala.util.Success(res) ⇒ + open = true + resource.success(res) + if (withPull) onPull() + case scala.util.Failure(t) ⇒ failStage(t) + } + try { + create().onComplete(createdCallback.invoke) + } catch { + case NonFatal(ex) ⇒ failStage(ex) + } + } + + private def onResourceReady(f: (S) ⇒ Unit): Unit = resource.future.foreach(f) + + val errorHandler: PartialFunction[Throwable, Unit] = { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ + onResourceReady(close(_)) + failStage(ex) + case Supervision.Restart ⇒ restartState() + case Supervision.Resume ⇒ onPull() + } + } + + val readCallback = getAsyncCallback[Try[Option[T]]] { + case scala.util.Success(data) ⇒ data match { + case Some(d) ⇒ push(out, d) + case None ⇒ closeStage() + } + case scala.util.Failure(t) ⇒ errorHandler(t) + }.invoke _ + + final override def onPull(): Unit = + onResourceReady { resource ⇒ + try { readData(resource).onComplete(readCallback) } catch errorHandler + } + + override def onDownstreamFinish(): Unit = closeStage() + + private def closeAndThen(f: () ⇒ Unit): Unit = { + setKeepGoing(true) + val closedCallback = getAsyncCallback[Try[Done]] { + case scala.util.Success(_) ⇒ + open = false + f() + case scala.util.Failure(t) ⇒ + open = false + failStage(t) + } + + onResourceReady(res ⇒ + try { close(res).onComplete(closedCallback.invoke) } catch { + case NonFatal(ex) ⇒ failStage(ex) + }) + } + private def restartState(): Unit = closeAndThen(() ⇒ { + resource = Promise[S]() + createStream(true) + }) + private def closeStage(): Unit = closeAndThen(completeStage) + + override def postStop(): Unit = { + if (open) closeStage() + } + + } + override def toString = "UnfoldResourceSourceAsync" + +}