Goroutines und Channels sind die Bausteine — die wirklich tragenden Strukturen einer Go-Anwendung entstehen erst, wenn diese Bausteine zu Mustern zusammengesetzt werden. Drei Patterns decken nahezu jeden realen Anwendungsfall ab: die Pipeline verkettet Verarbeitungsstufen über Channels, der Worker-Pool begrenzt die Parallelität auf eine sinnvolle Zahl, und Fan-Out/Fan-In fächert Arbeit auf mehrere Goroutines auf und sammelt die Ergebnisse wieder ein. Wer diese drei Muster sauber beherrscht — inklusive Cancellation über context.Context und strukturierter Fehlerbehandlung mit errgroup — schreibt Concurrency-Code, der nicht nur schnell ist, sondern auch unter Last, bei Fehlern und beim Shutdown korrekt bleibt.

Dieser Artikel ist die Synthese des klassischen Go-Blog-Beitrags „Pipelines and Cancellation" von Sameer Ajmani mit den Erweiterungen, die seit 2014 idiomatisch geworden sind: context statt nacktem done-Channel, errgroup statt handgestrickter sync.WaitGroup-Fehlerbehandlung und SetLimit als Bounded-Worker-Pool ohne expliziten Pool-Code.

Das Pipeline-Pattern — Stages verketten

Eine Pipeline ist eine Folge von Stages, die durch Channels verbunden sind. Jede Stage ist eine Goroutine, die Werte aus einem Inbound-Channel liest, sie transformiert und auf einen Outbound-Channel schreibt. Die erste Stage heißt Source oder Generator und hat nur einen Output; die letzte Stage heißt Sink und hat nur einen Input. Dazwischen liegen beliebig viele Verarbeitungsstufen. Das Schöne an diesem Modell: Jede Stage ist isoliert testbar, und der Datenfluss ist offensichtlich, weil er der Channel-Verkettung folgt.

Zwei Konventionen aus dem Go-Blog sind dabei nicht verhandelbar: Der Owner schließt den Channel — die Goroutine, die in einen Channel schreibt, ist auch die einzige, die ihn schließt, idealerweise per defer close(out). Und Receiver lesen mit range — so beenden sie automatisch, wenn der Owner schließt. Aus diesen beiden Regeln folgt der gesamte Lifecycle einer Pipeline ohne explizite Synchronisation.

Go pipeline_basic.go
package main

import "fmt"

// Stage 1: Generator — Quellwerte produzieren
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// Stage 2: Transformation — quadrieren
func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func main() {
    // Pipeline aufbauen: gen → sq → sq
    c := gen(2, 3, 4)
    out := sq(sq(c))

    // Sink: konsumieren
    for v := range out {
        fmt.Println(v)
    }
}
Output
16
81
256

Beachte, dass die Stages unbuffered Channels verwenden. Das ist kein Zufall: Unbuffered Channels erzeugen Backpressure. Wenn die Sink langsam ist, blockiert der letzte Send der vorherigen Stage; diese kann nichts mehr aus ihrem Input lesen, also blockiert die Stage davor — die Bremswirkung pflanzt sich rückwärts bis zum Generator fort. Das Resultat: Der Generator produziert nie schneller, als die Sink konsumieren kann. Bei buffered Channels würde dieser elegante Mechanismus durch die Pufferkapazität verzögert, und im Worst Case sammelt sich Arbeit als Speicherverbrauch an.

Die Funktionssignaturen folgen einer zweiten wichtigen Konvention: Sie geben read-only Channels (<-chan int) zurück und akzeptieren read-only Channels als Parameter. So macht der Compiler unmöglich, was die Owner-Regel verbietet — eine fremde Stage kann den Channel nicht versehentlich schließen.

Cancellation in Pipelines — context durch alle Stages reichen

Die naive Pipeline hat ein Problem: Wenn die Sink frühzeitig abbricht — etwa weil der Konsument nur die ersten drei Werte braucht und dann returnt — blockieren alle vorgelagerten Stages auf ihrem nächsten Send. Sie werden nie freigegeben; ihre Goroutines bleiben für immer im Speicher. Das ist ein klassischer Goroutine-Leak. Die Lösung: Jede Stage muss neben dem normalen Send einen Cancellation-Pfad kennen, der sie sofort beendet.

