Spring Framework/Spring Boot

[Spring Boot] Redis Sorted Set, Spring Batch, STOMP를 이용하여 대기열 시스템 만들어보기

장쫄깃 2024. 12. 15. 15:59
728x90


Redis Sorted Set을 사용한 이유


대기열 시스템을 만들 때 Redis Sorted Set을 사용한 이유는 다음과 같다.

  • 우선순위 기반 작업 처리
    • 각 항목에 점수를 부여해 우선순위를 지정할 수 있으며, 점수에 따라 자동 정렬
    • 우선순위가 높은 작업을 먼저 처리하는 대기열 구현이 가능
  • 효율적인 데이터 조회 및 삭제
    • ZRANGE, ZPOPMIN과 같은 명령어를 통해 O(log(N)) 시간 복잡도로 정렬된 데이터를 조회하거나 삭제
  • 시간 기반 작업 처리
    • 점수로 타임스탬프를 사용할 수 있어 예약 작업이나 시간 순서에 따른 작업 처리가 가능
  • 중복 방지
    • Sorted Set은 동일한 키를 허용하지 않으므로 중복된 작업 추가를 방지하고 점수 업데이트로 대체 가능
  • 빠른 성능
    • Redis는 메모리 기반 데이터베이스로 매우 빠른 읽기/쓰기 성능을 제공하여 실시간 작업 처리에 적합
  • 간단한 사용
    • Spring Boot에서 Spring Data Redis를 활용하면 Sorted Set API를 쉽게 사용할 수 있어 구현이 간단

 

 

STOMP를 사용한 이유


대기열 시스템을 만들 때 STOMP를 사용한 이유는 다음과 같다.

  • 실시간 양방향 통신
    • WebSocket 기반으로 클라이언트와 서버 간 실시간 상태 업데이트 및 작업 결과 전송이 가능
  • 구독/발행(Pub/Sub) 패턴 지원
    • 특정 주제를 구독한 다수의 클라이언트에게 작업 상태나 결과를 동시에 전달
  • Spring Boot와의 통합 용이성
    • Spring WebSocket과 쉽게 연동되며, 메시지 라우팅 및 핸들링을 단순화하는 도구를 제공
  • 다양한 클라이언트 지원
    • 브라우저, 모바일, 데스크톱 등 여러 플랫폼에서 STOMP 클라이언트를 사용할 수 있어 범용성 높음
  • 가벼운 프로토콜 구조
    • 메시지 구조가 단순하고 경량화되어 빠르고 효율적인 메시지 전송 가능

 

 

대기열 시스템 비즈니스 로직


대기열 시스템 비즈니스 로직은 다음과 같다.

  1. 사용자 대기열 입장
    1. 사용자 key를 대기열 Queue에 저장
    2. rank는 time stamp 사용
    3. 사용자 web socket session key를 Redis에 저장
  2. 5초에 한 번씩 대기열 -> 작업열 이동 Job 실행
    1. 작업열이 가득 찼는지 체크
    2. 대기열의  순번이 가장 높은 사용자 조회
    3. 작업열로 이동이 가능한 사용자인지 체크
    4. 작업열로 이동
    5. 사용자에게 알림 전달
  3. 작업열이 가득 찬 경우 대기열 -> 작업열 이동 Job 종료
  4. 사용자 대기열 순번 Send Job 실행
    1. 대기열 Queue에 있는 사용자 목록 조회
    2. 각 사용자 순번 조회
    3. 사용자에게 순번 정보 Send
  5. 사용자 이탈 시 대기열, 작업열 및 web socket  session key 제거

 

 

구현


해당 게시글에서는 대기열 시스템 구현에 대한 코드를 중심으로 설명하려 한다. 전체적인 코드는 게시물 하단에 있는 깃허브 링크를 참고하면 된다.

 

기본 설정 구현

비즈니스 로직을 구현하기 전에 기본적인 설정, 유틸리티, 인터셉터 등을 구현한다.

1. build.gradle

plugins {
	id 'java'
	id 'org.springframework.boot' version '3.3.5'
	id 'io.spring.dependency-management' version '1.1.6'
}

group = 'com.jdh'
version = '0.0.1-SNAPSHOT'

