Examples

Basic query with Task semantics

import { DB, createInMemory } from "@anabranch/db";

const users = await DB.withConnection(createInMemory(), (db) =>
  db.query("SELECT * FROM users WHERE active = ?", [true])
).run();

Streaming large result sets with error collection

import { DB, createInMemory } from "@anabranch/db";

const { successes, errors } = await DB.withConnection(createInMemory(), (db) =>
  db.stream("SELECT * FROM large_table")
    .withConcurrency(10)
    .map(row => processRow(row))
    .partition()
).run();

Transactions with automatic rollback on error

import { DB, ConstraintViolation, createInMemory } from "@anabranch/db";

const result = await DB.withConnection(createInMemory(), (db) =>
  db.withTransaction(async (tx) => {
    await tx.execute("INSERT INTO orders (user_id) VALUES (?)", [userId]);
    await tx.execute("UPDATE users SET order_count = order_count + 1 WHERE id = ?", [userId]);
    return tx.query("SELECT last_insert_rowid()");
  })
).recoverWhen(
  (e) => e instanceof ConstraintViolation,
  (e) => ({ id: 0, error: e.message })
).run();

Retry with exponential backoff

import { DB, createPostgres } from "@anabranch/db";
import { createPostgres } from "@anabranch/db-postgres";

const users = await DB.withConnection(createPostgres(), (db) =>
  db.query("SELECT * FROM users")
    .retry({ attempts: 3, delay: (attempt) => 100 * Math.pow(2, attempt) })
).run();