Python : Python 함수를 지불해야합니까?
먼저 조사했지만 내 질문에 대한 답을 찾지 못했습니다. Python에서 여러 함수를 지원합니다.
다음과 같은 것이 있습니다.
files.py
import common #common is a util class that handles all the IO stuff
dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]
def func1():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir1)
c.getFiles(dir1)
time.sleep(10)
c.removeFiles(addFiles[i], dir1)
c.getFiles(dir1)
def func2():
c = common.Common()
for i in range(len(addFiles)):
c.createFiles(addFiles[i], filename, dir2)
c.getFiles(dir2)
time.sleep(10)
c.removeFiles(addFiles[i], dir2)
c.getFiles(dir2)
func1과 func2를 호출하고 동시에 실행하고 싶습니다. 함수는 서로 상호 작용하는 객체에서 상호 작용하지 않습니다. 지금은 func2가 시작되기 전에 func1이 끝날 때까지 기다려야합니다. 다음과 같이 어떻게하면?
process.py
from files import func1, func2
runBothFunc(func1(), func2())
매 분마다 생성되는 파일 수를 계산하기 때문에 두 디렉토리를 거의 같은 시간에 만들 수 있기 때문에 원합니다. 디렉토리가 보관 내 타이밍이 떨어질 것입니다.
은 사용할 수 당신 있습니다 또는 .threading
multiprocessing
로 인해 CPython과의 특수성 , threading
진정한 송신 처리를 달성하기 어렵다. 일반적으로 multiprocessing
더 나은 방법입니다.
다음은 완전한 예입니다.
from multiprocessing import Process
def func1():
print 'func1: starting'
for i in xrange(10000000): pass
print 'func1: finishing'
def func2():
print 'func2: starting'
for i in xrange(10000000): pass
print 'func2: finishing'
if __name__ == '__main__':
p1 = Process(target=func1)
p1.start()
p2 = Process(target=func2)
p2.start()
p1.join()
p2.join()
프로세스를 시작 / 결합하는 함수는 다음과 같은 라인을 따라 쉽게 캡슐화 될 수 있습니다 runBothFunc
.
def runInParallel(*fns):
proc = []
for fn in fns:
p = Process(target=fn)
p.start()
proc.append(p)
for p in proc:
p.join()
runInParallel(func1, func2)
이는 Python 코드를 쉽게보고 화하고 배포 할 수있는 시스템 인 Ray로 우아하게 수행 할 수 있습니다 .
예제를 제출 화 한 @ray.remote
데코레이터로 함수를 정의한 다음 .remote
.
import ray
ray.init()
dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]
# Define the functions.
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
# func1() code here...
@ray.remote
def func2(filename, addFiles, dir):
# func2() code here...
# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)])
두 함수에 동일한 인수를 전달하고 인수가 큰 경우이를 수행하는 것보다 방법은 ray.put()
. 이렇게하면 큰 인수가 두 번 중첩 화되고 두 개의 메모리 복사본이 생성되는 것을 방지 할 수 있습니다.
largeData_id = ray.put(largeData)
ray.get([func1(largeData_id), func2(largeData_id)])
만약 func1()
및 func2()
반환은 다음과 같이 코드를 다시 작성해야합니다.
ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func1.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])
다중 처리 모듈에 비해 Ray를 사용하면 많은 이점이 있습니다. 특히, 동일한 코드 가 단일 시스템과 시스템 클러스터에서 실행됩니다. Ray의 더 많은 장점은 이 관련 게시물을 참조하십시오 .
두 기능이 서로 동기화되어 실행 가능한 보장이 없습니다.
가장 좋은 방법은 함수를 여러 단계로 나눈 다음 Process.join
@aix의 답과 같은 중요한 동기화 지점에서 둘 다 완료 될 때까지있을 것입니다 .
이보다 낫다 time.sleep(10)
정확한 타이밍을 보장 할 수 없기 때문에 . 명시 적으로 기다림을 사용하면 다음 단계로 이동하기 전에 해당 단계를 실행해야 함을 의미합니다. 10ms 내에서 수행 될 것이라고 가정하는 대신 컴퓨터에서 다른 작업이 진행되고 있습니다.
Windows 사용자이고 Python 3을 사용하는 경우이 게시물은 python에서 메시지가 될 프로그래밍을 수행하는 데 도움이됩니다. 일반적인 다중 처리 라이브러리의 풀 프로그래밍을 때 프로그램의 주요 기능과 오류가 발생합니다. 이것은 창에 fork () 기능이 없기 때문입니다. 아래에있는 언급 된 문제에 대한 해결책을 제공합니다.
http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html
거의 3을 사용하고 있었기 때문에 프로그램을 다음과 같이 약간 변경했습니다.
from types import FunctionType
import marshal
def _applicable(*args, **kwargs):
name = kwargs['__pw_name']
code = marshal.loads(kwargs['__pw_code'])
gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
defs = marshal.loads(kwargs['__pw_defs'])
clsr = marshal.loads(kwargs['__pw_clsr'])
fdct = marshal.loads(kwargs['__pw_fdct'])
func = FunctionType(code, gbls, name, defs, clsr)
func.fdct = fdct
del kwargs['__pw_name']
del kwargs['__pw_code']
del kwargs['__pw_defs']
del kwargs['__pw_clsr']
del kwargs['__pw_fdct']
return func(*args, **kwargs)
def make_applicable(f, *args, **kwargs):
if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
kwargs['__pw_name'] = f.__name__ # edited
kwargs['__pw_code'] = marshal.dumps(f.__code__) # edited
kwargs['__pw_defs'] = marshal.dumps(f.__defaults__) # edited
kwargs['__pw_clsr'] = marshal.dumps(f.__closure__) # edited
kwargs['__pw_fdct'] = marshal.dumps(f.__dict__) # edited
return _applicable, args, kwargs
def _mappable(x):
x,name,code,defs,clsr,fdct = x
code = marshal.loads(code)
gbls = globals() #gbls = marshal.loads(gbls)
defs = marshal.loads(defs)
clsr = marshal.loads(clsr)
fdct = marshal.loads(fdct)
func = FunctionType(code, gbls, name, defs, clsr)
func.fdct = fdct
return func(x)
def make_mappable(f, iterable):
if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
name = f.__name__ # edited
code = marshal.dumps(f.__code__) # edited
defs = marshal.dumps(f.__defaults__) # edited
clsr = marshal.dumps(f.__closure__) # edited
fdct = marshal.dumps(f.__dict__) # edited
return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)
이 함수 후에는 위의 문제 코드도 다음과 같이 약간 변경됩니다.
from multiprocessing import Pool
from poolable import make_applicable, make_mappable
def cube(x):
return x**3
if __name__ == "__main__":
pool = Pool(processes=2)
results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
print([result.get(timeout=10) for result in results])
그리고 출력은 다음과 가변합니다.
[1, 8, 27, 64, 125, 216]
이 게시물이 일부 Windows 사용자에게 유용 할 생각합니다.
함수가 주로 I / O 작업 (그리고 CPU 작업이 적음)을 수행하고 Python 3.2 이상을 사용하는 경우 ThreadPoolExecutor를 사용할 수 있습니다 .
from concurrent.futures import ThreadPoolExecutor
def run_io_tasks_in_parallel(tasks):
with ThreadPoolExecutor() as executor:
running_tasks = [executor.submit(task) for task in tasks]
for running_task in running_tasks:
running_task.result()
run_io_tasks_in_parallel([
lambda: print('IO task 1 running!'),
lambda: print('IO task 2 running!'),
])
함수가 주로 CPU 작업을 수행하고 (그리고 I / O 작업이 적다면) Python 2.6 이상을 사용하는 경우 다중 처리 모듈을 사용할 수 있습니다 .
from multiprocessing import Process
def run_cpu_tasks_in_parallel(tasks):
running_tasks = [Process(target=task) for task in tasks]
for running_task in running_tasks:
running_task.start()
for running_task in running_tasks:
running_task.join()
run_cpu_tasks_in_parallel([
lambda: print('CPU task 1 running!'),
lambda: print('CPU task 2 running!'),
])
참고 URL : https://stackoverflow.com/questions/7207309/python-how-can-i-run-python-functions-in-parallel
'ProgramingTip' 카테고리의 다른 글
어디에서 문자, 숫자, 밑줄 및 대시 만 포함되어 있는지 확인하십시오. (0) | 2020.10.07 |
---|---|
git 브랜치를 리베이스하는 동안 타임 스탬프 변경 (0) | 2020.10.05 |
golang 인터랙티브 디버거가 있습니까? (0) | 2020.10.05 |
Go에서 파일 이름에 대한 규칙은 무엇입니까? (0) | 2020.10.05 |
Git 저장소의 기록 축소 (0) | 2020.10.05 |