【SIerのためのバッチ処理設計パターンと実装のベストプラクティス】

SIer Tech Blog
2025年3月20日
【SIerのためのバッチ処理設計パターンと実装のベストプラクティス】
業務システムにおいて、バッチ処理は重要な役割を果たしています。本記事では、SIerエンジニアのためのバッチ処理設計パターンと実装のベストプラクティスについて、具体例を交えながら解説します。
1. バッチ処理の基本設計
1.1 バッチ処理の特徴と要件
バッチ処理には以下のような特徴があります:
- 定期実行:日次、月次などの定期的な実行
- 大量データ処理:大量のデータを一括で処理
- 無人運転:自動実行が基本
- リカバリ要件:障害時の再実行への対応
- トレーサビリティ:処理結果の追跡可能性
1.2 基本的な設計指針
// バッチ処理の基本構造
public class BatchProcessor {
private final JobLogger logger;
private final TransactionManager txManager;
public BatchResult execute(BatchContext context) {
try {
// 前処理
preProcess(context);
// メイン処理
mainProcess(context);
// 後処理
postProcess(context);
return BatchResult.success();
} catch (Exception e) {
logger.error("バッチ処理でエラーが発生しました", e);
return BatchResult.error(e);
}
}
private void preProcess(BatchContext context) {
// 入力チェック
validateInput(context);
// 実行環境チェック
checkEnvironment();
// 前回実行結果の確認
checkPreviousExecution();
}
private void mainProcess(BatchContext context) {
// トランザクション制御
txManager.begin();
try {
// メイン処理の実行
processData(context);
txManager.commit();
} catch (Exception e) {
txManager.rollback();
throw e;
}
}
private void postProcess(BatchContext context) {
// 実行結果の記録
saveExecutionResult();
// 一時ファイルの削除
cleanupTemporaryFiles();
}
}
2. 主要なバッチ処理パターン
2.1 Extract-Transform-Load (ETL) パターン
// ETLパターンの実装例
public class EtlBatchProcessor {
private final DataExtractor extractor;
private final DataTransformer transformer;
private final DataLoader loader;
public void execute() {
// データ抽出
List<SourceData> sourceData = extractor.extract();
// データ変換
List<TargetData> transformedData = transformer.transform(sourceData);
// データロード
loader.load(transformedData);
}
}
// データ抽出
public class DataExtractor {
public List<SourceData> extract() {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"SELECT * FROM source_table WHERE process_flag = 0")) {
List<SourceData> result = new ArrayList<>();
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
result.add(mapToSourceData(rs));
}
return result;
}
}
}
// データ変換
public class DataTransformer {
public List<TargetData> transform(List<SourceData> sourceData) {
return sourceData.stream()
.map(this::convertToTargetData)
.collect(Collectors.toList());
}
private TargetData convertToTargetData(SourceData source) {
// ビジネスロジックに基づくデータ変換
return new TargetData(/* 変換ロジック */);
}
}
// データロード
public class DataLoader {
public void load(List<TargetData> data) {
try (Connection conn = dataSource.getConnection()) {
conn.setAutoCommit(false);
try (PreparedStatement stmt = conn.prepareStatement(
"INSERT INTO target_table (col1, col2) VALUES (?, ?)")) {
for (TargetData item : data) {
stmt.setString(1, item.getCol1());
stmt.setString(2, item.getCol2());
stmt.addBatch();
}
stmt.executeBatch();
conn.commit();
} catch (Exception e) {
conn.rollback();
throw e;
}
}
}
}
2.2 チャンク処理パターン
// チャンク処理パターンの実装例
public class ChunkProcessor<T> {
private final int chunkSize;
private final DataReader<T> reader;
private final DataProcessor<T> processor;
private final DataWriter<T> writer;
public void execute() {
List<T> items = new ArrayList<>();
while (reader.hasNext()) {
T item = reader.read();
items.add(item);
if (items.size() >= chunkSize) {
processChunk(items);
items.clear();
}
}
// 残りのアイテムを処理
if (!items.isEmpty()) {
processChunk(items);
}
}
private void processChunk(List<T> items) {
// チャンク単位でのトランザクション制御
TransactionStatus status = transactionManager.beginTransaction();
try {
// データ処理
List<T> processedItems = items.stream()
.map(processor::process)
.collect(Collectors.toList());
// 一括書き込み
writer.writeAll(processedItems);
transactionManager.commit(status);
} catch (Exception e) {
transactionManager.rollback(status);
throw e;
}
}
}
// Spring Batchを使用したチャンク処理の例
@Configuration
public class ChunkJobConfig {
@Bean
public Job chunkJob(JobBuilderFactory jobs, StepBuilderFactory steps) {
return jobs.get("chunkJob")
.start(chunkStep(steps))
.build();
}
@Bean
public Step chunkStep(StepBuilderFactory steps) {
return steps.get("chunkStep")
.<InputData, OutputData>chunk(100)
.reader(reader())
.processor(processor())
.writer(writer())
.build();
}
}
2.3 マルチスレッド処理パターン
// マルチスレッド処理パターンの実装例
public class ParallelBatchProcessor {
private final ExecutorService executor;
private final int threadCount;
private final DataPartitioner partitioner;
private final BatchTaskProcessor processor;
public void execute() {
// データの分割
List<DataChunk> chunks = partitioner.partition();
// 並列処理の実行
List<Future<ProcessResult>> futures = new ArrayList<>();
for (DataChunk chunk : chunks) {
futures.add(executor.submit(() -> processor.process(chunk)));
}
// 結果の集約
List<ProcessResult> results = new ArrayList<>();
for (Future<ProcessResult> future : futures) {
try {
results.add(future.get());
} catch (Exception e) {
throw new BatchProcessingException("並列処理でエラーが発生しました", e);
}
}
// 結果の集計
aggregateResults(results);
}
}
// Spring Batchを使用した並列処理の例
@Configuration
public class ParallelJobConfig {
@Bean
public Job parallelJob(JobBuilderFactory jobs, StepBuilderFactory steps) {
return jobs.get("parallelJob")
.start(parallelStep(steps))
.build();
}
@Bean
public Step parallelStep(StepBuilderFactory steps) {
return steps.get("parallelStep")
.<InputData, OutputData>chunk(100)
.reader(reader())
.processor(processor())
.writer(writer())
.taskExecutor(taskExecutor())
.build();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setQueueCapacity(100);
return executor;
}
}
3. エラーハンドリングとリカバリ
3.1 エラーハンドリングの基本パターン
// エラーハンドリングの実装例
public class ErrorHandlingBatchProcessor {
private final RetryTemplate retryTemplate;
private final ErrorHandler errorHandler;
public void execute() {
try {
retryTemplate.execute(context -> {
try {
// メイン処理
processData();
return BatchResult.success();
} catch (RecoverableException e) {
// リトライ可能なエラー
logger.warn("リトライ可能なエラーが発生しました", e);
throw e;
} catch (Exception e) {
// リトライ不可能なエラー
logger.error("致命的なエラーが発生しました", e);
errorHandler.handleError(e);
return BatchResult.error(e);
}
});
} catch (Exception e) {
// リトライ失敗
logger.error("リトライ処理が失敗しました", e);
errorHandler.handleRetryFailure(e);
throw new BatchProcessingException("バッチ処理が失敗しました", e);
}
}
}
// リトライポリシーの設定
@Configuration
public class RetryConfig {
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate template = new RetryTemplate();
// リトライポリシーの設定
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3);
// バックオフポリシーの設定
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2);
template.setRetryPolicy(retryPolicy);
template.setBackOffPolicy(backOffPolicy);
return template;
}
}
3.2 リカバリ処理の実装
// リカバリ処理の実装例
public class RecoverableBatchProcessor {
private final CheckpointManager checkpointManager;
public void execute() {
// チェックポイントの読み込み
Checkpoint checkpoint = checkpointManager.loadLastCheckpoint();
try {
// チェックポイントからの再開
if (checkpoint != null) {
restoreFromCheckpoint(checkpoint);
}
// メイン処理
while (hasMoreData()) {
processNextBatch();
// チェックポイントの保存
checkpointManager.saveCheckpoint(createCheckpoint());
}
} catch (Exception e) {
// エラー発生時の処理
handleError(e, checkpoint);
}
}
private void restoreFromCheckpoint(Checkpoint checkpoint) {
// 処理状態の復元
currentPosition = checkpoint.getPosition();
processedCount = checkpoint.getProcessedCount();
}
private Checkpoint createCheckpoint() {
return new Checkpoint(
currentPosition,
processedCount,
new Date()
);
}
}
// チェックポイント管理
public class CheckpointManager {
private final JdbcTemplate jdbcTemplate;
public void saveCheckpoint(Checkpoint checkpoint) {
jdbcTemplate.update(
"INSERT INTO batch_checkpoints (batch_id, position, processed_count, checkpoint_time) " +
"VALUES (?, ?, ?, ?)",
checkpoint.getBatchId(),
checkpoint.getPosition(),
checkpoint.getProcessedCount(),
checkpoint.getCheckpointTime()
);
}
public Checkpoint loadLastCheckpoint() {
return jdbcTemplate.queryForObject(
"SELECT * FROM batch_checkpoints WHERE batch_id = ? " +
"ORDER BY checkpoint_time DESC LIMIT 1",
new CheckpointRowMapper(),
currentBatchId
);
}
}
4. パフォーマンスチューニング
4.1 データベースアクセスの最適化
// バルクインサートの実装例
public class BulkInsertProcessor {
private final JdbcTemplate jdbcTemplate;
private final int batchSize;
public void bulkInsert(List<Data> dataList) {
jdbcTemplate.batchUpdate(
"INSERT INTO target_table (col1, col2, col3) VALUES (?, ?, ?)",
new BatchPreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps, int i) throws SQLException {
Data data = dataList.get(i);
ps.setString(1, data.getCol1());
ps.setString(2, data.getCol2());
ps.setInt(3, data.getCol3());
}
@Override
public int getBatchSize() {
return dataList.size();
}
}
);
}
}
// インデックス管理
public class IndexManager {
public void optimizeIndexes() {
// 処理前にインデックスを削除
dropIndexes();
try {
// バッチ処理の実行
executeBatchProcess();
} finally {
// 処理後にインデックスを再作成
recreateIndexes();
}
}
private void dropIndexes() {
jdbcTemplate.execute("DROP INDEX idx_target_table_col1");
jdbcTemplate.execute("DROP INDEX idx_target_table_col2");
}
private void recreateIndexes() {
jdbcTemplate.execute("CREATE INDEX idx_target_table_col1 ON target_table(col1)");
jdbcTemplate.execute("CREATE INDEX idx_target_table_col2 ON target_table(col2)");
}
}
4.2 メモリ管理の最適化
// メモリ効率の良いデータ処理
public class MemoryEfficientProcessor {
private final int bufferSize;
private final DataReader reader;
private final DataWriter writer;
public void process() {
try (Stream<Data> dataStream = reader.readAsStream()) {
dataStream
.filter(this::isValid)
.map(this::transform)
.forEach(writer::write);
}
}
}
// カーソル処理の実装
public class CursorBasedProcessor {
public void processCursor() {
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(
"SELECT * FROM source_table",
ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY)) {
stmt.setFetchSize(1000);
ResultSet rs = stmt.executeQuery();
while (rs.next()) {
processRow(rs);
}
}
}
}
5. 運用管理と監視
5.1 ジョブ管理
// ジョブ管理の実装例
public class JobManager {
private final JobRepository repository;
private final JobMonitor monitor;
public JobExecution startJob(JobParameters params) {
// ジョブの開始
JobExecution execution = repository.createJobExecution(params);
try {
// ジョブの実行
execution.setStatus(BatchStatus.STARTED);
execution.setStartTime(new Date());
Job job = getJob(params.getJobName());
JobResult result = job.execute(params);
// 実行結果の記録
execution.setStatus(result.isSuccess() ?
BatchStatus.COMPLETED : BatchStatus.FAILED);
execution.setEndTime(new Date());
} catch (Exception e) {
// エラー処理
execution.setStatus(BatchStatus.FAILED);
execution.setEndTime(new Date());
execution.addFailureException(e);
}
// 実行結果の保存
repository.update(execution);
return execution;
}
}
// ジョブスケジューラ
@Configuration
public class JobSchedulerConfig {
@Bean
public JobDetail sampleJobDetail() {
return JobBuilder.newJob(SampleJob.class)
.withIdentity("sampleJob")
.storeDurably()
.build();
}
@Bean
public Trigger sampleJobTrigger() {
return TriggerBuilder.newTrigger()
.forJob(sampleJobDetail())
.withIdentity("sampleTrigger")
.withSchedule(CronScheduleBuilder.dailyAtHourAndMinute(1, 0))
.build();
}
}
5.2 監視と通知
// バッチ監視の実装例
public class BatchMonitor {
private final AlertNotifier notifier;
private final MetricsCollector metricsCollector;
public void monitorExecution(JobExecution execution) {
// メトリクスの収集
metricsCollector.recordExecutionTime(
execution.getJobName(),
execution.getEndTime().getTime() - execution.getStartTime().getTime()
);
metricsCollector.recordProcessedCount(
execution.getJobName(),
execution.getProcessedCount()
);
// 異常検知
if (execution.getStatus() == BatchStatus.FAILED) {
notifier.sendAlert(
new Alert(
AlertLevel.ERROR,
"バッチ処理が失敗しました",
execution.getFailureExceptions()
)
);
}
// 処理時間の監視
if (isProcessingTimeTooLong(execution)) {
notifier.sendAlert(
new Alert(
AlertLevel.WARNING,
"バッチ処理に時間がかかっています",
execution.getExecutionContext()
)
);
}
}
}
// メトリクス収集
public class MetricsCollector {
private final MeterRegistry registry;
public void recordExecutionTime(String jobName, long executionTime) {
registry.timer("batch.execution.time", "job", jobName)
.record(executionTime, TimeUnit.MILLISECONDS);
}
public void recordProcessedCount(String jobName, long count) {
registry.counter("batch.processed.count", "job", jobName)
.increment(count);
}
}
6. テストと品質保証
6.1 単体テスト
// バッチ処理の単体テスト
@SpringBootTest
public class BatchProcessorTest {
@Autowired
private BatchProcessor processor;
@Test
public void testNormalProcessing() {
// テストデータの準備
BatchContext context = new BatchContext();
context.setInputData(createTestData());
// 実行
BatchResult result = processor.execute(context);
// 検証
assertThat(result.isSuccess()).isTrue();
assertThat(result.getProcessedCount()).isEqualTo(100);
verifyOutputData();
}
@Test
public void testErrorHandling() {
// エラーケースのテストデータ準備
BatchContext context = new BatchContext();
context.setInputData(createErrorData());
// 実行
BatchResult result = processor.execute(context);
// 検証
assertThat(result.isSuccess()).isFalse();
assertThat(result.getError()).isInstanceOf(ValidationException.class);
}
}
// Spring Batchのテスト
@SpringBatchTest
public class SpringBatchJobTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Test
public void testJob() throws Exception {
// ジョブパラメータの設定
JobParameters params = new JobParametersBuilder()
.addString("inputFile", "test-data.csv")
.addDate("executionDate", new Date())
.toJobParameters();
// ジョブの実行
JobExecution jobExecution = jobLauncherTestUtils.launchJob(params);
// 結果の検証
assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
verifyJobResults();
}
}
6.2 統合テスト
// 統合テストの実装例
@SpringBootTest
public class BatchIntegrationTest {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job batchJob;
@Autowired
private DataSource dataSource;
@Test
public void testEndToEnd() throws Exception {
// テストデータのセットアップ
setupTestData();
// ジョブの実行
JobParameters params = new JobParametersBuilder()
.addDate("date", new Date())
.toJobParameters();
JobExecution execution = jobLauncher.run(batchJob, params);
// 結果の検証
assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
// データベースの状態を検証
try (Connection conn = dataSource.getConnection()) {
try (PreparedStatement stmt = conn.prepareStatement(
"SELECT COUNT(*) FROM target_table WHERE process_date = ?")) {
stmt.setDate(1, new java.sql.Date(System.currentTimeMillis()));
ResultSet rs = stmt.executeQuery();
assertThat(rs.next()).isTrue();
assertThat(rs.getInt(1)).isEqualTo(expectedCount);
}
}
}
}
7. まとめ
バッチ処理の設計と実装において、以下の点に注意を払うことが重要です:
-
基本設計
- 処理の分割と責務の明確化
- エラーハンドリングの考慮
- リカバリ機能の実装
-
パフォーマンス
- 適切なバッチサイズの選定
- メモリ使用量の最適化
- データベースアクセスの効率化
-
運用管理
- ジョブ管理の仕組み
- 監視と通知の実装
- メンテナンス性の確保
-
品質保証
- 単体テストの充実
- 統合テストによる検証
- 性能テストの実施
参考文献
- 「実践 Spring Batch」- Michael Minella
- 「バッチ処理システム設計の実践」- 技術評論社
- 「Javaバッチ処理プログラミング」- 翔泳社
- 「エンタープライズアプリケーションアーキテクチャパターン」- Martin Fowler
バッチ処理の設計と実装は、業務システムの重要な部分を占めています。本記事で紹介したパターンとベストプラクティスを参考に、効率的で保守性の高いバッチ処理システムを構築してください。
関連記事
2025/3/25
【「動作保証はどこまで?」SIerのためのシステム保守の基本】
SIerエンジニアのためのシステム保守ガイド。業務システムの保守範囲の定義から具体的な保守活動まで、実践的なアプローチを解説します。
2025/3/24
【SIerが知るべきログ設計のベストプラクティス】
SIerエンジニアのためのログ設計ガイド。業務システムにおける効果的なログ設計から運用管理まで、実践的なベストプラクティスを解説します。
2025/3/23
【長年運用されている業務システムの"負債"とどう向き合うか?】
SIerエンジニアのための技術的負債管理ガイド。長年運用されてきた業務システムの負債を理解し、効果的に管理・改善していくための実践的なアプローチを解説します。