programing

좋은 속도 제한 알고리즘은 무엇입니까?

css3 2023. 6. 9. 22:17

좋은 속도 제한 알고리즘은 무엇입니까?

사이비 코드나 더 나은 파이썬을 사용할 수 있습니다.Python IRC 봇에 대한 속도 제한 대기열을 구현하려고 하는데 부분적으로 작동하지만 누군가 제한보다 적은 메시지를 트리거하고(예: 속도 제한은 8초당 5개의 메시지를 트리거하고, 사용자는 4개만 트리거함) 다음 트리거가 8초 이상(예: 16초 후)이면 봇이 메시지를 보냅니다.그러나 8초가 지났기 때문에 필요하지 않지만 대기열이 가득 차서 봇이 8초 동안 기다립니다.

여기서 메시지가 너무 빨리 도착할 때 메시지를 삭제하려면(큐가 임의로 커질 수 있으므로 큐에 대기하는 대신) 가장 간단한 알고리즘을 사용합니다.

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

이 솔루션에는 데이터 구조, 타이머 등이 없으며 깔끔하게 작동합니다. :) 이를 확인하려면 '허용'이 초당 최대 5/8 단위, 즉 8초당 최대 5 단위로 증가합니다.전달되는 모든 메시지는 한 단위를 차감하므로 8초마다 5개 이상의 메시지를 보낼 수 없습니다.

:rate정수여야 합니다. 즉, 0이 아닌 소수 부분이 없으면 알고리즘이 올바르게 작동하지 않습니다(평균 속도는 다음과 같습니다).rate/per예를 들어.rate=0.5; per=1.0;다음과 같은 이유로 작동하지 않습니다.allowance1.0으로 절대 증가하지 않습니다.그렇지만rate=1.0; per=2.0;잘 작동합니다.

대기열 기능을 수행하기 전에 이 데코레이터 @RateLimited(ratepersec)를 사용하십시오.

기본적으로 마지막 시간 이후로 1/rate초가 경과했는지 확인하고 그렇지 않으면 나머지 시간 동안 대기합니다. 그렇지 않으면 대기하지 않습니다.이는 사실상 초당 속도로 제한됩니다.데코레이터는 당신이 원하는 모든 기능에 적용할 수 있습니다.

이 경우 8초당 최대 5개의 메시지를 원하는 경우 sendToQueue 기능 전에 @RateLimited(0.625)를 사용합니다.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)

토큰 버킷은 구현이 매우 간단합니다.

토큰이 5개인 버킷으로 시작합니다.

5/8초마다:버킷에 토큰이 5개 미만이면 토큰을 하나 추가합니다.

메시지를 보낼 때마다:버킷에 토큰이 1개 이상 있으면 토큰 하나를 꺼내 메시지를 보냅니다.그렇지 않으면 메시지를 기다리거나 삭제합니다.

(따라서 실제 코드에서는 실제 토큰 대신 정수 카운터를 사용하고 타임스탬프를 저장하여 5/8초 단계마다 최적화할 수 있습니다.)


질문을 다시 읽어보십시오. 속도 제한이 8초마다 완전히 재설정되는 경우 다음과 같은 수정 사항이 있습니다.

시작합니다.last_send오래전예에 (를들어에, 그시대)) 같은 버킷으로 합니다.또한 동일한 5 토큰 버킷으로 시작합니다.

5/8초마다 규칙을 실행합니다.

메시지를 보낼 때마다:먼저, 확인합니다.last_send8초전을 채웁니다(토큰).그런 경우 버킷을 채웁니다(토큰 5개로 설정).둘째, 버킷에 토큰이 있으면 메시지를 보냅니다(그렇지 않으면 드롭/대기/등). 째셋, 세트를 설정합니다.last_send지금까지

그것은 그 시나리오에 효과가 있을 것입니다.


저는 실제로 이런 전략(첫 번째 접근법)을 사용하여 IRC 봇을 작성했습니다.Python이 아닌 Perl로 되어 있지만 다음은 설명할 코드입니다.

여기서 첫 번째 부분에서는 버킷에 토큰을 추가하는 작업을 다룹니다.시간을 기준으로 토큰을 추가하는 최적화를 확인할 수 있으며(두 번째 줄부터 마지막 줄까지) 마지막 줄은 버킷 내용을 최대로 클램프합니다(MESSAGE_BURST).

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$dll은 전달되는 데이터 구조입니다.이것은 일상적으로 실행되는 메서드 안에 있습니다(다음 번에 작업할 시간을 계산하고 네트워크 트래픽이 발생할 때까지 사용).메소드의 다음 부분은 전송을 처리합니다.메시지와 관련된 우선순위가 있기 때문에 다소 복잡합니다.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

