Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add buffered reader #4018

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Examples/Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ add_library(
src/Framework/RandomNumbers.cpp
src/Framework/Sequencer.cpp
src/Framework/DataHandle.cpp
src/Framework/BufferedReader.cpp
src/Utilities/EventDataTransforms.cpp
src/Utilities/Paths.cpp
src/Utilities/Options.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// This file is part of the ACTS project.
//
// Copyright (C) 2016 CERN for the benefit of the ACTS project
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

#pragma once

#include "Acts/Utilities/Logger.hpp"
#include "ActsExamples/Framework/AlgorithmContext.hpp"
#include "ActsExamples/Framework/IReader.hpp"
#include "ActsExamples/Framework/ProcessCode.hpp"

#include <utility>

namespace ActsExamples {

class WhiteBoard;

/// Event data reader that wraps a concrete reader instance, reads a fixed
/// number of events from it into an in-memory buffer at construction time, and
/// afterwards serves events from that buffer instead of reading them from
/// disk.
/// The purpose is to avoid IO bottlenecks in timing measurements.
class BufferedReader final : public IReader {
 public:
  struct Config {
    /// The downstream reader that should be used
    std::shared_ptr<IReader> downstreamReader;

    /// The seed for sampling events from the buffer
    std::size_t selectionSeed = 123456;

    /// Buffer size, i.e. the number of events that are read from the
    /// downstream reader. The reader will throw an exception if the
    /// downstream reader does not provide enough events.
    /// Defaults to a single event so that a default-initialised config never
    /// carries an indeterminate value.
    std::size_t bufferSize = 1;
  };

  /// Construct the reader
  ///
  /// @param config The reader configuration
  /// @param level The logging level
  BufferedReader(const Config& config, Acts::Logging::Level level);

  /// Return the config
  const Config& config() const { return m_cfg; }

  /// Give the reader an understandable name
  ///
  /// @return "Buffered" followed by the name of the downstream reader, or
  ///         plain "BufferedReader" if no downstream reader is configured
  std::string name() const override {
    // The constructor uses name() to label the logger, possibly before the
    // configuration has been validated, so a missing downstream reader must
    // not be dereferenced here.
    if (!m_cfg.downstreamReader) {
      return "BufferedReader";
    }
    return "Buffered" + m_cfg.downstreamReader->name();
  }

  /// The buffered reader provides the maximum available event range
  std::pair<std::size_t, std::size_t> availableEvents() const override {
    return {0, std::numeric_limits<std::size_t>::max()};
  }

  /// Return an event from the buffer
  ///
  /// @param context The algorithm context of the event to fill
  ProcessCode read(const AlgorithmContext& context) override;

  /// Fulfill the algorithm interface
  ProcessCode initialize() override { return ProcessCode::SUCCESS; }

  /// Fulfill the algorithm interface
  ProcessCode finalize() override { return ProcessCode::SUCCESS; }

 private:
  // NOTE: the declaration order determines the initialisation order
  Config m_cfg;
  std::unique_ptr<const Acts::Logger> m_logger;
  /// One whiteboard per buffered event
  std::vector<std::unique_ptr<ActsExamples::WhiteBoard>> m_buffer;

  const Acts::Logger& logger() const { return *m_logger; }
};

} // namespace ActsExamples
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class SequenceElement {
template <typename T>
friend class ReadDataHandle;

friend class BufferedReader;
paulgessinger marked this conversation as resolved.
Show resolved Hide resolved

std::vector<const DataHandleBase*> m_writeHandles;
std::vector<const DataHandleBase*> m_readHandles;
};
Expand Down
11 changes: 10 additions & 1 deletion Examples/Framework/include/ActsExamples/Framework/WhiteBoard.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,21 @@ class WhiteBoard {
Acts::getDefaultLogger("WhiteBoard", Acts::Logging::INFO),
std::unordered_map<std::string, std::string> objectAliases = {});

// A WhiteBoard holds unique elements and can not be copied
WhiteBoard(const WhiteBoard& other) = delete;
WhiteBoard& operator=(const WhiteBoard&) = delete;

WhiteBoard(WhiteBoard&& other) = default;
WhiteBoard& operator=(WhiteBoard&& other) = default;

bool exists(const std::string& name) const;

/// Adds the data of this whiteboard instance to another whiteboard.
/// This is a low overhead operation, since the data holders are
/// shared pointers.
/// Throws an exception if the other whiteboard already contains one of
/// the keys in this whiteboard.
void shareDataWith(WhiteBoard& other) const;
benjaminhuth marked this conversation as resolved.
Show resolved Hide resolved

private:
/// Store an object on the white board and transfer ownership.
///
Expand Down
75 changes: 75 additions & 0 deletions Examples/Framework/src/Framework/BufferedReader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// This file is part of the ACTS project.
//
// Copyright (C) 2016 CERN for the benefit of the ACTS project
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

#include "ActsExamples/Framework/BufferedReader.hpp"

#include "Acts/Utilities/Logger.hpp"
#include "ActsExamples/Framework/AlgorithmContext.hpp"
#include "ActsExamples/Framework/WhiteBoard.hpp"

#include <random>
#include <utility>

namespace ActsExamples {

/// Construct the reader and eagerly fill the event buffer from the downstream
/// reader.
///
/// @param config The reader configuration
/// @param level The logging level
///
/// @throws std::invalid_argument if no downstream reader is configured or the
///         requested buffer size is zero
/// @throws std::runtime_error if the downstream reader provides fewer events
///         than the buffer size, or fails to read one of them
BufferedReader::BufferedReader(const Config &config, Acts::Logging::Level level)
    : m_cfg(config) {
  // Validate the configuration *before* calling name(): the name is derived
  // from the downstream reader, so it must not be queried while the pointer
  // may still be null.
  if (!m_cfg.downstreamReader) {
    throw std::invalid_argument("No downstream reader provided!");
  }
  if (m_cfg.bufferSize == 0) {
    // An empty buffer could never serve an event in read()
    throw std::invalid_argument("Buffer size must be larger than zero!");
  }

  m_logger = Acts::getDefaultLogger(name(), level);

  // Register write and read handles of the downstream reader
  for (const auto *rh : m_cfg.downstreamReader->readHandles()) {
    registerReadHandle(*rh);
  }

  for (const auto *wh : m_cfg.downstreamReader->writeHandles()) {
    registerWriteHandle(*wh);
  }

  // Make sure the downstream reader can deliver the requested number of
  // events; guard the subtraction against an inverted range.
  const auto [ebegin, eend] = m_cfg.downstreamReader->availableEvents();
  if (eend < ebegin || eend - ebegin < m_cfg.bufferSize) {
    throw std::runtime_error("Reader does not provide enough events");
  }

  ACTS_INFO("Start reading events into buffer...");

  // Only bufferSize events are stored, independent of how many events the
  // downstream reader could provide in total.
  m_buffer.reserve(m_cfg.bufferSize);
  for (auto i = ebegin; i < ebegin + m_cfg.bufferSize; ++i) {
    auto board = std::make_unique<ActsExamples::WhiteBoard>(m_logger->clone());
    ActsExamples::AlgorithmContext ctx(0, i, *board);

    ACTS_DEBUG("Read event " << i << " into buffer");
    if (m_cfg.downstreamReader->read(ctx) != ProcessCode::SUCCESS) {
      // Do not silently buffer an incomplete event
      ACTS_ERROR("Could not read event " << i << " into buffer");
      throw std::runtime_error(
          "Downstream reader failed to read an event into the buffer");
    }
    m_buffer.emplace_back(std::move(board));
  }

  ACTS_INFO("Filled " << m_buffer.size() << " events into the buffer");
}

/// Serve one event from the buffer.
///
/// A buffer entry is picked pseudo-randomly, depending only on the configured
/// selection seed and the event number, and its content is shared with the
/// event store of the given context.
///
/// @param ctx The algorithm context of the event to fill
/// @return ProcessCode::SUCCESS
ProcessCode BufferedReader::read(const AlgorithmContext &ctx) {
  // Uniform choice among all buffer slots
  std::uniform_int_distribution<std::size_t> pickSlot(0, m_cfg.bufferSize - 1);

  // The engine is rebuilt from the configured seed on every call and then
  // advanced by the event number. This keeps the selection consistent if
  // several BufferedReader instances are used within one workflow. A linear
  // congruential engine is used because it is cheap to instantiate.
  std::minstd_rand engine(m_cfg.selectionSeed);
  engine.discard(ctx.eventNumber);

  const std::size_t slot = pickSlot(engine);

  // Transfer the buffered content to the event store of this event
  m_buffer.at(slot)->shareDataWith(ctx.eventStore);

  ACTS_DEBUG("Use buffer entry " << slot << " for event " << ctx.eventNumber);

  return ProcessCode::SUCCESS;
}
benjaminhuth marked this conversation as resolved.
Show resolved Hide resolved

} // namespace ActsExamples
11 changes: 11 additions & 0 deletions Examples/Framework/src/Framework/WhiteBoard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,14 @@ std::string ActsExamples::WhiteBoard::typeMismatchMessage(
boost::core::demangle(req) + " but actually " +
boost::core::demangle(act)};
}

