From 69e6052cbefb4070e9f05801002edaed87e66f14 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Mon, 22 Feb 2016 23:22:47 -0500 Subject: [PATCH] +str #18620 add Source.blocking() for legacy API integration --- akka-docs/rst/java/stream/stages-overview.rst | 17 ++ .../rst/scala/stream/stages-overview.rst | 17 ++ .../stream/DslFactoriesConsistencySpec.scala | 1 - .../scala/akka/stream/io/FileSinkSpec.scala | 2 - .../scala/akka/stream/io/FileSourceSpec.scala | 2 - .../UnfoldResourceAsyncSourceSpec.scala | 248 ++++++++++++++++++ .../scaladsl/UnfoldResourceSourceSpec.scala | 185 +++++++++++++ .../main/scala/akka/stream/impl/Sources.scala | 148 ++++++++++- .../main/scala/akka/stream/impl/Stages.scala | 2 + .../scala/akka/stream/impl/fusing/Ops.scala | 8 +- .../scala/akka/stream/javadsl/Source.scala | 58 +++- .../scala/akka/stream/scaladsl/Source.scala | 49 ++++ 12 files changed, 726 insertions(+), 11 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceSourceSpec.scala diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index d0902cb2ab..9bb6ce8c0f 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -160,6 +160,23 @@ Emit each integer in a range, with an option to take bigger steps than 1. **completes** when the end of the range has been reached +unfoldResource +^^^^^ +Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. + +**emits** when there is demand and read method returns value + +**completes** when read function returns ``None`` + +unfoldAsyncResource +^^^^^ +Wrap any resource that can be opened, queried for next element and closed using three distinct functions into a source. +Functions return ``CompletionStage`` result to achieve asynchronous processing + +**emits** when there is demand and ``CompletionStage`` from read function returns value + +**completes** when ``CompletionStage`` from read function returns ``None`` + queue ^^^^^ Materialize a ``SourceQueue`` onto which elements can be pushed for emitting from the source. The queue contains diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index 474ff2dcd6..249fec4628 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -149,6 +149,23 @@ Combine several sources, using a given strategy such as merge or concat, into on **completes** when all sources has completed +unfoldResource +^^^^^ +Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. + +**emits** when there is demand and read function returns value + +**completes** when read function returns ``None`` + +unfoldAsyncResource +^^^^^ +Wrap any resource that can be opened, queried for next element (in a blocking way) and closed using three distinct functions into a source. +Functions return ``Future`` to achieve asynchronous processing + +**emits** when there is demand and ``Future`` from read function returns value + +**completes** when ``Future`` from read function returns ``None`` + queue ^^^^^ Materialize a ``SourceQueue`` onto which elements can be pushed for emitting from the source. The queue contains diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index ebb0bd5eef..c1376d6029 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -34,7 +34,6 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { (classOf[Boolean], classOf[akka.stream.javadsl.AsPublisher]) :: (classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) :: (classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) :: - (classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) :: (classOf[scala.Function1[_, Unit]], classOf[akka.japi.function.Procedure[_]]) :: (classOf[scala.Function1[_, _]], classOf[akka.japi.function.Function[_, _]]) :: (classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) :: diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index 3b4a200416..823d0f5fdf 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -105,8 +105,6 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { targetFile { f ⇒ val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val materializer = ActorMaterializer()(sys) - implicit val timeout = Timeout(3.seconds) - try { Source.fromIterator(() ⇒ Iterator.continually(TestByteStrings.head)).runWith(FileIO.toFile(f))(materializer) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala index c22a4ed60c..efe4f9823f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSourceSpec.scala @@ -170,8 +170,6 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { "use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped { val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val materializer = ActorMaterializer()(sys) - implicit val timeout = Timeout(500.millis) - try { val p = FileIO.fromFile(manyLines).runWith(TestSink.probe)(materializer) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala new file mode 100644 index 0000000000..a5dc0ae744 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceAsyncSourceSpec.scala @@ -0,0 +1,248 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import java.io._ + +import akka.Done +import akka.actor.{ NoSerializationVerificationNeeded, ActorSystem } +import akka.stream.ActorAttributes._ +import akka.stream.Supervision._ +import akka.stream.{ ActorMaterializer, _ } +import akka.stream.impl.StreamSupervisor.Children +import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor } +import akka.stream.testkit.TestSubscriber +import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.TestSink +import akka.util.{ ByteString, Timeout } +import akka.testkit.AkkaSpec + +import scala.concurrent.{ Await, Future, Promise } +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace + +class UnfoldResourceAsyncSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { + + val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") + implicit val materializer = ActorMaterializer(settings) + + val manyLines = { + ("a" * 100 + "\n") * 10 + + ("b" * 100 + "\n") * 10 + + ("c" * 100 + "\n") * 10 + + ("d" * 100 + "\n") * 10 + + ("e" * 100 + "\n") * 10 + + ("f" * 100 + "\n") * 10 + } + val manyLinesArray = manyLines.split("\n") + + val manyLinesFile = { + val f = File.createTempFile("blocking-source-async-spec", ".tmp") + new FileWriter(f).append(manyLines).close() + f + } + + val open: () ⇒ Future[BufferedReader] = () ⇒ Promise.successful(new BufferedReader(new FileReader(manyLinesFile))).future + val read: (BufferedReader) ⇒ Future[Option[String]] = reader ⇒ Promise.successful(Option(reader.readLine())).future + val close: (BufferedReader) ⇒ Future[Done] = + reader ⇒ { + reader.close() + Promise.successful(Done).future + } + + "Unfold Resource Async Source" must { + "read contents from a file" in assertAllStagesStopped { + val createPromise = Promise[BufferedReader]() + val readPromise = Promise[Option[String]]() + val closePromise = Promise[Done]() + + val createPromiseCalled = Promise[Done]() + val readPromiseCalled = Promise[Done]() + val closePromiseCalled = Promise[Done]() + + val resource = new BufferedReader(new FileReader(manyLinesFile)) + val p = Source.unfoldResourceAsync[String, BufferedReader](() ⇒ { + createPromiseCalled.success(Done) + createPromise.future + }, + reader ⇒ { + readPromiseCalled.success(Done) + readPromise.future + }, + reader ⇒ { + closePromiseCalled.success(Done) + closePromise.future + }) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[String]() + p.subscribe(c) + val sub = c.expectSubscription() + + sub.request(1) + Await.ready(createPromiseCalled.future, 3.seconds) + c.expectNoMsg(200.millis) + createPromise.success(resource) + + val chunks = manyLinesArray.toList.iterator + + Await.ready(readPromiseCalled.future, 3.seconds) + c.expectNoMsg(200.millis) + readPromise.success(Option(resource.readLine())) + c.expectNext() should ===(chunks.next()) + + sub.cancel() + Await.ready(closePromiseCalled.future, 3.seconds) + resource.close() + closePromise.success(Done) + } + + "close resource successfully right after open" in assertAllStagesStopped { + val createPromise = Promise[BufferedReader]() + val readPromise = Promise[Option[String]]() + val closePromise = Promise[Done]() + + val createPromiseCalled = Promise[Done]() + val readPromiseCalled = Promise[Done]() + val closePromiseCalled = Promise[Done]() + + val resource = new BufferedReader(new FileReader(manyLinesFile)) + val p = Source.unfoldResourceAsync[String, BufferedReader](() ⇒ { + createPromiseCalled.success(Done) + createPromise.future + }, + reader ⇒ { + readPromiseCalled.success(Done) + readPromise.future + }, + reader ⇒ { + closePromiseCalled.success(Done) + closePromise.future + }) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[String]() + p.subscribe(c) + val sub = c.expectSubscription() + + Await.ready(createPromiseCalled.future, 3.seconds) + createPromise.success(resource) + + sub.cancel() + Await.ready(closePromiseCalled.future, 3.seconds) + resource.close() + closePromise.success(Done) + } + + "continue when Strategy is Resume and exception happened" in assertAllStagesStopped { + val p = Source.unfoldResourceAsync[String, BufferedReader](open, + reader ⇒ { + val s = reader.readLine() + if (s != null && s.contains("b")) throw TE("") else Promise.successful(Option(s)).future + }, close).withAttributes(supervisionStrategy(resumingDecider)) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[String]() + + p.subscribe(c) + val sub = c.expectSubscription() + + (0 to 49).foreach(i ⇒ { + sub.request(1) + c.expectNext() should ===(if (i < 10) manyLinesArray(i) else manyLinesArray(i + 10)) + }) + sub.request(1) + c.expectComplete() + } + + "close and open stream again when Strategy is Restart" in assertAllStagesStopped { + val p = Source.unfoldResourceAsync[String, BufferedReader](open, + reader ⇒ { + val s = reader.readLine() + if (s != null && s.contains("b")) throw TE("") else Promise.successful(Option(s)).future + }, close).withAttributes(supervisionStrategy(restartingDecider)) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[String]() + + p.subscribe(c) + val sub = c.expectSubscription() + + (0 to 19).foreach(i ⇒ { + sub.request(1) + c.expectNext() should ===(manyLinesArray(0)) + }) + sub.cancel() + } + + "work with ByteString as well" in assertAllStagesStopped { + val chunkSize = 50 + val buffer = Array.ofDim[Char](chunkSize) + val p = Source.unfoldResourceAsync[ByteString, Reader](open, + reader ⇒ { + val p = Promise[Option[ByteString]] + val s = reader.read(buffer) + if (s > 0) p.success(Some(ByteString(buffer.mkString("")).take(s))) else p.success(None) + p.future + }, + reader ⇒ { + reader.close() + Promise.successful(Done).future + }).runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[ByteString]() + + var remaining = manyLines + def nextChunk() = { + val (chunk, rest) = remaining.splitAt(chunkSize) + remaining = rest + chunk + } + + p.subscribe(c) + val sub = c.expectSubscription() + + (0 to 121).foreach(i ⇒ { + sub.request(1) + c.expectNext().utf8String should ===(nextChunk().toString) + }) + sub.request(1) + c.expectComplete() + } + + "use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped { + val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) + val materializer = ActorMaterializer()(sys) + try { + val p = Source.unfoldResourceAsync[String, BufferedReader](open, + read, close).runWith(TestSink.probe)(materializer) + + materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children.find(_.path.toString contains "unfoldResourceSourceAsync").get + try assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") finally p.cancel() + } finally shutdown(sys) + } + + "fail when create throws exception" in assertAllStagesStopped { + val p = Source.unfoldResourceAsync[String, BufferedReader](() ⇒ throw TE(""), + read, close).runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[String]() + p.subscribe(c) + + c.expectSubscription() + c.expectError(TE("")) + } + + "fail when close throws exception" in assertAllStagesStopped { + val p = Source.unfoldResourceAsync[String, BufferedReader](open, + read, reader ⇒ throw TE("")) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[String]() + p.subscribe(c) + + val sub = c.expectSubscription() + sub.request(61) + c.expectNextN(60) + c.expectError() + } + } + override def afterTermination(): Unit = { + manyLinesFile.delete() + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceSourceSpec.scala new file mode 100644 index 0000000000..1c4b7ed68e --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/UnfoldResourceSourceSpec.scala @@ -0,0 +1,185 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.stream.scaladsl + +import java.io._ + +import akka.actor.ActorSystem +import akka.stream.ActorAttributes._ +import akka.stream.Supervision._ +import akka.stream.{ ActorMaterializer, _ } +import akka.stream.impl.StreamSupervisor.Children +import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor } +import akka.stream.testkit.TestSubscriber +import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.TestSink +import akka.util.{ ByteString, Timeout } +import akka.testkit.AkkaSpec + +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace + +class UnfoldResourceSourceSpec extends AkkaSpec(UnboundedMailboxConfig) { + + val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") + implicit val materializer = ActorMaterializer(settings) + + val manyLines = { + ("a" * 100 + "\n") * 10 + + ("b" * 100 + "\n") * 10 + + ("c" * 100 + "\n") * 10 + + ("d" * 100 + "\n") * 10 + + ("e" * 100 + "\n") * 10 + + ("f" * 100 + "\n") * 10 + } + val manyLinesArray = manyLines.split("\n") + + val manyLinesFile = { + val f = File.createTempFile("blocking-source-spec", ".tmp") + new FileWriter(f).append(manyLines).close() + f + } + + "Unfold Resource Source" must { + "read contents from a file" in assertAllStagesStopped { + val p = Source.unfoldResource[String, BufferedReader](() ⇒ new BufferedReader(new FileReader(manyLinesFile)), + reader ⇒ Option(reader.readLine()), + reader ⇒ reader.close()) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[String]() + p.subscribe(c) + val sub = c.expectSubscription() + + val chunks = manyLinesArray.toList.iterator + + sub.request(1) + c.expectNext() should ===(chunks.next()) + sub.request(1) + c.expectNext() should ===(chunks.next()) + c.expectNoMsg(300.millis) + + while (chunks.hasNext) { + sub.request(1) + c.expectNext() should ===(chunks.next()) + } + sub.request(1) + + c.expectComplete() + } + + "continue when Strategy is Resume and exception happened" in assertAllStagesStopped { + val p = Source.unfoldResource[String, BufferedReader](() ⇒ new BufferedReader(new FileReader(manyLinesFile)), + reader ⇒ { + val s = reader.readLine() + if (s != null && s.contains("b")) throw TE("") else Option(s) + }, + reader ⇒ reader.close()).withAttributes(supervisionStrategy(resumingDecider)) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[String]() + + p.subscribe(c) + val sub = c.expectSubscription() + + (0 to 49).foreach(i ⇒ { + sub.request(1) + c.expectNext() should ===(if (i < 10) manyLinesArray(i) else manyLinesArray(i + 10)) + }) + sub.request(1) + c.expectComplete() + } + + "close and open stream again when Strategy is Restart" in assertAllStagesStopped { + val p = Source.unfoldResource[String, BufferedReader](() ⇒ new BufferedReader(new FileReader(manyLinesFile)), + reader ⇒ { + val s = reader.readLine() + if (s != null && s.contains("b")) throw TE("") else Option(s) + }, + reader ⇒ reader.close()).withAttributes(supervisionStrategy(restartingDecider)) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[String]() + + p.subscribe(c) + val sub = c.expectSubscription() + + (0 to 19).foreach(i ⇒ { + sub.request(1) + c.expectNext() should ===(manyLinesArray(0)) + }) + sub.cancel() + } + + "work with ByteString as well" in assertAllStagesStopped { + val chunkSize = 50 + val buffer = Array.ofDim[Char](chunkSize) + val p = Source.unfoldResource[ByteString, Reader](() ⇒ new BufferedReader(new FileReader(manyLinesFile)), + reader ⇒ { + val s = reader.read(buffer) + if (s > 0) Some(ByteString(buffer.mkString("")).take(s)) else None + }, + reader ⇒ reader.close()) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[ByteString]() + + var remaining = manyLines + def nextChunk() = { + val (chunk, rest) = remaining.splitAt(chunkSize) + remaining = rest + chunk + } + + p.subscribe(c) + val sub = c.expectSubscription() + + (0 to 121).foreach(i ⇒ { + sub.request(1) + c.expectNext().utf8String should ===(nextChunk().toString) + }) + sub.request(1) + c.expectComplete() + } + + "use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped { + val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) + val materializer = ActorMaterializer()(sys) + try { + val p = Source.unfoldResource[String, BufferedReader](() ⇒ new BufferedReader(new FileReader(manyLinesFile)), + reader ⇒ Option(reader.readLine()), + reader ⇒ reader.close()).runWith(TestSink.probe)(materializer) + + materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) + val ref = expectMsgType[Children].children.find(_.path.toString contains "unfoldResourceSource").get + try assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") finally p.cancel() + } finally shutdown(sys) + } + + "fail when create throws exception" in assertAllStagesStopped { + val p = Source.unfoldResource[String, BufferedReader](() ⇒ throw TE(""), + reader ⇒ Option(reader.readLine()), + reader ⇒ reader.close()) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[String]() + p.subscribe(c) + + c.expectSubscription() + c.expectError(TE("")) + } + + "fail when close throws exception" in assertAllStagesStopped { + val p = Source.unfoldResource[String, BufferedReader](() ⇒ new BufferedReader(new FileReader(manyLinesFile)), + reader ⇒ Option(reader.readLine()), + reader ⇒ throw TE("")) + .runWith(Sink.asPublisher(false)) + val c = TestSubscriber.manualProbe[String]() + p.subscribe(c) + + val sub = c.expectSubscription() + sub.request(61) + c.expectNextN(60) + c.expectError(TE("")) + } + } + override def afterTermination(): Unit = { + manyLinesFile.delete() + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index 6be339a6d8..b79eab55ab 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -3,14 +3,20 @@ */ package akka.stream.impl +import akka.dispatch.ExecutionContexts +import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream.OverflowStrategies._ import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.stage._ -import scala.concurrent.{ Future, Promise } import akka.stream.scaladsl.SourceQueueWithComplete +import scala.annotation.tailrec +import scala.concurrent.{ Future, Promise } import akka.Done import java.util.concurrent.CompletionStage import scala.compat.java8.FutureConverters._ +import scala.util.Try +import scala.util.control.NonFatal /** * INTERNAL API @@ -185,3 +191,143 @@ private[akka] final class SourceQueueAdapter[T](delegate: SourceQueueWithComplet def complete(): Unit = delegate.complete() def fail(ex: Throwable): Unit = delegate.fail(ex) } + +/** + * INTERNAL API + */ +private[stream] final class UnfoldResourceSource[T, S](create: () ⇒ S, + readData: (S) ⇒ Option[T], + close: (S) ⇒ Unit) extends GraphStage[SourceShape[T]] { + val out = Outlet[T]("UnfoldResourceSource.out") + override val shape = SourceShape(out) + override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSource + + def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { + lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + var blockingStream: S = _ + setHandler(out, this) + + override def preStart(): Unit = blockingStream = create() + + @tailrec + final override def onPull(): Unit = { + var resumingMode = false + try { + readData(blockingStream) match { + case Some(data) ⇒ push(out, data) + case None ⇒ closeStage() + } + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ + close(blockingStream) + failStage(ex) + case Supervision.Restart ⇒ + restartState() + resumingMode = true + case Supervision.Resume ⇒ + resumingMode = true + } + } + if (resumingMode) onPull() + } + + override def onDownstreamFinish(): Unit = closeStage() + + private def restartState(): Unit = { + close(blockingStream) + blockingStream = create() + } + + private def closeStage(): Unit = + try { + close(blockingStream) + completeStage() + } catch { + case NonFatal(ex) ⇒ failStage(ex) + } + + } + override def toString = "UnfoldResourceSource" +} + +private[stream] final class UnfoldResourceSourceAsync[T, S](create: () ⇒ Future[S], + readData: (S) ⇒ Future[Option[T]], + close: (S) ⇒ Future[Done]) extends GraphStage[SourceShape[T]] { + val out = Outlet[T]("UnfoldResourceSourceAsync.out") + override val shape = SourceShape(out) + override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync + + def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { + lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + var resource = Promise[S]() + implicit val context = ExecutionContexts.sameThreadExecutionContext + + setHandler(out, this) + + override def preStart(): Unit = createStream(false) + + private def createStream(withPull: Boolean): Unit = { + val cb = getAsyncCallback[Try[S]] { + case scala.util.Success(res) ⇒ + resource.success(res) + if (withPull) onPull() + case scala.util.Failure(t) ⇒ failStage(t) + } + try { + create().onComplete(cb.invoke) + } catch { + case NonFatal(ex) ⇒ failStage(ex) + } + } + + private def onResourceReady(f: (S) ⇒ Unit): Unit = resource.future.onSuccess { + case resource ⇒ f(resource) + } + + val errorHandler: PartialFunction[Throwable, Unit] = { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ + onResourceReady(close(_)) + failStage(ex) + case Supervision.Restart ⇒ restartState() + case Supervision.Resume ⇒ onPull() + } + } + val callback = getAsyncCallback[Try[Option[T]]] { + case scala.util.Success(data) ⇒ data match { + case Some(d) ⇒ push(out, d) + case None ⇒ closeStage() + } + case scala.util.Failure(t) ⇒ errorHandler(t) + }.invoke _ + + final override def onPull(): Unit = onResourceReady { + case resource ⇒ + try { readData(resource).onComplete(callback) } catch errorHandler + } + + override def onDownstreamFinish(): Unit = closeStage() + + private def closeAndThen(f: () ⇒ Unit): Unit = { + setKeepGoing(true) + val cb = getAsyncCallback[Try[Done]] { + case scala.util.Success(_) ⇒ f() + case scala.util.Failure(t) ⇒ failStage(t) + } + + onResourceReady(res ⇒ + try { close(res).onComplete(cb.invoke) } catch { + case NonFatal(ex) ⇒ failStage(ex) + }) + } + private def restartState(): Unit = closeAndThen(() ⇒ { + resource = Promise[S]() + createStream(true) + }) + private def closeStage(): Unit = closeAndThen(completeStage) + + } + override def toString = "UnfoldResourceSourceAsync" + +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 8a916d0e6d..4317e29696 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -95,6 +95,8 @@ private[stream] object Stages { val inputStreamSource = name("inputStreamSource") and IODispatcher val outputStreamSource = name("outputStreamSource") and IODispatcher val fileSource = name("fileSource") and IODispatcher + val unfoldResourceSource = name("unfoldResourceSource") and IODispatcher + val unfoldResourceSourceAsync = name("unfoldResourceSourceAsync") and IODispatcher val subscriberSink = name("subscriberSink") val cancelledSink = name("cancelledSink") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index fd8dcbca79..409a2e7a09 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -9,7 +9,7 @@ import akka.stream.Attributes.{ InputBuffer, LogLevels } import akka.stream.OverflowStrategies._ import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.{ Buffer ⇒ BufferImpl, ReactiveStreamsCompliance } -import akka.stream.scaladsl.Source +import akka.stream.scaladsl.{ SourceQueue, Source } import akka.stream.stage._ import akka.stream.{ Supervision, _ } import scala.annotation.tailrec @@ -507,7 +507,7 @@ private[akka] final case class Batch[In, Out](max: Long, costFn: In ⇒ Long, se override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) private var agg: Out = null.asInstanceOf[Out] private var left: Long = max @@ -699,7 +699,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut override def toString = s"MapAsync.Logic(buffer=$buffer)" //FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync? - val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) var buffer: BufferImpl[Holder[Try[Out]]] = _ def todo = buffer.used @@ -1238,7 +1238,7 @@ private[stream] final class StatefulMapConcat[In, Out](f: () ⇒ In ⇒ immutabl override def initialAttributes: Attributes = DefaultAttributes.statefulMapConcat def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { - val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) var currentIterator: Iterator[Out] = _ var plainFun = f() def hasNext = if (currentIterator != null) currentIterator.hasNext else false diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index ece94fbab4..ced4097c10 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -10,7 +10,7 @@ import akka.actor.{ ActorRef, Cancellable, Props } import akka.event.LoggingAdapter import akka.japi.{ Pair, Util, function } import akka.stream._ -import akka.stream.impl.{ ConstantFun, StreamLayout } +import akka.stream.impl.{ ConstantFun, StreamLayout, SourceQueueAdapter } import akka.stream.stage.Stage import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance @@ -307,6 +307,62 @@ object Source { def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = new Source(scaladsl.Source.queue[T](bufferSize, overflowStrategy).mapMaterializedValue(new SourceQueueAdapter(_))) + /** + * Start a new `Source` from some resource which can be opened, read and closed. + * Interaction with resource happens in a blocking way. + * + * Example: + * {{{ + * Source.unfoldResource( + * () -> new BufferedReader(new FileReader("...")), + * reader -> reader.readLine(), + * reader -> reader.close()) + * }}} + * + * You can use the supervision strategy to handle exceptions for `read` function. All exceptions thrown by `create` + * or `close` will fail the stream. + * + * `Restart` supervision strategy will close and create blocking IO again. Default strategy is `Stop` which means + * that stream will be terminated on error in `read` function by default. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param create - function that is called on stream start and creates/opens resource. + * @param read - function that reads data from opened resource. It is called each time backpressure signal + * is received. Stream calls close and completes when `read` returns None. + * @param close - function that closes resource + */ + def unfoldResource[T, S](create: function.Creator[S], + read: function.Function[S, Optional[T]], + close: function.Procedure[S]): javadsl.Source[T, NotUsed] = + new Source(scaladsl.Source.unfoldResource[T,S](create.create, + (s: S) ⇒ read.apply(s).asScala, close.apply)) + + /** + * Start a new `Source` from some resource which can be opened, read and closed. + * It's similar to `unfoldResource` but takes functions that return `CopletionStage` instead of plain values. + * + * You can use the supervision strategy to handle exceptions for `read` function or failures of produced `Futures`. + * All exceptions thrown by `create` or `close` as well as fails of returned futures will fail the stream. + * + * `Restart` supervision strategy will close and create resource. Default strategy is `Stop` which means + * that stream will be terminated on error in `read` function (or future) by default. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param create - function that is called on stream start and creates/opens resource. + * @param read - function that reads data from opened resource. It is called each time backpressure signal + * is received. Stream calls close and completes when `CompletionStage` from read function returns None. + * @param close - function that closes resource + */ + def unfoldResourceAsync[T, S](create: function.Creator[CompletionStage[S]], + read: function.Function[S, CompletionStage[Optional[T]]], + close: function.Function[S, CompletionStage[Done]]): javadsl.Source[T, NotUsed] = + new Source(scaladsl.Source.unfoldResourceAsync[T,S](() ⇒ create.create().toScala, + (s: S) ⇒ read.apply(s).toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext), + (s: S) ⇒ close.apply(s).toScala)) } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 4915d9127d..efff974b83 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -440,4 +440,53 @@ object Source { def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = Source.fromGraph(new QueueSource(bufferSize, overflowStrategy).withAttributes(DefaultAttributes.queueSource)) + /** + * Start a new `Source` from some resource which can be opened, read and closed. + * Interaction with resource happens in a blocking way. + * + * Example: + * {{{ + * Source.unfoldResource( + * () => new BufferedReader(new FileReader("...")), + * reader => Option(reader.readLine()), + * reader => reader.close()) + * }}} + * + * You can use the supervision strategy to handle exceptions for `read` function. All exceptions thrown by `create` + * or `close` will fail the stream. + * + * `Restart` supervision strategy will close and create blocking IO again. Default strategy is `Stop` which means + * that stream will be terminated on error in `read` function by default. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param create - function that is called on stream start and creates/opens resource. + * @param read - function that reads data from opened resource. It is called each time backpressure signal + * is received. Stream calls close and completes when `read` returns None. + * @param close - function that closes resource + */ + def unfoldResource[T, S](create: () ⇒ S, read: (S) ⇒ Option[T], close: (S) ⇒ Unit): Source[T, NotUsed] = + Source.fromGraph(new UnfoldResourceSource(create, read, close)) + + /** + * Start a new `Source` from some resource which can be opened, read and closed. + * It's similar to `unfoldResource` but takes functions that return `Futures` instead of plain values. + * + * You can use the supervision strategy to handle exceptions for `read` function or failures of produced `Futures`. + * All exceptions thrown by `create` or `close` as well as fails of returned futures will fail the stream. + * + * `Restart` supervision strategy will close and create resource. Default strategy is `Stop` which means + * that stream will be terminated on error in `read` function (or future) by default. + * + * You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or + * set it for a given Source by using [[ActorAttributes]]. + * + * @param create - function that is called on stream start and creates/opens resource. + * @param read - function that reads data from opened resource. It is called each time backpressure signal + * is received. Stream calls close and completes when `Future` from read function returns None. + * @param close - function that closes resource + */ + def unfoldResourceAsync[T, S](create: () ⇒ Future[S], read: (S) ⇒ Future[Option[T]], close: (S) ⇒ Future[Done]): Source[T, NotUsed] = + Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close)) }