Der historische Go-Blog-Beitrag verwendet einen geteilten done-Channel: Wird er geschlossen, fällt jeder select-Send über den <-done-Case durch und die Stage returnt. Seit Go 1.7 ist context.Context die idiomatische Verallgemeinerung dieses Patterns — es trägt nicht nur das Cancellation-Signal, sondern auch Deadlines und Request-Werte. In neuem Code wird konsequent context durchgereicht.

Go pipeline_context.go
package main

import (
    "context"
    "fmt"
)

func gen(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-ctx.Done():
                return // sauber raus, defer close läuft
            }
        }
    }()
    return out
}

func sq(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // garantiert Aufräumen, auch bei early return

    out := sq(ctx, gen(ctx, 1, 2, 3, 4, 5, 6, 7, 8))

    // Nur die ersten drei Werte konsumieren, dann abbrechen
    for i := 0; i < 3; i++ {
        fmt.Println(<-out)
    }
    cancel() // alle Stages bekommen das Signal
}
Output
1
4
9

Die entscheidende Stelle ist der select-Block um jeden Send. Ohne ihn würde die Stage stur auf den nächsten Receiver warten, selbst wenn der Konsument längst weg ist. Mit ihm wird der Send entweder durchgeführt oder das Context-Signal beendet die Goroutine — was zuerst eintritt. Das defer close(out) läuft in beiden Fällen, sodass die nachgelagerte Stage ihr range ebenfalls beenden kann. Diese Disziplin — jeder Send in select mit <-ctx.Done(), jeder Owner schließt per defer — eliminiert Goroutine-Leaks systematisch.

Fan-Out — wenn eine Stage zur Engstelle wird

In einer linearen Pipeline ist der Durchsatz durch die langsamste Stage begrenzt. Wenn eine Stufe CPU-intensive Berechnungen oder blockierende I/O macht, ist die natürliche Antwort: mehrere Goroutines, die parallel aus demselben Input-Channel lesen. Das ist Fan-Out. Channels sind in Go von Haus aus Multi-Consumer-safe — n Goroutines können gefahrlos aus einem Channel lesen, und Go verteilt die Werte automatisch (genau ein Empfänger pro Wert).

Damit wird der Channel selbst zur Work-Queue: Jeder Worker greift sich den nächsten Job, sobald er frei ist. Es gibt keinen expliziten Scheduler; die Channel-Semantik liefert das Load-Balancing kostenlos. Das ist konzeptionell näher an einer Task-Queue als an einem klassischen Thread-Pool — Worker holen Arbeit, statt zugewiesen zu bekommen.

Go fan_out.go
package main

import (
    "fmt"
    "sync"
    "time"
)

// Teurer Job: simuliert CPU-Last
func process(id int, n int) int {
    time.Sleep(50 * time.Millisecond) // teure Arbeit
    return n * n
}

func fanOut(in <-chan int, workers int) []<-chan int {
    outs := make([]<-chan int, workers)
    for i := 0; i < workers; i++ {
        ch := make(chan int)
        outs[i] = ch
        go func(id int, out chan<- int) {
            defer close(out)
            for n := range in { // alle lesen aus demselben in
                out <- process(id, n)
            }
        }(i, ch)
    }
    return outs
}

func main() {
    in := make(chan int)
    go func() {
        defer close(in)
        for i := 1; i <= 8; i++ {
            in <- i
        }
    }()

    start := time.Now()
    outs := fanOut(in, 4) // 4 Worker

    // Naive Konsumation (Fan-In folgt in nächster Sektion)
    var wg sync.WaitGroup
    for _, c := range outs {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                fmt.Println(v)
            }
        }(c)
    }
    wg.Wait()
    fmt.Printf("Dauer: %v\n", time.Since(start).Round(10*time.Millisecond))
}
Output
1
4
9
16
25
36
49
64
Dauer: 100ms

