Skip to content
7 changes: 7 additions & 0 deletions minecode/tests/collectors/test_github.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ def test_github_get_all_versions(self):
"minecode-pipelines/v0.0.1b6",
"minecode-pipelines/v0.0.1b7",
"minecode-pipelines/v0.0.1b8",
"minecode-pipelines/v0.0.1b9",
"minecode-pipelines/v0.0.1b10",
"minecode-pipelines/v0.0.1b11",
"minecode-pipelines/v0.0.1b12",
"minecode-pipelines/v0.0.1b13",
"minecode-pipelines/v0.0.1b14",
"minecode-pipelines/v0.0.1b15",
]
for item in expected:
self.assertIn(item, versions)
2 changes: 1 addition & 1 deletion minecode_pipelines/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@
#


VERSION = "0.0.1b57"
VERSION = "0.0.1b59"
159 changes: 159 additions & 0 deletions minecode_pipelines/miners/cpan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# purldb is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/purldb for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import gzip
import requests

from bs4 import BeautifulSoup
from packageurl import PackageURL

from minecode_pipelines.utils import get_temp_file

"""
Visitors for cpan and cpan-like perl package repositories.
"""


# Base URL of the cpan repository. Paths are appended to it with plain string
# concatenation, so it has to end with a slash.
CPAN_REPO = "https://www.cpan.org/"
# packageURL type given to the packageURLs built for cpan packages
CPAN_TYPE = "cpan"


def get_cpan_packages(cpan_repo=CPAN_REPO, logger=None):
    """
    Return a mapping of cpan package name to path prefix (the author page
    path) parsed from the ``modules/02packages.details.txt.gz`` index found
    under ``cpan_repo``.

    The index contains a list of all modules with their version and the
    path of the package archive providing them. Only the package name and
    the directory part of the archive path are kept. When a package name is
    listed several times, the path of its last occurrence wins.

    ``cpan_repo`` is the base URL of the repository and must end with a
    slash. ``logger`` is currently unused.

    Raise ``requests.HTTPError`` when the index cannot be downloaded.
    """
    cpan_packages_url = cpan_repo + "modules/02packages.details.txt.gz"
    packages_archive = get_temp_file(file_name="cpan_packages", extension=".gz")

    with requests.get(cpan_packages_url, stream=True) as response:
        # Fail here on an HTTP error rather than later, when trying to
        # decompress an error page that was saved as the archive.
        response.raise_for_status()
        with open(packages_archive, "wb") as archive:
            for chunk in response.iter_content(chunk_size=8192):
                archive.write(chunk)

    package_path_by_name = {}

    # The ``modules/02packages.details.txt`` file has the following section
    # at the beginning of the file, followed by an empty line:
    #
    # File: 02packages.details.txt
    # URL: http://www.cpan.org/modules/02packages.details.txt
    # Description: Package names found in directory $CPAN/authors/id/
    # Columns: package name, version, path
    # Intended-For: Automated fetch routines, namespace documentation.
    # Written-By: PAUSE version 1.005
    # Line-Count: 268940
    # Last-Updated: Mon, 29 Sep 2025 22:29:02 GMT
    #
    # Everything up to and including that first empty line is skipped, so
    # the parsing does not depend on the exact number of header lines.
    in_header = True

    # The archive is decompressed and decoded on the fly, one line at a time
    with gzip.open(packages_archive, "rt", encoding="utf-8") as index:
        for line in index:
            line = line.strip()

            if in_header:
                if not line:
                    in_header = False
                continue

            # A sample line from this module list looks like this:
            #
            # Crypt::Passphrase::SHA1::Base64 0.021 L/LE/LEONT/Crypt-Passphrase-0.021.tar.gz
            columns = line.split()
            if len(columns) < 3:
                # empty or malformed line: there is no archive path to parse
                continue

            # This is like: L/LE/LEONT/Crypt-Passphrase-0.021.tar.gz
            package_path = columns[-1]
            path_prefix, _, filename = package_path.rpartition("/")

            # Everything before the last dash is the package name. The
            # version and the archive extension come after it, so there is
            # no need to strip the extension first.
            name, _, _version = filename.rpartition("-")
            if not name:
                # no dash in the filename: the name cannot be told apart
                # from the version
                continue

            # for the above example: name: Crypt-Passphrase, path_prefix: L/LE/LEONT
            package_path_by_name[name] = path_prefix

    return package_path_by_name


