/** * ProcessSamplingUnderTimeList.cc * * Created on: Jul 25, 2014 * Author: AKKA IS */ #include #include #include "ProcessSamplingUnderRefParam.hh" #include "Parameter.hh" #include "ParameterManager.hh" #include "ParameterCreatorFromExpression.hh" #include "Resampling.hh" #include "ParamMgr.hh" using namespace std; using namespace boost; using namespace log4cxx; namespace AMDA { namespace Parameters { ProcessSamplingUnderRefParam::ProcessSamplingUnderRefParam(Parameter ¶meter) : SingleParamProcess_CRTP(parameter), _paramRefInput(NULL) { } ProcessSamplingUnderRefParam::ProcessSamplingUnderRefParam(const ProcessSamplingUnderRefParam& pProcess, Parameter ¶meter) : SingleParamProcess_CRTP(pProcess,parameter), _paramRefInput(pProcess._paramRefInput) { } ProcessSamplingUnderRefParam::~ProcessSamplingUnderRefParam() { if (_refParameterSPtr != nullptr) { _refParameterSPtr->closeConnection(this); } } void ProcessSamplingUnderRefParam::establishConnection() { if (_refParameterSPtr != nullptr) _refParameterSPtr->openConnection(this); SingleParamProcess_CRTP::establishConnection(); } TimeStamp ProcessSamplingUnderRefParam::init() { /// Init input parameter TimeStamp timeStamp = _parameterInput->init( this, _timeIntervalList); /// Calibration information copy Parameter::InfoList lInfoList = _parameterInput->getInfoList(); _parameter.getInfoList().insert(lInfoList.begin(), lInfoList.end()); _paramInput = _parameterInput->getParamData(this).get(); // check reference parameter if (_refParameterSPtr == nullptr) { BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::errno_code(AMDA_PROCESS_ERR) << AMDA::ex_msg(std::string("ProcessSamplingUnderRefParam : cannot retrieve reference parameter "))); } timeStamp = std::max(timeStamp, _refParameterSPtr->init( this, _timeIntervalList)); _paramRefInput = _refParameterSPtr->getParamData(this).get(); double computedGapSize = _refParameterSPtr->getParameterManager().getComputedGapSize(_parameterInput->getGapThreshold(), _paramInput->getMinSampling()); // ProcessSamplingClassic _operation creation Resampling::CreateResampling lCreateResampling(*this, _timeIntervalList, *_paramInput, *_paramRefInput, computedGapSize, _parameterInput->getDataWriterTemplate()->useNearestValue()); _operation = lCreateResampling.getResampling(); static_cast(_operation)->init(); /// Get result ParamData _paramData = ParamDataSPtr(_operation->getParamOutput()); _paramData->setMinSampling(static_cast(_operation)->getSampling()); return timeStamp; } unsigned int ProcessSamplingUnderRefParam::write() { _paramRefInput = _refParameterSPtr->getParamData(this).get(); //ParamDataIndexInfo lRefParamDataIndexInfo = _paramRefInput->getIndexInfo(); /*if (!lRefParamDataIndexInfo._timeIntToProcessChanged) { do { lRefParamDataIndexInfo = _refParameterSPtr->getAsync(this).get(); } while(!lRefParamDataIndexInfo._timeIntToProcessChanged && (lRefParamDataIndexInfo._nbDataToProcess != 0)); }*/ int ret = 0; unsigned int nbDataBeforeCallProcess = _paramData->getDataNumber(); ParamDataIndexInfo lParamDataIndexInfo = _parameterInput->getParamData(this).get()->getIndexInfo(); this->_treatTerminated = false; do { //if (!lParamDataIndexInfo._timeIntToProcessChanged && (lParamDataIndexInfo._nbDataToProcess == 0)) lParamDataIndexInfo =_parameterInput->getAsync(this).get(); if ( ! this->_treatTerminated ) { _operation->write(lParamDataIndexInfo); ret = _paramData->getDataNumber() - nbDataBeforeCallProcess; // As we do not have output data, continue processing the input data } if ( lParamDataIndexInfo._nbDataToProcess == 0) { this->_treatTerminated = true; } } while(ret == 0 && lParamDataIndexInfo._nbDataToProcess != 0); // Reset operation to prepare static data for the next TimeInterval. if (lParamDataIndexInfo._timeIntToProcessChanged) { _paramData->getIndexInfo()._endTimeIntIndexList.push_back(_paramData->getDataNumber()); _operation->reset(); } // There is no more time interval to process else if (lParamDataIndexInfo._noMoreTimeInt) { _paramData->getIndexInfo()._endTimeIntIndexList.push_back(_paramData->getDataNumber()); } else { // Nothing to do. } // Pull up information on which time interval changed. _paramData->getIndexInfo()._timeIntToProcessChanged = lParamDataIndexInfo._timeIntToProcessChanged; _paramData->getIndexInfo()._noMoreTimeInt = lParamDataIndexInfo._noMoreTimeInt; return ret; /*while (lRefParamDataIndexInfo._nbDataToProcess > 0) { lRefParamDataIndexInfo = _refParameterSPtr->getAsync(this).get(); }*/ //if (!_paramRefInput->getIndexInfo()._timeIntToProcessChanged) { /*unsigned int nbDataBeforeCallProcess = _paramRefInput->getDataNumber(); ParamDataIndexInfo lRefParamDataIndexInfo; bool _treatTerminated = false; int ret = 0; do { lRefParamDataIndexInfo = _refParameterSPtr->getAsync(this).get(); if ( ! _treatTerminated ) { ret = _paramRefInput->getDataNumber() - nbDataBeforeCallProcess; // As we do not have output data, continue processing the input data } if ( lRefParamDataIndexInfo._nbDataToProcess == 0) { _treatTerminated = true; } } while(ret == 0 && lRefParamDataIndexInfo._nbDataToProcess != 0);*/ // Reset operation to prepare static data for the next TimeInterval. /*if (lRefParamDataIndexInfo._timeIntToProcessChanged) { _paramRefInput->getIndexInfo()._endTimeIntIndexList.push_back(_paramRefInput->getDataNumber()); } // There is no more time interval to process else if (!lRefParamDataIndexInfo._timeIntToProcessChanged && lRefParamDataIndexInfo._nbDataToProcess == 0) { _paramRefInput->getIndexInfo()._endTimeIntIndexList.push_back(_paramRefInput->getDataNumber()); } else { // Nothing to do. } _paramRefInput->getIndexInfo()._timeIntToProcessChanged = true;*/ //} /*int ret = SingleParamProcess_CRTP::write(); _paramRefInput->getIndexInfo()._timeIntToProcessChanged = _paramData->getIndexInfo()._timeIntToProcessChanged; return ret;*/ //return lRefParamDataIndexInfo._nbDataToProcess; } /* * @brief Get min sampling */ double ProcessSamplingUnderRefParam::getMinSampling() { if (_refParameterSPtr == nullptr) return 0; if (_refParameterSPtr->getDataWriterTemplate() == nullptr) return 0; return _refParameterSPtr->getDataWriterTemplate()->getMinSampling(); } /** * @overload SingleParamProcess_CRTP::updateInfo update parameter info in relation to the sampling process */ void ProcessSamplingUnderRefParam::updateInfo(Parameter & parameter) { LOG4CXX_DEBUG(_logger, "ProcessSamplingUnderRefParam::updateInfo - " << parameter.getId()); SingleParamProcess::updateInfo(parameter); //Param info AMDA::Info::ParamInfoSPtr paramInfo = AMDA::Info::ParamMgr::getInstance()->getParamInfoFromId(parameter.getInfoId()); if (paramInfo == nullptr) return; std::string processInfo = "Resampling of '"; processInfo += _parameterInput->getId(); processInfo += "' under times list of '"; processInfo += _refParameterSPtr->getId(); processInfo += "'"; if (_parameterInput->getDataWriterTemplate()->useNearestValue()) processInfo += ". Use nearest value."; paramInfo->setProcessInfo(processInfo); } } /* namespace Parameters */ } /* namespace AMDA */