VirtualInstrumentInterval.cc 8.25 KB
/*
 * VirtualInstrumentInterval.cc
 *
 *  Created on: Jan 17, 2013
 *      Author: f.casimir
 */

#include <climits>

#include "DicError.hh"
#include "TimeUtil.hh"

#include "DDServerInterfaceConfig.hh"
#include "VirtualInstrumentManager.hh"
#include "VirtualInstrument.hh"
#include "VirtualInstrumentInterval.hh"
#include "Packet.hh"
#include "TimeInterval.hh"

namespace AMDA {
	namespace DDServerInterface {

	using namespace VI;

		#define PARAM_TIME "Time"

		VirtualInstrumentInterval::VirtualInstrumentInterval(VirtualInstrument& pVI,
				TimeIntervalList* pTimeIntervalList) : Worker(),
				_vi(pVI),
				_timeIntervalList(*pTimeIntervalList),
				_currentTimeIntToProcess(_timeIntervalList.end()),
				_timeIntToProcessChanged(false),
				_paramList(nullptr) {
			// Set host name
			_ddClient.setUserHost(VirtualInstrumentManager::getInstance()->getUserHost());

			/// Open Connection
			_id = _ddClient.DD_SetVariable(const_cast<char *>(_vi.getViName().c_str()));
			LOG4CXX_INFO(gLogger,
					"ParamGetDDBase: DD_SetVariable("<< _vi.getViName() << ") returns = (" << _id << ")");

			if (_id < 0) {
				_ddClient.DD_Close(99);
				BOOST_THROW_EXCEPTION(exception() << errno_code(_id));
			}
			_step = &VirtualInstrumentInterval::initAndWriteData;

			// Initialize server to point to the first TimeInterval.
			if (!setTimeDD()) {
				LOG4CXX_ERROR(gLogger, "At least one time interval must be specified to send request");
				BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::ex_msg("At least one time interval must be specified"));
			}

			active();
		}

		unsigned int VirtualInstrumentInterval::initAndWriteData() {
			int lIndex = 0;
			_paramList = new char*[_paramFlowList.size() + 1];
			_paramList[lIndex] = new char[strlen(PARAM_TIME) + 1];
			strcpy(_paramList[lIndex], PARAM_TIME);
			lIndex++;
			for (auto lParam : _paramFlowList) {
				_paramList[lIndex] = new char[lParam.first.size() + 1];
				strcpy(_paramList[lIndex], lParam.first.c_str());
				_paramNameListForLog += lParam.first + ":";
				lIndex++;
			}
			_step = &VirtualInstrumentInterval::writeData;
			return (this->*_step)();
		}

		unsigned int VirtualInstrumentInterval::writeData() {
			int error = 0;
			unsigned int ret = 0;
			DD_data_t *data = nullptr;

			_timeIntToProcessChanged = false;

			error = _ddClient.DD_GetMultiData(_id, _paramFlowList.size()+1, _paramList, _strTimeInt, &data, 1);

			while(error == MOREDELAY) {
				sleep(2);
				error = _ddClient.DD_GetMultiData(_id, _paramFlowList.size()+1, _paramList, _strTimeInt, &data, 1);
			}

			LOG4CXX_INFO(gLogger, "ParamGetDDBase::getOneDDDataBloc DD_GetData( "<< _id << ", list name:  " << _paramNameListForLog << " , " << _strStartTime << "<=> " << TimeUtil::DD2ISO_TimeDate(_strStartTime) << ", " << _strTimeInt<< "<=> " << TimeUtil::DD2ISO_TimeInterval(_strTimeInt) << ") returns = (" << error << ")");
			// Received paramFlow is not empty
			if (error >= 0 || error == MOREDATA) {
				int lIndex = 0;
				Packet::DDDataSPtr lTime = Packet::DDDataSPtr(new  DD_data_t(data[0])); data[0].Dimensions=nullptr; data[0].Variables=nullptr;
				lIndex++;
				for (auto lParam : _paramFlowList) {
					Packet::DDDataSPtr lData = Packet::DDDataSPtr(new  DD_data_t(data[lIndex])); data[lIndex].Dimensions=nullptr; data[lIndex].Variables=nullptr;
					boost::shared_ptr<Packet> lPacket( new Packet(lTime,lData));
					assert(lPacket->data->VarNumber==lPacket->time->VarNumber);
					for (auto lWParamFlow : lParam.second) {
						ParamFlowSPtr lSParamFlow = lWParamFlow.lock(); // request a strong pointer
						if (lSParamFlow) {                 // is the object still alive?
							lSParamFlow->push(new Packet(*lPacket.get()));
						}
					}
					lIndex++;
				}
				ret = data->VarNumber;
			} else {

				LOG4CXX_INFO(gLogger,
						"ParamGetDDBase: DD_GetData( "<< _id <<", " << _paramFlowList.size() << ", " << _realTimeInt << ") returns = (" << error << ")");
				ddClose();
				BOOST_THROW_EXCEPTION(exception() << errno_code(error));
			}

			// There is no more data to wait from server.
			if (error != MOREDATA) {
				_step = &VirtualInstrumentInterval::writeEmptyData;
			} else {
				// Nothing to do.
			}
			return ret;
		}

