Month: March 2016

Is Reactive similar to Event Driven Architecture?

21st March 2016 Architecture, Reactive, Software development

Reactive is getting more and more popular, so I decided to check it out and started doing some research. A thought came to mind: is Reactive related to EDA? If you check the Reactive Manifesto, the quick answer would be yes, it is. But it is more complicated than that.

What is Reactive Programming? According to the manifesto, it is a way of building systems so that they have certain characteristics:

  • responsiveness – they respond in a timely manner
  • resilience – they are fault tolerant and highly available
  • elasticity – I must cite the Manifesto here 🙂 “The system stays responsive under varying workload.” You’ll see why
  • message driven – reactive systems use asynchronous messages to propagate information and achieve loose coupling

Some points in the manifesto are more detailed, some less, but it is a manifesto, not a tech spec, so don’t expect too much. The Manifesto describes an approach to the design and development of systems, but it is not an architecture, nor a design pattern. Maybe this level of detail is the reason that most of the articles and presentations I came across were about the Reactive approach to programming and did not touch architecture design.

An important fact about the Reactive Manifesto is that it sets goals but only gives some advice, and only in the case of resilience and message orientation. I can’t help but think of it as a… Manifesto… 🙂 I’ll get back to Reactive in a minute.

EDA (Event Driven Architecture) is a well-known, proven architecture used to decouple a system’s modules and provide better scalability. EDA is achieved when components communicate using events that are carried in messages. An important factor is that the event generator, that is the component that emits an event, does not know anything about the consumers. EDA is extremely loosely coupled.

The most common way to propagate events is to use a messaging system, but EDA can be done in various ways depending on the system being developed:

  • Single instance (a single JVM in the case of Java) – processing can be implemented using CDI events, a simple observer pattern implementation, or asynchronous invocations.
  • Event processing with several application instances – here we would probably use some messaging technology (MOM), but we are not limited to this:
    • remote method invocations would also do
    • one-way SOAP web services would do
    • RESTful web services with asynchronous invocations
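For the single-JVM case, the observer idea can be sketched in a few lines of plain Java. This is only a sketch, not CDI: `SimpleEventBus` and `ResourceImported` are hypothetical names made up for the illustration. The point is that the publisher only knows the bus and never sees who consumes the event.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-process event bus; plain observer pattern, not CDI. */
class SimpleEventBus {
    private final Map<Class<?>, List<Consumer<Object>>> observers = new ConcurrentHashMap<>();

    /** Registers an observer for exactly one event type. */
    <E> void subscribe(Class<E> eventType, Consumer<? super E> observer) {
        observers.computeIfAbsent(eventType, key -> new CopyOnWriteArrayList<>())
                 .add(event -> observer.accept(eventType.cast(event)));
    }

    /** Delivers the event to the observers of its exact class; returns how many were notified. */
    int publish(Object event) {
        List<Consumer<Object>> targets = observers.getOrDefault(event.getClass(), List.of());
        targets.forEach(target -> target.accept(event));
        return targets.size();
    }
}

/** Hypothetical event carrying the title of an imported resource. */
final class ResourceImported {
    final String title;

    ResourceImported(String title) {
        this.title = title;
    }
}
```

Observers are registered with `bus.subscribe(ResourceImported.class, e -> ...)` and the generator just calls `bus.publish(new ResourceImported("Hive"))`. Delivery here is synchronous; an asynchronous variant would hand each call to an executor.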

Using a messaging technology like JMS (HornetQ, ActiveMQ, SonicMQ among others) or AMQP (RabbitMQ, Apache Qpid) will give us a few important benefits:

  • Reliability: both JMS and AMQP provide several message production and consumption modes. JMS provides persistent and non-persistent delivery, durable subscriptions (subscribers will receive messages even if they weren’t listening at the time of publication), and a few acknowledgement modes (allowing or disallowing duplicate messages; an exactly-once guarantee is available with a JTA transacted session, and XA may be required).
  • Great scalability: messages can be queued, so consumers are not overwhelmed even when a peak brings several times more messages than normal. Message processing components can be scaled independently from the others – e.g. the number of threads for a given MDB component can be increased, allowing events to be consumed faster.
  • Fault tolerance: with persistent messages, both JMS and AMQP can survive message broker crashes.
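The scalability point can be illustrated without a broker. The sketch below is an assumption-laden stand-in: an in-memory `LinkedBlockingQueue` plays the role of the message queue and a thread pool plays the role of the MDB instances. `QueueBackedConsumers` and `drainBurst` are hypothetical names, and nothing here is persistent or transactional as a real JMS or AMQP setup would be.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class QueueBackedConsumers {
    /**
     * Queues a burst of messages, then drains it with the given number of consumer threads.
     * Returns the number of messages consumed.
     */
    static int drainBurst(int burstSize, int consumerThreads) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // The producer only knows the queue, not who will consume from it.
        for (int i = 0; i < burstSize; i++) {
            queue.add("event-" + i);
        }
        AtomicInteger consumed = new AtomicInteger();
        CountDownLatch remaining = new CountDownLatch(burstSize);
        ExecutorService consumers = Executors.newFixedThreadPool(consumerThreads);
        for (int t = 0; t < consumerThreads; t++) {
            consumers.execute(() -> {
                // poll() returns null once the backlog is empty, which ends this worker.
                while (queue.poll() != null) {
                    consumed.incrementAndGet();
                    remaining.countDown();
                }
            });
        }
        try {
            if (!remaining.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("burst was not drained in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            consumers.shutdown();
        }
        return consumed.get();
    }
}
```

Changing `consumerThreads` changes how fast the backlog is processed without touching the producing side, which is the independent scaling described above.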

EDA can have one of a few implementation types:

  • simple event processing – as in an observer pattern implementation, an event is generated by the event generator and consumed by observers
  • streaming event processing – here events are routed and processed, and can be the source of other events
  • complex event processing – in this case not only the current event is analyzed; past events are also taken into the analysis scope, with some sophisticated event stream queries (like in Oracle CEP).
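To make the difference between simple and complex event processing concrete, here is a toy sketch of the "past events are in scope" idea: a sliding window that raises an alert when too many failures appear among the most recent events. `FailureWindow` is a hypothetical class; a real CEP engine would express this as a query over the event stream.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical detector: looks at the last windowSize events, not only the current one. */
class FailureWindow {
    private final Deque<Boolean> recent = new ArrayDeque<>();
    private final int windowSize;
    private final int threshold;
    private int failuresInWindow;

    FailureWindow(int windowSize, int threshold) {
        this.windowSize = windowSize;
        this.threshold = threshold;
    }

    /** Records one event and reports whether the window now holds at least threshold failures. */
    boolean onEvent(boolean failed) {
        recent.addLast(failed);
        if (failed) {
            failuresInWindow++;
        }
        // Drop the oldest event once the window is full and forget its failure, if it was one.
        if (recent.size() > windowSize && recent.removeFirst()) {
            failuresInWindow--;
        }
        return failuresInWindow >= threshold;
    }
}
```

With a window of 3 and a threshold of 2, a single failure does not trigger anything, but a second failure arriving while the first is still in the window does.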

So as we can see, EDA can be:

  • resilient – if done using appropriate tools that guarantee delivery, fault tolerance and high availability
  • elastic – this is why I wanted to cite the manifesto. If you do EDA using a messaging system then you get elasticity. The system can and will cope with peaks in events; the messages will get queued, possibly spread across a cluster of the messaging system’s nodes. Even more – using SEDA (staged event-driven architecture) we can throttle and dynamically control throughput
  • using messaging – we can use messaging, and most of the time we do
  • responsive – due to the asynchronous nature of EDA the system will be responsive. It will require a different style of programming though.

As you can see, there are similarities between EDA and Reactive. But as I mentioned before, most of the time Reactive is about how to implement the details of the code and does not touch the architecture level. On the other hand, EDA is all about architecture: it describes components and the way to connect them so they can interact.

Reactive is more about how to structure your code, get rid of some flow statements, and replace the pull / imperative style with push streams. As the Introduction to Reactive Programming says, Reactive programming is programming with asynchronous data streams. Another nice introduction is Jonathan Worthington’s presentation. We must differentiate between Reactive Programming and the Reactive approach to architecture. The second one is less common.
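The pull versus push difference can be shown with the JDK alone. This is a minimal sketch assuming Java 9 or newer, where `java.util.concurrent.SubmissionPublisher` is available; `PushVersusPull` is a hypothetical class name, and a real project would more likely use a library such as RxJava or Reactor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

class PushVersusPull {
    /** Pull style: the caller iterates and asks for each value. */
    static List<Integer> doubledByPull(List<Integer> values) {
        List<Integer> result = new ArrayList<>();
        for (Integer value : values) {
            result.add(value * 2);
        }
        return result;
    }

    /** Push style: values are submitted to a publisher and delivered to the subscriber asynchronously. */
    static List<Integer> doubledByPush(List<Integer> values) {
        List<Integer> result = new CopyOnWriteArrayList<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            // consume() subscribes the lambda and returns a future completed on onComplete.
            CompletableFuture<Void> done = publisher.consume(value -> result.add(value * 2));
            values.forEach(publisher::submit);
            publisher.close(); // signals onComplete after the buffered items are delivered
            done.join();
        }
        return result;
    }
}
```

Both methods produce the same list; the difference is who drives the flow. In the push variant the consumer only reacts to what arrives, which is the style the Reactive introductions describe.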

This makes me think that EDA can be Reactive, and that most of the time Reactive is just more about coding your solution using streams, observables or promises.

And by the way, doing EDA with the simple or streaming strategy is extremely easy in Java 😉

Have a nice day !



Batch processing for Java Platform – partitions and error handling

13th March 2016 Batch, Software development

In the previous post I wrote about the basic capabilities of Batch Processing for Java Platform. Now I have tried out partitions and error handling. Let’s look at error handling first. With chunk style steps we have the following options:

  • Skip a chunk step when a skippable exception is thrown.
  • Retry a batch step when a retryable exception is thrown. The transaction for the current step will be rolled back unless the exception is also configured as a no-rollback exception.
  • No rollback for a given exception class.

This handling applies to exceptions thrown from all phases (read, process, write) and from the checkpoint commit. We can specify which exception classes are included in each option, that is, whether a given exception class is a skippable exception. We can also exclude some exception classes, and the batch processing engine will use a nearest-class algorithm to choose the appropriate strategy. If we include exception class A as skippable, this class has two subclasses B and C, and we configure an exclude rule for class C, then exceptions A and B will cause the engine to skip the step, while exception C will not. If no other strategy is configured for C, the job execution will be marked as FAILED.
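The A, B, C case can be sketched in plain Java. This is my reading of the nearest-class idea, not the code of any batch engine: walk up the class hierarchy of the thrown exception and let the first class that appears in an include or exclude rule decide. `SkipRules` and the nested exception classes are hypothetical, and checking the exclude rule first when both rules name the same class is an assumption of this sketch.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of include/exclude rules resolved by the nearest class in the hierarchy. */
class SkipRules {
    static class A extends Exception { }
    static class B extends A { }
    static class C extends A { }

    private final Set<Class<?>> included = new HashSet<>();
    private final Set<Class<?>> excluded = new HashSet<>();

    SkipRules include(Class<? extends Exception> type) {
        included.add(type);
        return this;
    }

    SkipRules exclude(Class<? extends Exception> type) {
        excluded.add(type);
        return this;
    }

    /** The nearest configured class, starting from the thrown exception's own class, decides. */
    boolean isSkippable(Exception thrown) {
        for (Class<?> type = thrown.getClass(); type != null; type = type.getSuperclass()) {
            if (excluded.contains(type)) {
                return false; // assumption: exclude wins if a class is listed in both rules
            }
            if (included.contains(type)) {
                return true;
            }
        }
        return false; // no rule matched anywhere up the hierarchy
    }
}
```

With `new SkipRules().include(SkipRules.A.class).exclude(SkipRules.C.class)`, exceptions A and B are skippable (B finds A as its nearest configured class), while C hits its own exclude rule first.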

Let’s see an example

Here we skip the step for the pl.spiralarchitect.kplan.batch.InvalidResourceFormatException exception. We add code that throws this exception to the factory method for KnowledgeResource:

In the test file we make one line invalid:

The step for this line will be skipped.

For Batchlet style steps we need to handle exceptions ourselves. If an exception gets out of the step, the job will be marked as failed.

Another, more advanced feature is partitioning of steps. Partitioning is available for both chunk and batchlet style steps. Consider the example XML below:

In this configuration we specify that there are to be two partitions and that two threads are to be used to process them, so one thread per partition. This configuration can also be specified using a partition mapper, as the comment in the XML configuration snippet describes.

The partition collector’s role is to pass data from each partition to the analyzer. There is a separate collector for each thread.

The partition analyzer collects data from all partitions and runs on the main thread. It can also decide on the batch status value to be returned.

In order to understand how this works it may be helpful to look at the algorithm descriptions in the JSR spec, chapter 11. An important detail described there is that for each partition the step’s intermediary data is stored in the StepContext. With this knowledge we can create a super simple collector, keeping in mind that we could process the intermediary result here; we just don’t need to do this:

This collector will be executed for each step with the result returned by the writer step – you can find the modified KnowledgeResourceWriter below

Then we can write the code for a super simple analyzer:

KnowledgeResourcePartitionAnalyzer adds all resources to the same list that the writer step did. It also sets the parameters used by the next step, which were previously set in the writer.

Now when we execute the modified example we will see some strange output:

Yikes !

1. We did not tell each partition what data it should analyze, nor did we put this logic (deciding what data should be processed) in the step definition itself.

2. storing resource: KnowledgeResource [title=Hadoop 2 and Hive, published= 2015] is repeated four times! This is because:

“The collector is invoked at the conclusion of each checkpoint for chunking type steps and again at the end of partition; it is invoked once at the end of partition for batchlet type steps.”

Back to the lab then

To fix the first problem we simply split the file in the first step into two files, one for each partition. We will create the two files statically (articles1.txt and articles2.txt). Each partition will read one file; the file name will be configured (a simplification for the demo, in a real-life application this would probably be a bit more complicated). So we need to change the implementation for reading files and the configuration.

The important lines in the configuration above are:

* <jsl:property name="fileName" value="articles1.txt"/> and <jsl:property name="fileName" value="articles2.txt"/> – here we configure the file name for each partition

* <jsl:property name="fileName" value="#{partitionPlan['fileName']}"/> – here we configure the file name for the reader component; this needs to be done to transfer the configured file name from the partition plan to the step properties (an inconvenience ;) – JSR-352 is young).

The reader component will get fileName injected (a cool feature of the young JSR)

And it will use this file name to construct the resource path:

This fixes the first problem.

To fix the second problem we need to know whether this is the end of the partition. We can check it in a few ways:

* check if we processed a given number of elements, so there would be a need for some service that would monitor all partition processing

* check the thread name – partitions are executed on separate threads; a poor solution, as threads may be pooled, so this won’t work

* check if we already processed a given element (using some hash)

I haven’t found any API that would tell whether partition execution is done and this is the last call to the collector. Maybe this last call is an error in JBeret (JBoss’ implementation of Batch Processing for Java Platform).

We will try the last solution. We could check whether we already processed the chunk in the analyzer, but since this is more of a technical detail (the important thing was to find out why we got the duplicated element) we will just check this in the data structure for KnowledgeResources: we will simply replace the List with a Set:
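The effect of that swap can be sketched outside the batch runtime. The classes below are hypothetical stand-ins, not the code from the repository: `KnowledgeResource` here has only the two fields visible in the log output, and `CollectedResources.storedCount` simulates the analyzer receiving the same partition data several times. Note that the Set only removes duplicates if the element class defines `equals` and `hashCode`.

```java
import java.util.Collection;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for the demo's KnowledgeResource, with value-based equality. */
final class KnowledgeResource {
    final String title;
    final String published;

    KnowledgeResource(String title, String published) {
        this.title = title;
        this.published = published;
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof KnowledgeResource)) {
            return false;
        }
        KnowledgeResource that = (KnowledgeResource) other;
        return title.equals(that.title) && published.equals(that.published);
    }

    @Override
    public int hashCode() {
        return Objects.hash(title, published);
    }
}

class CollectedResources {
    /** Adds the same partition data once per simulated collector call and returns the store size. */
    static int storedCount(Collection<KnowledgeResource> store,
                           List<KnowledgeResource> partitionData,
                           int collectorCalls) {
        for (int call = 0; call < collectorCalls; call++) {
            store.addAll(partitionData);
        }
        return store.size();
    }
}
```

Passing an `ArrayList` as the store reproduces the repeated entries, while passing a `HashSet` keeps one entry per distinct resource no matter how many times the collector hands over the same data.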

After these changes we get the correct number of processed entries 🙂

Take a look at this post on Roberto Cortez’s blog regarding Java EE 7 batch processing.

Have a nice day! The code is on GitHub.


Trying out Java EE Batch

7th March 2016 Batch, Java EE, Software development

I decided to try Java EE Batch, then try Spring Batch and finally Spring for Hadoop with Pig. Java EE Batch is a relatively new specification, but for a number of use cases it will be sufficient. This sentence says it all: I think that Java EE Batch is ok. Nothing more yet. As you will see, you have to do a lot of things yourself, and doing other things is just cumbersome.

So what is Java EE Batch, a.k.a. JSR 352 (Batch Processing for Java Platform)? It is a Java EE family specification, heavily inspired by Spring Batch, that describes a framework for batch job processing. It allows us to describe jobs in the Job Specification Language (JSL); only the XML format is supported. I guess that for batch apps this may be ok. I can live with it 😉

Java EE Batch specifies two types of processing:

  • Chunked – here we have 3 phases: reading, processing and writing. Chunked steps can be restarted from a checkpoint.
  • Batchlet – this is a free-form step: do whatever is required.
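The chunked style boils down to a loop like the one below. This is a simplified model written with plain Java types, not the JSR 352 API: `ChunkLoop` is a hypothetical class, an `Iterator` stands in for the reader, a `Function` for the processor, and the returned list of lists shows what the writer would receive. The `itemCount` parameter mirrors the idea of the chunk's item-count setting.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

class ChunkLoop {
    /** Runs read -> process item by item and hands the results to the "writer" one chunk at a time. */
    static <I, O> List<List<O>> run(Iterator<I> reader, Function<I, O> processor, int itemCount) {
        List<List<O>> written = new ArrayList<>();
        List<O> buffer = new ArrayList<>();
        while (reader.hasNext()) {
            // Reading and processing happen per item.
            buffer.add(processor.apply(reader.next()));
            if (buffer.size() == itemCount) {
                written.add(buffer);        // the write phase receives the whole chunk
                buffer = new ArrayList<>(); // a real engine would take a checkpoint here
            }
        }
        if (!buffer.isEmpty()) {
            written.add(buffer); // last, possibly shorter, chunk
        }
        return written;
    }
}
```

Seven input lines with an item count of three end up as three writes of sizes 3, 3 and 1, which is also why a restart can resume from the last completed chunk rather than from the beginning.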

Java EE Batch has many features, like:

  • Routing processing using decision steps
  • Grouping steps into flows
  • Partitioned steps – step instances to be executed in parallel. A partitioned step allows us to split a job across multiple threads in a more technical way. We can specify a partition mapper, a partition reducer and other partition-specific elements. I won’t go into detail on this now.
  • Splits – splits specify flows to be executed in parallel; there can be a few flow definitions, so this is a more process- or business-oriented parallelization of work.
  • Exception handling:
    • Skipping steps
    • Retrying steps
  • JobContext to pass batch job data to steps

CDI is used and supported. But even with all of these, specifying job context parameters is inconvenient (in 2005 I would say that it’s ok ;)), and we must do a lot ourselves, like moving files, reading them and passing intermediate results. It would be nice to have some utilities to help with standard batch-related chores.

Ok, let’s see an example: below you can find a JSL definition for a demo batch job. You can build this XML with ease using the Eclipse XML editor (or use some plugins for NetBeans): just choose the XML from schema option, then select XML Catalog and the jobXML_1_0.xsd entry:


First we move the file from the import location to some temporary location so it can be processed without any problem:

next="resource-router" specifies the next step to be invoked. There is a possibility for a typo here, so it is an inconvenience.

<jsl:batchlet ref="initializeResourceImport" /> specifies a batchlet (free-form) step that will move the file. The instance of the class for this step will be instantiated as a CDI bean named "initializeResourceImport":

Here we move the file to a workDir location.

Next we start a flow and execute another step type, a chunked step. The id of the next step is given in the next attribute.

Each of the phases is implemented by a CDI bean. So reading looks like this:

We open and close the stream in this step, and we provide a checkpoint definition in order to be able to restart from some point. A single invocation of this reader reads only a single line, which is passed down to the process part of the step. It would be easier if the Batch framework provided some reading utilities and let us worry about doing the important stuff with the data rather than about opening and properly closing files. But the foundation functionality that is provided now is also required, and features that make using Batch Processing for Java Platform easier will be added in the next versions of the spec.
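The open / read one item / checkpoint / close cycle can be sketched without the batch API. The class below is a hypothetical stand-in that only borrows the method names of the JSR 352 `ItemReader` (`open`, `readItem`, `checkpointInfo`, `close`); it does not implement that interface, it reads from an in-memory string instead of a file, and it uses the number of lines already read as its checkpoint, which is an assumption of this sketch.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;

/** Hypothetical line reader that can restart after the last checkpointed line. */
class CheckpointedLineReader {
    private final String content;
    private BufferedReader lines;
    private int linesRead;

    CheckpointedLineReader(String content) {
        this.content = content;
    }

    /** Opens the source; a non-null checkpoint is the number of lines to skip on restart. */
    void open(Integer checkpoint) {
        lines = new BufferedReader(new StringReader(content));
        linesRead = 0;
        int toSkip = checkpoint == null ? 0 : checkpoint;
        while (linesRead < toSkip && readItem() != null) {
            // skipping lines that were already processed before the restart
        }
    }

    /** Returns the next line, or null when the input is exhausted. */
    String readItem() {
        try {
            String line = lines.readLine();
            if (line != null) {
                linesRead++;
            }
            return line;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** The position to resume from: how many lines have been read so far. */
    Integer checkpointInfo() {
        return linesRead;
    }

    void close() {
        try {
            lines.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

After reading two of three lines, `checkpointInfo()` returns 2; a new reader opened with that value continues with the third line instead of starting over.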

The next phase is processing the line of data read from the file. The only thing done here is the conversion from a String to a KnowledgeResource instance

And finally we can write the data

This is also simple processing, in order to have some fun with the Java EE Batch framework without worrying about more job-related details. The important part here is that this phase receives a list of objects that are the results of the processing phase.

An application scoped CDI bean of class KnowledgeResources is used; this is done in order to ease testing. Of course in a real-life batch job I would not think about using an application scoped bean without any special handling. It’s like having a state field in a Servlet where requests store data 😉 Very bad idea. This job passes a result of processing that will be used by the Decider instance to route processing. This simplistic demo uses a hard-coded value.

After processing data in the resource-processor step we proceed to the decision step, path-decision. This step is also implemented by a CDI bean, named pathDecider.

Depending on what value is returned from the Decider instance, some route will be chosen. The steps that are to be invoked next are designated by id in the to attribute. Java-based config would make it possible to get rid of some typos here.
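To show what typo-safe, Java-based routing could look like, here is a purely illustrative sketch. It is not part of JSR 352, which only offers the XML form; `TypedRouting`, its enums and the step names are all hypothetical. The idea is simply that decisions and step ids become enum constants, so a misspelled target fails at compile time instead of at job run time.

```java
import java.util.EnumMap;
import java.util.Map;

class TypedRouting {
    /** Values a decider may return (hypothetical). */
    enum Decision { STORE, REJECT }

    /** Step ids of the job (hypothetical names). */
    enum Step { STORE_RESOURCES, REJECT_RESOURCES }

    private static final Map<Decision, Step> ROUTES = new EnumMap<>(Decision.class);

    static {
        ROUTES.put(Decision.STORE, Step.STORE_RESOURCES);
        ROUTES.put(Decision.REJECT, Step.REJECT_RESOURCES);
    }

    /** Toy decision rule: store when anything was processed, reject otherwise. */
    static Decision decide(int processedCount) {
        return processedCount > 0 ? Decision.STORE : Decision.REJECT;
    }

    /** Looks up the step that follows the given decision. */
    static Step next(Decision decision) {
        return ROUTES.get(decision);
    }
}
```

With string ids in XML, a mismatch between the value returned by the decider and the `on` / `to` attributes only shows up when the job runs; here the compiler catches it.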

Again, the implementation is simplistic:

The Decider chooses one of two steps (ok, it does choose the STORE step every time…):

The CDI beans for both steps are similar:

An important note about running the test: we have to use a Stateless EJB to start the batch because of class loader ordering, as Arquillian does not see the Java EE Batch resources:

So we use BatchImportService to start the batch. In a real app we would probably use Claim Check-like processing in order to pass data to an SFTP server and trigger processing using a JMS message or a Web Service call. We could also monitor the SFTP server using some service (Oracle Service Bus, for example, can monitor SFTP and start processing).

And this is it: a simple evaluation of the basic features of Java EE Batch (or Batch Processing for Java Platform). The next thing would be to add the partitioned or split features. In the meantime, the code is on GitHub.

Apart from batch applications we could use stream or event processing. The chosen architecture depends on the requirements for the system under construction. If order is important but processing delay is not so important, then batch processing is worth checking. If we need to react more quickly, or we want to continue processing data even though processing of some part of it failed, and we want high reliability, then using messaging may be the way to go. But all this is a subject for another article 🙂

Have a nice day and a lot of fun with Java EE!