Ohne Fan-Out würden 8 Jobs á 50 ms seriell 400 ms dauern; mit vier Workern parallel sind es nur noch zwei Wellen — etwa 100 ms. Die Ausgabe-Reihenfolge ist allerdings nicht mehr deterministisch: Welcher Worker welchen Job bekommt, hängt vom Scheduler ab. Das ist ein wichtiger Trade-off von Fan-Out, auf den wir später in den Insights zurückkommen.

Fan-In (Merge) — n Outputs zu einem zusammenführen

Fan-Out produziert n Output-Channels, einen pro Worker. Damit die nachgelagerte Stage nicht wissen muss, wie viele Worker es gab, brauchen wir das Spiegelbild: Fan-In, oft als merge implementiert. Die Funktion startet eine kleine Forwarder-Goroutine pro Input-Channel; jede liest aus „ihrem" Channel und schreibt auf den gemeinsamen Output. Eine sync.WaitGroup zählt, wann alle Forwarder fertig sind, und eine separate Closer-Goroutine schließt den Merge-Output, sobald die WaitGroup leerläuft.

Dieses Pattern stammt direkt aus dem Go-Blog und ist seitdem in praktisch jeder größeren Go-Codebase zu finden. Der entscheidende Twist ist die Closer-Goroutine: Sie ruft wg.Wait() blockierend, was im Owner-Goroutine selbst nicht funktionieren würde, weil dieser ja sofort returnen muss, um den Channel zurückzugeben.

Go fan_in_merge.go
package main

import (
    "fmt"
    "sync"
)

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Pro Input-Channel ein Forwarder
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }

    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Closer-Goroutine: wartet, bis alle Forwarder fertig sind
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

func main() {
    a := gen(1, 2, 3)
    b := gen(10, 20, 30)
    c := gen(100, 200, 300)

    for v := range merge(a, b, c) {
        fmt.Println(v)
    }
}
Output
1
10
100
2
20
200
3
30
300

Die Output-Reihenfolge ist interleaved und nicht deterministisch — der Merge garantiert nur, dass jeder Wert genau einmal im Output erscheint, nicht in welcher Reihenfolge. Wer Ordnungserhalt braucht, muss Sequenznummern mitschicken und nachträglich sortieren oder einen ordered Merge implementieren, der gleichzeitig auf alle Eingangs-Channels schaut.

Die Kombination Fan-Out + Fan-In ist das Map-Pattern für Go-Pipelines: ein Channel rein, parallele Verarbeitung in der Mitte, ein Channel raus. Die umliegenden Stages müssen von der Parallelisierung nichts wissen.

Worker-Pool — feste Anzahl Goroutines mit Job- und Result-Channel

Fan-Out startet n Worker, die direkt aus dem Input lesen — das ist bereits ein Worker-Pool im engeren Sinne. Das klassische Worker-Pool-Pattern ist nur eine etwas explizitere Variante: Es trennt sauber Jobs und Results, sodass der Caller asynchron Jobs einfüttern und gleichzeitig Ergebnisse konsumieren kann. Das macht die Verwendung in Service-Code intuitiver.

Goroutine-pro-Job ist in Go billig — aber nicht kostenlos. Bei zehntausend gleichzeitigen DB-Connections explodiert nicht die Goroutine-Stack-Memory, sondern der Connection-Pool der Datenbank. Bei zehntausend HTTP-Requests gegen denselben Host wird der Ziel-Server zur Engstelle oder es greift Rate-Limiting. Ein Worker-Pool ist also weniger ein Performance- als ein Constraint-Pattern: Er erzwingt eine Obergrenze.

Go worker_pool.go
package main

import (
    "fmt"
    "sync"
    "time"
)

type Job struct {
    ID  int
    Val int
}

type Result struct {
    JobID int
    Out   int
}

func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    for j := range jobs {
        time.Sleep(20 * time.Millisecond) // simulierte Arbeit
        results <- Result{JobID: j.ID, Out: j.Val * j.Val}
    }
}

