| By Parameswaran Seshan | Article Rating: |
|
| October 16, 2008 08:00 PM EDT | Reads: |
5,057 |
After completing the execution of the corresponding activity, each ActivityRunner calls the await() method on the barrier and waits for others to join. Inside the ActivityRunner we have:
class ActivityRunner implements Java.lang.Runnable {
private Activity act; //reference to the activity
private CyclicBarrier barrier; // Reference to the barrier
...
synchronized public void run() {
...
try {
act.execute(); //execute the activity
} finally {
barrier.await(); //join the sync point and wait for others
}
...
Sub-Process Spawn
The "Spawn" activity in a process launches the execution of another process so that the launched process executes independently and the main process moves on to the activity subsequent to the "Spawn" activity, without waiting for the launched process to complete. In WS-BPEL, an equivalent would be to have a process with a <receive> element at the top and no <reply> element at the end of the process, indicating that this process is always invoked asynchronously from the calling process.
We refer to the launched process as a sub-process and the process that is launching it as the main process. In the execute() method of the spawn activity, a new process object instance (similar to the way it's done for a normal process) is created for the sub-process from the sub-process's process definition. Then, values for its input parameters are set. ThreadPoolExecutor and ProcRunner classes are used to run the sub-process independently in a separate thread.
/* extends
runnable */
ProcRunner procRunner = new ProcRunner(subProcess); //extends Runnable
// Spawn the sub process in a new thread.
procPooledExecutor.execute(procRunner);
//end of execution of spawn activity
The spawn activity's execution is over right after the launch of the sub-process and control immediately returns to the main process and moves on to the subsequent activity of the main process.
Process State Storage Shared Update
The state (including context) of the executing process is persisted in the database by the BPMS server at regular logical intervals such as at the start of the process, before the core execution of activities, and after execution of activities. The process parameters (variables) and activity states are captured along with the data items such as timestamps that are relevant for process history. The process state information is used by the BPMS server to faithfully recover/restore and revive a process instance from an exception, or revive a passivated process instance upon the arrival of the message it was waiting for and so on. That way, the process execution can continue from a logically correct (valid) state.
Since a process can involve parallel paths (fork and join) and given that activities executing in parallel can update the same process parameter, whenever the process state is stored it has to be done in such a way that the context is logically consistent across activities. At the time, as the process resumption from this state, the activities receive a consistent set of parameter values in the context. This is ensured in the BPMS server by synchronizing the state storage update across all the parallel activity threads using the CyclicBarrier from the concurrent library, and by making sure the process state is stored only when all the executing parallel activities are in the same state of execution (i.e., either all of them are in the pre-execute stage or all of them are in the post-execute stage). The activities that are in a waiting state (say, user activity waiting for a user's message indicating the action is complete) are not considered for this synchronization as there would be no chance that they update any process parameter; they are currently non-executing.
In the activity pre-execute stage, a CyclicBarrier instance is created with the number of parties specified as equal to the number of activities executing at that time. Also specified in the CyclicBarrier is a common task that the barrier gets executed when all the threads join it and before the threads are released from the barrier. That common task extends the Java.lang.Runnable with a run() method, executing the logic of storing the process state, context, and activity state in the database. Then each activity would supply the activity state information and join the barrier by calling the await() method on the barrier. When the last activity executing joins the barrier, the common task to update the database with process state runs, while all the activity threads remain on hold in the barrier. This is a beautiful feature of the CyclicBarrier, which runs the common task just once and makes sure it's done before the threads are released from the barrier. In the process object, we have the code as shown in Listing 3.
In the Activity pre-execute and post-execute code, we first call createStateBarrier() and then join and wait.
thisprocess.createStateBarrier();
thisprocess.getStateRef().addActivityStateDetails(activityStateDetail);
thisprocess.stateBarrier.await();
At post-execute of activities, a new CyclicBarrier is created similar to what we see above for pre-execute. Ideally all the activity threads executing simultaneously would reach the post-execute stage and join the state update synchronization point to do a synchronized update. However, some activities might have gone into a waiting state (say, a user activity expecting a user action) and so wouldn't be active anymore. Such activities inform the thisprocess object that they are in a waiting state and that they should not be considered for a shared state update, since they can't possibly update the state or bring it into an inconsistent state. When this happens, the thisprocess increments the numOfThreadsWaiting counter, so that when the stateBarrier gets created, it's created for one less thread count. However, if by this time the barrier has already been created, then before waiting such an activity joins the stateBarrier so that the other executing threads can join this barrier in their post-execute and complete the state update.
Semaphores
In the code above, _mutex refers to a Semaphore object. Semaphore is a class provided by the concurrent utility to manage critical sections in the code that need to be made safe in the context of simultaneous multithread access. This class is a counting semaphore and restricts the number of threads that can access some resource in the program.
In the case above, access to the thread counter variables (numOfThreadsWaiting, totalThreadCount) is controlled. At each logical point in the process execution where the process state needs to be stored, a barrier should be created only for the number of threads currently active (not waiting), and since there would be as many threads executing in parallel as there are active activities, the access/update of these counter variables must be made thread-safe.
Semaphore _mutex = new Semaphore(1, true);
We create an instance of the Semaphore class here each time we reach the logical point to store the process state and then use it to lock access to the critical section. The first argument to the Semaphore constructor is given as one indicating that only one thread can acquire the lock and execute the critical section code (mutual exclusion lock) and the other threads trying to acquire it at the same time would simply wait until the lock is released.
Published October 16, 2008 Reads 5,057
Copyright © 2008 SYS-CON Media, Inc. — All Rights Reserved.
Syndicated stories and blog feeds, all rights reserved by the author.
More Stories By Parameswaran Seshan
Parameswaran Seshan is a Senior architect with E-Comm Research Lab, Infosys Technologies Limited, Bangalore, India. He has around 14 years of work experience in the IT industry, involving research, teaching, architecture, and programming. His areas of interest include Process-centric architecture, Intelligent software systems, software architecture, Business Process Management systems, Web services and Java.
- Why SOA Needs Cloud Computing - Part 1
- Cloud Expo and The End of Tech Recession
- The Transition to Cloud Computing: What Does It Mean For You?
- A Rules Engine Built in PowerBuilder
- Sybase Named “Silver Sponsor” of iPhone Developer Summit
- How PowerBuilder Got Its Groove Back
- The Cloud Has Cross-Border Ambitions
- Ulitzer Names The World's 30 Most Influential Virtualization Bloggers
- Ulitzer Named "New Media" Partner of Greatly Anticipated iStrategy Event in Berlin
- Risks and Enterprise Mobility?
- Steps for Success in Enterprise Mobility?
- Are Mobile Luddites Resisting Mobility?
- The Difference Between Web Hosting and Cloud Computing
- Sybase CTO to Speak at 4th International Cloud Computing Expo
- Why SOA Needs Cloud Computing - Part 1
- Cloud Expo and The End of Tech Recession
- The Transition to Cloud Computing: What Does It Mean For You?
- Five Reasons to Choose a Private Cloud
- Seeding The Cloud: The Future of Data Management
- The Threat Behind the Firewall
- Economy Drives Adoption of Virtual Lab Technology
- Tips for Efficient PaaS Application Design
- A Rules Engine Built in PowerBuilder
- Sybase Named “Silver Sponsor” of iPhone Developer Summit
- Where Are RIA Technologies Headed in 2008?
- PowerBuilder History - How Did It Evolve?
- The Top 250 Players in the Cloud Computing Ecosystem
- Custom Common Dialogs Using SetWindowsHookEx
- DDDW Tips and Tricks
- OLE - Extending the Capabilities of PowerBuilder
- DataWindow.NET How To: Data Entry Form
- Book Excerpt: Sybase Adaptive Server Anywhere
- Sybase ASE 12.5 Performance and Tuning
- Working with SOA & Web Services in PowerBuilder
- Office 2003 Toolbar: A New Look For Your Old PowerBuilder App
- Dynamically Creating DataWindow Objects

































