/* * MultiParamProcess.cc * * Created on: Jan 4, 2013 * Author: f.casimir */ #include #include #include "ParamData.hh" #include "Operation.hh" #include "ParamMgr.hh" #include "MultiParamProcess.hh" namespace AMDA { namespace Parameters { MultiParamProcess::MultiParamProcess(Parameter ¶meter) : Process(parameter), _minSampling(std::numeric_limits::max()) { } MultiParamProcess::MultiParamProcess(const MultiParamProcess &pProcess, Parameter ¶meter) : Process(pProcess,parameter), _paramNameList(pProcess._paramNameList), _minSampling(pProcess._minSampling) { } MultiParamProcess::~MultiParamProcess() { for (ParameterList::iterator it = _paramNameList.begin(); it != _paramNameList.end(); ++it) { ParameterSPtr& parameter = it->second.first; if ( parameter) { parameter->closeConnection(this); } } } /** * @brief Connection to Parameter server. */ void MultiParamProcess::establishConnection() { for (ParameterList::iterator it = _paramNameList.begin(); it != _paramNameList.end(); ++it) { ParameterSPtr& parameter = it->second.first; parameter->openConnection(this); } } TimeStamp MultiParamProcess::init() { TimeStamp time = 0; for (ParameterList::iterator it = _paramNameList.begin(); it != _paramNameList.end(); ++it) { ParameterSPtr& parameter = it->second.first; time = std::max(time, parameter->init(this, _timeIntervalList)); Parameter::InfoList lInfoList = parameter->getInfoList(); _parameter.getInfoList().insert(lInfoList.begin(), lInfoList.end()); _minSampling = std::min(parameter->getParamData(this)->getMinSampling(), _minSampling); } return time; } unsigned int MultiParamProcess::write() { typedef std::list > > ParamDataIndexInfoFutureList; ParamDataIndexInfoFutureList lResultList; unsigned int lNbTotalAvalaible = 0; for (ParameterList::iterator itParam = _paramNameList.begin(); itParam != _paramNameList.end(); ++itParam) { if (itParam->second.second._timeIntToProcessChanged && !_paramDataIndexInfo._timeIntToProcessChanged) continue; lNbTotalAvalaible = itParam->second.second._startIndex + itParam->second.second._nbDataToProcess; if ( _paramDataIndexInfo._startIndex >= lNbTotalAvalaible) { LOG4CXX_DEBUG(_logger, "Need more data Parameter name: " << itParam->first ); lResultList.push_back(make_pair(itParam,(*itParam->second.first).getAsync(this))); } } // Wait data for ( ParamDataIndexInfoFutureList::iterator it= lResultList.begin();it != lResultList.end(); ++it) { LOG4CXX_DEBUG(_logger, "Wait data Parameter name: " << it->first->first ); it->first->second.second=it->second.get(); } //compute minimum data available ParameterList::iterator it = _paramNameList.begin(); if( it != _paramNameList.end()) { ParamDataIndexInfo &lParamDataIndexInfoTmp= (*it).second.second; LOG4CXX_DEBUG(_logger, "Parameter name: " << it->first << " Index: " << lParamDataIndexInfoTmp._startIndex <<" nbDataToProcess " << lParamDataIndexInfoTmp._nbDataToProcess); lNbTotalAvalaible = lParamDataIndexInfoTmp._nbDataToProcess + lParamDataIndexInfoTmp._startIndex ; ++it; for ( ;it != _paramNameList.end(); ++it) { ParamDataIndexInfo &lParamDataIndexInfoTmp= (*it).second.second; LOG4CXX_DEBUG(_logger, "Parameter name: " << it->first << " Index: " << lParamDataIndexInfoTmp._startIndex <<" nbDataToProcess " << lParamDataIndexInfoTmp._nbDataToProcess); lNbTotalAvalaible =std::min(lNbTotalAvalaible, lParamDataIndexInfoTmp._nbDataToProcess + lParamDataIndexInfoTmp._startIndex) ; } } _paramDataIndexInfo._nbDataToProcess = lNbTotalAvalaible - _paramDataIndexInfo._startIndex; _operation->write(_paramDataIndexInfo); bool updateDims = (_paramDataIndexInfo._nbDataToProcess > 0) && (_paramDataIndexInfo._startIndex == 0); _paramDataIndexInfo._startIndex +=_paramDataIndexInfo._nbDataToProcess ; // Reset operation instance to prepare it for the next time interval. bool lTimeIntToProcessChanged = true; bool lNoMoreTimeInt = true; for (ParameterList::iterator itParam = _paramNameList.begin(); itParam != _paramNameList.end(); ++itParam) { lTimeIntToProcessChanged &= itParam->second.second._timeIntToProcessChanged; lNoMoreTimeInt &= itParam->second.second._noMoreTimeInt; } if(lTimeIntToProcessChanged) { LOG4CXX_DEBUG(_logger, "MultiParamProcess => Need to change interval"); _operation->reset(); _paramDataIndexInfo._endTimeIntIndexList.push_back(lNbTotalAvalaible); _paramData->getIndexInfo()._endTimeIntIndexList.push_back(lNbTotalAvalaible); } // There is no more time interval to process else if (lNoMoreTimeInt) { _paramDataIndexInfo._endTimeIntIndexList.push_back(lNbTotalAvalaible); _paramData->getIndexInfo()._endTimeIntIndexList.push_back(lNbTotalAvalaible); LOG4CXX_DEBUG(_logger, "MultiParamProcess => No more time interval to process; end index for this interval is " << _paramDataIndexInfo._endTimeIntIndexList.back()); } else { // Nothing to do. } _paramDataIndexInfo._timeIntToProcessChanged = lTimeIntToProcessChanged; _paramDataIndexInfo._noMoreTimeInt = lNoMoreTimeInt; _paramData->getIndexInfo()._noMoreTimeInt = lNoMoreTimeInt; _paramData->getIndexInfo()._timeIntToProcessChanged = lTimeIntToProcessChanged; if (updateDims) _paramData->updateDims(); return _paramDataIndexInfo._nbDataToProcess; } /* * @brief @ to know the resampling strategy to use. If it's true, the nearest value will be use */ bool MultiParamProcess::useNearestValue() { bool useNearestValue = true; for (ParameterList::iterator it = _paramNameList.begin(); it != _paramNameList.end(); ++it) { ParameterSPtr& parameter = it->second.first; useNearestValue = useNearestValue && parameter->getDataWriterTemplate()->useNearestValue(); } return useNearestValue; } /** * @brief update parameter info in relation to the process */ void MultiParamProcess::updateInfo(Parameter & parameter) { LOG4CXX_DEBUG(_logger, "MultiParamProcess::updateInfo - " << parameter.getId()); if (parameter.getInfoId().empty()) parameter.setInfoId(parameter.getId()); //Param info AMDA::Info::ParamInfoSPtr paramInfo = AMDA::Info::ParamMgr::getInstance()->getParamInfoFromId(parameter.getInfoId(),true); if (paramInfo == nullptr) return; //Derived parameter => no comes from a dataset paramInfo->setDatasetId(""); //set name and short name = param id if (paramInfo->getName().empty()) paramInfo->setName(parameter.getId()); if (paramInfo->getShortName().empty()) paramInfo->setShortName(parameter.getId()); std::string processInfo = "Derived parameter from expression '"; processInfo += getExpression(); processInfo += "'"; for (ParameterList::iterator itParam = _paramNameList.begin(); itParam != _paramNameList.end(); ++itParam) paramInfo->addLinkedParamId(itParam->first); paramInfo->setProcessInfo(processInfo); paramInfo->setProcessDescription(getDescription()); } /* * @brief Get min sampling */ double MultiParamProcess::getMinSampling() { double minSampling = 0; for (auto param : _paramNameList) { ParameterSPtr& parameter = param.second.first; double paramMinSampling = 0; if (parameter->getTimeResolution() > 0) paramMinSampling = parameter->getTimeResolution(); else if (parameter->getDataWriterTemplate() != nullptr) paramMinSampling = parameter->getDataWriterTemplate()->getMinSampling(); if (minSampling == 0) minSampling = paramMinSampling; else if (paramMinSampling != 0) minSampling = std::min(minSampling,paramMinSampling); } return minSampling; } } /* namespace Parameters */ } /* namespace AMDA */