From 07ce43b97109f5ae583a6f661b47e592a0324b34 Mon Sep 17 00:00:00 2001 From: Robert Fritzsche Date: Thu, 14 Sep 2017 19:47:47 +0200 Subject: [PATCH 1/4] feat(tcpclient): solar edge PV inverter almost always won't respond to the first ACK request, but connecting request come through --- tcpclient.go | 57 +++++++++++++++++++++++++++++++--------------------- 1 file changed, 34 insertions(+), 23 deletions(-) diff --git a/tcpclient.go b/tcpclient.go index 4e53c73..08d56aa 100644 --- a/tcpclient.go +++ b/tcpclient.go @@ -134,6 +134,8 @@ type tcpTransporter struct { Timeout time.Duration // Idle timeout to close the connection IdleTimeout time.Duration + // Recovery timeout if tcp communication misbehaves + LinkRecoveryTimeout time.Duration // Transmission logger Logger *log.Logger @@ -149,30 +151,39 @@ func (mb *tcpTransporter) Send(aduRequest []byte) (aduResponse []byte, err error mb.mu.Lock() defer mb.mu.Unlock() - // Establish a new connection if not connected - if err = mb.connect(); err != nil { - return - } - // Set timer to close when idle - mb.lastActivity = time.Now() - mb.startCloseTimer() - // Set write and read timeout - var timeout time.Time - if mb.Timeout > 0 { - timeout = mb.lastActivity.Add(mb.Timeout) - } - if err = mb.conn.SetDeadline(timeout); err != nil { - return - } - // Send data - mb.logf("modbus: sending % x", aduRequest) - if _, err = mb.conn.Write(aduRequest); err != nil { - return - } - // Read header first var data [tcpMaxLength]byte - if _, err = io.ReadFull(mb.conn, data[:tcpHeaderSize]); err != nil { - return + recoveryStart := time.Now() + for { + // Establish a new connection if not connected + if err = mb.connect(); err != nil { + return + } + // Set timer to close when idle + mb.lastActivity = time.Now() + mb.startCloseTimer() + // Set write and read timeout + var timeout time.Time + if mb.Timeout > 0 { + timeout = mb.lastActivity.Add(mb.Timeout) + } + if err = mb.conn.SetDeadline(timeout); err != nil { + return + } + // Send data + mb.logf("modbus: sending % x", aduRequest) + if _, err = mb.conn.Write(aduRequest); err != nil { + return + } + // Read header first + if _, err = io.ReadFull(mb.conn, data[:tcpHeaderSize]); err == nil { + break + // Read attempt failed + } else if err != io.EOF || mb.LinkRecoveryTimeout == 0 || time.Now().Sub(recoveryStart) > mb.IdleTimeout { + return + } + + mb.close() + time.Sleep(mb.LinkRecoveryTimeout) } // Read length, ignore transaction & protocol id (4 bytes) length := int(binary.BigEndian.Uint16(data[4:])) From 64f713f8c20c2df1e0e2d67739a11a94986561df Mon Sep 17 00:00:00 2001 From: Robert Fritzsche Date: Sat, 16 Sep 2017 10:06:06 +0200 Subject: [PATCH 2/4] feat(tcpclient): add also partial reads (incomplete tcp headers) to the retry reason, further add some logging --- tcpclient.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tcpclient.go b/tcpclient.go index 08d56aa..3ddf67c 100644 --- a/tcpclient.go +++ b/tcpclient.go @@ -178,9 +178,11 @@ func (mb *tcpTransporter) Send(aduRequest []byte) (aduResponse []byte, err error if _, err = io.ReadFull(mb.conn, data[:tcpHeaderSize]); err == nil { break // Read attempt failed - } else if err != io.EOF || mb.LinkRecoveryTimeout == 0 || time.Now().Sub(recoveryStart) > mb.IdleTimeout { + } else if (err != io.EOF && err != io.ErrUnexpectedEOF) || + mb.LinkRecoveryTimeout == 0 || time.Now().Sub(recoveryStart) > mb.IdleTimeout { return } + mb.logf("modbus: close connection and retry, because of %v", err) mb.close() time.Sleep(mb.LinkRecoveryTimeout) From 06b8e527a02e5868cf070b6dc2fc5ab645abdd63 Mon Sep 17 00:00:00 2001 From: Robert Fritzsche Date: Wed, 20 Sep 2017 08:25:33 +0200 Subject: [PATCH 3/4] feat(tcpclient): move protocol recovery into the lib --- tcpclient.go | 88 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 54 insertions(+), 34 deletions(-) diff --git a/tcpclient.go b/tcpclient.go index 3ddf67c..8b78c9a 100644 --- a/tcpclient.go +++ b/tcpclient.go @@ -83,27 +83,8 @@ func (mb *tcpPackager) Encode(pdu *ProtocolDataUnit) (adu []byte, err error) { } // Verify confirms transaction, protocol and unit id. -func (mb *tcpPackager) Verify(aduRequest []byte, aduResponse []byte) (err error) { - // Transaction id - responseVal := binary.BigEndian.Uint16(aduResponse) - requestVal := binary.BigEndian.Uint16(aduRequest) - if responseVal != requestVal { - err = fmt.Errorf("modbus: response transaction id '%v' does not match request '%v'", responseVal, requestVal) - return - } - // Protocol id - responseVal = binary.BigEndian.Uint16(aduResponse[2:]) - requestVal = binary.BigEndian.Uint16(aduRequest[2:]) - if responseVal != requestVal { - err = fmt.Errorf("modbus: response protocol id '%v' does not match request '%v'", responseVal, requestVal) - return - } - // Unit id (1 byte) - if aduResponse[6] != aduRequest[6] { - err = fmt.Errorf("modbus: response unit id '%v' does not match request '%v'", aduResponse[6], aduRequest[6]) - return - } - return +func (mb *tcpPackager) Verify(aduRequest []byte, aduResponse []byte) error { + return verify(aduRequest, aduResponse) } // Decode extracts PDU from TCP frame: @@ -136,6 +117,8 @@ type tcpTransporter struct { IdleTimeout time.Duration // Recovery timeout if tcp communication misbehaves LinkRecoveryTimeout time.Duration + // Recovery timeout if the protocol is malformed, e.g. wrong transaction ID + ProtocolRecoveryTimeout time.Duration // Transmission logger Logger *log.Logger @@ -152,12 +135,13 @@ func (mb *tcpTransporter) Send(aduRequest []byte) (aduResponse []byte, err error defer mb.mu.Unlock() var data [tcpMaxLength]byte - recoveryStart := time.Now() + recoveryDeadline := time.Now().Add(mb.IdleTimeout) + + // Establish a new connection if not connected + if err = mb.connect(); err != nil { + return + } for { - // Establish a new connection if not connected - if err = mb.connect(); err != nil { - return - } // Set timer to close when idle mb.lastActivity = time.Now() mb.startCloseTimer() @@ -176,36 +160,72 @@ func (mb *tcpTransporter) Send(aduRequest []byte) (aduResponse []byte, err error } // Read header first if _, err = io.ReadFull(mb.conn, data[:tcpHeaderSize]); err == nil { - break + aduResponse, err = mb.processResponse(data[:]) + if err == nil && mb.ProtocolRecoveryTimeout > 0 && time.Until(recoveryDeadline) > 0 && + verify(aduRequest, aduResponse) != nil { + continue + } + mb.logf("modbus: received % x\n", aduResponse) + return // Read attempt failed } else if (err != io.EOF && err != io.ErrUnexpectedEOF) || - mb.LinkRecoveryTimeout == 0 || time.Now().Sub(recoveryStart) > mb.IdleTimeout { + mb.LinkRecoveryTimeout == 0 || time.Until(recoveryDeadline) < 0 { return } mb.logf("modbus: close connection and retry, because of %v", err) mb.close() time.Sleep(mb.LinkRecoveryTimeout) + + // Establish a new connection if not connected + if err = mb.connect(); err != nil { + return + } } +} + +func (mb *tcpTransporter) processResponse(aduRequest []byte) (aduResponse []byte, err error) { // Read length, ignore transaction & protocol id (4 bytes) - length := int(binary.BigEndian.Uint16(data[4:])) + length := int(binary.BigEndian.Uint16(aduRequest[4:])) if length <= 0 { - mb.flush(data[:]) + mb.flush(aduRequest[:]) err = fmt.Errorf("modbus: length in response header '%v' must not be zero", length) return } if length > (tcpMaxLength - (tcpHeaderSize - 1)) { - mb.flush(data[:]) + mb.flush(aduRequest[:]) err = fmt.Errorf("modbus: length in response header '%v' must not greater than '%v'", length, tcpMaxLength-tcpHeaderSize+1) return } // Skip unit id length += tcpHeaderSize - 1 - if _, err = io.ReadFull(mb.conn, data[tcpHeaderSize:length]); err != nil { + if _, err = io.ReadFull(mb.conn, aduRequest[tcpHeaderSize:length]); err != nil { + return + } + aduResponse = aduRequest[:length] + return +} + +func verify(aduRequest []byte, aduResponse []byte) (err error) { + // Transaction id + responseVal := binary.BigEndian.Uint16(aduResponse) + requestVal := binary.BigEndian.Uint16(aduRequest) + if responseVal != requestVal { + err = fmt.Errorf("modbus: response transaction id '%v' does not match request '%v'", responseVal, requestVal) + return + } + // Protocol id + responseVal = binary.BigEndian.Uint16(aduResponse[2:]) + requestVal = binary.BigEndian.Uint16(aduRequest[2:]) + if responseVal != requestVal { + err = fmt.Errorf("modbus: response protocol id '%v' does not match request '%v'", responseVal, requestVal) + return + } + // Unit id (1 byte) + if aduResponse[6] != aduRequest[6] { + err = fmt.Errorf("modbus: response unit id '%v' does not match request '%v'", aduResponse[6], aduRequest[6]) return } - aduResponse = data[:length] - mb.logf("modbus: received % x\n", aduResponse) return } From 7302e1e20f55e3d2bff0b286e4ffaa49037477fe Mon Sep 17 00:00:00 2001 From: Robert Fritzsche Date: Fri, 29 Sep 2017 07:14:43 +0200 Subject: [PATCH 4/4] fix(tcpclient): review feedback, rename aduRequest to data for processData and replace time.Until to make it buildable with older go versions --- tcpclient.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tcpclient.go b/tcpclient.go index 8b78c9a..cfe51d9 100644 --- a/tcpclient.go +++ b/tcpclient.go @@ -161,7 +161,7 @@ func (mb *tcpTransporter) Send(aduRequest []byte) (aduResponse []byte, err error // Read header first if _, err = io.ReadFull(mb.conn, data[:tcpHeaderSize]); err == nil { aduResponse, err = mb.processResponse(data[:]) - if err == nil && mb.ProtocolRecoveryTimeout > 0 && time.Until(recoveryDeadline) > 0 && + if err == nil && mb.ProtocolRecoveryTimeout > 0 && recoveryDeadline.Sub(time.Now()) > 0 && verify(aduRequest, aduResponse) != nil { continue } @@ -169,7 +169,7 @@ func (mb *tcpTransporter) Send(aduRequest []byte) (aduResponse []byte, err error return // Read attempt failed } else if (err != io.EOF && err != io.ErrUnexpectedEOF) || - mb.LinkRecoveryTimeout == 0 || time.Until(recoveryDeadline) < 0 { + mb.LinkRecoveryTimeout == 0 || recoveryDeadline.Sub(time.Now()) < 0 { return } mb.logf("modbus: close connection and retry, because of %v", err) @@ -184,25 +184,25 @@ func (mb *tcpTransporter) Send(aduRequest []byte) (aduResponse []byte, err error } } -func (mb *tcpTransporter) processResponse(aduRequest []byte) (aduResponse []byte, err error) { +func (mb *tcpTransporter) processResponse(data []byte) (aduResponse []byte, err error) { // Read length, ignore transaction & protocol id (4 bytes) - length := int(binary.BigEndian.Uint16(aduRequest[4:])) + length := int(binary.BigEndian.Uint16(data[4:])) if length <= 0 { - mb.flush(aduRequest[:]) + mb.flush(data[:]) err = fmt.Errorf("modbus: length in response header '%v' must not be zero", length) return } if length > (tcpMaxLength - (tcpHeaderSize - 1)) { - mb.flush(aduRequest[:]) + mb.flush(data[:]) err = fmt.Errorf("modbus: length in response header '%v' must not greater than '%v'", length, tcpMaxLength-tcpHeaderSize+1) return } // Skip unit id length += tcpHeaderSize - 1 - if _, err = io.ReadFull(mb.conn, aduRequest[tcpHeaderSize:length]); err != nil { + if _, err = io.ReadFull(mb.conn, data[tcpHeaderSize:length]); err != nil { return } - aduResponse = aduRequest[:length] + aduResponse = data[:length] return }