森 宝松 SIer Tech Blog

【SIerのためのバッチ処理設計パターンと実装のベストプラクティス】

森 宝松
SIer Tech Blog
2025年3月20日

【SIerのためのバッチ処理設計パターンと実装のベストプラクティス】

業務システムにおいて、バッチ処理は重要な役割を果たしています。本記事では、SIerエンジニアのためのバッチ処理設計パターンと実装のベストプラクティスについて、具体例を交えながら解説します。

1. バッチ処理の基本設計

1.1 バッチ処理の特徴と要件

バッチ処理には以下のような特徴があります:

  1. 定期実行:日次、月次などの定期的な実行
  2. 大量データ処理:大量のデータを一括で処理
  3. 無人運転:自動実行が基本
  4. リカバリ要件:障害時の再実行への対応
  5. トレーサビリティ:処理結果の追跡可能性

1.2 基本的な設計指針

// バッチ処理の基本構造
public class BatchProcessor {
    private final JobLogger logger;
    private final TransactionManager txManager;
    
    public BatchResult execute(BatchContext context) {
        try {
            // 前処理
            preProcess(context);
            
            // メイン処理
            mainProcess(context);
            
            // 後処理
            postProcess(context);
            
            return BatchResult.success();
            
        } catch (Exception e) {
            logger.error("バッチ処理でエラーが発生しました", e);
            return BatchResult.error(e);
        }
    }
    
    private void preProcess(BatchContext context) {
        // 入力チェック
        validateInput(context);
        
        // 実行環境チェック
        checkEnvironment();
        
        // 前回実行結果の確認
        checkPreviousExecution();
    }
    
    private void mainProcess(BatchContext context) {
        // トランザクション制御
        txManager.begin();
        try {
            // メイン処理の実行
            processData(context);
            
            txManager.commit();
        } catch (Exception e) {
            txManager.rollback();
            throw e;
        }
    }
    
    private void postProcess(BatchContext context) {
        // 実行結果の記録
        saveExecutionResult();
        
        // 一時ファイルの削除
        cleanupTemporaryFiles();
    }
}

2. 主要なバッチ処理パターン

2.1 Extract-Transform-Load (ETL) パターン

// ETLパターンの実装例
public class EtlBatchProcessor {
    private final DataExtractor extractor;
    private final DataTransformer transformer;
    private final DataLoader loader;
    
    public void execute() {
        // データ抽出
        List<SourceData> sourceData = extractor.extract();
        
        // データ変換
        List<TargetData> transformedData = transformer.transform(sourceData);
        
        // データロード
        loader.load(transformedData);
    }
}

// データ抽出
public class DataExtractor {
    public List<SourceData> extract() {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT * FROM source_table WHERE process_flag = 0")) {
            
            List<SourceData> result = new ArrayList<>();
            ResultSet rs = stmt.executeQuery();
            while (rs.next()) {
                result.add(mapToSourceData(rs));
            }
            return result;
        }
    }
}

// データ変換
public class DataTransformer {
    public List<TargetData> transform(List<SourceData> sourceData) {
        return sourceData.stream()
            .map(this::convertToTargetData)
            .collect(Collectors.toList());
    }
    
    private TargetData convertToTargetData(SourceData source) {
        // ビジネスロジックに基づくデータ変換
        return new TargetData(/* 変換ロジック */);
    }
}

// データロード
public class DataLoader {
    public void load(List<TargetData> data) {
        try (Connection conn = dataSource.getConnection()) {
            conn.setAutoCommit(false);
            try (PreparedStatement stmt = conn.prepareStatement(
                "INSERT INTO target_table (col1, col2) VALUES (?, ?)")) {
                
                for (TargetData item : data) {
                    stmt.setString(1, item.getCol1());
                    stmt.setString(2, item.getCol2());
                    stmt.addBatch();
                }
                
                stmt.executeBatch();
                conn.commit();
            } catch (Exception e) {
                conn.rollback();
                throw e;
            }
        }
    }
}

2.2 チャンク処理パターン

