dsUtils refactoring + cache added

This commit is contained in:
Dmitry Anderson 2024-11-12 13:31:54 +01:00
parent 49248b6d62
commit 179310ac85
12 changed files with 245 additions and 122 deletions

157
data_structures/cache.go Normal file
View File

@ -0,0 +1,157 @@
package dsUtils
import (
"context"
"sync"
"time"
)
type KeyValue struct {
Key string
Value []byte
}
type MaybeExpirableData struct {
Data []byte
ExpirationTime time.Time
CanExpire bool
}
type ToBeInvalidatedData struct {
Key string
ExpirationTime time.Time
}
type ICache interface {
Get(key string) (item []byte, isSet bool)
Set(data map[string][]byte, canExpire bool, updateExpirationTime bool)
Remove(key string)
SetIfNotExists(key string, value []byte, canExpire bool) (isSet bool)
GetAndRemove(key string) (item []byte, isSet bool)
Run(ctx context.Context) error
}
// Cache provides simple cache functionality
// All objects in the same cache instance are getting same TTL
type Cache struct {
ICache
newExpirableItem *sync.Cond
lock *sync.Mutex
data map[string]MaybeExpirableData
invalidateOrder []ToBeInvalidatedData
ttl time.Duration
}
// NewCache Creates new Cache instance
func NewCache(ttl time.Duration) *Cache {
return &Cache{
ttl: ttl,
newExpirableItem: &sync.Cond{L: &sync.Mutex{}},
lock: &sync.Mutex{},
data: make(map[string]MaybeExpirableData),
invalidateOrder: make([]ToBeInvalidatedData, 0),
}
}
func (c *Cache) get(key string) (item MaybeExpirableData, isSet bool) {
item, isSet = c.data[key]
if item.ExpirationTime.Before(time.Now()) {
delete(c.data, key)
return MaybeExpirableData{}, false
}
return item, isSet
}
func (c *Cache) Get(key string) (data []byte, isSet bool) {
c.lock.Lock()
defer c.lock.Unlock()
item, isSet := c.get(key)
return item.Data, isSet
}
func (c *Cache) GetAndRemove(key string) (data []byte, isSet bool) {
c.lock.Lock()
item, isSet := c.get(key)
delete(c.data, key)
c.lock.Unlock()
return item.Data, isSet
}
// Internal function
func (c *Cache) set(key string, value []byte, canExpire bool, expirationTime time.Time) {
c.data[key] = MaybeExpirableData{
Data: value,
ExpirationTime: expirationTime,
CanExpire: canExpire,
}
if canExpire {
c.invalidateOrder = append(c.invalidateOrder, ToBeInvalidatedData{
Key: key,
ExpirationTime: expirationTime,
})
c.newExpirableItem.Signal()
}
}
func (c *Cache) Set(data map[string][]byte, canExpire bool, updateExpirationTime bool) {
c.lock.Lock()
for key, val := range data {
if !updateExpirationTime {
if item, exists := c.data[key]; exists {
c.set(key, val, item.CanExpire, item.ExpirationTime)
return
}
}
c.set(key, val, canExpire, time.Now().Add(c.ttl))
}
c.lock.Unlock()
}
func (c *Cache) Remove(key string) {
c.lock.Lock()
delete(c.data, key)
c.lock.Unlock()
}
func (c *Cache) SetIfNotExists(key string, value []byte, canExpire bool) (isSet bool) {
c.lock.Lock()
defer c.lock.Unlock()
_, exists := c.data[key]
if exists {
return false
}
c.set(key, value, canExpire, time.Now().Add(c.ttl))
return true
}
func (c *Cache) timeInvalidate() {
c.lock.Lock()
if len(c.invalidateOrder) == 0 {
c.lock.Unlock()
c.newExpirableItem.Wait()
c.lock.Lock()
}
toBeInvalidated := c.invalidateOrder[0]
c.lock.Unlock()
time.After(time.Now().Sub(toBeInvalidated.ExpirationTime))
c.lock.Lock()
item, exists := c.data[toBeInvalidated.Key]
if exists && item.CanExpire {
if item.ExpirationTime.After(time.Now()) {
delete(c.data, toBeInvalidated.Key)
}
}
c.invalidateOrder = c.invalidateOrder[1:]
c.lock.Unlock()
}
func (c *Cache) Run(ctx context.Context) error {
// Invalidator loop
go func() {
for {
c.timeInvalidate()
}
}()
<-ctx.Done()
return ctx.Err()
}

