VirtualInstrumentInterval.cc 7.66 KB
/*
 * VirtualInstrumentInterval.cc
 *
 *  Created on: Nov 25, 2014
 *      Author: AKKA
 */

#include <climits>

#include "DicError.hh"
#include "TimeUtil.hh"

#include "LocalFileInterfaceConfig.hh"
#include "VirtualInstrumentManager.hh"
#include "VirtualInstrument.hh"
#include "VirtualInstrumentInterval.hh"
#include "LocalParamData.hh"

#include "TimeInterval.hh"

namespace AMDA {
namespace LocalFileInterface {

VirtualInstrumentInterval::VirtualInstrumentInterval(VirtualInstrument& pVI,
	TimeIntervalList* pTimeIntervalList) : Worker(),
	_vi(pVI),
	_timeIntervalList(*pTimeIntervalList),
	_currentTimeIntToProcess(_timeIntervalList.end()),
	_timeIntToProcessChanged(false),
	_noMoreTimeInt(false),
	_crtFileIndex(0),
	_crtRecordIndex(0),
	_nodata(false)
{
	_step = &VirtualInstrumentInterval::writeData;

	// Initialize file reader to point to the first TimeInterval.
	if (!setTimeInterval())
	{
		LOG4CXX_ERROR(gLogger, "At least one time interval must be specified to send request");
		BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::ex_msg("At least one time interval must be specified"));
	}

	active();
}

VirtualInstrumentInterval::~VirtualInstrumentInterval()
{
}

unsigned int VirtualInstrumentInterval::writeData()
{
	if (_nodata)
		return writeNoData();

	LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval::writeData");

	if (_paramFlowSPtr == nullptr)
	{
		LOG4CXX_ERROR(gLogger, "VirtualInstrumentInterval::writeData - No param flow defined");
		BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::ex_msg("No param flow defined"));
		return 0;
	}

	unsigned int ret = 0;

	_timeIntToProcessChanged = false;

	if (_currentTimeIntToProcess == _timeIntervalList.end()) {
		return 0;
	}

	double lStopTime  = (*_currentTimeIntToProcess)._stopTime;

	int crtIntervalRecIndex  = _crtRecordIndex;
	int crtIntervalFileIndex = _crtFileIndex;

	for(auto paramInfo : _paramFlowSPtr->getParamInfoMap())
	{
		ret = 0;

		if (paramInfo.second == nullptr)
			continue;

		std::string paramId = paramInfo.first;

		bool moreData = true;
		int crtParamRecIndex  = crtIntervalRecIndex;
		int crtParamFileIndex = crtIntervalFileIndex;

		LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval::writeData for " << paramId << " - crtParamRecIndex = " << crtParamRecIndex << " - crtParamFileIndex " << crtParamFileIndex);

		while (moreData)
		{
			LocalParamDataPacket *pPacket= new LocalParamDataPacket();

			int paramDim1Size = paramInfo.second->getDim1Size();
			int paramDim2Size = paramInfo.second->getDim2Size();

			LocalContainerType containerType = CONTAINER_SCALAR;
			if ((paramDim1Size > 1) && (paramDim2Size <= 1))
				containerType = CONTAINER_VECTOR;
			else if ((paramDim1Size > 1) && (paramDim2Size > 1))
				containerType = CONTAINER_MATRIX;

			if (!pPacket->init(containerType,paramInfo.second->getType(),paramDim1Size,paramDim2Size))
			{
				delete pPacket;
				LOG4CXX_ERROR(gLogger, "VirtualInstrumentInterval::writeData - Cannot init data packet for " << paramId);
				moreData = false;
				continue;
			}

			FileReaderStatus status = _vi.getParamPacketData(paramId, crtParamFileIndex,
					crtParamRecIndex, lStopTime, pPacket);

			switch (status)
			{
			case FRS_MORE :
				_paramFlowSPtr->push(paramId,pPacket);
				//there is more data to get
				crtParamRecIndex += pPacket->getNbData();
				break;
			case FRS_EOF :
				//end of file
				_paramFlowSPtr->push(paramId,pPacket);
				//if it's last ParamFlow to proceed => update recordIndex and fileIndex
				crtParamRecIndex = 0;
				++crtParamFileIndex;
				break;
			case FRS_FINISH :
				//stop time is reached
				_paramFlowSPtr->push(paramId,pPacket);
				//_paramFlowSPtr->push(paramId,nullptr);
				moreData = false;
				break;
			case FRS_ERROR :
				//error detected
				delete pPacket;
				LOG4CXX_ERROR(gLogger, "VirtualInstrumentInterval::writeData - Error detected to write data for " << paramId);
				BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::ex_msg("Cannot write data"));
				break;
			}
		}
	}

	_step = &VirtualInstrumentInterval::writeEmptyData;


	return ret;
}

