diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java index 54e9d8c4ec..33332a6cb2 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/AttributesTest.java @@ -5,6 +5,8 @@ package akka.stream.javadsl; import static org.junit.Assert.assertEquals; import java.util.Arrays; +import java.util.Collections; + import org.junit.Test; import akka.stream.Attributes; @@ -22,7 +24,7 @@ public class AttributesTest { Arrays.asList(new Attributes.Name("a"), new Attributes.Name("b")), attributes.getAttributeList(Attributes.Name.class)); assertEquals( - Arrays.asList(new Attributes.InputBuffer(1, 2)), + Collections.singletonList(new Attributes.InputBuffer(1, 2)), attributes.getAttributeList(Attributes.InputBuffer.class)); } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java index 5fac956e12..5dc2b093b8 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/BidiFlowTest.java @@ -263,7 +263,7 @@ public class BidiFlowTest extends StreamTest { final Future> r = result.second(); assertEquals((Integer) 1, Await.result(l, oneSec)); assertEquals((Integer) 42, Await.result(m, oneSec)); - final Long[] rr = Await.result(r, oneSec).toArray(new Long[0]); + final Long[] rr = Await.result(r, oneSec).toArray(new Long[2]); Arrays.sort(rr); assertArrayEquals(new Long[] { 3L, 12L }, rr); } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index 18d072930a..72d1a1ae87 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -3,8 +3,6 @@ */ package akka.stream.javadsl; -import akka.actor.ActorRef; -import akka.dispatch.japi; import akka.japi.Pair; import akka.pattern.Patterns; import akka.japi.tuple.Tuple4; @@ -63,7 +61,7 @@ public class FlowGraphTest extends StreamTest { final Flow f2 = Flow.of(String.class).transform(FlowGraphTest.this. op()).named("f2"); @SuppressWarnings("unused") - final Flow f3 = + final Flow f3 = Flow.of(String.class).transform(FlowGraphTest.this. op()).named("f3"); final Source in1 = Source.from(Arrays.asList("a", "b", "c")); @@ -282,6 +280,7 @@ public class FlowGraphTest extends StreamTest { @Test public void mustBeAbleToUseMatValue() throws Exception { + @SuppressWarnings("unused") final Source in1 = Source.single(1); final TestProbe probe = TestProbe.apply(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index af3e7e75cc..4343b1f061 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -168,8 +168,8 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic val s = TestSubscriber.manualProbe[String]() ActorPublisher[String](ref).subscribe(s) ref ! Err("wrong") - s.expectSubscription - s.expectError.getMessage should be("wrong") + s.expectSubscription() + s.expectError().getMessage should be("wrong") } "not terminate after signalling onError" in { @@ -177,10 +177,10 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic val ref = system.actorOf(testPublisherProps(probe.ref)) val s = TestSubscriber.manualProbe[String]() ActorPublisher[String](ref).subscribe(s) - s.expectSubscription + s.expectSubscription() probe.watch(ref) ref ! Err("wrong") - s.expectError.getMessage should be("wrong") + s.expectError().getMessage should be("wrong") probe.expectNoMsg(200.millis) } @@ -189,10 +189,10 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic val ref = system.actorOf(testPublisherProps(probe.ref)) val s = TestSubscriber.manualProbe[String]() ActorPublisher[String](ref).subscribe(s) - s.expectSubscription + s.expectSubscription() probe.watch(ref) ref ! ErrThenStop("wrong") - s.expectError.getMessage should be("wrong") + s.expectError().getMessage should be("wrong") probe.expectTerminated(ref, 3.seconds) } @@ -202,7 +202,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic ref ! Err("early err") val s = TestSubscriber.manualProbe[String]() ActorPublisher[String](ref).subscribe(s) - s.expectSubscriptionAndError.getMessage should be("early err") + s.expectSubscriptionAndError().getMessage should be("early err") } "drop onNext elements after cancel" in { @@ -246,7 +246,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic ref ! Produce("elem-1") ref ! Complete s.expectNext("elem-1") - s.expectComplete + s.expectComplete() } "not terminate after signalling onComplete" in { @@ -254,14 +254,14 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic val ref = system.actorOf(testPublisherProps(probe.ref)) val s = TestSubscriber.manualProbe[String]() ActorPublisher[String](ref).subscribe(s) - val sub = s.expectSubscription + val sub = s.expectSubscription() sub.request(3) probe.expectMsg(TotalDemand(3)) probe.watch(ref) ref ! Produce("elem-1") ref ! Complete s.expectNext("elem-1") - s.expectComplete + s.expectComplete() probe.expectNoMsg(200.millis) } @@ -270,14 +270,14 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic val ref = system.actorOf(testPublisherProps(probe.ref)) val s = TestSubscriber.manualProbe[String]() ActorPublisher[String](ref).subscribe(s) - val sub = s.expectSubscription + val sub = s.expectSubscription() sub.request(3) probe.expectMsg(TotalDemand(3)) probe.watch(ref) ref ! Produce("elem-1") ref ! CompleteThenStop s.expectNext("elem-1") - s.expectComplete + s.expectComplete() probe.expectTerminated(ref, 3.seconds) } @@ -287,7 +287,7 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic ref ! Complete val s = TestSubscriber.manualProbe[String]() ActorPublisher[String](ref).subscribe(s) - s.expectSubscriptionAndComplete + s.expectSubscriptionAndComplete() } "only allow one subscriber" in { @@ -295,10 +295,10 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic val ref = system.actorOf(testPublisherProps(probe.ref)) val s = TestSubscriber.manualProbe[String]() ActorPublisher[String](ref).subscribe(s) - s.expectSubscription + s.expectSubscription() val s2 = TestSubscriber.manualProbe[String]() ActorPublisher[String](ref).subscribe(s2) - s2.expectSubscriptionAndError.getClass should be(classOf[IllegalStateException]) + s2.expectSubscriptionAndError().getClass should be(classOf[IllegalStateException]) } "signal onCompete when actor is stopped" in { @@ -306,9 +306,9 @@ class ActorPublisherSpec extends AkkaSpec(ActorPublisherSpec.config) with Implic val ref = system.actorOf(testPublisherProps(probe.ref)) val s = TestSubscriber.manualProbe[String]() ActorPublisher[String](ref).subscribe(s) - s.expectSubscription + s.expectSubscription() ref ! PoisonPill - s.expectComplete + s.expectComplete() } "work together with Flow and ActorSubscriber" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala index ae66bd5c6a..bd9b5b25d8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/StreamLayoutSpec.scala @@ -212,7 +212,7 @@ class StreamLayoutSpec extends AkkaSpec { def getAllAtomic(module: Module): Set[Module] = { val (atomics, composites) = module.subModules.partition(_.isAtomic) - atomics ++ composites.map(getAllAtomic).flatten + atomics ++ composites.flatMap(getAllAtomic) } val allAtomic = getAllAtomic(topLevel) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala index 560fcd7aa3..34ca9fe8ae 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FramingSpec.scala @@ -67,7 +67,7 @@ class FramingSpec extends AkkaSpec { .named("lineFraming") def completeTestSequences(delimiter: ByteString): immutable.Iterable[ByteString] = - for (prefix ← 0 until delimiter.size; s ← baseTestSequences) + for (prefix ← delimiter.indices; s ← baseTestSequences) yield delimiter.take(prefix) ++ s "work with various delimiters and test sequences" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 00a0899095..297329f465 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -340,8 +340,6 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- } "properly full-close if requested" in assertAllStagesStopped { - import system.dispatcher - val serverAddress = temporaryServerAddress() val writeButIgnoreRead: Flow[ByteString, ByteString, Unit] = Flow.wrap(Sink.ignore, Source.single(ByteString("Early response")))(Keep.right) @@ -362,8 +360,6 @@ class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround- } "Echo should work even if server is in full close mode" in { - import system.dispatcher - val serverAddress = temporaryServerAddress() val binding = diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index 1c24fc6815..949cb77b9f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -1,6 +1,5 @@ package akka.stream.io -import java.net.InetSocketAddress import java.security.KeyStore import java.security.SecureRandom import java.util.concurrent.TimeoutException @@ -15,7 +14,6 @@ import akka.actor.ActorSystem import akka.pattern.{ after ⇒ later } import akka.stream.ActorMaterializer import akka.stream.scaladsl._ -import akka.stream.scaladsl.FlowGraph.Implicits._ import akka.stream.stage._ import akka.stream.testkit._ import akka.stream.testkit.Utils._ @@ -60,7 +58,7 @@ object TlsSpec { private var last: ByteString = _ override def preStart(ctx: AsyncContext[ByteString, Unit]) = { - val cb = ctx.getAsyncCallback() + val cb = ctx.getAsyncCallback system.scheduler.scheduleOnce(duration)(cb.invoke(()))(system.dispatcher) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAppendSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAppendSpec.scala index 62d43355d0..f4f7f7bba2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAppendSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowAppendSpec.scala @@ -45,7 +45,7 @@ class FlowAppendSpec extends AkkaSpec with River { trait River { self: Matchers ⇒ - val elements = (1 to 10) + val elements = 1 to 10 val otherFlow = Flow[Int].map(_.toString) def riverOf[T](flowConstructor: Subscriber[T] ⇒ Unit)(implicit system: ActorSystem) = { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala index f9d68afa0a..5dbedeacd3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala @@ -79,7 +79,7 @@ class FlowExpandSpec extends AkkaSpec { .expand(seed = i ⇒ i)(extrapolate = i ⇒ (i, i)) .runFold(Set.empty[Int])(_ + _) - Await.result(future, 10.seconds) should contain theSameElementsAs ((1 to 100).toSet) + Await.result(future, 10.seconds) should contain theSameElementsAs (1 to 100).toSet } "backpressure publisher when subscriber is slower" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala index c20718261f..fe8a92390c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala @@ -6,7 +6,7 @@ package akka.stream.scaladsl import scala.concurrent.Await import scala.util.control.NoStackTrace -import akka.stream.{ OverflowStrategy, ActorMaterializer } +import akka.stream.ActorMaterializer import akka.stream.testkit.AkkaSpec import akka.stream.testkit.Utils._ import scala.concurrent.duration._ @@ -16,7 +16,7 @@ class FlowFoldSpec extends AkkaSpec { "A Fold" must { val input = 1 to 100 - val expected = input.fold(0)(_ + _) + val expected = input.sum val inputSource = Source(input).filter(_ ⇒ true).map(identity) val foldSource = inputSource.fold[Int](0)(_ + _).filter(_ ⇒ true).map(identity) val foldFlow = Flow[Int].filter(_ ⇒ true).map(identity).fold(0)(_ + _).filter(_ ⇒ true).map(identity) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala index d534ff3077..54c22583e8 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala @@ -39,11 +39,11 @@ class FlowForeachSpec extends AkkaSpec { Source(p).runForeach(testActor ! _) onFailure { case ex ⇒ testActor ! ex } - val proc = p.expectSubscription + val proc = p.expectSubscription() proc.expectRequest() - val ex = new RuntimeException("ex") with NoStackTrace - proc.sendError(ex) - expectMsg(ex) + val rte = new RuntimeException("ex") with NoStackTrace + proc.sendError(rte) + expectMsg(rte) } "complete future with failure when function throws" in assertAllStagesStopped { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedSpec.scala index d741e74f73..8a1d15c2a2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedSpec.scala @@ -28,7 +28,7 @@ class FlowGroupedSpec extends AkkaSpec with ScriptedTest { "group with rest" in { val testLen = random.nextInt(1, 16) - def script = Script((TestConfig.RandomTestRange.map { _ ⇒ randomTest(testLen) } :+ randomTest(1)): _*) + def script = Script(TestConfig.RandomTestRange.map { _ ⇒ randomTest(testLen) } :+ randomTest(1): _*) TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.grouped(testLen))) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala index f9503559b6..0c495b6fe4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala @@ -39,7 +39,7 @@ class FlowIterableSpec extends AbstractFlowIteratorSpec { c.expectNext(1) c.expectNoMsg(100.millis) sub.request(2) - c.expectError.getMessage should be("not two") + c.expectError().getMessage should be("not two") sub.request(2) c.expectNoMsg(100.millis) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index 57c0d90e32..ecce6e93fd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -28,7 +28,7 @@ object FlowMapAsyncSpec { override def onPush(elem: In, ctx: AsyncContext[Out, Try[Out]]) = { val future = f(elem) - val cb = ctx.getAsyncCallback() + val cb = ctx.getAsyncCallback future.onComplete(cb.invoke) ctx.holdUpstream() } @@ -132,7 +132,7 @@ class FlowMapAsyncSpec extends AkkaSpec { }).to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) - c.expectError.getMessage should be("err1") + c.expectError().getMessage should be("err1") latch.countDown() } @@ -151,7 +151,7 @@ class FlowMapAsyncSpec extends AkkaSpec { to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) - c.expectError.getMessage should be("err2") + c.expectError().getMessage should be("err2") latch.countDown() } @@ -216,7 +216,7 @@ class FlowMapAsyncSpec extends AkkaSpec { val p = Source(List("a", "b")).mapAsync(4)(elem ⇒ Future.successful(null)).to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) - c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) + c.expectError().getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) } "resume when future is completed with null" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala index 4c431cc762..c12b6f19b3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSectionSpec.scala @@ -78,8 +78,8 @@ class FlowSectionSpec extends AkkaSpec(FlowSectionSpec.config) { //FIXME: Flow has no simple toString anymore pending val n = "Uppercase reverser" - val f1 = Flow[String].map(_.toLowerCase()) - val f2 = Flow[String].map(_.toUpperCase).map(_.reverse).named(n).map(_.toLowerCase()) + val f1 = Flow[String].map(_.toLowerCase) + val f2 = Flow[String].map(_.toUpperCase).map(_.reverse).named(n).map(_.toLowerCase) f1.via(f2).toString should include(n) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 9adaba3d36..17c5eac49a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -160,12 +160,12 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val source: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher) source.subscribe(flowIn) - val sub1 = c1.expectSubscription + val sub1 = c1.expectSubscription() sub1.request(3) c1.expectNext("1") c1.expectNext("2") c1.expectNext("3") - c1.expectComplete + c1.expectComplete() } "materialize into Publisher/Subscriber and transformation processor" in { @@ -174,7 +174,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val c1 = TestSubscriber.manualProbe[String]() flowOut.subscribe(c1) - val sub1 = c1.expectSubscription + val sub1 = c1.expectSubscription() sub1.request(3) c1.expectNoMsg(200.millis) @@ -184,7 +184,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece c1.expectNext("1") c1.expectNext("2") c1.expectNext("3") - c1.expectComplete + c1.expectComplete() } "materialize into Publisher/Subscriber and multiple transformation processors" in { @@ -193,7 +193,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val c1 = TestSubscriber.manualProbe[String]() flowOut.subscribe(c1) - val sub1 = c1.expectSubscription + val sub1 = c1.expectSubscription() sub1.request(3) c1.expectNoMsg(200.millis) @@ -203,7 +203,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece c1.expectNext("elem-1") c1.expectNext("elem-2") c1.expectNext("elem-3") - c1.expectComplete + c1.expectComplete() } "subscribe Subscriber" in { @@ -213,12 +213,12 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher) Source(publisher).to(sink).run() - val sub1 = c1.expectSubscription + val sub1 = c1.expectSubscription() sub1.request(3) c1.expectNext("1") c1.expectNext("2") c1.expectNext("3") - c1.expectComplete + c1.expectComplete() } "perform transformation operation" in { @@ -239,12 +239,12 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher) Source(publisher).to(sink).run() - val sub1 = c1.expectSubscription + val sub1 = c1.expectSubscription() sub1.request(3) c1.expectNext("1") c1.expectNext("2") c1.expectNext("3") - c1.expectComplete + c1.expectComplete() } "be materializable several times with fanout publisher" in assertAllStagesStopped { @@ -258,26 +258,26 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece p2.subscribe(s2) p2.subscribe(s3) - val sub1 = s1.expectSubscription - val sub2 = s2.expectSubscription - val sub3 = s3.expectSubscription + val sub1 = s1.expectSubscription() + val sub2 = s2.expectSubscription() + val sub3 = s3.expectSubscription() sub1.request(3) s1.expectNext("1") s1.expectNext("2") s1.expectNext("3") - s1.expectComplete + s1.expectComplete() sub2.request(3) sub3.request(3) s2.expectNext("1") s2.expectNext("2") s2.expectNext("3") - s2.expectComplete + s2.expectComplete() s3.expectNext("1") s3.expectNext("2") s3.expectNext("3") - s3.expectComplete + s3.expectComplete() } "be covariant" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala index 981f61ee44..52c3516b57 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala @@ -27,7 +27,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest { "yield the first value" in assertAllStagesStopped { val p = TestPublisher.manualProbe[Int]() val f: Future[Int] = Source(p).map(identity).runWith(Sink.head) - val proc = p.expectSubscription + val proc = p.expectSubscription() proc.expectRequest() proc.sendNext(42) Await.result(f, 100.millis) should be(42) @@ -41,7 +41,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest { val (subscriber, future) = s.toMat(f)(Keep.both).run() p.subscribe(subscriber) - val proc = p.expectSubscription + val proc = p.expectSubscription() proc.expectRequest() proc.sendNext(42) Await.result(future, 100.millis) should be(42) @@ -51,7 +51,7 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest { "yield the first error" in assertAllStagesStopped { val p = TestPublisher.manualProbe[Int]() val f = Source(p).runWith(Sink.head) - val proc = p.expectSubscription + val proc = p.expectSubscription() proc.expectRequest() val ex = new RuntimeException("ex") proc.sendError(ex) @@ -62,12 +62,12 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest { "yield NoSuchElementExcption for empty stream" in assertAllStagesStopped { val p = TestPublisher.manualProbe[Int]() val f = Source(p).runWith(Sink.head) - val proc = p.expectSubscription + val proc = p.expectSubscription() proc.expectRequest() proc.sendComplete() Await.ready(f, 100.millis) f.value.get match { - case Failure(e: NoSuchElementException) ⇒ e.getMessage() should be("empty stream") + case Failure(e: NoSuchElementException) ⇒ e.getMessage should be("empty stream") case x ⇒ fail("expected NoSuchElementException, got " + x) } } diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index 546d1de89a..5dadcea25d 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -174,7 +174,7 @@ trait ActorPublisher[T] extends Actor { * otherwise `onNext` will throw `IllegalStateException`. */ def onNext(element: T): Unit = lifecycleState match { - case Active | PreSubscriber ⇒ + case Active | PreSubscriber | CompleteThenStop ⇒ if (demand > 0) { demand -= 1 tryOnNext(subscriber, element) @@ -193,7 +193,7 @@ trait ActorPublisher[T] extends Actor { * call [[#onNext]], [[#onError]] and [[#onComplete]]. */ def onComplete(): Unit = lifecycleState match { - case Active | PreSubscriber ⇒ + case Active | PreSubscriber | CompleteThenStop ⇒ lifecycleState = Completed if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives try tryOnComplete(subscriber) finally subscriber = null @@ -226,8 +226,8 @@ trait ActorPublisher[T] extends Actor { * call [[#onNext]], [[#onError]] and [[#onComplete]]. */ def onError(cause: Throwable): Unit = lifecycleState match { - case Active | PreSubscriber ⇒ - lifecycleState = ErrorEmitted(cause, false) + case Active | PreSubscriber | CompleteThenStop ⇒ + lifecycleState = ErrorEmitted(cause, stop = false) if (subscriber ne null) // otherwise onError will be called when the subscription arrives try tryOnError(subscriber, cause) finally subscriber = null case _: ErrorEmitted ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala index 1f5ab8d933..7ea07d308e 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala @@ -310,7 +310,7 @@ private[akka] final class ActorSubscriberImpl[T](val impl: ActorRef) extends Sub private[akka] object ActorSubscriberState extends ExtensionId[ActorSubscriberState] with ExtensionIdProvider { override def get(system: ActorSystem): ActorSubscriberState = super.get(system) - override def lookup = ActorSubscriberState + override def lookup() = ActorSubscriberState override def createExtension(system: ExtendedActorSystem): ActorSubscriberState = new ActorSubscriberState diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index b95c808801..c14b759d9a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -15,12 +15,9 @@ import akka.stream.impl.Junctions._ import akka.stream.impl.StreamLayout.Module import akka.stream.impl.fusing.ActorInterpreter import akka.stream.impl.io.SslTlsCipherActor -import akka.stream.scaladsl._ import akka.stream._ -import akka.stream.io._ import akka.stream.io.SslTls.TlsModule import akka.stream.stage.Stage -import akka.util.ByteString import org.reactivestreams._ import scala.concurrent.{ Await, ExecutionContextExecutor } @@ -28,17 +25,14 @@ import scala.concurrent.{ Await, ExecutionContextExecutor } /** * INTERNAL API */ -private[akka] case class ActorMaterializerImpl( - val system: ActorSystem, - override val settings: ActorMaterializerSettings, - dispatchers: Dispatchers, - val supervisor: ActorRef, - val haveShutDown: AtomicBoolean, - flowNameCounter: AtomicLong, - namePrefix: String, - optimizations: Optimizations) - extends ActorMaterializer { - import ActorMaterializerImpl._ +private[akka] case class ActorMaterializerImpl(val system: ActorSystem, + override val settings: ActorMaterializerSettings, + dispatchers: Dispatchers, + val supervisor: ActorRef, + val haveShutDown: AtomicBoolean, + flowNameCounter: AtomicLong, + namePrefix: String, + optimizations: Optimizations) extends ActorMaterializer { import akka.stream.impl.Stages._ override def shutdown(): Unit = @@ -103,7 +97,7 @@ private[akka] case class ActorMaterializerImpl( case tls: TlsModule ⇒ // TODO solve this so TlsModule doesn't need special treatment here val es = effectiveSettings(effectiveAttributes) val props = - SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing, tls.hostInfo) + SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tls.role, tls.closing, tls.hostInfo) val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher) def factory(id: Int) = new ActorPublisher[Any](impl) { override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) @@ -174,10 +168,10 @@ private[akka] case class ActorMaterializerImpl( (FlexiRoute.props(effectiveSettings, r.shape, flexi), r.shape.inlets.head: InPort, r.shape.outlets) case BroadcastModule(shape, eagerCancel, _) ⇒ - (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.size), shape.in, shape.outArray.toSeq) + (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.length), shape.in, shape.outArray.toSeq) case BalanceModule(shape, waitForDownstreams, _) ⇒ - (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq) + (Balance.props(effectiveSettings, shape.outArray.length, waitForDownstreams), shape.in, shape.outArray.toSeq) case unzip: UnzipWithModule ⇒ (unzip.props(effectiveSettings), unzip.inPorts.head, unzip.shape.outlets) @@ -237,7 +231,7 @@ private[akka] case class ActorMaterializerImpl( */ private[akka] object FlowNameCounter extends ExtensionId[FlowNameCounter] with ExtensionIdProvider { override def get(system: ActorSystem): FlowNameCounter = super.get(system) - override def lookup = FlowNameCounter + override def lookup() = FlowNameCounter override def createExtension(system: ExtendedActorSystem): FlowNameCounter = new FlowNameCounter } @@ -259,13 +253,13 @@ private[akka] object StreamSupervisor { extends DeadLetterSuppression with NoSerializationVerificationNeeded /** Testing purpose */ - final case object GetChildren + case object GetChildren /** Testing purpose */ final case class Children(children: Set[ActorRef]) /** Testing purpose */ - final case object StopChildren + case object StopChildren /** Testing purpose */ - final case object StoppedChildren + case object StoppedChildren } private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor { @@ -291,7 +285,6 @@ private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveSh */ private[akka] object ActorProcessorFactory { import akka.stream.impl.Stages._ - import ActorMaterializerImpl._ def props(materializer: ActorMaterializer, op: StageModule, parentAttributes: Attributes): (Props, Any) = { val att = parentAttributes and op.attributes @@ -309,17 +302,17 @@ private[akka] object ActorProcessorFactory { case Collect(pf, _) ⇒ interp(fusing.Collect(pf, settings.supervisionDecider)) case Scan(z, f, _) ⇒ interp(fusing.Scan(z, f, settings.supervisionDecider)) case Fold(z, f, _) ⇒ interp(fusing.Fold(z, f, settings.supervisionDecider)) - case Recover(pf, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Recover(pf)), materializer, att), ()) - case Scan(z, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider)), materializer, att), ()) - case Expand(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Expand(s, f)), materializer, att), ()) - case Conflate(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider)), materializer, att), ()) - case Buffer(n, s, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Buffer(n, s)), materializer, att), ()) - case MapConcat(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider)), materializer, att), ()) - case MapAsync(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsync(p, f, settings.supervisionDecider)), materializer, att), ()) - case MapAsyncUnordered(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)), materializer, att), ()) - case Grouped(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Grouped(n)), materializer, att), ()) - case Sliding(n, step, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Sliding(n, step)), materializer, att), ()) - case Log(n, e, l, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Log(n, e, l)), materializer, att), ()) + case Recover(pf, _) ⇒ interp(fusing.Recover(pf)) + case Scan(z, f, _) ⇒ interp(fusing.Scan(z, f, settings.supervisionDecider)) + case Expand(s, f, _) ⇒ interp(fusing.Expand(s, f)) + case Conflate(s, f, _) ⇒ interp(fusing.Conflate(s, f, settings.supervisionDecider)) + case Buffer(n, s, _) ⇒ interp(fusing.Buffer(n, s)) + case MapConcat(f, _) ⇒ interp(fusing.MapConcat(f, settings.supervisionDecider)) + case MapAsync(p, f, _) ⇒ interp(fusing.MapAsync(p, f, settings.supervisionDecider)) + case MapAsyncUnordered(p, f, _) ⇒ interp(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)) + case Grouped(n, _) ⇒ interp(fusing.Grouped(n)) + case Sliding(n, step, _) ⇒ interp(fusing.Sliding(n, step)) + case Log(n, e, l, _) ⇒ interp(fusing.Log(n, e, l)) case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ()) case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ()) case Split(d, _) ⇒ (SplitWhereProcessorImpl.props(settings, d), ()) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index f7e9711cd7..a513baed8d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -3,7 +3,6 @@ */ package akka.stream.impl -import java.util.Arrays import akka.actor._ import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings } import akka.stream.actor.ActorSubscriber.OnSubscribe @@ -101,7 +100,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) override def isClosed: Boolean = upstreamCompleted private def clear(): Unit = { - Arrays.fill(inputBuffer, 0, inputBuffer.length, null) + java.util.Arrays.fill(inputBuffer, 0, inputBuffer.length, null) inputBufferElements = 0 } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 38b56f32a0..70feb471ec 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -9,8 +9,6 @@ import akka.stream.actor.{ ActorSubscriberMessage, ActorSubscriber } import akka.stream.scaladsl.FlexiMerge.MergeLogic import org.reactivestreams.{ Subscription, Subscriber } -import scala.collection.immutable - /** * INTERNAL API */ @@ -105,7 +103,7 @@ private[akka] object FanIn { def cancel(input: Int) = if (!cancelled(input)) { inputs(input).cancel() - cancelled(input, true) + cancelled(input, on = true) unmarkInput(input) } @@ -117,7 +115,7 @@ private[akka] object FanIn { if (!marked(input)) { if (depleted(input)) markedDepleted += 1 if (pending(input)) markedPending += 1 - marked(input, true) + marked(input, on = true) markCount += 1 } } @@ -126,7 +124,7 @@ private[akka] object FanIn { if (marked(input)) { if (depleted(input)) markedDepleted -= 1 if (pending(input)) markedPending -= 1 - marked(input, false) + marked(input, on = false) markCount -= 1 } } @@ -171,11 +169,11 @@ private[akka] object FanIn { val elem = input.dequeueInputElement() if (!input.inputsAvailable) { if (marked(id)) markedPending -= 1 - pending(id, false) + pending(id, on = false) } if (input.inputsDepleted) { if (marked(id)) markedDepleted += 1 - depleted(id, true) + depleted(id, on = true) onDepleted(id) } elem @@ -202,7 +200,7 @@ private[akka] object FanIn { } val AnyOfMarkedInputs = new TransferState { - override def isCompleted: Boolean = (markedDepleted == markCount && markedPending == 0) + override def isCompleted: Boolean = markedDepleted == markCount && markedPending == 0 override def isReady: Boolean = markedPending > 0 } @@ -222,15 +220,15 @@ private[akka] object FanIn { inputs(id).subreceive(ActorSubscriber.OnSubscribe(subscription)) case OnNext(id, elem) ⇒ if (marked(id) && !pending(id)) markedPending += 1 - pending(id, true) + pending(id, on = true) inputs(id).subreceive(ActorSubscriberMessage.OnNext(elem)) case OnComplete(id) ⇒ if (!pending(id)) { if (marked(id) && !depleted(id)) markedDepleted += 1 - depleted(id, true) + depleted(id, on = true) onDepleted(id) } - completed(id, true) + completed(id, on = true) inputs(id).subreceive(ActorSubscriberMessage.OnComplete) case OnError(id, e) ⇒ onError(id, e) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala index 268729d814..7976583228 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala @@ -42,9 +42,9 @@ private[akka] class FlexiMergeImpl[T, S <: Shape]( } override protected val inputBunch = new FanIn.InputBunch(inputCount, settings.maxInputBufferSize, this) { - override def onError(input: Int, e: Throwable): Unit = { + override def onError(input: Int, t: Throwable): Unit = { changeBehavior( - try completion.onUpstreamFailure(ctx, inputMapping(input), e) + try completion.onUpstreamFailure(ctx, inputMapping(input), t) catch { case NonFatal(e) ⇒ fail(e); mergeLogic.SameState }) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala index a6b361b4ab..c5e9d41b2f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala @@ -51,9 +51,9 @@ private[akka] class FlexiRouteImpl[T, S <: Shape](_settings: ActorMaterializerSe } override protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.maxInputBufferSize, this) { - override def onError(e: Throwable): Unit = { - try completion.onUpstreamFailure(ctx, e) catch { case NonFatal(e) ⇒ fail(e) } - fail(e) + override def onError(t: Throwable): Unit = { + try completion.onUpstreamFailure(ctx, t) catch { case NonFatal(e) ⇒ fail(e) } + fail(t) } override def onComplete(): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Flows.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowModule.scala similarity index 100% rename from akka-stream/src/main/scala/akka/stream/impl/Flows.scala rename to akka-stream/src/main/scala/akka/stream/impl/FlowModule.scala diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index 3776655de2..1ded51b290 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -10,7 +10,7 @@ import akka.stream._ import org.reactivestreams._ import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.duration.FiniteDuration -import scala.concurrent.{ Promise } +import scala.concurrent.Promise import scala.util.{ Failure, Success } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 0fa58bfc28..c29a0127d8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -3,14 +3,13 @@ */ package akka.stream.impl -import akka.event.{ LoggingAdapter, Logging } +import akka.event.LoggingAdapter import akka.stream.impl.SplitDecision.SplitDecision import akka.stream.impl.StreamLayout._ import akka.stream.{ OverflowStrategy, TimerTransformer, Attributes } import akka.stream.Attributes._ import akka.stream.stage.Stage import org.reactivestreams.Processor -import akka.event.Logging.simpleName import scala.collection.immutable import scala.concurrent.Future diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index 461ae2f691..d84da3721c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -3,7 +3,7 @@ */ package akka.stream.impl -import java.util.concurrent.atomic.{ AtomicInteger, AtomicBoolean, AtomicReference } +import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference } import akka.stream.impl.MaterializerSession.MaterializationPanic import akka.stream.impl.StreamLayout.Module import akka.stream.scaladsl.Keep @@ -58,7 +58,8 @@ private[akka] object StreamLayout { if (downs != ups2) problems ::= s"inconsistent maps: ups ${pairs(ups2 -- inter)} downs ${pairs(downs -- inter)}" val (allIn, dupIn, allOut, dupOut) = subModules.foldLeft((Set.empty[InPort], Set.empty[InPort], Set.empty[OutPort], Set.empty[OutPort])) { - case ((ai, di, ao, doo), m) ⇒ (ai ++ m.inPorts, di ++ ai.intersect(m.inPorts), ao ++ m.outPorts, doo ++ ao.intersect(m.outPorts)) + case ((ai, di, ao, doo), sm) ⇒ + (ai ++ sm.inPorts, di ++ ai.intersect(sm.inPorts), ao ++ sm.outPorts, doo ++ ao.intersect(sm.outPorts)) } if (dupIn.nonEmpty) problems ::= s"duplicate ports in submodules ${ins(dupIn)}" if (dupOut.nonEmpty) problems ::= s"duplicate ports in submodules ${outs(dupOut)}" @@ -73,7 +74,7 @@ private[akka] object StreamLayout { n match { case Ignore ⇒ Set.empty case Transform(f, dep) ⇒ atomics(dep) - case Atomic(m) ⇒ Set(m) + case Atomic(module) ⇒ Set(module) case Combine(f, left, right) ⇒ atomics(left) ++ atomics(right) } val atomic = atomics(materializedValueComputation) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 4bc2e02dbf..c659035bd3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -8,7 +8,6 @@ import akka.actor._ import akka.stream.ActorMaterializerSettings import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.collection.mutable -import scala.concurrent.duration.FiniteDuration /** * INTERNAL API @@ -48,7 +47,7 @@ private[akka] object MultiStreamOutputProcessor { override def subreceive: SubReceive = throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block") - def isAttached() = state.get().isInstanceOf[Attached] + def isAttached = state.get().isInstanceOf[Attached] def enqueueOutputDemand(demand: Long): Unit = { downstreamDemand += demand @@ -188,8 +187,8 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc case _ ⇒ // ignore... } case SubstreamSubscriptionTimeout(key) ⇒ substreamOutputs.get(key) match { - case Some(sub) if !sub.isAttached() ⇒ subscriptionTimedOut(sub) - case _ ⇒ // ignore... + case Some(sub) if !sub.isAttached ⇒ subscriptionTimedOut(sub) + case _ ⇒ // ignore... } case SubstreamCancel(key) ⇒ invalidateSubstreamOutput(key) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala index 3662ceec4f..a6388d55c2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala @@ -15,7 +15,7 @@ object StreamSubscriptionTimeoutSupport { /** * A subscriber who calls `cancel` directly from `onSubscribe` and ignores all other callbacks. */ - final case object CancelingSubscriber extends Subscriber[Any] { + case object CancelingSubscriber extends Subscriber[Any] { override def onSubscribe(s: Subscription): Unit = { ReactiveStreamsCompliance.requireNonNullSubscription(s) s.cancel() @@ -37,7 +37,7 @@ object StreamSubscriptionTimeoutSupport { * Subscription timeout which does not start any scheduled events and always returns `true`. * This specialized implementation is to be used for "noop" timeout mode. */ - final case object NoopSubscriptionTimeout extends Cancellable { + case object NoopSubscriptionTimeout extends Cancellable { override def cancel() = true override def isCancelled = true } @@ -79,11 +79,11 @@ private[akka] trait StreamSubscriptionTimeoutSupport { target match { case p: Processor[_, _] ⇒ log.debug("Cancelling {} Processor's publisher and subscriber sides (after {} ms)", p, millis) - handleSubscriptionTimeout(target, new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline (${millis}) ms") with NoStackTrace) + handleSubscriptionTimeout(target, new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline ($millis) ms") with NoStackTrace) case p: Publisher[_] ⇒ log.debug("Cancelling {} (after: {} ms)", p, millis) - handleSubscriptionTimeout(target, new SubscriptionTimeoutException(s"Publisher (${p}) you are trying to subscribe to has been shut-down " + + handleSubscriptionTimeout(target, new SubscriptionTimeoutException(s"Publisher ($p) you are trying to subscribe to has been shut-down " + s"because exceeding it's subscription-timeout.") with NoStackTrace) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala index 3b8051b349..bb3adf558e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala @@ -173,7 +173,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff val element = buffer.read(head) head.dispatch(element) head.totalDemand -= 1 - dispatch(tail, true) + dispatch(tail, sent = true) } else dispatch(tail, sent) case _ ⇒ sent } diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index a7808c0e35..b022c3a39b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -122,7 +122,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite } override def postStop(): Unit = { - tickTask.foreach(_.cancel) + tickTask.foreach(_.cancel()) cancelled.set(true) if (exposedPublisher ne null) exposedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason) diff --git a/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala index 31df7c7518..de00397883 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TimerTransformerProcessorsImpl.scala @@ -3,7 +3,6 @@ */ package akka.stream.impl -import java.util.LinkedList import akka.stream.ActorMaterializerSettings import akka.stream.TimerTransformer import scala.util.control.NonFatal @@ -46,7 +45,7 @@ private[akka] class TimerTransformerProcessorsImpl( } val schedulerInputs: Inputs = new DefaultInputTransferStates { - val queue = new LinkedList[Any] + val queue = new java.util.LinkedList[Any] override def dequeueInputElement(): Any = queue.removeFirst() @@ -76,6 +75,11 @@ private[akka] class TimerTransformerProcessorsImpl( def isCompleted = false } + private val terminate = TransferPhase(Always) { () ⇒ + emits = transformer.onTermination(errorEvent) + emitAndThen(completedPhase) + } + private val running: TransferPhase = TransferPhase(RunningCondition) { () ⇒ if (primaryInputs.inputsDepleted || (transformer.isComplete && !schedulerInputs.inputsAvailable)) { nextPhase(terminate) @@ -89,11 +93,6 @@ private[akka] class TimerTransformerProcessorsImpl( } } - private val terminate = TransferPhase(Always) { () ⇒ - emits = transformer.onTermination(errorEvent) - emitAndThen(completedPhase) - } - override def toString: String = s"Transformer(emits=$emits, transformer=$transformer)" } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index 8481c9d1fc..76e9fdb77b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -3,20 +3,17 @@ */ package akka.stream.impl.fusing -import java.util.Arrays import akka.actor._ import akka.stream.impl.ReactiveStreamsCompliance._ import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings, Attributes, ActorMaterializer } import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete } import akka.stream.impl._ -import akka.stream.impl.fusing.OneBoundedInterpreter.{ InitializationFailed, InitializationFailure, InitializationSuccessful } +import akka.stream.impl.fusing.OneBoundedInterpreter.{ InitializationFailed, InitializationSuccessful } import akka.stream.stage._ import org.reactivestreams.{ Subscriber, Subscription } import akka.event.{ Logging, LoggingAdapter } -import scala.util.control.NonFatal - /** * INTERNAL API */ @@ -101,7 +98,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String) } private def clear(): Unit = { - Arrays.fill(inputBuffer, 0, inputBuffer.length, null) + java.util.Arrays.fill(inputBuffer, 0, inputBuffer.length, null) inputBufferElements = 0 } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/OneBoundedInterpreter.scala similarity index 99% rename from akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala rename to akka-stream/src/main/scala/akka/stream/impl/fusing/OneBoundedInterpreter.scala index 6b7052232f..bdb6efa411 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Interpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/OneBoundedInterpreter.scala @@ -21,7 +21,7 @@ private[akka] object OneBoundedInterpreter { /** INTERNAL API */ private[akka] sealed trait InitializationStatus /** INTERNAL API */ - private[akka] final case object InitializationSuccessful extends InitializationStatus + private[akka] case object InitializationSuccessful extends InitializationStatus /** INTERNAL API */ private[akka] final case class InitializationFailed(failures: immutable.Seq[InitializationFailure]) extends InitializationStatus { // exceptions are reverse ordered here, below methods help to avoid confusion when used from the outside @@ -197,7 +197,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], private var lastOpFailing: Int = -1 private def pipeName(op: UntypedOp): String = { - val o = (op: AbstractStage[_, _, _, _, _, _]) + val o = op: AbstractStage[_, _, _, _, _, _] (o match { case Finished ⇒ "finished" case _: BoundaryStage ⇒ "boundary" @@ -219,7 +219,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], private def calculateJumpBacks: Array[Int] = { val table = Array.ofDim[Int](pipeline.length) var nextJumpBack = -1 - for (pos ← 0 until pipeline.length) { + for (pos ← pipeline.indices) { table(pos) = nextJumpBack if (!pipeline(pos).isInstanceOf[PushStage[_, _]]) nextJumpBack = pos } @@ -310,7 +310,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], null } - override def getAsyncCallback(): AsyncCallback[Any] = { + override def getAsyncCallback: AsyncCallback[Any] = { val current = currentOp.asInstanceOf[AsyncStage[Any, Any, Any]] val context = current.context // avoid concurrent access (to avoid @volatile) new AsyncCallback[Any] { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index dc288fd9eb..679f7a643a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -6,7 +6,6 @@ package akka.stream.impl.fusing import akka.event.Logging.LogLevel import akka.event.{ LogSource, Logging, LoggingAdapter } import akka.stream.Attributes.LogLevels -import akka.stream.Supervision.Resume import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance } import akka.stream.stage._ import akka.stream.{ Supervision, _ } @@ -68,7 +67,7 @@ private[akka] final case class DropWhile[T](p: T ⇒ Boolean, decider: Supervisi override def decide(t: Throwable): Supervision.Directive = decider(t) } -private[akka] final object Collect { +private[akka] object Collect { // Cached function that can be used with PartialFunction.applyOrElse to ensure that A) the guard is only applied once, // and the caller can check the returned value with Collect.notApplied to query whether the PF was applied or not. // Prior art: https://github.com/scala/scala/blob/v2.11.4/src/library/scala/collection/immutable/List.scala#L458 @@ -336,12 +335,12 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt override def onPull(ctx: DetachedContext[T]): DownstreamDirective = { if (ctx.isFinishing) { - val elem = buffer.dequeue().asInstanceOf[T] + val elem = buffer.dequeue() if (buffer.isEmpty) ctx.pushAndFinish(elem) else ctx.push(elem) - } else if (ctx.isHoldingUpstream) ctx.pushAndPull(buffer.dequeue().asInstanceOf[T]) + } else if (ctx.isHoldingUpstream) ctx.pushAndPull(buffer.dequeue()) else if (buffer.isEmpty) ctx.holdDownstream() - else ctx.push(buffer.dequeue().asInstanceOf[T]) + else ctx.push(buffer.dequeue()) } override def onUpstreamFinish(ctx: DetachedContext[T]): TerminationDirective = @@ -350,37 +349,31 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt val enqueueAction: (DetachedContext[T], T) ⇒ UpstreamDirective = { overflowStrategy match { - case DropHead ⇒ { (ctx, elem) ⇒ + case DropHead ⇒ (ctx, elem) ⇒ if (buffer.isFull) buffer.dropHead() buffer.enqueue(elem) ctx.pull() - } - case DropTail ⇒ { (ctx, elem) ⇒ + case DropTail ⇒ (ctx, elem) ⇒ if (buffer.isFull) buffer.dropTail() buffer.enqueue(elem) ctx.pull() - } - case DropBuffer ⇒ { (ctx, elem) ⇒ + case DropBuffer ⇒ (ctx, elem) ⇒ if (buffer.isFull) buffer.clear() buffer.enqueue(elem) ctx.pull() - } - case DropNew ⇒ { (ctx, elem) ⇒ + case DropNew ⇒ (ctx, elem) ⇒ if (!buffer.isFull) buffer.enqueue(elem) ctx.pull() - } - case Backpressure ⇒ { (ctx, elem) ⇒ + case Backpressure ⇒ (ctx, elem) ⇒ buffer.enqueue(elem) if (buffer.isFull) ctx.holdUpstream() else ctx.pull() - } - case Fail ⇒ { (ctx, elem) ⇒ + case Fail ⇒ (ctx, elem) ⇒ if (buffer.isFull) ctx.fail(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!")) else { buffer.enqueue(elem) ctx.pull() } - } } } } @@ -478,9 +471,9 @@ private[akka] final case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapol else ctx.absorbTermination() } - final override def decide(t: Throwable): Supervision.Directive = Supervision.Stop + override def decide(t: Throwable): Supervision.Directive = Supervision.Stop - final override def restart(): Expand[In, Out, Seed] = + override def restart(): Expand[In, Out, Seed] = throw new UnsupportedOperationException("Expand doesn't support restart") } @@ -505,7 +498,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut private val elemsInFlight = FixedSizeBuffer[Try[Out]](parallelism) override def preStart(ctx: AsyncContext[Out, Notification]): Unit = { - callback = ctx.getAsyncCallback() + callback = ctx.getAsyncCallback } override def decide(ex: Throwable) = decider(ex) @@ -554,7 +547,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut else ctx.ignore() } else ctx.fail(ex) case (idx, s: Success[_]) ⇒ - val ex = try { + val exception = try { ReactiveStreamsCompliance.requireNonNullElement(s.value) elemsInFlight.put(idx, s) null: Exception @@ -565,7 +558,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut null: Exception } else ex } - if (ex != null) ctx.fail(ex) + if (exception != null) ctx.fail(exception) else if (ctx.isHoldingDownstream) rec() else ctx.ignore() } @@ -589,7 +582,7 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I private def todo = inFlight + buffer.used override def preStart(ctx: AsyncContext[Out, Try[Out]]): Unit = - callback = ctx.getAsyncCallback() + callback = ctx.getAsyncCallback override def decide(ex: Throwable) = decider(ex) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala similarity index 93% rename from akka-stream/src/main/scala/akka/stream/impl/io/SslTls.scala rename to akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala index 9ad40125fc..8780f53b8d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/SslTlsCipherActor.scala @@ -4,8 +4,6 @@ package akka.stream.impl.io import java.nio.ByteBuffer -import java.security.Principal -import java.security.cert.Certificate import javax.net.ssl.SSLEngineResult.HandshakeStatus import javax.net.ssl.SSLEngineResult.HandshakeStatus._ import javax.net.ssl.SSLEngineResult.Status._ @@ -16,13 +14,8 @@ import akka.stream.impl.FanIn.InputBunch import akka.stream.impl.FanOut.OutputBunch import akka.stream.impl._ import akka.util.ByteString -import akka.util.ByteStringBuilder -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber import scala.annotation.tailrec -import scala.collection.immutable import akka.stream.io._ -import akka.event.LoggingReceive /** * INTERNAL API. @@ -32,11 +25,11 @@ private[akka] object SslTlsCipherActor { def props(settings: ActorMaterializerSettings, sslContext: SSLContext, firstSession: NegotiateNewSession, - tracing: Boolean, role: Role, closing: Closing, - hostInfo: Option[(String, Int)]): Props = - Props(new SslTlsCipherActor(settings, sslContext, firstSession, tracing, role, closing, hostInfo)).withDeploy(Deploy.local) + hostInfo: Option[(String, Int)], + tracing: Boolean = false): Props = + Props(new SslTlsCipherActor(settings, sslContext, firstSession, role, closing, hostInfo, tracing)).withDeploy(Deploy.local) final val TransportIn = 0 final val TransportOut = 0 @@ -49,8 +42,8 @@ private[akka] object SslTlsCipherActor { * INTERNAL API. */ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslContext: SSLContext, - firstSession: NegotiateNewSession, tracing: Boolean, - role: Role, closing: Closing, hostInfo: Option[(String, Int)]) + firstSession: NegotiateNewSession, role: Role, closing: Closing, + hostInfo: Option[(String, Int)], tracing: Boolean) extends Actor with ActorLogging with Pump { import SslTlsCipherActor._ @@ -113,7 +106,7 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo * not know that we are runnable. */ def putBack(b: ByteBuffer): Unit = - if (b.hasRemaining()) { + if (b.hasRemaining) { if (tracing) log.debug(s"putting back ${b.remaining} bytes into $name") val bs = ByteString(b) if (bs.nonEmpty) buffer = bs ++ buffer @@ -156,12 +149,10 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo e } var currentSession = engine.getSession - var currentSessionParameters = firstSession - applySessionParameters() + applySessionParameters(firstSession) - def applySessionParameters(): Unit = { - val csp = currentSessionParameters - import csp._ + def applySessionParameters(params: NegotiateNewSession): Unit = { + import params._ enabledCipherSuites foreach (cs ⇒ engine.setEnabledCipherSuites(cs.toArray)) enabledProtocols foreach (p ⇒ engine.setEnabledProtocols(p.toArray)) clientAuth match { @@ -175,11 +166,10 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo lastHandshakeStatus = engine.getHandshakeStatus } - def setNewSessionParameters(n: NegotiateNewSession): Unit = { - if (tracing) log.debug(s"applying $n") + def setNewSessionParameters(params: NegotiateNewSession): Unit = { + if (tracing) log.debug(s"applying $params") currentSession.invalidate() - currentSessionParameters = n - applySessionParameters() + applySessionParameters(params) corkUser = true } @@ -280,7 +270,7 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo } def completeOrFlush(): Unit = - if (engine.isOutboundDone()) nextPhase(completedPhase) + if (engine.isOutboundDone) nextPhase(completedPhase) else nextPhase(flushingOutbound) private def doInbound(isOutboundClosed: Boolean, inboundState: TransferState): Boolean = @@ -370,7 +360,7 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo userInChoppingBlock.putBack(userInBuffer) case CLOSED ⇒ flushToTransport() - if (engine.isInboundDone()) nextPhase(completedPhase) + if (engine.isInboundDone) nextPhase(completedPhase) else nextPhase(awaitingClose) case s ⇒ fail(new IllegalStateException(s"unexpected status $s in doWrap()")) } @@ -392,12 +382,12 @@ private[akka] class SslTlsCipherActor(settings: ActorMaterializerSettings, sslCo handshakeFinished() transportInChoppingBlock.putBack(transportInBuffer) case _ ⇒ - if (transportInBuffer.hasRemaining()) doUnwrap() + if (transportInBuffer.hasRemaining) doUnwrap() else flushToUser() } case CLOSED ⇒ flushToUser() - if (engine.isOutboundDone()) nextPhase(completedPhase) + if (engine.isOutboundDone) nextPhase(completedPhase) else nextPhase(flushingOutbound) case BUFFER_UNDERFLOW ⇒ flushToUser() diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala index a053f212a1..bb84039bdd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpConnectionStream.scala @@ -5,13 +5,12 @@ package akka.stream.impl.io import java.net.InetSocketAddress import akka.io.{ IO, Tcp } -import akka.stream.impl.io.StreamTcpManager.ExposedProcessor import scala.concurrent.Promise import akka.actor._ import akka.util.ByteString import akka.io.Tcp._ -import akka.stream.{ AbruptTerminationException, StreamSubscriptionTimeoutSettings, ActorMaterializerSettings, StreamTcpException } -import org.reactivestreams.{ Publisher, Processor } +import akka.stream.{ AbruptTerminationException, ActorMaterializerSettings, StreamTcpException } +import org.reactivestreams.Processor import akka.stream.impl._ import scala.util.control.NoStackTrace @@ -248,7 +247,7 @@ private[akka] abstract class TcpStreamActor(val settings: ActorMaterializerSetti case SubscriptionTimeout ⇒ val millis = settings.subscriptionTimeoutSettings.timeout.toMillis if (!primaryOutputs.isSubscribed) { - fail(new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline (${millis}) ms") with NoStackTrace) + fail(new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline ($millis) ms") with NoStackTrace) context.stop(self) } } @@ -305,7 +304,6 @@ private[akka] class OutboundTcpStreamActor(processorPromise: Promise[Processor[B _halfClose: Boolean, val connectCmd: Connect, _settings: ActorMaterializerSettings) extends TcpStreamActor(_settings, _halfClose) { - import TcpStreamActor._ import context.system val initSteps = new SubReceive(waitingExposedProcessor) diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala index 186dd5759a..eae3bf3305 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTls.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SslTls.scala @@ -121,7 +121,7 @@ object SslTls { override def withAttributes(att: Attributes): Module = copy(attributes = att) override def carbonCopy: Module = { val mod = TlsModule(attributes, sslContext, firstSession, role, closing, hostInfo) - if (plainIn == shape.inlets(0)) mod + if (plainIn == shape.inlets.head) mod else mod.replaceShape(mod.shape.asInstanceOf[BidiShape[_, _, _, _]].reversed) } @@ -158,7 +158,7 @@ object SslTlsPlacebo { scaladsl.BidiFlow() { implicit b ⇒ // this constructs a session for (invalid) protocol SSL_NULL_WITH_NULL_NULL val session = SSLContext.getDefault.createSSLEngine.getSession - val top = b.add(scaladsl.Flow[SslTlsOutbound].collect { case SendBytes(b) ⇒ b }) + val top = b.add(scaladsl.Flow[SslTlsOutbound].collect { case SendBytes(bytes) ⇒ bytes }) val bottom = b.add(scaladsl.Flow[ByteString].map(SessionBytes(session, _))) BidiShape(top, bottom) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 821e7b6330..df2857bbb5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -5,18 +5,16 @@ package akka.stream.javadsl import akka.event.LoggingAdapter import akka.stream._ -import akka.japi.{ Util, Pair } +import akka.japi.Pair import akka.japi.function -import akka.stream.impl.Stages.Recover import akka.stream.scaladsl -import akka.stream.scaladsl.{ Keep, Sink, Source } -import org.reactivestreams.{ Subscription, Publisher, Subscriber, Processor } +import org.reactivestreams.Processor import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import akka.stream.stage.Stage -import akka.stream.impl.{ Stages, StreamLayout } +import akka.stream.impl.StreamLayout object Flow { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala index 463b72380b..dc2ce75bb4 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala @@ -150,12 +150,14 @@ object Balance { /** * Create a new `Balance` vertex with the specified input type. */ - def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount, false) + def create[T](outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = + create(outputCount, waitForAllDownstreams = false) /** * Create a new `Balance` vertex with the specified input type. */ - def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = create(outputCount) + def create[T](clazz: Class[T], outputCount: Int): Graph[UniformFanOutShape[T, T], Unit] = + create(outputCount) /** * Create a new `Balance` vertex with the specified input type. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Materialization.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Keep.scala similarity index 94% rename from akka-stream/src/main/scala/akka/stream/javadsl/Materialization.scala rename to akka-stream/src/main/scala/akka/stream/javadsl/Keep.scala index bd08f4ea16..dc47d50092 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Materialization.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Keep.scala @@ -4,11 +4,8 @@ package akka.stream.javadsl import akka.japi.function -import akka.stream.scaladsl import akka.japi.Pair -import scala.runtime.BoxedUnit - object Keep { private val _left = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = l } private val _right = new function.Function2[Any, Any, Any] with ((Any, Any) ⇒ Any) { def apply(l: Any, r: Any) = r } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala index 794a48e168..69de538ab5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala @@ -36,7 +36,7 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { * * The produced [[scala.concurrent.Future]] is fulfilled when the unbinding has been completed. */ - def unbind(): Future[Unit] = delegate.unbind + def unbind(): Future[Unit] = delegate.unbind() } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 29474686ce..61268a1ed7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -214,9 +214,9 @@ final class Flow[-In, +Out, +Mat](private[stream] override val module: Module) val outs = copy.shape.outlets new Flow(module .compose(copy, combine) - .wire(shape.outlet, ins(0)) + .wire(shape.outlet, ins.head) .wire(outs(1), shape.inlet) - .replaceShape(FlowShape(ins(1), outs(0)))) + .replaceShape(FlowShape(ins(1), outs.head))) } /** @@ -365,7 +365,7 @@ case class RunnableGraph[+Mat](private[stream] val module: StreamLayout.Module) def run()(implicit materializer: Materializer): Mat = materializer.materialize(this) override def withAttributes(attr: Attributes): RunnableGraph[Mat] = - new RunnableGraph(module.withAttributes(attr).nest) + new RunnableGraph(module.withAttributes(attr).nest()) override def named(name: String): RunnableGraph[Mat] = withAttributes(Attributes.name(name)) diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index e176606f4c..34b3e3377b 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -3,8 +3,7 @@ */ package akka.stream.stage -import akka.event.{ Logging, LogSource } -import akka.stream.{ ActorMaterializer, Materializer, Attributes, Supervision } +import akka.stream.{ Materializer, Attributes, Supervision } /** * General interface for stream transformation. @@ -34,10 +33,10 @@ sealed trait Stage[-In, Out] private[stream] object AbstractStage { final val UpstreamBall = 1 final val DownstreamBall = 2 - final val BothBalls = UpstreamBall | DownstreamBall - final val BothBallsAndNoTerminationPending = UpstreamBall | DownstreamBall | NoTerminationPending final val PrecedingWasPull = 0x4000 final val NoTerminationPending = 0x8000 + final val BothBalls = UpstreamBall | DownstreamBall + final val BothBallsAndNoTerminationPending = UpstreamBall | DownstreamBall | NoTerminationPending } abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, Ctx <: Context[Out], LifeCtx <: LifecycleContext] extends Stage[In, Out] { @@ -655,7 +654,7 @@ trait AsyncContext[Out, Ext] extends DetachedContext[Out] { * * This object can be cached and reused within the same [[AsyncStage]]. */ - def getAsyncCallback(): AsyncCallback[Ext] + def getAsyncCallback: AsyncCallback[Ext] /** * In response to an asynchronous notification an [[AsyncStage]] may choose * to neither push nor pull nor terminate, which is represented as this