본문 바로가기

Java/기술회고

Thread.sleep()은 왜 위험할까? Kafka DLQ로 안전하게 재시도 처리하기

문제의 시작: 왜 Thread.sleep()을 넣었을까?

실시간 경매 시스템을 개발하면서 동시성 문제에 직면했다. 여러 사용자가 동시에 같은 경매에 입찰할 때, 낙관적 락(@Version)을 사용해 데이터 정합성을 보장하고 있었다.

@Entity
public class Auction {
    @Version
    private Long version;  // 낙관적 락
    private Long currentPrice;
    // ...
}

하지만 문제가 있었다. 동시 입찰이 발생하면 OptimisticLockingFailureException이 발생하고, 입찰이 실패했다. 사용자 입장에서는 "입찰 실패" 메시지만 보게 되는 것이다.

첫 번째 시도: 수동 재시도 로직

"그럼 재시도하면 되지 않을까?" 라는 생각으로 이렇게 코드를 작성했다:

@Transactional
public void placeBid(Long auctionId, Long userId, Long bidAmount) {
    int maxRetries = 3;
    
    for (int attempt = 1; attempt <= maxRetries; attempt++) {
        try {
            doPlaceBid(auctionId, userId, bidAmount);
            return; // 성공!
        } catch (OptimisticLockingFailureException e) {
            log.warn("동시 입찰 감지 - 재시도 {}/{}", attempt, maxRetries);
            // 여기서 바로 재시도하면 또 충돌날 것 같은데
        }
    }
    
    throw new ResponseStatusException(HttpStatus.CONFLICT, "입찰 실패");
}

그런데 즉시 재시도하면 또 충돌이 발생했다. 다른 트랜잭션이 아직 커밋 중이기 때문이다.

 

Thread.sleep()의 사용

"잠깐만 기다렸다가 재시도하면 되지 않을까?" 라는 생각으로 이렇게 수정했다:

 
@Transactional
public void placeBid(Long auctionId, Long userId, Long bidAmount) {
    int maxRetries = 3;
    
    for (int attempt = 1; attempt <= maxRetries; attempt++) {
        try {
            doPlaceBid(auctionId, userId, bidAmount);
            return;
        } catch (OptimisticLockingFailureException e) {
            log.warn("동시 입찰 감지 - 재시도 {}/{}", attempt, maxRetries);
            try {
                Thread.sleep(10); //10ms 대기
            } catch (InterruptedException ignored) {}
        }
    }
    
    throw new ResponseStatusException(HttpStatus.CONFLICT, "입찰 실패");
}

동작은 했다. 동시 입찰 시 성공률이 크게 올라갔다. 하지만 이 코드는 문제를 안고 있다.

Thread.sleep()의 위험성

톰캣 스레드 블로킹

[요청 1] Thread-1 → sleep(10ms) → 블로킹 중...
[요청 2] Thread-2 → sleep(10ms) → 블로킹 중...
[요청 3] Thread-3 → sleep(10ms) → 블로킹 중...
...
[요청 200] Thread-200 → 스레드 풀 고갈!

톰캣의 기본 스레드 풀은 약 200개다. 동시 입찰이 많아지면 모든 스레드가 sleep() 상태로 블로킹되어 전체 서비스가 멈출 수 있다.

Kafka Consumer의 문제

나는 Kafka를 사용한다:

@KafkaListener(topics = "auction-bid", groupId = "auction-bid-group")
public void handleBid(ChatEvent event) {
    Long auctionId = event.roomId();
    Long userId = event.senderId();
    Long amount = Long.valueOf(event.content());
    
    auctionService.placeBid(auctionId, userId, amount); // Thread.sleep() 포함!
}

Consumer 스레드가 Thread.sleep()으로 블로킹되면:

  • Kafka는 Consumer가 "죽었다"고 판단할 수 있음
  • Rebalancing 발생 → 다른 Consumer에게 파티션 재할당
  • 메시지 중복 처리 가능

재시도 실패 시 메시지 손실

3번 재시도해도 실패하면?

throw new ResponseStatusException(HttpStatus.CONFLICT, "입찰 실패");
 
