Commit 86f177cfe4dd00fc3c3731f9e280a00801cbe00b

Authored by Benjamin Renard
1 parent 3c7c7fa7

Lock system implementation for concurrent access to a Virtual Instrument

CMakeLists.txt
... ... @@ -33,6 +33,7 @@ configure_file (
33 33 MESSAGE( STATUS "Build DD_Server Project" )
34 34 add_subdirectory(src/CLIENT)
35 35 add_subdirectory(src/SERVER)
  36 +add_subdirectory(tests)
36 37  
37 38 install(FILES "scripts/StartServer" DESTINATION . PERMISSIONS OWNER_READ OWNER_WRITE OWNER_EXECUTE GROUP_READ GROUP_EXECUTE WORLD_READ WORLD_EXECUTE)
38 39 install(DIRECTORY "src/INCLUDE/" DESTINATION include)
... ...
README
1 1 1. Set NETCDF_ROOT to use a specific netcdf installation directory :
2   -> export NETCDF_ROOT="/home/benjamin/AMDA-GIT/AMDA_COTS/netcdf/install"
  2 +> export NETCDF_ROOT="/usr/lib/"
3 3  
4 4 2. Make the build directory :
5 5 > cmake -E make_directory build
... ...
scripts/StartServer.in
... ... @@ -2,8 +2,8 @@
2 2 #-------------------------------------------------------------------
3 3 # StartServer
4 4 #-------------------------------------------------------------------
5   -DDBASE=@CMAKE_INSTALL_PREFIX@/DDBASE/DATA
6   -DDPATH=@CMAKE_INSTALL_PREFIX@/DDBASE
  5 +DDBASE=/home/budnik/AMDA-NG.core/DDBASE/DATA
  6 +DDPATH=/home/budnik/AMDA-NG.core/DDBASE
7 7 DDLIB=@CMAKE_INSTALL_PREFIX@/lib
8 8 DDBASEBIN=@CMAKE_INSTALL_PREFIX@/bin
9 9 LD_LIBRARY_PATH=$DDLIB/:@NETCDFLIB_DIR@:@USRLIB_DIR@
... ...
src/INCLUDE/DD_comm.h
... ... @@ -273,6 +273,27 @@ enum LockStatus {NOLOCK, LOCKED, LOCKREMOVED};
273 273  
274 274 /*------------------ Function for Server ---------------------*/
275 275 extern int OpenInstr(char *InstrName);
  276 +
  277 +/*
  278 + * Init shared memory for VI cache lock system
  279 + */
  280 +extern void CacheLock_Init();
  281 +
  282 +/*
  283 + * Free shared memory used for VI cache lock system
  284 + */
  285 +extern void CacheLock_Free();
  286 +
  287 +/*
  288 + * Lock cache for a specific VI
  289 + */
  290 +extern void CacheLock_Lock(char* VIID);
  291 +
  292 +/*
  293 + * Unlock cache for a specific VI
  294 + */
  295 +extern void CacheLock_Unlock(char* VIID);
  296 +
276 297 /*
277 298 * Open Virtual instrument by name and returns the ID
278 299 * Returns negative value in case of error (see DD.h)or OK
... ...
src/SERVER/CMakeLists.txt
... ... @@ -21,6 +21,7 @@ target_link_libraries(
21 21 DD_Client
22 22 ${NETCDFLIBRARY}
23 23 ${CRYPT_LIBRARY}
  24 + mpi
24 25 )
25 26  
26 27 install (TARGETS DD_Server DESTINATION bin)
... ...
src/SERVER/DD_CacheLock.c 0 โ†’ 100644
... ... @@ -0,0 +1,143 @@
  1 +/*=====================================================================
  2 + * DD SYSTEM base package
  3 + * DD_Server library
  4 + * DD_CacheLock.c
  5 + * V.1.0
  6 + * Last revision:
  7 + * May 29 2015: Version 1.0. First implementation of a lock system for concurrent access to a virtual instrument
  8 + */
  9 +
  10 +#include <stdio.h>
  11 +#include <string.h>
  12 +#include <sys/mman.h>
  13 +#include <pthread.h>
  14 +
  15 +typedef struct
  16 +{
  17 + char VIID[30];
  18 + pthread_mutex_t mutex;
  19 +} CacheLockVI;
  20 +
  21 +#define NB_CACHELOCK_VI 30
  22 +typedef struct
  23 +{
  24 + CacheLockVI cacheLockVI[NB_CACHELOCK_VI];
  25 + pthread_mutex_t mutex;
  26 +} CacheLockData;
  27 +
  28 +extern int Verbose;
  29 +
  30 +static CacheLockData* cachelock_data = NULL;
  31 +
  32 +/*
  33 + * Init the shared memory used by the lock system
  34 + */
  35 +void CacheLock_Init()
  36 +{
  37 + if (cachelock_data != NULL)
  38 + return;
  39 +
  40 + if(Verbose) fprintf(stderr,"CacheLock_Init\n");
  41 +
  42 + // place our shared data in shared memory
  43 + int prot = PROT_READ | PROT_WRITE;
  44 + int flags = MAP_SHARED | MAP_ANONYMOUS;
  45 + cachelock_data = mmap(NULL, sizeof(CacheLockData), prot, flags, -1, 0);
  46 +
  47 + // initialise mutex so it works properly in shared memory
  48 +
  49 + // global mutex
  50 + pthread_mutexattr_t attr;
  51 + pthread_mutexattr_init(&attr);
  52 + pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
  53 + pthread_mutex_init(&cachelock_data->mutex, &attr);
  54 +
  55 + int i;
  56 + for (i = 0; i < NB_CACHELOCK_VI; ++i)
  57 + {
  58 + // virtual instrument mutex
  59 + pthread_mutexattr_t attr;
  60 + pthread_mutexattr_init(&attr);
  61 + pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
  62 + pthread_mutex_init(&cachelock_data->cacheLockVI[i].mutex, &attr);
  63 + }
  64 +}
  65 +
  66 +/*
  67 + * Free the shared memory used by the lock system
  68 + */
  69 +void CacheLock_Free()
  70 +{
  71 + if(Verbose) fprintf(stderr,"CacheLock_Free\n");
  72 + if (cachelock_data != NULL)
  73 + munmap(cachelock_data, sizeof(CacheLockData));
  74 + cachelock_data = NULL;
  75 +}
  76 +
  77 +/*
  78 + * Get and lock the access to a virtual instrument
  79 + */
  80 +void CacheLock_Lock(char* VIID)
  81 +{
  82 + if (cachelock_data == NULL)
  83 + return;
  84 +
  85 + int i;
  86 + pthread_mutex_lock(&cachelock_data->mutex);
  87 + for (i = 0; i < NB_CACHELOCK_VI; ++i)
  88 + {
  89 + if (strcmp(VIID, cachelock_data->cacheLockVI[i].VIID) == 0)
  90 + {
  91 + //A cache lock already exist for this VI
  92 + //Try to get access
  93 + pthread_mutex_lock(&cachelock_data->cacheLockVI[i].mutex);
  94 + if(Verbose) fprintf(stderr,"CacheLock_Lock. %s is locked\n", VIID);
  95 + pthread_mutex_unlock(&cachelock_data->mutex);
  96 + return;
  97 + }
  98 + }
  99 + pthread_mutex_unlock(&cachelock_data->mutex);
  100 +
  101 + //No cache lock exist for this VI
  102 + //Try to find an available cache lock
  103 + while (1)
  104 + {
  105 + pthread_mutex_lock(&cachelock_data->mutex);
  106 + for (i = 0; i < NB_CACHELOCK_VI; ++i)
  107 + {
  108 + if (pthread_mutex_trylock(&cachelock_data->cacheLockVI[i].mutex) == 0)
  109 + {
  110 + //This cache lock is now used for this VIID
  111 + strcpy(cachelock_data->cacheLockVI[i].VIID, VIID);
  112 + if(Verbose) fprintf(stderr,"CacheLock_Lock. %s is locked\n", VIID);
  113 + pthread_mutex_unlock(&cachelock_data->mutex);
  114 + return;
  115 + }
  116 + }
  117 + //No available cache lock for the moment
  118 + if(Verbose) fprintf(stderr,"CacheLock_Lock. Cannot find an available cache lock for %s => Wait and retry later\n", VIID);
  119 + pthread_mutex_unlock(&cachelock_data->mutex);
  120 + sleep(2);
  121 + }
  122 +}
  123 +
  124 +/*
  125 + * Unlock the access to a virtual instrument
  126 + */
  127 +void CacheLock_Unlock(char* VIID)
  128 +{
  129 + if (cachelock_data == NULL)
  130 + return;
  131 +
  132 + int i;
  133 + for (i = 0; i < NB_CACHELOCK_VI; ++i)
  134 + {
  135 + if (strcmp(VIID, cachelock_data->cacheLockVI[i].VIID) == 0)
  136 + {
  137 + if(Verbose) fprintf(stderr,"CacheLock_Unlock. %s is unlocked\n", VIID);
  138 + pthread_mutex_unlock(&cachelock_data->cacheLockVI[i].mutex);
  139 + return;
  140 + }
  141 + }
  142 + if(Verbose) fprintf(stderr,"CacheLock_Unlock. Cannot unlock %s\n", VIID);
  143 +}
... ...
src/SERVER/DD_Server.c
... ... @@ -179,7 +179,7 @@ int SendToNet(int SocketID, char *buff, int length)
179 179 }
180 180  
181 181 /*------------------ PUTHEADER -----------------------------*
182   -/*
  182 + *
183 183 * Encode end send Reply Header
184 184 */
185 185 int PutHeader(int a1, int a2, int a3,int SocketID)
... ... @@ -358,7 +358,7 @@ void Serv(int SocketID)
358 358 static char *NewVIName = NULL; /* Full name of VI (data set to generate )*/
359 359 static char *RemSetID = NULL; /* ID of remote Data Set */
360 360 static char *RemBaseName = NULL; /* Base Name of remote WEB service */
361   - static char Command[MAXCOMMANDL]; /*Memory for command in system()
  361 + static char Command[MAXCOMMANDL]; /*Memory for command in system() */
