/*===================================================================== * DD SYSTEM base package * DD_Server library * DD_CacheLock.c * V.1.0 * Last revision: * May 29 2015: Version 1.0. First implementation of a lock system for concurrent access to a virtual instrument */ #include #include #include #include #include #include #include "DD.h" #include "DD_comm.h" //Max number of VI that can be used at the same time #define CACHE_MAXVI 30 //Max number of clients that can be used a data file of a VI at the same time #define CACHE_MAXCLIENT 30 //Use state of a cache element #define CACHE_NOTUSED 0 #define CACHE_USED 1 typedef struct { //Name of the related data file char name[MAXFILENAME]; //List of client that's currently use this data file pid_t clientPIDS[CACHE_MAXCLIENT]; //State flag (CACHE_NOTUSED or CACHE_USED) int state; } CacheDataFile; typedef struct { //Virtual instrument ID char VIID[MAXSETLENGTH]; //List of data files CacheDataFile dataFiles[CASHLEN]; //State flag (CACHE_NOTUSED or CACHE_USED) int state; //Mutex for concurrential access pthread_mutex_t lock; //PID of the process that's currently lock the mutex pid_t lockPID; } CacheVIFile; typedef struct { //List of currently used VI CacheVIFile VIFiles[CACHE_MAXVI]; //Mutex for concurrential access pthread_mutex_t lock; //PID of the process that's currently lock the mutex pid_t lockPID; } CacheData; extern int Verbose; /* * Pointer to structure of Cache Data */ static CacheData* cache_data = NULL; /* * Refresh "CACHE" table from Cache file */ int Cache_RefreshTable(DD_Var_t *D) { int status; /* Error indicator */ if (D->Cash.ID < 0) { if(Verbose) fprintf(stderr,"Cache_RefreshTable. Cache file %s not opened\n", D->Cash.CacheFilePath); D->LastFileStatus = CACHERR; return CACHERR; } status = nc_sync(D->Cash.ID); if(status != NC_NOERR) { if(Verbose) fprintf(stderr,"Cache_RefreshTable. Error to synchronize cache file %s, message:%s\n", D->Cash.CacheFilePath, nc_strerror(status)); D->LastFileStatus = CACHERR; return CACHERR; } size_t cashStart[2] = {0L,0L}; size_t cashCount[2] = {1L,MAXFILENAME}; for(cashStart[0] = 0; cashStart[0] < CASHLEN; cashStart[0]++) { status = nc_get_vara_text(D->Cash.ID, D->Cash.nameID,cashStart,cashCount, D->Cash.names[cashStart[0]]); if(status != NC_NOERR) { if(Verbose) fprintf(stderr,"Cache_RefreshTable. Error to synchronize cache file name, message: %s\n", nc_strerror(status)); D->LastFileStatus = CACHERR; return CACHERR; } status = nc_get_vara_long(D->Cash.ID, D->Cash.timeID,cashStart,cashCount,&(D->Cash.times[cashStart[0]])); if(status != NC_NOERR) { if(Verbose) fprintf(stderr,"Cache_RefreshTable. Error to synchronize cache time, message: %s\n", nc_strerror(status)); D->LastFileStatus = CACHERR; return CACHERR; } } if(Verbose) fprintf(stderr,"Cache_RefreshTable. Cache table is synchronized\n"); return OK; } /* * Close cache file */ int Cache_CloseFile(DD_Var_t *D) { int status; /* Error indicator */ if(D->Cash.ID >= 0) { status = nc_close(D->Cash.ID); D->Cash.ID = -1; if(Verbose) fprintf(stderr,"CloseCacheFile()"); } } /* * Open cache file */ int Cache_OpenFile(DD_Var_t *D) { int status; /* Error indicator */ if (D->Cash.ID >= 0) return (OK); status = nc_open(D->Cash.CacheFilePath,NC_WRITE|NC_SHARE,&(D->Cash.ID)); /* Cach file */ if(status != NC_NOERR) { if(Verbose) fprintf(stderr, "Cache_OpenFile. Error to open cache file %s, message:%s\n", D->Cash.CacheFilePath, nc_strerror(status)); D->LastFileStatus = CACHERR; return(CACHERR); } status = nc_inq_varid(D->Cash.ID,"name",&(D->Cash.nameID)); status = nc_inq_varid(D->Cash.ID,"time",&(D->Cash.timeID)); status = nc_inq_varid(D->Cash.ID,"fopen",&(D->Cash.fopenID)); D->CurrCushN = -1; if(status != NC_NOERR) { if(Verbose) fprintf(stderr,"Cache_OpenFile. Error to open cache file %s, message:%s\n", D->Cash.CacheFilePath, nc_strerror(status)); D->LastFileStatus = CACHERR; return(CACHERR); } if(Verbose) fprintf(stderr,"Cache_OpenFile. Cache file %s is open\n", D->Cash.CacheFilePath); return (OK); } /* * Decompress a data file */ void Cache_DecompressDataFile(DD_Var_t *D, char* dataFileName) { char FullName[PATHLENGTH]; strcpy(FullName,D->path); strcat(FullName, dataFileName); char command[300]; strcpy(command, "gunzip -c "); strcat(command,FullName); strcat(command, ".gz > "); strcat(command, FullName); system(command); /* File is unzipped */ sprintf(command, "chmod g+w %s\0",FullName); system(command); /* File is unzipped */ } /* * Compress a data file */ void Cache_CompressDataFile(DD_Var_t *D, char* dataFileName) { //This is not really a compression, just a supression of the uncompressed data file char command[300]; strcpy(command, "rm "); strcat(command,D->path); strcat(command,dataFileName); strcat(command, " &"); system(command); } /* * Check if a process is running */ int Cache_PIDIsRunning(pid_t pid) { return (kill(pid, 0) == 0 ? 1 : 0); } /* * Release lock for dead process */ void Cache_ForceUnlockIfNotRunning(pthread_mutex_t *mutex, pid_t* lockPID) { if (pthread_mutex_trylock(mutex) != 0) { //Test if the child process is alive if (Cache_PIDIsRunning((*lockPID)) == 0) { //Force unlock (*lockPID) = 0; pthread_mutex_unlock(mutex); } } else { (*lockPID) = 0; pthread_mutex_unlock(mutex); } } /* * Unlock access */ void Cache_Unlock(pthread_mutex_t *mutex, pid_t* lockPID) { int PID = getpid(); if ((*lockPID) == PID) { (*lockPID) = 0; pthread_mutex_unlock(mutex); } } /* * Lock access */ void Cache_Lock(pthread_mutex_t* mutex, pid_t* lockPID) { int PID = getpid(); int attempt = 0; while (1) { if (pthread_mutex_trylock(mutex) == 0) { //Mutex now lock for this PID (*lockPID) = PID; return; } //Not available if ((attempt > 10) && ((*lockPID) != 0)) { Cache_ForceUnlockIfNotRunning(mutex, lockPID); attempt = 0; } ++attempt; //sleep(2); } } /* * Init the shared memory used to manage concurrent access to cache */ void Cache_Init() { if (cache_data != NULL) return; if(Verbose) fprintf(stderr,"Cache_Init\n"); // place our shared data in shared memory int prot = PROT_READ | PROT_WRITE; int flags = MAP_SHARED | MAP_ANONYMOUS; cache_data = mmap(NULL, sizeof(CacheData), prot, flags, -1, 0); memset(cache_data, 0, sizeof(CacheData)); // initialise mutex so it works properly in shared memory // global mutex for global cache data access pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(&cache_data->lock, &attr); int i, j; for (i = 0; i < CACHE_MAXVI; ++i) { // mutex for VI cache data access pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED); pthread_mutex_init(&cache_data->VIFiles[i].lock, &attr); } } /* * Free the shared memory used by the cache manager */ void Cache_Free() { if(Verbose) fprintf(stderr,"Cache_Free\n"); if (cache_data != NULL) munmap(cache_data, sizeof(CacheData)); cache_data = NULL; } /* * Get and lock the access to a virtual instrument */ int Cache_GetAndLockVI(char* VIID) { if (cache_data == NULL) return; int i; int PID = getpid(); Cache_Lock(&(cache_data->lock), &(cache_data->lockPID)); for (i = 0; i < CACHE_MAXVI; ++i) { if ((cache_data->VIFiles[i].state == CACHE_USED) && (strcmp(VIID, cache_data->VIFiles[i].VIID) == 0)) { //VI already exist in the cache data Cache_Lock(&(cache_data->VIFiles[i].lock), &(cache_data->VIFiles[i].lockPID)); Cache_Unlock(&(cache_data->lock), &(cache_data->lockPID)); if(Verbose) fprintf(stderr,"Cache_GetAndLockVI. %s is locked by PID %d\n", VIID, PID); return i; } } //VI not exist in the cache data => Try to find an available slot to add it for (i = 0; i < CACHE_MAXVI; ++i) { if (cache_data->VIFiles[i].state == CACHE_NOTUSED) { //Use this slot for the VI Cache_Lock(&(cache_data->VIFiles[i].lock), &(cache_data->VIFiles[i].lockPID)); Cache_Unlock(&(cache_data->lock), &(cache_data->lockPID)); cache_data->VIFiles[i].state = CACHE_USED; strcpy(cache_data->VIFiles[i].VIID, VIID); if(Verbose) fprintf(stderr,"Cache_GetAndLockVI. %s is locked by PID %d\n", VIID, PID); return i; } } //Not available slot for this VI. Wait and retry if(Verbose) fprintf(stderr,"Cache_GetAndLockVI. Cannot find an available slot for %s => Wait and retry\n", VIID); int j; int k; //Cleanup for (i = 0; i < CACHE_MAXVI; ++i) { Cache_ForceUnlockIfNotRunning(&(cache_data->VIFiles[i].lock), &(cache_data->VIFiles[i].lockPID)); if (cache_data->VIFiles[i].lockPID == 0) { cache_data->VIFiles[i].state = CACHE_NOTUSED; for (j = 0; j < CASHLEN; ++j) { if (cache_data->VIFiles[i].dataFiles[j].state != CACHE_USED) continue; cache_data->VIFiles[i].dataFiles[j].state = CACHE_NOTUSED; for (k = 0; k < CACHE_MAXCLIENT; ++k) { if (cache_data->VIFiles[i].dataFiles[j].clientPIDS[k] != 0) { if (Cache_PIDIsRunning(cache_data->VIFiles[i].dataFiles[j].clientPIDS[k]) == 0) cache_data->VIFiles[i].dataFiles[j].clientPIDS[k] = 0; else { cache_data->VIFiles[i].dataFiles[j].state = CACHE_USED; } } } if (cache_data->VIFiles[i].dataFiles[j].state == CACHE_USED) cache_data->VIFiles[i].state = CACHE_USED; } } } Cache_Unlock(&(cache_data->lock), &(cache_data->lockPID)); //sleep(2); return Cache_GetAndLockVI(VIID); } /* * Unlock the access to a virtual instrument */ void Cache_UnlockVI(char* VIID) { if (cache_data == NULL) return; int i; for (i = 0; i < CACHE_MAXVI; ++i) { if (strcmp(VIID, cache_data->VIFiles[i].VIID) == 0) { Cache_Unlock(&(cache_data->VIFiles[i].lock), &(cache_data->VIFiles[i].lockPID)); if(Verbose) fprintf(stderr,"Cache_Unlock. %s is unlocked\n", VIID); return; } } if(Verbose) fprintf(stderr,"Cache_Unlock. Cannot unlock %s\n", VIID); } /* * Request the access to a specified data file for a client */ int Cache_RequestDataFileAccess(DD_Var_t *D, char* dataFileName) { if (cache_data == NULL) return(CACHERR); //Retrieve VI in cache data int cacheVIIndex = Cache_GetAndLockVI(D->InstrName); //Open Cache file if needed if (D->Cash.ID < 0) { if (Cache_OpenFile(D) != OK) { Cache_UnlockVI(D->InstrName); return(CACHERR); } } //Refresh Cache Table in relation with the cache file if (Cache_RefreshTable(D) != OK) { Cache_UnlockVI(D->InstrName); return(CACHERR); } int i; int j; int PID = getpid(); //Find existing data file in cache data and add current PID to client list int dataFileIndex = -1; for (i = 0; i < CASHLEN; ++i) { if (strcmp(cache_data->VIFiles[cacheVIIndex].dataFiles[i].name, dataFileName) == 0) { //Check if PID is already in the list of clients for (j = 0; j < CACHE_MAXCLIENT; ++j) { if (cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] == PID) { dataFileIndex = i; cache_data->VIFiles[cacheVIIndex].dataFiles[i].state = CACHE_USED; break; } } if (dataFileIndex >= 0) break; //Try to add PID in the list of clients for (j = 0; j < CACHE_MAXCLIENT; ++j) { if (cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] == 0) { dataFileIndex = i; cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] = PID; cache_data->VIFiles[cacheVIIndex].dataFiles[i].state = CACHE_USED; break; } } if (dataFileIndex >= 0) break; //Try to find a not alive PID for (j = 0; j < CACHE_MAXCLIENT; ++j) { if (Cache_PIDIsRunning(cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j]) == 0) { //Replace by this PID dataFileIndex = i; cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] = PID; cache_data->VIFiles[cacheVIIndex].dataFiles[i].state = CACHE_USED; break; } } if (dataFileIndex >= 0) break; //Too many client read this data file => wait and retry Cache_UnlockVI(D->InstrName); if(Verbose) fprintf(stderr,"Cache_RequestDataFileAccess. Too many clients already access to %s.\n", dataFileName); //Retry //sleep(2); return Cache_RequestDataFileAccess(D, dataFileName); } } if (dataFileIndex < 0) { //Data file not open by a client => try to find the older available slot long minTime = -1; for (i = 0; i < CASHLEN; ++i) { if (cache_data->VIFiles[cacheVIIndex].dataFiles[i].state == CACHE_NOTUSED) { if ((minTime > -1) && (minTime < D->Cash.times[i])) continue; dataFileIndex = i; minTime = D->Cash.times[i]; } } if (dataFileIndex >= 0) { memset(cache_data->VIFiles[cacheVIIndex].dataFiles[dataFileIndex].clientPIDS, 0, CACHE_MAXCLIENT * sizeof(pid_t)); cache_data->VIFiles[cacheVIIndex].dataFiles[dataFileIndex].clientPIDS[0] = PID; cache_data->VIFiles[cacheVIIndex].dataFiles[dataFileIndex].state = CACHE_USED; strcpy(cache_data->VIFiles[cacheVIIndex].dataFiles[dataFileIndex].name, dataFileName); } } if (dataFileIndex < 0) { //Not available slot to open this data file => wait and retry Cache_UnlockVI(D->InstrName); if(Verbose) fprintf(stderr,"Cache_RequestDataFileAccess. Not available slot to open data file %s.\n", dataFileName); //Update process lists for all slots for (i = 0; i < CASHLEN; ++i) { cache_data->VIFiles[cacheVIIndex].dataFiles[i].state = CACHE_NOTUSED; for (j = 0; j < CACHE_MAXCLIENT; ++j) { if (cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] == 0) continue; if (Cache_PIDIsRunning(cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j]) == 0) cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] = 0; else cache_data->VIFiles[cacheVIIndex].dataFiles[i].state = CACHE_USED; } } //Retry //sleep(2); return Cache_RequestDataFileAccess(D, dataFileName); } if (strcmp(D->Cash.names[dataFileIndex], dataFileName) != 0) { if (D->Cash.names[dataFileIndex][0] != ' ') { //Compress old data file Cache_CompressDataFile(D, D->Cash.names[dataFileIndex]); } //Decompress new data file Cache_DecompressDataFile(D, dataFileName); size_t CashStart[2] = {0L,0L}; size_t CashCount[2] = {1L,MAXFILENAME}; //Update cache table strcpy(&(D->Cash.names[dataFileIndex][0]), dataFileName); D->Cash.times[dataFileIndex] = time(NULL); CashStart[0] = dataFileIndex; //Update cache file int status; /* Error indicator */ status = nc_put_vara_text(D->Cash.ID, D->Cash.nameID,CashStart,CashCount,&(D->Cash.names[CashStart[0]][0])); status = nc_put_vara_long(D->Cash.ID, D->Cash.timeID,CashStart,CashCount,&(D->Cash.times[CashStart[0]])); if(status != NC_NOERR) { Cache_UnlockVI(D->InstrName); D->LastFileStatus = CACHERR; if(Verbose) fprintf(stderr,"Cache_RequestDataFileAccess. Error to write Name and Time to cash file, message: %s\n",nc_strerror(status)); return (CACHERR); } status = nc_sync(D->Cash.ID); } else { char FullName[PATHLENGTH]; strcpy(FullName,D->path); strcat(FullName, dataFileName); struct stat buffer; if (stat (FullName, &buffer) != 0) //Force decompression Cache_DecompressDataFile(D, dataFileName); } D->CurrCushN = dataFileIndex; if(Verbose) fprintf(stderr,"Cache_RequestDataFileAccess. Client get access to %s\n", dataFileName); Cache_UnlockVI(D->InstrName); return (OK); } /* * Release the access to a specified data file for a client */ int Cache_ReleaseDataFileAccess(DD_Var_t *D) { if (cache_data == NULL) return(CACHERR); if (D->CurrCushN < 0) { if(Verbose) fprintf(stderr,"Cache_ReleaseDataFileAccess. WARNING : Nothing to release!\n"); return(OK); } //Retrieve VI in cache data int cacheVIIndex = Cache_GetAndLockVI(D->InstrName); //Refresh Cache Table in relation with the cache file if (Cache_RefreshTable(D) != OK) { Cache_UnlockVI(D->InstrName); return(CACHERR); } int i; int j; int PID = getpid(); for (i = 0; i < CASHLEN; ++i) { if (strcmp(cache_data->VIFiles[cacheVIIndex].dataFiles[i].name, D->Cash.names[D->CurrCushN]) == 0) { if (i != D->CurrCushN) { //Incoherence !! if(Verbose) fprintf(stderr,"Cache_ReleaseDataFileAccess. WARNING : Incoherence between cache file and cache data!\n"); } //Try to find PID in clients list and update state flag cache_data->VIFiles[cacheVIIndex].dataFiles[i].state = CACHE_NOTUSED; for (j = 0; j < CACHE_MAXCLIENT; ++j) { if (cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] == PID) { //Remove PID to the list cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] = 0; if(Verbose) fprintf(stderr,"Cache_ReleaseDataFileAccess. Client release access to %s\n", D->Cash.names[D->CurrCushN]); } else if (cache_data->VIFiles[cacheVIIndex].dataFiles[i].clientPIDS[j] != 0) cache_data->VIFiles[cacheVIIndex].dataFiles[i].state = CACHE_USED; } } } D->CurrCushN = -1; Cache_UnlockVI(D->InstrName); return(OK); }