|
| 1 | +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. |
| 2 | +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. |
| 3 | +// All rights not expressly granted are reserved. |
| 4 | +// |
| 5 | +// This software is distributed under the terms of the GNU General Public |
| 6 | +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". |
| 7 | +// |
| 8 | +// In applying this license CERN does not waive the privileges and immunities |
| 9 | +// granted to it by virtue of its status as an Intergovernmental Organization |
| 10 | +// or submit itself to any jurisdiction. |
| 11 | + |
| 12 | +/// \file test_cmv_generator.cxx |
| 13 | +/// \brief DPL source workflow that generates dummy CMV data for testing the CMV FLP pipeline. |
| 14 | +/// |
| 15 | +/// Replaces o2-tpc-cmv-to-vector in tests; directly emits CMVVECTOR and CMVORBITS |
| 16 | +/// messages per CRU per TF so the workflow can be piped straight into o2-tpc-cmv-flp: |
| 17 | +/// |
| 18 | +/// o2-tpc-cmv-test-generator --crus 0-359 --timeframes 100 \ |
| 19 | +/// | o2-tpc-cmv-flp --crus 0-359 --n-TFs-buffer 10 \ |
| 20 | +/// | o2-dpl-output-proxy --dataspec "downstream:TPC/CMVGROUP;downstream:TPC/CMVORBITINFO" ... |
| 21 | +/// |
| 22 | +/// \author Ernst Hellbar <ernst.hellbar@cern.ch> |
| 23 | + |
| 24 | +#include "Framework/DataProcessorSpec.h" |
| 25 | +#include "Framework/Task.h" |
| 26 | +#include "Framework/ControlService.h" |
| 27 | +#include "Framework/ConfigParamRegistry.h" |
| 28 | +#include "Framework/ConfigParamSpec.h" |
| 29 | +#include "Framework/Logger.h" |
| 30 | +#include "Headers/DataHeader.h" |
| 31 | +#include "Algorithm/RangeTokenizer.h" |
| 32 | +#include "TPCBase/CRU.h" |
| 33 | +#include "DataFormatsTPC/CMV.h" |
| 34 | +#include "TPCWorkflow/ProcessingHelpers.h" |
| 35 | +#include "CommonUtils/TreeStreamRedirector.h" |
| 36 | +#include <fmt/format.h> |
| 37 | +#include <fmt/ranges.h> |
| 38 | + |
| 39 | +#include <vector> |
| 40 | +#include <chrono> |
| 41 | +#include <thread> |
| 42 | +#include <cmath> |
| 43 | +#include <memory> |
| 44 | +#include <random> |
| 45 | +#include <unordered_set> |
| 46 | + |
| 47 | +using namespace o2::framework; |
| 48 | +using o2::header::gDataOriginTPC; |
| 49 | + |
| 50 | +// ───────────────────────────────────────────────────────────────────────────── |
| 51 | +// workflow options |
| 52 | +// ───────────────────────────────────────────────────────────────────────────── |
| 53 | +void customize(std::vector<ConfigParamSpec>& workflowOptions) |
| 54 | +{ |
| 55 | + const std::string cruDefault = "0-" + std::to_string(o2::tpc::CRU::MaxCRU - 1); |
| 56 | + std::vector<ConfigParamSpec> options{ |
| 57 | + {"crus", VariantType::String, cruDefault.c_str(), {"List of CRUs, comma-separated ranges, e.g. 0-3,7,9-15"}}, |
| 58 | + {"timeframes", VariantType::Int, 100, {"Number of TFs to generate; use -1 to run indefinitely"}}, |
| 59 | + {"delay", VariantType::Bool, false, {"Add delay after sending all CRUs"}}, |
| 60 | + {"delayTime", VariantType::Int, 1, {"Duration of the global per-TF delay in ms (requires --delay true)"}}, |
| 61 | + {"delayCRUs", VariantType::String, "", {"CRUs for which to add an extra per-CRU delay before sending, comma-separated ranges"}}, |
| 62 | + {"delayTimeCRUs", VariantType::Int, 1, {"Duration of the per-CRU delay in ms (requires --delayCRUs)"}}, |
| 63 | + {"dropTFsRandom", VariantType::Int, 0, {"Drop a whole TF randomly: on average one every N TFs (0 = disabled)"}}, |
| 64 | + {"dropTFsRange", VariantType::String, "", {"Drop all TFs in this range, e.g. 10-12"}}, |
| 65 | + {"seed", VariantType::Int, 42, {"RNG seed for CMV value generation"}}, |
| 66 | + {"amplitude", VariantType::Float, 5.0f, {"Amplitude of the sinusoidal CMV signal (ADC units)"}}, |
| 67 | + {"noise", VariantType::Float, 1.0f, {"Gaussian noise std-dev added per time bin (ADC units)"}}, |
| 68 | + }; |
| 69 | + std::swap(workflowOptions, options); |
| 70 | +} |
| 71 | + |
| 72 | +#include "Framework/runDataProcessing.h" |
| 73 | + |
| 74 | +// ───────────────────────────────────────────────────────────────────────────── |
| 75 | +// generator device |
| 76 | +// ───────────────────────────────────────────────────────────────────────────── |
| 77 | +class CMVGeneratorDevice : public o2::framework::Task |
| 78 | +{ |
| 79 | + public: |
| 80 | + static constexpr uint32_t sOrbitsPerPacket = 8; ///< each CMV packet covers 8 heartbeat orbits |
| 81 | + |
| 82 | + CMVGeneratorDevice(const std::vector<uint32_t>& crus, |
| 83 | + const std::unordered_set<uint32_t>& delayCRUs, |
| 84 | + unsigned int maxTFs, |
| 85 | + bool delay, |
| 86 | + int delayTime, |
| 87 | + int delayTimeCRUs, |
| 88 | + int dropTFsRandom, |
| 89 | + const std::vector<int>& rangeTFsDrop, |
| 90 | + float amplitude, |
| 91 | + float noise, |
| 92 | + int seed) |
| 93 | + : mCRUs(crus), mDelayCRUs(delayCRUs), mMaxTFs(maxTFs), mDelay(delay), mDelayTime(delayTime), mDelayTimeCRUs(delayTimeCRUs), mDropTFsRandom(dropTFsRandom), mRangeTFsDrop(rangeTFsDrop), mAmplitude(amplitude), mNoise(noise), mRng(static_cast<std::mt19937::result_type>(seed)) {} |
| 94 | + |
| 95 | + void init(o2::framework::InitContext& ic) final |
| 96 | + { |
| 97 | + if (!mCRUs.empty()) { |
| 98 | + LOGP(info, "crus: {}", fmt::join(mCRUs, ", ")); |
| 99 | + } |
| 100 | + if (!mDelayCRUs.empty()) { |
| 101 | + const std::vector<uint32_t> delayCRUsSorted(mDelayCRUs.begin(), mDelayCRUs.end()); |
| 102 | + LOGP(info, "delayCRUs: {}", fmt::join(delayCRUsSorted, ", ")); |
| 103 | + } |
| 104 | + |
| 105 | + mWriteDebug = ic.options().get<bool>("write-debug"); |
| 106 | + if (mWriteDebug) { |
| 107 | + mDebugStreamFileName = ic.options().get<std::string>("debug-file-name"); |
| 108 | + LOGP(info, "Creating debug stream {}", mDebugStreamFileName); |
| 109 | + mDebugStream = std::make_unique<o2::utils::TreeStreamRedirector>(mDebugStreamFileName.data(), "recreate"); |
| 110 | + } |
| 111 | + } |
| 112 | + |
| 113 | + void run(o2::framework::ProcessingContext& ctx) final |
| 114 | + { |
| 115 | + using timer = std::chrono::high_resolution_clock; |
| 116 | + const auto tf = o2::tpc::processing_helpers::getCurrentTF(ctx); |
| 117 | + |
| 118 | + // ── TF dropping ────────────────────────────────────────────────────────── |
| 119 | + if (!mRangeTFsDrop.empty() && tf >= (uint32_t)mRangeTFsDrop.front() && tf <= (uint32_t)mRangeTFsDrop.back()) { |
| 120 | + LOGP(info, "Dropping TF {} (range drop)", tf); |
| 121 | + return; |
| 122 | + } |
| 123 | + if (mDropTFsRandom > 0 && std::uniform_int_distribution<int>{0, mDropTFsRandom - 1}(mRng) == 0) { |
| 124 | + LOGP(info, "Dropping TF {} (random drop)", tf); |
| 125 | + return; |
| 126 | + } |
| 127 | + |
| 128 | + auto start = timer::now(); |
| 129 | + static auto sTimer100TFs = start; |
| 130 | + |
| 131 | + // ── CMV values (generated once per TF, reused for all CRUs) ───────────── |
| 132 | + // NTimeBinsPerTF = NPacketsPerTFPerCRU (4) * NTimeBinsPerPacket (3564) = 14256 |
| 133 | + // Using std::mt19937 + std::normal_distribution for fast noise generation. |
| 134 | + // All CRUs share the same noise realization; the sinusoidal signal is common too. |
| 135 | + const float signal = mAmplitude * std::sin(tf * 0.05f); |
| 136 | + std::normal_distribution<float> noiseDist{0.f, mNoise}; |
| 137 | + std::vector<uint16_t> cmvVec(o2::tpc::cmv::NTimeBinsPerTF); |
| 138 | + for (auto& v : cmvVec) { |
| 139 | + o2::tpc::cmv::Data d; |
| 140 | + d.setCMVFloat(signal + noiseDist(mRng)); |
| 141 | + v = d.getCMV(); |
| 142 | + } |
| 143 | + |
| 144 | + // ── Orbit / BC info (same for all CRUs) ────────────────────────────────── |
| 145 | + // One packed (orbit<<32|bc) entry per CMV packet (4 per TF). |
| 146 | + // Each packet covers 8 heartbeat orbits (NTimeBinsPerPacket = 3564 = 8 LHC orbits), |
| 147 | + // so the orbit advances by 8 per packet and by NPacketsPerTFPerCRU*8 = 32 per TF. |
| 148 | + std::vector<uint64_t> orbitBCVec(o2::tpc::cmv::NPacketsPerTFPerCRU); |
| 149 | + for (uint32_t pkt = 0; pkt < o2::tpc::cmv::NPacketsPerTFPerCRU; ++pkt) { |
| 150 | + const uint32_t orbit = static_cast<uint32_t>(tf * o2::tpc::cmv::NPacketsPerTFPerCRU * sOrbitsPerPacket + pkt * sOrbitsPerPacket); |
| 151 | + orbitBCVec[pkt] = uint64_t(orbit) << 32; // bc = 0 |
| 152 | + } |
| 153 | + |
| 154 | + for (const auto cru : mCRUs) { |
| 155 | + const o2::header::DataHeader::SubSpecificationType subSpec{cru << 7}; |
| 156 | + |
| 157 | + // ── per-CRU delay ──────────────────────────────────────────────────── |
| 158 | + if (mDelayCRUs.count(cru)) { |
| 159 | + std::this_thread::sleep_for(std::chrono::milliseconds(mDelayTimeCRUs)); |
| 160 | + } |
| 161 | + |
| 162 | + ctx.outputs().snapshot(Output{gDataOriginTPC, "CMVVECTOR", subSpec}, cmvVec); |
| 163 | + ctx.outputs().snapshot(Output{gDataOriginTPC, "CMVORBITS", subSpec}, orbitBCVec); |
| 164 | + |
| 165 | + if (mWriteDebug) { |
| 166 | + auto& stream = (*mDebugStream) << "cmvs"; |
| 167 | + stream << "cru=" << cru |
| 168 | + << "tfCounter=" << tf |
| 169 | + << "nCMVs=" << cmvVec.size() |
| 170 | + << "cmvs=" << cmvVec |
| 171 | + << "\n"; |
| 172 | + } |
| 173 | + } |
| 174 | + |
| 175 | + if (!(tf % 100)) { |
| 176 | + const auto elapsed100 = std::chrono::duration_cast<std::chrono::milliseconds>(timer::now() - sTimer100TFs).count(); |
| 177 | + LOGP(info, "Generated CMV data for TF {} ({} ms for last 100 TFs)", tf, elapsed100); |
| 178 | + sTimer100TFs = timer::now(); |
| 179 | + } |
| 180 | + |
| 181 | + // ── global delay ───────────────────────────────────────────────────────── |
| 182 | + if (mDelay) { |
| 183 | + auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(timer::now() - start).count(); |
| 184 | + if (elapsed < mDelayTime) { |
| 185 | + std::this_thread::sleep_for(std::chrono::milliseconds(mDelayTime - elapsed)); |
| 186 | + } |
| 187 | + } |
| 188 | + |
| 189 | + if (tf >= mMaxTFs - 1) { |
| 190 | + ctx.services().get<ControlService>().endOfStream(); |
| 191 | + ctx.services().get<ControlService>().readyToQuit(QuitRequest::Me); |
| 192 | + } |
| 193 | + } |
| 194 | + |
| 195 | + void endOfStream(o2::framework::EndOfStreamContext&) final { closeFiles(); } |
| 196 | + void stop() final { closeFiles(); } |
| 197 | + |
| 198 | + private: |
| 199 | + void closeFiles() |
| 200 | + { |
| 201 | + if (mDebugStream) { |
| 202 | + auto& stream = (*mDebugStream) << "cmvs"; |
| 203 | + auto& tree = stream.getTree(); |
| 204 | + tree.SetAlias("sector", "int(cru/10)"); |
| 205 | + mDebugStream->Close(); |
| 206 | + mDebugStream.reset(nullptr); |
| 207 | + } |
| 208 | + } |
| 209 | + |
| 210 | + const std::vector<uint32_t> mCRUs{}; |
| 211 | + const std::unordered_set<uint32_t> mDelayCRUs{}; |
| 212 | + const unsigned int mMaxTFs{}; |
| 213 | + const bool mDelay{false}; |
| 214 | + const int mDelayTime{1}; |
| 215 | + const int mDelayTimeCRUs{1}; |
| 216 | + const int mDropTFsRandom{0}; |
| 217 | + const std::vector<int> mRangeTFsDrop{}; |
| 218 | + const float mAmplitude{5.f}; |
| 219 | + const float mNoise{1.f}; |
| 220 | + std::mt19937 mRng{}; |
| 221 | + bool mWriteDebug{false}; |
| 222 | + std::string mDebugStreamFileName{}; |
| 223 | + std::unique_ptr<o2::utils::TreeStreamRedirector> mDebugStream{}; |
| 224 | +}; |
| 225 | + |
| 226 | +// ───────────────────────────────────────────────────────────────────────────── |
| 227 | +DataProcessorSpec generateCMVsCRU(const std::vector<uint32_t>& crus, |
| 228 | + const std::unordered_set<uint32_t>& delayCRUs, |
| 229 | + unsigned int maxTFs, |
| 230 | + bool delay, |
| 231 | + int delayTime, |
| 232 | + int delayTimeCRUs, |
| 233 | + int dropTFsRandom, |
| 234 | + const std::vector<int>& rangeTFsDrop, |
| 235 | + float amplitude, |
| 236 | + float noise, |
| 237 | + int seed) |
| 238 | +{ |
| 239 | + std::vector<OutputSpec> outputSpecs; |
| 240 | + outputSpecs.reserve(crus.size() * 2); |
| 241 | + for (const auto cru : crus) { |
| 242 | + const o2::header::DataHeader::SubSpecificationType subSpec{cru << 7}; |
| 243 | + outputSpecs.emplace_back(gDataOriginTPC, "CMVVECTOR", subSpec, Lifetime::Timeframe); |
| 244 | + outputSpecs.emplace_back(gDataOriginTPC, "CMVORBITS", subSpec, Lifetime::Timeframe); |
| 245 | + } |
| 246 | + |
| 247 | + return DataProcessorSpec{ |
| 248 | + "tpc-cmv-generator", |
| 249 | + Inputs{}, |
| 250 | + outputSpecs, |
| 251 | + AlgorithmSpec{adaptFromTask<CMVGeneratorDevice>(crus, delayCRUs, maxTFs, delay, delayTime, delayTimeCRUs, dropTFsRandom, rangeTFsDrop, amplitude, noise, seed)}, |
| 252 | + Options{ |
| 253 | + {"write-debug", VariantType::Bool, false, {"Write a debug output tree"}}, |
| 254 | + {"debug-file-name", VariantType::String, "./cmv_generator_debug.root", {"Name of the debug output file"}}, |
| 255 | + }}; |
| 256 | +} |
| 257 | + |
| 258 | +// ───────────────────────────────────────────────────────────────────────────── |
| 259 | +WorkflowSpec defineDataProcessing(ConfigContext const& config) |
| 260 | +{ |
| 261 | + const auto tpcCRUs = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("crus")); |
| 262 | + const std::vector<uint32_t> crus(tpcCRUs.begin(), tpcCRUs.end()); |
| 263 | + |
| 264 | + const auto delayCRUsStr = config.options().get<std::string>("delayCRUs"); |
| 265 | + std::unordered_set<uint32_t> delayCRUs; |
| 266 | + if (!delayCRUsStr.empty()) { |
| 267 | + for (const auto cru : o2::RangeTokenizer::tokenize<int>(delayCRUsStr)) { |
| 268 | + delayCRUs.insert(static_cast<uint32_t>(cru)); |
| 269 | + } |
| 270 | + } |
| 271 | + |
| 272 | + const auto rangeTFsDrop = o2::RangeTokenizer::tokenize<int>(config.options().get<std::string>("dropTFsRange")); |
| 273 | + const auto timeframes = static_cast<unsigned int>(config.options().get<int>("timeframes")); |
| 274 | + const auto delay = config.options().get<bool>("delay"); |
| 275 | + const auto delayTime = config.options().get<int>("delayTime"); |
| 276 | + const auto delayTimeCRUs = config.options().get<int>("delayTimeCRUs"); |
| 277 | + const auto dropTFsRandom = config.options().get<int>("dropTFsRandom"); |
| 278 | + const auto seed = config.options().get<int>("seed"); |
| 279 | + const auto amplitude = config.options().get<float>("amplitude"); |
| 280 | + const auto noise = config.options().get<float>("noise"); |
| 281 | + |
| 282 | + WorkflowSpec workflow; |
| 283 | + workflow.emplace_back(generateCMVsCRU(crus, delayCRUs, timeframes, delay, delayTime, delayTimeCRUs, dropTFsRandom, rangeTFsDrop, amplitude, noise, seed)); |
| 284 | + return workflow; |
| 285 | +} |
0 commit comments