java {
	toolchain {
		languageVersion = JavaLanguageVersion.of(17)
	}
}

configurations {
	compileOnly {
		extendsFrom annotationProcessor
	}
}

repositories {
	mavenCentral()
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-jdbc'
	implementation 'org.springframework.batch:spring-batch-core'
	implementation 'org.springframework.boot:spring-boot-starter-websocket'
	implementation 'org.springframework.boot:spring-boot-starter-web'
	compileOnly 'org.projectlombok:lombok'
	developmentOnly 'org.springframework.boot:spring-boot-devtools'
	annotationProcessor 'org.projectlombok:lombok'
	testImplementation 'org.springframework.boot:spring-boot-starter-test'
	testRuntimeOnly 'org.junit.platform:junit-platform-launcher'

	// thymeleaf
	implementation 'org.springframework.boot:spring-boot-starter-thymeleaf'

	// h2
	runtimeOnly 'com.h2database:h2'

	// redis
    implementation 'org.springframework.boot:spring-boot-starter-data-redis'
	implementation 'org.springframework.session:spring-session-data-redis'
	implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'
}

test {
	useJUnitPlatform()
}

 

2. application.yml

spring:
  main:
    allow-circular-references: true
  # 메타데이터 사용을 위한 h2
  datasource:
    url: jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1
    driver-class-name: org.h2.Driver
    username: sa
    password: password
  h2:
    console:
      enabled: true
      path: /h2-console
  sql:
    init:
      mode: always
  # redis
  data:
    redis:
      port: 6379
      host: ec2-13-125-169-147.ap-northeast-2.compute.amazonaws.com
      password: jangjjolkit!))$

queue:
  wait: waitingQueue
  job: jobQueue
  max-job-size: 1

 

Spring Batch에서 사용할 h2, 대기열 시스템에 사용할 Redis를 설정했다. 또, 대기열 시스템 구현 시 사용할 queue 정보를 추가한다.

 

3. WaitQueueApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;

@SpringBootApplication
@EnableRedisHttpSession
@EnableScheduling
public class WaitQueueApplication {

	public static void main(String[] args) {
		SpringApplication.run(WaitQueueApplication.class, args);
	}

}

 

Redis Session을 사용하기 위한 @EnableRedisHttpSession 어노테이션과 Scheduled 사용을 위한 @EnableScheduling 어노테이션을 적용한다.

 

4. BatchConfig.java

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.init.ScriptUtils;

import javax.sql.DataSource;

@Configuration
@EnableBatchProcessing
public class BatchConfig {

    /**
     * batch 에서 사용할 h2 기본 테이블 생성
     */
    @Bean
    public ApplicationRunner initializeBatchTables(DataSource dataSource) {
        return args -> {
            Resource resource = new ClassPathResource("org/springframework/batch/core/schema-h2.sql");
            ScriptUtils.executeSqlScript(dataSource.getConnection(), resource);
        };
    }

}

 

Spring Batch에서 사용할 h2 기본 테이블을 생성하기 위한 Config Class이다.

 

5. RedisConfig.java

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

    @Value("${spring.data.redis.host}")
    private String host;

    @Value(("${spring.data.redis.port}"))
    private int port;

    @Value(("${spring.data.redis.password}"))
    private String password;

    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration();
        redisConfiguration.setHostName(host);
        redisConfiguration.setPort(port);
        redisConfiguration.setPassword(password);

        return new LettuceConnectionFactory(redisConfiguration);
    }

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        return redisTemplate;
    }

}

 

6. WebSocketConfig.java

import com.jdh.wait_queue.handler.stomp.StompHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Slf4j
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    private final StompHandler stompHandler;

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.enableSimpleBroker("/topic");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/connect")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(stompHandler);
    }

}

 

WebSocket(STOMP) 사용을 위한 Config Class이다.

 

7. RedisUtil.java

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;

import java.time.Instant;
import java.util.Set;

@Slf4j
@Component
@AllArgsConstructor
public class RedisUtil {

    private final RedisTemplate<String, String> redisTemplate;

    // Add Redis
    public void add(String key, String value) {
        redisTemplate.opsForValue().set(key, value);
    }

