Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
.git/
.github/
.kokoro/
.vscode/
bazel-*/
googleapis/
**/output/
**/target/
**/*.jar
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ steps:
cd /workspace
git clone https://github.com/googleapis/google-cloud-java
cd google-cloud-java
git checkout chore/test-hermetic-build
git checkout chore/test-hermetic-build-parallel
mkdir ../golden
cd ../golden
cp -r ../google-cloud-java/java-apigee-connect .
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,18 @@ def _construct_effective_arg(
arguments += ["--destination_path", temp_destination_path]

return arguments


import sys
from io import StringIO
import traceback


def library_generation_worker(config, library_path, library, repo_config):
    """
    Thread entry point that generates a single library while capturing its
    console output.

    Registers a fresh StringIO buffer on the thread-local slot of the
    ThreadLocalStream-wrapped ``sys.stdout`` so this thread's prints do not
    interleave with other workers', then returns the captured text.
    Previously the captured logs were discarded (an empty string was always
    returned), so per-library log blocks printed empty.

    :param config: the GenerationConfig shared by all libraries
    :param library_path: path of the library inside the monorepo
    :param library: the LibraryConfig to generate
    :param repo_config: repo-level configuration for this generation run
    :return: tuple ``(logs, error_msg)`` — ``error_msg`` is None on success,
        otherwise the exception message followed by its traceback.
    """
    log_buffer = StringIO()
    # sys.stdout is expected to be a ThreadLocalStream during parallel
    # generation; fall back gracefully when it is a plain stream
    # (e.g. when this worker is invoked outside the thread pool).
    local_slot = getattr(sys.stdout, "local", None)
    if local_slot is not None:
        local_slot.buffer = log_buffer

    error_msg = None
    try:
        generate_composed_library(config, library_path, library, repo_config)
    except Exception as e:
        error_msg = f"{e}\n{traceback.format_exc()}"

    return log_buffer.getvalue(), error_msg
Comment thread
diegomarquezp marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ if [ -z "${artifact}" ]; then
artifact=""
fi

temp_destination_path="${output_folder}/temp_preprocessed-$RANDOM"
# Use mktemp to guarantee collision-free unique directories when multiple
# library generation processes run concurrently in a shared output folder
temp_destination_path=$(mktemp -d -p "${output_folder}" temp_preprocessed-XXXXXX)
mkdir -p "${output_folder}/${destination_path}"
if [ -d "${temp_destination_path}" ]; then
# we don't want the preprocessed sources of a previous run
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
from common.model.generation_config import GenerationConfig
from common.model.library_config import LibraryConfig
from common.utils.proto_path_utils import ends_with_version
from library_generation.generate_composed_library import generate_composed_library
from library_generation.generate_composed_library import (
generate_composed_library,
library_generation_worker,
)
from library_generation.utils.monorepo_postprocessor import monorepo_postprocessing

from common.model.gapic_config import GapicConfig
Expand Down Expand Up @@ -57,14 +60,9 @@ def generate_from_yaml(
)
# copy api definition to output folder.
shutil.copytree(api_definitions_path, repo_config.output_folder, dirs_exist_ok=True)
for library_path, library in repo_config.get_libraries().items():
print(f"generating library {library.get_library_name()}")
generate_composed_library(
config=config,
library_path=library_path,
library=library,
repo_config=repo_config,
)
_generate_libraries_in_parallel(config, repo_config)
sys.stdout = original_stdout
sys.stderr = original_stderr
Comment thread
diegomarquezp marked this conversation as resolved.

if not config.is_monorepo() or config.contains_common_protos():
return
Expand Down Expand Up @@ -152,3 +150,72 @@ def _get_target_libraries_from_api_path(
target_libraries.append(target_library)
return target_libraries
return []


import os
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
Comment thread
diegomarquezp marked this conversation as resolved.


class ThreadLocalStream:
    """
    File-like wrapper that redirects writes into per-thread buffers.

    ``sys.stdout`` is process-global, so concurrent workers printing through
    it would interleave their output. Each thread may register its own buffer
    on ``self.local``; writes from a thread without a registered buffer fall
    through to the wrapped original stream.
    """

    def __init__(self, original_stream):
        self.original_stream = original_stream
        self.local = threading.local()

    @property
    def buffer(self):
        # None when the calling thread never registered a buffer.
        return getattr(self.local, "buffer", None)

    def write(self, data):
        target = self.buffer
        if target is None:
            target = self.original_stream
        target.write(data)

    def flush(self):
        # In-memory buffers need no flushing; only pass flush through when
        # this thread is writing directly to the original stream.
        if self.buffer is None:
            self.original_stream.flush()


# Keep handles to the real console streams captured at import time, so
# completed workers' buffered logs can be written straight to the terminal
# even while sys.stdout is replaced by a ThreadLocalStream.
original_stdout, original_stderr = sys.stdout, sys.stderr

# Serializes console writes across completion callbacks so each library's
# log block is printed as one atomic unit.
print_lock = threading.Lock()


def _print_worker_result(lib_name, logs, err):
    """
    Atomically prints the buffered output of a worker thread directly to
    original_stdout, preventing output interleaving in the console.

    :param lib_name: human-readable library name used in the log header
    :param logs: captured stdout of the worker thread
    :param err: error message (with traceback) when the worker failed,
        otherwise None
    """
    status = "[FAILURE]" if err else "[SUCCESS]"
    # Use a "with" block instead of bare acquire()/release(): if any write()
    # raises, the lock is still released, so other workers' result printing
    # cannot deadlock.
    with print_lock:
        original_stdout.write(f"\n{'='*40}\n{status} Logs for {lib_name}:\n{'='*40}\n")
        original_stdout.write(logs)
        if err:
            original_stdout.write(f"\nError details:\n{err}\n")
        original_stdout.flush()
Comment thread
diegomarquezp marked this conversation as resolved.


def _generate_libraries_in_parallel(config, repo_config):
    """
    Generates all configured libraries concurrently in a bounded thread pool
    and prints each worker's buffered output as it completes.

    :param config: the GenerationConfig shared by all libraries
    :param repo_config: repo-level configuration providing the library map
    """
    # Bound the pool at 5: each generation shells out to heavyweight
    # subprocesses, so wider fan-out brings diminishing returns.
    cores = os.cpu_count() or 4
    # Context manager guarantees executor.shutdown() runs even when
    # future.result() re-raises an unexpected exception; the original
    # leaked the pool on that path.
    with ThreadPoolExecutor(max_workers=min(cores, 5)) as executor:
        futures = {
            executor.submit(
                library_generation_worker, config, path, lib, repo_config
            ): lib.get_library_name()
            for path, lib in repo_config.get_libraries().items()
        }

        for future in as_completed(futures):
            lib_name = futures[future]
            logs, err = future.result()
            _print_worker_result(lib_name, logs, err)
Comment thread
diegomarquezp marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ echo "...done"

# write or restore pom.xml files
echo "Generating missing pom.xml..."
python3 "${scripts_root}/owlbot/src/fix_poms.py" "${versions_file}" "${is_monorepo}"
# Under parallel multi-library generation, fix_poms.py modifies the shared versions_file.
# We use flock to serialize edits safely across concurrent processes.
flock "${versions_file}" python3 "${scripts_root}/owlbot/src/fix_poms.py" "${versions_file}" "${is_monorepo}"
echo "...done"

# write or restore clirr-ignored-differences.xml
Expand Down
Loading