// チャンク処理パターンの実装例
public class ChunkProcessor<T> {
    private final int chunkSize;
    private final DataReader<T> reader;
    private final DataProcessor<T> processor;
    private final DataWriter<T> writer;
    
    public void execute() {
        List<T> items = new ArrayList<>();
        
        while (reader.hasNext()) {
            T item = reader.read();
            items.add(item);
            
            if (items.size() >= chunkSize) {
                processChunk(items);
                items.clear();
            }
        }
        
        // 残りのアイテムを処理
        if (!items.isEmpty()) {
            processChunk(items);
        }
    }
    
    private void processChunk(List<T> items) {
        // チャンク単位でのトランザクション制御
        TransactionStatus status = transactionManager.beginTransaction();
        try {
            // データ処理
            List<T> processedItems = items.stream()
                .map(processor::process)
                .collect(Collectors.toList());
            
            // 一括書き込み
            writer.writeAll(processedItems);
            
            transactionManager.commit(status);
        } catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }
}

// Spring Batchを使用したチャンク処理の例
@Configuration
public class ChunkJobConfig {
    @Bean
    public Job chunkJob(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("chunkJob")
            .start(chunkStep(steps))
            .build();
    }
    
    @Bean
    public Step chunkStep(StepBuilderFactory steps) {
        return steps.get("chunkStep")
            .<InputData, OutputData>chunk(100)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
    }
}

2.3 マルチスレッド処理パターン

// マルチスレッド処理パターンの実装例
public class ParallelBatchProcessor {
    private final ExecutorService executor;
    private final int threadCount;
    private final DataPartitioner partitioner;
    private final BatchTaskProcessor processor;
    
    public void execute() {
        // データの分割
        List<DataChunk> chunks = partitioner.partition();
        
        // 並列処理の実行
        List<Future<ProcessResult>> futures = new ArrayList<>();
        for (DataChunk chunk : chunks) {
            futures.add(executor.submit(() -> processor.process(chunk)));
        }
        
        // 結果の集約
        List<ProcessResult> results = new ArrayList<>();
        for (Future<ProcessResult> future : futures) {
            try {
                results.add(future.get());
            } catch (Exception e) {
                throw new BatchProcessingException("並列処理でエラーが発生しました", e);
            }
        }
        
        // 結果の集計
        aggregateResults(results);
    }
}

// Spring Batchを使用した並列処理の例
@Configuration
public class ParallelJobConfig {
    @Bean
    public Job parallelJob(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("parallelJob")
            .start(parallelStep(steps))
            .build();
    }
    
    @Bean
    public Step parallelStep(StepBuilderFactory steps) {
        return steps.get("parallelStep")
            .<InputData, OutputData>chunk(100)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .taskExecutor(taskExecutor())
            .build();
    }
    
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.setQueueCapacity(100);
        return executor;
    }
}

3. エラーハンドリングとリカバリ

3.1 エラーハンドリングの基本パターン

// エラーハンドリングの実装例
public class ErrorHandlingBatchProcessor {
    private final RetryTemplate retryTemplate;
    private final ErrorHandler errorHandler;
    
    public void execute() {
        try {
            retryTemplate.execute(context -> {
                try {
                    // メイン処理
                    processData();
                    return BatchResult.success();
                    
                } catch (RecoverableException e) {
                    // リトライ可能なエラー
                    logger.warn("リトライ可能なエラーが発生しました", e);
                    throw e;
                    
                } catch (Exception e) {
                    // リトライ不可能なエラー
                    logger.error("致命的なエラーが発生しました", e);
                    errorHandler.handleError(e);
                    return BatchResult.error(e);
                }
            });
            
        } catch (Exception e) {
            // リトライ失敗
            logger.error("リトライ処理が失敗しました", e);
            errorHandler.handleRetryFailure(e);
            throw new BatchProcessingException("バッチ処理が失敗しました", e);
        }
    }
}

// リトライポリシーの設定
@Configuration
public class RetryConfig {
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate template = new RetryTemplate();
        
        // リトライポリシーの設定
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        
        // バックオフポリシーの設定
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2);
        
        template.setRetryPolicy(retryPolicy);
        template.setBackOffPolicy(backOffPolicy);
        
        return template;
    }
}

