Python 中的枚举类型

枚举类型可以看作是一种标签或是一系列常量的集合,通常用于表示某些特定的有限集合,例如星期、月份、状态等。Python 的原生类型(Built-in types)里并没有专门的枚举类型,但是我们可以通过很多方法来实现它,例如字典、类等:

WEEKDAY = {
    'MON': 1,
    'TUS': 2,
    'WEN': 3,
    'THU': 4,
    'FRI': 5
}
class Color:
    RED   = 0
    GREEN = 1
    BLUE  = 2

上面两种方法可以看做是简单的枚举类型的实现,如果只在局部范围内用到了这样的枚举变量是没有问题的,但问题在于它们都是可变的(mutable),也就是说可以在其它地方被修改从而影响其正常使用:

WEEKDAY['MON'] = WEEKDAY['FRI']
print(WEEKDAY)

{‘FRI’: 5, ‘TUS’: 2, ‘MON’: 5, ‘WEN’: 3, ‘THU’: 4}

通过类定义的枚举甚至可以实例化,变得不伦不类:

c = Color()
print(c.RED)
Color.RED = 2
print(c.RED)

0
2

当然也可以使用不可变类型(immutable),例如元组,但是这样就失去了枚举类型的本意,将标签退化为无意义的变量:

COLOR = ('R', 'G', 'B')
print(COLOR[0], COLOR[1], COLOR[2])

R G B

为了提供更好的解决方案,Python 通过PEP 435 在 3.4 版本中添加了 enum标准库,3.4 之前的版本也可以通过 pip install enum 下载兼容支持的库。enum 提供了 Enum,IntEnum,unique三个工具,用法也非常简单,可以通过继承 Enum/IntEnum 定义枚举类型,其中 IntEnum 限定枚举成员必须为(或可以转化为)整数类型,而 unique 方法可以作为修饰器限定枚举成员的值不可重复:

from enum import Enum, IntEnum, unique
 
try:
    @unique
    class WEEKDAY(Enum):
        MON = 1
        TUS = 2
        WEN = 3
        THU = 4
        FRI = 1
except ValueError as e:
    print(e)

duplicate values found in : FRI -> MON

try:
    class Color(IntEnum):
        RED   = 0
        GREEN = 1
        BLUE  = 'b'
except ValueError as e:
    print(e)

invalid literal for int() with base 10: ‘b’

更有趣的是 Enum的成员均为单例(Singleton),并且不可实例化,不可更改:

class Color(Enum):
    R = 0
    G = 1
    B = 2

try:
    Color.R = 2
except AttributeError as e:
    print(e)

Cannot reassign members.

虽然不可实例化,但可以将枚举成员赋值给变量:

red = Color(0)
green = Color(1)
blue = Color(2)
print(red, green, blue)

Color.R Color.G Color.B

也可以进行比较判断:

print(red is Color.R)
print(red == Color.R)
print(red is blue)
print(green != Color.B)
print(red == 0) # 不等于任何非本枚举类的值

True
True
False
True
False

最后一点,由于枚举成员本身也是枚举类型,因此也可以通过枚举成员找到其它成员:

print(red.B)
print(red.B.G.R)

Color.B
Color.R

但是要谨慎使用这一特性,因为可能与成员原有的命名空间中的名称相冲突:

print(red.name, ':', red.value)
 
class Attr(Enum):
    name  = 'NAME'
    value = 'VALUE'
print(Attr.name.value, Attr.value.name)

R : 0
NAME value

总结
enum 模块的用法很简单,功能也很明确,但是其实现方式却非常值得学习。如果你想更深入了解更多 Python 中关于 ClassMetaclass的黑魔法,又不知道如何入手,那么不妨阅读一下 enum源码,或者关注接下来后面几篇的内容!

Python元类

Python元类

从某种意义上讲,元类只是扩展了装饰器的代码插入模式。
元类主要是针对那些构建API和工具供他人使用的程序员。

Python构建工具:

  1. 内省属性。如__class__、__dict__
  2. 运算符重载方法。如__str__、__add__
  3. 属性拦截方法。如__getattr__、__setattr__、__getattribute__。
  4. 类特性。内置函数property。拦截特定的属性。
  5. 类属性描述符。拦截特定的属性。特定只是定义根据访问自动运行函数的属性描述符的一种简洁方式。
  6. 函数和类装饰器。
  7. 元类。

元类允许我们在在一条class语句的末尾,插入当创建一个类对象的时候自动运行的逻辑。
这个逻辑不会把类名重新绑定到一个装饰器可调用对象,而是把类自身的创建指向特定的逻辑。

和类装饰器不同,它通常是添加实例创建时运行的逻辑,元类在类创建时运行。
同样的,它们都是通常用来管理或扩展类的钩子,而不是管理其实例。

通过声明一个元类,我们告诉Python把类对象的创建路由到我们所提供的另一个类:

由于创建类的时候,Python在class语句的末尾自动调用元类,因此它可以根据需要扩展、注册或管理类。

类也是某物的实例:

  • 在Python3.0中,用户定义的类对象是名为type的对象的实例,type本身是一个类。
  • 在Python2.6中,新式类继承自object,它是type的一个子类;传统类是type的一个实例,并且并不创建自一个类。
    实例创建自类,而类创建自type。
    类是类型,类型也是类
  • 类型由派生自type的类定义。
  • 用户定义的类是类型类的实例。
  • 用户定义的类是产生它们自己的实例的类型。

类根本不是一个独立的概念:它们就是用户定义的类型,并且type自身也是由一个类定义的。

由于类实际上是type类的实例,从type的定制的子类创建类允许我们实现各种定制的类。
在Python3.0中以及在Python2.6的新式类中:

  • type是产生用户定义的类的一个类。
  • 元类是type类的一个子类。
  • 类对象是type类的一个实例,或一个子类。
  • 实例对象产生自一个类。

换句话说,为了控制创建类以及扩展其行为的方式,我们所需要做的只是指定个用户定义的类创建自一个用户定义的元类,而不是常规的type类。

从技术上讲,Python遵从一个标准的协议来使这发生:在一条class语句的末尾,并且在运行了一个命名控件词典中的所有嵌套代码之后,它调用type对象来创建class对象:

class = type(classname, superclasses,attrbuteddict)

type对象反过来定义了一个__call__运算符重载方法,当调用type对象的时候,该方法运行两个其他方法:

type.__new__(typeclass, classname,superclasses,attributeddict)
type.__init__(class,classname,superclasses,attributedict)

__new__方法创建并返回了新的class对象,并且随后__init__方法初始化了新创建的对象。
这是type的元类子类通常用来定制类的钩子。

尽管重新定义type超类的__new____init__方法是元类向类对象创建过程插入逻辑的最常见方法,其他方案也是可能的。
实际上任何可调用对象都可以用作一个元类,只要它接收传递的参数并且返回与目标类兼容的一个对象。

另外,我们也可以重定义元类的__call__, 以拦截创建调用。

making class
In SuperMeta.call:
…Spam
…(<class ‘__main__.Eggs’>,)
…{‘meth’: , ‘data’: 1, ‘__new__’: , ‘__module__’: ‘__main__’, ‘__qualname__’: ‘Spam’, ‘__init__’: }
In SubMeta.new:
…Spam
…(<class ‘__main__.Eggs’>,)
…{‘meth’: , ‘data’: 1, ‘__new__’: , ‘__module__’: ‘__main__’, ‘__qualname__’: ‘Spam’, ‘__init__’: }
In SubMeta init:
…Spam
…(<class ‘__main__.Eggs’>,)
…{‘meth’: , ‘data’: 1, ‘__new__’: , ‘__module__’: ‘__main__’, ‘__qualname__’: ‘Spam’, ‘__init__’: }
…init class object: [‘meth’, ‘data’, ‘__new__’, ‘__doc__’, ‘__module__’, ‘__init__’]
making instance
data: 1

调用顺序是:
1. 调用元类的__call__。只调用一次。
2. 调用元类的__new__。创建一个type实例,只调用一次。
3. 调用元类的__init__。初始化type实例,只调用一次。
4. 调用自定义类的__new__。调用多次,每创建一个自定义实例,调用一次。
5. 调用自定义类的__init__。调用多次,每创建一个自定义实例,调用一次。

实例与继承的关系

  • 元类继承自type类。
  • 元类声明由子类继承。在用户定义的类中,metaclass=M声明由该类的子类继承,因此,对于在超类链中继承了这一声明的每个类的构建,该元类都将运行。
  • 元类属性没有由类实例继承。元类声明指定了一个实例关系,它和继承不同。由于类是元类的实例,所以元类中定义的行为应用于类,而不是类随后的实例。实例从它们的类和超类获取行为,但是,不是从任何元类获取行为。从技术上讲,实例属性查找通过只是搜索实例及期所有类的__dict__字典;元类不包含在实例查找中。

元类与类装饰器在功能上有重合。

  • 在class语句末尾,类装饰器把类名重新绑定到一个函数的结果。
  • 元类通过在一条class语句的末尾把类对象创建过程路由到一个对象来工作。

学习元类可参考Enum源码

  1. _EnumDict通过继承dict,并重写__setitem__,来实现枚举的name不重复。
  2. 通过重写元类EnumMeta的__delattr____setattr__来限制删除和修改枚举成员。

python之logging模块

python之logging模块

介绍

logging模块是用来做日志记录的。
logging.config用来配置。
logging.handlers用来处理日志,可以是屏幕输出,可以是写文件(文件又可以自动切割文件),可以是发邮件。

最简单的用法是:

import logging

logging.debug('This is debug message')
logging.info('This is info message')
logging.warning('This is warning message')
 
屏幕上打印:
WARNING:root:This is warning message

logging.basicConfig(**kwargs)

以默认格式创建一个StreamHandler,然后把它加到root logger,函数参数可以做些配置。
如果root logger没有定义handlers,则当函数debug(),info(), warning(), error() and critical()调用时会自动调用basicConfig()。

如果root logger已经定义好了handlers,则这个函数什么也不干。

import logging

logging.basicConfig(level=logging.DEBUG,
                format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                datefmt='%a, %d %b %Y %H:%M:%S',
                filename='myapp.log',
                filemode='w')
    
logging.debug('This is debug message')
logging.info('This is info message')
logging.warning('This is warning message')
 
./myapp.log文件中内容为:
Sun, 24 May 2009 21:48:54 demo2.py[line:11] DEBUG This is debug message
Sun, 24 May 2009 21:48:54 demo2.py[line:12] INFO This is info message
Sun, 24 May 2009 21:48:54 demo2.py[line:13] WARNING This is warning message

logging.basicConfig函数各参数:
filename: 指定日志文件名
filemode: 和file函数意义相同,指定日志文件的打开模式,’w’或’a’
format: 指定输出的格式和内容,format可以输出很多有用信息,如上例所示:
%(levelno)s: 打印日志级别的数值
%(levelname)s: 打印日志级别名称
%(pathname)s: 打印当前执行程序的路径,其实就是sys.argv[0]
%(filename)s: 打印当前执行程序名
%(funcName)s: 打印日志的当前函数
%(lineno)d: 打印日志的当前行号
%(asctime)s: 打印日志的时间
%(thread)d: 打印线程ID
%(threadName)s: 打印线程名称
%(process)d: 打印进程ID
%(message)s: 打印日志信息
datefmt: 指定时间格式,同time.strftime()
level: 设置日志级别,默认为logging.WARNING
stream: 指定将日志的输出流,可以指定输出到sys.stderr,sys.stdout或者文件,默认输出到sys.stderr,当stream和filename同时指定时,stream被忽略

logging.handlers

handlers控制日志的落地。有下面这些方式。

logging.StreamHandler: 日志输出到流,可以是sys.stderr、sys.stdout或者文件
logging.FileHandler: 日志输出到文件
日志回滚方式,实际使用时用RotatingFileHandler和TimedRotatingFileHandler
logging.handlers.BaseRotatingHandler
logging.handlers.RotatingFileHandler
logging.handlers.TimedRotatingFileHandler
logging.handlers.SocketHandler: 远程输出日志到TCP/IP sockets
logging.handlers.DatagramHandler: 远程输出日志到UDP sockets
logging.handlers.SMTPHandler: 远程输出日志到邮件地址
logging.handlers.SysLogHandler: 日志输出到syslog
logging.handlers.NTEventLogHandler: 远程输出日志到Windows NT/2000/XP的事件日志
logging.handlers.MemoryHandler: 日志输出到内存中的制定buffer
logging.handlers.HTTPHandler: 通过”GET”或”POST”远程输出到HTTP服务器

将日志同时输同到文件和屏幕

import logging

