Справочник функций

Ваш аккаунт

Войти через: 
Забыли пароль?
Регистрация
Информацию о новых материалах можно получать и без регистрации:

Почтовая рассылка

Подписчиков: -1
Последний выпуск: 19.06.2015

Parallel python

271
28 сентября 2011 года
MrXaK
721 / / 31.12.2002
Есть приложение, обычное оконное, написанное с помощью wx. В какой-то момент вызывается часть, отвечающая за обработку данных. Обработка данных идёт с помощью pp, то есть обычным pp.Server(), job.submit() и всё работает. Но есть три проблемы:
Bо-первых надо вывести нормальное окно вида прогресс-бар на время выполнения, но снаружи progress bar не хочет обновляться, даже если создать wx.Timer или wx.PyTimer и повесить на него обновление просто по времени. И вообще на время выполнения pp события wx не работают. А изнутри wx объект wx другой и рабочее приложение и его окно нельзя получить даже посредством wx.GetApp().GetTopWindow().
Вторая проблема - логирование. Во время работы wx у меня есть свой модуль, который при первом запуске создаёт переменную модуля вида "время_запуска" и файл такого же вида, и при всех дальнейших обращениях из программы пишет в этот файл. А изнутри pp это в каждом потоке новый модуль и получается, что каждая задача внутри pp генерит новое время запуска и новый файл лога.
А третья проблема - доступ к БД. У меня MongoDB через стандартный pymongo, дык вот каждый процесс создаёт новое подключение.

Вообще все проблемы решаемы, если в функцию, которая запускается pp, передавать аргументами сами объекты типа wx.GetApp(), pymongo.Connection() и тупо имя файла лога, но мне такой подход не нравится. Не должны по идее такие объекты идти аргументами, они должны быть глобальные. Но пока других решений я не вижу.

Заранее спасибо
87
29 сентября 2011 года
Kogrom
2.7K / / 02.02.2008
Вопрос слишком глобальный. На него и ответить можно так же: читать про потоки и процессы (модули threading, multiprocessing).

Цитата: Mr.Hacker
Bо-первых надо вывести нормальное окно вида прогресс-бар на время выполнения, но снаружи progress bar не хочет обновляться, даже если создать wx.Timer или wx.PyTimer и повесить на него обновление просто по времени.



Тут есть альтернатива потокам: сделать функцию, отвечающую за обмен, генераторной (то есть, чтобы она возвращала данные по yield). При каждой итерации обновляем прогресс-бар.

Короче, можно смотреть в сторону гринлетов (green threads, greenlets).

Цитата: Mr.Hacker
Вообще все проблемы решаемы, если в функцию, которая запускается pp, передавать аргументами сами объекты типа wx.GetApp(), pymongo.Connection() и тупо имя файла лога, но мне такой подход не нравится. Не должны по идее такие объекты идти аргументами, они должны быть глобальные. Но пока других решений я не вижу.



Глобальные переменные - это зло, если говорим не о коротеньких скриптах. Код с ними труднее тестировать, отлаживать и повторно использовать. Лучше передавать аргументами, либо сделать их данными какого-либо небольшого класса (см. ООП).

271
29 сентября 2011 года
MrXaK
721 / / 31.12.2002
Если я пытаюсь создать тред через threading.Thread перед вызовом pp, то этот тред также виснет) по идее можно изначально при старте приложения сделать один тред для основного приложения и другой для прогрессбара, то, скорее всего, заработает, но это нехорошо - тогда окно прогрессбара не будет связано с окном основного приложения..

про green threads почитаю.. yield правда не вижу как мне поможет, у функций, которые вызываются через pp, нормального возврата нет, они там по идее обёрнуты лямбда-функцией, которая их возвращает по завершению...

глобальные переменные зло? а как может быть объект wx по-другому, мы пишем import wx во всех наших файлах, а потом у нас методы либо сами вызываются как обработчики событий, либо ручками через wx.GetApp().GetMainWindow(), сам wx-то глобальный, а у методов итак первый self в определении функции на него указывает..
87
29 сентября 2011 года
Kogrom
2.7K / / 02.02.2008
Вероятно, я немного недопонял. Ты говоришь про этот модуль:
http://www.parallelpython.com/
Я с ним не работал, ибо мне не нужен сторонний модуль, который обеспечивает функционал того, что есть в стандартном (multiprocessing). То есть работа на нескольких процессорах. Или там есть ещё что-то полезное?

Если интересно, то дальше могу отвечать по стандартным модулям. Если ты делаешь правильно, что у тебя будет процесс для GUI и процесс(ы) для обработчика данных. Каким образом процессы обмениваются данными? Они обмениваются через каналы или очереди. То есть тебе надо читать про них.

Кроме того, в multiprocessing есть интересная штука multiprocessing.Manager(), который облегчает обмен данными в некоторых случаях.

Чтобы что-то подробнее обсуждать, нужен конкретный код. Можешь выделить минимальный код, который у тебя не работает?

Что касается wx, то у него тоже есть некоторые особенности, но они относятся скорее к потокам. Можно посмотреть, где применяют wx.CallAfter(), например.

Цитата: Mr.Hacker
глобальные переменные зло? а как может быть объект wx по-другому, мы пишем import wx во всех наших файлах, а потом у нас методы либо сами вызываются как обработчики событий, либо ручками через wx.GetApp().GetMainWindow(), сам wx-то глобальный, а у методов итак первый self в определении функции на него указывает..



wx - это модуль, а не глобальная переменная.
wx.GetApp().GetMainWindow() - а тут мы обращаемся к синглтонам. Что, в общем, тоже не есть хорошо, но уже лучше чем глобальные переменные.

271
29 сентября 2011 года
MrXaK
721 / / 31.12.2002
да, модуль этот.. со стандартным multiprocessing у меня что-то не вышло, хотя пытался по мануалу..

процессы у меня данными между собой не обмениваются вообще, а наверх максимум что он должен послать - то, что он отработал (вывести в лог и в окно прогресса) и засторить результат..

код упрощённо такой:

ui:
Код:
class mcnpPrefPanel(wx.Panel):
    def __init__(self, parent, **kwargs):
        wx.Panel.__init__(self, parent, style=0)
        ...
        self.Bind(wx.EVT_BUTTON, self.save, id=btnSave)

    def save(self, event):
        # тут сначала всякие проверки на корректность данных
        wx.GetApp().GetTopWindow().makeProgress(_('save mcnp'), _('saving outfile for mcnp'))
        collect = self.collect()
        self.pat.setRunPref(collect)
        params.log('Run preferences: ')
        params.log(collect)
        mcnpparser.calculateMCNP()
        wx.GetApp().GetTopWindow().updateProgress()
        wx.GetApp().GetTopWindow().destroyProgress()
        self.pat.setCurrentPatient({'status' : 5})
        pass


main UI:
Код:
class mainWin(wx.Frame):
    ...
    def makeProgress(self, title, caption, noModal=False):
        """
        Создаёт прогресс-диалог
        @param title: заголовок
        @type title: string
        @param caption: текст окна
        @type caption: string
        @param noModal: модальное или нет
        @type noModal: boolean
        """

        params.log('Creating progress bar with title: %s' %title)
        style = wx.PD_ELAPSED_TIME
        if (not noModal):
            style |= wx.PD_APP_MODAL
        self.dlg = PP.PyProgress(wx.GetApp().GetTopWindow(), -1, title,
                            caption,
                            agwStyle= style,
                            )

        self.dlg.SetGaugeProportion(0.25)
        self.dlg.SetGaugeSteps(18)
        self.dlg.SetGaugeBackground(wx.WHITE)
        self.dlg.SetFirstGradientColour(wx.WHITE)
        self.dlg.SetSecondGradientColour(wx.RED)
        self.dlg.SetSize((500, 180))
        self.timer = wx.PyTimer(self.updateProgress)
        self.timer.Start(1000)
        self.updateProgress()


    def updateProgress(self, data=''):
        """
        Обновляет окно прогресс-бара
        @param data: новый текст. Если пустая строка - обновляться не будет
        @type data: string
        """

        try:
            if ('' == data):
                self.dlg.UpdatePulse()
            else:
                self.dlg.UpdatePulse(data)
        except:
            pass

    def destroyProgress(self):
        """
        Удаляет прогресс-бар
        """

        self.dlg.Destroy()
        try:
            self.timer.Stop()
            self.timer.Destroy()
        except:
            pass
        wx.SafeYield()
        wx.GetApp().GetTopWindow().Raise()
        params.log('Progress closed')


logic:
Код:
def calculateMCNP(event=0):
    """
    Основная функция построения выходного файла
    Читает параметры пациента, вызывает функции построения списков материалов, создаёт файл с материалами,
    плотностями, маткартами и переносом+поворотом источника
    """

    pat = patient.patientInfo()
    curP = pat.getSinglePatient()
    NumberOfSlices = int(curP['numberOfSlices'])
    FileDir = pat.getPatDir() + '/Segmented'
    params.log('Parsing images from %s to MCNP materials' %(FileDir))
    nums = parseImages(NumberOfSlices, FileDir)

def parseImages(n, srdir):
        """
        Читает файлы в директории srdir, создаёт мультипроцессорную обработку (pp) изображений
        @param n: число срезов
        @type n: int
        @param srdir: путь до директории со срезами (сегментированными)
        @type srdir: string
        @return: список со списками материалов в каждом срезе [ [...], ...]
        @rtype: list
        """

        global h, mixes
        im_dir=srdir
        lf=os.listdir(im_dir)
        lf = sortLf(lf)
        r = []
        sr = n
        ppservers = ()
        job_server = pp.Server(multiprocessing.cpu_count(), ppservers=ppservers)
        params.log('Starting parallel python with %d workers' %(job_server.get_ncpus()))
        wx.GetApp().GetTopWindow().updateProgress(_('Starting count with %d workers') %job_server.get_ncpus())
        jobs = []
        for i in range(sr):
            im_file=im_dir+'/'+lf
            jobs.append(job_server.submit(png2mcnp,   # это функция, которая вызывается
                                                         (im_file, i, h, mixes),  # это её аргументы
                                                         (retNums,square,retSumRazn), # это список функций, которые нужны ей в работе
                                                         ("Image","numpy","wx", "logic.config") # это модули, которые должны быть подключены
                                                         )
                              )
            wx.GetApp().GetTopWindow().updateProgress()
        for job in jobs:
            r.append(job())
            wx.GetApp().GetTopWindow().updateProgress()
        wx.GetApp().GetTopWindow().updateProgress()

        return r

def png2mcnp(inputFile, q, h, mixes):
    """
    Функция, создающая список материалов по заданному PNG-файлу
    @param inputFile: путь до входного файла
    @type inputFile: string
    @return: массив материалов
    @rtype: list
    """

    # вот это по идее основная функция для pp, но если изнутри попробовать вызвать
    wx.GetApp().GetTopWindow()
    # то получается ошибка, что Window нет
    # а надо просто что-то типа
    ... work ...
    params.log('Thread %d finished with' %q)
    params.log(result)
    wx.GetApp().GetTopWindow().updateProgress('Thread %d finished' %q)


Да, прошу прощения за терминологию, wx конечно модуль, но я его просто для себя как объект-переменную воспринимаю. И внутри его wx.GetApp().GetTopWindow() ведут себя как синглетоны, но внутри pp - нет. То есть у него внутри это почему-то другие объекты, причём похоже что у каждого потока свои, ибо если я туда в модули добавлю свой класс-синглетон logic.patients, который, помимо всего, вызывает синглетоновое pymongo, то в итоге на серваке я получу одновременных новых входящих коннектов с число тредов)) и переменные внутри модуля тоже новые:

например, logic.config as params:
Код:
curApp = 0

def genAppHandle():
    """
    Создаёт строку, которая однозначно идентифицирует запущенное приложение\n
    Сейчас используется дамп системного времени
    @return: строка идентификации приложения
    @rtype: string
    """

    return 'log'+strftime("%d%m%Y%H%M%S", localtime())

def log(str=None, **kwargs):
    """
    Логирует действия. Пишет в файл, при установленной переменной отладки выводит лог на экран
    @param str: строка
    @type str: string
    """

    if (isLogDisabled()):
        return
    global curApp
    logDir = getWorkDir()+'/log'
    if (not os.path.exists(logDir)):
        os.mkdir(logDir)
    if (0 == curApp):
        curApp = genAppHandle()
    logFileName = logDir+'/'+curApp+'.txt'
    if (str is not None):
        if ('str' == type(str).__name__):
            logStr = "%s> %s\n" %(strftime("%H:%M:%S> ", localtime()), str)
        else:
            logStr = "%s> %s" %(strftime("%H:%M:%S> ", localtime()), objToStr(str))
    writeToFile(logFileName, logStr)
    if isDebug():
        print logStr

из разных файлов, которые импортят logic.config лог спокойно пишется в один файл, а внутри pp у каждого curApp получается свой

и таймер, который во втором листинге создаётся в makeProgress() просто виснет во время работы pp.. пробовал внутри makeProgress сделать threading.Thread и в нём создать прогресс, дык тоже виснет)
87
30 сентября 2011 года
Kogrom
2.7K / / 02.02.2008
В общем, тебе надо ещё почитать как работает очередь событий (event-ов, короче) в wxPython. Ты и без потоков можешь так же все подвесить, если не поймешь их работу.

Работа должна идти как-то так:
1. Произошло событие типа "Нажали кнопку для обработки картинки".
2. Перешли к функции обработки - это метод окна.
3. В этом методе создали диалог прогресс-бара и запускаем.
4. В этом методе запускаем функцию обработки картинки.
4. Не выходя из метода опрашиваем в цикле со sleep(n) некую функцию, отвечающую за готовность. Соответственно с готовностью обновляем прогресс-бар.

Короче, вид (см. MVC) через контроллер контролирует работу модели, а не модель грубо напрямую управляет видом, не учитывая его особенностей.
271
30 сентября 2011 года
MrXaK
721 / / 31.12.2002
как я понял, всё что внутри pp я снаружи опросить не смогу.. pp как бы переданные функции выделяет в новое приложение и запускает их.. то есть пункт 5 (который у тебя второй 4) не получится реализовать... из pp по идее только callback'ом можно выйти ( №4 в http://www.parallelpython.com/content/view/17/31/ )..

я бы на multiprocessing перешёл, если там всё, что ты говоришь, можно реализовать) но я пробовал в своём parseImages сделать это всё в Queue из multiprocessing - и ничего не работало.. они потом не запускались


про ивенты почитаю... я на самом деле работу global interpreter lock не особо понимаю)
87
30 сентября 2011 года
Kogrom
2.7K / / 02.02.2008
Короче, наклепал на скорую руку пример (возможно, что-то там лишнее, но главное - работает):

Код:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import multiprocessing

import wx


def func(mpEvt):
    time.sleep(5)
    mpEvt.set()


class MainFrame(wx.Frame):

    def __init__(self):

        wx.Frame.__init__(self, None, wx.ID_ANY, "MainFrame", size=(400, 300),
                        style = wx.MAXIMIZE | wx.DEFAULT_FRAME_STYLE )

        self.Bind(wx.EVT_LEFT_DCLICK, self.on_left_dclick)
        self.Show()
        self.Center()


    def on_left_dclick(self, event):
        wx.GetApp().Yield(True)
        progressMax = 10

        manager = multiprocessing.Manager()
        mpEvt = manager.Event()
        process = multiprocessing.Process(target=func,args=(mpEvt,))
        process.daemon=True
        process.start()

        dialog = wx.ProgressDialog("",
                u"Some Text", progressMax)
        counter = 0
        while(not mpEvt.is_set()):
            time.sleep(0.2)
            if counter < progressMax:
                dialog.Update(counter)
                counter += 1
            else:
                counter = 0;
        dialog.Destroy()


def main():
    app = wx.PySimpleApp(False)
    frame = MainFrame()
    frame.Show()
    app.MainLoop()


if __name__ == "__main__":
    main()


Кликаешь дважды по окну и процесс запускается.
271
01 октября 2011 года
MrXaK
721 / / 31.12.2002
Спасибо, попробую
Реклама на сайте | Обмен ссылками | Ссылки | Экспорт (RSS) | Контакты
Добавить статью | Добавить исходник | Добавить хостинг-провайдера | Добавить сайт в каталог