Skip to content

Commit

Permalink
Merge pull request #13 from lvan100/master
Browse files Browse the repository at this point in the history
修复 thrift 数据拆包然后 sharingan 读不全的问题
  • Loading branch information
yj20060714 authored Jul 28, 2021
2 parents 0402bac + 81d65f6 commit 59d5b1b
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 15 deletions.
26 changes: 11 additions & 15 deletions replayer-agent/logic/outbound/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,24 +114,20 @@ func (cs *ConnState) readRequest(ctx context.Context) ([]byte, error) {
request = append(request, buf[:bytesRead]...)
helper.SetQuickAck(cs.conn)

// 可能还有数据没读完
if bytesRead >= len(buf) {
for {
cs.conn.SetReadDeadline(time.Now().Add(time.Millisecond * 25))
bytesRead, err := cs.conn.Read(buf)
if err != nil {
break
}
helper.SetQuickAck(cs.conn)
request = append(request, buf[:bytesRead]...)
if bytesRead < len(buf) {
break
}
for {
cs.conn.SetReadDeadline(time.Now().Add(time.Millisecond * 25))
bytesRead, err := cs.conn.Read(buf)
if err != nil {
break
}
helper.SetQuickAck(cs.conn)
request = append(request, buf[:bytesRead]...)
if bytesRead < len(buf) {
break
}
}

request = cs.rmTrafixPrefix(ctx, request)

return request[:len(request)], nil
}

Expand All @@ -157,7 +153,7 @@ func (cs *ConnState) match(ctx context.Context, request []byte) error {
cs.Handler = loadHandler(ctx, string(cs.traceID))
if cs.Handler == nil {
tlog.Handler.Warnf(ctx, tlog.DebugTag, "errmsg=find Handler failed||request=%s||traceID=%s", quotedRequest, string(cs.traceID))
return nil
return errors.New("find Handler failed")
}

// 去掉COM_STMT_CLOSE
Expand Down
62 changes: 62 additions & 0 deletions replayer-agent/logic/outbound/outbound_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package outbound_test

import (
"encoding/hex"
"net"
"testing"
"time"

"github.com/didi/sharingan/replayer-agent/common/handlers/tlog"
"github.com/didi/sharingan/replayer-agent/logic/outbound"
"go.uber.org/zap"
)

func init() {
tlog.Handler = tlog.NewTLog(zap.NewExample())
}

func TestOutBound(t *testing.T) {

const address = "127.0.0.1:3515"

addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
t.Error(err)
t.Fail()
return
}

go outbound.Start(addr)
time.Sleep(300 * time.Millisecond)

conn, err := net.Dial("tcp", address)
if err != nil {
t.Error(err)
t.Fail()
return
}

msgs := []string{
"2f2a7b22726964223a223766303030303031363066666334386561336666323934626466663761626230222c2261646472223a2231302e3139302e392e3137393a39313031227d2a2f000000e8",
"2f2a7b22726964223a223766303030303031363066666334386561336666323934626466663761626230222c2261646472223a2231302e3139302e392e3137393a39313031227d2a2f80010001",
}

for _, msg := range msgs {
time.Sleep(5 * time.Millisecond)

b, err := hex.DecodeString(msg)
if err != nil {
t.Error(err)
t.Fail()
return
}
_, err = conn.Write(b)
if err != nil {
t.Error(err)
t.Fail()
return
}
}

time.Sleep(300 * time.Millisecond)
}

0 comments on commit 59d5b1b

Please sign in to comment.