ParamInterval.cc
4.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/*
* ParamInterval.cc
*
* Created on: Dec 11, 2012
* Author: f.casimir
*/
#include "ParamInterval.hh"
#include "DataClient.hh"
#include "ParamData.hh"
namespace AMDA {
namespace Parameters {
// Definition of the static logger member of ParamInterval, obtained from log4cxx
// under the logger name "AMDA-Kernel.ParamInterval".
log4cxx::LoggerPtr ParamInterval::_logger(log4cxx::Logger::getLogger("AMDA-Kernel.ParamInterval"));
/**
 * @brief Build a ParamInterval from a time interval list and a data writer.
 *
 * @param pTimeIntervalList time interval list, stored in _timeIntervalList.
 * @param dataWriter        writer stored in _dataWriter. The destructor deletes it,
 *                          so the ParamInterval takes ownership of this pointer.
 */
ParamInterval::ParamInterval(TimeIntervalListSPtr pTimeIntervalList, DataWriter* dataWriter)
    : _timeIntervalList(pTimeIntervalList)
    , _dataWriter(dataWriter)
{
}
/**
 * @brief Destroy the ParamInterval and the DataWriter it was constructed with.
 *
 * NOTE(review): _dataWriter is an owning raw pointer, so copying a ParamInterval
 * would lead to a double delete unless copy operations are disabled in the
 * header -- to be confirmed there.
 */
ParamInterval::~ParamInterval()
{
    delete _dataWriter;
}
/**
 * @brief Advance the read window of a registered DataClient and return its index information.
 *
 * The client's start index is first moved past the data handed out by the previous call.
 * Then one of two things happens:
 *  - the client has read everything currently stored in the ParamData and carries the same
 *    _timeIntToProcessChanged flag as the ParamData: new data are requested from the DataWriter;
 *  - otherwise: the client is given data already stored in the ParamData, limited to the end
 *    of the next time interval so that it receives one time interval at a time.
 * Finally the smallest start index over all registered clients is written to the ParamData
 * index information and passed to ParamData::freeBefore().
 *
 * @param dataClient client used as key in _dataClient.
 * @return a copy of the index information stored for this client.
 * @throw AMDA::AMDA_exception carrying AMDA_ERROR_UNKNOWN when dataClient is not in _dataClient.
 */
ParamDataIndexInfo ParamInterval::get(DataClient *dataClient) {
    DataClientInfoList::iterator clientPos = _dataClient.find(dataClient);
    if (clientPos == _dataClient.end()) {
        // Unknown client: nothing can be returned for it.
        BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::errno_code(AMDA_ERROR_UNKNOWN) );
    }

    ParamDataIndexInfo &clientInfo = clientPos->second;
    ParamDataIndexInfo &paramInfo = _dataWriter->getParamData()->getIndexInfo();

    // Step over the data returned by the previous call.
    clientInfo._startIndex += clientInfo._nbDataToProcess;

    // A difference between the two _timeIntToProcessChanged flags means that the client
    // has not yet been informed of a time interval modification.
    const bool allStoredDataRead = clientInfo._startIndex >= paramInfo._nbDataToProcess;
    const bool sameIntervalState = clientInfo._timeIntToProcessChanged == paramInfo._timeIntToProcessChanged;

    if (allStoredDataRead && sameIntervalState) {
        // The client is up to date: produce new data.
        clientInfo._nbDataToProcess = _dataWriter->write();
        if (!isNAN(_dataWriter->getTimeRestriction())) {
            // The member fields are passed directly (no copies) in case they are taken by reference.
            _dataWriter->applyTimeRestriction(clientInfo._startIndex, clientInfo._nbDataToProcess);
        }

        // paramInfo is read after write(), so the client mirrors the state left by the writer.
        clientInfo._timeIntToProcessChanged = paramInfo._timeIntToProcessChanged;
        clientInfo._noMoreTimeInt = paramInfo._noMoreTimeInt;

        // Record the end index of the time interval when it changed, or when the ParamData holds no data.
        if (paramInfo._timeIntToProcessChanged || paramInfo._nbDataToProcess == 0) {
            clientInfo._endTimeIntIndexList.push_back(clientInfo._startIndex + clientInfo._nbDataToProcess);
        }
    } else {
        // The client is late (it had no time to read a time interval before another one was
        // requested), or the data of this time interval were already received by another client.

        // Resynchronize the client's list of time interval end indexes when the sizes differ.
        if (clientInfo._endTimeIntIndexList.size() != paramInfo._endTimeIntIndexList.size()) {
            clientInfo._endTimeIntIndexList = paramInfo._endTimeIntIndexList;
        }

        // By default, give everything that is stored in the ParamData...
        clientInfo._nbDataToProcess = paramInfo._nbDataToProcess - clientInfo._startIndex;

        // ...but stop at the first time interval end located after the client's start index.
        std::list<unsigned int>::iterator endIdx = clientInfo._endTimeIntIndexList.begin();
        while ( (endIdx != clientInfo._endTimeIntIndexList.end()) &&
                !((*endIdx) > clientInfo._startIndex) ) {
            ++endIdx;
        }
        const bool intervalEndAhead = (endIdx != clientInfo._endTimeIntIndexList.end());
        if (intervalEndAhead) {
            clientInfo._nbDataToProcess = (*endIdx) - clientInfo._startIndex;
        }

        if ( (clientInfo._startIndex + clientInfo._nbDataToProcess) == paramInfo._nbDataToProcess ) {
            // This step brings the client level with the ParamData: take over its flag.
            clientInfo._timeIntToProcessChanged = paramInfo._timeIntToProcessChanged;
        } else {
            // Otherwise the flag tells whether a time interval end was found ahead of the client,
            // i.e. whether it must change time interval after reading this batch.
            clientInfo._timeIntToProcessChanged = intervalEndAhead;
        }
        clientInfo._noMoreTimeInt = paramInfo._noMoreTimeInt;
    }

    // Smallest start index among all registered clients (this client included).
    unsigned int oldestIndex = clientInfo._startIndex;
    for (DataClientInfoList::iterator pos = _dataClient.begin(); pos != _dataClient.end(); ++pos) {
        const unsigned int candidate = pos->second._startIndex;
        if (candidate < oldestIndex) {
            oldestIndex = candidate;
        }
    }

    // Presumably lets the ParamData release the data located before this index -- see ParamData::freeBefore.
    paramInfo._startIndex = oldestIndex;
    _dataWriter->getParamData()->freeBefore(oldestIndex);

    return clientInfo;
}
} /* namespace Parameters */
} /* namespace AMDA */