Spring/Reactive Programming

외부 API 호출 비동기적으로 처리하기 (WebClient, WebFlux에 대한 개

코딩균 2024. 2. 18. 20:46

문제 상황

  • 프로젝트에서 다수의 외부 API(데이터 파이프라인 시스템)에 순차적으로 데이터셋 아이디를 넘기고, 반환되는 결과값(전송ID와 데이터셋ID)을 DB에 적재하는 로직이 있었다
    • Rest template을 통해서 다수의 외부 API에 요청 후, 응답을 받으면 결과값 DB적재
    • 그외 부차적인 DB 레코드 update
  • 데이터 파이프라인 서비스의 데이터 처리가 느려서 외부 API의 응답이 느린경우, 클라이언트는 Timeout까지 대기

해결방법

  • WebClient를 사용한 비동기 / Non-Blocking 외부 API request
  • API로 들어온 요청은 외부 API request를 다른 쓰레드에서 처리 → Non-blocking으로 클라이언트에 200 응답
  • 비동기로 응답받은 후, DB 업데이트 → 해당 데이터 내보내기 내역 상태값 업데이트

 

WebClient 선택 이유


RestTemplate

  • 동기식 + Blocking I/O
  • Java Servlet API 기반
  • request 당 한개의 Thread 모델
    • → 외부 API 요청시, Thread가 대기(Blocking)하면서 메모리와 CPU 리소스를 점거
    • → 다른 요청들이 서버로 들어왔을 때, thread pool은 고갈되고 가용 메모리는 적어짐 (Java Worker Thread는 instance 별)

💡 CPU, Memory가 충분해도 Thread가 부족하여 처리율이 저하될 수 있다

💡 Thread를 늘릴 수록 CPU, Memory 부하로 이어져서 성능 저하

💡 Thread간 Context Switching으로 인한 부하 발생

 

→ 동기적 처리가 필요없는 외부 API 호출시, 쓰레드를 신경쓰지 않으면 성능 저하 문제 발생

WebClient

  • 비동기식 + Non-Blocking I/O
  • Spring Reactive Framework 기반 / WebFlux
    • event-driven 아키텍처
    • Reactive Framework가 task 들을 Queue에 적재하고 실행
    • Thread를 Future, Actor, Callback을 발생시키는 이벤트 루프등과 공유

 

Blocking VS Non-Blocking | 동기 VS 비동기


Blocking

  • 쓰레드가 I/O 작업을 기다리게 하는 방식
  • system call을 통해 OS에 I/O 요청 후, I/O 완료했다는 메시지 받아야 기존의 쓰레드 지속 진행

Non-Blocking

  • I/O 시작하자마자 함수 반환
  • I/O 작업의 성공 실패는 알 수 없고, pending 여부만 알 수 있음
  • Thread는 다른 작업 진행, I/O 완료는 callback 함수등을 통해 추후 처리

동기

  • I/O 작업 완료 후 callback을 system call을 호출한 쓰레드에서 실행

비동기

  • I/O 작업 완료 후, callback을 다른 쓰레드에서 실행
  • system call한 쓰레드가 콜백 처리 할 수 없으면, 다른 쓰레드를 생성 혹은 쓰레드풀에서 꺼내와서 실행

WebFlux


Reactive Programming

  • 비동기 데이터 Stream으로 Non-Blocking 어플리케이션을 구현하는 프로그래밍
  • 이벤트 기반의 모델 - 데이터 사용할 준비가 완료되면, subscriber에게 push
  • Netty Embedded Server
    • 비동기 이벤트 기반의 고성능 네트워크 프레임워크
    • 다수의 request를 한개의 Thread에서 처리 가능

 비교

  • Tomcat : 1개의 request → 1개의 Thread
  • Node.js : 모든 request → 1개의 Thread

Reactor

Reactive Programming에 대한 명세로 interface로 구성

비동기적인 데이터 처리를 위해서 새로운 interface와 impl 필요하게됨

  • publisher
    • 데이터 생산
    • Subscriber가 등록되지 건까지 아무 프로세스 진행하지 않음 → 등록되는 시점에 준비 완료된 데이터 push
  • FeedBack
    • 흐름을 제어하여 혼잡을 줄이기 위한 도구
    • 모든 이벤트를 push하지 않고, N개만 보내어 subscriber의 처리량에 따라 조절
    • Back-Pressure가 관장
  • Operator
    • 데이터에 대해서 각각의 단계를 적용하기 위한 처리
    • 중간 publisher : upstream에 대한 subscriber / downstream에 대한 publisher
      • flux.map() 등을 통해서 새로운 중간 publisher 생성
      • map() : 데이터 변환하는 동기적인 작업 / flatMap() : 데이터를 변환하는 비동기적인 방법

구현체 - Flux

  • Publisher<T> 상속받은 구현체
  • 0~N개의 Item들을 produce(emit)
  • static method : 생성 method / instance method : 비동기 파이프라인 구축 + 시퀀스 생성
  • Subscriber들이 subscribe하기 전까지는 아무 행위 하지 않음
    • onNext : 0~N개의 T 타입 원소들을 방출
    • onComplete : 완료 → 터미널 이벤트로 흐름 종료
    • onError : 에러 발생 → 터미널 이벤트로 흐름 종료

구현체 - Mono

  • Publisher<T> 상속받은 구현체
  • 0~1개의 Item들을 produce(emit)
  • instance method는 Flux와 동일 → emit하는 Item 개수만 차이

WebClient 구현


의존성 주입

implementation 'org.springframework.boot:spring-boot-starter-webflux:3.2.2'

https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-webflux/3.2.2

config

@Slf4j
public class WebClientConfig {

	@Bean
	public WebClient webClient() throws SSLException {

		SslContext sslContext = SslContextBuilder.forClient()
			.trustManager(InsecureTrustManagerFactory.INSTANCE)
			.build();

		HttpClient httpClient = HttpClient.create()
			.secure(t -> t.sslContext(sslContext)) // SSL 관련 설정
			.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) // connection 타임아웃 설정
			.responseTimeout(Duration.ofMillis(3000)) // connection 이후 response에 걸리는 총 시간에 대한 설정
			.doOnConnected(connection -> // connection 성공 시 consumer 설정
				connection.addHandlerLast(new ReadTimeoutHandler(10)).addHandlerLast(new WriteTimeoutHandler(10)));

		// Memory 2M로 조정
		ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
			.codecs(clientCodecConfigurer -> clientCodecConfigurer.defaultCodecs().maxInMemorySize(2 * 1024 * 1024))
			.build();

		return WebClient.builder()
			.clientConnector(new ReactorClientHttpConnector(httpClient))
			.filter(
				ExchangeFilterFunction.ofRequestProcessor(
					clientRequest -> {
						log.info("Client Request - {} {} header={}", clientRequest.method(), clientRequest.url(), clientRequest.headers());
						return Mono.just(clientRequest);
					}
				)
			)
			.filter(
				ExchangeFilterFunction.ofResponseProcessor(
					clientResponse -> {
						log.info("Client Respond - statusCode={}", clientResponse.statusCode());
						return Mono.just(clientResponse);
					}
				)
			)
			.exchangeStrategies(exchangeStrategies)
			.build();
	}

}
  • connection timeout
    • 서버의 장애 상황으로 connection 조차 맺어지지 않는 경우, 타임아웃 설정
  • read timeout
    • 클라이언트가 응답을 기다리지 못하는 타임아웃
    • connection은 맺어졌지만, I/O작업이 길어지거나 Lock이 걸려서 요청이 처리되지 못하고 있는 경우 클라이언트가 connection을 끊는 타임아웃
  • write timeout
    • 클라리언트가 서버로 패킷을 보낼 수 있는 timeout
    • connection은 맺어졌으나, 데이터 보내는 시간이 길어지면 타임아웃
  • WebClient 는 javax.net.ssl.SSLContext이 아니라 io.netty.handler.ssl.SslContext를 사용
    • 운영환경에서는 사용하는 것을 지양!! 테스트 용도
    • 기존 RestTemplate에서는 JDK를 이용해서 insecure SSL 관련 처리를 진행 → WebClient에서는 reactor.netty.http.client.HttpClient 를 통해서 요청을 보내기 때문에, secure 메서드에 io.netty.handler.ssl.SslContext 를 전달하여서 모든 X.509 인증서 검증 없이 요청 보낼 수 있게함 - 운영 환경에서는 인증서 장착해야!
    • 이 때 TrustManager에 InsecureTrustManagerFactory를 이용하며 모든 인증서를 신뢰할 수 있도록 처리하며
    • 이것을 통해 생성한 ClientHttpConnector를 가지고 통신을 함

