Skip to content

Commit

Permalink
Merge pull request #14 from HRX980829/master
Browse files Browse the repository at this point in the history
 Fix cross-language seqId problem
  • Loading branch information
qiaodandedidi authored Jul 28, 2021
2 parents 59d5b1b + 8ef70f4 commit a69806b
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 1 deletion.
6 changes: 6 additions & 0 deletions replayer-agent/logic/match/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"github.com/didi/sharingan/replayer-agent/model/protocol"
"github.com/didi/sharingan/replayer-agent/model/recording"
"github.com/didi/sharingan/replayer-agent/model/replaying"
"github.com/didi/sharingan/replayer-agent/utils/fastcgi"
"github.com/didi/sharingan/replayer-agent/utils/helper"
"github.com/didi/sharingan/replayer-agent/utils/protocol/pthrift"
)

var expect100 = []byte("Expect: 100-continue")
Expand Down Expand Up @@ -126,6 +128,10 @@ func (m *Matcher) DoMatchOutboundTalk(
m.MaxMatchedIndex = maxScoreIndex
}
m.Visited[maxScoreIndex] = true

if bytes.HasPrefix(session.CallFromInbound.Raw, fastcgi.FastCGIRequestHeader) {
pthrift.ReplaceSequenceId(request, session.CallOutbounds[maxScoreIndex])
}
return maxScoreIndex, mark, session.CallOutbounds[maxScoreIndex]
}

Expand Down
9 changes: 8 additions & 1 deletion replayer-agent/logic/outbound/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func (cs *ConnState) readRequest(ctx context.Context) ([]byte, error) {

request := pool.GetBuf(81920, true)

// SetReadDeadline sets the deadline for future Read calls
// and any currently-blocked Read call.
// A zero value for t means Read will not time out.
cs.conn.SetReadDeadline(time.Time{})

bytesRead, err := cs.conn.Read(buf)
if err != nil {
return nil, err
Expand Down Expand Up @@ -153,9 +158,11 @@ func (cs *ConnState) match(ctx context.Context, request []byte) error {
cs.Handler = loadHandler(ctx, string(cs.traceID))
if cs.Handler == nil {
tlog.Handler.Warnf(ctx, tlog.DebugTag, "errmsg=find Handler failed||request=%s||traceID=%s", quotedRequest, string(cs.traceID))
return errors.New("find Handler failed")
return nil
}

ctx = cs.Handler.Ctx

// 去掉COM_STMT_CLOSE
if request = removeMysqlStmtClose(request); len(request) == 0 {
return nil
Expand Down
23 changes: 23 additions & 0 deletions replayer-agent/utils/protocol/helper/helper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package helper

import (
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"strconv"
Expand Down Expand Up @@ -43,3 +45,24 @@ func convertModelMap2GeneralMap(m model.Map) map[string]interface{} {
}
return result
}

func IntToBytes(n int,b byte) ([]byte,error) {
switch b {
case 1:
tmp := int8(n)
bytesBuffer := bytes.NewBuffer([]byte{})
binary.Write(bytesBuffer, binary.BigEndian, &tmp)
return bytesBuffer.Bytes(),nil
case 2:
tmp := int16(n)
bytesBuffer := bytes.NewBuffer([]byte{})
binary.Write(bytesBuffer, binary.BigEndian, &tmp)
return bytesBuffer.Bytes(),nil
case 3,4:
tmp := int32(n)
bytesBuffer := bytes.NewBuffer([]byte{})
binary.Write(bytesBuffer, binary.BigEndian, &tmp)
return bytesBuffer.Bytes(),nil
}
return nil,fmt.Errorf("IntToBytes b param is invaild")
}
30 changes: 30 additions & 0 deletions replayer-agent/utils/protocol/pthrift/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"errors"

"github.com/didi/sharingan/replayer-agent/model/recording"
"github.com/didi/sharingan/replayer-agent/utils/protocol/helper"
"github.com/modern-go/parse"
"github.com/modern-go/parse/model"
)
Expand Down Expand Up @@ -407,3 +409,31 @@ func decodeFieldValueCompact(src *parse.Source, vType CompactKind) (interface{},
return nil, errInvalidFeildType
}
}

func ReplaceSequenceId(request []byte, callOutbound *recording.CallOutbound) {
if len(request) <= 4 {
return
}
thrift, DecErr := DecodeBinary(request[4:])
if DecErr != nil {
return
}

seqId := thrift["sequence_id"]
seqIdBytes ,err := helper.IntToBytes(seqId.(int), 4)
if err != nil {
return
}

name := thrift["name"].(string)
nameBytes := []byte(name)
begin := bytes.LastIndex(callOutbound.Response, nameBytes)
if begin == -1 {
return
}

begin += len(nameBytes)
for i, v := range seqIdBytes {
callOutbound.Response[begin+i] = v
}
}
33 changes: 33 additions & 0 deletions replayer-agent/utils/protocol/pthrift/packet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pthrift

import (
"bytes"
"reflect"
"testing"

"github.com/modern-go/parse"
Expand Down Expand Up @@ -264,3 +265,35 @@ func TestDecodeCompact(t *testing.T) {
should.Equal(tc.expect, actual, "case #%d fail", idx)
}
}

func TestIntToBytes(t *testing.T) {
type args struct {
n int
b byte
}
tests := []struct {
name string
args args
want []byte
wantErr bool
}{
{
name: "1",
args: args{n: 10, b: 4},
want: []byte{0, 0, 0, 0x0a},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := IntToBytes(tt.args.n, tt.args.b)
if (err != nil) != tt.wantErr {
t.Errorf("IntToBytes() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("IntToBytes() got = %v, want %v", got, tt.want)
}
})
}
}

0 comments on commit a69806b

Please sign in to comment.