MultiParamProcess.cc 9.66 KB
/*
 * MultiParamProcess.cc
 *
 *  Created on: Jan 4, 2013
 *      Author: f.casimir
 */

#include <algorithm>
#include <limits>

#include "ParamData.hh"
#include "Operation.hh"
#include "ParamMgr.hh"

#include "MultiParamProcess.hh"

namespace AMDA {
namespace Parameters {

MultiParamProcess::MultiParamProcess(Parameter &parameter) :
		Process(parameter), _minSampling(std::numeric_limits<double>::max()) {
}

MultiParamProcess::MultiParamProcess(const MultiParamProcess &pProcess, Parameter &parameter) :
		Process(pProcess,parameter),
		 _paramNameList(pProcess._paramNameList),
		 _minSampling(pProcess._minSampling) {
	establishConnection();
}

MultiParamProcess::~MultiParamProcess() {
	for (ParameterList::iterator it = _paramNameList.begin(); it != _paramNameList.end(); ++it) {
		ParameterSPtr& parameter = it->second.first;
		if ( parameter) { parameter->closeConnection(this); }
	}
}

/**
 *  @brief Connection to Parameter server.
 */
void MultiParamProcess::establishConnection() {

	for (ParameterList::iterator it = _paramNameList.begin(); it != _paramNameList.end(); ++it) {
		ParameterSPtr& parameter = it->second.first;
		parameter->openConnection(this);
	}
}

