From 179310ac8569df2dc92ff0299ee0ac059e43fcdc Mon Sep 17 00:00:00 2001 From: Dmitry Anderson <4nd3r5z0n@gmail.com> Date: Tue, 12 Nov 2024 13:31:54 +0100 Subject: [PATCH] dsUtils refactoring + cache added --- data_structures/cache.go | 157 ++++++++++++++++++++++++++++ data_structures/cbuf.go | 79 ++++---------- data_structures/counter.go | 15 +-- data_structures/ds.go | 6 +- data_structures/pipe.go | 6 +- data_structures/queue.go | 24 +++-- data_structures/stack.go | 70 +++++++------ data_structures/syncmap.go | 2 +- data_structures/worker_pool.go | 2 +- data_structures/worker_pool_func.go | 2 +- data_structures/worker_pool_test.go | 2 +- pg/pg.go | 2 +- 12 files changed, 245 insertions(+), 122 deletions(-) create mode 100644 data_structures/cache.go diff --git a/data_structures/cache.go b/data_structures/cache.go new file mode 100644 index 0000000..d857dbb --- /dev/null +++ b/data_structures/cache.go @@ -0,0 +1,157 @@ +package dsUtils + +import ( + "context" + "sync" + "time" +) + +type KeyValue struct { + Key string + Value []byte +} + +type MaybeExpirableData struct { + Data []byte + ExpirationTime time.Time + CanExpire bool +} +type ToBeInvalidatedData struct { + Key string + ExpirationTime time.Time +} + +type ICache interface { + Get(key string) (item []byte, isSet bool) + Set(data map[string][]byte, canExpire bool, updateExpirationTime bool) + Remove(key string) + SetIfNotExists(key string, value []byte, canExpire bool) (isSet bool) + GetAndRemove(key string) (item []byte, isSet bool) + + Run(ctx context.Context) error +} + +// Cache provides simple cache functionality +// All objects in the same cache instance are getting same TTL +type Cache struct { + ICache + newExpirableItem *sync.Cond + lock *sync.Mutex + data map[string]MaybeExpirableData + invalidateOrder []ToBeInvalidatedData + ttl time.Duration +} + +// NewCache Creates new Cache instance +func NewCache(ttl time.Duration) *Cache { + return &Cache{ + ttl: ttl, + newExpirableItem: &sync.Cond{L: &sync.Mutex{}}, + lock: &sync.Mutex{}, + data: make(map[string]MaybeExpirableData), + invalidateOrder: make([]ToBeInvalidatedData, 0), + } +} + +func (c *Cache) get(key string) (item MaybeExpirableData, isSet bool) { + item, isSet = c.data[key] + if item.ExpirationTime.Before(time.Now()) { + delete(c.data, key) + return MaybeExpirableData{}, false + } + return item, isSet +} + +func (c *Cache) Get(key string) (data []byte, isSet bool) { + c.lock.Lock() + defer c.lock.Unlock() + item, isSet := c.get(key) + return item.Data, isSet +} + +func (c *Cache) GetAndRemove(key string) (data []byte, isSet bool) { + c.lock.Lock() + item, isSet := c.get(key) + delete(c.data, key) + c.lock.Unlock() + return item.Data, isSet +} + +// Internal function +func (c *Cache) set(key string, value []byte, canExpire bool, expirationTime time.Time) { + c.data[key] = MaybeExpirableData{ + Data: value, + ExpirationTime: expirationTime, + CanExpire: canExpire, + } + if canExpire { + c.invalidateOrder = append(c.invalidateOrder, ToBeInvalidatedData{ + Key: key, + ExpirationTime: expirationTime, + }) + c.newExpirableItem.Signal() + } +} + +func (c *Cache) Set(data map[string][]byte, canExpire bool, updateExpirationTime bool) { + c.lock.Lock() + for key, val := range data { + if !updateExpirationTime { + if item, exists := c.data[key]; exists { + c.set(key, val, item.CanExpire, item.ExpirationTime) + return + } + } + c.set(key, val, canExpire, time.Now().Add(c.ttl)) + } + c.lock.Unlock() +} + +func (c *Cache) Remove(key string) { + c.lock.Lock() + delete(c.data, key) + c.lock.Unlock() +} + +func (c *Cache) SetIfNotExists(key string, value []byte, canExpire bool) (isSet bool) { + c.lock.Lock() + defer c.lock.Unlock() + _, exists := c.data[key] + if exists { + return false + } + c.set(key, value, canExpire, time.Now().Add(c.ttl)) + return true +} + +func (c *Cache) timeInvalidate() { + c.lock.Lock() + if len(c.invalidateOrder) == 0 { + c.lock.Unlock() + c.newExpirableItem.Wait() + c.lock.Lock() + } + toBeInvalidated := c.invalidateOrder[0] + c.lock.Unlock() + time.After(time.Now().Sub(toBeInvalidated.ExpirationTime)) + c.lock.Lock() + item, exists := c.data[toBeInvalidated.Key] + if exists && item.CanExpire { + if item.ExpirationTime.After(time.Now()) { + delete(c.data, toBeInvalidated.Key) + } + } + c.invalidateOrder = c.invalidateOrder[1:] + c.lock.Unlock() +} + +func (c *Cache) Run(ctx context.Context) error { + // Invalidator loop + go func() { + for { + c.timeInvalidate() + } + }() + <-ctx.Done() + return ctx.Err() +} diff --git a/data_structures/cbuf.go b/data_structures/cbuf.go index b6b2705..fb86b70 100644 --- a/data_structures/cbuf.go +++ b/data_structures/cbuf.go @@ -1,25 +1,32 @@ -package dbUtils +package dsUtils -import ( - "sync" -) - -// Basic Circular buffer interface +// ICBuf is a basic Circular buffer interface type ICBuf[T any] interface { Push(item T) Get() []T } -// Implementation of ICBuf interface -// Simpliest circular buffer implementation +// CBuf is the implementation of ICBuf interface +// Simplest circular buffer implementation // If thread safety is needed -- use CBufSync +// +// WARNING: It's not thread-safe by default, +// use it with a lock type CBuf[T any] struct { + ICBuf[T] Buffer []T Cursor int Capacity int } -// Same as pushing back to array +// NewCBuf creates a new CBuf instance +func NewCBuf[T any](capacity int) *CBuf[T] { + return &CBuf[T]{ + Buffer: make([]T, capacity), + Capacity: capacity, + } +} + func (cbuf *CBuf[T]) Push(item T) { cbuf.Buffer[cbuf.Cursor] = item cbuf.Cursor++ @@ -42,54 +49,10 @@ func (cbuf *CBuf[T]) PushFront(item T) { cbuf.Cursor = 0 } -// Just returning the whole buffer in the correct order +// Get returns the whole buffer in the correct order func (cbuf *CBuf[T]) Get() []T { - return append(cbuf.Buffer[cbuf.Cursor:], cbuf.Buffer[:cbuf.Cursor]...) -} - -func NewCBuf[T any](capacity int) *CBuf[T] { - return &CBuf[T]{ - Buffer: make([]T, capacity), - Capacity: capacity, - } -} - -// Same CBuf, but with mutex -type CBufSync[T any] struct { - Buffer []T - Cursor int - Capacity int - Mu *sync.RWMutex -} - -func NewCBufSync[T any](capacity int) *CBufSync[T] { - return &CBufSync[T]{ - Mu: &sync.RWMutex{}, - } -} - -func (cbuf *CBufSync[T]) Push(item T) { - cbuf.Mu.Lock() - cbuf.Buffer[cbuf.Cursor] = item - cbuf.Cursor++ - if cbuf.Cursor >= cbuf.Capacity { - cbuf.Cursor = 0 - } - cbuf.Mu.Unlock() -} - -func (cbuf *CBufSync[T]) PushFront(item T) { - cbuf.Mu.Lock() - cbuf.Buffer = append( - append(cbuf.Buffer[cbuf.Cursor+1:], cbuf.Buffer[:cbuf.Cursor]...), - item, - ) - cbuf.Cursor = 0 - cbuf.Mu.Unlock() -} - -func (cbuf *CBufSync[T]) Get() []T { - cbuf.Mu.RLock() - defer cbuf.Mu.RUnlock() - return append(cbuf.Buffer[cbuf.Cursor:], cbuf.Buffer[:cbuf.Cursor]...) + out := make([]T, 0, cbuf.Capacity) + copy(out, cbuf.Buffer[cbuf.Cursor:]) + out = append(out, cbuf.Buffer[:cbuf.Cursor]...) + return out } diff --git a/data_structures/counter.go b/data_structures/counter.go index affeab1..aefdafc 100644 --- a/data_structures/counter.go +++ b/data_structures/counter.go @@ -1,19 +1,19 @@ -package dbUtils +package dsUtils import "sync/atomic" type ICounter[i int32 | int64] interface { Set(new i) (old i) - Add(delta i) (val i) + Add(delta i) (newVal i) Get() (val i) } -// 32 bit int counter implementation of ICounter +// Counter32 is a 32 bit implementation of ICounter type Counter32 struct { val int32 } -// 32 bit int counter implementation of ICounter +// NewCounter32 creates new Counter32 instance func NewCounter32(initVal int32) ICounter[int32] { return &Counter32{val: initVal} } @@ -22,7 +22,7 @@ func (c *Counter32) Set(new int32) (old int32) { return atomic.SwapInt32(&c.val, new) } -func (c *Counter32) Add(delta int32) (val int32) { +func (c *Counter32) Add(delta int32) (newVal int32) { return atomic.AddInt32(&c.val, delta) } @@ -30,11 +30,12 @@ func (c *Counter32) Get() (val int32) { return atomic.LoadInt32(&c.val) } -// 64 bit int counter implementation of ICounter +// Counter64 is a 64 bit implementation of ICounter type Counter64 struct { val int64 } +// NewCounter64 creates new Counter64 instance func NewCounter64(initVal int64) ICounter[int64] { return &Counter64{val: initVal} } @@ -43,7 +44,7 @@ func (c *Counter64) Set(new int64) (old int64) { return atomic.SwapInt64(&c.val, new) } -func (c *Counter64) Add(delta int64) (val int64) { +func (c *Counter64) Add(delta int64) (newVal int64) { return atomic.AddInt64(&c.val, delta) } diff --git a/data_structures/ds.go b/data_structures/ds.go index 4d82c60..8687ff6 100644 --- a/data_structures/ds.go +++ b/data_structures/ds.go @@ -1,4 +1,2 @@ -/* -Provides useful data structures -*/ -package dbUtils +// Package dsUtils provides data structures +package dsUtils diff --git a/data_structures/pipe.go b/data_structures/pipe.go index d0208d6..e914a3c 100644 --- a/data_structures/pipe.go +++ b/data_structures/pipe.go @@ -1,4 +1,4 @@ -package dbUtils +package dsUtils import ( "context" @@ -44,10 +44,10 @@ type Pipe struct { closeChan chan struct{} } -// New creates new Pipe and returns pointer to it +// NewPipe creates new Pipe and returns pointer to it // Set sizeLimit = -1 for no limit // discard -func New(sizeLimit int) *Pipe { +func NewPipe(sizeLimit int) *Pipe { return &Pipe{ data: [][]byte{}, cap: sizeLimit, diff --git a/data_structures/queue.go b/data_structures/queue.go index f568dfc..4b5fa6b 100644 --- a/data_structures/queue.go +++ b/data_structures/queue.go @@ -1,6 +1,7 @@ -package dbUtils +package dsUtils import ( + "io" "sync" ) @@ -10,16 +11,14 @@ type IQueue[T any] interface { Len() int } -// This queue isn't suppose to be thread safe and suppose to be used only in a single thread -// If you will try to get item from an empty queue -- it will panic -// (Because if u have single-threaded app and one thread will be locked -// by getting an item from the queue, there will be no thread to put that item) -// For multythreaded usage -- use QueueSync +// Queue isn't thread safe and suppose to be used only in a single thread +// For multithreaded usage -- use QueueSync type Queue[T any] struct { IQueue[T] Buffer []T } +// NewQueue creates a new Queue instance func NewQueue[T any]() *Queue[T] { return &Queue[T]{} } @@ -28,20 +27,22 @@ func (q *Queue[T]) Push(item T) { q.Buffer = append(q.Buffer, item) } -func (q *Queue[T]) Get() T { - if len(q.Buffer) < 1 { - panic("Trying to get from an empty thread unsafe queue") +// Get returns a single item from a queue. If queue is empty -- it will return io.EOF +func (q *Queue[T]) Get() (T, error) { + if len(q.Buffer) == 0 { + var out T + return out, io.EOF } item := q.Buffer[0] q.Buffer = q.Buffer[1:] - return item + return item, nil } func (q *Queue[T]) Len() int { return len(q.Buffer) } -// Queue sync +// QueueSync is a Queue but for multi thread usage type QueueSync[T any] struct { IQueue[T] Buffer []T @@ -49,6 +50,7 @@ type QueueSync[T any] struct { NonEmptyCond *sync.Cond } +// NewQueueSync creates a new QueueSync instance func NewQueueSync[T any]() *QueueSync[T] { return &QueueSync[T]{ Mu: &sync.Mutex{}, diff --git a/data_structures/stack.go b/data_structures/stack.go index 7d9f391..59e9bff 100644 --- a/data_structures/stack.go +++ b/data_structures/stack.go @@ -1,6 +1,9 @@ -package dbUtils +package dsUtils -import "sync" +import ( + "io" + "sync" +) type IStack[T any] interface { Push(item T) @@ -8,38 +11,37 @@ type IStack[T any] interface { Len() int } -// This queue isn't suppose to be thread safe -// If you will try to get item from an empty queue -- it will panic -// (Because if u have single-threaded app and one thread will be locked -// by getting an item from the queue, there will be no thread to put that item) -// For multythreaded usage -- use StackSync (it's calm, never panics) +// Stack isn't thread safe and suppose to be used only in a single thread +// For multithreaded usage -- use Stack type Stack[T any] struct { IStack[T] Buffer []T } +// NewStack creates a new Stack instance func NewStack[T any]() *Stack[T] { return &Stack[T]{} } -func (q *Stack[T]) Push(item T) { - q.Buffer = append(q.Buffer, item) +func (s *Stack[T]) Push(item T) { + s.Buffer = append(s.Buffer, item) } -func (q *Stack[T]) Get() T { - if len(q.Buffer) < 1 { - panic("Trying to get from an empty thread unsafe queue") +func (s *Stack[T]) Get() (T, error) { + if len(s.Buffer) == 0 { + var out T + return out, io.EOF } - item := q.Buffer[len(q.Buffer)-1] - q.Buffer = q.Buffer[:len(q.Buffer)-1] - return item + item := s.Buffer[len(s.Buffer)-1] + s.Buffer = s.Buffer[:len(s.Buffer)-1] + return item, nil } -func (q *Stack[T]) Len() int { - return len(q.Buffer) +func (s *Stack[T]) Len() int { + return len(s.Buffer) } -// Stack sync +// StackSync is a Stack but for multi thread usage type StackSync[T any] struct { IStack[T] Buffer []T @@ -54,26 +56,26 @@ func NewStackSync[T any]() *StackSync[T] { } } -func (q *StackSync[T]) Push(item T) { - q.Mu.Lock() - q.Buffer = append(q.Buffer, item) - q.NonEmptyCond.Signal() - q.Mu.Unlock() +func (s *StackSync[T]) Push(item T) { + s.Mu.Lock() + s.Buffer = append(s.Buffer, item) + s.NonEmptyCond.Signal() + s.Mu.Unlock() } -func (q *StackSync[T]) Get() T { - q.Mu.Lock() - if len(q.Buffer) < 1 { - q.NonEmptyCond.Wait() +func (s *StackSync[T]) Get() T { + s.Mu.Lock() + if len(s.Buffer) < 1 { + s.NonEmptyCond.Wait() } - item := q.Buffer[len(q.Buffer)-1] - q.Buffer = q.Buffer[:len(q.Buffer)-1] - q.Mu.Unlock() + item := s.Buffer[len(s.Buffer)-1] + s.Buffer = s.Buffer[:len(s.Buffer)-1] + s.Mu.Unlock() return item } -func (q *StackSync[T]) Len() int { - q.Mu.Lock() - defer q.Mu.Lock() - return len(q.Buffer) +func (s *StackSync[T]) Len() int { + s.Mu.Lock() + defer s.Mu.Lock() + return len(s.Buffer) } diff --git a/data_structures/syncmap.go b/data_structures/syncmap.go index d6bf1d4..3f6f3ae 100644 --- a/data_structures/syncmap.go +++ b/data_structures/syncmap.go @@ -1,4 +1,4 @@ -package dbUtils +package dsUtils import ( "sync" diff --git a/data_structures/worker_pool.go b/data_structures/worker_pool.go index 1257c45..1cd2c82 100644 --- a/data_structures/worker_pool.go +++ b/data_structures/worker_pool.go @@ -1,4 +1,4 @@ -package dbUtils +package dsUtils // A combination of worker pool and a queue diff --git a/data_structures/worker_pool_func.go b/data_structures/worker_pool_func.go index 3c20b3b..3bc5215 100644 --- a/data_structures/worker_pool_func.go +++ b/data_structures/worker_pool_func.go @@ -1,4 +1,4 @@ -package dbUtils +package dsUtils import ( "context" diff --git a/data_structures/worker_pool_test.go b/data_structures/worker_pool_test.go index f95a514..1811fdc 100644 --- a/data_structures/worker_pool_test.go +++ b/data_structures/worker_pool_test.go @@ -1,4 +1,4 @@ -package dbUtils_test +package dsUtils_test import ( "context" diff --git a/pg/pg.go b/pg/pg.go index c10a259..6ad743e 100644 --- a/pg/pg.go +++ b/pg/pg.go @@ -19,7 +19,7 @@ type PgxQuerier interface { Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) } -// Select executes query on a provided querier and tries to parse db response into antit +// Select executes query on a provided querier and tries to parse db response // Works only with objects // // Usage: