test: update example and unit tests with new import roles:

tiny_test_fw is a python package now. import it using normal way.
This commit is contained in:
He Yin Ling
2019-11-27 11:58:07 +08:00
parent 4d45932c5e
commit c906e2afee
61 changed files with 1283 additions and 864 deletions

View File

@@ -0,0 +1,106 @@
#!/usr/bin/env python
#
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import unicode_literals
from builtins import str
from builtins import range
import http.client
import argparse
from tiny_test_fw import Utility
def start_session(ip, port):
return http.client.HTTPConnection(ip, int(port), timeout=15)
def end_session(conn):
conn.close()
def getreq(conn, path, verbose=False):
conn.request("GET", path)
resp = conn.getresponse()
data = resp.read()
if verbose:
Utility.console_log("GET : " + path)
Utility.console_log("Status : " + resp.status)
Utility.console_log("Reason : " + resp.reason)
Utility.console_log("Data length : " + str(len(data)))
Utility.console_log("Data content : " + data)
return data
def postreq(conn, path, data, verbose=False):
conn.request("POST", path, data)
resp = conn.getresponse()
data = resp.read()
if verbose:
Utility.console_log("POST : " + data)
Utility.console_log("Status : " + resp.status)
Utility.console_log("Reason : " + resp.reason)
Utility.console_log("Data length : " + str(len(data)))
Utility.console_log("Data content : " + data)
return data
def putreq(conn, path, body, verbose=False):
conn.request("PUT", path, body)
resp = conn.getresponse()
data = resp.read()
if verbose:
Utility.console_log("PUT : " + path, body)
Utility.console_log("Status : " + resp.status)
Utility.console_log("Reason : " + resp.reason)
Utility.console_log("Data length : " + str(len(data)))
Utility.console_log("Data content : " + data)
return data
if __name__ == '__main__':
# Configure argument parser
parser = argparse.ArgumentParser(description='Run HTTPd Test')
parser.add_argument('IP', metavar='IP', type=str, help='Server IP')
parser.add_argument('port', metavar='port', type=str, help='Server port')
parser.add_argument('N', metavar='integer', type=int, help='Integer to sum upto')
args = vars(parser.parse_args())
# Get arguments
ip = args['IP']
port = args['port']
N = args['N']
# Establish HTTP connection
Utility.console_log("Connecting to => " + ip + ":" + port)
conn = start_session(ip, port)
# Reset adder context to specified value(0)
# -- Not needed as new connection will always
# -- have zero value of the accumulator
Utility.console_log("Reset the accumulator to 0")
putreq(conn, "/adder", str(0))
# Sum numbers from 1 to specified value(N)
Utility.console_log("Summing numbers from 1 to " + str(N))
for i in range(1, N + 1):
postreq(conn, "/adder", str(i))
# Fetch the result
Utility.console_log("Result :" + getreq(conn, "/adder"))
# Close HTTP connection
end_session(conn)

View File

