package dbUtils

import (
	"context"
	"errors"
	"io"
	"sync"

	common "git.mic.pp.ua/anderson/nettools/common"
)

var (
	// ErrPipeClosed is returned when the pipe has been closed.
	ErrPipeClosed = errors.New("pipe is closed")

	// ErrPipeFull is returned when a write does not fit completely into the
	// pipe; the chunks that did not fit are handed back as overflow.
	ErrPipeFull = errors.New("pipe is full")
)

// IPipe describes a pipe: a data structure into which chunks of data can be
// written and from which they can be read back, much like a buffer.
//
// NOTE(review): *Pipe does not satisfy this interface as declared, because
// Pipe.Write and Pipe.WaitWrite return the overflow ([][]byte) while the
// interface declares an int. The declaration is left untouched to stay
// compatible with existing users; one of the two should be aligned.
type IPipe interface {
	io.Closer
	Write(ctx context.Context, data ...[]byte) (n int, err error)
	WaitWrite(ctx context.Context, data ...[]byte) (n int, err error)
	Read(ctx context.Context) (data [][]byte, isOpen bool, err error)
	WaitRead(ctx context.Context) (data [][]byte, isOpen bool, err error)
	Cap() int
	Grow(n int)
	Len() int
	Reset()
}

// Pipe is a chunk buffer guarded by a single lock. It must be created with
// New; the zero value is not usable.
type Pipe struct {
	// IPipe is embedded for backward compatibility only. It is never set and
	// every method it declares is shadowed by a method defined on *Pipe.
	IPipe

	// data holds the chunks that were written and not yet read.
	data [][]byte

	// cap is the maximum number of chunks; a negative value means no limit.
	cap int

	// signalWritten wakes readers waiting for data. It is bound to lock.
	signalWritten *sync.Cond

	// signalRed wakes writers waiting for free space. It is bound to lock.
	signalRed *sync.Cond

	// lock guards every field of the pipe.
	lock *sync.RWMutex

	// isClosed is set once by Close.
	isClosed bool

	// closeChan is closed by Close so that the event can be selected on.
	closeChan chan struct{}
}

// New creates a new Pipe and returns a pointer to it.
// Pass a negative sizeLimit (for example -1) for a pipe without a limit.
func New(sizeLimit int) *Pipe {
	// Both condition variables share the pipe lock: sync.Cond.Wait requires
	// its Locker to be held by the caller, and sharing the lock that guards
	// the state is what makes "check, then wait" free of lost wake-ups.
	lock := &sync.RWMutex{}
	return &Pipe{
		data:          [][]byte{},
		cap:           sizeLimit,
		signalWritten: sync.NewCond(lock),
		signalRed:     sync.NewCond(lock),
		lock:          lock,
		closeChan:     make(chan struct{}),
	}
}

// wakeOnDone starts a watcher that wakes every blocked reader and writer once
// ctx is done, so that their wait loops can observe the cancellation. The
// returned stop function releases the watcher and must always be called.
func (p *Pipe) wakeOnDone(ctx context.Context) (stop func()) {
	if ctx.Done() == nil {
		// The context can never be cancelled; nothing to watch.
		return func() {}
	}
	done := make(chan struct{})
	go func() {
		select {
		case <-ctx.Done():
			// Taking the lock guarantees that a waiter is either already
			// parked in Wait or has not yet checked ctx.Err().
			p.lock.Lock()
			p.signalWritten.Broadcast()
			p.signalRed.Broadcast()
			p.lock.Unlock()
		case <-done:
		}
	}()
	return func() { close(done) }
}

// takeLocked hands over all buffered chunks, empties the pipe and wakes the
// writers waiting for free space. The caller must hold p.lock for writing.
func (p *Pipe) takeLocked() [][]byte {
	data := p.data
	// The slice is handed over as is; later writes append to a fresh one.
	p.data = nil
	p.signalRed.Broadcast()
	return data
}

// =========================
// Write

// write stores as many chunks as fit and returns the rest as overflow.
// It returns ErrPipeClosed (with all of data as overflow) when the pipe is
// closed and ErrPipeFull when at least one chunk did not fit.
// The caller must hold p.lock for writing.
func (p *Pipe) write(data [][]byte) (overflow [][]byte, err error) {
	if p.isClosed {
		return data, ErrPipeClosed
	}
	if p.cap < 0 {
		// Unlimited pipe: everything fits.
		p.data = append(p.data, data...)
		return nil, nil
	}

	spaceLeft := p.cap - len(p.data)
	if spaceLeft < 0 {
		// The pipe holds more than cap (cap was shrunk); nothing fits.
		spaceLeft = 0
	}
	if spaceLeft < len(data) {
		err = ErrPipeFull
		overflow = make([][]byte, len(data)-spaceLeft)
		copy(overflow, data[spaceLeft:])
		data = data[:spaceLeft]
	}
	p.data = append(p.data, data...)
	return overflow, err
}

// WaitWrite waits for the pipe to free up if there is not enough space, then
// saves the chunks into the pipe and returns the overflow.
//
// It only waits when data could fit into the pipe at all (fewer chunks than
// the capacity); otherwise it writes what fits right away. Waiting ends early
// when the pipe is closed or ctx is done; in the latter case nothing is
// written and all of data is returned as overflow.
// May return ErrPipeClosed and ErrPipeFull as an error, as well as whatever
// common.ExecTask reports for ctx.
func (p *Pipe) WaitWrite(ctx context.Context, data ...[]byte) (overflow [][]byte, err error) {
	if data == nil {
		return nil, nil
	}
	// NOTE(review): common.ExecTask is assumed to run the task and return its
	// error (or the context error) — confirm against its implementation.
	err = common.ExecTask(ctx, func(ctx context.Context) error {
		stop := p.wakeOnDone(ctx)
		defer stop()

		p.lock.Lock()
		defer p.lock.Unlock()

		for !p.isClosed && len(data) < p.cap && len(data)+len(p.data) > p.cap {
			if ctxErr := ctx.Err(); ctxErr != nil {
				overflow = data
				return ctxErr
			}
			p.signalRed.Wait()
		}

		var writeErr error
		overflow, writeErr = p.write(data)
		p.signalWritten.Broadcast()
		return writeErr
	})
	// The call is evaluated before the results are read; the order inside a
	// single return statement is not specified for plain variables.
	return overflow, err
}

