VirtualInstrumentInterval.cc 10.2 KB
/*
 * VirtualInstrumentInterval.cc
 *
 *  Created on: Jan 17, 2013
 *      Author: f.casimir
 */

#include <climits>

#include "DicError.hh"
#include "TimeUtil.hh"

#include "DDServerInterfaceConfig.hh"
#include "VirtualInstrumentManager.hh"
#include "VirtualInstrument.hh"
#include "VirtualInstrumentInterval.hh"
#include "Packet.hh"
#include "TimeInterval.hh"

namespace AMDA {
	namespace DDServerInterface {

	using namespace VI;

		#define PARAM_TIME "Time"

		/**
		 * Builds the interval worker for one virtual instrument.
		 *
		 * Configures the DD client with the host and user name supplied by
		 * VirtualInstrumentManager, opens the instrument with DD_SetVariable,
		 * installs initAndWriteData() as the first step, positions on the first
		 * time interval through setTimeDD() and finally calls active().
		 *
		 * @param pVI               virtual instrument; its name is given to DD_SetVariable.
		 * @param pTimeIntervalList intervals to process. It is dereferenced
		 *                          unconditionally, so it must not be null.
		 * @throws exception (tagged with errno_code) when DD_SetVariable returns a
		 *         negative id, or when setTimeDD() itself fails.
		 * @throws AMDA::AMDA_exception when setTimeDD() returns false, i.e. no
		 *         interval could be selected.
		 */
		VirtualInstrumentInterval::VirtualInstrumentInterval(VirtualInstrument& pVI,
				TimeIntervalList* pTimeIntervalList) : Worker(),
				_vi(pVI),
				_timeIntervalList(*pTimeIntervalList),
				_currentTimeIntToProcess(_timeIntervalList.end()),
				_timeIntToProcessChanged(false),
				_noMoreTimeInt(false),
				_paramList(nullptr),
				_nodata(false) {
			// Set host name
			_ddClient.setUserHost(VirtualInstrumentManager::getInstance()->getUserHost());
			// Set user name
			_ddClient.setUserName(VirtualInstrumentManager::getInstance()->getUserName());

			/// Open Connection
			_id = _ddClient.DD_SetVariable(const_cast<char *>(_vi.getViName().c_str()));
			LOG4CXX_INFO(gLogger,
					"ParamGetDDBase: DD_SetVariable("<< _vi.getViName() << ") returns = (" << _id << ")");

			if (_id < 0) {
				// NOTE(review): closes the literal id 99 rather than _id; kept as found - confirm intent.
				_ddClient.DD_Close(99);
				BOOST_THROW_EXCEPTION(exception() << errno_code(VirtualInstrumentManager::getInstance()->ddErrorToAmdaError(_id)));
			}

			_step = &VirtualInstrumentInterval::initAndWriteData;

			// Initialize server to point to the first TimeInterval.
			if (!setTimeDD()) {
				_noMoreTimeInt = true;
				LOG4CXX_ERROR(gLogger, "At least one time interval must be specified to send request");
				// The destructor does not run when a constructor throws, so the id
				// opened above has to be released here or it would stay open.
				ddClose();
				BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::ex_msg("At least one time interval must be specified"));
			}

			active();
		}

		/**
		 * First step of the step machine: builds the C-string table of parameter
		 * names handed to DD_GetMultiData, then switches to writeData() and runs it.
		 *
		 * Slot 0 holds PARAM_TIME; the following slots hold the keys of
		 * _paramFlowList in iteration order. The same keys are appended, separated
		 * by ':', to _paramNameListForLog.
		 *
		 * @return whatever the newly installed step (writeData) returns.
		 */
		unsigned int VirtualInstrumentInterval::initAndWriteData() {
			// Value-initialise the table: every slot is nullptr until it is filled,
			// so the destructor's delete[] on each slot stays well defined even if
			// one of the allocations below throws half-way through.
			_paramList = new char*[_paramFlowList.size() + 1]();

			int lIndex = 0;
			_paramList[lIndex] = new char[strlen(PARAM_TIME) + 1];
			strcpy(_paramList[lIndex], PARAM_TIME);
			++lIndex;

			// Iterate by const reference: a by-value loop variable would copy the
			// key and the whole weak-pointer vector of each entry.
			for (const auto& lParam : _paramFlowList) {
				_paramList[lIndex] = new char[lParam.first.size() + 1];
				strcpy(_paramList[lIndex], lParam.first.c_str());
				_paramNameListForLog += lParam.first + ":";
				++lIndex;
			}

			_step = &VirtualInstrumentInterval::writeData;
			return (this->*_step)();
		}