    // Add Sorted Set
    public Boolean addZSet(String key, String value, double score) {
        return redisTemplate.opsForZSet().add(key, value, score);
    }

    // Add Sorted Set
    public void addZSet(String key, String value) {
        double score = Instant.now().toEpochMilli();
        redisTemplate.opsForZSet().add(key, value, score);
    }

    /**
     * Delete With Key
     */
    public Boolean delete(String key){
        return redisTemplate.delete(key);
    }

    /**
     * Delete Sorted Set With key, value
     */
    public Long deleteValue(String key, String value){
        return redisTemplate.opsForZSet().remove(key, value);
    }

    /**
     * Get Value With Key
     */
    public String getValue(String key){
        return redisTemplate.opsForValue().get(key);
    }

    /**
     * Get Sorted Set Size
     */
    public Long getSize(String key){
        ZSetOperations<String, String> z = redisTemplate.opsForZSet();
        return z.size(key);
    }

    /**
     * Get Sorted Set Start ~ End
     */
    public Set<String> zRange(String key, Long start, Long end){
        return redisTemplate.opsForZSet().range(key, start, end);
    }

    /**
     * Get Sorted Set Rank
     */
    public Long getzRank(String key, String value){
        return redisTemplate.opsForZSet().rank(key, value);
    }

    /**
     * Get Sorted Set Next
     */
    public String getNext(String key) {
        Set<String> result = redisTemplate.opsForZSet().range(key, 0, 0);

        return (result != null && !result.isEmpty()) ? result.iterator().next() : null;
    }
}

 

8. StompHandler.java

import com.jdh.wait_queue.util.redis.RedisUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;

import java.util.Objects;

@Slf4j
@Component
@RequiredArgsConstructor
public class StompHandler implements ChannelInterceptor {

    private final RedisUtil redisUtil;

    @Value("${queue.wait}")
    private String waitQueueKey;

    @Value("${queue.job}")
    private String jobQueueKey;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

        // websocket 연결 시
        if (StompCommand.CONNECT == accessor.getCommand()) {
            log.info("[StompHandler] CONNECT header :: " + message.getHeaders());
        }
        // websocket 구독 요청 시
        else if (StompCommand.SUBSCRIBE == accessor.getCommand()) {
            // path에서 key 획득
            String key = Objects.requireNonNull(accessor.getDestination())
                    .substring(accessor.getDestination().lastIndexOf("/") + 1);

            // redis 대기열 저장
            redisUtil.addZSet(waitQueueKey, key);

            // redis web socket session key 저장
            redisUtil.add(accessor.getSessionId(), key);
            log.info("[StompHandler] ({}) {} key joined queue.", accessor.getSessionId(), key);
        }
        // websocket 연결 종료 시
        else if (StompCommand.DISCONNECT == accessor.getCommand()) {
            log.info("[StompHandler] DISCONNECT header :: " + message.getHeaders());

            // 세션 id로 key 가져오기
            String key = String.valueOf(redisUtil.getValue(accessor.getSessionId()));
            log.info("[StompHandler] ({}) {} key left queue.", accessor.getSessionId(), key);

            // redis 대기열에서 제거
            redisUtil.deleteValue(waitQueueKey, key);

            // redis 작업열에서 제거
            redisUtil.deleteValue(jobQueueKey, key);

            // redis session 제거
            redisUtil.delete(accessor.getSessionId());
        }

        return message;
    }

}

 

WebSocket 연결 시 발생하는 이벤트를 처리하는 ChannelInterceptor를 구현한 Handler Class다. 이 클래스는 크게 2가지 이벤트에서 동작을 수행한다.

 

  1. 사용자가 구독 요청을 할 때 (SUBSCRIBE)
    1. path에서 key(사용자) 추출
    2. 사용자를 대기열에 저장
    3. web socket session key로 사용자 key 저장
  2. 사용자가 web socket 연결을 종료할 때 (DISCONNECT)
    1. web socket session key로 사용자 key 조회
    2. 사용자를 대기열, 작업열에서 제거
    3. web socket session key 제거

web socket 구독 시 사용자를 대기열에 저장하고, 연결이 종료되면 대기열과 작업열에서 제거하는 역할을 한다.

 

 

