314 lines
13 KiB
Python
314 lines
13 KiB
Python
import logging
|
|
import random
|
|
import time
|
|
import socket
|
|
|
|
from coapthon import utils
|
|
from coapthon.messages.message import Message
|
|
from coapthon import defines
|
|
from coapthon.messages.request import Request
|
|
from coapthon.transaction import Transaction
|
|
|
|
__author__ = 'Giacomo Tanganelli'
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MessageLayer(object):
|
|
"""
|
|
Handles matching between messages (Message ID) and request/response (Token)
|
|
"""
|
|
def __init__(self, starting_mid):
|
|
"""
|
|
Set the layer internal structure.
|
|
|
|
:param starting_mid: the first mid used to send messages.
|
|
"""
|
|
self._transactions = {}
|
|
self._transactions_token = {}
|
|
if starting_mid is not None:
|
|
self._current_mid = starting_mid
|
|
else:
|
|
self._current_mid = random.randint(1, 1000)
|
|
|
|
def fetch_mid(self):
|
|
"""
|
|
Gets the next valid MID.
|
|
|
|
:return: the mid to use
|
|
"""
|
|
current_mid = self._current_mid
|
|
self._current_mid += 1
|
|
self._current_mid %= 65535
|
|
return current_mid
|
|
|
|
def purge(self):
|
|
for k in list(self._transactions.keys()):
|
|
now = time.time()
|
|
transaction = self._transactions[k]
|
|
if transaction.timestamp + defines.EXCHANGE_LIFETIME < now:
|
|
logger.debug("Delete transaction")
|
|
del self._transactions[k]
|
|
for k in list(self._transactions_token.keys()):
|
|
now = time.time()
|
|
transaction = self._transactions_token[k]
|
|
if transaction.timestamp + defines.EXCHANGE_LIFETIME < now:
|
|
logger.debug("Delete transaction")
|
|
del self._transactions_token[k]
|
|
|
|
def receive_request(self, request):
|
|
"""
|
|
Handle duplicates and store received messages.
|
|
|
|
:type request: Request
|
|
:param request: the incoming request
|
|
:rtype : Transaction
|
|
:return: the edited transaction
|
|
"""
|
|
logger.info("receive_request - " + str(request))
|
|
try:
|
|
host, port = request.source
|
|
except AttributeError:
|
|
return
|
|
key_mid = utils.str_append_hash(host, port, request.mid)
|
|
key_token = utils.str_append_hash(host, port, request.token)
|
|
|
|
if key_mid in list(self._transactions.keys()):
|
|
# Duplicated
|
|
self._transactions[key_mid].request.duplicated = True
|
|
transaction = self._transactions[key_mid]
|
|
else:
|
|
request.timestamp = time.time()
|
|
transaction = Transaction(request=request, timestamp=request.timestamp)
|
|
with transaction:
|
|
self._transactions[key_mid] = transaction
|
|
self._transactions_token[key_token] = transaction
|
|
return transaction
|
|
|
|
def receive_response(self, response):
|
|
"""
|
|
Pair responses with requests.
|
|
|
|
:type response: Response
|
|
:param response: the received response
|
|
:rtype : Transaction
|
|
:return: the transaction to which the response belongs to
|
|
"""
|
|
logger.info("receive_response - " + str(response))
|
|
try:
|
|
host, port = response.source
|
|
except AttributeError:
|
|
return
|
|
all_coap_nodes = defines.ALL_COAP_NODES_IPV6 if socket.getaddrinfo(host, None)[0][0] == socket.AF_INET6 else defines.ALL_COAP_NODES
|
|
key_mid = utils.str_append_hash(host, port, response.mid)
|
|
key_mid_multicast = utils.str_append_hash(all_coap_nodes, port, response.mid)
|
|
key_token = utils.str_append_hash(host, port, response.token)
|
|
key_token_multicast = utils.str_append_hash(all_coap_nodes, port, response.token)
|
|
if key_mid in list(self._transactions.keys()):
|
|
transaction = self._transactions[key_mid]
|
|
if response.token != transaction.request.token:
|
|
logger.warning("Tokens does not match - response message " + str(host) + ":" + str(port))
|
|
return None, False
|
|
elif key_token in self._transactions_token:
|
|
transaction = self._transactions_token[key_token]
|
|
elif key_mid_multicast in list(self._transactions.keys()):
|
|
transaction = self._transactions[key_mid_multicast]
|
|
elif key_token_multicast in self._transactions_token:
|
|
transaction = self._transactions_token[key_token_multicast]
|
|
if response.token != transaction.request.token:
|
|
logger.warning("Tokens does not match - response message " + str(host) + ":" + str(port))
|
|
return None, False
|
|
else:
|
|
logger.warning("Un-Matched incoming response message " + str(host) + ":" + str(port))
|
|
return None, False
|
|
send_ack = False
|
|
if response.type == defines.Types["CON"]:
|
|
send_ack = True
|
|
|
|
transaction.request.acknowledged = True
|
|
transaction.completed = True
|
|
transaction.response = response
|
|
if transaction.retransmit_stop is not None:
|
|
transaction.retransmit_stop.set()
|
|
return transaction, send_ack
|
|
|
|
def receive_empty(self, message):
|
|
"""
|
|
Pair ACKs with requests.
|
|
|
|
:type message: Message
|
|
:param message: the received message
|
|
:rtype : Transaction
|
|
:return: the transaction to which the message belongs to
|
|
"""
|
|
logger.info("receive_empty - " + str(message))
|
|
try:
|
|
host, port = message.source
|
|
except AttributeError:
|
|
return
|
|
all_coap_nodes = defines.ALL_COAP_NODES_IPV6 if socket.getaddrinfo(host, None)[0][0] == socket.AF_INET6 else defines.ALL_COAP_NODES
|
|
key_mid = utils.str_append_hash(host, port, message.mid)
|
|
key_mid_multicast = utils.str_append_hash(all_coap_nodes, port, message.mid)
|
|
key_token = utils.str_append_hash(host, port, message.token)
|
|
key_token_multicast = utils.str_append_hash(all_coap_nodes, port, message.token)
|
|
if key_mid in list(self._transactions.keys()):
|
|
transaction = self._transactions[key_mid]
|
|
elif key_token in self._transactions_token:
|
|
transaction = self._transactions_token[key_token]
|
|
elif key_mid_multicast in list(self._transactions.keys()):
|
|
transaction = self._transactions[key_mid_multicast]
|
|
elif key_token_multicast in self._transactions_token:
|
|
transaction = self._transactions_token[key_token_multicast]
|
|
else:
|
|
logger.warning("Un-Matched incoming empty message " + str(host) + ":" + str(port))
|
|
return None
|
|
|
|
if message.type == defines.Types["ACK"]:
|
|
if not transaction.request.acknowledged:
|
|
transaction.request.acknowledged = True
|
|
elif (transaction.response is not None) and (not transaction.response.acknowledged):
|
|
transaction.response.acknowledged = True
|
|
elif message.type == defines.Types["RST"]:
|
|
if not transaction.request.acknowledged:
|
|
transaction.request.rejected = True
|
|
elif not transaction.response.acknowledged:
|
|
transaction.response.rejected = True
|
|
elif message.type == defines.Types["CON"]:
|
|
#implicit ACK (might have been lost)
|
|
logger.debug("Implicit ACK on received CON for waiting transaction")
|
|
transaction.request.acknowledged = True
|
|
else:
|
|
logger.warning("Unhandled message type...")
|
|
|
|
if transaction.retransmit_stop is not None:
|
|
transaction.retransmit_stop.set()
|
|
|
|
return transaction
|
|
|
|
def send_request(self, request):
|
|
"""
|
|
Create the transaction and fill it with the outgoing request.
|
|
|
|
:type request: Request
|
|
:param request: the request to send
|
|
:rtype : Transaction
|
|
:return: the created transaction
|
|
"""
|
|
logger.info("send_request - " + str(request))
|
|
assert isinstance(request, Request)
|
|
try:
|
|
host, port = request.destination
|
|
except AttributeError:
|
|
return
|
|
|
|
request.timestamp = time.time()
|
|
transaction = Transaction(request=request, timestamp=request.timestamp)
|
|
if transaction.request.type is None:
|
|
transaction.request.type = defines.Types["CON"]
|
|
|
|
if transaction.request.mid is None:
|
|
transaction.request.mid = self.fetch_mid()
|
|
|
|
key_mid = utils.str_append_hash(host, port, request.mid)
|
|
self._transactions[key_mid] = transaction
|
|
|
|
key_token = utils.str_append_hash(host, port, request.token)
|
|
self._transactions_token[key_token] = transaction
|
|
|
|
return self._transactions[key_mid]
|
|
|
|
def send_response(self, transaction):
|
|
"""
|
|
Set the type, the token and eventually the MID for the outgoing response
|
|
|
|
:type transaction: Transaction
|
|
:param transaction: the transaction that owns the response
|
|
:rtype : Transaction
|
|
:return: the edited transaction
|
|
"""
|
|
logger.info("send_response - " + str(transaction.response))
|
|
if transaction.response.type is None:
|
|
if transaction.request.type == defines.Types["CON"] and not transaction.request.acknowledged:
|
|
transaction.response.type = defines.Types["ACK"]
|
|
transaction.response.mid = transaction.request.mid
|
|
transaction.response.acknowledged = True
|
|
transaction.completed = True
|
|
elif transaction.request.type == defines.Types["NON"]:
|
|
transaction.response.type = defines.Types["NON"]
|
|
else:
|
|
transaction.response.type = defines.Types["CON"]
|
|
transaction.response.token = transaction.request.token
|
|
|
|
if transaction.response.mid is None:
|
|
transaction.response.mid = self.fetch_mid()
|
|
try:
|
|
host, port = transaction.response.destination
|
|
except AttributeError:
|
|
return
|
|
key_mid = utils.str_append_hash(host, port, transaction.response.mid)
|
|
self._transactions[key_mid] = transaction
|
|
|
|
transaction.request.acknowledged = True
|
|
return transaction
|
|
|
|
def send_empty(self, transaction, related, message):
|
|
"""
|
|
Manage ACK or RST related to a transaction. Sets if the transaction has been acknowledged or rejected.
|
|
|
|
:param transaction: the transaction
|
|
:param related: if the ACK/RST message is related to the request or the response. Must be equal to
|
|
transaction.request or to transaction.response or None
|
|
:type message: Message
|
|
:param message: the ACK or RST message to send
|
|
"""
|
|
logger.info("send_empty - " + str(message))
|
|
if transaction is None:
|
|
try:
|
|
host, port = message.destination
|
|
except AttributeError:
|
|
return
|
|
key_mid = utils.str_append_hash(host, port, message.mid)
|
|
key_token = utils.str_append_hash(host, port, message.token)
|
|
if key_mid in self._transactions:
|
|
transaction = self._transactions[key_mid]
|
|
related = transaction.response
|
|
elif key_token in self._transactions_token:
|
|
transaction = self._transactions_token[key_token]
|
|
related = transaction.response
|
|
else:
|
|
return message
|
|
|
|
if message.type == defines.Types["ACK"]:
|
|
if transaction.request == related:
|
|
transaction.request.acknowledged = True
|
|
transaction.completed = True
|
|
message.mid = transaction.request.mid
|
|
message.code = 0
|
|
message.destination = transaction.request.source
|
|
elif transaction.response == related:
|
|
transaction.response.acknowledged = True
|
|
transaction.completed = True
|
|
message.mid = transaction.response.mid
|
|
message.code = 0
|
|
message.token = transaction.response.token
|
|
message.destination = transaction.response.source
|
|
|
|
elif message.type == defines.Types["RST"]:
|
|
if transaction.request == related:
|
|
transaction.request.rejected = True
|
|
message._mid = transaction.request.mid
|
|
if message.mid is None:
|
|
message.mid = self.fetch_mid()
|
|
message.code = 0
|
|
message.token = transaction.request.token
|
|
message.destination = transaction.request.source
|
|
elif transaction.response == related:
|
|
transaction.response.rejected = True
|
|
transaction.completed = True
|
|
message._mid = transaction.response.mid
|
|
if message.mid is None:
|
|
message.mid = self.fetch_mid()
|
|
message.code = 0
|
|
message.token = transaction.response.token
|
|
message.destination = transaction.response.source
|
|
return message
|