Tag Archives: serverless

What was Old is New: Finding Joy in Modernising Legacy Systems

(cover image from ThisisEngineering RAEng)

Let’s face it: software is easier to write than maintain. This is why we, as software engineers, prefer to just “rip it out and start over” instead of trying to understand what another developer (or our past self) was thinking. We seem to have collectively forgotten that “programs must be written for people to read, and only incidentally for machines to execute”. You know it’s true — we’ve all had to painstakingly trace through a casserole of spaghetti code and thin, old-world-style abstractions digging for the meat of the program only to find nothing but a mess at the bottom of our plates.

It’s easy to yell “WTF” and blame the previous dev, but the truth is often more complicated. We can’t see the future, so it’s impossible to understand how requirements, technology, or business goals will grow when we design a net-new system. As a result, systems can become unreadable as their scope increases along with the business’s dependency on them. This is a bit of a paradox: older, harder to maintain systems often provide the most value. They are hard to work on because they’ve grown with the company, and scary to work on because breaking it could be a catastrophe.

Here’s where I’m calling you out: if you like hard, rewarding problems… try it. Take the oldest system you have and make it maintainable. You know the one I’m talking about — the one no one will “own”. That one the other departments depend on but engineers hate. The one you had to patch Log4Shell on first. Do it. I dare you.I recently had such an opportunity to update a decade old machine learning system at Bazaarvoice. On the surface, it didn’t sound exciting: this thing didn’t even have neural networks! Who cares! We ll… it mattered. This system processes nearly every user-generated product review received by Bazaarvoice — nearly 9 million per month — and does so with 90 million inference calls to machine learning models. Yup — 90 million inferences! It’s a huge scale, and I couldn’t wait to dive in.

In this post, I’ll share how modernizing this system through a re-architecture, instead of a re-write, allowed us to make it scalable and cost-effective without having to rip out all of the code and start over. The resulting system is serverless, containerized, and maintainable while reducing our hosting costs by nearly 80%. This post is a companion piece to a talk I recently presented at AWS Data Summit for Software Companies on generating value from data by leveraging our best practices to ensure success in machine learning projects. This post discusses the technical aspects in more detail, but you can watch the high-level talk linked at the end.

Something Old

First, let’s take a look at what we’re dealing with here. The legacy system my team was updating moderates user-generated content for all of Bazaarvoice. Specifically, it determines if each piece of content is appropriate for our client’s websites.

Photo by Diane Picchiottino

This sounds straightforward — eliminate obvious infractions such as hate speech, foul language, or solicitations — but in practice, it’s much more nuanced. Each client has unique requirements for what they consider appropriate. Beer brands, for example, would expect discussions of alcohol, but a children’s brand may not. We capture these client-specific options when we onboard new clients, and our Client Services team encodes them into a management database.

For some added complexity, we also sample a subset of our content to be moderated by human moderators. This allows us to continuously measure the performance of our models and discover opportunities for building more models.

The full architecture of our legacy system is shown below:

Our legacy moderation system hosted machine learning models on a single EC2 instance. This made deployments slow and limited scalability to the host’s memory size.

This system has some serious drawbacks. Specifically — all of the models are hosted on a single EC2 instance. This wasn’t due to bad engineering — just the inability of the original programmers to foresee the scale desired by the company. No one thought that it would grow as much as it did. In addition, the system suffered from developer rejection: it was written in Scala, which few engineers understood. Thus, it was often overlooked for improvement since no one wanted to touch it.

As a result, the system continued to grow in a keep-the-lights-on manner. Once we got around to re-architecting it, it was running on a single x1e.8xlarge instance. This thing had nearly a terabyte of ram and costs about $5,000/month (unreserved) to operate. Don’t worry, though, we just launched a second one for redundancy and a third for QA 🙃.

This system was costly to run and was at a high risk of failure (a single bad model can take down the whole service). Furthermore, the code base had not been actively developed and was thus significantly out of date with modern data science packages and did not follow our standard practices for services written in Scala.

Something New

When redesigning this system we had a clear goal: make it scalable. Reducing operating costs was a secondary goal, as was easing model and code management.

The new design we came up with is illustrated below:

Our new architecture deploys each model to a SageMaker Serverless endpoint. This lets us scale the number of models without limit while maintaining a small cost footprint.