대기열 -> 작업열 이동 Job 구현

대기열에 있는 사용자를 작업열로 이동시키는 Job을 구현해보자.

 

1. RedisToJobQueueItemReader.java

import com.jdh.wait_queue.util.redis.RedisUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.Objects;

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisToJobQueueItemReader implements ItemReader<String> {

    @Value("${queue.wait}")
    private String waitQueueKey;

    @Value("${queue.job}")
    private String jobQueueKey;

    @Value("${queue.max-job-size}")
    private long maxJobSize;

    private final RedisUtil redisUtil;

    @Override
    public String read() {
        // 작업열이 꽉 찬 경우
        if(!checkMaxJobQueue()) {
            return null;
        }

        // 대기열의 다음 사용자 조회
        String result = String.valueOf(redisUtil.getNext(waitQueueKey));

        log.info("[RedisToJobQueueItemReader] 작업열로 넘어갈 사용자 :: {}", result);

        return "null".equals(result) || result.isEmpty() ? null : result; // 가장 오래된 사용자 반환
    }

    /**
     * check job queue count max
     */
    private boolean checkMaxJobQueue() {
        // 현재 작업열에 있는 사용자 수 조회
        Long jobCount = redisUtil.getSize(jobQueueKey);

        // 현재 작업열이 존재하지 않는 경우 default 0
        jobCount = Objects.requireNonNullElse(jobCount, 0L);

        log.info("[RedisToJobQueueItemReader] 현재 작업열에 있는 사용자 수 :: {}", jobCount);

        return jobCount < maxJobSize;
    }

}

 

Spring Batch의 ItemReader 인터페이스를 구현하여 Redis 대기열에서 사용자 정보를 읽고, 이를 작업열로 이동시키는 Class이다.

 

작업열이 꽉 찼는지 확인하고, 작업열에 여유 공간이 있다면 대기열에서 가장 첫 번째 사용자를 조회하여 Processor Class에서 처리할 수 있도록 전달한다.

 

2. RedisToJobQueueProcessor.java

import com.jdh.wait_queue.util.redis.RedisUtil;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class RedisToJobQueueProcessor implements ItemProcessor<String, String> {

    @Value("${queue.wait}")
    private String waitQueueKey;

    @Value("${queue.job}")
    private String jobQueueKey;

    private final RedisUtil redisUtil;

    private final SimpMessagingTemplate messagingTemplate;

    @Override
    public String process(@NonNull String key) {
        // 작업열에 추가하기 전, 대기열에 사용자가 존재하는지 확인
        Boolean isPossible = checkKeyExistsWaitQueue(key);

        // 대기열에 없거나 이미 처리 중인 사용자라면 null 반환
        if(Boolean.FALSE.equals(isPossible)) {
            return null;
        }

        // 대기열에서 처리한 사용자 제거
        redisUtil.deleteValue("waitingQueue", key);

        // 작업열로 사용자 추가
        redisUtil.addZSet(jobQueueKey, key);

        // 작업열로 이동한 사용자에게 STOMP를 통해 알림 전송
        messagingTemplate.convertAndSend("/topic/" + key, "active");

        return key;
    }

    /**
     * Check Key exists at Wait Queue
     */
    private Boolean checkKeyExistsWaitQueue(String key) {
        // 대기열에서 key의 rank 조회
        Long rank = redisUtil.getzRank(waitQueueKey, key);

        log.info("[RedisToJobQueueProcessor] {} 사용자 rank :: {}", key, rank);

        return rank != null && rank >= 0;
    }

}

 

Spring Batch의 ItemProcessor 인터페이스를 구현하여 Redis와 STOMP를 활용해 대기열과 작업열을 관리하는 로직을 처리하는 class이다.

 

대기열에서 사용자를 확인한 후, 해당 사용자가 대기 중일 경우 작업열로 이동시킨다. 이 과정은 대기열에서 사용자를 삭제하고 작업열에 추가하는 방식으로 이루어진다. 이후 작업열로 이동한 사용자에게 STOMP 알림을 전송한다.

 

3. RedisToJobQueueJobConfig.java

