Skip to content

Commit fb61adc

Browse files
committed
Remove -Zjobserver-per-rustc
`-Zjobserver-per-rustc` has been broken for a long while. My guess is that we didn't make `-Zjobserver-token-request` into rustc (rust-lang/rust#67398). To reduce the complexity of the current job queue implementation, I propose to remove this potion. Cargo doesn't really receive any `jobserver_event`[^1] from the offical rustc. We can always bring this back if needed. [^1]: https://github.com/rust-lang/cargo/blob/65cab34dc75587eeaff68d3c19358c4d79041452/src/cargo/core/compiler/mod.rs#L1704-L1713
1 parent 65cab34 commit fb61adc

File tree

6 files changed

+8
-209
lines changed

6 files changed

+8
-209
lines changed

src/cargo/core/compiler/context/mod.rs

-24
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,6 @@ pub struct Context<'a, 'cfg> {
7171
/// metadata files in addition to the rlib itself.
7272
rmeta_required: HashSet<Unit>,
7373

74-
/// When we're in jobserver-per-rustc process mode, this keeps those
75-
/// jobserver clients for each Unit (which eventually becomes a rustc
76-
/// process).
77-
pub rustc_clients: HashMap<Unit, Client>,
78-
7974
/// Map of the LTO-status of each unit. This indicates what sort of
8075
/// compilation is happening (only object, only bitcode, both, etc), and is
8176
/// precalculated early on.
@@ -124,7 +119,6 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
124119
primary_packages: HashSet::new(),
125120
files: None,
126121
rmeta_required: HashSet::new(),
127-
rustc_clients: HashMap::new(),
128122
lto: HashMap::new(),
129123
metadata_for_doc_units: HashMap::new(),
130124
failed_scrape_units: Arc::new(Mutex::new(HashSet::new())),
@@ -614,24 +608,6 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
614608
self.rmeta_required.contains(unit)
615609
}
616610

617-
/// Used by `-Zjobserver-per-rustc`.
618-
pub fn new_jobserver(&mut self) -> CargoResult<Client> {
619-
let tokens = self.bcx.jobs() as usize;
620-
let client = Client::new(tokens).with_context(|| "failed to create jobserver")?;
621-
622-
// Drain the client fully
623-
for i in 0..tokens {
624-
client.acquire_raw().with_context(|| {
625-
format!(
626-
"failed to fully drain {}/{} token from jobserver at startup",
627-
i, tokens,
628-
)
629-
})?;
630-
}
631-
632-
Ok(client)
633-
}
634-
635611
/// Finds metadata for Doc/Docscrape units.
636612
///
637613
/// rustdoc needs a -Cmetadata flag in order to recognize StableCrateIds that refer to

src/cargo/core/compiler/job_queue/job_state.rs

-16
Original file line numberDiff line numberDiff line change
@@ -194,20 +194,4 @@ impl<'a, 'cfg> JobState<'a, 'cfg> {
194194
self.messages
195195
.push(Message::FutureIncompatReport(self.id, report));
196196
}
197-
198-
/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
199-
/// on the passed client.
200-
///
201-
/// This should arrange for the associated client to eventually get a token via
202-
/// `client.release_raw()`.
203-
pub fn will_acquire(&self) {
204-
self.messages.push(Message::NeedsToken(self.id));
205-
}
206-
207-
/// The rustc underlying this Job is informing us that it is done with a jobserver token.
208-
///
209-
/// Note that it does *not* write that token back anywhere.
210-
pub fn release_token(&self) {
211-
self.messages.push(Message::ReleaseToken(self.id));
212-
}
213197
}

src/cargo/core/compiler/job_queue/mod.rs

+5-111
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ mod job;
123123
mod job_state;
124124

125125
use std::cell::RefCell;
126-
use std::collections::{BTreeMap, HashMap, HashSet};
126+
use std::collections::{HashMap, HashSet};
127127
use std::fmt::Write as _;
128128
use std::io;
129129
use std::path::{Path, PathBuf};
@@ -133,7 +133,7 @@ use std::time::Duration;
133133

134134
use anyhow::{format_err, Context as _};
135135
use cargo_util::ProcessBuilder;
136-
use jobserver::{Acquired, Client, HelperThread};
136+
use jobserver::{Acquired, HelperThread};
137137
use log::{debug, trace};
138138
use semver::Version;
139139