 TimeStamp MultiParamProcess::init() {

	 TimeStamp time = 0;
	for (ParameterList::iterator it = _paramNameList.begin(); it != _paramNameList.end(); ++it) {
		ParameterSPtr& parameter = it->second.first;
		time = std::max(time, parameter->init(this, _timeIntervalList));
		Parameter::InfoList lInfoList = parameter->getInfoList();
		_parameter.getInfoList().insert(lInfoList.begin(), lInfoList.end());
		_minSampling = std::min(parameter->getParamData(this)->getMinSampling(), _minSampling);
	}
	return time;
}

unsigned int MultiParamProcess::write() {
	typedef std::list<std::pair< ParameterList::iterator,boost::shared_future<ParamDataIndexInfo> > > ParamDataIndexInfoFutureList;

	ParamDataIndexInfoFutureList lResultList;
	unsigned int lNbTotalAvalaible = 0;

	unsigned int nbDataBeforeCallProcess = _paramData->getDataNumber();

	if (_paramDataIndexInfo._timeIntToProcessChanged) {
		for (ParameterList::iterator itParam = _paramNameList.begin(); itParam != _paramNameList.end(); ++itParam) {
			LOG4CXX_DEBUG(_logger, "MultiParamProcess::write - New interval - Need more data for: " << itParam->first );
			(*itParam).second.second = (*itParam->second.first).getAsync(this).get();
		}
	}
	else {
		for (ParameterList::iterator itParam = _paramNameList.begin(); itParam != _paramNameList.end(); ++itParam) {
			if (itParam->second.second._timeIntToProcessChanged) {
				continue;
			}
			 lNbTotalAvalaible = itParam->second.second._startIndex + itParam->second.second._nbDataToProcess;
	               	if ( _paramDataIndexInfo._startIndex >= lNbTotalAvalaible) {
				LOG4CXX_DEBUG(_logger, "MultiParamProcess::write - Need more data for: " << itParam->first );
				(*itParam).second.second = (*itParam->second.first).getAsync(this).get();
			}
		}
	}

	//compute minimum data available
	ParameterList::iterator it = _paramNameList.begin();
	if( it != _paramNameList.end()) {
		ParamDataIndexInfo &lParamDataIndexInfoTmp= (*it).second.second;
		LOG4CXX_DEBUG(_logger, "Parameter name: " << it->first << " Index: " << lParamDataIndexInfoTmp._startIndex  <<" nbDataToProcess " << lParamDataIndexInfoTmp._nbDataToProcess);
		lNbTotalAvalaible = lParamDataIndexInfoTmp._nbDataToProcess + lParamDataIndexInfoTmp._startIndex ;
		++it;
		for ( ;it != _paramNameList.end(); ++it) {
			ParamDataIndexInfo &lParamDataIndexInfoTmp= (*it).second.second;
			LOG4CXX_DEBUG(_logger, "Parameter name: " << it->first << " Index: " << lParamDataIndexInfoTmp._startIndex  <<" nbDataToProcess " << lParamDataIndexInfoTmp._nbDataToProcess);
			lNbTotalAvalaible =std::min(lNbTotalAvalaible, lParamDataIndexInfoTmp._nbDataToProcess + lParamDataIndexInfoTmp._startIndex) ;
		}
	}

	_paramDataIndexInfo._nbDataToProcess = lNbTotalAvalaible - _paramDataIndexInfo._startIndex;

	_operation->write(_paramDataIndexInfo);
        
        bool updateDims = (_paramDataIndexInfo._nbDataToProcess > 0) && (_paramDataIndexInfo._startIndex == 0);

	_paramDataIndexInfo._startIndex +=_paramDataIndexInfo._nbDataToProcess ;
        
        // Reset operation instance to prepare it for the next time interval.
	bool lTimeIntToProcessChanged = true;
	bool lNoMoreTimeInt = true;

	for (ParameterList::iterator itParam = _paramNameList.begin(); itParam != _paramNameList.end(); ++itParam) {
		lTimeIntToProcessChanged &= itParam->second.second._timeIntToProcessChanged;
		lNoMoreTimeInt &= itParam->second.second._noMoreTimeInt;
	}

	if(lTimeIntToProcessChanged) {
		LOG4CXX_DEBUG(_logger, "MultiParamProcess => Need to change interval");
		_operation->reset();
		_paramDataIndexInfo._endTimeIntIndexList.push_back(lNbTotalAvalaible);
		_paramData->getIndexInfo()._endTimeIntIndexList.push_back(lNbTotalAvalaible);
	}
	// There is no more time interval to process
	else if (lNoMoreTimeInt) {
		_paramDataIndexInfo._endTimeIntIndexList.push_back(lNbTotalAvalaible);
		_paramData->getIndexInfo()._endTimeIntIndexList.push_back(lNbTotalAvalaible);
		LOG4CXX_DEBUG(_logger, "MultiParamProcess => No more time interval to process; end index for this interval is " << _paramDataIndexInfo._endTimeIntIndexList.back());
	} else {
		// Nothing to do.
	}

	_paramDataIndexInfo._timeIntToProcessChanged = lTimeIntToProcessChanged;
	_paramDataIndexInfo._noMoreTimeInt = lNoMoreTimeInt;
	_paramData->getIndexInfo()._noMoreTimeInt = lNoMoreTimeInt;
	_paramData->getIndexInfo()._timeIntToProcessChanged = lTimeIntToProcessChanged;
        
        if (updateDims)
            _paramData->updateDims();
        
	return _paramData->getDataNumber() - nbDataBeforeCallProcess;
}

/*
 * @brief @ to know the resampling strategy to use. If it's true, the nearest value will be use
 */
bool MultiParamProcess::useNearestValue()
{
	bool useNearestValue = true;
	for (ParameterList::iterator it = _paramNameList.begin(); it != _paramNameList.end(); ++it)
	{
		ParameterSPtr& parameter = it->second.first;
		useNearestValue = useNearestValue && parameter->getDataWriterTemplate()->useNearestValue();
	}
	return useNearestValue;
}

/**
 * @brief update parameter info in relation to the process
 */
void MultiParamProcess::updateInfo(Parameter & parameter)
{
	LOG4CXX_DEBUG(_logger, "MultiParamProcess::updateInfo - " << parameter.getId());

	if (parameter.getInfoId().empty())
		parameter.setInfoId(parameter.getId());

	//Param info
	AMDA::Info::ParamInfoSPtr paramInfo = AMDA::Info::ParamMgr::getInstance()->getParamInfoFromId(parameter.getInfoId(),true);

	if (paramInfo == nullptr)
		return;

	//Derived parameter => no comes from a dataset
	paramInfo->setDatasetId("");

	//set name and short name = param id
	if (paramInfo->getName().empty())
		paramInfo->setName(parameter.getId());
	if (paramInfo->getShortName().empty())
		paramInfo->setShortName(parameter.getId());

	std::string processInfo = "Derived parameter from expression '";
	processInfo += getExpression();
	processInfo += "'";

	for (ParameterList::iterator itParam = _paramNameList.begin(); itParam != _paramNameList.end(); ++itParam)
		paramInfo->addLinkedParamId(itParam->first);
                   
                    // adding tableParams into linked params 
                    std::map<int, boost::shared_ptr<AMDA::Info::ParamTable>> tables =paramInfo->getTables();
                    
                    if(! tables.empty()){
                        for(auto table : tables){
                            ParameterManager * parameterManager;
                             if (table.second != nullptr){
                            if(table.second->isVariable(parameterManager)){
                            std::map<std::string, std::string> tableParams = table.second->getTableParams(parameterManager);
                            if(! tableParams.empty())
                            for(auto tableParam : tableParams){
                                paramInfo->addLinkedParamId(tableParam.second);
                            }
                         }
                        
                      }
                    }
                    }
                     
	paramInfo->setProcessInfo(processInfo);

	std::string description = getDescription();
	if (description.empty()) {
		description = getExpression();
	}

	if ((_paramNameList.size() == 1) && paramInfo->getTables().empty()) {
		AMDA::Info::ParamInfoSPtr inputParamInfo = AMDA::Info::ParamMgr::getInstance()->getParamInfoFromId(_paramNameList.begin()->second.first->getInfoId());
		if (inputParamInfo != nullptr) {
			//Copy tables
			std::map<int, boost::shared_ptr<AMDA::Info::ParamTable>> inputTables = inputParamInfo->getTables();
			for (std::map<int, boost::shared_ptr<AMDA::Info::ParamTable>>::iterator it = inputTables.begin(); it != inputTables.end(); ++it) {
				paramInfo->addTable(it->first, it->second);
			}
		}
	}

	paramInfo->setProcessDescription(description);
}

/*
 * @brief Get min sampling
 */
double MultiParamProcess::getMinSampling()
{
	double minSampling = 0;
	for (auto param : _paramNameList)
	{
		ParameterSPtr& parameter = param.second.first;
		double paramMinSampling = 0;
		if (parameter->getTimeResolution() > 0)
			paramMinSampling = parameter->getTimeResolution();
		else if (parameter->getDataWriterTemplate() != nullptr)
			paramMinSampling = parameter->getDataWriterTemplate()->getMinSampling();

		if (minSampling == 0)
			minSampling = paramMinSampling;
		else if (paramMinSampling != 0)
			minSampling = std::min(minSampling,paramMinSampling);
	}
	return minSampling;
}


} /* namespace Parameters */
} /* namespace AMDA */