Files
pkh/src/package_info.rs
Valentin Haudiquet b3365afe5b
All checks were successful
CI / build (push) Successful in 7m21s
docs: added documentation, enforced documentation
2026-01-01 18:37:40 +01:00

550 lines
17 KiB
Rust

use chrono::NaiveDate;
use flate2::read::GzDecoder;
use std::collections::HashMap;
use std::error::Error;
use std::io::Read;
use std::path::Path;
use crate::ProgressCallback;
use log::{debug, warn};
/// Base URL of the Ubuntu package archive; index URLs (`dists/...`) are built on top of it
const BASE_URL_UBUNTU: &str = "http://archive.ubuntu.com/ubuntu";
/// Base URL of the Debian package archive; index URLs (`dists/...`) are built on top of it
const BASE_URL_DEBIAN: &str = "http://deb.debian.org/debian";
/// Check whether a Launchpad git repository exists for an Ubuntu source package
///
/// Sends a `HEAD` request, without following redirects, to the package's
/// `git.launchpad.net` URL. Returns `Ok(Some(url))` when the response status
/// reports success and `Ok(None)` for any other status.
///
/// # Errors
///
/// Fails when the HTTP client cannot be built or the request cannot be sent.
async fn check_launchpad_repo(package: &str) -> Result<Option<String>, Box<dyn Error>> {
    let url = format!("https://git.launchpad.net/ubuntu/+source/{}", package);

    // A redirect is not proof that the repository exists, so do not follow it
    let no_redirect = reqwest::redirect::Policy::none();
    let client = reqwest::Client::builder().redirect(no_redirect).build()?;

    let status = client.head(&url).send().await?.status();
    Ok(status.is_success().then_some(url))
}
fn parse_series_csv(content: &str) -> Result<Vec<String>, Box<dyn Error>> {
let mut rdr = csv::ReaderBuilder::new()
.flexible(true)
.from_reader(content.as_bytes());
let headers = rdr.headers()?.clone();
let series_idx = headers
.iter()
.position(|h| h == "series")
.ok_or("Column 'series' not found")?;
let created_idx = headers
.iter()
.position(|h| h == "created")
.ok_or("Column 'created' not found")?;
let mut entries = Vec::new();
for result in rdr.records() {
let record = result?;
if let (Some(s), Some(c)) = (record.get(series_idx), record.get(created_idx))
&& let Ok(date) = NaiveDate::parse_from_str(c, "%Y-%m-%d")
{
entries.push((s.to_string(), date));
}
}
// Sort by date descending (newest first)
entries.sort_by(|a, b| b.1.cmp(&a.1));
Ok(entries.into_iter().map(|(s, _)| s).collect())
}
/// List the series of a distribution in the order they should be searched
///
/// The CSV is read from `/usr/share/distro-info/` when that file exists and
/// downloaded from the distro-info-data repository on Salsa otherwise. Series
/// are ordered newest first; for Debian, `sid` is always moved to the front.
async fn get_ordered_series(dist: &str) -> Result<Vec<String>, Box<dyn Error>> {
    let local_csv = format!("/usr/share/distro-info/{dist}.csv");
    let content = if Path::new(&local_csv).exists() {
        std::fs::read_to_string(&local_csv)?
    } else {
        let remote_csv =
            format!("https://salsa.debian.org/debian/distro-info-data/-/raw/main/{dist}.csv");
        reqwest::get(remote_csv.as_str()).await?.text().await?
    };

    let series = parse_series_csv(&content)?;
    if dist != "debian" {
        return Ok(series);
    }

    // 'sid' (unstable) carries a 1993 creation date in the CSV, which would sort
    // it last, but it is the series we want to try first for Debian.
    let mut ordered = vec!["sid".to_string()];
    ordered.extend(series.into_iter().filter(|name| name != "sid"));
    Ok(ordered)
}
// Kept for compatibility; could be refactored to go through get_ordered_series
/// Download a distro-info CSV from `url` and return its series, newest first
async fn get_series_from_url(url: &str) -> Result<Vec<String>, Box<dyn Error>> {
    let response = reqwest::get(url).await?;
    let body = response.text().await?;
    parse_series_csv(body.as_str())
}
/// Read a distro-info CSV from `path` and return its series, newest first
fn get_series_from_file(path: &str) -> Result<Vec<String>, Box<dyn Error>> {
    std::fs::read_to_string(path)
        .map_err(Box::<dyn Error>::from)
        .and_then(|csv_text| parse_series_csv(&csv_text))
}
/// Obtain a list of series from a distribution
///
/// Uses the local distro-info CSV in `/usr/share/distro-info/` when it exists,
/// and falls back to downloading it from the distro-info-data repository.
pub async fn get_dist_series(dist: &str) -> Result<Vec<String>, Box<dyn Error>> {
    let local_csv = format!("/usr/share/distro-info/{dist}.csv");
    if !Path::new(&local_csv).exists() {
        let remote_csv =
            format!("https://salsa.debian.org/debian/distro-info-data/-/raw/main/{dist}.csv");
        return get_series_from_url(&remote_csv).await;
    }
    get_series_from_file(&local_csv)
}
/// Obtain the distribution (eg. debian, ubuntu) from a distribution series (eg. noble, bookworm)
pub async fn get_dist_from_series(series: &str) -> Result<String, Box<dyn Error>> {
let debian_series = get_dist_series("debian").await?;
if debian_series.contains(&series.to_string()) {
return Ok("debian".to_string());
}
let ubuntu_series = get_dist_series("ubuntu").await?;
if ubuntu_series.contains(&series.to_string()) {
return Ok("ubuntu".to_string());
}
Err(format!("Unknown series: {}", series).into())
}
/// A file used in a source package, as listed in the `Checksums-Sha256` field
/// of a `Sources` stanza
#[derive(Debug, Clone)]
pub struct FileEntry {
/// Name of the file (third column of a `Checksums-Sha256` line)
pub name: String,
/// Size of the file (second column; presumably in bytes, and stored as 0
/// when the value cannot be parsed as a number)
pub size: u64,
/// SHA256 hash for the file (first column)
pub sha256: String,
}
/// A package 'stanza' as found in 'Sources.gz' files, containing basic information about a source package
#[derive(Debug)]
pub struct PackageStanza {
/// Name of the package (`Package` field)
pub package: String,
/// Version number for the package (`Version` field)
pub version: String,
/// `Directory` field in the stanza; empty when the field is absent
pub directory: String,
/// Source package format (e.g. '3.0 (quilt)'); '1.0' when the `Format` field is absent
pub format: String,
/// Vcs-Git field in the stanza, if present
pub vcs_git: Option<String>,
/// Vcs-Browser field in the stanza, if present
pub vcs_browser: Option<String>,
/// Files present in the source package, taken from the `Checksums-Sha256` field
pub files: Vec<FileEntry>,
}
/// Source package information: a `Sources` stanza together with where it was found
#[derive(Debug)]
pub struct PackageInfo {
/// Source 'stanza' for the package, containing basic information
pub stanza: PackageStanza,
/// Distribution for the package (e.g. debian, ubuntu)
pub dist: String,
/// Distribution series for the package (e.g. noble, bookworm)
pub series: String,
/// Preferred VCS for the source package
///
/// Should be Launchpad on Ubuntu, and Salsa on Debian. When no Launchpad
/// repository is found, the stanza's Vcs-Git value is used if there is one.
pub preferred_vcs: Option<String>,
/// URL for the files of the source package: the archive base URL followed
/// by the stanza's `Directory`
pub archive_url: String,
}
impl PackageInfo {
    /// Returns true if the package is a Debian native package (no orig)
    ///
    /// The decision is based solely on the source format string containing
    /// `(native)`.
    pub fn is_native(&self) -> bool {
        let source_format = self.stanza.format.as_str();
        source_format.contains("(native)")
    }
}
/// List the pockets to search for a distribution, in search order
///
/// The empty string stands for the release pocket (no suffix on the series
/// name) and always comes last. Unknown distributions only get that pocket.
fn get_dist_pockets(dist: &str) -> Vec<&'static str> {
    let pockets: &[&'static str] = match dist {
        "ubuntu" => &["proposed", "updates", ""],
        "debian" => &["proposed-updates", "updates", ""],
        _ => &[""],
    };
    pockets.to_vec()
}
/// Build the URL of the 'Sources.gz' index for a series, pocket and component
///
/// A non-empty `pocket` is appended to the series name with a dash
/// (e.g. `noble-updates`); an empty one leaves the series name as is.
fn get_sources_url(base_url: &str, series: &str, pocket: &str, component: &str) -> String {
    let separator = if pocket.is_empty() { "" } else { "-" };
    format!("{base_url}/dists/{series}{separator}{pocket}/{component}/source/Sources.gz")
}
/// Return the archive base URL for a distribution
///
/// The result is always one of the `BASE_URL_*` constants, so it is returned
/// as `&'static str` and does not keep `dist` borrowed.
///
/// # Panics
///
/// Panics if `dist` is neither `"ubuntu"` nor `"debian"`.
fn get_base_url(dist: &str) -> &'static str {
    match dist {
        "ubuntu" => BASE_URL_UBUNTU,
        "debian" => BASE_URL_DEBIAN,
        _ => panic!("Unknown distribution"),
    }
}
/// Obtain the URL for the 'Release' file of a distribution series
///
/// A non-empty `pocket` is appended to the series name with a dash; an empty
/// one leaves the series name as is.
fn get_release_url(base_url: &str, series: &str, pocket: &str) -> String {
    let separator = if pocket.is_empty() { "" } else { "-" };
    format!("{base_url}/dists/{series}{separator}{pocket}/Release")
}
/// Obtain the components of a distribution series by parsing the 'Release' file
///
/// The first line starting with `Components:` is used; its value is split on
/// whitespace.
///
/// # Errors
///
/// Fails when the 'Release' file cannot be downloaded or has no such line.
async fn get_components(
    base_url: &str,
    series: &str,
    pocket: &str,
) -> Result<Vec<String>, Box<dyn Error>> {
    let url = get_release_url(base_url, series, pocket);
    debug!("Fetching Release file from: {}", url);
    let content = reqwest::get(&url).await?.text().await?;

    let component_list = content
        .lines()
        .find_map(|line| line.strip_prefix("Components:"));
    match component_list {
        Some(list) => Ok(list.split_whitespace().map(String::from).collect()),
        None => Err("Components not found.".into()),
    }
}
struct DebianSources {
splitted_sources: std::str::Split<'static, &'static str>,
}
impl DebianSources {
fn new(data: &[u8]) -> Result<DebianSources, Box<dyn Error>> {
// Gz-decode 'Sources.gz' file into a string, and split it on stanzas
let mut d = GzDecoder::new(data);
let mut s = String::new();
d.read_to_string(&mut s)?;
// Convert the string to a static lifetime by leaking it
let static_str = Box::leak(s.into_boxed_str());
let splitted = static_str.split("\n\n");
Ok(DebianSources {
splitted_sources: splitted,
})
}
}
impl Iterator for DebianSources {
type Item = PackageStanza;
fn next(&mut self) -> Option<Self::Item> {
let stanza = self.splitted_sources.next()?;
// Parse stanza into a hashmap of strings, the fields
let mut fields: HashMap<String, String> = HashMap::new();
let mut current_key = String::new();
for line in stanza.lines() {
if line.is_empty() {
continue;
}
if line.starts_with(' ') || line.starts_with('\t') {
// Continuation line
if let Some(val) = fields.get_mut(&current_key) {
val.push('\n');
val.push_str(line.trim());
}
} else if let Some((key, value)) = line.split_once(':') {
current_key = key.trim().to_string();
fields.insert(current_key.clone(), value.trim().to_string());
}
}
let pkg = fields.get("Package");
if pkg.is_none() {
// Skip empty stanza
return self.next();
}
// Parse package files
let mut files = Vec::new();
if let Some(checksums) = fields.get("Checksums-Sha256") {
for line in checksums.lines() {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 3 {
files.push(FileEntry {
sha256: parts[0].to_string(),
size: parts[1].parse().unwrap_or(0),
name: parts[2].to_string(),
});
}
}
}
Some(PackageStanza {
package: fields.get("Package").unwrap().to_string(),
version: fields.get("Version").unwrap().to_string(),
directory: fields.get("Directory").cloned().unwrap_or_default(),
format: fields
.get("Format")
.cloned()
.unwrap_or_else(|| "1.0".to_string()),
vcs_git: fields.get("Vcs-Git").cloned(),
vcs_browser: fields.get("Vcs-Browser").cloned(),
files,
})
}
}
/// Parse a 'Sources.gz' debian package file data, to look for a target package and
/// return the data for that package stanza
///
/// When `target_version` is given, only a stanza with exactly that version
/// matches; otherwise the first stanza for `target_package` is returned.
/// `Ok(None)` means no stanza matched.
fn parse_sources(
    data: &[u8],
    target_package: &str,
    target_version: Option<&str>,
) -> Result<Option<PackageStanza>, Box<dyn Error>> {
    let matches_target = |stanza: &PackageStanza| {
        if stanza.package != target_package {
            return false;
        }
        match target_version {
            Some(wanted) => stanza.version == wanted,
            None => true,
        }
    };

    let found = DebianSources::new(data)?.find(matches_target);
    Ok(found)
}
/// Get package information from a package, distribution series, and pocket
///
/// Every component listed in the series' 'Release' file is searched in turn.
/// A component whose 'Sources.gz' cannot be fetched is skipped.
///
/// # Errors
///
/// Fails when the series is unknown, the 'Release' file cannot be read, a
/// downloaded index cannot be decoded, or no component contains the package
/// (with the requested version, if any).
pub async fn get(
    package_name: &str,
    series: &str,
    pocket: &str,
    version: Option<&str>,
) -> Result<PackageInfo, Box<dyn Error>> {
    let dist = get_dist_from_series(series).await?;

    // Handle Ubuntu case: Vcs-Git does not usually point to Launchpad but Salsa
    // We need to check manually if there is a launchpad repository for the package
    let launchpad_vcs = if dist == "ubuntu" {
        let found = check_launchpad_repo(package_name).await?;
        if let Some(lp_url) = &found {
            debug!("Found Launchpad URL: {}", lp_url);
        }
        found
    } else {
        None
    };

    let base_url = get_base_url(&dist);
    let components = get_components(base_url, series, pocket).await?;
    debug!("Found components: {:?}", components);

    for component in components {
        let url = get_sources_url(base_url, series, pocket, &component);
        debug!("Fetching sources from: {}", url);

        let response = match reqwest::get(&url).await {
            Err(e) => {
                debug!("Failed to fetch {}: {}", url, e);
                continue;
            }
            Ok(resp) if !resp.status().is_success() => {
                debug!("Failed to fetch {}: status {}", url, resp.status());
                continue;
            }
            Ok(resp) => resp,
        };

        let compressed_data = response.bytes().await?;
        debug!(
            "Downloaded Sources.gz for {}/{}/{}",
            dist, series, component
        );

        let Some(stanza) = parse_sources(&compressed_data, package_name, version)? else {
            continue;
        };

        // The Launchpad repository wins; otherwise fall back to the stanza's Vcs-Git
        let preferred_vcs = launchpad_vcs.or_else(|| stanza.vcs_git.clone());
        let archive_url = format!("{base_url}/{0}", stanza.directory);
        return Ok(PackageInfo {
            dist,
            series: series.to_string(),
            stanza,
            preferred_vcs,
            archive_url,
        });
    }

    Err(format!(
        "Package '{}' not found in {}/{}",
        package_name, dist, series
    )
    .into())
}
/// Try to find package information in a distribution, trying all series and pockets
///
/// Series are tried in the order given by `get_ordered_series`. When `pocket`
/// is empty, every pocket of the distribution is tried for each series;
/// otherwise only `pocket` is. The first successful lookup is returned, and
/// lookups that fail for any reason are skipped. `progress`, when set, is
/// called once per series before it is searched.
///
/// # Errors
///
/// Fails when the series list cannot be obtained or no lookup succeeds.
pub async fn find_package(
    package_name: &str,
    dist: &str,
    pocket: &str,
    version: Option<&str>,
    progress: ProgressCallback<'_>,
) -> Result<PackageInfo, Box<dyn Error>> {
    let series_list = get_ordered_series(dist).await?;
    let total = series_list.len();

    // The pockets to try do not depend on the series
    let pockets = if pocket.is_empty() {
        get_dist_pockets(dist)
    } else {
        vec![pocket]
    };

    for (index, series) in series_list.iter().enumerate() {
        if let Some(cb) = progress {
            cb("", &format!("Checking {}...", series), index, total);
        }

        for &candidate in &pockets {
            let Ok(info) = get(package_name, series, candidate, version).await else {
                continue;
            };

            // The first series in the list is the development release
            if index == 0 {
                debug!(
                    "Found package '{}' in {}/{}-{}",
                    package_name, dist, series, candidate
                );
            } else {
                warn!(
                    "Package '{}' not found in development release. Found in {}/{}-{}.",
                    package_name, dist, series, candidate
                );
            }
            return Ok(info);
        }
    }

    Err(format!("Package '{}' not found.", package_name).into())
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Needs network access to git.launchpad.net
    #[tokio::test]
    async fn test_check_launchpad_repo() {
        // "hello" should exist on Launchpad for Ubuntu
        let url = check_launchpad_repo("hello").await.unwrap();
        assert!(url.is_some());
        assert_eq!(
            url.unwrap(),
            "https://git.launchpad.net/ubuntu/+source/hello"
        );

        // "this-package-should-not-exist-12345" should not exist
        let url = check_launchpad_repo("this-package-should-not-exist-12345")
            .await
            .unwrap();
        assert!(url.is_none());
    }

    /// Needs the local distro-info data or network access
    #[tokio::test]
    async fn test_get_debian_series() {
        let series = get_dist_series("debian").await.unwrap();
        assert!(series.contains(&"sid".to_string()));
        assert!(series.contains(&"bookworm".to_string()));
    }

    /// Needs the local distro-info data or network access
    #[tokio::test]
    async fn test_get_ubuntu_series() {
        let series = get_dist_series("ubuntu").await.unwrap();
        assert!(series.contains(&"noble".to_string()));
        assert!(series.contains(&"jammy".to_string()));
    }

    /// Needs the local distro-info data or network access
    #[tokio::test]
    async fn test_get_dist_from_series() {
        assert_eq!(get_dist_from_series("sid").await.unwrap(), "debian");
        assert_eq!(get_dist_from_series("noble").await.unwrap(), "ubuntu");
    }

    /// Offline test of the 'Sources.gz' parser on a two-stanza fixture
    #[test]
    fn test_parse_sources() {
        use flate2::Compression;
        use flate2::write::GzEncoder;
        use std::io::Write;

        // Stanzas are separated by a blank line. The newlines are written as
        // explicit escapes so that the separator cannot be lost to reformatting.
        let data = "Package: hello\n\
                    Version: 2.10-2\n\
                    Format: 3.0 (quilt)\n\
                    Directory: pool/main/h/hello\n\
                    Vcs-Git: https://salsa.debian.org/debian/hello.git\n\
                    \n\
                    Package: other\n\
                    Version: 1.0\n";

        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(data.as_bytes()).unwrap();
        let compressed = encoder.finish().unwrap();

        let info = parse_sources(&compressed, "hello", None).unwrap().unwrap();
        assert_eq!(info.package, "hello");
        assert_eq!(info.version, "2.10-2");
        assert_eq!(info.format, "3.0 (quilt)");
        assert_eq!(info.directory, "pool/main/h/hello");
        assert_eq!(
            info.vcs_git.unwrap(),
            "https://salsa.debian.org/debian/hello.git"
        );

        // The second stanza must be seen as a package of its own
        let other = parse_sources(&compressed, "other", None).unwrap().unwrap();
        assert_eq!(other.version, "1.0");

        // A version filter that matches nothing yields no stanza
        let wrong_version = parse_sources(&compressed, "hello", Some("9.9")).unwrap();
        assert!(wrong_version.is_none());

        let none = parse_sources(&compressed, "missing", None).unwrap();
        assert!(none.is_none());
    }

    /// Needs network access to the Debian archive
    #[tokio::test]
    async fn test_find_package_fallback() {
        // python2.7 is in bullseye but not above
        let info = find_package("python2.7", "debian", "", None, None)
            .await
            .unwrap();
        assert_eq!(info.stanza.package, "python2.7");
        assert_eq!(info.series, "bullseye")
    }

    /// Needs network access to the Debian archive
    #[tokio::test]
    async fn test_find_package_devel() {
        // hello is in sid
        let info = find_package("hello", "debian", "", None, None)
            .await
            .unwrap();
        assert_eq!(info.stanza.package, "hello");
        assert_eq!(info.series, "sid")
    }
}