"""testcases for cli/volume/top """ import sys import time import glusterutils import clientutils import hostutils import atfutils volume_top_usage = "Usage: volume top {[open|read|write|opendir|readdir] |[read-perf|write-perf bs count ]} [brick ] [list-cnt ]" def reset_testenv(): output = clientutils.umountall() assert_success_status = atfutils.assert_success(output["exitstatus"]) if assert_success_status is not 0: return assert_success_status glusterutils.volume_stop("server1", force=True) glusterutils.volume_delete("server1") glusterutils.glusterd_stop_allservers() glusterutils.glusterd_remove_dir_allservers() glusterutils.glusterd_remove_logs_allservers() return 0 def setup_testenv(): """ """ output = glusterutils.glusterd_start_allservers(force=True) assert_success_status = atfutils.assert_success(output["exitstatus"]) if assert_success_status is not 0: return assert_success_status output = glusterutils.peer_probe("server1") assert_success_status = atfutils.assert_success(output["exitstatus"]) if assert_success_status is not 0: return assert_success_status output = glusterutils.create_brick_allservers() assert_success_status = atfutils.assert_success(output["exitstatus"]) if assert_success_status is not 0: return assert_success_status output = glusterutils.volume_create("server1") assert_success_status = atfutils.assert_success(output["exitstatus"]) if assert_success_status is not 0: return assert_success_status output = glusterutils.volume_start("server1") assert_success_status = atfutils.assert_success(output["exitstatus"]) if assert_success_status is not 0: return assert_success_status output = clientutils.mount("mount1") assert_success_status = atfutils.assert_success(output["exitstatus"]) if assert_success_status is not 0: return assert_success_status return 0 def bug2645a(): """ """ operation = "write-perf bs 0 count 100" expected_output = "block size should be an integer greater than zero" expected_exit_status = 1 output = glusterutils.volume_top("server1", operation) return_status = atfutils.validate_output(output, expected_exit_status, expected_output) if return_status is not 0: return return_status return 0 def bug2645b(): """ """ operation = "write-perf bs 1024 count 0" expected_output = "count should be an integer greater than zero" expected_exit_status = 1 output = glusterutils.volume_top("server1", operation) return_status = atfutils.validate_output(output, expected_exit_status, expected_output) if return_status is not 0: return return_status return 0 def bug2645c(): """ """ operation = "write-perf bs 1024 count 100" expected_output = "" expected_exit_status = 0 output = glusterutils.volume_top("server1", operation) return_status = atfutils.validate_output(output, expected_exit_status, expected_output) if return_status is not 0: return return_status return 0 def bug2771a(): """ """ operation = "write-perf bs 2048" expected_output = volume_top_usage expected_exit_status = 1 output = glusterutils.volume_top("server1", operation, brick="brick1", list_cnt="10") return_status = atfutils.validate_output(output, expected_exit_status, expected_output) if return_status is not 0: return return_status return 0 def bug2771b(): """ """ operation = "read-perf bs 37" expected_output = volume_top_usage expected_exit_status = 1 output = glusterutils.volume_top("server1", operation, brick="brick1", list_cnt="99") return_status = atfutils.validate_output(output, expected_exit_status, expected_output) if return_status is not 0: return return_status return 0 def bug2771c(): """ """ operation = "read-perf bs 37 bs 1024" expected_output = volume_top_usage expected_exit_status = 1 output = glusterutils.volume_top("server1", operation, brick="brick1", list_cnt="99") return_status = atfutils.validate_output(output, expected_exit_status, expected_output) if return_status is not 0: return return_status return 0 def bug2771d(): """ """ operation = "read-perf count 37 count 10" expected_output = volume_top_usage expected_exit_status = 1 output = glusterutils.volume_top("server1", operation, brick="brick1", list_cnt="99") return_status = atfutils.validate_output(output, expected_exit_status, expected_output) if return_status is not 0: return return_status return 0 def bug2771e(): operation = "read-perf bs 1024 count 37" expected_output = "" expected_exit_status = 0 output = glusterutils.volume_top("server1", operation, brick="brick1", list_cnt="10") return_status = atfutils.validate_output(output, expected_exit_status, expected_output) if return_status is not 0: return return_status return 0 def bug2771f(): """ """ operation = "read-perf count 37 bs 1024" expected_output = "" expected_exit_status = 0 output = glusterutils.volume_top("server1", operation, brick="brick1", list_cnt="10") return_status = atfutils.validate_output(output, expected_exit_status, expected_output) if return_status is not 0: return return_status return 0 def bug2725a(): """ """ testcase_failed = False dir_name = 'a' for i in range(1, 255): dir_name += 'a' commands = [] commands.append("echo 'Hello World' > " + dir_name) commands.append("find . | xargs touch") for command in commands: output = clientutils.execute_on_mount("mount1", command) assert_success_status = atfutils.assert_success(output["exitstatus"]) if assert_success_status is not 0: return assert_success_status operation = "write-perf bs 2048 count 1000" expected_output = "" expected_exit_status = 0 output = glusterutils.volume_top("server1", operation, list_cnt="100") return_status = atfutils.validate_output(output, expected_exit_status, expected_output) if return_status is not 0: return return_status operations = ['open', 'read', 'write'] for operation in operations: output = glusterutils.volume_top("server1", operation) return_status = atfutils.validate_output(output, expected_exit_status, expected_output) if return_status is not 0: testcase_failed = True if testcase_failed is True: return 1 else: return 0 def bug2725b(): """ """ testcase_failed = False dir_name = 'a' for i in range(1, 255): dir_name += 'a' commands = [] commands.append("echo 'Hello World' > " + dir_name) commands.append("find . | xargs touch") for command in commands: output = clientutils.execute_on_mount("mount1", command) assert_success_status = atfutils.assert_success(output["exitstatus"]) if assert_success_status is not 0: return assert_success_status operation = "write-perf bs 2048 count 1000" expected_output = "Unable to decode response" expected_exit_status = 1 output = glusterutils.volume_top("server1", operation, list_cnt="100") return_status = atfutils.validate_output(output, expected_exit_status, expected_output) if return_status is not 0: return return_status operations = ['open', 'read', 'write'] for operation in operations: output = glusterutils.volume_top("server1", operation) return_status = atfutils.validate_output(output, expected_exit_status, expected_output) if return_status is not 0: testcase_failed = True if testcase_failed is True: return 1 else: return 0