@@ -182,23 +182,16 @@ impl<'e, B: BackingStorage> ExecuteContextImpl<'e, B> {
182182 self . backend . should_restore ( ) && self . backend . local_is_partial
183183 }
184184
185- fn restore_task_data (
185+ fn lookup_task_data (
186186 & self ,
187187 task_id : TaskId ,
188188 category : SpecificTaskDataCategory ,
189- ) -> TaskStorage {
189+ ) -> Option < turbo_persistence :: ArcBytes > {
190190 if !self . should_check_backing_storage ( ) {
191- // If we don't need to restore, we can just return an empty storage
192- return TaskStorage :: default ( ) ;
191+ return None ;
193192 }
194- let mut storage = TaskStorage :: default ( ) ;
195- let result = self
196- . backend
197- . backing_storage
198- . lookup_data ( task_id, category, & mut storage) ;
199-
200- match result {
201- Ok ( ( ) ) => storage,
193+ match self . backend . backing_storage . lookup_data ( task_id, category) {
194+ Ok ( bytes) => bytes,
202195 Err ( e) => {
203196 panic ! (
204197 "Failed to restore task data (corrupted database or bug): {:?}" ,
@@ -531,28 +524,39 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> {
531524 category. includes_meta ( ) && !task. flags . is_restored ( TaskDataCategory :: Meta ) ;
532525
533526 if needs_data || needs_meta {
534- // Avoid holding the lock too long since this can also affect other tasks
535- // Drop lock once, do all I/O, then re-acquire once
527+ // Avoid holding the lock too long since this can also affect other tasks.
528+ // Drop lock once, do all I/O (returning raw bytes), then re-acquire once
529+ // and decode directly into the live task — no scratch TaskStorage needed.
536530 drop ( task) ;
537531
538- let storage_data = needs_data
539- . then ( || self . restore_task_data ( task_id, SpecificTaskDataCategory :: Data ) ) ;
540- let storage_meta = needs_meta
541- . then ( || self . restore_task_data ( task_id, SpecificTaskDataCategory :: Meta ) ) ;
532+ let bytes_data = needs_data
533+ . then ( || self . lookup_task_data ( task_id, SpecificTaskDataCategory :: Data ) ) ;
534+ let bytes_meta = needs_meta
535+ . then ( || self . lookup_task_data ( task_id, SpecificTaskDataCategory :: Meta ) ) ;
542536
543537 task = self . backend . storage . access_mut ( task_id) ;
544538
545- // Handle race conditions and merge
546- if let Some ( storage) = storage_data
547- && !task. flags . is_restored ( TaskDataCategory :: Data )
548- {
549- task. restore_from ( storage, TaskDataCategory :: Data ) ;
539+ // Handle race conditions and decode directly into live storage.
540+ // set_restored is called even when no bytes found (task simply has no
541+ // persisted data for this category).
542+ if needs_data && !task. flags . is_restored ( TaskDataCategory :: Data ) {
543+ if let Some ( bytes) = bytes_data. flatten ( ) {
544+ let mut decoder = new_turbo_bincode_decoder ( & bytes) ;
545+ task. decode ( SpecificTaskDataCategory :: Data , & mut decoder)
546+ . unwrap_or_else ( |e| {
547+ panic ! ( "Failed to decode Data for {task_id:?}: {e:?}" )
548+ } ) ;
549+ }
550550 task. flags . set_restored ( TaskDataCategory :: Data ) ;
551551 }
552- if let Some ( storage) = storage_meta
553- && !task. flags . is_restored ( TaskDataCategory :: Meta )
554- {
555- task. restore_from ( storage, TaskDataCategory :: Meta ) ;
552+ if needs_meta && !task. flags . is_restored ( TaskDataCategory :: Meta ) {
553+ if let Some ( bytes) = bytes_meta. flatten ( ) {
554+ let mut decoder = new_turbo_bincode_decoder ( & bytes) ;
555+ task. decode ( SpecificTaskDataCategory :: Meta , & mut decoder)
556+ . unwrap_or_else ( |e| {
557+ panic ! ( "Failed to decode Meta for {task_id:?}: {e:?}" )
558+ } ) ;
559+ }
556560 task. flags . set_restored ( TaskDataCategory :: Meta ) ;
557561 }
558562 }
@@ -625,47 +629,69 @@ impl<'e, B: BackingStorage> ExecuteContext<'e> for ExecuteContextImpl<'e, B> {
625629 category. includes_meta ( ) && !task2. flags . is_restored ( TaskDataCategory :: Meta ) ;
626630
627631 if needs_data1 || needs_meta1 || needs_data2 || needs_meta2 {
628- // Avoid holding the lock too long since this can also affect other tasks
629- // Drop locks once, do all I/O, then re-acquire once
632+ // Avoid holding the lock too long since this can also affect other tasks.
633+ // Drop locks once, do all I/O (returning raw bytes), then re-acquire once
634+ // and decode directly into the live tasks — no scratch TaskStorage needed.
630635 drop ( task1) ;
631636 drop ( task2) ;
632637
633- let storage_data1 = needs_data1
634- . then ( || self . restore_task_data ( task_id1, SpecificTaskDataCategory :: Data ) ) ;
635- let storage_meta1 = needs_meta1
636- . then ( || self . restore_task_data ( task_id1, SpecificTaskDataCategory :: Meta ) ) ;
637- let storage_data2 = needs_data2
638- . then ( || self . restore_task_data ( task_id2, SpecificTaskDataCategory :: Data ) ) ;
639- let storage_meta2 = needs_meta2
640- . then ( || self . restore_task_data ( task_id2, SpecificTaskDataCategory :: Meta ) ) ;
638+ let bytes_data1 = needs_data1
639+ . then ( || self . lookup_task_data ( task_id1, SpecificTaskDataCategory :: Data ) ) ;
640+ let bytes_meta1 = needs_meta1
641+ . then ( || self . lookup_task_data ( task_id1, SpecificTaskDataCategory :: Meta ) ) ;
642+ let bytes_data2 = needs_data2
643+ . then ( || self . lookup_task_data ( task_id2, SpecificTaskDataCategory :: Data ) ) ;
644+ let bytes_meta2 = needs_meta2
645+ . then ( || self . lookup_task_data ( task_id2, SpecificTaskDataCategory :: Meta ) ) ;
641646
642647 let ( t1, t2) = self . backend . storage . access_pair_mut ( task_id1, task_id2) ;
643648 task1 = t1;
644649 task2 = t2;
645650
646- // Merge results, handling race conditions
647- if let Some ( storage) = storage_data1
648- && !task1. flags . is_restored ( TaskDataCategory :: Data )
649- {
650- task1. restore_from ( storage, TaskDataCategory :: Data ) ;
651+ // Decode directly into live storage, handling race conditions.
652+ // set_restored is called even when no bytes found.
653+ if needs_data1 && !task1. flags . is_restored ( TaskDataCategory :: Data ) {
654+ if let Some ( bytes) = bytes_data1. flatten ( ) {
655+ let mut decoder = new_turbo_bincode_decoder ( & bytes) ;
656+ task1
657+ . decode ( SpecificTaskDataCategory :: Data , & mut decoder)
658+ . unwrap_or_else ( |e| {
659+ panic ! ( "Failed to decode Data for {task_id1:?}: {e:?}" )
660+ } ) ;
661+ }
651662 task1. flags . set_restored ( TaskDataCategory :: Data ) ;
652663 }
653- if let Some ( storage) = storage_meta1
654- && !task1. flags . is_restored ( TaskDataCategory :: Meta )
655- {
656- task1. restore_from ( storage, TaskDataCategory :: Meta ) ;
664+ if needs_meta1 && !task1. flags . is_restored ( TaskDataCategory :: Meta ) {
665+ if let Some ( bytes) = bytes_meta1. flatten ( ) {
666+ let mut decoder = new_turbo_bincode_decoder ( & bytes) ;
667+ task1
668+ . decode ( SpecificTaskDataCategory :: Meta , & mut decoder)
669+ . unwrap_or_else ( |e| {
670+ panic ! ( "Failed to decode Meta for {task_id1:?}: {e:?}" )
671+ } ) ;
672+ }
657673 task1. flags . set_restored ( TaskDataCategory :: Meta ) ;
658674 }
659- if let Some ( storage) = storage_data2
660- && !task2. flags . is_restored ( TaskDataCategory :: Data )
661- {
662- task2. restore_from ( storage, TaskDataCategory :: Data ) ;
675+ if needs_data2 && !task2. flags . is_restored ( TaskDataCategory :: Data ) {
676+ if let Some ( bytes) = bytes_data2. flatten ( ) {
677+ let mut decoder = new_turbo_bincode_decoder ( & bytes) ;
678+ task2
679+ . decode ( SpecificTaskDataCategory :: Data , & mut decoder)
680+ . unwrap_or_else ( |e| {
681+ panic ! ( "Failed to decode Data for {task_id2:?}: {e:?}" )
682+ } ) ;
683+ }
663684 task2. flags . set_restored ( TaskDataCategory :: Data ) ;
664685 }
665- if let Some ( storage) = storage_meta2
666- && !task2. flags . is_restored ( TaskDataCategory :: Meta )
667- {
668- task2. restore_from ( storage, TaskDataCategory :: Meta ) ;
686+ if needs_meta2 && !task2. flags . is_restored ( TaskDataCategory :: Meta ) {
687+ if let Some ( bytes) = bytes_meta2. flatten ( ) {
688+ let mut decoder = new_turbo_bincode_decoder ( & bytes) ;
689+ task2
690+ . decode ( SpecificTaskDataCategory :: Meta , & mut decoder)
691+ . unwrap_or_else ( |e| {
692+ panic ! ( "Failed to decode Meta for {task_id2:?}: {e:?}" )
693+ } ) ;
694+ }
669695 task2. flags . set_restored ( TaskDataCategory :: Meta ) ;
670696 }
671697 }
0 commit comments