!str #17039 change default materialization placeholder type from Unit to Any

This commit is contained in:
Mathias 2015-04-02 16:52:30 +02:00
parent 52117b9020
commit 85c0571620
11 changed files with 51 additions and 54 deletions

View file

@ -66,10 +66,10 @@ public class FlowGraphTest extends StreamTest {
.of(String.class) .of(String.class)
.section( .section(
OperationAttributes.name("f1"), OperationAttributes.name("f1"),
new Function<Flow<String, String, BoxedUnit>, Flow<String, String, BoxedUnit>>() { new Function<Flow<String, String, Object>, Flow<String, String, Object>>() {
@Override @Override
public Flow<String, String, BoxedUnit> apply( public Flow<String, String, Object> apply(
Flow<String, String, BoxedUnit> flow) { Flow<String, String, Object> flow) {
return flow.transform(FlowGraphTest.this.<String> op()); return flow.transform(FlowGraphTest.this.<String> op());
} }
}); });
@ -77,10 +77,10 @@ public class FlowGraphTest extends StreamTest {
.of(String.class) .of(String.class)
.section( .section(
OperationAttributes.name("f2"), OperationAttributes.name("f2"),
new Function<Flow<String, String, BoxedUnit>, Flow<String, String, BoxedUnit>>() { new Function<Flow<String, String, Object>, Flow<String, String, Object>>() {
@Override @Override
public Flow<String, String, BoxedUnit> apply( public Flow<String, String, Object> apply(
Flow<String, String, BoxedUnit> flow) { Flow<String, String, Object> flow) {
return flow.transform(FlowGraphTest.this.<String> op()); return flow.transform(FlowGraphTest.this.<String> op());
} }
}); });
@ -88,10 +88,10 @@ public class FlowGraphTest extends StreamTest {
.of(String.class) .of(String.class)
.section( .section(
OperationAttributes.name("f3"), OperationAttributes.name("f3"),
new Function<Flow<String, String, BoxedUnit>, Flow<String, String, BoxedUnit>>() { new Function<Flow<String, String, Object>, Flow<String, String, Object>>() {
@Override @Override
public Flow<String, String, BoxedUnit> apply( public Flow<String, String, Object> apply(
Flow<String, String, BoxedUnit> flow) { Flow<String, String, Object> flow) {
return flow.transform(FlowGraphTest.this.<String> op()); return flow.transform(FlowGraphTest.this.<String> op());
} }
}); });

View file

@ -19,6 +19,7 @@ import akka.testkit.JavaTestKit;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import scala.runtime.Boxed;
import scala.runtime.BoxedUnit; import scala.runtime.BoxedUnit;
import org.junit.ClassRule; import org.junit.ClassRule;
@ -245,10 +246,10 @@ public class FlowTest extends StreamTest {
.of(String.class) .of(String.class)
.section( .section(
OperationAttributes.name("f1"), OperationAttributes.name("f1"),
new Function<Flow<String, String, BoxedUnit>, Flow<String, String, BoxedUnit>>() { new Function<Flow<String, String, Object>, Flow<String, String, Object>>() {
@Override @Override
public Flow<String, String, BoxedUnit> apply( public Flow<String, String, Object> apply(
Flow<String, String, BoxedUnit> flow) { Flow<String, String, Object> flow) {
return flow.transform(FlowTest.this.<String> op()); return flow.transform(FlowTest.this.<String> op());
} }
}); });
@ -256,10 +257,10 @@ public class FlowTest extends StreamTest {
.of(String.class) .of(String.class)
.section( .section(
OperationAttributes.name("f2"), OperationAttributes.name("f2"),
new Function<Flow<String, String, BoxedUnit>, Flow<String, String, BoxedUnit>>() { new Function<Flow<String, String, Object>, Flow<String, String, Object>>() {
@Override @Override
public Flow<String, String, BoxedUnit> apply( public Flow<String, String, Object> apply(
Flow<String, String, BoxedUnit> flow) { Flow<String, String, Object> flow) {
return flow.transform(FlowTest.this.<String> op()); return flow.transform(FlowTest.this.<String> op());
} }
}); });
@ -267,10 +268,10 @@ public class FlowTest extends StreamTest {
.of(String.class) .of(String.class)
.section( .section(
OperationAttributes.name("f3"), OperationAttributes.name("f3"),
new Function<Flow<String, String, BoxedUnit>, Flow<String, String, BoxedUnit>>() { new Function<Flow<String, String, Object>, Flow<String, String, Object>>() {
@Override @Override
public Flow<String, String, BoxedUnit> apply( public Flow<String, String, Object> apply(
Flow<String, String, BoxedUnit> flow) { Flow<String, String, Object> flow) {
return flow.transform(FlowTest.this.<String> op()); return flow.transform(FlowTest.this.<String> op());
} }
}); });
@ -375,7 +376,7 @@ public class FlowTest extends StreamTest {
mainInputs.add(Source.from(input2)); mainInputs.add(Source.from(input2));
final Flow<Source<Integer, BoxedUnit>, List<Integer>, BoxedUnit> flow = Flow.<Source<Integer, BoxedUnit>>create(). final Flow<Source<Integer, BoxedUnit>, List<Integer>, BoxedUnit> flow = Flow.<Source<Integer, BoxedUnit>>create().
flatten(akka.stream.javadsl.FlattenStrategy.<Integer> concat()).grouped(6); flatten(akka.stream.javadsl.FlattenStrategy.<Integer, BoxedUnit> concat()).grouped(6);
Future<List<Integer>> future = Source.from(mainInputs).via(flow) Future<List<Integer>> future = Source.from(mainInputs).via(flow)
.runWith(Sink.<List<Integer>>head(), materializer); .runWith(Sink.<List<Integer>>head(), materializer);

View file

@ -342,7 +342,7 @@ public class SourceTest extends StreamTest {
mainInputs.add(Source.from(input2)); mainInputs.add(Source.from(input2));
Future<List<Integer>> future = Source.from(mainInputs) Future<List<Integer>> future = Source.from(mainInputs)
.flatten(akka.stream.javadsl.FlattenStrategy.<Integer>concat()).grouped(6) .flatten(akka.stream.javadsl.FlattenStrategy.<Integer, BoxedUnit>concat()).grouped(6)
.runWith(Sink.<List<Integer>>head(), materializer); .runWith(Sink.<List<Integer>>head(), materializer);
List<Integer> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); List<Integer> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));

View file

@ -3,14 +3,12 @@
*/ */
package akka.stream.extra package akka.stream.extra
import scala.collection.immutable
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.language.implicitConversions import scala.language.implicitConversions
import scala.language.existentials import scala.language.existentials
import akka.stream.scaladsl.OperationAttributes._ import akka.stream.scaladsl.OperationAttributes._
import akka.stream.scaladsl.Source import akka.stream.scaladsl.{ Keep, Source, Flow }
import akka.stream.scaladsl.Flow
import akka.stream.stage._ import akka.stream.stage._
/** /**
@ -28,11 +26,11 @@ private[akka] trait TimedOps {
def timed[I, O, Mat, Mat2](source: Source[I, Mat], measuredOps: Source[I, Mat] Source[O, Mat2], onComplete: FiniteDuration Unit): Source[O, Mat2] = { def timed[I, O, Mat, Mat2](source: Source[I, Mat], measuredOps: Source[I, Mat] Source[O, Mat2], onComplete: FiniteDuration Unit): Source[O, Mat2] = {
val ctx = new TimedFlowContext val ctx = new TimedFlowContext
val startTimed = (f: Flow[I, I, Unit]) f.transform(() new StartTimedFlow(ctx)) val startTimed = (f: Flow[I, I, Any]) f.transform(() new StartTimedFlow(ctx))
val stopTimed = (f: Flow[O, O, Unit]) f.transform(() new StopTimed(ctx, onComplete)) val stopTimed = (f: Flow[O, O, Any]) f.transform(() new StopTimed(ctx, onComplete))
val begin = source.section(name("startTimed"), (originalMat: Mat, _: Unit) originalMat)(startTimed) val begin = source.section(name("startTimed"))(startTimed)
measuredOps(begin).section(name("stopTimed"), (originalMat: Mat2, _: Unit) originalMat)(stopTimed) measuredOps(begin).section(name("stopTimed"))(stopTimed)
} }
/** /**
@ -45,11 +43,11 @@ private[akka] trait TimedOps {
// they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type // they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type
val ctx = new TimedFlowContext val ctx = new TimedFlowContext
val startTimed = (f: Flow[O, O, Unit]) f.transform(() new StartTimedFlow(ctx)) val startTimed = (f: Flow[O, O, Any]) f.transform(() new StartTimedFlow(ctx))
val stopTimed = (f: Flow[Out, Out, Unit]) f.transform(() new StopTimed(ctx, onComplete)) val stopTimed = (f: Flow[Out, Out, Any]) f.transform(() new StopTimed(ctx, onComplete))
val begin: Flow[I, O, Mat] = flow.section(name("startTimed"), (originalMat: Mat, _: Unit) originalMat)(startTimed) val begin: Flow[I, O, Mat] = flow.section(name("startTimed"))(startTimed)
measuredOps(begin).section(name("stopTimed"), (originalMat: Mat2, _: Unit) originalMat)(stopTimed) measuredOps(begin).section(name("stopTimed"))(stopTimed)
} }
} }
@ -67,7 +65,7 @@ private[akka] trait TimedIntervalBetweenOps {
* Measures rolling interval between immediately subsequent `matching(o: O)` elements. * Measures rolling interval between immediately subsequent `matching(o: O)` elements.
*/ */
def timedIntervalBetween[O, Mat](source: Source[O, Mat], matching: O Boolean, onInterval: FiniteDuration Unit): Source[O, Mat] = { def timedIntervalBetween[O, Mat](source: Source[O, Mat], matching: O Boolean, onInterval: FiniteDuration Unit): Source[O, Mat] = {
source.section(name("timedInterval"), (originalMat: Mat, _: Unit) originalMat) { source.section(name("timedInterval")) {
_.transform(() new TimedIntervalTransformer[O](matching, onInterval)) _.transform(() new TimedIntervalTransformer[O](matching, onInterval))
} }
} }
@ -78,7 +76,7 @@ private[akka] trait TimedIntervalBetweenOps {
def timedIntervalBetween[I, O, Mat](flow: Flow[I, O, Mat], matching: O Boolean, onInterval: FiniteDuration Unit): Flow[I, O, Mat] = { def timedIntervalBetween[I, O, Mat](flow: Flow[I, O, Mat], matching: O Boolean, onInterval: FiniteDuration Unit): Flow[I, O, Mat] = {
// todo is there any other way to provide this for Flow / Duct, without duplicating impl? // todo is there any other way to provide this for Flow / Duct, without duplicating impl?
// they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type // they do share a super-type (FlowOps), but all operations of FlowOps return path dependant type
flow.section(name("timedInterval"), (originalMat: Mat, _: Unit) originalMat) { flow.section(name("timedInterval")) {
_.transform(() new TimedIntervalTransformer[O](matching, onInterval)) _.transform(() new TimedIntervalTransformer[O](matching, onInterval))
} }
} }

View file

@ -17,11 +17,11 @@ object FlattenStrategy {
* emitting its elements directly to the output until it completes and then taking the next stream. This has the * emitting its elements directly to the output until it completes and then taking the next stream. This has the
* consequence that if one of the input stream is infinite, no other streams after that will be consumed from. * consequence that if one of the input stream is infinite, no other streams after that will be consumed from.
*/ */
def concat[T]: FlattenStrategy[Source[T, Unit], T] = Concat[T]() def concat[T, U]: FlattenStrategy[Source[T, U], T] = Concat[T, U]()
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] final case class Concat[T]() extends FlattenStrategy[Source[T, _], T] private[akka] final case class Concat[T, U]() extends FlattenStrategy[Source[T, U], T]
} }

View file

@ -411,10 +411,10 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
/** /**
* Applies given [[OperationAttributes]] to a given section. * Applies given [[OperationAttributes]] to a given section.
*/ */
def section[O, M](attributes: OperationAttributes, section: japi.Function[javadsl.Flow[Out, Out, Unit], javadsl.Flow[Out, O, M]] @uncheckedVariance): javadsl.Flow[In, O, M] = def section[O](attributes: OperationAttributes, section: japi.Function[javadsl.Flow[Out, Out, Any], javadsl.Flow[Out, O, Any]] @uncheckedVariance): javadsl.Flow[In, O, Mat] =
new Flow(delegate.section(attributes.asScala) { new Flow(delegate.section(attributes.asScala) {
val scalaToJava = (flow: scaladsl.Flow[Out, Out, Unit]) new javadsl.Flow(flow) val scalaToJava = (flow: scaladsl.Flow[Out, Out, Any]) new javadsl.Flow(flow)
val javaToScala = (flow: javadsl.Flow[Out, O, M]) flow.asScala val javaToScala = (flow: javadsl.Flow[Out, O, Any]) flow.asScala
scalaToJava andThen section.apply andThen javaToScala scalaToJava andThen section.apply andThen javaToScala
}) })

View file

@ -501,10 +501,10 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
/** /**
* Applies given [[OperationAttributes]] to a given section. * Applies given [[OperationAttributes]] to a given section.
*/ */
def section[O, M](attributes: OperationAttributes, section: japi.Function[javadsl.Flow[Out, Out, Unit], javadsl.Flow[Out, O, M]] @uncheckedVariance): javadsl.Source[O, M] = def section[O](attributes: OperationAttributes, section: japi.Function[javadsl.Flow[Out, Out, Any], javadsl.Flow[Out, O, Any]] @uncheckedVariance): javadsl.Source[O, Mat] =
new Source(delegate.section(attributes.asScala) { new Source(delegate.section(attributes.asScala) {
val scalaToJava = (source: scaladsl.Flow[Out, Out, Unit]) new javadsl.Flow(source) val scalaToJava = (source: scaladsl.Flow[Out, Out, Any]) new javadsl.Flow(source)
val javaToScala = (source: javadsl.Flow[Out, O, M]) source.asScala val javaToScala = (source: javadsl.Flow[Out, O, Any]) source.asScala
scalaToJava andThen section.apply andThen javaToScala scalaToJava andThen section.apply andThen javaToScala
}) })

View file

@ -3,12 +3,10 @@
*/ */
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.stream.javadsl
/** /**
* Strategy that defines how a stream of streams should be flattened into a stream of simple elements. * Strategy that defines how a stream of streams should be flattened into a stream of simple elements.
*/ */
abstract class FlattenStrategy[-S, T] abstract class FlattenStrategy[-S, +T]
object FlattenStrategy { object FlattenStrategy {
@ -17,7 +15,7 @@ object FlattenStrategy {
* emitting its elements directly to the output until it completes and then taking the next stream. This has the * emitting its elements directly to the output until it completes and then taking the next stream. This has the
* consequence that if one of the input stream is infinite, no other streams after that will be consumed from. * consequence that if one of the input stream is infinite, no other streams after that will be consumed from.
*/ */
def concat[T]: FlattenStrategy[Source[T, _], T] = Concat[T]() def concat[T]: FlattenStrategy[Source[T, Any], T] = Concat[T]()
private[akka] final case class Concat[T]() extends FlattenStrategy[Source[T, _], T] private[akka] final case class Concat[T]() extends FlattenStrategy[Source[T, Any], T]
} }

View file

@ -269,7 +269,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
} }
// FIXME remove (in favor of .via) // FIXME remove (in favor of .via)
def section[O, O2 >: Out, Mat2, Mat3](attributes: OperationAttributes, combine: (Mat, Mat2) Mat3)(section: Flow[O2, O2, Unit] Flow[O2, O, Mat2]): Flow[In, O, Mat3] = { def section[O, O2 >: Out, Mat2, Mat3](attributes: OperationAttributes, combine: (Mat, Mat2) Mat3)(
section: Flow[O2, O2, Unit] Flow[O2, O, Mat2]): Flow[In, O, Mat3] = {
val subFlow = section(Flow[O2]).module.carbonCopy.withAttributes(attributes).wrap() val subFlow = section(Flow[O2]).module.carbonCopy.withAttributes(attributes).wrap()
if (this.isIdentity) new Flow(subFlow).asInstanceOf[Flow[In, O, Mat3]] if (this.isIdentity) new Flow(subFlow).asInstanceOf[Flow[In, O, Mat3]]
else new Flow( else new Flow(
@ -282,8 +283,8 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module)
* Applies given [[OperationAttributes]] to a given section. * Applies given [[OperationAttributes]] to a given section.
*/ */
// FIXME remove (in favor of .via) // FIXME remove (in favor of .via)
def section[O, O2 >: Out, Mat2](attributes: OperationAttributes)(section: Flow[O2, O2, Unit] Flow[O2, O, Mat2]): Flow[In, O, Mat2] = { def section[O, O2 >: Out](attributes: OperationAttributes)(section: Flow[O2, O2, Unit] Flow[O2, O, Any]): Flow[In, O, Mat] = {
this.section[O, O2, Mat2, Mat2](attributes, Keep.right)(section) this.section[O, O2, Any, Mat](attributes, Keep.left)(section)
} }
/** Converts this Scala DSL element to it's Java DSL counterpart. */ /** Converts this Scala DSL element to it's Java DSL counterpart. */
@ -632,7 +633,7 @@ trait FlowOps[+Out, +Mat] {
* This operation can be used on a stream of element type [[akka.stream.scaladsl.Source]]. * This operation can be used on a stream of element type [[akka.stream.scaladsl.Source]].
*/ */
def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U, Mat] = strategy match { def flatten[U](strategy: FlattenStrategy[Out, U]): Repr[U, Mat] = strategy match {
case _: FlattenStrategy.Concat[Out] | _: javadsl.FlattenStrategy.Concat[Out] andThen(ConcatAll()) case _: FlattenStrategy.Concat[Out] | _: javadsl.FlattenStrategy.Concat[Out, _] andThen(ConcatAll())
case _ case _
throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]") throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]")
} }

View file

@ -327,7 +327,7 @@ object FlowGraph extends GraphApply {
b.addEdge(importAndGetPort(b), to) b.addEdge(importAndGetPort(b), to)
} }
def ~>[Out](via: Flow[T, Out, _])(implicit b: Builder[_]): PortOps[Out, Unit] = { def ~>[Out](via: Flow[T, Out, Any])(implicit b: Builder[_]): PortOps[Out, Unit] = {
val s = b.add(via) val s = b.add(via)
b.addEdge(importAndGetPort(b), s.inlet) b.addEdge(importAndGetPort(b), s.inlet)
s.outlet s.outlet

View file

@ -147,9 +147,8 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
.replaceShape(SourceShape(subFlow.shape.outlets.head))) .replaceShape(SourceShape(subFlow.shape.outlets.head)))
} }
def section[O, O2 >: Out, Mat2](attributes: OperationAttributes)(section: Flow[O2, O2, Unit] Flow[O2, O, Mat2]): Source[O, Mat2] = { def section[O, O2 >: Out](attributes: OperationAttributes)(section: Flow[O2, O2, Unit] Flow[O2, O, Any]): Source[O, Mat] =
this.section[O, O2, Mat2, Mat2](attributes, (parentm: Mat, subm: Mat2) subm)(section) this.section[O, O2, Any, Mat](attributes, Keep.left)(section)
}
override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] = override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] =
new Source(module.withAttributes(attr).wrap()) new Source(module.withAttributes(attr).wrap())