def get_cpan_packageurls(name, path_prefix, logger=None):
    """
    Given a package name and its path_prefix (author page path)
    return a list of packageURL strings for that package, one per version,
    sorted by version string.

    An author page (like https://www.cpan.org/authors/id/P/PT/PTC/) lists
    all versions of all packages released by the author, so we can scrape
    all the packageURLs from this author packages index.

    Return an empty list when ``path_prefix`` is empty, when the page cannot
    be fetched or when it does not contain a file listing. ``logger``, when
    provided, is called with a progress message.
    """
    packageurls = []

    # ``path_prefix`` is like L/LE/LEONT, possibly followed by sub-directories.
    # The author id is the third segment, also when the archives are stored
    # in a sub-directory of the author directory.
    path_segments = [segment for segment in path_prefix.split("/") if segment]
    if not path_segments:
        return packageurls

    if len(path_segments) >= 3:
        author_name = path_segments[2]
    else:
        author_name = path_segments[-1]

    # file extensions found in cpan index
    # NOTE(review): files with another archive extension (e.g. .tgz or .zip)
    # keep that extension in their version -- confirm if they need handling.
    ignorable_extensions = [".meta", ".readme", ".tar.gz"]

    # entries of the index which are not package files
    ignorable_entries = {"Parent Directory", "CHECKSUMS"}

    # CPAN_REPO already ends with a slash
    cpan_authors_url = CPAN_REPO + "authors/id/"
    cpan_author_page_url = cpan_authors_url + "/".join(path_segments) + "/"

    response = requests.get(cpan_author_page_url)
    if not response.ok:
        return packageurls

    if logger:
        logger(f"Getting package versions for {name} from {cpan_author_page_url}")

    soup = BeautifulSoup(response.text, "html.parser")

    # We get all the listed packages in the author page index
    package_list = soup.find("ul")
    if package_list is None:
        return packageurls

    versions = set()
    for element in package_list.text.split("\n"):
        entry = element.strip()
        if not entry or entry in ignorable_entries:
            continue

        package_file = entry.replace(" ", "")

        # Only strip an extension found at the end of the file name
        for extension in ignorable_extensions:
            if package_file.endswith(extension):
                package_file = package_file[: -len(extension)]

        # The version is what follows the last dash
        package_name, _, version = package_file.rpartition("-")
        if package_name != name:
            continue

        versions.add(version)

    # Sorted to return the packageURLs in a deterministic order
    for version in sorted(versions):
        purl = PackageURL(
            type=CPAN_TYPE,
            namespace=author_name,
            name=name,
            version=version,
        )
        packageurls.append(purl.to_string())

    return packageurls
60 changes: 60 additions & 0 deletions minecode_pipelines/pipelines/mine_cpan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# SPDX-License-Identifier: Apache-2.0
#
# http://nexb.com and https://github.com/aboutcode-org/scancode.io
# The ScanCode.io software is licensed under the Apache License version 2.0.
# Data generated with ScanCode.io is provided as-is without warranties.
# ScanCode is a trademark of nexB Inc.
#
# You may not use this software except in compliance with the License.
# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. No content created from
# ScanCode.io should be considered or used as legal advice. Consult an Attorney
# for any legal advice.
#
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/aboutcode-org/scancode.io for support and download.


from minecode_pipelines import pipes
from minecode_pipelines.pipes import cpan
from minecode_pipelines.pipelines import MineCodeBasePipeline
from scanpipe.pipes import federatedcode


class MineCpan(MineCodeBasePipeline):
    """
    Pipeline mining every packageURL of a cpan index and publishing
    them to a FederatedCode repo.
    """

    @classmethod
    def steps(cls):
        pipeline_steps = (
            cls.check_federatedcode_eligibility,
            cls.create_federatedcode_working_dir,
            cls.mine_cpan_packages,
            cls.fetch_federation_config,
            cls.mine_and_publish_packageurls,
            cls.delete_working_dir,
        )
        return pipeline_steps

    def mine_cpan_packages(self):
        """Keep the mined cpan package names and their paths on the pipeline."""
        path_by_name = cpan.mine_cpan_packages(logger=self.log)
        self.cpan_packages_path_by_name = path_by_name

    def packages_count(self):
        mined_packages = self.cpan_packages_path_by_name
        return len(mined_packages)

    def mine_packageurls(self):
        """Yield the cpan packageURLs of each mined cpan package name."""
        mined_packageurls = cpan.mine_and_publish_cpan_packageurls(
            package_path_by_name=self.cpan_packages_path_by_name,
            logger=self.log,
        )
        yield from mined_packageurls

95 changes: 95 additions & 0 deletions minecode_pipelines/pipes/cpan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# SPDX-License-Identifier: Apache-2.0
#
# http://nexb.com and https://github.com/aboutcode-org/scancode.io
# The ScanCode.io software is licensed under the Apache License version 2.0.
# Data generated with ScanCode.io is provided as-is without warranties.
# ScanCode is a trademark of nexB Inc.
#
# You may not use this software except in compliance with the License.
# You may obtain a copy of the License at: http://apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software distributed
# under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
# CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
#
# Data Generated with ScanCode.io is provided on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, either express or implied. No content created from
# ScanCode.io should be considered or used as legal advice. Consult an Attorney
# for any legal advice.
#
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/aboutcode-org/scancode.io for support and download.

