#include #include #include #include #include #include #include "acquisitionManager.h" #include "msg.h" #include "iSensor.h" #include "multitaskingAccumulator.h" #include "iAcquisitionManager.h" #include "debug.h" //producer count storage volatile unsigned int produceCount = 0; pthread_t producers[4]; static void *produce(void *params); MSG_BLOCK Buffer[BUFFER_SIZE]; /** * Semaphores and Mutex */ //TODO sem_t semaphore_libre; sem_t semaphore_occupe; pthread_mutex_t m_count = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t m_write = PTHREAD_MUTEX_INITIALIZER; /** * Usefull variables */ int index_libre = 0; int thread_count = 0; /* * Creates the synchronization elements. * @return ERROR_SUCCESS if the init is ok, ERROR_INIT otherwise */ static unsigned int createSynchronizationObjects(void); /* * Increments the produce count. */ static void incrementProducedCount(void); static unsigned int createSynchronizationObjects(void) { int error; //TODO DONE if((error = sem_init(&semaphore_libre, 0, BUFFER_SIZE)) < 0){ printf("[acquisitionManager]Semaphore L Error No. %d\n",error); return error; } if((error = sem_init(&semaphore_occupe, 0, 0)) < 0){ printf("[acquisitionManager]Semaphore O Error No. %d\n",error); return error; } printf("[acquisitionManager]Semaphore created\n"); return ERROR_SUCCESS; } static void incrementProducedCount(void) { //TODO DONE pthread_mutex_lock(&m_count); produceCount++; pthread_mutex_unlock(&m_count); } unsigned int getProducedCount(void) { //TODO DONE unsigned int p = 0; pthread_mutex_lock(&m_count); p = produceCount; pthread_mutex_unlock(&m_count); return p; } MSG_BLOCK getMessage(void){ //TODO MSG_BLOCK res; static int index_count = 0; sem_wait(&semaphore_occupe); res = Buffer[index_count]; index_count = (index_count+1)%BUFFER_SIZE; sem_post(&semaphore_libre); return res; } //TODO create accessors to limit semaphore and mutex usage outside of this C module. unsigned int acquisitionManagerInit(void) { unsigned int i; printf("[acquisitionManager]Synchronization initialization in progress...\n"); fflush( stdout ); if (createSynchronizationObjects() == ERROR_INIT) return ERROR_INIT; printf("[acquisitionManager]Synchronization initialization done.\n"); for (i = 0; i < PRODUCER_COUNT; i++) { //TODO DONE pthread_create(&producers[i], NULL, produce, NULL); } return ERROR_SUCCESS; } void acquisitionManagerJoin(void) { unsigned int i; for (i = 0; i < PRODUCER_COUNT; i++) { //TODO DONE pthread_join(producers[i], NULL); } //TODO DONE sem_destroy(&semaphore_libre); sem_destroy(&semaphore_occupe); printf("[acquisitionManager]Semaphore cleaned\n"); } void *produce(void* params) { D(printf("[acquisitionManager]Producer created with id %d\n", gettid())); unsigned int i = 0; int index_lock; int thread_num; MSG_BLOCK message; //Counting producers pthread_mutex_lock(&m_count); thread_num = thread_count; thread_count++; pthread_mutex_unlock(&m_count); while (i < PRODUCER_LOOP_LIMIT) { i++; sleep(PRODUCER_SLEEP_TIME+(rand() % 5)); //TODO DONE sem_wait(&semaphore_libre); pthread_mutex_lock(&m_write); index_lock = index_libre; index_libre = (index_libre + 1)%BUFFER_SIZE; pthread_mutex_unlock(&m_write); getInput(thread_num, &message); Buffer[index_lock] = message; incrementProducedCount(); sem_post(&semaphore_occupe); } printf("[acquisitionManager] %d termination\n", gettid()); //TODO DONE pthread_exit(NULL); }