#19261 override and test addAttributes compilation

This commit is contained in:
Roland Kuhn 2015-12-22 20:56:02 +01:00
parent 0f3d3c21e1
commit 8a5a420108
20 changed files with 249 additions and 17 deletions

View file

@ -272,4 +272,10 @@ public class BidiFlowTest extends StreamTest {
Arrays.sort(rr);
assertArrayEquals(new Long[] { 3L, 12L }, rr);
}
public void mustSuitablyOverrideAttributeHandlingMethods() {
@SuppressWarnings("unused")
final BidiFlow<Integer, Long, ByteString, String, BoxedUnit> b =
bidi.withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named("");
}
}

View file

@ -785,4 +785,9 @@ public class FlowTest extends StreamTest {
assertEquals((Object) 0, result);
}
public void mustSuitablyOverrideAttributeHandlingMethods() {
@SuppressWarnings("unused")
final Flow<Integer, Integer, BoxedUnit> f =
Flow.of(Integer.class).withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named("");
}
}

View file

@ -13,16 +13,13 @@ import java.util.concurrent.TimeUnit;
import akka.actor.ActorRef;
import akka.japi.function.Function;
import akka.japi.function.Procedure;
import akka.stream.Graph;
import akka.stream.UniformFanInShape;
import akka.stream.UniformFanOutShape;
import akka.stream.*;
import org.junit.ClassRule;
import org.junit.Test;
import org.reactivestreams.Publisher;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.stream.StreamTest;
import akka.japi.function.Function2;
import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit;
@ -101,4 +98,9 @@ public class SinkTest extends StreamTest {
probe2.expectMsgEquals("done2");
}
public void mustSuitablyOverrideAttributeHandlingMethods() {
@SuppressWarnings("unused")
final Sink<Integer, Future<Integer>> s =
Sink.<Integer> head().withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named("");
}
}

View file

@ -11,10 +11,7 @@ import akka.dispatch.OnSuccess;
import akka.japi.JavaPartialFunction;
import akka.japi.Pair;
import akka.japi.function.*;
import akka.stream.Graph;
import akka.stream.OverflowStrategy;
import akka.stream.StreamTest;
import akka.stream.UniformFanInShape;
import akka.stream.*;
import akka.stream.impl.ConstantFun;
import akka.stream.stage.*;
import akka.stream.testkit.AkkaSpec;
@ -776,4 +773,9 @@ public class SourceTest extends StreamTest {
assertEquals((Object) 0, result);
}
public void mustSuitablyOverrideAttributeHandlingMethods() {
@SuppressWarnings("unused")
final Source<Integer, BoxedUnit> f =
Source.single(42).withAttributes(Attributes.name("")).addAttributes(Attributes.asyncBoundary()).named("");
}
}

View file

@ -111,6 +111,11 @@ class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals {
Await.result(r, 1.second).toSet should ===(Set(3L, 12L))
}
"suitably override attribute handling methods" in {
import Attributes._
val b: BidiFlow[Int, Long, ByteString, String, Unit] = bidi.withAttributes(name("")).addAttributes(asyncBoundary).named("")
}
}
}

View file

@ -586,6 +586,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
}
}
"suitably override attribute handling methods" in {
import Attributes._
val f: Flow[Int, Int, Unit] = Flow[Int].withAttributes(asyncBoundary).addAttributes(none).named("")
}
}
object TestException extends RuntimeException with NoStackTrace

View file

@ -3,9 +3,10 @@
*/
package akka.stream.scaladsl
import akka.stream.{ SinkShape, ActorMaterializer }
import akka.stream._
import akka.stream.testkit.TestPublisher.ManualProbe
import akka.stream.testkit._
import scala.concurrent.Future
class SinkSpec extends AkkaSpec {
@ -119,6 +120,10 @@ class SinkSpec extends AkkaSpec {
}
}
"suitably override attribute handling methods" in {
import Attributes._
val s: Sink[Int, Future[Int]] = Sink.head[Int].withAttributes(asyncBoundary).addAttributes(none).named("")
}
}
}

View file

@ -10,7 +10,7 @@ import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.control.NoStackTrace
import akka.stream.{ SourceShape, ActorMaterializer }
import akka.stream._
import akka.stream.testkit._
class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures {
@ -268,4 +268,11 @@ class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures {
}
}
"A Source" must {
"suitably override attribute handling methods" in {
import Attributes._
val s: Source[Int, Unit] = Source.single(42).withAttributes(asyncBoundary).addAttributes(none).named("")
}
}
}

View file

@ -28,6 +28,12 @@ class SubFlowImpl[In, Out, Mat, F[+_], C](val subFlow: Flow[In, Out, Unit],
override def withAttributes(attr: Attributes): SubFlow[Out, Mat, F, C] =
new SubFlowImpl[In, Out, Mat, F, C](subFlow.withAttributes(attr), mergeBackFunction, finishFunction)
override def addAttributes(attr: Attributes): SubFlow[Out, Mat, F, C] =
new SubFlowImpl[In, Out, Mat, F, C](subFlow.addAttributes(attr), mergeBackFunction, finishFunction)
override def named(name: String): SubFlow[Out, Mat, F, C] =
new SubFlowImpl[In, Out, Mat, F, C](subFlow.named(name), mergeBackFunction, finishFunction)
override def mergeSubstreamsWithParallelism(breadth: Int): F[Out] = mergeBackFunction(subFlow, breadth)
def to[M](sink: Graph[SinkShape[Out], M]): C = finishFunction(subFlow.to(sink))

View file

@ -192,6 +192,28 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](delegate: scaladsl.BidiFlow[I1, O
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): BidiFlow[I1, O1, I2, O2, Mat2] =
new BidiFlow(delegate.mapMaterializedValue(f.apply _))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(delegate.addAttributes(attr))
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(delegate.named(name))
}