		unsigned int VirtualInstrumentInterval::writeEmptyData() {
			LOG4CXX_INFO(gLogger, "VirtualInstrumentInterval::writeEmptyData");

			// If there is an other TimeInterval to process, change _step function to retrieve data from server.
			if( (_currentTimeIntToProcess != _timeIntervalList.end()) && setTimeDD()) {
				// Time interval set to the next.
				_timeIntToProcessChanged = true;
				_step = &VirtualInstrumentInterval::writeData;
			} else {
				// Nothing to do.
			}

			for (auto lParam : _paramFlowList) {
				for (auto lWParamFlow : lParam.second) {
					ParamFlowSPtr lSParamFlow = lWParamFlow.lock(); // request a strong pointer
					if (lSParamFlow) {                     // is the object still alive?
						lSParamFlow->push(nullptr);
					}
				}
			}

			return 0;
		}

		VirtualInstrumentInterval::~VirtualInstrumentInterval() {
			if(_paramList) {
				unsigned int lTabSize = _paramFlowList.size()+1;
				for ( unsigned int i = 0; i < lTabSize; ++i) {
					delete [] _paramList[i];
				}
				delete [] _paramList;
			}
			ddClose();
		}

		bool VirtualInstrumentInterval::setTimeDD() {
			if (_currentTimeIntToProcess == _timeIntervalList.end()) {
				_currentTimeIntToProcess = _timeIntervalList.begin();
			} else if (++_currentTimeIntToProcess != _timeIntervalList.end()) {
				LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => Get Next TimeInterval");
				// Nothing to do.
			} else {
				LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => Reached all TimeInterval");
				return false;
			}

			double realTime = 0;
			int error = 0;
			double lStartTime = (*_currentTimeIntToProcess)._startTime;
			double lTimeInt = (*_currentTimeIntToProcess)._stopTime - lStartTime;

			LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => start time: " << lStartTime << " time int: " << lTimeInt);

			Double2DD_Time(_strStartTime, lStartTime);
			Double2DD_Time(_strTimeInt, lTimeInt);

			error = _ddClient.DD_SetTimeInfo(_id, _strStartTime, &realTime);
			LOG4CXX_INFO(gLogger,
					"ParamGetDDBase::_ddClient.DD_SetTimeInfo( "<< _id <<", " << _strStartTime << "<=> " << TimeUtil::DD2ISO_TimeDate(_strStartTime) << ") returns = (" << error << ")");

			if (error < 0) {
				ddClose();
				BOOST_THROW_EXCEPTION(exception() << errno_code(error));
			}

			double deltaTime = realTime - lStartTime;
			if (deltaTime > 0) {
				if ((lTimeInt - deltaTime) < 0) {
					error = AMDA_ERROR_NODATAATTIME;
					double globalStartTime = _vi.getGlobalStartTime();
					if (globalStartTime > (lStartTime + lTimeInt))
						error = AMDA_ERROR_OUTOFTIME;
					ddClose();
					LOG4CXX_ERROR(gLogger,
							"ParamGetDDBase: DD_SetTime realTime " << realTime << "="<< TimeUtil::formatTimeDateInIso(realTime) << " > startTime + intervalTime " << TimeUtil::formatTimeDateInIso(lStartTime) << " + " << TimeUtil::formatTimeIntervalInIso(lTimeInt));
					BOOST_THROW_EXCEPTION(exception() << errno_code(error));

				}
				// Update time interval value
				// Do not update currentTimeIntToProcess because TimeIntervalList is shared between multiple classes.
				lTimeInt -= deltaTime;
				_realTimeInt = (*_currentTimeIntToProcess)._startTime + lTimeInt;
				Double2DD_Time(_strTimeInt,lTimeInt);
				LOG4CXX_INFO(gLogger,
						"ParamGetDDBase: DD_SetTime change _timeInt by " << _strTimeInt << "<=> " << TimeUtil::DD2ISO_TimeInterval(_strTimeInt));
			}

			return true;
		}


unsigned int VirtualInstrumentInterval::getOneDDDataBloc() {
	return (this->*_step)();
}

void VirtualInstrumentInterval::ddClose() {
	if(_id != -1) { _ddClient.DD_Close(_id); _id=-1; }
}

VI::ParamFlowSPtr VirtualInstrumentInterval::getParamFlow(const std::string& pParamName) {
	ParamFlowSPtr lParamFlow(new ParamFlow(*this));

	for(ParamFlowList::iterator it = _paramFlowList.begin(); it != _paramFlowList.end(); ++it) {
		LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => param name: " << (*it).first);
	}

	auto lIt = _paramFlowList.find(pParamName);
	if ( lIt == _paramFlowList.end()) {
		std::vector<ParamFlowWPtr> lWeakList;
		lWeakList.push_back(lParamFlow);
		_paramFlowList.insert(make_pair(pParamName,lWeakList));
	} else {
		lIt->second.push_back(lParamFlow);
	}
	return lParamFlow;
}

} /* namespace DDServerInterface */
} /* namespace AMDA */