/** * ParamterManager.cc * * Created on: 15 oct. 2012 * Author: AKKA IS */ #include #include #include #include #include #include "DicError.hh" #include "ParameterManager.hh" #include "DicError.hh" #include "Parameter.hh" #include "FileConfigurator.hh" #include "ParamOutput.hh" #include "Process.hh" #include "ServicesServer.hh" //#include "TimeTableCatalogFactory.hh" using namespace std; using namespace TimeTableCatalog; namespace AMDA { namespace Parameters { log4cxx::LoggerPtr ParameterManager::_logger( log4cxx::Logger::getLogger("AMDA-Kernel.ParameterManager")); /* * @brief Mutex used to protect multi-thread access to CDF lib (CDF lib is not thread-safe!) * This mutex is a part of the ParameterManager to be shared between by FileWriterCDF (DownloadOutput plugin) * and class and FileReaderCDF (ParamGetLocalFile) */ boost::mutex ParameterManager::mutexCDFLib; /* * @brief Mutex used to protect multi-thread access to NetCDF lib (NetCDF lib is not thread-safe!) * This mutex is a part of the ParameterManager to be shared between by FileWriterNetCDF (DownloadOutput plugin) * and class and FileReaderNetCDF (ParamGetLocalFile) */ boost::mutex ParameterManager::mutexNetCDFLib; #define DEFAULT_GAP_THRESHOLD_VALUE 5 ParameterManager::ParameterManager() : _timeIntervalList(new TimeIntervalList), _defaultGapThreshold(DEFAULT_GAP_THRESHOLD_VALUE) { } ParameterManager::~ParameterManager() { } void ParameterManager::createParameter(const std::string& nameParam) { CfgContext ctx; ctx.push(this); ctx.push(ServicesServer::getInstance()); try { ServicesServer::getInstance()->getConfigurator()->proceed( nameParam.c_str(), ctx); } catch (AMDA::AMDA_exception & e) { e << AMDA::errno_code(AMDA_INFORMATION_PARAM_ERR); throw; } } bool ParameterManager::addParameter(Parameter* pParentParameter, const std::string& pIDParam, ParameterSPtr& pParamResult) { bool result = false; ParameterList::iterator it = _parameterList.find(pIDParam); LOG4CXX_DEBUG(_logger, "Add Parameter : " << pIDParam); if (it == _parameterList.end()) { pParamResult = ParameterSPtr(new Parameter(*this, pIDParam)); _parameterList[pIDParam] = pParamResult; result = true; } else { pParamResult = it->second; } if (pParentParameter) { pParentParameter->addParameter(pParamResult); pParamResult->setGapThreshold(pParentParameter->getGapThreshold()); for (auto info : pParentParameter->getInfoList()) pParamResult->setInfoValues(info.first,info.second); } return result; } void ParameterManager::applyParamIdCorrection(std::string& paramId) { //Some characters are used to apply an operation, a process, etc... in an expression //These characters can't be used for a parmId //=> replace "-", "+", "*", "/", "^", "(", ")", "[", "]", "{", "}","&","|", //"$" by "_" std::vector charList = { '-', '+', '*', '/', '^', '(', ')', '[', ']', '{', '}', '&','|', '$' }; for (auto c : charList) std::replace( paramId.begin(), paramId.end(), c, '_'); } ParameterSPtr& ParameterManager::checkIfIsANeededParameter(ParameterSPtr& pParam) { Process* lProcess = dynamic_cast(pParam->getDataWriterTemplate().get()); if ( lProcess && lProcess->isEmptyExpression() ){ ParameterSPtr temp = *pParam->getParameterList().begin(); pParam->delegateOtherTaskTo(temp); _parameterList[pParam->getId()] = temp; pParam=temp; } return pParam; } ParameterSPtr ParameterManager::getSampledParameter(const std::string& pIDParam, const std::string &samplingMode, float samplingValue, float gapThreshold) { ParameterSPtr lParameter; if (samplingMode == "") { lParameter = getParameter(pIDParam); } else { //Compute a ResmapledParameter name boost::hash string_hash; stringstream lIdent; stringstream lBuffer; lBuffer << samplingMode << "_" << samplingValue << "_" << gapThreshold; lIdent << pIDParam << "_" << string_hash(lBuffer.str()); LOG4CXX_DEBUG(_logger, "Resampled Parameter to found or create: " << lIdent.str() << " - samplingValue = " << samplingValue << ", gapThreshold = " << gapThreshold); // Search if already exist ParameterList::iterator it = _parameterList.find(lIdent.str()); if ( it != _parameterList.end()) { LOG4CXX_DEBUG(_logger, "Resampled Parameter: " << lIdent.str() << " founded"); lParameter = checkIfIsANeededParameter(it->second); } else { // create Parameter with ProcessRempling on $pIDParam LOG4CXX_DEBUG(_logger, "Resampled Parameter: " << lIdent.str() << " creation"); lParameter = ParameterSPtr(new Parameter(*this, lIdent.str())); for (auto info : getParameter(pIDParam)->getInfoList()) lParameter->setInfoValues(info.first,info.second); //lParameter->setXmlId(getParameter(pIDParam)->getXmlId()); Process* lProcess = NULL; if (samplingMode == "classic") { lProcess = ServicesServer::getInstance()->getProcess("sampling_classic",*lParameter.get()); } else { lProcess = ServicesServer::getInstance()->getProcess("sampling_simple",*lParameter.get()); } if (lProcess) { lBuffer.str(""); lBuffer << samplingValue; lProcess->getAttributList().push_back(lBuffer.str()); lBuffer.str(""); lBuffer << gapThreshold; lProcess->getAttributList().push_back(lBuffer.str()); lBuffer.str(""); lBuffer << "$" << pIDParam; lProcess->setExpression(lBuffer.str()); DataWriterSPtr lDataWriter(lProcess); lParameter->setDataWriter(lDataWriter); } else { BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PROCESS_ERR) << AMDA::ex_msg(std::string("Cannot found resampling process for: ")+pIDParam)); } _parameterList[lIdent.str()] = lParameter; } } if (!lParameter.get()) { BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach parameter: ")+ pIDParam)); } return lParameter; } ParameterSPtr ParameterManager::getSampledParameterUnderRefParam(const std::string& pIDParam, const std::string& pIDRefParam) { //make a name for the resampled parameter boost::hash string_hash; stringstream lIdent; stringstream lBuffer; lBuffer << "sampling_under_refparam"; lBuffer << "_" << pIDRefParam; lIdent << pIDParam << "_" << string_hash(lBuffer.str()); LOG4CXX_DEBUG(_logger, "Resampled Parameter under reference parameter to found or create: " << lIdent.str()); // Search if already exist ParameterSPtr lParameter; ParameterSPtr lrefParameter; lrefParameter = getParameter(pIDRefParam); if (lrefParameter == nullptr) { BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach reference parameter: ")+ pIDRefParam)); } ParameterList::iterator it = _parameterList.find(lIdent.str()); if ( it != _parameterList.end()) { LOG4CXX_DEBUG(_logger, "Resampled Parameter under reference parameter : " << lIdent.str() << " founded"); lParameter = checkIfIsANeededParameter(it->second); } else { // create Parameter with ProcessRempling on $pIDParam LOG4CXX_DEBUG(_logger, "Resampled Parameter under reference parameter : " << lIdent.str() << " creation"); lParameter = ParameterSPtr(new Parameter(*this, lIdent.str())); for (auto info : lrefParameter->getInfoList()) lParameter->setInfoValues(info.first,info.second); //lParameter->setXmlId(getParameter(pIDParam)->getXmlId()); Process* lProcess = NULL; lProcess = ServicesServer::getInstance()->getProcess("sampling_under_refparam",*lParameter.get()); if (lProcess) { lBuffer.str(""); lBuffer << "$" << pIDParam; lProcess->setExpression(lBuffer.str()); lProcess->setReferenceParameter(lrefParameter); DataWriterSPtr lDataWriter(lProcess); lParameter->setDataWriter(lDataWriter); } else { BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PROCESS_ERR) << AMDA::ex_msg(std::string("Cannot found resampling process for: ")+pIDParam)); } _parameterList[lIdent.str()] = lParameter; } if (!lParameter.get()) { BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach parameter: ")+ pIDParam)); } return lParameter; } ParameterSPtr ParameterManager::getParameterFromExpression(const std::string& pExpression, double gapThreshold) { boost::hash string_hash; std::stringstream lIdent; lIdent << string_hash(pExpression); if (!isNAN(gapThreshold)) { lIdent << "_"; lIdent << gapThreshold; } ParameterSPtr lParameter; ParameterSPtr lparentParameter; ParameterList::iterator it = _parameterList.find(lIdent.str()); //check if the parameter already exist if ( it != _parameterList.end()) { LOG4CXX_DEBUG(_logger, "Parameter from expression : " << lIdent.str() << " founded"); lParameter = checkIfIsANeededParameter(it->second); } else { // create Parameter LOG4CXX_DEBUG(_logger, "Parameter from expression : " << lIdent.str() << " creation"); lParameter = ParameterSPtr(new Parameter(*this, lIdent.str())); Process* lProcess = NULL; //create a standard process lProcess = ServicesServer::getInstance()->getProcess("standard",*lParameter.get()); if (lProcess) { //set expression and create the data writer lProcess->setExpression(pExpression); if (!isNAN(gapThreshold)) lProcess->setGapThreshold(gapThreshold); DataWriterSPtr lDataWriter(lProcess); lParameter->setDataWriter(lDataWriter); _parameterList[lIdent.str()] = lParameter; } else { stringstream lError; lError << "Process: 'standard' not found"; LOG4CXX_ERROR(_logger, lError.str()); BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PROCESS_ERR) << AMDA::ex_msg(lError.str())); } } if (!lParameter.get()) { BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach parameter: ")+ lIdent.str())); } return lParameter; } ParameterSPtr ParameterManager::getParameter(const std::string& pIDParam) { ParameterSPtr lParameter; ParameterList::iterator it = _parameterList.find(pIDParam); if (it != _parameterList.end()) { lParameter = checkIfIsANeededParameter(it->second); } else { LOG4CXX_ERROR(_logger, "parameter '" << pIDParam << "' not exist"); BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach parameter: ")+ pIDParam)); } return lParameter; } void ParameterManager::execute(string workingPath) { int error=0; int result = AMDA_EXIT_OK; int lNbTraitement = _paramOutputList.size(); for (ParamOutputList::iterator it = _paramOutputList.begin(); it != _paramOutputList.end(); ++it) { bool lAsError = false; try { (*it)->setWorkPath(workingPath); (*it)->establishConnection(); } catch (AMDA::AMDA_exception & e) { LOG4CXX_ERROR( _logger, "Error resume: " << traitException(result,e)); lAsError=true; } catch (...) { lAsError=true; } if (lAsError) { ++error; _paramOutputList.erase(it--); } } /* // Read time table file. if (!_timeTablePath.empty()) { readTimeTable(workingPath + "/" + _timeTablePath); } */ for (ParamOutputList::iterator it = _paramOutputList.begin(); it != _paramOutputList.end(); ++it) { bool lAsError = false; try { LOG4CXX_DEBUG(_logger, "Give TimeInterval list to method"); (*it)->init(getInputIntervals()); } catch (AMDA::AMDA_exception & e) { LOG4CXX_ERROR( _logger, "Error resume: " << traitException(result,e)); lAsError = true; } catch (...) { lAsError = true; } if (lAsError) { ++error; _paramOutputList.erase(it--); } } boost::thread_group lThGroup; typedef std::vector > ParamOutputResultList; ParamOutputResultList resultList; for ( ParamOutputList::iterator it = _paramOutputList.begin(); it != _paramOutputList.end(); ++it) { boost::packaged_task pt(boost::bind(&ParamOutput::process, *it)); boost::shared_future future(pt.get_future()); resultList.push_back(future); lThGroup.add_thread(new boost::thread(boost::move(pt))); } boost::wait_for_all(resultList.begin(), resultList.end()); lThGroup.join_all(); _parameterList.clear(); for (ParamOutputResultList::iterator it = resultList.begin();it != resultList.end(); ++it) { try { it->get(); } catch (AMDA::AMDA_exception & e) { LOG4CXX_ERROR( _logger, "Error resume: " << traitException(result,e)); ++error; } catch (...) { result = AMDA_ERROR_UNKNOWN; } } // Attempt to terminate each param output. for(ParamOutputList::iterator it = _paramOutputList.begin(); it != _paramOutputList.end(); ++it) { try { (*it)->terminate(); } catch (...) { // Nothing to do } } if ( error != 0) { if ( lNbTraitement == 1) { BOOST_THROW_EXCEPTION( AMDA::AMDA_exception() << AMDA::errno_code(result)); } else { BOOST_THROW_EXCEPTION( AMDA::AMDA_exception() << AMDA::errno_code(AMDA_PARAM_SOME_ERR)); } } } TimeIntervalListSPtr ParameterManager::getInputIntervals() { return _timeIntervalList; } void ParameterManager::addInputInterval(double pStartTime, double pStopTime, int pIndex, std::string& pttPath, std::string& pttName, int pttTotalIntervals) { _timeIntervalList->push_back(TimeInterval(pStartTime, pStopTime, pIndex, pttPath, pttName, pttTotalIntervals)); } void ParameterManager::addInputInterval(const TimeInterval& pTimeInterval) { _timeIntervalList->push_back(pTimeInterval); } void ParameterManager::addInputIntervalDataList(const int pIndex, const std::string& pDataKey, const std::vector& dataList) { _timeIntervalDataList[pIndex][pDataKey] = dataList; } std::vector& ParameterManager::getInputIntervalDataList(const int pIndex, const std::string& pDataKey) { return _timeIntervalDataList[pIndex][pDataKey]; } /** * get the computed gap size */ double ParameterManager::getComputedGapSize(double gapThreshold, double minSampling) { return gapThreshold * minSampling; } } /* namespace Parameters */ } /* namespace AMDA */