Concurrent uploads with retry and backpressure
Concurrent uploads with retry and backpressure
import { Task, createGcs } from "@anabranch/storage-gcs";
import { Storage } from "@anabranch/storage";

// Connect to the "uploads" bucket of "my-project".
const gcsConnector = createGcs({ bucket: "uploads", projectId: "my-project" });
const storage = await Storage.connect(gcsConnector).run();

// Retry delay: starts at 500 and doubles with each attempt index.
// NOTE(review): presumably milliseconds, like the timeout below — confirm
// against the Task API.
const backoffDelay = (attempt) => 500 * 2 ** attempt;

const fileNames = ["a.txt", "b.txt", "c.txt"];

// Build one upload task per file. Each file is read synchronously while the
// task is being constructed; the task itself carries its own retry policy
// and a 30_000 timeout.
const uploadTasks = fileNames.map((name) => {
  const contents = Deno.readFileSync(`./${name}`);
  return storage
    .put(name, contents)
    .retry({ attempts: 3, delay: backoffDelay })
    .timeout(30_000);
});

// Combine the per-file tasks and run them.
await Task.all(uploadTasks).run();
Upload with signed URL and result handling
Upload with signed URL and result handling
import { Task, createGcs } from "@anabranch/storage-gcs";
import { Storage } from "@anabranch/storage";

const connector = createGcs({ bucket: "images", projectId: "my-project" });
const storage = await Storage.connect(connector).run();

// Request a presigned PUT URL for "photo.jpg". The outcome is inspected via
// `result.type` below instead of being caught as an exception.
// NOTE(review): `expiresIn: 3600` is presumably in seconds — confirm.
const result = await storage
  .presign("photo.jpg", { method: "PUT", expiresIn: 3600 })
  .result();

// A bare `return` is a SyntaxError at the top level of an ES module, so the
// "stop on error" path is expressed as an if/else instead of an early return.
if (result.type === "error") {
  console.error("Failed to generate upload URL:", result.error);
} else {
  const uploadUrl = result.value;

  // Upload the file to the presigned URL. A non-2xx response is turned into a
  // thrown error so that the retry policy sees it as a failure.
  await Task.of(async () => {
    const response = await fetch(uploadUrl, {
      method: "PUT",
      body: await Deno.readFile("photo.jpg"),
      headers: { "Content-Type": "image/jpeg" },
    });
    if (!response.ok) throw new Error("Upload failed");
  })
    .retry({ attempts: 3 })
    .timeout(60_000)
    .run();
}
Process list results with concurrency limits
Process list results with concurrency limits
// Imports added so the snippet is self-contained, matching the other
// examples; previously `Storage` and `createGcs` were used without being
// brought into scope.
import { createGcs } from "@anabranch/storage-gcs";
import { Storage } from "@anabranch/storage";

// Connect to the "logs" bucket, configured with the "archive/" prefix.
const storage = await Storage.connect(
  createGcs({ bucket: "logs", prefix: "archive/" }),
).run();

// For every listed entry: fetch the object, read its body as text and count
// the newline-separated segments. The pipeline is configured with a
// concurrency of 10; failures are logged via tapErr and end up in `errors`
// after partition(), successful results in `successes`.
const { successes, errors } = await storage
  .list()
  .withConcurrency(10)
  .map(async (entry) => {
    const obj = await storage.get(entry.key).run();
    const text = await new Response(obj.body).text();
    return { key: entry.key, lines: text.split("\n").length };
  })
  .tapErr((err) => console.error("Failed processing:", err))
  .partition();

console.log(`Processed ${successes.length} files with ${errors.length} errors`);