Blame view

src/ParamGetImpl/DDServerInterface/VirtualInstrumentInterval.cc 9.9 KB
fbe3c2bb   Benjamin Renard   First commit
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/*
 * VirtualInstrumentInterval.cc
 *
 *  Created on: Jan 17, 2013
 *      Author: f.casimir
 */

#include <climits>

#include "DicError.hh"
#include "TimeUtil.hh"

#include "DDServerInterfaceConfig.hh"
#include "VirtualInstrumentManager.hh"
#include "VirtualInstrument.hh"
#include "VirtualInstrumentInterval.hh"
#include "Packet.hh"
#include "TimeInterval.hh"

namespace AMDA {
	namespace DDServerInterface {

	using namespace VI;

		#define PARAM_TIME "Time"

		VirtualInstrumentInterval::VirtualInstrumentInterval(VirtualInstrument& pVI,
				TimeIntervalList* pTimeIntervalList) : Worker(),
				_vi(pVI),
				_timeIntervalList(*pTimeIntervalList),
				_currentTimeIntToProcess(_timeIntervalList.end()),
				_timeIntToProcessChanged(false),
fa4b7852   Benjamin Renard   Fix bug around ti...
33
				_noMoreTimeInt(false),
a6490f4d   Benjamin Renard   Do not throw an e...
34
				_paramList(nullptr),
29bf0a3d   Elena.Budnik   setUserName() cal...
35
					_nodata(false) {
fbe3c2bb   Benjamin Renard   First commit
36
37
			// Set host name
			_ddClient.setUserHost(VirtualInstrumentManager::getInstance()->getUserHost());
29bf0a3d   Elena.Budnik   setUserName() cal...
38
39
40
			// Set user name			
			_ddClient.setUserName(VirtualInstrumentManager::getInstance()->getUserName());
			
fbe3c2bb   Benjamin Renard   First commit
41
42
43
44
45
46
47
48
49
			/// Open Connection
			_id = _ddClient.DD_SetVariable(const_cast<char *>(_vi.getViName().c_str()));
			LOG4CXX_INFO(gLogger,
					"ParamGetDDBase: DD_SetVariable("<< _vi.getViName() << ") returns = (" << _id << ")");

			if (_id < 0) {
				_ddClient.DD_Close(99);
				BOOST_THROW_EXCEPTION(exception() << errno_code(_id));
			}
a6490f4d   Benjamin Renard   Do not throw an e...
50

fbe3c2bb   Benjamin Renard   First commit
51
52
53
54
			_step = &VirtualInstrumentInterval::initAndWriteData;

			// Initialize server to point to the first TimeInterval.
			if (!setTimeDD()) {
fa4b7852   Benjamin Renard   Fix bug around ti...
55
				_noMoreTimeInt = true;
fbe3c2bb   Benjamin Renard   First commit
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
				LOG4CXX_ERROR(gLogger, "At least one time interval must be specified to send request");
				BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::ex_msg("At least one time interval must be specified"));
			}

			active();
		}

		unsigned int VirtualInstrumentInterval::initAndWriteData() {
			int lIndex = 0;
			_paramList = new char*[_paramFlowList.size() + 1];
			_paramList[lIndex] = new char[strlen(PARAM_TIME) + 1];
			strcpy(_paramList[lIndex], PARAM_TIME);
			lIndex++;
			for (auto lParam : _paramFlowList) {
				_paramList[lIndex] = new char[lParam.first.size() + 1];
				strcpy(_paramList[lIndex], lParam.first.c_str());
				_paramNameListForLog += lParam.first + ":";
				lIndex++;
			}
			_step = &VirtualInstrumentInterval::writeData;
			return (this->*_step)();
		}

		unsigned int VirtualInstrumentInterval::writeData() {
a6490f4d   Benjamin Renard   Do not throw an e...
80
81
82
			if (_nodata)
				return writeNoData();

fbe3c2bb   Benjamin Renard   First commit
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
			int error = 0;
			unsigned int ret = 0;
			DD_data_t *data = nullptr;

			_timeIntToProcessChanged = false;

			error = _ddClient.DD_GetMultiData(_id, _paramFlowList.size()+1, _paramList, _strTimeInt, &data, 1);

			while(error == MOREDELAY) {
				sleep(2);
				error = _ddClient.DD_GetMultiData(_id, _paramFlowList.size()+1, _paramList, _strTimeInt, &data, 1);
			}

			LOG4CXX_INFO(gLogger, "ParamGetDDBase::getOneDDDataBloc DD_GetData( "<< _id << ", list name:  " << _paramNameListForLog << " , " << _strStartTime << "<=> " << TimeUtil::DD2ISO_TimeDate(_strStartTime) << ", " << _strTimeInt<< "<=> " << TimeUtil::DD2ISO_TimeInterval(_strTimeInt) << ") returns = (" << error << ")");
			// Received paramFlow is not empty
			if (error >= 0 || error == MOREDATA) {
fa4b7852   Benjamin Renard   Fix bug around ti...
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
				if (data[0].Variables != nullptr) {
					int lIndex = 0;
					Packet::DDDataSPtr lTime = Packet::DDDataSPtr(new  DD_data_t(data[0])); data[0].Dimensions=nullptr; data[0].Variables=nullptr;
					lIndex++;
					for (auto lParam : _paramFlowList) {
						if (data[lIndex].Variables != NULL) {
							Packet::DDDataSPtr lData = Packet::DDDataSPtr(new  DD_data_t(data[lIndex])); data[lIndex].Dimensions=nullptr; data[lIndex].Variables=nullptr;
							boost::shared_ptr<Packet> lPacket( new Packet(lTime,lData));
							assert(lPacket->data->VarNumber==lPacket->time->VarNumber);
							for (auto lWParamFlow : lParam.second) {
								ParamFlowSPtr lSParamFlow = lWParamFlow.lock(); // request a strong pointer
								if (lSParamFlow) {                 // is the object still alive?
									lSParamFlow->push(new Packet(*lPacket.get()));
								}
							}
fbe3c2bb   Benjamin Renard   First commit
114
						}
fa4b7852   Benjamin Renard   Fix bug around ti...
115
						lIndex++;
fbe3c2bb   Benjamin Renard   First commit
116
					}
fa4b7852   Benjamin Renard   Fix bug around ti...
117
118
119
					ret = data->VarNumber;
				} else {
					return writeEmptyData();
fbe3c2bb   Benjamin Renard   First commit
120
				}
fbe3c2bb   Benjamin Renard   First commit
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
			} else {

				LOG4CXX_INFO(gLogger,
						"ParamGetDDBase: DD_GetData( "<< _id <<", " << _paramFlowList.size() << ", " << _realTimeInt << ") returns = (" << error << ")");
				ddClose();
				BOOST_THROW_EXCEPTION(exception() << errno_code(error));
			}

			// There is no more data to wait from server.
			if (error != MOREDATA) {
				_step = &VirtualInstrumentInterval::writeEmptyData;
			} else {
				// Nothing to do.
			}
			return ret;
		}

