diff --git a/config/environments/local/local.node.config.toml b/config/environments/local/local.node.config.toml index a6cefd8195..a2c1a7ac56 100644 --- a/config/environments/local/local.node.config.toml +++ b/config/environments/local/local.node.config.toml @@ -151,6 +151,24 @@ MaxGasPriceWei = 0 #GasPriceUsdt = 0.0001 #L2CoinId = 7184 +## the follower strategy config demo that can dynamic adjust the factor through get the L1 and L2 coin price from kafka +#Type = "follower" +#UpdatePeriod = "10s" +#Factor = 1 +#DefaultGasPriceWei = 1000000000 +#KafkaURL = "127.0.0.1:9092" +#Topic = "middle_coinPrice_push" +#GroupID = "web3_okbc_explorerchainprice" +## just in SASL_SSL mode +#Username = "" +#Password = "" +#RootCAPath = "only-4096-ca-cert" +#L1CoinId = 15756 +#L2CoinId = 7184 +#DefaultL2CoinPrice = 40 +#DefaultL1CoinPrice = 1600 +#EnableFollowerAdjustByL2L1Price = true # dynamic adjust the factor through the L1 and L2 coins price in follower strategy + [MTClient] URI = "x1-prover:50061" diff --git a/docs/config-file/node-config-doc.html b/docs/config-file/node-config-doc.html index 6245bcbf84..40fb3d4e6c 100644 --- a/docs/config-file/node-config-doc.html +++ b/docs/config-file/node-config-doc.html @@ -62,6 +62,6 @@
"300ms"
 

Default: "5m0s"Type: string

Examples:

"1m"
 
"300ms"
-

Default: ""Type: string

Default: ""Type: string

Default: ""Type: string

Default: ""Type: string

Default: ""Type: string

Default: ""Type: string

Default: 0Type: integer

Default: 0Type: number

DefaultL2CoinPrice is the native token's coin price


Default: 0Type: number

Default: 0.15Type: number

Configuration of the executor service
Default: "x1-prover:50071"Type: string

Default: 3Type: integer

MaxResourceExhaustedAttempts is the max number of attempts to make a transaction succeed because of resource exhaustion


Default: "1s"Type: string

WaitOnResourceExhaustion is the time to wait before retrying a transaction because of resource exhaustion


Examples:

"1m"
+

Default: ""Type: string

Default: ""Type: string

Default: ""Type: string

Default: ""Type: string

Default: ""Type: string

Default: ""Type: string

Default: 0Type: integer

Default: 0Type: integer

Default: 0Type: number

DefaultL1CoinPrice is the L1 token's coin price


Default: 0Type: number

DefaultL2CoinPrice is the native token's coin price


Default: 0Type: number

Default: falseType: boolean

EnableFollowerAdjustByL2L1Price is dynamic adjust the factor through the L1 and L2 coins price in follower strategy


Default: 0.15Type: number

Configuration of the executor service
Default: "x1-prover:50071"Type: string

Default: 3Type: integer

MaxResourceExhaustedAttempts is the max number of attempts to make a transaction succeed because of resource exhaustion


Default: "1s"Type: string

WaitOnResourceExhaustion is the time to wait before retrying a transaction because of resource exhaustion


Examples:

"1m"
 
"300ms"
 

Default: 100000000Type: integer

Configuration of the merkle tree client service. Not use in the node, only for testing
Default: "x1-prover:50061"Type: string

URI is the server URI.


Configuration of the state database connection
Default: "state_db"Type: string

Database name


Default: "state_user"Type: string

Database User name


Default: "state_password"Type: string

Database Password of the user


Default: "x1-state-db"Type: string

Host address of database


Default: "5432"Type: string

Port Number of database


Default: falseType: boolean

EnableLog


Default: 200Type: integer

MaxConns is the maximum number of connections in the pool.


Configuration of the metrics service, basically is where is going to publish the metrics
Default: "0.0.0.0"Type: string

Host is the address to bind the metrics server


Default: 9091Type: integer

Port is the port to bind the metrics server


Default: falseType: boolean

Enabled is the flag to enable/disable the metrics server


Default: ""Type: string

ProfilingHost is the address to bind the profiling server


Default: 0Type: integer

ProfilingPort is the port to bind the profiling server


Default: falseType: boolean

ProfilingEnabled is the flag to enable/disable the profiling server


Configuration of the event database connection

DB is the database configuration
Default: ""Type: string

Database name


Default: ""Type: string

Database User name


Default: ""Type: string

Database Password of the user


