Skip to content

Commit

Permalink
Frame events now batch processors correctly based on their access mod…
Browse files Browse the repository at this point in the history
…es to components
  • Loading branch information
fLindahl committed Nov 6, 2023
1 parent 73c6b57 commit ac694d6
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 41 deletions.
9 changes: 9 additions & 0 deletions code/application/game/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ ComponentsInFilter(Filter filter)
return filterAllocator.Get<2>(filter);
}

//------------------------------------------------------------------------------
/**
*/
Util::FixedArray<AccessMode> const&
AccessModesInFilter(Filter filter)
{
return filterAllocator.Get<3>(filter);
}

//------------------------------------------------------------------------------
/**
*/
Expand Down
2 changes: 2 additions & 0 deletions code/application/game/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ InclusiveTableMask const& GetInclusiveTableMask(Filter);
ExclusiveTableMask const& GetExclusiveTableMask(Filter);
/// retrieve the inclusive component array
Util::FixedArray<ComponentId> const& ComponentsInFilter(Filter);
/// retrieve the inclusive component array
Util::FixedArray<AccessMode> const& AccessModesInFilter(Filter);

class FilterBuilder
{
Expand Down
45 changes: 36 additions & 9 deletions code/application/game/frameevent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ FrameEvent::AddProcessor(Processor* processor)

if (!accepted)
{
FrameEventBatch* batch = new FrameEventBatch();
FrameEvent::Batch* batch = new FrameEvent::Batch();
batch->async = processor->async;
batch->order = processor->order;
bool res = batch->TryInsert(processor);
Expand Down Expand Up @@ -122,7 +122,7 @@ FrameEvent::CacheTable(MemDb::TableId tid, MemDb::TableSignature const& signatur
//------------------------------------------------------------------------------
/**
*/
FrameEventBatch::~FrameEventBatch()
FrameEvent::Batch::~Batch()
{
for (SizeT i = 0; i < this->processors.Size(); i++)
{
Expand All @@ -134,7 +134,7 @@ FrameEventBatch::~FrameEventBatch()
/**
*/
void
FrameEventBatch::Execute(World* world)
FrameEvent::Batch::Execute(World* world)
{
if (this->async)
{
Expand All @@ -150,10 +150,37 @@ FrameEventBatch::Execute(World* world)
/**
*/
bool
FrameEventBatch::TryInsert(Processor* processor)
FrameEvent::Batch::TryInsert(Processor* processor)
{
// TODO: if the batch is async, check so that we don't
// if the batch is async, check so that we don't
// have multiple writers to the same components
if (processor->async)
{
Util::FixedArray<ComponentId> const& components = Game::ComponentsInFilter(processor->filter);
Util::FixedArray<AccessMode> const& access = Game::AccessModesInFilter(processor->filter);
for (IndexT i = 0; i < components.Size(); i++)
{
for (auto other : this->processors)
{
auto const& otherComponents = Game::ComponentsInFilter(other->filter);
auto const& otherAccess = Game::AccessModesInFilter(other->filter);
for (IndexT k = 0; k < otherComponents.Size(); k++)
{
if (otherComponents[k] == components[i])
{
if (otherAccess[k] == AccessMode::WRITE || access[i] == AccessMode::WRITE)
{
// Some processor is already writing to this component, or we're trying to write, and some other is already reading.
// Either way, we cannot add the processor since it will cause races.
return false;
}
break; // we can break because a component should never exist twice in the filter
}
}
}
}
}

this->processors.Append(processor);
return true;
}
Expand All @@ -162,7 +189,7 @@ FrameEventBatch::TryInsert(Processor* processor)
/**
*/
void
FrameEventBatch::Prefilter(World* world, bool force)
FrameEvent::Batch::Prefilter(World* world, bool force)
{
for (SizeT i = 0; i < this->processors.Size(); i++)
{
Expand All @@ -179,7 +206,7 @@ FrameEventBatch::Prefilter(World* world, bool force)
/**
*/
void
FrameEventBatch::CacheTable(MemDb::TableId tid, MemDb::TableSignature const& signature)
FrameEvent::Batch::CacheTable(MemDb::TableId tid, MemDb::TableSignature const& signature)
{
for (SizeT i = 0; i < this->processors.Size(); i++)
{
Expand All @@ -204,7 +231,7 @@ FrameEventBatch::CacheTable(MemDb::TableId tid, MemDb::TableSignature const& sig
/**
*/
void
FrameEventBatch::ExecuteAsync(World* world)
FrameEvent::Batch::ExecuteAsync(World* world)
{
Util::FixedArray<Dataset> datasets(this->processors.Size());

Expand Down Expand Up @@ -244,7 +271,7 @@ FrameEventBatch::ExecuteAsync(World* world)
/**
*/
void
FrameEventBatch::ExecuteSequential(World* world)
FrameEvent::Batch::ExecuteSequential(World* world)
{
for (SizeT i = 0; i < this->processors.Size(); i++)
{
Expand Down
68 changes: 36 additions & 32 deletions code/application/game/frameevent.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,70 +28,74 @@ struct ProcessorJobContext

//------------------------------------------------------------------------------
/**
A batch of frame callbacks
*/
class FrameEventBatch
class FrameEvent
{
public:
FrameEventBatch() = default;
~FrameEventBatch();

void Execute(World* world);
FrameEvent() = default;
~FrameEvent();

/// Try to insert a processor into the batch.
/// Can reject the processor if the batch is executed
/// async, and the processor is not allowed to run
/// simultaneously as the other processor. In this
/// case, use linear probing to insert the processor
/// into a new batch
bool TryInsert(Processor* processor);
void Run(World* world);
void AddProcessor(Processor* processor);

/// prefilter all processors. Should not be done per frame - instead use CacheTable if you need to do incremental caching
void Prefilter(World* world, bool force = false);
/// add a table to the cache of any processors that accepts it, that is attached to this frame batch
/// add a table to the caches of any processors that accepts it, that is attached to this frame event
void CacheTable(MemDb::TableId, MemDb::TableSignature const&);

/// sorting order in frame event
Util::StringAtom name;
int order = 100;
bool async = false;

private:
void ExecuteAsync(World* world);
void ExecuteSequential(World* world);
friend FramePipeline;

Util::Array<Processor*> processors;
class Batch;

/// Which pipeline is this event attached to
FramePipeline* pipeline;

/// Batches that this event will execute
Util::Array<Batch*> batches;
};


//------------------------------------------------------------------------------
/**
A batch of frame callbacks
*/
class FrameEvent
class FrameEvent::Batch
{
public:
FrameEvent() = default;
~FrameEvent();
Batch() = default;
~Batch();

void Run(World* world);
void AddProcessor(Processor* processor);
void Execute(World* world);

/// Try to insert a processor into the batch.
/// Can reject the processor if the batch is executed
/// async, and the processor is not allowed to run
/// simultaneously as the other processor. In this
/// case, use linear probing to insert the processor
/// into a new batch
bool TryInsert(Processor* processor);

/// prefilter all processors. Should not be done per frame - instead use CacheTable if you need to do incremental caching
void Prefilter(World* world, bool force = false);
/// add a table to the caches of any processors that accepts it, that is attached to this frame event
/// add a table to the cache of any processors that accepts it, that is attached to this frame batch
void CacheTable(MemDb::TableId, MemDb::TableSignature const&);

Util::StringAtom name;
/// sorting order in frame event
int order = 100;
bool async = false;

private:
friend FramePipeline;

/// Which pipeline is this event attached to
FramePipeline* pipeline;
void ExecuteAsync(World* world);
void ExecuteSequential(World* world);

/// Batches that this event will execute
Util::Array<FrameEventBatch*> batches;
Util::Array<Processor*> processors;
};


//------------------------------------------------------------------------------
/**
*/
Expand Down

0 comments on commit ac694d6

Please sign in to comment.