Spring/Reactive Programming
외부 API 호출 비동기적으로 처리하기 (WebClient, WebFlux에 대한 개
코딩균
2024. 2. 18. 20:46
문제 상황
- 프로젝트에서 다수의 외부 API(데이터 파이프라인 시스템)에 순차적으로 데이터셋 아이디를 넘기고, 반환되는 결과값(전송ID와 데이터셋ID)을 DB에 적재하는 로직이 있었다
- Rest template을 통해서 다수의 외부 API에 요청 후, 응답을 받으면 결과값 DB적재
- 그외 부차적인 DB 레코드 update
- 데이터 파이프라인 서비스의 데이터 처리가 느려서 외부 API의 응답이 느린경우, 클라이언트는 Timeout까지 대기
해결방법
- WebClient를 사용한 비동기 / Non-Blocking 외부 API request
- API로 들어온 요청은 외부 API request를 다른 쓰레드에서 처리 → Non-blocking으로 클라이언트에 200 응답
- 비동기로 응답받은 후, DB 업데이트 → 해당 데이터 내보내기 내역 상태값 업데이트
WebClient 선택 이유
RestTemplate
- 동기식 + Blocking I/O
- Java Servlet API 기반
- request 당 한개의 Thread 모델
- → 외부 API 요청시, Thread가 대기(Blocking)하면서 메모리와 CPU 리소스를 점거
- → 다른 요청들이 서버로 들어왔을 때, thread pool은 고갈되고 가용 메모리는 적어짐 (Java Worker Thread는 instance 별)
💡 CPU, Memory가 충분해도 Thread가 부족하여 처리율이 저하될 수 있다
💡 Thread를 늘릴 수록 CPU, Memory 부하로 이어져서 성능 저하
💡 Thread간 Context Switching으로 인한 부하 발생
→ 동기적 처리가 필요없는 외부 API 호출시, 쓰레드를 신경쓰지 않으면 성능 저하 문제 발생
WebClient
- 비동기식 + Non-Blocking I/O
- Spring Reactive Framework 기반 / WebFlux
- event-driven 아키텍처
- Reactive Framework가 task 들을 Queue에 적재하고 실행
- Thread를 Future, Actor, Callback을 발생시키는 이벤트 루프등과 공유
Blocking VS Non-Blocking | 동기 VS 비동기
Blocking
- 쓰레드가 I/O 작업을 기다리게 하는 방식
- system call을 통해 OS에 I/O 요청 후, I/O 완료했다는 메시지 받아야 기존의 쓰레드 지속 진행
Non-Blocking
- I/O 시작하자마자 함수 반환
- I/O 작업의 성공 실패는 알 수 없고, pending 여부만 알 수 있음
- Thread는 다른 작업 진행, I/O 완료는 callback 함수등을 통해 추후 처리
동기
- I/O 작업 완료 후 callback을 system call을 호출한 쓰레드에서 실행
비동기
- I/O 작업 완료 후, callback을 다른 쓰레드에서 실행
- system call한 쓰레드가 콜백 처리 할 수 없으면, 다른 쓰레드를 생성 혹은 쓰레드풀에서 꺼내와서 실행
WebFlux
Reactive Programming
- 비동기 데이터 Stream으로 Non-Blocking 어플리케이션을 구현하는 프로그래밍
- 이벤트 기반의 모델 - 데이터 사용할 준비가 완료되면, subscriber에게 push
- Netty Embedded Server
- 비동기 이벤트 기반의 고성능 네트워크 프레임워크
- 다수의 request를 한개의 Thread에서 처리 가능
비교
- Tomcat : 1개의 request → 1개의 Thread
- Node.js : 모든 request → 1개의 Thread
Reactor
Reactive Programming에 대한 명세로 interface로 구성
비동기적인 데이터 처리를 위해서 새로운 interface와 impl 필요하게됨
- publisher
- 데이터 생산
- Subscriber가 등록되지 건까지 아무 프로세스 진행하지 않음 → 등록되는 시점에 준비 완료된 데이터 push
- FeedBack
- 흐름을 제어하여 혼잡을 줄이기 위한 도구
- 모든 이벤트를 push하지 않고, N개만 보내어 subscriber의 처리량에 따라 조절
- Back-Pressure가 관장
- Operator
- 데이터에 대해서 각각의 단계를 적용하기 위한 처리
- 중간 publisher : upstream에 대한 subscriber / downstream에 대한 publisher
- flux.map() 등을 통해서 새로운 중간 publisher 생성
- map() : 데이터 변환하는 동기적인 작업 / flatMap() : 데이터를 변환하는 비동기적인 방법
구현체 - Flux
- Publisher<T> 상속받은 구현체
- 0~N개의 Item들을 produce(emit)
- static method : 생성 method / instance method : 비동기 파이프라인 구축 + 시퀀스 생성
- Subscriber들이 subscribe하기 전까지는 아무 행위 하지 않음
- onNext : 0~N개의 T 타입 원소들을 방출
- onComplete : 완료 → 터미널 이벤트로 흐름 종료
- onError : 에러 발생 → 터미널 이벤트로 흐름 종료
구현체 - Mono
- Publisher<T> 상속받은 구현체
- 0~1개의 Item들을 produce(emit)
- instance method는 Flux와 동일 → emit하는 Item 개수만 차이
WebClient 구현
의존성 주입
implementation 'org.springframework.boot:spring-boot-starter-webflux:3.2.2'
https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux/3.2.2
config
@Slf4j
public class WebClientConfig {
@Bean
public WebClient webClient() throws SSLException {
SslContext sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
HttpClient httpClient = HttpClient.create()
.secure(t -> t.sslContext(sslContext)) // SSL 관련 설정
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) // connection 타임아웃 설정
.responseTimeout(Duration.ofMillis(3000)) // connection 이후 response에 걸리는 총 시간에 대한 설정
.doOnConnected(connection -> // connection 성공 시 consumer 설정
connection.addHandlerLast(new ReadTimeoutHandler(10)).addHandlerLast(new WriteTimeoutHandler(10)));
// Memory 2M로 조정
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
.codecs(clientCodecConfigurer -> clientCodecConfigurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024))
.build();
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.filter(
ExchangeFilterFunction.ofRequestProcessor(
clientRequest -> {
log.info("Client Request - {} {} header={}", clientRequest.method(), clientRequest.url(), clientRequest.headers());
return Mono.just(clientRequest);
}
)
)
.filter(
ExchangeFilterFunction.ofResponseProcessor(
clientResponse -> {
log.info("Client Respond - statusCode={}", clientResponse.statusCode());
return Mono.just(clientResponse);
}
)
)
.exchangeStrategies(exchangeStrategies)
.build();
}
}
- connection timeout
- 서버의 장애 상황으로 connection 조차 맺어지지 않는 경우, 타임아웃 설정
- read timeout
- 클라이언트가 응답을 기다리지 못하는 타임아웃
- connection은 맺어졌지만, I/O작업이 길어지거나 Lock이 걸려서 요청이 처리되지 못하고 있는 경우 클라이언트가 connection을 끊는 타임아웃
- write timeout
- 클라리언트가 서버로 패킷을 보낼 수 있는 timeout
- connection은 맺어졌으나, 데이터 보내는 시간이 길어지면 타임아웃
- WebClient 는 javax.net.ssl.SSLContext이 아니라 io.netty.handler.ssl.SslContext를 사용
- 운영환경에서는 사용하는 것을 지양!! 테스트 용도
- 기존 RestTemplate에서는 JDK를 이용해서 insecure SSL 관련 처리를 진행 → WebClient에서는 reactor.netty.http.client.HttpClient 를 통해서 요청을 보내기 때문에, secure 메서드에 io.netty.handler.ssl.SslContext 를 전달하여서 모든 X.509 인증서 검증 없이 요청 보낼 수 있게함 - 운영 환경에서는 인증서 장착해야!
- 이 때 TrustManager에 InsecureTrustManagerFactory를 이용하며 모든 인증서를 신뢰할 수 있도록 처리하며
- 이것을 통해 생성한 ClientHttpConnector를 가지고 통신을 함
Blocking / Sync
@Test
@DisplayName("Blocking/Sync Client Request 정상")
void blockingSyncTest() {
TestResponseDto responseBody = webClient.get()
.uri(testUrl)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(HttpStatusCode::isError, clientResponse ->
clientResponse.bodyToMono(String.class).map(body -> new Exception()))
.bodyToMono(TestResponseDto.class)
.block();
System.out.println("요청 완료 후 결과값 반환 Point");
Assertions.assertThat(responseBody.userId).isEqualTo(1);
Assertions.assertThat(responseBody.id).isEqualTo(1);
}
2024-02-18T20:23:45.942+09:00 INFO 4774 --- [ Test worker] c.L.LetMeDoWith.config.WebClientConfig : Client Request - GET <https://jsonplaceholder.typicode.com/todos/1> header=[Accept:"application/json"]
2024-02-18T20:23:46.360+09:00 INFO 4774 --- [ctor-http-nio-2] c.L.LetMeDoWith.config.WebClientConfig : Client Respond - statusCode=200 OK
요청 완료 후 결과값 반환 Point
Non-Blocking/Async
@Test
@DisplayName("Non-Blocking/Async Client Request 정상")
void nonBlockingAsyncTest() throws InterruptedException {
webClient.get()
.uri(testUrl)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.onStatus(HttpStatusCode::isError, clientResponse ->
clientResponse.bodyToMono(String.class).map(body -> new Exception()))
.bodyToMono(TestResponseDto.class)
.subscribe(body -> {
System.out.println(body.toString());
Assertions.assertThat(body.userId).isEqualTo(1);
Assertions.assertThat(body.id).isEqualTo(1);
});
System.out.println("요청 완료 후 결과값 반환 Point");
Thread.sleep(3000);
}
2024-02-18T20:26:52.882+09:00 INFO 4843 --- [ Test worker] c.L.LetMeDoWith.config.WebClientConfig : Client Request - GET <https://jsonplaceholder.typicode.com/todos/1> header=[Accept:"application/json"]
요청 완료 후 결과값 반환 Point
2024-02-18T20:26:53.285+09:00 INFO 4843 --- [ctor-http-nio-2] c.L.LetMeDoWith.config.WebClientConfig : Client Respond - statusCode=200 OK
TestResponseDto[userId=1, id=1, title=delectus aut autem, completed=false]
WebClient로는
- 기존 RestTemplate의 요청 방식(Blocking/Sync)
- MSA 환경에서 모듈간 API 호출에 용이한 Non-Blocking/Async 방식
두가지 모두 사용할 수 있어서, Java Spring 진영에서도 RestTemplate은 Deprecated 예정인 라이브러리라는 얘기가 있는것 같다.