/**
 * ParameterManager.cc
 *
 *  Created on: 15 oct. 2012
 *      Author: AKKA IS
 */


#include <iostream>
#include <sstream>
#include <boost/thread/future.hpp>
#include <boost/thread/thread.hpp>
#include <boost/functional/hash.hpp>

#include "DicError.hh"

#include "ParameterManager.hh"

#include "DicError.hh"
#include "Parameter.hh"
#include "FileConfigurator.hh"
#include "ParamOutput.hh"
#include "Process.hh"
#include "ServicesServer.hh"

//#include "TimeTableCatalogFactory.hh"

using namespace std;
using namespace TimeTableCatalog;

namespace AMDA {
namespace Parameters {

/// Definition of the class-wide logger, registered under the name "AMDA-Kernel.ParameterManager".
log4cxx::LoggerPtr ParameterManager::_logger(
		log4cxx::Logger::getLogger("AMDA-Kernel.ParameterManager"));

/**
 * @brief Mutex used to serialize multi-threaded access to the CDF library (the CDF library is not thread-safe!).
 * It is a member of ParameterManager so that it can be shared between FileWriterCDF (DownloadOutput plugin)
 * and FileReaderCDF (ParamGetLocalFile).
 */
boost::mutex ParameterManager::mutexCDFLib;

/// Value used by the constructor to initialize _defaultGapThreshold.
#define DEFAULT_GAP_THRESHOLD_VALUE 5

/**
 * @brief Build a manager that owns a freshly allocated, empty TimeIntervalList
 * and whose default gap threshold is DEFAULT_GAP_THRESHOLD_VALUE.
 */
ParameterManager::ParameterManager()
	: _timeIntervalList(new TimeIntervalList)
	, _defaultGapThreshold(DEFAULT_GAP_THRESHOLD_VALUE)
{
}

/**
 * @brief Destructor: no explicit clean-up is performed here; members release
 * themselves through their own destructors.
 */
ParameterManager::~ParameterManager() = default;

void ParameterManager::createParameter(const std::string& nameParam) {
	CfgContext ctx;
	ctx.push<ParameterManager*>(this);
	ctx.push<ServicesServer*>(ServicesServer::getInstance());
	try {
		ServicesServer::getInstance()->getConfigurator()->proceed(
				nameParam.c_str(), ctx);
	} catch (AMDA::AMDA_exception & e) {
		e << AMDA::errno_code(AMDA_INFORMATION_PARAM_ERR);
		throw;
	}
}

/**
 * @brief Return the parameter registered under pIDParam, creating and registering it when absent.
 *
 * When a parent parameter is supplied, the resulting parameter is added to the
 * parent, takes the parent's gap threshold and receives every entry of the
 * parent's info list. This happens whether the parameter was just created or
 * already existed.
 *
 * @param pParentParameter optional parent (may be null).
 * @param pIDParam         identifier used as key in the parameter list.
 * @param pParamResult     [out] the existing or newly created parameter.
 * @return true when a new parameter was created, false when it already existed.
 */
bool ParameterManager::addParameter(Parameter* pParentParameter,
		const std::string& pIDParam, ParameterSPtr& pParamResult) {
	bool result = false;
	ParameterList::iterator it = _parameterList.find(pIDParam);
	LOG4CXX_DEBUG(_logger, "Add Parameter : " << pIDParam);
	if (it == _parameterList.end()) {
		pParamResult = ParameterSPtr(new Parameter(*this, pIDParam));
		_parameterList[pIDParam] = pParamResult;
		result = true;
	} else {
		pParamResult = it->second;
	}
	if (pParentParameter) {
		pParentParameter->addParameter(pParamResult);
		pParamResult->setGapThreshold(pParentParameter->getGapThreshold());
		// Iterate by const reference: the entries are only read, so there is no need to copy each one.
		for (const auto& info : pParentParameter->getInfoList())
			pParamResult->setInfoValues(info.first, info.second);
	}
	return result;
}

/**
 * @brief Replace, in place, every character of paramId that is reserved for expressions by '_'.
 *
 * Some characters are used to apply an operation, a process, etc. in an
 * expression and therefore cannot be used in a parameter id:
 * "-", "+", "*", "/", "^", "(", ")", "[", "]", "{", "}", "&", "|", "$".
 *
 * @param paramId [in,out] identifier to sanitize; its length is unchanged.
 */
void ParameterManager::applyParamIdCorrection(std::string& paramId)
{
	// Constant set of reserved characters: no per-call allocation.
	static const char lReservedChars[] = "-+*/^()[]{}&|$";

	// Single pass over the string: jump from one reserved character to the next.
	for (std::string::size_type lPos = paramId.find_first_of(lReservedChars);
			lPos != std::string::npos;
			lPos = paramId.find_first_of(lReservedChars, lPos + 1)) {
		paramId[lPos] = '_';
	}
}

/**
 * @brief Replace a parameter whose data writer is a Process with an empty expression.
 *
 * When the data-writer template of pParam is a Process and that process
 * reports an empty expression, the first entry of pParam's parameter list
 * takes its place: pParam delegates its other tasks to it, the manager's list
 * entry keyed by pParam's id is redirected to it, and pParam itself is
 * re-pointed to it. Otherwise pParam is left untouched.
 *
 * NOTE(review): the parameter list is assumed to be non-empty in the
 * replacement case; this is not checked here.
 *
 * @param pParam [in,out] parameter handle to check (the callers in this file
 *               pass the entry stored in the parameter list).
 * @return the same reference as pParam, after the possible replacement.
 */
ParameterSPtr& ParameterManager::checkIfIsANeededParameter(ParameterSPtr& pParam) {
	Process* const lWriterAsProcess = dynamic_cast<Process*>(pParam->getDataWriterTemplate().get());

	// Nothing to do unless the writer is a Process with an empty expression.
	if (!lWriterAsProcess || !lWriterAsProcess->isEmptyExpression()) {
		return pParam;
	}

	ParameterSPtr lReplacement = *pParam->getParameterList().begin();
	pParam->delegateOtherTaskTo(lReplacement);
	_parameterList[pParam->getId()] = lReplacement;
	pParam = lReplacement;
	return pParam;
}

/**
 * @brief Return pIDParam itself, or a resampled version of it.
 *
 * With an empty samplingMode the registered parameter is returned directly.
 * Otherwise a parameter named "<pIDParam>_<hash>" is looked up, where the hash
 * is computed from "<samplingMode>_<samplingValue>_<gapThreshold>". If it does
 * not exist yet, it is created with the info values of pIDParam and a
 * "sampling_classic" process (mode "classic") or a "sampling_simple" process
 * (any other mode) working on the expression "$<pIDParam>".
 *
 * @param pIDParam      identifier of the source parameter.
 * @param samplingMode  "" for no resampling, "classic", or anything else for simple sampling.
 * @param samplingValue first attribute given to the sampling process.
 * @param gapThreshold  second attribute given to the sampling process.
 * @return the requested parameter (never a null handle).
 * @throws ParameterManager_exception with AMDA_PROCESS_ERR when the sampling
 *         process cannot be obtained, or with AMDA_PARAM_NOT_FOUND_ERR when
 *         the parameter cannot be reached.
 */
ParameterSPtr ParameterManager::getSampledParameter(const std::string& pIDParam,
		const std::string &samplingMode, float samplingValue, float gapThreshold)
{
	ParameterSPtr lParameter;
	if (samplingMode == "") {
		lParameter = getParameter(pIDParam);
	} else {
		// Compute the resampled parameter name.
		boost::hash<std::string> string_hash;
		stringstream lBuffer;
		lBuffer << samplingMode << "_" << samplingValue << "_" << gapThreshold;
		stringstream lIdentStream;
		lIdentStream << pIDParam << "_" << string_hash(lBuffer.str());
		// Materialize the identifier once instead of copying the stream content at every use.
		const std::string lIdent = lIdentStream.str();
		LOG4CXX_DEBUG(_logger, "Resampled Parameter to found or create: " << lIdent);
		// Search if it already exists.
		ParameterList::iterator it = _parameterList.find(lIdent);
		if ( it != _parameterList.end()) {
			LOG4CXX_DEBUG(_logger, "Resampled Parameter: " << lIdent << " founded");
			lParameter = checkIfIsANeededParameter(it->second);
		} else {
			// Create a parameter with a resampling process on $pIDParam.
			LOG4CXX_DEBUG(_logger, "Resampled Parameter: " << lIdent << " creation");
			lParameter = ParameterSPtr(new Parameter(*this, lIdent));
			// Keep the source parameter handle in a local for the whole loop, rather than
			// iterating over a member reached through a temporary handle.
			const ParameterSPtr lSourceParameter = getParameter(pIDParam);
			for (const auto& info : lSourceParameter->getInfoList())
				lParameter->setInfoValues(info.first, info.second);
			//lParameter->setXmlId(getParameter(pIDParam)->getXmlId());
			Process* lProcess = nullptr;
			if (samplingMode == "classic") {
				lProcess = ServicesServer::getInstance()->getProcess("sampling_classic",*lParameter.get());
			} else {
				lProcess = ServicesServer::getInstance()->getProcess("sampling_simple",*lParameter.get());
			}
			if (!lProcess) {
				BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PROCESS_ERR) << AMDA::ex_msg(std::string("Cannot found resampling process for: ")+pIDParam));
			}
			// Take ownership immediately so the process is released if any step below throws.
			DataWriterSPtr lDataWriter(lProcess);
			lBuffer.str(""); lBuffer << samplingValue;
			lProcess->getAttributList().push_back(lBuffer.str());
			lBuffer.str(""); lBuffer << gapThreshold;
			lProcess->getAttributList().push_back(lBuffer.str());
			lBuffer.str("");  lBuffer << "$" << pIDParam;
			lProcess->setExpression(lBuffer.str());
			lParameter->setDataWriter(lDataWriter);
			_parameterList[lIdent] = lParameter;
		}
	}
	if (!lParameter.get()) {
		BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach parameter: ")+ pIDParam));
	}
	return lParameter;
}