3.2 リカバリ処理の実装

// リカバリ処理の実装例
public class RecoverableBatchProcessor {
    private final CheckpointManager checkpointManager;
    
    public void execute() {
        // チェックポイントの読み込み
        Checkpoint checkpoint = checkpointManager.loadLastCheckpoint();
        
        try {
            // チェックポイントからの再開
            if (checkpoint != null) {
                restoreFromCheckpoint(checkpoint);
            }
            
            // メイン処理
            while (hasMoreData()) {
                processNextBatch();
                
                // チェックポイントの保存
                checkpointManager.saveCheckpoint(createCheckpoint());
            }
            
        } catch (Exception e) {
            // エラー発生時の処理
            handleError(e, checkpoint);
        }
    }
    
    private void restoreFromCheckpoint(Checkpoint checkpoint) {
        // 処理状態の復元
        currentPosition = checkpoint.getPosition();
        processedCount = checkpoint.getProcessedCount();
    }
    
    private Checkpoint createCheckpoint() {
        return new Checkpoint(
            currentPosition,
            processedCount,
            new Date()
        );
    }
}

// チェックポイント管理
public class CheckpointManager {
    private final JdbcTemplate jdbcTemplate;
    
    public void saveCheckpoint(Checkpoint checkpoint) {
        jdbcTemplate.update(
            "INSERT INTO batch_checkpoints (batch_id, position, processed_count, checkpoint_time) " +
            "VALUES (?, ?, ?, ?)",
            checkpoint.getBatchId(),
            checkpoint.getPosition(),
            checkpoint.getProcessedCount(),
            checkpoint.getCheckpointTime()
        );
    }
    
    public Checkpoint loadLastCheckpoint() {
        return jdbcTemplate.queryForObject(
            "SELECT * FROM batch_checkpoints WHERE batch_id = ? " +
            "ORDER BY checkpoint_time DESC LIMIT 1",
            new CheckpointRowMapper(),
            currentBatchId
        );
    }
}

4. パフォーマンスチューニング

4.1 データベースアクセスの最適化

// バルクインサートの実装例
public class BulkInsertProcessor {
    private final JdbcTemplate jdbcTemplate;
    private final int batchSize;
    
    public void bulkInsert(List<Data> dataList) {
        jdbcTemplate.batchUpdate(
            "INSERT INTO target_table (col1, col2, col3) VALUES (?, ?, ?)",
            new BatchPreparedStatementSetter() {
                @Override
                public void setValues(PreparedStatement ps, int i) throws SQLException {
                    Data data = dataList.get(i);
                    ps.setString(1, data.getCol1());
                    ps.setString(2, data.getCol2());
                    ps.setInt(3, data.getCol3());
                }
                
                @Override
                public int getBatchSize() {
                    return dataList.size();
                }
            }
        );
    }
}

// インデックス管理
public class IndexManager {
    public void optimizeIndexes() {
        // 処理前にインデックスを削除
        dropIndexes();
        
        try {
            // バッチ処理の実行
            executeBatchProcess();
            
        } finally {
            // 処理後にインデックスを再作成
            recreateIndexes();
        }
    }
    
    private void dropIndexes() {
        jdbcTemplate.execute("DROP INDEX idx_target_table_col1");
        jdbcTemplate.execute("DROP INDEX idx_target_table_col2");
    }
    
    private void recreateIndexes() {
        jdbcTemplate.execute("CREATE INDEX idx_target_table_col1 ON target_table(col1)");
        jdbcTemplate.execute("CREATE INDEX idx_target_table_col2 ON target_table(col2)");
    }
}

4.2 メモリ管理の最適化

// メモリ効率の良いデータ処理
public class MemoryEfficientProcessor {
    private final int bufferSize;
    private final DataReader reader;
    private final DataWriter writer;
    
    public void process() {
        try (Stream<Data> dataStream = reader.readAsStream()) {
            dataStream
                .filter(this::isValid)
                .map(this::transform)
                .forEach(writer::write);
        }
    }
}

