package ds_utils // A combination of worker pool and a queue import ( "context" "sync" "time" common "nettools/common" ) type WorkersCount struct { Lim int32 Busy int32 } type IWorkerPool interface { Exec(task common.Task) QueueLen() int WorkersCount() WorkersCount SetWorkersCountLim(newSize int32) } // Just a worker pool with queue before it // Thread-safe type WorkerPool struct { TaskQueue []common.Task Mu *sync.RWMutex CountLim int32 CountBusy int32 TaskDeadline time.Duration } func NewWorkerPool(workersCountLim int32, taskDeadline time.Duration) *WorkerPool { return &WorkerPool{ TaskQueue: []common.Task{}, Mu: &sync.RWMutex{}, CountLim: workersCountLim, CountBusy: 0, TaskDeadline: taskDeadline, } } func (wpq *WorkerPool) Exec(task common.Task) { wpq.Mu.Lock() if wpq.CountBusy >= wpq.CountLim { wpq.TaskQueue = append(wpq.TaskQueue, task) return } wpq.CountBusy++ wpq.Mu.Unlock() for { var cancel context.CancelFunc var ctx context.Context if wpq.TaskDeadline != 0 { ctx = context.Background() ctx, cancel = context.WithTimeout(ctx, wpq.TaskDeadline) defer cancel() } task(ctx) wpq.Mu.Lock() if len(wpq.TaskQueue) < 1 { wpq.CountBusy-- wpq.Mu.Unlock() return } task = wpq.TaskQueue[0] wpq.TaskQueue = wpq.TaskQueue[1:] wpq.Mu.Unlock() } } func (wpq *WorkerPool) QueueLen() int { wpq.Mu.RLock() defer wpq.Mu.RUnlock() return len(wpq.TaskQueue) } func (wpq *WorkerPool) WorkersCount() WorkersCount { wpq.Mu.RLock() defer wpq.Mu.RUnlock() return WorkersCount{ Lim: wpq.CountLim, Busy: wpq.CountBusy, } } func (wpq *WorkerPool) SetWorkersCountLim(newSize int32) { wpq.Mu.Lock() wpq.CountLim = newSize wpq.Mu.Unlock() }