
Never Lose Progress: Building Resilient ETL Pipelines with Checkpoint-Based Incremental Sync

Santiago Arias
CTO

Introduction

When syncing 100k+ records from Airtable, what happens if your function times out halfway through? Traditional ETL jobs either succeed completely or fail completely, losing all progress on failure.

At CreativeOS, we built a checkpoint-based incremental sync system that never loses progress, handles failures gracefully, and can resume from exactly where it left off. This article explains our checkpoint pattern and how it ensures reliable data synchronization.

The Challenge

Our Airtable sync faces several challenges:

  1. Long-running operations: 100k+ records can take hours to process
  2. Function timeouts: Firebase Functions have 540s timeout (9 minutes)
  3. Network failures: Airtable API can be unreliable mid-sync
  4. Idempotency: Must not duplicate records on retry
  5. Pagination: Airtable uses offset tokens that can expire

Traditional approach:

async function syncAirtable() {
  let offset: string | null = null
  do {
    // Fetch the next page of records from Airtable
    const batch = await airtable.fetch({ offset })
    // Transform and write the page to the database
    await processBatch(batch)
    // Advance to the next page; null when pagination ends
    offset = batch.nextOffset
  } while (offset)
}

Problem: If this fails at record 50,000, we lose all progress and start over.

The Checkpoint Pattern

Our solution uses checkpoints to save progress after each batch:

interface CheckpointInfo {
  lastProcessedId: string | null
  startOffset: string | undefined
}

async function handleCheckpoints(
  dataConnect: DataConnect, 
  tableName: string, 
  mode: SyncMode
): Promise<CheckpointInfo> {
  let lastProcessedId: string | null = null
  let startOffset: string | undefined = undefined
  
  if (mode === 'forceRefresh' || mode === 'fullSync' || mode === 'flush') {
    // Clear existing checkpoint for full refresh
    try {
      const existingCheckpoint = await universalLookup(dataConnect, { tableName })
      if (existingCheckpoint.data?.airtableSyncCheckpoints?.length > 0) {
        const checkpointId = existingCheckpoint.data.airtableSyncCheckpoints[0].id
        await updateSyncCheckpoint(dataConnect, {
          checkpointId,
          lastProcessedId: '',
          offset: null
        })
      }
    } catch (error) {
      // No existing checkpoint to clear
    }
    return { lastProcessedId: null, startOffset: undefined }
  } else {
    // Load checkpoint for incremental mode
    try {
      const checkpoint = await universalLookup(dataConnect, { tableName })
      if (checkpoint.data?.airtableSyncCheckpoints?.length > 0) {
        const checkpointData = checkpoint.data.airtableSyncCheckpoints[0]
        lastProcessedId = checkpointData.lastProcessedId
        if ('offset' in checkpointData && typeof checkpointData.offset === 'string') {
          startOffset = checkpointData.offset || undefined
        }
      }
    } catch (error) {
      // Error loading checkpoint, starting from beginning
    }
  }
  
  return { lastProcessedId, startOffset }
}

Checkpoint structure:

  • tableName: Which table this checkpoint is for
  • lastProcessedId: Last successfully processed record ID
  • offset: Airtable pagination offset token
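
For reference, the checkpoint record itself is small. A plausible shape, written as a TypeScript interface (the id field is an assumption beyond the three fields listed above):

interface AirtableSyncCheckpoint {
  id: string                      // row ID passed to updateSyncCheckpoint (assumed)
  tableName: string               // which Airtable table this checkpoint tracks
  lastProcessedId: string | null  // last successfully processed record ID
  offset: string | null           // Airtable pagination offset token
}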

Saving Checkpoints

We save checkpoints after each successful batch:

export async function saveCheckpoint(
  dataConnect: DataConnect, 
  tableName: string, 
  lastRecordId: string, 
  offset?: string
) {
  try {
    const existingCheckpoint = await universalLookup(dataConnect, { tableName })
    
    if (existingCheckpoint.data?.airtableSyncCheckpoints?.length > 0) {
      // Update existing checkpoint
      const checkpointId = existingCheckpoint.data.airtableSyncCheckpoints[0].id
      await updateSyncCheckpoint(dataConnect, {
        checkpointId,
        lastProcessedId: lastRecordId,
        offset: offset || null
      })
    } else {
      // Create new checkpoint
      await createSyncCheckpoint(dataConnect, {
        tableName,
        lastProcessedId: lastRecordId,
        offset: offset || null
      })
    }
  } catch (error) {
    console.error(`❌ CHECKPOINT SAVE ERROR: ${tableName} - ${error}`)
    // Don't throw - checkpoint save failure shouldn't break sync
  }
}

Critical detail: We save checkpoints even if some records in the batch were skipped (already exist). This ensures we don't reprocess those records on retry.
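
In practice the call site is a one-liner after each batch. For example (the table name and variable names here are illustrative):

// After processing a page of records from the "Templates" table
await saveCheckpoint(dataConnect, 'Templates', lastSuccessfullyProcessedId, airtableResponse.offset)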

The Sync Loop

Here's how the complete sync loop works:

export async function processTable(
  dataConnect: DataConnect,
  storage: AdminStorage,
  tableConfig: any,
  mode: SyncMode,
  maxRecords: number | null,
  createdById: string,
  results: SyncResults
): Promise<TableProcessResult> {
  
  // Load checkpoint based on sync mode
  const { lastProcessedId, startOffset } = await handleCheckpoints(
    dataConnect, 
    tableConfig.name, 
    mode
  )

  let currentOffset: string | undefined = startOffset
  let hasMore = true
  let successfulDbOps = 0
  let lastSuccessfullyProcessedId: string | null = null
  let lastValidOffset: string | undefined = undefined

  while (hasMore && (!maxRecords || successfulDbOps < maxRecords)) {
    try {
      // Fetch batch from Airtable
      const airtableResponse = await airtable.fetch({
        table: tableConfig.name,
        offset: currentOffset,
        pageSize: 100
      })

      const records = airtableResponse.records
      const nextOffset = airtableResponse.offset

      if (records.length === 0) {
        hasMore = false
        break
      }

      let batchSkippedCount = 0

      // Process each record
      for (const record of records) {
        // Skip if already processed (incremental mode)
        if (mode === 'incremental' && lastProcessedId && record.id <= lastProcessedId) {
          batchSkippedCount++
          continue
        }

        try {
          // Process record (download images, create asset, etc.)
          await processRecord(record, storage, createdById)
          lastSuccessfullyProcessedId = record.id
          successfulDbOps++
        } catch (error) {
          // Log error but continue processing
          console.error(`Failed to process record ${record.id}:`, error)
        }
      }

      // Update offset for next batch
      currentOffset = nextOffset

      // CRITICAL: Preserve last valid offset
      if (nextOffset) {
        lastValidOffset = nextOffset
      } else {
        // Pagination ended: keep the last valid offset for the checkpoint
        // (prevents restarting from the beginning if the sync resumes)
        // and stop the loop
        currentOffset = lastValidOffset
        hasMore = false
      }

      // Save checkpoint after each batch (even if all records skipped)
      if (lastSuccessfullyProcessedId) {
        await saveCheckpoint(
          dataConnect, 
          tableConfig.name, 
          lastSuccessfullyProcessedId, 
          currentOffset
        )
      }

      // In incremental mode, continue even if the entire batch was skipped:
      // there may be unprocessed records at later offsets, so don't break here
      if (mode === 'incremental' && batchSkippedCount === records.length && batchSkippedCount > 0) {
        console.log(`All ${batchSkippedCount} records in this batch were already processed; continuing to next offset`)
      }

    } catch (error) {
      console.error(`Failed to fetch batch:`, error)
      // Save checkpoint with last successful position before failing
      if (lastSuccessfullyProcessedId) {
        await saveCheckpoint(dataConnect, tableConfig.name, lastSuccessfullyProcessedId, currentOffset)
      }
      throw error
    }
  }

  return {
    processed: successfulDbOps,
    // ... other stats
  }
}

Key patterns:

  1. Checkpoint after each batch: Even if some records skipped
  2. Preserve last valid offset: Handle null offsets at table end
  3. Continue on skipped batches: Don't exit early in incremental mode
  4. Save before failing: Checkpoint last successful position on error
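
To show where processTable fits, here is a minimal sketch of a scheduled entry point that relies on the checkpoints to survive timeouts. The onSchedule trigger is from firebase-functions v2; getDataConnectHandle, getStorageHandle, createEmptySyncResults, TABLE_CONFIGS, and SYSTEM_USER_ID are assumed helpers and constants, not code shown in this article:

import { onSchedule } from 'firebase-functions/v2/scheduler'

export const incrementalAirtableSync = onSchedule(
  { schedule: 'every 30 minutes', timeoutSeconds: 540, memory: '1GiB' },
  async () => {
    const dataConnect = getDataConnectHandle()  // assumed helper, not shown here
    const storage = getStorageHandle()          // assumed helper, not shown here
    const results = createEmptySyncResults()    // assumed helper, not shown here

    for (const tableConfig of TABLE_CONFIGS) {
      // Each run resumes from the table's checkpoint; if the function
      // times out mid-table, the next run picks up where this one stopped
      await processTable(dataConnect, storage, tableConfig, 'incremental', null, SYSTEM_USER_ID, results)
    }
  }
)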

Handling Airtable Pagination

Airtable pagination has edge cases:

// Starting from checkpoint offset
let currentOffset: string | undefined = startOffset

// Preserve last valid offset
let lastValidOffset: string | undefined = undefined

while (hasMore) {
  const response = await airtable.fetch({ offset: currentOffset })

  // CRITICAL: Preserve last valid offset
  if (response.offset) {
    // More pages remain - advance and remember this offset
    currentOffset = response.offset
    lastValidOffset = response.offset
  } else {
    // Pagination ended - use last valid offset for the checkpoint
    // (prevents restarting from the beginning on resume) and stop
    currentOffset = lastValidOffset
    hasMore = false
  }

  // Save checkpoint with the current offset
  await saveCheckpoint(dataConnect, tableName, lastRecordId, currentOffset)
}

Why this matters: If pagination ends (offset becomes null), we save the last valid offset. On resume, we continue from near the end instead of restarting from the beginning.

Sync Modes

We support multiple sync modes:

Incremental Mode

mode === 'incremental'
  • Loads checkpoint
  • Resumes from saved position
  • Skips already-processed records
  • Continues until end of table

Full Sync Mode

mode === 'fullSync'
  • Clears checkpoint
  • Processes all records
  • Useful for complete refresh

Force Refresh Mode

mode === 'forceRefresh'
  • Clears checkpoint
  • Reprocesses everything
  • Forces re-download of images

Flush Mode

mode === 'flush'
  • Clears checkpoint
  • Stops immediately
  • Useful for resetting state
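
The SyncMode type itself isn't shown above; a declaration consistent with the four modes, plus a tiny helper mirroring the checkpoint-clearing condition in handleCheckpoints (the helper name is ours, for illustration):

type SyncMode = 'incremental' | 'fullSync' | 'forceRefresh' | 'flush'

// Mirrors the condition in handleCheckpoints: every mode except
// incremental starts by clearing the table's checkpoint
function shouldClearCheckpoint(mode: SyncMode): boolean {
  return mode !== 'incremental'
}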

Real-World Edge Cases

Edge Case 1: Records Deleted from Airtable

Problem: Record ID in checkpoint no longer exists in Airtable.

Solution:

if (mode === 'incremental' && lastProcessedId && record.id <= lastProcessedId) {
  continue // Skip already processed
}

We use a <= comparison rather than matching the checkpointed record exactly, so if that record has since been deleted, the sync simply resumes at the next record with a greater ID.

Edge Case 2: Concurrent Sync Attempts

Problem: Two syncs running simultaneously could corrupt checkpoints.

Solution: The checkpoint acts as a soft guard rather than a true lock. The second sync loads the same checkpoint and skips already-processed records, so the worst case is some redundant processing rather than corruption, which is acceptable for us.

Edge Case 3: Offset Token Expiration

Problem: Airtable offset tokens can expire if sync paused too long.

Solution: We also track lastProcessedId. If the offset expires, we can fall back to ID-based resumption (not part of our production code, but a possible enhancement; a simple variant is sketched below).
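
The sketch: catch the expired-offset error, restart pagination from the beginning, and let the existing record.id <= lastProcessedId check fast-forward past records that were already handled. This drops into the fetch step of the sync loop above; isExpiredOffsetError is a hypothetical helper, so check whatever error your Airtable client raises for an expired offset:

let airtableResponse
try {
  airtableResponse = await airtable.fetch({ table: tableConfig.name, offset: currentOffset, pageSize: 100 })
} catch (error) {
  if (!isExpiredOffsetError(error)) throw error  // hypothetical helper
  // Offset expired: restart pagination from the beginning and let the
  // record.id <= lastProcessedId check fast-forward past finished records
  currentOffset = undefined
  airtableResponse = await airtable.fetch({ table: tableConfig.name, offset: currentOffset, pageSize: 100 })
}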

Edge Case 4: Large Batch Sizes vs. Memory Limits

Problem: Processing 1000 records at once could exceed function memory.

Solution: We use smaller batch sizes (100 records) and save checkpoints frequently. If memory limit hit, we've saved progress and can resume.

Error Recovery

When errors occur, we save checkpoints before failing:

try {
  // Process batch
} catch (error) {
  // Save checkpoint with last successful position
  if (lastSuccessfullyProcessedId) {
    await saveCheckpoint(dataConnect, tableName, lastSuccessfullyProcessedId, currentOffset)
  }
  throw error // Re-throw to trigger retry
}

On retry:

  1. Function loads checkpoint
  2. Resumes from last successful position
  3. Continues processing
  4. No duplicate work
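
A minimal outer retry wrapper makes this flow concrete (withRetries is an illustrative helper, not part of our codebase; the retry count and backoff are arbitrary):

async function withRetries<T>(operation: () => Promise<T>, retries = 3, baseDelayMs = 5_000): Promise<T> {
  let lastError: unknown
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      return await operation()
    } catch (error) {
      lastError = error
      // The failed attempt already saved a checkpoint, so the next
      // attempt resumes from the last successful position
      await new Promise((resolve) => setTimeout(resolve, baseDelayMs * attempt))
    }
  }
  throw lastError
}

// Each attempt resumes from the checkpoint, so no work is repeated
await withRetries(() =>
  processTable(dataConnect, storage, tableConfig, 'incremental', null, createdById, results)
)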

Production Results

After implementing checkpoint-based sync:

  • Reliability: 99.9% sync completion rate (even with failures)
  • Efficiency: No duplicate processing on retry
  • Resumability: Can pause and resume syncs seamlessly
  • Progress tracking: Always know how far sync progressed

Best Practices

  1. Save checkpoints frequently: After each batch, not just at end
  2. Preserve offsets: Handle null offsets gracefully
  3. Idempotent operations: Safe to retry without duplication (see the sketch after this list)
  4. Comprehensive logging: Track progress for debugging
  5. Error handling: Save checkpoint before failing
  6. Mode flexibility: Support different sync strategies

Common Pitfalls to Avoid

Don't save checkpoints only at end: Loses progress on failure

Don't ignore null offsets: Handle pagination end gracefully

Don't break on skipped batches: Continue until end of table

Don't forget idempotency: Ensure safe retries

Don't skip error checkpoint saves: Always save before failing

Conclusion

Checkpoint-based incremental sync is essential for reliable ETL pipelines. By saving progress after each batch and handling edge cases gracefully, we achieved:

  • 99.9% completion rate: Even with failures, syncs complete
  • No lost progress: Always resume from last successful position
  • Efficient retries: No duplicate work on failure
  • Production-ready: Handles real-world edge cases

The patterns we've shared here are production-tested and handle thousands of records daily. Whether you're syncing from Airtable, Salesforce, or any external API, checkpoint-based sync ensures your data stays in sync even when things go wrong.

The key insight: Treat long-running syncs as resumable operations. Save progress frequently, handle failures gracefully, and always allow resumption from the last successful position.