		unsigned int VirtualInstrumentInterval::writeData() {
			if (_nodata)
				return writeNoData();

			int error = 0;
			unsigned int ret = 0;
			DD_data_t *data = nullptr;

			_timeIntToProcessChanged = false;

			error = _ddClient.DD_GetMultiData(_id, _paramFlowList.size()+1, _paramList, _strTimeInt, &data, 1);

			while(error == MOREDELAY) {
				if (data[0].Variables == NULL) {
					sleep(2);
					error = _ddClient.DD_GetMultiData(_id, _paramFlowList.size()+1, _paramList, _strTimeInt, &data, 1);
				}
				else {
					break;
				}
			}

			LOG4CXX_INFO(gLogger, "ParamGetDDBase::getOneDDDataBloc DD_GetData( "<< _id << ", list name:  " << _paramNameListForLog << " , " << _strStartTime << "<=> " << TimeUtil::DD2ISO_TimeDate(_strStartTime) << ", " << _strTimeInt<< "<=> " << TimeUtil::DD2ISO_TimeInterval(_strTimeInt) << ") returns = (" << error << ")");
			// Received paramFlow is not empty
			if (error >= 0 || error == MOREDATA || error == MOREDELAY) {
				if (data[0].Variables != nullptr) {
					int lIndex = 0;
					Packet::DDDataSPtr lTime = Packet::DDDataSPtr(new  DD_data_t(data[0])); data[0].Dimensions=nullptr; data[0].Variables=nullptr;
					lIndex++;
					for (auto lParam : _paramFlowList) {
						if (data[lIndex].Variables != NULL) {
							Packet::DDDataSPtr lData = Packet::DDDataSPtr(new  DD_data_t(data[lIndex])); data[lIndex].Dimensions=nullptr; data[lIndex].Variables=nullptr;
							boost::shared_ptr<Packet> lPacket( new Packet(lTime,lData));
							assert(lPacket->data->VarNumber==lPacket->time->VarNumber);
							for (auto lWParamFlow : lParam.second) {
								ParamFlowSPtr lSParamFlow = lWParamFlow.lock(); // request a strong pointer
								if (lSParamFlow) {                 // is the object still alive?
									lSParamFlow->push(new Packet(*lPacket.get()));
								}
							}
						}
						lIndex++;
					}
					ret = data->VarNumber;
				} else {
					return writeEmptyData();
				}
			} else {
				LOG4CXX_INFO(gLogger,
						"ParamGetDDBase: DD_GetData( "<< _id <<", " << _paramFlowList.size() << ", " << _realTimeInt << ") returns = (" << error << ")");
				ddClose();
				BOOST_THROW_EXCEPTION(exception() << errno_code(VirtualInstrumentManager::getInstance()->ddErrorToAmdaError(error)));
			}

			// There is no more data to wait from server.
			if ((error != MOREDATA) && (error != MOREDELAY)) {
				_step = &VirtualInstrumentInterval::writeEmptyData;
			} else {
				// Nothing to do.
			}
			return ret;
		}

		/**
		 * Step run when the current interval has no more data: moves to the next
		 * interval if there is one and pushes a null packet to every live ParamFlow.
		 *
		 * If setTimeDD() selects another interval, _timeIntToProcessChanged is set
		 * and the step goes back to writeData(); otherwise _noMoreTimeInt is set.
		 *
		 * @return always 0.
		 */
		unsigned int VirtualInstrumentInterval::writeEmptyData() {
			LOG4CXX_INFO(gLogger, "VirtualInstrumentInterval::writeEmptyData - " << _paramNameListForLog);

			// If there is an other TimeInterval to process, change _step function to retrieve data from server.
			// The end() test comes first: calling setTimeDD() with the iterator at
			// end() would restart from the first interval.
			if ((_currentTimeIntToProcess != _timeIntervalList.end()) && setTimeDD()) {
				// Time interval set to the next.
				_timeIntToProcessChanged = true;
				_step = &VirtualInstrumentInterval::writeData;
			} else {
				_noMoreTimeInt = true;
			}

			// Iterate by const reference so that neither the map entries nor the
			// weak pointers are copied on every call.
			for (const auto& lParam : _paramFlowList) {
				for (const auto& lWParamFlow : lParam.second) {
					ParamFlowSPtr lSParamFlow = lWParamFlow.lock(); // request a strong pointer
					if (lSParamFlow) {                              // is the object still alive?
						lSParamFlow->push(nullptr);
					}
				}
			}

			return 0;
		}

