From 69649007c8f94ca95a526f3409a0308cdda11df7 Mon Sep 17 00:00:00 2001 From: Theodor Rauch Date: Wed, 20 Aug 2025 08:48:54 +0200 Subject: [PATCH 1/8] fix: switch to exchangeKey made from path and remote addr instead of token.hash --- net/blockwise/blockwise.go | 107 +++++++++++++++++++++++--------- net/blockwise/blockwise_test.go | 34 ++++++++-- tcp/client/conn.go | 2 +- udp/client/conn.go | 2 +- 4 files changed, 108 insertions(+), 37 deletions(-) diff --git a/net/blockwise/blockwise.go b/net/blockwise/blockwise.go index 33bc53b5..eabec50f 100644 --- a/net/blockwise/blockwise.go +++ b/net/blockwise/blockwise.go @@ -5,7 +5,9 @@ import ( "context" "errors" "fmt" + "hash/fnv" "io" + "net" "time" "github.com/dsnet/golib/memfile" @@ -156,7 +158,7 @@ func newRequestGuard(request *pool.Message) *messageGuard { } // New provides blockwise. -// getSentRequestFromOutside must returns a copy of request which will be released after use. +// getSentRequestFromOutside must return a copy of request which will be released after use. func New[C Client]( cc C, expiration time.Duration, @@ -202,7 +204,7 @@ func payloadSizeError(err error) error { return fmt.Errorf("cannot get size of payload: %w", err) } -// Do sends an coap message and returns an coap response via blockwise transfer. +// Do sends a coap message and returns a coap response via blockwise transfer. func (b *BlockWise[C]) Do(r *pool.Message, maxSzx SZX, maxMessageSize uint32, do func(req *pool.Message) (*pool.Message, error)) (*pool.Message, error) { if maxSzx > SZXBERT { return nil, errors.New("invalid szx") @@ -347,25 +349,31 @@ func (b *BlockWise[C]) getSendingMessageCode(token uint64) (codes.Code, bool) { } // Handle middleware which constructs COAP request from blockwise transfer and send COAP response via blockwise. -func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, next func(w *responsewriter.ResponseWriter[C], r *pool.Message)) { +func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, remoteAddr net.Addr, next func(w *responsewriter.ResponseWriter[C], r *pool.Message)) { if maxSZX > SZXBERT { panic("invalid maxSZX") } - token := r.Token() - if len(token) == 0 { - err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next) - if err != nil { - b.sendEntityIncomplete(w, token) - b.errors(fmt.Errorf("handleReceivedMessage(%v): %w", r, err)) - } + exchangeKey, err := getExchangeKey(remoteAddr, r) + if err != nil { + b.errors(fmt.Errorf("cannot get exchange key from request(%v): %w", r, err)) return } - tokenStr := token.Hash() - sendingMessageCode, sendingMessageExist := b.getSendingMessageCode(tokenStr) + token := r.Token() + // + //if len(token) == 0 { + // err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next) + // if err != nil { + // b.sendEntityIncomplete(w, token) + // b.errors(fmt.Errorf("handleReceivedMessage(%v): %w", r, err)) + // } + // return + //} + + sendingMessageCode, sendingMessageExist := b.getSendingMessageCode(exchangeKey) if !sendingMessageExist || wantsToBeReceived(r) { - err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next) + err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, remoteAddr, next) if err != nil { b.sendEntityIncomplete(w, token) b.errors(fmt.Errorf("handleReceivedMessage(%v): %w", r, err)) @@ -374,17 +382,17 @@ func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Messa } more, err := b.continueSendingMessage(w, r, maxSZX, maxMessageSize, sendingMessageCode) if err != nil { - b.sendingMessagesCache.Delete(tokenStr) + b.sendingMessagesCache.Delete(exchangeKey) b.errors(fmt.Errorf("continueSendingMessage(%v): %w", r, err)) return } // For codes GET,POST,PUT,DELETE, we want them to wait for pairing response and then delete them when the full response comes in or when timeout occurs. if !more && sendingMessageCode > codes.DELETE { - b.sendingMessagesCache.Delete(tokenStr) + b.sendingMessagesCache.Delete(exchangeKey) } } -func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, next func(w *responsewriter.ResponseWriter[C], r *pool.Message)) error { +func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, remoteAddr net.Addr, next func(w *responsewriter.ResponseWriter[C], r *pool.Message)) error { startSendingMessageBlock, err := EncodeBlockOption(maxSZX, 0, true) if err != nil { return fmt.Errorf("cannot encode start sending message block option(%v,%v,%v): %w", maxSZX, 0, true, err) @@ -402,13 +410,13 @@ func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C] } case codes.POST, codes.PUT: maxSZX = fitSZX(r, message.Block1, maxSZX) - errP := b.processReceivedMessage(w, r, maxSZX, next, message.Block1, message.Size1) + errP := b.processReceivedMessage(w, r, maxSZX, next, remoteAddr, message.Block1, message.Size1) if errP != nil { return errP } default: maxSZX = fitSZX(r, message.Block2, maxSZX) - errP := b.processReceivedMessage(w, r, maxSZX, next, message.Block2, message.Size2) + errP := b.processReceivedMessage(w, r, maxSZX, next, remoteAddr, message.Block2, message.Size2) if errP != nil { return errP } @@ -566,8 +574,9 @@ func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C], return nil } -func (b *BlockWise[C]) getSentRequest(token message.Token) *pool.Message { - data, ok := b.sendingMessagesCache.LoadWithFunc(token.Hash(), func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] { +func (b *BlockWise[C]) getSentRequest(exchangeKey uint64, token message.Token) *pool.Message { + + data, ok := b.sendingMessagesCache.LoadWithFunc(exchangeKey, func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] { if value == nil { return nil } @@ -589,7 +598,7 @@ func (b *BlockWise[C]) getSentRequest(token message.Token) *pool.Message { return nil } -func (b *BlockWise[C]) handleObserveResponse(sentRequest *pool.Message) (message.Token, time.Time, error) { +func (b *BlockWise[C]) handleObserveResponse(sentRequest *pool.Message, exchangeKey uint64) (message.Token, time.Time, error) { // https://tools.ietf.org/html/rfc7959#section-2.6 - performs GET with new token. if sentRequest == nil { return nil, time.Time{}, errors.New("observation is not registered") @@ -601,7 +610,7 @@ func (b *BlockWise[C]) handleObserveResponse(sentRequest *pool.Message) (message validUntil := time.Now().Add(b.expiration) // context of observation can be expired. bwSentRequest := b.cloneMessage(sentRequest) bwSentRequest.SetToken(token) - _, loaded := b.sendingMessagesCache.LoadOrStore(token.Hash(), cache.NewElement(bwSentRequest, validUntil, nil)) + _, loaded := b.sendingMessagesCache.LoadOrStore(exchangeKey, cache.NewElement(bwSentRequest, validUntil, nil)) if loaded { return nil, time.Time{}, errors.New("cannot process message: message with token already exist") } @@ -739,7 +748,7 @@ func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Messag } //nolint:gocyclo,gocognit -func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSzx SZX, next func(w *responsewriter.ResponseWriter[C], r *pool.Message), blockType message.OptionID, sizeType message.OptionID) error { +func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSzx SZX, next func(w *responsewriter.ResponseWriter[C], r *pool.Message), remoteAddr net.Addr, blockType message.OptionID, sizeType message.OptionID) error { token := r.Token() if len(token) == 0 { next(w, r) @@ -761,7 +770,13 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C if err != nil { return fmt.Errorf("cannot decode block option: %w", err) } - sentRequest := b.getSentRequest(token) + + exchangeKey, err := getExchangeKey(remoteAddr, r) + if err != nil { + return fmt.Errorf("cannot get exchange key from request(%v): %w", r, err) + } + + sentRequest := b.getSentRequest(exchangeKey, token) if sentRequest != nil { defer b.cc.ReleaseMessage(sentRequest) } @@ -770,15 +785,14 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C return errors.New("cannot request body without paired request") } if isObserveResponse(r) { - token, validUntil, err = b.handleObserveResponse(sentRequest) + token, validUntil, err = b.handleObserveResponse(sentRequest, exchangeKey) if err != nil { return fmt.Errorf("cannot process message: %w", err) } } - tokenStr := token.Hash() var cachedReceivedMessageGuard *messageGuard - if e := b.receivingMessagesCache.Load(tokenStr); e != nil { + if e := b.receivingMessagesCache.Load(exchangeKey); e != nil { cachedReceivedMessageGuard = e.Data() } if cachedReceivedMessageGuard == nil { @@ -789,7 +803,7 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C return nil } } - cachedReceivedMessage, closeCachedReceivedMessage, err := b.getCachedReceivedMessage(cachedReceivedMessageGuard, r, tokenStr, validUntil) + cachedReceivedMessage, closeCachedReceivedMessage, err := b.getCachedReceivedMessage(cachedReceivedMessageGuard, r, exchangeKey, validUntil) if err != nil { return err } @@ -797,7 +811,7 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C defer func(err *error) { if *err != nil { - b.receivingMessagesCache.Delete(tokenStr) + b.receivingMessagesCache.Delete(exchangeKey) } }(&err) payloadFile, payloadSize, err := b.getPayloadFromCachedReceivedMessage(r, cachedReceivedMessage) @@ -811,12 +825,12 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C return fmt.Errorf("cannot copy data to payload: %w", err) } if !more { - b.receivingMessagesCache.Delete(tokenStr) + b.receivingMessagesCache.Delete(exchangeKey) cachedReceivedMessage.Remove(blockType) cachedReceivedMessage.Remove(sizeType) cachedReceivedMessage.SetType(r.Type()) if !bytes.Equal(cachedReceivedMessage.Token(), token) { - b.sendingMessagesCache.Delete(tokenStr) + b.sendingMessagesCache.Delete(exchangeKey) } _, errS := cachedReceivedMessage.Body().Seek(0, io.SeekStart) if errS != nil { @@ -849,3 +863,34 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C w.SetMessage(sendMessage) return nil } + +// getExchangeKey returns a key for the blockwise exchange cache. +// According to RFC 7252 the token is to be treated as opaque if not created by the entity and +// according to RFC 7959 there can't be concurrent blockwise transfers for the same endpoint and ressource +// so we can use the address of the endpoint and the path of the resource as hash as a key. +func getExchangeKey(addr net.Addr, r *pool.Message) (uint64, error) { + if addr == nil { + return 0, errors.New("cannot get exchange key: addr is nil") + } + if r == nil { + return 0, errors.New("cannot get exchange key: request is nil") + } + path, err := r.Path() + if err != nil { + return 0, fmt.Errorf("cannot get path from request(%v): %w", r, err) + } + h := fnv.New64a() + _, err = h.Write([]byte(path)) + if err != nil { + return 0, fmt.Errorf("cannot write path(%v) to hash: %w", path, err) + } + _, err = h.Write([]byte("|")) + if err != nil { + return 0, fmt.Errorf("cannot write | to hash: %w", err) + } + _, err = h.Write([]byte(addr.String())) + if err != nil { + return 0, fmt.Errorf("cannot write addr(%v) to hash: %w", addr, err) + } + return h.Sum64(), nil +} diff --git a/net/blockwise/blockwise_test.go b/net/blockwise/blockwise_test.go index 70d173ff..198953be 100644 --- a/net/blockwise/blockwise_test.go +++ b/net/blockwise/blockwise_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "io" + "net" "testing" "time" @@ -69,6 +70,18 @@ func (c *testClient) ReleaseMessage(m *pool.Message) { } func makeDo[C Client](t *testing.T, sender, receiver *BlockWise[C], senderMaxSZX SZX, senderMaxMessageSize uint32, receiverMaxSZX SZX, receiverMaxMessageSize uint32, next func(*responsewriter.ResponseWriter[C], *pool.Message)) func(*pool.Message) (*pool.Message, error) { + rcvAddr := net.UDPAddr{ + IP: net.IPv4(127, 0, 0, 1), + Port: 5683, + Zone: "", + } + + sndAddr := net.UDPAddr{ + IP: net.IPv4(127, 0, 0, 1), + Port: 56833, + Zone: "", + } + return func(req *pool.Message) (*pool.Message, error) { c := make(chan *pool.Message) go func() { @@ -77,10 +90,10 @@ func makeDo[C Client](t *testing.T, sender, receiver *BlockWise[C], senderMaxSZX for { var resp *pool.Message receiverResp := responsewriter.New(receiver.cc.AcquireMessage(roReq.Context()), receiver.cc) - receiver.Handle(receiverResp, roReq, senderMaxSZX, senderMaxMessageSize, next) + receiver.Handle(receiverResp, roReq, senderMaxSZX, senderMaxMessageSize, &rcvAddr, next) t.Logf("receiver.Handle - receiverResp %v senderResp.Message: %v\n", receiverResp.Message(), roReq) senderResp := responsewriter.New(sender.cc.AcquireMessage(roReq.Context()), sender.cc) - sender.Handle(senderResp, receiverResp.Message(), receiverMaxSZX, receiverMaxMessageSize, func(_ *responsewriter.ResponseWriter[C], r *pool.Message) { + sender.Handle(senderResp, receiverResp.Message(), receiverMaxSZX, receiverMaxMessageSize, &sndAddr, func(_ *responsewriter.ResponseWriter[C], r *pool.Message) { resp = r }) t.Logf("sender.Handle - senderResp %v receiverResp.Message: %v\n", senderResp.Message(), receiverResp.Message()) @@ -489,6 +502,19 @@ func TestDecodeBlockOption(t *testing.T) { } func makeWriteReq[C Client](sender, receiver *BlockWise[C], senderMaxSZX SZX, senderMaxMessageSize uint32, receiverMaxSZX SZX, receiverMaxMessageSize uint32, next func(*responsewriter.ResponseWriter[C], *pool.Message)) func(*pool.Message) error { + + rcvAddr := net.UDPAddr{ + IP: net.IPv4(127, 0, 0, 1), + Port: 5683, + Zone: "", + } + + sndAddr := net.UDPAddr{ + IP: net.IPv4(127, 0, 0, 1), + Port: 56833, + Zone: "", + } + return func(req *pool.Message) error { c := make(chan bool, 1) go func() { @@ -496,13 +522,13 @@ func makeWriteReq[C Client](sender, receiver *BlockWise[C], senderMaxSZX SZX, se roReq = req for { receiverResp := responsewriter.New(receiver.cc.AcquireMessage(roReq.Context()), receiver.cc) - receiver.Handle(receiverResp, roReq, senderMaxSZX, senderMaxMessageSize, func(w *responsewriter.ResponseWriter[C], r *pool.Message) { + receiver.Handle(receiverResp, roReq, senderMaxSZX, senderMaxMessageSize, &rcvAddr, func(w *responsewriter.ResponseWriter[C], r *pool.Message) { defer close(c) next(w, r) }) senderResp := responsewriter.New(sender.cc.AcquireMessage(roReq.Context()), sender.cc) orig := senderResp.Message() - sender.Handle(senderResp, receiverResp.Message(), receiverMaxSZX, receiverMaxMessageSize, func(*responsewriter.ResponseWriter[C], *pool.Message) { + sender.Handle(senderResp, receiverResp.Message(), receiverMaxSZX, receiverMaxMessageSize, &sndAddr, func(*responsewriter.ResponseWriter[C], *pool.Message) { }) if orig == senderResp.Message() { select { diff --git a/tcp/client/conn.go b/tcp/client/conn.go index 018ead4d..935985ae 100644 --- a/tcp/client/conn.go +++ b/tcp/client/conn.go @@ -352,7 +352,7 @@ func (cc *Conn) blockwiseHandle(w *responsewriter.ResponseWriter[*Conn], r *pool func (cc *Conn) handle(w *responsewriter.ResponseWriter[*Conn], r *pool.Message) { if cc.blockWise != nil && cc.peerBlockWiseTranferEnabled.Load() { - cc.blockWise.Handle(w, r, cc.blockwiseSZX, cc.Session().maxMessageSize, cc.blockwiseHandle) + cc.blockWise.Handle(w, r, cc.blockwiseSZX, cc.Session().maxMessageSize, cc.RemoteAddr(), cc.blockwiseHandle) return } if h, ok := cc.tokenHandlerContainer.LoadAndDelete(r.Token().Hash()); ok { diff --git a/udp/client/conn.go b/udp/client/conn.go index 6e1703c4..56ba442d 100644 --- a/udp/client/conn.go +++ b/udp/client/conn.go @@ -663,7 +663,7 @@ func (cc *Conn) handle(w *responsewriter.ResponseWriter[*Conn], m *pool.Message) return } if cc.blockWise != nil { - cc.blockWise.Handle(w, m, cc.blockwiseSZX, cc.session.MaxMessageSize(), func(rw *responsewriter.ResponseWriter[*Conn], rm *pool.Message) { + cc.blockWise.Handle(w, m, cc.blockwiseSZX, cc.session.MaxMessageSize(), cc.RemoteAddr(), func(rw *responsewriter.ResponseWriter[*Conn], rm *pool.Message) { if h, ok := cc.tokenHandlerContainer.LoadAndDelete(rm.Token().Hash()); ok { h(rw, rm) return From 1f1722324139fab4ada823e8b7b912bb18570bb3 Mon Sep 17 00:00:00 2001 From: Theodor Rauch Date: Thu, 11 Sep 2025 10:37:41 +0200 Subject: [PATCH 2/8] fix: Only use alternative exchangeKey if no token is available --- .idea/.gitignore | 8 ++++ .idea/go-coap.iml | 13 ++++++ .idea/modules.xml | 8 ++++ .idea/vcs.xml | 6 +++ examples/simple/client/main.go | 2 +- net/blockwise/blockwise.go | 73 ++++++++++++++++++++++----------- net/blockwise/blockwise_test.go | 29 ++++++++++++- tcp/client/conn.go | 2 +- udp/client/conn.go | 2 +- 9 files changed, 113 insertions(+), 30 deletions(-) create mode 100644 .idea/.gitignore create mode 100644 .idea/go-coap.iml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 00000000..13566b81 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/go-coap.iml b/.idea/go-coap.iml new file mode 100644 index 00000000..2d935577 --- /dev/null +++ b/.idea/go-coap.iml @@ -0,0 +1,13 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 00000000..da310041 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 00000000..35eb1ddf --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/examples/simple/client/main.go b/examples/simple/client/main.go index 35640800..8e6f45d4 100644 --- a/examples/simple/client/main.go +++ b/examples/simple/client/main.go @@ -10,7 +10,7 @@ import ( ) func main() { - co, err := udp.Dial("localhost:5688") + co, err := udp.Dial("localhost:5685") if err != nil { log.Fatalf("Error dialing: %v", err) } diff --git a/net/blockwise/blockwise.go b/net/blockwise/blockwise.go index eabec50f..1ad40f1e 100644 --- a/net/blockwise/blockwise.go +++ b/net/blockwise/blockwise.go @@ -205,7 +205,7 @@ func payloadSizeError(err error) error { } // Do sends a coap message and returns a coap response via blockwise transfer. -func (b *BlockWise[C]) Do(r *pool.Message, maxSzx SZX, maxMessageSize uint32, do func(req *pool.Message) (*pool.Message, error)) (*pool.Message, error) { +func (b *BlockWise[C]) Do(r *pool.Message, maxSzx SZX, maxMessageSize uint32, remoteAddr net.Addr, do func(req *pool.Message) (*pool.Message, error)) (*pool.Message, error) { if maxSzx > SZXBERT { return nil, errors.New("invalid szx") } @@ -217,11 +217,13 @@ func (b *BlockWise[C]) Do(r *pool.Message, maxSzx SZX, maxMessageSize uint32, do if !ok { expire = time.Now().Add(b.expiration) } - _, loaded := b.sendingMessagesCache.LoadOrStore(r.Token().Hash(), cache.NewElement(r, expire, nil)) + + exchangeKey := r.Token().Hash() + _, loaded := b.sendingMessagesCache.LoadOrStore(exchangeKey, cache.NewElement(r, expire, nil)) if loaded { return nil, errors.New("invalid token") } - defer b.sendingMessagesCache.Delete(r.Token().Hash()) + defer b.sendingMessagesCache.Delete(exchangeKey) if r.Body() == nil { return do(r) } @@ -291,7 +293,7 @@ func (b *BlockWise[C]) WriteMessage(request *pool.Message, maxSZX SZX, maxMessag } w := newWriteRequestResponse(b.cc, request) - err = b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock) + err = b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock, 0) if err != nil { return fmt.Errorf("cannot start writing request: %w", err) } @@ -354,10 +356,23 @@ func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Messa panic("invalid maxSZX") } - exchangeKey, err := getExchangeKey(remoteAddr, r) - if err != nil { - b.errors(fmt.Errorf("cannot get exchange key from request(%v): %w", r, err)) - return + path, _ := r.Path() + if len(path) == 0 { + path = "/" + print(path) + } + + var exchangeKey uint64 + if len(r.Token()) != 0 { + exchangeKey = r.Token().Hash() + } else { + var err error + exchangeKey, err = getExchangeKey(remoteAddr, r) + if err != nil { + b.errors(fmt.Errorf("cannot get exchange key from request Parameters(%v): %w", r, err)) + return + } + } token := r.Token() @@ -373,14 +388,14 @@ func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Messa sendingMessageCode, sendingMessageExist := b.getSendingMessageCode(exchangeKey) if !sendingMessageExist || wantsToBeReceived(r) { - err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, remoteAddr, next) + err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, exchangeKey, next) if err != nil { b.sendEntityIncomplete(w, token) b.errors(fmt.Errorf("handleReceivedMessage(%v): %w", r, err)) } return } - more, err := b.continueSendingMessage(w, r, maxSZX, maxMessageSize, sendingMessageCode) + more, err := b.continueSendingMessage(w, r, maxSZX, maxMessageSize, sendingMessageCode, exchangeKey) if err != nil { b.sendingMessagesCache.Delete(exchangeKey) b.errors(fmt.Errorf("continueSendingMessage(%v): %w", r, err)) @@ -392,7 +407,7 @@ func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Messa } } -func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, remoteAddr net.Addr, next func(w *responsewriter.ResponseWriter[C], r *pool.Message)) error { +func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, exchangeKey uint64, next func(w *responsewriter.ResponseWriter[C], r *pool.Message)) error { startSendingMessageBlock, err := EncodeBlockOption(maxSZX, 0, true) if err != nil { return fmt.Errorf("cannot encode start sending message block option(%v,%v,%v): %w", maxSZX, 0, true, err) @@ -410,18 +425,18 @@ func (b *BlockWise[C]) handleReceivedMessage(w *responsewriter.ResponseWriter[C] } case codes.POST, codes.PUT: maxSZX = fitSZX(r, message.Block1, maxSZX) - errP := b.processReceivedMessage(w, r, maxSZX, next, remoteAddr, message.Block1, message.Size1) + errP := b.processReceivedMessage(w, r, maxSZX, next, exchangeKey, message.Block1, message.Size1) if errP != nil { return errP } default: maxSZX = fitSZX(r, message.Block2, maxSZX) - errP := b.processReceivedMessage(w, r, maxSZX, next, remoteAddr, message.Block2, message.Size2) + errP := b.processReceivedMessage(w, r, maxSZX, next, exchangeKey, message.Block2, message.Size2) if errP != nil { return errP } } - return b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock) + return b.startSendingMessage(w, maxSZX, maxMessageSize, startSendingMessageBlock, exchangeKey) } func (b *BlockWise[C]) createSendingMessage(sendingMessage *pool.Message, maxSZX SZX, maxMessageSize uint32, block uint32) (sendMessage *pool.Message, more bool, err error) { @@ -505,7 +520,7 @@ func (b *BlockWise[C]) createSendingMessage(sendingMessage *pool.Message, maxSZX return sendMessage, more, nil } -func (b *BlockWise[C]) continueSendingMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, sendingMessageCode codes.Code /* msg *pool.Message*/) (bool, error) { +func (b *BlockWise[C]) continueSendingMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSZX SZX, maxMessageSize uint32, sendingMessageCode codes.Code /* msg *pool.Message*/, exchangeKey uint64) (bool, error) { blockType := message.Block2 switch sendingMessageCode { case codes.POST, codes.PUT: @@ -518,7 +533,7 @@ func (b *BlockWise[C]) continueSendingMessage(w *responsewriter.ResponseWriter[C } var sendMessage *pool.Message var more bool - b.sendingMessagesCache.LoadWithFunc(r.Token().Hash(), func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] { + b.sendingMessagesCache.LoadWithFunc(exchangeKey, func(value *cache.Element[*pool.Message]) *cache.Element[*pool.Message] { sendMessage, more, err = b.createSendingMessage(value.Data(), maxSZX, maxMessageSize, block) if err != nil { err = fmt.Errorf("cannot create sending message: %w", err) @@ -543,7 +558,7 @@ func isObserveResponse(msg *pool.Message) bool { return msg.Code() >= codes.Created } -func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C], maxSZX SZX, maxMessageSize uint32, block uint32) error { +func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C], maxSZX SZX, maxMessageSize uint32, block uint32, exchangeKey uint64) error { payloadSize, err := w.Message().BodySize() if err != nil { return payloadSizeError(err) @@ -566,7 +581,12 @@ func (b *BlockWise[C]) startSendingMessage(w *responsewriter.ResponseWriter[C], if !ok { expire = time.Now().Add(b.expiration) } - el, loaded := b.sendingMessagesCache.LoadOrStore(sendingMessage.Token().Hash(), cache.NewElement(originalSendingMessage, expire, nil)) + + if exchangeKey == 0 { + exchangeKey = sendingMessage.Token().Hash() + } + + el, loaded := b.sendingMessagesCache.LoadOrStore(exchangeKey, cache.NewElement(originalSendingMessage, expire, nil)) if loaded { defer b.cc.ReleaseMessage(originalSendingMessage) return fmt.Errorf("cannot add message (%v) to sending message cache: message(%v) with token(%v) already exist", originalSendingMessage, el.Data(), sendingMessage.Token()) @@ -748,7 +768,7 @@ func (b *BlockWise[C]) getCachedReceivedMessage(mg *messageGuard, r *pool.Messag } //nolint:gocyclo,gocognit -func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSzx SZX, next func(w *responsewriter.ResponseWriter[C], r *pool.Message), remoteAddr net.Addr, blockType message.OptionID, sizeType message.OptionID) error { +func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C], r *pool.Message, maxSzx SZX, next func(w *responsewriter.ResponseWriter[C], r *pool.Message), exchangeKey uint64, blockType message.OptionID, sizeType message.OptionID) error { token := r.Token() if len(token) == 0 { next(w, r) @@ -771,11 +791,6 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C return fmt.Errorf("cannot decode block option: %w", err) } - exchangeKey, err := getExchangeKey(remoteAddr, r) - if err != nil { - return fmt.Errorf("cannot get exchange key from request(%v): %w", r, err) - } - sentRequest := b.getSentRequest(exchangeKey, token) if sentRequest != nil { defer b.cc.ReleaseMessage(sentRequest) @@ -866,9 +881,11 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C // getExchangeKey returns a key for the blockwise exchange cache. // According to RFC 7252 the token is to be treated as opaque if not created by the entity and -// according to RFC 7959 there can't be concurrent blockwise transfers for the same endpoint and ressource +// according to RFC 7959 there can't be concurrent blockwise transfers for the same endpoint and resource // so we can use the address of the endpoint and the path of the resource as hash as a key. func getExchangeKey(addr net.Addr, r *pool.Message) (uint64, error) { + //TODO: Possibly seperate hash domains of token hashes and hashes from path, and address, to avoid systematic overlaps. + if addr == nil { return 0, errors.New("cannot get exchange key: addr is nil") } @@ -879,7 +896,13 @@ func getExchangeKey(addr net.Addr, r *pool.Message) (uint64, error) { if err != nil { return 0, fmt.Errorf("cannot get path from request(%v): %w", r, err) } + code := r.Code() + h := fnv.New64a() + _, err = h.Write([]byte(fmt.Sprintf("%d|", code))) + if err != nil { + return 0, fmt.Errorf("cannot write code(%v) to hash: %w", code, err) + } _, err = h.Write([]byte(path)) if err != nil { return 0, fmt.Errorf("cannot write path(%v) to hash: %w", path, err) diff --git a/net/blockwise/blockwise_test.go b/net/blockwise/blockwise_test.go index 198953be..01a7dd22 100644 --- a/net/blockwise/blockwise_test.go +++ b/net/blockwise/blockwise_test.go @@ -24,6 +24,7 @@ type testmessage struct { options message.Options payload io.ReadSeeker sequence uint64 + path string } func toPoolMessage(m *testmessage) *pool.Message { @@ -33,6 +34,7 @@ func toPoolMessage(m *testmessage) *pool.Message { msg.ResetOptionsTo(m.options) msg.SetBody(m.payload) msg.SetSequence(m.sequence) + msg.SetPath(m.path) return msg } @@ -41,6 +43,9 @@ func fromPoolMessage(m *pool.Message) *testmessage { if len(opts) == 0 { opts = nil } + + path, _ := m.Path() + return &testmessage{ code: m.Code(), ctx: m.Context(), @@ -48,6 +53,7 @@ func fromPoolMessage(m *pool.Message) *testmessage { options: opts, payload: m.Body(), sequence: m.Sequence(), + path: path, } } @@ -112,6 +118,13 @@ func makeDo[C Client](t *testing.T, sender, receiver *BlockWise[C], senderMaxSZX func TestBlockWiseDo(t *testing.T) { sender := New(newTestClient(), time.Second*3600, func(err error) { t.Log(err) }, nil) receiver := New(newTestClient(), time.Second*3600, func(err error) { t.Log(err) }, nil) + + rcvAddr := net.UDPAddr{ + IP: net.IPv4(127, 0, 0, 1), + Port: 5683, + Zone: "", + } + type args struct { r *testmessage szx SZX @@ -312,7 +325,7 @@ func TestBlockWiseDo(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := sender.Do(toPoolMessage(tt.args.r), tt.args.szx, uint32(tt.args.maxMessageSize), tt.args.do) + got, err := sender.Do(toPoolMessage(tt.args.r), tt.args.szx, uint32(tt.args.maxMessageSize), &rcvAddr, tt.args.do) if tt.wantErr { require.Error(t, err) return @@ -552,6 +565,18 @@ func makeWriteReq[C Client](sender, receiver *BlockWise[C], senderMaxSZX SZX, se } func TestBlockWiseWriteTestMessage(t *testing.T) { + rcvAddr := net.UDPAddr{ + IP: net.IPv4(127, 0, 0, 1), + Port: 5683, + Zone: "", + } + + //sndAddr := net.UDPAddr{ + // IP: net.IPv4(127, 0, 0, 1), + // Port: 56833, + // Zone: "", + //} + sender := New(newTestClient(), time.Second*3600, func(err error) { t.Log(err) }, nil) receiver := New(newTestClient(), time.Second*3600, func(err error) { t.Log(err) }, nil) type args struct { @@ -632,7 +657,7 @@ func TestBlockWiseWriteTestMessage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := sender.WriteMessage(toPoolMessage(tt.args.r), tt.args.szx, uint32(tt.args.maxMessageSize), tt.args.writetestmessage) + err := sender.WriteMessage(toPoolMessage(tt.args.r), tt.args.szx, uint32(tt.args.maxMessageSize), &rcvAddr, tt.args.writetestmessage) if tt.wantErr { require.Error(t, err) return diff --git a/tcp/client/conn.go b/tcp/client/conn.go index 935985ae..563f48d0 100644 --- a/tcp/client/conn.go +++ b/tcp/client/conn.go @@ -207,7 +207,7 @@ func (cc *Conn) do(req *pool.Message) (*pool.Message, error) { if !cc.peerBlockWiseTranferEnabled.Load() || cc.blockWise == nil { return cc.doInternal(req) } - resp, err := cc.blockWise.Do(req, cc.blockwiseSZX, cc.session.maxMessageSize, cc.doInternal) + resp, err := cc.blockWise.Do(req, cc.blockwiseSZX, cc.session.maxMessageSize, cc.RemoteAddr(), cc.doInternal) if err != nil { return nil, err } diff --git a/udp/client/conn.go b/udp/client/conn.go index 56ba442d..4d1d0dab 100644 --- a/udp/client/conn.go +++ b/udp/client/conn.go @@ -440,7 +440,7 @@ func (cc *Conn) do(req *pool.Message) (*pool.Message, error) { if cc.blockWise == nil { return cc.doInternal(req) } - resp, err := cc.blockWise.Do(req, cc.blockwiseSZX, cc.session.MaxMessageSize(), func(bwReq *pool.Message) (*pool.Message, error) { + resp, err := cc.blockWise.Do(req, cc.blockwiseSZX, cc.session.MaxMessageSize(), cc.RemoteAddr(), func(bwReq *pool.Message) (*pool.Message, error) { if bwReq.Options().HasOption(message.Block1) || bwReq.Options().HasOption(message.Block2) { bwReq.SetMessageID(cc.GetMessageID()) } From 74b9347e4d13a899151d5726f61c3b6b0028c84c Mon Sep 17 00:00:00 2001 From: Theodor Rauch Date: Thu, 11 Sep 2025 13:21:34 +0200 Subject: [PATCH 3/8] revert blockwise_test to master as the changes were based on guessing. unit-tests will be implemented later --- net/blockwise/blockwise_test.go | 63 ++++----------------------------- 1 file changed, 6 insertions(+), 57 deletions(-) diff --git a/net/blockwise/blockwise_test.go b/net/blockwise/blockwise_test.go index 01a7dd22..70d173ff 100644 --- a/net/blockwise/blockwise_test.go +++ b/net/blockwise/blockwise_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "io" - "net" "testing" "time" @@ -24,7 +23,6 @@ type testmessage struct { options message.Options payload io.ReadSeeker sequence uint64 - path string } func toPoolMessage(m *testmessage) *pool.Message { @@ -34,7 +32,6 @@ func toPoolMessage(m *testmessage) *pool.Message { msg.ResetOptionsTo(m.options) msg.SetBody(m.payload) msg.SetSequence(m.sequence) - msg.SetPath(m.path) return msg } @@ -43,9 +40,6 @@ func fromPoolMessage(m *pool.Message) *testmessage { if len(opts) == 0 { opts = nil } - - path, _ := m.Path() - return &testmessage{ code: m.Code(), ctx: m.Context(), @@ -53,7 +47,6 @@ func fromPoolMessage(m *pool.Message) *testmessage { options: opts, payload: m.Body(), sequence: m.Sequence(), - path: path, } } @@ -76,18 +69,6 @@ func (c *testClient) ReleaseMessage(m *pool.Message) { } func makeDo[C Client](t *testing.T, sender, receiver *BlockWise[C], senderMaxSZX SZX, senderMaxMessageSize uint32, receiverMaxSZX SZX, receiverMaxMessageSize uint32, next func(*responsewriter.ResponseWriter[C], *pool.Message)) func(*pool.Message) (*pool.Message, error) { - rcvAddr := net.UDPAddr{ - IP: net.IPv4(127, 0, 0, 1), - Port: 5683, - Zone: "", - } - - sndAddr := net.UDPAddr{ - IP: net.IPv4(127, 0, 0, 1), - Port: 56833, - Zone: "", - } - return func(req *pool.Message) (*pool.Message, error) { c := make(chan *pool.Message) go func() { @@ -96,10 +77,10 @@ func makeDo[C Client](t *testing.T, sender, receiver *BlockWise[C], senderMaxSZX for { var resp *pool.Message receiverResp := responsewriter.New(receiver.cc.AcquireMessage(roReq.Context()), receiver.cc) - receiver.Handle(receiverResp, roReq, senderMaxSZX, senderMaxMessageSize, &rcvAddr, next) + receiver.Handle(receiverResp, roReq, senderMaxSZX, senderMaxMessageSize, next) t.Logf("receiver.Handle - receiverResp %v senderResp.Message: %v\n", receiverResp.Message(), roReq) senderResp := responsewriter.New(sender.cc.AcquireMessage(roReq.Context()), sender.cc) - sender.Handle(senderResp, receiverResp.Message(), receiverMaxSZX, receiverMaxMessageSize, &sndAddr, func(_ *responsewriter.ResponseWriter[C], r *pool.Message) { + sender.Handle(senderResp, receiverResp.Message(), receiverMaxSZX, receiverMaxMessageSize, func(_ *responsewriter.ResponseWriter[C], r *pool.Message) { resp = r }) t.Logf("sender.Handle - senderResp %v receiverResp.Message: %v\n", senderResp.Message(), receiverResp.Message()) @@ -118,13 +99,6 @@ func makeDo[C Client](t *testing.T, sender, receiver *BlockWise[C], senderMaxSZX func TestBlockWiseDo(t *testing.T) { sender := New(newTestClient(), time.Second*3600, func(err error) { t.Log(err) }, nil) receiver := New(newTestClient(), time.Second*3600, func(err error) { t.Log(err) }, nil) - - rcvAddr := net.UDPAddr{ - IP: net.IPv4(127, 0, 0, 1), - Port: 5683, - Zone: "", - } - type args struct { r *testmessage szx SZX @@ -325,7 +299,7 @@ func TestBlockWiseDo(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := sender.Do(toPoolMessage(tt.args.r), tt.args.szx, uint32(tt.args.maxMessageSize), &rcvAddr, tt.args.do) + got, err := sender.Do(toPoolMessage(tt.args.r), tt.args.szx, uint32(tt.args.maxMessageSize), tt.args.do) if tt.wantErr { require.Error(t, err) return @@ -515,19 +489,6 @@ func TestDecodeBlockOption(t *testing.T) { } func makeWriteReq[C Client](sender, receiver *BlockWise[C], senderMaxSZX SZX, senderMaxMessageSize uint32, receiverMaxSZX SZX, receiverMaxMessageSize uint32, next func(*responsewriter.ResponseWriter[C], *pool.Message)) func(*pool.Message) error { - - rcvAddr := net.UDPAddr{ - IP: net.IPv4(127, 0, 0, 1), - Port: 5683, - Zone: "", - } - - sndAddr := net.UDPAddr{ - IP: net.IPv4(127, 0, 0, 1), - Port: 56833, - Zone: "", - } - return func(req *pool.Message) error { c := make(chan bool, 1) go func() { @@ -535,13 +496,13 @@ func makeWriteReq[C Client](sender, receiver *BlockWise[C], senderMaxSZX SZX, se roReq = req for { receiverResp := responsewriter.New(receiver.cc.AcquireMessage(roReq.Context()), receiver.cc) - receiver.Handle(receiverResp, roReq, senderMaxSZX, senderMaxMessageSize, &rcvAddr, func(w *responsewriter.ResponseWriter[C], r *pool.Message) { + receiver.Handle(receiverResp, roReq, senderMaxSZX, senderMaxMessageSize, func(w *responsewriter.ResponseWriter[C], r *pool.Message) { defer close(c) next(w, r) }) senderResp := responsewriter.New(sender.cc.AcquireMessage(roReq.Context()), sender.cc) orig := senderResp.Message() - sender.Handle(senderResp, receiverResp.Message(), receiverMaxSZX, receiverMaxMessageSize, &sndAddr, func(*responsewriter.ResponseWriter[C], *pool.Message) { + sender.Handle(senderResp, receiverResp.Message(), receiverMaxSZX, receiverMaxMessageSize, func(*responsewriter.ResponseWriter[C], *pool.Message) { }) if orig == senderResp.Message() { select { @@ -565,18 +526,6 @@ func makeWriteReq[C Client](sender, receiver *BlockWise[C], senderMaxSZX SZX, se } func TestBlockWiseWriteTestMessage(t *testing.T) { - rcvAddr := net.UDPAddr{ - IP: net.IPv4(127, 0, 0, 1), - Port: 5683, - Zone: "", - } - - //sndAddr := net.UDPAddr{ - // IP: net.IPv4(127, 0, 0, 1), - // Port: 56833, - // Zone: "", - //} - sender := New(newTestClient(), time.Second*3600, func(err error) { t.Log(err) }, nil) receiver := New(newTestClient(), time.Second*3600, func(err error) { t.Log(err) }, nil) type args struct { @@ -657,7 +606,7 @@ func TestBlockWiseWriteTestMessage(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := sender.WriteMessage(toPoolMessage(tt.args.r), tt.args.szx, uint32(tt.args.maxMessageSize), &rcvAddr, tt.args.writetestmessage) + err := sender.WriteMessage(toPoolMessage(tt.args.r), tt.args.szx, uint32(tt.args.maxMessageSize), tt.args.writetestmessage) if tt.wantErr { require.Error(t, err) return From 48c278fadb21ce1be6517fb7484137b548a1e855 Mon Sep 17 00:00:00 2001 From: Theodor Rauch Date: Thu, 11 Sep 2025 14:12:18 +0200 Subject: [PATCH 4/8] fix: remove .idea folder from git tracking --- .gitignore | 2 +- .idea/.gitignore | 8 -------- .idea/go-coap.iml | 13 ------------- .idea/modules.xml | 8 -------- .idea/vcs.xml | 6 ------ 5 files changed, 1 insertion(+), 36 deletions(-) delete mode 100644 .idea/.gitignore delete mode 100644 .idea/go-coap.iml delete mode 100644 .idea/modules.xml delete mode 100644 .idea/vcs.xml diff --git a/.gitignore b/.gitignore index 7c18b313..16611204 100644 --- a/.gitignore +++ b/.gitignore @@ -11,7 +11,7 @@ client !client/ vendor/ v3/ - +/.idea/ # Test binary, build with `go test -c` *.test diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 13566b81..00000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,8 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml -# Editor-based HTTP Client requests -/httpRequests/ -# Datasource local storage ignored files -/dataSources/ -/dataSources.local.xml diff --git a/.idea/go-coap.iml b/.idea/go-coap.iml deleted file mode 100644 index 2d935577..00000000 --- a/.idea/go-coap.iml +++ /dev/null @@ -1,13 +0,0 @@ - - - - - - - - - - - - \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml deleted file mode 100644 index da310041..00000000 --- a/.idea/modules.xml +++ /dev/null @@ -1,8 +0,0 @@ - - - - - - - - \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml deleted file mode 100644 index 35eb1ddf..00000000 --- a/.idea/vcs.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file From a4ccff68c0f988ab4d64e46059169f46c6085a23 Mon Sep 17 00:00:00 2001 From: Theodor Rauch Date: Thu, 11 Sep 2025 14:15:46 +0200 Subject: [PATCH 5/8] fix: remove debug statement --- net/blockwise/blockwise.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/net/blockwise/blockwise.go b/net/blockwise/blockwise.go index 1ad40f1e..642370f5 100644 --- a/net/blockwise/blockwise.go +++ b/net/blockwise/blockwise.go @@ -356,12 +356,6 @@ func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Messa panic("invalid maxSZX") } - path, _ := r.Path() - if len(path) == 0 { - path = "/" - print(path) - } - var exchangeKey uint64 if len(r.Token()) != 0 { exchangeKey = r.Token().Hash() From 98d33e72ddc14e59ea38ba2e94f1d21731410da5 Mon Sep 17 00:00:00 2001 From: Theodor Rauch Date: Thu, 11 Sep 2025 14:16:24 +0200 Subject: [PATCH 6/8] fix: remove comment --- net/blockwise/blockwise.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/net/blockwise/blockwise.go b/net/blockwise/blockwise.go index 642370f5..836472ec 100644 --- a/net/blockwise/blockwise.go +++ b/net/blockwise/blockwise.go @@ -370,15 +370,6 @@ func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Messa } token := r.Token() - // - //if len(token) == 0 { - // err := b.handleReceivedMessage(w, r, maxSZX, maxMessageSize, next) - // if err != nil { - // b.sendEntityIncomplete(w, token) - // b.errors(fmt.Errorf("handleReceivedMessage(%v): %w", r, err)) - // } - // return - //} sendingMessageCode, sendingMessageExist := b.getSendingMessageCode(exchangeKey) if !sendingMessageExist || wantsToBeReceived(r) { From 8a3f4a07319464d2780e879321a5a29f34d4903f Mon Sep 17 00:00:00 2001 From: Theodor Rauch Date: Thu, 11 Sep 2025 14:18:27 +0200 Subject: [PATCH 7/8] fix: more efficient hash generation --- net/blockwise/blockwise.go | 20 ++++---------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/net/blockwise/blockwise.go b/net/blockwise/blockwise.go index 836472ec..de09138e 100644 --- a/net/blockwise/blockwise.go +++ b/net/blockwise/blockwise.go @@ -884,21 +884,9 @@ func getExchangeKey(addr net.Addr, r *pool.Message) (uint64, error) { code := r.Code() h := fnv.New64a() - _, err = h.Write([]byte(fmt.Sprintf("%d|", code))) - if err != nil { - return 0, fmt.Errorf("cannot write code(%v) to hash: %w", code, err) - } - _, err = h.Write([]byte(path)) - if err != nil { - return 0, fmt.Errorf("cannot write path(%v) to hash: %w", path, err) - } - _, err = h.Write([]byte("|")) - if err != nil { - return 0, fmt.Errorf("cannot write | to hash: %w", err) - } - _, err = h.Write([]byte(addr.String())) - if err != nil { - return 0, fmt.Errorf("cannot write addr(%v) to hash: %w", addr, err) - } + h.Write([]byte{byte(code >> 8), byte(code), '|'}) + h.Write([]byte(path)) + h.Write([]byte{'|'}) + h.Write([]byte(addr.String())) return h.Sum64(), nil } From b4eb8a24cfa1e5f3bdff061b7d8c22986d9f4899 Mon Sep 17 00:00:00 2001 From: Theodor Rauch Date: Mon, 15 Sep 2025 13:12:10 +0200 Subject: [PATCH 8/8] fix: only one request per endpoint (peer) --- net/blockwise/blockwise.go | 21 ++++----------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/net/blockwise/blockwise.go b/net/blockwise/blockwise.go index de09138e..223ddf96 100644 --- a/net/blockwise/blockwise.go +++ b/net/blockwise/blockwise.go @@ -361,7 +361,7 @@ func (b *BlockWise[C]) Handle(w *responsewriter.ResponseWriter[C], r *pool.Messa exchangeKey = r.Token().Hash() } else { var err error - exchangeKey, err = getExchangeKey(remoteAddr, r) + exchangeKey, err = getExchangeKey(remoteAddr) if err != nil { b.errors(fmt.Errorf("cannot get exchange key from request Parameters(%v): %w", r, err)) return @@ -865,28 +865,15 @@ func (b *BlockWise[C]) processReceivedMessage(w *responsewriter.ResponseWriter[C } // getExchangeKey returns a key for the blockwise exchange cache. -// According to RFC 7252 the token is to be treated as opaque if not created by the entity and -// according to RFC 7959 there can't be concurrent blockwise transfers for the same endpoint and resource -// so we can use the address of the endpoint and the path of the resource as hash as a key. -func getExchangeKey(addr net.Addr, r *pool.Message) (uint64, error) { - //TODO: Possibly seperate hash domains of token hashes and hashes from path, and address, to avoid systematic overlaps. +// According to RFC 7252: "[...] An empty token value is appropriate e.g., when no other tokens are in use to a destination" +// so we need to generate a key from the remote address of the corresponding endpoint. +func getExchangeKey(addr net.Addr) (uint64, error) { if addr == nil { return 0, errors.New("cannot get exchange key: addr is nil") } - if r == nil { - return 0, errors.New("cannot get exchange key: request is nil") - } - path, err := r.Path() - if err != nil { - return 0, fmt.Errorf("cannot get path from request(%v): %w", r, err) - } - code := r.Code() h := fnv.New64a() - h.Write([]byte{byte(code >> 8), byte(code), '|'}) - h.Write([]byte(path)) - h.Write([]byte{'|'}) h.Write([]byte(addr.String())) return h.Sum64(), nil }