일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 | 31 |
- java
- aspect
- Filter
- OOP
- Interceptor
- 스프링
- network
- 스프링부트
- MYSQL
- request
- 스프링 시큐리티
- 디자인패턴
- response
- Spring Security
- 트랜잭션
- 객체지향프로그래밍
- aop
- proxy pattern
- http
- spring boot
- 인터셉터
- RestControllerAdvice
- mybatis
- Redis
- SQL
- 자바
- exception
- Spring
- 관점지향프로그래밍
- git
- Today
- Total
장쫄깃 기술블로그
[Spring Boot] Redis Sorted Set, Spring Batch, STOMP를 이용하여 대기열 시스템 만들어보기 본문
[Spring Boot] Redis Sorted Set, Spring Batch, STOMP를 이용하여 대기열 시스템 만들어보기
장쫄깃 2024. 12. 15. 15:59
Redis Sorted Set을 사용한 이유
대기열 시스템을 만들 때 Redis Sorted Set을 사용한 이유는 다음과 같다.
- 우선순위 기반 작업 처리
- 각 항목에 점수를 부여해 우선순위를 지정할 수 있으며, 점수에 따라 자동 정렬
- 우선순위가 높은 작업을 먼저 처리하는 대기열 구현이 가능
- 효율적인 데이터 조회 및 삭제
- ZRANGE, ZPOPMIN과 같은 명령어를 통해 O(log(N)) 시간 복잡도로 정렬된 데이터를 조회하거나 삭제
- 시간 기반 작업 처리
- 점수로 타임스탬프를 사용할 수 있어 예약 작업이나 시간 순서에 따른 작업 처리가 가능
- 중복 방지
- Sorted Set은 동일한 키를 허용하지 않으므로 중복된 작업 추가를 방지하고 점수 업데이트로 대체 가능
- 빠른 성능
- Redis는 메모리 기반 데이터베이스로 매우 빠른 읽기/쓰기 성능을 제공하여 실시간 작업 처리에 적합
- 간단한 사용
- Spring Boot에서 Spring Data Redis를 활용하면 Sorted Set API를 쉽게 사용할 수 있어 구현이 간단
STOMP를 사용한 이유
대기열 시스템을 만들 때 STOMP를 사용한 이유는 다음과 같다.
- 실시간 양방향 통신
- WebSocket 기반으로 클라이언트와 서버 간 실시간 상태 업데이트 및 작업 결과 전송이 가능
- 구독/발행(Pub/Sub) 패턴 지원
- 특정 주제를 구독한 다수의 클라이언트에게 작업 상태나 결과를 동시에 전달
- Spring Boot와의 통합 용이성
- Spring WebSocket과 쉽게 연동되며, 메시지 라우팅 및 핸들링을 단순화하는 도구를 제공
- 다양한 클라이언트 지원
- 브라우저, 모바일, 데스크톱 등 여러 플랫폼에서 STOMP 클라이언트를 사용할 수 있어 범용성 높음
- 가벼운 프로토콜 구조
- 메시지 구조가 단순하고 경량화되어 빠르고 효율적인 메시지 전송 가능
대기열 시스템 비즈니스 로직
대기열 시스템 비즈니스 로직은 다음과 같다.
- 사용자 대기열 입장
- 사용자 key를 대기열 Queue에 저장
- rank는 time stamp 사용
- 사용자 web socket session key를 Redis에 저장
- 5초에 한 번씩 대기열 -> 작업열 이동 Job 실행
- 작업열이 가득 찼는지 체크
- 대기열의 순번이 가장 높은 사용자 조회
- 작업열로 이동이 가능한 사용자인지 체크
- 작업열로 이동
- 사용자에게 알림 전달
- 작업열이 가득 찬 경우 대기열 -> 작업열 이동 Job 종료
- 사용자 대기열 순번 Send Job 실행
- 대기열 Queue에 있는 사용자 목록 조회
- 각 사용자 순번 조회
- 사용자에게 순번 정보 Send
- 사용자 이탈 시 대기열, 작업열 및 web socket session key 제거
구현
해당 게시글에서는 대기열 시스템 구현에 대한 코드를 중심으로 설명하려 한다. 전체적인 코드는 게시물 하단에 있는 깃허브 링크를 참고하면 된다.
기본 설정 구현
비즈니스 로직을 구현하기 전에 기본적인 설정, 유틸리티, 인터셉터 등을 구현한다.
1. build.gradle
plugins {
id 'java'
id 'org.springframework.boot' version '3.3.5'
id 'io.spring.dependency-management' version '1.1.6'
}
group = 'com.jdh'
version = '0.0.1-SNAPSHOT'
java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-jdbc'
implementation 'org.springframework.batch:spring-batch-core'
implementation 'org.springframework.boot:spring-boot-starter-websocket'
implementation 'org.springframework.boot:spring-boot-starter-web'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
// thymeleaf
implementation 'org.springframework.boot:spring-boot-starter-thymeleaf'
// h2
runtimeOnly 'com.h2database:h2'
// redis
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation 'org.springframework.session:spring-session-data-redis'
implementation 'org.springframework.boot:spring-boot-starter-data-redis-reactive'
}
test {
useJUnitPlatform()
}
2. application.yml
spring:
main:
allow-circular-references: true
# 메타데이터 사용을 위한 h2
datasource:
url: jdbc:h2:mem:testdb;DB_CLOSE_DELAY=-1
driver-class-name: org.h2.Driver
username: sa
password: password
h2:
console:
enabled: true
path: /h2-console
sql:
init:
mode: always
# redis
data:
redis:
port: 6379
host: ec2-13-125-169-147.ap-northeast-2.compute.amazonaws.com
password: jangjjolkit!))$
queue:
wait: waitingQueue
job: jobQueue
max-job-size: 1
Spring Batch에서 사용할 h2, 대기열 시스템에 사용할 Redis를 설정했다. 또, 대기열 시스템 구현 시 사용할 queue 정보를 추가한다.
3. WaitQueueApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;
@SpringBootApplication
@EnableRedisHttpSession
@EnableScheduling
public class WaitQueueApplication {
public static void main(String[] args) {
SpringApplication.run(WaitQueueApplication.class, args);
}
}
Redis Session을 사용하기 위한 @EnableRedisHttpSession 어노테이션과 Scheduled 사용을 위한 @EnableScheduling 어노테이션을 적용한다.
4. BatchConfig.java
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
import org.springframework.jdbc.datasource.init.ScriptUtils;
import javax.sql.DataSource;
@Configuration
@EnableBatchProcessing
public class BatchConfig {
/**
* batch 에서 사용할 h2 기본 테이블 생성
*/
@Bean
public ApplicationRunner initializeBatchTables(DataSource dataSource) {
return args -> {
Resource resource = new ClassPathResource("org/springframework/batch/core/schema-h2.sql");
ScriptUtils.executeSqlScript(dataSource.getConnection(), resource);
};
}
}
Spring Batch에서 사용할 h2 기본 테이블을 생성하기 위한 Config Class이다.
5. RedisConfig.java
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Value("${spring.data.redis.host}")
private String host;
@Value(("${spring.data.redis.port}"))
private int port;
@Value(("${spring.data.redis.password}"))
private String password;
@Bean
public RedisConnectionFactory redisConnectionFactory() {
RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration();
redisConfiguration.setHostName(host);
redisConfiguration.setPort(port);
redisConfiguration.setPassword(password);
return new LettuceConnectionFactory(redisConfiguration);
}
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setKeySerializer(new StringRedisSerializer());
return redisTemplate;
}
}
6. WebSocketConfig.java
import com.jdh.wait_queue.handler.stomp.StompHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Slf4j
@Configuration
@EnableWebSocketMessageBroker
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private final StompHandler stompHandler;
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setApplicationDestinationPrefixes("/app");
config.enableSimpleBroker("/topic");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/connect")
.setAllowedOriginPatterns("*")
.withSockJS();
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(stompHandler);
}
}
WebSocket(STOMP) 사용을 위한 Config Class이다.
7. RedisUtil.java
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;
import java.time.Instant;
import java.util.Set;
@Slf4j
@Component
@AllArgsConstructor
public class RedisUtil {
private final RedisTemplate<String, String> redisTemplate;
// Add Redis
public void add(String key, String value) {
redisTemplate.opsForValue().set(key, value);
}
// Add Sorted Set
public Boolean addZSet(String key, String value, double score) {
return redisTemplate.opsForZSet().add(key, value, score);
}
// Add Sorted Set
public void addZSet(String key, String value) {
double score = Instant.now().toEpochMilli();
redisTemplate.opsForZSet().add(key, value, score);
}
/**
* Delete With Key
*/
public Boolean delete(String key){
return redisTemplate.delete(key);
}
/**
* Delete Sorted Set With key, value
*/
public Long deleteValue(String key, String value){
return redisTemplate.opsForZSet().remove(key, value);
}
/**
* Get Value With Key
*/
public String getValue(String key){
return redisTemplate.opsForValue().get(key);
}
/**
* Get Sorted Set Size
*/
public Long getSize(String key){
ZSetOperations<String, String> z = redisTemplate.opsForZSet();
return z.size(key);
}
/**
* Get Sorted Set Start ~ End
*/
public Set<String> zRange(String key, Long start, Long end){
return redisTemplate.opsForZSet().range(key, start, end);
}
/**
* Get Sorted Set Rank
*/
public Long getzRank(String key, String value){
return redisTemplate.opsForZSet().rank(key, value);
}
/**
* Get Sorted Set Next
*/
public String getNext(String key) {
Set<String> result = redisTemplate.opsForZSet().range(key, 0, 0);
return (result != null && !result.isEmpty()) ? result.iterator().next() : null;
}
}
8. StompHandler.java
import com.jdh.wait_queue.util.redis.RedisUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.stereotype.Component;
import java.util.Objects;
@Slf4j
@Component
@RequiredArgsConstructor
public class StompHandler implements ChannelInterceptor {
private final RedisUtil redisUtil;
@Value("${queue.wait}")
private String waitQueueKey;
@Value("${queue.job}")
private String jobQueueKey;
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
// websocket 연결 시
if (StompCommand.CONNECT == accessor.getCommand()) {
log.info("[StompHandler] CONNECT header :: " + message.getHeaders());
}
// websocket 구독 요청 시
else if (StompCommand.SUBSCRIBE == accessor.getCommand()) {
// path에서 key 획득
String key = Objects.requireNonNull(accessor.getDestination())
.substring(accessor.getDestination().lastIndexOf("/") + 1);
// redis 대기열 저장
redisUtil.addZSet(waitQueueKey, key);
// redis web socket session key 저장
redisUtil.add(accessor.getSessionId(), key);
log.info("[StompHandler] ({}) {} key joined queue.", accessor.getSessionId(), key);
}
// websocket 연결 종료 시
else if (StompCommand.DISCONNECT == accessor.getCommand()) {
log.info("[StompHandler] DISCONNECT header :: " + message.getHeaders());
// 세션 id로 key 가져오기
String key = String.valueOf(redisUtil.getValue(accessor.getSessionId()));
log.info("[StompHandler] ({}) {} key left queue.", accessor.getSessionId(), key);
// redis 대기열에서 제거
redisUtil.deleteValue(waitQueueKey, key);
// redis 작업열에서 제거
redisUtil.deleteValue(jobQueueKey, key);
// redis session 제거
redisUtil.delete(accessor.getSessionId());
}
return message;
}
}
WebSocket 연결 시 발생하는 이벤트를 처리하는 ChannelInterceptor를 구현한 Handler Class다. 이 클래스는 크게 2가지 이벤트에서 동작을 수행한다.
- 사용자가 구독 요청을 할 때 (SUBSCRIBE)
- path에서 key(사용자) 추출
- 사용자를 대기열에 저장
- web socket session key로 사용자 key 저장
- 사용자가 web socket 연결을 종료할 때 (DISCONNECT)
- web socket session key로 사용자 key 조회
- 사용자를 대기열, 작업열에서 제거
- web socket session key 제거
web socket 구독 시 사용자를 대기열에 저장하고, 연결이 종료되면 대기열과 작업열에서 제거하는 역할을 한다.
대기열 -> 작업열 이동 Job 구현
대기열에 있는 사용자를 작업열로 이동시키는 Job을 구현해보자.
1. RedisToJobQueueItemReader.java
import com.jdh.wait_queue.util.redis.RedisUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Objects;
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisToJobQueueItemReader implements ItemReader<String> {
@Value("${queue.wait}")
private String waitQueueKey;
@Value("${queue.job}")
private String jobQueueKey;
@Value("${queue.max-job-size}")
private long maxJobSize;
private final RedisUtil redisUtil;
@Override
public String read() {
// 작업열이 꽉 찬 경우
if(!checkMaxJobQueue()) {
return null;
}
// 대기열의 다음 사용자 조회
String result = String.valueOf(redisUtil.getNext(waitQueueKey));
log.info("[RedisToJobQueueItemReader] 작업열로 넘어갈 사용자 :: {}", result);
return "null".equals(result) || result.isEmpty() ? null : result; // 가장 오래된 사용자 반환
}
/**
* check job queue count max
*/
private boolean checkMaxJobQueue() {
// 현재 작업열에 있는 사용자 수 조회
Long jobCount = redisUtil.getSize(jobQueueKey);
// 현재 작업열이 존재하지 않는 경우 default 0
jobCount = Objects.requireNonNullElse(jobCount, 0L);
log.info("[RedisToJobQueueItemReader] 현재 작업열에 있는 사용자 수 :: {}", jobCount);
return jobCount < maxJobSize;
}
}
Spring Batch의 ItemReader 인터페이스를 구현하여 Redis 대기열에서 사용자 정보를 읽고, 이를 작업열로 이동시키는 Class이다.
작업열이 꽉 찼는지 확인하고, 작업열에 여유 공간이 있다면 대기열에서 가장 첫 번째 사용자를 조회하여 Processor Class에서 처리할 수 있도록 전달한다.
2. RedisToJobQueueProcessor.java
import com.jdh.wait_queue.util.redis.RedisUtil;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisToJobQueueProcessor implements ItemProcessor<String, String> {
@Value("${queue.wait}")
private String waitQueueKey;
@Value("${queue.job}")
private String jobQueueKey;
private final RedisUtil redisUtil;
private final SimpMessagingTemplate messagingTemplate;
@Override
public String process(@NonNull String key) {
// 작업열에 추가하기 전, 대기열에 사용자가 존재하는지 확인
Boolean isPossible = checkKeyExistsWaitQueue(key);
// 대기열에 없거나 이미 처리 중인 사용자라면 null 반환
if(Boolean.FALSE.equals(isPossible)) {
return null;
}
// 대기열에서 처리한 사용자 제거
redisUtil.deleteValue("waitingQueue", key);
// 작업열로 사용자 추가
redisUtil.addZSet(jobQueueKey, key);
// 작업열로 이동한 사용자에게 STOMP를 통해 알림 전송
messagingTemplate.convertAndSend("/topic/" + key, "active");
return key;
}
/**
* Check Key exists at Wait Queue
*/
private Boolean checkKeyExistsWaitQueue(String key) {
// 대기열에서 key의 rank 조회
Long rank = redisUtil.getzRank(waitQueueKey, key);
log.info("[RedisToJobQueueProcessor] {} 사용자 rank :: {}", key, rank);
return rank != null && rank >= 0;
}
}
Spring Batch의 ItemProcessor 인터페이스를 구현하여 Redis와 STOMP를 활용해 대기열과 작업열을 관리하는 로직을 처리하는 class이다.
대기열에서 사용자를 확인한 후, 해당 사용자가 대기 중일 경우 작업열로 이동시킨다. 이 과정은 대기열에서 사용자를 삭제하고 작업열에 추가하는 방식으로 이루어진다. 이후 작업열로 이동한 사용자에게 STOMP 알림을 전송한다.
3. RedisToJobQueueJobConfig.java
import com.jdh.wait_queue.processor.job.RedisToJobQueueItemReader;
import com.jdh.wait_queue.processor.job.RedisToJobQueueProcessor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Slf4j
@Configuration
@RequiredArgsConstructor
public class RedisToJobQueueJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final RedisToJobQueueItemReader redisToJobQueueItemReader;
private final RedisToJobQueueProcessor redisToJobQueueProcessor;
@Bean("moveUsersToJobQueueJob")
public Job job() {
return new JobBuilder("moveUsersToJobQueue", jobRepository)
.start(moveUsersToJobQueueStep())
.build();
}
@Bean
public Step moveUsersToJobQueueStep() {
return new StepBuilder("moveUsersToJobQueueStep", jobRepository)
.<String, String>chunk(1, transactionManager) // 1명씩 처리
.reader(redisToJobQueueItemReader)
.processor(redisToJobQueueProcessor)
.writer(items -> items.forEach(item -> log.info("[RedisToJobQueueJobConfig] Redis To Job Queue Job Processed item: {}", item))) // 처리된 데이터를 출력
.build();
}
}
Spring Batch를 사용하여 Redis 대기열에서 작업열로 이동시키는 배치 작업을 설정한 class이다. 각 스텝은 트랜잭션을 관리하면서 데이터를 하나씩 처리하고, 처리된 데이터를 로그로 출력한다.
사용자 대기열 순번 알림 전송 Job 구현
대기열에 있는 사용자들의 현재 순번을 알림으로 전송해주는 Job을 구현해보자.
1. RankWaitQueueItemReader.java
import com.jdh.wait_queue.util.redis.RedisUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Iterator;
import java.util.Set;
@Slf4j
@Component
@RequiredArgsConstructor
public class RankWaitQueueItemReader implements ItemReader<String> {
@Value("${queue.wait}")
private String waitQueueKey;
private final RedisUtil redisUtil;
private Set<String> dataSet;
private Iterator<String> iterator;
@Override
public String read() {
// 조회된 데이터가 없는 경우
if (iterator == null) {
// 현재 대기열에 있는 사용자 수 조회
Long waitCount = redisUtil.getSize(waitQueueKey);
log.info("[RankWaitQueueJobConfig] 현재 대기열에 있는 사용자 수 :: {}", waitCount);
// data set 조회
dataSet = redisUtil.zRange("waitingQueue", 0L, waitCount);
// Iterator로 변환
iterator = dataSet.iterator();
}
// Iterator에서 다음 데이터 조회
String result = iterator.hasNext() ? iterator.next() : null;
log.info("[RankWaitQueueItemReader] 대기열 순서 전송할 사용자 :: {}", result);
// Iterator에 더 이상 데이터가 없는 경우
if(result == null) {
// 데이터 초기화
initData();
}
return result;
}
/**
* initialize set, iterator
*/
private void initData() {
dataSet = null;
iterator = null;
}
}
Redis에서 대기열에 있는 사용자 데이터를 읽고, 처리할 수 있도록 Spring Batch의 ItemReader 인터페이스를 구현한 class이다. Redis의 ZRange 명령어로 대기열을 조회하고, Iterator를 사용해 순차적으로 데이터를 조회하고 Processor Class에서 처리할 수 있도록 전달한다.
2. RankWaitQueueProcessor.java
import com.jdh.wait_queue.util.redis.RedisUtil;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class RankWaitQueueProcessor implements ItemProcessor<String, String> {
@Value("${queue.wait}")
private String waitQueueKey;
private final RedisUtil redisUtil;
private final SimpMessagingTemplate messagingTemplate;
@Override
public String process(@NonNull String key) {
// key가 빈 값인지 체크
if(checkKeyEmpty(key))
return null;
// 현재 대기 순서 조회
Long rank = redisUtil.getzRank(waitQueueKey, key);
// 현재 대기 순서 전달
if(rank != null)
messagingTemplate.convertAndSend("/topic/" + key, "Your wait rank is " + (rank + 1));
return key;
}
/**
* check key empty
*/
private boolean checkKeyEmpty(String key) {
return key.isEmpty();
}
}
Redis 대기열에서 사용자의 순위를 조회하고, 해당 순위를 STOMP 메시지로 사용자에게 전달하는 역할을 하는 class이다.
3. RankWaitQueueJobConfig.java
import com.jdh.wait_queue.processor.waitrank.RankWaitQueueItemReader;
import com.jdh.wait_queue.processor.waitrank.RankWaitQueueProcessor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
@Slf4j
@Configuration
@RequiredArgsConstructor
public class RankWaitQueueJobConfig {
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final RankWaitQueueItemReader rankWaitQueueItemReader;
private final RankWaitQueueProcessor rankWaitQueueProcessor;
@Bean("rankWaitQueueJob")
public Job job() {
return new JobBuilder("rankWaitQueue", jobRepository)
.start(rankWaitQueueJobStep())
.build();
}
@Bean
public Step rankWaitQueueJobStep() {
return new StepBuilder("rankWaitQueueJob", jobRepository)
.<String, String>chunk(10, transactionManager) // 10명씩 처리
.reader(rankWaitQueueItemReader)
.processor(rankWaitQueueProcessor)
.writer(items -> items.forEach(item -> log.info("[RankWaitQueueJobConfig] Rank Wait Queue Job Processed item: {}", item))) // 처리된 데이터를 출력
.build();
}
}
Spring Batch를 사용하여 Redis 대기열에 있는 사용자들의 순위를 알려주는 배치 작업을 설정한 class이다. 각 스텝은 트랜잭션을 관리하면서 데이터를 10개씩 처리하고, 처리된 데이터를 로그로 출력한다.
Batch 실행 Scheduler 설정
위에서 구현한 Batch를 정해진 시간마다 실행하는 Scheduler를 설정한다.
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Slf4j
@Component
@RequiredArgsConstructor
public class BatchController {
private final JobLauncher jobLauncher;
@Qualifier("moveUsersToJobQueueJob")
private final Job moveUsersToJobQueue;
@Qualifier("rankWaitQueueJob")
private final Job rankWaitQueue;
@Scheduled(fixedRate = 5000)
public void runBatch() {
try {
JobParameters jobParameters = new JobParametersBuilder()
.addString("runBatchId", UUID.randomUUID().toString()) // 유일한 파라미터 추가
.toJobParameters();
// 대기열에서 작업열 이동 Job 실행
jobLauncher.run(moveUsersToJobQueue, jobParameters);
// 현재 대기열 순번 Send Job 실행
jobLauncher.run(rankWaitQueue, jobParameters);
} catch (Exception e) {
log.error("[BatchController] :: ", e);
}
}
}
테스트
1. 사용자 대기열 진입
Enter Queue를 클릭하여 대기열에 진입하면 다음과 같은 화면이 출력된다.
2. 대기순번 메시지 출력
만약 작업열로 진입하지 못한 경우 다음과 같이 대기순번이 출력된다.
3. 작업열 진입
작업열로 이동할 수 있게 되어 작업열로 이동한 경우, 다음과 같이 출력된다.
4. 대기열, 작업열 이탈
만약 'Leave Queue'를 클릭하거나 새로고침을 하거나 웹페이지를 닫으면 대기열 및 작업열에서 자동으로 나가게 된다.
정리하며
작업열 이동 시 실제 페이지 이동은 SPA 설계를 통해 구현할 수 있을 것으로 보인다. 다만, 본 게시글은 Spring Boot 기반의 비즈니스 로직 구현에 초점을 맞추었기 때문에 해당 기능은 포함하지 않았다.
자세한 코드는 깃허브를 참고하길 바란다.
링크 : https://github.com/JangDaeHyeok/SpringBoot-WaitQueue
'Spring Framework > Spring Boot' 카테고리의 다른 글
[Spring Boot] AOP, Redis Transaction을 이용한 분산락(Distributed Lock)으로 동시성 해결하기 (0) | 2024.10.18 |
---|---|
[Spring Boot] AOP, Redis를 이용한 멱등성 보장 구현하기 (2) | 2024.09.18 |
[Spring Boot] log4j2 로그 레벨별 색 지정 (0) | 2024.07.16 |
[Spring Boot] logback 로그 레벨별 색 지정 (4) | 2024.07.16 |
[Spring Boot] @Scheduled 사용 및 동작원리 (0) | 2023.07.27 |