森 宝松 SIer Tech Blog

【SIerのための非同期処理と並列処理の考え方】

森 宝松
SIer Tech Blog
2025年3月22日

【SIerのための非同期処理と並列処理の考え方】

業務システムの大規模化・複雑化に伴い、非同期処理と並列処理の重要性が増しています。本記事では、SIerエンジニアが押さえておくべき非同期処理と並列処理の基本概念から実装パターン、パフォーマンスチューニングまでを解説します。

1. 非同期処理と並列処理の基本

1.1 非同期処理とは

非同期処理は、処理の完了を待たずに次の処理を実行する方式です。主に以下のような場面で使用されます:

  • I/O処理(ファイル、ネットワーク、データベース)
  • 長時間処理
  • ユーザーインターフェースの応答性向上
  • バッチ処理
// Javaでの非同期処理の基本例
@Service
public class AsyncService {
    @Async
    public CompletableFuture<ProcessResult> processAsync(ProcessRequest request) {
        return CompletableFuture.supplyAsync(() -> {
            // 時間のかかる処理
            ProcessResult result = performLongRunningTask(request);
            return result;
        });
    }
    
    private ProcessResult performLongRunningTask(ProcessRequest request) {
        try {
            // 処理の実行
            Thread.sleep(5000); // 実際の処理を想定
            return new ProcessResult("処理が完了しました");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ProcessingException("処理が中断されました", e);
        }
    }
}

1.2 並列処理とは

並列処理は、複数の処理を同時に実行する方式です。主に以下のような目的で使用されます:

  • 処理速度の向上
  • リソースの効率的な利用
  • スケーラビリティの確保
// 並列処理の基本例
@Service
public class ParallelService {
    private final ExecutorService executor = 
        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
    public List<ProcessResult> processInParallel(List<ProcessRequest> requests) {
        try {
            List<Future<ProcessResult>> futures = requests.stream()
                .map(request -> executor.submit(() -> processRequest(request)))
                .collect(Collectors.toList());
            
            return futures.stream()
                .map(this::getFutureResult)
                .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }
    
    private ProcessResult processRequest(ProcessRequest request) {
        // 個別の処理実行
        return new ProcessResult("処理完了: " + request.getId());
    }
    
    private ProcessResult getFutureResult(Future<ProcessResult> future) {
        try {
            return future.get(1, TimeUnit.MINUTES);
        } catch (Exception e) {
            throw new ProcessingException("処理の取得に失敗しました", e);
        }
    }
}

2. 非同期処理のパターン

2.1 コールバックパターン

// コールバックパターンの実装例
public interface ProcessCallback {
    void onSuccess(ProcessResult result);
    void onError(Exception e);
}

@Service
public class CallbackService {
    private final ExecutorService executor = Executors.newCachedThreadPool();
    
    public void processWithCallback(ProcessRequest request, ProcessCallback callback) {
        executor.submit(() -> {
            try {
                ProcessResult result = processRequest(request);
                callback.onSuccess(result);
            } catch (Exception e) {
                callback.onError(e);
            }
        });
    }
    
    // 使用例
    public void executeProcess() {
        ProcessRequest request = new ProcessRequest("データ処理");
        processWithCallback(request, new ProcessCallback() {
            @Override
            public void onSuccess(ProcessResult result) {
                System.out.println("処理成功: " + result.getMessage());
            }
            
            @Override
            public void onError(Exception e) {
                System.err.println("処理失敗: " + e.getMessage());
            }
        });
    }
}

2.2 Future/Promiseパターン

// Future/Promiseパターンの実装例
@Service
public class FutureService {
    private final ExecutorService executor = Executors.newCachedThreadPool();
    
    public CompletableFuture<ProcessResult> processAsync(ProcessRequest request) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return processRequest(request);
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, executor)
        .thenApply(result -> {
            // 結果の後処理
            return enrichResult(result);
        })
        .exceptionally(throwable -> {
            // エラーハンドリング
            logger.error("処理エラー", throwable);
            return new ProcessResult("エラー発生");
        });
    }
    
    // 使用例
    public void executeProcess() {
        ProcessRequest request = new ProcessRequest("データ処理");
        processAsync(request)
            .thenAccept(result -> System.out.println("処理完了: " + result.getMessage()))
            .exceptionally(throwable -> {
                System.err.println("エラー: " + throwable.getMessage());
                return null;
            });
    }
}

2.3 Reactive Streamパターン