logging.basicConfig(level=logging.DEBUG,
                format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                datefmt='%a, %d %b %Y %H:%M:%S',
                filename='myapp.log',
                filemode='w')

#################################################################################################
#定义一个StreamHandler,将INFO级别或更高的日志信息打印到标准错误,并将其添加到当前的日志处理对象#
console = logging.StreamHandler()
console.setLevel(logging.INFO)
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
#################################################################################################

logging.debug('This is debug message')
logging.info('This is info message')
logging.warning('This is warning message')
 
屏幕上打印:
root        : INFO     This is info message
root        : WARNING  This is warning message
./myapp.log文件中内容为:
Sun, 24 May 2009 21:48:54 demo2.py[line:11] DEBUG This is debug message
Sun, 24 May 2009 21:48:54 demo2.py[line:12] INFO This is info message
Sun, 24 May 2009 21:48:54 demo2.py[line:13] WARNING This is warning message

自动切割日志

import logging
from logging.handlers import RotatingFileHandler

#################################################################################################
#定义一个RotatingFileHandler,最多备份5个日志文件,每个日志文件最大10M
Rthandler = RotatingFileHandler('myapp.log', maxBytes=10*1024*1024,backupCount=5)
Rthandler.setLevel(logging.INFO)
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
Rthandler.setFormatter(formatter)
logging.getLogger('').addHandler(Rthandler)
#######################################################################################

这个用的比较多。留下日志,1是可以跟踪性能信息,也可以跟踪出错信息。

通过文件配置日志

配置文件

#logger.conf
###############################################
[loggers]
keys=root,test01,test02
[logger_root]
level=DEBUG
handlers=hand01,hand02
[logger_test01]
handlers=hand01,hand02
qualname=test01
propagate=0
[logger_test02]
handlers=hand01,hand03
qualname=test02
propagate=0
###############################################
[handlers]
keys=hand01,hand02,hand03
[handler_hand01]
class=StreamHandler
level=INFO
formatter=form02
args=(sys.stderr,)
[handler_hand02]
class=FileHandler
level=DEBUG
formatter=form01
args=('myapp.log', 'a')
[handler_hand03]
class=handlers.RotatingFileHandler
level=INFO
formatter=form02
args=('myapp.log', 'a', 10*1024*1024, 5)
###############################################
[formatters]
keys=form01,form02
[formatter_form01]
format=%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s
datefmt=%a, %d %b %Y %H:%M:%S
[formatter_form02]
format=%(name)-12s: %(levelname)-8s %(message)s
datefmt=

代码使用配置文件


import logging
import logging.config

logging.config.fileConfig("logger.conf")
logger = logging.getLogger("test01")

logger.debug('This is debug message')
logger.info('This is info message')
logger.warning('This is warning message')

blog_mini bug修复记录1

blog_mini bug修复记录1

发现有篇文章只显示了一半。编辑之后再发布,还是只有一半。另外发布的时候感觉比较慢。

ab测试结果

ab -n 100 -c 10 http://blog.go2live.cn/
This is ApacheBench, Version 2.3 <$Revision: 1430300 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking blog.go2live.cn (be patient).....done


Server Software:        nginx/1.10.2
Server Hostname:        blog.go2live.cn
Server Port:            80

Document Path:          /
Document Length:        24039 bytes

