IT Log

[Python] Ray Tutorial 본문

Python

[Python] Ray Tutorial

newly0513 2021. 6. 7. 20:53
728x90
반응형

Ray 홈페이지 : https://docs.ray.io/en/master/index.html

 


Ray

분산 애플리케이션을 구축하기위한 단순하고 범용적인 API를 제공


Ray Install

pip install -U ray


# FutureWarning: Not all Ray CLI dependencies were found. 
# In Ray 1.4+, the Ray CLI, autoscaler, and dashboard will only be usable via `pip install 'ray[default]'`. 
# Please update your install command
# 위 오류가 발생 시 아래 명령 실행

pip install 'ray[default]'

Ray API

API 설명
ray.init() Ray context를 초기화
@ray.remote 함수가 될 것을 지정 (함수 또는 Class)
.remote() 모든 remote function, remote class 선언 또는 remote class method 호출 (비동기 작업)
ray.put() 개체 저장소에 개체를 저장하고 해당 ID를 반환
ID는 원격 함수 또는 메서드 호출에 대한 인수로 개체를 전달하는데 사용 (동기식 작업)
ray.get() 개체 ID 또는 개체 ID 목록에서 개체 또는 개체 목록을 반환 (동기식 작업)
ray.wait() 개체 ID 목록에서 (1) 준비된 개체의 ID 목록과 (2) 아직 준비되지 않은 개체의 ID 목록을 반환
기본적으로 한 번에 하나의 준비된 개체 ID를 반환

ray.get()

import time
import ray

ray.init(num_cpus = 4) # Specify this system has 4 CPUs.

@ray.remote
def do_some_work(x):
    time.sleep(1) # Replace this is with work you need to do.
    return x

start = time.time()

#(1) @ray.remote X / ray.get() X
# results = [do_some_work.remote(x) for x in range(4)]
#(2) @ray.remote O / ray.get() X
# results = [do_some_work.remote(x) for x in range(4)]
#(3) @ray.remote O / ray.get() O
# results = [ray.get(do_some_work.remote(x)) for x in range(4)]
#(4) @ray.remote O / ray.get() O
# results = ray.get([do_some_work.remote(x) for x in range(4)])

print("duration =", time.time() - start)
print("results = ", results)
  1. @ray.remote X / ray.get() X :
    병렬화 하지 않으므로, 4초가 소요되며, results 값은 [0, 1, 2, 3]
  2. @ray.remote O / ray.get() X :
    병렬화를 진행하므로, 0.00..초가 소요되며, results 값은 [ObjectRef(df5a1a828c9685d3ffffffff0100000001000000), ..]로 개체 ID값을 반환
  3. @ray.remote O / ray.get() O :
    병렬화를 진행하나, 4초가 소요되며, 1번과 같은 results 값을 반환. 이 경우 원격 작업이 끝난 후 결과를 호출하여, 병렬처리 의미가 없음
  4. @ray.remote O / ray.get() O :
    병렬화를 진행하고, 모든작업이 끝난 후 ray.get()을 호출하므로, 1초가 소요되며, 1번과 같은 results값을 반환

※ 4번과 같이 사용하는게 올바른 사용법.


작은 작업 피하기

# 1. 일반작업
import time

def tiny_work(x):
    time.sleep(0.0001) # Replace this with work you need to do.
    return x

start = time.time()
results = [tiny_work(x) for x in range(100000)]
print("duration =", time.time() - start)
# duration = 13.36544418334961

# 2. @ray.remote
import time
import ray

ray.init(num_cpus = 4)

@ray.remote
def tiny_work(x):
    time.sleep(0.0001) # Replace this is with work you need to do.
    return x

start = time.time()
result_ids = [tiny_work.remote(x) for x in range(100000)]
results = ray.get(result_ids)
print("duration =", time.time() - start)
# duration = 27.46447515487671

# 3. @ray.remote 
import time
import ray

ray.init(num_cpus = 4)