from minecode_pipelines.miners.cpan import get_cpan_packages
from minecode_pipelines.miners.cpan import get_cpan_packageurls
from minecode_pipelines.miners.cpan import CPAN_REPO

from minecode_pipelines.miners.cpan import CPAN_TYPE
from minecode_pipelines.utils import grouper

from packageurl import PackageURL

# If True, show full details on fetching packageURL for
# a package name present in the index
LOG_PACKAGEURL_DETAILS = False

# Number of package names passed as ``n`` to ``grouper`` to form one batch
PACKAGE_BATCH_SIZE = 500


def mine_cpan_packages(logger=None):
    """
    Return the mapping of package names to path prefixes obtained from
    ``get_cpan_packages`` for ``CPAN_REPO``.

    When a ``logger`` callable is provided, it is called with a message
    before the mining starts and with the number of mined packages after.
    """
    has_logger = bool(logger)

    if has_logger:
        logger("Getting packages from cpan index")

    mined = get_cpan_packages(cpan_repo=CPAN_REPO, logger=logger)
    if not has_logger:
        return mined

    total = len(mined)
    logger(f"Mined {total} packages from cpan index")
    return mined


def mine_and_publish_cpan_packageurls(package_path_by_name, logger=None):
    """
    Yield a (base packageURL string, list of packageURL strings) tuple for
    each package name of the ``package_path_by_name`` mapping for which
    packageURLs were found.

    Package names are processed in batches of ``PACKAGE_BATCH_SIZE``.
    Nothing is yielded for an empty mapping. Detailed messages are sent
    to ``logger`` only when ``LOG_PACKAGEURL_DETAILS`` is enabled.
    """
    if not package_path_by_name:
        return

    def show_details():
        # Detailed logging needs both a logger and the module-level switch
        return logger and LOG_PACKAGEURL_DETAILS

    for batch in grouper(n=PACKAGE_BATCH_SIZE, iterable=package_path_by_name.keys()):
        # names already handled in this batch
        seen = set()

        if show_details():
            logger("Starting package mining for a batch of packages")

        for package_name in batch:
            if not package_name:
                continue
            if package_name in seen:
                continue

            # fetch packageURLs for package
            if show_details():
                logger(f"getting packageURLs for package: {package_name}")

            path_prefix = package_path_by_name.get(package_name)
            if not path_prefix:
                continue

            packageurls = get_cpan_packageurls(
                name=package_name,
                path_prefix=path_prefix,
                logger=logger,
            )

            if packageurls:
                # get repo and path for package
                base_purl = PackageURL(type=CPAN_TYPE, name=package_name).to_string()
                if show_details():
                    logger(f"fetched packageURLs for package: {base_purl}")
                    purls_string = " ".join(packageurls)
                    logger(f"packageURLs: {purls_string}")

                seen.add(package_name)
                yield base_purl, packageurls
            else:
                if show_details():
                    logger(f"Package versions not present for package: {package_name}")

                # We don't want to try fetching versions for these again
                seen.add(package_name)
8 changes: 5 additions & 3 deletions pyproject-minecode_pipelines.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "flot.buildapi"

[project]
name = "minecode_pipelines"
version = "0.0.1b57"
version = "0.0.1b59"
description = "A library for mining packageURLs and package metadata from ecosystem repositories."
readme = "minecode_pipelines/README.rst"
license = { text = "Apache-2.0" }
Expand Down Expand Up @@ -43,7 +43,8 @@ dependencies = [
"scancodeio >= 35.3.0",
"ftputil >= 5.1.0",
"jawa >= 2.2.0",
"arrow >= 1.3.0"
"arrow >= 1.3.0",
"beautifulsoup4 >= 4.13.4"
]

urls = { Homepage = "https://github.com/aboutcode-org/purldb" }
Expand All @@ -56,12 +57,13 @@ mine_debian = "minecode_pipelines.pipelines.mine_debian:MineDebian"
mine_nuget = "minecode_pipelines.pipelines.mine_nuget:MineNuGet"
mine_alpine = "minecode_pipelines.pipelines.mine_alpine:MineAlpine"
mine_conan = "minecode_pipelines.pipelines.mine_conan:MineConan"
mine_cpan = "minecode_pipelines.pipelines.mine_cpan:MineCpan"
mine_cran = "minecode_pipelines.pipelines.mine_cran:MineCran"
mine_swift = "minecode_pipelines.pipelines.mine_swift:MineSwift"
mine_composer = "minecode_pipelines.pipelines.mine_composer:MineComposer"

[tool.bumpversion]
current_version = "0.0.1b25"
current_version = "0.0.1b59"
allow_dirty = true

files = [
Expand Down