是的,那是可行的。您的计算不依赖于中间结果,因此您可以轻松地将任务划分为多个块并将其分布在多个流程中。这就是所谓的
令人尴尬的并行问题 。
这里唯一棘手的部分可能是,首先将范围分成相当相等的部分。理顺我的个人lib两个功能来处理此问题:
# mp_utils.pyfrom itertools import accumulatedef calc_batch_sizes(n_tasks: int, n_workers: int) -> list: """Divide `n_tasks` optimally between n_workers to get batch_sizes. Guarantees batch sizes won't differ for more than 1. Example: # >>>calc_batch_sizes(23, 4) # Out: [6, 6, 6, 5] In case you're going to use numpy anyway, use np.array_split: [len(a) for a in np.array_split(np.arange(23), 4)] # Out: [6, 6, 6, 5] """ x = int(n_tasks / n_workers) y = n_tasks % n_workers batch_sizes = [x + (y > 0)] * y + [x] * (n_workers - y) return batch_sizesdef build_batch_ranges(batch_sizes: list) -> list: """Build batch_ranges from list of batch_sizes. Example: # batch_sizes [6, 6, 6, 5] # >>>build_batch_ranges(batch_sizes) # Out: [range(0, 6), range(6, 12), range(12, 18), range(18, 23)] """ upper_bounds = [*accumulate(batch_sizes)] lower_bounds = [0] + upper_bounds[:-1] batch_ranges = [range(l, u) for l, u in zip(lower_bounds, upper_bounds)] return batch_ranges
然后您的主脚本将如下所示:
import timefrom multiprocessing import Poolfrom mp_utils import calc_batch_sizes, build_batch_rangesdef target_foo(batch_range): return sum(batch_range) # ~ 6x faster than target_foo1def target_foo1(batch_range): numbers = [] for num in batch_range: numbers.append(num) return sum(numbers)if __name__ == '__main__': N = 100000000 N_CORES = 4 batch_sizes = calc_batch_sizes(N, n_workers=N_CORES) batch_ranges = build_batch_ranges(batch_sizes) start = time.perf_counter() with Pool(N_CORES) as pool: result = pool.map(target_foo, batch_ranges) r_sum = sum(result) print(r_sum) print(f'elapsed: {time.perf_counter() - start:.2f} s')
请注意,我也将for循环切换为range对象的简单总和,因为它提供了更好的性能。如果您无法在实际的应用程序中执行此 *** 作,则列表理解仍比示例中的手动填充列表要快60%。
示例输出:
4999999950000000elapsed: 0.51 sProcess finished with exit pre 0
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)