Complete Job System Guide
This document provides a comprehensive overview of the job system in the ODAPI application, including all job types, duplicate prevention, the unified ParserJob system, and usage examples.
Table of Contents
- Job System Overview
- Job Types and Scheduling
- Unified ParserJob System
- Duplicate Prevention System
- Job Processing Flow
- Usage Examples
- Rake Tasks
- Best Practices
Job System Overview
The ODAPI application uses Sidekiq for background job processing with several types of jobs:
- Data Fetching Jobs: Retrieve company data from external APIs
- Processing Jobs: Parse and update company information
- Maintenance Jobs: Update statistics, completion scores, and cleanup
- Scheduled Jobs: Run on cron schedules for regular operations
All jobs inherit from ApplicationJob, which provides duplicate prevention capabilities.
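For orientation, here is a minimal sketch of a job built on this base class; the job name and queue are illustrative, not taken from the codebase:
class ExampleMaintenanceJob < ApplicationJob
  queue_as :default

  def perform(*args)
    # Job logic goes here. Because ApplicationJob provides duplicate prevention,
    # ExampleMaintenanceJob.perform_later_if_unique(*args) is available as well.
  end
end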
Job Types and Scheduling
Scheduled Jobs (via schedule.yml)
# Data fetching and loading (high frequency)
fetch_companies_from_api_job:
  cron: "*/5 * * * *" # Every 5 minutes
  class: "FetchCompaniesDataJob"

load_companies_job:
  cron: "*/3 * * * *" # Every 3 minutes
  class: "LoadCompaniesJob"

# Statistics and maintenance (regular intervals)
init_stats_job:
  cron: "0 */1 * * *" # Every hour
  class: "UpdateStatsJob"

update_completion_scores_job:
  cron: "0 2 * * *" # Daily at 2:00 AM
  class: "UpdateCompletionScoresJob"
Job Categories
Data Pipeline Jobs
- FetchCompaniesDataJob: Fetches company data from external API
- LoadCompaniesJob: Enqueues processing jobs for fetched data
- UpdateCompanyJob: Processes individual company data
Maintenance Jobs
- UpdateStatsJob: Updates application statistics
- UpdateCompletionScoresJob: Updates company completion scores
- ParseAddressJob: Processes address parsing
- UpdateActivityDescriptionsJob: Updates activity descriptions
Unified ParserJob System
Migration from Dual-Model System
The system was migrated from separate ParserQueuedJob and ParserFailedJob models to a unified ParserJob model with status tracking.
ParserJob Status Flow
queued → processing → completed (success path)
queued / processing → failed (error path)
failed → queued (retry)
ParserJob Model Features
class ParserJob < ApplicationRecord
  enum status: { queued: 0, processing: 1, completed: 2, failed: 3 }

  # Timestamps for each status
  # - processed_at: When job started processing
  # - completed_at: When job completed successfully
  # - failed_at: When job failed

  # Validation rules
  # ✅ Company can have multiple completed/failed jobs (history)
  # ❌ Company cannot have multiple queued/processing jobs (prevents duplicates)
end
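A minimal sketch of how the one-pending-job-per-company rule could be enforced; the validation name and error message below are assumptions, not taken from the codebase:
class ParserJob < ApplicationRecord
  validate :only_one_pending_job_per_company, if: -> { queued? || processing? }

  private

  # Allow any number of completed/failed records per company (history),
  # but reject a second queued/processing job for the same company.
  def only_one_pending_job_per_company
    conflict = ParserJob.where(company_id: company_id, status: [:queued, :processing])
                        .where.not(id: id)
                        .exists?
    errors.add(:company_id, "already has a pending parser job") if conflict
  end
end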
Key Methods
# Status management
job.mark_as_processing!
job.mark_as_completed!
job.mark_as_failed!("Error message")
# Data handling
job.normalize_raw_data # Returns symbolized hash
# Statistics
ParserJob.stats
# => { queued: 45, processing: 2, completed: 1230, failed: 15, total: 1292 }
# Batch operations
ParserJob.process_queued_jobs(10)
ParserJob.retry_failed_jobs(5)
ParserJob.cleanup_completed_jobs(older_than: 1.week.ago)
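A minimal sketch of the three status-transition helpers, assuming the timestamp columns listed above plus an error_message column (the column name is an assumption):
def mark_as_processing!
  update!(status: :processing, processed_at: Time.current)
end

def mark_as_completed!
  update!(status: :completed, completed_at: Time.current)
end

def mark_as_failed!(message)
  # error_message is an assumed column for storing the failure reason
  update!(status: :failed, failed_at: Time.current, error_message: message)
end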
Duplicate Prevention System
ApplicationJob Enhancements
All jobs inherit duplicate prevention capabilities from ApplicationJob:
class ApplicationJob < ActiveJob::Base
  # Check if job with same arguments is already enqueued
  def self.already_enqueued?(*args)
    # Checks across all Sidekiq queues (active, scheduled, retry)
  end

  # Enqueue only if no duplicate exists
  def self.perform_later_if_unique(*args)
    return false if already_enqueued?(*args)

    perform_later(*args)
    true
  end
end
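The method bodies are elided above. A minimal sketch of the queue scan, assuming the Sidekiq adapter for Active Job (the real method's payload matching and error handling may differ):
def self.already_enqueued?(*args)
  serialized = ActiveJob::Arguments.serialize(args)

  # Scan active queues plus the scheduled and retry sets
  sets = Sidekiq::Queue.all + [Sidekiq::ScheduledSet.new, Sidekiq::RetrySet.new]
  sets.any? do |set|
    set.any? do |sidekiq_job|
      payload = sidekiq_job.args.first # Active Job wraps its data in the first argument
      payload.is_a?(Hash) &&
        payload["job_class"] == name &&
        payload["arguments"] == serialized
    end
  end
rescue StandardError
  false # Graceful degradation when Sidekiq/Redis is unavailable
end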
How Duplicate Prevention Works
- Queue Checking: Scans active, scheduled, and retry queues
- Argument Serialization: Uses ActiveJob::Arguments.serialize for comparison
- Error Handling: Graceful degradation when Sidekiq is unavailable
- Performance: Memory-based checks, no database queries
Duplicate Prevention Examples
# Basic usage
UpdateStatsJob.already_enqueued?
# => false
UpdateStatsJob.perform_later_if_unique
# => true (job enqueued)
UpdateStatsJob.perform_later_if_unique
# => false (duplicate found)
# With arguments
UpdateCompanyJob.already_enqueued?(parser_job_id)
UpdateCompanyJob.perform_later_if_unique(parser_job_id)
Job Processing Flow
Complete Data Pipeline
1. FetchCompaniesDataJob (cron: */5 * * * *)
   ↓ Fetches data from external API
   ↓ Creates ParserJob records with status: :queued

2. LoadCompaniesJob (cron: */3 * * * *)
   ↓ Finds ParserJob.queued records
   ↓ Enqueues UpdateCompanyJob for each (with duplicate prevention)

3. UpdateCompanyJob (queue: api_parsers)
   ↓ Processes ParserJob.find(id)
   ↓ mark_as_processing! → process data → mark_as_completed! / mark_as_failed!
Job Implementation Details
FetchCompaniesDataJob
def perform(batch_size: 50_000)
  # Also exposed via an interactive rake task (see Rake Tasks below)

  # Rate limiting: skip this run when the backlog is already large
  stored_jobs = ParserJob.queued.count
  return if stored_jobs > 5000

  # Only fetch companies without pending (queued/processing) jobs
  saved_ids = ParserJob.where(status: [:queued, :processing]).pluck(:company_id)

  # ... fetch companies_data from the external API, excluding saved_ids (omitted) ...

  # Create ParserJob records for the new data
  companies_data.each do |data|
    # company lookup from the fetched data is omitted here
    ParserJob.create!(raw_data: data, company_id: company.id, status: :queued)
  end
end
LoadCompaniesJob
def perform
  queue = Sidekiq::Queue.new("api_parsers")
  return if queue.size > 100 # Queue management

  ParserJob.queued.find_each do |job|
    UpdateCompanyJob.perform_later_if_unique(job.id) # Duplicate prevention
  end
end
UpdateCompanyJob
def perform(parser_job_id)
  job = ParserJob.find(parser_job_id)
  job.mark_as_processing!

  # Process with CompanyParser
  raw_data = job.normalize_raw_data
  parser = CompanyParser.new(raw_data, job.company)
  parser.update

  if parser.validate_extracted_data
    job.mark_as_completed!
    job.destroy # Cleanup successful jobs
  else
    job.mark_as_failed!(parser.errors)
  end
rescue => e
  job&.mark_as_failed!(e.message) # job may be nil if the lookup itself failed
end
UpdateCompletionScoresJob (Enhanced for Daily Updates)
def perform(batch_size: 1000, force_full_update: false)
  last_run_time = get_last_run_time

  # Only update companies modified since last run
  companies_scope = if force_full_update
    Companies::Info.all
  else
    # Check both companies.updated and companies_infos.updated_at
    company_ids = Company.where("updated > ?", last_run_time).pluck(:id)
    info_ids = Companies::Info.where("updated_at > ?", last_run_time).pluck(:id)
    updated_ids = (company_ids + info_ids).uniq
    return if updated_ids.empty? # Skip if no updates

    Companies::Info.where(id: updated_ids)
  end

  # Bulk update with optimized SQL
  companies_scope.find_in_batches(batch_size: batch_size) do |batch|
    # SQL CASE statement for bulk completion score updates
  end

  update_last_run_time # Track for next run
end
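The last-run tracking helpers are not shown above. A minimal sketch using Rails.cache; the cache key and the 24-hour fallback are assumptions (a database-backed setting would survive cache flushes):
LAST_RUN_CACHE_KEY = "update_completion_scores:last_run_at" # assumed key name

def get_last_run_time
  # Fall back to 24 hours ago on the first run or after a cache flush
  Rails.cache.read(LAST_RUN_CACHE_KEY) || 24.hours.ago
end

def update_last_run_time
  Rails.cache.write(LAST_RUN_CACHE_KEY, Time.current)
end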
Usage Examples
Basic Job Operations
# Manual job enqueueing
UpdateStatsJob.perform_later
FetchCompaniesDataJob.perform_later(batch_size: 1000)
# With duplicate prevention
UpdateStatsJob.perform_later_if_unique
UpdateCompanyJob.perform_later_if_unique(parser_job_id)
# Force full updates
UpdateCompletionScoresJob.perform_later(force_full_update: true)
ParserJob Management
# Create new parser job
job = ParserJob.create!(
  company: company,
  raw_data: api_data,
  status: :queued
)
# Process jobs
ParserJob.process_queued_jobs(10) # Process 10 queued jobs
ParserJob.retry_failed_jobs(5) # Retry 5 failed jobs
# Monitoring
ParserJob.stats
# => { queued: 45, processing: 2, completed: 1230, failed: 15 }
# Cleanup
ParserJob.cleanup_completed_jobs(older_than: 1.month.ago)
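A minimal sketch of how these batch helpers could be implemented; the real methods may batch, log, or order their work differently:
class ParserJob < ApplicationRecord
  # Enqueue UpdateCompanyJob for up to `limit` queued jobs
  def self.process_queued_jobs(limit = 100)
    queued.limit(limit).each { |job| UpdateCompanyJob.perform_later_if_unique(job.id) }
  end

  # Move up to `limit` failed jobs back to queued and enqueue them again
  def self.retry_failed_jobs(limit = 100)
    failed.limit(limit).each do |job|
      job.update!(status: :queued, failed_at: nil)
      UpdateCompanyJob.perform_later_if_unique(job.id)
    end
  end

  # Delete completed jobs older than the given timestamp
  def self.cleanup_completed_jobs(older_than: 1.week.ago)
    completed.where("completed_at < ?", older_than).delete_all
  end
end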
API Integration
class Api::JobsController < ApplicationController
  def trigger_stats_update
    if UpdateStatsJob.perform_later_if_unique
      render json: { status: "scheduled" }
    else
      render json: { status: "already_in_progress" }
    end
  end

  def trigger_company_update
    parser_job_id = params[:parser_job_id]

    if UpdateCompanyJob.perform_later_if_unique(parser_job_id)
      render json: { status: "scheduled", parser_job_id: parser_job_id }
    else
      render json: { status: "duplicate", parser_job_id: parser_job_id }
    end
  end
end
Batch Processing with Duplicate Prevention
parser_job_ids = [1, 2, 3, 2, 4, 1] # Note duplicates
enqueued_jobs = parser_job_ids.filter_map do |id|
  if UpdateCompanyJob.perform_later_if_unique(id)
    id
  else
    Rails.logger.info "Skipping duplicate job for parser_job_id: #{id}"
    nil
  end
end
puts "Enqueued #{enqueued_jobs.size} unique jobs out of #{parser_job_ids.size} requests"
# Output: Enqueued 4 unique jobs out of 6 requests
Rake Tasks
Parser Job Management
# Process queued jobs
bin/rails parser_jobs:process[10]
# Retry failed jobs
bin/rails parser_jobs:retry[5]
# Show statistics
bin/rails parser_jobs:stats
# Clean up old completed jobs
bin/rails parser_jobs:cleanup[30] # older than 30 days
# Reset stuck processing jobs
bin/rails parser_jobs:reset_processing
# Migrate from old system
bin/rails parser_jobs:migrate
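A minimal sketch of what two of these tasks could look like; the file path is an assumption, and the real tasks may format output and handle arguments differently:
# lib/tasks/parser_jobs.rake (illustrative excerpt)
namespace :parser_jobs do
  desc "Show ParserJob statistics"
  task stats: :environment do
    ParserJob.stats.each { |status, count| puts format("%-12s %d", status, count) }
  end

  desc "Process queued parser jobs"
  task :process, [:count] => :environment do |_t, args|
    ParserJob.process_queued_jobs((args[:count] || 10).to_i)
  end
end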
Interactive Data Fetching
# Interactive company data fetching
bin/rails companies:fetch_data
# Example session:
# Enter start company ID: 1000
# Enter count (default: 5000): 2000
# Proceed with enqueueing? (y/n) y
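A minimal sketch of the interactive flow, assuming plain standard-input prompts; the real task adds the actual enqueueing call and likely more validation:
# lib/tasks/companies.rake (illustrative excerpt)
namespace :companies do
  desc "Interactively enqueue company data fetching"
  task fetch_data: :environment do
    print "Enter start company ID: "
    start_id = $stdin.gets.to_i

    print "Enter count (default: 5000): "
    count_input = $stdin.gets.strip
    count = count_input.empty? ? 5000 : count_input.to_i

    print "Proceed with enqueueing? (y/n) "
    abort "Aborted." unless $stdin.gets.strip.downcase == "y"

    puts "Enqueueing fetch for #{count} companies starting at ID #{start_id}..."
    # The actual enqueueing call is omitted here; see FetchCompaniesDataJob above.
  end
end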
Best Practices
1. Use Duplicate Prevention for Resource-Intensive Jobs
# Good: Prevent duplicate expensive operations
FetchCompaniesDataJob.perform_later_if_unique(api_endpoint)
ProcessLargeFileJob.perform_later_if_unique(file_path)
# Bad: Risk of duplicate processing
FetchCompaniesDataJob.perform_later(api_endpoint)
2. Combine with Job Status Tracking
class UpdateCompanyJob < ApplicationJob
  def self.status_for_company(parser_job_id)
    if already_enqueued?(parser_job_id)
      "in_progress"
    else
      "ready"
    end
  end
end
3. Monitor Job Queues
class JobMonitor
  def self.queue_status
    {
      update_stats_pending: UpdateStatsJob.already_enqueued?,
      company_updates: Sidekiq::Queue.new("api_parsers").size,
      stats_updates: Sidekiq::Queue.new("stats").size,
      parser_jobs: ParserJob.stats
    }
  end
end
4. Incremental Processing
# Use timestamp tracking for incremental updates
def perform
  last_run = get_last_run_time

  # Only process records updated since last run
  records = Model.where("updated_at > ?", last_run)
  return if records.empty?

  # Process records...
  update_last_run_time
end
5. Error Handling and Retries
class MyJob < ApplicationJob
  rescue_from(StandardError) do |exception|
    # Log error details
    Rails.logger.error "Job failed: #{exception.message}"

    # Mark parser job as failed if applicable
    if @parser_job_id
      parser_job = ParserJob.find_by(id: @parser_job_id)
      parser_job&.mark_as_failed!(exception.message)
    end
  end
end
6. Queue Management
def perform
  # Check queue size before adding more jobs
  queue = Sidekiq::Queue.new("my_queue")
  return if queue.size > 100 # Prevent queue overflow

  # Proceed with job logic...
end
Migration Benefits
The unified job system provides:
- Single Source of Truth: All parser jobs in one model
- Better Tracking: Clear status progression with timestamps
- Duplicate Prevention: Automatic prevention of duplicate job scheduling
- Retry Capability: Easy retry of failed jobs
- Statistics: Built-in monitoring and reporting
- Cleanup: Automatic removal of old completed jobs
- Error Context: Better error messages and debugging
- Performance: Optimized queries and bulk operations
- Incremental Updates: Process only changed data
- Queue Management: Prevent queue overflow and resource exhaustion
This comprehensive job system ensures reliable, efficient, and maintainable background processing for the ODAPI application.