todo completés

This commit is contained in:
hiGepi 2022-11-19 16:11:18 +01:00
parent fb7616d408
commit 7a6f332f72
16 changed files with 311 additions and 92 deletions

View file

@ -0,0 +1,173 @@
#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>
#include <unistd.h>
#include <pthread.h>
#include <fcntl.h>
#include "acquisitionManager.h"
#include "msg.h"
#include "iSensor.h"
#include "multitaskingAccumulator.h"
#include "iAcquisitionManager.h"
#include "debug.h"
//producer count storage
volatile unsigned int produceCount = 0;
pthread_t producers[4];
static void *produce(void *params);
//Le tampon pour stocker les messages
MSG_BLOCK buffer[BUFFER_SIZE];
int index_buffer_libre = 0;
int nbr_threads = 0;
/**
* Semaphores and Mutex
*/
sem_t *semaphore_libre;
sem_t *semaphore_occupe;
pthread_mutex_t mutex_tab_indices = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex_count = PTHREAD_MUTEX_INITIALIZER;
/*
* Creates the synchronization elements.
* @return ERROR_SUCCESS if the init is ok, ERROR_INIT otherwise
*/
static unsigned int createSynchronizationObjects(void);
/*
* Increments the produce count.
*/
static void incrementProducedCount(void);
static unsigned int createSynchronizationObjects(void)
{
//Initialisation des sémaphores
semaphore_libre = sem_open("/acquisitionManager_semLibre", O_CREAT, 0644, BUFFER_SIZE);
if (semaphore_libre == SEM_FAILED)
{
perror("[sem_open");
return ERROR_INIT;
}
semaphore_occupe = sem_open("/acquisitionManager_semLibre", O_CREAT, 0644, 0);
if (semaphore_occupe == SEM_FAILED)
{
perror("[sem_open");
return ERROR_INIT;
}
//----Fin de l'initialisation sémaphore -----
printf("[acquisitionManager]Semaphore created\n");
return ERROR_SUCCESS;
}
static void incrementProducedCount(void)
{
pthread_mutex_lock(&mutex_count);
produceCount++;
pthread_mutex_unlock(&mutex_count);
}
unsigned int getProducedCount(void)
{
unsigned int p = 0;
pthread_mutex_lock(&mutex_count);
p = produceCount;
pthread_mutex_unlock(&mutex_count);
return p;
}
MSG_BLOCK getMessage(void){
//TODO
MSG_BLOCK message;
static int index_lecture = 0;
sem_wait(semaphore_occupe);
message = buffer[index_lecture];
index_lecture = (index_lecture+1)%BUFFER_SIZE;
sem_post(semaphore_libre);
return message;
}
//TODO create accessors to limit semaphore and mutex usage outside of this C module.
unsigned int acquisitionManagerInit(void)
{
unsigned int i;
printf("[acquisitionManager]Synchronization initialization in progress...\n");
fflush( stdout );
if (createSynchronizationObjects() == ERROR_INIT)
return ERROR_INIT;
printf("[acquisitionManager]Synchronization initialization done.\n");
for (i = 0; i < PRODUCER_COUNT; i++)
{
pthread_create(&producers[i], NULL, produce, NULL);
}
return ERROR_SUCCESS;
}
void acquisitionManagerJoin(void)
{
unsigned int i;
for (i = 0; i < PRODUCER_COUNT; i++)
{
pthread_join(producers[i], NULL);
}
sem_destroy(semaphore_libre);
sem_destroy(semaphore_occupe);
printf("[acquisitionManager]Semaphore cleaned\n");
}
void *produce(void* params)
{
//D(printf("[acquisitionManager]Producer created with id %d\n", gettid()));
unsigned int i = 0;
int index_loc;
MSG_BLOCK message;
//Récupérer le numéro du thread
int num_thread;
pthread_mutex_lock(&mutex_count);
nbr_threads++;
num_thread = nbr_threads;
pthread_mutex_unlock(&mutex_count);
while (i < PRODUCER_LOOP_LIMIT)
{
i++;
sleep(PRODUCER_SLEEP_TIME+(rand() % 5));
//----- L'ajout d'un nouveau message -----
//On prend un jeton : une case se remplit
sem_wait(semaphore_libre);
pthread_mutex_lock(&mutex_tab_indices);
index_loc = index_buffer_libre;
index_buffer_libre = (index_buffer_libre+1)%BUFFER_SIZE;
pthread_mutex_unlock(&mutex_tab_indices);
//Récupérer l'entrée
getInput(num_thread, &message);
buffer[index_loc] = message;
//On met un jeton dans le sémaphore "occupé"
sem_post(semaphore_occupe);
}
//printf("[acquisitionManager] %d termination\n", get_id());
pthread_exit(NULL);
}