func main() {
    const numWorkers = 3
    jobs := make(chan Job)
    results := make(chan Result)

    var wg sync.WaitGroup
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &amp;wg)
    }

    // Closer: schließt results, sobald alle Worker fertig sind
    go func() {
        wg.Wait()
        close(results)
    }()

    // Jobs einfüttern (in separater Goroutine, damit wir parallel konsumieren können)
    go func() {
        defer close(jobs)
        for i := 1; i <= 6; i++ {
            jobs <- Job{ID: i, Val: i * 10}
        }
    }()

    // Ergebnisse einsammeln
    for r := range results {
        fmt.Printf("Job %d%d\n", r.JobID, r.Out)
    }
}
Output
Job 1 → 100
Job 3 → 900
Job 2 → 400
Job 4 → 1600
Job 5 → 2500
Job 6 → 3600

Beachte die saubere Lifecycle-Choreografie: Der Producer schließt jobs, sobald er fertig ist. Die Worker beenden ihre range-Schleife und decrementen die WaitGroup. Die Closer-Goroutine wartet auf wg.Wait() und schließt results. Der Main-Loop terminiert mit dem range. Drei Channels, eine WaitGroup, exakt drei Goroutine-Gruppen — keine Leaks, keine Deadlocks, keine Race Conditions.

Sinnvoll ist ein Worker-Pool insbesondere bei: CPU-bound Arbeit mit runtime.NumCPU() Workern, beschränkten externen Ressourcen wie DB-Connections oder API-Quotas, und Szenarien mit Rate-Limit, wo die Workeranzahl direkt die Requests-per-Second bestimmt.

errgroup — strukturierte Cancellation bei Fehlern

sync.WaitGroup zählt nur Goroutines — sie kennt keine Fehler. Sobald eine Goroutine fehlschlägt, müsste man manuell ein Error-Channel-Muster aufbauen, den ersten Fehler latchen, alle anderen Goroutines abbrechen und am Ende den Fehler propagieren. Das ist genau der Boilerplate, den golang.org/x/sync/errgroup ein für allemal kapselt.

errgroup.Group.Go(fn func() error) startet eine Goroutine, deren Rückgabewert beobachtet wird. Wait() blockt bis alle fertig sind und liefert den ersten Non-Nil-Error zurück. Die WithContext-Variante liefert zusätzlich einen Context, der automatisch gecancelt wird, sobald die erste Goroutine einen Fehler returnt — was alle anderen Goroutines, die den Context respektieren, zum sofortigen Abbruch bringt. Das ist Cancellation-on-Error mit drei Zeilen Code statt drei Bildschirmen.

Go errgroup_basic.go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "golang.org/x/sync/errgroup"
)

func task(ctx context.Context, name string, dur time.Duration, fail bool) error {
    select {
    case <-time.After(dur):
        if fail {
            return fmt.Errorf("%s: simulierter Fehler", name)
        }
        fmt.Printf("%s: fertig\n", name)
        return nil
    case <-ctx.Done():
        fmt.Printf("%s: abgebrochen (%v)\n", name, ctx.Err())
        return ctx.Err()
    }
}

func main() {
    g, ctx := errgroup.WithContext(context.Background())

    g.Go(func() error { return task(ctx, "A", 100*time.Millisecond, false) })
    g.Go(func() error { return task(ctx, "B", 50*time.Millisecond, true) }) // failt zuerst
    g.Go(func() error { return task(ctx, "C", 200*time.Millisecond, false) })

    if err := g.Wait(); err != nil {
        fmt.Printf("erster Fehler: %v\n", err)
        if errors.Is(err, context.Canceled) {
            fmt.Println("(Folgegoroutine wegen Cancel)")
        }
    }
}
Output
A: abgebrochen (context canceled)
C: abgebrochen (context canceled)
erster Fehler: B: simulierter Fehler

B schlägt nach 50 ms fehl. Sofort wird der von errgroup.WithContext gelieferte Context gecancelt; A und C sehen ihr <-ctx.Done() und returnen mit context.Canceled. g.Wait() liefert den ersten Fehler — also den von B, nicht die Folge-Cancellations. Das ist exakt das Verhalten, das man in der Praxis will: „Der erste echte Fehler interessiert mich; den Rest räum bitte auf."

Seit Go 1.20 hat errgroup zusätzlich SetLimit(n int). Damit wird Go zu einer blockierenden Operation, sobald n Goroutines bereits laufen — ein eingebauter Worker-Pool ohne eigenes Pool-Konstrukt:

Go errgroup_limit.go
package main

import (
    "context"
    "fmt"
    "sync/atomic"
    "time"

    "golang.org/x/sync/errgroup"
)

func main() {
    g, ctx := errgroup.WithContext(context.Background())
    g.SetLimit(3) // maximal 3 gleichzeitig

    var inflight atomic.Int32
    var peak atomic.Int32

    for i := 1; i <= 10; i++ {
        i := i
        g.Go(func() error {
            n := inflight.Add(1)
            if n > peak.Load() {
                peak.Store(n)
            }
            defer inflight.Add(-1)

            select {
            case <-time.After(30 * time.Millisecond):
                return nil
            case <-ctx.Done():
                return ctx.Err()
            }
            _ = i
        })
    }

    _ = g.Wait()
    fmt.Printf("Peak parallel: %d (Limit: 3)\n", peak.Load())
}
Output
Peak parallel: 3 (Limit: 3)

Für viele reale Anwendungen ersetzt diese eine Zeile — g.SetLimit(3) — den gesamten handgeschriebenen Worker-Pool inklusive Job-Channel, Result-Channel, WaitGroup und Closer-Goroutine. Das ist 2026 der Default-Weg für „n parallele Tasks mit Limit und Error-Propagation".

Praxis: HTTP-Crawler mit Worker-Pool und errgroup

Ein typisches reales Szenario: Wir bekommen eine Liste von URLs und sollen sie parallel abrufen, aber mit einer Obergrenze — Server fair behandeln, Connection-Limit respektieren — und beim ersten unrettbaren Fehler abbrechen. Das ist der Sweet Spot für errgroup.WithContext + SetLimit.

Go crawler.go
package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
    "sync"
    "time"

    "golang.org/x/sync/errgroup"
)

type FetchResult struct {
    URL    string
    Status int
    Bytes  int
}

func fetch(ctx context.Context, url string) (FetchResult, error) {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
    if err != nil {
        return FetchResult{}, fmt.Errorf("build request %s: %w", url, err)
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return FetchResult{}, fmt.Errorf("fetch %s: %w", url, err)
    }
    defer resp.Body.Close()

    n, err := io.Copy(io.Discard, resp.Body)
    if err != nil {
        return FetchResult{}, fmt.Errorf("read %s: %w", url, err)
    }
    return FetchResult{URL: url, Status: resp.StatusCode, Bytes: int(n)}, nil
}

