diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala index 230fce172b..022429c53a 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlexiDocSpec.scala @@ -9,6 +9,7 @@ import akka.stream.testkit.AkkaSpec import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.control.NoStackTrace +import akka.stream.OperationAttributes object FlexiDocSpec { //#fleximerge-zip-states diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala index 0b5dbfd985..9502926710 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowErrorDocSpec.scala @@ -9,6 +9,8 @@ import akka.stream.ActorFlowMaterializerSettings import akka.stream.Supervision import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec +import akka.stream.OperationAttributes +import akka.stream.ActorOperationAttributes class FlowErrorDocSpec extends AkkaSpec { @@ -52,7 +54,7 @@ class FlowErrorDocSpec extends AkkaSpec { } val flow = Flow[Int] .filter(100 / _ < 50).map(elem => 100 / (5 - elem)) - .withAttributes(OperationAttributes.supervisionStrategy(decider)) + .withAttributes(ActorOperationAttributes.supervisionStrategy(decider)) val source = Source(0 to 5).via(flow) val result = source.runWith(Sink.fold(0)(_ + _)) @@ -75,7 +77,7 @@ class FlowErrorDocSpec extends AkkaSpec { if (elem < 0) throw new IllegalArgumentException("negative not allowed") else acc + elem } - .withAttributes(OperationAttributes.supervisionStrategy(decider)) + .withAttributes(ActorOperationAttributes.supervisionStrategy(decider)) val source = Source(List(1, 3, -1, 5, 7)).via(flow) val result = source.grouped(1000).runWith(Sink.head) // the negative element cause the scan stage to be restarted, diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala index 1a8d52f21c..ec08d593ec 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -10,6 +10,7 @@ import akka.stream.testkit.AkkaSpec import scala.collection.immutable import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ +import akka.stream.OperationAttributes class FlowGraphDocSpec extends AkkaSpec { diff --git a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala index 9d6e0f307f..a27a20c1f0 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala @@ -15,6 +15,8 @@ import akka.actor.Actor import akka.actor.Props import akka.pattern.ask import akka.util.Timeout +import akka.stream.OperationAttributes +import akka.stream.ActorOperationAttributes import scala.concurrent.ExecutionContext import akka.stream.ActorFlowMaterializerSettings import java.util.concurrent.atomic.AtomicInteger @@ -170,7 +172,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { tweets.filter(_.hashtags.contains(akka)).map(_.author) //#email-addresses-mapAsync-supervision - import OperationAttributes.supervisionStrategy + import ActorOperationAttributes.supervisionStrategy import Supervision.resumingDecider val emailAddresses: Source[String, Unit] = @@ -268,7 +270,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { .map { phoneNo => smsServer.send(TextMessage(to = phoneNo, body = "I like your tweet")) } - .withAttributes(OperationAttributes.dispatcher("blocking-dispatcher")) + .withAttributes(ActorOperationAttributes.dispatcher("blocking-dispatcher")) val sendTextMessages: RunnableFlow[Unit] = phoneNumbers.via(send).to(Sink.ignore) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala index 60dfdc9c7b..ce659c8caa 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamBuffersRateSpec.scala @@ -3,6 +3,7 @@ package docs.stream import akka.stream.{ OverflowStrategy, ActorFlowMaterializerSettings, ActorFlowMaterializer } import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec +import akka.stream.OperationAttributes class StreamBuffersRateSpec extends AkkaSpec { implicit val mat = ActorFlowMaterializer() diff --git a/akka-http-core/src/main/scala/akka/http/engine/client/OutgoingConnectionBlueprint.scala b/akka-http-core/src/main/scala/akka/http/engine/client/OutgoingConnectionBlueprint.scala index 138f2c41b4..668a271899 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/client/OutgoingConnectionBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/client/OutgoingConnectionBlueprint.scala @@ -14,7 +14,7 @@ import akka.util.ByteString import akka.event.LoggingAdapter import akka.stream._ import akka.stream.scaladsl._ -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.http.model.{ IllegalResponseException, HttpMethod, HttpRequest, HttpResponse } import akka.http.engine.rendering.{ RequestRenderingContext, HttpRequestRendererFactory } import akka.http.engine.parsing._ diff --git a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala index dc11882011..b54cb89e6c 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/parsing/HttpRequestParser.scala @@ -7,7 +7,7 @@ package akka.http.engine.parsing import java.lang.{ StringBuilder ⇒ JStringBuilder } import scala.annotation.tailrec import akka.actor.ActorRef -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.stream.stage.{ Context, PushPullStage } import akka.stream.scaladsl.Flow import akka.stream.scaladsl.{ Keep, Source } diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala index 5da3fb7d2d..ca69fdc608 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpRequestRendererFactory.scala @@ -8,7 +8,7 @@ import java.net.InetSocketAddress import scala.annotation.tailrec import akka.event.LoggingAdapter import akka.util.ByteString -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.stream.scaladsl.Source import akka.stream.stage._ import akka.http.model._ diff --git a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala index fe73f42f93..109a5ebfb7 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/rendering/HttpResponseRendererFactory.scala @@ -7,7 +7,7 @@ package akka.http.engine.rendering import scala.annotation.tailrec import akka.event.LoggingAdapter import akka.util.ByteString -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.stream.scaladsl.Source import akka.stream.stage._ import akka.http.model._ diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala index 0cd5a614eb..9e90944a1a 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServerBluePrint.scala @@ -9,7 +9,7 @@ import akka.util.ByteString import akka.event.LoggingAdapter import akka.actor.{ ActorRef, Props } import akka.stream.stage.PushPullStage -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.stream.scaladsl._ import akka.stream._ import akka.http.engine.parsing._ diff --git a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala index dbc4072b37..51373de1d4 100644 --- a/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/model/HttpEntity.scala @@ -11,7 +11,7 @@ import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.collection.immutable import akka.util.ByteString -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.stream.FlowMaterializer import akka.stream.scaladsl._ import akka.stream.TimerTransformer diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala index 369b49e52f..db8a9e84fc 100644 --- a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -11,15 +11,15 @@ import scala.collection.immutable import scala.concurrent.{ ExecutionContext, Future } import akka.util.ByteString import akka.http.model.RequestEntity -import akka.stream.{ FlowMaterializer, impl } +import akka.stream.{ FlowMaterializer, impl, OperationAttributes, ActorOperationAttributes } import akka.stream.scaladsl._ import akka.stream.stage._ -import OperationAttributes._ /** * INTERNAL API */ private[http] object StreamUtils { + import OperationAttributes._ /** * Creates a transformer that will call `f` for each incoming ByteString and output its result. After the complete @@ -178,7 +178,7 @@ private[http] object StreamUtils { } else ByteString.empty } - Source(() ⇒ iterator).withAttributes(OperationAttributes.dispatcher(fileIODispatcher)) + Source(() ⇒ iterator).withAttributes(ActorOperationAttributes.dispatcher(fileIODispatcher)) } diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala index 7b4e9cb506..a704d8c5a1 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/RequestParserSpec.scala @@ -10,7 +10,7 @@ import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import org.scalatest.matchers.Matcher import akka.stream.scaladsl._ -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.stream.scaladsl.FlattenStrategy import akka.stream.ActorFlowMaterializer import akka.util.ByteString diff --git a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala index 859ed8ea58..b2e4348641 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/parsing/ResponseParserSpec.scala @@ -10,7 +10,7 @@ import scala.concurrent.duration._ import org.scalatest.{ BeforeAndAfterAll, FreeSpec, Matchers } import org.scalatest.matchers.Matcher import akka.stream.scaladsl._ -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.stream.scaladsl.FlattenStrategy import akka.stream.ActorFlowMaterializer import akka.util.ByteString diff --git a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala index ad8b029c46..c0657f2f26 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/rendering/RequestRendererSpec.scala @@ -16,7 +16,7 @@ import akka.http.model._ import akka.http.model.headers._ import akka.http.util._ import akka.stream.scaladsl._ -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.stream.ActorFlowMaterializer import HttpEntity._ import HttpMethods._ diff --git a/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala index 6acf05d03e..9e2a4b5739 100644 --- a/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/engine/rendering/ResponseRendererSpec.scala @@ -16,7 +16,7 @@ import akka.http.model.headers._ import akka.http.util._ import akka.util.ByteString import akka.stream.scaladsl._ -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.stream.ActorFlowMaterializer import HttpEntity._ diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala index eb48886a60..ea3a89d214 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala @@ -5,8 +5,9 @@ package akka.stream.tck import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings } import akka.stream.impl.Stages.Identity -import akka.stream.scaladsl.{ OperationAttributes, Flow } +import akka.stream.scaladsl.Flow import org.reactivestreams.Processor +import akka.stream.OperationAttributes class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] { diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala index 1a29043097..1a3d606897 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/MapTest.scala @@ -4,8 +4,9 @@ package akka.stream.tck import akka.stream.ActorFlowMaterializer -import akka.stream.scaladsl.{ Flow, OperationAttributes } +import akka.stream.scaladsl.Flow import org.reactivestreams.Processor +import akka.stream.OperationAttributes class MapTest extends AkkaIdentityProcessorVerification[Int] { diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala index fa5a8d08f4..bb6259c54c 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/TransformProcessorTest.scala @@ -7,7 +7,7 @@ import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings } import akka.stream.impl.ActorFlowMaterializerImpl import akka.stream.impl.Stages.Identity import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes import akka.stream.stage.{ Context, PushStage } import org.reactivestreams.{ Processor, Publisher } diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ChainSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ChainSetup.scala index e1ce807bdc..89506b1c28 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/ChainSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/ChainSetup.scala @@ -5,6 +5,7 @@ import akka.stream.ActorFlowMaterializerSettings import akka.stream.scaladsl._ import org.reactivestreams.Publisher import akka.stream.ActorFlowMaterializer +import akka.stream.OperationAttributes class ChainSetup[In, Out]( stream: Flow[In, In, _] ⇒ Flow[In, Out, _], diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/OperationAttributesTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/OperationAttributesTest.java new file mode 100644 index 0000000000..c16eae7b7d --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/OperationAttributesTest.java @@ -0,0 +1,36 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.javadsl; + +import static org.junit.Assert.assertEquals; +import java.util.Arrays; +import org.junit.Test; + +import akka.stream.OperationAttributes; + +public class OperationAttributesTest { + + final OperationAttributes attributes = + OperationAttributes.name("a") + .and(OperationAttributes.name("b")) + .and(OperationAttributes.inputBuffer(1, 2)); + + @Test + public void mustGetAttributesByClass() { + assertEquals( + Arrays.asList(new OperationAttributes.Name("a"), new OperationAttributes.Name("b")), + attributes.getAttributes(OperationAttributes.Name.class)); + assertEquals( + Arrays.asList(new OperationAttributes.InputBuffer(1, 2)), + attributes.getAttributes(OperationAttributes.InputBuffer.class)); + } + + @Test + public void mustGetAttributeByClass() { + assertEquals( + new OperationAttributes.Name("a"), + attributes.getAttribute(OperationAttributes.Name.class, new OperationAttributes.Name("default"))); + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index 620fb828c8..f91606181b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -19,6 +19,7 @@ import scala.concurrent.duration._ import scala.util.control.NoStackTrace import akka.stream.impl.SubscriberSink import akka.stream.ActorFlowMaterializerSettings +import akka.stream.ActorOperationAttributes object ActorPublisherSpec { @@ -374,7 +375,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic implicit val materializer = ActorFlowMaterializer() val s = StreamTestKit.SubscriberProbe[String]() val ref = Source.actorPublisher(testPublisherProps(testActor, useTestDispatcher = false)) - .withAttributes(OperationAttributes.dispatcher("my-dispatcher1")) + .withAttributes(ActorOperationAttributes.dispatcher("my-dispatcher1")) .to(Sink(s)).run() ref ! ThreadName expectMsgType[String] should include("my-dispatcher1") @@ -384,7 +385,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic implicit val materializer = ActorFlowMaterializer() val s = StreamTestKit.SubscriberProbe[String]() val ref = Source.actorPublisher(testPublisherProps(testActor, useTestDispatcher = false).withDispatcher("my-dispatcher1")) - .withAttributes(OperationAttributes.dispatcher("my-dispatcher2")) + .withAttributes(ActorOperationAttributes.dispatcher("my-dispatcher2")) .to(Sink(s)).run() ref ! ThreadName expectMsgType[String] should include("my-dispatcher1") diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala index bd028eef1f..5188f81752 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/BidiFlowSpec.scala @@ -11,6 +11,7 @@ import akka.stream.ActorFlowMaterializer import scala.concurrent.Await import scala.concurrent.duration._ import scala.collection.immutable +import akka.stream.OperationAttributes class BidiFlowSpec extends AkkaSpec with ConversionCheckedTripleEquals { import OperationAttributes._ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala index 35e0452d71..4e806a0dc7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala @@ -3,7 +3,7 @@ */ package akka.stream.scaladsl -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.stream.ActorFlowMaterializer import akka.stream.OverflowStrategy import akka.stream.testkit.AkkaSpec diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 31f4c5fc31..ad1581d3dc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -5,13 +5,14 @@ package akka.stream.scaladsl import scala.concurrent.duration._ import scala.util.control.NoStackTrace - import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.Supervision.resumingDecider import akka.stream.testkit._ import akka.stream.testkit.StreamTestKit.TE import org.reactivestreams.Publisher +import akka.stream.OperationAttributes +import akka.stream.ActorOperationAttributes class FlowGroupBySpec extends AkkaSpec { @@ -223,7 +224,7 @@ class FlowGroupBySpec extends AkkaSpec { val exc = TE("test") val publisher = Source(publisherProbeProbe) .groupBy(elem ⇒ if (elem == 2) throw exc else elem % 2) - .withAttributes(OperationAttributes.supervisionStrategy(resumingDecider)) + .withAttributes(ActorOperationAttributes.supervisionStrategy(resumingDecider)) .runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int, Unit])]() publisher.subscribe(subscriber) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index 6c9d86169b..e0eb22181c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -14,7 +14,7 @@ import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit import akka.testkit.TestLatch import akka.testkit.TestProbe -import akka.stream.scaladsl.OperationAttributes.supervisionStrategy +import akka.stream.ActorOperationAttributes.supervisionStrategy import akka.stream.Supervision.resumingDecider import akka.stream.impl.ReactiveStreamsCompliance import scala.util.Try diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index be42724a40..dc598a1074 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -13,7 +13,7 @@ import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit import akka.testkit.TestLatch import akka.testkit.TestProbe -import akka.stream.scaladsl.OperationAttributes.supervisionStrategy +import akka.stream.ActorOperationAttributes.supervisionStrategy import akka.stream.Supervision.resumingDecider import akka.stream.testkit.StreamTestKit.OnNext import akka.stream.testkit.StreamTestKit.OnComplete diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala index 9589467635..c0bf45003c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala @@ -3,7 +3,8 @@ */ package akka.stream.scaladsl -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ +import akka.stream.ActorOperationAttributes._ import akka.stream.ActorFlowMaterializer import akka.stream.testkit.AkkaSpec import akka.actor.ActorRef diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index 1c0d5b24ac..35c0ff46e0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -4,7 +4,6 @@ package akka.stream.scaladsl import scala.concurrent.duration._ - import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.Supervision.resumingDecider @@ -12,6 +11,7 @@ import akka.stream.testkit.AkkaSpec import akka.stream.testkit.StreamTestKit import akka.stream.testkit.StreamTestKit.TE import org.reactivestreams.Publisher +import akka.stream.ActorOperationAttributes class FlowSplitWhenSpec extends AkkaSpec { @@ -145,7 +145,7 @@ class FlowSplitWhenSpec extends AkkaSpec { val exc = TE("test") val publisher = Source(publisherProbeProbe) .splitWhen(elem ⇒ if (elem == 3) throw exc else elem % 3 == 0) - .withAttributes(OperationAttributes.supervisionStrategy(resumingDecider)) + .withAttributes(ActorOperationAttributes.supervisionStrategy(resumingDecider)) .runWith(Sink.publisher) val subscriber = StreamTestKit.SubscriberProbe[Source[Int, Unit]]() publisher.subscribe(subscriber) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala index 524bcceeb5..8a679e674e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSupervisionSpec.scala @@ -13,9 +13,10 @@ import scala.concurrent.Await import akka.stream.testkit.StreamTestKit.SubscriberProbe import akka.stream.Supervision import akka.stream.impl.ReactiveStreamsCompliance +import akka.stream.ActorOperationAttributes class FlowSupervisionSpec extends AkkaSpec { - import OperationAttributes.supervisionStrategy + import ActorOperationAttributes.supervisionStrategy implicit val materializer = ActorFlowMaterializer()(system) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala index 4ad47ab7a3..2a28bae34e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphJunctionAttributesSpec.scala @@ -3,7 +3,7 @@ */ package akka.stream.scaladsl -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit.AkkaSpec diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala index 0922001b11..0b97a0d0fd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphOpsIntegrationSpec.scala @@ -10,6 +10,7 @@ import akka.stream.testkit.StreamTestKit.{ OnNext, SubscriberProbe } import akka.util.ByteString import akka.stream.{ Inlet, Outlet, Shape, Graph } import org.scalactic.ConversionCheckedTripleEquals +import akka.stream.OperationAttributes object GraphOpsIntegrationSpec { import FlowGraph.Implicits._ diff --git a/akka-stream/src/main/boilerplate/akka/stream/impl/GenJunctions.scala.template b/akka-stream/src/main/boilerplate/akka/stream/impl/GenJunctions.scala.template index 839e50f7c4..ca907f2b55 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/impl/GenJunctions.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/impl/GenJunctions.scala.template @@ -7,8 +7,8 @@ import akka.actor.Props import akka.stream._ import akka.stream.impl.Junctions.FanInModule import akka.stream.impl.StreamLayout.Module -import akka.stream.scaladsl.OperationAttributes -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes +import akka.stream.OperationAttributes._ /** INTERNAL API: Boilerplate generated Junctions */ private[akka] object GenJunctions { diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index 32321eb130..1abbf56624 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -14,7 +14,6 @@ import akka.actor.Props import akka.actor.ActorRef import akka.stream.javadsl.japi import scala.concurrent.ExecutionContextExecutor -import akka.stream.scaladsl.OperationAttributes object ActorFlowMaterializer { diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 238c4ac4e1..178d4c3305 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -5,7 +5,6 @@ package akka.stream import scala.concurrent.ExecutionContextExecutor import akka.japi -import akka.stream.scaladsl.OperationAttributes abstract class FlowMaterializer { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala similarity index 50% rename from akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala rename to akka-stream/src/main/scala/akka/stream/OperationAttributes.scala index 6bbfac70de..4007489864 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala @@ -1,26 +1,62 @@ /** * Copyright (C) 2014 Typesafe Inc. */ -package akka.stream.scaladsl +package akka.stream +import scala.collection.immutable +import akka.stream.javadsl.japi import akka.stream.impl.Stages.StageModule -import akka.stream.Supervision /** * Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]] * materialization. + * + * Note that more attributes for the [[ActorFlowMaterializer]] are defined in [[ActorOperationAttributes]]. */ -final case class OperationAttributes private (attributes: List[OperationAttributes.Attribute] = Nil) { +final case class OperationAttributes private (attributes: immutable.Seq[OperationAttributes.Attribute] = Nil) { import OperationAttributes._ + /** + * Java API + */ + def getAttributes(): java.util.List[Attribute] = { + import scala.collection.JavaConverters._ + attributes.asJava + } + + /** + * Java API: Get all attributes of a given `Class` or + * subclass thereof. + */ + def getAttributes[T <: Attribute](c: Class[T]): java.util.List[T] = + if (attributes.isEmpty) java.util.Collections.emptyList() + else { + val result = new java.util.ArrayList[T] + attributes.foreach { a ⇒ + if (c.isInstance(a)) + result.add(a.asInstanceOf[T]) + } + result + } + + /** + * Get first attribute of a given `Class` or subclass thereof. + * If no such attribute exists the `default` value is returned. + */ + def getAttribute[T <: Attribute](c: Class[T], default: T): T = + attributes.find(a ⇒ c.isInstance(a)) match { + case Some(a) ⇒ a.asInstanceOf[T] + case None ⇒ default + } + /** * Adds given attributes to the end of these attributes. */ def and(other: OperationAttributes): OperationAttributes = if (attributes.isEmpty) other else if (other.attributes.isEmpty) this - else OperationAttributes(attributes ::: other.attributes) + else OperationAttributes(attributes ++ other.attributes) /** * INTERNAL API @@ -33,9 +69,9 @@ final case class OperationAttributes private (attributes: List[OperationAttribut /** * INTERNAL API */ - private[akka] def name: String = nameLifted match { + private[akka] def nameOrDefault(default: String = "unknown-operation"): String = nameLifted match { case Some(name) ⇒ name - case _ ⇒ "unknown-operation" + case _ ⇒ default } /** @@ -44,21 +80,28 @@ final case class OperationAttributes private (attributes: List[OperationAttribut private[akka] def nameOption: Option[String] = attributes.collectFirst { case Name(name) ⇒ name } + /** + * INTERNAL API + */ private[akka] def transform(node: StageModule): StageModule = if ((this eq OperationAttributes.none) || (this eq node.attributes)) node else node.withAttributes(attributes = this and node.attributes) } +/** + * Note that more attributes for the [[ActorFlowMaterializer]] are defined in [[ActorOperationAttributes]]. + */ object OperationAttributes { - sealed trait Attribute + trait Attribute final case class Name(n: String) extends Attribute final case class InputBuffer(initial: Int, max: Int) extends Attribute - final case class Dispatcher(dispatcher: String) extends Attribute - final case class SupervisionStrategy(decider: Supervision.Decider) extends Attribute - private[OperationAttributes] def apply(attribute: Attribute): OperationAttributes = + /** + * INTERNAL API + */ + private[akka] def apply(attribute: Attribute): OperationAttributes = apply(List(attribute)) val none: OperationAttributes = OperationAttributes() @@ -76,14 +119,31 @@ object OperationAttributes { */ def inputBuffer(initial: Int, max: Int): OperationAttributes = OperationAttributes(InputBuffer(initial, max)) +} + +/** + * Attributes for the [[ActorFlowMaterializer]]. + * Note that more attributes defined in [[OperationAttributes]]. + */ +object ActorOperationAttributes { + import OperationAttributes._ + final case class Dispatcher(dispatcher: String) extends Attribute + final case class SupervisionStrategy(decider: Supervision.Decider) extends Attribute + /** * Specifies the name of the dispatcher. */ def dispatcher(dispatcher: String): OperationAttributes = OperationAttributes(Dispatcher(dispatcher)) /** - * Decides how exceptions from user are to be handled. + * Scala API: Decides how exceptions from user are to be handled. */ def supervisionStrategy(decider: Supervision.Decider): OperationAttributes = OperationAttributes(SupervisionStrategy(decider)) + + /** + * Java API: Decides how exceptions from application code are to be handled. + */ + def withSupervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): OperationAttributes = + ActorOperationAttributes.supervisionStrategy(decider.apply _) } diff --git a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala index a0eacb7890..a2968b8cad 100644 --- a/akka-stream/src/main/scala/akka/stream/extra/Timed.scala +++ b/akka-stream/src/main/scala/akka/stream/extra/Timed.scala @@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicLong import scala.concurrent.duration._ import scala.language.implicitConversions import scala.language.existentials -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.stream.scaladsl.{ Keep, Source, Flow } import akka.stream.stage._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index c2c718d3be..95de18ec38 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -40,6 +40,7 @@ private[akka] case class ActorFlowMaterializerImpl(override val settings: ActorF override def effectiveSettings(opAttr: OperationAttributes): ActorFlowMaterializerSettings = { import OperationAttributes._ + import ActorOperationAttributes._ opAttr.attributes.foldLeft(settings) { (s, attr) ⇒ attr match { case InputBuffer(initial, max) ⇒ s.withInputBuffer(initial, max) @@ -57,7 +58,7 @@ private[akka] case class ActorFlowMaterializerImpl(override val settings: ActorF private val flowName = createFlowName() private var nextId = 0 private def stageName(attr: OperationAttributes): String = { - val name = s"$flowName-$nextId-${attr.name}" + val name = s"$flowName-$nextId-${attr.nameOrDefault()}" nextId += 1 name } @@ -66,7 +67,6 @@ private[akka] case class ActorFlowMaterializerImpl(override val settings: ActorF def newMaterializationContext() = new MaterializationContext(ActorFlowMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes)) - atomic match { case sink: SinkModule[_, _] ⇒ val (sub, mat) = sink.create(newMaterializationContext()) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala b/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala index 8acaae9167..fa38dca65f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Junctions.scala @@ -5,7 +5,7 @@ package akka.stream.impl import akka.stream.impl.StreamLayout.Module import akka.stream.scaladsl.FlexiRoute.RouteLogic -import akka.stream.scaladsl.OperationAttributes +import akka.stream.OperationAttributes import akka.stream.{ Inlet, Outlet, Shape, InPort, OutPort } import akka.stream.scaladsl.FlexiMerge.MergeLogic import akka.stream.UniformFanInShape diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 9e89728751..5e425e326b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -6,7 +6,7 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference import akka.actor.{ ActorRef, Props } import akka.stream.impl.StreamLayout.Module -import akka.stream.scaladsl.OperationAttributes +import akka.stream.OperationAttributes import akka.stream.{ Inlet, Shape, SinkShape } import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.annotation.unchecked.uncheckedVariance diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala index fef1eed7c7..869144bb73 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala @@ -6,7 +6,7 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicBoolean import akka.actor.{ ActorRef, Cancellable, PoisonPill, Props } import akka.stream.impl.StreamLayout.Module -import akka.stream.scaladsl.OperationAttributes +import akka.stream.OperationAttributes import akka.stream.{ Outlet, OverflowStrategy, Shape, SourceShape } import org.reactivestreams._ import scala.annotation.unchecked.uncheckedVariance diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 3de5723bd3..b7446bf8fd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -5,8 +5,8 @@ package akka.stream.impl import akka.event.Logging import akka.stream.{ OverflowStrategy, TimerTransformer } -import akka.stream.scaladsl.{ OperationAttributes } -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes +import akka.stream.OperationAttributes._ import akka.stream.stage.Stage import org.reactivestreams.Processor import StreamLayout._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 9554f17e54..f6001f5e6b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -6,7 +6,7 @@ package akka.stream.impl import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean, AtomicReference } import akka.stream.impl.StreamLayout.Module -import akka.stream.scaladsl.{ Keep, OperationAttributes } +import akka.stream.scaladsl.Keep import akka.stream._ import org.reactivestreams.{ Subscription, Publisher, Subscriber } import akka.event.Logging.simpleName diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala index 098884eecd..bf0cc80ecd 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala @@ -318,7 +318,7 @@ abstract class FlexiMerge[T, Out, S <: Shape](val shape: S, val attributes: Oper def createMergeLogic(s: S): MergeLogic[T, Out] - override def toString = attributes.asScala.nameLifted match { + override def toString = attributes.nameLifted match { case Some(n) ⇒ n case None ⇒ super.toString } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala index c62b587126..0ec9ad0093 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala @@ -285,7 +285,7 @@ abstract class FlexiRoute[In, S <: Shape](val shape: S, val attributes: Operatio */ def createRouteLogic(s: S): RouteLogic[In] - override def toString = attributes.asScala.nameLifted match { + override def toString = attributes.nameLifted match { case Some(n) ⇒ n case None ⇒ super.toString } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index ed417609cf..596691fd7c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -409,7 +409,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph new Flow(delegate.concat(second.asScala).mapMaterialized(p ⇒ Pair(p._1, p._2))) def withAttributes(attr: OperationAttributes): javadsl.Flow[In, Out, Mat] = - new Flow(delegate.withAttributes(attr.asScala)) + new Flow(delegate.withAttributes(attr)) def named(name: String): javadsl.Flow[In, Out, Mat] = new Flow(delegate.named(name)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 3a765947aa..2f911b9b82 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -26,7 +26,7 @@ object Merge { * @param attributes optional attributes for this vertex */ def create[T](outputCount: Int, attributes: OperationAttributes): Graph[UniformFanInShape[T, T], Unit] = - scaladsl.Merge(outputCount, attributes.asScala) + scaladsl.Merge(outputCount, attributes) /** * Create a new `Merge` vertex with the specified output type. @@ -67,7 +67,7 @@ object MergePreferred { * @param attributes optional attributes for this vertex */ def create[T](outputCount: Int, attributes: OperationAttributes): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] = - scaladsl.MergePreferred(outputCount, attributes.asScala) + scaladsl.MergePreferred(outputCount, attributes) /** * Create a new `MergePreferred` vertex with the specified output type. @@ -106,7 +106,7 @@ object Broadcast { * @param attributes optional attributes for this vertex */ def create[T](outputCount: Int, attributes: OperationAttributes): Graph[UniformFanOutShape[T, T], Unit] = - scaladsl.Broadcast(outputCount, attributes.asScala) + scaladsl.Broadcast(outputCount, attributes) /** * Create a new `Broadcast` vertex with the specified input type. @@ -146,7 +146,7 @@ object Balance { * @param attributes optional attributes for this vertex */ def create[T](outputCount: Int, waitForAllDownstreams: Boolean, attributes: OperationAttributes): Graph[UniformFanOutShape[T, T], Unit] = - scaladsl.Balance(outputCount, waitForAllDownstreams, attributes.asScala) + scaladsl.Balance(outputCount, waitForAllDownstreams, attributes) /** * Create a new `Balance` vertex with the specified input type. @@ -202,7 +202,7 @@ object Unzip { */ def create[A, B](attributes: OperationAttributes): Graph[FanOutShape2[A Pair B, A, B], Unit] = scaladsl.FlowGraph.partial() { implicit b ⇒ - val unzip = b.add(scaladsl.Unzip[A, B](attributes.asScala)) + val unzip = b.add(scaladsl.Unzip[A, B](attributes)) val tuple = b.add(scaladsl.Flow[A Pair B].map(p ⇒ (p.first, p.second))) b.addEdge(tuple.outlet, unzip.in) new FanOutShape2(FanOutShape.Ports(tuple.inlet, unzip.out0 :: unzip.out1 :: Nil)) @@ -253,7 +253,7 @@ object Concat { * in the `FlowGraph`. This method creates a new instance every time it * is called and those instances are not `equal`. */ - def create[T](attributes: OperationAttributes): Graph[UniformFanInShape[T, T], Unit] = scaladsl.Concat[T](attributes.asScala) + def create[T](attributes: OperationAttributes): Graph[UniformFanInShape[T, T], Unit] = scaladsl.Concat[T](attributes) /** * Create a new anonymous `Concat` vertex with the specified input types. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala deleted file mode 100644 index 24df702dfb..0000000000 --- a/akka-stream/src/main/scala/akka/stream/javadsl/OperationAttributes.scala +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.javadsl - -import akka.stream.scaladsl -import akka.stream.Supervision - -/** - * Holds attributes which can be used to alter [[Flow]] or [[FlowGraph]] - * materialization. - */ -abstract class OperationAttributes private () { - private[akka] def asScala: scaladsl.OperationAttributes - - /** - * Adds given attributes to the end of these attributes. - */ - def and(other: OperationAttributes) = new OperationAttributes { - private[akka] def asScala = this.asScala and other.asScala - } -} - -/** - * Various attributes that can be applied to [[Flow]] or [[FlowGraph]] - * materialization. - */ -object OperationAttributes { - - /** - * Specifies the name of the operation. - */ - def name(name: String): OperationAttributes = - if (name eq null) none - else - new OperationAttributes { - private[akka] def asScala = scaladsl.OperationAttributes.name(name) - } - - /** - * Specifies the initial and maximum size of the input buffer. - */ - def inputBuffer(initial: Int, max: Int): OperationAttributes = new OperationAttributes { - private[akka] def asScala = scaladsl.OperationAttributes.inputBuffer(initial, max) - } - - /** - * Specifies the name of the dispatcher. - */ - def dispatcher(dispatcher: String): OperationAttributes = new OperationAttributes { - private[akka] def asScala = scaladsl.OperationAttributes.dispatcher(dispatcher) - } - - /** - * Decides how exceptions from application code are to be handled. - */ - def supervisionStrategy(decider: japi.Function[Throwable, Supervision.Directive]): OperationAttributes = - new OperationAttributes { - private[akka] def asScala = scaladsl.OperationAttributes.supervisionStrategy(e ⇒ decider.apply(e)) - } - - private[akka] val none: OperationAttributes = new OperationAttributes { - private[akka] def asScala = scaladsl.OperationAttributes.none - } -} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 7f0acde809..4939aacbe4 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -149,7 +149,7 @@ class Sink[-In, +Mat](delegate: scaladsl.Sink[In, Mat]) extends Graph[SinkShape[ new Sink(delegate.mapMaterialized(f.apply _)) def withAttributes(attr: OperationAttributes): javadsl.Sink[In, Mat] = - new Sink(delegate.withAttributes(attr.asScala)) + new Sink(delegate.withAttributes(attr)) def named(name: String): javadsl.Sink[In, Mat] = new Sink(delegate.named(name)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 21e7d114c5..3caa57cc82 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -499,7 +499,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour new Source(delegate.flatten(strategy)) def withAttributes(attr: OperationAttributes): javadsl.Source[Out, Mat] = - new Source(delegate.withAttributes(attr.asScala)) + new Source(delegate.withAttributes(attr)) def named(name: String): javadsl.Source[Out, Mat] = new Source(delegate.named(name)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala index 7dac11cbce..ca265a0b80 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala @@ -4,7 +4,7 @@ package akka.stream.scaladsl import akka.stream.scaladsl.FlexiMerge.MergeLogic -import akka.stream.{ Inlet, Shape, InPort, Graph } +import akka.stream.{ Inlet, Shape, InPort, Graph, OperationAttributes } import scala.collection.immutable import scala.collection.immutable.Seq import akka.stream.impl.StreamLayout diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala index 46a75722de..70eb2cb19d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala @@ -4,7 +4,7 @@ package akka.stream.scaladsl import akka.stream.impl.StreamLayout -import akka.stream.{ Outlet, Shape, OutPort, Graph } +import akka.stream.{ Outlet, Shape, OutPort, Graph, OperationAttributes } import scala.collection.immutable import akka.stream.impl.Junctions.FlexiRouteModule diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index f8badfa779..640f92b702 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -6,7 +6,7 @@ package akka.stream.scaladsl import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream._ -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.OperationAttributes._ import akka.util.Collections.EmptyImmutableSeq import org.reactivestreams.Processor import scala.annotation.unchecked.uncheckedVariance diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 2a6451e6ea..e5bdd48b7d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -6,8 +6,8 @@ package akka.stream.scaladsl import akka.stream.javadsl import akka.actor.{ ActorRef, Props } import akka.stream.impl._ -import akka.stream.{ SinkShape, Inlet, Outlet, Graph } -import akka.stream.scaladsl.OperationAttributes._ +import akka.stream.{ SinkShape, Inlet, Outlet, Graph, OperationAttributes } +import akka.stream.OperationAttributes._ import akka.stream.stage.{ TerminationDirective, Directive, Context, PushStage } import org.reactivestreams.{ Publisher, Subscriber } import scala.annotation.unchecked.uncheckedVariance diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 5487763456..6f2b9a0ba9 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -24,6 +24,7 @@ import scala.concurrent.Promise import org.reactivestreams.Subscriber import akka.stream.stage.SyncDirective import akka.stream.OverflowStrategy +import akka.stream.OperationAttributes /** * A `Source` is a set of stream processing steps that has one open output. It can comprise