如何使用 Spring-Batch 读取由 id-column 分块的 csv 行?

如何使用 Spring-Batch 读取由 id-column 分块的 csv 行?

IT小君   2021-11-15T13:57:45

我正在使用Spring-Batch读取 csv 文件,格式化内容并将其写入数据库,例如:

StepBuilder<T, T> builder = stepBuilderFactory.get("step")
    .<T, T>chunk(100)
    .reader(flatFileItemReader)
    .processor(processor)
    .writer(jpaItemWriter);

csv 包含一个 ID 列。如何修改reader基于该 ID 的块?例子:

#id, #value
1, first
1000, second
1001, second
1005, second

在这种情况下,块只会读取第一行,然后提交,然后继续。

是否可以通过文件中的值应用分块?

服务器费用不足...
评论(1)
IT小君

我使用自定义CompletionPolicyPeekableItemReader做了同样事情
代码背后的想法是查看下一个项目,执行下一个元素读取并检查值更改。
当值发生变化时,true返回CompletionPolicy.isComplete()

重要提示:此策略必须注册为步骤监听器!

public class BreakKeyCompletionPolicy extends CompletionPolicySupport{
    private BreakKeyCompletionContext cc;
    private PeekableItemReader<Object> reader;
    // Strategy used to check for value break
    private BreakKeyStrategy<Object> strategy;

    public void setReader(PeekableItemReader<Object> forseeingReader){
        this.reader = forseeingReader;
    }

    @Override
    public boolean isComplete(RepeatContext context){
        return this.cc.isComplete();
    }

    @Override
    public RepeatContext start(RepeatContext context)   {
        context.setAttribute("current", null);
        this.cc = new BreakKeyCompletionContext(context);
        return cc;
    }
    /** Context contains current element ("current" property" and manage next element.
     * Null next element is treated as a key break
     */
    protected class BreakKeyCompletionContext extends RepeatContextSupport {
        public BreakKeyCompletionContext(RepeatContext context)     {
            super(context);
        }
        public boolean isComplete(){
            final Object next;
            try{
                next = reader.peek();
            }
            catch (Exception e){
                throw new NonTransientResourceException("Unable to peek", e);
            }
            if (null == next){
                return true;
            }
            return strategy.isKeyBreak(this.getAttribute("current"), next);
        }
    }

    @AfterRead
    public void afterRead(Object item){
        this.cc.setAttribute("current", item);
    }
}
2021-11-15T13:57:45   回复