Sunday 5 November 2017

Active Object in Go

In Go, it is very easy to accidentally access the same data from different go-routines, creating a race condition. Conventionally, you avoid this sort of problem with a mutex and, in fact, you can easily do this in Go using sync.Mutex.

A way that is often better, and preferred in Go, is to simply avoid accessing the data from different go-routines at the same time. One way is to send messages (through a channel) to a confining go-routine responsible for all access and control of the data. By confining all access to a single go-routine no locks are required.
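As a rough sketch of the message-passing form of confinement (before we get to closures), the following keeps a balance inside one go-routine and talks to it with two message types. The names deposit, balanceReq, startAccount and balanceOf are made up for this illustration.

```go
package main

import "fmt"

// deposit and balanceReq are hypothetical message types for this sketch.
type deposit struct{ amt int64 }

type balanceReq struct{ reply chan int64 }

// startAccount launches the confining go-routine. Only this go-routine
// ever touches bal, so no mutex is needed.
func startAccount(initial int64) chan<- interface{} {
	msgs := make(chan interface{})
	go func() {
		bal := initial
		for m := range msgs {
			switch m := m.(type) {
			case deposit:
				bal += m.amt
			case balanceReq:
				m.reply <- bal
			}
		}
	}()
	return msgs
}

// balanceOf asks the confining go-routine for the balance and waits
// for the reply.
func balanceOf(msgs chan<- interface{}) int64 {
	req := balanceReq{reply: make(chan int64)}
	msgs <- req
	return <-req.reply
}

func main() {
	acct := startAccount(100)
	done := make(chan struct{})
	for i := 0; i < 2; i++ {
		go func() {
			acct <- deposit{amt: 50}
			done <- struct{}{}
		}()
	}
	<-done
	<-done
	fmt.Println(balanceOf(acct)) // prints 200: both deposits are applied
}
```

The drawback of this form is that every new operation needs a new message type and a new case in the switch, which is part of the motivation for posting closures instead.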

You can also confine use of a value by only using it within one go-routine at a time. This is idiomatically done in Go by transferring control of the variable through a channel, but I won't discuss that here as there are plenty of other articles about it. On another note, there are "low level" ways to do "lock-free" concurrency, using atomic operations, but that will have to wait for another time.


Avoiding Locks

So what is the advantage of avoiding locks? Well, a great deal has been written about that (do a search on Google :), some of it misleading. In my opinion the main advantage is that it simplifies writing the code, by not getting side-tracked with other issues like race conditions. Using mutexes in complex scenarios is notorious for leaving subtle race conditions, potential deadlocks, or just performance problems such as unintentionally holding a lock while blocking on I/O.

Reduced lock contention is often cited as a major advantage of avoiding locks but that is not really the issue. After all, using a confining go-routine (as described above) replaces lock contention with contention for use of the go-routine. In fact proper use of locks is often more efficient than channels; it's just that it usually involves convoluted and/or repetitive code. For example, this is a typical scenario using locks:
  1. lock
  2. check "something"
  3. unlock
  4. do a lengthy operation based on knowledge obtained at 2
  5. lock
  6. check "something" again (repeating test at 2)
  7. update data using results at 4 (if "something" hasn't changed)
  8. unlock
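
The eight steps above might look something like this in Go. This is only a sketch: the cache type, its version field (standing in for the "something" that is checked) and the refresh method are invented for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// cache is a hypothetical structure used only to illustrate the steps.
type cache struct {
	mu      sync.Mutex
	version int    // the "something" that is checked
	value   string // the data that is updated
}

// refresh never holds the lock during the lengthy operation, so it must
// re-check before storing the result.
func (c *cache) refresh(compute func(version int) string) bool {
	c.mu.Lock()       // 1. lock
	seen := c.version // 2. check "something"
	c.mu.Unlock()     // 3. unlock

	result := compute(seen) // 4. lengthy operation, lock not held

	c.mu.Lock()            // 5. lock
	defer c.mu.Unlock()    // 8. unlock (when the function returns)
	if c.version != seen { // 6. check "something" again
		return false // it changed in the meantime; discard the result
	}
	c.value = result // 7. update data
	c.version++
	return true
}

func main() {
	c := &cache{}
	ok := c.refresh(func(v int) string {
		return fmt.Sprintf("computed from v%d", v)
	})
	fmt.Println(ok, c.value)
}
```

Note how the check has to be written twice and how easy it would be to forget the second one.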

However, avoiding locks does have some performance advantages. First, if many threads block on mutex(es) then thread-switching overhead becomes important. Even though the time for thread-switching is only measured in microseconds, if you have thousands of threads it can all add up. Of course, using go-routines lessens this effect, since Go multiplexes them onto a small number of threads, but it may still be significant.

Further, I think Go tries to run go-routines on the same core each time, which means that a confining go-routine may be better at maintaining the CPU cache which could have large performance benefits.


Active Object

A useful refinement of the confining go-routine is something that goes by many names but possibly the most common is the Active Object Concurrency Pattern. It is often used in server software involving a lot of simultaneous connections where the overhead of using a thread for every connection is too onerous. I first encountered this with Boost ASIO - the excellent C++ asynchronous I/O library. (Thanks Leon for introducing me to this and explaining it.)

However the code for Boost ASIO is complex, since it needs to create its own light-weight "co-routines" (called strands) to multiplex use of threads.  I wanted to do something similar in Go and I was amazed to find no advice on how to do this. It should be much simpler since Go provides all the requisite parts: go-routines (rather like strands), and channels of closures.

Active Object in Go can be implemented by a go-routine that reads closures from a channel (chan func() ) and executes them. This simple system means that all the closures, containing the code that accesses the data, are run on the same go-routine in the order they are posted to the channel. I guess the best way for you to understand this is with an example.

My example uses the quintessential example of concurrent coding - the bank account. First, we look at a race condition and how to fix it with a mutex, then with an Active Object. Of course, there are a few complications and things to be aware of, which I also explain and demonstrate in the code below.

Note that later code examples make heavy use of anonymous functions (closures), even nesting them. If you are unfamiliar with how they work you may need to read up on them first.


Race Example

Here is the code for a poorly-featured bank account that only allows deposits. Note that I could have written the Deposit() function more simply in one line (ac.bal += amt). The race condition would still be there, but the code below is designed to trigger it: the delay caused by the sleep should expose it. (This is one of the biggest problems with race conditions - they may be lurking but invisible - which is why you should get into the habit of using the Go Race Detector.)


type (
  Money int64 // cents
  Account struct {
    bal Money
  }
)

const Dollars Money = 100  // 100 cents to the dollar

// NewAccount creates a new account with bonus $100.
func NewAccount() *Account {
  return &Account{bal: 100 * Dollars}
}

// Deposit adds money to an account.
func (ac *Account) Deposit(amt Money) {
  current := ac.bal
  time.Sleep(1*time.Millisecond)
  ac.bal = current + amt
}

func (ac Account) Balance() Money { return ac.bal }


Now let's do a few concurrent deposits. Note that I tested all this code in a single (main) package. If you want to try it you could move all the "account" code to a separate package (eg bank), but then you need to call the function to create a new account as bank.NewAccount().


  ac := NewAccount()
  go ac.Deposit(1000 * Dollars)
  go ac.Deposit(200 * Dollars)
  time.Sleep(100*time.Millisecond)
  fmt.Printf("Balance: $%.2f\n", float64(ac.Balance())/100)


If you run the above code you will be disappointed to find that one of the deposits has gone missing. The deposits are run on separate go-routines, causing a race condition on bal.


Mutex Example

Luckily, this is easily fixed using a mutex to protect concurrent access to bal. We add a mutex to every account since if we had just one mutex for all accounts that would create a contention problem if many accounts were being updated at the same time.


type (
  Account struct {
    bal Money
    mutex sync.Mutex
  }
)

// Deposit adds money to an account.
func (ac *Account) Deposit(amt Money) {
  ac.mutex.Lock()
  defer ac.mutex.Unlock()

  current := ac.bal
  time.Sleep(1*time.Millisecond)
  ac.bal = current + amt
}

// Balance returns funds available.
func (ac *Account) Balance() Money {
  ac.mutex.Lock()
  defer ac.mutex.Unlock()

  return ac.bal
}


Note that Balance() now takes a pointer receiver, otherwise we would only be locking a copy of the mutex. We have to lock the mutex in the Balance() function even though it only reads from the value since there can be concurrent write operations. (If there are lots of reads and very few writes then a sync.RWMutex may be better than a sync.Mutex but that is another story.)
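
For the curious, here is roughly what the sync.RWMutex variant might look like. It is a sketch rather than a recommendation; the sleep has been dropped from Deposit() to keep it short, and the account is created directly instead of through NewAccount().

```go
package main

import (
	"fmt"
	"sync"
)

type Money int64 // cents

// Account is the mutex-protected account with sync.RWMutex swapped in.
type Account struct {
	bal   Money
	mutex sync.RWMutex
}

// Deposit takes the exclusive (write) lock.
func (ac *Account) Deposit(amt Money) {
	ac.mutex.Lock()
	defer ac.mutex.Unlock()
	ac.bal += amt
}

// Balance takes the shared (read) lock, so many readers can run at once.
func (ac *Account) Balance() Money {
	ac.mutex.RLock()
	defer ac.mutex.RUnlock()
	return ac.bal
}

func main() {
	ac := &Account{bal: 10000}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			ac.Deposit(100)
		}()
	}
	wg.Wait()
	fmt.Println(ac.Balance()) // prints 11000
}
```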


Active Object Example

OK that avoids the race condition by using a mutex, but how do we do this using the Active Object pattern? First, instead of a mutex we use a channel of functions. We also need to start a go-routine for each account in the NewAccount() function, which reads from the channel and runs the functions. Finally, instead of updating ac.bal directly in the Deposit() function we wrap the code in a closure (lambda function) and post this closure onto the account channel so that the account's go-routine will process it when it gets a chance.


type (
  Account struct {
    bal Money
    ch chan<- func()
  }
)

func NewAccount() *Account {
  ch := make(chan func())
  go func() {
    for f := range ch { f() }
  }()
  return &Account{bal: 100*Dollars, ch: ch}
}

// Deposit adds money to an account.
func (ac *Account) Deposit(amt Money) {
  ac.ch <- func() {
    current := ac.bal
    time.Sleep(1*time.Millisecond)
    ac.bal = current + amt
  }
}


Note that the unnamed function created in Deposit() and posted onto the account's channel is a closure (or lambda). Closures have the useful ability to capture variables from their enclosing scope (in this case ac, through which it reaches ac.bal, and amt).

Moreover if you make ch into a buffered channel, then the account can handle multiple concurrent deposits without ever blocking the calls to Deposit(). This means that transient spikes in activity on the account will be handled smoothly. Of course, a sustained onslaught of deposits, sent faster than they can be processed will eventually cause blocking when the channel buffer becomes full.
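
To get a feel for the difference, the following sketch times how long a burst of calls to Deposit() takes to return with and without a buffer. The buffer parameter to NewAccount() and the timeDeposits() helper are my own additions for this experiment, and the exact timings will vary from machine to machine.

```go
package main

import (
	"fmt"
	"time"
)

type Money int64

type Account struct {
	bal Money
	ch  chan<- func()
}

// NewAccount takes a hypothetical buffer size; 0 gives an unbuffered channel.
func NewAccount(buffer int) *Account {
	ch := make(chan func(), buffer)
	go func() {
		for f := range ch {
			f()
		}
	}()
	return &Account{ch: ch}
}

// Deposit posts a deliberately slow closure to the account's go-routine.
func (ac *Account) Deposit(amt Money) {
	ac.ch <- func() {
		time.Sleep(time.Millisecond) // simulate slow processing
		ac.bal += amt
	}
}

// timeDeposits reports how long n calls to Deposit take to return
// (not how long the deposits take to be processed).
func timeDeposits(buffer, n int) time.Duration {
	ac := NewAccount(buffer)
	start := time.Now()
	for i := 0; i < n; i++ {
		ac.Deposit(1)
	}
	return time.Since(start)
}

func main() {
	fmt.Println("unbuffered:", timeDeposits(0, 20))
	fmt.Println("buffered:  ", timeDeposits(32, 20))
}
```

With no buffer each call has to wait for the previous deposit to finish, whereas with a buffer larger than the burst the calls return without waiting.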


Returning Values

You may have noticed that the above code does not include a Balance() method. Before showing the code for Balance(), I need to explain how to "return" a value; because the closures are invoked asynchronously you can't simply use a function that returns a value. Even for methods that only update something we may want to return an error to indicate that something went wrong.

So how do we do it? We simply pass in a callback function (probably a closure) that is called when the operation completes (or fails with an error).

In the following code I have implemented the Balance() method but I have also replaced the Deposit() method with Add() since we are going to use it for withdrawals (allowing for negative amounts) too. Withdrawals may generate an error if there are insufficient funds in the account, so we pass a callback which can "return" an error.


// Add transfers money to/from an account.
func (ac *Account) Add(amt Money, callback func(error)) {
  ac.ch <- func() {
    if ac.bal + amt < 0 {
      callback(fmt.Errorf("insuff. funds %v for w/d %v",
                          ac.bal, amt))
      return
    }
    ac.bal += amt
    callback(nil)   // successful transfer
  }
}

// Balance provides funds available.
func (ac *Account) Balance(callback func(Money)) {
  ac.ch <- func() {
    callback(ac.bal)
  }
}


Now here is some code that makes two deposits and attempts a very large withdrawal. Notice that for the deposits we provide a callback (closure) that does nothing - passing a +ve amount means the operation cannot fail so we ignore the possibility of an error. For the withdrawal, we check if there was an error and just print it out.


  ac := NewAccount()
  ac.Add(1000 * Dollars, func(error) {} )
  ac.Add(200 * Dollars, func(error) {} )
  ac.Add(-1e6 * Dollars, func(err error) {
    if err != nil { fmt.Println(err) }
  })
  ac.Balance(func(bal Money) {
    fmt.Printf("Balance: $%v\n", bal/100)
  })
  time.Sleep(100*time.Millisecond)


The first thing you may have noticed is that we don't have the keyword go before the call to ac.Add(), as we did above for ac.Deposit(). This is not necessary as most of the Add() function's code has been made asynchronous anyway. That is, the actual work is done in a closure posted onto the account's channel (for execution by the account's go-routine) allowing Add() to return almost immediately.

Notice also the call to Sleep() in the final line of code which is simply there to prevent the program exiting immediately. If you run the above in a main() function without it you may not see any messages. When main() returns the program ends and all active go-routines are silently terminated. So the calls to Println() and Printf(), executed in the account's go-routine, may not get a chance to execute. Later I will look at how to wait for all pending operations on an account to complete.

A crucial thing to remember here is that the callbacks are run on the account's go-routine. It is important to keep this in mind since it is very easy to access a variable from the caller's go-routine in the callback, which creates exactly the sort of race we are trying to avoid. If you need to send information back to the posting go-routine the callback can post to another Active Object channel as we will see.
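
As a simple illustration of getting a value back to the caller's go-routine safely, the callback can do nothing but send the value on an ordinary channel. BalanceSync() below is a hypothetical helper, not part of the account code above, and NewAccount() has been given an initial balance parameter for brevity.

```go
package main

import "fmt"

type Money int64

type Account struct {
	bal Money
	ch  chan<- func()
}

// NewAccount here takes an initial balance (an assumption for this sketch).
func NewAccount(initial Money) *Account {
	ch := make(chan func())
	go func() {
		for f := range ch {
			f()
		}
	}()
	return &Account{bal: initial, ch: ch}
}

// Balance is the callback-style method: callback runs on the account's
// go-routine.
func (ac *Account) Balance(callback func(Money)) {
	ac.ch <- func() { callback(ac.bal) }
}

// BalanceSync is a hypothetical helper: the callback only sends the value
// on a channel, and the caller uses it on its own go-routine.
func (ac *Account) BalanceSync() Money {
	reply := make(chan Money, 1) // buffered so the account's go-routine never blocks
	ac.Balance(func(bal Money) { reply <- bal })
	return <-reply
}

func main() {
	ac := NewAccount(10000)
	total := Money(0)         // owned by main's go-routine
	total += ac.BalanceSync() // safe: updated here, not inside the callback
	fmt.Println(total)        // prints 10000
}
```

Had the callback done total += bal directly, total would have been modified on the account's go-routine while main might be reading it. Be aware that a blocking helper like this must not be called from inside a closure running on the same account, or it will wait forever.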

Another important thing to remember is that if you perform a lengthy operation, or an operation that may block, in the callback then you will delay other operations on the account or even cause deadlock. In the example code above, I only call fmt.Printf() inside the callback but even that may be too much for a server that is handling hundreds of thousands of requests per second.
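
One way to keep the callback short, sketched below, is to have it copy what it needs and hand the slow part to a separate go-routine. The slowReport() function is a stand-in for any lengthy or blocking work, and the WaitGroup is only there so that this small program waits for the report before exiting.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Money int64

type Account struct {
	bal Money
	ch  chan<- func()
}

// NewAccount here takes an initial balance (an assumption for this sketch).
func NewAccount(initial Money) *Account {
	ch := make(chan func())
	go func() {
		for f := range ch {
			f()
		}
	}()
	return &Account{bal: initial, ch: ch}
}

func (ac *Account) Balance(callback func(Money)) {
	ac.ch <- func() { callback(ac.bal) }
}

// slowReport stands in for any lengthy or blocking work.
func slowReport(bal Money) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("Balance: $%d.%02d", bal/100, bal%100)
}

func main() {
	ac := NewAccount(12345)
	var wg sync.WaitGroup
	wg.Add(1)
	ac.Balance(func(bal Money) {
		// bal is a copy, so the new go-routine never touches ac.bal,
		// and the account's go-routine is free again almost at once.
		go func() {
			defer wg.Done()
			fmt.Println(slowReport(bal))
		}()
	})
	wg.Wait()
}
```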


Transfers between Accounts

We have basic account features but more advanced features can introduce pitfalls to be aware of.  Here is the code for a method to transfer funds between accounts.


// WARNING: This code has problems

// TransferTo moves funds between accounts.
func (ac *Account) TransferTo(to *Account, amt Money,  
                              callback func(error)) {
   ac.ch <- func() {
    if amt > ac.bal {
      callback(fmt.Errorf("Insuff. funds %v for tfr %v",
                          ac.bal, amt))
      return
    }
    ac.bal -= amt
    to.Add(amt, callback)
  }
}


To understand what is happening here you need to remember that each account has its own go-routine. The code inside the above closure is executed on the "from" account's go-routine, the end of which calls to.Add() which posts to the "to" account's go-routine. The third parameter (callback) to TransferTo() is effectively a pointer to a function that is "captured" in the closure and passed on to to.Add() whence it is again captured and called to process the result of the final Add().

However, there are two problems with this code. First, a negative amount moves funds in the opposite direction (out of the "to" account), and nothing stops it from taking more funds than are available in that account (ie we need to check that -amt <= to.bal). The second problem is due to possible deadlocks - eg if two transfers are done simultaneously in opposite directions then each account may block the other - but we'll address that problem later.

How would we fix the first problem? My first thought was something like this:


// WARNING: This code has worse problems

// TransferTo moves funds between accounts.
func (ac *Account) TransferTo(to *Account, amt Money,  
                              callback func(error)) {
   ac.ch <- func() {
    if amt > ac.bal {
      callback(fmt.Errorf("Insuff. funds %v for tfr %v",
                          ac.bal, amt))
      return
    } else if amt < 0 && -amt > to.bal {
      callback(fmt.Errorf("Insuff. funds %v for tfr %v",
                          to.bal, -amt))
      return
    }
    ac.bal -= amt
    to.Add(amt, callback)
  }
}


Can you see a problem here? If not, think about which go-routine is used to run the above code. All the code inside the closure (including the new else if branch) runs on the ac account's go-routine, but it accesses to.bal, access to which should be confined to the to account's go-routine. (Remember that each account has its own go-routine which is the only place where that account's bal should be used.)

A better approach is to reverse the direction of the transfer when the amount is negative, so that the funds check always runs on the go-routine of the account being debited:


// WARNING: There is still a problem

// TransferTo moves funds between accounts.
func (ac *Account) TransferTo(to *Account, amt Money,  
                              callback func(error)) {
   ac.ch <- func() {
    if amt < 0 {
      to.TransferTo(ac, -amt, callback)
      return
    }
    if amt > ac.bal {
      callback(fmt.Errorf("Insuff. funds %v for tfr %v",
                          ac.bal, amt))
      return
    }
    ac.bal -= amt
    to.Add(amt, callback)
  }
}


This fixes the problem with accessing to.bal on the wrong go-routine but as I mentioned before there is also a deadlock problem.


Deadlock

The first thing to note is that a channel in Go has a fixed size; this means that any call to Add(), Balance() or TransferTo() will block if the channel is full. If other concurrent requests can be posted to the accounts then the "to" account's go-routine may be blocked waiting to post a request (such as a transfer in the opposite direction) to the "ac" account, while the "ac" account's go-routine is itself blocked in the call to to.Add(). This causes a mutual deadlock between the two accounts. A simpler scenario is where an account posts to its own channel causing itself to deadlock.

A solution to these problems is just to fire up another go-routine like this.


// TransferTo moves funds between accounts.
func (ac *Account) TransferTo(to *Account, amt Money,  
                              callback func(error)) {
   ac.ch <- func() {
    if amt < 0 {
      go to.TransferTo(ac, -amt, callback)
      return
    }
    if amt > ac.bal {
      callback(fmt.Errorf("Insuff. funds %v for tfr %v",
                          ac.bal, amt))
      return
    }
    ac.bal -= amt
    go to.Add(amt, callback)
  }
}


This prevents TransferTo() from blocking and avoids the deadlock. The disadvantage is that we have no guarantees about when the request will be posted if the account channel is overloaded. In this case it may mean there is a delay between the "ac" account being debited and the "to" account being credited. In this example it is not a problem since the funds will eventually be transferred.

A solution that preserves the order of posted requests is to have two channels: one for external requests and one for priority requests (see priChan below) posted by an account to itself or to another account.


type (
  Account struct {
    bal Money
    pubChan, priChan chan<- func()
  }
)

func NewAccount() *Account {
  pub := make(chan func(), 2)
  pri := make(chan func(), 20)  // "private" chan

  go func() {
    for {
      if len(pri) > 0 {
        // Priority requests are always handled first.
        f := <-pri
        f()
      } else {
        select {
        case f := <-pri:
          f()
        case f := <-pub:
          f()
        }
      }
    }
  }()
  return &Account{
    bal: 100*Dollars, 
    pubChan: pub, 
    priChan: pri,
  }
}


The idea is not to have any "circular" posts - that is any closures posted to a channel should never end up posting back to the same channel. In this way deadlock is not possible.
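
Here is one way the account methods might use the two channels. Treat it as a sketch under some assumptions: credit() is a hypothetical internal method, NewAccount() takes an initial balance, negative amounts are reversed before anything is posted, and the priority buffer is assumed to be large enough that it never fills.

```go
package main

import "fmt"

type Money int64

type Account struct {
	bal              Money
	pubChan, priChan chan<- func()
}

func NewAccount(initial Money) *Account {
	pub := make(chan func(), 2)
	pri := make(chan func(), 20)
	go func() {
		for {
			if len(pri) > 0 { // priority requests first
				(<-pri)()
				continue
			}
			select {
			case f := <-pri:
				f()
			case f := <-pub:
				f()
			}
		}
	}()
	return &Account{bal: initial, pubChan: pub, priChan: pri}
}

// credit is a hypothetical internal method. It is only called from another
// account's closure, uses the priority channel, and never posts anywhere.
func (ac *Account) credit(amt Money, callback func(error)) {
	ac.priChan <- func() {
		ac.bal += amt
		callback(nil)
	}
}

// TransferTo is an external request, so it goes on the public channel.
func (ac *Account) TransferTo(to *Account, amt Money, callback func(error)) {
	if amt < 0 { // reverse the direction on the caller's go-routine
		to.TransferTo(ac, -amt, callback)
		return
	}
	ac.pubChan <- func() {
		if amt > ac.bal {
			callback(fmt.Errorf("insuff. funds %v for tfr %v", ac.bal, amt))
			return
		}
		ac.bal -= amt
		to.credit(amt, callback) // public closure posts to a priority channel only
	}
}

func (ac *Account) Balance(callback func(Money)) {
	ac.pubChan <- func() { callback(ac.bal) }
}

func main() {
	a, b := NewAccount(10000), NewAccount(10000)
	done := make(chan error, 2)
	a.TransferTo(b, 2500, func(err error) { done <- err })
	b.TransferTo(a, 1000, func(err error) { done <- err })
	fmt.Println(<-done, <-done) // prints <nil> <nil>

	bal := make(chan Money, 1)
	a.Balance(func(m Money) { bal <- m })
	fmt.Println(<-bal) // prints 8500
	b.Balance(func(m Money) { bal <- m })
	fmt.Println(<-bal) // prints 11500
}
```

Closures on the public channel only ever post to a priority channel, and closures on a priority channel post nowhere, so there is no cycle of posts.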


Conclusion

I hope I have demonstrated how easy it is to use the Active Object Concurrency Pattern in Go. As long as you understand how it works and are aware of the pitfalls it provides a simpler, and possibly more efficient, solution than using mutexes.

One pitfall is that, even though there is no visible locking, it is easy to create a deadlock if an Active Object's method posts (directly or indirectly) back to its own channel, since channels have a fixed size once created. But this can be avoided as discussed above.

One thing that is very easy to do in Go is accidentally access a confined variable from the wrong go-routine. In another language like C it would be easy (though not portable) to use an assertion to verify that code is running on the right thread. Unfortunately, Go does not provide any identifier for go-routines (for arguably good reasons), but this hinders attempts to ensure that the code behaves correctly. Luckily there are (deprecated) ways to determine a go-routine's "identity", which I will explore and report back on.
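
To give a flavour of what such a check could look like, here is a debugging-only sketch that parses the go-routine number out of the header line written by runtime.Stack(). The format of that line is not a documented API, so this is an assumption that could break; goroutineID(), confined and assertOwner() are names invented for the example.

```go
package main

import (
	"bytes"
	"fmt"
	"runtime"
	"strconv"
)

// goroutineID parses the id from the header that runtime.Stack writes
// ("goroutine 18 [running]:"). It returns 0 if the header cannot be parsed.
func goroutineID() uint64 {
	buf := make([]byte, 64)
	buf = buf[:runtime.Stack(buf, false)]
	fields := bytes.Fields(buf) // "goroutine", "18", "[running]:", ...
	if len(fields) < 2 {
		return 0
	}
	id, _ := strconv.ParseUint(string(fields[1]), 10, 64)
	return id
}

// confined records which go-routine owns some data.
type confined struct{ owner uint64 }

// assertOwner panics if called from any go-routine other than the owner.
func (c *confined) assertOwner() {
	if id := goroutineID(); id != c.owner {
		panic(fmt.Sprintf("accessed from go-routine %d, owner is %d", id, c.owner))
	}
}

func main() {
	ch := make(chan func())
	ready := make(chan *confined)
	go func() {
		ready <- &confined{owner: goroutineID()}
		for f := range ch {
			f()
		}
	}()
	c := <-ready

	done := make(chan bool)
	ch <- func() {
		c.assertOwner() // passes: this closure runs on the owning go-routine
		done <- true
	}
	<-done
	fmt.Println(goroutineID() != c.owner) // prints true: main is a different go-routine
}
```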

Also I have not explored how to wait for pending asynchronous operations to complete as I promised above. This post is long enough so we will look at that next time.