Go Concurrency à la Rob Pike
December 18, 2014
After watching Rob Pike's talk, "Concurrency is not Parallelism", I wanted to take his gopher example and write a program that closely follows his model.
My example program comes from rewriting this website in Go: it is the part that fetches the photos in my Flickr account that I have tagged with "jtccom" so they can be used as header images. It uses the Flickr JSON API, which is pretty easy to use.
Using the Flickr API for this takes three separate web calls, which makes it a good fit for Go's concurrency style. The first step is to get the photos from my account tagged with "jtccom". This returns an array of photo IDs and photo Secrets. To get the URL for a photo, you first have to get its sizes. This is a separate call to the Flickr API, which returns an array of, among other things, Label and Source. Source is the URL, Label is the size name. In this case I'm only interested in the Original size, which has the "Original" label. The last step is to download the content that Source points to for the "Original" size.
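To make the second step concrete, here is a minimal sketch of picking the "Original" Source out of a sizes response. The nesting ("sizes", then "size", then "label" and "source") follows the struct tags used in flickr.go further down; the payload itself is made-up sample data with placeholder example.com URLs, not a real Flickr response, and sourceFor is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Size mirrors one entry of the "size" array: a label and the image URL.
type Size struct {
	Label  string `json:"label"`
	Source string `json:"source"`
}

// sizesResponse wraps the nested "sizes" -> "size" structure.
type sizesResponse struct {
	Sizes struct {
		Size []Size `json:"size"`
	} `json:"sizes"`
}

// sourceFor (hypothetical helper) decodes a sizes payload and returns the
// Source of the first size whose label matches, ignoring case.
func sourceFor(payload []byte, label string) (string, error) {
	var resp sizesResponse
	if err := json.Unmarshal(payload, &resp); err != nil {
		return "", err
	}
	for _, sz := range resp.Sizes.Size {
		if strings.EqualFold(sz.Label, label) {
			return sz.Source, nil
		}
	}
	return "", fmt.Errorf("no size labelled %q", label)
}

// samplePayload is invented data in the shape described above.
const samplePayload = `{"sizes":{"size":[
	{"label":"Square","source":"https://example.com/photo_s.jpg"},
	{"label":"Original","source":"https://example.com/photo_o.jpg"}
]}}`

func main() {
	src, err := sourceFor([]byte(samplePayload), "Original")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(src)
}
```

Returning an error when the label is missing is a choice made for this sketch; the GetSize method in flickr.go returns an empty Size instead.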
So the idea was to have one goroutine that gets the photos (step 1), another that gets the sizes (step 2), and a third that downloads the content (step 3). Conceptually, this looks like this:
In practice, it runs pretty much in order, because the calls to getPhotosByTag and getPhotoSizes finish long before the downloads do: each file is roughly 9 to 15 MB. But at least getPhotosByTag and getPhotoSizes can largely overlap with each other.
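The three-stage shape can be sketched without any network calls. This is a toy version, not the real program: emit, resolve, download, and runPipeline are hypothetical stand-ins for the three steps, the URLs are placeholders, and the "content" is just a string. As in the real code, each stage takes its channels as arguments and a small wrapper goroutine closes the output when the stage returns.

```go
package main

import "fmt"

// emit stands in for step 1: it sends every ID downstream.
func emit(ids []string, out chan<- string) {
	for _, id := range ids {
		out <- id
	}
}

// resolve stands in for step 2: it turns each ID into a placeholder URL.
func resolve(in <-chan string, out chan<- string) {
	for id := range in {
		out <- "https://example.com/" + id + ".jpg"
	}
}

// download stands in for step 3: it fakes the content instead of fetching it.
func download(in <-chan string, out chan<- []byte) {
	for u := range in {
		out <- []byte("content of " + u)
	}
}

// runPipeline wires the stages together with unbuffered channels. Each
// wrapper goroutine closes its output channel once its stage returns,
// which ends the range loop of the stage after it.
func runPipeline(ids []string) []string {
	idCh := make(chan string)
	urlCh := make(chan string)
	contentCh := make(chan []byte)

	go func() { defer close(idCh); emit(ids, idCh) }()
	go func() { defer close(urlCh); resolve(idCh, urlCh) }()
	go func() { defer close(contentCh); download(urlCh, contentCh) }()

	// The final consumer runs in the calling goroutine, so no WaitGroup
	// is needed here: the loop ends when contentCh is closed.
	var results []string
	for c := range contentCh {
		results = append(results, string(c))
	}
	return results
}

func main() {
	for _, r := range runPipeline([]string{"a", "b", "c"}) {
		fmt.Println(r)
	}
}
```

Because every stage is a single goroutine reading and writing in order, results come out in the same order the IDs went in.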
Code-wise, it looks very similar, just with goroutines, some object-style types, JSON parsing, etc.
For clarity I broke the Flickr-specific calls out into a separate file, but not a separate package. Here's the "flickrsvc.go" file, with private details such as the API key replaced by placeholders.
package main

import (
	"fmt"
	"sync"
	"time"
)

// saveFiles drains the content channel. For now it only reports each
// download; tmp and dest are not used yet.
func saveFiles(tmp, dest string, contents chan PhotoContent) {
	for pc := range contents {
		fmt.Println("Downloaded", pc.Photo.Id, "of size", len(pc.Content))
	}
}

// process runs one pass of the pipeline: photos -> sizes -> content.
func process() {
	var apiKey = "blahblah"
	var userId = "28240873@N07"
	var tag = "jtccom"
	var tmp = "../jtccom/content/tmp_download/"
	var destination = "../jtccom/static/images/backgrounds/"

	procWG := sync.WaitGroup{}
	photos := make(chan Photo)
	sizes := make(chan PhotoWithSizes)
	content := make(chan PhotoContent)

	procWG.Add(3)
	// Each stage closes its output channel when it finishes, which ends
	// the range loop in the next stage.
	go func() {
		getPhotosByTag(tag, apiKey, userId, photos)
		close(photos)
		procWG.Done()
	}()
	go func() {
		getPhotoSizes(apiKey, photos, sizes)
		close(sizes)
		procWG.Done()
	}()
	go func() {
		downloadPhotos("Original", sizes, content)
		close(content)
		procWG.Done()
	}()

	// saveFiles returns once content has been closed.
	saveFiles(tmp, destination, content)
	fmt.Println("wait procWG")
	procWG.Wait()
}

func main() {
	for {
		fmt.Println("going")
		process()
		fmt.Println("wait wg")
		fmt.Println("Sleeping")
		time.Sleep(3 * time.Second)
	}
}
And here is the output:
C:\Users\jconnell\Documents\go\src\jtccom.flickrsvc>jtccom.flickrsvc.exe
going
Downloaded 14685510038 of size 9867146
Downloaded 14465862480 of size 11279714
Downloaded 14649298391 of size 9423168
Downloaded 14076004795 of size 8925512
Downloaded 13936652032 of size 14851399
Downloaded 12076007194 of size 14099167
Downloaded 11678436824 of size 9671802
Downloaded 11507180674 of size 13510941
Downloaded 11507190024 of size 11963353
Downloaded 11412952753 of size 13030709
Here is flickr.go (although it doesn't matter what it's called).
package main

import (
	"encoding/json"
	"io/ioutil"
	"net/http"
	"net/url"
	"strings"
)

// Response is the top-level reply to flickr.photos.search.
type Response struct {
	Wrap Photos `json:"photos"`
}

type Photos struct {
	Photo []Photo `json:"photo"`
}

type Photo struct {
	Id     string `json:"id"`
	Secret string `json:"secret"`
}

type SizeArray []Size

// GetSize returns the size whose label matches (ignoring case), or the
// zero Size if there is none.
func (sizeArray SizeArray) GetSize(label string) Size {
	for _, sz := range sizeArray {
		if strings.EqualFold(sz.Label, label) {
			return sz
		}
	}
	return Size{}
}

// SizesResponse is the top-level reply to flickr.photos.getSizes.
type SizesResponse struct {
	Wrap Sizes `json:"sizes"`
}

type Sizes struct {
	Sizes SizeArray `json:"size"`
}

type Size struct {
	Label  string `json:"label"`
	Source string `json:"source"`
}

// PhotoWithSizes is what step 2 passes on to step 3.
type PhotoWithSizes struct {
	Photo *Photo
	Sizes SizeArray
}

// PhotoContent is what step 3 passes on to saveFiles.
type PhotoContent struct {
	Photo   *Photo
	Content []byte
}

// getPhotosByTag searches the user's photos for the tag and sends each
// result to pchan. The caller closes pchan.
func getPhotosByTag(tag, apiKey, userId string, pchan chan Photo) {
	qs := url.Values{}
	qs.Add("method", "flickr.photos.search")
	qs.Add("api_key", apiKey)
	qs.Add("user_id", userId)
	qs.Add("tags", tag)
	qs.Add("format", "json")
	qs.Add("nojsoncallback", "1")
	flickrUrl := "https://api.flickr.com/services/rest/?" + qs.Encode()

	resp, err := http.Get(flickrUrl)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	photos := Response{}
	// Check the decode error instead of silently sending nothing.
	if err := json.NewDecoder(resp.Body).Decode(&photos); err != nil {
		panic(err)
	}
	for _, p := range photos.Wrap.Photo {
		pchan <- p
	}
}

// downloadPhotos fetches the image behind the Source URL of the requested
// size for each photo and sends the bytes downstream.
func downloadPhotos(sizeLabel string, download chan PhotoWithSizes, downloaded chan PhotoContent) {
	for p := range download {
		// Named source rather than url to avoid shadowing the url package.
		source := p.Sizes.GetSize(sizeLabel).Source
		resp, err := http.Get(source)
		if err != nil {
			panic(err)
		}
		content, err := ioutil.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			panic(err)
		}
		downloaded <- PhotoContent{Photo: p.Photo, Content: content}
	}
}

// getPhotoSizes looks up the available sizes for each incoming photo and
// sends the photo together with its sizes downstream.
func getPhotoSizes(apiKey string, photos chan Photo, photoSizes chan PhotoWithSizes) {
	for p := range photos {
		// Copy the loop variable: before Go 1.22, p is reused on every
		// iteration, so &p would alias a value that later iterations
		// overwrite while downstream stages are still reading it.
		photo := p

		qs := url.Values{}
		qs.Add("method", "flickr.photos.getSizes")
		qs.Add("api_key", apiKey)
		qs.Add("photo_id", photo.Id)
		qs.Add("format", "json")
		qs.Add("nojsoncallback", "1")
		sizesUrl := "https://api.flickr.com/services/rest/?" + qs.Encode()

		resp, err := http.Get(sizesUrl)
		if err != nil {
			panic(err)
		}
		sizeResp := SizesResponse{}
		err = json.NewDecoder(resp.Body).Decode(&sizeResp)
		resp.Body.Close()
		if err != nil {
			panic(err)
		}
		photoSizes <- PhotoWithSizes{Photo: &photo, Sizes: sizeResp.Wrap.Sizes}
	}
}
I ran into some problems along the way. At first I had the Flickr methods return channels, and that wasn't working. I also had to experiment with buffered vs. unbuffered channels, internal sync.WaitGroups, and other things that didn't work out so well. I will play around with this more, since apparently you can use a WaitGroup without using channels at all, and I definitely want to understand why the things I tried initially weren't working.
But it's working now. What remains is to save each image to the destination folder and to check whether it was already downloaded. Note for future me: a good way to do this would be a func that takes a channel and sends to another channel only the files that haven't been downloaded yet, in keeping with the channel-passing approach I've used so far.
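For what it's worth, such a filtering stage might look something like the sketch below. Everything here is an assumption rather than finished code: filterNew and pendingIDs are hypothetical names, Photo is cut down to the one field the sketch needs, and the decision about whether a photo is already downloaded is hidden behind an alreadyHave callback (a real version might check for a file in the destination folder).

```go
package main

import "fmt"

// Photo carries only the field this sketch needs.
type Photo struct {
	Id string
}

// filterNew (hypothetical) forwards only the photos for which alreadyHave
// reports false. It does not close out; the caller owns that, in the same
// way process() closes the channels of the other stages.
func filterNew(in <-chan Photo, out chan<- Photo, alreadyHave func(id string) bool) {
	for p := range in {
		if !alreadyHave(p.Id) {
			out <- p
		}
	}
}

// pendingIDs feeds a fixed list of IDs through filterNew and collects
// the IDs that make it through.
func pendingIDs(ids []string, alreadyHave func(id string) bool) []string {
	in := make(chan Photo)
	out := make(chan Photo)

	go func() {
		defer close(in)
		for _, id := range ids {
			in <- Photo{Id: id}
		}
	}()
	go func() {
		defer close(out)
		filterNew(in, out, alreadyHave)
	}()

	var pending []string
	for p := range out {
		pending = append(pending, p.Id)
	}
	return pending
}

func main() {
	// Pretend the first photo is already on disk.
	have := map[string]bool{"14685510038": true}
	pending := pendingIDs(
		[]string{"14685510038", "14465862480"},
		func(id string) bool { return have[id] },
	)
	fmt.Println(pending)
}
```

In the real program this stage would presumably sit between getPhotoSizes and downloadPhotos, so that the large downloads are skipped for files that already exist.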