#include #include #include #include #include #include #include "acquisitionManager.h" #include "msg.h" #include "iSensor.h" #include "multitaskingAccumulator.h" #include "iAcquisitionManager.h" #include "debug.h" //producer count storage volatile unsigned int produceCount = 0; pthread_t producers[4]; static void *produce(void *params); //Le tampon pour stocker les messages MSG_BLOCK buffer[BUFFER_SIZE]; int index_buffer_libre = 0; int nbr_threads = 0; /** * Semaphores and Mutex */ sem_t *semaphore_libre; sem_t *semaphore_occupe; pthread_mutex_t mutex_tab_indices = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t mutex_count = PTHREAD_MUTEX_INITIALIZER; /* * Creates the synchronization elements. * @return ERROR_SUCCESS if the init is ok, ERROR_INIT otherwise */ static unsigned int createSynchronizationObjects(void); /* * Increments the produce count. */ static void incrementProducedCount(void); static unsigned int createSynchronizationObjects(void) { //Initialisation des sémaphores semaphore_libre = sem_open("/acquisitionManager_semLibre", O_CREAT, 0644, BUFFER_SIZE); if (semaphore_libre == SEM_FAILED) { perror("[sem_open"); return ERROR_INIT; } semaphore_occupe = sem_open("/acquisitionManager_semLibre", O_CREAT, 0644, 0); if (semaphore_occupe == SEM_FAILED) { perror("[sem_open"); return ERROR_INIT; } //----Fin de l'initialisation sémaphore ----- printf("[acquisitionManager]Semaphore created\n"); return ERROR_SUCCESS; } static void incrementProducedCount(void) { pthread_mutex_lock(&mutex_count); produceCount++; pthread_mutex_unlock(&mutex_count); } unsigned int getProducedCount(void) { unsigned int p = 0; pthread_mutex_lock(&mutex_count); p = produceCount; pthread_mutex_unlock(&mutex_count); return p; } MSG_BLOCK getMessage(void){ //TODO MSG_BLOCK message; static int index_lecture = 0; sem_wait(semaphore_occupe); message = buffer[index_lecture]; index_lecture = (index_lecture+1)%BUFFER_SIZE; sem_post(semaphore_libre); return message; } //TODO create accessors to limit semaphore and mutex usage outside of this C module. unsigned int acquisitionManagerInit(void) { unsigned int i; printf("[acquisitionManager]Synchronization initialization in progress...\n"); fflush( stdout ); if (createSynchronizationObjects() == ERROR_INIT) return ERROR_INIT; printf("[acquisitionManager]Synchronization initialization done.\n"); for (i = 0; i < PRODUCER_COUNT; i++) { pthread_create(&producers[i], NULL, produce, NULL); } return ERROR_SUCCESS; } void acquisitionManagerJoin(void) { unsigned int i; for (i = 0; i < PRODUCER_COUNT; i++) { pthread_join(producers[i], NULL); } sem_destroy(semaphore_libre); sem_destroy(semaphore_occupe); printf("[acquisitionManager]Semaphore cleaned\n"); } void *produce(void* params) { //D(printf("[acquisitionManager]Producer created with id %d\n", gettid())); unsigned int i = 0; int index_loc; MSG_BLOCK message; //Récupérer le numéro du thread int num_thread; pthread_mutex_lock(&mutex_count); nbr_threads++; num_thread = nbr_threads; pthread_mutex_unlock(&mutex_count); while (i < PRODUCER_LOOP_LIMIT) { i++; sleep(PRODUCER_SLEEP_TIME+(rand() % 5)); //----- L'ajout d'un nouveau message ----- //On prend un jeton : une case se remplit sem_wait(semaphore_libre); pthread_mutex_lock(&mutex_tab_indices); index_loc = index_buffer_libre; index_buffer_libre = (index_buffer_libre+1)%BUFFER_SIZE; pthread_mutex_unlock(&mutex_tab_indices); //Récupérer l'entrée getInput(num_thread, &message); buffer[index_loc] = message; //On met un jeton dans le sémaphore "occupé" sem_post(semaphore_occupe); } //printf("[acquisitionManager] %d termination\n", get_id()); pthread_exit(NULL); }