diff --git a/LICENSE.orig b/LICENSE.orig new file mode 100644 index 0000000..0ffdba6 --- /dev/null +++ b/LICENSE.orig @@ -0,0 +1,7 @@ +license of migrate.py.orig + +MIT License +Copyright (c) +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/migrate.py.orig b/migrate.py.orig new file mode 100644 index 0000000..3dd37d7 --- /dev/null +++ b/migrate.py.orig @@ -0,0 +1,1505 @@ +import base64 +import os +import time +import random +import string +import requests +import json +import dateutil.parser +import datetime +import re + +import gitlab # pip install python-gitlab +import gitlab.v4.objects +import pygitea # pip install pygitea (https://github.com/h44z/pygitea) + +# Add these variables near the config section +MIGRATION_STATE_FILE = "migration_state.json" +RESUME_MIGRATION = True # Set to False to force a full migration + + +SCRIPT_VERSION = "1.0" +GLOBAL_ERROR_COUNT = 0 + +####################### +# CONFIG SECTION START +####################### +GITLAB_URL = 'https://gitlab.source.com' +GITLAB_TOKEN = 'gitlab token' + +# needed to clone the repositories, keep empty to try publickey (untested) +GITLAB_ADMIN_USER = 'admin username' +GITLAB_ADMIN_PASS = 'admin password' + +GITEA_URL = 'https://gitea.dest.com' +GITEA_TOKEN = 'gitea token' +####################### +# CONFIG SECTION END +####################### + +# Global mapping of original usernames to cleaned usernames +username_map = {} + + +def main(): + print_color(bcolors.HEADER, "---=== Gitlab to Gitea migration ===---") + print("Version: " + SCRIPT_VERSION) + print() + + if os.path.exists(MIGRATION_STATE_FILE) and RESUME_MIGRATION: + print_info("Resuming previous migration...") + else: + print_info("Starting new migration...") + if os.path.exists(MIGRATION_STATE_FILE): + os.remove(MIGRATION_STATE_FILE) + + # private token or personal token authentication + gl = gitlab.Gitlab(GITLAB_URL, private_token=GITLAB_TOKEN) + gl.keep_base_url = True + gl.auth() + assert isinstance(gl.user, gitlab.v4.objects.CurrentUser) + print_info("Connected to Gitlab, version: " + str(gl.version())) + + gt = pygitea.API(GITEA_URL, token=GITEA_TOKEN) + gt_version = gt.get("/version").json() + print_info("Connected to Gitea, version: " + str(gt_version["version"])) + + try: + # IMPORT USERS AND GROUPS + import_users_groups(gl, gt) + + # IMPORT PROJECTS + import_projects(gl, gt) + + print() + if GLOBAL_ERROR_COUNT == 0: + print_success("Migration finished with no errors!") + else: + print_error( + "Migration finished with " + str(GLOBAL_ERROR_COUNT) + " errors!" + ) + except KeyboardInterrupt: + print_warning( + "\nMigration interrupted. You can resume later using the state file." + ) + except Exception as e: + print_error(f"Migration failed with error: {str(e)}") + print_warning("You can resume the migration later using the state file.") + + +# +# Data loading helpers for Gitea +# + + +# Dictionary mapping original GitLab usernames to Gitea-compatible usernames +USERNAME_MAPPING = {} + + +def normalize_username(username): + """ + Convert usernames to Gitea-compatible format: + - Replace spaces with underscores + - Remove special characters + - Handle special cases + """ + if not username: + return username + + # Check if we already have a mapping for this username + if username in USERNAME_MAPPING: + return USERNAME_MAPPING[username] + + # Handle special cases (reserved names, etc) + if username.lower() == "ghost": + clean_name = "ghost_user" + else: + # Replace spaces and special chars + clean_name = username.replace(" ", "_") + clean_name = re.sub(r"[^a-zA-Z0-9_\.-]", "_", clean_name) + + # Store mapping for future reference + USERNAME_MAPPING[username] = clean_name + return clean_name + + +def extract_user_mentions(text): + """Extract all @username mentions from text""" + if not text: + return [] + + # Pattern to match @username mentions + mention_pattern = r"@([a-zA-Z0-9_\.-]+(?:\s+[a-zA-Z0-9_\.-]+)*)" + + # Find all matches + mentions = re.findall(mention_pattern, text) + return mentions + + +def ensure_mentioned_users_exist( + gitea_api: pygitea, + gitlab_api: gitlab.Gitlab, + issues: [gitlab.v4.objects.ProjectIssue], +): + """Make sure all users mentioned in issues exist in Gitea""" + # Collect all mentioned users + mentioned_users = set() + + for issue in issues: + # Check issue description + if issue.description: + for mention in extract_user_mentions(issue.description): + mentioned_users.add(mention) + + # Check issue comments + try: + notes = issue.notes.list(all=True) + for note in notes: + if not note.system and note.body: + for mention in extract_user_mentions(note.body): + mentioned_users.add(mention) + except Exception as e: + print_warning(f"Error extracting mentions from issue comments: {str(e)}") + + # Create any missing users + for username in mentioned_users: + if not user_exists(gitea_api, username): + _import_placeholder_user(gitea_api, username) + + +# Update the _import_placeholder_user function to use normalized usernames +def _import_placeholder_user(gitea_api: pygitea, username: str): + """Import a placeholder user when a mentioned user doesn't exist""" + print_warning(f"Creating placeholder user for {username} mentioned in issues") + + # Normalize username for Gitea compatibility + clean_username = normalize_username(username) + + tmp_password = "Tmp1!" + "".join( + random.choices(string.ascii_uppercase + string.digits, k=10) + ) + try: + import_response: requests.Response = gitea_api.post( + "/admin/users", + json={ + "email": f"{clean_username}@placeholder-migration.local", + "full_name": username, # Keep original name for display + "login_name": clean_username, + "password": tmp_password, + "send_notify": False, + "source_id": 0, # local user + "username": clean_username, + }, + ) + if import_response.ok: + print_info(f"Placeholder user {username} created as {clean_username}") + return True + else: + print_error( + f"Failed to create placeholder user {username}: {import_response.text}" + ) + return False + except Exception as e: + print_error(f"Error creating placeholder user {username}: {str(e)}") + return False + + +def get_labels(gitea_api: pygitea, owner: string, repo: string) -> []: + existing_labels = [] + label_response: requests.Response = gitea_api.get( + "/repos/" + owner + "/" + repo + "/labels" + ) + if label_response.ok: + existing_labels = label_response.json() + else: + print_error( + "Failed to load existing milestones for project " + + repo + + "! " + + label_response.text + ) + + return existing_labels + + +def get_milestones(gitea_api: pygitea, owner: string, repo: string) -> []: + existing_milestones = [] + milestone_response: requests.Response = gitea_api.get( + "/repos/" + owner + "/" + repo + "/milestones" + ) + if milestone_response.ok: + existing_milestones = milestone_response.json() + else: + print_error( + "Failed to load existing milestones for project " + + repo + + "! " + + milestone_response.text + ) + + return existing_milestones + + +def get_issues(gitea_api: pygitea, owner: string, repo: string) -> []: + existing_issues = [] + try: + issue_response: requests.Response = gitea_api.get( + "/repos/" + owner + "/" + repo + "/issues", + params={"state": "all", "page": -1}, + ) + if issue_response.ok: + existing_issues = issue_response.json() + else: + error_text = issue_response.text + if "user redirect does not exist" in error_text: + print_warning( + f"User redirect error for project {repo}. Creating placeholder users might help." + ) + else: + print_error( + "Failed to load existing issues for project " + + repo + + "! " + + error_text + ) + except Exception as e: + print_error(f"Exception getting issues for {owner}/{repo}: {str(e)}") + + return existing_issues + + +def get_teams(gitea_api: pygitea, orgname: string) -> []: + existing_teams = [] + team_response: requests.Response = gitea_api.get("/orgs/" + orgname + "/teams") + if team_response.ok: + existing_teams = team_response.json() + else: + print_error( + "Failed to load existing teams for organization " + + orgname + + "! " + + team_response.text + ) + + return existing_teams + + +def get_team_members(gitea_api: pygitea, teamid: int) -> []: + existing_members = [] + member_response: requests.Response = gitea_api.get( + "/teams/" + str(teamid) + "/members" + ) + if member_response.ok: + existing_members = member_response.json() + else: + print_error( + "Failed to load existing members for team " + + str(teamid) + + "! " + + member_response.text + ) + + return existing_members + + +def get_collaborators(gitea_api: pygitea, owner: string, repo: string) -> []: + existing_collaborators = [] + collaborator_response: requests.Response = gitea_api.get( + "/repos/" + owner + "/" + repo + "/collaborators" + ) + if collaborator_response.ok: + existing_collaborators = collaborator_response.json() + else: + print_error( + "Failed to load existing collaborators for project " + + repo + + "! " + + collaborator_response.text + ) + + return existing_collaborators + + +def get_user_or_group(gitea_api: pygitea, project: gitlab.v4.objects.Project) -> {}: + result = None + response: requests.Response = gitea_api.get("/users/" + project.namespace["path"]) + if response.ok: + result = response.json() + else: + response: requests.Response = gitea_api.get( + "/orgs/" + name_clean(project.namespace["name"]) + ) + if response.ok: + result = response.json() + else: + print_error( + "Failed to load user or group " + + project.namespace["name"] + + "! " + + response.text + ) + + return result + + +def get_user_keys(gitea_api: pygitea, username: string) -> {}: + result = [] + key_response: requests.Response = gitea_api.get("/users/" + username + "/keys") + if key_response.ok: + result = key_response.json() + else: + print_error( + "Failed to load user keys for user " + username + "! " + key_response.text + ) + + return result + + +# Modify the user_exists function to use normalized usernames +def user_exists(gitea_api: pygitea, username: string) -> bool: + # Normalize the username first + clean_username = normalize_username(username) + + user_response: requests.Response = gitea_api.get("/users/" + clean_username) + + if user_response.ok: + print_warning( + f"User {username} already exists in Gitea as {clean_username}, skipping!" + ) + # Make sure we update our mapping + USERNAME_MAPPING[username] = clean_username + return True + else: + print(f"User {username} not found in Gitea, importing!") + return False + + +def user_key_exists(gitea_api: pygitea, username: string, keyname: string) -> bool: + # Normalize username first + clean_username = normalize_username(username) + + keys_response: requests.Response = gitea_api.get( + "/users/" + clean_username + "/keys" + ) + if keys_response.ok: + keys = keys_response.json() + for key in keys: + if key["title"] == keyname: + return True + return False + + +def organization_exists(gitea_api: pygitea, orgname: string) -> bool: + group_response: requests.Response = gitea_api.get("/orgs/" + orgname) + if group_response.ok: + print_warning("Group " + orgname + " does already exist in Gitea, skipping!") + else: + print("Group " + orgname + " not found in Gitea, importing!") + + return group_response.ok + + +def member_exists(gitea_api: pygitea, username: string, teamid: int) -> bool: + # Normalize username first + clean_username = normalize_username(username) + + members_response: requests.Response = gitea_api.get( + "/teams/" + str(teamid) + "/members" + ) + if members_response.ok: + members = members_response.json() + for member in members: + if member["username"] == clean_username: + return True + return False + + +def collaborator_exists( + gitea_api: pygitea, owner: string, repo: string, username: string +) -> bool: + # Normalize username first + clean_username = normalize_username(username) + + collaborator_response: requests.Response = gitea_api.get( + "/repos/" + owner + "/" + repo + "/collaborators/" + clean_username + ) + if collaborator_response.ok: + return True + return False + + +def repo_exists(gitea_api: pygitea, owner: string, repo: string) -> bool: + repo_response: requests.Response = gitea_api.get("/repos/" + owner + "/" + repo) + if repo_response.ok: + print_warning("Project " + repo + " does already exist in Gitea, skipping!") + else: + print("Project " + repo + " not found in Gitea, importing!") + + return repo_response.ok + + +def label_exists( + gitea_api: pygitea, owner: string, repo: string, labelname: string +) -> bool: + existing_labels = get_labels(gitea_api, owner, repo) + if existing_labels: + existing_label = next( + (item for item in existing_labels if item["name"] == labelname), None + ) + + if existing_label is not None: + print_warning( + "Label " + + labelname + + " already exists in project " + + repo + + ", skipping!" + ) + return True + else: + print( + "Label " + + labelname + + " does not exists in project " + + repo + + ", importing!" + ) + return False + else: + print("No labels in project " + repo + ", importing!") + return False + + +def milestone_exists( + gitea_api: pygitea, owner: string, repo: string, milestone: string +) -> bool: + existing_milestones = get_milestones(gitea_api, owner, repo) + if existing_milestones: + existing_milestone = next( + (item for item in existing_milestones if item["title"] == milestone), None + ) + + if existing_milestone is not None: + print_warning( + "Milestone " + + milestone + + " already exists in project " + + repo + + ", skipping!" + ) + return True + else: + print( + "Milestone " + + milestone + + " does not exists in project " + + repo + + ", importing!" + ) + return False + else: + print("No milestones in project " + repo + ", importing!") + return False + + +def issue_exists( + gitea_api: pygitea, owner: string, repo: string, issue: string +) -> bool: + existing_issues = get_issues(gitea_api, owner, repo) + if existing_issues: + existing_issue = next( + (item for item in existing_issues if item["title"] == issue), None + ) + + if existing_issue is not None: + print_warning( + "Issue " + issue + " already exists in project " + repo + ", skipping!" + ) + return True + else: + print( + "Issue " + + issue + + " does not exists in project " + + repo + + ", importing!" + ) + return False + else: + print("No issues in project " + repo + ", importing!") + return False + + +# +# Import helper functions +# + + +def _import_project_labels( + gitea_api: pygitea, + labels: [gitlab.v4.objects.ProjectLabel], + owner: string, + repo: string, +): + for label in labels: + if not label_exists(gitea_api, owner, repo, label.name): + import_response: requests.Response = gitea_api.post( + "/repos/" + owner + "/" + repo + "/labels", + json={ + "name": label.name, + "color": label.color, + "description": label.description, # currently not supported + }, + ) + if import_response.ok: + print_info("Label " + label.name + " imported!") + else: + print_error( + "Label " + label.name + " import failed: " + import_response.text + ) + + +def _import_project_milestones( + gitea_api: pygitea, + milestones: [gitlab.v4.objects.ProjectMilestone], + owner: string, + repo: string, +): + for milestone in milestones: + if not milestone_exists(gitea_api, owner, repo, milestone.title): + due_date = None + if milestone.due_date is not None and milestone.due_date != "": + due_date = dateutil.parser.parse(milestone.due_date).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + + import_response: requests.Response = gitea_api.post( + "/repos/" + owner + "/" + repo + "/milestones", + json={ + "description": milestone.description, + "due_on": due_date, + "title": milestone.title, + }, + ) + if import_response.ok: + print_info("Milestone " + milestone.title + " imported!") + existing_milestone = import_response.json() + + if existing_milestone: + # update milestone state, this cannot be done in the initial import :( + # TODO: gitea api ignores the closed state... + update_response: requests.Response = gitea_api.patch( + "/repos/" + + owner + + "/" + + repo + + "/milestones/" + + str(existing_milestone["id"]), + json={ + "description": milestone.description, + "due_on": due_date, + "title": milestone.title, + "state": milestone.state, + }, + ) + if update_response.ok: + print_info("Milestone " + milestone.title + " updated!") + else: + print_error( + "Milestone " + + milestone.title + + " update failed: " + + update_response.text + ) + else: + print_error( + "Milestone " + + milestone.title + + " import failed: " + + import_response.text + ) + + +def _import_project_issues( + gitea_api: pygitea, + issues: [gitlab.v4.objects.ProjectIssue], + owner: string, + repo: string, +): + # reload all existing milestones and labels, needed for assignment in issues + existing_milestones = get_milestones(gitea_api, owner, repo) + existing_labels = get_labels(gitea_api, owner, repo) + + for issue in issues: + if not issue_exists(gitea_api, owner, repo, issue.title): + due_date = "" + if issue.due_date is not None: + due_date = dateutil.parser.parse(issue.due_date).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + + assignee = None + if issue.assignee is not None: + assignee = issue.assignee["username"] + + assignees = [] + for tmp_assignee in issue.assignees: + assignees.append(tmp_assignee["username"]) + + milestone = None + if issue.milestone is not None: + existing_milestone = next( + ( + item + for item in existing_milestones + if item["title"] == issue.milestone["title"] + ), + None, + ) + if existing_milestone: + milestone = existing_milestone["id"] + + labels = [] + for label in issue.labels: + existing_label = next( + (item for item in existing_labels if item["name"] == label), None + ) + if existing_label: + labels.append(existing_label["id"]) + + import_response: requests.Response = gitea_api.post( + "/repos/" + owner + "/" + repo + "/issues", + json={ + "assignee": assignee, + "assignees": assignees, + "body": issue.description, + "closed": issue.state == "closed", + "due_on": due_date, + "labels": labels, + "milestone": milestone, + "title": issue.title, + }, + ) + if import_response.ok: + print_info("Issue " + issue.title + " imported!") + else: + print_error( + "Issue " + issue.title + " import failed: " + import_response.text + ) + + +def _import_project_repo(gitea_api: pygitea, project: gitlab.v4.objects.Project): + if not repo_exists(gitea_api, project.namespace["name"], name_clean(project.name)): + clone_url = project.http_url_to_repo + if GITLAB_ADMIN_PASS == "" and GITLAB_ADMIN_USER == "": + clone_url = project.ssh_url_to_repo + private = project.visibility == "private" or project.visibility == "internal" + + # Load the owner (users and groups can both be fetched using the /users/ endpoint) + owner = get_user_or_group(gitea_api, project) + if owner: + import_response: requests.Response = gitea_api.post( + "/repos/migrate", + json={ + "auth_password": GITLAB_ADMIN_PASS, + "auth_username": GITLAB_ADMIN_USER, + "clone_addr": clone_url, + "description": project.description, + "mirror": False, + "private": private, + "repo_name": name_clean(project.name), + "uid": owner["id"], + }, + ) + if import_response.ok: + print_info("Project " + name_clean(project.name) + " imported!") + else: + print_error( + "Project " + + name_clean(project.name) + + " import failed: " + + import_response.text + ) + else: + print_error( + "Failed to load project owner for project " + name_clean(project.name) + ) + + +def _import_project_repo_collaborators( + gitea_api: pygitea, + collaborators: [gitlab.v4.objects.ProjectMember], + project: gitlab.v4.objects.Project, +): + owner = get_user_or_group(gitea_api, project) + + for collaborator in collaborators: + clean_username = normalize_username(collaborator.username) + + # Skip if the collaborator is the owner + if owner["type"] == "user" and owner["username"] == clean_username: + continue + + # Map GitLab access levels to Gitea permissions + permission = "read" + if collaborator.access_level >= 30: # Developer+ + permission = "write" + if collaborator.access_level >= 40: # Maintainer+ + permission = "admin" + + if not collaborator_exists( + gitea_api, owner["username"], name_clean(project.name), clean_username + ): + try: + import_response: requests.Response = gitea_api.put( + f"/repos/{owner['username']}/{name_clean(project.name)}/collaborators/{clean_username}", + json={"permission": permission}, + ) + if import_response.ok: + print_info( + f"Collaborator {collaborator.username} added to {name_clean(project.name)} as {permission}!" + ) + else: + print_error( + f"Failed to add collaborator {collaborator.username}: {import_response.text}" + ) + except Exception as e: + print_error( + f"Error adding collaborator {collaborator.username}: {str(e)}" + ) + else: + print_warning( + f"Collaborator {collaborator.username} already exists for repo {name_clean(project.name)}, skipping!" + ) + + +def _import_user( + gitea_api: pygitea, user: gitlab.v4.objects.User, notify: bool = False +): + """Import a single user from GitLab to Gitea""" + # Normalize the username for Gitea compatibility + clean_username = normalize_username(user.username) + + tmp_password = "Tmp1!" + "".join( + random.choices(string.ascii_uppercase + string.digits, k=10) + ) + try: + import_response: requests.Response = gitea_api.post( + "/admin/users", + json={ + "email": ( + user.email + if hasattr(user, "email") and user.email + else f"{clean_username}@placeholder-migration.local" + ), + "full_name": user.name, + "login_name": clean_username, + "password": tmp_password, + "send_notify": notify, + "source_id": 0, # local user + "username": clean_username, + }, + ) + if import_response.ok: + print_info(f"User {user.username} created as {clean_username}") + + # import public keys + keys = user.keys.list(all=True) + _import_user_keys(gitea_api, keys, user) + else: + print_error(f"User {user.username} import failed: {import_response.text}") + except Exception as e: + print_error(f"Error importing user {user.username}: {str(e)}") + + +def _import_users( + gitea_api: pygitea, users: [gitlab.v4.objects.User], notify: bool = False +): + for user in users: + keys: [gitlab.v4.objects.UserKey] = user.keys.list(all=True) + + print("Importing user " + user.username + "...") + print("Found " + str(len(keys)) + " public keys for user " + user.username) + + if not user_exists(gitea_api, user.username): + tmp_password = "Tmp1!" + "".join( + random.choices(string.ascii_uppercase + string.digits, k=10) + ) + tmp_email = ( + user.username + "@noemail-git.local" + ) # Some gitlab instances do not publish user emails + try: + tmp_email = user.email + except AttributeError: + pass + import_response: requests.Response = gitea_api.post( + "/admin/users", + json={ + "email": tmp_email, + "full_name": user.name, + "login_name": user.username, + "password": tmp_password, + "send_notify": notify, + "source_id": 0, # local user + "username": user.username, + }, + ) + if import_response.ok: + print_info( + "User " + + user.username + + " imported, temporary password: " + + tmp_password + ) + else: + print_error( + "User " + user.username + " import failed: " + import_response.text + ) + + # import public keys + _import_user_keys(gitea_api, keys, user) + + +def _import_user( + gitea_api: pygitea, user: gitlab.v4.objects.User, notify: bool = False +): + """Import a single user from GitLab to Gitea""" + # Normalize the username for Gitea compatibility + clean_username = normalize_username(user.username) + + tmp_password = "Tmp1!" + "".join( + random.choices(string.ascii_uppercase + string.digits, k=10) + ) + try: + import_response: requests.Response = gitea_api.post( + "/admin/users", + json={ + "email": ( + user.email + if hasattr(user, "email") and user.email + else f"{clean_username}@placeholder-migration.local" + ), + "full_name": user.name, + "login_name": clean_username, + "password": tmp_password, + "send_notify": notify, + "source_id": 0, # local user + "username": clean_username, + }, + ) + if import_response.ok: + print_info(f"User {user.username} created as {clean_username}") + + # import public keys + keys = user.keys.list(all=True) + _import_user_keys(gitea_api, keys, user) + else: + print_error(f"User {user.username} import failed: {import_response.text}") + except Exception as e: + print_error(f"Error importing user {user.username}: {str(e)}") + + +def _import_group(gitea_api: pygitea, group: gitlab.v4.objects.Group): + """Import a single group (extracted from _import_groups)""" + members: [gitlab.v4.objects.GroupMember] = group.members.list(all=True) + + print("Importing group " + name_clean(group.name) + "...") + print( + "Found " + + str(len(members)) + + " gitlab members for group " + + name_clean(group.name) + ) + + if not organization_exists(gitea_api, name_clean(group.name)): + import_response: requests.Response = gitea_api.post( + "/orgs", + json={ + "description": group.description, + "full_name": group.full_name, + "location": "", + "username": name_clean(group.name), + "website": "", + }, + ) + if import_response.ok: + print_info("Group " + name_clean(group.name) + " imported!") + else: + print_error( + "Group " + + name_clean(group.name) + + " import failed: " + + import_response.text + ) + + # import group members + _import_group_members(gitea_api, members, group) + + +def _import_groups(gitea_api: pygitea, groups: [gitlab.v4.objects.Group]): + for group in groups: + members: [gitlab.v4.objects.GroupMember] = group.members.list(all=True) + + print("Importing group " + name_clean(group.name) + "...") + print( + "Found " + + str(len(members)) + + " gitlab members for group " + + name_clean(group.name) + ) + + if not organization_exists(gitea_api, name_clean(group.name)): + import_response: requests.Response = gitea_api.post( + "/orgs", + json={ + "description": group.description, + "full_name": group.full_name, + "location": "", + "username": name_clean(group.name), + "website": "", + }, + ) + if import_response.ok: + print_info("Group " + name_clean(group.name) + " imported!") + else: + print_error( + "Group " + + name_clean(group.name) + + " import failed: " + + import_response.text + ) + + # import group members + _import_group_members(gitea_api, members, group) + + +def _import_group_members( + gitea_api: pygitea, + members: [gitlab.v4.objects.GroupMember], + group: gitlab.v4.objects.Group, +): + # TODO: create teams based on gitlab permissions (access_level of group member) + existing_teams = get_teams(gitea_api, name_clean(group.name)) + if existing_teams: + first_team = existing_teams[0] + print( + "Organization teams fetched, importing users to first team: " + + first_team["name"] + ) + + # add members to teams + for member in members: + clean_username = normalize_username(member.username) + + if not member_exists(gitea_api, clean_username, first_team["id"]): + try: + import_response: requests.Response = gitea_api.put( + f"/teams/{first_team['id']}/members/{clean_username}" + ) + if import_response.ok: + print_info( + f"Member {member.username} added to team {first_team['name']}!" + ) + else: + print_error( + f"Failed to add member {member.username}: {import_response.text}" + ) + except Exception as e: + print_error(f"Error adding member {member.username}: {str(e)}") + else: + print_warning( + f"Member {member.username} already exists for team {first_team['name']}, skipping!" + ) + else: + print_error( + "Failed to import members to group " + + name_clean(group.name) + + ": no teams found!" + ) + + +# +# Import functions +# + + +def import_users_groups(gitlab_api: gitlab.Gitlab, gitea_api: pygitea, notify=False): + # Load migration state + migration_state = load_migration_state() + if "users" not in migration_state: + migration_state["users"] = [] + if "groups" not in migration_state: + migration_state["groups"] = [] + + # read all users + users: [gitlab.v4.objects.User] = gitlab_api.users.list(all=True) + groups: [gitlab.v4.objects.Group] = gitlab_api.groups.list(all=True) + + print( + "Found " + str(len(users)) + " gitlab users as user " + gitlab_api.user.username + ) + print( + "Found " + + str(len(groups)) + + " gitlab groups as user " + + gitlab_api.user.username + ) + + # import all non existing users + for user in users: + if RESUME_MIGRATION and user.username in migration_state["users"]: + print_warning(f"User {user.username} already imported, skipping!") + continue + + _import_user(gitea_api, user, notify) + migration_state["users"].append(user.username) + save_migration_state(migration_state) + + # import all non existing groups + for group in groups: + group_name = name_clean(group.name) + if RESUME_MIGRATION and group_name in migration_state["groups"]: + print_warning(f"Group {group_name} already imported, skipping!") + continue + + _import_group(gitea_api, group) + migration_state["groups"].append(group_name) + save_migration_state(migration_state) + + +def get_issue_comments( + gitea_api: pygitea, owner: string, repo: string, issue_number: int +) -> []: + """Get all existing comments for an issue""" + existing_comments = [] + try: + comment_response: requests.Response = gitea_api.get( + f"/repos/{owner}/{repo}/issues/{issue_number}/comments" + ) + if comment_response.ok: + existing_comments = comment_response.json() + else: + error_text = comment_response.text + print_error( + f"Failed to load existing comments for issue #{issue_number}! {error_text}" + ) + # If we get a "does not exist" error, return empty list silently + if "issue does not exist" in error_text: + print_warning( + f"Issue #{issue_number} doesn't exist in Gitea - skipping comment import" + ) + return [] + except Exception as e: + print_error(f"Exception getting comments for issue #{issue_number}: {str(e)}") + + return existing_comments + + +def _import_issue_comments( + gitea_api: pygitea, + gitlab_issue: gitlab.v4.objects.ProjectIssue, + gitea_issue_id: int, + owner: string, + repo: string, +): + """Import all comments/notes from a GitLab issue to the corresponding Gitea issue""" + try: + # Load migration state for comment tracking + migration_state = load_migration_state() + if "imported_comments" not in migration_state: + migration_state["imported_comments"] = {} + + comment_key = f"{owner}/{repo}/issues/{gitea_issue_id}" + if comment_key not in migration_state["imported_comments"]: + migration_state["imported_comments"][comment_key] = [] + + # Get existing comments to avoid duplicates + existing_comments = get_issue_comments(gitea_api, owner, repo, gitea_issue_id) + + # Get notes from GitLab + notes = gitlab_issue.notes.list(all=True) + print(f"Found {len(notes)} comments for issue #{gitea_issue_id}") + + imported_count = 0 + for note in notes: + # Skip system notes + if note.system: + continue + + # Skip if note was already imported + note_id = str(note.id) + if note_id in migration_state["imported_comments"][comment_key]: + print_warning(f"Comment {note_id} already imported, skipping") + continue + + # Check for duplicate content + if any(comment["body"] == note.body for comment in existing_comments): + print_warning(f"Comment content already exists, skipping") + migration_state["imported_comments"][comment_key].append(note_id) + save_migration_state(migration_state) + continue + + # Process the body to normalize any @mentions + body = note.body or "" + for mention in extract_user_mentions(body): + normalized_mention = normalize_username(mention) + body = body.replace(f"@{mention}", f"@{normalized_mention}") + + # Create the comment + import_response = gitea_api.post( + f"/repos/{owner}/{repo}/issues/{gitea_issue_id}/comments", + json={"body": body}, + ) + + if import_response.ok: + print_info(f"Comment for issue #{gitea_issue_id} imported!") + migration_state["imported_comments"][comment_key].append(note_id) + save_migration_state(migration_state) + imported_count += 1 + else: + print_error(f"Comment import failed: {import_response.text}") + + print_info( + f"Imported {imported_count} new comments for issue #{gitea_issue_id}" + ) + except Exception as e: + print_error(f"Error importing comments: {str(e)}") + + +def _import_project_issues( + gitea_api: pygitea, + issues: [gitlab.v4.objects.ProjectIssue], + owner: string, + repo: string, +): + # reload all existing milestones and labels, needed for assignment in issues + existing_milestones = get_milestones(gitea_api, owner, repo) + existing_labels = get_labels(gitea_api, owner, repo) + existing_issues = get_issues(gitea_api, owner, repo) + + for issue in issues: + if not issue_exists(gitea_api, owner, repo, issue.title): + due_date = "" + if issue.due_date is not None: + due_date = dateutil.parser.parse(issue.due_date).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + + assignee = None + if issue.assignee is not None: + assignee = normalize_username(issue.assignee["username"]) + + assignees = [] + for tmp_assignee in issue.assignees: + assignees.append(normalize_username(tmp_assignee["username"])) + + milestone = None + if issue.milestone is not None: + existing_milestone = next( + ( + item + for item in existing_milestones + if item["title"] == issue.milestone["title"] + ), + None, + ) + if existing_milestone: + milestone = existing_milestone["id"] + + labels = [] + for label in issue.labels: + existing_label = next( + (item for item in existing_labels if item["name"] == label), None + ) + if existing_label: + labels.append(existing_label["id"]) + + # Process the description to normalize any @mentions + description = issue.description or "" + for mention in extract_user_mentions(description): + normalized_mention = normalize_username(mention) + description = description.replace( + f"@{mention}", f"@{normalized_mention}" + ) + + import_response: requests.Response = gitea_api.post( + "/repos/" + owner + "/" + repo + "/issues", + json={ + "assignee": assignee, + "assignees": assignees, + "body": description, + "closed": issue.state == "closed", + "due_on": due_date, + "labels": labels, + "milestone": milestone, + "title": issue.title, + }, + ) + if import_response.ok: + print_info("Issue " + issue.title + " imported!") + + # Get the newly created issue ID to link comments + created_issue = import_response.json() + if created_issue: + # Import comments for this issue + _import_issue_comments( + gitea_api, issue, created_issue["id"], owner, repo + ) + else: + print_error( + "Issue " + issue.title + " import failed: " + import_response.text + ) + else: + # If the issue already exists, we might still want to import its comments + # Find the existing issue ID + existing_issue = next( + (item for item in existing_issues if item["title"] == issue.title), None + ) + if existing_issue: + print_info( + f"Issue {issue.title} already exists, importing comments only." + ) + _import_issue_comments( + gitea_api, issue, existing_issue["number"], owner, repo + ) + + +def save_migration_state(state): + """Save migration progress to a state file""" + with open(MIGRATION_STATE_FILE, "w") as f: + json.dump(state, f) + + +def load_migration_state(): + """Load migration progress from state file""" + if os.path.exists(MIGRATION_STATE_FILE): + with open(MIGRATION_STATE_FILE, "r") as f: + return json.load(f) + return { + "users": [], + "groups": [], + "projects": [], + "imported_comments": {}, # Track imported comments by issue id + } + + +def import_projects(gitlab_api: gitlab.Gitlab, gitea_api: pygitea): + # Load migration state + migration_state = load_migration_state() + if "projects" not in migration_state: + migration_state["projects"] = [] + + print_info("Pre-creating all necessary users for project migration...") + + # Get all projects to analyze + projects: [gitlab.v4.objects.Project] = gitlab_api.projects.list(all=True) + + # Create a set of all usernames and namespaces that need to exist + required_users = set() + + # Add the known problematic user + required_users.add("i2p developers") + + # Collect all users from projects + for project in projects: + # Add project namespace/owner if it's a user + if project.namespace["kind"] == "user": + required_users.add(project.namespace["path"]) + + # Collect project members + try: + collaborators = project.members.list(all=True) + for collaborator in collaborators: + required_users.add(collaborator.username) + except Exception as e: + print_warning( + f"Error collecting collaborators for {project.name}: {str(e)}" + ) + + # Collect issue authors and assignees + try: + issues = project.issues.list(all=True) + for issue in issues: + # Add issue author + if "author" in issue.attributes and issue.author: + required_users.add(issue.author["username"]) + + # Add issue assignees + if issue.assignee: + required_users.add(issue.assignee["username"]) + for assignee in issue.assignees: + required_users.add(assignee["username"]) + + # Process issue notes/comments for authors + try: + notes = issue.notes.list(all=True) + for note in notes: + if not note.system and "author" in note.attributes: + required_users.add(note.author["username"]) + except Exception as e: + print_warning( + f"Error collecting notes for issue #{issue.iid}: {str(e)}" + ) + except Exception as e: + print_warning(f"Error collecting issues for {project.name}: {str(e)}") + + # Collect milestone authors + try: + milestones = project.milestones.list(all=True) + for milestone in milestones: + if hasattr(milestone, "author") and milestone.author: + required_users.add(milestone.author["username"]) + except Exception as e: + print_warning(f"Error collecting milestones for {project.name}: {str(e)}") + + # Create any missing users + print_info(f"Found {len(required_users)} users that need to exist in Gitea") + for username in required_users: + if not user_exists(gitea_api, username): + _import_placeholder_user(gitea_api, username) + + print_info("Starting project migration...") + + # read all projects and their issues + print( + "Found " + + str(len(projects)) + + " gitlab projects as user " + + gitlab_api.user.username + ) + + for project in projects: + project_key = f"{project.namespace['name']}/{name_clean(project.name)}" + + # Skip if project was already fully imported + if RESUME_MIGRATION and project_key in migration_state["projects"]: + print_warning(f"Project {project_key} already imported, skipping!") + continue + + collaborators: [gitlab.v4.objects.ProjectMember] = project.members.list( + all=True + ) + labels: [gitlab.v4.objects.ProjectLabel] = project.labels.list(all=True) + milestones: [gitlab.v4.objects.ProjectMilestone] = project.milestones.list( + all=True + ) + issues: [gitlab.v4.objects.ProjectIssue] = project.issues.list(all=True) + + print( + "Importing project " + + name_clean(project.name) + + " from owner " + + project.namespace["name"] + ) + + # Pre-import any users mentioned in issues to avoid redirect errors + ensure_mentioned_users_exist(gitea_api, gitlab_api, issues) + + print( + "Found " + + str(len(collaborators)) + + " collaborators for project " + + name_clean(project.name) + ) + print( + "Found " + + str(len(labels)) + + " labels for project " + + name_clean(project.name) + ) + print( + "Found " + + str(len(milestones)) + + " milestones for project " + + name_clean(project.name) + ) + print( + "Found " + + str(len(issues)) + + " issues for project " + + name_clean(project.name) + ) + + # import project repo + _import_project_repo(gitea_api, project) + + # import collaborators + _import_project_repo_collaborators(gitea_api, collaborators, project) + + # import labels + _import_project_labels( + gitea_api, labels, project.namespace["name"], name_clean(project.name) + ) + + # import milestones + _import_project_milestones( + gitea_api, milestones, project.namespace["name"], name_clean(project.name) + ) + + # import issues (now includes comments) + _import_project_issues( + gitea_api, issues, project.namespace["name"], name_clean(project.name) + ) + + # Mark project as imported in the migration state + migration_state["projects"].append(project_key) + save_migration_state(migration_state) + + +# +# Helper functions +# + + +class bcolors: + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + + +def color_message(color, message, colorend=bcolors.ENDC, bold=False): + if bold: + return bcolors.BOLD + color_message(color, message, colorend, False) + + return color + message + colorend + + +def print_color(color, message, colorend=bcolors.ENDC, bold=False): + print(color_message(color, message, colorend)) + + +def print_info(message): + print_color(bcolors.OKBLUE, message) + + +def print_success(message): + print_color(bcolors.OKGREEN, message) + + +def print_warning(message): + print_color(bcolors.WARNING, message) + + +def print_error(message): + global GLOBAL_ERROR_COUNT + GLOBAL_ERROR_COUNT += 1 + print_color(bcolors.FAIL, message) + + +def name_clean(name): + newName = name.replace(" ", "_") + newName = re.sub(r"[^a-zA-Z0-9_\.-]", "-", newName) + + if newName.lower() == "plugins": + return newName + "-user" + + return newName + + +if __name__ == "__main__": + main()