diff --git a/akka-bench-jmh/src/main/scala/akka/http/HttpBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/http/HttpBenchmark.scala index dfe19990b7..2754334464 100644 --- a/akka-bench-jmh/src/main/scala/akka/http/HttpBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/http/HttpBenchmark.scala @@ -56,8 +56,7 @@ class HttpBenchmark { def shutdown() = { Await.ready(Http().shutdownAllConnectionPools(), 1.second) binding.unbind() - system.terminate() - system.awaitTermination() + Await.result(system.terminate(), 5.seconds) } @Benchmark diff --git a/akka-bench-jmh/src/main/scala/akka/stream/FlatMapMergeBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/FlatMapMergeBenchmark.scala index 8c36e39fbf..8138f2671a 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/FlatMapMergeBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/FlatMapMergeBenchmark.scala @@ -1,6 +1,6 @@ /** - * Copyright (C) 2014 Typesafe Inc. - */ + * Copyright (C) 2014 Typesafe Inc. + */ package akka.stream @@ -10,7 +10,7 @@ import akka.stream.scaladsl._ import java.util.concurrent.TimeUnit import org.openjdk.jmh.annotations._ import scala.concurrent._ -import scala.concurrent.duration.Duration.Inf +import scala.concurrent.duration._ @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) @@ -44,13 +44,12 @@ class FlatMapMergeBenchmark { @TearDown def shutdown() { - system.terminate() - system.awaitTermination() + Await.result(system.terminate(), 5.seconds) } @Benchmark @OperationsPerInvocation(100000) // Note: needs to match NumberOfElements. def flat_map_merge_100k_elements() { - Await.result(graph.run(), Inf) + Await.result(graph.run(), Duration.Inf) } } diff --git a/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala index 4bf5cd3df8..3b99b10305 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/FlowMapBenchmark.scala @@ -14,6 +14,8 @@ import scala.concurrent.Lock import scala.util.Success import akka.stream.impl.fusing.GraphStages import org.reactivestreams._ +import scala.concurrent.Await +import scala.concurrent.duration._ @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.MILLISECONDS) @@ -110,8 +112,7 @@ class FlowMapBenchmark { @TearDown def shutdown() { - system.terminate() - system.awaitTermination() + Await.result(system.terminate(), 5.seconds) } @Benchmark @@ -133,5 +134,4 @@ class FlowMapBenchmark { f } - } diff --git a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala index 4b4df25ba5..65f118cb02 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/MaterializationBenchmark.scala @@ -9,6 +9,8 @@ import akka.NotUsed import akka.actor.ActorSystem import akka.stream.scaladsl._ import org.openjdk.jmh.annotations._ +import scala.concurrent.Await +import scala.concurrent.duration._ object MaterializationBenchmark { @@ -51,22 +53,23 @@ object MaterializationBenchmark { flow ⇒ import GraphDSL.Implicits._ Source.single(()) ~> flow ~> Sink.ignore - ClosedShape + ClosedShape }) } val graphWithImportedFlowBuilder = (numOfFlows: Int) => - RunnableGraph.fromGraph(GraphDSL.create(Source.single(())) { implicit b ⇒ source ⇒ - import GraphDSL.Implicits._ - val flow = Flow[Unit].map(identity) - var out: Outlet[Unit] = source.out - for (i <- 0 until numOfFlows) { - val flowShape = b.add(flow) - out ~> flowShape - out = flowShape.outlet - } - out ~> Sink.ignore - ClosedShape + RunnableGraph.fromGraph(GraphDSL.create(Source.single(())) { implicit b ⇒ + source ⇒ + import GraphDSL.Implicits._ + val flow = Flow[Unit].map(identity) + var out: Outlet[Unit] = source.out + for (i <- 0 until numOfFlows) { + val flowShape = b.add(flow) + out ~> flowShape + out = flowShape.outlet + } + out ~> Sink.ignore + ClosedShape }) } @@ -97,8 +100,7 @@ class MaterializationBenchmark { @TearDown def shutdown() { - system.terminate() - system.awaitTermination() + Await.result(system.terminate(), 5.seconds) } @Benchmark diff --git a/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala index 961985ad25..2cc6c67175 100644 --- a/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/stream/io/FileSourcesBenchmark.scala @@ -4,18 +4,18 @@ package akka.stream.io -import java.io.{FileInputStream, File} +import java.io.{ FileInputStream, File } import java.util.concurrent.TimeUnit import akka.{Done, NotUsed} import akka.actor.ActorSystem -import akka.stream.{Attributes, ActorMaterializer} +import akka.stream.{ Attributes, ActorMaterializer } import akka.stream.scaladsl._ import akka.util.ByteString import org.openjdk.jmh.annotations._ import scala.concurrent.duration._ -import scala.concurrent.{Promise, Await, Future} +import scala.concurrent.{ Promise, Await, Future } /** * Benchmark (bufSize) Mode Cnt Score Error Units @@ -64,8 +64,7 @@ class FileSourcesBenchmark { @TearDown def shutdown() { - system.terminate() - system.awaitTermination() + Await.result(system.terminate(), Duration.Inf) } @Benchmark @@ -102,5 +101,4 @@ class FileSourcesBenchmark { Await.result(p.future, 30.seconds) } - } diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorComponentConfigurationTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorComponentConfigurationTest.scala index 7bce201cc4..758b886098 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorComponentConfigurationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorComponentConfigurationTest.scala @@ -17,11 +17,11 @@ class ActorComponentConfigurationTest extends WordSpec with Matchers with Shared val component: Component = camel.context.getComponent("akka") "Endpoint url config should be correctly parsed" in { - val actorEndpointConfig = component.createEndpoint("akka://test/user/$a?autoAck=false&replyTimeout=987000000+nanos").asInstanceOf[ActorEndpointConfig] + val actorEndpointConfig = component.createEndpoint(s"akka://test/user/$$a?autoAck=false&replyTimeout=987000000+nanos").asInstanceOf[ActorEndpointConfig] actorEndpointConfig should have( - 'endpointUri("akka://test/user/$a?autoAck=false&replyTimeout=987000000+nanos"), - 'path(ActorEndpointPath.fromCamelPath("akka://test/user/$a")), + 'endpointUri(s"akka://test/user/$$a?autoAck=false&replyTimeout=987000000+nanos"), + 'path(ActorEndpointPath.fromCamelPath(s"akka://test/user/$$a")), 'autoAck(false), 'replyTimeout(987000000 nanos)) } diff --git a/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java b/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java index e363757e90..7071657dbc 100644 --- a/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java +++ b/akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java @@ -4,27 +4,13 @@ package akka.cluster.singleton; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeSet; - import akka.actor.ActorSystem; import akka.actor.ActorRef; -import akka.actor.ActorSelection; import akka.actor.Props; -import akka.actor.UntypedActor; -import akka.cluster.Cluster; -import akka.cluster.Member; -import akka.cluster.ClusterEvent.CurrentClusterState; -import akka.cluster.ClusterEvent.MemberEvent; -import akka.cluster.ClusterEvent.MemberUp; -import akka.cluster.ClusterEvent.MemberRemoved; -import akka.cluster.MemberStatus; public class ClusterSingletonManagerTest { + @SuppressWarnings("null") public void demo() { final ActorSystem system = null; final ActorRef queue = null; diff --git a/akka-contrib/src/test/java/akka/contrib/throttle/TimerBasedThrottlerTest.java b/akka-contrib/src/test/java/akka/contrib/throttle/TimerBasedThrottlerTest.java index 8f09d843cc..91aa1e341a 100644 --- a/akka-contrib/src/test/java/akka/contrib/throttle/TimerBasedThrottlerTest.java +++ b/akka-contrib/src/test/java/akka/contrib/throttle/TimerBasedThrottlerTest.java @@ -10,7 +10,6 @@ import org.junit.Test; import java.util.concurrent.TimeUnit; import scala.concurrent.duration.Duration; import com.typesafe.config.ConfigFactory; -import akka.actor.Actor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; diff --git a/akka-docs/rst/java/code/docs/actor/ActorDocTest.java b/akka-docs/rst/java/code/docs/actor/ActorDocTest.java index ca021910fb..38b7fc2ef6 100644 --- a/akka-docs/rst/java/code/docs/actor/ActorDocTest.java +++ b/akka-docs/rst/java/code/docs/actor/ActorDocTest.java @@ -14,7 +14,6 @@ import com.typesafe.config.ConfigFactory; import scala.PartialFunction; import scala.runtime.BoxedUnit; import static docs.actor.Messages.Swap.Swap; -import static docs.actor.Messages.*; import static akka.japi.Util.immutableSeq; import java.util.concurrent.TimeUnit; @@ -63,9 +62,8 @@ public class ActorDocTest { } @AfterClass - public static void afterClass() { - system.terminate(); - system.awaitTermination(Duration.create("5 seconds")); + public static void afterClass() throws Exception { + Await.result(system.terminate(), Duration.create("5 seconds")); } static diff --git a/akka-docs/rst/java/code/docs/actor/InitializationDocTest.java b/akka-docs/rst/java/code/docs/actor/InitializationDocTest.java index e1735839f8..5827d4522f 100644 --- a/akka-docs/rst/java/code/docs/actor/InitializationDocTest.java +++ b/akka-docs/rst/java/code/docs/actor/InitializationDocTest.java @@ -13,6 +13,7 @@ import org.junit.Test; import scala.PartialFunction; import scala.concurrent.duration.Duration; import scala.runtime.BoxedUnit; +import scala.concurrent.Await; import java.util.concurrent.TimeUnit; @@ -26,9 +27,8 @@ public class InitializationDocTest { } @AfterClass - public static void afterClass() { - system.terminate(); - system.awaitTermination(Duration.create("5 seconds")); + public static void afterClass() throws Exception { + Await.result(system.terminate(), Duration.create("5 seconds")); } public static class MessageInitExample extends AbstractActor { diff --git a/akka-http-core/src/main/java/akka/http/impl/util/Util.java b/akka-http-core/src/main/java/akka/http/impl/util/Util.java index 57f417520f..b6e968ebcb 100644 --- a/akka-http-core/src/main/java/akka/http/impl/util/Util.java +++ b/akka-http-core/src/main/java/akka/http/impl/util/Util.java @@ -36,7 +36,6 @@ public abstract class Util { public static Source upcastSource(Source p) { return (Source)(Object) p; } - @SuppressWarnings("unchecked") public static scala.collection.immutable.Map convertMapToScala(Map map) { return emptyMap.$plus$plus(scala.collection.JavaConverters.mapAsScalaMapConverter(map).asScala()); } @@ -58,7 +57,6 @@ public abstract class Util { public static Seq convertIterable(Iterable els) { return scala.collection.JavaConverters.iterableAsScalaIterableConverter((Iterable)els).asScala().toVector(); } - @SuppressWarnings("unchecked") public static Seq convertArray(T[] els) { return Util.convertIterable(Arrays.asList(els)); } diff --git a/akka-http-core/src/main/java/akka/http/javadsl/model/Uri.java b/akka-http-core/src/main/java/akka/http/javadsl/model/Uri.java index 9c7728362e..2ff674d06d 100644 --- a/akka-http-core/src/main/java/akka/http/javadsl/model/Uri.java +++ b/akka-http-core/src/main/java/akka/http/javadsl/model/Uri.java @@ -4,14 +4,12 @@ package akka.http.javadsl.model; +import java.nio.charset.Charset; + import akka.http.impl.model.JavaUri; import akka.http.scaladsl.model.UriJavaAccessor; -import akka.japi.Pair; import akka.parboiled2.ParserInput$; -import java.nio.charset.Charset; -import java.util.List; -import java.util.Map; import java.util.Optional; /** diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 42cf832dc3..f03a6e5ecd 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -108,7 +108,8 @@ private[http] object HttpServerBluePrint { val entity = createEntity(entityCreator) withSizeLimit settings.parserSettings.maxContentLength push(out, HttpRequest(effectiveMethod, uri, effectiveHeaders, entity, protocol)) - case _ ⇒ throw new IllegalStateException + case other ⇒ + throw new IllegalStateException(s"unexpected element of type ${other.getClass}") } } setHandler(in, idle) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala index 6d432f8577..5c205c2bfd 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/ws/WebsocketClientBlueprint.scala @@ -96,11 +96,15 @@ object WebsocketClientBlueprint { parser.onPull() match { case NeedMoreData ⇒ ctx.pull() case RemainingBytes(bytes) ⇒ ctx.push(bytes) + case other ⇒ + throw new IllegalStateException(s"unexpected element of type ${other.getClass}") } case Left(problem) ⇒ result.success(InvalidUpgradeResponse(response, s"Websocket server at $uri returned $problem")) ctx.fail(throw new IllegalArgumentException(s"Websocket upgrade did not finish because of '$problem'")) } + case other ⇒ + throw new IllegalStateException(s"unexpected element of type ${other.getClass}") } } } diff --git a/akka-http-core/src/test/java/akka/http/javadsl/WSEchoTestClientApp.java b/akka-http-core/src/test/java/akka/http/javadsl/WSEchoTestClientApp.java index c7650c8466..e21133c75b 100644 --- a/akka-http-core/src/test/java/akka/http/javadsl/WSEchoTestClientApp.java +++ b/akka-http-core/src/test/java/akka/http/javadsl/WSEchoTestClientApp.java @@ -26,6 +26,7 @@ import java.util.List; public class WSEchoTestClientApp { private static final Function messageStringifier = new Function() { + private static final long serialVersionUID = 1L; @Override public String apply(Message msg) throws Exception { if (msg.isText() && msg.asTextMessage().isStrict()) diff --git a/akka-http-core/src/test/java/akka/http/javadsl/model/JavaApiTestCases.java b/akka-http-core/src/test/java/akka/http/javadsl/model/JavaApiTestCases.java index a20765384a..623c3c9e21 100644 --- a/akka-http-core/src/test/java/akka/http/javadsl/model/JavaApiTestCases.java +++ b/akka-http-core/src/test/java/akka/http/javadsl/model/JavaApiTestCases.java @@ -86,14 +86,14 @@ public class JavaApiTestCases { // just for the sake of explicitly touching the interfaces if (mediaType.binary()) anything = (akka.http.javadsl.model.MediaType.Binary) mediaType; - if (1 == 2) anything = (akka.http.javadsl.model.MediaType.Multipart) mediaType; - if (1 == 2) anything = (akka.http.javadsl.model.MediaType.WithOpenCharset) mediaType; - if (1 == 2) anything = (akka.http.javadsl.model.MediaType.WithFixedCharset) mediaType; + anything = (akka.http.javadsl.model.MediaType.Multipart) mediaType; + anything = (akka.http.javadsl.model.MediaType.WithOpenCharset) mediaType; + anything = (akka.http.javadsl.model.MediaType.WithFixedCharset) mediaType; if (type.binary()) anything = (akka.http.javadsl.model.ContentType.Binary) type; - if (1 == 2) anything = (akka.http.javadsl.model.ContentType.NonBinary) type; - if (1 == 2) anything = (akka.http.javadsl.model.ContentType.WithCharset) type; - if (1 == 2) anything = (akka.http.javadsl.model.ContentType.WithFixedCharset) type; + anything = (akka.http.javadsl.model.ContentType.NonBinary) type; + anything = (akka.http.javadsl.model.ContentType.WithCharset) type; + anything = (akka.http.javadsl.model.ContentType.WithFixedCharset) type; return anything; } diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala index a3eac1ae93..7b03976cfe 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/ws/WSClientAutobahnTest.scala @@ -201,6 +201,8 @@ object WSClientAutobahnTest extends App { val sink = Sink.head[Message] runWs(uri, Flow.fromSinkAndSourceMat(sink, Source.maybe[Message])(Keep.left)).flatMap { case tm: TextMessage ⇒ tm.textStream.runWith(Sink.fold("")(_ + _)) + case other ⇒ + throw new IllegalStateException(s"unexpected element of type ${other.getClass}") } } def runToSingleJsonValue[T: JsonReader](uri: Uri): Future[T] = diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/model/UriSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/model/UriSpec.scala index 189e203a87..34329495d1 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/model/UriSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/model/UriSpec.scala @@ -370,7 +370,7 @@ class UriSpec extends WordSpec with Matchers { a[IllegalUriException] should be thrownBy Uri("foo/another@url/[]and{}", mode = Uri.ParsingMode.Strict) // handle query parameters with more than percent-encoded character - Uri("?%7Ba%7D=$%7B%7D", UTF8, Uri.ParsingMode.Strict).query() shouldEqual Query.Cons("{a}", "${}", Query.Empty) + Uri("?%7Ba%7D=$%7B%7D", UTF8, Uri.ParsingMode.Strict).query() shouldEqual Query.Cons("{a}", s"$${}", Query.Empty) // don't double decode Uri("%2520").path.head shouldEqual "%20" diff --git a/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/JUnitRouteTest.scala b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/JUnitRouteTest.scala index 5db4429200..e993305671 100644 --- a/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/JUnitRouteTest.scala +++ b/akka-http-testkit/src/main/scala/akka/http/javadsl/testkit/JUnitRouteTest.scala @@ -10,8 +10,8 @@ import akka.http.scaladsl.model.HttpResponse import akka.stream.{ Materializer, ActorMaterializer } import org.junit.rules.ExternalResource import org.junit.{ Assert, Rule } - import scala.concurrent.duration._ +import scala.concurrent.Await /** * A RouteTest that uses JUnit assertions. ActorSystem and Materializer are provided as an [[ExternalResource]] @@ -66,8 +66,7 @@ class ActorSystemResource extends ExternalResource { _materializer = createMaterializer(_system) } override def after(): Unit = { - _system.terminate() - _system.awaitTermination(5.seconds) + Await.result(_system.terminate(), 5.seconds) _system = null _materializer = null } diff --git a/akka-http-tests/src/test/java/akka/http/javadsl/server/directives/CodingDirectivesTest.java b/akka-http-tests/src/test/java/akka/http/javadsl/server/directives/CodingDirectivesTest.java index 8a21bce59e..3ff90a4ab3 100644 --- a/akka-http-tests/src/test/java/akka/http/javadsl/server/directives/CodingDirectivesTest.java +++ b/akka-http-tests/src/test/java/akka/http/javadsl/server/directives/CodingDirectivesTest.java @@ -29,9 +29,8 @@ public class CodingDirectivesTest extends JUnitRouteTest { } @AfterClass - public static void tearDown() { - system.terminate(); - system.awaitTermination(); + public static void tearDown() throws Exception { + Await.result(system.terminate(), Duration.Inf()); system = null; } diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/FormDataSpec.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/FormDataSpec.scala index 259dcfee7f..e22c03fb0b 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/FormDataSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/FormDataSpec.scala @@ -12,6 +12,7 @@ import akka.stream.ActorMaterializer import akka.http.scaladsl.unmarshalling.Unmarshal import akka.http.scaladsl.marshalling.Marshal import akka.http.scaladsl.model._ +import scala.concurrent.Await class FormDataSpec extends WordSpec with Matchers with ScalaFutures with BeforeAndAfterAll { implicit val system = ActorSystem(getClass.getSimpleName) @@ -36,7 +37,6 @@ class FormDataSpec extends WordSpec with Matchers with ScalaFutures with BeforeA } override def afterAll() = { - system.terminate() - system.awaitTermination(10.seconds) + Await.result(system.terminate(), 10.seconds) } } diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/coding/CodecSpecSupport.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/coding/CodecSpecSupport.scala index 3940d70cf3..6d26603777 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/coding/CodecSpecSupport.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/coding/CodecSpecSupport.scala @@ -9,6 +9,7 @@ import org.scalatest.{ Suite, BeforeAndAfterAll, Matchers } import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.util.ByteString +import scala.concurrent.Await trait CodecSpecSupport extends Matchers with BeforeAndAfterAll { self: Suite ⇒ @@ -73,7 +74,6 @@ est Lorem ipsum dolor sit amet. Lorem ipsum dolor sit amet, consetetur sadipscin implicit val materializer = ActorMaterializer() override def afterAll() = { - system.terminate() - system.awaitTermination(10.seconds) + Await.result(system.terminate(), 10.seconds) } } diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/DontLeakActorsOnFailingConnectionSpecs.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/DontLeakActorsOnFailingConnectionSpecs.scala index 54355e86f3..53989aa9b9 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/DontLeakActorsOnFailingConnectionSpecs.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/DontLeakActorsOnFailingConnectionSpecs.scala @@ -95,8 +95,7 @@ class DontLeakActorsOnFailingConnectionSpecs extends WordSpecLike with Matchers } override def afterAll = { - system.terminate() - system.awaitTermination(3.seconds) + Await.result(system.terminate(), 3.seconds) } } diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/SecurityDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/SecurityDirectives.scala index 6acd75eb69..b9221f1d03 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/SecurityDirectives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/SecurityDirectives.scala @@ -215,6 +215,8 @@ object Credentials { new Credentials.Provided(token) { def verify(secret: String): Boolean = secret secure_== token } + case Some(GenericHttpCredentials(scheme, token, params)) ⇒ + throw new UnsupportedOperationException("cannot verify generic HTTP credentials") case None ⇒ Credentials.Missing } } @@ -250,4 +252,4 @@ trait AuthenticationDirective[T] extends Directive1[T] { object AuthenticationDirective { implicit def apply[T](other: Directive1[T]): AuthenticationDirective[T] = new AuthenticationDirective[T] { def tapply(inner: Tuple1[T] ⇒ Route) = other.tapply(inner) } -} \ No newline at end of file +} diff --git a/akka-parsing/src/main/java/akka/parboiled2/util/Base64.java b/akka-parsing/src/main/java/akka/parboiled2/util/Base64.java index 5ec91e12f1..5d91450fc2 100644 --- a/akka-parsing/src/main/java/akka/parboiled2/util/Base64.java +++ b/akka-parsing/src/main/java/akka/parboiled2/util/Base64.java @@ -73,7 +73,6 @@ package akka.parboiled2.util; import java.util.Arrays; -@SuppressWarnings({"UnnecessaryParentheses"}) public class Base64 { // -------- FIELDS ------------------------------------------------------------------------------------------------- diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala index cdba6a21b6..61f1583bcb 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala @@ -121,7 +121,7 @@ abstract class RemoteRestartedQuarantinedSpec enterBarrier("still-quarantined") - system.awaitTermination(10.seconds) + Await.result(system.whenTerminated, 10.seconds) val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" akka.remote.retry-gate-closed-for = 0.5 s diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala index 83230f3596..ed338dba0f 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TestPublisherSubscriberSpec.scala @@ -38,8 +38,8 @@ class TestPublisherSubscriberSpec extends AkkaSpec { upstreamSubscription.sendComplete() downstream.expectEventPF { - case c @ OnComplete ⇒ - case _ ⇒ fail() + case OnComplete ⇒ + case _ ⇒ fail() } } diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java index d5672555d6..e59f4eb1bc 100644 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java @@ -44,6 +44,7 @@ public class ActorPublisherTest extends StreamTest { final Publisher publisher = UntypedActorPublisher.create(ref); Source.fromPublisher(publisher) .runForeach(new akka.japi.function.Procedure() { + private static final long serialVersionUID = 1L; @Override public void apply(Integer elem) throws Exception { probe.getRef().tell(elem, ActorRef.noSender()); diff --git a/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java index 7451ccef10..61a8c025f5 100644 --- a/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/io/OutputStreamSourceTest.java @@ -3,25 +3,25 @@ */ package akka.stream.io; -import akka.actor.ActorRef; -import akka.japi.Pair; -import akka.japi.function.Procedure; -import akka.stream.StreamTest; -import akka.stream.javadsl.*; -import akka.stream.testkit.AkkaSpec; -import akka.stream.testkit.Utils; -import akka.testkit.JavaTestKit; -import akka.util.ByteString; -import com.typesafe.config.ConfigFactory; -import org.junit.ClassRule; -import org.junit.Test; -import scala.concurrent.Future; -import scala.concurrent.duration.FiniteDuration; +import static org.junit.Assert.assertEquals; import java.io.OutputStream; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; +import org.junit.ClassRule; +import org.junit.Test; + +import akka.actor.ActorRef; +import akka.japi.function.Procedure; +import akka.stream.StreamTest; +import akka.stream.javadsl.AkkaJUnitActorSystemResource; +import akka.stream.javadsl.Sink; +import akka.stream.javadsl.Source; +import akka.stream.javadsl.StreamConverters; +import akka.stream.testkit.Utils; +import akka.testkit.JavaTestKit; +import akka.util.ByteString; +import scala.concurrent.duration.FiniteDuration; public class OutputStreamSourceTest extends StreamTest { public OutputStreamSourceTest() { @@ -38,6 +38,7 @@ public class OutputStreamSourceTest extends StreamTest { final Source source = StreamConverters.asOutputStream(timeout); final OutputStream s = source.to(Sink.foreach(new Procedure() { + private static final long serialVersionUID = 1L; public void apply(ByteString elem) { probe.getRef().tell(elem, ActorRef.noSender()); } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 56b08a685b..c3ad09a65b 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -385,7 +385,6 @@ public class FlowTest extends StreamTest { })).run(materializer); List output = Arrays.asList(probe.receiveN(3)); - @SuppressWarnings("unchecked") List> expected = Arrays.asList(new Pair("A", 1), new Pair( "B", 2), new Pair("C", 3)); assertEquals(expected, output); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 85f038dfb8..d254982998 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -450,6 +450,7 @@ public class SourceTest extends StreamTest { final JavaTestKit probe = new JavaTestKit(system); Source tickSource = Source.tick(FiniteDuration.create(1, TimeUnit.SECONDS), FiniteDuration.create(500, TimeUnit.MILLISECONDS), "tick"); + @SuppressWarnings("unused") Cancellable cancellable = tickSource.to(Sink.foreach(new Procedure() { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); diff --git a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala index f282947502..70f6946a69 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala @@ -61,8 +61,7 @@ class ActorMaterializerSpec extends AkkaSpec with ImplicitSender { "report correctly if it has been shut down from the side" in { val sys = ActorSystem() val m = ActorMaterializer.create(sys) - sys.terminate() - sys.awaitTermination() + Await.result(sys.terminate(), Duration.Inf) m.isShutdown should ===(true) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala index 18d0f05b03..aaee1a8b4d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslConsistencySpec.scala @@ -61,12 +61,12 @@ class DslConsistencySpec extends WordSpec with Matchers { "Java and Scala DSLs" must { - ("Source" -> List(sSourceClass, jSourceClass)) :: - ("SubSource" -> List(sSubSourceClass, jSubSourceClass)) :: - ("Flow" -> List(sFlowClass, jFlowClass)) :: - ("SubFlow" -> List(sSubFlowClass, jSubFlowClass)) :: - ("Sink" -> List(sSinkClass, jSinkClass)) :: - ("RunanbleFlow" -> List(sRunnableGraphClass, jRunnableGraphClass)) :: + ("Source" -> List[Class[_]](sSourceClass, jSourceClass)) :: + ("SubSource" -> List[Class[_]](sSubSourceClass, jSubSourceClass)) :: + ("Flow" -> List[Class[_]](sFlowClass, jFlowClass)) :: + ("SubFlow" -> List[Class[_]](sSubFlowClass, jSubFlowClass)) :: + ("Sink" -> List[Class[_]](sSinkClass, jSinkClass)) :: + ("RunanbleFlow" -> List[Class[_]](sRunnableGraphClass, jRunnableGraphClass)) :: Nil foreach { case (element, classes) ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala index 32132014fe..a793085a64 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/actor/ActorSubscriberSpec.scala @@ -25,13 +25,13 @@ object ActorSubscriberSpec { override val requestStrategy = ZeroRequestStrategy def receive = { - case next @ OnNext(elem) ⇒ probe ! next - case complete @ OnComplete ⇒ probe ! complete - case err @ OnError(cause) ⇒ probe ! err - case "ready" ⇒ request(elements = 2) - case "boom" ⇒ throw new RuntimeException("boom") with NoStackTrace - case "requestAndCancel" ⇒ { request(1); cancel() } - case "cancel" ⇒ cancel() + case next @ OnNext(elem) ⇒ probe ! next + case OnComplete ⇒ probe ! OnComplete + case err @ OnError(cause) ⇒ probe ! err + case "ready" ⇒ request(elements = 2) + case "boom" ⇒ throw new RuntimeException("boom") with NoStackTrace + case "requestAndCancel" ⇒ { request(1); cancel() } + case "cancel" ⇒ cancel() } } @@ -55,8 +55,8 @@ object ActorSubscriberSpec { override val requestStrategy = strat def receive = { - case next @ OnNext(elem) ⇒ probe ! next - case complete @ OnComplete ⇒ probe ! complete + case next @ OnNext(elem) ⇒ probe ! next + case OnComplete ⇒ probe ! OnComplete } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index cc2f396a21..f805a91080 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -368,8 +368,8 @@ trait GraphInterpreterSpecKit extends AkkaSpec { events } - class UpstreamOneBoundedProbe[T] extends UpstreamBoundaryStageLogic[T] { - val out = Outlet[T]("out") + class UpstreamOneBoundedProbe[TT] extends UpstreamBoundaryStageLogic[TT] { + val out = Outlet[TT]("out") out.id = 0 setHandler(out, new OutHandler { @@ -381,7 +381,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec { override def onDownstreamFinish(): Unit = lastEvent += Cancel }) - def onNext(elem: T): Unit = { + def onNext(elem: TT): Unit = { push(out, elem) run() } @@ -390,7 +390,7 @@ trait GraphInterpreterSpecKit extends AkkaSpec { run() } - def onNextAndComplete(elem: T): Unit = { + def onNextAndComplete(elem: TT): Unit = { push(out, elem) complete(out) run() @@ -402,8 +402,8 @@ trait GraphInterpreterSpecKit extends AkkaSpec { } } - class DownstreamOneBoundedPortProbe[T] extends DownstreamBoundaryStageLogic[T] { - val in = Inlet[T]("in") + class DownstreamOneBoundedPortProbe[TT] extends DownstreamBoundaryStageLogic[TT] { + val in = Inlet[TT]("in") in.id = 0 setHandler(in, new InHandler { diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index 26bc79aec0..5349026505 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -392,7 +392,7 @@ class TcpSpec extends AkkaSpec("akka.stream.materializer.subscription-timeout.ti val result = Source.maybe[ByteString].via(Tcp(system2).outgoingConnection(serverAddress)).runFold(0)(_ + _.size)(mat2) // Getting rid of existing connection actors by using a blunt instrument - system2.actorSelection(akka.io.Tcp(system2).getManager.path / "selectors" / "$a" / "*") ! Kill + system2.actorSelection(akka.io.Tcp(system2).getManager.path / "selectors" / s"$$a" / "*") ! Kill a[StreamTcpException] should be thrownBy Await.result(result, 3.seconds) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDispatcherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDispatcherSpec.scala index 4953a7c396..b1586cbbf1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDispatcherSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDispatcherSpec.scala @@ -8,7 +8,7 @@ import akka.stream.testkit.AkkaSpec import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings -class FlowDispatcherSpec extends AkkaSpec("my-dispatcher = ${akka.test.stream-dispatcher}") { +class FlowDispatcherSpec extends AkkaSpec(s"my-dispatcher = $${akka.test.stream-dispatcher}") { val defaultSettings = ActorMaterializerSettings(system) diff --git a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala index 33f89c898c..48a75cdf7c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala @@ -72,7 +72,7 @@ private[akka] class GroupByProcessorImpl(settings: ActorMaterializerSettings, va if (keyToSubstreamOutput.size == maxSubstreams) throw new IllegalStateException(s"cannot open substream for key '$key': too many substreams open") val substreamOutput = createSubstreamOutput() - val substreamFlow = Source.fromPublisher(substreamOutput) + val substreamFlow = Source.fromPublisher[Any](substreamOutput) primaryOutputs.enqueueOutputElement(substreamFlow) keyToSubstreamOutput(key) = substreamOutput nextPhase(dispatchToSubstream(elem, substreamOutput)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala index 185f9ccb09..f92dbfccb5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Fusing.scala @@ -504,6 +504,7 @@ private[stream] object Fusing { private def removeMapping[T](orig: T, map: ju.Map[T, List[T]]): T = map.remove(orig) match { case null ⇒ null.asInstanceOf[T] + case Nil ⇒ throw new IllegalStateException("mappings corrupted") case x :: Nil ⇒ x case x :: xs ⇒ map.put(orig, xs) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index b75816f5b1..04459848fe 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -11,9 +11,7 @@ import akka.stream.impl.SubscriptionTimeoutException import akka.stream.stage._ import akka.stream.scaladsl._ import akka.stream.actor.ActorSubscriberMessage -import akka.stream.actor.ActorSubscriberMessage._ import akka.stream.actor.ActorPublisherMessage -import akka.stream.actor.ActorPublisherMessage._ import java.{ util ⇒ ju } import scala.collection.immutable import scala.concurrent._ @@ -375,7 +373,9 @@ final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean) extends Gr * INTERNAL API */ object SubSink { - val RequestOne = Request(1) // No need to frivolously allocate these + sealed trait Command + case object RequestOne extends Command + case object Cancel extends Command } /** @@ -390,30 +390,30 @@ final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage override def initialAttributes = Attributes.name(s"SubSink($name)") override val shape = SinkShape(in) - val status = new AtomicReference[AnyRef] + private val status = new AtomicReference[AnyRef] def pullSubstream(): Unit = status.get match { case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(RequestOne) case null ⇒ if (!status.compareAndSet(null, RequestOne)) - status.get.asInstanceOf[ActorPublisherMessage ⇒ Unit](RequestOne) + status.get.asInstanceOf[Command ⇒ Unit](RequestOne) } def cancelSubstream(): Unit = status.get match { case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(Cancel) case x ⇒ // a potential RequestOne is overwritten if (!status.compareAndSet(x, Cancel)) - status.get.asInstanceOf[ActorPublisherMessage ⇒ Unit](Cancel) + status.get.asInstanceOf[Command ⇒ Unit](Cancel) } override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler { setHandler(in, this) - override def onPush(): Unit = externalCallback(OnNext(grab(in))) - override def onUpstreamFinish(): Unit = externalCallback(OnComplete) - override def onUpstreamFailure(ex: Throwable): Unit = externalCallback(OnError(ex)) + override def onPush(): Unit = externalCallback(ActorSubscriberMessage.OnNext(grab(in))) + override def onUpstreamFinish(): Unit = externalCallback(ActorSubscriberMessage.OnComplete) + override def onUpstreamFailure(ex: Throwable): Unit = externalCallback(ActorSubscriberMessage.OnError(ex)) - @tailrec private def setCB(cb: AsyncCallback[ActorPublisherMessage]): Unit = { + @tailrec private def setCB(cb: AsyncCallback[Command]): Unit = { status.get match { case null ⇒ if (!status.compareAndSet(null, cb)) setCB(cb) @@ -429,7 +429,7 @@ final class SubSink[T](name: String, externalCallback: ActorSubscriberMessage } override def preStart(): Unit = { - val ourOwnCallback = getAsyncCallback[ActorPublisherMessage] { + val ourOwnCallback = getAsyncCallback[Command] { case RequestOne ⇒ tryPull(in) case Cancel ⇒ completeStage() case _ ⇒ throw new IllegalStateException("Bug") @@ -452,7 +452,7 @@ object SubSource { private[akka] def kill[T, M](s: Source[T, M]): Unit = { s.module match { case GraphStageModule(_, _, stage: SubSource[_]) ⇒ - stage.externalCallback.invoke(Cancel) + stage.externalCallback.invoke(SubSink.Cancel) case pub: PublisherSource[_] ⇒ pub.create(null)._1.subscribe(new CancellingSubscriber) case m ⇒ @@ -467,7 +467,7 @@ object SubSource { /** * INTERNAL API */ -final class SubSource[T](name: String, private[fusing] val externalCallback: AsyncCallback[ActorPublisherMessage]) +final class SubSource[T](name: String, private[fusing] val externalCallback: AsyncCallback[SubSink.Command]) extends GraphStage[SourceShape[T]] { import SubSink._ @@ -475,48 +475,48 @@ final class SubSource[T](name: String, private[fusing] val externalCallback: Asy override def initialAttributes = Attributes.name(s"SubSource($name)") override val shape: SourceShape[T] = SourceShape(out) - val status = new AtomicReference[AnyRef] + private val status = new AtomicReference[AnyRef] def pushSubstream(elem: T): Unit = status.get match { - case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(OnNext(elem)) + case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(ActorSubscriberMessage.OnNext(elem)) case _ ⇒ throw new IllegalStateException("cannot push to uninitialized substream") } def completeSubstream(): Unit = status.get match { - case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(OnComplete) + case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(ActorSubscriberMessage.OnComplete) case null ⇒ - if (!status.compareAndSet(null, OnComplete)) - status.get.asInstanceOf[AsyncCallback[Any]].invoke(OnComplete) + if (!status.compareAndSet(null, ActorSubscriberMessage.OnComplete)) + status.get.asInstanceOf[AsyncCallback[Any]].invoke(ActorSubscriberMessage.OnComplete) } def failSubstream(ex: Throwable): Unit = status.get match { - case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(OnError(ex)) + case f: AsyncCallback[Any] @unchecked ⇒ f.invoke(ActorSubscriberMessage.OnError(ex)) case null ⇒ - val failure = OnError(ex) + val failure = ActorSubscriberMessage.OnError(ex) if (!status.compareAndSet(null, failure)) status.get.asInstanceOf[AsyncCallback[Any]].invoke(failure) } def timeout(d: FiniteDuration): Boolean = - status.compareAndSet(null, OnError(new SubscriptionTimeoutException(s"Substream Source has not been materialized in $d"))) + status.compareAndSet(null, ActorSubscriberMessage.OnError(new SubscriptionTimeoutException(s"Substream Source has not been materialized in $d"))) override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { setHandler(out, this) @tailrec private def setCB(cb: AsyncCallback[ActorSubscriberMessage]): Unit = { status.get match { - case null ⇒ if (!status.compareAndSet(null, cb)) setCB(cb) - case OnComplete ⇒ completeStage() - case OnError(ex) ⇒ failStage(ex) - case _: AsyncCallback[_] ⇒ failStage(new IllegalStateException("Substream Source cannot be materialized more than once")) + case null ⇒ if (!status.compareAndSet(null, cb)) setCB(cb) + case ActorSubscriberMessage.OnComplete ⇒ completeStage() + case ActorSubscriberMessage.OnError(ex) ⇒ failStage(ex) + case _: AsyncCallback[_] ⇒ failStage(new IllegalStateException("Substream Source cannot be materialized more than once")) } } override def preStart(): Unit = { val ourOwnCallback = getAsyncCallback[ActorSubscriberMessage] { - case OnComplete ⇒ completeStage() - case OnError(ex) ⇒ failStage(ex) - case OnNext(elem) ⇒ push(out, elem.asInstanceOf[T]) + case ActorSubscriberMessage.OnComplete ⇒ completeStage() + case ActorSubscriberMessage.OnError(ex) ⇒ failStage(ex) + case ActorSubscriberMessage.OnNext(elem) ⇒ push(out, elem.asInstanceOf[T]) } setCB(ourOwnCallback) } diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index ef051e81f1..fcef781fae 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -1014,13 +1014,13 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: private var available = false private var closed = false - private val callback = getAsyncCallback[ActorPublisherMessage] { + private val callback = getAsyncCallback[SubSink.Command] { case SubSink.RequestOne ⇒ if (!closed) { available = true handler.onPull() } - case ActorPublisherMessage.Cancel ⇒ + case SubSink.Cancel ⇒ if (!closed) { available = false closed = true diff --git a/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java b/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java index 3a67cc855b..9aa7f724f2 100644 --- a/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java +++ b/akka-testkit/src/main/java/akka/testkit/JavaTestKit.java @@ -558,7 +558,7 @@ public class JavaTestKit { this(clazz, max, Duration.Inf(), messages); } - @SuppressWarnings("unchecked") + @SuppressWarnings("all") public ReceiveWhile(Class clazz, Duration max, Duration idle, int messages) { results = p.receiveWhile(max, idle, messages, new CachingPartialFunction() { public T match(Object msg) throws Exception {