building decoupled event-driven applications

A key tenet of microservice architectures is the decoupling of discrete services, so that independent, loosely coupled services can be orchestrated together.

In this vein, event-driven designs aim to decouple the flow of data between services and to support fault-tolerant, resilient solutions. In practice, however, services are often built against a specific cloud provider's platform or persistence mechanism, effectively making them dependent on a monolith outside of their control. If the platform or persistence layer ever needs to change, rearchitecting and refactoring such an application can take days, if not weeks, of developer hours.

This is where the decoupling of services becomes important. In theory, you should be able to pick up your solution from any cloud and, with little to no code change, migrate it to any other cloud or on-prem. Not only does this enable platform agnosticism and fault tolerance, it also increases developer efficiency, since you can focus on the core of your application rather than on individual cloud service integration logic.

To enable this process-level decoupling, we will be using two binaries: procx and pushx.

procx and pushx are decoupled DAO drivers: procx is responsible for receiving data from a data provider and handing it to your application, and pushx for taking data from your application and sending it on to a data provider.

By separating the data flow from both the data provider and your application, these tools enable you to write your application in a way that is independent of the cloud provider you are using, and to focus solely on the core logic of your process. If you ever need to move databases, streaming technologies, or cloud providers, your code remains portable and easily extensible; you simply update configuration parameters to point to the new data source.

Let's say you have a simple ETL requirement: you need to pick up data from an AWS S3 bucket, do some basic transformation, and send it to an SQS queue to be processed by a worker further down the line.

You could import the AWS SDK for your language and configure it in your application, but this requires your developers to learn those technologies as well as manage the interfaces with them. If you ever need to rearchitect your application and change data sources or destinations, you have to rewrite your application code, even if your core business logic never changes.
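
For contrast, here is a rough sketch of what that tightly coupled version might look like (assuming the AWS SDK for JavaScript v3, and a recent enough version that the response body exposes transformToString(); the bucket, key, and queue URL are placeholders):

var { S3Client, GetObjectCommand } = require('@aws-sdk/client-s3');
var { SQSClient, SendMessageCommand } = require('@aws-sdk/client-sqs');

var s3 = new S3Client({ region: 'us-east-1' });
var sqs = new SQSClient({ region: 'us-east-1' });

async function main() {
    // read the object directly from S3
    var res = await s3.send(new GetObjectCommand({
        Bucket: 'my-bucket',    // placeholder bucket
        Key: 'example.json'     // placeholder key
    }));
    var obj = JSON.parse(await res.Body.transformToString());
    // the business logic is now interleaved with provider-specific plumbing
    obj.metadata = { created: new Date(), updated: new Date(), updatedBy: 'me' };
    // send the result directly to SQS
    await sqs.send(new SendMessageCommand({
        QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue', // placeholder queue
        MessageBody: JSON.stringify(obj)
    }));
}

main();
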

Let's see how we can solve this with pushx and procx. First, let's create a basic example of a decoupled application. In this example, we will receive a JSON object, add some metadata to it, and then send the object to an SQS queue.

var fs = require('fs');
var path = require('path');

function createMetadata() {
    var metadata = {};
    metadata.created = new Date();
    metadata.updated = new Date();
    metadata.updatedBy = 'me';
    return metadata;
}

function main() {
    // read the input file at a local path (procx will drop the payload here)
    var file = path.join(__dirname, 'example.json');
    var obj = JSON.parse(fs.readFileSync(file, 'utf8'));
    obj.metadata = createMetadata();
    // write output to stdout. we could also do a file if we wanted.
    console.log(JSON.stringify(obj, null, 2));
}

main();


Note that this code makes no mention of S3, SQS, or any other data provider. It simply takes local data, operates on it, and writes it to STDOUT.

This is the power of decoupling: we've made the cloud constructs configuration parameters, and our business logic is entirely independent of our platform.

Let's run this code and see what happens.

# first, let's create an example object and put it in S3
export S3_BUCKET=my-bucket
export S3_KEY=example.json
echo '{"name": "John Doe", "id": 1}' > $S3_KEY
aws s3 cp $S3_KEY s3://$S3_BUCKET/$S3_KEY
rm $S3_KEY

# now, let's configure procx to pull the data from S3
# and place it in a local file, and delete it from S3 once processed
export PROCX_DRIVER=aws-s3
export PROCX_AWS_S3_BUCKET=$S3_BUCKET
export PROCX_PAYLOAD_FILE=$S3_KEY
export PROCX_AWS_S3_KEY=$S3_KEY
export PROCX_AWS_S3_CLEAR_OP=rm

# next, let's configure pushx to send the data to a SQS queue
export PUSHX_DRIVER=aws-sqs
export PUSHX_AWS_SQS_QUEUE_URL=https://sqs.us-east-1.amazonaws.com/123456789012/my-queue

# finally, let's run the application
procx node example.js | pushx


This works well, we've deployed it to AWS, and the business is happy. But now we have a requirement to pull the data from GCP Cloud Storage (GCS) rather than S3, and to send it to GCP Pub/Sub rather than SQS.

Normally, this would mean a bunch of groaning developers and a month of work to update our code. But with pushx and procx, we just need to update our configuration params, and our code remains exactly the same:

export PROCX_DRIVER=gcp-gcs
export PROCX_GCP_PROJECT_ID=my-project
export PROCX_GCP_GCS_BUCKET=my-bucket
export PROCX_GCP_GCS_KEY=example.json
export PUSHX_DRIVER=gcp-pubsub
export PUSHX_GCP_PROJECT_ID=my-project
export PUSHX_GCP_TOPIC=my-topic
procx node example.js | pushx


This is the most basic example, but if you've ever faced this problem before, you should already see the power of decoupling.

We can even use the -out flag of pushx to enable "tee'ing" the data off to multiple data providers.

procx node example.js | \
    pushx -driver aws-sqs ... -out=- | \
    pushx -driver gcp-pubsub ... -out=- | \
    pushx -driver gcp-gcs ... -out=- | \
    pushx -driver gcp-bq ... -out=- | ...


By making the cloud simply a set of configuration variables, we are able to take advantage of what the cloud has to offer without ever being locked into any one cloud provider or technology.

In practice you will most likely be doing some form of business logic or transformation between the data load and the data push, but since pushx and procx are both abstraction DAOs, you can also simply join them together to create a direct data pipeline between two data providers.

For example, if you want to stream all data from a Redis list to a Google BigQuery table, you can do the following:

export PROCX_DRIVER=redis-list
export PROCX_REDIS_HOST=localhost
export PROCX_REDIS_PORT=6379
export PROCX_REDIS_KEY=test-redis-list
export PROCX_PASS_WORK_AS_STDIN=true
export PROCX_DAEMON=true
export PROCX_DAEMON_INTERVAL=500
export PUSHX_DRIVER=gcp-bq
export PUSHX_GCP_PROJECT_ID=my-project
export PUSHX_GCP_BQ_QUERY='INSERT INTO mydatatest.mytable (id, name, another) VALUES ({{id}}, "{{name}}", "{{another}}")'
# start the pipeline
procx pushx &
# now push data to the redis-list, and it will be streamed to the BigQuery table
redis-cli -h localhost -p 6379 lpush test-redis-list '{"id":1,"name":"John","another":"Doe"}'


procx pushx: a truly one-line data pipeline, with no code or infrastructure to manage, just configuration parameters and a single command.

If you prefer to import procx and pushx into your codebase and treat them as an in-app DAO rather than a discrete pipeline step, pushx-js, procx-js, pushx-py, procx-py, pushx-go, and procx-go are available. These simply subshell out to the respective binary, which enables quicker feature-complete cross-compatibility and supports platform-specific compilation requirements. More language libraries will be added as time permits, and PRs are always welcome.

With the in-app import, your code can send and retrieve data to and from any number of data providers by simply updating configuration parameters. Let's look at what our above code would look like with the in-app DAO.

var procx = require('@robertlestak/procx');
var pushx = require('@robertlestak/pushx');

function createMetadata() {
    var metadata = {};
    metadata.created = new Date();
    metadata.updated = new Date();
    metadata.updatedBy = 'me';
    return metadata;
}

async function getData() {
    let args = [
        "-driver",
        "aws-s3",
        "-aws-s3-bucket",
        "my-bucket",
        "-aws-s3-key",
        "example.json",
        "-aws-s3-clear-op",
        "rm"
    ];
    try {
        let data = await procx(args);
        return JSON.parse(data);
    } catch (err) {
        console.error(err);
        throw err;
    }
}

async function pushData(data) {
    let input = JSON.stringify(data);
    let args = [
        "-driver",
        "redis-list",
        "-redis-host",
        "localhost",
        "-redis-port",
        "6379",
        "-redis-key",
        "test-redis-list"
    ];
    try {
        await pushx(input, args);
    } catch (e) {
        console.error(e);
        throw e;
    }
}

async function main() {
    // get data from DAO
    let obj = await getData();
    // transform data
    obj.metadata = createMetadata();
    // send data to DAO
    await pushData(obj);
}

(async () => {
    await main();
})();


Now, instead of passing your process into procx and piping the output of the application to pushx as a DevOps/deployment step, we are calling procx and pushx directly in our code, with the entire context of the data layer contained in a single args array. That array can be moved to a separate configuration layer such as HashiCorp Vault or Consul and removed from the codebase, while still enabling the use of pushx and procx as an inline DAO.
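
As a minimal sketch of that idea, getData could look up its driver arguments at runtime rather than hardcoding them; the DATA_SOURCE_ARGS variable name below is hypothetical and would be populated by whatever configuration layer you choose:

// hypothetical sketch: the entire driver context lives in one JSON blob,
// injected by a configuration layer (Vault, Consul, etc.) at deploy time, e.g.
// DATA_SOURCE_ARGS='["-driver","aws-s3","-aws-s3-bucket","my-bucket","-aws-s3-key","example.json"]'
var procx = require('@robertlestak/procx');

async function getData() {
    let args = JSON.parse(process.env.DATA_SOURCE_ARGS || "[]");
    let data = await procx(args);
    return JSON.parse(data);
}
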

Both approaches have their use cases, so feel free to experiment and see which one works best for you.

Finally, if you are running Kubernetes, procx-operator is an operator that automatically scales your procx workloads based on their respective data layer scaling thresholds. This completely decouples your application from the cloud and allows you to automatically scale your event-driven application without having to worry about scaling your cloud infrastructure.

At the time of writing, procx and pushx have over 25 different drivers, with more being added regularly.

last updated 2024-03-18