Orchestrating complex workflows and managing data dependencies between tasks can be challenging and time-consuming. Building something like this from scratch would take a lot of time and resources, and maintaining it could be a real hassle. That’s where AWS Step Functions comes to the rescue: it provides a low-code visual workflow service that simplifies this process. In this blog post, we will explore how to achieve partially dependent parallel flows in AWS Step Functions, enabling you to efficiently pass data between parallel states during their runtime.

AWS Step Functions

AWS Step Functions is a powerful low-code workflow service that offers a visual representation of application workflows, integration with various AWS services, and real-time error detection.

By leveraging Step Functions, you can create workflows with a sequence of steps where the output of one step becomes the input for the next. With over 220 AWS service integrations available through AWS SDK integration tasks, you can call AWS SDK actions directly without writing additional code. Additionally, Step Functions can be triggered on specific events or timers using EventBridge Scheduler, adding flexibility and automation to your workflows.
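To illustrate, here is roughly what one of those AWS SDK integrations looks like: a Task state that calls the Glue GetJobRuns API directly, with no extra code in between. It is expressed here as a Python dict holding the Amazon States Language definition; the state name and job name are made up for the example.

```python
# A Task state using an AWS SDK service integration to call Glue's
# GetJobRuns API directly. State name and JobName are illustrative.
get_job_runs_state = {
    "GetGlueJobRuns": {
        "Type": "Task",
        "Resource": "arn:aws:states:::aws-sdk:glue:getJobRuns",
        "Parameters": {"JobName": "my-glue-job"},
        "End": True,
    }
}
```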

Parallel Workflows

One particularly useful feature is parallel workflows. A Parallel state runs multiple branches concurrently and, if desired, waits for all of them to complete before continuing. You can also choose which output you want at the end of the parallel execution.
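As a sketch, a minimal Parallel state with two branches might look like this (again as a Python dict holding the state machine definition; branch names and job names are made up):

```python
# A minimal ASL Parallel state: two branches, each starting a Glue job
# with the optimized startJobRun.sync integration and waiting for it.
parallel_state = {
    "Type": "Parallel",
    "Branches": [
        {
            "StartAt": "GlueJob1",
            "States": {
                "GlueJob1": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::glue:startJobRun.sync",
                    "Parameters": {"JobName": "glue-job-1"},
                    "End": True,
                }
            },
        },
        {
            "StartAt": "GlueJob2",
            "States": {
                "GlueJob2": {
                    "Type": "Task",
                    "Resource": "arn:aws:states:::glue:startJobRun.sync",
                    "Parameters": {"JobName": "glue-job-2"},
                    "End": True,
                }
            },
        },
    ],
    "End": True,
}
```

The Parallel state's output is an array of the branch outputs; `ResultSelector` or `OutputPath` can then be used to pick which of those outputs is passed on.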

This all sounds great, but there is a caveat that might not be noticeable at first:
What if you want to pass data between parallel states during their runtime?

The challenge

Let’s first talk about why one would want to pass data between parallel states at runtime. For example, imagine the following situation: a parallel workflow runs three Glue jobs, and two sub-step functions each depend on a subset of those jobs.

I want to note that Glue jobs and step functions are used only as an example here; any Step Functions-supported service could be used instead, both here and in the rest of the text.

If the two step functions needed all three Glue jobs to complete, you could put these step function runs at the end of the parallel workflow and then run them in parallel again.


But what if one of the Glue jobs has a very short duration and another a rather long one? It wouldn’t be great to wait for all of them to complete, only to then start the possibly lengthy sub-step functions, when one of them could have been started long ago. This may not seem like a big deal at first, but as with anything, at scale this could turn out to be a major bottleneck.

Wouldn’t it be great if you could say, I want Step Function 1 to be dependent on Glue job 1 and Glue job 3, but not Glue job 2, or Step Function 2 to be dependent on Glue job 2 and Glue job 3, but not Glue job 1?

The first thing we want to address is: What if we could tell Step Function 1 to wait for both the Glue job 1 and Glue job 3 executions to finish?

Well, Step Functions doesn’t support this sort of dependency relationship, so we need to create our own workaround.

Achieving dependency in parallel flows

The first thing we need to create is a poller that will periodically check for Glue job 3 completion. 

Even though this may seem overwhelming initially, it is fairly simple. We have a timer which calls Get Job Status, which checks whether the desired job is completed, and depending on the outcome, either starts the step function or returns to the timer and calls Get Job Status again. If Glue job 1 finishes before Glue job 3, we wait for Glue job 3 to finish. On the other hand, if Glue job 3 finishes before Glue job 1, Get Job Status will run only once and there will be no need for the polling procedure.
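The loop just described can be sketched in plain Python. In the real state machine, a Wait state plays the role of the sleep and a Choice state plays the role of the `if`; the wait interval and attempt cap below are made-up values:

```python
import time

def poll_until_done(get_job_status, wait_seconds=60, max_attempts=30):
    """Timer -> Get Job Status -> Choice loop, as described above."""
    for attempt in range(1, max_attempts + 1):
        result = get_job_status()
        if result['status'] == 'DONE':
            # The dependent step function can be started now.
            return attempt
        time.sleep(wait_seconds)  # the Wait state
    raise TimeoutError("Glue job did not complete in time")

# Simulated Get Job Status that reports 'DONE' on the third poll.
statuses = iter([{'status': 'WAITING'}, {'status': 'WAITING'}, {'status': 'DONE'}])
attempts = poll_until_done(lambda: next(statuses), wait_seconds=0)
print(attempts)  # 3
```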

This poller approach is relatively common in step functions, as you often need to perform a certain action periodically. It is even covered in the official AWS documentation.

Now the only thing that remains is how we check for completion of the Glue job, or better yet, what our Get Job Status step should be.

Well, if we look at the documentation, it says that Get Job Status should be an API call, and what better way to make fast, repeatable API calls than a Lambda function?

Okay, now all that’s left to do is write the actual Lambda function.

We first need to create a boto3 Glue client and pass along the desired job name. We can then use GetJobRuns to fetch the runs of that Glue job. This presents an issue: we can’t always be sure whether the latest successful run belongs to this step function execution, or whether the Glue job hasn’t been started at all yet, which could easily be the case if we had another Glue job running before Glue job 3.

The solution is to pass along the Step Function’s start timestamp, as well as the frequency at which the Step Function is triggered, for example, by an EventBridge Scheduler.
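Concretely, the input event for the poller Lambda could carry these fields. The exact payload shape below is an assumption made for illustration; the job name and timestamp are made up:

```python
# Hypothetical input event for the poller Lambda: the polled job's name,
# a poll attempt counter, the Step Function's start time, and its
# trigger frequency in minutes.
event = {
    "attempt": 0,
    "job": {"name": "glue-job-3"},
    "sfn": {
        "startedOn": "2024-02-23T10:07:30.000000+0000",
        "triggerOffset": 5
    }
}
```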

Taking all this into consideration, we end up with:

import boto3
from datetime import datetime, timedelta

sfnDateFormat = '%Y-%m-%dT%H:%M:%S.%f%z'

def isWithinLatestSFNRun(sfnStartedOn: datetime, jobStartedOn: datetime, triggerOffset: int) -> bool:
    # Floor the Step Function start time down to the nearest multiple of
    # the trigger interval, so job runs started within the current
    # scheduling window count as belonging to this execution.
    rounded = sfnStartedOn - timedelta(
        minutes=sfnStartedOn.minute % triggerOffset,
        seconds=sfnStartedOn.second,
        microseconds=sfnStartedOn.microsecond
    )

    print(f"Rounded: {rounded}")
    return jobStartedOn >= rounded

def lambda_handler(event, context):
    client = boto3.client('glue')

    attempt = event['attempt'] + 1
    jobName = event['job']['name']
    sfnStartedOn = datetime.strptime(event['sfn']['startedOn'], sfnDateFormat)
    sfnTriggerOffset = event['sfn']['triggerOffset']

    print(f"SFN Started on: {sfnStartedOn}\nSFN Trigger Offset: {sfnTriggerOffset}")

    # Fetch only the latest few runs of the job instead of its full history.
    paginator = client.get_paginator('get_job_runs')
    response_iterator = paginator.paginate(
        JobName=jobName,
        PaginationConfig={
            'MaxItems': 3
        }
    )

    for jobRuns in response_iterator:
        for jobRun in jobRuns['JobRuns']:
            StartedOn, JobRunState = jobRun['StartedOn'], jobRun['JobRunState']

            # A run counts only if it succeeded AND started within the
            # current Step Function's scheduling window.
            if isWithinLatestSFNRun(sfnStartedOn, StartedOn, sfnTriggerOffset) and JobRunState == 'SUCCEEDED':
                return {
                    'attempt': attempt,
                    'statusCode': 200,
                    'status': 'DONE'
                }

    return {
        'attempt': attempt,
        'statusCode': 200,
        'status': 'WAITING'
    }
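To see how the window check behaves, here is a quick standalone run of the same rounding logic; the datetimes are made up for the example:

```python
from datetime import datetime, timedelta

def isWithinLatestSFNRun(sfnStartedOn, jobStartedOn, triggerOffset):
    # Same rounding logic as in the Lambda above.
    rounded = sfnStartedOn - timedelta(
        minutes=sfnStartedOn.minute % triggerOffset,
        seconds=sfnStartedOn.second,
        microseconds=sfnStartedOn.microsecond
    )
    return jobStartedOn >= rounded

# Step function triggered every 5 minutes; this run started at 10:07:30,
# so its scheduling window is floored to 10:05:00.
sfnStart = datetime(2024, 2, 23, 10, 7, 30)
print(isWithinLatestSFNRun(sfnStart, datetime(2024, 2, 23, 10, 6, 0), 5))  # True: started after 10:05
print(isWithinLatestSFNRun(sfnStart, datetime(2024, 2, 23, 10, 4, 0), 5))  # False: a run from an earlier window
```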




Conclusion

Every approach comes with a certain drawback, and this one is no different. The most obvious one is pulling all Glue job runs instead of the specific, desired one. Boto3 supports GetJobRun for Glue jobs, but you need to pass the ID of the job run to get information about that run. We can’t know the Glue job run ID, as it is generated when the Glue job run starts, and we would need to pass that data to the poller at runtime, which isn’t trivial and could produce unexpected dependency issues when one Glue job starts before the other.

One might think a good idea would be to somehow set the Glue job run ID manually. Even though this would be a great approach, Glue jobs unfortunately don’t support setting their run ID manually, even though the API accepts it as a parameter. Upon further research, you’ll find it is only used to provide a previous job run ID when retrying, as explained in their documentation.

As you can see above, one way to mitigate pulling all Glue job runs is to use a paginator in combination with MaxItems. By limiting the retrieval to a specified number of the latest Glue job runs, we can perform checks on a significantly reduced dataset. Although it may appear a minor enhancement at first, this approach greatly improves performance and execution times. Since Lambda functions are designed for fast and frequent invocation with minimal execution times, optimizing data retrieval contributes to overall efficiency.

With the ability to pass data between partially dependent parallel flows in AWS Step Functions, the possibilities for improving your workflows are endless. An example step function that previously ran for more than an hour now finishes in under 30 minutes, showcasing how seemingly minor optimizations like this can go a long way toward improving execution times at scale. And while achieving partially dependent parallel flows in AWS Step Functions does require a workaround, the benefits of optimized workflow execution times make it worthwhile.


“How to achieve partially dependent parallel flows in AWS Step Functions” Tech Bite was brought to you by Nedim Badžak, Junior Software Engineer at Atlantbh.

Tech Bites are tips, tricks, snippets or explanations about various programming technologies and paradigms, which can help engineers with their everyday job.