import com.jdh.wait_queue.processor.job.RedisToJobQueueItemReader;
import com.jdh.wait_queue.processor.job.RedisToJobQueueProcessor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class RedisToJobQueueJobConfig {

    private final JobRepository jobRepository;

    private final PlatformTransactionManager transactionManager;

    private final RedisToJobQueueItemReader redisToJobQueueItemReader;

    private final RedisToJobQueueProcessor redisToJobQueueProcessor;

    @Bean("moveUsersToJobQueueJob")
    public Job job() {
        return new JobBuilder("moveUsersToJobQueue", jobRepository)
                .start(moveUsersToJobQueueStep())
                .build();
    }

    @Bean
    public Step moveUsersToJobQueueStep() {
        return new StepBuilder("moveUsersToJobQueueStep", jobRepository)
                .<String, String>chunk(1, transactionManager) // 1명씩 처리
                .reader(redisToJobQueueItemReader)
                .processor(redisToJobQueueProcessor)
                .writer(items -> items.forEach(item -> log.info("[RedisToJobQueueJobConfig] Redis To Job Queue Job Processed item: {}", item))) // 처리된 데이터를 출력
                .build();
    }

}

 

Spring Batch를 사용하여 Redis 대기열에서 작업열로 이동시키는 배치 작업을 설정한 class이다. 각 스텝은 트랜잭션을 관리하면서 데이터를 하나씩 처리하고, 처리된 데이터를 로그로 출력한다.

 

 

사용자 대기열 순번 알림 전송 Job 구현

대기열에 있는 사용자들의 현재 순번을 알림으로 전송해주는 Job을 구현해보자.

 

1. RankWaitQueueItemReader.java

import com.jdh.wait_queue.util.redis.RedisUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.Iterator;
import java.util.Set;

@Slf4j
@Component
@RequiredArgsConstructor
public class RankWaitQueueItemReader implements ItemReader<String> {

    @Value("${queue.wait}")
    private String waitQueueKey;

    private final RedisUtil redisUtil;

    private Set<String> dataSet;
    private Iterator<String> iterator;

    @Override
    public String read() {
        // 조회된 데이터가 없는 경우
        if (iterator == null) {
            // 현재 대기열에 있는 사용자 수 조회
            Long waitCount = redisUtil.getSize(waitQueueKey);
            log.info("[RankWaitQueueJobConfig] 현재 대기열에 있는 사용자 수 :: {}", waitCount);

            // data set 조회
            dataSet = redisUtil.zRange("waitingQueue", 0L, waitCount);

            // Iterator로 변환
            iterator = dataSet.iterator();
        }

        // Iterator에서 다음 데이터 조회
        String result = iterator.hasNext() ? iterator.next() : null;
        log.info("[RankWaitQueueItemReader] 대기열 순서 전송할 사용자 :: {}", result);

        // Iterator에 더 이상 데이터가 없는 경우
        if(result == null) {
            // 데이터 초기화
            initData();
        }

        return result;
    }

    /**
     * initialize set, iterator
     */
    private void initData() {
        dataSet = null;
        iterator = null;
    }
}

 

Redis에서 대기열에 있는 사용자 데이터를 읽고, 처리할 수 있도록 Spring Batch의 ItemReader 인터페이스를 구현한 class이다. Redis의 ZRange 명령어로 대기열을 조회하고, Iterator를 사용해 순차적으로 데이터를 조회하고 Processor Class에서 처리할 수 있도록 전달한다.

 

2. RankWaitQueueProcessor.java

import com.jdh.wait_queue.util.redis.RedisUtil;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class RankWaitQueueProcessor implements ItemProcessor<String, String> {

    @Value("${queue.wait}")
    private String waitQueueKey;

    private final RedisUtil redisUtil;

    private final SimpMessagingTemplate messagingTemplate;

    @Override
    public String process(@NonNull String key) {
        // key가 빈 값인지 체크
        if(checkKeyEmpty(key))
            return null;

        // 현재 대기 순서 조회
        Long rank = redisUtil.getzRank(waitQueueKey, key);

        // 현재 대기 순서 전달
        if(rank != null)
            messagingTemplate.convertAndSend("/topic/" + key, "Your wait rank is " + (rank + 1));

        return key;
    }

    /**
     * check key empty
     */
    private boolean checkKeyEmpty(String key) {
        return key.isEmpty();
    }
}

 

 

