programing

함수의 반환 값을 다중 처리로 전달하는 방법.과정?

sourcejob 2023. 4. 29. 09:14
반응형

함수의 반환 값을 다중 처리로 전달하는 방법.과정?

아래 예제 코드에서 함수의 반환 값을 얻고 싶습니다.worker어떻게 하면 좋을까요?이 값은 어디에 저장됩니까?

코드 예제:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

출력:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

저된개체관속련수없찾다에 에서 관련 수 없는 것 .jobs.

공유 변수를 사용하여 통신합니다.예를 들어 다음과 같습니다.

import multiprocessing


def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())

저는 @sega_sai가 제안한 접근법이 더 좋다고 생각합니다.그러나 코드 예제가 필요하므로 다음과 같이 설명합니다.

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

그러면 반환 값이 인쇄됩니다.

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

이 익경우한숙에 .map(Python 2 기본 제공) 이것은 너무 어렵지 않을 것입니다.그렇지 않으면 sega_Sai의 링크를 보십시오.

코드가 얼마나 적게 필요한지 기록합니다.(프로세스가 재사용되는 방법도 참고하십시오.)

고객으로부터 가치를 얻는 방법을 찾고 있는 다른 사람들을 위해Process용사를 Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {"foo": True}

) 또는 Jupyter Notebook 노트북)에서는 Windows(윈도우)를 사용합니다.multithreading당신은 이것을 파일로 저장하고 파일을 실행해야 합니다.명령 프롬프트에서 이 작업을 수행하면 다음과 같은 오류가 표시됩니다.

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>

어떤 이유에서인지, 나는 이것을 어떻게 하는지에 대한 일반적인 예를 찾을 수 없었습니다.Queue어디서나(파이썬의 문서 예제에서도 여러 프로세스가 생성되지 않음), 약 10번의 시도 끝에 다음과 같이 작업했습니다.

from multiprocessing import Process, Queue

def add_helper(queue, arg1, arg2): # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue하위 프로세스의 반환 값을 저장하는 데 사용할 수 있는 차단 스레드 안전 대기열입니다.따라서 각 프로세스에 대기열을 전달해야 합니다.여기서 덜 명백한 것은 당신이 해야 한다는 것입니다.get()join그자리의 Process그렇지 않으면 대기열이 가득 차서 모든 것을 차단합니다.

객체 지향적인 사용자를 위한 업데이트(Python 3.4에서 테스트됨):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)

이 예에서는 다중 처리 목록을 사용하는 방법을 보여 줍니다.임의 개수의 프로세스에서 문자열을 반환하는 파이프 인스턴스:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

출력:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

이 솔루션은 멀티프로세싱보다 적은 리소스를 사용합니다.다음을 사용하는 대기열

  • 파이프
  • 하나 이상의 잠금
  • 완충제
  • 실타래

또는 다중 처리.다음을 사용하는 단순 대기열

  • 파이프
  • 하나 이상의 잠금

이러한 유형별로 소스를 살펴보는 것이 매우 유용합니다.

당신은 멀티프로세싱을 사용해야 할 것 같습니다.대신 풀 클래스를 풀하고 .apply() .apply_async(), map() 메서드를 사용합니다.

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult

사용할 수 있습니다.exit프로세스의 종료 코드를 설정하는 기본 제공.다음에서 얻을 수 있습니다.exitcode프로세스의 속성:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

출력:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

조약돌 패키지는 이를 활용한 멋진 추상화를 가지고 있습니다.multiprocessing.Pipe이는 이것을 꽤 쉽게 만듭니다.

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

예: https://pythonhosted.org/Pebble/ #current-compreators

위에서 복사한 가장 간단한 예를 단순화하려고 생각했습니다. Py3.6에서 작업했습니다.가장 간단한 것은:

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

풀의 프로세스 수를 다음과 같이 설정할 수 있습니다.Pool(processes=5)그러나 CPU 수는 기본적으로 설정되므로 CPU 바인딩 작업의 경우 비워 두십시오. (I/O 바인딩 작업은 스레드가 대부분 대기 중이므로 CPU 코어를 공유할 수 있으므로 어쨌든 스레드에 적합한 경우가 많습니다.) Pool청킹 최적화도 적용합니다.

작업자 메서드는 메서드 내에 중첩될 수 없습니다.나는 처음에 전화를 거는 방법 안에서 나의 작업자 방법을 정의했습니다.pool.map이 모든 것을 독립적으로 유지하기 위해 프로세스는 이를 가져올 수 없으며 "속성 오류:로컬 개체 outer_method를 선택할 수 없습니다.내면_지혜"여기 더 있습니다.클래스 안에 있을 수 있습니다.)

(원래 질문에 지정된 인쇄를 감사합니다.'represent!'보다는time.sleep()하지만 그것이 없으면 어떤 코드가 동시에 실행되지 않을 때 실행된다고 생각했습니다.)


Py3's 또한 두 줄입니다(.map합니다.list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

플레인 포함:

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

필요한 것이 다음과 같은 경우 사용put그리고.get하고, 두 루프는 에 시작합니다.queue.get 전화할 가 없는 것 p.join()도 마찬가지야

Python 3을 사용하는 경우 다음을 편리한 추상화로 사용할 수 있습니다.

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

출력:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

간단한 솔루션:

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())

print(output[0])

출력:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

다음과 같이 프로세스 풀 실행기를 사용하여 함수에서 반환 값을 가져올 수 있습니다.

from concurrent.futures import ProcessPoolExecutor

def test(num1, num2):
    return num1 + num2

with ProcessPoolExecutor() as executor:
    feature = executor.submit(test, 2, 3)
    print(feature.result()) # 5

함수에서 오류 코드를 가져와야 하기 때문에 바텍의 답변을 조금 수정했습니다.(고마워요, 척추!!!그것은 멋진 속임수입니다.

이것은 또한 다음과 같은 방법으로 수행할 수 있습니다.manager.list하지만 저는 그것을 딕트에 넣고 그 안에 목록을 저장하는 것이 더 낫다고 생각합니다.그래야 목록이 채워질 순서를 확신할 수 없기 때문에 함수와 결과를 유지할 수 있습니다.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # return "func1"  # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'
    # return "func2"

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # if fail wont join the rest because it never populate the dict
    # or do a try/except to get something in return.
    raise ValueError("failed here")
    # if we want to get the error in the manager dict we can catch the error
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it fail horrible"
        # print 'func3: finishing'
        # return "func3"


def runInParallel(*fns):  # * is to accept any input in list
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print fn
        # print dir(fn)
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # 5 is the time out

    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # print dir(proc[0])
    # print proc[0]._name
    # print proc[0].name
    # print proc[0].exitcode

    # here you can check what did fail
    for i in proc:
        print i.name, i.exitcode  # name was set up in the Process line 53

    # here will only show the function that worked and where able to populate the 
    # manager dict
    for i, j in manager.items():
        print dir(i)  # things you can do to the function
        print i, j

언급URL : https://stackoverflow.com/questions/10415028/how-to-get-the-return-value-of-a-function-passed-to-multiprocessing-process

반응형