Parallel python
Bо-первых надо вывести нормальное окно вида прогресс-бар на время выполнения, но снаружи progress bar не хочет обновляться, даже если создать wx.Timer или wx.PyTimer и повесить на него обновление просто по времени. И вообще на время выполнения pp события wx не работают. А изнутри wx объект wx другой и рабочее приложение и его окно нельзя получить даже посредством wx.GetApp().GetTopWindow().
Вторая проблема - логирование. Во время работы wx у меня есть свой модуль, который при первом запуске создаёт переменную модуля вида "время_запуска" и файл такого же вида, и при всех дальнейших обращениях из программы пишет в этот файл. А изнутри pp это в каждом потоке новый модуль и получается, что каждая задача внутри pp генерит новое время запуска и новый файл лога.
А третья проблема - доступ к БД. У меня MongoDB через стандартный pymongo, дык вот каждый процесс создаёт новое подключение.
Вообще все проблемы решаемы, если в функцию, которая запускается pp, передавать аргументами сами объекты типа wx.GetApp(), pymongo.Connection() и тупо имя файла лога, но мне такой подход не нравится. Не должны по идее такие объекты идти аргументами, они должны быть глобальные. Но пока других решений я не вижу.
Заранее спасибо
Цитата: Mr.Hacker
Bо-первых надо вывести нормальное окно вида прогресс-бар на время выполнения, но снаружи progress bar не хочет обновляться, даже если создать wx.Timer или wx.PyTimer и повесить на него обновление просто по времени.
Тут есть альтернатива потокам: сделать функцию, отвечающую за обмен, генераторной (то есть, чтобы она возвращала данные по yield). При каждой итерации обновляем прогресс-бар.
Короче, можно смотреть в сторону гринлетов (green threads, greenlets).
Цитата: Mr.Hacker
Вообще все проблемы решаемы, если в функцию, которая запускается pp, передавать аргументами сами объекты типа wx.GetApp(), pymongo.Connection() и тупо имя файла лога, но мне такой подход не нравится. Не должны по идее такие объекты идти аргументами, они должны быть глобальные. Но пока других решений я не вижу.
Глобальные переменные - это зло, если говорим не о коротеньких скриптах. Код с ними труднее тестировать, отлаживать и повторно использовать. Лучше передавать аргументами, либо сделать их данными какого-либо небольшого класса (см. ООП).
про green threads почитаю.. yield правда не вижу как мне поможет, у функций, которые вызываются через pp, нормального возврата нет, они там по идее обёрнуты лямбда-функцией, которая их возвращает по завершению...
глобальные переменные зло? а как может быть объект wx по-другому, мы пишем import wx во всех наших файлах, а потом у нас методы либо сами вызываются как обработчики событий, либо ручками через wx.GetApp().GetMainWindow(), сам wx-то глобальный, а у методов итак первый self в определении функции на него указывает..
http://www.parallelpython.com/
Я с ним не работал, ибо мне не нужен сторонний модуль, который обеспечивает функционал того, что есть в стандартном (multiprocessing). То есть работа на нескольких процессорах. Или там есть ещё что-то полезное?
Если интересно, то дальше могу отвечать по стандартным модулям. Если ты делаешь правильно, что у тебя будет процесс для GUI и процесс(ы) для обработчика данных. Каким образом процессы обмениваются данными? Они обмениваются через каналы или очереди. То есть тебе надо читать про них.
Кроме того, в multiprocessing есть интересная штука multiprocessing.Manager(), который облегчает обмен данными в некоторых случаях.
Чтобы что-то подробнее обсуждать, нужен конкретный код. Можешь выделить минимальный код, который у тебя не работает?
Что касается wx, то у него тоже есть некоторые особенности, но они относятся скорее к потокам. Можно посмотреть, где применяют wx.CallAfter(), например.
Цитата: Mr.Hacker
глобальные переменные зло? а как может быть объект wx по-другому, мы пишем import wx во всех наших файлах, а потом у нас методы либо сами вызываются как обработчики событий, либо ручками через wx.GetApp().GetMainWindow(), сам wx-то глобальный, а у методов итак первый self в определении функции на него указывает..
wx - это модуль, а не глобальная переменная.
wx.GetApp().GetMainWindow() - а тут мы обращаемся к синглтонам. Что, в общем, тоже не есть хорошо, но уже лучше чем глобальные переменные.
по мануалу..
процессы у меня данными между собой не обмениваются вообще, а наверх максимум что он должен послать - то, что он отработал (вывести в лог и в окно прогресса) и засторить результат..
код упрощённо такой:
ui:
main UI:
logic:
Да, прошу прощения за терминологию, wx конечно модуль, но я его просто для себя как объект-переменную воспринимаю. И внутри его wx.GetApp().GetTopWindow() ведут себя как синглетоны, но внутри pp - нет. То есть у него внутри это почему-то другие объекты, причём похоже что у каждого потока свои, ибо если я туда в модули добавлю свой класс-синглетон logic.patients, который, помимо всего, вызывает синглетоновое pymongo, то в итоге на серваке я получу одновременных новых входящих коннектов с число тредов)) и переменные внутри модуля тоже новые:
например, logic.config as params:
из разных файлов, которые импортят logic.config лог спокойно пишется в один файл, а внутри pp у каждого curApp получается свой
и таймер, который во втором листинге создаётся в makeProgress() просто виснет во время работы pp.. пробовал внутри makeProgress сделать threading.Thread и в нём создать прогресс, дык тоже виснет)
да, модуль этот.. со стандартным multiprocessing у меня что-то не вышло, хотя пытался
процессы у меня данными между собой не обмениваются вообще, а наверх максимум что он должен послать - то, что он отработал (вывести в лог и в окно прогресса) и засторить результат..
код упрощённо такой:
ui:
Код:
class mcnpPrefPanel(wx.Panel):
def __init__(self, parent, **kwargs):
wx.Panel.__init__(self, parent, style=0)
...
self.Bind(wx.EVT_BUTTON, self.save, id=btnSave)
def save(self, event):
# тут сначала всякие проверки на корректность данных
wx.GetApp().GetTopWindow().makeProgress(_('save mcnp'), _('saving outfile for mcnp'))
collect = self.collect()
self.pat.setRunPref(collect)
params.log('Run preferences: ')
params.log(collect)
mcnpparser.calculateMCNP()
wx.GetApp().GetTopWindow().updateProgress()
wx.GetApp().GetTopWindow().destroyProgress()
self.pat.setCurrentPatient({'status' : 5})
pass
def __init__(self, parent, **kwargs):
wx.Panel.__init__(self, parent, style=0)
...
self.Bind(wx.EVT_BUTTON, self.save, id=btnSave)
def save(self, event):
# тут сначала всякие проверки на корректность данных
wx.GetApp().GetTopWindow().makeProgress(_('save mcnp'), _('saving outfile for mcnp'))
collect = self.collect()
self.pat.setRunPref(collect)
params.log('Run preferences: ')
params.log(collect)
mcnpparser.calculateMCNP()
wx.GetApp().GetTopWindow().updateProgress()
wx.GetApp().GetTopWindow().destroyProgress()
self.pat.setCurrentPatient({'status' : 5})
pass
main UI:
Код:
class mainWin(wx.Frame):
...
def makeProgress(self, title, caption, noModal=False):
"""
Создаёт прогресс-диалог
@param title: заголовок
@type title: string
@param caption: текст окна
@type caption: string
@param noModal: модальное или нет
@type noModal: boolean
"""
params.log('Creating progress bar with title: %s' %title)
style = wx.PD_ELAPSED_TIME
if (not noModal):
style |= wx.PD_APP_MODAL
self.dlg = PP.PyProgress(wx.GetApp().GetTopWindow(), -1, title,
caption,
agwStyle= style,
)
self.dlg.SetGaugeProportion(0.25)
self.dlg.SetGaugeSteps(18)
self.dlg.SetGaugeBackground(wx.WHITE)
self.dlg.SetFirstGradientColour(wx.WHITE)
self.dlg.SetSecondGradientColour(wx.RED)
self.dlg.SetSize((500, 180))
self.timer = wx.PyTimer(self.updateProgress)
self.timer.Start(1000)
self.updateProgress()
def updateProgress(self, data=''):
"""
Обновляет окно прогресс-бара
@param data: новый текст. Если пустая строка - обновляться не будет
@type data: string
"""
try:
if ('' == data):
self.dlg.UpdatePulse()
else:
self.dlg.UpdatePulse(data)
except:
pass
def destroyProgress(self):
"""
Удаляет прогресс-бар
"""
self.dlg.Destroy()
try:
self.timer.Stop()
self.timer.Destroy()
except:
pass
wx.SafeYield()
wx.GetApp().GetTopWindow().Raise()
params.log('Progress closed')
...
def makeProgress(self, title, caption, noModal=False):
"""
Создаёт прогресс-диалог
@param title: заголовок
@type title: string
@param caption: текст окна
@type caption: string
@param noModal: модальное или нет
@type noModal: boolean
"""
params.log('Creating progress bar with title: %s' %title)
style = wx.PD_ELAPSED_TIME
if (not noModal):
style |= wx.PD_APP_MODAL
self.dlg = PP.PyProgress(wx.GetApp().GetTopWindow(), -1, title,
caption,
agwStyle= style,
)
self.dlg.SetGaugeProportion(0.25)
self.dlg.SetGaugeSteps(18)
self.dlg.SetGaugeBackground(wx.WHITE)
self.dlg.SetFirstGradientColour(wx.WHITE)
self.dlg.SetSecondGradientColour(wx.RED)
self.dlg.SetSize((500, 180))
self.timer = wx.PyTimer(self.updateProgress)
self.timer.Start(1000)
self.updateProgress()
def updateProgress(self, data=''):
"""
Обновляет окно прогресс-бара
@param data: новый текст. Если пустая строка - обновляться не будет
@type data: string
"""
try:
if ('' == data):
self.dlg.UpdatePulse()
else:
self.dlg.UpdatePulse(data)
except:
pass
def destroyProgress(self):
"""
Удаляет прогресс-бар
"""
self.dlg.Destroy()
try:
self.timer.Stop()
self.timer.Destroy()
except:
pass
wx.SafeYield()
wx.GetApp().GetTopWindow().Raise()
params.log('Progress closed')
logic:
Код:
def calculateMCNP(event=0):
"""
Основная функция построения выходного файла
Читает параметры пациента, вызывает функции построения списков материалов, создаёт файл с материалами,
плотностями, маткартами и переносом+поворотом источника
"""
pat = patient.patientInfo()
curP = pat.getSinglePatient()
NumberOfSlices = int(curP['numberOfSlices'])
FileDir = pat.getPatDir() + '/Segmented'
params.log('Parsing images from %s to MCNP materials' %(FileDir))
nums = parseImages(NumberOfSlices, FileDir)
def parseImages(n, srdir):
"""
Читает файлы в директории srdir, создаёт мультипроцессорную обработку (pp) изображений
@param n: число срезов
@type n: int
@param srdir: путь до директории со срезами (сегментированными)
@type srdir: string
@return: список со списками материалов в каждом срезе [ [...], ...]
@rtype: list
"""
global h, mixes
im_dir=srdir
lf=os.listdir(im_dir)
lf = sortLf(lf)
r = []
sr = n
ppservers = ()
job_server = pp.Server(multiprocessing.cpu_count(), ppservers=ppservers)
params.log('Starting parallel python with %d workers' %(job_server.get_ncpus()))
wx.GetApp().GetTopWindow().updateProgress(_('Starting count with %d workers') %job_server.get_ncpus())
jobs = []
for i in range(sr):
im_file=im_dir+'/'+lf
jobs.append(job_server.submit(png2mcnp, # это функция, которая вызывается
(im_file, i, h, mixes), # это её аргументы
(retNums,square,retSumRazn), # это список функций, которые нужны ей в работе
("Image","numpy","wx", "logic.config") # это модули, которые должны быть подключены
)
)
wx.GetApp().GetTopWindow().updateProgress()
for job in jobs:
r.append(job())
wx.GetApp().GetTopWindow().updateProgress()
wx.GetApp().GetTopWindow().updateProgress()
return r
def png2mcnp(inputFile, q, h, mixes):
"""
Функция, создающая список материалов по заданному PNG-файлу
@param inputFile: путь до входного файла
@type inputFile: string
@return: массив материалов
@rtype: list
"""
# вот это по идее основная функция для pp, но если изнутри попробовать вызвать
wx.GetApp().GetTopWindow()
# то получается ошибка, что Window нет
# а надо просто что-то типа
... work ...
params.log('Thread %d finished with' %q)
params.log(result)
wx.GetApp().GetTopWindow().updateProgress('Thread %d finished' %q)
"""
Основная функция построения выходного файла
Читает параметры пациента, вызывает функции построения списков материалов, создаёт файл с материалами,
плотностями, маткартами и переносом+поворотом источника
"""
pat = patient.patientInfo()
curP = pat.getSinglePatient()
NumberOfSlices = int(curP['numberOfSlices'])
FileDir = pat.getPatDir() + '/Segmented'
params.log('Parsing images from %s to MCNP materials' %(FileDir))
nums = parseImages(NumberOfSlices, FileDir)
def parseImages(n, srdir):
"""
Читает файлы в директории srdir, создаёт мультипроцессорную обработку (pp) изображений
@param n: число срезов
@type n: int
@param srdir: путь до директории со срезами (сегментированными)
@type srdir: string
@return: список со списками материалов в каждом срезе [ [...], ...]
@rtype: list
"""
global h, mixes
im_dir=srdir
lf=os.listdir(im_dir)
lf = sortLf(lf)
r = []
sr = n
ppservers = ()
job_server = pp.Server(multiprocessing.cpu_count(), ppservers=ppservers)
params.log('Starting parallel python with %d workers' %(job_server.get_ncpus()))
wx.GetApp().GetTopWindow().updateProgress(_('Starting count with %d workers') %job_server.get_ncpus())
jobs = []
for i in range(sr):
im_file=im_dir+'/'+lf
jobs.append(job_server.submit(png2mcnp, # это функция, которая вызывается
(im_file, i, h, mixes), # это её аргументы
(retNums,square,retSumRazn), # это список функций, которые нужны ей в работе
("Image","numpy","wx", "logic.config") # это модули, которые должны быть подключены
)
)
wx.GetApp().GetTopWindow().updateProgress()
for job in jobs:
r.append(job())
wx.GetApp().GetTopWindow().updateProgress()
wx.GetApp().GetTopWindow().updateProgress()
return r
def png2mcnp(inputFile, q, h, mixes):
"""
Функция, создающая список материалов по заданному PNG-файлу
@param inputFile: путь до входного файла
@type inputFile: string
@return: массив материалов
@rtype: list
"""
# вот это по идее основная функция для pp, но если изнутри попробовать вызвать
wx.GetApp().GetTopWindow()
# то получается ошибка, что Window нет
# а надо просто что-то типа
... work ...
params.log('Thread %d finished with' %q)
params.log(result)
wx.GetApp().GetTopWindow().updateProgress('Thread %d finished' %q)
Да, прошу прощения за терминологию, wx конечно модуль, но я его просто для себя как объект-переменную воспринимаю. И внутри его wx.GetApp().GetTopWindow() ведут себя как синглетоны, но внутри pp - нет. То есть у него внутри это почему-то другие объекты, причём похоже что у каждого потока свои, ибо если я туда в модули добавлю свой класс-синглетон logic.patients, который, помимо всего, вызывает синглетоновое pymongo, то в итоге на серваке я получу одновременных новых входящих коннектов с число тредов)) и переменные внутри модуля тоже новые:
например, logic.config as params:
Код:
curApp = 0
def genAppHandle():
"""
Создаёт строку, которая однозначно идентифицирует запущенное приложение\n
Сейчас используется дамп системного времени
@return: строка идентификации приложения
@rtype: string
"""
return 'log'+strftime("%d%m%Y%H%M%S", localtime())
def log(str=None, **kwargs):
"""
Логирует действия. Пишет в файл, при установленной переменной отладки выводит лог на экран
@param str: строка
@type str: string
"""
if (isLogDisabled()):
return
global curApp
logDir = getWorkDir()+'/log'
if (not os.path.exists(logDir)):
os.mkdir(logDir)
if (0 == curApp):
curApp = genAppHandle()
logFileName = logDir+'/'+curApp+'.txt'
if (str is not None):
if ('str' == type(str).__name__):
logStr = "%s> %s\n" %(strftime("%H:%M:%S> ", localtime()), str)
else:
logStr = "%s> %s" %(strftime("%H:%M:%S> ", localtime()), objToStr(str))
writeToFile(logFileName, logStr)
if isDebug():
print logStr
def genAppHandle():
"""
Создаёт строку, которая однозначно идентифицирует запущенное приложение\n
Сейчас используется дамп системного времени
@return: строка идентификации приложения
@rtype: string
"""
return 'log'+strftime("%d%m%Y%H%M%S", localtime())
def log(str=None, **kwargs):
"""
Логирует действия. Пишет в файл, при установленной переменной отладки выводит лог на экран
@param str: строка
@type str: string
"""
if (isLogDisabled()):
return
global curApp
logDir = getWorkDir()+'/log'
if (not os.path.exists(logDir)):
os.mkdir(logDir)
if (0 == curApp):
curApp = genAppHandle()
logFileName = logDir+'/'+curApp+'.txt'
if (str is not None):
if ('str' == type(str).__name__):
logStr = "%s> %s\n" %(strftime("%H:%M:%S> ", localtime()), str)
else:
logStr = "%s> %s" %(strftime("%H:%M:%S> ", localtime()), objToStr(str))
writeToFile(logFileName, logStr)
if isDebug():
print logStr
из разных файлов, которые импортят logic.config лог спокойно пишется в один файл, а внутри pp у каждого curApp получается свой
и таймер, который во втором листинге создаётся в makeProgress() просто виснет во время работы pp.. пробовал внутри makeProgress сделать threading.Thread и в нём создать прогресс, дык тоже виснет)
Работа должна идти как-то так:
1. Произошло событие типа "Нажали кнопку для обработки картинки".
2. Перешли к функции обработки - это метод окна.
3. В этом методе создали диалог прогресс-бара и запускаем.
4. В этом методе запускаем функцию обработки картинки.
4. Не выходя из метода опрашиваем в цикле со sleep(n) некую функцию, отвечающую за готовность. Соответственно с готовностью обновляем прогресс-бар.
Короче, вид (см. MVC) через контроллер контролирует работу модели, а не модель грубо напрямую управляет видом, не учитывая его особенностей.
http://www.parallelpython.com/content/view/17/31/ )..
я бы на multiprocessing перешёл, если там всё, что ты говоришь, можно реализовать) но я пробовал в своём parseImages сделать это всё в Queue из multiprocessing - и ничего не работало.. они потом не запускались
про ивенты почитаю... я на самом деле работу global interpreter lock не особо понимаю)
как я понял, всё что внутри pp я снаружи опросить не смогу.. pp как бы переданные функции выделяет в новое приложение и запускает их.. то есть пункт 5 (который у тебя второй 4) не получится реализовать... из pp по идее только callback'ом можно выйти ( №4 в
я бы на multiprocessing перешёл, если там всё, что ты говоришь, можно реализовать) но я пробовал в своём parseImages сделать это всё в Queue из multiprocessing - и ничего не работало.. они потом не запускались
про ивенты почитаю... я на самом деле работу global interpreter lock не особо понимаю)
Код:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time
import multiprocessing
import wx
def func(mpEvt):
time.sleep(5)
mpEvt.set()
class MainFrame(wx.Frame):
def __init__(self):
wx.Frame.__init__(self, None, wx.ID_ANY, "MainFrame", size=(400, 300),
style = wx.MAXIMIZE | wx.DEFAULT_FRAME_STYLE )
self.Bind(wx.EVT_LEFT_DCLICK, self.on_left_dclick)
self.Show()
self.Center()
def on_left_dclick(self, event):
wx.GetApp().Yield(True)
progressMax = 10
manager = multiprocessing.Manager()
mpEvt = manager.Event()
process = multiprocessing.Process(target=func,args=(mpEvt,))
process.daemon=True
process.start()
dialog = wx.ProgressDialog("",
u"Some Text", progressMax)
counter = 0
while(not mpEvt.is_set()):
time.sleep(0.2)
if counter < progressMax:
dialog.Update(counter)
counter += 1
else:
counter = 0;
dialog.Destroy()
def main():
app = wx.PySimpleApp(False)
frame = MainFrame()
frame.Show()
app.MainLoop()
if __name__ == "__main__":
main()
# -*- coding: utf-8 -*-
import time
import multiprocessing
import wx
def func(mpEvt):
time.sleep(5)
mpEvt.set()
class MainFrame(wx.Frame):
def __init__(self):
wx.Frame.__init__(self, None, wx.ID_ANY, "MainFrame", size=(400, 300),
style = wx.MAXIMIZE | wx.DEFAULT_FRAME_STYLE )
self.Bind(wx.EVT_LEFT_DCLICK, self.on_left_dclick)
self.Show()
self.Center()
def on_left_dclick(self, event):
wx.GetApp().Yield(True)
progressMax = 10
manager = multiprocessing.Manager()
mpEvt = manager.Event()
process = multiprocessing.Process(target=func,args=(mpEvt,))
process.daemon=True
process.start()
dialog = wx.ProgressDialog("",
u"Some Text", progressMax)
counter = 0
while(not mpEvt.is_set()):
time.sleep(0.2)
if counter < progressMax:
dialog.Update(counter)
counter += 1
else:
counter = 0;
dialog.Destroy()
def main():
app = wx.PySimpleApp(False)
frame = MainFrame()
frame.Show()
app.MainLoop()
if __name__ == "__main__":
main()
Кликаешь дважды по окну и процесс запускается.
Спасибо, попробую