Redis Throttled Queue¶
This utility class is a wrapper around the Redis Queue utility class. Using Redis as a queue is great, but how do we protect against concurrent data access when more than one process is pushing and popping from the queue? This is where the RedisThrottledQueue comes in handy.
RedisThrottledQueue allows multi-threaded and/or multiprocess access to the same Redis key being used as a RedisQueue. The throttle acts as a wrapper around the core queue's pop() method, and permits access to the data only when the throttle allows it. Scrapy Cluster's Distributed Scheduler relies heavily on this to control how fast the cluster crawls a particular site.
Redis already has an official RedLock implementation for doing distributed locks, so why reinvent the wheel? Our locking implementation focuses on Scrapy Cluster's use case, which is rate limiting Request pops to the spiders. We do not need to lock the key when pushing new requests into the queue, only when we wish to read from it.
Locking the key outright would prevent all other processes from both pushing into and popping from the queue. The only guarantee we need to make is that for any particular queue, the pop() transaction is atomic. This means that only one process at a time can pop the queue, and no other process can pop that same item.
In Scrapy Cluster's use case, it means that when a Spider process says "Give me a Request", we can be certain that no other Spider process has received that exact request. To get a new request, the Throttled Queue tries to execute a series of operations against the lock before any other process can. If another process succeeds in performing the same series of operations first, that process receives the item and the former is denied. These steps are repeated for all processes trying to pop from the queue at any given time.
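The exactly-once guarantee described above can be modeled in plain Python, with threads standing in for spider processes. This is a simplified, hypothetical sketch (a local lock instead of Redis operations), meant only to show the property the throttle provides: every popped item goes to exactly one winner.

```python
import threading

# A toy store standing in for the Redis queue: pop() is made atomic
# with a lock, so only one "process" can win any given item.
class AtomicQueue:
    def __init__(self, items):
        self._items = list(items)
        self._lock = threading.Lock()

    def pop(self):
        # The whole read-and-remove transaction happens under the lock,
        # mirroring the atomic pop guarantee the throttle provides.
        with self._lock:
            if self._items:
                return self._items.pop(0)
            return None

queue = AtomicQueue('item-%d' % i for i in range(100))
received = []  # every item any worker was handed
received_lock = threading.Lock()

def worker():
    while True:
        item = queue.pop()
        if item is None:
            break
        with received_lock:
            received.append(item)

threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# No item was handed to two workers, and none were lost.
assert len(received) == 100
assert len(set(received)) == 100
```

In the real cluster the lock lives in Redis rather than in a single process, but the invariant being protected is the same.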
In practice, the Redlock algorithm has many safeguards in place to do true concurrent process locking on keys, but as explained above, Scrapy Cluster does not need all of those extra features. Because those features significantly slow down the pop() mechanism, the RedisThrottledQueue was born.
RedisThrottledQueue(redisConn, myQueue, throttleWindow, throttleLimit, moderate=False, windowName=None, modName=None, elastic=False, elastic_buffer=0)¶
- redisConn – The Redis connection
- myQueue – The RedisQueue class instance
- throttleWindow (int) – The time window to throttle pop requests (in seconds).
- throttleLimit (int) – The number of queue pops allowed in the time window
- moderate (bool) – Moderate queue pop requests so they are evenly distributed throughout the window
- windowName (str) – Use a custom rolling window Redis key name
- modName (str) – Use a custom moderation key name in Redis
- elastic (bool) – When moderated and falling behind, break moderation to catch up to the desired limit
- elastic_buffer (int) – The threshold number for how close we should get to the limit when using the elastic catch up
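To make the window, limit, and moderation semantics concrete, here is a minimal, self-contained sketch of the same idea: a rolling window allows at most `limit` pops per `window` seconds, and moderation spaces pops roughly `window / limit` seconds apart. The `ThrottleSketch` class and its injected clock are hypothetical stand-ins for illustration, not the scutils implementation.

```python
class ThrottleSketch:
    """Toy rolling-window throttle: at most `limit` grants per `window`
    seconds; with moderate=True, grants are spaced window/limit apart."""

    def __init__(self, window, limit, moderate=False):
        self.window = window
        self.limit = limit
        self.moderate = moderate
        self.interval = float(window) / limit  # moderation spacing
        self._hits = []   # timestamps of granted pops
        self._last = None # time of the last granted pop

    def allowed(self, now):
        # Drop hits that have rolled out of the window.
        self._hits = [t for t in self._hits if now - t < self.window]
        if len(self._hits) >= self.limit:
            return False  # window limit reached
        if self.moderate and self._last is not None:
            if now - self._last < self.interval:
                return False  # too soon after the previous pop
        self._hits.append(now)
        self._last = now
        return True

# Unmoderated: a 10 second window with limit 5 lets a burst of 5 through.
t = ThrottleSketch(window=10, limit=5)
burst = [t.allowed(now=0.1 * i) for i in range(6)]
print(burst)  # -> [True, True, True, True, True, False]

# Moderated: pops must be at least 10 / 5 = 2 seconds apart.
m = ThrottleSketch(window=10, limit=5, moderate=True)
print([m.allowed(now) for now in (0, 1, 2, 3, 4)])  # -> [True, False, True, False, True]
```

The real throttle stores its hit window and moderation key in Redis (see windowName and modName above) so that many processes share one budget, but the per-process decision follows this shape.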
push(*args)¶
Parameters: args – The arguments to pass through to the queue's push() method
pop(*args)¶
Parameters: args – Pop arguments to pass through to the queue's pop() method
Returns: None if no object can be found, otherwise returns the object.
clear()¶
Removes all data associated with the Throttle and Queue.
__len__()¶
Returns: The number of items in the Queue
Usage:
If you would like to throttle your Redis queue, you need to pass the queue in as part of the RedisThrottledQueue constructor, like so:
>>> import redis
>>> from scutils.redis_queue import RedisPriorityQueue
>>> from scutils.redis_throttled_queue import RedisThrottledQueue
>>> redis_conn = redis.Redis(host='scdev', port=6379)
>>> queue = RedisPriorityQueue(redis_conn, 'my_key')
>>> t = RedisThrottledQueue(redis_conn, queue, 10, 5)
>>> t.push('item', 5)
>>> t.push('item2', 10)
>>> t.pop()
'item2'
>>> t.pop()
'item'
The throttle merely acts as a wrapper around your queue, returning items only when allowed. You can use the same methods the original RedisQueue provides, like push(), pop(), and clear().
Due to the distributed nature of the throttled queue, when using the elastic=True argument the queue must successfully pop limit items before the elastic catch up will take effect.
The Redis Throttled Queue really shines when multiple processes are trying to pop from the queue. There is a small test script under utils/examples/example_rtq.py that allows you to tinker with all of the different settings the throttled queue provides. The script is shown below for convenience.
import sys

def main():
    import argparse
    import redis
    import time
    import random
    import sys
    from os import path
    sys.path.append(path.dirname(path.dirname(path.abspath(__file__))))
    from scutils.redis_queue import RedisPriorityQueue
    from scutils.redis_throttled_queue import RedisThrottledQueue

    parser = argparse.ArgumentParser(description="Throttled Queue Test Script."
                    " Start either a single or multiple processes to see the"
                    " throttled queue mechanism in action.")
    parser.add_argument('-r', '--redis-host', action='store', required=True,
                        help="The Redis host ip")
    parser.add_argument('-p', '--redis-port', action='store', default='6379',
                        help="The Redis port")
    parser.add_argument('-m', '--moderate', action='store_const', const=True,
                        default=False, help="Moderate the outbound Queue")
    parser.add_argument('-w', '--window', action='store', default=60,
                        help="The window time to test")
    parser.add_argument('-n', '--num-hits', action='store', default=10,
                        help="The number of pops allowed in the given window")
    parser.add_argument('-q', '--queue', action='store', default='testqueue',
                        help="The Redis queue name")
    parser.add_argument('-e', '--elastic', action='store_const', const=True,
                        default=False, help="Test variable elastic catch up"
                        " with moderation")

    args = vars(parser.parse_args())

    window = int(args['window'])
    num = int(args['num_hits'])
    host = args['redis_host']
    port = args['redis_port']
    mod = args['moderate']
    queue = args['queue']
    elastic = args['elastic']

    conn = redis.Redis(host=host, port=port)

    q = RedisPriorityQueue(conn, queue)
    t = RedisThrottledQueue(conn, q, window, num, mod, elastic=elastic)

    def push_items(amount):
        for i in range(0, amount):
            t.push('item-' + str(i), i)

    print("Adding", num * 2, "items for testing")
    push_items(num * 2)

    def read_items():
        print("Kill when satisfied ^C")
        ti = time.time()
        count = 0
        while True:
            item = t.pop()
            if item:
                print("My item", item, "My time:", time.time() - ti)
                count += 1
            if elastic:
                time.sleep(int(random.random() * (t.moderation * 3)))

    try:
        read_items()
    except KeyboardInterrupt:
        pass
    t.clear()
    print("Finished")

if __name__ == "__main__":
    sys.exit(main())
The majority of this script allows you to alter how the throttled queue is created, most importantly allowing you to change the window, hits, and moderation flag. If you spin up more than one process, you will find that any single 'item' popped from the queue is given to only one process. The latter portion of the script first pushes items into the queue (item-0 up through item-29 when run with -n 15) and then sits there and tries to pop() it.
Spinning up two instances with exactly the same settings will give you similar results to the following.
When spinning up multiple processes acting upon the same throttled queue, it is extremely important that they have the exact same settings! Otherwise your processes will impose different restrictions on the throttle lock, with undesired results.
Note that each process inserts exactly the same items into the priority queue.
$ python example_rtq.py -r scdev -w 30 -n 15 -m
Adding 30 items for testing
Kill when satisfied ^C
My item item-29 My time: 0.00285792350769
My item item-29 My time: 1.99865794182
My item item-27 My time: 6.05912590027
My item item-26 My time: 8.05791592598
My item item-23 My time: 14.0749168396
My item item-21 My time: 18.078263998
My item item-20 My time: 20.0878069401
My item item-19 My time: 22.0930709839
My item item-18 My time: 24.0957789421
My item item-14 My time: 36.1192228794
My item item-13 My time: 38.1225728989
My item item-11 My time: 42.1282589436
My item item-8 My time: 48.1387839317
My item item-5 My time: 54.1379349232
My item item-2 My time: 64.5046479702
My item item-1 My time: 66.508150816
My item item-0 My time: 68.5079059601
# this script was started slightly after process 1
$ python example_rtq.py -r scdev -w 30 -n 15 -m
Adding 30 items for testing
Kill when satisfied ^C
My item item-28 My time: 2.95087885857
My item item-25 My time: 9.01049685478
My item item-24 My time: 11.023993969
My item item-22 My time: 15.0343868732
My item item-17 My time: 28.9568138123
My item item-16 My time: 31.0645618439
My item item-15 My time: 33.4570579529
My item item-12 My time: 39.0780348778
My item item-10 My time: 43.0874598026
My item item-9 My time: 45.0917098522
My item item-7 My time: 49.0903818607
My item item-6 My time: 51.0908298492
My item item-4 My time: 59.0306549072
My item item-3 My time: 61.0654230118
Notice there is a slight drift due to the queue being moderated (most noticeable in process 1), meaning that the throttle only allows the queue to be popped after the moderation time has passed. In our case, a 30 second window divided by 15 hits means that the queue should be popped only after 2 seconds have passed.
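The moderation arithmetic above can be checked directly; `moderation_interval` is just an illustrative helper name, not part of scutils.

```python
def moderation_interval(window, limit):
    # Moderation spaces pops evenly: one pop per window/limit seconds.
    return float(window) / limit

# A 30 second window with 15 allowed hits means one pop every 2 seconds,
# which matches the roughly 2 second gaps in the moderated output above.
print(moderation_interval(30, 15))  # -> 2.0
```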
If we do not pass the -m moderation flag, your process output may look like the following.
$ python example_rtq.py -r scdev -w 10 -n 10
Adding 20 items for testing
Kill when satisfied ^C
My item item-19 My time: 0.00159978866577
My item item-18 My time: 0.0029239654541
My item item-17 My time: 0.00445079803467
My item item-16 My time: 0.00595998764038
My item item-15 My time: 0.00703096389771
My item item-14 My time: 0.00823283195496
My item item-13 My time: 0.00951099395752
My item item-12 My time: 0.0107297897339
My item item-11 My time: 0.0118489265442
My item item-10 My time: 0.0128898620605
My item item-13 My time: 10.0101749897
My item item-11 My time: 10.0123429298
My item item-10 My time: 10.0135369301
My item item-9 My time: 20.0031509399
My item item-8 My time: 20.0043399334
My item item-6 My time: 20.0072448254
My item item-5 My time: 20.0084438324
My item item-4 My time: 20.0097179413
$ python example_rtq.py -r scdev -w 10 -n 10
Adding 20 items for testing
Kill when satisfied ^C
My item item-19 My time: 9.12855100632
My item item-18 My time: 9.12996697426
My item item-17 My time: 9.13133692741
My item item-16 My time: 9.13272404671
My item item-15 My time: 9.13406801224
My item item-14 My time: 9.13519310951
My item item-12 My time: 9.13753604889
My item item-7 My time: 19.1323649883
My item item-3 My time: 19.1368720531
My item item-2 My time: 19.1381940842
My item item-1 My time: 19.1394021511
My item item-0 My time: 19.1405911446
Notice that when unmoderated, Process 1 pops all available items in about one hundredth of a second. By the time we switch terminals, Process 2 doesn't have any items to pop, so it re-adds the 20 items to the queue. In each following 10 second increment, you can see each process receiving items when it is able to successfully pop from the same Redis queue.
Feel free to mess with the arguments to example_rtq.py and figure out what kind of pop throttling works best for your use case.