micw/server/core/blocks_commiter.go

85 lines
1.9 KiB
Go

package core
import (
"context"
"github.com/matchsystems/werr"
"mic-wallet/server/repository"
"sync"
"time"
)
// CommitBlockFunc is the signature of a function that commits a single block.
// A non-nil return value is treated by the committer as a failed commit.
type CommitBlockFunc func(ctx context.Context, block *Block) error
// IBlocksCommiter is implemented by components that accept blocks to be
// committed.
type IBlocksCommiter interface {
// CommitBlock pushes the block into the storage (db).
// block must not be nil.
CommitBlock(ctx context.Context, block *Block)
}
// BlocksCommiter commits blocks through CommitBlockFunc while bounding the
// number of CommitBlock calls that process blocks at the same time; blocks
// submitted while the bound is reached are parked in BlocksQueue.
type BlocksCommiter struct {
// BlocksQueue holds blocks waiting to be processed, oldest first.
BlocksQueue []*Block
// Lock is taken by CommitBlock around every access to BlocksQueue and
// CountBusy.
Lock *sync.RWMutex
// CountLim is the bound CountBusy is compared against: a new block is
// queued instead of processed when CountBusy >= CountLim.
CountLim int32
// CountBusy is the number of CommitBlock calls currently processing blocks.
CountBusy int32
// TaskDeadline, when greater than zero, is applied as a per-block timeout
// on the context passed to CommitBlockFunc.
TaskDeadline time.Duration
// ErrChan is the channel on which wrapped commit errors are reported.
ErrChan chan error
// CommitBlockFunc performs the actual commit of one block.
CommitBlockFunc CommitBlockFunc
}
// NewBlocksCommiter builds a BlocksCommiter that commits blocks into repo.
//
// errChan is the channel on which commit errors are reported. When it is nil,
// a channel buffered for workersCountLim errors is allocated instead; either
// way the channel is available as the ErrChan field of the result.
//
// workersCountLim is the maximum number of CommitBlock calls allowed to
// process blocks concurrently; blocks submitted beyond that are queued.
//
// TaskDeadline is left at zero, i.e. no per-block timeout is applied.
func NewBlocksCommiter(repo *repository.Repository, errChan chan error, workersCountLim int) *BlocksCommiter {
	// Honour the caller's channel so errors arrive where the caller listens;
	// fall back to an internally owned buffered channel only when none is given.
	if errChan == nil {
		errChan = make(chan error, workersCountLim)
	}
	return &BlocksCommiter{
		BlocksQueue: make([]*Block, 0),
		Lock:        &sync.RWMutex{},
		// The limit must come from the argument: with a zero limit every
		// block would be queued and none would ever be processed.
		CountLim:     int32(workersCountLim),
		CountBusy:    0,
		TaskDeadline: 0,
		ErrChan:      errChan,
		CommitBlockFunc: func(ctx context.Context, block *Block) error {
			return CommitBlock(ctx, repo, block)
		},
	}
}
// execute commits a single block through bc.CommitBlockFunc and waits for the
// commit to finish or for ctx to be done, whichever happens first.
//
// When bc.TaskDeadline is greater than zero, the commit runs under a context
// that times out after that duration. A failed commit or a done context is
// reported on bc.ErrChan as a wrapped error; a successful commit reports
// nothing.
func (bc *BlocksCommiter) execute(ctx context.Context, block *Block) {
	if bc.TaskDeadline > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, bc.TaskDeadline)
		defer cancel()
	}

	// Buffered so the goroutine can always deliver its result and exit, even
	// if execute has already returned because ctx was done first.
	done := make(chan error, 1)
	go func() {
		// Always send, including nil, so success unblocks the select below
		// instead of leaving it waiting for the context.
		done <- bc.CommitBlockFunc(ctx, block)
	}()

	select {
	case <-ctx.Done():
		bc.ErrChan <- werr.Wrapf(ctx.Err(), "ctx error while commiting block %d", block.ID)
	case err := <-done:
		if err != nil {
			bc.ErrChan <- werr.Wrapf(err, "error while commiting block %d", block.ID)
		}
	}
}
// CommitBlock submits block for committing.
//
// If bc.CountBusy has already reached bc.CountLim, the block is appended to
// bc.BlocksQueue and the call returns immediately. Otherwise the call becomes
// a worker: it commits block and then keeps draining bc.BlocksQueue, returning
// once the queue is empty. Queued blocks are committed with the ctx of the
// call that drains them.
func (bc *BlocksCommiter) CommitBlock(ctx context.Context, block *Block) {
	bc.Lock.Lock()
	if bc.CountBusy >= bc.CountLim {
		bc.BlocksQueue = append(bc.BlocksQueue, block)
		// Release the lock on this path too; returning while holding it
		// would deadlock every later call.
		bc.Lock.Unlock()
		return
	}
	bc.CountBusy++
	bc.Lock.Unlock()

	for {
		bc.execute(ctx, block)

		bc.Lock.Lock()
		if len(bc.BlocksQueue) == 0 {
			bc.CountBusy--
			bc.Lock.Unlock()
			return
		}
		block = bc.BlocksQueue[0]
		// Clear the popped slot so the backing array does not keep the
		// block reachable after it has been processed.
		bc.BlocksQueue[0] = nil
		bc.BlocksQueue = bc.BlocksQueue[1:]
		bc.Lock.Unlock()
	}
}