View file

@ -1431,9 +1431,28 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def initialDelay(delay: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.initialDelay(delay))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.addAttributes(attr))
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.named(name))

View file

@ -259,9 +259,28 @@ final class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[Sink
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): Sink[In, Mat2] =
new Sink(delegate.mapMaterializedValue(f.apply _))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): javadsl.Sink[In, Mat] =
new Sink(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): javadsl.Sink[In, Mat] =
new Sink(delegate.addAttributes(attr))
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): javadsl.Sink[In, Mat] =
new Sink(delegate.named(name))
}

View file

@ -1599,9 +1599,28 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def initialDelay(delay: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.initialDelay(delay))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] =
new Source(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): javadsl.Source[Out, Mat] =
new Source(delegate.addAttributes(attr))
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): javadsl.Source[Out, Mat] =
new Source(delegate.named(name))

View file

@ -1040,9 +1040,28 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
def initialDelay(delay: FiniteDuration): SubFlow[In, Out, Mat] =
new SubFlow(delegate.initialDelay(delay))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
def withAttributes(attr: Attributes): SubFlow[In, Out, Mat] =
new SubFlow(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
def addAttributes(attr: Attributes): SubFlow[In, Out, Mat] =
new SubFlow(delegate.addAttributes(attr))
/**
* Add a ``name`` attribute to this Flow.
*/
def named(name: String): SubFlow[In, Out, Mat] =
new SubFlow(delegate.named(name))

View file

@ -1039,9 +1039,28 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
def initialDelay(delay: FiniteDuration): SubSource[Out, Mat] =
new SubSource(delegate.initialDelay(delay))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
def withAttributes(attr: Attributes): SubSource[Out, Mat] =
new SubSource(delegate.withAttributes(attr))
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
def addAttributes(attr: Attributes): SubSource[Out, Mat] =
new SubSource(delegate.addAttributes(attr))
/**
* Add a ``name`` attribute to this Flow.
*/
def named(name: String): SubSource[Out, Mat] =
new SubSource(delegate.named(name))

View file

@ -128,9 +128,28 @@ final class BidiFlow[-I1, +O1, -I2, +O2, +Mat](private[stream] override val modu
def mapMaterializedValue[Mat2](f: Mat Mat2): BidiFlow[I1, O1, I2, O2, Mat2] =
new BidiFlow(module.transformMaterializedValue(f.asInstanceOf[Any Any]))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] =
new BidiFlow(module.withAttributes(attr).nest())
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): BidiFlow[I1, O1, I2, O2, Mat] =
withAttributes(module.attributes and attr)
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): BidiFlow[I1, O1, I2, O2, Mat] =
withAttributes(Attributes.name(name))
}

View file

@ -202,14 +202,27 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
}
/**
* Change the attributes of this [[Flow]] to the given ones. Note that this
* Change the attributes of this [[Flow]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): Repr[Out] =
if (this.module eq EmptyModule) this
if (isIdentity) this
else new Flow(module.withAttributes(attr).nest())
/**
* Add the given attributes to this Flow. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(module.attributes and attr)
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): Repr[Out] = withAttributes(Attributes.name(name))
/**
@ -1585,7 +1598,9 @@ trait FlowOps[+Out, +Mat] {
def withAttributes(attr: Attributes): Repr[Out]
def named(name: String): Repr[Out] = withAttributes(Attributes.name(name))
def addAttributes(attr: Attributes): Repr[Out]
def named(name: String): Repr[Out]
/** INTERNAL API */
private[scaladsl] def andThen[T](op: SymbolicStage[Out, T]): Repr[T] =

View file

@ -904,6 +904,12 @@ object GraphDSL extends GraphApply {
override def withAttributes(attr: Attributes): Repr[Out] =
throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port")
override def addAttributes(attr: Attributes): Repr[Out] =
throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port")
override def named(name: String): Repr[Out] =
throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port")
override def importAndGetPort(b: Builder[_]): Outlet[Out @uncheckedVariance] = outlet
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] =

View file

@ -40,9 +40,28 @@ final class Sink[-In, +Mat](private[stream] override val module: Module)
def mapMaterializedValue[Mat2](f: Mat Mat2): Sink[In, Mat2] =
new Sink(module.transformMaterializedValue(f.asInstanceOf[Any Any]))
/**
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): Sink[In, Mat] =
new Sink(module.withAttributes(attr).nest())
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): Sink[In, Mat] =
withAttributes(module.attributes and attr)
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): Sink[In, Mat] = withAttributes(Attributes.name(name))
/** Converts this Scala DSL element to it's Java DSL counterpart. */

View file

@ -110,13 +110,26 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
def runForeach(f: Out Unit)(implicit materializer: Materializer): Future[Unit] = runWith(Sink.foreach(f))
/**
* Nests the current Source and returns a Source with the given Attributes
* @param attr the attributes to add
* @return a new Source with the added attributes
* Change the attributes of this [[Source]] to the given ones and seal the list
* of attributes. This means that further calls will not be able to remove these
* attributes, but instead add new ones. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): Repr[Out] =
new Source(module.withAttributes(attr).nest()) // User API
new Source(module.withAttributes(attr).nest())
/**
* Add the given attributes to this Source. Further calls to `withAttributes`
* will not remove these attributes. Note that this
* operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(module.attributes and attr)
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): Repr[Out] = withAttributes(Attributes.name(name))
/** Converts this Scala DSL element to it's Java DSL counterpart. */