nettools/data_structures/pipe.go
2024-11-12 09:10:09 +01:00

257 lines
5.6 KiB
Go

package ds_utils
import (
"context"
"errors"
"io"
"sync"
common "nettools/common"
)
// Sentinel errors returned by pipe operations.
var (
// ErrPipeClosed is reported by write operations on a pipe that has been
// closed, and by WaitRead when the pipe is closed while it holds no data.
ErrPipeClosed = errors.New("pipe is closed")
// ErrPipeFull is reported when a write carries more chunks than the pipe
// has room for; the chunks that did not fit are returned as overflow.
ErrPipeFull = errors.New("pipe is full")
)
// IPipe describes a pipe: a data structure into which chunks of data can
// be written and from which they can be read back, much like a buffer.
//
// NOTE(review): Pipe.Write and Pipe.WaitWrite in this file return
// (overflow [][]byte, err error) rather than (n int, err error), so the
// signatures declared here do not match the ones Pipe defines — confirm
// which of the two is intended.
type IPipe interface {
io.Closer
// Write stores the given chunks in the pipe.
Write(ctx context.Context, data ...[]byte) (n int, err error)
// WaitWrite stores the given chunks in the pipe; judging by the name it
// is the blocking counterpart of Write.
WaitWrite(ctx context.Context, data ...[]byte) (n int, err error)
// Read returns chunks from the pipe and whether the pipe is still open.
Read(ctx context.Context) (data [][]byte, isOpen bool, err error)
// WaitRead returns chunks from the pipe and whether the pipe is still
// open; judging by the name it is the blocking counterpart of Read.
WaitRead(ctx context.Context) (data [][]byte, isOpen bool, err error)
// Cap returns the capacity of the pipe.
Cap() int
// Grow changes the capacity of the pipe by n.
Grow(n int)
// Len returns the number of chunks currently held.
Len() int
// Reset empties the pipe.
Reset()
}
// Pipe is a chunk buffer guarded by a read/write lock, with two condition
// variables used to hand off between readers and writers.
//
// NOTE(review): IPipe is embedded, but every IPipe method name is also
// defined directly on *Pipe in this file, so the embedded value is fully
// shadowed. Because Write and WaitWrite are defined with a different result
// list than IPipe declares, *Pipe does not appear to satisfy IPipe — confirm
// whether the embedding is still wanted.
type Pipe struct {
IPipe
// data holds the chunks currently buffered, in write order.
data [][]byte
// cap is the limit on len(data) that writes are checked against.
cap int
// signalWritten is signalled after a write; readers wait on it.
signalWritten *sync.Cond
// signalRed is signalled after data has been read or reset; writers
// wait on it.
signalRed *sync.Cond
// lock guards data, cap and isClosed.
lock *sync.RWMutex
// isClosed is set to true by Close.
isClosed bool
// closeChan is closed by Close; WaitRead selects on it.
closeChan chan struct{}
}
// New creates a Pipe limited to sizeLimit chunks and returns a pointer to it.
// sizeLimit = -1 is meant to request a pipe without a limit.
//
// closeChan is allocated here because Close closes it (closing a nil channel
// panics) and WaitRead selects on it (a nil channel never becomes ready).
func New(sizeLimit int) *Pipe {
	return &Pipe{
		data:          [][]byte{},
		cap:           sizeLimit,
		signalWritten: sync.NewCond(new(sync.Mutex)),
		signalRed:     sync.NewCond(new(sync.Mutex)),
		lock:          &sync.RWMutex{},
		isClosed:      false,
		closeChan:     make(chan struct{}),
	}
}
// =========================
// Write
// write appends chunks to the pipe and returns the chunks that did not fit.
// The caller must hold p.lock for writing.
//
// Returns:
//   - (data, ErrPipeClosed) when the pipe is closed; nothing is stored.
//   - (overflow, ErrPipeFull) when only part (or none) of data fits; the
//     part that fits is stored and the rest is returned as a fresh slice.
//   - (nil, nil) when everything was stored.
//
// A negative p.cap is treated as "no limit", matching the sizeLimit = -1
// convention documented on New.
func (p *Pipe) write(data [][]byte) (overflow [][]byte, err error) {
	if p.isClosed {
		return data, ErrPipeClosed
	}
	if p.cap >= 0 {
		spaceLeft := p.cap - len(p.data)
		if spaceLeft < 0 {
			// The pipe already holds more than cap chunks: nothing fits.
			spaceLeft = 0
		}
		if spaceLeft < len(data) {
			err = ErrPipeFull
			overflow = make([][]byte, len(data)-spaceLeft)
			copy(overflow, data[spaceLeft:])
			data = data[:spaceLeft]
		}
	}
	// append works on a nil p.data as well, so no special case is needed
	// for a pipe that has just been drained.
	p.data = append(p.data, data...)
	return overflow, err
}
// WaitWrite waits for the pipe to free up if there is not enough space, then
// saves the chunks into the pipe and returns the overflow.
//
// It only waits when the chunks could fit into an empty pipe
// (len(data) <= cap); a larger batch is written immediately and the part
// that does not fit comes back as overflow together with ErrPipeFull.
// Waiting also stops once the pipe is closed, in which case ErrPipeClosed is
// returned. Calling it without chunks is a no-op.
//
// The work is run through common.ExecTask, which is defined outside this
// file; presumably it returns the task's error or a context error — confirm.
//
// NOTE(review): goroutines that signal signalRed do not hold its L, so a
// wakeup can still be missed in the short window before Wait registers the
// waiter.
func (p *Pipe) WaitWrite(ctx context.Context, data ...[]byte) (overflow [][]byte, err error) {
	if len(data) == 0 {
		return nil, nil
	}
	// Run the task first and build the return values afterwards: in
	// `return overflow, f()` the order in which overflow is read and f is
	// called is not specified by the language.
	err = common.ExecTask(ctx, func(ctx context.Context) error {
		p.lock.Lock()
		for !p.isClosed && len(data) <= p.cap && len(data)+len(p.data) > p.cap {
			// Cond.Wait requires its L to be held; take it before letting
			// go of p.lock so the gap in which a signal can be missed stays
			// as small as possible.
			p.signalRed.L.Lock()
			p.lock.Unlock()
			p.signalRed.Wait()
			p.signalRed.L.Unlock()
			p.lock.Lock()
		}
		rest, werr := p.write(data)
		overflow = rest
		p.lock.Unlock()
		p.signalWritten.Signal()
		return werr
	})
	return overflow, err
}
// Write saves the chunks into the pipe without waiting for space and returns
// the chunks that did not fit as overflow.
// May return ErrPipeClosed and ErrPipeFull as an error. Calling it without
// chunks is a no-op.
//
// The work is run through common.ExecTask, which is defined outside this
// file; presumably it returns the task's error or a context error — confirm.
func (p *Pipe) Write(ctx context.Context, data ...[]byte) (overflow [][]byte, err error) {
	if len(data) == 0 {
		return nil, nil
	}
	// Run the task first and build the return values afterwards: in
	// `return overflow, f()` the order in which overflow is read and f is
	// called is not specified by the language.
	err = common.ExecTask(ctx, func(ctx context.Context) error {
		p.lock.Lock()
		rest, werr := p.write(data)
		overflow = rest
		p.lock.Unlock()
		p.signalWritten.Signal()
		return werr
	})
	return overflow, err
}
// =========================
// Read
// waitRead is an internal helper that blocks until the pipe holds data, then
// drains it. It does not handle context cancellation; WaitRead builds that
// on top.
//
// Returns the drained chunks and whether the pipe was still open at that
// moment. When the pipe is empty and closed it returns (nil, false, io.EOF).
//
// NOTE(review): goroutines that signal signalWritten do not hold its L, so a
// wakeup can still be missed in the short window before Wait registers the
// waiter.
func (p *Pipe) waitRead() (data [][]byte, isOpen bool, err error) {
	// The write lock is required because draining replaces p.data.
	p.lock.Lock()
	for len(p.data) == 0 {
		if p.isClosed {
			p.lock.Unlock()
			return nil, false, io.EOF
		}
		// Cond.Wait requires its L to be held; take it before letting go of
		// p.lock so the gap in which a signal can be missed stays as small
		// as possible.
		p.signalWritten.L.Lock()
		p.lock.Unlock()
		p.signalWritten.Wait()
		p.signalWritten.L.Unlock()
		p.lock.Lock()
	}
	isOpen = !p.isClosed
	// Hand the buffered slice to the caller; the pipe starts over with nil.
	data = p.data
	p.data = nil
	p.lock.Unlock()
	p.signalRed.Signal()
	return data, isOpen, nil
}
// WaitRead waits for data when the pipe is empty, then reads all the chunks
// from the pipe and resets the pipe data.
//
// It returns the chunks and whether the pipe is still open. Errors:
//   - ctx.Err() when ctx is done before data arrives;
//   - io.EOF when there is no data and the pipe was closed;
//   - ErrPipeClosed when the pipe is closed while waiting and holds no data.
//
// NOTE(review): the helper goroutine cannot be interrupted. If this call
// returns early (ctx done or pipe closed) the goroutine keeps waiting in
// waitRead and will consume — and drop — the next data written.
func (p *Pipe) WaitRead(ctx context.Context) (data [][]byte, isOpen bool, err error) {
	type readResult struct {
		data   [][]byte
		isOpen bool
		err    error
	}
	// Buffered so the goroutine can always deliver and exit, even when this
	// call has already returned. Passing the result by value also avoids
	// sharing the named results with the goroutine.
	resChan := make(chan readResult, 1)
	go func() {
		d, open, rerr := p.waitRead()
		resChan <- readResult{data: d, isOpen: open, err: rerr}
	}()

	closeChan := p.closeChan
	for {
		select {
		case <-ctx.Done():
			return nil, false, ctx.Err()
		case res := <-resChan:
			return res.data, res.isOpen, res.err
		case <-closeChan:
			// Prefer a result that is already available over reporting the
			// close.
			select {
			case res := <-resChan:
				return res.data, res.isOpen, res.err
			default:
			}
			p.lock.RLock()
			empty := len(p.data) == 0
			p.lock.RUnlock()
			if empty {
				return nil, false, ErrPipeClosed
			}
			// Data is still buffered: stop selecting on the closed channel
			// (it would be ready on every iteration) and wait for the
			// reader goroutine to deliver it.
			closeChan = nil
		}
	}
}
// Read reads all the chunks from the pipe without waiting and resets the
// pipe data after reading.
//
// It returns the chunks and whether the pipe is open. When the pipe holds no
// data it returns io.EOF, whether or not the pipe has been closed; use
// isOpen to tell the two situations apart.
//
// The work is run through common.ExecTask, which is defined outside this
// file; presumably it returns the task's error or a context error — confirm.
func (p *Pipe) Read(ctx context.Context) (data [][]byte, isOpen bool, err error) {
	// Run the task first and build the return values afterwards: in
	// `return data, isOpen, f()` the order in which the variables are read
	// and f is called is not specified by the language.
	err = common.ExecTask(ctx, func(ctx context.Context) error {
		// The write lock is required because draining replaces p.data.
		p.lock.Lock()
		isOpen = !p.isClosed
		if len(p.data) == 0 {
			p.lock.Unlock()
			return io.EOF
		}
		// Hand the buffered slice to the caller; the pipe starts over with
		// nil.
		data = p.data
		p.data = nil
		p.lock.Unlock()
		p.signalRed.Signal()
		return nil
	})
	return data, isOpen, err
}
// =========================
// Boring boilerplate below
// Cap reports the chunk limit the pipe is currently configured with.
// The value is read under the read lock.
func (p *Pipe) Cap() int {
	p.lock.RLock()
	limit := p.cap
	p.lock.RUnlock()
	return limit
}
// Len reports how many chunks the pipe holds right now.
// The value is read under the read lock.
func (p *Pipe) Len() int {
	p.lock.RLock()
	count := len(p.data)
	p.lock.RUnlock()
	return count
}
// LenCap reports the number of buffered chunks and the chunk limit, both
// taken under a single read lock so the pair is consistent.
func (p *Pipe) LenCap() (length int, capacity int) {
	p.lock.RLock()
	length, capacity = len(p.data), p.cap
	p.lock.RUnlock()
	return length, capacity
}
// Grow changes the pipe's chunk limit by n (n may be negative to shrink it).
//
// A negative limit is the "no limit" marker accepted by New, so an unlimited
// pipe is left untouched, and shrinking never takes the limit below zero —
// otherwise the pipe would silently turn unlimited.
// Writers waiting for space are woken so they can re-check against the new
// limit.
func (p *Pipe) Grow(n int) {
	p.lock.Lock()
	if p.cap >= 0 {
		p.cap += n
		if p.cap < 0 {
			p.cap = 0
		}
	}
	p.lock.Unlock()
	p.signalRed.Broadcast()
}
// Reset throws away everything buffered in the pipe, leaving an empty
// (non-nil) chunk slice, and then signals signalRed once.
func (p *Pipe) Reset() {
	func() {
		p.lock.Lock()
		defer p.lock.Unlock()
		p.data = make([][]byte, 0)
	}()
	p.signalRed.Signal()
}
// Close marks the pipe as closed, closes closeChan and wakes every goroutine
// waiting on the pipe's condition variables so it can observe the closed
// state.
//
// It is safe to call more than once; only the first call has an effect.
// It always returns nil.
func (p *Pipe) Close() error {
	p.lock.Lock()
	if p.isClosed {
		// Already closed: closing closeChan again would panic.
		p.lock.Unlock()
		return nil
	}
	p.isClosed = true
	// Closing a nil channel panics, so only close a channel that exists.
	if p.closeChan != nil {
		close(p.closeChan)
	}
	p.lock.Unlock()

	p.signalWritten.Broadcast()
	p.signalRed.Broadcast()
	return nil
}
// GetData exposes the address of the pipe's internal chunk slice.
// No lock is taken here, so any access through the returned pointer is
// unsynchronized with the pipe's other methods.
func (p *Pipe) GetData() *[][]byte {
	chunks := &p.data
	return chunks
}