Superviseur Global

Création de la matrice d'état
injection du nouveau RTO
This commit is contained in:
leo 2021-07-05 14:17:40 +02:00
parent bf2508b476
commit 4d3db32cbc
4 changed files with 116 additions and 12 deletions

View file

@ -199,7 +199,7 @@ class CoAP(object):
""" """
with transaction: with transaction:
if message.type == defines.Types['CON']: if message.type == defines.Types['CON']:
future_time = random.uniform(defines.ACK_TIMEOUT, (defines.ACK_TIMEOUT * defines.ACK_RANDOM_FACTOR)) future_time = self.superviseur.RTO # random.uniform(defines.ACK_TIMEOUT, (defines.ACK_TIMEOUT * defines.ACK_RANDOM_FACTOR))
transaction.retransmit_stop = threading.Event() transaction.retransmit_stop = threading.Event()
self.to_be_stopped.append(transaction.retransmit_stop) self.to_be_stopped.append(transaction.retransmit_stop)
transaction.retransmit_thread = threading.Thread(target=self._retransmit, transaction.retransmit_thread = threading.Thread(target=self._retransmit,

View file

@ -1,3 +1,4 @@
from coapthon.client import superviseur
import random import random
from multiprocessing import Queue from multiprocessing import Queue
from queue import Empty from queue import Empty
@ -28,7 +29,14 @@ class HelperClient(object):
self.protocol = CoAP(self.server, random.randint(1, 65535), self._wait_response, sock=sock, self.protocol = CoAP(self.server, random.randint(1, 65535), self._wait_response, sock=sock,
cb_ignore_read_exception=cb_ignore_read_exception, cb_ignore_write_exception=cb_ignore_write_exception) cb_ignore_read_exception=cb_ignore_read_exception, cb_ignore_write_exception=cb_ignore_write_exception)
self.queue = Queue() self.queue = Queue()
self.superviseur = self.protocol.superviseur
@property
def superviseur(self):
return self.protocol.superviseur
@superviseur.setter
def superviseur(self, value):
self.protocol.superviseur = value
def _wait_response(self, message): def _wait_response(self, message):
""" """

View file

@ -1,14 +1,20 @@
import random
import time import time
import numpy as np
from coapthon import defines
class SuperviseurLocalPlaceHolder(): class SuperviseurLocalPlaceHolder():
"""Class de base pour le superviseur """Class de base pour le superviseur
""" """
def __init__(self, client_CoAP) -> None: def __init__(self, client_CoAP) -> None:
client_CoAP.superviseur = self
self.client = client_CoAP self.client = client_CoAP
self._RTTs = [] self._RTTs = []
self._taux_retransmition = 0 self._taux_retransmition = 0
self._RTO = defines.ACK_TIMEOUT
def envoie_message(self, message) -> None: def envoie_message(self, message) -> None:
self.envoie_token(message.token) self.envoie_token(message.token)
@ -27,7 +33,7 @@ class SuperviseurLocalPlaceHolder():
return self._RTTs return self._RTTs
@property @property
def taux_retransmition(self): def taux_retransmission(self):
return self._taux_retransmition return self._taux_retransmition
@property @property
@ -40,6 +46,10 @@ class SuperviseurLocalPlaceHolder():
"""Moyenne du RTT.""" """Moyenne du RTT."""
return sum(self.RTTs)/len(self.RTTs) return sum(self.RTTs)/len(self.RTTs)
@property
def RTO(self):
return random.uniform(self._RTO, (self._RTO * defines.ACK_RANDOM_FACTOR))
class SuperviseurLocal(SuperviseurLocalPlaceHolder): class SuperviseurLocal(SuperviseurLocalPlaceHolder):
""" """
@ -61,6 +71,7 @@ class SuperviseurLocal(SuperviseurLocalPlaceHolder):
self._n_envoie += 1 self._n_envoie += 1
self._n_tokken += not(token in self._dict_envoie) self._n_tokken += not(token in self._dict_envoie)
self._dict_envoie[token] = time.time() self._dict_envoie[token] = time.time()
self._taux_retransmition = 1 - self._n_tokken/self._n_envoie
def reception_token(self, token) -> None: def reception_token(self, token) -> None:
"""Enregistre l'arrivée d'un token """Enregistre l'arrivée d'un token
@ -71,24 +82,24 @@ class SuperviseurLocal(SuperviseurLocalPlaceHolder):
if token in self._dict_envoie: if token in self._dict_envoie:
rtt = time.time() - self._dict_envoie[token] rtt = time.time() - self._dict_envoie[token]
self._RTTs.append(time.time() - self._dict_envoie[token]) self._RTTs.append(time.time() - self._dict_envoie[token])
del self._dict_envoie[token] # del self._dict_envoie[token]
self.callback_new_rtt(rtt) self.callback_new_rtt(rtt)
else: else:
pass # raise ValueError("Tokken inconnue") pass # raise ValueError("Tokken inconnue")
@property
def tau_retransmission(self):
"""Rapport du nombre de message correspondant à des restransmission sur le nombre de message total."""
return 1 - self._n_tokken/self._n_envoie
def callback_new_rtt(self, rtt): def callback_new_rtt(self, rtt):
pass pass
def reset(self):
self._dict_envoie = {}
self._n_envoie = 0
self._n_tokken = 0
self._RTTs = []
class SuperviseurLocalFiltre(SuperviseurLocal): class SuperviseurLocalFiltre(SuperviseurLocal):
def __init__(self, client_CoAP, rtt_init=0.001, alpha_l=0.01, alpha_s=0.1) -> None: def __init__(self, client_CoAP, rtt_init=0.01, alpha_l=0.01, alpha_s=0.1) -> None:
super().__init__(client_CoAP) super().__init__(client_CoAP)
self._dict_envoie = {}
self.alpha_l = alpha_l self.alpha_l = alpha_l
self.alpha_s = alpha_s self.alpha_s = alpha_s
self._RTT_L = rtt_init self._RTT_L = rtt_init
@ -106,3 +117,42 @@ class SuperviseurLocalFiltre(SuperviseurLocal):
@property @property
def RTT_S(self): def RTT_S(self):
return self._RTT_S return self._RTT_S
class SuperviseurGlobal():
def __init__(self, clients, superviseur_type, *superviseur_args) -> None:
"""Genère un superviseur global pour la liste de client donnée
Args:
clients (List(HelperClient)): Liste des clients à supervisé
superviseur_type (Type): Type de superviseur à utilisé
"""
self.clients = clients
self.superviseurs = [superviseur_type(
client, *superviseur_args) for client in clients]
@property
def state(self):
taux_retransmissions = np.array(
[superviseur.taux_retransmission for superviseur in self.superviseurs])
min_rtts = np.array(
[superviseur.min_RTT for superviseur in self.superviseurs])
avg_rtts = np.array(
[superviseur.avg_RTT for superviseur in self.superviseurs])
ratio_rtts = np.array(min_rtts/avg_rtts)
if isinstance(self.superviseurs[0], SuperviseurLocalFiltre):
rtt_ls = np.array(
[superviseur.RTT_L for superviseur in self.superviseurs])
rtt_ss = np.array(
[superviseur.RTT_S for superviseur in self.superviseurs])
ratio_filtres = rtt_ss/rtt_ls
representation_etat = np.array(
[taux_retransmissions, ratio_rtts, ratio_filtres])
else:
representation_etat = np.array([taux_retransmissions, ratio_rtts])
return representation_etat

46
demo_global.py Normal file
View file

@ -0,0 +1,46 @@
import socket
import threading
import time
import matplotlib.pyplot as plt
from coapthon.client.helperclient import HelperClient
from coapthon.client.superviseur import (SuperviseurLocal,
SuperviseurLocalFiltre,
SuperviseurGlobal)
from coapthon.utils import parse_uri
N_rep = 50
N_client = 25
host, port, path = parse_uri("coap://raspberrypi.local/basic")
try:
tmp = socket.gethostbyname(host)
host = tmp
except socket.gaierror:
pass
clients = [HelperClient(server=(host, port)) for _ in range(N_client)]
super_global = SuperviseurGlobal(clients, SuperviseurLocalFiltre)
def experience(client, N_rep):
for n_rep in range(N_rep):
response = client.get(path)
client.stop()
threads = [threading.Thread(target=experience, args=[
client, N_rep], name='Thread-experience-{}'.format(n)) for n, client in enumerate(clients)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print(super_global.state)
#[client.stop() for client in clients]