aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Willemsen <dwillemsen@google.com>2021-08-27 17:12:27 -0700
committerDan Willemsen <dwillemsen@google.com>2021-08-27 17:12:40 -0700
commit017d6e2373ef77b5861649f777857f07b195f59c (patch)
treeca561c8e389e01a562847f5e3f95c690e5fcde49
parent6ec98728804a67c2ee80bbfffae3233a5b46168d (diff)
parent036812b2e83c0ddf193dd5a34e034151da389d09 (diff)
downloadgolang-x-sync-017d6e2373ef77b5861649f777857f07b195f59c.tar.gz
Merge commit '036812b2e83c0ddf193dd5a34e034151da389d09'
Change-Id: I7d5acc65263db2cafe3b1265d8fa1d1f4397ea85
-rw-r--r--METADATA13
-rw-r--r--README.md2
-rw-r--r--errgroup/errgroup_test.go4
-rw-r--r--semaphore/semaphore.go11
-rw-r--r--semaphore/semaphore_bench_test.go1
-rw-r--r--semaphore/semaphore_test.go31
-rw-r--r--singleflight/singleflight.go114
-rw-r--r--singleflight/singleflight_test.go249
-rw-r--r--syncmap/go19.go1
-rw-r--r--syncmap/pre_go19.go1
10 files changed, 363 insertions, 64 deletions
diff --git a/METADATA b/METADATA
index 7510a65..2c1fe83 100644
--- a/METADATA
+++ b/METADATA
@@ -1,8 +1,5 @@
name: "golang-x-sync"
-description:
- "This repository provides Go concurrency primitives in addition to the "
- "ones provided by the language and 'sync' and 'sync/atomic' packages."
-
+description: "This repository provides Go concurrency primitives in addition to the ones provided by the language and \'sync\' and \'sync/atomic\' packages."
third_party {
url {
type: HOMEPAGE
@@ -12,7 +9,11 @@ third_party {
type: GIT
value: "https://go.googlesource.com/sync/"
}
- version: "112230192c58"
- last_upgrade_date { year: 2019 month: 11 day: 22 }
+ version: "036812b2e83c0ddf193dd5a34e034151da389d09"
license_type: NOTICE
+ last_upgrade_date {
+ year: 2021
+ month: 8
+ day: 27
+ }
}
diff --git a/README.md b/README.md
index 1f8436c..7c1c8f6 100644
--- a/README.md
+++ b/README.md
@@ -1,5 +1,7 @@
# Go Sync
+[![Go Reference](https://pkg.go.dev/badge/golang.org/x/sync.svg)](https://pkg.go.dev/golang.org/x/sync)
+
This repository provides Go concurrency primitives in addition to the
ones provided by the language and "sync" and "sync/atomic" packages.
diff --git a/errgroup/errgroup_test.go b/errgroup/errgroup_test.go
index 889323f..5a0b9cb 100644
--- a/errgroup/errgroup_test.go
+++ b/errgroup/errgroup_test.go
@@ -34,7 +34,7 @@ func fakeSearch(kind string) Search {
// simplify goroutine counting and error handling. This example is derived from
// the sync.WaitGroup example at https://golang.org/pkg/sync/#example_WaitGroup.
func ExampleGroup_justErrors() {
- var g errgroup.Group
+ g := new(errgroup.Group)
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
@@ -114,7 +114,7 @@ func TestZeroGroup(t *testing.T) {
}
for _, tc := range cases {
- var g errgroup.Group
+ g := new(errgroup.Group)
var firstErr error
for i, err := range tc.errs {
diff --git a/semaphore/semaphore.go b/semaphore/semaphore.go
index 7f096fe..30f632c 100644
--- a/semaphore/semaphore.go
+++ b/semaphore/semaphore.go
@@ -67,7 +67,12 @@ func (s *Weighted) Acquire(ctx context.Context, n int64) error {
// fix up the queue, just pretend we didn't notice the cancelation.
err = nil
default:
+ isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
+ // If we're at the front and there're extra tokens left, notify other waiters.
+ if isFront && s.size > s.cur {
+ s.notifyWaiters()
+ }
}
s.mu.Unlock()
return err
@@ -97,6 +102,11 @@ func (s *Weighted) Release(n int64) {
s.mu.Unlock()
panic("semaphore: released more than held")
}
+ s.notifyWaiters()
+ s.mu.Unlock()
+}
+
+func (s *Weighted) notifyWaiters() {
for {
next := s.waiters.Front()
if next == nil {
@@ -123,5 +133,4 @@ func (s *Weighted) Release(n int64) {
s.waiters.Remove(next)
close(w.ready)
}
- s.mu.Unlock()
}
diff --git a/semaphore/semaphore_bench_test.go b/semaphore/semaphore_bench_test.go
index f96d349..3b60ca8 100644
--- a/semaphore/semaphore_bench_test.go
+++ b/semaphore/semaphore_bench_test.go
@@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
+//go:build go1.7
// +build go1.7
package semaphore_test
diff --git a/semaphore/semaphore_test.go b/semaphore/semaphore_test.go
index b5f8f13..6e8eca2 100644
--- a/semaphore/semaphore_test.go
+++ b/semaphore/semaphore_test.go
@@ -169,3 +169,34 @@ func TestLargeAcquireDoesntStarve(t *testing.T) {
sem.Release(n)
wg.Wait()
}
+
+// translated from https://github.com/zhiqiangxu/util/blob/master/mutex/crwmutex_test.go#L43
+func TestAllocCancelDoesntStarve(t *testing.T) {
+ sem := semaphore.NewWeighted(10)
+
+ // Block off a portion of the semaphore so that Acquire(_, 10) can eventually succeed.
+ sem.Acquire(context.Background(), 1)
+
+ // In the background, Acquire(_, 10).
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ go func() {
+ sem.Acquire(ctx, 10)
+ }()
+
+ // Wait until the Acquire(_, 10) call blocks.
+ for sem.TryAcquire(1) {
+ sem.Release(1)
+ runtime.Gosched()
+ }
+
+ // Now try to grab a read lock, and simultaneously unblock the Acquire(_, 10) call.
+ // Both Acquire calls should unblock and return, in either order.
+ go cancel()
+
+ err := sem.Acquire(context.Background(), 1)
+ if err != nil {
+ t.Fatalf("Acquire(_, 1) failed unexpectedly: %v", err)
+ }
+ sem.Release(1)
+}
diff --git a/singleflight/singleflight.go b/singleflight/singleflight.go
index 97a1aa4..690eb85 100644
--- a/singleflight/singleflight.go
+++ b/singleflight/singleflight.go
@@ -6,7 +6,42 @@
// mechanism.
package singleflight // import "golang.org/x/sync/singleflight"
-import "sync"
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "runtime"
+ "runtime/debug"
+ "sync"
+)
+
+// errGoexit indicates the runtime.Goexit was called in
+// the user given function.
+var errGoexit = errors.New("runtime.Goexit was called")
+
+// A panicError is an arbitrary value recovered from a panic
+// with the stack trace during the execution of given function.
+type panicError struct {
+ value interface{}
+ stack []byte
+}
+
+// Error implements error interface.
+func (p *panicError) Error() string {
+ return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
+}
+
+func newPanicError(v interface{}) error {
+ stack := debug.Stack()
+
+ // The first line of the stack trace is of the form "goroutine N [status]:"
+ // but by the time the panic reaches Do the goroutine may no longer exist
+ // and its status will have changed. Trim out the misleading line.
+ if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
+ stack = stack[line+1:]
+ }
+ return &panicError{value: v, stack: stack}
+}
// call is an in-flight or completed singleflight.Do call
type call struct {
@@ -57,6 +92,12 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e
c.dups++
g.mu.Unlock()
c.wg.Wait()
+
+ if e, ok := c.err.(*panicError); ok {
+ panic(e)
+ } else if c.err == errGoexit {
+ runtime.Goexit()
+ }
return c.val, c.err, true
}
c := new(call)
@@ -70,6 +111,8 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e
// DoChan is like Do but returns a channel that will receive the
// results when they are ready.
+//
+// The returned channel will not be closed.
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
@@ -94,17 +137,66 @@ func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result
// doCall handles the single call for a key.
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
- c.val, c.err = fn()
- c.wg.Done()
-
- g.mu.Lock()
- if !c.forgotten {
- delete(g.m, key)
- }
- for _, ch := range c.chans {
- ch <- Result{c.val, c.err, c.dups > 0}
+ normalReturn := false
+ recovered := false
+
+ // use double-defer to distinguish panic from runtime.Goexit,
+ // more details see https://golang.org/cl/134395
+ defer func() {
+ // the given function invoked runtime.Goexit
+ if !normalReturn && !recovered {
+ c.err = errGoexit
+ }
+
+ c.wg.Done()
+ g.mu.Lock()
+ defer g.mu.Unlock()
+ if !c.forgotten {
+ delete(g.m, key)
+ }
+
+ if e, ok := c.err.(*panicError); ok {
+ // In order to prevent the waiting channels from being blocked forever,
+ // needs to ensure that this panic cannot be recovered.
+ if len(c.chans) > 0 {
+ go panic(e)
+ select {} // Keep this goroutine around so that it will appear in the crash dump.
+ } else {
+ panic(e)
+ }
+ } else if c.err == errGoexit {
+ // Already in the process of goexit, no need to call again
+ } else {
+ // Normal return
+ for _, ch := range c.chans {
+ ch <- Result{c.val, c.err, c.dups > 0}
+ }
+ }
+ }()
+
+ func() {
+ defer func() {
+ if !normalReturn {
+ // Ideally, we would wait to take a stack trace until we've determined
+ // whether this is a panic or a runtime.Goexit.
+ //
+ // Unfortunately, the only way we can distinguish the two is to see
+ // whether the recover stopped the goroutine from terminating, and by
+ // the time we know that, the part of the stack trace relevant to the
+ // panic has been discarded.
+ if r := recover(); r != nil {
+ c.err = newPanicError(r)
+ }
+ }
+ }()
+
+ c.val, c.err = fn()
+ normalReturn = true
+ }()
+
+ if !normalReturn {
+ recovered = true
}
- g.mu.Unlock()
}
// Forget tells the singleflight to forget about a key. Future calls
diff --git a/singleflight/singleflight_test.go b/singleflight/singleflight_test.go
index ad04037..3e51203 100644
--- a/singleflight/singleflight_test.go
+++ b/singleflight/singleflight_test.go
@@ -5,8 +5,14 @@
package singleflight
import (
+ "bytes"
"errors"
"fmt"
+ "os"
+ "os/exec"
+ "runtime"
+ "runtime/debug"
+ "strings"
"sync"
"sync/atomic"
"testing"
@@ -91,69 +97,224 @@ func TestDoDupSuppress(t *testing.T) {
func TestForget(t *testing.T) {
var g Group
- var firstStarted, firstFinished sync.WaitGroup
+ var (
+ firstStarted = make(chan struct{})
+ unblockFirst = make(chan struct{})
+ firstFinished = make(chan struct{})
+ )
- firstStarted.Add(1)
- firstFinished.Add(1)
-
- firstCh := make(chan struct{})
go func() {
g.Do("key", func() (i interface{}, e error) {
- firstStarted.Done()
- <-firstCh
- firstFinished.Done()
+ close(firstStarted)
+ <-unblockFirst
+ close(firstFinished)
return
})
}()
+ <-firstStarted
+ g.Forget("key")
- firstStarted.Wait()
- g.Forget("key") // from this point no two function using same key should be executed concurrently
+ unblockSecond := make(chan struct{})
+ secondResult := g.DoChan("key", func() (i interface{}, e error) {
+ <-unblockSecond
+ return 2, nil
+ })
- var secondStarted int32
- var secondFinished int32
- var thirdStarted int32
+ close(unblockFirst)
+ <-firstFinished
- secondCh := make(chan struct{})
- secondRunning := make(chan struct{})
- go func() {
- g.Do("key", func() (i interface{}, e error) {
+ thirdResult := g.DoChan("key", func() (i interface{}, e error) {
+ return 3, nil
+ })
+
+ close(unblockSecond)
+ <-secondResult
+ r := <-thirdResult
+ if r.Val != 2 {
+ t.Errorf("We should receive result produced by second call, expected: 2, got %d", r.Val)
+ }
+}
+
+func TestDoChan(t *testing.T) {
+ var g Group
+ ch := g.DoChan("key", func() (interface{}, error) {
+ return "bar", nil
+ })
+
+ res := <-ch
+ v := res.Val
+ err := res.Err
+ if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
+ t.Errorf("Do = %v; want %v", got, want)
+ }
+ if err != nil {
+ t.Errorf("Do error = %v", err)
+ }
+}
+
+// Test singleflight behaves correctly after Do panic.
+// See https://github.com/golang/go/issues/41133
+func TestPanicDo(t *testing.T) {
+ var g Group
+ fn := func() (interface{}, error) {
+ panic("invalid memory address or nil pointer dereference")
+ }
+
+ const n = 5
+ waited := int32(n)
+ panicCount := int32(0)
+ done := make(chan struct{})
+ for i := 0; i < n; i++ {
+ go func() {
+ defer func() {
+ if err := recover(); err != nil {
+ t.Logf("Got panic: %v\n%s", err, debug.Stack())
+ atomic.AddInt32(&panicCount, 1)
+ }
+
+ if atomic.AddInt32(&waited, -1) == 0 {
+ close(done)
+ }
+ }()
+
+ g.Do("key", fn)
+ }()
+ }
+
+ select {
+ case <-done:
+ if panicCount != n {
+ t.Errorf("Expect %d panic, but got %d", n, panicCount)
+ }
+ case <-time.After(time.Second):
+ t.Fatalf("Do hangs")
+ }
+}
+
+func TestGoexitDo(t *testing.T) {
+ var g Group
+ fn := func() (interface{}, error) {
+ runtime.Goexit()
+ return nil, nil
+ }
+
+ const n = 5
+ waited := int32(n)
+ done := make(chan struct{})
+ for i := 0; i < n; i++ {
+ go func() {
+ var err error
defer func() {
+ if err != nil {
+ t.Errorf("Error should be nil, but got: %v", err)
+ }
+ if atomic.AddInt32(&waited, -1) == 0 {
+ close(done)
+ }
}()
- atomic.AddInt32(&secondStarted, 1)
- // Notify that we started
- secondCh <- struct{}{}
- // Wait other get above signal
- <-secondRunning
- <-secondCh
- atomic.AddInt32(&secondFinished, 1)
- return 2, nil
+ _, err, _ = g.Do("key", fn)
+ }()
+ }
+
+ select {
+ case <-done:
+ case <-time.After(time.Second):
+ t.Fatalf("Do hangs")
+ }
+}
+
+func TestPanicDoChan(t *testing.T) {
+ if runtime.GOOS == "js" {
+ t.Skipf("js does not support exec")
+ }
+
+ if os.Getenv("TEST_PANIC_DOCHAN") != "" {
+ defer func() {
+ recover()
+ }()
+
+ g := new(Group)
+ ch := g.DoChan("", func() (interface{}, error) {
+ panic("Panicking in DoChan")
})
- }()
+ <-ch
+ t.Fatalf("DoChan unexpectedly returned")
+ }
- close(firstCh)
- firstFinished.Wait() // wait for first execution (which should not affect execution after Forget)
+ t.Parallel()
- <-secondCh
- // Notify second that we got the signal that it started
- secondRunning <- struct{}{}
- if atomic.LoadInt32(&secondStarted) != 1 {
- t.Fatal("Second execution should be executed due to usage of forget")
+ cmd := exec.Command(os.Args[0], "-test.run="+t.Name(), "-test.v")
+ cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1")
+ out := new(bytes.Buffer)
+ cmd.Stdout = out
+ cmd.Stderr = out
+ if err := cmd.Start(); err != nil {
+ t.Fatal(err)
}
- if atomic.LoadInt32(&secondFinished) == 1 {
- t.Fatal("Second execution should be still active")
+ err := cmd.Wait()
+ t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out)
+ if err == nil {
+ t.Errorf("Test subprocess passed; want a crash due to panic in DoChan")
}
+ if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) {
+ t.Errorf("Test subprocess failed with an unexpected failure mode.")
+ }
+ if !bytes.Contains(out.Bytes(), []byte("Panicking in DoChan")) {
+ t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in DoChan")
+ }
+}
- close(secondCh)
- result, _, _ := g.Do("key", func() (i interface{}, e error) {
- atomic.AddInt32(&thirdStarted, 1)
- return 3, nil
- })
+func TestPanicDoSharedByDoChan(t *testing.T) {
+ if runtime.GOOS == "js" {
+ t.Skipf("js does not support exec")
+ }
+
+ if os.Getenv("TEST_PANIC_DOCHAN") != "" {
+ blocked := make(chan struct{})
+ unblock := make(chan struct{})
- if atomic.LoadInt32(&thirdStarted) != 0 {
- t.Error("Third call should not be started because was started during second execution")
+ g := new(Group)
+ go func() {
+ defer func() {
+ recover()
+ }()
+ g.Do("", func() (interface{}, error) {
+ close(blocked)
+ <-unblock
+ panic("Panicking in Do")
+ })
+ }()
+
+ <-blocked
+ ch := g.DoChan("", func() (interface{}, error) {
+ panic("DoChan unexpectedly executed callback")
+ })
+ close(unblock)
+ <-ch
+ t.Fatalf("DoChan unexpectedly returned")
+ }
+
+ t.Parallel()
+
+ cmd := exec.Command(os.Args[0], "-test.run="+t.Name(), "-test.v")
+ cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1")
+ out := new(bytes.Buffer)
+ cmd.Stdout = out
+ cmd.Stderr = out
+ if err := cmd.Start(); err != nil {
+ t.Fatal(err)
+ }
+
+ err := cmd.Wait()
+ t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out)
+ if err == nil {
+ t.Errorf("Test subprocess passed; want a crash due to panic in Do shared by DoChan")
+ }
+ if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) {
+ t.Errorf("Test subprocess failed with an unexpected failure mode.")
}
- if result != 2 {
- t.Errorf("We should receive result produced by second call, expected: 2, got %d", result)
+ if !bytes.Contains(out.Bytes(), []byte("Panicking in Do")) {
+ t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in Do")
}
}
diff --git a/syncmap/go19.go b/syncmap/go19.go
index 41a5909..fa04dba 100644
--- a/syncmap/go19.go
+++ b/syncmap/go19.go
@@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
+//go:build go1.9
// +build go1.9
package syncmap
diff --git a/syncmap/pre_go19.go b/syncmap/pre_go19.go
index 01a7be7..5bba413 100644
--- a/syncmap/pre_go19.go
+++ b/syncmap/pre_go19.go
@@ -2,6 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
+//go:build !go1.9
// +build !go1.9
package syncmap