/* * VirtualInstrumentInterval.cc * * Created on: Nov 25, 2014 * Author: AKKA */ #include #include "DicError.hh" #include "TimeUtil.hh" #include "LocalFileInterfaceConfig.hh" #include "VirtualInstrumentManager.hh" #include "VirtualInstrument.hh" #include "VirtualInstrumentInterval.hh" #include "LocalParamData.hh" #include "TimeInterval.hh" namespace AMDA { namespace LocalFileInterface { VirtualInstrumentInterval::VirtualInstrumentInterval(VirtualInstrument& pVI, TimeIntervalList* pTimeIntervalList) : Worker(), _vi(pVI), _timeIntervalList(*pTimeIntervalList), _currentTimeIntToProcess(_timeIntervalList.end()), _timeIntToProcessChanged(false), _crtFileIndex(0), _crtRecordIndex(0) { _step = &VirtualInstrumentInterval::writeData; // Initialize file reader to point to the first TimeInterval. if (!setTimeInterval()) { LOG4CXX_ERROR(gLogger, "At least one time interval must be specified to send request"); BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::ex_msg("At least one time interval must be specified")); } active(); } VirtualInstrumentInterval::~VirtualInstrumentInterval() { } unsigned int VirtualInstrumentInterval::writeData() { LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval::writeData"); if (_paramFlowSPtr == nullptr) { LOG4CXX_ERROR(gLogger, "VirtualInstrumentInterval::writeData - No param flow defined"); BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::ex_msg("No param flow defined")); return 0; } unsigned int ret = 0; if (_currentTimeIntToProcess == _timeIntervalList.end()) return 0; _timeIntToProcessChanged = false; double lStopTime = (*_currentTimeIntToProcess)._stopTime; int crtIntervalRecIndex = _crtRecordIndex; int crtIntervalFileIndex = _crtFileIndex; for(auto paramInfo : _paramFlowSPtr->getParamInfoMap()) { ret = 0; if (paramInfo.second == nullptr) continue; std::string paramId = paramInfo.first; bool moreData = true; int crtParamRecIndex = crtIntervalRecIndex; int crtParamFileIndex = crtIntervalFileIndex; LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval::writeData for " << paramId << " - crtParamRecIndex = " << crtParamRecIndex << " - crtParamFileIndex " << crtParamFileIndex); while (moreData) { LocalParamDataPacket *pPacket= new LocalParamDataPacket(); int paramDim1Size = paramInfo.second->getDim1Size(); int paramDim2Size = paramInfo.second->getDim2Size(); LocalContainerType containerType = CONTAINER_SCALAR; if ((paramDim1Size > 1) && (paramDim2Size <= 1)) containerType = CONTAINER_VECTOR; else if ((paramDim1Size > 1) && (paramDim2Size > 1)) containerType = CONTAINER_MATRIX; if (!pPacket->init(containerType,paramInfo.second->getType(),paramDim1Size,paramDim2Size)) { delete pPacket; LOG4CXX_ERROR(gLogger, "VirtualInstrumentInterval::writeData - Cannot init data packet for " << paramId); moreData = false; continue; } FileReaderStatus status = _vi.getParamPacketData(paramId, crtParamFileIndex, crtParamRecIndex, lStopTime, pPacket); switch (status) { case FRS_MORE : _paramFlowSPtr->push(paramId,pPacket); //there is more data to get crtParamRecIndex += pPacket->getNbData(); break; case FRS_EOF : //end of file _paramFlowSPtr->push(paramId,pPacket); //if it's last ParamFlow to proceed => update recordIndex and fileIndex crtParamRecIndex = 0; ++crtParamFileIndex; break; case FRS_FINISH : //stop time is reached _paramFlowSPtr->push(paramId,pPacket); //_paramFlowSPtr->push(paramId,nullptr); moreData = false; break; case FRS_ERROR : //error detected delete pPacket; LOG4CXX_ERROR(gLogger, "VirtualInstrumentInterval::writeData - Error detected to write data for " << paramId); BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::ex_msg("Cannot write data")); break; } } } _step = &VirtualInstrumentInterval::writeEmptyData; return ret; } unsigned int VirtualInstrumentInterval::writeEmptyData() { LOG4CXX_INFO(gLogger, "VirtualInstrumentInterval::writeEmptyData"); if (setTimeInterval()) { _timeIntToProcessChanged = true; _step = &VirtualInstrumentInterval::writeData; } for(auto paramInfo : _paramFlowSPtr->getParamInfoMap()) { std::string paramId = paramInfo.first; _paramFlowSPtr->push(paramId,nullptr); } return 0; } bool VirtualInstrumentInterval::setTimeInterval() { if (_currentTimeIntToProcess == _timeIntervalList.end()) { _currentTimeIntToProcess = _timeIntervalList.begin(); } else if (++_currentTimeIntToProcess != _timeIntervalList.end()) { LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => Get Next TimeInterval"); // Nothing to do. } else { LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => Reached all TimeInterval"); return false; } double lStartTime = (*_currentTimeIntToProcess)._startTime; double lStopTime = (*_currentTimeIntToProcess)._stopTime; if (lStopTime < _vi.getGlobalStartTime() || lStartTime > _vi.getGlobalStopTime()) { LOG4CXX_ERROR(gLogger, "VirtualInstrumentInterval::setTimeInterval - Cannot get data for this time interval - Outside of global start and global stop definition"); BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::errno_code(AMDA_OUTOFTIME_ERR) << AMDA::ex_msg("Cannot get data for this time interval - Outside of global start and global stop definition")); } LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => start time: " << lStartTime << " lStopTime: " << lStopTime); // if (!_vi.getDataPosition(lStartTime, _crtFileIndex, _crtRecordIndex)) { LOG4CXX_ERROR(gLogger, "VirtualInstrumentInterval::setTimeInterval - Cannot get data for this time interval"); BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::errno_code(AMDA_OUTOFTIME_ERR) << AMDA::ex_msg("Cannot get data for this time interval")); } LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => _crtFileIndex: " << _crtFileIndex << ", _crtRecordIndex: " << _crtRecordIndex); return true; } unsigned int VirtualInstrumentInterval::getOneDataBloc() { LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval::getOneDataBloc"); return (this->*_step)(); } ParamFlowSPtr VirtualInstrumentInterval::getParamFlow(const std::string& pParamId, LocalParamType paramType, int dim1Size, int dim2Size) { if (_paramFlowSPtr == nullptr) _paramFlowSPtr.reset(new ParamFlow(*this)); _paramFlowSPtr->addAssociatedParam(pParamId,paramType,dim1Size,dim2Size); return _paramFlowSPtr; } } /* namespace DDServerInterface */ } /* namespace AMDA */