@@ -774,23 +774,32 @@ o2::framework::ServiceSpec ArrowSupport::arrowTableSlicingCacheSpec()
774774 .preProcessing = [](ProcessingContext& pc, void * service_ptr) {
775775 auto * service = static_cast <ArrowTableSlicingCache*>(service_ptr);
776776 auto & caches = service->bindingsKeys ;
777- for (auto i = 0u ; i < caches.size (); ++i) {
778- if (caches[i].enabled && pc.inputs ().getPos (caches[i].binding .c_str ()) >= 0 ) {
779- auto status = service->updateCacheEntry (i, pc.inputs ().get <TableConsumer>(caches[i].matcher )->asArrowTable ());
780- if (!status.ok ()) {
781- throw runtime_error_f (" Failed to update slice cache for %s/%s" , caches[i].binding .c_str (), caches[i].key .c_str ());
782- }
783- }
784- }
777+
778+ int counter = 0 ;
779+ (void ) (caches |
780+ std::views::transform ([&counter](auto const & entry){ return std::make_pair (counter++, entry); }) |
781+ std::views::filter ([&pc](auto const & pair){ return pair.second .enabled && pc.inputs ().getPos (pair.second .matcher ) >= 0 ; }) |
782+ std::views::transform ([&service, &pc](auto const & pair){
783+ auto status = service->updateCacheEntry (pair.first , pc.inputs ().get <TableConsumer>(pair.second .matcher )->asArrowTable ());
784+ if (!status.ok ()) {
785+ throw runtime_error_f (" Failed to update slice cache for %s/%s: %s" , pair.second .binding .c_str (), pair.second .key .c_str (), status.ToString ().c_str ());
786+ }
787+ return status;
788+ }));
789+
790+ counter = 0 ;
785791 auto & unsortedCaches = service->bindingsKeysUnsorted ;
786- for (auto i = 0u ; i < unsortedCaches.size (); ++i) {
787- if (unsortedCaches[i].enabled && pc.inputs ().getPos (unsortedCaches[i].binding .c_str ()) >= 0 ) {
788- auto status = service->updateCacheEntryUnsorted (i, pc.inputs ().get <TableConsumer>(unsortedCaches[i].matcher )->asArrowTable ());
789- if (!status.ok ()) {
790- throw runtime_error_f (" failed to update slice cache (unsorted) for %s/%s" , unsortedCaches[i].binding .c_str (), unsortedCaches[i].key .c_str ());
791- }
792- }
793- } },
792+ (void ) (unsortedCaches |
793+ std::views::transform ([&counter](auto const & entry){ return std::make_pair (counter++, entry); }) |
794+ std::views::filter ([&pc](auto const & pair){ return pair.second .enabled && pc.inputs ().getPos (pair.second .matcher ) >= 0 ; }) |
795+ std::views::transform ([&service, &pc](auto const & pair){
796+ auto status = service->updateCacheEntryUnsorted (pair.first , pc.inputs ().get <TableConsumer>(pair.second .matcher )->asArrowTable ());
797+ if (!status.ok ()) {
798+ throw runtime_error_f (" Failed to update slice cache (unsorted) for %s/%s: %s" , pair.second .binding .c_str (), pair.second .key .c_str (), status.ToString ().c_str ());
799+ }
800+ return status;
801+ }));
802+ },
794803 .kind = ServiceKind::Stream};
795804}
796805
0 commit comments