From 682b047ac377dda40629e1197a9e7f59faf91552 Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Tue, 2 Jan 2018 18:43:27 +0100 Subject: [PATCH] +str #24229 first working SinkRef, though serialization is wrong somehow (when serialize-messages = on) silly serialization mistake, should have fixed serialize as well tage actors now can have names, which helps a lot in debugging thread weirdness make sure to fail properly, actually go over remoting issue with not receiving the SinkRef... what initial working SinkRef over remoting remote Sink failure must fail origin Source as well cleaning up and adding failyre handling SinkRef now with low-watermark RequestStrategy source ref works, yet completely duplicated code --- .../scala/akka/actor/dungeon/Children.scala | 8 +- .../protobuf/msg/TwoPhaseSetMessages.java | 1236 +++++ .../persistence/proto/FlightAppModels.java | 823 +++ .../serialization/MessageFormats.java | 48 + .../scala/akka/remote/artery/SendQueue.scala | 3 +- .../akka/stream/remote/StreamRefsSpec.scala | 274 + .../stream/remote/StreamRefContainers.java | 4746 +++++++++++++++++ .../main/protobuf/StreamRefContainers.proto | 56 + akka-stream/src/main/resources/reference.conf | 49 +- .../scala/akka/stream/remote/StreamRefs.scala | 67 + .../stream/remote/impl/StreamRefsMaster.scala | 61 + .../akka/stream/remote/scaladsl/SinkRef.scala | 274 + .../stream/remote/scaladsl/SourceRef.scala | 289 + .../remote/scaladsl/StreamRefSettings.scala | 19 + .../serialization/StreamRefSerializer.scala | 160 + .../scala/akka/stream/stage/GraphStage.scala | 42 +- build.sbt | 5 +- 17 files changed, 8145 insertions(+), 15 deletions(-) create mode 100644 akka-docs/src/main/java/docs/ddata/protobuf/msg/TwoPhaseSetMessages.java create mode 100644 akka-docs/src/main/java/docs/persistence/proto/FlightAppModels.java create mode 100644 akka-stream-tests/src/test/scala/akka/stream/remote/StreamRefsSpec.scala create mode 100644 akka-stream/src/main/java/akka/stream/remote/StreamRefContainers.java create mode 100644 akka-stream/src/main/protobuf/StreamRefContainers.proto create mode 100644 akka-stream/src/main/scala/akka/stream/remote/StreamRefs.scala create mode 100644 akka-stream/src/main/scala/akka/stream/remote/impl/StreamRefsMaster.scala create mode 100644 akka-stream/src/main/scala/akka/stream/remote/scaladsl/SinkRef.scala create mode 100644 akka-stream/src/main/scala/akka/stream/remote/scaladsl/SourceRef.scala create mode 100644 akka-stream/src/main/scala/akka/stream/remote/scaladsl/StreamRefSettings.scala create mode 100644 akka-stream/src/main/scala/akka/stream/remote/serialization/StreamRefSerializer.scala diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index 80c7972bd7..67531c7a24 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -9,10 +9,12 @@ import scala.util.control.NonFatal import scala.collection.immutable import akka.actor._ import akka.serialization.SerializationExtension -import akka.util.{ Unsafe, Helpers } +import akka.util.{ Helpers, Unsafe } import akka.serialization.SerializerWithStringManifest import java.util.Optional +import akka.event.Logging + private[akka] object Children { val GetNobody = () ⇒ Nobody } @@ -192,7 +194,8 @@ private[akka] trait Children { this: ActorCell ⇒ protected def getAllChildStats: immutable.Iterable[ChildRestartStats] = childrenRefs.stats - override def getSingleChild(name: String): InternalActorRef = + override def getSingleChild(name: String): InternalActorRef = { + if (name.indexOf('#') == -1) { // optimization for the non-uid case getChildByName(name) match { @@ -207,6 +210,7 @@ private[akka] trait Children { this: ActorCell ⇒ case _ ⇒ getFunctionRefOrNobody(childName, uid) } } + } protected def removeChildAndGetStateChange(child: ActorRef): Option[SuspendReason] = { @tailrec def removeChild(ref: ActorRef): ChildrenContainer = { diff --git a/akka-docs/src/main/java/docs/ddata/protobuf/msg/TwoPhaseSetMessages.java b/akka-docs/src/main/java/docs/ddata/protobuf/msg/TwoPhaseSetMessages.java new file mode 100644 index 0000000000..e882dbfd57 --- /dev/null +++ b/akka-docs/src/main/java/docs/ddata/protobuf/msg/TwoPhaseSetMessages.java @@ -0,0 +1,1236 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: TwoPhaseSetMessages.proto + +package docs.ddata.protobuf.msg; + +public final class TwoPhaseSetMessages { + private TwoPhaseSetMessages() {} + public static void registerAllExtensions( + akka.protobuf.ExtensionRegistry registry) { + } + public interface TwoPhaseSetOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // repeated string adds = 1; + /** + * repeated string adds = 1; + */ + java.util.List + getAddsList(); + /** + * repeated string adds = 1; + */ + int getAddsCount(); + /** + * repeated string adds = 1; + */ + java.lang.String getAdds(int index); + /** + * repeated string adds = 1; + */ + akka.protobuf.ByteString + getAddsBytes(int index); + + // repeated string removals = 2; + /** + * repeated string removals = 2; + */ + java.util.List + getRemovalsList(); + /** + * repeated string removals = 2; + */ + int getRemovalsCount(); + /** + * repeated string removals = 2; + */ + java.lang.String getRemovals(int index); + /** + * repeated string removals = 2; + */ + akka.protobuf.ByteString + getRemovalsBytes(int index); + } + /** + * Protobuf type {@code docs.ddata.TwoPhaseSet} + */ + public static final class TwoPhaseSet extends + akka.protobuf.GeneratedMessage + implements TwoPhaseSetOrBuilder { + // Use TwoPhaseSet.newBuilder() to construct. + private TwoPhaseSet(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private TwoPhaseSet(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final TwoPhaseSet defaultInstance; + public static TwoPhaseSet getDefaultInstance() { + return defaultInstance; + } + + public TwoPhaseSet getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private TwoPhaseSet( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + if (!((mutable_bitField0_ & 0x00000001) == 0x00000001)) { + adds_ = new akka.protobuf.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000001; + } + adds_.add(input.readBytes()); + break; + } + case 18: { + if (!((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + removals_ = new akka.protobuf.LazyStringArrayList(); + mutable_bitField0_ |= 0x00000002; + } + removals_.add(input.readBytes()); + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) { + adds_ = new akka.protobuf.UnmodifiableLazyStringList(adds_); + } + if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) { + removals_ = new akka.protobuf.UnmodifiableLazyStringList(removals_); + } + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return docs.ddata.protobuf.msg.TwoPhaseSetMessages.internal_static_docs_ddata_TwoPhaseSet_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return docs.ddata.protobuf.msg.TwoPhaseSetMessages.internal_static_docs_ddata_TwoPhaseSet_fieldAccessorTable + .ensureFieldAccessorsInitialized( + docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet.class, docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public TwoPhaseSet parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new TwoPhaseSet(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + // repeated string adds = 1; + public static final int ADDS_FIELD_NUMBER = 1; + private akka.protobuf.LazyStringList adds_; + /** + * repeated string adds = 1; + */ + public java.util.List + getAddsList() { + return adds_; + } + /** + * repeated string adds = 1; + */ + public int getAddsCount() { + return adds_.size(); + } + /** + * repeated string adds = 1; + */ + public java.lang.String getAdds(int index) { + return adds_.get(index); + } + /** + * repeated string adds = 1; + */ + public akka.protobuf.ByteString + getAddsBytes(int index) { + return adds_.getByteString(index); + } + + // repeated string removals = 2; + public static final int REMOVALS_FIELD_NUMBER = 2; + private akka.protobuf.LazyStringList removals_; + /** + * repeated string removals = 2; + */ + public java.util.List + getRemovalsList() { + return removals_; + } + /** + * repeated string removals = 2; + */ + public int getRemovalsCount() { + return removals_.size(); + } + /** + * repeated string removals = 2; + */ + public java.lang.String getRemovals(int index) { + return removals_.get(index); + } + /** + * repeated string removals = 2; + */ + public akka.protobuf.ByteString + getRemovalsBytes(int index) { + return removals_.getByteString(index); + } + + private void initFields() { + adds_ = akka.protobuf.LazyStringArrayList.EMPTY; + removals_ = akka.protobuf.LazyStringArrayList.EMPTY; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + for (int i = 0; i < adds_.size(); i++) { + output.writeBytes(1, adds_.getByteString(i)); + } + for (int i = 0; i < removals_.size(); i++) { + output.writeBytes(2, removals_.getByteString(i)); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + { + int dataSize = 0; + for (int i = 0; i < adds_.size(); i++) { + dataSize += akka.protobuf.CodedOutputStream + .computeBytesSizeNoTag(adds_.getByteString(i)); + } + size += dataSize; + size += 1 * getAddsList().size(); + } + { + int dataSize = 0; + for (int i = 0; i < removals_.size(); i++) { + dataSize += akka.protobuf.CodedOutputStream + .computeBytesSizeNoTag(removals_.getByteString(i)); + } + size += dataSize; + size += 1 * getRemovalsList().size(); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code docs.ddata.TwoPhaseSet} + */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSetOrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return docs.ddata.protobuf.msg.TwoPhaseSetMessages.internal_static_docs_ddata_TwoPhaseSet_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return docs.ddata.protobuf.msg.TwoPhaseSetMessages.internal_static_docs_ddata_TwoPhaseSet_fieldAccessorTable + .ensureFieldAccessorsInitialized( + docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet.class, docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet.Builder.class); + } + + // Construct using docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + adds_ = akka.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + removals_ = akka.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return docs.ddata.protobuf.msg.TwoPhaseSetMessages.internal_static_docs_ddata_TwoPhaseSet_descriptor; + } + + public docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet getDefaultInstanceForType() { + return docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet.getDefaultInstance(); + } + + public docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet build() { + docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet buildPartial() { + docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet result = new docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet(this); + int from_bitField0_ = bitField0_; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + adds_ = new akka.protobuf.UnmodifiableLazyStringList( + adds_); + bitField0_ = (bitField0_ & ~0x00000001); + } + result.adds_ = adds_; + if (((bitField0_ & 0x00000002) == 0x00000002)) { + removals_ = new akka.protobuf.UnmodifiableLazyStringList( + removals_); + bitField0_ = (bitField0_ & ~0x00000002); + } + result.removals_ = removals_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet) { + return mergeFrom((docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet other) { + if (other == docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet.getDefaultInstance()) return this; + if (!other.adds_.isEmpty()) { + if (adds_.isEmpty()) { + adds_ = other.adds_; + bitField0_ = (bitField0_ & ~0x00000001); + } else { + ensureAddsIsMutable(); + adds_.addAll(other.adds_); + } + onChanged(); + } + if (!other.removals_.isEmpty()) { + if (removals_.isEmpty()) { + removals_ = other.removals_; + bitField0_ = (bitField0_ & ~0x00000002); + } else { + ensureRemovalsIsMutable(); + removals_.addAll(other.removals_); + } + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // repeated string adds = 1; + private akka.protobuf.LazyStringList adds_ = akka.protobuf.LazyStringArrayList.EMPTY; + private void ensureAddsIsMutable() { + if (!((bitField0_ & 0x00000001) == 0x00000001)) { + adds_ = new akka.protobuf.LazyStringArrayList(adds_); + bitField0_ |= 0x00000001; + } + } + /** + * repeated string adds = 1; + */ + public java.util.List + getAddsList() { + return java.util.Collections.unmodifiableList(adds_); + } + /** + * repeated string adds = 1; + */ + public int getAddsCount() { + return adds_.size(); + } + /** + * repeated string adds = 1; + */ + public java.lang.String getAdds(int index) { + return adds_.get(index); + } + /** + * repeated string adds = 1; + */ + public akka.protobuf.ByteString + getAddsBytes(int index) { + return adds_.getByteString(index); + } + /** + * repeated string adds = 1; + */ + public Builder setAdds( + int index, java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureAddsIsMutable(); + adds_.set(index, value); + onChanged(); + return this; + } + /** + * repeated string adds = 1; + */ + public Builder addAdds( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureAddsIsMutable(); + adds_.add(value); + onChanged(); + return this; + } + /** + * repeated string adds = 1; + */ + public Builder addAllAdds( + java.lang.Iterable values) { + ensureAddsIsMutable(); + super.addAll(values, adds_); + onChanged(); + return this; + } + /** + * repeated string adds = 1; + */ + public Builder clearAdds() { + adds_ = akka.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + onChanged(); + return this; + } + /** + * repeated string adds = 1; + */ + public Builder addAddsBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureAddsIsMutable(); + adds_.add(value); + onChanged(); + return this; + } + + // repeated string removals = 2; + private akka.protobuf.LazyStringList removals_ = akka.protobuf.LazyStringArrayList.EMPTY; + private void ensureRemovalsIsMutable() { + if (!((bitField0_ & 0x00000002) == 0x00000002)) { + removals_ = new akka.protobuf.LazyStringArrayList(removals_); + bitField0_ |= 0x00000002; + } + } + /** + * repeated string removals = 2; + */ + public java.util.List + getRemovalsList() { + return java.util.Collections.unmodifiableList(removals_); + } + /** + * repeated string removals = 2; + */ + public int getRemovalsCount() { + return removals_.size(); + } + /** + * repeated string removals = 2; + */ + public java.lang.String getRemovals(int index) { + return removals_.get(index); + } + /** + * repeated string removals = 2; + */ + public akka.protobuf.ByteString + getRemovalsBytes(int index) { + return removals_.getByteString(index); + } + /** + * repeated string removals = 2; + */ + public Builder setRemovals( + int index, java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureRemovalsIsMutable(); + removals_.set(index, value); + onChanged(); + return this; + } + /** + * repeated string removals = 2; + */ + public Builder addRemovals( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + ensureRemovalsIsMutable(); + removals_.add(value); + onChanged(); + return this; + } + /** + * repeated string removals = 2; + */ + public Builder addAllRemovals( + java.lang.Iterable values) { + ensureRemovalsIsMutable(); + super.addAll(values, removals_); + onChanged(); + return this; + } + /** + * repeated string removals = 2; + */ + public Builder clearRemovals() { + removals_ = akka.protobuf.LazyStringArrayList.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + onChanged(); + return this; + } + /** + * repeated string removals = 2; + */ + public Builder addRemovalsBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + ensureRemovalsIsMutable(); + removals_.add(value); + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:docs.ddata.TwoPhaseSet) + } + + static { + defaultInstance = new TwoPhaseSet(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:docs.ddata.TwoPhaseSet) + } + + public interface TwoPhaseSet2OrBuilder + extends akka.protobuf.MessageOrBuilder { + + // optional bytes adds = 1; + /** + * optional bytes adds = 1; + */ + boolean hasAdds(); + /** + * optional bytes adds = 1; + */ + akka.protobuf.ByteString getAdds(); + + // optional bytes removals = 2; + /** + * optional bytes removals = 2; + */ + boolean hasRemovals(); + /** + * optional bytes removals = 2; + */ + akka.protobuf.ByteString getRemovals(); + } + /** + * Protobuf type {@code docs.ddata.TwoPhaseSet2} + * + *
+   *#twophaseset2
+   * 
+ */ + public static final class TwoPhaseSet2 extends + akka.protobuf.GeneratedMessage + implements TwoPhaseSet2OrBuilder { + // Use TwoPhaseSet2.newBuilder() to construct. + private TwoPhaseSet2(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private TwoPhaseSet2(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final TwoPhaseSet2 defaultInstance; + public static TwoPhaseSet2 getDefaultInstance() { + return defaultInstance; + } + + public TwoPhaseSet2 getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private TwoPhaseSet2( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + adds_ = input.readBytes(); + break; + } + case 18: { + bitField0_ |= 0x00000002; + removals_ = input.readBytes(); + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return docs.ddata.protobuf.msg.TwoPhaseSetMessages.internal_static_docs_ddata_TwoPhaseSet2_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return docs.ddata.protobuf.msg.TwoPhaseSetMessages.internal_static_docs_ddata_TwoPhaseSet2_fieldAccessorTable + .ensureFieldAccessorsInitialized( + docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2.class, docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public TwoPhaseSet2 parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new TwoPhaseSet2(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // optional bytes adds = 1; + public static final int ADDS_FIELD_NUMBER = 1; + private akka.protobuf.ByteString adds_; + /** + * optional bytes adds = 1; + */ + public boolean hasAdds() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional bytes adds = 1; + */ + public akka.protobuf.ByteString getAdds() { + return adds_; + } + + // optional bytes removals = 2; + public static final int REMOVALS_FIELD_NUMBER = 2; + private akka.protobuf.ByteString removals_; + /** + * optional bytes removals = 2; + */ + public boolean hasRemovals() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional bytes removals = 2; + */ + public akka.protobuf.ByteString getRemovals() { + return removals_; + } + + private void initFields() { + adds_ = akka.protobuf.ByteString.EMPTY; + removals_ = akka.protobuf.ByteString.EMPTY; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, adds_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeBytes(2, removals_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(1, adds_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(2, removals_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code docs.ddata.TwoPhaseSet2} + * + *
+     *#twophaseset2
+     * 
+ */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2OrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return docs.ddata.protobuf.msg.TwoPhaseSetMessages.internal_static_docs_ddata_TwoPhaseSet2_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return docs.ddata.protobuf.msg.TwoPhaseSetMessages.internal_static_docs_ddata_TwoPhaseSet2_fieldAccessorTable + .ensureFieldAccessorsInitialized( + docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2.class, docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2.Builder.class); + } + + // Construct using docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + adds_ = akka.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000001); + removals_ = akka.protobuf.ByteString.EMPTY; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return docs.ddata.protobuf.msg.TwoPhaseSetMessages.internal_static_docs_ddata_TwoPhaseSet2_descriptor; + } + + public docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 getDefaultInstanceForType() { + return docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2.getDefaultInstance(); + } + + public docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 build() { + docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 buildPartial() { + docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 result = new docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.adds_ = adds_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.removals_ = removals_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2) { + return mergeFrom((docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 other) { + if (other == docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2.getDefaultInstance()) return this; + if (other.hasAdds()) { + setAdds(other.getAdds()); + } + if (other.hasRemovals()) { + setRemovals(other.getRemovals()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2 parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (docs.ddata.protobuf.msg.TwoPhaseSetMessages.TwoPhaseSet2) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // optional bytes adds = 1; + private akka.protobuf.ByteString adds_ = akka.protobuf.ByteString.EMPTY; + /** + * optional bytes adds = 1; + */ + public boolean hasAdds() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * optional bytes adds = 1; + */ + public akka.protobuf.ByteString getAdds() { + return adds_; + } + /** + * optional bytes adds = 1; + */ + public Builder setAdds(akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + adds_ = value; + onChanged(); + return this; + } + /** + * optional bytes adds = 1; + */ + public Builder clearAdds() { + bitField0_ = (bitField0_ & ~0x00000001); + adds_ = getDefaultInstance().getAdds(); + onChanged(); + return this; + } + + // optional bytes removals = 2; + private akka.protobuf.ByteString removals_ = akka.protobuf.ByteString.EMPTY; + /** + * optional bytes removals = 2; + */ + public boolean hasRemovals() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional bytes removals = 2; + */ + public akka.protobuf.ByteString getRemovals() { + return removals_; + } + /** + * optional bytes removals = 2; + */ + public Builder setRemovals(akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000002; + removals_ = value; + onChanged(); + return this; + } + /** + * optional bytes removals = 2; + */ + public Builder clearRemovals() { + bitField0_ = (bitField0_ & ~0x00000002); + removals_ = getDefaultInstance().getRemovals(); + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:docs.ddata.TwoPhaseSet2) + } + + static { + defaultInstance = new TwoPhaseSet2(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:docs.ddata.TwoPhaseSet2) + } + + private static akka.protobuf.Descriptors.Descriptor + internal_static_docs_ddata_TwoPhaseSet_descriptor; + private static + akka.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_docs_ddata_TwoPhaseSet_fieldAccessorTable; + private static akka.protobuf.Descriptors.Descriptor + internal_static_docs_ddata_TwoPhaseSet2_descriptor; + private static + akka.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_docs_ddata_TwoPhaseSet2_fieldAccessorTable; + + public static akka.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static akka.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\031TwoPhaseSetMessages.proto\022\ndocs.ddata\"" + + "-\n\013TwoPhaseSet\022\014\n\004adds\030\001 \003(\t\022\020\n\010removals" + + "\030\002 \003(\t\".\n\014TwoPhaseSet2\022\014\n\004adds\030\001 \001(\014\022\020\n\010" + + "removals\030\002 \001(\014B\033\n\027docs.ddata.protobuf.ms" + + "gH\001" + }; + akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public akka.protobuf.ExtensionRegistry assignDescriptors( + akka.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_docs_ddata_TwoPhaseSet_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_docs_ddata_TwoPhaseSet_fieldAccessorTable = new + akka.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_docs_ddata_TwoPhaseSet_descriptor, + new java.lang.String[] { "Adds", "Removals", }); + internal_static_docs_ddata_TwoPhaseSet2_descriptor = + getDescriptor().getMessageTypes().get(1); + internal_static_docs_ddata_TwoPhaseSet2_fieldAccessorTable = new + akka.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_docs_ddata_TwoPhaseSet2_descriptor, + new java.lang.String[] { "Adds", "Removals", }); + return null; + } + }; + akka.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new akka.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/akka-docs/src/main/java/docs/persistence/proto/FlightAppModels.java b/akka-docs/src/main/java/docs/persistence/proto/FlightAppModels.java new file mode 100644 index 0000000000..456eaff742 --- /dev/null +++ b/akka-docs/src/main/java/docs/persistence/proto/FlightAppModels.java @@ -0,0 +1,823 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: FlightAppModels.proto + +package docs.persistence.proto; + +public final class FlightAppModels { + private FlightAppModels() {} + public static void registerAllExtensions( + akka.protobuf.ExtensionRegistry registry) { + } + public interface SeatReservedOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // required string letter = 1; + /** + * required string letter = 1; + */ + boolean hasLetter(); + /** + * required string letter = 1; + */ + java.lang.String getLetter(); + /** + * required string letter = 1; + */ + akka.protobuf.ByteString + getLetterBytes(); + + // required uint32 row = 2; + /** + * required uint32 row = 2; + */ + boolean hasRow(); + /** + * required uint32 row = 2; + */ + int getRow(); + + // optional string seatType = 3; + /** + * optional string seatType = 3; + * + *
+     * the new field
+     * 
+ */ + boolean hasSeatType(); + /** + * optional string seatType = 3; + * + *
+     * the new field
+     * 
+ */ + java.lang.String getSeatType(); + /** + * optional string seatType = 3; + * + *
+     * the new field
+     * 
+ */ + akka.protobuf.ByteString + getSeatTypeBytes(); + } + /** + * Protobuf type {@code docs.persistence.SeatReserved} + */ + public static final class SeatReserved extends + akka.protobuf.GeneratedMessage + implements SeatReservedOrBuilder { + // Use SeatReserved.newBuilder() to construct. + private SeatReserved(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private SeatReserved(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final SeatReserved defaultInstance; + public static SeatReserved getDefaultInstance() { + return defaultInstance; + } + + public SeatReserved getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private SeatReserved( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + letter_ = input.readBytes(); + break; + } + case 16: { + bitField0_ |= 0x00000002; + row_ = input.readUInt32(); + break; + } + case 26: { + bitField0_ |= 0x00000004; + seatType_ = input.readBytes(); + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return docs.persistence.proto.FlightAppModels.internal_static_docs_persistence_SeatReserved_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return docs.persistence.proto.FlightAppModels.internal_static_docs_persistence_SeatReserved_fieldAccessorTable + .ensureFieldAccessorsInitialized( + docs.persistence.proto.FlightAppModels.SeatReserved.class, docs.persistence.proto.FlightAppModels.SeatReserved.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public SeatReserved parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new SeatReserved(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required string letter = 1; + public static final int LETTER_FIELD_NUMBER = 1; + private java.lang.Object letter_; + /** + * required string letter = 1; + */ + public boolean hasLetter() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string letter = 1; + */ + public java.lang.String getLetter() { + java.lang.Object ref = letter_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + akka.protobuf.ByteString bs = + (akka.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + letter_ = s; + } + return s; + } + } + /** + * required string letter = 1; + */ + public akka.protobuf.ByteString + getLetterBytes() { + java.lang.Object ref = letter_; + if (ref instanceof java.lang.String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + letter_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + + // required uint32 row = 2; + public static final int ROW_FIELD_NUMBER = 2; + private int row_; + /** + * required uint32 row = 2; + */ + public boolean hasRow() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required uint32 row = 2; + */ + public int getRow() { + return row_; + } + + // optional string seatType = 3; + public static final int SEATTYPE_FIELD_NUMBER = 3; + private java.lang.Object seatType_; + /** + * optional string seatType = 3; + * + *
+     * the new field
+     * 
+ */ + public boolean hasSeatType() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional string seatType = 3; + * + *
+     * the new field
+     * 
+ */ + public java.lang.String getSeatType() { + java.lang.Object ref = seatType_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + akka.protobuf.ByteString bs = + (akka.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + seatType_ = s; + } + return s; + } + } + /** + * optional string seatType = 3; + * + *
+     * the new field
+     * 
+ */ + public akka.protobuf.ByteString + getSeatTypeBytes() { + java.lang.Object ref = seatType_; + if (ref instanceof java.lang.String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + seatType_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + + private void initFields() { + letter_ = ""; + row_ = 0; + seatType_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasLetter()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasRow()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getLetterBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeUInt32(2, row_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeBytes(3, getSeatTypeBytes()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(1, getLetterBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeUInt32Size(2, row_); + } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(3, getSeatTypeBytes()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static docs.persistence.proto.FlightAppModels.SeatReserved parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static docs.persistence.proto.FlightAppModels.SeatReserved parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static docs.persistence.proto.FlightAppModels.SeatReserved parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static docs.persistence.proto.FlightAppModels.SeatReserved parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static docs.persistence.proto.FlightAppModels.SeatReserved parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static docs.persistence.proto.FlightAppModels.SeatReserved parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static docs.persistence.proto.FlightAppModels.SeatReserved parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static docs.persistence.proto.FlightAppModels.SeatReserved parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static docs.persistence.proto.FlightAppModels.SeatReserved parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static docs.persistence.proto.FlightAppModels.SeatReserved parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(docs.persistence.proto.FlightAppModels.SeatReserved prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code docs.persistence.SeatReserved} + */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements docs.persistence.proto.FlightAppModels.SeatReservedOrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return docs.persistence.proto.FlightAppModels.internal_static_docs_persistence_SeatReserved_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return docs.persistence.proto.FlightAppModels.internal_static_docs_persistence_SeatReserved_fieldAccessorTable + .ensureFieldAccessorsInitialized( + docs.persistence.proto.FlightAppModels.SeatReserved.class, docs.persistence.proto.FlightAppModels.SeatReserved.Builder.class); + } + + // Construct using docs.persistence.proto.FlightAppModels.SeatReserved.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + letter_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + row_ = 0; + bitField0_ = (bitField0_ & ~0x00000002); + seatType_ = ""; + bitField0_ = (bitField0_ & ~0x00000004); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return docs.persistence.proto.FlightAppModels.internal_static_docs_persistence_SeatReserved_descriptor; + } + + public docs.persistence.proto.FlightAppModels.SeatReserved getDefaultInstanceForType() { + return docs.persistence.proto.FlightAppModels.SeatReserved.getDefaultInstance(); + } + + public docs.persistence.proto.FlightAppModels.SeatReserved build() { + docs.persistence.proto.FlightAppModels.SeatReserved result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public docs.persistence.proto.FlightAppModels.SeatReserved buildPartial() { + docs.persistence.proto.FlightAppModels.SeatReserved result = new docs.persistence.proto.FlightAppModels.SeatReserved(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.letter_ = letter_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.row_ = row_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.seatType_ = seatType_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof docs.persistence.proto.FlightAppModels.SeatReserved) { + return mergeFrom((docs.persistence.proto.FlightAppModels.SeatReserved)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(docs.persistence.proto.FlightAppModels.SeatReserved other) { + if (other == docs.persistence.proto.FlightAppModels.SeatReserved.getDefaultInstance()) return this; + if (other.hasLetter()) { + bitField0_ |= 0x00000001; + letter_ = other.letter_; + onChanged(); + } + if (other.hasRow()) { + setRow(other.getRow()); + } + if (other.hasSeatType()) { + bitField0_ |= 0x00000004; + seatType_ = other.seatType_; + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasLetter()) { + + return false; + } + if (!hasRow()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + docs.persistence.proto.FlightAppModels.SeatReserved parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (docs.persistence.proto.FlightAppModels.SeatReserved) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required string letter = 1; + private java.lang.Object letter_ = ""; + /** + * required string letter = 1; + */ + public boolean hasLetter() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string letter = 1; + */ + public java.lang.String getLetter() { + java.lang.Object ref = letter_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((akka.protobuf.ByteString) ref) + .toStringUtf8(); + letter_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string letter = 1; + */ + public akka.protobuf.ByteString + getLetterBytes() { + java.lang.Object ref = letter_; + if (ref instanceof String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + letter_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + /** + * required string letter = 1; + */ + public Builder setLetter( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + letter_ = value; + onChanged(); + return this; + } + /** + * required string letter = 1; + */ + public Builder clearLetter() { + bitField0_ = (bitField0_ & ~0x00000001); + letter_ = getDefaultInstance().getLetter(); + onChanged(); + return this; + } + /** + * required string letter = 1; + */ + public Builder setLetterBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + letter_ = value; + onChanged(); + return this; + } + + // required uint32 row = 2; + private int row_ ; + /** + * required uint32 row = 2; + */ + public boolean hasRow() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required uint32 row = 2; + */ + public int getRow() { + return row_; + } + /** + * required uint32 row = 2; + */ + public Builder setRow(int value) { + bitField0_ |= 0x00000002; + row_ = value; + onChanged(); + return this; + } + /** + * required uint32 row = 2; + */ + public Builder clearRow() { + bitField0_ = (bitField0_ & ~0x00000002); + row_ = 0; + onChanged(); + return this; + } + + // optional string seatType = 3; + private java.lang.Object seatType_ = ""; + /** + * optional string seatType = 3; + * + *
+       * the new field
+       * 
+ */ + public boolean hasSeatType() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + /** + * optional string seatType = 3; + * + *
+       * the new field
+       * 
+ */ + public java.lang.String getSeatType() { + java.lang.Object ref = seatType_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((akka.protobuf.ByteString) ref) + .toStringUtf8(); + seatType_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * optional string seatType = 3; + * + *
+       * the new field
+       * 
+ */ + public akka.protobuf.ByteString + getSeatTypeBytes() { + java.lang.Object ref = seatType_; + if (ref instanceof String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + seatType_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + /** + * optional string seatType = 3; + * + *
+       * the new field
+       * 
+ */ + public Builder setSeatType( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + seatType_ = value; + onChanged(); + return this; + } + /** + * optional string seatType = 3; + * + *
+       * the new field
+       * 
+ */ + public Builder clearSeatType() { + bitField0_ = (bitField0_ & ~0x00000004); + seatType_ = getDefaultInstance().getSeatType(); + onChanged(); + return this; + } + /** + * optional string seatType = 3; + * + *
+       * the new field
+       * 
+ */ + public Builder setSeatTypeBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000004; + seatType_ = value; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:docs.persistence.SeatReserved) + } + + static { + defaultInstance = new SeatReserved(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:docs.persistence.SeatReserved) + } + + private static akka.protobuf.Descriptors.Descriptor + internal_static_docs_persistence_SeatReserved_descriptor; + private static + akka.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_docs_persistence_SeatReserved_fieldAccessorTable; + + public static akka.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static akka.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\025FlightAppModels.proto\022\020docs.persistenc" + + "e\"=\n\014SeatReserved\022\016\n\006letter\030\001 \002(\t\022\013\n\003row" + + "\030\002 \002(\r\022\020\n\010seatType\030\003 \001(\tB\032\n\026docs.persist" + + "ence.protoH\001" + }; + akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public akka.protobuf.ExtensionRegistry assignDescriptors( + akka.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_docs_persistence_SeatReserved_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_docs_persistence_SeatReserved_fieldAccessorTable = new + akka.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_docs_persistence_SeatReserved_descriptor, + new java.lang.String[] { "Letter", "Row", "SeatType", }); + return null; + } + }; + akka.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new akka.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java index ba11b1848c..eb154b5b96 100644 --- a/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java +++ b/akka-persistence/src/main/java/akka/persistence/serialization/MessageFormats.java @@ -4282,14 +4282,26 @@ public final class MessageFormats { // optional string timeout = 2; /** * optional string timeout = 2; + * + *
+     *not used in new records from 2.4.5
+     * 
*/ boolean hasTimeout(); /** * optional string timeout = 2; + * + *
+     *not used in new records from 2.4.5
+     * 
*/ java.lang.String getTimeout(); /** * optional string timeout = 2; + * + *
+     *not used in new records from 2.4.5
+     * 
*/ akka.protobuf.ByteString getTimeoutBytes(); @@ -4458,12 +4470,20 @@ public final class MessageFormats { private java.lang.Object timeout_; /** * optional string timeout = 2; + * + *
+     *not used in new records from 2.4.5
+     * 
*/ public boolean hasTimeout() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** * optional string timeout = 2; + * + *
+     *not used in new records from 2.4.5
+     * 
*/ public java.lang.String getTimeout() { java.lang.Object ref = timeout_; @@ -4481,6 +4501,10 @@ public final class MessageFormats { } /** * optional string timeout = 2; + * + *
+     *not used in new records from 2.4.5
+     * 
*/ public akka.protobuf.ByteString getTimeoutBytes() { @@ -4863,12 +4887,20 @@ public final class MessageFormats { private java.lang.Object timeout_ = ""; /** * optional string timeout = 2; + * + *
+       *not used in new records from 2.4.5
+       * 
*/ public boolean hasTimeout() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** * optional string timeout = 2; + * + *
+       *not used in new records from 2.4.5
+       * 
*/ public java.lang.String getTimeout() { java.lang.Object ref = timeout_; @@ -4883,6 +4915,10 @@ public final class MessageFormats { } /** * optional string timeout = 2; + * + *
+       *not used in new records from 2.4.5
+       * 
*/ public akka.protobuf.ByteString getTimeoutBytes() { @@ -4899,6 +4935,10 @@ public final class MessageFormats { } /** * optional string timeout = 2; + * + *
+       *not used in new records from 2.4.5
+       * 
*/ public Builder setTimeout( java.lang.String value) { @@ -4912,6 +4952,10 @@ public final class MessageFormats { } /** * optional string timeout = 2; + * + *
+       *not used in new records from 2.4.5
+       * 
*/ public Builder clearTimeout() { bitField0_ = (bitField0_ & ~0x00000002); @@ -4921,6 +4965,10 @@ public final class MessageFormats { } /** * optional string timeout = 2; + * + *
+       *not used in new records from 2.4.5
+       * 
*/ public Builder setTimeoutBytes( akka.protobuf.ByteString value) { diff --git a/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala index 0ff81be065..cf2c1a3107 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala @@ -87,8 +87,7 @@ private[remote] final class SendQueue[T] extends GraphStageWithMaterializedValue needWakeup = true // additional poll() to grab any elements that might missed the needWakeup // and have been enqueued just after it - if (firstAttempt) - tryPush(firstAttempt = false) + if (firstAttempt) tryPush(firstAttempt = false) case elem ⇒ needWakeup = false // there will be another onPull push(out, elem) diff --git a/akka-stream-tests/src/test/scala/akka/stream/remote/StreamRefsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/remote/StreamRefsSpec.scala new file mode 100644 index 0000000000..ea8ffd59cc --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/remote/StreamRefsSpec.scala @@ -0,0 +1,274 @@ +/** + * Copyright (C) 2014-2017 Lightbend Inc. + */ +package akka.stream.remote + +import akka.NotUsed +import akka.actor.{ Actor, ActorIdentity, ActorLogging, ActorRef, ActorSystem, ActorSystemImpl, Identify, Props } +import akka.stream.ActorMaterializer +import akka.stream.remote.scaladsl.{ SinkRef, SourceRef } +import akka.stream.scaladsl.{ Sink, Source } +import akka.testkit.{ AkkaSpec, ImplicitSender, SocketUtil, TestKit, TestProbe } +import akka.util.ByteString +import com.typesafe.config._ + +import scala.concurrent.duration._ +import scala.concurrent.{ Await, Future } + +object StreamRefsSpec { + + object DatasourceActor { + def props(probe: ActorRef): Props = + Props(new DatasourceActor(probe)) + .withDispatcher("akka.test.stream-dispatcher") + } + + class DatasourceActor(probe: ActorRef) extends Actor with ActorLogging { + implicit val mat = ActorMaterializer() + + def receive = { + case "give" ⇒ + /* + * Here we're able to send a source to a remote recipient + * + * For them it's a Source; for us it is a Sink we run data "into" + */ + val source: Source[String, NotUsed] = Source(List("hello", "world")) + val ref: Future[SourceRef[String]] = source.runWith(SourceRef.sink()) + + println(s"source = ${source}") + println(s"ref = ${Await.result(ref, 10.seconds)}") + + sender() ! Await.result(ref, 10.seconds) + + // case "send-bulk" ⇒ + // /* + // * Here we're able to send a source to a remote recipient + // * The source is a "bulk transfer one, in which we're ready to send a lot of data" + // * + // * For them it's a Source; for us it is a Sink we run data "into" + // */ + // val source: Source[ByteString, NotUsed] = Source.single(ByteString("huge-file-")) + // val ref: SourceRef[ByteString] = source.runWith(SourceRef.bulkTransfer()) + // sender() ! BulkSourceMsg(ref) + + case "receive" ⇒ + /* + * We write out code, knowing that the other side will stream the data into it. + * + * For them it's a Sink; for us it's a Source. + */ + val sink: Future[SinkRef[String]] = + SinkRef.source[String] + .to(Sink.actorRef(probe, "")) + .run() + + // FIXME we want to avoid forcing people to do the Future here + sender() ! Await.result(sink, 10.seconds) + + // case "receive-bulk" ⇒ + // /* + // * We write out code, knowing that the other side will stream the data into it. + // * This will open a dedicated connection per transfer. + // * + // * For them it's a Sink; for us it's a Source. + // */ + // val sink: SinkRef[ByteString] = + // SinkRef.bulkTransferSource() + // .to(Sink.actorRef(probe, "")) + // .run() + // + // + // sender() ! BulkSinkMsg(sink) + } + + } + + // ------------------------- + + final case class SourceMsg(dataSource: SourceRef[String]) + final case class BulkSourceMsg(dataSource: SourceRef[ByteString]) + final case class SinkMsg(dataSink: SinkRef[String]) + final case class BulkSinkMsg(dataSink: SinkRef[ByteString]) + + def config(): Config = { + val address = SocketUtil.temporaryServerAddress() + ConfigFactory.parseString( + s""" + akka { + loglevel = INFO + + actor { + provider = remote + serialize-messages = off + +// serializers { +// akka-stream-ref-test = "akka.stream.remote.StreamRefsSpecSerializer" +// } +// +// serialization-bindings { +// "akka.stream.remote.StreamRefsSpec$$SourceMsg" = akka-stream-ref-test +// "akka.stream.remote.StreamRefsSpec$$BulkSourceMsg" = akka-stream-ref-test +// "akka.stream.remote.StreamRefsSpec$$SinkMsg" = akka-stream-ref-test +// "akka.stream.remote.StreamRefsSpec$$BulkSinkMsg" = akka-stream-ref-test +// } +// +// serialization-identifiers { +// "akka.stream.remote.StreamRefsSpecSerializer" = 33 +// } + + } + + remote.netty.tcp { + port = ${address.getPort} + hostname = "${address.getHostName}" + } + } + """).withFallback(ConfigFactory.load()) + } +} + +class StreamRefsSpec(config: Config) extends AkkaSpec(config) with ImplicitSender { + import StreamRefsSpec._ + + def this() { + this(StreamRefsSpec.config()) + } + + val remoteSystem = ActorSystem("RemoteSystem", StreamRefsSpec.config()) + implicit val mat = ActorMaterializer() + + override protected def beforeTermination(): Unit = + TestKit.shutdownActorSystem(remoteSystem) + + val p = TestProbe() + + // obtain the remoteActor ref via selection in order to use _real_ remoting in this test + val remoteActor = { + val it = remoteSystem.actorOf(DatasourceActor.props(p.ref), "remoteActor") + val remoteAddress = remoteSystem.asInstanceOf[ActorSystemImpl].provider.getDefaultAddress + system.actorSelection(it.path.toStringWithAddress(remoteAddress)) ! Identify("hi") + expectMsgType[ActorIdentity].ref.get + } + + "A SourceRef" must { + + "send messages via remoting" in { + remoteActor ! "give" + val sourceRef = expectMsgType[SourceRef[String]] + + Source.fromGraph(sourceRef) + .log("RECEIVED") + .runWith(Sink.actorRef(p.ref, "")) + + p.expectMsg("hello") + p.expectMsg("world") + p.expectMsg("") + } + + } + + "A SinkRef" must { + + "receive elements via remoting" in { + + remoteActor ! "receive" + val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]] + + Source("hello" :: "world" :: Nil) + .to(remoteSink) + .run() + + p.expectMsg("hello") + p.expectMsg("world") + p.expectMsg("") + } + + "fail origin if remote Sink gets a failure" in { + + remoteActor ! "receive" + val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]] + + val remoteFailureMessage = "Booom!" + Source.failed(new Exception(remoteFailureMessage)) + .to(remoteSink) + .run() + + val f = p.expectMsgType[akka.actor.Status.Failure] + f.cause.getMessage should ===(s"Remote Sink failed, reason: $remoteFailureMessage") + } + + "receive hundreds of elements via remoting" in { + remoteActor ! "receive" + val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]] + + val msgs = (1 to 100).toList.map(i ⇒ s"payload-$i") + + Source(msgs) + .to(remoteSink) + .run() + + msgs.foreach(t ⇒ p.expectMsg(t)) + p.expectMsg("") + } + + // "fail origin if remote Sink is stopped abruptly" in { + // val otherSystem = ActorSystem("OtherRemoteSystem", StreamRefsSpec.config()) + // + // try { + // // obtain the remoteActor ref via selection in order to use _real_ remoting in this test + // val remoteActor = { + // val it = otherSystem.actorOf(DatasourceActor.props(p.ref), "remoteActor") + // val remoteAddress = otherSystem.asInstanceOf[ActorSystemImpl].provider.getDefaultAddress + // system.actorSelection(it.path.toStringWithAddress(remoteAddress)) ! Identify("hi") + // expectMsgType[ActorIdentity].ref.get + // } + // + // remoteActor ! "receive" + // val remoteSink: SinkRef[String] = expectMsgType[SinkRef[String]] + // + // val otherMat = ActorMaterializer()(otherSystem) + // Source.maybe[String] // not emitting anything + // .to(remoteSink) + // .run()(otherMat) + // + // // and the system crashes; which should cause abrupt termination in the stream + // Thread.sleep(300) + // otherMat.shutdown() + // + // val f = p.expectMsgType[akka.actor.Status.Failure] + // f.cause.getMessage should ===(s"Remote Sink failed, reason:") + // } finally TestKit.shutdownActorSystem(otherSystem) + // } + + } + +} +// +//class StreamRefsSpecSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer { +// +// lazy val ext = SerializationExtension(system) +// +// override def manifest(o: AnyRef): String = o match { +// case StreamRefsSpec.SinkMsg(_) ⇒ "si" +// case StreamRefsSpec.BulkSinkMsg(_) ⇒ "bsi" +// case StreamRefsSpec.SourceMsg(_) ⇒ "so" +// case StreamRefsSpec.BulkSourceMsg(_) ⇒ "bso" +// } +// +// override def toBinary(o: AnyRef): Array[Byte] = { +// system.log.warning("Serializing: " + o) +// o match { +// case StreamRefsSpec.SinkMsg(s) ⇒ s. +// case StreamRefsSpec.BulkSinkMsg(s) ⇒ ext.serialize(s).get +// case StreamRefsSpec.SourceMsg(s) ⇒ ext.serialize(s).get +// case StreamRefsSpec.BulkSourceMsg(s) ⇒ ext.serialize(s).get +// } +// } +// +// override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { +// system.log.warning("MANI: " + manifest) +// ??? +// } +// +//} diff --git a/akka-stream/src/main/java/akka/stream/remote/StreamRefContainers.java b/akka-stream/src/main/java/akka/stream/remote/StreamRefContainers.java new file mode 100644 index 0000000000..8175eefff9 --- /dev/null +++ b/akka-stream/src/main/java/akka/stream/remote/StreamRefContainers.java @@ -0,0 +1,4746 @@ +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: StreamRefContainers.proto + +package akka.stream.remote; + +public final class StreamRefContainers { + private StreamRefContainers() {} + public static void registerAllExtensions( + akka.protobuf.ExtensionRegistry registry) { + } + public interface SinkRefOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // required .ActorRef targetRef = 1; + /** + * required .ActorRef targetRef = 1; + */ + boolean hasTargetRef(); + /** + * required .ActorRef targetRef = 1; + */ + akka.stream.remote.StreamRefContainers.ActorRef getTargetRef(); + /** + * required .ActorRef targetRef = 1; + */ + akka.stream.remote.StreamRefContainers.ActorRefOrBuilder getTargetRefOrBuilder(); + + // optional int64 initialDemand = 2; + /** + * optional int64 initialDemand = 2; + */ + boolean hasInitialDemand(); + /** + * optional int64 initialDemand = 2; + */ + long getInitialDemand(); + } + /** + * Protobuf type {@code SinkRef} + */ + public static final class SinkRef extends + akka.protobuf.GeneratedMessage + implements SinkRefOrBuilder { + // Use SinkRef.newBuilder() to construct. + private SinkRef(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private SinkRef(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final SinkRef defaultInstance; + public static SinkRef getDefaultInstance() { + return defaultInstance; + } + + public SinkRef getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private SinkRef( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + akka.stream.remote.StreamRefContainers.ActorRef.Builder subBuilder = null; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + subBuilder = targetRef_.toBuilder(); + } + targetRef_ = input.readMessage(akka.stream.remote.StreamRefContainers.ActorRef.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(targetRef_); + targetRef_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000001; + break; + } + case 16: { + bitField0_ |= 0x00000002; + initialDemand_ = input.readInt64(); + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.stream.remote.StreamRefContainers.internal_static_SinkRef_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.stream.remote.StreamRefContainers.internal_static_SinkRef_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.stream.remote.StreamRefContainers.SinkRef.class, akka.stream.remote.StreamRefContainers.SinkRef.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public SinkRef parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new SinkRef(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required .ActorRef targetRef = 1; + public static final int TARGETREF_FIELD_NUMBER = 1; + private akka.stream.remote.StreamRefContainers.ActorRef targetRef_; + /** + * required .ActorRef targetRef = 1; + */ + public boolean hasTargetRef() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required .ActorRef targetRef = 1; + */ + public akka.stream.remote.StreamRefContainers.ActorRef getTargetRef() { + return targetRef_; + } + /** + * required .ActorRef targetRef = 1; + */ + public akka.stream.remote.StreamRefContainers.ActorRefOrBuilder getTargetRefOrBuilder() { + return targetRef_; + } + + // optional int64 initialDemand = 2; + public static final int INITIALDEMAND_FIELD_NUMBER = 2; + private long initialDemand_; + /** + * optional int64 initialDemand = 2; + */ + public boolean hasInitialDemand() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional int64 initialDemand = 2; + */ + public long getInitialDemand() { + return initialDemand_; + } + + private void initFields() { + targetRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance(); + initialDemand_ = 0L; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasTargetRef()) { + memoizedIsInitialized = 0; + return false; + } + if (!getTargetRef().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeMessage(1, targetRef_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeInt64(2, initialDemand_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += akka.protobuf.CodedOutputStream + .computeMessageSize(1, targetRef_); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeInt64Size(2, initialDemand_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.stream.remote.StreamRefContainers.SinkRef parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.stream.remote.StreamRefContainers.SinkRef parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.stream.remote.StreamRefContainers.SinkRef parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.stream.remote.StreamRefContainers.SinkRef prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code SinkRef} + */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements akka.stream.remote.StreamRefContainers.SinkRefOrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.stream.remote.StreamRefContainers.internal_static_SinkRef_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.stream.remote.StreamRefContainers.internal_static_SinkRef_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.stream.remote.StreamRefContainers.SinkRef.class, akka.stream.remote.StreamRefContainers.SinkRef.Builder.class); + } + + // Construct using akka.stream.remote.StreamRefContainers.SinkRef.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getTargetRefFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + if (targetRefBuilder_ == null) { + targetRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance(); + } else { + targetRefBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + initialDemand_ = 0L; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.stream.remote.StreamRefContainers.internal_static_SinkRef_descriptor; + } + + public akka.stream.remote.StreamRefContainers.SinkRef getDefaultInstanceForType() { + return akka.stream.remote.StreamRefContainers.SinkRef.getDefaultInstance(); + } + + public akka.stream.remote.StreamRefContainers.SinkRef build() { + akka.stream.remote.StreamRefContainers.SinkRef result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.stream.remote.StreamRefContainers.SinkRef buildPartial() { + akka.stream.remote.StreamRefContainers.SinkRef result = new akka.stream.remote.StreamRefContainers.SinkRef(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + if (targetRefBuilder_ == null) { + result.targetRef_ = targetRef_; + } else { + result.targetRef_ = targetRefBuilder_.build(); + } + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.initialDemand_ = initialDemand_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof akka.stream.remote.StreamRefContainers.SinkRef) { + return mergeFrom((akka.stream.remote.StreamRefContainers.SinkRef)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.stream.remote.StreamRefContainers.SinkRef other) { + if (other == akka.stream.remote.StreamRefContainers.SinkRef.getDefaultInstance()) return this; + if (other.hasTargetRef()) { + mergeTargetRef(other.getTargetRef()); + } + if (other.hasInitialDemand()) { + setInitialDemand(other.getInitialDemand()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasTargetRef()) { + + return false; + } + if (!getTargetRef().isInitialized()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.stream.remote.StreamRefContainers.SinkRef parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.stream.remote.StreamRefContainers.SinkRef) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required .ActorRef targetRef = 1; + private akka.stream.remote.StreamRefContainers.ActorRef targetRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance(); + private akka.protobuf.SingleFieldBuilder< + akka.stream.remote.StreamRefContainers.ActorRef, akka.stream.remote.StreamRefContainers.ActorRef.Builder, akka.stream.remote.StreamRefContainers.ActorRefOrBuilder> targetRefBuilder_; + /** + * required .ActorRef targetRef = 1; + */ + public boolean hasTargetRef() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required .ActorRef targetRef = 1; + */ + public akka.stream.remote.StreamRefContainers.ActorRef getTargetRef() { + if (targetRefBuilder_ == null) { + return targetRef_; + } else { + return targetRefBuilder_.getMessage(); + } + } + /** + * required .ActorRef targetRef = 1; + */ + public Builder setTargetRef(akka.stream.remote.StreamRefContainers.ActorRef value) { + if (targetRefBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + targetRef_ = value; + onChanged(); + } else { + targetRefBuilder_.setMessage(value); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .ActorRef targetRef = 1; + */ + public Builder setTargetRef( + akka.stream.remote.StreamRefContainers.ActorRef.Builder builderForValue) { + if (targetRefBuilder_ == null) { + targetRef_ = builderForValue.build(); + onChanged(); + } else { + targetRefBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .ActorRef targetRef = 1; + */ + public Builder mergeTargetRef(akka.stream.remote.StreamRefContainers.ActorRef value) { + if (targetRefBuilder_ == null) { + if (((bitField0_ & 0x00000001) == 0x00000001) && + targetRef_ != akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance()) { + targetRef_ = + akka.stream.remote.StreamRefContainers.ActorRef.newBuilder(targetRef_).mergeFrom(value).buildPartial(); + } else { + targetRef_ = value; + } + onChanged(); + } else { + targetRefBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .ActorRef targetRef = 1; + */ + public Builder clearTargetRef() { + if (targetRefBuilder_ == null) { + targetRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance(); + onChanged(); + } else { + targetRefBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + /** + * required .ActorRef targetRef = 1; + */ + public akka.stream.remote.StreamRefContainers.ActorRef.Builder getTargetRefBuilder() { + bitField0_ |= 0x00000001; + onChanged(); + return getTargetRefFieldBuilder().getBuilder(); + } + /** + * required .ActorRef targetRef = 1; + */ + public akka.stream.remote.StreamRefContainers.ActorRefOrBuilder getTargetRefOrBuilder() { + if (targetRefBuilder_ != null) { + return targetRefBuilder_.getMessageOrBuilder(); + } else { + return targetRef_; + } + } + /** + * required .ActorRef targetRef = 1; + */ + private akka.protobuf.SingleFieldBuilder< + akka.stream.remote.StreamRefContainers.ActorRef, akka.stream.remote.StreamRefContainers.ActorRef.Builder, akka.stream.remote.StreamRefContainers.ActorRefOrBuilder> + getTargetRefFieldBuilder() { + if (targetRefBuilder_ == null) { + targetRefBuilder_ = new akka.protobuf.SingleFieldBuilder< + akka.stream.remote.StreamRefContainers.ActorRef, akka.stream.remote.StreamRefContainers.ActorRef.Builder, akka.stream.remote.StreamRefContainers.ActorRefOrBuilder>( + targetRef_, + getParentForChildren(), + isClean()); + targetRef_ = null; + } + return targetRefBuilder_; + } + + // optional int64 initialDemand = 2; + private long initialDemand_ ; + /** + * optional int64 initialDemand = 2; + */ + public boolean hasInitialDemand() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * optional int64 initialDemand = 2; + */ + public long getInitialDemand() { + return initialDemand_; + } + /** + * optional int64 initialDemand = 2; + */ + public Builder setInitialDemand(long value) { + bitField0_ |= 0x00000002; + initialDemand_ = value; + onChanged(); + return this; + } + /** + * optional int64 initialDemand = 2; + */ + public Builder clearInitialDemand() { + bitField0_ = (bitField0_ & ~0x00000002); + initialDemand_ = 0L; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:SinkRef) + } + + static { + defaultInstance = new SinkRef(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:SinkRef) + } + + public interface SourceRefOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // required .ActorRef originRef = 1; + /** + * required .ActorRef originRef = 1; + * + *
+     * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+     * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+     * 
+ */ + boolean hasOriginRef(); + /** + * required .ActorRef originRef = 1; + * + *
+     * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+     * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+     * 
+ */ + akka.stream.remote.StreamRefContainers.ActorRef getOriginRef(); + /** + * required .ActorRef originRef = 1; + * + *
+     * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+     * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+     * 
+ */ + akka.stream.remote.StreamRefContainers.ActorRefOrBuilder getOriginRefOrBuilder(); + } + /** + * Protobuf type {@code SourceRef} + */ + public static final class SourceRef extends + akka.protobuf.GeneratedMessage + implements SourceRefOrBuilder { + // Use SourceRef.newBuilder() to construct. + private SourceRef(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private SourceRef(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final SourceRef defaultInstance; + public static SourceRef getDefaultInstance() { + return defaultInstance; + } + + public SourceRef getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private SourceRef( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + akka.stream.remote.StreamRefContainers.ActorRef.Builder subBuilder = null; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + subBuilder = originRef_.toBuilder(); + } + originRef_ = input.readMessage(akka.stream.remote.StreamRefContainers.ActorRef.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(originRef_); + originRef_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000001; + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.stream.remote.StreamRefContainers.internal_static_SourceRef_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.stream.remote.StreamRefContainers.internal_static_SourceRef_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.stream.remote.StreamRefContainers.SourceRef.class, akka.stream.remote.StreamRefContainers.SourceRef.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public SourceRef parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new SourceRef(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required .ActorRef originRef = 1; + public static final int ORIGINREF_FIELD_NUMBER = 1; + private akka.stream.remote.StreamRefContainers.ActorRef originRef_; + /** + * required .ActorRef originRef = 1; + * + *
+     * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+     * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+     * 
+ */ + public boolean hasOriginRef() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required .ActorRef originRef = 1; + * + *
+     * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+     * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+     * 
+ */ + public akka.stream.remote.StreamRefContainers.ActorRef getOriginRef() { + return originRef_; + } + /** + * required .ActorRef originRef = 1; + * + *
+     * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+     * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+     * 
+ */ + public akka.stream.remote.StreamRefContainers.ActorRefOrBuilder getOriginRefOrBuilder() { + return originRef_; + } + + private void initFields() { + originRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasOriginRef()) { + memoizedIsInitialized = 0; + return false; + } + if (!getOriginRef().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeMessage(1, originRef_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += akka.protobuf.CodedOutputStream + .computeMessageSize(1, originRef_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.stream.remote.StreamRefContainers.SourceRef parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.stream.remote.StreamRefContainers.SourceRef parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.stream.remote.StreamRefContainers.SourceRef parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.stream.remote.StreamRefContainers.SourceRef prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code SourceRef} + */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements akka.stream.remote.StreamRefContainers.SourceRefOrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.stream.remote.StreamRefContainers.internal_static_SourceRef_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.stream.remote.StreamRefContainers.internal_static_SourceRef_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.stream.remote.StreamRefContainers.SourceRef.class, akka.stream.remote.StreamRefContainers.SourceRef.Builder.class); + } + + // Construct using akka.stream.remote.StreamRefContainers.SourceRef.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + getOriginRefFieldBuilder(); + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + if (originRefBuilder_ == null) { + originRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance(); + } else { + originRefBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.stream.remote.StreamRefContainers.internal_static_SourceRef_descriptor; + } + + public akka.stream.remote.StreamRefContainers.SourceRef getDefaultInstanceForType() { + return akka.stream.remote.StreamRefContainers.SourceRef.getDefaultInstance(); + } + + public akka.stream.remote.StreamRefContainers.SourceRef build() { + akka.stream.remote.StreamRefContainers.SourceRef result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.stream.remote.StreamRefContainers.SourceRef buildPartial() { + akka.stream.remote.StreamRefContainers.SourceRef result = new akka.stream.remote.StreamRefContainers.SourceRef(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + if (originRefBuilder_ == null) { + result.originRef_ = originRef_; + } else { + result.originRef_ = originRefBuilder_.build(); + } + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof akka.stream.remote.StreamRefContainers.SourceRef) { + return mergeFrom((akka.stream.remote.StreamRefContainers.SourceRef)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.stream.remote.StreamRefContainers.SourceRef other) { + if (other == akka.stream.remote.StreamRefContainers.SourceRef.getDefaultInstance()) return this; + if (other.hasOriginRef()) { + mergeOriginRef(other.getOriginRef()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasOriginRef()) { + + return false; + } + if (!getOriginRef().isInitialized()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.stream.remote.StreamRefContainers.SourceRef parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.stream.remote.StreamRefContainers.SourceRef) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required .ActorRef originRef = 1; + private akka.stream.remote.StreamRefContainers.ActorRef originRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance(); + private akka.protobuf.SingleFieldBuilder< + akka.stream.remote.StreamRefContainers.ActorRef, akka.stream.remote.StreamRefContainers.ActorRef.Builder, akka.stream.remote.StreamRefContainers.ActorRefOrBuilder> originRefBuilder_; + /** + * required .ActorRef originRef = 1; + * + *
+       * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+       * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+       * 
+ */ + public boolean hasOriginRef() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required .ActorRef originRef = 1; + * + *
+       * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+       * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+       * 
+ */ + public akka.stream.remote.StreamRefContainers.ActorRef getOriginRef() { + if (originRefBuilder_ == null) { + return originRef_; + } else { + return originRefBuilder_.getMessage(); + } + } + /** + * required .ActorRef originRef = 1; + * + *
+       * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+       * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+       * 
+ */ + public Builder setOriginRef(akka.stream.remote.StreamRefContainers.ActorRef value) { + if (originRefBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + originRef_ = value; + onChanged(); + } else { + originRefBuilder_.setMessage(value); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .ActorRef originRef = 1; + * + *
+       * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+       * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+       * 
+ */ + public Builder setOriginRef( + akka.stream.remote.StreamRefContainers.ActorRef.Builder builderForValue) { + if (originRefBuilder_ == null) { + originRef_ = builderForValue.build(); + onChanged(); + } else { + originRefBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .ActorRef originRef = 1; + * + *
+       * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+       * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+       * 
+ */ + public Builder mergeOriginRef(akka.stream.remote.StreamRefContainers.ActorRef value) { + if (originRefBuilder_ == null) { + if (((bitField0_ & 0x00000001) == 0x00000001) && + originRef_ != akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance()) { + originRef_ = + akka.stream.remote.StreamRefContainers.ActorRef.newBuilder(originRef_).mergeFrom(value).buildPartial(); + } else { + originRef_ = value; + } + onChanged(); + } else { + originRefBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .ActorRef originRef = 1; + * + *
+       * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+       * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+       * 
+ */ + public Builder clearOriginRef() { + if (originRefBuilder_ == null) { + originRef_ = akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance(); + onChanged(); + } else { + originRefBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + /** + * required .ActorRef originRef = 1; + * + *
+       * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+       * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+       * 
+ */ + public akka.stream.remote.StreamRefContainers.ActorRef.Builder getOriginRefBuilder() { + bitField0_ |= 0x00000001; + onChanged(); + return getOriginRefFieldBuilder().getBuilder(); + } + /** + * required .ActorRef originRef = 1; + * + *
+       * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+       * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+       * 
+ */ + public akka.stream.remote.StreamRefContainers.ActorRefOrBuilder getOriginRefOrBuilder() { + if (originRefBuilder_ != null) { + return originRefBuilder_.getMessageOrBuilder(); + } else { + return originRef_; + } + } + /** + * required .ActorRef originRef = 1; + * + *
+       * FIXME: it's basically SinkRef since we just ommit the optional initial demand...
+       * FIXME: could it be that all those passable refs should be expressed internally as a StreamRef?
+       * 
+ */ + private akka.protobuf.SingleFieldBuilder< + akka.stream.remote.StreamRefContainers.ActorRef, akka.stream.remote.StreamRefContainers.ActorRef.Builder, akka.stream.remote.StreamRefContainers.ActorRefOrBuilder> + getOriginRefFieldBuilder() { + if (originRefBuilder_ == null) { + originRefBuilder_ = new akka.protobuf.SingleFieldBuilder< + akka.stream.remote.StreamRefContainers.ActorRef, akka.stream.remote.StreamRefContainers.ActorRef.Builder, akka.stream.remote.StreamRefContainers.ActorRefOrBuilder>( + originRef_, + getParentForChildren(), + isClean()); + originRef_ = null; + } + return originRefBuilder_; + } + + // @@protoc_insertion_point(builder_scope:SourceRef) + } + + static { + defaultInstance = new SourceRef(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:SourceRef) + } + + public interface ActorRefOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // required string path = 1; + /** + * required string path = 1; + */ + boolean hasPath(); + /** + * required string path = 1; + */ + java.lang.String getPath(); + /** + * required string path = 1; + */ + akka.protobuf.ByteString + getPathBytes(); + } + /** + * Protobuf type {@code ActorRef} + */ + public static final class ActorRef extends + akka.protobuf.GeneratedMessage + implements ActorRefOrBuilder { + // Use ActorRef.newBuilder() to construct. + private ActorRef(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private ActorRef(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final ActorRef defaultInstance; + public static ActorRef getDefaultInstance() { + return defaultInstance; + } + + public ActorRef getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private ActorRef( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + path_ = input.readBytes(); + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.stream.remote.StreamRefContainers.internal_static_ActorRef_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.stream.remote.StreamRefContainers.internal_static_ActorRef_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.stream.remote.StreamRefContainers.ActorRef.class, akka.stream.remote.StreamRefContainers.ActorRef.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public ActorRef parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new ActorRef(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required string path = 1; + public static final int PATH_FIELD_NUMBER = 1; + private java.lang.Object path_; + /** + * required string path = 1; + */ + public boolean hasPath() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string path = 1; + */ + public java.lang.String getPath() { + java.lang.Object ref = path_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + akka.protobuf.ByteString bs = + (akka.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + path_ = s; + } + return s; + } + } + /** + * required string path = 1; + */ + public akka.protobuf.ByteString + getPathBytes() { + java.lang.Object ref = path_; + if (ref instanceof java.lang.String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + path_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + + private void initFields() { + path_ = ""; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasPath()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getPathBytes()); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(1, getPathBytes()); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.stream.remote.StreamRefContainers.ActorRef parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.stream.remote.StreamRefContainers.ActorRef parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.stream.remote.StreamRefContainers.ActorRef parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.stream.remote.StreamRefContainers.ActorRef prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code ActorRef} + */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements akka.stream.remote.StreamRefContainers.ActorRefOrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.stream.remote.StreamRefContainers.internal_static_ActorRef_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.stream.remote.StreamRefContainers.internal_static_ActorRef_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.stream.remote.StreamRefContainers.ActorRef.class, akka.stream.remote.StreamRefContainers.ActorRef.Builder.class); + } + + // Construct using akka.stream.remote.StreamRefContainers.ActorRef.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + path_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.stream.remote.StreamRefContainers.internal_static_ActorRef_descriptor; + } + + public akka.stream.remote.StreamRefContainers.ActorRef getDefaultInstanceForType() { + return akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance(); + } + + public akka.stream.remote.StreamRefContainers.ActorRef build() { + akka.stream.remote.StreamRefContainers.ActorRef result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.stream.remote.StreamRefContainers.ActorRef buildPartial() { + akka.stream.remote.StreamRefContainers.ActorRef result = new akka.stream.remote.StreamRefContainers.ActorRef(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.path_ = path_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof akka.stream.remote.StreamRefContainers.ActorRef) { + return mergeFrom((akka.stream.remote.StreamRefContainers.ActorRef)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.stream.remote.StreamRefContainers.ActorRef other) { + if (other == akka.stream.remote.StreamRefContainers.ActorRef.getDefaultInstance()) return this; + if (other.hasPath()) { + bitField0_ |= 0x00000001; + path_ = other.path_; + onChanged(); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasPath()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.stream.remote.StreamRefContainers.ActorRef parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.stream.remote.StreamRefContainers.ActorRef) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required string path = 1; + private java.lang.Object path_ = ""; + /** + * required string path = 1; + */ + public boolean hasPath() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string path = 1; + */ + public java.lang.String getPath() { + java.lang.Object ref = path_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((akka.protobuf.ByteString) ref) + .toStringUtf8(); + path_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string path = 1; + */ + public akka.protobuf.ByteString + getPathBytes() { + java.lang.Object ref = path_; + if (ref instanceof String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + path_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + /** + * required string path = 1; + */ + public Builder setPath( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + path_ = value; + onChanged(); + return this; + } + /** + * required string path = 1; + */ + public Builder clearPath() { + bitField0_ = (bitField0_ & ~0x00000001); + path_ = getDefaultInstance().getPath(); + onChanged(); + return this; + } + /** + * required string path = 1; + */ + public Builder setPathBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + path_ = value; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:ActorRef) + } + + static { + defaultInstance = new ActorRef(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:ActorRef) + } + + public interface OptionOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // optional .Payload value = 1; + /** + * optional .Payload value = 1; + */ + boolean hasValue(); + /** + * optional .Payload value = 1; + */ + akka.stream.remote.StreamRefContainers.Payload getValue(); + /** + * optional .Payload value = 1; + */ + akka.stream.remote.StreamRefContainers.PayloadOrBuilder getValueOrBuilder(); + } + /** + * Protobuf type {@code Option} + */ + public static final class Option extends + akka.protobuf.GeneratedMessage + implements OptionOrBuilder { + // Use Option.newBuilder() to construct. + private Option(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private Option(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final Option defaultInstance; + public static Option getDefaultInstance() { + return defaultInstance; + } + + public Option getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private Option( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + akka.stream.remote.StreamRefContainers.Payload.Builder subBuilder = null; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + subBuilder = value_.toBuilder(); + } + value_ = input.readMessage(akka.stream.remote.StreamRefContainers.Payload.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(value_); + value_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000001; + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.stream.remote.StreamRefContainers.internal_static_Option_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.stream.remote.StreamRefContainers.internal_static_Option_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.stream.remote.StreamRefContainers.Option.class, akka.stream.remote.StreamRefContainers.Option.Builder.class); + } + + public static akka.protobuf.Parser