Logging & Error Handling in Python for Data Engineering: A Complete Guide

Building robust and production-ready data pipelines is more than just writing code that works—it’s about ensuring your systems are reliable, maintainable, and traceable. That’s where logging and error handling come into play. In this guide, we’ll explore how to implement effective logging and error management in Python specifically tailored for data engineering workflows.

You’ll learn:

  • What is logging and why it matters

  • Different types of logs in data pipelines

  • Setting up Python logging (basic to advanced)

  • Best practices for structured logging

  • Error handling in Python: try-except, custom exceptions

  • Integrating logging with cloud environments (AWS, GCP)

  • Tools for log aggregation and monitoring


What is Logging and Why is It Important?

Logging is the process of recording messages about a program’s execution. In data engineering, logs help you:

  • Debug issues: Trace pipeline failures and bugs

  • Monitor performance: Track execution time, bottlenecks

  • Audit activity: Record job runs, user actions, API calls

  • Maintain accountability: Know what ran, when, and by whom

Without proper logging, you’re essentially flying blind in production.


Types of Logs in Data Engineering
  1. Info logs: Track standard events (start/end of ETL job, file read/write)

  2. Warning logs: Indicate potential problems (missing fields, fallback defaults)

  3. Error logs: Indicate a failure that should be addressed (connection failure, parsing error)

  4. Debug logs: Detailed insights used during development (row-level transformations)


Setting Up Python Logging (Basic to Advanced)
Basic Logging Example:
import logging

logging.basicConfig(level=logging.INFO)
logging.info("ETL job started")
logging.warning("Missing column detected")
logging.error("Database connection failed")
Writing Logs to a File:
logging.basicConfig(
    filename='pipeline.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
Adding Context:
user = "data_engineer"
logging.info(f"Job started by {user}")

Structured Logging with JSON

Structured logging makes logs easier to parse, search, and analyze.

import json
import logging

class JsonFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'message': record.getMessage(),
            'module': record.module
        }
        return json.dumps(log_record)

handler = logging.FileHandler('etl_log.json')
handler.setFormatter(JsonFormatter())
logger = logging.getLogger('etl_logger')
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("ETL job started")

Error Handling in Python
Using Try-Except Blocks:
try:
    result = 10 / 0
except ZeroDivisionError as e:
    logging.error(f"ZeroDivisionError: {e}")
Handling File I/O:
try:
    with open('data.csv') as f:
        data = f.read()
except FileNotFoundError:
    logging.warning("File not found, skipping this step")
Creating Custom Exceptions:
class DataValidationError(Exception):
    pass

def validate_data(data):
    if 'id' not in data:
        raise DataValidationError("Missing 'id' in data")
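
Continuing from the definitions above, a minimal usage sketch might look like this (the record dict is purely illustrative):

import logging

record = {"name": "orders.csv"}  # hypothetical record missing the required 'id' field

try:
    validate_data(record)
except DataValidationError as e:
    logging.error("Validation failed: %s", e)
    # Decide whether to skip the record, quarantine it, or fail the job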

Logging in Cloud Environments
AWS Lambda:
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    logger.info("Lambda triggered with event: %s", event)
AWS CloudWatch Integration:
  • Logs are automatically pushed from Lambda, Glue, Step Functions

  • Use filters in CloudWatch to monitor specific errors (as sketched below)
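
The same filtering can also be done programmatically. Here is a minimal boto3 sketch, assuming boto3 is installed, AWS credentials are configured, and the log group name below is hypothetical:

import boto3

logs = boto3.client("logs")

# Fetch recent log events containing the word ERROR from a (hypothetical) Lambda log group
response = logs.filter_log_events(
    logGroupName="/aws/lambda/my-etl-function",
    filterPattern="ERROR",
)

for event in response.get("events", []):
    print(event["message"])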

GCP Cloud Logging (formerly Stackdriver):
  • Python's standard logging module can be routed to Cloud Logging via the google-cloud-logging client library (see the sketch below)

  • JSON logs are easily searchable with structured metadata
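
A minimal sketch, assuming the google-cloud-logging client library is installed and the runtime has Google Cloud credentials:

import logging

import google.cloud.logging

# Attach a Cloud Logging handler to Python's root logger
client = google.cloud.logging.Client()
client.setup_logging()

logging.info("ETL job started")  # shows up in Cloud Logging with structured metadata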


Tools for Log Aggregation & Monitoring
ELK Stack:
  • Elasticsearch: Store and search logs

  • Logstash: Ingest and parse logs

  • Kibana: Visualize logs

Datadog:
  • Real-time log monitoring and alerts

  • Integrates with AWS, Python

Grafana Loki:
  • Lightweight, highly scalable log aggregation

  • Good for microservices and K8s logging


Best Practices for Logging and Error Handling
  1. Log everything critical: Job start/end, errors, retries

  2. Avoid logging sensitive info: Mask user data, credentials

  3. Use structured logs: For easier parsing and querying

  4. Categorize logs by severity: Use INFO, WARNING, ERROR correctly

  5. Always handle known exceptions: Prevent pipeline crashes

  6. Use retries and exponential backoff: For transient failures (see the sketch after this list)

  7. Set up alerts for errors: So you act before your users do
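
Here is a minimal retry-with-exponential-backoff sketch for practice #6. The fetch_with_retries helper and the choice of ConnectionError as the "transient" error are assumptions for illustration; in production you might reach for a library such as tenacity instead:

import logging
import random
import time

def fetch_with_retries(fetch_fn, max_attempts=5, base_delay=1.0):
    """Call fetch_fn, retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch_fn()
        except ConnectionError as e:  # treating connection errors as transient
            if attempt == max_attempts:
                logging.error("Giving up after %d attempts: %s", attempt, e)
                raise
            # Exponential backoff plus a little jitter to avoid synchronized retries
            delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 0.5)
            logging.warning("Attempt %d failed (%s); retrying in %.1fs", attempt, e, delay)
            time.sleep(delay)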


Real-World Scenario: ETL Job with Logging & Error Handling
import logging
import pandas as pd

logging.basicConfig(filename='etl.log', level=logging.INFO)

def run_etl():
    try:
        logging.info("ETL started")
        df = pd.read_csv('data.csv')
        if df.empty:
            raise ValueError("DataFrame is empty")

        # Transformation logic here
        df['processed'] = True

        df.to_csv('output.csv', index=False)
        logging.info("ETL completed successfully")

    except Exception as e:
        logging.error(f"ETL failed: {e}")

run_etl()

Conclusion

Logging and error handling aren’t just technical chores—they’re critical aspects of building resilient, scalable, and maintainable data systems. With Python’s rich logging libraries and best practices, you can make your ETL and real-time data pipelines reliable and production-grade.

By implementing the strategies outlined here, your data engineering projects will be more robust, and your team will spend less time firefighting and more time innovating.