@@ -0,0 +1,268 @@
#!/usr/bin/env python
#
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
from __future__ import unicode_literals
from builtins import str
import http.client
import argparse
from tiny_test_fw import Utility
def verbose_print(verbosity, *args):
if (verbosity):
Utility.console_log(''.join(str(elems) for elems in args))
def test_val(text, expected, received):
if expected != received:
Utility.console_log(" Fail!")
Utility.console_log(" [reason] " + text + ":")
Utility.console_log(" expected: " + str(expected))
Utility.console_log(" received: " + str(received))
return False
return True
def test_get_handler(ip, port, verbosity=False):
verbose_print(verbosity, "======== GET HANDLER TEST =============")
# Establish HTTP connection
verbose_print(verbosity, "Connecting to => " + ip + ":" + port)
sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
uri = "/hello?query1=value1&query2=value2&query3=value3"
# GET hello response
test_headers = {"Test-Header-1":"Test-Value-1", "Test-Header-2":"Test-Value-2"}
verbose_print(verbosity, "Sending GET to URI : ", uri)
verbose_print(verbosity, "Sending additional headers : ")
for k, v in test_headers.items():
verbose_print(verbosity, "\t", k, ": ", v)
sess.request("GET", url=uri, headers=test_headers)
resp = sess.getresponse()
resp_hdrs = resp.getheaders()
resp_data = resp.read().decode()
# Close HTTP connection
sess.close()
if not (
test_val("Status code mismatch", 200, resp.status) and
test_val("Response mismatch", "Custom-Value-1", resp.getheader("Custom-Header-1")) and
test_val("Response mismatch", "Custom-Value-2", resp.getheader("Custom-Header-2")) and
test_val("Response mismatch", "Hello World!", resp_data)
):
return False
verbose_print(verbosity, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
verbose_print(verbosity, "Server response to GET /hello")
verbose_print(verbosity, "Response Headers : ")
for k, v in resp_hdrs:
verbose_print(verbosity, "\t", k, ": ", v)
verbose_print(verbosity, "Response Data : " + resp_data)
verbose_print(verbosity, "========================================\n")
return True
def test_post_handler(ip, port, msg, verbosity=False):
verbose_print(verbosity, "======== POST HANDLER TEST ============")
# Establish HTTP connection
verbose_print(verbosity, "Connecting to => " + ip + ":" + port)
sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
# POST message to /echo and get back response
sess.request("POST", url="/echo", body=msg)
resp = sess.getresponse()
resp_data = resp.read().decode()
verbose_print(verbosity, "Server response to POST /echo (" + msg + ")")
verbose_print(verbosity, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
verbose_print(verbosity, resp_data)
verbose_print(verbosity, "========================================\n")
# Close HTTP connection
sess.close()
return test_val("Response mismatch", msg, resp_data)
def test_put_handler(ip, port, verbosity=False):
verbose_print(verbosity, "======== PUT HANDLER TEST =============")
# Establish HTTP connection
verbose_print(verbosity, "Connecting to => " + ip + ":" + port)
sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
# PUT message to /ctrl to disable /hello and /echo URI handlers
# and set 404 error handler to custom http_404_error_handler()
verbose_print(verbosity, "Disabling /hello and /echo handlers")
sess.request("PUT", url="/ctrl", body="0")
resp = sess.getresponse()
resp.read()
try:
# Send HTTP request to /hello URI
sess.request("GET", url="/hello")
resp = sess.getresponse()
resp_data = resp.read().decode()
# 404 Error must be returned from server as URI /hello is no longer available.
# But the custom error handler http_404_error_handler() will not close the
# session if the requested URI is /hello
if not test_val("Status code mismatch", 404, resp.status):
raise AssertionError
# Compare error response string with expectation
verbose_print(verbosity, "Response on GET /hello : " + resp_data)
if not test_val("Response mismatch", "/hello URI is not available", resp_data):
raise AssertionError
# Using same session for sending an HTTP request to /echo, as it is expected
# that the custom error handler http_404_error_handler() would not have closed
# the session
sess.request("POST", url="/echo", body="Some content")
resp = sess.getresponse()
resp_data = resp.read().decode()
# 404 Error must be returned from server as URI /hello is no longer available.
# The custom error handler http_404_error_handler() will close the session
# this time as the requested URI is /echo
if not test_val("Status code mismatch", 404, resp.status):
raise AssertionError
# Compare error response string with expectation
verbose_print(verbosity, "Response on POST /echo : " + resp_data)
if not test_val("Response mismatch", "/echo URI is not available", resp_data):
raise AssertionError
try:
# Using same session should fail as by now the session would have closed
sess.request("POST", url="/hello", body="Some content")
resp = sess.getresponse()
resp.read().decode()
# If control reaches this point then the socket was not closed.
# This is not expected
verbose_print(verbosity, "Socket not closed by server")
raise AssertionError
except http.client.HTTPException:
# Catch socket error as we tried to communicate with an already closed socket
pass
except http.client.HTTPException:
verbose_print(verbosity, "Socket closed by server")
return False
except AssertionError:
return False
finally:
# Close HTTP connection
sess.close()
verbose_print(verbosity, "Enabling /hello handler")
# Create new connection
sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
# PUT message to /ctrl to enable /hello URI handler
# and restore 404 error handler to default
sess.request("PUT", url="/ctrl", body="1")
resp = sess.getresponse()
resp.read()
# Close HTTP connection
sess.close()
# Create new connection
sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
try:
# Sending HTTP request to /hello should work now
sess.request("GET", url="/hello")
resp = sess.getresponse()
resp_data = resp.read().decode()
if not test_val("Status code mismatch", 200, resp.status):
raise AssertionError
verbose_print(verbosity, "Response on GET /hello : " + resp_data)
if not test_val("Response mismatch", "Hello World!", resp_data):
raise AssertionError
# 404 Error handler should have been restored to default
sess.request("GET", url="/invalid")
resp = sess.getresponse()
resp_data = resp.read().decode()
if not test_val("Status code mismatch", 404, resp.status):
raise AssertionError
verbose_print(verbosity, "Response on GET /invalid : " + resp_data)
if not test_val("Response mismatch", "This URI does not exist", resp_data):
raise AssertionError
except http.client.HTTPException:
verbose_print(verbosity, "Socket closed by server")
return False
except AssertionError:
return False
finally:
# Close HTTP connection
sess.close()
return True
def test_custom_uri_query(ip, port, query, verbosity=False):
verbose_print(verbosity, "======== GET HANDLER TEST =============")
# Establish HTTP connection
verbose_print(verbosity, "Connecting to => " + ip + ":" + port)
sess = http.client.HTTPConnection(ip + ":" + port, timeout=15)
uri = "/hello?" + query
# GET hello response
verbose_print(verbosity, "Sending GET to URI : ", uri)
sess.request("GET", url=uri, headers={})
resp = sess.getresponse()
resp_data = resp.read().decode()
verbose_print(verbosity, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv")
verbose_print(verbosity, "Server response to GET /hello")
verbose_print(verbosity, "Response Data : " + resp_data)
verbose_print(verbosity, "========================================\n")
# Close HTTP connection
sess.close()
return "Hello World!" == resp_data
if __name__ == '__main__':
# Configure argument parser
parser = argparse.ArgumentParser(description='Run HTTPd Test')
parser.add_argument('IP', metavar='IP', type=str, help='Server IP')
parser.add_argument('port', metavar='port', type=str, help='Server port')
parser.add_argument('msg', metavar='message', type=str, help='Message to be sent to server')
args = vars(parser.parse_args())
# Get arguments
ip = args['IP']
port = args['port']
msg = args['msg']
if not (
test_get_handler(ip, port, True) and
test_put_handler(ip, port, True) and
test_post_handler(ip, port, msg, True)
):
Utility.console_log("Failed!")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,71 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Internal use only.
This file provide method to control programmable attenuator.
"""
import time
import serial
import codecs
def set_att(port, att, att_fix=False):
"""
set attenuation value on the attenuator
:param port: serial port for attenuator
:param att: attenuation value we want to set
:param att_fix: fix the deviation with experience value
:return: True or False
"""
assert 0 <= att <= 62
# fix att
if att_fix:
if att >= 33 and (att - 30 + 1) % 4 == 0:
att_t = att - 1
elif att >= 33 and (att - 30) % 4 == 0:
att_t = att + 1
else:
att_t = att
else:
att_t = att
serial_port = serial.Serial(port, baudrate=9600, rtscts=False, timeout=0.1)
if serial_port.isOpen() is False:
raise IOError("attenuator control, failed to open att port")
cmd_hex = "7e7e10{:02x}{:x}".format(att_t, 0x10 + att_t)
exp_res_hex = "7e7e20{:02x}00{:x}".format(att_t, 0x20 + att_t)
cmd = codecs.decode(cmd_hex, "hex")
exp_res = codecs.decode(exp_res_hex, "hex")
serial_port.write(cmd)
res = b""
for i in range(5):
res += serial_port.read(20)
if res == exp_res:
result = True
break
time.sleep(0.1)
else:
result = False
serial_port.close()
return result

View File

@@ -0,0 +1,50 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import matplotlib
# fix can't draw figure with docker
matplotlib.use('Agg')
import matplotlib.pyplot as plt # noqa: E402 - matplotlib.use('Agg') need to be before this
# candidate colors
LINE_STYLE_CANDIDATE = ['b-o', 'r-o', 'k-o', 'm-o', 'c-o', 'g-o', 'y-o',
'b-s', 'r-s', 'k-s', 'm-s', 'c-s', 'g-s', 'y-s']
def draw_line_chart(file_name, title, x_label, y_label, data_list):
"""
draw line chart and save to file.
:param file_name: abs/relative file name to save chart figure
:param title: chart title
:param x_label: x-axis label
:param y_label: y-axis label
:param data_list: a list of line data.
each line is a dict of ("x-axis": list, "y-axis": list, "label": string)
"""
plt.figure(figsize=(12, 6))
plt.grid(True)
for i, data in enumerate(data_list):
plt.plot(data["x-axis"], data["y-axis"], LINE_STYLE_CANDIDATE[i], label=data["label"])
plt.xlabel(x_label)
plt.ylabel(y_label)
plt.legend(fontsize=12)
plt.title(title)
plt.tight_layout(pad=3, w_pad=3, h_pad=3)
plt.savefig(file_name)
plt.close()

View File

@@ -0,0 +1,95 @@
# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http:#www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Internal use only.
This file implements controlling APC PDU via telnet.
"""
import telnetlib
class Control(object):
""" control APC via telnet """
@classmethod
def apc_telnet_make_choice(cls, telnet, choice):
""" select a choice """
telnet.read_until(b"Event Log")
telnet.read_until(b">")
telnet.write(choice.encode() + b"\r\n")
@classmethod
def apc_telnet_common_action(cls, telnet, check_str, action):
""" wait until a pattern and then write a line """
telnet.read_until(check_str.encode())
telnet.write(action.encode() + b"\r\n")
@classmethod
def control(cls, apc_ip, control_dict):
"""
control APC
:param apc_ip: IP of APC
:param control_dict: dict with outlet ID and "ON" or "OFF"
"""
for _outlet in control_dict:
assert 0 < _outlet < 9
assert control_dict[_outlet] in ["ON", "OFF"]
# telnet
# set timeout as 2s so that it won't waste time even can't access APC
tn = telnetlib.Telnet(host=apc_ip, timeout=5)
# log on
cls.apc_telnet_common_action(tn, "User Name :", "apc")
cls.apc_telnet_common_action(tn, "Password :", "apc")
# go to Device Manager
cls.apc_telnet_make_choice(tn, "1")
# go to Outlet Management
cls.apc_telnet_make_choice(tn, "2")
# go to Outlet Control/Configuration
cls.apc_telnet_make_choice(tn, "1")
# do select Outlet and control
for _outlet in control_dict:
# choose Outlet
cls.apc_telnet_make_choice(tn, str(_outlet))
# choose Control Outlet
cls.apc_telnet_make_choice(tn, "1")
# choose action
_action = control_dict[_outlet]
if "ON" in _action:
cls.apc_telnet_make_choice(tn, "1")
else:
cls.apc_telnet_make_choice(tn, "2")
# do confirm
cls.apc_telnet_common_action(tn, "cancel :", "YES")
cls.apc_telnet_common_action(tn, "continue...", "")
# return to Outlet Control/Configuration
cls.apc_telnet_make_choice(tn, "\033")
cls.apc_telnet_make_choice(tn, "\033")
# exit to main menu and logout
tn.write(b"\033\r\n")
tn.write(b"\033\r\n")
tn.write(b"\033\r\n")
tn.write(b"4\r\n")
@classmethod
def control_rest(cls, apc_ip, outlet, action):
outlet_list = list(range(1, 9)) # has to be a list if we want to remove from it under Python 3
outlet_list.remove(outlet)
cls.control(apc_ip, dict.fromkeys(outlet_list, action))

View File

@@ -0,0 +1,245 @@
"""
this module generates markdown format test report for throughput test.
The test report contains 2 parts:
1. throughput with different configs
2. throughput with RSSI
"""
import os
class ThroughputForConfigsReport(object):
THROUGHPUT_TYPES = ["tcp_tx", "tcp_rx", "udp_tx", "udp_rx"]
REPORT_FILE_NAME = "ThroughputForConfigs.md"
def __init__(self, output_path, ap_ssid, throughput_results, sdkconfig_files):
"""
:param ap_ssid: the ap we expected to use
:param throughput_results: config with the following type::
{
"config_name": {
"tcp_tx": result,
"tcp_rx": result,
"udp_tx": result,
"udp_rx": result,
},
"config_name2": {},
}
"""
self.output_path = output_path
self.ap_ssid = ap_ssid
self.results = throughput_results
self.sdkconfigs = dict()
for config_name in sdkconfig_files:
self.sdkconfigs[config_name] = self._parse_config_file(sdkconfig_files[config_name])
if not os.path.exists(output_path):
os.makedirs(output_path)
self.sort_order = self.sdkconfigs.keys()
self.sort_order.sort()
@staticmethod
def _parse_config_file(config_file_path):
sdkconfig = {}
with open(config_file_path, "r") as f:
for line in f:
if not line.isspace():
if line[0] == "#":
continue
name, value = line.split("=")
value = value.strip("\r\n")
sdkconfig[name] = value if value else "n"
return sdkconfig
def _generate_the_difference_between_configs(self):
"""
generate markdown list for different configs::
default: esp-idf default
low:
* `config name 1`: old value -> new value
* `config name 2`: old value -> new value
* ...
...
"""
data = "## Config Definition:\r\n\r\n"
def find_difference(base, new):
_difference = {}
all_configs = set(base.keys())
all_configs.update(set(new.keys()))
for _config in all_configs:
try:
_base_value = base[_config]
except KeyError:
_base_value = "null"
try:
_new_value = new[_config]
except KeyError:
_new_value = "null"
if _base_value != _new_value:
_difference[_config] = "{} -> {}".format(_base_value, _new_value)
return _difference
for i, _config_name in enumerate(self.sort_order):
current_config = self.sdkconfigs[_config_name]
if i > 0:
previous_config_name = self.sort_order[i - 1]
previous_config = self.sdkconfigs[previous_config_name]
else:
previous_config = previous_config_name = None
if previous_config:
# log the difference
difference = find_difference(previous_config, current_config)
data += "* {} (compared to {}):\r\n".format(_config_name, previous_config_name)
for diff_name in difference:
data += " * `{}`: {}\r\n".format(diff_name, difference[diff_name])
return data
def _generate_report_for_one_type(self, throughput_type):
"""
generate markdown table with the following format::
| config name | throughput (Mbps) | free heap size (bytes) |
|-------------|-------------------|------------------------|
| default | 32.11 | 147500 |
| low | 32.11 | 147000 |
| medium | 33.22 | 120000 |
| high | 43.11 | 100000 |
| max | 45.22 | 79000 |
"""
empty = True
ret = "\r\n### {} {}\r\n\r\n".format(*throughput_type.split("_"))
ret += "| config name | throughput (Mbps) | free heap size (bytes) |\r\n"
ret += "|-------------|-------------------|------------------------|\r\n"
for config in self.sort_order:
try:
result = self.results[config][throughput_type]
throughput = "{:.02f}".format(max(result.throughput_by_att[self.ap_ssid].values()))
heap_size = str(result.heap_size)
# although markdown table will do alignment
# do align here for better text editor presentation
ret += "| {:<12}| {:<18}| {:<23}|\r\n".format(config, throughput, heap_size)
empty = False
except KeyError:
pass
return ret if not empty else ""
def generate_report(self):
data = "# Throughput for different configs\r\n"
data += "\r\nAP: {}\r\n".format(self.ap_ssid)
for throughput_type in self.THROUGHPUT_TYPES:
data += self._generate_report_for_one_type(throughput_type)
data += "\r\n------\r\n"
data += self._generate_the_difference_between_configs()
with open(os.path.join(self.output_path, self.REPORT_FILE_NAME), "w") as f:
f.write(data)
class ThroughputVsRssiReport(object):
REPORT_FILE_NAME = "ThroughputVsRssi.md"
def __init__(self, output_path, throughput_results):
"""
:param throughput_results: config with the following type::
{
"tcp_tx": result,
"tcp_rx": result,
"udp_tx": result,
"udp_rx": result,
}
"""
self.output_path = output_path
self.raw_data_path = os.path.join(output_path, "raw_data")
self.results = throughput_results
self.throughput_types = self.results.keys()
self.throughput_types.sort()
if not os.path.exists(self.raw_data_path):
os.makedirs(self.raw_data_path)
def _generate_summary(self):
"""
generate summary with the following format::
| item | curve analysis | max throughput (Mbps) |
|---------|----------------|-----------------------|
| tcp tx | Success | 32.11 |
| tcp rx | Success | 32.11 |
| udp tx | Success | 45.22 |
| udp rx | Failed | 55.44 |
"""
ret = "\r\n### Summary\r\n\r\n"
ret += "| item | curve analysis | max throughput (Mbps) |\r\n"
ret += "|---------|----------------|-----------------------|\r\n"
for _type in self.throughput_types:
result = self.results[_type]
max_throughput = 0.0
curve_analysis = "Failed" if result.error_list else "Success"
for ap_ssid in result.throughput_by_att:
_max_for_ap = max(result.throughput_by_rssi[ap_ssid].values())
if _max_for_ap > max_throughput:
max_throughput = _max_for_ap
max_throughput = "{:.02f}".format(max_throughput)
ret += "| {:<8}| {:<15}| {:<22}|\r\n".format("{}_{}".format(result.proto, result.direction),
curve_analysis, max_throughput)
return ret
def _generate_report_for_one_type(self, result):
"""
generate markdown table with the following format::
### tcp rx
Errors:
* detected error 1
* ...
AP: ap_ssid
![throughput Vs RSSI](path to figure)
AP: ap_ssid
![throughput Vs RSSI](path to figure)
"""
result.post_analysis()
ret = "\r\n### {} {}\r\n".format(result.proto, result.direction)
if result.error_list:
ret += "\r\nErrors:\r\n\r\n"
for error in result.error_list:
ret += "* " + error + "\r\n"
for ap_ssid in result.throughput_by_rssi:
ret += "\r\nAP: {}\r\n".format(ap_ssid)
# draw figure
file_name = result.draw_throughput_figure(self.raw_data_path, ap_ssid, "rssi")
result.draw_throughput_figure(self.raw_data_path, ap_ssid, "att")
result.draw_rssi_vs_att_figure(self.raw_data_path, ap_ssid)
ret += "\r\n![throughput Vs RSSI]({})\r\n".format(os.path.join("raw_data", file_name))
return ret
def generate_report(self):
data = "# Throughput Vs RSSI\r\n"
data += self._generate_summary()
for _type in self.throughput_types:
data += self._generate_report_for_one_type(self.results[_type])
with open(os.path.join(self.output_path, self.REPORT_FILE_NAME), "w") as f:
f.write(data)

View File

@@ -2,4 +2,3 @@ pyserial
pyyaml
junit_xml
netifaces
matplotlib

View File

@@ -0,0 +1,102 @@
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import dbus
import dbus.mainloop.glib
import netifaces
import time
def get_wiface_name():
for iface in netifaces.interfaces():
if iface.startswith('w'):
return iface
return None
def get_wiface_IPv4(iface):
try:
[info] = netifaces.ifaddresses(iface)[netifaces.AF_INET]
return info['addr']
except KeyError:
return None
class wpa_cli:
def __init__(self, iface, reset_on_exit=False):
self.iface_name = iface
self.iface_obj = None
self.iface_ifc = None
self.old_network = None
self.new_network = None
self.connected = False
self.reset_on_exit = reset_on_exit
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
service = dbus.Interface(bus.get_object("fi.w1.wpa_supplicant1", "/fi/w1/wpa_supplicant1"),
"fi.w1.wpa_supplicant1")
iface_path = service.GetInterface(self.iface_name)
self.iface_obj = bus.get_object("fi.w1.wpa_supplicant1", iface_path)
self.iface_ifc = dbus.Interface(self.iface_obj, "fi.w1.wpa_supplicant1.Interface")
if self.iface_ifc is None:
raise RuntimeError('supplicant : Failed to fetch interface')
self.old_network = self.iface_obj.Get("fi.w1.wpa_supplicant1.Interface", "CurrentNetwork",
dbus_interface='org.freedesktop.DBus.Properties')
if self.old_network == '/':
self.old_network = None
else:
self.connected = True
def connect(self, ssid, password):
if self.connected is True:
self.iface_ifc.Disconnect()
self.connected = False
if self.new_network is not None:
self.iface_ifc.RemoveNetwork(self.new_network)
self.new_network = self.iface_ifc.AddNetwork({"ssid": ssid, "psk": password})
self.iface_ifc.SelectNetwork(self.new_network)
ip = None
retry = 10
while retry > 0:
time.sleep(5)
ip = get_wiface_IPv4(self.iface_name)
if ip is not None:
self.connected = True
return ip
retry -= 1
self.reset()
raise RuntimeError('wpa_cli : Connection failed')
def reset(self):
if self.iface_ifc is not None:
if self.connected is True:
self.iface_ifc.Disconnect()
self.connected = False
if self.new_network is not None:
self.iface_ifc.RemoveNetwork(self.new_network)
self.new_network = None
if self.old_network is not None:
self.iface_ifc.SelectNetwork(self.old_network)
self.old_network = None
def __del__(self):
if self.reset_on_exit is True:
self.reset()