Concurrency: the composition of independently executable things.
Parallelism: the simultaneous execution of multiple things.
“Concurrency is a powerful tool for simplifying the description of systems. Performance spins out from this, but is not the primary focus.”
“Traditional approaches to concurrency (e.g. shared data and locks) conflict with some basic assumptions of, and our intuition for, sequential programming. They are, not surprisingly, very difficult to use.”
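The original never shows the types this lock-based version manipulates, so here is a minimal sketch of what the code below appears to assume (the field names come from their uses; everything else is guesswork):

// A sketch of the types Poller appears to assume; not shown in the original.
type Resource struct {
	polling    bool
	lastPolled int64 // nanoseconds since the epoch
}

type Resources struct {
	lock sync.Mutex
	data []*Resource
}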
func Poller(res *Resources) {
	for {
		// Get the least recently polled Resource and mark it as being polled.
		res.lock.Lock()
		var r *Resource
		for _, v := range res.data {
			if v.polling {
				continue
			}
			if r == nil || v.lastPolled < r.lastPolled {
				r = v
			}
		}
		if r != nil {
			r.polling = true
		}
		res.lock.Unlock()
		if r == nil {
			continue
		}

		// Actually do the polling logic here.

		// Update the resource metadata.
		res.lock.Lock()
		r.polling = false
		r.lastPolled = time.Now().UnixNano()
		res.lock.Unlock()
	}
}
Simplicity, modularity and composition.
func Poller(in, out chan *Resource) {
	for r := range in {
		// Polling logic goes here.
		// Send the processed Resource to out.
		out <- r
	}
}
Good concurrent design looks a lot like the Unix Philosophy.
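To make the analogy concrete, here is a hypothetical grep-like stage of my own (not from the original): it reads from one channel, writes to another, and knows nothing about its neighbours, so stages chain the way shell commands chain with pipes.

// A grep-like pipeline stage: forwards only the strings containing pattern.
func match(pattern string, in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for s := range in {
			if strings.Contains(s, pattern) {
				out <- s
			}
		}
	}()
	return out
}

Composing two of them reads like a pipeline: for s := range match("error", match("disk", lines)) { ... }.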
Good concurrent design decouples us from parallelism.
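Concretely: a stage only talks to its channels, so the number of copies that run is a launch-time decision, not a design decision. A sketch, reusing the channel-based Poller from above (the worker count here is arbitrary):

// Fan out: N identical Pollers share one input channel and one output
// channel. The stage's code is unchanged; only the launch site decides
// how much parallelism to apply.
for i := 0; i < runtime.NumCPU(); i++ {
	go Poller(in, out)
}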
// Page pairs a URL with its as-yet-unread response body.
// (The type isn't shown in the original; this shape is inferred from its uses.)
type Page struct {
	address string
	content io.ReadCloser
}

func RetrieveContent(urls <-chan string, retrieved chan<- Page) {
	for url := range urls {
		if resp, err := http.Get(url); err == nil {
			retrieved <- Page{address: url, content: resp.Body}
		}
	}
}
func ParseForLinks(retrieved <-chan Page, record chan<- []string) {
	for page := range retrieved {
		record <- parse(page)
	}
}
func parse(page Page) []string {
	// Use goquery because I'm lazy!
	doc, _ := goquery.NewDocumentFromReader(page.content)
	page.content.Close()
	res := make([]string, 0)
	doc.Find("a").Each(func(_ int, s *goquery.Selection) {
		link, _ := s.Attr("href")
		// Do some URL construction magic.
		res = append(res, link)
	})
	return res
}
func VisitedPageTracker(pending chan<- string) chan<- []string {
	record := make(chan []string)
	visitedPages := map[string]bool{}
	// The map is confined to this one goroutine, so no lock is needed:
	// all access is serialized through the record channel.
	go func() {
		for pages := range record {
			for _, page := range pages {
				if !visitedPages[page] {
					fmt.Println("recording page", page)
					visitedPages[page] = true
					pending <- page
				}
			}
		}
	}()
	return record
}
func main() {
	// pending must be buffered because the pipeline is circular and there
	// are multiple links per page. Buffering is preferable to sending to
	// pending concurrently, as it sets an upper bound on memory usage.
	// The assumption is that no one page contains more than 10000 links.
	pending := make(chan string, 10000)
	retrieved := make(chan Page)
	record := VisitedPageTracker(pending)

	// Send the initial seed urls (defined elsewhere; not shown here) to be recorded.
	go func() { record <- urls }()

	go RetrieveContent(pending, retrieved)
	go ParseForLinks(retrieved, record)

	// Run the code for 5 seconds.
	// Again, I'm lazy and this is a contrived example.
	time.Sleep(time.Second * 5)
}
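The Sleep is admitted laziness; for completeness, here is one hedged sketch of how a stage could support explicit shutdown instead, using the common done-channel pattern (the done parameter and this variant are my additions, not the original design):

// A cancellable variant of RetrieveContent: it returns as soon as done is
// closed, even while blocked on a channel operation.
func RetrieveContentWithCancel(done <-chan struct{}, urls <-chan string, retrieved chan<- Page) {
	for {
		select {
		case <-done:
			return
		case url := <-urls:
			resp, err := http.Get(url)
			if err != nil {
				continue
			}
			select {
			case retrieved <- Page{address: url, content: resp.Body}:
			case <-done:
				resp.Body.Close()
				return
			}
		}
	}
}

main would then make one done channel, hand it to every stage, and close(done) when it is time to wind the pipeline down; every stage unblocks and returns.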