/**
 * @brief Return a version of pIDParam resampled under a reference parameter.
 *
 * The resampled parameter is named "<pIDParam>_<hash>", where the hash is
 * computed from "sampling_under_refparam_<pIDRefParam>". If it is not
 * registered yet, it is created with the info values of the reference
 * parameter and a "sampling_under_refparam" process working on the expression
 * "$<pIDParam>" with pIDRefParam as its reference parameter.
 *
 * NOTE(review): the info values are copied from the reference parameter, not
 * from pIDParam, and pIDParam itself is not looked up here. Confirm both are
 * intended.
 *
 * @param pIDParam    identifier of the parameter to resample.
 * @param pIDRefParam identifier of the reference parameter (must be registered).
 * @return the resampled parameter (never a null handle).
 * @throws ParameterManager_exception with AMDA_PARAM_NOT_FOUND_ERR when a
 *         parameter cannot be reached, or with AMDA_PROCESS_ERR when the
 *         sampling process cannot be obtained.
 */
ParameterSPtr ParameterManager::getSampledParameterUnderRefParam(const std::string& pIDParam, const std::string& pIDRefParam)
{
	// Make a name for the resampled parameter.
	boost::hash<std::string> string_hash;
	stringstream lBuffer;
	lBuffer << "sampling_under_refparam";
	lBuffer << "_" << pIDRefParam;
	stringstream lIdentStream;
	lIdentStream << pIDParam << "_" << string_hash(lBuffer.str());
	// Materialize the identifier once instead of copying the stream content at every use.
	const std::string lIdent = lIdentStream.str();
	LOG4CXX_DEBUG(_logger, "Resampled Parameter under reference parameter to found or create: " << lIdent);

	ParameterSPtr lParameter;
	ParameterSPtr lrefParameter = getParameter(pIDRefParam);

	if (lrefParameter == nullptr)
	{
		BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach reference parameter: ")+ pIDRefParam));
	}

	// Search if it already exists.
	ParameterList::iterator it = _parameterList.find(lIdent);
	if ( it != _parameterList.end()) {
		LOG4CXX_DEBUG(_logger, "Resampled Parameter under reference parameter : " << lIdent << " founded");
		lParameter = checkIfIsANeededParameter(it->second);
	} else {
		// Create a parameter with a resampling process on $pIDParam.
		LOG4CXX_DEBUG(_logger, "Resampled Parameter under reference parameter : " << lIdent << " creation");
		lParameter = ParameterSPtr(new Parameter(*this, lIdent));
		for (const auto& info : lrefParameter->getInfoList())
			lParameter->setInfoValues(info.first, info.second);
		//lParameter->setXmlId(getParameter(pIDParam)->getXmlId());
		Process* lProcess = ServicesServer::getInstance()->getProcess("sampling_under_refparam",*lParameter.get());
		if (!lProcess) {
			BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PROCESS_ERR) << AMDA::ex_msg(std::string("Cannot found resampling process for: ")+pIDParam));
		}
		// Take ownership immediately so the process is released if any step below throws.
		DataWriterSPtr lDataWriter(lProcess);
		lBuffer.str("");  lBuffer << "$" << pIDParam;
		lProcess->setExpression(lBuffer.str());
		lProcess->setReferenceParameter(lrefParameter);
		lParameter->setDataWriter(lDataWriter);
		_parameterList[lIdent] = lParameter;
	}

	if (!lParameter.get()) {
		BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach parameter: ")+ pIDParam));
	}
	return lParameter;
}