		unsigned int VirtualInstrumentInterval::writeNoData() {
			LOG4CXX_INFO(gLogger, "VirtualInstrumentInterval::writeNoData - " << _paramNameListForLog);

			double crtStartTime = _currentTimeIntToProcess->_startTime;
			double crtStopTime = _currentTimeIntToProcess->_stopTime;

			// If there is an other TimeInterval to process, change _step function to retrieve data from server.
			if( (_currentTimeIntToProcess != _timeIntervalList.end()) && setTimeDD()) {
				// Time interval set to the next.
				_timeIntToProcessChanged = true;
			} else {
				_noMoreTimeInt = true;
				if (!_nodata)
					_step = &VirtualInstrumentInterval::writeData;
				else
					_step = &VirtualInstrumentInterval::writeEmptyData;
			}

			for (auto lParam : _paramFlowList) {
				for (auto lWParamFlow : lParam.second) {
					ParamFlowSPtr lSParamFlow = lWParamFlow.lock(); // request a strong pointer
						if (lSParamFlow) {                 // is the object still alive?
							//add empty data packet
							Packet* emptyPacket = new Packet();
							emptyPacket->startTime = crtStartTime;
							emptyPacket->stopTime = crtStopTime;
							lSParamFlow->push(emptyPacket);
						}
					}
			}

			return 0;
		}

		/**
		 * Frees the parameter name table, if it was built, and closes the DD
		 * connection through ddClose().
		 *
		 * NOTE(review): the number of slots released is derived from the current
		 * size of _paramFlowList, not from the size used at allocation time -
		 * confirm that no parameter can be registered once the table exists.
		 */
		VirtualInstrumentInterval::~VirtualInstrumentInterval() {
			if (_paramList != nullptr) {
				// One slot for the time parameter plus one per registered parameter.
				const unsigned int lSlotCount = _paramFlowList.size() + 1;
				unsigned int lSlot = 0;
				while (lSlot < lSlotCount) {
					delete [] _paramList[lSlot];
					++lSlot;
				}
				delete [] _paramList;
			}
			ddClose();
		}

