BAD Marketing

Enterprise Data Infrastructure: ClickUp, ETL & Lead Automation at BAD Marketing

Built the complete data infrastructure at BAD Marketing — ClickUp analytics pipeline, modular ETL platform processing 100K+ events daily, and multi-stage lead automation with 6+ API integrations.

Duration: Sep 2025 - Present
Role: Advanced Systems & Operations Developer
Node.jsTypeScriptPythonn8nBigQueryKlaviyo APIClickUp APIZeroBounceMillionVerifierSmartleadGoogle SheetsDocker
Daily Events Processed
100K+
Development Speed
75% faster
System Reliability
99.9% uptime
API Integrations
6+ services
Published March 15, 2026
Sep 2025 - Present

Building the Complete Data Infrastructure at BAD Marketing

When I was promoted to Advanced Systems & Operations Developer in September 2025, BAD Marketing's data infrastructure was a patchwork of manual processes and fragile scripts. The agency was scaling rapidly -- onboarding new clients, running more campaigns, and generating more leads than ever -- but the underlying systems couldn't keep up. Project data lived in ClickUp with no analytics layer. Marketing performance data flowed through a brittle 300+ line monolithic ETL script. Lead processing was entirely manual, with inconsistent quality and delayed follow-ups.

I set out to build three interconnected systems that would form the backbone of BAD Marketing's data operations: a production-grade ClickUp analytics pipeline, a modular ETL platform for marketing data, and a multi-stage lead processing automation. Together, these systems process over 100K events daily, integrate with 6+ external services, and have maintained 99.9% uptime since deployment.

What ties these projects together isn't just the tech stack -- it's an architectural philosophy. Every system follows the same principles: service separation with clean interfaces, comprehensive error handling with exponential backoff, production monitoring with real-time alerting, and containerized deployment through CI/CD pipelines. The result is infrastructure that the team trusts to run unattended, day after day.

ClickUp Analytics Pipeline

BAD Marketing needed comprehensive insights into their project management workflows to optimize team performance, track client deliverables, and identify operational bottlenecks. The challenge was building a production-grade data pipeline that could reliably extract, process, and analyze ClickUp project data across multiple workspaces while maintaining enterprise-level reliability and monitoring.

Production-Grade System Design

I architected a comprehensive data collection pipeline with enterprise-level features:

# Production-ready architecture with full error handling
class ClickUpDataCollector:
    def __init__(self, config: ProductionConfig):
        self.config = self.load_environment_config()
        self.monitoring = MonitoringService()
        self.alerting = EmailAlertingService()
        self.recovery = RecoveryStateManager()
        self.validator = DataValidator()

    async def collect_workspace_data(self, workspace_id: str) -> CollectionResult:
        """Production data collection with comprehensive error handling"""
        collection_context = {
            'workspace_id': workspace_id,
            'start_time': datetime.now(),
            'session_id': self.generate_session_id()
        }

        try:
            # Initialize collection with health checks
            await self.validate_api_connectivity()
            await self.validate_storage_backends()

            # Execute collection pipeline
            tasks = await self.extract_tasks_with_retry(workspace_id)
            validated_data = await self.validate_and_process(tasks)
            storage_results = await self.store_with_redundancy(validated_data)

            # Success monitoring and metrics
            await self.monitoring.record_success({
                **collection_context,
                'tasks_collected': len(validated_data),
                'storage_backends': len(storage_results),
                'duration': (datetime.now() - collection_context['start_time']).seconds
            })

            return CollectionResult(
                success=True,
                tasks_processed=len(validated_data),
                quality_score=self.calculate_quality_score(validated_data)
            )

        except Exception as error:
            await self.handle_collection_failure(error, collection_context)
            raise

Advanced Error Recovery

The pipeline uses exponential backoff with a circuit breaker pattern to handle API rate limits and network failures gracefully:

