ParamGetSpeasyProxy.cc 10.2 KB
/*
 * ParamGetSpeasyProxy.cc
 *
 *  Created on: April 25, 2024
 *  Author: AKKODIS - Furkan
 */

#include "SpeasyProxyInterfaceConfig.hh"
#include "ParamGetSpeasyProxy.hh"
// #include "VirtualInstrumentManager.hh"

#include "FileReaderCDF.hh"

#include <stdlib.h>

#include "Parameter.hh"
#include "ParamData.hh"
#include "ParamMgr.hh"
#include "DataSetMgr.hh"
#include "DicError.hh"
#include "TimeUtil.hh"
#include <fstream>
#include "Helper.hh"

#include <iostream>
#define CURL_STATICLIB
#include <curl/curl.h>
#include <curl/easy.h>
#include <cstdio>

namespace AMDA {

namespace SpeasyProxyInterface {

ParamGetSpeasyProxy::ParamGetSpeasyProxy(Parameter &parameter) :
		ParamGet_CRTP<ParamGetSpeasyProxy>(parameter),
		_paramId(""),_type(TYPE_FLOAT),_dim1(1), _dim2(1), _minSampling(1), _container(CONTAINER_SCALAR),_timeStamp(0) 
{

}

ParamGetSpeasyProxy::ParamGetSpeasyProxy(const ParamGetSpeasyProxy &pParamGetSpeasyProxy, Parameter &parameter) :
		ParamGet_CRTP<ParamGetSpeasyProxy>(pParamGetSpeasyProxy, parameter),
		_paramId(pParamGetSpeasyProxy._paramId), _type(pParamGetSpeasyProxy._type),
		_dim1(pParamGetSpeasyProxy._dim1), _dim2(pParamGetSpeasyProxy._dim2),
		_minSampling(pParamGetSpeasyProxy._minSampling), _container(pParamGetSpeasyProxy._container),
		_timeStamp(pParamGetSpeasyProxy._timeStamp)
		 
{

}

ParamGetSpeasyProxy::~ParamGetSpeasyProxy()
{
	//delete the pusher if needed
	// if (_pusher != NULL)
	// 	delete _pusher;
}

std::string ParamGetSpeasyProxy::readFile(const std::string& filename) {
    // Create an input file stream
    std::ifstream file(filename);

    // Check if the file stream was successfully opened
    if (!file.is_open()) {
        throw std::runtime_error("Could not open file");
    }

    // Use a stringstream to read the entire file into a string
    std::stringstream buffer;
    buffer << file.rdbuf();

    // Close the file stream
    file.close();

    // Return the contents of the file as a string
    return buffer.str();
}


/**
 * Downloads timetable file in tmp directory.
 */
std::string ParamGetSpeasyProxy::download(const std::string& pPath) {
	std::string localPath;
	std::string tmpPath(pPath);
	std::transform(tmpPath.begin(), tmpPath.end(), tmpPath.begin(), ::tolower);
	if (!boost::starts_with(tmpPath, "http:")
			&& !boost::starts_with(tmpPath, "https:")) {
		return pPath;
	}
	// download file
	CURL *pCurl;
	CURLcode codes;
	const char *url = pPath.c_str();
	// get tt name to create temp file as tmp_<ttdistantfilename>
	size_t endOfPath = pPath.find_last_of("/");
	if (endOfPath == std::string::npos) {
		endOfPath = pPath.find_last_of("=/\\");
	}
	std::string tmpFile = "./tmp_" + pPath.substr(endOfPath + 1);
	// do download
	pCurl = curl_easy_init();
	if (pCurl) {
		FILE *fptr = fopen(tmpFile.c_str(), "wb");
		if (fptr) {
			curl_easy_setopt(pCurl, CURLOPT_URL, url);
			curl_easy_setopt(pCurl, CURLOPT_WRITEFUNCTION, write_data);
			curl_easy_setopt(pCurl, CURLOPT_SSL_VERIFYPEER, false);
			curl_easy_setopt(pCurl, CURLOPT_SSL_VERIFYHOST, false);
			curl_easy_setopt(pCurl, CURLOPT_WRITEDATA, fptr);
			// create buffer to get potential error string
			std::vector<char> errBuf(1024);
			curl_easy_setopt(pCurl, CURLOPT_ERRORBUFFER, &errBuf[0]);
			codes = curl_easy_perform(pCurl);
			curl_easy_cleanup(pCurl);
			fclose(fptr);
			if (codes == CURLE_OK) {
				localPath = tmpFile;
			} else {
				
				std::string str(errBuf.begin(), errBuf.end());
				LOG4CXX_ERROR(_logger,
						"Unable to download " + pPath + " : "  << str );// errBuf[0]);
			}
		} else {
			LOG4CXX_ERROR(_logger,
					"Unable to download " + pPath + " : not found.");
		}

	} else {
		LOG4CXX_ERROR(_logger,
				"Unable to download " + pPath
						+ " : cUrl cannot be initialized.");
	}
	// else, do nothing
	// return local file or empty string if not downloaded
	return localPath;
}


TimeStamp ParamGetSpeasyProxy::init()
{
	LOG4CXX_DEBUG(gLogger, "ParamGetSpeasyProxy::init");
	
	// Ici, instanciation du Pusher (en fonction du type et des dim définis dans le fichier XML)

	_currentInterval = _timeIntervalList->begin();

	// get the right container

	if (_dim1 * _dim2 == 1) {
		_container = CONTAINER_SCALAR;
	}
	else if ((_dim1 > 1) && (_dim2 > 1)) {
		_container = CONTAINER_MATRIX;
	}
	else {
		_container = CONTAINER_VECTOR;
	}

	//create pusher
	switch (_container) {
		case CONTAINER_SCALAR :
			switch (_type) {
				case TYPE_FLOAT :
					_pusher = new Pusher<TYPE_FLOAT, CONTAINER_SCALAR>();
					break;
				case TYPE_DOUBLE :
					_pusher = new Pusher<TYPE_DOUBLE, CONTAINER_SCALAR>();
					break;
				case TYPE_SHORT :
					_pusher = new Pusher<TYPE_SHORT, CONTAINER_SCALAR>();
					break;
				case TYPE_INT :
					_pusher = new Pusher<TYPE_INT, CONTAINER_SCALAR>();
					break;
				default:
					LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown type" << (const char*)_type);

			}
			break;
		case CONTAINER_VECTOR :
			switch (_type) {
				case TYPE_FLOAT :
					_pusher = new Pusher<TYPE_FLOAT, CONTAINER_VECTOR>(_dim1);
					break;
				case TYPE_DOUBLE :
					_pusher = new Pusher<TYPE_DOUBLE, CONTAINER_VECTOR>(_dim1);
					break;
				case TYPE_SHORT :
					_pusher = new Pusher<TYPE_SHORT, CONTAINER_VECTOR>(_dim1);
					break;
				case TYPE_INT :
					_pusher = new Pusher<TYPE_INT, CONTAINER_VECTOR>(_dim1);
					break;
				default:
					LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown type" << (const char*)_type);
			}
			break;
		case CONTAINER_MATRIX :
			switch (_type) {
				case TYPE_FLOAT :
					_pusher = new Pusher<TYPE_FLOAT, CONTAINER_MATRIX>(_dim1, _dim2);
					break;
                                case TYPE_DOUBLE :
					_pusher = new Pusher<TYPE_DOUBLE, CONTAINER_MATRIX>(_dim1, _dim2);
					break;
                                case TYPE_SHORT :
					_pusher = new Pusher<TYPE_SHORT, CONTAINER_MATRIX>(_dim1, _dim2);
					break;
                                case TYPE_INT :
					_pusher = new Pusher<TYPE_INT, CONTAINER_MATRIX>(_dim1, _dim2);
					break;
				default:
					LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown type" << (const char*)_type);
			}
			break;
		default:
			LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown container format " << (const char*)_container);

	}

	//set link to the param data
	_paramData = ParamDataSPtr(_pusher->getParamData());
	_paramData->setMinSampling(_minSampling);


	if (_timeStamp == 0 && _signatureTrigger != "") {
		// _signatureTrigger must be a name of xml parameter file
		_timeStamp = AMDA::Helpers::Helper::dateOfFile(
				_signatureTrigger.c_str());
	}

	
	return _timeStamp;
}

std::string ParamGetSpeasyProxy::getURLTime(double time){

	std::string newTime = TimeUtil::DD2ISO_TimeDate(TimeUtil::double2DD_TimeDate(time));
	std::string delim = ":";
	std::string toReplaceWith = "%3A";
	size_t pos = 0;

	while ((pos = newTime.find(delim, pos)) != std::string::npos) {
        newTime.replace(pos, delim.length(), toReplaceWith);
        pos += toReplaceWith.length();
    }

	// delim = ".000";
	// newTime.erase(newTime.find(delim),newTime.find(delim) + delim.length());

	return newTime;
}

std::string  ParamGetSpeasyProxy::getSpeasyProxyLink(){
	// http://172.200.0.14:6543/get_data?path=amda%2Fc1_b_gsm&start_time=2018-10-24T00%3A00%3A00&stop_time=2018-10-24T02%3A00%3A00&format=cdf&zstd_compression=false&output_format=CDF_ISTP&pickle_proto=3

	std::string path = "http://172.200.0.14:6543/get_data?path=";
	std::string delim = "/";
	std::string toReplaceWith = "%2F";
	std::string paramId = _paramId;
	size_t pos = 0;

	while ((pos = paramId.find(delim, pos)) != std::string::npos) {
        paramId.replace(pos, delim.length(), toReplaceWith);
        pos += toReplaceWith.length();
    }
	path += paramId + "&start_time=" + getURLTime(_currentInterval->_startTime);
	path += "&stop_time=" + getURLTime(_currentInterval->_stopTime);
	path += "&format=cdf&zstd_compression=false&output_format=CDF_ISTP&pickle_proto=3";
 
	return path;
}

unsigned int ParamGetSpeasyProxy::write()
{
	unsigned int result = 0;
	
	// Getting parameter ids
	std::string timeParamId  = "time";
	std::string croptedParamId = _paramId;
	std::string delim = "/";
	croptedParamId.erase(0, croptedParamId.find(delim) + delim.length());

	// Creation of the data packet
	SpeasyProxyParamDataPacket *packet= new SpeasyProxyParamDataPacket();
	packet->init(_container,_type,_dim1,_dim2);
	
	// Downloading and loading FileReader
	std::string localPath = download(getSpeasyProxyLink());
	FileReaderCDF* fileReaderPtr = new FileReaderCDF();
	bool isOpen = fileReaderPtr->open(localPath);



	if(isOpen)
	{	
		// Call to getParamPacketData to get CDF data

		fileReaderPtr->getParamPacketData(timeParamId, croptedParamId, packet);

		
		// Call to the put function of pusher to initiate the ParamData
		// result == nb of data in ParamData
		result += _pusher->put(packet);

		// Push up the information if all time interval was processed.
		_paramData->getIndexInfo()._timeIntToProcessChanged = true; // ATTENTION - valeur forcée pour les tests!!!!!
		if (true) {
			++_currentInterval;
			_paramData->getIndexInfo()._noMoreTimeInt =  (_currentInterval == _timeIntervalList->end());
		}

		_paramData->getIndexInfo()._nbDataToProcess = result;

		// if time interval changed store index which delimit the end of the time interval.
		if (_paramData->getIndexInfo()._timeIntToProcessChanged) {
			unsigned int lEndTimeIntIndex = _paramData->getIndexInfo()._nbDataToProcess;
			_paramData->getIndexInfo()._endTimeIntIndexList.push_back(lEndTimeIntIndex);
		}
		else {
			// Nothing to do.
		}
		if (result != 0) 
			remove(localPath.c_str());
		return result;
	}

	//close the file
	if (!fileReaderPtr->close())
	{
		LOG4CXX_ERROR(gLogger, "ParamGetSpeasy::init - Cannot close file " << localPath);
	}

	return result;
}

void ParamGetSpeasyProxy::updateInfo(Parameter & parameter)
{
	LOG4CXX_DEBUG(gLogger, "ParamGetSpeasyProxy::updateInfo - " << parameter.getId());
	if (parameter.getInfoId().empty())
		parameter.setInfoId(parameter.getId());

	//Param info
	AMDA::Info::ParamInfoSPtr paramInfo = AMDA::Info::ParamMgr::getInstance()->getParamInfoFromId(parameter.getInfoId(),true);

	if (paramInfo == nullptr)
		return;
}

size_t write_data(void *ptr, size_t size, size_t nmemb, FILE *stream) {
	size_t written;
	written = fwrite(ptr, size, nmemb, stream);
	return written;
}


} /* namespace SpeasyProxyInterface */
} /* namespace AMDA */