The Kysely adapter provides PostgreSQL support using Kysely, offering excellent TypeScript integration and performance.
Installation #
npm install @vorsteh-queue/core @vorsteh-queue/adapter-kysely
pnpm add @vorsteh-queue/core @vorsteh-queue/adapter-kysely
bun add @vorsteh-queue/core @vorsteh-queue/adapter-kysely
yarn add @vorsteh-queue/core @vorsteh-queue/adapter-kysely
Quick Start #
import { Kysely } from "kysely"
import { PostgresJSDialect } from "kysely-postgres-js"
import postgres from "postgres"
import type { QueueJobTableDefinition } from "@vorsteh-queue/adapter-kysely/types"
import { PostgresQueueAdapter } from "@vorsteh-queue/adapter-kysely"
import { Queue } from "@vorsteh-queue/core"
/** Database shape passed to Kysely: the queue table plus your own application tables. */
interface DB {
  queue_jobs: QueueJobTableDefinition
  other_table: {
    name: string
  }
}

// Use DATABASE_URL when it is set to a non-empty value, otherwise fall back to a local database
const connectionString =
  process.env.DATABASE_URL || "postgresql://postgres:password@localhost:5432/queue_db"

// Shared database connection
const client = postgres(connectionString, {
  max: 10, // Connection pool
})

// Kysely instance backed by the shared postgres.js client
const dialect = new PostgresJSDialect({ postgres: client })
const db = new Kysely<DB>({ dialect })

// Queue wired to the Kysely-backed adapter
const adapter = new PostgresQueueAdapter(db)
const queue = new Queue(adapter)
Supported providers #
- PGlite
- Postgres.JS
- Node Postgres
Database Setup #
Schema #
The adapter includes a pre-defined migration:
// migrations/queue-jobs.ts
// Re-export the adapter's pre-defined `up` and `down` migration functions from this file
export { up, down } from "@vorsteh-queue/adapter-kysely/migrations"
If you don't want to use the pre-defined schema, you can create your own migration.
queue-jobs.ts
import type { Kysely } from "kysely"
import { sql } from "kysely"
/**
 * Creates the `queue_jobs` table and its two indexes.
 *
 * The statements are executed one after another: the table first, then the
 * composite index on (queue_name, status, priority, created_at), and finally
 * the index on process_at.
 *
 * @param db - Kysely instance the schema statements are executed against.
 */
export async function up(db: Kysely<unknown>): Promise<void> {
  // Raw SQL default expressions used by the column definitions below
  const randomUuid = sql`gen_random_uuid()`
  const utcNow = sql`timezone('utc'::text, now())`

  const jobsTable = db.schema
    .createTable("queue_jobs")
    // Identity and routing
    .addColumn("id", "uuid", (c) => c.defaultTo(randomUuid).notNull())
    .addColumn("queue_name", "varchar(255)", (c) => c.notNull())
    .addColumn("name", "varchar(255)", (c) => c.notNull())
    .addColumn("payload", "jsonb", (c) => c.notNull())
    // Processing state
    .addColumn("status", "varchar(50)", (c) => c.notNull())
    .addColumn("priority", "int4", (c) => c.notNull())
    .addColumn("attempts", "int4", (c) => c.defaultTo(0).notNull())
    .addColumn("max_attempts", "int4", (c) => c.notNull())
    .addColumn("timeout", "jsonb")
    .addColumn("cron", "varchar(255)")
    // Timestamps
    .addColumn("created_at", "timestamptz", (c) => c.defaultTo(utcNow).notNull())
    .addColumn("process_at", "timestamptz", (c) => c.notNull())
    .addColumn("processed_at", "timestamptz")
    .addColumn("completed_at", "timestamptz")
    .addColumn("failed_at", "timestamptz")
    // Outcome and progress
    .addColumn("error", "jsonb")
    .addColumn("result", "jsonb")
    .addColumn("progress", "int4")
    // Repetition settings
    .addColumn("repeat_every", "int4")
    .addColumn("repeat_limit", "int4")
    .addColumn("repeat_count", "int4")
  await jobsTable.execute()

  const statusPriorityIndex = db.schema
    .createIndex("idx_queue_jobs_status_priority")
    .on("queue_jobs")
    .columns(["queue_name", "status", "priority", "created_at"])
  await statusPriorityIndex.execute()

  const processAtIndex = db.schema
    .createIndex("idx_queue_jobs_process_at")
    .on("queue_jobs")
    .column("process_at")
  await processAtIndex.execute()
}
/**
 * Reverts the migration by dropping the `queue_jobs` table.
 *
 * @param db - Kysely instance the drop statement is executed against.
 */
export async function down(db: Kysely<unknown>): Promise<void> {
  const dropJobsTable = db.schema.dropTable("queue_jobs")
  await dropJobsTable.execute()
}
Migration #
To run the migrations, we recommend using kysely-ctl.
kysely-ctl is the command-line tool for Kysely.