Skip to content

Commit 2784cde

Browse files
authored
Refactor PrismJobServer python class (#35516)
* Refactor PrismRunner class in Python. Fix typos and add comments. * Add some handling for release candidate versions. * Fix lints.
1 parent f983cf7 commit 2784cde

1 file changed

Lines changed: 144 additions & 58 deletions

File tree

sdks/python/apache_beam/runners/portability/prism_runner.py

Lines changed: 144 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ def _rename_if_different(src, dst):
112112

113113

114114
class PrismJobServer(job_server.SubprocessJobServer):
115-
PRISM_CACHE = os.path.expanduser("~/.apache_beam/cache/prism")
116115
BIN_CACHE = os.path.expanduser("~/.apache_beam/cache/prism/bin")
117116

118117
def __init__(self, options):
@@ -131,9 +130,32 @@ def __init__(self, options):
131130
job_options = options.view_as(pipeline_options.JobServerOptions)
132131
self._job_port = job_options.job_port
133132

133+
# the method is only kept for testing and backward compatibility
134134
@classmethod
135-
def maybe_unzip_and_make_executable(
136-
cls, url: str, bin_cache: str, ignore_cache: bool = True) -> str:
135+
def local_bin(
136+
cls, url: str, bin_cache: str = '', ignore_cache: bool = False) -> str:
137+
url, ignore_cache = cls._download_to_local_path(url,
138+
bin_cache,
139+
ignore_cache)
140+
return cls._prepare_executable(url, bin_cache, ignore_cache)
141+
142+
# the method is only kept for testing and backward compatibility
143+
def path_to_binary(self) -> str:
144+
return self._resolve_source_path()
145+
146+
# the method is only kept for testing and backward compatibility
147+
def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str:
148+
return self._construct_download_url(self._version, root_tag, sys, mach)
149+
150+
@staticmethod
151+
def _prepare_executable(
152+
url: str, bin_cache: str, ignore_cache: bool = True) -> str:
153+
"""
154+
Given a path to a local artifact (zip or binary), makes it an
155+
executable binary file.
156+
157+
Returns the path to the final, executable binary.
158+
"""
137159
assert (os.path.isfile(url))
138160

139161
if zipfile.is_zipfile(url):
@@ -163,14 +185,19 @@ def maybe_unzip_and_make_executable(
163185
os.chmod(target_url, st.st_mode | stat.S_IEXEC)
164186
return target_url
165187

166-
# Finds the bin or zip in the local cache, and if not, fetches it.
167-
@classmethod
168-
def local_bin(
169-
cls, url: str, bin_cache: str = '', ignore_cache: bool = False) -> str:
188+
@staticmethod
189+
def _download_to_local_path(
190+
url: str,
191+
bin_cache: str = '',
192+
ignore_cache: bool = False) -> tuple[str, bool]:
193+
"""
194+
Ensures the artifact is on local disk, downloading it if necessary.
195+
Returns the path to the local (potentially cached) artifact.
196+
"""
170197
# ignore_cache sets whether we should always be downloading and unzipping
171198
# the file or not, to avoid staleness issues.
172199
if bin_cache == '':
173-
bin_cache = cls.BIN_CACHE
200+
bin_cache = PrismJobServer.BIN_CACHE
174201
if os.path.exists(url):
175202
_LOGGER.info('Using local prism binary/zip from %s' % url)
176203
cached_file = url
@@ -198,10 +225,11 @@ def local_bin(
198225
# If we download a new prism, then we should always use it but not
199226
# the cached one.
200227
ignore_cache = True
201-
return cls.maybe_unzip_and_make_executable(
202-
cached_file, bin_cache=bin_cache, ignore_cache=ignore_cache)
228+
return cached_file, ignore_cache
203229

204-
def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str:
230+
@staticmethod
231+
def _construct_download_url(
232+
version: str, root_tag: str, sys: str, mach: str) -> str:
205233
"""Construct the prism download URL with the appropriate release tag.
206234
This maps operating systems and machine architectures to the compatible
207235
and canonical names used by the Go build targets.
@@ -223,64 +251,83 @@ def construct_download_url(self, root_tag: str, sys: str, mach: str) -> str:
223251

224252
if arch not in ['amd64', 'arm64']:
225253
raise ValueError(
226-
'Machine archictecture "%s" unsupported for constructing a Prism '
254+
'Machine architecture "%s" unsupported for constructing a Prism '
227255
'release binary URL.' % (opsys))
228-
return (
229-
GITHUB_DOWNLOAD_PREFIX +
230-
f"{root_tag}/apache_beam-{self._version}-prism-{opsys}-{arch}.zip")
231256

232-
def path_to_binary(self) -> str:
233-
if self._path is not None:
234-
# The path is overidden, check various cases.
235-
if os.path.exists(self._path):
236-
# The path is local and exists, use directly.
237-
return self._path
257+
# Some special handling is needed when creating url for release candidates.
258+
# For example, v2.66.0rc2 should have the following url
259+
# https://github.com/apache/beam/releases/download/v2.66.0-RC2/apache_beam-v2.66.0-prism-xxx-yyy.zip
260+
if 'rc' in version:
261+
version = version.split('rc')[0]
238262

239-
if FileSystems.exists(self._path):
240-
# The path is in one of the supported filesystems.
241-
return self._path
242-
243-
# Check if the path is a URL.
244-
url = urllib.parse.urlparse(self._path)
245-
if not url.scheme:
246-
raise ValueError(
247-
'Unable to parse binary URL "%s". If using a full URL, make '
248-
'sure the scheme is specified. If using a local file xpath, '
249-
'make sure the file exists; you may have to first build prism '
250-
'using `go build `.' % (self._path))
251-
252-
# We have a URL, see if we need to construct a valid file name.
253-
if self._path.startswith(GITHUB_DOWNLOAD_PREFIX):
254-
# If this URL starts with the download prefix, let it through.
255-
return self._path
256-
# The only other valid option is a github release page.
257-
if not self._path.startswith(GITHUB_TAG_PREFIX):
258-
raise ValueError(
259-
'Provided --prism_location URL is not an Apache Beam Github '
260-
'Release page URL or download URL: %s' % (self._path))
261-
# Get the root tag for this URL
262-
root_tag = os.path.basename(os.path.normpath(self._path))
263-
return self.construct_download_url(
264-
root_tag, platform.system(), platform.machine())
265-
266-
if '.dev' not in self._version:
267-
# Not a development version, so construct the production download URL
268-
return self.construct_download_url(
269-
self._version, platform.system(), platform.machine())
263+
if 'rc' in root_tag:
264+
root_tag = '-RC'.join(root_tag.split('rc'))
270265

266+
return (
267+
GITHUB_DOWNLOAD_PREFIX +
268+
f"{root_tag}/apache_beam-{version}-prism-{opsys}-{arch}.zip")
269+
270+
@staticmethod
271+
def _resolve_from_location_override(path, version) -> str:
272+
"""Handles the case where --prism_location is explicitly set."""
273+
# The path is overridden, check various cases.
274+
if os.path.exists(path):
275+
# The path is local and exists, use directly.
276+
return path
277+
278+
try:
279+
if FileSystems.exists(path):
280+
# The path is in one of the supported filesystems.
281+
return path
282+
except ValueError:
283+
# If there is a value error raised by Filesystems, try to resolve
284+
# the path with the following steps.
285+
pass
286+
287+
# Check if the path is a URL.
288+
url = urllib.parse.urlparse(path)
289+
if not url.scheme:
290+
raise ValueError(
291+
'Unable to parse binary URL "%s". If using a full URL, make '
292+
'sure the scheme is specified. If using a local file xpath, '
293+
'make sure the file exists; you may have to first build prism '
294+
'using `go build `.' % (path))
295+
296+
# We have a URL, see if we need to construct a valid file name.
297+
if path.startswith(GITHUB_DOWNLOAD_PREFIX):
298+
# If this URL starts with the download prefix, let it through.
299+
return path
300+
# The only other valid option is a github release page.
301+
if not path.startswith(GITHUB_TAG_PREFIX):
302+
raise ValueError(
303+
'Provided --prism_location URL is not an Apache Beam Github '
304+
'Release page URL or download URL: %s' % (path))
305+
# Get the root tag for this URL
306+
root_tag = os.path.basename(os.path.normpath(path))
307+
return PrismJobServer._construct_download_url(
308+
version, root_tag, platform.system(), platform.machine())
309+
310+
@staticmethod
311+
def _install_from_source(version):
312+
"""Builds and installs Prism from a Go source package.
313+
It first tries the local module, then falls back to @latest.
314+
"""
271315
# This is a development version! Assume Go is installed.
272316
# Set the install directory to the cache location.
273-
envdict = {**os.environ, "GOBIN": self.BIN_CACHE}
317+
envdict = {**os.environ, "GOBIN": PrismJobServer.BIN_CACHE}
274318
PRISMPKG = "github.com/apache/beam/sdks/v2/go/cmd/prism"
275319

320+
_LOGGER.info(
321+
'Installing prism from local source into "%s".',
322+
PrismJobServer.BIN_CACHE)
276323
process = subprocess.run(["go", "install", PRISMPKG],
277324
stdout=subprocess.PIPE,
278325
stderr=subprocess.STDOUT,
279326
env=envdict,
280327
check=False)
281328
if process.returncode == 0:
282329
# Successfully installed
283-
return '%s/prism' % (self.BIN_CACHE)
330+
return '%s/prism' % (PrismJobServer.BIN_CACHE)
284331

285332
# We failed to build for some reason.
286333
output = process.stdout.decode("utf-8")
@@ -300,20 +347,24 @@ def path_to_binary(self) -> str:
300347
'compile.\nPlease install Go (see https://go.dev/doc/install) to '
301348
'enable automatic local builds.\n'
302349
'Alternatively provide a binary with the --prism_location flag.'
303-
'\nCaptured output:\n %s' % (self._version, output))
350+
'\nCaptured output:\n %s' % (version, output))
304351

305352
# Go is installed and claims we're not in a Go module that has access to
306353
# the Prism package.
307354

308355
# Fallback to using the @latest version of prism, which works everywhere.
356+
_LOGGER.info(
357+
'Installing prism from "%s@latest" into "%s".',
358+
PRISMPKG,
359+
PrismJobServer.BIN_CACHE)
309360
process = subprocess.run(["go", "install", PRISMPKG + "@latest"],
310361
stdout=subprocess.PIPE,
311362
stderr=subprocess.STDOUT,
312363
env=envdict,
313364
check=False)
314365

315366
if process.returncode == 0:
316-
return '%s/prism' % (self.BIN_CACHE)
367+
return '%s/prism' % (PrismJobServer.BIN_CACHE)
317368

318369
output = process.stdout.decode("utf-8")
319370
raise ValueError(
@@ -322,10 +373,45 @@ def path_to_binary(self) -> str:
322373
'--prism_location flag.'
323374
'\nCaptured output:\n %s' % (process.args, output))
324375

376+
def _resolve_source_path(self) -> str:
377+
"""Resolves and returns the source for the Prism binary.
378+
379+
The resolution follows this order:
380+
381+
1. A user-provided location (local path, GCS, or URL).
382+
2. A pre-built binary from GitHub for a release version.
383+
3. Build from local Go source for a development version.
384+
"""
385+
if self._path:
386+
return self._resolve_from_location_override(self._path, self._version)
387+
388+
if '.dev' not in self._version:
389+
# Not a development version, so construct the production download URL
390+
return self._construct_download_url(
391+
self._version, self._version, platform.system(), platform.machine())
392+
393+
return self._install_from_source(self._version)
394+
395+
def _get_executable_path(self) -> str:
396+
"""Orchestrates the process of getting a ready-to-use Prism binary."""
397+
source = self._resolve_source_path()
398+
if source == "%s/prism" % (self.BIN_CACHE):
399+
# source is from go installation, so it is already a local binary
400+
return self._prepare_executable(source, self.BIN_CACHE, True)
401+
402+
# Always re-download/extract if a custom path was provided to avoid
403+
# staleness
404+
ignore_cache = self._path is not None
405+
406+
local_path, ignore_cache = self._download_to_local_path(source,
407+
self.BIN_CACHE,
408+
ignore_cache)
409+
410+
return self._prepare_executable(local_path, self.BIN_CACHE, ignore_cache)
411+
325412
def subprocess_cmd_and_endpoint(
326413
self) -> typing.Tuple[typing.List[typing.Any], str]:
327-
bin_path = self.local_bin(
328-
self.path_to_binary(), ignore_cache=(self._path is not None))
414+
bin_path = self._get_executable_path()
329415
job_port, = subprocess_server.pick_port(self._job_port)
330416
subprocess_cmd = [bin_path] + self.prism_arguments(job_port)
331417
return (subprocess_cmd, f"localhost:{job_port}")

0 commit comments

Comments
 (0)