From 2e8cd0915221551ff16161ac4f03f9b9856e9807 Mon Sep 17 00:00:00 2001 From: Dmitry Anderson <4nd3r5z0n@gmail.com> Date: Tue, 12 Nov 2024 09:10:09 +0100 Subject: [PATCH] init --- common/common.go | 90 ++++++++++ common/convert.go | 58 +++++++ common/defaultVals.go | 40 +++++ common/types.go | 39 +++++ config/common.go | 28 +++ config/net.go | 12 ++ data_structures/cbuf.go | 95 +++++++++++ data_structures/counter.go | 52 ++++++ data_structures/ds.go | 4 + data_structures/pipe.go | 256 ++++++++++++++++++++++++++++ data_structures/queue.go | 81 +++++++++ data_structures/stack.go | 79 +++++++++ data_structures/syncmap.go | 66 +++++++ data_structures/worker_pool.go | 93 ++++++++++ data_structures/worker_pool_func.go | 58 +++++++ data_structures/worker_pool_test.go | 54 ++++++ db/migrate.go | 1 + env/env.go | 63 +++++++ go.mod | 3 + io/io.go | 25 +++ lecense.txt | 7 + net/basic_conn.go | 55 ++++++ net/ip.go | 152 +++++++++++++++++ net/net.go | 7 + net/ws.go | 8 + readme.txt | 1 + 26 files changed, 1427 insertions(+) create mode 100644 common/common.go create mode 100644 common/convert.go create mode 100644 common/defaultVals.go create mode 100644 common/types.go create mode 100644 config/common.go create mode 100644 config/net.go create mode 100644 data_structures/cbuf.go create mode 100644 data_structures/counter.go create mode 100644 data_structures/ds.go create mode 100644 data_structures/pipe.go create mode 100644 data_structures/queue.go create mode 100644 data_structures/stack.go create mode 100644 data_structures/syncmap.go create mode 100644 data_structures/worker_pool.go create mode 100644 data_structures/worker_pool_func.go create mode 100644 data_structures/worker_pool_test.go create mode 100644 db/migrate.go create mode 100644 env/env.go create mode 100644 go.mod create mode 100644 io/io.go create mode 100644 lecense.txt create mode 100644 net/basic_conn.go create mode 100644 net/ip.go create mode 100644 net/net.go create mode 100644 net/ws.go create mode 100644 readme.txt diff --git a/common/common.go b/common/common.go new file mode 100644 index 0000000..3baa339 --- /dev/null +++ b/common/common.go @@ -0,0 +1,90 @@ +package common_utils + +import ( + "context" + "errors" + "fmt" + "sync" + "time" +) + +func Retry( + ctx context.Context, + retryTimes int, + waitBeforeRetry time.Duration, + exec Task, +) (success bool, errs []error) { + var err error + errs = []error{} + for i := 0; i < retryTimes; i++ { + if err = exec(ctx); err == nil { + return true, errs + } + errs = append(errs, err) + } + return false, errs +} + +func ExecTask(ctx context.Context, tasks Task) error { + errChan := make(chan error) + go func() { + errChan <- tasks(ctx) + }() + + select { + case err := <-errChan: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +// ExecTasks Runs each task in a separate goroutine +// To set no task timeout -- set taskTimeout to 0 +func ExecTasks(ctx context.Context, taskTimeout time.Duration, tasks []Task) error { + wg := &sync.WaitGroup{} + errChan := make(chan error) + wgChan := make(chan struct{}) + + wg.Add(len(tasks)) + for i, task := range tasks { + go func() { + var cancel context.CancelFunc + taskCtx := context.Background() + if taskTimeout != 0 { + taskCtx, cancel = context.WithTimeout(taskCtx, taskTimeout) + } + if err := task(taskCtx); err != nil { + errChan <- errors.Join(fmt.Errorf("error running task %d: ", i), err) + } + wg.Done() + cancel() + }() + } + go func() { + wg.Wait() + close(wgChan) + }() + + select { + case <-ctx.Done(): + return ctx.Err() + case <-wgChan: + return nil + case err := <-errChan: + return err + } +} + +func Ternar[T any](condition bool, ifTrue T, ifFalse T) T { + if condition { + return ifTrue + } + return ifFalse +} + +func Must(err error) { + if err != nil { + panic(err.Error()) + } +} diff --git a/common/convert.go b/common/convert.go new file mode 100644 index 0000000..4d3e45d --- /dev/null +++ b/common/convert.go @@ -0,0 +1,58 @@ +package common_utils + +import ( + "strconv" + "strings" +) + +// Converts all the types in +func ConvertArr[Tin any, Tout any](in []Tin, convertFunc CovertFunc[Tin, Tout]) (convertedItems []Tout, err error) { + out := make([]Tout, len(in)) + for i, item := range in { + outItem, err := convertFunc(item) + if err != nil { + return nil, err + } + out[i] = outItem + } + return out, nil +} + +// More modern approach to parsing integers and floats using generics + +func MustStr2Int[T Int](str string, base int, bitSize int) T { + int, err := strconv.ParseInt(str, base, bitSize) + if err != nil { + panic("Error while trying to convert string to integer: " + err.Error()) + } + return T(int) +} + +func Str2Int[T Int](str string, base int, bitSize int) (T, error) { + out, err := strconv.ParseInt(str, base, bitSize) + return T(out), err +} + +func MustStr2Float[T Float](str string, bitSize int) T { + out, err := strconv.ParseFloat(str, bitSize) + if err != nil { + panic("Error while trying to convert string to float: " + err.Error()) + } + return T(out) +} + +func Str2Float[T Float](str string, bitSize int) (T, error) { + out, err := strconv.ParseFloat(str, bitSize) + return T(out), err +} + +func Str2Bool(str string) (val bool, ok bool) { + switch strings.ToLower(str) { + case "true", "yes", "y", "t", "1": + return true, true + case "false", "no", "n", "f", "0": + return false, true + default: + return false, false + } +} diff --git a/common/defaultVals.go b/common/defaultVals.go new file mode 100644 index 0000000..de947eb --- /dev/null +++ b/common/defaultVals.go @@ -0,0 +1,40 @@ +package common_utils + +func DefaultIfEqual[T comparable](in T, checkEqualTo T, defaultVal T) T { + if in == checkEqualTo { + return defaultVal + } + return in +} + +// If in is empty returns the devaultVal. Else returns in +func ValOrDefaultStr(in string, defaultVal string) string { + if in == "" { + return defaultVal + } + return in +} + +// If in is 0 returns the devaultVal. Else returns in +func ValOrDefaultNum[T Num](in T, defaultVal T) T { + if in == 0 { + return defaultVal + } + return in +} + +// If array is equal to nil -- will return an empty array instead +func UnnilArray[T any](in []T) []T { + if in == nil { + return []T{} + } + return nil +} + +// If map is equal to nil -- will return an empty map instead +func UnnilMap[K comparable, V any](in map[K]V) map[K]V { + if in == nil { + return make(map[K]V) + } + return make(map[K]V) +} diff --git a/common/types.go b/common/types.go new file mode 100644 index 0000000..c9a07b3 --- /dev/null +++ b/common/types.go @@ -0,0 +1,39 @@ +package common_utils + +import "context" + +type Task func(ctx context.Context) error + +// Number types +type ( + Signed interface { + ~int | ~int8 | ~int16 | ~int32 | ~int64 + } + Unsigned interface { + ~uint | ~uint8 | ~uint32 | ~uint64 + } + Float interface{ ~float32 | ~float64 } + Int interface{ Signed | Unsigned } + Num interface{ Float | Int } +) + +type IRange[T Num] interface { + Begin() T + End() T +} + +type Range[T Num] struct { + IRange[T] + begin T + end T +} + +func NewRange[T Num](begin T, end T) *Range[T] { return &Range[T]{begin: begin, end: end} } +func (r Range[T]) Begin() T { return r.begin } +func (r Range[T]) End() T { return r.end } + +type CovertFunc[Tin any, Tout any] func(in Tin) (out Tout, err error) + +type IStrConvertable interface { + String() string +} diff --git a/config/common.go b/config/common.go new file mode 100644 index 0000000..6a10607 --- /dev/null +++ b/config/common.go @@ -0,0 +1,28 @@ +package config + +import common_utils "nettools/common" + +type FileTLS struct { + Enable bool `yaml:"enable" json:"enable"` + Cert string `yaml:"cert" json:"cert"` + Key string `yaml:"key" json:"key"` +} + +type TLS struct { + Cert string + Key string +} + +func ApplyEnv(fileCfg FileTLS) *FileTLS { + return &FileTLS{} +} + +func NewTlsConfigFromFile(fileCfg *FileTLS, defaultCertPath, defaultKeyPath string) *TLS { + if !fileCfg.Enable { + return nil + } + return &TLS{ + Cert: common_utils.ValOrDefaultStr(fileCfg.Cert, defaultCertPath), + Key: common_utils.ValOrDefaultStr(fileCfg.Key, defaultKeyPath), + } +} diff --git a/config/net.go b/config/net.go new file mode 100644 index 0000000..eb8ea7c --- /dev/null +++ b/config/net.go @@ -0,0 +1,12 @@ +package config + +import common_utils "nettools/common" + +type FileRange[T common_utils.Num] struct { + Begin T `json:"begin" yaml:"begin"` + End T `json:"end" yaml:"end"` +} + +func (r *FileRange[T]) Process() common_utils.IRange[T] { + return common_utils.NewRange(r.Begin, r.End) +} diff --git a/data_structures/cbuf.go b/data_structures/cbuf.go new file mode 100644 index 0000000..8934297 --- /dev/null +++ b/data_structures/cbuf.go @@ -0,0 +1,95 @@ +package ds_utils + +import ( + "sync" +) + +// Basic Circular buffer interface +type ICBuf[T any] interface { + Push(item T) + Get() []T +} + +// Implementation of ICBuf interface +// Simpliest circular buffer implementation +// If thread safety is needed -- use CBufSync +type CBuf[T any] struct { + Buffer []T + Cursor int + Capacity int +} + +// Same as pushing back to array +func (cbuf *CBuf[T]) Push(item T) { + cbuf.Buffer[cbuf.Cursor] = item + cbuf.Cursor++ + if cbuf.Cursor >= cbuf.Capacity { + cbuf.Cursor = 0 + } + println(cbuf.Cursor) +} + +// This method isn't a part of ICBuf interface +// +// It pushes item to the "buffer start" +// (Right before the cursor) +// removing item from back +func (cbuf *CBuf[T]) PushFront(item T) { + cbuf.Buffer = append( + append(cbuf.Buffer[cbuf.Cursor+1:], cbuf.Buffer[:cbuf.Cursor]...), + item, + ) + cbuf.Cursor = 0 +} + +// Just returning the whole buffer in the correct order +func (cbuf *CBuf[T]) Get() []T { + return append(cbuf.Buffer[cbuf.Cursor:], cbuf.Buffer[:cbuf.Cursor]...) +} + +func NewCBuf[T any](capacity int) *CBuf[T] { + return &CBuf[T]{ + Buffer: make([]T, capacity), + Capacity: capacity, + } +} + +// Same CBuf, but with mutex +type CBufSync[T any] struct { + Buffer []T + Cursor int + Capacity int + Mu *sync.RWMutex +} + +func NewCBufSync[T any](capacity int) *CBufSync[T] { + return &CBufSync[T]{ + Mu: &sync.RWMutex{}, + } +} + +func (cbuf *CBufSync[T]) Push(item T) { + cbuf.Mu.Lock() + cbuf.Buffer[cbuf.Cursor] = item + cbuf.Cursor++ + if cbuf.Cursor >= cbuf.Capacity { + cbuf.Cursor = 0 + } + cbuf.Mu.Unlock() +} + +func (cbuf *CBufSync[T]) PushFront(item T) { + cbuf.Mu.Lock() + cbuf.Buffer = append( + append(cbuf.Buffer[cbuf.Cursor+1:], cbuf.Buffer[:cbuf.Cursor]...), + item, + ) + cbuf.Cursor = 0 + cbuf.Mu.Unlock() +} + +func (cbuf *CBufSync[T]) Get() []T { + cbuf.Mu.RLock() + defer cbuf.Mu.RUnlock() + return append(cbuf.Buffer[cbuf.Cursor:], cbuf.Buffer[:cbuf.Cursor]...) +} diff --git a/data_structures/counter.go b/data_structures/counter.go new file mode 100644 index 0000000..7158773 --- /dev/null +++ b/data_structures/counter.go @@ -0,0 +1,52 @@ +package ds_utils + +import "sync/atomic" + +type ICounter[i int32 | int64] interface { + Set(new i) (old i) + Add(delta i) (val i) + Get() (val i) +} + +// 32 bit int counter implementation of ICounter +type Counter32 struct { + val int32 +} + +// 32 bit int counter implementation of ICounter +func NewCounter32(initVal int32) ICounter[int32] { + return &Counter32{val: initVal} +} + +func (c *Counter32) Set(new int32) (old int32) { + return atomic.SwapInt32(&c.val, new) +} + +func (c *Counter32) Add(delta int32) (val int32) { + return atomic.AddInt32(&c.val, delta) +} + +func (c *Counter32) Get() (val int32) { + return atomic.LoadInt32(&c.val) +} + +// 64 bit int counter implementation of ICounter +type Counter64 struct { + val int64 +} + +func NewCounter64(initVal int64) ICounter[int64] { + return &Counter64{val: initVal} +} + +func (c *Counter64) Set(new int64) (old int64) { + return atomic.SwapInt64(&c.val, new) +} + +func (c *Counter64) Add(delta int64) (val int64) { + return atomic.AddInt64(&c.val, delta) +} + +func (c *Counter64) Get() (val int64) { + return atomic.LoadInt64(&c.val) +} diff --git a/data_structures/ds.go b/data_structures/ds.go new file mode 100644 index 0000000..7362c68 --- /dev/null +++ b/data_structures/ds.go @@ -0,0 +1,4 @@ +/* +Provides useful data structures +*/ +package ds_utils diff --git a/data_structures/pipe.go b/data_structures/pipe.go new file mode 100644 index 0000000..049a98e --- /dev/null +++ b/data_structures/pipe.go @@ -0,0 +1,256 @@ +package ds_utils + +import ( + "context" + "errors" + "io" + "sync" + + common "nettools/common" +) + +var ( + ErrPipeClosed = errors.New("pipe is closed") + ErrPipeFull = errors.New("pipe is full") +) + +// IPipe provides Pipe Interface. Pipe is a data structure +// in which u can write and from which u can read chunks +// of data. Almost like buffer +type IPipe interface { + io.Closer + Write(ctx context.Context, data ...[]byte) (n int, err error) + WaitWrite(ctx context.Context, data ...[]byte) (n int, err error) + Read(ctx context.Context) (data [][]byte, isOpen bool, err error) + WaitRead(ctx context.Context) (data [][]byte, isOpen bool, err error) + + Cap() int + Grow(n int) + Len() int + Reset() +} + +// Pipe implements IPipe interface +type Pipe struct { + IPipe + data [][]byte + cap int + + signalWritten *sync.Cond + signalRed *sync.Cond + lock *sync.RWMutex + + isClosed bool + closeChan chan struct{} +} + +// New creates new Pipe and returns pointer to it +// Set sizeLimit = -1 for no limit +// discard +func New(sizeLimit int) *Pipe { + return &Pipe{ + data: [][]byte{}, + cap: sizeLimit, + + signalWritten: sync.NewCond(new(sync.Mutex)), + signalRed: sync.NewCond(new(sync.Mutex)), + lock: &sync.RWMutex{}, + + isClosed: false, + } +} + +// ========================= +// Write + +func (p *Pipe) write(data [][]byte) (overflow [][]byte, err error) { + if p.isClosed { + return data, ErrPipeClosed + } + if p.data == nil { + p.data = make([][]byte, 0, p.cap) + if len(data) > p.cap { + copy(p.data, data[:p.cap]) + return data[p.cap:], ErrPipeFull + } + copy(p.data, data) + return [][]byte{}, nil + } + + spaceLeft := p.cap - len(p.data) + if spaceLeft < 0 { + return [][]byte{}, ErrPipeClosed + } + if spaceLeft < len(data) { + err = ErrPipeFull + overflow = make([][]byte, len(data)-spaceLeft) + copy(overflow, data[spaceLeft:]) + data = data[:spaceLeft] + } + p.data = append(p.data, data...) + return overflow, err +} + +// WaitWrite waits for pipe too free if there isn't enough space and then +// saves chunks into the pipe and returns the overflow. +// May return ErrPipeClosed and ErrPipeFull as an error +func (p *Pipe) WaitWrite(ctx context.Context, data ...[]byte) (overflow [][]byte, err error) { + if data == nil { + return nil, nil + } + return overflow, common.ExecTask(ctx, func(ctx context.Context) error { + p.lock.Lock() + for len(data) < p.cap && len(data)+len(p.data) > p.cap { + p.lock.Unlock() + p.signalRed.Wait() + p.lock.Lock() + break + } + overflow, err = p.write(data) + p.lock.Unlock() + p.signalWritten.Signal() + return err + }) +} + +// Write saves chunks into the pipe and returns the overflow. +// May return ErrPipeClosed and ErrPipeFull as an error +func (p *Pipe) Write(ctx context.Context, data ...[]byte) (overflow [][]byte, err error) { + if data == nil { + return nil, nil + } + return overflow, common.ExecTask(ctx, func(ctx context.Context) error { + p.lock.Lock() + overflow, err = p.write(data) + p.lock.Unlock() + p.signalWritten.Signal() + return err + }) +} + +// ========================= +// Read + +// waitRead is an internal function that reads and waits for data, +// but it doesn't handle context cancel or closing pipe +// Extended with WaitRead function +func (p *Pipe) waitRead() (data [][]byte, isOpen bool, err error) { + p.lock.RLock() + for len(p.data) == 0 { + if p.isClosed { + return p.data, false, io.EOF + } + p.lock.RUnlock() + p.signalWritten.Wait() + p.lock.RLock() + } + isOpen = !p.isClosed + data = make([][]byte, len(p.data)) + copy(data, p.data) + p.data = nil + p.lock.RUnlock() + p.signalRed.Signal() + return p.data, isOpen, nil +} + +// WaitRead +// when pipe is empty -- waits for some data to be +// written. Then reads all the chunks from the +// pipe and resets pipe data after reading. +// Also returns if pipe is open and +// may return io.EOF as an error if there is no data +// and the pipe was closed. Also, may return ErrPipeClosed +// if pipe is closed during reading and there is no data in the pipe +func (p *Pipe) WaitRead(ctx context.Context) (data [][]byte, isOpen bool, err error) { + errChan := make(chan error) + go func() { + data, isOpen, err = p.waitRead() + if err != nil { + errChan <- err + } + close(errChan) + }() + + for { + select { + case <-ctx.Done(): + return data, isOpen, ctx.Err() + case err := <-errChan: + return data, isOpen, err + case <-p.closeChan: + p.lock.RLock() + if len(p.data) == 0 { + p.lock.RUnlock() + return data, isOpen, ErrPipeClosed + } + p.lock.RUnlock() + } + } +} + +// Read reads all the chunks from the pipe +// and resets pipe data after reading. +// Also returns if pipe is open and +// may return io.EOF as an error if there is no data +// and the pipe was closed +func (p *Pipe) Read(ctx context.Context) (data [][]byte, isOpen bool, err error) { + return data, isOpen, common.ExecTask(ctx, func(ctx context.Context) error { + p.lock.RLock() + isOpen = !p.isClosed + if len(p.data) == 0 { + return io.EOF + } + data = make([][]byte, len(p.data)) + copy(data, p.data) + p.data = nil + p.lock.RUnlock() + p.signalRed.Signal() + return nil + }) +} + +// ========================= +// Boring boilerplate bellow + +func (p *Pipe) Cap() int { + p.lock.RLock() + defer p.lock.RUnlock() + return p.cap +} + +func (p *Pipe) Len() int { + p.lock.RLock() + defer p.lock.RUnlock() + return len(p.data) +} + +func (p *Pipe) LenCap() (length int, capacity int) { + p.lock.RLock() + defer p.lock.RUnlock() + return len(p.data), p.cap +} + +func (p *Pipe) Grow(n int) { + p.lock.Lock() + p.cap += n + p.lock.Unlock() +} + +func (p *Pipe) Reset() { + p.lock.Lock() + p.data = [][]byte{} + p.lock.Unlock() + p.signalRed.Signal() +} + +func (p *Pipe) Close() error { + p.lock.Lock() + p.isClosed = true + p.lock.Unlock() + close(p.closeChan) + return nil +} + +func (p *Pipe) GetData() *[][]byte { + return &p.data +} diff --git a/data_structures/queue.go b/data_structures/queue.go new file mode 100644 index 0000000..40ae674 --- /dev/null +++ b/data_structures/queue.go @@ -0,0 +1,81 @@ +package ds_utils + +import ( + "sync" +) + +type IQueue[T any] interface { + Push(item T) + Get() T + Len() int +} + +// This queue isn't suppose to be thread safe and suppose to be used only in a single thread +// If you will try to get item from an empty queue -- it will panic +// (Because if u have single-threaded app and one thread will be locked +// by getting an item from the queue, there will be no thread to put that item) +// For multythreaded usage -- use QueueSync +type Queue[T any] struct { + IQueue[T] + Buffer []T +} + +func NewQueue[T any]() *Queue[T] { + return &Queue[T]{} +} + +func (q *Queue[T]) Push(item T) { + q.Buffer = append(q.Buffer, item) +} + +func (q *Queue[T]) Get() T { + if len(q.Buffer) < 1 { + panic("Trying to get from an empty thread unsafe queue") + } + item := q.Buffer[0] + q.Buffer = q.Buffer[1:] + return item +} + +func (q *Queue[T]) Len() int { + return len(q.Buffer) +} + +// Queue sync +type QueueSync[T any] struct { + IQueue[T] + Buffer []T + Mu *sync.Mutex + NonEmptyCond *sync.Cond +} + +func NewQueueSync[T any]() *QueueSync[T] { + return &QueueSync[T]{ + Mu: &sync.Mutex{}, + NonEmptyCond: sync.NewCond(&sync.RWMutex{}), + } +} + +func (q *QueueSync[T]) Push(item T) { + q.Mu.Lock() + q.Buffer = append(q.Buffer, item) + q.NonEmptyCond.Signal() + q.Mu.Unlock() +} + +func (q *QueueSync[T]) Get() T { + q.Mu.Lock() + if len(q.Buffer) < 1 { + q.NonEmptyCond.Wait() + } + item := q.Buffer[0] + q.Buffer = q.Buffer[1:] + q.Mu.Unlock() + return item +} + +func (q *QueueSync[T]) Len() int { + q.Mu.Lock() + defer q.Mu.Lock() + return len(q.Buffer) +} diff --git a/data_structures/stack.go b/data_structures/stack.go new file mode 100644 index 0000000..e2d6dff --- /dev/null +++ b/data_structures/stack.go @@ -0,0 +1,79 @@ +package ds_utils + +import "sync" + +type IStack[T any] interface { + Push(item T) + Get() T + Len() int +} + +// This queue isn't suppose to be thread safe +// If you will try to get item from an empty queue -- it will panic +// (Because if u have single-threaded app and one thread will be locked +// by getting an item from the queue, there will be no thread to put that item) +// For multythreaded usage -- use StackSync (it's calm, never panics) +type Stack[T any] struct { + IStack[T] + Buffer []T +} + +func NewStack[T any]() *Stack[T] { + return &Stack[T]{} +} + +func (q *Stack[T]) Push(item T) { + q.Buffer = append(q.Buffer, item) +} + +func (q *Stack[T]) Get() T { + if len(q.Buffer) < 1 { + panic("Trying to get from an empty thread unsafe queue") + } + item := q.Buffer[len(q.Buffer)-1] + q.Buffer = q.Buffer[:len(q.Buffer)-1] + return item +} + +func (q *Stack[T]) Len() int { + return len(q.Buffer) +} + +// Stack sync +type StackSync[T any] struct { + IStack[T] + Buffer []T + Mu *sync.Mutex + NonEmptyCond *sync.Cond +} + +func NewStackSync[T any]() *StackSync[T] { + return &StackSync[T]{ + Mu: &sync.Mutex{}, + NonEmptyCond: sync.NewCond(&sync.RWMutex{}), + } +} + +func (q *StackSync[T]) Push(item T) { + q.Mu.Lock() + q.Buffer = append(q.Buffer, item) + q.NonEmptyCond.Signal() + q.Mu.Unlock() +} + +func (q *StackSync[T]) Get() T { + q.Mu.Lock() + if len(q.Buffer) < 1 { + q.NonEmptyCond.Wait() + } + item := q.Buffer[len(q.Buffer)-1] + q.Buffer = q.Buffer[:len(q.Buffer)-1] + q.Mu.Unlock() + return item +} + +func (q *StackSync[T]) Len() int { + q.Mu.Lock() + defer q.Mu.Lock() + return len(q.Buffer) +} diff --git a/data_structures/syncmap.go b/data_structures/syncmap.go new file mode 100644 index 0000000..e402a12 --- /dev/null +++ b/data_structures/syncmap.go @@ -0,0 +1,66 @@ +package ds_utils + +import ( + "sync" + + common "nettools/common" +) + +type IIdSyncMap[IDType common.Int, ItemType any] interface { + GetAll() map[IDType]ItemType + Add(items ...ItemType) (id IDType) + Rm(id IDType) + Pop(id IDType) (item ItemType, exists bool) +} + +type IdSyncMap[IDType common.Int, ItemType any] struct { + IIdSyncMap[IDType, ItemType] + Lock *sync.RWMutex + Data map[IDType]ItemType + NextID IDType +} + +func (idsyncmap *IdSyncMap[IDType, ItemType]) GetAll() map[IDType]ItemType { + dataCopy := map[IDType]ItemType{} + idsyncmap.Lock.RLock() + for id, item := range idsyncmap.Data { + dataCopy[id] = item + } + idsyncmap.Lock.RUnlock() + return dataCopy +} + +func (idsyncmap *IdSyncMap[IDType, ItemType]) Add(items ...ItemType) (id IDType) { + idsyncmap.Lock.Lock() + id = idsyncmap.NextID + for i, item := range items { + idsyncmap.Data[id+IDType(i)] = item + } + idsyncmap.NextID += IDType(len(items)) + idsyncmap.Lock.Unlock() + return id +} + +func (idsyncmap *IdSyncMap[IDType, ItemType]) Rm(id IDType) { + idsyncmap.Lock.Lock() + delete(idsyncmap.Data, id) + idsyncmap.Lock.Unlock() +} + +func (idsyncmap *IdSyncMap[IDType, ItemType]) Pop(id IDType) (item ItemType, exists bool) { + idsyncmap.Lock.Lock() + defer idsyncmap.Lock.Unlock() + if item, exists = idsyncmap.Data[id]; !exists { + return item, exists + } + delete(idsyncmap.Data, id) + return item, exists +} + +func NewIDSyncMap[IDType common.Int, ItemType any]() IIdSyncMap[IDType, ItemType] { + return &IdSyncMap[IDType, ItemType]{ + Lock: &sync.RWMutex{}, + Data: map[IDType]ItemType{}, + NextID: 0, + } +} diff --git a/data_structures/worker_pool.go b/data_structures/worker_pool.go new file mode 100644 index 0000000..d5e707f --- /dev/null +++ b/data_structures/worker_pool.go @@ -0,0 +1,93 @@ +package ds_utils + +// A combination of worker pool and a queue + +import ( + "context" + "sync" + "time" + + common "nettools/common" +) + +type WorkersCount struct { + Lim int32 + Busy int32 +} + +type IWorkerPool interface { + Exec(task common.Task) + QueueLen() int + WorkersCount() WorkersCount + SetWorkersCountLim(newSize int32) +} + +// Just a worker pool with queue before it +// Thread-safe +type WorkerPool struct { + TaskQueue []common.Task + Mu *sync.RWMutex + CountLim int32 + CountBusy int32 + TaskDeadline time.Duration +} + +func NewWorkerPool(workersCountLim int32, taskDeadline time.Duration) *WorkerPool { + return &WorkerPool{ + TaskQueue: []common.Task{}, + Mu: &sync.RWMutex{}, + CountLim: workersCountLim, + CountBusy: 0, + TaskDeadline: taskDeadline, + } +} + +func (wpq *WorkerPool) Exec(task common.Task) { + wpq.Mu.Lock() + if wpq.CountBusy >= wpq.CountLim { + wpq.TaskQueue = append(wpq.TaskQueue, task) + return + } + wpq.CountBusy++ + wpq.Mu.Unlock() + for { + var cancel context.CancelFunc + var ctx context.Context + if wpq.TaskDeadline != 0 { + ctx = context.Background() + ctx, cancel = context.WithTimeout(ctx, wpq.TaskDeadline) + defer cancel() + } + task(ctx) + wpq.Mu.Lock() + if len(wpq.TaskQueue) < 1 { + wpq.CountBusy-- + wpq.Mu.Unlock() + return + } + task = wpq.TaskQueue[0] + wpq.TaskQueue = wpq.TaskQueue[1:] + wpq.Mu.Unlock() + } +} + +func (wpq *WorkerPool) QueueLen() int { + wpq.Mu.RLock() + defer wpq.Mu.RUnlock() + return len(wpq.TaskQueue) +} + +func (wpq *WorkerPool) WorkersCount() WorkersCount { + wpq.Mu.RLock() + defer wpq.Mu.RUnlock() + return WorkersCount{ + Lim: wpq.CountLim, + Busy: wpq.CountBusy, + } +} + +func (wpq *WorkerPool) SetWorkersCountLim(newSize int32) { + wpq.Mu.Lock() + wpq.CountLim = newSize + wpq.Mu.Unlock() +} diff --git a/data_structures/worker_pool_func.go b/data_structures/worker_pool_func.go new file mode 100644 index 0000000..3dd1695 --- /dev/null +++ b/data_structures/worker_pool_func.go @@ -0,0 +1,58 @@ +package ds_utils + +import ( + "context" + + common "nettools/common" +) + +// RunWorkerPool is a single function worker pool implementation +// +// Create a channel where u will send tasks, send some tasks +// there and they will execute. +// If there are too many tasks -- they will be saved to the queue. +// +// Also this function may be extended to have resizable capacity, +// more callbacks, tasks priority, but those modifications would +// harm performance. +// +// If you need out-of-the-box more flexible approach -- use WorkerPool struct instead. +// Also this wp is worse in terms of performance. IDK why it exists, it's just sucks. +func RunWorkerPool( + ctx context.Context, + tasksChan chan common.Task, + capacity int, + errHanlder func(error) error, +) (err error) { + countBusy := 0 + errChan := make(chan error, 3) + tasksQueue := []common.Task{} + + for { + select { + case task := <-tasksChan: + if countBusy >= capacity { + tasksQueue = append(tasksQueue, task) + continue + } + go func() { errChan <- task(context.Background()) }() + countBusy++ + + case err := <-errChan: + if err != nil { + if err = errHanlder(err); err != nil { + return err + } + } + if len(tasksQueue) == 0 { + countBusy-- + continue + } + task := tasksQueue[0] + tasksQueue = tasksQueue[1:] + go func() { errChan <- task(context.Background()) }() + case <-ctx.Done(): + return ctx.Err() + } + } +} diff --git a/data_structures/worker_pool_test.go b/data_structures/worker_pool_test.go new file mode 100644 index 0000000..b2f82fe --- /dev/null +++ b/data_structures/worker_pool_test.go @@ -0,0 +1,54 @@ +package ds_utils_test + +import ( + "context" + "math/rand" + "testing" + + common_utils "nettools/common" + ds_utils "nettools/data_structures" +) + +func BenchmarkWorkerPool(b *testing.B) { + wp := ds_utils.NewWorkerPool(8, 0) + + for i := 0; i < b.N; i++ { + wp.Exec(func(ctx context.Context) error { + arr := make([]byte, 128) + rand.Read(arr) + return nil + }) + } + // Prevent exit before tasks are finished + endBenchmark := make(chan struct{}) + wp.Exec(func(ctx context.Context) error { + close(endBenchmark) + return nil + }) + <-endBenchmark +} + +func BenchmarkWorkerPoolFunc(b *testing.B) { + taskChan := make(chan common_utils.Task) + go func() { + _ = ds_utils.RunWorkerPool(context.Background(), taskChan, 8, func(err error) error { + b.Error(err.Error()) + return nil + }) + }() + + for i := 0; i < b.N; i++ { + taskChan <- func(ctx context.Context) error { + arr := make([]byte, 128) + rand.Read(arr) + return nil + } + } + // Prevent exit before tasks are finished + endBenchmark := make(chan struct{}) + taskChan <- func(ctx context.Context) error { + close(endBenchmark) + return nil + } + <-endBenchmark +} diff --git a/db/migrate.go b/db/migrate.go new file mode 100644 index 0000000..998bc94 --- /dev/null +++ b/db/migrate.go @@ -0,0 +1 @@ +package db_utils diff --git a/env/env.go b/env/env.go new file mode 100644 index 0000000..2bb7d71 --- /dev/null +++ b/env/env.go @@ -0,0 +1,63 @@ +package env_utils + +import ( + "log" + "os" + "strings" + + common "nettools/common" +) + +func Getenv(envName string) (env string, ok bool) { + env = os.Getenv(envName) + return env, env != "" +} + +func MustGetenv(envName string) (env string) { + env = os.Getenv(envName) + if env == "" { + log.Panicf("Panic while parsing environment variable %s: variable wasn't set or is an empty string", envName) + } + return env +} + +func GetenvBool(envName string) (env bool, ok bool) { + envStr := os.Getenv(envName) + lcEnvStr := strings.ToLower(envStr) + if lcEnvStr == "true" || lcEnvStr == "yes" || lcEnvStr == "y" { + return true, true + } else if lcEnvStr == "false" || lcEnvStr == "no" || lcEnvStr == "n" { + return false, true + } + return false, false +} + +func GetenvOrDefaultAndIsEnv(envName string, defaultVal string) (out string, isEnv bool) { + envPlainData := os.Getenv(envName) + if envPlainData == "" { + return defaultVal, false + } + return envPlainData, true +} + +func GetenvOrDefaultStr(envName string, defaultVal string) string { + env := os.Getenv(envName) + return common.Ternar(env != "", env, defaultVal) +} + +func GetenvOrDefaultInt[i common.Int](envName string, base int, bitSize int, defaultVal i) i { + intStr, ok := Getenv(envName) + if !ok { + return defaultVal + } + res, err := common.Str2Int[i](intStr, base, bitSize) + if err != nil { + return defaultVal + } + return res +} + +func GetenvOrDefaultBool(envName string, defaultVal bool) bool { + env, ok := GetenvBool(envName) + return common.Ternar(ok, env, defaultVal) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ccd4439 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module nettools + +go 1.23.1 diff --git a/io/io.go b/io/io.go new file mode 100644 index 0000000..9e68020 --- /dev/null +++ b/io/io.go @@ -0,0 +1,25 @@ +package io_utils + +import ( + "errors" + "os" +) + +var ( + ErrReadingFile = errors.New("error while reading the file") + ErrFileEmpty = errors.New("provided file is empty") +) + +func ReadPassFile(path string) (pass string, err error) { + buff, err := os.ReadFile(path) + if err != nil { + return "", errors.Join(ErrReadingFile, err) + } + if len(buff) < 1 { + return "", ErrFileEmpty + } + if buff[len(buff)-1] == '\n' { + buff = buff[:len(buff)-1] + } + return string(buff), nil +} diff --git a/lecense.txt b/lecense.txt new file mode 100644 index 0000000..15f06a9 --- /dev/null +++ b/lecense.txt @@ -0,0 +1,7 @@ +Copyright 2024 Dmitry Anderson + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/net/basic_conn.go b/net/basic_conn.go new file mode 100644 index 0000000..522dba6 --- /dev/null +++ b/net/basic_conn.go @@ -0,0 +1,55 @@ +package net_utils + +import ( + "bufio" + "io" + "net" + "strconv" + "strings" +) + +/* +May be useful if you need to download/upload something asyncroniously +or if needed to use HTTP ontop of some unusual protocol like UNIX Sockets, +UDP (are u insane?) or QUIC (WTF aren't u using HTTPv3 then?) +*/ + +// Optimised to read only response codes +// Reads response codes until getting EOF or error +func ConnHttpReadRespCodes(conn net.Conn) (codes []int, err error) { + reader := bufio.NewReader(conn) + + for { + var line string + line, err = reader.ReadString('\n') + if err != nil && err == io.EOF { + break + } else if err != nil { + return codes, err + } + + if strings.HasPrefix(line, "HTTP/") { + parts := strings.Split(line, " ") + if len(parts) < 2 { + continue + } + if code, err := strconv.Atoi(parts[1]); err == nil { + codes = append(codes, code) + } + } + + for { + line, err := reader.ReadString('\n') + if err != nil && err == io.EOF { + return codes, nil + } else if err != nil { + return codes, err + } + if line == "\r\n" || line == "\n" { + break // End of headers + } + } + } + + return codes, err +} diff --git a/net/ip.go b/net/ip.go new file mode 100644 index 0000000..fc71729 --- /dev/null +++ b/net/ip.go @@ -0,0 +1,152 @@ +package net_utils + +import ( + "encoding/binary" + "errors" + "io" + "net" + "net/http" + + common_utils "nettools/common" +) + +var ( + ErrInitializingRequest = errors.New("error while initializing the request") + ErrReadingBody = errors.New("err while reading the body") + ErrDialingServ = errors.New("err while dealing the server") + ErrInterfaceNotFound = errors.New("provided network interface name wasn't found") +) + +const ( + DEFAULT_HTTP_PORT = 80 + DEFAULT_WS_PORT = 80 + DEFAULT_WSS_PORT = 443 + DEFAULT_HTTPS_PORT = 443 + DEFAULT_GRPC_PORT = 9090 +) + +const ( + // network masks + MASK8 uint32 = 0b11111111_00000000_00000000_00000000 + MASK12 uint32 = 0b11111111_11110000_00000000_00000000 + MASK16 uint32 = 0b11111111_11111111_00000000_00000000 + MASK24 uint32 = 0b11111111_11111111_11111111_00000000 + MASK32 uint32 = 0b11111111_11111111_11111111_11111111 + // reserved IP adresses + IP_192_168_0_0 uint32 = 0b11000000_10101000_00000000_00000000 + IP_172_16_0_0 uint32 = 0b10101100_00010000_00000000_00000000 + IP_10_0_0_0 uint32 = 0b00001010_00000000_00000000_00000000 + // reserved for Local Network IP addresses + PRESERVED_IP_RANGE_192 uint32 = IP_192_168_0_0 & MASK16 + PRESERVED_IP_RANGE_172 uint32 = IP_172_16_0_0 & MASK12 + PRESERVED_IP_RANGE_10 uint32 = IP_10_0_0_0 & MASK8 +) + +func MaskToBytes[T common_utils.Int](mask T) [4]uint8 { + if mask > 32 { + panic("mask is invalid: value out of range") + } + var out [4]uint8 + for i := 0; i < 4; i++ { + if mask >= 8 { + out[i] = 0b11111111 + mask -= 8 + } else { + out[i] = 1< 4 { + return 0, errors.New("bad input value length, expected up to 4 bytes") + } + copy(in, bytes) + return uint32( + bytes[0])<<24 | + uint32(bytes[1])<<16 | + uint32(bytes[2])<<8 | + uint32(bytes[3]), nil +} + +type NetInterfaceNamesT = map[string]struct{} + +// Those are ones I have on my machine, so they are default, lol +var DefaultNetInterfaceNames = NetInterfaceNamesT{ + "eth0": {}, + "wlan0": {}, +} + +// Converts IP which is a byte array to an integer +// So it can be used with the bitmasks or be compared fast +func IpToInt(ip net.IP) uint32 { + if len(ip) == 16 { + return binary.BigEndian.Uint32(ip[12:16]) + } + return binary.BigEndian.Uint32(ip) +} + +func GetWanIP(ipv4 bool) (net.IP, error) { + url := "http://4.ident.me" + if !ipv4 { + url = "http://6.ident.me" + } + + req, err := http.Get(url) + if err != nil { + return nil, errors.Join(ErrInitializingRequest, err) + } + defer req.Body.Close() + + body, err := io.ReadAll(req.Body) + if err != nil { + return nil, errors.Join(ErrReadingBody, err) + } + return net.ParseIP(string(body)), err +} + +func GetLanIP(interfaceNames NetInterfaceNamesT) (net.IP, error) { + if interfaceNames == nil { + interfaceNames = DefaultNetInterfaceNames + } + + ifaces, err := net.Interfaces() + if err != nil { + return nil, err + } + + for _, i := range ifaces { + if i.Name != "" { + _, whitelisted := interfaceNames[i.Name] + if !whitelisted { + continue + } + } + addrs, err := i.Addrs() + if err != nil { + return nil, err + } + + for _, addr := range addrs { + var ip net.IP + switch v := addr.(type) { + case *net.IPNet: + ip = v.IP + case *net.IPAddr: + ip = v.IP + } + + intIP := IpToInt(ip) + if intIP&MASK16 == PRESERVED_IP_RANGE_192 || + intIP&MASK12 == PRESERVED_IP_RANGE_172 || + intIP&MASK8 == PRESERVED_IP_RANGE_10 { + return ip, nil + } + } + } + return nil, ErrInterfaceNotFound +} diff --git a/net/net.go b/net/net.go new file mode 100644 index 0000000..4521bd0 --- /dev/null +++ b/net/net.go @@ -0,0 +1,7 @@ +package net_utils + +import ( + common_utils "nettools/common" +) + +type IPortRange common_utils.IRange[int] diff --git a/net/ws.go b/net/ws.go new file mode 100644 index 0000000..a2ffc68 --- /dev/null +++ b/net/ws.go @@ -0,0 +1,8 @@ +package net_utils + +import "encoding/json" + +type WSMsg struct { + Type int + Data json.RawMessage +} diff --git a/readme.txt b/readme.txt new file mode 100644 index 0000000..5165ffa --- /dev/null +++ b/readme.txt @@ -0,0 +1 @@ +Just set of tools that I'm using from project to project