Cost optimization in AWS - Part II

Having identified your team's strengths and weaknesses, understood your data streams, and analyzed your AWS bill as discussed in Part 1, it's time to plan and execute the changes. Drawing from my experience with S3-based data lakes, Redshift data warehouses, Glue, Lambda, and SQS for intermediate processing, I offer the following suggestions.

Remember, the goal is to reduce, in order of priority, I/O, processing, memory footprint, and storage. While storage is usually the least expensive item, it can still accumulate over time.

Eliminate Unnecessary Duplication of Work and Data

The aim here is to avoid redundant effort.

Begin by ensuring you're performing incremental loading for all data sources. While some sources may not offer Change Data Capture (CDC) capabilities, you might devise innovative ways to pull only mutated data.

One method involves generating a hash of all values per ID at both the source and destination, comparing hash values, and copying only the mutated values.
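
As an illustration, here's a minimal pandas sketch of that hash-compare idea, assuming both sides can be pulled into DataFrames keyed by an id column; the column name and the two helpers (row_hash, changed_ids) are hypothetical, not tied to any particular framework:

import hashlib
import pandas as pd

def row_hash(df: pd.DataFrame, key: str = "id") -> pd.Series:
    # One SHA-256 digest per row, built from all non-key columns cast to strings.
    cols = [c for c in df.columns if c != key]
    joined = df[cols].astype(str).agg("|".join, axis=1)
    return pd.Series(
        [hashlib.sha256(s.encode()).hexdigest() for s in joined],
        index=df[key],
    )

def changed_ids(source: pd.DataFrame, target: pd.DataFrame, key: str = "id") -> list:
    # IDs whose hash differs on the destination side (or is missing there) need copying.
    diff = row_hash(source, key).to_frame("src").join(
        row_hash(target, key).to_frame("dst"), how="left"
    )
    return list(diff.index[diff["src"] != diff["dst"]])

In practice you'd push the hashing down to the source (for example as a SQL expression) so unchanged rows never leave the source system; pulling everything just to hash it locally defeats the purpose.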

Consider performing frequent incremental loads only for frequently used data. Then, implement an "eventual consistency" or "healing" method that performs full loads less frequently, such as weekly or monthly.

Next, assess how many copies of the data you keep and how many you actually use. I typically follow a pattern of keeping one full verbatim copy, one full transformed copy (including data type transformations), and other mutations like data marts. Anything outside these categories is expendable and should be minimized.

Evaluate Your Logging Practices

Over-retaining logs is another potential pitfall. Most AWS services have a configurable log destination. Use this to monitor what's being logged and set up lifecycle rules. If you need to retain logs for years due to compliance requirements, S3's intelligent tiering is a feature I highly recommend.
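
As a sketch of what that looks like with boto3 (the bucket name, prefix, and retention windows below are made up - substitute your own), a single lifecycle configuration can both transition logs to Intelligent-Tiering and expire them:

import boto3

s3 = boto3.client("s3")

s3.put_bucket_lifecycle_configuration(
    Bucket="my-log-bucket",                     # hypothetical bucket
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "tier-then-expire-service-logs",
                "Filter": {"Prefix": "logs/"},  # hypothetical prefix
                "Status": "Enabled",
                # Move logs to Intelligent-Tiering after 30 days...
                "Transitions": [{"Days": 30, "StorageClass": "INTELLIGENT_TIERING"}],
                # ...and delete them once the (example) 7-year retention runs out.
                "Expiration": {"Days": 2555},
            }
        ]
    },
)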

Lastly, be aware that some services, like Glue, use S3 as swap storage. If you're copying large amounts of data, you might also be keeping a copy among your logs. While you can't override this behavior, consider reducing the data loads instead.

Leverage Capacity Reservations and Lower Execution Tiers

Data teams often work on long-term tasks, so capacity reservations make sense for long-term cost control. You pre-purchase what you're sure to be using a year from now. For example, with Redshift, you're unlikely to stop using it or reduce the number of nodes within the next year. So, reserving that capacity can reduce your bill by 25% in less than 10 minutes.
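
If you prefer scripting the purchase over clicking through the console, it boils down to two boto3 calls - a sketch assuming a hypothetical ra3.xlplus cluster and a 1-year, 2-node reservation; always review the offering details before buying:

import boto3

redshift = boto3.client("redshift")

# List available offerings and keep the 1-year ones for the node type we run.
offerings = redshift.describe_reserved_node_offerings()["ReservedNodeOfferings"]
one_year = [
    o for o in offerings
    if o["Duration"] == 31536000            # 1 year, expressed in seconds
    and o["NodeType"] == "ra3.xlplus"       # hypothetical node type
]

# The purchase itself is a single call once an offering is chosen.
redshift.purchase_reserved_node_offering(
    ReservedNodeOfferingId=one_year[0]["ReservedNodeOfferingId"],
    NodeCount=2,                            # match the nodes you're certain to keep running
)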

Another tip: Glue offers two execution tiers - standard and flex. While the official guideline suggests using flex only for development workloads, I've successfully used it in production without issues. Using job retries in combination with flex execution has proven reliable in my experience.
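
For illustration, this is roughly what a flex job with retries looks like when defined through boto3 - the job name, IAM role, script location, and worker sizing are placeholders:

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="nightly-batch-transform",                                   # placeholder name
    Role="arn:aws:iam::123456789012:role/glue-job-role",              # placeholder role
    Command={"Name": "glueetl", "ScriptLocation": "s3://my-bucket/scripts/transform.py"},
    GlueVersion="4.0",
    WorkerType="G.1X",
    NumberOfWorkers=10,
    ExecutionClass="FLEX",   # run on spare capacity at a lower rate than STANDARD
    MaxRetries=2,            # retries absorb the occasional flex interruption
)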

Remember, AWS cost optimization is about making informed decisions that align with your business objectives and budget constraints. By planning and executing these changes, you're well on your way to mastering AWS cost optimization.

Cost optimization in AWS - Part I

The Art of Cloud Cost Optimization in AWS: A Comprehensive Guide for Data Engineering Teams

In the ever-evolving domain of cloud computing, the journey towards cost optimization is as unique as the organizations undertaking it. As data engineering managers, we are often tasked with the challenge of managing and optimizing cloud costs, particularly within the context of Amazon Web Services (AWS), a leading provider of cloud services worldwide. This article is a compilation of experiences and insights gathered from various organizations and architectures, all aimed at providing a structured approach to AWS cost optimization.

While I strive to provide a comprehensive guide, it's important to remember that the devil is in the details. Each organization has its unique set of resources and requirements, and a one-size-fits-all strategy may not yield the desired results. As an engineering manager, your role is crucial in understanding the specifics of your cloud usage and tailoring these strategies to fit your unique needs.

We will delve into the intricacies of AWS billing, providing insights into what you're paying for and how you can adapt your data operations to minimize these costs. However, the onus is on you, the reader, to invest time and effort in understanding the resources at your disposal and how best to manage them. This guide aims to equip you with a broad understanding of AWS cost management, highlighting the importance of individualizing the approach to suit your specific needs. We will explore various strategies to optimize data loads, discuss the importance of understanding AWS billing, and emphasize the role of data engineering managers in striking a balance between cost-effectiveness and operational efficiency.

By the end of this article, you will have a solid foundation to navigate the AWS cost landscape, making informed decisions that align with your business objectives and budget constraints. However, remember that this is just the starting point. The journey towards mastering AWS cost optimization is a continuous process of learning, adapting, and evolving.

Establish the strengths and weaknesses of your team

/images/guardians.gif

In the quest for AWS cost optimization, understanding your team's strengths and weaknesses is the crucial first step. The composition of your team, their skills, and their experience can significantly influence your cloud strategy and, by extension, your AWS costs.

You may have a senior data architect at the helm, ideally in a team lead or managerial role. This individual's expertise can be instrumental in determining the optimal setup given your current limitations, data volume, velocity, and variety. The questions you need to address could range from whether you require a data lake or a standard data warehouse, to whether you should use AWS native technology or simply use AWS as a hosting service.

On the other hand, your team might comprise members with a strong DevOps background. In such a scenario, you could lean towards a fully automated CI/CD and deploy infrastructure-as-code. This approach can facilitate rapid iteration from proof-of-concepts (PoCs) to establishing a more cost-efficient cloud strategy. If your team consists of developers or SQL wizards, they might prefer to tackle complex issues with code versus SQL procedures. This preference can significantly impact your cost sheet. Are you leveraging the best tools to augment your team's skills?

Regardless of your team's structure and strengths, the key is to harness them effectively. Tailor your processes and tools to capitalize on your team's unique edge. By doing so, you not only enhance your team's performance but also potentially lower your running costs. Remember, AWS cost optimization is not a one-size-fits-all strategy. It's a journey that requires a deep understanding of your team's capabilities and a willingness to adapt and evolve. By leveraging your team's strengths, you take the first step towards mastering the art of AWS cost optimization.

Take a good, long look at your data & streams

The journey towards AWS cost optimization continues with a thorough examination of your data and streams. This step involves prioritizing how your data is used and provisioned, which can significantly impact your AWS costs.

Start by asking yourself: What are your data needs? Do you require streaming services, or would batch processing suffice for your stakeholders? How frequently does your data need to converge - once a day, an hour, or a minute? Is all your data business-critical at all times, or are there certain data flows that are rarely, if ever, accessed? How many copies of your data do you need to maintain? What would be the impact if your data were unavailable for an hour, a day, or even permanently?

It's important to note that the answers to these questions are rarely static. They evolve over time, influenced by changes in business needs, stakeholder requirements, and even pricing models. Therefore, it's advisable to revisit these questions periodically - a good rule of thumb is to do a comprehensive review at least once a year.

Remember, understanding your data landscape is not just about identifying what you have, but also about understanding how it's used, by whom, and when. By gaining a deep understanding of your data and streams, you can make informed decisions that align with your business objectives and budget constraints, taking you one step closer to mastering AWS cost optimization.

Analyze your bill

The path to AWS cost optimization is paved with a deep understanding of your expenditure. At the highest level, your AWS bill typically comprises four components: storage (the cheapest component, scales linearly), compute (processor), memory, and I/O (traffic, usually scales exponentially).

While AWS's console-provided Cost Management is a good starting point, a more granular understanding of your costs requires a deeper dive. I recommend using the AWS CLI to extract data, which you can then persist and manually enrich with retail prices. For instance, you might know the DPU price for AWS Glue, but the costs of individual workflows or jobs remain opaque. You might be aware of the price per GB of storage in S3, but the daily I/O volume might be unknown. And let's not forget the additional services like Config, CloudTrail, CloudWatch, GuardDuty, KMS, or even Macie that need to be factored in per GB of storage.

My suggested approach is to focus on the top 10 most expensive services and dissect each line item. More often than not, you'll find that the costs don't add up, indicating that the service is being used for something you haven't factored in. It's crucial to know exactly where your money is going.

Armed with this knowledge and a clear understanding of your data requirements, you'll be in a position to propose immediate cost optimization tactics or an overarching strategy to re-orchestrate data flows. This can better serve your stakeholders' needs while simultaneously lowering costs. However, don't forget to factor in the effort required to realize this vision!

Remember, AWS cost optimization is not just about cutting costs; it's about making informed decisions that align with your business objectives and budget constraints. By decoding your AWS bill, you take another step towards mastering AWS cost optimization.
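
If you'd rather script the extraction than click through Cost Management, the same per-service breakdown is available through the Cost Explorer API - a minimal boto3 sketch (the dates and metric are just examples) whose output you can persist and enrich with retail prices:

import boto3

ce = boto3.client("ce")  # Cost Explorer

response = ce.get_cost_and_usage(
    TimePeriod={"Start": "2023-01-01", "End": "2023-07-01"},  # example window
    Granularity="MONTHLY",
    Metrics=["UnblendedCost"],
    GroupBy=[{"Type": "DIMENSION", "Key": "SERVICE"}],
)

# Flatten the response into (month, service, cost) rows you can persist anywhere.
for period in response["ResultsByTime"]:
    for group in period["Groups"]:
        service = group["Keys"][0]
        amount = float(group["Metrics"]["UnblendedCost"]["Amount"])
        print(period["TimePeriod"]["Start"], service, round(amount, 2))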

Azure Tables & Python

Background

In the previous article I talked about generating SHA-256 hashes at scale via multiprocessing. Remember, the task was to generate a reverse-lookup table for all IP addresses. I want that table persisted in Azure Tables - a cloud-based key-value store that allows rapid lookups at scale.

These "rapid" lookups are facilitated by the provision of partition keys - a leading subset of the lookup string (in our case, the hash). The partition key allows the table engine to run a nested query - a short "partition key" filter, followed up by a slower string comparisson of the full lookup key.

You have to choose your partition key wisely - if it's short, lookups will be fast, but not near-instant (the second lookup processes more comparisons). If your partition key is longer, the partition lookup itself is slower. While I don't need to in my case, one should also consider how the data will grow in the future and plan for that. AFAIK, partition keys are immutable.

I've arbitrarily chosen a partition key of 4 characters - that splits the total of 4.3B records into 65,536 groups with ~65k records in each group.
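
A quick back-of-the-envelope check on those numbers:

partitions = 16 ** 4                         # 4 hex characters -> 65,536 possible prefixes
universe = 2 ** 32                           # the full IPv4 space, ~4.3B addresses
print(partitions, universe // partitions)    # 65536 partitions, 65536 records per partition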

Challenge

Now for the true challenge: writing is as slow as your network, which usually means 300-500 ms per transaction. Multiply by that RTT (round-trip time), add an occasional retry (a failed write that has to be resent), and we're looking at...

4.3B × 0.33 s ≈ 16,570 days?!

Not if we can help it.

Hack #1: documentation aka. RTFM

Azure Tables supports transactional (batch) writes, under the following limitations:

  • batches are 100 records (or fewer) long

  • batches contain the same partition key

Under perfect conditions, this lowers our effort to 165 days. Definitely a 3.6 result.

/images/3.6.roentgen.jpeg
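
A quick sanity check of that estimate - batches of 100, roughly a third of a second per transaction:

transactions = 4_294_967_296 / 100           # ~43M batched writes instead of 4.3B single ones
print(round(transactions / 3 / 86_400))      # ~166 days, in the same ballpark as above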

So we need to work harder.

Hack #2: web requests in parallel

No biggie - quite straightforward if you've ever done web scraping. Still, a few considerations:

  • you are limited by your network bandwidth to a predefined transfer rate

  • network parameters like latency, jitter, throughput - and of course, availability - play a role in the equation as well [1]

  • you are limited by the number of cores on your system

  • running over 1000 threads is problematic for the Python interpreter

  • you need to balance memory and throughput

  • your hardware configuration and network availability will render very different results than mine

Because of the last two points, I suggest you time all steps and log them for comparison purposes. You can try out various combinations and see what works in your case (cores/memory/network). For timing processes, I use the timeit library. Choose whatever tool and verbosity work for you, just do it! If you have 2 TB of RAM and a dark-fiber ExpressRoute to an Azure DC, you probably don't need to read further - just generate everything in memory, split it into 65,536 partitions and transmit the ~65k records per partition in 10*650 parallel threads of 100 records each. It should take you less than 18 hours. If you're a mere mortal, continue reading :)
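
For reference, the timing pattern is just a start/stop around whichever step you're measuring - the same pattern appears in the snippets below:

from timeit import default_timer as timer

start = timer()
# ... the step you want to profile: generating a /8, filtering a partition, transmitting a batch ...
print(f"step took {timer() - start:.2f}s")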

Approach

Using the generation approach described earlier, the average time I've achieved to generate a /8 network (on a 32-core Xeon with 64 GB RAM) is 23 s. [2] We'll use /8s as the building blocks for our optimization task.

Now, a /8 has ~16.8M records, meaning each of the 65,536 partitions we're filling gets around 256 records - enough for 3 requests to Azure Tables.

The process in a nutshell looks like this:

  1. SHA-256 records are generated for a /8 network in a Pandas DataFrame.

  2. The DataFrame is filtered by the common PartitionKey.

  3. The records are prepared and transmitted in parallel.

At this point, we perceive (2) as the slowest process. Why don't we send more records in parallel?

Consider this: a /8 takes around 5.5-6 GB of RAM. Based on your hardware, you may decide that generating 2* /8 DataFrames and concatenating them into one is more efficient (thus filling the 65,536 partitions with ~512 records, for ~6 requests). But soon you'll realise that (1) gets significantly slower... around the point where you fill half the physical memory of your machine, it starts really dragging.

Obviously, unless you use dask (which we will explore in our next post), Spark, MapReduce or any similar alternative, (1) is synchronous and slow. So, it's up to you to measure and optimize.

One more detail is worth noting: you probably noticed that I wrote "around" for the number of records. Indeed, because the hash values are only roughly uniformly distributed, you might encounter partitions with fewer than 220 records or more than 270. These fluctuations shrink, relative to partition size, the more records you add to the partitions. Therefore, if you generate 3* /8 networks, you're almost guaranteed to need exactly 8 transactions per partition. Another observation: if you go with 2* /8 networks, the last request is a little wasteful, as it only carries ~12 of the ~512 records (the rest having filled five full batches of 100).

Code

To parallelize requests to Azure, it's more efficient to use threads rather than processes - remember, they'll be idle (waiting on the network) most of the time:

from multiprocessing.pool import ThreadPool

# table_client is assumed to be an azure.data.tables TableClient created elsewhere.

def transact(operations: list):
    # One batch = up to 100 operations sharing a PartitionKey, submitted as a single transaction.
    table_client.submit_transaction(operations)

def split_and_submit(partition: list):
    # Split one partition's records into batches of 100 and push each batch via the thread pool.
    operations = []
    tpool = ThreadPool(processes=32)
    for record in partition:
        operations.append(('upsert', record))
        if len(operations) == 100:
            rq = tpool.map_async(transact, [operations])
            operations = []
            rq.get()
    if len(operations) > 0:
        # Flush the final, partially filled batch.
        rq = tpool.map_async(transact, [operations])
        rq.get()
    tpool.close()

The above setup will create threads for each group of 100 records. Now you can (and should) run ThreadPools in parallel Processes too:

import csv
import multiprocessing as mp
from timeit import default_timer as timer

# Assumed to be defined earlier: generate() (builds the hashed DataFrame for n /8s),
# return_temp() (reads back already-processed partition keys), chunker() (splits a list
# into fixed-size chunks), CORES, and split_and_submit() from the snippet above.

alpha = generate(20)

# Index by PartitionKey so whole partitions can be located with .loc.
alpha.index = alpha["PartitionKey"].copy()
alpha.index.name = None
iterator = list(alpha["PartitionKey"].unique())

# Skip partitions already submitted by a previous (possibly interrupted) run.
processed = return_temp('processed.temp')
unprocessed = [i for i in iterator if i not in processed]
alpha = alpha.drop(processed)

for chunk in chunker(unprocessed, int(CORES)):
    old = timer()
    print("chunk", end=' ')
    a = alpha.loc[chunk]
    print("located in ", round(timer()-old, 2), "s;", sep='', end=' ')
    with mp.Pool(CORES) as pool:
        while True:
            try:
                # One worker process per partition key in this chunk.
                r = pool.map_async(
                    split_and_submit,
                    [(a.loc[c]).to_dict('records') for c in chunk]
                    )
                r.get()
            except Exception:
                # Retry the whole chunk on transient (mostly network) failures.
                continue
            break
    print("submitted to Azure in ", round(timer()-old, 2), "s;", sep='', end=' ')
    # Persist progress so an interrupted run can resume where it left off.
    processed.extend(chunk)
    with open("processed.temp", 'w') as file:
        writer = csv.writer(file)
        writer.writerow(processed)
    print("and finalized in ", round(timer()-old, 2), "s; progress:", round(int(len(processed)/32)/20.48, 2), '%', sep='')

The code above uses a file, processed.temp, to persist the list of processed partitions in case of errors. As the main process runs for hours and is error-prone, it makes sense to track your progress.

Alternatively, you can use:

  • ThreadPools instead of multiprocessing Pools that call ThreadPool. That's tricky and tends to be less stable - and the instability eventually translates into lost time.

  • A CPU-rich GPU for the parallel processing

  • A ThreadPool with a dynamic feed of partitions. The tricky part is that step (1) above is slow, so you need to filter and split partitions in mp.Pools to keep the ThreadPool fed

Python parallel execution

Last weekend, a friend of mine had a peculiar request - to decipher SHA-256 hashes back to the original strings. We had little to go on, except that the original strings were IP addresses. As most people in IT know, the goal of hashing is to encode strings one-way - with a catch: you can still generate all possible combinations and look up the hash.

Long story short, there are two ways to reverse hashing:

  • rainbow tables (using reduction functions as per Philippe Oechslin's 2003 paper, usually ~99% probability)

  • precomputed full combination of all possibilities (100% probability, basically reverse lookup)

Rainbow tables are a probabilistic approach; we need 100% certainty. I'll deep dive into performance there in a separate post. We will focus on the "brute force" approach of generating SHA-256 hashes for the whole (finite) set of strings.

In general, our chosen approach falls somewhere between impractical, slow and quite expensive: if we were to generate all lower-case alphabetical strings (namespace [a-z], say six characters long), we'd be looking at 26^6 ≈ 2^28 combinations, some 308M. We have an important advantage, however: our namespace, length and string structure are all finite and predefined.

The total universe is 2^32 ≈ 4.3B. According to the specification, we'll remove the following networks (a quick tally follows the list):

  • 0.x.x.x /8

  • 255.x.x.x /8

  • 224.x.x.x /4
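
A quick tally of what remains after dropping those ranges:

total = 2 ** 32                      # full IPv4 space
excluded = 2 * 2 ** 24 + 2 ** 28     # two /8 networks plus the /4 multicast block
print(total - excluded)              # 3_992_977_408 - still roughly four billion hashes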

The approach I'm taking is as follows:

  1. Generate SHA-256 hashes and export them to a widely available, text-based format (e.g. CSV)

  2. Ingest the generated data in a database or key-value store for fast lookups. The suggested technology is Azure Tables.

Initial version

Let's warm up with this:

from hashlib import sha256
import pandas as pd

for a in range(1, 256, 1):
    for b in range(0, 256, 1):
        ip_list = pd.DataFrame()
        for c in range(0, 256, 1):
            for d in range(0, 256, 1):
                ip = '.'.join([str(a), str(b), str(c), str(d)])
                digest = sha256(ip.encode()).hexdigest()
                row = {"RowKey": digest, "PartitionKey": digest[0:4], "ip": ip}
                # Appending row by row rebuilds the DataFrame each time - this is what
                # makes the naive version so slow (DataFrame.append is also deprecated).
                ip_list = ip_list.append(row, ignore_index=True)
        ip_list.to_csv('.'.join([str(a), str(b), 'csv']), index=False)

How anticlimactic - to generate a /16 network, the average time was 368 seconds (sample size 40).

We'll need to optimize, as we have a total of 60928 lists to generate, taking ~212 days at our current rate!

Nested generators

Here's an improved version of the same piece, this time making use of (nested) generators:

from timeit import default_timer as timer

for a in range(1, 256, 1):
    for b in range(0, 256, 1):
        start = timer()
        # Build the DataFrame from a generator expression instead of appending row by row.
        # (Note: the hash is computed twice per address - once for RowKey, once for PartitionKey.)
        ip_list = pd.DataFrame(
            columns=["RowKey", "PartitionKey", "ip"],
            data=(
                    {
                        "RowKey": sha256('.'.join([str(a), str(b), str(c), str(d)]).encode()).hexdigest(),
                        "PartitionKey": sha256('.'.join([str(a), str(b), str(c), str(d)]).encode()).hexdigest()[0:4],
                        "ip": '.'.join([str(a), str(b), str(c), str(d)])
                    } for d in range(0, 256, 1) for c in range(0, 256, 1))
                    )
        ip_list.to_csv('.'.join([str(a), str(b), 'csv']), index=False)
        end = timer()
        print('Time to generate', '.'.join([str(a), str(b), 'X', 'X']), ":", str(round(end-start, 2)), "seconds")

Now, that's more like it - 0.5s for a /16; hopefully we're on the right track here. Using a triple-nested generator, the time to calculate a /8 SHA-256 table is, on average, 4 minutes (sample size of 40) - better than the result we previously got for a /16. That's a 350x improvement, and with larger datasets, it's precisely what we're gunning for.

But it's not enough.

Also, we've hit another issue - writing to disk. A set of 2^24 addresses, 16M rows, takes 1.37G on disk - and roughly 25-35% of the 4 minutes. We'll take that as a "natural" limitation which we'll ignore for two reasons:

  • The csv's are not the final destination of our data;

  • Writing to disk is a topic of its own, and we'll assume limitations there are a given.

Threads and Multiprocessing in Python

Chances are you're looking for the good thread vs. multiprocessing stuff already. So what are the differences? Well, the main difference is memory allocation: threads are "lighter" than processes and share the same memory space. Ideally, you should choose the number of threads to correspond to the number of virtual cores your computer has, programmatically available through:

import multiprocessing as mp

print(mp.cpu_count())

Optimizing this parameter actually depends on a multitude of factors, so I suggest profiling and documenting multiple combinations by running something like a Monte Carlo simulation to identify the best parameters for your machine. Please note: we're considering options for this parameter only in the context of threads. If we aim at multiple processes, we should cap the number of processes at the number of cores shown above - you can't run processes in parallel on the same core.
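
A minimal way to profile this is a simple sweep over candidate thread counts (not a full Monte Carlo run) - io_task below is a hypothetical stand-in for your real I/O-bound unit of work, and the candidate counts are arbitrary:

from multiprocessing.pool import ThreadPool
from timeit import default_timer as timer
import urllib.request

def io_task(_):
    # Placeholder for the real unit of work (e.g. one batched write to Azure Tables).
    urllib.request.urlopen("https://example.com", timeout=10).read()

for workers in (4, 8, 16, 32, 64):
    start = timer()
    with ThreadPool(processes=workers) as pool:
        pool.map(io_task, range(64))
    print(f"{workers} threads: {timer() - start:.2f}s")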

Another key observation: multithreaded code executes one thread at a time due to the GIL. Therefore, multithreaded code is concurrent but not parallel. Still, one might see improvements in I/O-bound tasks like asynchronously downloading files, where time is spent idling while waiting for the network response. Using the threading module in Python - or any other interpreted language with a GIL - can otherwise actually result in reduced performance...

Our code is performing a CPU-bound task, so using the threading module would result in slower execution. For CPU-bound tasks and truly parallel execution, we can use the multiprocessing module.

Basic Multiprocessing

Here's our first version that runs the process on multiple cores:

from hashlib import sha256
import pandas as pd
from timeit import default_timer as timer
import multiprocessing as mp


def process(a):
    # Generate and export one full /8 network (a.0.0.0 - a.255.255.255).
    start = timer()
    ip_list = pd.DataFrame(
        columns=["RowKey", "PartitionKey", "ip"],
        data=(
                {
                    "RowKey": sha256('.'.join([str(a), str(b), str(c), str(d)]).encode()).hexdigest(),
                    "PartitionKey": sha256('.'.join([str(a), str(b), str(c), str(d)]).encode()).hexdigest()[0:4],
                    "ip": '.'.join([str(a), str(b), str(c), str(d)])
                } for d in range(0, 256, 1) for c in range(0, 256, 1) for b in range(0, 256, 1))
            )
    end = timer()
    ip_list.to_csv('.'.join([str(a), 'csv']), index=False)
    end2 = timer()
    print('Time to generate', '.'.join([str(a), 'X', 'X', 'X']), ":", str(round(end-start, 2)), "seconds")
    print('Time to export', '.'.join([str(a), 'X', 'X', 'X']), ":", str(round(end2-end, 2)), "seconds")


if __name__ == "__main__":
    cores = mp.cpu_count()

    # Use half the available cores so the machine stays usable for other work.
    with mp.Pool(int(cores/2)) as pool:
        iterators = range(1, 256, 1)
        pool.map(process, iterators)

You might notice that I'm running the code above on half of the available cores. The reason is that I want to use my computer for other activities as well. The results:

  • with 6 cores, I'm able to generate 6 /8s in 10 minutes: roughly 2 minutes per /8

  • with 16 cores, a batch of 16 /8s takes about 30 minutes: again a little less than 2 minutes per /8

In both cases, writing to disk takes 60-70s, and that's a limitation I'll have to live with. In the end, we managed to cut our work from ~212 days down to 8 hours, a 650x improvement.

Further reading

  • If you are looking to share memory between cores or processes, a package called SharedArray can help.

  • You might want to share the work between multiple servers via a common queue. Consider using RQ.