- */
- public Builder setSeatTypeBytes(
- akka.protobuf.ByteString value) {
- if (value == null) {
- throw new NullPointerException();
- }
- bitField0_ |= 0x00000004;
- seatType_ = value;
- onChanged();
- return this;
- }
-
- // @@protoc_insertion_point(builder_scope:docs.persistence.SeatReserved)
- }
-
- static {
- defaultInstance = new SeatReserved(true);
- defaultInstance.initFields();
- }
-
- // @@protoc_insertion_point(class_scope:docs.persistence.SeatReserved)
- }
-
- private static akka.protobuf.Descriptors.Descriptor
- internal_static_docs_persistence_SeatReserved_descriptor;
- private static
- akka.protobuf.GeneratedMessage.FieldAccessorTable
- internal_static_docs_persistence_SeatReserved_fieldAccessorTable;
-
- public static akka.protobuf.Descriptors.FileDescriptor
- getDescriptor() {
- return descriptor;
- }
- private static akka.protobuf.Descriptors.FileDescriptor
- descriptor;
- static {
- java.lang.String[] descriptorData = {
- "\n\025FlightAppModels.proto\022\020docs.persistenc" +
- "e\"=\n\014SeatReserved\022\016\n\006letter\030\001 \002(\t\022\013\n\003row" +
- "\030\002 \002(\r\022\020\n\010seatType\030\003 \001(\tB\032\n\026docs.persist" +
- "ence.protoH\001"
- };
- akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
- new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
- public akka.protobuf.ExtensionRegistry assignDescriptors(
- akka.protobuf.Descriptors.FileDescriptor root) {
- descriptor = root;
- internal_static_docs_persistence_SeatReserved_descriptor =
- getDescriptor().getMessageTypes().get(0);
- internal_static_docs_persistence_SeatReserved_fieldAccessorTable = new
- akka.protobuf.GeneratedMessage.FieldAccessorTable(
- internal_static_docs_persistence_SeatReserved_descriptor,
- new java.lang.String[] { "Letter", "Row", "SeatType", });
- return null;
- }
- };
- akka.protobuf.Descriptors.FileDescriptor
- .internalBuildGeneratedFileFrom(descriptorData,
- new akka.protobuf.Descriptors.FileDescriptor[] {
- }, assigner);
- }
-
- // @@protoc_insertion_point(outer_class_scope)
-}
diff --git a/akka-docs/src/main/paradox/images/source-ref-animation.gif b/akka-docs/src/main/paradox/images/source-ref-animation.gif
new file mode 100644
index 0000000000..897ed88146
Binary files /dev/null and b/akka-docs/src/main/paradox/images/source-ref-animation.gif differ
diff --git a/akka-docs/src/main/paradox/stream/index.md b/akka-docs/src/main/paradox/stream/index.md
index 93be480114..9aa4aca3ef 100644
--- a/akka-docs/src/main/paradox/stream/index.md
+++ b/akka-docs/src/main/paradox/stream/index.md
@@ -16,6 +16,7 @@
* [stream-integrations](stream-integrations.md)
* [stream-error](stream-error.md)
* [stream-io](stream-io.md)
+* [stream-refs](stream-refs.md)
* [stream-parallelism](stream-parallelism.md)
* [stream-testkit](stream-testkit.md)
* [stages-overview](stages-overview.md)
diff --git a/akka-docs/src/main/paradox/stream/stream-refs.md b/akka-docs/src/main/paradox/stream/stream-refs.md
new file mode 100644
index 0000000000..90f1a35b79
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/stream-refs.md
@@ -0,0 +1,166 @@
+# Akka Streams over network boundaries
+
+Stream references, or "stream refs" for short, allow running Akka Streams across multiple nodes within
+an Akka Cluster.
+
+Unlike heavier "streaming data processing" frameworks, Akka Streams are not "deployed" nor automatically distributed.
+Akka stream refs are, as the name implies, references to existing parts of a stream, and can be used to create a
+distributed processing framework or introduce such capabilities in specific parts of your application, however they
+are not on that level of abstraction by themselves.
+
+Stream refs are trivial to make use of in existing clustered Akka applications, and require no additional configuration
+or setup. They automatically maintain flow-control / back-pressure over the network, and employ Akka's failure detection
+mechanisms to fail-fast ("let it crash!") in the case of failures of remote nodes. They can be seen as an implementation
+of the [Work Pulling Pattern](http://www.michaelpollmeier.com/akka-work-pulling-pattern), which one would otherwise
+implement manually.
+
+
+@@@ note
+ A useful way to think about stream refs is:
+ "like an `ActorRef`, but for Akka Streams's `Source` and `Sink`".
+
+ Since they refer to an already existing, possibly remote, `Sink` or `Source`.
+ This is not to be mistaken with deploying streams remotely, which this feature is not intended for.
+@@@
+
+
+@@@ note
+ Since Akka Streams are an implementation of Reactive Streams, by induction,
+ one can also say that stream refs allow running *Reactive Streams over the network*.
+@@@
+
+## Stream References
+
+The prime use case for stream refs is to replace raw actor or HTTP messaging between systems where a long
+running stream of data is expected between two entities. Often times, they can be used to effectively achieve point
+to point streaming without the need of setting up additional message brokers or similar secondary clusters.
+
+Stream refs are well suited for any system in which you need to send messages between nodes and need to do so in a
+flow-controlled fashion. Typical examples include sending work requests to worker nodes, as fast as possible, but
+not faster than the worker node can process them, or sending data elements which the downstream may be slow at processing.
+It is recommended to mix and introduce stream refs in Actor messaging based systems, where the actor messaging is used to
+orchestrate and prepare such message flows, and later the stream refs are used to do the flow-controlled message transfer.
+
+Stream refs are not persistent, however it is simple to build a resume-able stream by introducing such protocol
+on the actor messaging layer. Stream refs are absolutely expected to be sent over Akka remoting to other nodes
+within a cluster, and as such, complement and do not compete with plain Actor messaging.
+Actors would usually be used to establish the stream, by means of some initial message saying
+"I want to offer you many log elements (the stream ref)", or alternatively in the opposite way "If you need
+to send me much data, here is the stream ref you can use to do so".
+
+Since the two sides ("local" and "remote") of reach reference may be confusing to simply refer to as
+"remote" and "local" -- since either side can be seen as "local" or "remote" depending how we look at it --
+we propose to use the terminology "origin" and "target", which is defined by where the stream ref was created.
+For `SourceRef`s, the "origin" is the side which has the data that it is going to stream out. For `SinkRef`s
+the "origin" side is the actor system that is ready to receive the data and has allocated the ref. Those
+two may be seen as duals of each other, however to explain patterns about sharing references, we found this
+ wording to be rather useful.
+
+### Source Refs - offering streaming data over network
+
+A @scala[@scaladoc[`SourceRef`](akka.stream.SourceRef)]@java[@javadoc[`SourceRef`](akka.stream.SourceRef)]
+can be offered to a remote actor system in order for it to consume some source of data that we have prepared
+locally.
+
+In order to share a `Source` with a remote endpoint you need to materialize it by running it into the `Sink.sourceRef`.
+That sink materializes the `SourceRef` that you can then send to other nodes. Please note that it materializes into a
+`Future` so you will have to use the pipeTo
+
+Scala
+: @@snip [FlowStreamRefsDocSpec.scala]($code$/scala/docs/stream/FlowStreamRefsDocSpec.scala) { #offer-source }
+
+The origin actor which creates and owns the Source could also perform some validation or additional setup
+when preparing the source. Once it has handed out the `SourceRef` the remote side can run it like this:
+
+Scala
+: @@snip [FlowStreamRefsDocSpec.scala]($code$/scala/docs/stream/FlowStreamRefsDocSpec.scala) { #offer-source-use }
+
+The process of preparing and running a `SourceRef` powered distributed stream is shown by the animation below:
+
+
+
+@@@ warning
+ A `SourceRef` is *by design* "single-shot". i.e. it may only be materialized once.
+ This is in order to not complicate the mental model what materializing such value would mean.
+
+ By being single-shot, we always know what it means, and on top of those semantics offer a fan-out
+ by emitting multiple `SourceRef`s which target the same `Source` that uses `Broadcast`.
+ This also allows for fine grained control how many streams a system can expect to be running
+ at the same time, which is useful for capacity planning and "allowed number of concurrent streams
+ limiting" of clients.
+@@@
+
+### Sink Refs - offering to receive streaming data
+
+The dual of source references are A @scala[@scaladoc[`SourceRef`](akka.stream.SinkRef)]@java[@javadoc[`SourceRef`](akka.stream.SinkRef)]s. They can be used to offer the other side the capability to
+send to the *origin* side data in a streaming, flow-controlled fashion. The origin here allocates a Sink,
+which could be as simple as a `Sink.foreach` or as advanced as a complex sink which streams the incoming data
+into various other systems (e.g. any of the Alpakka provided Sinks).
+
+@@@ note
+ To form a good mental model of `SinkRef`s, you can think of them as being similar to "passive mode" in FTP.
+@@@
+
+Scala
+: @@snip [FlowStreamRefsDocSpec.scala]($code$/scala/docs/stream/FlowStreamRefsDocSpec.scala) { #offer-sink }
+
+Using the offered `SinkRef` to send data to the origin of the Sink is also simple, as we can treat the
+SinkRef just as any other Sink and directly `runWith` or `run` with it.
+
+Scala
+: @@snip [FlowStreamRefsDocSpec.scala]($code$/scala/docs/stream/FlowStreamRefsDocSpec.scala) { #offer-sink-use }
+
+
+
+
+
+@@@ warning
+ A `SinkeRef` is *by design* "single-shot". i.e. it may only be materialized once.
+ This is in order to not complicate the mental model what materializing such value would mean.
+
+ If you have an use case for building a fan-in operation accepting writes from multiple remote nodes,
+ you can build your Sink and prepend it with a `Merge` stage, each time materializing a new `SinkRef`
+ targeting that Merge. This has the added benefit of giving you full control how to merge these streams
+ (i.e. by using "merge preferred" or any other variation of the fan-in stages).
+@@@
+
+## Bulk Stream References
+
+@@@ warning
+ Not yet implemented. See ticket ...... FIXME, ticket number
+@@@
+
+Bulk stream references can be used to create simple to use side-channels to transfer humongous amounts
+of data such as huge log files, messages or even media, with as much ease as if it was a trivial local stream.
+
+Connections for each stream ref bulk stream ref are established independently, and do not utilise
+actor messaging (which is not designed for such bulk transfers, but rather small messages).
+
+## Configuration
+
+### Stream reference subscription timeouts
+
+All stream references have a subscription timeout, which is intended to prevent resource leaks
+in situations in which a remote node would requests the allocation of many streams yet never actually run
+them. In order to prevent this, each stream reference has a default timeout (of 30 seconds), after which
+if it's "handed out" side has not been materialized, the origin will terminate with a timeout exception,
+and IF the remote side eventually would be run afterwards, it would also immediately fail with an exception
+pointing out that the origin seems to be missing.
+
+Since these timeouts are often very different based on the kind of stream offered, and there can be
+many different kinds of them in the same application, it is possible to not only configure this setting
+globally (`akka.stream.materializer.stream-ref.subscription-timeout`), but also via attributes:
+
+
+
+Scala
+: @@snip [FlowStreamRefsDocSpec.scala]($code$/scala/docs/stream/FlowStreamRefsDocSpec.scala) { #attr-sub-timeout }
+
+
+
+## General configuration
+
+Other settings can be set globally, in your `application.conf`, by overriding any of the following values
+in the `akka.stream.materializer.stream-ref.*` keyspace:
+
+@@snip [reference.conf]($akka$/akka-stream/src/main/resources/reference.conf) { #stream-ref }
diff --git a/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala b/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala
new file mode 100644
index 0000000000..bc674df352
--- /dev/null
+++ b/akka-docs/src/test/scala/docs/stream/FlowStreamRefsDocSpec.scala
@@ -0,0 +1,125 @@
+/**
+ * Copyright (C) 2018 Lightbend Inc.
+ */
+package docs.stream
+
+import akka.NotUsed
+import akka.actor.{ Actor, Props }
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl._
+import akka.testkit.AkkaSpec
+import docs.CompileOnlySpec
+
+class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
+
+ "offer a source ref" in compileOnlySpec {
+ //#offer-source
+ import akka.stream.SourceRef
+
+ case class RequestLogs(streamId: Int)
+ case class LogsOffer(streamId: Int, sourceRef: SourceRef[String])
+
+ class DataSource extends Actor {
+
+ implicit val mat = ActorMaterializer()(context)
+
+ def receive = {
+ case RequestLogs(streamId) ⇒
+ // obtain the source you want to offer:
+ val source: Source[String, NotUsed] = streamLogs(streamId)
+
+ // materialize the SourceRef:
+ val ref: SourceRef[String] = source.runWith(Sink.sourceRef())
+
+ // wrap the SourceRef in some domain message, such that the sender knows what source it is
+ val reply: LogsOffer = LogsOffer(streamId, ref)
+
+ // reply to sender
+ sender() ! reply
+ }
+
+ def streamLogs(streamId: Long): Source[String, NotUsed] = ???
+ }
+ //#offer-source
+
+ implicit val mat = ActorMaterializer()
+ //#offer-source-use
+ val sourceActor = system.actorOf(Props[DataSource], "dataSource")
+
+ sourceActor ! RequestLogs(1337)
+ val offer = expectMsgType[LogsOffer]
+
+ // implicitly converted to a Source:
+ offer.sourceRef.runWith(Sink.foreach(println))
+ // alternatively explicitly obtain Source from SourceRef:
+ // offer.sourceRef.source.runWith(Sink.foreach(println))
+
+ //#offer-source-use
+ }
+
+ "offer a sink ref" in compileOnlySpec {
+ //#offer-sink
+ import akka.pattern._
+ import akka.stream.SinkRef
+
+ case class PrepareUpload(sourceId: String)
+ case class MeasurementsSinkReady(sourceId: String, sinkRef: SinkRef[String])
+
+ class DataReceiver extends Actor {
+
+ import context.dispatcher
+ implicit val mat = ActorMaterializer()(context)
+
+ def receive = {
+ case PrepareUpload(nodeId) ⇒
+ // obtain the source you want to offer:
+ val sink: Sink[String, NotUsed] = logsSinkFor(nodeId)
+
+ // materialize the SinkRef (the remote is like a source of data for us):
+ val ref: SinkRef[String] = Source.sinkRef[String]().to(sink).run()
+
+ // wrap the SinkRef in some domain message, such that the sender knows what source it is
+ val reply: MeasurementsSinkReady = MeasurementsSinkReady(nodeId, ref)
+
+ // reply to sender
+ sender() ! reply
+ }
+
+ def logsSinkFor(nodeId: String): Sink[String, NotUsed] = ???
+ }
+
+ //#offer-sink
+
+ implicit val mat = ActorMaterializer()
+ def localMetrics(): Source[String, NotUsed] = Source.single("")
+
+ //#offer-sink-use
+ val receiver = system.actorOf(Props[DataReceiver], "receiver")
+
+ receiver ! PrepareUpload("system-42-tmp")
+ val ready = expectMsgType[MeasurementsSinkReady]
+
+ // stream local metrics to Sink's origin:
+ localMetrics().runWith(ready.sinkRef)
+ //#offer-sink-use
+ }
+
+ "show how to configure timeouts with attrs" in compileOnlySpec {
+
+ implicit val mat: ActorMaterializer = null
+ //#attr-sub-timeout
+ // configure the timeout for source
+ import scala.concurrent.duration._
+ import akka.stream.StreamRefAttributes
+
+ // configuring SourceRef.sink (notice that we apply the attributes to the Sink!):
+ Source.repeat("hello")
+ .runWith(Sink.sourceRef().addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds)))
+
+ // configuring SinkRef.source:
+ Source.sinkRef().addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds))
+ .runWith(Sink.ignore) // not very interesting Sink, just an example
+ //#attr-sub-timeout
+ }
+
+}
diff --git a/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala
index cf2c1a3107..0ff81be065 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala
@@ -87,7 +87,8 @@ private[remote] final class SendQueue[T] extends GraphStageWithMaterializedValue
needWakeup = true
// additional poll() to grab any elements that might missed the needWakeup
// and have been enqueued just after it
- if (firstAttempt) tryPush(firstAttempt = false)
+ if (firstAttempt)
+ tryPush(firstAttempt = false)
case elem ⇒
needWakeup = false // there will be another onPull
push(out, elem)
diff --git a/akka-stream-tests/src/test/scala/akka/stream/remote/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/remote/StreamRefsSpec.scala
deleted file mode 100644
index ea8ffd59cc..0000000000
--- a/akka-stream-tests/src/test/scala/akka/stream/remote/StreamRefsSpec.scala
+++ /dev/null
@@ -1,274 +0,0 @@
-/**
- * Copyright (C) 2014-2017 Lightbend Inc.
- */
-package akka.stream.remote
-
-import akka.NotUsed
-import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, ActorSystem, ActorSystemImpl, Identify, Props }
-import akka.stream.ActorMaterializer
-import akka.stream.remote.scaladsl.{ SinkRef, SourceRef }
-import akka.stream.scaladsl.{ Sink, Source }
-import akka.testkit.{ AkkaSpec, ImplicitSender, SocketUtil, TestKit, TestProbe }
-import akka.util.ByteString
-import com.typesafe.config._
-
-import scala.concurrent.duration._
-import scala.concurrent.{ Await, Future }
-
-object StreamRefsSpec {
-
- object DatasourceActor {
- def props(probe: ActorRef): Props =
- Props(new DatasourceActor(probe))
- .withDispatcher("akka.test.stream-dispatcher")
- }
-
- class DatasourceActor(probe: ActorRef) extends Actor with ActorLogging {
- implicit val mat = ActorMaterializer()
-
- def receive = {
- case "give" ⇒
- /*
- * Here we're able to send a source to a remote recipient
- *
- * For them it's a Source; for us it is a Sink we run data "into"
- */
- val source: Source[String, NotUsed] = Source(List("hello", "world"))
- val ref: Future[SourceRef[String]] = source.runWith(SourceRef.sink())
-
- println(s"source = ${source}")
- println(s"ref = ${Await.result(ref, 10.seconds)}")
-
- sender() ! Await.result(ref, 10.seconds)
-
- // case "send-bulk" ⇒
- // /*
- // * Here we're able to send a source to a remote recipient
- // * The source is a "bulk transfer one, in which we're ready to send a lot of data"
- // *
- // * For them it's a Source; for us it is a Sink we run data "into"
- // */
- // val source: Source[ByteString, NotUsed] = Source.single(ByteString("huge-file-"))
- // val ref: SourceRef[ByteString] = source.runWith(SourceRef.bulkTransfer())
- // sender() ! BulkSourceMsg(ref)
-
- case "receive" ⇒
- /*
- * We write out code, knowing that the other side will stream the data into it.
- *
- * For them it's a Sink; for us it's a Source.
- */
- val sink: Future[SinkRef[String]] =
- SinkRef.source[String]
- .to(Sink.actorRef(probe, ""))
- .run()
-
- // FIXME we want to avoid forcing people to do the Future here
- sender() ! Await.result(sink, 10.seconds)
-
- // case "receive-bulk" ⇒
- // /*
- // * We write out code, knowing that the other side will stream the data into it.
- // * This will open a dedicated connection per transfer.
- // *
- // * For them it's a Sink; for us it's a Source.
- // */
- // val sink: SinkRef[ByteString] =
- // SinkRef.bulkTransferSource()
- // .to(Sink.actorRef(probe, ""))
- // .run()
- //
- //
- // sender() ! BulkSinkMsg(sink)
- }
-
- }
-
- // -------------------------
-
- final case class SourceMsg(dataSource: SourceRef[String])
- final case class BulkSourceMsg(dataSource: SourceRef[ByteString])
- final case class SinkMsg(dataSink: SinkRef[String])
- final case class BulkSinkMsg(dataSink: SinkRef[ByteString])
-
- def config(): Config = {
- val address = SocketUtil.temporaryServerAddress()
- ConfigFactory.parseString(
- s"""
- akka {
- loglevel = INFO
-
- actor {
- provider = remote
- serialize-messages = off
-
-// serializers {
-// akka-stream-ref-test = "akka.stream.remote.StreamRefsSpecSerializer"
-// }
-//
-// serialization-bindings {
-// "akka.stream.remote.StreamRefsSpec$$SourceMsg" = akka-stream-ref-test
-// "akka.stream.remote.StreamRefsSpec$$BulkSourceMsg" = akka-stream-ref-test
-// "akka.stream.remote.StreamRefsSpec$$SinkMsg" = akka-stream-ref-test
-// "akka.stream.remote.StreamRefsSpec$$BulkSinkMsg" = akka-stream-ref-test
-// }
-//
-// serialization-identifiers {
-// "akka.stream.remote.StreamRefsSpecSerializer" = 33
-// }
-
- }
-
- remote.netty.tcp {
- port = ${address.getPort}
- hostname = "${address.getHostName}"
- }
- }
- """).withFallback(ConfigFactory.load())
- }
-}
-
-class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSender {
- import StreamRefsSpec._
-
- def this() {
- this(StreamRefsSpec.config())
- }
-
- val remoteSystem = ActorSystem("RemoteSystem", StreamRefsSpec.config())
- implicit val mat = ActorMaterializer()
-
- override protected def beforeTermination(): Unit =
- TestKit.shutdownActorSystem(remoteSystem)
-
- val p = TestProbe()
-
- // obtain the remoteActor ref via selection in order to use _real_ remoting in this test
- val remoteActor = {
- val it = remoteSystem.actorOf(DatasourceActor.props(p.ref), "remoteActor")
- val remoteAddress = remoteSystem.asInstanceOf[ActorSystemImpl].provider.getDefaultAddress
- system.actorSelection(it.path.toStringWithAddress(remoteAddress)) ! Identify("hi")
- expectMsgType[ActorIdentity].ref.get
- }
-
- "A SourceRef" must {
-
- "send messages via remoting" in {
- remoteActor ! "give"
- val sourceRef = expectMsgType[SourceRef[String]]
-
- Source.fromGraph(sourceRef)
- .log("RECEIVED")
- .runWith(Sink.actorRef(p.ref, ""))
-
- p.expectMsg("hello")
- p.expectMsg("world")
- p.expectMsg("")
- }
-
- }
-
- "A SinkRef" must {
-
- "receive elements via remoting" in {
-
- remoteActor ! "receive"
- val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
-
- Source("hello" :: "world" :: Nil)
- .to(remoteSink)
- .run()
-
- p.expectMsg("hello")
- p.expectMsg("world")
- p.expectMsg("")
- }
-
- "fail origin if remote Sink gets a failure" in {
-
- remoteActor ! "receive"
- val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
-
- val remoteFailureMessage = "Booom!"
- Source.failed(new Exception(remoteFailureMessage))
- .to(remoteSink)
- .run()
-
- val f = p.expectMsgType[akka.actor.Status.Failure]
- f.cause.getMessage should ===(s"Remote Sink failed, reason: $remoteFailureMessage")
- }
-
- "receive hundreds of elements via remoting" in {
- remoteActor ! "receive"
- val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
-
- val msgs = (1 to 100).toList.map(i ⇒ s"payload-$i")
-
- Source(msgs)
- .to(remoteSink)
- .run()
-
- msgs.foreach(t ⇒ p.expectMsg(t))
- p.expectMsg("")
- }
-
- // "fail origin if remote Sink is stopped abruptly" in {
- // val otherSystem = ActorSystem("OtherRemoteSystem", StreamRefsSpec.config())
- //
- // try {
- // // obtain the remoteActor ref via selection in order to use _real_ remoting in this test
- // val remoteActor = {
- // val it = otherSystem.actorOf(DatasourceActor.props(p.ref), "remoteActor")
- // val remoteAddress = otherSystem.asInstanceOf[ActorSystemImpl].provider.getDefaultAddress
- // system.actorSelection(it.path.toStringWithAddress(remoteAddress)) ! Identify("hi")
- // expectMsgType[ActorIdentity].ref.get
- // }
- //
- // remoteActor ! "receive"
- // val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
- //
- // val otherMat = ActorMaterializer()(otherSystem)
- // Source.maybe[String] // not emitting anything
- // .to(remoteSink)
- // .run()(otherMat)
- //
- // // and the system crashes; which should cause abrupt termination in the stream
- // Thread.sleep(300)
- // otherMat.shutdown()
- //
- // val f = p.expectMsgType[akka.actor.Status.Failure]
- // f.cause.getMessage should ===(s"Remote Sink failed, reason:")
- // } finally TestKit.shutdownActorSystem(otherSystem)
- // }
-
- }
-
-}
-//
-//class StreamRefsSpecSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer {
-//
-// lazy val ext = SerializationExtension(system)
-//
-// override def manifest(o: AnyRef): String = o match {
-// case StreamRefsSpec.SinkMsg(_) ⇒ "si"
-// case StreamRefsSpec.BulkSinkMsg(_) ⇒ "bsi"
-// case StreamRefsSpec.SourceMsg(_) ⇒ "so"
-// case StreamRefsSpec.BulkSourceMsg(_) ⇒ "bso"
-// }
-//
-// override def toBinary(o: AnyRef): Array[Byte] = {
-// system.log.warning("Serializing: " + o)
-// o match {
-// case StreamRefsSpec.SinkMsg(s) ⇒ s.
-// case StreamRefsSpec.BulkSinkMsg(s) ⇒ ext.serialize(s).get
-// case StreamRefsSpec.SourceMsg(s) ⇒ ext.serialize(s).get
-// case StreamRefsSpec.BulkSourceMsg(s) ⇒ ext.serialize(s).get
-// }
-// }
-//
-// override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
-// system.log.warning("MANI: " + manifest)
-// ???
-// }
-//
-//}
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala
new file mode 100644
index 0000000000..217b16e741
--- /dev/null
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/StreamRefsSpec.scala
@@ -0,0 +1,366 @@
+/**
+ * Copyright (C) 2014-2017 Lightbend Inc.
+ */
+package akka.stream.scaladsl
+
+import akka.NotUsed
+import akka.actor.Status.Failure
+import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, ActorSystem, ActorSystemImpl, Identify, Props }
+import akka.pattern._
+import akka.stream.testkit.TestPublisher
+import akka.stream.testkit.scaladsl._
+import akka.stream.{ ActorMaterializer, SinkRef, SourceRef, StreamRefAttributes }
+import akka.testkit.{ AkkaSpec, ImplicitSender, SocketUtil, TestKit, TestProbe }
+import akka.util.ByteString
+import com.typesafe.config._
+
+import scala.concurrent.duration._
+import scala.concurrent.Future
+import scala.util.control.NoStackTrace
+
+object StreamRefsSpec {
+
+ object DataSourceActor {
+ def props(probe: ActorRef): Props =
+ Props(new DataSourceActor(probe))
+ .withDispatcher("akka.test.stream-dispatcher")
+ }
+
+ class DataSourceActor(probe: ActorRef) extends Actor with ActorLogging {
+ implicit val mat = ActorMaterializer()
+
+ def receive = {
+ case "give" ⇒
+ /*
+ * Here we're able to send a source to a remote recipient
+ *
+ * For them it's a Source; for us it is a Sink we run data "into"
+ */
+ val source: Source[String, NotUsed] = Source(List("hello", "world"))
+ val ref: SourceRef[String] = source.runWith(Sink.sourceRef())
+
+ sender() ! ref
+
+ case "give-infinite" ⇒
+ val source: Source[String, NotUsed] = Source.fromIterator(() ⇒ Iterator.from(1)).map("ping-" + _)
+ val ref: SourceRef[String] = source.runWith(Sink.sourceRef())
+
+ sender() ! ref
+
+ case "give-fail" ⇒
+ val ref = Source.failed[String](new Exception("Booooom!") with NoStackTrace)
+ .runWith(Sink.sourceRef())
+
+ sender() ! ref
+
+ case "give-complete-asap" ⇒
+ val ref = Source.empty
+ .runWith(Sink.sourceRef())
+
+ sender() ! ref
+
+ case "give-subscribe-timeout" ⇒
+ val ref = Source.repeat("is anyone there?")
+ .toMat(Sink.sourceRef())(Keep.right) // attributes like this so they apply to the Sink.sourceRef
+ .withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis))
+ .run()
+
+ sender() ! ref
+
+ // case "send-bulk" ⇒
+ // /*
+ // * Here we're able to send a source to a remote recipient
+ // * The source is a "bulk transfer one, in which we're ready to send a lot of data"
+ // *
+ // * For them it's a Source; for us it is a Sink we run data "into"
+ // */
+ // val source: Source[ByteString, NotUsed] = Source.single(ByteString("huge-file-"))
+ // val ref: SourceRef[ByteString] = source.runWith(SourceRef.bulkTransfer())
+ // sender() ! BulkSourceMsg(ref)
+
+ case "receive" ⇒
+ /*
+ * We write out code, knowing that the other side will stream the data into it.
+ *
+ * For them it's a Sink; for us it's a Source.
+ */
+ val sink: SinkRef[String] =
+ Source.sinkRef[String]
+ .to(Sink.actorRef(probe, ""))
+ .run()
+
+ sender() ! sink
+
+ case "receive-subscribe-timeout" ⇒
+ val sink = Source.sinkRef[String]
+ .withAttributes(StreamRefAttributes.subscriptionTimeout(500.millis))
+ .to(Sink.actorRef(probe, ""))
+ .run()
+
+ sender() ! sink
+
+ case "receive-32" ⇒
+ val (sink, driver) = Source.sinkRef[String]
+ .toMat(TestSink.probe(context.system))(Keep.both)
+ .run()
+
+ import context.dispatcher
+ Future {
+ driver.ensureSubscription()
+ driver.request(2)
+ driver.expectNext()
+ driver.expectNext()
+ driver.expectNoMessage(100.millis)
+ driver.request(30)
+ driver.expectNextN(30)
+
+ ""
+ } pipeTo probe
+
+ sender() ! sink
+
+ // case "receive-bulk" ⇒
+ // /*
+ // * We write out code, knowing that the other side will stream the data into it.
+ // * This will open a dedicated connection per transfer.
+ // *
+ // * For them it's a Sink; for us it's a Source.
+ // */
+ // val sink: SinkRef[ByteString] =
+ // SinkRef.bulkTransferSource()
+ // .to(Sink.actorRef(probe, ""))
+ // .run()
+ //
+ //
+ // sender() ! BulkSinkMsg(sink)
+ }
+
+ }
+
+ // -------------------------
+
+ final case class SourceMsg(dataSource: SourceRef[String])
+ final case class BulkSourceMsg(dataSource: SourceRef[ByteString])
+ final case class SinkMsg(dataSink: SinkRef[String])
+ final case class BulkSinkMsg(dataSink: SinkRef[ByteString])
+
+ def config(): Config = {
+ val address = SocketUtil.temporaryServerAddress()
+ ConfigFactory.parseString(
+ s"""
+ akka {
+ loglevel = INFO
+
+ actor {
+ provider = remote
+ serialize-messages = off
+ }
+
+ remote.netty.tcp {
+ port = ${address.getPort}
+ hostname = "${address.getHostName}"
+ }
+ }
+ """).withFallback(ConfigFactory.load())
+ }
+}
+
+class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSender {
+ import StreamRefsSpec._
+
+ def this() {
+ this(StreamRefsSpec.config())
+ }
+
+ val remoteSystem = ActorSystem("RemoteSystem", StreamRefsSpec.config())
+ implicit val mat = ActorMaterializer()
+
+ override protected def beforeTermination(): Unit =
+ TestKit.shutdownActorSystem(remoteSystem)
+
+ val p = TestProbe()
+
+ // obtain the remoteActor ref via selection in order to use _real_ remoting in this test
+ val remoteActor = {
+ val it = remoteSystem.actorOf(DataSourceActor.props(p.ref), "remoteActor")
+ val remoteAddress = remoteSystem.asInstanceOf[ActorSystemImpl].provider.getDefaultAddress
+ system.actorSelection(it.path.toStringWithAddress(remoteAddress)) ! Identify("hi")
+ expectMsgType[ActorIdentity].ref.get
+ }
+
+ "A SourceRef" must {
+
+ "send messages via remoting" in {
+ remoteActor ! "give"
+ val sourceRef = expectMsgType[SourceRef[String]]
+
+ sourceRef
+ .runWith(Sink.actorRef(p.ref, ""))
+
+ p.expectMsg("hello")
+ p.expectMsg("world")
+ p.expectMsg("")
+ }
+
+ "fail when remote source failed" in {
+ remoteActor ! "give-fail"
+ val sourceRef = expectMsgType[SourceRef[String]]
+
+ sourceRef
+ .runWith(Sink.actorRef(p.ref, ""))
+
+ val f = p.expectMsgType[Failure]
+ f.cause.getMessage should include("Remote stream (")
+ // actor name here, for easier identification
+ f.cause.getMessage should include("failed, reason: Booooom!")
+ }
+
+ "complete properly when remote source is empty" in {
+ // this is a special case since it makes sure that the remote stage is still there when we connect to it
+
+ remoteActor ! "give-complete-asap"
+ val sourceRef = expectMsgType[SourceRef[String]]
+
+ sourceRef
+ .runWith(Sink.actorRef(p.ref, ""))
+
+ p.expectMsg("")
+ }
+
+ "respect back-pressure from (implied by target Sink)" in {
+ remoteActor ! "give-infinite"
+ val sourceRef = expectMsgType[SourceRef[String]]
+
+ val probe = sourceRef
+ .runWith(TestSink.probe)
+
+ probe.ensureSubscription()
+ probe.expectNoMessage(100.millis)
+
+ probe.request(1)
+ probe.expectNext("ping-1")
+ probe.expectNoMessage(100.millis)
+
+ probe.request(20)
+ probe.expectNextN((1 to 20).map(i ⇒ "ping-" + (i + 1)))
+ probe.cancel()
+
+ // since no demand anyway
+ probe.expectNoMessage(100.millis)
+
+ // should not cause more pulling, since we issued a cancel already
+ probe.request(10)
+ probe.expectNoMessage(100.millis)
+ }
+
+ "receive timeout if subscribing too late to the source ref" in {
+ remoteActor ! "give-subscribe-timeout"
+ val remoteSource: SourceRef[String] = expectMsgType[SourceRef[String]]
+
+ // not materializing it, awaiting the timeout...
+ Thread.sleep(800) // the timeout is 500ms
+
+ val probe = remoteSource
+ .runWith(TestSink.probe[String](system))
+
+ // val failure = p.expectMsgType[Failure]
+ // failure.cause.getMessage should include("[SourceRef-0] Remote side did not subscribe (materialize) handed out Sink reference")
+
+ // the local "remote sink" should cancel, since it should notice the origin target actor is dead
+ probe.ensureSubscription()
+ val ex = probe.expectError()
+ ex.getMessage should include("has terminated! Tearing down this side of the stream as well.")
+ }
+ }
+
+ "A SinkRef" must {
+
+ "receive elements via remoting" in {
+
+ remoteActor ! "receive"
+ val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
+
+ Source("hello" :: "world" :: Nil)
+ .to(remoteSink)
+ .run()
+
+ p.expectMsg("hello")
+ p.expectMsg("world")
+ p.expectMsg("")
+ }
+
+ "fail origin if remote Sink gets a failure" in {
+
+ remoteActor ! "receive"
+ val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
+
+ val remoteFailureMessage = "Booom!"
+ Source.failed(new Exception(remoteFailureMessage))
+ .to(remoteSink)
+ .run()
+
+ val f = p.expectMsgType[akka.actor.Status.Failure]
+ f.cause.getMessage should include(s"Remote stream (")
+ // actor name ere, for easier identification
+ f.cause.getMessage should include(s"failed, reason: $remoteFailureMessage")
+ }
+
+ "receive hundreds of elements via remoting" in {
+ remoteActor ! "receive"
+ val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
+
+ val msgs = (1 to 100).toList.map(i ⇒ s"payload-$i")
+
+ Source(msgs)
+ .runWith(remoteSink)
+
+ msgs.foreach(t ⇒ p.expectMsg(t))
+ p.expectMsg("")
+ }
+
+ "receive timeout if subscribing too late to the sink ref" in {
+ remoteActor ! "receive-subscribe-timeout"
+ val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]]
+
+ // not materializing it, awaiting the timeout...
+ Thread.sleep(800) // the timeout is 500ms
+
+ val probe = TestSource.probe[String](system)
+ .to(remoteSink)
+ .run()
+
+ val failure = p.expectMsgType[Failure]
+ failure.cause.getMessage should include("Remote side did not subscribe (materialize) handed out Sink reference")
+
+ // the local "remote sink" should cancel, since it should notice the origin target actor is dead
+ probe.expectCancellation()
+ }
+
+ "respect back -pressure from (implied by origin Sink)" in {
+ remoteActor ! "receive-32"
+ val sinkRef = expectMsgType[SinkRef[String]]
+
+ Source.repeat("hello") runWith sinkRef
+
+ // if we get this message, it means no checks in the request/expect semantics were broken, good!
+ p.expectMsg("")
+ }
+
+ "not allow materializing multiple times" in {
+ remoteActor ! "receive"
+ val sinkRef = expectMsgType[SinkRef[String]]
+
+ val p1: TestPublisher.Probe[String] = TestSource.probe[String].to(sinkRef).run()
+ val p2: TestPublisher.Probe[String] = TestSource.probe[String].to(sinkRef).run()
+
+ p1.ensureSubscription()
+ val req = p1.expectRequest()
+
+ // will be cancelled immediately, since it's 2nd:
+ p2.ensureSubscription()
+ p2.expectCancellation()
+ }
+
+ }
+
+}
diff --git a/akka-stream/src/main/java/akka/stream/remote/StreamRefContainers.java b/akka-stream/src/main/java/akka/stream/StreamRefMessages.java
similarity index 70%
rename from akka-stream/src/main/java/akka/stream/remote/StreamRefContainers.java
rename to akka-stream/src/main/java/akka/stream/StreamRefMessages.java
index 8175eefff9..513ec254bd 100644
--- a/akka-stream/src/main/java/akka/stream/remote/StreamRefContainers.java
+++ b/akka-stream/src/main/java/akka/stream/StreamRefMessages.java
@@ -1,10 +1,10 @@
// Generated by the protocol buffer compiler. DO NOT EDIT!
-// source: StreamRefContainers.proto
+// source: StreamRefMessages.proto
-package akka.stream.remote;
+package akka.stream;
-public final class StreamRefContainers {
- private StreamRefContainers() {}
+public final class StreamRefMessages {
+ private StreamRefMessages() {}
public static void registerAllExtensions(
akka.protobuf.ExtensionRegistry registry) {
}
@@ -19,21 +19,11 @@ public final class StreamRefContainers {
/**
* required .ActorRef targetRef = 1;
*/
- akka.stream.remote.StreamRefContainers.ActorRef getTargetRef();
+ akka.stream.StreamRefMessages.ActorRef getTargetRef();
/**
* required .ActorRef targetRef = 1;
*/
- akka.stream.remote.StreamRefContainers.ActorRefOrBuilder getTargetRefOrBuilder();
-
- // optional int64 initialDemand = 2;
- /**
- * optional int64 initialDemand = 2;
- */
- boolean hasInitialDemand();
- /**
- * optional int64 initialDemand = 2;
- */
- long getInitialDemand();
+ akka.stream.StreamRefMessages.ActorRefOrBuilder getTargetRefOrBuilder();
}
/**
* Protobuf type {@code SinkRef}
@@ -87,11 +77,11 @@ public final class StreamRefContainers {
break;
}
case 10: {
- akka.stream.remote.StreamRefContainers.ActorRef.Builder subBuilder = null;
+ akka.stream.StreamRefMessages.ActorRef.Builder subBuilder = null;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
subBuilder = targetRef_.toBuilder();
}
- targetRef_ = input.readMessage(akka.stream.remote.StreamRefContainers.ActorRef.PARSER, extensionRegistry);
+ targetRef_ = input.readMessage(akka.stream.StreamRefMessages.ActorRef.PARSER, extensionRegistry);
if (subBuilder != null) {
subBuilder.mergeFrom(targetRef_);
targetRef_ = subBuilder.buildPartial();
@@ -99,11 +89,6 @@ public final class StreamRefContainers {
bitField0_ |= 0x00000001;
break;
}
- case 16: {
- bitField0_ |= 0x00000002;
- initialDemand_ = input.readInt64();
- break;
- }
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
@@ -118,14 +103,14 @@ public final class StreamRefContainers {
}
public static final akka.protobuf.Descriptors.Descriptor
getDescriptor() {
- return akka.stream.remote.StreamRefContainers.internal_static_SinkRef_descriptor;
+ return akka.stream.StreamRefMessages.internal_static_SinkRef_descriptor;
}
protected akka.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
- return akka.stream.remote.StreamRefContainers.internal_static_SinkRef_fieldAccessorTable
+ return akka.stream.StreamRefMessages.internal_static_SinkRef_fieldAccessorTable
.ensureFieldAccessorsInitialized(
- akka.stream.remote.StreamRefContainers.SinkRef.class, akka.stream.remote.StreamRefContainers.SinkRef.Builder.class);
+ akka.stream.StreamRefMessages.SinkRef.class, akka.stream.StreamRefMessages.SinkRef.Builder.class);
}
public static akka.protobuf.Parser PARSER =
@@ -146,7 +131,7 @@ public final class StreamRefContainers {
private int bitField0_;
// required .ActorRef targetRef = 1;
public static final int TARGETREF_FIELD_NUMBER = 1;
- private akka.stream.remote.StreamRefContainers.ActorRef targetRef_;
+ private akka.stream.StreamRefMessages.ActorRef targetRef_;
/**
* required .ActorRef targetRef = 1;
*/
@@ -156,35 +141,18 @@ public final class StreamRefContainers {
/**
* required .ActorRef targetRef = 1;
*/
- public akka.stream.remote.StreamRefContainers.ActorRef getTargetRef() {
+ public akka.stream.StreamRefMessages.ActorRef getTargetRef() {
return targetRef_;
}
/**
* required .ActorRef targetRef = 1;
*/
- public akka.stream.remote.StreamRefContainers.ActorRefOrBuilder getTargetRefOrBuilder() {
+ public akka.stream.StreamRefMessages.ActorRefOrBuilder getTargetRefOrBuilder() {
return targetRef_;
}
- // optional int64 initialDemand = 2;
- public static final int INITIALDEMAND_FIELD_NUMBER = 2;
- private long initialDemand_;
- /**
- * optional int64 initialDemand = 2;
- */
- public boolean hasInitialDemand() {
- return ((bitField0_ & 0x00000002) == 0x00000002);
- }
- /**
- * optional int64 initialDemand = 2;
- */
- public long getInitialDemand() {
- return initialDemand_;
- }
-
private void initFields() {
- targetRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance();
- initialDemand_ = 0L;
+ targetRef_ = akka.stream.StreamRefMessages.ActorRef.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -209,9 +177,6 @@ public final class StreamRefContainers {
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeMessage(1, targetRef_);
}
- if (((bitField0_ & 0x00000002) == 0x00000002)) {
- output.writeInt64(2, initialDemand_);
- }
getUnknownFields().writeTo(output);
}
@@ -225,10 +190,6 @@ public final class StreamRefContainers {
size += akka.protobuf.CodedOutputStream
.computeMessageSize(1, targetRef_);
}
- if (((bitField0_ & 0x00000002) == 0x00000002)) {
- size += akka.protobuf.CodedOutputStream
- .computeInt64Size(2, initialDemand_);
- }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -241,53 +202,53 @@ public final class StreamRefContainers {
return super.writeReplace();
}
- public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom(
+ public static akka.stream.StreamRefMessages.SinkRef parseFrom(
akka.protobuf.ByteString data)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
- public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom(
+ public static akka.stream.StreamRefMessages.SinkRef parseFrom(
akka.protobuf.ByteString data,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
- public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom(byte[] data)
+ public static akka.stream.StreamRefMessages.SinkRef parseFrom(byte[] data)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
- public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom(
+ public static akka.stream.StreamRefMessages.SinkRef parseFrom(
byte[] data,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
- public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom(java.io.InputStream input)
+ public static akka.stream.StreamRefMessages.SinkRef parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
- public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom(
+ public static akka.stream.StreamRefMessages.SinkRef parseFrom(
java.io.InputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
- public static akka.stream.remote.StreamRefContainers.SinkRef parseDelimitedFrom(java.io.InputStream input)
+ public static akka.stream.StreamRefMessages.SinkRef parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
- public static akka.stream.remote.StreamRefContainers.SinkRef parseDelimitedFrom(
+ public static akka.stream.StreamRefMessages.SinkRef parseDelimitedFrom(
java.io.InputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
- public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom(
+ public static akka.stream.StreamRefMessages.SinkRef parseFrom(
akka.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
- public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom(
+ public static akka.stream.StreamRefMessages.SinkRef parseFrom(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
@@ -296,7 +257,7 @@ public final class StreamRefContainers {
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
- public static Builder newBuilder(akka.stream.remote.StreamRefContainers.SinkRef prototype) {
+ public static Builder newBuilder(akka.stream.StreamRefMessages.SinkRef prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@@ -312,20 +273,20 @@ public final class StreamRefContainers {
*/
public static final class Builder extends
akka.protobuf.GeneratedMessage.Builder
- implements akka.stream.remote.StreamRefContainers.SinkRefOrBuilder {
+ implements akka.stream.StreamRefMessages.SinkRefOrBuilder {
public static final akka.protobuf.Descriptors.Descriptor
getDescriptor() {
- return akka.stream.remote.StreamRefContainers.internal_static_SinkRef_descriptor;
+ return akka.stream.StreamRefMessages.internal_static_SinkRef_descriptor;
}
protected akka.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
- return akka.stream.remote.StreamRefContainers.internal_static_SinkRef_fieldAccessorTable
+ return akka.stream.StreamRefMessages.internal_static_SinkRef_fieldAccessorTable
.ensureFieldAccessorsInitialized(
- akka.stream.remote.StreamRefContainers.SinkRef.class, akka.stream.remote.StreamRefContainers.SinkRef.Builder.class);
+ akka.stream.StreamRefMessages.SinkRef.class, akka.stream.StreamRefMessages.SinkRef.Builder.class);
}
- // Construct using akka.stream.remote.StreamRefContainers.SinkRef.newBuilder()
+ // Construct using akka.stream.StreamRefMessages.SinkRef.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
@@ -347,13 +308,11 @@ public final class StreamRefContainers {
public Builder clear() {
super.clear();
if (targetRefBuilder_ == null) {
- targetRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance();
+ targetRef_ = akka.stream.StreamRefMessages.ActorRef.getDefaultInstance();
} else {
targetRefBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
- initialDemand_ = 0L;
- bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
@@ -363,23 +322,23 @@ public final class StreamRefContainers {
public akka.protobuf.Descriptors.Descriptor
getDescriptorForType() {
- return akka.stream.remote.StreamRefContainers.internal_static_SinkRef_descriptor;
+ return akka.stream.StreamRefMessages.internal_static_SinkRef_descriptor;
}
- public akka.stream.remote.StreamRefContainers.SinkRef getDefaultInstanceForType() {
- return akka.stream.remote.StreamRefContainers.SinkRef.getDefaultInstance();
+ public akka.stream.StreamRefMessages.SinkRef getDefaultInstanceForType() {
+ return akka.stream.StreamRefMessages.SinkRef.getDefaultInstance();
}
- public akka.stream.remote.StreamRefContainers.SinkRef build() {
- akka.stream.remote.StreamRefContainers.SinkRef result = buildPartial();
+ public akka.stream.StreamRefMessages.SinkRef build() {
+ akka.stream.StreamRefMessages.SinkRef result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
- public akka.stream.remote.StreamRefContainers.SinkRef buildPartial() {
- akka.stream.remote.StreamRefContainers.SinkRef result = new akka.stream.remote.StreamRefContainers.SinkRef(this);
+ public akka.stream.StreamRefMessages.SinkRef buildPartial() {
+ akka.stream.StreamRefMessages.SinkRef result = new akka.stream.StreamRefMessages.SinkRef(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
@@ -390,32 +349,25 @@ public final class StreamRefContainers {
} else {
result.targetRef_ = targetRefBuilder_.build();
}
- if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
- to_bitField0_ |= 0x00000002;
- }
- result.initialDemand_ = initialDemand_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(akka.protobuf.Message other) {
- if (other instanceof akka.stream.remote.StreamRefContainers.SinkRef) {
- return mergeFrom((akka.stream.remote.StreamRefContainers.SinkRef)other);
+ if (other instanceof akka.stream.StreamRefMessages.SinkRef) {
+ return mergeFrom((akka.stream.StreamRefMessages.SinkRef)other);
} else {
super.mergeFrom(other);
return this;
}
}
- public Builder mergeFrom(akka.stream.remote.StreamRefContainers.SinkRef other) {
- if (other == akka.stream.remote.StreamRefContainers.SinkRef.getDefaultInstance()) return this;
+ public Builder mergeFrom(akka.stream.StreamRefMessages.SinkRef other) {
+ if (other == akka.stream.StreamRefMessages.SinkRef.getDefaultInstance()) return this;
if (other.hasTargetRef()) {
mergeTargetRef(other.getTargetRef());
}
- if (other.hasInitialDemand()) {
- setInitialDemand(other.getInitialDemand());
- }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -436,11 +388,11 @@ public final class StreamRefContainers {
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
- akka.stream.remote.StreamRefContainers.SinkRef parsedMessage = null;
+ akka.stream.StreamRefMessages.SinkRef parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (akka.protobuf.InvalidProtocolBufferException e) {
- parsedMessage = (akka.stream.remote.StreamRefContainers.SinkRef) e.getUnfinishedMessage();
+ parsedMessage = (akka.stream.StreamRefMessages.SinkRef) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
@@ -452,9 +404,9 @@ public final class StreamRefContainers {
private int bitField0_;
// required .ActorRef targetRef = 1;
- private akka.stream.remote.StreamRefContainers.ActorRef targetRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance();
+ private akka.stream.StreamRefMessages.ActorRef targetRef_ = akka.stream.StreamRefMessages.ActorRef.getDefaultInstance();
private akka.protobuf.SingleFieldBuilder<
- akka.stream.remote.StreamRefContainers.ActorRef, akka.stream.remote.StreamRefContainers.ActorRef.Builder, akka.stream.remote.StreamRefContainers.ActorRefOrBuilder> targetRefBuilder_;
+ akka.stream.StreamRefMessages.ActorRef, akka.stream.StreamRefMessages.ActorRef.Builder, akka.stream.StreamRefMessages.ActorRefOrBuilder> targetRefBuilder_;
/**
* required .ActorRef targetRef = 1;
*/
@@ -464,7 +416,7 @@ public final class StreamRefContainers {
/**
* required .ActorRef targetRef = 1;
*/
- public akka.stream.remote.StreamRefContainers.ActorRef getTargetRef() {
+ public akka.stream.StreamRefMessages.ActorRef getTargetRef() {
if (targetRefBuilder_ == null) {
return targetRef_;
} else {
@@ -474,7 +426,7 @@ public final class StreamRefContainers {
/**
* required .ActorRef targetRef = 1;
*/
- public Builder setTargetRef(akka.stream.remote.StreamRefContainers.ActorRef value) {
+ public Builder setTargetRef(akka.stream.StreamRefMessages.ActorRef value) {
if (targetRefBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
@@ -491,7 +443,7 @@ public final class StreamRefContainers {
* required .ActorRef targetRef = 1;
*/
public Builder setTargetRef(
- akka.stream.remote.StreamRefContainers.ActorRef.Builder builderForValue) {
+ akka.stream.StreamRefMessages.ActorRef.Builder builderForValue) {
if (targetRefBuilder_ == null) {
targetRef_ = builderForValue.build();
onChanged();
@@ -504,12 +456,12 @@ public final class StreamRefContainers {
/**
* required .ActorRef targetRef = 1;
*/
- public Builder mergeTargetRef(akka.stream.remote.StreamRefContainers.ActorRef value) {
+ public Builder mergeTargetRef(akka.stream.StreamRefMessages.ActorRef value) {
if (targetRefBuilder_ == null) {
if (((bitField0_ & 0x00000001) == 0x00000001) &&
- targetRef_ != akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance()) {
+ targetRef_ != akka.stream.StreamRefMessages.ActorRef.getDefaultInstance()) {
targetRef_ =
- akka.stream.remote.StreamRefContainers.ActorRef.newBuilder(targetRef_).mergeFrom(value).buildPartial();
+ akka.stream.StreamRefMessages.ActorRef.newBuilder(targetRef_).mergeFrom(value).buildPartial();
} else {
targetRef_ = value;
}
@@ -525,7 +477,7 @@ public final class StreamRefContainers {
*/
public Builder clearTargetRef() {
if (targetRefBuilder_ == null) {
- targetRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance();
+ targetRef_ = akka.stream.StreamRefMessages.ActorRef.getDefaultInstance();
onChanged();
} else {
targetRefBuilder_.clear();
@@ -536,7 +488,7 @@ public final class StreamRefContainers {
/**
* required .ActorRef targetRef = 1;
*/
- public akka.stream.remote.StreamRefContainers.ActorRef.Builder getTargetRefBuilder() {
+ public akka.stream.StreamRefMessages.ActorRef.Builder getTargetRefBuilder() {
bitField0_ |= 0x00000001;
onChanged();
return getTargetRefFieldBuilder().getBuilder();
@@ -544,7 +496,7 @@ public final class StreamRefContainers {
/**
* required .ActorRef targetRef = 1;
*/
- public akka.stream.remote.StreamRefContainers.ActorRefOrBuilder getTargetRefOrBuilder() {
+ public akka.stream.StreamRefMessages.ActorRefOrBuilder getTargetRefOrBuilder() {
if (targetRefBuilder_ != null) {
return targetRefBuilder_.getMessageOrBuilder();
} else {
@@ -555,11 +507,11 @@ public final class StreamRefContainers {
* required .ActorRef targetRef = 1;
*/
private akka.protobuf.SingleFieldBuilder<
- akka.stream.remote.StreamRefContainers.ActorRef, akka.stream.remote.StreamRefContainers.ActorRef.Builder, akka.stream.remote.StreamRefContainers.ActorRefOrBuilder>
+ akka.stream.StreamRefMessages.ActorRef, akka.stream.StreamRefMessages.ActorRef.Builder, akka.stream.StreamRefMessages.ActorRefOrBuilder>
getTargetRefFieldBuilder() {
if (targetRefBuilder_ == null) {
targetRefBuilder_ = new akka.protobuf.SingleFieldBuilder<
- akka.stream.remote.StreamRefContainers.ActorRef, akka.stream.remote.StreamRefContainers.ActorRef.Builder, akka.stream.remote.StreamRefContainers.ActorRefOrBuilder>(
+ akka.stream.StreamRefMessages.ActorRef, akka.stream.StreamRefMessages.ActorRef.Builder, akka.stream.StreamRefMessages.ActorRefOrBuilder>(
targetRef_,
getParentForChildren(),
isClean());
@@ -568,39 +520,6 @@ public final class StreamRefContainers {
return targetRefBuilder_;
}
- // optional int64 initialDemand = 2;
- private long initialDemand_ ;
- /**
- * optional int64 initialDemand = 2;
- */
- public boolean hasInitialDemand() {
- return ((bitField0_ & 0x00000002) == 0x00000002);
- }
- /**
- * optional int64 initialDemand = 2;
- */
- public long getInitialDemand() {
- return initialDemand_;
- }
- /**
- * optional int64 initialDemand = 2;
- */
- public Builder setInitialDemand(long value) {
- bitField0_ |= 0x00000002;
- initialDemand_ = value;
- onChanged();
- return this;
- }
- /**
- * optional int64 initialDemand = 2;
- */
- public Builder clearInitialDemand() {
- bitField0_ = (bitField0_ & ~0x00000002);
- initialDemand_ = 0L;
- onChanged();
- return this;
- }
-
// @@protoc_insertion_point(builder_scope:SinkRef)
}
@@ -618,31 +537,16 @@ public final class StreamRefContainers {
// required .ActorRef originRef = 1;
/**
* required .ActorRef originRef = 1;
- *
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
*/
- akka.stream.remote.StreamRefContainers.ActorRefOrBuilder getOriginRefOrBuilder();
+ akka.stream.StreamRefMessages.ActorRefOrBuilder getOriginRefOrBuilder();
}
/**
* Protobuf type {@code SourceRef}
@@ -696,11 +600,11 @@ public final class StreamRefContainers {
break;
}
case 10: {
- akka.stream.remote.StreamRefContainers.ActorRef.Builder subBuilder = null;
+ akka.stream.StreamRefMessages.ActorRef.Builder subBuilder = null;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
subBuilder = originRef_.toBuilder();
}
- originRef_ = input.readMessage(akka.stream.remote.StreamRefContainers.ActorRef.PARSER, extensionRegistry);
+ originRef_ = input.readMessage(akka.stream.StreamRefMessages.ActorRef.PARSER, extensionRegistry);
if (subBuilder != null) {
subBuilder.mergeFrom(originRef_);
originRef_ = subBuilder.buildPartial();
@@ -722,14 +626,14 @@ public final class StreamRefContainers {
}
public static final akka.protobuf.Descriptors.Descriptor
getDescriptor() {
- return akka.stream.remote.StreamRefContainers.internal_static_SourceRef_descriptor;
+ return akka.stream.StreamRefMessages.internal_static_SourceRef_descriptor;
}
protected akka.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
- return akka.stream.remote.StreamRefContainers.internal_static_SourceRef_fieldAccessorTable
+ return akka.stream.StreamRefMessages.internal_static_SourceRef_fieldAccessorTable
.ensureFieldAccessorsInitialized(
- akka.stream.remote.StreamRefContainers.SourceRef.class, akka.stream.remote.StreamRefContainers.SourceRef.Builder.class);
+ akka.stream.StreamRefMessages.SourceRef.class, akka.stream.StreamRefMessages.SourceRef.Builder.class);
}
public static akka.protobuf.Parser PARSER =
@@ -750,43 +654,28 @@ public final class StreamRefContainers {
private int bitField0_;
// required .ActorRef originRef = 1;
public static final int ORIGINREF_FIELD_NUMBER = 1;
- private akka.stream.remote.StreamRefContainers.ActorRef originRef_;
+ private akka.stream.StreamRefMessages.ActorRef originRef_;
/**
* required .ActorRef originRef = 1;
- *
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
*/
- public akka.stream.remote.StreamRefContainers.ActorRefOrBuilder getOriginRefOrBuilder() {
+ public akka.stream.StreamRefMessages.ActorRefOrBuilder getOriginRefOrBuilder() {
return originRef_;
}
private void initFields() {
- originRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance();
+ originRef_ = akka.stream.StreamRefMessages.ActorRef.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -836,53 +725,53 @@ public final class StreamRefContainers {
return super.writeReplace();
}
- public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom(
+ public static akka.stream.StreamRefMessages.SourceRef parseFrom(
akka.protobuf.ByteString data)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
- public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom(
+ public static akka.stream.StreamRefMessages.SourceRef parseFrom(
akka.protobuf.ByteString data,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
- public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom(byte[] data)
+ public static akka.stream.StreamRefMessages.SourceRef parseFrom(byte[] data)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
- public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom(
+ public static akka.stream.StreamRefMessages.SourceRef parseFrom(
byte[] data,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
- public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom(java.io.InputStream input)
+ public static akka.stream.StreamRefMessages.SourceRef parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
- public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom(
+ public static akka.stream.StreamRefMessages.SourceRef parseFrom(
java.io.InputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
- public static akka.stream.remote.StreamRefContainers.SourceRef parseDelimitedFrom(java.io.InputStream input)
+ public static akka.stream.StreamRefMessages.SourceRef parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
- public static akka.stream.remote.StreamRefContainers.SourceRef parseDelimitedFrom(
+ public static akka.stream.StreamRefMessages.SourceRef parseDelimitedFrom(
java.io.InputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
- public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom(
+ public static akka.stream.StreamRefMessages.SourceRef parseFrom(
akka.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
- public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom(
+ public static akka.stream.StreamRefMessages.SourceRef parseFrom(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
@@ -891,7 +780,7 @@ public final class StreamRefContainers {
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
- public static Builder newBuilder(akka.stream.remote.StreamRefContainers.SourceRef prototype) {
+ public static Builder newBuilder(akka.stream.StreamRefMessages.SourceRef prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@@ -907,20 +796,20 @@ public final class StreamRefContainers {
*/
public static final class Builder extends
akka.protobuf.GeneratedMessage.Builder
- implements akka.stream.remote.StreamRefContainers.SourceRefOrBuilder {
+ implements akka.stream.StreamRefMessages.SourceRefOrBuilder {
public static final akka.protobuf.Descriptors.Descriptor
getDescriptor() {
- return akka.stream.remote.StreamRefContainers.internal_static_SourceRef_descriptor;
+ return akka.stream.StreamRefMessages.internal_static_SourceRef_descriptor;
}
protected akka.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
- return akka.stream.remote.StreamRefContainers.internal_static_SourceRef_fieldAccessorTable
+ return akka.stream.StreamRefMessages.internal_static_SourceRef_fieldAccessorTable
.ensureFieldAccessorsInitialized(
- akka.stream.remote.StreamRefContainers.SourceRef.class, akka.stream.remote.StreamRefContainers.SourceRef.Builder.class);
+ akka.stream.StreamRefMessages.SourceRef.class, akka.stream.StreamRefMessages.SourceRef.Builder.class);
}
- // Construct using akka.stream.remote.StreamRefContainers.SourceRef.newBuilder()
+ // Construct using akka.stream.StreamRefMessages.SourceRef.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
@@ -942,7 +831,7 @@ public final class StreamRefContainers {
public Builder clear() {
super.clear();
if (originRefBuilder_ == null) {
- originRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance();
+ originRef_ = akka.stream.StreamRefMessages.ActorRef.getDefaultInstance();
} else {
originRefBuilder_.clear();
}
@@ -956,23 +845,23 @@ public final class StreamRefContainers {
public akka.protobuf.Descriptors.Descriptor
getDescriptorForType() {
- return akka.stream.remote.StreamRefContainers.internal_static_SourceRef_descriptor;
+ return akka.stream.StreamRefMessages.internal_static_SourceRef_descriptor;
}
- public akka.stream.remote.StreamRefContainers.SourceRef getDefaultInstanceForType() {
- return akka.stream.remote.StreamRefContainers.SourceRef.getDefaultInstance();
+ public akka.stream.StreamRefMessages.SourceRef getDefaultInstanceForType() {
+ return akka.stream.StreamRefMessages.SourceRef.getDefaultInstance();
}
- public akka.stream.remote.StreamRefContainers.SourceRef build() {
- akka.stream.remote.StreamRefContainers.SourceRef result = buildPartial();
+ public akka.stream.StreamRefMessages.SourceRef build() {
+ akka.stream.StreamRefMessages.SourceRef result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
- public akka.stream.remote.StreamRefContainers.SourceRef buildPartial() {
- akka.stream.remote.StreamRefContainers.SourceRef result = new akka.stream.remote.StreamRefContainers.SourceRef(this);
+ public akka.stream.StreamRefMessages.SourceRef buildPartial() {
+ akka.stream.StreamRefMessages.SourceRef result = new akka.stream.StreamRefMessages.SourceRef(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
@@ -989,16 +878,16 @@ public final class StreamRefContainers {
}
public Builder mergeFrom(akka.protobuf.Message other) {
- if (other instanceof akka.stream.remote.StreamRefContainers.SourceRef) {
- return mergeFrom((akka.stream.remote.StreamRefContainers.SourceRef)other);
+ if (other instanceof akka.stream.StreamRefMessages.SourceRef) {
+ return mergeFrom((akka.stream.StreamRefMessages.SourceRef)other);
} else {
super.mergeFrom(other);
return this;
}
}
- public Builder mergeFrom(akka.stream.remote.StreamRefContainers.SourceRef other) {
- if (other == akka.stream.remote.StreamRefContainers.SourceRef.getDefaultInstance()) return this;
+ public Builder mergeFrom(akka.stream.StreamRefMessages.SourceRef other) {
+ if (other == akka.stream.StreamRefMessages.SourceRef.getDefaultInstance()) return this;
if (other.hasOriginRef()) {
mergeOriginRef(other.getOriginRef());
}
@@ -1022,11 +911,11 @@ public final class StreamRefContainers {
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
- akka.stream.remote.StreamRefContainers.SourceRef parsedMessage = null;
+ akka.stream.StreamRefMessages.SourceRef parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (akka.protobuf.InvalidProtocolBufferException e) {
- parsedMessage = (akka.stream.remote.StreamRefContainers.SourceRef) e.getUnfinishedMessage();
+ parsedMessage = (akka.stream.StreamRefMessages.SourceRef) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
@@ -1038,29 +927,19 @@ public final class StreamRefContainers {
private int bitField0_;
// required .ActorRef originRef = 1;
- private akka.stream.remote.StreamRefContainers.ActorRef originRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance();
+ private akka.stream.StreamRefMessages.ActorRef originRef_ = akka.stream.StreamRefMessages.ActorRef.getDefaultInstance();
private akka.protobuf.SingleFieldBuilder<
- akka.stream.remote.StreamRefContainers.ActorRef, akka.stream.remote.StreamRefContainers.ActorRef.Builder, akka.stream.remote.StreamRefContainers.ActorRefOrBuilder> originRefBuilder_;
+ akka.stream.StreamRefMessages.ActorRef, akka.stream.StreamRefMessages.ActorRef.Builder, akka.stream.StreamRefMessages.ActorRefOrBuilder> originRefBuilder_;
/**
* required .ActorRef originRef = 1;
- *
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
*/
- public akka.stream.remote.StreamRefContainers.ActorRef getOriginRef() {
+ public akka.stream.StreamRefMessages.ActorRef getOriginRef() {
if (originRefBuilder_ == null) {
return originRef_;
} else {
@@ -1069,13 +948,8 @@ public final class StreamRefContainers {
}
/**
* required .ActorRef originRef = 1;
- *
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
*/
- public Builder setOriginRef(akka.stream.remote.StreamRefContainers.ActorRef value) {
+ public Builder setOriginRef(akka.stream.StreamRefMessages.ActorRef value) {
if (originRefBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
@@ -1090,14 +964,9 @@ public final class StreamRefContainers {
}
/**
* required .ActorRef originRef = 1;
- *
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
*/
public Builder setOriginRef(
- akka.stream.remote.StreamRefContainers.ActorRef.Builder builderForValue) {
+ akka.stream.StreamRefMessages.ActorRef.Builder builderForValue) {
if (originRefBuilder_ == null) {
originRef_ = builderForValue.build();
onChanged();
@@ -1109,18 +978,13 @@ public final class StreamRefContainers {
}
/**
* required .ActorRef originRef = 1;
- *
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
*/
- public Builder mergeOriginRef(akka.stream.remote.StreamRefContainers.ActorRef value) {
+ public Builder mergeOriginRef(akka.stream.StreamRefMessages.ActorRef value) {
if (originRefBuilder_ == null) {
if (((bitField0_ & 0x00000001) == 0x00000001) &&
- originRef_ != akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance()) {
+ originRef_ != akka.stream.StreamRefMessages.ActorRef.getDefaultInstance()) {
originRef_ =
- akka.stream.remote.StreamRefContainers.ActorRef.newBuilder(originRef_).mergeFrom(value).buildPartial();
+ akka.stream.StreamRefMessages.ActorRef.newBuilder(originRef_).mergeFrom(value).buildPartial();
} else {
originRef_ = value;
}
@@ -1133,15 +997,10 @@ public final class StreamRefContainers {
}
/**
* required .ActorRef originRef = 1;
- *
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
*/
public Builder clearOriginRef() {
if (originRefBuilder_ == null) {
- originRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance();
+ originRef_ = akka.stream.StreamRefMessages.ActorRef.getDefaultInstance();
onChanged();
} else {
originRefBuilder_.clear();
@@ -1151,26 +1010,16 @@ public final class StreamRefContainers {
}
/**
* required .ActorRef originRef = 1;
- *
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
*/
- public akka.stream.remote.StreamRefContainers.ActorRefOrBuilder getOriginRefOrBuilder() {
+ public akka.stream.StreamRefMessages.ActorRefOrBuilder getOriginRefOrBuilder() {
if (originRefBuilder_ != null) {
return originRefBuilder_.getMessageOrBuilder();
} else {
@@ -1179,18 +1028,13 @@ public final class StreamRefContainers {
}
/**
* required .ActorRef originRef = 1;
- *
- *
- * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
- * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
- *
*/
private akka.protobuf.SingleFieldBuilder<
- akka.stream.remote.StreamRefContainers.ActorRef, akka.stream.remote.StreamRefContainers.ActorRef.Builder, akka.stream.remote.StreamRefContainers.ActorRefOrBuilder>
+ akka.stream.StreamRefMessages.ActorRef, akka.stream.StreamRefMessages.ActorRef.Builder, akka.stream.StreamRefMessages.ActorRefOrBuilder>
getOriginRefFieldBuilder() {
if (originRefBuilder_ == null) {
originRefBuilder_ = new akka.protobuf.SingleFieldBuilder<
- akka.stream.remote.StreamRefContainers.ActorRef, akka.stream.remote.StreamRefContainers.ActorRef.Builder, akka.stream.remote.StreamRefContainers.ActorRefOrBuilder>(
+ akka.stream.StreamRefMessages.ActorRef, akka.stream.StreamRefMessages.ActorRef.Builder, akka.stream.StreamRefMessages.ActorRefOrBuilder>(
originRef_,
getParentForChildren(),
isClean());
@@ -1298,14 +1142,14 @@ public final class StreamRefContainers {
}
public static final akka.protobuf.Descriptors.Descriptor
getDescriptor() {
- return akka.stream.remote.StreamRefContainers.internal_static_ActorRef_descriptor;
+ return akka.stream.StreamRefMessages.internal_static_ActorRef_descriptor;
}
protected akka.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
- return akka.stream.remote.StreamRefContainers.internal_static_ActorRef_fieldAccessorTable
+ return akka.stream.StreamRefMessages.internal_static_ActorRef_fieldAccessorTable
.ensureFieldAccessorsInitialized(
- akka.stream.remote.StreamRefContainers.ActorRef.class, akka.stream.remote.StreamRefContainers.ActorRef.Builder.class);
+ akka.stream.StreamRefMessages.ActorRef.class, akka.stream.StreamRefMessages.ActorRef.Builder.class);
}
public static akka.protobuf.Parser PARSER =
@@ -1414,53 +1258,53 @@ public final class StreamRefContainers {
return super.writeReplace();
}
- public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom(
+ public static akka.stream.StreamRefMessages.ActorRef parseFrom(
akka.protobuf.ByteString data)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
- public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom(
+ public static akka.stream.StreamRefMessages.ActorRef parseFrom(
akka.protobuf.ByteString data,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
- public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom(byte[] data)
+ public static akka.stream.StreamRefMessages.ActorRef parseFrom(byte[] data)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
- public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom(
+ public static akka.stream.StreamRefMessages.ActorRef parseFrom(
byte[] data,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
- public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom(java.io.InputStream input)
+ public static akka.stream.StreamRefMessages.ActorRef parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
- public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom(
+ public static akka.stream.StreamRefMessages.ActorRef parseFrom(
java.io.InputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
- public static akka.stream.remote.StreamRefContainers.ActorRef parseDelimitedFrom(java.io.InputStream input)
+ public static akka.stream.StreamRefMessages.ActorRef parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
- public static akka.stream.remote.StreamRefContainers.ActorRef parseDelimitedFrom(
+ public static akka.stream.StreamRefMessages.ActorRef parseDelimitedFrom(
java.io.InputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
- public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom(
+ public static akka.stream.StreamRefMessages.ActorRef parseFrom(
akka.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
- public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom(
+ public static akka.stream.StreamRefMessages.ActorRef parseFrom(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
@@ -1469,7 +1313,7 @@ public final class StreamRefContainers {
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
- public static Builder newBuilder(akka.stream.remote.StreamRefContainers.ActorRef prototype) {
+ public static Builder newBuilder(akka.stream.StreamRefMessages.ActorRef prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@@ -1485,20 +1329,20 @@ public final class StreamRefContainers {
*/
public static final class Builder extends
akka.protobuf.GeneratedMessage.Builder
- implements akka.stream.remote.StreamRefContainers.ActorRefOrBuilder {
+ implements akka.stream.StreamRefMessages.ActorRefOrBuilder {
public static final akka.protobuf.Descriptors.Descriptor
getDescriptor() {
- return akka.stream.remote.StreamRefContainers.internal_static_ActorRef_descriptor;
+ return akka.stream.StreamRefMessages.internal_static_ActorRef_descriptor;
}
protected akka.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
- return akka.stream.remote.StreamRefContainers.internal_static_ActorRef_fieldAccessorTable
+ return akka.stream.StreamRefMessages.internal_static_ActorRef_fieldAccessorTable
.ensureFieldAccessorsInitialized(
- akka.stream.remote.StreamRefContainers.ActorRef.class, akka.stream.remote.StreamRefContainers.ActorRef.Builder.class);
+ akka.stream.StreamRefMessages.ActorRef.class, akka.stream.StreamRefMessages.ActorRef.Builder.class);
}
- // Construct using akka.stream.remote.StreamRefContainers.ActorRef.newBuilder()
+ // Construct using akka.stream.StreamRefMessages.ActorRef.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
@@ -1529,23 +1373,23 @@ public final class StreamRefContainers {
public akka.protobuf.Descriptors.Descriptor
getDescriptorForType() {
- return akka.stream.remote.StreamRefContainers.internal_static_ActorRef_descriptor;
+ return akka.stream.StreamRefMessages.internal_static_ActorRef_descriptor;
}
- public akka.stream.remote.StreamRefContainers.ActorRef getDefaultInstanceForType() {
- return akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance();
+ public akka.stream.StreamRefMessages.ActorRef getDefaultInstanceForType() {
+ return akka.stream.StreamRefMessages.ActorRef.getDefaultInstance();
}
- public akka.stream.remote.StreamRefContainers.ActorRef build() {
- akka.stream.remote.StreamRefContainers.ActorRef result = buildPartial();
+ public akka.stream.StreamRefMessages.ActorRef build() {
+ akka.stream.StreamRefMessages.ActorRef result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
- public akka.stream.remote.StreamRefContainers.ActorRef buildPartial() {
- akka.stream.remote.StreamRefContainers.ActorRef result = new akka.stream.remote.StreamRefContainers.ActorRef(this);
+ public akka.stream.StreamRefMessages.ActorRef buildPartial() {
+ akka.stream.StreamRefMessages.ActorRef result = new akka.stream.StreamRefMessages.ActorRef(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
@@ -1558,16 +1402,16 @@ public final class StreamRefContainers {
}
public Builder mergeFrom(akka.protobuf.Message other) {
- if (other instanceof akka.stream.remote.StreamRefContainers.ActorRef) {
- return mergeFrom((akka.stream.remote.StreamRefContainers.ActorRef)other);
+ if (other instanceof akka.stream.StreamRefMessages.ActorRef) {
+ return mergeFrom((akka.stream.StreamRefMessages.ActorRef)other);
} else {
super.mergeFrom(other);
return this;
}
}
- public Builder mergeFrom(akka.stream.remote.StreamRefContainers.ActorRef other) {
- if (other == akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance()) return this;
+ public Builder mergeFrom(akka.stream.StreamRefMessages.ActorRef other) {
+ if (other == akka.stream.StreamRefMessages.ActorRef.getDefaultInstance()) return this;
if (other.hasPath()) {
bitField0_ |= 0x00000001;
path_ = other.path_;
@@ -1589,11 +1433,11 @@ public final class StreamRefContainers {
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
- akka.stream.remote.StreamRefContainers.ActorRef parsedMessage = null;
+ akka.stream.StreamRefMessages.ActorRef parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (akka.protobuf.InvalidProtocolBufferException e) {
- parsedMessage = (akka.stream.remote.StreamRefContainers.ActorRef) e.getUnfinishedMessage();
+ parsedMessage = (akka.stream.StreamRefMessages.ActorRef) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
@@ -1689,525 +1533,6 @@ public final class StreamRefContainers {
// @@protoc_insertion_point(class_scope:ActorRef)
}
- public interface OptionOrBuilder
- extends akka.protobuf.MessageOrBuilder {
-
- // optional .Payload value = 1;
- /**
- * optional .Payload value = 1;
- */
- boolean hasValue();
- /**
- * optional .Payload value = 1;
- */
- akka.stream.remote.StreamRefContainers.Payload getValue();
- /**
- * optional .Payload value = 1;
- */
- akka.stream.remote.StreamRefContainers.PayloadOrBuilder getValueOrBuilder();
- }
- /**
- * Protobuf type {@code Option}
- */
- public static final class Option extends
- akka.protobuf.GeneratedMessage
- implements OptionOrBuilder {
- // Use Option.newBuilder() to construct.
- private Option(akka.protobuf.GeneratedMessage.Builder> builder) {
- super(builder);
- this.unknownFields = builder.getUnknownFields();
- }
- private Option(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); }
-
- private static final Option defaultInstance;
- public static Option getDefaultInstance() {
- return defaultInstance;
- }
-
- public Option getDefaultInstanceForType() {
- return defaultInstance;
- }
-
- private final akka.protobuf.UnknownFieldSet unknownFields;
- @java.lang.Override
- public final akka.protobuf.UnknownFieldSet
- getUnknownFields() {
- return this.unknownFields;
- }
- private Option(
- akka.protobuf.CodedInputStream input,
- akka.protobuf.ExtensionRegistryLite extensionRegistry)
- throws akka.protobuf.InvalidProtocolBufferException {
- initFields();
- int mutable_bitField0_ = 0;
- akka.protobuf.UnknownFieldSet.Builder unknownFields =
- akka.protobuf.UnknownFieldSet.newBuilder();
- try {
- boolean done = false;
- while (!done) {
- int tag = input.readTag();
- switch (tag) {
- case 0:
- done = true;
- break;
- default: {
- if (!parseUnknownField(input, unknownFields,
- extensionRegistry, tag)) {
- done = true;
- }
- break;
- }
- case 10: {
- akka.stream.remote.StreamRefContainers.Payload.Builder subBuilder = null;
- if (((bitField0_ & 0x00000001) == 0x00000001)) {
- subBuilder = value_.toBuilder();
- }
- value_ = input.readMessage(akka.stream.remote.StreamRefContainers.Payload.PARSER, extensionRegistry);
- if (subBuilder != null) {
- subBuilder.mergeFrom(value_);
- value_ = subBuilder.buildPartial();
- }
- bitField0_ |= 0x00000001;
- break;
- }
- }
- }
- } catch (akka.protobuf.InvalidProtocolBufferException e) {
- throw e.setUnfinishedMessage(this);
- } catch (java.io.IOException e) {
- throw new akka.protobuf.InvalidProtocolBufferException(
- e.getMessage()).setUnfinishedMessage(this);
- } finally {
- this.unknownFields = unknownFields.build();
- makeExtensionsImmutable();
- }
- }
- public static final akka.protobuf.Descriptors.Descriptor
- getDescriptor() {
- return akka.stream.remote.StreamRefContainers.internal_static_Option_descriptor;
- }
-
- protected akka.protobuf.GeneratedMessage.FieldAccessorTable
- internalGetFieldAccessorTable() {
- return akka.stream.remote.StreamRefContainers.internal_static_Option_fieldAccessorTable
- .ensureFieldAccessorsInitialized(
- akka.stream.remote.StreamRefContainers.Option.class, akka.stream.remote.StreamRefContainers.Option.Builder.class);
- }
-
- public static akka.protobuf.Parser