/* * MultiParamProcess.cc * * Created on: Jan 4, 2013 * Author: f.casimir */ #include #include #include "ParamData.hh" #include "Operation.hh" #include "ParamMgr.hh" #include "MultiParamProcess.hh" namespace AMDA { namespace Parameters { MultiParamProcess::MultiParamProcess(Parameter ¶meter) : Process(parameter), _minSampling(std::numeric_limits::max()) { } MultiParamProcess::MultiParamProcess(const MultiParamProcess &pProcess, Parameter ¶meter) : Process(pProcess,parameter), _paramNameList(pProcess._paramNameList), _minSampling(pProcess._minSampling) { establishConnection(); } MultiParamProcess::~MultiParamProcess() { for (ParameterList::iterator it = _paramNameList.begin(); it != _paramNameList.end(); ++it) { ParameterSPtr& parameter = it->second.first; if ( parameter) { parameter->closeConnection(this); } } } /** * @brief Connection to Parameter server. */ void MultiParamProcess::establishConnection() { for (ParameterList::iterator it = _paramNameList.begin(); it != _paramNameList.end(); ++it) { ParameterSPtr& parameter = it->second.first; parameter->openConnection(this); } } TimeStamp MultiParamProcess::init() { TimeStamp time = 0; for (ParameterList::iterator it = _paramNameList.begin(); it != _paramNameList.end(); ++it) { ParameterSPtr& parameter = it->second.first; time = std::max(time, parameter->init(this, _timeIntervalList)); Parameter::InfoList lInfoList = parameter->getInfoList(); _parameter.getInfoList().insert(lInfoList.begin(), lInfoList.end()); _minSampling = std::min(parameter->getParamData(this)->getMinSampling(), _minSampling); } return time; } unsigned int MultiParamProcess::write() { typedef std::list > > ParamDataIndexInfoFutureList; ParamDataIndexInfoFutureList lResultList; unsigned int lNbTotalAvalaible = 0; unsigned int nbDataBeforeCallProcess = _paramData->getDataNumber(); if (_paramDataIndexInfo._timeIntToProcessChanged) { for (ParameterList::iterator itParam = _paramNameList.begin(); itParam != _paramNameList.end(); ++itParam) { LOG4CXX_DEBUG(_logger, "MultiParamProcess::write - New interval - Need more data for: " << itParam->first ); (*itParam).second.second = (*itParam->second.first).getAsync(this).get(); } } else { for (ParameterList::iterator itParam = _paramNameList.begin(); itParam != _paramNameList.end(); ++itParam) { if (itParam->second.second._timeIntToProcessChanged) { continue; } lNbTotalAvalaible = itParam->second.second._startIndex + itParam->second.second._nbDataToProcess; if ( _paramDataIndexInfo._startIndex >= lNbTotalAvalaible) { LOG4CXX_DEBUG(_logger, "MultiParamProcess::write - Need more data for: " << itParam->first ); (*itParam).second.second = (*itParam->second.first).getAsync(this).get(); } } } //compute minimum data available ParameterList::iterator it = _paramNameList.begin(); if( it != _paramNameList.end()) { ParamDataIndexInfo &lParamDataIndexInfoTmp= (*it).second.second; LOG4CXX_DEBUG(_logger, "Parameter name: " << it->first << " Index: " << lParamDataIndexInfoTmp._startIndex <<" nbDataToProcess " << lParamDataIndexInfoTmp._nbDataToProcess); lNbTotalAvalaible = lParamDataIndexInfoTmp._nbDataToProcess + lParamDataIndexInfoTmp._startIndex ; ++it; for ( ;it != _paramNameList.end(); ++it) { ParamDataIndexInfo &lParamDataIndexInfoTmp= (*it).second.second; LOG4CXX_DEBUG(_logger, "Parameter name: " << it->first << " Index: " << lParamDataIndexInfoTmp._startIndex <<" nbDataToProcess " << lParamDataIndexInfoTmp._nbDataToProcess); lNbTotalAvalaible =std::min(lNbTotalAvalaible, lParamDataIndexInfoTmp._nbDataToProcess + lParamDataIndexInfoTmp._startIndex) ; } } _paramDataIndexInfo._nbDataToProcess = lNbTotalAvalaible - _paramDataIndexInfo._startIndex; _operation->write(_paramDataIndexInfo); bool updateDims = (_paramDataIndexInfo._nbDataToProcess > 0) && (_paramDataIndexInfo._startIndex == 0); _paramDataIndexInfo._startIndex +=_paramDataIndexInfo._nbDataToProcess ; // Reset operation instance to prepare it for the next time interval. bool lTimeIntToProcessChanged = true; bool lNoMoreTimeInt = true; for (ParameterList::iterator itParam = _paramNameList.begin(); itParam != _paramNameList.end(); ++itParam) { lTimeIntToProcessChanged &= itParam->second.second._timeIntToProcessChanged; lNoMoreTimeInt &= itParam->second.second._noMoreTimeInt; } if(lTimeIntToProcessChanged) { LOG4CXX_DEBUG(_logger, "MultiParamProcess => Need to change interval"); _operation->reset(); _paramDataIndexInfo._endTimeIntIndexList.push_back(lNbTotalAvalaible); _paramData->getIndexInfo()._endTimeIntIndexList.push_back(lNbTotalAvalaible); } // There is no more time interval to process else if (lNoMoreTimeInt) { _paramDataIndexInfo._endTimeIntIndexList.push_back(lNbTotalAvalaible); _paramData->getIndexInfo()._endTimeIntIndexList.push_back(lNbTotalAvalaible); LOG4CXX_DEBUG(_logger, "MultiParamProcess => No more time interval to process; end index for this interval is " << _paramDataIndexInfo._endTimeIntIndexList.back()); } else { // Nothing to do. } _paramDataIndexInfo._timeIntToProcessChanged = lTimeIntToProcessChanged; _paramDataIndexInfo._noMoreTimeInt = lNoMoreTimeInt; _paramData->getIndexInfo()._noMoreTimeInt = lNoMoreTimeInt; _paramData->getIndexInfo()._timeIntToProcessChanged = lTimeIntToProcessChanged; if (updateDims) _paramData->updateDims(); return _paramData->getDataNumber() - nbDataBeforeCallProcess; } /* * @brief @ to know the resampling strategy to use. If it's true, the nearest value will be use */ bool MultiParamProcess::useNearestValue() { bool useNearestValue = true; for (ParameterList::iterator it = _paramNameList.begin(); it != _paramNameList.end(); ++it) { ParameterSPtr& parameter = it->second.first; useNearestValue = useNearestValue && parameter->getDataWriterTemplate()->useNearestValue(); } return useNearestValue; } /** * @brief update parameter info in relation to the process */ void MultiParamProcess::updateInfo(Parameter & parameter) { LOG4CXX_DEBUG(_logger, "MultiParamProcess::updateInfo - " << parameter.getId()); if (parameter.getInfoId().empty()) parameter.setInfoId(parameter.getId()); //Param info AMDA::Info::ParamInfoSPtr paramInfo = AMDA::Info::ParamMgr::getInstance()->getParamInfoFromId(parameter.getInfoId(),true); if (paramInfo == nullptr) return; //Derived parameter => no comes from a dataset paramInfo->setDatasetId(""); //set name and short name = param id if (paramInfo->getName().empty()) paramInfo->setName(parameter.getId()); if (paramInfo->getShortName().empty()) paramInfo->setShortName(parameter.getId()); std::string processInfo = "Derived parameter from expression '"; processInfo += getExpression(); processInfo += "'"; for (ParameterList::iterator itParam = _paramNameList.begin(); itParam != _paramNameList.end(); ++itParam) paramInfo->addLinkedParamId(itParam->first); // adding tableParams into linked params std::map> tables =paramInfo->getTables(); if(! tables.empty()){ for(auto table : tables){ if (table.second != nullptr){ if(table.second->isVariable(¶meter.getParameterManager())){ std::map tableParams = table.second->getTableParams(¶meter.getParameterManager()); if(! tableParams.empty()) for(auto tableParam : tableParams){ paramInfo->addLinkedParamId(tableParam.second); } } } } } paramInfo->setProcessInfo(processInfo); std::string description = getDescription(); if (description.empty()) { description = getExpression(); } if ((_paramNameList.size() == 1) && paramInfo->getTables().empty()) { AMDA::Info::ParamInfoSPtr inputParamInfo = AMDA::Info::ParamMgr::getInstance()->getParamInfoFromId(_paramNameList.begin()->second.first->getInfoId()); if (inputParamInfo != nullptr) { //Copy tables std::map> inputTables = inputParamInfo->getTables(); for (std::map>::iterator it = inputTables.begin(); it != inputTables.end(); ++it) { paramInfo->addTable(it->first, it->second); } } } paramInfo->setProcessDescription(description); if ((_paramNameList.size() == 1) && paramInfo->getAdditionInfo().empty()) { AMDA::Info::ParamInfoSPtr inputParamInfo = AMDA::Info::ParamMgr::getInstance()->getParamInfoFromId(_paramNameList.begin()->second.first->getInfoId()); if (inputParamInfo != nullptr) { // Copy tables std::map inputAdditionals = inputParamInfo->getAdditionInfo(); for (std::map::iterator it = inputAdditionals.begin(); it != inputAdditionals.end(); ++it) { paramInfo->addAdditionalInfo(it->first, it->second); } } } } /* * @brief Get min sampling */ double MultiParamProcess::getMinSampling() { double minSampling = 0; for (auto param : _paramNameList) { ParameterSPtr& parameter = param.second.first; double paramMinSampling = 0; if (!parameter->getReferenceParameter().empty()) { std::string refParamId = parameter->getReferenceParameter(); ParameterSPtr refParam = parameter->getParameterManager().getParameter(refParamId); paramMinSampling = refParam->getDataWriterTemplate()->getMinSampling(); } else if (parameter->getTimeResolution() > 0) paramMinSampling = parameter->getTimeResolution(); else if (parameter->getDataWriterTemplate() != nullptr) paramMinSampling = parameter->getDataWriterTemplate()->getMinSampling(); if (minSampling == 0) minSampling = paramMinSampling; else if (paramMinSampling != 0) minSampling = std::min(minSampling,paramMinSampling); } return minSampling; } } /* namespace Parameters */ } /* namespace AMDA */