Added bandwidth limitaion option to iperf test scripts

This commit is contained in:
Ondrej Kosta
2022-02-28 17:16:19 +01:00
committed by BOT
parent c8b08b9a4b
commit 97ddccafd3
4 changed files with 47 additions and 25 deletions

View File

@@ -286,7 +286,7 @@ class IperfTestUtility(object):
def _save_test_result(self, test_case, raw_data, att, rssi, heap_size): # type: (str, str, int, int, int) -> Any
return self.test_result[test_case].add_result(raw_data, self.ap_ssid, att, rssi, heap_size)
def _test_once(self, proto, direction): # type: (Any, str, str) -> Tuple[str, int, int]
def _test_once(self, proto, direction, bw_limit): # type: (Any, str, str, int) -> Tuple[str, int, int]
""" do measure once for one type """
# connect and scan to get RSSI
dut_ip, rssi = self.setup()
@@ -301,12 +301,18 @@ class IperfTestUtility(object):
process = subprocess.Popen(['iperf', '-s', '-B', self.pc_nic_ip,
'-t', str(TEST_TIME), '-i', '1', '-f', 'm'],
stdout=f, stderr=f)
self.dut.write('iperf -c {} -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
if bw_limit > 0:
self.dut.write('iperf -c {} -i 1 -t {} -b {}'.format(self.pc_nic_ip, TEST_TIME, bw_limit))
else:
self.dut.write('iperf -c {} -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
else:
process = subprocess.Popen(['iperf', '-s', '-u', '-B', self.pc_nic_ip,
'-t', str(TEST_TIME), '-i', '1', '-f', 'm'],
stdout=f, stderr=f)
self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
if bw_limit > 0:
self.dut.write('iperf -c {} -u -i 1 -t {} -b {}'.format(self.pc_nic_ip, TEST_TIME, bw_limit))
else:
self.dut.write('iperf -c {} -u -i 1 -t {}'.format(self.pc_nic_ip, TEST_TIME))
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
@@ -327,9 +333,12 @@ class IperfTestUtility(object):
except DUT.ExpectTimeout:
# compatible with old iperf example binary
Utility.console_log('create iperf tcp server fail')
process = subprocess.Popen(['iperf', '-c', dut_ip,
'-t', str(TEST_TIME), '-f', 'm'],
stdout=f, stderr=f)
if bw_limit > 0:
process = subprocess.Popen(['iperf', '-c', dut_ip, '-b', str(bw_limit) + 'm',
'-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f)
else:
process = subprocess.Popen(['iperf', '-c', dut_ip,
'-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f)
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
break
@@ -344,15 +353,25 @@ class IperfTestUtility(object):
except DUT.ExpectTimeout:
# compatible with old iperf example binary
Utility.console_log('create iperf udp server fail')
for bandwidth in range(50, 101, 5):
process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', str(bandwidth) + 'm',
'-t', str(TEST_TIME / 11), '-f', 'm'], stdout=f, stderr=f)
if bw_limit > 0:
process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', str(bw_limit) + 'm',
'-t', str(TEST_TIME), '-f', 'm'], stdout=f, stderr=f)
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
break
time.sleep(1)
else:
process.terminate()
else:
for bandwidth in range(50, 101, 5):
process = subprocess.Popen(['iperf', '-c', dut_ip, '-u', '-b', str(bandwidth) + 'm',
'-t', str(TEST_TIME / 11), '-f', 'm'], stdout=f, stderr=f)
for _ in range(TEST_TIMEOUT):
if process.poll() is not None:
break
time.sleep(1)
else:
process.terminate()
server_raw_data = self.dut.read()
with open(PC_IPERF_TEMP_LOG_FILE, 'r') as f:
@@ -371,7 +390,7 @@ class IperfTestUtility(object):
# return server raw data (for parsing test results) and RSSI
return server_raw_data, rssi, heap_size
def run_test(self, proto, direction, atten_val): # type: (str, str, int) -> None
def run_test(self, proto, direction, atten_val, bw_limit): # type: (str, str, int, int) -> None
"""
run test for one type, with specified atten_value and save the test result
@@ -382,7 +401,7 @@ class IperfTestUtility(object):
rssi = FAILED_TO_SCAN_RSSI
heap_size = INVALID_HEAP_SIZE
try:
server_raw_data, rssi, heap_size = self._test_once(proto, direction)
server_raw_data, rssi, heap_size = self._test_once(proto, direction, bw_limit)
throughput = self._save_test_result('{}_{}'.format(proto, direction),
server_raw_data, atten_val,
rssi, heap_size)
@@ -396,16 +415,16 @@ class IperfTestUtility(object):
self.fail_to_scan += 1
Utility.console_log('Fail to scan AP.')
def run_all_cases(self, atten_val): # type: (int) -> None
def run_all_cases(self, atten_val, bw_limit): # type: (int, int) -> None
"""
run test for all types (udp_tx, udp_rx, tcp_tx, tcp_rx).
:param atten_val: attenuate value
"""
self.run_test('tcp', 'tx', atten_val)
self.run_test('tcp', 'rx', atten_val)
self.run_test('udp', 'tx', atten_val)
self.run_test('udp', 'rx', atten_val)
self.run_test('tcp', 'tx', atten_val, bw_limit)
self.run_test('tcp', 'rx', atten_val, bw_limit)
self.run_test('udp', 'tx', atten_val, bw_limit)
self.run_test('udp', 'rx', atten_val, bw_limit)
if self.fail_to_scan > 10:
Utility.console_log(
'Fail to scan AP for more than 10 times. Lowest RSSI scanned is {}'.format(self.lowest_rssi_scanned))