Concurrency Level:      10
Time taken for tests:   14.268 seconds
Complete requests:      100
Failed requests:        0
Write errors:           0
Total transferred:      2435000 bytes
HTML transferred:       2403900 bytes
Requests per second:    7.01 [#/sec] (mean)
Time per request:       1426.779 [ms] (mean)
Time per request:       142.678 [ms] (mean, across all concurrent requests)
Transfer rate:          166.66 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.1      0       1
Processing:   141 1350 261.7   1420    1497
Waiting:      140 1350 261.7   1420    1497
Total:        141 1350 261.6   1420    1497

Percentage of the requests served within a certain time (ms)
  50%   1420
  66%   1436
  75%   1455
  80%   1460
  90%   1473
  95%   1489
  98%   1492
  99%   1497
 100%   1497 (longest request)

应该说效率比较低。代码还要走读一直,看是直接增加memcache还是可以用插件的形式。

先修复内容只有一半的问题。
1.从相关的models,forms,views都没有看出问题。
2.通过抓包发现提交上去的内容并没有变少。
3.从数据库内容看,确实内容被截断了。

光看代码没有看出来。只有debug跟踪一下了。在python中debug可以用logging模块。

编辑config.py增加文件日志

  Rthandler = RotatingFileHandler(os.path.join(basedir,'../log/myapp.log'), maxBytes=100*1024*1024,backupCount=5)
  Rthandler.setLevel(logging.INFO)
  formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
  Rthandler.setFormatter(formatter)
  app.logger.addHandler(Rthandler)

最后发现居然是数据库的 Text字段不够长。。手动把结构改成了LongText。
找不半天没有发现flask-sqlalchemy的字段里支持LongText。

python多线程相关概念及解释

python多线程相关概念及解释

简介

多线程是为了抢占资源而设计的。同样作用的有多进程,不过因为线程的创建和切换代价远比进程低,一般是选择线程来执行多任务。因为线程会共享进程内的资源,为了处理多个线程处理相同的资源,又多出了锁的概念。
而本身线程的创建其实也是有代价的,为了避免反复的创建和销毁,又有了线程池的概念。

线程的基本使用

直接调用threading.Thread类

#coding=utf-8
import threading
from time import sleep,ctime

def aLongTimeWorkWithIO():
    sleep(5)
    print("aLongTimeWorkWithIO  work  in thread:%s,%s" % (threading.current_thread().name, ctime()))

if __name__ == '__main__':
    print("start thread:", threading.current_thread().name, ctime())
    t = threading.Thread(target=aLongTimeWorkWithIO, args=())
    t.start()
    print("end thread:", threading.current_thread().name, ctime() )

输入结果:

start thread: MainThread Mon Feb 6 22:50:04 2017
end thread: MainThread Mon Feb 6 22:50:04 2017
aLongTimeWorkWithIO work in thread:Thread-1,Mon Feb 6 22:50:09 2017

最基本的代码执行是按顺序,一条一条执行下去的。只有一条路。
线程花费的CPU时间是占满主线程的。

而多线程的引入其实就是增加了代码执行路径。进程花费的CPU时间是在主线程和新线程之间交叉进行的。

继承threading.Thread类

#coding=utf-8
import threading
from time import sleep,ctime
class MyThread(threading.Thread):
    def run(self):
        sleep(5)
        print("aLongTimeWorkWithIO  work  in thread:%s,%s" % (threading.current_thread().name, ctime()))

if __name__ == '__main__':
    print("start thread:", threading.current_thread().name, ctime())
    t = MyThread()
    t.start()
    print("end thread:", threading.current_thread().name, ctime() )

重载下run方法即可。

线程交互

线程合并join

join(timeout)方法将会等待直到线程结束。这将阻塞正在调用的线程,直到被调用join()方法的线程结束。
如果线程是独立变化的挺好。譬如需要下载10张图片。因为这10张图片没有依赖关系。所以可以同时启动10个线程去下载就好了。
但有时候,线程之间是有依赖关系的,如父线程必须得等子线程完成 拿到子线程的运行结果才可以继续作业。

import threading
import time

def target():
    print('the curent threading  %s is running' % threading.current_thread().name)
    time.sleep(1)
    print('the curent threading  %s is ended' % threading.current_thread().name)

print('the curent threading  %s is running' % threading.current_thread().name)
t = threading.Thread(target=target)

#t.setDaemon(True) #1
t.start()
#t.join() #2
print('the curent threading  %s is ended' % threading.current_thread().name)

代码输出:

the curent threading MainThread is running
the curent threading Thread-1 is running
the curent threading MainThread is ended
the curent threading Thread-1 is ended

你可能会觉得奇怪。为啥主线程退出,子线程还会运行。这其实是python的主线程干的好事,他会在即将退出时检查所有的非daemon且alive的线程,一个一个调用join方法。

所以要使主线程退出,子线程也退出,只需要把子线程设置为daemon,如上面代码中的#1,打开注释就可。
而join就是等待该线程执行完成。如上面的#2所示。同时打开#1和#2和都不打开的输出内容差不多,只是顺序不同。

the curent threading MainThread is running
the curent threading Thread-1 is running
the curent threading Thread-1 is ended
the curent threading MainThread is ended

条件变量

条件变量也是处理线程间协作的一种机制,详情见下文。

Event对象

线程可以读取共享的内存,通过内存做一些数据处理。这就是线程通信的一种,python还提供了更加高级的线程通信接口。Event对象可以用来进行线程通信,调用event对象的wait方法,线程则会阻塞等待,直到别的线程set之后,才会被唤醒。

#coding: utf-8
import  threading
import  time


class MyThread(threading.Thread):
    def __init__(self, event):
        super(MyThread, self).__init__()
        self.event = event

    def run(self):
        print("thread {} is ready ".format(self.name))
        self.event.wait()
        print("thread {} run".format(self.name))

signal = threading.Event()

def main():
    start = time.time()
    for i in range(3):
        t = MyThread(signal)
        t.start()
    time.sleep(3)
    print( "after {}s".format(time.time() - start))
    signal.set()




if __name__ == '__main__':
    main()

输出结果:

thread Thread-1 is ready
thread Thread-2 is ready
thread Thread-3 is ready
after 3.00528883934021s
thread Thread-1 run
thread Thread-3 run
thread Thread-2 run

屏障barrier

屏障(barrier)是用户协调多个线程并行工作的同步机制。屏障允许每个线程等待,直到所有的合作线程都到达某一点,然后从该点继续执行。
屏障允许任意数量的线程等待,直到所有的线程完成处理工作,而线程不需要退出。所有线程达到屏障后可以接着工作。

感觉在分治法中有用。分成的子任务都完成了才能合并。所以都需要等待其它子任务。

下面的代码t1和t2都在wait,直到t5也wait,达到了Barrier的值3,才继续运行。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
from threading import Barrier, Lock
from time import time,sleep
from datetime import datetime

def test_with_barrier(synchronizer, serializer):
    name = threading.current_thread().name
    synchronizer.wait()
    now = time()
    with serializer:
        print("thread %s -----> %s" % (name, datetime.fromtimestamp(now)))

def test_without_barrier():
    name = threading.current_thread().name
    now = time()
    print("thread %s -----> %s" % (name, datetime.fromtimestamp(now)))

def worker(dictionary, key, item):
    dictionary[key] = item
    print(key, item)

if __name__ == '__main__':
    synchronizer = Barrier(3)
    serializer = Lock()
    threads = []
    t1 = threading.Thread(target=test_with_barrier,args=(synchronizer, serializer))
    t2 = threading.Thread(target=test_with_barrier,args=(synchronizer, serializer))
    t3 =  threading.Thread(target=test_without_barrier)
    t4 = threading.Thread(target=test_without_barrier)

    t5 = threading.Thread(target=test_with_barrier,args=(synchronizer, serializer))
    threads.append(t1)
    threads.append(t2)
    threads.append(t3)
    threads.append(t4)

    for t in threads:
        t.start()


    sleep(5)
    t5.start()

输出结果:

thread Thread-3 —–> 2017-02-07 11:59:46.336922
thread Thread-4 —–> 2017-02-07 11:59:46.339094
thread Thread-5 —–> 2017-02-07 11:59:51.341117
thread Thread-1 —–> 2017-02-07 11:59:51.341264
thread Thread-2 —–> 2017-02-07 11:59:51.341474

避免共享资源冲突

锁的使用

互斥锁

多个线程可能会用到同一个资源,如果在使用这个资源的时候,不是原子操作。则很可能会产生不可预知的错误。例如线程A使用全局变量i,先读取值(i=1),然后再设置i(i=i+1)。线程B同样是先读取值(i=1),然后再设置i(i=i+1)。
因为[先读取值(i=1),然后再设置i(i=i+1)]并不是一个原子操作,这里面有时间窗口,那么就会有可能引起错误。如在线程A获取到i=1时,这个时候CPU切换到了线程B。
线程B执行的任务是把i设置为2。CPU切换为线程A,线程设置值为2。这里就有问题了,其实最初设计的时候期望值是3(期望线程A和B都能增加1)

为了模拟获取值和设置值之间的时间窗口。特意搞了个sleep。代码如下:

#coding: utf-8
import  threading
import  time

counter = 0
counter_lock = threading.Lock()
class  MyThread(threading.Thread):#使用类定义thread,继承threading.Thread
    def  __init__(self,name):
        threading.Thread.__init__(self)
        self.name = "Thread-" + str(name)

    def run(self):  #run函数必须实现
        global counter,counter_lock #多线程是共享资源的,使用全局变量
       # if counter_lock.acquire(): #当需要独占counter资源时,必须先锁定,这个锁可以是任意的一个锁,可以使用上边定义的3个锁中的任意一个
        tmp = counter
        time.sleep(1)
        counter = tmp+1
        print("I am %s, set counter:%s"  % (self.name,counter))
       #     counter_lock.release() #使用完counter资源必须要将这个锁打开,让其他线程使用


if  __name__ ==  "__main__":
    for i in range(1,11):
        my_thread = MyThread(i)
        my_thread.start()

输出结果:

I am Thread-1, set counter:1
I am Thread-8, set counter:1
I am Thread-4, set counter:1
I am Thread-6, set counter:1
I am Thread-7, set counter:1
I am Thread-2, set counter:1
I am Thread-9, set counter:1
I am Thread-5, set counter:1
I am Thread-3, set counter:1
I am Thread-10, set counter:1

锁的引入就是为了解决这个问题,它把非原子操作变成了原子操作。
在线程没有获取到锁的时候只能等待,使用完共享资源再释放锁,以供其它线程使用。
这里有2个注意点:
1. 是锁的使用必须是一致的。也就是对同样的共享资源,必须使用相同的锁机制。只要有一处没有使用,锁就没意义了。
2. 需要避免死锁。获取锁的顺序是和释放锁的顺序相反的。获取锁A,获取锁B,释放锁B,释放锁A。并且其它地方也必须按同样的顺序获取和释放锁。不这样的话很容易造成死锁。

下面的代码就是锁的简单使用,解决了上面代码中的问题。

#coding: utf-8
import  threading
import  time

counter = 0
counter_lock = threading.Lock()
class  MyThread(threading.Thread):#使用类定义thread,继承threading.Thread
    def  __init__(self,name):
        threading.Thread.__init__(self)
        self.name = "Thread-" + str(name)

    def run(self):  #run函数必须实现
        global counter,counter_lock #多线程是共享资源的,使用全局变量
        if counter_lock.acquire(): #当需要独占counter资源时,必须先锁定,这个锁可以是任意的一个锁,可以使用上边定义的3个锁中的任意一个
            tmp = counter
            time.sleep(1)
            counter = tmp+1
            print("I am %s, set counter:%s"  % (self.name,counter))
            counter_lock.release() #使用完counter资源必须要将这个锁打开,让其他线程使用


if  __name__ ==  "__main__":
    for i in range(1,11):
        my_thread = MyThread(i)
        my_thread.start()

输出结果如下:

I am Thread-1, set counter:1
I am Thread-2, set counter:2
I am Thread-3, set counter:3
I am Thread-4, set counter:4
I am Thread-5, set counter:5
I am Thread-6, set counter:6
I am Thread-7, set counter:7
I am Thread-8, set counter:8
I am Thread-9, set counter:9
I am Thread-10, set counter:10

需要说明的是,锁解决了共享资源冲突问题,同时他又影响了线程的并发度。降低了效率。
合理的线程设计很重要,最好是没有共享资源的使用,如全局变量,共同文件等。

可重入锁

为了支持在同一线程中多次请求同一资源,python提供了可重入锁(RLock)。RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。

#coding: utf-8
import  threading
import  time


mutex = threading.RLock()

class MyThread(threading.Thread):

    def run(self):
        if mutex.acquire(1):
            print("thread {} get mutex".format(self.name))
            time.sleep(1)
            mutex.acquire()
            mutex.release()
            mutex.release()



def main():
    print("Start main threading")
    for i in range(2):
        MyThread().start()

    print("End Main threading")


if __name__ == '__main__':
    main()

输出结果:

Start main threading
thread Thread-1 get mutex
End Main threading
thread Thread-2 get mutex

条件变量

有了锁,为什么还要有条件变量

条件变量(cond)是在多线程程序中用来实现”等待–》唤醒”逻辑常用的方法。条件变量利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待”条件变量的条件成立”而挂起;另一个线程使“条件成立”。为了防止竞争,条件变量的使用总是和一个互斥锁结合在一起。线程在改变条件状态前必须首先锁住互斥量,函数pthread_cond_wait把自己放到等待条件的线程列表上,然后对互斥锁解锁(这两个操作是原子操作)。在函数返回时,互斥量再次被锁住。

锁的引入已经解决了线程的同步问题,为什么还要用到条件变量呢?
首先,举个例子:在应用程序中有连个线程thread1,thread2,thread3和thread4,有一个int类型的全局变量iCount。iCount初始化为0,thread1和thread2的功能是对iCount的加1,thread3的功能是对iCount的值减1,而thread4的功能是当iCount的值大于等于100时,打印提示信息并重置iCount=0。
如果使用互斥量,线程代码大概应是下面的样子:

       thread1/2:
       while (1)
       {
             pthread_mutex_lock(&mutex);
             iCount++;
             pthread_mutex_unlock(&mutex);
       }
       thread4:
       while(1)
       {
             pthead_mutex_lock(&mutex);
             if (100 <= iCount)
             {
                   printf("iCount >= 100\r\n");
                   iCount = 0;
                   pthread_mutex_unlock(&mutex);
             }
             else
             {
                   pthread_mutex_unlock(&mutex);
             }
       }

在上面代码中由于thread4并不知道什么时候iCount会大于等于100,所以就会一直在循环判断,但是每次判断都要加锁、解锁(即使本次并没有修改iCount)。这就带来了问题一,CPU浪费严重。所以在代码中添加了sleep(),这样让每次判断都休眠一定时间。但这由带来的第二个问题,如果sleep()的时间比较长,导致thread4处理不够及时,等iCount到了很大的值时才重置。对于上面的两个问题,可以使用条件变量来解决。
首先看一下使用条件变量后,线程代码大概的样子:

      thread1/2:
       while(1)
       {
               pthread_mutex_lock(&mutex);
               iCount++;
               pthread_mutex_unlock(&mutex);
               if (iCount >= 100)
               {
                      pthread_cond_signal(&cond);
               }
       }         
       thread4:
       while (1)
       {
              pthread_mutex_lock(&mutex);
              while(iCount < 100)
              {
                     pthread_cond_wait(&cond, &mutex);
              }
              printf("iCount >= 100\r\n");
              iCount = 0;
              pthread_mutex_unlock(&mutex);
       }

从上面的代码可以看出thread4中,当iCount < 100时,会调用pthread_cond_wait。而pthread_cond_wait在上面应经讲到它会释放mutex,然后等待条件变为真返回。当返回时会再次锁住mutex。因为pthread_cond_wait会等待,从而不用一直的轮询,减少CPU的浪费。在thread1和thread2中的函数pthread_cond_signal会唤醒等待cond的线程(即thread4),这样当iCount一到大于等于100就会去唤醒thread4。从而不致出现iCount很大了,thread4才去处理。
需要注意的一点是在thread4中使用的while (iCount < 100),而不是if (iCount < 100)。这是因为在pthread_cond_singal()和pthread_cond_wait()返回之间有时间差,假如在时间差内,thread3又将iCount减到了100以下了,那么thread4就需要在等待条件为真了。

python中的例子

Pythonk中的条件变量是Condition对象。它除了具有acquire和release方法之外,还提供了wait和notify方法。线程首先acquire一个条件变量锁。如果条件不足,则该线程wait,如果满足就执行线程,甚至可以notify其他线程。其他处于wait状态的线程接到通知后会重新判断条件。

条件变量可以看成不同的线程先后acquire获得锁,如果不满足条件,可以理解为被扔到一个(Lock或RLock)的waiting池。直达其他线程notify之后再重新判断条件。该模式常用于生成者-消费者模式:

#coding: utf-8
import  threading
import  time
import random

queue = []

con = threading.Condition()

class Producer(threading.Thread):
    def run(self):
        while True:
            if con.acquire():
                if len(queue) > 10:
                    print("{} Producer waiting".format(threading.current_thread().name))
                    con.wait()
                else:
                    elem = random.randrange(100)
                    queue.append(elem)
                    print("{}, Producer a elem {}, Now size is {}".format( threading.current_thread().name,elem, len(queue)))
                    time.sleep(random.random())
                    con.notify()
                con.release()


class Consumer(threading.Thread):
    def run(self):
        while True:
            if con.acquire():
                if len(queue) < 0:
                    print("{} Consumer waiting".format(threading.current_thread().name))
                    con.wait()
                else:
                    elem = queue.pop()
                    print("{}, Consumer a elem {}. Now size is {}".format(threading.current_thread().name, elem, len(queue)))
                    time.sleep(random.random())
                    con.notify()
                con.release()

def main():
    for i in range(3):
        Producer().start()

    for i in range(2):
        Consumer().start()


if __name__ == '__main__':
    main()

输出结果如下:

Thread-1, Producer a elem 74, Now size is 1
Thread-1, Producer a elem 16, Now size is 2
Thread-1, Producer a elem 26, Now size is 3
Thread-1, Producer a elem 97, Now size is 4
Thread-1, Producer a elem 62, Now size is 5
Thread-1, Producer a elem 76, Now size is 6
Thread-1, Producer a elem 38, Now size is 7
Thread-1, Producer a elem 59, Now size is 8
Thread-1, Producer a elem 72, Now size is 9
Thread-1, Producer a elem 87, Now size is 10
Thread-1, Producer a elem 83, Now size is 11
Thread-1 Producer waiting
Thread-5, Consumer a elem 83. Now size is 10
Thread-5, Consumer a elem 87. Now size is 9
Thread-3, Producer a elem 19, Now size is 10
Thread-3, Producer a elem 15, Now size is 11
Thread-2 Producer waiting
Thread-1 Producer waiting
Thread-4, Consumer a elem 15. Now size is 10
Thread-4, Consumer a elem 19. Now size is 9
Thread-4, Consumer a elem 72. Now size is 8
Thread-4, Consumer a elem 59. Now size is 7
Thread-4, Consumer a elem 38. Now size is 6
Thread-4, Consumer a elem 76. Now size is 5
Thread-4, Consumer a elem 62. Now size is 4
Thread-4, Consumer a elem 97. Now size is 3
Thread-4, Consumer a elem 26. Now size is 2
Thread-4, Consumer a elem 16. Now size is 1
Thread-4, Consumer a elem 74. Now size is 0

信号量Semaphore

信号量是一个计数器,用于为多个线程提供对共享数据对象的访问。
为了获得共享资源,线程需要执行下列操作。

  1. 测试控制该资源的信号量。
  2. 若此信号量的值为正,则线程可以使用该资源。在这种情况下,线程会将信号量减1,表示它使用了一个资源单位。
  3. 否则,若此信号量的值为0,则线程进入休眠状态,直至信号量值大于0.进程被唤醒后,它返回到步骤1.

当线程不再使用由一个信号量控制的共享资源时,该信号量值增1。如果有线程正在休眠等待此信号量,则唤醒它们。

scan2.py

# coding=UTF-8
import optparse
import socket
import threading
screenLock = threading.Semaphore(value=1)
def connScan(tgtHost, tgtPort):
    try:
        connSkt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        connSkt.connect((tgtHost, tgtPort))
        connSkt.send('ViolentPython\r\n')
        results = connSkt.recv(100)
        screenLock.acquire()
        print('[+]%d/tcp open' % tgtPort)
        print('[+]' + str(results))
    except:
        screenLock.acquire()
        print('[-]%d/tcp close' % tgtPort)
    finally:
        screenLock.release()
        connSkt.close()

def portScan(tgtHost, tgtPorts):
    try:
        tgtIp = socket.gethostbyname(tgtHost)
    except Exception as e:
        print("[-] Cannot resolve '%s' " % tgtHost)
        return
    try:
        tgtName = socket.gethostbyaddr(tgtIp)
        print('\n[+] Scan Results for:%s' % tgtName[0])
    except:
        print('\n[+] Scan Results for:%s' % tgtIp)
    socket.setdefaulttimeout(1)
    for tgtPort in tgtPorts:
        print('Scanning port' + str(tgtPort))
        t = threading.Thread(target=connScan, args=(tgtHost, int(tgtPort)))
        t.start()

def main():
    parser = optparse.OptionParser('usage%prog -H <target host> -p <target port>')
    parser.add_option('-H', dest='tgtHost', type='string', help='specify target host')
    parser.add_option('-p', dest = 'tgtPort', type='int', help='specify target port')
    options, args = parser.parse_args()
    if options.tgtHost is None or options.tgtPort is None:
        print(parser.usage)
        exit(0)
    else:
        tgtHost = options.tgtHost
        tgtPort = options.tgtPort
    args.append(tgtPort)
    portScan(tgtHost, args)

if __name__ == '__main__':
    main()

输出示例:

hacker git:(master) ✗ python scan2.py -H www.baidu.com -p 22
[+] Scan Results for:103.235.46.39
Scanning port22
[-]22/tcp close

在这里Semaphore用来控制同一个线程在屏幕上的输出是连续的,不至于线程的输出相互交叉,从而导致不可读。

类似的还有BoundedSemaphore
示例代码:

# coding=UTF-8
from pexpect import pxssh
import optparse
import time
import threading

maxConnections = 5
connection_lock = threading.BoundedSemaphore(value=maxConnections)
Found = False
Fails = 0

def connect(host, user, password, release):
    global Found, Fails
    try:
        s = pxssh.pxssh()
        s.login(host, user, password)
        print('[+] Password Found: ' + password)
        Found = True
    except Exception as e:
        if 'read_nonblocking' in str(e):
            Fails += 1
            time.sleep(5)
            connect(host, user, password, False)
        elif 'synchronize with original prompt' in str(e):
            time.sleep(1)
            connect(host, user, password, False)
    finally:
        if release:
            connection_lock.release()

def main():
    parser = optparse.OptionParser('usage%prog '+'-H <target host> -u <user> -f <password list>')
    parser.add_option('-H', dest='tgtHost', type='string', help='specify target host')
    parser.add_option('-f', dest='passwdFile', type='string', help='specify password file')
    parser.add_option('-u', dest='user', type='string', help='specify the user')
    (options, args) = parser.parse_args()
    host = options.tgtHost
    passwdFile = options.passwdFile
    user = options.user
    if host == None or passwdFile == None or user == None:
        print(parser.usage)
        exit(0 )
    fn = open(passwdFile, 'r')
    for line in fn.readlines():
        if Found:
            print("[*] Exiting: Password Found")
            exit(0)
        if Fails > 5:
            print("[!] Exiting: Too Many Socket Timeouts")
            exit(0)
        connection_lock.acquire()
        password = line.strip('\r').strip('\n')
        print("[-] Testing: " + str(password))
        t = threading.Thread(target=connect, args=(host, user, password, True))
        t.start()

if __name__ == '__main__':
    main()

在这个例子中BoundedSemaphore起到了限制最大并发线程数的作用。

队列

生产消费者模型主要是对队列进程操作,贴心的Python为我们实现了一个队列结构,队列内部实现了锁的相关设置。可以用队列重写生产消费者模型。

queue内部实现了相关的锁,如果queue的为空,则get元素的时候会被阻塞,知道队列里面被其他线程写入数据。同理,当写入数据的时候,如果元素个数大于队列的长度,也会被阻塞。也就是在 put 或 get的时候都会获得Lock。

实现和上面类似功能的代码如下:

#coding: utf-8
import  threading
import  time
import random

import queue

queue = queue.Queue(10)

class Producer(threading.Thread):

    def run(self):
        while True:
            elem = random.randrange(100)
            queue.put(elem)
            print("Producer a elem {}, Now size is {}".format(elem, queue.qsize()))
            time.sleep(random.random())

class Consumer(threading.Thread):

    def run(self):
        while True:
            elem = queue.get()
            queue.task_done()
            print("Consumer a elem {}. Now size is {}".format(elem, queue.qsize()))
            time.sleep(random.random())

def main():

    for i in range(3):
        Producer().start()

    for i in range(2):
        Consumer().start()


if __name__ == '__main__':
    main()

输出结果:

Producer a elem 97, Now size is 1
Producer a elem 56, Now size is 2
Producer a elem 27, Now size is 3
Consumer a elem 97. Now size is 2
Consumer a elem 56. Now size is 1
Consumer a elem 27. Now size is 0
Producer a elem 41, Now size is 1
Producer a elem 75, Now size is 2
Producer a elem 18, Now size is 3
Consumer a elem 41. Now size is 2

ThreadLocal

这个概念在Java中也有。类似于伪私有类属性,在类方法前加__,如__X,Python会自动扩展这样的名称,以包含类的名称,从而使它们变成真正的唯一。这样在当前线程中就是全局变量,但对于整个进程来说,又是局部变量。

使用示例如下:

import threading
local = threading.local()
def func(name):
    print('current thread:%s' % threading.currentThread().name)
    local.name = name
    print("%s in %s" % (local.name,threading.currentThread().name))
t1 = threading.Thread(target=func,args=('haibo',))
t2 = threading.Thread(target=func,args=('lina',))
t1.start()
t2.start()
t1.join()
t2.join()

输出结果:

current thread:Thread-1
haibo in Thread-1
current thread:Thread-2
lina in Thread-2

线程池的使用

threadpool

threadpool需要安装。pip install threadpool

import threadpool
from time import sleep

def work(name):
sleep(1)
print("%s doing job" % name)

arg_list = [(["go2live.cn"],None),
(["blog.go2live.cn"],None)]
pool = threadpool.ThreadPool(5)
requests = threadpool.makeRequests(work, arg_list)
[pool.putRequest(req) for req in requests]
pool.wait()

输出结果:

go2live.cn doing job
blog.go2live.cn doing job

multiprocessing.dummy.Pool

标准库。

from time import sleep

from multiprocessing.dummy import Pool as ThreadPool

def work(name):
    sleep(1)
    print("%s doing job" % name)

pool = ThreadPool()
results = pool.map(work,['go2live.cn','blog.go2live.cn'])
print(results)
pool.close()
pool.join()

print('main ended')

输出结果:

go2live.cn doing job
blog.go2live.cn doing job
[None, None]
main ended

其它线程工具

在debug中可以利用threading模块获取到当前执行的线程情况:

支持的函数:

  • activeCount():返回激活的线程对象的数量
  • currentThread():返回当前cpu执行的线程对象
  • get_ident() 返回当前线程
  • enumerate(): 返回当前激活的线程对象列表
  • main_thread() 返回主 Thread 对象
  • settrace(func) 为所有线程设置一个 trace 函数
  • setprofile(func) 为所有线程设置一个 profile 函数
  • stack_size([size]) 返回新创建线程栈大小;或为后续创建的线程设定栈大小为 size
  • TIMEOUT_MAX Lock.acquire(), RLock.acquire(), Condition.wait() 允许的最大值

threading 可用对象列表:

  • Thread 表示执行线程的对象
  • Lock 锁原语对象
  • RLock 可重入锁对象,使单一进程再次获得已持有的锁(递归锁)
  • Condition 条件变量对象,使得一个线程等待另一个线程满足特定条件,比如改变状态或某个值
  • Semaphore 为线程间共享的有限资源提供一个”计数器”,如果没有可用资源会被阻塞
  • Event 条件变量的通用版本,任意数量的线程等待某个事件的发生,在该事件发生后所有线程被激活
  • Timer 与 Thread 相识,不过它要在运行前等待一段时间
  • Barrier 创建一个”阻碍”,必须达到指定数量的线程后才可以继续。

延迟执行新线程Timer

class threading.Timer(interval, function, args=None, kwargs=None)
过interval秒后再执行函数function,参数为后面的args和kwargs。

这东东在GUI编程中有用。把数据处理扔到新线程,从而不影响当前的用户交互。

示例代码:

#coding: utf-8
import  threading
import  time


def test():
    print("{} is running {}".format(threading.current_thread().name, time.ctime()))


def main():
    print("main thread start %s" % time.ctime())
    threading.Timer(3,test).start()
    print("main thread end %s" % time.ctime())


if __name__ == '__main__':
    main()

输出如下所示:

main thread start Tue Feb 7 11:41:56 2017
main thread end Tue Feb 7 11:41:56 2017
Thread-1 is running Tue Feb 7 11:41:59 2017

python的多线程缺陷

因为Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。

GIL是Python解释器设计的历史遗留问题,通常我们用的解释器是官方实现的CPython,要真正利用多核,除非重写一个不带GIL的解释器。

所以,在Python中,可以使用多线程,但不要指望能有效利用多核。如果一定要通过多线程利用多核,那只能通过C扩展来实现,不过这样就失去了Python简单易用的特点。

不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。

总结

Python多线程在IO密集型任务中还是很有用处的,而对于计算密集型任务,应该使用Python多进程。

Python多进程相关概念及解释

Python多进程相关概念及解释

介绍

由于GIL的存在,为了利用多核的优势,Python程序员不得不使用多进程。(多线程在PYthon中仅在IO密集型任务中有优势)。
Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。multiprocessing支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
感觉使用方法和threading差不多。

原始方法fork

fork函数被调用一次,但返回两次。对父进程返回子进程ID, 对子进程返回0.
子进程获得父进程数据空间、堆和栈的副本。父子进程并不共享这些存储空间部分,父、子进程共享正文段。

由于在fork之后经常跟随着exec,所以现在的很多实现并不执行一个父进程数据段、堆和栈的完全复制。作为替代,使用了写时复制技术。
这些区域由父、子进程共享,而且内核将它们的访问权限改变为只读的。
如果父、子进程中的任一个试图修改这些区域,则内核只为修改区域的那块内存制作一个副本,通常是虚拟存储器系统中的一“页”。

一般来说,在fork之后是父进程还是子进程执行是不确定的。

在类unix系统中, python的os 模块内置了fork 函数用以创建子进程。

示例代码:

import os

print("Process %s start …" %(os.getpid()))
pid = os.fork()
if pid == 0:
    print("This is child process and my pid is %d, my father process is %d" %(os.getpid(), os.getppid()))
else:
    print("This is Fater process, And Its child pid is %d" %(pid))

输出结果:

Process 872 start …
This is Fater process, And Its child pid is 873
This is child process and my pid is 873, my father process is 872

subprocess

subprocess包主要功能是执行外部的命令和程序。比如说,我需要使用wget下载文件。我在Python中调用wget程序。从这个意义上来说,subprocess的功能与shell类似。

Popen

subprocess模块中只定义了一个类: Popen。
可以使用Popen来创建进程,并与进程进行复杂的交互。它的构造函数如下:

subprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)

