From 58faca3ff01761c84934a5c597366332754815b4 Mon Sep 17 00:00:00 2001 From: Benjamin Renard Date: Wed, 30 Sep 2015 16:35:00 +0200 Subject: [PATCH] First implementation of new cache system --- src/INCLUDE/DD_comm.h | 26 +++++++++++++++++--------- src/SERVER/DD_Cache.c | 651 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/SERVER/DD_CacheLock.c | 143 ----------------------------------------------------------------------------------------------------------------------------------------------- src/SERVER/DD_GetData.c | 45 +++++++++++---------------------------------- src/SERVER/DD_Server.c | 10 ++++------ src/SERVER/ncfileop.c | 170 ++++++++++---------------------------------------------------------------------------------------------------------------------------------------------------------------- 6 files changed, 693 insertions(+), 352 deletions(-) create mode 100644 src/SERVER/DD_Cache.c delete mode 100644 src/SERVER/DD_CacheLock.c diff --git a/src/INCLUDE/DD_comm.h b/src/INCLUDE/DD_comm.h index 033f95b..94a5037 100755 --- a/src/INCLUDE/DD_comm.h +++ b/src/INCLUDE/DD_comm.h @@ -183,7 +183,9 @@ */ /*-------------------- TYPEDEF ---------------------------*/ -typedef struct { char names[CASHLEN][MAXSETLENGTH]; +typedef struct { + char CacheFilePath[PATHLENGTH]; + char names[CASHLEN][MAXSETLENGTH]; long times[CASHLEN]; int FileOpen[CASHLEN]; /* 1 if file is open */ int ID; /* ID of open cach file */ @@ -275,24 +277,29 @@ enum LockStatus {NOLOCK, LOCKED, LOCKREMOVED}; extern int OpenInstr(char *InstrName); /* - * Init shared memory for VI cache lock system + * Init shared memory for cache system */ -extern void CacheLock_Init(); +extern void Cache_Init(); /* - * Free shared memory used for VI cache lock system + * Free shared memory used for cache system */ -extern void CacheLock_Free(); +extern void Cache_Free(); /* - * Lock cache for a specific VI + * Free shared memory used for cache system */ -extern void CacheLock_Lock(char* VIID); +extern int Cache_RequestDataFileAccess(DD_Var_t *D, char* dataFileName); /* - * Unlock cache for a specific VI + * Free shared memory used for cache system */ -extern void CacheLock_Unlock(char* VIID); +extern int Cache_ReleaseDataFileAccess(DD_Var_t *D); + +/* + * Close cache file + */ +extern int Cache_CloseFile(DD_Var_t *D); /* * Open Virtual instrument by name and returns the ID @@ -357,6 +364,7 @@ size_t MaxRecord(int ncID); * Inspect all variables and dimensions of the file (exept Time associated) and * returm maximal record number to transmit */ + /*------------------ Global constants ----------------------------------*/ static u_int xdrlen[] = {4,4,4,8,4}; static size_t unixlen[] = {sizeof(char),sizeof(int),sizeof(float),sizeof(double),sizeof(short)}; diff --git a/src/SERVER/DD_Cache.c b/src/SERVER/DD_Cache.c new file mode 100644 index 0000000..a29a96e --- /dev/null +++ b/src/SERVER/DD_Cache.c @@ -0,0 +1,651 @@ +/*===================================================================== + * DD SYSTEM base package + * DD_Server library + * DD_CacheLock.c + * V.1.0 + * Last revision: + * May 29 2015: Version 1.0. First implementation of a lock system for concurrent access to a virtual instrument + */ + +#include +#include +#include +#include +#include +#include +#include "DD.h" +#include "DD_comm.h" + +//Max number of VI that can be used at the same time +#define CACHE_MAXVI 30 +//Max number of clients that can be used a data file of a VI at the same time +#define CACHE_MAXCLIENT 30 + +//Use state of a cache element +#define CACHE_NOTUSED 0 +#define CACHE_USED 1 + +typedef struct +{ + //Name of the related data file + char name[MAXFILENAME]; + //List of client that's currently use this data file + pid_t clientPIDS[CACHE_MAXCLIENT]; + //State flag (CACHE_NOTUSED or CACHE_USED) + int state; +} CacheDataFile; + +typedef struct +{ + //Virtual instrument ID + char VIID[MAXSETLENGTH]; + //List of data files + CacheDataFile dataFiles[CASHLEN]; + //State flag (CACHE_NOTUSED or CACHE_USED) + int state; + //Mutex for concurrential access + pthread_mutex_t lock; + //PID of the process that's currently lock the mutex + pid_t lockPID; +} CacheVIFile; + +typedef struct +{ + //List of currently used VI + CacheVIFile VIFiles[CACHE_MAXVI]; + //Mutex for concurrential access + pthread_mutex_t lock; + //PID of the process that's currently lock the mutex + pid_t lockPID; +} CacheData; + +extern int Verbose; + +/* + * Pointer to structure of Cache Data + */ +static CacheData* cache_data = NULL; + +/* + * Refresh "CACHE" table from Cache file + */ +int Cache_RefreshTable(DD_Var_t *D) +{ + int status; /* Error indicator */ + + if (D->Cash.ID < 0) + { + if(Verbose) fprintf(stderr,"Cache_RefreshTable. Cache file %s not opened\n", D->Cash.CacheFilePath); + D->LastFileStatus = CACHERR; + return CACHERR; + } + + status = nc_sync(D->Cash.ID); + if(status != NC_NOERR) + { + if(Verbose) fprintf(stderr,"Cache_RefreshTable. Error to synchronize cache file %s, message:%s\n", D->Cash.CacheFilePath, nc_strerror(status)); + D->LastFileStatus = CACHERR; + return CACHERR; + } + + size_t cashStart[2] = {0L,0L}; + size_t cashCount[2] = {1L,MAXFILENAME}; + + for(cashStart[0] = 0; cashStart[0] < CASHLEN; cashStart[0]++) + { + status = nc_get_vara_text(D->Cash.ID, D->Cash.nameID,cashStart,cashCount, D->Cash.names[cashStart[0]]); + if(status != NC_NOERR) + { + if(Verbose) fprintf(stderr,"Cache_RefreshTable. Error to synchronize cache file name, message: %s\n", nc_strerror(status)); + D->LastFileStatus = CACHERR; + return CACHERR; + } + status = nc_get_vara_long(D->Cash.ID, D->Cash.timeID,cashStart,cashCount,&(D->Cash.times[cashStart[0]])); + if(status != NC_NOERR) + { + if(Verbose) fprintf(stderr,"Cache_RefreshTable. Error to synchronize cache time, message: %s\n", nc_strerror(status)); + D->LastFileStatus = CACHERR; + return CACHERR; + } + } + + if(Verbose) fprintf(stderr,"Cache_RefreshTable. Cache table is synchronized\n"); + + return OK; +} + +/* + * Close cache file + */ +int Cache_CloseFile(DD_Var_t *D) +{ + int status; /* Error indicator */ + + if(D->Cash.ID >= 0) + { + status = nc_close(D->Cash.ID); + D->Cash.ID = -1; + if(Verbose) + fprintf(stderr,"CloseCacheFile()"); + } +} + +/* + * Open cache file + */ +int Cache_OpenFile(DD_Var_t *D) +{ + int status; /* Error indicator */ + + if (D->Cash.ID >= 0) + return (OK); + + status = nc_open(D->Cash.CacheFilePath,NC_WRITE|NC_SHARE,&(D->Cash.ID)); /* Cach file */ + if(status != NC_NOERR) + { + if(Verbose) fprintf(stderr, "Cache_OpenFile. Error to open cache file %s, message:%s\n", D->Cash.CacheFilePath, nc_strerror(status)); + D->LastFileStatus = CACHERR; + return(CACHERR); + } + status = nc_inq_varid(D->Cash.ID,"name",&(D->Cash.nameID)); + status = nc_inq_varid(D->Cash.ID,"time",&(D->Cash.timeID)); + status = nc_inq_varid(D->Cash.ID,"fopen",&(D->Cash.fopenID)); + D->CurrCushN = -1; + + if(status != NC_NOERR) + { + if(Verbose) fprintf(stderr,"Cache_OpenFile. Error to open cache file %s, message:%s\n", D->Cash.CacheFilePath, nc_strerror(status)); + D->LastFileStatus = CACHERR; + return(CACHERR); + } + + if(Verbose) fprintf(stderr,"Cache_OpenFile. Cache file %s is open\n", D->Cash.CacheFilePath); + return (OK); +} + +/* + * Decompress a data file + */ +void Cache_DecompressDataFile(DD_Var_t *D, char* dataFileName) +{ + char FullName[PATHLENGTH]; + strcpy(FullName,D->path); + strcat(FullName, dataFileName); + + char command[300]; + strcpy(command, "gunzip -c "); + strcat(command,FullName); + strcat(command, ".gz > "); + strcat(command, FullName); + system(command); /* File is unzipped */ + sprintf(command, "chmod g+w %s\0",FullName); + system(command); /* File is unzipped */ +} + +/* + * Compress a data file + */ +void Cache_CompressDataFile(DD_Var_t *D, char* dataFileName) +{ + //This is not really a compression, just a supression of the uncompressed data file + char command[300]; + strcpy(command, "rm "); + strcat(command,D->path); + strcat(command,dataFileName); + strcat(command, " &"); + system(command); +} + +/* + * Check if a process is running + */ +int Cache_PIDIsRunning(pid_t pid) +{ + return (kill(pid, 0) == 0 ? 1 : 0); +} + +/* + * Release lock for dead process + */ +void Cache_ForceUnlockIfNotRunning(pthread_mutex_t *mutex, pid_t* lockPID) +{ + if (pthread_mutex_trylock(mutex) != 0) + { + //Test if the child process is alive + if (Cache_PIDIsRunning((*lockPID)) == 0) + { + //Force unlock + (*lockPID) = 0; + pthread_mutex_unlock(mutex); + } + } + else + { + (*lockPID) = 0; + pthread_mutex_unlock(mutex); + } +} + +/* + * Unlock access + */ +void Cache_Unlock(pthread_mutex_t *mutex, pid_t* lockPID) +{ + int PID = getpid(); + if ((*lockPID) == PID) + { + (*lockPID) = 0; + pthread_mutex_unlock(mutex); + } +} + +/* + * Lock access + */ +void Cache_Lock(pthread_mutex_t* mutex, pid_t* lockPID) +{ + int PID = getpid(); + int attempt = 0; + while (1) + { + if (pthread_mutex_trylock(mutex) == 0) + { + //Mutex now lock for this PID + (*lockPID) = PID; + return; + } + //Not available + if ((attempt > 10) && ((*lockPID) != 0)) + { + Cache_ForceUnlockIfNotRunning(mutex, lockPID); + attempt = 0; + } + ++attempt; + //sleep(2); + } +} + +/* + * Init the shared memory used to manage concurrent access to cache + */ +void Cache_Init() +{ + if (cache_data != NULL) + return; + + if(Verbose) fprintf(stderr,"Cache_Init\n"); + + // place our shared data in shared memory + int prot = PROT_READ | PROT_WRITE; + int flags = MAP_SHARED | MAP_ANONYMOUS; + cache_data = mmap(NULL, sizeof(CacheData), prot, flags, -1, 0); + + memset(cache_data, 0, sizeof(CacheData)); + + // initialise mutex so it works properly in shared memory + // global mutex for global cache data access + pthread_mutexattr_t attr; + pthread_mutexattr_init(&attr); + pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(&cache_data->lock, &attr); + + int i, j; + for (i = 0; i < CACHE_MAXVI; ++i) + { + // mutex for VI cache data access + pthread_mutexattr_t attr; + pthread_mutexattr_init(&attr); + pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); + pthread_mutex_init(&cache_data->VIFiles[i].lock, &attr); + } +} + +/* + * Free the shared memory used by the cache manager + */ +void Cache_Free() +{ + if(Verbose) fprintf(stderr,"Cache_Free\n"); + if (cache_data != NULL) + munmap(cache_data, sizeof(CacheData)); + cache_data = NULL; +} + +/* + * Get and lock the access to a virtual instrument + */ +int Cache_GetAndLockVI(char* VIID) +{ + if (cache_data == NULL) + return; + + int i; + int PID = getpid(); + + Cache_Lock(&(cache_data->lock), &(cache_data->lockPID)); + + for (i = 0; i < CACHE_MAXVI; ++i) + { + if ((cache_data->VIFiles[i].state == CACHE_USED) && (strcmp(VIID, cache_data->VIFiles[i].VIID) == 0)) + { + //VI already exist in the cache data + Cache_Lock(&(cache_data->VIFiles[i].lock), &(cache_data->VIFiles[i].lockPID)); + Cache_Unlock(&(cache_data->lock), &(cache_data->lockPID)); + if(Verbose) fprintf(stderr,"Cache_GetAndLockVI. %s is locked by PID %d\n", VIID, PID); + return i; + } + } + + //VI not exist in the cache data => Try to find an available slot to add it + for (i = 0; i < CACHE_MAXVI; ++i) + { + if (cache_data->VIFiles[i].state == CACHE_NOTUSED) + { + //Use this slot for the VI + Cache_Lock(&(cache_data->VIFiles[i].lock), &(cache_data->VIFiles[i].lockPID)); + Cache_Unlock(&(cache_data->lock), &(cache_data->lockPID)); + cache_data->VIFiles[i].state = CACHE_USED; + strcpy(cache_data->VIFiles[i].VIID, VIID); + if(Verbose) fprintf(stderr,"Cache_GetAndLockVI. %s is locked by PID %d\n", VIID, PID); + return i; + } + } + + //Not available slot for this VI. Wait and retry + if(Verbose) + fprintf(stderr,"Cache_GetAndLockVI. Cannot find an available slot for %s => Wait and retry\n", VIID); + + int j; + int k; + //Cleanup + for (i = 0; i < CACHE_MAXVI; ++i) + { + Cache_ForceUnlockIfNotRunning(&(cache_data->VIFiles[i].lock), &(cache_data->VIFiles[i].lockPID)); + if (cache_data->VIFiles[i].lockPID == 0) + { + cache_data->VIFiles[i].state = CACHE_NOTUSED; + for (j = 0; j < CASHLEN; ++j) + { + if (cache_data->VIFiles[i].dataFiles[j].state != CACHE_USED) + continue; + cache_data->VIFiles[i].dataFiles[j].state = CACHE_NOTUSED; + for (k = 0; k < CACHE_MAXCLIENT; ++k) + { + if (cache_data->VIFiles[i].dataFiles[j].clientPIDS[k] != 0) + { + if (Cache_PIDIsRunning(cache_data->VIFiles[i].dataFiles[j].clientPIDS[k]) == 0) + cache_data->VIFiles[i].dataFiles[j].clientPIDS[k] = 0; + else + { + cache_data->VIFiles[i].dataFiles[j].state = CACHE_USED; + } + } + } + if (cache_data->VIFiles[i].dataFiles[j].state == CACHE_USED) + cache_data->VIFiles[i].state = CACHE_USED; + } + } + } + + Cache_Unlock(&(cache_data->lock), &(cache_data->lockPID)); + //sleep(2); + return Cache_GetAndLockVI(VIID); +} + +/* + * Unlock the access to a virtual instrument + */ +void Cache_UnlockVI(char* VIID) +{ + if (cache_data == NULL) + return; + + int i; + for (i = 0; i < CACHE_MAXVI; ++i) + { + if (strcmp(VIID, cache_data->VIFiles[i].VIID) == 0) + { + Cache_Unlock(&(cache_data->VIFiles[i].lock), &(cache_data->VIFiles[i].lockPID)); + if(Verbose) fprintf(stderr,"Cache_Unlock. %s is unlocked\n", VIID); + return; + } + } + if(Verbose) fprintf(stderr,"Cache_Unlock. Cannot unlock %s\n", VIID); +} + +/* + * Request the access to a specified data file for a client + */ +int Cache_RequestDataFileAccess(DD_Var_t *D, char* dataFileName) +{ + if (cache_data == NULL) + return(CACHERR); + + //Retrieve VI in cache data + int cacheVIIndex = Cache_GetAndLockVI(D->InstrName); + + //Open Cache file if needed + + if (D->Cash.ID < 0) + { + if (Cache_OpenFile(D) != OK) + { + Cache_UnlockVI(D->InstrName); + return(CACHERR); + } + } + + //Refresh Cache Table in relation with the cache file + if (Cache_RefreshTable(D) != OK) + { + Cache_UnlockVI(D->InstrName); + return(CACHERR); + } + + int i; + int j; + int PID = getpid(); + + //Find existing data file in cache data and add current PID to client list + int dataFileIndex = -1; + for (i = 0; i < CASHLEN; ++i) + { + if (strcmp(cache_data->VIFiles[cacheVIIndex].dataFiles[i].name, dataFileName) == 0) + { + //Check if PID is already in the list of clients + for (j = 0; j < CACHE_MAXCLIENT; ++j) + { + if (cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] == PID) + { + dataFileIndex = i; + cache_data->VIFiles[cacheVIIndex].dataFiles[i].state = CACHE_USED; + break; + } + } + if (dataFileIndex >= 0) + break; + //Try to add PID in the list of clients + for (j = 0; j < CACHE_MAXCLIENT; ++j) + { + if (cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] == 0) + { + dataFileIndex = i; + cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] = PID; + cache_data->VIFiles[cacheVIIndex].dataFiles[i].state = CACHE_USED; + break; + } + } + if (dataFileIndex >= 0) + break; + //Try to find a not alive PID + for (j = 0; j < CACHE_MAXCLIENT; ++j) + { + if (Cache_PIDIsRunning(cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j]) == 0) + { + //Replace by this PID + dataFileIndex = i; + cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] = PID; + cache_data->VIFiles[cacheVIIndex].dataFiles[i].state = CACHE_USED; + break; + } + } + if (dataFileIndex >= 0) + break; + //Too many client read this data file => wait and retry + Cache_UnlockVI(D->InstrName); + if(Verbose) + fprintf(stderr,"Cache_RequestDataFileAccess. Too many clients already access to %s.\n", dataFileName); + //Retry + //sleep(2); + return Cache_RequestDataFileAccess(D, dataFileName); + } + } + + if (dataFileIndex < 0) + { + //Data file not open by a client => try to find the older available slot + long minTime = -1; + for (i = 0; i < CASHLEN; ++i) + { + if (cache_data->VIFiles[cacheVIIndex].dataFiles[i].state == CACHE_NOTUSED) + { + if ((minTime > -1) && (minTime < D->Cash.times[i])) + continue; + dataFileIndex = i; + minTime = D->Cash.times[i]; + } + } + if (dataFileIndex >= 0) + { + memset(cache_data->VIFiles[cacheVIIndex].dataFiles[dataFileIndex].clientPIDS, 0, CACHE_MAXCLIENT * sizeof(pid_t)); + cache_data->VIFiles[cacheVIIndex].dataFiles[dataFileIndex].clientPIDS[0] = PID; + cache_data->VIFiles[cacheVIIndex].dataFiles[dataFileIndex].state = CACHE_USED; + strcpy(cache_data->VIFiles[cacheVIIndex].dataFiles[dataFileIndex].name, dataFileName); + } + } + + if (dataFileIndex < 0) + { + //Not available slot to open this data file => wait and retry + Cache_UnlockVI(D->InstrName); + if(Verbose) + fprintf(stderr,"Cache_RequestDataFileAccess. Not available slot to open data file %s.\n", dataFileName); + //Retry + //sleep(2); + return Cache_RequestDataFileAccess(D, dataFileName); + } + + if (strcmp(D->Cash.names[dataFileIndex], dataFileName) != 0) + { + if (D->Cash.names[dataFileIndex][0] != ' ') + { + //Compress old data file + Cache_CompressDataFile(D, D->Cash.names[dataFileIndex]); + } + //Decompress new data file + Cache_DecompressDataFile(D, dataFileName); + + size_t CashStart[2] = {0L,0L}; + size_t CashCount[2] = {1L,MAXFILENAME}; + + //Update cache table + strcpy(&(D->Cash.names[dataFileIndex][0]), dataFileName); + D->Cash.times[dataFileIndex] = time(NULL); + CashStart[0] = dataFileIndex; + + //Update cache file + int status; /* Error indicator */ + status = nc_put_vara_text(D->Cash.ID, D->Cash.nameID,CashStart,CashCount,&(D->Cash.names[CashStart[0]][0])); + status = nc_put_vara_long(D->Cash.ID, D->Cash.timeID,CashStart,CashCount,&(D->Cash.times[CashStart[0]])); + if(status != NC_NOERR) + { + Cache_UnlockVI(D->InstrName); + D->LastFileStatus = CACHERR; + if(Verbose) fprintf(stderr,"Cache_RequestDataFileAccess. Error to write Name and Time to cash file, message: %s\n",nc_strerror(status)); + return (CACHERR); + } + status = nc_sync(D->Cash.ID); + } + else + { + char FullName[PATHLENGTH]; + strcpy(FullName,D->path); + strcat(FullName, dataFileName); + struct stat buffer; + if (stat (FullName, &buffer) != 0) + //Force decompression + Cache_DecompressDataFile(D, dataFileName); + } + + D->CurrCushN = dataFileIndex; + + if(Verbose) fprintf(stderr,"Cache_RequestDataFileAccess. Client get access to %s\n", dataFileName); + + Cache_UnlockVI(D->InstrName); + return (OK); +} + +/* + * Release the access to a specified data file for a client + */ +int Cache_ReleaseDataFileAccess(DD_Var_t *D) +{ + if (cache_data == NULL) + return(CACHERR); + + if (D->CurrCushN < 0) + { + if(Verbose) fprintf(stderr,"Cache_ReleaseDataFileAccess. WARNING : Nothing to release!\n"); + return(OK); + } + + //Retrieve VI in cache data + int cacheVIIndex = Cache_GetAndLockVI(D->InstrName); + + //Refresh Cache Table in relation with the cache file + if (Cache_RefreshTable(D) != OK) + { + Cache_UnlockVI(D->InstrName); + return(CACHERR); + } + + int i; + int j; + int PID = getpid(); + for (i = 0; i < CASHLEN; ++i) + { + if (strcmp(cache_data->VIFiles[cacheVIIndex].dataFiles[i].name, D->Cash.names[D->CurrCushN]) == 0) + { + if (i != D->CurrCushN) + { + //Incoherence !! + if(Verbose) fprintf(stderr,"Cache_ReleaseDataFileAccess. WARNING : Incoherence between cache file and cache data!\n"); + } + //Try to find PID in clients list and update state flag + cache_data->VIFiles[cacheVIIndex].dataFiles[i].state = CACHE_NOTUSED; + for (j = 0; j < CACHE_MAXCLIENT; ++j) + { + if (cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] == PID) + { + //Remove PID to the list + cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] = 0; + if(Verbose) fprintf(stderr,"Cache_ReleaseDataFileAccess. Client release access to %s\n", D->Cash.names[D->CurrCushN]); + } + else if (cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] != 0) + cache_data->VIFiles[cacheVIIndex].dataFiles[i].state = CACHE_USED; + } + } + } + + D->CurrCushN = -1; + + Cache_UnlockVI(D->InstrName); + + return(OK); +} + + + + + + diff --git a/src/SERVER/DD_CacheLock.c b/src/SERVER/DD_CacheLock.c deleted file mode 100644 index fbb5292..0000000 --- a/src/SERVER/DD_CacheLock.c +++ /dev/null @@ -1,143 +0,0 @@ -/*===================================================================== - * DD SYSTEM base package - * DD_Server library - * DD_CacheLock.c - * V.1.0 - * Last revision: - * May 29 2015: Version 1.0. First implementation of a lock system for concurrent access to a virtual instrument - */ - -#include -#include -#include -#include - -typedef struct -{ - char VIID[30]; - pthread_mutex_t mutex; -} CacheLockVI; - -#define NB_CACHELOCK_VI 30 -typedef struct -{ - CacheLockVI cacheLockVI[NB_CACHELOCK_VI]; - pthread_mutex_t mutex; -} CacheLockData; - -extern int Verbose; - -static CacheLockData* cachelock_data = NULL; - -/* - * Init the shared memory used by the lock system - */ -void CacheLock_Init() -{ - if (cachelock_data != NULL) - return; - - if(Verbose) fprintf(stderr,"CacheLock_Init\n"); - - // place our shared data in shared memory - int prot = PROT_READ | PROT_WRITE; - int flags = MAP_SHARED | MAP_ANONYMOUS; - cachelock_data = mmap(NULL, sizeof(CacheLockData), prot, flags, -1, 0); - - // initialise mutex so it works properly in shared memory - - // global mutex - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(&cachelock_data->mutex, &attr); - - int i; - for (i = 0; i < NB_CACHELOCK_VI; ++i) - { - // virtual instrument mutex - pthread_mutexattr_t attr; - pthread_mutexattr_init(&attr); - pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); - pthread_mutex_init(&cachelock_data->cacheLockVI[i].mutex, &attr); - } -} - -/* - * Free the shared memory used by the lock system - */ -void CacheLock_Free() -{ - if(Verbose) fprintf(stderr,"CacheLock_Free\n"); - if (cachelock_data != NULL) - munmap(cachelock_data, sizeof(CacheLockData)); - cachelock_data = NULL; -} - -/* - * Get and lock the access to a virtual instrument - */ -void CacheLock_Lock(char* VIID) -{ - if (cachelock_data == NULL) - return; - - int i; - pthread_mutex_lock(&cachelock_data->mutex); - for (i = 0; i < NB_CACHELOCK_VI; ++i) - { - if (strcmp(VIID, cachelock_data->cacheLockVI[i].VIID) == 0) - { - //A cache lock already exist for this VI - //Try to get access - pthread_mutex_lock(&cachelock_data->cacheLockVI[i].mutex); - if(Verbose) fprintf(stderr,"CacheLock_Lock. %s is locked\n", VIID); - pthread_mutex_unlock(&cachelock_data->mutex); - return; - } - } - pthread_mutex_unlock(&cachelock_data->mutex); - - //No cache lock exist for this VI - //Try to find an available cache lock - while (1) - { - pthread_mutex_lock(&cachelock_data->mutex); - for (i = 0; i < NB_CACHELOCK_VI; ++i) - { - if (pthread_mutex_trylock(&cachelock_data->cacheLockVI[i].mutex) == 0) - { - //This cache lock is now used for this VIID - strcpy(cachelock_data->cacheLockVI[i].VIID, VIID); - if(Verbose) fprintf(stderr,"CacheLock_Lock. %s is locked\n", VIID); - pthread_mutex_unlock(&cachelock_data->mutex); - return; - } - } - //No available cache lock for the moment - if(Verbose) fprintf(stderr,"CacheLock_Lock. Cannot find an available cache lock for %s => Wait and retry later\n", VIID); - pthread_mutex_unlock(&cachelock_data->mutex); - sleep(2); - } -} - -/* - * Unlock the access to a virtual instrument - */ -void CacheLock_Unlock(char* VIID) -{ - if (cachelock_data == NULL) - return; - - int i; - for (i = 0; i < NB_CACHELOCK_VI; ++i) - { - if (strcmp(VIID, cachelock_data->cacheLockVI[i].VIID) == 0) - { - if(Verbose) fprintf(stderr,"CacheLock_Unlock. %s is unlocked\n", VIID); - pthread_mutex_unlock(&cachelock_data->cacheLockVI[i].mutex); - return; - } - } - if(Verbose) fprintf(stderr,"CacheLock_Unlock. Cannot unlock %s\n", VIID); -} diff --git a/src/SERVER/DD_GetData.c b/src/SERVER/DD_GetData.c index db388a9..0c5ed09 100755 --- a/src/SERVER/DD_GetData.c +++ b/src/SERVER/DD_GetData.c @@ -155,6 +155,7 @@ int OpenInstr(char *VInstrName) DD_Var[id]->AttrData.Variables = NULL; DD_Var[id]->ncID = -1; DD_Var[id]->Maxnc_rec = 0; + DD_Var[id]->Cash.ID = -1; strcpy(DD_Var[id]->path, RefVar[0]); /*----- Init control Flags ---------------*/ @@ -183,28 +184,12 @@ int OpenInstr(char *VInstrName) /** if(Verbose) fprintf(stderr,"OpentInstr(%s): ID = %d, Atributes are resetted\n",VInstrName,id); **/ -/* ---------------------------------------------------------------- - * Open "CACHE" file - * -----------------------------------------------------------------*/ - status = nc_open(RefVar[3],NC_WRITE|NC_SHARE,&(DD_Var[id]->Cash.ID)); /* Cach file */ - if(status != NC_NOERR) - { - if(Verbose) fprintf(stderr,"OpentInstr(): open cash %s, message:%s\n",RefVar[3], nc_strerror(status)); - return(CACHERR); - } - status = nc_inq_varid(DD_Var[id]->Cash.ID,"name",&(DD_Var[id]->Cash.nameID)); - status = nc_inq_varid(DD_Var[id]->Cash.ID,"time",&(DD_Var[id]->Cash.timeID)); - status = nc_inq_varid(DD_Var[id]->Cash.ID,"fopen",&(DD_Var[id]->Cash.fopenID)); - DD_Var[id]->CurrCushN = -1; - if(status != NC_NOERR) - { - if(Verbose) fprintf(stderr,"OpentInstr(): %s\n",nc_strerror(status)); - return(CACHERR); - } -/** - if(Verbose) fprintf(stderr,"OpentInstr(%s): ID = %d, Cache is open\n",VInstrName,id); -**/ +/* ------------------------------------------------------------------- + * Set cache file path + */ + strcpy(DD_Var[id]->Cash.CacheFilePath,RefVar[3]); + /* ---------------------------------------------------------------- * Read Global timing and minimal gap from INFO file * -----------------------------------------------------------------*/ @@ -272,8 +257,6 @@ int CloseID(int ID) */ { - size_t CashStart[2] = {0L,0L}; - size_t CashCount[2] = {1L,MAXFILENAME}; int status; int i,il, count = 0; /* Is this ID */ @@ -289,21 +272,15 @@ int CloseID(int ID) { status = nc_close(DD_Var[ID]->ncID); DD_Var[ID]->ncID = -1; + Cache_ReleaseDataFileAccess(DD_Var[ID]); /** if(Verbose) fprintf(stderr,"CloseID(%d): Data File is closed\n",ID); **/ } - if(DD_Var[ID]->Cash.ID >= 0) - { - status = nc_close(DD_Var[ID]->Cash.ID); - DD_Var[ID]->Cash.ID = -1; - if(Verbose) - { - if(DD_Var[ID]->CurrCushN >= 0) - fprintf(stderr,"CloseID(%d): File %s, N %d closed\n",ID, &(DD_Var[ID]->Cash.names[DD_Var[ID]->CurrCushN][0]),DD_Var[ID]->CurrCushN); - else fprintf(stderr,"CloseID(%d): cash closed\n",ID); - } - } + //Close cache file + + Cache_CloseFile(DD_Var[ID]); + if(DD_Var[ID]->attrID >= 0) { status = nc_close(DD_Var[ID]->attrID); diff --git a/src/SERVER/DD_Server.c b/src/SERVER/DD_Server.c index 9ea1f67..74073e4 100644 --- a/src/SERVER/DD_Server.c +++ b/src/SERVER/DD_Server.c @@ -578,13 +578,11 @@ void Serv(int SocketID) /*------ Call data until LastPacket Flag (OK) returns --------*/ else do { - CacheLock_Lock(DD_Var[id]->InstrName); err = GetMultiData(id,VarSize,VarNames,ReqTime,BackFlag); - CacheLock_Unlock(DD_Var[id]->InstrName); if(err >= 0) { - serr = SendDataPacket(DD_Var[id],SocketID,DD_Var[id]->LastPacketFlag); /* OK, MOREDATA, MOREDELAY */ - if(Verbose) fprintf(stderr, "Server:DATAGETRQ (ID=%d):GetMultiData->SentPackets\n", id); + serr = SendDataPacket(DD_Var[id],SocketID,DD_Var[id]->LastPacketFlag); /* OK, MOREDATA, MOREDELAY */ + if(Verbose) fprintf(stderr, "Server:DATAGETRQ (ID=%d):GetMultiData->SentPackets\n", id); } } while((err >= 0) && (DD_Var[id]->LastPacketFlag == MOREDATA)); @@ -663,7 +661,7 @@ int main(int argc, char **argv) else if(((argv[1])[1] == 'v') || ((argv[1])[1] == 'V')) Verbose = 1; else Verbose = 0; - CacheLock_Init(); + Cache_Init(); if((ListenerID = SetListener()) < 0) { @@ -702,7 +700,7 @@ int main(int argc, char **argv) close(ListenerID); - CacheLock_Free(); + Cache_Free(); return 0; } diff --git a/src/SERVER/ncfileop.c b/src/SERVER/ncfileop.c index 76a06cb..468b3e8 100755 --- a/src/SERVER/ncfileop.c +++ b/src/SERVER/ncfileop.c @@ -32,21 +32,21 @@ extern int Verbose; *===========================================================*/ int CloseOldFile(DD_Var_t *D) { - size_t CashStart[2] = {0L,0L}; - size_t CashCount[2] = {1L,MAXFILENAME}; int status; if(D->Maxnc_rec > 0) { status = nc_close(D->ncID); + char name[MAXFILENAME]; + strcpy(name,D->Cash.names[D->CurrCushN]); + Cache_ReleaseDataFileAccess(D); if(status < 0) { D->LastFileStatus = DATAFILEERR; - if(Verbose) fprintf(stderr,"CloseOldFile: file %s, error while closed\n",&(D->Cash.names[D->CurrCushN][0]),D->CurrCushN); + if(Verbose) fprintf(stderr,"CloseOldFile: file %s, error while closed, message : \n", name, nc_strerror(status)); return DATAFILEERR; } - if(Verbose) fprintf(stderr,"CloseOldFile: file %s, %d closed\n",&(D->Cash.names[D->CurrCushN][0]),D->CurrCushN); - D->CurrCushN = -1; + if(Verbose) fprintf(stderr,"CloseOldFile: file %s closed\n", name); D->Maxnc_rec = 0; } else if(Verbose) fprintf(stderr,"CloseOldFile: Nothing To close\n"); @@ -77,8 +77,6 @@ int SetNewFile(DD_Var_t *D, int N) */ { static char TimeDimName[] = "Time"; - static size_t CashStart[2] = {0L,0L}; - static size_t CashCount[2] = {1L,MAXFILENAME}; static size_t TimeCount[2] = {1,TIMELENGTH}; char Name[MAXFILENAME]; /* Name of data nc file */ @@ -204,31 +202,6 @@ int SetNewFile(DD_Var_t *D, int N) } /* else just continue */ } } - -/*-------------- refresh the Cache table ---------------------------*/ - status = nc_sync(D->Cash.ID); - if(status != NC_NOERR) - { - if(Verbose) fprintf(stderr,"SetNewFile(): Cache Synchro: %s\n",nc_strerror(status)); - return CACHERR; - } - for(CashStart[0] = 0; CashStart[0] < CASHLEN; CashStart[0]++) - { - status = nc_get_vara_text(D->Cash.ID, D->Cash.nameID,CashStart,CashCount,D->Cash.names[CashStart[0]]); - if(status != NC_NOERR) - { - if(Verbose) fprintf(stderr,"SetNewFile(): Cache Names: %s\n",nc_strerror(status)); - D->LastFileStatus = CACHERR; - return CACHERR; - } - status = nc_get_vara_long(D->Cash.ID, D->Cash.timeID,CashStart,CashCount,&(D->Cash.times[CashStart[0]])); - if(status != NC_NOERR) - { - if(Verbose) fprintf(stderr,"SetNewFile(): Cache Times %s\n",nc_strerror(status)); - D->LastFileStatus = CACHERR; - return CACHERR; - } - } /*----------- Close the old file ----------------------*/ if((status = CloseOldFile(D)) < 0) return DATAFILEERR; @@ -237,141 +210,17 @@ int SetNewFile(DD_Var_t *D, int N) strcpy(FullName+strlen(D->path), Name); // if(Verbose) fprintf(stderr,"SetNewFile(): New file to open: %s\n",FullName); - NewAttempt = 0; + //Request data file access to the cache manager + if (Cache_RequestDataFileAccess(D, Name) != OK) + return (CACHERR); - while (NewAttempt < 3) - { - /*----- Search the new requested file in the cache and the oldest file ----------*/ - FileNumber = 0; - CurrentTime = time(NULL); Find = 0; OldestNumber = -1; - - while((FileNumber < CASHLEN) && // Cache is finished - ((Find = strcmp(&(D->Cash.names[FileNumber][0]),Name)) != 0) && // File is already in the Cache - (D->Cash.names[FileNumber][0] != ' ')) // There is empty space - { - if((D->Cash.times[FileNumber] < (CurrentTime - FILEACCMARG) ) && (OldestNumber == -1)) OldestNumber = FileNumber; - dltt = (int)(D->Cash.times[OldestNumber] - D->Cash.times[FileNumber]); - if(dltt > 0) OldestNumber = FileNumber; - FileNumber++; - } - - /* ==================================================== - * Sometimes it is possible, that: - * 1) No corresponding file in the cache (Find != 0) - * 2) The oldest file is not found (OldestNumber == -1) - * 3) cach is full (FileNumber == CASHLEN) - * Then we need to repeat the search after one sec delay - *==================================================== */ - if((Find != 0) && (FileNumber == CASHLEN) && (OldestNumber == -1)) - { - NewAttempt++; - sleep((unsigned )(FILEACCMARG)); - - if (Verbose) - fprintf(stderr,"Waiting %d secs to get the Oldest File\n", FILEACCMARG); - - } else NewAttempt = 3; - } - -//---------- Parsing resultat of search ------------- -// if(Verbose) -// { -// if(OldestNumber > -1) fprintf(stderr,"SetNewFile(): Search: Is Oldest File %d %d %d\n", OldestNumber,D->Cash.times[OldestNumber],CurrentTime); -// else fprintf(stderr,"SetNewFile(): Search: No Oldest File %d %d\n", OldestNumber,CurrentTime); -// } - - if(Find != 0) -/*---------------- No request file in the CACHE ----------------------*/ - { - if(FileNumber < CASHLEN) - { - if(D->Cash.names[FileNumber][0] == ' ') /* There is empty space in the table */ - { - strcpy(command, "gunzip -c "); - strcat(command, FullName); - strcat(command, ".gz > "); - strcat(command, FullName); - system(command); /* File is unzipped */ - sprintf(command, "chmod g+w %s\0",FullName); - system(command); /* File is unzipped */ - - strcpy(&(D->Cash.names[FileNumber][0]),Name); - D->Cash.times[FileNumber] = CurrentTime; - CashStart[0] = FileNumber; - D->CurrCushN = FileNumber; - } - else - { - D->LastFileStatus = CACHERR; - return CACHERR; /* Unrecoverable error */ - } - } - else /* No empty space. It is necessury to remove one file */ - { - if (OldestNumber > -1) //------ Oldest file is found ------- - { - strcpy(command, "gunzip -c "); - strcat(command, FullName); - strcat(command, ".gz > "); - strcat(command, FullName); - system(command); /* File is unzipped */ - sprintf(command, "chmod g+w %s\0",FullName); - system(command); /* File is unzipped */ - - strcpy(command, "rm "); - strcat(command,D->path); - strcat(command,&(D->Cash.names[OldestNumber][0])); - strcat(command, " &"); - system(command); /* Old file removed */ - strcpy(&(D->Cash.names[OldestNumber][0]),Name); - D->Cash.times[OldestNumber] = CurrentTime; - CashStart[0] = OldestNumber; - D->CurrCushN = OldestNumber; - } - else { - if(Verbose) fprintf(stderr,"SetNewFile(): return CACHTOOREC\n"); - D->LastFileStatus = CACHTOOREC; - D->TimeRecNumber -= N; - return CACHTOOREC; //----- Say client that all files are too new - } - } - /*------------ The place for new cache is found --------------*/ - status = nc_put_vara_text(D->Cash.ID, D->Cash.nameID,CashStart,CashCount,&(D->Cash.names[CashStart[0]][0])); - status = nc_put_vara_long(D->Cash.ID, D->Cash.timeID,CashStart,CashCount,&(D->Cash.times[CashStart[0]])); - if(status != NC_NOERR) - { - if(Verbose) fprintf(stderr,"SetNewFile(): Write Name and Time to cash: %s\n",nc_strerror(status)); - D->LastFileStatus = CACHERR; - return CACHERR; - } - status = nc_sync(D->Cash.ID); - } -//------------------ There is FILE already in the CACHE -------------- - else /*------- The requested file is already in the cache just refresh the time */ - { - CashStart[0] = FileNumber; - D->Cash.times[FileNumber] = CurrentTime; /* refresh the time */ - D->CurrCushN = FileNumber; - status = nc_put_vara_long(D->Cash.ID, D->Cash.timeID,CashStart,CashCount,&(D->Cash.times[CashStart[0]])); - if(status != NC_NOERR) - { - if(Verbose) fprintf(stderr,"SetNewFile(): File In Cache: %s\n",nc_strerror(status)); - D->LastFileStatus = CACHERR; - return CACHERR; - } - status = nc_sync(D->Cash.ID); - } -// if(Verbose) fprintf(stderr,"SetNewFile(): file %s, %d to be open\n",FullName,D->CurrCushN); - - -/*--------------- CACHE refreshed and requested file is unzipped ------------------------*/ /*----------------- Open requested file -----------------------------------------------*/ status = nc_open(FullName,NC_NOWRITE,&(D->ncID)); if(status != NC_NOERR) { - if(Verbose) fprintf(stderr,"SetNewFile(): Error while File Open: %s\n",nc_strerror(status)); + if(Verbose) fprintf(stderr,"SetNewFile(): Error while File Open: %s, message : %s\n", Name, nc_strerror(status)); D->LastFileStatus =DATAFILEERR ; return DATAFILEERR; } @@ -496,3 +345,4 @@ size_t MaxRecord(int ncID) } /*----------------------------------------------------------------------*/ + -- libgit2 0.21.2