Translating Effective Python into Go: Know When to Use Channels for Generator-Like Functions
A Python function can be a generator that returns an iterator of dynamically constructed values. A generator function acts like a coroutine. Each time a new value is requested from the iterator, the thread of control is passed to the generator so it can create the next result. Once the generator produces a value via the yield expression, control is restored to the iterator's consumer so that it can act on the new data.
Go's channel type is a powerful synchronization primitive that acts as a producer-consumer queue. Go also supports lightweight concurrent functions called goroutines, which can safely communicate via channels. Though Go's syntax is different from Python's, channels and goroutines can be used together to provide behavior that's very similar to Python's generator functions.
This means that the best practices for generators in Python can easily translate into best practices for Go. This article explores Item #16 ("Consider Generators Instead of Returning Lists") from my book Effective Python: 59 Specific Ways to Write Better Python, and how my advice for Python generators can be applied to Go programs as well.
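To make the translation concrete before diving in, here's a minimal, self-contained sketch (my own, not from the book) of the Go pattern this article builds on: a goroutine sends values into a channel that it closes when finished, and the caller consumes them with range, much as a Python caller would loop over a generator:
// Go
package main

import "fmt"

// count produces the integers 0..n-1 on a channel, generator-style.
func count(n int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out) // signal the consumer that no more values are coming
        for i := 0; i < n; i++ {
            out <- i
        }
    }()
    return out
}

func main() {
    for v := range count(3) {
        fmt.Println(v) // prints 0, 1, 2 on separate lines
    }
}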
You can find the source code for the examples below here on GitHub.
A Motivating Example
Say you want to read some comma-separated value (CSV) data that contains pairs of x and y coordinates as floating-point numbers. In Python, you'd do this with a plain function that uses the csv built-in module:
# Python
import csv

def load_csv_data(stream):
    result = []
    for row in csv.reader(stream):
        if len(row) != 2:
            raise ValueError('Rows must have two entries')
        point = float(row[0]), float(row[1])
        result.append(point)
    return result
This function reads the input data using csv.reader. It verifies that each row contains only two columns of values. It parses the strings from each column as floating-point numbers. It uses the tuple (x, y) to represent each point that was found in the input CSV. It accumulates the parsed points into a list. Finally, it returns the full list of points to the caller.
The code that uses this function is straightforward:
# Python
import io

data = '1.0,2.5\n3.5,4.1\n7.5,2.2\n6.9,1.1\n'
rows = load_csv_data(io.StringIO(data))
for i, row in enumerate(rows):
    print('Row %d is %r' % (i, row))
>>>
Row 0 is (1.0, 2.5)
Row 1 is (3.5, 4.1)
Row 2 is (7.5, 2.2)
Row 3 is (6.9, 1.1)
Writing the same program in Go is similarly easy. First, I define a struct that will contain the data for the points after they're parsed:
// Go
type Point struct {
    X, Y float64
}
Second, I define a function that parses two columns of CSV data into Point instances:
// Go
func recordToPoint(record []string) (p Point) {
    if len(record) != 2 {
        return
    }
    p.X, _ = strconv.ParseFloat(record[0], 64)
    p.Y, _ = strconv.ParseFloat(record[1], 64)
    return
}
Third, I write a function to read all of the CSV data into slices, convert those slices into Point instances, and then return the full slice of points that were found:
// Go
func LoadCsvData(in io.Reader) (result []Point) {
    reader := csv.NewReader(in)
    records, _ := reader.ReadAll()
    for _, record := range records {
        point := recordToPoint(record)
        result = append(result, point)
    }
    return
}
The code that uses this function is also simple:
// Go
data := "1.0,2.5\n3.5,4.1\n"
points := LoadCsvData(strings.NewReader(data))
for i, point := range points {
fmt.Printf("Row %d is %v", i, point)
}
---------------------------------
Row 0 is {1 2.5}
Row 1 is {3.5 4.1}
Row 2 is {7.5 2.2}
Row 3 is {6.9 1.1}
Though these functions are short and easy to read, one big problem is common to both the Python and Go versions: These programs are unbounded in their potential memory usage. If the input stream is large enough, the programs will accumulate point data until they run out of memory and crash.
The solution to this problem is to move to generators, which let you avoid accumulating values before returning them. For these functions, using generators ensures that memory usage is limited to a single point at a time (along with any buffers for the incoming data stream, and the stack space for the generator). As soon as a new point has been sent to the consumer of the generator, its memory is no longer held by the generator function.
Creating Generators
The big difference between implementing generators in Python and implementing generator-like functions in Go is explicitness. In Python, a function becomes a generator implicitly as soon as you use a yield expression in its body. Here, I've rewritten the earlier function as a generator by removing the result list and changing the result.append call into a yield expression:
# Python
def load_csv_data_streaming(stream):
    for row in csv.reader(stream):
        if len(row) != 2:
            raise ValueError('Rows must have two entries')
        point = float(row[0]), float(row[1])
        yield point
Calling a generator function immediately returns an iterator and doesn't actually execute the function's body. Each time the iterator is advanced (e.g., by the next built-in function), the generator function will execute until the next yield statement is reached or the function exits. Python will interleave the generator's execution with the execution of the code that consumes the generator, in the same way you'd expect cooperative threads to work together.
The generator version of a function is often a drop-in replacement for the version that returns a list. Python's looping constructs interact with any type of iterable (lists, dictionaries, iterators, etc.) in the same way. The only change required to use the generator function is the call to load_csv_data_streaming:
# Python
data = '1.0,2.5\n3.5,4.1\n7.5,2.2\n6.9,1.1\n'
rows = load_csv_data_streaming(io.StringIO(data))
for i, row in enumerate(rows):
    print('Row %d is %r' % (i, row))
>>>
Row 0 is (1.0, 2.5)
Row 1 is (3.5, 4.1)
Row 2 is (7.5, 2.2)
Row 3 is (6.9, 1.1)
In Go, you must explicitly wire up the generator-like behavior:
// Go
func LoadCsvDataToChannel(in io.Reader) <-chan Point {
    out := make(chan Point)
    go func() {
        defer close(out)
        reader := csv.NewReader(in)
        for {
            record, err := reader.Read()
            if err == io.EOF {
                return
            }
            point := recordToPoint(record)
            out <- point
        }
    }()
    return out
}
This function creates a channel instance (here called out). It kicks off a goroutine that asynchronously populates the channel with Point instances (using the <- operator). Then it returns the channel as an output channel that can only be used to receive values. Importantly, the interior goroutine is also responsible for closing the channel when all output has finished. This task is accomplished with the defer close(out) statement, which triggers when the interior goroutine finally exits.
Using the generator-like version of this function, the code that consumes the channel is almost as simple as before (I only had to add a loop counter):
// Go
results := LoadCsvDataToChannel(strings.NewReader(data))
i := 0
for point := range results {
    fmt.Printf("Row %d is %v\n", i, point)
    i++
}
---------------------------------
Row 0 is {1 2.5}
Row 1 is {3.5 4.1}
Row 2 is {7.5 2.2}
Row 3 is {6.9 1.1}
Unlike Python generators, the interior goroutine's execution doesn't wait for the consuming code. The goroutine is scheduled by the Go runtime to run independently. It may or may not be interleaved with the code that consumes the channel—you don't know, and you shouldn't care. The channel takes care of this coordination for you.
The make(chan Point) call above creates a channel with no buffer. That means the goroutine populating the channel will run only until it tries to send the first value into the channel (again, using the <- operator). At that point, the goroutine will block until the consuming code receives the value (with the range clause in the for loop).
Similarly, if the consumer tries to receive from the channel and a value is not yet ready, it will block until the producing goroutine sends another value into the channel or closes the channel to indicate there is no more data.
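These blocking semantics are what bound the program's memory usage. If you want the producing goroutine to be able to run ahead of the consumer, you can give the channel a buffer. Here's a minimal, stand-alone sketch (not part of the CSV example) showing how a buffer changes the coordination:
// Go
package main

import "fmt"

func main() {
    out := make(chan int, 2) // buffered: holds up to 2 values
    out <- 1                 // doesn't block: the buffer has room
    out <- 2                 // doesn't block: the buffer is now full
    // A third send here would block until the consumer receives a value.
    fmt.Println(<-out) // prints 1, freeing a slot in the buffer
    fmt.Println(<-out) // prints 2
}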
Though the mechanism is quite different, the resulting behavior of channels and goroutines in Go feels much like Python's generator functions.
Handling Errors
The code examples I've discussed so far overlook a very important reality of writing correct programs: handling errors. In practice, there are many opportunities for things to go wrong while processing the CSV input data, including poorly formatted numbers, not enough columns, and truncated streams.
Python deals with errors by raising Exception instances that propagate up through the call stack. Unfortunately, Python's dynamic nature means the compiler can't check or enforce which types of exceptions are raised by a function. You have to inspect the code yourself, read documentation of behaviors, and make your best guess. In this case, the possible sources of exceptions are as follows:
- Issues that occur while reading from the input stream
- Errors decoding the characters received from the input
- CSV rows that don't have exactly two columns
- Any failures in parsing the floating-point numbers from the columns
When a generator raises an exception, it propagates into the consuming code at the point where the generator's iterator was advanced. Practically speaking, this means the earlier generator example must be rewritten so the consuming code catches any exceptions that may occur within the for statement:
# Python
it = load_csv_data_streaming(stream)
try:
    for i, row in enumerate(it):
        print('Row %d is %r' % (i, row))
except (ValueError, IOError):
    raise MyException('Broke reading CSV')
However, this approach is generally considered bad style because it puts too much code inside the try block. If you hit other exceptions when running the interior of the loop, such as the print function call, those exceptions may be caught inadvertently by the except block, which was only meant to guard against problems within the load_csv_data_streaming generator.
To fix this issue, you can manually advance the iterator with the next built-in function, catch any exceptions that are raised by doing so, and then execute the print function outside of the try block:
# Python
it = enumerate(load_csv_data_streaming(stream))
while True:
    try:
        i, row = next(it)
    except StopIteration:
        break
    except (ValueError, IOError) as e:
        raise MyException('Broke after row') from e
    else:
        print('Row %d is %r' % (i, row))
With Python, you have to handle exceptions primarily on the consuming side of the generator, not on the producing side. In this case, the load_csv_data_streaming function doesn't have to change. All I've had to add is a try/except block in the code that drives the generator function.
In contrast, Go encourages explicit error-handling at every level and in every function. This convention translates into more error-handling code within the lower-level functions, but less error-handling code at the point of consumption.
The previous Go examples swallowed errors, which can lead to odd behaviors like zero-valued Point instances being returned or runtime panics. For good style, I need to rewrite the code to propagate errors up to callers.
I start by making the parsing function handle and return parsing errors:
// Go
func recordToPoint(record []string) (p Point, err error) {
    if len(record) != 2 {
        err = fmt.Errorf("records must have two columns")
        return
    }
    if p.X, err = strconv.ParseFloat(record[0], 64); err != nil {
        return
    }
    if p.Y, err = strconv.ParseFloat(record[1], 64); err != nil {
        return
    }
    return
}
That's easy enough, but adding error-handling logic to the generator-like version of this function, LoadCsvDataToChannel, is much more difficult than handling errors from the Python generator function load_csv_data_streaming.
In Python, the generator potentially raises exceptions to the caller each time its iterator is advanced by the next built-in function. This behavior gives Python two paths for communicating information to the consuming code: the return value of next, and any exceptions that it may raise.
The explicit nature of error-handling in Go means that there isn't a second path for communicating errors to the consuming code. Instead, errors are explicitly communicated through plain return values. For LoadCsvDataToChannel, this means that errors must be passed through the result channel itself.
To make this approach work, I need to augment the type of Point with any error that may have occurred while trying to produce it. I do this by declaring a new struct that embeds the Point struct:
// Go
type PointOrErr struct {
    Point
    Err error
}
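One convenient property of embedding Point (rather than declaring a named field of type Point) is that the inner struct's fields are promoted, so code holding a PointOrErr can still reach the coordinates directly. A quick sketch of what the compiler allows:
// Go
v := PointOrErr{Point: Point{X: 1.0, Y: 2.5}}
fmt.Println(v.X)       // prints 1; X is promoted from the embedded Point
fmt.Println(v.Point.Y) // prints 2.5; the fully qualified form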
Then I rewrite the generator version of this function to send error instances into the channel through the PointOrErr.Err field:
// Go
func LoadCsvDataToChannel(in io.Reader) <-chan PointOrErr {
    out := make(chan PointOrErr)
    go func() {
        defer close(out)
        reader := csv.NewReader(in)
        for {
            record, err := reader.Read()
            if err == io.EOF {
                return
            }
            if err != nil {
                out <- PointOrErr{Err: err}
                return
            }
            point, err := recordToPoint(record)
            if err != nil {
                out <- PointOrErr{Err: err}
                return
            }
            out <- PointOrErr{Point: point}
        }
    }()
    return out
}
Finally, I change the consuming code to handle any error instances found in PointOrErr values that were received from the channel:
// Go
results := LoadCsvDataToChannel(strings.NewReader(data))
i := 0
for point := range results {
    if point.Err != nil {
        panic(point.Err)
    }
    fmt.Printf("Row %d is %v\n", i, point)
    i++
}
---------------------------------
Row 0 is {{1 2.5} <nil>}
Row 1 is {{3.5 4.1} <nil>}
Row 2 is {{7.5 2.2} <nil>}
Row 3 is {{6.9 1.1} <nil>}
Now the Go implementation of the generator-like function is robust. It's also easy to see that all errors are received and properly handled by the consuming code.
Constructing Pipelines
One of the biggest advantages of using generators is that you can connect them together to build pipelines. For example, you could easily create another function that consumes a generator of point data and outputs another generator yielding the distance between each successive pair of points. Because you're using generators, you can be sure that the maximum working memory for such a function is only its working set (two points), not the full stream.
Here's such a generator implemented in Python:
# Python
import math

def distance_stream(it):
    x, y = next(it)
    for a, b in it:
        yield math.sqrt((a - x) ** 2 + (b - y) ** 2)
        x, y = a, b
Using this new generator is easy:
# Python
stream = io.StringIO(data)
it = load_csv_data_streaming(stream)
for i, distance in enumerate(distance_stream(it)):
    print('Move %d was %f far' % (i, distance))
>>>
Move 0 was 2.968164 far
Move 1 was 4.428318 far
Move 2 was 1.252996 far
Providing a similar generator-like function in Go is straightforward. As before, I have to define a new struct that contains both the results of the distance formula and any errors that may have occurred:
// Go
type DistanceOrErr struct {
    Distance float64
    Err      error
}
Then, I follow the same pattern as before to create a channel and populate its results with a goroutine. This function is different from LoadCsvDataToChannel in that it consumes one channel and populates another instead of reading through a byte stream:
// Go
func PointDistanceToChannel(in <-chan PointOrErr) <-chan DistanceOrErr {
    out := make(chan DistanceOrErr)
    go func() {
        defer close(out)
        p := <-in
        if p.Err != nil {
            // Without a valid first point there's no baseline for
            // distances, so report the error and stop.
            out <- DistanceOrErr{Err: p.Err}
            return
        }
        for q := range in {
            if q.Err != nil {
                out <- DistanceOrErr{Err: q.Err}
                continue
            }
            dx := math.Pow(q.X-p.X, 2)
            dy := math.Pow(q.Y-p.Y, 2)
            distance := math.Sqrt(dx + dy)
            out <- DistanceOrErr{Distance: distance}
            p = q
        }
    }()
    return out
}
Connecting this new function to the output of LoadCsvDataToChannel is simple, and the consuming code is easy to read:
// Go
pointStream := LoadCsvDataToChannel(strings.NewReader(data))
distances := PointDistanceToChannel(pointStream)
i := 0
for distance := range distances {
    if distance.Err != nil {
        panic(distance.Err)
    }
    fmt.Printf("Move %d was %f far\n", i, distance.Distance)
    i++
}
---------------------------------
Move 0 was 2.968164 far
Move 1 was 4.428318 far
Move 2 was 1.252996 far
In both Python and Go, this pattern of connecting generators can be continued for many layers of functions. This technique allows you to join small pieces into large pipelines that produce complex outcomes, while still being easy to understand.
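For instance, here's a hypothetical third stage (my own addition; the name and threshold parameter aren't from the original example code) that follows the same channel-in, channel-out pattern to pass along only the moves longer than a minimum distance:
// Go
func LongMovesOnly(in <-chan DistanceOrErr, min float64) <-chan DistanceOrErr {
    out := make(chan DistanceOrErr)
    go func() {
        defer close(out)
        for d := range in {
            // Always forward errors; filter out the short moves.
            if d.Err != nil || d.Distance >= min {
                out <- d
            }
        }
    }()
    return out
}
It would drop into the existing pipeline with a single extra call, such as distances := LongMovesOnly(PointDistanceToChannel(pointStream), 2.0).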
Conclusion
Generators allow you to write more scalable versions of functions that can be used in many different situations.
The biggest practical limitation of generators is that each item they produce can be accessed only a single time. If you have to iterate over a generator's full result data multiple times (e.g., to compute a median value), holding all of the function's results in memory might make more sense.
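In the Go version, one escape hatch (a sketch, assuming the PointOrErr types from above) is to drain the channel into a slice first, accepting unbounded memory usage again in exchange for repeatable traversal:
// Go
var all []PointOrErr
for p := range LoadCsvDataToChannel(strings.NewReader(data)) {
    all = append(all, p) // accumulates every result; memory is unbounded again
}
// The all slice can now be traversed as many times as needed.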
The added complexity of writing generators in Python is almost zero, and often generator functions are easier to read because the yield expression eliminates superfluous code. Writing generator-like functions in Go is a bit more difficult because you have to directly wire together channels and goroutines to produce the iterative behavior.
However, consuming generator-like functions in both languages is hardly more complex than consuming the slice- or list-based versions. The performance of the generator versions, especially as inputs scale up, is the same or better in both languages. From the perspective of an API consumer, there are few downsides to using the generator version of a function.
Thus, I encourage you to consider using generators in Python and generator-like functions in Go whenever your inputs and outputs may potentially be unbounded.
Thanks to Andy Smith for reviewing an earlier draft of this post.