// Reactive Streamパターンの実装例(Project Reactor使用)
@Service
public class ReactiveService {
    public Flux<ProcessResult> processReactive(Flux<ProcessRequest> requests) {
        return requests
            .flatMap(this::processRequest)
            .retry(3)
            .onErrorResume(e -> Flux.empty())
            .publishOn(Schedulers.boundedElastic());
    }
    
    private Mono<ProcessResult> processRequest(ProcessRequest request) {
        return Mono.fromCallable(() -> {
            // 実際の処理
            return new ProcessResult("処理完了: " + request.getId());
        });
    }
    
    // 使用例
    public void executeProcess() {
        Flux<ProcessRequest> requests = Flux.just(
            new ProcessRequest("1"),
            new ProcessRequest("2"),
            new ProcessRequest("3")
        );
        
        processReactive(requests)
            .subscribe(
                result -> System.out.println("処理成功: " + result.getMessage()),
                error -> System.err.println("エラー: " + error.getMessage()),
                () -> System.out.println("全処理完了")
            );
    }
}

3. 並列処理のパターン

3.1 マルチスレッドパターン

// マルチスレッドパターンの実装例
@Service
public class MultiThreadService {
    private final int threadPoolSize = Runtime.getRuntime().availableProcessors();
    private final ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
    
    public List<ProcessResult> processInParallel(List<ProcessRequest> requests) {
        int batchSize = (int) Math.ceil(requests.size() / (double) threadPoolSize);
        
        List<List<ProcessRequest>> batches = Lists.partition(requests, batchSize);
        List<Future<List<ProcessResult>>> futures = new ArrayList<>();
        
        for (List<ProcessRequest> batch : batches) {
            futures.add(executor.submit(() -> processBatch(batch)));
        }
        
        return futures.stream()
            .map(this::getFutureResult)
            .flatMap(List::stream)
            .collect(Collectors.toList());
    }
    
    private List<ProcessResult> processBatch(List<ProcessRequest> batch) {
        return batch.stream()
            .map(this::processRequest)
            .collect(Collectors.toList());
    }
    
    private ProcessResult processRequest(ProcessRequest request) {
        // 個別のリクエスト処理
        return new ProcessResult("処理完了: " + request.getId());
    }
    
    private List<ProcessResult> getFutureResult(Future<List<ProcessResult>> future) {
        try {
            return future.get(1, TimeUnit.MINUTES);
        } catch (Exception e) {
            throw new ProcessingException("処理の取得に失敗しました", e);
        }
    }
}

3.2 Fork/Joinパターン

// Fork/Joinパターンの実装例
public class ProcessTask extends RecursiveTask<List<ProcessResult>> {
    private final List<ProcessRequest> requests;
    private final int threshold;
    
    public ProcessTask(List<ProcessRequest> requests, int threshold) {
        this.requests = requests;
        this.threshold = threshold;
    }
    
    @Override
    protected List<ProcessResult> compute() {
        if (requests.size() <= threshold) {
            return processDirectly();
        }
        
        return processInParallel();
    }
    
    private List<ProcessResult> processDirectly() {
        return requests.stream()
            .map(request -> new ProcessResult("処理完了: " + request.getId()))
            .collect(Collectors.toList());
    }
    
    private List<ProcessResult> processInParallel() {
        int mid = requests.size() / 2;
        
        ProcessTask leftTask = new ProcessTask(
            requests.subList(0, mid),
            threshold
        );
        ProcessTask rightTask = new ProcessTask(
            requests.subList(mid, requests.size()),
            threshold
        );
        
        leftTask.fork();
        List<ProcessResult> rightResult = rightTask.compute();
        List<ProcessResult> leftResult = leftTask.join();
        
        List<ProcessResult> results = new ArrayList<>();
        results.addAll(leftResult);
        results.addAll(rightResult);
        return results;
    }
}

@Service
public class ForkJoinService {
    private final ForkJoinPool forkJoinPool = new ForkJoinPool();
    
    public List<ProcessResult> processWithForkJoin(List<ProcessRequest> requests) {
        return forkJoinPool.invoke(new ProcessTask(requests, 100));
    }
}

3.3 アクター/メッセージパッシングパターン

// アクターパターンの実装例(Akka使用)
public class ProcessingActor extends AbstractActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
    
    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(ProcessRequest.class, this::handleRequest)
            .match(ProcessingStatus.class, this::handleStatus)
            .build();
    }
    
    private void handleRequest(ProcessRequest request) {
        try {
            ProcessResult result = processRequest(request);
            getSender().tell(result, getSelf());
        } catch (Exception e) {
            getSender().tell(new Status.Failure(e), getSelf());
        }
    }
    
    private void handleStatus(ProcessingStatus status) {
        log.info("現在の処理状況: {}", status);
    }
    
    private ProcessResult processRequest(ProcessRequest request) {
        // 実際の処理
        return new ProcessResult("処理完了: " + request.getId());
    }
}

