From 1dd2b0bc7c4ddef18de53b486852ecd8fd9b2a5d Mon Sep 17 00:00:00 2001 From: Yang Jiao <72076317+YangJiao0817@users.noreply.github.com> Date: Mon, 1 Aug 2022 16:52:57 +0800 Subject: [PATCH] Add purge audit log API test cases (#17175) Added test cases for the following APIs: 1. PUT /system/purgeaudit/{purge_id} Stop the specific purge audit log execution 2. GET /system/purgeaudit/{purge_id} Get purge job status 3. GET /system/purgeaudit/{purge_id}/log Get purge job log 4. PUT /system/purgeaudit/schedule Update purge job's schedule 5. POST /system/purgeaudit/schedule Create a purge job schedule 6. GET /system/purgeaudit/schedule Get purge's schedule 7. GET /system/purgeaudit Get purge job results. Signed-off-by: Yang Jiao --- tests/apitests/python/library/base.py | 5 +- tests/apitests/python/library/purge.py | 105 ++++++++++++++++ tests/apitests/python/test_log_rotation.py | 135 +++++++++++++++++++++ tests/robot-cases/Group0-BAT/API_DB.robot | 6 +- 4 files changed, 248 insertions(+), 3 deletions(-) create mode 100644 tests/apitests/python/library/purge.py create mode 100644 tests/apitests/python/test_log_rotation.py diff --git a/tests/apitests/python/library/base.py b/tests/apitests/python/library/base.py index 4f1d16d52..f6dd9d088 100644 --- a/tests/apitests/python/library/base.py +++ b/tests/apitests/python/library/base.py @@ -31,7 +31,7 @@ def _create_client(server, credential, debug, api_type="products"): cfg = None if api_type in ('projectv2', 'artifact', 'repository', 'scanner', 'scan', 'scanall', 'preheat', 'quota', 'replication', 'registry', 'robot', 'gc', 'retention', 'immutable', 'system_cve_allowlist', - 'configure', 'user', 'member', 'health', 'label', 'webhook'): + 'configure', 'user', 'member', 'health', 'label', 'webhook', 'purge'): cfg = v2_swagger_client.Configuration() else: cfg = swagger_client.Configuration() @@ -76,7 +76,8 @@ def _create_client(server, credential, debug, api_type="products"): "user": v2_swagger_client.UserApi(v2_swagger_client.ApiClient(cfg)), "member": v2_swagger_client.MemberApi(v2_swagger_client.ApiClient(cfg)), "health": v2_swagger_client.HealthApi(v2_swagger_client.ApiClient(cfg)), - "webhook": v2_swagger_client.WebhookApi(v2_swagger_client.ApiClient(cfg)) + "webhook": v2_swagger_client.WebhookApi(v2_swagger_client.ApiClient(cfg)), + "purge": v2_swagger_client.PurgeApi(v2_swagger_client.ApiClient(cfg)) }.get(api_type,'Error: Wrong API type') def _assert_status_code(expect_code, return_code, err_msg = r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}."): diff --git a/tests/apitests/python/library/purge.py b/tests/apitests/python/library/purge.py new file mode 100644 index 000000000..2a157dcfd --- /dev/null +++ b/tests/apitests/python/library/purge.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- + +import base +import v2_swagger_client +from v2_swagger_client.rest import ApiException + + +class Purge(base.Base): + + def __init__(self): + super(Purge, self).__init__(api_type="purge") + + def create_purge_schedule(self, type, cron, dry_run, audit_retention_hour=24, include_operations="create,delete,pull", expect_status_code=201, expect_response_body=None, **kwargs): + scheduleObj = v2_swagger_client.ScheduleObj(type=type) + if cron is not None: + scheduleObj.cron = cron + parameters = { + "audit_retention_hour": audit_retention_hour, + "include_operations": include_operations, + "dry_run": dry_run + } + schedule = v2_swagger_client.Schedule(schedule=scheduleObj, parameters=parameters) + try: + _, status_code, _ = self._get_client(**kwargs).create_purge_schedule_with_http_info(schedule) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + print(e.body) + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + + def update_purge_schedule(self, type, cron, audit_retention_hour=24, include_operations="create,delete,pull", expect_status_code=200, expect_response_body=None, **kwargs): + scheduleObj = v2_swagger_client.ScheduleObj(type=type, cron=cron) + parameters = { + "audit_retention_hour": audit_retention_hour, + "include_operations": include_operations, + "dry_run": False + } + schedule = v2_swagger_client.Schedule(schedule=scheduleObj, parameters=parameters) + try: + _, status_code, _ = self._get_client(**kwargs).update_purge_schedule_with_http_info(schedule) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + + def stop_purge_execution(self, purge_id, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, h = self._get_client(**kwargs).stop_purge_with_http_info(purge_id) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + + def get_latest_purge_job(self, **kwargs): + return self.get_purge_jobs(sort="-creation_time", page_size=1, page=1)[0] + + def get_purge_jobs(self, sort, page_size, page, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, _ = self._get_client(**kwargs).get_purge_history_with_http_info(sort=sort, page_size=page_size, page=page) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data + + def get_purge_job(self, purge_id, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, _ = self._get_client(**kwargs).get_purge_job_with_http_info(purge_id) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data + + def get_purge_job_log(self, purge_id, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, _ = self._get_client(**kwargs).get_purge_job_log_with_http_info(purge_id) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data + + def get_purge_schedule(self, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, _ = self._get_client(**kwargs).get_purge_schedule_with_http_info() + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data \ No newline at end of file diff --git a/tests/apitests/python/test_log_rotation.py b/tests/apitests/python/test_log_rotation.py new file mode 100644 index 000000000..83dd7bec0 --- /dev/null +++ b/tests/apitests/python/test_log_rotation.py @@ -0,0 +1,135 @@ +from __future__ import absolute_import +import json +import time + +import unittest + +from testutils import ADMIN_CLIENT, suppress_urllib3_warning +from library.purge import Purge +from library.user import User + + +class TestLogRotation(unittest.TestCase, object): + + @suppress_urllib3_warning + def setUp(self): + self.purge = Purge() + self.user = User() + + def tearDown(self): + # 1. Reset schedule + self.purge.update_purge_schedule(type=None, cron="", audit_retention_hour=0) + + def testLogRotation(self): + """ + Test case: + Log Rotaion API + Test step and expected result: + 1. Create a purge audit log job; + 2. Stop this purge audit log job; + 3. Verify purge audit log job status is Stopped; + 4. Create a purge audit log job; + 5. Verify purge audit log job status is Success; + 6. Verify the log of the purge audit log job; + 7. Create purge audit log schedule; + 8. Verify purge audit log schedule; + 9. Update purge audit log schedule; + 10. Verify purge audit log schedule. + Tear down: + 1 Reset schedule. + """ + # 1. Create a purge audit log job + self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=True) + # 2. Stop this purge audit log job + latest_job = self.purge.get_latest_purge_job() + self.purge.stop_purge_execution(latest_job.id) + # 3. Verify purge audit log job status is Stopped + job_status = self.purge.get_purge_job(latest_job.id).job_status + self.assertEqual(self.purge.get_purge_job(latest_job.id).job_status, "Stopped") + # 4. Create a purge audit log job + self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=False, audit_retention_hour=1) + # 5. Verify purge audit log job status is Success + job_status = None + job_id = None + for i in range(10): + print("wait for the job to finish:", i) + if job_id == None: + latest_job = self.purge.get_latest_purge_job() + job_status = latest_job.job_status + job_id = latest_job.id + else: + job_status = self.purge.get_purge_job(job_id).job_status + if job_status == "Success": + break + time.sleep(2) + self.assertEqual(job_status, "Success") + # 6. Verify the log of the purge audit log job + job_logs = self.purge.get_purge_job_log(job_id) + self.assertIn("Purge audit job start", job_logs) + self.assertIn("rows of audit logs", job_logs) + # 7. Create a schedule + schedule_type = "Weekly" + schedule_cron = "0 0 0 * * 0" + audit_retention_hour = 24 + include_operations = "create,delete,pull" + self.purge.create_purge_schedule(type=schedule_type, cron=schedule_cron, dry_run=False, audit_retention_hour=audit_retention_hour, include_operations=include_operations) + # 8. Verify schedule + self.verifySchedule(schedule_type, schedule_cron, audit_retention_hour, include_operations) + # 9. Update schedule + schedule_type = "Custom" + schedule_cron = "0 15 10 ? * *" + audit_retention_hour = 12 + include_operations = "create,delete" + self.purge.update_purge_schedule(type=schedule_type, cron=schedule_cron, audit_retention_hour=audit_retention_hour, include_operations=include_operations) + # 10. Verify schedule + self.verifySchedule(schedule_type, schedule_cron, audit_retention_hour, include_operations) + + def testLogRotationAPIPermission(self): + """ + Test case: + Log Rotaion Permission API + Test step and expected result: + 1. Create a new user(UA); + 2. User(UA) should not have permission to create purge schedule API; + 3. Create a purge audit log job; + 4. User(UA) should not have permission to stop purge execution API; + 5. User(UA) should not have permission to get purge job API; + 6. User(UA) should not have permission to get purge job log API; + 7. User(UA) should not have permission to get purge jobs API; + 8. User(UA) should not have permission to get purge schedule API; + 9. User(UA) should not have permission to update purge schedule API; + """ + expect_status_code = 403 + expect_response_body = "FORBIDDEN" + # 1. Create a new user(UA) + user_password = "Aa123456" + _, user_name = self.user.create_user(user_password = user_password) + USER_CLIENT = dict(endpoint = ADMIN_CLIENT["endpoint"], username = user_name, password = user_password) + # 2. User(UA) should not have permission to create purge schedule API + self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=False, expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT) + # 3. Create a purge audit log job + self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=False) + latest_job = self.purge.get_latest_purge_job() + # 4. User(UA) should not have permission to stop purge execution API + self.purge.stop_purge_execution(latest_job.id, expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT) + # 5. User(UA) should not have permission to get purge job API + self.purge.get_purge_job(latest_job.id, expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT) + # 6. User(UA) should not have permission to get purge job log API + self.purge.get_purge_job_log(latest_job.id, expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT) + # 7. User(UA) should not have permission to get purge jobs API + self.purge.get_purge_jobs("creation_time", 10, 1, expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT) + # 8. User(UA) should not have permission to get purge schedule API + self.purge.get_purge_schedule(expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT) + # 9. User(UA) should not have permission to update purge schedule API + self.purge.update_purge_schedule(type="Custom", cron="0 15 10 ? * *", expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT) + + def verifySchedule(self, schedule_type, schedule_cron, audit_retention_hour, include_operations): + purge_schedule = self.purge.get_purge_schedule() + job_parameters = json.loads(purge_schedule.job_parameters) + self.assertEqual(purge_schedule.schedule.type, schedule_type) + self.assertEqual(purge_schedule.schedule.cron, schedule_cron) + self.assertEqual(job_parameters["audit_retention_hour"], audit_retention_hour) + self.assertEqual(job_parameters["include_operations"], include_operations) + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/robot-cases/Group0-BAT/API_DB.robot b/tests/robot-cases/Group0-BAT/API_DB.robot index 47bc3c91e..72817a065 100644 --- a/tests/robot-cases/Group0-BAT/API_DB.robot +++ b/tests/robot-cases/Group0-BAT/API_DB.robot @@ -179,4 +179,8 @@ Test Case - Webhook CRUD Test Case - Cosign Sign Artifact [Tags] cosign - Harbor API Test ./tests/apitests/python/test_cosign_sign_artifact.py \ No newline at end of file + Harbor API Test ./tests/apitests/python/test_cosign_sign_artifact.py + +Test Case - Log Rotation + [Tags] log_rotation + Harbor API Test ./tests/apitests/python/test_log_rotation.py \ No newline at end of file