This project has retired. For details please refer to its Attic page.
Apache ODE – JaCOb – Virtual Processing Unit

JaCOb – Virtual Processing Unit

Introduction

ODE's BPEL implementation relies on the JaCOb (Java Concurrent Objects) framework to implement the BPEL constructs. The framework provides the mechanism necessary to deal with two key issues in implementing BPEL constructs:

  1. Persistence of execution state.
  2. Concurrency.

By rolling up these concerns in the framework, the implementation of the BPEL constructs can be simpler by limiting itself to implementing the BPEL logic and not the infrastructure necessary to support it.

The approach we'll take in this tutorial is looking at the rationale of JaCOb and its concepts first. Then we'll illustrate with one complete example. But if you're a reverse reader, you can also decide to start with the example.

Rationale behind the model

Let's start from the most classical example of all:

void process(order) {
  billingService.bill(order.billing);
  shippingService.ship(order.product, order.shipping, self);
  shipping = receive("shipping")
  order.customer.send(shipping.details);
}

It's Java like pseudo code that works extremely well, unless:

  • You fail somewhere in the middle and the customer has to order a second time.
  • Your have too many waiting threads and the VM crashes.

So you change it to:

void process(order) {
  billingService.bill(order.billing);
  shippingService.ship(order.product, order.shipping, self);
  listenOn("shipping", part2);
}

void part2(shipping) {
  order.customer.send(shipping.details);
}

That's almost better, but still has a lot of points of failure, where you're not sure if you actually billed the customer and shipped the product or not. So:

void process(order) {
  billingService.bill(order.billing);
  continue("part2");
}

void part2() {
  shippingService.ship(order.product, order.shipping, self);
  listenOn("shipping", part2);
}

void part2(shipping) {
  order.customer.send(shipping.details);
}

You're just fracturing the code with two primitives: continue which lets you persist state and go to the next one, and listenOn which lets you persist state and wait for an external event.

Fracturing the code also addresses concurrency of execution even if you only have one thread. For example you could have a large number of 'process' calls to bill and ship an order. As we've broken down the whole treatment into several small parts, we can control when these separate parts actually get called and executed. And we're free to order them.

Let's say we have a small process doing in a sequence:

  1. Invoke
  2. Receive
  3. Wait
  4. Invoke

If we have 2 parrallel executions of this process and implement it in a simple way we'd have:

  1. Invoke1
  2. Receive1
  3. Wait1
  4. Invoke1
  5. Invoke2
  6. Receive2
  7. Wait2
  8. Invoke2

However if we break down the code as shown above by introducing a "middle man" (or stack) and do not allow activities to directly call each other (or do not have the activity directly calling the sequence that directly calls the next activity) we could well obtain the following:

  1. Invoke1
  2. Invoke2
  3. Receive1
  4. Wait1
  5. Receive2
  6. Wait2
  7. Invoke1
  8. Invoke2

From a client standpoint, we've achieved concurrency of execution even with one thread.

Next step is adding links, fault handling/termination, compensation and event handlers and seeing that continue/listenOn is all you need. The last step is just adding implementation details.

JaCOb Example

Consider the issue of persistence. Imagine a simplified and naive implementation of the BPEL constructs <sequence>, <wait>, and <empty>:

class Sequence extends Activity {
  /** From BPEL Definition */
  OSequence def;
  ...
  void run() {
     for (OActivity child:def.children)
       createActivity(child).run();
  }
}

class Wait extends Activity {
  /** From BPEL Definition */
  OWait def;
  ...
  void run() {
     Thread.wait(def.duration);
  }
}

class Empty extends Activity {
  /** From BPEL Definition */
  OEmpty def;
  ...
  void run() {
     // <empty> activity: do nothing.
  }
}

The above is the "natural" implementation of the constructs in the Java language. However, this implementation has some serious and obvious problems: the <wait> may specify a duration of days in which case a system failure during those days of waiting would mean that the process execution state would be lost. It would be nice to replace Thread.wait() in the Wait class with some sort of "suspend" operator that would save the execution state to disk.

However there are practical issues to "suspending" to disk. At the point we wish to suspend, the call stack looks like: :::java Sequence.run() Wait.run()

In order to save the process to disk, we need to end the current thread of control which means popping both stack frames. To do this we have no choice but to require the implementation of Wait and Sequence to "cooperate" with this requirement thereby greatly complicating the implementation of those constructs. This also means that the "natural" implementation model cannot be used directly.

JaCOb aims to solve this problem by providing an alternate "natural" model that allows execution state to be suspended ''without cooperation from the implementation classes''. The idea in JaCOb is to flatten the call stack and rely on explicit communication channels to handle control flow. We now consider a simplified JaCOb representation of our three BPEL activities:

class Empty  {
  OEmpty def;

  /** channel we use to notify parent we are done. */
  CompletionChannel myCompletionChannel;
  ...
  void run() {
     // <empty> activity: do nothing...except to
     // notify our parent that we are done.
     myCompletionChannel.completed();
  }
}

class Sequence  {
  OSequence def;
  CompletionChannel myCompletionChannel;
  ...
  void run() {
    // Start of by instantiating a sequential child runner for the first
    // (0th) child...
    instance(new SequenceChildRunner(0))
  }

  class SequenceChildRunner {
    int currentchild;
    SequenceChildRunner(int childNumber) { currentchild = childNumber; }
    void run() {
       if(currentChild == def.children.size()) {
         // We are past the last child, the sequence is done.
         myCompletionChannel.completed();
       } else {
         // We still have more children to run..

         // Create a completion channel for our child.
         CompletionChannel childCompletionChannel = 
            newChannel(CompletionChannel.class);

         // create a child activity based on the activity type
         // and parameterized with the model and the completion
         // channel we just created
         Activity childActivity = 
            createActivity(def.children.get(currentChild),
                           childCompletionChannel);

         // instantiate the child activity
         instance(childActivity);

         // create an object to wait for the "completed()" notification
         // from the child activity.
         object(new CompletionChannelML(childCompletionChannel)) {
            void completed() {
               // Ok, finished with the child, create a runner
               // to do the next child.
               instance(new SequenceChildRunner(currentChild+1));
            }
         }
       }
    }
  }
}

class Wait extends Activity {
  OWait def;
  CompletionChannel myCompletionChannel;

  ...
  void run() {
     // Create a channel for an externally managed alarm.
     TimerChannel timerChannel = newChannel(TimerChannel.class);
     // register the alarm with the runtime.
     getRuntimeContext().registerTimer(timerChannel, def.duration);

     // create an object to wait for the alarm and then send an
     // activity completed message to our parent.
     object(new TimerChannelML(timerChannel) {
        onTimer() {
           myCompletionChannel.completed();
        }
     });

  }
}

So JaCOb constructs help us in breaking the execution stack.

Main JaCOb Concepts

Channels

As briefly demonstrated above, channels are interfaces used for communication between activities in ODE engine. There are several types of channels like TerminationChannel, ParentScopeChannel or CompensationChannel (their respective purpose should be obvious from their name). Some basic channels are provided to all activities when they're created to allow them to interact with their environment. When an activity wants to notifies its parent that it has terminated for example, it just calls its parent TerminationChannel (see the Empty example above).

Don't look for channels implementations because there are none. Channels implementation is provided through a dynamic proxy (see ChannelFactory.createChannel() and ChannelFactory.ChannelInvocationHandler for more). That's one of the levels of decoupling between invocation and actual execution in JaCOb.

JacobObject / JacobRunnable

If you don't care much about the details, the bottom line is: a JacobObject and an JacobRunnable are just a method implementation. This method gets executed when the abstraction is executed.

