좋은 속도 제한 알고리즘은 무엇입니까?
사이비 코드나 더 나은 파이썬을 사용할 수 있습니다.Python IRC 봇에 대한 속도 제한 대기열을 구현하려고 하는데 부분적으로 작동하지만 누군가 제한보다 적은 메시지를 트리거하고(예: 속도 제한은 8초당 5개의 메시지를 트리거하고, 사용자는 4개만 트리거함) 다음 트리거가 8초 이상(예: 16초 후)이면 봇이 메시지를 보냅니다.그러나 8초가 지났기 때문에 필요하지 않지만 대기열이 가득 차서 봇이 8초 동안 기다립니다.
여기서 메시지가 너무 빨리 도착할 때 메시지를 삭제하려면(큐가 임의로 커질 수 있으므로 큐에 대기하는 대신) 가장 간단한 알고리즘을 사용합니다.
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
discard_message();
else:
forward_message();
allowance -= 1.0;
이 솔루션에는 데이터 구조, 타이머 등이 없으며 깔끔하게 작동합니다. :) 이를 확인하려면 '허용'이 초당 최대 5/8 단위, 즉 8초당 최대 5 단위로 증가합니다.전달되는 모든 메시지는 한 단위를 차감하므로 8초마다 5개 이상의 메시지를 보낼 수 없습니다.
:rate
정수여야 합니다. 즉, 0이 아닌 소수 부분이 없으면 알고리즘이 올바르게 작동하지 않습니다(평균 속도는 다음과 같습니다).rate/per
예를 들어.rate=0.5; per=1.0;
다음과 같은 이유로 작동하지 않습니다.allowance
1.0으로 절대 증가하지 않습니다.그렇지만rate=1.0; per=2.0;
잘 작동합니다.
대기열 기능을 수행하기 전에 이 데코레이터 @RateLimited(ratepersec)를 사용하십시오.
기본적으로 마지막 시간 이후로 1/rate초가 경과했는지 확인하고 그렇지 않으면 나머지 시간 동안 대기합니다. 그렇지 않으면 대기하지 않습니다.이는 사실상 초당 속도로 제한됩니다.데코레이터는 당신이 원하는 모든 기능에 적용할 수 있습니다.
이 경우 8초당 최대 5개의 메시지를 원하는 경우 sendToQueue 기능 전에 @RateLimited(0.625)를 사용합니다.
import time
def RateLimited(maxPerSecond):
minInterval = 1.0 / float(maxPerSecond)
def decorate(func):
lastTimeCalled = [0.0]
def rateLimitedFunction(*args,**kargs):
elapsed = time.clock() - lastTimeCalled[0]
leftToWait = minInterval - elapsed
if leftToWait>0:
time.sleep(leftToWait)
ret = func(*args,**kargs)
lastTimeCalled[0] = time.clock()
return ret
return rateLimitedFunction
return decorate
@RateLimited(2) # 2 per second at most
def PrintNumber(num):
print num
if __name__ == "__main__":
print "This should print 1,2,3... at about 2 per second."
for i in range(1,100):
PrintNumber(i)
토큰 버킷은 구현이 매우 간단합니다.
토큰이 5개인 버킷으로 시작합니다.
5/8초마다:버킷에 토큰이 5개 미만이면 토큰을 하나 추가합니다.
메시지를 보낼 때마다:버킷에 토큰이 1개 이상 있으면 토큰 하나를 꺼내 메시지를 보냅니다.그렇지 않으면 메시지를 기다리거나 삭제합니다.
(따라서 실제 코드에서는 실제 토큰 대신 정수 카운터를 사용하고 타임스탬프를 저장하여 5/8초 단계마다 최적화할 수 있습니다.)
질문을 다시 읽어보십시오. 속도 제한이 8초마다 완전히 재설정되는 경우 다음과 같은 수정 사항이 있습니다.
시작합니다.last_send
오래전예에 (를들어에, 그시대)) 같은 버킷으로 합니다.또한 동일한 5 토큰 버킷으로 시작합니다.
5/8초마다 규칙을 실행합니다.
메시지를 보낼 때마다:먼저, 확인합니다.last_send
8초전에을 채웁니다(토큰).그런 경우 버킷을 채웁니다(토큰 5개로 설정).둘째, 버킷에 토큰이 있으면 메시지를 보냅니다(그렇지 않으면 드롭/대기/등). 째셋, 세트를 설정합니다.last_send
지금까지
그것은 그 시나리오에 효과가 있을 것입니다.
저는 실제로 이런 전략(첫 번째 접근법)을 사용하여 IRC 봇을 작성했습니다.Python이 아닌 Perl로 되어 있지만 다음은 설명할 코드입니다.
여기서 첫 번째 부분에서는 버킷에 토큰을 추가하는 작업을 다룹니다.시간을 기준으로 토큰을 추가하는 최적화를 확인할 수 있으며(두 번째 줄부터 마지막 줄까지) 마지막 줄은 버킷 내용을 최대로 클램프합니다(MESSAGE_BURST).
my $start_time = time;
...
# Bucket handling
my $bucket = $conn->{fujiko_limit_bucket};
my $lasttx = $conn->{fujiko_limit_lasttx};
$bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
$dll은 전달되는 데이터 구조입니다.이것은 일상적으로 실행되는 메서드 안에 있습니다(다음 번에 작업할 시간을 계산하고 네트워크 트래픽이 발생할 때까지 사용).메소드의 다음 부분은 전송을 처리합니다.메시지와 관련된 우선순위가 있기 때문에 다소 복잡합니다.
# Queue handling. Start with the ultimate queue.
my $queues = $conn->{fujiko_queues};
foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
# Ultimate is special. We run ultimate no matter what. Even if
# it sends the bucket negative.
--$bucket;
$entry->{code}(@{$entry->{args}});
}
$queues->[PRIORITY_ULTIMATE] = [];
이것이 무슨 일이 있어도 실행되는 첫 번째 대기열입니다.홍수 때문에 우리의 연결이 끊어지더라도 말입니다.서버의 PING에 응답하는 것과 같은 매우 중요한 일에 사용됩니다.다음, 나머지 대기열:
# Continue to the other queues, in order of priority.
QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
my $queue = $queues->[$pri];
while (scalar(@$queue)) {
if ($bucket < 1) {
# continue later.
$need_more_time = 1;
last QRUN;
} else {
--$bucket;
my $entry = shift @$queue;
$entry->{code}(@{$entry->{args}});
}
}
}
마지막으로 버킷 상태는 $conn 데이터 구조에 다시 저장됩니다(실제로는 메소드에서 조금 뒤에 먼저 작업량이 얼마나 더 늘어날지 계산합니다).
# Save status.
$conn->{fujiko_limit_bucket} = $bucket;
$conn->{fujiko_limit_lasttx} = $start_time;
보시다시피, 실제 버킷 처리 코드는 약 4줄로 매우 작습니다.나머지 코드는 우선순위 대기열 처리입니다.봇에는 우선 순위 대기열이 있으므로 예를 들어, 누군가가 봇과 채팅하는 것이 중요한 킥/금지 작업을 수행하는 것을 막을 수 없습니다.
메시지가 전송될 때까지 처리를 차단하여 추가 메시지를 대기열에 표시하기 위해 Antti의 아름다운 솔루션도 다음과 같이 수정할 수 있습니다.
rate = 5.0; // unit: messages
per = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
when (message_received):
current = now();
time_passed = current - last_check;
last_check = current;
allowance += time_passed * (rate / per);
if (allowance > rate):
allowance = rate; // throttle
if (allowance < 1.0):
time.sleep( (1-allowance) * (per/rate))
forward_message();
allowance = 0.0;
else:
forward_message();
allowance -= 1.0;
메시지를 보낼 수 있는 충분한 여유가 있을 때까지 기다립니다.두 배의 비율로 시작하지 않으려면 허용량이 0으로 초기화될 수도 있습니다.
한 가지 해결책은 각 대기열 항목에 타임스탬프를 첨부하고 8초가 지나면 항목을 삭제하는 것입니다.대기열이 추가될 때마다 이 검사를 수행할 수 있습니다.
이 기능은 대기열 크기를 5로 제한하고 대기열이 꽉 찬 상태에서 추가 사항을 삭제하는 경우에만 작동합니다.
마지막 다섯 줄이 전송된 시간을 유지합니다.대기 중인 메시지가 5번째로 최근 메시지(존재하는 경우)가 8초 이상 지난 시간(last_5를 시간 배열로 사용)이 될 때까지 대기합니다.
now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
last_five.insert(0, now)
send_message(msg)
if len(last_five) > 5:
last_five.pop()
여전히 관심이 있는 사람이 있다면, 저는 이 단순한 호출 가능 클래스를 시간 지정된 LRU 키 값 저장소와 함께 사용하여 IP당 요청 속도를 제한합니다.데케를 사용하지만 대신 목록과 함께 사용하도록 다시 작성할 수 있습니다.
from collections import deque
import time
class RateLimiter:
def __init__(self, maxRate=5, timeUnit=1):
self.timeUnit = timeUnit
self.deque = deque(maxlen=maxRate)
def __call__(self):
if self.deque.maxlen == len(self.deque):
cTime = time.time()
if cTime - self.deque[0] > self.timeUnit:
self.deque.append(cTime)
return False
else:
return True
self.deque.append(time.time())
return False
r = RateLimiter()
for i in range(0,100):
time.sleep(0.1)
print(i, "block" if r() else "pass")
단지 수락된 답변의 코드를 파이썬으로 구현한 것입니다.
import time
class Object(object):
pass
def get_throttler(rate, per):
scope = Object()
scope.allowance = rate
scope.last_check = time.time()
def throttler(fn):
current = time.time()
time_passed = current - scope.last_check;
scope.last_check = current;
scope.allowance = scope.allowance + time_passed * (rate / per)
if (scope.allowance > rate):
scope.allowance = rate
if (scope.allowance < 1):
pass
else:
fn()
scope.allowance = scope.allowance - 1
return throttler
이거 어때:
long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;
private boolean isRateLimited(int msgs_per_sec) {
if (System.currentTimeMillis() - check_time > 1000) {
check_time = System.currentTimeMillis();
msgs_sent_count = 0;
}
if (msgs_sent_count > (msgs_per_sec - 1)) {
return true;
} else {
msgs_sent_count++;
}
return false;
}
스칼라의 변형이 필요했습니다.여기 있습니다.
case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {
import Thread.sleep
private def now = System.currentTimeMillis / 1000.0
private val (calls, sec) = callsPerSecond
private var allowance = 1.0
private var last = now
def apply(a: A): B = {
synchronized {
val t = now
val delta_t = t - last
last = t
allowance += delta_t * (calls / sec)
if (allowance > calls)
allowance = calls
if (allowance < 1d) {
sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
}
allowance -= 1
}
f(a)
}
}
사용 방법은 다음과 같습니다.
val f = Limiter((5d, 8d), {
_: Unit ⇒
println(System.currentTimeMillis)
})
while(true){f(())}
또 다른 해결책
from collections import deque
from datetime import timedelta
from time import sleep
class RateLimiter:
def __init__(self, items: int, per: timedelta = timedelta(seconds=1)):
self.items = items
self.per = per
self.deque = deque(maxlen=items)
def count(self):
now = datetime.now()
self.deque.append(now)
def time_to_wait(self) -> timedelta:
if len(self.deque) < self.deque.maxlen:
return timedelta(0)
now = datetime.now()
per = now - self.deque[0]
return max(timedelta(0), self.per - per)
def throttle(self):
sleep(self.time_to_wait().total_seconds())
self.count()
if __name__ == '__main__':
rate_limiter = RateLimiter(items=3, per=timedelta(seconds=3))
for i in range(10):
rate_limiter.throttle()
print(f'{i}')
자바 구문, 주요 아이디어: 반복을 세지 않고, 윤달 시간을 세는 것.마지막 도약 시간을 기억하고 도약에 필요한 시간이 속도를 초과하지 않을 때까지 기다립니다.
public static void limitRate(int rate, AtomicLong leapTime, ReentrantLock rateLock) {
if (rate == 0)
return;
long targetLeapTime = 1_000_000_000 / rate;
rateLock.lock();
try {
long timeSnapshot = nanoTime();
long parkTime = targetLeapTime - (timeSnapshot - leapTime.get());
if (parkTime > 0) {
parkNanos(parkTime);
leapTime.set(timeSnapshot + parkTime);
} else {
leapTime.set(timeSnapshot);
}
} finally {
rateLock.unlock();
}
}
언급URL : https://stackoverflow.com/questions/667508/whats-a-good-rate-limiting-algorithm
'programing' 카테고리의 다른 글
node_modules 폴더 안에 있는 스크립트를 포함하는 방법은 무엇입니까? (0) | 2023.06.09 |
---|---|
아이폰 시뮬레이터에서 카메라를 테스트하려면 어떻게 해야 합니까? (0) | 2023.06.09 |
vuex 스토어 토큰이 구성 요소 간에 업데이트되지 않음 (0) | 2023.06.09 |
오류: 참조 또는 변수에 할당할 수 없습니다!앵귤러 4 (0) | 2023.06.09 |
동적으로 작성된 단추에 코드 할당 (0) | 2023.06.09 |