#include #include #include #include #include #include #include "acquisitionManager.h" #include "msg.h" #include "iSensor.h" #include "multitaskingAccumulator.h" #include "iAcquisitionManager.h" #include "debug.h" #include #include #define gettid() syscall(SYS_gettid) pthread_t producers[4]; static void *produce(void *params); /** * Semaphores and Mutex */ //TODO sem_t semaphore_libre; sem_t semaphore_occupe; // pthread_mutex_t m_write = PTHREAD_MUTEX_INITIALIZER; /** * Usefull variables */ MSG_BLOCK Buffer[BUFFER_SIZE]; _Atomic int index_libre = 0; _Atomic int thread_count = 0; //producer count storage _Atomic int verrou = 0; volatile unsigned int produceCount = 0; /* * Creates the synchronization elements. * @return ERROR_SUCCESS if the init is ok, ERROR_INIT otherwise */ static unsigned int createSynchronizationObjects(void); /* * Increments the produce count. */ static void incrementProducedCount(void); // Méthode pour verrouiller un accès static void pCountLockTake(){ int expected = 0; //Tant que verrou ne vaut pas 0, on attend et expected vaut 0. Si verrou vaut bien 0, on le met à 1 while(!atomic_compare_exchange_weak(&verrou, &expected, 1)) expected = 0; //La fonction se termine.. } // Méthode pour libérer un accès static void pCountLockRelease(){ verrou = 1; } static unsigned int createSynchronizationObjects(void) { int error; //TODO DONE if((error = sem_init(&semaphore_libre, 0, BUFFER_SIZE)) < 0){ printf("[acquisitionManager]Semaphore L Error No. %d\n",error); return error; } if((error = sem_init(&semaphore_occupe, 0, 0)) < 0){ printf("[acquisitionManager]Semaphore O Error No. %d\n",error); return error; } printf("[acquisitionManager]Semaphore created\n"); return ERROR_SUCCESS; } static void incrementProducedCount(void) { //On incrément le nombre de threads producteurs pCountLockTake(); produceCount++; pCountLockRelease(); } unsigned int getProducedCount(void) { unsigned int p = 0; //On crée une section actomique pCountLockTake(); p = produceCount; pCountLockRelease(); return p; } MSG_BLOCK getMessage(void){ //TODO MSG_BLOCK res; static int index_count = 0; sem_wait(&semaphore_occupe); res = Buffer[index_count]; index_count = (index_count+1)%BUFFER_SIZE; sem_post(&semaphore_libre); return res; } //TODO create accessors to limit semaphore and mutex usage outside of this C module. unsigned int acquisitionManagerInit(void) { unsigned int i; printf("[acquisitionManager]Synchronization initialization in progress...\n"); fflush( stdout ); if (createSynchronizationObjects() == ERROR_INIT) return ERROR_INIT; printf("[acquisitionManager]Synchronization initialization done.\n"); for (i = 0; i < PRODUCER_COUNT; i++) { //TODO pthread_create(&producers[i], NULL, produce, NULL); } return ERROR_SUCCESS; } void acquisitionManagerJoin(void) { unsigned int i; for (i = 0; i < PRODUCER_COUNT; i++) { //TODO pthread_join(producers[i], NULL); } //TODO sem_destroy(&semaphore_libre); sem_destroy(&semaphore_occupe); printf("[acquisitionManager]Semaphore cleaned\n"); } void *produce(void* params) { D(printf("[acquisitionManager]Producer created with id %d\n", gettid())); unsigned int i = 0; int index_lock; int thread_num; MSG_BLOCK message; //Counting producers thread_num = thread_count; thread_count++; while (i < PRODUCER_LOOP_LIMIT) { i++; sleep(PRODUCER_SLEEP_TIME+(rand() % 5)); //TODO sem_wait(&semaphore_libre); index_lock = index_libre; index_libre = (index_libre + 1)%BUFFER_SIZE; getInput(thread_num, &message); Buffer[index_lock] = message; incrementProducedCount(); sem_post(&semaphore_occupe); } printf("[acquisitionManager] %d termination\n", syscall(gettid())); //TODO pthread_exit(NULL); }