Skip to content

Commit 81159a3

Browse files
imotovfulmicoton
andauthored
Catch violations in times(...) expectations (#2766)
* Catch violations in times(...) expectations Adds a new quit() method to universe that causes all spawned actors to quit. It also collects actors exit codes that can be used to verify that all actors quit gracefully, which in turn allows to fail the test if mock objects expectations are not met. Relates to #2754 * Make clippy and rustfmt happy * Fix test_indexer_trigger_on_memory_limit * Update quickwit/quickwit-actors/src/actor_handle.rs Improve panic handling Co-authored-by: Paul Masurel <[email protected]> * Add universe.assert_quit() method Also adds a check to the universe.drop() to make sure that universe.assert_quit() was indeed called at the end of the test. * Stop test_ingest_api_source tests from hanging. * Fix compilation errors when testing a single crate Seems like rust-lang/cargo#8379 is the issue here. * Update assert_quits after merge * Avoid using map with side-effects. Co-authored-by: Paul Masurel <[email protected]> * Switch ActorJoinHandle to Shared * Explain why we join ingest source first * There should be only one universe. * Fix format --------- Co-authored-by: Paul Masurel <[email protected]>
1 parent e2b2002 commit 81159a3

37 files changed

+394
-100
lines changed

quickwit/quickwit-actors/src/actor_handle.rs

+4-10
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,20 @@ use std::fmt;
2121

2222
use serde::Serialize;
2323
use tokio::sync::{oneshot, watch};
24-
use tokio::task::JoinHandle;
2524
use tracing::error;
2625

2726
use crate::actor_state::ActorState;
2827
use crate::command::Observe;
2928
use crate::mailbox::Priority;
3029
use crate::observation::ObservationType;
30+
use crate::registry::ActorJoinHandle;
3131
use crate::{Actor, ActorContext, ActorExitStatus, Command, Mailbox, Observation};
3232

3333
/// An Actor Handle serves as an address to communicate with an actor.
3434
pub struct ActorHandle<A: Actor> {
3535
actor_context: ActorContext<A>,
3636
last_state: watch::Receiver<A::ObservableState>,
37-
join_handle: JoinHandle<ActorExitStatus>,
37+
join_handle: ActorJoinHandle,
3838
}
3939

4040
/// Describes the health of a given actor.
@@ -98,7 +98,7 @@ impl<A: Actor> Supervisable for ActorHandle<A> {
9898
impl<A: Actor> ActorHandle<A> {
9999
pub(crate) fn new(
100100
last_state: watch::Receiver<A::ObservableState>,
101-
join_handle: JoinHandle<ActorExitStatus>,
101+
join_handle: ActorJoinHandle,
102102
actor_context: ActorContext<A>,
103103
) -> Self {
104104
ActorHandle {
@@ -208,13 +208,7 @@ impl<A: Actor> ActorHandle<A> {
208208

209209
/// Waits until the actor exits by itself. This is the equivalent of `Thread::join`.
210210
pub async fn join(self) -> (ActorExitStatus, A::ObservableState) {
211-
let exit_status = self.join_handle.await.unwrap_or_else(|join_err| {
212-
if join_err.is_panic() {
213-
ActorExitStatus::Panicked
214-
} else {
215-
ActorExitStatus::Killed
216-
}
217-
});
211+
let exit_status = self.join_handle.join().await;
218212
let observation = self.last_state.borrow().clone();
219213
(exit_status, observation)
220214
}

quickwit/quickwit-actors/src/mailbox.rs

+3
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ mod tests {
530530
assert!(backpressure_micros_counter.get() < 500);
531531
processed.await.unwrap();
532532
assert!(backpressure_micros_counter.get() < 500);
533+
universe.assert_quit().await;
533534
}
534535

535536
#[tokio::test]
@@ -563,6 +564,7 @@ mod tests {
563564
.await
564565
.unwrap();
565566
assert!(backpressure_micros_counter.get() > 1_000u64);
567+
universe.assert_quit().await;
566568
}
567569

568570
#[tokio::test]
@@ -584,6 +586,7 @@ mod tests {
584586
let elapsed = start.elapsed();
585587
assert!(elapsed.as_micros() > 1000);
586588
assert_eq!(backpressure_micros_counter.get(), 0);
589+
universe.assert_quit().await;
587590
}
588591

589592
#[tokio::test]

quickwit/quickwit-actors/src/registry.rs

+79-3
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,25 @@
1919

2020
use std::any::{Any, TypeId};
2121
use std::collections::HashMap;
22+
use std::pin::Pin;
2223
use std::sync::{Arc, RwLock};
2324
use std::time::Duration;
2425

2526
use async_trait::async_trait;
26-
use futures::future;
27+
use futures::future::{self, Shared};
28+
use futures::{Future, FutureExt};
2729
use serde::Serialize;
2830
use serde_json::Value as JsonValue;
31+
use tokio::task::JoinHandle;
2932

3033
use crate::command::Observe;
3134
use crate::mailbox::WeakMailbox;
32-
use crate::{Actor, Mailbox};
35+
use crate::{Actor, ActorExitStatus, Command, Mailbox};
3336

3437
struct TypedJsonObservable<A: Actor> {
3538
actor_instance_id: String,
3639
weak_mailbox: WeakMailbox<A>,
40+
join_handle: ActorJoinHandle,
3741
}
3842

3943
#[async_trait]
@@ -42,6 +46,8 @@ trait JsonObservable: Sync + Send {
4246
fn any(&self) -> &dyn Any;
4347
fn actor_instance_id(&self) -> &str;
4448
async fn observe(&self) -> Option<JsonValue>;
49+
async fn quit(&self) -> ActorExitStatus;
50+
async fn join(&self) -> ActorExitStatus;
4551
}
4652

4753
#[async_trait]
@@ -64,6 +70,17 @@ impl<A: Actor> JsonObservable for TypedJsonObservable<A> {
6470
let state: <A as Actor>::ObservableState = oneshot_rx.await.ok()?;
6571
serde_json::to_value(&state).ok()
6672
}
73+
74+
async fn quit(&self) -> ActorExitStatus {
75+
if let Some(mailbox) = self.weak_mailbox.upgrade() {
76+
let _ = mailbox.send_message_with_high_priority(Command::Quit);
77+
}
78+
self.join().await
79+
}
80+
81+
async fn join(&self) -> ActorExitStatus {
82+
self.join_handle.join().await
83+
}
6784
}
6885

6986
#[derive(Default, Clone)]
@@ -104,7 +121,7 @@ pub struct ActorObservation {
104121
}
105122

106123
impl ActorRegistry {
107-
pub fn register<A: Actor>(&self, mailbox: &Mailbox<A>) {
124+
pub fn register<A: Actor>(&self, mailbox: &Mailbox<A>, join_handle: ActorJoinHandle) {
108125
let typed_id = TypeId::of::<A>();
109126
let actor_instance_id = mailbox.actor_instance_id().to_string();
110127
let weak_mailbox = mailbox.downgrade();
@@ -117,6 +134,7 @@ impl ActorRegistry {
117134
.push(Arc::new(TypedJsonObservable {
118135
weak_mailbox,
119136
actor_instance_id,
137+
join_handle,
120138
}));
121139
}
122140

@@ -162,6 +180,31 @@ impl ActorRegistry {
162180
registry_for_type.gc();
163181
}
164182
}
183+
184+
pub async fn quit(&self) -> Vec<ActorExitStatus> {
185+
let mut obs_futures = Vec::new();
186+
for registry_for_type in self.actors.read().unwrap().values() {
187+
for obs in &registry_for_type.observables {
188+
let obs_clone = obs.clone();
189+
obs_futures.push(async move { obs_clone.quit().await });
190+
}
191+
}
192+
let res = future::join_all(obs_futures).await;
193+
res.into_iter().collect()
194+
}
195+
196+
pub fn is_empty(&self) -> bool {
197+
self.actors
198+
.read()
199+
.unwrap()
200+
.values()
201+
.all(|registry_for_type| {
202+
registry_for_type
203+
.observables
204+
.iter()
205+
.all(|obs| obs.is_disconnected())
206+
})
207+
}
165208
}
166209

167210
fn get_iter<A: Actor>(
@@ -181,6 +224,37 @@ fn get_iter<A: Actor>(
181224
.filter(|mailbox| !mailbox.is_disconnected())
182225
}
183226

227+
/// This structure contains an optional exit handle. The handle is present
228+
/// until the join() method is called.
229+
#[derive(Clone)]
230+
pub(crate) struct ActorJoinHandle {
231+
holder: Shared<Pin<Box<dyn Future<Output = ActorExitStatus> + Send>>>,
232+
}
233+
234+
impl ActorJoinHandle {
235+
pub(crate) fn new(join_handle: JoinHandle<ActorExitStatus>) -> Self {
236+
ActorJoinHandle {
237+
holder: Self::inner_join(join_handle).boxed().shared(),
238+
}
239+
}
240+
241+
async fn inner_join(join_handle: JoinHandle<ActorExitStatus>) -> ActorExitStatus {
242+
join_handle.await.unwrap_or_else(|join_err| {
243+
if join_err.is_panic() {
244+
ActorExitStatus::Panicked
245+
} else {
246+
ActorExitStatus::Killed
247+
}
248+
})
249+
}
250+
251+
/// Joins the actor and returns its exit status on the frist invocation.
252+
/// Returns None afterwards.
253+
pub(crate) async fn join(&self) -> ActorExitStatus {
254+
self.holder.clone().await
255+
}
256+
}
257+
184258
#[cfg(test)]
185259
mod tests {
186260
use std::time::Duration;
@@ -194,6 +268,7 @@ mod tests {
194268
let universe = Universe::with_accelerated_time();
195269
let (_mailbox, _handle) = universe.spawn_builder().spawn(test_actor);
196270
let _actor_mailbox = universe.get_one::<PingReceiverActor>().unwrap();
271+
universe.assert_quit().await;
197272
}
198273

199274
#[tokio::test]
@@ -222,5 +297,6 @@ mod tests {
222297
let (_mailbox, _handle) = universe.spawn_builder().spawn(test_actor);
223298
let obs = universe.observe(Duration::from_millis(1000)).await;
224299
assert_eq!(obs.len(), 1);
300+
universe.assert_quit().await;
225301
}
226302
}

quickwit/quickwit-actors/src/scheduler.rs

+2
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ mod tests {
407407
assert_eq!(count.load(Ordering::SeqCst), 0);
408408
universe.sleep(Duration::from_millis(15)).await;
409409
assert_eq!(count.load(Ordering::SeqCst), 1);
410+
universe.assert_quit().await;
410411
}
411412

412413
#[tokio::test]
@@ -425,5 +426,6 @@ mod tests {
425426
let elapsed = start.elapsed();
426427
// The whole point is to accelerate time.
427428
assert!(elapsed.as_millis() < 50);
429+
universe.assert_quit().await;
428430
}
429431
}

quickwit/quickwit-actors/src/spawn_builder.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use tracing::{debug, error, info};
2424

2525
use crate::envelope::Envelope;
2626
use crate::mailbox::{create_mailbox, Inbox};
27-
use crate::registry::ActorRegistry;
27+
use crate::registry::{ActorJoinHandle, ActorRegistry};
2828
use crate::scheduler::{NoAdvanceTimeGuard, SchedulerClient};
2929
use crate::supervisor::Supervisor;
3030
use crate::{
@@ -161,11 +161,11 @@ impl<A: Actor> SpawnBuilder<A> {
161161
let (ctx, inbox, state_rx) = self.create_actor_context_and_inbox(&actor);
162162
debug!(actor_id = %ctx.actor_instance_id(), "spawn-actor");
163163
let mailbox = ctx.mailbox().clone();
164-
ctx.registry().register(&mailbox);
165164
let ctx_clone = ctx.clone();
166165
let loop_async_actor_future =
167166
async move { actor_loop(actor, inbox, no_advance_time_guard, ctx).await };
168-
let join_handle = runtime_handle.spawn(loop_async_actor_future);
167+
let join_handle = ActorJoinHandle::new(runtime_handle.spawn(loop_async_actor_future));
168+
ctx_clone.registry().register(&mailbox, join_handle.clone());
169169
let actor_handle = ActorHandle::new(state_rx, join_handle, ctx_clone);
170170
(mailbox, actor_handle)
171171
}

quickwit/quickwit-actors/src/supervisor.rs

+12
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,10 @@ mod tests {
285285
num_kills: 0
286286
}
287287
);
288+
assert!(!matches!(
289+
supervisor_handle.quit().await.0,
290+
ActorExitStatus::Panicked
291+
));
288292
}
289293

290294
#[tokio::test]
@@ -313,6 +317,10 @@ mod tests {
313317
num_kills: 0
314318
}
315319
);
320+
assert!(!matches!(
321+
supervisor_handle.quit().await.0,
322+
ActorExitStatus::Panicked
323+
));
316324
}
317325

318326
#[tokio::test]
@@ -354,6 +362,10 @@ mod tests {
354362
num_kills: 1
355363
}
356364
);
365+
assert!(!matches!(
366+
supervisor_handle.quit().await.0,
367+
ActorExitStatus::Panicked
368+
));
357369
}
358370

359371
#[tokio::test]

quickwit/quickwit-actors/src/tests.rs

+5
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ async fn test_timeouting_actor() {
283283
assert_eq!(buggy_handle.harvest_health(), Health::Healthy);
284284
universe.sleep(crate::HEARTBEAT * 2).await;
285285
assert_eq!(buggy_handle.harvest_health(), Health::FailureOrUnhealthy);
286+
buggy_handle.kill().await;
286287
}
287288

288289
#[tokio::test]
@@ -305,6 +306,7 @@ async fn test_pause_actor() {
305306
.is_ok());
306307
let end_state = ping_handle.process_pending_and_observe().await.state;
307308
assert_eq!(end_state, 1000);
309+
universe.assert_quit().await;
308310
}
309311

310312
#[tokio::test]
@@ -320,6 +322,7 @@ async fn test_actor_running_states() {
320322
assert_eq!(*obs, 10);
321323
universe.sleep(Duration::from_millis(1)).await;
322324
assert!(ping_handle.state() == ActorState::Idle);
325+
universe.assert_quit().await;
323326
}
324327

325328
#[derive(Clone, Debug, Default, Serialize)]
@@ -551,6 +554,7 @@ async fn test_actor_return_response() -> anyhow::Result<()> {
551554
let plus_two_plus_four = mailbox.send_message(AddOperand(4)).await?;
552555
assert_eq!(plus_two.await.unwrap(), 2);
553556
assert_eq!(plus_two_plus_four.await.unwrap(), 6);
557+
universe.assert_quit().await;
554558
Ok(())
555559
}
556560

@@ -631,4 +635,5 @@ async fn test_drain_is_called() {
631635
drain_calls_count: 2
632636
}
633637
);
638+
universe.assert_quit().await;
634639
}

0 commit comments

Comments
 (0)