queue

阅读: 23745     评论:0

当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ、ZeroMQ,炙手可热的Kafka,还有阿里巴巴自主开发的Notify、MetaQ、RocketMQ等。这些都是大型的重量级消息队列,通常应用于商业生产环境。但是,如果只是小型服务或者任务量不大,再或者学习、实验、测试等情况下,你有必要去搭建或者购买一个本身就已经很庞大的消息服务么?杀鸡焉用牛刀,当然不需要!

Python为我们内置了一个微型轻量级的消息队列模块,queue!queue模块主要用于多生产者和消费者模式下的队列实现,特别适合多线程时的消息交换。它实现了常见的锁语法,临时阻塞线程,防止竞争,这有赖于Python对线程的支持。

image.png-80.9kB

queue模块实现了三种队列:

FIFO:先进先出队列,类似管道。元素只能从队头方向一个一个的弹出,只能从队尾一个一个的放入。

image.png-101.2kB

LIFO:后进先出队列,也就是栈。元素永远只能在栈顶出入。

image.png-61.8kB

priority queue:优先级队列,每个元素都带有一个优先值,值越小的越早出去。值相同的,先进入队列的先出去。

image.png-278.6kB

queue模块定义了下面几个类和异常(一定要注意大小写!) :

class queue.Queue(maxsize=0):

FIFO队列构造器。maxsize是队列里最多能同时存在的元素个数。如果队列满了,则会暂时阻塞队列,直到有消费者取走元素。maxsize的值如果小于或等于零,表示队列元素个数不设上限,理论上可无穷个,但要小心,内存不是无限大的,这样可能会让你的内存溢出。

class queue.LifoQueue(maxsize=0)

LIFO队列构造器。maxsize是队列里最多能同时放置的元素个数。如果队列满了,则会暂时阻塞队列,直到有消费者取走元素。maxsize的值如果小于或等于零,表示队列元素个数不设上限,可无穷个。

class queue.PriorityQueue(maxsize=0)

优先级队列构造器。maxsize是队列里最多能同时放置的元素个数。如果队列满了,则会暂时阻塞队列,直到有消费者取走元素。maxsize的值如果小于或等于零,表示队列元素个数不设上限,可无穷个。通常在这类队列中,元素的优先顺序是按sorted(list(entries))[0]的结果来定义的,而元素的结构形式通常是(priority_number, data)类型的元组。

exception queue.Empty

从空的队列里请求元素的时候,弹出该异常。

exception queue.Full

往满的队列里放入元素的时候,弹出该异常。

Queue对象

三种队列类的对象都提供了以下通用的方法:

Queue.qsize()

返回当前队列内的元素的个数。注意,qsize()大于零不等于下一个get()方法一定不会被阻塞,qsize()小于maxsize也不表示下一个put()方法一定不会被阻塞。

Queue.empty()

队列为空则返回True,否则返回False。同样地,返回True不表示下一个put()方法一定不会被阻塞。返回False不表示下一个get()一定不会被阻塞。

Queue.full()

与empty()方法正好相反。同样不保证下一步的操作不被阻塞。

Queue.put(item, block=True, timeout=None)

item参数表示具体要放入队列的元素。block和timeout两个参数配合使用。其中,如果block=True,timeout=None,队列阻塞,直到有空槽出现;当block=True,timeout=正整数N,如果在等待了N秒后,队列还没有空槽,则弹出Full异常;如果block=False,则timeout参数被忽略,队列有空槽则立即放入,如果没空槽,则弹出Full异常。

Queue.put_nowait(item)

等同于put(item, False)

Queue.get(block=True, timeout=None)

从队列内删除并返回一个元素。如果block=True, timeout=None,队列会阻塞,直到有可供弹出的元素。如果timeout指定为一个正整数N,则在N秒内如果队列内没有可供弹出的元素,则抛出Empty异常。如果block=False,timeout参数会被忽略,此时队列内如果有元素则直接弹出,无元素可弹,则抛出Empty异常。

Queue.get_nowait()

等同于get(False).

下面的两个方法用于跟踪排队的任务是否被消费者守护线程完全处理。

Queue.task_done()

表明先前的队列任务已完成。由消费者线程使用。

Queue.join()

阻塞队列,直到队列内的所有元素被获取和处理。

当有元素进入队列时未完成任务的计数将增加。每当有消费者线程调用task_done()方法表示一个任务被完成时,未完成任务的计数将减少。当该计数变成0的时候,join()方法将不再阻塞。

实例展示

看一些使用的例子:

>>> import queue
>>> q = queue.Queue(5)
>>> q.put(1)
>>> q.put(2)
>>> q.put(3)
>>> q.get()
1
>>> q.get()
2
>>> q.get()
3
>>> q.get()  # 阻塞了
-------------------------------------
>>> q = queue.Queue(5)
>>> q.maxsize
5
>>> q.qsize()
0
>>> q.empty()
True
>>> q.full()
False
>>> q.put(123)
>>> q.put("abc")
>>> q.put(["1","2"])
>>> q.put({"name":"tom"})
>>> q.put(None)
>>> q.put("6")   # 阻塞了
-----------------------------------
>>> q = queue.LifoQueue()
>>> q.put(1)
>>> q.put(2)
>>> q.put(3)
>>> q.get()
3
>>> q.get()
2
>>> q.get()
1
-------------------------------------
>>> q = queue.PriorityQueue()
>>> q.put((3,"haha"))
>>> q.put((2,"heihei"))
>>> q.put((1,"hehe"))
>>> q.get()
(1, 'hehe')
>>> q.get()
(2, 'heihei')
>>> q
<queue.PriorityQueue object at 0x0000016825583470>
>>> q.put((4, "xixi"))
>>> q.get()
(3, 'haha')

