From 7682890b111350b24e0294b18da977f8d7cf806f Mon Sep 17 00:00:00 2001 From: shahoian Date: Mon, 11 May 2026 23:34:49 +0200 Subject: [PATCH] o2-raw-tf-reader supports remote TF reading (with --copy-cmd none) The same conventions as for the o2-ctf-reader. But remote reading of raw files is very slow, makes sense only if a small fraction of data needs to be read. --- Detectors/Raw/TFReaderDD/CMakeLists.txt | 1 + .../TFReaderDD/include/TFReaderDD/BinFileOp.h | 107 +++++++++++ .../TFReaderDD/SubTimeFrameFileReader.h | 84 ++------- Detectors/Raw/TFReaderDD/src/BinFileOp.cxx | 175 ++++++++++++++++++ .../TFReaderDD/src/SubTimeFrameFileReader.cxx | 134 +++++--------- .../Raw/TFReaderDD/src/tf-reader-workflow.cxx | 2 +- 6 files changed, 345 insertions(+), 158 deletions(-) create mode 100644 Detectors/Raw/TFReaderDD/include/TFReaderDD/BinFileOp.h create mode 100644 Detectors/Raw/TFReaderDD/src/BinFileOp.cxx diff --git a/Detectors/Raw/TFReaderDD/CMakeLists.txt b/Detectors/Raw/TFReaderDD/CMakeLists.txt index f87d1b5a7704e..7d113f5bbad92 100644 --- a/Detectors/Raw/TFReaderDD/CMakeLists.txt +++ b/Detectors/Raw/TFReaderDD/CMakeLists.txt @@ -11,6 +11,7 @@ o2_add_library(TFReaderDD SOURCES src/SubTimeFrameFile.cxx + src/BinFileOp.cxx src/SubTimeFrameFileReader.cxx PUBLIC_LINK_LIBRARIES FairRoot::Base O2::Headers diff --git a/Detectors/Raw/TFReaderDD/include/TFReaderDD/BinFileOp.h b/Detectors/Raw/TFReaderDD/include/TFReaderDD/BinFileOp.h new file mode 100644 index 0000000000000..462c52e1a4225 --- /dev/null +++ b/Detectors/Raw/TFReaderDD/include/TFReaderDD/BinFileOp.h @@ -0,0 +1,107 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +// @brief Polymorphic class to access either local or grid files with fixed sed ot methods +#ifndef _BIN_FILE_OP_H_ +#define _BIN_FILE_OP_H_ + +#include +#include +#include +#include +#include + +namespace o2::rawdd +{ + +class BinFileOp +{ + public: + static constexpr size_t KBYTE = 1024; + static constexpr size_t MaxBuffSize = KBYTE << 2; + virtual ~BinFileOp() = default; + + virtual bool read_advance(void* ptr, size_t len) = 0; + virtual void set_position(size_t pos) = 0; + virtual bool ignore_nbytes(size_t pLen) = 0; + virtual bool isGood() const = 0; + virtual unsigned char* bufferize(size_t& s) = 0; + virtual size_t bufferized_size() const = 0; + size_t bufferized_pos() const { return mBufferizedPos; } + size_t size() const { return mFileSize; } + size_t position() const { return mFileOffset; } + size_t distance_to_eof() const { return mFileSize - mFileOffset; } + bool eof() const { return mFileOffset == mFileSize; } + + static BinFileOp* open(const std::string& name); + + protected: + BinFileOp(const std::string& name) : mFileName(name) {} + + std::string mFileName = {}; + size_t mFileOffset = 0; + size_t mFileSize = 0; + size_t mBufferizedPos = -1UL; +}; + +//======================================================================== +class BinFileOpLocal : public BinFileOp +{ + public: + BinFileOpLocal(const std::string& name); + ~BinFileOpLocal() override; + + bool read_advance(void* ptr, size_t len) override; + void set_position(size_t pos) override + { + assert(pos <= mFileSize); + mFileOffset = std::min(pos, mFileSize); + } + unsigned char* bufferize(size_t& s) override; + size_t bufferized_size() const override { return mFileSize - mBufferizedPos; } + + bool ignore_nbytes(size_t len) override; + bool isGood() const override { return mFileMap.is_open(); } + + size_t size() const { return mFileSize; } + size_t position() const { return mFileOffset; } + bool eof() const { return mFileOffset == mFileSize; } + + protected: + boost::iostreams::mapped_file_source mFileMap; +}; + +//======================================================================== +class BinFileOpGrid : public BinFileOp +{ + public: + BinFileOpGrid(const std::string& name); + ~BinFileOpGrid() override = default; + + bool read_advance(void* ptr, size_t len) override; + unsigned char* bufferize(size_t& s) override; + size_t bufferized_size() const override { return mBuffer.size(); } + void set_position(size_t pos) override; + bool ignore_nbytes(size_t len) override; + bool isGood() const override { return mFile && !mFile->IsZombie(); } + + size_t size() const { return mFileSize; } + size_t position() const { return mFileOffset; } + bool eof() const { return mFileOffset == mFileSize; } + + protected: + std::unique_ptr mFile; + std::vector mBuffer; +}; + +} // namespace o2::rawdd + +#endif diff --git a/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h b/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h index 2b7d2b7ab8e74..5fcd8f4f1af32 100644 --- a/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h +++ b/Detectors/Raw/TFReaderDD/include/TFReaderDD/SubTimeFrameFileReader.h @@ -22,8 +22,10 @@ #include #include #include -#include -#include +#include "TFReaderDD/BinFileOp.h" +// RSREM +// #include +// #include #include #include #include @@ -47,34 +49,20 @@ class SubTimeFrameFileReader SubTimeFrameFileReader() = delete; SubTimeFrameFileReader(const std::string& pFileName, o2::detectors::DetID::mask_t detMask, int verb, bool sup0xccdb, bool repaireHeaders, bool rejectDistSTF); - ~SubTimeFrameFileReader(); + ~SubTimeFrameFileReader() = default; /// Read a single TF from the file std::unique_ptr read(fair::mq::Device* device, const std::vector& outputRoutes, const std::string& rawChannel, size_t slice); - /// Tell the current position of the file - inline std::uint64_t position() const { return mFileMapOffset; } - - /// Set the current position of the file - inline void set_position(std::uint64_t pPos) - { - const std::uint64_t lPos = std::min(pPos, mFileSize); - assert(pPos == lPos); - mFileMapOffset = lPos; - } - - /// Is the stream position at EOF - inline bool eof() const { return mFileMapOffset == mFileSize; } - - /// Tell the size of the file - inline std::uint64_t size() const { return mFileSize; } - private: - std::string mFileName; std::unordered_map mDetOrigMap; - boost::iostreams::mapped_file_source mFileMap; - std::uint64_t mFileMapOffset = 0; - std::uint64_t mFileSize = 0; + + std::unique_ptr mFile; + // RSREM + // std::string mFileName; + // boost::iostreams::mapped_file_source mFileMap; + // std::uint64_t mFileMapOffset = 0; + // std::uint64_t mFileSize = 0; int mVerbosity = 0; bool mSup0xccdb = true; @@ -84,55 +72,19 @@ class SubTimeFrameFileReader const std::string describeHeader(const o2::header::DataHeader& hd, bool full = false) const; // helper to make sure written chunks are buffered, only allow pointers - template ::value>> - bool read_advance(pointer pPtr, std::uint64_t pLen) + template ::value>> + inline bool read_advance(pointer pPtr, std::uint64_t pLen) { - if (!mFileMap.is_open()) { + if (!mFile) { return false; } - - assert(mFileMapOffset <= mFileSize); - const std::uint64_t lToRead = std::min(pLen, mFileSize - mFileMapOffset); - - if (lToRead != pLen) { - LOGP(error, "FileReader: request to read beyond the file end. pos={} size={} len={}", - mFileMapOffset, mFileSize, pLen); - LOGP(error, "Closing the file {}. The read data is invalid.", mFileName); - mFileMap.close(); - mFileMapOffset = 0; - mFileSize = 0; - return false; - } - - std::memcpy(reinterpret_cast(pPtr), mFileMap.data() + mFileMapOffset, lToRead); - mFileMapOffset += lToRead; - return true; + return mFile->read_advance(pPtr, pLen); } // return the pointer - unsigned char* peek() const - { - return const_cast(reinterpret_cast(mFileMap.data() + mFileMapOffset)); - } + // inline unsigned char* peek() { mFile->peek(); } - inline bool ignore_nbytes(const std::size_t pLen) - { - const std::size_t lToIgnore = std::min(pLen, std::size_t(mFileSize - mFileMapOffset)); - if (pLen != lToIgnore) { - LOGP(error, "FileReader: request to ignore bytes beyond the file end. pos={} size={} len={}", - mFileMapOffset, mFileSize, pLen); - LOGP(error, "Closing the file {}. The read data is invalid.", mFileName); - mFileMap.close(); - mFileMapOffset = 0; - mFileSize = 0; - return false; - } - - mFileMapOffset += lToIgnore; - assert(mFileMapOffset <= mFileSize); - return true; - } + // inline bool ignore_nbytes(const std::size_t pLen) { mFle->ignore_nbytes(pLen); } std::size_t getHeaderStackSize(); o2::header::Stack getHeaderStack(std::size_t& pOrigsize); diff --git a/Detectors/Raw/TFReaderDD/src/BinFileOp.cxx b/Detectors/Raw/TFReaderDD/src/BinFileOp.cxx new file mode 100644 index 0000000000000..330492c1e0ce9 --- /dev/null +++ b/Detectors/Raw/TFReaderDD/src/BinFileOp.cxx @@ -0,0 +1,175 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +// @brief Polimorphic class to access either local or grid files with fixed sed ot methods + +#include "TFReaderDD/BinFileOp.h" +#include "Framework/Logger.h" +#include "CommonUtils/StringUtils.h" +#include +#include + +namespace o2::rawdd +{ + +//_____________________________________________________________________ +BinFileOp* BinFileOp::open(const std::string& name) +{ + BinFileOp* ptr = o2::utils::Str::beginsWith(name, "alien://") ? static_cast(new BinFileOpGrid(name)) : static_cast(new BinFileOpLocal(name)); + return ptr->isGood() ? ptr : nullptr; +} + +//_____________________________________________________________________ +BinFileOpLocal::BinFileOpLocal(const std::string& name) : BinFileOp(name) +{ + mFileMap.open(name); + if (!mFileMap.is_open()) { + LOG(error) << "Failed to open TF file for reading (mmap)."; + return; + } + mFileSize = mFileMap.size(); + mFileOffset = 0; + +#if __linux__ + madvise((void*)mFileMap.data(), mFileMap.size(), MADV_HUGEPAGE | MADV_SEQUENTIAL | MADV_DONTDUMP); +#endif +} + +BinFileOpLocal::~BinFileOpLocal() +{ + if (!mFileMap.is_open()) { +#if __linux__ + madvise((void*)mFileMap.data(), mFileMap.size(), MADV_DONTNEED); +#endif + mFileMap.close(); + } +} + +bool BinFileOpLocal::read_advance(void* ptr, size_t len) +{ + if (!mFileMap.is_open()) { + return false; + } + assert(mFileOffset <= mFileSize); + const size_t lToRead = std::min(len, mFileSize - mFileOffset); + if (lToRead != len) { + LOGP(error, "BinFileOpLocal: request to read beyond the file end. pos={} size={} len={}, closing the file {}", mFileOffset, mFileSize, len, mFileName); + mFileMap.close(); + mFileOffset = 0; + mFileSize = 0; + return false; + } + std::memcpy(reinterpret_cast(ptr), mFileMap.data() + mFileOffset, lToRead); + mFileOffset += lToRead; + return true; +} + +unsigned char* BinFileOpLocal::bufferize(size_t& s) +{ + if (s > MaxBuffSize) { + LOGP(fatal, "Requested buffer size {} exceeds max allowed {}", s, MaxBuffSize); + } + s = std::min(s, distance_to_eof()); + mBufferizedPos = position(); + return const_cast(reinterpret_cast(mFileMap.data() + mFileOffset)); +} + +bool BinFileOpLocal::ignore_nbytes(size_t len) +{ + const size_t lToIgnore = std::min(len, std::size_t(mFileSize - mFileOffset)); + if (len != lToIgnore) { + LOGP(error, "BinFileOpLocal: request to ignore bytes beyond the file end. pos={} size={} len={}, closing the file {}", mFileOffset, mFileSize, len, mFileName); + mFileMap.close(); + mFileOffset = 0; + mFileSize = 0; + return false; + } + mFileOffset += lToIgnore; + assert(mFileOffset <= mFileSize); + return true; +} + +//_____________________________________________________________________ +BinFileOpGrid::BinFileOpGrid(const std::string& name) : BinFileOp(name) +{ + mFile.reset(TFile::Open(fmt::format("{}?filetype=raw", name).c_str())); + if (!isGood()) { + LOGP(error, "Failed to open file {} for reading.", name); + return; + } + mFileSize = mFile->GetSize(); + set_position(0); + mBuffer.reserve(MaxBuffSize); +} + +void BinFileOpGrid::set_position(size_t pos) +{ + assert(pos <= mFileSize); + mFileOffset = std::min(pos, mFileSize); + mFile->Seek(mFileOffset); +} + +bool BinFileOpGrid::read_advance(void* ptr, size_t len) +{ + if (!isGood()) { + return false; + } + assert(mFileOffset <= mFileSize); + const size_t lToRead = std::min(len, mFileSize - mFileOffset); + if (lToRead != len) { + LOGP(error, "BinFileOpGrid: request to read beyond the file end. pos={} size={} len={}, closing the file {}", mFileOffset, mFileSize, len, mFileName); + mFile.reset(); + mFileOffset = 0; + mFileSize = 0; + return false; + } + if (mFile->ReadBuffer(reinterpret_cast(ptr), lToRead)) { + LOGP(error, "BinFileOpGrid: failed to read {} bytes from position {} of file {}, closing it", lToRead, mFileOffset, mFileName); + } + mFileOffset += lToRead; + return true; +} + +unsigned char* BinFileOpGrid::bufferize(size_t& s) +{ + if (s > MaxBuffSize) { + LOGP(fatal, "Requested buffer size {} exceeds max allowed {}", s, MaxBuffSize); + } + s = std::min(distance_to_eof(), s); + mBufferizedPos = position(); + mBuffer.resize(s); + if (!mFile->ReadBuffer((char*)mBuffer.data(), s)) { + set_position(mBufferizedPos); + return mBuffer.data(); + } + LOGP(error, "BinFileOpGrid:bufferize failed to read {} bytes from position {} of file {}, closing it", s, mFileOffset, mFileName); + set_position(mBufferizedPos); + mBufferizedPos = -1UL; + s = 0; + mBuffer.clear(); + return nullptr; +} + +bool BinFileOpGrid::ignore_nbytes(size_t len) +{ + const size_t lToIgnore = std::min(len, std::size_t(mFileSize - mFileOffset)); + if (len != lToIgnore) { + LOGP(error, "BinFileOpGrid: request to ignore bytes beyond the file end. pos={} size={} len={}, closing the file {}", mFileOffset, mFileSize, len, mFileName); + mFile.reset(); + mFileOffset = 0; + mFileSize = 0; + return false; + } + set_position(mFileOffset + lToIgnore); + return true; +} + +} // namespace o2::rawdd diff --git a/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx b/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx index c8bc6ff374ead..df41430d10b56 100644 --- a/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx +++ b/Detectors/Raw/TFReaderDD/src/SubTimeFrameFileReader.cxx @@ -46,36 +46,19 @@ namespace o2f = o2::framework; //////////////////////////////////////////////////////////////////////////////// SubTimeFrameFileReader::SubTimeFrameFileReader(const std::string& pFileName, o2::detectors::DetID::mask_t detMask, int verb, bool sup0xccdb, bool repaireHeaders, bool rejectDistSTF) - : mFileName(pFileName), mVerbosity(verb), mSup0xccdb(sup0xccdb), mRepaireHeaders(repaireHeaders), mRejectDistSTF(rejectDistSTF) + : mVerbosity(verb), mSup0xccdb(sup0xccdb), mRepaireHeaders(repaireHeaders), mRejectDistSTF(rejectDistSTF) { - mFileMap.open(mFileName); - if (!mFileMap.is_open()) { - LOG(error) << "Failed to open TF file for reading (mmap)."; + mFile.reset(BinFileOp::open(pFileName)); + if (!mFile || !mFile->isGood()) { return; } - mFileSize = mFileMap.size(); - mFileMapOffset = 0; for (DetID::ID id = DetID::First; id <= DetID::Last; id++) { mDetOrigMap[DetID::getDataOrigin(id)] = detMask[id]; } - -#if __linux__ - madvise((void*)mFileMap.data(), mFileMap.size(), MADV_HUGEPAGE | MADV_SEQUENTIAL | MADV_DONTDUMP); -#endif } -SubTimeFrameFileReader::~SubTimeFrameFileReader() -{ - if (!mFileMap.is_open()) { -#if __linux__ - madvise((void*)mFileMap.data(), mFileMap.size(), MADV_DONTNEED); -#endif - mFileMap.close(); - } -} - -std::size_t SubTimeFrameFileReader::getHeaderStackSize() // throws ios_base::failure +Stack SubTimeFrameFileReader::getHeaderStack(std::size_t& pOrigsize) { // Expect valid Stack in the file. // First Header must be DataHeader. The size is unknown since there are multiple versions. @@ -83,67 +66,40 @@ std::size_t SubTimeFrameFileReader::getHeaderStackSize() // throws ios_base::fai // Read first the base header then the rest of the extended header. Keep going until the next flag is set. // reset the file pointer to the original incoming position, so the complete Stack can be read in - bool readNextHeader = true; - std::size_t lStackSize = 0; - DataHeader lBaseHdr; // Use DataHeader since the BaseHeader has no default contructor. - - const auto lFilePosStart = position(); - - const auto cMaxHeaders = 16; /* make sure we don't loop forever */ - auto lNumHeaders = 0; + std::size_t bufsz = 0, lStackSize = 0; + std::byte* lStackMem = nullptr; + pOrigsize = 0; + const auto lFilePosStart = mFile->position(); + const int cMaxHeaders = 16; // make sure we don't loop forever + int lNumHeaders = 0; while (readNextHeader && (++lNumHeaders <= cMaxHeaders)) { - // read BaseHeader only! - const auto lBaseHdrPos = position(); - if (!read_advance(&lBaseHdr, sizeof(BaseHeader))) { - return 0; - } - - // go back, and read the whole O2 header (Base+Derived) - set_position(lBaseHdrPos); - if (!ignore_nbytes(lBaseHdr.size())) { - return 0; + if ((lStackSize + sizeof(BaseHeader)) >= bufsz && !(lStackMem = reinterpret_cast(mFile->bufferize((bufsz += BinFileOp::KBYTE))))) { + LOGP(error, "Could not bufferize {} bytes to read the headers stack", bufsz); + return Stack{}; } - + const auto& lBaseHdr = *reinterpret_cast(lStackMem + lStackSize); lStackSize += lBaseHdr.size(); readNextHeader = (lBaseHdr.next() != nullptr); } - // reset the file pointer - set_position(lFilePosStart); - if (lNumHeaders >= cMaxHeaders) { - LOGP(error, "FileReader: Reached max number of headers allowed: {}.", cMaxHeaders); - return 0; - } - LOGP(debug, "getHeaderStackSize, pos = {}, size = {}", lFilePosStart, lStackSize); - return lStackSize; -} - -Stack SubTimeFrameFileReader::getHeaderStack(std::size_t& pOrigsize) -{ - const auto lStackSize = getHeaderStackSize(); - pOrigsize = lStackSize; - - if (lStackSize < sizeof(BaseHeader)) { - // error in the stream - pOrigsize = 0; + LOGP(error, "Reached max number of headers allowed: {}.", cMaxHeaders); return Stack{}; } - - std::byte* lStackMem = reinterpret_cast(peek()); - if (!ignore_nbytes(lStackSize)) { - // error in the stream - pOrigsize = 0; + if (lStackSize < sizeof(BaseHeader)) { + LOGP(error, "Stack size {} is smaller than BaseHeader size {}", lStackSize, sizeof(BaseHeader)); return Stack{}; } - // This must handle different versions of DataHeader - // check if DataHeader needs an upgrade by looking at the version number + // This must handle different versions of DataHeader, check if DataHeader needs an upgrade by looking at the version number const BaseHeader* lBaseOfDH = BaseHeader::get(lStackMem); if (!lBaseOfDH) { + LOGP(error, "Failed to extract the DataHeader from the buffer, position in file {}", mFile->position()); return Stack{}; } + pOrigsize = lStackSize; + mFile->set_position(lFilePosStart + lStackSize); if (lBaseOfDH->headerVersion < DataHeader::sVersion) { DataHeader lNewDh; @@ -151,15 +107,14 @@ Stack SubTimeFrameFileReader::getHeaderStack(std::size_t& pOrigsize) assert(sizeof(DataHeader) > lBaseOfDH->size()); // current DataHeader must be larger std::memcpy(&lNewDh, (void*)lBaseOfDH->data(), lBaseOfDH->size()); - // make sure to bump the version in the BaseHeader. - // TODO: Is there a better way? + // make sure to bump the version in the BaseHeader. TODO: Is there a better way? lNewDh.headerSize = sizeof(DataHeader); lNewDh.headerVersion = DataHeader::sVersion; if (lBaseOfDH->headerVersion == 1 || lBaseOfDH->headerVersion == 2) { - /* nothing to do for the upgrade */ + // nothing to do for the upgrade } else { - LOGP(error, "FileReader: DataHeader v{} read from file is not upgraded to the current version {}", + LOGP(error, "DataHeader v{} read from file is not upgraded to the current version {}", lBaseOfDH->headerVersion, DataHeader::sVersion); LOGP(error, "Try using a newer version of DataDistribution or file a BUG"); } @@ -168,10 +123,7 @@ Stack SubTimeFrameFileReader::getHeaderStack(std::size_t& pOrigsize) return Stack(lNewDh); } else { assert(lBaseOfDH->size() < lStackSize); - - return Stack( - lNewDh, - Stack(lStackMem + lBaseOfDH->size())); + return Stack(lNewDh, Stack(lStackMem + lBaseOfDH->size())); } } @@ -229,9 +181,9 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* }; // record current position - const auto lTfStartPosition = position(); + const auto lTfStartPosition = mFile->position(); - if (lTfStartPosition == size() || !mFileMap.is_open() || eof()) { + if (lTfStartPosition == mFile->size() || !mFile || !mFile->isGood() || mFile->eof()) { return nullptr; } auto tfID = slice; @@ -256,14 +208,14 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* auto lMetaHdrStack = getHeaderStack(lMetaHdrStackSize); if (lMetaHdrStackSize == 0) { LOG(error) << "Failed to read the TF file header. The file might be corrupted."; - mFileMap.close(); + mFile.reset(nullptr); return nullptr; } lStfMetaDataHdr = o2::header::DataHeader::Get(lMetaHdrStack.first()); if (mVerbosity > 0) { - LOGP(info, "read filemeta, pos = {}, size = {}", position(), sizeof(SubTimeFrameFileMeta)); + LOGP(info, "read filemeta, pos = {}, size = {}", mFile->position(), sizeof(SubTimeFrameFileMeta)); } - if (!read_advance(&lStfFileMeta, sizeof(SubTimeFrameFileMeta))) { + if (!mFile->read_advance(&lStfFileMeta, sizeof(SubTimeFrameFileMeta))) { return nullptr; } if (mVerbosity > 0) { @@ -282,7 +234,7 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* // verify we're actually reading the correct data in if (!(SubTimeFrameFileMeta::getDataHeader().dataDescription == lStfMetaDataHdr->dataDescription)) { LOGP(warning, "Reading bad data: SubTimeFrame META header"); - mFileMap.close(); + mFile.reset(nullptr); return nullptr; } @@ -290,14 +242,14 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* const auto lStfSizeInFile = lStfFileMeta.mStfSizeInFile; if (lStfSizeInFile == (sizeof(DataHeader) + sizeof(SubTimeFrameFileMeta))) { LOGP(warning, "Reading an empty TF from file. Only meta information present"); - mFileMap.close(); + mFile.reset(nullptr); return nullptr; } // check there's enough data in the file - if ((lTfStartPosition + lStfSizeInFile) > this->size()) { - LOGP(warning, "Not enough data in file for this TF. Required: {}, available: {}", lStfSizeInFile, (this->size() - lTfStartPosition)); - mFileMap.close(); + if ((lTfStartPosition + lStfSizeInFile) > mFile->size()) { + LOGP(warning, "Not enough data in file for this TF. Required: {}, available: {}", lStfSizeInFile, (mFile->size() - lTfStartPosition)); + mFile.reset(nullptr); return nullptr; } @@ -308,7 +260,7 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* // Read DataHeader + SubTimeFrameFileMeta auto lStfIndexHdrStack = getHeaderStack(lStfIndexHdrStackSize); if (lStfIndexHdrStackSize == 0) { - mFileMap.close(); + mFile.reset(nullptr); return nullptr; } lStfIndexHdr = o2::header::DataHeader::Get(lStfIndexHdrStack.first()); @@ -317,7 +269,7 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* return nullptr; } - if (!ignore_nbytes(lStfIndexHdr->payloadSize)) { + if (!mFile->ignore_nbytes(lStfIndexHdr->payloadSize)) { return nullptr; } #ifdef _RUN_TIMING_MEASUREMENT_ @@ -339,13 +291,13 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* std::size_t lDataHeaderStackSize = 0; Stack lDataHeaderStack = getHeaderStack(lDataHeaderStackSize); if (lDataHeaderStackSize == 0) { - mFileMap.close(); + mFile.reset(nullptr); return nullptr; } const DataHeader* lDataHeader = o2::header::DataHeader::Get(lDataHeaderStack.first()); if (!lDataHeader) { LOG(error) << "Failed to read the TF HBF DataHeader structure. The file might be corrupted."; - mFileMap.close(); + mFile.reset(nullptr); return nullptr; } DataHeader locDataHeader(*lDataHeader); @@ -389,7 +341,7 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* if (mVerbosity > 0) { LOGP(warn, "Ignoring stored {}", describeHeader(locDataHeader)); } - if (!ignore_nbytes(lDataSize)) { + if (!mFile->ignore_nbytes(lDataSize)) { return nullptr; } lLeftToRead -= (lDataHeaderStackSize + lDataSize); // update the counter @@ -407,7 +359,7 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* // do we accept these data? auto detOrigStatus = mDetOrigMap.find(locDataHeader.dataOrigin); if (detOrigStatus != mDetOrigMap.end() && !detOrigStatus->second) { // this is a detector data and we don't want to read it - if (!ignore_nbytes(lDataSize)) { + if (!mFile->ignore_nbytes(lDataSize)) { return nullptr; } lLeftToRead -= (lDataHeaderStackSize + lDataSize); // update the counter @@ -421,7 +373,7 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* findChanSW.Stop(); #endif if (fmqChannel.empty()) { // no output channel - if (!ignore_nbytes(lDataSize)) { + if (!mFile->ignore_nbytes(lDataSize)) { return nullptr; } lLeftToRead -= (lDataHeaderStackSize + lDataSize); // update the counter @@ -441,7 +393,7 @@ std::unique_ptr SubTimeFrameFileReader::read(fair::mq::Device* msgSW.Stop(); #endif memcpy(lHdrStackMsg->GetData(), headerStack.data(), headerStack.size()); - LOGP(debug, "read data, pos = {}, size = {} leftToRead {}", position(), lDataSize, lLeftToRead); + LOGP(debug, "read data, pos = {}, size = {} leftToRead {}", mFile->position(), lDataSize, lLeftToRead); if (!read_advance(lDataMsg->GetData(), lDataSize)) { return nullptr; diff --git a/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx b/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx index a29b4dadfdb25..87f67b65ab9db 100644 --- a/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx +++ b/Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx @@ -30,7 +30,7 @@ void customize(std::vector& workflowOptions) options.push_back(ConfigParamSpec{"non-raw-only-det", VariantType::String, "none", {"do not open raw channel for these detectors"}}); options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (-1 = infinite)"}}); options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}}); - options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access + options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files or no-copy to avoid copying"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access options.push_back(ConfigParamSpec{"copy-dir", VariantType::String, "/tmp/", {"copy base directory for remote files"}}); options.push_back(ConfigParamSpec{"tf-file-regex", VariantType::String, ".+\\.tf$", {"regex string to identify TF files"}}); options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access