View file

@ -0,0 +1,18 @@
#ifndef ACQUISITION_MANAGER_H
#define ACQUISITION_MANAGER_H
/**
* Initializes the acquisitions
*/
unsigned int acquisitionManagerInit(void);
/**
* Waits that acquisitions terminate
*/
void acquisitionManagerJoin(void);
//La taille du tampon
#define BUFFER_SIZE 13
#endif

View file

@ -1,6 +1,12 @@
#ifndef ACQUISITION_MANAGER_H #ifndef ACQUISITION_MANAGER_H
#define ACQUISITION_MANAGER_H #define ACQUISITION_MANAGER_H
/**
* Initializes constants
*/
#define BUFFER_SIZE 13
/** /**
* Initializes the acquisitions * Initializes the acquisitions
*/ */
@ -11,8 +17,4 @@ unsigned int acquisitionManagerInit(void);
*/ */
void acquisitionManagerJoin(void); void acquisitionManagerJoin(void);
#endif
//La taille du tampon
#define BUFFER_SIZE 13
#endif

View file

@ -19,22 +19,24 @@ pthread_t producers[4];
static void *produce(void *params); static void *produce(void *params);
//Le tampon pour stocker les messages MSG_BLOCK Buffer[BUFFER_SIZE];
MSG_BLOCK buffer[BUFFER_SIZE];
int index_buffer_libre = 0;
int nbr_threads = 0;
/** /**
* Semaphores and Mutex * Semaphores and Mutex
*/ */
sem_t *semaphore_libre; //TODO
sem_t *semaphore_occupe; sem_t semaphore_libre;
sem_t semaphore_occupe;
pthread_mutex_t mutex_tab_indices = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t m_count = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex_count = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t m_write = PTHREAD_MUTEX_INITIALIZER;
/**
* Usefull variables
*/
int index_libre = 0;
int thread_count = 0;
/* /*
* Creates the synchronization elements. * Creates the synchronization elements.
* @return ERROR_SUCCESS if the init is ok, ERROR_INIT otherwise * @return ERROR_SUCCESS if the init is ok, ERROR_INIT otherwise
@ -48,55 +50,53 @@ static void incrementProducedCount(void);
static unsigned int createSynchronizationObjects(void) static unsigned int createSynchronizationObjects(void)
{ {
int error;
//Initialisation des sémaphores //TODO DONE
semaphore_libre = sem_open("/acquisitionManager_semLibre", O_CREAT, 0644, BUFFER_SIZE); if((error = sem_init(&semaphore_libre, 0, BUFFER_SIZE)) < 0){
if (semaphore_libre == SEM_FAILED) printf("[acquisitionManager]Semaphore L Error No. %d\n",error);
{ return error;
perror("[sem_open"); }
return ERROR_INIT;
} if((error = sem_init(&semaphore_occupe, 0, 0)) < 0){
printf("[acquisitionManager]Semaphore O Error No. %d\n",error);
semaphore_occupe = sem_open("/acquisitionManager_semLibre", O_CREAT, 0644, 0); return error;
if (semaphore_occupe == SEM_FAILED) }
{
perror("[sem_open");
return ERROR_INIT;
}
//----Fin de l'initialisation sémaphore -----
printf("[acquisitionManager]Semaphore created\n"); printf("[acquisitionManager]Semaphore created\n");
return ERROR_SUCCESS; return ERROR_SUCCESS;
} }
static void incrementProducedCount(void) static void incrementProducedCount(void)
{ {
pthread_mutex_lock(&mutex_count); //TODO DONE
produceCount++; pthread_mutex_lock(&m_count);
pthread_mutex_unlock(&mutex_count); produceCount++;
pthread_mutex_unlock(&m_count);
} }
unsigned int getProducedCount(void) unsigned int getProducedCount(void)
{ {
//TODO DONE
unsigned int p = 0; unsigned int p = 0;
pthread_mutex_lock(&mutex_count); pthread_mutex_lock(&m_count);
p = produceCount; p = produceCount;
pthread_mutex_unlock(&mutex_count); pthread_mutex_unlock(&m_count);
return p; return p;
} }
MSG_BLOCK getMessage(void){ MSG_BLOCK getMessage(void){
//TODO //TODO
MSG_BLOCK message; MSG_BLOCK res;
static int index_count = 0;
static int index_lecture = 0;
sem_wait(semaphore_occupe); sem_wait(&semaphore_occupe);
res = Buffer[index_count];
message = buffer[index_lecture]; index_count = (index_count+1)%BUFFER_SIZE;
index_lecture = (index_lecture+1)%BUFFER_SIZE; incrementProducedCount();
sem_post(&semaphore_libre);
sem_post(semaphore_libre);
return res;
return message;
} }
//TODO create accessors to limit semaphore and mutex usage outside of this C module. //TODO create accessors to limit semaphore and mutex usage outside of this C module.
@ -110,10 +110,11 @@ unsigned int acquisitionManagerInit(void)
return ERROR_INIT; return ERROR_INIT;
printf("[acquisitionManager]Synchronization initialization done.\n"); printf("[acquisitionManager]Synchronization initialization done.\n");
for (i = 0; i < PRODUCER_COUNT; i++) for (i = 0; i < PRODUCER_COUNT; i++)
{ {
pthread_create(&producers[i], NULL, produce, NULL); //TODO DONE
pthread_create(&producers[i], NULL, produce, NULL);
} }
return ERROR_SUCCESS; return ERROR_SUCCESS;
@ -124,50 +125,49 @@ void acquisitionManagerJoin(void)
unsigned int i; unsigned int i;
for (i = 0; i < PRODUCER_COUNT; i++) for (i = 0; i < PRODUCER_COUNT; i++)
{ {
//TODO DONE
pthread_join(producers[i], NULL); pthread_join(producers[i], NULL);
} }
sem_destroy(semaphore_libre); //TODO DONE
sem_destroy(semaphore_occupe); sem_destroy(&semaphore_libre);
sem_destroy(&semaphore_occupe);
printf("[acquisitionManager]Semaphore cleaned\n"); printf("[acquisitionManager]Semaphore cleaned\n");
} }
void *produce(void* params) void *produce(void* params)
{ {
//D(printf("[acquisitionManager]Producer created with id %d\n", gettid())); D(printf("[acquisitionManager]Producer created with id %d\n", getpid()));
unsigned int i = 0; unsigned int i = 0;
int index_loc; int index_lock;
MSG_BLOCK message; int thread_num;
MSG_BLOCK message;
//Récupérer le numéro du thread
int num_thread; //Counting producers
pthread_mutex_lock(&mutex_count); pthread_mutex_lock(&m_count);
nbr_threads++; thread_num = thread_count;
num_thread = nbr_threads; thread_count++;
pthread_mutex_unlock(&mutex_count); pthread_mutex_unlock(&m_count);
while (i < PRODUCER_LOOP_LIMIT) while (i < PRODUCER_LOOP_LIMIT)
{ {
i++; i++;
sleep(PRODUCER_SLEEP_TIME+(rand() % 5)); sleep(PRODUCER_SLEEP_TIME+(rand() % 5));
//TODO DONE
//----- L'ajout d'un nouveau message ----- sem_wait(&semaphore_libre);
//On prend un jeton : une case se remplit
sem_wait(semaphore_libre); pthread_mutex_lock(&m_write);
index_lock = index_libre;
pthread_mutex_lock(&mutex_tab_indices); index_libre = (index_libre + 1)%BUFFER_SIZE;
index_loc = index_buffer_libre; pthread_mutex_unlock(&m_write);
index_buffer_libre = (index_buffer_libre+1)%BUFFER_SIZE;
pthread_mutex_unlock(&mutex_tab_indices); getInput(thread_num, &message);
Buffer[index_lock] = message;
//Récupérer l'entrée
getInput(num_thread, &message); sem_post(&semaphore_occupe);
buffer[index_loc] = message;
//On met un jeton dans le sémaphore "occupé"
sem_post(semaphore_occupe);
} }
//printf("[acquisitionManager] %d termination\n", get_id()); printf("[acquisitionManager] %d termination\n", getpid());
//TODO DONE
pthread_exit(NULL); pthread_exit(NULL);
} }

Binary file not shown.

BIN
T2/tp/exercice-1/display.o Normal file

Binary file not shown.

View file

@ -19,21 +19,29 @@ static void *display( void *parameters );
void displayManagerInit(void){ void displayManagerInit(void){
//TODO // TODO DONE
pthread_create(&displayThread, NULL, display, NULL);
} }
void displayManagerJoin(void){ void displayManagerJoin(void){
//TODO //TODO DONE
pthread_join(displayThread, NULL);
} }
static void *display( void *parameters ) static void *display( void *parameters )
{ {
//D(printf("[displayManager]Thread created for display with id %d\n", gettid())); D(printf("[displayManager]Thread created for display with id %d\n", getpid()));
unsigned int diffCount = 0; unsigned int diffCount = 0;
MSG_BLOCK mBlock;
while(diffCount < DISPLAY_LOOP_LIMIT){ while(diffCount < DISPLAY_LOOP_LIMIT){
sleep(DISPLAY_SLEEP_TIME); sleep(DISPLAY_SLEEP_TIME);
//TODO //TODO
diffCount++;
mBlock = getCurrentSum();
messageDisplay(&mBlock);
print(getProducedCount(),getConsumedCount());
} }
//printf("[displayManager] %d termination\n", gettid()); printf("[displayManager] %d termination\n", getpid());
//TODO //TODO DONE
pthread_exit(NULL);
} }

Binary file not shown.

View file

@ -29,11 +29,13 @@ static void *sum( void *parameters );
MSG_BLOCK getCurrentSum(){ MSG_BLOCK getCurrentSum(){
//TODO //TODO DONE
return out;
} }
unsigned int getConsumedCount(){ unsigned int getConsumedCount(){
//TODO //TODO
return consumeCount;
} }
@ -43,24 +45,40 @@ void messageAdderInit(void){
{ {
out.mData[i] = 0; out.mData[i] = 0;
} }
//TODO //TODO DONE
pthread_create(&consumer, NULL, sum, NULL);
} }
void messageAdderJoin(void){ void messageAdderJoin(void){
//TODO //TODO DONE
pthread_join(consumer, NULL);
}
static void incrementConsumeCount(void){
// pthread_mutex_lock(&m_count);
consumeCount++;
// pthread_mutex_unlock(&m_count);
} }
static void *sum( void *parameters ) static void *sum( void *parameters )
{ {
//D(printf("[messageAdder]Thread created for sum with id %d\n", gettid())); D(printf("[messageAdder]Thread created for sum with id %d\n", getpid()));
unsigned int i = 0; unsigned int i = 0;
MSG_BLOCK message;
while(i<ADDER_LOOP_LIMIT){ while(i<ADDER_LOOP_LIMIT){
i++; i++;
sleep(ADDER_SLEEP_TIME); sleep(ADDER_SLEEP_TIME);
//TODO //TODO
message = getMessage();
if(messageCheck(&message)){
messageAdd(&out,&message);
incrementConsumeCount();
}
// getCurrentSum();
} }
//printf("[messageAdder] %d termination\n", gettid()); printf("[messageAdder] %d termination\n", getpid());
//TODO //TODO
pthread_exit(NULL);
} }

Binary file not shown.

BIN
T2/tp/exercice-1/msg.o Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.