mirror of
https://github.com/goharbor/harbor
synced 2025-04-15 06:31:31 +00:00
Add API test case for retag, create a user and 2 projects by this new user, push image to 1st project, and retag this image to 2nd project. (#7300)
Signed-off-by: danfengliu <danfengl@vmware.com>
This commit is contained in:
parent
0d3865c0e8
commit
08b346ee1e
|
@ -15,6 +15,12 @@ def is_member_exist_in_project(members, member_user_name, expected_member_role_i
|
|||
return True
|
||||
return result
|
||||
|
||||
def get_member_id_by_name(members, member_user_name):
|
||||
for member in members:
|
||||
if member.entity_name == member_user_name:
|
||||
return member.id
|
||||
return None
|
||||
|
||||
class Project(base.Base):
|
||||
def create_project(self, name=None, metadata=None, expect_status_code = 201, expect_response_body = None, **kwargs):
|
||||
if name is None:
|
||||
|
@ -131,6 +137,14 @@ class Project(base.Base):
|
|||
base._assert_status_code(200, status_code)
|
||||
return data
|
||||
|
||||
def get_project_member_id(self, project_id, member_user_name, **kwargs):
|
||||
members = self.get_project_members(project_id, **kwargs)
|
||||
result = get_member_id_by_name(list(members), member_user_name)
|
||||
if result == None:
|
||||
raise Exception(r"Failed to get member id of member {} in project {}.".format(member_user_name, project_id))
|
||||
else:
|
||||
return result
|
||||
|
||||
def check_project_member_not_exist(self, project_id, member_user_name, **kwargs):
|
||||
members = self.get_project_members(project_id, **kwargs)
|
||||
result = is_member_exist_in_project(list(members), member_user_name)
|
||||
|
|
|
@ -4,6 +4,7 @@ import time
|
|||
import base
|
||||
import swagger_client
|
||||
from docker_api import DockerAPI
|
||||
from swagger_client.rest import ApiException
|
||||
|
||||
def pull_harbor_image(registry, username, password, image, tag, expected_error_message = None):
|
||||
_docker_api = DockerAPI()
|
||||
|
@ -94,7 +95,7 @@ class Repository(base.Base):
|
|||
time.sleep(5)
|
||||
timeout_count = timeout_count - 1
|
||||
if (timeout_count == 0):
|
||||
break
|
||||
break
|
||||
_tag = self.get_tag(repo_name, tag, **kwargs)
|
||||
if _tag.name == tag and _tag.scan_overview !=None:
|
||||
if _tag.scan_overview.scan_status == expected_scan_status:
|
||||
|
@ -118,4 +119,20 @@ class Repository(base.Base):
|
|||
if each_sign.tag == tag and len(each_sign.hashes["sha256"]) == 44:
|
||||
print "sha256:", len(each_sign.hashes["sha256"])
|
||||
return
|
||||
raise Exception(r"Signature of {}:{} is not exist!".format(repo_name, tag))
|
||||
raise Exception(r"Signature of {}:{} is not exist!".format(repo_name, tag))
|
||||
|
||||
def retag_image(self, repo_name, tag, src_image, override=True, expect_status_code = 200, expect_response_body = None, **kwargs):
|
||||
client = self._get_client(**kwargs)
|
||||
request = swagger_client.RetagReq(tag=tag, src_image=src_image, override=override)
|
||||
|
||||
try:
|
||||
data, status_code, _ = client.repositories_repo_name_tags_post_with_http_info(repo_name, request)
|
||||
except ApiException as e:
|
||||
base._assert_status_code(expect_status_code, e.status)
|
||||
if expect_response_body is not None:
|
||||
base._assert_status_body(expect_response_body, e.body)
|
||||
return
|
||||
|
||||
base._assert_status_code(expect_status_code, status_code)
|
||||
base._assert_status_code(200, status_code)
|
||||
return data
|
127
tests/apitests/python/test_retag.py
Normal file
127
tests/apitests/python/test_retag.py
Normal file
|
@ -0,0 +1,127 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
|
||||
import unittest
|
||||
|
||||
from library.base import _assert_status_code
|
||||
from testutils import ADMIN_CLIENT
|
||||
from testutils import harbor_server
|
||||
|
||||
from testutils import TEARDOWN
|
||||
from library.project import Project
|
||||
from library.user import User
|
||||
from library.repository import Repository
|
||||
from library.repository import push_image_to_project
|
||||
from library.repository import pull_harbor_image
|
||||
|
||||
class TestProjects(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(self):
|
||||
project = Project()
|
||||
self.project= project
|
||||
|
||||
user = User()
|
||||
self.user= user
|
||||
|
||||
repo = Repository()
|
||||
self.repo= repo
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(self):
|
||||
print "Case completed"
|
||||
|
||||
@unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
|
||||
def test_ClearData(self):
|
||||
#1. Delete repository(RA);
|
||||
self.repo.delete_repoitory(TestProjects.src_repo_name, **TestProjects.USER_RETAG_CLIENT)
|
||||
|
||||
#2. Delete repository by retag;
|
||||
self.repo.delete_repoitory(TestProjects.dst_repo_name, **TestProjects.USER_RETAG_CLIENT)
|
||||
|
||||
#3. Delete project(PA);
|
||||
self.project.delete_project(TestProjects.project_src_repo_id, **TestProjects.USER_RETAG_CLIENT)
|
||||
self.project.delete_project(TestProjects.project_dst_repo_id, **TestProjects.USER_RETAG_CLIENT)
|
||||
|
||||
#4. Delete user(UA).
|
||||
self.user.delete_user(TestProjects.user_retag_id, **ADMIN_CLIENT)
|
||||
|
||||
def testRetag(self):
|
||||
"""
|
||||
Test case:
|
||||
Retag Image
|
||||
Test step and expected result:
|
||||
1. Create a new user(UA);
|
||||
2. Create a new project(PA) by user(UA);
|
||||
3. Create a new project(PB) by user(UA);
|
||||
4. Update role of user-retag as guest member of project(PB);
|
||||
5. Create a new repository(RA) in project(PA) by user(UA);
|
||||
6. Get repository in project(PA), there should be one repository which was created by user(UA);
|
||||
7. Get repository(RA)'s image tag detail information;
|
||||
8. Retag image in project(PA) to project(PB), it should be forbidden;
|
||||
9. Update role of user-retag as admin member of project(PB);
|
||||
10. Retag image in project(PA) to project(PB), it should be successful;
|
||||
11. Get repository(RB)'s image tag detail information;
|
||||
12. Read digest of retaged image, it must be the same with the image in repository(RA);
|
||||
13. Pull image from project(PB) by user_retag, it must be successful;
|
||||
Tear down:
|
||||
1. Delete repository(RA);
|
||||
2. Delete repository by retag;
|
||||
3. Delete project(PA);
|
||||
4. Delete user(UA).
|
||||
"""
|
||||
url = ADMIN_CLIENT["endpoint"]
|
||||
user_retag_password = "Aa123456"
|
||||
pull_tag_name = "latest"
|
||||
dst_repo_sub_name = "repo"
|
||||
dst_tag_name = "test_tag"
|
||||
|
||||
#1. Create a new user(UA);
|
||||
TestProjects.user_retag_id, user_retag_name = self.user.create_user(user_password = user_retag_password, **ADMIN_CLIENT)
|
||||
|
||||
TestProjects.USER_RETAG_CLIENT=dict(endpoint = url, username = user_retag_name, password = user_retag_password)
|
||||
|
||||
#2. Create a new project(PA) by user(UA);
|
||||
TestProjects.project_src_repo_id, project_src_repo_name = self.project.create_project(metadata = {"public": "false"}, **TestProjects.USER_RETAG_CLIENT)
|
||||
|
||||
#3. Create a new project(PB) by user(UA);
|
||||
TestProjects.project_dst_repo_id, project_dst_repo_name = self.project.create_project(metadata = {"public": "false"}, **TestProjects.USER_RETAG_CLIENT)
|
||||
|
||||
retag_member_id = self.project.get_project_member_id(TestProjects.project_dst_repo_id, user_retag_name, **TestProjects.USER_RETAG_CLIENT)
|
||||
|
||||
#4. Update role of user-retag as guest member of project(PB);
|
||||
self.project.update_project_member_role(TestProjects.project_dst_repo_id, retag_member_id, 3, **ADMIN_CLIENT)
|
||||
|
||||
#5. Create a new repository(RA) in project(PA) by user(UA);
|
||||
TestProjects.src_repo_name, tag_name = push_image_to_project(project_src_repo_name, harbor_server, 'admin', 'Harbor12345', "hello-world", pull_tag_name)
|
||||
|
||||
#6. Get repository in project(PA), there should be one repository which was created by user(UA);
|
||||
src_repo_data = self.repo.get_repository(TestProjects.project_src_repo_id, **TestProjects.USER_RETAG_CLIENT)
|
||||
_assert_status_code(TestProjects.src_repo_name, src_repo_data[0].name)
|
||||
|
||||
#7. Get repository(RA)'s image tag detail information;
|
||||
src_tag_data = self.repo.get_tag(TestProjects.src_repo_name, tag_name, **TestProjects.USER_RETAG_CLIENT)
|
||||
|
||||
TestProjects.dst_repo_name = project_dst_repo_name+"/"+ dst_repo_sub_name
|
||||
|
||||
#8. Retag image in project(PA) to project(PB), it should be forbidden;
|
||||
self.repo.retag_image(TestProjects.dst_repo_name, dst_tag_name, TestProjects.src_repo_name+":"+src_tag_data.digest, expect_status_code=403, **TestProjects.USER_RETAG_CLIENT)
|
||||
|
||||
#9. Update role of user-retag as admin member of project(PB);
|
||||
self.project.update_project_member_role(TestProjects.project_dst_repo_id, retag_member_id, 1, **ADMIN_CLIENT)
|
||||
|
||||
#10. Retag image in project(PA) to project(PB), it should be successful;
|
||||
self.repo.retag_image(TestProjects.dst_repo_name, dst_tag_name, TestProjects.src_repo_name+":"+src_tag_data.digest, **TestProjects.USER_RETAG_CLIENT)
|
||||
|
||||
#11. Get repository(RB)'s image tag detail information;
|
||||
dst_tag_data = self.repo.get_tag(TestProjects.dst_repo_name, dst_tag_name, **TestProjects.USER_RETAG_CLIENT)
|
||||
|
||||
#12. Read digest of retaged image, it must be the same with the image in repository(RA);
|
||||
self.assertEqual(src_tag_data.digest, dst_tag_data.digest)
|
||||
|
||||
#13. Pull image from project(PB) by user_retag, it must be successful;"
|
||||
pull_harbor_image(harbor_server, user_retag_name, user_retag_password, TestProjects.dst_repo_name, dst_tag_name)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
@ -42,4 +42,6 @@ Test Case - Scan All Images
|
|||
Test Case - List Helm Charts
|
||||
Harbor API Test ./tests/apitests/python/test_list_helm_charts.py
|
||||
Test Case - Assign Sys Admin
|
||||
Harbor API Test ./tests/apitests/python/test_assign_sys_admin.py
|
||||
Harbor API Test ./tests/apitests/python/test_assign_sys_admin.py
|
||||
Test Case - Retag Image
|
||||
Harbor API Test ./tests/apitests/python/test_retag.py
|
Loading…
Reference in New Issue
Block a user