ProcessSamplingUnderRefParam.cc 7.32 KB
/**
 * ProcessSamplingUnderTimeList.cc
 *
 *  Created on: Jul 25, 2014
 *      Author: AKKA IS
 */
#include <stdlib.h>
#include <string>

#include "ProcessSamplingUnderRefParam.hh"
#include "Parameter.hh"
#include "ParameterManager.hh"
#include "ParameterCreatorFromExpression.hh"
#include "Resampling.hh"
#include "ParamMgr.hh"

using namespace std;
using namespace boost;
using namespace log4cxx;

namespace AMDA {
namespace Parameters {

ProcessSamplingUnderRefParam::ProcessSamplingUnderRefParam(Parameter &parameter) :
		SingleParamProcess_CRTP(parameter), _paramRefInput(NULL) {
}

ProcessSamplingUnderRefParam::ProcessSamplingUnderRefParam(const ProcessSamplingUnderRefParam& pProcess, Parameter &parameter)
: SingleParamProcess_CRTP(pProcess,parameter), _paramRefInput(pProcess._paramRefInput) {
}

ProcessSamplingUnderRefParam::~ProcessSamplingUnderRefParam() {
	if (_refParameterSPtr != nullptr)
	{
		_refParameterSPtr->closeConnection(this);
	}
}


void ProcessSamplingUnderRefParam::establishConnection() {
	if (_refParameterSPtr != nullptr)
		_refParameterSPtr->openConnection(this);

	SingleParamProcess_CRTP::establishConnection();
}

TimeStamp ProcessSamplingUnderRefParam::init() {
	/// Init input parameter
	TimeStamp timeStamp = _parameterInput->init( this, _timeIntervalList);
	/// Calibration information copy
	Parameter::InfoList lInfoList = _parameterInput->getInfoList();
	_parameter.getInfoList().insert(lInfoList.begin(), lInfoList.end());
	_paramInput = _parameterInput->getParamData(this).get();

	// check reference parameter
	if (_refParameterSPtr == nullptr) {
		BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::errno_code(AMDA_PROCESS_ERR) << AMDA::ex_msg(std::string("ProcessSamplingUnderRefParam : cannot retrieve reference parameter ")));
	}

	timeStamp = std::max(timeStamp, _refParameterSPtr->init( this, _timeIntervalList));

	_paramRefInput = _refParameterSPtr->getParamData(this).get();

	double computedGapSize = _refParameterSPtr->getParameterManager().getComputedGapSize(_parameterInput->getGapThreshold(), _paramInput->getMinSampling());

	// ProcessSamplingClassic _operation creation
	Resampling::CreateResampling lCreateResampling(*this, _timeIntervalList,
			*_paramInput, *_paramRefInput, computedGapSize, _parameterInput->getDataWriterTemplate()->useNearestValue());
	_operation = lCreateResampling.getResampling();
	static_cast<Resampling::ResamplingAbstract*>(_operation)->init();

	/// Get result ParamData
	_paramData = ParamDataSPtr(_operation->getParamOutput());

	_paramData->setMinSampling(static_cast<Resampling::ResamplingAbstract*>(_operation)->getSampling());

	return timeStamp;
}

unsigned int ProcessSamplingUnderRefParam::write()
{
	_paramRefInput = _refParameterSPtr->getParamData(this).get();

	//ParamDataIndexInfo lRefParamDataIndexInfo = _paramRefInput->getIndexInfo();

	/*if (!lRefParamDataIndexInfo._timeIntToProcessChanged) {
		do {
			lRefParamDataIndexInfo = _refParameterSPtr->getAsync(this).get();
		} while(!lRefParamDataIndexInfo._timeIntToProcessChanged && (lRefParamDataIndexInfo._nbDataToProcess != 0));
	}*/

	int ret = 0;
	unsigned int nbDataBeforeCallProcess = _paramData->getDataNumber();
	ParamDataIndexInfo lParamDataIndexInfo = _parameterInput->getParamData(this).get()->getIndexInfo();

	this->_treatTerminated = false;

	do {
		//if (!lParamDataIndexInfo._timeIntToProcessChanged && (lParamDataIndexInfo._nbDataToProcess == 0))
			lParamDataIndexInfo =_parameterInput->getAsync(this).get();

		if ( ! this->_treatTerminated ) {
			_operation->write(lParamDataIndexInfo);
			ret = _paramData->getDataNumber() - nbDataBeforeCallProcess;
			// As we do not have output data, continue processing the input data
		}
		if ( lParamDataIndexInfo._nbDataToProcess == 0) {
			this->_treatTerminated = true;
		}
	} while(ret == 0 && lParamDataIndexInfo._nbDataToProcess != 0);


	bool updateDims = (nbDataBeforeCallProcess == 0) && (ret > 0);
	if (updateDims)
		_paramData->updateDims();

	// Reset operation to prepare static data for the next TimeInterval.
	if (lParamDataIndexInfo._timeIntToProcessChanged) {
		_paramData->getIndexInfo()._endTimeIntIndexList.push_back(_paramData->getDataNumber());
		_operation->reset();
	}
	// There is no more time interval to process
	else if (lParamDataIndexInfo._noMoreTimeInt) {
		_paramData->getIndexInfo()._endTimeIntIndexList.push_back(_paramData->getDataNumber());
	} else {
		// Nothing to do.
	}

	// Pull up information on which time interval changed.
	_paramData->getIndexInfo()._timeIntToProcessChanged = lParamDataIndexInfo._timeIntToProcessChanged;
	_paramData->getIndexInfo()._noMoreTimeInt = lParamDataIndexInfo._noMoreTimeInt;

	return ret;






	/*while (lRefParamDataIndexInfo._nbDataToProcess > 0)
	{
		lRefParamDataIndexInfo = _refParameterSPtr->getAsync(this).get();
	}*/


	//if (!_paramRefInput->getIndexInfo()._timeIntToProcessChanged) {

		/*unsigned int nbDataBeforeCallProcess = _paramRefInput->getDataNumber();

		ParamDataIndexInfo lRefParamDataIndexInfo;
		bool _treatTerminated = false;
		int ret = 0;
		do {
			lRefParamDataIndexInfo = _refParameterSPtr->getAsync(this).get();

			if ( ! _treatTerminated ) {
				ret = _paramRefInput->getDataNumber() - nbDataBeforeCallProcess;
				// As we do not have output data, continue processing the input data
			}
			if ( lRefParamDataIndexInfo._nbDataToProcess == 0) {
				_treatTerminated = true;
			}
		} while(ret == 0 && lRefParamDataIndexInfo._nbDataToProcess != 0);*/

		// Reset operation to prepare static data for the next TimeInterval.
		/*if (lRefParamDataIndexInfo._timeIntToProcessChanged) {
			_paramRefInput->getIndexInfo()._endTimeIntIndexList.push_back(_paramRefInput->getDataNumber());
		}
		// There is no more time interval to process
		else if (!lRefParamDataIndexInfo._timeIntToProcessChanged && lRefParamDataIndexInfo._nbDataToProcess == 0) {
			_paramRefInput->getIndexInfo()._endTimeIntIndexList.push_back(_paramRefInput->getDataNumber());
		} else {
			// Nothing to do.
		}

		_paramRefInput->getIndexInfo()._timeIntToProcessChanged = true;*/
	//}

	/*int ret = SingleParamProcess_CRTP::write();

	_paramRefInput->getIndexInfo()._timeIntToProcessChanged = _paramData->getIndexInfo()._timeIntToProcessChanged;

	return ret;*/
	//return lRefParamDataIndexInfo._nbDataToProcess;
}

/*
 * @brief Get min sampling
 */
double ProcessSamplingUnderRefParam::getMinSampling()
{
	if (_refParameterSPtr == nullptr)
		return 0;
	if (_refParameterSPtr->getDataWriterTemplate() == nullptr)
		return 0;
	return _refParameterSPtr->getDataWriterTemplate()->getMinSampling();
}

/**
 * @overload SingleParamProcess_CRTP::updateInfo update parameter info in relation to the sampling process
 */
void ProcessSamplingUnderRefParam::updateInfo(Parameter & parameter)
{
	LOG4CXX_DEBUG(_logger, "ProcessSamplingUnderRefParam::updateInfo - " << parameter.getId());

	SingleParamProcess::updateInfo(parameter);

	//Param info
	AMDA::Info::ParamInfoSPtr paramInfo = AMDA::Info::ParamMgr::getInstance()->getParamInfoFromId(parameter.getInfoId());

	if (paramInfo == nullptr)
		return;

	std::string processInfo = "Resampling of '";
	processInfo += _parameterInput->getId();
	processInfo += "' under times list of '";
	processInfo += _refParameterSPtr->getId();
	processInfo += "'";

	if (_parameterInput->getDataWriterTemplate()->useNearestValue())
			processInfo += ". Use nearest value.";

	paramInfo->setProcessInfo(processInfo);
}

} /* namespace Parameters */
} /* namespace AMDA */