Blocking / Sync

@Test
@DisplayName("Blocking/Sync Client Request 정상")
void blockingSyncTest() {
		
		TestResponseDto responseBody = webClient.get()
			.uri(testUrl)
			.accept(MediaType.APPLICATION_JSON)
			.retrieve()
			.onStatus(HttpStatusCode::isError, clientResponse ->
				clientResponse.bodyToMono(String.class).map(body -> new Exception()))
			.bodyToMono(TestResponseDto.class)
			.block();

		System.out.println("요청 완료 후 결과값 반환 Point");

		Assertions.assertThat(responseBody.userId).isEqualTo(1);
		Assertions.assertThat(responseBody.id).isEqualTo(1);

}
2024-02-18T20:23:45.942+09:00  INFO 4774 --- [    Test worker] c.L.LetMeDoWith.config.WebClientConfig   : Client Request - GET <https://jsonplaceholder.typicode.com/todos/1> header=[Accept:"application/json"]
2024-02-18T20:23:46.360+09:00  INFO 4774 --- [ctor-http-nio-2] c.L.LetMeDoWith.config.WebClientConfig   : Client Respond - statusCode=200 OK
요청 완료 후 결과값 반환 Point

Non-Blocking/Async

@Test
@DisplayName("Non-Blocking/Async Client Request 정상")
void nonBlockingAsyncTest() throws InterruptedException {

		webClient.get()
			.uri(testUrl)
			.accept(MediaType.APPLICATION_JSON)
			.retrieve()
			.onStatus(HttpStatusCode::isError, clientResponse ->
				clientResponse.bodyToMono(String.class).map(body -> new Exception()))
			.bodyToMono(TestResponseDto.class)
			.subscribe(body -> {
				System.out.println(body.toString());
				Assertions.assertThat(body.userId).isEqualTo(1);
				Assertions.assertThat(body.id).isEqualTo(1);
			});

		System.out.println("요청 완료 후 결과값 반환 Point");

		Thread.sleep(3000);

}
2024-02-18T20:26:52.882+09:00  INFO 4843 --- [    Test worker] c.L.LetMeDoWith.config.WebClientConfig   : Client Request - GET <https://jsonplaceholder.typicode.com/todos/1> header=[Accept:"application/json"]
요청 완료 후 결과값 반환 Point
2024-02-18T20:26:53.285+09:00  INFO 4843 --- [ctor-http-nio-2] c.L.LetMeDoWith.config.WebClientConfig   : Client Respond - statusCode=200 OK
TestResponseDto[userId=1, id=1, title=delectus aut autem, completed=false]

 

 

 

WebClient로는

  • 기존 RestTemplate의 요청 방식(Blocking/Sync)
  • MSA 환경에서 모듈간 API 호출에 용이한 Non-Blocking/Async 방식

두가지 모두 사용할 수 있어서, Java Spring 진영에서도 RestTemplate은 Deprecated 예정인 라이브러리라는 얘기가 있는것 같다.