ParamGetSpeasyProxy.cc 8.9 KB
/*
 * ParamGetSpeasyProxy.cc
 *
 *  Created on: April 25, 2024
 *  Author: AKKODIS - Furkan
 */

#include "SpeasyProxyInterfaceConfig.hh"
#include "ParamGetSpeasyProxy.hh"
// #include "VirtualInstrumentManager.hh"

#include "FileReaderCDF.hh"

#include <stdlib.h>

#include "Parameter.hh"
#include "ParamData.hh"
#include "ParamMgr.hh"
#include "DataSetMgr.hh"
#include "DicError.hh"
#include "TimeUtil.hh"
#include <fstream>
#include "Helper.hh"
#define CURL_STATICLIB
#include <curl/curl.h>
#include <curl/easy.h>

namespace AMDA {

namespace SpeasyProxyInterface {

ParamGetSpeasyProxy::ParamGetSpeasyProxy(Parameter &parameter) :
		ParamGet_CRTP<ParamGetSpeasyProxy>(parameter),
		_paramId(""),_type(TYPE_FLOAT),_dim1(1), _dim2(1), _minSampling(1), _container(CONTAINER_SCALAR),_timeStamp(0) 
{

}

ParamGetSpeasyProxy::ParamGetSpeasyProxy(const ParamGetSpeasyProxy &pParamGetSpeasyProxy, Parameter &parameter) :
		ParamGet_CRTP<ParamGetSpeasyProxy>(pParamGetSpeasyProxy, parameter),
		_paramId(pParamGetSpeasyProxy._paramId), _type(pParamGetSpeasyProxy._type),
		_dim1(pParamGetSpeasyProxy._dim1), _dim2(pParamGetSpeasyProxy._dim2),
		_minSampling(pParamGetSpeasyProxy._minSampling), _container(pParamGetSpeasyProxy._container),
		_timeStamp(pParamGetSpeasyProxy._timeStamp)
		 
{

}

ParamGetSpeasyProxy::~ParamGetSpeasyProxy()
{
	//delete the pusher if needed
	// if (_pusher != NULL)
	// 	delete _pusher;
}

std::string ParamGetSpeasyProxy::readFile(const std::string& filename) {
    // Create an input file stream
    std::ifstream file(filename);

    // Check if the file stream was successfully opened
    if (!file.is_open()) {
        throw std::runtime_error("Could not open file");
    }

    // Use a stringstream to read the entire file into a string
    std::stringstream buffer;
    buffer << file.rdbuf();

    // Close the file stream
    file.close();

    // Return the contents of the file as a string
    return buffer.str();
}


/**
 * Downloads timetable file in tmp directory.
 */
std::string ParamGetSpeasyProxy::download(const std::string& pPath) {
	std::string localPath;
	std::string tmpPath(pPath);
	std::transform(tmpPath.begin(), tmpPath.end(), tmpPath.begin(), ::tolower);
	if (!boost::starts_with(tmpPath, "http:")
			&& !boost::starts_with(tmpPath, "https:")) {
		return pPath;
	}
	// download file
	CURL *pCurl;
	CURLcode codes;
	const char *url = pPath.c_str();
	// get tt name to create temp file as tmp_<ttdistantfilename>
	size_t endOfPath = pPath.find_last_of("/");
	if (endOfPath == std::string::npos) {
		endOfPath = pPath.find_last_of("=/\\");
	}
	std::string tmpFile = "./tmp_" + pPath.substr(endOfPath + 1);
	// do download
	pCurl = curl_easy_init();
	if (pCurl) {
		FILE *fptr = fopen(tmpFile.c_str(), "wb");
		if (fptr) {
			curl_easy_setopt(pCurl, CURLOPT_URL, url);
			curl_easy_setopt(pCurl, CURLOPT_WRITEFUNCTION, write_data);
			curl_easy_setopt(pCurl, CURLOPT_SSL_VERIFYPEER, false);
			curl_easy_setopt(pCurl, CURLOPT_SSL_VERIFYHOST, false);
			curl_easy_setopt(pCurl, CURLOPT_WRITEDATA, fptr);
			// create buffer to get potential error string
			std::vector<char> errBuf(1024);
			curl_easy_setopt(pCurl, CURLOPT_ERRORBUFFER, &errBuf[0]);
			codes = curl_easy_perform(pCurl);
			curl_easy_cleanup(pCurl);
			fclose(fptr);
			if (codes == CURLE_OK) {
				localPath = tmpFile;
			} else {
				
				std::string str(errBuf.begin(), errBuf.end());
				LOG4CXX_ERROR(_logger,
						"Unable to download " + pPath + " : "  << str );// errBuf[0]);
			}
		} else {
			LOG4CXX_ERROR(_logger,
					"Unable to download " + pPath + " : not found.");
		}

	} else {
		LOG4CXX_ERROR(_logger,
				"Unable to download " + pPath
						+ " : cUrl cannot be initialized.");
	}
	// else, do nothing
	// return local file or empty string if not downloaded
	return localPath;
}


TimeStamp ParamGetSpeasyProxy::init()
{
	LOG4CXX_DEBUG(gLogger, "ParamGetSpeasyProxy::init");
	
	// Ici, instanciation du Pusher (en fonction du type et des dim définis dans le fichier XML)

	_currentInterval = _timeIntervalList->begin();

	// get the right container

	if (_dim1 * _dim2 == 1) {
		_container = CONTAINER_SCALAR;
	}
	else if ((_dim1 > 1) && (_dim2 > 1)) {
		_container = CONTAINER_MATRIX;
	}
	else {
		_container = CONTAINER_VECTOR;
	}

	//create pusher
	switch (_container) {
		case CONTAINER_SCALAR :
			switch (_type) {
				case TYPE_FLOAT :
					_pusher = new Pusher<TYPE_FLOAT, CONTAINER_SCALAR>();
					break;
				case TYPE_DOUBLE :
					_pusher = new Pusher<TYPE_DOUBLE, CONTAINER_SCALAR>();
					break;
				case TYPE_SHORT :
					_pusher = new Pusher<TYPE_SHORT, CONTAINER_SCALAR>();
					break;
				case TYPE_INT :
					_pusher = new Pusher<TYPE_INT, CONTAINER_SCALAR>();
					break;
				default:
					LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown type" << (const char*)_type);

			}
			break;
		case CONTAINER_VECTOR :
			switch (_type) {
				case TYPE_FLOAT :
					_pusher = new Pusher<TYPE_FLOAT, CONTAINER_VECTOR>(_dim1);
					break;
				case TYPE_DOUBLE :
					_pusher = new Pusher<TYPE_DOUBLE, CONTAINER_VECTOR>(_dim1);
					break;
				case TYPE_SHORT :
					_pusher = new Pusher<TYPE_SHORT, CONTAINER_VECTOR>(_dim1);
					break;
				case TYPE_INT :
					_pusher = new Pusher<TYPE_INT, CONTAINER_VECTOR>(_dim1);
					break;
				default:
					LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown type" << (const char*)_type);
			}
			break;
		case CONTAINER_MATRIX :
			switch (_type) {
				case TYPE_FLOAT :
					_pusher = new Pusher<TYPE_FLOAT, CONTAINER_MATRIX>(_dim1, _dim2);
					break;
                                case TYPE_DOUBLE :
					_pusher = new Pusher<TYPE_DOUBLE, CONTAINER_MATRIX>(_dim1, _dim2);
					break;
                                case TYPE_SHORT :
					_pusher = new Pusher<TYPE_SHORT, CONTAINER_MATRIX>(_dim1, _dim2);
					break;
                                case TYPE_INT :
					_pusher = new Pusher<TYPE_INT, CONTAINER_MATRIX>(_dim1, _dim2);
					break;
				default:
					LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown type" << (const char*)_type);
			}
			break;
		default:
			LOG4CXX_ERROR(gLogger, "ParamGetSpeasyProxy::init() - Unknown container format " << (const char*)_container);

	}

	//set link to the param data
	_paramData = ParamDataSPtr(_pusher->getParamData());
	_paramData->setMinSampling(_minSampling);


	if (_timeStamp == 0 && _signatureTrigger != "") {
		// _signatureTrigger must be a name of xml parameter file
		_timeStamp = AMDA::Helpers::Helper::dateOfFile(
				_signatureTrigger.c_str());
	}

	
	return _timeStamp;
}


unsigned int ParamGetSpeasyProxy::write()
{
	unsigned int result = 0;

	//std::string link = "http://172.200.0.14:6543/get_data?path=amda%2Fimf&start_time=2008-01-01T00%3A00%3A00&stop_time=2008-01-02T00%3A00%3A00&format=json&zstd_compression=false&pickle_proto=3";
	std::string localPath = "/home/amda_admin/AMDA/AMDA_Kernel/cdf_speasy.cdf";
	FileReaderCDF* fileReaderPtr = new FileReaderCDF();
	bool isOpen = fileReaderPtr->open(localPath);
	if(isOpen)
	{	
		std::string timeParamId  = "time";
		std::string croptedParamId = _paramId;
		std::string delim = "/";
		croptedParamId.erase(0, croptedParamId.find(delim) + delim.length());

		// Call to getParamPacketData to get CDF data

		SpeasyProxyParamDataPacket *packet= new SpeasyProxyParamDataPacket();
		packet->init(_container,_type,_dim1,_dim2);

		fileReaderPtr->getParamPacketData(timeParamId, croptedParamId, packet);

		
		// Call to the put function of pusher to initiate the ParamData
		// result == nb of data in ParamData
		result += _pusher->put(packet);

		// Push up the information if all time interval was processed.
		_paramData->getIndexInfo()._timeIntToProcessChanged = true; // ATTENTION - valeur forcée pour les tests!!!!!
		if (true) {
			++_currentInterval;
			_paramData->getIndexInfo()._noMoreTimeInt =  (_currentInterval == _timeIntervalList->end());
		}

		_paramData->getIndexInfo()._nbDataToProcess = result;

		// if time interval changed store index which delimit the end of the time interval.
		if (_paramData->getIndexInfo()._timeIntToProcessChanged) {
			unsigned int lEndTimeIntIndex = _paramData->getIndexInfo()._nbDataToProcess;
			_paramData->getIndexInfo()._endTimeIntIndexList.push_back(lEndTimeIntIndex);
		}
		else {
			// Nothing to do.
		}

		return result;
	}

	//close the file
	if (!fileReaderPtr->close())
	{
		LOG4CXX_ERROR(gLogger, "ParamGetSpeasy::init - Cannot close file " << localPath);
	}

	return result;
}

void ParamGetSpeasyProxy::updateInfo(Parameter & parameter)
{
	LOG4CXX_DEBUG(gLogger, "ParamGetSpeasyProxy::updateInfo - " << parameter.getId());
	if (parameter.getInfoId().empty())
		parameter.setInfoId(parameter.getId());

	//Param info
	AMDA::Info::ParamInfoSPtr paramInfo = AMDA::Info::ParamMgr::getInstance()->getParamInfoFromId(parameter.getInfoId(),true);

	if (paramInfo == nullptr)
		return;
}

size_t write_data(void *ptr, size_t size, size_t nmemb, FILE *stream) {
	size_t written;
	written = fwrite(ptr, size, nmemb, stream);
	return written;
}


} /* namespace SpeasyProxyInterface */
} /* namespace AMDA */