/** * ParamterManager.cc * * Created on: 15 oct. 2012 * Author: AKKA IS */ #include #include #include #include #include #include #include "DicError.hh" #include "ParameterManager.hh" #include "DicError.hh" #include "Parameter.hh" #include "FileConfigurator.hh" #include "ParamOutput.hh" #include "Process.hh" #include "ServicesServer.hh" //#include "TimeTableCatalogFactory.hh" using namespace std; using namespace TimeTableCatalog; namespace AMDA { namespace Parameters { log4cxx::LoggerPtr ParameterManager::_logger( log4cxx::Logger::getLogger("AMDA-Kernel.ParameterManager")); /* * @brief Mutex used to protect multi-thread access to CDF lib (CDF lib is not thread-safe!) * This mutex is a part of the ParameterManager to be shared between by FileWriterCDF (DownloadOutput plugin) * and class and FileReaderCDF (ParamGetLocalFile) */ boost::mutex ParameterManager::mutexCDFLib; /* * @brief Mutex used to protect multi-thread access to NetCDF lib (NetCDF lib is not thread-safe!) * This mutex is a part of the ParameterManager to be shared between by FileWriterNetCDF (DownloadOutput plugin) * and class and FileReaderNetCDF (ParamGetLocalFile) */ boost::mutex ParameterManager::mutexNetCDFLib; #define DEFAULT_GAP_THRESHOLD_VALUE 5 ParameterManager::ParameterManager() : _timeIntervalList(new TimeIntervalList), _defaultGapThreshold(DEFAULT_GAP_THRESHOLD_VALUE) { } ParameterManager::~ParameterManager() { } void ParameterManager::createParameter(const std::string& nameParam) { CfgContext ctx; ctx.push(this); ctx.push(ServicesServer::getInstance()); try { ServicesServer::getInstance()->getConfigurator()->proceed( nameParam.c_str(), ctx); } catch (AMDA::AMDA_exception & e) { e << AMDA::errno_code(AMDA_INFORMATION_PARAM_ERR); throw; } } bool ParameterManager::addParameter(Parameter* pParentParameter, const std::string& pIDParam, ParameterSPtr& pParamResult) { bool result = false; std::string fixedParamId = pIDParam; applyParamIdCorrection(fixedParamId); ParameterSPtr param = findParamInParameterList(pIDParam); if (param == nullptr) { pParamResult = ParameterSPtr(new Parameter(*this, pIDParam)); addParamInParameterList(pParamResult); result = true; } else { pParamResult = param; } if (pParentParameter) { pParentParameter->addParameter(pParamResult); pParamResult->setGapThreshold(pParentParameter->getGapThreshold()); for (auto info : pParentParameter->getInfoList()) pParamResult->setInfoValues(info.first, info.second); } return result; } void ParameterManager::addParamInParameterList(ParameterSPtr& pParam) { ParameterSPtr param = findParamInParameterList(pParam->getId()); if (param != nullptr) { return; } std::string fixedParamId = pParam->getId(); applyParamIdCorrection(fixedParamId); LOG4CXX_DEBUG(_logger, "Add Parameter " << pParam->getId()) _parameterList[fixedParamId] = pParam; } ParameterSPtr ParameterManager::findParamInParameterList(const std::string& paramId) { std::string fixedParamId = paramId; applyParamIdCorrection(fixedParamId); ParameterList::iterator it = _parameterList.find(fixedParamId); if (it != _parameterList.end()) { return _parameterList[fixedParamId]; } return ParameterSPtr(); } void ParameterManager::applyParamIdCorrection(std::string& paramId) { if (_paramIdCorrectionMap.find(paramId) != _paramIdCorrectionMap.end()) { paramId = _paramIdCorrectionMap[paramId]; return; } //Some characters are used to apply an operation, a process, etc... in an expression //These characters can't be used for a parmId //=> replace "-", "+", "*", "/", "^", "(", ")", "[", "]", "{", "}","&","|", //"$" by "_" std::vector charList = { "-", "+", "*", "/", "^", "(", ")", "[", "]", "{", "}", "&", "|", "$", ".", ",", "#", }; std::string originalParamId = paramId; std::string replaceBy; for (auto c : charList) { replaceBy = ""; replaceBy += "_"; replaceBy += std::to_string((int) (c[0])); replaceBy += "_"; paramId = boost::replace_all_copy(paramId, c, replaceBy); } _paramIdCorrectionMap[originalParamId] = paramId; } ParameterSPtr& ParameterManager::checkIfIsANeededParameter(ParameterSPtr& pParam) { Process* lProcess = dynamic_cast (pParam->getDataWriterTemplate().get()); if (lProcess && lProcess->isEmptyExpression()) { ParameterSPtr temp = *pParam->getParameterList().begin(); pParam->delegateOtherTaskTo(temp); std::string fixedParamId = pParam->getId(); _parameterList[fixedParamId] = temp; pParam = temp; } return pParam; } ParameterSPtr ParameterManager::getSampledParameter(const std::string& pIDParam, const std::string &samplingMode, float samplingValue, float gapThreshold, bool isUserProcess) { ParameterSPtr lParameter; if (samplingMode == "") { lParameter = getParameter(pIDParam); } else { //Compute a ResmapledParameter name boost::hash string_hash; stringstream lIdent; stringstream lBuffer; lBuffer << samplingMode << "_" << samplingValue << "_" << gapThreshold; lIdent << pIDParam << "_" << string_hash(lBuffer.str()); LOG4CXX_DEBUG(_logger, "Resampled Parameter to found or create: " << lIdent.str() << " - samplingValue = " << samplingValue << ", gapThreshold = " << gapThreshold); // Search if already exist lParameter = findParamInParameterList(lIdent.str()); if (lParameter != nullptr) { LOG4CXX_DEBUG(_logger, "Resampled Parameter: " << lIdent.str() << " founded"); lParameter = checkIfIsANeededParameter(lParameter); } else { // create Parameter with ProcessRempling on $pIDParam LOG4CXX_DEBUG(_logger, "Resampled Parameter: " << lIdent.str() << " creation"); lParameter = ParameterSPtr(new Parameter(*this, lIdent.str())); for (auto info : getParameter(pIDParam)->getInfoList()) lParameter->setInfoValues(info.first, info.second); //lParameter->setXmlId(getParameter(pIDParam)->getXmlId()); Process* lProcess = NULL; if (samplingMode == "classic") { lProcess = ServicesServer::getInstance()->getProcess("sampling_classic", *lParameter.get()); } else { lProcess = ServicesServer::getInstance()->getProcess("sampling_simple", *lParameter.get()); } if (lProcess) { lBuffer.str(""); lBuffer << samplingValue; lProcess->getAttributList().push_back(lBuffer.str()); lBuffer.str(""); lBuffer << gapThreshold; lProcess->getAttributList().push_back(lBuffer.str()); std::string fixedParamId = pIDParam; applyParamIdCorrection(fixedParamId); lBuffer.str(""); lBuffer << "$" << fixedParamId; lProcess->setExpression(lBuffer.str()); lProcess->setIsUserProcess(isUserProcess); DataWriterSPtr lDataWriter(lProcess); lParameter->setDataWriter(lDataWriter); } else { BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PROCESS_ERR) << AMDA::ex_msg(std::string("Cannot found resampling process for: ") + pIDParam)); } addParamInParameterList(lParameter); } } if (!lParameter.get()) { BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach parameter: ") + pIDParam)); } return lParameter; } ParameterSPtr ParameterManager::getSampledParameterUnderRefParam(const std::string& pIDParam, const std::string& pIDRefParam, bool isUserProcess) { //make a name for the resampled parameter boost::hash string_hash; stringstream lIdent; stringstream lBuffer; lBuffer << "sampling_under_refparam"; lBuffer << "_" << pIDRefParam; lIdent << pIDParam << "_" << string_hash(lBuffer.str()); LOG4CXX_DEBUG(_logger, "Resampled Parameter under reference parameter to found or create: " << lIdent.str()); // Search if already exist ParameterSPtr lParameter; ParameterSPtr lrefParameter; lrefParameter = getParameter(pIDRefParam); if (lrefParameter == nullptr) { BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach reference parameter: ") + pIDRefParam)); } lParameter = findParamInParameterList(lIdent.str()); if (lParameter != nullptr) { LOG4CXX_DEBUG(_logger, "Resampled Parameter under reference parameter : " << lIdent.str() << " founded"); lParameter = checkIfIsANeededParameter(lParameter); } else { // create Parameter with ProcessRempling on $pIDParam LOG4CXX_DEBUG(_logger, "Resampled Parameter under reference parameter : " << lIdent.str() << " creation"); lParameter = ParameterSPtr(new Parameter(*this, lIdent.str())); for (auto info : lrefParameter->getInfoList()) lParameter->setInfoValues(info.first, info.second); //lParameter->setXmlId(getParameter(pIDParam)->getXmlId()); Process* lProcess = NULL; lProcess = ServicesServer::getInstance()->getProcess("sampling_under_refparam", *lParameter.get()); if (lProcess) { std::string fixedParamId = pIDParam; applyParamIdCorrection(fixedParamId); lBuffer.str(""); lBuffer << "$" << fixedParamId; lProcess->setExpression(lBuffer.str()); lProcess->setIsUserProcess(isUserProcess); lProcess->setReferenceParameter(lrefParameter); DataWriterSPtr lDataWriter(lProcess); lParameter->setDataWriter(lDataWriter); } else { BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PROCESS_ERR) << AMDA::ex_msg(std::string("Cannot found resampling process for: ") + pIDParam)); } addParamInParameterList(lParameter); } if (!lParameter.get()) { BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach parameter: ") + pIDParam)); } return lParameter; } ParameterSPtr ParameterManager::getParameterFromExpression(const std::string& pExpression, double gapThreshold, bool isUserProcess) { boost::hash string_hash; std::stringstream lIdent; lIdent << string_hash(pExpression); if (!isNAN(gapThreshold)) { lIdent << "_"; lIdent << gapThreshold; } ParameterSPtr lParameter; ParameterSPtr lparentParameter; lParameter = findParamInParameterList(lIdent.str()); //check if the parameter already exist if (lParameter != nullptr) { LOG4CXX_DEBUG(_logger, "Parameter from expression : " << lIdent.str() << " founded"); lParameter = checkIfIsANeededParameter(lParameter); } else { // create Parameter LOG4CXX_DEBUG(_logger, "Parameter from expression : " << lIdent.str() << " creation"); lParameter = ParameterSPtr(new Parameter(*this, lIdent.str())); Process* lProcess = NULL; //create a standard process lProcess = ServicesServer::getInstance()->getProcess("standard", *lParameter.get()); if (lProcess) { //set expression and create the data writer lProcess->setExpression(pExpression); lProcess->setIsUserProcess(isUserProcess); DataWriterSPtr lDataWriter(lProcess); lParameter->setDataWriter(lDataWriter); addParamInParameterList(lParameter); } else { stringstream lError; lError << "Process: 'standard' not found"; LOG4CXX_ERROR(_logger, lError.str()); BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PROCESS_ERR) << AMDA::ex_msg(lError.str())); } } if (!lParameter.get()) { BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach parameter: ") + lIdent.str())); } return lParameter; } ParameterSPtr ParameterManager::getParameter(const std::string& pIDParam) { ParameterSPtr lParameter = findParamInParameterList(pIDParam); if (lParameter != nullptr) { lParameter = checkIfIsANeededParameter(lParameter); } else { LOG4CXX_ERROR(_logger, "parameter '" << pIDParam << "' not exist"); BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach parameter: ") + pIDParam)); } return lParameter; } void ParameterManager::execute(string workingPath) { int error = 0; int result = AMDA_EXIT_OK; int lNbTraitement = _paramOutputList.size(); for (ParamOutputList::iterator it = _paramOutputList.begin(); it != _paramOutputList.end(); ++it) { bool lAsError = false; try { (*it)->setWorkPath(workingPath); (*it)->establishConnection(); } catch (AMDA::AMDA_exception & e) { LOG4CXX_ERROR(_logger, "Error resume: " << traitException(result, e)); lAsError = true; } catch (...) { lAsError = true; } if (lAsError) { ++error; _paramOutputList.erase(it--); } } /* // Read time table file. if (!_timeTablePath.empty()) { readTimeTable(workingPath + "/" + _timeTablePath); } */ for (ParamOutputList::iterator it = _paramOutputList.begin(); it != _paramOutputList.end(); ++it) { bool lAsError = false; try { LOG4CXX_DEBUG(_logger, "Give TimeInterval list to method"); (*it)->init(getInputIntervals()); } catch (AMDA::AMDA_exception & e) { LOG4CXX_ERROR(_logger, "Error resume: " << traitException(result, e)); lAsError = true; } catch (...) { lAsError = true; } if (lAsError) { ++error; _paramOutputList.erase(it--); } } boost::thread_group lThGroup; typedef std::vector > ParamOutputResultList; ParamOutputResultList resultList; for (ParamOutputList::iterator it = _paramOutputList.begin(); it != _paramOutputList.end(); ++it) { boost::packaged_task pt(boost::bind(&ParamOutput::process, *it)); boost::shared_future future(pt.get_future()); resultList.push_back(future); lThGroup.add_thread(new boost::thread(boost::move(pt))); } boost::wait_for_all(resultList.begin(), resultList.end()); lThGroup.join_all(); _parameterList.clear(); for (ParamOutputResultList::iterator it = resultList.begin(); it != resultList.end(); ++it) { try { it->get(); } catch (AMDA::AMDA_exception & e) { LOG4CXX_ERROR(_logger, "Error resume: " << traitException(result, e)); ++error; } catch (...) { result = AMDA_ERROR_UNKNOWN; } } // Attempt to terminate each param output. for (ParamOutputList::iterator it = _paramOutputList.begin(); it != _paramOutputList.end(); ++it) { try { (*it)->terminate(); } catch (...) { // Nothing to do } } if (error != 0) { if (lNbTraitement == 1) { BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::errno_code(result)); } else { BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::errno_code(AMDA_PARAM_SOME_ERR)); } } } TimeIntervalListSPtr ParameterManager::getInputIntervals() { return _timeIntervalList; } void ParameterManager::addInputInterval(double pStartTime, double pStopTime, int pIndex, std::string& pttPath, std::string& pttName, int pttTotalIntervals) { // issue https://projects.irap.omp.eu/issues/8481 if (pStopTime == pStartTime) { pStopTime += 1E-3; pStartTime -= 1E-3; } _timeIntervalList->push_back(TimeInterval(pStartTime, pStopTime, pIndex, pttPath, pttName, pttTotalIntervals)); } void ParameterManager::addInputInterval(const TimeInterval& pTimeInterval) { _timeIntervalList->push_back(pTimeInterval); } void ParameterManager::addInputIntervalDataList(const int pIndex, const std::string& pDataKey, const std::vector& dataList) { _timeIntervalDataList[pIndex][pDataKey] = dataList; } std::vector& ParameterManager::getInputIntervalDataList(const int pIndex, const std::string& pDataKey) { return _timeIntervalDataList[pIndex][pDataKey]; } /** * get the computed gap size */ double ParameterManager::getComputedGapSize(double gapThreshold, double minSampling) { return gapThreshold * minSampling; } } /* namespace Parameters */ } /* namespace AMDA */