Our approach to solving all of this was to put each machine learning model on an isolated SageMaker Serverless endpoint. Like AWS Lambda functions, serverless endpoints turn off when not in use — saving us runtime costs for infrequently used models. They also can scale out quickly in response to increases in traffic.

In addition, we exposed the client options to a single microservice that routes content to the appropriate models. This was the bulk of the new code we had to write: a small API that was easy to maintain and let our data scientists more easily update and deploy new models.

This approach has the following benefits:

  • Decreased the time to value by over 6x. Specifically, routing traffic to existing models is instantaneous, and deploying new models can be done in under 5 minutes instead of 30.
  • Scale without limit – we currently have 400 models but plan to scale to thousands to continue to increase the amount of content we can automatically moderate.
  • Saw a cost reduction of 82% moving off EC2 as the functions turn off when not in use, and we’re not paying for top-tier machines that are underutilized.

Simply designing an ideal architecture, however, isn’t the really interesting hard part of rebuilding a legacy system — you have to migrate to it.

Something Borrowed

Our first challenge in migration was figuring out how the heck to migrate a Java WEKA model to run on SageMaker, let alone SageMaker Serverless.

Fortunately, SageMaker deploys models in Docker containers, so at least we could freeze the Java and dependency versions to match our old code. This would help ensure the models hosted in the new system returned the same results as the legacy one.

Photo from JJ Ying

To make the container compatible with SageMaker, all you need to do is implement a few specific HTTP endpoints:

  • POST /invocation — accept input, perform inference, and return results.
  • GET /ping — returns 200 if the JVM server is healthy

(We chose to ignore all of the cruft around BYO multimodel containers and the SageMaker inference toolkit.)

A few quick abstractions around com.sun.net.httpserver.HttpServer and we were ready to go.

And you know what? This was actually pretty fun. Toying with Docker containers and forcing something 10 years old into SageMaker Serverless had a bit of a tinkering vibe to it. It was pretty exciting when we got it working — especially when we got the legacy code to build it in our new sbt stack instead of maven. The new sbt stack made it easy to work on, and containerization ensured we could get proper behavior while running in the SageMaker environment.

Something Blue

So we have the models in containers and can deploy them to SageMaker — almost done, right? Not quite.

Photo by Tim Gouw

The hard lesson about migrating to a new architecture is that you must build three times your actual system just to support migration.

In addition to the new system, we had to build:

  • A data capture pipeline in the old system to record inputs and outputs from the model. We used these to confirm that the new system would return the same results.
  • A data processing pipeline in the new system to compute results and compare them to the data from the old system. This involved a large amount of measurement with Datadog and needed to offer the ability to replay data when we found discrepancies.
  • A full model deployment system to avoid impacting the users of the old system (which would simply upload models to S3). We knew we wanted to move them to an API eventually, but for the initial release, we needed to do so seamlessly.

All of this was throw-away code we knew we could toss once we finished migrating all of the users, but we still had to build it and ensure that the outputs of the new system matched the old.

Expect this upfront.

While building the migration tools and systems certainly took more than 60% of our engineering time on this project, it too was a fun experience. Unit testing became more like data science experiments: we wrote whole suites to ensure that our output matched exactly. It was a different way of thinking that made the work just that much more fun. A step outside our normal boxes, if you will.

So… Just Try It

Next time you’re tempted to rebuild a system from code up, I’d like to encourage you to try migrating the architecture instead of the code. You’ll find interesting and rewarding technical challenges and will likely enjoy it much more than debugging unexpected edge cases of your new code.


The talk I gave is a bit more high-level and goes into the MLOps side of things. Check it out here:

Telling Your Data To “Back Off!” (or How To Effectively Use Streams)

Foreword

Our Curations engineering team makes heavy use of serverless architecture. While this typically gives us the benefit of reduced costs, flexibility, and rapid development, it also requires us to ensure that our processes will run within the tight memory and lifecycle constraints of serverless instances.

In this article, I will describe an actual case where a scheduled job had started to fail, the discovery of the root cause and the refactor that resolved the issue. I will assume you have at least a rudimentary knowledge of Node JS and Amazon Web Services.

Content Export

Months previously, we built a scheduled job using AWS Lambdas that would kick off an export of our social content every 24 hours. In a nutshell, its purpose was to:

  1. Query for social content documents stored in a MongoDB server.
  2. Transform each document to a row in CSV format.
  3. Write out the CSV rows to a file in S3.