def tiny_work(x):
    time.sleep(0.0001) # replace this is with work you need to do
    return x

@ray.remote
def mega_work(start, end):
    return [tiny_work(x) for x in range(start, end)]

start = time.time()
result_ids = []
[result_ids.append(mega_work.remote(x*1000, (x+1)*1000)) for x in range(100)]
results = ray.get(result_ids)
print("duration =", time.time() - start)
# duration = 3.2539820671081543

※ 1번 작업보다 2번 작업이 오래 걸리는 이유는 모든 작업 호출에 오버헤드가 발생하여 실행하는데 걸리는 실제 시간에 영향을 주므로, 3번과 같이 실행하는 것이 올바른 사용법


개체 저장소에 저장

# 1. ray.put() X
import time
import numpy as np
import ray

ray.init(num_cpus = 4)

@ray.remote
def no_work(a):
    return

start = time.time()
a = np.zeros((5000, 5000))
result_ids = [no_work.remote(a) for x in range(10)]
results = ray.get(result_ids)
print("duration =", time.time() - start)
# duration = 1.0837509632110596

# 2. ray.put() O
import time
import numpy as np
import ray

ray.init(num_cpus = 4)

@ray.remote
def no_work(a):
    return

start = time.time()
a_id = ray.put(np.zeros((5000, 5000)))
result_ids = [no_work.remote(a_id) for x in range(10)]
results = ray.get(result_ids)
print("duration =", time.time() - start)
# duration = 0.132796049118042

※ 1번 작업은 250만개의 배열이 복사되나, 2번 작업은 객체 저장소 동일한 개체의 여러 복사본을 생성하지 않으므로, 원래 프로그램보다 빠르게 실행됨 


파이프 라인

# (a)
import time
import random
import ray

ray.init(num_cpus = 4)

@ray.remote
def do_some_work(x):
    time.sleep(random.uniform(0, 4)) # Replace this with work you need to do.
    return x

def process_results(results):
    sum = 0
    for x in results:
        time.sleep(1) # Replace this with some processing code.
        sum += x
    return sum

start = time.time()
data_list = ray.get([do_some_work.remote(x) for x in range(4)])
sum = process_results(data_list)
print("duration =", time.time() - start, "\nresult = ", sum)

# duration = 7.82636022567749
# result =  6

# (b)
import time
import random
import ray

ray.init(num_cpus = 4)

@ray.remote
def do_some_work(x):
    time.sleep(random.uniform(0, 4)) # Replace this is with work you need to do.
    return x

def process_incremental(sum, result):
    time.sleep(1) # Replace this with some processing code.
    return sum + result

start = time.time()
result_ids = [do_some_work.remote(x) for x in range(4)]
sum = 0
while len(result_ids):
    done_id, result_ids = ray.wait(result_ids)
    sum = process_incremental(sum, ray.get(done_id[0]))

print("duration =", time.time() - start, "\nresult = ", sum)
# duration = 4.852453231811523
# result =  6

※ (a)는 'do_some_work()'함수를 ray_get()으로 호출한 경우 마지막 작업이 완료될 때까지 기다린 후 'process_results'함수가 실행되나, (b)는 ray_wait()으로 개체 ID 목록에서  준비된 개체의 ID 목록과 아직 준비되지 않은 개체의 ID 목록을 반환하여 ray.get()을 통해 개체 ID 목록에서 개체를 반환하여 한 번에 하나의 결과를 처리합니다.

 


Example

# First, run `pip install ray`.

import ray
ray.init()

@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures)) # [0, 1, 4, 9]

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures)) # [1, 1, 1, 1]

 

728x90
반응형

'Python' 카테고리의 다른 글

[Python] 정규 표현식  (0) 2022.05.24
[Python] iterable  (0) 2021.06.06
[Python] Multiline String (여러줄 문자열)  (0) 2021.06.06
[Python] WARNING: The script flask is installed in  (0) 2021.06.06
[Python] Selenium (Keys)  (1) 2021.06.06
Comments