The Kysely adapter provides PostgreSQL support using Kysely, offering excellent TypeScript integration and performance.
Installation #
npm install @vorsteh-queue/core @vorsteh-queue/adapter-kyselypnpm add @vorsteh-queue/core @vorsteh-queue/adapter-kyselyyarn add @vorsteh-queue/core @vorsteh-queue/adapter-kyselybun add @vorsteh-queue/core @vorsteh-queue/adapter-kyselyQuick Start #
import { Kysely } from "kysely"
import { PostgresJSDialect } from "kysely-postgres-js"
import postgres from "postgres"
import type { QueueJobTableDefinition } from "@vorsteh-queue/adapter-kysely/types"
import { PostgresQueueAdapter } from "@vorsteh-queue/adapter-kysely"
import { Queue } from "@vorsteh-queue/core"
interface DB {
queue_jobs: QueueJobTableDefinition
other_table: {
name: string
}
}
// Shared database connection
const client = postgres(
process.env.DATABASE_URL || "postgresql://postgres:password@localhost:5432/queue_db",
{ max: 10 }, // Connection pool
)
const db = new Kysely<DB>({
dialect: new PostgresJSDialect({
postgres: client,
}),
})
const adapter = new PostgresQueueAdapter(db)
const queue = new Queue(adapter)
Supported providers #
- PGlite - https://github.com/czeidler/kysely-pglite-dialect
- Postgres.JS - https://github.com/kysely-org/kysely-postgres-js
- Node Progress - https://kysely-org.github.io/kysely-apidoc/classes/PostgresDialect.html
Database Setup #
Schema #
The adapter includes a pre-defined migration:
migrations/queue-jobs.ts
export { down, up } from "@vorsteh-queue/adapter-kysely/migrations"
If you don't want to use the pre-defined schema, you can create your own migration.
/migrations/queue-jobs.ts
import type { Kysely } from "kysely"
import { sql } from "kysely"
export async function up(db: Kysely<unknown>) {
await db.schema
.createTable("queue_jobs")
.addColumn("id", "uuid", (col) => col.defaultTo(sql`gen_random_uuid()`).notNull())
.addColumn("queue_name", "varchar(255)", (col) => col.notNull())
.addColumn("name", "varchar(255)", (col) => col.notNull())
.addColumn("payload", "jsonb", (col) => col.notNull())
.addColumn("status", "varchar(50)", (col) => col.notNull())
.addColumn("priority", "int4", (col) => col.notNull())
.addColumn("attempts", "int4", (col) => col.defaultTo(0).notNull())
.addColumn("max_attempts", "int4", (col) => col.notNull())
.addColumn("timeout", "jsonb")
.addColumn("cron", "varchar(255)")
.addColumn("created_at", "timestamptz", (col) =>
col.defaultTo(sql`timezone('utc'::text, now())`).notNull(),
)
.addColumn("process_at", "timestamptz", (col) => col.notNull())
.addColumn("processed_at", "timestamptz")
.addColumn("completed_at", "timestamptz")
.addColumn("failed_at", "timestamptz")
.addColumn("error", "jsonb")
.addColumn("result", "jsonb")
.addColumn("progress", "int4")
.addColumn("repeat_every", "int4")
.addColumn("repeat_limit", "int4")
.addColumn("repeat_count", "int4")
.execute()
await db.schema
.createIndex("idx_queue_jobs_status_priority")
.on("queue_jobs")
.columns(["queue_name", "status", "priority", "created_at"])
.execute()
await db.schema
.createIndex("idx_queue_jobs_process_at")
.on("queue_jobs")
.column("process_at")
.execute()
}
export async function down(db: Kysely<unknown>) {
await db.schema.dropTable("queue_jobs").execute()
}Custom Table & Schema Names #
To create a table with custom name and schema, you can use the createQueueJobsTable helper, which will create the table without taking care about the field definitions:
migrations/queue-jobs.ts
import { createQueueJobsTable } from "@vorsteh-queue/adapter-kysely"
export const { up, down } = createQueueJobsTable(
// your custom table name
"custom_queue_jobs",
ur custom schema name
"custom_schema",
)
To use the custom table and schema in the adapter, pass the adapter configuration to the PostgresQueueAdapter:
import { Kysely } from "kysely"
import type { QueueJobTableDefinition } from "@vorsteh-queue/adapter-kysely/types"
import { PostgresQueueAdapter } from "@vorsteh-queue/adapter-kysely"
interface DB {
custom_queue_jobs: QueueJobTableDefinition
}
const db = new Kysely<DB>({
//... your Kysely configuration
})
const adapter = new PostgresQueueAdapter(db, {
// Your custom schema name
schemaName: "custom_schema",
// Your custom table name
tableName: "custom_queue_jobs",
})
Want to see it in action?
Checkout our kysely example with custom schema and
table.
Migration #
To run the migrations, we recommend to use kysely-ctl.
kysely-ctlis the command-line tool for Kysely.