Basic query with Task semantics
Basic query with Task semantics
import { DB, createInMemory } from "@anabranch/db"; const users = await DB.withConnection(createInMemory(), (db) => db.query("SELECT * FROM users WHERE active = ?", [true]) ).run();
Streaming large result sets with error collection
Streaming large result sets with error collection
import { DB, createInMemory } from "@anabranch/db"; const { successes, errors } = await DB.withConnection(createInMemory(), (db) => db.stream("SELECT * FROM large_table") .withConcurrency(10) .map(row => processRow(row)) .partition() ).run();
Transactions with automatic rollback on error
Transactions with automatic rollback on error
import { DB, ConstraintViolation, createInMemory } from "@anabranch/db"; const result = await DB.withConnection(createInMemory(), (db) => db.withTransaction(async (tx) => { await tx.execute("INSERT INTO orders (user_id) VALUES (?)", [userId]); await tx.execute("UPDATE users SET order_count = order_count + 1 WHERE id = ?", [userId]); return tx.query("SELECT last_insert_rowid()"); }) ).recoverWhen( (e) => e instanceof ConstraintViolation, (e) => ({ id: 0, error: e.message }) ).run();
Retry with exponential backoff
Retry with exponential backoff
import { DB } from "@anabranch/db"; import { createPostgres } from "@anabranch/db-postgres"; const users = await DB.withConnection(createPostgres(), (db) => db.query("SELECT * FROM users") .retry({ attempts: 3, delay: (attempt) => 100 * Math.pow(2, attempt) }) ).run();
Pub/sub with in-memory connector (swap for createPostgres in production)
Pub/sub with in-memory connector (swap for createPostgres in production)
import { createInMemory } from "@anabranch/db"; const connector = createInMemory(); const ch = await connector.listen("orders").run(); await connector.notify("orders", JSON.stringify({ id: 1 })).run(); for await (const n of ch.successes()) { console.log(n.payload); }
Failed to close the database connection.
Failed to establish a database connection.
Constraint violation (e.g., unique, foreign key).
Database wrapper with Task/Stream semantics.
-
execute(): Task<number, QueryFailed | ConstraintViolation>sql: string,params?: unknown[]
Execute INSERT/UPDATE/DELETE and return affected row count.
-
executeBatch(): Task<number[], QueryFailed | ConstraintViolation>sql: string,paramsArray: unknown[][]
Execute multiple commands in a batch. Leverages optimized adapter method if available.
-
from(adapter: DBAdapter): DB
Wrap an existing adapter in a DB instance.
-
query<T extends Record<string, any> = Record<string, any>>(): Task<T[], QueryFailed | ConstraintViolation>sql: string,params?: unknown[]
Execute a SELECT query and return rows.
-
stream<T extends Record<string, any> = Record<string, any>>(): Source<T, QueryFailed>sql: string,params?: unknown[]
Stream rows from a SELECT query for memory-efficient processing.
- transaction(): Task<DBTransaction, TransactionFailed>
-
withConnection<R, E>(): Task<R, E | ConnectionFailed>connector: DBConnector,fn: (db: DB) => Task<R, E>
Execute operations with a connection acquired from the connector. The connection is automatically released after the operation completes, whether successful or failed.
-
withTransaction<R>(fn: (tx: DBTransaction) => Promisable<R>): Task<R, TransactionFailed | QueryFailed | ConstraintViolation>
Execute a callback within a transaction. Supports nested transactions via SQL savepoints.
Structured error types for database operations.
Database transaction with Task semantics.
- commit(): Task<void, TransactionFailed>
-
execute(): Task<number, QueryFailed | ConstraintViolation>sql: string,params?: unknown[]
-
executeBatch(): Task<number[], QueryFailed | ConstraintViolation>sql: string,paramsArray: unknown[][]
-
query<T>(): Task<T[], QueryFailed | ConstraintViolation>sql: string,params?: unknown[]
- rollback(): Task<void, TransactionFailed>
- settled: boolean
-
withTransaction<R>(fn: (tx: DBTransaction) => Promisable<R>): Task<R, TransactionFailed | QueryFailed | ConstraintViolation>
Execute a callback within a nested transaction (savepoint).
Failed to establish or maintain a pub/sub subscription.
Query execution failed.
Serialization failure (concurrent modification detected).
Transaction failed.
& { listen(channel: string): Task<Channel<Notification, ListenFailed>, ListenFailed>; notify(
Creates an in-memory SQLite connector for testing.
Database adapter interface for DB-agnostic operations.
-
close(): Promise<void>
Release the connection back to its source (e.g., pool).
-
execute(): Promise<number>sql: string,params?: unknown[]
Execute INSERT/UPDATE/DELETE and return affected row count.
-
executeBatch(): Promise<number[]>sql: string,paramsArray: unknown[][]
Execute multiple commands in a batch. Implementation should optimize this if possible (e.g. using prepared statements or multi-row inserts).
-
query<T extends Record<string, any> = Record<string, any>>(): Promise<T[]>sql: string,params?: unknown[]
Execute a SELECT query and return rows.
-
stream<T extends Record<string, any> = Record<string, any>>(): AsyncIterable<T>sql: string,params?: unknown[]
Stream rows from a SELECT query using a cursor.
Connector that produces connected DBAdapter instances.
-
connect(signal?: AbortSignal): Promise<DBAdapter>
Acquire a connected adapter.
Transaction adapter interface.
-
commit(): Promise<void>
Commit the transaction.
-
execute(): Promise<number>sql: string,params?: unknown[]
Execute INSERT/UPDATE/DELETE and return affected row count.
-
executeBatch(): Promise<number[]>sql: string,paramsArray: unknown[][]
Execute multiple commands in a batch.
-
query<T extends Record<string, any> = Record<string, any>>(): Promise<T[]>sql: string,params?: unknown[]
Execute a SELECT query and return rows.
-
rollback(): Promise<void>
Rollback the transaction.
A notification received from a pub/sub channel.
-
channel: string
The channel the notification was sent to.
-
payload: string
The payload string.
Registry of error constructors for instanceof checks.