		unsigned int VirtualInstrumentInterval::writeEmptyData() {
a6490f4d   Benjamin Renard   Do not throw an e...
139
			LOG4CXX_INFO(gLogger, "VirtualInstrumentInterval::writeEmptyData - " << _paramNameListForLog);
fbe3c2bb   Benjamin Renard   First commit
140
141
142
143
144
145
146

			// If there is an other TimeInterval to process, change _step function to retrieve data from server.
			if( (_currentTimeIntToProcess != _timeIntervalList.end()) && setTimeDD()) {
				// Time interval set to the next.
				_timeIntToProcessChanged = true;
				_step = &VirtualInstrumentInterval::writeData;
			} else {
fa4b7852   Benjamin Renard   Fix bug around ti...
147
				_noMoreTimeInt = true;
fbe3c2bb   Benjamin Renard   First commit
148
149
150
151
152
153
154
155
156
157
158
159
160
161
			}

			for (auto lParam : _paramFlowList) {
				for (auto lWParamFlow : lParam.second) {
					ParamFlowSPtr lSParamFlow = lWParamFlow.lock(); // request a strong pointer
					if (lSParamFlow) {                     // is the object still alive?
						lSParamFlow->push(nullptr);
					}
				}
			}

			return 0;
		}

a6490f4d   Benjamin Renard   Do not throw an e...
162
163
164
165
166
167
168
169
170
171
		unsigned int VirtualInstrumentInterval::writeNoData() {
			LOG4CXX_INFO(gLogger, "VirtualInstrumentInterval::writeNoData - " << _paramNameListForLog);

			double crtStartTime = _currentTimeIntToProcess->_startTime;
			double crtStopTime = _currentTimeIntToProcess->_stopTime;

			// If there is an other TimeInterval to process, change _step function to retrieve data from server.
			if( (_currentTimeIntToProcess != _timeIntervalList.end()) && setTimeDD()) {
				// Time interval set to the next.
				_timeIntToProcessChanged = true;
a6490f4d   Benjamin Renard   Do not throw an e...
172
			} else {
fa4b7852   Benjamin Renard   Fix bug around ti...
173
				_noMoreTimeInt = true;
d050fc57   Benjamin Renard   Fix a bug for a r...
174
175
176
177
				if (!_nodata)
					_step = &VirtualInstrumentInterval::writeData;
				else
					_step = &VirtualInstrumentInterval::writeEmptyData;
a6490f4d   Benjamin Renard   Do not throw an e...
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
			}

			for (auto lParam : _paramFlowList) {
				for (auto lWParamFlow : lParam.second) {
					ParamFlowSPtr lSParamFlow = lWParamFlow.lock(); // request a strong pointer
						if (lSParamFlow) {                 // is the object still alive?
							//add empty data packet
							Packet* emptyPacket = new Packet();
							emptyPacket->startTime = crtStartTime;
							emptyPacket->stopTime = crtStopTime;
							lSParamFlow->push(emptyPacket);
						}
					}
			}

a6490f4d   Benjamin Renard   Do not throw an e...
193
194
195
			return 0;
		}

fbe3c2bb   Benjamin Renard   First commit
196
197
198
199
200
201
202
203
204
205
206
207
		VirtualInstrumentInterval::~VirtualInstrumentInterval() {
			if(_paramList) {
				unsigned int lTabSize = _paramFlowList.size()+1;
				for ( unsigned int i = 0; i < lTabSize; ++i) {
					delete [] _paramList[i];
				}
				delete [] _paramList;
			}
			ddClose();
		}

