Concurrency By Composition

Go, CSP and the Unix Philosophy

Concurrency is not Parallelism

- Rob Pike

Concurrency

The composition of independently executable things.

Parallelism

The simultaneous execution of multiple things.

Concurrency is about the structure of the code; that code may or may not run in parallel.

“Concurrency is a powerful tool for simplifying the description of systems. Performance spins out from this, but is not the primary focus.”

“Traditional approaches to concurrency (e.g. shared data and locks) conflict with some basic assumptions of, and our intuition for, sequential programming. They are, not surprisingly, very difficult to use.”

Accidental vs Essential complexity

func Poller(res *Resources) {
    for {
        // Get the least recently-polled Resource and mark it as being polled
        res.lock.Lock()
        var r *Resource
        for _, v := range res.data {
            if v.polling { continue }
            if r == nil || v.lastPolled < r.lastPolled { r = v }
        }
        if r != nil { r.polling = true }
        res.lock.Unlock()
        if r == nil { continue }

        // Actually do the polling logic here

        // Update the resource metadata
        res.lock.Lock()
        r.polling = false
        r.lastPolled = time.Now().UnixNano()
        res.lock.Unlock()
    }
}

The Go Blog - Share Memory By Communicating

Shared memory and locking are global state.

The Unix Philosophy

Simplicity, modularity and composition.

Communicating Sequential Processes

Sequential processes encourage understandable code.

Do not communicate by sharing memory; instead, share memory by communicating.

func Poller(in, out chan *Resource) {
    for r := range in {
        // Polling logic

        // send the processed Resource to out
        out <- r
    }
}

The Go Blog - Share Memory By Communicating

Designing a web indexer

  • Seed the initial URLs.
  • Retrieve the HTML for a page.
  • Parse it for links.
  • Record and search the pages we have found.

graph TD
    A[Seed Initial URLs]
    B[Retrieve HTML]
    C[Parse for Links]
    D[Record URL]

graph LR
    A[Seed Initial URLs] --> B[Retrieve HTML]
    B --> C[Parse for Links]
    C --> B
    C --> D[Record URL]

graph LR
    A[Seed Initial URLs] --> B[Record URL]
    B --> C[Retrieve HTML]
    C --> D[Parse for Links]
    D --> B

Good concurrent design looks a lot like the Unix Philosophy.

Good concurrent design decouples us from parallelism.

Writing our web indexer

graph LR
    A[Seed Initial URLs] --> B[Record URL]
    B --> C[Retrieve HTML]
    C --> D[Parse for Links]
    D --> B

func RetrieveContent(urls <-chan string, retrieved chan<- Page) {
	for url := range urls {
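		// Failed requests are skipped. On success the Page takes ownership of
		// resp.Body, so the consumer (parse) is responsible for closing it.
		if resp, err := http.Get(url); err == nil {
			retrieved <- Page{address: url, content: resp.Body}
		}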
	}
}

liamawhite/go-concurrency-example

func ParseForLinks(retrieved <-chan Page, record chan<- []string) {
	for page := range retrieved {
		record <- parse(page)
	}
}

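func parse(page Page) []string {
	// Always release the response body, even if parsing fails.
	defer page.content.Close()
	// Use goquery because I'm lazy!
	doc, err := goquery.NewDocumentFromReader(page.content)
	if err != nil {
		return nil // doc is nil on error, so there is nothing to search
	}
	res := make([]string, 0)
	doc.Find("a").Each(func(_ int, s *goquery.Selection) {
		// Skip anchors that have no href attribute.
		if link, ok := s.Attr("href"); ok {
			// Do some URL construction magic
			res = append(res, link)
		}
	})
	return res
}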

liamawhite/go-concurrency-example

func VisitedPageTracker(pending chan<- string) chan<- []string {
	record := make(chan []string)
	visitedPages := map[string]bool{}
	go func() {
		for pages := range record {
			for _, page := range pages {
				if !visitedPages[page] {
					fmt.Println("recording page", page)
					visitedPages[page] = true
					pending <- page
				}
			}
		}
	}()
	return record
}

liamawhite/go-concurrency-example

func main() {
	// Pending must be buffered because the pipeline is circular and there are multiple links per page.
	// This is preferred over adding to pending concurrently as it sets an upper bound on memory usage.
	// There is an assumption here that one page will not contain more than 10000 links.
	pending := make(chan string, 10000)
	retrieved := make(chan Page)
	record := VisitedPageTracker(pending)

	// Send initial urls to be recorded.
	// urls is the []string of seed URLs, declared elsewhere in the package.
	go func() { record <- urls }()

	go RetrieveContent(pending, retrieved)
	go ParseForLinks(retrieved, record)

	// Run the code for 5 seconds.
	// Again, I'm lazy and this is a contrived example.
	time.Sleep(time.Second * 5)
}

liamawhite/go-concurrency-example

Summary

Concurrency != Parallelism

Good concurrent design decomposes the core logic into sequential routines. This makes it easier to reason about and test.

The concurrency comes from their composition, sharing memory by sending messages.

Shared memory is global state!