@Service
public class ActorService {
    private final ActorSystem system = ActorSystem.create("ProcessingSystem");
    private final ActorRef processingActor = system.actorOf(
        Props.create(ProcessingActor.class),
        "processingActor"
    );
    
    public CompletableFuture<ProcessResult> processWithActor(ProcessRequest request) {
        return PatternsCS.ask(processingActor, request, Duration.ofMinutes(1))
            .thenApply(response -> {
                if (response instanceof ProcessResult) {
                    return (ProcessResult) response;
                } else if (response instanceof Status.Failure) {
                    throw new ProcessingException(
                        ((Status.Failure) response).cause().getMessage()
                    );
                } else {
                    throw new ProcessingException("不明なレスポンス");
                }
            })
            .toCompletableFuture();
    }
}

4. パフォーマンスチューニング

4.1 スレッドプールの最適化

// スレッドプール設定の最適化例
@Configuration
public class ThreadPoolConfig {
    
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        
        // コア数に基づいて設定
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(corePoolSize * 2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("AsyncThread-");
        
        // 適切な拒否ポリシーの設定
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        
        return executor;
    }
    
    @Bean
    public Executor ioExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        
        // I/O処理用の設定
        executor.setCorePoolSize(20);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(1000);
        executor.setThreadNamePrefix("IOThread-");
        
        return executor;
    }
}

4.2 バッチサイズの最適化

// バッチ処理の最適化例
@Service
public class OptimizedBatchService {
    private static final int OPTIMAL_BATCH_SIZE = 1000;
    private final ExecutorService executor = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors()
    );
    
    public List<ProcessResult> processBatch(List<ProcessRequest> requests) {
        return Lists.partition(requests, OPTIMAL_BATCH_SIZE).stream()
            .map(batch -> executor.submit(() -> processBatchInternal(batch)))
            .map(this::getFutureResult)
            .flatMap(List::stream)
            .collect(Collectors.toList());
    }
    
    private List<ProcessResult> processBatchInternal(List<ProcessRequest> batch) {
        return batch.stream()
            .map(this::processRequest)
            .collect(Collectors.toList());
    }
    
    private ProcessResult processRequest(ProcessRequest request) {
        // バッチ内の個別処理
        return new ProcessResult("処理完了: " + request.getId());
    }
    
    private List<ProcessResult> getFutureResult(Future<List<ProcessResult>> future) {
        try {
            return future.get(1, TimeUnit.MINUTES);
        } catch (Exception e) {
            throw new ProcessingException("バッチ処理の取得に失敗しました", e);
        }
    }
}

4.3 メモリ管理の最適化

// メモリ使用の最適化例
@Service
public class MemoryOptimizedService {
    private final BlockingQueue<ProcessRequest> requestQueue = 
        new LinkedBlockingQueue<>(1000);
    
    public void submitRequest(ProcessRequest request) {
        if (!requestQueue.offer(request)) {
            throw new ProcessingException("キューが満杯です");
        }
    }
    
    @Scheduled(fixedRate = 1000)
    public void processQueuedRequests() {
        List<ProcessRequest> batch = new ArrayList<>();
        requestQueue.drainTo(batch, 100);
        
        if (!batch.isEmpty()) {
            processBatch(batch);
        }
    }
    
    private void processBatch(List<ProcessRequest> batch) {
        try (Stream<ProcessRequest> stream = batch.stream()) {
            stream
                .map(this::processRequest)
                .forEach(this::saveResult);
        }
    }
    
    private ProcessResult processRequest(ProcessRequest request) {
        // メモリ効率の良い処理
        return new ProcessResult("処理完了: " + request.getId());
    }
    
    private void saveResult(ProcessResult result) {
        // 結果の永続化
    }
}

5. エラーハンドリングとリカバリ

5.1 リトライ処理

// リトライ処理の実装例
@Service
public class RetryService {
    private final RetryTemplate retryTemplate;
    
    public RetryService() {
        this.retryTemplate = createRetryTemplate();
    }
    
    private RetryTemplate createRetryTemplate() {
        RetryTemplate template = new RetryTemplate();
        
        // リトライポリシーの設定
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2);
        backOffPolicy.setMaxInterval(10000);
        
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        
        template.setBackOffPolicy(backOffPolicy);
        template.setRetryPolicy(retryPolicy);
        
