ARTICLE AD BOX
I'm want to transfer files quickly to my other account using python. But I get a problem whenever I tried, it always returns
2026-04-17 17:22:45,421 INFO file_id=...<file-id>....- status=failed name=Graph.JPG message=<HttpError 403 when requesting https://www.googleapis.com/drive/v3/files/....<file-id>..../permissions/04862162013125427771?supportsAllDrives=true&fields=id%2CemailAddress%2Crole%2CpendingOwner&alt=json returned "The target user cannot be a pending owner because the target user does not have a writer role for the file.". Details: "[{'message': 'The target user cannot be a pending owner because the target user does not have a writer role for the file.', 'domain': 'global', 'reason': 'pendingOwnerWriterRequired'}]">which clearly I have on the other account. I already give editor permission.
here is my code.
Note this is from AI
import argparse import json import logging import os import random import sys import time from datetime import datetime, timezone from pathlib import Path from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence from urllib.parse import parse_qs from google.auth.transport.requests import Request from google.oauth2 import service_account from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build from googleapiclient.errors import HttpError SCOPES = ["https://www.googleapis.com/auth/drive"] FOLDER_MIME = "application/vnd.google-apps.folder" RETRYABLE_REASONS = { "rateLimitExceeded", "userRateLimitExceeded", "sharingRateLimitExceeded", "backendError", "internalError", } NON_RETRYABLE_REASONS = {"pendingOwnerWriterRequired"} RETRYABLE_STATUS = {403, 429, 500, 502, 503, 504} def parse_args() -> argparse.Namespace: base_dir = Path(__file__).resolve().parent parser = argparse.ArgumentParser() parser.add_argument("--auth-mode", choices=["oauth", "service-account"], default="oauth") parser.add_argument("--credentials-file") parser.add_argument("--config-file", default=str(base_dir / "request_config.json")) parser.add_argument("--token-file", default=str(base_dir / "request_token.json")) parser.add_argument("--oauth-redirect-uri") parser.add_argument("--subject-email") parser.add_argument("--folder-id") parser.add_argument("--target-owner-email") parser.add_argument("--page-size", type=int, default=200) parser.add_argument("--batch-size", type=int, default=100) parser.add_argument("--max-files", type=int) parser.add_argument("--max-retries", type=int, default=8) parser.add_argument("--min-backoff", type=float, default=1.0) parser.add_argument("--max-backoff", type=float, default=32.0) parser.add_argument("--send-notification-email", action="store_true", default=True) parser.add_argument("--no-send-notification-email", dest="send_notification_email", action="store_false") parser.add_argument("--log-file", default=str(base_dir / "request.log")) parser.add_argument("--result-file", default=str(base_dir / "request_results.jsonl")) parser.add_argument("--dry-run", action="store_true") return parser.parse_args() def guess_credentials_file(auth_mode: str) -> Optional[str]: base_dir = Path(__file__).resolve().parent if auth_mode == "service-account": patterns = [ "service-account.json", "*service-account*.json", "*.service-account.json", ] else: patterns = [ "cred.json", "oauth-client.json", "credentials.json", "client_secret*.json", "*oauth*.json", ] for pattern in patterns: matches = sorted(base_dir.glob(pattern)) if matches: return str(matches[0]) return None def prompt_value(label: str, current_value: Optional[str], default: Optional[str] = None) -> str: if current_value: return current_value if default: entered = input(f"{label} [{default}]: ").strip() return entered or default while True: entered = input(f"{label}: ").strip() if entered: return entered def load_json_file(path: str) -> Dict[str, Any]: try: payload = json.loads(Path(path).read_text(encoding="utf-8")) except Exception: return {} return payload if isinstance(payload, dict) else {} def save_json_file(path: str, payload: Dict[str, Any]) -> None: config_path = Path(path) config_path.parent.mkdir(parents=True, exist_ok=True) config_path.write_text(json.dumps(payload, ensure_ascii=True, indent=2), encoding="utf-8") def normalize_args(args: argparse.Namespace) -> argparse.Namespace: saved_config = load_json_file(args.config_file) guessed_credentials = guess_credentials_file(args.auth_mode) saved_credentials = saved_config.get("credentials_file") saved_folder_id = saved_config.get("folder_id") saved_target_owner_email = saved_config.get("target_owner_email") saved_subject_email = saved_config.get("subject_email") saved_redirect_uri = saved_config.get("oauth_redirect_uri") args.credentials_file = prompt_value( "Path credentials file", args.credentials_file, saved_credentials or guessed_credentials, ) if args.auth_mode == "service-account": args.subject_email = prompt_value( "Email akun sumber untuk impersonation", args.subject_email, saved_subject_email, ) args.folder_id = prompt_value("Folder ID sumber", args.folder_id, saved_folder_id) args.target_owner_email = prompt_value("Email akun tujuan", args.target_owner_email, saved_target_owner_email) if not args.oauth_redirect_uri: args.oauth_redirect_uri = saved_redirect_uri save_json_file( args.config_file, { "auth_mode": args.auth_mode, "credentials_file": args.credentials_file, "oauth_redirect_uri": args.oauth_redirect_uri, "subject_email": args.subject_email, "folder_id": args.folder_id, "target_owner_email": args.target_owner_email, }, ) return args def resolve_oauth_redirect_uri(credentials_file: str, current_value: Optional[str]) -> str: if current_value: return current_value try: payload = json.loads(Path(credentials_file).read_text(encoding="utf-8")) except Exception: payload = {} for key in ("installed", "web"): config = payload.get(key) or {} redirect_uris = config.get("redirect_uris") or [] if redirect_uris: return redirect_uris[0] return "http://localhost" def configure_oauth_transport(redirect_uri: str) -> None: if redirect_uri.startswith("http://localhost") or redirect_uri.startswith("http://127.0.0.1"): os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1" def setup_logging(log_file: str) -> None: log_path = Path(log_file) log_path.parent.mkdir(parents=True, exist_ok=True) handlers = [ logging.FileHandler(log_path, encoding="utf-8"), logging.StreamHandler(sys.stdout), ] logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s", handlers=handlers, ) def utc_now() -> str: return datetime.now(timezone.utc).isoformat() def write_result(path: str, record: Dict[str, Any]) -> None: result_path = Path(path) result_path.parent.mkdir(parents=True, exist_ok=True) with result_path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(record, ensure_ascii=True) + "\n") def load_credentials(args: argparse.Namespace): if args.auth_mode == "service-account": creds = service_account.Credentials.from_service_account_file( args.credentials_file, scopes=SCOPES, ) if args.subject_email: creds = creds.with_subject(args.subject_email) return creds token_path = Path(args.token_file) creds = None if token_path.exists(): creds = Credentials.from_authorized_user_file(str(token_path), SCOPES) if creds and creds.expired and creds.refresh_token: creds.refresh(Request()) token_path.write_text(creds.to_json(), encoding="utf-8") return creds if creds and creds.valid: return creds args.oauth_redirect_uri = resolve_oauth_redirect_uri(args.credentials_file, args.oauth_redirect_uri) configure_oauth_transport(args.oauth_redirect_uri) flow = InstalledAppFlow.from_client_secrets_file(args.credentials_file, SCOPES) flow.redirect_uri = args.oauth_redirect_uri auth_url, _ = flow.authorization_url(access_type="offline", prompt="consent") print("Buka URL berikut di browser untuk login:") print(auth_url) print("") redirected = input("Setelah redirect ke localhost, paste URL lengkapnya di sini lalu Enter: ").strip() if redirected.startswith("http://") or redirected.startswith("https://"): flow.fetch_token(authorization_response=redirected) else: parsed = parse_qs(redirected) code = (parsed.get("code") or [redirected])[0] flow.fetch_token(code=code) creds = flow.credentials token_path.write_text(creds.to_json(), encoding="utf-8") return creds def build_service(args: argparse.Namespace): creds = load_credentials(args) return build("drive", "v3", credentials=creds, cache_discovery=False) def extract_error(error: HttpError) -> Dict[str, Any]: status = getattr(getattr(error, "resp", None), "status", None) reason = None message = str(error) payload = None try: payload = json.loads(error.content.decode("utf-8")) except Exception: payload = None if isinstance(payload, dict): error_block = payload.get("error") or {} message = error_block.get("message", message) details = error_block.get("errors") or [] if details: reason = details[0].get("reason") return {"status": status, "reason": reason, "message": message} def should_retry(error: Exception) -> bool: if not isinstance(error, HttpError): return isinstance(error, (TimeoutError, ConnectionError)) details = extract_error(error) if details["reason"] in NON_RETRYABLE_REASONS: return False return details["status"] in RETRYABLE_STATUS or details["reason"] in RETRYABLE_REASONS def execute_with_retry( request_factory: Callable[[], Any], args: argparse.Namespace, action: str, file_id: Optional[str] = None, ) -> Any: attempt = 0 while True: try: return request_factory().execute() except Exception as error: attempt += 1 if attempt > args.max_retries or not should_retry(error): raise sleep_for = min(args.max_backoff, args.min_backoff * (2 ** (attempt - 1))) + random.random() logging.warning( "retrying action=%s file_id=%s attempt=%s wait=%.2fs error=%s", action, file_id, attempt, sleep_for, error, ) time.sleep(sleep_for) def get_me(service, args: argparse.Namespace) -> Dict[str, Any]: return execute_with_retry( lambda: service.about().get(fields="user(emailAddress,displayName)"), args, "about.get", ) def get_file(service, args: argparse.Namespace, file_id: str) -> Dict[str, Any]: return execute_with_retry( lambda: service.files().get( fileId=file_id, fields="id,name,mimeType,owners(emailAddress),trashed,driveId", supportsAllDrives=True, ), args, "files.get", file_id=file_id, ) def list_children( service, args: argparse.Namespace, folder_id: str, page_token: Optional[str] = None, ) -> Dict[str, Any]: return execute_with_retry( lambda: service.files().list( q=f"'{folder_id}' in parents and trashed = false", fields="nextPageToken,files(id,name,mimeType,owners(emailAddress),trashed,driveId)", pageSize=args.page_size, pageToken=page_token, includeItemsFromAllDrives=True, supportsAllDrives=True, ), args, "files.list", file_id=folder_id, ) def iter_tree( service, args: argparse.Namespace, root_folder_id: str, ) -> Iterable[Dict[str, Any]]: root = get_file(service, args, root_folder_id) seen = {root_folder_id} queue = [root] yielded = 0 while queue: current = queue.pop(0) yield current yielded += 1 if args.max_files and yielded >= args.max_files: return if current.get("mimeType") != FOLDER_MIME: continue page_token = None while True: response = list_children(service, args, current["id"], page_token=page_token) for child in response.get("files", []): child_id = child["id"] if child_id in seen: continue seen.add(child_id) queue.append(child) page_token = response.get("nextPageToken") if not page_token: break def list_permissions(service, args: argparse.Namespace, file_id: str) -> List[Dict[str, Any]]: response = execute_with_retry( lambda: service.permissions().list( fileId=file_id, fields="permissions(id,emailAddress,role,type,pendingOwner,deleted,permissionDetails(inherited,inheritedFrom,role,permissionType))", supportsAllDrives=True, ), args, "permissions.list", file_id=file_id, ) return response.get("permissions", []) def is_owned_by(item: Dict[str, Any], owner_email: str) -> bool: owners = item.get("owners") or [] owner_lower = owner_email.lower() return any((owner.get("emailAddress") or "").lower() == owner_lower for owner in owners) def find_target_permission(permissions: Sequence[Dict[str, Any]], target_email: str) -> Optional[Dict[str, Any]]: target_lower = target_email.lower() for permission in permissions: if permission.get("deleted"): continue email = (permission.get("emailAddress") or "").lower() if permission.get("type") == "user" and email == target_lower: return permission return None def permission_is_direct(permission: Dict[str, Any]) -> bool: details = permission.get("permissionDetails") or [] if not details: return True return any(not detail.get("inherited", False) for detail in details) def permission_has_writer_role(permission: Dict[str, Any]) -> bool: if permission.get("role") in {"writer", "owner"}: return True details = permission.get("permissionDetails") or [] return any(detail.get("role") in {"writer", "owner"} for detail in details) def refresh_target_permission( service, args: argparse.Namespace, file_id: str, target_email: str, ) -> Optional[Dict[str, Any]]: permissions = list_permissions(service, args, file_id) return find_target_permission(permissions, target_email) def ensure_direct_writer_permission( service, args: argparse.Namespace, item: Dict[str, Any], ) -> Dict[str, Any]: target_permission = refresh_target_permission(service, args, item["id"], args.target_owner_email) if target_permission and target_permission.get("role") == "owner": return target_permission if target_permission and permission_is_direct(target_permission) and permission_has_writer_role(target_permission): return target_permission if target_permission and permission_is_direct(target_permission): updated = execute_with_retry( lambda: service.permissions().update( fileId=item["id"], permissionId=target_permission["id"], body={"role": "writer"}, supportsAllDrives=True, fields="id,emailAddress,role,pendingOwner", ), args, "permissions.update.writer", file_id=item["id"], ) time.sleep(2) return updated else: created = execute_with_retry( lambda: service.permissions().create( fileId=item["id"], body={ "type": "user", "role": "writer", "emailAddress": args.target_owner_email, }, supportsAllDrives=True, sendNotificationEmail=args.send_notification_email, fields="id,emailAddress,role,pendingOwner", ), args, "permissions.create.writer", file_id=item["id"], ) time.sleep(2) return created raise RuntimeError("Failed to prepare writer permission for target account") def set_pending_owner( service, args: argparse.Namespace, item: Dict[str, Any], permission_id: str, ) -> Dict[str, Any]: try: return execute_with_retry( lambda: service.permissions().update( fileId=item["id"], permissionId=permission_id, body={"role": "writer", "pendingOwner": True}, supportsAllDrives=True, fields="id,emailAddress,role,pendingOwner", ), args, "permissions.update.pendingOwner", file_id=item["id"], ) except HttpError as error: details = extract_error(error) if details["reason"] != "pendingOwnerWriterRequired": raise direct_writer = ensure_direct_writer_permission(service, args, item) return execute_with_retry( lambda: service.permissions().update( fileId=item["id"], permissionId=direct_writer["id"], body={"role": "writer", "pendingOwner": True}, supportsAllDrives=True, fields="id,emailAddress,role,pendingOwner", ), args, "permissions.update.pendingOwner.retry", file_id=item["id"], ) def skip_record(item: Dict[str, Any], status: str, message: str) -> Dict[str, Any]: return { "timestamp": utc_now(), "fileId": item.get("id"), "name": item.get("name"), "mimeType": item.get("mimeType"), "status": status, "message": message, } def request_transfer( service, args: argparse.Namespace, item: Dict[str, Any], source_email: str, ) -> Dict[str, Any]: record = { "timestamp": utc_now(), "fileId": item["id"], "name": item.get("name"), "mimeType": item.get("mimeType"), "targetOwnerEmail": args.target_owner_email, } if item.get("trashed"): record.update({"status": "skipped_trashed", "message": "File is trashed"}) return record if item.get("driveId"): record.update( { "status": "skipped_shared_drive", "message": "Ownership transfer is not supported for shared drive items", } ) return record if not is_owned_by(item, source_email): record.update( { "status": "skipped_not_owned", "message": f"Authenticated account does not own this file as {source_email}", } ) return record if args.dry_run: record.update({"status": "dry_run", "message": "No permission change was sent"}) return record target_permission = refresh_target_permission(service, args, item["id"], args.target_owner_email) if target_permission and target_permission.get("role") == "owner": record.update({"status": "already_owner", "message": "Target account is already the owner"}) return record direct_writer = ensure_direct_writer_permission(service, args, item) updated = set_pending_owner(service, args, item, direct_writer["id"]) record.update( { "status": "requested", "permissionId": updated.get("id"), "pendingOwner": updated.get("pendingOwner"), "role": updated.get("role"), "message": "Prepared direct writer permission and set pending owner", } ) return record def chunks(items: Sequence[Dict[str, Any]], size: int) -> Iterable[Sequence[Dict[str, Any]]]: for index in range(0, len(items), size): yield items[index : index + size] def main() -> int: args = normalize_args(parse_args()) setup_logging(args.log_file) service = build_service(args) me = get_me(service, args) source_email = ((me.get("user") or {}).get("emailAddress") or "").strip() if not source_email: logging.error("Unable to determine authenticated email address") return 1 logging.info("authenticated email=%s target=%s", source_email, args.target_owner_email) items = list(iter_tree(service, args, args.folder_id)) logging.info("discovered items=%s folder_id=%s", len(items), args.folder_id) counts = { "requested": 0, "already_owner": 0, "dry_run": 0, "skipped_not_owned": 0, "skipped_shared_drive": 0, "skipped_trashed": 0, "failed": 0, } for batch_number, batch in enumerate(chunks(items, args.batch_size), start=1): logging.info("processing batch=%s size=%s", batch_number, len(batch)) for item in batch: try: result = request_transfer(service, args, item, source_email) except Exception as error: result = skip_record(item, "failed", str(error)) counts[result["status"]] = counts.get(result["status"], 0) + 1 write_result(args.result_file, result) logging.info( "file_id=%s status=%s name=%s message=%s", result.get("fileId"), result.get("status"), result.get("name"), result.get("message"), ) logging.info("summary=%s", json.dumps(counts, ensure_ascii=True, sort_keys=True)) return 0 if counts.get("failed", 0) == 0 else 2 if __name__ == "__main__": raise SystemExit(main())My permission is already editor, idk whats wrong this. I just want to transfer quickly.