unsigned int VirtualInstrumentInterval::writeEmptyData()
{
	LOG4CXX_INFO(gLogger, "VirtualInstrumentInterval::writeEmptyData");

	if ((_currentTimeIntToProcess != _timeIntervalList.end()) && setTimeInterval())
	{
		_timeIntToProcessChanged = true;
		_step = &VirtualInstrumentInterval::writeData;
	}
	else {
		_noMoreTimeInt = true;
	}

	for(auto paramInfo : _paramFlowSPtr->getParamInfoMap())
	{
		std::string paramId = paramInfo.first;
		_paramFlowSPtr->push(paramId,nullptr);
	}

	return 0;
}

unsigned int VirtualInstrumentInterval::writeNoData()
{
	LOG4CXX_INFO(gLogger, "VirtualInstrumentInterval::writeNoData");

	double crtStartTime = _currentTimeIntToProcess->_startTime;
	double crtStopTime = _currentTimeIntToProcess->_stopTime;

	// If there is an other TimeInterval to process, change _step function to retrieve data from server.
	if( (_currentTimeIntToProcess != _timeIntervalList.end()) && setTimeInterval()) {
		// Time interval set to the next.
		_timeIntToProcessChanged = true;
	} else {
		_noMoreTimeInt = true;
		if (!_nodata)
			_step = &VirtualInstrumentInterval::writeData;
		else
			_step = &VirtualInstrumentInterval::writeEmptyData;
	}

	for(auto paramInfo : _paramFlowSPtr->getParamInfoMap())
	{
		std::string paramId = paramInfo.first;
		LocalParamDataPacket *noDataPacket= new LocalParamDataPacket();
		noDataPacket->setNoData(crtStartTime, crtStopTime);
		_paramFlowSPtr->push(paramId, noDataPacket);
	}

	return 0;
}

bool VirtualInstrumentInterval::setTimeInterval()
{
	_nodata = false;

	if (_currentTimeIntToProcess == _timeIntervalList.end()) {
		_currentTimeIntToProcess = _timeIntervalList.begin();
	} else if (++_currentTimeIntToProcess != _timeIntervalList.end()) {
		LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => Get Next TimeInterval");
		// Nothing to do.
	} else {
		LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => Reached all TimeInterval");
		_nodata = true;
		return false;
	}

	double lStartTime = (*_currentTimeIntToProcess)._startTime;
	double lStopTime = (*_currentTimeIntToProcess)._stopTime;

	//if (lStopTime < _vi.getGlobalStartTime() || lStartTime > _vi.getGlobalStopTime())
	//{
	//	_nodata = true;
	//	return true;

		//LOG4CXX_ERROR(gLogger, 	"VirtualInstrumentInterval::setTimeInterval - Cannot get data for this time interval - Outside of global start and global stop definition");
		//BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::errno_code(AMDA_OUTOFTIME_ERR) << AMDA::ex_msg("Cannot get data for this time interval - Outside of global start and global stop definition"));
	//}

	LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => start time: " << lStartTime << " lStopTime: " << lStopTime);

	//
	if (!_vi.getDataPosition(lStartTime, lStopTime, _crtFileIndex, _crtRecordIndex))
	{
		_nodata = true;
		return true;
		//LOG4CXX_ERROR(gLogger, 	"VirtualInstrumentInterval::setTimeInterval - Cannot get data for this time interval");
		//BOOST_THROW_EXCEPTION(AMDA::AMDA_exception() << AMDA::errno_code(AMDA_OUTOFTIME_ERR) << AMDA::ex_msg("Cannot get data for this time interval"));
	}

	LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval => _crtFileIndex: " <<  _crtFileIndex << ", _crtRecordIndex: " << _crtRecordIndex);

	return true;
}


unsigned int VirtualInstrumentInterval::getOneDataBloc() {
	LOG4CXX_DEBUG(gLogger, "VirtualInstrumentInterval::getOneDataBloc");
	return (this->*_step)();
}

ParamFlowSPtr VirtualInstrumentInterval::getParamFlow(const std::string& pParamId,
		LocalParamType paramType, int dim1Size, int dim2Size)
{
	if (_paramFlowSPtr == nullptr)
		_paramFlowSPtr.reset(new ParamFlow(*this));

	_paramFlowSPtr->addAssociatedParam(pParamId,paramType,dim1Size,dim2Size);

	return _paramFlowSPtr;
}

} /* namespace DDServerInterface */
} /* namespace AMDA */