Hmmm. I feel like error handling is a strange topic for most Data Engineers, more so than your average Software Engineer writing TypeScript all day.
We do different things, write different code, and therefore have different needs.
I’ve worked on Data Platforms running 1,000 nodes on Kubernetes. I’ve worked on small-footprint Python scripts running on a server … and everything in between.
Exception Handling is not an often thought-about topic, yet has major ramifications down the road … when you have to crack open Pandora’s Box to solve a problem. Then it matters.
You should check out Prefect, the sponsor of the newsletter this week! Prefect is a workflow orchestration tool that gives you observability across all of your data pipelines. Deploy your Python code in minutes with Prefect Cloud.
What are we trying to accomplish with Error Handling?
This is an interesting question and I think the answer is different for most Data Engineering teams than it is for say … front-end folks writing TypeScript all day.
Data Engineers aren’t worried about handling all errors in a graceful manner, because we aren’t building user-facing applications.
If you are a normal Front-End engineer, you want everything to go smoothly, you don’t want your users ever to see some random Traceback or StackOverflow error.
For Data Engineers, Error Handling is completely different.
Data Engineers want data pipeline errors to be easy to trace, research, and discover, giving an exact idea of what the problem was and where it occurred.
This is where Error handling on Data Teams starts to diverge from most of what we would think of as “normal” Software Engineering. Let’s dive into Error Handling Data Engineering style.
Error Handling for Data Engineering.
Let’s list the main things that Data Engineers should be thinking about when they are creating Data Pipelines.
A central repository of discovery for all Errors.
Alerting around Errors.
Some sort of “logging” module.
Error handling “metadata.”
These core concepts are at the heart of what we want for Error Handling as Data Engineers.
Central Repository
We Data Engineers have been sent from above to tame the teeming masses of Data Pipelines that run every day on our watch. Unlike some Software Engineers who handle errors in a single application, we are dealing with many hundreds of Data Pipelines.
If we have errors coming from multiple different sources, we need a central place from which we can look, discover, and dig into those errors.
Amazon CloudWatch is a good example of this. Having the ability to go to a single place where one can search and discover errors in a time series-type fashion is a game changer.
You don’t want to have many Data Pipelines producing and shipping errors to various spots in various ways. You need to be able to say … “Look, if there are Errors … the logs for them are always going to be in this one spot.”
For example, you can read here about using Prefect for observability into CloudWatch logs.
Sometimes half the battle of debugging is finding the correct error and its root cause. A central spot for errors takes that half of the battle away and makes it easy.
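One way to get every pipeline shipping errors to the same spot is to have them all log in the same machine-readable shape, so whatever collector you use (CloudWatch or otherwise) can index them together. Below is a minimal sketch using only the Python standard library. The names `JsonFormatter` and `get_pipeline_logger`, and the JSON field names, are assumptions made for illustration, not part of any particular tool.

```python
import json
import logging
import sys


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def __init__(self, pipeline: str):
        super().__init__()
        self.pipeline = pipeline  # hypothetical pipeline identifier

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "time": self.formatTime(record),
            "level": record.levelname,
            "pipeline": self.pipeline,
            "message": record.getMessage(),
        }
        if record.exc_info:
            # Keep the full traceback so nothing is lost on the way
            # to the central log store.
            payload["traceback"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def get_pipeline_logger(pipeline: str, stream=sys.stdout) -> logging.Logger:
    """Return a logger that writes JSON lines for `pipeline` to `stream`."""
    logger = logging.getLogger(f"pipelines.{pipeline}")
    logger.setLevel(logging.INFO)
    logger.handlers.clear()  # avoid duplicate handlers on repeated calls
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter(pipeline))
    logger.addHandler(handler)
    logger.propagate = False
    return logger
```

Every pipeline that calls `get_pipeline_logger("some_pipeline_name")` then produces records with the same fields, which is what makes searching across hundreds of pipelines in one place practical.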
Alerting around Errors.
Another core tenet of Error messages in Data Engineering is alerting and notifications. Although Slack may be the bane of many Engineers’ existence, it does serve a purpose.
Data Teams typically have a number of pipelines that are extremely important to the business, for decision-making and customers. We need to know if things go wrong.
How else can you fix something if you don’t know when things have gone south?
For example, many tools provide out-of-the-box Slack integrations for this common use case, making it extremely easy to be notified of pipeline failures.
from prefect_slack import SlackWebhook
SlackWebhook(url="WEBHOOK_URL_PLACEHOLDER").save("BLOCK-NAME-PLACEHOLDER")
Like it or not, having Slack on your phone can be annoying, but very helpful!
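If your tool of choice has no Slack integration, the same idea can be sketched by hand. The example below assumes a Slack incoming webhook that accepts a JSON body with a `text` field. The function names `build_alert`, `post_to_webhook`, and `run_with_alert` are made up for illustration. Note that the error is re-raised after the alert is sent, so the scheduler still sees the failure and the full traceback.

```python
import json
import traceback
import urllib.request


def build_alert(pipeline: str, exc: BaseException) -> dict:
    """Build a webhook message body describing a pipeline failure."""
    trace = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    return {"text": f"Pipeline {pipeline} failed: {exc!r}\n{trace}"}


def post_to_webhook(url: str, body: dict) -> None:
    """POST `body` as JSON to `url` (assumed to be a Slack incoming webhook)."""
    request = urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=10):
        pass


def run_with_alert(pipeline, job, webhook_url, send=post_to_webhook):
    """Run `job`; on any failure send an alert, then re-raise the error."""
    try:
        return job()
    except Exception as exc:
        send(webhook_url, build_alert(pipeline, exc))
        raise  # re-raise so the failure and its traceback are not hidden
```

The `send` parameter exists only so the alerting can be swapped out or tested without hitting a real webhook.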
Logging Modules and Error Meta Data
You might be thinking … why talk about logging when writing about error handling? Well, any sort of templating and meta-data around error handling can make a huge difference when it comes to the business of actually making something of an error message.
What do I mean …
Timestamps and dates
Logging from activities leading up to the error.
Templated meta-data about the pipeline itself etc.
For example …
import logging

logging.basicConfig(
    format="%(asctime)s %(levelname)s - Data Warehouse Staging Load - %(message)s",
    level=logging.INFO,
)
logging.getLogger().setLevel(logging.INFO)
logging.info("Assigning and tracking job etl id.")
This sort of information combined with actual error logs can help home in on exactly where the problem happened.
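If you want the metadata to vary per run instead of being hard-coded into the format string, the standard library’s `logging.LoggerAdapter` can stamp it onto every message. This is a rough sketch, and the `make_run_logger` helper and the `pipeline` and `run_id` fields are assumptions for illustration.

```python
import logging
import uuid


def make_run_logger(pipeline: str, run_id: str = None, stream=None):
    """Return a logger that tags every line with pipeline and run id."""
    run_id = run_id or uuid.uuid4().hex[:8]  # hypothetical run identifier
    logger = logging.getLogger(f"etl.{pipeline}")
    logger.setLevel(logging.INFO)
    logger.handlers.clear()  # avoid duplicate handlers on repeated calls
    handler = logging.StreamHandler(stream)  # stream=None means stderr
    handler.setFormatter(
        logging.Formatter(
            "%(asctime)s %(levelname)s [%(pipeline)s run=%(run_id)s] %(message)s"
        )
    )
    logger.addHandler(handler)
    logger.propagate = False
    # The adapter injects the extra fields into every record it emits.
    return logging.LoggerAdapter(logger, {"pipeline": pipeline, "run_id": run_id})


log = make_run_logger("staging_load")
log.info("Assigning and tracking job etl id.")
```

When a run fails, searching the logs for its run id pulls up everything that happened leading up to the error.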
If you have never run across a good logging module, I recommend looking into this one, for Python.
Some specifics about Error Handling.
I wanted to take a little time to dig into some examples of Error Handling, mostly based around what you should NOT do. Personal opinion, but it does come with some experience.
Here is what I don’t like to see for Error Handling in a Data Engineering context.
try:
    spark.sql(sql_etl_cmd)
except Exception as e:
    """Delta Lake throws random ConcurrentAppendException's randomly when using COPY INTO.
    Attempt to solve this by catching and retrying after a sleep."""
    if "ConcurrentAppendException" in str(e):
        logging.info(
            "Encountered ConcurrentAppendException exception .. trying again."
        )
        sleep(180)
        spark.sql(sql_etl_cmd)
    else:
        # kill script for unknown exception
        print(e)
        exit(1)
I really have a love-hate relationship with `try: except` logic used in a lot of spots to try and “catch” and deal with errors.
Why? Because it’s really easy to miss something and layer in so much logic that the real underlying stack trace for the error can get hidden in the noise. It can make debugging harder than it needs to be.
It’s simply dangerous in my mind. Feel free to do it, but move wisely and cautiously.
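If you do need a retry like the one above, one cautious way to write it is to catch only the specific exception you expect, cap the number of attempts, and let everything else propagate untouched. The sketch below is only an illustration. `ConcurrentAppendError` is a hypothetical stand-in for whatever exception class your Delta Lake client actually raises, and `run_with_retry` is a made-up helper name.

```python
import logging
import time


class ConcurrentAppendError(Exception):
    """Hypothetical stand-in for the Delta Lake ConcurrentAppendException."""


def run_with_retry(action, retry_on, attempts=3, delay_seconds=180, sleep=time.sleep):
    """Call `action`, retrying only on `retry_on`; anything else propagates."""
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except retry_on:
            if attempt == attempts:
                raise  # out of retries: surface the original traceback
            logging.warning(
                "Attempt %d of %d failed, retrying in %ss.",
                attempt,
                attempts,
                delay_seconds,
                exc_info=True,  # log the traceback of the failed attempt
            )
            sleep(delay_seconds)
```

Usage would look something like `run_with_retry(lambda: spark.sql(sql_etl_cmd), ConcurrentAppendError)`. Unknown exceptions are never caught, so there is no `print(e)` and `exit(1)` swallowing the stack trace, and a bare `raise` on the final attempt keeps the original traceback intact.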
For example, here is a raw error log dump from a production failure in code that has no `try:except` logic.
[2024-01-02, 13:50:09 UTC] [2024-01-02, 13:50:09 UTC] {{taskinstance.py:1851}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/operators/python.py", line 193, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/usr/local/airflow/dags/airflow_utils/sftp_transfer.py", line 199, in fiserv_file_processing
    r.sftp_run_command(ssh, cmd=cmd)
  File "/usr/local/airflow/dags/airflow_utils/sftp_transfer.py", line 105, in sftp_run_command
    return stdout.readlines()
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/paramiko/file.py", line 349, in readlines
    line = self.readline()
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/paramiko/file.py", line 291, in readline
    new_data = self._read(n)
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/paramiko/channel.py", line 1361, in _read
    return self.channel.recv(size)
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/paramiko/channel.py", line 699, in recv
    out = self.in_buffer.read(nbytes, self.timeout)
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/paramiko/buffered_pipe.py", line 160, in read
    self._cv.wait(timeout)
  File "/usr/lib/python3.10/threading.py", line 320, in wait
    waiter.acquire()
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/airflow/utils/timeout.py", line 69, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 29035
For me, it’s easy within a minute to home in on what this bare exception is telling me. I don’t have to dig through other weird `try:except` logic that might have been triggered and buried the problem.
I can deduce that `sftp_run_command()` hung while reading from the remote side until `handle_timeout` raised an `AirflowTaskTimeout`, which is not surprising in an SFTP context. Easy.
Wrapping Up
So what did we learn?
Have a central spot or technology that gathers all your logs and errors (like CloudWatch).
Have good alerting and notifications on your errors (like Slack).
Have plenty of metadata to go with your errors, use a logging library.
Be careful how you handle errors.
Let me know in the comments how you handle your errors in all your pipelines!
To clarify, can you provide an example of what does not work well, and details or code for what would work well?
i.e. you say:
For me, it’s easy within a minute to hone down on actually what this bare exception is telling me, I don’t have to dig through other weird `try:except` logic that might have been triggered and buried the problem.
So are you proposing to have no try:except at all? If so, what would the code you write look like in the absence of try:except?
In a try:except block, you can raise the error and that entire error log dump will still show.