@@ -199,13 +199,6 @@ struct DrainState<'cfg> {
199199
/// single rustc process.
200200
tokens: Vec<Acquired>,
201201

202-
/// rustc per-thread tokens, when in jobserver-per-rustc mode.
203-
rustc_tokens: HashMap<JobId, Vec<Acquired>>,
204-
205-
/// This represents the list of rustc jobs (processes) and associated
206-
/// clients that are interested in receiving a token.
207-
to_send_clients: BTreeMap<JobId, Vec<Client>>,
208-
209202
/// The list of jobs that we have not yet started executing, but have
210203
/// retrieved from the `queue`. We eagerly pull jobs off the main queue to
211204
/// allow us to request jobserver tokens pretty early.
@@ -387,12 +380,6 @@ enum Message {
387380
Token(io::Result<Acquired>),
388381
Finish(JobId, Artifact, CargoResult<()>),
389382
FutureIncompatReport(JobId, Vec<FutureBreakageItem>),
390-
391-
// This client should get release_raw called on it with one of our tokens
392-
NeedsToken(JobId),
393-
394-
// A token previously passed to a NeedsToken client is being released.
395-
ReleaseToken(JobId),
396383
}
397384

398385
impl<'cfg> JobQueue<'cfg> {
@@ -507,8 +494,6 @@ impl<'cfg> JobQueue<'cfg> {
507494
next_id: 0,
508495
timings: self.timings,
509496
tokens: Vec::new(),
510-
rustc_tokens: HashMap::new(),
511-
to_send_clients: BTreeMap::new(),
512497
pending_queue: Vec::new(),
513498
print: DiagnosticPrinter::new(cx.bcx.config),
514499
finished: 0,
@@ -600,46 +585,9 @@ impl<'cfg> DrainState<'cfg> {
600585
self.active.len() < self.tokens.len() + 1
601586
}
602587

603-
// The oldest job (i.e., least job ID) is the one we grant tokens to first.
604-
fn pop_waiting_client(&mut self) -> (JobId, Client) {
605-
// FIXME: replace this with BTreeMap::first_entry when that stabilizes.
606-
let key = *self
607-
.to_send_clients
608-
.keys()
609-
.next()
610-
.expect("at least one waiter");
611-
let clients = self.to_send_clients.get_mut(&key).unwrap();
612-
let client = clients.pop().unwrap();
613-
if clients.is_empty() {
614-
self.to_send_clients.remove(&key);
615-
}
616-
(key, client)
617-
}
618-
619-
// If we managed to acquire some extra tokens, send them off to a waiting rustc.
620-
fn grant_rustc_token_requests(&mut self) -> CargoResult<()> {
621-
while !self.to_send_clients.is_empty() && self.has_extra_tokens() {
622-
let (id, client) = self.pop_waiting_client();
623-
// This unwrap is guaranteed to succeed. `active` must be at least
624-
// length 1, as otherwise there can't be a client waiting to be sent
625-
// on, so tokens.len() must also be at least one.
626-
let token = self.tokens.pop().unwrap();
627-
self.rustc_tokens
628-
.entry(id)
629-
.or_insert_with(Vec::new)
630-
.push(token);
631-
client
632-
.release_raw()
633-
.with_context(|| "failed to release jobserver token")?;
634-
}
635-
636-
Ok(())
637-
}
638-
639588
fn handle_event(
640589
&mut self,
641590
cx: &mut Context<'_, '_>,
642-
jobserver_helper: &HelperThread,
643591
plan: &mut BuildPlan,
644592
event: Message,
645593
) -> Result<(), ErrorToHandle> {
@@ -699,19 +647,6 @@ impl<'cfg> DrainState<'cfg> {
699647
Artifact::All => {
700648
trace!("end: {:?}", id);
701649
self.finished += 1;
702-
if let Some(rustc_tokens) = self.rustc_tokens.remove(&id) {
703-
// This puts back the tokens that this rustc
704-
// acquired into our primary token list.
705-
//
706-
// This represents a rustc bug: it did not
707-
// release all of its thread tokens but finished
708-
// completely. But we want to make Cargo resilient
709-
// to such rustc bugs, as they're generally not
710-
// fatal in nature (i.e., Cargo can make progress
711-
// still, and the build might not even fail).
712-
self.tokens.extend(rustc_tokens);
713-
}
714-
self.to_send_clients.remove(&id);
715650
self.report_warning_count(
716651
cx.bcx.config,
717652
id,
@@ -756,31 +691,6 @@ impl<'cfg> DrainState<'cfg> {
756691
let token = acquired_token.with_context(|| "failed to acquire jobserver token")?;
757692
self.tokens.push(token);
758693
}
759-
Message::NeedsToken(id) => {
760-
trace!("queue token request");
761-
jobserver_helper.request_token();
762-
let client = cx.rustc_clients[&self.active[&id]].clone();
763-
self.to_send_clients
764-
.entry(id)
765-
.or_insert_with(Vec::new)
766-
.push(client);
767-
}
768-
Message::ReleaseToken(id) => {
769-
// Note that this pops off potentially a completely
770-
// different token, but all tokens of the same job are
771-
// conceptually the same so that's fine.
772-
//
773-
// self.tokens is a "pool" -- the order doesn't matter -- and
774-
// this transfers ownership of the token into that pool. If we
775-
// end up using it on the next go around, then this token will
776-
// be truncated, same as tokens obtained through Message::Token.
777-
let rustc_tokens = self
778-
.rustc_tokens
779-
.get_mut(&id)
780-
.expect("no tokens associated");
781-
self.tokens
782-
.push(rustc_tokens.pop().expect("rustc releases token it has"));
783-
}
784694
}
785695

786696
Ok(())
@@ -795,23 +705,12 @@ impl<'cfg> DrainState<'cfg> {
795705
// listen for a message with a timeout, and on timeout we run the
796706
// previous parts of the loop again.
797707
let mut events = self.messages.try_pop_all();
798-
trace!(
799-
"tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
800-
self.tokens.len(),
801-
self.rustc_tokens
802-
.iter()
803-
.map(|(k, j)| (k, j.len()))
804-
.collect::<Vec<_>>(),
805-
self.to_send_clients
806-
.iter()
807-
.map(|(k, j)| (k, j.len()))
808-
.collect::<Vec<_>>(),
809-
events.len(),
810-
);
811708
if events.is_empty() {
812709
loop {
813710
self.tick_progress();
711+
// TODO: this is the beef!
814712
self.tokens.truncate(self.active.len() - 1);
713+
// TODO: line above is the beef.
815714
match self.messages.pop(Duration::from_millis(500)) {
816715
Some(message) => {
817716
events.push(message);
@@ -866,17 +765,13 @@ impl<'cfg> DrainState<'cfg> {
866765
break;
867766
}
868767

869-
if let Err(e) = self.grant_rustc_token_requests() {
870-
self.handle_error(&mut cx.bcx.config.shell(), &mut errors, e);
871-
}
872-
873768
// And finally, before we block waiting for the next event, drop any
874769
// excess tokens we may have accidentally acquired. Due to how our
875770
// jobserver interface is architected we may acquire a token that we
876771
// don't actually use, and if this happens just relinquish it back
877772
// to the jobserver itself.
878773
for event in self.wait_for_events() {
879-
if let Err(event_err) = self.handle_event(cx, jobserver_helper, plan, event) {
774+
if let Err(event_err) = self.handle_event(cx, plan, event) {
880775
self.handle_error(&mut cx.bcx.config.shell(), &mut errors, event_err);
881776
}
882777
}
@@ -970,7 +865,6 @@ impl<'cfg> DrainState<'cfg> {
970865
self.active.len(),
971866
self.pending_queue.len(),
972867
self.queue.len(),
973-
self.rustc_tokens.len(),
974868
);
975869
self.timings.record_cpu();
976870

src/cargo/core/compiler/mod.rs

+1-33
Original file line numberDiff line numberDiff line change
@@ -715,14 +715,7 @@ fn prepare_rustc(
715715
base.env("CARGO_TARGET_TMPDIR", tmp.display().to_string());
716716
}
717717

718-
if cx.bcx.config.cli_unstable().jobserver_per_rustc {
719-
let client = cx.new_jobserver()?;
720-
base.inherit_jobserver(&client);
721-
base.arg("-Z").arg("jobserver-token-requests");
722-
assert!(cx.rustc_clients.insert(unit.clone(), client).is_none());
723-
} else {
724-
base.inherit_jobserver(&cx.jobserver);
725-
}
718+
base.inherit_jobserver(&cx.jobserver);
726719
build_base_args(cx, &mut base, unit, crate_types)?;
727720
build_deps_args(&mut base, cx, unit)?;
728721
Ok(base)
@@ -1701,31 +1694,6 @@ fn on_stderr_line_inner(
17011694
return Ok(false);
17021695
}
17031696

1704-
#[derive(serde::Deserialize)]
1705-
struct JobserverNotification {
1706-
jobserver_event: Event,
1707-
}
1708-
1709-
#[derive(Debug, serde::Deserialize)]
1710-
enum Event {
1711-
WillAcquire,
1712-
Release,
1713-
}
1714-
1715-
if let Ok(JobserverNotification { jobserver_event }) =
1716-
serde_json::from_str::<JobserverNotification>(compiler_message.get())
1717-
{
1718-
trace!(
1719-
"found jobserver directive from rustc: `{:?}`",
1720-
jobserver_event
1721-
);
1722-
match jobserver_event {
1723-
Event::WillAcquire => state.will_acquire(),
1724-
Event::Release => state.release_token(),
1725-
}
1726-
return Ok(false);
1727-
}
1728-
17291697
// And failing all that above we should have a legitimate JSON diagnostic
17301698
// from the compiler, so wrap it in an external Cargo JSON message
17311699
// indicating which package it came from and then emit it.

src/cargo/core/compiler/timings.rs

+2-23
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,6 @@ struct Concurrency {
9292
/// Number of units that are not yet ready, because they are waiting for
9393
/// dependencies to finish.
9494
inactive: usize,
95-
/// Number of rustc "extra" threads -- i.e., how many tokens have been
96-
/// provided across all current rustc instances that are not the main thread
97-
/// tokens.
98-
rustc_parallelism: usize,
9995
}
10096

10197
impl<'cfg> Timings<'cfg> {
@@ -240,13 +236,7 @@ impl<'cfg> Timings<'cfg> {
240236
}
241237

242238
/// This is called periodically to mark the concurrency of internal structures.
243-
pub fn mark_concurrency(
244-
&mut self,
245-
active: usize,
246-
waiting: usize,
247-
inactive: usize,
248-
rustc_parallelism: usize,
249-
) {
239+
pub fn mark_concurrency(&mut self, active: usize, waiting: usize, inactive: usize) {
250240
if !self.enabled {
251241
return;
252242
}
@@ -255,7 +245,6 @@ impl<'cfg> Timings<'cfg> {
255245
active,
256246
waiting,
257247
inactive,
258-
rustc_parallelism,
259248
};
260249
self.concurrency.push(c);
261250
}
@@ -307,7 +296,7 @@ impl<'cfg> Timings<'cfg> {
307296
if !self.enabled {
308297
return Ok(());
309298
}
310-
self.mark_concurrency(0, 0, 0, 0);
299+
self.mark_concurrency(0, 0, 0);
311300
self.unit_times
312301
.sort_unstable_by(|a, b| a.start.partial_cmp(&b.start).unwrap());
313302
if self.report_html {
@@ -391,12 +380,6 @@ impl<'cfg> Timings<'cfg> {
391380
let num_cpus = available_parallelism()
392381
.map(|x| x.get().to_string())
393382
.unwrap_or_else(|_| "n/a".into());
394-
let max_rustc_concurrency = self
395-
.concurrency
396-
.iter()
397-
.map(|c| c.rustc_parallelism)
398-
.max()
399-
.unwrap();
400383
let rustc_info = render_rustc_info(bcx);
401384
let error_msg = match error {
402385
Some(e) => format!(
@@ -440,9 +423,6 @@ impl<'cfg> Timings<'cfg> {
440423
<tr>
441424
<td>rustc:</td><td>{}</td>
442425
</tr>
443-
<tr>
444-
<td>Max (global) rustc threads concurrency:</td><td>{}</td>
445-
</tr>
446426
{}
447427
</table>
448428
"#,
@@ -457,7 +437,6 @@ impl<'cfg> Timings<'cfg> {
457437
self.start_str,
458438
total_time,
459439
rustc_info,
460-
max_rustc_concurrency,
461440
error_msg,
462441
)?;
463442
Ok(())

0 commit comments

Comments
 (0)