feat: Add mapWithResource stream operator. (#931)

He-Pin(kerr) 2024-01-16 18:29:05 +08:00 committed by GitHub
parent d6ae9979f9
commit bd8ee25b80
14 changed files with 885 additions and 2 deletions

View file

@ -0,0 +1,69 @@
# mapWithResource
Map elements with the help of a resource that can be opened, used to transform each element (in a blocking way), and closed.
@ref[Simple operators](../index.md#simple-operators)
## Signature
@apidoc[Flow.mapWithResource](Flow) { scala="#mapWithResource%5BS%2C%20T%5D%28create%3A%20%28%29%20%3D%3E%20S%29%28f%3A%20%28S%2C%20Out%29%20%3D%3E%20T%2C%20close%3A%20S%20%3D%3E%20Option%5BT%5D%29%3A%20Repr%5BT%5D" java="#mapWithResource(org.apache.pekko.japi.function.Creator,org.apache.pekko.japi.function.Function2,org.apache.pekko.japi.function.Function)" }
1. `create`: Open or create the resource.
2. `f`: Transform each input element with the help of the resource.
3. `close`: Close the resource; invoked when the stream completes or fails, optionally outputting a last element.
## Description
Transform each stream element with the help of a resource.
The functions are by default called on Pekko's dispatcher for blocking IO to avoid interfering with other stream operations.
See @ref:[Blocking Needs Careful Management](../../../typed/dispatchers.md#blocking-needs-careful-management) for an explanation on why this is important.
The resource creation function is invoked once when the stream is materialized, and the returned resource is passed to the mapping function for mapping the first element. The mapping function returns a mapped element to emit downstream. The returned `T` MUST NOT be `null`, as `null` is illegal as a stream element according to the Reactive Streams specification.
The `close` function is called only once, when upstream or downstream completes normally or exceptionally:
- when upstream completes or fails, the optional value returned by `close` is emitted downstream if defined.
- when downstream cancels or fails, the optional value returned by `close` is ignored.
- when the stream shuts down abruptly, the optional value returned by `close` is ignored.
You can do any clean-up of the resource in `close`, as sketched below.
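As a minimal sketch (the counter resource and element values here are purely illustrative), the value returned by `close` becomes a trailing element because upstream completes:

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem("map-with-resource-docs")

Source(List("a", "b", "c"))
  .mapWithResource(() => new AtomicInteger)( // create: invoked once, when the stream is materialized
    (counter, elem) => s"$elem-${counter.incrementAndGet()}", // f: transform each element with the resource
    counter => Some(s"processed ${counter.get()} elements")) // close: emitted because upstream completed
  .runForeach(println)
// prints a-1, b-2, c-3 and finally "processed 3 elements"
```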
Early completion can be achieved in combination with the @apidoc[Flow.takeWhile](Flow) operator, as sketched below.
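For instance, the mapping function can wrap each element in an `Option` and return `None` once the resource is exhausted, letting `takeWhile` complete the stream early, which in turn triggers `close`. A minimal sketch, assuming a local `lines.txt` file:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{ Files, Paths }

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem("map-with-resource-docs")

Source
  .repeat(()) // keep pulling until takeWhile stops the stream
  .mapWithResource(() => Files.newBufferedReader(Paths.get("lines.txt"), StandardCharsets.UTF_8))(
    (reader, _) => Option(reader.readLine()), // None once the file is exhausted
    reader => { reader.close(); None })
  .takeWhile(_.isDefined) // complete early instead of reading forever
  .collect { case Some(line) => line }
  .runForeach(println)
```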
See also @ref:[unfoldResource](../Source/unfoldResource.md), @ref:[unfoldResourceAsync](../Source/unfoldResourceAsync.md).
You can configure the default dispatcher for this operator by changing the `pekko.stream.materializer.blocking-io-dispatcher`
or set it for a given stream by using ActorAttributes, as sketched below.
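A minimal sketch of the latter, assuming a dispatcher named `my-blocking-dispatcher` is configured in `application.conf`:

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.ActorAttributes
import org.apache.pekko.stream.scaladsl.Source

implicit val system: ActorSystem = ActorSystem("map-with-resource-docs")

Source(List("a", "b"))
  .mapWithResource(() => new StringBuilder)(
    (buffer, elem) => { buffer.append(elem); elem },
    buffer => { system.log.info(s"resource saw: $buffer"); None })
  // run create/f/close on a custom dispatcher instead of the default blocking-io-dispatcher
  .withAttributes(ActorAttributes.dispatcher("my-blocking-dispatcher"))
  .runForeach(println)
```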
## Examples
Imagine we have a database API which may block when we perform a query,
and whose database connection can be reused for each query.
Scala
: @@snip [MapWithResource.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/MapWithResource.scala) { #mapWithResource-blocking-api }
Java
: @@snip [MapWithResource.java](/docs/src/test/java/jdocs/stream/operators/sourceorflow/MapWithResource.java) { #mapWithResource-blocking-api }
Let's see how we make use of the API above safely through `mapWithResource`:
Scala
: @@snip [MapWithResource.scala](/docs/src/test/scala/docs/stream/operators/sourceorflow/MapWithResource.scala) { #mapWithResource }
Java
: @@snip [MapWithResource.java](/docs/src/test/java/jdocs/stream/operators/sourceorflow/MapWithResource.java) { #mapWithResource }
In this example we retrieve data from two tables with the same shared connection, and flatten the results
into individual records with @scala[`mapConcat(identity)`]@java[`mapConcat(elems -> elems)`]; once done, the connection is closed.
## Reactive Streams semantics
@@@div { .callout }
**emits** the mapping function returns an element and downstream is ready to consume it
**backpressures** downstream backpressures
**completes** upstream completes
**cancels** downstream cancels
@@@

View file

@ -171,6 +171,7 @@ depending on being backpressured by downstream or not.
|Source/Flow|<a name="logwithmarker"></a>@ref[logWithMarker](Source-or-Flow/logWithMarker.md)|Log elements flowing through the stream as well as completion and erroring.|
|Source/Flow|<a name="map"></a>@ref[map](Source-or-Flow/map.md)|Transform each element in the stream by calling a mapping function with it and passing the returned value downstream.|
|Source/Flow|<a name="mapconcat"></a>@ref[mapConcat](Source-or-Flow/mapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.|
|Source/Flow|<a name="mapwithresource"></a>@ref[mapWithResource](Source-or-Flow/mapWithResource.md)|Map elements with the help of a resource that can be opened, transform each element (in a blocking way) and closed.|
|Source/Flow|<a name="prematerialize"></a>@ref[preMaterialize](Source-or-Flow/preMaterialize.md)|Materializes this Graph, immediately returning (1) its materialized value, and (2) a new pre-materialized Graph.|
|Source/Flow|<a name="reduce"></a>@ref[reduce](Source-or-Flow/reduce.md)|Start with first element and then apply the current and next value to the given function, when upstream complete the current value is emitted downstream.|
|Source/Flow|<a name="scan"></a>@ref[scan](Source-or-Flow/scan.md)|Emit its current value, which starts at `zero`, and then apply the current and next value to the given function, emitting the next current value.|
@ -522,6 +523,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md)
* [mapConcat](Source-or-Flow/mapConcat.md)
* [mapError](Source-or-Flow/mapError.md)
* [mapWithResource](Source-or-Flow/mapWithResource.md)
* [maybe](Source/maybe.md)
* [merge](Source-or-Flow/merge.md)
* [mergeAll](Source-or-Flow/mergeAll.md)

View file

@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package jdocs.stream.operators.sourceorflow;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.javadsl.Source;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
public interface MapWithResource {
// #mapWithResource-blocking-api
interface DBDriver {
Connection create(URL url, String userName, String password);
}
interface Connection {
void close();
}
interface Database {
// blocking query
QueryResult doQuery(Connection connection, String query);
}
interface QueryResult {
boolean hasMore();
// potentially blocking retrieval of each element
DatabaseRecord next();
// potentially blocking retrieval of all elements
List<DatabaseRecord> toList();
}
interface DatabaseRecord {}
// #mapWithResource-blocking-api
default void mapWithResourceExample() {
final ActorSystem system = null;
final URL url = null;
final String userName = "Akka";
final String password = "Hakking";
final DBDriver dbDriver = null;
// #mapWithResource
// some database client for the JVM
final Database db = null;
Source.from(
Arrays.asList(
"SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;",
"SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;"))
.mapWithResource(
() -> dbDriver.create(url, userName, password),
(connection, query) -> db.doQuery(connection, query).toList(),
connection -> {
connection.close();
return Optional.empty();
})
.mapConcat(elems -> elems)
.runForeach(System.out::println, system);
// #mapWithResource
}
}

View file

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package docs.stream.operators.sourceorflow
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.Source
import java.net.URL
object MapWithResource {
implicit val actorSystem: ActorSystem = ???
// #mapWithResource-blocking-api
trait DBDriver {
def create(url: URL, userName: String, password: String): Connection
}
trait Connection {
def close(): Unit
}
trait Database {
// blocking query
def doQuery(connection: Connection, query: String): QueryResult = ???
}
trait QueryResult {
def hasMore: Boolean
// potentially blocking retrieval of each element
def next(): DataBaseRecord
// potentially blocking retrieval of all elements
def toList(): List[DataBaseRecord]
}
trait DataBaseRecord
// #mapWithResource-blocking-api
val url: URL = ???
val userName = "Akka"
val password = "Hakking"
val dbDriver: DBDriver = ???
def mapWithResourceExample(): Unit = {
// #mapWithResource
// some database client for the JVM
val db: Database = ???
Source(
List(
"SELECT * FROM shop ORDER BY article-0000 order by gmtModified desc limit 100;",
"SELECT * FROM shop ORDER BY article-0001 order by gmtModified desc limit 100;"))
.mapWithResource(() => dbDriver.create(url, userName, password))(
(connection, query) => db.doQuery(connection, query).toList(),
conn => {
conn.close()
None
})
.mapConcat(identity)
.runForeach(println)
// #mapWithResource
}
}

View file

@ -36,6 +36,7 @@ import org.reactivestreams.Publisher;
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@ -216,6 +217,26 @@ public class FlowTest extends StreamTest {
Assert.assertEquals("[1, 2][3, 4][5]", grouped.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@Test
public void mustBeAbleToUseMapWithResource() {
final AtomicBoolean gate = new AtomicBoolean(true);
Source.from(Arrays.asList("1", "2", "3"))
.via(
Flow.of(String.class)
.mapWithResource(
() -> "resource",
(resource, elem) -> elem,
(resource) -> {
gate.set(false);
return Optional.of("end");
}))
.runWith(TestSink.create(system), system)
.request(4)
.expectNext("1", "2", "3", "end")
.expectComplete();
Assert.assertFalse(gate.get());
}
@Test
public void mustBeAbleToUseIntersperse() throws Exception {
final TestKit probe = new TestKit(system);

View file

@ -46,6 +46,7 @@ import scala.util.Try;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@ -796,6 +797,24 @@ public class SourceTest extends StreamTest {
Assert.assertEquals("12345", result.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
@Test
public void mustBeAbleToUseMapWithResource() {
final AtomicBoolean gate = new AtomicBoolean(true);
Source.from(Arrays.asList("1", "2", "3"))
.mapWithResource(
() -> "resource",
(resource, elem) -> elem,
(resource) -> {
gate.set(false);
return Optional.of("end");
})
.runWith(TestSink.create(system), system)
.request(4)
.expectNext("1", "2", "3", "end")
.expectComplete();
Assert.assertFalse(gate.get());
}
@Test
public void mustBeAbleToUseIntersperse() throws Exception {
final TestKit probe = new TestKit(system);

View file

@ -0,0 +1,417 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pekko.stream.scaladsl
import java.io.BufferedReader
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.{ nowarn, tailrec }
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration.DurationInt
import scala.util.Success
import scala.util.control.NoStackTrace
import com.google.common.jimfs.{ Configuration, Jimfs }
import org.apache.pekko
import pekko.Done
import pekko.stream.{ AbruptTerminationException, ActorAttributes, ActorMaterializer, SystemMaterializer }
import pekko.stream.ActorAttributes.supervisionStrategy
import pekko.stream.Supervision.{ restartingDecider, resumingDecider }
import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import pekko.stream.impl.StreamSupervisor.Children
import pekko.stream.testkit.{ StreamSpec, TestSubscriber }
import pekko.stream.testkit.Utils.{ assertDispatcher, TE, UnboundedMailboxConfig }
import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
import pekko.testkit.EventFilter
import pekko.util.ByteString
class FlowMapWithResourceSpec extends StreamSpec(UnboundedMailboxConfig) {
private val fs = Jimfs.newFileSystem("MapWithResourceSpec", Configuration.unix())
private val ex = new Exception("TEST") with NoStackTrace
private val manyLines = {
("a" * 100 + "\n") * 10 +
("b" * 100 + "\n") * 10 +
("c" * 100 + "\n") * 10 +
("d" * 100 + "\n") * 10 +
("e" * 100 + "\n") * 10 +
("f" * 100 + "\n") * 10
}
private val manyLinesArray = manyLines.split("\n")
private val manyLinesPath = {
val file = Files.createFile(fs.getPath("/testMapWithResource.dat"))
Files.write(file, manyLines.getBytes(StandardCharsets.UTF_8))
}
private def newBufferedReader() = Files.newBufferedReader(manyLinesPath, StandardCharsets.UTF_8)
private def readLines(reader: BufferedReader, maxCount: Int): List[String] = {
if (maxCount == 0) {
return List.empty
}
@tailrec
def loop(builder: ListBuffer[String], n: Int): ListBuffer[String] = {
if (n == 0) {
builder
} else {
val line = reader.readLine()
if (line eq null)
builder
else {
builder += line
loop(builder, n - 1)
}
}
}
loop(ListBuffer.empty, maxCount).result()
}
"MapWithResource" must {
"can read contents from a file" in {
val p = Source(List(1, 10, 20, 30))
.mapWithResource(() => newBufferedReader())((reader, count) => {
readLines(reader, count)
},
reader => {
reader.close()
None
})
.mapConcat(identity)
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
val chunks = manyLinesArray.toList.iterator
sub.request(1)
c.expectNext() should ===(chunks.next())
sub.request(1)
c.expectNext() should ===(chunks.next())
c.expectNoMessage(300.millis)
while (chunks.hasNext) {
sub.request(1)
c.expectNext() should ===(chunks.next())
}
sub.request(1)
c.expectComplete()
}
"continue when Strategy is Resume and exception happened" in {
val p = Source
.repeat(1)
.take(100)
.mapWithResource(() => newBufferedReader())((reader, _) => {
val s = reader.readLine()
if (s != null && s.contains("b")) throw TE("") else s
},
reader => {
reader.close()
None
})
.withAttributes(supervisionStrategy(resumingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
(0 to 49).foreach(i => {
sub.request(1)
c.expectNext() should ===(if (i < 10) manyLinesArray(i) else manyLinesArray(i + 10))
})
sub.request(1)
c.expectComplete()
}
"close and open stream again when Strategy is Restart" in {
val p = Source
.repeat(1)
.take(100)
.mapWithResource(() => newBufferedReader())((reader, _) => {
val s = reader.readLine()
if (s != null && s.contains("b")) throw TE("") else s
},
reader => {
reader.close()
None
})
.withAttributes(supervisionStrategy(restartingDecider))
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
val sub = c.expectSubscription()
(0 to 19).foreach(_ => {
sub.request(1)
c.expectNext() should ===(manyLinesArray(0))
})
sub.cancel()
}
"work with ByteString as well" in {
val chunkSize = 50
val buffer = new Array[Char](chunkSize)
val p = Source
.repeat(1)
.mapWithResource(() => newBufferedReader())((reader, _) => {
val s = reader.read(buffer)
if (s > 0) Some(ByteString(buffer.mkString("")).take(s)) else None
},
reader => {
reader.close()
None
})
.takeWhile(_.isDefined)
.collect {
case Some(bytes) => bytes
}
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[ByteString]()
var remaining = manyLines
def nextChunk() = {
val (chunk, rest) = remaining.splitAt(chunkSize)
remaining = rest
chunk
}
p.subscribe(c)
val sub = c.expectSubscription()
(0 to 121).foreach(_ => {
sub.request(1)
c.expectNext().utf8String should ===(nextChunk())
})
sub.request(1)
c.expectComplete()
}
"use dedicated blocking-io-dispatcher by default" in {
val p = Source
.single(1)
.mapWithResource(() => newBufferedReader())((reader, _) => Option(reader.readLine()),
reader => {
reader.close()
None
})
.runWith(TestSink.probe)
SystemMaterializer(system).materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "mapWithResource").get
try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
finally p.cancel()
}
"fail when create throws exception" in {
EventFilter[TE](occurrences = 1).intercept {
val p = Source
.single(1)
.mapWithResource[BufferedReader, String](() => throw TE(""))((reader, _) => reader.readLine(),
reader => {
reader.close()
None
})
.runWith(Sink.asPublisher(false))
val c = TestSubscriber.manualProbe[String]()
p.subscribe(c)
c.expectSubscription()
c.expectError(TE(""))
}
}
"fail when close throws exception" in {
val (pub, sub) = TestSource[Int]()
.mapWithResource(() => Iterator("a"))((it, _) => if (it.hasNext) Some(it.next()) else None, _ => throw TE(""))
.collect { case Some(line) => line }
.toMat(TestSink())(Keep.both)
.run()
pub.ensureSubscription()
sub.ensureSubscription()
sub.request(1)
pub.sendNext(1)
sub.expectNext("a")
pub.sendComplete()
sub.expectError(TE(""))
}
"not close the resource twice when read fails" in {
val closedCounter = new AtomicInteger(0)
val probe = Source
.repeat(1)
.mapWithResource(() => 23)( // the best resource there is
(_, _) => throw TE("failing read"),
_ => {
closedCounter.incrementAndGet()
None
})
.runWith(TestSink.probe[Int])
probe.request(1)
probe.expectError(TE("failing read"))
closedCounter.get() should ===(1)
}
"not close the resource twice when read fails and then close fails" in {
val closedCounter = new AtomicInteger(0)
val probe = Source
.repeat(1)
.mapWithResource(() => 23)((_, _) => throw TE("failing read"),
_ => {
closedCounter.incrementAndGet()
if (closedCounter.get == 1) throw TE("boom")
None
})
.runWith(TestSink.probe[Int])
EventFilter[TE](occurrences = 1).intercept {
probe.request(1)
probe.expectError(TE("boom"))
}
closedCounter.get() should ===(1)
}
"will close the resource when upstream complete" in {
val closedCounter = new AtomicInteger(0)
val (pub, sub) = TestSource
.probe[Int]
.mapWithResource(() => newBufferedReader())((reader, count) => readLines(reader, count),
reader => {
reader.close()
closedCounter.incrementAndGet()
Some(List("End"))
})
.mapConcat(identity)
.toMat(TestSink.probe)(Keep.both)
.run()
sub.expectSubscription().request(2)
pub.sendNext(1)
sub.expectNext(manyLinesArray(0))
pub.sendComplete()
sub.expectNext("End")
sub.expectComplete()
closedCounter.get shouldBe 1
}
"will close the resource when upstream fail" in {
val closedCounter = new AtomicInteger(0)
val (pub, sub) = TestSource
.probe[Int]
.mapWithResource(() => newBufferedReader())((reader, count) => readLines(reader, count),
reader => {
reader.close()
closedCounter.incrementAndGet()
Some(List("End"))
})
.mapConcat(identity)
.toMat(TestSink.probe)(Keep.both)
.run()
sub.expectSubscription().request(2)
pub.sendNext(1)
sub.expectNext(manyLinesArray(0))
pub.sendError(ex)
sub.expectNext("End")
sub.expectError(ex)
closedCounter.get shouldBe 1
}
"will close the resource when downstream cancel" in {
val closedCounter = new AtomicInteger(0)
val (pub, sub) = TestSource
.probe[Int]
.mapWithResource(() => newBufferedReader())((reader, count) => readLines(reader, count),
reader => {
reader.close()
closedCounter.incrementAndGet()
Some(List("End"))
})
.mapConcat(identity)
.toMat(TestSink.probe)(Keep.both)
.run()
val subscription = sub.expectSubscription()
subscription.request(2)
pub.sendNext(1)
sub.expectNext(manyLinesArray(0))
subscription.cancel()
pub.expectCancellation()
closedCounter.get shouldBe 1
}
"will close the resource when downstream fail" in {
val closedCounter = new AtomicInteger(0)
val (pub, sub) = TestSource
.probe[Int]
.mapWithResource(() => newBufferedReader())((reader, count) => readLines(reader, count),
reader => {
reader.close()
closedCounter.incrementAndGet()
Some(List("End"))
})
.mapConcat(identity)
.toMat(TestSink.probe)(Keep.both)
.run()
sub.request(2)
pub.sendNext(2)
sub.expectNext(manyLinesArray(0))
sub.expectNext(manyLinesArray(1))
sub.cancel(ex)
pub.expectCancellationWithCause(ex)
closedCounter.get shouldBe 1
}
"will close the resource on abrupt materializer termination" in {
@nowarn("msg=deprecated")
val mat = ActorMaterializer()
val promise = Promise[Done]()
val matVal = Source
.single(1)
.mapWithResource(() => {
newBufferedReader()
})((reader, count) => readLines(reader, count),
reader => {
reader.close()
promise.complete(Success(Done))
Some(List("End"))
})
.mapConcat(identity)
.runWith(Sink.never)(mat)
mat.shutdown()
matVal.failed.futureValue shouldBe an[AbruptTerminationException]
Await.result(promise.future, 3.seconds) shouldBe Done
}
}
override def afterTermination(): Unit = {
fs.close()
}
}

View file

@ -44,6 +44,7 @@ import pekko.stream.Attributes._
val mapAsyncUnordered = name("mapAsyncUnordered")
val mapAsyncPartition = name("mapAsyncPartition")
val mapAsyncPartitionUnordered = name("mapAsyncPartitionUnordered")
val mapWithResource = name("mapWithResource") and IODispatcher
val ask = name("ask")
val grouped = name("grouped")
val groupedWithin = name("groupedWithin")

View file

@ -2225,7 +2225,7 @@ private[pekko] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) =
private val out = Outlet[Out]("StatefulMap.out")
override val shape: FlowShape[In, Out] = FlowShape(in, out)
override protected def initialAttributes: Attributes = DefaultAttributes.statefulMap and SourceLocation.forLambda(f)
override protected def initialAttributes: Attributes = Attributes(SourceLocation.forLambda(f))
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {

View file

@ -787,6 +787,47 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
(s: S, out: Out) => f.apply(s, out).toScala,
(s: S) => onComplete.apply(s).toScala))
/**
* Transform each stream element with the help of a resource.
*
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
*
* The `close` function is called only once, when the upstream or downstream finishes or fails. You can do any clean-up of the resource here;
* if the returned value is not empty, it will be emitted downstream if downstream is still available, otherwise it will be dropped.
*
* Early completion can be achieved in combination with the [[takeWhile]] operator.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* You can configure the default dispatcher for this operator by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
* set it for a given operator by using [[ActorAttributes]].
*
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @tparam R the type of the resource
* @tparam T the type of the output elements
* @param create function that creates the resource
* @param f function that transforms the upstream element and the resource to output element
* @param close function that closes the resource, optionally outputting a last element
* @since 1.1.0
*/
def mapWithResource[R, T](
create: function.Creator[R],
f: function.Function2[R, Out, T],
close: function.Function[R, Optional[T]]): javadsl.Flow[In, T, Mat] =
new Flow(
delegate.mapWithResource(() => create.create())(
(resource, out) => f(resource, out),
resource => close.apply(resource).toScala))
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,

View file

@ -2500,6 +2500,47 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
(s: S, out: Out) => f.apply(s, out).toScala,
(s: S) => onComplete.apply(s).toScala))
/**
* Transform each stream element with the help of a resource.
*
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
*
* The `close` function is called only once, when the upstream or downstream finishes or fails. You can do any clean-up of the resource here;
* if the returned value is not empty, it will be emitted downstream if downstream is still available, otherwise it will be dropped.
*
* Early completion can be achieved in combination with the [[takeWhile]] operator.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @tparam R the type of the resource
* @tparam T the type of the output elements
* @param create function that creates the resource
* @param f function that transforms the upstream element and the resource to output element
* @param close function that closes the resource, optionally outputting a last element
* @since 1.1.0
*/
def mapWithResource[R, T](
create: function.Creator[R],
f: function.Function2[R, Out, T],
close: function.Function[R, Optional[T]]): javadsl.Source[T, Mat] =
new Source(
delegate.mapWithResource(() => create.create())(
(resource, out) => f(resource, out),
resource => close.apply(resource).toScala))
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,

View file

@ -244,6 +244,47 @@ class SubFlow[In, Out, Mat](
(s: S, out: Out) => f.apply(s, out).toScala,
(s: S) => onComplete.apply(s).toScala))
/**
* Transform each stream element with the help of a resource.
*
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
*
* The `close` function is called only once, when the upstream or downstream finishes or fails. You can do any clean-up of the resource here;
* if the returned value is not empty, it will be emitted downstream if downstream is still available, otherwise it will be dropped.
*
* Early completion can be achieved in combination with the [[takeWhile]] operator.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* You can configure the default dispatcher for this operator by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
* set it for a given operator by using [[ActorAttributes]].
*
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @tparam R the type of the resource
* @tparam T the type of the output elements
* @param create function that creates the resource
* @param f function that transforms the upstream element and the resource to output element
* @param close function that closes the resource, optionally outputting a last element
* @since 1.1.0
*/
def mapWithResource[R, T](
create: function.Creator[R],
f: function.Function2[R, Out, T],
close: function.Function[R, Optional[T]]): javadsl.SubFlow[In, T, Mat] =
new SubFlow(
delegate.mapWithResource(() => create.create())(
(resource, out) => f(resource, out),
resource => close.apply(resource).toScala))
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,

View file

@ -235,6 +235,47 @@ class SubSource[Out, Mat](
(s: S, out: Out) => f.apply(s, out).toScala,
(s: S) => onComplete.apply(s).toScala))
/**
* Transform each stream element with the help of a resource.
*
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
*
* The `close` function is called only once, when the upstream or downstream finishes or fails. You can do any clean-up of the resource here;
* if the returned value is not empty, it will be emitted downstream if downstream is still available, otherwise it will be dropped.
*
* Early completion can be achieved in combination with the [[takeWhile]] operator.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* You can configure the default dispatcher for this Source by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
* set it for a given Source by using [[ActorAttributes]].
*
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @tparam R the type of the resource
* @tparam T the type of the output elements
* @param create function that creates the resource
* @param f function that transforms the upstream element and the resource to output element
* @param close function that closes the resource, optionally outputting a last element
* @since 1.1.0
*/
def mapWithResource[R, T](
create: function.Creator[R],
f: function.Function2[R, Out, T],
close: function.Function[R, Optional[T]]): javadsl.SubSource[T, Mat] =
new SubSource(
delegate.mapWithResource(() => create.create())(
(resource, out) => f(resource, out),
resource => close.apply(resource).toScala))
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,

View file

@ -1100,7 +1100,47 @@ trait FlowOps[+Out, +Mat] {
* @param onComplete a function that transforms the ongoing state into an optional output element
*/
def statefulMap[S, T](create: () => S)(f: (S, Out) => (S, T), onComplete: S => Option[T]): Repr[T] =
via(new StatefulMap[S, Out, T](create, f, onComplete))
via(new StatefulMap[S, Out, T](create, f, onComplete).withAttributes(DefaultAttributes.statefulMap))
/**
* Transform each stream element with the help of a resource.
*
* The resource creation function is invoked once when the stream is materialized and the returned resource is passed to
* the mapping function for mapping the first element. The mapping function returns a mapped element to emit
* downstream. The returned `T` MUST NOT be `null` as it is illegal as stream element - according to the Reactive Streams specification.
*
* The `close` function is called only once, when the upstream or downstream finishes or fails. You can do any clean-up of the resource here;
* if the returned value is not empty, it will be emitted downstream if downstream is still available, otherwise it will be dropped.
*
* Early completion can be achieved in combination with the [[takeWhile]] operator.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* You can configure the default dispatcher for this operator by changing the `pekko.stream.materializer.blocking-io-dispatcher` or
* set it for a given operator by using [[ActorAttributes]].
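*
* A minimal usage sketch (the `StringWriter` resource and element values here are purely illustrative):
* {{{
* Source(List("a", "b", "c"))
*   .mapWithResource(() => new java.io.StringWriter)(
*     (writer, elem) => { writer.write(elem); elem.toUpperCase },
*     writer => { writer.close(); None })
* }}}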
*
* '''Emits when''' the mapping function returns an element and downstream is ready to consume it
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
* @tparam R the type of the resource
* @tparam T the type of the output elements
* @param create function that creates the resource
* @param f function that transforms the upstream element and the resource to output element
* @param close function that closes the resource, optionally outputting a last element
* @since 1.1.0
*/
def mapWithResource[R, T](create: () => R)(f: (R, Out) => T, close: R => Option[T]): Repr[T] =
via(
new StatefulMap[R, Out, T](
create,
(resource, out) => (resource, f(resource, out)),
resource => close(resource))
.withAttributes(DefaultAttributes.mapWithResource))
/**
* Transform each input element into an `Iterable` of output elements that is