The 500-line notebook problem: A framework for data pipelines that don't break at scale

August 9, 2025

Last week, I opened a colleague's "quick data processing script." It was 500 lines of Jupyter notebook, five utility functions deep, with cells that had to be run in a specific order that only they understood.

I've written that notebook. You've probably written it too.

It starts innocently. You just need to transform some data for an experiment. load_dataset(). A few pandas operations. Maybe a progress bar. But then the data has edge cases. The script needs error handling. And before you know it, you're maintaining infrastructure disguised as research code.

Here's what I've learned: the distance between "research script" and "production pipeline" is shorter than we think. The code I wrote to "just get results" is now containerized, running on Kubernetes, with structured logging and a dead letter queue for failed records.

The problem isn't that we're building infrastructure by accident. It's that we're building it badly because we don't admit what we're doing.

In this post, I'll share the framework I use to write data pipelines that can grow from notebook experiments without the usual descent into chaos.

1. Shorten Your Feedback Loop (Or: Why Your First Run Always Fails)

The rookie mistake? Running your script on the full dataset right away. We assume our code will just work. It rarely does. First runs often fail due to bugs, config errors, or some obscure cloud permission you forgot even existed.

The faster you get feedback, the faster you can fix things. This is what I call the feedback loop. A tight feedback loop doesn't just catch bugs; it also helps you validate changes quickly.

Two ways to shorten the feedback loop:

  • Write automated tests.
  • Run your pipeline on a small subset of the data.

Everybody talks about tests, but here’s what I've found works best for data pipelines:

Test fixtures made out of real data

A fixture is any fixed state or set of resources your tests need to work consistently every time they run.

Picture a CSV filled with test records, or a JSON file that mimics an API response.

Fixtures are incredibly useful for keeping tests organized, but here's where many teams go wrong: they populate fixtures with made-up data that doesn't reflect the messiness of real-world inputs.

Take your fixtures straight from actual data sources instead. You'll end up with tests that are much more resilient and catch the kinds of edge cases that only appear when dealing with real data.
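
For example, here's a small helper (a sketch; the file paths and sample size are placeholders) that snapshots a handful of real records into a fixture file your tests can load:

import pandas as pd

def snapshot_fixture(source_csv, fixture_path, n=20, seed=42):
    """Sample a few real records into a JSON fixture for tests.

    Remember to scrub sensitive fields before committing the fixture.
    """
    df = pd.read_csv(source_csv)
    sample = df.sample(n=min(n, len(df)), random_state=seed)
    sample.to_json(fixture_path, orient="records", indent=2)

# Run this once (and re-run occasionally) to refresh the fixture from real data:
# snapshot_fixture("exports/users_latest.csv", "tests/fixtures/users.json")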

Run on a subset of the data

But, even with tests, things will break at scale.

That’s why I always scale up the amount of data gradually: start with just 10 records, then increase by 10x or 100x until you reach the full dataset.

This helps isolate issues like misconfigurations, memory problems, or poor network performance before doing the full run.
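
As a rough sketch (run_pipeline is just a stand-in for your own entry point), the scale-up can be a simple loop:

def run_pipeline(max_records=None):
    """Stand-in for your real entry point; None means the full dataset."""
    print(f"Running with max_records={max_records}")

# Shake out config, memory, and network issues on the small runs first.
for max_records in (10, 100, 1_000, None):
    run_pipeline(max_records=max_records)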

2. Limit Data During Development

Want to run your pipeline on just a few records? Take the first N records.

Whether you're working in BigQuery or Pandas, there’s always a way to limit the data.

In SQL:

SELECT * FROM my_table LIMIT 100

In Pandas:

df[:100]

I usually add a --max-records flag to my scripts that specifies the maximum number of records to load. When set, the code only loads the specified number of records, either using a LIMIT clause in SQL or slicing a list/DataFrame.
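
A minimal sketch of that flag with argparse (the table name and CSV path are placeholders):

import argparse

import pandas as pd

parser = argparse.ArgumentParser()
parser.add_argument(
    "--max-records", type=int, default=None,
    help="If set, only load this many records (useful during development).",
)
args = parser.parse_args()

# SQL path: append a LIMIT clause when the flag is set.
query = "SELECT * FROM my_table"
if args.max_records is not None:
    query += f" LIMIT {args.max_records}"

# DataFrame path: slice after loading.
df = pd.read_csv("input.csv")
if args.max_records is not None:
    df = df[: args.max_records]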

3. Structure Your Logs for Debugging at Scale

tqdm gives you a nice progress bar while you iterate in a notebook.

But once your script runs remotely, or streams logs to a centralized system, tqdm turns into noise. It clutters your logs and breaks anything expecting structured output.

Here’s what to do instead: replace human-friendly progress bars with machine-readable logs.

Instead of printing this:

100%|██████████| 500/500 [00:10<00:00, 49.98it/s]

Log this:

{
  "item_id": "abc123",
  "status": "processed",
  "duration": 0.134
}

Now your logs can be:

  • Queried by field
  • Filtered by time or status
  • Debugged at scale

In Python, you can set this up with a custom logging.Formatter:

import json
import logging
import threading

# Attributes every LogRecord carries by default; anything else came from `extra`.
_DEFAULT_RECORD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__)

class JsonFormatter(logging.Formatter):
    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "message": record.getMessage(),
            "timestamp": self.formatTime(record, self.datefmt),
            "function": record.funcName,
            "file": record.pathname,
            "line": record.lineno,
            "thread": threading.get_ident(),
            # Custom fields passed via logger.info(..., extra={...})
            "extra": {
                k: v for k, v in record.__dict__.items()
                if k not in _DEFAULT_RECORD_ATTRS
            },
        })

Then, configure your logger to use the new formatter:

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())

logger = logging.getLogger("my_logger")
logger.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info("Processing started", extra={"item_id": "abc123"})

4. Treat Failures as Data

As your data scales, some records will fail to process. And that’s normal. You won’t always know what kinds of edge cases or errors to expect ahead of time.

That’s why you should treat failures as first-class citizens: capture every failed record and store it somewhere queryable.

I usually write these to a dedicated failed_records table in the data warehouse.

Step 1: Create a Failed Records Table (BigQuery Example)

CREATE TABLE IF NOT EXISTS `project.dataset.failed_records` (
    record_id STRING,
    error_type STRING,
    error_message STRING,
    stack_trace STRING,
    timestamp TIMESTAMP,
    raw_data JSON,
    pipeline_run_id STRING,
    retry_count INT64
)
-- Ingestion-time partitioning keeps scans (and costs) down
PARTITION BY DATE(_PARTITIONTIME)
CLUSTER BY error_type, pipeline_run_id;

To make it easier to track pipeline runs, I include a unique run ID per job execution:

# Set this at the start of your job
export PIPELINE_RUN_ID="pipe_$(date +%Y-%m-%d)_$(git rev-parse --short HEAD)"

Step 2: Capture Failures in Your Code

import json
import os
import traceback
from datetime import datetime, timezone

def process_with_error_capture(record, failed_records_writer):
    try:
        result = transform(record)
        return result
    except Exception as e:
        failed_record = {
            'record_id': record.get('id'),
            'error_type': type(e).__name__,
            'error_message': str(e),
            'stack_trace': traceback.format_exc(),
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'raw_data': json.dumps(record),
            'pipeline_run_id': os.environ.get('PIPELINE_RUN_ID'),
            'retry_count': record.get('retry_count', 0)
        }
        failed_records_writer.write(failed_record)
        logger.error(f"Failed to process {record.get('id')}", extra=failed_record)

Some data processing frameworks have a built-in way to capture errors (like Apache Beam's .with_exception_handling()), so use those when available.

Step 3: Debug with SQL

-- What types of errors are we hitting?
SELECT 
    error_type,
    COUNT(*) as error_count,
    ARRAY_AGG(error_message IGNORE NULLS ORDER BY timestamp DESC LIMIT 1)[OFFSET(0)] as latest_error
FROM `project.dataset.failed_records`
WHERE pipeline_run_id = 'prod-2024-01-15-v2'
GROUP BY error_type
ORDER BY error_count DESC;

-- Show me examples of KeyError failures
SELECT 
    record_id,
    error_message,
    JSON_EXTRACT_SCALAR(raw_data, '$.user_id') as user_id,
    timestamp
FROM `project.dataset.failed_records`
WHERE error_type = 'KeyError'
  AND pipeline_run_id = 'prod-2024-01-15-v2'
LIMIT 5;

-- Which records keep failing across multiple runs?
SELECT 
    record_id,
    COUNT(DISTINCT pipeline_run_id) as failed_runs,
    ARRAY_AGG(DISTINCT error_type IGNORE NULLS) as error_types
FROM `project.dataset.failed_records`
WHERE DATE(_PARTITIONTIME) >= CURRENT_DATE() - 7
GROUP BY record_id
HAVING failed_runs > 2;

5. Make Every Run Reproducible

Your pipeline should be deterministic. Same inputs and configs should produce the same output. Same number of records, same file paths, same table names.

Repeatability is the foundation of debugging. If each run changes slightly, you’ll never know if a bug is real or just a fluke. Here’s what makes runs repeatable:

Fixed random seeds

If your code uses randomness (e.g. shuffling data, sampling records), set the seed:

import random, numpy as np  

random.seed(42)  
np.random.seed(42)

Stable output paths

I often generate output paths using a hash of the input data and config.

For example:

import hashlib, json

config_hash = hashlib.md5(json.dumps(config, sort_keys=True).encode()).hexdigest()
output_path = f"outputs/run_{config_hash}.jsonl"

Freeze configs

Capture the exact setup that produced your results. Filters, batch size, feature flags, etc.

I log the config at the start of every run and save it as a _config.yaml file in the output directory.
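
A minimal sketch of that habit (the helper name is mine; it assumes the config is a plain dict):

import logging
from pathlib import Path

import yaml

logger = logging.getLogger("my_logger")

def freeze_config(config: dict, output_dir: str) -> None:
    """Log the run's config and save a copy next to its outputs."""
    logger.info("Run config", extra={"config": config})
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    with open(out / "_config.yaml", "w") as f:
        yaml.safe_dump(config, f, sort_keys=False)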

Here's a config example from one of the data pipelines I built:

job_name: omnicorpus-cc
container_image: ...
dataset:
  hf_id: OpenGVLab/OmniCorpus-CC
  interleaved: true
  text_col: texts
  url_col: images
  img_meta_col: metadata
  example_meta_col: general_metadata
image_token: <|image|>
dataflow:
  machine_type: n2d-standard-16
  number_of_worker_harness_threads: 128
num_shards: 65_536
output_dir: gs://.../...
query: |
  SELECT
    *
  FROM
    `vlm.multi_modal_dev.omnicorpus-cc_raw_metadata`
  WHERE
    --- filter out rows with too large text
    NOT EXISTS (
    SELECT
      1
    FROM
      UNNEST(texts.list) AS t
    WHERE
      LENGTH(t.element) >= 262144 )
    AND NOT EXISTS (
    SELECT
      1
    FROM
      UNNEST(metadata.list) AS m
    WHERE
      --- filter out rows with too large or too small images
      m.element.width < 28
      OR m.element.width > 20000
      OR m.element.height < 28
      OR m.element.height > 20000 )
  LIMIT
    100
# we want to double check that the metadata is consistent with the actual images we download
data_filters:
  side_len_cutoff: [28, 20_000]
  text_len_max_cutoff: 262_144  # 2^18
image_processing:
  target_size: 1_120
  resample_algo: BICUBIC

Log the Git SHA

Store the Git SHA of the code that produced the results. This makes it easy to trace outputs back to a specific commit.

For example, you can embed it in logs or output paths:

export GIT_SHA=$(git rev-parse --short HEAD)
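
You can also resolve it at runtime from Python (a sketch; it assumes the job runs from a Git checkout):

import logging
import subprocess

logger = logging.getLogger("my_logger")

# Short SHA of the commit that's currently checked out.
git_sha = subprocess.check_output(
    ["git", "rev-parse", "--short", "HEAD"], text=True
).strip()

output_path = f"outputs/run_{git_sha}.jsonl"
logger.info("Starting run", extra={"git_sha": git_sha})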

6. Save Intermediate Results

As your pipeline grows in size and complexity, you’ll naturally split it into distinct stages. Usually something like:

  • Load raw data
  • Apply several transformation steps
  • Save the final output

But when you're working on, say, step 3, you don't want to rerun steps 1 and 2 over and over again.

That’s why it’s worth saving intermediate results. It gives you a way to resume from any step. You can debug and iterate on one step at a time, without running all the previous steps.

My setup: each step writes its output to its own directory.

The result looks like this:

pipe_1/
  step1_out/
  step2_out/
  step3_out/

Each step reads from the output of the previous one.

Step 2 takes step1_out/ as input, step 3 takes step2_out/, and so on.
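
A rough sketch of how I wire that up (the step functions are hypothetical; the only real trick is skipping a step when its output directory already has results):

from pathlib import Path

def run_step(name, step_fn, input_dir, base_dir="pipe_1"):
    """Run one pipeline step, or skip it if its output directory already has results."""
    out_dir = Path(base_dir) / f"{name}_out"
    if out_dir.exists() and any(out_dir.iterdir()):
        print(f"Skipping {name}: reusing {out_dir}")
        return out_dir
    out_dir.mkdir(parents=True, exist_ok=True)
    step_fn(input_dir, out_dir)  # each step reads the previous step's output
    return out_dir

# Chain the steps so each one consumes the previous output directory:
# step1_out = run_step("step1", load_raw_data, input_dir=None)
# step2_out = run_step("step2", transform_records, input_dir=step1_out)
# step3_out = run_step("step3", write_final_output, input_dir=step2_out)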