Skip to content

Commit e753b8d

Browse files
authored
Add ability to prefix worker_id in config (TraceMachina#1578)
WorkerId can now be prefixed with a config. This will cause all internal WorkerIds to be prefixed with this config followed by a generated UUIDv6.
1 parent 60b0049 commit e753b8d

File tree

20 files changed

+218
-156
lines changed

20 files changed

+218
-156
lines changed

Diff for: Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: nativelink-config/src/cas_server.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,8 @@ pub struct UploadActionResultConfig {
605605
#[serde(deny_unknown_fields)]
606606
pub struct LocalWorkerConfig {
607607
/// Name of the worker. This is give a more friendly name to a worker for logging
608-
/// and metric publishing.
608+
/// and metric publishing. This is also the prefix of the worker id
609+
/// (ie: "{name}{uuidv6}").
609610
/// Default: {Index position in the workers list}
610611
#[serde(default, deserialize_with = "convert_string_with_shellexpand")]
611612
pub name: String,

Diff for: nativelink-proto/com/github/trace_machina/nativelink/remote_execution/worker_api.proto

+10-4
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ service WorkerApi {
3333
/// this worker supports. The response must be listened on the client
3434
/// side for updates from the server. The first item sent will always be
3535
/// a ConnectionResult, after that it is undefined.
36-
rpc ConnectWorker(SupportedProperties) returns (stream UpdateForWorker);
36+
rpc ConnectWorker(ConnectWorkerRequest) returns (stream UpdateForWorker);
3737

3838
/// Message used to let the scheduler know that it is still alive as
3939
/// well as check to see if the scheduler is still alive. The scheduler
@@ -74,8 +74,8 @@ message GoingAwayRequest {
7474
}
7575

7676
/// Represents the initial request sent to the scheduler informing the
77-
/// scheduler about this worker's capabilities.
78-
message SupportedProperties {
77+
/// scheduler about this worker's capabilities and metadata.
78+
message ConnectWorkerRequest {
7979
/// The list of properties this worker can support. The exact
8080
/// implementation is driven by the configuration matrix between the
8181
/// worker and scheduler.
@@ -87,7 +87,13 @@ message SupportedProperties {
8787
/// The details on how to use this property can be found here:
8888
/// https://github.com/TraceMachina/nativelink/blob/3147265047544572e3483c985e4aab0f9fdded38/nativelink-config/src/cas_server.rs
8989
repeated build.bazel.remote.execution.v2.Platform.Property properties = 1;
90-
reserved 2; // NextId.
90+
91+
/// Prefix to use for worker IDs. This is primarily used for debugging
92+
/// or for other systems to identify workers. The scheduler will always
93+
/// append this prefix to the assigned worker_id followed by a UUIDv6.
94+
string worker_id_prefix = 2;
95+
96+
reserved 3; // NextId.
9197
}
9298

9399
/// The result of an ExecutionRequest.

Diff for: nativelink-proto/genproto/com.github.trace_machina.nativelink.remote_execution.pb.rs

+11-6
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ pub struct GoingAwayRequest {
2828
pub worker_id: ::prost::alloc::string::String,
2929
}
3030
/// / Represents the initial request sent to the scheduler informing the
31-
/// / scheduler about this worker's capabilities.
31+
/// / scheduler about this worker's capabilities and metadata.
3232
#[derive(Clone, PartialEq, ::prost::Message)]
33-
pub struct SupportedProperties {
33+
pub struct ConnectWorkerRequest {
3434
/// / The list of properties this worker can support. The exact
3535
/// / implementation is driven by the configuration matrix between the
3636
/// / worker and scheduler.
@@ -45,6 +45,11 @@ pub struct SupportedProperties {
4545
pub properties: ::prost::alloc::vec::Vec<
4646
super::super::super::super::super::build::bazel::remote::execution::v2::platform::Property,
4747
>,
48+
/// / Prefix to use for worker IDs. This is primarily used for debugging
49+
/// / or for other systems to identify workers. The scheduler will always
50+
/// / append this prefix to the assigned worker_id followed by a UUIDv6.
51+
#[prost(string, tag = "2")]
52+
pub worker_id_prefix: ::prost::alloc::string::String,
4853
}
4954
/// / The result of an ExecutionRequest.
5055
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -261,7 +266,7 @@ pub mod worker_api_client {
261266
/// / a ConnectionResult, after that it is undefined.
262267
pub async fn connect_worker(
263268
&mut self,
264-
request: impl tonic::IntoRequest<super::SupportedProperties>,
269+
request: impl tonic::IntoRequest<super::ConnectWorkerRequest>,
265270
) -> std::result::Result<
266271
tonic::Response<tonic::codec::Streaming<super::UpdateForWorker>>,
267272
tonic::Status,
@@ -410,7 +415,7 @@ pub mod worker_api_server {
410415
/// / a ConnectionResult, after that it is undefined.
411416
async fn connect_worker(
412417
&self,
413-
request: tonic::Request<super::SupportedProperties>,
418+
request: tonic::Request<super::ConnectWorkerRequest>,
414419
) -> std::result::Result<
415420
tonic::Response<Self::ConnectWorkerStream>,
416421
tonic::Status,
@@ -533,7 +538,7 @@ pub mod worker_api_server {
533538
struct ConnectWorkerSvc<T: WorkerApi>(pub Arc<T>);
534539
impl<
535540
T: WorkerApi,
536-
> tonic::server::ServerStreamingService<super::SupportedProperties>
541+
> tonic::server::ServerStreamingService<super::ConnectWorkerRequest>
537542
for ConnectWorkerSvc<T> {
538543
type Response = super::UpdateForWorker;
539544
type ResponseStream = T::ConnectWorkerStream;
@@ -543,7 +548,7 @@ pub mod worker_api_server {
543548
>;
544549
fn call(
545550
&mut self,
546-
request: tonic::Request<super::SupportedProperties>,
551+
request: tonic::Request<super::ConnectWorkerRequest>,
547552
) -> Self::Future {
548553
let inner = Arc::clone(&self.0);
549554
let fut = async move {

Diff for: nativelink-scheduler/src/api_worker_scheduler.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ impl ApiWorkerSchedulerImpl {
111111
for operation_id in worker.running_action_infos.keys() {
112112
if self
113113
.operation_keep_alive_tx
114-
.send((operation_id.clone(), *worker_id))
114+
.send((operation_id.clone(), worker_id.clone()))
115115
.is_err()
116116
{
117117
event!(
@@ -128,8 +128,8 @@ impl ApiWorkerSchedulerImpl {
128128
/// Adds a worker to the pool.
129129
/// Note: This function will not do any task matching.
130130
fn add_worker(&mut self, worker: Worker) -> Result<(), Error> {
131-
let worker_id = worker.id;
132-
self.workers.put(worker_id, worker);
131+
let worker_id = worker.id.clone();
132+
self.workers.put(worker_id.clone(), worker);
133133

134134
// Worker is not cloneable, and we do not want to send the initial connection results until
135135
// we have added it to the map, or we might get some strange race conditions due to the way
@@ -189,7 +189,7 @@ impl ApiWorkerSchedulerImpl {
189189
w.can_accept_work() && platform_properties.is_satisfied_by(&w.platform_properties)
190190
}),
191191
};
192-
workers_iter.map(|(_, w)| &w.id).copied()
192+
workers_iter.map(|(_, w)| w.id.clone())
193193
}
194194

195195
async fn update_action(
@@ -451,7 +451,7 @@ impl WorkerScheduler for ApiWorkerScheduler {
451451

452452
async fn add_worker(&self, worker: Worker) -> Result<(), Error> {
453453
let mut inner = self.inner.lock().await;
454-
let worker_id = worker.id;
454+
let worker_id = worker.id.clone();
455455
let result = inner
456456
.add_worker(worker)
457457
.err_tip(|| "Error while adding worker, removing from pool");
@@ -505,7 +505,7 @@ impl WorkerScheduler for ApiWorkerScheduler {
505505
.rev()
506506
.map_while(|(worker_id, worker)| {
507507
if worker.last_update_timestamp <= now_timestamp - self.worker_timeout_s {
508-
Some(*worker_id)
508+
Some(worker_id.clone())
509509
} else {
510510
None
511511
}

Diff for: nativelink-scheduler/src/awaited_action_db/awaited_action.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,8 @@ impl AwaitedAction {
156156
self.maybe_origin_metadata.as_ref()
157157
}
158158

159-
pub(crate) fn worker_id(&self) -> Option<WorkerId> {
160-
self.worker_id
159+
pub(crate) fn worker_id(&self) -> Option<&WorkerId> {
160+
self.worker_id.as_ref()
161161
}
162162

163163
pub(crate) fn last_worker_updated_timestamp(&self) -> SystemTime {

Diff for: nativelink-scheduler/src/simple_scheduler_state_manager.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ where
354354
}
355355
}
356356

357-
if filter.worker_id.is_some() && filter.worker_id != awaited_action.worker_id() {
357+
if filter.worker_id.is_some() && filter.worker_id.as_ref() != awaited_action.worker_id() {
358358
return false;
359359
}
360360

@@ -500,7 +500,7 @@ where
500500
// worker that was assigned.
501501
if awaited_action.worker_id().is_some()
502502
&& maybe_worker_id.is_some()
503-
&& maybe_worker_id != awaited_action.worker_id().as_ref()
503+
&& maybe_worker_id != awaited_action.worker_id()
504504
{
505505
// If another worker is already assigned to the action, another
506506
// worker probably picked up the action. We should not update the
@@ -575,7 +575,7 @@ where
575575
// which worker sent the update.
576576
awaited_action.set_worker_id(None, now);
577577
} else {
578-
awaited_action.set_worker_id(maybe_worker_id.copied(), now);
578+
awaited_action.set_worker_id(maybe_worker_id.cloned(), now);
579579
}
580580
awaited_action.worker_set_state(
581581
Arc::new(ActionState {

Diff for: nativelink-scheduler/src/worker.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ impl Worker {
160160
send_msg_to_worker(
161161
&mut self.tx,
162162
update_for_worker::Update::ConnectionResult(ConnectionResult {
163-
worker_id: self.id.to_string(),
163+
worker_id: self.id.clone().into(),
164164
}),
165165
)
166166
.err_tip(|| format!("Failed to send ConnectionResult to worker : {}", self.id))
@@ -181,7 +181,7 @@ impl Worker {
181181

182182
pub fn keep_alive(&mut self) -> Result<(), Error> {
183183
let tx = &mut self.tx;
184-
let id = self.id;
184+
let id = &self.id;
185185
self.metrics.keep_alive.wrap(move || {
186186
send_msg_to_worker(tx, update_for_worker::Update::KeepAlive(()))
187187
.err_tip(|| format!("Failed to send KeepAlive to worker : {id}"))
@@ -196,7 +196,7 @@ impl Worker {
196196
let tx = &mut self.tx;
197197
let worker_platform_properties = &mut self.platform_properties;
198198
let running_action_infos = &mut self.running_action_infos;
199-
let worker_id = self.id.to_string();
199+
let worker_id = self.id.clone().into();
200200
self.metrics
201201
.run_action
202202
.wrap(async move {

0 commit comments

Comments
 (0)