Introduction

Definition

According to Wikipedia:
Structured concurrency is a programming paradigm aimed at improving the clarity, quality, and development time of a computer program by using a structured approach to concurrent programming.

The core concept is the encapsulation of concurrent threads of execution (here encompassing kernel and userland threads and processes) by way of control flow constructs that have clear entry and exit points and that ensure all spawned threads have completed before exit. Such encapsulation allows errors in concurrent threads to be propagated to the control structure’s parent scope and managed by the native error handling mechanisms of each particular computer language. It allows control flow to remain readily evident by the structure of the source code despite the presence of concurrency. To be effective, this model must be applied consistently throughout all levels of the program – otherwise concurrent threads may leak out, become orphaned, or fail to have runtime errors correctly propagated.

Structured concurrency is analogous to structured programming, which introduced control flow constructs that encapsulated sequential statements and subroutines.

In short: structured concurrency is to concurrent programming what structured programming is to sequential programming. It aims to make concurrent code more readable and controllable, which improves both the development experience and the quality of the code.
Its basic idea is to encapsulate threads of execution in constructs with clear entry and exit points, which guarantee that every thread has completed before the construct exits.
This encapsulation also lets errors raised in those threads be delegated to the parent scope for handling.
As a result, the control flow stays readable from the structure of the source code, even in the presence of concurrency.


Structured Programming vs goto

Structured Programming: Uses code blocks to encapsulate basic control flow, including common conditional statements, loops, and function calls.

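goto: Unrestricted jumps to arbitrary locations, so the control flow can no longer be read from the block structure of the code.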



Background

Development Process

In 2016, Martin Sústrik, the author of ZeroMQ, formally proposed the concept of structured concurrency in an article on his blog (250bpm).
In 2018, Nathaniel J. Smith implemented this paradigm in the Python library Trio and elaborated on it in his article "Notes on structured concurrency, or: Go statement considered harmful".
Around the same time, Roman Elizarov proposed the same idea for Kotlin and implemented it in the well-known kotlinx.coroutines library.
In 2019, the OpenJDK Project Loom also began introducing structured concurrency alongside its lightweight (virtual) threads. It is targeted for Java 19 as an incubating API (JEP 428).
In 2022, Python 3.11 introduced asyncio.TaskGroup and exception groups (ExceptionGroup and except*), officially supporting structured concurrency.
Newer languages such as Swift and Rust also support structured concurrency, either officially or through third-party libraries.


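Example

The following Python 3.11 program starts two HTTP requests as tasks of an asyncio.TaskGroup and wraps the whole group in asyncio.timeout(1):
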
import asyncio
import httpx

client = httpx.AsyncClient()


async def run():
    r = await client.get("http://127.0.0.1:8000", timeout=3)
    print(r.text)


async def main():
    async with asyncio.timeout(1):
        async with asyncio.TaskGroup() as tg:
            for i in range(2):
                tg.create_task(run())
        print("all tasks have completed now.")


asyncio.run(main())
Traceback (most recent call last):
  File "C:\Users\jack\PycharmProjects\demo\main.py", line 14, in main
    async with asyncio.TaskGroup() as tg:
  File "C:\Users\jack\.pyenv\pyenv-win\versions\3.11.0b4\Lib\asyncio\taskgroups.py", line 121, in __aexit__
    raise propagate_cancellation_error
  File "C:\Users\jack\.pyenv\pyenv-win\versions\3.11.0b4\Lib\asyncio\taskgroups.py", line 96, in __aexit__
    await self._on_completed_fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\jack\PycharmProjects\demo\main.py", line 20, in <module>
    asyncio.run(main())
  File "C:\Users\jack\.pyenv\pyenv-win\versions\3.11.0b4\Lib\asyncio\runners.py", line 187, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\jack\.pyenv\pyenv-win\versions\3.11.0b4\Lib\asyncio\runners.py", line 120, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\jack\.pyenv\pyenv-win\versions\3.11.0b4\Lib\asyncio\base_events.py", line 650, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "C:\Users\jack\PycharmProjects\demo\main.py", line 13, in main
    async with asyncio.timeout(1):
  File "C:\Users\jack\.pyenv\pyenv-win\versions\3.11.0b4\Lib\asyncio\timeouts.py", line 98, in __aexit__
    raise TimeoutError
TimeoutError

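Process finished with exit code 1

Assuming the server at 127.0.0.1:8000 takes longer than one second to respond, asyncio.timeout(1) expires before the 3-second request timeout and cancels main(). The TaskGroup then cancels both run() tasks and waits for them to finish, so the print() call is never reached, and the resulting CancelledError is converted into TimeoutError as it leaves the asyncio.timeout block. No task outlives the block that started it.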

Proposals in Go

Issue #29011, "proposal: Go 2: use structured concurrency", asked the Go team to adopt structured concurrency. However, Go contributors such as Ian Lance Taylor (ianlancetaylor) pointed out that the proposal could cause compatibility problems with existing Go code. In addition, Go already provides tools such as context.Context and sync.WaitGroup in the standard library, plus golang.org/x/sync/errgroup, so the proposal was ultimately not accepted.


How to Practice Structured Concurrency in Go

Problems to Consider in Concurrent Programming

  • When does this task start, and when does it end?
  • How do we ensure that the main task ends only after all of its subtasks have completed?
  • If a subtask fails, how does the main task cancel the remaining subtasks?
  • How do we ensure that all subtasks return within a given timeout, whether they succeed or fail?
  • Furthermore, how do we ensure that the main task returns within a specified time, whether it succeeds or fails, while canceling all the subtasks it spawned?
  • If the main task has ended but subtasks are still running, is there a resource leak?

Solutions in Go

  • channel
  • context.Context
  • sync.WaitGroup
  • golang.org/x/sync/errgroup

Example 1: errgroup


package main

import (
	"context"
	"crypto/md5"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"

	"golang.org/x/sync/errgroup"
)

// main demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func main() {
	m, err := MD5All(context.Background(), ".")
	if err != nil {
		log.Fatal(err)
	}

	for k, sum := range m {
		fmt.Printf("%s:\t%x\n", k, sum)
	}
}

type result struct {
	path string
	sum  [md5.Size]byte
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {
	// ctx is canceled when g.Wait() returns. When this version of MD5All returns
	// - even in case of error! - we know that all of the goroutines have finished
	// and the memory they were using can be garbage-collected.
	g, ctx := errgroup.WithContext(ctx)
	paths := make(chan string)

	g.Go(func() error {
		defer close(paths)
		return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if !info.Mode().IsRegular() {
				return nil
			}
			select {
			case paths <- path:
			case <-ctx.Done():
				return ctx.Err()
			}
			return nil
		})
	})

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result)
	const numDigesters = 20
	for i := 0; i < numDigesters; i++ {
		g.Go(func() error {
			for path := range paths {
				data, err := ioutil.ReadFile(path)
				if err != nil {
					return err
				}
				select {
				case c <- result{path, md5.Sum(data)}:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		})
	}
	go func() {
		g.Wait()
		close(c)
	}()

	m := make(map[string][md5.Size]byte)
	for r := range c {
		m[r.path] = r.sum
	}
	// Check whether any of the goroutines failed. Since g is accumulating the
	// errors, we don't need to send them (or check for them) in the individual
	// results sent on the channel.
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return m, nil
}

errgroup Source Code Analysis

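type token struct{}

// A Group is a collection of goroutines working on subtasks that are part of
// the same overall task.
type Group struct {
	cancel  func()         // cancel function of the context created by WithContext
	wg      sync.WaitGroup // waits for every goroutine started by Go
	sem     chan token     // optional semaphore limiting the number of active goroutines
	errOnce sync.Once      // ensures only the first error is recorded
	err     error          // first non-nil error returned by a goroutine
}

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	return &Group{cancel: cancel}, ctx
}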


// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
	g.wg.Wait()
	if g.cancel != nil {
		g.cancel()
	}
	return g.err
}

// Go calls the given function in a new goroutine.
// It blocks until the new goroutine can be added without the number of
// active goroutines in the group exceeding the configured limit.
//
// The first call to return a non-nil error cancels the group; its error will be
// returned by Wait.
func (g *Group) Go(f func() error) {
	if g.sem != nil {
		g.sem <- token{}
	}

	g.wg.Add(1)
	go func() {
		defer g.done()

		if err := f(); err != nil {
			g.errOnce.Do(func() {
				g.err = err
				if g.cancel != nil {
					g.cancel()
				}
			})
		}
	}()
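}

// done releases the goroutine's semaphore slot (if a limit is set) and marks
// it as finished in the WaitGroup.
func (g *Group) done() {
	if g.sem != nil {
		<-g.sem
	}
	g.wg.Done()
}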

Example 2: waitGroup

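The nursery package by arunsworld brings Trio-style nurseries to Go, built around sync.WaitGroup. Its source is at:
https://github.com/arunsworld/nursery/blob/ecfe7a688cfd866de0da8ecff34de72b34d22f53/nursery.go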


References and Citations

https://en.wikipedia.org/wiki/Structured_concurrency
http://250bpm.com/blog:71
https://trio.readthedocs.io/en/stable/
https://vorpus.org/blog/notes-on-structured-concurrency-or-go-statement-considered-harmful/
https://medium.com/@elizarov/structured-concurrency-722d765aa952
https://kotlinlang.org/docs/coroutines-basics.html
https://github.com/golang/go/issues/29011
https://github.com/arunsworld/nursery/blob/master/nursery.go
https://arunsworld.medium.com/structured-concurrency-in-go-b800c7c4434e
https://realpython.com/python311-exception-groups/
https://zhuanlan.zhihu.com/p/108759542
https://onevcat.com/2021/09/structured-concurrency/
https://blog.softwaremill.com/structured-concurrency-and-pure-functions-92dd8ed1a9f2