        return template;
    }
    
    public ProcessResult processWithRetry(ProcessRequest request) {
        return retryTemplate.execute(context -> {
            try {
                return processRequest(request);
            } catch (Exception e) {
                if (isRetryable(e)) {
                    throw e;
                } else {
                    throw new NonRetryableException("リトライ不可能なエラー", e);
                }
            }
        });
    }
    
    private boolean isRetryable(Exception e) {
        return e instanceof TimeoutException ||
               e instanceof TransientException;
    }
}

5.2 サーキットブレーカー

// サーキットブレーカーの実装例
@Service
public class CircuitBreakerService {
    private final CircuitBreaker circuitBreaker;
    
    public CircuitBreakerService() {
        this.circuitBreaker = createCircuitBreaker();
    }
    
    private CircuitBreaker createCircuitBreaker() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)
            .waitDurationInOpenState(Duration.ofSeconds(60))
            .ringBufferSizeInHalfOpenState(10)
            .ringBufferSizeInClosedState(100)
            .build();
        
        return CircuitBreaker.of("processService", config);
    }
    
    public ProcessResult processWithCircuitBreaker(ProcessRequest request) {
        return Try.of(() -> 
            circuitBreaker.executeSupplier(() -> processRequest(request))
        )
        .recover(CircuitBreakerOpenException.class, e -> 
            handleCircuitBreakerOpen(request)
        )
        .get();
    }
    
    private ProcessResult handleCircuitBreakerOpen(ProcessRequest request) {
        // フォールバック処理
        return new ProcessResult("サーキットブレーカー作動中: " + request.getId());
    }
}

5.3 デッドレター処理

// デッドレター処理の実装例
@Service
public class DeadLetterService {
    private final Queue<ProcessRequest> deadLetterQueue = 
        new ConcurrentLinkedQueue<>();
    private final int maxRetries = 3;
    
    public void handleFailedRequest(ProcessRequest request, Exception e) {
        DeadLetterEntry entry = new DeadLetterEntry(request, e);
        deadLetterQueue.offer(entry);
        logFailedRequest(entry);
    }
    
    @Scheduled(fixedRate = 300000) // 5分ごとに実行
    public void processDeadLetters() {
        List<DeadLetterEntry> entries = new ArrayList<>();
        deadLetterQueue.drainTo(entries);
        
        for (DeadLetterEntry entry : entries) {
            if (entry.getRetryCount() < maxRetries) {
                retryProcessing(entry);
            } else {
                handlePermanentFailure(entry);
            }
        }
    }
    
    private void retryProcessing(DeadLetterEntry entry) {
        try {
            ProcessResult result = processRequest(entry.getRequest());
            handleSuccessfulRetry(entry, result);
        } catch (Exception e) {
            entry.incrementRetryCount();
            if (entry.getRetryCount() < maxRetries) {
                deadLetterQueue.offer(entry);
            } else {
                handlePermanentFailure(entry);
            }
        }
    }
    
    private void handlePermanentFailure(DeadLetterEntry entry) {
        // 永続的な失敗の処理
        // 例: 管理者への通知、エラーログへの記録など
    }
}

6. モニタリングと運用管理

6.1 メトリクス収集

// メトリクス収集の実装例
@Service
public class MetricsService {
    private final MeterRegistry registry;
    
    public MetricsService(MeterRegistry registry) {
        this.registry = registry;
    }
    
    public void recordProcessingTime(String processName, long timeMs) {
        Timer timer = registry.timer("process.execution.time", 
            "process", processName);
        timer.record(timeMs, TimeUnit.MILLISECONDS);
    }
    
    public void incrementProcessCount(String processName, String status) {
        Counter counter = registry.counter("process.count",
            "process", processName,
            "status", status);
        counter.increment();
    }
    
    public void recordQueueSize(String queueName, int size) {
        Gauge.builder("queue.size", size)
            .tag("queue", queueName)
            .description("現在のキューサイズ")
            .register(registry);
    }
}

6.2 ログ管理

// ログ管理の実装例
@Aspect
@Component
public class ProcessingLogger {
    private static final Logger logger = 
        LoggerFactory.getLogger(ProcessingLogger.class);
    
