Attempt to prevent the deadlock in the QueueDiskChannel Test again (#18415)

* Attempt to prevent the deadlock in the QueueDiskChannel Test again

This time we're going to adjust the pause tests to only test the right
flag.

* Only switch off pushback once we know that we are not pushing anything else
* Ensure full redirection occurs
* More nicely handle a closed datachan
* And handle similar problems in queue_channel_test

Signed-off-by: Andrew Thornton <art27@cantab.net>
This commit is contained in:
zeripath 2022-01-29 11:37:08 +00:00 committed by GitHub
parent 726715fcfb
commit 92b715e0f2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 119 additions and 95 deletions

View file

@ -205,7 +205,10 @@ loop:
// tell the pool to shutdown. // tell the pool to shutdown.
q.baseCtxCancel() q.baseCtxCancel()
return return
case data := <-q.dataChan: case data, ok := <-q.dataChan:
if !ok {
return
}
if err := q.PushBack(data); err != nil { if err := q.PushBack(data); err != nil {
log.Error("Unable to push back data into queue %s", q.name) log.Error("Unable to push back data into queue %s", q.name)
} }

View file

@ -117,7 +117,10 @@ func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
select { select {
case <-paused: case <-paused:
return nil return nil
case data := <-q.dataChan: case data, ok := <-q.dataChan:
if !ok {
return nil
}
if unhandled := q.handle(data); unhandled != nil { if unhandled := q.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", q.qid) log.Error("Unhandled Data whilst flushing queue %d", q.qid)
} }

View file

@ -9,6 +9,7 @@ import (
"testing" "testing"
"time" "time"
"code.gitea.io/gitea/modules/log"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
@ -111,7 +112,6 @@ func TestChannelQueue_Pause(t *testing.T) {
if pausable, ok := queue.(Pausable); ok { if pausable, ok := queue.(Pausable); ok {
pausable.Pause() pausable.Pause()
} }
pushBack = false
lock.Unlock() lock.Unlock()
return data return data
} }
@ -123,7 +123,9 @@ func TestChannelQueue_Pause(t *testing.T) {
} }
return nil return nil
} }
nilFn := func(_ func()) {}
queueShutdown := []func(){}
queueTerminate := []func(){}
queue, err = NewChannelQueue(handle, queue, err = NewChannelQueue(handle,
ChannelQueueConfiguration{ ChannelQueueConfiguration{
@ -139,7 +141,34 @@ func TestChannelQueue_Pause(t *testing.T) {
}, &testData{}) }, &testData{})
assert.NoError(t, err) assert.NoError(t, err)
go queue.Run(nilFn, nilFn) go queue.Run(func(shutdown func()) {
lock.Lock()
defer lock.Unlock()
queueShutdown = append(queueShutdown, shutdown)
}, func(terminate func()) {
lock.Lock()
defer lock.Unlock()
queueTerminate = append(queueTerminate, terminate)
})
// Shutdown and Terminate in defer
defer func() {
lock.Lock()
callbacks := make([]func(), len(queueShutdown))
copy(callbacks, queueShutdown)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
lock.Lock()
log.Info("Finally terminating")
callbacks = make([]func(), len(queueTerminate))
copy(callbacks, queueTerminate)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
}()
test1 := testData{"A", 1} test1 := testData{"A", 1}
test2 := testData{"B", 2} test2 := testData{"B", 2}
@ -155,14 +184,11 @@ func TestChannelQueue_Pause(t *testing.T) {
pausable.Pause() pausable.Pause()
paused, resumed := pausable.IsPausedIsResumed() paused, _ := pausable.IsPausedIsResumed()
select { select {
case <-paused: case <-paused:
case <-resumed: case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should not be resumed")
return
default:
assert.Fail(t, "Queue is not paused") assert.Fail(t, "Queue is not paused")
return return
} }
@ -179,10 +205,11 @@ func TestChannelQueue_Pause(t *testing.T) {
assert.Nil(t, result2) assert.Nil(t, result2)
pausable.Resume() pausable.Resume()
_, resumed := pausable.IsPausedIsResumed()
select { select {
case <-resumed: case <-resumed:
default: case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should be resumed") assert.Fail(t, "Queue should be resumed")
} }
@ -199,47 +226,47 @@ func TestChannelQueue_Pause(t *testing.T) {
pushBack = true pushBack = true
lock.Unlock() lock.Unlock()
paused, resumed = pausable.IsPausedIsResumed() _, resumed = pausable.IsPausedIsResumed()
select { select {
case <-paused:
assert.Fail(t, "Queue should not be paused")
return
case <-resumed: case <-resumed:
default: case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not resumed") assert.Fail(t, "Queue is not resumed")
return return
} }
queue.Push(&test1) queue.Push(&test1)
paused, _ = pausable.IsPausedIsResumed()
select { select {
case <-paused: case <-paused:
case <-handleChan: case <-handleChan:
assert.Fail(t, "handler chan should not contain test1") assert.Fail(t, "handler chan should not contain test1")
return return
case <-time.After(500 * time.Millisecond): case <-time.After(100 * time.Millisecond):
assert.Fail(t, "queue should be paused") assert.Fail(t, "queue should be paused")
return return
} }
paused, resumed = pausable.IsPausedIsResumed() lock.Lock()
pushBack = false
lock.Unlock()
paused, _ = pausable.IsPausedIsResumed()
select { select {
case <-paused: case <-paused:
case <-resumed: case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should not be resumed")
return
default:
assert.Fail(t, "Queue is not paused") assert.Fail(t, "Queue is not paused")
return return
} }
pausable.Resume() pausable.Resume()
_, resumed = pausable.IsPausedIsResumed()
select { select {
case <-resumed: case <-resumed:
default: case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should be resumed") assert.Fail(t, "Queue should be resumed")
} }

View file

@ -313,14 +313,13 @@ func (q *PersistableChannelQueue) Shutdown() {
q.channelQueue.Wait() q.channelQueue.Wait()
q.internal.(*LevelQueue).Wait() q.internal.(*LevelQueue).Wait()
// Redirect all remaining data in the chan to the internal channel // Redirect all remaining data in the chan to the internal channel
go func() { log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) close(q.channelQueue.dataChan)
for data := range q.channelQueue.dataChan { for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data) _ = q.internal.Push(data)
atomic.AddInt64(&q.channelQueue.numInQueue, -1) atomic.AddInt64(&q.channelQueue.numInQueue, -1)
} }
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
}()
log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
} }

