+str #24229 back to Future[] API for stream refs

nitpicks
This commit is contained in:
Konrad Malawski 2018-01-22 19:13:40 +09:00 committed by Konrad `ktoso` Malawski
parent c5a2785c7c
commit 6264f8ea70
28 changed files with 242 additions and 514 deletions

Binary image changed (not shown): before 129 KiB, after 95 KiB.

Binary image changed (not shown): before 83 KiB, after 95 KiB.

View file

@@ -5,8 +5,7 @@ an Akka Cluster.
Unlike heavier "streaming data processing" frameworks, Akka Streams are not "deployed" nor automatically distributed.
Akka stream refs are, as the name implies, references to existing parts of a stream, and can be used to create a
distributed processing framework or introduce such capabilities in specific parts of your application, however they
are not on that level of abstraction by themselves.
distributed processing framework or introduce such capabilities in specific parts of your application.
Stream refs are trivial to make use of in existing clustered Akka applications, and require no additional configuration
or setup. They automatically maintain flow-control / back-pressure over the network, and employ Akka's failure detection
@@ -19,7 +18,7 @@ implement manually.
A useful way to think about stream refs is:
"like an `ActorRef`, but for Akka Streams's `Source` and `Sink`".
Since they refer to an already existing, possibly remote, `Sink` or `Source`.
Stream refs refer to an already existing, possibly remote, `Sink` or `Source`.
This is not to be mistaken with deploying streams remotely, which this feature is not intended for.
@@@
@@ -48,7 +47,7 @@ Actors would usually be used to establish the stream, by means of some initial m
"I want to offer you many log elements (the stream ref)", or alternatively in the opposite way "If you need
to send me much data, here is the stream ref you can use to do so".
Since the two sides ("local" and "remote") of reach reference may be confusing to simply refer to as
Since the two sides ("local" and "remote") of each reference may be confusing to simply refer to as
"remote" and "local" -- since either side can be seen as "local" or "remote" depending how we look at it --
we propose to use the terminology "origin" and "target", which is defined by where the stream ref was created.
For `SourceRef`s, the "origin" is the side which has the data that it is going to stream out. For `SinkRef`s
@@ -56,7 +55,7 @@ the "origin" side is the actor system that is ready to receive the data and has
two may be seen as duals of each other, however to explain patterns about sharing references, we found this
wording to be rather useful.
### Source Refs - offering streaming data over network
### Source Refs - offering streaming data to a remote system
A @scala[@scaladoc[`SourceRef`](akka.stream.SourceRef)]@java[@javadoc[`SourceRef`](akka.stream.SourceRef)]
can be offered to a remote actor system in order for it to consume some source of data that we have prepared
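To make the receiving side concrete, here is a minimal sketch of the "target" actor that accepts such an offer and attaches a local `Sink` to the remote data. It assumes the `SourceRef.source` accessor visible in the implementation classes further down in this commit; `LogsOffer` is an illustrative domain message mirroring the documentation example below.

```scala
import akka.actor.Actor
import akka.stream.{ ActorMaterializer, SourceRef }
import akka.stream.scaladsl.Sink

// illustrative domain message, mirroring the docs example in this commit
case class LogsOffer(streamId: Int, sourceRef: SourceRef[String])

class DataTarget extends Actor {
  implicit val mat = ActorMaterializer()(context)

  def receive = {
    case LogsOffer(id, ref) ⇒
      // attach any local Sink; flow-control is maintained over the network
      ref.source.runWith(Sink.foreach(line ⇒ println(s"[$id] $line")))
  }
}
```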
@@ -89,16 +88,17 @@ The process of preparing and running a `SourceRef` powered distributed stream is
A `SourceRef` is *by design* "single-shot", i.e. it may only be materialized once.
This is in order to not complicate the mental model of what materializing such a value would mean.
By being single-shot, we always know what it means, and on top of those semantics offer a fan-out
by emitting multiple `SourceRef`s which target the same `Source` that uses `Broadcast`.
This also allows for fine grained control how many streams a system can expect to be running
at the same time, which is useful for capacity planning and "allowed number of concurrent streams
limiting" of clients.
While stream refs are designed to be single-shot, you may use them to mimic multicast scenarios,
simply by starting a `Broadcast` stage once and attaching multiple new streams to it, emitting
a new stream ref for each of them. This way each output of the broadcast is by itself a unique single-shot
reference, however they can all be powered using a single `Source` -- located before the `Broadcast` stage (see the sketch after this note).
@@@
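As a concrete illustration of the fan-out described above, here is a hedged sketch (not part of this commit) that realizes the `Broadcast` idea dynamically with `BroadcastHub`: the upstream is started once, and every remote subscriber is handed its own fresh, single-shot `SourceRef`.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, SourceRef }
import akka.stream.scaladsl.{ BroadcastHub, Keep, Source, StreamRefs }
import scala.concurrent.Future

object FanOutExample extends App {
  implicit val system = ActorSystem("fan-out")
  implicit val mat = ActorMaterializer()

  // run the single upstream once; the resulting hub Source may be attached many times
  val hub: Source[Int, NotUsed] =
    Source(1 to 100).toMat(BroadcastHub.sink)(Keep.right).run()

  // every call mints a fresh single-shot SourceRef, all powered by the same upstream
  def offerNextRef(): Future[SourceRef[Int]] =
    hub.runWith(StreamRefs.sourceRef())
}
```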
### Sink Refs - offering to receive streaming data
### Sink Refs - offering to receive streaming data from a remote system
The dual of source references are A @scala[@scaladoc[`SourceRef`](akka.stream.SinkRef)]@java[@javadoc[`SourceRef`](akka.stream.SinkRef)]s. They can be used to offer the other side the capability to
The dual of @scala[@scaladoc[`SourceRef`](akka.stream.SinkRef)]@java[@javadoc[`SourceRef`](akka.stream.SinkRef)]s.
They can be used to offer the other side the capability to
send to the *origin* side data in a streaming, flow-controlled fashion. The origin here allocates a Sink,
which could be as simple as a `Sink.foreach` or as advanced as a complex sink which streams the incoming data
into various other systems (e.g. any of the Alpakka provided Sinks).
@@ -128,7 +128,7 @@ The process of preparing and running a `SinkRef` powered distributed stream is s
@@@ warning
A `SinkeRef` is *by design* "single-shot". i.e. it may only be materialized once.
A `SinkRef` is *by design* "single-shot". i.e. it may only be materialized once.
This is in order to not complicate the mental model of what materializing such a value would mean.
If you have a use case for building a fan-in operation accepting writes from multiple remote nodes,
@@ -153,10 +153,9 @@ of data such as huge log files, messages or even media, with as much ease as if
All stream references have a subscription timeout, which is intended to prevent resource leaks
in situations in which a remote node would request the allocation of many streams yet never actually run
them. In order to prevent this, each stream reference has a default timeout (of 30 seconds), after which
if it's "handed out" side has not been materialized, the origin will terminate with a timeout exception,
and IF the remote side eventually would be run afterwards, it would also immediately fail with an exception
pointing out that the origin seems to be missing.
them. In order to prevent this, each stream reference has a default timeout (of 30 seconds), after which the
origin will abort the stream offer if the target has not materialized the stream ref in time. After the
timeout has triggered, materialization of the target side will fail, pointing out that the origin is missing.
Since these timeouts are often very different based on the kind of stream offered, and there can be
many different kinds of them in the same application, it is possible to not only configure this setting
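The per-stream override that this (truncated) paragraph alludes to is exercised by the updated tests further down in this commit; as a quick, self-contained sketch, the attribute is applied to the stage that materializes the ref:

```scala
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, StreamRefAttributes }
import akka.stream.scaladsl.{ Source, StreamRefs }
import scala.concurrent.duration._

object SubscriptionTimeoutExample extends App {
  implicit val system = ActorSystem("timeouts")
  implicit val mat = ActorMaterializer()

  // note that the attribute is applied to the Sink that materializes the SourceRef
  Source.repeat("hello")
    .runWith(StreamRefs.sourceRef().addAttributes(
      StreamRefAttributes.subscriptionTimeout(5.seconds)))
}
```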

View file

@@ -8,6 +8,7 @@ import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.PatternsCS;
import akka.remote.WireFormats;
import akka.stream.*;
import akka.stream.javadsl.*;
@@ -21,6 +22,7 @@ import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletionStage;
import static org.junit.Assert.assertEquals;
@@ -61,9 +63,10 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest {
private void handleRequestLogs(RequestLogs requestLogs) {
Source<String, NotUsed> logs = streamLogs(requestLogs.streamId);
SourceRef<String> logsRef = logs.runWith(Sink.sourceRef(), mat);
LogsOffer offer = new LogsOffer(logsRef);
sender().tell(offer, self());
CompletionStage<SourceRef<String>> logsRef = logs.runWith(StreamRefs.sourceRef(), mat);
PatternsCS.pipe(logsRef.thenApply(ref -> new LogsOffer(ref)), context().dispatcher())
.to(sender());
}
private Source<String, NotUsed> streamLogs(long streamId) {
@@ -89,18 +92,18 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest {
}
//#offer-sink
class PrepareUpload {
static class PrepareUpload {
final String id;
public PrepareUpload(String id) {
this.id = id;
}
}
class MeasurementsSinkReady {
static class MeasurementsSinkReady {
final String id;
final SinkRef<String> sinkRef;
public PrepareUpload(String id, SinkRef<String> ref) {
public MeasurementsSinkReady(String id, SinkRef<String> ref) {
this.id = id;
this.sinkRef = ref;
}
@@ -112,15 +115,16 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest {
return receiveBuilder()
.match(PrepareUpload.class, prepare -> {
Sink<String, NotUsed> sink = logsSinkFor(prepare.id);
SinkRef<String> sinkRef = Source.sinkRef().to(sink).run(mat);
CompletionStage<SinkRef<String>> sinkRef = StreamRefs.<String>sinkRef().to(sink).run(mat);
sender().tell(new MeasurementsSinkReady(sinkRef), self());
PatternsCS.pipe(sinkRef.thenApply(ref -> new MeasurementsSinkReady(prepare.id, ref)), context().dispatcher())
.to(sender());
})
.create();
.build();
}
private Sink<String, NotUsed> logsSinkFor(String id) {
return Sink.ignore(); // would be actual useful Sink in reality
return Sink.<String>ignore().mapMaterializedValue(done -> NotUsed.getInstance());
}
}
//#offer-sink
@@ -132,10 +136,10 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest {
ActorRef receiver = system.actorOf(Props.create(DataReceiver.class), "dataReceiver");
receiver.tell(new PrepareUpload("system-42-tmp"), getTestActor());
MeasurementsSinkReady ready = expectMsgClass(LogsOffer.class);
MeasurementsSinkReady ready = expectMsgClass(MeasurementsSinkReady.class);
Source.repeat("hello")
.runWith(ready.sinkRef, mat);
.runWith(ready.sinkRef.getSink(), mat);
//#offer-sink-use
}};
}
@@ -149,11 +153,11 @@ public class FlowStreamRefsDocTest extends AbstractJavaTest {
// configuring Sink.sourceRef (notice that we apply the attributes to the Sink!):
Source.repeat("hello")
.runWith(Sink.sourceRef().addAttributes(timeoutAttributes), mat);
.runWith(StreamRefs.<String>sourceRef().addAttributes(timeoutAttributes), mat);
// configuring SinkRef.source:
Source.sinkRef().addAttributes(timeoutAttributes)
.runWith(Sink.ignore(), mat); // not very interesting sink, just an example
StreamRefs.<String>sinkRef().addAttributes(timeoutAttributes)
.runWith(Sink.<String>ignore(), mat); // not very interesting sink, just an example
//#attr-sub-timeout
}};

View file

@@ -9,18 +9,20 @@ import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.testkit.AkkaSpec
import docs.CompileOnlySpec
import scala.concurrent.Future
class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
"offer a source ref" in compileOnlySpec {
//#offer-source
import akka.stream.SourceRef
import akka.pattern.pipe
case class RequestLogs(streamId: Int)
case class LogsOffer(streamId: Int, sourceRef: SourceRef[String])
class DataSource extends Actor {
import context.dispatcher
implicit val mat = ActorMaterializer()(context)
def receive = {
@@ -29,13 +31,13 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
val source: Source[String, NotUsed] = streamLogs(streamId)
// materialize the SourceRef:
val ref: SourceRef[String] = source.runWith(Sink.sourceRef())
val ref: Future[SourceRef[String]] = source.runWith(StreamRefs.sourceRef())
// wrap the SourceRef in some domain message, such that the sender knows what source it is
val reply: LogsOffer = LogsOffer(streamId, ref)
val reply: Future[LogsOffer] = ref.map(LogsOffer(streamId, _))
// reply to sender
sender() ! reply
reply pipeTo sender()
}
def streamLogs(streamId: Long): Source[String, NotUsed] = ???
@@ -59,7 +61,7 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
"offer a sink ref" in compileOnlySpec {
//#offer-sink
import akka.pattern._
import akka.pattern.pipe
import akka.stream.SinkRef
case class PrepareUpload(id: String)
@@ -76,13 +78,13 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
val sink: Sink[String, NotUsed] = logsSinkFor(nodeId)
// materialize the SinkRef (the remote is like a source of data for us):
val ref: SinkRef[String] = Source.sinkRef[String]().to(sink).run()
val ref: Future[SinkRef[String]] = StreamRefs.sinkRef[String]().to(sink).run()
// wrap the SinkRef in some domain message, such that the sender knows what source it is
val reply: MeasurementsSinkReady = MeasurementsSinkReady(nodeId, ref)
val reply: Future[MeasurementsSinkReady] = ref.map(MeasurementsSinkReady(nodeId, _))
// reply to sender
sender() ! reply
reply pipeTo sender()
}
def logsSinkFor(nodeId: String): Sink[String, NotUsed] = ???
@@ -114,10 +116,10 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
// configuring Sink.sourceRef (notice that we apply the attributes to the Sink!):
Source.repeat("hello")
.runWith(Sink.sourceRef().addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds)))
.runWith(StreamRefs.sourceRef().addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds)))
// configuring SinkRef.source:
Source.sinkRef().addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds))
StreamRefs.sinkRef().addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds))
.runWith(Sink.ignore) // not very interesting Sink, just an example
//#attr-sub-timeout
}

View file

@@ -11,9 +11,10 @@ import akka.stream.testkit.TestPublisher
import akka.stream.testkit.scaladsl._
import akka.stream.{ ActorMaterializer, SinkRef, SourceRef, StreamRefAttributes }
import akka.testkit.{ AkkaSpec, ImplicitSender, SocketUtil, TestKit, TestProbe }
import akka.util.ByteString
import akka.util.{ ByteString, PrettyDuration }
import com.typesafe.config._
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.util.control.NoStackTrace
@@ -27,6 +28,7 @@ object StreamRefsSpec {
}
class DataSourceActor(probe: ActorRef) extends Actor with ActorLogging {
import context.dispatcher
implicit val mat = ActorMaterializer()
def receive = {
@@ -37,35 +39,35 @@ object StreamRefsSpec {
* For them it's a Source; for us it is a Sink we run data "into"
*/
val source: Source[String, NotUsed] = Source(List("hello", "world"))
val ref: SourceRef[String] = source.runWith(Sink.sourceRef())
val ref: Future[SourceRef[String]] = source.runWith(StreamRefs.sourceRef())
sender() ! ref
ref pipeTo sender()
case "give-infinite"
val source: Source[String, NotUsed] = Source.fromIterator(() Iterator.from(1)).map("ping-" + _)
val (r: NotUsed, ref: SourceRef[String]) = source.toMat(Sink.sourceRef())(Keep.both).run()
val (r: NotUsed, ref: Future[SourceRef[String]]) = source.toMat(StreamRefs.sourceRef())(Keep.both).run()
sender() ! ref
ref pipeTo sender()
case "give-fail"
val ref = Source.failed[String](new Exception("Booooom!") with NoStackTrace)
.runWith(Sink.sourceRef())
.runWith(StreamRefs.sourceRef())
sender() ! ref
ref pipeTo sender()
case "give-complete-asap"
val ref = Source.empty
.runWith(Sink.sourceRef())
.runWith(StreamRefs.sourceRef())
sender() ! ref
ref pipeTo sender()
case "give-subscribe-timeout"
val ref = Source.repeat("is anyone there?")
.toMat(Sink.sourceRef())(Keep.right) // attributes like this so they apply to the Sink.sourceRef
.toMat(StreamRefs.sourceRef())(Keep.right) // attributes like this so they apply to the Sink.sourceRef
.withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis))
.run()
sender() ! ref
ref pipeTo sender()
// case "send-bulk"
// /*
@@ -84,23 +86,23 @@ object StreamRefsSpec {
*
* For them it's a Sink; for us it's a Source.
*/
val sink: SinkRef[String] =
Source.sinkRef[String]()
val sink =
StreamRefs.sinkRef[String]()
.to(Sink.actorRef(probe, "<COMPLETE>"))
.run()
sender() ! sink
sink pipeTo sender()
case "receive-subscribe-timeout"
val sink = Source.sinkRef[String]()
val sink = StreamRefs.sinkRef[String]()
.withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis))
.to(Sink.actorRef(probe, "<COMPLETE>"))
.run()
sender() ! sink
sink pipeTo sender()
case "receive-32"
val (sink, driver) = Source.sinkRef[String]()
val (sink, driver) = StreamRefs.sinkRef[String]()
.toMat(TestSink.probe(context.system))(Keep.both)
.run()
@@ -117,7 +119,7 @@ object StreamRefsSpec {
"<COMPLETED>"
} pipeTo probe
sender() ! sink
sink pipeTo sender()
// case "receive-bulk"
// /*

View file

@@ -1,3 +0,0 @@
# #24230 stream refs - SourceRef / SinkRef
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.ActorMaterializerSettings.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.stage.GraphStageLogic#StageActor.this")

View file

@@ -9,13 +9,11 @@ import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props }
import akka.event.LoggingAdapter
import akka.util.Helpers.toRootLowerCase
import akka.stream.ActorMaterializerSettings.defaultMaxFixedBufferSize
import akka.stream.impl._
import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.duration._
import akka.japi.function
import akka.stream.impl.streamref.StreamRefSettingsImpl
import akka.stream.stage.GraphStageLogic
import scala.util.control.NoStackTrace
@@ -249,7 +247,8 @@ object ActorMaterializerSettings {
maxFixedBufferSize: Int) =
new ActorMaterializerSettings(
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, defaultMaxFixedBufferSize, defaultIoSettings, defaultStreamRefSettings)
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024),
StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref")))
/**
* Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Scala).
@@ -294,7 +293,8 @@ object ActorMaterializerSettings {
maxFixedBufferSize: Int) =
new ActorMaterializerSettings(
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, defaultMaxFixedBufferSize, defaultIoSettings, defaultStreamRefSettings)
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024),
StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref")))
/**
* Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Java).
@@ -308,10 +308,6 @@ object ActorMaterializerSettings {
def create(config: Config): ActorMaterializerSettings =
apply(config)
private val defaultMaxFixedBufferSize = 1000
private val defaultIoSettings = IOSettings(tcpWriteBufferSize = 16 * 1024)
// sadly due to the existence of the create-from-individual-parts methods, we need to replicate the defaults here from reference.conf...
private val defaultStreamRefSettings: StreamRefSettings = StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref"))
}
/**
@@ -345,6 +341,57 @@ final class ActorMaterializerSettings private (
requirePowerOfTwo(maxInputBufferSize, "maxInputBufferSize")
require(initialInputBufferSize <= maxInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maxInputBufferSize($maxInputBufferSize)")
// backwards compatibility for when IOSettings was added; shouldn't be needed since this is private, but added to satisfy MiMa
def this(
initialInputBufferSize: Int,
maxInputBufferSize: Int,
dispatcher: String,
supervisionDecider: Supervision.Decider,
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
debugLogging: Boolean,
outputBurstLimit: Int,
fuzzingMode: Boolean,
autoFusing: Boolean,
maxFixedBufferSize: Int,
syncProcessingLimit: Int,
ioSettings: IOSettings) =
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, ioSettings,
StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref")))
// backwards compatibility for when IOSettings was added; shouldn't be needed since this is private, but added to satisfy MiMa
def this(
initialInputBufferSize: Int,
maxInputBufferSize: Int,
dispatcher: String,
supervisionDecider: Supervision.Decider,
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
debugLogging: Boolean,
outputBurstLimit: Int,
fuzzingMode: Boolean,
autoFusing: Boolean,
maxFixedBufferSize: Int,
syncProcessingLimit: Int) =
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit,
IOSettings(tcpWriteBufferSize = 16 * 1024), StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref")))
// backwards compatibility for when IOSettings was added; shouldn't be needed since this is private, but added to satisfy MiMa
def this(
initialInputBufferSize: Int,
maxInputBufferSize: Int,
dispatcher: String,
supervisionDecider: Supervision.Decider,
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
debugLogging: Boolean,
outputBurstLimit: Int,
fuzzingMode: Boolean,
autoFusing: Boolean,
maxFixedBufferSize: Int) =
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, 1000, IOSettings(tcpWriteBufferSize = 16 * 1024),
StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref")))
private def copy(
initialInputBufferSize: Int = this.initialInputBufferSize,
maxInputBufferSize: Int = this.maxInputBufferSize,

View file

@@ -46,7 +46,7 @@ trait StreamRefSettings {
// --- with... methods ---
def withBufferCapacity(value: Int): StreamRefSettings
def withDemandRedeliveryInterval(value: scala.concurrent.duration.FiniteDuration): StreamRefSettings
def withDemandRedeliveryInterval(value: FiniteDuration): StreamRefSettings
def withSubscriptionTimeout(value: FiniteDuration): StreamRefSettings
}
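For reference, a hedged sketch of configuring these settings programmatically, starting from the reference defaults via the `Config`-based factory that the materializer-settings code in this commit itself uses:

```scala
import akka.stream.StreamRefSettings
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._

object StreamRefSettingsExample {
  // load the defaults from reference.conf, then override via the with... methods above
  val settings: StreamRefSettings =
    StreamRefSettings(ConfigFactory.load().getConfig("akka.stream.materializer.stream-ref"))
      .withBufferCapacity(64)
      .withDemandRedeliveryInterval(1.second)
      .withSubscriptionTimeout(30.seconds)
}
```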

View file

@@ -23,7 +23,7 @@ object SinkRef {
* to accept some data from it, and the remote one decides to accept the request to send data in a back-pressured
* streaming fashion -- using a sink ref.
*
* To create a [[SinkRef]] you have to materialize the `Sink` that you want to obtain a reference to by attaching it to a `Source.sinkRef`.
* To create a [[SinkRef]] you have to materialize the `Sink` that you want to obtain a reference to by attaching it to a `StreamRefs.sinkRef()`.
*
* Stream refs can be seen as Reactive Streams over network boundaries.
* See also [[akka.stream.SourceRef]] which is the dual of a `SinkRef`.
@@ -69,14 +69,16 @@ trait SourceRef[T] {
// --- exceptions ---
final case class TargetRefNotInitializedYetException()
extends IllegalStateException("Internal remote target actor ref not yet resolved, yet attempted to send messages to it. This should not happen due to proper flow-control, please open a ticket on the issue tracker: https://github.com/akka/akka")
extends IllegalStateException("Internal remote target actor ref not yet resolved, yet attempted to send messages to it. " +
"This should not happen due to proper flow-control, please open a ticket on the issue tracker: https://github.com/akka/akka")
final case class StreamRefSubscriptionTimeoutException(msg: String)
extends IllegalStateException(msg)
final case class RemoteStreamRefActorTerminatedException(msg: String) extends RuntimeException(msg)
final case class InvalidSequenceNumberException(expectedSeqNr: Long, gotSeqNr: Long, msg: String)
extends IllegalStateException(s"$msg (expected: $expectedSeqNr, got: $gotSeqNr)")
extends IllegalStateException(s"$msg (expected: $expectedSeqNr, got: $gotSeqNr). " +
s"In most cases this means that message loss on this connection has occurred and the stream will fail eagerly.")
/**
* Stream refs establish a connection between a local and remote actor, representing the origin and remote sides
@@ -89,4 +91,7 @@ final case class InvalidSequenceNumberException(expectedSeqNr: Long, gotSeqNr: L
* This is not meant as a security feature, but rather as a plain sanity check.
*/
final case class InvalidPartnerActorException(expectedRef: ActorRef, gotRef: ActorRef, msg: String)
extends IllegalStateException(s"$msg (expected: $expectedRef, got: $gotRef)")
extends IllegalStateException(s"$msg (expected: $expectedRef, got: $gotRef). " +
s"This may happen due to 'double-materialization' on the other side of this stream ref. " +
s"Do note that stream refs are one-shot references and have to be paired up in 1:1 pairs. " +
s"Multi-cast such as broadcast etc can be implemented by sharing multiple new stream references. ")

View file

@@ -24,11 +24,8 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber }
import scala.collection.immutable.Map
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, ExecutionContextExecutor, Future }
import scala.annotation.tailrec
import akka.util.{ OptionVal, PrettyDuration }
import scala.util.{ Failure, Success }
import scala.concurrent.ExecutionContextExecutor
import akka.util.OptionVal
/**
* INTERNAL API
@@ -433,7 +430,6 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
val traversalStack = new java.util.ArrayDeque[Traversal](16)
traversalStack.addLast(current)
var needsFlattening = false
val matValueStack = new java.util.ArrayDeque[Any](8)
if (Debug) {
@@ -481,18 +477,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
case compose: Compose ⇒
val second = matValueStack.removeLast()
val first = matValueStack.removeLast()
val result =
if (needsFlattening && (first.isInstanceOf[FlattenMatHolder[_]] || second.isInstanceOf[FlattenMatHolder[_]])) {
(first, second) match {
case (FlattenMatHolder(f1, t1), FlattenMatHolder(f2, t2)) ⇒
FlattenMatHolder[Any](f1.zip(f2).map({ case (left, right) ⇒ compose(left, right) })(system.dispatcher), t1) // FIXME dedicate a dispatcher thread?
case (FlattenMatHolder(f1, t1), v2) ⇒
FlattenMatHolder(f1.map(compose(_, v2))(system.dispatcher), t1) // FIXME dedicate a dispatcher thread?
case (v1, FlattenMatHolder(f2, t2)) ⇒
FlattenMatHolder(f2.map(compose(v1, _))(system.dispatcher), t2) // FIXME dedicate a dispatcher thread?
}
} else compose(first, second)
val result = compose(first, second)
matValueStack.addLast(result)
if (Debug) println(s"COMP: $matValueStack")
case PushAttributes(attr) ⇒
@@ -505,13 +490,6 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
islandTracking.enterIsland(tag, attributesStack.getLast)
case ExitIsland ⇒
islandTracking.exitIsland()
case flatten: FlattenMat ⇒
val prev = matValueStack.removeLast()
if (!prev.isInstanceOf[Future[_]]) throw new IllegalArgumentException("flattenMaterializedValue MUST be applied immediately after a materialized value ")
val result = FlattenMatHolder(prev.asInstanceOf[Future[Mat]], flatten.timeout)
needsFlattening = true
matValueStack.addLast(result)
if (Debug) println(s"FLTN: $matValueStack")
case _ ⇒
}
current = nextStep
@@ -525,23 +503,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
islandTracking.allNestedIslandsReady()
if (Debug) println("--- Finished materialization")
matValueStack.peekLast() match {
case FlattenMatHolder(f: Future[Mat @unchecked], t) ⇒
f.value match {
case Some(Success(m)) ⇒ m
case Some(Failure(ex)) ⇒ throw new Exception("Flattened materialized value failed!", ex)
case None ⇒
// last resort, await
val start = System.currentTimeMillis()
val mat = Await.result(f, t)
val stop = System.currentTimeMillis()
import scala.concurrent.duration._
println(s"Waiting took: >>> ${PrettyDuration.format((stop - start).millis)} <<<")
mat
}
case mat: Mat ⇒ mat
}
matValueStack.peekLast().asInstanceOf[Mat]
} finally {
if (isShutdown) throw shutdownWhileMaterializingFailure

View file

@@ -125,7 +125,6 @@ import akka.stream._
val fanoutPublisherSink = name("fanoutPublisherSink")
val ignoreSink = name("ignoreSink")
val actorRefSink = name("actorRefSink")
val futureFlattenSink = name("futureFlattenSink")
val actorRefWithAck = name("actorRefWithAckSink")
val actorSubscriberSink = name("actorSubscriberSink")
val queueSink = name("queueSink")

View file

@@ -13,8 +13,6 @@ import akka.util.OptionVal
import scala.language.existentials
import scala.collection.immutable.Map.Map1
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, Future }
/**
* INTERNAL API
@@ -156,12 +154,6 @@ import scala.concurrent.{ Await, Future }
@InternalApi private[akka] final case class Transform(mapper: AnyFunction1) extends MaterializedValueOp {
def apply(arg: Any): Any = mapper.asInstanceOf[Any ⇒ Any](arg)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class FlattenMat(timeout: FiniteDuration) extends MaterializedValueOp
/** INTERNAL API */
@InternalApi private[akka] final case class FlattenMatHolder[M](f: Future[M], timeout: FiniteDuration)
/**
* INTERNAL API
@@ -373,8 +365,6 @@ import scala.concurrent.{ Await, Future }
*/
def transformMat(f: AnyFunction1): TraversalBuilder
def flattenMat(timeout: FiniteDuration): TraversalBuilder
protected def internalSetAttributes(attributes: Attributes): TraversalBuilder
def setAttributes(attributes: Attributes): TraversalBuilder = {
@@ -490,9 +480,6 @@ import scala.concurrent.{ Await, Future }
override def transformMat(f: AnyFunction1): TraversalBuilder =
copy(traversalSoFar = traversalSoFar.concat(Transform(f)))
override def flattenMat(timeout: FiniteDuration): TraversalBuilder =
copy(traversalSoFar = traversalSoFar.concat(FlattenMat(timeout)))
override def offsetOf(in: InPort): Int = inToOffset(in)
override def isTraversalComplete: Boolean = true
@@ -545,9 +532,6 @@ import scala.concurrent.{ Await, Future }
override def transformMat(f: AnyFunction1): TraversalBuilder =
TraversalBuilder.empty().add(this, module.shape, Keep.right).transformMat(f)
override def flattenMat(timeout: FiniteDuration): TraversalBuilder =
TraversalBuilder.empty().add(this, module.shape, Keep.right).flattenMat(timeout)
override val inSlots: Int = module.shape.inlets.size
override def offsetOfModule(out: OutPort): Int = 0
@@ -1066,15 +1050,6 @@ import scala.concurrent.{ Await, Future }
copy(traversalSoFar = traversalSoFar.concat(Transform(f)))
}
/**
* UNSAFE API: May cause blocking DURING MATERIALIZATION.
* Use this API only in places where it is KNOWN that the flattened Future will be completed ASAP (less than a few ms, zero at best).
* WITH GREAT POWER, COMES GREAT RESPONSIBILITY.
*/
override def flattenMat(timeout: FiniteDuration): LinearTraversalBuilder = {
copy(traversalSoFar = traversalSoFar.concat(FlattenMat(timeout)))
}
/**
* Wraps the builder in an island that can be materialized differently, using async boundaries to bridge
* between islands.
@@ -1326,9 +1301,6 @@ import scala.concurrent.{ Await, Future }
override def transformMat(f: AnyFunction1): TraversalBuilder = {
copy(finalSteps = finalSteps.concat(Transform(f)))
}
override def flattenMat(timeout: FiniteDuration): TraversalBuilder = {
copy(finalSteps = finalSteps.concat(FlattenMat(timeout)))
}
override def makeIsland(islandTag: IslandTag): TraversalBuilder = {
this.islandTag match {

View file

@@ -371,90 +371,6 @@ import scala.util.control.NonFatal
override def toString: String = "FutureFlattenSource"
}
// final class FutureFlattenSink[T, M](futureSink: Future[Graph[SinkShape[T], M]])
// extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
// ReactiveStreamsCompliance.requireNonNullElement(futureSink)
//
// val in: Inlet[T] = Inlet("FutureFlattenSink.in")
// override val shape = SinkShape(in)
//
// override def initialAttributes = DefaultAttributes.futureFlattenSink
//
// override def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, Future[M]) = {
// val materialized = Promise[M]()
//
// val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
// private val sourceOut = new SubSourceOutlet[T]("FutureFlattenSink.out")
//
// override def preStart(): Unit =
// futureSink.value match {
// case Some(it) ⇒
// // this optimisation avoids going through any execution context, in similar vein to FastFuture
// onFutureSinkCompleted(it)
// case _ ⇒
// val cb = getAsyncCallback[Try[Graph[SinkShape[T], M]]](onFutureSinkCompleted).invoke _
// futureSink.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) // could be optimised FastFuture-like
// }
//
// // initial handler (until future completes)
// setHandler(in, new InHandler {
// override def onPush(): Unit = grab(it)
//
//// def onPull(): Unit = {}
////
//// override def onDownstreamFinish(): Unit = {
//// if (!materialized.isCompleted) {
//// // we used to try to materialize the "inner" source here just to get
//// // the materialized value, but that is not safe and may cause the graph shell
//// // to leak/stay alive after the stage completes
////
//// materialized.tryFailure(new StreamDetachedException("Stream cancelled before Source Future completed"))
//// }
////
//// super.onDownstreamFinish()
//// }
//})
//
// def onPush(): Unit =
// push(out, sourceOut.grab())
//
// def onPull(): Unit =
// sourceOut.pull()
//
// override def onUpstreamFinish(): Unit =
// completeStage()
//
// override def postStop(): Unit =
// if (!sourceOut.isClosed) sourceOut.cancel()
//
// def onFutureSinkCompleted(result: Try[Graph[SinkShape[T], M]]): Unit = {
// result.map { graph ⇒
// val runnable = Sink.fromGraph(graph).toMat(sourceOut.sink)(Keep.left)
// val matVal = interpreter.subFusingMaterializer.materialize(runnable, defaultAttributes = attr)
// materialized.success(matVal)
//
// setHandler(out, this)
// sourceOut.setHandler(this)
//
// if (isAvailable(out)) {
// sourceOut.pull()
// }
//
// }.recover {
// case t ⇒
// sourceOut.cancel()
// materialized.failure(t)
// failStage(t)
// }
// }
// }
//
// (logic, materialized.future)
// }
//
// override def toString: String = "FutureFlattenSource"
// }
final class FutureSource[T](val future: Future[T]) extends GraphStage[SourceShape[T]] {
ReactiveStreamsCompliance.requireNonNullElement(future)
val shape = SourceShape(Outlet[T]("FutureSource.out"))

View file

@@ -1,47 +0,0 @@
///**
// * Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com>
// */
//package akka.stream.impl.streamref
//
//import akka.NotUsed
//import akka.annotation.InternalApi
//import akka.stream.{ SinkRef, javadsl }
//import akka.stream.scaladsl.{ Sink, Source }
//
//import scala.concurrent.{ Await, Future }
//import scala.util.{ Failure, Success }
//import scala.concurrent.duration._
//
///**
// * INTERNAL API
// * Allows users to directly use the SinkRef, even though we do have to go through the Future in order to be able
// * to materialize it. Since we initialize the ref from within the GraphStageLogic. See [[SinkRefStageImpl]] for usage.
// */
//@InternalApi
//private[akka] final case class MaterializedSinkRef[In](futureSink: Future[SinkRefImpl[In]]) extends SinkRef[In] {
//
// override def sink: Sink[In, NotUsed] =
// futureSink.value match {
//
// case Some(Success(ready)) ⇒
// // the normal case, since once materialization finishes, the future is guaranteed to have been completed
// ready.sink
//
// case Some(Failure(cause)) ⇒
// // materialization failed
// Sink.cancelled
//
// case None ⇒
// ???
// // not yet materialized, awaiting the preStart to be run...
// // Source.fromFutureSource(futureSource.map(ref => ref.source)(ex)).mapMaterializedValue(_ ⇒ NotUsed)
// }
//
// override def toString: String =
// futureSink.value match {
// case None ⇒ s"SinkRef(<materializing-source-ref>)"
// case Some(Success(ready)) ⇒ ready.toString
// case Some(Failure(ex)) ⇒ s"SinkRef(<failed:${ex.getMessage}>)"
// }
//
//}

View file

@@ -1,79 +0,0 @@
///**
// * Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com>
// */
//package akka.stream.impl.streamref
//
//import java.util.concurrent.atomic.AtomicReference
//
//import akka.NotUsed
//import akka.actor.ActorRef
//import akka.annotation.InternalApi
//import akka.stream.scaladsl.Source
//import akka.stream.{ OverflowStrategy, SourceRef, javadsl }
//
//import scala.concurrent.{ ExecutionContext, Future }
//import scala.util.{ Failure, Success }
//
///**
// * INTERNAL API
// * Allows users to directly use the SourceRef, even though we do have to go through the Future in order to be able
// * to materialize it. Since we initialize the ref from within the GraphStageLogic. See [[SourceRefStageImpl]] for usage.
// */
//@InternalApi
//private[akka] final case class MaterializedSourceRef[Out](futureSource: Future[SourceRefImpl[Out]]) extends SourceRef[Out] {
//
// class State
// final case class Initializing(buffer: Vector[Out]) extends State
// final case class Initialized(ref: SourceRefImpl[Out]) extends State
//
// val it = new AtomicReference[State](Initializing(Vector.empty))
//
// // the advanced logic here is in order to allow RUNNING a stream locally ASAP even while materialization is still in-flight (preStart has not completed futureSource)
// override def source: Source[Out, NotUsed] =
// futureSource.value match {
//
// case Some(Success(ready)) ⇒
// // the normal case, since once materialization finishes, the future is guaranteed to have been completed
// ready.source
//
// case Some(Failure(cause)) ⇒
// // materialization failed
// Source.failed(cause).named("SourceRef")
//
// case None ⇒
// // not yet materialized -- in reality this case should not happen, since once materialization is finished, this Future is already completed
// // this impl is kept in case materialization semantics would change for some reason
// Source.fromFutureSource(futureSource.map(ref ⇒ ref.source)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)).mapMaterializedValue(_ ⇒ NotUsed)
// }
//
// // def forSerializationRef: SourceRef[Out] =
// // futureSource.value match {
// // case Some(Success(ready)) ⇒
// // // the normal case, since once materialization finishes, the future is guaranteed to have been completed
// // ready
// //
// // case Some(Failure(cause)) ⇒
// // throw new IllegalStateException("Illegal serialization attempt, this stream has never materialized the SourceRef!", cause)
// //
// // case None ⇒
// // // preStart has not finished yet, so we need to create and serialize a proxy ref
// // val proxy = mkProxy()
// //
// // new SourceRef[Out] {
// // override def source: Source[Out, NotUsed] =
// // ???
// // }
// // }
//
// override def toString: String =
// futureSource.value match {
// case None ⇒ s"SourceRef(<materializing-source-ref>)"
// case Some(Success(ready)) ⇒ ready.toString
// case Some(Failure(ex)) ⇒ s"SourceRef(<failed:${ex.getMessage}>)"
// }
//
//}
//
//case class BufferedRef() {
//
//}

View file

@@ -106,6 +106,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
s"Local stream terminating, message loss (on remote side) may have happened."))
case (sender, StreamRefsProtocol.CumulativeDemand(d)) ⇒
// the other side may attempt to "double subscribe", which we want to fail eagerly since we're 1:1 pairings
observeAndValidateSender(sender, "Illegal sender for CumulativeDemand")
if (remoteCumulativeDemandReceived < d) {
@@ -154,6 +155,7 @@ private[stream] final class SinkRefStageImpl[In] private[akka] (
case _ ⇒
completedBeforeRemoteConnected = OptionVal(scala.util.Failure(ex))
// not terminating on purpose, since other side may subscribe still and then we want to fail it
// the stage will be terminated either by timeout, or by the handling in `observeAndValidateSender`
setKeepGoing(true)
}

View file

@@ -18,7 +18,7 @@ private[akka] final case class StreamRefSettingsImpl private (
) extends StreamRefSettings {
override def withBufferCapacity(value: Int): StreamRefSettings = copy(bufferCapacity = value)
override def withDemandRedeliveryInterval(value: scala.concurrent.duration.FiniteDuration): StreamRefSettings = copy(demandRedeliveryInterval = value)
override def withDemandRedeliveryInterval(value: FiniteDuration): StreamRefSettings = copy(demandRedeliveryInterval = value)
override def withSubscriptionTimeout(value: FiniteDuration): StreamRefSettings = copy(subscriptionTimeout = value)
override def productPrefix: String = Logging.simpleName(classOf[StreamRefSettings])

View file

@@ -32,16 +32,6 @@ private[stream] final class StreamRefsMaster(system: ExtendedActorSystem) extend
private[this] val sourceRefStageNames = SeqActorName("SourceRef") // "local target"
private[this] val sinkRefStageNames = SeqActorName("SinkRef") // "remote sender"
// // needed because serialization may want to serialize a ref, before we have finished materialization
// def bufferedRefFor[T](allocatedName: String, future: Future[SourceRefImpl[T]]): SourceRef[T] = {
// def mkProxyOnDemand(): ActorRef = {
// log.warning("HAVE TO CREATE PROXY!!!")
// val proxyName = allocatedName + "Proxy"
// system.actorOf(Props(new ProxyRefFor(future)), proxyName)
// }
// MaterializedSourceRef[T](future)
// }
// TODO introduce a master with which all stages running the streams register themselves?
def nextSourceRefStageName(): String =
@@ -51,29 +41,3 @@ private[stream] final class StreamRefsMaster(system: ExtendedActorSystem) extend
sinkRefStageNames.next()
}
/** INTERNAL API */
@InternalApi
private[akka] final class ProxyRefFor[T](futureRef: Future[SourceRefImpl[T]]) extends Actor with Stash with ActorLogging {
import context.dispatcher
import akka.pattern.pipe
override def preStart(): Unit = {
futureRef.pipeTo(self)
}
override def receive: Receive = {
case ref: SourceRefImpl[T] ⇒
log.warning("REF:::: initial = " + ref.initialPartnerRef)
context become initialized(ref)
unstashAll()
case msg ⇒
log.warning("Stashing [{}], since target reference is still not initialized...", msg.getClass)
stash()
}
def initialized(ref: SourceRefImpl[T]): Receive = {
case any ⇒ ???
}
}

View file

@@ -16,8 +16,6 @@ import akka.japi.Util
import java.util.Comparator
import java.util.concurrent.CompletionStage
import akka.annotation.InternalApi
import scala.compat.java8.FutureConverters._
object Flow {
@@ -221,13 +219,6 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Flow[In, Out, Mat2] =
new Flow(delegate.mapMaterializedValue(f.apply _))
/**
* INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
*/
@InternalApi
private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): Flow[In, Out, Mat2] =
new Flow(delegate.flattenMaterializedValue(timeout))
/**
* Transform this [[Flow]] by appending the given processing steps.
* {{{

View file

@@ -18,10 +18,7 @@ import scala.concurrent.ExecutionContext
import scala.util.Try
import java.util.concurrent.CompletionStage
import akka.annotation.InternalApi
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration
/** Java API */
object Sink {
@@ -281,18 +278,6 @@ object Sink {
new Sink(scaladsl.Sink.lazyInit[T, M](
t ⇒ sinkFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext),
() ⇒ fallback.create()).mapMaterializedValue(_.toJava))
/**
* A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones),
* to consume data from this local stream, as if they were attached in the spot of the local Sink directly.
*
* Adheres to [[StreamRefAttributes]].
*
* See more detailed documentation on [[SourceRef]].
*/
def sourceRef[T](): javadsl.Sink[T, SourceRef[T]] =
scaladsl.Sink.sourceRef[T]().asJava
}
/**
@@ -336,13 +321,6 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] =
new Sink(delegate.mapMaterializedValue(f.apply _))
/**
* INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
*/
@InternalApi
private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): Sink[In, Mat2] =
new Sink(delegate.flattenMaterializedValue[Mat2](timeout))
/**
* Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes

View file

@@ -26,7 +26,6 @@ import java.util.concurrent.CompletionStage
import java.util.concurrent.CompletableFuture
import akka.annotation.InternalApi
import akka.stream.scaladsl.Sink
import scala.compat.java8.FutureConverters._
@@ -446,16 +445,6 @@ object Source {
(s: S) ⇒ read.apply(s).toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext),
(s: S) ⇒ close.apply(s).toScala))
/**
* A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones),
* to consume data from this local stream, as if they were attached in the spot of the local Sink directly.
*
* Adheres to [[StreamRefAttributes]].
*
* See more detailed documentation on [[SinkRef]].
*/
def sinkRef[T](): javadsl.Source[T, SinkRef[T]] =
scaladsl.Source.sinkRef[T]().asJava
}
/**
@@ -485,13 +474,6 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Source[Out, Mat2] =
new Source(delegate.mapMaterializedValue(f.apply _))
/**
* INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
*/
@InternalApi
private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): Source[Out, Mat2] =
new Source(delegate.flattenMaterializedValue[Mat2](timeout))
/**
* Transform this [[Source]] by appending the given processing stages.
* {{{

View file

@@ -0,0 +1,46 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.javadsl
import java.util.concurrent.CompletionStage
import akka.annotation.ApiMayChange
import akka.stream._
/**
* API MAY CHANGE: The functionality of stream refs is working, however it is expected that it will eventually
* be possible to remove the Future wrapping the stream references. For this reason the API is for now marked
* as may-change. See ticket https://github.com/akka/akka/issues/24372 for more details.
*
* Factories for creating stream refs.
*/
@ApiMayChange
object StreamRefs {
import scala.compat.java8.FutureConverters._
/**
* A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones),
* to consume data from this local stream, as if they were attached in the spot of the local Sink directly.
*
* Adheres to [[StreamRefAttributes]].
*
* See more detailed documentation on [[SourceRef]].
*/
@ApiMayChange
def sourceRef[T](): javadsl.Sink[T, CompletionStage[SourceRef[T]]] =
scaladsl.StreamRefs.sourceRef[T]().mapMaterializedValue(_.toJava).asJava
/**
* A local [[Source]] which materializes a [[SinkRef]] which can be used by other streams (including remote ones),
* to send data into this local stream, as if they were attached in the spot of the local Source directly.
*
* Adheres to [[StreamRefAttributes]].
*
* See more detailed documentation on [[SinkRef]].
*/
@ApiMayChange
def sinkRef[T](): javadsl.Source[T, CompletionStage[SinkRef[T]]] =
scaladsl.StreamRefs.sinkRef[T]().mapMaterializedValue(_.toJava).asJava
}

View file

@@ -19,7 +19,7 @@ import scala.concurrent.duration.FiniteDuration
import scala.language.higherKinds
import akka.stream.impl.fusing.FlattenMerge
import akka.NotUsed
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.annotation.DoNotInherit
/**
* A `Flow` is a set of stream processing steps that has one open input and one open output.
@@ -114,15 +114,6 @@ final class Flow[-In, +Out, +Mat](
traversalBuilder.transformMat(f),
shape)
/**
* INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
*/
@InternalApi
private[akka] override def flattenMaterializedValue[Mat2](timeout: FiniteDuration): ReprMat[Out, Mat2] =
new Flow(
traversalBuilder.flattenMat(timeout),
shape)
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]].
* {{{
@@ -2565,12 +2556,6 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
*/
def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2]
/**
* INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
*/
@InternalApi
private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): ReprMat[Out, Mat2]
/**
* Materializes to `FlowMonitor[Out]` that allows monitoring of the current flow. All events are propagated
* by the monitor unchanged. Note that the monitor inserts a memory barrier every time it processes an

View file

@@ -6,21 +6,17 @@ package akka.stream.scaladsl
import akka.{ Done, NotUsed }
import akka.dispatch.ExecutionContexts
import akka.actor.{ ActorRef, Props, Status }
import akka.annotation.InternalApi
import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl._
import akka.stream.impl.fusing.GraphStages
import akka.stream.impl.streamref.{ SinkRefStageImpl, SourceRefStageImpl }
import akka.stream.stage._
import akka.stream.{ javadsl, _ }
import akka.util.OptionVal
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec
import scala.collection.generic.CanBuildFrom
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
@@ -61,15 +57,6 @@ final class Sink[-In, +Mat](
traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]),
shape)
/**
* INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
*/
@InternalApi
private[akka] def flattenMaterializedValue[Mat2](timeout: FiniteDuration): Sink[In, Mat2] =
new Sink(
traversalBuilder.flattenMat(timeout),
shape)
/**
* Replace the attributes of this [[Sink]] with the given ones. If this Sink is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes
@@ -464,18 +451,4 @@ object Sink {
def lazyInit[T, M](sinkFactory: T ⇒ Future[Sink[T, M]], fallback: () ⇒ M): Sink[T, Future[M]] =
Sink.fromGraph(new LazySink(sinkFactory, fallback))
/**
* A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones),
* to consume data from this local stream, as if they were attached in the spot of the local Sink directly.
*
* Adheres to [[StreamRefAttributes]].
*
* See more detailed documentation on [[SourceRef]].
*/
def sourceRef[T](): Sink[T, SourceRef[T]] = {
import scala.concurrent.duration._
val sink = Sink.fromGraph(new SinkRefStageImpl[T](OptionVal.None))
sink.flattenMaterializedValue[SourceRef[T]](10.seconds)
}
}

View file

@@ -5,16 +5,12 @@ package akka.stream.scaladsl
import java.util.concurrent.CompletionStage
import akka.util.ConstantFun
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Cancellable, Props }
import akka.annotation.InternalApi
import akka.stream.actor.ActorPublisher
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages
import akka.stream.impl.fusing.GraphStages._
import akka.stream.impl.{ PublisherSource, _ }
import akka.stream.stage.GraphStageWithMaterializedValue
import akka.stream.{ Outlet, SourceShape, _ }
import akka.util.ConstantFun
import akka.{ Done, NotUsed }
@@ -23,12 +19,9 @@ import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Future, Promise }
import akka.stream.impl.streamref.SourceRefStageImpl
import akka.stream.stage.{ GraphStage, GraphStageWithMaterializedValue }
import akka.util.OptionVal
import akka.stream.stage.GraphStageWithMaterializedValue
import scala.compat.java8.FutureConverters._
@@ -93,13 +86,6 @@ final class Source[+Out, +Mat](
override def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] =
new Source[Out, Mat2](traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]), shape)
/**
* INTERNAL API: Unsafe BLOCKING flattening if current materialized value is a Future.
*/
@InternalApi
private[akka] override def flattenMaterializedValue[Mat2](timeout: FiniteDuration): ReprMat[Out, Mat2] =
new Source[Out, Mat2](traversalBuilder.flattenMat(timeout), shape)
/**
* Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
* of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]].
@@ -617,17 +603,4 @@ object Source {
def unfoldResourceAsync[T, S](create: () ⇒ Future[S], read: (S) ⇒ Future[Option[T]], close: (S) ⇒ Future[Done]): Source[T, NotUsed] =
Source.fromGraph(new UnfoldResourceSourceAsync(create, read, close))
/**
* A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones),
* to consume data from this local stream, as if they were attached in the spot of the local Sink directly.
*
* Adheres to [[StreamRefAttributes]].
*
* See more detailed documentation on [[SinkRef]].
*/
def sinkRef[T](): Source[T, SinkRef[T]] = {
import scala.concurrent.duration._
val value: Source[T, Future[SinkRef[T]]] = Source.fromGraph(new SourceRefStageImpl[T](OptionVal.None))
value.flattenMaterializedValue[SinkRef[T]](1.second)
}
}

View file

@@ -0,0 +1,46 @@
/**
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.annotation.ApiMayChange
import akka.stream.{ SinkRef, SourceRef, StreamRefAttributes }
import akka.stream.impl.streamref.{ SinkRefStageImpl, SourceRefStageImpl }
import akka.util.OptionVal
import scala.concurrent.Future
/**
* API MAY CHANGE: The functionality of stream refs is working, however it is expected that it will eventually
* be possible to remove the Future wrapping the stream references. For this reason the API is for now marked
* as may-change. See ticket https://github.com/akka/akka/issues/24372 for more details.
*
* Factories for creating stream refs.
*/
@ApiMayChange
object StreamRefs {
/**
* A local [[Sink]] which materializes a [[SourceRef]] which can be used by other streams (including remote ones),
* to consume data from this local stream, as if they were attached in the spot of the local Sink directly.
*
* Adheres to [[StreamRefAttributes]].
*
* See more detailed documentation on [[SourceRef]].
*/
@ApiMayChange
def sourceRef[T](): Sink[T, Future[SourceRef[T]]] =
Sink.fromGraph(new SinkRefStageImpl[T](OptionVal.None))
/**
* A local [[Source]] which materializes a [[SinkRef]] which can be used by other streams (including remote ones),
* to send data into this local stream, as if they were attached in the spot of the local Source directly.
*
* Adheres to [[StreamRefAttributes]].
*
* See more detailed documentation on [[SinkRef]].
*/
@ApiMayChange
def sinkRef[T](): Source[T, Future[SinkRef[T]]] =
Source.fromGraph(new SourceRefStageImpl[T](OptionVal.None))
}

View file

@@ -186,6 +186,15 @@ object GraphStageLogic {
initialReceive: StageActorRef.Receive,
name: String) {
// not really needed, but let's keep MiMa happy
def this(
materializer: akka.stream.ActorMaterializer,
getAsyncCallback: StageActorRef.Receive ⇒ AsyncCallback[(ActorRef, Any)],
initialReceive: StageActorRef.Receive
) {
this(materializer, getAsyncCallback, initialReceive, "")
}
private val callback = getAsyncCallback(internalReceive)
private def cell = materializer.supervisor match {
case ref: LocalActorRef ref.underlying