Merge pull request #10 from michieldwitte/master
Added purging transactions as client, don't add transaction when no_response is True
This commit is contained in:
commit
7816d86972
4 changed files with 30 additions and 6 deletions
|
@ -65,6 +65,13 @@ class CoAP(object):
|
||||||
|
|
||||||
self._receiver_thread = None
|
self._receiver_thread = None
|
||||||
|
|
||||||
|
def purge_transactions(self, timeout_time=defines.EXCHANGE_LIFETIME):
|
||||||
|
"""
|
||||||
|
Clean old transactions
|
||||||
|
|
||||||
|
"""
|
||||||
|
self._messageLayer.purge(timeout_time)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
"""
|
"""
|
||||||
Stop the client.
|
Stop the client.
|
||||||
|
@ -96,16 +103,21 @@ class CoAP(object):
|
||||||
assert isinstance(c, int)
|
assert isinstance(c, int)
|
||||||
self._currentMID = c
|
self._currentMID = c
|
||||||
|
|
||||||
def send_message(self, message):
|
def send_message(self, message, no_response=False):
|
||||||
"""
|
"""
|
||||||
Prepare a message to send on the UDP socket. Eventually set retransmissions.
|
Prepare a message to send on the UDP socket. Eventually set retransmissions.
|
||||||
|
|
||||||
:param message: the message to send
|
:param message: the message to send
|
||||||
|
:param no_response: whether to await a response from the request
|
||||||
"""
|
"""
|
||||||
if isinstance(message, Request):
|
if isinstance(message, Request):
|
||||||
request = self._requestLayer.send_request(message)
|
request = self._requestLayer.send_request(message)
|
||||||
request = self._observeLayer.send_request(request)
|
request = self._observeLayer.send_request(request)
|
||||||
request = self._blockLayer.send_request(request)
|
request = self._blockLayer.send_request(request)
|
||||||
|
if no_response:
|
||||||
|
# don't add the send message to the message layer transactions
|
||||||
|
self.send_datagram(request)
|
||||||
|
return
|
||||||
transaction = self._messageLayer.send_request(request)
|
transaction = self._messageLayer.send_request(request)
|
||||||
self.send_datagram(transaction.request)
|
self.send_datagram(transaction.request)
|
||||||
if transaction.request.type == defines.Types["CON"]:
|
if transaction.request.type == defines.Types["CON"]:
|
||||||
|
|
|
@ -223,17 +223,26 @@ class HelperClient(object):
|
||||||
:param request: the request to send
|
:param request: the request to send
|
||||||
:param callback: the callback function to invoke upon response
|
:param callback: the callback function to invoke upon response
|
||||||
:param timeout: the timeout of the request
|
:param timeout: the timeout of the request
|
||||||
|
:param no_response: whether to await a response from the request
|
||||||
:return: the response
|
:return: the response
|
||||||
"""
|
"""
|
||||||
if callback is not None:
|
if callback is not None:
|
||||||
thread = threading.Thread(target=self._thread_body, args=(request, callback))
|
thread = threading.Thread(target=self._thread_body, args=(request, callback))
|
||||||
thread.start()
|
thread.start()
|
||||||
else:
|
else:
|
||||||
self.protocol.send_message(request)
|
self.protocol.send_message(request, no_response=no_response)
|
||||||
if no_response:
|
if no_response:
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
response = self.queue.get(block=True, timeout=timeout)
|
while True:
|
||||||
|
response = self.queue.get(block=True, timeout=timeout)
|
||||||
|
if response is not None:
|
||||||
|
if response.mid == request.mid:
|
||||||
|
return response
|
||||||
|
if response.type == defines.Types["NON"]:
|
||||||
|
return response
|
||||||
|
else:
|
||||||
|
return response
|
||||||
except Empty:
|
except Empty:
|
||||||
#if timeout is set
|
#if timeout is set
|
||||||
response = None
|
response = None
|
||||||
|
|
|
@ -42,17 +42,17 @@ class MessageLayer(object):
|
||||||
self._current_mid %= 65535
|
self._current_mid %= 65535
|
||||||
return current_mid
|
return current_mid
|
||||||
|
|
||||||
def purge(self):
|
def purge(self, timeout_time=defines.EXCHANGE_LIFETIME):
|
||||||
for k in list(self._transactions.keys()):
|
for k in list(self._transactions.keys()):
|
||||||
now = time.time()
|
now = time.time()
|
||||||
transaction = self._transactions[k]
|
transaction = self._transactions[k]
|
||||||
if transaction.timestamp + defines.EXCHANGE_LIFETIME < now:
|
if transaction.timestamp + timeout_time < now:
|
||||||
logger.debug("Delete transaction")
|
logger.debug("Delete transaction")
|
||||||
del self._transactions[k]
|
del self._transactions[k]
|
||||||
for k in list(self._transactions_token.keys()):
|
for k in list(self._transactions_token.keys()):
|
||||||
now = time.time()
|
now = time.time()
|
||||||
transaction = self._transactions_token[k]
|
transaction = self._transactions_token[k]
|
||||||
if transaction.timestamp + defines.EXCHANGE_LIFETIME < now:
|
if transaction.timestamp + timeout_time < now:
|
||||||
logger.debug("Delete transaction")
|
logger.debug("Delete transaction")
|
||||||
del self._transactions_token[k]
|
del self._transactions_token[k]
|
||||||
|
|
||||||
|
|
|
@ -129,6 +129,9 @@ class Serializer(object):
|
||||||
return defines.Codes.BAD_REQUEST.number
|
return defines.Codes.BAD_REQUEST.number
|
||||||
except struct.error:
|
except struct.error:
|
||||||
return defines.Codes.BAD_REQUEST.number
|
return defines.Codes.BAD_REQUEST.number
|
||||||
|
except UnicodeDecodeError as e:
|
||||||
|
logger.debug(e)
|
||||||
|
return defines.Codes.BAD_REQUEST.number
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def serialize(message):
|
def serialize(message):
|
||||||
|
|
Loading…
Reference in a new issue