Goroutines und Channels sind die Bausteine — die wirklich tragenden Strukturen einer Go-Anwendung entstehen erst, wenn diese Bausteine zu Mustern zusammengesetzt werden. Drei Patterns decken nahezu jeden realen Anwendungsfall ab: die Pipeline verkettet Verarbeitungsstufen über Channels, der Worker-Pool begrenzt die Parallelität auf eine sinnvolle Zahl, und Fan-Out/Fan-In fächert Arbeit auf mehrere Goroutines auf und sammelt die Ergebnisse wieder ein. Wer diese drei Muster sauber beherrscht — inklusive Cancellation über context.Context und strukturierter Fehlerbehandlung mit errgroup — schreibt Concurrency-Code, der nicht nur schnell ist, sondern auch unter Last, bei Fehlern und beim Shutdown korrekt bleibt.
Dieser Artikel ist die Synthese des klassischen Go-Blog-Beitrags „Pipelines and Cancellation" von Sameer Ajmani mit den Erweiterungen, die seit 2014 idiomatisch geworden sind: context statt nacktem done-Channel, errgroup statt handgestrickter sync.WaitGroup-Fehlerbehandlung und SetLimit als Bounded-Worker-Pool ohne expliziten Pool-Code.
Das Pipeline-Pattern — Stages verketten
Eine Pipeline ist eine Folge von Stages, die durch Channels verbunden sind. Jede Stage ist eine Goroutine, die Werte aus einem Inbound-Channel liest, sie transformiert und auf einen Outbound-Channel schreibt. Die erste Stage heißt Source oder Generator und hat nur einen Output; die letzte Stage heißt Sink und hat nur einen Input. Dazwischen liegen beliebig viele Verarbeitungsstufen. Das Schöne an diesem Modell: Jede Stage ist isoliert testbar, und der Datenfluss ist offensichtlich, weil er der Channel-Verkettung folgt.
Zwei Konventionen aus dem Go-Blog sind dabei nicht verhandelbar: Der Owner schließt den Channel — die Goroutine, die in einen Channel schreibt, ist auch die einzige, die ihn schließt, idealerweise per defer close(out). Und Receiver lesen mit range — so beenden sie automatisch, wenn der Owner schließt. Aus diesen beiden Regeln folgt der gesamte Lifecycle einer Pipeline ohne explizite Synchronisation.
package main
import "fmt"
// Stage 1: Generator — Quellwerte produzieren
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
// Stage 2: Transformation — quadrieren
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
// Pipeline aufbauen: gen → sq → sq
c := gen(2, 3, 4)
out := sq(sq(c))
// Sink: konsumieren
for v := range out {
fmt.Println(v)
}
}16
81
256Beachte, dass die Stages unbuffered Channels verwenden. Das ist kein Zufall: Unbuffered Channels erzeugen Backpressure. Wenn die Sink langsam ist, blockiert der letzte Send der vorherigen Stage; diese kann nichts mehr aus ihrem Input lesen, also blockiert die Stage davor — die Bremswirkung pflanzt sich rückwärts bis zum Generator fort. Das Resultat: Der Generator produziert nie schneller, als die Sink konsumieren kann. Bei buffered Channels würde dieser elegante Mechanismus durch die Pufferkapazität verzögert, und im Worst Case sammelt sich Arbeit als Speicherverbrauch an.
Die Funktionssignaturen folgen einer zweiten wichtigen Konvention: Sie geben read-only Channels (<-chan int) zurück und akzeptieren read-only Channels als Parameter. So macht der Compiler unmöglich, was die Owner-Regel verbietet — eine fremde Stage kann den Channel nicht versehentlich schließen.
Cancellation in Pipelines — context durch alle Stages reichen
Die naive Pipeline hat ein Problem: Wenn die Sink frühzeitig abbricht — etwa weil der Konsument nur die ersten drei Werte braucht und dann returnt — blockieren alle vorgelagerten Stages auf ihrem nächsten Send. Sie werden nie freigegeben; ihre Goroutines bleiben für immer im Speicher. Das ist ein klassischer Goroutine-Leak. Die Lösung: Jede Stage muss neben dem normalen Send einen Cancellation-Pfad kennen, der sie sofort beendet.
Der historische Go-Blog-Beitrag verwendet einen geteilten done-Channel: Wird er geschlossen, fällt jeder select-Send über den <-done-Case durch und die Stage returnt. Seit Go 1.7 ist context.Context die idiomatische Verallgemeinerung dieses Patterns — es trägt nicht nur das Cancellation-Signal, sondern auch Deadlines und Request-Werte. In neuem Code wird konsequent context durchgereicht.
package main
import (
"context"
"fmt"
)
func gen(ctx context.Context, nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
select {
case out <- n:
case <-ctx.Done():
return // sauber raus, defer close läuft
}
}
}()
return out
}
func sq(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-ctx.Done():
return
}
}
}()
return out
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // garantiert Aufräumen, auch bei early return
out := sq(ctx, gen(ctx, 1, 2, 3, 4, 5, 6, 7, 8))
// Nur die ersten drei Werte konsumieren, dann abbrechen
for i := 0; i < 3; i++ {
fmt.Println(<-out)
}
cancel() // alle Stages bekommen das Signal
}1
4
9Die entscheidende Stelle ist der select-Block um jeden Send. Ohne ihn würde die Stage stur auf den nächsten Receiver warten, selbst wenn der Konsument längst weg ist. Mit ihm wird der Send entweder durchgeführt oder das Context-Signal beendet die Goroutine — was zuerst eintritt. Das defer close(out) läuft in beiden Fällen, sodass die nachgelagerte Stage ihr range ebenfalls beenden kann. Diese Disziplin — jeder Send in select mit <-ctx.Done(), jeder Owner schließt per defer — eliminiert Goroutine-Leaks systematisch.
Fan-Out — wenn eine Stage zur Engstelle wird
In einer linearen Pipeline ist der Durchsatz durch die langsamste Stage begrenzt. Wenn eine Stufe CPU-intensive Berechnungen oder blockierende I/O macht, ist die natürliche Antwort: mehrere Goroutines, die parallel aus demselben Input-Channel lesen. Das ist Fan-Out. Channels sind in Go von Haus aus Multi-Consumer-safe — n Goroutines können gefahrlos aus einem Channel lesen, und Go verteilt die Werte automatisch (genau ein Empfänger pro Wert).
Damit wird der Channel selbst zur Work-Queue: Jeder Worker greift sich den nächsten Job, sobald er frei ist. Es gibt keinen expliziten Scheduler; die Channel-Semantik liefert das Load-Balancing kostenlos. Das ist konzeptionell näher an einer Task-Queue als an einem klassischen Thread-Pool — Worker holen Arbeit, statt zugewiesen zu bekommen.
package main
import (
"fmt"
"sync"
"time"
)
// Teurer Job: simuliert CPU-Last
func process(id int, n int) int {
time.Sleep(50 * time.Millisecond) // teure Arbeit
return n * n
}
func fanOut(in <-chan int, workers int) []<-chan int {
outs := make([]<-chan int, workers)
for i := 0; i < workers; i++ {
ch := make(chan int)
outs[i] = ch
go func(id int, out chan<- int) {
defer close(out)
for n := range in { // alle lesen aus demselben in
out <- process(id, n)
}
}(i, ch)
}
return outs
}
func main() {
in := make(chan int)
go func() {
defer close(in)
for i := 1; i <= 8; i++ {
in <- i
}
}()
start := time.Now()
outs := fanOut(in, 4) // 4 Worker
// Naive Konsumation (Fan-In folgt in nächster Sektion)
var wg sync.WaitGroup
for _, c := range outs {
wg.Add(1)
go func(c <-chan int) {
defer wg.Done()
for v := range c {
fmt.Println(v)
}
}(c)
}
wg.Wait()
fmt.Printf("Dauer: %v\n", time.Since(start).Round(10*time.Millisecond))
}1
4
9
16
25
36
49
64
Dauer: 100msOhne Fan-Out würden 8 Jobs á 50 ms seriell 400 ms dauern; mit vier Workern parallel sind es nur noch zwei Wellen — etwa 100 ms. Die Ausgabe-Reihenfolge ist allerdings nicht mehr deterministisch: Welcher Worker welchen Job bekommt, hängt vom Scheduler ab. Das ist ein wichtiger Trade-off von Fan-Out, auf den wir später in den Insights zurückkommen.
Fan-In (Merge) — n Outputs zu einem zusammenführen
Fan-Out produziert n Output-Channels, einen pro Worker. Damit die nachgelagerte Stage nicht wissen muss, wie viele Worker es gab, brauchen wir das Spiegelbild: Fan-In, oft als merge implementiert. Die Funktion startet eine kleine Forwarder-Goroutine pro Input-Channel; jede liest aus „ihrem" Channel und schreibt auf den gemeinsamen Output. Eine sync.WaitGroup zählt, wann alle Forwarder fertig sind, und eine separate Closer-Goroutine schließt den Merge-Output, sobald die WaitGroup leerläuft.
Dieses Pattern stammt direkt aus dem Go-Blog und ist seitdem in praktisch jeder größeren Go-Codebase zu finden. Der entscheidende Twist ist die Closer-Goroutine: Sie ruft wg.Wait() blockierend, was im Owner-Goroutine selbst nicht funktionieren würde, weil dieser ja sofort returnen muss, um den Channel zurückzugeben.
package main
import (
"fmt"
"sync"
)
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Pro Input-Channel ein Forwarder
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Closer-Goroutine: wartet, bis alle Forwarder fertig sind
go func() {
wg.Wait()
close(out)
}()
return out
}
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
func main() {
a := gen(1, 2, 3)
b := gen(10, 20, 30)
c := gen(100, 200, 300)
for v := range merge(a, b, c) {
fmt.Println(v)
}
}1
10
100
2
20
200
3
30
300Die Output-Reihenfolge ist interleaved und nicht deterministisch — der Merge garantiert nur, dass jeder Wert genau einmal im Output erscheint, nicht in welcher Reihenfolge. Wer Ordnungserhalt braucht, muss Sequenznummern mitschicken und nachträglich sortieren oder einen ordered Merge implementieren, der gleichzeitig auf alle Eingangs-Channels schaut.
Die Kombination Fan-Out + Fan-In ist das Map-Pattern für Go-Pipelines: ein Channel rein, parallele Verarbeitung in der Mitte, ein Channel raus. Die umliegenden Stages müssen von der Parallelisierung nichts wissen.
Worker-Pool — feste Anzahl Goroutines mit Job- und Result-Channel
Fan-Out startet n Worker, die direkt aus dem Input lesen — das ist bereits ein Worker-Pool im engeren Sinne. Das klassische Worker-Pool-Pattern ist nur eine etwas explizitere Variante: Es trennt sauber Jobs und Results, sodass der Caller asynchron Jobs einfüttern und gleichzeitig Ergebnisse konsumieren kann. Das macht die Verwendung in Service-Code intuitiver.
Goroutine-pro-Job ist in Go billig — aber nicht kostenlos. Bei zehntausend gleichzeitigen DB-Connections explodiert nicht die Goroutine-Stack-Memory, sondern der Connection-Pool der Datenbank. Bei zehntausend HTTP-Requests gegen denselben Host wird der Ziel-Server zur Engstelle oder es greift Rate-Limiting. Ein Worker-Pool ist also weniger ein Performance- als ein Constraint-Pattern: Er erzwingt eine Obergrenze.
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Val int
}
type Result struct {
JobID int
Out int
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
time.Sleep(20 * time.Millisecond) // simulierte Arbeit
results <- Result{JobID: j.ID, Out: j.Val * j.Val}
}
}
func main() {
const numWorkers = 3
jobs := make(chan Job)
results := make(chan Result)
var wg sync.WaitGroup
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// Closer: schließt results, sobald alle Worker fertig sind
go func() {
wg.Wait()
close(results)
}()
// Jobs einfüttern (in separater Goroutine, damit wir parallel konsumieren können)
go func() {
defer close(jobs)
for i := 1; i <= 6; i++ {
jobs <- Job{ID: i, Val: i * 10}
}
}()
// Ergebnisse einsammeln
for r := range results {
fmt.Printf("Job %d → %d\n", r.JobID, r.Out)
}
}Job 1 → 100
Job 3 → 900
Job 2 → 400
Job 4 → 1600
Job 5 → 2500
Job 6 → 3600Beachte die saubere Lifecycle-Choreografie: Der Producer schließt jobs, sobald er fertig ist. Die Worker beenden ihre range-Schleife und decrementen die WaitGroup. Die Closer-Goroutine wartet auf wg.Wait() und schließt results. Der Main-Loop terminiert mit dem range. Drei Channels, eine WaitGroup, exakt drei Goroutine-Gruppen — keine Leaks, keine Deadlocks, keine Race Conditions.
Sinnvoll ist ein Worker-Pool insbesondere bei: CPU-bound Arbeit mit runtime.NumCPU() Workern, beschränkten externen Ressourcen wie DB-Connections oder API-Quotas, und Szenarien mit Rate-Limit, wo die Workeranzahl direkt die Requests-per-Second bestimmt.
errgroup — strukturierte Cancellation bei Fehlern
sync.WaitGroup zählt nur Goroutines — sie kennt keine Fehler. Sobald eine Goroutine fehlschlägt, müsste man manuell ein Error-Channel-Muster aufbauen, den ersten Fehler latchen, alle anderen Goroutines abbrechen und am Ende den Fehler propagieren. Das ist genau der Boilerplate, den golang.org/x/sync/errgroup ein für allemal kapselt.
errgroup.Group.Go(fn func() error) startet eine Goroutine, deren Rückgabewert beobachtet wird. Wait() blockt bis alle fertig sind und liefert den ersten Non-Nil-Error zurück. Die WithContext-Variante liefert zusätzlich einen Context, der automatisch gecancelt wird, sobald die erste Goroutine einen Fehler returnt — was alle anderen Goroutines, die den Context respektieren, zum sofortigen Abbruch bringt. Das ist Cancellation-on-Error mit drei Zeilen Code statt drei Bildschirmen.
package main
import (
"context"
"errors"
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
func task(ctx context.Context, name string, dur time.Duration, fail bool) error {
select {
case <-time.After(dur):
if fail {
return fmt.Errorf("%s: simulierter Fehler", name)
}
fmt.Printf("%s: fertig\n", name)
return nil
case <-ctx.Done():
fmt.Printf("%s: abgebrochen (%v)\n", name, ctx.Err())
return ctx.Err()
}
}
func main() {
g, ctx := errgroup.WithContext(context.Background())
g.Go(func() error { return task(ctx, "A", 100*time.Millisecond, false) })
g.Go(func() error { return task(ctx, "B", 50*time.Millisecond, true) }) // failt zuerst
g.Go(func() error { return task(ctx, "C", 200*time.Millisecond, false) })
if err := g.Wait(); err != nil {
fmt.Printf("erster Fehler: %v\n", err)
if errors.Is(err, context.Canceled) {
fmt.Println("(Folgegoroutine wegen Cancel)")
}
}
}A: abgebrochen (context canceled)
C: abgebrochen (context canceled)
erster Fehler: B: simulierter FehlerB schlägt nach 50 ms fehl. Sofort wird der von errgroup.WithContext gelieferte Context gecancelt; A und C sehen ihr <-ctx.Done() und returnen mit context.Canceled. g.Wait() liefert den ersten Fehler — also den von B, nicht die Folge-Cancellations. Das ist exakt das Verhalten, das man in der Praxis will: „Der erste echte Fehler interessiert mich; den Rest räum bitte auf."
Seit Go 1.20 hat errgroup zusätzlich SetLimit(n int). Damit wird Go zu einer blockierenden Operation, sobald n Goroutines bereits laufen — ein eingebauter Worker-Pool ohne eigenes Pool-Konstrukt:
package main
import (
"context"
"fmt"
"sync/atomic"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
g, ctx := errgroup.WithContext(context.Background())
g.SetLimit(3) // maximal 3 gleichzeitig
var inflight atomic.Int32
var peak atomic.Int32
for i := 1; i <= 10; i++ {
i := i
g.Go(func() error {
n := inflight.Add(1)
if n > peak.Load() {
peak.Store(n)
}
defer inflight.Add(-1)
select {
case <-time.After(30 * time.Millisecond):
return nil
case <-ctx.Done():
return ctx.Err()
}
_ = i
})
}
_ = g.Wait()
fmt.Printf("Peak parallel: %d (Limit: 3)\n", peak.Load())
}Peak parallel: 3 (Limit: 3)Für viele reale Anwendungen ersetzt diese eine Zeile — g.SetLimit(3) — den gesamten handgeschriebenen Worker-Pool inklusive Job-Channel, Result-Channel, WaitGroup und Closer-Goroutine. Das ist 2026 der Default-Weg für „n parallele Tasks mit Limit und Error-Propagation".
Praxis: HTTP-Crawler mit Worker-Pool und errgroup
Ein typisches reales Szenario: Wir bekommen eine Liste von URLs und sollen sie parallel abrufen, aber mit einer Obergrenze — Server fair behandeln, Connection-Limit respektieren — und beim ersten unrettbaren Fehler abbrechen. Das ist der Sweet Spot für errgroup.WithContext + SetLimit.
package main
import (
"context"
"fmt"
"io"
"net/http"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
type FetchResult struct {
URL string
Status int
Bytes int
}
func fetch(ctx context.Context, url string) (FetchResult, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return FetchResult{}, fmt.Errorf("build request %s: %w", url, err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return FetchResult{}, fmt.Errorf("fetch %s: %w", url, err)
}
defer resp.Body.Close()
n, err := io.Copy(io.Discard, resp.Body)
if err != nil {
return FetchResult{}, fmt.Errorf("read %s: %w", url, err)
}
return FetchResult{URL: url, Status: resp.StatusCode, Bytes: int(n)}, nil
}
func crawl(parent context.Context, urls []string, concurrency int) ([]FetchResult, error) {
g, ctx := errgroup.WithContext(parent)
g.SetLimit(concurrency)
var mu sync.Mutex
results := make([]FetchResult, 0, len(urls))
for _, url := range urls {
url := url
g.Go(func() error {
r, err := fetch(ctx, url)
if err != nil {
return err // bricht alle anderen ab
}
mu.Lock()
results = append(results, r)
mu.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
return results, err
}
return results, nil
}
func main() {
urls := []string{
"https://go.dev",
"https://pkg.go.dev",
"https://blog.golang.org",
"https://example.com",
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
results, err := crawl(ctx, urls, 2) // max 2 parallel
if err != nil {
fmt.Printf("Abbruch: %v\n", err)
}
for _, r := range results {
fmt.Printf("%-30s status=%d bytes=%d\n", r.URL, r.Status, r.Bytes)
}
}https://example.com status=200 bytes=1256
https://go.dev status=200 bytes=18234
https://pkg.go.dev status=200 bytes=24561
https://blog.golang.org status=200 bytes=12903Dieser Code ist produktionsnah und nutzt mehrere Patterns zusammen: http.NewRequestWithContext propagiert den Timeout/Cancel automatisch bis auf TCP-Ebene, sodass abgebrochene Requests ihre Sockets sofort freigeben. SetLimit(2) begrenzt die Parallelität — bei 1000 URLs wären sonst 1000 offene Connections. Der Mutex schützt das Result-Slice, weil append aus mehreren Goroutines nicht safe ist; alternativ hätte man auch einen Result-Channel verwenden können. Tritt ein Fehler auf — etwa ein DNS-Failure — wird der innere Context gecancelt, alle laufenden Requests brechen mit context.Canceled ab, und g.Wait() liefert den ursprünglichen Fehler zurück.
Praxis: Datei-Verarbeitung als Pipeline mit Backpressure
Ein klassisches ETL-Szenario: Eine große Log-Datei lesen, jede Zeile parsen und das Ergebnis batchweise in eine Datenbank schreiben. Die drei Stufen haben sehr unterschiedliche Charakteristiken — Datei-IO ist sequenziell, Parsing ist CPU-bound, DB-Writes sind latency-bound. Eine Pipeline mit unbuffered Channels balanciert die drei Stufen über Backpressure automatisch aus.
package main
import (
"bufio"
"context"
"fmt"
"os"
"strings"
"time"
"golang.org/x/sync/errgroup"
)
type LogLine struct {
Level string
Msg string
}
// Stage 1: Datei zeilenweise lesen
func readLines(ctx context.Context, path string) (<-chan string, error) {
f, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf("open: %w", err)
}
out := make(chan string)
go func() {
defer close(out)
defer f.Close()
sc := bufio.NewScanner(f)
for sc.Scan() {
select {
case out <- sc.Text():
case <-ctx.Done():
return
}
}
}()
return out, nil
}
// Stage 2: Zeile parsen — Fan-Out-fähig
func parseLines(ctx context.Context, in <-chan string) <-chan LogLine {
out := make(chan LogLine)
go func() {
defer close(out)
for line := range in {
parts := strings.SplitN(line, " ", 2)
if len(parts) != 2 {
continue
}
ll := LogLine{Level: parts[0], Msg: parts[1]}
select {
case out <- ll:
case <-ctx.Done():
return
}
}
}()
return out
}
// Stage 3: Sink — simulierter DB-Insert
func writeToDB(ctx context.Context, in <-chan LogLine) error {
count := 0
for ll := range in {
// Simuliert latentes Insert
select {
case <-time.After(5 * time.Millisecond):
count++
case <-ctx.Done():
return ctx.Err()
}
_ = ll
}
fmt.Printf("Eingefügt: %d Zeilen\n", count)
return nil
}
func main() {
// Test-Datei erzeugen
tmp, _ := os.CreateTemp("", "log-*.txt")
for i := 0; i < 5; i++ {
fmt.Fprintf(tmp, "INFO Nachricht-%d\n", i)
}
tmp.Close()
defer os.Remove(tmp.Name())
g, ctx := errgroup.WithContext(context.Background())
lines, err := readLines(ctx, tmp.Name())
if err != nil {
fmt.Println(err)
return
}
parsed := parseLines(ctx, lines)
g.Go(func() error { return writeToDB(ctx, parsed) })
if err := g.Wait(); err != nil {
fmt.Printf("Pipeline-Fehler: %v\n", err)
}
}Eingefügt: 5 ZeilenDie drei Stages laufen vollständig parallel — sobald Stage 1 die erste Zeile liest, kann Stage 2 sie schon parsen, während Stage 1 die zweite liest. Bei einer 10-GB-Datei verbraucht die Pipeline trotzdem nur wenige Bytes Speicher, weil die unbuffered Channels den Reader bremsen, sobald der DB-Writer nicht mitkommt. Würde man stattdessen erst die gesamte Datei in eine Slice einlesen, dann parsen, dann schreiben, wäre die Spitzenspeicherauslastung dramatisch höher und die End-to-End-Latenz wäre die Summe aller drei Phasen statt das Maximum.
Wenn Stage 2 zur Engstelle wird — z. B. weil das Parsing aufwendige Regex enthält — kann man sie mit Fan-Out auf mehrere Worker auffächern und mit Fan-In wieder zusammenführen, ohne dass Stage 1 oder Stage 3 davon etwas mitbekommen. Genau das ist die Stärke der Pipeline: lokale Optimierung ohne globale Umbauten.
Interessantes
Pattern-Vergleich auf einen Blick
Pipeline: linearer Datenfluss, Stage-by-Stage, Backpressure gratis. Fan-Out/Fan-In: parallele Verarbeitung einer Stage, wenn sie zur Engstelle wird. Worker-Pool: feste Anzahl Goroutines mit Job/Result-Channels — Spezialfall von Fan-Out mit explizitem Result-Stream. In der Praxis kombiniert man alle drei: Pipeline mit Fan-Out auf der teuren Stage, gesteuert durch errgroup.SetLimit.
Wann NICHT parallelisieren
Wenn die Arbeit pro Item unter ein paar Mikrosekunden liegt, frisst der Channel-Overhead jeden Parallelitätsgewinn auf. Auch sequenzielle Abhängigkeiten — z. B. inkrementelle State-Updates — bringen mit Concurrency oft nur Bugs statt Speed. Erste Frage immer: Ist das Workload-Profil parallelisierbar (independent items, sufficient work per item)?
Backpressure-Intuition: unbuffered ist meistens richtig
Unbuffered Channels koppeln Producer und Consumer tight — wer schneller ist, wartet auf den anderen. Das verhindert unbeschränktes Speicherwachstum bei langsamen Sinks. Buffered Channels sind dann sinnvoll, wenn man bekannte Bursts dämpfen will (z. B. „Producer schreibt 100 Werte in einem Schub, Consumer arbeitet sie konstant ab"). Bei Unsicherheit: unbuffered als Default, Buffer nur mit messbarer Begründung.
Goroutine-Leaks systematisch vermeiden
Jede gestartete Goroutine muss einen garantierten Exit-Pfad haben: entweder Input-Channel wird geschlossen, oder ctx.Done() feuert. Wenn beides fehlt, leakt sie. Regel: Jeder send in eine andere Stage gehört in ein select mit <-ctx.Done(); jede range-Schleife setzt voraus, dass der Owner close aufruft.
errgroup vs. handgestrickte sync.WaitGroup
sync.WaitGroup zählt nur — bei Fehlern brauchst du zusätzlich Error-Channel, First-Error-Latch und manuelle Cancellation. errgroup macht all das in drei Zeilen: g.Go, g.Wait, errgroup.WithContext. Seit Go 1.20 ersetzt g.SetLimit(n) zusätzlich den expliziten Worker-Pool. Für neuen Code: errgroup ist der Default, plain WaitGroup nur noch bei garantiert fehlerfreien Tasks.
Worker-Pool-Sizing: NumCPU, GOMAXPROCS, externe Limits
Für CPU-bound Arbeit ist runtime.NumCPU() (oder runtime.GOMAXPROCS(0)) der Startpunkt — mehr Worker als Cores bringt durch Context-Switches nichts. Bei I/O-bound Arbeit darf die Workeranzahl deutlich höher liegen, ist aber durch das externe Limit bestimmt: maximale DB-Connections, API-Rate-Limit, Datei-Deskriptoren. Pool-Größe ableiten von der schwächsten externen Ressource, nicht von der CPU.
Fan-Out zerstört die Reihenfolge
Sobald mehrere Worker aus demselben Channel lesen, ist die Output-Reihenfolge nichtdeterministisch. Wenn die downstream-Logik Order braucht (z. B. zeitliche Reihenfolge im Log), entweder Sequenznummern mitsenden und nachträglich sortieren — oder Fan-Out vermeiden und stattdessen schneller serielle Verarbeitung anstreben. Reihenfolge ist nicht gratis.
Channel als Semaphore — der Mini-Worker-Pool
Ein chan struct{} mit Kapazität n ist eine Semaphore: vor der Arbeit sem <- struct{}{}, nach der Arbeit <-sem. Das limitiert ohne expliziten Pool die gleichzeitige Ausführung auf n — nützlich, wenn die Workfunktionen unterschiedlich sind und ein klassischer Job-Channel umständlich wäre. Seit errgroup.SetLimit allerdings selten nötig.
Weiterführende Ressourcen
Externe Quellen
- Go Concurrency Patterns: Pipelines and Cancellation (Go Blog)
- Go Concurrency Patterns – Rob Pike (Talk)
golang.org/x/sync/errgroup- Effective Go: Concurrency
contextPackage Documentation