class ProductionErrorHandler:
    def __init__(self):
        self.max_retries = 5
        self.base_delay = 1.0
        self.max_delay = 60.0
        self.circuit_breaker = CircuitBreaker()

    async def execute_with_recovery(self, operation: Callable, context: dict) -> Any:
        """Execute operation with comprehensive error recovery"""
        for attempt in range(self.max_retries):
            try:
                if self.circuit_breaker.is_open():
                    await self.wait_for_circuit_recovery()

                result = await asyncio.wait_for(
                    operation(),
                    timeout=self.get_timeout_for_attempt(attempt)
                )

                self.circuit_breaker.record_success()
                return result

            except (APIRateLimitError, NetworkError) as error:
                delay = min(
                    self.base_delay * (2 ** attempt),
                    self.max_delay
                )

                if attempt < self.max_retries - 1:
                    await asyncio.sleep(delay)
                    continue

                self.circuit_breaker.record_failure()
                raise MaxRetriesExceededError(f"Failed after {self.max_retries} attempts")

            except CriticalError as error:
                await self.alerting.send_critical_alert({
                    'service': 'ClickUp Data Collector',
                    'error': str(error),
                    'context': context,
                    'severity': 'CRITICAL'
                })
                raise

Concurrent Processing Engine

To handle multiple projects efficiently, the pipeline uses multi-threaded collection with intelligent load balancing and rate limiting:

class ConcurrentCollectionEngine:
    def __init__(self, max_workers: int = 3):
        self.max_workers = max_workers
        self.rate_limiter = AsyncRateLimiter(requests_per_second=2)
        self.semaphore = asyncio.Semaphore(max_workers)

    async def collect_multiple_projects(self, project_configs: List[ProjectConfig]) -> List[CollectionResult]:
        """Concurrent collection with intelligent load balancing"""
        sorted_projects = self.optimize_processing_order(project_configs)
        batches = self.create_processing_batches(sorted_projects)

        all_results = []
        for batch in batches:
            batch_tasks = [
                self.collect_project_with_limits(project)
                for project in batch
            ]

            batch_results = await asyncio.gather(
                *batch_tasks,
                return_exceptions=True
            )

            processed_results = self.process_batch_results(batch_results, batch)
            all_results.extend(processed_results)

            await asyncio.sleep(1.0)  # Inter-batch cooling

        return all_results

Data Validation & Quality Assurance

Every collection run passes through multi-layer validation -- structural integrity, business rule compliance, data consistency, and completeness checks -- producing a quality score that feeds into monitoring dashboards:

class EnterpriseDataValidator:
    async def validate_task_data(self, tasks: List[TaskData]) -> ValidationResult:
        """Multi-layer data validation with quality scoring"""
        validation_results = {
            'structural': await self.validate_structure(tasks),
            'business': await self.validate_business_rules(tasks),
            'consistency': await self.validate_data_consistency(tasks),
            'completeness': await self.validate_completeness(tasks)
        }

        quality_score = self.quality_metrics.calculate_score(tasks, validation_results)

        return ValidationResult(
            is_valid=all(result.passed for result in validation_results.values()),
            quality_score=quality_score,
            report={
                'total_tasks': len(tasks),
                'validation_results': validation_results,
                'quality_score': quality_score,
                'issues_found': self.extract_issues(validation_results),
                'recommendations': self.generate_recommendations(validation_results)
            }
        )

Cloud-Native Deployment

The pipeline runs as a containerized service on Google Cloud Run with a multi-stage Docker build, non-root user security, health check endpoints, and a full CI/CD pipeline through GitHub Actions including linting, type checking, unit tests, integration tests, security scanning, and automated deployment.

Modular ETL Platform

As BAD Marketing scaled their client portfolio, their data infrastructure hit critical limitations. A monolithic 300+ line ETL script was becoming unmaintainable, error-prone, and unable to handle the growing volume of marketing data from multiple Klaviyo accounts.

The Legacy Problem

The existing system was a single massive script with tightly coupled extraction, transformation, and loading -- no separation of concerns, no error recovery, no monitoring. One client error broke the entire process. Adding new clients meant modifying shared code and risking regressions.

