Fix possible data races in INX (#1561)
* Fix possible data race in ListenToLedgerUpdates

* Fix possible data race in ListenToTreasuryUpdates

* Fix possible data race in ListenToConfirmedMilestones

* Release v2.0.0-alpha19

* Fix missing update of lastSentIndex

* refactors range logic into single function

Co-authored-by: Luca Moser <[email protected]>
muXxer and luca-moser authored Jun 15, 2022
1 parent 7d685c7 commit a7066d6
Showing 6 changed files with 380 additions and 173 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,17 @@

All notable changes to this project will be documented in this file.


## [2.0.0-alpha19] - 14.06.2022

### Fixed
- Fix INX deadlock #1560
- Fix possible data races in INX #1561

### Changes
- Adds supported protocol versions to REST API and INX (#1552)


## [2.0.0-alpha18] - 08.06.2022

### Fixed
2 changes: 1 addition & 1 deletion core/app/app.go
@@ -34,7 +34,7 @@ var (
Name = "HORNET"

// Version of the app.
Version = "2.0.0-alpha18"
Version = "2.0.0-alpha19"
)

func App() *app.App {
2 changes: 1 addition & 1 deletion docker-example/docker-compose.yml
@@ -2,7 +2,7 @@ version: '3'
services:

hornet:
image: iotaledger/hornet:2.0.0-alpha18
image: iotaledger/hornet:2.0.0-alpha19
ulimits:
nofile:
soft: 16384
72 changes: 72 additions & 0 deletions plugins/inx/server.go
@@ -2,6 +2,8 @@ package inx

import (
"context"
"github.com/iotaledger/hive.go/workerpool"
"github.com/iotaledger/hornet/pkg/model/milestone"
"net"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
@@ -129,3 +131,73 @@ func (s *INXServer) ReadNodeConfiguration(context.Context, *inx.NoParams) (*inx.
SupportedProtocolVersions: deps.SupportedProtocolVersions,
}, nil
}

type streamRange struct {
	start    milestone.Index
	end      milestone.Index
	lastSent milestone.Index
}

// tells whether the stream range has a range requested.
func (stream *streamRange) rangeRequested() bool {
	return stream.start > 0
}

// tells whether the stream is bounded, aka has an end index.
func (stream *streamRange) isBounded() bool {
	return stream.end > 0
}

// handles the sending of data within a streamRange.
//   - sendFunc gets executed for the given index.
//   - if data wasn't sent between streamRange.lastSent and the given index, then the given catchUpFunc is executed
//     with the range from streamRange.lastSent + 1 up to index - 1.
//   - it is the caller's job to call task.Return(...).
//   - streamRange.lastSent is auto. updated
func handleRangedSend(task *workerpool.Task, index milestone.Index, streamRange *streamRange,
	catchUpFunc func(start milestone.Index, end milestone.Index) error,
	sendFunc func(task *workerpool.Task, index milestone.Index) error,
) (bool, error) {

	// below requested range
	if streamRange.rangeRequested() && index < streamRange.start {
		return false, nil
	}

	// execute catch up function with missing indices
	if streamRange.rangeRequested() && index-1 > streamRange.lastSent {
		startIndex := streamRange.start
		if startIndex < streamRange.lastSent+1 {
			startIndex = streamRange.lastSent + 1
		}

		endIndex := index - 1
		if streamRange.isBounded() && endIndex > streamRange.end {
			endIndex = streamRange.end
		}

		if err := catchUpFunc(startIndex, endIndex); err != nil {
			return false, err
		}

		streamRange.lastSent = endIndex
	}

	// stream finished
	if streamRange.isBounded() && index > streamRange.end {
		return true, nil
	}

	if err := sendFunc(task, index); err != nil {
		return false, err
	}

	streamRange.lastSent = index

	// stream finished
	if streamRange.isBounded() && index >= streamRange.end {
		return true, nil
	}

	return false, nil
}
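
To make the catch-up behaviour concrete, here is a minimal, hypothetical sketch (not part of this commit; the function name exampleRangedSend and the printed values are illustrative only). It assumes a file in the same plugins/inx package, so streamRange, handleRangedSend, milestone.Index and workerpool.Task can be referenced directly. With a requested range of 5..8 and nothing sent yet, an incoming index 7 first triggers the catch-up for 5..6 and only then the send for 7.

package inx

import (
	"fmt"

	"github.com/iotaledger/hive.go/workerpool"
	"github.com/iotaledger/hornet/pkg/model/milestone"
)

// exampleRangedSend is a hypothetical illustration of handleRangedSend's catch-up logic.
func exampleRangedSend() error {
	stream := &streamRange{start: 5, end: 8}

	catchUp := func(start milestone.Index, end milestone.Index) error {
		fmt.Printf("catch up: %d..%d\n", start, end) // prints "catch up: 5..6"
		return nil
	}

	send := func(task *workerpool.Task, index milestone.Index) error {
		fmt.Printf("send: %d\n", index) // prints "send: 7"
		return nil
	}

	// the task may be nil here because handleRangedSend only forwards it to sendFunc.
	done, err := handleRangedSend(nil, 7, stream, catchUp, send)
	if err != nil {
		return err
	}

	fmt.Println(done, stream.lastSent) // prints "false 7": end index 8 has not been reached yet
	return nil
}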
136 changes: 90 additions & 46 deletions plugins/inx/server_milestones.go
@@ -2,7 +2,7 @@ package inx

import (
"context"

"fmt"
"github.com/pkg/errors"

"google.golang.org/grpc/codes"
@@ -92,89 +92,133 @@ func (s *INXServer) ListenToLatestMilestones(_ *inx.NoParams, srv inx.INX_Listen

func (s *INXServer) ListenToConfirmedMilestones(req *inx.MilestoneRangeRequest, srv inx.INX_ListenToConfirmedMilestonesServer) error {

createMilestonePayloadForIndexAndSend := func(msIndex milestone.Index) error {
payload, err := milestoneForIndex(msIndex)
if err != nil {
return err
}
if err := srv.Send(payload); err != nil {
return fmt.Errorf("send error: %w", err)
}
return nil
}

createMilestonePayloadForCachedMilestoneAndSend := func(ms *storage.CachedMilestone) error {
payload, err := milestoneForCachedMilestone(ms)
if err != nil {
return err
}
if err := srv.Send(payload); err != nil {
return fmt.Errorf("send error: %w", err)
}
return nil
}

sendMilestonesRange := func(startIndex milestone.Index, endIndex milestone.Index) error {
for currentIndex := startIndex; currentIndex <= endIndex; currentIndex++ {
if err := createMilestonePayloadForIndexAndSend(currentIndex); err != nil {
return err
}
}
return nil
}

// if a startIndex is given, we send all available milestones including the start index.
// if an endIndex is given, we send all available milestones up to and including min(ledgerIndex, endIndex).
// if no startIndex is given, but an endIndex, we do not send previous milestones.
sendPreviousMilestones := func(startIndex milestone.Index, endIndex milestone.Index) (milestone.Index, error) {
if startIndex == 0 {
// no need to send previous milestones
return 0, nil
}

cmi := deps.SyncManager.ConfirmedMilestoneIndex()
if endIndex == 0 || cmi < endIndex {
endIndex = cmi

if startIndex > cmi {
// no need to send previous milestones
return 0, nil
}

if startIndex > 0 && startIndex <= cmi {
// Stream all available milestones
pruningIndex := deps.Storage.SnapshotInfo().PruningIndex
if startIndex <= pruningIndex {
return 0, status.Errorf(codes.InvalidArgument, "given startMilestoneIndex %d is older than the current pruningIndex %d", startIndex, pruningIndex)
}
// Stream all available milestones first
pruningIndex := deps.Storage.SnapshotInfo().PruningIndex
if startIndex <= pruningIndex {
return 0, status.Errorf(codes.InvalidArgument, "given startMilestoneIndex %d is older than the current pruningIndex %d", startIndex, pruningIndex)
}

for currentIndex := startIndex; currentIndex <= endIndex; currentIndex++ {
payload, err := milestoneForIndex(currentIndex)
if err != nil {
return 0, err
}
if err := srv.Send(payload); err != nil {
return 0, err
}
}
if endIndex == 0 || endIndex > cmi {
endIndex = cmi
}

if err := sendMilestonesRange(startIndex, endIndex); err != nil {
return 0, err
}

return endIndex, nil
}

requestStartIndex := milestone.Index(req.GetStartMilestoneIndex())
requestEndIndex := milestone.Index(req.GetEndMilestoneIndex())
stream := &streamRange{
start: milestone.Index(req.GetStartMilestoneIndex()),
end: milestone.Index(req.GetEndMilestoneIndex()),
}

lastSentIndex, err := sendPreviousMilestones(requestStartIndex, requestEndIndex)
var err error
stream.lastSent, err = sendPreviousMilestones(stream.start, stream.end)
if err != nil {
return err
}

if requestEndIndex > 0 && lastSentIndex >= requestEndIndex {
if stream.isBounded() && stream.lastSent >= stream.end {
// We are done sending, so close the stream
return nil
}

catchUpFunc := func(start milestone.Index, end milestone.Index) error {
err := sendMilestonesRange(start, end)
if err != nil {
Plugin.LogInfof("sendMilestonesRange error: %v", err)
}
return err
}

sendFunc := func(task *workerpool.Task, index milestone.Index) error {
// no release needed
cachedMilestone := task.Param(0).(*storage.CachedMilestone)
if err := createMilestonePayloadForCachedMilestoneAndSend(cachedMilestone.Retain()); err != nil { // milestone +1
Plugin.LogInfof("send error: %v", err)
return err
}

return nil
}

var innerErr error
ctx, cancel := context.WithCancel(context.Background())
wp := workerpool.New(func(task workerpool.Task) {
cachedMilestone := task.Param(0).(*storage.CachedMilestone)
defer cachedMilestone.Release(true) // milestone -1

if requestStartIndex > 0 && cachedMilestone.Milestone().Index() < requestStartIndex {
// Skip this because it is before the index we requested
task.Return(nil)
return
}

payload, err := milestoneForCachedMilestone(cachedMilestone.Retain()) // milestone +1
if err != nil {
Plugin.LogInfof("error creating milestone: %v", err)
cancel()
innerErr = err
task.Return(nil)
return
}
if err := srv.Send(payload); err != nil {
Plugin.LogInfof("send error: %v", err)
cancel()
done, err := handleRangedSend(&task, cachedMilestone.Milestone().Index(), stream, catchUpFunc, sendFunc)
switch {
case err != nil:
innerErr = err
task.Return(nil)
return
}

if requestEndIndex > 0 && cachedMilestone.Milestone().Index() >= requestEndIndex {
// We are done sending the milestones
innerErr = nil
fallthrough
case done:
cancel()
}

task.Return(nil)
}, workerpool.WorkerCount(workerCount), workerpool.QueueSize(workerQueueSize), workerpool.FlushTasksAtShutdown(true))

closure := events.NewClosure(func(milestone *storage.CachedMilestone) {
wp.Submit(milestone)
})

wp.Start()
deps.Tangle.Events.ConfirmedMilestoneChanged.Attach(closure)
<-ctx.Done()
deps.Tangle.Events.ConfirmedMilestoneChanged.Detach(closure)
wp.Stop()

return innerErr
}
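
Putting the two pieces together, a worked example of the range handling (numbers purely illustrative): for a request with StartMilestoneIndex 100 and no EndMilestoneIndex while the confirmed milestone index is 105, sendPreviousMilestones streams milestones 100 through 105 and records 105 as stream.lastSent. Each ConfirmedMilestoneChanged event submitted to the worker pool afterwards then runs through handleRangedSend, which first lets catchUpFunc (i.e. sendMilestonesRange) fill any gap since lastSent, then sends the newly confirmed milestone, and cancels the stream once a requested end index has been reached.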
