nettools/data_structures/worker_pool.go

94 lines
1.8 KiB
Go

package ds_utils
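// This file implements a worker pool combined with a FIFO task queue:
// tasks that cannot start immediately are queued and run later in arrival order.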
import (
"context"
"sync"
"time"
common "git.mic.pp.ua/anderson/nettools/common"
)
// WorkersCount is a point-in-time snapshot of a pool's worker counters.
type WorkersCount struct {
	// Lim is the configured upper bound on busy workers; Busy is how many
	// workers were occupied when the snapshot was taken.
	Lim, Busy int32
}
// IWorkerPool is the behaviour exposed by a worker pool with a task queue.
// WorkerPool in this file is an implementation of it.
type IWorkerPool interface {
	// Exec hands a task to the pool.
	Exec(task common.Task)

	// SetWorkersCountLim replaces the limit on busy workers.
	SetWorkersCountLim(newSize int32)

	// WorkersCount reports the current worker limit and busy count.
	WorkersCount() WorkersCount

	// QueueLen reports how many tasks are waiting in the queue.
	QueueLen() int
}
// WorkerPool is a worker pool fronted by a task queue.
//
// Its methods take Mu before touching TaskQueue, CountLim or CountBusy.
// The fields are exported, so code that accesses them directly must hold Mu
// itself.
type WorkerPool struct {
	// TaskQueue holds tasks waiting for a free worker, oldest first.
	TaskQueue []common.Task
	// Mu guards TaskQueue, CountLim and CountBusy.
	Mu *sync.RWMutex
	// CountLim is the maximum number of busy workers; CountBusy is the number
	// of workers currently occupied.
	CountLim, CountBusy int32
	// TaskDeadline, when non-zero, is the timeout applied to the context each
	// task receives.
	TaskDeadline time.Duration
}
// NewWorkerPool returns an idle pool with an empty queue.
//
// workersCountLim is the initial cap on busy workers. taskDeadline is the
// per-task context timeout; zero means no timeout is applied.
func NewWorkerPool(workersCountLim int32, taskDeadline time.Duration) *WorkerPool {
	pool := new(WorkerPool)
	// Non-nil, zero-length queue, matching an empty slice literal.
	pool.TaskQueue = make([]common.Task, 0)
	pool.Mu = new(sync.RWMutex)
	pool.CountLim = workersCountLim
	pool.TaskDeadline = taskDeadline
	// CountBusy keeps its zero value: no worker is busy yet.
	return pool
}
// Exec submits task to the pool.
//
// If the number of busy workers has reached CountLim, the task is appended to
// the queue and Exec returns immediately. Otherwise the calling goroutine
// becomes a worker: it runs task, then keeps taking tasks from the front of
// the queue until the queue is empty, and only then returns. Exec starts no
// goroutines itself, so a caller that must not block should invoke it with
// `go`.
//
// Every task receives a non-nil context. When TaskDeadline is non-zero the
// context carries that timeout and is cancelled as soon as the task returns.
func (wpq *WorkerPool) Exec(task common.Task) {
	wpq.Mu.Lock()
	if wpq.CountBusy >= wpq.CountLim {
		wpq.TaskQueue = append(wpq.TaskQueue, task)
		// Release the lock on this path too; returning while holding it
		// would block every later call on the pool forever.
		wpq.Mu.Unlock()
		return
	}
	wpq.CountBusy++
	wpq.Mu.Unlock()

	for {
		wpq.runTask(task)

		wpq.Mu.Lock()
		if len(wpq.TaskQueue) == 0 {
			wpq.CountBusy--
			wpq.Mu.Unlock()
			return
		}
		task = wpq.TaskQueue[0]
		// Clear the slot so the backing array does not keep the dequeued
		// task (and whatever it captures) reachable.
		var none common.Task
		wpq.TaskQueue[0] = none
		wpq.TaskQueue = wpq.TaskQueue[1:]
		wpq.Mu.Unlock()
	}
}

// runTask runs a single task with its own context, so that the context's
// cancel function fires when this task finishes rather than piling up until
// the worker loop in Exec exits.
func (wpq *WorkerPool) runTask(task common.Task) {
	// Always start from a real context; a nil Context would panic on use.
	ctx := context.Background()
	if wpq.TaskDeadline != 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, wpq.TaskDeadline)
		defer cancel()
	}
	task(ctx)
}
// QueueLen returns the number of tasks currently waiting in the queue.
// The value is read under the read lock.
func (wpq *WorkerPool) QueueLen() int {
	wpq.Mu.RLock()
	pending := len(wpq.TaskQueue)
	wpq.Mu.RUnlock()
	return pending
}
// WorkersCount returns a snapshot of the worker limit and the number of busy
// workers. Both values are read under one read lock, so they belong to the
// same moment.
func (wpq *WorkerPool) WorkersCount() WorkersCount {
	var snapshot WorkersCount

	wpq.Mu.RLock()
	snapshot.Lim = wpq.CountLim
	snapshot.Busy = wpq.CountBusy
	wpq.Mu.RUnlock()

	return snapshot
}
// SetWorkersCountLim replaces the limit on busy workers with newSize.
//
// It only stores the new value under the write lock: it neither starts
// queued tasks nor interrupts running ones. The new limit is consulted the
// next time Exec is called.
func (wpq *WorkerPool) SetWorkersCountLim(newSize int32) {
	wpq.Mu.Lock()
	defer wpq.Mu.Unlock()

	wpq.CountLim = newSize
}