From ae3e8cdf8a5becc9d044fec62cf95f557875cbdd Mon Sep 17 00:00:00 2001 From: Furkan Date: Thu, 6 Jun 2024 14:49:15 +0000 Subject: [PATCH] ParamGet of SpeasyProxy working - TBC --- config/xsd/parameter/getspeasyproxy.xsd | 1 + src/ParamGetImpl/SpeasyProxyInterface/FileReaderCDF.cc | 919 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/ParamGetImpl/SpeasyProxyInterface/FileReaderCDF.hh | 305 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/ParamGetImpl/SpeasyProxyInterface/GetSpeasyProxyNode.cc | 59 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- src/ParamGetImpl/SpeasyProxyInterface/ParamGetSpeasyProxy.cc | 162 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------- src/ParamGetImpl/SpeasyProxyInterface/ParamGetSpeasyProxy.hh | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---- src/ParamGetImpl/SpeasyProxyInterface/Pusher.hh | 198 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------------------------------------------------------ src/ParamGetImpl/SpeasyProxyInterface/SpeasyProxyParamData.hh | 435 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 8 files changed, 2075 insertions(+), 114 deletions(-) create mode 100644 src/ParamGetImpl/SpeasyProxyInterface/FileReaderCDF.cc create mode 100644 src/ParamGetImpl/SpeasyProxyInterface/FileReaderCDF.hh diff --git a/config/xsd/parameter/getspeasyproxy.xsd b/config/xsd/parameter/getspeasyproxy.xsd index 2e7a059..60019e5 100644 --- a/config/xsd/parameter/getspeasyproxy.xsd +++ b/config/xsd/parameter/getspeasyproxy.xsd @@ -9,6 +9,7 @@ + diff --git a/src/ParamGetImpl/SpeasyProxyInterface/FileReaderCDF.cc b/src/ParamGetImpl/SpeasyProxyInterface/FileReaderCDF.cc new file mode 100644 index 0000000..29d669d --- /dev/null +++ b/src/ParamGetImpl/SpeasyProxyInterface/FileReaderCDF.cc @@ -0,0 +1,919 @@ +/* + * FileReaderCDF.cc + * + * Created on: Nov 24, 2014 + * Author: AKKA + */ + +#include "FileReaderCDF.hh" + +#include "ParameterManager.hh" +#include "SpeasyProxyInterfaceConfig.hh" +#include "TimeUtil.hh" + +#include + +namespace AMDA { +namespace SpeasyProxyInterface { + +FileReaderCDF::FileReaderCDF() : _cdfid(NULL), _workingBuffers(new CDFWorkingBuffers) +{ +} + +FileReaderCDF::~FileReaderCDF() +{ +} + +bool FileReaderCDF::open(std::string filePath) +{ + boost::mutex::scoped_lock scoped_lock(AMDA::Parameters::ParameterManager::mutexCDFLib); + + if (isOpened()) + { + //LOG4CXX_ERROR(gLogger, "FileReaderCDF::open - A file is already opened"); + return true;//false; + } + + CDFstatus status = CDFopenCDF(filePath.c_str(), &_cdfid); + + return (status == CDF_OK); +} + +bool FileReaderCDF::close(void) +{ + boost::mutex::scoped_lock scoped_lock(AMDA::Parameters::ParameterManager::mutexCDFLib); + + if (isOpened()) + { + CDFstatus status = CDFcloseCDF(_cdfid); + + if (status == CDF_OK) + _cdfid = NULL; + } + + return (_cdfid == NULL); +} + +bool FileReaderCDF::isOpened(void) +{ + return (_cdfid != NULL); +} + +std::string FileReaderCDF::getTimeParamId(void) +{ + boost::mutex::scoped_lock scoped_lock(AMDA::Parameters::ParameterManager::mutexCDFLib); + + std::string result = ""; + + //search in rVariables + long numrVars; + CDFstatus status = CDFgetNumrVars (_cdfid, &numrVars); + + if (status == CDF_OK) + { + for (long i = 0; i < numrVars; ++i) + { + CDFVarInfo varInfo; + status = CDFvarInquire(_cdfid, i, varInfo._name, &varInfo._dataType, + &varInfo._numElts, &varInfo._recVary, varInfo._dimVarys); + if (status == CDF_OK) + { + if ((varInfo._dataType == CDF_EPOCH) || (varInfo._dataType == CDF_EPOCH16) || (varInfo._dataType == CDF_TIME_TT2000)) + { + result = varInfo._name; + return result; + } + } + } + } + + //search in zVariables + long numzVars; + status = CDFgetNumzVars (_cdfid, &numzVars); + + if (status == CDF_OK) + { + for (long i = 0; i < numzVars; ++i) + { + CDFVarInfo varInfo; + status = CDFinquirezVar(_cdfid, i, varInfo._name, &varInfo._dataType, + &varInfo._numElts, &varInfo._numDims, varInfo._dimSizes, + &varInfo._recVary, varInfo._dimVarys); + if (status == CDF_OK) + { + if ((varInfo._dataType == CDF_EPOCH) || (varInfo._dataType == CDF_EPOCH16) || (varInfo._dataType == CDF_TIME_TT2000)) + { + result = varInfo._name; + return result; + } + } + } + } + + return result; +} + +bool FileReaderCDF::getParamInfo(std::string& paramId, SpeasyProxyParamType& paramType, int& dim1Size, int& dim2Size) +{ + boost::mutex::scoped_lock scoped_lock(AMDA::Parameters::ParameterManager::mutexCDFLib); + + paramType = SpeasyProxyParamType::TYPE_UNKNOWN; + dim1Size = 0; + dim2Size = 0; + + //get CDF var info + CDFVarInfo varInfo; + if (!getCDFVarInfo(paramId, varInfo)) + return false; + + //get ParamType in relation to the CDF var type + switch (varInfo._dataType) + { + case CDF_REAL4 : + case CDF_FLOAT : + paramType = SpeasyProxyParamType::TYPE_FLOAT; + break; + case CDF_REAL8 : + case CDF_DOUBLE : + case CDF_EPOCH : + paramType = SpeasyProxyParamType::TYPE_DOUBLE; + break; + case CDF_EPOCH16 : + paramType = SpeasyProxyParamType::TYPE_EPOCH16; + break; + case CDF_BYTE : + case CDF_INT1 : + case CDF_UINT1 : + case CDF_INT2 : + case CDF_UINT2 : + paramType = SpeasyProxyParamType::TYPE_SHORT; + break; + case CDF_INT4 : + case CDF_UINT4 : + paramType = SpeasyProxyParamType::TYPE_INT; + break; + case CDF_TIME_TT2000 : + case CDF_INT8 : + paramType = SpeasyProxyParamType ::TYPE_TT2000; + break; + case CDF_CHAR : + case CDF_UCHAR : + default : + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamInfo - Unknown data type"); + return false; + } + + //set param size + if (varInfo._numDims > 2) + { + dim1Size = 0; + dim2Size = 0; + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamInfo - Unknown dimension"); + return false; + } + else if (varInfo._numDims == 2) + { + //Dimensions for a Tab2D + dim1Size = varInfo._dimSizes[0]; + dim2Size = varInfo._dimSizes[1]; + } + else if (varInfo._numDims == 1) + { + //Dimensions for a Vector + dim1Size = varInfo._dimSizes[0]; + dim2Size = 1; + } + else + { + //Dimensions for a Scalar + dim1Size = 1; + dim2Size = 1; + } + + return true; +} + +bool FileReaderCDF::getCDFVarInfo(std::string paramName, CDFVarInfo& varInfo) +{ + if (paramName.empty()) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getCDFVarInfo - Empty param name"); + return false; + } + + if (paramName.size() >= CDF_VAR_NAME_LEN256) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getCDFVarInfo - Param name too large"); + return false; + } + + //reset info structure + memset(&varInfo,0,sizeof(CDFVarInfo)); + + //confirm variable existence + memcpy(varInfo._name,paramName.c_str(),paramName.size()*sizeof(char)); + varInfo._name[paramName.size()] = '\0'; + + varInfo._num = CDFgetVarNum(_cdfid,varInfo._name); + + if (varInfo._num < 0) + { + LOG4CXX_INFO(gLogger, "FileReaderCDF::getCDFVarInfo - Cannot find variable"); + return false; + } + + //Is it a zVariable? + CDFstatus status = CDFconfirmzVarExistence(_cdfid,varInfo._name); + + bool isZVar = (status == CDF_OK); + + if (isZVar) + { + //get info about a zVariable + varInfo._type = CDFVT_Z; + + status = CDFinquirezVar(_cdfid, varInfo._num, varInfo._name, &varInfo._dataType, + &varInfo._numElts, &varInfo._numDims, varInfo._dimSizes, &varInfo._recVary, + varInfo._dimVarys); + + if (status != CDF_OK) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getCDFVarInfo - Error to call CDFinquirezVar"); + return false; + } + + status = CDFgetzVarMaxWrittenRecNum(_cdfid, varInfo._num, &varInfo._maxRecNum); + + if (status != CDF_OK) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getCDFVarInfo - Error to call CDFgetzVarMaxWrittenRecNum"); + return false; + } + + status = CDFgetDataTypeSize(varInfo._dataType,&varInfo._numBytes); + + if (status != CDF_OK) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getCDFVarInfo - Error to call CDFgetDataTypeSize"); + return false; + } + } + else + { + //get info about a rVariable + varInfo._type = CDFVT_R; + + status = CDFvarInquire(_cdfid, varInfo._num, varInfo._name, + &varInfo._dataType, &varInfo._numElts, &varInfo._recVary, varInfo._dimVarys); + + if (status != CDF_OK) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getCDFVarInfo - Error to call CDFvarInquire"); + return false; + } + + long encoding; + long majority; + long numVars; + long numAttrs; + status = CDFinquire (_cdfid, &varInfo._numDims, varInfo._dimSizes, &encoding, &majority, + &varInfo._maxRecNum, &numVars, &numAttrs); + + if (status != CDF_OK) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getCDFVarInfo - Error to call CDFinquire"); + return false; + } + + status = CDFgetDataTypeSize(varInfo._dataType,&varInfo._numBytes); + + if (status != CDF_OK) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getCDFVarInfo - Error to call CDFgetDataTypeSize"); + return false; + } + } + + return true; +} + +int FileReaderCDF::getRecordIndex(std::string& timeId, double time) +{ + LOG4CXX_DEBUG(gLogger, "FileReaderCDF::getRecordIndex"); + + boost::mutex::scoped_lock scoped_lock(AMDA::Parameters::ParameterManager::mutexCDFLib); + + //get corresponding epoch time + double timeEPOCH = timeStampToEPOCHTime(time); + + //get all time data + long nbRec; + long dimSize; + long dataType; + long numBytes; + + if (!getCDFData(timeId, 0, CDFWorkingBuffers::BufferType::BUFFER_TIME, + dataType, numBytes, nbRec, dimSize)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getRecordIndex - Cannot get time var data"); + return -1; + } + + if ((dataType != CDF_EPOCH) && (dataType != CDF_EPOCH16) && (dataType != CDF_TIME_TT2000)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getRecordIndex - Not a time var " << timeId); + return -1; + } + + //search nearest record + for (long i = 0; i < nbRec; ++i) + { + double crtTime; + switch (dataType) { + case CDF_EPOCH : + if (!extractDouble(CDFWorkingBuffers::BufferType::BUFFER_TIME, i, 0, dimSize,dataType,numBytes,crtTime)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getRecordIndex - Error to extract time data"); + return -1; + } + break; + case CDF_EPOCH16 : + double crtEpoch16Time[2]; + if (!extractEpoch16(CDFWorkingBuffers::BufferType::BUFFER_TIME, i, 0, dimSize,dataType,numBytes,crtEpoch16Time)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getRecordIndex - Error to extract time data"); + return -1; + } + crtTime = epoch16ToEpochTime(crtEpoch16Time); + break; + case CDF_TIME_TT2000 : + long long crtTT2000Time; + if (!extractTT2000(CDFWorkingBuffers::BufferType::BUFFER_TIME, i, 0, dimSize,dataType,numBytes,crtTT2000Time)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getRecordIndex - Error to extract time data"); + return -1; + } + crtTime = tt2000ToEpochTime(crtTT2000Time); + break; + default: + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getRecordIndex - Error to extract time data"); + return -1; + } + if (timeEPOCH <= crtTime) + return i; + } + + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getCDFVarInfo - Cannot get record index"); + return -1; +} + +FileReaderStatus FileReaderCDF::getParamPacketData(std::string& timeId, std::string& paramId, + SpeasyProxyParamDataPacket *packet) +{ + LOG4CXX_DEBUG(gLogger, "FileReaderCDF::getParamPacketData"); + + boost::mutex::scoped_lock scoped_lock(AMDA::Parameters::ParameterManager::mutexCDFLib); + + + //get the size of one value of a data in the packet + int packetValueSize; + switch (packet->getType()) + { + case SpeasyProxyParamType::TYPE_FLOAT: + packetValueSize = sizeof(float); + break; + case SpeasyProxyParamType::TYPE_DOUBLE: + packetValueSize = sizeof(double); + break; + case SpeasyProxyParamType::TYPE_SHORT: + packetValueSize = sizeof(short); + break; + case SpeasyProxyParamType::TYPE_INT: + packetValueSize = sizeof(int); + break; + default: + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - ParamType not implemented for CDF format"); + return FRS_ERROR; + } + + //get time data + long nbTimeRec; + long dimTimeSize; + long dataTimeType; + long numTimeBytes; + if (!timeId.empty()) { + if (!getCDFData(timeId, 0, CDFWorkingBuffers::BufferType::BUFFER_TIME, dataTimeType, numTimeBytes, nbTimeRec, dimTimeSize)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Cannot get time var data for " << timeId); + return FRS_ERROR; + } + + if ((dataTimeType != CDF_EPOCH) && (dataTimeType != CDF_EPOCH16) && (dataTimeType != CDF_TIME_TT2000)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Bad time format " << timeId); + return FRS_ERROR; + } + } + else { + nbTimeRec = 1; + } + + //get data + long nbRec; + long dimSize; + long dataType; + long numBytes; + if (!getCDFData(paramId, 0, CDFWorkingBuffers::BufferType::BUFFER_DATA, dataType, numBytes, nbRec, dimSize)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Cannot get var data for " << paramId); + return FRS_ERROR; + } + + //check dimension integrity + if (dimSize != packet->getDimsSize()) + { + std::cout << dimSize << packet->getDimsSize() << std::endl; + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Incompatibility between dim size of the packet and the CDF var"); + return FRS_ERROR; + } + + //alloc buffer for one record of the packet + _workingBuffers->reallocBuffer(CDFWorkingBuffers::BufferType::BUFFER_PACKETREC, + packetValueSize*packet->getDimsSize()); + + for (int i = 0; i < nbTimeRec; ++i) + { + double crtTime = 0; + if (!timeId.empty()) { + //time + double crtTimeEPOCH; + switch (dataTimeType) { + case CDF_EPOCH : + if (!extractDouble(CDFWorkingBuffers::BufferType::BUFFER_TIME, i, 0, + dimTimeSize,dataTimeType,numTimeBytes,crtTimeEPOCH)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Error to extract time data"); + return FRS_ERROR; + } + break; + case CDF_EPOCH16 : + double crtTimeEPOCH16[2]; + if (!extractEpoch16(CDFWorkingBuffers::BufferType::BUFFER_TIME, i, 0, + dimTimeSize,dataTimeType,numTimeBytes,crtTimeEPOCH16)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Error to extract time data"); + return FRS_ERROR; + } + crtTimeEPOCH = epoch16ToEpochTime(crtTimeEPOCH16); + break; + case CDF_TIME_TT2000 : + long long crtTimeTT2000; + if (!extractTT2000(CDFWorkingBuffers::BufferType::BUFFER_TIME, i, 0, + dimTimeSize,dataTimeType,numTimeBytes,crtTimeTT2000)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Error to extract time data"); + return FRS_ERROR; + } + crtTimeEPOCH = tt2000ToEpochTime(crtTimeTT2000); + break; + default: + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Error to extract time data"); + return FRS_ERROR; + } + + crtTime = epochTimeToTimeStamp(crtTimeEPOCH); + } + + bool packetFull; + + if (i >= nbRec) + { + //add record data + if (!packet->addData(crtTime, + _workingBuffers->getBuffer(CDFWorkingBuffers::BufferType::BUFFER_PACKETREC), + packetFull)) + { + if (packetFull) + return FRS_MORE; + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Error to add data in packet"); + return FRS_ERROR; + } + continue; + } + + for (int j = 0; j < dimSize; ++j) + { + switch (packet->getType()) + { + case SpeasyProxyParamType::TYPE_FLOAT: + { + //extract and copy value in the buffer + float f; + if (!extractFloat(CDFWorkingBuffers::BufferType::BUFFER_DATA, i, j, + dimSize, dataType, numBytes, f)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Error to extract a data"); + return FRS_ERROR; + } + void* p = (void*)((intptr_t)_workingBuffers->getBuffer(CDFWorkingBuffers::BufferType::BUFFER_PACKETREC) + j*packetValueSize); + memcpy(p,&f,sizeof(float)); + } + break; + case SpeasyProxyParamType::TYPE_DOUBLE: + { + //extract and copy value in the buffer + double d; + if (!extractDouble(CDFWorkingBuffers::BufferType::BUFFER_DATA, i, j, + dimSize, dataType, numBytes, d)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Error to extract a data"); + return FRS_ERROR; + } + void* p = (void*)((intptr_t)_workingBuffers->getBuffer(CDFWorkingBuffers::BufferType::BUFFER_PACKETREC) + j*packetValueSize); + memcpy(p,&d,sizeof(double)); + } + break; + case SpeasyProxyParamType::TYPE_SHORT: + { + //extract and copy value in the buffer + short s; + if (!extractShort(CDFWorkingBuffers::BufferType::BUFFER_DATA, i, j, + dimSize, dataType, numBytes, s)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Error to extract a data"); + return FRS_ERROR; + } + void* p = (void*)((intptr_t)_workingBuffers->getBuffer(CDFWorkingBuffers::BufferType::BUFFER_PACKETREC) + j*packetValueSize); + memcpy(p,&s,sizeof(short)); + } + break; + case SpeasyProxyParamType::TYPE_INT: + { + //extract and copy value in the buffer + int ii; + if (!extractInt(CDFWorkingBuffers::BufferType::BUFFER_DATA, i, j, + dimSize, dataType, numBytes, ii)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Error to extract a data"); + return FRS_ERROR; + } + void* p = (void*)((intptr_t)_workingBuffers->getBuffer(CDFWorkingBuffers::BufferType::BUFFER_PACKETREC) + j*packetValueSize); + memcpy(p,&ii,sizeof(int)); + } + break; + default: + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - ParamType not implemented for CDF format"); + return FRS_ERROR; + } + } + + //add data record in the packet + if (!packet->addData(crtTime, + _workingBuffers->getBuffer(CDFWorkingBuffers::BufferType::BUFFER_PACKETREC), + packetFull)) + { + if (packetFull) + return FRS_MORE; + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getParamPacketData - Error to add data in packet"); + return FRS_ERROR; + } + } + + return FRS_EOF; +} + +double FileReaderCDF::timeStampToEPOCHTime(double timeStamp) +{ + std::stringstream isoTime; + TimeUtil::formatTimeDateInIso(timeStamp, isoTime); + + std::string timeString = isoTime.str(); + + char timeChar[EPOCH4_STRING_LEN+1]; + memset(timeChar,0,(EPOCH4_STRING_LEN+1)*sizeof(char)); + + if ((timeString.size() > 0) && (timeString.size() <= EPOCH4_STRING_LEN)) + memcpy(timeChar,timeString.c_str(),timeString.size()*sizeof(char)); + + return parseEPOCH4(timeChar); +} + +double FileReaderCDF::epochTimeToTimeStamp(double epochTime) +{ + char isoTime[EPOCH4_STRING_LEN+1]; + encodeEPOCH4(epochTime,isoTime); + return TimeUtil::readTimeInIso(isoTime); +} + +double FileReaderCDF::epoch16ToEpochTime(double epoch16Time[]) +{ + long year, month, day, hour, minute, second, msec, microsec, nanosec, picosec; + EPOCH16breakdown(epoch16Time, &year, &month, &day, &hour, &minute, &second, &msec, µsec, &nanosec, &picosec); + return computeEPOCH(year, month, day, hour, minute, second, msec); +} + +double FileReaderCDF::tt2000ToEpochTime(long long tt2000Time) +{ + return CDF_TT2000_to_UTC_EPOCH(tt2000Time); +} + +bool FileReaderCDF::getCDFData(std::string& varId, long startRec, + CDFWorkingBuffers::BufferType bufferType, + long& dataType, long& numBytes, long& nbRec, long& dimSize) +{ + nbRec = 0; + dimSize = 0; + CDFVarInfo varInfo; + if (!getCDFVarInfo(varId, varInfo)) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getCDFData - Cannot get param info " << varId); + return false; + } + + dataType = varInfo._dataType; + numBytes = varInfo._numBytes; + + if (varInfo._numElts > 1) + { + //only possible if CDF_CHAR + //not implemented (CDF_CHAR variable is not a valid variable for the ParamGet) + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getCDFData - _numElts >= 1! " << varId); + return false; + } + + if ((varInfo._maxRecNum <= 0) && (varInfo._recVary != 0)) + //no record in file + return true; + + nbRec = varInfo._maxRecNum - startRec + 1; + + dimSize = 1; + if (varInfo._numDims > 0) + for (int i = 0; i < varInfo._numDims; ++i) + dimSize *= varInfo._dimSizes[i]; + + _workingBuffers->reallocBuffer(bufferType, nbRec*varInfo._numBytes*dimSize); + + CDFstatus status = CDFgetVarRangeRecordsByVarName(_cdfid, varInfo._name, startRec, + varInfo._maxRecNum, _workingBuffers->getBuffer(bufferType)); + + if (status != CDF_OK) + { + LOG4CXX_ERROR(gLogger, "FileReaderCDF::getCDFData - Error to get data for " << varId); + return false; + } + + return true; +} + +bool FileReaderCDF::extractShort(CDFWorkingBuffers::BufferType bufferType, long recIndex, long dimIndex, long dimSize, + long dataType, long numBytes, short& value) +{ + void *p = (void*)((intptr_t)_workingBuffers->getBuffer(bufferType) + (recIndex*dimSize + dimIndex)*numBytes); + + switch(dataType) + { + case CDF_BYTE : + case CDF_INT1 : + { + char* c = (char*)p; + value = (short)(*c); + break; + } + case CDF_UINT1 : + { + unsigned char* uc = (unsigned char*)p; + value = (short)(*uc); + break; + } + case CDF_INT2 : + { + short* s = (short *)p; + value = (short)(*s); + break; + } + case CDF_UINT2 : + { + unsigned short* us = (unsigned short*)p; + value = (short)(*us); + break; + } + default: + return false; + } + return true; +} + +bool FileReaderCDF::extractInt(CDFWorkingBuffers::BufferType bufferType, long recIndex, long dimIndex, long dimSize, + long dataType, long numBytes, int& value) +{ + //try to extract a short + short s; + if (extractShort(bufferType, recIndex, dimIndex, dimSize, + dataType, numBytes, s)) + { + //cast short to int + value = (int)s; + return true; + } + + void *p = (void*)((intptr_t)_workingBuffers->getBuffer(bufferType) + (recIndex*dimSize + dimIndex)*numBytes); + switch (dataType) + { + case CDF_INT4 : + { + int* i = (int*)p; + value = (int)(*i); + break; + } + case CDF_UINT4 : + { + unsigned int* ui = (unsigned int*)(p); + value = (int)(*ui); + break; + } + default: + return false; + } + + return true; +} + +bool FileReaderCDF::extractFloat(CDFWorkingBuffers::BufferType bufferType, long recIndex, long dimIndex, long dimSize, + long dataType, long numBytes, float& value) +{ + //try to extract an int + int i; + if (extractInt(bufferType, recIndex, dimIndex, dimSize, + dataType, numBytes, i)) + { + //cast int to float + value = (float)i; + return true; + } + + void *p = (void*)((intptr_t)_workingBuffers->getBuffer(bufferType) + (recIndex*dimSize + dimIndex)*numBytes); + switch (dataType) + { + case CDF_REAL4 : + case CDF_FLOAT : + { + float* f = (float*)p; + value = (float)(*f); + break; + } + default : + return false; + } + + return true; +} + +bool FileReaderCDF::extractDouble(CDFWorkingBuffers::BufferType bufferType, long recIndex, long dimIndex, long dimSize, + long dataType, long numBytes, double& value) +{ + //try to extract an float + float f; + if (extractFloat(bufferType, recIndex, dimIndex, dimSize, + dataType, numBytes, f)) + { + //cast float to double + value = (double)f; + return true; + } + + void *p = (void*)((intptr_t)_workingBuffers->getBuffer(bufferType) + (recIndex*dimSize + dimIndex)*numBytes); + + switch(dataType) + { + case CDF_REAL8 : + case CDF_DOUBLE : + case CDF_EPOCH : + { + double* d = (double*)p; + value = (double)(*d); + break; + } + default : + return false; + } + + return true; +} + +bool FileReaderCDF::extractEpoch16(CDFWorkingBuffers::BufferType bufferType, long recIndex, long dimIndex, long dimSize, + long dataType, long numBytes, double value[]) +{ + void *p = (void*)((intptr_t)_workingBuffers->getBuffer(bufferType) + (recIndex*dimSize + dimIndex)*numBytes); + + switch(dataType) + { + case CDF_EPOCH16 : + { + std::memcpy(value, p, 2*sizeof(double)); + return true; + } + default : + return false; + } + + return true; +} + +bool FileReaderCDF::extractTT2000(CDFWorkingBuffers::BufferType bufferType, long recIndex, long dimIndex, long dimSize, + long dataType, long numBytes, long long& value) +{ + void *p = (void*)((intptr_t)_workingBuffers->getBuffer(bufferType) + (recIndex*dimSize + dimIndex)*numBytes); + + switch(dataType) + { + case CDF_TIME_TT2000 : + { + long long* ll = (long long*)p; + value = (long long)(*ll); + break; + } + default : + return false; + } + + return true; +} + +/* + * @brief Get an information + */ +bool FileReaderCDF::getInfo(const char* pInfoName, std::vector& res) +{ + LOG4CXX_DEBUG(gLogger, "FileReaderCDF::getInfo"); + + boost::mutex::scoped_lock scoped_lock(AMDA::Parameters::ParameterManager::mutexCDFLib); + + //get CDF var info + CDFVarInfo varInfo; + if (getCDFVarInfo(pInfoName, varInfo)) { + scoped_lock.unlock(); + std::string emptyTimeStr = ""; + std::string paramName = pInfoName; + SpeasyProxyParamType paramType; + int dim1Size, dim2Size; + getParamInfo(paramName, paramType, dim1Size, dim2Size); + + SpeasyProxyParamDataPacket packet; + + SpeasyProxyContainerType containerType = CONTAINER_SCALAR; + if ((dim1Size > 1) && (dim2Size <= 1)) + containerType = CONTAINER_VECTOR; + else if ((dim1Size > 1) && (dim2Size > 1)) + containerType = CONTAINER_MATRIX; + + packet.init(containerType,paramType,dim1Size,dim2Size); + getParamPacketData(emptyTimeStr, paramName, &packet); + if (packet.getNbData() > 0) { + for (int i = 0; i < dim1Size; ++i) { + for (int j = 0; j < dim2Size; ++j) { + switch (paramType) + { + case TYPE_FLOAT : + { + float f = 0.; + packet.getDataValue(&f, 0, i, j); + res.push_back(f); + break; + } + case TYPE_DOUBLE : + { + double d = 0;; + packet.getDataValue(&d, 0, i, j); + res.push_back(d); + break; + } + case TYPE_SHORT : + { + short s = 0; + packet.getDataValue(&s, 0, i, j); + res.push_back(s); + break; + } + case TYPE_INT : + { + int d = 0; + packet.getDataValue(&d, 0, i, j); + res.push_back(d); + break; + } + default: + return false; + } + } + } + packet.free(); + return true; + } + } + + return false; +} + +} /* SpeasyProxyInterface */ +} /* AMDA */ diff --git a/src/ParamGetImpl/SpeasyProxyInterface/FileReaderCDF.hh b/src/ParamGetImpl/SpeasyProxyInterface/FileReaderCDF.hh new file mode 100644 index 0000000..10c622b --- /dev/null +++ b/src/ParamGetImpl/SpeasyProxyInterface/FileReaderCDF.hh @@ -0,0 +1,305 @@ +/* + * FileReaderCDF.hh + * + * Created on: Nov 24, 2014 + * Author: AKKA + */ + +#ifndef FILEREADERCDF_HH_ +#define FILEREADERCDF_HH_ +#include +#include "SpeasyProxyParamData.hh" + +#include "cdf.h" + +namespace AMDA { +namespace SpeasyProxyInterface { + + +/* + * @brief Status for getParamPacketData function + */ +typedef enum +{ + //more data is needed (ie. the paket is full and must be proceed before to continue) + FRS_MORE, + //end of file + FRS_EOF, + //stop time is reached + FRS_FINISH, + //error detected + FRS_ERROR +} FileReaderStatus; +/* + * @brief Implementation of the class FileReaderAbstract to load a CDF file format + */ +class FileReaderCDF +{ +public: + /* + * @brief Constructor + */ + FileReaderCDF(); + + /* + * @brief Destructor + */ + ~FileReaderCDF(); + + /* + * @brief Open a CDF file + */ + bool open(std::string filePath); + + /* + * @brief Close the CDF file + */ + bool close(void); + + /* + * @brief Test if a CDF file is currently opened + */ + bool isOpened(void); + + /* + * @brief Get the id of the time param to use. For the CDF format, it's the first CDF_EPOCH var + */ + std::string getTimeParamId(void); + + /* + * @brief Get a param type and a param size in the CDF file + */ + bool getParamInfo(std::string& paramId, SpeasyProxyParamType& paramType, int& dim1Size, int& dim2Size); + + /* + * @brief Get the index of the nearest record of time (the higher one) in the CDF file + */ + int getRecordIndex(std::string& timeId, double time); + + /* + * @brief Get a param packet from the CDF file + */ + FileReaderStatus getParamPacketData(std::string& timeId, std::string& paramId, SpeasyProxyParamDataPacket *packet); + + /* + * @brief Get an information + */ + bool getInfo(const char* pInfoName, std::vector& res); + +protected: + /* + * @brief TimeStamp (Unix time) to EPOCH time conversion + */ + double timeStampToEPOCHTime(double timeStamp); + + /* + * @brief EPOCH time to TimeStamp conversion + */ + double epochTimeToTimeStamp(double epochTime); + + /* + * @brief EPOCH16 time to EPOCH conversion + */ + double epoch16ToEpochTime(double epoch16Time[]); + + /* + * @brief TT2000 time to EPOCH conversion + */ + double tt2000ToEpochTime(long long tt2000Time); + +private: + + /* + * @brief CDF var type : rVariable or zVariable + */ + typedef enum {CDFVT_R,CDFVT_Z} CDFVarType; + + /* + * @brief Working structure to contain CDF variable info + */ + typedef struct + { + //variable name + char _name[CDF_VAR_NAME_LEN256+1]; + //rVariable or zVariable + CDFVarType _type; + //variable number + long _num; + //data type + long _dataType; + //number of bytes for the data type + long _numBytes; + //number of elements (of the data type) + long _numElts; + //number of dimensions + long _numDims; + //dimension sizes + long _dimSizes[CDF_MAX_DIMS]; + //record variance + long _recVary; + //dimension variances + long _dimVarys[CDF_MAX_DIMS]; + //maximum record number + long _maxRecNum; + } CDFVarInfo; + + /* + * @brief Working buffers container + */ + class CDFWorkingBuffers + { + public: + /* + * @brief Buffer type: + */ + typedef enum { + //buffer used to store time data + BUFFER_TIME, + //buffer used to store param data + BUFFER_DATA, + //buffer used to store packet data for one record + BUFFER_PACKETREC + } BufferType; + + /* + * @brief Constructor + */ + CDFWorkingBuffers() : _timeBuffer(NULL), _dataBuffer(NULL), _packetRecBuffer(NULL) + { + } + + /* + * @brief Destructor + */ + ~CDFWorkingBuffers() + { + //free all buffers + if (_timeBuffer != NULL) + free(_timeBuffer); + if (_dataBuffer != NULL) + free(_dataBuffer); + if (_packetRecBuffer != NULL) + free(_packetRecBuffer); + } + + /* + * @brief Get buffer pointer from a buffer type + */ + void* getBuffer(BufferType type) + { + switch (type) + { + case BUFFER_TIME: + return _timeBuffer; + case BUFFER_DATA: + return _dataBuffer; + case BUFFER_PACKETREC: + return _packetRecBuffer; + } + return NULL; + } + + /* + * @brief Reallocate a buffer from a buffer type + */ + void reallocBuffer(BufferType type, long size) + { + switch (type) + { + case BUFFER_TIME: + if (_timeBuffer != NULL) + free(_timeBuffer); + _timeBuffer = malloc(size); + break; + case BUFFER_DATA: + if (_dataBuffer != NULL) + free(_dataBuffer); + _dataBuffer = malloc(size); + break; + case BUFFER_PACKETREC: + if (_packetRecBuffer != NULL) + free(_packetRecBuffer); + _packetRecBuffer = malloc(size); + break; + } + } + + private: + /* + * @brief Buffer for time data + */ + void* _timeBuffer; + + /* + * @brief Buffer for param data + */ + void* _dataBuffer; + + /* + * @brief Buffer for one record packet data + */ + void* _packetRecBuffer; + }; + + /* + * @brief CDF identifier + */ + CDFid _cdfid; + + /* + * @brief Working buffers container + */ + boost::shared_ptr _workingBuffers; + + /* + * @brief Get CDF variable Info from param name + */ + bool getCDFVarInfo(std::string paramName, CDFVarInfo& varInfo); + + /* + * @brief Get CDF data for paramId from startRec to the end of the file + */ + bool getCDFData(std::string& varId, long startRec, CDFWorkingBuffers::BufferType bufferType, + long& dataType, long& numBytes, long& nbRec, long& dimSize); + + /* + * @brief Extract a short to a CDF data buffer + */ + bool extractShort(CDFWorkingBuffers::BufferType bufferType, long recIndex, long dimIndex, long dimSize, + long dataType, long numBytes, short& value); + + /* + * @brief Extract a int to a CDF data buffer + */ + bool extractInt(CDFWorkingBuffers::BufferType bufferType, long recIndex, long dimIndex, long dimSize, + long dataType, long numBytes, int& value); + + /* + * @brief Extract a float to a CDF data buffer + */ + bool extractFloat(CDFWorkingBuffers::BufferType bufferType, long recIndex, long dimIndex, long dimSize, + long dataType, long numBytes, float& value); + + /* + * @brief Extract a int to a CDF data buffer + */ + bool extractDouble(CDFWorkingBuffers::BufferType bufferType, long recIndex, long dimIndex, long dimSize, + long dataType, long numBytes, double& value); + + /* + * @brief Extract a Epoch16 to a CDF data buffer + */ + bool extractEpoch16(CDFWorkingBuffers::BufferType bufferType, long recIndex, long dimIndex, long dimSize, + long dataType, long numBytes, double value[]); + + /* + * @brief Extract a TT2000 to a CDF data buffer + */ + bool extractTT2000(CDFWorkingBuffers::BufferType bufferType, long recIndex, long dimIndex, long dimSize, + long dataType, long numBytes, long long& value); +}; + +} /* SpeasyProxyInterface */ +} /* AMDA */ + +#endif /* FILEREADERCDF_HH_ */ diff --git a/src/ParamGetImpl/SpeasyProxyInterface/GetSpeasyProxyNode.cc b/src/ParamGetImpl/SpeasyProxyInterface/GetSpeasyProxyNode.cc index 71c1ee5..dbeb726 100644 --- a/src/ParamGetImpl/SpeasyProxyInterface/GetSpeasyProxyNode.cc +++ b/src/ParamGetImpl/SpeasyProxyInterface/GetSpeasyProxyNode.cc @@ -15,13 +15,15 @@ #include "ParamGetSpeasyProxy.hh" #include "ParameterManager.hh" +#include +#include + using namespace AMDA::Parameters; #include "Constant.hh" #include "Config.hh" #include "GetSpeasyProxyNode.hh" -// #include "LocalParamData.hh" using namespace AMDA::XMLConfigurator; @@ -71,6 +73,61 @@ public: lParamGet->setParamId(paramIdStr.c_str()); lParameter->setDataWriter(lDataWriter); + //type + xmlChar* value = NULL; + if ((value = xmlGetProp(pNode, (const xmlChar*)"type")) != NULL) + { + if (strcmp((const char*)value,"float") == 0) + lParamGet->setType(AMDA::SpeasyProxyInterface::SpeasyProxyParamType::TYPE_FLOAT); + else if (strcmp((const char*)value,"double") == 0) + lParamGet->setType(AMDA::SpeasyProxyInterface::SpeasyProxyParamType::TYPE_DOUBLE); + else if (strcmp((const char*)value,"short") == 0) + lParamGet->setType(AMDA::SpeasyProxyInterface::SpeasyProxyParamType::TYPE_SHORT); + else if ((strcmp((const char*)value,"int") == 0) || (strcmp((const char*)value,"integer") == 0)) + lParamGet->setType(AMDA::SpeasyProxyInterface::SpeasyProxyParamType::TYPE_INT); + else + { + LOG4CXX_ERROR(gLogger, "SpeasyProxyParamNode::proceed - Unknown data type " << ((const char*)value)) + } + xmlFree(value); + } + + //dim1 + + if ((value = xmlGetProp(pNode, (const xmlChar*)"dim1")) != NULL) + { + std::string dim1Str = std::string((const char*) value); + boost::trim(dim1Str); + int dim1 = boost::lexical_cast(dim1Str); + if(dim1 >= 0 && dim1 <= 3) + lParamGet->setDim1(dim1); + else + lParamGet->setDim1(1); + xmlFree(value); + + } + + //dim2 + if ((value = xmlGetProp(pNode, (const xmlChar*)"dim2")) != NULL) + { + std::string dim2Str = std::string((const char*) value); + boost::trim(dim2Str); + int dim2 = boost::lexical_cast(dim2Str); + if(dim2 >= 0 && dim2 <= 3) + lParamGet->setDim2(dim2); + else + lParamGet->setDim2(1); + xmlFree(value); + + } + + //minSampling + if ((value = xmlGetProp(pNode, (const xmlChar*)"minSampling")) != NULL) + { + lParamGet->setMinSampling(atof((const char*)value)); + xmlFree(value); + } + AMDA::Parameters::CfgContext lContext(pContext); lContext.push(lParamGet.get()); NodeGrpCfg::proceed(pNode, lContext); diff --git a/src/ParamGetImpl/SpeasyProxyInterface/ParamGetSpeasyProxy.cc b/src/ParamGetImpl/SpeasyProxyInterface/ParamGetSpeasyProxy.cc index 4a3deb1..7fdf479 100644 --- a/src/ParamGetImpl/SpeasyProxyInterface/ParamGetSpeasyProxy.cc +++ b/src/ParamGetImpl/SpeasyProxyInterface/ParamGetSpeasyProxy.cc @@ -9,6 +9,8 @@ #include "ParamGetSpeasyProxy.hh" // #include "VirtualInstrumentManager.hh" +#include "FileReaderCDF.hh" + #include #include "Parameter.hh" @@ -29,15 +31,18 @@ namespace SpeasyProxyInterface { ParamGetSpeasyProxy::ParamGetSpeasyProxy(Parameter ¶meter) : ParamGet_CRTP(parameter), - _paramId(""),_timeStamp(0),_type(TYPE_FLOAT), _container(CONTAINER_SCALAR) + _paramId(""),_type(TYPE_FLOAT),_dim1(1), _dim2(1), _minSampling(1), _container(CONTAINER_SCALAR),_timeStamp(0) { } ParamGetSpeasyProxy::ParamGetSpeasyProxy(const ParamGetSpeasyProxy &pParamGetSpeasyProxy, Parameter ¶meter) : ParamGet_CRTP(pParamGetSpeasyProxy, parameter), - _paramId(pParamGetSpeasyProxy._paramId), _timeStamp(pParamGetSpeasyProxy._timeStamp), - _type(pParamGetSpeasyProxy._type), _container(pParamGetSpeasyProxy._container) + _paramId(pParamGetSpeasyProxy._paramId), _type(pParamGetSpeasyProxy._type), + _dim1(pParamGetSpeasyProxy._dim1), _dim2(pParamGetSpeasyProxy._dim2), + _minSampling(pParamGetSpeasyProxy._minSampling), _container(pParamGetSpeasyProxy._container), + _timeStamp(pParamGetSpeasyProxy._timeStamp) + { } @@ -49,14 +54,6 @@ ParamGetSpeasyProxy::~ParamGetSpeasyProxy() // delete _pusher; } -/* - * @overload DataWriter::getMinSampling - */ -double ParamGetSpeasyProxy::getMinSampling() -{ - return 0; -} - std::string ParamGetSpeasyProxy::readFile(const std::string& filename) { // Create an input file stream std::ifstream file(filename); @@ -142,16 +139,97 @@ std::string ParamGetSpeasyProxy::download(const std::string& pPath) { TimeStamp ParamGetSpeasyProxy::init() { LOG4CXX_DEBUG(gLogger, "ParamGetSpeasyProxy::init"); - // Test si proxy accessible - // A Faire + + // Ici, instanciation du Pusher (en fonction du type et des dim définis dans le fichier XML) + + _currentInterval = _timeIntervalList->begin(); + + // get the right container + + if (_dim1 * _dim2 == 1) { + _container = CONTAINER_SCALAR; + } + else if ((_dim1 > 1) && (_dim2 > 1)) { + _container = CONTAINER_MATRIX; + } + else { + _container = CONTAINER_VECTOR; + } + + //create pusher + switch (_container) { + case CONTAINER_SCALAR : + switch (_type) { + case TYPE_FLOAT : + _pusher = new Pusher(); + break; + case TYPE_DOUBLE : + _pusher = new Pusher(); + break; + case TYPE_SHORT : + _pusher = new Pusher(); + break; + case TYPE_INT : + _pusher = new Pusher(); + break; + default: + LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown type" << (const char*)_type); + + } + break; + case CONTAINER_VECTOR : + switch (_type) { + case TYPE_FLOAT : + _pusher = new Pusher(_dim1); + break; + case TYPE_DOUBLE : + _pusher = new Pusher(_dim1); + break; + case TYPE_SHORT : + _pusher = new Pusher(_dim1); + break; + case TYPE_INT : + _pusher = new Pusher(_dim1); + break; + default: + LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown type" << (const char*)_type); + } + break; + case CONTAINER_MATRIX : + switch (_type) { + case TYPE_FLOAT : + _pusher = new Pusher(_dim1, _dim2); + break; + case TYPE_DOUBLE : + _pusher = new Pusher(_dim1, _dim2); + break; + case TYPE_SHORT : + _pusher = new Pusher(_dim1, _dim2); + break; + case TYPE_INT : + _pusher = new Pusher(_dim1, _dim2); + break; + default: + LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown type" << (const char*)_type); + } + break; + default: + LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown container format " << (const char*)_container); + + } + //set link to the param data + _paramData = ParamDataSPtr(_pusher->getParamData()); + _paramData->setMinSampling(_minSampling); - std::string link = "http://172.200.0.14:6543/get_data?path=amda%2Fimf&start_time=2008-01-01T00%3A00%3A00&stop_time=2008-01-02T00%3A00%3A00&format=json&zstd_compression=false&pickle_proto=3"; - std::string localPath = download(link); - std::string value = readFile(localPath); + if (_timeStamp == 0 && _signatureTrigger != "") { + // _signatureTrigger must be a name of xml parameter file + _timeStamp = AMDA::Helpers::Helper::dateOfFile( + _signatureTrigger.c_str()); + } - _container = CONTAINER_VECTOR; // For now + return _timeStamp; } @@ -160,6 +238,56 @@ unsigned int ParamGetSpeasyProxy::write() { unsigned int result = 0; + //std::string link = "http://172.200.0.14:6543/get_data?path=amda%2Fimf&start_time=2008-01-01T00%3A00%3A00&stop_time=2008-01-02T00%3A00%3A00&format=json&zstd_compression=false&pickle_proto=3"; + std::string localPath = "/home/amda_admin/AMDA/AMDA_Kernel/cdf_speasy.cdf"; + FileReaderCDF* fileReaderPtr = new FileReaderCDF(); + bool isOpen = fileReaderPtr->open(localPath); + if(isOpen) + { + std::string timeParamId = "time"; + std::string croptedParamId = _paramId; + std::string delim = "/"; + croptedParamId.erase(0, croptedParamId.find(delim) + delim.length()); + + // Call to getParamPacketData to get CDF data + + SpeasyProxyParamDataPacket *packet= new SpeasyProxyParamDataPacket(); + packet->init(_container,_type,_dim1,_dim2); + + fileReaderPtr->getParamPacketData(timeParamId, croptedParamId, packet); + + + // Call to the put function of pusher to initiate the ParamData + // result == nb of data in ParamData + result += _pusher->put(packet); + + // Push up the information if all time interval was processed. + _paramData->getIndexInfo()._timeIntToProcessChanged = true; // ATTENTION - valeur forcée pour les tests!!!!! + if (true) { + ++_currentInterval; + _paramData->getIndexInfo()._noMoreTimeInt = (_currentInterval == _timeIntervalList->end()); + } + + _paramData->getIndexInfo()._nbDataToProcess = result; + + // if time interval changed store index which delimit the end of the time interval. + if (_paramData->getIndexInfo()._timeIntToProcessChanged) { + unsigned int lEndTimeIntIndex = _paramData->getIndexInfo()._nbDataToProcess; + _paramData->getIndexInfo()._endTimeIntIndexList.push_back(lEndTimeIntIndex); + } + else { + // Nothing to do. + } + + return result; + } + + //close the file + if (!fileReaderPtr->close()) + { + LOG4CXX_ERROR(gLogger, "ParamGetSpeasy::init - Cannot close file " << localPath); + } + return result; } diff --git a/src/ParamGetImpl/SpeasyProxyInterface/ParamGetSpeasyProxy.hh b/src/ParamGetImpl/SpeasyProxyInterface/ParamGetSpeasyProxy.hh index 9af7433..9c527ae 100644 --- a/src/ParamGetImpl/SpeasyProxyInterface/ParamGetSpeasyProxy.hh +++ b/src/ParamGetImpl/SpeasyProxyInterface/ParamGetSpeasyProxy.hh @@ -45,7 +45,7 @@ public: virtual ~ParamGetSpeasyProxy(); /* - * @brief Get param id in local base + * @brief Get param id in speasy xml */ const std::string& getParamId() const { @@ -53,7 +53,7 @@ public: } /* - * @brief Set param id in local base + * @brief Set param id in speasy xml */ void setParamId(const char* paramId) { @@ -66,9 +66,84 @@ public: virtual unsigned int write(); /* - * @overload DataWriter::getMinSampling + * @brief Get param type in speasy xml */ - virtual double getMinSampling(); + SpeasyProxyParamType getType() + { + return _type; + } + + /* + * @brief Set param type in speasy xml + */ + void setType(SpeasyProxyParamType type) + { + _type = type; + } + + /* + * @brief Get param dim1 size in speasy xml + */ + int getDim1(void) + { + return _dim1; + } + + /* + * @brief Set param dim1 size in speasy xml + */ + void setDim1(int size) + { + _dim1 = size; + } + + /* + * @brief Get param dim2 size in speasy xml + */ + int getDim2(void) + { + return _dim2; + } + + /* + * @brief Set param dim2 size in speasy xml + */ + void setDim2(int size) + { + _dim2 = size; + } + + /* + * @brief Get param min sampling in speasy xml + */ + double getMinSampling(void) + { + return _minSampling; + } + + /* + * @brief Set param min sampling in speasy xml + */ + void setMinSampling(double minSampling) + { + _minSampling = minSampling; + } + + // /* + // * @brief Get param max sampling in speasy xml + // */ + // double getMaxSampling(void) + // { + // return _maxSampling; + // } + + // /* + // * @brief Set param max sampling in speasy xml + // */ + // void setMaxSampling(double maxSampling) + // { + // _maxSampling = maxSampling; + // } /* * @brief Init @@ -93,8 +168,33 @@ protected: */ std::string _paramId; + /* + * @brief Local Param type + */ + SpeasyProxyParamType _type; + /* + * @brief Local Param dim1 size + */ + int _dim1; + + /* + * @brief Local Param dim2 size + */ + int _dim2; + + /* + * @brief Local Param min sampling + */ + double _minSampling; + + // /* + // * @brief Local Param max sampling + // */ + // double _maxSampling; + + SpeasyProxyContainerType _container; /* @@ -102,6 +202,8 @@ protected: */ TimeStamp _timeStamp; + TimeIntervalList::iterator _currentInterval; + /* * @brief Data pusher */ diff --git a/src/ParamGetImpl/SpeasyProxyInterface/Pusher.hh b/src/ParamGetImpl/SpeasyProxyInterface/Pusher.hh index 10cb5ac..654d3bb 100644 --- a/src/ParamGetImpl/SpeasyProxyInterface/Pusher.hh +++ b/src/ParamGetImpl/SpeasyProxyInterface/Pusher.hh @@ -29,7 +29,7 @@ public: /* * @brief Constructor */ - PusherBase (double sampling, double value, int dim1 = 1, int dim2 = 1) : _paramData(NULL), _sampling(sampling), _value(value), _dim1(dim1), _dim2(dim2) + PusherBase (int dim1 = 1, int dim2 = 1) : _paramData(NULL), _dim1(dim1), _dim2(dim2) { } @@ -51,7 +51,7 @@ public: /* * @brief Virtual method to put a packet in the ParamData */ - virtual int put(double startTime, double stopTime, int lastIndex) = 0; + virtual int put(SpeasyProxyParamDataPacket* packet) = 0; protected: /* @@ -60,16 +60,6 @@ protected: AMDA::Parameters::ParamData* _paramData; /* - * @brief Sampling value - */ - double _sampling; - - /* - * @brief Constant value - */ - double _value; - - /* * @brief For Vector and Tab2D dimension */ int _dim1; @@ -117,7 +107,7 @@ public: /* * @brief Constructor */ - Pusher(double sampling, double value, int dim1, int dim2) : PusherBase(sampling, value, dim1, dim2) + Pusher(int dim1, int dim2) : PusherBase( dim1, dim2) { _paramData = _specParamData = createParamData(); // _nbDataByPacket = getNbDataByPacket(type, dim1, dim2); @@ -126,35 +116,45 @@ public: /* * @brief Put packet in a "vector" ParamData */ - virtual int put(double startTime, double stopTime, int lastIndex) + virtual int put(SpeasyProxyParamDataPacket* packet) { - // _specParamData->getDataList().resize(_nbDataByPacket); - - // for (int index = 0; index < _nbDataByPacket; ++index) - // { - // //get time - // double time = startTime + (lastIndex + index) * _sampling; - // if (time > stopTime) { - // return index; - // } - // //this element will be deleted by the Container designed by "_specParamData->getDataList()" - // ElemenType elem = ElemenType(_dim1,_dim2); - // for (int dim1Index = 0; dim1Index < _dim1; ++dim1Index) - // { - // for (int dim2Index = 0; dim2Index < _dim2; ++dim2Index) - // { - // BaseElemenType baseElem = _value; - // elem[dim1Index][dim2Index] = baseElem; - // } - // } - - // //push time and element in the ParamData - // _specParamData->getDataList().push_back(elem); - // _specParamData->getTimeList().push_back(time); - // } - - // //return nb of processed records - // return _nbDataByPacket; + //ParamData is created, add data + + _specParamData->getDataList().resize(packet->getNbData()); + + // BaseElemenType fillEl = _fillValue; + + for (int index = 0; index < packet->getNbData(); ++index) + { + //get time + double time = packet->getTime(index); + //this element will be deleted by the Container designed by "_specParamData->getDataList()" + ElemenType elem = ElemenType(packet->getDim1Size(),packet->getDim2Size()); + for (int dim1Index = 0; dim1Index < packet->getDim1Size(); ++dim1Index) + { + for (int dim2Index = 0; dim2Index < packet->getDim2Size(); ++dim2Index) + { + BaseElemenType baseElem; + //get data element + if (packet->getDataValue(&baseElem,index,dim1Index,dim2Index)) + { + // if (!isnan(_fillValue)) + // if (baseElem == fillEl) + // baseElem << NotANumber(); + } + else + baseElem << NotANumber(); + elem[dim1Index][dim2Index] = baseElem; + } + } + + //push time and element in the ParamData + _specParamData->getDataList().push_back(elem); + _specParamData->getTimeList().push_back(time); + } + + //return nb of processed records + return packet->getNbData(); return 0; } @@ -187,7 +187,7 @@ public: /* * @brief Constructor */ - Pusher(double sampling, double value, int dim) : PusherBase(sampling, value, dim) + Pusher(int dim) : PusherBase(dim) { _paramData = _specParamData = createParamData(); // _nbDataByPacket = getNbDataByPacket(type, dim, 1); @@ -196,33 +196,42 @@ public: /* * @brief Put packet in a "vector" ParamData */ - virtual int put(double startTime, double stopTime, int lastIndex) + virtual int put(SpeasyProxyParamDataPacket* packet) { - // _specParamData->getDataList().resize(_nbDataByPacket); - - // for (int index = 0; index < _nbDataByPacket; ++index) - // { - // //get time - // double time = startTime + (lastIndex + index) * _sampling; - // if (time > stopTime) { - // return index; - // }; - - // ElemenType elem; - // for (int dimIndex = 0; dimIndex < _dim1; ++dimIndex) - // { - // BaseElemenType baseElem = _value; - // elem.push_back(baseElem); - // } - - // //push time and element in the ParamData - // _specParamData->getDataList().push_back(elem); - // _specParamData->getTimeList().push_back(time); - // } - - // //return nb of processed records - // return _nbDataByPacket; - return 0; + //ParamData is created, add data + _specParamData->getDataList().resize(packet->getNbData()); + + //BaseElemenType fillEl = _fillValue; + + for (int index = 0; index < packet->getNbData(); ++index) + { + //get time + double time = packet->getTime(index); + + ElemenType elem; + for (int dimIndex = 0; dimIndex < packet->getDim1Size(); ++dimIndex) + { + BaseElemenType baseElem; + //get data element + if (packet->getDataValue(&baseElem,index,dimIndex)) + { + // if (!isnan(_fillValue)) + // if (baseElem == fillEl) + // baseElem << NotANumber(); + } + else + baseElem << NotANumber(); + //push data base element + elem.push_back(baseElem); + } + + //push time and element in the ParamData + _specParamData->getDataList().push_back(elem); + _specParamData->getTimeList().push_back(time); + } + + //return nb of processed records + return packet->getNbData(); } /* @@ -252,7 +261,7 @@ public: /* * @brief Constructor */ - Pusher(double sampling, double value) : PusherBase(sampling, value) + Pusher() : PusherBase() { _paramData = _specParamData = createParamData(); // _nbDataByPacket = getNbDataByPacket(type, 1, 1); @@ -261,28 +270,37 @@ public: /* * @brief Put packet in a "scalar" ParamData */ - virtual int put(double startTime, double stopTime, int lastIndex) + virtual int put(SpeasyProxyParamDataPacket* packet) { - // //ParamData is created, add data - // _specParamData->getDataList().resize(_nbDataByPacket); - - // for (int index = 0; index < _nbDataByPacket; ++index) - // { - // //get time - // double time = startTime + (lastIndex + index) * _sampling; - // if (time > stopTime) { - // return index; - // } - - // BaseElemenType baseElem = _value; - - // //push time and element in the ParamData - // _specParamData->getDataList().push_back(baseElem); - // _specParamData->getTimeList().push_back(time); - // } - - // return _nbDataByPacket; - return 0; + //ParamData is created, add data + _specParamData->getDataList().resize(packet->getNbData()); + + // BaseElemenType fillEl = _fillValue; + + for (int index = 0; index < packet->getNbData(); ++index) + { + //get time + double time = packet->getTime(index); + + BaseElemenType baseElem; + //get element + if (packet->getDataValue(&baseElem,index)) + { + // if (!isnan(_fillValue)) + // { + // if (baseElem == fillEl) + // baseElem << NotANumber(); + // } + } + else + baseElem << NotANumber(); + + //push time and element in the ParamData + _specParamData->getDataList().push_back(baseElem); + _specParamData->getTimeList().push_back(time); + } + + return packet->getNbData(); } SpecParamData* createParamData() { diff --git a/src/ParamGetImpl/SpeasyProxyInterface/SpeasyProxyParamData.hh b/src/ParamGetImpl/SpeasyProxyInterface/SpeasyProxyParamData.hh index 81dccca..bb001cc 100644 --- a/src/ParamGetImpl/SpeasyProxyInterface/SpeasyProxyParamData.hh +++ b/src/ParamGetImpl/SpeasyProxyInterface/SpeasyProxyParamData.hh @@ -9,6 +9,12 @@ #ifndef SPEASYPROXYPARAMDATA_HH_ #define SPEASYPROXYPARAMDATA_HH_ +#include "SpeasyProxyInterfaceConfig.hh" + +#include +#include + + namespace AMDA { namespace SpeasyProxyInterface { @@ -37,10 +43,435 @@ enum SpeasyProxyParamType }; /* + * @brief Define the maximum dimension size + */ +#define PARAMPACKET_MAX_DIMSIZE 10000 + +/* * @brief Define the maximum bytes available for one packet */ -// #define PARAMPACKET_MAX_DATABYTES 80000 -// #define PARAMPACKET_MIN_NBDATABYPACKET 1000 +#define PARAMPACKET_MAX_DATABYTES 80000 +/* + * @brief Define a packet to push to a ParamData + */ +class SpeasyProxyParamDataPacket +{ +public: + /* + * @brief Constructor + */ + SpeasyProxyParamDataPacket(void) : _isInit(false), _containerType(CONTAINER_SCALAR), + _paramType(TYPE_UNKNOWN), _nbData(0), _nbMaxData(0), _dims(NULL), _size(0), + _times(NULL), _datas(NULL), _noData(false), _startTime(0.), _stopTime(0.) + { + } + + /* + * @brief Destructor + */ + ~SpeasyProxyParamDataPacket(void) + { + //free allocated data if needed + free(); + } + + /* + * @brief Init the packet. This function allocate all buffers used by a packet + */ + bool init(SpeasyProxyContainerType containerType, + SpeasyProxyParamType paramType, int dim1 = 0, int dim2 = 0) + { + if (_isInit) + //already init + return false; + + _containerType = containerType; + _paramType = paramType; + _size = 0; + + //compute dimSize + switch (_containerType) + { + case CONTAINER_SCALAR : + //for a scalar, no dims defined and the size is 1 + _size = 1; + _dims = NULL; + break; + case CONTAINER_VECTOR : + //for a vector, 1 "dims" defined and the size is egal to this dimension size + if (dim1 <= 0) + return false; + _size = dim1; + _dims = new int[1]; + _dims[0] = dim1; + break; + case CONTAINER_MATRIX : + //for a matrix, 2 "dims" defined and the size is egal to the multiplication of the two dimensions sizes + if ((dim1 <= 0) || (dim2 <= 0)) + return false; + _size = dim1 * dim2; + _dims = new int[2]; + _dims[0] = dim1; + _dims[1] = dim2; + break; + default: + return false; + } + + //check dimSize + if ((_size <= 0) || (_size >= PARAMPACKET_MAX_DIMSIZE)) + { + if (_dims != NULL) + { + delete[] _dims; + _dims = NULL; + } + return false; + } + + //init times and datas + //use the PARAMPACKET_MAX_DATABYTES to determine the maximum data that's a packet can contain + _nbData = 0; + switch (_paramType) + { + case TYPE_FLOAT : + _nbMaxData = PARAMPACKET_MAX_DATABYTES / (_size * sizeof(float)); + if (_nbMaxData < 1) + _nbMaxData = 1; + _times = new double[_nbMaxData]; + _datas = new float[_nbMaxData*_size]; + break; + case TYPE_DOUBLE : + _nbMaxData = PARAMPACKET_MAX_DATABYTES / (_size * sizeof(double)); + if (_nbMaxData < 1) + _nbMaxData = 1; + _times = new double[_nbMaxData]; + _datas = new double[_nbMaxData*_size]; + break; + case TYPE_SHORT : + _nbMaxData = PARAMPACKET_MAX_DATABYTES / (_size * sizeof(short)); + if (_nbMaxData < 1) + _nbMaxData = 1; + _times = new double[_nbMaxData]; + _datas = new short[_nbMaxData*_size]; + break; + case TYPE_INT : + _nbMaxData = PARAMPACKET_MAX_DATABYTES / (_size * sizeof(int)); + if (_nbMaxData < 1) + _nbMaxData = 1; + _times = new double[_nbMaxData]; + _datas = new int[_nbMaxData*_size]; + break; + default : + if (_dims != NULL) + { + delete[] _dims; + _dims = NULL; + } + return false; + } + + _isInit = true; + return true; + } + + /* + * @brief Free all allocated buffers. This function is called in the destructor + */ + void free() + { + if (!_isInit) + return; + + //free all allocated buffers + if (_dims != NULL) + { + delete[] _dims; + _dims = NULL; + } + + if (_times != NULL) + { + delete[] _times; + _times = NULL; + } + + if (_datas != NULL) + { + switch (_paramType) + { + case TYPE_FLOAT : + delete[] (float*)_datas; + _datas = NULL; + break; + case TYPE_DOUBLE : + delete[] (double*)_datas; + _datas = NULL; + break; + case TYPE_SHORT : + delete[] (short*)_datas; + _datas = NULL; + break; + case TYPE_INT : + delete[] (int*)_datas; + _datas = NULL; + break; + default : + throw; + } + } + + _isInit = false; + } + + /* + * @brief Add one record (a pair of one time and a data) + * If the result is false and "full" parameter is true => the packet is full + */ + bool addData(double time, void* data, bool& full) + { + //add a record in the packet + full = false; + + if (!_isInit) + return false; + + full = (_nbData >= _nbMaxData); + if (full) + //cannot add more data in the packet + return false; + + //set data + void *pos = _datas; + int sizeToCopy; + + switch (_paramType) + { + case TYPE_FLOAT : + sizeToCopy = _size * sizeof(float); + break; + case TYPE_DOUBLE : + sizeToCopy = _size * sizeof(double); + break; + case TYPE_SHORT : + sizeToCopy = _size * sizeof(short); + break; + case TYPE_INT : + sizeToCopy = _size * sizeof(int); + break; + default: + return false; + } + pos = (void*)((intptr_t)pos + (_nbData * sizeToCopy)); + + memcpy(pos,data,sizeToCopy); + + //set time + _times[_nbData] = time; + + ++_nbData; + + full = (_nbData >= _nbMaxData); + return true; + } + + /* + * @brief Get number of record contained by the packet + */ + int getNbData(void) + { + if (!_isInit) + return 0; + return _nbData; + } + + /* + * @brief Get one time by record index + */ + double getTime(int index) + { + if (!_isInit) + return 0.; + if (index >= _nbData) + return 0.; + return _times[index]; + } + + /* + * @brief Get the first dimension size. + */ + int getDim1Size(void) + { + if (!_isInit) + return 0; + switch (_containerType) + { + case CONTAINER_VECTOR : + case CONTAINER_MATRIX : + if (_dims != NULL) + return _dims[0]; + break; + default: + return 0; + } + return 0; + } + + /* + * @brief Get the second dimension size. + */ + int getDim2Size(void) + { + if (!_isInit) + return 0; + switch (_containerType) + { + case CONTAINER_MATRIX : + if (_dims != NULL) + return _dims[1]; + break; + default: + return 0; + } + return 0; + } + + /* + * @brief Get the full size of a data of a record + */ + int getDimsSize(void) + { + return _size; + } + + /* + * @brief Get data type for a record + */ + SpeasyProxyParamType getType() + { + return _paramType; + } + + /* + * @brief Get one data value from record with the index "index" + * And from the dimensions indexes + */ + bool getDataValue(void *val, int index, int dim1Index = 0, int dim2Index = 0) + { + if (!_isInit) + return false; + void *pos = NULL; + + int valueSize = 0; + switch (_paramType) + { + case TYPE_FLOAT : + valueSize = sizeof(float); + break; + case TYPE_DOUBLE : + valueSize = sizeof(double); + break; + case TYPE_SHORT : + valueSize = sizeof(short); + break; + case TYPE_INT : + valueSize = sizeof(int); + break; + default: + return false; + } + + switch (_containerType) + { + case CONTAINER_SCALAR : + pos = (void*)((intptr_t)_datas + (valueSize * _size * index)); + break; + case CONTAINER_VECTOR : + pos = (void*)((intptr_t)_datas + (valueSize * _size * index)); + pos = (void*)((intptr_t)pos + (dim1Index * valueSize)); + break; + case CONTAINER_MATRIX : + pos = (void*)((intptr_t)_datas + (valueSize * _size * index)); + pos = (void*)((intptr_t)pos + (dim1Index * _dims[1] * valueSize)); + pos = (void*)((intptr_t)pos + (dim2Index * valueSize)); + break; + default : + return false; + } + memcpy(val,pos,valueSize); + return true; + } + + bool isNoData() { + return _noData; + } + + double getStartTime() { + return _startTime; + } + + double getStopTime() { + return _stopTime; + } + + void setNoData(double startTime, double stopTime) { + _noData = true; + _startTime = startTime; + _stopTime = stopTime; + } + +private: + /* + * @brief Flag to know if the packet is init + */ + bool _isInit; + + /* + * @brief Container type for data of the packet + */ + SpeasyProxyContainerType _containerType; + + /* + * @brief Param type for data of the packet + */ + SpeasyProxyParamType _paramType; + + /* + * @brief Number of record defined in the packet + */ + int _nbData; + + /* + * @brief Maximum number of record that's can be defined in the packet + */ + int _nbMaxData; + + /* + * @brief Dimensions definition + */ + int* _dims; + + /* + * @brief Full size of a data of a record + */ + int _size; + + /* + * @brief times buffer + */ + double* _times; + + /* + * @brief record data buffer + */ + void* _datas; + + bool _noData; + + double _startTime; + + double _stopTime; +}; } /* SpeasyProxyInterface */ } /* AMDA */ -- libgit2 0.21.2