From 8f27426e4d55ea95d2fa83fd4a8b333463d1482a Mon Sep 17 00:00:00 2001 From: Artur Abelian Date: Fri, 17 Oct 2025 23:33:21 +0200 Subject: [PATCH 01/11] feat: added waiters into request list in ws client --- v2/common/websocket/client.go | 68 +++++++++++++++++++++--------- v2/common/websocket/client_test.go | 35 ++++++++++++++- v2/common/websocket/mock/client.go | 48 ++++++++++++++++----- v2/common/websocket/types.go | 8 ++-- 4 files changed, 124 insertions(+), 35 deletions(-) diff --git a/v2/common/websocket/client.go b/v2/common/websocket/client.go index e21f54ba6..b04571306 100644 --- a/v2/common/websocket/client.go +++ b/v2/common/websocket/client.go @@ -29,8 +29,8 @@ var ( // ErrorWsReadConnectionTimeout defines that connection read timeout expired ErrorWsReadConnectionTimeout = errors.New("ws error: read connection timeout") - // ErrorWsIdAlreadySent defines that request with the same id was already sent - ErrorWsIdAlreadySent = errors.New("ws error: request with same id already sent") + // ErrorWsIdAlreadySent defines that asyncWriteRequest with the same id was already sent + ErrorWsIdAlreadySent = errors.New("ws error: asyncWriteRequest with same id already sent") // KeepAlivePingDeadline defines deadline to send ping frame KeepAlivePingDeadline = 10 * time.Second @@ -39,7 +39,7 @@ var ( WaitCheckInternal = 300 * time.Millisecond ) -// messageId define id field of request/response +// messageId define id field of asyncWriteRequest/response type messageId struct { Id string `json:"id"` } @@ -87,8 +87,22 @@ func NewClient(conn Connection) (Client, error) { return client, nil } +type asyncWriteRequest struct { + waiter chan []byte +} + +// RequestOption define option type for asyncWriteRequest +type RequestOption func(*asyncWriteRequest) + +// WithWaiter set waiter channel param for the asyncWriteRequest +func WithWaiter(waiter chan []byte) RequestOption { + return func(r *asyncWriteRequest) { + r.waiter = waiter + } +} + type Client interface { - Write(id string, data []byte) error + Write(id string, data []byte, opts ...RequestOption) error WriteSync(id string, data []byte, timeout time.Duration) ([]byte, error) GetReadChannel() <-chan []byte GetReadErrorChannel() <-chan error @@ -98,7 +112,7 @@ type Client interface { } // Write sends data into websocket connection -func (c *client) Write(id string, data []byte) error { +func (c *client) Write(id string, data []byte, opts ...RequestOption) error { c.connMu.Lock() defer c.connMu.Unlock() @@ -106,12 +120,17 @@ func (c *client) Write(id string, data []byte) error { return ErrorWsIdAlreadySent } + req := &asyncWriteRequest{} + for _, opt := range opts { + opt(req) + } + if err := c.conn.WriteMessage(websocket.TextMessage, data); err != nil { c.debug("write: unable to write message into websocket conn '%v'", err) return err } - c.requestsList.Add(id) + c.requestsList.Add(id, req.waiter) return nil } @@ -203,10 +222,15 @@ func (c *client) read() { continue } - c.debug("read: sending message into read channel '%v'", msg) - c.readC <- message + if waiter := c.requestsList.Get(msg.Id); waiter != nil { + c.debug("read: send message into waiter channel '%v'", msg.Id) + waiter <- message + } else { + c.debug("read: sending message into read channel '%v'", msg) + c.readC <- message + } - c.debug("read: remove message from request list '%v'", msg) + c.debug("read: remove message from asyncWriteRequest list '%v'", msg) c.requestsList.Remove(msg.Id) } } @@ -278,35 +302,41 @@ func (c *client) GetReconnectCount() int64 { return atomic.LoadInt64(&c.reconnectCount) } -// NewRequestList creates request list +// NewRequestList creates asyncWriteRequest list func NewRequestList() RequestList { return RequestList{ mu: sync.Mutex{}, - requests: make(map[string]struct{}), // TODO preallocate buckets + requests: make(map[string]chan []byte), // TODO preallocate buckets } } -// RequestList state of requests that was sent/received +// RequestList state of waiters that was sent/received with or without waiter channel type RequestList struct { mu sync.Mutex - requests map[string]struct{} + requests map[string]chan []byte +} + +// Add adds asyncWriteRequest into list +func (l *RequestList) Add(id string, waiterChan chan []byte) { + l.mu.Lock() + defer l.mu.Unlock() + l.requests[id] = waiterChan } -// Add adds request into list -func (l *RequestList) Add(id string) { +func (l *RequestList) Get(id string) chan []byte { l.mu.Lock() defer l.mu.Unlock() - l.requests[id] = struct{}{} + return l.requests[id] } -// RecreateList creates new request list +// RecreateList creates new asyncWriteRequest list func (l *RequestList) RecreateList() { l.mu.Lock() defer l.mu.Unlock() - l.requests = make(map[string]struct{}) + l.requests = make(map[string]chan []byte) } -// Remove adds request from list +// Remove adds asyncWriteRequest from list func (l *RequestList) Remove(id string) { l.mu.Lock() defer l.mu.Unlock() diff --git a/v2/common/websocket/client_test.go b/v2/common/websocket/client_test.go index 62853a268..27ac1cca0 100644 --- a/v2/common/websocket/client_test.go +++ b/v2/common/websocket/client_test.go @@ -95,7 +95,7 @@ func (s *clientTestSuite) TestReadWriteSync() { requestID := id.String() req := testApiRequest{ - Id: "some-other-request-id", + Id: "some-other-asyncWriteRequest-id", Method: "some-method", Params: map[string]interface{}{}, } @@ -171,6 +171,39 @@ func (s *clientTestSuite) TestReadWriteSync() { } }, }, + { + name: "WriteAsync success with waiter channel", + testCallback: func() { + id, err := uuid.NewRandom() + s.Require().NoError(err) + requestID := id.String() + + req := testApiRequest{ + Id: requestID, + Method: "some-method-with-waiter", + Params: map[string]interface{}{}, + } + reqRaw, err := json.Marshal(req) + s.Require().NoError(err) + + waiter := make(chan []byte) + + err = client.Write(requestID, reqRaw, WithWaiter(waiter)) + s.Require().NoError(err) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + select { + case <-ctx.Done(): + s.T().Fatal("timeout waiting for write") + case responseRaw := <-waiter: + s.Require().Equal(reqRaw, responseRaw) + case err := <-client.GetReadErrorChannel(): + s.T().Fatalf("unexpected error: '%v'", err) + } + }, + }, } for _, tt := range tests { diff --git a/v2/common/websocket/mock/client.go b/v2/common/websocket/mock/client.go index 688a0fedf..5001fa5e8 100644 --- a/v2/common/websocket/mock/client.go +++ b/v2/common/websocket/mock/client.go @@ -35,6 +35,20 @@ func (m *MockClient) EXPECT() *MockClientMockRecorder { return m.recorder } +// Close mocks base method. +func (m *MockClient) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockClientMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockClient)(nil).Close)) +} + // GetReadChannel mocks base method. func (m *MockClient) GetReadChannel() <-chan []byte { m.ctrl.T.Helper() @@ -90,17 +104,22 @@ func (mr *MockClientMockRecorder) Wait(timeout interface{}) *gomock.Call { } // Write mocks base method. -func (m *MockClient) Write(id string, data []byte) error { +func (m *MockClient) Write(id string, data []byte, opts ...websocket.RequestOption) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Write", id, data) + varargs := []interface{}{id, data} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Write", varargs...) ret0, _ := ret[0].(error) return ret0 } // Write indicates an expected call of Write. -func (mr *MockClientMockRecorder) Write(id, data interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) Write(id, data interface{}, opts ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockClient)(nil).Write), id, data) + varargs := append([]interface{}{id, data}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockClient)(nil).Write), varargs...) } // WriteSync mocks base method. @@ -112,13 +131,6 @@ func (m *MockClient) WriteSync(id string, data []byte, timeout time.Duration) ([ return ret0, ret1 } -func (m *MockClient) Close() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Close") - ret0, _ := ret[0].(error) - return ret0 -} - // WriteSync indicates an expected call of WriteSync. func (mr *MockClientMockRecorder) WriteSync(id, data, timeout interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() @@ -148,6 +160,20 @@ func (m *MockConnection) EXPECT() *MockConnectionMockRecorder { return m.recorder } +// Close mocks base method. +func (m *MockConnection) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockConnectionMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockConnection)(nil).Close)) +} + // ReadMessage mocks base method. func (m *MockConnection) ReadMessage() (int, []byte, error) { m.ctrl.T.Helper() diff --git a/v2/common/websocket/types.go b/v2/common/websocket/types.go index d8efcc722..a4c0dd847 100644 --- a/v2/common/websocket/types.go +++ b/v2/common/websocket/types.go @@ -13,7 +13,7 @@ import ( // WsApiMethodType define method name for websocket API type WsApiMethodType string -// WsApiRequest define common websocket API request +// WsApiRequest define common websocket API asyncWriteRequest type WsApiRequest struct { Id string `json:"id"` Method WsApiMethodType `json:"method"` @@ -77,8 +77,8 @@ const ( ) var ( - // ErrorRequestIDNotSet defines that request ID is not set - ErrorRequestIDNotSet = errors.New("ws service: request id is not set") + // ErrorRequestIDNotSet defines that asyncWriteRequest ID is not set + ErrorRequestIDNotSet = errors.New("ws service: asyncWriteRequest id is not set") // ErrorApiKeyIsNotSet defines that ApiKey is not set ErrorApiKeyIsNotSet = errors.New("ws service: api key is not set") @@ -111,7 +111,7 @@ type RequestData struct { keyType string } -// CreateRequest creates signed ws request +// CreateRequest creates signed ws asyncWriteRequest func CreateRequest(reqData RequestData, method WsApiMethodType, params map[string]interface{}) ([]byte, error) { if reqData.requestID == "" { return nil, ErrorRequestIDNotSet From 8fa8b47b5fdd83fa51cda69e2f27ad8deeb869d8 Mon Sep 17 00:00:00 2001 From: Artur Abelian Date: Sat, 18 Oct 2025 00:17:15 +0200 Subject: [PATCH 02/11] feat: added optional created client into ws services constructors and waiters channel into write methods --- v2/common/websocket/ws_service_options.go | 15 +++++++++++ v2/futures/order_cancel_service_ws.go | 32 +++++++++++++++-------- v2/futures/order_place_service_ws.go | 32 +++++++++++++++-------- v2/futures/order_status_service_ws.go | 31 +++++++++++++++------- v2/order_list_cancel_service_ws.go | 31 +++++++++++++++------- v2/order_list_place_oto_service_ws.go | 31 +++++++++++++++------- v2/order_list_place_otoco_service_ws.go | 31 +++++++++++++++------- v2/order_list_place_service_ws.go | 31 +++++++++++++++------- v2/order_list_service_ws_create.go | 31 +++++++++++++++------- v2/order_service_ws_create.go | 31 +++++++++++++++------- v2/sor_order_place_service_ws.go | 31 +++++++++++++++------- v2/sor_order_test_service_ws.go | 31 +++++++++++++++------- 12 files changed, 246 insertions(+), 112 deletions(-) create mode 100644 v2/common/websocket/ws_service_options.go diff --git a/v2/common/websocket/ws_service_options.go b/v2/common/websocket/ws_service_options.go new file mode 100644 index 000000000..984bb1f65 --- /dev/null +++ b/v2/common/websocket/ws_service_options.go @@ -0,0 +1,15 @@ +package websocket + +// WebSocketServiceOption represents a functional option for WebSocket services +type WebSocketServiceOption func(serviceOpt *WebSocketServiceCreateOption) + +type WebSocketServiceCreateOption struct { + Client Client +} + +// WithWebSocketClient creates an option to set the websocket Client for any WebSocket service +func WithWebSocketClient(client Client) WebSocketServiceOption { + return func(opt *WebSocketServiceCreateOption) { + opt.Client = client + } +} diff --git a/v2/futures/order_cancel_service_ws.go b/v2/futures/order_cancel_service_ws.go index f47bfae88..e61a31a87 100644 --- a/v2/futures/order_cancel_service_ws.go +++ b/v2/futures/order_cancel_service_ws.go @@ -74,7 +74,6 @@ type OrderCancelWsResponse struct { Error *common.APIError `json:"error,omitempty"` } -// OrderCancelWsService cancel order type OrderCancelWsService struct { c websocket.Client ApiKey string @@ -84,27 +83,38 @@ type OrderCancelWsService struct { } // NewOrderCancelWsService init OrderCancelWsService -func NewOrderCancelWsService(apiKey, secretKey string) (*OrderCancelWsService, error) { +func NewOrderCancelWsService(apiKey, secretKey string, opts ...websocket.WebSocketServiceOption) (*OrderCancelWsService, error) { + service := &OrderCancelWsService{ + ApiKey: apiKey, + SecretKey: secretKey, + KeyType: common.KeyTypeHmac, + } + + createOpts := &websocket.WebSocketServiceCreateOption{} + for _, opt := range opts { + opt(createOpts) + } + + if createOpts.Client != nil { + service.c = createOpts.Client + return service, nil + } + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) if err != nil { return nil, err } - client, err := websocket.NewClient(conn) if err != nil { return nil, err } + service.c = client - return &OrderCancelWsService{ - c: client, - ApiKey: apiKey, - SecretKey: secretKey, - KeyType: common.KeyTypeHmac, - }, nil + return service, nil } // Do - sends 'order.cancel' request -func (s *OrderCancelWsService) Do(requestID string, request *OrderCancelRequest) error { +func (s *OrderCancelWsService) Do(requestID string, request *OrderCancelRequest, opts ...websocket.RequestOption) error { rawData, err := websocket.CreateRequest( websocket.NewRequestData( requestID, @@ -120,7 +130,7 @@ func (s *OrderCancelWsService) Do(requestID string, request *OrderCancelRequest) return err } - if err := s.c.Write(requestID, rawData); err != nil { + if err := s.c.Write(requestID, rawData, opts...); err != nil { return err } diff --git a/v2/futures/order_place_service_ws.go b/v2/futures/order_place_service_ws.go index 1155cc039..deef85800 100644 --- a/v2/futures/order_place_service_ws.go +++ b/v2/futures/order_place_service_ws.go @@ -8,7 +8,6 @@ import ( "github.com/adshao/go-binance/v2/common/websocket" ) -// OrderPlaceWsService creates order type OrderPlaceWsService struct { c websocket.Client ApiKey string @@ -18,23 +17,34 @@ type OrderPlaceWsService struct { } // NewOrderPlaceWsService init OrderPlaceWsService -func NewOrderPlaceWsService(apiKey, secretKey string) (*OrderPlaceWsService, error) { +func NewOrderPlaceWsService(apiKey, secretKey string, opts ...websocket.WebSocketServiceOption) (*OrderPlaceWsService, error) { + service := &OrderPlaceWsService{ + ApiKey: apiKey, + SecretKey: secretKey, + KeyType: common.KeyTypeHmac, + } + + createOpts := &websocket.WebSocketServiceCreateOption{} + for _, opt := range opts { + opt(createOpts) + } + + if createOpts.Client != nil { + service.c = createOpts.Client + return service, nil + } + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) if err != nil { return nil, err } - client, err := websocket.NewClient(conn) if err != nil { return nil, err } + service.c = client - return &OrderPlaceWsService{ - c: client, - ApiKey: apiKey, - SecretKey: secretKey, - KeyType: common.KeyTypeHmac, - }, nil + return service, nil } // OrderPlaceWsRequest parameters for 'order.place' websocket API @@ -238,7 +248,7 @@ func (s *OrderPlaceWsRequest) buildParams() params { } // Do - sends 'order.place' request -func (s *OrderPlaceWsService) Do(requestID string, request *OrderPlaceWsRequest) error { +func (s *OrderPlaceWsService) Do(requestID string, request *OrderPlaceWsRequest, opts ...websocket.RequestOption) error { rawData, err := websocket.CreateRequest( websocket.NewRequestData( requestID, @@ -254,7 +264,7 @@ func (s *OrderPlaceWsService) Do(requestID string, request *OrderPlaceWsRequest) return err } - if err := s.c.Write(requestID, rawData); err != nil { + if err := s.c.Write(requestID, rawData, opts...); err != nil { return err } diff --git a/v2/futures/order_status_service_ws.go b/v2/futures/order_status_service_ws.go index 3c98aae62..28fde6206 100644 --- a/v2/futures/order_status_service_ws.go +++ b/v2/futures/order_status_service_ws.go @@ -18,23 +18,34 @@ type OrderStatusWsService struct { } // NewOrderStatusWsService init OrderStatusWsService -func NewOrderStatusWsService(apiKey, secretKey string) (*OrderStatusWsService, error) { +func NewOrderStatusWsService(apiKey, secretKey string, opts ...websocket.WebSocketServiceOption) (*OrderStatusWsService, error) { + service := &OrderStatusWsService{ + ApiKey: apiKey, + SecretKey: secretKey, + KeyType: common.KeyTypeHmac, + } + + createOpts := &websocket.WebSocketServiceCreateOption{} + for _, opt := range opts { + opt(createOpts) + } + + if createOpts.Client != nil { + service.c = createOpts.Client + return service, nil + } + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) if err != nil { return nil, err } - client, err := websocket.NewClient(conn) if err != nil { return nil, err } + service.c = client - return &OrderStatusWsService{ - c: client, - ApiKey: apiKey, - SecretKey: secretKey, - KeyType: common.KeyTypeHmac, - }, nil + return service, nil } // OrderStatusWsRequest parameters for 'order.status' websocket API @@ -128,7 +139,7 @@ func (s *OrderStatusWsRequest) buildParams() params { } // Do - sends 'order.status' request -func (s *OrderStatusWsService) Do(requestID string, request *OrderStatusWsRequest) error { +func (s *OrderStatusWsService) Do(requestID string, request *OrderStatusWsRequest, opts ...websocket.RequestOption) error { rawData, err := websocket.CreateRequest( websocket.NewRequestData( requestID, @@ -144,7 +155,7 @@ func (s *OrderStatusWsService) Do(requestID string, request *OrderStatusWsReques return err } - if err := s.c.Write(requestID, rawData); err != nil { + if err := s.c.Write(requestID, rawData, opts...); err != nil { return err } diff --git a/v2/order_list_cancel_service_ws.go b/v2/order_list_cancel_service_ws.go index 44cab9a0d..5faa40fde 100644 --- a/v2/order_list_cancel_service_ws.go +++ b/v2/order_list_cancel_service_ws.go @@ -18,23 +18,34 @@ type OrderListCancelWsService struct { } // NewOrderListCancelWsService init OrderListCancelWsService -func NewOrderListCancelWsService(apiKey, secretKey string) (*OrderListCancelWsService, error) { +func NewOrderListCancelWsService(apiKey, secretKey string, opts ...websocket.WebSocketServiceOption) (*OrderListCancelWsService, error) { + service := &OrderListCancelWsService{ + ApiKey: apiKey, + SecretKey: secretKey, + KeyType: common.KeyTypeHmac, + } + + createOpts := &websocket.WebSocketServiceCreateOption{} + for _, opt := range opts { + opt(createOpts) + } + + if createOpts.Client != nil { + service.c = createOpts.Client + return service, nil + } + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) if err != nil { return nil, err } - client, err := websocket.NewClient(conn) if err != nil { return nil, err } + service.c = client - return &OrderListCancelWsService{ - c: client, - ApiKey: apiKey, - SecretKey: secretKey, - KeyType: common.KeyTypeHmac, - }, nil + return service, nil } // OrderListCancelWsRequest parameters for 'orderList.cancel' websocket API @@ -76,7 +87,7 @@ func (s *OrderListCancelWsRequest) buildParams() params { } // Do - sends 'orderList.cancel' request -func (s *OrderListCancelWsService) Do(requestID string, request *OrderListCancelWsRequest) error { +func (s *OrderListCancelWsService) Do(requestID string, request *OrderListCancelWsRequest, opts ...websocket.RequestOption) error { rawData, err := websocket.CreateRequest( websocket.NewRequestData( requestID, @@ -92,7 +103,7 @@ func (s *OrderListCancelWsService) Do(requestID string, request *OrderListCancel return err } - if err := s.c.Write(requestID, rawData); err != nil { + if err := s.c.Write(requestID, rawData, opts...); err != nil { return err } diff --git a/v2/order_list_place_oto_service_ws.go b/v2/order_list_place_oto_service_ws.go index 00edc12ba..2062128f0 100644 --- a/v2/order_list_place_oto_service_ws.go +++ b/v2/order_list_place_oto_service_ws.go @@ -18,23 +18,34 @@ type OrderListPlaceOtoWsService struct { } // NewOrderListPlaceOtoWsService init OrderListPlaceOtoWsService -func NewOrderListPlaceOtoWsService(apiKey, secretKey string) (*OrderListPlaceOtoWsService, error) { +func NewOrderListPlaceOtoWsService(apiKey, secretKey string, opts ...websocket.WebSocketServiceOption) (*OrderListPlaceOtoWsService, error) { + service := &OrderListPlaceOtoWsService{ + ApiKey: apiKey, + SecretKey: secretKey, + KeyType: common.KeyTypeHmac, + } + + createOpts := &websocket.WebSocketServiceCreateOption{} + for _, opt := range opts { + opt(createOpts) + } + + if createOpts.Client != nil { + service.c = createOpts.Client + return service, nil + } + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) if err != nil { return nil, err } - client, err := websocket.NewClient(conn) if err != nil { return nil, err } + service.c = client - return &OrderListPlaceOtoWsService{ - c: client, - ApiKey: apiKey, - SecretKey: secretKey, - KeyType: common.KeyTypeHmac, - }, nil + return service, nil } // OrderListPlaceOtoWsRequest parameters for 'orderList.place.oto' websocket API @@ -148,7 +159,7 @@ func (s *OrderListPlaceOtoWsRequest) buildParams() params { } // Do - sends 'orderList.place.oto' request -func (s *OrderListPlaceOtoWsService) Do(requestID string, request *OrderListPlaceOtoWsRequest) error { +func (s *OrderListPlaceOtoWsService) Do(requestID string, request *OrderListPlaceOtoWsRequest, opts ...websocket.RequestOption) error { rawData, err := websocket.CreateRequest( websocket.NewRequestData( requestID, @@ -164,7 +175,7 @@ func (s *OrderListPlaceOtoWsService) Do(requestID string, request *OrderListPlac return err } - if err := s.c.Write(requestID, rawData); err != nil { + if err := s.c.Write(requestID, rawData, opts...); err != nil { return err } diff --git a/v2/order_list_place_otoco_service_ws.go b/v2/order_list_place_otoco_service_ws.go index dbe4a52da..ca252f8a3 100644 --- a/v2/order_list_place_otoco_service_ws.go +++ b/v2/order_list_place_otoco_service_ws.go @@ -18,23 +18,34 @@ type OrderListPlaceOtocoWsService struct { } // NewOrderListPlaceOtocoWsService init OrderListPlaceOtocoWsService -func NewOrderListPlaceOtocoWsService(apiKey, secretKey string) (*OrderListPlaceOtocoWsService, error) { +func NewOrderListPlaceOtocoWsService(apiKey, secretKey string, opts ...websocket.WebSocketServiceOption) (*OrderListPlaceOtocoWsService, error) { + service := &OrderListPlaceOtocoWsService{ + ApiKey: apiKey, + SecretKey: secretKey, + KeyType: common.KeyTypeHmac, + } + + createOpts := &websocket.WebSocketServiceCreateOption{} + for _, opt := range opts { + opt(createOpts) + } + + if createOpts.Client != nil { + service.c = createOpts.Client + return service, nil + } + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) if err != nil { return nil, err } - client, err := websocket.NewClient(conn) if err != nil { return nil, err } + service.c = client - return &OrderListPlaceOtocoWsService{ - c: client, - ApiKey: apiKey, - SecretKey: secretKey, - KeyType: common.KeyTypeHmac, - }, nil + return service, nil } // OrderListPlaceOtocoWsRequest parameters for 'orderList.place.otoco' websocket API @@ -186,7 +197,7 @@ func (s *OrderListPlaceOtocoWsRequest) buildParams() params { } // Do - sends 'orderList.place.otoco' request -func (s *OrderListPlaceOtocoWsService) Do(requestID string, request *OrderListPlaceOtocoWsRequest) error { +func (s *OrderListPlaceOtocoWsService) Do(requestID string, request *OrderListPlaceOtocoWsRequest, opts ...websocket.RequestOption) error { rawData, err := websocket.CreateRequest( websocket.NewRequestData( requestID, @@ -202,7 +213,7 @@ func (s *OrderListPlaceOtocoWsService) Do(requestID string, request *OrderListPl return err } - if err := s.c.Write(requestID, rawData); err != nil { + if err := s.c.Write(requestID, rawData, opts...); err != nil { return err } diff --git a/v2/order_list_place_service_ws.go b/v2/order_list_place_service_ws.go index 9ae3a0d8f..a89960789 100644 --- a/v2/order_list_place_service_ws.go +++ b/v2/order_list_place_service_ws.go @@ -18,23 +18,34 @@ type OrderListPlaceWsService struct { } // NewOrderListPlaceWsService init OrderListPlaceWsService -func NewOrderListPlaceWsService(apiKey, secretKey string) (*OrderListPlaceWsService, error) { +func NewOrderListPlaceWsService(apiKey, secretKey string, opts ...websocket.WebSocketServiceOption) (*OrderListPlaceWsService, error) { + service := &OrderListPlaceWsService{ + ApiKey: apiKey, + SecretKey: secretKey, + KeyType: common.KeyTypeHmac, + } + + createOpts := &websocket.WebSocketServiceCreateOption{} + for _, opt := range opts { + opt(createOpts) + } + + if createOpts.Client != nil { + service.c = createOpts.Client + return service, nil + } + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) if err != nil { return nil, err } - client, err := websocket.NewClient(conn) if err != nil { return nil, err } + service.c = client - return &OrderListPlaceWsService{ - c: client, - ApiKey: apiKey, - SecretKey: secretKey, - KeyType: common.KeyTypeHmac, - }, nil + return service, nil } // OrderListPlaceWsRequest parameters for 'orderList.place' websocket API (deprecated OCO) @@ -136,7 +147,7 @@ func (s *OrderListPlaceWsRequest) buildParams() params { } // Do - sends 'orderList.place' request -func (s *OrderListPlaceWsService) Do(requestID string, request *OrderListPlaceWsRequest) error { +func (s *OrderListPlaceWsService) Do(requestID string, request *OrderListPlaceWsRequest, opts ...websocket.RequestOption) error { rawData, err := websocket.CreateRequest( websocket.NewRequestData( requestID, @@ -152,7 +163,7 @@ func (s *OrderListPlaceWsService) Do(requestID string, request *OrderListPlaceWs return err } - if err := s.c.Write(requestID, rawData); err != nil { + if err := s.c.Write(requestID, rawData, opts...); err != nil { return err } diff --git a/v2/order_list_service_ws_create.go b/v2/order_list_service_ws_create.go index 58d34c8da..217b60755 100644 --- a/v2/order_list_service_ws_create.go +++ b/v2/order_list_service_ws_create.go @@ -18,23 +18,34 @@ type OrderListCreateWsService struct { } // NewOrderListCreateWsService init OrderListCreateWsService -func NewOrderListCreateWsService(apiKey, secretKey string) (*OrderListCreateWsService, error) { +func NewOrderListCreateWsService(apiKey, secretKey string, opts ...websocket.WebSocketServiceOption) (*OrderListCreateWsService, error) { + service := &OrderListCreateWsService{ + ApiKey: apiKey, + SecretKey: secretKey, + KeyType: common.KeyTypeHmac, + } + + createOpts := &websocket.WebSocketServiceCreateOption{} + for _, opt := range opts { + opt(createOpts) + } + + if createOpts.Client != nil { + service.c = createOpts.Client + return service, nil + } + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) if err != nil { return nil, err } - client, err := websocket.NewClient(conn) if err != nil { return nil, err } + service.c = client - return &OrderListCreateWsService{ - c: client, - ApiKey: apiKey, - SecretKey: secretKey, - KeyType: common.KeyTypeHmac, - }, nil + return service, nil } // OrderListCreateWsRequest parameters for 'orderList.place.oco' websocket API @@ -152,7 +163,7 @@ func (s *OrderListCreateWsRequest) buildParams() params { } // Do - sends 'orderList.place.oco' request -func (s *OrderListCreateWsService) Do(requestID string, request *OrderListCreateWsRequest) error { +func (s *OrderListCreateWsService) Do(requestID string, request *OrderListCreateWsRequest, opts ...websocket.RequestOption) error { rawData, err := websocket.CreateRequest( websocket.NewRequestData( requestID, @@ -168,7 +179,7 @@ func (s *OrderListCreateWsService) Do(requestID string, request *OrderListCreate return err } - if err := s.c.Write(requestID, rawData); err != nil { + if err := s.c.Write(requestID, rawData, opts...); err != nil { return err } diff --git a/v2/order_service_ws_create.go b/v2/order_service_ws_create.go index e62c220c0..d91144134 100644 --- a/v2/order_service_ws_create.go +++ b/v2/order_service_ws_create.go @@ -18,23 +18,34 @@ type OrderCreateWsService struct { } // NewOrderCreateWsService init OrderCreateWsService -func NewOrderCreateWsService(apiKey, secretKey string) (*OrderCreateWsService, error) { +func NewOrderCreateWsService(apiKey, secretKey string, opts ...websocket.WebSocketServiceOption) (*OrderCreateWsService, error) { + service := &OrderCreateWsService{ + ApiKey: apiKey, + SecretKey: secretKey, + KeyType: common.KeyTypeHmac, + } + + createOpts := &websocket.WebSocketServiceCreateOption{} + for _, opt := range opts { + opt(createOpts) + } + + if createOpts.Client != nil { + service.c = createOpts.Client + return service, nil + } + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) if err != nil { return nil, err } - client, err := websocket.NewClient(conn) if err != nil { return nil, err } + service.c = client - return &OrderCreateWsService{ - c: client, - ApiKey: apiKey, - SecretKey: secretKey, - KeyType: common.KeyTypeHmac, - }, nil + return service, nil } // OrderCreateWsRequest parameters for 'order.place' websocket API @@ -112,7 +123,7 @@ func (s *OrderCreateWsRequest) buildParams() params { } // Do - sends 'order.place' request -func (s *OrderCreateWsService) Do(requestID string, request *OrderCreateWsRequest) error { +func (s *OrderCreateWsService) Do(requestID string, request *OrderCreateWsRequest, opts ...websocket.RequestOption) error { rawData, err := websocket.CreateRequest( websocket.NewRequestData( requestID, @@ -128,7 +139,7 @@ func (s *OrderCreateWsService) Do(requestID string, request *OrderCreateWsReques return err } - if err := s.c.Write(requestID, rawData); err != nil { + if err := s.c.Write(requestID, rawData, opts...); err != nil { return err } diff --git a/v2/sor_order_place_service_ws.go b/v2/sor_order_place_service_ws.go index 5c3bc5d70..1c5058ce5 100644 --- a/v2/sor_order_place_service_ws.go +++ b/v2/sor_order_place_service_ws.go @@ -18,23 +18,34 @@ type SorOrderPlaceWsService struct { } // NewSorOrderPlaceWsService init SorOrderPlaceWsService -func NewSorOrderPlaceWsService(apiKey, secretKey string) (*SorOrderPlaceWsService, error) { +func NewSorOrderPlaceWsService(apiKey, secretKey string, opts ...websocket.WebSocketServiceOption) (*SorOrderPlaceWsService, error) { + service := &SorOrderPlaceWsService{ + ApiKey: apiKey, + SecretKey: secretKey, + KeyType: common.KeyTypeHmac, + } + + createOpts := &websocket.WebSocketServiceCreateOption{} + for _, opt := range opts { + opt(createOpts) + } + + if createOpts.Client != nil { + service.c = createOpts.Client + return service, nil + } + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) if err != nil { return nil, err } - client, err := websocket.NewClient(conn) if err != nil { return nil, err } + service.c = client - return &SorOrderPlaceWsService{ - c: client, - ApiKey: apiKey, - SecretKey: secretKey, - KeyType: common.KeyTypeHmac, - }, nil + return service, nil } // SorOrderPlaceWsRequest parameters for 'sor.order.place' websocket API @@ -104,7 +115,7 @@ func (s *SorOrderPlaceWsRequest) buildParams() params { } // Do - sends 'sor.order.place' request -func (s *SorOrderPlaceWsService) Do(requestID string, request *SorOrderPlaceWsRequest) error { +func (s *SorOrderPlaceWsService) Do(requestID string, request *SorOrderPlaceWsRequest, opts ...websocket.RequestOption) error { rawData, err := websocket.CreateRequest( websocket.NewRequestData( requestID, @@ -120,7 +131,7 @@ func (s *SorOrderPlaceWsService) Do(requestID string, request *SorOrderPlaceWsRe return err } - if err := s.c.Write(requestID, rawData); err != nil { + if err := s.c.Write(requestID, rawData, opts...); err != nil { return err } diff --git a/v2/sor_order_test_service_ws.go b/v2/sor_order_test_service_ws.go index 7e343ad4d..681b821a2 100644 --- a/v2/sor_order_test_service_ws.go +++ b/v2/sor_order_test_service_ws.go @@ -18,23 +18,34 @@ type SorOrderTestWsService struct { } // NewSorOrderTestWsService init SorOrderTestWsService -func NewSorOrderTestWsService(apiKey, secretKey string) (*SorOrderTestWsService, error) { +func NewSorOrderTestWsService(apiKey, secretKey string, opts ...websocket.WebSocketServiceOption) (*SorOrderTestWsService, error) { + service := &SorOrderTestWsService{ + ApiKey: apiKey, + SecretKey: secretKey, + KeyType: common.KeyTypeHmac, + } + + createOpts := &websocket.WebSocketServiceCreateOption{} + for _, opt := range opts { + opt(createOpts) + } + + if createOpts.Client != nil { + service.c = createOpts.Client + return service, nil + } + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) if err != nil { return nil, err } - client, err := websocket.NewClient(conn) if err != nil { return nil, err } + service.c = client - return &SorOrderTestWsService{ - c: client, - ApiKey: apiKey, - SecretKey: secretKey, - KeyType: common.KeyTypeHmac, - }, nil + return service, nil } // SorOrderTestWsRequest parameters for 'sor.order.test' websocket API @@ -102,7 +113,7 @@ func (s *SorOrderTestWsRequest) buildParams() params { } // Do - sends 'sor.order.test' request -func (s *SorOrderTestWsService) Do(requestID string, request *SorOrderTestWsRequest) error { +func (s *SorOrderTestWsService) Do(requestID string, request *SorOrderTestWsRequest, opts ...websocket.RequestOption) error { rawData, err := websocket.CreateRequest( websocket.NewRequestData( requestID, @@ -118,7 +129,7 @@ func (s *SorOrderTestWsService) Do(requestID string, request *SorOrderTestWsRequ return err } - if err := s.c.Write(requestID, rawData); err != nil { + if err := s.c.Write(requestID, rawData, opts...); err != nil { return err } From 47320d482a3075be5f4e25a17689a69765f2f832 Mon Sep 17 00:00:00 2001 From: Artur Abelian Date: Sat, 18 Oct 2025 01:05:54 +0200 Subject: [PATCH 03/11] feat: added example --- examples/main.go | 3 +- examples/multiplexed_websocket.go | 203 ++++++++++++++++++++++++++++++ 2 files changed, 205 insertions(+), 1 deletion(-) create mode 100644 examples/multiplexed_websocket.go diff --git a/examples/main.go b/examples/main.go index a3d07076b..44cbcf91a 100644 --- a/examples/main.go +++ b/examples/main.go @@ -27,6 +27,7 @@ func main() { // FuturesOrder() // DeliveryOrder() // WalletBalance() - WatchMiniMarketsStat() + // WatchMiniMarketsStat() // RunOrderListExamples() + MultiplexedFuturesWebSocketExample() } diff --git a/examples/multiplexed_websocket.go b/examples/multiplexed_websocket.go new file mode 100644 index 000000000..c95ad0290 --- /dev/null +++ b/examples/multiplexed_websocket.go @@ -0,0 +1,203 @@ +package main + +import ( + "fmt" + "log" + "strings" + "sync" + "time" + + "github.com/adshao/go-binance/v2/common/websocket" + "github.com/adshao/go-binance/v2/futures" +) + +// ResponseCounter tracks request/response counts +type ResponseCounter struct { + TotalRequests int64 + TotalResponses int64 + PlaceRequests int64 + PlaceResponses int64 + StatusRequests int64 + StatusResponses int64 + CancelRequests int64 + CancelResponses int64 +} + +// RequestResponse tracks individual request/response pairs +type RequestResponse struct { + UUID string + RequestType string + Timestamp time.Time + Responded bool +} + +// MultiplexedFuturesWebSocketExample demonstrates concurrent WebSocket requests +// with UUID tracking and waiter channels for 1:1 request/response matching +func MultiplexedFuturesWebSocketExample() { + log.Println("=== Enhanced Multiplexed Futures WebSocket Example ===") + log.Println("Testing concurrent requests with UUID tracking and waiter channels") + + if err := AppConfig.Validate(); err != nil { + log.Printf("Configuration error: %v\n", err) + return + } + + // Setup testnet + AppConfig.SetupTestnet() + + // Create a shared WebSocket connection and client for futures + conn, err := websocket.NewConnection(futures.WsApiInitReadWriteConn, futures.WebsocketKeepalive, futures.WebsocketTimeoutReadWriteConnection) + if err != nil { + log.Printf("Failed to create WebSocket connection: %v\n", err) + return + } + + sharedClient, err := websocket.NewClient(conn) + if err != nil { + log.Printf("Failed to create WebSocket client: %v\n", err) + return + } + log.Printf("Shared futures WebSocket client created successfully\n\n") + + var wg sync.WaitGroup + wg.Add(2) // 2 reading goroutines must complete + go startFallbackMessages(sharedClient) + go startOrderPlaceService(&wg, sharedClient) + go startOrderCancelService(&wg, sharedClient) + + wg.Wait() +} + +// startFallbackMessages start monitoring goroutine for fallback messages +func startFallbackMessages(sharedClient websocket.Client) { + go func() { + log.Println("Monitoring shared channel for fallback messages...") + + for { + select { + case data := <-sharedClient.GetReadChannel(): + log.Fatalf("FALLBACK #%s: Message received on shared channel (should be routed to waiter!)\n", string(data)) + case err := <-sharedClient.GetReadErrorChannel(): + if err != nil { + log.Fatalf("Shared channel error: %v\n", err) + } + } + } + }() +} + +func startOrderPlaceService(wg *sync.WaitGroup, sharedClient websocket.Client) { + // Create services + orderPlaceService, err := futures.NewOrderPlaceWsService( + AppConfig.APIKey, + AppConfig.SecretKey, + websocket.WithWebSocketClient(sharedClient), + ) + if err != nil { + log.Printf("Failed to create order place service: %v\n", err) + return + } + + // Create dedicated waiter channel + waiterOrderPlace := make(chan []byte, 1) + + go func() { + defer wg.Done() + + for { + // Wait for response with timeout + select { + case data := <-waiterOrderPlace: + response := string(data) + log.Println("order.place response:", response) + + if !strings.Contains(response, "place_") { + // if not expected response received, exit 1 + log.Fatalln("order place response does not contain prefix , something went wrong with routing") + } + + case <-time.After(5 * time.Second): + log.Println("order.place timeout") + return + } + } + }() + + // Build request + request := futures.NewOrderPlaceWsRequest(). + Symbol("BTCUSDT"). // Use valid symbol + Side(futures.SideTypeBuy). + Type(futures.OrderTypeMarket). + TimeInForce(futures.TimeInForceTypeFOK). + Quantity("0.00001") // Small quantity for testing + + const requestsCount = 100 + // Send multiple concurrent requests with single waiter + for range requestsCount { + requestID := fmt.Sprintf("place_%d", time.Now().UnixNano()) + go func() { + log.Println("order.place sending request:", requestID) + if err := orderPlaceService.Do(requestID, request, websocket.WithWaiter(waiterOrderPlace)); err != nil { + log.Fatal(err) + } + }() + + time.Sleep(100 * time.Millisecond) + } +} + +func startOrderCancelService(wg *sync.WaitGroup, sharedClient websocket.Client) { + // Create services + orderCancelWsService, err := futures.NewOrderCancelWsService( + AppConfig.APIKey, + AppConfig.SecretKey, + websocket.WithWebSocketClient(sharedClient), + ) + if err != nil { + log.Printf("Failed to create order cancel service: %v\n", err) + return + } + + // Create dedicated waiter channel + waiterOrderCancel := make(chan []byte, 1) + go func() { + defer wg.Done() + + for { + // Wait for response with timeout + select { + case data := <-waiterOrderCancel: + response := string(data) + log.Println("order.cancel response:", response) + + if !strings.Contains(response, "cancel_") { + // if not expected response received, exit 1 + log.Fatalln("order cancel response does not contain prefix , something went wrong with routing") + } + + case <-time.After(5 * time.Second): + log.Println("order.cancel timeout") + return + } + } + }() + + // Build request + request := futures.NewOrderCancelRequest(). + Symbol("BTCUSDT"). + OrderID(123123) // not existing order for testing + + const requestsCount = 100 + // Send multiple concurrent requests with single waiter + for range requestsCount { + requestID := fmt.Sprintf("cancel_%d", time.Now().UnixNano()) + go func() { + log.Println("order.cancel sending request:", requestID) + if err := orderCancelWsService.Do(requestID, request, websocket.WithWaiter(waiterOrderCancel)); err != nil { + log.Fatal(err) + } + }() + + time.Sleep(100 * time.Millisecond) + } +} From 15b351e427709c837724614efff19d8c1a004aee Mon Sep 17 00:00:00 2001 From: Artur Abelian Date: Sat, 18 Oct 2025 12:29:50 +0200 Subject: [PATCH 04/11] feat: improve example --- examples/multiplexed_websocket.go | 472 +++++++++++++++++++++++------- 1 file changed, 362 insertions(+), 110 deletions(-) diff --git a/examples/multiplexed_websocket.go b/examples/multiplexed_websocket.go index c95ad0290..de228f0e8 100644 --- a/examples/multiplexed_websocket.go +++ b/examples/multiplexed_websocket.go @@ -1,8 +1,10 @@ package main import ( + "context" "fmt" - "log" + "log/slog" + "os" "strings" "sync" "time" @@ -11,193 +13,443 @@ import ( "github.com/adshao/go-binance/v2/futures" ) -// ResponseCounter tracks request/response counts -type ResponseCounter struct { - TotalRequests int64 - TotalResponses int64 - PlaceRequests int64 - PlaceResponses int64 - StatusRequests int64 - StatusResponses int64 - CancelRequests int64 - CancelResponses int64 +var ( + logger *slog.Logger + ctx context.Context + cancel context.CancelFunc +) + +const requestsCount = 100 + +// Stats tracks request/response statistics +type Stats struct { + RequestsSent int + ResponsesReceived int + StartTime time.Time + EndTime time.Time + mu sync.Mutex +} + +func (s *Stats) IncrementRequests() { + s.mu.Lock() + defer s.mu.Unlock() + s.RequestsSent++ +} + +func (s *Stats) IncrementResponses() { + s.mu.Lock() + defer s.mu.Unlock() + s.ResponsesReceived++ } -// RequestResponse tracks individual request/response pairs -type RequestResponse struct { - UUID string - RequestType string - Timestamp time.Time - Responded bool +func (s *Stats) GetCounts() (int, int) { + s.mu.Lock() + defer s.mu.Unlock() + return s.RequestsSent, s.ResponsesReceived } +func (s *Stats) SetStartTime() { + s.mu.Lock() + defer s.mu.Unlock() + s.StartTime = time.Now() +} + +func (s *Stats) SetEndTime() { + s.mu.Lock() + defer s.mu.Unlock() + s.EndTime = time.Now() +} + +func (s *Stats) GetDuration() time.Duration { + s.mu.Lock() + defer s.mu.Unlock() + if s.EndTime.IsZero() { + return time.Since(s.StartTime) + } + return s.EndTime.Sub(s.StartTime) +} + +var ( + placeStats = &Stats{} + cancelStats = &Stats{} +) + // MultiplexedFuturesWebSocketExample demonstrates concurrent WebSocket requests -// with UUID tracking and waiter channels for 1:1 request/response matching +// with waiter channels for 1:1 request/response matching func MultiplexedFuturesWebSocketExample() { - log.Println("=== Enhanced Multiplexed Futures WebSocket Example ===") - log.Println("Testing concurrent requests with UUID tracking and waiter channels") + // Setup structured logging + logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + logger.Info("=== Enhanced Multiplexed Futures WebSocket Example ===") + logger.Info("Testing concurrent requests with UUID tracking and waiter channels") + + // Validate configuration if err := AppConfig.Validate(); err != nil { - log.Printf("Configuration error: %v\n", err) + logger.Error("Configuration error", "error", err) return } - // Setup testnet + // Setup testnet and context AppConfig.SetupTestnet() + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() - // Create a shared WebSocket connection and client for futures - conn, err := websocket.NewConnection(futures.WsApiInitReadWriteConn, futures.WebsocketKeepalive, futures.WebsocketTimeoutReadWriteConnection) - if err != nil { - log.Printf("Failed to create WebSocket connection: %v\n", err) - return - } - - sharedClient, err := websocket.NewClient(conn) + // Create shared WebSocket connection and client + sharedClient, err := createWebSocketClient() if err != nil { - log.Printf("Failed to create WebSocket client: %v\n", err) + logger.Error("Failed to create WebSocket client", "error", err) return } - log.Printf("Shared futures WebSocket client created successfully\n\n") var wg sync.WaitGroup - wg.Add(2) // 2 reading goroutines must complete - go startFallbackMessages(sharedClient) + wg.Add(2) // 2 services will run + + // Start monitoring and services + go startFallbackMonitoring(sharedClient) go startOrderPlaceService(&wg, sharedClient) go startOrderCancelService(&wg, sharedClient) wg.Wait() + logger.Info("All services completed") + + // Print final statistics + printFinalStats() } -// startFallbackMessages start monitoring goroutine for fallback messages -func startFallbackMessages(sharedClient websocket.Client) { - go func() { - log.Println("Monitoring shared channel for fallback messages...") +// createWebSocketClient creates a new WebSocket client +func createWebSocketClient() (websocket.Client, error) { + conn, err := websocket.NewConnection( + futures.WsApiInitReadWriteConn, + futures.WebsocketKeepalive, + futures.WebsocketTimeoutReadWriteConnection, + ) + if err != nil { + return nil, fmt.Errorf("failed to create WebSocket connection: %w", err) + } + + client, err := websocket.NewClient(conn) + if err != nil { + return nil, fmt.Errorf("failed to create WebSocket client: %w", err) + } + + logger.Info("Shared futures WebSocket client created successfully") + return client, nil +} - for { - select { - case data := <-sharedClient.GetReadChannel(): - log.Fatalf("FALLBACK #%s: Message received on shared channel (should be routed to waiter!)\n", string(data)) - case err := <-sharedClient.GetReadErrorChannel(): - if err != nil { - log.Fatalf("Shared channel error: %v\n", err) - } +// startFallbackMonitoring monitors the shared channel for fallback messages +func startFallbackMonitoring(sharedClient websocket.Client) { + logger.Info("Starting fallback message monitoring") + + for { + select { + case data := <-sharedClient.GetReadChannel(): + logger.Error("FALLBACK: Message received on shared channel (should be routed to waiter)", + "data", string(data)) + case err := <-sharedClient.GetReadErrorChannel(): + if err != nil { + logger.Error("Shared channel error", "error", err) + return } + case <-ctx.Done(): + logger.Info("Fallback monitoring stopped") + return } - }() + } } +// startOrderPlaceService handles order placement requests func startOrderPlaceService(wg *sync.WaitGroup, sharedClient websocket.Client) { - // Create services + defer wg.Done() + + // Create order place service orderPlaceService, err := futures.NewOrderPlaceWsService( AppConfig.APIKey, AppConfig.SecretKey, websocket.WithWebSocketClient(sharedClient), ) if err != nil { - log.Printf("Failed to create order place service: %v\n", err) + logger.Error("Failed to create order place service", "error", err) return } // Create dedicated waiter channel - waiterOrderPlace := make(chan []byte, 1) + waiterChannel := make(chan []byte, 1) + defer close(waiterChannel) + + // Initialize stats + placeStats.SetStartTime() + + // Send multiple requests with concurrency control + go sendOrderPlaceRequests(orderPlaceService, waiterChannel) - go func() { - defer wg.Done() + // Start response listener + handleOrderPlaceResponses(waiterChannel) +} + +// handleOrderPlaceResponses handles responses from order place service +func handleOrderPlaceResponses(waiterChannel <-chan []byte) { + serviceLogger := logger.With("service", "order_place") + timeout := time.NewTimer(30 * time.Second) // Overall timeout for all responses + defer timeout.Stop() + + for { + select { + case data, ok := <-waiterChannel: + if !ok { + serviceLogger.Info("WaiterChannel closed") + placeStats.SetEndTime() + return + } + response := string(data) + serviceLogger.Debug("Response received", "response", response) - for { - // Wait for response with timeout - select { - case data := <-waiterOrderPlace: - response := string(data) - log.Println("order.place response:", response) + if !strings.Contains(response, "place_") { + serviceLogger.Error("Response does not contain expected prefix 'place_'", "response", response) + placeStats.SetEndTime() + return + } - if !strings.Contains(response, "place_") { - // if not expected response received, exit 1 - log.Fatalln("order place response does not contain prefix , something went wrong with routing") - } + placeStats.IncrementResponses() + sent, received := placeStats.GetCounts() + serviceLogger.Info("Progress", "responses_received", received, "requests_sent", sent) - case <-time.After(5 * time.Second): - log.Println("order.place timeout") + if received >= requestsCount { + serviceLogger.Info("All responses received successfully", "total", received) + placeStats.SetEndTime() return } + + case <-timeout.C: + sent, received := placeStats.GetCounts() + serviceLogger.Warn("Overall timeout reached", + "requests_sent", sent, + "responses_received", received, + "missing", sent-received) + placeStats.SetEndTime() + return + + case <-ctx.Done(): + serviceLogger.Info("Response handler stopped") + placeStats.SetEndTime() + return } - }() + } +} + +// sendOrderPlaceRequests sends multiple order placement requests concurrently +func sendOrderPlaceRequests(orderPlaceService *futures.OrderPlaceWsService, waiterChannel chan []byte) { + serviceLogger := logger.With("service", "order_place") // Build request request := futures.NewOrderPlaceWsRequest(). - Symbol("BTCUSDT"). // Use valid symbol + Symbol("BTCUSDT"). Side(futures.SideTypeBuy). Type(futures.OrderTypeMarket). TimeInForce(futures.TimeInForceTypeFOK). - Quantity("0.00001") // Small quantity for testing - - const requestsCount = 100 - // Send multiple concurrent requests with single waiter - for range requestsCount { - requestID := fmt.Sprintf("place_%d", time.Now().UnixNano()) - go func() { - log.Println("order.place sending request:", requestID) - if err := orderPlaceService.Do(requestID, request, websocket.WithWaiter(waiterOrderPlace)); err != nil { - log.Fatal(err) + Quantity("0.00001") + + var wg sync.WaitGroup + semaphore := make(chan struct{}, 10) // Limit concurrent requests + + for i := 0; i < requestsCount; i++ { + wg.Add(1) + go func(index int) { + defer wg.Done() + semaphore <- struct{}{} // Acquire semaphore + defer func() { <-semaphore }() // Release semaphore + + requestID := fmt.Sprintf("place_%d_%d", time.Now().UnixNano(), index) + serviceLogger.Debug("Sending request", "request_id", requestID, "index", index) + + if err := orderPlaceService.Do(requestID, request, websocket.WithWaiter(waiterChannel)); err != nil { + serviceLogger.Error("Failed to send request", "error", err, "request_id", requestID) + return } - }() + placeStats.IncrementRequests() + }(i) + // Rate limiting time.Sleep(100 * time.Millisecond) } + + wg.Wait() + serviceLogger.Info("All order place requests sent") } +// startOrderCancelService handles order cancellation requests func startOrderCancelService(wg *sync.WaitGroup, sharedClient websocket.Client) { - // Create services - orderCancelWsService, err := futures.NewOrderCancelWsService( + defer wg.Done() + + // Create order cancel service + orderCancelService, err := futures.NewOrderCancelWsService( AppConfig.APIKey, AppConfig.SecretKey, websocket.WithWebSocketClient(sharedClient), ) if err != nil { - log.Printf("Failed to create order cancel service: %v\n", err) + logger.Error("Failed to create order cancel service", "error", err) return } // Create dedicated waiter channel - waiterOrderCancel := make(chan []byte, 1) - go func() { - defer wg.Done() - - for { - // Wait for response with timeout - select { - case data := <-waiterOrderCancel: - response := string(data) - log.Println("order.cancel response:", response) - - if !strings.Contains(response, "cancel_") { - // if not expected response received, exit 1 - log.Fatalln("order cancel response does not contain prefix , something went wrong with routing") - } - - case <-time.After(5 * time.Second): - log.Println("order.cancel timeout") + waiterChannel := make(chan []byte, 1) + defer close(waiterChannel) + + // Initialize stats + cancelStats.SetStartTime() + + // Send multiple requests with concurrency control + go sendOrderCancelRequests(orderCancelService, waiterChannel) + + // Start response listener + handleOrderCancelResponses(waiterChannel) +} + +// handleOrderCancelResponses handles responses from order cancel service +func handleOrderCancelResponses(waiterChannel <-chan []byte) { + serviceLogger := logger.With("service", "order_cancel") + timeout := time.NewTimer(30 * time.Second) // Overall timeout for all responses + defer timeout.Stop() + + for { + select { + case data, ok := <-waiterChannel: + if !ok { + serviceLogger.Info("WaiterChannel closed") + cancelStats.SetEndTime() + return + } + response := string(data) + serviceLogger.Debug("Response received", "response", response) + + if !strings.Contains(response, "cancel_") { + serviceLogger.Error("Response does not contain expected prefix 'cancel_'", "response", response) + cancelStats.SetEndTime() return } + + cancelStats.IncrementResponses() + sent, received := cancelStats.GetCounts() + serviceLogger.Info("Progress", "responses_received", received, "requests_sent", sent) + + if received >= requestsCount { + serviceLogger.Info("All responses received successfully", "total", received) + cancelStats.SetEndTime() + return + } + + case <-timeout.C: + sent, received := cancelStats.GetCounts() + serviceLogger.Warn("Overall timeout reached", + "requests_sent", sent, + "responses_received", received, + "missing", sent-received) + cancelStats.SetEndTime() + return + + case <-ctx.Done(): + serviceLogger.Info("Response handler stopped") + cancelStats.SetEndTime() + return } - }() + } +} + +// sendOrderCancelRequests sends multiple order cancellation requests concurrently +func sendOrderCancelRequests(orderCancelService *futures.OrderCancelWsService, waiterChannel chan []byte) { + serviceLogger := logger.With("service", "order_cancel") // Build request request := futures.NewOrderCancelRequest(). Symbol("BTCUSDT"). - OrderID(123123) // not existing order for testing - - const requestsCount = 100 - // Send multiple concurrent requests with single waiter - for range requestsCount { - requestID := fmt.Sprintf("cancel_%d", time.Now().UnixNano()) - go func() { - log.Println("order.cancel sending request:", requestID) - if err := orderCancelWsService.Do(requestID, request, websocket.WithWaiter(waiterOrderCancel)); err != nil { - log.Fatal(err) + OrderID(123123) // Non-existing order for testing + + var wg sync.WaitGroup + semaphore := make(chan struct{}, 10) // Limit concurrent requests + + for i := 0; i < requestsCount; i++ { + wg.Add(1) + go func(index int) { + defer wg.Done() + semaphore <- struct{}{} // Acquire semaphore + defer func() { <-semaphore }() // Release semaphore + + requestID := fmt.Sprintf("cancel_%d_%d", time.Now().UnixNano(), index) + serviceLogger.Debug("Sending request", "request_id", requestID, "index", index) + + if err := orderCancelService.Do(requestID, request, websocket.WithWaiter(waiterChannel)); err != nil { + serviceLogger.Error("Failed to send request", "error", err, "request_id", requestID) + return } - }() + cancelStats.IncrementRequests() + }(i) + // Rate limiting time.Sleep(100 * time.Millisecond) } + + wg.Wait() + serviceLogger.Info("All order cancel requests sent") +} + +// printFinalStats prints comprehensive statistics for both services +func printFinalStats() { + logger.Info("=== FINAL STATISTICS ===") + + // Place stats + placeSent, placeReceived := placeStats.GetCounts() + placeDuration := placeStats.GetDuration() + placeSuccess := placeSent == placeReceived && placeSent == requestsCount + + logger.Info("Order Place Service Statistics", + "requests_sent", placeSent, + "responses_received", placeReceived, + "expected", requestsCount, + "missing", placeSent-placeReceived, + "success", placeSuccess, + "duration", placeDuration, + "avg_response_time", placeDuration/time.Duration(max(1, placeReceived)), + ) + + // Cancel stats + cancelSent, cancelReceived := cancelStats.GetCounts() + cancelDuration := cancelStats.GetDuration() + cancelSuccess := cancelSent == cancelReceived && cancelSent == requestsCount + + logger.Info("Order Cancel Service Statistics", + "requests_sent", cancelSent, + "responses_received", cancelReceived, + "expected", requestsCount, + "missing", cancelSent-cancelReceived, + "success", cancelSuccess, + "duration", cancelDuration, + "avg_response_time", cancelDuration/time.Duration(max(1, cancelReceived)), + ) + + // Overall stats + totalSent := placeSent + cancelSent + totalReceived := placeReceived + cancelReceived + totalExpected := requestsCount * 2 + overallSuccess := totalSent == totalReceived && totalSent == totalExpected + + logger.Info("Overall Statistics", + "total_requests_sent", totalSent, + "total_responses_received", totalReceived, + "total_expected", totalExpected, + "total_missing", totalSent-totalReceived, + "overall_success", overallSuccess, + "success_rate", fmt.Sprintf("%.2f%%", float64(totalReceived)/float64(totalSent)*100), + ) + + if !overallSuccess { + logger.Error("VERIFICATION FAILED: Request/Response counts do not match!", + "sent", totalSent, + "received", totalReceived, + "expected", totalExpected, + ) + } else { + logger.Info("✅ VERIFICATION PASSED: All requests received responses!") + } } From 3cb9b64262c3b26545a87119272b548f781d9f3c Mon Sep 17 00:00:00 2001 From: Artur Abelian Date: Sat, 18 Oct 2025 12:52:15 +0200 Subject: [PATCH 05/11] feat: improve example --- examples/multiplexed_websocket.go | 425 ++++++++++++++++++------------ 1 file changed, 250 insertions(+), 175 deletions(-) diff --git a/examples/multiplexed_websocket.go b/examples/multiplexed_websocket.go index de228f0e8..02c2a7979 100644 --- a/examples/multiplexed_websocket.go +++ b/examples/multiplexed_websocket.go @@ -7,119 +7,65 @@ import ( "os" "strings" "sync" + "sync/atomic" "time" "github.com/adshao/go-binance/v2/common/websocket" "github.com/adshao/go-binance/v2/futures" ) -var ( - logger *slog.Logger - ctx context.Context - cancel context.CancelFunc -) - -const requestsCount = 100 - -// Stats tracks request/response statistics -type Stats struct { - RequestsSent int - ResponsesReceived int - StartTime time.Time - EndTime time.Time - mu sync.Mutex -} - -func (s *Stats) IncrementRequests() { - s.mu.Lock() - defer s.mu.Unlock() - s.RequestsSent++ -} - -func (s *Stats) IncrementResponses() { - s.mu.Lock() - defer s.mu.Unlock() - s.ResponsesReceived++ -} - -func (s *Stats) GetCounts() (int, int) { - s.mu.Lock() - defer s.mu.Unlock() - return s.RequestsSent, s.ResponsesReceived -} - -func (s *Stats) SetStartTime() { - s.mu.Lock() - defer s.mu.Unlock() - s.StartTime = time.Now() -} - -func (s *Stats) SetEndTime() { - s.mu.Lock() - defer s.mu.Unlock() - s.EndTime = time.Now() -} - -func (s *Stats) GetDuration() time.Duration { - s.mu.Lock() - defer s.mu.Unlock() - if s.EndTime.IsZero() { - return time.Since(s.StartTime) - } - return s.EndTime.Sub(s.StartTime) -} - -var ( - placeStats = &Stats{} - cancelStats = &Stats{} +const ( + requestsCount = 100 + responseTimeout = 30 * time.Second + requestRateLimit = 100 * time.Millisecond + maxConcurrency = 10 + channelBufferSize = 10 ) // MultiplexedFuturesWebSocketExample demonstrates concurrent WebSocket requests // with waiter channels for 1:1 request/response matching func MultiplexedFuturesWebSocketExample() { - // Setup structured logging - logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ - Level: slog.LevelInfo, - })) - - logger.Info("=== Enhanced Multiplexed Futures WebSocket Example ===") - logger.Info("Testing concurrent requests with UUID tracking and waiter channels") - - // Validate configuration - if err := AppConfig.Validate(); err != nil { - logger.Error("Configuration error", "error", err) + manager, err := NewWebSocketManager() + if err != nil { + slog.Error("Failed to create WebSocket manager", "error", err) return } - // Setup testnet and context - AppConfig.SetupTestnet() - ctx, cancel = context.WithCancel(context.Background()) - defer cancel() + manager.logger.Info("=== Enhanced Multiplexed Futures WebSocket Example ===") + manager.logger.Info("Testing request/response tracking and waiter channels") - // Create shared WebSocket connection and client - sharedClient, err := createWebSocketClient() - if err != nil { - logger.Error("Failed to create WebSocket client", "error", err) - return - } + // Create context for this operation + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() var wg sync.WaitGroup wg.Add(2) // 2 services will run + // Error channel to collect errors from goroutines + errorChan := make(chan error, 3) // Buffer for all potential errors + // Start monitoring and services - go startFallbackMonitoring(sharedClient) - go startOrderPlaceService(&wg, sharedClient) - go startOrderCancelService(&wg, sharedClient) + go manager.startFallbackMonitoring(ctx) + go manager.startOrderPlaceService(ctx, &wg, errorChan) + go manager.startOrderCancelService(ctx, &wg, errorChan) + + // Monitor for errors in a separate goroutine + go func() { + for err := range errorChan { + if err != nil { + manager.logger.Error("Service error", "error", err) + } + } + }() wg.Wait() - logger.Info("All services completed") - // Print final statistics - printFinalStats() + manager.logger.Info("All services completed") + manager.printFinalStats() } // createWebSocketClient creates a new WebSocket client -func createWebSocketClient() (websocket.Client, error) { +func createWebSocketClient(logger *slog.Logger) (websocket.Client, error) { conn, err := websocket.NewConnection( futures.WsApiInitReadWriteConn, futures.WebsocketKeepalive, @@ -139,108 +85,129 @@ func createWebSocketClient() (websocket.Client, error) { } // startFallbackMonitoring monitors the shared channel for fallback messages -func startFallbackMonitoring(sharedClient websocket.Client) { - logger.Info("Starting fallback message monitoring") +func (wm *WebSocketManager) startFallbackMonitoring(ctx context.Context) { + wm.logger.Info("Starting fallback message monitoring") for { select { - case data := <-sharedClient.GetReadChannel(): - logger.Error("FALLBACK: Message received on shared channel (should be routed to waiter)", + case data := <-wm.client.GetReadChannel(): + wm.logger.Error("FALLBACK: Message received on shared channel (should be routed to waiter)", "data", string(data)) - case err := <-sharedClient.GetReadErrorChannel(): + case err := <-wm.client.GetReadErrorChannel(): if err != nil { - logger.Error("Shared channel error", "error", err) + wm.logger.Error("Shared channel error", "error", err) return } case <-ctx.Done(): - logger.Info("Fallback monitoring stopped") + wm.logger.Info("Fallback monitoring stopped") return } } } // startOrderPlaceService handles order placement requests -func startOrderPlaceService(wg *sync.WaitGroup, sharedClient websocket.Client) { +func (wm *WebSocketManager) startOrderPlaceService(ctx context.Context, wg *sync.WaitGroup, errorChan chan<- error) { defer wg.Done() // Create order place service orderPlaceService, err := futures.NewOrderPlaceWsService( AppConfig.APIKey, AppConfig.SecretKey, - websocket.WithWebSocketClient(sharedClient), + websocket.WithWebSocketClient(wm.client), ) if err != nil { - logger.Error("Failed to create order place service", "error", err) + errorChan <- fmt.Errorf("failed to create order place service: %w", err) return } - // Create dedicated waiter channel - waiterChannel := make(chan []byte, 1) + // Create dedicated waiter channel with proper buffer + waiterChannel := make(chan []byte, channelBufferSize) defer close(waiterChannel) // Initialize stats - placeStats.SetStartTime() + wm.placeStats.SetStartTime() + + // Start request sender and response handler concurrently + var serviceWg sync.WaitGroup + serviceWg.Add(2) + + requestErrors := make(chan error, requestsCount) // Buffer for all possible request errors + + go func() { + defer serviceWg.Done() + wm.sendOrderPlaceRequests(ctx, orderPlaceService, waiterChannel, requestErrors) + close(requestErrors) + }() - // Send multiple requests with concurrency control - go sendOrderPlaceRequests(orderPlaceService, waiterChannel) + go func() { + defer serviceWg.Done() + wm.handleOrderPlaceResponses(ctx, waiterChannel, requestErrors, errorChan) + }() - // Start response listener - handleOrderPlaceResponses(waiterChannel) + serviceWg.Wait() } // handleOrderPlaceResponses handles responses from order place service -func handleOrderPlaceResponses(waiterChannel <-chan []byte) { - serviceLogger := logger.With("service", "order_place") - timeout := time.NewTimer(30 * time.Second) // Overall timeout for all responses +func (wm *WebSocketManager) handleOrderPlaceResponses(ctx context.Context, waiterChannel <-chan []byte, requestErrors <-chan error, errorChan chan<- error) { + serviceLogger := wm.logger.With("service", "order_place") + timeout := time.NewTimer(responseTimeout) defer timeout.Stop() + // Monitor request errors + go func() { + for err := range requestErrors { + if err != nil { + serviceLogger.Error("Request error", "error", err) + } + } + }() + for { select { case data, ok := <-waiterChannel: if !ok { serviceLogger.Info("WaiterChannel closed") - placeStats.SetEndTime() + wm.placeStats.SetEndTime() return } response := string(data) serviceLogger.Debug("Response received", "response", response) if !strings.Contains(response, "place_") { - serviceLogger.Error("Response does not contain expected prefix 'place_'", "response", response) - placeStats.SetEndTime() + err := fmt.Errorf("response does not contain expected prefix 'place_': %s", response) + errorChan <- err + wm.placeStats.SetEndTime() return } - placeStats.IncrementResponses() - sent, received := placeStats.GetCounts() + wm.placeStats.IncrementResponses() + sent, received := wm.placeStats.GetCounts() serviceLogger.Info("Progress", "responses_received", received, "requests_sent", sent) if received >= requestsCount { serviceLogger.Info("All responses received successfully", "total", received) - placeStats.SetEndTime() + wm.placeStats.SetEndTime() return } case <-timeout.C: - sent, received := placeStats.GetCounts() - serviceLogger.Warn("Overall timeout reached", - "requests_sent", sent, - "responses_received", received, - "missing", sent-received) - placeStats.SetEndTime() + sent, received := wm.placeStats.GetCounts() + err := fmt.Errorf("timeout reached: sent=%d, received=%d, missing=%d", sent, received, sent-received) + errorChan <- err + wm.placeStats.SetEndTime() return case <-ctx.Done(): serviceLogger.Info("Response handler stopped") - placeStats.SetEndTime() + wm.placeStats.SetEndTime() return } } } // sendOrderPlaceRequests sends multiple order placement requests concurrently -func sendOrderPlaceRequests(orderPlaceService *futures.OrderPlaceWsService, waiterChannel chan []byte) { - serviceLogger := logger.With("service", "order_place") +func (wm *WebSocketManager) sendOrderPlaceRequests(ctx context.Context, orderPlaceService *futures.OrderPlaceWsService, waiterChannel chan []byte, errorChan chan<- error) { + serviceLogger := wm.logger.With("service", "order_place") // Build request request := futures.NewOrderPlaceWsRequest(). @@ -251,7 +218,7 @@ func sendOrderPlaceRequests(orderPlaceService *futures.OrderPlaceWsService, wait Quantity("0.00001") var wg sync.WaitGroup - semaphore := make(chan struct{}, 10) // Limit concurrent requests + semaphore := make(chan struct{}, maxConcurrency) for i := 0; i < requestsCount; i++ { wg.Add(1) @@ -264,14 +231,14 @@ func sendOrderPlaceRequests(orderPlaceService *futures.OrderPlaceWsService, wait serviceLogger.Debug("Sending request", "request_id", requestID, "index", index) if err := orderPlaceService.Do(requestID, request, websocket.WithWaiter(waiterChannel)); err != nil { - serviceLogger.Error("Failed to send request", "error", err, "request_id", requestID) + errorChan <- fmt.Errorf("failed to send request %s: %w", requestID, err) return } - placeStats.IncrementRequests() + wm.placeStats.IncrementRequests() }(i) // Rate limiting - time.Sleep(100 * time.Millisecond) + time.Sleep(requestRateLimit) } wg.Wait() @@ -279,87 +246,108 @@ func sendOrderPlaceRequests(orderPlaceService *futures.OrderPlaceWsService, wait } // startOrderCancelService handles order cancellation requests -func startOrderCancelService(wg *sync.WaitGroup, sharedClient websocket.Client) { +func (wm *WebSocketManager) startOrderCancelService(ctx context.Context, wg *sync.WaitGroup, errorChan chan<- error) { defer wg.Done() // Create order cancel service orderCancelService, err := futures.NewOrderCancelWsService( AppConfig.APIKey, AppConfig.SecretKey, - websocket.WithWebSocketClient(sharedClient), + websocket.WithWebSocketClient(wm.client), ) if err != nil { - logger.Error("Failed to create order cancel service", "error", err) + errorChan <- fmt.Errorf("failed to create order cancel service: %w", err) return } - // Create dedicated waiter channel - waiterChannel := make(chan []byte, 1) + // Create dedicated waiter channel with proper buffer + waiterChannel := make(chan []byte, channelBufferSize) defer close(waiterChannel) // Initialize stats - cancelStats.SetStartTime() + wm.cancelStats.SetStartTime() + + // Start request sender and response handler concurrently + var serviceWg sync.WaitGroup + serviceWg.Add(2) + + requestErrors := make(chan error, requestsCount) // Buffer for all possible request errors + + go func() { + defer serviceWg.Done() + wm.sendOrderCancelRequests(ctx, orderCancelService, waiterChannel, requestErrors) + close(requestErrors) + }() - // Send multiple requests with concurrency control - go sendOrderCancelRequests(orderCancelService, waiterChannel) + go func() { + defer serviceWg.Done() + wm.handleOrderCancelResponses(ctx, waiterChannel, requestErrors, errorChan) + }() - // Start response listener - handleOrderCancelResponses(waiterChannel) + serviceWg.Wait() } // handleOrderCancelResponses handles responses from order cancel service -func handleOrderCancelResponses(waiterChannel <-chan []byte) { - serviceLogger := logger.With("service", "order_cancel") - timeout := time.NewTimer(30 * time.Second) // Overall timeout for all responses +func (wm *WebSocketManager) handleOrderCancelResponses(ctx context.Context, waiterChannel <-chan []byte, requestErrors <-chan error, errorChan chan<- error) { + serviceLogger := wm.logger.With("service", "order_cancel") + timeout := time.NewTimer(responseTimeout) defer timeout.Stop() + // Monitor request errors + go func() { + for err := range requestErrors { + if err != nil { + serviceLogger.Error("Request error", "error", err) + } + } + }() + for { select { case data, ok := <-waiterChannel: if !ok { serviceLogger.Info("WaiterChannel closed") - cancelStats.SetEndTime() + wm.cancelStats.SetEndTime() return } response := string(data) serviceLogger.Debug("Response received", "response", response) if !strings.Contains(response, "cancel_") { - serviceLogger.Error("Response does not contain expected prefix 'cancel_'", "response", response) - cancelStats.SetEndTime() + err := fmt.Errorf("response does not contain expected prefix 'cancel_': %s", response) + errorChan <- err + wm.cancelStats.SetEndTime() return } - cancelStats.IncrementResponses() - sent, received := cancelStats.GetCounts() + wm.cancelStats.IncrementResponses() + sent, received := wm.cancelStats.GetCounts() serviceLogger.Info("Progress", "responses_received", received, "requests_sent", sent) if received >= requestsCount { serviceLogger.Info("All responses received successfully", "total", received) - cancelStats.SetEndTime() + wm.cancelStats.SetEndTime() return } case <-timeout.C: - sent, received := cancelStats.GetCounts() - serviceLogger.Warn("Overall timeout reached", - "requests_sent", sent, - "responses_received", received, - "missing", sent-received) - cancelStats.SetEndTime() + sent, received := wm.cancelStats.GetCounts() + err := fmt.Errorf("timeout reached: sent=%d, received=%d, missing=%d", sent, received, sent-received) + errorChan <- err + wm.cancelStats.SetEndTime() return case <-ctx.Done(): serviceLogger.Info("Response handler stopped") - cancelStats.SetEndTime() + wm.cancelStats.SetEndTime() return } } } // sendOrderCancelRequests sends multiple order cancellation requests concurrently -func sendOrderCancelRequests(orderCancelService *futures.OrderCancelWsService, waiterChannel chan []byte) { - serviceLogger := logger.With("service", "order_cancel") +func (wm *WebSocketManager) sendOrderCancelRequests(ctx context.Context, orderCancelService *futures.OrderCancelWsService, waiterChannel chan []byte, errorChan chan<- error) { + serviceLogger := wm.logger.With("service", "order_cancel") // Build request request := futures.NewOrderCancelRequest(). @@ -367,7 +355,7 @@ func sendOrderCancelRequests(orderCancelService *futures.OrderCancelWsService, w OrderID(123123) // Non-existing order for testing var wg sync.WaitGroup - semaphore := make(chan struct{}, 10) // Limit concurrent requests + semaphore := make(chan struct{}, maxConcurrency) for i := 0; i < requestsCount; i++ { wg.Add(1) @@ -380,14 +368,14 @@ func sendOrderCancelRequests(orderCancelService *futures.OrderCancelWsService, w serviceLogger.Debug("Sending request", "request_id", requestID, "index", index) if err := orderCancelService.Do(requestID, request, websocket.WithWaiter(waiterChannel)); err != nil { - serviceLogger.Error("Failed to send request", "error", err, "request_id", requestID) + errorChan <- fmt.Errorf("failed to send request %s: %w", requestID, err) return } - cancelStats.IncrementRequests() + wm.cancelStats.IncrementRequests() }(i) // Rate limiting - time.Sleep(100 * time.Millisecond) + time.Sleep(requestRateLimit) } wg.Wait() @@ -395,61 +383,148 @@ func sendOrderCancelRequests(orderCancelService *futures.OrderCancelWsService, w } // printFinalStats prints comprehensive statistics for both services -func printFinalStats() { - logger.Info("=== FINAL STATISTICS ===") +func (wm *WebSocketManager) printFinalStats() { + wm.logger.Info("=== FINAL STATISTICS ===") - // Place stats - placeSent, placeReceived := placeStats.GetCounts() - placeDuration := placeStats.GetDuration() + // Get counts atomically to avoid race conditions + placeSent, placeReceived := wm.placeStats.GetCounts() + placeDuration := wm.placeStats.GetDuration() placeSuccess := placeSent == placeReceived && placeSent == requestsCount - logger.Info("Order Place Service Statistics", + // Calculate average response time safely + var placeAvgTime time.Duration + if placeReceived > 0 { + placeAvgTime = placeDuration / time.Duration(placeReceived) + } + + wm.logger.Info("Order Place Service Statistics", "requests_sent", placeSent, "responses_received", placeReceived, "expected", requestsCount, "missing", placeSent-placeReceived, "success", placeSuccess, "duration", placeDuration, - "avg_response_time", placeDuration/time.Duration(max(1, placeReceived)), + "avg_response_time", placeAvgTime, ) - // Cancel stats - cancelSent, cancelReceived := cancelStats.GetCounts() - cancelDuration := cancelStats.GetDuration() + // Get counts atomically to avoid race conditions + cancelSent, cancelReceived := wm.cancelStats.GetCounts() + cancelDuration := wm.cancelStats.GetDuration() cancelSuccess := cancelSent == cancelReceived && cancelSent == requestsCount - logger.Info("Order Cancel Service Statistics", + // Calculate average response time safely + var cancelAvgTime time.Duration + if cancelReceived > 0 { + cancelAvgTime = cancelDuration / time.Duration(cancelReceived) + } + + wm.logger.Info("Order Cancel Service Statistics", "requests_sent", cancelSent, "responses_received", cancelReceived, "expected", requestsCount, "missing", cancelSent-cancelReceived, "success", cancelSuccess, "duration", cancelDuration, - "avg_response_time", cancelDuration/time.Duration(max(1, cancelReceived)), + "avg_response_time", cancelAvgTime, ) // Overall stats totalSent := placeSent + cancelSent totalReceived := placeReceived + cancelReceived - totalExpected := requestsCount * 2 + totalExpected := int64(requestsCount * 2) overallSuccess := totalSent == totalReceived && totalSent == totalExpected - logger.Info("Overall Statistics", + // Calculate success rate safely + var successRate float64 + if totalSent > 0 { + successRate = float64(totalReceived) / float64(totalSent) * 100 + } + + wm.logger.Info("Overall Statistics", "total_requests_sent", totalSent, "total_responses_received", totalReceived, "total_expected", totalExpected, "total_missing", totalSent-totalReceived, "overall_success", overallSuccess, - "success_rate", fmt.Sprintf("%.2f%%", float64(totalReceived)/float64(totalSent)*100), + "success_rate", fmt.Sprintf("%.2f%%", successRate), ) if !overallSuccess { - logger.Error("VERIFICATION FAILED: Request/Response counts do not match!", + wm.logger.Error("VERIFICATION FAILED: Request/Response counts do not match!", "sent", totalSent, "received", totalReceived, "expected", totalExpected, ) } else { - logger.Info("✅ VERIFICATION PASSED: All requests received responses!") + wm.logger.Info("✅ VERIFICATION PASSED: All requests received responses!") + } +} + +// WebSocketManager encapsulates the WebSocket client and statistics +type WebSocketManager struct { + logger *slog.Logger + placeStats *Stats + cancelStats *Stats + client websocket.Client +} + +// NewWebSocketManager creates a new manager with proper initialization +func NewWebSocketManager() (*WebSocketManager, error) { + logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelInfo, + })) + + client, err := createWebSocketClient(logger) + if err != nil { + return nil, fmt.Errorf("failed to create WebSocket client: %w", err) + } + + return &WebSocketManager{ + logger: logger, + placeStats: &Stats{}, + cancelStats: &Stats{}, + client: client, + }, nil +} + +// Stats tracks request/response statistics using atomic operations +type Stats struct { + requestsSent int64 + responsesReceived int64 + startTime time.Time + endTime time.Time + mu sync.RWMutex // Only for time operations +} + +func (s *Stats) IncrementRequests() { + atomic.AddInt64(&s.requestsSent, 1) +} + +func (s *Stats) IncrementResponses() { + atomic.AddInt64(&s.responsesReceived, 1) +} + +func (s *Stats) GetCounts() (int64, int64) { + return atomic.LoadInt64(&s.requestsSent), atomic.LoadInt64(&s.responsesReceived) +} + +func (s *Stats) SetStartTime() { + s.mu.Lock() + defer s.mu.Unlock() + s.startTime = time.Now() +} + +func (s *Stats) SetEndTime() { + s.mu.Lock() + defer s.mu.Unlock() + s.endTime = time.Now() +} + +func (s *Stats) GetDuration() time.Duration { + s.mu.RLock() + defer s.mu.RUnlock() + if s.endTime.IsZero() { + return time.Since(s.startTime) } + return s.endTime.Sub(s.startTime) } From f9ea569f68f702bf37784df39883b994e4c33408 Mon Sep 17 00:00:00 2001 From: Artur Abelian Date: Sat, 18 Oct 2025 13:13:21 +0200 Subject: [PATCH 06/11] feat: benchamrking client request list --- v2/common/websocket/client_test.go | 99 ++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/v2/common/websocket/client_test.go b/v2/common/websocket/client_test.go index 27ac1cca0..19fb17ccb 100644 --- a/v2/common/websocket/client_test.go +++ b/v2/common/websocket/client_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "log" "net/http" "testing" @@ -289,3 +290,101 @@ func startWsTestServer(stopCh chan struct{}) { } log.Println("Graceful shutdown complete.") } + +// Memory benchmark tests for channel references in RequestList +func BenchmarkRequestList_ChannelMemory(b *testing.B) { + tests := []struct { + name string + mapSize int + channelType string + }{ + {"SameChannel_1000entries", 1000, "same"}, + {"SameChannel_10000entries", 10000, "same"}, + {"DifferentChannels_1000entries", 1000, "different"}, + {"DifferentChannels_10000entries", 10000, "different"}, + } + + for _, tt := range tests { + b.Run(tt.name, func(b *testing.B) { + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + requestList := NewRequestList() + + if tt.channelType == "same" { + // Test: Multiple map entries with SAME channel reference + sharedChannel := make(chan []byte, 1) + + for i := 0; i < tt.mapSize; i++ { + requestID := fmt.Sprintf("req_%d_%d", n, i) + requestList.Add(requestID, sharedChannel) + } + + } else { + // Test: Multiple map entries with DIFFERENT channels + for i := 0; i < tt.mapSize; i++ { + requestID := fmt.Sprintf("req_%d_%d", n, i) + uniqueChannel := make(chan []byte, 1) + requestList.Add(requestID, uniqueChannel) + } + } + + // Verify the map size + if requestList.Len() != tt.mapSize { + b.Fatalf("Expected %d entries, got %d", tt.mapSize, requestList.Len()) + } + } + }) + } +} + +// Benchmark to demonstrate memory efficiency of channel references +func BenchmarkChannelReference_vs_ChannelCopy(b *testing.B) { + const numEntries = 1000 + + b.Run("ChannelReferences", func(b *testing.B) { + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + // Simulate the actual RequestList behavior + requests := make(map[string]chan []byte, numEntries) + sharedChannel := make(chan []byte, 1) + + for i := 0; i < numEntries; i++ { + requestID := fmt.Sprintf("req_%d", i) + requests[requestID] = sharedChannel // Store reference + } + + // Verify all entries point to the same channel + firstChan := requests["req_0"] + for i := 1; i < numEntries; i++ { + requestID := fmt.Sprintf("req_%d", i) + if requests[requestID] != firstChan { + b.Fatal("Channels should be identical references") + } + } + } + }) + + b.Run("UniqueChannels", func(b *testing.B) { + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + requests := make(map[string]chan []byte, numEntries) + + for i := 0; i < numEntries; i++ { + requestID := fmt.Sprintf("req_%d", i) + requests[requestID] = make(chan []byte, 1) // Create unique channel + } + + // Verify all channels are different + for i := 1; i < numEntries; i++ { + req0 := fmt.Sprintf("req_%d", 0) + reqI := fmt.Sprintf("req_%d", i) + if requests[req0] == requests[reqI] { + b.Fatal("Channels should be unique") + } + } + } + }) +} From d4249e60040b83284d689d24f7df0fe08da59713 Mon Sep 17 00:00:00 2001 From: Artur Abelian Date: Sat, 18 Oct 2025 13:41:49 +0200 Subject: [PATCH 07/11] feat: rename request --- v2/common/websocket/client.go | 28 ++++++++++++++-------------- v2/common/websocket/client_test.go | 2 +- v2/common/websocket/types.go | 8 ++++---- 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/v2/common/websocket/client.go b/v2/common/websocket/client.go index b04571306..eb5c4a912 100644 --- a/v2/common/websocket/client.go +++ b/v2/common/websocket/client.go @@ -29,8 +29,8 @@ var ( // ErrorWsReadConnectionTimeout defines that connection read timeout expired ErrorWsReadConnectionTimeout = errors.New("ws error: read connection timeout") - // ErrorWsIdAlreadySent defines that asyncWriteRequest with the same id was already sent - ErrorWsIdAlreadySent = errors.New("ws error: asyncWriteRequest with same id already sent") + // ErrorWsIdAlreadySent defines that request with the same id was already sent + ErrorWsIdAlreadySent = errors.New("ws error: request with same id already sent") // KeepAlivePingDeadline defines deadline to send ping frame KeepAlivePingDeadline = 10 * time.Second @@ -39,7 +39,7 @@ var ( WaitCheckInternal = 300 * time.Millisecond ) -// messageId define id field of asyncWriteRequest/response +// messageId define id field of request/response type messageId struct { Id string `json:"id"` } @@ -87,16 +87,16 @@ func NewClient(conn Connection) (Client, error) { return client, nil } -type asyncWriteRequest struct { +type request struct { waiter chan []byte } -// RequestOption define option type for asyncWriteRequest -type RequestOption func(*asyncWriteRequest) +// RequestOption define option type for request +type RequestOption func(*request) -// WithWaiter set waiter channel param for the asyncWriteRequest +// WithWaiter set waiter channel param for the request func WithWaiter(waiter chan []byte) RequestOption { - return func(r *asyncWriteRequest) { + return func(r *request) { r.waiter = waiter } } @@ -120,7 +120,7 @@ func (c *client) Write(id string, data []byte, opts ...RequestOption) error { return ErrorWsIdAlreadySent } - req := &asyncWriteRequest{} + req := &request{} for _, opt := range opts { opt(req) } @@ -230,7 +230,7 @@ func (c *client) read() { c.readC <- message } - c.debug("read: remove message from asyncWriteRequest list '%v'", msg) + c.debug("read: remove message from request list '%v'", msg) c.requestsList.Remove(msg.Id) } } @@ -302,7 +302,7 @@ func (c *client) GetReconnectCount() int64 { return atomic.LoadInt64(&c.reconnectCount) } -// NewRequestList creates asyncWriteRequest list +// NewRequestList creates request list func NewRequestList() RequestList { return RequestList{ mu: sync.Mutex{}, @@ -316,7 +316,7 @@ type RequestList struct { requests map[string]chan []byte } -// Add adds asyncWriteRequest into list +// Add adds request into list func (l *RequestList) Add(id string, waiterChan chan []byte) { l.mu.Lock() defer l.mu.Unlock() @@ -329,14 +329,14 @@ func (l *RequestList) Get(id string) chan []byte { return l.requests[id] } -// RecreateList creates new asyncWriteRequest list +// RecreateList creates new request list func (l *RequestList) RecreateList() { l.mu.Lock() defer l.mu.Unlock() l.requests = make(map[string]chan []byte) } -// Remove adds asyncWriteRequest from list +// Remove adds request from list func (l *RequestList) Remove(id string) { l.mu.Lock() defer l.mu.Unlock() diff --git a/v2/common/websocket/client_test.go b/v2/common/websocket/client_test.go index 19fb17ccb..1638e6a6d 100644 --- a/v2/common/websocket/client_test.go +++ b/v2/common/websocket/client_test.go @@ -96,7 +96,7 @@ func (s *clientTestSuite) TestReadWriteSync() { requestID := id.String() req := testApiRequest{ - Id: "some-other-asyncWriteRequest-id", + Id: "some-other-request-id", Method: "some-method", Params: map[string]interface{}{}, } diff --git a/v2/common/websocket/types.go b/v2/common/websocket/types.go index a4c0dd847..d8efcc722 100644 --- a/v2/common/websocket/types.go +++ b/v2/common/websocket/types.go @@ -13,7 +13,7 @@ import ( // WsApiMethodType define method name for websocket API type WsApiMethodType string -// WsApiRequest define common websocket API asyncWriteRequest +// WsApiRequest define common websocket API request type WsApiRequest struct { Id string `json:"id"` Method WsApiMethodType `json:"method"` @@ -77,8 +77,8 @@ const ( ) var ( - // ErrorRequestIDNotSet defines that asyncWriteRequest ID is not set - ErrorRequestIDNotSet = errors.New("ws service: asyncWriteRequest id is not set") + // ErrorRequestIDNotSet defines that request ID is not set + ErrorRequestIDNotSet = errors.New("ws service: request id is not set") // ErrorApiKeyIsNotSet defines that ApiKey is not set ErrorApiKeyIsNotSet = errors.New("ws service: api key is not set") @@ -111,7 +111,7 @@ type RequestData struct { keyType string } -// CreateRequest creates signed ws asyncWriteRequest +// CreateRequest creates signed ws request func CreateRequest(reqData RequestData, method WsApiMethodType, params map[string]interface{}) ([]byte, error) { if reqData.requestID == "" { return nil, ErrorRequestIDNotSet From 4a057438748892b4b0d04c8ffce44f7ca9c4249a Mon Sep 17 00:00:00 2001 From: Artur Abelian Date: Sat, 18 Oct 2025 13:43:45 +0200 Subject: [PATCH 08/11] feat: refactoring --- v2/common/websocket/{ws_service_options.go => service_options.go} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename v2/common/websocket/{ws_service_options.go => service_options.go} (100%) diff --git a/v2/common/websocket/ws_service_options.go b/v2/common/websocket/service_options.go similarity index 100% rename from v2/common/websocket/ws_service_options.go rename to v2/common/websocket/service_options.go From ad583bf7b1a0041c017d8b66f1698b1566ba04d0 Mon Sep 17 00:00:00 2001 From: Artur Abelian Date: Sun, 19 Oct 2025 11:28:23 +0200 Subject: [PATCH 09/11] feat: refactoring --- v2/common/websocket/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/common/websocket/client.go b/v2/common/websocket/client.go index eb5c4a912..8c408cf08 100644 --- a/v2/common/websocket/client.go +++ b/v2/common/websocket/client.go @@ -310,7 +310,7 @@ func NewRequestList() RequestList { } } -// RequestList state of waiters that was sent/received with or without waiter channel +// RequestList state of requests that was sent/received with or without waiter channel type RequestList struct { mu sync.Mutex requests map[string]chan []byte From 6aa286d5b9cf8892c5f94828d60be930de62932f Mon Sep 17 00:00:00 2001 From: Artur Abelian Date: Sun, 19 Oct 2025 12:07:32 +0200 Subject: [PATCH 10/11] feat: refactoring services constructor and client creation --- v2/common/websocket/service_options.go | 10 ++++++- v2/futures/account_service_ws.go | 40 ++++++++++++++++++------- v2/futures/order_cancel_service_ws.go | 21 +++++++------ v2/futures/order_place_service_ws.go | 21 +++++++------ v2/futures/order_status_service_ws.go | 21 +++++++------ v2/order_list_cancel_service_ws.go | 21 +++++++------ v2/order_list_place_oto_service_ws.go | 21 +++++++------ v2/order_list_place_otoco_service_ws.go | 21 +++++++------ v2/order_list_place_service_ws.go | 21 +++++++------ v2/order_list_service_ws_create.go | 21 +++++++------ v2/order_service_ws_create.go | 21 +++++++------ v2/sor_order_place_service_ws.go | 21 +++++++------ v2/sor_order_test_service_ws.go | 21 +++++++------ 13 files changed, 148 insertions(+), 133 deletions(-) diff --git a/v2/common/websocket/service_options.go b/v2/common/websocket/service_options.go index 984bb1f65..8288167a5 100644 --- a/v2/common/websocket/service_options.go +++ b/v2/common/websocket/service_options.go @@ -4,7 +4,8 @@ package websocket type WebSocketServiceOption func(serviceOpt *WebSocketServiceCreateOption) type WebSocketServiceCreateOption struct { - Client Client + Client Client + RecvWindow int64 } // WithWebSocketClient creates an option to set the websocket Client for any WebSocket service @@ -13,3 +14,10 @@ func WithWebSocketClient(client Client) WebSocketServiceOption { opt.Client = client } } + +// WithRecvWindow creates an option to set the receive window for WebSocket services +func WithRecvWindow(recvWindow int64) WebSocketServiceOption { + return func(opt *WebSocketServiceCreateOption) { + opt.RecvWindow = recvWindow + } +} diff --git a/v2/futures/account_service_ws.go b/v2/futures/account_service_ws.go index 21421299a..fe09c0944 100644 --- a/v2/futures/account_service_ws.go +++ b/v2/futures/account_service_ws.go @@ -19,28 +19,46 @@ type WsAccountService struct { } func NewWsAccountService(apiKey, secretKey string, recvWindow ...int64) (*WsAccountService, error) { - conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) - if err != nil { - return nil, err + opts := []websocket.WebSocketServiceOption{} + if len(recvWindow) > 0 { + opts = append(opts, websocket.WithRecvWindow(recvWindow[0])) } + return NewWsAccountServiceWithOptions(apiKey, secretKey, opts...) +} - client, err := websocket.NewClient(conn) - if err != nil { - return nil, err +func NewWsAccountServiceWithOptions(apiKey, secretKey string, opts ...websocket.WebSocketServiceOption) (*WsAccountService, error) { + createOpts := &websocket.WebSocketServiceCreateOption{} + for _, opt := range opts { + opt(createOpts) } window := int64(5000) - if len(recvWindow) > 0 { - window = recvWindow[0] + if createOpts.RecvWindow > 0 { + window = createOpts.RecvWindow } - return &WsAccountService{ - c: client, + service := &WsAccountService{ ApiKey: apiKey, SecretKey: secretKey, KeyType: common.KeyTypeHmac, RecvWindow: window, - }, nil + } + + if createOpts.Client != nil { + service.c = createOpts.Client + } else { + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) + if err != nil { + return nil, err + } + client, err := websocket.NewClient(conn) + if err != nil { + return nil, err + } + service.c = client + } + + return service, nil } type WsAccountV2InfoResponse struct { diff --git a/v2/futures/order_cancel_service_ws.go b/v2/futures/order_cancel_service_ws.go index e61a31a87..6f591ca04 100644 --- a/v2/futures/order_cancel_service_ws.go +++ b/v2/futures/order_cancel_service_ws.go @@ -97,19 +97,18 @@ func NewOrderCancelWsService(apiKey, secretKey string, opts ...websocket.WebSock if createOpts.Client != nil { service.c = createOpts.Client - return service, nil + } else { + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) + if err != nil { + return nil, err + } + client, err := websocket.NewClient(conn) + if err != nil { + return nil, err + } + service.c = client } - conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) - if err != nil { - return nil, err - } - client, err := websocket.NewClient(conn) - if err != nil { - return nil, err - } - service.c = client - return service, nil } diff --git a/v2/futures/order_place_service_ws.go b/v2/futures/order_place_service_ws.go index deef85800..290f5a2c5 100644 --- a/v2/futures/order_place_service_ws.go +++ b/v2/futures/order_place_service_ws.go @@ -31,18 +31,17 @@ func NewOrderPlaceWsService(apiKey, secretKey string, opts ...websocket.WebSocke if createOpts.Client != nil { service.c = createOpts.Client - return service, nil - } - - conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) - if err != nil { - return nil, err - } - client, err := websocket.NewClient(conn) - if err != nil { - return nil, err + } else { + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) + if err != nil { + return nil, err + } + client, err := websocket.NewClient(conn) + if err != nil { + return nil, err + } + service.c = client } - service.c = client return service, nil } diff --git a/v2/futures/order_status_service_ws.go b/v2/futures/order_status_service_ws.go index 28fde6206..5e801dd50 100644 --- a/v2/futures/order_status_service_ws.go +++ b/v2/futures/order_status_service_ws.go @@ -32,19 +32,18 @@ func NewOrderStatusWsService(apiKey, secretKey string, opts ...websocket.WebSock if createOpts.Client != nil { service.c = createOpts.Client - return service, nil + } else { + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) + if err != nil { + return nil, err + } + client, err := websocket.NewClient(conn) + if err != nil { + return nil, err + } + service.c = client } - conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) - if err != nil { - return nil, err - } - client, err := websocket.NewClient(conn) - if err != nil { - return nil, err - } - service.c = client - return service, nil } diff --git a/v2/order_list_cancel_service_ws.go b/v2/order_list_cancel_service_ws.go index 5faa40fde..ee7f28228 100644 --- a/v2/order_list_cancel_service_ws.go +++ b/v2/order_list_cancel_service_ws.go @@ -32,19 +32,18 @@ func NewOrderListCancelWsService(apiKey, secretKey string, opts ...websocket.Web if createOpts.Client != nil { service.c = createOpts.Client - return service, nil + } else { + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) + if err != nil { + return nil, err + } + client, err := websocket.NewClient(conn) + if err != nil { + return nil, err + } + service.c = client } - conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) - if err != nil { - return nil, err - } - client, err := websocket.NewClient(conn) - if err != nil { - return nil, err - } - service.c = client - return service, nil } diff --git a/v2/order_list_place_oto_service_ws.go b/v2/order_list_place_oto_service_ws.go index 2062128f0..1a39abf22 100644 --- a/v2/order_list_place_oto_service_ws.go +++ b/v2/order_list_place_oto_service_ws.go @@ -32,18 +32,17 @@ func NewOrderListPlaceOtoWsService(apiKey, secretKey string, opts ...websocket.W if createOpts.Client != nil { service.c = createOpts.Client - return service, nil - } - - conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) - if err != nil { - return nil, err - } - client, err := websocket.NewClient(conn) - if err != nil { - return nil, err + } else { + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) + if err != nil { + return nil, err + } + client, err := websocket.NewClient(conn) + if err != nil { + return nil, err + } + service.c = client } - service.c = client return service, nil } diff --git a/v2/order_list_place_otoco_service_ws.go b/v2/order_list_place_otoco_service_ws.go index ca252f8a3..a916d23ea 100644 --- a/v2/order_list_place_otoco_service_ws.go +++ b/v2/order_list_place_otoco_service_ws.go @@ -32,18 +32,17 @@ func NewOrderListPlaceOtocoWsService(apiKey, secretKey string, opts ...websocket if createOpts.Client != nil { service.c = createOpts.Client - return service, nil - } - - conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) - if err != nil { - return nil, err - } - client, err := websocket.NewClient(conn) - if err != nil { - return nil, err + } else { + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) + if err != nil { + return nil, err + } + client, err := websocket.NewClient(conn) + if err != nil { + return nil, err + } + service.c = client } - service.c = client return service, nil } diff --git a/v2/order_list_place_service_ws.go b/v2/order_list_place_service_ws.go index a89960789..d46559b98 100644 --- a/v2/order_list_place_service_ws.go +++ b/v2/order_list_place_service_ws.go @@ -32,18 +32,17 @@ func NewOrderListPlaceWsService(apiKey, secretKey string, opts ...websocket.WebS if createOpts.Client != nil { service.c = createOpts.Client - return service, nil - } - - conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) - if err != nil { - return nil, err - } - client, err := websocket.NewClient(conn) - if err != nil { - return nil, err + } else { + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) + if err != nil { + return nil, err + } + client, err := websocket.NewClient(conn) + if err != nil { + return nil, err + } + service.c = client } - service.c = client return service, nil } diff --git a/v2/order_list_service_ws_create.go b/v2/order_list_service_ws_create.go index 217b60755..bb319976f 100644 --- a/v2/order_list_service_ws_create.go +++ b/v2/order_list_service_ws_create.go @@ -32,18 +32,17 @@ func NewOrderListCreateWsService(apiKey, secretKey string, opts ...websocket.Web if createOpts.Client != nil { service.c = createOpts.Client - return service, nil - } - - conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) - if err != nil { - return nil, err - } - client, err := websocket.NewClient(conn) - if err != nil { - return nil, err + } else { + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) + if err != nil { + return nil, err + } + client, err := websocket.NewClient(conn) + if err != nil { + return nil, err + } + service.c = client } - service.c = client return service, nil } diff --git a/v2/order_service_ws_create.go b/v2/order_service_ws_create.go index d91144134..668bd7f13 100644 --- a/v2/order_service_ws_create.go +++ b/v2/order_service_ws_create.go @@ -32,18 +32,17 @@ func NewOrderCreateWsService(apiKey, secretKey string, opts ...websocket.WebSock if createOpts.Client != nil { service.c = createOpts.Client - return service, nil - } - - conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) - if err != nil { - return nil, err - } - client, err := websocket.NewClient(conn) - if err != nil { - return nil, err + } else { + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) + if err != nil { + return nil, err + } + client, err := websocket.NewClient(conn) + if err != nil { + return nil, err + } + service.c = client } - service.c = client return service, nil } diff --git a/v2/sor_order_place_service_ws.go b/v2/sor_order_place_service_ws.go index 1c5058ce5..868e3abbf 100644 --- a/v2/sor_order_place_service_ws.go +++ b/v2/sor_order_place_service_ws.go @@ -32,18 +32,17 @@ func NewSorOrderPlaceWsService(apiKey, secretKey string, opts ...websocket.WebSo if createOpts.Client != nil { service.c = createOpts.Client - return service, nil - } - - conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) - if err != nil { - return nil, err - } - client, err := websocket.NewClient(conn) - if err != nil { - return nil, err + } else { + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) + if err != nil { + return nil, err + } + client, err := websocket.NewClient(conn) + if err != nil { + return nil, err + } + service.c = client } - service.c = client return service, nil } diff --git a/v2/sor_order_test_service_ws.go b/v2/sor_order_test_service_ws.go index 681b821a2..30c014f54 100644 --- a/v2/sor_order_test_service_ws.go +++ b/v2/sor_order_test_service_ws.go @@ -32,19 +32,18 @@ func NewSorOrderTestWsService(apiKey, secretKey string, opts ...websocket.WebSoc if createOpts.Client != nil { service.c = createOpts.Client - return service, nil + } else { + conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) + if err != nil { + return nil, err + } + client, err := websocket.NewClient(conn) + if err != nil { + return nil, err + } + service.c = client } - conn, err := websocket.NewConnection(WsApiInitReadWriteConn, WebsocketKeepalive, WebsocketTimeoutReadWriteConnection) - if err != nil { - return nil, err - } - client, err := websocket.NewClient(conn) - if err != nil { - return nil, err - } - service.c = client - return service, nil } From 0cd969017c58a63127e5c3da0a4a47ca3a29e2d1 Mon Sep 17 00:00:00 2001 From: Artur Abelian Date: Sun, 19 Oct 2025 12:17:24 +0200 Subject: [PATCH 11/11] feat: dynamic ws port for testing --- v2/common/websocket/client_test.go | 37 ++++++++++++++++++++++++++---- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/v2/common/websocket/client_test.go b/v2/common/websocket/client_test.go index 1638e6a6d..79110ff5b 100644 --- a/v2/common/websocket/client_test.go +++ b/v2/common/websocket/client_test.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "log" + "net" "net/http" "testing" "time" @@ -32,19 +33,43 @@ type clientTestSuite struct { secretKey string } +// findAvailablePort finds and returns an available port on localhost +func findAvailablePort() (int, error) { + listener, err := net.Listen("tcp", "localhost:0") + if err != nil { + return 0, fmt.Errorf("failed to find available port: %w", err) + } + defer listener.Close() + + addr := listener.Addr().(*net.TCPAddr) + return addr.Port, nil +} + +// createTestWebSocketURL creates a websocket URL for the given port +func createTestWebSocketURL(port int) string { + return fmt.Sprintf("ws://localhost:%d/ws", port) +} + func TestClient(t *testing.T) { suite.Run(t, new(clientTestSuite)) } func (s *clientTestSuite) TestReadWriteSync() { + // Find an available port + port, err := findAvailablePort() + s.Require().NoError(err) + stopCh := make(chan struct{}) go func() { - startWsTestServer(stopCh) + startWsTestServer(port, stopCh) }() defer func() { stopCh <- struct{}{} }() + // Give server time to start + time.Sleep(100 * time.Millisecond) + conn, err := NewConnection(func() (*websocket.Conn, error) { Dialer := websocket.Dialer{ Proxy: http.ProxyFromEnvironment, @@ -52,7 +77,8 @@ func (s *clientTestSuite) TestReadWriteSync() { EnableCompression: false, } - c, _, err := Dialer.Dial("ws://localhost:8080/ws", nil) + wsURL := createTestWebSocketURL(port) + c, _, err := Dialer.Dial(wsURL, nil) if err != nil { return nil, err } @@ -265,13 +291,14 @@ func wsHandler(w http.ResponseWriter, r *http.Request) { } } -func startWsTestServer(stopCh chan struct{}) { +func startWsTestServer(port int, stopCh chan struct{}) { + addr := fmt.Sprintf("localhost:%d", port) server := &http.Server{ - Addr: "localhost:8080", + Addr: addr, } http.HandleFunc("/ws", wsHandler) - log.Println("WebSocket server started on :8080") + log.Printf("WebSocket server started on :%d", port) go func() { if err := server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {