55from pathlib import Path
66from typing import Dict , List , Optional , Set
77
8- lib_logger = logging .getLogger (' rotator_library' )
8+ lib_logger = logging .getLogger (" rotator_library" )
99
1010OAUTH_BASE_DIR = Path .cwd () / "oauth_creds"
1111OAUTH_BASE_DIR .mkdir (exist_ok = True )
1616 "qwen_code" : Path .home () / ".qwen" ,
1717 "iflow" : Path .home () / ".iflow" ,
1818 "antigravity" : Path .home () / ".antigravity" ,
19+ "copilot" : Path .home () / ".copilot" ,
1920 # Add other providers like 'claude' here if they have a standard CLI path
2021}
2122
2627 "antigravity" : "ANTIGRAVITY" ,
2728 "qwen_code" : "QWEN_CODE" ,
2829 "iflow" : "IFLOW" ,
30+ "copilot" : "COPILOT" ,
2931}
3032
3133
3234class CredentialManager :
3335 """
3436 Discovers OAuth credential files from standard locations, copies them locally,
3537 and updates the configuration to use the local paths.
36-
38+
3739 Also discovers environment variable-based OAuth credentials for stateless deployments.
3840 Supports two env var formats:
39-
41+
4042 1. Single credential (legacy): PROVIDER_ACCESS_TOKEN, PROVIDER_REFRESH_TOKEN
4143 2. Multiple credentials (numbered): PROVIDER_1_ACCESS_TOKEN, PROVIDER_2_ACCESS_TOKEN, etc.
42-
44+
4345 When env-based credentials are detected, virtual paths like "env://provider/1" are created.
4446 """
47+
4548 def __init__ (self , env_vars : Dict [str , str ]):
4649 self .env_vars = env_vars
4750
4851 def _discover_env_oauth_credentials (self ) -> Dict [str , List [str ]]:
4952 """
5053 Discover OAuth credentials defined via environment variables.
51-
54+
5255 Supports two formats:
5356 1. Single credential: ANTIGRAVITY_ACCESS_TOKEN + ANTIGRAVITY_REFRESH_TOKEN
5457 2. Multiple credentials: ANTIGRAVITY_1_ACCESS_TOKEN + ANTIGRAVITY_1_REFRESH_TOKEN, etc.
55-
58+
5659 Returns:
5760 Dict mapping provider name to list of virtual paths (e.g., "env://antigravity/1")
5861 """
5962 env_credentials : Dict [str , Set [str ]] = {}
60-
63+
6164 for provider , env_prefix in ENV_OAUTH_PROVIDERS .items ():
6265 found_indices : Set [str ] = set ()
63-
66+
6467 # Check for numbered credentials (PROVIDER_N_ACCESS_TOKEN pattern)
6568 # Pattern: ANTIGRAVITY_1_ACCESS_TOKEN, ANTIGRAVITY_2_ACCESS_TOKEN, etc.
6669 numbered_pattern = re .compile (rf"^{ env_prefix } _(\d+)_ACCESS_TOKEN$" )
67-
70+
6871 for key in self .env_vars .keys ():
6972 match = numbered_pattern .match (key )
7073 if match :
@@ -73,28 +76,34 @@ def _discover_env_oauth_credentials(self) -> Dict[str, List[str]]:
7376 refresh_key = f"{ env_prefix } _{ index } _REFRESH_TOKEN"
7477 if refresh_key in self .env_vars and self .env_vars [refresh_key ]:
7578 found_indices .add (index )
76-
79+
7780 # Check for legacy single credential (PROVIDER_ACCESS_TOKEN pattern)
7881 # Only use this if no numbered credentials exist
7982 if not found_indices :
8083 access_key = f"{ env_prefix } _ACCESS_TOKEN"
8184 refresh_key = f"{ env_prefix } _REFRESH_TOKEN"
82- if (access_key in self .env_vars and self .env_vars [access_key ] and
83- refresh_key in self .env_vars and self .env_vars [refresh_key ]):
85+ if (
86+ access_key in self .env_vars
87+ and self .env_vars [access_key ]
88+ and refresh_key in self .env_vars
89+ and self .env_vars [refresh_key ]
90+ ):
8491 # Use "0" as the index for legacy single credential
8592 found_indices .add ("0" )
86-
93+
8794 if found_indices :
8895 env_credentials [provider ] = found_indices
89- lib_logger .info (f"Found { len (found_indices )} env-based credential(s) for { provider } " )
90-
96+ lib_logger .info (
97+ f"Found { len (found_indices )} env-based credential(s) for { provider } "
98+ )
99+
91100 # Convert to virtual paths
92101 result : Dict [str , List [str ]] = {}
93102 for provider , indices in env_credentials .items ():
94103 # Sort indices numerically for consistent ordering
95104 sorted_indices = sorted (indices , key = lambda x : int (x ))
96105 result [provider ] = [f"env://{ provider } /{ idx } " for idx in sorted_indices ]
97-
106+
98107 return result
99108
100109 def discover_and_prepare (self ) -> Dict [str , List [str ]]:
@@ -105,7 +114,9 @@ def discover_and_prepare(self) -> Dict[str, List[str]]:
105114 # These take priority for stateless deployments
106115 env_oauth_creds = self ._discover_env_oauth_credentials ()
107116 for provider , virtual_paths in env_oauth_creds .items ():
108- lib_logger .info (f"Using { len (virtual_paths )} env-based credential(s) for { provider } " )
117+ lib_logger .info (
118+ f"Using { len (virtual_paths )} env-based credential(s) for { provider } "
119+ )
109120 final_config [provider ] = virtual_paths
110121
111122 # Extract OAuth file paths from environment variables
@@ -115,21 +126,29 @@ def discover_and_prepare(self) -> Dict[str, List[str]]:
115126 provider = key .split ("_OAUTH_" )[0 ].lower ()
116127 if provider not in env_oauth_paths :
117128 env_oauth_paths [provider ] = []
118- if value : # Only consider non-empty values
129+ if value : # Only consider non-empty values
119130 env_oauth_paths [provider ].append (value )
120131
121132 # PHASE 2: Discover file-based OAuth credentials
122133 for provider , default_dir in DEFAULT_OAUTH_DIRS .items ():
123134 # Skip if already discovered from environment variables
124135 if provider in final_config :
125- lib_logger .debug (f"Skipping file discovery for { provider } - using env-based credentials" )
136+ lib_logger .debug (
137+ f"Skipping file discovery for { provider } - using env-based credentials"
138+ )
126139 continue
127-
140+
128141 # Check for existing local credentials first. If found, use them and skip discovery.
129- local_provider_creds = sorted (list (OAUTH_BASE_DIR .glob (f"{ provider } _oauth_*.json" )))
142+ local_provider_creds = sorted (
143+ list (OAUTH_BASE_DIR .glob (f"{ provider } _oauth_*.json" ))
144+ )
130145 if local_provider_creds :
131- lib_logger .info (f"Found { len (local_provider_creds )} existing local credential(s) for { provider } . Skipping discovery." )
132- final_config [provider ] = [str (p .resolve ()) for p in local_provider_creds ]
146+ lib_logger .info (
147+ f"Found { len (local_provider_creds )} existing local credential(s) for { provider } . Skipping discovery."
148+ )
149+ final_config [provider ] = [
150+ str (p .resolve ()) for p in local_provider_creds
151+ ]
133152 continue
134153
135154 # If no local credentials exist, proceed with a one-time discovery and copy.
@@ -140,13 +159,13 @@ def discover_and_prepare(self) -> Dict[str, List[str]]:
140159 path = Path (path_str ).expanduser ()
141160 if path .exists ():
142161 discovered_paths .add (path )
143-
162+
144163 # 2. If no overrides are provided via .env, scan the default directory
145164 # [MODIFIED] This logic is now disabled to prefer local-first credential management.
146165 # if not discovered_paths and default_dir.exists():
147166 # for json_file in default_dir.glob('*.json'):
148167 # discovered_paths.add(json_file)
149-
168+
150169 if not discovered_paths :
151170 lib_logger .debug (f"No credential files found for provider: { provider } " )
152171 continue
@@ -161,13 +180,19 @@ def discover_and_prepare(self) -> Dict[str, List[str]]:
161180 try :
162181 # Since we've established no local files exist, we can copy directly.
163182 shutil .copy (source_path , local_path )
164- lib_logger .info (f"Copied '{ source_path .name } ' to local pool at '{ local_path } '." )
183+ lib_logger .info (
184+ f"Copied '{ source_path .name } ' to local pool at '{ local_path } '."
185+ )
165186 prepared_paths .append (str (local_path .resolve ()))
166187 except Exception as e :
167- lib_logger .error (f"Failed to process OAuth file from '{ source_path } ': { e } " )
168-
188+ lib_logger .error (
189+ f"Failed to process OAuth file from '{ source_path } ': { e } "
190+ )
191+
169192 if prepared_paths :
170- lib_logger .info (f"Discovered and prepared { len (prepared_paths )} credential(s) for provider: { provider } " )
193+ lib_logger .info (
194+ f"Discovered and prepared { len (prepared_paths )} credential(s) for provider: { provider } "
195+ )
171196 final_config [provider ] = prepared_paths
172197
173198 lib_logger .info ("OAuth credential discovery complete." )
0 commit comments