/**
 * @brief Return the parameter computing the given expression, creating it when needed.
 *
 * The parameter identifier is the textual form of the hash of pExpression. If
 * no parameter is registered under that identifier, a new one is created with
 * a "standard" process whose expression is pExpression, then registered.
 *
 * @param pExpression expression to evaluate.
 * @return the matching parameter (never a null handle).
 * @throws ParameterManager_exception with AMDA_PROCESS_ERR when the "standard"
 *         process cannot be obtained, or with AMDA_PARAM_NOT_FOUND_ERR when
 *         the registered entry is a null handle.
 */
ParameterSPtr ParameterManager::getParameterFromExpression(const std::string& pExpression)
{
	boost::hash<std::string> string_hash;
	std::stringstream lIdentStream;
	lIdentStream << string_hash(pExpression);
	// Materialize the identifier once instead of copying the stream content at every use.
	const std::string lIdent = lIdentStream.str();

	ParameterSPtr lParameter;

	ParameterList::iterator it = _parameterList.find(lIdent);
	// Check if the parameter already exists.
	if ( it != _parameterList.end()) {
		LOG4CXX_DEBUG(_logger, "Parameter from expression : " << lIdent << " founded");
		lParameter = checkIfIsANeededParameter(it->second);
	} else {
		// Create the parameter.
		LOG4CXX_DEBUG(_logger, "Parameter from expression : " << lIdent << " creation");
		lParameter = ParameterSPtr(new Parameter(*this, lIdent));
		// Create a standard process.
		Process* lProcess = ServicesServer::getInstance()->getProcess("standard",*lParameter.get());
		if (!lProcess) {
			stringstream lError;  lError << "Process: 'standard' not found";
			LOG4CXX_ERROR(_logger, lError.str());
			BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PROCESS_ERR) << AMDA::ex_msg(lError.str()));
		}
		// Take ownership immediately so the process is released if setExpression throws.
		DataWriterSPtr lDataWriter(lProcess);
		// Set the expression and install the data writer.
		lProcess->setExpression(pExpression);
		lParameter->setDataWriter(lDataWriter);
		_parameterList[lIdent] = lParameter;
	}

	if (!lParameter.get()) {
		BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach parameter: ")+ lIdent));
	}
	return lParameter;
}