// カーソル処理の実装
public class CursorBasedProcessor {
    public void processCursor() {
        try (Connection conn = dataSource.getConnection();
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT * FROM source_table",
                 ResultSet.TYPE_FORWARD_ONLY,
                 ResultSet.CONCUR_READ_ONLY)) {
            
            stmt.setFetchSize(1000);
            ResultSet rs = stmt.executeQuery();
            
            while (rs.next()) {
                processRow(rs);
            }
        }
    }
}

5. 運用管理と監視

5.1 ジョブ管理

// ジョブ管理の実装例
public class JobManager {
    private final JobRepository repository;
    private final JobMonitor monitor;
    
    public JobExecution startJob(JobParameters params) {
        // ジョブの開始
        JobExecution execution = repository.createJobExecution(params);
        
        try {
            // ジョブの実行
            execution.setStatus(BatchStatus.STARTED);
            execution.setStartTime(new Date());
            
            Job job = getJob(params.getJobName());
            JobResult result = job.execute(params);
            
            // 実行結果の記録
            execution.setStatus(result.isSuccess() ? 
                BatchStatus.COMPLETED : BatchStatus.FAILED);
            execution.setEndTime(new Date());
            
        } catch (Exception e) {
            // エラー処理
            execution.setStatus(BatchStatus.FAILED);
            execution.setEndTime(new Date());
            execution.addFailureException(e);
        }
        
        // 実行結果の保存
        repository.update(execution);
        
        return execution;
    }
}

// ジョブスケジューラ
@Configuration
public class JobSchedulerConfig {
    @Bean
    public JobDetail sampleJobDetail() {
        return JobBuilder.newJob(SampleJob.class)
            .withIdentity("sampleJob")
            .storeDurably()
            .build();
    }
    
    @Bean
    public Trigger sampleJobTrigger() {
        return TriggerBuilder.newTrigger()
            .forJob(sampleJobDetail())
            .withIdentity("sampleTrigger")
            .withSchedule(CronScheduleBuilder.dailyAtHourAndMinute(1, 0))
            .build();
    }
}

5.2 監視と通知

// バッチ監視の実装例
public class BatchMonitor {
    private final AlertNotifier notifier;
    private final MetricsCollector metricsCollector;
    
    public void monitorExecution(JobExecution execution) {
        // メトリクスの収集
        metricsCollector.recordExecutionTime(
            execution.getJobName(),
            execution.getEndTime().getTime() - execution.getStartTime().getTime()
        );
        
        metricsCollector.recordProcessedCount(
            execution.getJobName(),
            execution.getProcessedCount()
        );
        
        // 異常検知
        if (execution.getStatus() == BatchStatus.FAILED) {
            notifier.sendAlert(
                new Alert(
                    AlertLevel.ERROR,
                    "バッチ処理が失敗しました",
                    execution.getFailureExceptions()
                )
            );
        }
        
        // 処理時間の監視
        if (isProcessingTimeTooLong(execution)) {
            notifier.sendAlert(
                new Alert(
                    AlertLevel.WARNING,
                    "バッチ処理に時間がかかっています",
                    execution.getExecutionContext()
                )
            );
        }
    }
}

// メトリクス収集
public class MetricsCollector {
    private final MeterRegistry registry;
    
    public void recordExecutionTime(String jobName, long executionTime) {
        registry.timer("batch.execution.time", "job", jobName)
            .record(executionTime, TimeUnit.MILLISECONDS);
    }
    
    public void recordProcessedCount(String jobName, long count) {
        registry.counter("batch.processed.count", "job", jobName)
            .increment(count);
    }
}

6. テストと品質保証

6.1 単体テスト

// バッチ処理の単体テスト
@SpringBootTest
public class BatchProcessorTest {
    @Autowired
    private BatchProcessor processor;
    
    @Test
    public void testNormalProcessing() {
        // テストデータの準備
        BatchContext context = new BatchContext();
        context.setInputData(createTestData());
        
        // 実行
        BatchResult result = processor.execute(context);
        
        // 検証
        assertThat(result.isSuccess()).isTrue();
        assertThat(result.getProcessedCount()).isEqualTo(100);
        verifyOutputData();
    }
    