		bool VirtualInstrumentInterval::setTimeDD() {
a6490f4d   Benjamin Renard   Do not throw an e...
208
209
			_nodata = false;

fbe3c2bb   Benjamin Renard   First commit
210
211
212
			if (_currentTimeIntToProcess == _timeIntervalList.end()) {
				_currentTimeIntToProcess = _timeIntervalList.begin();
			} else if (++_currentTimeIntToProcess != _timeIntervalList.end()) {
a6490f4d   Benjamin Renard   Do not throw an e...
213
				LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => Get Next TimeInterval - " << _paramNameListForLog);
fbe3c2bb   Benjamin Renard   First commit
214
215
				// Nothing to do.
			} else {
a6490f4d   Benjamin Renard   Do not throw an e...
216
				LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => Reached all TimeInterval - " << _paramNameListForLog);
d050fc57   Benjamin Renard   Fix a bug for a r...
217
				_nodata = true;
fbe3c2bb   Benjamin Renard   First commit
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
				return false;
			}

			double realTime = 0;
			int error = 0;
			double lStartTime = (*_currentTimeIntToProcess)._startTime;
			double lTimeInt = (*_currentTimeIntToProcess)._stopTime - lStartTime;

			LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => start time: " << lStartTime << " time int: " << lTimeInt);

			Double2DD_Time(_strStartTime, lStartTime);
			Double2DD_Time(_strTimeInt, lTimeInt);

			error = _ddClient.DD_SetTimeInfo(_id, _strStartTime, &realTime);
			LOG4CXX_INFO(gLogger,
a6490f4d   Benjamin Renard   Do not throw an e...
233
					"VirtualInstrumentInterval::_ddClient.DD_SetTimeInfo( "<< _id <<", " << _strStartTime << "<=> " << TimeUtil::DD2ISO_TimeDate(_strStartTime) << ") returns = (" << error << ")");
fbe3c2bb   Benjamin Renard   First commit
234
235

			if (error < 0) {
a6490f4d   Benjamin Renard   Do not throw an e...
236
237
238
239
240
241
				if (error == AMDA_ERROR_OUTOFTIME) {
					_realTimeInt = lStartTime;
					Double2DD_Time(_strTimeInt,lStartTime);
					_nodata = true;
					return true;
				}
fbe3c2bb   Benjamin Renard   First commit
242
243
244
245
246
247
248
249
250
				ddClose();
				BOOST_THROW_EXCEPTION(exception() << errno_code(error));
			}

			double deltaTime = realTime - lStartTime;
			if (deltaTime > 0) {
				if ((lTimeInt - deltaTime) < 0) {
					error = AMDA_ERROR_NODATAATTIME;
					double globalStartTime = _vi.getGlobalStartTime();
be09879f   Benjamin Renard   Fix a bug in DDSe...
251
					if (globalStartTime > (lStartTime + lTimeInt)) {
fbe3c2bb   Benjamin Renard   First commit
252
						error = AMDA_ERROR_OUTOFTIME;
be09879f   Benjamin Renard   Fix a bug in DDSe...
253
					}
a6490f4d   Benjamin Renard   Do not throw an e...
254
255
256
257
					_realTimeInt = lStartTime;
					Double2DD_Time(_strTimeInt,lStartTime);
					_nodata = true;
					return true;
fbe3c2bb   Benjamin Renard   First commit
258
259
260
261
262
263
264
				}
				// Update time interval value
				// Do not update currentTimeIntToProcess because TimeIntervalList is shared between multiple classes.
				lTimeInt -= deltaTime;
				_realTimeInt = (*_currentTimeIntToProcess)._startTime + lTimeInt;
				Double2DD_Time(_strTimeInt,lTimeInt);
				LOG4CXX_INFO(gLogger,
a6490f4d   Benjamin Renard   Do not throw an e...
265
						"VirtualInstrumentInterval: DD_SetTime - change _timeInt by " << _strTimeInt << "<=> " << TimeUtil::DD2ISO_TimeInterval(_strTimeInt));
fbe3c2bb   Benjamin Renard   First commit
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
			}

			return true;
		}


unsigned int VirtualInstrumentInterval::getOneDDDataBloc() {
	return (this->*_step)();
}

void VirtualInstrumentInterval::ddClose() {
	if(_id != -1) { _ddClient.DD_Close(_id); _id=-1; }
}

VI::ParamFlowSPtr VirtualInstrumentInterval::getParamFlow(const std::string& pParamName) {
	ParamFlowSPtr lParamFlow(new ParamFlow(*this));

	for(ParamFlowList::iterator it = _paramFlowList.begin(); it != _paramFlowList.end(); ++it) {
a6490f4d   Benjamin Renard   Do not throw an e...
284
		LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval::getParamFlow => param name: " << (*it).first);
fbe3c2bb   Benjamin Renard   First commit
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
	}

	auto lIt = _paramFlowList.find(pParamName);
	if ( lIt == _paramFlowList.end()) {
		std::vector<ParamFlowWPtr> lWeakList;
		lWeakList.push_back(lParamFlow);
		_paramFlowList.insert(make_pair(pParamName,lWeakList));
	} else {
		lIt->second.push_back(lParamFlow);
	}
	return lParamFlow;
}

} /* namespace DDServerInterface */
} /* namespace AMDA */