2020-01-07 16:53:09 +05:30
// Copyright 2019 The Gitea Authors. All rights reserved.
2022-11-27 23:50:29 +05:30
// SPDX-License-Identifier: MIT
2020-01-07 16:53:09 +05:30
package queue
import (
"fmt"
2020-02-03 04:49:58 +05:30
"strings"
2020-01-07 16:53:09 +05:30
2021-07-24 21:33:58 +05:30
"code.gitea.io/gitea/modules/json"
2020-01-07 16:53:09 +05:30
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
)
func validType ( t string ) ( Type , error ) {
if len ( t ) == 0 {
return PersistableChannelQueueType , nil
}
for _ , typ := range RegisteredTypes ( ) {
if t == string ( typ ) {
return typ , nil
}
}
2022-01-25 04:24:35 +05:30
return PersistableChannelQueueType , fmt . Errorf ( "unknown queue type: %s defaulting to %s" , t , string ( PersistableChannelQueueType ) )
2020-01-07 16:53:09 +05:30
}
2020-01-29 06:31:06 +05:30
func getQueueSettings ( name string ) ( setting . QueueSettings , [ ] byte ) {
2020-01-07 16:53:09 +05:30
q := setting . GetQueueSettings ( name )
2020-10-16 03:10:03 +05:30
cfg , err := json . Marshal ( q )
2020-01-07 16:53:09 +05:30
if err != nil {
2020-10-16 03:10:03 +05:30
log . Error ( "Unable to marshall generic options: %v Error: %v" , q , err )
2020-01-07 16:53:09 +05:30
log . Error ( "Unable to create queue for %s" , name , err )
2020-01-29 06:31:06 +05:30
return q , [ ] byte { }
}
return q , cfg
}
// CreateQueue for name with provided handler and exemplar
func CreateQueue ( name string , handle HandlerFunc , exemplar interface { } ) Queue {
q , cfg := getQueueSettings ( name )
if len ( cfg ) == 0 {
2020-01-07 16:53:09 +05:30
return nil
}
2020-01-29 06:31:06 +05:30
typ , err := validType ( q . Type )
if err != nil {
log . Error ( "Invalid type %s provided for queue named %s defaulting to %s" , q . Type , name , string ( typ ) )
}
2020-01-07 16:53:09 +05:30
returnable , err := NewQueue ( typ , handle , cfg , exemplar )
if q . WrapIfNecessary && err != nil {
log . Warn ( "Unable to create queue for %s: %v" , name , err )
log . Warn ( "Attempting to create wrapped queue" )
returnable , err = NewQueue ( WrappedQueueType , handle , WrappedQueueConfiguration {
2020-01-29 06:31:06 +05:30
Underlying : typ ,
2020-01-07 16:53:09 +05:30
Timeout : q . Timeout ,
MaxAttempts : q . MaxAttempts ,
Config : cfg ,
2020-10-16 03:10:03 +05:30
QueueLength : q . QueueLength ,
2020-07-06 01:08:03 +05:30
Name : name ,
2020-01-07 16:53:09 +05:30
} , exemplar )
}
if err != nil {
log . Error ( "Unable to create queue for %s: %v" , name , err )
return nil
}
2022-01-23 02:52:14 +05:30
// Sanity check configuration
if q . Workers == 0 && ( q . BoostTimeout == 0 || q . BoostWorkers == 0 || q . MaxWorkers == 0 ) {
log . Warn ( "Queue: %s is configured to be non-scaling and have no workers\n - this configuration is likely incorrect and could cause Gitea to block" , q . Name )
if pausable , ok := returnable . ( Pausable ) ; ok {
log . Warn ( "Queue: %s is being paused to prevent data-loss, add workers manually and unpause." , q . Name )
pausable . Pause ( )
}
}
2020-01-07 16:53:09 +05:30
return returnable
}
2020-02-03 04:49:58 +05:30
// CreateUniqueQueue for name with provided handler and exemplar
func CreateUniqueQueue ( name string , handle HandlerFunc , exemplar interface { } ) UniqueQueue {
q , cfg := getQueueSettings ( name )
if len ( cfg ) == 0 {
return nil
}
2021-09-03 15:50:57 +05:30
if len ( q . Type ) > 0 && q . Type != "dummy" && q . Type != "immediate" && ! strings . HasPrefix ( q . Type , "unique-" ) {
2020-02-03 04:49:58 +05:30
q . Type = "unique-" + q . Type
}
typ , err := validType ( q . Type )
if err != nil || typ == PersistableChannelQueueType {
typ = PersistableChannelUniqueQueueType
if err != nil {
log . Error ( "Invalid type %s provided for queue named %s defaulting to %s" , q . Type , name , string ( typ ) )
}
}
returnable , err := NewQueue ( typ , handle , cfg , exemplar )
if q . WrapIfNecessary && err != nil {
log . Warn ( "Unable to create unique queue for %s: %v" , name , err )
log . Warn ( "Attempting to create wrapped queue" )
returnable , err = NewQueue ( WrappedUniqueQueueType , handle , WrappedUniqueQueueConfiguration {
Underlying : typ ,
Timeout : q . Timeout ,
MaxAttempts : q . MaxAttempts ,
Config : cfg ,
2020-10-16 03:10:03 +05:30
QueueLength : q . QueueLength ,
2020-02-03 04:49:58 +05:30
} , exemplar )
}
if err != nil {
log . Error ( "Unable to create unique queue for %s: %v" , name , err )
return nil
}
2022-01-23 02:52:14 +05:30
// Sanity check configuration
if q . Workers == 0 && ( q . BoostTimeout == 0 || q . BoostWorkers == 0 || q . MaxWorkers == 0 ) {
log . Warn ( "Queue: %s is configured to be non-scaling and have no workers\n - this configuration is likely incorrect and could cause Gitea to block" , q . Name )
if pausable , ok := returnable . ( Pausable ) ; ok {
log . Warn ( "Queue: %s is being paused to prevent data-loss, add workers manually and unpause." , q . Name )
pausable . Pause ( )
}
}
2020-02-03 04:49:58 +05:30
return returnable . ( UniqueQueue )
}