// Legacy monolithic approach (simplified)
async function processAllClients() {
  const clients = ['ClientA', 'ClientB', 'ClientC', ...];

  for (const client of clients) {
    try {
      const campaigns = await fetch(`/api/campaigns?client=${client}`);
      const flows = await fetch(`/api/flows?client=${client}`);
      const processed = campaigns.map(c => ({
        ...transformCampaign(c),
        revenue: calculateRevenue(c)
      }));
      await bigquery.insert(processed);
    } catch (error) {
      console.log(`Error for ${client}:`, error);
    }
  }
}

Service-Oriented Architecture

I designed and implemented a completely new ETL platform with clean service interfaces and single-responsibility modules:

// Clean service interfaces
interface ETLService {
    extract(): Promise<DataSource>;
    transform(data: DataSource): Promise<TransformedData>;
    load(data: TransformedData): Promise<LoadResult>;
}

interface DataTransformer {
    standardize(data: RawData, config: TransformConfig): Promise<StandardData>;
    validate(data: StandardData): ValidationResult;
    enrich(data: StandardData): Promise<EnrichedData>;
}

interface StorageService {
    insert(data: TransformedData): Promise<InsertResult>;
    merge(data: TransformedData): Promise<MergeResult>;
    verify(operation: StorageOperation): Promise<VerificationResult>;
}

Campaign Extraction Service with built-in rate limiting and exponential backoff:

export class KlaviyoCampaignService implements DataExtractor {
    private rateLimiter = new RateLimiter(1000);
    private retryHandler = new ExponentialBackoff({
        maxRetries: 5,
        baseDelay: 1000,
        maxDelay: 30000,
    });

    async extract(clientConfig: ClientConfig): Promise<CampaignData[]> {
        const campaigns = [];
        let cursor = null;

        do {
            await this.rateLimiter.wait();

            const batch = await this.retryHandler.execute(async () => {
                return this.klaviyoClient.getCampaigns({
                    apiKey: clientConfig.klaviyoApiKey,
                    cursor,
                    pageSize: 100,
                });
            });

            campaigns.push(...batch.data);
            cursor = batch.links?.next;
        } while (cursor);

        return campaigns;
    }
}

Data Transformation Service handling timezone conversion, revenue attribution, and rate calculations:

export class DataTransformationService implements DataTransformer {
    async standardize(
        rawData: KlaviyoRawData,
        config: TransformConfig
    ): Promise<StandardizedData> {
        return rawData.map((item) => ({
            client_name: config.clientName,
            item_type: this.detectItemType(item),
            name: this.sanitizeName(item.name),
            campaign_id: item.id,
            subject_line: item.subject_line || null,
            send_channel: "email",
            sent_at: this.convertToEST(item.send_time),
            recipients: item.recipients || 0,
            opens: item.opens || 0,
            clicks: item.clicks || 0,
            conversions: item.conversions || 0,
            revenue: this.calculateRevenue(item),
            open_rate: this.calculateRate(item.opens, item.recipients),
            click_rate: this.calculateRate(item.clicks, item.recipients),
            conversion_rate: this.calculateRate(item.conversions, item.recipients),
            last_updated: new Date().toISOString(),
        }));
    }
}

BigQuery Storage Service with batch inserts and sophisticated MERGE logic for flows with revenue attribution:

export class BigQueryStorageService implements StorageService {
    async merge(data: StandardizedData[]): Promise<MergeResult> {
        const mergeQuery = `
      MERGE \`${this.config.projectId}.${this.config.datasetId}.${this.config.tableId}\` AS target
      USING (${this.buildValuesList(data)}) AS source
      ON target.client_name = source.client_name 
        AND target.flow_id = source.flow_id
      WHEN MATCHED THEN
        UPDATE SET
          recipients = source.recipients,
          opens = source.opens,
          clicks = source.clicks,
          conversions = source.conversions,
          revenue = source.revenue,
          last_updated = source.last_updated
      WHEN NOT MATCHED THEN
        INSERT ROW
    `;

        const [job] = await this.bigquery.createQueryJob({ query: mergeQuery });
        await job.getQueryResults();

        return { success: true, operation: "MERGE", affectedRows: data.length };
    }
}

ETL Orchestration

The main orchestrator coordinates extraction, transformation, validation, and loading with comprehensive monitoring at each phase:

export class ETLOrchestrator {
    async processClient(clientConfig: ClientConfig): Promise<ProcessingResult> {
        const startTime = Date.now();

        try {
            // Extract phase - campaigns and flows in parallel
            const [campaigns, flows] = await Promise.all([
                this.campaignService.extract(clientConfig),
                this.flowService.extract(clientConfig),
            ]);

            // Transform phase
            const transformedData = await this.transformer.standardize(
                [...campaigns, ...flows],
                { clientName: clientConfig.name }
            );

            // Validate phase
            const validationResult = await this.transformer.validate(transformedData);
            if (!validationResult.isValid) {
                throw new ValidationError(
                    `Data validation failed: ${validationResult.errors.join(", ")}`
                );
            }

            // Load phase
            const loadResult = await this.determineLoadStrategy(transformedData);

            return {
                success: true,
                client: clientConfig.name,
                recordsProcessed: loadResult.totalRecords,
                duration: Date.now() - startTime,
            };
        } catch (error) {
            await this.handleProcessingError(error, { client: clientConfig.name });
            throw error;
        }
    }
}

Revenue Attribution Engine

One of the more complex challenges was accurate revenue attribution across Klaviyo flows. The system handles a 180-day attribution window with 5-day click / 1-day view attribution, aggregating 118 individual flow message records down to 32 unique flows with correct revenue totals:

export class FlowRevenueService {
    async extractFlowRevenue(clientConfig: ClientConfig): Promise<FlowRevenueData[]> {
        const flowMetrics = await this.klaviyoClient.getFlowMetrics({
            apiKey: clientConfig.klaviyoApiKey,
            startDate: moment().subtract(180, "days").format("YYYY-MM-DD"),
            endDate: moment().format("YYYY-MM-DD"),
            metrics: ["Placed Order", "Fulfilled Order"],
            attributionWindow: "5d1d",
        });

        // Aggregate duplicate flow records
        const aggregatedFlows = this.aggregateFlowData(flowMetrics);
        return aggregatedFlows.map((flow) => ({
            client_name: clientConfig.name,
            flow_id: flow.id,
            flow_name: flow.name,
            recipients: flow.total_recipients,
            revenue: this.calculateAttributedRevenue(flow),
            // ... rates calculated from aggregated totals
        }));
    }
}

Lead Processing Automation

BAD Marketing's rapid growth created a critical need for sophisticated lead processing automation. Manual lead handling was becoming a bottleneck, with inconsistent data quality, delayed follow-ups, and no systematic approach to lead scoring and campaign assignment.

I designed and implemented a comprehensive 4-stage lead processing pipeline using n8n as the orchestration platform, integrating ZeroBounce, MillionVerifier, Smartlead, Google Sheets, Slack, and ClickUp.

Stage 1: Intelligent Data Ingestion

The pipeline accepts leads from multiple sources -- Google Sheets webhooks, ClickUp task creation events, and CSV bulk imports -- with real-time validation:

// Lead ingestion with comprehensive validation
const lead = items[0].json;

// Required field validation
const requiredFields = ['email', 'firstName', 'lastName', 'company'];
const missingFields = requiredFields.filter(field => !lead[field]);

if (missingFields.length > 0) {
  throw new Error(`Missing required fields: ${missingFields.join(', ')}`);
}

// Normalize data
const normalizedLead = {
  ...lead,
  email: lead.email.toLowerCase().trim(),
  firstName: lead.firstName.trim(),
  lastName: lead.lastName.trim(),
  company: lead.company.trim(),
  source: lead.source || 'manual_entry',
  ingestionTimestamp: new Date().toISOString(),
  processingStatus: 'validation_passed'
};

Stage 2: Advanced Email Verification

A dual-service verification strategy uses ZeroBounce as the primary provider with MillionVerifier as fallback. High-value leads get dual validation for increased confidence:

{
    "verification_logic": {
        "dual_validation": "high_value_leads",
        "fallback_triggers": ["api_error", "inconclusive_result"],
        "quality_threshold": 70,
        "manual_review_threshold": 60
    }
}

The verification process calculates a comprehensive quality score incorporating deliverability status, MX record presence, SMTP provider validation, and name matching -- producing a score from 0-100 that feeds into lead scoring.

