/* * VirtualInstrumentInterval.cc * * Created on: Jan 17, 2013 * Author: f.casimir */ #include #include "DicError.hh" #include "TimeUtil.hh" #include "DDServerInterfaceConfig.hh" #include "VirtualInstrumentManager.hh" #include "VirtualInstrument.hh" #include "VirtualInstrumentInterval.hh" #include "Packet.hh" #include "TimeInterval.hh" namespace AMDA { namespace DDServerInterface { using namespace VI; #define PARAM_TIME "Time" VirtualInstrumentInterval::VirtualInstrumentInterval(VirtualInstrument& pVI, TimeIntervalList* pTimeIntervalList) : Worker(), _vi(pVI), _timeIntervalList(*pTimeIntervalList), _currentTimeIntToProcess(_timeIntervalList.end()), _timeIntToProcessChanged(false), _noMoreTimeInt(false), _paramList(nullptr), _nodata(false) { // Set host name _ddClient.setUserHost(VirtualInstrumentManager::getInstance()->getUserHost()); // Set user name _ddClient.setUserName(VirtualInstrumentManager::getInstance()->getUserName()); /// Open Connection _id = _ddClient.DD_SetVariable(const_cast(_vi.getViName().c_str())); LOG4CXX_INFO(gLogger, "ParamGetDDBase: DD_SetVariable("<< _vi.getViName() << ") returns = (" << _id << ")"); if (_id < 0) { _ddClient.DD_Close(99); BOOST_THROW_EXCEPTION(exception() << errno_code(VirtualInstrumentManager::getInstance()->ddErrorToAmdaError(_id))); } _step = &VirtualInstrumentInterval::initAndWriteData; // Initialize server to point to the first TimeInterval. if (!setTimeDD()) { _noMoreTimeInt = true; LOG4CXX_ERROR(gLogger, "At least one time interval must be specified to send request"); BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::ex_msg("At least one time interval must be specified")); } active(); } unsigned int VirtualInstrumentInterval::initAndWriteData() { int lIndex = 0; _paramList = new char*[_paramFlowList.size() + 1]; _paramList[lIndex] = new char[strlen(PARAM_TIME) + 1]; strcpy(_paramList[lIndex], PARAM_TIME); lIndex++; for (auto lParam : _paramFlowList) { _paramList[lIndex] = new char[lParam.first.size() + 1]; strcpy(_paramList[lIndex], lParam.first.c_str()); _paramNameListForLog += lParam.first + ":"; lIndex++; } _step = &VirtualInstrumentInterval::writeData; return (this->*_step)(); } unsigned int VirtualInstrumentInterval::writeData() { if (_nodata) return writeNoData(); int error = 0; unsigned int ret = 0; DD_data_t *data = nullptr; _timeIntToProcessChanged = false; error = _ddClient.DD_GetMultiData(_id, _paramFlowList.size()+1, _paramList, _strTimeInt, &data, 1); while(error == MOREDELAY) { if (data[0].Variables == NULL) { sleep(2); error = _ddClient.DD_GetMultiData(_id, _paramFlowList.size()+1, _paramList, _strTimeInt, &data, 1); } else { break; } } LOG4CXX_INFO(gLogger, "ParamGetDDBase::getOneDDDataBloc DD_GetData( "<< _id << ", list name: " << _paramNameListForLog << " , " << _strStartTime << "<=> " << TimeUtil::DD2ISO_TimeDate(_strStartTime) << ", " << _strTimeInt<< "<=> " << TimeUtil::DD2ISO_TimeInterval(_strTimeInt) << ") returns = (" << error << ")"); // Received paramFlow is not empty if (error >= 0 || error == MOREDATA || error == MOREDELAY) { if (data[0].Variables != nullptr) { int lIndex = 0; Packet::DDDataSPtr lTime = Packet::DDDataSPtr(new DD_data_t(data[0])); data[0].Dimensions=nullptr; data[0].Variables=nullptr; lIndex++; for (auto lParam : _paramFlowList) { if (data[lIndex].Variables != NULL) { Packet::DDDataSPtr lData = Packet::DDDataSPtr(new DD_data_t(data[lIndex])); data[lIndex].Dimensions=nullptr; data[lIndex].Variables=nullptr; boost::shared_ptr lPacket( new Packet(lTime,lData)); assert(lPacket->data->VarNumber==lPacket->time->VarNumber); for (auto lWParamFlow : lParam.second) { ParamFlowSPtr lSParamFlow = lWParamFlow.lock(); // request a strong pointer if (lSParamFlow) { // is the object still alive? lSParamFlow->push(new Packet(*lPacket.get())); } } } lIndex++; } ret = data->VarNumber; } else { return writeEmptyData(); } } else { LOG4CXX_INFO(gLogger, "ParamGetDDBase: DD_GetData( "<< _id <<", " << _paramFlowList.size() << ", " << _realTimeInt << ") returns = (" << error << ")"); ddClose(); BOOST_THROW_EXCEPTION(exception() << errno_code(VirtualInstrumentManager::getInstance()->ddErrorToAmdaError(error))); } // There is no more data to wait from server. if ((error != MOREDATA) && (error != MOREDELAY)) { _step = &VirtualInstrumentInterval::writeEmptyData; } else { // Nothing to do. } return ret; } unsigned int VirtualInstrumentInterval::writeEmptyData() { LOG4CXX_INFO(gLogger, "VirtualInstrumentInterval::writeEmptyData - " << _paramNameListForLog); // If there is an other TimeInterval to process, change _step function to retrieve data from server. if( (_currentTimeIntToProcess != _timeIntervalList.end()) && setTimeDD()) { // Time interval set to the next. _timeIntToProcessChanged = true; _step = &VirtualInstrumentInterval::writeData; } else { _noMoreTimeInt = true; } for (auto lParam : _paramFlowList) { for (auto lWParamFlow : lParam.second) { ParamFlowSPtr lSParamFlow = lWParamFlow.lock(); // request a strong pointer if (lSParamFlow) { // is the object still alive? lSParamFlow->push(nullptr); } } } return 0; } unsigned int VirtualInstrumentInterval::writeNoData() { LOG4CXX_INFO(gLogger, "VirtualInstrumentInterval::writeNoData - " << _paramNameListForLog); double crtStartTime = _currentTimeIntToProcess->_startTime; double crtStopTime = _currentTimeIntToProcess->_stopTime; // If there is an other TimeInterval to process, change _step function to retrieve data from server. if( (_currentTimeIntToProcess != _timeIntervalList.end()) && setTimeDD()) { // Time interval set to the next. _timeIntToProcessChanged = true; } else { _noMoreTimeInt = true; if (!_nodata) _step = &VirtualInstrumentInterval::writeData; else _step = &VirtualInstrumentInterval::writeEmptyData; } for (auto lParam : _paramFlowList) { for (auto lWParamFlow : lParam.second) { ParamFlowSPtr lSParamFlow = lWParamFlow.lock(); // request a strong pointer if (lSParamFlow) { // is the object still alive? //add empty data packet Packet* emptyPacket = new Packet(); emptyPacket->startTime = crtStartTime; emptyPacket->stopTime = crtStopTime; lSParamFlow->push(emptyPacket); } } } return 0; } VirtualInstrumentInterval::~VirtualInstrumentInterval() { if(_paramList) { unsigned int lTabSize = _paramFlowList.size()+1; for ( unsigned int i = 0; i < lTabSize; ++i) { delete [] _paramList[i]; } delete [] _paramList; } ddClose(); } bool VirtualInstrumentInterval::setTimeDD() { _nodata = false; if (_currentTimeIntToProcess == _timeIntervalList.end()) { _currentTimeIntToProcess = _timeIntervalList.begin(); } else if (++_currentTimeIntToProcess != _timeIntervalList.end()) { LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => Get Next TimeInterval - " << _paramNameListForLog); // Nothing to do. } else { LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => Reached all TimeInterval - " << _paramNameListForLog); _nodata = true; return false; } double realTime = 0; int error = 0; double lStartTime = (*_currentTimeIntToProcess)._startTime; double lTimeInt = (*_currentTimeIntToProcess)._stopTime - lStartTime; LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => start time: " << lStartTime << " time int: " << lTimeInt); Double2DD_Time(_strStartTime, lStartTime); Double2DD_Time(_strTimeInt, lTimeInt); error = _ddClient.DD_SetTimeInfo(_id, _strStartTime, &realTime); LOG4CXX_INFO(gLogger, "VirtualInstrumentInterval::_ddClient.DD_SetTimeInfo( "<< _id <<", " << _strStartTime << "<=> " << TimeUtil::DD2ISO_TimeDate(_strStartTime) << ") returns = (" << error << ")"); if (error < 0) { if (error == OUTOFTIME) { _realTimeInt = lStartTime; Double2DD_Time(_strTimeInt,lStartTime); _nodata = true; return true; } ddClose(); BOOST_THROW_EXCEPTION(exception() << errno_code(VirtualInstrumentManager::getInstance()->ddErrorToAmdaError(error))); } double deltaTime = realTime - lStartTime; if (deltaTime > 0) { if ((lTimeInt - deltaTime) < 0) { //error = AMDA_ERROR_NODATAATTIME; double globalStartTime = _vi.getGlobalStartTime(); if (globalStartTime > (lStartTime + lTimeInt)) { //error = AMDA_ERROR_OUTOFTIME; } _realTimeInt = lStartTime; Double2DD_Time(_strTimeInt,lStartTime); _nodata = true; return true; } // Update time interval value // Do not update currentTimeIntToProcess because TimeIntervalList is shared between multiple classes. lTimeInt -= deltaTime; _realTimeInt = (*_currentTimeIntToProcess)._startTime + lTimeInt; Double2DD_Time(_strTimeInt,lTimeInt); LOG4CXX_INFO(gLogger, "VirtualInstrumentInterval: DD_SetTime - change _timeInt by " << _strTimeInt << "<=> " << TimeUtil::DD2ISO_TimeInterval(_strTimeInt)); } return true; } unsigned int VirtualInstrumentInterval::getOneDDDataBloc() { return (this->*_step)(); } void VirtualInstrumentInterval::ddClose() { if(_id != -1) { _ddClient.DD_Close(_id); _id=-1; } } VI::ParamFlowSPtr VirtualInstrumentInterval::getParamFlow(const std::string& pParamName) { ParamFlowSPtr lParamFlow(new ParamFlow(*this)); for(ParamFlowList::iterator it = _paramFlowList.begin(); it != _paramFlowList.end(); ++it) { LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval::getParamFlow => param name: " << (*it).first); } auto lIt = _paramFlowList.find(pParamName); if ( lIt == _paramFlowList.end()) { std::vector lWeakList; lWeakList.push_back(lParamFlow); _paramFlowList.insert(make_pair(pParamName,lWeakList)); } else { lIt->second.push_back(lParamFlow); } return lParamFlow; } } /* namespace DDServerInterface */ } /* namespace AMDA */