원형 큐(Queue)는 FIFO 구조를 가지고 있는 점에서 기존 큐(Queue)와 동일하다
마지막 위치가 시작 위치와 연결되는 점이 다른 점이다.
기존의 큐는 공간이 꽉 차게 되면 더 이상 요소를 채울 수 없다.
기존 큐는 앞쪽 값들이 빠져서 충분한 공간이 생기는 것처럼 보여도 해당 위치로 추가할 수 없다.
원형 큐는 앞쪽 요소들이 빠지더라도 원형으로 이뤄져 있기 때문에 앞쪽에 추가 할 수 있다.
즉, 재활용이 가능한 자료구조이다.
원형 큐 삽입 삭제 원리
출처 : 파이썬 알고리즘 인터뷰 p.260
마지막 위치와 시작 위치를 연결하도록 원형 구조를 세팅하고, 값의 시작점과 끝점을 따라 투 포인터가 움직인다.
위 그림을 참고하면, enQueue() 를 통해 rear 포인터를 이동시키고, deQueue 를 통해 front 포인터를 이동시킨다.
이 로직을 통해 투 포인터가 돌면서 이동하게 된다.
만약 rear 포인터와 front 포인터가 만나게 되면 해당 원형 큐 구조에 여유 공간이 없다는 의미이므로 공간 부족 에러를 발생해야 한다.
원형 큐 구현 풀이
배열을 사용하여 구현한다.
rear 포인터와 front 포인터를 구분한다
현재 포인터의 위치를 전체 큐 값의 개수에 따라 제한한다.
속도 : 52ms
class MyCircularQueue(object):
def __init__(self, k: int):
self.q = [None] * k # 원형 큐 정의 (배열 활용)
self.maxlen = k # 최대 길이 정의
self.fp = 0 # front pointer
self.rp = 0 # rear pointer
def enQueue(self, value: int) -> bool:
# 값을 추가할 때는 rear pointer 활용한다
# rear 포인터 위치에 있는 큐 값이 없으면 입력된 value를 넣어준다
if self.q[self.rp] is None:
self.q[self.rp] = value
# 입력해준 후 rear 포인터이 값을 업데이트 한다
self.rp = (self.rp + 1) % self.maxlen
return True
else:
return False
def deQueue(self) -> bool:
"""
원형 큐의 첫 번째 값이 제거되도록 기능 구현
"""
# 값을 삭제할 때는 front pointer를 활용한다.
# front pointer의 값이 아무것도 없으면 삭제할 값이 없다는 의미
if self.q[self.fp] is None:
return False
# 값이 있으면, 그 값을 삭제하되 출력(반환)되지 않도록 세팅 필요
else:
self.q[self.fp] = None
# front pointer를 업데이트한다.
self.fp = (self.fp + 1) % self.maxlen
return True
def Front(self) -> int:
"""
원형큐의 맨 앞에 있는 값을 반환
값이 없을 경우 -1 반환
"""
return -1 if self.q[self.fp] is None else self.q[self.fp]
def Rear(self):
"""
:rtype: int
원형 큐의 맨 뒤에 있는 값을 반환
값이 없을 경우 -1 반환
"""
# rear pointer 에서 1을 빼준 위치에서 값을 가져와야 한다.
return -1 if self.q[self.rp - 1] is None else self.q[self.rp - 1]
def isEmpty(self):
"""
:rtype: bool
값이 비어 있을 경우 True, 아닐 경우 False
포인터의 위치가 같은데 front pointer의 값이 없다면 해당 원형 큐의 값을 아무것도 없는 비어있는 상태라는 의미
"""
return self.fp == self.rp and self.q[self.fp] is None
def isFull(self):
"""
:rtype: bool
값이 모두 채워져 있는 경우를 아래와 같은 로직으로 판단
우선 포인터의 위치가 같아야 하며, 그럴 때 front pointer의 값이 채워져 있어야 한다.
"""
return self.fp == self.rp and self.q[self.fp] is not None
이유는 맨 마지막 값만을 활용할 수 있기 때문에 스택의 연산으로 값을 추출하면 값이 곧 최신에 들어간 값(마지막 값)이다.
그런데 큐는 맨 처음에 들어간 값이 추출돼야 해서 스택의 연산으로 할 경우 이동이 필요하다.
output 값이 아무것도 없기 전까지는 재입력하는 알고리즘이 돌아가지 않는다
시간복잡도는 O(1) 로 계산되게끔 구현한다
실행속도 : 18 ms
class MyQueue(object):
def __init__(self):
self.input = []
self.output = []
def push(self, x):
"""
:type x: int
:rtype: None
"""
self.input.append(x)
def pop(self):
"""
:rtype: int
"""
# input 값을 재정렬하는 과정 거친 후
self.peek()
# 제일 처음에 들어온 값을 추출
return self.output.pop()
def peek(self):
"""
:rtype: int
"""
# 출력값이 아무것도 없다면 입력값 확인해서 값 재정렬
if not self.output:
# 새로 들어온 값이 있다면
# 해당 값은 output으로 queue 구조로 재입력 돼야 함
while self.input:
# 우선 input값에서 마지막 값을 빼고
# 해당 값을 output에 집어 넣는다
self.output.append(self.input.pop())
# 결과값이 맨 마지막 위치한 값이 제일 처음 들어온 값으로 위에서 세팅했음
return self.output[-1]
def empty(self):
"""
:rtype: bool
"""
# 입력값과 결과값 stack을 모두 확인해야 함
# 입력값에 값이 들어온 후 결과값에 재정령 안 돼 있을 수도 있기 때문
return self.input == [] and self.output == []
큐의 연산(맨 앞에 있는 값을 제일 먼저 가져오는)만을 사용하기 위해 스택의 LIFO를 구현해줘야 한다
즉, 맨 나중에 들어오는 값이 맨 앞(맨 왼쪽에 위치하게끔 바꿔야 한다.
실행 속도 : 39ms
class MyStack(object):
def __init__(self):
self.q = collections.deque()
def push(self, x):
"""
:type x: int
:rtype: None
"""
# Step1. 값을 집어 넣는다
self.q.append(x)
# Step2. 데이터를 재정렬한다.
# 현재 들어 있는 값중 맨 마지막 값은 가만히 있으면 되기 때문에
# 전체 개수에서 1개 뺀만큼만 이동하면 된다.
for _ in range(len(self.q)-1):
# deque 자료구조의 연산중 popleft는 결국 que의 선입선출 연산구조를 의미한다.
self.q.append(self.q.popleft())
def pop(self):
"""
:rtype: int
"""
return self.q.popleft()
def top(self):
"""
:rtype: int
"""
return self.q[0]
def empty(self):
"""
:rtype: bool
"""
return len(self.q)==0
class Solution(object):
def dailyTemperatures(self, temperatures):
"""
:type temperatures: List[int]
:rtype: List[int]
"""
result=[]
for i, t in enumerate(temperatures):
# 다음 따뜻한 날을 체크하기 위한 변수
day=0
# 전체 온도의 개수에서 2개를 뺀 이유는 리스트의 index를 사용하는데
# index에서 +1 하기 때문에 마지막 index 값에서 1을 더하면 out of range error 발생
length=len(temperatures)-2
# 맨 마지막 값인지 아닌지 체크
if length >= i:
# 다음 온도가 현재 온도 보다 높은지 체크하고 만약 낮거나 같다면 day와 i가 +1 증가하여
# 그 다음 온도를 체크하면서 day값을 측정
while t >= temperatures[i+1] and i < length:
day += 1
i += 1
# 특별한 예외 상황이 있는데
# 만약 끝까지 확인했는데 더 높은 온도가 없으면 0으로 입력해야 함
# 이 부분이 없으면 맨 끝까지 확인하면서 증가한 day가 입력될 것으로 예상
if i==length and t >= temperatures[i+1]:
result.append(0)
else:
result.append(day+1)
# 맨 마지막 값이라면 무조건 0으로 입력
else:
result.append(0)
return result
문제 풀이 2
현재 index를 stack 자료구로로 쌓아두다가, 이전보다 상승하는 지점에서 현재 온도와 stack에 쌓아둔 index 지점의 온도 차이 비교
만약 더 높다면, stack의 값을 꺼내 현재 index와 비교하여 그 차이를 정답으로 입력한다.
즉, 현재 온도가 이전 온도들과 비교할 때 stack에 쌓여 있는 index를 활용할 것이고, 이 index의 값이 현재 온도보다 작다면 해당 index는 추출되고 그 차이가 정답으로 옮겨져서 결국, stack에는 해당 index가 안 쌓여 있을 것이다.
속도 : 31ms
class Solution(object):
def dailyTemperatures(self, temperatures):
"""
:type temperatures: List[int]
:rtype: List[int]
"""
# 정답 리스트 세팅 (0으로 세팅해두면, 알아서 아래 로직이 거치지 않는 곳은 0으로 세팅 됨)
answer=[0] * len(temperatures)
# 위 온도 값의 index를 담을 자료 구조
stack=[]
for i, cur in enumerate(temperatures):
# step1. stack 값이 있는지 확인
# step2. 현재 온도가 stack의 쌓여 있는 index의 값보다 크다면
while stack and cur > temperatures[stack[-1]]:
# stack에서 해당 index 추출
last = stack.pop()
# 해당 index 위치에 정답 입력
# 현재 i 와 차이값으로 세팅
answer[last]=i-last
# 현재 온도의 index가 추가 돼야 추후 온도들과 비교 가능
stack.append(i)
return answer
긴 시퀀스를 활용해서 RNN 신경망 모델로 구현하면 불안정한 그레디언트 문제가 발생합니다. 그레이던트의 소실, 폭주 혹은 긴 훈련 시간, 불안정한 훈련 등 다양한 문제에 봉착할 수 있습니다. 이런 긴 시퀀스를 처리하기 위해서 다양한 해결방법이 있지만 그 중 층 정규화에 대해 먼저 살펴보겠습니다.
층 정규화는 RNN 모델 각 층 사이에서 feature dimention에 대해 정규화하는 방법입니다. 즉, 각 타임 스텝 사이에서 층 정규화를 통해 긴 시퀀스 데이터를 학습할 때 생기는 정보 손실과 불안정한 그레디언트를 예방할 수 있습니다. 간단하게 구현해 보겠습니다.
class LNSimpleRNNCell(tf.keras.layers.Layer):
def __init__(self, units, activation='tanh', **kwargs):
super().__init__(**kwargs)
# state는 이전 타임 스템의 은닉층 상태를 담은 매개변수로서
# 하나 이상의 텐서를 담고 있다.
# 각 state_size와 output_size는 unit 개수로 맞춰준다
self.state_size = units
self.output_size = units
# SimpleRNN Cell을 구성하는데 활성화 함수가 없는 이유는
# 이전 스텝의 결과(output)와 현재 input(state, 은닉층)을 선형 연산 후 활성화 함수 이전에 층 정규화를 수행하기 위해서힙니다.
self.simple_rnn_cell = tf.keras.layers.SimpleRNN(units, activation=None)
# 각 타임 스텝마다 층 정규화가 이뤄질 수 있도록 설정
self.layer_norm = tf.keras.layer.LayerNormalization()
# 앞에서 설정하지 않았던 활성화 함수를 층 정규화 이후에 세팅합니다.
self.activation = tf.keras.activations.get(activation) # 탄젠트 함수를 activation으로 사용
def call(self, inputs, states):
# SimpleRNN 셀을 사용하여 현재 입력(inputs)과 이전 은닉 상태(states)의 선형 조합을 계산합니다.
# 여기서 출력은 은닉 상태와 동일합니다. 즉, new_stats[0] == outputs
# 나머지 new_stats는 무시해도 괜찮음
outputs, new_states = self.simple_rnn_cell(inputs, states)
# 층 정규화와 활성화 함수 차례대로 적용
norm_outputs = self.activation(self.layer_norm(outputs))
# 출력을 두번 반환
# 하나는 출력, 다른 하나는 새로운 은닉 상태
return norm_outputs, [norm_outputs]
# 위 사용자 정의 Cell을 사용하려면 tf.keras.layers.RNN 층을 만들어 이 class 객체를 전달하면 됩니다.
model = tf.keras.models.Sequential([
tf.keras.layers.RNN(LNSimpleRNNCell(20), return_sequences=True, input_shape=[None,1]),
tf.keras.layers.RNN(LNSimpleRNNCell(20), return_sequences=True),
tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(10))
])
위처럼 층 정규화 방법 외에도 dropout 비율 정의를 통해서도 긴 시퀀스를 다루기 위해 불안정한 그레디언트 문제를 감소할 수 있습니다. 그럼, 이제 단기 기억 문제에 대해 알아보고 이에 대한 방안인 LSTM Cell에 대해 알아보겠습니다.
LSTM
RNN 모델에서 타임 스텝이 계속 진행할수록 처음 타임 스텝에서 가져온 시퀀스 데이터의 정보들이 소멸됩니다. 즉, 단기 기억에만 머물게 되는 문제가 발생합니다. 이 문제를 해결하기 위해 장기기억을 담당하는 Cell이 개발됐습니다. 바로 LSTM입니다.
장단기 메모리 셀 즉, LSTM Cell은 훈련을 빠르게 진행하고, 시퀀스 데이터의 장기간 의존성을 유지할 것입니다. keras에서 구현은 간단합니다.
# 간단히 LSTM Cell 사용
model =tf.keras.models.Sequential([
tf.keras.layers.LSTM(20, return_sequences=True, input_shape=[None, 1]),
tf.keras.layers.LSTM(20, return_sequences=True),
tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(10))
])
# RNN 층에 LSTMCell을 매개변수로 지정
model = tf.keras.models.Sequential([
tf.keras.layers.RNN(tf.keras.layers.LSTMCell(20), return_sequences=True, inpit_shape=[None,1]),
tf.keras.layers.RNN(tf.keras.layers.LSTMCell(20), return_sequences=True)
tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(10))
])
그런데 GPU 사용을 고려할 때, LSTMCell이 최적화 돼 있어서 일반적으로는 LSTMCell을 직접사용하고, 사용자 정의 Cell을 만들 때는 RNN layer를 사용하는게 일반적입니다.
LSTM Cell의 작동 구조는 매우 복잡합니다.
출처 : http://www.rex-ai.info/docs/AI_study_LSTM
키포인트는 LSTM 네트워크에서 장기 상태로 저장할 값, 버릴 값, 읽어드릴 값을 학습하는 것입니다. 이전 장기 기억인 (Cell) state는 삭제 게이트를 지나서 일부 정보를 손실하고, 입력 게이트를 지나 선택된 정보들을 추가합니다. 이렇게 만들어진 새로운 장기 기억은 다음 장기 기억(Next Cell State)으로 전달됩니다. 그래서 각 타임 스텝 마다 일부 정보는 손실되고, 추가됩니다. 또한 이 새롭게 만들어진 장기 기억은 tanh 활성화 함수를 거쳐, 출력 게이트를 통과합니다. 이렇게 해서 만들어진 단기 상태가 Output(\(h_t\)) 값으로 전달됩니다.
간략히 정리해보면 LSTM 네트워크는 중요한 입력은 인식하고, 장기 상태를 최적의 정보로 유지하기 위해 보존하고, 필요할 때마다 이를 추출하며 학습합니다. 이런 LSTM 모델은 긴 텍스트, 오디오 등 장기 패턴을 발견하는 데 우수한 성능을 보이고 있습니다.
LSTM 변종 GRU
GRU 셀은 LSTM 셀의 변종 버전으로 구조가 간소화되고, 유사하게 작동합니다. 간소화된 내용을 확인해 보면, LSTM 셀은 새로운 장기 기억과 은닉 상태 값 2개가 전달됐습니다. GRU는 이 2개를 합쳐서 하나의 벡터 값으로 전달됩니다. 그리고 하나의 게이트 제어기가 삭제 게이트와 입력 게이트 역할을 동시에 수행합니다. 마지막으로 출력 게이트가 없습니다. 즉, 전체 타임 스텝마다 해당 state 벡터가 출력됩니다. 그래서 새로운 게이트 제어기가 만들어졌는데 각 타임 스텝에서 생성한 state 벡터 중 어떤 값을 main layer에 노출시킬지 결정하는 게이트 제어기가 있습니다.
GRU 셀은 RNN의 대표적인 성공 모델입니다. 한가지 아쉬운 점은 기존 RNN, LSTM에 비해 긴 시퀀스를 잘 다루긴 하지만, 매우 제한적인 단기 기억을 가지고 있습니다. 이 문제를 해결하기 위한 방법은 다음 포스팅에서 소개하겠습니다.
지난 포스팅에 이어 심층 RNN으로 여러개의 타임 스텝을 예측하는 문제에 대해 다뤄보겠습니다. 지난 포스팅에서는 1개의 타임 스텝의 예측값을 출력한 후 그 값을 입력값에 적용해서 그 다음 스텝을 예측하는 방식으로 살펴봤는데요. 성능이 썩 좋지 않았습니다. 그래서 이번에는 다른 방법을 소개해 보겠습니다.
한 번에 10 스텝 예측하기
두 번째 방법으로 볼 수 있는데, 한 번에 예측하고 싶은 개수 만큼 예측하는 방법입니다. 이번 포스팅 예제에서는 10개의 타임 스텝 값을 한 번에 예측해보도록 코드를 구현해 보겠습니다. 원리는 기존 RNN의 모델과 같은 시퀀스-투-벡터 형식인데, 결과값 벡터가 이전에는 1개로 구성돼 있다면 이번에는 10개로 구성되게끔 모델을 생성하면 됩니다.
# 데이터 새롭게 구성
n_steps = 50
series = generate_time_series(10000, n_steps+10)
# 출력값 벡터가 10개인 모델 구성에 맞게 데이터도 재구성
train_x, train_y = series[:7000, :n_steps], series[:7000, -10:, 0]
val_x, val_y = series[7000:9000, :n_steps], series[7000:9000, -10:, 0]
test_x, test_y = series[9000: , :n_steps], series[9000: , -10:, 0]
#######################################################################
# 모델
model = tf.keras.models.Sequential([
tf.keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None, 1]),
tf.keras.layers.SimpleRNN(20),
# 다음 타임 스텝 10개를 한 번에 예측하기 위한 layer
tf.keras.layers.Dense(10)
])
model.compile(
optimizer = tf.keras.optimizers.Adam(),
loss = tf.keras.losses.MeanSquaredError(),
metrics = [tf.keras.metrics.MeanSquaredError()]
)
history = model.fit(
train_x,
train_y,
epochs=20,
validation_data = (val_x, val_y)
)
# MSE 확인
pred_y = model.predict(test_x)
print(np.mean(tf.keras.losses.mean_squared_error(test_y, pred_y)
# ==> 0.01045579
테스트 데이터로 평가해 본 결과 0.01로 이전보다 높은 성능을 보이고 있습니다. 하지만 여기서 더 개선할 부분이 있습니다. 현재 모델은 시퀀스-투-벡터 형식의 모델입니다. 이 말은 위에서 구성한 모든 타임 스텝의 결과를 반영하는 것이 아니라 제일 마지막 타입 스텝의 결과 값만을 반영해서 최종 출력이 나온다는 의미입니다. 그렇다면 이 모델을 시퀀스-투-시퀀스로 바꾸면 어떻게 될까요?
이 방식을 적용한다면, 마지막 타임 스텝에서 나오는 결과뿐만 아니라 모든 타임 스텝에서 출력되는 결과를 적용할 수 있습니다. 이 말은 더 많은 오차 그레디언트가 모델에 흐른다는 의미고, 시간에 구애 받지 않습니다. 즉, 각 타임 스텝의 출력에서 그레디언트가 적용될 수 있습니다.
좀 더 풀어서 설명하면, 타임 스텝 0에서 타임 스텝 1 ~ 10까지 예측을 담은 벡터를 출력합니다. 그 다음 타임 스텝 1에서 2 ~ 11까지 예측을 담은 벡터를 출력합니다. 각 타겟 값은 입력 시퀀스와 길이가 동일합니다. 즉, 타겟 시퀀스틑 각 타임 스텝마다 10차원 벡터를 출력합니다.
기본 RNN 모델을 시퀀스-투-시퀀스 모델로 바꾸려면 모든 순환 층의 결과 값을 출력하고 반영해야 합니다. 그래서 마지막 층에도 return_sequences=True 값을 부여해줍니다. 그런 다음 각 층에서 나온 출력값을 마지막 Dense Layer에 모두 적용해야 합니다. 이 부분을 가능하게 해주는 기능이 keras에서 TimeDistributed Layer를 제공합니다.
TimeDistributed Layer 작동 원리는 각 타임 스텝을 별개의 샘플처럼 다룰 수 있도록 입력 크기를 (배치 크기, 타임 스텝 수, 입력 차원) → (배치 크기 × 타임 스텝 수, 입력 차원) 으로 바꿉니다. 우리 코드에서는 SimpleRNN 유닛이 20개로 설정해서 입력 차원 수를 20으로 세팅한 후 Dense Layer에 적용됩니다. 그리고 Dense Layer에서 출력할 때는 다시 원래대로 (배치 크기, 타임 스텝 수, 입력 차원) 로 출력 크기가 변하고, Dense 유닛을 10이라고 지정해서 입력 차원을 10으로 세팅될 것입니다. 모델을 보겠습니다.
new_model = tf.keras.models.Sequential([
tf.keras.layers.SimpleRNN(20, return_sequences=True, input_shape=[None,1]),
tf.keras.layers.SimpleRNN(20, return_sequences=True),
tf.keras.layers.TimeDistributed(tf.keras.layers.Dense(10))
])
# 훈련 때는 모든 타임 스텝의 결과를 활용해서 MSE를 계산하는데
# 최종 실제 모델 평가 결과는 마지막 타임 스텝의 출력에 대한 MSE만 계산하면 되서 사용자 정의 지표를 만들 필요가 있다.
def last_time_step_mse(y_true, y_pred):
return tf.keras.metrics.mean_squared_error(y_true[:, -1], y_pred[:, -1])
new_model.compile(
optimizer = tf.keras.optimizers.Adam(lr=0.01),
loss = 'mse',
metrics = [last_time_step_mse]
)
new_history = new_model.fit(
train_x,
Y_trin,
epochs=20,
validation_data = (val_x, Y_valid)
)
# 최종 MSE 확인
print(new_history.history['val_last_time_step_mse'][-1])
# ==> 0.006069639232009649
성능이 확실히 개선된 결과를 확인할 수 있습니다. 이 RNN 구조를 사용해서 다음 타임 스텝 10개를 예측하고 이 출력값을 다시 입력 시계열에 연결해서 다시 다음 10 타임 스텝의 값을 예측하도록 세팅할 수 있습니다.
지금까지 RNN 신경망에 대해 알아봤습니다. 한계점도 존재하는데 길이가 긴 시계열 데이터나 시퀀스 에서는 잘 작동하지 않습니다. 이 문제를 어떻게 해결할 수 있는지 다음 포스팅에서 다뤄보겠습니다.