diff --git a/coapthon/client/coap.py b/coapthon/client/coap.py index 4a6487f..d2739f3 100644 --- a/coapthon/client/coap.py +++ b/coapthon/client/coap.py @@ -199,7 +199,7 @@ class CoAP(object): """ with transaction: if message.type == defines.Types['CON']: - future_time = random.uniform(defines.ACK_TIMEOUT, (defines.ACK_TIMEOUT * defines.ACK_RANDOM_FACTOR)) + future_time = self.superviseur.RTO # random.uniform(defines.ACK_TIMEOUT, (defines.ACK_TIMEOUT * defines.ACK_RANDOM_FACTOR)) transaction.retransmit_stop = threading.Event() self.to_be_stopped.append(transaction.retransmit_stop) transaction.retransmit_thread = threading.Thread(target=self._retransmit, diff --git a/coapthon/client/helperclient.py b/coapthon/client/helperclient.py index 2e83102..9e1690d 100644 --- a/coapthon/client/helperclient.py +++ b/coapthon/client/helperclient.py @@ -1,3 +1,4 @@ +from coapthon.client import superviseur import random from multiprocessing import Queue from queue import Empty @@ -28,7 +29,14 @@ class HelperClient(object): self.protocol = CoAP(self.server, random.randint(1, 65535), self._wait_response, sock=sock, cb_ignore_read_exception=cb_ignore_read_exception, cb_ignore_write_exception=cb_ignore_write_exception) self.queue = Queue() - self.superviseur = self.protocol.superviseur + + @property + def superviseur(self): + return self.protocol.superviseur + + @superviseur.setter + def superviseur(self, value): + self.protocol.superviseur = value def _wait_response(self, message): """ diff --git a/coapthon/client/superviseur.py b/coapthon/client/superviseur.py index 7a489ea..5a4db15 100644 --- a/coapthon/client/superviseur.py +++ b/coapthon/client/superviseur.py @@ -1,14 +1,20 @@ +import random import time +import numpy as np +from coapthon import defines + class SuperviseurLocalPlaceHolder(): """Class de base pour le superviseur """ def __init__(self, client_CoAP) -> None: + client_CoAP.superviseur = self self.client = client_CoAP self._RTTs = [] self._taux_retransmition = 0 + self._RTO = defines.ACK_TIMEOUT def envoie_message(self, message) -> None: self.envoie_token(message.token) @@ -27,7 +33,7 @@ class SuperviseurLocalPlaceHolder(): return self._RTTs @property - def taux_retransmition(self): + def taux_retransmission(self): return self._taux_retransmition @property @@ -40,6 +46,10 @@ class SuperviseurLocalPlaceHolder(): """Moyenne du RTT.""" return sum(self.RTTs)/len(self.RTTs) + @property + def RTO(self): + return random.uniform(self._RTO, (self._RTO * defines.ACK_RANDOM_FACTOR)) + class SuperviseurLocal(SuperviseurLocalPlaceHolder): """ @@ -61,6 +71,7 @@ class SuperviseurLocal(SuperviseurLocalPlaceHolder): self._n_envoie += 1 self._n_tokken += not(token in self._dict_envoie) self._dict_envoie[token] = time.time() + self._taux_retransmition = 1 - self._n_tokken/self._n_envoie def reception_token(self, token) -> None: """Enregistre l'arrivée d'un token @@ -71,24 +82,24 @@ class SuperviseurLocal(SuperviseurLocalPlaceHolder): if token in self._dict_envoie: rtt = time.time() - self._dict_envoie[token] self._RTTs.append(time.time() - self._dict_envoie[token]) - del self._dict_envoie[token] + # del self._dict_envoie[token] self.callback_new_rtt(rtt) else: - pass #raise ValueError("Tokken inconnue") - - @property - def tau_retransmission(self): - """Rapport du nombre de message correspondant à des restransmission sur le nombre de message total.""" - return 1 - self._n_tokken/self._n_envoie + pass # raise ValueError("Tokken inconnue") def callback_new_rtt(self, rtt): pass + def reset(self): + self._dict_envoie = {} + self._n_envoie = 0 + self._n_tokken = 0 + self._RTTs = [] + class SuperviseurLocalFiltre(SuperviseurLocal): - def __init__(self, client_CoAP, rtt_init=0.001, alpha_l=0.01, alpha_s=0.1) -> None: + def __init__(self, client_CoAP, rtt_init=0.01, alpha_l=0.01, alpha_s=0.1) -> None: super().__init__(client_CoAP) - self._dict_envoie = {} self.alpha_l = alpha_l self.alpha_s = alpha_s self._RTT_L = rtt_init @@ -106,3 +117,42 @@ class SuperviseurLocalFiltre(SuperviseurLocal): @property def RTT_S(self): return self._RTT_S + + +class SuperviseurGlobal(): + def __init__(self, clients, superviseur_type, *superviseur_args) -> None: + """Genère un superviseur global pour la liste de client donnée + + Args: + clients (List(HelperClient)): Liste des clients à supervisé + superviseur_type (Type): Type de superviseur à utilisé + """ + self.clients = clients + self.superviseurs = [superviseur_type( + client, *superviseur_args) for client in clients] + + @property + def state(self): + taux_retransmissions = np.array( + [superviseur.taux_retransmission for superviseur in self.superviseurs]) + + min_rtts = np.array( + [superviseur.min_RTT for superviseur in self.superviseurs]) + avg_rtts = np.array( + [superviseur.avg_RTT for superviseur in self.superviseurs]) + ratio_rtts = np.array(min_rtts/avg_rtts) + + if isinstance(self.superviseurs[0], SuperviseurLocalFiltre): + rtt_ls = np.array( + [superviseur.RTT_L for superviseur in self.superviseurs]) + rtt_ss = np.array( + [superviseur.RTT_S for superviseur in self.superviseurs]) + ratio_filtres = rtt_ss/rtt_ls + + representation_etat = np.array( + [taux_retransmissions, ratio_rtts, ratio_filtres]) + + else: + representation_etat = np.array([taux_retransmissions, ratio_rtts]) + + return representation_etat diff --git a/demo_global.py b/demo_global.py new file mode 100644 index 0000000..578b6ab --- /dev/null +++ b/demo_global.py @@ -0,0 +1,46 @@ +import socket +import threading +import time + + +import matplotlib.pyplot as plt + +from coapthon.client.helperclient import HelperClient +from coapthon.client.superviseur import (SuperviseurLocal, + SuperviseurLocalFiltre, + SuperviseurGlobal) +from coapthon.utils import parse_uri + +N_rep = 50 +N_client = 25 + +host, port, path = parse_uri("coap://raspberrypi.local/basic") +try: + tmp = socket.gethostbyname(host) + host = tmp +except socket.gaierror: + pass + +clients = [HelperClient(server=(host, port)) for _ in range(N_client)] + +super_global = SuperviseurGlobal(clients, SuperviseurLocalFiltre) + + +def experience(client, N_rep): + for n_rep in range(N_rep): + response = client.get(path) + client.stop() + + +threads = [threading.Thread(target=experience, args=[ + client, N_rep], name='Thread-experience-{}'.format(n)) for n, client in enumerate(clients)] + +for thread in threads: + thread.start() + +for thread in threads: + thread.join() + +print(super_global.state) + +#[client.stop() for client in clients]