SOA & WOA, 2006
In this article I’ll share my experience using the new features in Java 5 to solve an old industry problem: collecting constantly published financial data in a reliable way. The business case example I’m going to discuss relates to the acquisition of market data published by a financial data source system like Reuters.
Data Source Publishing Conditions
Assume a Data Source allows for a consumer subscription that results in publishing market data messages via a DataFeed Channel. The consumer can listen to the messages, get them from the Event Queue, and process them one at a time.
The Data Source usually publishes messages at quite a high speed. If a consumer processes messages more slowly than they arrive in the Event Queue, the Data Source can temporarily store not-yet-consumed messages in its buffer – the Temporary Event Storage. However, if the buffer overflows, the Data Source cancels the subscription and loses all sent but not consumed data messages. In such a case, the consumer has to resubscribe to get new data. The basic data flow is shown in the diagram in Figure 1.
Typical Consumer Conditions
In the diagram, the consumer’s Message Receiver operates as a listener for the data messages. To process a message, the Message Receiver creates a Data Processor object that extracts data from the message. Since we are gathering financial information, we’re dealing with highly valuable and important data, so our primary goal is to save received data as soon as possible. This is why the diagram shows only the data store – the Database – and an optional Messaging System that might be used for publishing notifications about newly received data. Obviously, the format of the data sent out by the Data Source in most cases differs from the receiver’s internal format. So minimal initial processing in the Data Processor might involve a data format transformation, data persistence, and an optional notification.
As you’ve probably noticed, the Message Receiver works in single-threaded mode. Since data processing includes operations in the database and the messaging system, single-threaded operation poses high processing risks for the consumer. Those risks include:
- An inability to keep up with the speed of the arriving data, which leads to
– Temporary Event Storage overflow and
– loss of data and the subscription
- Unpredicted hanging problems in the network and the resources used
- Temporary resource unavailability.
The solution is supposed to minimize these risks.
Designing the Solution
We can minimize the risks if we decouple the data acquisition procedure from the data processing procedure. Moreover, since data processing is a repeatable procedure, we might want to process several messages in parallel. Java 5 offers a great new tool for this task – the ThreadPoolExecutor API and its companions, available in the java.util.concurrent package.
While this article is not a presentation of the ThreadPoolExecutor, we need to mention a few features used in the solution. The ThreadPoolExecutor is not really a pool of objects as the Object Pool pattern is usually understood. Rather, it consists of two major functional parts – a queue of Runnable objects and an execution engine. The latter runs the Runnable tasks on a number of worker threads and keeps that number current. All threads created are non-daemon threads; they belong to the same thread group and have the same NORM_PRIORITY priority.
The ThreadPoolExecutor maintains the corePoolSize and maximumPoolSize characteristics of the pool. When a new task is submitted and the number of running threads is less than corePoolSize, a new thread is created. As the specification states, “If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.”
In our solution, we want as much decoupling between the two processes as possible. So we have to let the queue grow to avoid rejecting new Runnable objects submitted by the Message Receiver, yet memory is finite; we want to process as many tasks in parallel as possible, yet we have to limit the number of concurrent threads in the system, since threads are quite delicate resources. These conflicting requirements can be mostly satisfied with a queue of fixed size that is big enough to give us time to catch consistent queue growth and compensate for it. At the same time, the maximumPoolSize has to be set to a value that still allows an increase within reasonable boundaries (via explicit management actions at runtime). The latter may help slow the queue’s growth or even stabilize it.
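The configuration described above can be sketched as follows. The pool sizes and queue capacity here are illustrative assumptions, not recommendations; the point is the bounded queue plus a maximumPoolSize that management actions can raise at runtime:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorSetup {
    // Sizes below are illustrative assumptions for the sketch.
    static ThreadPoolExecutor newDataFeedExecutor() {
        return new ThreadPoolExecutor(
            4,                      // corePoolSize: threads kept running
            16,                     // maximumPoolSize: adjustable at runtime
            60L, TimeUnit.SECONDS,  // idle timeout for threads above the core size
            new ArrayBlockingQueue<Runnable>(10000)); // large but bounded task queue
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newDataFeedExecutor();
        System.out.println(pool.getCorePoolSize());    // 4
        // An explicit management action can raise the ceiling at runtime:
        pool.setMaximumPoolSize(32);
        System.out.println(pool.getMaximumPoolSize()); // 32
        pool.shutdown();
    }
}
```

A bounded ArrayBlockingQueue is what makes the "queue is full" condition from the specification reachable, so extra threads up to maximumPoolSize can actually be created under load.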
Plus we can engage multiple subscribers (Message Receivers) and split the incoming messages into a number of sub-flows. Each sub-flow of data acquisition might be organized as described above. One possible alternative solution is to use the so-called Reliable Messaging technique and temporarily store received data in the messaging infrastructure until it can be permanently persisted.
Finally, the ThreadPoolExecutor provides an API for getting some basic pool state statistics, such as:
- current pool size
- number of completed tasks
- number of active tasks
- total number of tasks over its lifetime
- largest size of the pool over its lifetime
If your application uses another Java 5 feature – JMX technology – you can easily monitor the state of the pool, i.e., the dynamics of your task processing and, in some cases, manage the pool configuration “on the fly.”
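These statistics come straight from the ThreadPoolExecutor getters. A minimal sketch, with an assumed small pool and a batch of no-op tasks standing in for real data processing:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolStats {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

        for (int i = 0; i < 10; i++) {
            pool.execute(new Runnable() {
                public void run() { /* stand-in for a data processing task */ }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);

        System.out.println(pool.getCompletedTaskCount()); // 10
        System.out.println(pool.getTaskCount());          // 10
        System.out.println(pool.getActiveCount());        // 0
        System.out.println(pool.getLargestPoolSize() <= 4); // true
    }
}
```

A monitoring thread that samples these getters periodically is all the Task Execution Monitor discussed later needs.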
The diagram in Figure 2 refines the design represented in Figure 1 by using the ThreadPoolExecutor. In particular, while the Message Receiver still retrieves messages from the Event Queue sequentially, it wraps them in Runnable objects – the Data Transformers – and passes them to the ThreadPoolExecutor instead of processing them immediately.
The ThreadPoolExecutor operates on the Runnable Data Transformers, i.e., it executes up to corePoolSize of them in parallel threads and queues the extras. Each Data Transformer invokes a Data Processor to massage the data messages as mentioned in the Typical Consumer Conditions section. Thus, the Message Receiver and Data Processor are totally decoupled and can work at their own pace.
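This hand-off can be sketched as below. The class and method names (onMessage, DataTransformer, DataProcessor) are assumptions modeled on the article’s diagrams, and a String stands in for a real market data message:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MessageReceiver {
    private final ThreadPoolExecutor pool;

    public MessageReceiver(ThreadPoolExecutor pool) { this.pool = pool; }

    // Listener callback: wrap the message and hand it off, then return immediately.
    public void onMessage(String rawMessage) {
        pool.execute(new DataTransformer(rawMessage));
    }

    static class DataTransformer implements Runnable {
        private final String rawMessage;
        DataTransformer(String rawMessage) { this.rawMessage = rawMessage; }
        public void run() {
            // Transform to the internal format, persist, optionally notify.
            new DataProcessor().process(rawMessage);
        }
    }

    static class DataProcessor {
        void process(String msg) { System.out.println("processed: " + msg); }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        MessageReceiver receiver = new MessageReceiver(pool);
        receiver.onMessage("EUR/USD 1.2345");
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Note that onMessage does no processing itself: the receiver’s loop is free to pull the next message from the Event Queue as soon as execute returns.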
An additional question concerns the Data Processor: is it shared between concurrent threads, or does it run as multiple instances, one per thread? If the Data Processor merely massaged data, I would design it in a thread-safe manner and share it between the threads. However, in our case the Data Processor invokes two external resources, and we have to consider a policy for resource connections. If a connection is shared, we introduce an extra risk of serializing Data Processor requests for the connection, a risk of performance degradation, and a risk of a connection failure that affects all waiting Data Processor threads. A better, less risky design decouples the Data Processor from the actual connections via a Connection Pool (or pools). In this case, the connections are reused but not shared and, once again, resource access becomes more transparent and easier to manage.
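The reused-but-not-shared policy can be illustrated with a toy connection pool built on a BlockingQueue. The Connection class here is a hypothetical stand-in (not java.sql.Connection); a real design would use an established pooling library:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy pool: a connection is checked out exclusively by one thread and
// returned afterward, so it is reused across threads but never shared.
public class ConnectionPool {
    static class Connection {
        final int id;
        Connection(int id) { this.id = id; }
    }

    private final BlockingQueue<Connection> idle;

    ConnectionPool(int size) {
        idle = new ArrayBlockingQueue<Connection>(size);
        for (int i = 0; i < size; i++) idle.add(new Connection(i));
    }

    Connection borrow() throws InterruptedException {
        return idle.take(); // blocks only the caller if the pool is exhausted
    }

    void release(Connection c) { idle.add(c); }

    public static void main(String[] args) throws InterruptedException {
        ConnectionPool pool = new ConnectionPool(2);
        Connection c = pool.borrow();
        System.out.println(pool.idle.size()); // 1
        pool.release(c);
        System.out.println(pool.idle.size()); // 2
    }
}
```

Because take() blocks only the thread that asked for a connection, a slow or failed resource stalls individual Data Processor threads rather than serializing all of them behind one shared connection.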
The extra features shown in Figure 2 are a Management Component and a Task Execution Monitor. The Management Component is a standard JMX MBean that registers with a JMX server and can accept its commands to change the ThreadPoolExecutor configuration at runtime. The Task Execution Monitor thread periodically reads the pool’s state (statistics) and uses the Management Component to broadcast them to all who might be interested, in particular, to the same JMX server. If the JMX server is equipped with an administration console and/or mechanisms that can send out different types of notification messages, your Operations Team may be able to monitor the data collection process constantly.
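A minimal sketch of such a Management Component using the standard MBean pattern follows. The bean and attribute names (PoolManager, MaximumPoolSize, QueueSize) are assumptions for illustration; in a real deployment a remote JMX console would invoke the attribute change rather than the in-process call shown in main:

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.management.Attribute;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Standard MBean pattern: the interface must be named <ImplClass>MBean.
interface PoolManagerMBean {
    int getMaximumPoolSize();
    void setMaximumPoolSize(int size); // management action at runtime
    int getQueueSize();                // pool state for monitoring
}

public class PoolManager implements PoolManagerMBean {
    private final ThreadPoolExecutor pool;

    public PoolManager(ThreadPoolExecutor pool) { this.pool = pool; }

    public int getMaximumPoolSize() { return pool.getMaximumPoolSize(); }
    public void setMaximumPoolSize(int size) { pool.setMaximumPoolSize(size); }
    public int getQueueSize() { return pool.getQueue().size(); }

    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            2, 8, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName("datafeed:type=PoolManager");
        server.registerMBean(new PoolManager(pool), name);

        // Simulate the management action a JMX console would perform:
        server.setAttribute(name, new Attribute("MaximumPoolSize", 16));
        System.out.println(pool.getMaximumPoolSize()); // 16
        pool.shutdown();
    }
}
```

The same MBean interface is a natural place for the Task Execution Monitor to expose the pool statistics listed earlier as read-only attributes.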
We’ve demonstrated how Java 5’s ThreadPoolExecutor can be used to decouple data acquisition from the data manipulation processes. Features of the ThreadPoolExecutor can mitigate the data processing risks, improve performance, and increase scalability in data processing. Another Java 5 feature – JMX technology – helps in monitoring and managing the ThreadPoolExecutor component in real-time.
References
- Class ThreadPoolExecutor, http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ThreadPoolExecutor.html
- Java Management Extensions (JMX), http://java.sun.com/products/JavaManagement/index.jsp
- Michael Poulin, “Assured Delivery of Audit Data with SOA and Web Services,” WLDJ, Volume 4, Issue 6, http://jdj.sys-con.com/read/169336.htm