Logging & Error Handling in Python for Data Engineering: A Complete Guide
Building robust and production-ready data pipelines is more than just writing code that works—it’s about ensuring your systems are reliable, maintainable, and traceable. That’s where logging and error handling come into play. In this guide, we’ll explore how to implement effective logging and error management in Python specifically tailored for data engineering workflows.
You’ll learn:
What is logging and why it matters
Different types of logs in data pipelines
Setting up Python logging (basic to advanced)
Best practices for structured logging
Error handling in Python: try-except, custom exceptions
Integrating logging with cloud environments (AWS, GCP)
Tools for log aggregation and monitoring
What is Logging and Why is It Important?
Logging is the process of recording messages about a program’s execution. In data engineering, logs help you:
Debug issues: Trace pipeline failures and bugs
Monitor performance: Track execution time, bottlenecks
Audit activity: Record job runs, user actions, API calls
Maintain accountability: Know what ran, when, and by whom
Without proper logging, you’re essentially flying blind in production.
Types of Logs in Data Engineering
Info logs: Track standard events (start/end of ETL job, file read/write)
Warning logs: Indicate potential problems (missing fields, fallback defaults)
Error logs: Indicate a failure that should be addressed (connection failure, parsing error)
Debug logs: Detailed insights used during development (row-level transformations)
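As a quick illustration, the same pipeline step might emit all four kinds of messages. This is a minimal sketch; the row number and column name are hypothetical, and basicConfig is covered in the next section:
import logging

logging.basicConfig(level=logging.DEBUG)   # DEBUG so the row-level message is shown

logging.info("Extract step started")                        # info: standard event
logging.debug("Transformed row 42: %s", {'id': 42})         # debug: row-level detail
logging.warning("Column 'region' missing, using default")   # warning: potential problem
logging.error("Could not connect to the warehouse")         # error: needs attention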
Setting Up Python Logging (Basic to Advanced)
Basic Logging Example:
import logging
logging.basicConfig(level=logging.INFO)
logging.info("ETL job started")
logging.warning("Missing column detected")
logging.error("Database connection failed")
Writing Logs to a File:
logging.basicConfig(
    filename='pipeline.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
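With this configuration, each entry written to pipeline.log follows the format string and looks roughly like this (the timestamp is illustrative):
2025-01-15 09:30:00,123 - INFO - ETL job started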
Adding Context:
user = "data_engineer"
logging.info(f"Job started by {user}")
Structured Logging with JSON
Structured logging makes logs easier to parse, search, and analyze.
import json
import logging

class JsonFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            'timestamp': self.formatTime(record),  # record.asctime is only set by the default formatter
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module
        }
        return json.dumps(log_record)

handler = logging.FileHandler('etl_log.json')
handler.setFormatter(JsonFormatter())

logger = logging.getLogger('etl_logger')
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("ETL job started")
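Each call then appends one JSON object per line to etl_log.json. Assuming the snippet lives in a file named etl_pipeline.py, a line would look roughly like:
{"timestamp": "2025-01-15 09:30:00,123", "level": "INFO", "message": "ETL job started", "module": "etl_pipeline"}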
Error Handling in Python
Using Try-Except Blocks:
try:
    result = 10 / 0
except ZeroDivisionError as e:
    logging.error(f"ZeroDivisionError: {e}")
Handling File I/O:
try:
    with open('data.csv') as f:
        data = f.read()
except FileNotFoundError:
    logging.warning("File not found, skipping this step")
Creating Custom Exceptions:
class DataValidationError(Exception):
    pass

def validate_data(data):
    if 'id' not in data:
        raise DataValidationError("Missing 'id' in data")
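A minimal usage sketch, reusing the definitions above; the sample record is purely illustrative:
import logging

record = {'name': 'widget', 'price': 9.99}   # note: no 'id' key
try:
    validate_data(record)
except DataValidationError as e:
    logging.error(f"Validation failed: {e}")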
Logging in Cloud Environments
AWS Lambda:
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    logger.info("Lambda triggered with event: %s", event)
AWS CloudWatch Integration:
Logs are automatically pushed from Lambda, Glue, and Step Functions
Use filters in CloudWatch to monitor specific errors
GCP Stackdriver:
Python's built-in logging integrates natively with Stackdriver
JSON logs are easily searchable with structured metadata
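A minimal sketch of that native integration, assuming the google-cloud-logging client library is installed and Google Cloud credentials are configured:
import logging
import google.cloud.logging

client = google.cloud.logging.Client()
client.setup_logging()   # attaches a Cloud Logging handler to the root logger

logging.info("ETL job started")   # shipped to Stackdriver / Cloud Logging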
Tools for Log Aggregation & Monitoring
ELK Stack:
Elasticsearch: Store and search logs
Logstash: Ingest and parse logs
Kibana: Visualize logs
Datadog:
Real-time log monitoring and alerts
Integrates with AWS, Python
Grafana Loki:
Lightweight, highly scalable log aggregation
Good for microservices and K8s logging
Best Practices for Logging and Error Handling
Log everything critical: Job start/end, errors, retries
Avoid logging sensitive info: Mask user data, credentials
Use structured logs: For easier parsing and querying
Categorize logs by severity: Use INFO, WARNING, ERROR correctly
Always handle known exceptions: Prevent pipeline crashes
Use retries and exponential backoff: For transient failures (see the sketch after this list)
Set up alerts for errors: So you act before your users do
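As a sketch of the retry guidance above, here is a generic helper; the attempt limits, delays, and the example data source are arbitrary illustrations:
import logging
import time

def with_retries(func, max_attempts=3, base_delay=1.0):
    """Call func(), retrying on failure with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Exception as e:
            logging.warning("Attempt %d/%d failed: %s", attempt, max_attempts, e)
            if attempt == max_attempts:
                logging.error("Giving up after %d attempts", max_attempts)
                raise
            time.sleep(base_delay * 2 ** (attempt - 1))   # 1s, 2s, 4s, ...

# example (hypothetical source): df = with_retries(lambda: pd.read_csv('s3://bucket/data.csv'))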
Real-World Scenario: ETL Job with Logging & Error Handling
import logging
import pandas as pd

logging.basicConfig(filename='etl.log', level=logging.INFO)

def run_etl():
    try:
        logging.info("ETL started")
        df = pd.read_csv('data.csv')
        if df.empty:
            raise ValueError("DataFrame is empty")
        # Transformation logic here
        df['processed'] = True
        df.to_csv('output.csv', index=False)
        logging.info("ETL completed successfully")
    except Exception as e:
        logging.error(f"ETL failed: {e}")

run_etl()
Conclusion
Logging and error handling aren’t just technical chores—they’re critical aspects of building resilient, scalable, and maintainable data systems. With Python’s rich logging libraries and best practices, you can make your ETL and real-time data pipelines reliable and production-grade.
By implementing the strategies outlined here, your data engineering projects will be more robust, and your team will spend less time firefighting and more time innovating.