ParamInterval.cc 4.83 KB
/*
 * ParamInterval.cc
 *
 *  Created on: Dec 11, 2012
 *      Author: f.casimir
 */

#include "ParamInterval.hh"
#include "DataClient.hh"
#include "ParamData.hh"

namespace AMDA {
namespace Parameters {

log4cxx::LoggerPtr ParamInterval::_logger(log4cxx::Logger::getLogger("AMDA-Kernel.ParamInterval"));

ParamInterval::ParamInterval(TimeIntervalListSPtr pTimeIntervalList, DataWriter* dataWriter) :
		_timeIntervalList(pTimeIntervalList),
		_dataWriter(dataWriter) {
}

ParamInterval::~ParamInterval() {
	  delete _dataWriter;
}

ParamDataIndexInfo  ParamInterval::get(DataClient *dataClient) {

	DataClientInfoList::iterator it = _dataClient.find(dataClient);
	if (it != _dataClient.end()) {
		ParamDataIndexInfo &lClientIndexInfo = it->second;
		ParamDataIndexInfo &lParamDataIndexInfo = _dataWriter->getParamData()->getIndexInfo();

		lClientIndexInfo._startIndex += lClientIndexInfo._nbDataToProcess;
		// If DataClient and ParamData have not the same information on _timeIntToProcessChanged attribute,
		// it signify that DataClient was not informed of time interval modification.
		if ( (lClientIndexInfo._startIndex  >= lParamDataIndexInfo._nbDataToProcess) &&
				(lClientIndexInfo._timeIntToProcessChanged == lParamDataIndexInfo._timeIntToProcessChanged) ) {

			lClientIndexInfo._nbDataToProcess = _dataWriter->write();
			if (!isNAN(_dataWriter->getTimeRestriction())) {
				_dataWriter->applyTimeRestriction(lClientIndexInfo._startIndex, lClientIndexInfo._nbDataToProcess);
			}
			lClientIndexInfo._timeIntToProcessChanged = lParamDataIndexInfo._timeIntToProcessChanged;
			lClientIndexInfo._noMoreTimeInt = lParamDataIndexInfo._noMoreTimeInt;

			// If time interval changed push back end index.
			if(lParamDataIndexInfo._timeIntToProcessChanged || (!lParamDataIndexInfo._timeIntToProcessChanged && lParamDataIndexInfo._nbDataToProcess == 0)) {
				lClientIndexInfo._endTimeIntIndexList.push_back(lClientIndexInfo._startIndex + lClientIndexInfo._nbDataToProcess);
			}

		} else {
			// Give to the client number of data to process to give only TimeInterval per TimeInterval and not all.
			// This case mean that a DataClient hadn't time to read data for a TimeInterval before an other one is requested, or
			// for this TimeInterval data have been already received by another DataClient.

			// Check before if DataClient end time index list is synchronized
			if(lClientIndexInfo._endTimeIntIndexList.size() != lParamDataIndexInfo._endTimeIntIndexList.size()) {
				lClientIndexInfo._endTimeIntIndexList = lParamDataIndexInfo._endTimeIntIndexList;
			}


			std::list<unsigned int>::iterator it = lClientIndexInfo._endTimeIntIndexList.begin();
			// Set default number of data to process that correspond to read all stored in ParamData.
			lClientIndexInfo._nbDataToProcess = lParamDataIndexInfo._nbDataToProcess - lClientIndexInfo._startIndex;
			// Parse each end index that close a TimeInterval until a new one is found.
			bool notFound = true;
			do {
				// Get end of the next TimeInterval
				if ( (it != lClientIndexInfo._endTimeIntIndexList.end()) &&
						((*it) > lClientIndexInfo._startIndex) ) {
					lClientIndexInfo._nbDataToProcess = (*it) - lClientIndexInfo._startIndex;
					notFound = false;
				} else if (it != lClientIndexInfo._endTimeIntIndexList.end()) {
					++it;
				}
			} while( (it != lClientIndexInfo._endTimeIntIndexList.end()) && notFound );

			// Check if this step is enough to synchronize current state of parameter with DataClient.
			// This occurs when end of a time interval is reached or when ParamData is precisely on the end of a time interval
			// and ready to get data for the next time interval
			if( (lClientIndexInfo._startIndex + lClientIndexInfo._nbDataToProcess) == lParamDataIndexInfo._nbDataToProcess) {
				lClientIndexInfo._timeIntToProcessChanged = lParamDataIndexInfo._timeIntToProcessChanged;
			}
			// An other time interval is currently processed.
			// DataClient must read data for its current time interval and then change time interval.
			else if (it != lClientIndexInfo._endTimeIntIndexList.end()) {
				lClientIndexInfo._timeIntToProcessChanged = true;
			} else {
				lClientIndexInfo._timeIntToProcessChanged = false;
			}
			lClientIndexInfo._noMoreTimeInt = lParamDataIndexInfo._noMoreTimeInt;

		}

		unsigned lIndexMin =  lClientIndexInfo._startIndex;
		for (DataClientInfoList::iterator itMin = _dataClient.begin();
				itMin != _dataClient.end(); ++itMin) {
			ParamDataIndexInfo &lClientIndexInfo = itMin->second;
			unsigned int lCurrentIndexMin = lClientIndexInfo._startIndex;
			if (lIndexMin >lCurrentIndexMin) {
				lIndexMin = lCurrentIndexMin;
			}
		}

		lParamDataIndexInfo._startIndex = lIndexMin;
		_dataWriter->getParamData()->freeBefore(lIndexMin);
	} else {
		BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::errno_code(AMDA_ERROR_UNKNOWN) );
	}

	return it->second;
}

} /* namespace Parameters */
} /* namespace AMDA */