#[cfg(feature = "azure")]
use crate::cache::azure::AzureBlobCache;
use crate::cache::disk::DiskCache;
#[cfg(feature = "gcs")]
use crate::cache::gcs::{self, GCSCache, GCSCredentialProvider, RWMode, ServiceAccountInfo};
#[cfg(feature = "memcached")]
use crate::cache::memcached::MemcachedCache;
#[cfg(feature = "redis")]
use crate::cache::redis::RedisCache;
#[cfg(feature = "s3")]
use crate::cache::s3::S3Cache;
use crate::config::{self, CacheType, Config};
use crate::util::fs;
#[cfg(feature = "gcs")]
use crate::util::fs::File;
use async_trait::async_trait;
use std::fmt;
use std::io::{self, Cursor, Read, Seek, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use tempfile::NamedTempFile;
use zip::write::FileOptions;
use zip::{CompressionMethod, ZipArchive, ZipWriter};
use crate::errors::*;
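
/// Get the mode (permission bits) of a file, if available on this platform.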
#[cfg(unix)]
fn get_file_mode(file: &fs::File) -> Result<Option<u32>> {
use std::os::unix::fs::MetadataExt;
Ok(Some(file.metadata()?.mode()))
}
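
/// Windows does not track Unix file modes, so `None` is returned.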
#[cfg(windows)]
#[allow(clippy::unnecessary_wraps)]
fn get_file_mode(_file: &fs::File) -> Result<Option<u32>> {
Ok(None)
}
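
/// Set the mode (permission bits) of the file at `path`.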
#[cfg(unix)]
fn set_file_mode(path: &Path, mode: u32) -> Result<()> {
use std::fs::Permissions;
use std::os::unix::fs::PermissionsExt;
let p = Permissions::from_mode(mode);
fs::set_permissions(path, p)?;
Ok(())
}
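
/// There are no Unix modes to restore on Windows, so this is a no-op.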
#[cfg(windows)]
#[allow(clippy::unnecessary_wraps)]
fn set_file_mode(_path: &Path, _mode: u32) -> Result<()> {
Ok(())
}
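
/// Result of a cache lookup.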
pub enum Cache {
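    /// Result was found in cache.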
Hit(CacheRead),
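    /// Result was not found in cache.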
Miss,
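    /// Cache entry should be ignored, forcing recompilation.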
Recache,
}
impl fmt::Debug for Cache {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
Cache::Hit(_) => write!(f, "Cache::Hit(...)"),
Cache::Miss => write!(f, "Cache::Miss"),
Cache::Recache => write!(f, "Cache::Recache"),
}
}
}
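
/// A combined trait for readable, seekable streams, since trait objects
/// cannot be bounded by more than one non-auto trait.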
pub trait ReadSeek: Read + Seek + Send {}
impl<T: Read + Seek + Send> ReadSeek for T {}
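
/// Data stored in the compiler cache, read back from a zip archive.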
pub struct CacheRead {
zip: ZipArchive<Box<dyn ReadSeek>>,
}
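
/// Error produced when a cache entry's contents cannot be read or decompressed.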
#[derive(Debug)]
pub struct DecompressionFailure;
impl std::fmt::Display for DecompressionFailure {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "failed to decompress content")
}
}
impl std::error::Error for DecompressionFailure {}
impl CacheRead {
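    /// Create a `CacheRead` from a readable, seekable stream containing a zip archive.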
pub fn from<R>(reader: R) -> Result<CacheRead>
where
R: ReadSeek + 'static,
{
let z = ZipArchive::new(Box::new(reader) as Box<dyn ReadSeek>)
.context("Failed to parse cache entry")?;
Ok(CacheRead { zip: z })
}
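
    /// Get an object from this cache entry at `name` and write it to `to`,
    /// returning the object's stored Unix mode, if any.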
pub fn get_object<T>(&mut self, name: &str, to: &mut T) -> Result<Option<u32>>
where
T: Write,
{
let file = self.zip.by_name(name).or(Err(DecompressionFailure))?;
if file.compression() != CompressionMethod::Stored {
bail!(DecompressionFailure);
}
let mode = file.unix_mode();
zstd::stream::copy_decode(file, to).or(Err(DecompressionFailure))?;
Ok(mode)
}
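
    /// Get the cached compiler stdout, or an empty buffer if absent.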
pub fn get_stdout(&mut self) -> Vec<u8> {
self.get_bytes("stdout")
}
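
    /// Get the cached compiler stderr, or an empty buffer if absent.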
pub fn get_stderr(&mut self) -> Vec<u8> {
self.get_bytes("stderr")
}
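
    /// Read the object at `name` into a buffer, ignoring errors so that a
    /// missing object simply yields an empty buffer.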
fn get_bytes(&mut self, name: &str) -> Vec<u8> {
let mut bytes = Vec::new();
drop(self.get_object(name, &mut bytes));
bytes
}
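
    /// Extract all of `objects` from this cache entry on the given thread
    /// pool, writing each one to its associated path via a temporary file
    /// and restoring its stored Unix mode, if any.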
pub async fn extract_objects<T>(
mut self,
objects: T,
pool: &tokio::runtime::Handle,
) -> Result<()>
where
T: IntoIterator<Item = (String, PathBuf)> + Send + Sync + 'static,
{
pool.spawn_blocking(move || {
for (key, path) in objects {
let dir = match path.parent() {
Some(d) => d,
None => bail!("Output file without a parent directory!"),
};
let mut tmp = NamedTempFile::new_in(dir)?;
let mode = self.get_object(&key, &mut tmp)?;
tmp.persist(&path)?;
if let Some(mode) = mode {
set_file_mode(&path, mode)?;
}
}
Ok(())
})
.await?
}
}
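
/// Data to be stored in the compiler cache, written as a zip archive.
///
/// A minimal round-trip sketch (illustrative):
///
/// ```ignore
/// let mut entry = CacheWrite::new();
/// entry.put_stdout(b"hello")?;
/// let bytes = entry.finish()?;
/// let mut hit = CacheRead::from(std::io::Cursor::new(bytes))?;
/// assert_eq!(hit.get_stdout(), b"hello");
/// ```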
pub struct CacheWrite {
zip: ZipWriter<io::Cursor<Vec<u8>>>,
}
impl CacheWrite {
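    /// Create a new, empty cache entry.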
pub fn new() -> CacheWrite {
CacheWrite {
zip: ZipWriter::new(io::Cursor::new(vec![])),
}
}
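
    /// Create a new cache entry populated with the contents of `objects`,
    /// reading the files on the given thread pool.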
pub async fn from_objects<T>(objects: T, pool: &tokio::runtime::Handle) -> Result<CacheWrite>
where
T: IntoIterator<Item = (String, PathBuf)> + Send + Sync + 'static,
{
pool.spawn_blocking(move || {
let mut entry = CacheWrite::new();
for (key, path) in objects {
let mut f = fs::File::open(&path)?;
let mode = get_file_mode(&f)?;
entry
.put_object(&key, &mut f, mode)
.with_context(|| format!("failed to put object `{:?}` in cache entry", path))?;
}
Ok(entry)
})
.await?
}
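
    /// Add an object containing the contents of `from` to this cache entry
    /// at `name`, recording `mode` as its Unix permissions when given.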
pub fn put_object<T>(&mut self, name: &str, from: &mut T, mode: Option<u32>) -> Result<()>
where
T: Read,
{
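        // Store the zip member uncompressed: the payload itself is
        // zstd-compressed below, so zip-level compression would be redundant.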
let opts = FileOptions::default().compression_method(CompressionMethod::Stored);
let opts = if let Some(mode) = mode {
opts.unix_permissions(mode)
} else {
opts
};
self.zip
.start_file(name, opts)
.context("Failed to start cache entry object")?;
zstd::stream::copy_encode(from, &mut self.zip, 3)?;
Ok(())
}
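
    /// Add the compiler's stdout to this cache entry.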
pub fn put_stdout(&mut self, bytes: &[u8]) -> Result<()> {
self.put_bytes("stdout", bytes)
}
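
    /// Add the compiler's stderr to this cache entry.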
pub fn put_stderr(&mut self, bytes: &[u8]) -> Result<()> {
self.put_bytes("stderr", bytes)
}
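
    /// Add `bytes` as an object at `name`, skipping the write entirely when
    /// the buffer is empty.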
fn put_bytes(&mut self, name: &str, bytes: &[u8]) -> Result<()> {
if !bytes.is_empty() {
let mut cursor = Cursor::new(bytes);
return self.put_object(name, &mut cursor, None);
}
Ok(())
}
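
    /// Finish writing data to the cache entry writer, and return the data.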
pub fn finish(self) -> Result<Vec<u8>> {
let CacheWrite { mut zip } = self;
let cur = zip.finish().context("Failed to finish cache entry zip")?;
Ok(cur.into_inner())
}
}
impl Default for CacheWrite {
fn default() -> Self {
Self::new()
}
}
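
/// An interface to cache storage.
///
/// A sketch of typical use (illustrative):
///
/// ```ignore
/// match storage.get(key).await? {
///     Cache::Hit(entry) => { /* reuse the cached compiler outputs in `entry` */ }
///     Cache::Miss | Cache::Recache => {
///         // compile, then: storage.put(key, new_entry).await?
///     }
/// }
/// ```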
#[async_trait]
pub trait Storage: Send + Sync {
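    /// Get a cache entry for `key`.
    ///
    /// A missing entry is reported as `Cache::Miss` rather than an error.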
async fn get(&self, key: &str) -> Result<Cache>;
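
    /// Put `entry` in the cache under `key`, returning the duration of the
    /// store operation.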
async fn put(&self, key: &str, entry: CacheWrite) -> Result<Duration>;
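
    /// Get a human-readable description of the storage location.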
fn location(&self) -> String;
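
    /// Get the current storage usage, if applicable.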
async fn current_size(&self) -> Result<Option<u64>>;
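
    /// Get the maximum storage size, if applicable.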
async fn max_size(&self) -> Result<Option<u64>>;
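
    /// Remove all cached contents from storage.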
async fn clear(&self) -> Result<()>;
}
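
/// Get a suitable `Storage` implementation from configuration, falling back
/// to the local disk cache when no configured cache can be created.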
#[allow(clippy::cognitive_complexity)]
pub fn storage_from_config(config: &Config, pool: &tokio::runtime::Handle) -> Arc<dyn Storage> {
for cache_type in config.caches.iter() {
match *cache_type {
CacheType::Azure(config::AzureCacheConfig) => {
debug!("Trying Azure Blob Store account");
#[cfg(feature = "azure")]
match AzureBlobCache::new() {
Ok(storage) => {
trace!("Using AzureBlobCache");
return Arc::new(storage);
}
Err(e) => warn!("Failed to create Azure cache: {:?}", e),
}
}
CacheType::GCS(config::GCSCacheConfig {
ref bucket,
ref cred_path,
ref url,
rw_mode,
}) => {
debug!(
"Trying GCS bucket({}, {:?}, {:?}, {:?})",
bucket, cred_path, url, rw_mode
);
#[cfg(feature = "gcs")]
{
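                    // Resolve credentials: prefer a service account key file,
                    // then a credentials URL, otherwise proceed unauthenticated.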
let service_account_info_opt: Option<gcs::ServiceAccountInfo> =
if let Some(ref cred_path) = *cred_path {
let service_account_key_res: Result<gcs::ServiceAccountKey> = (|| {
let mut file = File::open(&cred_path)?;
let mut service_account_json = String::new();
file.read_to_string(&mut service_account_json)?;
Ok(serde_json::from_str(&service_account_json)?)
                            })();
if let Err(ref e) = service_account_key_res {
warn!(
"Failed to parse service account credentials from file: {:?}. \
Continuing without authentication.",
e
);
}
service_account_key_res
.ok()
.map(ServiceAccountInfo::AccountKey)
} else if let Some(ref url) = *url {
Some(ServiceAccountInfo::URL(url.clone()))
} else {
warn!(
"No CACHEPOT_GCS_KEY_PATH specified-- no authentication will be used."
);
None
};
let gcs_read_write_mode = match rw_mode {
config::GCSCacheRWMode::ReadOnly => RWMode::ReadOnly,
config::GCSCacheRWMode::ReadWrite => RWMode::ReadWrite,
};
let gcs_cred_provider = service_account_info_opt
.map(|info| GCSCredentialProvider::new(gcs_read_write_mode, info));
match GCSCache::new(bucket.to_owned(), gcs_cred_provider, gcs_read_write_mode) {
Ok(s) => {
trace!("Using GCSCache");
return Arc::new(s);
}
Err(e) => warn!("Failed to create GCS Cache: {:?}", e),
}
}
}
CacheType::Memcached(config::MemcachedCacheConfig { ref url }) => {
debug!("Trying Memcached({})", url);
#[cfg(feature = "memcached")]
match MemcachedCache::new(url, pool) {
Ok(s) => {
trace!("Using Memcached: {}", url);
return Arc::new(s);
}
Err(e) => warn!("Failed to create MemcachedCache: {:?}", e),
}
}
CacheType::Redis(config::RedisCacheConfig { ref url }) => {
debug!("Trying Redis({})", url);
#[cfg(feature = "redis")]
match RedisCache::new(url) {
Ok(s) => {
trace!("Using Redis: {}", url);
return Arc::new(s);
}
Err(e) => warn!("Failed to create RedisCache: {:?}", e),
}
}
CacheType::S3(ref c) => {
let region = c.region.as_deref();
let endpoint = c.endpoint.as_deref();
let key_prefix = c.key_prefix.as_deref();
debug!(
"Trying S3Cache({}, {}, {}, Anonymous {})",
c.bucket,
region.unwrap_or("default region"),
endpoint.unwrap_or("default endpoint"),
c.public,
);
#[cfg(feature = "s3")]
match S3Cache::new(
&c.bucket,
region,
endpoint,
key_prefix.unwrap_or(""),
c.public,
) {
Ok(s) => {
trace!("Using S3Cache");
return Arc::new(s);
}
Err(e) => warn!("Failed to create S3Cache: {:?}", e),
}
}
}
}
info!("No configured caches successful, falling back to default");
let (dir, size) = (&config.fallback_cache.dir, config.fallback_cache.size);
trace!("Using DiskCache({:?}, {})", dir, size);
    Arc::new(DiskCache::new(dir, size, pool))
}