MultiParamProcess.cc 7.45 KB
/*
 * MultiParamProcess.cc
 *
 *  Created on: Jan 4, 2013
 *      Author: f.casimir
 */

#include <algorithm>
#include <limits>

#include "ParamData.hh"
#include "Operation.hh"
#include "ParamMgr.hh"

#include "MultiParamProcess.hh"

namespace AMDA {
namespace Parameters {

/**
 * @brief Build a multi-parameter process attached to @p parameter.
 *
 * The cached minimum sampling starts at the largest finite double so that
 * any sampling value later folded in with std::min() takes precedence.
 */
MultiParamProcess::MultiParamProcess(Parameter &parameter)
	: Process(parameter),
	  _minSampling(std::numeric_limits<double>::max())
{
}

/**
 * @brief Build a process for @p parameter from an existing process.
 *
 * The parameter list and the cached minimum sampling are copied from
 * @p pProcess; the base part is built by Process(pProcess, parameter).
 */
MultiParamProcess::MultiParamProcess(const MultiParamProcess &pProcess, Parameter &parameter)
	: Process(pProcess, parameter),
	  _paramNameList(pProcess._paramNameList),
	  _minSampling(pProcess._minSampling)
{
}

/**
 * @brief Close the connection held on every referenced parameter.
 *
 * Entries whose parameter pointer is empty are skipped.
 */
MultiParamProcess::~MultiParamProcess() {
	for (auto &entry : _paramNameList) {
		ParameterSPtr &linkedParam = entry.second.first;
		if (!linkedParam) {
			continue;
		}
		linkedParam->closeConnection(this);
	}
}

/**
 *  @brief Open a connection on every parameter referenced by this process.
 *
 *  Each entry of the parameter list is dereferenced without a null check.
 */
void MultiParamProcess::establishConnection() {
	for (auto &entry : _paramNameList) {
		entry.second.first->openConnection(this);
	}
}

 /**
  * @brief Initialise every referenced parameter for this process.
  *
  * For each parameter, in list order: call its init() with the process time
  * interval list, merge a copy of its info list into the info list of the
  * process' own parameter, and fold the minimum sampling of its data into
  * the cached _minSampling.
  *
  * @return the greatest value returned by the parameters' init(), or 0 when
  *         none of them returned a greater value.
  */
 TimeStamp MultiParamProcess::init() {
	TimeStamp latestInit = 0;

	for (auto &entry : _paramNameList) {
		ParameterSPtr &source = entry.second.first;

		const TimeStamp sourceInit = source->init(this, _timeIntervalList);
		latestInit = std::max(latestInit, sourceInit);

		// Work on a copy of the source info list while inserting into ours.
		Parameter::InfoList sourceInfos = source->getInfoList();
		_parameter.getInfoList().insert(sourceInfos.begin(), sourceInfos.end());

		const auto sourceSampling = source->getParamData(this)->getMinSampling();
		_minSampling = std::min(sourceSampling, _minSampling);
	}

	return latestInit;
}

/**
 * @brief Run the operation on the data currently available for all parameters.
 *
 * Steps, in order:
 *  1. request more data (asynchronously) from every parameter whose known
 *     data does not go beyond the current start index of this process;
 *  2. wait for the requested data and store the returned index info;
 *  3. compute the smallest "start index + number of data" over all parameters;
 *  4. call the operation on that range and advance the start index;
 *  5. combine the parameters' interval flags and propagate them to this
 *     process and to its output ParamData.
 *
 * @return the number of data handed to the operation during this call.
 */
unsigned int MultiParamProcess::write() {
	// Pairs an entry of the parameter list with the future of its pending data request.
	typedef std::list<std::pair< ParameterList::iterator,boost::shared_future<ParamDataIndexInfo> > > ParamDataIndexInfoFutureList;

	ParamDataIndexInfoFutureList lResultList;
	unsigned int lNbTotalAvalaible = 0;

	// Step 1: ask for more data where needed.
	for (ParameterList::iterator itParam = _paramNameList.begin(); itParam != _paramNameList.end(); ++itParam) {
		// Skip a parameter whose "time interval changed" flag is set while the flag of
		// this process is not (presumably the parameter is already on the next interval
		// - TODO confirm).
		if (itParam->second.second._timeIntToProcessChanged && !_paramDataIndexInfo._timeIntToProcessChanged)
			continue;
		// lNbTotalAvalaible is used here as a per-parameter scratch value.
		lNbTotalAvalaible = itParam->second.second._startIndex + itParam->second.second._nbDataToProcess;
		if ( _paramDataIndexInfo._startIndex >= lNbTotalAvalaible)
		{
			LOG4CXX_DEBUG(_logger, "Need more data Parameter name: " << itParam->first );
			lResultList.push_back(make_pair(itParam,(*itParam->second.first).getAsync(this)));
		}
	}

	// Step 2: wait for every pending request and store the index info it returns
	// in the corresponding entry of the parameter list.
	for ( ParamDataIndexInfoFutureList::iterator it= lResultList.begin();it != lResultList.end(); ++it) {
		LOG4CXX_DEBUG(_logger, "Wait data Parameter name: " << it->first->first );
		it->first->second.second=it->second.get();
	}

	// Step 3: compute the minimum of "start index + number of data" over all
	// parameters. The first entry seeds the minimum; with an empty list
	// lNbTotalAvalaible keeps the value it had before this point.
	ParameterList::iterator it = _paramNameList.begin();
	if( it != _paramNameList.end()) {
		ParamDataIndexInfo &lParamDataIndexInfoTmp= (*it).second.second;
		LOG4CXX_DEBUG(_logger, "Parameter name: " << it->first << " Index: " << lParamDataIndexInfoTmp._startIndex  <<" nbDataToProcess " << lParamDataIndexInfoTmp._nbDataToProcess);
		lNbTotalAvalaible = lParamDataIndexInfoTmp._nbDataToProcess + lParamDataIndexInfoTmp._startIndex ;
		++it;
		for ( ;it != _paramNameList.end(); ++it) {
			ParamDataIndexInfo &lParamDataIndexInfoTmp= (*it).second.second;
			LOG4CXX_DEBUG(_logger, "Parameter name: " << it->first << " Index: " << lParamDataIndexInfoTmp._startIndex  <<" nbDataToProcess " << lParamDataIndexInfoTmp._nbDataToProcess);
			lNbTotalAvalaible =std::min(lNbTotalAvalaible, lParamDataIndexInfoTmp._nbDataToProcess + lParamDataIndexInfoTmp._startIndex) ;
		}
	}

	// Step 4: process the data located between the current start index and the minimum.
	// NOTE(review): if lNbTotalAvalaible were smaller than _startIndex this subtraction
	// would wrap around for unsigned operands - confirm the invariant with the callers.
	_paramDataIndexInfo._nbDataToProcess = lNbTotalAvalaible - _paramDataIndexInfo._startIndex;

	_operation->write(_paramDataIndexInfo);

	_paramDataIndexInfo._startIndex +=_paramDataIndexInfo._nbDataToProcess ;
        
        // Step 5: combine the interval flags of all parameters. Each flag stays true
        // only if it is set on every parameter (both stay true for an empty list).
	bool lTimeIntToProcessChanged = true;
	bool lNoMoreTimeInt = true;

	for (ParameterList::iterator itParam = _paramNameList.begin(); itParam != _paramNameList.end(); ++itParam) {
		lTimeIntToProcessChanged &= itParam->second.second._timeIntToProcessChanged;
		lNoMoreTimeInt &= itParam->second.second._noMoreTimeInt;
	}

	if(lTimeIntToProcessChanged) {
		// Every parameter changed its time interval: reset the operation and record
		// the end index of the interval, both locally and in the output ParamData.
		LOG4CXX_DEBUG(_logger, "MultiParamProcess => Need to change interval");
		_operation->reset();
		_paramDataIndexInfo._endTimeIntIndexList.push_back(lNbTotalAvalaible);
		_paramData->getIndexInfo()._endTimeIntIndexList.push_back(lNbTotalAvalaible);
	}
	// Every parameter reports that there is no more time interval: record the end
	// index of the interval without resetting the operation.
	else if (lNoMoreTimeInt) {
		_paramDataIndexInfo._endTimeIntIndexList.push_back(lNbTotalAvalaible);
		_paramData->getIndexInfo()._endTimeIntIndexList.push_back(lNbTotalAvalaible);
		LOG4CXX_DEBUG(_logger, "MultiParamProcess => No more time interval to process; end index for this interval is " << _paramDataIndexInfo._endTimeIntIndexList.back());
	} else {
		// Still inside the current time interval: nothing to record.
	}

	// Propagate the combined flags to this process and to the output ParamData.
	_paramDataIndexInfo._timeIntToProcessChanged = lTimeIntToProcessChanged;
	_paramDataIndexInfo._noMoreTimeInt = lNoMoreTimeInt;
	_paramData->getIndexInfo()._noMoreTimeInt = lNoMoreTimeInt;
	_paramData->getIndexInfo()._timeIntToProcessChanged = lTimeIntToProcessChanged;
        
        // Dimensions are refreshed only once the start index has moved past 0.
        if (_paramDataIndexInfo._startIndex > 0)
            _paramData->updateDims();

	return _paramDataIndexInfo._nbDataToProcess;
}

/**
 * @brief Tell which resampling strategy to use.
 *
 * @return true when the data writer template of every referenced parameter
 *         reports useNearestValue() (also true when the list is empty), in
 *         which case the nearest value will be used; false as soon as one
 *         of them does not.
 */
bool MultiParamProcess::useNearestValue()
{
	for (auto &entry : _paramNameList)
	{
		// One refusal is enough: the remaining parameters are not queried.
		if (!entry.second.first->getDataWriterTemplate()->useNearestValue())
		{
			return false;
		}
	}
	return true;
}

/**
 * @brief Update the info of @p parameter in relation to this process.
 *
 * Uses the parameter id as info id when none is set, then fills the matching
 * ParamInfo: empty dataset id, name and short name defaulting to the
 * parameter id, linked parameter ids, process info and process description.
 * Returns without further change when the ParamMgr yields no ParamInfo.
 *
 * @param parameter parameter whose info is updated.
 */
void MultiParamProcess::updateInfo(Parameter & parameter)
{
	LOG4CXX_DEBUG(_logger, "MultiParamProcess::updateInfo - " << parameter.getId());

	// Fall back on the parameter id when no info id has been set.
	if (parameter.getInfoId().empty())
	{
		parameter.setInfoId(parameter.getId());
	}

	AMDA::Info::ParamInfoSPtr info =
			AMDA::Info::ParamMgr::getInstance()->getParamInfoFromId(parameter.getInfoId(), true);
	if (!info)
	{
		return;
	}

	// A derived parameter does not come from a dataset.
	info->setDatasetId("");

	// Name and short name default to the parameter id.
	if (info->getName().empty())
	{
		info->setName(parameter.getId());
	}
	if (info->getShortName().empty())
	{
		info->setShortName(parameter.getId());
	}

	std::string derivedFrom("Derived parameter from expression '");
	derivedFrom += getExpression();
	derivedFrom += "'";

	// Every parameter used by the process is registered as a linked parameter.
	for (auto &entry : _paramNameList)
	{
		info->addLinkedParamId(entry.first);
	}

	info->setProcessInfo(derivedFrom);
	info->setProcessDescription(getDescription());
}

/*
 * @brief Get min sampling
 */
double MultiParamProcess::getMinSampling()
{
	double minSampling = 0;
	for (auto param : _paramNameList)
	{
		ParameterSPtr& parameter = param.second.first;
		double paramMinSampling = 0;
		if (parameter->getTimeResolution() > 0)
			paramMinSampling = parameter->getTimeResolution();
		else if (parameter->getDataWriterTemplate() != nullptr)
			paramMinSampling = parameter->getDataWriterTemplate()->getMinSampling();

		if (minSampling == 0)
			minSampling = paramMinSampling;
		else if (paramMinSampling != 0)
			minSampling = std::min(minSampling,paramMinSampling);
	}
	return minSampling;
}


} /* namespace Parameters */
} /* namespace AMDA */