package core import ( "context" "github.com/matchsystems/werr" "mic-wallet/server/repository" "sync" "time" ) type CommitBlockFunc func(ctx context.Context, block *Block) error type IBlocksCommiter interface { // CommitBlock pushes blocks into the storage (db). // Block must be not null CommitBlock(ctx context.Context, block *Block) } type BlocksCommiter struct { BlocksQueue []*Block Lock *sync.RWMutex CountLim int CountBusy int TaskDeadline time.Duration ErrChan chan error CommitBlockFunc CommitBlockFunc } func NewBlocksCommiter(repo *repository.Repository, errChan chan error, workersCountLim int) *BlocksCommiter { return &BlocksCommiter{ BlocksQueue: make([]*Block, 0), Lock: &sync.RWMutex{}, CountLim: workersCountLim, CountBusy: 0, TaskDeadline: 0, ErrChan: errChan, CommitBlockFunc: func(ctx context.Context, block *Block) error { return CommitBlock(ctx, repo, block) }, } } // execute executes commiting block with deadline and cancelable by the context // provided when calling CommitBlock. // all the errors from it (including context) are sent to BlocksCommiter.ErrChan func (bc *BlocksCommiter) execute(ctx context.Context, block *Block) { if bc.TaskDeadline > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, bc.TaskDeadline) defer cancel() } errChan := make(chan error) go func() { if err := bc.CommitBlockFunc(ctx, block); err != nil { errChan <- err } }() select { case <-ctx.Done(): bc.ErrChan <- werr.Wrapf(ctx.Err(), "ctx error while commiting block %d", block.Id) case err := <-errChan: bc.ErrChan <- werr.Wrapf(err, "error while commiting block %d", block.Id) } } func (bc *BlocksCommiter) CommitBlock(ctx context.Context, block *Block) { bc.Lock.Lock() if bc.CountBusy >= bc.CountLim { bc.BlocksQueue = append(bc.BlocksQueue, block) return } bc.CountBusy++ bc.Lock.Unlock() for { bc.execute(ctx, block) bc.Lock.Lock() if len(bc.BlocksQueue) < 1 { bc.CountBusy-- bc.Lock.Unlock() return } block = bc.BlocksQueue[0] bc.BlocksQueue = bc.BlocksQueue[1:] bc.Lock.Unlock() } }