Atomic updates: Difference between revisions

/* {{header|Go}} big simplification. drop variant solutions, just show one solution that works well.
No edit summary
(/* {{header|Go}} big simplification. drop variant solutions, just show one solution that works well.)
Line 1,293:
 
=={{header|Go}}==
Four solutions are presented here. All share the same data type declaration, as specified by the task, that supports get and transfer operations, and all share the same code to exercise the data type. This common code represents "sloppy/buggy/competing client" code, as discussed on the talk page.
 
Differences in the solutions are in the implementation of the data type and its methods. This is where synchronization is managed and invariants are maintained.
===Channels===
This is the straightforward solution suggested by the task description. It uses a Go channel for synchronization, but really uses the channel just as a mutex. A sync.Mutex could be trivially substituted.
 
Common code:
<lang go>package main
 
Line 1,305 ⟶ 1,298:
"fmt"
"math/rand"
"sync"
"time"
)
 
const nBuckets = 10
// Data type required by task.
type bucketList interface {
// Two operations required by task. Updater parameter not specified
// by task, but useful for displaying update counts as an indication
// that transfer operations are happening "as often as possible."
bucketValue(bucket int) int
transfer(b1, b2, ammount, updater int)
 
type bucketList struct {
// Operation not specified by task, but needed for synchronization.
b [nBuckets]int // bucket data specified by task
snapshot(bucketValues []int, transferCounts []int)
 
// Operationtransfer counts for each updater, not specifiedstrictly required by task, but useful.
// useful to show that the two updaters get fair chances to run.
buckets() int // number of buckets
} tc [2]int
 
sync.Mutex // synchronization
// Total of all bucket values, declared as a const to demonstrate that
}
// it doesn't change.
const originalTotal = 1000
 
// Updater ids, usedto fortrack maintainingnumber transferof countstransfers by updater.
// these can index bucketlist.tc for example.
const (
idOrder = iota
idChaos
)
nUpdaters
)
func main() {
// Create a concrete object implementing the bucketList interface.
bl := newChList(10, originalTotal, nUpdaters)
 
const initialSum = 1000 // sum of all bucket values
// Three concurrent tasks.
go order(bl)
go chaos(bl)
buddha(bl)
}
 
// The concurrent tasks exercise the data operations by going through
// the bucketList interface. They do no explicit synchronization and
// are not responsible for maintaining invariants.
 
// Exercise (1.) required by task: make values more equal.
//
// order repeatedly picks two random buckets and moves half of their
// difference from the fuller one to the emptier one, forever.
func order(bl bucketList) {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	n := bl.buckets()
	for {
		i := rng.Intn(n)
		j := rng.Intn(n)
		vi := bl.bucketValue(i)
		vj := bl.bucketValue(j)
		if vi <= vj {
			// Ensure i is the fuller bucket (ties move zero).
			i, j = j, i
			vi, vj = vj, vi
		}
		bl.transfer(i, j, (vi-vj)/2, idOrder)
	}
}
 
// Exercise (2.) required by task: redistribute values.
//
// chaos repeatedly moves a random amount, bounded by the source
// bucket's current value, between two randomly chosen buckets, forever.
func chaos(bl bucketList) {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	n := bl.buckets()
	for {
		from := rng.Intn(n)
		to := rng.Intn(n)
		amt := rng.Intn(bl.bucketValue(from) + 1)
		bl.transfer(from, to, amt, idChaos)
	}
}
 
// Exercise (3.) required by task: display total.
//
// buddha runs forever on the calling goroutine. Ten times a second it
// takes a consistent snapshot of the bucket values and transfer counts,
// panics if an invariant is violated, and prints one status line.
func buddha(bl bucketList) {
nBuckets := bl.buckets()
s := make([]int, nBuckets) // scratch space for the bucket snapshot
tc := make([]int, nUpdaters) // scratch space for per-updater transfer counts
var total, nTicks int
 
fmt.Println("sum ---updates--- mean buckets")
tr := time.Tick(time.Second / 10)
for {
var sum int
<-tr
// snapshot also resets the implementation's transfer counters,
// so tc holds per-tick counts.
bl.snapshot(s, tc)
for _, l := range s {
if l < 0 {
panic("sob") // invariant not preserved: a bucket went negative
}
sum += l
}
// Output number of updates per tick and cumulative mean
// updates per tick to demonstrate "as often as possible"
// of task exercises 1 and 2.
total += tc[0] + tc[1]
nTicks++
fmt.Printf("%d %6d %6d %7d %v\n", sum, tc[0], tc[1], total/nTicks, s)
if sum != originalTotal {
panic("weep") // invariant not preserved: total changed
}
}
}</lang>
Data type implementation:
<lang go>// chList (ch for channel-synchronized) is a concrete type implementing
// the bucketList interface. The bucketList interface declared methods,
// the struct type here declares data. chList methods are responsible
// for synchronization so they are goroutine-safe. They are also
// responsible for maintaining the invariants that the sum of all buckets
// stays constant and that no bucket value goes negative.
type chList struct {
b []int // bucket data specified by task
s chan bool // synchronization object: a one-token channel used as a mutex
tc []int // a transfer count for each updater
}
 
// Constructor.
func newBucketList() *bucketList {
func newChList(nBuckets, initialSum, nUpdaters int) *chList {
blvar :=bl &chList{bucketList
b: make([]int, nBuckets),
s: make(chan bool, 1),
tc: make([]int, nUpdaters),
}
// Distribute initialSum across buckets.
for i, dist := nBuckets, initialSum; i > 0; {
v := dist / i
i--
bl.b[i] = v
dist -= v
}
return &bl
// Synchronization is needed to maintain the invariant that the total
// of all bucket values stays the same. This is an implementation of
// the straightforward solution mentioned in the task description,
// ensuring that only one transfer happens at a time. Channel s
// holds a token. All methods must take the token from the channel
// before accessing data and then return the token when they are done.
// it is equivalent to a mutex. The constructor makes data available
// by initially dropping the token in the channel after all data is
// initialized.
bl.s <- true
return bl
}
 
// method 1 required by task, get current value of a bucket
// Four methods implementing the bucketList interface.
func (bl *chListbucketList) bucketValue(b int) int {
<-bl.sLock() // get tokenlock before accessing data
r := bl.b[b]
bl.s <- true // return tokenUnlock()
return r
}
 
// method 2 required by task
func (bl *chList) transfer(b1, b2, a int, ux int) {
func (bl *bucketList) transfer(b1, b2, a int, ux int) {
if b1 == b2 { // null operation
return
}
// Get access.
<-bl.sLock()
// Clamping maintains invariant that bucket values remain nonnegative.
if a > bl.b[b1] {
Line 1,468 ⟶ 1,356:
bl.b[b2] += a
bl.tc[ux]++ // increment transfer count
bl.Unlock()
// Release "lock".
bl.s <- true
}
 
// additional useful method
func (bl *chList) snapshot(s []int, tc []int) {
func (bl *bucketList) snapshot(s *[nBuckets]int, tc *[2]int) {
<-bl.s
copy(s, bl.bLock()
copy(tc,*s = bl.tc)b
for*tc i := range bl.tc {
bl.tc = [i2]int{} =// clear transfer 0counts
}bl.Unlock()
bl.s <- true
}
 
var bl = newBucketList()
func (bl *chList) buckets() int {
return len(bl.b)
}</lang>
Output shows constant total and lack of any negative bucket counts. It also shows that the order and chaos tasks are given roughly fair chances to run, and that updates are happening at a high rate, "as often as possible."
<pre>
sum ---updates--- mean buckets
1000 26098 21980 48078 [57 106 120 119 129 82 74 90 95 128]
1000 34982 32218 57639 [3 92 42 88 45 89 133 69 219 220]
1000 21142 19716 52045 [75 104 92 66 130 130 83 115 130 75]
1000 19450 25747 50333 [30 69 29 185 124 156 122 124 6 155]
1000 22688 20442 48892 [126 102 8 99 145 102 130 121 122 45]
...
</pre>
 
func main() {
===RWMutex===
// Three concurrent tasks.
There are two optimizations in this version. First, mutexes are somewhat faster than channels. Second, separate mutexes for each bucket allow the two transfer routines, "order" and "chaos", to update the bucket list simultaneously. I use one more lock for the whole list to pause transferring while printing. This lock is a RWMutex, and interestingly, the transfer routines lock it in "R" mode when they want to write, and the buddha routine locks it in "RW" mode when it only wants to read. This is because "R" represents the situation where I want to allow simultaneous operations—transferring, and "RW" represents the situation where I need exclusive access—taking a snapshot.
go order() // make values closer to equal
<lang go>// rwList, rw is for RWMutex-synchronized.
go chaos() // arbitrarily redistribute values
type rwList struct {
buddha() // display total value and individual values of each bucket
b []int // bucket data specified by task
 
// Syncronization objects.
m []sync.Mutex // mutex for each bucket
all sync.RWMutex // mutex for entire list, for snapshot operation
 
tc []int // a transfer count for each updater
}
 
// The concurrent tasks exercise the data operations by calling bucketList
// Constructor.
// methods. The bucketList methods are "threadsafe", by which we really mean
// newRwList builds an rwList of nBuckets buckets whose values sum to
// initialSum, with one mutex per bucket and a transfer counter for
// each of nUpdaters updaters.
func newRwList(nBuckets, initialSum, nUpdaters int) *rwList {
	bl := &rwList{
		b:  make([]int, nBuckets),
		m:  make([]sync.Mutex, nBuckets),
		tc: make([]int, nUpdaters),
	}
	// Distribute initialSum across the buckets as evenly as integer
	// division allows, filling from the last bucket down.
	for i, dist := nBuckets, initialSum; i > 0; {
		v := dist / i
		i--
		bl.b[i] = v
		dist -= v
	}
	return bl
}
 
// Exercise 1 required by task: make values more equal.
// Four methods implementing the bucketList interface.
func order() {
func (bl *rwList) bucketValue(b int) int {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
bl.m[b].Lock() // lock on bucket ensures read is atomic
r := bl.b[b]
bl.m[b].Unlock()
return r
}
 
// transfer moves up to a units from bucket b1 to bucket b2 and credits
// the transfer to updater ux. The amount is clamped so that b1 cannot
// go negative, preserving the task invariants.
func (bl *rwList) transfer(b1, b2, a int, ux int) {
if b1 == b2 { // null operation
return
}
// RLock on list allows other simultaneous transfers.
// (snapshot takes the corresponding exclusive lock, so no transfer
// can run while a snapshot is in progress.)
bl.all.RLock()
// Locking lowest bucket first prevents deadlock
// with multiple tasks working at the same time.
if b1 < b2 {
bl.m[b1].Lock()
bl.m[b2].Lock()
} else {
bl.m[b2].Lock()
bl.m[b1].Lock()
}
// Clamping maintains the invariant that bucket values stay nonnegative.
if a > bl.b[b1] {
a = bl.b[b1]
}
// transfer
bl.b[b1] -= a
bl.b[b2] += a
bl.tc[ux]++ // increment transfer count
// release
bl.m[b1].Unlock()
bl.m[b2].Unlock()
bl.all.RUnlock()
// With current Go, the program can hang without a call to gosched here.
// It seems that functions in the sync package don't touch the scheduler,
// (which is good) but we need to touch it here to give the channel
// operations in buddha a chance to run. (The current Go scheduler
// is basically cooperative rather than preemptive.)
runtime.Gosched()
}
 
// snapshot copies the bucket values into s and the per-updater transfer
// counts into tc, then resets the counts. The exclusive list lock keeps
// all transfers out while the copies are made, so the snapshot is
// consistent.
func (bl *rwList) snapshot(s []int, tc []int) {
	bl.all.Lock() // write lock excludes all transfers during the snap
	defer bl.all.Unlock()
	copy(s, bl.b)
	copy(tc, bl.tc)
	for i := range bl.tc {
		bl.tc[i] = 0 // counts are reported per snapshot; start fresh
	}
}
 
// buckets reports the number of buckets. The slice length is fixed at
// construction, so this read needs no locking.
func (bl *rwList) buckets() int {
return len(bl.b)
}</lang>
Throughput can be seen to be relatively better than the channel version.
<pre>
sum ---updates--- mean buckets
1000 38640 39222 77862 [127 199 75 126 11 28 165 111 23 135]
1000 33164 35308 73167 [0 138 13 276 80 23 196 53 71 150]
1000 35407 36654 72798 [88 276 72 78 71 25 28 98 181 83]
1000 39081 40104 74395 [117 33 64 332 76 85 62 123 66 42]
1000 38356 39811 75149 [108 301 41 50 10 165 69 62 20 174]
</pre>
 
===Lock-free===
This version uses no locking for the phase where the two clients are updating the buckets. Instead it watches for collisions and retries as needed.
<lang go>// lf for lock-free. Bucket values are int32 so they can be read
// and updated with the sync/atomic package.
type lfList struct {
b []int32 // bucket values, accessed with atomic operations
sync.RWMutex // held R by transfers, RW by snapshot for a consistent copy
tc []int // a transfer count for each updater
}
// newLfList builds a lock-free list of nBuckets buckets whose values
// sum to initialSum, with a transfer counter for each of nUpdaters
// updaters.
func newLfList(nBuckets, initialSum, nUpdaters int) *lfList {
	l := &lfList{
		b:  make([]int32, nBuckets),
		tc: make([]int, nUpdaters),
	}
	// Spread initialSum over the buckets as evenly as integer
	// division allows, filling from the last bucket down.
	remaining := int32(initialSum)
	for n := int32(nBuckets); n > 0; n-- {
		share := remaining / n
		l.b[n-1] = share
		remaining -= share
	}
	return l
}
 
// Four methods implementing the bucketList interface.

// bucketValue returns the current value of bucket b. An atomic load
// allows the read to run concurrently with in-flight transfers.
func (bl *lfList) bucketValue(b int) int {
return int(atomic.LoadInt32(&bl.b[b]))
}
 
func (bl *lfList) transfer(b1, b2, a int, ux int) {
if b1 == b2 {
return
}
bl.RLock()
for {
tb1 := int32r.Intn(anBuckets)
v1b2 := atomicr.LoadInt32Intn(&bl.b[b1]nBuckets - 1)
if tb2 >= v1b1 {
t = v1b2++
}
ifv1 atomic:= bl.CompareAndSwapInt32bucketValue(&bl.b[b1], v1, v1-t) {
v2 := atomicbl.AddInt32bucketValue(&bl.b[b2], t)
if v1 > v2 break{
bl.transfer(b1, b2, (v1-v2)/2, idOrder)
} else {
bl.transfer(b2, b1, (v2-v1)/2, idOrder)
}
// else retry
}
bl.tc[ux]++
bl.RUnlock()
runtime.Gosched()
}
 
// Exercise 2 required by task: redistribute values.
func (bl *lfList) snapshot(s []int, tc []int) {
func chaos() {
bl.Lock()
r := rand.New(rand.NewSource(time.Now().Unix()))
for i, bv := range bl.b {
for {
s[i] = int(bv)
b1 := r.Intn(nBuckets)
b2 := r.Intn(nBuckets - 1)
if b2 >= b1 {
b2++
}
bl.transfer(b1, b2, r.Intn(bl.bucketValue(b1)+1), idChaos)
}
for i := range bl.tc {
tc[i], bl.tc[i] = bl.tc[i], 0
}
bl.Unlock()
}
 
// Exercise 3 requred by task: display total.
func (bl *lfList) buckets() int {
func buddha() {
return len(bl.b)
var s [nBuckets]int
}</lang>
var tc [2]int
Clearly this is the way to go when performance matters:
var total, nTicks int
<pre>
sum ---updates--- mean buckets
1000 83713 73128 156841 [80 57 136 144 137 88 132 88 58 80]
1000 79416 83022 159639 [0 132 2 29 89 79 27 281 181 180]
1000 78319 76803 158133 [114 88 106 37 142 70 43 191 18 191]
1000 74888 74702 155997 [184 195 30 112 71 112 70 68 36 122]
1000 81305 76426 156344 [67 34 66 308 168 27 3 168 29 130]
</pre>
===Monitor===
Finally, here is a channel based monitor pattern solution. This solution is worse than any of the above solutions both in terms of code complexity and run time performance. Seriously, don't use this unless you have a really good reason. It is here for reference because it predates the solutions above, it does work, and it shows a different way of doing things.
<lang go>// mnList (mn for monitor-synchronized) is a concrete type implementing
// the bucketList interface. The monitor is a goroutine, all communication
// with it is done through channels, which are the members of mnList.
// All data implementing the buckets is encapsulated in the monitor.
type mnList struct {
vrCh chan *valueReq // bucket value requests
trCh chan *transferReq // transfer requests
srCh chan *snapReq // snapshot requests
nbCh chan chan int // bucket count requests; the reply channel is the message
}
 
fmt.Println("sum ---updates--- mean buckets")
// Constructor makes channels and starts monitor.
tr := time.Tick(time.Second / 10)
func newMnList(nBuckets, initialSum, nUpdaters int) *mnList {
mn := &mnList{
make(chan *valueReq),
make(chan *transferReq),
make(chan *snapReq),
make(chan chan int),
}
go monitor(mn, nBuckets, initialSum, nUpdaters)
return mn
}
 
// Monitor goroutine encapsulates data and enters a loop to handle requests.
// The loop handles one request at a time, thus serializing all access.
func monitor(mn *mnList, nBuckets, initialSum, nUpdaters int) {
// bucket representation
b := make([]int, nBuckets)
for i, dist := nBuckets, initialSum; i > 0; {
v := dist / i
i--
b[i] = v
dist -= v
}
// transfer count representation
count := make([]int, nUpdaters)
 
// monitor loop
for {
select {<-tr
bl.snapshot(&s, &tc)
// value request operation
casevar vrsum := <-mn.vrCh:int
for _, l := vr.resprange <-s b[vr.bucket]{
if l < 0 {
panic("sob") // transferinvariant not operationpreserved
case tr := <-mn.trCh:
// clamp
if tr.amount > b[tr.from] {
tr.amount = b[tr.from]
}
//sum transfer+= l
b[tr.from] -= tr.amount}
// Output number of b[tr.to]updates +=per tr.amounttick and cummulative mean
// updates per tick to demonstrate "as often as possible"
count[tr.updaterId]++
// of task exercises 1 and 2.
//total snap+= operationtc[0] + tc[1]
case sr := <-mn.srCh:nTicks++
fmt.Printf("%d %6d %6d %7d %3d\n", sum, tc[0], tc[1], total/nTicks, s)
copy(sr.bucketSnap, b)
if sum != initialSum copy(sr.countSnap, count){
forpanic("weep") i// :=invariant rangenot count {preserved
count[i] = 0
}
sr.resp <- true
 
// number of buckets
case nb := <-mn.nbCh:
nb <- nBuckets
}
}
}
 
// valueReq is the monitor request for reading a single bucket value.
type valueReq struct {
bucket int // index of the bucket to read
resp chan int // the monitor replies with the value on this channel
}
 
// bucketValue asks the monitor goroutine for the current value of
// bucket b and blocks until the reply arrives.
func (mn *mnList) bucketValue(b int) int {
	reply := make(chan int)
	req := valueReq{bucket: b, resp: reply}
	mn.vrCh <- &req
	return <-reply
}
 
// transferReq is the monitor request for moving an amount between buckets.
type transferReq struct {
from, to int // source and destination bucket indexes
amount int // requested amount; the monitor clamps it to the source value
updaterId int // updater credited with the transfer
}
 
// transfer submits a transfer request to the monitor. There is no
// reply channel; the unbuffered send completes once the monitor has
// received the request.
func (mn *mnList) transfer(b1, b2, a, ux int) {
	req := &transferReq{from: b1, to: b2, amount: a, updaterId: ux}
	mn.trCh <- req
}
 
// snapReq is the monitor request for a consistent snapshot of all
// bucket values and per-updater transfer counts.
type snapReq struct {
bucketSnap []int // caller-supplied; filled with bucket values
countSnap []int // caller-supplied; filled with transfer counts
resp chan bool // the monitor signals completion here
}
 
// snapshot fills s with the bucket values and tc with the transfer
// counts, blocking until the monitor has completed the copy.
func (mn *mnList) snapshot(s []int, tc []int) {
	done := make(chan bool)
	req := snapReq{bucketSnap: s, countSnap: tc, resp: done}
	mn.srCh <- &req
	<-done
}
 
// buckets asks the monitor how many buckets the list holds and blocks
// until the reply arrives.
func (mn *mnList) buckets() int {
	ch := make(chan int)
	mn.nbCh <- ch
	return <-ch
}</lang>
{{out}}
Output:
<pre>
sum ---updates--- mean buckets
1000 317832 137235 3407 5101 8508455067 [165100 86100 19100 88100 61100 252100 119100 86100 64100 60100]
1000 391239 339389 3732 592847 5661[ 85 266 895081 [4385 131 122 22037 173 19162 65 2080 6111 3 15762]
1000 509436 497362 2966 730831 4860[ 70 194 194 8575 62 [122 216 129193 100 15310 16 102 27616 100126 0119]
1000 512065 499038 1883 2982 7648800899 [100 8100 166100 54100 53100 94100 177100 273100 40100 35100]
1000 250590 121947 2946 715226 4467[ 47 271 760178 [28061 162 234 119199 63 5873 149 5458 100 63 5079]
...
</pre>
 
1,707

edits