You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
CoAP/coverage_test_proxy.py

1422 lines
54 KiB
Python

# -*- coding: utf-8 -*-
from queue import Queue
import random
import socket
import threading
import unittest
from coapclient import HelperClient
from coapforwardproxy import CoAPForwardProxy
from coapserver import CoAPServer
from coapthon import defines
from coapthon.messages.option import Option
from coapthon.messages.request import Request
from coapthon.messages.response import Response
from coapthon.serializer import Serializer
__author__ = 'Giacomo Tanganelli'
__version__ = "2.0"
class Tests(unittest.TestCase):
def setUp(self):
self.server_address = ("127.0.0.1", 5683)
self.current_mid = random.randint(1, 1000)
self.server_mid = random.randint(1000, 2000)
self.server = CoAPServer("127.0.0.1", 5684)
self.server_thread = threading.Thread(target=self.server.listen, args=(1,))
self.server_thread.start()
self.proxy = CoAPForwardProxy("127.0.0.1", 5683)
self.proxy_thread = threading.Thread(target=self.proxy.listen, args=(1,))
self.proxy_thread.start()
self.queue = Queue()
def tearDown(self):
self.proxy.close()
self.proxy_thread.join(timeout=25)
self.proxy = None
self.server.close()
self.server_thread.join(timeout=25)
self.server = None
def _test_with_client(self, message_list): # pragma: no cover
client = HelperClient(self.server_address)
for message, expected in message_list:
if message is not None:
received_message = client.send_request(message)
if expected is not None:
if expected.type is not None:
self.assertEqual(received_message.type, expected.type)
if expected.mid is not None:
self.assertEqual(received_message.mid, expected.mid)
self.assertEqual(received_message.code, expected.code)
if expected.source is not None:
self.assertEqual(received_message.source, self.server_address)
if expected.token is not None:
self.assertEqual(received_message.token, expected.token)
if expected.payload is not None:
self.assertEqual(received_message.payload, expected.payload)
if expected.options:
self.assertEqual(len(received_message.options), len(expected.options))
for o in expected.options:
assert isinstance(o, Option)
option_value = getattr(expected, o.name.lower().replace("-", "_"))
option_value_rec = getattr(received_message, o.name.lower().replace("-", "_"))
self.assertEqual(option_value, option_value_rec)
client.stop()
def _test_with_client_observe(self, message_list): # pragma: no cover
client = HelperClient(self.server_address)
for message, expected in message_list:
if message is not None:
client.send_request(message, self.client_callback)
if expected is not None:
received_message = self.queue.get()
if expected.type is not None:
self.assertEqual(received_message.type, expected.type)
if expected.mid is not None:
self.assertEqual(received_message.mid, expected.mid)
self.assertEqual(received_message.code, expected.code)
if expected.source is not None:
self.assertEqual(received_message.source, self.server_address)
if expected.token is not None:
self.assertEqual(received_message.token, expected.token)
if expected.payload is not None:
self.assertEqual(received_message.payload, expected.payload)
if expected.options:
self.assertEqual(len(received_message.options), len(expected.options))
for o in expected.options:
assert isinstance(o, Option)
option_value = getattr(expected, o.name.lower().replace("-", "_"))
option_value_rec = getattr(received_message, o.name.lower().replace("-", "_"))
self.assertEqual(option_value, option_value_rec)
client.stop()
def client_callback(self, response):
print("Callback")
self.queue.put(response)
def _test_plugtest(self, message_list): # pragma: no cover
serializer = Serializer()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for message, expected in message_list:
if message is not None:
datagram = serializer.serialize(message)
sock.sendto(datagram, message.destination)
if expected is not None:
datagram, source = sock.recvfrom(4096)
received_message = serializer.deserialize(datagram, source)
print(received_message.pretty_print())
print(expected.pretty_print())
if expected.type is not None:
self.assertEqual(received_message.type, expected.type)
if expected.mid is not None:
self.assertEqual(received_message.mid, expected.mid)
self.assertEqual(received_message.code, expected.code)
if expected.source is not None:
self.assertEqual(received_message.source, source)
if expected.token is not None:
self.assertEqual(received_message.token, expected.token)
if expected.payload is not None:
self.assertEqual(received_message.payload, expected.payload)
if expected.options is not None:
self.assertEqual(received_message.options, expected.options)
for o in expected.options:
assert isinstance(o, Option)
option_value = getattr(expected, o.name.lower().replace("-", "_"))
option_value_rec = getattr(received_message, o.name.lower().replace("-", "_"))
self.assertEqual(option_value, option_value_rec)
sock.close()
def _test_datagram(self, message_list): # pragma: no cover
serializer = Serializer()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for message, expected in message_list:
if message is not None:
datagram, destination = message
sock.sendto(datagram, destination)
if expected is not None:
datagram, source = sock.recvfrom(4096)
received_message = serializer.deserialize(datagram, source)
if expected.type is not None:
self.assertEqual(received_message.type, expected.type)
if expected.mid is not None:
self.assertEqual(received_message.mid, expected.mid)
self.assertEqual(received_message.code, expected.code)
if expected.source is not None:
self.assertEqual(received_message.source, source)
if expected.token is not None:
self.assertEqual(received_message.token, expected.token)
if expected.payload is not None:
self.assertEqual(received_message.payload, expected.payload)
if expected.options is not None:
self.assertEqual(received_message.options, expected.options)
for o in expected.options:
assert isinstance(o, Option)
option_value = getattr(expected, o.name.lower().replace("-", "_"))
option_value_rec = getattr(received_message, o.name.lower().replace("-", "_"))
self.assertEqual(option_value, option_value_rec)
sock.close()
def test_get_forward(self):
print("TEST_GET_FORWARD")
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/basic"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = "Basic Resource"
exchange1 = (req, expected)
self.current_mid += 1
self._test_with_client([exchange1])
def test_separate(self):
print("TEST_SEPARATE")
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/separate"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
expected = Response()
expected.type = defines.Types["CON"]
expected._mid = None
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.max_age = 60
exchange1 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/separate"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "POST"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CHANGED.number
expected.token = None
expected.options = None
exchange2 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.PUT.number
req.proxy_uri = "coap://127.0.0.1:5684/separate"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "PUT"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CHANGED.number
expected.token = None
expected.options = None
exchange3 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.DELETE.number
req.proxy_uri = "coap://127.0.0.1:5684/separate"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.DELETED.number
expected.token = None
exchange4 = (req, expected)
self.current_mid += 1
self._test_with_client([exchange1, exchange2, exchange3, exchange4])
def test_post(self):
print("TEST_POST")
req = Request()
req.code = defines.Codes.POST.number
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "test"
req.add_if_none_match()
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res?id=1"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CREATED.number
expected.token = None
expected.payload = None
expected.location_path = "storage/new_res"
expected.location_query = "id=1"
exchange1 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.if_match = ["test", "not"]
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = "test"
exchange2 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.PUT.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.if_match = ["not"]
req.payload = "not"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.PRECONDITION_FAILED.number
expected.token = None
exchange3 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.if_match = ["not"]
req.payload = "not"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.PRECONDITION_FAILED.number
expected.token = None
exchange4 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.PUT.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req._mid = self.current_mid
req.destination = self.server_address
req.add_if_none_match()
req.payload = "not"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.PRECONDITION_FAILED.number
expected.token = None
exchange5 = (req, expected)
self.current_mid += 1
self._test_with_client([exchange1, exchange2, exchange3, exchange4, exchange5])
def test_post_block(self):
print("TEST_POST_BLOCK")
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras sollicitudin fermentum ornare. " \
"Cras accumsan tellus quis dui lacinia eleifend. Proin ultrices rutrum orci vitae luctus. " \
"Nullam malesuada pretium elit, at aliquam odio vehicula in. Etiam nec maximus elit. " \
"Etiam at erat ac ex ornare feugiat. Curabitur sed malesuada orci, id aliquet nunc. Phasellus " \
"nec leo luctus, blandit lorem sit amet, interdum metus. Duis efficitur volutpat magna, ac " \
"ultricies nibh aliquet sit amet. Etiam tempor egestas augue in hendrerit. Nunc eget augue " \
"ultricies, dignissim lacus et, vulputate dolor. Nulla eros odio, fringilla vel massa ut, " \
"facilisis cursus quam. Fusce faucibus lobortis congue. Fusce consectetur porta neque, id " \
"sollicitudin velit maximus eu. Sed pharetra leo quam, vel finibus turpis cursus ac. " \
"Aenean ac nisi massa. Cras commodo arcu nec ante tristique ullamcorper. Quisque eu hendrerit" \
" urna. Cras fringilla eros ut nunc maximus, non porta nisl mollis. Aliquam in rutrum massa." \
" Praesent tristique turpis dui, at ultri"
req.block1 = (1, 1, 1024)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.REQUEST_ENTITY_INCOMPLETE.number
expected.token = None
expected.payload = None
exchange1 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras sollicitudin fermentum ornare. " \
"Cras accumsan tellus quis dui lacinia eleifend. Proin ultrices rutrum orci vitae luctus. " \
"Nullam malesuada pretium elit, at aliquam odio vehicula in. Etiam nec maximus elit. " \
"Etiam at erat ac ex ornare feugiat. Curabitur sed malesuada orci, id aliquet nunc. Phasellus " \
"nec leo luctus, blandit lorem sit amet, interdum metus. Duis efficitur volutpat magna, ac " \
"ultricies nibh aliquet sit amet. Etiam tempor egestas augue in hendrerit. Nunc eget augue " \
"ultricies, dignissim lacus et, vulputate dolor. Nulla eros odio, fringilla vel massa ut, " \
"facilisis cursus quam. Fusce faucibus lobortis congue. Fusce consectetur porta neque, id " \
"sollicitudin velit maximus eu. Sed pharetra leo quam, vel finibus turpis cursus ac. " \
"Aenean ac nisi massa. Cras commodo arcu nec ante tristique ullamcorper. Quisque eu hendrerit" \
" urna. Cras fringilla eros ut nunc maximus, non porta nisl mollis. Aliquam in rutrum massa." \
" Praesent tristique turpis dui, at ultri"
req.block1 = (0, 1, 1024)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTINUE.number
expected.token = None
expected.payload = None
expected.block1 = (0, 1, 1024)
exchange2 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "a imperdiet nisl. Quisque a iaculis libero, id tempus lacus. Aenean convallis est non justo " \
"consectetur, a hendrerit enim consequat. In accumsan ante a egestas luctus. Etiam quis neque " \
"nec eros vestibulum faucibus. Nunc viverra ipsum lectus, vel scelerisque dui dictum a. Ut orci " \
"enim, ultrices a ultrices nec, pharetra in quam. Donec accumsan sit amet eros eget fermentum."
req.block1 = (1, 1, 64)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTINUE.number
expected.token = None
expected.payload = None
expected.block1 = (1, 1, 64)
exchange3 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "a imperdiet nisl. Quisque a iaculis libero, id tempus lacus. Aenean convallis est non justo " \
"consectetur, a hendrerit enim consequat. In accumsan ante a egestas luctus. Etiam quis neque " \
"nec eros vestibulum faucibus. Nunc viverra ipsum lectus, vel scelerisque dui dictum a. Ut orci " \
"enim, ultrices a ultrices nec, pharetra in quam. Donec accumsan sit amet eros eget fermentum."
req.block1 = (3, 1, 64)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.REQUEST_ENTITY_INCOMPLETE.number
expected.token = None
expected.payload = None
exchange4 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "a imperdiet nisl. Quisque a iaculis libero, id tempus lacus. Aenean convallis est non justo " \
"consectetur, a hendrerit enim consequat. In accumsan ante a egestas luctus. Etiam quis neque " \
"nec eros vestibulum faucibus. Nunc viverra ipsum lectus, vel scelerisque dui dictum a. Ut orci " \
"enim, ultrices a ultrices nec, pharetra in quam. Donec accumsan sit amet eros eget fermentum."
req.block1 = (2, 0, 64)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CREATED.number
expected.token = None
expected.payload = None
expected.location_path = "storage/new_res"
exchange5 = (req, expected)
self.current_mid += 1
self._test_plugtest([exchange1, exchange2, exchange3, exchange4, exchange5])
def test_get_block(self):
print("TEST_GET_BLOCK")
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = None
req.block2 = (0, 0, 512)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = None
expected.block2 = (0, 1, 512)
expected.size2 = 2041
exchange1 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = None
req.block2 = (1, 0, 256)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = None
expected.block2 = (1, 1, 256)
expected.size2 = 2041
exchange2 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = None
req.block2 = (2, 0, 128)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = None
expected.block2 = (2, 1, 128)
expected.size2 = 2041
exchange3 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = None
req.block2 = (3, 0, 64)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = None
expected.block2 = (3, 1, 64)
expected.size2 = 2041
exchange4 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = None
req.block2 = (4, 0, 32)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = None
expected.block2 = (4, 1, 32)
expected.size2 = 2041
exchange5 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = None
req.block2 = (5, 0, 16)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = None
expected.block2 = (5, 1, 16)
expected.size2 = 2041
exchange6 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = None
req.block2 = (6, 0, 1024)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = None
expected.block2 = (6, 0, 1024)
expected.size2 = 2041
exchange7 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = None
req.block2 = (7, 0, 1024)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = None
expected.block2 = (7, 0, 1024)
expected.size2 = 2041
exchange8 = (req, expected)
self.current_mid += 1
self._test_plugtest([exchange1, exchange2, exchange3, exchange4, exchange5, exchange6, exchange7, exchange8])
def test_post_block_big(self):
print("TEST_POST_BLOCK_BIG")
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "Lorem ipsum dolo"
req.block1 = (0, 1, 16)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTINUE.number
expected.token = None
expected.payload = None
expected.block1 = (0, 1, 16)
exchange1 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "r sit amet, consectetur adipisci"
req.block1 = (1, 1, 32)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTINUE.number
expected.token = None
expected.payload = None
expected.block1 = (1, 1, 32)
exchange2 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "ng elit. Sed ut ultrices ligula. Pellentesque purus augue, cursu"
req.block1 = (2, 1, 64)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTINUE.number
expected.token = None
expected.payload = None
expected.block1 = (2, 1, 64)
exchange3 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "s ultricies est in, vehicula congue metus. Vestibulum vel justo lacinia, porttitor quam vitae, " \
"feugiat sapien. Quisque finibus, "
req.block1 = (3, 1, 128)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTINUE.number
expected.token = None
expected.payload = None
expected.block1 = (3, 1, 128)
exchange4 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "nisi vitae rhoncus malesuada, augue mauris dapibus tellus, sit amet venenatis libero" \
" libero sed lorem. In pharetra turpis sed eros porta mollis. Quisque dictum dolor nisl," \
" imperdiet tincidunt augue malesuada vitae. Donec non felis urna. Suspendisse at hend"
req.block1 = (4, 1, 256)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTINUE.number
expected.token = None
expected.payload = None
expected.block1 = (4, 1, 256)
exchange5 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "rerit ex, quis aliquet ante. Vivamus ultrices dolor at elit tincidunt, eget fringilla " \
"ligula vestibulum. In molestie sagittis nibh, ut efficitur tellus faucibus non. Maecenas " \
"posuere elementum faucibus. Morbi nisi diam, molestie non feugiat et, elementum eget magna." \
" Donec vel sem facilisis quam viverra ultrices nec eu lacus. Sed molestie nisi id ultrices " \
"interdum. Curabitur pharetra sed tellus in dignissim. Duis placerat aliquam metus, volutpat " \
"elementum augue aliquam a. Nunc sed dolor at orci maximus portt"
req.block1 = (5, 1, 512)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTINUE.number
expected.token = None
expected.payload = None
expected.block1 = (5, 1, 512)
exchange6 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "itor ac sit amet eros. Mauris et nisi in tortor pharetra rhoncus sit amet hendrerit metus. " \
"Integer laoreet placerat cursus. Nam a nulla ex. Donec laoreet sagittis libero quis " \
"imperdiet. Vivamus facilisis turpis nec rhoncus venenatis. Duis pulvinar tellus vel quam " \
"maximus imperdiet. Mauris eget nibh orci. Duis ut cursus nibh. Nulla sed commodo elit. " \
"Suspendisse ac eros lacinia, mattis turpis at, porttitor justo. Vivamus molestie " \
"tincidunt libero. Etiam porttitor lacus odio, at lobortis tortor scelerisque nec. " \
"Nullam non ante vel nisi ultrices consectetur. Maecenas massa felis, tempor eget " \
"malesuada eget, pretium eu sapien. Vivamus dapibus ante erat, non faucibus orci sodales " \
"sit amet. Cras magna felis, sodales eget magna sed, eleifend rutrum ligula. Vivamus interdum " \
"enim enim, eu facilisis tortor dignissim quis. Ut metus nulla, mattis non lorem et, " \
"elementum ultrices orci. Quisque eleifend, arcu vitae ullamcorper pulvinar, ipsum ex " \
"sodales arcu, eget consectetur mauris metus ac tortor. Donec id sem felis. Maur"
req.block1 = (6, 0, 1024)
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CHANGED.number
expected.token = None
expected.payload = None
exchange7 = (req, expected)
self.current_mid += 1
self._test_plugtest([exchange1, exchange2, exchange3, exchange4, exchange5, exchange6, exchange7])
def test_options(self):
print("TEST_OPTIONS")
path = "/storage/new_res"
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
option = Option()
option.number = defines.OptionRegistry.ETAG.number
option.value = "test"
req.add_option(option)
req.del_option(option)
req.payload = "test"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CREATED.number
expected.token = None
expected.payload = None
expected.location_path = "storage/new_res"
exchange1 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
option = Option()
option.number = defines.OptionRegistry.ETAG.number
option.value = "test"
req.add_option(option)
req.del_option_by_name("ETag")
req.payload = "test"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CREATED.number
expected.token = None
expected.payload = None
expected.location_path = "storage/new_res"
exchange2 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
option = Option()
option.number = defines.OptionRegistry.ETAG.number
option.value = "test"
req.add_option(option)
del req.etag
req.payload = "test"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CREATED.number
expected.token = None
expected.payload = None
expected.location_path = "storage/new_res"
exchange3 = (req, expected)
self.current_mid += 1
self._test_with_client([exchange1, exchange2, exchange3])
def test_content_type(self):
print("TEST_CONTENT_TYPE")
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "<value>test</value>"
req.content_type = defines.Content_types["application/xml"]
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CREATED.number
expected.token = None
expected.payload = None
expected.location_path = "storage/new_res"
exchange1 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = "Basic Resource"
exchange2 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.PUT.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "test"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CHANGED.number
expected.token = None
expected.payload = None
exchange3 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = "test"
exchange4 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.accept = defines.Content_types["application/xml"]
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = "<value>test</value>"
exchange5 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/storage/new_res"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.accept = defines.Content_types["application/json"]
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.NOT_ACCEPTABLE.number
expected.token = None
expected.payload = None
# expected.content_type = defines.Content_types["application/json"]
exchange6 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/xml"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = "<value>0</value>"
expected.content_type = defines.Content_types["application/xml"]
print(expected.pretty_print())
exchange7 = (req, expected)
self.current_mid += 1
self._test_with_client([exchange1, exchange2, exchange3, exchange4, exchange5, exchange6, exchange7])
def test_ETAG(self):
print("TEST_ETAG")
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/etag"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = "ETag resource"
expected.etag = "0"
exchange1 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/etag"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "test"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CHANGED.number
expected.token = None
expected.payload = None
expected.etag = "1"
exchange2 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/etag"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.etag = "1"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.VALID.number
expected.token = None
expected.payload = None
expected.etag = "1"
exchange3 = (req, expected)
self.current_mid += 1
self._test_with_client([exchange1, exchange2, exchange3])
def test_child(self):
print("TEST_CHILD")
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/child"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "test"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CREATED.number
expected.token = None
expected.payload = None
expected.location_path = "child"
exchange1 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/child"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = "test"
exchange2 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.PUT.number
req.proxy_uri = "coap://127.0.0.1:5684/child"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "testPUT"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CHANGED.number
expected.token = None
expected.payload = None
exchange3 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.DELETE.number
req.proxy_uri = "coap://127.0.0.1:5684/child"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.DELETED.number
expected.token = None
expected.payload = None
exchange4 = (req, expected)
self.current_mid += 1
self._test_with_client([exchange1, exchange2, exchange3, exchange4])
def test_not_found(self):
print("TEST_not_found")
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/not_found"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.token = 100
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.NOT_FOUND.number
expected.token = 100
expected.payload = None
exchange1 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/not_found"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "testPOST"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.METHOD_NOT_ALLOWED.number
expected.token = None
exchange2 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.PUT.number
req.proxy_uri = "coap://127.0.0.1:5684/not_found"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "testPUT"
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.NOT_FOUND.number
expected.token = None
expected.payload = None
exchange3 = (req, expected)
self.current_mid += 1
req = Request()
req.code = defines.Codes.DELETE.number
req.proxy_uri = "coap://127.0.0.1:5684/not_found"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.NOT_FOUND.number
expected.token = None
expected.payload = None
exchange4 = (req, expected)
self.current_mid += 1
self._test_with_client([exchange1, exchange2, exchange3, exchange4])
def test_invalid(self):
print("TEST_INVALID")
# version
req = (b'\x00\x01\x8c\xda', self.server_address)
expected = Response()
expected.type = defines.Types["RST"]
expected._mid = None
expected.code = defines.Codes.BAD_REQUEST.number
exchange1 = (req, expected)
# version
req = (b'\x40', self.server_address)
expected = Response()
expected.type = defines.Types["RST"]
expected._mid = None
expected.code = defines.Codes.BAD_REQUEST.number
exchange2 = (req, expected)
# code
req = (b'\x40\x05\x8c\xda', self.server_address)
expected = Response()
expected.type = defines.Types["RST"]
expected._mid = None
expected.code = defines.Codes.BAD_REQUEST.number
exchange3 = (req, expected)
# option
req = (b'\x40\x01\x8c\xda\x94', self.server_address)
expected = Response()
expected.type = defines.Types["RST"]
expected._mid = None
expected.code = defines.Codes.BAD_REQUEST.number
exchange4 = (req, expected)
# payload marker
req = (b'\x40\x02\x8c\xda\x75\x62\x61\x73\x69\x63\xff', self.server_address)
expected = Response()
expected.type = defines.Types["RST"]
expected._mid = None
expected.code = defines.Codes.BAD_REQUEST.number
exchange5 = (req, expected)
self._test_datagram([exchange1, exchange2, exchange3, exchange4, exchange5])
def test_post_block_big_client(self):
print("TEST_POST_BLOCK_BIG_CLIENT")
req = Request()
req.code = defines.Codes.POST.number
req.proxy_uri = "coap://127.0.0.1:5684/big"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.payload = "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Cras sollicitudin fermentum ornare. " \
"Cras accumsan tellus quis dui lacinia eleifend. Proin ultrices rutrum orci vitae luctus. " \
"Nullam malesuada pretium elit, at aliquam odio vehicula in. Etiam nec maximus elit. " \
"Etiam at erat ac ex ornare feugiat. Curabitur sed malesuada orci, id aliquet nunc. Phasellus " \
"nec leo luctus, blandit lorem sit amet, interdum metus. Duis efficitur volutpat magna, ac " \
"ultricies nibh aliquet sit amet. Etiam tempor egestas augue in hendrerit. Nunc eget augue " \
"ultricies, dignissim lacus et, vulputate dolor. Nulla eros odio, fringilla vel massa ut, " \
"facilisis cursus quam. Fusce faucibus lobortis congue. Fusce consectetur porta neque, id " \
"sollicitudin velit maximus eu. Sed pharetra leo quam, vel finibus turpis cursus ac. " \
"Aenean ac nisi massa. Cras commodo arcu nec ante tristique ullamcorper. Quisque eu hendrerit" \
" urna. Cras fringilla eros ut nunc maximus, non porta nisl mollis. Aliquam in rutrum massa." \
" Praesent tristique turpis dui, at ultricies lorem fermentum at. Vivamus sit amet ornare neque, " \
"a imperdiet nisl. Quisque a iaculis libero, id tempus lacus. Aenean convallis est non justo " \
"consectetur, a hendrerit enim consequat. In accumsan ante a egestas luctus. Etiam quis neque " \
"nec eros vestibulum faucibus. Nunc viverra ipsum lectus, vel scelerisque dui dictum a. Ut orci " \
"enim, ultrices a ultrices nec, pharetra in quam. Donec accumsan sit amet eros eget fermentum." \
"Vivamus ut odio ac odio malesuada accumsan. Aenean vehicula diam at tempus ornare. Phasellus " \
"dictum mauris a mi consequat, vitae mattis nulla fringilla. Ut laoreet tellus in nisl efficitur," \
" a luctus justo tempus. Fusce finibus libero eget velit finibus iaculis. Morbi rhoncus purus " \
"vel vestibulum ullamcorper. Sed ac metus in urna fermentum feugiat. Nulla nunc diam, sodales " \
"aliquam mi id, varius porta nisl. Praesent vel nibh ac turpis rutrum laoreet at non odio. " \
"Phasellus ut posuere mi. Suspendisse malesuada velit nec mauris convallis porta. Vivamus " \
"sed ultrices sapien, at cras amet."
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CHANGED.number
expected.token = None
expected.payload = None
exchange1 = (req, expected)
self.current_mid += 1
self._test_with_client([exchange1])
def test_observe_client(self):
print("TEST_OBSERVE_CLIENT")
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/basic"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
req.observe = 0
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = None
expected.code = defines.Codes.CONTENT.number
expected.token = None
expected.payload = None
exchange1 = (req, expected)
self.current_mid += 1
self._test_with_client_observe([exchange1])
def test_duplicate(self):
print("TEST_DUPLICATE")
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/basic"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = defines.Codes.CONTENT.number
expected.token = None
self.current_mid += 1
self._test_plugtest([(req, expected), (req, expected)])
def test_duplicate_not_completed(self):
print("TEST_DUPLICATE_NOT_COMPLETED")
req = Request()
req.code = defines.Codes.GET.number
req.proxy_uri = "coap://127.0.0.1:5684/long"
req.type = defines.Types["CON"]
req._mid = self.current_mid
req.destination = self.server_address
expected = Response()
expected.type = defines.Types["ACK"]
expected._mid = self.current_mid
expected.code = None
expected.token = None
expected2 = Response()
expected2.type = defines.Types["CON"]
expected2._mid = None
expected2.code = defines.Codes.CONTENT.number
expected2.token = None
self.current_mid += 1
self._test_plugtest([(req, None), (req, expected), (None, expected2)])
if __name__ == '__main__':
unittest.main()