func crawl(parent context.Context, urls []string, concurrency int) ([]FetchResult, error) {
    g, ctx := errgroup.WithContext(parent)
    g.SetLimit(concurrency)

    var mu sync.Mutex
    results := make([]FetchResult, 0, len(urls))

    for _, url := range urls {
        url := url
        g.Go(func() error {
            r, err := fetch(ctx, url)
            if err != nil {
                return err // bricht alle anderen ab
            }
            mu.Lock()
            results = append(results, r)
            mu.Unlock()
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return results, err
    }
    return results, nil
}

func main() {
    urls := []string{
        "https://go.dev",
        "https://pkg.go.dev",
        "https://blog.golang.org",
        "https://example.com",
    }

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    results, err := crawl(ctx, urls, 2) // max 2 parallel
    if err != nil {
        fmt.Printf("Abbruch: %v\n", err)
    }
    for _, r := range results {
        fmt.Printf("%-30s status=%d bytes=%d\n", r.URL, r.Status, r.Bytes)
    }
}
Output
https://example.com            status=200 bytes=1256
https://go.dev                 status=200 bytes=18234
https://pkg.go.dev             status=200 bytes=24561
https://blog.golang.org        status=200 bytes=12903

Dieser Code ist produktionsnah und nutzt mehrere Patterns zusammen: http.NewRequestWithContext propagiert den Timeout/Cancel automatisch bis auf TCP-Ebene, sodass abgebrochene Requests ihre Sockets sofort freigeben. SetLimit(2) begrenzt die Parallelität — bei 1000 URLs wären sonst 1000 offene Connections. Der Mutex schützt das Result-Slice, weil append aus mehreren Goroutines nicht safe ist; alternativ hätte man auch einen Result-Channel verwenden können. Tritt ein Fehler auf — etwa ein DNS-Failure — wird der innere Context gecancelt, alle laufenden Requests brechen mit context.Canceled ab, und g.Wait() liefert den ursprünglichen Fehler zurück.

Praxis: Datei-Verarbeitung als Pipeline mit Backpressure

Ein klassisches ETL-Szenario: Eine große Log-Datei lesen, jede Zeile parsen und das Ergebnis batchweise in eine Datenbank schreiben. Die drei Stufen haben sehr unterschiedliche Charakteristiken — Datei-IO ist sequenziell, Parsing ist CPU-bound, DB-Writes sind latency-bound. Eine Pipeline mit unbuffered Channels balanciert die drei Stufen über Backpressure automatisch aus.

Go file_pipeline.go
package main

import (
    "bufio"
    "context"
    "fmt"
    "os"
    "strings"
    "time"

    "golang.org/x/sync/errgroup"
)

type LogLine struct {
    Level string
    Msg   string
}

// Stage 1: Datei zeilenweise lesen
func readLines(ctx context.Context, path string) (<-chan string, error) {
    f, err := os.Open(path)
    if err != nil {
        return nil, fmt.Errorf("open: %w", err)
    }
    out := make(chan string)
    go func() {
        defer close(out)
        defer f.Close()
        sc := bufio.NewScanner(f)
        for sc.Scan() {
            select {
            case out <- sc.Text():
            case <-ctx.Done():
                return
            }
        }
    }()
    return out, nil
}

// Stage 2: Zeile parsen — Fan-Out-fähig
func parseLines(ctx context.Context, in <-chan string) <-chan LogLine {
    out := make(chan LogLine)
    go func() {
        defer close(out)
        for line := range in {
            parts := strings.SplitN(line, " ", 2)
            if len(parts) != 2 {
                continue
            }
            ll := LogLine{Level: parts[0], Msg: parts[1]}
            select {
            case out <- ll:
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// Stage 3: Sink — simulierter DB-Insert
func writeToDB(ctx context.Context, in <-chan LogLine) error {
    count := 0
    for ll := range in {
        // Simuliert latentes Insert
        select {
        case <-time.After(5 * time.Millisecond):
            count++
        case <-ctx.Done():
            return ctx.Err()
        }
        _ = ll
    }
    fmt.Printf("Eingefügt: %d Zeilen\n", count)
    return nil
}

func main() {
    // Test-Datei erzeugen
    tmp, _ := os.CreateTemp("", "log-*.txt")
    for i := 0; i < 5; i++ {
        fmt.Fprintf(tmp, "INFO Nachricht-%d\n", i)
    }
    tmp.Close()
    defer os.Remove(tmp.Name())

    g, ctx := errgroup.WithContext(context.Background())

    lines, err := readLines(ctx, tmp.Name())
    if err != nil {
        fmt.Println(err)
        return
    }
    parsed := parseLines(ctx, lines)

    g.Go(func() error { return writeToDB(ctx, parsed) })

    if err := g.Wait(); err != nil {
        fmt.Printf("Pipeline-Fehler: %v\n", err)
    }
}
Output
Eingefügt: 5 Zeilen

Die drei Stages laufen vollständig parallel — sobald Stage 1 die erste Zeile liest, kann Stage 2 sie schon parsen, während Stage 1 die zweite liest. Bei einer 10-GB-Datei verbraucht die Pipeline trotzdem nur wenige Bytes Speicher, weil die unbuffered Channels den Reader bremsen, sobald der DB-Writer nicht mitkommt. Würde man stattdessen erst die gesamte Datei in eine Slice einlesen, dann parsen, dann schreiben, wäre die Spitzenspeicherauslastung dramatisch höher und die End-to-End-Latenz wäre die Summe aller drei Phasen statt das Maximum.

Wenn Stage 2 zur Engstelle wird — z. B. weil das Parsing aufwendige Regex enthält — kann man sie mit Fan-Out auf mehrere Worker auffächern und mit Fan-In wieder zusammenführen, ohne dass Stage 1 oder Stage 3 davon etwas mitbekommen. Genau das ist die Stärke der Pipeline: lokale Optimierung ohne globale Umbauten.

Interessantes

Pattern-Vergleich auf einen Blick

Pipeline: linearer Datenfluss, Stage-by-Stage, Backpressure gratis. Fan-Out/Fan-In: parallele Verarbeitung einer Stage, wenn sie zur Engstelle wird. Worker-Pool: feste Anzahl Goroutines mit Job/Result-Channels — Spezialfall von Fan-Out mit explizitem Result-Stream. In der Praxis kombiniert man alle drei: Pipeline mit Fan-Out auf der teuren Stage, gesteuert durch errgroup.SetLimit.

Wann NICHT parallelisieren

Wenn die Arbeit pro Item unter ein paar Mikrosekunden liegt, frisst der Channel-Overhead jeden Parallelitätsgewinn auf. Auch sequenzielle Abhängigkeiten — z. B. inkrementelle State-Updates — bringen mit Concurrency oft nur Bugs statt Speed. Erste Frage immer: Ist das Workload-Profil parallelisierbar (independent items, sufficient work per item)?

Backpressure-Intuition: unbuffered ist meistens richtig

Unbuffered Channels koppeln Producer und Consumer tight — wer schneller ist, wartet auf den anderen. Das verhindert unbeschränktes Speicherwachstum bei langsamen Sinks. Buffered Channels sind dann sinnvoll, wenn man bekannte Bursts dämpfen will (z. B. „Producer schreibt 100 Werte in einem Schub, Consumer arbeitet sie konstant ab"). Bei Unsicherheit: unbuffered als Default, Buffer nur mit messbarer Begründung.

Goroutine-Leaks systematisch vermeiden

Jede gestartete Goroutine muss einen garantierten Exit-Pfad haben: entweder Input-Channel wird geschlossen, oder ctx.Done() feuert. Wenn beides fehlt, leakt sie. Regel: Jeder send in eine andere Stage gehört in ein select mit <-ctx.Done(); jede range-Schleife setzt voraus, dass der Owner close aufruft.

errgroup vs. handgestrickte sync.WaitGroup

sync.WaitGroup zählt nur — bei Fehlern brauchst du zusätzlich Error-Channel, First-Error-Latch und manuelle Cancellation. errgroup macht all das in drei Zeilen: g.Go, g.Wait, errgroup.WithContext. Seit Go 1.20 ersetzt g.SetLimit(n) zusätzlich den expliziten Worker-Pool. Für neuen Code: errgroup ist der Default, plain WaitGroup nur noch bei garantiert fehlerfreien Tasks.

Worker-Pool-Sizing: NumCPU, GOMAXPROCS, externe Limits

Für CPU-bound Arbeit ist runtime.NumCPU() (oder runtime.GOMAXPROCS(0)) der Startpunkt — mehr Worker als Cores bringt durch Context-Switches nichts. Bei I/O-bound Arbeit darf die Workeranzahl deutlich höher liegen, ist aber durch das externe Limit bestimmt: maximale DB-Connections, API-Rate-Limit, Datei-Deskriptoren. Pool-Größe ableiten von der schwächsten externen Ressource, nicht von der CPU.

Fan-Out zerstört die Reihenfolge

Sobald mehrere Worker aus demselben Channel lesen, ist die Output-Reihenfolge nichtdeterministisch. Wenn die downstream-Logik Order braucht (z. B. zeitliche Reihenfolge im Log), entweder Sequenznummern mitsenden und nachträglich sortieren — oder Fan-Out vermeiden und stattdessen schneller serielle Verarbeitung anstreben. Reihenfolge ist nicht gratis.

Channel als Semaphore — der Mini-Worker-Pool

Ein chan struct{} mit Kapazität n ist eine Semaphore: vor der Arbeit sem <- struct{}{}, nach der Arbeit <-sem. Das limitiert ohne expliziten Pool die gleichzeitige Ausführung auf n — nützlich, wenn die Workfunktionen unterschiedlich sind und ein klassischer Job-Channel umständlich wäre. Seit errgroup.SetLimit allerdings selten nötig.

Weiterführende Ressourcen

Externe Quellen

/ Weiter

Zurück zu Goroutines & Channels

Zur Übersicht