Stage 3: Intelligent Lead Scoring & Enrichment

A multi-factor scoring algorithm weighs five dimensions:

  • Email Quality (30%): Deliverability score from verification
  • Company Data (25%): Company size, industry match, corporate vs generic email domain
  • Contact Completeness (20%): Phone, LinkedIn, job title, website presence
  • Behavioral Indicators (15%): Source quality, UTM parameters, form engagement
  • Timing Factors (10%): Business hours submission, weekday preference
// Determine lead grade from composite score
let leadGrade = 'D';
if (totalScore >= 80) leadGrade = 'A';
else if (totalScore >= 65) leadGrade = 'B';
else if (totalScore >= 50) leadGrade = 'C';

Stage 4: Automated Campaign Creation & Distribution

Based on lead grade, the pipeline automatically creates Smartlead campaigns, assigns ClickUp tasks with SLA deadlines, and sends Slack notifications:

  • Grade A leads: Personal outreach campaign, 2-hour response SLA, executive touch sequence
  • Grade B leads: Automated sequence, 24-hour SLA, standard nurture
  • Grade C leads: Educational content campaign, 72-hour SLA, long-term nurture

Every lead gets a complete audit trail tracking its journey through all four stages, including API calls made, data transformations applied, quality metrics, and compliance notes.

De-duplication & Data Quality

Advanced duplicate detection uses multi-factor matching -- exact email, email domain + name combination, phone + company combination, and fuzzy name matching with company similarity scoring -- to prevent duplicate processing while maintaining data integrity.

Architecture & Approach

Across all three systems, several architectural principles proved essential:

Service Separation: Each system is decomposed into single-responsibility services with clean interfaces. The ETL platform separates extraction, transformation, and loading. The lead pipeline separates ingestion, verification, scoring, and distribution. The ClickUp pipeline separates collection, validation, and storage. This makes each component independently testable and modifiable.

Resilient Error Handling: Every external API call is wrapped in retry logic with exponential backoff and circuit breakers. When ZeroBounce is unavailable, the lead pipeline falls back to MillionVerifier. When the ClickUp API hits rate limits, the collector backs off intelligently. No single API failure cascades into system-wide failure.

Production Monitoring: All three systems feed into centralized monitoring with real-time Slack alerts for critical failures, structured logging for debugging, and health check endpoints for automated recovery. The team has complete visibility into system status without checking dashboards manually.

Containerized Deployment: Docker containers with multi-stage builds, non-root users, health checks, and CI/CD pipelines through GitHub Actions ensure consistent, secure, zero-downtime deployments.

Environment-Aware Configuration: Development, staging, and production environments each get optimized settings -- batch sizes, worker counts, rate limits, and logging levels -- through a single configuration system.

Key Results

ClickUp Analytics Pipeline

  • 99.9% uptime with automated failover and recovery
  • >95% validation success rate across all data collections
  • 60% faster processing through multi-threaded concurrent collection
  • Real-time project insights: team velocity, completion rates, bottleneck identification

Modular ETL Platform

  • 100K+ events processed daily with sub-second latency
  • 75% faster development cycles for new client integrations
  • 15+ concurrent clients supported (up from 3-5 on the monolith)
  • 90% fewer production issues through modular architecture and comprehensive testing
  • Client onboarding reduced from weeks to hours

Lead Processing Automation

  • Under 3 minutes end-to-end processing per lead
  • 95%+ data accuracy through multi-layer validation
  • 75% faster lead response time from intake to campaign assignment
  • 60% cost reduction in manual lead processing labor
  • Complete audit trail for every lead processed

Combined Impact

  • 6+ API integrations working seamlessly across all systems
  • Zero manual intervention required for daily operations
  • Enterprise-grade reliability that the team trusts to run unattended
  • Foundation for scale: all systems handle 10x volume increases without architectural changes

Technologies Used

Node.jsTypeScriptPythonn8nBigQueryKlaviyo APIClickUp APIZeroBounceMillionVerifierSmartleadGoogle SheetsDocker

Want Similar Results?

Let's discuss how I can help your business achieve growth through strategic development.