/// Add every entry stored on this whiteboard to another whiteboard.
///
/// The entries of the store are copied as they are; the objects they refer to
/// are not duplicated by this function.
///
/// The operation is all-or-nothing: if any key of this whiteboard is already
/// present on @p other, an exception is thrown and @p other is left
/// unmodified.
///
/// @param other The whiteboard that receives the entries
/// @throws std::runtime_error if @p other already contains one of the keys
void ActsExamples::WhiteBoard::shareDataWith(WhiteBoard &other) const {
  // First pass: detect conflicts before touching `other`, so that a failure
  // does not leave it holding only part of this whiteboard's content.
  for (const auto &entry : m_store) {
    const auto &key = entry.first;
    if (other.m_store.find(key) != other.m_store.end()) {
      throw std::runtime_error("Cannot share key '" + key +
                               "', is already present");
    }
  }

  // Second pass: no conflicts, so every insertion succeeds.
  for (const auto &entry : m_store) {
    other.m_store.insert(entry);
  }
}
6 changes: 6 additions & 0 deletions Examples/Python/src/Input.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "Acts/Plugins/Python/Utilities.hpp"
#include "ActsExamples/EventData/Cluster.hpp"
#include "ActsExamples/Framework/BufferedReader.hpp"
#include "ActsExamples/Io/Csv/CsvDriftCircleReader.hpp"
#include "ActsExamples/Io/Csv/CsvExaTrkXGraphReader.hpp"
#include "ActsExamples/Io/Csv/CsvMeasurementReader.hpp"
Expand Down Expand Up @@ -39,6 +40,11 @@ namespace Acts::Python {
void addInput(Context& ctx) {
auto mex = ctx.get("examples");

// Buffered reader
ACTS_PYTHON_DECLARE_READER(ActsExamples::BufferedReader, mex,
"BufferedReader", downstreamReader, selectionSeed,
bufferSize);

// ROOT READERS
ACTS_PYTHON_DECLARE_READER(ActsExamples::RootParticleReader, mex,
"RootParticleReader", outputParticles, treeName,
Expand Down
46 changes: 46 additions & 0 deletions Examples/Python/tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,3 +435,49 @@ def test_edm4hep_tracks_reader(tmp_path):
)

s.run()


@pytest.mark.root
def test_buffered_reader(tmp_path, conf_const, ptcl_gun):
    """Check that the BufferedReader serves events read by a ROOT particle reader.

    Step 1 writes a particle file with a particle gun, step 2 reads it back
    through a BufferedReader wrapping a RootParticleReader and verifies that
    the expected collection exists for every processed event.
    """
    num_events = 10

    # Test the buffered reader with the ROOT particle reader
    # need to write out some particles first
    s = Sequencer(numThreads=1, events=num_events, logLevel=acts.logging.WARNING)
    evGen = ptcl_gun(s)

    file = tmp_path / "particles.root"
    s.addWriter(
        conf_const(
            RootParticleWriter,
            acts.logging.WARNING,
            inputParticles=evGen.config.outputParticles,
            filePath=str(file),
        )
    )

    s.run()

    # reset sequencer for reading
    # The BufferedReader advertises the maximum possible event range, so the
    # number of events to process has to be bounded explicitly here; otherwise
    # nothing limits the run to the number of events asserted below.
    s2 = Sequencer(numThreads=1, events=num_events, logLevel=acts.logging.WARNING)

    reader = acts.examples.RootParticleReader(
        level=acts.logging.WARNING,
        outputParticles="particles_input",
        filePath=str(file),
    )

    s2.addReader(
        acts.examples.BufferedReader(
            level=acts.logging.WARNING,
            downstreamReader=reader,
            bufferSize=num_events,
        )
    )

    alg = AssertCollectionExistsAlg(
        "particles_input", "check_alg", acts.logging.WARNING
    )
    s2.addAlgorithm(alg)

    s2.run()

    assert alg.events_seen == num_events
Loading