이것이 무슨 일이 있어도 실행되는 첫 번째 대기열입니다.홍수 때문에 우리의 연결이 끊어지더라도 말입니다.서버의 PING에 응답하는 것과 같은 매우 중요한 일에 사용됩니다.다음, 나머지 대기열:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

마지막으로 버킷 상태는 $conn 데이터 구조에 다시 저장됩니다(실제로는 메소드에서 조금 뒤에 먼저 작업량이 얼마나 더 늘어날지 계산합니다).

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

보시다시피, 실제 버킷 처리 코드는 약 4줄로 매우 작습니다.나머지 코드는 우선순위 대기열 처리입니다.봇에는 우선 순위 대기열이 있으므로 예를 들어, 누군가가 봇과 채팅하는 것이 중요한 킥/금지 작업을 수행하는 것을 막을 수 없습니다.

메시지가 전송될 때까지 처리를 차단하여 추가 메시지를 대기열에 표시하기 위해 Antti의 아름다운 솔루션도 다음과 같이 수정할 수 있습니다.

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

메시지를 보낼 수 있는 충분한 여유가 있을 때까지 기다립니다.두 배의 비율로 시작하지 않으려면 허용량이 0으로 초기화될 수도 있습니다.

한 가지 해결책은 각 대기열 항목에 타임스탬프를 첨부하고 8초가 지나면 항목을 삭제하는 것입니다.대기열이 추가될 때마다 이 검사를 수행할 수 있습니다.

이 기능은 대기열 크기를 5로 제한하고 대기열이 꽉 찬 상태에서 추가 사항을 삭제하는 경우에만 작동합니다.

마지막 다섯 줄이 전송된 시간을 유지합니다.대기 중인 메시지가 5번째로 최근 메시지(존재하는 경우)가 8초 이상 지난 시간(last_5를 시간 배열로 사용)이 될 때까지 대기합니다.

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()

여전히 관심이 있는 사람이 있다면, 저는 이 단순한 호출 가능 클래스를 시간 지정된 LRU 키 값 저장소와 함께 사용하여 IP당 요청 속도를 제한합니다.데케를 사용하지만 대신 목록과 함께 사용하도록 다시 작성할 수 있습니다.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")

단지 수락된 답변의 코드를 파이썬으로 구현한 것입니다.

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler

이거 어때:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}

스칼라의 변형이 필요했습니다.여기 있습니다.

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

사용 방법은 다음과 같습니다.

val f = Limiter((5d, 8d), { 
  _: Unit ⇒ 
    println(System.currentTimeMillis) 
})
while(true){f(())}

또 다른 해결책

from collections import deque
from datetime import timedelta
from time import sleep

class RateLimiter:
    def __init__(self, items: int, per: timedelta = timedelta(seconds=1)):
        self.items = items
        self.per = per
        self.deque = deque(maxlen=items)

    def count(self):
        now = datetime.now()
        self.deque.append(now)

    def time_to_wait(self) -> timedelta:
        if len(self.deque) < self.deque.maxlen:
            return timedelta(0)
        now = datetime.now()
        per = now - self.deque[0]
        return max(timedelta(0), self.per - per)

    def throttle(self):
        sleep(self.time_to_wait().total_seconds())
        self.count()

if __name__ == '__main__':
    rate_limiter = RateLimiter(items=3, per=timedelta(seconds=3))

    for i in range(10):
        rate_limiter.throttle()
        print(f'{i}')

자바 구문, 주요 아이디어: 반복을 세지 않고, 윤달 시간을 세는 것.마지막 도약 시간을 기억하고 도약에 필요한 시간이 속도를 초과하지 않을 때까지 기다립니다.

public static void limitRate(int rate, AtomicLong leapTime, ReentrantLock rateLock) {
    if (rate == 0)
        return;
    long targetLeapTime = 1_000_000_000 / rate;
    rateLock.lock();
    try {
        long timeSnapshot = nanoTime();
        long parkTime = targetLeapTime - (timeSnapshot - leapTime.get());
        if (parkTime > 0) {
            parkNanos(parkTime);
            leapTime.set(timeSnapshot + parkTime);
        } else {
            leapTime.set(timeSnapshot);
        }
    } finally {
        rateLock.unlock();
    }
}

언급URL : https://stackoverflow.com/questions/667508/whats-a-good-rate-limiting-algorithm