Redis 대기열에서 사용자의 순위를 조회하고, 해당 순위를 STOMP 메시지로 사용자에게 전달하는 역할을 하는 class이다.

 

3. RankWaitQueueJobConfig.java

import com.jdh.wait_queue.processor.waitrank.RankWaitQueueItemReader;
import com.jdh.wait_queue.processor.waitrank.RankWaitQueueProcessor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class RankWaitQueueJobConfig {

    private final JobRepository jobRepository;

    private final PlatformTransactionManager transactionManager;

    private final RankWaitQueueItemReader rankWaitQueueItemReader;

    private final RankWaitQueueProcessor rankWaitQueueProcessor;

    @Bean("rankWaitQueueJob")
    public Job job() {
        return new JobBuilder("rankWaitQueue", jobRepository)
                .start(rankWaitQueueJobStep())
                .build();
    }

    @Bean
    public Step rankWaitQueueJobStep() {
        return new StepBuilder("rankWaitQueueJob", jobRepository)
                .<String, String>chunk(10, transactionManager) // 10명씩 처리
                .reader(rankWaitQueueItemReader)
                .processor(rankWaitQueueProcessor)
                .writer(items -> items.forEach(item -> log.info("[RankWaitQueueJobConfig] Rank Wait Queue Job Processed item: {}", item))) // 처리된 데이터를 출력
                .build();
    }

}

 

Spring Batch를 사용하여 Redis 대기열에 있는 사용자들의 순위를 알려주는 배치 작업을 설정한 class이다. 각 스텝은 트랜잭션을 관리하면서 데이터를 10개씩 처리하고, 처리된 데이터를 로그로 출력한다.

 

 

Batch 실행 Scheduler 설정

위에서 구현한 Batch를 정해진 시간마다 실행하는 Scheduler를 설정한다.

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Slf4j
@Component
@RequiredArgsConstructor
public class BatchController {

    private final JobLauncher jobLauncher;

    @Qualifier("moveUsersToJobQueueJob")
    private final Job moveUsersToJobQueue;

    @Qualifier("rankWaitQueueJob")
    private final Job rankWaitQueue;

    @Scheduled(fixedRate = 5000)
    public void runBatch() {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                    .addString("runBatchId", UUID.randomUUID().toString()) // 유일한 파라미터 추가
                    .toJobParameters();

            // 대기열에서 작업열 이동 Job 실행
            jobLauncher.run(moveUsersToJobQueue, jobParameters);

            // 현재 대기열 순번 Send Job 실행
            jobLauncher.run(rankWaitQueue, jobParameters);
        } catch (Exception e) {
            log.error("[BatchController] :: ", e);
        }
    }

}

 

 

 

테스트


1. 사용자 대기열 진입

Enter Queue를 클릭하여 대기열에 진입하면 다음과 같은 화면이 출력된다.

 

2. 대기순번 메시지 출력

만약 작업열로 진입하지 못한 경우 다음과 같이 대기순번이 출력된다.

 

3. 작업열 진입

작업열로 이동할 수 있게 되어 작업열로 이동한 경우, 다음과 같이 출력된다.

 

 

4. 대기열, 작업열 이탈

만약 'Leave Queue'를 클릭하거나 새로고침을 하거나 웹페이지를 닫으면 대기열 및 작업열에서 자동으로 나가게 된다.

 

 

정리하며


작업열 이동 시 실제 페이지 이동은 SPA 설계를 통해 구현할 수 있을 것으로 보인다. 다만, 본 게시글은 Spring Boot 기반의 비즈니스 로직 구현에 초점을 맞추었기 때문에 해당 기능은 포함하지 않았다.

 

자세한 코드는 깃허브를 참고하길 바란다.

링크 : https://github.com/JangDaeHyeok/SpringBoot-WaitQueue

 

GitHub - JangDaeHyeok/SpringBoot-WaitQueue: Spring Boot Project for Wait Queue With Redis, WebSocket

Spring Boot Project for Wait Queue With Redis, WebSocket - JangDaeHyeok/SpringBoot-WaitQueue

github.com

 

 

728x90