The core package contains the main queue engine and all essential functionality for job processing, scheduling, and event management.
Installation #
npm install @vorsteh-queue/corepnpm add @vorsteh-queue/coreyarn add @vorsteh-queue/corebun add @vorsteh-queue/coreKey Features #
- Queue Management - Create and manage multiple queues
- Job Processing - Register handlers and process jobs
- Event System - Comprehensive event lifecycle
- Progress Tracking - Real-time job progress updates
- Scheduling - Delayed and recurring jobs
- Priority System - Numeric priority-based processing
- Graceful Shutdown - Clean termination handling
Core Methods #
Registering Job Handlers #
Define how jobs should be processed:
import { MemoryQueueAdapter, Queue } from "@vorsteh-queue/core"
const queue = new Queue(new MemoryQueueAdapter(), { name: "example" })
interface Payload {
to: string
subject: string
}
interface Result {
messageId: string
sent: true
}
queue.register<Payload, Result>("send-email", async (job) => {
//await sendEmail(job.payload)
return { messageId: "msg_123", sent: true }
})
Adding Jobs #
Add jobs to the queue for processing:
import { MemoryQueueAdapter, Queue } from "@vorsteh-queue/core"
interface EmailPayload {
to: string
subject: string
}
const queue = new Queue(new MemoryQueueAdapter(), { name: "example" })
// Basic job
await queue.add("send-email", {
to: "user@example.com",
subject: "Welcome!",
})
// Job with options
const payload: EmailPayload = {
to: "user@example.com",
subject: "Welcome!",
}
await queue.add("send-email", payload, {
priority: 1,
delay: 5000,
maxAttempts: 5,
})
Queue Control #
Manage queue processing:
import { MemoryQueueAdapter, Queue } from "@vorsteh-queue/core"
const queue = new Queue(new MemoryQueueAdapter(), { name: "example" })
// Start processing jobs
queue.start()
// Pause processing
queue.pause()
// Resume processing
queue.resume()
// Graceful shutdown
await queue.stop()
Progress Tracking #
Update job progress in real-time:
import { MemoryQueueAdapter, Queue } from "@vorsteh-queue/core"
interface ProcessPayload {
items: string[]
}
interface ProcessResult {
processed: number
}
const queue = new Queue(new MemoryQueueAdapter(), { name: "example" })
queue.register<ProcessPayload, ProcessResult>("process-data", async (job) => {
const items = job.payload.items
for (let i = 0; i < items.length; i++) {
//await processItem(items[i])
// Update progress (0-100)
const progress = Math.round(((i + 1) / items.length) * 100)
await job.updateProgress(progress)
}
return { processed: items.length }
})
Scheduling Jobs #
Schedule jobs for future execution:
import { MemoryQueueAdapter, Queue } from "@vorsteh-queue/core"
interface CleanupPayload {
type: string
}
interface ReportPayload {
date: string
}
interface HealthCheckPayload {
endpoint: string
}
const queue = new Queue(new MemoryQueueAdapter(), { name: "example" })
// Delayed job
await queue.add<CleanupPayload>(
"cleanup",
{ type: "temp-files" },
{
delay: 60000, // 1 minute
},
)
// Recurring job with cron
await queue.add<ReportPayload>(
"daily-report",
{ date: new Date().toISOString() },
{
cron: "0 9 * * *", // Every day at 9 AM
timezone: "America/New_York",
},
)
// Recurring job with interval
await queue.add<HealthCheckPayload>(
"health-check",
{ endpoint: "/api/health" },
{
repeat: { every: 30000, limit: 10 }, // Every 30s, 10 times
},
)
Event Handling #
Listen to job and queue events:
import { MemoryQueueAdapter, Queue } from "@vorsteh-queue/core"
const queue = new Queue(new MemoryQueueAdapter(), { name: "example" })
// Job lifecycle events
queue.on("job:added", (job) => {
console.log(`Job ${job.name} added to queue`)
})
queue.on("job:completed", (job) => {
console.log(`Job ${job.name} completed`)
console.log("Result:", job.result)
})
queue.on("job:failed", (job) => {
console.error(`Job ${job.name} failed:`, job.error)
})
queue.on("job:progress", (job) => {
console.log(`Job ${job.name}: ${job.progress}% complete`)
})
// Queue control events
queue.on("queue:paused", () => {
console.log("Queue paused")
})
queue.on("queue:resumed", () => {
console.log("Queue resumed")
})
Queue Statistics #
Get queue metrics and status:
import { MemoryQueueAdapter, Queue } from "@vorsteh-queue/core"
const queue = new Queue(new MemoryQueueAdapter(), { name: "example" })
// Get current stats
const stats = await queue.getStats()
console.log(stats) // { pending: 5, processing: 2, completed: 100, failed: 3 }
// Clear jobs
await queue.clear() // Clear all jobs
await queue.clear("failed") // Clear only failed jobs
Error Handling #
Handle job failures and timeouts:
import { MemoryQueueAdapter, Queue } from "@vorsteh-queue/core"
interface RiskyJobPayload {
operation: string
data: unknown
}
interface RiskyJobResult {
success: boolean
error?: string
}
const queue = new Queue(new MemoryQueueAdapter(), { name: "example" })
queue.register<RiskyJobPayload, RiskyJobResult>("risky-job", async (job) => {
try {
//await riskyOperation(job.payload)
return { success: true }
} catch (error) {
throw error // Re-throw to mark job as failed
}
})
// Set job timeout
const payload: RiskyJobPayload = {
operation: "data-processing",
data: { items: [1, 2, 3] },
}
await queue.add("risky-job", payload, {
timeout: 30000, // 30 seconds
})
Memory Adapter #
Includes a built-in memory adapter for testing and development:
import { MemoryQueueAdapter, Queue } from "@vorsteh-queue/core"
const queue = new Queue(new MemoryQueueAdapter(), { name: "example" })
TypeScript Support #
Full TypeScript support with generic job payloads:
import { MemoryQueueAdapter, Queue } from "@vorsteh-queue/core"
const queue = new Queue(new MemoryQueueAdapter(), { name: "example" })
interface EmailPayload {
to: string
subject: string
body: string
}
interface EmailResult {
success: boolean
}
queue.register<EmailPayload, EmailResult>("send-email", async (job) => {
// job.payload is typed as EmailPayload
//await sendEmail(job.payload)
return {
success: true,
}
})