Skip to content

Commit

Permalink
feat(time): normalize all time to epoch
Browse files Browse the repository at this point in the history
Remove all timeNormalizer object logic. Instead use simple normalization
functions and normalize all timestamps in events asap:
1. Context timestamp normalization moved to decode stage
2. Relevant timestamp arguments normalized in processing stage,
   registered to run first.
  • Loading branch information
NDStrahilevitz committed Sep 10, 2024
1 parent 8015ffd commit c1ea067
Show file tree
Hide file tree
Showing 14 changed files with 76 additions and 131 deletions.
3 changes: 2 additions & 1 deletion pkg/ebpf/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/aquasecurity/tracee/pkg/bufferdecoder"
"github.com/aquasecurity/tracee/pkg/errfmt"
"github.com/aquasecurity/tracee/pkg/logger"
"github.com/aquasecurity/tracee/pkg/time"
"github.com/aquasecurity/tracee/pkg/utils"
)

Expand Down Expand Up @@ -109,7 +110,7 @@ func (t *Tracee) handleFileCaptures(ctx context.Context) {
continue
}
// note: size of buffer will determine maximum extracted file size! (as writes from kernel are immediate)
mprotectMeta.Ts = uint64(t.timeNormalizer.NormalizeTime(int(mprotectMeta.Ts)))
mprotectMeta.Ts = time.BootToEpochNS(uint64(mprotectMeta.Ts))
filename = fmt.Sprintf("bin.pid-%d.ts-%d", mprotectMeta.Pid, mprotectMeta.Ts)
} else if meta.BinType == bufferdecoder.SendKernelModule {
err = metaBuffDecoder.DecodeKernelModuleMeta(&kernelModuleMeta)
Expand Down
4 changes: 0 additions & 4 deletions pkg/ebpf/controlplane/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/aquasecurity/tracee/pkg/events"
"github.com/aquasecurity/tracee/pkg/logger"
"github.com/aquasecurity/tracee/pkg/proctree"
traceetime "github.com/aquasecurity/tracee/pkg/time"
)

// TODO: With the introduction of signal events, the control plane can now have a generic argument
Expand All @@ -29,7 +28,6 @@ type Controller struct {
cgroupManager *containers.Containers
processTree *proctree.ProcessTree
enrichDisabled bool
timeNormalizer traceetime.TimeNormalizer
}

