@@ -291,116 +291,85 @@ struct ContributeVolumeDataToWriter {
291291 VolumeDataAtObsId received_volume_data) {
292292 // The below gymnastics with pointers is done in order to minimize the
293293 // time spent locking the entire node, which is necessary because the
294- // DataBox does not allow any functions calls, both get and mutate, during
295- // a mutate. This design choice in DataBox is necessary to guarantee a
296- // consistent state throughout mutation. Here, however, we need to be
297- // reasonable efficient in parallel and so we manually guarantee that
298- // consistent state. To this end, we create pointers and assign to them
299- // the data in the DataBox which is guaranteed to be pointer stable. The
300- // data itself is guaranteed to be stable inside the VolumeDataLock.
301- typename TensorDataTag::type* all_volume_data = nullptr;
302- VolumeDataAtObsId volume_data;
294+ // DataBox does not allow any function calls, either get or mutate, during
295+ // a mutate. We separate out writing from the operations that edit the
296+ // DataBox since writing to disk can be very slow, but moving data around is
297+ // comparatively quick.
303298 Parallel::NodeLock* volume_file_lock = nullptr;
304- std::unordered_map<ObservationId,
305- std::unordered_set<Parallel::ArrayComponentId>>*
306- volume_observers_contributed = nullptr;
307- Parallel::NodeLock* volume_data_lock = nullptr;
308- size_t observations_registered_with_id = std::numeric_limits<size_t>::max();
299+ bool perform_write = false;
300+ VolumeDataAtObsId volume_data{};
309301
310302 {
311303 const std::lock_guard hold_lock(*node_lock);
312- db::mutate<TensorDataTag, Tags::ContributorsOfTensorData,
313- Tags::VolumeDataLock, Tags::H5FileLock>(
314- [&observation_id, &observations_registered_with_id,
315- &observer_group_id, &all_volume_data, &volume_observers_contributed,
316- &volume_data_lock, &volume_file_lock](
317- const gsl::not_null<typename TensorDataTag::type*>
318- volume_data_ptr,
319- const gsl::not_null<std::unordered_map<
320- ObservationId,
321- std::unordered_set<Parallel::ArrayComponentId>>*>
322- volume_observers_contributed_ptr,
323- const gsl::not_null<Parallel::NodeLock*> volume_data_lock_ptr,
324- const gsl::not_null<Parallel::NodeLock*> volume_file_lock_ptr,
325- const std::unordered_map<
326- ObservationKey,
327- std::unordered_set<Parallel::ArrayComponentId>>&
328- observations_registered) {
329- const ObservationKey& key{observation_id.observation_key()};
330- if (const auto& registered_group_ids =
331- observations_registered.find(key);
332- LIKELY(registered_group_ids != observations_registered.end())) {
333- if (UNLIKELY(
334- registered_group_ids->second.find(observer_group_id) ==
335- registered_group_ids->second.end())) {
336- ERROR("The observer group id "
337- << observer_group_id
338- << " was not registered for the observation id "
339- << observation_id);
340- }
341- } else {
342- ERROR("key "
343- << key
344- << " not in the registered group ids. Known keys are "
345- << keys_of(observations_registered));
346- }
347304
348- all_volume_data = &*volume_data_ptr;
349- volume_observers_contributed = &*volume_observers_contributed_ptr;
350- volume_data_lock = &*volume_data_lock_ptr;
351- observations_registered_with_id =
352- observations_registered.at(key).size();
305+ // Set file lock for later
306+ db::mutate<Tags::H5FileLock>(
307+ [&volume_file_lock](
308+ const gsl::not_null<Parallel::NodeLock*> volume_file_lock_ptr) {
353309 volume_file_lock = &*volume_file_lock_ptr;
354310 },
355- make_not_null(&box),
356- db::get<Tags::ExpectedContributorsForObservations>(box));
357- }
311+ make_not_null(&box));
358312
359- ASSERT(all_volume_data != nullptr,
360- "Failed to set all_volume_data in the mutate");
361- ASSERT(volume_file_lock != nullptr,
362- "Failed to set volume_file_lock in the mutate");
363- ASSERT(volume_observers_contributed != nullptr,
364- "Failed to set volume_observers_contributed in the mutate");
365- ASSERT(volume_data_lock != nullptr,
366- "Failed to set volume_data_lock in the mutate");
367- ASSERT(
368- observations_registered_with_id != std::numeric_limits<size_t>::max(),
369- "Failed to set observations_registered_with_id when mutating the "
370- "DataBox. This is a bug in the code.");
313+ ASSERT(volume_file_lock != nullptr,
314+ "Failed to set volume_file_lock in the mutate");
315+
316+ const auto& observations_registered =
317+ db::get<Tags::ExpectedContributorsForObservations>(box);
318+
319+ const ObservationKey& key = observation_id.observation_key();
320+ if (LIKELY(observations_registered.contains(key))) {
321+ if (UNLIKELY(not observations_registered.at(key).contains(
322+ observer_group_id))) {
323+ ERROR("The observer group id "
324+ << observer_group_id
325+ << " was not registered for the observation id "
326+ << observation_id);
327+ }
328+ } else {
329+ ERROR("key " << key
330+ << " not in the registered group ids. Known keys are "
331+ << keys_of(observations_registered));
332+ }
333+
334+ const size_t observations_registered_with_id =
335+ observations_registered.at(key).size();
336+
337+ // Ok because we have the node lock
338+ auto& volume_observers_contributed =
339+ db::get_mutable_reference<Tags::ContributorsOfTensorData>(
340+ make_not_null(&box));
341+ auto& all_volume_data =
342+ db::get_mutable_reference<TensorDataTag>(make_not_null(&box));
371343
372- bool perform_write = false;
373- {
374- const std::lock_guard hold_lock(*volume_data_lock);
375344 auto& contributed_group_ids =
376- (*volume_observers_contributed)[observation_id];
345+ volume_observers_contributed[observation_id];
377346
378- if (UNLIKELY(contributed_group_ids.find(observer_group_id) !=
379- contributed_group_ids.end())) {
347+ if (UNLIKELY(contributed_group_ids.contains(observer_group_id))) {
380348 ERROR("Already received reduction data to observation id "
381349 << observation_id << " from array component id "
382350 << observer_group_id);
383351 }
384352 contributed_group_ids.insert(observer_group_id);
385353
386- if (all_volume_data->find(observation_id) == all_volume_data->end()) {
387- // We haven't been called before on this processing element.
388- all_volume_data->operator[](observation_id) =
389- std::move(received_volume_data);
390- } else {
391- auto& current_data = all_volume_data->at(observation_id);
354+ // Add received volume data to the box
355+ if (all_volume_data.contains(observation_id)) {
356+ auto& current_data = all_volume_data.at(observation_id);
392357 current_data.insert(
393358 std::make_move_iterator(received_volume_data.begin()),
394359 std::make_move_iterator(received_volume_data.end()));
360+ } else {
361+ // We haven't been called before on this processing element.
362+ all_volume_data[observation_id] = std::move(received_volume_data);
395363 }
364+
396365 // Check if we have received all "volume" data from the Observer
397366 // group. If so we write to disk.
398- if (volume_observers_contributed->at(observation_id).size() ==
367+ if (volume_observers_contributed.at(observation_id).size() ==
399368 observations_registered_with_id) {
400369 perform_write = true;
401- volume_data = std::move(all_volume_data->operator[](observation_id));
402- all_volume_data->erase(observation_id);
403- volume_observers_contributed->erase(observation_id);
370+ volume_data = std::move(all_volume_data[observation_id]);
371+ all_volume_data.erase(observation_id);
372+ volume_observers_contributed.erase(observation_id);
404373 }
405374 }
406375
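
To make the locking pattern in the new version easier to follow, here is a minimal, self-contained C++ sketch of the same idea. The Aggregator struct, its contribute function, std::mutex in place of Parallel::NodeLock, plain maps in place of the DataBox tags, and std::cout in place of the HDF5 writer are all hypothetical stand-ins; this illustrates the pattern, not the SpECTRE API.

#include <cstddef>
#include <iostream>
#include <iterator>
#include <mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

// Hypothetical stand-ins: plain maps and std::mutex replace the DataBox
// tags, Parallel::NodeLock, and the HDF5 file; std::cout stands in for the
// (slow) write to disk.
struct Aggregator {
  std::mutex node_lock;  // guards the two maps below
  std::mutex file_lock;  // guards the write to "disk"
  std::unordered_map<int, std::vector<double>> volume_data;
  std::unordered_map<int, std::unordered_set<std::string>> contributors;

  void contribute(const int observation_id, const std::string& group_id,
                  std::vector<double> received,
                  const std::size_t expected_contributions) {
    bool perform_write = false;
    std::vector<double> data_to_write{};

    {
      // Hold the node lock only while bookkeeping and moving data around,
      // which is cheap compared to writing to disk. (The real action also
      // errors on duplicate contributions; omitted here for brevity.)
      const std::lock_guard hold_lock(node_lock);
      contributors[observation_id].insert(group_id);

      auto& current = volume_data[observation_id];
      current.insert(current.end(), std::make_move_iterator(received.begin()),
                     std::make_move_iterator(received.end()));

      // Once every registered group has contributed, move the data out of
      // the shared maps and clean up so the node lock can be released.
      if (contributors.at(observation_id).size() == expected_contributions) {
        perform_write = true;
        data_to_write = std::move(current);
        volume_data.erase(observation_id);
        contributors.erase(observation_id);
      }
    }  // node lock released here

    if (perform_write) {
      // Only the slow write happens under the file lock, so other cores on
      // the node can keep contributing data in the meantime.
      const std::lock_guard hold_file_lock(file_lock);
      std::cout << "writing " << data_to_write.size()
                << " values for observation " << observation_id << "\n";
    }
  }
};

int main() {
  Aggregator aggregator{};
  // Two observer groups contribute to the same observation id; the "write"
  // is triggered by the second (final) contribution.
  aggregator.contribute(0, "GroupA", {1.0, 2.0}, 2);
  aggregator.contribute(0, "GroupB", {3.0}, 2);
}

The design point mirrored from the diff is that perform_write is decided and the data moved out while the node lock is held, but that lock is released before the write, so a slow disk write never blocks other contributions on the node.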