diff --git a/cmd/node/config/external.toml b/cmd/node/config/external.toml index b5e11f73646..14b5a148da3 100644 --- a/cmd/node/config/external.toml +++ b/cmd/node/config/external.toml @@ -42,6 +42,12 @@ # marshalled structures in block events data MarshallerType = "json" +[[GRPCDriversConfig]] + Enabled = false + URL = "localhost:50051" + MarshallerType = "json" + + [[HostDriversConfig]] # This flag shall only be used for observer nodes Enabled = false diff --git a/config/externalConfig.go b/config/externalConfig.go index 9fea75b5024..07dda04728e 100644 --- a/config/externalConfig.go +++ b/config/externalConfig.go @@ -5,6 +5,7 @@ type ExternalConfig struct { ElasticSearchConnector ElasticSearchConfig EventNotifierConnector EventNotifierConfig HostDriversConfig []HostDriversConfig + GRPCDriversConfig []GRPCDriversConfig } // ElasticSearchConfig will hold the configuration for the elastic search @@ -51,3 +52,9 @@ type HostDriversConfig struct { AcknowledgeTimeoutInSec int Version uint32 } + +type GRPCDriversConfig struct { + Enabled bool + URL string + MarshallerType string +} diff --git a/dataRetriever/txpool/memorytests/memory_test.go b/dataRetriever/txpool/memorytests/memory_test.go index 727cdbdca72..936d3409372 100644 --- a/dataRetriever/txpool/memorytests/memory_test.go +++ b/dataRetriever/txpool/memorytests/memory_test.go @@ -46,7 +46,7 @@ func TestShardedTxPool_MemoryFootprint(t *testing.T) { // With larger memory footprint journals = append(journals, runScenario(t, newScenario(100000, 3, 650, "0"), memoryAssertion{290, 340}, memoryAssertion{80, 148})) - journals = append(journals, runScenario(t, newScenario(150000, 2, 650, "0"), memoryAssertion{290, 340}, memoryAssertion{90, 160})) + journals = append(journals, runScenario(t, newScenario(150000, 2, 650, "0"), memoryAssertion{290, 340}, memoryAssertion{90, 161})) journals = append(journals, runScenario(t, newScenario(300000, 1, 650, "0"), memoryAssertion{290, 340}, memoryAssertion{100, 190})) journals = append(journals, runScenario(t, newScenario(30, 10000, 650, "0"), memoryAssertion{290, 340}, memoryAssertion{60, 132})) journals = append(journals, runScenario(t, newScenario(300, 1000, 650, "0"), memoryAssertion{290, 340}, memoryAssertion{60, 148})) diff --git a/factory/status/statusComponents.go b/factory/status/statusComponents.go index 2d2d3fab527..758d4628420 100644 --- a/factory/status/statusComponents.go +++ b/factory/status/statusComponents.go @@ -222,12 +222,18 @@ func (scf *statusComponentsFactory) createOutportDriver() (outport.OutportHandle return nil, err } + grpcDriverArgs, err := scf.makeGRPCDriversArgs() + if err != nil { + return nil, err + } + outportFactoryArgs := &outportDriverFactory.OutportFactoryArgs{ ShardID: scf.shardCoordinator.SelfId(), RetrialInterval: common.RetrialIntervalForOutportDriver, ElasticIndexerFactoryArgs: scf.makeElasticIndexerArgs(), EventNotifierFactoryArgs: eventNotifierArgs, HostDriversArgs: hostDriversArgs, + GRPCDriversArgs: grpcDriverArgs, IsImportDB: scf.isInImportMode, EnableEpochsHandler: scf.coreComponents.EnableEpochsHandler(), EnableRoundsHandler: scf.coreComponents.EnableRoundsHandler(), @@ -295,3 +301,25 @@ func (scf *statusComponentsFactory) makeHostDriversArgs() ([]outportDriverFactor return argsHostDriverFactorySlice, nil } + +func (scf *statusComponentsFactory) makeGRPCDriversArgs() ([]outportDriverFactory.ArgsGRPCDriverFactory, error) { + argsHostDriverFactorySlice := make([]outportDriverFactory.ArgsGRPCDriverFactory, 0, len(scf.externalConfig.GRPCDriversConfig)) + for idx := 0; idx < len(scf.externalConfig.GRPCDriversConfig); idx++ { + grpcConfig := scf.externalConfig.GRPCDriversConfig[idx] + if !grpcConfig.Enabled { + continue + } + + marshaller, err := factoryMarshalizer.NewMarshalizer(grpcConfig.MarshallerType) + if err != nil { + return argsHostDriverFactorySlice, err + } + + argsHostDriverFactorySlice = append(argsHostDriverFactorySlice, outportDriverFactory.ArgsGRPCDriverFactory{ + Marshaller: marshaller, + GRPCClient: grpcConfig, + }) + } + + return argsHostDriverFactorySlice, nil +} diff --git a/go.mod b/go.mod index 6586dc95a90..57dcfa366c3 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/multiversx/mx-chain-go -go 1.23.0 +go 1.23 require ( github.com/beevik/ntp v1.3.0 @@ -16,8 +16,8 @@ require ( github.com/libp2p/go-libp2p v0.38.2 github.com/libp2p/go-libp2p-pubsub v0.13.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/multiversx/mx-chain-communication-go v1.3.1 - github.com/multiversx/mx-chain-core-go v1.5.0 + github.com/multiversx/mx-chain-communication-go v1.3.0 + github.com/multiversx/mx-chain-core-go v1.5.1-0.20260512082900-27993689577d github.com/multiversx/mx-chain-crypto-go v1.3.1 github.com/multiversx/mx-chain-es-indexer-go v1.10.2 github.com/multiversx/mx-chain-logger-go v1.1.0 @@ -34,9 +34,10 @@ require ( github.com/shirou/gopsutil v3.21.11+incompatible github.com/stretchr/testify v1.10.0 github.com/urfave/cli v1.22.16 - golang.org/x/crypto v0.35.0 + golang.org/x/crypto v0.33.0 golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c golang.org/x/sync v0.11.0 + google.golang.org/grpc v1.67.1 gopkg.in/go-playground/validator.v8 v8.18.2 ) @@ -170,7 +171,6 @@ require ( github.com/quic-go/webtransport-go v0.8.1-0.20241018022711-4ac2c9250e66 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect - github.com/smartystreets/assertions v1.13.1 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect github.com/tidwall/gjson v1.18.0 // indirect @@ -183,7 +183,7 @@ require ( github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect github.com/wlynxg/anet v0.0.5 // indirect - github.com/yusufpapurcu/wmi v1.2.2 // indirect + github.com/yusufpapurcu/wmi v1.2.4 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel v1.34.0 // indirect @@ -196,11 +196,12 @@ require ( go.uber.org/zap v1.27.0 // indirect golang.org/x/arch v0.8.0 // indirect golang.org/x/mod v0.22.0 // indirect - golang.org/x/net v0.34.0 // indirect + golang.org/x/net v0.35.0 // indirect golang.org/x/sys v0.30.0 // indirect golang.org/x/text v0.22.0 // indirect golang.org/x/tools v0.29.0 // indirect gonum.org/v1/gonum v0.15.1 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect google.golang.org/protobuf v1.36.4 // indirect gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index db597d35dba..c7713effa34 100644 --- a/go.sum +++ b/go.sum @@ -399,10 +399,10 @@ github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/n github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= github.com/multiversx/concurrent-map v0.1.4 h1:hdnbM8VE4b0KYJaGY5yJS2aNIW9TFFsUYwbO0993uPI= github.com/multiversx/concurrent-map v0.1.4/go.mod h1:8cWFRJDOrWHOTNSqgYCUvwT7c7eFQ4U2vKMOp4A/9+o= -github.com/multiversx/mx-chain-communication-go v1.3.1 h1:rJj4FOTqacD+yaAfz61FoEtwpAYmOQFyLEHdy1YZya4= -github.com/multiversx/mx-chain-communication-go v1.3.1/go.mod h1:gDVWn6zUW6aCN1YOm/FbbT5MUmhgn/L1Rmpl8EoH3Yg= -github.com/multiversx/mx-chain-core-go v1.5.0 h1:YBxTsxBGd4hy9A3plcILu+jDy4BcQaD8oyVRDC1tz8A= -github.com/multiversx/mx-chain-core-go v1.5.0/go.mod h1:IO+vspNan+gT0WOHnJ95uvWygiziHZvfXpff6KnxV7g= +github.com/multiversx/mx-chain-communication-go v1.3.0 h1:ziNM1dRuiR/7al2L/jGEA/a/hjurtJ/HEqgazHNt9P8= +github.com/multiversx/mx-chain-communication-go v1.3.0/go.mod h1:gDVWn6zUW6aCN1YOm/FbbT5MUmhgn/L1Rmpl8EoH3Yg= +github.com/multiversx/mx-chain-core-go v1.5.1-0.20260512082900-27993689577d h1:PajYv8TEvVWZFmZ7kRdO4gtlSDSpARW8sTqfOjZjdc8= +github.com/multiversx/mx-chain-core-go v1.5.1-0.20260512082900-27993689577d/go.mod h1:MClgyVtz/diZlfqBvHMC2RnHRHTOe59dvY7faN5HlNg= github.com/multiversx/mx-chain-crypto-go v1.3.1 h1:tCoGkfiv0wz97kuW6AZPW4RVL0Yp7PBo8NKQj9f2oh4= github.com/multiversx/mx-chain-crypto-go v1.3.1/go.mod h1:nPIkxxzyTP8IquWKds+22Q2OJ9W7LtusC7cAosz7ojM= github.com/multiversx/mx-chain-es-indexer-go v1.10.2 h1:mLFRUpZ2bWeYplU1e0kb318kk1x7AV9owq5B4XRdOqE= @@ -576,9 +576,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/shurcooL/users v0.0.0-20180125191416-49c67e49c537/go.mod h1:QJTqeLYEDaXHZDBsXlPCDqdhQuJkuw4NOtaxYe3xii4= github.com/shurcooL/webdavfs v0.0.0-20170829043945-18c3829fa133/go.mod h1:hKmq5kWdCj2z2KEozexVbfEZIWiTjhE0+UjmZgPqehw= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= -github.com/smartystreets/assertions v1.13.1 h1:Ef7KhSmjZcK6AVf9YbJdvPYG9avaF0ZxudX+ThRdWfU= -github.com/smartystreets/assertions v1.13.1/go.mod h1:cXr/IwVfSo/RbCSPhoAPv73p3hlSdrBH/b3SdnW/LMY= github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:UdhH50NIW0fCiwBSr0co2m7BnFLdv4fQTgdqdJTHFeE= @@ -643,8 +642,8 @@ github.com/xlab/treeprint v1.0.0/go.mod h1:IoImgRak9i3zJyuxOKUP1v4UZd1tMoKkq/Cim github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg= -github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= +github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= @@ -697,8 +696,8 @@ golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE golang.org/x/crypto v0.10.0/go.mod h1:o4eNf7Ede1fv+hwOwZsTHl9EsPFO6q6ZvYR8vYfY45I= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= -golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= -golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= +golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= +golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c h1:KL/ZBHXgKGVmuZBZ01Lt57yE5ws8ZPSkkihmEyq7FXc= golang.org/x/exp v0.0.0-20250128182459-e0ece0dbea4c/go.mod h1:tujkw807nyEEAamNbDrEGzRav+ilXA7PCRAd6xsmwiU= @@ -745,8 +744,8 @@ golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= -golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= -golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -871,6 +870,8 @@ google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898/go.mod h1:7Ep/1NZk google.golang.org/genproto v0.0.0-20190306203927-b5d61aea6440/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 h1:QCqS/PdaHTSWGvupk2F/ehwHtGc0/GYkT+3GAcR1CCc= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= @@ -879,6 +880,8 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/integrationTests/chainSimulator/outport/outport_test.go b/integrationTests/chainSimulator/outport/outport_test.go new file mode 100644 index 00000000000..08176f49c29 --- /dev/null +++ b/integrationTests/chainSimulator/outport/outport_test.go @@ -0,0 +1,66 @@ +package outport + +import ( + "testing" + + "github.com/multiversx/mx-chain-core-go/core" + outportcore "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-core-go/data/outport/grpcadapter" + "github.com/multiversx/mx-chain-go/config" + "github.com/multiversx/mx-chain-go/node/chainSimulator" + "github.com/multiversx/mx-chain-go/node/chainSimulator/components/api" + "github.com/multiversx/mx-chain-go/outport/mock" + "github.com/stretchr/testify/require" +) + +func TestChainSimulatorWithOutportGrpcEnabled(t *testing.T) { + count := 0 + indexer := &mock.DriverStub{ + SaveBlockCalled: func(outportBlock *outportcore.OutportBlock) error { + require.NotNil(t, outportBlock.BlockData) + count++ + return nil + }, + } + outportGRPCServer, err := grpcadapter.NewOutportGRPCServer("127.0.0.1:0", indexer) + require.Nil(t, err) + address := outportGRPCServer.Address() + require.NotEmpty(t, address) + + defer func() { + _ = outportGRPCServer.Close() + }() + go func() { + _ = outportGRPCServer.Start() + }() + + roundsPerEpochOpt := core.OptionalUint64{ + HasValue: true, + Value: 20, + } + + cs, err := chainSimulator.NewChainSimulator(chainSimulator.ArgsChainSimulator{ + BypassTxSignatureCheck: true, + BypassCreateBlockTimeCheck: true, + TempDir: t.TempDir(), + PathToInitialConfig: "../../../cmd/node/config/", + NumOfShards: 3, + RoundDurationInMillis: 6000, + SupernovaRoundDurationInMillis: 600, + RoundsPerEpoch: roundsPerEpochOpt, + SupernovaRoundsPerEpoch: roundsPerEpochOpt, + ApiInterface: api.NewNoApiInterface(), + MinNodesPerShard: 3, + MetaChainMinNodes: 3, + AlterConfigsFunction: func(cfg *config.Configs) { + cfg.ExternalConfig.GRPCDriversConfig[0].Enabled = true + cfg.ExternalConfig.GRPCDriversConfig[0].URL = address + }, + }) + require.Nil(t, err) + require.NotNil(t, cs) + + err = cs.GenerateBlocks(1) + require.Nil(t, err) + require.Equal(t, 8, count) +} diff --git a/node/chainSimulator/chainSimulator_test.go b/node/chainSimulator/chainSimulator_test.go index 45b656d9ca0..866e24c0f9c 100644 --- a/node/chainSimulator/chainSimulator_test.go +++ b/node/chainSimulator/chainSimulator_test.go @@ -61,9 +61,9 @@ func TestChainSimulatorCheckSupernova(t *testing.T) { ApiInterface: api.NewNoApiInterface(), MinNodesPerShard: 3, MetaChainMinNodes: 3, - AlterConfigsFunction: func(cfg *config.Configs) { - - }, + //AlterConfigsFunction: func(cfg *config.Configs) { + // cfg.ExternalConfig.GRPCDriversConfig[0].Enabled = true + //}, }) require.Nil(t, err) require.NotNil(t, chainSimulator) diff --git a/node/chainSimulator/components/statusComponents.go b/node/chainSimulator/components/statusComponents.go index 4b3f5bb6de1..b817d3c5bcb 100644 --- a/node/chainSimulator/components/statusComponents.go +++ b/node/chainSimulator/components/statusComponents.go @@ -58,11 +58,17 @@ func CreateStatusComponents(shardID uint32, appStatusHandler core.AppStatusHandl if err != nil { return nil, err } + grpcDriversArgs, err := makeGRPCDriversArgs(external) + if err != nil { + return nil, err + } + instance.outportHandler, err = outportFactory.CreateOutport(&outportFactory.OutportFactoryArgs{ IsImportDB: false, ShardID: shardID, RetrialInterval: time.Second, HostDriversArgs: hostDriverArgs, + GRPCDriversArgs: grpcDriversArgs, EventNotifierFactoryArgs: &outportFactory.EventNotifierFactoryArgs{}, ElasticIndexerFactoryArgs: makeElasticIndexerArgs(external, coreComponents), EnableEpochsHandler: coreComponents.EnableEpochsHandler(), @@ -111,6 +117,28 @@ func (s *statusComponentsHolder) epochStartEventHandler() epochStart.ActionHandl return subscribeHandler } +func makeGRPCDriversArgs(external config.ExternalConfig) ([]outportFactory.ArgsGRPCDriverFactory, error) { + argsGRPCDriverFactorySlice := make([]outportFactory.ArgsGRPCDriverFactory, 0, len(external.GRPCDriversConfig)) + for idx := 0; idx < len(external.GRPCDriversConfig); idx++ { + grpcConfig := external.GRPCDriversConfig[idx] + if !grpcConfig.Enabled { + continue + } + + marshaller, err := factoryMarshalizer.NewMarshalizer(grpcConfig.MarshallerType) + if err != nil { + return argsGRPCDriverFactorySlice, err + } + + argsGRPCDriverFactorySlice = append(argsGRPCDriverFactorySlice, outportFactory.ArgsGRPCDriverFactory{ + Marshaller: marshaller, + GRPCClient: grpcConfig, + }) + } + + return argsGRPCDriverFactorySlice, nil +} + func makeHostDriversArgs(external config.ExternalConfig) ([]outportFactory.ArgsHostDriverFactory, error) { argsHostDriverFactorySlice := make([]outportFactory.ArgsHostDriverFactory, 0, len(external.HostDriversConfig)) for idx := 0; idx < len(external.HostDriversConfig); idx++ { diff --git a/outport/factory/outportFactory.go b/outport/factory/outportFactory.go index 64c5671737d..3044f4285c9 100644 --- a/outport/factory/outportFactory.go +++ b/outport/factory/outportFactory.go @@ -1,13 +1,20 @@ package factory import ( + "context" "fmt" "time" outportcore "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-core-go/data/outport/grpcadapter" + "github.com/multiversx/mx-chain-core-go/marshal" indexerFactory "github.com/multiversx/mx-chain-es-indexer-go/process/factory" "github.com/multiversx/mx-chain-go/common" + "github.com/multiversx/mx-chain-go/config" "github.com/multiversx/mx-chain-go/outport" + "github.com/multiversx/mx-chain-go/outport/grpcdriver" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) // OutportFactoryArgs holds the factory arguments of different outport drivers @@ -18,6 +25,7 @@ type OutportFactoryArgs struct { ElasticIndexerFactoryArgs indexerFactory.ArgsIndexerFactory EventNotifierFactoryArgs *EventNotifierFactoryArgs HostDriversArgs []ArgsHostDriverFactory + GRPCDriversArgs []ArgsGRPCDriverFactory EnableEpochsHandler common.EnableEpochsHandler EnableRoundsHandler common.EnableRoundsHandler } @@ -70,6 +78,13 @@ func createAndSubscribeDrivers(outport outport.OutportHandler, args *OutportFact } } + for idx := 0; idx < len(args.GRPCDriversArgs); idx++ { + err = createAndSubscribeGRPCDriverIfNeeded(outport, args.GRPCDriversArgs[idx]) + if err != nil { + return fmt.Errorf("%w when calling createAndSubscribeGRPCDriverIfNeeded, grpc driver index %d", err, idx) + } + } + return nil } @@ -128,3 +143,48 @@ func createAndSubscribeHostDriverIfNeeded( return outport.SubscribeDriver(hostDriver) } + +type ArgsGRPCDriverFactory struct { + GRPCClient config.GRPCDriversConfig + Marshaller marshal.Marshalizer +} + +func createAndSubscribeGRPCDriverIfNeeded( + outport outport.OutportHandler, + args ArgsGRPCDriverFactory, +) error { + if !args.GRPCClient.Enabled { + return nil + } + + grpcClient, err := grpcadapter.NewOutportGRPCClient( + args.GRPCClient.URL, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithUnaryInterceptor(logGRPCOutportCalls), + ) + if err != nil { + return err + } + + grpcDriver, err := grpcdriver.NewGRPCDriver(grpcClient, args.Marshaller) + if err != nil { + return err + } + + return outport.SubscribeDriver(grpcDriver) +} + +func logGRPCOutportCalls( + ctx context.Context, + method string, + req any, + reply any, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, +) error { + start := time.Now() + err := invoker(ctx, method, req, reply, cc, opts...) + log.Debug("grpc call", "method", method, "duration", time.Since(start)) + return err +} diff --git a/outport/grpcdriver/grpcDriver.go b/outport/grpcdriver/grpcDriver.go new file mode 100644 index 00000000000..28d985b9e5c --- /dev/null +++ b/outport/grpcdriver/grpcDriver.go @@ -0,0 +1,119 @@ +package grpcdriver + +import ( + "context" + + outportcore "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-core-go/data/outport/grpcadapter" + "github.com/multiversx/mx-chain-core-go/marshal" + "github.com/multiversx/mx-chain-go/outport" +) + +// grpcDriver forwards outport driver calls to a remote gRPC outport service. +type grpcDriver struct { + client grpcadapter.OutportClient + marshaller marshal.Marshalizer +} + +// NewGRPCDriver creates a driver implementation backed by a gRPC outport client. +func NewGRPCDriver(client grpcadapter.OutportClient, marshaller marshal.Marshalizer) (outport.Driver, error) { + return &grpcDriver{ + client: client, + marshaller: marshaller, + }, nil +} + +// SaveBlock forwards the block payload to the remote outport service. +func (g *grpcDriver) SaveBlock(outportBlock *outportcore.OutportBlock) error { + _, err := g.client.SaveBlock(context.Background(), outportBlock) + if err != nil { + return err + } + + return nil +} + +// RevertIndexedBlock calls the remote service to rollback indexed data for a block. +func (g *grpcDriver) RevertIndexedBlock(blockData *outportcore.BlockData) error { + _, err := g.client.RevertIndexedBlock(context.Background(), blockData) + if err != nil { + return err + } + + return nil +} + +// SaveRoundsInfo synchronizes latest rounds metadata with the outport service. +func (g *grpcDriver) SaveRoundsInfo(roundsInfos *outportcore.RoundsInfo) error { + _, err := g.client.SaveRoundsInfo(context.Background(), roundsInfos) + if err != nil { + return err + } + + return nil +} + +// SaveValidatorsPubKeys sends validator keys to the remote store. +func (g *grpcDriver) SaveValidatorsPubKeys(validatorsPubKeys *outportcore.ValidatorsPubKeys) error { + _, err := g.client.SaveValidatorsPubKeys(context.Background(), validatorsPubKeys) + if err != nil { + return err + } + + return nil +} + +// SaveValidatorsRating streams the current validators rating snapshot to gRPC. +func (g *grpcDriver) SaveValidatorsRating(validatorsRating *outportcore.ValidatorsRating) error { + _, err := g.client.SaveValidatorsRating(context.Background(), validatorsRating) + if err != nil { + return err + } + + return nil +} + +// SaveAccounts writes account details via the remote controller. +func (g *grpcDriver) SaveAccounts(accounts *outportcore.Accounts) error { + _, err := g.client.SaveAccounts(context.Background(), accounts) + if err != nil { + return err + } + + return nil +} + +// FinalizedBlock publishes the finalized block event over gRPC. +func (g *grpcDriver) FinalizedBlock(finalizedBlock *outportcore.FinalizedBlock) error { + _, err := g.client.FinalizedBlockEvent(context.Background(), finalizedBlock) + if err != nil { + return err + } + + return nil +} + +// GetMarshaller returns the marshaller assigned to this driver for serialization. +func (g *grpcDriver) GetMarshaller() marshal.Marshalizer { + return g.marshaller +} + +// SetCurrentSettings does nothing +func (g *grpcDriver) SetCurrentSettings(_ outportcore.OutportConfig) error { + return nil +} + +// RegisterHandler does nothing +func (g *grpcDriver) RegisterHandler(_ func() error, _ string) error { + return nil +} + +// Close does nothing/ +func (g *grpcDriver) Close() error { + return nil +} + +// IsInterfaceNil supports the interface nil check used by the caller. +func (g *grpcDriver) IsInterfaceNil() bool { + return g == nil +} diff --git a/outport/grpcdriver/grpcDriver_test.go b/outport/grpcdriver/grpcDriver_test.go new file mode 100644 index 00000000000..00daeb9cb33 --- /dev/null +++ b/outport/grpcdriver/grpcDriver_test.go @@ -0,0 +1,187 @@ +package grpcdriver + +import ( + "context" + "errors" + "testing" + + outportcore "github.com/multiversx/mx-chain-core-go/data/outport" + "github.com/multiversx/mx-chain-core-go/data/outport/grpcadapter" + "github.com/multiversx/mx-chain-go/outport/mock" + "github.com/multiversx/mx-chain-go/testscommon" + "github.com/stretchr/testify/require" +) + +func TestNewGRPCDriver(t *testing.T) { + t.Parallel() + + marshaller := &testscommon.MarshallerStub{} + client := &mock.OutportGRPCClientStub{} + + driver, err := NewGRPCDriver(client, marshaller) + + require.NoError(t, err) + require.NotNil(t, driver) + require.Same(t, marshaller, driver.GetMarshaller()) +} + +func TestGrpcDriverDelegatesCalls(t *testing.T) { + t.Parallel() + + expectedErr := errors.New("expected error") + + tests := []struct { + name string + client func(t *testing.T) grpcadapter.OutportClient + call func(driver *grpcDriver) error + }{ + { + name: "SaveBlock", + client: func(t *testing.T) grpcadapter.OutportClient { + expected := &outportcore.OutportBlock{ShardID: 1} + return &mock.OutportGRPCClientStub{ + SaveBlockCalled: func(ctx context.Context, in *outportcore.OutportBlock) error { + require.Equal(t, expected, in) + require.NotNil(t, ctx) + return expectedErr + }, + } + }, + call: func(driver *grpcDriver) error { + return driver.SaveBlock(&outportcore.OutportBlock{ShardID: 1}) + }, + }, + { + name: "RevertIndexedBlock", + client: func(t *testing.T) grpcadapter.OutportClient { + expected := &outportcore.BlockData{ShardID: 2} + return &mock.OutportGRPCClientStub{ + RevertIndexedBlockCalled: func(ctx context.Context, in *outportcore.BlockData) error { + require.Equal(t, expected, in) + require.NotNil(t, ctx) + return expectedErr + }, + } + }, + call: func(driver *grpcDriver) error { + return driver.RevertIndexedBlock(&outportcore.BlockData{ShardID: 2}) + }, + }, + { + name: "SaveRoundsInfo", + client: func(t *testing.T) grpcadapter.OutportClient { + expected := &outportcore.RoundsInfo{} + return &mock.OutportGRPCClientStub{ + SaveRoundsInfoCalled: func(ctx context.Context, in *outportcore.RoundsInfo) error { + require.Equal(t, expected, in) + require.NotNil(t, ctx) + return expectedErr + }, + } + }, + call: func(driver *grpcDriver) error { + return driver.SaveRoundsInfo(&outportcore.RoundsInfo{}) + }, + }, + { + name: "SaveValidatorsPubKeys", + client: func(t *testing.T) grpcadapter.OutportClient { + expected := &outportcore.ValidatorsPubKeys{ShardID: 3} + return &mock.OutportGRPCClientStub{ + SaveValidatorsPubKeysCalled: func(ctx context.Context, in *outportcore.ValidatorsPubKeys) error { + require.Equal(t, expected, in) + require.NotNil(t, ctx) + return expectedErr + }, + } + }, + call: func(driver *grpcDriver) error { + return driver.SaveValidatorsPubKeys(&outportcore.ValidatorsPubKeys{ShardID: 3}) + }, + }, + { + name: "SaveValidatorsRating", + client: func(t *testing.T) grpcadapter.OutportClient { + expected := &outportcore.ValidatorsRating{ShardID: 4} + return &mock.OutportGRPCClientStub{ + SaveValidatorsRatingCalled: func(ctx context.Context, in *outportcore.ValidatorsRating) error { + require.Equal(t, expected, in) + require.NotNil(t, ctx) + return expectedErr + }, + } + }, + call: func(driver *grpcDriver) error { + return driver.SaveValidatorsRating(&outportcore.ValidatorsRating{ShardID: 4}) + }, + }, + { + name: "SaveAccounts", + client: func(t *testing.T) grpcadapter.OutportClient { + expected := &outportcore.Accounts{ShardID: 5} + return &mock.OutportGRPCClientStub{ + SaveAccountsCalled: func(ctx context.Context, in *outportcore.Accounts) error { + require.Equal(t, expected, in) + require.NotNil(t, ctx) + return expectedErr + }, + } + }, + call: func(driver *grpcDriver) error { + return driver.SaveAccounts(&outportcore.Accounts{ShardID: 5}) + }, + }, + { + name: "FinalizedBlock", + client: func(t *testing.T) grpcadapter.OutportClient { + expected := &outportcore.FinalizedBlock{ShardID: 6} + return &mock.OutportGRPCClientStub{ + FinalizedBlockEventCalled: func(ctx context.Context, in *outportcore.FinalizedBlock) error { + require.Equal(t, expected, in) + require.NotNil(t, ctx) + return expectedErr + }, + } + }, + call: func(driver *grpcDriver) error { + return driver.FinalizedBlock(&outportcore.FinalizedBlock{ShardID: 6}) + }, + }, + } + + for _, test := range tests { + tt := test + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + driver := &grpcDriver{ + client: tt.client(t), + marshaller: &testscommon.MarshallerStub{}, + } + + err := tt.call(driver) + + require.ErrorIs(t, err, expectedErr) + }) + } +} + +func TestGrpcDriverNoOpMethods(t *testing.T) { + t.Parallel() + + driver := &grpcDriver{marshaller: &testscommon.MarshallerStub{}} + + require.NoError(t, driver.SetCurrentSettings(outportcore.OutportConfig{})) + require.NoError(t, driver.RegisterHandler(nil, "")) + require.NoError(t, driver.Close()) +} + +func TestGrpcDriverIsInterfaceNil(t *testing.T) { + t.Parallel() + + var nilDriver *grpcDriver + require.True(t, nilDriver.IsInterfaceNil()) + + driver := &grpcDriver{} + require.False(t, driver.IsInterfaceNil()) +} diff --git a/outport/mock/grpcClientStub.go b/outport/mock/grpcClientStub.go new file mode 100644 index 00000000000..f45bab1cf6b --- /dev/null +++ b/outport/mock/grpcClientStub.go @@ -0,0 +1,58 @@ +package mock + +import ( + "context" + + outportcore "github.com/multiversx/mx-chain-core-go/data/outport" +) + +// OutportGRPCClientStub - +type OutportGRPCClientStub struct { + SaveBlockCalled func(ctx context.Context, in *outportcore.OutportBlock) error + RevertIndexedBlockCalled func(ctx context.Context, in *outportcore.BlockData) error + SaveRoundsInfoCalled func(ctx context.Context, in *outportcore.RoundsInfo) error + SaveValidatorsPubKeysCalled func(ctx context.Context, in *outportcore.ValidatorsPubKeys) error + SaveValidatorsRatingCalled func(ctx context.Context, in *outportcore.ValidatorsRating) error + SaveAccountsCalled func(ctx context.Context, in *outportcore.Accounts) error + FinalizedBlockEventCalled func(ctx context.Context, in *outportcore.FinalizedBlock) error +} + +// SaveBlock - +func (stub *OutportGRPCClientStub) SaveBlock(ctx context.Context, in *outportcore.OutportBlock) (*outportcore.ResponseData, error) { + return nil, stub.SaveBlockCalled(ctx, in) +} + +// RevertIndexedBlock - +func (stub *OutportGRPCClientStub) RevertIndexedBlock(ctx context.Context, in *outportcore.BlockData) (*outportcore.ResponseData, error) { + return nil, stub.RevertIndexedBlockCalled(ctx, in) +} + +// SaveRoundsInfo - +func (stub *OutportGRPCClientStub) SaveRoundsInfo(ctx context.Context, in *outportcore.RoundsInfo) (*outportcore.ResponseData, error) { + return nil, stub.SaveRoundsInfoCalled(ctx, in) +} + +// SaveValidatorsPubKeys - +func (stub *OutportGRPCClientStub) SaveValidatorsPubKeys(ctx context.Context, in *outportcore.ValidatorsPubKeys) (*outportcore.ResponseData, error) { + return nil, stub.SaveValidatorsPubKeysCalled(ctx, in) +} + +// SaveValidatorsRating - +func (stub *OutportGRPCClientStub) SaveValidatorsRating(ctx context.Context, in *outportcore.ValidatorsRating) (*outportcore.ResponseData, error) { + return nil, stub.SaveValidatorsRatingCalled(ctx, in) +} + +// SaveAccounts - +func (stub *OutportGRPCClientStub) SaveAccounts(ctx context.Context, in *outportcore.Accounts) (*outportcore.ResponseData, error) { + return nil, stub.SaveAccountsCalled(ctx, in) +} + +// FinalizedBlockEvent - +func (stub *OutportGRPCClientStub) FinalizedBlockEvent(ctx context.Context, in *outportcore.FinalizedBlock) (*outportcore.ResponseData, error) { + return nil, stub.FinalizedBlockEventCalled(ctx, in) +} + +// IsInterfaceNil - +func (stub *OutportGRPCClientStub) IsInterfaceNil() bool { + return stub == nil +}