fix(async): drop the RW guard of config before suspend points

This commit is contained in:
Orhun Parmaksız 2021-12-05 15:03:51 +03:00
parent 5cbb41c247
commit 8f3f89716f
No known key found for this signature in database
GPG key ID: F83424824B3E4B90
3 changed files with 38 additions and 19 deletions

View file

@ -9,7 +9,7 @@ use std::env;
use std::fs; use std::fs;
use std::io::Result as IoResult; use std::io::Result as IoResult;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
#[actix_web::main] #[actix_web::main]
@ -21,11 +21,11 @@ async fn main() -> IoResult<()> {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
let config_path = let config_path =
PathBuf::from(env::var("CONFIG").unwrap_or_else(|_| String::from("config.toml"))); PathBuf::from(env::var("CONFIG").unwrap_or_else(|_| String::from("config.toml")));
let config = Arc::new(Mutex::new( let config = Arc::new(RwLock::new(
Config::parse(&config_path).expect("failed to parse config"), Config::parse(&config_path).expect("failed to parse config"),
)); ));
let cloned_config = Arc::clone(&config); let cloned_config = Arc::clone(&config);
let server_config = config.lock().expect("cannot acquire config").server.clone(); let server_config = config.read().expect("cannot acquire config").server.clone();
// Create necessary directories. // Create necessary directories.
fs::create_dir_all(&server_config.upload_path)?; fs::create_dir_all(&server_config.upload_path)?;
@ -41,7 +41,7 @@ async fn main() -> IoResult<()> {
let config_watcher = move |event: Event| { let config_watcher = move |event: Event| {
if let Event::Write(path) = event { if let Event::Write(path) = event {
match Config::parse(&path) { match Config::parse(&path) {
Ok(config) => match cloned_config.lock() { Ok(config) => match cloned_config.write() {
Ok(mut cloned_config) => { Ok(mut cloned_config) => {
*cloned_config = config; *cloned_config = config;
log::info!("Configuration has been updated."); log::info!("Configuration has been updated.");

View file

@ -158,7 +158,7 @@ impl Paste {
&mut self, &mut self,
expiry_date: Option<u128>, expiry_date: Option<u128>,
client: &Client, client: &Client,
config: &Config, config: Config,
) -> Result<String, Error> { ) -> Result<String, Error> {
let data = str::from_utf8(&self.data).map_err(error::ErrorBadRequest)?; let data = str::from_utf8(&self.data).map_err(error::ErrorBadRequest)?;
let url = Url::parse(data).map_err(error::ErrorBadRequest)?; let url = Url::parse(data).map_err(error::ErrorBadRequest)?;
@ -189,7 +189,7 @@ impl Paste {
.to_string()); .to_string());
} }
} }
Ok(self.store_file(file_name, expiry_date, config)?) Ok(self.store_file(file_name, expiry_date, &config)?)
} }
/// Writes an URL to a file in upload directory. /// Writes an URL to a file in upload directory.

View file

@ -14,7 +14,7 @@ use futures_util::stream::StreamExt;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::env; use std::env;
use std::fs; use std::fs;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, RwLock};
/// Shows the landing page. /// Shows the landing page.
#[get("/")] #[get("/")]
@ -29,10 +29,10 @@ async fn index() -> impl Responder {
async fn serve( async fn serve(
request: HttpRequest, request: HttpRequest,
file: web::Path<String>, file: web::Path<String>,
config: web::Data<Arc<Mutex<Config>>>, config: web::Data<Arc<RwLock<Config>>>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let config = config let config = config
.lock() .read()
.map_err(|_| error::ErrorInternalServerError("cannot acquire config"))?; .map_err(|_| error::ErrorInternalServerError("cannot acquire config"))?;
let path = config.server.upload_path.join(&*file); let path = config.server.upload_path.join(&*file);
let mut path = util::glob_match_file(path)?; let mut path = util::glob_match_file(path)?;
@ -85,16 +85,13 @@ async fn upload(
request: HttpRequest, request: HttpRequest,
mut payload: Multipart, mut payload: Multipart,
client: web::Data<Client>, client: web::Data<Client>,
config: web::Data<Arc<Mutex<Config>>>, config: web::Data<Arc<RwLock<Config>>>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let connection = request.connection_info().clone(); let connection = request.connection_info().clone();
let host = connection.remote_addr().unwrap_or("unknown host"); let host = connection.remote_addr().unwrap_or("unknown host");
auth::check(host, request.headers(), env::var("AUTH_TOKEN").ok())?; auth::check(host, request.headers(), env::var("AUTH_TOKEN").ok())?;
let expiry_date = header::parse_expiry_date(request.headers())?; let expiry_date = header::parse_expiry_date(request.headers())?;
let mut urls: Vec<String> = Vec::new(); let mut urls: Vec<String> = Vec::new();
let config = config
.lock()
.map_err(|_| error::ErrorInternalServerError("cannot acquire config"))?;
while let Some(item) = payload.next().await { while let Some(item) = payload.next().await {
let mut field = item?; let mut field = item?;
let content = ContentDisposition::try_from(field.content_disposition())?; let content = ContentDisposition::try_from(field.content_disposition())?;
@ -102,6 +99,9 @@ async fn upload(
let mut bytes = Vec::<u8>::new(); let mut bytes = Vec::<u8>::new();
while let Some(chunk) = field.next().await { while let Some(chunk) = field.next().await {
bytes.append(&mut chunk?.to_vec()); bytes.append(&mut chunk?.to_vec());
let config = config
.read()
.map_err(|_| error::ErrorInternalServerError("cannot acquire config"))?;
if bytes.len() as u128 > config.server.max_content_length.get_bytes() { if bytes.len() as u128 > config.server.max_content_length.get_bytes() {
log::warn!("Upload rejected for {}", host); log::warn!("Upload rejected for {}", host);
return Err(error::ErrorPayloadTooLarge("upload limit exceeded")); return Err(error::ErrorPayloadTooLarge("upload limit exceeded"));
@ -114,9 +114,17 @@ async fn upload(
if paste_type != PasteType::Oneshot if paste_type != PasteType::Oneshot
&& paste_type != PasteType::RemoteFile && paste_type != PasteType::RemoteFile
&& expiry_date.is_none() && expiry_date.is_none()
&& !config.paste.duplicate_files.unwrap_or(true) && !config
.read()
.map_err(|_| error::ErrorInternalServerError("cannot acquire config"))?
.paste
.duplicate_files
.unwrap_or(true)
{ {
let bytes_checksum = util::sha256_digest(&*bytes)?; let bytes_checksum = util::sha256_digest(&*bytes)?;
let config = config
.read()
.map_err(|_| error::ErrorInternalServerError("cannot acquire config"))?;
if let Some(file) = Directory::try_from(config.server.upload_path.as_path())? if let Some(file) = Directory::try_from(config.server.upload_path.as_path())?
.get_file(bytes_checksum) .get_file(bytes_checksum)
{ {
@ -128,7 +136,6 @@ async fn upload(
.file_name() .file_name()
.map(|v| v.to_string_lossy()) .map(|v| v.to_string_lossy())
.unwrap_or_default() .unwrap_or_default()
.to_string()
)); ));
continue; continue;
} }
@ -139,14 +146,26 @@ async fn upload(
}; };
let file_name = match paste.type_ { let file_name = match paste.type_ {
PasteType::File | PasteType::Oneshot => { PasteType::File | PasteType::Oneshot => {
let config = config
.read()
.map_err(|_| error::ErrorInternalServerError("cannot acquire config"))?;
paste.store_file(content.get_file_name()?, expiry_date, &config)? paste.store_file(content.get_file_name()?, expiry_date, &config)?
} }
PasteType::RemoteFile => { PasteType::RemoteFile => {
paste {
.store_remote_file(expiry_date, &client, &config) let config = config.read().map_err(|_| {
.await? error::ErrorInternalServerError("cannot acquire config")
})?;
paste.store_remote_file(expiry_date, &client, config.clone())
}
.await?
}
PasteType::Url => {
let config = config
.read()
.map_err(|_| error::ErrorInternalServerError("cannot acquire config"))?;
paste.store_url(expiry_date, &config)?
} }
PasteType::Url => paste.store_url(expiry_date, &config)?,
}; };
log::info!( log::info!(
"{} ({}) is uploaded from {}", "{} ({}) is uploaded from {}",