diff --git a/src/gallium/frontends/rusticl/core/event.rs b/src/gallium/frontends/rusticl/core/event.rs index cc14ec6dd08..fc35d14f42e 100644 --- a/src/gallium/frontends/rusticl/core/event.rs +++ b/src/gallium/frontends/rusticl/core/event.rs @@ -14,6 +14,7 @@ use std::sync::Arc; use std::sync::Condvar; use std::sync::Mutex; use std::sync::MutexGuard; +use std::sync::Weak; use std::time::Duration; // we assert that those are a continous range of numbers so we won't have to use HashMaps @@ -47,7 +48,8 @@ pub struct Event { pub context: Arc, pub queue: Option>, pub cmd_type: cl_command_type, - pub deps: Vec>, + // using a Weak ref so we don't cause stack overflows in the `drop` impl + pub deps: Vec>, state: Mutex, cv: Condvar, } @@ -61,6 +63,7 @@ impl Event { deps: Vec>, work: EventSig, ) -> Arc { + let deps = deps.iter().map(Arc::downgrade).collect(); Arc::new(Self { base: CLObjectBase::new(RusticlTypes::Event), context: queue.context.clone(), @@ -228,14 +231,18 @@ impl Event { } } - fn deep_unflushed_deps_impl<'a>(&'a self, result: &mut HashSet<&'a Event>) { + pub fn deps(&self) -> impl Iterator> + Clone + '_ { + self.deps.iter().filter_map(Weak::upgrade) + } + + fn deep_unflushed_deps_impl(self: &Arc, result: &mut HashSet>) { if self.status() <= CL_SUBMITTED as i32 { return; } // only scan dependencies if it's a new one - if result.insert(self) { - for e in &self.deps { + if result.insert(Arc::clone(self)) { + for e in self.deps() { e.deep_unflushed_deps_impl(result); } } @@ -243,7 +250,7 @@ impl Event { /// does a deep search and returns a list of all dependencies including `events` which haven't /// been flushed out yet - pub fn deep_unflushed_deps(events: &[Arc]) -> HashSet<&Event> { + pub fn deep_unflushed_deps(events: &[Arc]) -> HashSet> { let mut result = HashSet::new(); for e in events { diff --git a/src/gallium/frontends/rusticl/core/queue.rs b/src/gallium/frontends/rusticl/core/queue.rs index 522d77e476c..a33aa0786cb 100644 --- a/src/gallium/frontends/rusticl/core/queue.rs +++ b/src/gallium/frontends/rusticl/core/queue.rs @@ -127,16 +127,17 @@ impl Queue { let mut flushed = Vec::new(); for e in new_events { + let deps_iter = e.deps(); + // If we hit any deps from another queue, flush so we don't risk a dead - // lock. - if e.deps.iter().any(|ev| ev.queue != e.queue) { + // lock. Also clone the iter here as we'll iterate again later + if deps_iter.clone().any(|ev| ev.queue != e.queue) { + // this flush _has_ to happen before we wait on any of the deps flush_events(&mut flushed, &ctx); } // We have to wait on user events or events from other queues. - let err = e - .deps - .iter() + let err = deps_iter .filter(|ev| ev.is_user() || ev.queue != e.queue) .map(|e| e.wait()) .find(|s| *s < 0);