fix(cli): fix race condition when resolving paths

This commit is contained in:
Boshen 2023-08-20 22:06:34 +08:00
parent e5d7618457
commit de7735dc1d
No known key found for this signature in database
GPG key ID: 9C7A8C8AB22BEBD1
4 changed files with 65 additions and 41 deletions

View file

@ -2,10 +2,9 @@ mod error;
use std::{ use std::{
io::BufWriter, io::BufWriter,
path::Path,
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
mpsc, Arc, Arc,
}, },
}; };
@ -13,7 +12,7 @@ pub use self::error::Error;
use oxc_diagnostics::DiagnosticService; use oxc_diagnostics::DiagnosticService;
use oxc_index::assert_impl_all; use oxc_index::assert_impl_all;
use oxc_linter::{LintOptions, LintService, Linter}; use oxc_linter::{LintOptions, LintService, Linter, PathWork};
use crate::{command::LintOptions as CliLintOptions, walk::Walk, CliRunResult, Runner}; use crate::{command::LintOptions as CliLintOptions, walk::Walk, CliRunResult, Runner};
@ -60,27 +59,22 @@ impl Runner for LintRunner {
let number_of_files = Arc::new(AtomicUsize::new(0)); let number_of_files = Arc::new(AtomicUsize::new(0));
let (tx_path, rx_path) = mpsc::channel::<Box<Path>>(); let lint_service = LintService::new(Arc::clone(&linter));
let tx_path = lint_service.tx_path.clone();
let tx_error = diagnostic_service.sender().clone(); lint_service.run(&diagnostic_service.sender().clone());
rayon::scope(|s| {
let lint_service = LintService::new(Arc::clone(&linter));
s.spawn(move |_| {
while let Ok(path) = rx_path.recv() {
lint_service.run_path(path, &tx_error);
}
});
rayon::spawn({
let number_of_files = Arc::clone(&number_of_files); let number_of_files = Arc::clone(&number_of_files);
let walk = Walk::new(&paths, &ignore_options); let walk = Walk::new(&paths, &ignore_options);
s.spawn(move |_| { move || {
let mut count = 0; let mut count = 0;
walk.iter().for_each(|path| { for path in walk.iter() {
count += 1; count += 1;
tx_path.send(path).unwrap(); tx_path.send(PathWork::Begin(path)).unwrap();
}); }
tx_path.send(PathWork::Done).unwrap();
number_of_files.store(count, Ordering::SeqCst); number_of_files.store(count, Ordering::SeqCst);
}); }
}); });
diagnostic_service.run(); diagnostic_service.run();

View file

@ -8,9 +8,9 @@ use std::{
use crate::{miette::NamedSource, Error, GraphicalReportHandler, MinifiedFileError, Severity}; use crate::{miette::NamedSource, Error, GraphicalReportHandler, MinifiedFileError, Severity};
pub type DiagnosticTuple = Option<(PathBuf, Vec<Error>)>; pub type DiagnosticTuple = (PathBuf, Vec<Error>);
pub type DiagnosticSender = mpsc::Sender<DiagnosticTuple>; pub type DiagnosticSender = mpsc::Sender<Option<DiagnosticTuple>>;
pub type DiagnosticReceiver = mpsc::Receiver<DiagnosticTuple>; pub type DiagnosticReceiver = mpsc::Receiver<Option<DiagnosticTuple>>;
pub struct DiagnosticService { pub struct DiagnosticService {
/// Disable reporting on warnings, only errors are reported /// Disable reporting on warnings, only errors are reported

View file

@ -25,7 +25,7 @@ pub use crate::{
context::LintContext, context::LintContext,
options::{AllowWarnDeny, LintOptions}, options::{AllowWarnDeny, LintOptions},
rule::RuleCategory, rule::RuleCategory,
service::LintService, service::{LintService, PathWork},
}; };
pub(crate) use rules::{RuleEnum, RULES}; pub(crate) use rules::{RuleEnum, RULES};

View file

@ -1,37 +1,74 @@
use std::{ use std::{
collections::HashSet,
fs, fs,
path::Path, path::Path,
rc::Rc, rc::Rc,
sync::{ sync::{mpsc, Arc},
atomic::{AtomicUsize, Ordering},
Arc,
},
}; };
use oxc_allocator::Allocator; use oxc_allocator::Allocator;
use oxc_diagnostics::{DiagnosticSender, DiagnosticService}; use oxc_diagnostics::{DiagnosticSender, DiagnosticService, DiagnosticTuple};
use oxc_parser::Parser; use oxc_parser::Parser;
use oxc_semantic::SemanticBuilder; use oxc_semantic::SemanticBuilder;
use oxc_span::SourceType; use oxc_span::SourceType;
use crate::{Fixer, LintContext, Linter, Message}; use crate::{Fixer, LintContext, Linter, Message};
#[derive(Debug)]
pub enum PathWork {
Begin(Box<Path>),
Finish(DiagnosticTuple),
Done,
}
pub struct LintService { pub struct LintService {
linter: Arc<Linter>, linter: Arc<Linter>,
processing: Arc<AtomicUsize>, pub tx_path: mpsc::Sender<PathWork>,
pub rx_path: mpsc::Receiver<PathWork>,
} }
impl LintService { impl LintService {
pub fn new(linter: Arc<Linter>) -> Self { pub fn new(linter: Arc<Linter>) -> Self {
Self { linter, processing: Arc::new(AtomicUsize::new(0)) } let (tx_path, rx_path) = mpsc::channel();
Self { linter, tx_path, rx_path }
} }
/// # Panics /// # Panics
pub fn run_path(&self, path: Box<Path>, tx_error: &DiagnosticSender) { pub fn run(self, tx_error: &DiagnosticSender) {
self.processing.fetch_add(1, Ordering::SeqCst);
let linter = Arc::clone(&self.linter);
let tx_error = tx_error.clone(); let tx_error = tx_error.clone();
let processing = Arc::clone(&self.processing); rayon::spawn(move || {
let mut processing: HashSet<Box<Path>> = HashSet::new();
let mut done = false;
while let Ok(work) = self.rx_path.recv() {
match work {
PathWork::Done => {
done = true;
}
PathWork::Begin(path) => {
processing.insert(path.clone());
self.run_path(path);
}
PathWork::Finish(diagnostics) => {
processing.remove(&diagnostics.0.clone().into_boxed_path());
if !diagnostics.1.is_empty() {
tx_error.send(Some(diagnostics)).unwrap();
}
if done && processing.is_empty() {
tx_error.send(None).unwrap();
break;
}
}
}
}
});
}
/// # Panics
pub fn run_path(&self, path: Box<Path>) {
let linter = Arc::clone(&self.linter);
let tx_path = self.tx_path.clone();
rayon::spawn(move || { rayon::spawn(move || {
let allocator = Allocator::default(); let allocator = Allocator::default();
let source_text = let source_text =
@ -48,14 +85,7 @@ impl LintService {
let errors = messages.into_iter().map(|m| m.error).collect(); let errors = messages.into_iter().map(|m| m.error).collect();
let diagnostics = DiagnosticService::wrap_diagnostics(&path, &source_text, errors); let diagnostics = DiagnosticService::wrap_diagnostics(&path, &source_text, errors);
if !diagnostics.1.is_empty() { tx_path.send(PathWork::Finish(diagnostics)).unwrap();
tx_error.send(Some(diagnostics)).unwrap();
}
processing.fetch_sub(1, Ordering::SeqCst);
if processing.load(Ordering::SeqCst) == 0 {
tx_error.send(None).unwrap();
}
}); });
} }