/**
 * @brief Look up a registered parameter by identifier.
 *
 * The stored entry goes through checkIfIsANeededParameter() before being
 * returned, so the returned handle may be the replacement that check installs.
 *
 * @param pIDParam identifier of the parameter.
 * @return the registered parameter.
 * @throws ParameterManager_exception with AMDA_PARAM_NOT_FOUND_ERR when no
 *         parameter is registered under pIDParam.
 */
ParameterSPtr ParameterManager::getParameter(const std::string& pIDParam) {
	ParameterList::iterator lEntry = _parameterList.find(pIDParam);

	// Unknown identifier: log it and report the failure to the caller.
	if (lEntry == _parameterList.end()) {
		LOG4CXX_ERROR(_logger, "parameter '" << pIDParam << "' not exist");
		BOOST_THROW_EXCEPTION(ParameterManager_exception() << AMDA::errno_code(AMDA_PARAM_NOT_FOUND_ERR) << AMDA::ex_msg(std::string("Cannot reach parameter: ")+ pIDParam));
	}

	ParameterSPtr lFound = checkIfIsANeededParameter(lEntry->second);
	return lFound;
}

/**
 * @brief Run every registered param output.
 *
 * Steps:
 *  1. give each output its working path and establish its connection;
 *  2. initialize each remaining output with the input time intervals;
 *  3. run process() of each remaining output in its own thread and wait for all of them;
 *  4. clear the parameter list;
 *  5. collect the outcome of each thread;
 *  6. call terminate() on each remaining output (failures are ignored on purpose).
 *
 * An output that fails during step 1 or 2 is removed from the output list and
 * counted as an error. Exceptions that are not AMDA_exception are counted as
 * errors too and recorded with the AMDA_ERROR_UNKNOWN code.
 *
 * @param workingPath path forwarded to setWorkPath() of each output.
 * @throws AMDA::AMDA_exception when at least one error occurred. It carries
 *         the recorded error code when there was exactly one output at entry,
 *         and AMDA_PARAM_SOME_ERR otherwise.
 */
