From aa59037e28f0663f5cdd92b4fcca0b6828813807 Mon Sep 17 00:00:00 2001 From: Hyoseong Jo Date: Fri, 12 Dec 2025 03:30:20 +0900 Subject: [PATCH] =?UTF-8?q?=EB=B3=80=EA=B2=BD3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/kr/co/ragone/config/AsyncConfig.java | 22 +- .../ragone/controller/DocumentController.java | 28 +++ .../java/kr/co/ragone/domain/DocInfo.java | 8 + .../service/AsyncDocumentProcessor.java | 207 ++++++++++++++++++ .../service/DocumentIndexingService.java | 178 ++------------- .../ragone/service/DocumentParserService.java | 8 + .../kr/co/ragone/service/VisionService.java | 62 +++++- 7 files changed, 346 insertions(+), 167 deletions(-) create mode 100644 src/main/java/kr/co/ragone/service/AsyncDocumentProcessor.java diff --git a/src/main/java/kr/co/ragone/config/AsyncConfig.java b/src/main/java/kr/co/ragone/config/AsyncConfig.java index 6993ad8..4a331da 100644 --- a/src/main/java/kr/co/ragone/config/AsyncConfig.java +++ b/src/main/java/kr/co/ragone/config/AsyncConfig.java @@ -1,10 +1,30 @@ package kr.co.ragone.config; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import java.util.concurrent.Executor; + +/** + * 비동기 처리 설정 + * - 문서 업로드 후 Vision/청킹/임베딩 처리를 백그라운드에서 실행 + */ @Configuration @EnableAsync public class AsyncConfig { - // 비동기 처리 활성화 + + @Bean(name = "documentProcessExecutor") + public Executor documentProcessExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(2); // 기본 스레드 수 + executor.setMaxPoolSize(5); // 최대 스레드 수 + executor.setQueueCapacity(10); // 대기 큐 크기 + executor.setThreadNamePrefix("DocProcess-"); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(60); + executor.initialize(); + return executor; + } } diff --git a/src/main/java/kr/co/ragone/controller/DocumentController.java b/src/main/java/kr/co/ragone/controller/DocumentController.java index b7b2b66..a709b48 100644 --- a/src/main/java/kr/co/ragone/controller/DocumentController.java +++ b/src/main/java/kr/co/ragone/controller/DocumentController.java @@ -69,6 +69,34 @@ public class DocumentController { .orElse(ResponseEntity.notFound().build()); } + /** + * 문서 처리 상태 조회 (폴링용) + */ + @GetMapping("/documents/{docId}/status") + public ResponseEntity getDocumentStatus(@PathVariable Long docId) { + return docInfoRepository.findById(docId) + .map(doc -> ResponseEntity.ok(DocStatusResponse.builder() + .docId(doc.getDocId()) + .docStatus(doc.getDocStatus()) + .processProgress(doc.getProcessProgress()) + .processMessage(doc.getProcessMessage()) + .chunkCount(doc.getChunkCount()) + .errorMsg(doc.getErrorMsg()) + .build())) + .orElse(ResponseEntity.notFound().build()); + } + + @lombok.Data + @lombok.Builder + public static class DocStatusResponse { + private Long docId; + private String docStatus; // PENDING, PROCESSING, INDEXED, FAILED + private Integer processProgress; // 0~100 + private String processMessage; // "문서 분석중 (3/10)" + private Integer chunkCount; + private String errorMsg; + } + /** * 문서 다운로드 */ diff --git a/src/main/java/kr/co/ragone/domain/DocInfo.java b/src/main/java/kr/co/ragone/domain/DocInfo.java index 62410f9..91f6fb1 100644 --- a/src/main/java/kr/co/ragone/domain/DocInfo.java +++ b/src/main/java/kr/co/ragone/domain/DocInfo.java @@ -50,6 +50,14 @@ public class DocInfo { @Column(name = "error_msg", columnDefinition = "TEXT") private String errorMsg; + @Column(name = "process_progress") + @Builder.Default + private Integer processProgress = 0; + + @Column(name = "process_message", length = 200) + @Builder.Default + private String processMessage = "대기중"; + @Column(name = "created_at") @Builder.Default private LocalDateTime createdAt = LocalDateTime.now(); diff --git a/src/main/java/kr/co/ragone/service/AsyncDocumentProcessor.java b/src/main/java/kr/co/ragone/service/AsyncDocumentProcessor.java new file mode 100644 index 0000000..2700350 --- /dev/null +++ b/src/main/java/kr/co/ragone/service/AsyncDocumentProcessor.java @@ -0,0 +1,207 @@ +package kr.co.ragone.service; + +import kr.co.ragone.domain.DocInfo; +import kr.co.ragone.domain.TopicInfo; +import kr.co.ragone.repository.DocInfoRepository; +import kr.co.ragone.repository.TopicInfoRepository; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; +import java.util.List; + +/** + * 비동기 문서 처리 서비스 + * - 별도 클래스로 분리해야 @Async가 정상 동작함 + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class AsyncDocumentProcessor { + + private final DocInfoRepository docInfoRepository; + private final TopicInfoRepository topicInfoRepository; + private final DocumentParserService documentParserService; + private final ChunkingService chunkingService; + private final EmbeddingService embeddingService; + private final SmartChunkingService smartChunkingService; + private final VisionService visionService; + private final JdbcTemplate jdbcTemplate; + + /** + * 비동기 인덱싱 처리 + */ + @Async("documentProcessExecutor") + public void processIndexingAsync(Long docId, Long topicId, String filePath, String fileType) { + try { + log.info("[Async] 인덱싱 시작: docId={}", docId); + processIndexing(docId, topicId, filePath, fileType); + } catch (Exception e) { + log.error("[Async] 인덱싱 실패: docId={}", docId, e); + updateProgress(docId, -1, "오류: " + e.getMessage(), "FAILED", e.getMessage()); + } + } + + /** + * 실제 인덱싱 처리 (진행률 업데이트 포함) + */ + private void processIndexing(Long docId, Long topicId, String filePath, String fileType) throws Exception { + // 상태 업데이트: 처리 시작 + updateProgress(docId, 5, "처리 시작", "PROCESSING", null); + + String content; + + // ===== 1단계: 문서 파싱 (5% → 40%) ===== + if ("pdf".equalsIgnoreCase(fileType) && visionService.isEnabled()) { + updateProgress(docId, 10, "PDF Vision 분석 시작...", null, null); + content = visionService.processPdfWithVisionAndProgress(filePath, docId, this::updateProgress); + + if (content == null || content.isBlank()) { + log.warn("[Vision] Vision 분석 실패, 기본 파서로 대체"); + updateProgress(docId, 35, "기본 파서로 문서 분석중...", null, null); + content = documentParserService.parseDocumentFromPath(filePath); + } + } else { + updateProgress(docId, 15, "문서 분석중...", null, null); + content = documentParserService.parseDocumentFromPath(filePath); + } + + updateProgress(docId, 40, "문서 분석 완료", null, null); + + if (content == null || content.isBlank()) { + throw new RuntimeException("문서 내용이 비어있습니다."); + } + + // ===== 2단계: 청킹 (40% → 50%) ===== + updateProgress(docId, 45, "텍스트 분할중...", null, null); + List chunks = chunkingService.chunkText(content); + if (chunks.isEmpty()) { + throw new RuntimeException("청크 생성 실패"); + } + updateProgress(docId, 50, "텍스트 분할 완료: " + chunks.size() + "개 구간", null, null); + log.info("청크 생성 완료: {} chunks", chunks.size()); + + // ===== 3단계: 임베딩 생성 및 저장 (50% → 95%) ===== + TopicInfo topicInfo = topicInfoRepository.findById(topicId) + .orElseThrow(() -> new RuntimeException("주제를 찾을 수 없습니다.")); + DocInfo docInfo = docInfoRepository.findById(docId) + .orElseThrow(() -> new RuntimeException("문서를 찾을 수 없습니다.")); + + int totalChunks = chunks.size(); + for (int i = 0; i < totalChunks; i++) { + ChunkingService.ChunkResult chunk = chunks.get(i); + + // 진행률 계산: 50% ~ 95% 구간에서 청크 수에 따라 분배 + int progress = 50 + (int) ((i + 1) * 45.0 / totalChunks); + updateProgress(docId, progress, "벡터 생성중... (" + (i + 1) + "/" + totalChunks + ")", null, null); + + // 임베딩 생성 + String embeddingVector = embeddingService.createEmbeddingAsString(chunk.getContent()); + + // 스마트 청킹: 메타데이터 생성 (활성화된 경우) + SmartChunkingService.ChunkMetadata metadata = null; + if (smartChunkingService.isEnabled()) { + metadata = smartChunkingService.generateMetadata(chunk.getContent()); + } + + // Native Query로 벡터 + 메타데이터 저장 + saveChunkWithEmbedding(docInfo, topicInfo, chunk, embeddingVector, metadata); + } + + // ===== 4단계: 완료 (100%) ===== + updateChunkCount(docId, chunks.size()); + updateProgress(docId, 100, "인덱싱 완료", "INDEXED", null); + + log.info("인덱싱 완료: docId={}, chunks={}", docId, chunks.size()); + } + + /** + * 진행률 업데이트 + */ + public void updateProgress(Long docId, int progress, String message, String status, String errorMsg) { + try { + docInfoRepository.findById(docId).ifPresent(doc -> { + if (progress >= 0) { + doc.setProcessProgress(progress); + } + if (message != null) { + doc.setProcessMessage(message); + } + if (status != null) { + doc.setDocStatus(status); + } + if (errorMsg != null) { + doc.setErrorMsg(errorMsg); + } + doc.setUpdatedAt(LocalDateTime.now()); + docInfoRepository.save(doc); + }); + log.debug("[Progress] docId={}, progress={}%, message={}", docId, progress, message); + } catch (Exception e) { + log.warn("진행률 업데이트 실패: docId={}", docId, e); + } + } + + /** + * 청크 + 벡터 + 메타데이터 저장 (Native Query 사용) + */ + private void saveChunkWithEmbedding(DocInfo docInfo, TopicInfo topicInfo, + ChunkingService.ChunkResult chunk, String embedding, + SmartChunkingService.ChunkMetadata metadata) { + String sql = """ + INSERT INTO TB_DOC_CHUNK + (doc_id, topic_id, chunk_content, chunk_embedding, chunk_index, token_count, + chunk_summary, chunk_keywords, chunk_questions, chunk_type, created_at) + VALUES (?, ?, ?, ?::vector, ?, ?, ?, ?, ?, ?, ?) + """; + + // 메타데이터 처리 + String summary = null; + String keywords = null; + String questions = null; + + if (metadata != null) { + summary = metadata.getSummary(); + keywords = metadata.getKeywords() != null ? String.join(", ", metadata.getKeywords()) : null; + questions = metadata.getQuestions() != null ? toJson(metadata.getQuestions()) : null; + } + + jdbcTemplate.update(sql, + docInfo.getDocId(), + topicInfo.getTopicId(), + chunk.getContent(), + embedding, + chunk.getIndex(), + chunk.getTokenCount(), + summary, + keywords, + questions, + "text", + LocalDateTime.now() + ); + } + + /** + * List를 JSON 문자열로 변환 + */ + private String toJson(java.util.List list) { + if (list == null || list.isEmpty()) return null; + try { + com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); + return mapper.writeValueAsString(list); + } catch (Exception e) { + return null; + } + } + + private void updateChunkCount(Long docId, int count) { + docInfoRepository.findById(docId).ifPresent(doc -> { + doc.setChunkCount(count); + doc.setUpdatedAt(LocalDateTime.now()); + docInfoRepository.save(doc); + }); + } +} diff --git a/src/main/java/kr/co/ragone/service/DocumentIndexingService.java b/src/main/java/kr/co/ragone/service/DocumentIndexingService.java index e3dc173..468f826 100644 --- a/src/main/java/kr/co/ragone/service/DocumentIndexingService.java +++ b/src/main/java/kr/co/ragone/service/DocumentIndexingService.java @@ -1,6 +1,5 @@ package kr.co.ragone.service; -import kr.co.ragone.domain.DocChunk; import kr.co.ragone.domain.DocInfo; import kr.co.ragone.domain.TopicInfo; import kr.co.ragone.repository.DocChunkRepository; @@ -9,8 +8,6 @@ import kr.co.ragone.repository.TopicInfoRepository; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; -import org.springframework.jdbc.core.JdbcTemplate; -import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.multipart.MultipartFile; @@ -19,7 +16,6 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.time.LocalDateTime; import java.util.List; import java.util.UUID; @@ -31,18 +27,13 @@ public class DocumentIndexingService { private final TopicInfoRepository topicInfoRepository; private final DocInfoRepository docInfoRepository; private final DocChunkRepository docChunkRepository; - private final DocumentParserService documentParserService; - private final ChunkingService chunkingService; - private final EmbeddingService embeddingService; - private final SmartChunkingService smartChunkingService; - private final VisionService visionService; - private final JdbcTemplate jdbcTemplate; + private final AsyncDocumentProcessor asyncDocumentProcessor; // 비동기 처리 서비스 @Value("${file.upload-dir:./uploads}") private String uploadDir; /** - * 문서 업로드 및 인덱싱 + * 문서 업로드 (즉시 응답) - 인덱싱은 비동기로 진행 */ @Transactional public DocInfo uploadAndIndex(Long topicId, MultipartFile file) throws Exception { @@ -53,157 +44,31 @@ public class DocumentIndexingService { // 2. 파일 저장 String savedFileName = saveFile(file); String filePath = Paths.get(uploadDir, savedFileName).toString(); + String fileType = getFileExtension(file.getOriginalFilename()); - // 3. 문서 정보 저장 (PROCESSING 상태) + // 3. 문서 정보 저장 (PENDING 상태) DocInfo docInfo = DocInfo.builder() .topicInfo(topicInfo) .fileName(savedFileName) .originalName(file.getOriginalFilename()) .filePath(filePath) .fileSize(file.getSize()) - .fileType(getFileExtension(file.getOriginalFilename())) - .docStatus("PROCESSING") + .fileType(fileType) + .docStatus("PENDING") + .processProgress(0) + .processMessage("업로드 완료, 처리 대기중") .build(); docInfo = docInfoRepository.save(docInfo); - // 4. 비동기로 인덱싱 처리 - processIndexingAsync(docInfo.getDocId(), topicInfo, file); + log.info("[Upload] 파일 저장 완료: docId={}, fileName={}", docInfo.getDocId(), file.getOriginalFilename()); + // 4. 비동기로 인덱싱 처리 (별도 서비스 호출 → @Async 정상 동작) + asyncDocumentProcessor.processIndexingAsync(docInfo.getDocId(), topicInfo.getTopicId(), filePath, fileType); + + // 5. 즉시 응답 (비동기 처리는 백그라운드에서 진행) return docInfo; } - /** - * 비동기 인덱싱 처리 - */ - @Async - public void processIndexingAsync(Long docId, TopicInfo topicInfo, MultipartFile file) { - try { - processIndexing(docId, topicInfo, file); - } catch (Exception e) { - log.error("인덱싱 실패: docId={}", docId, e); - updateDocStatus(docId, "FAILED", e.getMessage()); - } - } - - /** - * 실제 인덱싱 처리 - */ - private void processIndexing(Long docId, TopicInfo topicInfo, MultipartFile file) throws Exception { - log.info("인덱싱 시작: docId={}, fileName={}", docId, file.getOriginalFilename()); - - // 문서 정보 조회 - DocInfo docInfo = docInfoRepository.findById(docId) - .orElseThrow(() -> new RuntimeException("문서를 찾을 수 없습니다.")); - - String content; - - // 1. Vision 처리 (PDF + Vision 활성화된 경우) - String fileType = getFileExtension(file.getOriginalFilename()); - if ("pdf".equalsIgnoreCase(fileType) && visionService.isEnabled()) { - log.info("[Vision] PDF Vision 분석 시작..."); - content = visionService.processPdfWithVision(docInfo.getFilePath()); - - if (content == null || content.isBlank()) { - log.warn("[Vision] Vision 분석 실패, 기본 파서로 대체"); - content = documentParserService.parseDocument(file); - } else { - log.info("[Vision] Vision 분석 완료: {} 글자", content.length()); - } - } else { - // 2. 기본 문서 파싱 (Tika) - content = documentParserService.parseDocument(file); - } - - if (content == null || content.isBlank()) { - throw new RuntimeException("문서 내용이 비어있습니다."); - } - - // 3. 청킹 - List chunks = chunkingService.chunkText(content); - if (chunks.isEmpty()) { - throw new RuntimeException("청크 생성 실패"); - } - log.info("청크 생성 완료: {} chunks", chunks.size()); - - // 4. 각 청크에 대해 임베딩 생성 및 저장 - for (int i = 0; i < chunks.size(); i++) { - ChunkingService.ChunkResult chunk = chunks.get(i); - - // 임베딩 생성 - String embeddingVector = embeddingService.createEmbeddingAsString(chunk.getContent()); - - // 스마트 청킹: 메타데이터 생성 (활성화된 경우) - SmartChunkingService.ChunkMetadata metadata = null; - if (smartChunkingService.isEnabled()) { - log.info("[SmartChunking] 메타데이터 생성 중... ({}/{})", i + 1, chunks.size()); - metadata = smartChunkingService.generateMetadata(chunk.getContent()); - } - - // Native Query로 벡터 + 메타데이터 저장 - saveChunkWithEmbedding(docInfo, topicInfo, chunk, embeddingVector, metadata); - - log.debug("청크 저장 완료: index={}", chunk.getIndex()); - } - - // 5. 문서 상태 업데이트 - updateDocStatus(docId, "INDEXED", null); - updateChunkCount(docId, chunks.size()); - - log.info("인덱싱 완료: docId={}, chunks={}", docId, chunks.size()); - } - - /** - * 청크 + 벡터 + 메타데이터 저장 (Native Query 사용) - */ - private void saveChunkWithEmbedding(DocInfo docInfo, TopicInfo topicInfo, - ChunkingService.ChunkResult chunk, String embedding, - SmartChunkingService.ChunkMetadata metadata) { - String sql = """ - INSERT INTO TB_DOC_CHUNK - (doc_id, topic_id, chunk_content, chunk_embedding, chunk_index, token_count, - chunk_summary, chunk_keywords, chunk_questions, chunk_type, created_at) - VALUES (?, ?, ?, ?::vector, ?, ?, ?, ?, ?, ?, ?) - """; - - // 메타데이터 처리 - String summary = null; - String keywords = null; - String questions = null; - - if (metadata != null) { - summary = metadata.getSummary(); - keywords = metadata.getKeywords() != null ? String.join(", ", metadata.getKeywords()) : null; - questions = metadata.getQuestions() != null ? toJson(metadata.getQuestions()) : null; - } - - jdbcTemplate.update(sql, - docInfo.getDocId(), - topicInfo.getTopicId(), - chunk.getContent(), - embedding, - chunk.getIndex(), - chunk.getTokenCount(), - summary, - keywords, - questions, - "text", - LocalDateTime.now() - ); - } - - /** - * List를 JSON 문자열로 변환 - */ - private String toJson(java.util.List list) { - if (list == null || list.isEmpty()) return null; - try { - com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); - return mapper.writeValueAsString(list); - } catch (Exception e) { - return null; - } - } - /** * 파일 저장 */ @@ -230,23 +95,6 @@ public class DocumentIndexingService { return lastDot > 0 ? filename.substring(lastDot + 1).toLowerCase() : ""; } - private void updateDocStatus(Long docId, String status, String errorMsg) { - docInfoRepository.findById(docId).ifPresent(doc -> { - doc.setDocStatus(status); - doc.setErrorMsg(errorMsg); - doc.setUpdatedAt(LocalDateTime.now()); - docInfoRepository.save(doc); - }); - } - - private void updateChunkCount(Long docId, int count) { - docInfoRepository.findById(docId).ifPresent(doc -> { - doc.setChunkCount(count); - doc.setUpdatedAt(LocalDateTime.now()); - docInfoRepository.save(doc); - }); - } - /** * 문서 삭제 (청크 포함) */ diff --git a/src/main/java/kr/co/ragone/service/DocumentParserService.java b/src/main/java/kr/co/ragone/service/DocumentParserService.java index 29f5957..2cda223 100644 --- a/src/main/java/kr/co/ragone/service/DocumentParserService.java +++ b/src/main/java/kr/co/ragone/service/DocumentParserService.java @@ -117,4 +117,12 @@ public class DocumentParserService { log.info("문서 파싱 완료: {} chars", content.length()); return content; } + + /** + * 파일 경로로 텍스트 추출 + */ + public String parseDocumentFromPath(String filePath) throws IOException, TikaException { + java.io.File file = new java.io.File(filePath); + return parseDocument(file); + } } diff --git a/src/main/java/kr/co/ragone/service/VisionService.java b/src/main/java/kr/co/ragone/service/VisionService.java index 1f1a8c4..b6dd286 100644 --- a/src/main/java/kr/co/ragone/service/VisionService.java +++ b/src/main/java/kr/co/ragone/service/VisionService.java @@ -54,7 +54,67 @@ public class VisionService { } /** - * PDF 파일을 Vision 모델로 분석하여 텍스트 추출 + * 진행률 업데이트 콜백 인터페이스 + */ + @FunctionalInterface + public interface ProgressCallback { + void update(Long docId, int progress, String message, String status, String errorMsg); + } + + /** + * PDF 파일을 Vision 모델로 분석 (진행률 콜백 포함) + */ + public String processPdfWithVisionAndProgress(String pdfPath, Long docId, ProgressCallback callback) { + if (!visionEnabled) { + log.info("[Vision] 비활성화 상태"); + return null; + } + + StringBuilder allDescriptions = new StringBuilder(); + + try (PDDocument document = PDDocument.load(new File(pdfPath))) { + PDFRenderer renderer = new PDFRenderer(document); + int pageCount = document.getNumberOfPages(); + + log.info("[Vision] PDF 분석 시작: {} 페이지", pageCount); + + for (int i = 0; i < pageCount; i++) { + try { + // 진행률 계산: 10% ~ 40% 구간에서 페이지 수에 따라 분배 + int progress = 10 + (int) ((i + 1) * 30.0 / pageCount); + String message = "PDF 분석중... (" + (i + 1) + "/" + pageCount + " 페이지)"; + callback.update(docId, progress, message, null, null); + + log.info("[Vision] 페이지 {}/{} 분석 중...", i + 1, pageCount); + + BufferedImage image = renderer.renderImageWithDPI(i, 150, ImageType.RGB); + String base64Image = encodeImageToBase64(image); + String description = callVisionApi(base64Image, i + 1, pageCount); + + if (description != null && !description.isEmpty()) { + allDescriptions.append("\n\n=== 페이지 ").append(i + 1).append(" ===\n"); + allDescriptions.append(description); + } + + Thread.sleep(1000); + + } catch (Exception e) { + log.warn("[Vision] 페이지 {} 분석 실패: {}", i + 1, e.getMessage()); + } + } + + log.info("[Vision] PDF 분석 완료"); + + } catch (Exception e) { + log.error("[Vision] PDF 처리 실패: {}", e.getMessage()); + return null; + } + + return allDescriptions.toString(); + } + + /** + * PDF 파일을 Vision 모델로 분석하여 텍스트 추출 (기존 메서드 - 호환성 유지) */ public String processPdfWithVision(String pdfPath) { if (!visionEnabled) {