
Scheduler — SchedulerStore & createSchedulerTools

The scheduler module provides persistent, SQLite-backed job scheduling for ScheduledChannel. It lets agents drive their own future invocations — create recurring cron jobs or one-shot run-at jobs, list pending work, cancel or update jobs, all from within the agent's own tool calls.


Overview

import {
  SchedulerStore,
  createSchedulerTools,
  ScheduledChannel,
} from '@toolpack-sdk/agents';
import { Toolpack } from 'toolpack-sdk';

// 1. Create the store (SQLite, WAL mode)
const store = new SchedulerStore({ dbPath: './scheduler.db' });

// 2. Expose scheduler tools to the LLM
const toolpack = await Toolpack.init({
  provider: 'anthropic',
  tools: true,
  customTools: [createSchedulerTools(store)],
});

// 3. Wire the store into a ScheduledChannel
const channel = new ScheduledChannel({ name: 'dynamic', store });

The agent can then call scheduler.create, scheduler.list, scheduler.cancel, and scheduler.update during any invocation to manage its own future runs.


SchedulerStore

SQLite-backed persistent store for scheduled jobs. Uses WAL journal mode and enforces foreign keys.

Constructor

new SchedulerStore({ dbPath?: string })

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| dbPath | string | ':memory:' | Path to the SQLite database file. Use :memory: for tests. |

// Persistent
const store = new SchedulerStore({ dbPath: './jobs.db' });

// In-memory (tests); named differently so both snippets can share one scope
const memoryStore = new SchedulerStore();

create

store.create(opts: CreateJobOptions): CreateJobResult

Create a new scheduled job. Exactly one of cron (recurring) or runAt (one-shot) must be provided.

Parameters — CreateJobOptions:

| Field | Type | Description |
| --- | --- | --- |
| cron | string | Cron expression for a recurring job. |
| runAt | Date \| number | Exact time for a one-shot job (Date or epoch ms). |
| intent | string | Intent hint forwarded to AgentInput.intent on trigger. |
| message | string | Message forwarded to AgentInput.message on trigger. |
| payload | Record<string, unknown> | Extra data merged into AgentInput.data on trigger. |
| channelName | string | Scopes the job to a specific channel. |

Returns — CreateJobResult:

interface CreateJobResult {
  job: ScheduledJob;
  duplicate: boolean; // true if an existing pending job matched the dedup key
}

Throws if neither cron nor runAt is provided, if both are provided, if the cron expression is invalid, or if runAt is NaN.
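
The validation rules above can be sketched as a small standalone function. This is only an illustration of the documented behaviour, not the store's actual code: `ScheduleFields`, `ScheduleKind`, and `classifySchedule` are hypothetical names, the error messages are invented, and cron syntax checking is left out because the store performs it itself.

```typescript
// Hypothetical stand-in for the schedule fields of CreateJobOptions.
interface ScheduleFields {
  cron?: string;
  runAt?: Date | number;
}

// Hypothetical result type: which kind of job the options describe.
type ScheduleKind =
  | { kind: 'recurring'; cron: string }
  | { kind: 'one-shot'; runAtMs: number };

// Mirrors the documented rules: exactly one of cron / runAt, and runAt must not be NaN.
function classifySchedule(opts: ScheduleFields): ScheduleKind {
  const hasCron = opts.cron !== undefined;
  const hasRunAt = opts.runAt !== undefined;

  if (hasCron && hasRunAt) {
    throw new Error('Provide either cron or runAt, not both');
  }
  if (!hasCron && !hasRunAt) {
    throw new Error('Provide one of cron or runAt');
  }
  if (hasCron) {
    return { kind: 'recurring', cron: opts.cron as string };
  }

  // Normalise Date | number to epoch milliseconds.
  const runAtMs =
    opts.runAt instanceof Date ? opts.runAt.getTime() : (opts.runAt as number);
  if (Number.isNaN(runAtMs)) {
    throw new Error('runAt is not a valid time');
  }
  return { kind: 'one-shot', runAtMs };
}
```

Running a check like this before calling store.create() lets calling code report a bad schedule with its own wording, while the store remains the final authority.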

// Recurring job
const { job } = store.create({
  intent: 'weekly_report',
  cron: '0 9 * * 1', // 9am every Monday
  message: 'Generate weekly summary',
  channelName: 'report-channel',
});

// One-shot job
store.create({
  intent: 'onboarding_followup',
  runAt: new Date('2026-06-01T10:00:00Z'),
  payload: { userId: 'usr_123' },
});

get

store.get(id: string): ScheduledJob | undefined

Get a single job by ID. Returns undefined if not found.


list

store.list(filter?: {
  status?: JobStatus | 'all';
  channelName?: string;
  limit?: number;
}): ScheduledJob[]

List jobs, ordered by nextRunAt ascending.

| Option | Default | Description |
| --- | --- | --- |
| status | 'pending' | Filter by status. Pass 'all' to include every status. |
| channelName | | Filter to a specific channel. |
| limit | 20 | Maximum number of jobs to return. |

getDue

store.getDue(now?: number, channelName?: string): ScheduledJob[]

Return all pending jobs with nextRunAt <= now. Used by ScheduledChannel for missed-run recovery and normal polling. Pass channelName to scope results.


getNextPending

store.getNextPending(channelName?: string): ScheduledJob | undefined

Return the single next pending job (earliest nextRunAt). Used by ScheduledChannel to calculate optimal sleep duration between polls.
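
To make the polling roles of getDue() and getNextPending() concrete, here is a minimal sketch of the two calculations on plain objects. The names `PendingJob`, `selectDue`, and `sleepMsUntilNext` are hypothetical, and the `maxSleepMs` cap is an assumption of this sketch; ScheduledChannel's real polling logic is not shown in this document.

```typescript
// Hypothetical minimal job shape; only the fields this sketch needs.
interface PendingJob {
  id: string;
  status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';
  nextRunAt: number; // epoch ms
}

// Jobs that should fire now: pending and nextRunAt <= now (the getDue() rule),
// returned earliest first.
function selectDue(jobs: PendingJob[], now: number): PendingJob[] {
  return jobs
    .filter((j) => j.status === 'pending' && j.nextRunAt <= now)
    .sort((a, b) => a.nextRunAt - b.nextRunAt);
}

// How long to sleep before the next poll, given the earliest pending job.
// maxSleepMs is an assumed safety cap so that jobs created while the loop
// is asleep are still noticed within a bounded time.
function sleepMsUntilNext(
  next: PendingJob | undefined,
  now: number,
  maxSleepMs: number = 60000,
): number {
  if (!next) return maxSleepMs; // nothing scheduled: sleep for the cap
  const untilDue = Math.max(next.nextRunAt - now, 0); // overdue jobs fire immediately
  return Math.min(untilDue, maxSleepMs);
}
```

Sleeping until the earliest nextRunAt avoids a fixed-interval busy poll, and an overdue job yields a sleep of zero, which is what makes missed-run recovery immediate.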


update

store.update(id: string, updates: {
  cron?: string;
  runAt?: Date | number;
  intent?: string;
  message?: string;
  payload?: Record<string, unknown>;
}): ScheduledJob | undefined

Update an existing pending job. Returns the updated job, or undefined if not found.

Rules:

  • Cannot update a job that is not in pending status — throws.
  • Cannot provide both cron and runAt — throws.
  • Providing runAt clears the cron field, converting a recurring job to a one-shot.
  • Providing cron recalculates nextRunAt from the new expression.
  • Invalid cron or NaN runAt throw immediately.
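
The schedule-related rules above can be read as a pure function over a job record. The following is a sketch under stated assumptions, not the store's implementation: `JobState`, `ScheduleUpdate`, and `applyScheduleUpdate` are hypothetical names, only the schedule fields are modelled, and the cron calculation is injected as `nextFromCron` so the example does not depend on any particular cron library.

```typescript
// Hypothetical job record with only the fields the rules touch.
interface JobState {
  status: 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';
  cron?: string;
  nextRunAt: number; // epoch ms
}

// Hypothetical subset of the update options.
interface ScheduleUpdate {
  cron?: string;
  runAt?: Date | number;
}

function applyScheduleUpdate(
  job: JobState,
  updates: ScheduleUpdate,
  nextFromCron: (expr: string) => number, // assumed helper; expected to throw on invalid cron
): JobState {
  if (job.status !== 'pending') {
    throw new Error(`Cannot update a job in '${job.status}' status`);
  }
  if (updates.cron !== undefined && updates.runAt !== undefined) {
    throw new Error('Provide either cron or runAt, not both');
  }

  if (updates.runAt !== undefined) {
    const ms = updates.runAt instanceof Date ? updates.runAt.getTime() : updates.runAt;
    if (Number.isNaN(ms)) throw new Error('runAt is not a valid time');
    // Omitting cron converts a recurring job into a one-shot.
    return { status: job.status, nextRunAt: ms };
  }

  if (updates.cron !== undefined) {
    // A new expression means a new next-run time.
    return { ...job, cron: updates.cron, nextRunAt: nextFromCron(updates.cron) };
  }

  return { ...job }; // no schedule change requested
}
```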

cancel

store.cancel(id: string): boolean

Cancel a pending job. Returns true if cancelled, false if the job was not found or was not in pending state. Only pending jobs can be cancelled — jobs already running are unaffected.


markRunning

store.markRunning(id: string): void

Set a job's status to 'running'. Called by ScheduledChannel synchronously (before any await) when a due job fires, ensuring the state change is visible immediately.
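
Why the synchronous call matters can be seen in a small sketch of a firing routine. `LifecycleStore` and `fireJob` are hypothetical names; the three methods mirror the signatures documented on this page, but the body is only an illustration of the ordering, not ScheduledChannel's code.

```typescript
// Hypothetical interface exposing only the three lifecycle methods used here.
interface LifecycleStore {
  markRunning(id: string): void;
  markCompleted(id: string): void;
  markFailed(id: string, error: string): void;
}

// Fires one due job. The body of an async function runs synchronously up to
// its first await, so markRunning() has already happened by the time
// fireJob() returns its promise to the caller.
async function fireJob(
  store: LifecycleStore,
  jobId: string,
  handler: () => Promise<void>,
): Promise<void> {
  store.markRunning(jobId); // before any await: no gap in which the job still looks pending
  try {
    await handler();
    store.markCompleted(jobId);
  } catch (err) {
    store.markFailed(jobId, err instanceof Error ? err.message : String(err));
  }
}
```

Because the status changes before control returns to the polling loop, a second poll in the same tick cannot pick up the same job as still pending.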


markCompleted

store.markCompleted(id: string): void

Mark a job as completed after a successful agent invocation.

  • Recurring jobs (cron set): status returns to 'pending', nextRunAt is recalculated from now, lastError is cleared.
  • One-shot jobs (cron not set): status becomes 'completed' permanently.

If the stored cron expression is corrupt at completion time, the job is marked 'failed' rather than crashing the scheduler loop.


markFailed

store.markFailed(id: string, error: string): void

Mark a job as failed after an agent invocation throws.

  • Recurring jobs: status returns to 'pending', nextRunAt is recalculated, lastError is set.
  • One-shot jobs: status becomes 'failed' permanently, lastError is set.

Recurring jobs with a corrupt cron expression are marked 'failed' permanently (composite error message).
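
The outcomes described for markCompleted() and markFailed() can be summarised in one pure function. This is a sketch of the documented state changes only: `JobRecord`, `Outcome`, and `settleJob` are hypothetical names, `nextFromCron` is an injected helper assumed to throw on a corrupt expression, and the exact format of the composite error message is an assumption.

```typescript
type Status = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';

// Hypothetical job record with only the fields the transitions touch.
interface JobRecord {
  status: Status;
  cron?: string;
  nextRunAt: number; // epoch ms
  lastError?: string;
}

type Outcome = { ok: true } | { ok: false; error: string };

function settleJob(
  job: JobRecord,
  outcome: Outcome,
  now: number,
  nextFromCron: (expr: string, from: number) => number, // assumed helper
): JobRecord {
  // One-shot jobs end in a terminal state.
  if (job.cron === undefined) {
    return outcome.ok
      ? { ...job, status: 'completed' }
      : { ...job, status: 'failed', lastError: outcome.error };
  }

  // Recurring jobs go back to pending with a fresh nextRunAt...
  let nextRunAt: number;
  try {
    nextRunAt = nextFromCron(job.cron, now);
  } catch (e) {
    // ...unless the stored cron is corrupt, in which case the job fails for good.
    const cronError = e instanceof Error ? e.message : String(e);
    const detail = outcome.ok ? cronError : `${outcome.error}; ${cronError}`;
    return { ...job, status: 'failed', lastError: detail };
  }

  return {
    ...job,
    status: 'pending',
    nextRunAt,
    lastError: outcome.ok ? undefined : outcome.error, // success clears the last error
  };
}
```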


resetStuck

store.resetStuck(channelName?: string): number

Reset any 'running' jobs back to 'pending'. Returns the number of jobs reset.

When to call: ScheduledChannel calls this automatically on its first listen() in a process (crash recovery). A job becomes stuck in 'running' if the process crashed between markRunning() and markCompleted()/markFailed(). Without this call, stuck jobs would never fire again because getDue() only returns 'pending' jobs.

Do not call on subsequent stop()+listen() cycles within the same process — in-flight handlers from the previous cycle already hold the job in 'running' state, and calling resetStuck() again would race them back to 'pending', causing double-execution.
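
If you manage this yourself rather than relying on ScheduledChannel, a once-per-process guard is one way to respect the rule above. This is a minimal sketch: `StuckResettable` and `CrashRecoveryGuard` are hypothetical names, and only the resetStuck() signature is taken from this page.

```typescript
// Hypothetical interface exposing only the store method this sketch needs.
interface StuckResettable {
  resetStuck(channelName?: string): number;
}

// Ensures crash recovery runs at most once per guard instance. Create one
// guard per process (for example at module scope) and share it across
// stop()+listen() cycles.
class CrashRecoveryGuard {
  private readonly store: StuckResettable;
  private done = false;

  constructor(store: StuckResettable) {
    this.store = store;
  }

  // Call at the start of every listen(); only the first call touches the store.
  // Returns the number of jobs reset (0 on every later call).
  runOnce(channelName?: string): number {
    if (this.done) return 0;
    this.done = true;
    return this.store.resetStuck(channelName);
  }
}
```

Later calls return 0 without touching the store, so handlers still in flight from a previous listen() cycle keep their jobs in 'running' and are not raced back to 'pending'.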


close

store.close(): void

Close the underlying SQLite connection. Call this in your shutdown handler if you manage the store's lifecycle manually.


createSchedulerTools

import { createSchedulerTools } from '@toolpack-sdk/agents';

createSchedulerTools(store: SchedulerStore): ToolProject

Returns a ToolProject containing four scheduler tools. Register it as customTools when initialising Toolpack so the LLM can manage its own schedule.

const store = new SchedulerStore({ dbPath: './scheduler.db' });

const toolpack = await Toolpack.init({
  provider: 'anthropic',
  tools: true,
  customTools: [createSchedulerTools(store)],
});

scheduler.create

Schedule a new recurring or one-shot agent invocation.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| cron | string | one of | Cron expression for a recurring job. E.g. "0 9 * * 1" = 9am every Monday. |
| run_at | string | one of | ISO 8601 timestamp for a one-shot job. E.g. "2026-07-01T09:00:00Z". |
| intent | string | | Intent hint forwarded to the agent on trigger. |
| message | string | | Message forwarded to the agent on trigger. |
| channel_name | string | | Scope this job to a specific channel. |
| payload | object | | Extra data merged into AgentInput.data on trigger. |

Either cron or run_at must be provided — not both. The tool validates run_at as a proper ISO 8601 timestamp (natural-language strings like "next Tuesday" are rejected).
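
One way such a strict run_at check could look is sketched below. The tool's actual validation is not shown in this document, so treat this as an assumption-laden illustration: `ISO_8601` and `parseRunAt` are hypothetical names, and requiring an explicit `Z` or numeric offset is a choice made for this sketch.

```typescript
// Accepts e.g. 2026-07-01T09:00:00Z or 2026-07-01T09:00:00.000+02:00.
// Requiring an explicit offset is an assumption of this sketch.
const ISO_8601 = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d{1,3})?(Z|[+-]\d{2}:\d{2})$/;

// Returns epoch milliseconds, or throws for anything that is not a
// well-formed timestamp (including natural-language strings).
function parseRunAt(value: string): number {
  if (!ISO_8601.test(value)) {
    throw new Error(`run_at must be an ISO 8601 timestamp, got "${value}"`);
  }
  const ms = Date.parse(value);
  if (Number.isNaN(ms)) {
    // Shape was right but the fields were out of range (e.g. month 13).
    throw new Error(`run_at is not a valid date: "${value}"`);
  }
  return ms;
}
```

Checking the shape before calling Date.parse matters because Date.parse on its own accepts several non-ISO formats in common JavaScript engines.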

Returns (string): Confirmation with job ID and next run time, or an error message.

Deduplication: If a pending job with the same (intent, cron, channel_name) or (intent, run_at, channel_name) already exists, the existing job is returned and no duplicate is created.


scheduler.list

List scheduled jobs.

Parameters:

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| status | 'pending' \| 'running' \| 'completed' \| 'failed' \| 'cancelled' \| 'all' | 'pending' | Filter by status. |
| channel_name | string | | Filter by channel name. |
| limit | number | 10 | Maximum number of results (capped at 50). |

Returns (string): A formatted list of jobs with ID, status, type, next run time, and last error (if any).


scheduler.cancel

Cancel a pending job by ID.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| job_id | string | yes | ID of the job to cancel. |

Returns (string): Confirmation, or an error if the job was not found or not in pending state.


scheduler.update

Modify an existing pending job.

Parameters:

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| job_id | string | yes | ID of the job to update. |
| cron | string | | New cron expression. Recalculates nextRunAt. |
| run_at | string | | New one-shot run time (ISO 8601). Clears cron, converting recurring → one-shot. |
| intent | string | | New intent. |
| message | string | | New message. |
| payload | object | | New payload (replaces existing). |

Cannot provide both cron and run_at. Invalid expressions are rejected with an error message.

Returns (string): Confirmation with updated next run time, or an error message.


ScheduledJob type

interface ScheduledJob {
  id: string;                         // UUIDv4
  channelName?: string;               // Channel this job belongs to
  nextRunAt: number;                  // Next execution time (epoch ms)
  cron?: string;                      // Set for recurring jobs
  intent?: string;                    // Forwarded to AgentInput.intent
  message?: string;                   // Forwarded to AgentInput.message
  payload?: Record<string, unknown>;  // Merged into AgentInput.data
  status: JobStatus;                  // Current lifecycle state
  lastRunAt?: number;                 // Epoch ms of last execution
  lastError?: string;                 // Error from last failed execution
  createdAt: number;                  // Epoch ms when created
}

type JobStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled';

Job lifecycle

                ┌───────────┐
                │  pending  │◄──────────────────────────────┐
                └─────┬─────┘                               │
                      │ markRunning() (sync, before await)  │
                ┌─────▼─────┐                               │
                │  running  │                               │
                └─────┬─────┘                               │
           ┌──────────┴──────────┐                          │
           │ markCompleted()     │ markFailed()             │
    ┌──────▼──────┐      ┌───────▼──────┐                   │
    │  completed  │      │    failed    │                   │
    │  (one-shot) │      │  (one-shot)  │                   │
    └─────────────┘      └──────────────┘                   │
                                                            │
Recurring jobs (cron set):                                  │
  markCompleted() → nextRunAt recalculated → pending ───────┤
  markFailed()    → nextRunAt recalculated → pending ───────┘

cancel() can only move a job from pending → cancelled. Once a job is running it cannot be cancelled externally; it completes or fails normally.


Deduplication

store.create() checks for an existing pending job before inserting:

| Job type | Dedup key |
| --- | --- |
| Recurring (cron) | (intent, cron, channelName) |
| One-shot (runAt) | (intent, nextRunAt, channelName) |

If a match is found, the existing job is returned with duplicate: true and no new row is written. This makes it safe to call store.create() (or scheduler.create via the LLM) on every startup without accumulating duplicates.
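
The dedup check can be pictured with a small in-memory model. This is a sketch of the idea only: `DedupFields`, `dedupKey`, and `InMemoryJobs` are hypothetical names, the map holds pending jobs exclusively, and the real store performs the equivalent lookup in SQLite rather than building a string key.

```typescript
// Hypothetical subset of job fields that participate in deduplication.
interface DedupFields {
  intent?: string;
  cron?: string;
  nextRunAt: number; // epoch ms; for one-shot jobs this is the runAt time
  channelName?: string;
}

// Recurring jobs key on (intent, cron, channelName);
// one-shot jobs key on (intent, nextRunAt, channelName).
function dedupKey(job: DedupFields): string {
  const schedule: Array<string | number> =
    job.cron !== undefined ? ['cron', job.cron] : ['runAt', job.nextRunAt];
  return JSON.stringify([job.intent ?? null, ...schedule, job.channelName ?? null]);
}

// In-memory stand-in for the pending-jobs table.
class InMemoryJobs {
  private pending = new Map<string, DedupFields & { id: string }>();
  private nextId = 1;

  create(fields: DedupFields): { job: DedupFields & { id: string }; duplicate: boolean } {
    const key = dedupKey(fields);
    const existing = this.pending.get(key);
    if (existing) {
      return { job: existing, duplicate: true }; // nothing new is written
    }
    const job = { ...fields, id: `job-${this.nextId++}` };
    this.pending.set(key, job);
    return { job, duplicate: false };
  }
}
```

Calling create() twice with the same fields returns the first job both times, which is the idempotent-startup behaviour described above.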

The same deduplication applies to the static cron seed inserted by ScheduledChannel in hybrid mode — restarts are idempotent.