void ParameterManager::execute(string workingPath) {
	int error = 0;
	int result = AMDA_EXIT_OK;
	const ParamOutputList::size_type lNbTraitement = _paramOutputList.size();

	// Step 1: connection. The iterator only advances when the current element is kept;
	// erase() supplies the next valid iterator otherwise (no decrement of begin()).
	for (ParamOutputList::iterator it = _paramOutputList.begin(); it != _paramOutputList.end(); ) {
		bool lAsError = false;
		try {
			(*it)->setWorkPath(workingPath);
			(*it)->establishConnection();
		} catch (AMDA::AMDA_exception & e) {
			LOG4CXX_ERROR( _logger, "Error resume: " << traitException(result,e));
			lAsError = true;
		} catch (...) {
			// Make sure the final exception does not carry the "OK" code.
			result = AMDA_ERROR_UNKNOWN;
			lAsError = true;
		}
		if (lAsError) {
			++error;
			it = _paramOutputList.erase(it);
		} else {
			++it;
		}
	}
/*
	// Read time table file.
	if (!_timeTablePath.empty()) {
		readTimeTable(workingPath + "/" + _timeTablePath);
	}
*/
	// Step 2: initialization with the input time intervals.
	for (ParamOutputList::iterator it = _paramOutputList.begin(); it != _paramOutputList.end(); ) {
		bool lAsError = false;
		try {
			LOG4CXX_DEBUG(_logger, "Give TimeInterval list to method");
			(*it)->init(getInputIntervals());
		} catch (AMDA::AMDA_exception & e) {
			LOG4CXX_ERROR( _logger, "Error resume: " << traitException(result,e));
			lAsError = true;
		} catch (...) {
			result = AMDA_ERROR_UNKNOWN;
			lAsError = true;
		}
		if (lAsError) {
			++error;
			it = _paramOutputList.erase(it);
		} else {
			++it;
		}
	}

	// Step 3: one thread per output; each future carries the outcome of process().
	boost::thread_group lThGroup;

	typedef std::vector<boost::shared_future<void> > ParamOutputResultList;
	ParamOutputResultList resultList;
	for (ParamOutputList::iterator it = _paramOutputList.begin(); it != _paramOutputList.end(); ++it) {
		boost::packaged_task<void> pt(boost::bind(&ParamOutput::process, *it));
		boost::shared_future<void> future(pt.get_future());
		resultList.push_back(future);
		lThGroup.add_thread(new boost::thread(boost::move(pt)));
	}

	boost::wait_for_all(resultList.begin(), resultList.end());
	lThGroup.join_all();

	// Step 4.
	_parameterList.clear();

	// Step 5: get() rethrows whatever the matching thread threw.
	for (ParamOutputResultList::iterator it = resultList.begin(); it != resultList.end(); ++it) {
		try {
			it->get();
		} catch (AMDA::AMDA_exception & e) {
			LOG4CXX_ERROR( _logger,	"Error resume: " << traitException(result,e));
			++error;
		} catch (...) {
			// Count unknown failures as well, otherwise the recorded code is never reported.
			result = AMDA_ERROR_UNKNOWN;
			++error;
		}
	}

	// Step 6: attempt to terminate each param output.
	for (ParamOutputList::iterator it = _paramOutputList.begin(); it != _paramOutputList.end(); ++it) {
		try {
			(*it)->terminate();
		} catch (...) {
			// Nothing to do
		}
	}

	if (error != 0) {
		if (lNbTraitement == 1) {
			BOOST_THROW_EXCEPTION( AMDA::AMDA_exception() << AMDA::errno_code(result));
		} else {
			BOOST_THROW_EXCEPTION( AMDA::AMDA_exception() << AMDA::errno_code(AMDA_PARAM_SOME_ERR));
		}
	}
}

	/**
	 * @brief Give access to the list of input time intervals.
	 * @return the handle stored in _timeIntervalList (the list itself is not copied).
	 */
	TimeIntervalListSPtr ParameterManager::getInputIntervals() {
		return _timeIntervalList;
	}

	/**
	 * @brief Append a time interval, built from its individual fields, to the input interval list.
	 *
	 * All arguments are forwarded unchanged, in the same order, to the
	 * TimeInterval constructor.
	 */
	void ParameterManager::addInputInterval(double pStartTime, double pStopTime, int pIndex, std::string& pttPath, std::string& pttName, int pttTotalIntervals)  {
		_timeIntervalList->push_back(
				TimeInterval(
						pStartTime,
						pStopTime,
						pIndex,
						pttPath,
						pttName,
						pttTotalIntervals));
	}

	/**
	 * @brief Append an already built time interval to the input interval list.
	 * @param pTimeInterval interval to append.
	 */
	void ParameterManager::addInputInterval(const TimeInterval& pTimeInterval) {
		_timeIntervalList->push_back(pTimeInterval);
	}

	/**
	 * @brief Store a list of strings for a given interval index and data key.
	 *
	 * Any list already stored under the same (pIndex, pDataKey) pair is replaced.
	 *
	 * @param pIndex   interval index used as first-level key.
	 * @param pDataKey data key used as second-level key.
	 * @param dataList values to store (copied).
	 */
	void ParameterManager::addInputIntervalDataList(const int pIndex, const std::string& pDataKey, const std::vector<std::string>& dataList)
	{
		// First level: data attached to the interval; second level: the list for this key.
		auto& lIntervalData = _timeIntervalDataList[pIndex];
		lIntervalData[pDataKey] = dataList;
	}

	/**
	 * @brief Give access to the list of strings stored for an interval index and data key.
	 *
	 * NOTE(review): the lookup uses operator[] on both levels. If
	 * _timeIntervalDataList is a nested std::map (not visible from here),
	 * asking for an unknown pair creates an empty entry. Confirm.
	 *
	 * @param pIndex   interval index used as first-level key.
	 * @param pDataKey data key used as second-level key.
	 * @return a modifiable reference to the stored list.
	 */
	std::vector<std::string>& ParameterManager::getInputIntervalDataList(const int pIndex, const std::string& pDataKey)
	{
		auto& lIntervalData = _timeIntervalDataList[pIndex];
		return lIntervalData[pDataKey];
	}

	/**
	 * @brief Compute the gap size as the product of the gap threshold and the minimum sampling.
	 * @param gapThreshold gap threshold factor.
	 * @param minSampling  minimum sampling value.
	 * @return gapThreshold * minSampling.
	 */
	double ParameterManager::getComputedGapSize(double gapThreshold, double minSampling)
	{
		const double lGapSize = gapThreshold * minSampling;
		return lGapSize;
	}

} /* namespace Parameters */
} /* namespace AMDA */