Unit Testing Go's Channels With the Time Component

I used Java during most of my career, and it has had concurrency support from the early days. It allows developers to start threads and synchronize access to the data shared by more than one thread. However, my general approach to concurrency in Java had always been to avoid it if at all possible. The reason is that it is a tricky area where pitfalls are common and debugging is complex. It is very hard to know whether a concurrent program is free of bugs because it is non-deterministic. Every run is potentially different.

Go, unlike Java, has built-in support for channels, which somewhat simplifies the concurrency paradigm. Channels make it easier for goroutines (Go's lightweight threads) to exchange data without the need to explicitly synchronize access to it.
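
As a minimal illustration of that idea, one goroutine can hand values to another over a channel without any explicit locking. The helper names produce and collect are made up for this sketch and are not part of the code later in the post.

```go
package main

import "fmt"

// produce sends each word on a new channel from its own goroutine and
// closes the channel when it is done. (Hypothetical helper for this sketch.)
func produce(words []string) <-chan string {
	ch := make(chan string)
	go func() {
		defer close(ch)
		for _, w := range words {
			ch <- w // blocks until the receiver is ready
		}
	}()
	return ch
}

// collect drains the channel until it is closed.
func collect(ch <-chan string) []string {
	var got []string
	for w := range ch {
		got = append(got, w)
	}
	return got
}

func main() {
	fmt.Println(collect(produce([]string{"a", "b", "c"}))) // prints [a b c]
}
```

The channel is the only thing the two goroutines share, so there is no mutex to forget.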

In this post, I will show an example of using channels and goroutines for a real-world problem and explore ways to test code that uses them.

The Problem

Imagine an application that consumes messages from a streaming engine (like Kafka), does some processing of the data and writes the results to a remote service. One of the common requirements for such a system is batching. It means the output is buffered and is written only when the buffer is full. The reason for batching is that each remote call has an overhead and, when a request is small, batching saves quite a lot of hardware resources compared to making each request individually.

We will be handling simple objects called Message that hold just a string:

type Message struct {
   value string
}

We will define a type Aggregator that will read messages from the input channel, buffer them and write them to the output channel when the buffer is full.

type Aggregator struct {
   in         chan Message
   out        chan []Message
   bufferSize int
}

We will later define a method to start a goroutine to process the messages:

func (a *Aggregator) startAggregator()

But first, let's write a unit test that fills the buffer and verifies that the SUT (subject under test) writes the expected batch to the output channel.

func TestBufferFilled(t *testing.T) {
   a := &Aggregator{
      in:         make(chan Message, 2),
      out:        make(chan []Message, 2),
      bufferSize: 2,
   }
   t.Cleanup(func() {
      close(a.in)
      close(a.out)
   })

   a.startAggregator()

   expected := []Message{
      {value: "a"},
      {value: "b"},
   }
   a.in <- expected[0]
   a.in <- expected[1]
   actual := <-a.out

   assert.Equal(t, expected, actual)
}

We first create an Aggregator struct, supply it with the input and output channels and set the buffer size to 2. Note that both channels are buffered, so a send does not block until the channel's capacity is reached and our test does not end in a deadlock when something isn't right. We also want to close the channels after the test is done no matter what the result is, so we put that code in a t.Cleanup callback. After the processing has been kicked off, we write 2 messages into the input channel and read from the output channel, expecting to receive a slice with our 2 messages.

Now let's define the startAggregator function but without any buffering. It will just mirror the input for now.

func (a *Aggregator) startAggregator() {
   go func() {
      buffer := make([]Message, 0)
      for {
         select {
         case m, channelOpen := <-a.in:
            if channelOpen {
               buffer = append(buffer, m)
               a.out <- buffer
               buffer = make([]Message, 0)
            } else {
               return
            }
         }
      }
   }()
}

Our function kicks off a new goroutine that will be reading from the input channel until it gets closed. It saves the read message into the buffer and flushes it by writing the batch of 1 to the output channel and re-creating the buffer.

If we run the TestBufferFilled test now, it will fail because the slice we read contains only the first message. Note that we could make our channels unbuffered too, but then the test would fail with a deadlock: the test blocks while sending the second message, the aggregator blocks while sending the first batch, and Go's runtime finds all goroutines blocked on channel operations.

Let's fix our startAggregator logic now to actually do the batching and flush when the buffer is full. We will omit the code to flush the buffer when the channel is closed.

func (a *Aggregator) startAggregator() {
    go func() {
        buffer := make([]Message, 0)
        for {
            select {
            case m, channelOpen := <-a.in:
                if channelOpen {
                    // channel is open - flush only if the buffer is full
                    buffer = append(buffer, m)
                    if len(buffer) == a.bufferSize {
                        a.out <- buffer
                        buffer = make([]Message, 0)
                    }
                } else {
                    return
                }
            }
        }
    }()
}

If we re-run the test, it succeeds.
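
The version above returns as soon as the input channel is closed, so a partially filled buffer is silently dropped. A minimal sketch of the omitted flush-on-close step follows. It assumes the aggregator goroutine is the only sender on the output channel and closes it itself, which differs from the tests in this post (they close both channels in t.Cleanup). The method name startAggregatorFlushOnClose is made up for illustration.

```go
package main

import "fmt"

type Message struct {
	value string
}

type Aggregator struct {
	in         chan Message
	out        chan []Message
	bufferSize int
}

// startAggregatorFlushOnClose is a hypothetical variant of startAggregator.
// Assumption: this goroutine is the only sender on a.out, so it is safe for
// it to close a.out once a.in has been closed and drained.
func (a *Aggregator) startAggregatorFlushOnClose() {
	go func() {
		defer close(a.out)
		buffer := make([]Message, 0, a.bufferSize)
		for m := range a.in { // the loop ends when a.in is closed and drained
			buffer = append(buffer, m)
			if len(buffer) == a.bufferSize {
				a.out <- buffer
				buffer = make([]Message, 0, a.bufferSize)
			}
		}
		// a.in is closed: flush whatever is left.
		if len(buffer) > 0 {
			a.out <- buffer
		}
	}()
}

func main() {
	a := &Aggregator{
		in:         make(chan Message),
		out:        make(chan []Message, 2),
		bufferSize: 2,
	}
	a.startAggregatorFlushOnClose()

	for _, v := range []string{"a", "b", "c"} {
		a.in <- Message{value: v}
	}
	close(a.in)

	// Receives a full batch of two, then the leftover batch of one.
	for batch := range a.out {
		fmt.Println(len(batch), batch)
	}
}
```

Letting the sender close the output channel also gives the consumer a clean signal that no more batches will arrive.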

In real production code, we would define more test cases to verify both normal and edge conditions. If you think about edge cases, there is one that is not obvious: what happens if the message rate is low? This is the case I came across while troubleshooting a real-world bug. The integration tests sent one message every 5 minutes, and those messages were spread across many Kubernetes pods, so no buffer ever filled up and nothing got flushed.

To fix this issue, let's flush the buffer not only when it is full but on a timer too. It will ensure messages get written within our flush interval even when the traffic is low.

Flushing the Buffer With a Ticker

We will add another startup method that takes a Ticker. A Ticker is a type from Go's time package that provides a channel we can read from to get notified when the time is up. It takes an interval and ticks on that schedule.

func (a *Aggregator) startAggregatorWithTimer(ticker *time.Ticker)
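
For readers who have not used time.Ticker before, here is a small standalone sketch of how its channel behaves. The 10 ms interval and the names countTicks and demoInterval are arbitrary choices for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// demoInterval is an arbitrary tick interval chosen for this sketch.
const demoInterval = 10 * time.Millisecond

// countTicks waits for n ticks from a ticker with the given interval and
// returns how long that took. (Hypothetical helper for this sketch.)
func countTicks(interval time.Duration, n int) time.Duration {
	start := time.Now()
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker's resources when done

	for i := 0; i < n; i++ {
		<-ticker.C // blocks until the next tick is delivered
	}
	return time.Since(start)
}

func main() {
	elapsed := countTicks(demoInterval, 3)
	fmt.Println("waited for 3 ticks:", elapsed)
}
```

The first tick arrives one full interval after the ticker is created, not immediately.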

Our test will now look like this:

func TestFlushOnTimer(t *testing.T) {
   a := &Aggregator{
      in:         make(chan Message, 2),
      out:        make(chan []Message, 2),
      bufferSize: 100,
   }
   t.Cleanup(func() {
      close(a.in)
      close(a.out)
   })

   ticker := time.NewTicker(time.Millisecond * 500)
   a.startAggregatorWithTimer(ticker)

   expected := []Message{
      {value: "a"},
      {value: "b"},
   }
   a.in <- expected[0]
   a.in <- expected[1]

   verifyBatchReceived(t, a.out, expected)
}

The difference from the previous test case is that we set the buffer size to 100 and expect the buffer to be flushed even though it won't get filled. To verify the results, we introduce a new function that listens to the output channel and verifies that the batch is as expected.

func verifyBatchReceived(t *testing.T, out chan []Message, expected []Message) {
   for {
      select {
      case actual := <-out:
         require.Equal(t, expected, actual)
         return
      default:
      }
   }
}

If we copy the implementation of startAggregator into startAggregatorWithTimer, our new test will wait forever because the buffer never gets flushed. This is an example of why testing concurrency is tricky. Let's fix that by limiting the amount of time we wait. The new function uses require.Eventually, which calls our check every 0.2 seconds and gives up after 1 second.

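func verifyBatchReceived(t *testing.T, out chan []Message, expected []Message) {
   require.Eventually(t, func() bool {
      // non-blocking read: report "not yet" if no batch has arrived
      select {
      case actual := <-out:
         // assert instead of require: Eventually runs this function in its
         // own goroutine, where t.FailNow must not be called
         return assert.Equal(t, expected, actual)
      default:
         return false
      }
   }, time.Second*1, time.Millisecond*200)
}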

Now let's implement flushing on a timer.

func (a *Aggregator) startAggregatorWithTimer(ticker *time.Ticker) {
   go func() {
      buffer := make([]Message, 0)
      defer ticker.Stop()
      for {
         // select blocks until a message or a tick is available
         select {
         case m, channelOpen := <-a.in:
            if channelOpen {
               // channel is open - flush only if the buffer is full
               buffer = append(buffer, m)
               if len(buffer) == a.bufferSize {
                  a.out <- buffer
                  buffer = make([]Message, 0)
               }
            } else {
               return
            }
         case <-ticker.C:
            // it's time to flush the buffer
            if len(buffer) > 0 {
               a.out <- buffer
               buffer = make([]Message, 0)
            }
         }
      }
   }()
}

We defer ticker.Stop() to release the ticker's resources and then wait on both our input channel and the ticker's channel. When the ticker fires, we flush if we have at least one message. Notice that the select has no default branch. A select without one blocks until at least one of its cases can proceed, so neither case blocks the other and the loop does not spin while idle. Adding an empty default branch would turn the loop into a busy-wait that keeps a CPU core occupied. If both cases are ready at the same time, Go chooses the one to execute at random. It is also possible to prioritize the cases by having separate selects one after the other.
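
The idea of prioritizing one case with two consecutive selects can be sketched as follows. This is only an illustration, under the assumption that ticks should win over incoming messages when both are ready. The names nextEvent and demo, and the string results, are invented for the example.

```go
package main

import (
	"fmt"
	"time"
)

// nextEvent reports which channel was served. It checks tick first with a
// non-blocking select, so a pending tick always wins. Only when no tick is
// pending does it block on both channels. (Hypothetical helper.)
func nextEvent(in <-chan string, tick <-chan time.Time) string {
	// First select: give the ticker priority without blocking.
	select {
	case <-tick:
		return "tick"
	default:
	}
	// Second select: wait for whichever channel becomes ready first.
	select {
	case <-tick:
		return "tick"
	case m := <-in:
		return "message " + m
	}
}

// demo makes both channels ready and then asks for two events in a row.
func demo() (string, string) {
	in := make(chan string, 1)
	tick := make(chan time.Time, 1)
	in <- "a"
	tick <- time.Now()

	first := nextEvent(in, tick)
	second := nextEvent(in, tick)
	return first, second
}

func main() {
	first, second := demo()
	fmt.Println(first)  // prints tick
	fmt.Println(second) // prints message a
}
```

With a single select, the order of the two results would vary from run to run because both channels are ready at once.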

The Time Component

Our current test cases all work with a small number of messages. Let's add a test that sends many more.

func TestLargeBufferFilled(t *testing.T) {
    maxMessages := 1000
    a := &Aggregator{
        in:         make(chan Message),
        out:        make(chan []Message, maxMessages),
        bufferSize: 100,
    }
    t.Cleanup(func() {
        close(a.in)
        close(a.out)
    })

    ticker := time.NewTicker(time.Millisecond * 500)
    a.startAggregatorWithTimer(ticker)

    expected := make([][]Message, maxMessages)
    for i := 0; i < len(expected); i++ {
        expected[i] = make([]Message, 100)
        for j := 0; j < len(expected[i]); j++ {
            expected[i][j] = Message{value: randomString(256)}
            a.in <- expected[i][j]
        }
    }

    verifyBatchesReceived(t, a.out, expected)
}

This test uses a new function verifyBatchesReceived that verifies multiple batches. You may find that the test actually fails as our flushing on an interval interferes with our expected batches of 100 messages each. How do we deal with this issue?
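
The post does not show verifyBatchesReceived or randomString. One possible shape is sketched below using only the standard library. It is an assumption, not the author's implementation: checkBatches, demoTimeout and the body of randomString are invented here, and the comparison returns an error instead of calling testify so that the sketch runs on its own.

```go
package main

import (
	"fmt"
	"math/rand"
	"reflect"
	"time"
)

type Message struct {
	value string
}

// demoTimeout is an arbitrary per-batch wait chosen for this sketch.
const demoTimeout = 100 * time.Millisecond

// randomString returns n random lowercase letters. (Assumed implementation.)
func randomString(n int) string {
	const letters = "abcdefghijklmnopqrstuvwxyz"
	b := make([]byte, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}

// checkBatches reads len(expected) batches from out and compares them, in
// order, with expected. It gives up if a batch does not arrive within
// timeout, so a missing flush fails the check instead of hanging it.
func checkBatches(out <-chan []Message, expected [][]Message, timeout time.Duration) error {
	for i, want := range expected {
		select {
		case got := <-out:
			if !reflect.DeepEqual(want, got) {
				return fmt.Errorf("batch %d: got %d messages that differ from the %d expected", i, len(got), len(want))
			}
		case <-time.After(timeout):
			return fmt.Errorf("batch %d: nothing received within %v", i, timeout)
		}
	}
	return nil
}

func main() {
	expected := [][]Message{
		{{value: randomString(8)}, {value: randomString(8)}},
		{{value: randomString(8)}},
	}
	out := make(chan []Message, len(expected))
	for _, b := range expected {
		out <- b
	}
	fmt.Println(checkBatches(out, expected, demoTimeout)) // nil error on a match
}
```

Inside a test, verifyBatchesReceived could then reduce to a single require.NoError call on the result of such a helper.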

One way is to play with the parameters to make the code behave the way you'd like it to. For example, we could increase the interval to a large number so that the buffer never gets flushed by the ticker. This is ugly though. In general, tests that rely on a time component are not reliable. They may fail when the machine is busy, e.g. when a lot of tests run in parallel. Try running your concurrency tests with the -count 100 parameter.

Another technique is stubbing or mocking. Instead of the real ticker, we can pass a stub that never ticks:

ticker := &time.Ticker{C: make(chan time.Time)}

This makes our test happy. It also makes it deterministic as the test no longer relies on the time component of the ticker.
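
The stub above never ticks. A variation on the same idea, sketched here as an assumption rather than as the post's code, is to have the aggregator accept a plain receive-only channel of time.Time. Production code could pass ticker.C, while a test passes its own channel and decides exactly when a tick happens. The names startAggregatorWithTicks and flushOnManualTick are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

type Message struct {
	value string
}

type Aggregator struct {
	in         chan Message
	out        chan []Message
	bufferSize int
}

// startAggregatorWithTicks is a hypothetical variant that takes only the tick
// channel. Production code could pass ticker.C; a test passes its own channel.
func (a *Aggregator) startAggregatorWithTicks(ticks <-chan time.Time) {
	go func() {
		buffer := make([]Message, 0)
		for {
			select {
			case m, channelOpen := <-a.in:
				if !channelOpen {
					return
				}
				buffer = append(buffer, m)
				if len(buffer) == a.bufferSize {
					a.out <- buffer
					buffer = make([]Message, 0)
				}
			case <-ticks:
				if len(buffer) > 0 {
					a.out <- buffer
					buffer = make([]Message, 0)
				}
			}
		}
	}()
}

// flushOnManualTick drives the aggregator with a tick sent by hand and
// returns the batch that the tick flushed.
func flushOnManualTick() []Message {
	a := &Aggregator{
		// Unbuffered: a send returns only after the aggregator received it.
		in:         make(chan Message),
		out:        make(chan []Message, 1),
		bufferSize: 100,
	}
	defer close(a.in) // stops the aggregator goroutine
	ticks := make(chan time.Time)
	a.startAggregatorWithTicks(ticks)

	a.in <- Message{value: "a"}
	a.in <- Message{value: "b"}
	ticks <- time.Now() // the "timer" fires exactly when the test says so
	return <-a.out
}

func main() {
	fmt.Println(flushOnManualTick()) // prints [{a} {b}]
}
```

Because every channel the test writes to is unbuffered, each send returns only after the aggregator has taken the value, so the flushed batch is the same on every run.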

Conclusion

Goroutines and channels simplify concurrency compared to the explicit synchronization and thread management of earlier programming languages. But when you introduce a time component, testing becomes tricky because the tests aren't deterministic anymore. Use mocking and stubbing to avoid dealing with time and make your tests reliable.

The full source code is available here.