From 3c069df5c120de181e8e332fdf86bb7f7309d5e3 Mon Sep 17 00:00:00 2001 From: Dmitry Anderson <4nd3r5z0n@gmail.com> Date: Sun, 24 Nov 2024 15:04:52 +0100 Subject: [PATCH] Minor changes --- .idea/.gitignore | 8 -- .idea/vcs.xml | 2 +- common/base_errors.go | 50 +++++++++ common/block.go | 134 ++++++++++++++++++++++++- common/block_test.go | 3 + common/burning.go | 15 +++ common/common.go | 6 +- common/get_txs.go | 34 +++++++ common/node_info.go | 5 + common/tx.go | 119 +++++++++++++--------- infra.docker-compose.yml | 8 ++ server/api/http/api.go | 36 ++++++- server/api/http/transaction.go | 29 ++++-- server/core/block.go | 54 +++++++--- server/core/blocks_commiter.go | 15 +-- server/core/blocks_commiter_test.go | 7 ++ server/core/blocks_creator.go | 59 ++++++++--- server/core/commit_block.go | 41 +++++++- server/migrations/0.up.sql | 1 - server/repository/entities/entities.go | 40 +++----- server/repository/repo.go | 23 +++-- server/run.go | 20 ++-- server/server.go | 33 +++--- server/setup.go | 15 ++- 24 files changed, 580 insertions(+), 177 deletions(-) delete mode 100644 .idea/.gitignore create mode 100644 common/base_errors.go create mode 100644 common/block_test.go create mode 100644 common/get_txs.go create mode 100644 common/node_info.go create mode 100644 infra.docker-compose.yml create mode 100644 server/core/blocks_commiter_test.go diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 13566b8..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 94a25f7..35eb1dd 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -1,6 +1,6 @@ - + \ No newline at end of file diff --git a/common/base_errors.go b/common/base_errors.go new file mode 100644 index 0000000..73d9fbd --- /dev/null +++ b/common/base_errors.go @@ -0,0 +1,50 @@ +package common + +import ( + "errors" + "github.com/matchsystems/werr" + "net/http" +) + +var ( + ErrNotImplemented = errors.New("not implemented yet") + + ErrCanceled = errors.New("canceled") + ErrDeadlineExceeded = errors.New("deadline exceeded") + ErrRemoteServiceFailed = errors.New("remote service failed") + + ErrInvalidAction = errors.New("invalid action") + ErrInvalidArgument = errors.New("invalid argument") + ErrOutOfRange = errors.New("out of range") + + ErrPermissionDenied = errors.New("permission denied") + ErrUnauthenticated = errors.New("unauthenticated") + + ErrEntityExists = errors.New("entity already exists") + ErrEntityNotFound = errors.New("entity not found") + ErrEntityOutdated = errors.New("entity outdated") +) + +func GetHttpCode(err error) int { + err = werr.UnwrapAll(err) + switch { + case errors.Is(err, ErrNotImplemented): + return http.StatusNotImplemented + case errors.Is(err, ErrCanceled), errors.Is(err, ErrDeadlineExceeded): + return http.StatusRequestTimeout + case errors.Is(err, ErrRemoteServiceFailed): + return http.StatusServiceUnavailable + case errors.Is(err, ErrInvalidAction), errors.Is(err, ErrInvalidArgument), errors.Is(err, ErrOutOfRange): + return http.StatusBadRequest + case errors.Is(err, ErrPermissionDenied): + return http.StatusForbidden + case errors.Is(err, ErrUnauthenticated): + return http.StatusUnauthorized + case errors.Is(err, ErrEntityExists): + return http.StatusConflict + case errors.Is(err, ErrEntityNotFound): + return http.StatusNotFound + default: + return http.StatusInternalServerError + } +} diff --git a/common/block.go b/common/block.go index 33321d1..89d9ddc 100644 --- a/common/block.go +++ b/common/block.go @@ -1,10 +1,138 @@ package common +import ( + "bytes" + "crypto/sha512" + "encoding/binary" + "encoding/json" + "github.com/matchsystems/werr" + "io" + "time" +) + +// BlockSecsDiff is the time in second that passes +// between blocks rotation +const BlockSecsDiff = 100 + +func GetBlockId(t time.Time) int64 { + // because number is int -- it will just cut off the floating point part + return (t.UTC().Unix() / BlockSecsDiff) * BlockSecsDiff +} + type BlockJson struct { + Id int64 `json:"id"` + PrevHash string `json:"prev_hash"` + Hash string `json:"hash"` + TXs []json.RawMessage `json:"txs"` + IsFinished bool `json:"is_finished"` + ByteEncoding string `json:"byte_encoding"` } type Block struct { - Id int64 - PrevHash []byte - Hash []byte + Id int64 + PrevHash []byte + Hash []byte + TXs []*Tx + IsFinished bool +} + +func (block *Block) CalcHash() ([]byte, error) { + hash := sha512.New() + if err := binary.Write(hash, binary.BigEndian, block.Id); err != nil { + return nil, werr.Wrapf(err, "failed to write block id into hash buffer for block %d", block.Id) + } + hash.Write(block.PrevHash) + for _, tx := range block.TXs { + hash.Write(tx.Hash) + } + return hash.Sum(nil), nil +} + +// ValidateHash calculates new block hash and compares it to current block.Hash +// returning is block hash is valid +func (block *Block) ValidateHash() bool { + hash, err := block.CalcHash() + if err != nil { + return false + } + return bytes.Equal(block.Hash, hash) +} + +// ValidateTXs returns the number of invalid transactions +// and map of hashes : reasons why txs were invalid. +// If invalid count > 0 -- the whole block isn't legit +func (block *Block) ValidateTXs() (invalidCount int, invalidTXs map[string]string) { + invalidTXs = make(map[string]string) + for _, tx := range block.TXs { + if err := tx.Validate(); err != nil { + invalidTXs[tx.HashString()] = err.Error() + invalidCount++ + } + } + return invalidCount, invalidTXs +} + +func (block *Block) ToJSON(byteEncodingStr string) ([]byte, error) { + encoding, err := GetEncoding(byteEncodingStr) + if err != nil { + return nil, err + } + var txData json.RawMessage + txsData := make([]json.RawMessage, len(block.TXs)) + for i, tx := range block.TXs { + if txData, err = tx.ToJSON(byteEncodingStr); err != nil { + return nil, werr.Wrapf(err, "error while converting transaction to JSON %d", i) + } + txsData[i] = txData + } + data := &BlockJson{ + Id: block.Id, + PrevHash: encoding.EncodeToString(block.PrevHash), + Hash: encoding.EncodeToString(block.Hash), + TXs: txsData, + IsFinished: block.IsFinished, + } + jsonBytes, err := json.Marshal(data) + if err != nil { + return nil, werr.Wrapf(err, "error while marshaling") + } + return jsonBytes, err +} + +func (block *BlockJson) Process() (*Block, error) { + var err error + txs := make([]*Tx, len(block.TXs)) + for i, jsonTx := range block.TXs { + if txs[i], err = TxFromJSON(jsonTx); err != nil { + return nil, err + } + } + out := &Block{ + Id: block.Id, + TXs: txs, + IsFinished: block.IsFinished, + } + encoding, err := GetEncoding(block.ByteEncoding) + if err != nil { + return nil, err + } + if out.Hash, err = encoding.DecodeString(block.Hash); err != nil { + return nil, werr.Wrapf(err, "error while decoding hash") + } + return out, nil +} + +func BlockFromJSON(marshaledData []byte) (opts *Block, err error) { + data := &BlockJson{} + if err := json.Unmarshal(marshaledData, data); err != nil { + return nil, werr.Wrapf(err, "error while unmarshaling") + } + return data.Process() +} +func BlockFromJsonReader(r io.Reader) (opts *Block, err error) { + data := &BlockJson{} + if err := json.NewDecoder(r).Decode(&data); err != nil { + return nil, werr.Wrapf(err, "error while unmarshaling") + } + return data.Process() } diff --git a/common/block_test.go b/common/block_test.go new file mode 100644 index 0000000..6b5e538 --- /dev/null +++ b/common/block_test.go @@ -0,0 +1,3 @@ +package common + +// TODO diff --git a/common/burning.go b/common/burning.go index e058658..18c036b 100644 --- a/common/burning.go +++ b/common/burning.go @@ -7,3 +7,18 @@ const BurningRate = 0.001 func CalcBurning(trxAmount float64) float64 { return BurningRate * math.Sqrt(trxAmount) } + +// ValidateBurning doesn't return valid/invalid like usual Validate functions. +// Instead, it returns difference between expected and set values. +// Bcs of a differences in how programming languages might be implemented +// or approach numeric types (yes, JS, I'm about u) we might not get exactly the same burning amount, +// but still it should be a very close value. +// To get boolean value -- use ValidateBurningBool instead +func ValidateBurning(tx *Tx) float64 { + return CalcBurning(tx.Amount) - tx.AmountBurned +} + +func ValidateBurningBool(tx *Tx) bool { + // Let's say, that before that count of digits we shouldn't have any mistake + return math.Abs(ValidateBurning(tx)) < 0.1e-10 +} diff --git a/common/common.go b/common/common.go index f84a5d3..98ab5e3 100644 --- a/common/common.go +++ b/common/common.go @@ -3,7 +3,7 @@ package common import ( "encoding/base64" "encoding/binary" - "errors" + "github.com/matchsystems/werr" ) const ( @@ -11,6 +11,7 @@ const ( EncodingRawStd = "encoring_row_std" ) +// GetEncoding Returns base64 encoding based on the provided string func GetEncoding(encoding string) (*base64.Encoding, error) { switch encoding { case EncodingRawUrl: @@ -18,12 +19,11 @@ func GetEncoding(encoding string) (*base64.Encoding, error) { case EncodingRawStd: return base64.RawStdEncoding, nil default: - return nil, errors.New("unsupported encoding") + return nil, werr.Wrapf(ErrInvalidArgument, "unknown encoding %s", encoding) } } // float64ToBytes converts float64 to []byte -// May be moved to nettools later func float64ToBytes(f float64) []byte { buf := make([]byte, 8) binary.BigEndian.PutUint64(buf, uint64(f)) diff --git a/common/get_txs.go b/common/get_txs.go new file mode 100644 index 0000000..e7ef3e8 --- /dev/null +++ b/common/get_txs.go @@ -0,0 +1,34 @@ +package common + +import "encoding/json" + +const () + +type GetTxOpts struct { + TxHash []byte `param:"tx_hash"` + UserPubKey []byte `param:"user_pub_key"` + GetSent bool `param:"get_sent"` + GetReceived bool `param:"get_received"` + OrderAsc bool `param:"order_asc"` + Limit int `param:"limit"` + Offset int `param:"offset"` + Before int64 `param:"before"` + After int64 `param:"after"` + ByteEncoding string `param:"byte_encoding"` +} + +type GetTxRespJson struct { + TXs []TxJson `json:"txs"` + PendingTXs []TxJson `json:"pending_txs"` + Count int `json:"count"` + + ByteEncoding string `json:"byte_encoding"` +} +type GetTxResp struct { + TXs []Tx + PendingTXs []Tx +} + +func (resp *GetTxResp) ToJson() ([]byte, error) { + return +} diff --git a/common/node_info.go b/common/node_info.go new file mode 100644 index 0000000..c3faa29 --- /dev/null +++ b/common/node_info.go @@ -0,0 +1,5 @@ +package common + +type NodeInfo struct { + NodeVersion string `json:"node_version"` +} diff --git a/common/tx.go b/common/tx.go index 296543d..35136b3 100644 --- a/common/tx.go +++ b/common/tx.go @@ -23,10 +23,6 @@ func GetTxHash(receiverPubKey []byte, message string, amount float64, blockId, t return hash.Sum(nil) } -func GetBlockId(t time.Time) int64 { - return (t.UTC().Unix() / 100) * 100 -} - type NewTxOptsJson struct { Hash string `json:"hash"` // use function GetTxHash // (CreatedAt / 100) * 100 @@ -103,7 +99,7 @@ func MakeNewTxOpts(senderPubKey, senderPrivateKey, receiverPubKey []byte, now := time.Now().UTC() createdAt := now.Unix() createdAtNs := now.UnixNano() - blockId := (createdAt / 100) * 100 + blockId := (createdAt / BlockSecsDiff) * BlockSecsDiff txHash := GetTxHash(receiverPubKey, msg, amount, blockId, createdAt, createdAtNs) sig := ed25519.Sign(senderPrivateKey, txHash) return &NewTxOpts{ @@ -121,7 +117,7 @@ func MakeNewTxOpts(senderPubKey, senderPrivateKey, receiverPubKey []byte, } func (opts *NewTxOpts) ValidateBlockId() (valid bool) { - return opts.BlockId == (opts.CreatedAt/100)*100 + return opts.BlockId == (opts.CreatedAt/BlockSecsDiff)*BlockSecsDiff } func (opts *NewTxOpts) ValidateHash() (valid bool) { return bytes.Equal(opts.Hash, GetTxHash( @@ -131,17 +127,40 @@ func (opts *NewTxOpts) ValidateHash() (valid bool) { func (opts *NewTxOpts) ValidateSignature() (valid bool) { return ed25519.Verify(opts.SenderPublicKey, opts.Hash, opts.Signature) } - -func (opts *Tx) ValidateBlockId() (valid bool) { - return opts.BlockId == (opts.CreatedAt/100)*100 +func (opts *NewTxOpts) Validate() (err error) { + switch { + case !opts.ValidateBlockId(): + return werr.Wrapf(ErrInvalidArgument, "invalid block id") + case !opts.ValidateHash(): + return werr.Wrapf(ErrInvalidArgument, "invalid hash") + case !opts.ValidateSignature(): + return werr.Wrapf(ErrInvalidArgument, "invalid signature") + } + return nil } -func (opts *Tx) ValidateHash() (valid bool) { - return bytes.Equal(opts.Hash, GetTxHash( - opts.ReceiverPublicKey, opts.Message, opts.Amount, opts.BlockId, opts.CreatedAt, opts.CreatedAtNs, + +func (tx *Tx) ValidateBlockId() (valid bool) { + return tx.BlockId == (tx.CreatedAt/BlockSecsDiff)*BlockSecsDiff +} +func (tx *Tx) ValidateHash() (valid bool) { + return bytes.Equal(tx.Hash, GetTxHash( + tx.ReceiverPublicKey, tx.Message, tx.Amount, tx.BlockId, tx.CreatedAt, tx.CreatedAtNs, )) } -func (opts *Tx) ValidateSignature() (valid bool) { - return ed25519.Verify(opts.SenderPublicKey, opts.Hash, opts.Signature) +func (tx *Tx) ValidateSignature() (valid bool) { + return ed25519.Verify(tx.SenderPublicKey, tx.Hash, tx.Signature) +} +func (tx *Tx) Validate() (err error) { + if tx.ValidateBlockId() { + return werr.Wrapf(ErrInvalidArgument, "invalid block id") + } else if tx.ValidateHash() { + return werr.Wrapf(ErrInvalidArgument, "invalid hash") + } else if tx.ValidateSignature() { + return werr.Wrapf(ErrInvalidArgument, "invalid signature") + } else if ValidateBurningBool(tx) { + return werr.Wrapf(ErrInvalidArgument, "invalid burning") + } + return nil } // HashString uses base64.RawURLEncoding encoding @@ -155,19 +174,19 @@ func (opts *NewTxOpts) SignatureString() string { } // HashString uses base64.RawURLEncoding encoding -func (opts *Tx) HashString() string { - return base64.RawURLEncoding.EncodeToString(opts.Hash) +func (tx *Tx) HashString() string { + return base64.RawURLEncoding.EncodeToString(tx.Hash) } // SignatureString uses base64.RawURLEncoding encoding -func (opts *Tx) SignatureString() string { - return base64.RawURLEncoding.EncodeToString(opts.Signature) +func (tx *Tx) SignatureString() string { + return base64.RawURLEncoding.EncodeToString(tx.Signature) } func (opts *NewTxOpts) ToJSON(byteEncodingStr string) ([]byte, error) { encoding, err := GetEncoding(byteEncodingStr) if err != nil { - return nil, werr.Wrapf(err, "error while trying to get byte->string encoding") + return nil, werr.Wrapf(err, "error while getting byte->string encoding") } data := &NewTxOptsJson{ Hash: encoding.EncodeToString(opts.Hash), @@ -184,36 +203,36 @@ func (opts *NewTxOpts) ToJSON(byteEncodingStr string) ([]byte, error) { } jsonBytes, err := json.Marshal(data) if err != nil { - return nil, werr.Wrapf(err, "error while trying to marshal to json") + return nil, werr.Wrapf(err, "error while marshaling") } return jsonBytes, nil } -func (opts *Tx) ToJSON(byteEncodingStr string) ([]byte, error) { +func (tx *Tx) ToJSON(byteEncodingStr string) ([]byte, error) { encoding, err := GetEncoding(byteEncodingStr) if err != nil { - return nil, werr.Wrapf(err, "error while trying to get byte->string encoding") + return nil, werr.Wrapf(err, "error while getting byte->string encoding") } data := &TxJson{ - Hash: encoding.EncodeToString(opts.Hash), - BlockId: opts.BlockId, - SenderPublicKey: encoding.EncodeToString(opts.SenderPublicKey), - ReceiverPublicKey: encoding.EncodeToString(opts.ReceiverPublicKey), - IsReward: opts.IsReward, - Amount: opts.Amount, - AmountBurned: opts.AmountBurned, - Message: opts.Message, - Signature: encoding.EncodeToString(opts.Signature), - CreatedAt: opts.CreatedAt, - CreatedAtNs: opts.CreatedAtNs, - AddedAt: opts.AddedAt, - IsAdded: opts.IsAdded, + Hash: encoding.EncodeToString(tx.Hash), + BlockId: tx.BlockId, + SenderPublicKey: encoding.EncodeToString(tx.SenderPublicKey), + ReceiverPublicKey: encoding.EncodeToString(tx.ReceiverPublicKey), + IsReward: tx.IsReward, + Amount: tx.Amount, + AmountBurned: tx.AmountBurned, + Message: tx.Message, + Signature: encoding.EncodeToString(tx.Signature), + CreatedAt: tx.CreatedAt, + CreatedAtNs: tx.CreatedAtNs, + AddedAt: tx.AddedAt, + IsAdded: tx.IsAdded, ByteEncoding: byteEncodingStr, } jsonBytes, err := json.Marshal(data) if err != nil { - return nil, werr.Wrapf(err, "error while trying to marshal to json") + return nil, werr.Wrapf(err, "error while marshaling") } return jsonBytes, nil } @@ -229,19 +248,19 @@ func (opts *NewTxOptsJson) Process() (*NewTxOpts, error) { } encoding, err := GetEncoding(opts.ByteEncoding) if err != nil { - return nil, werr.Wrapf(err, "error while trying to get byte->string encoding") + return nil, werr.Wrapf(err, "error while getting byte->string encoding") } if out.Hash, err = encoding.DecodeString(opts.Hash); err != nil { - return nil, werr.Wrapf(err, "error while trying to decode hash") + return nil, werr.Wrapf(err, "error while decoding hash") } if out.SenderPublicKey, err = encoding.DecodeString(opts.SenderPublicKey); err != nil { - return nil, werr.Wrapf(err, "error while trying to decode sender public key") + return nil, werr.Wrapf(err, "error while decoding sender public key") } if out.ReceiverPublicKey, err = encoding.DecodeString(opts.ReceiverPublicKey); err != nil { - return nil, werr.Wrapf(err, "error while trying to decode receiver public key") + return nil, werr.Wrapf(err, "error while decoding receiver public key") } if out.Signature, err = encoding.DecodeString(opts.Signature); err != nil { - return nil, werr.Wrapf(err, "error while trying to decode signature") + return nil, werr.Wrapf(err, "error while decoding signature") } return out, nil } @@ -279,19 +298,19 @@ func (tx *TxJson) Process() (*Tx, error) { } encoding, err := GetEncoding(tx.ByteEncoding) if err != nil { - return nil, werr.Wrapf(err, "error while trying to get byte->string encoding") + return nil, err } if out.Hash, err = encoding.DecodeString(tx.Hash); err != nil { - return nil, werr.Wrapf(err, "error while trying to decode hash") + return nil, werr.Wrapf(err, "error while decoding hash") } if out.SenderPublicKey, err = encoding.DecodeString(tx.SenderPublicKey); err != nil { - return nil, werr.Wrapf(err, "error while trying to decode sender public key") + return nil, werr.Wrapf(err, "error while decoding sender public key") } if out.ReceiverPublicKey, err = encoding.DecodeString(tx.ReceiverPublicKey); err != nil { - return nil, werr.Wrapf(err, "error while trying to decode receiver public key") + return nil, werr.Wrapf(err, "error while decoding receiver public key") } if out.Signature, err = encoding.DecodeString(tx.Signature); err != nil { - return nil, werr.Wrapf(err, "error while trying to decode signature") + return nil, werr.Wrapf(err, "error while decoding signature") } return out, nil } @@ -299,14 +318,14 @@ func (tx *TxJson) Process() (*Tx, error) { func NewTxOptsFromJSON(marshaledData []byte) (opts *NewTxOpts, err error) { data := &NewTxOptsJson{} if err := json.Unmarshal(marshaledData, data); err != nil { - return nil, werr.Wrapf(err, "error while trying to unmarshal new transaction options") + return nil, werr.Wrapf(err, "error while unmarshaling") } return data.Process() } func NewTxOptsFromJsonReader(r io.Reader) (opts *NewTxOpts, err error) { data := &NewTxOptsJson{} if err := json.NewDecoder(r).Decode(&data); err != nil { - return nil, werr.Wrapf(err, "error while trying to unmarshal new transaction options") + return nil, werr.Wrapf(err, "error while unmarshaling") } return data.Process() } @@ -314,14 +333,14 @@ func NewTxOptsFromJsonReader(r io.Reader) (opts *NewTxOpts, err error) { func TxFromJSON(marshaledData []byte) (opts *Tx, err error) { data := &TxJson{} if err := json.Unmarshal(marshaledData, data); err != nil { - return nil, werr.Wrapf(err, "error while trying to unmarshal new transaction options") + return nil, werr.Wrapf(err, "error while unmarshaling") } return data.Process() } func TxFromJsonReader(r io.Reader) (opts *Tx, err error) { data := &TxJson{} if err := json.NewDecoder(r).Decode(&data); err != nil { - return nil, werr.Wrapf(err, "error while trying to unmarshal new transaction options") + return nil, werr.Wrapf(err, "error while unmarshaling") } return data.Process() } diff --git a/infra.docker-compose.yml b/infra.docker-compose.yml new file mode 100644 index 0000000..fded2f7 --- /dev/null +++ b/infra.docker-compose.yml @@ -0,0 +1,8 @@ +services: + postgres: + image: postgres:latest + ports: + - "127.0.0.1:5432:5432" + volumes: + - ./pg_data:/var/lib/postgres + environment: diff --git a/server/api/http/api.go b/server/api/http/api.go index 860fa2c..3230fdc 100644 --- a/server/api/http/api.go +++ b/server/api/http/api.go @@ -2,15 +2,47 @@ package http import ( "context" + "encoding/json" + "mic-wallet/common" + "mic-wallet/server/core" "net/http" ) -func NewApi(ctx context.Context, baseRoute string) *http.ServeMux { +type ApiDependencies struct { + *core.BlocksCreator +} + +/* +/ -- node info +/txs -- add/get txs +/blocks -- get/blocks +/blocks/current -- get current block info +*/ + +func NewApi(ctx context.Context, d ApiDependencies, baseRoute string) *http.ServeMux { if baseRoute == "" { baseRoute = "/" } - mux := http.NewServeMux() + /* + + */ + + mux.HandleFunc(baseRoute, func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/" { + w.WriteHeader(http.StatusNotFound) + return + } + if r.Method != http.MethodGet { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + data, _ := json.Marshal(common.NodeInfo{ + NodeVersion: common.VersionNumberStr, + }) + w.Write(data) + w.WriteHeader(http.StatusOK) + }) return mux } diff --git a/server/api/http/transaction.go b/server/api/http/transaction.go index 2079e99..5454bc3 100644 --- a/server/api/http/transaction.go +++ b/server/api/http/transaction.go @@ -3,20 +3,35 @@ package http import ( "context" "encoding/json" + "errors" + "github.com/matchsystems/werr" + "log" + "mic-wallet/common" + "mic-wallet/server/core" logicErr "mic-wallet/server/logic/err" "net/http" ) -type NewTransactionReqBody struct { -} - -func NewTransaction(w http.ResponseWriter, r *http.Request) *logicErr.Err { - req := &NewTransactionReqBody{} +func NewTransaction(ctx context.Context, w http.ResponseWriter, r *http.Request, d ApiDependencies) { + defer func() { + if err := r.Body.Close(); err != nil { + log.Printf("error closing request body: %s", err.Error()) + } + }() + req := &common.NewTxOpts{} if err := json.NewDecoder(r.Body).Decode(req); err != nil { + http.Error(w, "invalid request body", http.StatusBadRequest) + return } - if err := r.Body.Close(); err != nil { + if err := d.WriteTx(req); err != nil { + if errors.Is(werr.UnwrapAll(err), core.ErrTxDuplicate) { + http.Error(w, "transaction already exists", http.StatusConflict) + return + } + w.WriteHeader(http.StatusInternalServerError) } - return nil + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) } func GetTransaction(w http.ResponseWriter, r *http.Request) *logicErr.Err { diff --git a/server/core/block.go b/server/core/block.go index d407418..14cc06f 100644 --- a/server/core/block.go +++ b/server/core/block.go @@ -2,25 +2,33 @@ package core import ( "crypto/sha512" - "fmt" + "encoding/binary" + "github.com/matchsystems/werr" "mic-wallet/common" + "mic-wallet/server/repository/entities" "sync" ) type Block struct { - ID int64 + Id int64 + PrevHash []byte + Hash []byte + TXs map[string]*common.NewTxOpts + // If there is some transactions added after block is finished + TXsLeftOver map[string]*common.NewTxOpts IgnoreSignatures map[string]struct{} - TXs map[string]*common.NewTxOpts Lock *sync.RWMutex - PrevHash []byte - Hash []byte + IsFinished bool } -func NewBlock(id int64, prevHash []byte) *Block { +func NewBlock(id int64, prevHash []byte, + prevTxsLeftOver map[string]*common.NewTxOpts, txsLeftOver map[string]*common.NewTxOpts) *Block { + return &Block{ - ID: id, + Id: id, IgnoreSignatures: make(map[string]struct{}), - TXs: make(map[string]*common.NewTxOpts), + TXs: prevTxsLeftOver, + TXsLeftOver: txsLeftOver, Lock: &sync.RWMutex{}, PrevHash: prevHash, } @@ -28,25 +36,41 @@ func NewBlock(id int64, prevHash []byte) *Block { func (b *Block) AddTx(tx *common.NewTxOpts) error { b.Lock.Lock() - hashStr := tx.HashString() sigStr := tx.SignatureString() + hashStr := tx.HashString() if _, ok := b.IgnoreSignatures[sigStr]; ok { - return fmt.Errorf("duplicate") + return werr.Wrapf(common.ErrEntityExists, + "transaction with signature %s already exists", sigStr) + } + if !b.IsFinished { + b.TXs[hashStr] = tx + } else { + b.TXsLeftOver[hashStr] = tx } - b.TXs[hashStr] = tx b.Lock.Unlock() return nil } -func (b *Block) RemoveTx(hash string) {} - -func (b *Block) CalcHash() []byte { +func (b *Block) Finish() ([]byte, error) { hash := sha512.New() + if err := binary.Write(hash, binary.BigEndian, b.Id); err != nil { + return nil, werr.Wrapf(err, "failed to write block id into hash buffer for block %d", b.Id) + } b.Lock.Lock() hash.Write(b.PrevHash) for _, tx := range b.TXs { hash.Write(tx.Hash) } + b.IsFinished = true + b.Lock.Unlock() b.Hash = hash.Sum(nil) - return b.Hash + return b.Hash, nil +} + +func (b *Block) ToRepoEntity() *entities.Block { + return &entities.Block{ + Id: b.Id, + Hash: b.Hash, + PrevHash: b.PrevHash, + } } diff --git a/server/core/blocks_commiter.go b/server/core/blocks_commiter.go index 8734037..16bca74 100644 --- a/server/core/blocks_commiter.go +++ b/server/core/blocks_commiter.go @@ -19,8 +19,8 @@ type IBlocksCommiter interface { type BlocksCommiter struct { BlocksQueue []*Block Lock *sync.RWMutex - CountLim int32 - CountBusy int32 + CountLim int + CountBusy int TaskDeadline time.Duration ErrChan chan error @@ -31,16 +31,19 @@ func NewBlocksCommiter(repo *repository.Repository, errChan chan error, workersC return &BlocksCommiter{ BlocksQueue: make([]*Block, 0), Lock: &sync.RWMutex{}, - CountLim: 0, + CountLim: workersCountLim, CountBusy: 0, TaskDeadline: 0, - ErrChan: make(chan error, workersCountLim), + ErrChan: errChan, CommitBlockFunc: func(ctx context.Context, block *Block) error { return CommitBlock(ctx, repo, block) }, } } +// execute executes commiting block with deadline and cancelable by the context +// provided when calling CommitBlock. +// all the errors from it (including context) are sent to BlocksCommiter.ErrChan func (bc *BlocksCommiter) execute(ctx context.Context, block *Block) { if bc.TaskDeadline > 0 { var cancel context.CancelFunc @@ -55,9 +58,9 @@ func (bc *BlocksCommiter) execute(ctx context.Context, block *Block) { }() select { case <-ctx.Done(): - errChan <- werr.Wrapf(ctx.Err(), "ctx error while commiting block %d", block.ID) + bc.ErrChan <- werr.Wrapf(ctx.Err(), "ctx error while commiting block %d", block.Id) case err := <-errChan: - bc.ErrChan <- werr.Wrapf(err, "error while commiting block %d", block.ID) + bc.ErrChan <- werr.Wrapf(err, "error while commiting block %d", block.Id) } } diff --git a/server/core/blocks_commiter_test.go b/server/core/blocks_commiter_test.go new file mode 100644 index 0000000..e487b89 --- /dev/null +++ b/server/core/blocks_commiter_test.go @@ -0,0 +1,7 @@ +package core + +import "testing" + +func TestBlocksCommiter_CommitBlock(t *testing.T) { + // TODO +} diff --git a/server/core/blocks_creator.go b/server/core/blocks_creator.go index 9e7fc51..8b53388 100644 --- a/server/core/blocks_creator.go +++ b/server/core/blocks_creator.go @@ -8,6 +8,9 @@ import ( "time" ) +var ErrOutOfDateBlockId = errors.New("") +var ErrBlockIdDoesntExist = errors.New("") + type IBlocksCreator interface { Run(ctx context.Context) error WriteTx(opts *common.NewTxOpts) error @@ -16,41 +19,69 @@ type IBlocksCreator interface { GetTxs() []*common.NewTxOpts } +// BlocksCreator creates and rotates new Blocks and pushed them to the commiter type BlocksCreator struct { blocks map[int64]*Block + txsLeftOver map[string]*common.NewTxOpts commiter IBlocksCommiter prevBlockHash []byte } func NewBlocksCreator(commiter IBlocksCommiter) *BlocksCreator { return &BlocksCreator{ - blocks: make(map[int64]*Block), - commiter: commiter, + blocks: make(map[int64]*Block), + txsLeftOver: make(map[string]*common.NewTxOpts), + commiter: commiter, } } -func (bc *BlocksCreator) Run(ctx context.Context) error { +func (bc *BlocksCreator) Run(ctx context.Context) (err error) { var blockID int64 + var prevLeftOver map[string]*common.NewTxOpts for { blockID = common.GetBlockId(time.Now()) - if oldBlock, ok := bc.blocks[blockID-100]; ok { // Commit and delete old block - bc.prevBlockHash = oldBlock.CalcHash() - bc.commiter.CommitBlock(ctx, oldBlock) - delete(bc.blocks, blockID-100) + if oldBlock, ok := bc.blocks[blockID-common.BlockSecsDiff]; ok { // Commit and delete old block + // we need to know previous hash for new block. + // new transactions while oldBlock.Finish is executing will + // end up in leftOver so they will be parts of the next block + if bc.prevBlockHash, err = oldBlock.Finish(); err != nil { + return nil + } + go bc.commiter.CommitBlock(ctx, oldBlock) // may take a while + delete(bc.blocks, blockID-common.BlockSecsDiff) } - bc.blocks[blockID] = NewBlock(blockID, bc.prevBlockHash) - time.Sleep(time.Until(time.Unix(blockID+100, 0))) + // prevLeftOver becomes new block's transactions + // so there is no "lost" transactions + prevLeftOver = bc.txsLeftOver + // and also now we need to make new leftOver + bc.txsLeftOver = make(map[string]*common.NewTxOpts) + bc.blocks[blockID] = NewBlock(blockID, bc.prevBlockHash, prevLeftOver, bc.txsLeftOver) + // Waiting for the next block + time.Sleep(time.Until(time.Unix(blockID+common.BlockSecsDiff, 0))) } } +// WriteTx validates transaction and writes it into the current block +// if transaction is valid func (bc *BlocksCreator) WriteTx(opts *common.NewTxOpts) error { + // validating may take some time, so to don't write into + if err := opts.Validate(); err != nil { + return werr.Wrapf(err, + "error validating transaction %s", opts.HashString()) + } currentBlockID := common.GetBlockId(time.Now()) if currentBlockID < opts.BlockId { - return errors.New("block with such id wasn't created yet. " + - "Ensure that you generate block IDs in UTC timezone") + return werr.Wrapf(common.ErrEntityNotFound, "block id %d does not exist", currentBlockID) } else if currentBlockID > opts.BlockId { - return errors.New("block id is out of date") + return werr.Wrapf(common.ErrEntityOutdated, "block id %d is out of date", currentBlockID) } - return werr.Wrapf(bc.blocks[currentBlockID].AddTx(opts), - "error adding transaction to block %d", currentBlockID) + block, ok := bc.blocks[currentBlockID] + if !ok { + return werr.Wrapf(common.ErrEntityNotFound, "block id %d does not exist", currentBlockID) + } + if err := block.AddTx(opts); err != nil { + return werr.Wrapf(err, + "error adding transaction to block %d", currentBlockID) + } + return nil } diff --git a/server/core/commit_block.go b/server/core/commit_block.go index 81ff9b1..4de811b 100644 --- a/server/core/commit_block.go +++ b/server/core/commit_block.go @@ -2,9 +2,46 @@ package core import ( "context" + "github.com/matchsystems/werr" + "mic-wallet/common" "mic-wallet/server/repository" + "mic-wallet/server/repository/entities" ) -func CommitBlock(ctx context.Context, repo *repository.Repository, block *Block) error { - return nil +func NewTxOptsToRepoEntity(opts *common.NewTxOpts) *entities.NewTransactionOpts { + return &entities.NewTransactionOpts{ + Hash: opts.Hash, + BlockId: opts.BlockId, + SenderPublicKey: opts.SenderPublicKey, + ReceiverPublicKey: opts.ReceiverPublicKey, + IsReward: opts.IsReward, + Amount: opts.Amount, + AmountBurned: common.CalcBurning(opts.Amount), + Message: opts.Message, + Signature: opts.Signature, + CreatedAt: opts.CreatedAt, + CreatedAtNs: opts.CreatedAtNs, + } +} + +func CommitBlock(ctx context.Context, repo *repository.Repository, block *Block) (err error) { + // If micw will be decentralized + // this function will be sending blocks to other nodes + // and then there will be block-fetcher to see which blocks were accepted to the network + + if err = repo.AddBLock(ctx, &entities.Block{ + Id: block.Id, + Hash: block.Hash, + PrevHash: block.PrevHash, + }); err != nil { + return werr.Wrapf(err, "failed to commit block") + } + + repoTxOpts := make([]*entities.NewTransactionOpts, len(block.TXs)) + var i int + for _, tx := range block.TXs { + repoTxOpts[i] = NewTxOptsToRepoEntity(tx) + i++ + } + return werr.Wrapf(repo.AddTransactions(ctx, repoTxOpts), "failed to commit transactions") } diff --git a/server/migrations/0.up.sql b/server/migrations/0.up.sql index 6938e94..7911b42 100644 --- a/server/migrations/0.up.sql +++ b/server/migrations/0.up.sql @@ -11,7 +11,6 @@ CREATE TABLE blocks ( hash BYTEA UNIQUE, -- NULL only on the first block prev_hash BYTEA UNIQUE, - finished_at TIMESTAMP, ) CREATE TABLE IF NOT EXISTS transactions ( diff --git a/server/repository/entities/entities.go b/server/repository/entities/entities.go index cb9b73b..2f38327 100644 --- a/server/repository/entities/entities.go +++ b/server/repository/entities/entities.go @@ -9,8 +9,7 @@ type ( Id int64 `db:"id"` Hash []byte `db:"hash"` PrevHash []byte `db:"prev_hash"` - StartedAt time.Time `db:"started_at"` - FinishedAt *time.Time `db:"finished_at"` + FinishedAt time.Time `db:"finished_at"` Transactions []Transaction `db:"transactions"` } @@ -18,23 +17,8 @@ type ( Id int64 `db:"id"` Hash []byte `db:"hash"` PrevHash []byte `db:"prev_hash"` - StartedAt time.Time `db:"started_at"` FinishedAt *time.Time `db:"finished_at"` } - - NewBlockOpts struct { - Id int64 `db:"id"` - Hash []byte `db:"hash"` // May be nil - PrevHash []byte `db:"prev_hash"` // May be nil - StartedAt time.Time `db:"started_at"` - FinishedAt *time.Time `db:"finished_at"` // May be nil - } - - BlockFinishedOpts struct { - Id int64 `db:"id"` - Hash []byte `db:"hash"` - FinishedAt time.Time `db:"finished_at"` - } ) type ( @@ -54,16 +38,16 @@ type ( } NewTransactionOpts struct { - Hash []byte `db:"hash"` - BlockId int64 `db:"block_id"` - SenderPublicKey []byte `db:"sender_public_key"` - ReceiverPublicKey []byte `db:"receiver_public_key"` - IsReward bool `db:"is_reward"` - Amount float64 `db:"amount"` - AmountBurned float64 `db:"amount_burned"` - Message string `db:"message"` - Signature []byte `db:"signature"` - CreatedAt time.Time `db:"created_at"` - CreatedAtNs int64 `db:"created_at_ns"` + Hash []byte `db:"hash"` + BlockId int64 `db:"block_id"` + SenderPublicKey []byte `db:"sender_public_key"` + ReceiverPublicKey []byte `db:"receiver_public_key"` + IsReward bool `db:"is_reward"` + Amount float64 `db:"amount"` + AmountBurned float64 `db:"amount_burned"` + Message string `db:"message"` + Signature []byte `db:"signature"` + CreatedAt int64 `db:"created_at"` + CreatedAtNs int64 `db:"created_at_ns"` } ) diff --git a/server/repository/repo.go b/server/repository/repo.go index b93cb08..8d191de 100644 --- a/server/repository/repo.go +++ b/server/repository/repo.go @@ -7,25 +7,28 @@ import ( ) type IBlocksRepository interface { - NewBLock(ctx context.Context, opts e.NewBlockOpts) (blockID int64, err error) - FinishBlock(ctx context.Context, opts e.BlockFinishedOpts) (err error) + AddBLock(ctx context.Context, opts *e.Block) (err error) + GetBlock(ctx context.Context, id int64) (block *e.Block, err error) GetBlockByHash(ctx context.Context, hash string) (block *e.Block, err error) - GetLastFinishedBlock(ctx context.Context) (block *e.Block, err error) + + GetBlockWithTxs(ctx context.Context, id int64) (block *e.BlockWithTransactions, err error) + GetBlockWithTxsByHash(ctx context.Context, hash string) (block *e.BlockWithTransactions, err error) } type ITransactionsRepository interface { - GetLastTransactionHash(ctx context.Context) ([]byte, error) - AddTransaction(ctx context.Context, opts e.NewTransactionOpts) (transaction *e.Transaction, err error) - GetTransaction(ctx context.Context, txID int64) (tx e.Transaction, err error) + AddTransaction(ctx context.Context, opts *e.NewTransactionOpts) (transaction *e.Transaction, err error) + AddTransactions(ctx context.Context, opts []*e.NewTransactionOpts) (err error) + + GetTransaction(ctx context.Context, hash string) (tx *e.Transaction, err error) GetUserIncomingTransactions( - ctx context.Context, userPubKey string, orderAsc bool, limit int, offset int) ([]e.Transaction, error) + ctx context.Context, userPubKey string, orderAsc bool, limit int, offset int) ([]*e.Transaction, error) GetUserOutcomingTransactions( - ctx context.Context, userPubKey string, orderAsc bool, limit int, offset int) ([]e.Transaction, error) + ctx context.Context, userPubKey string, orderAsc bool, limit int, offset int) ([]*e.Transaction, error) GetUserTransactions( - ctx context.Context, userPubKey string, orderAsc bool, limit int, offset int) ([]e.Transaction, error) + ctx context.Context, userPubKey string, orderAsc bool, limit int, offset int) ([]*e.Transaction, error) GetTransactions( - ctx context.Context, orderAsc bool, limit int, offset int) ([]e.Transaction, error) + ctx context.Context, orderAsc bool, limit int, offset int) ([]*e.Transaction, error) } type Repository struct { diff --git a/server/run.go b/server/run.go index 916d08d..1e5344c 100644 --- a/server/run.go +++ b/server/run.go @@ -3,34 +3,36 @@ package server import ( "context" "fmt" - commonUtils "git.mic.pp.ua/anderson/nettools/common" "net/http" ) func (s *Server) Run(ctx context.Context) error { - tasks := make([]commonUtils.Task, 0) + s.ErrChan = make(chan error) + defer close(s.ErrChan) + + tasks := make([]serverTask, 0) var mux *http.ServeMux if s.HttpListener != nil || s.HttpsListener != nil { mux = http.NewServeMux() } if s.HttpListener != nil { - tasks = append(tasks, func(ctx context.Context) error { - return http.Serve(s.HttpListener, mux) + tasks = append(tasks, func() error { + return s.RunHttp(mux) }) } if s.HttpsListener != nil { - tasks = append(tasks, func(ctx context.Context) error { - return http.ServeTLS(s.HttpsListener, mux, s.Cfg.TlsCfg.Cert, s.Cfg.TlsCfg.Key) + tasks = append(tasks, func() error { + return s.RunHttps(mux) }) } if s.GrpcListener != nil { - tasks = append(tasks, func(ctx context.Context) error { - return nil + tasks = append(tasks, func() error { + return s.RunGrpc() }) } - return commonUtils.ExecTasks(ctx, 0, tasks) + return s.execTasks(ctx, tasks) } func (s *Server) RunHttp(mux http.Handler) error { diff --git a/server/server.go b/server/server.go index 6799cd1..76e7c28 100644 --- a/server/server.go +++ b/server/server.go @@ -2,38 +2,45 @@ package server import ( "context" - commonUtils "git.mic.pp.ua/anderson/nettools/common" "github.com/jackc/pgx/v5/pgxpool" "mic-wallet/server/config" + "mic-wallet/server/core" "mic-wallet/server/repository" "net" "net/http" ) +type serverTask func() error + type Server struct { - Cfg *config.Processed - DB *pgxpool.Pool - HttpListener net.Listener - HttpsListener net.Listener - GrpcListener net.Listener - Mux *http.ServeMux - Repo *repository.Repository + Cfg *config.Processed + DB *pgxpool.Pool + Repo *repository.Repository + BlocksCommiter *core.BlocksCommiter + BlocksCreator *core.BlocksCreator + HttpListener net.Listener + HttpsListener net.Listener + GrpcListener net.Listener + Mux *http.ServeMux + ErrChan chan error } func New(cfg *config.Processed) *Server { - return &Server{Cfg: cfg} + return &Server{ + Cfg: cfg, + ErrChan: make(chan error), + } } -func (s *Server) execTasks(ctx context.Context, tasks []commonUtils.Task) error { - errChan := make(chan error) +func (s *Server) execTasks(ctx context.Context, tasks []serverTask) error { for _, task := range tasks { go func() { - errChan <- task(ctx) + s.ErrChan <- task() }() } for { select { - case err := <-errChan: + case err := <-s.ErrChan: if err != nil { return err } diff --git a/server/setup.go b/server/setup.go index 96fbbdd..cf6ba34 100644 --- a/server/setup.go +++ b/server/setup.go @@ -11,16 +11,15 @@ import ( _ "github.com/jackc/pgx/v5/stdlib" "github.com/matchsystems/werr" "log" + "mic-wallet/server/core" "mic-wallet/server/repository" "net" "net/http" + "runtime" "time" ) func (s *Server) Setup(ctx context.Context) error { - // ================================ - // Set up the transactions pipeline - // =================== // Setting up database @@ -67,7 +66,13 @@ func (s *Server) Setup(ctx context.Context) error { // Internals initialization s.Repo = repository.NewRepository(s.DB) - // TODO: + + commiterWorkersLim := runtime.NumCPU() - 1 + if commiterWorkersLim < 1 { + commiterWorkersLim = 1 + } + s.BlocksCommiter = core.NewBlocksCommiter(s.Repo, s.ErrChan, commiterWorkersLim) + s.BlocksCreator = core.NewBlocksCreator(s.BlocksCommiter) // ====================== // Creating net listeners @@ -79,7 +84,7 @@ func (s *Server) Setup(ctx context.Context) error { addr := fmt.Sprintf("%s:%d", s.Cfg.Addr, s.Cfg.HttpPort) s.HttpListener, err = net.Listen("tcp", addr) if err != nil { - return werr.Wrapf(err, "failed to listen http_api_entities on address %s", addr) + return werr.Wrapf(err, "failed to listen http on address %s", addr) } } if s.Cfg.HttpsPort > 0 {