/* * ParamGetSpeasyProxy.cc * * Created on: April 25, 2024 * Author: AKKODIS - Furkan */ #include "SpeasyProxyInterfaceConfig.hh" #include "ParamGetSpeasyProxy.hh" // #include "VirtualInstrumentManager.hh" #include "FileReaderCDF.hh" #include #include "Parameter.hh" #include "ParamData.hh" #include "ParamMgr.hh" #include "DataSetMgr.hh" #include "DicError.hh" #include "TimeUtil.hh" #include #include "Helper.hh" #include #define CURL_STATICLIB #include #include #include namespace AMDA { namespace SpeasyProxyInterface { ParamGetSpeasyProxy::ParamGetSpeasyProxy(Parameter ¶meter) : ParamGet_CRTP(parameter), _paramId(""),_type(TYPE_FLOAT),_dim1(1), _dim2(1), _minSampling(1), _container(CONTAINER_SCALAR),_timeStamp(0) { } ParamGetSpeasyProxy::ParamGetSpeasyProxy(const ParamGetSpeasyProxy &pParamGetSpeasyProxy, Parameter ¶meter) : ParamGet_CRTP(pParamGetSpeasyProxy, parameter), _paramId(pParamGetSpeasyProxy._paramId), _type(pParamGetSpeasyProxy._type), _dim1(pParamGetSpeasyProxy._dim1), _dim2(pParamGetSpeasyProxy._dim2), _minSampling(pParamGetSpeasyProxy._minSampling), _container(pParamGetSpeasyProxy._container), _timeStamp(pParamGetSpeasyProxy._timeStamp) { } ParamGetSpeasyProxy::~ParamGetSpeasyProxy() { //delete the pusher if needed // if (_pusher != NULL) // delete _pusher; } std::string ParamGetSpeasyProxy::readFile(const std::string& filename) { // Create an input file stream std::ifstream file(filename); // Check if the file stream was successfully opened if (!file.is_open()) { throw std::runtime_error("Could not open file"); } // Use a stringstream to read the entire file into a string std::stringstream buffer; buffer << file.rdbuf(); // Close the file stream file.close(); // Return the contents of the file as a string return buffer.str(); } /** * Downloads timetable file in tmp directory. */ std::string ParamGetSpeasyProxy::download(const std::string& pPath) { std::string localPath; std::string tmpPath(pPath); std::transform(tmpPath.begin(), tmpPath.end(), tmpPath.begin(), ::tolower); if (!boost::starts_with(tmpPath, "http:") && !boost::starts_with(tmpPath, "https:")) { return pPath; } // download file CURL *pCurl; CURLcode codes; const char *url = pPath.c_str(); // get tt name to create temp file as tmp_ size_t endOfPath = pPath.find_last_of("/"); if (endOfPath == std::string::npos) { endOfPath = pPath.find_last_of("=/\\"); } std::string tmpFile = "./tmp_" + pPath.substr(endOfPath + 1); // do download pCurl = curl_easy_init(); if (pCurl) { FILE *fptr = fopen(tmpFile.c_str(), "wb"); if (fptr) { curl_easy_setopt(pCurl, CURLOPT_URL, url); curl_easy_setopt(pCurl, CURLOPT_WRITEFUNCTION, write_data); curl_easy_setopt(pCurl, CURLOPT_SSL_VERIFYPEER, false); curl_easy_setopt(pCurl, CURLOPT_SSL_VERIFYHOST, false); curl_easy_setopt(pCurl, CURLOPT_WRITEDATA, fptr); // create buffer to get potential error string std::vector errBuf(1024); curl_easy_setopt(pCurl, CURLOPT_ERRORBUFFER, &errBuf[0]); codes = curl_easy_perform(pCurl); curl_easy_cleanup(pCurl); fclose(fptr); if (codes == CURLE_OK) { localPath = tmpFile; } else { std::string str(errBuf.begin(), errBuf.end()); LOG4CXX_ERROR(_logger, "Unable to download " + pPath + " : " << str );// errBuf[0]); } } else { LOG4CXX_ERROR(_logger, "Unable to download " + pPath + " : not found."); } } else { LOG4CXX_ERROR(_logger, "Unable to download " + pPath + " : cUrl cannot be initialized."); } // else, do nothing // return local file or empty string if not downloaded return localPath; } TimeStamp ParamGetSpeasyProxy::init() { LOG4CXX_DEBUG(gLogger, "ParamGetSpeasyProxy::init"); // Ici, instanciation du Pusher (en fonction du type et des dim définis dans le fichier XML) _currentInterval = _timeIntervalList->begin(); // get the right container if (_dim1 * _dim2 == 1) { _container = CONTAINER_SCALAR; } else if ((_dim1 > 1) && (_dim2 > 1)) { _container = CONTAINER_MATRIX; } else { _container = CONTAINER_VECTOR; } //create pusher switch (_container) { case CONTAINER_SCALAR : switch (_type) { case TYPE_FLOAT : _pusher = new Pusher(); break; case TYPE_DOUBLE : _pusher = new Pusher(); break; case TYPE_SHORT : _pusher = new Pusher(); break; case TYPE_INT : _pusher = new Pusher(); break; default: LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown type" << (const char*)_type); } break; case CONTAINER_VECTOR : switch (_type) { case TYPE_FLOAT : _pusher = new Pusher(_dim1); break; case TYPE_DOUBLE : _pusher = new Pusher(_dim1); break; case TYPE_SHORT : _pusher = new Pusher(_dim1); break; case TYPE_INT : _pusher = new Pusher(_dim1); break; default: LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown type" << (const char*)_type); } break; case CONTAINER_MATRIX : switch (_type) { case TYPE_FLOAT : _pusher = new Pusher(_dim1, _dim2); break; case TYPE_DOUBLE : _pusher = new Pusher(_dim1, _dim2); break; case TYPE_SHORT : _pusher = new Pusher(_dim1, _dim2); break; case TYPE_INT : _pusher = new Pusher(_dim1, _dim2); break; default: LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown type" << (const char*)_type); } break; default: LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown container format " << (const char*)_container); } //set link to the param data _paramData = ParamDataSPtr(_pusher->getParamData()); _paramData->setMinSampling(_minSampling); if (_timeStamp == 0 && _signatureTrigger != "") { // _signatureTrigger must be a name of xml parameter file _timeStamp = AMDA::Helpers::Helper::dateOfFile( _signatureTrigger.c_str()); } return _timeStamp; } std::string ParamGetSpeasyProxy::getURLTime(double time){ std::string newTime = TimeUtil::DD2ISO_TimeDate(TimeUtil::double2DD_TimeDate(time)); std::string delim = ":"; std::string toReplaceWith = "%3A"; size_t pos = 0; while ((pos = newTime.find(delim, pos)) != std::string::npos) { newTime.replace(pos, delim.length(), toReplaceWith); pos += toReplaceWith.length(); } // delim = ".000"; // newTime.erase(newTime.find(delim),newTime.find(delim) + delim.length()); return newTime; } std::string ParamGetSpeasyProxy::getSpeasyProxyLink(){ // http://172.200.0.14:6543/get_data?path=amda%2Fc1_b_gsm&start_time=2018-10-24T00%3A00%3A00&stop_time=2018-10-24T02%3A00%3A00&format=cdf&zstd_compression=false&output_format=CDF_ISTP&pickle_proto=3 std::string path = "http://172.200.0.14:6543/get_data?path="; std::string delim = "/"; std::string toReplaceWith = "%2F"; std::string paramId = _paramId; size_t pos = 0; while ((pos = paramId.find(delim, pos)) != std::string::npos) { paramId.replace(pos, delim.length(), toReplaceWith); pos += toReplaceWith.length(); } path += paramId + "&start_time=" + getURLTime(_currentInterval->_startTime); path += "&stop_time=" + getURLTime(_currentInterval->_stopTime); path += "&format=cdf&zstd_compression=false&output_format=CDF_ISTP&pickle_proto=3"; return path; } unsigned int ParamGetSpeasyProxy::write() { unsigned int result = 0; // Getting parameter ids std::string timeParamId = "time"; std::string croptedParamId = _paramId; std::string delim = "/"; croptedParamId.erase(0, croptedParamId.find(delim) + delim.length()); // Creation of the data packet SpeasyProxyParamDataPacket *packet= new SpeasyProxyParamDataPacket(); packet->init(_container,_type,_dim1,_dim2); // Downloading and loading FileReader std::string localPath = download(getSpeasyProxyLink()); FileReaderCDF* fileReaderPtr = new FileReaderCDF(); bool isOpen = fileReaderPtr->open(localPath); if(isOpen) { // Call to getParamPacketData to get CDF data fileReaderPtr->getParamPacketData(timeParamId, croptedParamId, packet); // Call to the put function of pusher to initiate the ParamData // result == nb of data in ParamData result += _pusher->put(packet); // Push up the information if all time interval was processed. _paramData->getIndexInfo()._timeIntToProcessChanged = true; // ATTENTION - valeur forcée pour les tests!!!!! if (true) { ++_currentInterval; _paramData->getIndexInfo()._noMoreTimeInt = (_currentInterval == _timeIntervalList->end()); } _paramData->getIndexInfo()._nbDataToProcess = result; // if time interval changed store index which delimit the end of the time interval. if (_paramData->getIndexInfo()._timeIntToProcessChanged) { unsigned int lEndTimeIntIndex = _paramData->getIndexInfo()._nbDataToProcess; _paramData->getIndexInfo()._endTimeIntIndexList.push_back(lEndTimeIntIndex); } else { // Nothing to do. } if (result != 0) remove(localPath.c_str()); return result; } //close the file if (!fileReaderPtr->close()) { LOG4CXX_ERROR(gLogger, "ParamGetSpeasy::init - Cannot close file " << localPath); } return result; } void ParamGetSpeasyProxy::updateInfo(Parameter & parameter) { LOG4CXX_DEBUG(gLogger, "ParamGetSpeasyProxy::updateInfo - " << parameter.getId()); if (parameter.getInfoId().empty()) parameter.setInfoId(parameter.getId()); //Param info AMDA::Info::ParamInfoSPtr paramInfo = AMDA::Info::ParamMgr::getInstance()->getParamInfoFromId(parameter.getInfoId(),true); if (paramInfo == nullptr) return; } size_t write_data(void *ptr, size_t size, size_t nmemb, FILE *stream) { size_t written; written = fwrite(ptr, size, nmemb, stream); return written; } } /* namespace SpeasyProxyInterface */ } /* namespace AMDA */