参数解释:

  • args 可以是字符串或者序列类型(如:list,元组),用于指定进程的可执行文件及其参数。如果是序列类型,第一个元素通常是可执行文件的路径。我们也可以显式的使用executeable参数来指定可执行文件的路径。在windows操作系统上,Popen通过调用CreateProcess()来创建子进程,CreateProcess接收一个字符串参数,如果args是序列类型,系统将会通过list2cmdline()函数将序列类型转换为字符串。
  • bufsize:指定打开stdin/stdout/stderr文件的缓冲类型。0 无缓冲;1 行缓冲; 其它正数 选择合适的缓冲; 负数(默认值),采用系统默认缓冲大小,即io.DEFAULT_BUFFER_SIZE。
  • executable用于指定可执行程序。一般情况下我们通过args参数来设置所要运行的程序。如果将参数shell设为True,executable将指定程序使用的shell。在windows平台下,默认的shell由COMSPEC环境变量来指定。
  • stdin, stdout, stderr分别表示程序的标准输入、输出、错误句柄。他们可以是PIPE,文件描述符或文件对象,也可以设置为None,表示从父进程继承。
  • preexec_fn 只在Unix平台下有效,用于指定一个可执行对象(callable object),它将在子进程运行之前被调用。
  • Close_sfs:在windows平台下,如果close_fds被设置为True,则新创建的子进程将不会继承父进程的输入、输出、错误管道。我们不能将close_fds设置为True同时重定向子进程的标准输入、输出与错误(stdin, stdout, stderr)。
  • 如果参数shell设为true,程序将通过shell来执行。
  • cwd用于设置子进程的当前目录。
  • env是字典类型,用于指定子进程的环境变量。如果env = None,子进程的环境变量将从父进程中继承。
  • Universal_newlines:不同操作系统下,文本的换行符是不一样的。如:windows下用’/r/n’表示换,而Linux下用’/n’。如果将此参数设置为True,Python统一把这些换行符当作’/n’来处理。
  • startupinfo与createionflags只在windows下用效,它们将被传递给底层的CreateProcess()函数,用于设置子进程的一些属性,如:主窗口的外观,进程的优先级等等。

方法解释:

  • Popen.poll() 用于检查子进程是否已经结束。设置并返回returncode属性。
  • Popen.wait() 等待子进程结束。设置并返回returncode属性。
  • Popen.communicate(input=None) 与子进程进行交互。向stdin发送数据,或从stdout和stderr中读取数据。可选参数input指定发送到子进程的参数。Communicate()返回一个元组:(stdoutdata, stderrdata)。注意:如果希望通过进程的stdin向其发送数据,在创建Popen对象的时候,参数stdin必须被设置为PIPE。同样,如果希望从stdout和stderr获取数据,必须将stdout和stderr设置为PIPE。
  • Popen.send_signal(signal) 向子进程发送信号。
  • Popen.terminate() 停止(stop)子进程。在windows平台下,该方法将调用Windows API TerminateProcess()来结束子进程。
  • Popen.kill() 杀死子进程。

