Examples

Concurrent uploads with retry and backpressure

Upload several files in parallel, retrying each transfer with exponential backoff and capping each upload at 30 seconds; a backpressure variant follows the snippet.

import { Task, createGcs } from "@anabranch/storage-gcs";
import { Storage } from "@anabranch/storage";

const connector = createGcs({ bucket: "uploads", projectId: "my-project" });
const storage = await Storage.connect(connector).run();

const files = ["a.txt", "b.txt", "c.txt"];

// Start all uploads in parallel.
await Task.all(
  files.map((file) =>
    storage.put(file, Deno.readFileSync(`./${file}`))
      // Retry up to 3 times with exponential backoff: 500 ms, 1 s, 2 s.
      .retry({ attempts: 3, delay: (i) => 500 * Math.pow(2, i) })
      // Abort any single upload that runs longer than 30 seconds.
      .timeout(30_000)
  )
).run();
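
Task.all above starts every upload at once. If true backpressure is needed (a bounded number of uploads in flight), a plain async worker pool does the job without relying on any @anabranch API; mapWithLimit below is a hypothetical helper written for this sketch, not part of the library:

// Hypothetical helper: run `fn` over `items` with at most `limit` in flight.
async function mapWithLimit<T, R>(
  items: T[],
  limit: number,
  fn: (item: T) => Promise<R>,
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;
  // Each worker claims the next index synchronously between awaits,
  // so no two workers ever process the same item.
  const worker = async () => {
    while (next < items.length) {
      const i = next++;
      results[i] = await fn(items[i]);
    }
  };
  await Promise.all(
    Array.from({ length: Math.min(limit, items.length) }, worker),
  );
  return results;
}

// At most 4 uploads in flight at any moment.
await mapWithLimit(files, 4, (file) =>
  storage.put(file, Deno.readFileSync(`./${file}`))
    .retry({ attempts: 3, delay: (i) => 500 * Math.pow(2, i) })
    .run()
);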

Upload with signed URL and result handling

Generate a presigned PUT URL, branch on the .result() outcome instead of throwing, and upload directly to the URL with fetch.

import { Task, createGcs } from "@anabranch/storage-gcs";
import { Storage } from "@anabranch/storage";

const connector = createGcs({ bucket: "images", projectId: "my-project" });
const storage = await Storage.connect(connector).run();

// Ask the connector for a presigned PUT URL valid for one hour (3600 s).
const result = await storage
  .presign("photo.jpg", { method: "PUT", expiresIn: 3600 })
  .result();

if (result.type === "error") {
  console.error("Failed to generate upload URL:", result.error);
  Deno.exit(1); // a top-level `return` is a syntax error in a module
}

const uploadUrl = result.value;

// Upload the file straight to the presigned URL, with the same retry
// and timeout combinators as before.
await Task.of(async () => {
  const response = await fetch(uploadUrl, {
    method: "PUT",
    body: await Deno.readFile("photo.jpg"),
    headers: { "Content-Type": "image/jpeg" },
  });
  if (!response.ok) throw new Error(`Upload failed: ${response.status}`);
})
.retry({ attempts: 3 })
.timeout(60_000)
.run();
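
Assuming the connector also presigns GET requests (only PUT is shown above, so confirm this against the connector docs), the same pattern verifies the upload by fetching it back:

// Assumption: presign also accepts method "GET".
const download = await storage
  .presign("photo.jpg", { method: "GET", expiresIn: 600 })
  .result();

if (download.type !== "error") {
  const response = await fetch(download.value);
  console.log("Uploaded object size:", response.headers.get("content-length"));
}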

Process list results with concurrency limits

List objects under a prefix, process at most 10 at a time, and split the outcomes into successes and errors.

import { createGcs } from "@anabranch/storage-gcs";
import { Storage } from "@anabranch/storage";

const storage = await Storage.connect(
  createGcs({ bucket: "logs", projectId: "my-project", prefix: "archive/" })
).run();

const { successes, errors } = await storage.list()
  // Process at most 10 objects at a time.
  .withConcurrency(10)
  .map(async (entry) => {
    // Fetch each object and count its lines.
    const obj = await storage.get(entry.key).run();
    const text = await new Response(obj.body).text();
    return { key: entry.key, lines: text.split("\n").length };
  })
  // Log each failure as it occurs.
  .tapErr((err) => console.error("Failed processing:", err))
  // Collect successes and errors separately instead of failing fast.
  .partition();

console.log(`Processed ${successes.length} files with ${errors.length} errors`);
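
To keep the aggregate around, the per-file counts can be written back with the same put call used in the first example; the report key and JSON shape here are illustrative, not prescribed by the library:

// Key name and report format are illustrative.
const report = JSON.stringify(
  { processed: successes.length, failed: errors.length, files: successes },
  null,
  2,
);

await storage.put("archive/summary.json", new TextEncoder().encode(report))
  .retry({ attempts: 3 })
  .run();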