362 362 /*--------------------------------------------------*/
363 363  
364 364 /*---------------------------------------------------
... ... @@ -578,7 +578,9 @@ void Serv(int SocketID)
578 578 /*------ Call data until LastPacket Flag (OK) returns --------*/
579 579 else do
580 580 {
  581 + CacheLock_Lock(DD_Var[id]->InstrName);
581 582 err = GetMultiData(id,VarSize,VarNames,ReqTime,BackFlag);
  583 + CacheLock_Unlock(DD_Var[id]->InstrName);
582 584 if(err >= 0)
583 585 {
584 586 serr = SendDataPacket(DD_Var[id],SocketID,DD_Var[id]->LastPacketFlag); /* OK, MOREDATA, MOREDELAY */
... ... @@ -661,6 +663,8 @@ int main(int argc, char **argv)
661 663 else if(((argv[1])[1] == 'v') || ((argv[1])[1] == 'V')) Verbose = 1;
662 664 else Verbose = 0;
663 665  
  666 + CacheLock_Init();
  667 +
664 668 if((ListenerID = SetListener()) < 0)
665 669 {
666 670 fprintf(stderr,"DD_server: I cant set Listener Socket\n");
... ... @@ -697,5 +701,8 @@ int main(int argc, char **argv)
697 701 /* Unreachable point */
698 702  
699 703 close(ListenerID);
  704 +
  705 + CacheLock_Free();
  706 +
700 707 return 0;
701 708 }
... ...
tests/CMakeLists.txt 0 โ†’ 100644
... ... @@ -0,0 +1,23 @@
  1 +
  2 +PROJECT(testParallel)
  3 +
  4 +include_directories(
  5 + ${CMAKE_HOME_DIRECTORY}/src/INCLUDE/
  6 +)
  7 +
  8 +#Configuration de l'exรฉcutable
  9 +file(
  10 + GLOB_RECURSE
  11 + source_files
  12 + ./*
  13 +)
  14 +
  15 +ADD_EXECUTABLE (testParallel ${source_files} )
  16 +
  17 +target_link_libraries(
  18 + testParallel
  19 + ${CMAKE_THREAD_LIBS_INIT}
  20 + DD_Client
  21 +)
  22 +
  23 +install (TARGETS testParallel DESTINATION tests)
... ...
tests/testParallel.c 0 โ†’ 100644
... ... @@ -0,0 +1,162 @@
  1 +/*=============================================================
  2 + * testParallel.c
  3 + * Test concurrent access of DD_Server
  4 + * May 2015, V.1.0, Renard
  5 + *=============================================================*/
  6 +#include <stdio.h>
  7 +#include <stdlib.h>
  8 +#include <unistd.h>
  9 +#include <DD.h>
  10 +#include <time.h>
  11 +#include <string.h>
  12 +#include <sys/time.h>
  13 +
  14 +//Structrure used to define a request
  15 +typedef struct {
  16 + char ViName[100];
  17 + char StartTime[17];
  18 + char TimeInt[17];
  19 + char TimeName[20];
  20 + char ParName[20];
  21 +} RequestData;
  22 +
  23 +//Number of requests to run
  24 +#define NB_REQUESTS 60
  25 +
  26 +//Array of all requests data
  27 +RequestData requestDataList[NB_REQUESTS];
  28 +
  29 +/*-------------------------*/
  30 +#define IMF_VI "ace:imf:all\0"
  31 +#define IMF_PAR "IMF\0"
  32 +#define IMF_TIME "Time\0"
  33 +#define IMF_START "2008000000000000\0"
  34 +#define IMF_TI "0000030000000000\0"
  35 +
  36 +#define DST_VI "ground:based:dst\0"
  37 +#define DST_PAR "DST\0"
  38 +#define DST_TIME "Time\0"
  39 +#define DST_START "2008000000000000\0"
  40 +#define DST_TI "0000060000000000\0"
  41 +
  42 +#define MESMAG_VI "mes:mag:orb\0"
  43 +#define MESMAG_PAR "B_MSO\0"
  44 +#define MESMAG_TIME "Time\0"
  45 +#define MESMAG_START "2012000000000000\0"
  46 +#define MESMAG_TI "0000004000000000\0"
  47 +/*-------------------------*/
  48 +
  49 +/*
  50 + * Init requests data
  51 + */
  52 +void initRequestDataList()
  53 +{
  54 + memset(&requestDataList, 0, NB_REQUESTS*sizeof(RequestData));
  55 +
  56 + int i;
  57 + for (i = 0; i < NB_REQUESTS; ++i)
  58 + {
  59 + if (i%3 == 0)
  60 + {
  61 + strcpy(requestDataList[i].ViName,MESMAG_VI);
  62 + strcpy(requestDataList[i].StartTime,MESMAG_START);
  63 + strcpy(requestDataList[i].TimeInt,MESMAG_TI);
  64 + strcpy(requestDataList[i].TimeName,MESMAG_TIME);
  65 + strcpy(requestDataList[i].ParName,MESMAG_PAR);
  66 + }
  67 + else if (i%2 == 0)
  68 + {
  69 + strcpy(requestDataList[i].ViName,DST_VI);
  70 + strcpy(requestDataList[i].StartTime,DST_START);
  71 + strcpy(requestDataList[i].TimeInt,DST_TI);
  72 + strcpy(requestDataList[i].TimeName,DST_TIME);
  73 + strcpy(requestDataList[i].ParName,DST_PAR);
  74 + }
  75 + else
  76 + {
  77 +
  78 + strcpy(requestDataList[i].ViName,IMF_VI);
  79 + strcpy(requestDataList[i].StartTime,IMF_START);
  80 + strcpy(requestDataList[i].TimeInt,IMF_TI);
  81 + strcpy(requestDataList[i].TimeName,IMF_TIME);
  82 + strcpy(requestDataList[i].ParName,IMF_PAR);
  83 + }
  84 + }
  85 +}
  86 +
  87 +/*
  88 + * Run a request
  89 + */
  90 +int runRequest(char* ViName, char* StartTime, char* TimeInt, int NbPar, char** ParNames)
  91 +{
  92 + int ID, error;
  93 + double RealTime;
  94 + DD_data_t *data;
  95 +
  96 + ID = DD_SetVariable(ViName);
  97 + if(ID < 0)
  98 + {
  99 + error = DD_Close(99);
  100 + return 0;
  101 + }
  102 +
  103 + error = DD_SetTimeInfo(ID, StartTime, &RealTime);
  104 + if(error < 0)
  105 + {
  106 + error = DD_Close(ID);
  107 + return 0;
  108 + }
  109 +
  110 + do
  111 + {
  112 + error = DD_GetMultiData(ID, NbPar, ParNames, TimeInt, &data, 1);
  113 + if(error < 0)
  114 + {
  115 + error = DD_Close(ID);
  116 + return 0;
  117 + }
  118 + if(error == MOREDELAY)
  119 + {
  120 + error = MOREDATA;
  121 + }
  122 + }
  123 + while(error == MOREDATA);
  124 +
  125 + error = DD_Close(ID);
  126 +
  127 + return 1;
  128 +}
  129 +
  130 +/*
  131 + * Main
  132 + */
  133 +int main()
  134 +{
  135 + initRequestDataList();
  136 +
  137 + pid_t pid;
  138 + int crtRequest = 0;
  139 +
  140 + do {
  141 + pid = fork();
  142 + if (pid == 0)
  143 + {
  144 + char *ParNames[2] = {(char *)requestDataList[crtRequest].TimeName, (char *)requestDataList[crtRequest].ParName};
  145 + printf("Run Request %s %s %s %s\n",requestDataList[crtRequest].ViName,requestDataList[crtRequest].ParName,requestDataList[crtRequest].StartTime,requestDataList[crtRequest].TimeInt);
  146 + if (!runRequest(requestDataList[crtRequest].ViName, requestDataList[crtRequest].StartTime, requestDataList[crtRequest].TimeInt, 2, ParNames))
  147 + printf("Error detected %s %s %s %s\n",requestDataList[crtRequest].ViName,requestDataList[crtRequest].ParName,requestDataList[crtRequest].StartTime,requestDataList[crtRequest].TimeInt);
  148 + else
  149 + printf("OK %s %s %s %s\n",requestDataList[crtRequest].ViName,requestDataList[crtRequest].ParName,requestDataList[crtRequest].StartTime,requestDataList[crtRequest].TimeInt);
  150 + }
  151 + else
  152 + {
  153 + ++crtRequest;
  154 + if (crtRequest == NB_REQUESTS)
  155 + {
  156 + wait(0);
  157 + }
  158 + }
  159 + } while ((pid != 0) && (crtRequest < NB_REQUESTS));
  160 +
  161 + return 1;
  162 +}
... ...