// Write saves chunks into the pipe without waiting and returns the overflow.
// May return ErrPipeClosed and ErrPipeFull as an error, as well as whatever
// common.ExecTask reports for ctx.
func (p *Pipe) Write(ctx context.Context, data ...[]byte) (overflow [][]byte, err error) {
	if data == nil {
		return nil, nil
	}
	err = common.ExecTask(ctx, func(_ context.Context) error {
		p.lock.Lock()
		defer p.lock.Unlock()

		var writeErr error
		overflow, writeErr = p.write(data)
		p.signalWritten.Broadcast()
		return writeErr
	})
	return overflow, err
}

// =========================
// Read

// waitReadCtx blocks until the pipe holds data, the pipe is closed or ctx is
// done, and then takes all buffered chunks.
//
// It returns io.EOF when the pipe is already closed and empty on entry,
// ErrPipeClosed when the pipe is closed while waiting and no data arrived,
// and ctx.Err() when ctx is done before any data arrived.
func (p *Pipe) waitReadCtx(ctx context.Context) (data [][]byte, isOpen bool, err error) {
	stop := p.wakeOnDone(ctx)
	defer stop()

	p.lock.Lock()
	defer p.lock.Unlock()

	waited := false
	for len(p.data) == 0 {
		if p.isClosed {
			if waited {
				return nil, false, ErrPipeClosed
			}
			return nil, false, io.EOF
		}
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, true, ctxErr
		}
		p.signalWritten.Wait()
		waited = true
	}
	return p.takeLocked(), !p.isClosed, nil
}

// waitRead is an internal function that waits for data and reads it without
// any context. It returns io.EOF whenever the pipe is closed and empty.
// WaitRead is the context-aware variant.
func (p *Pipe) waitRead() (data [][]byte, isOpen bool, err error) {
	data, isOpen, err = p.waitReadCtx(context.Background())
	if errors.Is(err, ErrPipeClosed) {
		err = io.EOF
	}
	return data, isOpen, err
}

// WaitRead waits for some data to be written when the pipe is empty, then
// reads all the chunks from the pipe and resets the pipe data.
//
// It also reports whether the pipe is still open. It returns io.EOF if there
// is no data and the pipe was already closed, ErrPipeClosed if the pipe is
// closed while waiting and there is no data in it, and ctx.Err() if ctx is
// done before any data arrives.
func (p *Pipe) WaitRead(ctx context.Context) (data [][]byte, isOpen bool, err error) {
	return p.waitReadCtx(ctx)
}

// Read reads all the chunks from the pipe without waiting and resets the pipe
// data after reading.
//
// It also reports whether the pipe is still open. It returns io.EOF if there
// is no data and the pipe was closed; an open but empty pipe yields no data
// and no error.
func (p *Pipe) Read(ctx context.Context) (data [][]byte, isOpen bool, err error) {
	err = common.ExecTask(ctx, func(_ context.Context) error {
		// A write lock is required: reading empties the pipe.
		p.lock.Lock()
		defer p.lock.Unlock()

		isOpen = !p.isClosed
		if len(p.data) == 0 {
			if p.isClosed {
				return io.EOF
			}
			return nil
		}
		data = p.takeLocked()
		return nil
	})
	return data, isOpen, err
}

// =========================
// Boring boilerplate below

// Cap returns the chunk limit of the pipe; a negative value means no limit.
func (p *Pipe) Cap() int {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return p.cap
}

// Len returns the number of chunks currently stored in the pipe.
func (p *Pipe) Len() int {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return len(p.data)
}

// LenCap returns the number of stored chunks and the chunk limit, read under
// one lock acquisition.
func (p *Pipe) LenCap() (length int, capacity int) {
	p.lock.RLock()
	defer p.lock.RUnlock()
	return len(p.data), p.cap
}

// Grow changes the chunk limit by n and wakes writers waiting for space.
// A pipe without a limit stays unlimited, and the limit never drops below 0
// so that shrinking cannot turn the pipe into an unlimited one.
func (p *Pipe) Grow(n int) {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.cap < 0 {
		return
	}
	p.cap += n
	if p.cap < 0 {
		p.cap = 0
	}
	p.signalRed.Broadcast()
}

// Reset drops all stored chunks and wakes writers waiting for space.
func (p *Pipe) Reset() {
	p.lock.Lock()
	defer p.lock.Unlock()

	p.data = [][]byte{}
	p.signalRed.Broadcast()
}

// Close marks the pipe as closed and wakes every blocked reader and writer.
// Calling Close more than once is safe. It always returns nil.
func (p *Pipe) Close() error {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.isClosed {
		return nil
	}
	p.isClosed = true
	if p.closeChan != nil {
		close(p.closeChan)
	}
	p.signalWritten.Broadcast()
	p.signalRed.Broadcast()
	return nil
}

// GetData returns a pointer to the internal chunk slice.
// The access is not synchronised: no lock is taken here, so callers must not
// use the result concurrently with other pipe operations.
func (p *Pipe) GetData() *[][]byte {
	return &p.data
}