如果开启容错机制调用FaultTolerantChunkProvider的read方法抛
如果异常不允许被跳跃则继续判断该异常是否可以不回退事务。
以(jdbcpagingItemReader为例) 同步方法首先读取缓存中是否存在有效记
如果读取到有效Item 则调用ItemReadListener.afterRead();
如果出现异常调用ItemReadListener.onReadError();
在chunk事务提交之前所有的读取等操作记录数会放到stepContribution中缓
开始循环处理每个Item。chunkProcessor.process(contribu
ItemProcessListener.beforeProcess();
itemProcessor.process();
ItemProcessListener.afterProcess();
处理时判断是否容错如果是则调用FaultTolerantChunkProcessor的t
调用BatchRetryTemplate的excute方法 。如果重试成功则返回retr
每次执行时首先调用RetryListener的open方法
之后在每次retry之间可以通过backoffpolicy初始化环境
调用backoffpolicy的start方法。
如果执行过程中出现异常调用backOffPolicy.backOff(backOffCo
在执行过程中如果出现异常首先判断该异常是否导致数据库事务回退。
如果不回退的化继续判断当前一场是否可以被skip。
如果可以调用skiplistener的onSkipInProcess方法。
如果不回退同时不可以跳跃则抛出异常。所以需要确保所有异常或者回退或者不回退可以被skip
最后在一次执行的结束前调用RetryListener的close方法。
如果存在异常ItemProcessListener.onProcessError();
ItemWriteListener.beforeWrite(List);
itemWriter.write(items);
ItemWriteListener.afterWrite(List);
写入时判断是否容错如果是则调用FaultTolerantChunkProcessor的w
当时调用itemwriter及writelistener出现异常时 不论是否设置了nor
执行FaultTolerantChunkProvider.postProcess(con
如果当前taskletstep创建是支持faulttolerant(容错)。
根据chunk中items是否为空判断是否继续循环。
事务执行完成。
ChunkListener.afterChunk();
判断循环是否完成。
完成调用ExitStatus StepExecutionListener.afterSt
Itemstream.close()。
处理完所有step 调用JobExecutionListener.afterJob();
源码解读-以面向chunk处理的taskletstep为例
jobLauncher触发Job。
首先利用job名称,与jobParameters获取指定instance的最后一个JobExecution实例。
如果存在当前job是否允许重启。禁止的话则抛出异常
判断当前JobExecution是否存在batchstatus为未知的StepExecution,存在则抛出异常。
进行jobParameters参数有效性校验。
创建代表本次执行的jobExecution。
使用job名称与jobParameters获取jobInstance.
如果存在历史jobInstance,查询该jobInstance的所有jobExecution。判断是否存在运行中的jobExecution以及是否存在BatchStatus为ABANDONED或COMPLETED的。存在则抛出异常。
继续获取该jobinstance最后一次执行对应的jobexecutionContext。
如果不存在历史jobInstance。则新建jobexecutionContext。
将jobexecutionContext赋值与新创建的jobExecution返回。
利用TaskExecutor以JobExecution为参数执行job的execute方法。
触发job具体实现类SimpleJob 或FlowJob的doExecute方法。
SimpleStepHandler执行handlestep方法(完成JobRepository存储与steprestart 相关的操作)。
依据JobExecution获取当前step的最后一次StepExecution,判断该StepExecution是否属于当前的JobExecution如果是的话,则可能是因为我们希望重启一下。
继续根据最后一次StepExecution的状态以及startLimit参数来判断是否允许start。
如果允许的话新建StepExecution,判断是否为step重启是的话将最后一次执行的stepExecutionContext赋值给当前StepExecution。不是重启则新建stepExecutionContext赋值与StepExecution。
存储之后调用step.execute(currentStepExecution)。
AbstractStep执行execute方法
执行StepExecutionListener的beforeStep方法。
调用ItemStream的open方法。
执行Step具体实现类的doExecute方法。
执行stream的update方法。相关信息存入JobRepository
RepeatOperations对象开始迭代执行stepOperations.iterate(new StepContextRepeatCallback(stepExecution)。
调用repeatTemplate的iterate方法 进而调用executeInternal方法。
调用RepeatListener的open方法(每次完整的批处理执行一次)
开始循环处理 首先执行RepeatListener的before方法(每个ITEM执行一次)
执行RepeatTemplate的getNextResult方法 update(RepeatContext context) {completionPolicy.update(context);} 首先更改completionPolicy计数。
执行StepContextRepeatCallback的doInIteration方法
执行StepContextRepeatCallback的doInChunkContext方法
执行ChunkTransactionCallback的doInTransaction方法。
执行chunkListener.beforeChunk(chunkContext);
执行result = tasklet.execute(contribution, chunkContext);
分析ChunkOrientedTasklet的execute方法。
执行 SimpleChunkProvider.provide(contribution)。
又一次循环执行,循环执行时RepeatTemplate的executeInternal方法内部会进行 if (isComplete(context, result) || isMarkedComplete(context) || !deferred.isEmpty()) {
running = false;} 完成策略是否结束的判断。
继续调用SimpleChunkProvider的read方法 继续doread方法,调用 beforeRead方法。
继续调用itemreader的read方法
AbstractItemCountingItemStreamItemReader的read方法
调用 AbstractPagingItemReader的 doread方法此处有线程同步所以JdbcPagingItemReader是线程安全的。
继续调用 JdbcPagingItemReader的doReadPage方法。使用PagingRowMapper ,pagesize,queryProvider,parameterValues等属性一次数据库访问批量获取pageSize个元素放入results中。
所以chunkProvider的provide方法被循环执行一个一个item获取直到满足了chunksize大小 然后返回。pagesize决定了从数据库中一次性读出多少个元素。chunksize决定了多少个元素进行一个事务的提交。
执行 chunkProcessor.process(contribution, inputs);
进行 processor和writer的调用。
一个chunk执行完成后 回到taskletstep的doExecute方法中继续循环执行。taskstep在build时AbstractTaskletStepBuilder会调用createTasklet方法
SimpleStepBuilder的createTasklet的方法中createChunkOperations继续调用getChunkCompletionPolicy方法最终返回SimpleCompletionPolicy。指定了chunk的size 并将其设置给chunkProvider。