Default: ""Type: string

Host address of database


Default: ""Type: string

Port Number of database


Default: falseType: boolean

EnableLog


Default: 0Type: integer

MaxConns is the maximum number of connections in the pool.


Configuration of the hash database connection
Default: "prover_db"Type: string

Database name


Default: "prover_user"Type: string

Database User name


Default: "prover_pass"Type: string

Database Password of the user


Default: "x1-state-db"Type: string

Host address of database


Default: "5432"Type: string

Port Number of database


Default: falseType: boolean

EnableLog


Default: 200Type: integer

MaxConns is the maximum number of connections in the pool.


\ No newline at end of file diff --git a/docs/config-file/node-config-doc.md b/docs/config-file/node-config-doc.md index 10f73de134..fd60c75839 100644 --- a/docs/config-file/node-config-doc.md +++ b/docs/config-file/node-config-doc.md @@ -2516,28 +2516,31 @@ GenesisBlockNum=0 **Type:** : `object` **Description:** Configuration of the gas price suggester service -| Property | Pattern | Type | Deprecated | Definition | Title/Description | -| ------------------------------------------------------------------------------ | ------- | ------- | ---------- | ---------- | ---------------------------------------------------------------------------------------------------------------------------------------- | -| - [Type](#L2GasPriceSuggester_Type ) | No | string | No | - | - | -| - [DefaultGasPriceWei](#L2GasPriceSuggester_DefaultGasPriceWei ) | No | integer | No | - | DefaultGasPriceWei is used to set the gas price to be used by the default gas pricer or as minimim gas price by the follower gas pricer. | -| - [MaxGasPriceWei](#L2GasPriceSuggester_MaxGasPriceWei ) | No | integer | No | - | MaxGasPriceWei is used to limit the gas price returned by the follower gas pricer to a maximum value. It is ignored if 0. | -| - [MaxPrice](#L2GasPriceSuggester_MaxPrice ) | No | object | No | - | - | -| - [IgnorePrice](#L2GasPriceSuggester_IgnorePrice ) | No | object | No | - | - | -| - [CheckBlocks](#L2GasPriceSuggester_CheckBlocks ) | No | integer | No | - | - | -| - [Percentile](#L2GasPriceSuggester_Percentile ) | No | integer | No | - | - | -| - [UpdatePeriod](#L2GasPriceSuggester_UpdatePeriod ) | No | string | No | - | Duration | -| - [CleanHistoryPeriod](#L2GasPriceSuggester_CleanHistoryPeriod ) | No | string | No | - | Duration | -| - [CleanHistoryTimeRetention](#L2GasPriceSuggester_CleanHistoryTimeRetention ) | No | string | No | - | Duration | -| - [KafkaURL](#L2GasPriceSuggester_KafkaURL ) | No | string | No | - | - | -| - [Topic](#L2GasPriceSuggester_Topic ) | No | string | No | - | - | -| - [GroupID](#L2GasPriceSuggester_GroupID ) | No | string | No | - | - | -| - [Username](#L2GasPriceSuggester_Username ) | No | string | No | - | - | -| - [Password](#L2GasPriceSuggester_Password ) | No | string | No | - | - | -| - [RootCAPath](#L2GasPriceSuggester_RootCAPath ) | No | string | No | - | - | -| - [L2CoinId](#L2GasPriceSuggester_L2CoinId ) | No | integer | No | - | - | -| - [DefaultL2CoinPrice](#L2GasPriceSuggester_DefaultL2CoinPrice ) | No | number | No | - | DefaultL2CoinPrice is the native token's coin price | -| - [GasPriceUsdt](#L2GasPriceSuggester_GasPriceUsdt ) | No | number | No | - | - | -| - [Factor](#L2GasPriceSuggester_Factor ) | No | number | No | - | - | +| Property | Pattern | Type | Deprecated | Definition | Title/Description | +| ------------------------------------------------------------------------------------------ | ------- | ------- | ---------- | ---------- | ---------------------------------------------------------------------------------------------------------------------------------------- | +| - [Type](#L2GasPriceSuggester_Type ) | No | string | No | - | - | +| - [DefaultGasPriceWei](#L2GasPriceSuggester_DefaultGasPriceWei ) | No | integer | No | - | DefaultGasPriceWei is used to set the gas price to be used by the default gas pricer or as minimim gas price by the follower gas pricer. | +| - [MaxGasPriceWei](#L2GasPriceSuggester_MaxGasPriceWei ) | No | integer | No | - | MaxGasPriceWei is used to limit the gas price returned by the follower gas pricer to a maximum value. It is ignored if 0. | +| - [MaxPrice](#L2GasPriceSuggester_MaxPrice ) | No | object | No | - | - | +| - [IgnorePrice](#L2GasPriceSuggester_IgnorePrice ) | No | object | No | - | - | +| - [CheckBlocks](#L2GasPriceSuggester_CheckBlocks ) | No | integer | No | - | - | +| - [Percentile](#L2GasPriceSuggester_Percentile ) | No | integer | No | - | - | +| - [UpdatePeriod](#L2GasPriceSuggester_UpdatePeriod ) | No | string | No | - | Duration | +| - [CleanHistoryPeriod](#L2GasPriceSuggester_CleanHistoryPeriod ) | No | string | No | - | Duration | +| - [CleanHistoryTimeRetention](#L2GasPriceSuggester_CleanHistoryTimeRetention ) | No | string | No | - | Duration | +| - [KafkaURL](#L2GasPriceSuggester_KafkaURL ) | No | string | No | - | - | +| - [Topic](#L2GasPriceSuggester_Topic ) | No | string | No | - | - | +| - [GroupID](#L2GasPriceSuggester_GroupID ) | No | string | No | - | - | +| - [Username](#L2GasPriceSuggester_Username ) | No | string | No | - | - | +| - [Password](#L2GasPriceSuggester_Password ) | No | string | No | - | - | +| - [RootCAPath](#L2GasPriceSuggester_RootCAPath ) | No | string | No | - | - | +| - [L1CoinId](#L2GasPriceSuggester_L1CoinId ) | No | integer | No | - | - | +| - [L2CoinId](#L2GasPriceSuggester_L2CoinId ) | No | integer | No | - | - | +| - [DefaultL1CoinPrice](#L2GasPriceSuggester_DefaultL1CoinPrice ) | No | number | No | - | DefaultL1CoinPrice is the L1 token's coin price | +| - [DefaultL2CoinPrice](#L2GasPriceSuggester_DefaultL2CoinPrice ) | No | number | No | - | DefaultL2CoinPrice is the native token's coin price | +| - [GasPriceUsdt](#L2GasPriceSuggester_GasPriceUsdt ) | No | number | No | - | - | +| - [EnableFollowerAdjustByL2L1Price](#L2GasPriceSuggester_EnableFollowerAdjustByL2L1Price ) | No | boolean | No | - | EnableFollowerAdjustByL2L1Price is dynamic adjust the factor through the L1 and L2 coins price in follower strategy | +| - [Factor](#L2GasPriceSuggester_Factor ) | No | number | No | - | - | ### 14.1. `L2GasPriceSuggester.Type` @@ -2755,7 +2758,19 @@ Password="" RootCAPath="" ``` -### 14.17. `L2GasPriceSuggester.L2CoinId` +### 14.17. `L2GasPriceSuggester.L1CoinId` + +**Type:** : `integer` + +**Default:** `0` + +**Example setting the default value** (0): +``` +[L2GasPriceSuggester] +L1CoinId=0 +``` + +### 14.18. `L2GasPriceSuggester.L2CoinId` **Type:** : `integer` @@ -2767,7 +2782,21 @@ RootCAPath="" L2CoinId=0 ``` -### 14.18. `L2GasPriceSuggester.DefaultL2CoinPrice` +### 14.19. `L2GasPriceSuggester.DefaultL1CoinPrice` + +**Type:** : `number` + +**Default:** `0` + +**Description:** DefaultL1CoinPrice is the L1 token's coin price + +**Example setting the default value** (0): +``` +[L2GasPriceSuggester] +DefaultL1CoinPrice=0 +``` + +### 14.20. `L2GasPriceSuggester.DefaultL2CoinPrice` **Type:** : `number` @@ -2781,7 +2810,7 @@ L2CoinId=0 DefaultL2CoinPrice=0 ``` -### 14.19. `L2GasPriceSuggester.GasPriceUsdt` +### 14.21. `L2GasPriceSuggester.GasPriceUsdt` **Type:** : `number` @@ -2793,7 +2822,21 @@ DefaultL2CoinPrice=0 GasPriceUsdt=0 ``` -### 14.20. `L2GasPriceSuggester.Factor` +### 14.22. `L2GasPriceSuggester.EnableFollowerAdjustByL2L1Price` + +**Type:** : `boolean` + +**Default:** `false` + +**Description:** EnableFollowerAdjustByL2L1Price is dynamic adjust the factor through the L1 and L2 coins price in follower strategy + +**Example setting the default value** (false): +``` +[L2GasPriceSuggester] +EnableFollowerAdjustByL2L1Price=false +``` + +### 14.23. `L2GasPriceSuggester.Factor` **Type:** : `number` diff --git a/docs/config-file/node-config-schema.json b/docs/config-file/node-config-schema.json index 124a2bc68a..c0a45ef64c 100644 --- a/docs/config-file/node-config-schema.json +++ b/docs/config-file/node-config-schema.json @@ -1109,10 +1109,19 @@ "type": "string", "default": "" }, + "L1CoinId": { + "type": "integer", + "default": 0 + }, "L2CoinId": { "type": "integer", "default": 0 }, + "DefaultL1CoinPrice": { + "type": "number", + "description": "DefaultL1CoinPrice is the L1 token's coin price", + "default": 0 + }, "DefaultL2CoinPrice": { "type": "number", "description": "DefaultL2CoinPrice is the native token's coin price", @@ -1122,6 +1131,11 @@ "type": "number", "default": 0 }, + "EnableFollowerAdjustByL2L1Price": { + "type": "boolean", + "description": "EnableFollowerAdjustByL2L1Price is dynamic adjust the factor through the L1 and L2 coins price in follower strategy", + "default": false + }, "Factor": { "type": "number", "default": 0.15 diff --git a/gasprice/config.go b/gasprice/config.go index 13b55e8597..a414e19522 100644 --- a/gasprice/config.go +++ b/gasprice/config.go @@ -42,10 +42,16 @@ type Config struct { Username string `mapstructure:"Username"` Password string `mapstructure:"Password"` RootCAPath string `mapstructure:"RootCAPath"` + L1CoinId int `mapstructure:"L1CoinId"` L2CoinId int `mapstructure:"L2CoinId"` + // DefaultL1CoinPrice is the L1 token's coin price + DefaultL1CoinPrice float64 `mapstructure:"DefaultL1CoinPrice"` // DefaultL2CoinPrice is the native token's coin price DefaultL2CoinPrice float64 `mapstructure:"DefaultL2CoinPrice"` GasPriceUsdt float64 `mapstructure:"GasPriceUsdt"` + // EnableFollowerAdjustByL2L1Price is dynamic adjust the factor through the L1 and L2 coins price in follower strategy + EnableFollowerAdjustByL2L1Price bool `mapstructure:"EnableFollowerAdjustByL2L1Price"` + Factor float64 `mapstructure:"Factor"` } diff --git a/gasprice/fixed.go b/gasprice/fixed.go index b6e3d26f36..de1e3abdb9 100644 --- a/gasprice/fixed.go +++ b/gasprice/fixed.go @@ -12,8 +12,8 @@ import ( const ( // OKBWei OKB wei - OKBWei = 1e18 - minOKBWei = 1e-18 + OKBWei = 1e18 + minCoinPrice = 1e-18 ) // FixedGasPrice struct @@ -49,7 +49,7 @@ func (f *FixedGasPrice) UpdateGasPriceAvg() { } l2CoinPrice := f.ratePrc.GetL2CoinPrice() - if l2CoinPrice < minOKBWei { + if l2CoinPrice < minCoinPrice { log.Warn("the L2 native coin price too small...") return } diff --git a/gasprice/follower.go b/gasprice/follower.go index fa5594336a..853cd53025 100644 --- a/gasprice/follower.go +++ b/gasprice/follower.go @@ -12,10 +12,11 @@ import ( // FollowerGasPrice struct. type FollowerGasPrice struct { - cfg Config - pool poolInterface - ctx context.Context - eth ethermanInterface + cfg Config + pool poolInterface + ctx context.Context + eth ethermanInterface + kafkaPrc *KafkaProcessor } // newFollowerGasPriceSuggester inits l2 follower gas price suggester which is based on the l1 gas price. @@ -26,6 +27,9 @@ func newFollowerGasPriceSuggester(ctx context.Context, cfg Config, pool poolInte ctx: ctx, eth: ethMan, } + if cfg.EnableFollowerAdjustByL2L1Price { + gps.kafkaPrc = newKafkaProcessor(cfg, ctx) + } gps.UpdateGasPriceAvg() return gps } @@ -44,6 +48,17 @@ func (f *FollowerGasPrice) UpdateGasPriceAvg() { factor := big.NewFloat(0).SetFloat64(f.cfg.Factor) res := new(big.Float).Mul(factor, big.NewFloat(0).SetInt(l1GasPrice)) + // convert the eth gas price to okb gas price + if f.cfg.EnableFollowerAdjustByL2L1Price { + l1CoinPrice, l2CoinPrice := f.kafkaPrc.GetL1L2CoinPrice() + if l1CoinPrice < minCoinPrice || l2CoinPrice < minCoinPrice { + log.Warn("the L1 or L2 native coin price too small...") + return + } + res = new(big.Float).Mul(big.NewFloat(0).SetFloat64(l1CoinPrice/l2CoinPrice), res) + log.Debug("L2 pre gas price value: ", res.String(), ". L1 coin price: ", l1CoinPrice, ". L2 coin price: ", l2CoinPrice) + } + // Store l2 gasPrice calculated result := new(big.Int) res.Int(result) diff --git a/gasprice/kafka_proc.go b/gasprice/kafka_proc.go index 830e80a408..1331368f79 100644 --- a/gasprice/kafka_proc.go +++ b/gasprice/kafka_proc.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" + "errors" "fmt" "os" "strings" @@ -18,10 +19,16 @@ import ( const ( okbcoinId = 7184 + ethcoinId = 15756 defaultTime = 10 defaultMaxData = 10e6 // 10M ) +var ( + // ErrNotFindCoinPrice not find a correct coin price + ErrNotFindCoinPrice = errors.New("not find a correct coin price") +) + // MsgInfo msg info type MsgInfo struct { Topic string `json:"topic"` @@ -64,25 +71,43 @@ type Price struct { Id string `json:"id"` } +// L1L2PriceRecord l1 l2 coin price record +type L1L2PriceRecord struct { + l1Price float64 + l2Price float64 + l1Update bool + l2Update bool +} + // KafkaProcessor kafka processor type KafkaProcessor struct { - kreader *kafka.Reader - L2Price float64 - ctx context.Context - rwLock sync.RWMutex - l2CoinId int + cfg Config + kreader *kafka.Reader + ctx context.Context + rwLock sync.RWMutex + l1CoinId int + l2CoinId int + l1Price float64 + l2Price float64 + tmpPrices L1L2PriceRecord } func newKafkaProcessor(cfg Config, ctx context.Context) *KafkaProcessor { rp := &KafkaProcessor{ + cfg: cfg, kreader: getKafkaReader(cfg), - L2Price: cfg.DefaultL2CoinPrice, + l1Price: cfg.DefaultL1CoinPrice, + l2Price: cfg.DefaultL2CoinPrice, ctx: ctx, l2CoinId: okbcoinId, + l1CoinId: ethcoinId, } if cfg.L2CoinId != 0 { rp.l2CoinId = cfg.L2CoinId } + if cfg.L1CoinId != 0 { + rp.l1CoinId = cfg.L1CoinId + } go rp.processor() return rp @@ -130,52 +155,113 @@ func (rp *KafkaProcessor) processor() { case <-rp.ctx.Done(): return default: - value, err := rp.ReadAndCalc(rp.ctx) - if err != nil { + err := rp.ReadAndUpdate(rp.ctx) + if err != nil && err != ErrNotFindCoinPrice { log.Warn("get the destion data fail ", err) time.Sleep(time.Second * defaultTime) continue } - rp.updateL2CoinPrice(value) } } } -// ReadAndCalc read and calc -func (rp *KafkaProcessor) ReadAndCalc(ctx context.Context) (float64, error) { +// ReadAndUpdate read and update +func (rp *KafkaProcessor) ReadAndUpdate(ctx context.Context) error { m, err := rp.kreader.ReadMessage(ctx) if err != nil { - return 0, err + return err } - return rp.parseL2CoinPrice(m.Value) + return rp.Update(m.Value) +} + +// Update update the coin price +func (rp *KafkaProcessor) Update(data []byte) error { + if rp.cfg.Type == FixedType { + price, err := rp.parseCoinPrice(data, []int{rp.l2CoinId}) + if err == nil { + rp.updateL2CoinPrice(price[rp.l2CoinId]) + } + return err + } else if rp.cfg.Type == FollowerType { + prices, err := rp.parseCoinPrice(data, []int{rp.l1CoinId, rp.l2CoinId}) + if err == nil { + rp.updateL1L2CoinPrice(prices) + } + return err + } + return nil } func (rp *KafkaProcessor) updateL2CoinPrice(price float64) { rp.rwLock.Lock() defer rp.rwLock.Unlock() - rp.L2Price = price + rp.l2Price = price } // GetL2CoinPrice get L2 coin price func (rp *KafkaProcessor) GetL2CoinPrice() float64 { rp.rwLock.RLock() defer rp.rwLock.RUnlock() - return rp.L2Price + return rp.l2Price +} + +func (rp *KafkaProcessor) updateL1L2CoinPrice(prices map[int]float64) { + if len(prices) == 0 { + return + } + rp.rwLock.Lock() + defer rp.rwLock.Unlock() + if v, ok := prices[rp.l1CoinId]; ok { + rp.tmpPrices.l1Price = v + rp.tmpPrices.l1Update = true + } + if v, ok := prices[rp.l2CoinId]; ok { + rp.tmpPrices.l2Price = v + rp.tmpPrices.l2Update = true + } + if rp.tmpPrices.l1Update && rp.tmpPrices.l2Update { + rp.l1Price = rp.tmpPrices.l1Price + rp.l2Price = rp.tmpPrices.l2Price + rp.tmpPrices.l1Update = false + rp.tmpPrices.l2Update = false + return + } +} + +// GetL1L2CoinPrice get l1, L2 coin price +func (rp *KafkaProcessor) GetL1L2CoinPrice() (float64, float64) { + rp.rwLock.RLock() + defer rp.rwLock.RUnlock() + return rp.l1Price, rp.l2Price } -func (rp *KafkaProcessor) parseL2CoinPrice(value []byte) (float64, error) { +func (rp *KafkaProcessor) parseCoinPrice(value []byte, coinIds []int) (map[int]float64, error) { + if len(coinIds) == 0 { + return nil, fmt.Errorf("the params coinIds is empty") + } msgI := &MsgInfo{} err := json.Unmarshal(value, &msgI) if err != nil { - return 0, err + return nil, err } if msgI.Data == nil || len(msgI.Data.PriceList) == 0 { - return 0, fmt.Errorf("the data PriceList is empty") + return nil, fmt.Errorf("the data PriceList is empty") } + mp := make(map[int]*Price) for _, price := range msgI.Data.PriceList { - if price.CoinId == rp.l2CoinId { - return price.Price, nil + mp[price.CoinId] = price + } + + results := make(map[int]float64) + for _, coinId := range coinIds { + if coin, ok := mp[coinId]; ok { + results[coinId] = coin.Price + } else { + log.Debugf("not find a correct coin price coin id is =%v", coinId) } } - return 0, fmt.Errorf("not find a correct coin price coinId=%v", rp.l2CoinId) + if len(results) == 0 { + return results, ErrNotFindCoinPrice + } + return results, nil } diff --git a/gasprice/kafka_proc_test.go b/gasprice/kafka_proc_test.go index 5d28387a01..df797149eb 100644 --- a/gasprice/kafka_proc_test.go +++ b/gasprice/kafka_proc_test.go @@ -8,51 +8,248 @@ import ( "github.com/stretchr/testify/require" ) -func TestCalculateRate(t *testing.T) { +func TestParseCoinPrice(t *testing.T) { testcases := []struct { - l2CoinId int - msg string - check func(rate float64, err error) + coinIds []int + msg string + check func(prices map[int]float64, err error) }{ { - // error - l2CoinId: okbcoinId, - msg: "{\"topic\":\"middle_coinPrice_push\"}", - check: func(rate float64, err error) { + // param err + coinIds: []int{}, + msg: "{\"topic\":\"middle_coinPrice_push\"}", + check: func(prices map[int]float64, err error) { require.Error(t, err) }, }, { - // error - l2CoinId: okbcoinId, - msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.02}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", okbcoinId+1), - check: func(rate float64, err error) { + // param err + coinIds: []int{ethcoinId, okbcoinId}, + msg: "{\"topic\":\"middle_coinPrice_push\"}", + check: func(prices map[int]float64, err error) { require.Error(t, err) }, }, + { + // not find all, find one + coinIds: []int{ethcoinId, okbcoinId}, + msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.02}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", ethcoinId), + check: func(prices map[int]float64, err error) { + require.NoError(t, err) + require.Equal(t, prices[ethcoinId], 0.02) + }, + }, + { + // not find all + coinIds: []int{ethcoinId, okbcoinId}, + msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.02}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", okbcoinId), + check: func(prices map[int]float64, err error) { + require.NoError(t, err) + require.Equal(t, prices[okbcoinId], 0.02) + }, + }, + { + // correct + coinIds: []int{ethcoinId, okbcoinId, okbcoinId + 1}, + msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.001}, {\"coinId\":%d,\"price\":0.002}, {\"coinId\":%d,\"price\":0.003}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", ethcoinId, okbcoinId, okbcoinId+1), + check: func(prices map[int]float64, err error) { + require.NoError(t, err) + require.Equal(t, len(prices), 3) + require.Equal(t, prices[ethcoinId], 0.001) + require.Equal(t, prices[okbcoinId], 0.002) + require.Equal(t, prices[okbcoinId+1], 0.003) + }, + }, + { + // correct + coinIds: []int{ethcoinId, okbcoinId}, + msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.02}, {\"coinId\":%d,\"price\":0.002}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", ethcoinId, okbcoinId), + check: func(prices map[int]float64, err error) { + require.NoError(t, err) + require.Equal(t, len(prices), 2) + require.Equal(t, prices[ethcoinId], 0.02) + require.Equal(t, prices[okbcoinId], 0.002) + }, + }, { // correct - l2CoinId: okbcoinId, - msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.02}, {\"coinId\":%d,\"price\":0.002}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", 1, okbcoinId), - check: func(rate float64, err error) { - require.Equal(t, rate, 0.002) + coinIds: []int{ethcoinId, okbcoinId}, + msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.02}, {\"coinId\":%d,\"price\":0.002}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", okbcoinId, ethcoinId), + check: func(prices map[int]float64, err error) { require.NoError(t, err) + require.Equal(t, len(prices), 2) + require.Equal(t, prices[ethcoinId], 0.002) + require.Equal(t, prices[okbcoinId], 0.02) }, }, { // correct - l2CoinId: okbcoinId, - msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.02}, {\"coinId\":%d,\"price\":10}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", 1, okbcoinId), - check: func(rate float64, err error) { - require.Equal(t, rate, float64(10)) + coinIds: []int{okbcoinId, ethcoinId}, + msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.02}, {\"coinId\":%d,\"price\":0.002}, {\"coinId\":%d,\"price\":0.003}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", okbcoinId, ethcoinId, ethcoinId+1), + check: func(prices map[int]float64, err error) { require.NoError(t, err) + require.Equal(t, len(prices), 2) + require.Equal(t, prices[okbcoinId], 0.02) + require.Equal(t, prices[ethcoinId], 0.002) + }, + }, + { + // correct + coinIds: []int{okbcoinId}, + msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.04}, {\"coinId\":%d,\"price\":0.002}, {\"coinId\":123,\"price\":0.005}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", ethcoinId, okbcoinId), + check: func(prices map[int]float64, err error) { + require.NoError(t, err) + require.Equal(t, len(prices), 1) + require.Equal(t, prices[okbcoinId], 0.002) }, }, } for _, tc := range testcases { - rp := newKafkaProcessor(Config{Topic: "middle_coinPrice_push", L2CoinId: tc.l2CoinId}, context.Background()) - rt, err := rp.parseL2CoinPrice([]byte(tc.msg)) + rp := newKafkaProcessor(Config{Topic: "middle_coinPrice_push"}, context.Background()) + rt, err := rp.parseCoinPrice([]byte(tc.msg), tc.coinIds) tc.check(rt, err) } } + +func TestUpdateL1L2CoinPrice(t *testing.T) { + testcases := []struct { + check func() + }{ + { + check: func() { + rp := newKafkaProcessor(Config{Topic: "middle_coinPrice_push"}, context.Background()) + prices := map[int]float64{ethcoinId: 1.5, okbcoinId: 0.5} + rp.updateL1L2CoinPrice(prices) + l1, l2 := rp.GetL1L2CoinPrice() + require.Equal(t, l1, 1.5) + require.Equal(t, l2, 0.5) + }, + }, + { + check: func() { + rp := newKafkaProcessor(Config{Topic: "middle_coinPrice_push"}, context.Background()) + prices := map[int]float64{ethcoinId: 1.5} + rp.updateL1L2CoinPrice(prices) + l1, l2 := rp.GetL1L2CoinPrice() + require.Equal(t, l1, 0.0) + require.Equal(t, l2, 0.0) + require.Equal(t, rp.tmpPrices.l1Update, true) + require.Equal(t, rp.tmpPrices.l2Update, false) + + prices = map[int]float64{okbcoinId: 0.5} + rp.updateL1L2CoinPrice(prices) + l1, l2 = rp.GetL1L2CoinPrice() + require.Equal(t, l1, 1.5) + require.Equal(t, l2, 0.5) + require.Equal(t, rp.tmpPrices.l1Update, false) + require.Equal(t, rp.tmpPrices.l2Update, false) + }, + }, + { + check: func() { + rp := newKafkaProcessor(Config{Topic: "middle_coinPrice_push"}, context.Background()) + prices := map[int]float64{okbcoinId: 0.5} + rp.updateL1L2CoinPrice(prices) + l1, l2 := rp.GetL1L2CoinPrice() + require.Equal(t, l1, 0.0) + require.Equal(t, l2, 0.0) + require.Equal(t, rp.tmpPrices.l1Update, false) + require.Equal(t, rp.tmpPrices.l2Update, true) + + prices = map[int]float64{ethcoinId: 1.5} + rp.updateL1L2CoinPrice(prices) + l1, l2 = rp.GetL1L2CoinPrice() + require.Equal(t, l1, 1.5) + require.Equal(t, l2, 0.5) + require.Equal(t, rp.tmpPrices.l1Update, false) + require.Equal(t, rp.tmpPrices.l2Update, false) + }, + }, + } + for _, tc := range testcases { + tc.check() + } +} + +func TestUpdate(t *testing.T) { + testcases := []struct { + msg string + cfg Config + check func(rp *KafkaProcessor, err error) + }{ + // FixedType + { // correct + msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.04}, {\"coinId\":%d,\"price\":0.002}, {\"coinId\":123,\"price\":0.005}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", ethcoinId, okbcoinId), + cfg: Config{Topic: "middle_coinPrice_push", Type: FixedType}, + check: func(rp *KafkaProcessor, err error) { + require.NoError(t, err) + require.Equal(t, rp.GetL2CoinPrice(), 0.002) + }, + }, + { // not find + msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.04}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", ethcoinId), + cfg: Config{Topic: "middle_coinPrice_push", Type: FixedType}, + check: func(rp *KafkaProcessor, err error) { + require.Equal(t, err, ErrNotFindCoinPrice) + require.Equal(t, rp.GetL2CoinPrice(), float64(0)) + }, + }, + { // not find + msg: "{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", + cfg: Config{Topic: "middle_coinPrice_push", Type: FixedType}, + check: func(rp *KafkaProcessor, err error) { + require.EqualError(t, err, "the data PriceList is empty") + require.Equal(t, rp.GetL2CoinPrice(), float64(0)) + }, + }, + + // FollowerType + { // correct + msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.04}, {\"coinId\":%d,\"price\":0.002}, {\"coinId\":123,\"price\":0.005}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", ethcoinId, okbcoinId), + cfg: Config{Topic: "middle_coinPrice_push", Type: FollowerType}, + check: func(rp *KafkaProcessor, err error) { + require.NoError(t, err) + l1, l2 := rp.GetL1L2CoinPrice() + require.Equal(t, l1, 0.04) + require.Equal(t, l2, 0.002) + }, + }, + { // not find + msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.04}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", ethcoinId+1), + cfg: Config{Topic: "middle_coinPrice_push", Type: FollowerType}, + check: func(rp *KafkaProcessor, err error) { + require.Equal(t, err, ErrNotFindCoinPrice) + l1, l2 := rp.GetL1L2CoinPrice() + require.Equal(t, l1, float64(0)) + require.Equal(t, l2, float64(0)) + }, + }, + { // find one but not update + msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.04}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", ethcoinId), + cfg: Config{Topic: "middle_coinPrice_push", Type: FollowerType}, + check: func(rp *KafkaProcessor, err error) { + require.NoError(t, err) + l1, l2 := rp.GetL1L2CoinPrice() + require.Equal(t, l1, float64(0)) + require.Equal(t, l2, float64(0)) + }, + }, + { // find one but not update + msg: fmt.Sprintf("{\"topic\":\"middle_coinPrice_push\",\"source\":null,\"type\":null,\"data\":{\"priceList\":[{\"coinId\":%d,\"price\":0.04}],\"id\":\"98a797ce-f61b-4e90-87ac-445e77ad3599\"}}", okbcoinId), + cfg: Config{Topic: "middle_coinPrice_push", Type: FollowerType}, + check: func(rp *KafkaProcessor, err error) { + require.NoError(t, err) + l1, l2 := rp.GetL1L2CoinPrice() + require.Equal(t, l1, float64(0)) + require.Equal(t, l2, float64(0)) + }, + }, + } + + for _, tc := range testcases { + rp := newKafkaProcessor(tc.cfg, context.Background()) + err := rp.Update([]byte(tc.msg)) + tc.check(rp, err) + } +}