Logo Digital Solution Architecture - the new type of Software Architecture!

How Process Modeling can help you write better structured code (part 3) - receive asynchronous messages

Welcome to the third part of our implementation series of a BPMN model in (Java) code. We came a long way, but we need to deal with asynchronous message receives now. Yes, still no workflow engine... Our main objective to have easily comprehensible and well-structured code. But be aware that this series' example is concerned with a short-lived process without recoverability!

If you haven't read the previous parts, please read them first: Part 1, Part 2

What is a Message Event, anyway?

BPMN has lots of different elements as we see in the example approval process. We went through all of the elements in the previous parts except for the so-called "intermediate message catch event." That's a lengthy name and signals complexity although it is an easily understood concept: BPMN offers messages for communication between different processes. In contrast to the "normal" process-flow, which is indicated by solid edges called sequence-flows, a message-flow is denoted by a dashed line. The dashed line appears to be weaker than a solid line and this is a perfect representation of reliablity: Usually, we have (more) control of our internal structures than of other participants (be it organizations or persons in real life or software in the virtual world.)

If we (= our process) want to receive messages, we can use message events, which come in different types (see the figure above this paragraph). The ones used to receive messages are called catching message events; they are represented with a white envelope. If we want to send a message, we can use a throwing message event, which is denoted by a black envelope instead. Depending on the position in a model, the event is starting a new process instance or let a running process instance wait for the message to arrive. The former is a start event, which is denoted by a circle with a single line, the latter is an intermediate event, which is denoted by a circle with a double line. To add some variety (not in a good sense), BPMN also has receive tasks, which have the same semantics as an intermediate message catch event, and send tasks, which have the same semantics as an intermediate message throw event.

But how do we best "catch" the message in our Java code? The answer is not trivial because this time multiple design options exist that satisfy different quality attributes. Usually, at this point, most developers will split the business process in independent parts because they will attach the logic that is triggered by receiving the message into an own method with its own call stack originating from the endpoint handler. For example, a REST request is received, the endpoint would then (perhaps asynchronously) execute a method that contains the then to be executed part of the process. However, with this approach the logical boundary of the process is lost: Its implementation is scattered in different methods and often classes. In the example it gets even worse: Because the process waits for multiple messages to return before proceeding with the process, we need another way to determine whether all messages have been received. Our nice trick to use Executor.invokeAll for synchronizing all subprocesses cannot be used anymore and custom and error prone join logic often follows...

But there is another option! Wondered why I stressed that this is a short-running and non-recoverable process since the first part of this article series? That's because this allows us to let our active thread wait for the message to be received. By doing that we can improve the clarity of our code but on the cost of some additional resource usage by having waiting threads. We can tunnel the message into our current process thread by creating a new method that is called by an endpoint to send the message to our process object. However, we do not execute any logic there but wake up the process thread! Waking up the process thread can be done in many different ways: Older versions of Java had to use the wait()/interrupt() mechanism but since Java 8 there are plentiful alternatives. For this use case I would recommend a CountDownLatch, which essentially is a counter that is decremented. If it reaches zero all waiting threads can continue.

Our Updated Code Example

With this implementation idea, we extend our example. Already existing code from previous parts is faded out to make the additions better visible:


public class ApproverProcess {

	private static final ExecutorService APPROVER_PROCESS_EXECUTOR = Executors.newFixedThreadPool(10);
	
	public void approvalDecisionsDone(List<ApprovalResult> approvalResults) {
		List<BatchTransfer> sortedBatchTransfers = prepareTransfers(approvalResults);
		transferSecurities(sortedBatchTransfers);
		proceedWithApprovedBusinessCases(approvalResults, sortedBatchTransfers);
	}
	
	private List<BatchTransfer> prepareTransfers(List<ApprovalResult> approvalResults) {
		return null;
	}
	
	private void transferSecurities(List<BatchTransfer> sortedBatchTransfers) {
		List<TransferSecuritiesProcess> subprocesses = new ArrayList<>();
		for(BatchTransfer bt : sortedBatchTransfers) {
			subprocesses.add(new TransferSecuritiesProcess(bt));
		}
		
		try {
			APPROVER_PROCESS_EXECUTOR.invokeAll(subprocesses);
		} catch (InterruptedException e) {
			throw new RuntimeException(e);
		}
	}
	
	private void proceedWithApprovedBusinessCases(List<ApprovalResult> approvalResults,
			List<BatchTransfer> sortedBatchTransfers) {
	}

}

class TransferSecuritiesProcess implements Callable<Void> {

	private BatchTransfer batchTransfer;
	private PositionKeepingService positionKeeping;
	private CountDownLatch messageLatch = new CountDownLatch(1);
	private TransferCompletedMessage message;
	
	public TransferSecuritiesProcess(BatchTransfer bt) {
		this.batchTransfer = bt;
	}

	@Override
	public Void call() throws Exception {
		positionKeeping.transferBulk(batchTransfer);
		
		TransferCompletedMessage message = waitForTransferCompleted();
		
		// we can now deal with message
		
		return null;
	}
	
	private TransferCompletedMessage waitForTransferCompleted() throws InterruptedException {
		messageLatch.await();
		return this.message;
	}

	public void messageReceived(TransferCompletedMessage message) {
		this.message = message;
		messageLatch.countDown();
	}
}

Let's walk through the changes:

  • After calling the position keeping service in line 46, we now wait for the callback message signaling the success/error in line 47. We added a new method to give the technical code a name carrying its business intention: waitForTransferCompleted that resembles the naming in the BPMN diagram.
  • In waitForTransferCompleted we wait for the CountDownLatch to let us proceed in line 57. The latch is a throw-away object that cannot be reset and has been initialized to free our thread after it has been decremented once (see parameter 1 in line 38).
  • The endpoint, which will bridge the external API protocol and initiale receive the callback (not shown here) will call messageReceived. Consequently, this method is run in a different thread than the call and waitForTransferCompleted methods. Its only purpose is to store the message and signal the other thread to proceed by decrementing the latch.

Summary and Outlook

We now have a simple, easy, and understandable implementation of a short running business process in Java. Wherever possible we used business terminology and small methods. I think that everyone will find it easy to map the code to the BPMN diagram and vice versa.

However, we are not done in this series yet. Have you realized that in this example things could have been simplified further by applying a Microservice API Pattern? And that some knowledge of and experience in BPMN could have helped you to find this easily? If you are interested, wait for the next part of this series, where I explain this. Also, we now have a thread waiting indefinitely for a message. Not a good idea. We will also solve this in a future article in this series! In order not to miss following episodes and other articles, please consider subscribing for article updates below - do not miss any interesting content!

Subscribe to get notified for new articles!

There are more articles to come. If you want to stay informed, please leave your email address to get notified!

Daniel Lübke

Dr.-Ing. Daniel Lübke is a Digital Solution Architect, who enjoys realizing high-quality business processes in software. He has over 10 years experience in architecture of distributed systems (from SOA to Microservices, BPM and workflows). Daniel likes to find better than "state of the art" solutions by combining methods from Software Engineering and BPM, in addition to researching promising, uncommon solutions. He is book author, editor, and speaker at conferences, and has published many articles in different magazines and journals.