Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions exercises/src/test/java/c1_Introduction.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.emptyList;
import static org.junit.jupiter.api.Assertions.*;

/**
Expand Down Expand Up @@ -41,7 +41,7 @@ public class c1_Introduction extends IntroductionBase {
public void hello_world() {
Mono<String> serviceResult = hello_world_service();

String result = null; //todo: change this line only
String result = serviceResult.block(); //todo: change this line only

assertEquals("Hello World!", result);
}
Expand All @@ -55,7 +55,7 @@ public void unresponsive_service() {
Exception exception = assertThrows(IllegalStateException.class, () -> {
Mono<String> serviceResult = unresponsiveService();

String result = null; //todo: change this line only
String result = serviceResult.block(Duration.ofSeconds(1)); //todo: change this line only
});

String expectedMessage = "Timeout on blocking read for 1";
Expand All @@ -72,7 +72,7 @@ public void unresponsive_service() {
public void empty_service() {
Mono<String> serviceResult = emptyService();

Optional<String> optionalServiceResult = null; //todo: change this line only
Optional<String> optionalServiceResult = serviceResult.blockOptional(); //todo: change this line only

assertTrue(optionalServiceResult.isEmpty());
assertTrue(emptyServiceIsCalled.get());
Expand All @@ -89,7 +89,7 @@ public void empty_service() {
public void multi_result_service() {
Flux<String> serviceResult = multiResultService();

String result = serviceResult.toString(); //todo: change this line only
String result = serviceResult.blockFirst(); //todo: change this line only

assertEquals("valid result", result);
}
Expand All @@ -103,7 +103,8 @@ public void multi_result_service() {
public void fortune_top_five() {
Flux<String> serviceResult = fortuneTop5();

List<String> results = emptyList(); //todo: change this line only
// List<String> results = serviceResult.take(5).toStream().toList(); //emptyList(); //todo: change this line only
List<String> results = serviceResult.toStream().toList(); //emptyList(); //todo: change this line only

assertEquals(Arrays.asList("Walmart", "Amazon", "Apple", "CVS Health", "UnitedHealth Group"), results);
assertTrue(fortuneTop5ServiceIsCalled.get());
Expand All @@ -126,13 +127,17 @@ public void nothing_happens_until_you_() throws InterruptedException {

Flux<String> serviceResult = fortuneTop5();

serviceResult
var fluxHandle = serviceResult
.doOnNext(companyList::add)
//todo: add an operator here, don't use any blocking operator!
.subscribe() //todo: add an operator here, don't use any blocking operator!
;

//wait for the flux our handler 'companyList.add'
Thread.sleep(1000); //bonus: can you explain why this line is needed?

fluxHandle.dispose();


assertEquals(Arrays.asList("Walmart", "Amazon", "Apple", "CVS Health", "UnitedHealth Group"), companyList);
}

Expand All @@ -153,6 +158,7 @@ public void leaving_blocking_world_behind() throws InterruptedException {

fortuneTop5()
//todo: change this line only
.subscribe(companyList::add,null, () -> serviceCallCompleted.set(true))
;

Thread.sleep(1000);
Expand Down