Azure Tables & Python
Background
In the previous article I talked about generating SHA-256 hashes at scale via multiprocessing. Remember, the task was to generate a reverse-lookup table for all IP addresses. I want that table persisted in Azure Tables - a cloud-based key-value store that allows rapid lookups at scale.
These "rapid" lookups are made possible by partition keys - a leading subset of the lookup string (in our case, the hash). The partition key allows the table engine to run a nested query - a short "partition key" filter, followed by a slower string comparison of the full lookup key.
You have to choose your partition key wisely - if it's short, the partition lookup is fast, but the overall lookup is not near-instant (the second step processes more comparisons). If your partition key is longer, the partition lookup is slower. While I don't need to in my case, one should also consider how the data would grow in the future and plan for that. AFAIK, partition keys are immutable.
I've arbitrarily chosen a partition key of 4 characters - that splits the total 4.3B records into 65536 groups with ~65k records in each group.
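As a minimal sketch of how such a key could be derived: I'm assuming here that the hash is taken over the dotted-quad string, which may differ from the generator in the previous article. `make_entity` and the `IP` column name are hypothetical; `PartitionKey` and `RowKey` are the two key properties every Azure Tables entity carries.

```python
import hashlib

PARTITION_KEY_LENGTH = 4  # leading hex characters of the hash

def make_entity(ip: str) -> dict:
    """Build one table entity for an IP address (illustrative layout)."""
    digest = hashlib.sha256(ip.encode("ascii")).hexdigest()
    return {
        "PartitionKey": digest[:PARTITION_KEY_LENGTH],  # coarse filter
        "RowKey": digest,                               # full lookup key
        "IP": ip,                                       # hypothetical column name
    }

# Each hex character has 16 possible values
partitions = 16 ** PARTITION_KEY_LENGTH
records_per_partition = 2 ** 32 // partitions

entity = make_entity("10.0.0.1")
print(entity["PartitionKey"], partitions, records_per_partition)
```

With 4 hex characters there are as many partitions as there are records per partition, which is where the 65536 * ~65k split comes from.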
Challenge
Now for the true challenge: writing is as slow as your network, which is usually 300-500ms per transaction. Multiply that RTT (round-trip time) by the number of records, add an occasional replay (write error, requiring resending of the data) and we're looking at...
4.3B * 0.33s ≈ 16 570 days ???
Not if we can help it.
Hack #1: documentation aka. RTFM
Azure Tables supports transactional writes, under the following limitations:
a batch holds 100 records or fewer
all records in a batch share the same partition key
Under perfect conditions, this lowers our effort to 165 days. Definitely a 3.6 result.

So we need to work harder.
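The two estimates above are easy to reproduce. This is a back-of-the-envelope sketch that assumes strictly serial requests, a flat 1/3 s round trip and no retries:

```python
TOTAL_RECORDS = 2 ** 32        # every IPv4 address
SECONDS_PER_REQUEST = 1 / 3    # the ~0.33 s round trip assumed above
BATCH_SIZE = 100               # maximum records per transaction
SECONDS_PER_DAY = 86_400

def days_needed(records_per_request: int) -> float:
    """Days to push all records serially, one request at a time."""
    requests = TOTAL_RECORDS / records_per_request
    return requests * SECONDS_PER_REQUEST / SECONDS_PER_DAY

print(f"one record per request: {days_needed(1):,.1f} days")
print(f"batches of {BATCH_SIZE}: {days_needed(BATCH_SIZE):,.1f} days")
```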
Hack #2: web requests in parallel
No biggie, quite straightforward if you've ever done web scraping. But still, the considerations:
you are limited by your network bandwidth to a predefined transfer rate
network parameters like latency, jitter, throughput - and of course, availability - play a role in the equation as well [1]
you are limited by the number of cores on your system
running over 1000 threads is problematic for the Python interpreter
you need to balance memory and throughput
your hardware configuration and network availability will render very different results than mine
Because of the last 2 points, I suggest you time all steps and log the results for comparison purposes. You can try out various combinations and see what works in your case (cores/memory/network). For timing processes, I use the timeit library. Choose whatever tool and verbosity work for you, just do it! If you have 2TB of RAM and a dark fiber ExpressRoute to an Azure DC, you probably don't need to read further - just generate everything in memory, split it into 65536 partitions and transmit the ~65k records in 10*650 parallel threads of 100 records each. It should take you less than 18 hours. If you're a mere mortal, continue reading :)
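Here is a small sketch of what "time all steps" can look like, and of why parallel requests pay off. `timed` and `fake_request` are hypothetical names; the request is simulated with a short sleep instead of a real call to Azure, so the numbers only illustrate the pattern.

```python
import time
from contextlib import contextmanager
from multiprocessing.pool import ThreadPool
from timeit import default_timer as timer

timings = {}  # step name -> elapsed seconds

@contextmanager
def timed(step: str):
    """Record and print how long the wrapped block takes."""
    start = timer()
    try:
        yield
    finally:
        timings[step] = timer() - start
        print(f"{step}: {timings[step]:.2f}s")

def fake_request(batch_id: int) -> int:
    """Stand-in for one network round trip (sleeps, sends nothing)."""
    time.sleep(0.02)
    return batch_id

batches = list(range(32))

with timed("sequential"):
    sequential = [fake_request(b) for b in batches]

with timed("32 threads"):
    with ThreadPool(processes=32) as pool:
        threaded = pool.map(fake_request, batches)
```

The threads spend their time waiting, so 32 of them finish in roughly the time of a single round trip. Logging both numbers for every configuration you try makes the comparison painless.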
Approach
By leveraging the generation approach described earlier, I generate a /8 network in 23s on average (32-core Xeon with 64 GB RAM). [2] We'll use the /8 as the building block for our optimization task.
Now, a /8 has ~16.7M records (2^24), meaning each of the 65536 partitions we're filling would have around 256 records - enough for 3 requests to the Azure Table.
The process in a nutshell looks like this:
1. SHA-256 records are generated for a /8 network in a Pandas DataFrame.
2. The DataFrame is filtered by the common PartitionKey.
3. The records are prepared and transmitted in parallel.
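The three steps can be sketched end to end on a toy scale. This is not the Pandas pipeline itself: it uses plain Python, a /20 instead of a /8 and a 1-character partition key, so that each of the 16 partitions also lands at around 256 records. All function names are hypothetical, and nothing is sent to Azure.

```python
import hashlib
import ipaddress
from collections import defaultdict

KEY_LENGTH = 1     # 1 hex char -> 16 partitions (the real table uses 4)
BATCH_SIZE = 100   # transaction limit

def generate_records(network: str) -> list:
    """Step 1: hash every address in the network."""
    records = []
    for ip in ipaddress.ip_network(network):
        digest = hashlib.sha256(str(ip).encode("ascii")).hexdigest()
        records.append({"PartitionKey": digest[:KEY_LENGTH],
                        "RowKey": digest, "IP": str(ip)})
    return records

def group_by_partition(records: list) -> dict:
    """Step 2: collect the records that share a PartitionKey."""
    partitions = defaultdict(list)
    for record in records:
        partitions[record["PartitionKey"]].append(record)
    return partitions

def to_batches(partition: list) -> list:
    """Step 3 (preparation): cut one partition into lists of <= 100 upserts."""
    return [
        [("upsert", record) for record in partition[i:i + BATCH_SIZE]]
        for i in range(0, len(partition), BATCH_SIZE)
    ]

records = generate_records("10.0.0.0/20")   # 4096 addresses
partitions = group_by_partition(records)
batches = {key: to_batches(part) for key, part in partitions.items()}
for key in sorted(batches):
    print(key, len(partitions[key]), "records ->", len(batches[key]), "requests")
```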
At this point, we perceive (2) as the slowest process. Why don't we send more records in parallel?
Consider this: a /8 takes around 5.5-6 GB of RAM. Based on your hardware, you may decide that generating two /8 DataFrames and appending them into one is more efficient (thus filling the 65536 partitions with ~512 records, for ~6 requests). But soon you'll realise that (1) gets significantly slower... around the point you fill 1/2 of the physical memory of your machine, it starts really dragging.
Obviously, unless you use dask (which we will explore in our next post), Spark, MapReduce or any similar alternative, (1) is synchronous and slow. So, it's up to you to measure and optimize.
One more detail is also worth noting: you probably noticed that I used "around" for the number of records. Indeed, because the hashes spread randomly across partitions, you might encounter partitions with fewer than 220 records or more than 270. The counts are roughly normally distributed, so these fluctuations get smaller, relative to the partition size, the more records you add to the partitions. Therefore, if you generate 3* /8 networks, most partitions will need exactly 8 transactions to Azure. Another observation: if you go with 2* /8 networks, the last request is a little wasteful, as you're only sending ~12 records through (out of ~512).
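The expected numbers behind this can be worked out directly. The sketch below assumes the hashes are spread uniformly, so the record count per partition is binomial; `plan` is a hypothetical helper, and the values are expectations, not measurements.

```python
import math

PARTITIONS = 16 ** 4          # 4-character hex partition key
RECORDS_PER_SLASH8 = 2 ** 24  # addresses in one /8
BATCH_SIZE = 100

def plan(slash8_count: int) -> dict:
    """Expected load per partition after generating `slash8_count` /8 networks."""
    n = slash8_count * RECORDS_PER_SLASH8
    p = 1 / PARTITIONS
    mean = n * p
    std_dev = math.sqrt(n * p * (1 - p))   # binomial spread
    return {
        "mean_records": mean,
        "std_dev": std_dev,
        "relative_spread": std_dev / mean,
        "requests": math.ceil(mean / BATCH_SIZE),
        "last_batch": int(mean) % BATCH_SIZE or BATCH_SIZE,
    }

for count in (1, 2, 3):
    print(count, plan(count))
```

The absolute spread grows with the number of records, but the relative spread shrinks, and the size of the last batch shows how much of the final request is actually used.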
Code
To parallelize requests to Azure, it's most efficient to use threads rather than processes. Remember, these would be idle most of the time, waiting on the network:

    from multiprocessing.pool import ThreadPool

    # table_client: an azure.data.tables TableClient, created elsewhere

    def transact(operations: list):
        # One transaction: up to 100 operations on a single PartitionKey
        table_client.submit_transaction(operations)

    def split_and_submit(partition: list):
        # Cut the partition's records into batches of 100 upserts
        batches = [
            [('upsert', record) for record in partition[i:i + 100]]
            for i in range(0, len(partition), 100)
        ]
        # map() blocks until every batch has been submitted, so no
        # request is left unawaited when the pool shuts down
        with ThreadPool(processes=32) as tpool:
            tpool.map(transact, batches)
The above setup will create threads for each group of 100 records. Now you can (and should) run ThreadPools in parallel Processes too:
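    import csv
    import multiprocessing as mp
    from timeit import default_timer as timer

    # generate(), return_temp(), chunker() and CORES are defined elsewhere
    alpha = generate(20)
    alpha.index = alpha["PartitionKey"].copy()
    alpha.index.name = None

    # Skip partitions already written by a previous (interrupted) run
    iterator = list(alpha["PartitionKey"].unique())
    processed = return_temp('processed.temp')
    done = set(processed)
    unprocessed = [i for i in iterator if i not in done]
    alpha = alpha.drop(processed, errors='ignore')

    for chunk in chunker(unprocessed, int(CORES)):
        old = timer()
        print("chunk", end=' ')
        a = alpha.loc[chunk]
        print("located in ", round(timer()-old, 2), "s;", sep='', end=' ')

        # One process per partition; each process runs its own ThreadPool
        with mp.Pool(int(CORES)) as pool:
            while True:
                try:
                    r = pool.map_async(
                        split_and_submit,
                        [a.loc[c].to_dict('records') for c in chunk]
                    )
                    r.get()
                except Exception:
                    continue  # retry the whole chunk until it goes through
                break
            pool.close()  # close() has to come before join()
            pool.join()
        print("submitted to Azure in ", round(timer()-old, 2), "s;", sep='', end=' ')

        # Persist progress so a crash does not restart from scratch
        processed.extend(chunk)
        with open("processed.temp", 'w') as file:
            writer = csv.writer(file)
            writer.writerow(processed)
        # 65536 partitions = 100%
        print("and finalized in ", round(timer()-old, 2), "s; progress:",
              round(int(len(processed)/32)/20.48, 2), '%', sep='')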
The code above uses a file, "processed.temp", to persist the processed partitions in case of errors. As the main process runs for hours and is error-prone, it makes sense to track your progress.
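The helpers that read the checkpoint and slice the work are not shown in this post. A minimal sketch of what they might look like follows; `load_processed`, `save_processed` and this `chunker` are hypothetical stand-ins, not the actual `return_temp` and `chunker` implementations.

```python
import csv
import os
import tempfile

def load_processed(path: str) -> list:
    """Return the partition keys saved by a previous run (empty if none)."""
    if not os.path.exists(path):
        return []
    with open(path, newline="") as file:
        rows = list(csv.reader(file))
    return rows[0] if rows else []

def save_processed(path: str, processed: list) -> None:
    """Overwrite the checkpoint with every partition key done so far."""
    with open(path, "w", newline="") as file:
        csv.writer(file).writerow(processed)

def chunker(items: list, size: int):
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

# Usage: checkpoint after every chunk, then pretend to restart
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "processed.temp")
    done = load_processed(path)            # first run: nothing yet
    for chunk in chunker(["00a1", "00a2", "00a3"], 2):
        done.extend(chunk)
        save_processed(path, done)
    restored = load_processed(path)        # what a restarted run would see
print(restored)
```

Rewriting the whole file after each chunk is crude but cheap at 65536 keys, and it means a restarted run only has to redo the chunk that was in flight.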
Alternatively, you can use:
ThreadPools instead of multiprocessing Pools that call ThreadPool. That's tricky and will be more unstable - and eventually the instability translates to time losses.
A CPU-rich GPU for the parallel processing
A ThreadPool with a dynamic feed of partitions. The tricky part here is that part (1) above is slow, so you need to filter and split partitions in mp.Pools to feed the ThreadPool
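To give an idea of the last option, here is a rough sketch of a dynamic feed. It swaps the ThreadPool for plain `threading.Thread` workers reading from a bounded `queue.Queue`, which keeps memory in check because the producer blocks when the workers fall behind. All names are hypothetical, `fake_submit` stands in for the real transaction call, and there is no error handling or retry logic.

```python
import queue
import threading

SENTINEL = None  # tells a worker there is no more work

def producer(partitions, work_queue, batch_size=100):
    """Split each partition into batches as it arrives and enqueue them."""
    for records in partitions:
        for i in range(0, len(records), batch_size):
            work_queue.put(records[i:i + batch_size])  # blocks when full

def consumer(work_queue, submit):
    """Send batches until the sentinel shows up."""
    while True:
        batch = work_queue.get()
        if batch is SENTINEL:
            break
        submit(batch)

def run(partitions, submit, threads=8, max_pending=16):
    work_queue = queue.Queue(maxsize=max_pending)
    workers = [threading.Thread(target=consumer, args=(work_queue, submit))
               for _ in range(threads)]
    for worker in workers:
        worker.start()
    producer(partitions, work_queue)
    for _ in workers:
        work_queue.put(SENTINEL)   # one sentinel per worker
    for worker in workers:
        worker.join()

# Demo with a lazy source of partitions and a fake submit function
sent = []
lock = threading.Lock()

def fake_submit(batch):
    with lock:
        sent.append(len(batch))

def lazy_partitions():
    for size in (256, 250, 12):
        yield [{"RowKey": str(i)} for i in range(size)]

run(lazy_partitions(), fake_submit)
print(sorted(sent))
```

In the real setup the lazy source would be the part that filters and splits partitions in separate processes, which is exactly the tricky bit.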