Source code for atomistics.shared.parallel
from collections.abc import Callable
from concurrent.futures import Executor
from typing import Any
def _convert_task_dict_to_task_lst(task_dict: dict) -> list:
"""
Convert a task dictionary to a list of tasks.
Args:
task_dict (dict): The task dictionary to be converted.
Returns:
list: A list of tasks.
"""
task_lst = []
for task_name, task_data in task_dict.items():
if isinstance(task_data, dict):
for task_parameter, task_object in task_data.items():
task_lst.append({task_name: {task_parameter: task_object}})
else:
task_lst.append({task_name: task_data})
return task_lst
def _convert_task_lst_to_task_dict(task_lst: list) -> dict:
"""
Convert a list of tasks into a dictionary representation.
Args:
task_lst (list): A list of tasks.
Returns:
dict: A dictionary representation of the tasks.
"""
task_dict: dict[Any, Any] = {}
for task in task_lst:
for task_name, task_data in task.items():
if isinstance(task_data, dict):
if task_name not in task_dict:
task_dict[task_name] = {}
task_dict[task_name].update(dict(task_data.items()))
else:
task_dict[task_name] = task_data
return task_dict
[docs]
def evaluate_with_parallel_executor(
evaluate_function: Callable[..., dict],
task_dict: dict,
executor: Executor,
**kwargs,
) -> dict:
"""
Executes the given `evaluate_function` in parallel using the provided `executor` and returns the results as a dictionary.
Args:
evaluate_function (callable): The function to be executed in parallel.
task_dict (dict): A dictionary containing the tasks to be executed.
executor (Executor): The executor to be used for parallel execution.
**kwargs: Additional keyword arguments to be passed to the `evaluate_function`.
Returns:
dict: A dictionary containing the results of the parallel execution.
"""
future_lst: list[Any] = [
executor.submit(evaluate_function, task_dict=task, **kwargs)
for task in _convert_task_dict_to_task_lst(task_dict=task_dict)
]
return _convert_task_lst_to_task_dict(
task_lst=[future.result() for future in future_lst]
)