    @Around("execution(* com.example.service.*Service.*(..))")
    public Object logProcessing(ProceedingJoinPoint joinPoint) throws Throwable {
        String methodName = joinPoint.getSignature().getName();
        String className = joinPoint.getTarget().getClass().getSimpleName();
        
        MDC.put("className", className);
        MDC.put("methodName", methodName);
        MDC.put("requestId", generateRequestId());
        
        try {
            logger.info("処理開始: {}.{}", className, methodName);
            long startTime = System.currentTimeMillis();
            
            Object result = joinPoint.proceed();
            
            long endTime = System.currentTimeMillis();
            logger.info("処理完了: {}.{} (所要時間: {}ms)",
                className, methodName, endTime - startTime);
            
            return result;
        } catch (Exception e) {
            logger.error("処理エラー: {}.{}", className, methodName, e);
            throw e;
        } finally {
            MDC.clear();
        }
    }
    
    private String generateRequestId() {
        return UUID.randomUUID().toString();
    }
}

6.3 アラート設定

// アラート設定の実装例
@Service
public class AlertService {
    private final MetricsService metricsService;
    private final NotificationService notificationService;
    
    @Scheduled(fixedRate = 60000) // 1分ごとに実行
    public void checkMetrics() {
        // キューサイズの監視
        checkQueueSize();
        
        // エラー率の監視
        checkErrorRate();
        
        // 処理時間の監視
        checkProcessingTime();
    }
    
    private void checkQueueSize() {
        int queueSize = getQueueSize();
        if (queueSize > 1000) {
            sendAlert(AlertLevel.WARNING,
                "キューサイズが閾値を超えています",
                Map.of("queueSize", queueSize));
        }
    }
    
    private void checkErrorRate() {
        double errorRate = calculateErrorRate();
        if (errorRate > 0.05) { // 5%以上
            sendAlert(AlertLevel.ERROR,
                "エラー率が高くなっています",
                Map.of("errorRate", errorRate));
        }
    }
    
    private void checkProcessingTime() {
        long avgProcessingTime = calculateAverageProcessingTime();
        if (avgProcessingTime > 5000) { // 5秒以上
            sendAlert(AlertLevel.WARNING,
                "処理時間が長くなっています",
                Map.of("avgProcessingTime", avgProcessingTime));
        }
    }
    
    private void sendAlert(AlertLevel level, String message, Map<String, Object> details) {
        Alert alert = new Alert(level, message, details);
        notificationService.sendAlert(alert);
    }
}

7. まとめ

非同期処理と並列処理は、現代の業務システム開発において不可欠な要素となっています。以下のポイントを押さえることで、効果的な実装が可能となります:

7.1 設計のポイント

  1. 適切なパターンの選択

    • 処理の特性に応じたパターンの選択
    • スケーラビリティの考慮
    • エラーハンドリングの設計
  2. パフォーマンスの最適化

    • スレッドプールの適切な設定
    • バッチサイズの調整
    • メモリ使用の効率化
  3. 運用管理の考慮

    • モニタリング体制の整備
    • ログ管理の実装
    • アラート設定の構築

7.2 実装時の注意点

  1. リソース管理

    • スレッドプールのサイズ設定
    • メモリ使用量の監視
    • コネクションプールの管理
  2. エラーハンドリング

    • 適切なリトライ戦略
    • デッドレター処理
    • フォールバック処理
  3. 監視と運用

    • パフォーマンスメトリクスの収集
    • ログの適切な設計
    • アラートの設定

参考文献

  1. 「Java並行処理プログラミング」- Brian Goetz
  2. 「Reactive Programming with RxJava」- Tomasz Nurkiewicz
  3. 「実践アプリケーションパフォーマンス」- Scott Oaks
  4. 「Patterns of Enterprise Application Architecture」- Martin Fowler

非同期処理と並列処理は、適切に実装することで大幅なパフォーマンス向上が期待できます。一方で、複雑性も増すため、十分な設計と検証が必要です。本記事で紹介した実装パターンやベストプラクティスを参考に、プロジェクトの要件に合わせた最適な実装を選択してください。

関連記事

2025/3/25

【「動作保証はどこまで?」SIerのためのシステム保守の基本】

SIerエンジニアのためのシステム保守ガイド。業務システムの保守範囲の定義から具体的な保守活動まで、実践的なアプローチを解説します。

2025/3/24

【SIerが知るべきログ設計のベストプラクティス】

SIerエンジニアのためのログ設計ガイド。業務システムにおける効果的なログ設計から運用管理まで、実践的なベストプラクティスを解説します。

2025/3/23

【長年運用されている業務システムの"負債"とどう向き合うか?】

SIerエンジニアのための技術的負債管理ガイド。長年運用されてきた業務システムの負債を理解し、効果的に管理・改善していくための実践的なアプローチを解説します。