View File

@ -1,25 +1,32 @@
package dbUtils package dsUtils
import ( // ICBuf is a basic Circular buffer interface
"sync"
)
// Basic Circular buffer interface
type ICBuf[T any] interface { type ICBuf[T any] interface {
Push(item T) Push(item T)
Get() []T Get() []T
} }
// Implementation of ICBuf interface // CBuf is the implementation of ICBuf interface
// Simpliest circular buffer implementation // Simplest circular buffer implementation
// If thread safety is needed -- use CBufSync // If thread safety is needed -- use CBufSync
//
// WARNING: It's not thread-safe by default,
// use it with a lock
type CBuf[T any] struct { type CBuf[T any] struct {
ICBuf[T]
Buffer []T Buffer []T
Cursor int Cursor int
Capacity int Capacity int
} }
// Same as pushing back to array // NewCBuf creates a new CBuf instance
func NewCBuf[T any](capacity int) *CBuf[T] {
return &CBuf[T]{
Buffer: make([]T, capacity),
Capacity: capacity,
}
}
func (cbuf *CBuf[T]) Push(item T) { func (cbuf *CBuf[T]) Push(item T) {
cbuf.Buffer[cbuf.Cursor] = item cbuf.Buffer[cbuf.Cursor] = item
cbuf.Cursor++ cbuf.Cursor++
@ -42,54 +49,10 @@ func (cbuf *CBuf[T]) PushFront(item T) {
cbuf.Cursor = 0 cbuf.Cursor = 0
} }
// Just returning the whole buffer in the correct order // Get returns the whole buffer in the correct order
func (cbuf *CBuf[T]) Get() []T { func (cbuf *CBuf[T]) Get() []T {
return append(cbuf.Buffer[cbuf.Cursor:], cbuf.Buffer[:cbuf.Cursor]...) out := make([]T, 0, cbuf.Capacity)
} copy(out, cbuf.Buffer[cbuf.Cursor:])
out = append(out, cbuf.Buffer[:cbuf.Cursor]...)
func NewCBuf[T any](capacity int) *CBuf[T] { return out
return &CBuf[T]{
Buffer: make([]T, capacity),
Capacity: capacity,
}
}
// Same CBuf, but with mutex
type CBufSync[T any] struct {
Buffer []T
Cursor int
Capacity int
Mu *sync.RWMutex
}
func NewCBufSync[T any](capacity int) *CBufSync[T] {
return &CBufSync[T]{
Mu: &sync.RWMutex{},
}
}
func (cbuf *CBufSync[T]) Push(item T) {
cbuf.Mu.Lock()
cbuf.Buffer[cbuf.Cursor] = item
cbuf.Cursor++
if cbuf.Cursor >= cbuf.Capacity {
cbuf.Cursor = 0
}
cbuf.Mu.Unlock()
}
func (cbuf *CBufSync[T]) PushFront(item T) {
cbuf.Mu.Lock()
cbuf.Buffer = append(
append(cbuf.Buffer[cbuf.Cursor+1:], cbuf.Buffer[:cbuf.Cursor]...),
item,
)
cbuf.Cursor = 0
cbuf.Mu.Unlock()
}
func (cbuf *CBufSync[T]) Get() []T {
cbuf.Mu.RLock()
defer cbuf.Mu.RUnlock()
return append(cbuf.Buffer[cbuf.Cursor:], cbuf.Buffer[:cbuf.Cursor]...)
} }

View File

@ -1,19 +1,19 @@
package dbUtils package dsUtils
import "sync/atomic" import "sync/atomic"
type ICounter[i int32 | int64] interface { type ICounter[i int32 | int64] interface {
Set(new i) (old i) Set(new i) (old i)
Add(delta i) (val i) Add(delta i) (newVal i)
Get() (val i) Get() (val i)
} }
// 32 bit int counter implementation of ICounter // Counter32 is a 32 bit implementation of ICounter
type Counter32 struct { type Counter32 struct {
val int32 val int32
} }
// 32 bit int counter implementation of ICounter // NewCounter32 creates new Counter32 instance
func NewCounter32(initVal int32) ICounter[int32] { func NewCounter32(initVal int32) ICounter[int32] {
return &Counter32{val: initVal} return &Counter32{val: initVal}
} }
@ -22,7 +22,7 @@ func (c *Counter32) Set(new int32) (old int32) {
return atomic.SwapInt32(&c.val, new) return atomic.SwapInt32(&c.val, new)
} }
func (c *Counter32) Add(delta int32) (val int32) { func (c *Counter32) Add(delta int32) (newVal int32) {
return atomic.AddInt32(&c.val, delta) return atomic.AddInt32(&c.val, delta)
} }
@ -30,11 +30,12 @@ func (c *Counter32) Get() (val int32) {
return atomic.LoadInt32(&c.val) return atomic.LoadInt32(&c.val)
} }
// 64 bit int counter implementation of ICounter // Counter64 is a 64 bit implementation of ICounter
type Counter64 struct { type Counter64 struct {
val int64 val int64
} }
// NewCounter64 creates new Counter64 instance
func NewCounter64(initVal int64) ICounter[int64] { func NewCounter64(initVal int64) ICounter[int64] {
return &Counter64{val: initVal} return &Counter64{val: initVal}
} }
@ -43,7 +44,7 @@ func (c *Counter64) Set(new int64) (old int64) {
return atomic.SwapInt64(&c.val, new) return atomic.SwapInt64(&c.val, new)
} }
func (c *Counter64) Add(delta int64) (val int64) { func (c *Counter64) Add(delta int64) (newVal int64) {
return atomic.AddInt64(&c.val, delta) return atomic.AddInt64(&c.val, delta)
} }

