A Tiny Recap
In the earlier section, Part 4, we saw the serial and parallel versions of our image processing program.
Now let us see if we can make our parallel version even better.
Hope you're as excited as Kevin!
Worker Pool
If we take a look at our previous main function, we see that we read the image and then pass it to the Save function. So until we read the image and prepare the Image value, the Save function has to wait, and the next image isn't prepared until then.
What if we do this asynchronously in a goroutine, and have a channel which gives a steady input of images? Let’s see what happens.
Let's create another function, inputImages, which takes a *sync.WaitGroup, assigns it to every image we create, and returns a <-chan Image.
If we haven't discussed channels of custom struct types before, this is a good example: we can initialize a channel with any type we define.
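As a minimal standalone sketch (using a trimmed-down, hypothetical version of our Image type purely for illustration), here is what a channel of a custom struct looks like:

package main

import "fmt"

// Trimmed-down struct, purely for illustration.
type Image struct {
	OutputPath string
}

func main() {
	images := make(chan Image, 1) // a channel carrying our own struct type
	images <- Image{OutputPath: "out.png"}
	fmt.Println((<-images).OutputPath) // prints "out.png"
}

With that in mind, here is the real inputImages function: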
func inputImages(wg *sync.WaitGroup) <-chan Image {
	images := make(chan Image)
	go func(wg *sync.WaitGroup) {
		for i := 1; i < 3000; i++ {
			filePath := fmt.Sprintf("/Users/g14a/Downloads/train/documents/%d.png", i)
			outputFilePath := fmt.Sprintf("/Users/g14a/Downloads/train/documents/output/%d.png", i)
			img, err := imgio.Open(filePath)
			if err != nil {
				fmt.Println(err)
				continue // skip this image instead of sending a nil image down the channel
			}
			rImg := Image{
				InputPath:  filePath,
				OutputPath: outputFilePath,
				Img:        img,
				Wg:         wg,
			}
			images <- rImg
		}
		close(images)
	}(wg)
	return images
}
Notice that we close() the images channel after the for loop.
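Closing the channel matters because a for ... range loop over a channel only exits once the channel is closed and drained. Here is a minimal standalone sketch (with a hypothetical jobs channel) of that behaviour, which is exactly what our workers below rely on:

package main

import "fmt"

func main() {
	jobs := make(chan int)
	go func() {
		for i := 1; i <= 3; i++ {
			jobs <- i
		}
		close(jobs) // without this, the range below would block forever
	}()
	for j := range jobs { // exits once jobs is closed and drained
		fmt.Println("got job", j)
	}
}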
We now implement a worker pool of 100 workers, which take images returned by the inputImages function and start saving them.
func worker(imgChan <-chan Image, wg *sync.WaitGroup, workerWg *sync.WaitGroup) {
	defer workerWg.Done()
	for img := range imgChan {
		wg.Add(1)
		go img.save()
	}
}
Now our main function becomes much simpler.
func main() {
	start := time.Now()
	var wg, workerWg sync.WaitGroup
	images := inputImages(&wg)
	for i := 0; i < 100; i++ {
		workerWg.Add(1)
		go worker(images, &wg, &workerWg)
	}
	workerWg.Wait()
	wg.Wait()
	fmt.Println("Time elapsed", time.Since(start))
}
What happens here is that inputImages spins up a goroutine and returns a channel of images ready to be processed. Now there is nothing blocking the second image from being prepared while the first one is still being worked on.
Let’s see how this program performs.
$ > time ./img-proc
2021/04/15 22:32:36 profile: memory profiling enabled (rate 1), mem.pprof
Time elapsed 2m38.151913114s
2021/04/15 22:35:14 profile: memory profiling disabled, mem.pprof
./img-proc 1666.75s user 15.45s system 1058% cpu 2:38.94 total
This looks a lot better. We have come down from 3m34s in the earlier section to 2m38s now.
Memory Profile
Let's add pprof to our Go program, run it again, and look at the memory profile and its flame graph.
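The "profile: memory profiling enabled (rate 1), mem.pprof" lines in the output above match the github.com/pkg/profile wrapper, so the instrumentation presumably looks something like this (a sketch assuming that package; the exact options are an assumption):

package main

import "github.com/pkg/profile"

func main() {
	// Write an allocation profile to mem.pprof in the working directory.
	// MemProfileRate(1) records every allocation, which is costly but thorough.
	defer profile.Start(profile.MemProfile, profile.MemProfileRate(1), profile.ProfilePath(".")).Stop()

	// ... the image processing work from above goes here ...
}

The flame graph can then be viewed with go tool pprof -http=:8080 mem.pprof.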
We observe the following from the graph and our code in general:
- imgio.PNGEncoder() is allocated again and again.
- filePath and outputFilePath are initialized for every image.
- img and rImg are initialized for every image.
- Printing known logs, i.e. fmt.Println(), adds I/O overhead on the terminal when the data set is as large as 3k images.
- We don't need InputPath in our Image type unless it is logged.
We make these modifications to reduce memory allocations, and our final program is updated as shown under the next heading.
Final Parallel Version
This is how our final program looks:
package main

import (
	"fmt"
	"github.com/anthonynsimon/bild/adjust"
	"github.com/anthonynsimon/bild/imgio"
	"image"
	"sync"
	"time"
)

func main() {
	start := time.Now()

	var wg sync.WaitGroup
	images := inputImages(&wg)

	var workerWg sync.WaitGroup
	// Create the PNG encoder once and share it across all workers.
	encoder := imgio.PNGEncoder()
	for i := 0; i < 100; i++ {
		workerWg.Add(1)
		go worker(images, &wg, &workerWg, encoder)
	}

	workerWg.Wait()
	wg.Wait()
	fmt.Println("Time elapsed", time.Since(start))
}

func worker(imgChan <-chan Image, wg *sync.WaitGroup, workerWg *sync.WaitGroup, encoder imgio.Encoder) {
	defer workerWg.Done()
	for img := range imgChan {
		wg.Add(1)
		go img.save(encoder)
	}
}

func inputImages(wg *sync.WaitGroup) <-chan Image {
	images := make(chan Image)

	// Declared once and reused across iterations to cut down on allocations.
	var filePath, outputFilePath string
	var img image.Image
	var rImg Image
	var err error

	go func(wg *sync.WaitGroup) {
		for i := 1; i < 3000; i++ {
			filePath = fmt.Sprintf("/Users/g14a/Downloads/train/documents/%d.png", i)
			outputFilePath = fmt.Sprintf("/Users/g14a/Downloads/train/documents/output/%d.png", i)
			img, err = imgio.Open(filePath)
			if err != nil {
				fmt.Println(err)
				continue // skip this image; returning here would leave the channel unclosed
			}
			rImg = Image{
				OutputPath: outputFilePath,
				Img:        img,
				Wg:         wg,
			}
			images <- rImg
		}
		close(images)
	}(wg)

	return images
}

func (i Image) save(encoder imgio.Encoder) {
	defer i.Wg.Done()
	brightnessDone, contrastDone := make(chan bool), make(chan bool)
	go i.brightness(brightnessDone, contrastDone)
	<-brightnessDone
	<-contrastDone
	if err := imgio.Save(i.OutputPath, i.Img, encoder); err != nil {
		fmt.Println(err)
		return
	}
}

func (i Image) brightness(brightnessDone chan<- bool, contrastDone chan<- bool) {
	i.Img = adjust.Brightness(i.Img, 0.25)
	brightnessDone <- true
	go i.contrast(contrastDone)
}

func (i Image) contrast(contrastDone chan<- bool) {
	i.Img = adjust.Contrast(i.Img, 0.25)
	contrastDone <- true
}

type Image struct {
	OutputPath string
	Img        image.Image
	Wg         *sync.WaitGroup
}
We get the following output when we run this:
$ > time ./img-proc
Time elapsed 2m37.824414114s
./img-proc 1584.74s user 11.74s system 1008% cpu 2:38.37 total
Oops, it didn't get any better! But we can see that the CPU usage came down from 1058% to 1008%, so that counts, I guess? Well, we tried our best :) Let me know how I can make this better.
We end this section here, having made a task that took around 20 minutes in its serial version almost 7 times faster, just by using channels and cutting down redundant memory allocations.
Conclusion
Hope you enjoyed the full exercise and how we went about optimising it.
Please reach out to me via email (or any of the social media linked below) if you think I haven't covered something you consider important.
Thank you 😁