You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
CoAP/coapthon/utils.py

207 lines
4.8 KiB
Python

# -*- coding: utf-8 -*-
import binascii
import random
import string
__author__ = 'Giacomo Tanganelli'
def str_append_hash(*args):
""" Convert each argument to a lower case string, appended, then hash """
ret_hash = ""
for i in args:
if isinstance(i, (str, int)):
ret_hash += str(i).lower()
elif isinstance(i, bytes):
ret_hash += binascii.hexlify(i).decode("utf-8")
return hash(ret_hash)
def check_nocachekey(option):
"""
checks if an option is a NoCacheKey option or Etag
:param option:
:return:
"""
return ((option.number & 0x1E) == 0x1C) | (option.number == 4)
def check_code(code):
"""
checks if the response code is one of the valid ones defined in the rfc
:param code:
:return:
"""
if (65 <= code <= 69) or (128 <= code <= 134) or (code == 140) or (code == 141) or (code == 143) or (
160 <= code <= 165):
return
else:
raise InvalidResponseCode
"""
exception used to signal an invalid response code
"""
class InvalidResponseCode:
def __init__(self, code):
self.inv_code = code
def is_uri_option(number):
"""
checks if the option is part of uri-path, uri-host, uri-port, uri-query
:param number:
:return:
"""
if number == 3 | number == 7 | number == 11 | number == 15:
return True
return False
def generate_random_token(size):
return bytes([random.randint(0, 255) for _ in range(size)])
def parse_blockwise(value):
"""
Parse Blockwise option.
:param value: option value
:return: num, m, size
"""
length = byte_len(value)
if length == 1:
num = value & 0xF0
num >>= 4
m = value & 0x08
m >>= 3
size = value & 0x07
elif length == 2:
num = value & 0xFFF0
num >>= 4
m = value & 0x0008
m >>= 3
size = value & 0x0007
else:
num = value & 0xFFFFF0
num >>= 4
m = value & 0x000008
m >>= 3
size = value & 0x000007
return num, int(m), pow(2, (size + 4))
def byte_len(int_type):
"""
Get the number of byte needed to encode the int passed.
:param int_type: the int to be converted
:return: the number of bits needed to encode the int passed.
"""
length = 0
while int_type:
int_type >>= 1
length += 1
if length > 0:
if length % 8 != 0:
length = int(length / 8) + 1
else:
length = int(length / 8)
return length
def parse_uri(uri):
t = uri.split("://")
tmp = t[1]
t = tmp.split("/", 1)
tmp = t[0]
path = t[1]
if tmp.startswith("["):
t = tmp.split("]")
host = t[0][1:]
port = int(t[1][1:])
else:
t = tmp.split(":", 1)
try:
host = t[0]
port = int(t[1])
except IndexError:
host = tmp
port = 5683
return str(host), port, path
def create_logging(): # pragma: no cover
with open("logging.conf", "w") as f:
f.writelines("[loggers]\n")
f.writelines("keys=root\n\n")
f.writelines("[handlers]\n")
f.writelines("keys=consoleHandler\n\n")
f.writelines("[formatters]\n")
f.writelines("keys=simpleFormatter\n\n")
f.writelines("[logger_root]\n")
f.writelines("level=DEBUG\n")
f.writelines("handlers=consoleHandler\n\n")
f.writelines("[handler_consoleHandler]\n")
f.writelines("class=StreamHandler\n")
f.writelines("level=DEBUG\n")
f.writelines("formatter=simpleFormatter\n")
f.writelines("args=(sys.stdout,)\n\n")
f.writelines("[formatter_simpleFormatter]\n")
f.writelines("format=%(asctime)s - %(threadName)-10s - %(name)s - %(levelname)s - %(message)s\n")
f.writelines("datefmt=")
class Tree(object):
def __init__(self):
self.tree = {}
def dump(self):
"""
Get all the paths registered in the server.
:return: registered resources.
"""
return sorted(list(self.tree.keys()))
def with_prefix(self, path):
ret = []
for key in list(self.tree.keys()):
if path.startswith(key):
ret.append(key)
if len(ret) > 0:
return ret
raise KeyError
def with_prefix_resource(self, path):
ret = []
for key, value in self.tree.items():
if path.startswith(key):
ret.append(value)
if len(ret) > 0:
return ret
raise KeyError
def __getitem__(self, item):
return self.tree[item]
def __setitem__(self, key, value):
self.tree[key] = value
def __delitem__(self, key):
del self.tree[key]
def __contains__(self, item):
return item in self.tree