A JacobObject is meant to be a closure. From Wikipedia: "A closure combines the code of a function with a special lexical environment bound to that function (scope). Closure lexical variables differ from global variables in that they do not occupy the global variable namespace. They differ from object oriented object variables in that they are bound to functions, not objects.". Normally closures aren't supported in Java so JacobObject tries to feel that gap. But it's not a true closure anyway, which makes thing easier. Closures in JaCOb are statically coded, whereas in most languages supporting closures these are dynamic. So basically in JaCOb, a closure is expected to implement some methods and provides other utility methods to manipulate channels and replicate itself.

JacobRunnable is just a JacobObject that requires the implementation of only one method: run(). As ''all activities inherit from JacobRunnable they're all supposed to implement their main processing in this run() method. Their initialization occur in their respective constructors.

Method Lists (MLs)

ML classes can be seen as the other end of a channel. Only they're not invoked directly when one calls a channel method, but only once the JaCOb engine has popped the channel invocation from its internal stack (again you can see how the execution stack gets broken here).

Usually MLs implementations in ODE are inlined because it's just easier to declare them in the activities run() method. For example if you look at the Sequence example shown above you'll see something like:

void run() {
...
     // create an object to wait for the "completed()" notification
     // from the child activity.
     object(new CompletionChannelML(childCompletionChannel)) {
        void completed() {
           // Ok, finished with the child, create a runner
           // to do the next child.
           instance(new SequenceChildRunner(currentChild+1));
        }
     }
   }
}

The object method here is inherited from !JacobObject and is just a way to hand our ML to JaCOb. So that the JaCOb runtime can match it with an incoming channel message later on.

VPU and ExecutionQueue

The VPU is where all the JaCOb processing is occuring. When a JacobObject is injected inside the VPU, it's actually registered as a Continuation, which is just wrapping the JacobObject with the method to call on the JacobObject to execute it (in our case always the run() method as we're only dealing with JacobRunnable instances).

The ExecutionQueue (and its implementation FastExecutionQueueImpl) is just a container for all the artifacts managed by the VPU (mostly channels and reactions) to organize them in queues where artifacts can be pushed and popped. It also records some execution statistics.

So the VPU main processing is just dequeuing a reaction from the soup and executing it by calling its abstraction's run() method (remember that the reaction just wraps an abstraction). That's all (check JacobVPU.execute(), you'll see that I'm not lying). However when the JacobRunnable (usually an activity) gets executed the following things can happen:

  • if other abstractions (usually other activities) are created, they will be appended to the reaction queue,
  • if new channels are created, they will be saved for later usage,
  • if channels are invoked, the message will be saved to match against a new ML,
  • if a new ML instance is created, it will be submitted to the VPU that will try to match it against a channel invocation.

The VPU is also responsible for persisting its internal state. So when an execution stops (for example our process has reach a receive) the VPU state is serialized and saved for later reuse. This logic can be seen in RuntimeContextImpl.execute().

There's one more thing that should be mentioned here. Continuations (and hence JacobRunnables) don't "stay" in the VPU queues. They just get popped, executed and that's it. So if an abstraction must last more than one execution, it should simply fork itself. This explains why in our Sequence example already pasted above we see the line:

instance(new SequenceChildRunner(currentChild+1));

This simple adds a new !ChildRunner that will monitor the next child completion. If you browse ODE's activities code you will even find things like instance(this) which directly enqueues a new instance of the same JaCOb abstraction.

Walking through examples

While