// NewController creates a new controller.
Expand All @@ -38,7 +36,6 @@ func NewController(
cgroupManager *containers.Containers,
enrichDisabled bool,
procTree *proctree.ProcessTree,
timeNormalizer traceetime.TimeNormalizer,
) (*Controller, error) {
var err error

Expand All @@ -49,7 +46,6 @@ func NewController(
cgroupManager: cgroupManager,
processTree: procTree,
enrichDisabled: enrichDisabled,
timeNormalizer: timeNormalizer,
}

p.signalBuffer, err = bpfModule.InitPerfBuf("signals", p.signalChan, p.lostSignalChan, 1024)
Expand Down
13 changes: 7 additions & 6 deletions pkg/ebpf/controlplane/processes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controlplane
import (
"github.com/aquasecurity/tracee/pkg/events/parse"
"github.com/aquasecurity/tracee/pkg/proctree"
"github.com/aquasecurity/tracee/pkg/time"
"github.com/aquasecurity/tracee/pkg/utils"
"github.com/aquasecurity/tracee/types/trace"
)
Expand Down Expand Up @@ -73,25 +74,25 @@ func (ctrl *Controller) procTreeForkProcessor(args []trace.Argument) error {

return ctrl.processTree.FeedFromFork(
proctree.ForkFeed{
TimeStamp: uint64(ctrl.timeNormalizer.NormalizeTime(int(timestamp))),
TimeStamp: time.BootToEpochNS(timestamp),
ChildHash: childHash,
ParentHash: parentHash,
LeaderHash: leaderHash,
ParentTid: parentTid,
ParentNsTid: parentNsTid,
ParentPid: parentPid,
ParentNsPid: parentNsPid,
ParentStartTime: uint64(ctrl.timeNormalizer.NormalizeTime(int(parentStartTime))),
ParentStartTime: time.BootToEpochNS(parentStartTime),
LeaderTid: leaderTid,
LeaderNsTid: leaderNsTid,
LeaderPid: leaderPid,
LeaderNsPid: leaderNsPid,
LeaderStartTime: uint64(ctrl.timeNormalizer.NormalizeTime(int(leaderStartTime))),
LeaderStartTime: time.BootToEpochNS(leaderStartTime),
ChildTid: childTid,
ChildNsTid: childNsTid,
ChildPid: childPid,
ChildNsPid: childNsPid,
ChildStartTime: uint64(ctrl.timeNormalizer.NormalizeTime(int(childStartTime))),
ChildStartTime: time.BootToEpochNS(childStartTime),
},
)
}
Expand Down Expand Up @@ -154,7 +155,7 @@ func (ctrl *Controller) procTreeExecProcessor(args []trace.Argument) error {

return ctrl.processTree.FeedFromExec(
proctree.ExecFeed{
TimeStamp: uint64(ctrl.timeNormalizer.NormalizeTime(int(timestamp))),
TimeStamp: time.BootToEpochNS(timestamp),
TaskHash: taskHash,
ParentHash: parentHash,
LeaderHash: leaderHash,
Expand Down Expand Up @@ -208,7 +209,7 @@ func (ctrl *Controller) procTreeExitProcessor(args []trace.Argument) error {

return ctrl.processTree.FeedFromExit(
proctree.ExitFeed{
TimeStamp: uint64(ctrl.timeNormalizer.NormalizeTime(int(timestamp))), // time of exit is already a timestamp
TimeStamp: time.BootToEpochNS(timestamp), // time of exit is already a times)p
TaskHash: taskHash,
ParentHash: parentHash,
LeaderHash: leaderHash,
Expand Down
11 changes: 6 additions & 5 deletions pkg/ebpf/events_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/aquasecurity/tracee/pkg/errfmt"
"github.com/aquasecurity/tracee/pkg/events"
"github.com/aquasecurity/tracee/pkg/logger"
"github.com/aquasecurity/tracee/pkg/time"
"github.com/aquasecurity/tracee/pkg/utils"
"github.com/aquasecurity/tracee/types/trace"
)
Expand Down Expand Up @@ -227,8 +228,8 @@ func (t *Tracee) decodeEvents(ctx context.Context, sourceChan chan []byte) (<-ch

// populate all the fields of the event used in this stage, and reset the rest

evt.Timestamp = int(eCtx.Ts)
evt.ThreadStartTime = int(eCtx.StartTime)
evt.Timestamp = int(time.BootToEpochNS(eCtx.Ts)) // set normalized to epoch
evt.ThreadStartTime = int(time.BootToEpochNS(eCtx.StartTime)) // set normalized to epoch
evt.ProcessorID = int(eCtx.ProcessorId)
evt.ProcessID = int(eCtx.Pid)
evt.ThreadID = int(eCtx.Tid)
Expand All @@ -239,8 +240,8 @@ func (t *Tracee) decodeEvents(ctx context.Context, sourceChan chan []byte) (<-ch
evt.UserID = int(eCtx.Uid)
evt.MountNS = int(eCtx.MntID)
evt.PIDNS = int(eCtx.PidID)
evt.ProcessName = string(bytes.TrimRight(eCtx.Comm[:], "\x00"))
evt.HostName = string(bytes.TrimRight(eCtx.UtsName[:], "\x00"))
evt.ProcessName = string(bytes.TrimRight(eCtx.Comm[:], "\x00")) // set and clean potential trailing null
evt.HostName = string(bytes.TrimRight(eCtx.UtsName[:], "\x00")) // set and clean potential trailing null
evt.CgroupID = uint(eCtx.CgroupID)
evt.ContainerID = containerData.ID
evt.Container = containerData
Expand Down Expand Up @@ -727,7 +728,7 @@ func (t *Tracee) parseArguments(e *trace.Event) error {
}

if t.config.Output.ParseArgumentsFDs {
return events.ParseArgsFDs(e, uint64(t.timeNormalizer.GetOriginalTime(e.Timestamp)), t.FDArgPathMap)
return events.ParseArgsFDs(e, uint64(e.Timestamp), t.FDArgPathMap)
}
}

Expand Down
6 changes: 0 additions & 6 deletions pkg/ebpf/net_capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,6 @@ func (t *Tracee) processNetCapEvents(ctx context.Context, in <-chan *trace.Event
select {
case event := <-in:
// TODO: Support captures pipeline in t.processEvent
err := t.normalizeEventCtxTimes(event)
if err != nil {
t.handleError(err)
t.eventsPool.Put(event)
continue
}
t.processNetCapEvent(event)
_ = t.stats.NetCapCount.Increment()
t.eventsPool.Put(event)
Expand Down
26 changes: 15 additions & 11 deletions pkg/ebpf/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ func init() {
func (t *Tracee) processEvent(event *trace.Event) []error {
var errs []error

processors := t.eventProcessor[events.ID(event.EventID)] // this event processors
processors = append(processors, t.eventProcessor[events.All]...) // all events processors
processors := t.eventProcessor[events.ID(event.EventID)] // this event processors

for _, processor := range processors {
err := processor(event)
Expand Down Expand Up @@ -78,6 +77,20 @@ func (t *Tracee) RegisterEventProcessor(id events.ID, proc func(evt *trace.Event

// registerEventProcessors registers all event processors, each to a specific event id.
func (t *Tracee) registerEventProcessors() {
//
// Event Timestamps Normalization
//

// Convert all time relate args to nanoseconds since epoch.
// NOTE: Make sure to convert time related args (of your event) in here, so that
// any later code has access to normalized time arguments.
t.RegisterEventProcessor(events.SchedProcessFork, t.normalizeTimeArg(
"start_time",
"parent_start_time",
"parent_process_start_time",
"leader_start_time",
))

//
// Process Tree Processors
//
Expand Down Expand Up @@ -119,15 +132,6 @@ func (t *Tracee) registerEventProcessors() {
t.RegisterEventProcessor(events.PrintMemDump, t.processTriggeredEvent)
t.RegisterEventProcessor(events.PrintMemDump, t.processPrintMemDump)
t.RegisterEventProcessor(events.SharedObjectLoaded, t.processSharedObjectLoaded)

//
// Event Timestamps Normalization Processors
//

// Convert all time relate args to nanoseconds since epoch.
// NOTE: Make sure to convert time related args (of your event) in here.
t.RegisterEventProcessor(events.SchedProcessFork, t.normalizeTimeArg("start_time"))
t.RegisterEventProcessor(events.All, t.normalizeEventCtxTimes)
}

func initKernelReadFileTypes() {
Expand Down
43 changes: 18 additions & 25 deletions pkg/ebpf/processor_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/aquasecurity/tracee/pkg/events/parse"
"github.com/aquasecurity/tracee/pkg/filehash"
"github.com/aquasecurity/tracee/pkg/logger"
"github.com/aquasecurity/tracee/pkg/time"
"github.com/aquasecurity/tracee/pkg/utils"
"github.com/aquasecurity/tracee/types/trace"
)
Expand Down Expand Up @@ -160,7 +161,11 @@ func (t *Tracee) processSchedProcessExec(event *trace.Event) error {
}
destinationFilePath := filepath.Join(
destinationDirPath,
fmt.Sprintf("exec.%d.%s", event.Timestamp, filepath.Base(filePath)),
fmt.Sprintf(
"exec.%d.%s",
event.Timestamp,
filepath.Base(filePath),
),
)
// don't capture same file twice unless it was modified
lastCtime, ok := t.capturedFiles[capturedFileID]
Expand Down Expand Up @@ -345,33 +350,21 @@ func (t *Tracee) processPrintMemDump(event *trace.Event) error {
// Timing related functions
//

// normalizeEventCtxTimes normalizes the event context timings to be relative to tracee start time
// or current time in nanoseconds.
func (t *Tracee) normalizeEventCtxTimes(event *trace.Event) error {
eventId := events.ID(event.EventID)
if eventId > events.MaxCommonID && eventId < events.MaxUserSpace {
// derived events are normalized from their base event, skip the processing
return nil
}
event.Timestamp = t.timeNormalizer.NormalizeTime(event.Timestamp)
event.ThreadStartTime = t.timeNormalizer.NormalizeTime(event.ThreadStartTime)

return nil
}

// normalizeTimeArg returns a processor function for some argument name
// which normalizes said event arg time to be relative to tracee start time or current time.
func (t *Tracee) normalizeTimeArg(argName string) func(event *trace.Event) error {
// which normalizes said event arg time from boot monotonic to epoch
func (t *Tracee) normalizeTimeArg(argNames ...string) func(event *trace.Event) error {
return func(event *trace.Event) error {
arg := events.GetArg(event, argName)
if arg == nil {
return errfmt.Errorf("couldn't find argument %s of event %s", argName, event.EventName)
}
argTime, ok := arg.Value.(uint64)
if !ok {
return errfmt.Errorf("argument %s of event %s is not of type uint64", argName, event.EventName)
for _, argName := range argNames {
arg := events.GetArg(event, argName)
if arg == nil {
return errfmt.Errorf("couldn't find argument %s of event %s", argName, event.EventName)
}
argTime, ok := arg.Value.(uint64)
if !ok {
return errfmt.Errorf("argument %s of event %s is not of type uint64", argName, event.EventName)
}
arg.Value = time.BootToEpochNS(argTime)
}
arg.Value = t.timeNormalizer.NormalizeTime(int(argTime))
return nil
}
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/ebpf/processor_proctree.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,25 +75,25 @@ func (t *Tracee) procTreeForkProcessor(event *trace.Event) error {

return t.processTree.FeedFromFork(
proctree.ForkFeed{
TimeStamp: uint64(t.timeNormalizer.NormalizeTime(int(childStartTime))), // event timestamp is the same
TimeStamp: childStartTime, // event timestamp is the same
ChildHash: childHash,
ParentHash: parentHash,
LeaderHash: leaderHash,
ParentTid: parentTid,
ParentNsTid: parentNsTid,
ParentPid: parentPid,
ParentNsPid: parentNsPid,
ParentStartTime: uint64(t.timeNormalizer.NormalizeTime(int(parentStartTime))),
ParentStartTime: parentStartTime,
LeaderTid: leaderTid,
LeaderNsTid: leaderNsTid,
LeaderPid: leaderPid,
LeaderNsPid: leaderNsPid,
LeaderStartTime: uint64(t.timeNormalizer.NormalizeTime(int(leaderStartTime))),
LeaderStartTime: leaderStartTime,
ChildTid: childTid,
ChildNsTid: childNsTid,
ChildPid: childPid,
ChildNsPid: childNsPid,
ChildStartTime: uint64(t.timeNormalizer.NormalizeTime(int(childStartTime))),
ChildStartTime: childStartTime,
},
)
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (t *Tracee) procTreeExecProcessor(event *trace.Event) error {

return t.processTree.FeedFromExec(
proctree.ExecFeed{
TimeStamp: uint64(t.timeNormalizer.NormalizeTime(int(timestamp))),
TimeStamp: timestamp,
TaskHash: taskHash,
ParentHash: 0, // regular pipeline does not have parent hash
LeaderHash: 0, // regular pipeline does not have leader hash
Expand Down Expand Up @@ -204,7 +204,7 @@ func (t *Tracee) procTreeExitProcessor(event *trace.Event) error {

return t.processTree.FeedFromExit(
proctree.ExitFeed{
TimeStamp: uint64(t.timeNormalizer.NormalizeTime(int(timestamp))), // time of exit is already a timestamp
TimeStamp: timestamp, // time of exit is already a timestamp
TaskHash: taskHash,
ParentHash: 0, // regular pipeline does not have parent hash
LeaderHash: 0, // regular pipeline does not have leader hash
Expand Down Expand Up @@ -237,7 +237,7 @@ func (t *Tracee) procTreeAddBinInfo(event *trace.Event) error {
}

// Event timestamp is changed to relative (or not) at the end of all processors only.
eventTimestamp := traceetime.NsSinceEpochToTime(uint64(t.timeNormalizer.NormalizeTime(event.Timestamp)))
eventTimestamp := traceetime.NsSinceEpochToTime(uint64(event.Timestamp))

executable := currentProcess.GetExecutable()

Expand Down
11 changes: 2 additions & 9 deletions pkg/ebpf/tracee.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ type Tracee struct {
// This does not mean they are required for tracee to function.
// TODO: remove this in favor of dependency manager nodes
requiredKsyms []string
// Time for normalization
timeNormalizer traceetime.TimeNormalizer
}

func (t *Tracee) Stats() *metrics.Stats {
Expand Down Expand Up @@ -446,13 +444,9 @@ func (t *Tracee) Init(ctx gocontext.Context) error {
return errfmt.WrapError(err)
}

// init time functionalities
traceetime.Init(int32(usedClockID))

// time in nanoseconds when the system was booted
t.bootTime = uint64(traceetime.GetBootTimeNS())

t.timeNormalizer = traceetime.CreateTimeNormalizerByConfig(t.bootTime)

// Initialize buckets cache

var mntNSProcs map[int]int
Expand Down Expand Up @@ -579,7 +573,7 @@ func (t *Tracee) Init(ctx gocontext.Context) error {
proctreeConfig.ProcfsInitialization = false
proctreeConfig.ProcfsQuerying = false
}
t.processTree, err = proctree.NewProcessTree(ctx, proctreeConfig, t.timeNormalizer)
t.processTree, err = proctree.NewProcessTree(ctx, proctreeConfig)
if err != nil {
return errfmt.WrapError(err)
}
Expand Down Expand Up @@ -1410,7 +1404,6 @@ func (t *Tracee) initBPF() error {
t.containers,
t.config.NoContainersEnrich,
t.processTree,
t.timeNormalizer,
)
if err != nil {
return errfmt.WrapError(err)
Expand Down
Loading

0 comments on commit c1ea067

Please sign in to comment.