The resulting CSV files were meant for ingestion into an analytics tool to measure web site conversion impact.

It all worked fine for months. Then recently, this job began to fail.

Drowning in Content

For your viewing pleasure, here is an excerpted, condensed section of the main code:

function runExport(query) {
  MongoClient.connect(MONGODB_URL)
    .then((db) => {
      let exportCount = 0;
      db
        .collection('content')
        .find(query)
        .forEach(
          // iterator function to perform on each document
          (content) => {
            // transform content doc to a csv row, then push to S3 writer
            s3.write(csvTransform(content));
            exportCount++;
          },
          // done function, no more documents
          () => {
            db.close();
            s3.close();
            logger.info(`documents exported: ${exportCount}`);
          }
        );
    });
}

Each document from the database query is transformed into a CSV row, then pushed to our S3 writer which buffered all content until an s3.close(). At this point, the CSV data would be flushed out to the S3 destination bucket.

It was obvious from this implementation that heap usage would grow unbounded, as documents from the database would be loaded into resident memory as CSV data. As we aggregated content over the previous months, the data pushed into our export process began to generate frequent “out of heap memory” errors in our logs.

Memory Profile

To gain visibility into the memory usage, I wrote a quick and dirty MemProfiler class whose purpose was to sample memory usage using process.memoryUsage(), collecting the maximum heap used over the process lifetime.

class MemProfiler {
  constructor(collectInterval) {
    this.heapUsed = {
      min : null,
      max : null,
    };
    if (collectInterval) {
      this.start(collectInterval);
    }
  }

  start(collectInterval = 500) {
    this.interval = setInterval(() => this.sample(), collectInterval);
  }

  stop() {
    if (this.interval) {
      clearInterval(this.interval);
    }
  }

  reset() {
    this.stop();
    this.heapUsed = {
      min : null,
      max : null,
    };
  }

  sample() {
    const mem = process.memoryUsage();
    if (this.heapUsed.min === null || mem.heapUsed < this.heapUsed.min) {
      this.heapUsed.min = mem.heapUsed;
    }
    if (this.heapUsed.max === null || mem.heapUsed > this.heapUsed.max) {
      this.heapUsed.max = mem.heapUsed;
    }
  }
}

I then added the MemProfiler to the runExport code:

function runExport(query) {
  memProf.start();
  MongoClient.connect(MONGODB_URL)
    .then((db) => {
      let exportCount = 0;
      db
        .collection('content')
        .find(query)
        .forEach(
          (content) => {
            // transform content doc to a csv row, then push to S3 writer
            s3.write(csvTransform(content));
            exportCount++;
            memProf.sample();
          },
          () => {
            db.close();
            s3.close();
            logger.info(`documents exported: ${exportCount}`);
            console.log('heapUsed:');
            console.log(` max: ${memProf.heapUsed.max/1024/1024} MB`);
            console.log(` min: ${memProf.heapUsed.min/1024/1024} MB`);
          }
        );
    });
}

Running the export against a small set of 16,900 documents yielded this result:

documents exported: 16900
heapUsed:
max: 30.32764434814453 MB
min: 8.863059997558594 MB

Then running against a larger set of 34,000 documents:

documents exported: 34000
heapUsed:
max: 36.204505920410156 MB
min: 8.962921142578125 MB

Testing with additional documents increased the heap memory usage linearly as we expected.

Enter the Stream

The necessary code rewrite had one goal in mind: ensure the memory profile was constant, whether a total of one document was processed or a million. Since this was a batch process, we could trade off memory usage for processing time.

Let’s reiterate the primary operations:

  1. Fetch a document
  2. Transform the document to a CSV row
  3. Write the CSV row to S3

If I may present an analogy, we have widgets moving down the assembly line from one station to the next. The conveyor belt on which the widgets are transported is moving at a constant rate, which means the throughput at each station is constant. We could increase the speed of that conveyor belt, but only as fast as can be handled by the slowest station.