이 예외가 발생하면 Kafka 메시지는 그냥 사라진다. 사용자의 입찰 요청이 영원히 손실되는 것이다.

해결 방법: Kafka의 ErrorHandler + DLQ

"재시도는 애플리케이션이 아니라 Kafka가 하게 하자"

아키텍처 개선

Before:
[Kafka] → [Consumer] → [Service with Thread.sleep()] → [DB]
                              ↓ 실패 시
                          메시지 손실

After:
[Kafka] → [Consumer] → [Service (단순화)] → [DB]
            ↓ 실패 시
        [ErrorHandler]
            ↓ 재시도 2회
        [DLQ Topic] → [모니터링]

Kafka ErrorHandler 설정

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> kafkaTemplate) {
        // DLQ로 메시지 전송하는 Recoverer
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
                kafkaTemplate,
                (record, ex) -> {
                    String dltTopic = record.topic() + ".DLT";
                    log.error("DLQ 전송: topic={}, key={}, reason={}", 
                        dltTopic, record.key(), ex.getClass().getSimpleName());
                    return new TopicPartition(dltTopic, 0);
                }
        );

        // 재시도 설정: 0ms 간격으로 2번 재시도
        FixedBackOff backOff = new FixedBackOff(0L, 2);
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, backOff);

        // 재시도하지 않을 예외 지정
        errorHandler.addNotRetryableExceptions(
            ResponseStatusException.class,  // 비즈니스 검증 실패
            IllegalArgumentException.class  // 잘못된 파라미터
        );

        errorHandler.setRetryListeners((record, ex, attempt) ->
            log.warn("Kafka 재시도 {}/2회: key={}, exception={}", 
                attempt, record.key(), ex.getClass().getSimpleName()));

        return errorHandler;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> cf,
            DefaultErrorHandler errorHandler
    ) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(cf);
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}

 

핵심 동작:

  • OptimisticLockingFailureException 발생 → Kafka가 자동 재시도 (2번)
  • ResponseStatusException 발생 → 재시도 없이 바로 DLQ로
  • 2번 재시도 후에도 실패 → DLQ로 이동 (메시지 보존!)

Service 레이어 간소화

@Service
@RequiredArgsConstructor
@Transactional(readOnly = true)
@Slf4j
public class AuctionService {

    private final AuctionRepository auctionRepository;
    private final AuctionBidRepository auctionBidRepository;
    private final UserService userService;
    private final ChatProducer chatProducer;

    @Transactional
    public void placeBid(Long auctionId, Long userId, Long bidAmount) {
        // 경매 조회
        Auction auction = auctionRepository.findById(auctionId)
            .orElseThrow(() -> new ResponseStatusException(HttpStatus.NOT_FOUND, "경매를 찾을 수 없습니다."));

        // 입찰자 조회
        User bidder = userService.findById(userId);

        // 입찰 검증
        auction.validateBid(userId, bidAmount);

        // 이전 가격 기록
        Long oldPrice = auction.getCurrentPrice();

        // 입찰 처리
        auction.placeBid(bidder, bidAmount, LocalDateTime.now());
        auctionRepository.save(auction);

        // 입찰 이력 저장
        saveBidHistory(auction, bidder, bidAmount);

        // 로그
        log.info("입찰 완료: auctionId={}, userId={}, {}원 → {}원", 
            auctionId, userId, oldPrice, bidAmount);

        // 브로드캐스트
        sendBroadCast(auction.getId(), bidder.getId(), bidder.getEmail(), bidAmount);
    }

    private void saveBidHistory(Auction auction, User bidder, Long bidAmount) {
        auctionBidRepository.save(
            AuctionBid.builder()
                .auction(auction)
                .bidder(bidder)
                .amount(bidAmount)
                .bidTime(LocalDateTime.now())
                .build()
        );
    }

    private void sendBroadCast(Long auctionId, Long userId, String email, Long bidAmount) {
        chatProducer.send(ChatEventFactory.auctionBid(auctionId, userId, email, bidAmount));
    }
}

 