		/**
		 * Selects the next time interval and positions the DD server on its start.
		 *
		 * With the iterator at end() the first interval is selected; otherwise the
		 * iterator is advanced. The start time and duration are converted with
		 * Double2DD_Time and the start is sent through DD_SetTimeInfo. _nodata is
		 * reset on entry and set again when the interval cannot deliver data.
		 *
		 * @return false when no interval could be selected (empty list, or the
		 *         last interval was already processed); true otherwise, including
		 *         the cases where the interval is flagged through _nodata.
		 * @throws exception (tagged with errno_code) when DD_SetTimeInfo fails with
		 *         an error other than OUTOFTIME; the connection is closed first.
		 */
		bool VirtualInstrumentInterval::setTimeDD() {
			_nodata = false;

			if (_currentTimeIntToProcess == _timeIntervalList.end()) {
				_currentTimeIntToProcess = _timeIntervalList.begin();
				// Empty list: begin() == end(), so there is nothing to dereference
				// below. Report it instead of reading through an end iterator.
				if (_currentTimeIntToProcess == _timeIntervalList.end()) {
					LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => No TimeInterval available - " << _paramNameListForLog);
					_nodata = true;
					return false;
				}
			} else if (++_currentTimeIntToProcess != _timeIntervalList.end()) {
				LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => Get Next TimeInterval - " << _paramNameListForLog);
				// Nothing to do.
			} else {
				LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => Reached all TimeInterval - " << _paramNameListForLog);
				_nodata = true;
				return false;
			}

			double realTime = 0;
			int error = 0;
			double lStartTime = (*_currentTimeIntToProcess)._startTime;
			double lTimeInt = (*_currentTimeIntToProcess)._stopTime - lStartTime;

			LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => start time: " << lStartTime << " time int: " << lTimeInt);

			Double2DD_Time(_strStartTime, lStartTime);
			Double2DD_Time(_strTimeInt, lTimeInt);

			error = _ddClient.DD_SetTimeInfo(_id, _strStartTime, &realTime);
			LOG4CXX_INFO(gLogger,
					"VirtualInstrumentInterval::_ddClient.DD_SetTimeInfo( "<< _id <<", " << _strStartTime << "<=> " << TimeUtil::DD2ISO_TimeDate(_strStartTime) << ") returns = (" << error << ")");

			if (error < 0) {
				if (error == OUTOFTIME) {
					// Not fatal: the interval is kept but flagged as having no data.
					_realTimeInt = lStartTime;
					Double2DD_Time(_strTimeInt,lStartTime);
					_nodata = true;
					return true;
				}
				ddClose();
				BOOST_THROW_EXCEPTION(exception() << errno_code(VirtualInstrumentManager::getInstance()->ddErrorToAmdaError(error)));
			}

			// realTime is the value written back by DD_SetTimeInfo; a positive delta
			// means it lies after the requested start.
			double deltaTime = realTime - lStartTime;
			if (deltaTime > 0) {
				if ((lTimeInt - deltaTime) < 0) {
					// The value written back falls beyond the end of the interval: no data.
					//error = AMDA_ERROR_NODATAATTIME;
					// NOTE(review): the comparison below has no effect today (its
					// only statement is commented out); kept as found.
					double globalStartTime = _vi.getGlobalStartTime();
					if (globalStartTime > (lStartTime + lTimeInt)) {
						//error = AMDA_ERROR_OUTOFTIME;
					}
					_realTimeInt = lStartTime;
					Double2DD_Time(_strTimeInt,lStartTime);
					_nodata = true;
					return true;
				}
				// Update time interval value
				// Do not update currentTimeIntToProcess because TimeIntervalList is shared between multiple classes.
				lTimeInt -= deltaTime;
				_realTimeInt = (*_currentTimeIntToProcess)._startTime + lTimeInt;
				Double2DD_Time(_strTimeInt,lTimeInt);
				LOG4CXX_INFO(gLogger,
						"VirtualInstrumentInterval: DD_SetTime - change _timeInt by " << _strTimeInt << "<=> " << TimeUtil::DD2ISO_TimeInterval(_strTimeInt));
			}

			return true;
		}


/**
 * Runs the step currently installed in _step and forwards its result.
 *
 * @return the value returned by the invoked step function.
 */
unsigned int VirtualInstrumentInterval::getOneDDDataBloc() {
	// Read the member-function pointer first, then call it on this object.
	auto lCurrentStep = _step;
	return (this->*lCurrentStep)();
}

/**
 * Closes the DD connection identified by _id, unless _id is already -1, and
 * then resets _id to -1 so that a later call does nothing.
 */
void VirtualInstrumentInterval::ddClose() {
	if (_id == -1) {
		return;
	}
	_ddClient.DD_Close(_id);
	_id = -1;
}

/**
 * Creates a new ParamFlow attached to this interval and registers it, as a
 * weak pointer, under the given parameter name.
 *
 * Every parameter name already registered is written to the debug log first.
 *
 * @param pParamName name of the parameter the flow is registered for.
 * @return the strong pointer to the newly created flow.
 */
VI::ParamFlowSPtr VirtualInstrumentInterval::getParamFlow(const std::string& pParamName) {
	ParamFlowSPtr lNewFlow(new ParamFlow(*this));

	for (const auto& lEntry : _paramFlowList) {
		LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval::getParamFlow => param name: " << lEntry.first);
	}

	auto lFound = _paramFlowList.find(pParamName);
	if (lFound != _paramFlowList.end()) {
		// Parameter already known: add one more observer to its list.
		lFound->second.push_back(lNewFlow);
		return lNewFlow;
	}

	// First flow for this parameter: create its observer list.
	std::vector<ParamFlowWPtr> lObservers;
	lObservers.push_back(lNewFlow);
	_paramFlowList.insert(make_pair(pParamName, lObservers));
	return lNewFlow;
}

} /* namespace DDServerInterface */
} /* namespace AMDA */