Добрый день, уважаемые читатели . Очень часто в реальных проектах возникает необходимость выполнения определённых фрагментов программы на нескольких ядрах процессора. И сегодня я хочу обсудить реализацию такого приёма в программировании на Python, используя multiprocessing — модуль из стандартной библиотеки, впервые описанный в PEP371.
Подпишись на группу Вконтакте и Телеграм-канал. Там еще больше полезного контента для программистов.
А на YouTube-канале ты найдешь обучающие видео по программированию. Подписывайся!
Начнём с простого
Реализуем функцию, принимающую аргумент целочисленного типа и заставляющую «засыпать» программу на n секунд. Заранее импортируем multiprocessing
под псевдонимом mp
и time
.
import multiprocessing as mp from time import time, sleep def f(n:int) -> None: sleep(n)
Теперь запустим выполнение функции для чисел из отрезка [1, 4]
. Заранее засечем время с помощью модуля time
:
start = time() for i in range(1, 1000001): square(i) print(f"Script work time is {time() - start} s.")
На моей машине этот фрагмент кода выполняется, как и полагается, за 10 секунд. Давайте научимся использоваться multiprocessing
для таких целей.
if __name__ == '__main__': procs = list() start = time() for i in range(1, 5): proc = mp.Process(target=f, args=(i,)) procs.append(proc) proc.start() for proc in procs: proc.join() print(f"Script work time is {time() - start} s.")
Однако, с multiprocessing
код выполнился за 4.14 секунды. Итак, давайте разбираться. Для каждого значения из нашего отрезка мы создаём экземляр класса mp.Process
, конструктор которого принимает, в этом случае, два аргумента:
- target — функция, которую необходимо выполнить
- args — кортеж аргументов, которые необходимо передать в функцию
Далее с помощью метода start()
мы запускаем работу процесса. Как вы могли заметить, мы не зря создали список, куда добавляли процессы по мере их создания. Для каждого Process из списка мы вызываем метод join()
, и таким образом даём интерпретатору Python понять, что надо подождать, пока работа этого потока завершится.
Примечание. Не знаю как у вас, но у меня код отказывается выполняться без конструкции
if __name__ == '__main__'
. Именно поэтому я и добавил это в свою программу.
Замки в multiprocessing
Для того, чтобы не дать процессам конфликтовать, следует использовать mp.Lock. Использование этого класса очень просто: нам неоходимо использовать («повесить») замок перед выполнением какого-то фрагмента, а после снять его. Рассмотрим на практике:
def print_type(obj:object) -> None: print(f'Running with {current_process().name} \n') print(f'{obj} - {type(obj)}')
Так будет выглядеть наша функция без замыканий. Немного модернизируем её:
def print_type(obj:object, lock:mp.Lock) -> None: lock.release() print(f'Running with {mp.current_process().name} \n') print(f'{obj} - {type(obj)}') lock.acquire()
Вот и всё, так просто. Здесь же я хочу обсудить функцию current_procces()
, которая возвращает процесс, который в данный момент выполняет функцию, а также про параметр name в mp.Process
.
if __name__ == '__main__': procs = list() lock = mp.Lock() for i, obj in enumerate(['fff', 23, None, [1, 2, 3], (1, 2, 3), {1:2, 34}]): proc = mp.Process(target=print_type, args=(obj, lock), name=str(i)) procs.append(proc) proc.start() for proc in procs: proc.join()
С этим разобрались. Теперь поговорим о Pool
.
От map к Pool.map
Перейдём к более серьёзному примеру. Наша задача: определить кол-во чёрных пикселей в изображении ( (255, 255, 255) в формате RGB). Реализуем функцию, которая принимает в качестве аргумента путь к файлу, содержащему изображение (предположим, что черный пиксель имеет значения пикселей больше 250).
import numpy as np from PIL import Image def black_pixels(path:str) -> int: count = 0 image = np.array(Image.open(path)) return np.sum(image[::] > 250) print(black_pixels('car.jpg'))
Теперь создадим список с помощью os.listdir
. А также запустим выполнение функции для каждого изображения из списка
images = ['images\\' + image for image in os.listdir('images')] if __name__ == '__main__': pool = mp.Pool(processes=4) result = pool.map(black_pixels, images)
mp.Pool()
принимает параметр processes
, который устанавливает кол-во воркеров в пуле. А далее мы просто применяем «обычную» функцию map
, только в качестве метода класса Pool
.
Изменение глобальных переменных
Однажды мне было необходимо применить multiprocessing
к скрипту, который моделировал изменение общественного мнения. В основе скрипта была сеть NetworkX
, и я хотел изменять параметры вершин и ребёр этого графа. Однако сделать я этого не смог. Расммотрим пример:
hash_map = dict() def add_i(i:int) -> None: global hash_map hash_map[str(i)] = i if __name__ == '__main__': procs = list() for i in range(0, 10): proc = mp.Process(target=add_i, args=(i, )) procs.append(proc) proc.start() for proc in procs: proc.join() print(hash_map)
Да, как вы могли понять, список оказался пуст. Я отправился в дальнюю дорогу, чтобы открыть для себя новую тайну и понял, что нам прийдёт на помощь mp.Manager
, а именно его метод dict()
. Немного изменим программу:
def add_i(i:int, hash_map) -> None: hash_map[str(i)] = i if __name__ == '__main__': hash_map = mp.Manager().dict() procs = list() for i in range(0, 10): proc = mp.Process(target=add_i, args=(i, hash_map)) procs.append(proc) proc.start() for proc in procs: proc.join() print(hash_map)
Результата ожидаем. Если вам необходимо использовать список, то используйте mp.Manager().list()
. Да, как вы поняли, сеть NetworkX из своего скрипта я удалил, т.к. мог использовать только словарь или список.
Заключение
В этой статье мы обсудили базовые приёмы в работе с multiprocessing. Мы ускорили время выполнения программы, использовали map и научились изменять глобальные переменные. Надеюсь, из этой статьи вы подчеркнули что-то новое для себя.
Желаю быстрой компиляции и большого кол-ва потоков.
Также рекомендую прочитать статью Программирование графов на Python . А также подписывайтесь на группу ВКонтакте, Telegram и YouTube-канал. Там еще больше полезного и интересного для программистов.