/*
 * StatisticProcess.cc
 *
 *  Created on: Nov 05, 2014
 *      Author: AKKA
 */

#include "StatisticOperation.hh"
#include "StatisticProcess.hh"
#include "ParamData.hh"

using namespace std;
using namespace boost;
using namespace log4cxx;

namespace AMDA {
namespace Parameters {

LoggerPtr StatisticProcess::_logger(Logger::getLogger("AMDA-Kernel.StatisticProcess"));

/**
 * @brief Constructor.
 *
 * Stores a reference to the parameter and the two configuration values.
 * The operation pointer starts as NULL; it is not assigned anywhere in the
 * constructor.
 *
 * @param parameter parameter this process is a data client of
 * @param isCoverageIndependantOfResultType forwarded unchanged to
 *        StatisticOperation::pushResult() by compute()
 * @param index forwarded unchanged to StatisticOperation::pushResult();
 *        a value >= 0 also makes getResultDimDefinition() return "1"
 */
StatisticProcess::StatisticProcess(Parameter &parameter, bool isCoverageIndependantOfResultType, int index) :
		DataClient(),
		_parameter(parameter),
		_args(),
		_isCoverageIndependantOfResultType(isCoverageIndependantOfResultType),
		_index(index),
		_timeIntervalList(),
		_currentTimeInterval(),
		_operation(NULL)
{
}

/**
 * @brief Destructor. Releases the operation owned by this process.
 */
StatisticProcess::~StatisticProcess()
{
	//_parameter.closeConnection(this);

	// Deleting a null pointer is a no-op, so no explicit NULL test is needed.
	delete _operation;
	_operation = NULL;
}

/**
 * @overload DataClient::establishConnection
 *
 * Opens a connection on the parameter for this client.
 */
void StatisticProcess::establishConnection()
{
	_parameter.openConnection(this);
}

/**
 * @brief Initializes the time attributes.
 *
 * Records the interval list, positions the current interval on its first
 * element, initializes the parameter for this client and then calls
 * createOperation().
 *
 * @param pTimeIntervalList list of time intervals to process
 * @return the time stamp returned by Parameter::init()
 */
TimeStamp StatisticProcess::init(TimeIntervalListSPtr pTimeIntervalList)
{
	LOG4CXX_DEBUG(_logger,"StatisticProcess::init");

	_timeIntervalList = pTimeIntervalList;
	_currentTimeInterval = _timeIntervalList->begin();

	TimeStamp stamp = _parameter.init(this,pTimeIntervalList);

	// Presumably allocates _operation; createOperation() is defined elsewhere.
	createOperation();

	return stamp;
}

/*
 * @brief Set arguments map
 *
 * Copies the given map into this process.
 *
 * NOTE(review): the template arguments of std::map appear to have been lost
 * from this signature; restore them from the declaration in
 * StatisticProcess.hh.
 */
void StatisticProcess::setArguments(std::map& args)
{
	_args = args;
}

/**
 * @brief Computes the statistic result for the current time interval.
 *
 * Both output vectors are cleared first. Data packets are then pulled from
 * the parameter and handed to the operation until the packet flags report
 * the end of the current interval. Finally the result is pushed to the
 * output vectors, the current interval iterator is advanced and the
 * operation is reset.
 *
 * NOTE(review): _operation and _currentTimeInterval are dereferenced without
 * any check, so init() presumably has to be called first and compute() must
 * not be called more often than there are intervals - confirm with callers.
 *
 * NOTE(review): the template arguments of std::vector appear to have been
 * lost from this signature; restore them from the declaration in
 * StatisticProcess.hh.
 *
 * @param result receives the statistic result
 * @param coverage receives the coverage associated with the result
 */
void StatisticProcess::compute(std::vector &result, std::vector &coverage)
{
	//clear result vectors
	result.clear();
	coverage.clear();

	//get data and compute statistic result
	ParamDataIndexInfo lparamDataIndexInfo = _parameter.getAsync(this).get();

	while ((!lparamDataIndexInfo._noMoreTimeInt && !lparamDataIndexInfo._timeIntToProcessChanged) ||
			(lparamDataIndexInfo._nbDataToProcess > 0))
	{
		_operation->compute(lparamDataIndexInfo);

		// This packet was the last one of the interval: do not fetch another.
		if (lparamDataIndexInfo._timeIntToProcessChanged || lparamDataIndexInfo._noMoreTimeInt)
			break;

		lparamDataIndexInfo = _parameter.getAsync(this).get();
	}

	//finalize computation
	_operation->finalizeCompute();

	//compute ideal number of data on the current interval
	// NOTE(review): a zero minimum sampling would make this division
	// ill-defined - confirm that getMinSampling() can never return 0.
	AMDA::Parameters::ParamData *paramInput = _parameter.getParamData(this).get();
	int ideal = ((_currentTimeInterval->_stopTime - _currentTimeInterval->_startTime) /
			paramInput->getMinSampling()) + 1;

	//push result
	_operation->pushResult(result,coverage,ideal,_isCoverageIndependantOfResultType,_index);

	//go to next interval
	++_currentTimeInterval;
	_operation->reset();
}

/**
 * @brief Returns the dimension definition of the result.
 *
 * @param forCoverage forwarded to the operation when it is consulted
 * @return "unknown" when no operation exists, "1" when an index >= 0 is set,
 *         otherwise the definition reported by the operation
 */
std::string StatisticProcess::getResultDimDefinition(bool forCoverage)
{
	if (_operation == NULL)
		return "unknown";

	if (_index >= 0)
		return "1";

	return _operation->getResultDimDefinition(forCoverage);
}

/**
 * @brief Returns the UCD of the result, always "stat".
 */
std::string StatisticProcess::getUCD(void)
{
	return "stat";
}

} /* namespace Parameters */
} /* namespace AMDA */