简单示例如下:

import subprocess
child = subprocess.Popen(["ping","-c","5","www.google.com"])
print("parent process")
child.wait()

输出结果:

parent process
PING www.google.com (93.46.8.89): 56 data bytes
Request timeout for icmp_seq 0
Request timeout for icmp_seq 1
Request timeout for icmp_seq 2
Request timeout for icmp_seq 3

www.google.com ping statistics —
5 packets transmitted, 0 packets received, 100.0% packet loss

可以见到子进程的输出直接打印在了屏幕上,因为子进程默认继承了父进程的stdin/stdout/stderr。

Popen的再封装

subprocess.call()
父进程等待子进程完成
返回退出信息(returncode)

subprocess.check_call()
父进程等待子进程完成。
正常执行完返回0,否者抛出subprocess.CalledProcessError,其中包含returncode

subprocess.check_output()

父进程等待子进程完成。
返回子进程向标准输出的输出结果。

事实上这个三个都是利用subprocess.Popen实现的。

简单实现的check_output示例如下:

import subprocess

def my_check_output(args,
                    stdin=None,
                    stderr=None,
                    shell=False,
                    encoding=None,
                    errors=None,
                    universal_newlines=False,
                    timeout=None):

    child = subprocess.Popen(args, stdin=stdin, stdout=subprocess.PIPE,stderr=stderr,shell=shell,universal_newlines=universal_newlines)
    stdout,stderr = child.communicate(input=stdin, timeout=timeout)
    returncode = child.returncode
    if returncode != 0:
        raise subprocess.CalledProcessError(returncode=returncode)
    else:
        return stdout

try:
    ret = my_check_output("pwd",shell=True)
    print(ret)
except subprocess.CalledProcessError as e:
    print(e)

输出结果:

b’/Users/maynard/data/workbase/python/study/Blog_mini/tests\n’

标准模块multiprocessing

fork 方式是仅在linux 下才有的接口, 在windows下并没有, 那么在windows下如何实现多进程呢, 这就用到了multiprocessing

Process创建进程

直接调用

multiprocessing 模块的Process 对象表示的是一个进程对象, 可以创建子进程并执行指定的函数。
创建进程的类:Process([group [, target [, name [, args [, kwargs]]]]]),target表示调用对象,args表示调用对象的位置参数元组。kwargs表示调用对象的字典参数。name为别名。group实质上不使用。

方法:is_alive()、join([timeout])、run()、start()、terminate()。其中,Process以start()启动某个进程。

属性:authkey、daemon、exitcode(进程在运行时为None、如果为–N,表示被信号N结束)、name、pid。
其中daemon是父进程终止后自动终止,且自己不能产生新进程,必须在start()之前设置。

from multiprocessing import Process
import os

def pro_do(name, func):
    print("This is child process %d from parent process %d, and name is  %s which is used for %s"
          %(os.getpid(), os.getppid(), name, func))

if __name__ == "__main__":
    print("Parent process id %d" %(os.getpid()))
    #process 对象指定子进程将要执行的操作方法(pro_do), 以及该函数的对象列表args(必须是tuple格式, 且元素与pro_do的参数一一对应)
    pro = Process(target=pro_do, args=("test", "dev"))
    print("start child process")
    #启动子进程
    pro.start()
    #是否阻塞方式执行, 如果有, 则阻塞方式, 否则非阻塞
    pro.join() #if has this, it’s synchronous operation or asynchronous operation
    print("Process end")

输出结果:

Parent process id 921
start child process
This is child process 922 from parent process 921, and name is test which is used for dev
Process end

子类化Process

自定义类,继承自Process, 实现run方法。

import multiprocessing
import time

class ClockProcess(multiprocessing.Process):
    def __init__(self, interval):
        multiprocessing.Process.__init__(self)
        self.interval = interval

    def run(self):
        n = 5
        while n > 0:
            print("the time is {0}".format(time.ctime()))
            time.sleep(self.interval)
            n -= 1

if __name__ == '__main__':
    p = ClockProcess(3)
    p.start()

输出结果:

the time is Wed Feb 8 16:40:55 2017
the time is Wed Feb 8 16:40:58 2017
the time is Wed Feb 8 16:41:01 2017
the time is Wed Feb 8 16:41:04 2017
the time is Wed Feb 8 16:41:07 2017

Pool进程池

multiprocessing.Pool提供进程池功能。
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

函数解释:

  • apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的
  • close() 关闭pool,使其不在接受新的任务。
  • terminate() 结束工作进程,不再处理未完成的任务。
  • join() 主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
from multiprocessing import Pool
import os, time

def pro_do(process_num):
    print("child process id is %d" %(os.getpid()))
    time.sleep(6-process_num)
    print("this is process %d" %(process_num))

if __name__ == "__main__":
    print("Current process is %d" %(os.getpid()))
    p = Pool()#默认启动CPU个数的进程,可以传参指定。
    for i in range(5):
        p.apply_async(pro_do, (i,))  #增加新的进程
    p.close() # 禁止在增加新的进程
    p.join()#等待所有子进程结束
    print("pool process done")

输出结果:

Current process is 960
child process id is 961
child process id is 963
child process id is 962
child process id is 964
this is process 3
child process id is 964
this is process 2
this is process 1
this is process 4
this is process 0
pool process done

进程间的通信

父进程可以指定子进程执行的方法及其参数, 达到父进程向子进程传递消息的单向通信的目的, 那子进程之间或子进程怎么向父进程通信呢

Queue

Queue是通过Pipe和锁、信号量实现,在进程间共享的一个通信机制。
Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。

示例如下:

from multiprocessing import Process, Queue
import os, time

def write_queue(q):
    for name in ["Yi_Zhi_Yu", "Tony" ,"San"]:
        print("put name %s to queue" %(name))
        q.put(name)
        time.sleep(2)
    print ("write data finished")

def read_queue(q):
    print( "begin to read data")
    while True:
        name = q.get()
        print ("get name %s from queue" %(name))

if __name__ == "__main__":
    q = Queue()
    pw = Process(target=write_queue, args=(q,))
    pr = Process(target=read_queue,args=(q,))

    pw.start()
    pr.start()
    pw.join() #这个表示是否阻塞方式启动进程, 如果要立即读取的话, 两个进程的启动就应该是非阻塞式的, 所以pw在start后不能立即使用pw.join(), 要等pr start后方可
    pr.terminate() #服务进程,强制停止

输出结果:

put name Yi_Zhi_Yu to queue
begin to read data
get name Yi_Zhi_Yu from queue
put name Tony to queue
get name Tony from queue
put name San to queue
get name San from queue
write data finished

Manager

Queue不能用在父进程和Pool产生的子进程中。这个时候需要用到Manager。
其它如Lock等,也不能用multiprocessing.Lock,而是得用Manager中的Lock。

代码示例如下:

from multiprocessing import Process, Queue,Pool,Manager
import os, time

def write_queue(q):
    for name in ["Yi_Zhi_Yu", "Tony" ,"San"]:
        print("put name %s to queue" %(name))
        q.put(name)
        time.sleep(2)
    print ("write data finished")

def read_queue(q):
    print( "begin to read data")
    while True:
        name = q.get()
        print ("get name %s from queue" %(name))

if __name__ == "__main__":
    manager = Manager()
    q = manager.Queue()
    p = Pool()
    pw = p.apply_async(write_queue,args=(q,))
    pr = p.apply_async(read_queue,args=(q,))
    p.close()
    p.join()
    print("main end")

输出结果:

put name Yi_Zhi_Yu to queue
begin to read data
get name Yi_Zhi_Yu from queue
put name Tony to queue
get name Tony from queue
put name San to queue
get name San from queue
write data finished

Pipe

管道是UNIX系统IPC的最古老的形式,所有UNIX系统都提供此种通信机制。管道有以下两种局限性。

  1. 历史上,它们是半双工的(即数据只能在一个方向上流动)。现在,某些系统提供全双工管道,但是为了最佳的可移植性,我们决不应预先假定系统支持全双工管道。
  2. 管道只能在具有公共祖先的两个进程之间使用。通常,一个管道由一个进程创建,在进程调用fork之后,这个管道就能在父进程和子进程之间使用了。

Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。

send和recv方法分别是发送和接受消息的方法。例如,在全双工模式下,可以调用conn1.send发送消息,conn1.recv接收消息。如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。

代码示例:

#!/usr/bin/env python
#encoding=utf-8

from multiprocessing import Process,Pipe
import os,time,sys

def send_pipe(p):
    names = ["Yi_Zhi_Yu", "Tony", "San"]
    for name in names:
        print("put name %s to Pipe" %(name))
        p.send(name)
        time.sleep(1)
def recv_pipe(p):
    print("Try to read data in pipe")
    while True:
            name = p.recv()
            print("get name %s from pipe" %(name))

if __name__ == "__main__":
   #pipe, one for send, one for read
   ps_pipe, pr_pipe = Pipe()# return  (conn1, conn2). 有个参数来决定是否双向管理.True为双向管道,False为单向,conn1发送,conn2接收.默认True
   #process
   ps = Process(target=send_pipe, args=(ps_pipe,))
   pr = Process(target=recv_pipe, args=(pr_pipe,))
   pr.start()
   ps.start()
   ps.join()
   pr.terminate()

输出结果:

Try to read data in pipe
put name Yi_Zhi_Yu to Pipe
get name Yi_Zhi_Yu from pipe
put name Tony to Pipe
get name Tony from pipe
put name San to Pipe
get name San from pipe

Event

Event用来实现进程间同步通信。

示例代码:

import multiprocessing
import time

def wait_for_event(e):
    print("wait_for_event: starting")
    e.wait()
    print("wairt_for_event: e.is_set()->" + str(e.is_set()))

def wait_for_event_timeout(e, t):
    print("wait_for_event_timeout:starting")
    e.wait(t)
    print("wait_for_event_timeout:e.is_set->" + str(e.is_set()))

if __name__ == "__main__":
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name = "block",
            target = wait_for_event,
            args = (e,))

    w2 = multiprocessing.Process(name = "non-block",
            target = wait_for_event_timeout,
            args = (e, 2))
    w1.start()
    w2.start()

    time.sleep(3)

    e.set()
    print("main: event is set")

输出结果:

wait_for_event: starting
wait_for_event_timeout:starting
wait_for_event_timeout:e.is_set->False
main: event is set
wairt_for_event: e.is_set()->True

解决共享资源

Lock

有with用法和明确的acquire和release用法。见代码示例:

import multiprocessing
import sys

def worker_with(lock, f):
    with lock:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lockd acquired via with\n")
            n -= 1
        fs.close()

def worker_no_with(lock, f):
    lock.acquire()
    try:
        fs = open(f, 'a+')
        n = 10
        while n > 1:
            fs.write("Lock acquired directly\n")
            n -= 1
        fs.close()
    finally:
        lock.release()

if __name__ == "__main__":
    lock = multiprocessing.Lock()
    f = "file.txt"
    w = multiprocessing.Process(target = worker_with, args=(lock, f))
    nw = multiprocessing.Process(target = worker_no_with, args=(lock, f))
    w.start()
    nw.start()
    print("end")

file.txt文件内容:

Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lockd acquired via with
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly
Lock acquired directly

Semaphore

信号量(Semaphore)用来控制对共享资源的访问数量,例如池的最大连接数。

代码示例:

import multiprocessing
import time

def worker(s, i):
    s.acquire()
    print(multiprocessing.current_process().name + "acquire");
    time.sleep(i)
    print(multiprocessing.current_process().name + "release\n");
    s.release()

if __name__ == "__main__":
    s = multiprocessing.Semaphore(2)
    for i in range(5):
        p = multiprocessing.Process(target = worker, args=(s, i*2))
        p.start()

输出结果:

Process-1acquire
Process-1release

Process-2acquire
Process-3acquire
Process-2release

Process-4acquire
Process-3release

Process-5acquire
Process-4release

Process-5release

Python信号相关概念及运用

Python信号相关概念及运用

介绍

信号是软件中断。信号提供了一种处理异步事件的方法。

不存在编号为0的信号。kill函数对信号编号0有特殊的应用。POSIX.1将此信号编号值称为空信号。

很多条件可以产生信号:

  • 当用户按某些按键时,引发终端产生的信号。(DELETE,Ctrl+C,Ctrl+\,Ctrl+Z)。这些键可以自定义。
  • 硬件异常产生信号:除数为0、无效的内存引用等。
  • 进程调用kill(2)函数可将信号发送到另一个进程或进程组。有限制:只能发送给同一用户的进程,或者发送信号的进程所有者是超级用户。
  • 用户可用kill(1)命令将信号发送给其他进程。
  • 当检测到某种软件条件发生,并应将其通知有关进程时也产生信号。例句SIGURG、SIGPIPE、SIGALRM。

信号是异步事件的经典实例。产生信号的事件对进程而言是随机出现的。
可以要求内核在某个信号出现时按照下列方式之一进行处理:

  1. 忽略此信号。大多数都可以,但有两个不能忽略,他们是SIGKILL和SIGSTOP。原因是:它们向超级用户提供了使进程终止或停止的可靠方法。 另外,有些如果忽略由硬件异常引发的异常(如除0),则进程的行为是未定义的。
  2. 捕捉信号。自定义信号处理函数。注意:不能捕捉SIGKILL和SIGSTOP信号。
  3. 执行系统默认动作。针对大多数信号的系统默认动作是终止进程。

UNIX系统信号

名字 说明 默认动作 细节说明
SIGABRT 异常终止(abort) 终止+core 调用abort函数时产生此信号
SIGALRM 超时(alarm) 终止 调用alarm函数设置的计时器超时,由setitimer函数设置的间隔时间超时时,产生此信号
SIGBUS 硬件故障 终止+core 指示一个实现定义的 硬件故障。当出现某些类型的内存故障时,实现常常产生此信号
SIGCANCEL 线程库内部使用 忽略 Solaris线程库内部使用的信号
SIGCHLD 子进程状态改变 忽略 在一个进程终止或停止时,将SIGCHLD信号发送给其父进程。
SIGCONT 使暂停进程继续 继续/忽略 此作业控制信号被发送给需要继续运行,但当前处于停止状态的进程。
SIGEMT 硬件故障 终止+core 指示一个实现定义的 硬件故障。(emt = emulator trap)。
SIGFPE 算术异常 终止+core 此信号表示算术运算异常,例如除以0,浮点溢出等。
SIGFREEZE 检查点冻结 忽略 仅由Solaris定义。用于通知进程在冻结系统状态之前需要采取特定的动作。
SIGHUP 连接断开 终止 如果终端接口检测到一个连接断开,则将此信号发送给与该终端相关的控制进程(会话首进程)
SIGILL 非法硬件指令 终止+core 此信号指示进程已执行一条非法硬件指令。
SIGINFO 键盘状态请求 忽略 BSD信号
SIGINT 终端中断符 终止 当用户按中断键(一般采用DELETE或Ctrl+C)时,终端驱动程序产生此号并送至前台进程组中的每一个进程。
SIGIO 异步IO 终止/忽略 指示一个异步IO事件。
SIGIOT 硬件异常 终止+core 指示一个实现定义的 硬件故障 (iot = input/output trap)
SIGKILL 终止 终止 这是两个不能被捕捉或忽略的信号之一。它向系统管理员提供了一种可以杀死任一进程的可靠方法。
SIGLWP 线程库内部使用 忽略 Solaris线程库内部使用
SIGPIPE 写至无读进程的管道 终止 如果在写到管道时读进程已终止,则产生此信号。
SIGPOLL 可轮询事件 终止 当在一个 可轮询设备 上发生 一特定事件 时产生此信号。
SIGPROF 梗概时间超时(setitimer) 终止 当setitimer(2)函数设置的梗概统计间隔计时器已到期时产生此信号。
SIGPWR 电源失效/重启 终止/忽略 依赖于系统。主要用于具有不间断电源(UPS)的系统。如果电源失效,则UPS起作用,而且通常软件会接到通知。
SIGQUIT 终端退出符 终止+core 当用户在终端上按退出键(一般是Ctrl+)时,产生此信号,并送至前台进程组中的所有进程。
SIGSEGV 无效的内存引用 终止+core 此信号指示进程进行了一次无效的内存引用 (segv = segmentation violation)
SIGSTKFLT 协处理器栈故障 终止 此信号仅由Linux定义。
SIGSTOP 停止 暂停进程 作业控制信号,用于停止一个进程。类似于交互停止信号(SIGTSTP),但是SIGSTOP不能被捕捉或忽略。
SIGSYS 无效系统调用 终止+core 该信号指示一个无效的系统调用。
SIGTERM 终止 终止 这是由kill(1)命令发送的系统默认终止信号。
SIGTHAW 检查点解冻 忽略 此信号仅由Solaris定义。
SIGTRAP 硬件故障 终止+core 指示一个实现定义的 硬件故障 。当执行断点指令时,实现常用此信号将控制转移至调试程序
SIGTSTP 终端停止符 暂停进程 交互式停止指令。Ctrl+Z
SIGTTIN 后台读控制tty 暂停进程 当一个后台进程组中进程试图读其控制终端时,终端驱动程序产生此信号。有两种情况不产生。
SIGTTOU 后台写至控制tty 暂停进程 当一个后台进程组中的进程试图写到其控制终端时。一个进程可以允许后台进程写控制终端,不允许时也有两种情况不产生此信号
SIGURG 紧急情况(套接字) 忽略 此信号通知进程已经发生了一个紧急情况。在网络连接上接收到带外的数据时,可选择产生此信号。
SIGUSR1 用户定义的信号 终止 用户定义的信号,可用于应用程序。
SIGUSR2 用户定义的信号 终止 用户定义的信号,可用于应用程序。
SIGVTALRM 虚拟时间闹钟(setitimer) 终止 当一个由setitimer(2)函数设置的虚拟间隔时间到期时产生此信号。
SIGWAITING 线程库内部使用 忽略 此信号由Solaris线程库内部使用
SIGWINCH 终端窗口大小改变 忽略 内核维持与每个终端或伪终端相关联的窗口大小。进程可以用ioctl函数得到或设置窗口的大小。
SIGXCPU 超过CPU限制(setrlimit) 终止+core/忽略 如果进程超过了其软CPU时间限制,则产生SIGXCPU信号。
SIGXFSZ 超过文件长度限制(setrlimit) 终止+core/忽略 如果进程超过了其软文件长度限制,则产生此限制。
SIGXRES 超过资源限制 忽略 此信号仅由Solaris定义。

在下列条件下并不会产生core文件:

  • 进程是设置用户ID的,而且当前用户并非程序文件的所有者。
  • 进程是设置组ID的,而且当前用户并非该程序的组所有者。
  • 用户没有写当前工作目录的权限。
  • 该文件已存在,而且用户对该文件设有写权限。
  • 该文件太大。

signal包

signal包负责在Python程序内部处理信号,典型的操作包括预设信号处理函数,暂停并等待信号,以及定时发出SIGALRM等。要注意,signal包主要是针对UNIX平台(比如Linux, MAC OS),而Windows内核中由于对信号机制的支持不充分,所以在Windows上的Python不能发挥信号系统的功能。

捕捉信号

signal包的核心是使用signal.signal()函数来预设(register)信号处理函数,如下所示:

singnal.signal(signalnum, handler)

signalnum为某个信号,handler为该信号的处理函数。我们在信号基础里提到,进程可以无视信号,可以采取默认操作,还可以自定义操作。当handler为signal.SIG_IGN时,信号被无视(ignore)。当handler为singal.SIG_DFL,进程采取默认操作(default)。当handler为一个函数名时,进程采取函数中定义的操作。

示例代码s1.py:

#coding: utf-8
import signal
# Define signal handler function
def myHandler(signum, frame):
    print('I received: ', signum)

# register signal.SIGTSTP’s handler
signal.signal(signal.SIGTSTP, myHandler)
signal.pause()#等待信号产生,是任意信号。
print('End of Signal Demo')

执行过程1:

执行 python s1.py
键入Ctrl+Z

输出结果:

Z(‘I received: ‘, 18)
End of Signal Demo

执行过程2:

执行 python s1.py
键入Ctrl+C

输出结果:

CTraceback (most recent call last):
File “s1.py”, line 9, in
signal.pause()
KeyboardInterrupt

因为Ctrl+Z产生的SIGTSTP由自定义函数处理了。打印完返回原来代码执行处,继续打印’End of Signal Demo‘后程序退出。
而Ctrl+C产生的SIGINT信号没有设置捕捉函数,按系统默认操作执行,终止程序运行。

发送信号

signal.alarm(),它被用于在一定时间之后,向进程自身发送SIGALRM信号:

import signal
import time
# Define signal handler function
def myHandler(signum, frame):
    print("Now, it’s the time ", time.ctime())
    exit()

# register signal.SIGALRM’s handler
signal.signal(signal.SIGALRM, myHandler)
signal.alarm(1)
while True:
    print('not yet')

输出结果:

not yet
not yet

not yet
Now, it’s the time Thu Feb 9 11:49:05 2017

os包的kill发送信号

signal只有alarm可以发送SIGALRM信号。其它信号需要借助os包。

os.kill(pid, sid) #向进程pid发送信号sid

os.killpg(pgid, sid)#向进程组pgid发送信号sid

其他注意事项

进程捕捉到信号并对其进行处理时,进程正在执行的指令会被临时中断,转而去处理该信号处理程序中的指令,返回继续处理之前被中断的指令。
但是之前被中断的指令可能是malloc,getpwnam等,而信号处理函数也可能调用malloc,getpwnam,结果导致继续执行之前的malloc,getpwnam出错。malloc\getpwnam就是不可重入函数。
相对应的就是可重入函数。

Single UNIX Specification说明了保证可重入函数。一共有117个。
不可重入的原因有:

  • 已知它们使用静态数据结构
  • 它们调用malloc或free
  • 它们是标准I/O函数。标准I/O库的很多实现都以不可重入的方式使用全局数据结构。

应当了解,即使使用可重入函数,但是由于每个线程只有一个errno变量,所以信号处理程序可能会修改其原先值。
因此,作为一个通用的规则,当在信号处理程序中调用可重入函数时,应当在其前保存,在其后恢复errno。

若在信号处理程序中调用一个不可重入函数,则其结果是不可预见的。

#coding: utf-8
import signal
# Define signal handler function
i = 0
def myHandler(signum, frame):
    global i;
    i += 1;
    print('I received: ', signum)

# register signal.SIGTSTP’s handler
signal.signal(signal.SIGTSTP, myHandler)
signal.pause()
i += 1;
print('End of Signal Demo, i=',i)

输出结果:

python s1.py
Z(‘I received: ‘, 18)
(‘End of Signal Demo, i=’, 2)

这个例子比较简单。但是在更复杂的环境下,i被信号捕捉函数修改过并不是显而易见的。

mac下python编辑器

mac下python编辑器

前言

选择一款合适的编辑器有助于快速开发。
本文将会介绍3个。PyCharm、vim、emacs。
笔者在mac osx下这三个都用过,依次接触的vim,emacs,PyCharm。
后面两个是资深党用的,不建议初学者玩。
我个人是打算用emacs,然后用vim的编辑模式。

PyCharm

代码补全,语法错误提示,直接运行,Debug,底下还有Python Console,还有Terminal,版本控制工具。
一直用的比较顺,但是在使用过程中遇到几个问题。

如下代码

threads = []
t1 = threading.Thread(target=music,args=('爱情买卖',))
threads.append(t1)
t2 = threading.Thread(target=move,args=('阿凡达',))
threads.append(t2)

for t in threads:
    t.#这里没有自动补全了,在类似的场合都一样。痛苦。。

另外在看了Python高手之路后,想对代码进行语法规范检查,让期遵循pep8标准。也没有找到插件。

因为个人在使用过程中,发现vim的编辑效率是最高的。故安装了个vim插件。插件名叫IdeaVim。

另外它还支持virtualenv。

因考虑到有时候为了方便,会直接在服务器上写代码。故还得会一个命令行编辑器。于是下面两个出场了。

vim编辑器之神

mac自带vim7。
vim --versions

VIM – Vi IMproved 7.3 (2010 Aug 15, compiled Jun 14 2016 16:06:49)
Unknown option argument: “–versions”
More info with: “vim -h”

偷懒的配置方法可以参考两个命令把 Vim 打造成 Python IDE

自己动手配置参见下面的文章。

配置前准备

在配置python开发环境前,需要确保两点:

  1. Vim编辑版本应该大于7.3。
  2. 支持Python语言。在所选编辑器的功能中,确保你看到了+python。 如没有,可参考mac vim写wordpress 如果满足上述要求,接下来可以安装Vim扩展了。如果不满足,则需要安装/升级。

验证安装

确保你已经安装了7.3版本以上、支持Python的Vim编辑器。你可以再次运行vim –version进行确认。如果你想知道Vim中使用的Python版本,你可以在编辑器中运行:python import sys; print(sys.version)。

2.7.6 (default, Sep 9 2014, 15:04:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.39)]
这行命令会输出你的编辑器当前的Python版本。如果报错,那么你的编辑器就不支持Python语言,需要重装或重新编译。

安装Vundle

Vim有多个扩展管理器,但是我们强烈推荐Vundle。你可以把它想象成Vim的pip。有了Vundle,安装和更新包这种事情不费吹灰之力。

我们现在来安装Vundle:

git clone https://github.com/gmarik/Vundle.vim.git ~/.vim/bundle/Vundle.vim

该命令将下载Vundle插件管理器,并将它放置在你的Vim编辑器bundles文件夹中。现在,你可以通过.vimrc配置文件来管理所有扩展了。

将配置文件添加到你的用户的home文件夹中:

touch ~/.vimrc
接下来,把下来的Vundle配置添加到配置文件的顶部:

set nocompatible              " required
filetype off                  " required

" set the runtime path to include Vundle and initialize
set rtp+=~/.vim/bundle/Vundle.vim
call vundle#begin()

" alternatively, pass a path where Vundle should install plugins
"call vundle#begin('~/some/path/here')

" let Vundle manage Vundle, required
Plugin 'gmarik/Vundle.vim'

" Add all your plugins here (note older versions of Vundle used Bundle instead of Plugin)

" All of your Plugins must be added before the following line
call vundle#end()            " required
filetype plugin indent on    " required

这样,你就完成了使用Vundle前的设置。之后,你就可以在配置文件中添加希望安装的插件,然后打开Vim编辑器,运行下面的命令:
:PluginInstall
这个命令告诉Vundle施展它的魔法——自动下载所有的插件,并为你进行安装和更新。

打造成IDE

扔掉鼠标

学会扔掉鼠标,习惯使用vim快捷键,你会发现编辑的效率大大提升。
硬件效率越来越高。摆在软件工程上的问题是人的效率问题。
vim编辑快,python代码量小。所以人生苦短,我用python。

Split Layouts

使用:sv 命令打开一个文件,你可以纵向分割布局(新文件会在当前文件下方界面打开),使用相反的命令:vs , 你可以得到横向分割布局(新文件会在当前文件右侧界面打开)。

你还可以嵌套分割布局,所以你可以在分割布局内容再进行分割,纵向或横向都可以,直到你满意为止。众所周知,我们开发时经常需要同时查看多个文件。

专业贴士:记得在输入完:sv后,利用tab补全功能,快速查找文件。

专业贴士:你还可以指定屏幕上可以进行分割布局的区域,只要在.vimrc文件中添加下面的代码即可:

set splitbelow
set splitright

专业贴士:想要不使用鼠标就切换分割布局吗?只要将下面的代码添加到.vimrc文件中,你就可以通过快捷组合键进行切换。

"split navigations
nnoremap <C-J> <C-W><C-J>
nnoremap <C-K> <C-W><C-K>
nnoremap <C-L> <C-W><C-L>
nnoremap <C-H> <C-W><C-H>

组合快捷键:

  • Ctrl-j 切换到下方的分割窗口
  • Ctrl-k 切换到上方的分割窗口
  • Ctrl-l 切换到右侧的分割窗口
  • Ctrl-h 切换到左侧的分割窗口 换句话说, 按Ctrl+Vim的标准移动键,就可以切换到指定窗口。

等等,nnoremap是什么意思?——简单来说,nnoremap将一个组合快捷键映射为另一个快捷键。 nnoremap = n(normal模式)no(非)re(递归)map(映射)。一开始的n,指的是在Vim的正常模式(Normal Mode)下,而不是可视模式下重新映射。基本上,nnoremap 就是说,当我在正常模式按下时,进行操作。更多信息请看这里

缓冲区(Buffers)

虽然Vim支持tab操作,仍有很多人更喜欢缓冲区和分割布局。你可以把缓冲区想象成最近打开的一个文件。Vim提供了方便访问近期缓冲区的方式,只需要输入:b <buffer name or number>,就可以切换到一个已经开启的缓冲区(此处也可使用自动补全功能)。你还可以通过ls命令查看所有的缓冲区。

专业贴士: 在:ls命令输出的最后,Vim会提示“敲击Enter继续查看”,这时你可以直接输入:b <buffer name>,立即选择缓冲区。这样可以省掉一个按键操作,也不必去记忆缓冲区的名字。

代码折叠(Code Folding)

大多数“现代”集成开发环境(IDE)都提供对方法(methods)或类(classes)进行折叠的手段,只显示类或方法的定义部分,而不是全部的代码。

你可以在.vimrc中添加下面的代码开启该功能:

" Enable folding
set foldmethod=indent
set foldlevel=99

这样就可以实现,但是你必须手动输入za来折叠(和取消折叠)。使用空格键会是更好的选择。所以在你的配置文件中加上这一行命令吧:

" Enable folding with the spacebar
nnoremap <space> za

现在你可以轻松地隐藏掉那些当前工作时不需要关注的代码了。

第一个命令,set foldmethod=ident会根据每行的缩进开启折叠。但是这样做会出现超过你所希望的折叠数目。但是别怕,有几个扩展就是专门解决这个问题的。在这里,我们推荐SimplyFold。在.vimrc中加入下面这行代码,通过Vundle进行安装:

Plugin 'tmhedberg/SimpylFold'

不要忘记执行安装命令::PluginInstall

专业贴士: 希望看到折叠代码的文档字符串?

let g:SimpylFold_docstring_preview=1

Python代码缩进

当然,想要代码折叠功能根据缩进情况正常工作,那么你就会希望自己的缩进是正确的。这里,Vim的自带功能无法满足,因为它实现不了定义函数之后的自动缩进。我们希望Vim中的缩进能做到以下两点:

  • 首先,缩进要符合PEP8标准。
  • 其次,更好地处理自动缩进。

PEP8

要支持PEP8风格的缩进,请在.vimrc文件中添加下面的代码:

au BufNewFile,BufRead *.py
\ set tabstop=4 |
\ set softtabstop=4 |
\ set shiftwidth=4 |
\ set textwidth=79 |
\ set expandtab |
\ set autoindent |
\ set fileformat=unix

这些设置将让Vim中的Tab键就相当于4个标准的空格符,确保每行代码长度不超过80个字符,并且会以unix格式储存文件,避免在推送到Github或分享给其他用户时出现文件转换问题。

另外,对于全栈开发,你可以设置针对每种文件类型设置au命令:

au BufNewFile,BufRead *.js, *.html, *.css
\ set tabstop=2 |
\ set softtabstop=2 |
\ set shiftwidth=2

自动缩进

自动缩进有用,但是在某些情况下(比如函数定义有多行的时候),并不总是会达到你想要的效果,尤其是在符合PEP8标准方面。我们可以利用indentpython.vim插件,来解决这个问题:

Plugin 'vim-scripts/indentpython.vim'

标示不必要的空白字符

我们希望避免出现多余的空白字符。可以让Vim帮我们标示出来,使其很容易发现并删除。

hi BadWhitespace guifg=gray guibg=red ctermfg=gray ctermbg=red
au BufRead,BufNewFile *.py,*.pyw,*.c,*.h match BadWhitespace /\s\+$/

这会将多余的空白字符标示出来,很可能会将它们变成红色突出。

支持UTF-8编码

大部分情况下,进行Python开发时你应该使用UTF-8编码,尤其是使用Python 3的时候。确保Vim设置文件中有下面的命令:

set encoding=utf-8

为了避免乱码,请加入下列配置:

set fileformats=unix,mac,dos
set fencs=utf-8,GB18030,ucs-bom,default,latin1

自动补全

支持Python自动补全的最好插件是YouCompleteMe。我们再次使用Vundle安装:

Plugin 'Valloric/YouCompleteMe'

YouCompleteMe插件其实底层使用了一些不同的自动补全组件(包括针对Python开发的Jedi),另外要安装一些C库才能正常工作。插件官方文档提供了很好的安装指南,我就不在这里重复了。切记跟随文档的步骤进行安装。

安装完成后,插件自带的设置效果就很好,但是我们还可以进行一些小的调整:

let g:ycm_autoclose_preview_window_after_completion=1
map <leader>g  :YcmCompleter GoToDefinitionElseDeclaration<CR>

上面的第一行确保了在你完成操作之后,自动补全窗口不会消失,第二行则定义了“转到定义”的快捷方式。

支持Virtualenv虚拟环境

上面“转到定义”功能的一个问题,就是默认情况下Vim不知道virtualenv虚拟环境的情况,所以你必须在配置文件中添加下面的代码,使得Vim和YouCompleteMe能够发现你的虚拟环境:

"python with virtualenv support
py << EOF
import os
import sys
if 'VIRTUAL_ENV' in os.environ:
project_base_dir = os.environ['VIRTUAL_ENV']
activate_this = os.path.join(project_base_dir, 'bin/activate_this.py')
execfile(activate_this, dict(file=activate_this))
EOF

这段代码会判断你目前是否在虚拟环境中编辑,然后切换到相应的虚拟环境,并设置好你的系统路径,确保YouCompleteMe能够找到相应的site packages文件夹。

语法检查/高亮

通过安装syntastic插件,每次保存文件时Vim都会检查代码的语法:

Plugin 'scrooloose/syntastic'

还可以通过这个小巧的插件,添加PEP8代码风格检查:

Plugin 'nvie/vim-flake8'

最后,让你的代码变得更漂亮:

let python_highlight_all=1
syntax on

配色方案

配色方案可以和你正在使用的基础配色共同使用。GUI模式可以尝试solarized方案, 终端模式可以尝试Zenburn方案:

Plugin 'jnurmine/Zenburn'
Plugin 'altercation/vim-colors-solarized'

接下来,只需要添加一点逻辑判断,确定什么模式下使用何种方案就可以了:

if has('gui_running')
  set background=dark
  colorscheme solarized
else
  colorscheme Zenburn
endif

Solarized方案同时提供了暗色调和轻色调两种主题。要支持切换主题功能(按F5)也非常简单,只需添加:

call togglebg#map("<F5>")

文件浏览

如果你想要一个不错的文件树形结构,那么NERDTree是不二之选。

Plugin 'scrooloose/nerdtree'

如果你想用tab键,可以利用vim-nerdtree-tabs插件实现:

Plugin 'jistr/vim-nerdtree-tabs'

还想隐藏.pyc文件?那么再添加下面这行代码吧:

let NERDTreeIgnore=['\.pyc$', '\~$'] "ignore files in NERDTree

超级搜索

想要在Vim中搜索任何文件?试试ctrlP插件吧:

Plugin 'kien/ctrlp.vim'

正如插件名,按Ctrl+P就可以进行搜索。如果你的检索词与想要查找的文件相匹配的话,这个插件就会帮你找到它。哦,对了——它不仅仅可以搜索文件,还能检索标签!更多信息,可以观看这个Youtube视频.

显示行号
开启显示行号:

set nu

Git集成

想要在Vim中执行基本的Git命令?vim-fugitive插件则是不二之选。

Plugin 'tpope/vim-fugitive'

请看Vimcasts的这部视频,了解更多情况。

Powerline状态栏

Powerline是一个状态栏插件,可以显示当前的虚拟环境、Git分支、正在编辑的文件等信息。

这个插件是用Python编写的,支持诸如zsh、bash、tmux和IPython等多种环境。

Plugin 'Lokaltog/powerline', {'rtp': 'powerline/bindings/vim/'}

请查阅插件的官方文档,了解配置选项。

系统剪贴板

通常Vim会忽视系统剪贴板,而使用自带的剪贴板。但是有时候你想从Vim之外的程序中剪切、复制、粘贴文本。在OS X平台上,你可以通过这行代码访问你的系统剪贴板:

set clipboard=unnamed

Shell开启Vim编辑模式

最后,当你熟练掌握了Vim和它的键盘快捷方式之后,你会发现自己经常因为shell中缺乏相同的快捷键而懊恼。没关系,大部分的shell程序都有Vi模式。在当前shell中开启Vi模式,你只需要在~/.inputrc文件中添加这行代码:

set editing-mode vi

现在,你不仅可以在shell中使用Vim组合快捷键,还可以在Python解释器以及任何利用GNU Readline程序的工具(例如,大多数的数据库shell)中使用。现在,你在什么地方都可以使用Vim啦!

参考阅读

vim基本命令
vim进阶命令
解决复制粘贴代码自动缩进问题
vim讲解生动篇
把vim打造成ide
Vim/Vi编程提升编写速度技巧

emacs神的编辑器

安装略 ,配置emacs可以参考https://github.com/redguardtoo/emacs.d

配置python开发环境可以参考

总结

特性 PyCharm VIM Emacs
代码自动补全
语法错误提示
语法规范检查pep8 x
运行代码
Debug
git
virtualenv
代码跳转

解决pip安装慢

解决pip安装慢

在学习Python的过程中,发现pip安装慢,经常timeout,因为国外镜像太慢了。

网上找了下解决方案。有两个。都是使用国内镜像。

  1. 临时解决,加-i参数。pip install -i https://pypi.tuna.tsinghua.edu.cn/simple 软件包
  2. 永久解决。配置环境。 编辑 ~/.pip/pip.conf文件(如没有,请创建)
[global]
timeout = 60
https://pypi.tuna.tsinghua.edu.cn/simple
download-cache = /Users/maynard/software/pip_cache

download-cache是用来缓存下载的软件包的。这样就可以在多个虚拟环境里复用。
请把路径改成自己的。。

清华镜像:https://pypi.tuna.tsinghua.edu.cn/simple
豆瓣镜像:http://pypi.douban.com/simple

Python3标准库urllib

Python3标准库urllib

前言

做web开发的,和http请求打交道最多了,不得不熟悉的就是urllib。当然爬虫也经常用。并且有好的第三方库requests。
本文就介绍这些东东。
note:是在python3.5下测试运行。

urllib

urllib有4个库。分别是:

  • urllib.request 打开和读url
  • urllib.error 包含由urllib.request抛出的异常。
  • urllib.parse 用来解析url
  • urllib.robotparser 用来解析robots.txt文件。

玩爬虫的同学请关注一下urllib.robotparser,做一个好程序员。-^

request

两行代码拿到页面内容。

get请求

from urllib.request import urlopen
print(urlopen("http://www.baidu.com").read())

urllib.request.urlopen(url, data=None, [timeout, ]*, cafile=None, capath=None, cadefault=False, context=None)
支持http、https、ftp协议。
urlopen返回 一个类文件对象,他提供了如下方法:

  • read() , readline() , readlines() , fileno() , close() :这些方法的使用方式与文件对象完全一样;
  • info():返回一个httplib.HTTPMessage 对象,表示远程服务器返回的头信息;
  • getcode():返回Http状态码。如果是http请求,200表示请求成功完成;404表示网址未找到;
  • geturl():返回请求的url;

参数说明:

  • url 可以是字符串,也可以是Request对象.
  • data 用于post请求时的数据。
  • timeout 超时时间
  • cafile & capath & cadefault 用于https的ca证书
  • context 同样是用于https请法庭。

需要注意的是这个方法使用的HTTP/1.1协议,并且在HTTP header里自动加入了Connection:close。

post请求

同样可以用urllib.request.urlopen实现。传入data即可。
第二种方式是传入的url是个Request对象。Request的构造函数中传入data即可。
如果没有数据传输,只能用第二第方式,只是Request对象的函数函数中需要明确指定method=’POST’。

完整的Request类定义如下:
urllib.request.Request(url, data=None, headers={}, origin_req_host=None, unverifiable=False, method=None)

参数说明:

  • url 字符串的网址
  • data 用于post的数据。如果有,则method自动为’POST’
  • headers http头。
  • origin_req_host 原始请求host。
  • unverifiable 用于指示请求是否是不可证实的。
  • method http方法

示例如下:

from urllib import request
from urllib import parse
#从这可以看出,前端的校验是给小白用的.码农是可以直接发起http请求,绕过前端js校验的.
data = {'name':'test1','email':'test@qq.com','passwd':'123456'}
response  = request.urlopen('http://awesome.go2live.cn/api/users',parse.urlencode(data).encode('utf-8'))
print(response.getcode())
print(response.read())

自定义header

前文已经说了。在构造Request对象时,传入headers参数即可。也可以之后调用Request的add_header方法。
这个主要用来模拟User-Agent和HTTP_REFERER。因为这两个经常用来在后端屏蔽掉请求。

譬如我的博客站点就是有屏蔽功能的。

直接请求会失败。

from urllib.request import urlopen
print(urlopen("http://www.go2live.cn").read())

输出结果:

Traceback (most recent call last):

urllib.error.HTTPError: HTTP Error 403: Forbidden

通过构造合适的user-agent就可以正常访问了。

from urllib import request
req = request.Request('http://www.go2live.cn',headers={'User-Agent':'Mozilla/5.0 (X11; U; Linux i686) Gecko/20071127 Firefox/2.0.0.11'})
response  = request.urlopen(req)
print(response.getcode())

输出结果:

200

还有Referer是用来防盗链的。请求某些资源时,Referer必须是来自指定的网站才可以,否则403拒绝访问。
另一个是Content-Type。
在post请求,自动成了application/x-www-form-urlencoded。
但是在api请求时,现在好多都改成了json的。这个时候需要用application/json。
另外上传文件时也不一样:multipart/form-data。

增加cookie

这个主要用来处理需要登录才能访问的页面。
这个稍为麻烦点。修复修改下opener才能处理。
http.cookiejiar包是来保存cookie的。
request.HTTPCookieProcessor是用来处理cookie的。
完整的代码示例如下:

from urllib import request
from urllib import parse
import http.cookiejar
import hashlib

URL_ROOT = 'http://awesome.go2live.cn'

cookie = http.cookiejar.CookieJar()# 用来保存cookie的对象
handler = request.HTTPCookieProcessor(cookie) #处理cookie的工具
opener = request.build_opener((handler))
response = opener.open(URL_ROOT)
print("before login")
for item in cookie:
    print('Name={0}, Value={1}'.format(item.name, item.value))
    
data = {'email':'test@qq.com','passwd':'123456'}
data['passwd'] = data['email']+":"+data['passwd']
data['passwd'] = hashlib.sha1(data['passwd'].encode()).hexdigest()
req = request.Request(URL_ROOT+'/api/authenticate', parse.urlencode(data).encode())
response = opener.open(req)
print(response.read())
print("after login")
for item in cookie:
    print('Name={0}, Value={1}'.format(item.name, item.value))

输出结果:

before login
b'{“admin”: 0, “created_at”: 1486789465.7326, “id”: “001486789465697a2db999d99f84506a24a457bee0eab76000”, “name”: “test”, “email”: “test@qq.com“, “passwd”: “******”, “image”: “http://www.gravatar.com/avatar/bf58432148b643a8b4c41c3901b81d1b?d=mm&s=120″}
after login
Name=awesession, Value=001486789465697a2db999d99f84506a24a457bee0eab76000-1486887360-220a1b13969736bb0f868cfef9076023a7ea3b02

上传文件

用还是urlopen方法。只是data的构造真的好蛋疼。。

#!/usr/bin/env python
urllib.request
import urllib.parse
import random, string
import mimetypes

def random_string (length):
    return ''.join (random.choice (string.ascii_letters) for ii in range (length + 1))

def encode_multipart_data (data, files):

    def get_content_type (filename):
        return mimetypes.guess_type (filename)[0] or 'application/octet-stream'

    def encode_field (field_name):
        return ('-----------------------------96951961826872/r/n',
                'Content-Disposition: form-data; name="%s"' % field_name, '/r/n'
                '', str (data [field_name]))

    def encode_file (field_name):
        filename = files [field_name]
        return ('-----------------------------96951961826872/r/n',
                'Content-Disposition: form-data; name="%s"; filename="%s"' % (field_name, filename), '/r/n'
                'Content-Type: %s' % get_content_type(filename), '/r/n/r/n'
                '', open (filename, 'rb').read ())

    lines = []
    for name in data:
        lines.extend (encode_field (name))
    for name in files:
        lines.extend (encode_file (name))
    lines.extend ('/r/n-----------------------------96951961826872--/r/n')
    body = b''
    for x in lines:
        if(type(x) == str):
            body += x.encode('ascii')
        else:
            body += x
    headers = {'Content-Type': 'multipart/form-data; boundary=---------------------------96951961826872',
               'Content-Length': str (len (body))}

    return body, headers

def main():
    url = 'http://awesome.go2live.cn'
    data = {}
    files = {'notePhoto': '01.jpg'}
    req = urllib.request.Request (url, *encode_multipart_data (data, files))
    response = urllib.request.urlopen(req)
if __name__ == '__main__':
    main()

下载文件

这个其实没啥。

from urllib import request
response = request.urlopen('http://stock.gtimg.cn/data/get_hs_xls.php?id=ranka&type=1&metric=chr')
with open('test1.xls', 'wb') as fd:
    fd.write(response.read())

Debug调试

这个有时候要看看具体的http请求是啥样的。
使用下面的方式会把收发包的内容在屏幕上打印出来。

from urllib import request

http_handler = request.HTTPHandler(debuglevel=1)
https_handler = request.HTTPSHandler(debuglevel=1)
opener = request.build_opener(http_handler,https_handler)
request.install_opener(opener)
response = request.urlopen('http://m.baidu.com')
print(response.read())

输出结果:

send: b’GET http://m.baidu.com HTTP/1.1\r\nAccept-Encoding…
reply: ‘HTTP/1.1 200 OK\r\n’
header: Cache-Control header: Content-Length header: Content-Type…

内容太长。打…省略。

requests库

相当强大的一个第三方库。支持以下特性:

  • International Domains and URLs
  • Keep-Alive & Connection Pooling
  • Sessions with Cookie Persistence
  • Browser-style SSL Verification
  • Basic/Digest Authentication
  • Elegant Key/Value Cookies
  • Automatic Decompression
  • Automatic Content Decoding
  • Unicode Response Bodies
  • Multipart File Uploads
  • HTTP(S) Proxy Support
  • Connection Timeouts
  • Streaming Downloads
  • .netrc Support
  • Chunked Requests
  • Thread-safety

官方文档

get请求

同样简单的很,两行代码即可。

import requests
print(requests.get('http://blog.go2live.cn').content)

post请求

requests的api好简单。get请求就调用get方法。post请求就调用post方法。

import requests

print(requests.post('http://awesome.go2live.cn/api/users',data={'name':'test1','email':'test@qq.com','passwd':'123456'}).content)

两行代码搞定。对比下urllib用了4行代码简单多了。。

而要传入json数据。再加个json参数就ok了。

自定义header

这个好简单,直接添加一个参数就可以了。

>>> url = 'https://api.github.com/some/endpoint'
>>> headers = {'user-agent': 'my-app/0.0.1'}

>>> r = requests.get(url, headers=headers)

处理cookie

cookie可以直接用响应对象的cookies变量拿到。

>>> url = 'http://example.com/some/cookie/setting/url'
>>> r = requests.get(url)

>>> r.cookies['example_cookie_name']
'example_cookie_value'

发送cookie可以加个cookies字典。

>>> url = 'http://httpbin.org/cookies'
>>> cookies = dict(cookies_are='working')

>>> r = requests.get(url, cookies=cookies)
>>> r.text
'{"cookies": {"cookies_are": "working"}}'

RequestsCookieJar可以设置得更复杂。

>>> jar = requests.cookies.RequestsCookieJar()
>>> jar.set('tasty_cookie', 'yum', domain='httpbin.org', path='/cookies')
>>> jar.set('gross_cookie', 'blech', domain='httpbin.org', path='/elsewhere')
>>> url = 'http://httpbin.org/cookies'
>>> r = requests.get(url, cookies=jar)
>>> r.text
'{"cookies": {"tasty_cookie": "yum"}}'

上传文件

上传文件也简单。用post方法,传个files参数。

>>> url = 'http://httpbin.org/post'
>>> files = {'file': ('report.xls', open('report.xls', 'rb'), 'application/vnd.ms-excel', {'Expires': '0'})}

>>> r = requests.post(url, files=files)
>>> r.text
{
  ...
    "files": {
        "file": "<censored...binary...data>"
    },
    ...
}

下载文件

感觉好简单。见下面。

import requests
url = 'http://stock.gtimg.cn/data/get_hs_xls.php?id=ranka&type=1&metric=chr'
r = requests.get(url) 
with open('test.xls', "wb") as code:
    code.write(r.content)

考虑到一次下载,可能文件过大,会消耗过多的内存。
requests推荐的下载方式如下:

import requests
r = requests.get('http://stock.gtimg.cn/data/get_hs_xls.php?id=ranka&type=1&metric=chr',stream=True)
with open('test.xls', 'wb') as fd:
    for chunk in r.iter_content(chunk_size=128):
            fd.write(chunk)

写在后面

对比了urllib和requests库。我决定以后都用requests库了。太方便了。
给个git地址,去fork下吧。