下面是一个等待排队任务如何完成的例子:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import queue
import threading


def worker(i):
    while True:
        item = q.get()
        if item is None:
            print("线程%s发现了一个None,可以休息了^-^" % i)
            break
        # do_work(item)做具体的工作
        time.sleep(0.5)
        print("线程%s将任务<%s>完成了!" % (i, item))
        # 做完后发出任务完成信号,然后继续下一个任务
        q.task_done()


if __name__ == '__main__':
    num_of_threads = 5

    source = [i for i in range(1, 21)]  # 模拟20个任务

    # 创建一个FIFO队列对象,不设置上限
    q = queue.Queue()
    # 创建一个线程池
    threads = []
    # 创建指定个数的工作线程,并讲他们放到线程池threads中
    for i in range(1, num_of_threads+1):
        t = threading.Thread(target=worker, args=(i,))
        threads.append(t)
        t.start()

    # 将任务源里的任务逐个放入队列
    for item in source:
        time.sleep(0.5)     # 每隔0.5秒发布一个新任务
        q.put(item)

    # 阻塞队列直到队列里的任务都完成了
    q.join()
    print("-----工作都完成了-----")
    # 停止工作线程
    for i in range(num_of_threads):
        q.put(None)
    for t in threads:
        t.join()
    print(threads)

注意,每次运行的结果可能都不一样:

线程1将任务<1>完成了!
线程3将任务<2>完成了!
线程3将任务<3>完成了!
线程5将任务<4>完成了!
线程5将任务<5>完成了!
线程4将任务<6>完成了!
线程3将任务<7>完成了!
线程3将任务<8>完成了!
线程5将任务<9>完成了!
线程1将任务<10>完成了!
线程1将任务<11>完成了!
线程1将任务<12>完成了!
线程3将任务<13>完成了!
线程5将任务<14>完成了!
线程1将任务<15>完成了!
线程1将任务<16>完成了!
线程1将任务<17>完成了!
线程3将任务<18>完成了!
线程5将任务<19>完成了!
线程5将任务<20>完成了!
-----工作都完成了-----
线程2发现了一个None,可以休息了^-^
线程1发现了一个None,可以休息了^-^
线程3发现了一个None,可以休息了^-^
线程5发现了一个None,可以休息了^-^
线程4发现了一个None,可以休息了^-^
[<Thread(Thread-1, stopped 7028)>, <Thread(Thread-2, stopped 6800)>, <Thread(Thread-3, stopped 5620)>, <Thread(Thread-4, stopped 6508)>, <Thread(Thread-5, stopped 4344)>]

PS:

Python提供了很多关于队列的类,其中:

Class multiprocessing.Queue是用于多进程的队列类(不要和多线程搞混了)

collections.deque则是一种可选择的队列替代方案,它提供了快速的原子级别的append()popleft()方法,但是不提供锁的能力。


 hashlib fileinput 

评论总数: 0


点击登录后方可评论

聚圣源宋词起女孩姓名猴王传种菜骷髅的异域开荒女孩春天出生起名子给女宝宝起名姓王vray渲染器下载潘朝晖英文名是怎么起的双胎男宝起名大全石竹花花语孝义新闻黄起名字女孩父母的爱作文艾灸馆起名字大全终焉誓约新出生的男孩起名字最强国防生大腹便便的意思钟姓新生儿起名cctv4直播游戏名字起名字寓意很好的成语可起名字狗年起名2018鼠宝宝起啥名好qq等级手表店起名免费起名程序排行榜扈华国最强大脑播出时间李字起名女娃淀粉肠小王子日销售额涨超10倍罗斯否认插足凯特王妃婚姻让美丽中国“从细节出发”清明节放假3天调休1天男孩疑遭霸凌 家长讨说法被踢出群国产伟哥去年销售近13亿网友建议重庆地铁不准乘客携带菜筐雅江山火三名扑火人员牺牲系谣言代拍被何赛飞拿着魔杖追着打月嫂回应掌掴婴儿是在赶虫子山西高速一大巴发生事故 已致13死高中生被打伤下体休学 邯郸通报李梦为奥运任务婉拒WNBA邀请19岁小伙救下5人后溺亡 多方发声王树国3次鞠躬告别西交大师生单亲妈妈陷入热恋 14岁儿子报警315晚会后胖东来又人满为患了倪萍分享减重40斤方法王楚钦登顶三项第一今日春分两大学生合买彩票中奖一人不认账张家界的山上“长”满了韩国人?周杰伦一审败诉网易房客欠租失踪 房东直发愁男子持台球杆殴打2名女店员被抓男子被猫抓伤后确诊“猫抓病”“重生之我在北大当嫡校长”槽头肉企业被曝光前生意红火男孩8年未见母亲被告知被遗忘恒大被罚41.75亿到底怎么缴网友洛杉矶偶遇贾玲杨倩无缘巴黎奥运张立群任西安交通大学校长黑马情侣提车了西双版纳热带植物园回应蜉蝣大爆发妈妈回应孩子在校撞护栏坠楼考生莫言也上北大硕士复试名单了韩国首次吊销离岗医生执照奥巴马现身唐宁街 黑色着装引猜测沈阳一轿车冲入人行道致3死2伤阿根廷将发行1万与2万面值的纸币外国人感慨凌晨的中国很安全男子被流浪猫绊倒 投喂者赔24万手机成瘾是影响睡眠质量重要因素春分“立蛋”成功率更高?胖东来员工每周单休无小长假“开封王婆”爆火:促成四五十对专家建议不必谈骨泥色变浙江一高校内汽车冲撞行人 多人受伤许家印被限制高消费

聚圣源 XML地图 TXT地图 虚拟主机 SEO 网站制作 网站优化