diff options
author | Dan Willemsen <dwillemsen@google.com> | 2021-08-27 17:12:27 -0700 |
---|---|---|
committer | Dan Willemsen <dwillemsen@google.com> | 2021-08-27 17:12:40 -0700 |
commit | 017d6e2373ef77b5861649f777857f07b195f59c (patch) | |
tree | ca561c8e389e01a562847f5e3f95c690e5fcde49 | |
parent | 6ec98728804a67c2ee80bbfffae3233a5b46168d (diff) | |
parent | 036812b2e83c0ddf193dd5a34e034151da389d09 (diff) | |
download | golang-x-sync-017d6e2373ef77b5861649f777857f07b195f59c.tar.gz |
Merge commit '036812b2e83c0ddf193dd5a34e034151da389d09'
Change-Id: I7d5acc65263db2cafe3b1265d8fa1d1f4397ea85
-rw-r--r-- | METADATA | 13 | ||||
-rw-r--r-- | README.md | 2 | ||||
-rw-r--r-- | errgroup/errgroup_test.go | 4 | ||||
-rw-r--r-- | semaphore/semaphore.go | 11 | ||||
-rw-r--r-- | semaphore/semaphore_bench_test.go | 1 | ||||
-rw-r--r-- | semaphore/semaphore_test.go | 31 | ||||
-rw-r--r-- | singleflight/singleflight.go | 114 | ||||
-rw-r--r-- | singleflight/singleflight_test.go | 249 | ||||
-rw-r--r-- | syncmap/go19.go | 1 | ||||
-rw-r--r-- | syncmap/pre_go19.go | 1 |
10 files changed, 363 insertions, 64 deletions
@@ -1,8 +1,5 @@ name: "golang-x-sync" -description: - "This repository provides Go concurrency primitives in addition to the " - "ones provided by the language and 'sync' and 'sync/atomic' packages." - +description: "This repository provides Go concurrency primitives in addition to the ones provided by the language and \'sync\' and \'sync/atomic\' packages." third_party { url { type: HOMEPAGE @@ -12,7 +9,11 @@ third_party { type: GIT value: "https://go.googlesource.com/sync/" } - version: "112230192c58" - last_upgrade_date { year: 2019 month: 11 day: 22 } + version: "036812b2e83c0ddf193dd5a34e034151da389d09" license_type: NOTICE + last_upgrade_date { + year: 2021 + month: 8 + day: 27 + } } @@ -1,5 +1,7 @@ # Go Sync +[![Go Reference](https://pkg.go.dev/badge/golang.org/x/sync.svg)](https://pkg.go.dev/golang.org/x/sync) + This repository provides Go concurrency primitives in addition to the ones provided by the language and "sync" and "sync/atomic" packages. diff --git a/errgroup/errgroup_test.go b/errgroup/errgroup_test.go index 889323f..5a0b9cb 100644 --- a/errgroup/errgroup_test.go +++ b/errgroup/errgroup_test.go @@ -34,7 +34,7 @@ func fakeSearch(kind string) Search { // simplify goroutine counting and error handling. This example is derived from // the sync.WaitGroup example at https://golang.org/pkg/sync/#example_WaitGroup. func ExampleGroup_justErrors() { - var g errgroup.Group + g := new(errgroup.Group) var urls = []string{ "http://www.golang.org/", "http://www.google.com/", @@ -114,7 +114,7 @@ func TestZeroGroup(t *testing.T) { } for _, tc := range cases { - var g errgroup.Group + g := new(errgroup.Group) var firstErr error for i, err := range tc.errs { diff --git a/semaphore/semaphore.go b/semaphore/semaphore.go index 7f096fe..30f632c 100644 --- a/semaphore/semaphore.go +++ b/semaphore/semaphore.go @@ -67,7 +67,12 @@ func (s *Weighted) Acquire(ctx context.Context, n int64) error { // fix up the queue, just pretend we didn't notice the cancelation. err = nil default: + isFront := s.waiters.Front() == elem s.waiters.Remove(elem) + // If we're at the front and there're extra tokens left, notify other waiters. + if isFront && s.size > s.cur { + s.notifyWaiters() + } } s.mu.Unlock() return err @@ -97,6 +102,11 @@ func (s *Weighted) Release(n int64) { s.mu.Unlock() panic("semaphore: released more than held") } + s.notifyWaiters() + s.mu.Unlock() +} + +func (s *Weighted) notifyWaiters() { for { next := s.waiters.Front() if next == nil { @@ -123,5 +133,4 @@ func (s *Weighted) Release(n int64) { s.waiters.Remove(next) close(w.ready) } - s.mu.Unlock() } diff --git a/semaphore/semaphore_bench_test.go b/semaphore/semaphore_bench_test.go index f96d349..3b60ca8 100644 --- a/semaphore/semaphore_bench_test.go +++ b/semaphore/semaphore_bench_test.go @@ -2,6 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +//go:build go1.7 // +build go1.7 package semaphore_test diff --git a/semaphore/semaphore_test.go b/semaphore/semaphore_test.go index b5f8f13..6e8eca2 100644 --- a/semaphore/semaphore_test.go +++ b/semaphore/semaphore_test.go @@ -169,3 +169,34 @@ func TestLargeAcquireDoesntStarve(t *testing.T) { sem.Release(n) wg.Wait() } + +// translated from https://github.com/zhiqiangxu/util/blob/master/mutex/crwmutex_test.go#L43 +func TestAllocCancelDoesntStarve(t *testing.T) { + sem := semaphore.NewWeighted(10) + + // Block off a portion of the semaphore so that Acquire(_, 10) can eventually succeed. + sem.Acquire(context.Background(), 1) + + // In the background, Acquire(_, 10). + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { + sem.Acquire(ctx, 10) + }() + + // Wait until the Acquire(_, 10) call blocks. + for sem.TryAcquire(1) { + sem.Release(1) + runtime.Gosched() + } + + // Now try to grab a read lock, and simultaneously unblock the Acquire(_, 10) call. + // Both Acquire calls should unblock and return, in either order. + go cancel() + + err := sem.Acquire(context.Background(), 1) + if err != nil { + t.Fatalf("Acquire(_, 1) failed unexpectedly: %v", err) + } + sem.Release(1) +} diff --git a/singleflight/singleflight.go b/singleflight/singleflight.go index 97a1aa4..690eb85 100644 --- a/singleflight/singleflight.go +++ b/singleflight/singleflight.go @@ -6,7 +6,42 @@ // mechanism. package singleflight // import "golang.org/x/sync/singleflight" -import "sync" +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} // call is an in-flight or completed singleflight.Do call type call struct { @@ -57,6 +92,12 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e c.dups++ g.mu.Unlock() c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } return c.val, c.err, true } c := new(call) @@ -70,6 +111,8 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e // DoChan is like Do but returns a channel that will receive the // results when they are ready. +// +// The returned channel will not be closed. func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() @@ -94,17 +137,66 @@ func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result // doCall handles the single call for a key. func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { - c.val, c.err = fn() - c.wg.Done() - - g.mu.Lock() - if !c.forgotten { - delete(g.m, key) - } - for _, ch := range c.chans { - ch <- Result{c.val, c.err, c.dups > 0} + normalReturn := false + recovered := false + + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + c.wg.Done() + g.mu.Lock() + defer g.mu.Unlock() + if !c.forgotten { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. + } else { + panic(e) + } + } else if c.err == errGoexit { + // Already in the process of goexit, no need to call again + } else { + // Normal return + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + } + }() + + func() { + defer func() { + if !normalReturn { + // Ideally, we would wait to take a stack trace until we've determined + // whether this is a panic or a runtime.Goexit. + // + // Unfortunately, the only way we can distinguish the two is to see + // whether the recover stopped the goroutine from terminating, and by + // the time we know that, the part of the stack trace relevant to the + // panic has been discarded. + if r := recover(); r != nil { + c.err = newPanicError(r) + } + } + }() + + c.val, c.err = fn() + normalReturn = true + }() + + if !normalReturn { + recovered = true } - g.mu.Unlock() } // Forget tells the singleflight to forget about a key. Future calls diff --git a/singleflight/singleflight_test.go b/singleflight/singleflight_test.go index ad04037..3e51203 100644 --- a/singleflight/singleflight_test.go +++ b/singleflight/singleflight_test.go @@ -5,8 +5,14 @@ package singleflight import ( + "bytes" "errors" "fmt" + "os" + "os/exec" + "runtime" + "runtime/debug" + "strings" "sync" "sync/atomic" "testing" @@ -91,69 +97,224 @@ func TestDoDupSuppress(t *testing.T) { func TestForget(t *testing.T) { var g Group - var firstStarted, firstFinished sync.WaitGroup + var ( + firstStarted = make(chan struct{}) + unblockFirst = make(chan struct{}) + firstFinished = make(chan struct{}) + ) - firstStarted.Add(1) - firstFinished.Add(1) - - firstCh := make(chan struct{}) go func() { g.Do("key", func() (i interface{}, e error) { - firstStarted.Done() - <-firstCh - firstFinished.Done() + close(firstStarted) + <-unblockFirst + close(firstFinished) return }) }() + <-firstStarted + g.Forget("key") - firstStarted.Wait() - g.Forget("key") // from this point no two function using same key should be executed concurrently + unblockSecond := make(chan struct{}) + secondResult := g.DoChan("key", func() (i interface{}, e error) { + <-unblockSecond + return 2, nil + }) - var secondStarted int32 - var secondFinished int32 - var thirdStarted int32 + close(unblockFirst) + <-firstFinished - secondCh := make(chan struct{}) - secondRunning := make(chan struct{}) - go func() { - g.Do("key", func() (i interface{}, e error) { + thirdResult := g.DoChan("key", func() (i interface{}, e error) { + return 3, nil + }) + + close(unblockSecond) + <-secondResult + r := <-thirdResult + if r.Val != 2 { + t.Errorf("We should receive result produced by second call, expected: 2, got %d", r.Val) + } +} + +func TestDoChan(t *testing.T) { + var g Group + ch := g.DoChan("key", func() (interface{}, error) { + return "bar", nil + }) + + res := <-ch + v := res.Val + err := res.Err + if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want { + t.Errorf("Do = %v; want %v", got, want) + } + if err != nil { + t.Errorf("Do error = %v", err) + } +} + +// Test singleflight behaves correctly after Do panic. +// See https://github.com/golang/go/issues/41133 +func TestPanicDo(t *testing.T) { + var g Group + fn := func() (interface{}, error) { + panic("invalid memory address or nil pointer dereference") + } + + const n = 5 + waited := int32(n) + panicCount := int32(0) + done := make(chan struct{}) + for i := 0; i < n; i++ { + go func() { + defer func() { + if err := recover(); err != nil { + t.Logf("Got panic: %v\n%s", err, debug.Stack()) + atomic.AddInt32(&panicCount, 1) + } + + if atomic.AddInt32(&waited, -1) == 0 { + close(done) + } + }() + + g.Do("key", fn) + }() + } + + select { + case <-done: + if panicCount != n { + t.Errorf("Expect %d panic, but got %d", n, panicCount) + } + case <-time.After(time.Second): + t.Fatalf("Do hangs") + } +} + +func TestGoexitDo(t *testing.T) { + var g Group + fn := func() (interface{}, error) { + runtime.Goexit() + return nil, nil + } + + const n = 5 + waited := int32(n) + done := make(chan struct{}) + for i := 0; i < n; i++ { + go func() { + var err error defer func() { + if err != nil { + t.Errorf("Error should be nil, but got: %v", err) + } + if atomic.AddInt32(&waited, -1) == 0 { + close(done) + } }() - atomic.AddInt32(&secondStarted, 1) - // Notify that we started - secondCh <- struct{}{} - // Wait other get above signal - <-secondRunning - <-secondCh - atomic.AddInt32(&secondFinished, 1) - return 2, nil + _, err, _ = g.Do("key", fn) + }() + } + + select { + case <-done: + case <-time.After(time.Second): + t.Fatalf("Do hangs") + } +} + +func TestPanicDoChan(t *testing.T) { + if runtime.GOOS == "js" { + t.Skipf("js does not support exec") + } + + if os.Getenv("TEST_PANIC_DOCHAN") != "" { + defer func() { + recover() + }() + + g := new(Group) + ch := g.DoChan("", func() (interface{}, error) { + panic("Panicking in DoChan") }) - }() + <-ch + t.Fatalf("DoChan unexpectedly returned") + } - close(firstCh) - firstFinished.Wait() // wait for first execution (which should not affect execution after Forget) + t.Parallel() - <-secondCh - // Notify second that we got the signal that it started - secondRunning <- struct{}{} - if atomic.LoadInt32(&secondStarted) != 1 { - t.Fatal("Second execution should be executed due to usage of forget") + cmd := exec.Command(os.Args[0], "-test.run="+t.Name(), "-test.v") + cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1") + out := new(bytes.Buffer) + cmd.Stdout = out + cmd.Stderr = out + if err := cmd.Start(); err != nil { + t.Fatal(err) } - if atomic.LoadInt32(&secondFinished) == 1 { - t.Fatal("Second execution should be still active") + err := cmd.Wait() + t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out) + if err == nil { + t.Errorf("Test subprocess passed; want a crash due to panic in DoChan") } + if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) { + t.Errorf("Test subprocess failed with an unexpected failure mode.") + } + if !bytes.Contains(out.Bytes(), []byte("Panicking in DoChan")) { + t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in DoChan") + } +} - close(secondCh) - result, _, _ := g.Do("key", func() (i interface{}, e error) { - atomic.AddInt32(&thirdStarted, 1) - return 3, nil - }) +func TestPanicDoSharedByDoChan(t *testing.T) { + if runtime.GOOS == "js" { + t.Skipf("js does not support exec") + } + + if os.Getenv("TEST_PANIC_DOCHAN") != "" { + blocked := make(chan struct{}) + unblock := make(chan struct{}) - if atomic.LoadInt32(&thirdStarted) != 0 { - t.Error("Third call should not be started because was started during second execution") + g := new(Group) + go func() { + defer func() { + recover() + }() + g.Do("", func() (interface{}, error) { + close(blocked) + <-unblock + panic("Panicking in Do") + }) + }() + + <-blocked + ch := g.DoChan("", func() (interface{}, error) { + panic("DoChan unexpectedly executed callback") + }) + close(unblock) + <-ch + t.Fatalf("DoChan unexpectedly returned") + } + + t.Parallel() + + cmd := exec.Command(os.Args[0], "-test.run="+t.Name(), "-test.v") + cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1") + out := new(bytes.Buffer) + cmd.Stdout = out + cmd.Stderr = out + if err := cmd.Start(); err != nil { + t.Fatal(err) + } + + err := cmd.Wait() + t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out) + if err == nil { + t.Errorf("Test subprocess passed; want a crash due to panic in Do shared by DoChan") + } + if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) { + t.Errorf("Test subprocess failed with an unexpected failure mode.") } - if result != 2 { - t.Errorf("We should receive result produced by second call, expected: 2, got %d", result) + if !bytes.Contains(out.Bytes(), []byte("Panicking in Do")) { + t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in Do") } } diff --git a/syncmap/go19.go b/syncmap/go19.go index 41a5909..fa04dba 100644 --- a/syncmap/go19.go +++ b/syncmap/go19.go @@ -2,6 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +//go:build go1.9 // +build go1.9 package syncmap diff --git a/syncmap/pre_go19.go b/syncmap/pre_go19.go index 01a7be7..5bba413 100644 --- a/syncmap/pre_go19.go +++ b/syncmap/pre_go19.go @@ -2,6 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +//go:build !go1.9 // +build !go1.9 package syncmap |