use ByteString.empty
* and a few other cleanups
This commit is contained in:
parent
fe47d596bc
commit
184e45e6b2
9 changed files with 17 additions and 18 deletions
|
|
@ -63,7 +63,7 @@ public class JavaFutureTests extends JUnitSuite {
|
||||||
}, system.dispatcher());
|
}, system.dispatcher());
|
||||||
|
|
||||||
cf.success("foo");
|
cf.success("foo");
|
||||||
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
assertEquals(Await.result(f, timeout), "foo");
|
assertEquals(Await.result(f, timeout), "foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -81,7 +81,7 @@ public class JavaFutureTests extends JUnitSuite {
|
||||||
|
|
||||||
Throwable exception = new NullPointerException();
|
Throwable exception = new NullPointerException();
|
||||||
cf.failure(exception);
|
cf.failure(exception);
|
||||||
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
assertEquals(f.value().get().failed().get(), exception);
|
assertEquals(f.value().get().failed().get(), exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -97,7 +97,7 @@ public class JavaFutureTests extends JUnitSuite {
|
||||||
}, system.dispatcher());
|
}, system.dispatcher());
|
||||||
|
|
||||||
cf.success("foo");
|
cf.success("foo");
|
||||||
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
assertEquals(Await.result(f, timeout), "foo");
|
assertEquals(Await.result(f, timeout), "foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -113,7 +113,7 @@ public class JavaFutureTests extends JUnitSuite {
|
||||||
},system.dispatcher());
|
},system.dispatcher());
|
||||||
|
|
||||||
cf.success("foo");
|
cf.success("foo");
|
||||||
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
assertEquals(Await.result(f, timeout), "foo");
|
assertEquals(Await.result(f, timeout), "foo");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -135,7 +135,7 @@ public class JavaFutureTests extends JUnitSuite {
|
||||||
|
|
||||||
assertEquals(Await.result(f, timeout), "1000");
|
assertEquals(Await.result(f, timeout), "1000");
|
||||||
assertEquals(Await.result(r, timeout).intValue(), 1000);
|
assertEquals(Await.result(r, timeout).intValue(), 1000);
|
||||||
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
@ -151,7 +151,7 @@ public class JavaFutureTests extends JUnitSuite {
|
||||||
}), system.dispatcher());
|
}), system.dispatcher());
|
||||||
|
|
||||||
cf.success("foo");
|
cf.success("foo");
|
||||||
assertTrue(latch.await(5000, TimeUnit.MILLISECONDS));
|
assertTrue(latch.await(5, TimeUnit.SECONDS));
|
||||||
assertEquals(Await.result(f, timeout), "foo");
|
assertEquals(Await.result(f, timeout), "foo");
|
||||||
assertEquals(Await.result(r, timeout), "foo");
|
assertEquals(Await.result(r, timeout), "foo");
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -61,7 +61,7 @@ object AdaptiveLoadBalancingRouterConfig extends MultiNodeConfig {
|
||||||
// Extract individual sigar library for every node.
|
// Extract individual sigar library for every node.
|
||||||
nodeList foreach { role ⇒
|
nodeList foreach { role ⇒
|
||||||
nodeConfig(role) {
|
nodeConfig(role) {
|
||||||
ConfigFactory.parseString("akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native/" + role.name)
|
ConfigFactory.parseString(s"akka.cluster.metrics.native-library-extract-folder=$${user.dir}/target/native/" + role.name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -16,7 +16,6 @@ public class CustomRouteTestBase {
|
||||||
ActorRef responder = system.actorOf(Props.create(Responder.class), "TestResponder");
|
ActorRef responder = system.actorOf(Props.create(Responder.class), "TestResponder");
|
||||||
camel.context().addRoutes(new CustomRouteBuilder(responder));
|
camel.context().addRoutes(new CustomRouteBuilder(responder));
|
||||||
//#CustomRoute
|
//#CustomRoute
|
||||||
system.stop(responder);
|
|
||||||
} finally {
|
} finally {
|
||||||
JavaTestKit.shutdownActorSystem(system);
|
JavaTestKit.shutdownActorSystem(system);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ public class SchedulerPatternTest extends AbstractJavaTest {
|
||||||
|
|
||||||
private final Cancellable tick = getContext().system().scheduler().schedule(
|
private final Cancellable tick = getContext().system().scheduler().schedule(
|
||||||
Duration.create(500, TimeUnit.MILLISECONDS),
|
Duration.create(500, TimeUnit.MILLISECONDS),
|
||||||
Duration.create(1000, TimeUnit.MILLISECONDS),
|
Duration.create(1, TimeUnit.SECONDS),
|
||||||
getSelf(), "tick", getContext().dispatcher(), null);
|
getSelf(), "tick", getContext().dispatcher(), null);
|
||||||
//#schedule-constructor
|
//#schedule-constructor
|
||||||
// this variable and constructor is declared here to not show up in the docs
|
// this variable and constructor is declared here to not show up in the docs
|
||||||
|
|
@ -92,7 +92,7 @@ public class SchedulerPatternTest extends AbstractJavaTest {
|
||||||
if (message.equals("tick")) {
|
if (message.equals("tick")) {
|
||||||
// send another periodic tick after the specified delay
|
// send another periodic tick after the specified delay
|
||||||
getContext().system().scheduler().scheduleOnce(
|
getContext().system().scheduler().scheduleOnce(
|
||||||
Duration.create(1000, TimeUnit.MILLISECONDS),
|
Duration.create(1, TimeUnit.SECONDS),
|
||||||
getSelf(), "tick", getContext().dispatcher(), null);
|
getSelf(), "tick", getContext().dispatcher(), null);
|
||||||
// do something useful here
|
// do something useful here
|
||||||
//#schedule-receive
|
//#schedule-receive
|
||||||
|
|
|
||||||
|
|
@ -69,7 +69,7 @@ class RecipeByteStrings extends RecipeSpec {
|
||||||
val chunks = Await.result(chunksFuture, 3.seconds)
|
val chunks = Await.result(chunksFuture, 3.seconds)
|
||||||
|
|
||||||
chunks.forall(_.size <= 2) should be(true)
|
chunks.forall(_.size <= 2) should be(true)
|
||||||
chunks.fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
|
chunks.fold(ByteString.empty)(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
|
||||||
}
|
}
|
||||||
|
|
||||||
"have a working bytes limiter" in {
|
"have a working bytes limiter" in {
|
||||||
|
|
@ -108,7 +108,7 @@ class RecipeByteStrings extends RecipeSpec {
|
||||||
val bytes2 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9, 10)))
|
val bytes2 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9, 10)))
|
||||||
|
|
||||||
Await.result(bytes1.via(limiter).limit(10).runWith(Sink.seq), 3.seconds)
|
Await.result(bytes1.via(limiter).limit(10).runWith(Sink.seq), 3.seconds)
|
||||||
.fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
|
.fold(ByteString.empty)(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
|
||||||
|
|
||||||
an[IllegalStateException] must be thrownBy {
|
an[IllegalStateException] must be thrownBy {
|
||||||
Await.result(bytes2.via(limiter).limit(10).runWith(Sink.seq), 3.seconds)
|
Await.result(bytes2.via(limiter).limit(10).runWith(Sink.seq), 3.seconds)
|
||||||
|
|
|
||||||
|
|
@ -58,7 +58,7 @@ class FileUploadDirectivesSpec extends RoutingSpec {
|
||||||
fileUpload("field1") {
|
fileUpload("field1") {
|
||||||
case (info, bytes) ⇒
|
case (info, bytes) ⇒
|
||||||
// stream the bytes somewhere
|
// stream the bytes somewhere
|
||||||
val allBytesF = bytes.runFold(ByteString()) { (all, bytes) ⇒ all ++ bytes }
|
val allBytesF = bytes.runFold(ByteString.empty) { (all, bytes) ⇒ all ++ bytes }
|
||||||
|
|
||||||
// sum all individual file sizes
|
// sum all individual file sizes
|
||||||
onSuccess(allBytesF) { allBytes ⇒
|
onSuccess(allBytesF) { allBytes ⇒
|
||||||
|
|
@ -120,7 +120,7 @@ class FileUploadDirectivesSpec extends RoutingSpec {
|
||||||
fileUpload("missing") {
|
fileUpload("missing") {
|
||||||
case (info, bytes) ⇒
|
case (info, bytes) ⇒
|
||||||
// stream the bytes somewhere
|
// stream the bytes somewhere
|
||||||
val allBytesF = bytes.runFold(ByteString()) { (all, bytes) ⇒ all ++ bytes }
|
val allBytesF = bytes.runFold(ByteString.empty) { (all, bytes) ⇒ all ++ bytes }
|
||||||
|
|
||||||
// sum all individual file sizes
|
// sum all individual file sizes
|
||||||
onSuccess(allBytesF) { allBytes ⇒
|
onSuccess(allBytesF) { allBytes ⇒
|
||||||
|
|
|
||||||
|
|
@ -44,7 +44,7 @@ public class OutputStreamSinkTest extends StreamTest {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
final CompletionStage<IOResult> resultFuture = Source.single(ByteString.fromString("123456")).runWith(StreamConverters.fromOutputStream(() -> os), materializer);
|
final CompletionStage<IOResult> resultFuture = Source.single(ByteString.fromString("123456")).runWith(StreamConverters.fromOutputStream(() -> os), materializer);
|
||||||
final IOResult result = resultFuture.toCompletableFuture().get(3000, TimeUnit.MILLISECONDS);
|
final IOResult result = resultFuture.toCompletableFuture().get(3, TimeUnit.SECONDS);
|
||||||
|
|
||||||
assertFalse(result.wasSuccessful());
|
assertFalse(result.wasSuccessful());
|
||||||
assertTrue(result.getError().getMessage().equals("Can't accept more data."));
|
assertTrue(result.getError().getMessage().equals("Can't accept more data."));
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ public class OutputStreamSourceTest extends StreamTest {
|
||||||
Utils.UnboundedMailboxConfig());
|
Utils.UnboundedMailboxConfig());
|
||||||
@Test
|
@Test
|
||||||
public void mustSendEventsViaOutputStream() throws Exception {
|
public void mustSendEventsViaOutputStream() throws Exception {
|
||||||
final FiniteDuration timeout = FiniteDuration.create(3000, TimeUnit.MILLISECONDS);
|
final FiniteDuration timeout = FiniteDuration.create(3, TimeUnit.SECONDS);
|
||||||
final JavaTestKit probe = new JavaTestKit(system);
|
final JavaTestKit probe = new JavaTestKit(system);
|
||||||
|
|
||||||
final Source<ByteString, OutputStream> source = StreamConverters.asOutputStream(timeout);
|
final Source<ByteString, OutputStream> source = StreamConverters.asOutputStream(timeout);
|
||||||
|
|
|
||||||
|
|
@ -122,7 +122,7 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration
|
||||||
downstreamStatus.set(Canceled)
|
downstreamStatus.set(Canceled)
|
||||||
dataQueue.clear()
|
dataQueue.clear()
|
||||||
// if blocked reading, make sure the take() completes
|
// if blocked reading, make sure the take() completes
|
||||||
dataQueue.put(ByteString())
|
dataQueue.put(ByteString.empty)
|
||||||
completeStage()
|
completeStage()
|
||||||
}
|
}
|
||||||
override def onPull(): Unit = {
|
override def onPull(): Unit = {
|
||||||
|
|
@ -135,7 +135,7 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration
|
||||||
} catch {
|
} catch {
|
||||||
case _: InterruptedException ⇒
|
case _: InterruptedException ⇒
|
||||||
Thread.interrupted()
|
Thread.interrupted()
|
||||||
ByteString()
|
ByteString.empty
|
||||||
} finally {
|
} finally {
|
||||||
blockingThread = null
|
blockingThread = null
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue