/* * ParamInterval.cc * * Created on: Dec 11, 2012 * Author: f.casimir */ #include "ParamInterval.hh" #include "DataClient.hh" #include "ParamData.hh" namespace AMDA { namespace Parameters { log4cxx::LoggerPtr ParamInterval::_logger(log4cxx::Logger::getLogger("AMDA-Kernel.ParamInterval")); ParamInterval::ParamInterval(TimeIntervalListSPtr pTimeIntervalList, DataWriter* dataWriter) : _timeIntervalList(pTimeIntervalList), _dataWriter(dataWriter) { } ParamInterval::~ParamInterval() { delete _dataWriter; } ParamDataIndexInfo ParamInterval::get(DataClient *dataClient) { DataClientInfoList::iterator it = _dataClient.find(dataClient); if (it != _dataClient.end()) { ParamDataIndexInfo &lClientIndexInfo = it->second; ParamDataIndexInfo &lParamDataIndexInfo = _dataWriter->getParamData()->getIndexInfo(); lClientIndexInfo._startIndex += lClientIndexInfo._nbDataToProcess; // If DataClient and ParamData have not the same information on _timeIntToProcessChanged attribute, // it signify that DataClient was not informed of time interval modification. if ( (lClientIndexInfo._startIndex >= lParamDataIndexInfo._nbDataToProcess) && (lClientIndexInfo._timeIntToProcessChanged == lParamDataIndexInfo._timeIntToProcessChanged) ) { lClientIndexInfo._nbDataToProcess = _dataWriter->write(); if (!isNAN(_dataWriter->getTimeRestriction())) { _dataWriter->applyTimeRestriction(lClientIndexInfo._startIndex, lClientIndexInfo._nbDataToProcess); } lClientIndexInfo._timeIntToProcessChanged = lParamDataIndexInfo._timeIntToProcessChanged; lClientIndexInfo._noMoreTimeInt = lParamDataIndexInfo._noMoreTimeInt; // If time interval changed push back end index. if(lParamDataIndexInfo._timeIntToProcessChanged || (!lParamDataIndexInfo._timeIntToProcessChanged && lParamDataIndexInfo._nbDataToProcess == 0)) { lClientIndexInfo._endTimeIntIndexList.push_back(lClientIndexInfo._startIndex + lClientIndexInfo._nbDataToProcess); } } else { // Give to the client number of data to process to give only TimeInterval per TimeInterval and not all. // This case mean that a DataClient hadn't time to read data for a TimeInterval before an other one is requested, or // for this TimeInterval data have been already received by another DataClient. // Check before if DataClient end time index list is synchronized if(lClientIndexInfo._endTimeIntIndexList.size() != lParamDataIndexInfo._endTimeIntIndexList.size()) { lClientIndexInfo._endTimeIntIndexList = lParamDataIndexInfo._endTimeIntIndexList; } std::list::iterator it = lClientIndexInfo._endTimeIntIndexList.begin(); // Set default number of data to process that correspond to read all stored in ParamData. lClientIndexInfo._nbDataToProcess = lParamDataIndexInfo._nbDataToProcess - lClientIndexInfo._startIndex; // Parse each end index that close a TimeInterval until a new one is found. bool notFound = true; do { // Get end of the next TimeInterval if ( (it != lClientIndexInfo._endTimeIntIndexList.end()) && ((*it) > lClientIndexInfo._startIndex) ) { lClientIndexInfo._nbDataToProcess = (*it) - lClientIndexInfo._startIndex; notFound = false; } else if (it != lClientIndexInfo._endTimeIntIndexList.end()) { ++it; } } while( (it != lClientIndexInfo._endTimeIntIndexList.end()) && notFound ); // Check if this step is enough to synchronize current state of parameter with DataClient. // This occurs when end of a time interval is reached or when ParamData is precisely on the end of a time interval // and ready to get data for the next time interval if( (lClientIndexInfo._startIndex + lClientIndexInfo._nbDataToProcess) == lParamDataIndexInfo._nbDataToProcess) { lClientIndexInfo._timeIntToProcessChanged = lParamDataIndexInfo._timeIntToProcessChanged; } // An other time interval is currently processed. // DataClient must read data for its current time interval and then change time interval. else if (it != lClientIndexInfo._endTimeIntIndexList.end()) { lClientIndexInfo._timeIntToProcessChanged = true; } else { lClientIndexInfo._timeIntToProcessChanged = false; } lClientIndexInfo._noMoreTimeInt = lParamDataIndexInfo._noMoreTimeInt; } unsigned lIndexMin = lClientIndexInfo._startIndex; for (DataClientInfoList::iterator itMin = _dataClient.begin(); itMin != _dataClient.end(); ++itMin) { ParamDataIndexInfo &lClientIndexInfo = itMin->second; unsigned int lCurrentIndexMin = lClientIndexInfo._startIndex; if (lIndexMin >lCurrentIndexMin) { lIndexMin = lCurrentIndexMin; } } lParamDataIndexInfo._startIndex = lIndexMin; _dataWriter->getParamData()->freeBefore(lIndexMin); } else { BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::errno_code(AMDA_ERROR_UNKNOWN) ); } return it->second; } } /* namespace Parameters */ } /* namespace AMDA */