View file

@ -207,7 +207,6 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
log.Info("pausing") log.Info("pausing")
pausable.Pause() pausable.Pause()
} }
pushBack = false
lock.Unlock() lock.Unlock()
return data return data
} }
@ -248,6 +247,25 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
queueTerminate = append(queueTerminate, terminate) queueTerminate = append(queueTerminate, terminate)
}) })
// Shutdown and Terminate in defer
defer func() {
lock.Lock()
callbacks := make([]func(), len(queueShutdown))
copy(callbacks, queueShutdown)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
lock.Lock()
log.Info("Finally terminating")
callbacks = make([]func(), len(queueTerminate))
copy(callbacks, queueTerminate)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
}()
test1 := testData{"A", 1} test1 := testData{"A", 1}
test2 := testData{"B", 2} test2 := testData{"B", 2}
@ -263,14 +281,11 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
assert.Equal(t, test1.TestInt, result1.TestInt) assert.Equal(t, test1.TestInt, result1.TestInt)
pausable.Pause() pausable.Pause()
paused, resumed := pausable.IsPausedIsResumed() paused, _ := pausable.IsPausedIsResumed()
select { select {
case <-paused: case <-paused:
case <-resumed: case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should not be resumed")
return
default:
assert.Fail(t, "Queue is not paused") assert.Fail(t, "Queue is not paused")
return return
} }
@ -287,14 +302,11 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
assert.Nil(t, result2) assert.Nil(t, result2)
pausable.Resume() pausable.Resume()
paused, resumed = pausable.IsPausedIsResumed() _, resumed := pausable.IsPausedIsResumed()
select { select {
case <-paused:
assert.Fail(t, "Queue should be resumed")
return
case <-resumed: case <-resumed:
default: case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should be resumed") assert.Fail(t, "Queue should be resumed")
return return
} }
@ -308,24 +320,27 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
assert.Equal(t, test2.TestString, result2.TestString) assert.Equal(t, test2.TestString, result2.TestString)
assert.Equal(t, test2.TestInt, result2.TestInt) assert.Equal(t, test2.TestInt, result2.TestInt)
// Set pushBack to so that the next handle will result in a Pause
lock.Lock() lock.Lock()
pushBack = true pushBack = true
lock.Unlock() lock.Unlock()
paused, resumed = pausable.IsPausedIsResumed() // Ensure that we're still resumed
_, resumed = pausable.IsPausedIsResumed()
select { select {
case <-paused:
assert.Fail(t, "Queue should not be paused")
return
case <-resumed: case <-resumed:
default: case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not resumed") assert.Fail(t, "Queue is not resumed")
return return
} }
// push test1
queue.Push(&test1) queue.Push(&test1)
// Now as this is handled it should pause
paused, _ = pausable.IsPausedIsResumed()
select { select {
case <-paused: case <-paused:
case <-handleChan: case <-handleChan:
@ -336,27 +351,16 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
return return
} }
paused, resumed = pausable.IsPausedIsResumed() lock.Lock()
pushBack = false
select { lock.Unlock()
case <-paused:
case <-resumed:
assert.Fail(t, "Queue should not be resumed")
return
default:
assert.Fail(t, "Queue is not paused")
return
}
pausable.Resume() pausable.Resume()
paused, resumed = pausable.IsPausedIsResumed() _, resumed = pausable.IsPausedIsResumed()
select { select {
case <-paused:
assert.Fail(t, "Queue should not be paused")
return
case <-resumed: case <-resumed:
default: case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Queue should be resumed") assert.Fail(t, "Queue should be resumed")
return return
} }
@ -373,6 +377,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
lock.Lock() lock.Lock()
callbacks := make([]func(), len(queueShutdown)) callbacks := make([]func(), len(queueShutdown))
copy(callbacks, queueShutdown) copy(callbacks, queueShutdown)
queueShutdown = queueShutdown[:0]
lock.Unlock() lock.Unlock()
// Now shutdown the queue // Now shutdown the queue
for _, callback := range callbacks { for _, callback := range callbacks {
@ -402,6 +407,7 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
lock.Lock() lock.Lock()
callbacks = make([]func(), len(queueTerminate)) callbacks = make([]func(), len(queueTerminate))
copy(callbacks, queueTerminate) copy(callbacks, queueTerminate)
queueShutdown = queueTerminate[:0]
lock.Unlock() lock.Unlock()
for _, callback := range callbacks { for _, callback := range callbacks {
callback() callback()
@ -453,14 +459,11 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
case <-paused: case <-paused:
} }
paused, resumed = pausable.IsPausedIsResumed() paused, _ = pausable.IsPausedIsResumed()
select { select {
case <-paused: case <-paused:
case <-resumed: case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Queue should not be resumed")
return
default:
assert.Fail(t, "Queue is not paused") assert.Fail(t, "Queue is not paused")
return return
} }
@ -472,14 +475,15 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
default: default:
} }
lock.Lock()
pushBack = false
lock.Unlock()
pausable.Resume() pausable.Resume()
paused, resumed = pausable.IsPausedIsResumed() _, resumed = pausable.IsPausedIsResumed()
select { select {
case <-paused:
assert.Fail(t, "Queue should not be paused")
return
case <-resumed: case <-resumed:
default: case <-time.After(500 * time.Millisecond):
assert.Fail(t, "Queue should be resumed") assert.Fail(t, "Queue should be resumed")
return return
} }
@ -506,18 +510,4 @@ func TestPersistableChannelQueue_Pause(t *testing.T) {
assert.Equal(t, test2.TestString, result4.TestString) assert.Equal(t, test2.TestString, result4.TestString)
assert.Equal(t, test2.TestInt, result4.TestInt) assert.Equal(t, test2.TestInt, result4.TestInt)
lock.Lock()
callbacks = make([]func(), len(queueShutdown))
copy(callbacks, queueShutdown)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
lock.Lock()
callbacks = make([]func(), len(queueTerminate))
copy(callbacks, queueTerminate)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
} }

View file

@ -178,7 +178,10 @@ func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
default: default:
} }
select { select {
case data := <-q.dataChan: case data, ok := <-q.dataChan:
if !ok {
return nil
}
if unhandled := q.handle(data); unhandled != nil { if unhandled := q.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", q.qid) log.Error("Unhandled Data whilst flushing queue %d", q.qid)
} }

View file

@ -282,13 +282,12 @@ func (q *PersistableChannelUniqueQueue) Shutdown() {
q.channelQueue.Wait() q.channelQueue.Wait()
q.internal.(*LevelUniqueQueue).Wait() q.internal.(*LevelUniqueQueue).Wait()
// Redirect all remaining data in the chan to the internal channel // Redirect all remaining data in the chan to the internal channel
go func() { close(q.channelQueue.dataChan)
log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name)
for data := range q.channelQueue.dataChan { for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data) _ = q.internal.Push(data)
} }
log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
}()
log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name) log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name)
} }