View File

@ -1,4 +1,2 @@
/* // Package dsUtils provides data structures
Provides useful data structures package dsUtils
*/
package dbUtils

View File

@ -1,4 +1,4 @@
package dbUtils package dsUtils
import ( import (
"context" "context"
@ -44,10 +44,10 @@ type Pipe struct {
closeChan chan struct{} closeChan chan struct{}
} }
// New creates new Pipe and returns pointer to it // NewPipe creates new Pipe and returns pointer to it
// Set sizeLimit = -1 for no limit // Set sizeLimit = -1 for no limit
// discard // discard
func New(sizeLimit int) *Pipe { func NewPipe(sizeLimit int) *Pipe {
return &Pipe{ return &Pipe{
data: [][]byte{}, data: [][]byte{},
cap: sizeLimit, cap: sizeLimit,

View File

@ -1,6 +1,7 @@
package dbUtils package dsUtils
import ( import (
"io"
"sync" "sync"
) )
@ -10,16 +11,14 @@ type IQueue[T any] interface {
Len() int Len() int
} }
// This queue isn't suppose to be thread safe and suppose to be used only in a single thread // Queue isn't thread safe and suppose to be used only in a single thread
// If you will try to get item from an empty queue -- it will panic // For multithreaded usage -- use QueueSync
// (Because if u have single-threaded app and one thread will be locked
// by getting an item from the queue, there will be no thread to put that item)
// For multythreaded usage -- use QueueSync
type Queue[T any] struct { type Queue[T any] struct {
IQueue[T] IQueue[T]
Buffer []T Buffer []T
} }
// NewQueue creates a new Queue instance
func NewQueue[T any]() *Queue[T] { func NewQueue[T any]() *Queue[T] {
return &Queue[T]{} return &Queue[T]{}
} }
@ -28,20 +27,22 @@ func (q *Queue[T]) Push(item T) {
q.Buffer = append(q.Buffer, item) q.Buffer = append(q.Buffer, item)
} }
func (q *Queue[T]) Get() T { // Get returns a single item from a queue. If queue is empty -- it will return io.EOF
if len(q.Buffer) < 1 { func (q *Queue[T]) Get() (T, error) {
panic("Trying to get from an empty thread unsafe queue") if len(q.Buffer) == 0 {
var out T
return out, io.EOF
} }
item := q.Buffer[0] item := q.Buffer[0]
q.Buffer = q.Buffer[1:] q.Buffer = q.Buffer[1:]
return item return item, nil
} }
func (q *Queue[T]) Len() int { func (q *Queue[T]) Len() int {
return len(q.Buffer) return len(q.Buffer)
} }
// Queue sync // QueueSync is a Queue but for multi thread usage
type QueueSync[T any] struct { type QueueSync[T any] struct {
IQueue[T] IQueue[T]
Buffer []T Buffer []T
@ -49,6 +50,7 @@ type QueueSync[T any] struct {
NonEmptyCond *sync.Cond NonEmptyCond *sync.Cond
} }
// NewQueueSync creates a new QueueSync instance
func NewQueueSync[T any]() *QueueSync[T] { func NewQueueSync[T any]() *QueueSync[T] {
return &QueueSync[T]{ return &QueueSync[T]{
Mu: &sync.Mutex{}, Mu: &sync.Mutex{},

View File

@ -1,6 +1,9 @@
package dbUtils package dsUtils
import "sync" import (
"io"
"sync"
)
type IStack[T any] interface { type IStack[T any] interface {
Push(item T) Push(item T)
@ -8,38 +11,37 @@ type IStack[T any] interface {
Len() int Len() int
} }
// This queue isn't suppose to be thread safe // Stack isn't thread safe and suppose to be used only in a single thread
// If you will try to get item from an empty queue -- it will panic // For multithreaded usage -- use Stack
// (Because if u have single-threaded app and one thread will be locked
// by getting an item from the queue, there will be no thread to put that item)
// For multythreaded usage -- use StackSync (it's calm, never panics)
type Stack[T any] struct { type Stack[T any] struct {
IStack[T] IStack[T]
Buffer []T Buffer []T
} }
// NewStack creates a new Stack instance
func NewStack[T any]() *Stack[T] { func NewStack[T any]() *Stack[T] {
return &Stack[T]{} return &Stack[T]{}
} }
func (q *Stack[T]) Push(item T) { func (s *Stack[T]) Push(item T) {
q.Buffer = append(q.Buffer, item) s.Buffer = append(s.Buffer, item)
} }
func (q *Stack[T]) Get() T { func (s *Stack[T]) Get() (T, error) {
if len(q.Buffer) < 1 { if len(s.Buffer) == 0 {
panic("Trying to get from an empty thread unsafe queue") var out T
return out, io.EOF
} }
item := q.Buffer[len(q.Buffer)-1] item := s.Buffer[len(s.Buffer)-1]
q.Buffer = q.Buffer[:len(q.Buffer)-1] s.Buffer = s.Buffer[:len(s.Buffer)-1]
return item return item, nil
} }
func (q *Stack[T]) Len() int { func (s *Stack[T]) Len() int {
return len(q.Buffer) return len(s.Buffer)
} }
// Stack sync // StackSync is a Stack but for multi thread usage
type StackSync[T any] struct { type StackSync[T any] struct {
IStack[T] IStack[T]
Buffer []T Buffer []T
@ -54,26 +56,26 @@ func NewStackSync[T any]() *StackSync[T] {
} }
} }
func (q *StackSync[T]) Push(item T) { func (s *StackSync[T]) Push(item T) {
q.Mu.Lock() s.Mu.Lock()
q.Buffer = append(q.Buffer, item) s.Buffer = append(s.Buffer, item)
q.NonEmptyCond.Signal() s.NonEmptyCond.Signal()
q.Mu.Unlock() s.Mu.Unlock()
} }
func (q *StackSync[T]) Get() T { func (s *StackSync[T]) Get() T {
q.Mu.Lock() s.Mu.Lock()
if len(q.Buffer) < 1 { if len(s.Buffer) < 1 {
q.NonEmptyCond.Wait() s.NonEmptyCond.Wait()
} }
item := q.Buffer[len(q.Buffer)-1] item := s.Buffer[len(s.Buffer)-1]
q.Buffer = q.Buffer[:len(q.Buffer)-1] s.Buffer = s.Buffer[:len(s.Buffer)-1]
q.Mu.Unlock() s.Mu.Unlock()
return item return item
} }
func (q *StackSync[T]) Len() int { func (s *StackSync[T]) Len() int {
q.Mu.Lock() s.Mu.Lock()
defer q.Mu.Lock() defer s.Mu.Lock()
return len(q.Buffer) return len(s.Buffer)
} }

View File

@ -1,4 +1,4 @@
package dbUtils package dsUtils
import ( import (
"sync" "sync"

View File

@ -1,4 +1,4 @@
package dbUtils package dsUtils
// A combination of worker pool and a queue // A combination of worker pool and a queue

View File

@ -1,4 +1,4 @@
package dbUtils package dsUtils
import ( import (
"context" "context"

View File

@ -1,4 +1,4 @@
package dbUtils_test package dsUtils_test
import ( import (
"context" "context"

View File

@ -19,7 +19,7 @@ type PgxQuerier interface {
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
} }
// Select executes query on a provided querier and tries to parse db response into antit // Select executes query on a provided querier and tries to parse db response
// Works only with objects // Works only with objects
// //
// Usage: // Usage: