IT Log
[Python] Ray Tutorial 본문
728x90
반응형
Ray 홈페이지 : https://docs.ray.io/en/master/index.html
Ray
분산 애플리케이션을 구축하기위한 단순하고 범용적인 API를 제공
Ray Install
pip install -U ray
# FutureWarning: Not all Ray CLI dependencies were found.
# In Ray 1.4+, the Ray CLI, autoscaler, and dashboard will only be usable via `pip install 'ray[default]'`.
# Please update your install command
# 위 오류가 발생 시 아래 명령 실행
pip install 'ray[default]'
Ray API
API | 설명 |
ray.init() | Ray context를 초기화 |
@ray.remote | 함수가 될 것을 지정 (함수 또는 Class) |
.remote() | 모든 remote function, remote class 선언 또는 remote class method 호출 (비동기 작업) |
ray.put() | 개체 저장소에 개체를 저장하고 해당 ID를 반환 ID는 원격 함수 또는 메서드 호출에 대한 인수로 개체를 전달하는데 사용 (동기식 작업) |
ray.get() | 개체 ID 또는 개체 ID 목록에서 개체 또는 개체 목록을 반환 (동기식 작업) |
ray.wait() | 개체 ID 목록에서 (1) 준비된 개체의 ID 목록과 (2) 아직 준비되지 않은 개체의 ID 목록을 반환 기본적으로 한 번에 하나의 준비된 개체 ID를 반환 |
ray.get()
import time
import ray
ray.init(num_cpus = 4) # Specify this system has 4 CPUs.
@ray.remote
def do_some_work(x):
time.sleep(1) # Replace this is with work you need to do.
return x
start = time.time()
#(1) @ray.remote X / ray.get() X
# results = [do_some_work.remote(x) for x in range(4)]
#(2) @ray.remote O / ray.get() X
# results = [do_some_work.remote(x) for x in range(4)]
#(3) @ray.remote O / ray.get() O
# results = [ray.get(do_some_work.remote(x)) for x in range(4)]
#(4) @ray.remote O / ray.get() O
# results = ray.get([do_some_work.remote(x) for x in range(4)])
print("duration =", time.time() - start)
print("results = ", results)
- @ray.remote X / ray.get() X :
병렬화 하지 않으므로, 4초가 소요되며, results 값은 [0, 1, 2, 3] - @ray.remote O / ray.get() X :
병렬화를 진행하므로, 0.00..초가 소요되며, results 값은 [ObjectRef(df5a1a828c9685d3ffffffff0100000001000000), ..]로 개체 ID값을 반환 - @ray.remote O / ray.get() O :
병렬화를 진행하나, 4초가 소요되며, 1번과 같은 results 값을 반환. 이 경우 원격 작업이 끝난 후 결과를 호출하여, 병렬처리 의미가 없음 - @ray.remote O / ray.get() O :
병렬화를 진행하고, 모든작업이 끝난 후 ray.get()을 호출하므로, 1초가 소요되며, 1번과 같은 results값을 반환
※ 4번과 같이 사용하는게 올바른 사용법.
작은 작업 피하기
# 1. 일반작업
import time
def tiny_work(x):
time.sleep(0.0001) # Replace this with work you need to do.
return x
start = time.time()
results = [tiny_work(x) for x in range(100000)]
print("duration =", time.time() - start)
# duration = 13.36544418334961
# 2. @ray.remote
import time
import ray
ray.init(num_cpus = 4)
@ray.remote
def tiny_work(x):
time.sleep(0.0001) # Replace this is with work you need to do.
return x
start = time.time()
result_ids = [tiny_work.remote(x) for x in range(100000)]
results = ray.get(result_ids)
print("duration =", time.time() - start)
# duration = 27.46447515487671
# 3. @ray.remote
import time
import ray
ray.init(num_cpus = 4)
def tiny_work(x):
time.sleep(0.0001) # replace this is with work you need to do
return x
@ray.remote
def mega_work(start, end):
return [tiny_work(x) for x in range(start, end)]
start = time.time()
result_ids = []
[result_ids.append(mega_work.remote(x*1000, (x+1)*1000)) for x in range(100)]
results = ray.get(result_ids)
print("duration =", time.time() - start)
# duration = 3.2539820671081543
※ 1번 작업보다 2번 작업이 오래 걸리는 이유는 모든 작업 호출에 오버헤드가 발생하여 실행하는데 걸리는 실제 시간에 영향을 주므로, 3번과 같이 실행하는 것이 올바른 사용법
개체 저장소에 저장
# 1. ray.put() X
import time
import numpy as np
import ray
ray.init(num_cpus = 4)
@ray.remote
def no_work(a):
return
start = time.time()
a = np.zeros((5000, 5000))
result_ids = [no_work.remote(a) for x in range(10)]
results = ray.get(result_ids)
print("duration =", time.time() - start)
# duration = 1.0837509632110596
# 2. ray.put() O
import time
import numpy as np
import ray
ray.init(num_cpus = 4)
@ray.remote
def no_work(a):
return
start = time.time()
a_id = ray.put(np.zeros((5000, 5000)))
result_ids = [no_work.remote(a_id) for x in range(10)]
results = ray.get(result_ids)
print("duration =", time.time() - start)
# duration = 0.132796049118042
※ 1번 작업은 250만개의 배열이 복사되나, 2번 작업은 객체 저장소 동일한 개체의 여러 복사본을 생성하지 않으므로, 원래 프로그램보다 빠르게 실행됨
파이프 라인
# (a)
import time
import random
import ray
ray.init(num_cpus = 4)
@ray.remote
def do_some_work(x):
time.sleep(random.uniform(0, 4)) # Replace this with work you need to do.
return x
def process_results(results):
sum = 0
for x in results:
time.sleep(1) # Replace this with some processing code.
sum += x
return sum
start = time.time()
data_list = ray.get([do_some_work.remote(x) for x in range(4)])
sum = process_results(data_list)
print("duration =", time.time() - start, "\nresult = ", sum)
# duration = 7.82636022567749
# result = 6
# (b)
import time
import random
import ray
ray.init(num_cpus = 4)
@ray.remote
def do_some_work(x):
time.sleep(random.uniform(0, 4)) # Replace this is with work you need to do.
return x
def process_incremental(sum, result):
time.sleep(1) # Replace this with some processing code.
return sum + result
start = time.time()
result_ids = [do_some_work.remote(x) for x in range(4)]
sum = 0
while len(result_ids):
done_id, result_ids = ray.wait(result_ids)
sum = process_incremental(sum, ray.get(done_id[0]))
print("duration =", time.time() - start, "\nresult = ", sum)
# duration = 4.852453231811523
# result = 6

※ (a)는 'do_some_work()'함수를 ray_get()으로 호출한 경우 마지막 작업이 완료될 때까지 기다린 후 'process_results'함수가 실행되나, (b)는 ray_wait()으로 개체 ID 목록에서 준비된 개체의 ID 목록과 아직 준비되지 않은 개체의 ID 목록을 반환하여 ray.get()을 통해 개체 ID 목록에서 개체를 반환하여 한 번에 하나의 결과를 처리합니다.
Example
# First, run `pip install ray`.
import ray
ray.init()
@ray.remote
def f(x):
return x * x
futures = [f.remote(i) for i in range(4)]
print(ray.get(futures)) # [0, 1, 4, 9]
@ray.remote
class Counter(object):
def __init__(self):
self.n = 0
def increment(self):
self.n += 1
def read(self):
return self.n
counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures)) # [1, 1, 1, 1]
728x90
반응형
'Python' 카테고리의 다른 글
[Python] 정규 표현식 (0) | 2022.05.24 |
---|---|
[Python] iterable (0) | 2021.06.06 |
[Python] Multiline String (여러줄 문자열) (0) | 2021.06.06 |
[Python] WARNING: The script flask is installed in (0) | 2021.06.06 |
[Python] Selenium (Keys) (1) | 2021.06.06 |