<process name="while1" 
    targetNamespace="http://ode/bpel/unit-test" 
    xmlns:bpws="http://schemas.xmlsoap.org/ws/2003/03/business-process/"
    xmlns="http://schemas.xmlsoap.org/ws/2003/03/business-process/"
    xmlns:tns="http://ode/bpel/unit-test"
    xmlns:xsd="http://www.w3.org/2001/XMLSchema"
    xmlns:test="http://ode/bpel/unit-test.wsdl" 
    suppressJoinFailure="yes">
   <partnerLinks>
      <partnerLink name="testPartnerLink" 
         partnerLinkType="test:TestPartnerLinkType" 
         myRole="me" />
   </partnerLinks>

   <variables>
     <variable name="var1" messageType="test:TestMessage2"/>
   </variables>

   <sequence>
       <receive 
          createInstance="yes"
          name="startReceive"
          partnerLink="testPartnerLink"
          portType="test:TestPortType"
          operation="testOperation"
          variable="var1"/>
      <while condition="bpws:getVariableData('var1', 'TestPart') &lt; 10">
        <assign>
           <copy>
             <from expression="bpws:getVariableData('var1', 'TestPart') + 1"/>
             <to variable="var1" part="TestPart"/>
          </copy>
        </assign>
       </while>
       <reply name="endReply" 
              operation="testOperation" 
              partnerLink="testPartnerLink"
              portType="test:TestPortType" 
              variable="var1"/>
   </sequence>
</process>

Everything starts with a receive. So our entry point here in our JaCOb-focused discussion is going to be BpelProcess.PartnerLinkMyRoleImpl.inputMsgRcvd(). The code that matters to us now is the following (executed when a message is targeted at a createInstance receive):

BpelRuntimeContextImpl instance = createRuntimeContext(newInstance, new PROCESS(_oprocess), messageExchange);
...
// run the vpu
instance.execute();

If you check the code executed by !BpelRuntimeContextImpl constructor you'll see among other things the following:

if (PROCESS != null) {
   vpu.inject(PROCESS);
}

The process itself get injected. When executed, PROCESS just instantiates a scope to control the execution of its child activity and starts listening on compensation and completion channel. From the process we go to a scope, then our main sequence and finally our receive.

Receives are just mapped to a pick onMessage so its JaCOb implementation should be looked for in PICK. The PICK is just about isolating the right correlations and selecting a message for it, then waiting for the message. In our createInstance case we'll be more interested in the following code, located in BpelRuntimeContextImpl.select() (and called by PICK):

if (_instantiatingMessageExchange != null && _dao.getState() == ProcessState.STATE_READY) {
   for (int i = 0 ; i < correlators.size(); ++i) {
     CorrelatorDAO ci = correlators.get(i);
     if (ci.equals(_dao.getInstantiatingCorrelator())) {
       inputMsgMatch(pickResponseChannelStr, i, _instantiatingMessageExchange);
       return;
     }
   }
}

Which just happens to call something like:

vpu.inject(new JacobRunnable() {
   public void run() {
      PickResponseChannel responseChannel = importChannel(responsechannel, PickResponseChannel.class);
      responseChannel.onRequestRcvd(idx, mex);
   }
});

That's where things really start. When injected, this abstraction just calls the response channel for our receive. The other side of this channel is implemented as a ML in the PICK:

object(false, new PickResponseML(_pickResponseChannel) {
    public void onRequestRcvd(int selectorIdx, Object msgex) {
        ...
      ActivityInfo child = new ActivityInfo(genMonotonic(), onMessage.activity, _self.self, _self.parent);
      instance(createChild(child,_scopeFrame,_linkFrame));
    }
});

This method just does what a receive needs to do (like variable and correlation initialization) and creates a new child. When dealing with a real pick, this child would be the onMessage activity, however in the case of a receive, this is an empty activity. So when does our receive completes? Well, when the child completes. As you can see on the child constructor, we're passing the same ParentScopeML that we've been provided. So when the child completes, the receive's parent is notified which means to our receive doesn't need to do it itself. And an empty immediately completes:

_self.parent.completed(null, CompensationHandler.emptySet());

The parent sequence gets notified almost immediately after the onRequestRcvd() methods finishes.

Now how does our sequence gets the control back? Well, once again, let's look at the ML, the other side of the channel. As one of the most important job of the VPU is matching channels invocations and MLs, we'll get to the sequence by its ParentScopeML implementation:

