+str #18620 add Source.unfoldResource and Source.unfoldResourceAsync

This commit is contained in:
Alexander Golubev 2016-02-22 23:22:47 -05:00
parent 1596ae7f13
commit 69e6052cbe
12 changed files with 726 additions and 11 deletions

View file

@ -160,6 +160,23 @@ Emit each integer in a range, with an option to take bigger steps than 1.
**completes** when the end of the range has been reached
unfoldResource
^^^^^^^^^^^^^^
Wrap any resource that can be opened, queried for its next element (in a blocking way) and closed using three distinct functions into a source.
**emits** when there is demand and the read function returns a value
**completes** when the read function returns an empty ``Optional``
unfoldResourceAsync
^^^^^^^^^^^^^^^^^^^
Wrap any resource that can be opened, queried for its next element and closed using three distinct functions into a source.
The functions return a ``CompletionStage`` to achieve asynchronous processing.
**emits** when there is demand and the ``CompletionStage`` from the read function returns a value
**completes** when the ``CompletionStage`` from the read function returns an empty ``Optional``
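A minimal sketch of the new ``unfoldResourceAsync`` factory (illustrative only; written in Scala against the
Java DSL to show the shape of the factory functions, and the file name is an assumption, not part of these docs)::

  import java.io.{ BufferedReader, FileReader }
  import java.util.Optional
  import java.util.concurrent.{ CompletableFuture, CompletionStage }
  import akka.Done
  import akka.japi.function
  import akka.stream.javadsl.Source

  val lines = Source.unfoldResourceAsync[String, BufferedReader](
    // open: create the resource asynchronously
    new function.Creator[CompletionStage[BufferedReader]] {
      def create(): CompletionStage[BufferedReader] =
        CompletableFuture.completedFuture(new BufferedReader(new FileReader("lines.txt")))
    },
    // read: an empty Optional completes the source
    new function.Function[BufferedReader, CompletionStage[Optional[String]]] {
      def apply(reader: BufferedReader): CompletionStage[Optional[String]] =
        CompletableFuture.completedFuture(Optional.ofNullable(reader.readLine()))
    },
    // close: release the resource
    new function.Function[BufferedReader, CompletionStage[Done]] {
      def apply(reader: BufferedReader): CompletionStage[Done] = {
        reader.close()
        CompletableFuture.completedFuture(Done.getInstance())
      }
    })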
queue
^^^^^
Materialize a ``SourceQueue`` onto which elements can be pushed for emitting from the source. The queue contains

View file

@ -149,6 +149,23 @@ Combine several sources, using a given strategy such as merge or concat, into on
**completes** when all sources have completed
unfoldResource
^^^^^^^^^^^^^^
Wrap any resource that can be opened, queried for its next element (in a blocking way) and closed using three distinct functions into a source.
**emits** when there is demand and the read function returns a value
**completes** when the read function returns ``None``
unfoldResourceAsync
^^^^^^^^^^^^^^^^^^^
Wrap any resource that can be opened, queried for its next element and closed using three distinct functions into a source.
The functions return a ``Future`` to achieve asynchronous processing.
**emits** when there is demand and the ``Future`` from the read function returns a value
**completes** when the ``Future`` from the read function returns ``None``
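A minimal sketch of both operators (illustrative only; the file name and the use of the global
``ExecutionContext`` to wrap the blocking calls of the async variant are assumptions, not part of these docs)::

  import java.io.{ BufferedReader, FileReader }
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global
  import akka.{ Done, NotUsed }
  import akka.stream.scaladsl.Source

  // blocking variant: plain values, run on the blocking IO dispatcher by default
  val blocking: Source[String, NotUsed] =
    Source.unfoldResource[String, BufferedReader](
      () => new BufferedReader(new FileReader("lines.txt")),
      reader => Option(reader.readLine()),
      reader => reader.close())

  // asynchronous variant: the same three functions, but returning futures
  val asynchronous: Source[String, NotUsed] =
    Source.unfoldResourceAsync[String, BufferedReader](
      () => Future(new BufferedReader(new FileReader("lines.txt"))),
      reader => Future(Option(reader.readLine())),
      reader => Future { reader.close(); Done })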
queue
^^^^^
Materialize a ``SourceQueue`` onto which elements can be pushed for emitting from the source. The queue contains

View file

@ -34,7 +34,6 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
(classOf[Boolean], classOf[akka.stream.javadsl.AsPublisher]) ::
(classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) ::
(classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) ::
(classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) ::
(classOf[scala.Function1[_, Unit]], classOf[akka.japi.function.Procedure[_]]) ::
(classOf[scala.Function1[_, _]], classOf[akka.japi.function.Function[_, _]]) ::
(classOf[akka.stream.scaladsl.Source[_, _]], classOf[akka.stream.javadsl.Source[_, _]]) ::

View file

@ -105,8 +105,6 @@ class FileSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
targetFile { f ⇒
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys)
implicit val timeout = Timeout(3.seconds)
try {
Source.fromIterator(() ⇒ Iterator.continually(TestByteStrings.head)).runWith(FileIO.toFile(f))(materializer)

View file

@ -170,8 +170,6 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
"use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys)
implicit val timeout = Timeout(500.millis)
try {
val p = FileIO.fromFile(manyLines).runWith(TestSink.probe)(materializer)

View file

@ -0,0 +1,248 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import java.io._
import akka.Done
import akka.actor.{ NoSerializationVerificationNeeded, ActorSystem }
import akka.stream.ActorAttributes._
import akka.stream.Supervision._
import akka.stream.{ ActorMaterializer, _ }
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor }
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.util.{ ByteString, Timeout }
import akka.testkit.AkkaSpec
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
class UnfoldResourceAsyncSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")
implicit val materializer = ActorMaterializer(settings)
val manyLines = {
("a" * 100 + "\n") * 10 +
("b" * 100 + "\n") * 10 +
("c" * 100 + "\n") * 10 +
("d" * 100 + "\n") * 10 +
("e" * 100 + "\n") * 10 +
("f" * 100 + "\n") * 10
}
val manyLinesArray = manyLines.split("\n")
val manyLinesFile = {
val f = File.createTempFile("blocking-source-async-spec", ".tmp")
new FileWriter(f).append(manyLines).close()
f
}
val open: () ⇒ Future[BufferedReader] = () ⇒ Promise.successful(new BufferedReader(new FileReader(manyLinesFile))).future
val read: (BufferedReader) ⇒ Future[Option[String]] = reader ⇒ Promise.successful(Option(reader.readLine())).future
val close: (BufferedReader) ⇒ Future[Done] =
  reader ⇒ {
    reader.close()
    Promise.successful(Done).future
  }
"Unfold Resource Async Source" must {
"read contents from a file" in assertAllStagesStopped {
val createPromise = Promise[BufferedReader]()
val readPromise = Promise[Option[String]]()
val closePromise = Promise[Done]()
val createPromiseCalled = Promise[Done]()
val readPromiseCalled = Promise[Done]()
val closePromiseCalled = Promise[Done]()
val resource = new BufferedReader(new FileReader(manyLinesFile))
val p = Source.unfoldResourceAsync[String, BufferedReader](() ⇒ {
  createPromiseCalled.success(Done)
  createPromise.future
},
  reader ⇒ {
    readPromiseCalled.success(Done)
    readPromise.future
  },
  reader ⇒ {
    closePromiseCalled.success(Done)
    closePromise.future
  })
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(1)
Await.ready(createPromiseCalled.future, 3.seconds)
c.expectNoMsg(200.millis)
createPromise.success(resource)
val chunks = manyLinesArray.toList.iterator
Await.ready(readPromiseCalled.future, 3.seconds)
c.expectNoMsg(200.millis)
readPromise.success(Option(resource.readLine()))
c.expectNext() should ===(chunks.next())
sub.cancel()
Await.ready(closePromiseCalled.future, 3.seconds)
resource.close()
closePromise.success(Done)
}
"close resource successfully right after open" in assertAllStagesStopped {
val createPromise = Promise[BufferedReader]()
val readPromise = Promise[Option[String]]()
val closePromise = Promise[Done]()
val createPromiseCalled = Promise[Done]()
val readPromiseCalled = Promise[Done]()
val closePromiseCalled = Promise[Done]()
val resource = new BufferedReader(new FileReader(manyLinesFile))
val p = Source.unfoldResourceAsync[String, BufferedReader](() ⇒ {
  createPromiseCalled.success(Done)
  createPromise.future
},
  reader ⇒ {
    readPromiseCalled.success(Done)
    readPromise.future
  },
  reader ⇒ {
    closePromiseCalled.success(Done)
    closePromise.future
  })
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
Await.ready(createPromiseCalled.future, 3.seconds)
createPromise.success(resource)
sub.cancel()
Await.ready(closePromiseCalled.future, 3.seconds)
resource.close()
closePromise.success(Done)
}
"continue when Strategy is Resume and exception happened" in assertAllStagesStopped {
val p = Source.unfoldResourceAsync[String, BufferedReader](open,
  reader ⇒ {
    val s = reader.readLine()
    if (s != null && s.contains("b")) throw TE("") else Promise.successful(Option(s)).future
  }, close).withAttributes(supervisionStrategy(resumingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
(0 to 49).foreach(i ⇒ {
sub.request(1)
c.expectNext() should ===(if (i < 10) manyLinesArray(i) else manyLinesArray(i + 10))
})
sub.request(1)
c.expectComplete()
}
"close and open stream again when Strategy is Restart" in assertAllStagesStopped {
val p = Source.unfoldResourceAsync[String, BufferedReader](open,
  reader ⇒ {
    val s = reader.readLine()
    if (s != null && s.contains("b")) throw TE("") else Promise.successful(Option(s)).future
  }, close).withAttributes(supervisionStrategy(restartingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
(0 to 19).foreach(i ⇒ {
sub.request(1)
c.expectNext() should ===(manyLinesArray(0))
})
sub.cancel()
}
"work with ByteString as well" in assertAllStagesStopped {
val chunkSize = 50
val buffer = Array.ofDim[Char](chunkSize)
val p = Source.unfoldResourceAsync[ByteString, Reader](open,
  reader ⇒ {
    val p = Promise[Option[ByteString]]
    val s = reader.read(buffer)
    if (s > 0) p.success(Some(ByteString(buffer.mkString("")).take(s))) else p.success(None)
    p.future
  },
  reader ⇒ {
    reader.close()
    Promise.successful(Done).future
  }).runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[ByteString]()
var remaining = manyLines
def nextChunk() = {
val (chunk, rest) = remaining.splitAt(chunkSize)
remaining = rest
chunk
}
p.subscribe(c)
val sub = c.expectSubscription()
(0 to 121).foreach(i ⇒ {
sub.request(1)
c.expectNext().utf8String should ===(nextChunk().toString)
})
sub.request(1)
c.expectComplete()
}
"use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys)
try {
val p = Source.unfoldResourceAsync[String, BufferedReader](open,
read, close).runWith(TestSink.probe)(materializer)
materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "unfoldResourceSourceAsync").get
try assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") finally p.cancel()
} finally shutdown(sys)
}
"fail when create throws exception" in assertAllStagesStopped {
val p = Source.unfoldResourceAsync[String, BufferedReader](() ⇒ throw TE(""),
read, close).runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
c.expectSubscription()
c.expectError(TE(""))
}
"fail when close throws exception" in assertAllStagesStopped {
val p = Source.unfoldResourceAsync[String, BufferedReader](open,
read, reader ⇒ throw TE(""))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(61)
c.expectNextN(60)
c.expectError()
}
}
override def afterTermination(): Unit = {
manyLinesFile.delete()
}
}

View file

@ -0,0 +1,185 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import java.io._
import akka.actor.ActorSystem
import akka.stream.ActorAttributes._
import akka.stream.Supervision._
import akka.stream.{ ActorMaterializer, _ }
import akka.stream.impl.StreamSupervisor.Children
import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor }
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.util.{ ByteString, Timeout }
import akka.testkit.AkkaSpec
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
class UnfoldResourceSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher")
implicit val materializer = ActorMaterializer(settings)
val manyLines = {
("a" * 100 + "\n") * 10 +
("b" * 100 + "\n") * 10 +
("c" * 100 + "\n") * 10 +
("d" * 100 + "\n") * 10 +
("e" * 100 + "\n") * 10 +
("f" * 100 + "\n") * 10
}
val manyLinesArray = manyLines.split("\n")
val manyLinesFile = {
val f = File.createTempFile("blocking-source-spec", ".tmp")
new FileWriter(f).append(manyLines).close()
f
}
"Unfold Resource Source" must {
"read contents from a file" in assertAllStagesStopped {
val p = Source.unfoldResource[String, BufferedReader](() ⇒ new BufferedReader(new FileReader(manyLinesFile)),
  reader ⇒ Option(reader.readLine()),
  reader ⇒ reader.close())
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
val chunks = manyLinesArray.toList.iterator
sub.request(1)
c.expectNext() should ===(chunks.next())
sub.request(1)
c.expectNext() should ===(chunks.next())
c.expectNoMsg(300.millis)
while (chunks.hasNext) {
sub.request(1)
c.expectNext() should ===(chunks.next())
}
sub.request(1)
c.expectComplete()
}
"continue when Strategy is Resume and exception happened" in assertAllStagesStopped {
val p = Source.unfoldResource[String, BufferedReader](() ⇒ new BufferedReader(new FileReader(manyLinesFile)),
  reader ⇒ {
    val s = reader.readLine()
    if (s != null && s.contains("b")) throw TE("") else Option(s)
  },
  reader ⇒ reader.close()).withAttributes(supervisionStrategy(resumingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
(0 to 49).foreach(i ⇒ {
sub.request(1)
c.expectNext() should ===(if (i < 10) manyLinesArray(i) else manyLinesArray(i + 10))
})
sub.request(1)
c.expectComplete()
}
"close and open stream again when Strategy is Restart" in assertAllStagesStopped {
val p = Source.unfoldResource[String, BufferedReader](() ⇒ new BufferedReader(new FileReader(manyLinesFile)),
  reader ⇒ {
    val s = reader.readLine()
    if (s != null && s.contains("b")) throw TE("") else Option(s)
  },
  reader ⇒ reader.close()).withAttributes(supervisionStrategy(restartingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
(0 to 19).foreach(i ⇒ {
sub.request(1)
c.expectNext() should ===(manyLinesArray(0))
})
sub.cancel()
}
"work with ByteString as well" in assertAllStagesStopped {
val chunkSize = 50
val buffer = Array.ofDim[Char](chunkSize)
val p = Source.unfoldResource[ByteString, Reader](() ⇒ new BufferedReader(new FileReader(manyLinesFile)),
  reader ⇒ {
    val s = reader.read(buffer)
    if (s > 0) Some(ByteString(buffer.mkString("")).take(s)) else None
  },
  reader ⇒ reader.close())
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[ByteString]()
var remaining = manyLines
def nextChunk() = {
val (chunk, rest) = remaining.splitAt(chunkSize)
remaining = rest
chunk
}
p.subscribe(c)
val sub = c.expectSubscription()
(0 to 121).foreach(i ⇒ {
sub.request(1)
c.expectNext().utf8String should ===(nextChunk().toString)
})
sub.request(1)
c.expectComplete()
}
"use dedicated blocking-io-dispatcher by default" in assertAllStagesStopped {
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
val materializer = ActorMaterializer()(sys)
try {
val p = Source.unfoldResource[String, BufferedReader](() ⇒ new BufferedReader(new FileReader(manyLinesFile)),
  reader ⇒ Option(reader.readLine()),
  reader ⇒ reader.close()).runWith(TestSink.probe)(materializer)
materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "unfoldResourceSource").get
try assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") finally p.cancel()
} finally shutdown(sys)
}
"fail when create throws exception" in assertAllStagesStopped {
val p = Source.unfoldResource[String, BufferedReader](() ⇒ throw TE(""),
  reader ⇒ Option(reader.readLine()),
  reader ⇒ reader.close())
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
c.expectSubscription()
c.expectError(TE(""))
}
"fail when close throws exception" in assertAllStagesStopped {
val p = Source.unfoldResource[String, BufferedReader](() ⇒ new BufferedReader(new FileReader(manyLinesFile)),
  reader ⇒ Option(reader.readLine()),
  reader ⇒ throw TE(""))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(61)
c.expectNextN(60)
c.expectError(TE(""))
}
}
override def afterTermination(): Unit = {
manyLinesFile.delete()
}
}

View file

@ -3,14 +3,20 @@
*/
package akka.stream.impl
import akka.dispatch.ExecutionContexts
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.OverflowStrategies._
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.stage._
import scala.concurrent.{ Future, Promise }
import akka.stream.scaladsl.SourceQueueWithComplete
import scala.annotation.tailrec
import scala.concurrent.{ Future, Promise }
import akka.Done
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
import scala.util.Try
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -185,3 +191,143 @@ private[akka] final class SourceQueueAdapter[T](delegate: SourceQueueWithComplet
def complete(): Unit = delegate.complete()
def fail(ex: Throwable): Unit = delegate.fail(ex)
}
/**
* INTERNAL API
*/
private[stream] final class UnfoldResourceSource[T, S](create: () ⇒ S,
                                                       readData: (S) ⇒ Option[T],
                                                       close: (S) ⇒ Unit) extends GraphStage[SourceShape[T]] {
  val out = Outlet[T]("UnfoldResourceSource.out")
  override val shape = SourceShape(out)
  override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSource

  def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
    lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
    var blockingStream: S = _
    setHandler(out, this)

    override def preStart(): Unit = blockingStream = create()

    @tailrec
    final override def onPull(): Unit = {
      var resumingMode = false
      try {
        readData(blockingStream) match {
          case Some(data) ⇒ push(out, data)
          case None       ⇒ closeStage()
        }
      } catch {
        case NonFatal(ex) ⇒ decider(ex) match {
          case Supervision.Stop ⇒
            close(blockingStream)
            failStage(ex)
          case Supervision.Restart ⇒
            restartState()
            resumingMode = true
          case Supervision.Resume ⇒
            resumingMode = true
        }
      }
      if (resumingMode) onPull()
    }

    override def onDownstreamFinish(): Unit = closeStage()

    private def restartState(): Unit = {
      close(blockingStream)
      blockingStream = create()
    }

    private def closeStage(): Unit =
      try {
        close(blockingStream)
        completeStage()
      } catch {
        case NonFatal(ex) ⇒ failStage(ex)
      }
  }
  override def toString = "UnfoldResourceSource"
}
private[stream] final class UnfoldResourceSourceAsync[T, S](create: () ⇒ Future[S],
                                                            readData: (S) ⇒ Future[Option[T]],
                                                            close: (S) ⇒ Future[Done]) extends GraphStage[SourceShape[T]] {
  val out = Outlet[T]("UnfoldResourceSourceAsync.out")
  override val shape = SourceShape(out)
  override def initialAttributes: Attributes = DefaultAttributes.unfoldResourceSourceAsync

  def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
    lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
    var resource = Promise[S]()
    implicit val context = ExecutionContexts.sameThreadExecutionContext
    setHandler(out, this)

    override def preStart(): Unit = createStream(false)

    private def createStream(withPull: Boolean): Unit = {
      val cb = getAsyncCallback[Try[S]] {
        case scala.util.Success(res) ⇒
          resource.success(res)
          if (withPull) onPull()
        case scala.util.Failure(t) ⇒ failStage(t)
      }
      try {
        create().onComplete(cb.invoke)
      } catch {
        case NonFatal(ex) ⇒ failStage(ex)
      }
    }

    private def onResourceReady(f: (S) ⇒ Unit): Unit = resource.future.onSuccess {
      case resource ⇒ f(resource)
    }

    val errorHandler: PartialFunction[Throwable, Unit] = {
      case NonFatal(ex) ⇒ decider(ex) match {
        case Supervision.Stop ⇒
          onResourceReady(close(_))
          failStage(ex)
        case Supervision.Restart ⇒ restartState()
        case Supervision.Resume  ⇒ onPull()
      }
    }

    val callback = getAsyncCallback[Try[Option[T]]] {
      case scala.util.Success(data) ⇒ data match {
        case Some(d) ⇒ push(out, d)
        case None    ⇒ closeStage()
      }
      case scala.util.Failure(t) ⇒ errorHandler(t)
    }.invoke _

    final override def onPull(): Unit = onResourceReady {
      case resource ⇒
        try { readData(resource).onComplete(callback) } catch errorHandler
    }

    override def onDownstreamFinish(): Unit = closeStage()

    private def closeAndThen(f: () ⇒ Unit): Unit = {
      setKeepGoing(true)
      val cb = getAsyncCallback[Try[Done]] {
        case scala.util.Success(_) ⇒ f()
        case scala.util.Failure(t) ⇒ failStage(t)
      }
      onResourceReady(res ⇒
        try { close(res).onComplete(cb.invoke) } catch {
          case NonFatal(ex) ⇒ failStage(ex)
        })
    }

    private def restartState(): Unit = closeAndThen(() ⇒ {
      resource = Promise[S]()
      createStream(true)
    })

    private def closeStage(): Unit = closeAndThen(completeStage)
  }
  override def toString = "UnfoldResourceSourceAsync"
}
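// Editorial sketch (not part of the commit): how the supervision decider in the stages above maps
// strategies to behavior when the read function throws. The file name and the `flakyRead` helper are
// hypothetical; `Resume` drops the failed read and pulls again, `Restart` closes and re-creates the
// resource, and the default `Stop` fails the stage.
import java.io.{ BufferedReader, FileReader }
import akka.actor.ActorSystem
import akka.stream.{ ActorAttributes, ActorMaterializer, Supervision }
import akka.stream.scaladsl.{ Sink, Source }

object SupervisionSketch extends App {
  implicit val system = ActorSystem("supervision-sketch")
  implicit val materializer = ActorMaterializer()

  val flakyRead: BufferedReader ⇒ Option[String] = reader ⇒ {
    val line = reader.readLine()
    if (line != null && line.contains("bad")) throw new RuntimeException("bad line")
    Option(line)
  }

  Source.unfoldResource[String, BufferedReader](
    () ⇒ new BufferedReader(new FileReader("lines.txt")),
    flakyRead,
    reader ⇒ reader.close())
    .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
    .runWith(Sink.foreach(println))
}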

View file

@ -95,6 +95,8 @@ private[stream] object Stages {
val inputStreamSource = name("inputStreamSource") and IODispatcher
val outputStreamSource = name("outputStreamSource") and IODispatcher
val fileSource = name("fileSource") and IODispatcher
val unfoldResourceSource = name("unfoldResourceSource") and IODispatcher
val unfoldResourceSourceAsync = name("unfoldResourceSourceAsync") and IODispatcher
val subscriberSink = name("subscriberSink")
val cancelledSink = name("cancelledSink")

View file

@ -9,7 +9,7 @@ import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ Buffer ⇒ BufferImpl, ReactiveStreamsCompliance }
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.{ SourceQueue, Source }
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
import scala.annotation.tailrec
@ -507,7 +507,7 @@ private[akka] final case class Batch[In, Out](max: Long, costFn: In ⇒ Long, se
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
private var agg: Out = null.asInstanceOf[Out]
private var left: Long = max
@ -699,7 +699,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
override def toString = s"MapAsync.Logic(buffer=$buffer)"
//FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync?
val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
var buffer: BufferImpl[Holder[Try[Out]]] = _
def todo = buffer.used
@ -1238,7 +1238,7 @@ private[stream] final class StatefulMapConcat[In, Out](f: () ⇒ In ⇒ immutabl
override def initialAttributes: Attributes = DefaultAttributes.statefulMapConcat
def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
var currentIterator: Iterator[Out] = _
var plainFun = f()
def hasNext = if (currentIterator != null) currentIterator.hasNext else false

View file

@ -10,7 +10,7 @@ import akka.actor.{ ActorRef, Cancellable, Props }
import akka.event.LoggingAdapter
import akka.japi.{ Pair, Util, function }
import akka.stream._
import akka.stream.impl.{ ConstantFun, StreamLayout }
import akka.stream.impl.{ ConstantFun, StreamLayout, SourceQueueAdapter }
import akka.stream.stage.Stage
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance
@ -307,6 +307,62 @@ object Source {
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] =
new Source(scaladsl.Source.queue[T](bufferSize, overflowStrategy).mapMaterializedValue(new SourceQueueAdapter(_)))
/**
* Start a new `Source` from some resource which can be opened, read and closed.
* Interaction with the resource happens in a blocking way.
*
* Example:
* {{{
* Source.unfoldResource(
* () -> new BufferedReader(new FileReader("...")),
* reader -> Optional.ofNullable(reader.readLine()),
* reader -> reader.close())
* }}}
*
* You can use the supervision strategy to handle exceptions from the `read` function. All exceptions thrown by `create`
* or `close` will fail the stream.
*
* The `Restart` supervision strategy will close the resource and create it again. The default strategy is `Stop`,
* which means that the stream will be terminated on an error in the `read` function.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param create - function that is called on stream start and creates/opens the resource.
* @param read - function that reads data from the opened resource. It is called each time a backpressure signal
* is received. The stream calls `close` and completes when `read` returns an empty `Optional`.
* @param close - function that closes the resource
*/
def unfoldResource[T, S](create: function.Creator[S],
                         read: function.Function[S, Optional[T]],
                         close: function.Procedure[S]): javadsl.Source[T, NotUsed] =
  new Source(scaladsl.Source.unfoldResource[T, S](create.create,
    (s: S) ⇒ read.apply(s).asScala, close.apply))
/**
* Start a new `Source` from some resource which can be opened, read and closed.
* It is similar to `unfoldResource` but takes functions that return a `CompletionStage` instead of plain values.
*
* You can use the supervision strategy to handle exceptions from the `read` function or failures of the returned `CompletionStage`s.
* All exceptions thrown by `create` or `close`, as well as failures of the returned `CompletionStage`s, will fail the stream.
*
* The `Restart` supervision strategy will close the resource and create it again. The default strategy is `Stop`,
* which means that the stream will be terminated on an error in the `read` function (or its `CompletionStage`).
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param create - function that is called on stream start and creates/opens the resource.
* @param read - function that reads data from the opened resource. It is called each time a backpressure signal
* is received. The stream calls `close` and completes when the `CompletionStage` from `read` returns an empty `Optional`.
* @param close - function that closes the resource
*/
def unfoldResourceAsync[T, S](create: function.Creator[CompletionStage[S]],
                              read: function.Function[S, CompletionStage[Optional[T]]],
                              close: function.Function[S, CompletionStage[Done]]): javadsl.Source[T, NotUsed] =
  new Source(scaladsl.Source.unfoldResourceAsync[T, S](() ⇒ create.create().toScala,
    (s: S) ⇒ read.apply(s).toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext),
    (s: S) ⇒ close.apply(s).toScala))
}
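// Editorial sketch (not part of the commit): the scala-java8-compat conversions the unfoldResourceAsync
// delegation above relies on — `toScala` turning a CompletionStage into a Future, and `asScala` turning
// an Optional into an Option. `adaptRead` is a hypothetical helper mirroring the shape of the read adapter.
import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.{ ExecutionContext, Future }

object ConversionSketch {
  def adaptRead[T](read: CompletionStage[Optional[T]])(implicit ec: ExecutionContext): Future[Option[T]] =
    read.toScala.map(_.asScala)
}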
/**

View file

@ -440,4 +440,53 @@ object Source {
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] =
Source.fromGraph(new QueueSource(bufferSize, overflowStrategy).withAttributes(DefaultAttributes.queueSource))
/**
* Start a new `Source` from some resource which can be opened, read and closed.
* Interaction with the resource happens in a blocking way.
*
* Example:
* {{{
* Source.unfoldResource(
* () => new BufferedReader(new FileReader("...")),
* reader => Option(reader.readLine()),
* reader => reader.close())
* }}}
*
* You can use the supervision strategy to handle exceptions from the `read` function. All exceptions thrown by `create`
* or `close` will fail the stream.
*
* The `Restart` supervision strategy will close the resource and create it again. The default strategy is `Stop`,
* which means that the stream will be terminated on an error in the `read` function.
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param create - function that is called on stream start and creates/opens the resource.
* @param read - function that reads data from the opened resource. It is called each time a backpressure signal
* is received. The stream calls `close` and completes when `read` returns `None`.
* @param close - function that closes the resource
*/
def unfoldResource[T, S](create: () ⇒ S, read: (S) ⇒ Option[T], close: (S) ⇒ Unit): Source[T, NotUsed] =
Source.fromGraph(new UnfoldResourceSource(create, read, close))
/**
* Start a new `Source` from some resource which can be opened, read and closed.
* It is similar to `unfoldResource` but takes functions that return a `Future` instead of plain values.
*
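* Example (an editorial sketch, not part of the original scaladoc; assumes the relevant imports and an
* implicit `ExecutionContext` are in scope for wrapping the blocking calls in futures):
* {{{
* Source.unfoldResourceAsync[String, BufferedReader](
*   () => Future(new BufferedReader(new FileReader("..."))),
*   reader => Future(Option(reader.readLine())),
*   reader => Future { reader.close(); Done })
* }}}
*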
* You can use the supervision strategy to handle exceptions from the `read` function or failures of the returned futures.
* All exceptions thrown by `create` or `close`, as well as failures of the returned futures, will fail the stream.
*
* The `Restart` supervision strategy will close the resource and create it again. The default strategy is `Stop`,
* which means that the stream will be terminated on an error in the `read` function (or its `Future`).
*
* You can configure the default dispatcher for this Source by changing the `akka.stream.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* @param create - function that is called on stream start and creates/opens the resource.
* @param read - function that reads data from the opened resource. It is called each time a backpressure signal
* is received. The stream calls `close` and completes when the `Future` from `read` completes with `None`.
* @param close - function that closes the resource
*/
def unfoldResourceAsync[T, S](create: () ⇒ Future[S], read: (S) ⇒ Future[Option[T]], close: (S) ⇒ Future[Done]): Source[T, NotUsed] =
Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close))
}
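// Editorial sketch (not part of the commit): overriding the dispatcher for a single unfoldResource
// source via ActorAttributes, as mentioned in the scaladoc above. "my-blocking-dispatcher" is a
// hypothetical dispatcher that would have to be defined in the application configuration; the tests
// in this commit assert that the default is akka.stream.default-blocking-io-dispatcher.
import java.io.{ BufferedReader, FileReader }
import akka.NotUsed
import akka.stream.ActorAttributes
import akka.stream.scaladsl.Source

object DispatcherSketch {
  val lines: Source[String, NotUsed] =
    Source.unfoldResource[String, BufferedReader](
      () ⇒ new BufferedReader(new FileReader("lines.txt")),
      reader ⇒ Option(reader.readLine()),
      reader ⇒ reader.close())
      .withAttributes(ActorAttributes.dispatcher("my-blocking-dispatcher"))
}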