Stream.splitBy<K extends string | number | symbol>(keys: readonly K[], cb: (value: T, arrivalIndex: number) => Promisable<K>, bufferSize: number, options?: SplitOptions): Record<K,>
Splits the stream into separate streams based on computed keys. The callback computes a key for each result, and the result is sent to the split stream for that key. If the computed key does not match any of the provided keys, an error result is emitted for that value. The split streams share the same source, so they are not independent: if one split is consumed more slowly than the others, it applies backpressure to the source and stalls every other split until it catches up. Use with caution to avoid unintended bottlenecks.
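The routing rule described above can be modelled in a few lines. The sketch below is a synchronous toy model and not anabranch's implementation: `routeByKey`, `Routed`, and the separate `errors` list are hypothetical names introduced for illustration, and where the real library delivers the error result is not specified here.

```typescript
// Hypothetical model of the routing rule only; not anabranch internals.
type Routed<K extends PropertyKey, T> = {
  buckets: Record<K, T[]>;
  // Assumption: unmatched values are collected here instead of being
  // emitted as error results on a stream.
  errors: { value: T; key: PropertyKey; arrivalIndex: number }[];
};

function routeByKey<K extends PropertyKey, T>(
  keys: readonly K[],
  values: readonly T[],
  cb: (value: T, arrivalIndex: number) => PropertyKey,
): Routed<K, T> {
  // One bucket per declared key, created up front.
  const buckets = {} as Record<K, T[]>;
  for (const key of keys) buckets[key] = [];

  const errors: Routed<K, T>["errors"] = [];
  values.forEach((value, arrivalIndex) => {
    const key = cb(value, arrivalIndex);
    if (keys.includes(key as K)) {
      // The computed key matches a declared key: route the value there.
      buckets[key as K].push(value);
    } else {
      // No matching key: record an error for this value.
      errors.push({ value, key, arrivalIndex });
    }
  });
  return { buckets, errors };
}

const routed = routeByKey(
  ["a", "b"] as const,
  [
    { type: "a", value: 1 },
    { type: "c", value: 2 }, // "c" is not a declared key
    { type: "b", value: 3 },
  ],
  (item) => item.type,
);
console.log(routed.buckets, routed.errors);
```

In this model the value with type "c" ends up in `errors` along with its arrival index, while the other two land in the `a` and `b` buckets.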
Example 1
import { Source } from "anabranch";

const stream = Source.from<{ type: "a" | "b"; value: number }, never>(
  async function* () {
    yield { type: "a", value: 1 };
    yield { type: "b", value: 2 };
    yield { type: "a", value: 3 };
  },
);

// Route each item to the split named by its `type`, with a buffer size of 10.
const splits = stream.splitBy(
  ["a", "b"] as const,
  (item) => item.type,
  10,
);

// Consume both splits concurrently so neither one stalls the shared source.
await Promise.all([
  splits.a.tap((v) => console.log("Stream A:", v)).toArray(),
  splits.b.tap((v) => console.log("Stream B:", v)).toArray(),
]);
Consume every split stream to avoid unintended backpressure. If one split is slow further down the pipeline, the backpressure propagates all the way up to the source and affects every other split, even the fast ones. Make sure all split streams are consumed at a reasonable pace to keep data flowing.
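To make the stall concrete, here is a toy tick-based simulation of two splits fed by one source. It is a sketch under stated assumptions, not a description of anabranch's scheduler: `simulate`, the tick model, and the per-split counter buffers are all hypothetical. The only rule it encodes is the one described above: the source cannot move past an item whose target buffer is full.

```typescript
// Hypothetical simulation of shared-source backpressure; not anabranch internals.
type SplitName = "fast" | "slow";

function simulate(
  items: readonly SplitName[],
  bufferSize: number,
  slowEvery: number, // the slow consumer takes one item every `slowEvery` ticks
  ticks: number,
): { fast: number; slow: number } {
  const buffers = { fast: 0, slow: 0 }; // items waiting in each split
  const consumed = { fast: 0, slow: 0 }; // items each consumer has taken
  let next = 0; // index of the next source item

  for (let tick = 1; tick <= ticks; tick++) {
    // The fast consumer takes one buffered item on every tick.
    if (buffers.fast > 0) {
      buffers.fast--;
      consumed.fast++;
    }
    // The slow consumer only takes an item on every `slowEvery`-th tick.
    if (tick % slowEvery === 0 && buffers.slow > 0) {
      buffers.slow--;
      consumed.slow++;
    }
    // The source emits at most one item per tick, and only if the target
    // buffer has room. A full buffer blocks the source entirely, so items
    // destined for the other split wait as well.
    if (next < items.length && buffers[items[next]] < bufferSize) {
      buffers[items[next]]++;
      next++;
    }
  }
  return consumed;
}

// Forty items alternating between the two splits.
const alternating: SplitName[] = Array.from(
  { length: 40 },
  (_, i): SplitName => (i % 2 === 0 ? "fast" : "slow"),
);

const smooth = simulate(alternating, 1, 1, 40); // both consumers keep up
const stalled = simulate(alternating, 1, 10, 40); // one consumer lags
console.log({ smooth, stalled });
```

In the second run the fast consumer is always ready, yet it receives far fewer items in the same number of ticks, because the source spends most ticks waiting for room in the slow split's buffer. A larger `bufferSize` delays the stall in this model but does not remove it.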