Building Batch Jobs in Python: A Complete Guide for Data Engineers
Batch jobs are the backbone of many data engineering pipelines. Whether you are aggregating logs, transforming large datasets, or pulling data from APIs at scheduled intervals, batch jobs are the reliable, repeatable processes that run behind the scenes. Python, with its simplicity and rich ecosystem of libraries, is one of the most popular languages for building robust batch data pipelines.
In this comprehensive guide, we’ll explore:
What are batch jobs?
Use cases in data engineering
Core components of a Python batch job
Scheduling tools (cron, Airflow, AWS Step Functions)
Handling errors and retries
Logging and monitoring
Real-world example with code
Best practices
Let’s get started.
What Are Batch Jobs?
A batch job is a scheduled program that runs without user interaction. These jobs are usually triggered at fixed intervals (daily, hourly, weekly) or based on conditions (like a new file in a directory).
Examples:
Pulling data from an API every day at midnight
Cleaning and transforming data from a CSV file
Loading data into a database
Generating daily reports
Batch Jobs in Data Engineering
As a data engineer, you will rely on batch jobs daily. Typical batch job tasks include:
ETL (Extract, Transform, Load): Pulling data from a source, transforming it, and loading it into a data warehouse.
Data cleaning: Removing duplicates, handling null values, formatting fields.
File processing: Reading and transforming large CSV, JSON, or Parquet files.
Scheduled analytics: Running predefined queries or reports every morning.
Backfilling data: Reprocessing historical data.
Core Components of a Python Batch Job
A good batch job typically includes:
Scheduler/trigger – Defines when the job runs
Job script – The Python script that executes the logic
Error handling – Try/except blocks and retries
Logging – Track execution and failures
Notification – Email or Slack alerts
Monitoring – Track job status over time
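A minimal sketch of how these pieces can fit together in one script (the function bodies are placeholders, and the scheduler and monitoring live outside the script itself):
import logging
import sys

logging.basicConfig(filename='batch_job.log', level=logging.INFO)

def extract():
    ...  # pull data from an API, file, or database

def transform(data):
    ...  # clean and reshape the data

def load(data):
    ...  # write to a warehouse, file, or bucket

def main():
    logging.info("Job started")
    try:
        load(transform(extract()))
        logging.info("Job finished")
    except Exception:
        logging.error("Job failed", exc_info=True)
        # send a notification here (email, Slack, SNS)
        sys.exit(1)  # non-zero exit code lets the scheduler detect the failure

if __name__ == '__main__':
    main()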
Scheduling Tools
1. Cron (Linux/macOS)
Most simple batch jobs are scheduled with cron:
0 2 * * * /usr/bin/python3 /home/user/scripts/daily_etl.py
Runs the job every day at 2 AM.
2. Apache Airflow
A production-grade workflow orchestrator. Write workflows as DAGs (Directed Acyclic Graphs).
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_data():
    # Your batch logic here
    pass

dag = DAG('daily_job', start_date=datetime(2023, 1, 1), schedule_interval='@daily')
run_task = PythonOperator(task_id='run_batch', python_callable=process_data, dag=dag)
3. AWS Step Functions / EventBridge / Lambda
Cloud-native solution to schedule and chain batch processes.
Use Lambda to run small batch tasks (a minimal handler is sketched after this list)
Use Step Functions to orchestrate multiple steps
Use EventBridge to schedule Lambda jobs
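As a rough sketch, assuming an EventBridge schedule rule already targets the function (the URL below is a placeholder data source), a small batch task in Lambda could look like this:
import json
import logging
import urllib.request

logger = logging.getLogger()
logger.setLevel(logging.INFO)

URL = 'https://api.example.com/data'  # placeholder endpoint

def lambda_handler(event, context):
    # Entry point invoked by the EventBridge schedule
    logger.info("Batch Lambda triggered")
    with urllib.request.urlopen(URL, timeout=30) as resp:
        data = json.loads(resp.read())
    # ... transform and load `data` here ...
    logger.info("Fetched %d records", len(data))
    return {"status": "ok", "records": len(data)}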
Writing a Batch Script in Python
import logging
from datetime import datetime

import pandas as pd
import requests

logging.basicConfig(filename='batch_job.log', level=logging.INFO)

URL = 'https://api.example.com/data'
FILENAME = f"data_{datetime.today().strftime('%Y%m%d')}.csv"

try:
    logging.info("Starting batch job")
    res = requests.get(URL, timeout=30)  # timeout avoids hanging forever on a dead endpoint
    res.raise_for_status()
    data = res.json()
    df = pd.json_normalize(data)
    df.to_csv(FILENAME, index=False)
    logging.info(f"Data saved to {FILENAME}")
except Exception:
    logging.error("Batch job failed", exc_info=True)
Packaging and Dependencies
Use a requirements.txt file to keep your dependencies organized:
pandas
requests
boto3
Run your job inside a virtual environment:
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
Handling Errors and Retries
Wrap key blocks in try/except and optionally retry:
import time

retries = 3
for i in range(retries):
    try:
        response = requests.get(URL)
        response.raise_for_status()
        break
    except requests.RequestException:
        if i < retries - 1:
            time.sleep(5)
        else:
            raise
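If several steps need the same retry behavior, it can be tidier to factor it into a small helper. Here is one sketch; the with_retries helper, the fetch_data function, and the backoff delays are illustrative, not part of any standard library:
import time

import requests

def with_retries(func, attempts=3, base_delay=2):
    # Call func(), retrying on request errors with exponential backoff
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except requests.RequestException:
            if attempt == attempts:
                raise  # out of attempts: let the caller handle it
            time.sleep(base_delay ** attempt)  # waits 2s, then 4s, ...

def fetch_data():
    response = requests.get('https://api.example.com/data', timeout=30)
    response.raise_for_status()
    return response.json()

data = with_retries(fetch_data)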
Logging and Monitoring
Always log:
Start and end time
Any warnings or errors
Rows processed or files written
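With the standard library alone, that baseline might look like this sketch (rows_written and the output filename are illustrative placeholders):
import logging
from datetime import datetime

logging.basicConfig(
    filename='batch_job.log',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s',
)

start = datetime.now()
logging.info("Job started")

# ... extract / transform / load ...
rows_written = 0  # placeholder: set this from your real output

logging.info("Wrote %d rows to %s", rows_written, "data.csv")
logging.info("Job finished in %s", datetime.now() - start)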
Use tools like:
CloudWatch Logs (AWS)
Datadog or Prometheus
ELK Stack (Elasticsearch + Logstash + Kibana)
Notifications
Alert on failure or success:
Email – Use SMTP
Slack – Use Slack Webhooks (sketched after this list)
AWS SNS – For cloud-native notifications
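A minimal Slack sketch, assuming you have created an incoming webhook in your workspace (the webhook URL below is a placeholder, and run_batch_job stands in for your actual job logic):
import requests

SLACK_WEBHOOK_URL = 'https://hooks.slack.com/services/XXX/YYY/ZZZ'  # placeholder

def notify_slack(message):
    # Post a short status message to the channel tied to the webhook
    requests.post(SLACK_WEBHOOK_URL, json={'text': message}, timeout=10)

def run_batch_job():
    ...  # your batch logic goes here

try:
    run_batch_job()
    notify_slack("Batch job succeeded")
except Exception as e:
    notify_slack(f"Batch job failed: {e}")
    raise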
Best Practices
Use parameterized configs (YAML or JSON); see the sketch after this list
Keep your code modular and testable
Avoid hardcoded file paths or credentials
Retry and fail gracefully
Monitor job history and failures
Store logs and outputs in S3 or a shared directory
Use version control (Git) for scripts
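As a small illustration of the config and credentials points above (the file name, config keys, and environment variable name are assumptions for this example):
import os

import yaml  # requires PyYAML (pip install pyyaml)

# Load job settings from a versioned config file instead of hardcoding them
with open('config.yaml') as f:
    config = yaml.safe_load(f)

API_URL = config['api_url']
OUTPUT_DIR = config['output_dir']

# Keep secrets out of code and config files: read them from the environment
API_KEY = os.environ['API_KEY']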
Real-World Example: Batch Job for Weather Data
import requests
import pandas as pd
from datetime import datetime

API_KEY = 'your_openweather_api_key'
CITY = 'New York'
URL = f"http://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}"

try:
    print("Starting weather job")
    res = requests.get(URL, timeout=30)
    res.raise_for_status()  # fail loudly on a bad API key or HTTP error
    weather = res.json()
    df = pd.json_normalize(weather)
    df['timestamp'] = datetime.now()
    df.to_csv(f"weather_{CITY}.csv", index=False)
    print("Data saved.")
except Exception as e:
    print("Error fetching weather:", e)
Conclusion
Python makes it easy to build reliable batch jobs for your data engineering pipelines. Whether you’re working with cron, Airflow, or AWS services, the key is writing clean, modular code with solid error handling and monitoring. With this guide, you now have a strong foundation to build and scale your own batch processing workflows.