In Node JS, streams are the assembly line stations, and pipes are the conveyor belt. Let’s examine the three primary types of streams:

  1. Readable streams: This is typically an upstream data source that feeds data to a writable or transform stream. In our case, this is the MongoDB find query.
  2. Transform streams: This typically takes upstream data, performs a calculation or transformation operation on it and feeds it out downstream. In our case, this would take a MongoDB document and transform it to a CSV row.
  3. Writable streams: This is typically the terminating point of a data flow. In our case this is the writing of a CSV row to S3.

Readable Stream

Conveniently, the MongoDB Node JS driver’s find() function returns a Cursor object that extends the Node JS Readable stream. How convenient!

const streamContentReader = db.collection('content').find(query);

Transform Stream

All we need to do is extend the Transform stream class, and override the _transform() method to transform the content to a CSV row, push it out the other end, and signal upstream that it is ready for the next content.

class CsvTransformer extends Transform {
  constructor(options) {
    super(options);
  }

  _transform(content, encoding, callback) {
    // write row
    const { _id, authoredAt, client, channel, mediaType, permalink , text, textLanguage } = content;
    const csvRow = `${_id},${authoredAt},${client},${channel},${mediaType},${permalink}${text},${textLanguage}\n`;
    this.push(csvRow);
    // signal upstream that we are ready to take more data
    callback();
  }
}

Writable Stream

Let’s purposely mock the S3 Writer for now. It does nothing here, but realistically, we would buffer up incoming data, then flush out the buffer in chunks over the network for best throughput.

class S3Writer extends Writable {
  constructor(options) {
    super(options);
  }

  _write(data, encoding, callback) {
    // TODO: probably use a fixed size buffer
    // that we write to and then flush out to S3 using
    // multipart data upload
    callback();
  }
}

Pipe ‘Em Up

We now create the three streams:

const streamContentReader = db.collection('content').find(query);
const streamCsvTransformer = new CsvTransformer({ objectMode: true });
const streamS3Writer = new S3Writer();

Streams alone do nothing. What makes them useful is connecting the output from one stream to the input of another. In stream terminology, this is called piping, and is accomplished using the stream’s pipe method:

streamContentReader.pipe(streamCsvTransformer).pipe(streamS3Writer)

Although the native pipe method is perfectly functional, I highly recommend using the pump library. In addition to being more readable by passing in an array of streams to pipe together, we can also invoke then() when the pipe has finished, as well as handle close or error events emitted by the streams:

pump([ streamContentReader, streamCsvTransformer, streamS3Writer ])
  .then(() => {
    console.log(`documents exported: ${exportCount}`);
  })
  .catch((err) => console.error(err));

CSV and S3 Write Streams

I purposely did not implement the S3Writer above, because npm is a treasure trove of solutions! The s3-write-stream library will take care of buffering, using multipart upload and handling retries, so we don’t have to get our hands too dirty. And wouldn’t you know it, there is also the csv-write-stream library that will generate properly escaped CSV rows.

As an exercise, you the reader may want to try using the s3-write-stream and csv-write-stream. Trust me, once you get the hang of streaming, you will enjoy it!

Back Off!

Let’s touch back on the issue of memory usage.

What if a write stream becomes blocked for a period of time? Maybe we are writing data to a third-party service over HTTP and network congestion or service throttling is slowing the write rate. The readable stream will happily pump data in as fast as it can. Won’t that cause increased heap usage as all that data gets backed up and buffered?

The quick answer is “no”. In the implementation of the _write() method of your writable stream, when done with the data chunk (after writing to a file for example), call callback(). This signals to the incoming stream that it is ready to receive the next chunk. You can also provide a highWaterMark to the writable stream constructor to give a specific number of objects to buffer before pausing the incoming stream. This is all handled internally by the Node JS streams implementation. No more unbounded buffering!

For a deep dive into the concept of backpressure control, read this great article on Backpressuring in Streams.

Stream ‘Em If You Got ’em!

Our team is now in the process of utilizing Node JS streams whenever we read in volumes of data, process that data, then write out the results. We are now seeing great gains in both stability and maintainability!

References

Node JS Streams APIhttps://nodejs.org/api/stream.html

Article: Backpressuring in Streams: https://nodejs.org/en/docs/guides/backpressuring-in-streams/

MongoDB Cursor Streamhttp://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html

Pump Modulehttps://www.npmjs.com/package/pump

CSV Write Stream Module: https://www.npmjs.com/package/csv-write-stream

S3 Write Stream Module: https://www.npmjs.com/package/s3-write-stream