Complete Job System Guide

This document provides a comprehensive overview of the job system in the ODAPI application, including all job types, duplicate prevention, the unified ParserJob system, and usage examples.

Table of Contents

  1. Job System Overview
  2. Job Types and Scheduling
  3. Unified ParserJob System
  4. Duplicate Prevention System
  5. Job Processing Flow
  6. Usage Examples
  7. Rake Tasks
  8. Best Practices

Job System Overview

The ODAPI application uses Sidekiq for background job processing with several types of jobs:

  • Data Fetching Jobs: Retrieve company data from external APIs
  • Processing Jobs: Parse and update company information
  • Maintenance Jobs: Update statistics, completion scores, and cleanup
  • Scheduled Jobs: Run on cron schedules for regular operations

All jobs inherit from ApplicationJob which provides duplicate prevention capabilities.

Job Types and Scheduling

Scheduled Jobs (via schedule.yml)

# Data fetching and loading (high frequency)
fetch_companies_from_api_job:
  cron: "*/5 * * * *" # Every 5 minutes
  class: "FetchCompaniesDataJob"

load_companies_job:
  cron: "*/3 * * * *" # Every 3 minutes
  class: "LoadCompaniesJob"

# Statistics and maintenance (regular intervals)
init_stats_job:
  cron: "0 */1 * * *" # Every hour
  class: "UpdateStatsJob"

update_completion_scores_job:
  cron: "0 2 * * *" # Daily at 2:00 AM
  class: "UpdateCompletionScoresJob"

Job Categories

Data Pipeline Jobs

  • FetchCompaniesDataJob: Fetches company data from external API
  • LoadCompaniesJob: Enqueues processing jobs for fetched data
  • UpdateCompanyJob: Processes individual company data

Maintenance Jobs

  • UpdateStatsJob: Updates application statistics
  • UpdateCompletionScoresJob: Updates company completion scores
  • ParseAddressJob: Processes address parsing
  • UpdateActivityDescriptionsJob: Updates activity descriptions

Unified ParserJob System

Migration from Dual-Model System

The system was migrated from separate ParserQueuedJob and ParserFailedJob models to a unified ParserJob model with status tracking.
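
The actual migration is exposed as the parser_jobs:migrate rake task described later in this guide. As a hedged sketch only, a one-time data migration from the legacy models might look roughly like this (the legacy column names are assumptions):

namespace :parser_jobs do
  desc "Copy legacy queued/failed parser jobs into the unified ParserJob model"
  task migrate_sketch: :environment do
    # Legacy queued jobs become ParserJob records with status :queued
    ParserQueuedJob.find_each do |legacy|
      ParserJob.create!(
        company_id: legacy.company_id,
        raw_data:   legacy.raw_data,
        status:     :queued
      )
    end

    # Legacy failed jobs become ParserJob records with status :failed
    ParserFailedJob.find_each do |legacy|
      ParserJob.create!(
        company_id: legacy.company_id,
        raw_data:   legacy.raw_data,
        status:     :failed,
        failed_at:  legacy.created_at # column mapping is an assumption
      )
    end
  end
end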

ParserJob Status Flow

queued ──→ processing ──→ completed     (success path)
  │            │
  └────────────┴────────→ failed        (error path)
  ↑                          │
  └───────── retry ←─────────┘

ParserJob Model Features

class ParserJob < ApplicationRecord
  enum status: { queued: 0, processing: 1, completed: 2, failed: 3 }

  # Timestamps for each status
  # - processed_at: when the job started processing
  # - completed_at: when the job completed successfully
  # - failed_at:    when the job failed

  # Validation rules
  # ✅ A company can have multiple completed/failed jobs (history)
  # ❌ A company cannot have multiple queued/processing jobs (prevents duplicates)
end
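
A minimal sketch of how the "no duplicate pending jobs per company" rule could be enforced as a model validation (the validation name and error message are assumptions):

class ParserJob < ApplicationRecord
  belongs_to :company

  enum status: { queued: 0, processing: 1, completed: 2, failed: 3 }

  validate :only_one_pending_job_per_company, on: :create

  private

  # Reject a new job if the company already has a queued or processing one;
  # completed and failed jobs are kept as history and do not block creation.
  def only_one_pending_job_per_company
    return if company_id.blank?

    if ParserJob.where(company_id: company_id, status: [:queued, :processing]).exists?
      errors.add(:company_id, "already has a pending parser job")
    end
  end
end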

Key Methods

# Status management
job.mark_as_processing!
job.mark_as_completed!
job.mark_as_failed!("Error message")

# Data handling
job.normalize_raw_data # Returns symbolized hash

# Statistics
ParserJob.stats
# => { queued: 45, processing: 2, completed: 1230, failed: 15, total: 1292 }

# Batch operations
ParserJob.process_queued_jobs(10)
ParserJob.retry_failed_jobs(5)
ParserJob.cleanup_completed_jobs(older_than: 1.week.ago)
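
A hedged sketch of how these batch class methods might be implemented on top of the status scopes and the mark_as_* helpers (the exact retry semantics are assumptions):

class ParserJob < ApplicationRecord
  # Enqueue an UpdateCompanyJob for up to `limit` queued jobs,
  # using duplicate prevention so each job is only scheduled once.
  def self.process_queued_jobs(limit = 10)
    queued.order(:created_at).limit(limit).each do |job|
      UpdateCompanyJob.perform_later_if_unique(job.id)
    end
  end

  # Move up to `limit` failed jobs back to :queued so they are picked up again.
  def self.retry_failed_jobs(limit = 5)
    failed.order(:failed_at).limit(limit).each do |job|
      job.update!(status: :queued, failed_at: nil)
    end
  end

  # Delete completed jobs older than the given timestamp.
  def self.cleanup_completed_jobs(older_than: 1.week.ago)
    completed.where("completed_at < ?", older_than).delete_all
  end
end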

Duplicate Prevention System

ApplicationJob Enhancements

All jobs inherit duplicate prevention capabilities from ApplicationJob:

class ApplicationJob < ActiveJob::Base
  # Check whether a job with the same arguments is already enqueued
  def self.already_enqueued?(*args)
    # Checks across all Sidekiq queues (active, scheduled, retry)
  end

  # Enqueue only if no duplicate exists
  def self.perform_later_if_unique(*args)
    return false if already_enqueued?(*args)

    perform_later(*args)
    true
  end
end

How Duplicate Prevention Works

  1. Queue Checking: Scans the active, scheduled, and retry queues (see the sketch after this list)
  2. Argument Serialization: Uses ActiveJob::Arguments.serialize for comparison
  3. Error Handling: Graceful degradation when Sidekiq unavailable
  4. Performance: Memory-based checks, no database queries
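
A minimal sketch of how such a check could be implemented with Sidekiq's queue APIs (the matching logic and error handling are assumptions; the real ApplicationJob may differ):

class ApplicationJob < ActiveJob::Base
  def self.already_enqueued?(*args)
    serialized = ActiveJob::Arguments.serialize(args)

    # 1. Pending jobs in every Sidekiq queue
    in_queues = Sidekiq::Queue.all.any? do |queue|
      queue.any? { |entry| matches?(entry, serialized) }
    end
    return true if in_queues

    # 2. Jobs waiting in the scheduled and retry sets
    [Sidekiq::ScheduledSet.new, Sidekiq::RetrySet.new].any? do |set|
      set.any? { |entry| matches?(entry, serialized) }
    end
  rescue StandardError
    # Graceful degradation: if Sidekiq/Redis is unavailable, do not block enqueueing
    false
  end

  # ActiveJob wraps jobs for Sidekiq; the payload hash carries the job class and arguments
  def self.matches?(entry, serialized_args)
    wrapped = entry.args.first
    wrapped.is_a?(Hash) &&
      wrapped["job_class"] == name &&
      wrapped["arguments"] == serialized_args
  end
  private_class_method :matches?
end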

Duplicate Prevention Examples

# Basic usage
UpdateStatsJob.already_enqueued?
# => false

UpdateStatsJob.perform_later_if_unique
# => true (job enqueued)

UpdateStatsJob.perform_later_if_unique
# => false (duplicate found)

# With arguments
UpdateCompanyJob.already_enqueued?(parser_job_id)
UpdateCompanyJob.perform_later_if_unique(parser_job_id)

Job Processing Flow

Complete Data Pipeline

1. FetchCompaniesDataJob (cron: */5 * * * *)
   ↓ Fetches data from the external API
   ↓ Creates ParserJob records with status: :queued

2. LoadCompaniesJob (cron: */3 * * * *)
   ↓ Finds ParserJob.queued records
   ↓ Enqueues UpdateCompanyJob for each (with duplicate prevention)

3. UpdateCompanyJob (queue: api_parsers)
   ↓ Processes ParserJob.find(id)
   ↓ mark_as_processing! → process data → mark_as_completed! / mark_as_failed!

Job Implementation Details

FetchCompaniesDataJob

def perform(batch_size: 50_000)
  # Also exposed through the interactive companies:fetch_data rake task (see Rake Tasks)
  # Rate limiting: skip this run if too many jobs are already queued
  stored_jobs = ParserJob.queued.count
  return if stored_jobs > 5000

  # Companies that already have a pending job are skipped when fetching
  saved_ids = ParserJob.where(status: [:queued, :processing]).pluck(:company_id)

  # Create ParserJob records for the newly fetched data
  # (companies_data and the matching company come from the API fetch, elided here)
  companies_data.each do |data|
    ParserJob.create!(raw_data: data, company_id: company.id, status: :queued)
  end
end

LoadCompaniesJob

def perform
  queue = Sidekiq::Queue.new("api_parsers")
  return if queue.size > 100 # Queue management

  ParserJob.queued.find_each do |job|
    UpdateCompanyJob.perform_later_if_unique(job.id) # Duplicate prevention
  end
end

UpdateCompanyJob

def perform(parser_job_id)
  job = ParserJob.find(parser_job_id)
  job.mark_as_processing!

  # Process with CompanyParser
  raw_data = job.normalize_raw_data
  parser = CompanyParser.new(raw_data, job.company)
  parser.update

  if parser.validate_extracted_data
    job.mark_as_completed!
    job.destroy # Clean up successful jobs
  else
    job.mark_as_failed!(parser.errors)
  end
rescue => e
  job&.mark_as_failed!(e.message) # job may be nil if the find itself raised
end

UpdateCompletionScoresJob (Enhanced for Daily Updates)

def perform(batch_size: 1000, force_full_update: false)
  last_run_time = get_last_run_time

  # Only update companies modified since the last run
  companies_scope = if force_full_update
    Companies::Info.all
  else
    # Check both companies.updated and companies_infos.updated_at
    company_ids = Company.where("updated > ?", last_run_time).pluck(:id)
    info_ids = Companies::Info.where("updated_at > ?", last_run_time).pluck(:id)
    updated_ids = (company_ids + info_ids).uniq

    return if updated_ids.empty? # Skip if nothing changed
    Companies::Info.where(id: updated_ids)
  end

  # Bulk update with optimized SQL
  companies_scope.find_in_batches(batch_size: batch_size) do |batch|
    # SQL CASE statement for bulk completion score updates
  end

  update_last_run_time # Track for the next run
end
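
The bulk update placeholder above could, for example, build a single UPDATE with a SQL CASE expression instead of saving records one by one. A hedged sketch, assuming a completion_score column on companies_infos and a completion_score method on Companies::Info (both are assumptions):

# A hypothetical helper called from the find_in_batches block above
def bulk_update_completion_scores(batch)
  # Build one "WHEN id THEN score" arm per record in the batch
  cases = batch.map do |info|
    "WHEN #{info.id} THEN #{info.completion_score.to_i}"
  end

  # A single UPDATE for the whole batch instead of N individual saves
  Companies::Info.where(id: batch.map(&:id))
                 .update_all("completion_score = CASE id #{cases.join(' ')} END")
end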

Usage Examples

Basic Job Operations

# Manual job enqueueing
UpdateStatsJob.perform_later
FetchCompaniesDataJob.perform_later(batch_size: 1000)

# With duplicate prevention
UpdateStatsJob.perform_later_if_unique
UpdateCompanyJob.perform_later_if_unique(parser_job_id)

# Force full updates
UpdateCompletionScoresJob.perform_later(force_full_update: true)

ParserJob Management

# Create new parser job
job = ParserJob.create!(
  company: company,
  raw_data: api_data,
  status: :queued
)

# Process jobs
ParserJob.process_queued_jobs(10) # Process 10 queued jobs
ParserJob.retry_failed_jobs(5) # Retry 5 failed jobs

# Monitoring
ParserJob.stats
# => { queued: 45, processing: 2, completed: 1230, failed: 15 }

# Cleanup
ParserJob.cleanup_completed_jobs(older_than: 1.month.ago)

API Integration

class Api::JobsController < ApplicationController
  def trigger_stats_update
    if UpdateStatsJob.perform_later_if_unique
      render json: { status: "scheduled" }
    else
      render json: { status: "already_in_progress" }
    end
  end

  def trigger_company_update
    parser_job_id = params[:parser_job_id]

    if UpdateCompanyJob.perform_later_if_unique(parser_job_id)
      render json: { status: "scheduled", parser_job_id: parser_job_id }
    else
      render json: { status: "duplicate", parser_job_id: parser_job_id }
    end
  end
end

Batch Processing with Duplicate Prevention

parser_job_ids = [1, 2, 3, 2, 4, 1]  # Note duplicates

enqueued_jobs = parser_job_ids.filter_map do |id|
  if UpdateCompanyJob.perform_later_if_unique(id)
    id
  else
    Rails.logger.info "Skipping duplicate job for parser_job_id: #{id}"
    nil
  end
end

puts "Enqueued #{enqueued_jobs.size} unique jobs out of #{parser_job_ids.size} requests"
# Output: Enqueued 4 unique jobs out of 6 requests

Rake Tasks

Parser Job Management

# Process queued jobs
bin/rails parser_jobs:process[10]

# Retry failed jobs
bin/rails parser_jobs:retry[5]

# Show statistics
bin/rails parser_jobs:stats

# Clean up old completed jobs
bin/rails parser_jobs:cleanup[30] # older than 30 days

# Reset stuck processing jobs
bin/rails parser_jobs:reset_processing

# Migrate from old system
bin/rails parser_jobs:migrate
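
As a hedged sketch, the first two tasks could be defined by delegating to the ParserJob batch methods shown earlier (the actual task definitions in lib/tasks may differ):

# lib/tasks/parser_jobs.rake (illustrative)
namespace :parser_jobs do
  desc "Process queued parser jobs, e.g. bin/rails parser_jobs:process[10]"
  task :process, [:limit] => :environment do |_t, args|
    limit = (args[:limit] || 10).to_i
    ParserJob.process_queued_jobs(limit)
    puts "Enqueued processing for up to #{limit} queued parser jobs"
  end

  desc "Retry failed parser jobs, e.g. bin/rails parser_jobs:retry[5]"
  task :retry, [:limit] => :environment do |_t, args|
    limit = (args[:limit] || 5).to_i
    ParserJob.retry_failed_jobs(limit)
    puts "Re-queued up to #{limit} failed parser jobs"
  end
end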

Interactive Data Fetching

# Interactive company data fetching
bin/rails companies:fetch_data

# Example session:
# Enter start company ID: 1000
# Enter count (default: 5000): 2000
# Proceed with enqueueing? (y/n) y
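
A minimal sketch of what the interactive task could look like, assuming it prompts on STDIN and then enqueues FetchCompaniesDataJob (prompt wording mirrors the example session above; the real task's parameters may differ):

# lib/tasks/companies.rake (illustrative)
namespace :companies do
  desc "Interactively fetch company data"
  task fetch_data: :environment do
    print "Enter start company ID: "
    start_id = STDIN.gets.to_i

    print "Enter count (default: 5000): "
    input = STDIN.gets.strip
    count = input.empty? ? 5000 : input.to_i

    print "Proceed with enqueueing? (y/n) "
    abort("Aborted") unless STDIN.gets.strip.downcase == "y"

    # How start_id is passed to the job is an assumption; batch_size matches the
    # perform(batch_size:) signature shown earlier in this guide
    FetchCompaniesDataJob.perform_later(batch_size: count)
    puts "Enqueued fetch for #{count} companies starting at ID #{start_id}"
  end
end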

Best Practices

1. Use Duplicate Prevention for Resource-Intensive Jobs

# Good: Prevent duplicate expensive operations
FetchCompaniesDataJob.perform_later_if_unique(api_endpoint)
ProcessLargeFileJob.perform_later_if_unique(file_path)

# Bad: Risk of duplicate processing
FetchCompaniesDataJob.perform_later(api_endpoint)

2. Combine with Job Status Tracking

class UpdateCompanyJob < ApplicationJob
  def self.status_for_company(parser_job_id)
    if already_enqueued?(parser_job_id)
      "in_progress"
    else
      "ready"
    end
  end
end

3. Monitor Job Queues

class JobMonitor
  def self.queue_status
    {
      update_stats_pending: UpdateStatsJob.already_enqueued?,
      company_updates: Sidekiq::Queue.new("api_parsers").size,
      stats_updates: Sidekiq::Queue.new("stats").size,
      parser_jobs: ParserJob.stats
    }
  end
end

4. Incremental Processing

# Use timestamp tracking for incremental updates
def perform
last_run = get_last_run_time

# Only process records updated since last run
records = Model.where("updated_at > ?", last_run)
return if records.empty?

# Process records...

update_last_run_time
end
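
One simple way to implement the timestamp helpers is to persist the last run time in the Rails cache; a key-value table or settings model would work equally well. A minimal sketch (the cache key is a hypothetical name):

LAST_RUN_CACHE_KEY = "update_completion_scores:last_run_at" # hypothetical key

def get_last_run_time
  # Fall back to a wide window on the first run so nothing is missed
  Rails.cache.read(LAST_RUN_CACHE_KEY) || 1.week.ago
end

def update_last_run_time
  Rails.cache.write(LAST_RUN_CACHE_KEY, Time.current)
end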

5. Error Handling and Retries

class MyJob < ApplicationJob
  rescue_from(StandardError) do |exception|
    # Log error details
    Rails.logger.error "Job failed: #{exception.message}"

    # Mark the parser job as failed, if applicable
    if @parser_job_id
      parser_job = ParserJob.find_by(id: @parser_job_id)
      parser_job&.mark_as_failed!(exception.message)
    end
  end
end

6. Queue Management

def perform
  # Check queue size before adding more jobs
  queue = Sidekiq::Queue.new("my_queue")
  return if queue.size > 100 # Prevent queue overflow

  # Proceed with job logic...
end

Migration Benefits

The unified job system provides:

  1. Single Source of Truth: All parser jobs in one model
  2. Better Tracking: Clear status progression with timestamps
  3. Duplicate Prevention: Automatic prevention of duplicate job scheduling
  4. Retry Capability: Easy retry of failed jobs
  5. Statistics: Built-in monitoring and reporting
  6. Cleanup: Automatic removal of old completed jobs
  7. Error Context: Better error messages and debugging
  8. Performance: Optimized queries and bulk operations
  9. Incremental Updates: Process only changed data
  10. Queue Management: Prevent queue overflow and resource exhaustion

This comprehensive job system ensures reliable, efficient, and maintainable background processing for the ODAPI application.