class SEQUENCE extends ACTIVITY {
  ...
  private class ACTIVE extends BpelJacobRunnable {
    ....
    public void run() {
      ...
      object(new ParentScopeML(_child.parent) {
        public void compensate(OScope scope, SynchChannel ret) {
          _self.parent.compensate(scope,ret);
          instance(ACTIVE.this);
        }

        public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
          HashSet<CompensationHandler> comps = new HashSet<CompensationHandler>(_compensations);
          comps.addAll(compensations);
          if (faultData != null || _terminateRequested || _remaining.size() <= 1) {
            _self.parent.completed(faultData, comps);
          } else /* !fault && ! terminateRequested && !remaining.isEmpty */ {
            ArrayList<OActivity> remaining = new ArrayList<OActivity>(_remaining);
            remaining.remove(0);
            instance(new SEQUENCE(_self, _scopeFrame, _linkFrame, remaining, comps));
          }
        }
      }));
    }
  }
  ...
}

The method that will get executed is of course the completed() method. It simply completes if a fault has been thrown, a termination has been requested and if no child activities remain. Being of an optimistic nature, we'll check what happens when everything goes just fine. In this second case a remaining activity is removed and the SEQUENCE abstraction itself is reinstantiated. Which leads us to what the SEQUENCE does:

public void run() {
   final ActivityInfo child = new  ActivityInfo(genMonotonic(),
          _remaining.get(0),
          newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
   instance(createChild(child, _scopeFrame, _linkFrame));
   instance(new ACTIVE(child));
}

As you can see, it just instantiates the next child abstraction and also another abstraction named ACTIVE. So what's this ACTIVE that we've already seen a bit before? Well, it's just the abstraction that keeps on following child activities when they execute. It's more like a convention on all containment-based activity in ODE (while, sequence, pick, ...) that the main activity abstraction just kicks off the processing. Then an ACTIVE (also sometimes called WAITER) abstraction takes care of following the children.

Continuing to the next step, we've just instantiated an abstraction for the while in our example process, as it's the next child of the sequence. So what happens there?

public void run() {
    boolean condResult = false;
    try {
        condResult = checkCondition();
    } catch (FaultException fe) {
        _self.parent.completed(createFault(fe.getQName(), _self.o),_compHandlers);
        return;
    }
    if (condResult) {
        ActivityInfo child = new ActivityInfo(genMonotonic(),
              getOWhile().activity,
              newChannel(TerminationChannel.class), newChannel(ParentScopeChannel.class));
        instance(createChild(child, _scopeFrame, _linkFrame));
        instance(new WAITER(child));
    } else /* stop. */ {
        _self.parent.completed(null, _compHandlers);
    }
}

Now you should be getting more familiar with that sort of code. First step is evaluating the while condition. If it turns out it's true, then a child abstraction gets created as well as a WAITER to follow its execution. The WAITER implementation is again pretty straightforward:

private class WAITER extends BpelJacobRunnable {
    private ActivityInfo _child;
    private boolean _terminated;

    WAITER(ActivityInfo child) {
        _child = child;
    }

    public void run() {
        object(false, new TerminationML(_self.self) {
            public void terminate() {
                _terminated = true;
                _child.self.terminate();
                instance(WAITER.this);
              }
        }.or(new ParentScopeML(_child.parent) {
            public void compensate(OScope scope, SynchChannel ret) {
                _self.parent.compensate(scope,ret);
                instance(WAITER.this);
            }
            public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
                _compHandlers.addAll(compensations);
                if (_terminated || faultData != null)
                    _self.parent.completed(faultData, compensations);
                else
                    instance(WHILE.this);
            }
        }));
    }
}

Termination and compensation aren't doing anything really interesting. Completion, just like for the sequence, re-instantiates the WHILE abstraction. And that's how we get our loop, by re-instantiating the main WHILE abstraction (again evaluating the condition and creating a child if it's true).

Finally, when the while condition becomes false, it notifies its parent channel. The sequence then goes to our last activity: reply. As expected, the reply replies, just sending the variable content and notifying its parent for completion. The sequence has no more children to execute so it also notifies its own parent, which is the process. We then just declare the process to be completed and that's it! We're done!