85 lines
1.9 KiB
Go
85 lines
1.9 KiB
Go
|
package core
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"github.com/matchsystems/werr"
|
||
|
"mic-wallet/server/repository"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// CommitBlockFunc is the signature of the function a BlocksCommiter invokes to
// commit one block. A non-nil return value is wrapped and reported on
// BlocksCommiter.ErrChan.
type CommitBlockFunc func(ctx context.Context, block *Block) error
|
||
|
|
||
|
type IBlocksCommiter interface {
|
||
|
// CommitBlock pushes blocks into the storage (db).
|
||
|
// Block must be not null
|
||
|
CommitBlock(ctx context.Context, block *Block)
|
||
|
}
|
||
|
|
||
|
type BlocksCommiter struct {
|
||
|
BlocksQueue []*Block
|
||
|
Lock *sync.RWMutex
|
||
|
CountLim int32
|
||
|
CountBusy int32
|
||
|
TaskDeadline time.Duration
|
||
|
ErrChan chan error
|
||
|
|
||
|
CommitBlockFunc CommitBlockFunc
|
||
|
}
|
||
|
|
||
|
func NewBlocksCommiter(repo *repository.Repository, errChan chan error, workersCountLim int) *BlocksCommiter {
|
||
|
return &BlocksCommiter{
|
||
|
BlocksQueue: make([]*Block, 0),
|
||
|
Lock: &sync.RWMutex{},
|
||
|
CountLim: 0,
|
||
|
CountBusy: 0,
|
||
|
TaskDeadline: 0,
|
||
|
ErrChan: make(chan error, workersCountLim),
|
||
|
CommitBlockFunc: func(ctx context.Context, block *Block) error {
|
||
|
return CommitBlock(ctx, repo, block)
|
||
|
},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (bc *BlocksCommiter) execute(ctx context.Context, block *Block) {
|
||
|
if bc.TaskDeadline > 0 {
|
||
|
var cancel context.CancelFunc
|
||
|
ctx, cancel = context.WithTimeout(ctx, bc.TaskDeadline)
|
||
|
defer cancel()
|
||
|
}
|
||
|
errChan := make(chan error)
|
||
|
go func() {
|
||
|
if err := bc.CommitBlockFunc(ctx, block); err != nil {
|
||
|
errChan <- err
|
||
|
}
|
||
|
}()
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
errChan <- werr.Wrapf(ctx.Err(), "ctx error while commiting block %d", block.ID)
|
||
|
case err := <-errChan:
|
||
|
bc.ErrChan <- werr.Wrapf(err, "error while commiting block %d", block.ID)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (bc *BlocksCommiter) CommitBlock(ctx context.Context, block *Block) {
|
||
|
bc.Lock.Lock()
|
||
|
if bc.CountBusy >= bc.CountLim {
|
||
|
bc.BlocksQueue = append(bc.BlocksQueue, block)
|
||
|
return
|
||
|
}
|
||
|
bc.CountBusy++
|
||
|
bc.Lock.Unlock()
|
||
|
for {
|
||
|
bc.execute(ctx, block)
|
||
|
bc.Lock.Lock()
|
||
|
if len(bc.BlocksQueue) < 1 {
|
||
|
bc.CountBusy--
|
||
|
bc.Lock.Unlock()
|
||
|
return
|
||
|
}
|
||
|
block = bc.BlocksQueue[0]
|
||
|
bc.BlocksQueue = bc.BlocksQueue[1:]
|
||
|
bc.Lock.Unlock()
|
||
|
}
|
||
|
}
|