In a previous post I wrote about the basic capabilities of Batch Processing for the Java Platform. Now I have tried out partitions and error handling. Let’s look at error handling first. With chunk-style steps we have the following options:

  • Skipping the current item when a skippable exception is thrown.
  • Retrying the current chunk when a retryable exception is thrown. The transaction for the current chunk will be rolled back unless the exception is also configured as a no-rollback exception.
  • No rollback for a given exception class.

Processing applies to exceptions thrown from all phases (read, process, write) and from the checkpoint commit. We can specify which exception classes are included in each option – that is, whether a given exception class is skippable. We can also exclude some exception classes, and the batch processing engine will use a nearest-class algorithm to choose the appropriate strategy. If we include exception class A as skippable, A has two subclasses B and C, and we configure an exclude rule for class C, then exceptions of type A and B will cause the engine to skip, while an exception of type C will not. If no other strategy is configured, the job execution will be marked as FAILED for C.

Let’s see an example:
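A sketch of what the chunk configuration might look like (the jsl namespace declaration and the component ref names are assumptions carried over from this example; retryable-exception-classes and no-rollback-exception-classes are configured analogously):

```xml
<jsl:chunk>
    <jsl:reader ref="knowledgeResourceReader"/>
    <jsl:processor ref="knowledgeResourceProcessor"/>
    <jsl:writer ref="knowledgeResourceWriter"/>
    <!-- Skip items that fail with this exception instead of failing the job. -->
    <jsl:skippable-exception-classes>
        <jsl:include class="pl.spiralarchitect.kplan.batch.InvalidResourceFormatException"/>
    </jsl:skippable-exception-classes>
</jsl:chunk>
```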

Here we skip processing for the pl.spiralarchitect.kplan.batch.InvalidResourceFormatException exception. We add code that throws this exception to the factory method of KnowledgeResource:
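A minimal sketch of such a factory method (the semicolon-separated line format is an assumption for this demo, not necessarily the format used in the original code):

```java
// Factory method on KnowledgeResource. Hypothetical line format:
// "<title>;<published year>". InvalidResourceFormatException is assumed
// to extend RuntimeException.
public static KnowledgeResource fromLine(String line) {
    String[] parts = line.split(";");
    if (parts.length != 2) {
        // Configured as skippable in the job XML above.
        throw new InvalidResourceFormatException("Invalid resource line: " + line);
    }
    return new KnowledgeResource(parts[0].trim(), Integer.parseInt(parts[1].trim()));
}
```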

In the test file we make one line invalid:

Processing of this line will be skipped.

For batchlet-style steps we need to handle exceptions ourselves. If an exception escapes the step, the job will be marked as FAILED.
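As a minimal sketch (a hypothetical batchlet, not one from this example), an exception can be caught and translated into an exit status instead:

```java
import javax.batch.api.AbstractBatchlet;
import javax.inject.Named;

// Hypothetical batchlet: catches its own exceptions so that a failure
// does not escape the step and mark the whole job as FAILED.
@Named
public class CleanupBatchlet extends AbstractBatchlet {

    @Override
    public String process() throws Exception {
        try {
            // ... the actual batchlet work goes here ...
            return "COMPLETED";
        } catch (RuntimeException e) {
            // Translate the error into an exit status instead of
            // letting the exception escape the step.
            return "CLEANUP_FAILED";
        }
    }
}
```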

Another, more advanced feature is partitioning of steps. Partitioning is available for both chunk- and batchlet-style steps. Consider the example XML below:
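A sketch of such a step definition (namespace declaration omitted; the ref names are assumptions based on the component names used in this post):

```xml
<jsl:step id="processKnowledgeResources">
    <jsl:chunk>
        <jsl:reader ref="knowledgeResourceReader"/>
        <jsl:processor ref="knowledgeResourceProcessor"/>
        <jsl:writer ref="knowledgeResourceWriter"/>
    </jsl:chunk>
    <jsl:partition>
        <!-- Instead of a static plan, a partition mapper can be
             referenced here: <jsl:mapper ref="myMapper"/> -->
        <jsl:plan partitions="2" threads="2"/>
        <jsl:collector ref="knowledgeResourcePartitionCollector"/>
        <jsl:analyzer ref="knowledgeResourcePartitionAnalyzer"/>
    </jsl:partition>
</jsl:step>
```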

In this configuration we specify that there are to be two partitions and that two threads are to be used to process them, so one thread per partition. This configuration can also be provided by a partition mapper, as the comment in the XML snippet describes.
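For comparison, a minimal sketch of the mapper variant (a hypothetical class, not used in this example) could look like this:

```java
import java.util.Properties;
import javax.batch.api.partition.PartitionMapper;
import javax.batch.api.partition.PartitionPlan;
import javax.batch.api.partition.PartitionPlanImpl;
import javax.inject.Named;

// Hypothetical mapper computing the same plan programmatically.
@Named
public class KnowledgeResourcePartitionMapper implements PartitionMapper {

    @Override
    public PartitionPlan mapPartitions() throws Exception {
        PartitionPlan plan = new PartitionPlanImpl();
        plan.setPartitions(2);
        plan.setThreads(2);
        // Per-partition properties could be computed and set here as well.
        plan.setPartitionProperties(new Properties[] { new Properties(), new Properties() });
        return plan;
    }
}
```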

The partition collector’s role is to pass data from each partition to the analyzer. There is a separate collector instance for each partition thread.

The partition analyzer collects data from all partitions and runs on the main thread. It can also decide on the batch status value to be returned.

In order to understand how this works it may be helpful to look at the algorithm descriptions in chapter 11 of the JSR-352 spec. An important detail described there is that for each partition the step’s intermediary data is stored in the StepContext. With this knowledge we can create a super simple collector – keeping in mind that we could process the intermediary result here, we just don’t need to:
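Keeping the naming used in this example, the collector might look like this (it assumes the writer stores its chunk result in the StepContext as transient user data):

```java
import java.io.Serializable;
import javax.batch.api.partition.PartitionCollector;
import javax.batch.runtime.context.StepContext;
import javax.inject.Inject;
import javax.inject.Named;

// Super simple collector: just passes on the intermediary result that the
// writer stored in the StepContext, without processing it further.
@Named
public class KnowledgeResourcePartitionCollector implements PartitionCollector {

    @Inject
    private StepContext stepContext;

    @Override
    public Serializable collectPartitionData() throws Exception {
        return (Serializable) stepContext.getTransientUserData();
    }
}
```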

This collector will be executed for each chunk with the result stored by the writer – you can find the modified KnowledgeResourceWriter below:
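A sketch of the modified writer (the log message matches the output quoted later; other details are assumptions):

```java
import java.util.ArrayList;
import java.util.List;
import javax.batch.api.chunk.AbstractItemWriter;
import javax.batch.runtime.context.StepContext;
import javax.inject.Inject;
import javax.inject.Named;

@Named
public class KnowledgeResourceWriter extends AbstractItemWriter {

    @Inject
    private StepContext stepContext;

    @Override
    public void writeItems(List<Object> items) throws Exception {
        List<KnowledgeResource> resources = new ArrayList<>();
        for (Object item : items) {
            KnowledgeResource resource = (KnowledgeResource) item;
            System.out.println("storing resource: " + resource);
            resources.add(resource);
        }
        // Store this chunk's result so the partition collector can pick it up.
        stepContext.setTransientUserData(resources);
    }
}
```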

Then we can write the code for a super simple analyzer:
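A sketch of the analyzer (the KnowledgeResources holder on the job context is an assumption based on the description that follows):

```java
import java.io.Serializable;
import java.util.List;
import javax.batch.api.partition.AbstractPartitionAnalyzer;
import javax.batch.runtime.context.JobContext;
import javax.inject.Inject;
import javax.inject.Named;

// Runs on the main thread and receives the data sent by each collector.
@Named
public class KnowledgeResourcePartitionAnalyzer extends AbstractPartitionAnalyzer {

    @Inject
    private JobContext jobContext;

    @Override
    @SuppressWarnings("unchecked")
    public void analyzeCollectorData(Serializable data) throws Exception {
        List<KnowledgeResource> resources = (List<KnowledgeResource>) data;
        // Add all resources to the same structure the writer used before
        // (assumed here to be a KnowledgeResources holder on the job context).
        KnowledgeResources target = (KnowledgeResources) jobContext.getTransientUserData();
        resources.forEach(target::add);
        // Parameters for the next step, previously set in the writer,
        // would also be set here (details omitted).
    }
}
```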

KnowledgeResourcePartitionAnalyzer adds all resources to the same list that the writer step used. It also sets the parameters used by the next step, which were previously set in the writer.

Now when we execute the modified example we see some strange output:

Yikes!

1. We did not tell each partition what data it is to process, nor did we put this logic (deciding which data should be processed) in the step definition itself.

2. storing resource: KnowledgeResource [title=Hadoop 2 and Hive, published= 2015] is repeated four times! – this is because:

“The collector is invoked at the conclusion of each checkpoint for chunking type steps and again at the end of partition; it is invoked once at the end of partition for batchlet type steps.”

Back to the lab, then.

To fix the first problem we simply split the input file in the first step into two files – one for each partition. We will create the two files statically (articles1.txt and articles2.txt). Each partition will read one file, and the file name will be configured (a simplification for the demo – in a real-life application this would probably be a bit more complicated). So we need to change the file-reading implementation and the configuration.
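The changed configuration could look roughly like this (only the relevant parts are shown; ref names are assumptions):

```xml
<jsl:step id="processKnowledgeResources">
    <jsl:chunk>
        <jsl:reader ref="knowledgeResourceReader">
            <jsl:properties>
                <!-- Transfer the configured file name from the partition
                     plan to the reader's batch properties. -->
                <jsl:property name="fileName" value="#{partitionPlan['fileName']}"/>
            </jsl:properties>
        </jsl:reader>
        <!-- processor and writer omitted for brevity -->
    </jsl:chunk>
    <jsl:partition>
        <jsl:plan partitions="2" threads="2">
            <jsl:properties partition="0">
                <jsl:property name="fileName" value="articles1.txt"/>
            </jsl:properties>
            <jsl:properties partition="1">
                <jsl:property name="fileName" value="articles2.txt"/>
            </jsl:properties>
        </jsl:plan>
    </jsl:partition>
</jsl:step>
```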

The important lines in the configuration above are:

* <jsl:property name="fileName" value="articles1.txt"/> and <jsl:property name="fileName" value="articles2.txt"/> – here we configure the file name for each partition.

* <jsl:property name="fileName" value="#{partitionPlan['fileName']}"/> – here we configure the file name for the reader component. This needs to be done to transfer the configured file name from the partition plan to the step properties (an inconvenience ;) – JSR-352 is young).

The reader component will get fileName injected (a cool feature of this young JSR).

And it will use this file name to construct the resource path:
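Putting both pieces together, a sketch of the reader (the classpath location and the fromLine factory call are assumptions consistent with the earlier sketches):

```java
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import javax.batch.api.BatchProperty;
import javax.batch.api.chunk.AbstractItemReader;
import javax.inject.Inject;
import javax.inject.Named;

@Named
public class KnowledgeResourceReader extends AbstractItemReader {

    // Injected from the step properties configured in the job XML.
    @Inject
    @BatchProperty(name = "fileName")
    private String fileName;

    private BufferedReader reader;

    @Override
    public void open(Serializable checkpoint) throws Exception {
        // Construct the resource path from the injected file name
        // (loading from the classpath is an assumption for this sketch).
        InputStream in = getClass().getClassLoader().getResourceAsStream(fileName);
        reader = new BufferedReader(new InputStreamReader(in));
    }

    @Override
    public Object readItem() throws Exception {
        String line = reader.readLine();
        // One KnowledgeResource per line, built via the factory method
        // sketched earlier; returning null ends the step.
        return line == null ? null : KnowledgeResource.fromLine(line);
    }
}
```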

This fixes the first problem.

To fix the second problem we need to know whether we are at the end of a partition. We can check this in a few ways:

* check if we have processed a given number of elements – this would require some service that monitors all partition processing

* check the thread name – partitions are executed on separate threads; a poor solution, as threads may be pooled, so this won’t work reliably

* check if we have already processed a given element (using some hash-based structure)

I haven’t found any API that would tell us whether partition execution is done and this is the last call to the collector. Maybe this last call is an error in JBeret (JBoss’ implementation of Batch Processing for the Java Platform).

We will try the last solution – we could check in the analyzer whether we already processed a chunk, but since this is more of a technical detail (the important thing was to find out why we got a duplicated element) we will just handle it in the KnowledgeResources data structure – we will simply replace the List with a Set:
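A sketch of the change (KnowledgeResources as a holder class is an assumption; note that KnowledgeResource must implement equals() and hashCode() for the Set to detect the duplicate):

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Holder for processed resources. A Set silently drops the duplicate
// delivered by the collector's extra end-of-partition invocation.
public class KnowledgeResources {

    private final Set<KnowledgeResource> resources = new LinkedHashSet<>();

    public void add(KnowledgeResource resource) {
        resources.add(resource);
    }

    public Set<KnowledgeResource> getResources() {
        return resources;
    }
}
```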

After these changes we get the correct number of processed entries 🙂

Take a look at this post on Roberto Cortez’s blog regarding Java EE 7 batch processing.

Have a nice day! The code is on GitHub.