변경 사항:

  • Thread.sleep() 제거
  • or (int attempt = ...) 재시도 로직 제거
  • doPlaceBid() 메서드 제거
  • 코드 30줄 → 15줄로 간소화
  • 비즈니스 로직에만 집중

DLQ 모니터링 추가

@Component
@Slf4j
public class AuctionBidDlqConsumer {

    @KafkaListener(
        topics = "auction-bid.DLT",
        groupId = "auction-dlq-monitor"
    )
    public void handleDlqMessage(
        ChatEvent event,
        @Header(KafkaHeaders.EXCEPTION_MESSAGE) String exceptionMessage,
        @Header(value = KafkaHeaders.EXCEPTION_STACKTRACE, required = false) String stackTrace
    ) {
        log.error(" [DLQ] 입찰 처리 실패 메시지 감지");
        log.error("  경매ID: {}", event.roomId());
        log.error("  입찰자ID: {}", event.senderId());
        log.error("  입찰금액: {}", event.content());
        log.error("  실패 이유: {}", exceptionMessage);
        
        // TODO: 운영 환경에서는
        // 1. Slack/Discord 알림 전송
        // 2. DB에 실패 로그 저장 (수동 재처리용)
        // 3. 관리자 대시보드에 표시
    }
}

정말 개선되었나?

시나리오 1: 동시 입찰 1000건

Before (Thread.sleep 사용):

평균 응답 시간: 150ms
P95 응답 시간: 350ms
스레드 풀 사용률: 85%

 

After (Kafka ErrorHandler):

평균 응답 시간: 45ms (70% 개선 )
P95 응답 시간: 120ms (66% 개선 )
스레드 풀 사용률: 25% (블로킹 제거 )
 
 

시나리오 2: DB 장애 시뮬레이션

# PostgreSQL 컨테이너 중지
docker stop postgres

# 입찰 메시지 100건 전송

 

Before:

처리 성공: 0건
메시지 손실: 100건

 

After:

처리 성공: 0건
DLQ 이동: 100건 (나중에 재처리 가능)

시나리오 3: 낙관적 락 충돌

로그 확인:

[Consumer] 입찰 메시지 수신: auctionId=1, userId=100
[Service] OptimisticLockingFailureException 발생
[ErrorHandler] Kafka 재시도 1/2회
[Service] OptimisticLockingFailureException 발생
[ErrorHandler] Kafka 재시도 2/2회
[Service] 입찰 완료: 10000원 → 15000원

→ Kafka가 비동기로 재시도하므로 다른 요청은 계속 처리됨

 

결과 비교

항목 Before (Thread.sleep)Before After (Kafka DLQ)
재시도 방식 동기 (Thread.sleep) 비동기 (Kafka)
스레드 블로킹 있음  없음 
실패 메시지 손실  DLQ 보존 
응답 속도 150ms 45ms 
코드 복잡도 30줄 15줄 
모니터링 어려움 DLQ Consumer 

 

배운 점

인프라에 책임 위임하기

처음에는 "애플리케이션 코드로 모든 걸 해결해야지" 라고 생각했다. 하지만 Kafka는 이미 강력한 재시도/복구 메커니즘을 제공한다.

 

Thread.sleep()은 간단해 보이지만:

  • 프로덕션 환경에서는 스레드 풀 고갈 위험
  • Kafka Consumer와 함께 사용 시 Rebalancing 유발
  • 테스트로는 발견하기 어려움

절대 프로덕션 코드에 Thread.sleep()을 넣지 말자.


분산 시스템에서 실패는 피할 수 없다:

  • 네트워크 타임아웃
  • DB 락 충돌
  • 서비스 재시작

중요한 건 실패를 어떻게 처리하느냐다. DLQ는 "실패해도 괜찮아, 나중에 다시 처리하면 돼" 라는 안전망을 제공한다.

 

마치며

"왜 Thread.sleep()을 넣었을까?" 에서 시작한 여정이 Kafka의 강력한 에러 처리 메커니즘을 발견하는 것으로 마무리되었다.

핵심은 "올바른 도구에 올바른 책임을 맡기는 것"이다:

  • 재시도는 Kafka에게
  • 비즈니스 로직은 Service에게
  • 모니터링은 DLQ Consumer에게