-
Notifications
You must be signed in to change notification settings - Fork 279
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat/l1 msg queue #1055
Feat/l1 msg queue #1055
Conversation
so we are using approach 2? |
I think both are applicable here, because |
|
||
for { | ||
select { | ||
case <-ms.ctx.Done(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why need to specifically handle <-ms.ctx.Done()
and the subsequent select handle this again
EndBlockHeader *types.Header | ||
} | ||
|
||
type MsgStorage struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add some metrics to MsgStorage eg:
- whether the task is running
- which head block is currently processed
} | ||
|
||
// PruneMessages deletes all messages that are older or equal to provided index | ||
func (ms *MsgStorage) PruneMessages(lastIndex uint64) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who call this function? if no, maybe this it's called by itself periodically
var indexesToDelete []uint64 | ||
for _, msg := range msgs { | ||
contains := false | ||
for _, header := range old { | ||
if header.Hash() == msg.headerHash { | ||
contains = true | ||
break | ||
} | ||
} | ||
if contains { | ||
indexesToDelete = append(indexesToDelete, msg.l1msg.QueueIndex) | ||
} | ||
} | ||
if len(indexesToDelete) > 0 { | ||
ms.msgsMu.Lock() | ||
for _, index := range indexesToDelete { | ||
ms.msgs.Delete(index) | ||
} | ||
ms.msgsMu.Unlock() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about storing an additional map L1 blockNum -> L1 message indexes. then we just need to iterate once and delete. this should be rare but it could be quite costly depending on how big msgs
and old
are
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
…ereum into feat/l1-msg-queue
ms.newChainNotifications <- newChainNotification{old, new} | ||
return true | ||
} else { | ||
ms.latestFinalized = new[0] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of having the distinction of online subscriber I think it would be better to let the Tracker always send its updates. It's the job of the receiver to handle these correctly AND not block.
In this case I think what would be sufficient is something like this:
select {
case ms.newChainNotifications <- newChainNotification{old, new}:
default:
}
This would try to add the update to the channel but if it is full, the update is dropped. I think this is fine since eventually there will be a new update (with a newer block) which can be queued once there is space in the channel (the long running fetching from before is then probably done).
return false | ||
}) | ||
go func() { | ||
fetchTicker := time.NewTicker(defaultFetchInterval) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should remove the ticker here and simply wait for the channel to be filled and the context done in the select below
case <-ms.ctx.Done(): | ||
return | ||
case <-fetchTicker.C: | ||
if ms.state.EndBlockHeader.Number.Uint64() < ms.latestFinalized.Number.Uint64() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here we should just use whatever we read from the channel. if it is far away we might need to have a special way of handling it. if it is close, then we can simply request all of the L1 messages.
Closing this for now as it is paused and not based on top of |
1. Purpose or design rationale of this PR
Remove existing
L1MessageQueue
that stores all msgs in db with new one thatL1Msgs
from Finalized L1 blocks, but also load most recentL1Msgs
and handle reorgs, crucial for reducing bridge latencyDesign explanation and task page
2. PR title
Your PR title must follow conventional commits (as we are doing squash merge for each PR), so it must start with one of the following types:
3. Deployment tag versioning
Has the version in
params/version.go
been updated?4. Breaking change label
Does this PR have the
breaking-change
label?