    @Test
    public void testErrorHandling() {
        // エラーケースのテストデータ準備
        BatchContext context = new BatchContext();
        context.setInputData(createErrorData());
        
        // 実行
        BatchResult result = processor.execute(context);
        
        // 検証
        assertThat(result.isSuccess()).isFalse();
        assertThat(result.getError()).isInstanceOf(ValidationException.class);
    }
}

// Spring Batchのテスト
@SpringBatchTest
public class SpringBatchJobTest {
    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;
    
    @Test
    public void testJob() throws Exception {
        // ジョブパラメータの設定
        JobParameters params = new JobParametersBuilder()
            .addString("inputFile", "test-data.csv")
            .addDate("executionDate", new Date())
            .toJobParameters();
        
        // ジョブの実行
        JobExecution jobExecution = jobLauncherTestUtils.launchJob(params);
        
        // 結果の検証
        assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        verifyJobResults();
    }
}

6.2 統合テスト

// 統合テストの実装例
@SpringBootTest
public class BatchIntegrationTest {
    @Autowired
    private JobLauncher jobLauncher;
    
    @Autowired
    private Job batchJob;
    
    @Autowired
    private DataSource dataSource;
    
    @Test
    public void testEndToEnd() throws Exception {
        // テストデータのセットアップ
        setupTestData();
        
        // ジョブの実行
        JobParameters params = new JobParametersBuilder()
            .addDate("date", new Date())
            .toJobParameters();
        
        JobExecution execution = jobLauncher.run(batchJob, params);
        
        // 結果の検証
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        
        // データベースの状態を検証
        try (Connection conn = dataSource.getConnection()) {
            try (PreparedStatement stmt = conn.prepareStatement(
                "SELECT COUNT(*) FROM target_table WHERE process_date = ?")) {
                
                stmt.setDate(1, new java.sql.Date(System.currentTimeMillis()));
                ResultSet rs = stmt.executeQuery();
                
                assertThat(rs.next()).isTrue();
                assertThat(rs.getInt(1)).isEqualTo(expectedCount);
            }
        }
    }
}

7. まとめ

バッチ処理の設計と実装において、以下の点に注意を払うことが重要です:

  1. 基本設計

    • 処理の分割と責務の明確化
    • エラーハンドリングの考慮
    • リカバリ機能の実装
  2. パフォーマンス

    • 適切なバッチサイズの選定
    • メモリ使用量の最適化
    • データベースアクセスの効率化
  3. 運用管理

    • ジョブ管理の仕組み
    • 監視と通知の実装
    • メンテナンス性の確保
  4. 品質保証

    • 単体テストの充実
    • 統合テストによる検証
    • 性能テストの実施

参考文献

  1. 「実践 Spring Batch」- Michael Minella
  2. 「バッチ処理システム設計の実践」- 技術評論社
  3. 「Javaバッチ処理プログラミング」- 翔泳社
  4. 「エンタープライズアプリケーションアーキテクチャパターン」- Martin Fowler

バッチ処理の設計と実装は、業務システムの重要な部分を占めています。本記事で紹介したパターンとベストプラクティスを参考に、効率的で保守性の高いバッチ処理システムを構築してください。

関連記事

2025/3/25

【「動作保証はどこまで?」SIerのためのシステム保守の基本】

SIerエンジニアのためのシステム保守ガイド。業務システムの保守範囲の定義から具体的な保守活動まで、実践的なアプローチを解説します。

2025/3/24

【SIerが知るべきログ設計のベストプラクティス】

SIerエンジニアのためのログ設計ガイド。業務システムにおける効果的なログ設計から運用管理まで、実践的なベストプラクティスを解説します。

2025/3/23

【長年運用されている業務システムの"負債"とどう向き合うか?】

SIerエンジニアのための技術的負債管理ガイド。長年運用されてきた業務システムの負債を理解し、効果的に管理・改善していくための実践的なアプローチを解説します。